Overview
Data pipelines represent a structured approach to data movement and transformation across systems. A data pipeline consists of connected stages that extract data from sources, apply transformations, and load results into destinations. Each stage performs specific operations on data as it flows through the pipeline.
The concept emerged from batch processing systems in mainframe computing, where jobs processed data sequentially through defined steps. Modern data pipelines extend this foundation to handle real-time streaming, distributed processing, and complex transformation logic across multiple systems.
Data pipelines address several software development challenges. They decouple data producers from consumers, allowing independent evolution of source and destination systems. They centralize data transformation logic, making changes easier to test and deploy. They provide monitoring and recovery mechanisms for data processing operations.
# Basic pipeline structure
pipeline = DataPipeline.new do |p|
  p.extract from: Database.new("source_db")
  p.transform { |record| record.merge(processed_at: Time.now) }
  p.load to: Database.new("target_db")
end
pipeline.run
# Illustrative log output: Processed 10,000 records in 45 seconds
The pipeline concept applies across multiple scales, from single-process data transformations to distributed systems processing petabytes of data. Ruby applications commonly use pipelines for ETL operations, event processing, data synchronization, and report generation.
Key Principles
Data pipelines operate on several fundamental principles that define their behavior and capabilities.
Stage-based processing divides work into discrete stages, each responsible for specific operations. Stages connect through defined interfaces, receiving input from upstream stages and producing output for downstream stages. This separation enables testing individual stages independently and modifying pipeline structure without rewriting all code.
# Each stage receives a shared context hash, adds or replaces data, and returns it.
# database, normalize, and warehouse stand in for application-specific helpers.
class ExtractStage
  def call(context)
    context[:records] = database.query("SELECT * FROM orders WHERE status = 'new'")
    context
  end
end
class TransformStage
  def call(context)
    context[:records] = context[:records].map { |r| normalize(r) }
    context
  end
end
class LoadStage
  def call(context)
    warehouse.bulk_insert(context[:records])
    context
  end
end
Data flow semantics determine how data moves through stages. Push-based systems have stages actively send data to the next stage. Pull-based systems have stages request data from previous stages. Hybrid approaches combine both patterns based on stage characteristics.
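The difference is easiest to see side by side. The following minimal sketch, in plain Ruby with hypothetical PushSource and PullSource classes, shows a push stage handing each record to a downstream callable and a pull stage exposing an Enumerator that the consumer advances only when it wants another record.

```ruby
# Push: the producer drives the flow and hands each record downstream.
class PushSource
  def initialize(records, downstream)
    @records = records
    @downstream = downstream # any object responding to #call(record)
  end

  def run
    @records.each { |record| @downstream.call(record) }
  end
end

# Pull: the consumer drives the flow and asks for the next record.
class PullSource
  def initialize(records)
    @records = records
  end

  # Records are produced only when the consumer calls #next.
  def stream
    Enumerator.new do |yielder|
      @records.each { |record| yielder << record }
    end
  end
end

received = []
PushSource.new([1, 2, 3], ->(r) { received << r * 10 }).run
# received is now [10, 20, 30]

pulled = PullSource.new([1, 2, 3]).stream
first_two = [pulled.next, pulled.next]
# first_two is [1, 2]; the third record has not been requested yet
```

In the pull version the consumer sets the pace, which is why pull-based designs get backpressure almost for free.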
Error handling and recovery mechanisms ensure pipelines handle failures gracefully. Stages may fail due to invalid data, connectivity issues, or resource constraints. Pipelines implement retry logic, dead letter queues for failed records, and checkpoint mechanisms to resume processing after failures.
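Retry logic for transient failures is commonly written with exponential backoff. A minimal sketch, assuming a hypothetical TransientError class that marks retryable failures; the sleeper parameter is an assumption added so the delays can be observed without actually sleeping:

```ruby
# Hypothetical error class marking failures worth retrying
# (timeouts, dropped connections).
class TransientError < StandardError; end

# Retries the block on TransientError with exponential backoff:
# waits base_delay, then 2 * base_delay, 4 * base_delay, ... between attempts.
def with_retries(max_attempts: 3, base_delay: 0.5, sleeper: ->(s) { sleep(s) })
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue TransientError
    raise if attempt >= max_attempts # give up; the caller decides what happens next
    sleeper.call(base_delay * (2**(attempt - 1)))
    retry
  end
end

delays = []
calls = 0
result = with_retries(max_attempts: 4, base_delay: 1, sleeper: ->(s) { delays << s }) do
  calls += 1
  raise TransientError, "timeout" if calls < 3
  :loaded
end
# result == :loaded after two retries, with delays == [1, 2]
```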
Idempotency ensures running a pipeline multiple times with the same input produces the same output. This property enables safe retries and reprocessing. Pipelines achieve idempotency through transactional operations, deterministic transformations, and upsert semantics in load stages.
class IdempotentLoader
  def load(records)
    records.each do |record|
      # Upsert (PostgreSQL/SQLite ON CONFLICT syntax): reprocessing a record
      # updates the existing row instead of inserting a duplicate
      database.execute(
        "INSERT INTO events (id, data, processed_at)
         VALUES (?, ?, ?)
         ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data",
        record[:id], record[:data], Time.now
      )
    end
  end
end
Backpressure manages situations where downstream stages cannot keep pace with upstream stages. Without backpressure, fast producers overwhelm slow consumers, causing memory exhaustion. Backpressure mechanisms include bounded queues, rate limiting, and flow control signals between stages.
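A bounded queue is the simplest backpressure mechanism in Ruby. This sketch uses the standard library's SizedQueue (Thread::SizedQueue), whose push blocks while the queue is full; the capacity and the simulated consumer delay are arbitrary illustrative values.

```ruby
# A bounded queue between a fast producer and a slow consumer.
# SizedQueue#push blocks when the queue is full, so the producer can
# never get more than `capacity` records ahead of the consumer.
capacity = 2
queue = SizedQueue.new(capacity)
max_depth = 0
processed = []

producer = Thread.new do
  10.times do |i|
    queue.push(i) # blocks while the queue is full
    max_depth = [max_depth, queue.size].max
  end
  queue.close # signals "no more records"
end

consumer = Thread.new do
  # pop returns nil once the queue is closed and drained
  while (record = queue.pop)
    sleep 0.01 # simulate a slow downstream stage
    processed << record
  end
end

[producer, consumer].each(&:join)
```

Memory use stays bounded by the queue capacity no matter how much faster the producer is.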
Parallelism and concurrency determine how pipelines process multiple data items simultaneously. Data parallelism processes multiple records concurrently within a stage. Pipeline parallelism runs multiple stages concurrently. Partitioning splits data across multiple parallel pipeline instances.
Data quality and validation ensure pipeline inputs and outputs meet defined schemas and constraints. Pipelines validate data at stage boundaries, reject invalid records or apply correction logic, and track data quality metrics throughout processing.
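A minimal sketch of validation at a stage boundary, assuming a hypothetical schema expressed as a hash of required keys and types. Valid records continue, rejected records carry their errors (for a dead letter queue or a report), and per-batch counts feed quality metrics:

```ruby
# Hypothetical schema: required keys and the type each value must have.
SCHEMA = { id: Integer, email: String, amount: Numeric }.freeze

# Returns a list of problems for one record (empty when valid).
def validation_errors(record)
  SCHEMA.filter_map do |key, type|
    if !record.key?(key)
      "missing #{key}"
    elsif !record[key].is_a?(type)
      "#{key} must be #{type}"
    end
  end
end

# Splits a batch into valid records and rejects, plus simple quality metrics.
def validate_batch(records)
  valid = []
  rejected = []
  records.each do |record|
    errors = validation_errors(record)
    if errors.empty?
      valid << record
    else
      rejected << { record: record, errors: errors }
    end
  end
  metrics = { total: records.size, valid: valid.size, rejected: rejected.size }
  [valid, rejected, metrics]
end

valid, rejected, metrics = validate_batch([
  { id: 1, email: "a@example.com", amount: 10.5 },
  { id: "2", email: "b@example.com", amount: 3 },
  { id: 3, amount: 7 }
])
# metrics == { total: 3, valid: 1, rejected: 2 }
```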
Implementation Approaches
Data pipeline implementations vary based on processing patterns, latency requirements, and data volumes.
Batch processing handles data in discrete batches at scheduled intervals. The pipeline reads a set of records, processes them completely, and writes results before starting the next batch. Batch pipelines optimize for throughput over latency, processing large volumes efficiently but with delay between data arrival and availability.
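class BatchPipeline
  # database and checkpoint_store are stand-ins; checkpoint_store is assumed
  # to expose read/write for the last committed id
  def initialize(database, checkpoint_store)
    @database = database
    @checkpoint_store = checkpoint_store
    @last_processed_id = checkpoint_store.read || 0 # resume after a restart
  end
  def run(batch_size: 1000)
    loop do
      batch = extract_batch(batch_size)
      break if batch.empty?
      transformed = transform_batch(batch)
      load_batch(transformed)
      checkpoint(batch.last[:id])
    end
  end
  private
  def extract_batch(size)
    # Keyset pagination: continue strictly after the last checkpointed id
    @database.query(
      "SELECT * FROM events WHERE id > ? ORDER BY id LIMIT ?",
      @last_processed_id, size
    )
  end
  def checkpoint(id)
    @checkpoint_store.write(id) # persist progress before the next batch
    @last_processed_id = id
  end
  # transform_batch and load_batch hold the application-specific logic
end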
Batch pipelines suit scenarios with periodic reporting, data warehouse loading, and bulk data migrations. They handle failures by reprocessing entire batches, simplifying recovery logic. Batch processing achieves high throughput through bulk operations and reduced transaction overhead.
Stream processing handles continuous data flows, processing records as they arrive. Stream pipelines maintain low latency between data production and availability, making them suitable for real-time applications. Stream processing frameworks provide windowing, stateful operations, and exactly-once processing semantics.
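require 'kafka' # ruby-kafka gem
require 'json'
class StreamPipeline
  def initialize
    kafka = Kafka.new(["localhost:9092"], client_id: "pipeline")
    @kafka_consumer = kafka.consumer(group_id: "events-pipeline")
    @kafka_consumer.subscribe("events_topic")
    # StateStore is a hypothetical transactional store that holds both
    # processing state and the last processed offset
    @state_store = StateStore.new
  end
  def run
    @kafka_consumer.each_message do |message|
      record = JSON.parse(message.value)
      # Saving the state change and the offset in one transaction lets a
      # restart resume from the stored offset without double-applying updates
      @state_store.transaction do
        process_record(record)
        @state_store.commit_offset(message.offset)
      end
    end
  end
end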
Stream pipelines excel at event processing, real-time analytics, and trigger-based actions. They require careful state management and coordination to maintain consistency across restarts and failures.
Micro-batch processing combines batch and stream characteristics, processing small batches at frequent intervals. This approach balances the throughput benefits of batch processing with the lower latency of streaming. Micro-batch systems often use fixed time windows, processing all records arriving within each window.
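A micro-batcher can be sketched as a buffer that flushes when either a size limit or a time limit is reached, whichever comes first. The MicroBatcher class below is hypothetical; its clock is injectable so the time-based flush can be tested, and it is not thread-safe as written.

```ruby
# Buffers records and flushes them as one batch when the batch is full or
# the oldest buffered record has waited max_wait seconds.
class MicroBatcher
  def initialize(max_size:, max_wait:,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }, &on_flush)
    @max_size = max_size
    @max_wait = max_wait
    @clock = clock
    @on_flush = on_flush # called with each completed batch
    @buffer = []
    @window_started_at = nil
  end

  def add(record)
    @window_started_at ||= @clock.call
    @buffer << record
    flush if @buffer.size >= @max_size
  end

  # Call periodically (for example from a timer) to flush a partial window.
  def tick
    return if @buffer.empty?
    flush if @clock.call - @window_started_at >= @max_wait
  end

  def flush
    return if @buffer.empty?
    @on_flush.call(@buffer)
    @buffer = []
    @window_started_at = nil
  end
end

now = 0.0
batches = []
batcher = MicroBatcher.new(max_size: 3, max_wait: 1.0, clock: -> { now }) { |batch| batches << batch }
batcher.add(:a)
batcher.add(:b)
batcher.add(:c) # size limit reached: flushes [:a, :b, :c]
batcher.add(:d)
now = 0.5
batcher.tick    # window still open
now = 1.2
batcher.tick    # window expired: flushes [:d]
# batches == [[:a, :b, :c], [:d]]
```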
Lambda architecture separates batch and stream processing paths. The batch layer processes complete datasets to produce accurate views. The speed layer processes recent data with lower latency. A serving layer merges results from both layers. The architecture provides both low-latency approximations and eventually accurate results, at the cost of implementing and maintaining the same logic in two processing paths.
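The serving-layer merge can be illustrated with page-view counts. In this simplified sketch, both layers are plain functions over an in-memory event list (a real speed layer would be maintained incrementally by a stream processor), and the cutoff marks the point up to which the batch layer has recomputed results:

```ruby
# Batch layer: recomputed from the full event history up to a cutoff time.
def batch_view(events, cutoff)
  events.select { |e| e[:time] < cutoff }
        .group_by { |e| e[:page] }
        .transform_values(&:size)
end

# Speed layer: counts for events that arrived after the cutoff.
def speed_view(events, cutoff)
  events.each_with_object(Hash.new(0)) do |e, counts|
    counts[e[:page]] += 1 if e[:time] >= cutoff
  end
end

# Serving layer: combine both views so queries see complete, current totals.
def serve(batch, speed)
  batch.merge(speed) { |_page, batch_count, speed_count| batch_count + speed_count }
end

events = [
  { page: "/home", time: 1 }, { page: "/home", time: 2 },
  { page: "/docs", time: 3 }, { page: "/home", time: 11 }
]
totals = serve(batch_view(events, 10), speed_view(events, 10))
# totals == { "/home" => 3, "/docs" => 1 }
```

When the next batch run moves the cutoff forward, the speed layer's older counts are discarded and replaced by the recomputed batch view.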
Kappa architecture simplifies lambda architecture by using only stream processing. The pipeline processes all data as streams, including historical data treated as replayed events. This approach reduces operational complexity by maintaining a single processing system but requires stream processing frameworks capable of handling historical data volumes.
Ruby Implementation
Ruby provides multiple approaches for implementing data pipelines, from lightweight libraries to framework integrations.
Procedural pipelines use method chaining and blocks to define processing stages:
class Pipeline
  def initialize(data)
    @data = data
  end
  def filter(&block)
    @data = @data.select(&block)
    self
  end
  def map(&block)
    @data = @data.map(&block)
    self
  end
  # Terminal step: returns a value instead of self
  def reduce(initial, &block)
    @data.reduce(initial, &block)
  end
end
result = Pipeline.new(records)
  .filter { |r| r[:status] == 'active' }
  .map { |r| r[:amount] }
  .reduce(0) { |sum, amount| sum + amount }
Sidekiq pipelines use background job processing for asynchronous stages:
class ExtractJob
  include Sidekiq::Job
  def perform(batch_id)
    records = extract_from_source(batch_id)
    # perform_bulk (Sidekiq 6.3+) enqueues one TransformJob per record in bulk;
    # job arguments are stored as JSON, so records must be plain JSON-compatible hashes
    TransformJob.perform_bulk(records.map { |r| [batch_id, r] })
  end
end
class TransformJob
  include Sidekiq::Job
  def perform(batch_id, record)
    transformed = transform_record(record)
    LoadJob.perform_async(batch_id, transformed)
  end
end
This approach scales horizontally by adding Sidekiq workers and provides natural failure isolation since each job can retry independently.
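Resque has no built-in job dependencies, but pipelines can chain stages through after_perform hooks:
# Resque jobs are classes with a class-level perform. Hooks named after_perform_*
# run only when perform finishes without raising, so each hook enqueues the next stage.
# extract is an application-specific helper.
class ExtractJob
  @queue = :pipeline
  def self.perform(source_id)
    extract(source_id)
  end
  def self.after_perform_enqueue_next(source_id)
    Resque.enqueue(TransformJob, source_id)
  end
end
# TransformJob follows the same shape, enqueueing LoadJob in its hook
class PipelineOrchestrator
  def self.execute(source_id)
    Resque.enqueue(ExtractJob, source_id)
  end
end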
Rails Active Job pipelines integrate with Rails applications:
class DataPipelineJob < ApplicationJob
  queue_as :data_processing
  def perform(source_table, target_table)
    records = extract(source_table)
    transformed = transform(records)
    load(transformed, target_table)
  rescue => e
    DataPipelineFailureJob.perform_later(source_table, e.message)
    raise
  end
  private
  def connection
    ActiveRecord::Base.connection
  end
  def extract(table)
    # Table names cannot be bound as parameters, so quote the identifier
    connection.select_all(
      "SELECT * FROM #{connection.quote_table_name(table)} WHERE processed = false"
    ).to_a
  end
  def transform(records)
    records.map do |record|
      {
        id: record['id'],
        normalized_name: normalize_name(record['name']),
        processed_at: Time.current
      }
    end
  end
  def load(records, table)
    return if records.empty? # INSERT ... VALUES with no rows is invalid SQL
    # Quote every value to prevent SQL injection and broken quoting
    values = records.map do |r|
      "(#{connection.quote(r[:id])}, #{connection.quote(r[:normalized_name])}, #{connection.quote(r[:processed_at])})"
    end.join(', ')
    connection.execute(
      "INSERT INTO #{connection.quote_table_name(table)} (id, name, processed_at) VALUES #{values}"
    )
    # Marking source rows as processed is omitted here
  end
end
A custom DSL creates expressive pipeline definitions:
class DataPipeline
  attr_reader :stages
  # ExtractStage, TransformStage, FilterStage, and LoadStage are assumed to take
  # their configuration in the constructor and implement call(context)
  def initialize(&block)
    @stages = []
    # Evaluate the definition block with the pipeline as self, so
    # extract/filter/transform/load can be called without a receiver
    instance_eval(&block)
  end
  def extract(source)
    @stages << ExtractStage.new(source)
  end
  def transform(&block)
    @stages << TransformStage.new(block)
  end
  def filter(&block)
    @stages << FilterStage.new(block)
  end
  def load(destination)
    @stages << LoadStage.new(destination)
  end
  def run
    # Thread one context hash through the stages in order
    @stages.reduce({}) { |ctx, stage| stage.call(ctx) }
  end
end
pipeline = DataPipeline.new do
  extract from: :database
  filter { |record| record[:amount] > 100 }
  transform { |record| record.merge(category: categorize(record)) }
  load to: :warehouse
end
pipeline.run
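The DSL relies on stage classes that take their configuration in the constructor and respond to call(context). One minimal, in-memory way to write them is sketched below; the from: source is assumed to be anything with to_a (an array in the example) and the to: destination anything with concat, standing in for the :database and :warehouse placeholders in the DSL example.

```ruby
# Hypothetical in-memory stages; each returns a new context hash.
class ExtractStage
  def initialize(source)
    @source = source.fetch(:from) # e.g. extract from: records_array
  end

  def call(context)
    context.merge(records: @source.to_a)
  end
end

class TransformStage
  def initialize(block)
    @block = block
  end

  def call(context)
    context.merge(records: context[:records].map(&@block))
  end
end

class FilterStage
  def initialize(block)
    @block = block
  end

  def call(context)
    context.merge(records: context[:records].select(&@block))
  end
end

class LoadStage
  def initialize(destination)
    @destination = destination.fetch(:to) # anything responding to #concat
  end

  def call(context)
    @destination.concat(context[:records])
    context.merge(loaded: context[:records].size)
  end
end

# Running the stages the same way DataPipeline#run does:
warehouse = []
stages = [
  ExtractStage.new(from: [{ amount: 50 }, { amount: 150 }, { amount: 300 }]),
  FilterStage.new(->(r) { r[:amount] > 100 }),
  TransformStage.new(->(r) { r.merge(category: r[:amount] > 200 ? :large : :medium) }),
  LoadStage.new(to: warehouse)
]
result = stages.reduce({}) { |ctx, stage| stage.call(ctx) }
# warehouse == [{ amount: 150, category: :medium }, { amount: 300, category: :large }]
```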
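Pipelines built on the concurrent-ruby gem run transformations in parallel on a thread pool:
require 'concurrent' # provided by the concurrent-ruby gem
class ConcurrentPipeline
  def initialize(concurrency: 4)
    @pool = Concurrent::FixedThreadPool.new(concurrency)
  end
  def process(records)
    futures = records.map do |record|
      Concurrent::Future.execute(executor: @pool) do
        transform_record(record)
      end
    end
    # value! waits for each result and re-raises if the task failed
    # (value would silently return nil for a failed future)
    futures.map(&:value!)
  end
  def shutdown
    @pool.shutdown
    @pool.wait_for_termination
  end
end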
Performance Considerations
Pipeline performance depends on throughput, latency, resource utilization, and scalability characteristics.
Throughput optimization focuses on processing volume per time unit. Batch processing achieves higher throughput than record-by-record processing by amortizing overhead across multiple records. Bulk database operations reduce network round-trips and transaction costs.
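# Low throughput: one statement (and round-trip) per record
records.each do |record|
  database.execute("INSERT INTO target (id, data) VALUES (?, ?)", [record[:id], record[:data]])
end
# e.g. ~100 records/second (illustrative; depends on database, network, and transactions)
# High throughput: one multi-row statement per chunk, still using bound parameters;
# chunking keeps the parameter count under database limits
records.each_slice(400) do |chunk|
  placeholders = Array.new(chunk.size, "(?, ?)").join(", ")
  database.execute(
    "INSERT INTO target (id, data) VALUES #{placeholders}",
    chunk.flat_map { |r| [r[:id], r[:data]] }
  )
end
# e.g. ~10,000 records/second (illustrative)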
Latency optimization reduces time between data arrival and availability. Stream processing minimizes latency but often sacrifices throughput. Micro-batching balances both concerns by processing small batches frequently.
Memory management prevents pipelines from exhausting available memory. Streaming through data rather than loading complete datasets into memory enables processing arbitrarily large inputs. Bounded queues between stages prevent fast producers from accumulating unbounded data.
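class MemoryEfficientPipeline
  def process_large_file(filename)
    # File.foreach without a block returns an Enumerator; .lazy streams one line
    # at a time, so only the current 1,000-record batch is held in memory
    File.foreach(filename).lazy
      .map { |line| parse_line(line) }
      .select { |record| valid?(record) }
      .each_slice(1000) { |batch| load_batch(batch) }
  end
end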
Parallelization strategies improve performance on multi-core systems. Data parallelism processes multiple records concurrently within a stage. Pipeline parallelism runs multiple stages concurrently, with each stage processing different data. Partitioning divides data across independent pipeline instances.
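class ParallelPipeline
  def process_partitioned(records, partitions: 4)
    # Partition by id so each record always lands in the same bucket
    # (assumes integer ids)
    buckets = records.group_by { |r| r[:id] % partitions }
    # Process partitions concurrently. On CRuby the global VM lock lets threads
    # overlap I/O such as database calls, but not CPU-bound Ruby code;
    # CPU-heavy stages need multiple processes instead
    threads = buckets.map do |_partition_id, partition_records|
      Thread.new do
        partition_records.each { |r| process_record(r) }
      end
    end
    threads.each(&:join)
  end
end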
Bottleneck identification locates stages limiting overall pipeline performance. Monitoring stage processing times and queue depths reveals bottlenecks. Common bottlenecks include slow database queries, network I/O, CPU-intensive transformations, and serialization overhead.
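Per-stage timing is straightforward to add by wrapping stages. This sketch uses a hypothetical TimedStage wrapper around any object that responds to call(context), measuring each invocation with the monotonic clock; in production the durations would typically go to a metrics system rather than an array.

```ruby
# Wraps a stage and records how long each invocation takes.
class TimedStage
  attr_reader :name, :durations

  def initialize(name, stage)
    @name = name
    @stage = stage
    @durations = []
  end

  def call(context)
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    @stage.call(context)
  ensure
    # Recorded even if the stage raises
    @durations << Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  end
end

# The stage with the largest total time is the first bottleneck candidate.
def slowest_stage(timed_stages)
  timed_stages.max_by { |s| s.durations.sum }
end

fast = TimedStage.new(:parse, ->(ctx) { ctx })
slow = TimedStage.new(:enrich, ->(ctx) { sleep 0.05; ctx })
stages = [fast, slow]
3.times { stages.reduce({}) { |ctx, stage| stage.call(ctx) } }
slowest_stage(stages).name # => :enrich
```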
Caching strategies reduce redundant computation. Caching reference data, compiled transformations, and computed aggregates improves performance when the same data appears multiple times.
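class CachedTransformStage
  def initialize(database)
    @database = database
    @reference_cache = {} # unbounded; suitable for small reference tables
  end
  def transform(record)
    id = record[:category_id]
    # Query once per category id; key? also caches misses (nil), which ||= would not
    unless @reference_cache.key?(id)
      @reference_cache[id] = @database.query("SELECT * FROM categories WHERE id = ?", id).first
    end
    category = @reference_cache[id]
    record.merge(category_name: category ? category[:name] : nil)
  end
end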
Backpressure handling prevents overwhelming slow stages. Blocking when queues fill causes upstream stages to wait for downstream stages to catch up. Bounded queues with blocking puts implement backpressure naturally.
Resource pooling shares expensive resources across pipeline stages. Database connection pools, HTTP client pools, and thread pools reduce resource creation overhead and prevent resource exhaustion.
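A pool can be as simple as a fixed set of resources handed out through a queue. The hypothetical ResourcePool below creates its resources up front and makes callers wait for a free one; production code commonly uses the connection_pool gem, which offers the same with-block pattern plus checkout timeouts.

```ruby
# A minimal fixed-size pool: resources are created once and borrowed
# through a Queue, so callers block while all are in use instead of
# opening new connections.
class ResourcePool
  def initialize(size, &factory)
    @available = Queue.new
    size.times { @available.push(factory.call) }
  end

  # Borrow a resource for the duration of the block, always returning it.
  def with
    resource = @available.pop # blocks until a resource is free
    begin
      yield resource
    ensure
      @available.push(resource)
    end
  end
end

created = 0
pool = ResourcePool.new(2) { created += 1; Object.new }
threads = 8.times.map do
  Thread.new { pool.with { |conn| sleep 0.01; conn.object_id } }
end
threads.each(&:join)
# created == 2: eight tasks shared two resources
```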
Tools & Ecosystem
Ruby applications access various tools for building data pipelines.
Resque provides Redis-backed job processing for asynchronous pipeline stages:
class DataExtractionJob
  @queue = :data_pipeline
  def self.perform(source_id)
    records = Database.query("SELECT * FROM source WHERE id > ?", source_id)
    records.each do |record|
      # Resque serializes job arguments as JSON, so record must be a plain hash
      Resque.enqueue(DataTransformJob, record)
    end
  end
end
Resque works well for batch pipelines with relatively simple processing logic. It lacks built-in features for complex workflows but integrates easily into existing Rails applications.
Sidekiq offers an API similar to Resque's, with higher throughput:
class StreamProcessorJob
  include Sidekiq::Job
  sidekiq_options queue: 'realtime', retry: 3
  def perform(event_data)
    event = parse_event(event_data)
    process_event(event)
    WebhookNotificationJob.perform_async(event[:id])
  end
end
Sidekiq uses threads instead of processes, reducing memory overhead and improving throughput. Commercial versions add rate limiting, batching, and periodic jobs.
Karafka integrates Ruby applications with Apache Kafka for stream processing:
class EventsConsumer < Karafka::BaseConsumer
  def consume
    messages.each do |message|
      # Karafka deserializes JSON payloads by default;
      # message.raw_payload holds the original string
      event = message.payload
      transform_and_store(event)
    end
  end
end
# Configuration
class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = { 'bootstrap.servers': 'localhost:9092' }
  end
  routes.draw do
    topic :events do
      consumer EventsConsumer
    end
  end
end
Karafka lets Ruby services consume and produce on the same Kafka topics as services written in other languages, so Ruby stages can join an organization's existing streaming pipelines.
Apache Airflow orchestrates complex pipelines with dependencies and scheduling. While written in Python, Ruby applications can integrate through Airflow's API or by wrapping Ruby scripts in Airflow operators.
AWS Data Pipeline provides managed orchestration for data workflows. Ruby applications can submit and monitor pipelines through the AWS SDK for Ruby (the aws-sdk-datapipeline gem).
Sneakers processes RabbitMQ messages for event-driven pipelines:
class DataProcessor
  include Sneakers::Worker
  from_queue 'events', env: nil
  def work(msg)
    data = JSON.parse(msg)
    process_data(data)
    ack!
  end
end
dry-transaction composes operations with automatic error handling:
require 'dry/transaction'
class DataPipeline
  include Dry::Transaction
  step :extract
  step :validate
  step :transform
  step :load
  # Each step receives the previous step's Success value as one positional
  # argument (a hash here); keyword parameters would fail on Ruby 3
  def extract(_input)
    records = database.query("SELECT * FROM source")
    Success(records: records)
  end
  def validate(input)
    records = input.fetch(:records)
    invalid = records.reject { |r| valid?(r) }
    return Failure(invalid: invalid) unless invalid.empty?
    Success(records: records)
  end
  def transform(input)
    transformed = input.fetch(:records).map { |r| apply_transformations(r) }
    Success(records: transformed)
  end
  def load(input)
    records = input.fetch(:records)
    warehouse.bulk_insert(records)
    Success(count: records.size)
  end
end
Sequel provides efficient database operations for extract and load stages:
require 'sequel'
DB = Sequel.connect('postgres://localhost/database')
class SequelPipeline
  def extract
    # Builds a Dataset; the query runs only when the dataset is iterated
    DB[:source_table]
      .where(processed: false)
      .select(:id, :data, :created_at)
  end
  def transform(dataset)
    dataset.map do |row|
      {
        source_id: row[:id],
        normalized: normalize(row[:data]),
        timestamp: row[:created_at]
      }
    end
  end
  def load(records)
    # multi_insert inserts all rows in one query where the database supports it
    DB[:target_table].multi_insert(records)
  end
end
Design Considerations
Choosing appropriate pipeline architecture requires evaluating trade-offs across multiple dimensions.
Batch vs stream processing represents a fundamental choice. Batch processing optimizes for throughput and simplifies exactly-once semantics, but introduces latency. Stream processing minimizes latency and enables real-time responses, but increases system complexity and requires careful state management. Select batch processing when periodic updates suffice and data volumes benefit from bulk operations. Select stream processing when applications require low latency or need to react to events as they occur.
Push vs pull data flow affects system coupling and backpressure handling. Push-based systems have producers send data to consumers, leading to simpler producer logic but requiring consumers to handle backpressure. Pull-based systems have consumers request data from producers, naturally implementing backpressure but increasing coordination complexity. Hybrid approaches use push for real-time data and pull for batch data.
Centralized vs distributed orchestration determines operational complexity. Centralized orchestrators (Airflow, Prefect) provide visibility and control but create single points of failure. Distributed orchestration embeds scheduling logic in pipeline code, improving resilience but complicating monitoring. Use centralized orchestration for complex workflows with many dependencies. Use distributed approaches for simple, independent pipelines requiring high availability.
Stateful vs stateless processing impacts scaling and recovery. Stateless stages process each record independently, enabling horizontal scaling and simple recovery. Stateful stages maintain information across records (aggregations, windowing, deduplication), requiring state management and coordination. Design stateless stages when possible. Introduce statefulness only when required by business logic.
# Stateless: each record processed independently
def transform(record)
  record.merge(normalized_name: record[:name].upcase)
end
# Stateful: maintains information across records
class AggregatingStage
  def initialize
    @totals = Hash.new(0)
  end
  def transform(record)
    @totals[record[:category]] += record[:amount]
    # Running total so far, so the output depends on record order
    record.merge(category_total: @totals[record[:category]])
  end
end
Schema enforcement determines data quality guarantees. Strict schemas validate all data against defined structures, catching errors early but requiring schema evolution processes. Schemaless processing handles varied data formats flexibly but pushes validation responsibility to consumers. Enforce schemas at pipeline boundaries. Allow flexibility within pipeline stages for exploratory work.
Error handling strategies balance reliability and complexity. Fail-fast approaches halt processing on errors, maintaining consistency but requiring manual intervention. Retry mechanisms automatically handle transient failures, improving reliability but potentially amplifying failures, for example by multiplying load on a dependency that is already struggling. Dead letter queues isolate problematic records for later analysis. Combine strategies: retry transient errors, route records with data issues to a dead letter queue, and fail fast on configuration errors.
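The combined strategy can be sketched as a single processing loop. The error classes here are hypothetical stand-ins for library-specific exceptions, and backoff between retries is omitted for brevity:

```ruby
# Hypothetical error categories; real pipelines map library-specific
# exceptions (timeouts, constraint violations, missing settings) onto these.
class TransientError < StandardError; end
class DataError < StandardError; end
class ConfigurationError < StandardError; end

# Retries transient failures a bounded number of times, diverts bad records
# to a dead letter queue, and lets configuration errors propagate (fail fast).
def process_with_policy(records, dead_letters:, max_retries: 2)
  records.each do |record|
    attempts = 0
    begin
      attempts += 1
      yield record
    rescue TransientError
      retry if attempts <= max_retries
      dead_letters << { record: record, error: "gave up after #{attempts} attempts" }
    rescue DataError => e
      dead_letters << { record: record, error: e.message } # isolate and keep going
    end
    # ConfigurationError and unexpected errors are not rescued here.
  end
end

dlq = []
processed = []
flaky_calls = 0
process_with_policy([1, 2, 3], dead_letters: dlq) do |r|
  if r == 2
    raise DataError, "negative amount"
  elsif r == 3 && (flaky_calls += 1) < 2
    raise TransientError, "timeout" # fails once, succeeds on retry
  end
  processed << r
end
# processed == [1, 3]; dlq == [{ record: 2, error: "negative amount" }]
```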
Monitoring and observability requirements influence architecture choices. Pipelines processing business-critical data require comprehensive monitoring, alerting, and audit trails. Internal data synchronization may need only basic health checks. Design monitoring into pipelines from the start, not as an afterthought.
Reference
Pipeline Stage Types
| Stage Type | Purpose | Input | Output |
|---|---|---|---|
| Extract | Read data from sources | Source identifiers | Raw records |
| Transform | Modify data structure or content | Raw records | Transformed records |
| Filter | Remove unwanted records | Record stream | Filtered stream |
| Aggregate | Combine multiple records | Record groups | Summary records |
| Enrich | Add information from external sources | Records + keys | Enhanced records |
| Validate | Check data quality | Records | Valid records + errors |
| Load | Write data to destinations | Transformed records | Load statistics |
Common Pipeline Patterns
| Pattern | Description | Use Case |
|---|---|---|
| ETL | Extract-Transform-Load | Data warehouse loading |
| ELT | Extract-Load-Transform | Cloud data lakes |
| CDC | Change Data Capture | Database replication |
| Streaming ETL | Continuous transformation | Real-time analytics |
| Batch with Checkpoints | Resumable batch processing | Large data migrations |
| Lambda Architecture | Batch + stream layers | Low-latency results plus accurate recomputation |
| Kappa Architecture | Stream-only processing | Simplified operations |
Performance Characteristics
| Approach | Latency | Throughput | Complexity | State Management |
|---|---|---|---|---|
| Batch | Minutes to hours | Very high | Low | Simple |
| Micro-batch | Seconds | High | Medium | Moderate |
| Stream | Milliseconds | Medium | High | Complex |
| Parallel batch | Minutes | Very high | Medium | Partition-local |
Ruby Pipeline Libraries
| Library | Processing Model | Concurrency | Best For |
|---|---|---|---|
| Sidekiq | Async jobs | Multi-threaded | Background processing |
| Resque | Async jobs | Multi-process | Batch jobs |
| Karafka | Stream | Multi-threaded | Kafka integration |
| Sneakers | Message queue | Multi-threaded | RabbitMQ integration |
| dry-transaction | Synchronous | Single-threaded | Business logic composition |
Error Handling Strategies
| Strategy | Pros | Cons | Implementation |
|---|---|---|---|
| Fail-fast | Maintains consistency | Requires intervention | Raise exceptions immediately |
| Retry with backoff | Handles transient failures | May amplify problems | Exponential backoff logic |
| Dead letter queue | Isolates bad data | Requires monitoring | Separate storage for failed records |
| Skip and log | Processing continues | May hide issues | Log errors, continue pipeline |
| Circuit breaker | Protects downstream | Delays processing while open | Stop calls after a failure threshold; retry after a cooldown |
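A circuit breaker can be sketched in a few dozen lines. This hypothetical CircuitBreaker opens after a number of consecutive failures, rejects calls until a cooldown passes, then allows a trial call; the injectable clock is an assumption for testability. Records rejected while the circuit is open should be requeued or dead-lettered rather than dropped.

```ruby
class CircuitBreaker
  class OpenError < StandardError; end

  def initialize(threshold:, cooldown:,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @threshold = threshold
    @cooldown = cooldown
    @clock = clock
    @failures = 0
    @opened_at = nil
  end

  def call
    if @opened_at
      raise OpenError, "circuit open" if @clock.call - @opened_at < @cooldown
      # Cooldown elapsed: fall through and allow one trial call (half-open).
    end
    result = yield
    @failures = 0
    @opened_at = nil # a success closes the circuit
    result
  rescue OpenError
    raise
  rescue StandardError
    @failures += 1
    @opened_at = @clock.call if @failures >= @threshold # (re)open the circuit
    raise
  end
end

now = 0.0
breaker = CircuitBreaker.new(threshold: 2, cooldown: 30, clock: -> { now })
2.times { breaker.call { raise IOError, "warehouse down" } rescue nil }
rejected =
  begin
    breaker.call { :ok }
    false
  rescue CircuitBreaker::OpenError
    true # rejected without calling the failing dependency
  end
now = 31.0
recovered = breaker.call { :ok } # trial call succeeds and closes the circuit
```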
Monitoring Metrics
| Metric | Purpose | Collection Method |
|---|---|---|
| Records processed | Throughput measurement | Counter per stage |
| Processing latency | Performance tracking | Timer per record |
| Error rate | Reliability indicator | Failed records / total records |
| Queue depth | Backpressure indicator | Queue size between stages |
| Stage duration | Bottleneck identification | Timer per stage execution |
| Resource utilization | Capacity planning | CPU, memory, I/O monitoring |
Configuration Options
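# Illustrative settings; key names are not tied to a specific library
pipeline_config = {
  batch_size: 1000,          # records per batch
  concurrency: 4,            # worker threads
  retry_attempts: 3,
  retry_delay: 5,            # seconds (assumed unit)
  timeout: 300,              # seconds (assumed unit)
  checkpoint_interval: 100,  # batches between checkpoints (assumed unit)
  enable_monitoring: true,
  dead_letter_queue: "failed_records",
  max_memory: "2GB"
}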
State Management Approaches
| Approach | Durability | Performance | Complexity | Use Case |
|---|---|---|---|---|
| In-memory | None | Highest | Low | Temporary aggregations |
| Database | High | Low | Medium | Persistent state |
| Redis | Medium | High | Low | Shared state, caching |
| File-based checkpoints | High | Medium | Medium | Batch processing resume |
| External state store | High | Medium | High | Distributed processing |