Overview
Data integration addresses the fundamental challenge of working with data distributed across multiple systems, databases, file formats, and organizational boundaries. Modern applications rarely operate in isolation; they must consume data from external APIs, synchronize with partner systems, aggregate information from legacy databases, and process files in various formats. Data integration provides the technical foundation for these operations.
The scope of data integration extends beyond simple data transfer. It includes schema mapping, format transformation, data quality enforcement, conflict resolution, and maintaining referential integrity across systems. A typical integration might extract customer records from a legacy SQL database, combine them with clickstream data from a web analytics platform, enrich the dataset with third-party demographic information, transform everything into a common schema, and load it into a data warehouse for analysis.
Data integration operates at multiple levels: file-based batch processing, database-to-database replication, API-based real-time synchronization, and message queue-driven event streaming. Each approach serves different requirements for latency, consistency, and data volume. The choice depends on factors including data freshness requirements, source system capabilities, network constraints, and processing complexity.
# Simple file-based data integration example
require 'csv'
require 'json'
# Extract from CSV source
csv_data = CSV.read('customers.csv', headers: true)
# Transform and combine with API data
integrated_data = csv_data.map do |row|
{
id: row['customer_id'],
name: row['full_name'],
email: row['email'],
# Enrich with external data
location: fetch_location_data(row['zip_code']),
segment: calculate_segment(row)
}
end
# Load to JSON destination
File.write('integrated_customers.json', JSON.pretty_generate(integrated_data))
The complexity of data integration grows with the number of sources, the variety of formats, and the requirements for data quality and consistency. Production systems often integrate dozens of sources, handle millions of records, and maintain complex transformation rules that evolve with business requirements.
Key Principles
Data integration operates on several fundamental principles that determine system architecture and implementation approach.
Source-Target Paradigm: Every integration involves one or more source systems providing data and one or more target systems consuming data. Sources may be databases, APIs, file systems, message queues, or streaming platforms. Targets follow similar patterns. The integration layer sits between sources and targets, managing extraction, transformation, and loading operations.
Schema Mapping: Different systems represent data using different schemas, naming conventions, and data types. Schema mapping defines how fields in the source correspond to fields in the target. This includes handling mismatches like combining multiple source fields into a single target field, splitting single fields into multiple targets, or applying business rules during mapping.
# Schema mapping configuration
SCHEMA_MAP = {
'customer_id' => { target: 'id', type: :integer },
'first_name' => { target: 'given_name', type: :string },
'last_name' => { target: 'family_name', type: :string },
'dob' => { target: 'birth_date', type: :date, format: '%Y-%m-%d' },
# Round rather than truncate: 19.99 * 100 is 1998.9999999999998 in floating point
'account_balance' => { target: 'balance_cents', type: :integer, transform: ->(v) { (v.to_f * 100).round } }
}
def map_record(source_record, schema_map)
schema_map.each_with_object({}) do |(source_field, config), result|
value = source_record[source_field]
value = config[:transform].call(value) if config[:transform]
result[config[:target]] = convert_type(value, config[:type])
end
end
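The mapping code above calls a `convert_type` helper that is not shown. One possible shape for it is sketched below; the supported type symbols, the optional `format:` keyword, and the choice to return `nil` for blank input are all assumptions made for illustration, not part of any library.

```ruby
require 'date'
require 'bigdecimal'

# Hypothetical helper: coerce a raw source value to the configured target type.
# Blank input becomes nil so that missing data stays distinguishable from zero.
def convert_type(value, type, format: nil)
  return nil if value.nil? || (value.is_a?(String) && value.strip.empty?)

  case type
  when :integer then Integer(value.to_s.strip, 10) # raises ArgumentError on junk
  when :decimal then BigDecimal(value.to_s.strip)
  when :string  then value.to_s
  when :date
    format ? Date.strptime(value.to_s, format) : Date.parse(value.to_s)
  when nil then value # no type configured: pass the value through unchanged
  else raise ArgumentError, "unsupported type: #{type.inspect}"
  end
end

convert_type('42', :integer)
convert_type('1990-05-17', :date, format: '%Y-%m-%d')
```

Strict parsing with `Integer(...)` is a deliberate choice here: `'12abc'.to_i` would silently yield 12, which hides bad source data instead of surfacing it.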
Data Quality and Validation: Integration processes must validate data at multiple stages. Source data may be incomplete, incorrectly formatted, or violate business rules. The integration layer enforces quality through validation rules, handles missing data, detects duplicates, and ensures referential integrity. Failed validations require error handling strategies: reject records, quarantine for manual review, apply default values, or log warnings while proceeding.
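A minimal sketch of the quarantine strategy described above, assuming records are plain hashes. The rule list, field names, and the `ValidationResult` struct are hypothetical and stand in for whatever rules a real integration would enforce.

```ruby
# Hypothetical rules: each lambda returns an error message, or nil when the record passes.
VALIDATION_RULES = [
  ->(r) { 'missing id' if r[:id].nil? },
  ->(r) { 'invalid email' unless r[:email].to_s.match?(/\A[^@\s]+@[^@\s]+\z/) },
  ->(r) { 'negative balance' if r[:balance_cents].to_i.negative? }
].freeze

ValidationResult = Struct.new(:accepted, :quarantined)

# Splits records into accepted ones and quarantined ones (kept with their errors
# for manual review). Duplicate ids within the batch are also quarantined.
def validate_records(records, rules = VALIDATION_RULES)
  result = ValidationResult.new([], [])
  seen_ids = {}

  records.each do |record|
    errors = rules.map { |rule| rule.call(record) }.compact
    errors << 'duplicate id' if record[:id] && seen_ids[record[:id]]
    seen_ids[record[:id]] = true if record[:id]

    if errors.empty?
      result.accepted << record
    else
      result.quarantined << { record: record, errors: errors }
    end
  end

  result
end
```

Keeping the error messages next to each quarantined record makes later review easier than a bare reject count would.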
Idempotency: Integration processes often need to run repeatedly, potentially processing the same source data multiple times due to failures, retries, or scheduled reruns. Idempotent operations produce the same result regardless of how many times they execute. This requires tracking which data has been processed, using upsert operations instead of inserts, and implementing proper conflict resolution.
Incremental vs Full Load: Full load processing extracts all data from sources each time, while incremental processing handles only new or changed records since the last run. Incremental processing reduces processing time and resource usage but requires tracking state and handling deletions. Full loads simplify logic but scale poorly with data volume.
class IncrementalIntegration
def initialize(source, target, state_store)
@source = source
@target = target
@state_store = state_store
end
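def run
last_processed = @state_store.get('last_processed_timestamp')
# Capture the new watermark before extracting, so records modified
# while this run is in progress are picked up by the next run
run_started_at = Time.now
# Extract only new/modified records
new_records = @source.fetch_since(last_processed)
# Transform and load
new_records.each do |record|
transformed = transform(record)
@target.upsert(transformed)
end
# Update state only after every record has loaded successfully
@state_store.set('last_processed_timestamp', run_started_at)
end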
end
Consistency and Transactions: Integration processes must handle consistency requirements across systems. Strong consistency ensures all systems reflect the same state, while eventual consistency allows temporary divergence. Transactional integration commits changes atomically across sources and targets, rolling back on failure. Non-transactional approaches handle failures through compensation or retry logic.
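Where no shared transaction is available, compensation can be sketched as follows: each step carries an undo action, and a failure triggers the undo actions of the steps that already completed, in reverse order. The `Step` struct and `run_with_compensation` are hypothetical names used only for this illustration.

```ruby
# A step pairs a forward action with the action that undoes it.
Step = Struct.new(:name, :action, :compensation)

# Runs the steps in order and returns the names of the completed steps.
# If a step raises, the completed steps are undone in reverse order and
# the original error is re-raised.
def run_with_compensation(steps)
  completed = []
  steps.each do |step|
    step.action.call
    completed << step
  end
  completed.map(&:name)
rescue StandardError
  completed.reverse_each { |step| step.compensation.call }
  raise
end
```

This sketch assumes the compensations themselves succeed; a production version would need to decide what happens when an undo action fails as well.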
Change Data Capture: CDC tracks changes at the source system level, capturing inserts, updates, and deletes as they occur. This provides a more accurate and efficient alternative to periodic polling. CDC implementations range from database triggers to log-based replication to application-level event publishing.
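Whatever mechanism captures the changes, the consuming side usually reduces to replaying an ordered list of insert, update, and delete events against the target. The sketch below uses an in-memory hash as the target; the event shape (`:op`, `:key`, `:data`) is an assumption for illustration, not the format of any particular CDC tool.

```ruby
# Applies ordered change events to a target keyed by primary key.
# Inserts and updates merge the changed columns; deletes remove the row.
def apply_changes(target, events)
  events.each do |event|
    key = event.fetch(:key)
    case event.fetch(:op)
    when :insert, :update
      target[key] = (target[key] || {}).merge(event.fetch(:data))
    when :delete
      target.delete(key)
    else
      raise ArgumentError, "unknown operation: #{event[:op].inspect}"
    end
  end
  target
end

events = [
  { op: :insert, key: 1, data: { name: 'Ada', email: 'ada@example.com' } },
  { op: :update, key: 1, data: { email: 'ada@new.example.com' } },
  { op: :delete, key: 1 }
]
apply_changes({}, events)
```

Unlike timestamp polling, this handles deletions explicitly, which is one reason CDC is described above as more accurate.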
Implementation Approaches
Data integration implementations follow several established patterns, each optimized for different scenarios.
ETL (Extract, Transform, Load): The traditional approach extracts data from sources, transforms it in an intermediate processing layer, and loads results into targets. Transformation occurs outside source and target systems, providing isolation and flexibility. ETL suits batch processing scenarios where data undergoes complex transformations or combines multiple sources before loading.
ETL systems typically run on scheduled intervals, processing accumulated changes. This introduces latency between source changes and target updates but allows for complex aggregations, enrichment from multiple sources, and data quality checks. The transformation layer can be a dedicated server, a data processing framework, or a distributed compute cluster, depending on scale.
class ETLPipeline
def initialize(sources, transformations, targets)
@sources = sources
@transformations = transformations
@targets = targets
end
def execute
# Extract phase
raw_data = @sources.flat_map(&:extract)
# Transform phase - occurs outside source/target systems
transformed_data = raw_data
.select { |record| valid_record?(record) }
.map { |record| normalize(record) }
.group_by { |record| record[:entity_type] }
.transform_values { |records| aggregate(records) }
# Load phase
@targets.each do |target|
target.load(transformed_data[target.entity_type])
end
end
private
def normalize(record)
@transformations.reduce(record) do |result, transformation|
transformation.apply(result)
end
end
end
ELT (Extract, Load, Transform): ELT loads raw data directly into the target system before transformation. Modern data warehouses with powerful compute capabilities perform transformations using SQL or warehouse-native tools. ELT reduces the complexity of the integration layer but requires targets capable of handling raw data and performing transformations.
ELT works well when targets are analytical databases optimized for bulk operations and complex queries. Raw data preservation allows reprocessing with different transformation logic without re-extracting from sources. The approach shifts transformation costs from integration infrastructure to the target system.
Real-Time Streaming: Streaming integration processes data as events occur, minimizing latency between source changes and target updates. Implementations use message queues, event streams, or change data capture to detect and propagate changes immediately. Streaming suits scenarios requiring low latency, real-time analytics, or event-driven architectures.
Streaming architectures introduce complexity around state management, exactly-once processing guarantees, and handling late-arriving or out-of-order events. The trade-off between latency and complexity drives the choice between batch and streaming approaches.
require 'kafka'
require 'json'
class StreamingIntegration
def initialize(kafka_brokers, topic, target_system)
@consumer = Kafka.new(kafka_brokers).consumer(group_id: 'integration-worker')
@consumer.subscribe(topic)
@target_system = target_system
end
def process_stream
@consumer.each_message do |message|
event = JSON.parse(message.value)
# Transform immediately
transformed = transform_event(event)
# Load to target with minimal latency
@target_system.write(transformed)
# Committing after the write gives at-least-once delivery: a crash between
# the write and the commit replays the message, so the write must be idempotent
@consumer.commit_offsets
end
rescue Kafka::ProcessingError => e
# The block variable is out of scope here; the error carries topic, partition, and offset
handle_stream_error(e)
end
end
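Late-arriving and out-of-order events, mentioned above, are often handled by comparing a version or sequence number before writing. The sketch below assumes each event carries a monotonically increasing `:version` per key; the `VersionedTarget` class and the event shape are hypothetical.

```ruby
# In-memory target that ignores stale or duplicate events.
class VersionedTarget
  attr_reader :rows, :skipped

  def initialize
    @rows = {}
    @skipped = 0
  end

  # Applies the event only if it is newer than what the target already holds.
  # Returns true when the event was applied, false when it was skipped.
  def apply(event)
    current = @rows[event[:key]]
    if current && current[:version] >= event[:version]
      @skipped += 1
      return false
    end

    @rows[event[:key]] = { version: event[:version], data: event[:data] }
    true
  end
end

target = VersionedTarget.new
target.apply(key: 'c1', version: 2, data: { status: 'active' })
target.apply(key: 'c1', version: 1, data: { status: 'pending' }) # stale, skipped
```

Because a replayed event has a version that is not newer than the stored one, the same check also makes redelivery under at-least-once processing harmless.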
API-Based Integration: Modern systems expose APIs for data access, enabling integration through HTTP requests. API integration polls endpoints periodically or receives webhook callbacks when data changes. This approach handles systems without direct database access or file system exposure.
API integration must handle rate limiting, authentication, pagination, and error responses. Implementations balance polling frequency against API quotas and cost. Webhooks provide lower latency but require infrastructure to receive and process callbacks reliably.
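Pagination and rate limiting can be separated from the HTTP client itself. The sketch below wraps a hypothetical `fetch_page` callable, assumed to return a hash with `:status`, `:data`, and an optional `:retry_after`, in a lazy enumerator; the response shape and the injectable `sleeper` are assumptions for illustration.

```ruby
# Lazily walks a paginated endpoint and yields individual records.
# On a 429 response it waits for the advertised delay and retries the same page.
def paginated_records(fetch_page, max_retries: 3, sleeper: ->(seconds) { sleep(seconds) })
  Enumerator.new do |yielder|
    page = 1
    retries = 0

    loop do
      response = fetch_page.call(page)

      if response[:status] == 429
        raise 'rate limit retries exhausted' if retries >= max_retries

        retries += 1
        sleeper.call(response.fetch(:retry_after, 1))
        next # retry the same page
      end

      break if response[:data].empty? # an empty page marks the end

      response[:data].each { |record| yielder << record }
      page += 1
      retries = 0
    end
  end
end
```

A caller would pass a lambda that performs the real request, for example one that wraps a Faraday client, and consume the result with `each`, `each_slice`, or `lazy`.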
Database Replication: Direct database-to-database replication copies data between systems using database-native mechanisms. This includes logical replication, binary log streaming, or database links. Replication provides low latency and high throughput but tightly couples systems and limits transformation capabilities.
Ruby Implementation
Ruby provides multiple approaches for implementing data integration, from simple file processing to sophisticated data pipeline frameworks.
File-Based Integration: Ruby ships with CSV and JSON libraries, and the nokogiri gem covers XML; together these handle most file-based integration scenarios. For Excel files, the creek gem streams rows with low memory use, while roo provides random access.
require 'csv'
require 'json'
require 'nokogiri'
class FileIntegration
def process_csv(input_path, output_path)
CSV.open(output_path, 'w') do |output|
CSV.foreach(input_path, headers: true) do |row|
transformed = {
'id' => row['customer_id'],
'full_name' => "#{row['first_name']} #{row['last_name']}",
'email' => row['email']&.downcase,
'created' => parse_date(row['signup_date'])
}
output << transformed.values
end
end
end
def integrate_xml_to_json(xml_path, json_path)
doc = Nokogiri::XML(File.read(xml_path))
records = doc.xpath('//customer').map do |node|
{
id: node['id'].to_i,
name: node.at_xpath('name').text,
orders: node.xpath('orders/order').map { |o| o['id'].to_i }
}
end
File.write(json_path, JSON.pretty_generate(records))
end
end
Database Integration: The sequel gem provides a database toolkit for integration work. Unlike ActiveRecord, Sequel focuses on direct SQL operations and supports a wider range of databases. It handles connection pooling, query building, and transactions across multiple databases.
require 'sequel'
class DatabaseIntegration
def initialize(source_db_url, target_db_url)
@source = Sequel.connect(source_db_url)
# each_page is provided by Sequel's pagination extension
@source.extension(:pagination)
@target = Sequel.connect(target_db_url)
end
def sync_customers
# Extract from source with pagination
@source[:customers].order(:id).each_page(1000) do |page|
# Transform
transformed = page.map do |row|
{
customer_id: row[:id],
email: row[:email],
full_name: "#{row[:first_name]} #{row[:last_name]}",
status: normalize_status(row[:status]),
updated_at: Time.now
}
end
# Load to target using upsert for idempotency (PostgreSQL ON CONFLICT;
# on MySQL use Dataset#on_duplicate_key_update instead)
@target[:integrated_customers].insert_conflict(
target: :customer_id,
update: {
email: Sequel[:excluded][:email],
full_name: Sequel[:excluded][:full_name],
status: Sequel[:excluded][:status],
updated_at: Sequel[:excluded][:updated_at]
}
).multi_insert(transformed)
end
end
def incremental_sync(last_sync_time)
# Fetch only records modified since last sync
new_records = @source[:customers]
.where { updated_at > last_sync_time }
.all
new_records.each do |record|
@target[:integrated_customers].insert_conflict(
target: :customer_id,
update: {
email: record[:email],
updated_at: record[:updated_at]
}
).insert(transform(record))
end
end
end
API Integration: The faraday gem provides a flexible HTTP client for API integration. It supports middleware for authentication, retries, logging, and response parsing. For webhook receivers, Sinatra or Rails provides HTTP endpoints.
require 'faraday'
require 'faraday/retry'
class APIIntegration
def initialize(api_url, api_key)
@client = Faraday.new(url: api_url) do |conn|
conn.request :json
conn.request :retry, max: 3, interval: 0.5, backoff_factor: 2
conn.response :json
conn.headers['Authorization'] = "Bearer #{api_key}"
conn.adapter Faraday.default_adapter
end
end
def fetch_and_integrate(endpoint, target_db)
page = 1
loop do
response = @client.get(endpoint, page: page, per_page: 100)
break if response.body['data'].empty?
records = response.body['data'].map { |item| transform_api_record(item) }
target_db[:api_imports].multi_insert(records)
page += 1
# Respect rate limits
sleep 0.5
end
end
def process_webhook(payload)
event_type = payload['type']
case event_type
when 'customer.created'
handle_customer_creation(payload['data'])
when 'customer.updated'
handle_customer_update(payload['data'])
when 'customer.deleted'
handle_customer_deletion(payload['data'])
end
rescue StandardError => e
# Log error but return 200 to prevent webhook retries
ErrorLogger.log(e, context: { payload: payload })
end
end
Message Queue Integration: The bunny gem integrates with RabbitMQ for message-based integration. The ruby-kafka gem handles Kafka streams. These tools enable asynchronous, decoupled integration patterns.
require 'bunny'
require 'json'
require 'time'
class MessageQueueIntegration
def initialize(rabbitmq_url)
@connection = Bunny.new(rabbitmq_url)
@connection.start
@channel = @connection.create_channel
end
def publish_changes(source_db)
exchange = @channel.topic('data.changes', durable: true)
# Detect changes using timestamp (plain Ruby; 1.hour.ago requires ActiveSupport)
cutoff = Time.now - 3600
source_db[:customers].where { updated_at > cutoff }.each do |record|
message = {
entity: 'customer',
action: 'update',
data: record,
timestamp: Time.now.iso8601
}
routing_key = "customer.update.#{record[:region]}"
exchange.publish(
message.to_json,
routing_key: routing_key,
persistent: true
)
end
end
def consume_and_integrate(queue_name, target_db)
queue = @channel.queue(queue_name, durable: true)
queue.subscribe(manual_ack: true, block: true) do |delivery_info, properties, payload|
message = JSON.parse(payload)
begin
transformed = transform_message(message)
target_db[:integrated_data].insert(transformed)
# Acknowledge successful processing
@channel.ack(delivery_info.delivery_tag)
rescue StandardError => e
# Reject and requeue on failure
@channel.nack(delivery_info.delivery_tag, false, true)
ErrorLogger.log(e, context: { message: message })
end
end
end
end
Data Pipeline Framework: For complex workflows, the kiba gem provides an ETL framework. It defines sources, transforms, and destinations as Ruby code, handling the pipeline execution and error management.
require 'date'
require 'kiba'
require 'kiba-common/sources/csv'
require 'kiba-common/destinations/csv'
# Define data pipeline
job = Kiba.parse do
source Kiba::Common::Sources::CSV,
filename: 'source.csv',
csv_options: { headers: true, header_converters: :symbol }
transform do |row|
row[:email] = row[:email].downcase
row[:full_name] = "#{row[:first_name]} #{row[:last_name]}"
row[:signup_date] = Date.parse(row[:signup_date])
row
end
transform do |row|
next unless valid_email?(row[:email])
next if row[:status] == 'deleted'
row
end
transform do |row|
# Enrich with external data
row[:location_data] = LocationService.lookup(row[:zip_code])
row
end
destination Kiba::Common::Destinations::CSV,
filename: 'output.csv',
csv_options: { headers: :first_row }
end
# Execute pipeline
Kiba.run(job)
Tools & Ecosystem
Ruby's data integration ecosystem includes general-purpose libraries and specialized tools for common integration patterns.
Database Tools: The sequel gem handles most database integration needs. It connects to PostgreSQL, MySQL, SQLite, Oracle, SQL Server, and others through unified APIs. The pg gem provides direct PostgreSQL access with COPY operations for bulk loading. The mysql2 gem offers similar capabilities for MySQL.
File Processing: Standard library modules handle CSV, JSON, and YAML. The nokogiri gem processes XML and HTML. For Excel files, creek streams large files with low memory usage, while roo provides random access. The rubyzip gem extracts and creates ZIP archives. The parquet gem reads Parquet files common in big data systems.
API Clients: The faraday gem provides flexible HTTP clients with middleware support. The httparty gem offers simpler syntax for straightforward API calls. The rest-client gem handles basic REST operations. For OAuth authentication, the oauth2 gem implements the protocol. The graphql-client gem consumes GraphQL APIs.
Message Queues: The bunny gem connects to RabbitMQ. The ruby-kafka gem handles Kafka producers and consumers, including consumer groups and idempotent and transactional producers. The aws-sdk-sqs gem integrates with Amazon SQS. The google-cloud-pubsub gem connects to Google Cloud Pub/Sub.
ETL Frameworks: The kiba gem provides lightweight ETL pipelines. It defines sources, transformations, and destinations as Ruby code and keeps its core deliberately small; patterns such as batching, buffering, and checkpointing are typically written as custom transforms or destinations.
Data Quality: The activerecord-import gem bulk imports validated ActiveRecord objects. The data_cleaner gem detects and fixes common data quality issues. Custom validation typically combines plain Ruby checks with domain-specific rules.
Scheduling: The rufus-scheduler gem runs integration jobs on schedules within Ruby processes. For distributed scheduling, systems integrate with external schedulers like cron, Kubernetes CronJobs, or workflow engines like Apache Airflow.
require 'rufus-scheduler'
scheduler = Rufus::Scheduler.new
# Run integration every hour
scheduler.every '1h' do
DatabaseIntegration.new(source_url, target_url).sync_customers
end
# Run daily at specific time
scheduler.cron '0 2 * * *' do
FileIntegration.new.process_daily_batch
end
scheduler.join
Monitoring: The statsd-ruby gem sends metrics to StatsD for monitoring integration performance. The prometheus-client gem exposes metrics for Prometheus. The sentry-ruby gem captures and reports errors.
Cloud Platform Integration: Cloud providers offer SDKs for their services. The aws-sdk-s3 gem handles S3 file operations. The google-cloud-storage gem provides similar capabilities for GCS. The azure-storage-blob gem connects to Azure Blob Storage. These SDKs include retry logic, authentication, and multipart upload capabilities.
Performance Considerations
Data integration performance depends on throughput, latency, and resource utilization. Different bottlenecks require different optimization strategies.
Batching: Processing records individually incurs overhead from network round trips, transaction commits, and function calls. Batching groups multiple records into single operations, reducing overhead. Database bulk inserts, API batch endpoints, and file buffering all benefit from batching.
class BatchIntegration
BATCH_SIZE = 1000
def integrate_with_batching(source, target)
source.each_slice(BATCH_SIZE) do |batch|
transformed = batch.map { |record| transform(record) }
# Single database round trip for entire batch
target.multi_insert(transformed)
end
end
# Comparison: individual inserts
def integrate_without_batching(source, target)
source.each do |record|
transformed = transform(record)
# Separate round trip for each record
target.insert(transformed)
end
end
end
Optimal batch size balances throughput against memory usage and transaction duration. Small batches increase overhead; large batches consume memory and extend transaction locks. Testing under realistic conditions determines the right size for each scenario.
Parallel Processing: Ruby's Global VM Lock (GVL, often called the GIL) limits CPU parallelism within a single process, but I/O-bound integration tasks still benefit from concurrency. Multiple threads can handle network requests, database queries, or file I/O concurrently: while one thread waits on I/O, others execute.
require 'concurrent-ruby'
class ParallelIntegration
def integrate_concurrent(sources, target)
pool = Concurrent::FixedThreadPool.new(10)
futures = []
sources.each do |source|
futures << Concurrent::Future.execute(executor: pool) do
data = source.extract
transformed = data.map { |record| transform(record) }
target.load(transformed)
end
end
# Wait for all futures to complete; value! re-raises any worker error
# (plain value would return nil and hide the failure)
futures.each(&:value!)
pool.shutdown
pool.wait_for_termination
end
end
For CPU-intensive transformations, process-based parallelism overcomes the GIL. The parallel gem spawns worker processes that perform independent transformations. This approach multiplies memory usage since each process loads data and code.
Streaming vs Loading: Streaming processes data incrementally, keeping memory usage constant regardless of dataset size. Loading accumulates data in memory before processing, enabling operations that require full dataset access but consuming memory proportional to data size.
# Streaming approach - constant memory
def stream_large_file(input_path, output_path)
CSV.open(output_path, 'w') do |output|
CSV.foreach(input_path, headers: true) do |row|
# Process one row at a time
transformed = transform(row)
output << transformed.values
end
end
end
# Loading approach - memory scales with file size
def load_large_file(input_path)
# Loads entire file into memory
data = CSV.read(input_path, headers: true)
# Enables operations requiring full dataset
data.sort_by { |row| row['timestamp'] }
.group_by { |row| row['category'] }
.transform_values { |rows| aggregate(rows) }
end
Connection Pooling: Database connections have establishment overhead and connection limits. Connection pools maintain reusable connections, eliminating per-query establishment costs. Pool size balances concurrency against database connection limits.
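Libraries such as Sequel manage pooling internally, so the following is only a sketch of the mechanism, built on Ruby's thread-safe `Queue`. The `SimplePool` class and the factory block are hypothetical; a real pool would also handle timeouts and broken connections.

```ruby
# Minimal thread-safe pool. The block passed to the constructor opens one connection.
class SimplePool
  def initialize(size:, &factory)
    @available = Queue.new
    size.times { @available << factory.call }
  end

  # Checks out a connection, yields it, and always returns it to the pool,
  # even when the block raises.
  def with
    connection = @available.pop # blocks while all connections are in use
    yield connection
  ensure
    @available << connection if connection
  end
end

pool = SimplePool.new(size: 2) { Object.new } # stand-in for a real connection
pool.with { |conn| conn.inspect }
```

The pool size caps concurrency: a third caller simply waits in `pop` until one of the two connections is returned.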
Indexing: Target system indexes accelerate lookups during upsert operations. Integration processes performing updates or checking for duplicates benefit significantly from indexes on key columns. However, indexes slow down bulk inserts; some systems drop indexes before bulk loading and rebuild afterward.
Compression: Compressing data before transmission reduces network time when bandwidth limits throughput. CPU overhead from compression must be less than time saved on transmission. Compression works best for text data over slow networks.
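The trade-off can be measured directly with Ruby's bundled zlib library. This sketch gzips a JSON payload of repetitive records and prints both sizes; the record shape and helper names are made up for illustration, and real ratios depend heavily on the data.

```ruby
require 'zlib'
require 'json'

# Serializes records to JSON and gzips the result for transmission.
def compress_payload(records)
  Zlib.gzip(JSON.generate(records))
end

# Reverses compress_payload on the receiving side.
def decompress_payload(bytes)
  JSON.parse(Zlib.gunzip(bytes), symbolize_names: true)
end

records = Array.new(1_000) { |i| { id: i, status: 'active', region: 'eu-west' } }
raw = JSON.generate(records)
packed = compress_payload(records)
puts "raw: #{raw.bytesize} bytes, gzip: #{packed.bytesize} bytes"
```

Timing `compress_payload` against the transfer time saved on the actual network is what decides whether compression pays off in a given pipeline.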
Change Data Capture Performance: CDC implementations vary widely in overhead. Log-based CDC reads database transaction logs with minimal source impact. Trigger-based CDC adds overhead to every source modification. Polling-based approaches impact the source with repeated queries but avoid installation of CDC components.
Common Patterns
Data integration implementations follow recurring patterns that address common challenges.
Upsert Pattern: Upsert combines insert and update operations, inserting new records and updating existing ones based on a key. This pattern ensures idempotency, allowing integration processes to run repeatedly without creating duplicates.
def upsert_pattern(source_records, target_db)
source_records.each_slice(1000) do |batch|
# Using PostgreSQL ON CONFLICT syntax
target_db[:customers].insert_conflict(
target: :customer_id, # Key column
update: {
name: Sequel[:excluded][:name],
email: Sequel[:excluded][:email],
updated_at: Sequel[:excluded][:updated_at]
}
).multi_insert(batch)
end
end
# Alternative using manual check
def manual_upsert(record, target_db)
existing = target_db[:customers].where(customer_id: record[:customer_id]).first
if existing
target_db[:customers]
.where(customer_id: record[:customer_id])
.update(record)
else
target_db[:customers].insert(record)
end
end
Staging Table Pattern: Complex integrations load raw data into staging tables before final transformation and loading. This separates extraction from transformation, enables data quality checks, and provides rollback points. Staging tables hold temporary data cleared after successful integration.
def staging_table_integration(source, target_db)
# Load raw data to staging
target_db[:staging_customers].delete # Clear staging
raw_data = source.extract
target_db[:staging_customers].multi_insert(raw_data)
# Validate staged data
invalid_records = target_db[:staging_customers]
.where { email !~ /@/ }
.or(name: nil)
if invalid_records.count > 0
raise "Found #{invalid_records.count} invalid records"
end
# Transform from staging to final table
target_db.transaction do
target_db.run <<~SQL
INSERT INTO customers (customer_id, name, email, status)
SELECT customer_id,
UPPER(name),
LOWER(email),
CASE
WHEN last_order_date > NOW() - INTERVAL '6 months'
THEN 'active'
ELSE 'inactive'
END
FROM staging_customers
ON CONFLICT (customer_id) DO UPDATE
SET name = EXCLUDED.name,
email = EXCLUDED.email,
status = EXCLUDED.status
SQL
end
ensure
target_db[:staging_customers].delete
end
Slowly Changing Dimension: Historical tracking maintains multiple versions of records to track changes over time. Type 1 overwrites old values, Type 2 creates new rows with effective date ranges, and Type 3 adds columns for previous values.
# Type 2 SCD - maintains history
def update_with_history(new_record, target_db)
current = target_db[:customer_history]
.where(customer_id: new_record[:customer_id])
.where(end_date: nil)
.first
if current && current[:email] != new_record[:email]
# Close current record
target_db[:customer_history]
.where(id: current[:id])
.update(end_date: Date.today)
# Insert new version
target_db[:customer_history].insert(
customer_id: new_record[:customer_id],
email: new_record[:email],
start_date: Date.today,
end_date: nil
)
elsif !current
# First version
target_db[:customer_history].insert(
customer_id: new_record[:customer_id],
email: new_record[:email],
start_date: Date.today,
end_date: nil
)
end
end
Checkpoint Pattern: Long-running integrations save progress periodically, enabling resume from the last checkpoint after failures. Checkpoints store the last successfully processed record ID or timestamp.
class CheckpointedIntegration
def initialize(source, target, checkpoint_store)
@source = source
@target = target
@checkpoint_store = checkpoint_store
end
def run
last_id = @checkpoint_store.get('last_processed_id') || 0
@source.fetch_after(last_id).each_slice(1000) do |batch|
transformed = batch.map { |record| transform(record) }
@target.load(transformed)
# Save checkpoint after each successful batch
last_id = batch.last[:id]
@checkpoint_store.set('last_processed_id', last_id)
end
rescue StandardError => e
ErrorLogger.log(e, context: { last_checkpoint: last_id })
raise
end
end
Fan-Out Pattern: A single source fans out to multiple targets. This pattern distributes data across systems, maintains read replicas, or feeds multiple downstream processes. Fan-out implementations must handle partial failures where some targets succeed while others fail.
def fan_out_integration(source, targets)
source.extract.each_slice(1000) do |batch|
transformed = batch.map { |record| transform(record) }
results = targets.map do |target|
begin
target.load(transformed)
{ target: target.name, status: :success }
rescue StandardError => e
{ target: target.name, status: :failed, error: e.message }
end
end
# Log partial failures
failures = results.select { |r| r[:status] == :failed }
if failures.any?
PartialFailureLogger.log(failures)
end
end
end
Lookup Table Pattern: Enrichment operations frequently reference dimension tables to add descriptive attributes. Caching lookup tables in memory reduces database queries for repeated lookups.
class LookupEnrichment
def initialize(target_db)
@target_db = target_db
# Cache lookup tables on initialization
@product_lookup = build_product_lookup
@location_lookup = build_location_lookup
end
def enrich_record(record)
record.merge(
product_name: @product_lookup[record[:product_id]],
location: @location_lookup[record[:zip_code]]
)
end
private
def build_product_lookup
@target_db[:products].select(:id, :name).to_hash(:id, :name)
end
def build_location_lookup
@target_db[:locations].select(:zip_code, :city, :state)
.to_hash(:zip_code, [:city, :state])
end
end
Reference
Integration Patterns Comparison
| Pattern | Use Case | Latency | Complexity | Failure Handling |
|---|---|---|---|---|
| ETL Batch | Complex transformations, multiple sources | Hours to minutes | High | Retry entire batch |
| ELT | Raw data preservation, warehouse analytics | Hours to minutes | Medium | Reprocess with new logic |
| Real-time Streaming | Event-driven, low latency requirements | Milliseconds | High | Dead letter queues |
| API Polling | Third-party data, rate limits | Minutes | Low | Exponential backoff |
| Database Replication | Low latency, minimal transformation | Seconds | Low | Automatic failover |
| File Transfer | Periodic bulk data, legacy systems | Hours | Low | Manual reprocessing |
Data Quality Rules
| Validation Type | Implementation | Action on Failure |
|---|---|---|
| Format Validation | Regex patterns, type checking | Reject record |
| Range Validation | Numeric bounds, date ranges | Reject or default value |
| Referential Integrity | Foreign key lookups | Reject or defer loading |
| Duplicate Detection | Key matching, fuzzy matching | Keep newest or manual review |
| Completeness Check | Required field validation | Reject record |
| Cross-field Validation | Business rule enforcement | Reject record |
Common Data Type Mappings
| Source Type | Ruby Type | Target Database Type | Transformation Consideration |
|---|---|---|---|
| String | String | VARCHAR, TEXT | Encoding, length limits |
| Integer | Integer | INTEGER, BIGINT | Range overflow |
| Decimal | BigDecimal | NUMERIC, DECIMAL | Precision preservation |
| Date | Date | DATE | Format parsing, timezone |
| Timestamp | Time | TIMESTAMP | Timezone conversion |
| Boolean | TrueClass/FalseClass | BOOLEAN | String representations |
| JSON | Hash | JSON, JSONB | Serialization format |
| Binary | String (binary encoding) | BYTEA, BLOB | Encoding handling |
Performance Optimization Checklist
| Optimization | Impact | Implementation Effort | When to Apply |
|---|---|---|---|
| Batching | High | Low | Always for bulk operations |
| Indexing | High | Low | Upsert operations, lookups |
| Connection Pooling | Medium | Low | Multiple concurrent queries |
| Parallel Processing | High | Medium | Multiple independent sources |
| Compression | Medium | Low | Network-bound transfers |
| Streaming | High | Medium | Memory-limited environments |
| Caching | High | Medium | Repeated lookups |
| Incremental Loading | High | Medium | Large datasets, frequent runs |
Idempotency Strategies
| Strategy | Implementation | Use Case |
|---|---|---|
| Upsert Operations | ON CONFLICT, MERGE | Record-level updates |
| Idempotency Keys | UUID tracking | API requests |
| Timestamp Comparison | Track last modified time | Incremental updates |
| State Tables | Record processing status | Multi-step workflows |
| Transaction Isolation | Database SERIALIZABLE level | Financial transactions |
| Deduplication | Hash or key-based filtering | Event streams |
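
The deduplication row above can be sketched with a content fingerprint: hash each record and skip any fingerprint already seen. The `Deduplicator` class is hypothetical, and the in-memory `Set` is an assumption that only suits a single process; a shared store would be needed across workers.

```ruby
require 'digest'
require 'json'
require 'set'

# Filters out records whose content has already been seen.
class Deduplicator
  def initialize
    @seen = Set.new
  end

  # Returns true the first time a record's content is seen, false afterwards.
  # Keys are sorted first so that key order does not change the fingerprint.
  def first_time?(record)
    canonical = JSON.generate(record.sort_by { |key, _| key.to_s })
    fingerprint = Digest::SHA256.hexdigest(canonical)
    !@seen.add?(fingerprint).nil?
  end
end

dedup = Deduplicator.new
events = [{ id: 1 }, { id: 2 }, { id: 1 }]
events.select { |event| dedup.first_time?(event) }
```

Hashing the whole record treats any changed field as a new event; hashing only a key column would instead collapse every later version of the same entity.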
Error Handling Strategies
| Strategy | Description | Example Implementation |
|---|---|---|
| Retry with Backoff | Retry failed operations with increasing delays | Exponential backoff for API calls |
| Dead Letter Queue | Route failed messages to separate queue | RabbitMQ DLQ, Kafka error topics |
| Circuit Breaker | Stop attempting failing operations temporarily | Fail fast after threshold |
| Compensation | Reverse successfully completed steps | Saga pattern for distributed transactions |
| Quarantine | Isolate problematic records for review | Move to error table |
| Skip and Log | Continue processing, log failures | Non-critical validations |
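
As an illustration of the first strategy in the table, retry with exponential backoff fits in a small helper. The name `with_retries`, its defaults, and the injectable `sleeper` are assumptions for this sketch; it omits jitter, which production code commonly adds to avoid synchronized retries.

```ruby
# Runs the block, retrying on StandardError with exponentially growing delays.
# The delay before retry n is base_delay * factor**(n - 1). The last error is
# re-raised once max_attempts is reached.
def with_retries(max_attempts: 4, base_delay: 0.5, factor: 2, sleeper: ->(seconds) { sleep(seconds) })
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue StandardError
    raise if attempt >= max_attempts

    sleeper.call(base_delay * (factor**(attempt - 1)))
    retry
  end
end

# Usage: wrap any flaky call; the block receives the attempt number.
with_retries(max_attempts: 3) { |attempt| "succeeded on attempt #{attempt}" }
```

Rescuing `StandardError` is broad; narrowing it to timeouts and connection errors avoids retrying failures that can never succeed, such as validation errors.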
Ruby Integration Libraries
| Library | Purpose | Key Features |
|---|---|---|
| sequel | Database toolkit | Multi-database, connection pooling, transactions |
| faraday | HTTP client | Middleware, retries, adapters |
| bunny | RabbitMQ client | Message acknowledgment, routing |
| ruby-kafka | Kafka client | Consumer groups, idempotent and transactional producers |
| kiba | ETL framework | Pipeline definition, error handling |
| nokogiri | XML/HTML parsing | XPath, CSS selectors, streaming |
| creek | Excel streaming | Low memory, large files |
| aws-sdk-s3 | S3 integration | Multipart upload, presigned URLs |
| parallel | Process parallelism | Fork-based, thread-based modes |
Monitoring Metrics
| Metric | Description | Alert Threshold |
|---|---|---|
| Records Processed | Count of successful integrations | Below expected rate |
| Processing Duration | Time to complete integration | Exceeds SLA |
| Error Rate | Percentage of failed records | Above acceptable threshold |
| Data Latency | Time between source update and target availability | Exceeds freshness requirement |
| Queue Depth | Pending messages in processing queue | Indicates backlog |
| Resource Utilization | CPU, memory, disk usage | Approaching limits |
| API Rate Limit Usage | Percentage of rate limit consumed | Above safety margin |
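
Several of these metrics can be collected in-process before being shipped to a monitoring backend. The sketch below tracks counts, error rate, and duration for a single run; the `RunMetrics` class and its injectable clock are hypothetical and independent of any metrics gem.

```ruby
# Collects per-run counters and derives error rate and duration from them.
class RunMetrics
  attr_reader :processed, :failed

  def initialize(clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @clock = clock
    @processed = 0
    @failed = 0
    @started_at = clock.call
  end

  def record_success
    @processed += 1
  end

  def record_failure
    @failed += 1
  end

  # Fraction of records that failed, or 0.0 when nothing has been processed.
  def error_rate
    total = @processed + @failed
    total.zero? ? 0.0 : @failed.to_f / total
  end

  # Seconds elapsed since the metrics object was created.
  def duration
    @clock.call - @started_at
  end

  def alert?(max_error_rate:)
    error_rate > max_error_rate
  end
end

metrics = RunMetrics.new
metrics.record_success
metrics.record_failure
metrics.alert?(max_error_rate: 0.05)
```

A monotonic clock is used for duration so that system clock adjustments during a long run do not distort the measurement.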