CrackedRuby

Overview

Data integration addresses the fundamental challenge of working with data distributed across multiple systems, databases, file formats, and organizational boundaries. Modern applications rarely operate in isolation; they must consume data from external APIs, synchronize with partner systems, aggregate information from legacy databases, and process files in various formats. Data integration provides the technical foundation for these operations.

The scope of data integration extends beyond simple data transfer. It includes schema mapping, format transformation, data quality enforcement, conflict resolution, and maintaining referential integrity across systems. A typical integration might extract customer records from a legacy SQL database, combine them with clickstream data from a web analytics platform, enrich the dataset with third-party demographic information, transform everything into a common schema, and load it into a data warehouse for analysis.

Data integration operates at multiple levels: file-based batch processing, database-to-database replication, API-based real-time synchronization, and message queue-driven event streaming. Each approach serves different requirements for latency, consistency, and data volume. The choice depends on factors including data freshness requirements, source system capabilities, network constraints, and processing complexity.

# Simple file-based data integration example
require 'csv'
require 'json'

# Extract from CSV source
csv_data = CSV.read('customers.csv', headers: true)

# Transform and combine with API data
integrated_data = csv_data.map do |row|
  {
    id: row['customer_id'],
    name: row['full_name'],
    email: row['email'],
    # Enrich with external data
    location: fetch_location_data(row['zip_code']),
    segment: calculate_segment(row)
  }
end

# Load to JSON destination
File.write('integrated_customers.json', JSON.pretty_generate(integrated_data))

The complexity of data integration grows with the number of sources, the variety of formats, and the requirements for data quality and consistency. Production systems often integrate dozens of sources, handle millions of records, and maintain complex transformation rules that evolve with business requirements.

Key Principles

Data integration operates on several fundamental principles that determine system architecture and implementation approach.

Source-Target Paradigm: Every integration involves one or more source systems providing data and one or more target systems consuming data. Sources may be databases, APIs, file systems, message queues, or streaming platforms. Targets follow similar patterns. The integration layer sits between sources and targets, managing extraction, transformation, and loading operations.

Schema Mapping: Different systems represent data using different schemas, naming conventions, and data types. Schema mapping defines how fields in the source correspond to fields in the target. This includes handling mismatches like combining multiple source fields into a single target field, splitting single fields into multiple targets, or applying business rules during mapping.

# Schema mapping configuration
SCHEMA_MAP = {
  'customer_id' => { target: 'id', type: :integer },
  'first_name' => { target: 'given_name', type: :string },
  'last_name' => { target: 'family_name', type: :string },
  'dob' => { target: 'birth_date', type: :date, format: '%Y-%m-%d' },
  'account_balance' => { target: 'balance_cents', transform: ->(v) { (v.to_f * 100).to_i } }
}

def map_record(source_record, schema_map)
  schema_map.each_with_object({}) do |(source_field, config), result|
    value = source_record[source_field]
    value = config[:transform].call(value) if config[:transform]
    result[config[:target]] = convert_type(value, config[:type])
  end
end
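
The `map_record` method above calls a `convert_type` helper that is not shown. The following is a minimal sketch of what such a helper might look like, assuming the type names used in `SCHEMA_MAP`. The optional `format:` keyword and the pass-through behavior for fields without a declared type are assumptions made for illustration.

```ruby
require 'date'

# Hypothetical helper assumed by map_record; not part of any library.
def convert_type(value, type, format: nil)
  return nil if value.nil?
  return value if type.nil? # fields that only declare a :transform

  case type
  when :integer then Integer(value.to_s.strip, 10) # base 10 so "007" is not read as octal
  when :string  then value.to_s.strip
  when :date
    format ? Date.strptime(value.to_s, format) : Date.parse(value.to_s)
  else
    raise ArgumentError, "unsupported type: #{type.inspect}"
  end
end
```

Using `Integer()` rather than `to_i` makes malformed input raise an error instead of silently becoming 0, which keeps bad source values from reaching the target unnoticed.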

Data Quality and Validation: Integration processes must validate data at multiple stages. Source data may be incomplete, incorrectly formatted, or violate business rules. The integration layer enforces quality through validation rules, handles missing data, detects duplicates, and ensures referential integrity. Failed validations require error handling strategies: reject records, quarantine for manual review, apply default values, or log warnings while proceeding.
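
The four failure strategies named above can be sketched as a small rule engine. The rule set, field names, and result buckets below are hypothetical; a real integration would derive them from its own business rules.

```ruby
# Hypothetical rules: each names a field, a check, and an action on failure.
VALIDATION_RULES = [
  { field: :email,   check: ->(v) { v.to_s.match?(/\A[^@\s]+@[^@\s]+\z/) }, on_failure: :reject },
  { field: :country, check: ->(v) { !v.to_s.empty? }, on_failure: :default, default: 'unknown' },
  { field: :age,     check: ->(v) { v.nil? || (0..130).cover?(v) }, on_failure: :quarantine },
  { field: :phone,   check: ->(v) { v.nil? || v.to_s.match?(/\A\+?\d[\d\s-]{6,}\z/) }, on_failure: :warn }
].freeze

def validate_records(records, rules = VALIDATION_RULES)
  result = { accepted: [], rejected: [], quarantined: [], warnings: [] }

  records.each do |record|
    record = record.dup # leave the caller's hash untouched
    outcome = :accepted

    rules.each do |rule|
      next if rule[:check].call(record[rule[:field]])

      case rule[:on_failure]
      when :reject     then outcome = :rejected
      when :quarantine then outcome = :quarantined unless outcome == :rejected
      when :default    then record[rule[:field]] = rule[:default]
      when :warn       then result[:warnings] << "#{rule[:field]} invalid for record #{record[:id]}"
      end
    end

    result[outcome] << record
  end

  result
end
```

Rejection takes precedence over quarantine in this sketch, so a record that fails both kinds of rule is dropped rather than held for review.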

Idempotency: Integration processes often need to run repeatedly, potentially processing the same source data multiple times due to failures, retries, or scheduled reruns. Idempotent operations produce the same result regardless of how many times they execute. This requires tracking which data has been processed, using upsert operations instead of inserts, and implementing proper conflict resolution.
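
As a rough illustration of these ideas, the sketch below uses an in-memory hash as a stand-in for a target table. The `batch_id` argument is a hypothetical identifier that the source or scheduler would assign to each batch; it is an assumption of this example, not something a particular system provides.

```ruby
require 'set'

# In-memory stand-in for a target table keyed by primary key.
class IdempotentLoader
  attr_reader :rows

  def initialize
    @rows = {}
    @applied_batches = Set.new
  end

  # Loading the same batch twice leaves the target unchanged.
  def load(batch_id, records)
    return :skipped if @applied_batches.include?(batch_id)

    # Upsert by key: replaces an existing row instead of adding a duplicate
    records.each { |record| @rows[record[:id]] = record }
    @applied_batches << batch_id
    :applied
  end
end
```

The two mechanisms are complementary: batch tracking avoids repeated work, while keyed upserts keep the result correct even when the same records arrive under a different batch identifier.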

Incremental vs Full Load: Full load processing extracts all data from sources each time, while incremental processing handles only new or changed records since the last run. Incremental processing reduces processing time and resource usage but requires tracking state and handling deletions. Full loads simplify logic but scale poorly with data volume.

class IncrementalIntegration
  def initialize(source, target, state_store)
    @source = source
    @target = target
    @state_store = state_store
  end

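  def run
    last_processed = @state_store.get('last_processed_timestamp')
    # Capture the cutoff before extracting so that records modified while
    # this run is in progress are picked up by the next run
    run_started_at = Time.now

    # Extract only new/modified records
    new_records = @source.fetch_since(last_processed)

    # Transform and load
    new_records.each do |record|
      transformed = transform(record)
      @target.upsert(transformed)
    end

    # Update state only after every record has loaded successfully
    @state_store.set('last_processed_timestamp', run_started_at)
  end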
end

Consistency and Transactions: Integration processes must handle consistency requirements across systems. Strong consistency ensures all systems reflect the same state, while eventual consistency allows temporary divergence. Transactional integration commits changes atomically across sources and targets, rolling back on failure. Non-transactional approaches handle failures through compensation or retry logic.
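
When no shared transaction is available, compensation can be sketched as a list of steps that each carry an undo action. The `IntegrationStep` struct and `run_with_compensation` method are hypothetical names introduced for this example.

```ruby
# Each step pairs an action with the compensation that reverses it.
IntegrationStep = Struct.new(:name, :action, :compensation)

# Runs steps in order; on failure, undoes completed steps in reverse order.
def run_with_compensation(steps)
  completed = []

  steps.each do |step|
    begin
      step.action.call
      completed << step
    rescue StandardError => e
      completed.reverse_each { |done| done.compensation.call }
      return { status: :compensated, failed_step: step.name, error: e.message }
    end
  end

  { status: :committed }
end
```

This sketch assumes compensations themselves succeed. A production version would also need to handle a compensation that fails, typically by logging it for manual repair.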

Change Data Capture: CDC tracks changes at the source system level, capturing inserts, updates, and deletes as they occur. This provides a more accurate and efficient alternative to periodic polling. CDC implementations range from database triggers to log-based replication to application-level event publishing.
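
Whatever mechanism captures the changes, the consumer side usually reduces to applying a stream of change events to the target. The sketch below assumes a hypothetical event shape with `:op`, `:key`, and `:row` fields and uses a plain hash as the replica.

```ruby
# Applies one change event to an in-memory replica and returns the replica.
def apply_change(replica, event)
  case event[:op]
  when :insert, :update
    # Merge so that partial updates keep the columns they do not mention
    replica[event[:key]] = (replica[event[:key]] || {}).merge(event[:row])
  when :delete
    replica.delete(event[:key])
  else
    raise ArgumentError, "unknown operation: #{event[:op].inspect}"
  end
  replica
end
```

Because deletes arrive as explicit events, the replica can remove rows that a timestamp-based poll would never see.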

Implementation Approaches

Data integration implementations follow several established patterns, each optimized for different scenarios.

ETL (Extract, Transform, Load): The traditional approach extracts data from sources, transforms it in an intermediate processing layer, and loads results into targets. Transformation occurs outside source and target systems, providing isolation and flexibility. ETL suits batch processing scenarios where data undergoes complex transformations or combines multiple sources before loading.

ETL systems typically run on scheduled intervals, processing accumulated changes. This introduces latency between source changes and target updates but allows for complex aggregations, enrichment from multiple sources, and data quality checks. The transformation layer can be a dedicated server, a data processing framework, or distributed compute cluster depending on scale.

class ETLPipeline
  def initialize(sources, transformations, targets)
    @sources = sources
    @transformations = transformations
    @targets = targets
  end

  def execute
    # Extract phase
    raw_data = @sources.flat_map(&:extract)
    
    # Transform phase - occurs outside source/target systems
    transformed_data = raw_data
      .select { |record| valid_record?(record) }
      .map { |record| normalize(record) }
      .group_by { |record| record[:entity_type] }
      .transform_values { |records| aggregate(records) }
    
    # Load phase
    @targets.each do |target|
      target.load(transformed_data[target.entity_type])
    end
  end

  private

  def normalize(record)
    @transformations.reduce(record) do |result, transformation|
      transformation.apply(result)
    end
  end
end

ELT (Extract, Load, Transform): ELT loads raw data directly into the target system before transformation. Modern data warehouses with powerful compute capabilities perform transformations using SQL or warehouse-native tools. ELT reduces the complexity of the integration layer but requires targets capable of handling raw data and performing transformations.

ELT works well when targets are analytical databases optimized for bulk operations and complex queries. Raw data preservation allows reprocessing with different transformation logic without re-extracting from sources. The approach shifts transformation costs from integration infrastructure to the target system.

Real-Time Streaming: Streaming integration processes data as events occur, minimizing latency between source changes and target updates. Implementations use message queues, event streams, or change data capture to detect and propagate changes immediately. Streaming suits scenarios requiring low latency, real-time analytics, or event-driven architectures.

Streaming architectures introduce complexity around state management, exactly-once processing guarantees, and handling late-arriving or out-of-order events. The trade-off between latency and complexity drives the choice between batch and streaming approaches.
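
One common way to tolerate duplicates and out-of-order delivery is to version each event and let the target ignore anything that is not newer than what it already holds. The sketch below assumes every event carries a monotonically increasing `:version` per key, which is an assumption about the source rather than something every system provides.

```ruby
# In-memory target that applies an event only if it is newer than the stored row.
class VersionedTarget
  attr_reader :rows

  def initialize
    @rows = {}
  end

  def apply(event)
    current = @rows[event[:key]]
    # Duplicates and late arrivals have a version at or below the stored one
    return :stale if current && current[:version] >= event[:version]

    @rows[event[:key]] = { version: event[:version], data: event[:data] }
    :applied
  end
end
```

With this check in place, replaying a message after a consumer restart has no effect on the target, which is what makes at-least-once delivery safe to use.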

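require 'kafka'
require 'json'

class StreamingIntegration
  # target_system is any object that responds to write(record)
  def initialize(kafka_brokers, topic, target_system)
    @consumer = Kafka.new(kafka_brokers).consumer(group_id: 'integration-worker')
    @consumer.subscribe(topic)
    @target_system = target_system
  end

  def process_stream
    @consumer.each_message do |message|
      event = JSON.parse(message.value)

      # Transform immediately
      transformed = transform_event(event)

      # Load to target with minimal latency
      @target_system.write(transformed)

      # Offsets are committed only after processing, which gives
      # at-least-once delivery, not exactly-once: a crash before the commit
      # replays messages, so the target write should be idempotent
      @consumer.commit_offsets
    end
  rescue Kafka::ProcessingError => e
    # The block-local message is out of scope here; the error carries the
    # topic, partition, and offset of the message that failed
    handle_stream_error(e)
  end
end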

API-Based Integration: Modern systems expose APIs for data access, enabling integration through HTTP requests. API integration polls endpoints periodically or receives webhook callbacks when data changes. This approach handles systems without direct database access or file system exposure.

API integration must handle rate limiting, authentication, pagination, and error responses. Implementations balance polling frequency against API quotas and cost. Webhooks provide lower latency but require infrastructure to receive and process callbacks reliably.
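
Pagination and rate-limit handling can be sketched independently of any HTTP client. In the example below, `fetch_page` is a hypothetical callable that returns an array of records for a page number (empty when exhausted) and raises the hypothetical `RateLimited` error when the API refuses the request. The `sleeper` parameter exists only so the delay can be replaced in tests.

```ruby
class RateLimited < StandardError; end

# Collects all pages, retrying rate-limited requests with exponential backoff.
def fetch_all(fetch_page, max_retries: 3, base_delay: 0.5, sleeper: ->(seconds) { sleep(seconds) })
  records = []
  page = 1

  loop do
    attempts = 0
    begin
      batch = fetch_page.call(page)
    rescue RateLimited
      attempts += 1
      raise if attempts > max_retries

      # Delays grow as base_delay * 1, 2, 4, ...
      sleeper.call(base_delay * (2**(attempts - 1)))
      retry
    end

    break if batch.empty?

    records.concat(batch)
    page += 1
  end

  records
end
```

The retry counter resets for each page, so one slow page does not use up the retry budget of the pages that follow it.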

Database Replication: Direct database-to-database replication copies data between systems using database-native mechanisms. This includes logical replication, binary log streaming, or database links. Replication provides low latency and high throughput but tightly couples systems and limits transformation capabilities.

Ruby Implementation

Ruby provides multiple approaches for implementing data integration, from simple file processing to sophisticated data pipeline frameworks.

File-Based Integration: Ruby ships with CSV and JSON libraries, and the nokogiri gem handles XML. Together these cover most file-based integration scenarios. For Excel files, the creek gem provides streaming access and the roo gem provides random access.

require 'csv'
require 'json'
require 'nokogiri'

class FileIntegration
  def process_csv(input_path, output_path)
    CSV.open(output_path, 'w') do |output|
      CSV.foreach(input_path, headers: true) do |row|
        transformed = {
          'id' => row['customer_id'],
          'full_name' => "#{row['first_name']} #{row['last_name']}",
          'email' => row['email']&.downcase,
          'created' => parse_date(row['signup_date'])
        }
        output << transformed.values
      end
    end
  end

  def integrate_xml_to_json(xml_path, json_path)
    doc = Nokogiri::XML(File.read(xml_path))
    
    records = doc.xpath('//customer').map do |node|
      {
        id: node['id'].to_i,
        name: node.at_xpath('name').text,
        orders: node.xpath('orders/order').map { |o| o['id'].to_i }
      }
    end
    
    File.write(json_path, JSON.pretty_generate(records))
  end
end

Database Integration: The sequel gem provides a database toolkit for integration work. Unlike ActiveRecord, Sequel focuses on direct SQL operations and supports a wider range of databases. It handles connection pooling, query building, and transactions across multiple databases.

require 'sequel'

class DatabaseIntegration
  def initialize(source_db_url, target_db_url)
    @source = Sequel.connect(source_db_url)
    @target = Sequel.connect(target_db_url)
    # each_page is provided by Sequel's pagination extension
    @source.extension(:pagination)
  end

  def sync_customers
    # Extract from source with pagination
    @source[:customers].order(:id).each_page(1000) do |page|
      # Transform
      transformed = page.map do |row|
        {
          customer_id: row[:id],
          email: row[:email],
          full_name: "#{row[:first_name]} #{row[:last_name]}",
          status: normalize_status(row[:status]),
          updated_at: Time.now
        }
      end

      # Load to target using upsert for idempotency. This uses PostgreSQL's
      # ON CONFLICT and requires a unique constraint on customer_id.
      @target[:integrated_customers].insert_conflict(
        target: :customer_id,
        update: {
          email: Sequel[:excluded][:email],
          full_name: Sequel[:excluded][:full_name],
          status: Sequel[:excluded][:status],
          updated_at: Sequel[:excluded][:updated_at]
        }
      ).multi_insert(transformed)
    end
  end

  def incremental_sync(last_sync_time)
    # Fetch only records modified since last sync
    new_records = @source[:customers]
      .where { updated_at > last_sync_time }
      .all
    
    new_records.each do |record|
      @target[:integrated_customers].insert_conflict(
        target: :customer_id,
        update: { 
          email: record[:email],
          updated_at: record[:updated_at]
        }
      ).insert(transform(record))
    end
  end
end

API Integration: The faraday gem provides a flexible HTTP client for API integration. It supports middleware for authentication, retries, logging, and response parsing. For webhook receivers, Sinatra or Rails provides HTTP endpoints.

require 'faraday'
require 'faraday/retry'

class APIIntegration
  def initialize(api_url, api_key)
    @client = Faraday.new(url: api_url) do |conn|
      conn.request :json
      conn.request :retry, max: 3, interval: 0.5, backoff_factor: 2
      conn.response :json
      conn.headers['Authorization'] = "Bearer #{api_key}"
      conn.adapter Faraday.default_adapter
    end
  end

  def fetch_and_integrate(endpoint, target_db)
    page = 1
    loop do
      response = @client.get(endpoint, page: page, per_page: 100)
      
      break if response.body['data'].empty?
      
      records = response.body['data'].map { |item| transform_api_record(item) }
      target_db[:api_imports].multi_insert(records)
      
      page += 1
      
      # Respect rate limits
      sleep 0.5
    end
  end

  def process_webhook(payload)
    event_type = payload['type']
    
    case event_type
    when 'customer.created'
      handle_customer_creation(payload['data'])
    when 'customer.updated'
      handle_customer_update(payload['data'])
    when 'customer.deleted'
      handle_customer_deletion(payload['data'])
    end
  rescue StandardError => e
    # Log error but return 200 to prevent webhook retries
    ErrorLogger.log(e, context: { payload: payload })
  end
end

Message Queue Integration: The bunny gem integrates with RabbitMQ for message-based integration. The ruby-kafka gem handles Kafka streams. These tools enable asynchronous, decoupled integration patterns.

require 'bunny'
require 'json'
require 'time'  # Time#iso8601

class MessageQueueIntegration
  def initialize(rabbitmq_url)
    @connection = Bunny.new(rabbitmq_url)
    @connection.start
    @channel = @connection.create_channel
  end

  def publish_changes(source_db)
    exchange = @channel.topic('data.changes', durable: true)
    
    # Detect changes using timestamp (records updated in the last hour)
    cutoff = Time.now - 3600
    source_db[:customers].where { updated_at > cutoff }.each do |record|
      message = {
        entity: 'customer',
        action: 'update',
        data: record,
        timestamp: Time.now.iso8601
      }
      
      routing_key = "customer.update.#{record[:region]}"
      exchange.publish(
        message.to_json,
        routing_key: routing_key,
        persistent: true
      )
    end
  end

  def consume_and_integrate(queue_name, target_db)
    queue = @channel.queue(queue_name, durable: true)
    
    queue.subscribe(manual_ack: true, block: true) do |delivery_info, properties, payload|
      message = JSON.parse(payload)
      
      begin
        transformed = transform_message(message)
        target_db[:integrated_data].insert(transformed)
        
        # Acknowledge successful processing
        @channel.ack(delivery_info.delivery_tag)
      rescue StandardError => e
        # Reject and requeue on failure
        @channel.nack(delivery_info.delivery_tag, false, true)
        ErrorLogger.log(e, context: { message: message })
      end
    end
  end
end

Data Pipeline Framework: For complex workflows, the kiba gem provides an ETL framework. It defines sources, transforms, and destinations as Ruby code, handling the pipeline execution and error management.

require 'kiba'
require 'kiba-common/sources/csv'
require 'kiba-common/destinations/csv'
require 'date'  # Date.parse is used in the first transform

# Define data pipeline
job = Kiba.parse do
  source Kiba::Common::Sources::CSV,
    filename: 'source.csv',
    csv_options: { headers: true, header_converters: :symbol }

  transform do |row|
    row[:email] = row[:email].downcase
    row[:full_name] = "#{row[:first_name]} #{row[:last_name]}"
    row[:signup_date] = Date.parse(row[:signup_date])
    row
  end

  transform do |row|
    next unless valid_email?(row[:email])
    next if row[:status] == 'deleted'
    row
  end

  transform do |row|
    # Enrich with external data
    row[:location_data] = LocationService.lookup(row[:zip_code])
    row
  end

  destination Kiba::Common::Destinations::CSV,
    filename: 'output.csv',
    csv_options: { headers: :first_row }
end

# Execute pipeline
Kiba.run(job)

Tools & Ecosystem

Ruby's data integration ecosystem includes general-purpose libraries and specialized tools for common integration patterns.

Database Tools: The sequel gem handles most database integration needs. It connects to PostgreSQL, MySQL, SQLite, Oracle, SQL Server, and others through unified APIs. The pg gem provides direct PostgreSQL access with COPY operations for bulk loading. The mysql2 gem offers similar capabilities for MySQL.

File Processing: Standard library modules handle CSV, JSON, and YAML. The nokogiri gem processes XML and HTML. For Excel files, creek streams large files with low memory usage, while roo provides random access. The rubyzip gem extracts and creates ZIP archives. The parquet gem reads Parquet files common in big data systems.

API Clients: The faraday gem provides flexible HTTP clients with middleware support. The httparty gem offers simpler syntax for straightforward API calls. The rest-client gem handles basic REST operations. For OAuth authentication, the oauth2 gem implements the protocol. The graphql-client gem consumes GraphQL APIs.

Message Queues: The bunny gem connects to RabbitMQ. The ruby-kafka gem handles Kafka producers and consumers with support for exactly-once semantics. The aws-sdk-sqs gem integrates with Amazon SQS. The google-cloud-pubsub gem connects to Google Cloud Pub/Sub.

ETL Frameworks: The kiba gem provides lightweight ETL pipelines. It defines sources, transformations, and destinations as Ruby code, with built-in error handling and logging. Kiba handles common patterns like batching, buffering, and checkpointing.

Data Quality: The activerecord-import gem bulk imports validated ActiveRecord objects. The data_cleaner gem detects and fixes common data quality issues. Custom validation typically combines Ruby's built-in validation with domain-specific rules.

Scheduling: The rufus-scheduler gem runs integration jobs on schedules within Ruby processes. For distributed scheduling, systems integrate with external schedulers like cron, Kubernetes CronJobs, or workflow engines like Apache Airflow.

require 'rufus-scheduler'

scheduler = Rufus::Scheduler.new

# Run integration every hour
scheduler.every '1h' do
  DatabaseIntegration.new(source_url, target_url).sync_customers
end

# Run daily at specific time
scheduler.cron '0 2 * * *' do
  FileIntegration.new.process_daily_batch
end

scheduler.join

Monitoring: The statsd-ruby gem sends metrics to StatsD for monitoring integration performance. The prometheus-client gem exposes metrics for Prometheus. The sentry-ruby gem captures and reports errors.

Cloud Platform Integration: Cloud providers offer SDKs for their services. The aws-sdk-s3 gem handles S3 file operations. The google-cloud-storage gem provides similar capabilities for GCS. The azure-storage-blob gem connects to Azure Blob Storage. These SDKs include retry logic, authentication, and multipart upload capabilities.

Performance Considerations

Data integration performance depends on throughput, latency, and resource utilization. Different bottlenecks require different optimization strategies.

Batching: Processing records individually incurs overhead from network round trips, transaction commits, and function calls. Batching groups multiple records into single operations, reducing overhead. Database bulk inserts, API batch endpoints, and file buffering all benefit from batching.

class BatchIntegration
  BATCH_SIZE = 1000

  def integrate_with_batching(source, target)
    source.each_slice(BATCH_SIZE) do |batch|
      transformed = batch.map { |record| transform(record) }
      
      # Single database round trip for entire batch
      target.multi_insert(transformed)
    end
  end

  # Comparison: individual inserts
  def integrate_without_batching(source, target)
    source.each do |record|
      transformed = transform(record)
      # Separate round trip for each record
      target.insert(transformed)
    end
  end
end

Optimal batch size balances throughput against memory usage and transaction duration. Small batches increase overhead; large batches consume memory and extend transaction locks. Testing under realistic conditions determines the right size for each scenario.

Parallel Processing: Ruby's Global VM Lock (GVL, often called the Global Interpreter Lock or GIL) limits CPU parallelism within a single process, but I/O-bound integration tasks still benefit from concurrency. Multiple threads can issue network requests, database queries, or file I/O at the same time: while one thread waits on I/O, the others execute.

require 'concurrent-ruby'

class ParallelIntegration
  def integrate_concurrent(sources, target)
    pool = Concurrent::FixedThreadPool.new(10)
    futures = []

    sources.each do |source|
      futures << Concurrent::Future.execute(executor: pool) do
        data = source.extract
        transformed = data.map { |record| transform(record) }
        target.load(transformed)
      end
    end

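    # Wait for all futures; value! re-raises a worker's error, whereas
    # value would return nil and hide the failure
    futures.each(&:value!)
  ensure
    # Shut the pool down even when a worker failed
    pool&.shutdown
    pool&.wait_for_termination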
  end
end

For CPU-intensive transformations, process-based parallelism overcomes the GIL. The parallel gem spawns worker processes that perform independent transformations. This approach multiplies memory usage since each process loads data and code.
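
To show the underlying mechanism without depending on the parallel gem, the sketch below uses only `fork`, pipes, and `Marshal` from core Ruby. It is not how the parallel gem is implemented; `fork_map` is a hypothetical helper, it runs only on platforms that support `fork`, and it omits error handling for workers that crash.

```ruby
# Maps a block over items in forked worker processes and preserves order.
def fork_map(items, workers: 2)
  return [] if items.empty?

  slice_size = (items.size / workers.to_f).ceil
  jobs = items.each_slice(slice_size).map do |slice|
    reader, writer = IO.pipe
    reader.binmode
    writer.binmode

    pid = fork do
      reader.close
      # Each child transforms its slice and sends the result back serialized
      writer.write(Marshal.dump(slice.map { |item| yield item }))
      writer.close
      exit!(0) # skip at_exit hooks inherited from the parent
    end

    writer.close # the parent only reads
    [pid, reader]
  end

  jobs.flat_map do |pid, reader|
    result = Marshal.load(reader.read)
    reader.close
    Process.wait(pid)
    result
  end
end
```

Results cross the process boundary through serialization, so the per-item work needs to be expensive enough to outweigh the cost of copying data between processes.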

Streaming vs Loading: Streaming processes data incrementally, keeping memory usage constant regardless of dataset size. Loading accumulates data in memory before processing, enabling operations that require full dataset access but consuming memory proportional to data size.

# Streaming approach - constant memory
def stream_large_file(input_path, output_path)
  CSV.open(output_path, 'w') do |output|
    CSV.foreach(input_path, headers: true) do |row|
      # Process one row at a time
      transformed = transform(row)
      output << transformed.values
    end
  end
end

# Loading approach - memory scales with file size
def load_large_file(input_path)
  # Loads entire file into memory
  data = CSV.read(input_path, headers: true)
  
  # Enables operations requiring full dataset
  data.sort_by { |row| row['timestamp'] }
    .group_by { |row| row['category'] }
    .transform_values { |rows| aggregate(rows) }
end

Connection Pooling: Database connections have establishment overhead and connection limits. Connection pools maintain reusable connections, eliminating per-query establishment costs. Pool size balances concurrency against database connection limits.
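
Database libraries such as Sequel manage a pool internally, so this is rarely written by hand. Purely to illustrate the mechanism, the sketch below builds a minimal pool on Ruby's thread-safe `Queue`. The `SimplePool` class is hypothetical, and the block passed to the constructor stands in for whatever opens a real connection.

```ruby
# Minimal fixed-size pool: connections are created once and then reused.
class SimplePool
  def initialize(size:, &factory)
    @available = Queue.new
    size.times { @available << factory.call }
  end

  # Checks out a connection, yields it, and always returns it to the pool.
  def with_connection
    connection = @available.pop # blocks while every connection is in use
    yield connection
  ensure
    @available << connection if connection
  end
end
```

The pool size caps how many callers hold a connection at once. Additional callers wait in `pop` instead of opening connections beyond the database's limit.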

Indexing: Target system indexes accelerate lookups during upsert operations. Integration processes performing updates or checking for duplicates benefit significantly from indexes on key columns. However, indexes slow down bulk inserts; some systems drop indexes before bulk loading and rebuild afterward.

Compression: Compressing data before transmission reduces network time when bandwidth limits throughput. CPU overhead from compression must be less than time saved on transmission. Compression works best for text data over slow networks.
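
Whether compression pays off can be checked by measuring it on representative data. The sketch below serializes records as JSON and compresses them with `Zlib.gzip` from the standard library. The helper names and the record shape are hypothetical.

```ruby
require 'zlib'
require 'json'

# Serializes records to JSON and gzips the result, reporting both sizes.
def compress_records(records)
  json = JSON.generate(records)
  compressed = Zlib.gzip(json)
  { payload: compressed, raw_bytes: json.bytesize, compressed_bytes: compressed.bytesize }
end

# Reverses compress_records on the receiving side.
def decompress_records(payload)
  JSON.parse(Zlib.gunzip(payload), symbolize_names: true)
end
```

Comparing `raw_bytes` with `compressed_bytes` on a sample of real data gives a quick estimate of the bandwidth saved, which can then be weighed against the CPU time spent compressing.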

Change Data Capture Performance: CDC implementations vary widely in overhead. Log-based CDC reads database transaction logs with minimal source impact. Trigger-based CDC adds overhead to every source modification. Polling-based approaches impact the source with repeated queries but avoid installation of CDC components.

Common Patterns

Data integration implementations follow recurring patterns that address common challenges.

Upsert Pattern: Upsert combines insert and update operations, inserting new records and updating existing ones based on a key. This pattern ensures idempotency, allowing integration processes to run repeatedly without creating duplicates.

def upsert_pattern(source_records, target_db)
  source_records.each_slice(1000) do |batch|
    # Using PostgreSQL ON CONFLICT syntax
    target_db[:customers].insert_conflict(
      target: :customer_id,  # Key column
      update: {
        name: Sequel[:excluded][:name],
        email: Sequel[:excluded][:email],
        updated_at: Sequel[:excluded][:updated_at]
      }
    ).multi_insert(batch)
  end
end

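# Alternative using manual check. The lookup and the write are separate
# statements, so two concurrent runs can both take the insert branch;
# prefer the database-level upsert above where the target supports it.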
def manual_upsert(record, target_db)
  existing = target_db[:customers].where(customer_id: record[:customer_id]).first
  
  if existing
    target_db[:customers]
      .where(customer_id: record[:customer_id])
      .update(record)
  else
    target_db[:customers].insert(record)
  end
end

Staging Table Pattern: Complex integrations load raw data into staging tables before final transformation and loading. This separates extraction from transformation, enables data quality checks, and provides rollback points. Staging tables hold temporary data cleared after successful integration.

def staging_table_integration(source, target_db)
  # Load raw data to staging
  target_db[:staging_customers].delete  # Clear staging
  raw_data = source.extract
  target_db[:staging_customers].multi_insert(raw_data)
  
  # Validate staged data
  invalid_count = target_db[:staging_customers]
    .where { email !~ /@/ }
    .or(name: nil)
    .count

  if invalid_count > 0
    raise "Found #{invalid_count} invalid records"
  end
  
  # Transform from staging to final table
  target_db.transaction do
    target_db.run <<~SQL
      INSERT INTO customers (customer_id, name, email, status)
      SELECT customer_id, 
             UPPER(name), 
             LOWER(email),
             CASE 
               WHEN last_order_date > NOW() - INTERVAL '6 months' 
               THEN 'active' 
               ELSE 'inactive' 
             END
      FROM staging_customers
      ON CONFLICT (customer_id) DO UPDATE
      SET name = EXCLUDED.name,
          email = EXCLUDED.email,
          status = EXCLUDED.status
    SQL
  end
ensure
  target_db[:staging_customers].delete
end

Slowly Changing Dimension: Historical tracking maintains multiple versions of records to track changes over time. Type 1 overwrites old values, Type 2 creates new rows with effective date ranges, and Type 3 adds columns for previous values.

# Type 2 SCD - maintains history
def update_with_history(new_record, target_db)
  # Closing the old version and inserting the new one must succeed or
  # fail together, so both run in a single transaction
  target_db.transaction do
    current = target_db[:customer_history]
      .where(customer_id: new_record[:customer_id], end_date: nil)
      .first

    # Nothing to do when the tracked attribute is unchanged
    next if current && current[:email] == new_record[:email]

    # Close the current version, if there is one
    if current
      target_db[:customer_history]
        .where(id: current[:id])
        .update(end_date: Date.today)
    end

    # Insert the first version or the new version
    target_db[:customer_history].insert(
      customer_id: new_record[:customer_id],
      email: new_record[:email],
      start_date: Date.today,
      end_date: nil
    )
  end
end

Checkpoint Pattern: Long-running integrations save progress periodically, enabling resume from the last checkpoint after failures. Checkpoints store the last successfully processed record ID or timestamp.

class CheckpointedIntegration
  def initialize(source, target, checkpoint_store)
    @source = source
    @target = target
    @checkpoint_store = checkpoint_store
  end

  def run
    last_id = @checkpoint_store.get('last_processed_id') || 0
    
    @source.fetch_after(last_id).each_slice(1000) do |batch|
      transformed = batch.map { |record| transform(record) }
      @target.load(transformed)
      
      # Save checkpoint after each successful batch
      last_id = batch.last[:id]
      @checkpoint_store.set('last_processed_id', last_id)
    end
  rescue StandardError => e
    ErrorLogger.log(e, context: { last_checkpoint: last_id })
    raise
  end
end

Fan-Out Pattern: A single source fans out to multiple targets. This pattern distributes data across systems, maintains read replicas, or feeds multiple downstream processes. Fan-out implementations must handle partial failures where some targets succeed while others fail.

def fan_out_integration(source, targets)
  source.extract.each_slice(1000) do |batch|
    transformed = batch.map { |record| transform(record) }
    
    results = targets.map do |target|
      begin
        target.load(transformed)
        { target: target.name, status: :success }
      rescue StandardError => e
        { target: target.name, status: :failed, error: e.message }
      end
    end
    
    # Log partial failures
    failures = results.select { |r| r[:status] == :failed }
    if failures.any?
      PartialFailureLogger.log(failures)
    end
  end
end

Lookup Table Pattern: Enrichment operations frequently reference dimension tables to add descriptive attributes. Caching lookup tables in memory reduces database queries for repeated lookups.

class LookupEnrichment
  def initialize(target_db)
    @target_db = target_db
    # Cache lookup tables on initialization
    @product_lookup = build_product_lookup
    @location_lookup = build_location_lookup
  end

  def enrich_record(record)
    record.merge(
      product_name: @product_lookup[record[:product_id]],
      location: @location_lookup[record[:zip_code]]
    )
  end

  private

  def build_product_lookup
    @target_db[:products].select(:id, :name).to_hash(:id, :name)
  end

  def build_location_lookup
    @target_db[:locations].select(:zip_code, :city, :state)
      .to_hash(:zip_code, [:city, :state])
  end
end

Reference

Integration Patterns Comparison

| Pattern | Use Case | Latency | Complexity | Failure Handling |
|---|---|---|---|---|
| ETL Batch | Complex transformations, multiple sources | Hours to minutes | High | Retry entire batch |
| ELT | Raw data preservation, warehouse analytics | Hours to minutes | Medium | Reprocess with new logic |
| Real-time Streaming | Event-driven, low latency requirements | Milliseconds | High | Dead letter queues |
| API Polling | Third-party data, rate limits | Minutes | Low | Exponential backoff |
| Database Replication | Low latency, minimal transformation | Seconds | Low | Automatic failover |
| File Transfer | Periodic bulk data, legacy systems | Hours | Low | Manual reprocessing |

Data Quality Rules

| Validation Type | Implementation | Action on Failure |
|---|---|---|
| Format Validation | Regex patterns, type checking | Reject record |
| Range Validation | Numeric bounds, date ranges | Reject or default value |
| Referential Integrity | Foreign key lookups | Reject or defer loading |
| Duplicate Detection | Key matching, fuzzy matching | Keep newest or manual review |
| Completeness Check | Required field validation | Reject record |
| Cross-field Validation | Business rule enforcement | Reject record |

Common Data Type Mappings

| Source Type | Ruby Type | Target Database Type | Transformation Consideration |
|---|---|---|---|
| String | String | VARCHAR, TEXT | Encoding, length limits |
| Integer | Integer | INTEGER, BIGINT | Range overflow |
| Decimal | BigDecimal | NUMERIC, DECIMAL | Precision preservation |
| Date | Date | DATE | Format parsing, timezone |
| Timestamp | Time | TIMESTAMP | Timezone conversion |
| Boolean | TrueClass/FalseClass | BOOLEAN | String representations |
| JSON | Hash | JSON, JSONB | Serialization format |
| Binary | String (binary encoding) | BYTEA, BLOB | Encoding handling |

Performance Optimization Checklist

| Optimization | Impact | Implementation Effort | When to Apply |
|---|---|---|---|
| Batching | High | Low | Always for bulk operations |
| Indexing | High | Low | Upsert operations, lookups |
| Connection Pooling | Medium | Low | Multiple concurrent queries |
| Parallel Processing | High | Medium | Multiple independent sources |
| Compression | Medium | Low | Network-bound transfers |
| Streaming | High | Medium | Memory-limited environments |
| Caching | High | Medium | Repeated lookups |
| Incremental Loading | High | Medium | Large datasets, frequent runs |

Idempotency Strategies

| Strategy | Implementation | Use Case |
|---|---|---|
| Upsert Operations | ON CONFLICT, MERGE | Record-level updates |
| Idempotency Keys | UUID tracking | API requests |
| Timestamp Comparison | Track last modified time | Incremental updates |
| State Tables | Record processing status | Multi-step workflows |
| Transaction Isolation | Database SERIALIZABLE level | Financial transactions |
| Deduplication | Hash or key-based filtering | Event streams |

Error Handling Strategies

| Strategy | Description | Example Implementation |
|---|---|---|
| Retry with Backoff | Retry failed operations with increasing delays | Exponential backoff for API calls |
| Dead Letter Queue | Route failed messages to separate queue | RabbitMQ DLQ, Kafka error topics |
| Circuit Breaker | Stop attempting failing operations temporarily | Fail fast after threshold |
| Compensation | Reverse successfully completed steps | Saga pattern for distributed transactions |
| Quarantine | Isolate problematic records for review | Move to error table |
| Skip and Log | Continue processing, log failures | Non-critical validations |
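
Of the strategies listed, the circuit breaker is the least obvious to implement. The following is a minimal, single-threaded sketch. The class name, the default threshold and cooldown, and the injectable `clock` are all assumptions made for illustration, and a version shared between threads would need a mutex.

```ruby
# Fails fast after repeated errors, then allows a trial call after a cooldown.
class CircuitBreaker
  class OpenError < StandardError; end

  def initialize(threshold: 3, cooldown: 30,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @threshold = threshold
    @cooldown = cooldown
    @clock = clock
    @failures = 0
    @opened_at = nil
  end

  def call
    # While open and still cooling down, reject without touching the target
    if @opened_at && @clock.call - @opened_at < @cooldown
      raise OpenError, 'circuit open'
    end

    result = yield
    @failures = 0 # a success closes the circuit again
    @opened_at = nil
    result
  rescue OpenError
    raise
  rescue StandardError
    @failures += 1
    @opened_at = @clock.call if @failures >= @threshold
    raise
  end
end
```

A caller wraps each request in `breaker.call { ... }` and treats `CircuitBreaker::OpenError` as a signal to skip the target for now rather than retry immediately.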

Ruby Integration Libraries

| Library | Purpose | Key Features |
|---|---|---|
| sequel | Database toolkit | Multi-database, connection pooling, transactions |
| faraday | HTTP client | Middleware, retries, adapters |
| bunny | RabbitMQ client | Message acknowledgment, routing |
| ruby-kafka | Kafka client | Consumer groups, exactly-once semantics |
| kiba | ETL framework | Pipeline definition, error handling |
| nokogiri | XML/HTML parsing | XPath, CSS selectors, streaming |
| creek | Excel streaming | Low memory, large files |
| aws-sdk-s3 | S3 integration | Multipart upload, presigned URLs |
| parallel | Process parallelism | Fork-based, thread-based modes |

Monitoring Metrics

| Metric | Description | Alert Threshold |
|---|---|---|
| Records Processed | Count of successful integrations | Below expected rate |
| Processing Duration | Time to complete integration | Exceeds SLA |
| Error Rate | Percentage of failed records | Above acceptable threshold |
| Data Latency | Time between source update and target availability | Exceeds freshness requirement |
| Queue Depth | Pending messages in processing queue | Indicates backlog |
| Resource Utilization | CPU, memory, disk usage | Approaching limits |
| API Rate Limit Usage | Percentage of rate limit consumed | Above safety margin |