Overview
Pipe and Filter Architecture organizes systems as a sequence of processing components (filters) that transform data as it passes through connecting channels (pipes). Each filter operates independently, reading input from one or more pipes, performing a specific transformation, and writing output to outgoing pipes. The architecture derives its name from Unix shell pipelines, where commands connect via the pipe operator to form data processing chains.
This pattern addresses the challenge of building flexible data processing systems that remain maintainable as complexity grows. Traditional monolithic processors become difficult to modify and test as requirements change. Pipe and Filter Architecture decomposes processing into discrete, single-purpose components that combine to achieve complex transformations. Each filter implements one transformation step, making the system easier to understand, test, and modify.
The pattern applies across multiple domains. Unix commands form processing pipelines through shell operators. Compilers organize as multi-stage pipelines progressing from lexical analysis through code generation. Data processing systems use filter chains to cleanse, transform, and aggregate information. Web servers apply middleware filters to requests and responses. Message processing systems route and transform messages through filter networks.
# Unix-style pipeline concept
input_data
.map { |item| filter_one(item) }
.select { |item| filter_two(item) }
.map { |item| filter_three(item) }
The architecture imposes constraints that shape system structure. Filters remain independent without shared state. Data flows in one direction through the pipeline. Each filter exposes uniform interfaces for connecting to pipes. These constraints enable composability—filters combine in different arrangements to create varied processing behaviors without modifying individual components.
Key Principles
The Pipe and Filter Architecture rests on several fundamental concepts that define how components interact and data moves through the system.
Filters perform transformations on data. Each filter implements a single, well-defined operation without knowledge of upstream or downstream components. Filters read input, apply transformations, and produce output. The filter remains unaware of data sources or destinations—it only knows the data format it receives and produces. This isolation enables reuse across different pipelines and independent testing of transformation logic.
Filters fall into distinct categories based on their behavior. Producer filters generate data without input, serving as pipeline sources. Examples include file readers, database queries, sensors, or data generators. Transformer filters accept input, modify it, and produce output. These perform operations like format conversion, calculation, validation, or enrichment. Consumer filters accept input without producing output, serving as pipeline endpoints. Examples include file writers, database inserts, displays, or logging components. Tester filters examine input and produce filtered subsets based on criteria, acting as conditional gates in the pipeline.
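A minimal sketch of the four categories as Ruby lambdas (the names and data here are illustrative, not part of the implementations later in this chapter):

```ruby
# Illustrative lambdas for each filter category (not from a library):
producer    = -> { [3, 1, 4, 1, 5] }                     # source: generates data, no input
transformer = ->(items) { items.map { |n| n * 10 } }     # transforms each item
tester      = ->(items) { items.select { |n| n > 25 } }  # conditional gate: passes a subset
consumer    = ->(items) { items.sum }                    # sink: terminates the flow

result = consumer.call(tester.call(transformer.call(producer.call)))
# => 120
```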
Pipes connect filters by transmitting data between components. A pipe links one filter's output to another filter's input, establishing data flow direction. Pipes enforce the communication protocol between filters, ensuring output format from one filter matches input expectations of the next. The pipe handles data buffering, flow control, and potentially format adaptation between components.
Pipes implement different connection patterns. Sequential pipes form linear chains where data passes through each filter in order. Split pipes duplicate data to multiple downstream filters for parallel processing. Merge pipes combine outputs from multiple filters into a single stream. Feedback pipes route output back to earlier pipeline stages for iterative processing.
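Split and merge connections can be sketched with plain lambdas; `split` and `merge` here are hypothetical helpers, not a library API:

```ruby
# Two branch filters that each receive a copy of the stream.
double = ->(items) { items.map { |n| n * 2 } }
square = ->(items) { items.map { |n| n * n } }

# Split: duplicate the data to every branch; merge: recombine branch outputs.
split = ->(items, branches) { branches.map { |branch| branch.call(items) } }
merge = ->(streams) { streams.reduce(:+) }  # concatenate outputs into one stream

merged = merge.call(split.call([1, 2, 3], [double, square]))
# => [2, 4, 6, 1, 4, 9]
```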
Data moves in one direction through the pipeline without backtracking. Filters cannot query upstream components for additional data or send acknowledgments backward. This unidirectional constraint simplifies reasoning about system behavior and prevents circular dependencies. Data arrives at each filter in the order produced by upstream components, though buffering may introduce delays.
The architecture supports two primary execution models. Push-based flow has upstream filters actively send data to downstream components. When a filter produces output, it immediately pushes data to the next filter, which processes it and pushes to subsequent stages. This model minimizes latency but may overwhelm slow downstream filters. Pull-based flow has downstream filters request data from upstream components. When a filter needs input, it pulls from the preceding filter, which may pull from its predecessors. This model naturally handles backpressure but adds latency from request-response cycles.
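Both models can be sketched in Ruby: `Enumerator::Lazy` gives pull semantics, while a callback chain gives push semantics (names are illustrative):

```ruby
# Pull-based: the consumer requests items on demand through a lazy enumerator,
# so upstream work happens only when downstream asks (natural backpressure).
source = (1..Float::INFINITY).lazy
pulled = source.map { |n| n * n }.select(&:even?).first(3)
# first(3) drives the chain; only enough input is consumed to produce 3 items

# Push-based: the producer invokes the downstream callback as soon as output exists.
pushed = []
downstream = ->(item) { pushed << item }
upstream   = ->(items, sink) { items.each { |n| sink.call(n * n) } }
upstream.call([1, 2, 3, 4], downstream)
# pushed now holds every square, whether or not downstream wanted more
```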
Filter independence mandates that no filter depends on implementation details of other filters. Filters communicate solely through pipes using agreed-upon data formats. A filter cannot directly invoke methods on other filters, access their internal state, or make assumptions about their presence. This independence allows replacing or rearranging filters without cascade modifications.
Composability emerges from standardized interfaces and independence. The same filter participates in multiple pipelines serving different purposes. Filters combine in various orders to create different processing behaviors. New pipelines assemble from existing filters without code changes. This composability accelerates development and improves code reuse.
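As a sketch of this composability, the same hypothetical filters recombine into two pipelines with different behaviors:

```ruby
# A small filter set; each responds to #call with a transformed collection.
strip_short = ->(words) { words.select { |w| w.length > 3 } }
upcase_all  = ->(words) { words.map(&:upcase) }
take_first  = ->(words) { words.first(2) }

# Compose any ordering of filters into a single callable pipeline.
compose = ->(*filters) { ->(data) { filters.reduce(data) { |d, f| f.call(d) } } }

long_shouted = compose.call(strip_short, upcase_all)  # drop short words, then upcase
headline     = compose.call(upcase_all, take_first)   # upcase, then truncate

words = ["pipe", "and", "filter", "rocks"]
long_shouted.call(words)  # => ["PIPE", "FILTER", "ROCKS"]
headline.call(words)      # => ["PIPE", "AND"]
```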
Design Considerations
Pipe and Filter Architecture suits specific problem domains while imposing constraints that make it unsuitable for others. Understanding when to apply this pattern requires evaluating data flow characteristics, processing requirements, and system qualities.
The pattern excels when processing follows a clear sequence of transformation steps. Each step represents an independent operation that can execute without knowledge of other steps. The data undergoes successive modifications as it progresses through stages. Examples include compilers transforming source code through parsing, optimization, and code generation; ETL systems extracting, cleansing, and loading data; image processing applying filters like blur, sharpen, and color correction; log processing parsing, filtering, and aggregating events.
Systems requiring high composability benefit from this architecture. When new processing behaviors emerge by rearranging existing filters, development accelerates. When filters reuse across multiple pipelines, maintenance effort decreases. When testing individual filters in isolation improves test coverage, quality increases. This composability advantage grows with the number of distinct filters and pipelines in the system.
The architecture supports parallel processing when filters operate independently without shared state. Multiple instances of the same filter process different data items concurrently. Different filters in the pipeline execute simultaneously on different data items. This parallelism improves throughput on multi-core systems and distributed environments. However, achieving effective parallelism requires careful buffer sizing and flow control to prevent bottlenecks.
Dataflow characteristics determine pattern suitability. The architecture works well with streaming data that arrives continuously and processes incrementally. Batch processing systems where data flows through distinct transformation stages also fit naturally. However, systems requiring random access to data, complex navigation through data structures, or frequent backtracking face challenges. The unidirectional data flow constraint makes it difficult to implement algorithms requiring multiple passes or data lookups.
Interaction patterns influence architectural choice. Systems with primarily data transformation operations match the pattern well. Systems requiring complex control flow, conditional branching based on multiple data sources, or coordination between components face difficulties. The filter independence constraint makes it hard to implement transactions spanning multiple filters or maintain consistency across parallel branches.
Performance trade-offs require evaluation. The architecture introduces overhead from data copying between filters and buffering in pipes. Each filter adds latency as data traverses the pipeline. Systems requiring minimal latency or processing massive data volumes may find this overhead prohibitive. However, the architecture enables optimization opportunities through parallel execution, filter fusion, and pipeline restructuring that can outweigh the overhead.
State management presents challenges in this architecture. Filters should remain stateless between data items to ensure independence and enable parallelism. Transformations requiring state—such as aggregations, joins, or windowing operations—require careful design. Options include embedding state within data items, implementing stateful filters with explicit state management, or using external state stores. Each approach involves trade-offs between filter independence and transformation expressiveness.
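A sketch of the stateful-filter option: a hypothetical filter that keeps explicit internal state to compute a running average, at the cost of no longer being safely duplicable for parallel execution:

```ruby
# Stateful filter: accumulates across items instead of treating each in isolation.
class RunningAverageFilter
  def initialize
    @sum = 0.0
    @count = 0
  end

  def call(input)
    input.map do |value|
      @sum += value
      @count += 1
      @sum / @count  # average of everything seen so far
    end
  end
end

averages = RunningAverageFilter.new.call([10, 20, 30])
# => [10.0, 15.0, 20.0]
```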
Error handling complexity increases with pipeline length. When a filter encounters an error, deciding how to handle it depends on system requirements. Options include propagating errors downstream as special data items, logging errors and continuing processing, halting the pipeline, or routing problematic data to error-handling filters. The error strategy must balance fault tolerance with processing guarantees.
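The errors-as-data-items strategy can be sketched with a hypothetical wrapper that rescues a failing filter and tags the failure so a downstream filter can route it:

```ruby
# Wrap a per-item filter so failures become tagged data items instead of crashes.
safe = lambda do |filter|
  lambda do |items|
    items.map do |item|
      begin
        filter.call(item)
      rescue => e
        { error: e.message, item: item }  # error travels downstream as data
      end
    end
  end
end

parse_int = safe.call(->(s) { Integer(s) })
ok, failed = parse_int.call(["1", "x", "3"]).partition { |r| !r.is_a?(Hash) }
# ok => [1, 3]; failed holds the tagged entry for "x"
```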
Alternative patterns address similar problems with different trade-offs. Layered Architecture organizes systems as hierarchical layers where each layer provides services to the layer above. This pattern suits systems with clear abstraction levels but lacks the composability and parallel processing of Pipe and Filter. Microservices Architecture decomposes systems into independently deployable services communicating through APIs. This provides deployment flexibility and technology diversity but adds operational complexity. Event-Driven Architecture connects components through asynchronous event propagation, supporting loose coupling but introducing complexity in reasoning about system behavior.
Selecting between patterns depends on primary system goals. Choose Pipe and Filter when transformation composability and parallel processing dominate requirements. Choose Layered when abstraction levels and dependency management matter most. Choose Microservices when deployment independence and team autonomy take priority. Choose Event-Driven when asynchronous processing and component decoupling drive decisions.
Ruby Implementation
Ruby's enumerable methods and object model naturally express Pipe and Filter concepts. The language provides multiple approaches for building pipelines, from simple method chains to explicit filter classes.
Method chaining creates pipelines by connecting enumerable operations. Each method acts as a filter, transforming data and passing results to the next method. This approach works well for straightforward pipelines with standard transformations.
# Simple pipeline using enumerable methods
data = ["alpha", "beta", "gamma", "delta"]
result = data
.map(&:upcase) # Transform filter
.select { |s| s.length > 4 } # Tester filter
.map { |s| s.reverse } # Transform filter
.first(2) # Consumer filter
# => ["AHPLA", "AMMAG"]
Proc objects encapsulate filters as callable objects that combine through composition. This approach provides more explicit filter definitions while maintaining Ruby's functional style.
# Filters as Proc objects
uppercase_filter = ->(items) { items.map(&:upcase) }
length_filter = ->(items) { items.select { |s| s.length > 4 } }
reverse_filter = ->(items) { items.map(&:reverse) }
# Compose filters into pipeline
pipeline = ->(data) {
reverse_filter.call(
length_filter.call(
uppercase_filter.call(data)
)
)
}
result = pipeline.call(["alpha", "beta", "gamma", "delta"])
Filter classes provide reusable components with explicit interfaces. Each filter class implements a standard interface for processing data, enabling composition and substitution.
# Base filter interface
class Filter
def call(input)
raise NotImplementedError
end
end
# Producer filter: generates data
class DataSource < Filter
def initialize(data)
@data = data
end
def call(input = nil)
@data
end
end
# Transformer filter: modifies data
class UppercaseTransformer < Filter
def call(input)
input.map(&:upcase)
end
end
class LengthFilter < Filter
def initialize(min_length)
@min_length = min_length
end
def call(input)
input.select { |s| s.length > @min_length }
end
end
class ReverseTransformer < Filter
def call(input)
input.map(&:reverse)
end
end
# Pipeline composition
class Pipeline
def initialize
@filters = []
end
def add_filter(filter)
@filters << filter
self
end
def execute(input = nil)
@filters.reduce(input) { |data, filter| filter.call(data) }
end
end
# Build and execute pipeline
pipeline = Pipeline.new
.add_filter(DataSource.new(["alpha", "beta", "gamma", "delta"]))
.add_filter(UppercaseTransformer.new)
.add_filter(LengthFilter.new(4))
.add_filter(ReverseTransformer.new)
result = pipeline.execute
# => ["AHPLA", "AMMAG", "ATLED"]
Enumerator objects enable lazy evaluation for efficient memory usage with large datasets. Filters yield results incrementally instead of materializing entire collections.
# Lazy pipeline with Enumerator
class LazyFilter
def initialize(source)
@source = source
end
def each
return enum_for(:each) unless block_given?
@source.each do |item|
result = transform(item)
yield result if result
end
end
def transform(item)
item # Override in subclasses
end
end
class LazyUppercase < LazyFilter
def transform(item)
item.upcase
end
end
class LazyLengthFilter < LazyFilter
def initialize(source, min_length)
super(source)
@min_length = min_length
end
def transform(item)
item if item.length > @min_length
end
end
class LazyReverse < LazyFilter
def transform(item)
item.reverse
end
end
# Build lazy pipeline
data = ["alpha", "beta", "gamma", "delta"]
pipeline = LazyReverse.new(
LazyLengthFilter.new(
LazyUppercase.new(data),
4
)
)
# Data processes on-demand
pipeline.each do |item|
puts item # Processes one item at a time
end
Concurrent pipelines process data in parallel using threads or fibers. This approach improves throughput for CPU-bound transformations on multi-core systems.
require 'concurrent'
class ConcurrentPipeline
def initialize(thread_pool_size = 4)
@filters = []
@pool = Concurrent::FixedThreadPool.new(thread_pool_size)
end
def add_filter(filter)
@filters << filter
self
end
def execute(input_data)
# Process items concurrently through pipeline
futures = input_data.map do |item|
Concurrent::Future.execute(executor: @pool) do
@filters.reduce(item) { |data, filter| data && filter.call(data) } # stop once a tester filter drops the item
end
end
# Collect results
futures.map(&:value)
ensure
@pool.shutdown
@pool.wait_for_termination
end
end
# Single-item filters for concurrent processing
class ItemUppercase < Filter
def call(input)
input.upcase
end
end
class ItemLengthCheck < Filter
def initialize(min_length)
@min_length = min_length
end
def call(input)
input if input.length > @min_length
end
end
class ItemReverse < Filter
def call(input)
input.reverse
end
end
# Execute concurrent pipeline
pipeline = ConcurrentPipeline.new(4)
.add_filter(ItemUppercase.new)
.add_filter(ItemLengthCheck.new(4))
.add_filter(ItemReverse.new)
result = pipeline.execute(["alpha", "beta", "gamma", "delta"])
# => ["AHPLA", nil, "AMMAG", "ATLED"]
The explicit pipe pattern implements dedicated pipe objects that handle data buffering and flow control between filters running in separate threads.
require 'thread'
class Pipe
def initialize(buffer_size = 10)
@queue = SizedQueue.new(buffer_size) # bounded buffer: writes block when the pipe is full
@closed = false
end
def write(data)
raise "Pipe closed" if @closed
@queue.push(data)
end
def read
@queue.pop
end
def close
@closed = true
@queue.push(:end_of_stream)
end
def closed?
@closed
end
end
class ThreadedFilter
def initialize(input_pipe, output_pipe, &block)
@input_pipe = input_pipe
@output_pipe = output_pipe
@block = block
end
def start
Thread.new do
loop do
data = @input_pipe.read
break if data == :end_of_stream
result = process(data)
@output_pipe.write(result) if result
end
@output_pipe.close
end
end
def process(data)
@block ? @block.call(data) : data # Transformation from the supplied block, or override in subclasses
end
end
# Example usage with pipes
pipe1 = Pipe.new
pipe2 = Pipe.new
pipe3 = Pipe.new
filter1 = ThreadedFilter.new(pipe1, pipe2) do |data|
data.upcase
end
filter2 = ThreadedFilter.new(pipe2, pipe3) do |data|
data.reverse if data.length > 4
end
# Start filters
t1 = filter1.start
t2 = filter2.start
# Write data to pipeline
["alpha", "beta", "gamma"].each { |item| pipe1.write(item) }
pipe1.close
# Read results
results = []
loop do
data = pipe3.read
break if data == :end_of_stream
results << data
end
[t1, t2].each(&:join)
# results => ["AHPLA", "AMMAG"]
Practical Examples
Log Processing Pipeline
Log analysis systems process streams of log entries through multiple transformation stages. This example demonstrates a pipeline that parses, filters, enriches, and aggregates log data.
class LogEntry
attr_accessor :timestamp, :level, :message, :metadata
def initialize(timestamp, level, message, metadata = {})
@timestamp = timestamp
@level = level
@message = message
@metadata = metadata
end
end
# Producer: Read log lines
class LogReader < Filter
def initialize(log_lines)
@log_lines = log_lines
end
def call(input = nil)
@log_lines
end
end
# Parser: Convert strings to structured entries
require 'time' # provides Time.parse
class LogParser < Filter
PATTERN = /^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.+)$/
def call(input)
input.map do |line|
if match = line.match(PATTERN)
LogEntry.new(
Time.parse(match[1]),
match[2],
match[3]
)
end
end.compact
end
end
# Filter: Select entries matching criteria
class LogLevelFilter < Filter
def initialize(levels)
@levels = levels
end
def call(input)
input.select { |entry| @levels.include?(entry.level) }
end
end
# Enrichment: Add metadata
class LogEnricher < Filter
def initialize(enrichment_data)
@enrichment_data = enrichment_data
end
def call(input)
input.map do |entry|
# Extract user ID from message
if entry.message =~ /user_id=(\d+)/
user_id = $1
entry.metadata[:user_name] = @enrichment_data[user_id]
end
entry
end
end
end
# Aggregator: Group and count
class LogAggregator < Filter
def call(input)
input.group_by(&:level).transform_values(&:count)
end
end
# Execute pipeline
log_lines = [
"2025-01-15 10:23:45 [INFO] User logged in user_id=123",
"2025-01-15 10:24:12 [ERROR] Database connection failed",
"2025-01-15 10:24:45 [WARN] High memory usage detected",
"2025-01-15 10:25:03 [ERROR] Request timeout user_id=456",
"2025-01-15 10:25:30 [INFO] Cache invalidated"
]
enrichment = { "123" => "alice", "456" => "bob" }
pipeline = Pipeline.new
.add_filter(LogReader.new(log_lines))
.add_filter(LogParser.new)
.add_filter(LogLevelFilter.new(["ERROR", "WARN"]))
.add_filter(LogEnricher.new(enrichment))
.add_filter(LogAggregator.new)
result = pipeline.execute
# => {"ERROR"=>2, "WARN"=>1}
Data ETL Pipeline
Extract, Transform, Load operations follow the Pipe and Filter pattern naturally. This example shows a pipeline that extracts data from multiple sources, transforms it, and loads into a target system.
# Extract filters
class DatabaseExtractor < Filter
def initialize(connection)
@connection = connection
end
def call(input = nil)
# Simulated database query
[
{ id: 1, name: "Product A", price: 100, category: "electronics" },
{ id: 2, name: "Product B", price: 50, category: "books" },
{ id: 3, name: "Product C", price: 200, category: "electronics" }
]
end
end
class APIExtractor < Filter
def initialize(endpoint)
@endpoint = endpoint
end
def call(input = nil)
# Simulated API response
[
{ product_id: 1, inventory: 50, warehouse: "east" },
{ product_id: 2, inventory: 100, warehouse: "west" },
{ product_id: 3, inventory: 25, warehouse: "east" }
]
end
end
# Transform filters
class DataJoiner < Filter
def initialize(join_key, secondary_data)
@join_key = join_key
@secondary_data = secondary_data
end
def call(input)
input.map do |record|
join_value = record[@join_key]
secondary = @secondary_data.find { |r| r[:product_id] == join_value }
record.merge(secondary || {})
end
end
end
class PriceCalculator < Filter
TAX_RATE = 0.08
def call(input)
input.map do |record|
if record[:price]
record[:price_with_tax] = (record[:price] * (1 + TAX_RATE)).round(2)
end
record
end
end
end
class CategoryFilter < Filter
def initialize(allowed_categories)
@allowed_categories = allowed_categories
end
def call(input)
input.select { |record| @allowed_categories.include?(record[:category]) }
end
end
# Transform to output format
class OutputFormatter < Filter
def call(input)
input.map do |record|
{
product_name: record[:name],
final_price: record[:price_with_tax],
stock_level: record[:inventory],
location: record[:warehouse]
}
end
end
end
# Execute ETL pipeline
db_data = DatabaseExtractor.new("db_connection").call
api_data = APIExtractor.new("api.example.com").call
pipeline = Pipeline.new
.add_filter(DataSource.new(db_data))
.add_filter(DataJoiner.new(:id, api_data))
.add_filter(PriceCalculator.new)
.add_filter(CategoryFilter.new(["electronics"]))
.add_filter(OutputFormatter.new)
result = pipeline.execute
# => [
# {product_name: "Product A", final_price: 108.0, stock_level: 50, location: "east"},
# {product_name: "Product C", final_price: 216.0, stock_level: 25, location: "east"}
# ]
Image Processing Pipeline
Image transformations compose naturally as filter chains. This example demonstrates basic image operations through a pipeline structure.
class Image
attr_accessor :pixels, :width, :height
def initialize(width, height, pixels)
@width = width
@height = height
@pixels = pixels
end
def get_pixel(x, y)
@pixels[y * @width + x]
end
def set_pixel(x, y, value)
@pixels[y * @width + x] = value
end
end
# Brightness adjustment filter
class BrightnessFilter < Filter
def initialize(adjustment)
@adjustment = adjustment
end
def call(input)
input.pixels = input.pixels.map { |p| (p + @adjustment).clamp(0, 255) }
input
end
end
# Contrast adjustment filter
class ContrastFilter < Filter
def initialize(factor)
@factor = factor
end
def call(input)
input.pixels = input.pixels.map do |p|
((p - 128) * @factor + 128).clamp(0, 255).round
end
input
end
end
# Blur filter (simple box blur)
class BlurFilter < Filter
def initialize(radius = 1)
@radius = radius
end
def call(input)
output_pixels = input.pixels.dup
input.height.times do |y|
input.width.times do |x|
sum = 0
count = 0
(-@radius..@radius).each do |dy|
(-@radius..@radius).each do |dx|
nx, ny = x + dx, y + dy
if nx >= 0 && nx < input.width && ny >= 0 && ny < input.height
sum += input.get_pixel(nx, ny)
count += 1
end
end
end
output_pixels[y * input.width + x] = sum / count
end
end
input.pixels = output_pixels
input
end
end
# Threshold filter (binarization)
class ThresholdFilter < Filter
def initialize(threshold = 128)
@threshold = threshold
end
def call(input)
input.pixels = input.pixels.map { |p| p >= @threshold ? 255 : 0 }
input
end
end
# Create and process image
image = Image.new(4, 4, [
100, 120, 130, 140,
110, 125, 135, 145,
120, 130, 140, 150,
130, 140, 150, 160
])
pipeline = Pipeline.new
.add_filter(BrightnessFilter.new(20))
.add_filter(ContrastFilter.new(1.2))
.add_filter(BlurFilter.new(1))
.add_filter(ThresholdFilter.new(140))
result = pipeline.execute(image)
# => Image with processed pixels
Real-World Applications
Unix Command Pipeline
Operating systems implement Pipe and Filter Architecture at the shell level. Commands connect through pipes to perform complex text processing operations. Each command acts as an independent filter processing standard input and producing standard output.
# Ruby implementation of Unix-style pipeline
class ShellCommand < Filter
def initialize(command)
@command = command
end
def call(input)
IO.popen(@command, "r+") do |pipe|
pipe.write(input.join("\n"))
pipe.close_write
pipe.read.split("\n")
end
end
end
# Example: Process log files similar to:
# cat access.log | grep "ERROR" | cut -d' ' -f1 | sort | uniq -c
log_data = [
"2025-01-15 ERROR Database timeout",
"2025-01-15 INFO Request completed",
"2025-01-16 ERROR Connection failed",
"2025-01-15 ERROR Database timeout"
]
# Lambdas serve as filters here: Pipeline only requires objects responding to #call
pipeline = Pipeline.new
.add_filter(DataSource.new(log_data))
.add_filter(->(input) { input.select { |line| line.include?("ERROR") } })
.add_filter(->(input) { input.map { |line| line.split[0] } })
.add_filter(->(input) { input.sort })
.add_filter(->(input) { input.tally })
result = pipeline.execute
# => {"2025-01-15"=>2, "2025-01-16"=>1}
Web Request Processing
Web frameworks use middleware chains as Pipe and Filter implementations. Each middleware component processes requests and responses, handling concerns like authentication, logging, compression, and caching independently.
class RackMiddleware
def initialize(app)
@app = app
end
def call(env)
@app.call(env)
end
end
class RequestLogger < RackMiddleware
def call(env)
start_time = Time.now
status, headers, body = @app.call(env)
duration = Time.now - start_time
puts "#{env['REQUEST_METHOD']} #{env['PATH_INFO']} - #{status} (#{duration}s)"
[status, headers, body]
end
end
class AuthenticationMiddleware < RackMiddleware
def call(env)
token = env['HTTP_AUTHORIZATION']
unless valid_token?(token)
return [401, {'Content-Type' => 'text/plain'}, ['Unauthorized']]
end
@app.call(env)
end
def valid_token?(token)
# Authentication logic
token && token.start_with?('Bearer ')
end
end
class CompressionMiddleware < RackMiddleware
def call(env)
status, headers, body = @app.call(env)
if env['HTTP_ACCEPT_ENCODING']&.include?('gzip')
compressed_body = compress(body)
headers['Content-Encoding'] = 'gzip'
headers['Content-Length'] = compressed_body.length.to_s
[status, headers, [compressed_body]]
else
[status, headers, body]
end
end
def compress(body)
require 'zlib' # gzip support from the standard library
Zlib.gzip(body.join)
end
end
# Application endpoint
app = ->(env) { [200, {'Content-Type' => 'text/plain'}, ['Hello World']] }
# Build middleware stack (pipeline)
stack = CompressionMiddleware.new(
AuthenticationMiddleware.new(
RequestLogger.new(app)
)
)
# Process request
env = {
'REQUEST_METHOD' => 'GET',
'PATH_INFO' => '/api/users',
'HTTP_AUTHORIZATION' => 'Bearer token123',
'HTTP_ACCEPT_ENCODING' => 'gzip'
}
response = stack.call(env)
Message Processing System
Distributed systems route messages through filter chains for transformation, routing, and delivery. Each filter handles specific aspects like validation, enrichment, routing decisions, or protocol conversion.
class Message
attr_accessor :id, :type, :payload, :metadata
def initialize(id, type, payload, metadata = {})
@id = id
@type = type
@payload = payload
@metadata = metadata
end
end
class MessageValidator < Filter
REQUIRED_FIELDS = [:id, :type, :payload]
def call(input)
input.select do |message|
REQUIRED_FIELDS.all? { |field| message.send(field) }
end
end
end
class MessageEnricher < Filter
def initialize(user_service)
@user_service = user_service
end
def call(input)
input.map do |message|
if user_id = message.payload[:user_id]
user_data = @user_service.fetch(user_id)
message.metadata[:user_name] = user_data[:name]
message.metadata[:user_tier] = user_data[:tier]
end
message
end
end
end
class MessageRouter < Filter
def initialize(routes)
@routes = routes
end
def call(input)
input.map do |message|
route = @routes[message.type]
message.metadata[:destination] = route if route
message
end
end
end
class MessageTransformer < Filter
def call(input)
input.map do |message|
case message.type
when :order
transform_order(message)
when :notification
transform_notification(message)
else
message
end
end
end
def transform_order(message)
message.payload[:total] = message.payload[:items].sum { |i| i[:price] * i[:quantity] }
message
end
def transform_notification(message)
message.payload[:formatted_text] = "Alert: #{message.payload[:text]}"
message
end
end
class PriorityFilter < Filter
def call(input)
input.sort_by do |message|
tier = message.metadata[:user_tier] || 'standard'
{ 'premium' => 0, 'standard' => 1, 'basic' => 2 }[tier]
end
end
end
# Simulated user service
user_service = Struct.new(:data) do
def fetch(id)
data[id] || { name: 'Unknown', tier: 'basic' }
end
end.new({
123 => { name: 'Alice', tier: 'premium' },
456 => { name: 'Bob', tier: 'standard' }
})
# Create messages
messages = [
Message.new(1, :order, { user_id: 123, items: [{ price: 10, quantity: 2 }] }),
Message.new(2, :notification, { user_id: 456, text: "System maintenance" }),
Message.new(3, :order, { user_id: 789, items: [{ price: 5, quantity: 3 }] })
]
# Build processing pipeline
routes = { order: 'order-queue', notification: 'notification-queue' }
pipeline = Pipeline.new
.add_filter(DataSource.new(messages))
.add_filter(MessageValidator.new)
.add_filter(MessageEnricher.new(user_service))
.add_filter(MessageTransformer.new)
.add_filter(MessageRouter.new(routes))
.add_filter(PriorityFilter.new)
result = pipeline.execute
# => Processed and sorted messages with enriched metadata
Stream Processing System
Real-time data analytics systems process event streams through filter chains. This example shows windowed aggregation and anomaly detection on streaming data.
require 'time'
class Event
attr_accessor :timestamp, :sensor_id, :value
def initialize(timestamp, sensor_id, value)
@timestamp = timestamp
@sensor_id = sensor_id
@value = value
end
end
class WindowAggregator < Filter
def initialize(window_seconds)
@window_seconds = window_seconds
end
def call(input)
windows = input.group_by do |event|
(event.timestamp.to_i / @window_seconds) * @window_seconds
end
windows.map do |window_start, events|
{
window_start: Time.at(window_start),
sensor_id: events.first.sensor_id,
count: events.size,
avg_value: events.sum(&:value) / events.size.to_f,
max_value: events.map(&:value).max,
min_value: events.map(&:value).min
}
end
end
end
class AnomalyDetector < Filter
def initialize(threshold_stddev = 2.0)
@threshold = threshold_stddev
end
def call(input)
values = input.map { |w| w[:avg_value] }
mean = values.sum / values.size.to_f
variance = values.sum { |v| (v - mean) ** 2 } / values.size.to_f
stddev = Math.sqrt(variance)
input.map do |window|
z_score = stddev.zero? ? 0.0 : (window[:avg_value] - mean) / stddev
window[:anomaly] = z_score.abs > @threshold
window[:z_score] = z_score.round(2)
window
end
end
end
class AlertGenerator < Filter
def call(input)
input.select { |window| window[:anomaly] }
.map do |window|
{
alert_time: window[:window_start],
sensor_id: window[:sensor_id],
value: window[:avg_value],
severity: window[:z_score].abs > 3 ? 'critical' : 'warning'
}
end
end
end
# Generate sample event stream
base_time = Time.now
events = (0..20).map do |i|
value = 50 + rand(-5..5)
value += 30 if i == 15 # Anomaly
Event.new(base_time + i * 10, 'sensor-1', value)
end
pipeline = Pipeline.new
.add_filter(DataSource.new(events))
.add_filter(WindowAggregator.new(60))
.add_filter(AnomalyDetector.new(2.0))
.add_filter(AlertGenerator.new)
alerts = pipeline.execute
# => Alert records for anomalous windows
Performance Considerations
Pipeline architectures introduce overhead from data movement between filters and execution coordination. Understanding performance characteristics enables optimization for specific workload requirements.
Data copying between filters impacts memory usage and execution time. Each filter receives input, transforms it, and produces output. Immutable data structures require copying at each stage. Large datasets passing through many filters accumulate significant copying overhead.
# Inefficient: Creates new array at each stage
def inefficient_pipeline(data)
data
.map { |x| x * 2 } # Copy 1
.select { |x| x > 10 } # Copy 2
.map { |x| x + 1 } # Copy 3
.take(100) # Copy 4
end
# Efficient: Lazy evaluation minimizes copies
def efficient_pipeline(data)
data
.lazy # Enable lazy evaluation
.map { |x| x * 2 }
.select { |x| x > 10 }
.map { |x| x + 1 }
.take(100) # Only materializes needed items
.force
end
# Benchmark difference
require 'benchmark'
data = (1..1_000_000).to_a
Benchmark.bm do |x|
x.report("eager") { inefficient_pipeline(data) }
x.report("lazy") { efficient_pipeline(data) }
end
Filter granularity affects performance trade-offs. Fine-grained filters provide composability but increase overhead from method calls and data passing. Coarse-grained filters reduce overhead but decrease reusability and parallel processing opportunities.
# Fine-grained: More overhead, better composability
class AddTax < Filter
def call(input)
input.map { |price| price * 1.08 }
end
end
class RoundPrice < Filter
def call(input)
input.map { |price| price.round(2) }
end
end
class FormatCurrency < Filter
def call(input)
input.map { |price| "$#{price}" }
end
end
# Coarse-grained: Less overhead, reduced composability
class PriceFormatter < Filter
def call(input)
input.map { |price| "$#{(price * 1.08).round(2)}" }
end
end
Buffering strategies balance throughput and latency. Small buffers reduce memory usage but increase synchronization overhead. Large buffers improve throughput but increase latency and memory consumption. Optimal buffer size depends on filter processing rates and data arrival patterns.
Parallel execution improves throughput when filters operate independently. Parallelism takes two forms: multiple instances of one filter process different data items concurrently (data parallelism), and different filters work on different items at the same time (pipeline parallelism). Effective parallelism requires balanced filter processing rates, since the slowest stage becomes the bottleneck.
require 'concurrent'
# Sequential processing
def sequential_pipeline(data, filters)
data.map do |item|
filters.reduce(item) { |acc, filter| filter.call(acc) }
end
end
# Parallel processing of items
def parallel_pipeline(data, filters)
thread_pool = Concurrent::FixedThreadPool.new(4)
futures = data.map do |item|
Concurrent::Future.execute(executor: thread_pool) do
filters.reduce(item) { |acc, filter| filter.call(acc) }
end
end
results = futures.map(&:value)
thread_pool.shutdown
thread_pool.wait_for_termination
results
end
# Benchmark
data = (1..1000).to_a
filters = [
Filter.new { |x| x * 2 },
Filter.new { |x| Math.sqrt(x) },
Filter.new { |x| x.round(2) }
]
Benchmark.bm do |x|
x.report("sequential") { sequential_pipeline(data, filters) }
x.report("parallel") { parallel_pipeline(data, filters) }
end
Filter fusion combines adjacent filters into single operations to reduce overhead. The optimization eliminates intermediate data structures and method call overhead while preserving pipeline semantics.
# Before fusion: Three separate filters
pipeline1 = Pipeline.new
.add_filter(Filter.new { |data| data.map { |x| x * 2 } })
.add_filter(Filter.new { |data| data.map { |x| x + 1 } })
.add_filter(Filter.new { |data| data.map { |x| x ** 2 } })
# After fusion: Single combined filter
pipeline2 = Pipeline.new
.add_filter(Filter.new { |data|
data.map { |x| ((x * 2) + 1) ** 2 }
})
# Fusion eliminates intermediate arrays and iterations
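When the per-item steps are plain lambdas, fusion can be done mechanically with Proc composition rather than by hand (a sketch; `Proc#>>` requires Ruby 2.6+):

```ruby
# Mechanical filter fusion via Proc composition (Ruby 2.6+).
steps = [
  ->(x) { x * 2 },
  ->(x) { x + 1 },
  ->(x) { x ** 2 }
]
fused = steps.reduce(:>>)  # one proc applying the steps left to right

# One traversal, no intermediate arrays:
[1, 2, 3].map(&fused) # => [9, 25, 49]
```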
Memory efficiency improves through streaming approaches. Processing items incrementally instead of materializing entire collections reduces memory footprint. Enumerators and lazy evaluation enable memory-efficient pipelines on large datasets.
class StreamingFilter
include Enumerable # map, select, etc. become available on the stream
def initialize(input_stream)
@input = input_stream
end
def each
return enum_for(:each) unless block_given?
@input.each do |item|
result = process(item)
yield result if result
end
end
def process(item)
item
end
end
# Process large file without loading into memory
def process_large_file(filename)
File.foreach(filename)
.lazy
.map { |line| parse_line(line) }
.select { |record| valid_record?(record) }
.map { |record| transform_record(record) }
.each { |record| write_output(record) }
end
Backpressure handling prevents fast producers from overwhelming slow consumers. Bounded buffers between filters limit memory growth. Blocking writes on full buffers provide natural flow control. This coordination ensures system stability under varying load conditions.
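A minimal backpressure sketch uses Ruby's built-in SizedQueue as the pipe; the producer's push blocks whenever the buffer is full, throttling it to the consumer's pace (the `:eof` marker is an illustrative convention):

```ruby
# Backpressure via a bounded buffer: a fast producer is naturally
# throttled because push blocks when the SizedQueue is full, so
# memory stays bounded at the buffer capacity.
pipe = SizedQueue.new(4)               # bounded buffer between filters

producer = Thread.new do
  50.times { |i| pipe.push(i) }        # blocks whenever the buffer is full
  pipe.push(:eof)                      # in-band end-of-stream marker
end

received = []
consumer = Thread.new do
  while (item = pipe.pop) != :eof
    received << item                   # a slower downstream filter would
  end                                  # slow the producer down automatically
end

[producer, consumer].each(&:join)
received.size # => 50
```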
Reference
Pipeline Components
| Component | Description | Characteristics |
|---|---|---|
| Producer Filter | Generates data without input | Acts as data source, initiates pipeline execution |
| Transformer Filter | Modifies input data | Stateless transformation, one-to-one mapping |
| Tester Filter | Filters based on criteria | Selective pass-through, predicate-based |
| Consumer Filter | Processes without output | Acts as data sink, terminates pipeline branch |
| Splitter Filter | Duplicates to multiple outputs | Enables parallel processing paths |
| Merger Filter | Combines multiple inputs | Synchronizes parallel branches |
Filter Patterns
| Pattern | Implementation | Use Case |
|---|---|---|
| Map | Transform each item | Data conversion, calculation, formatting |
| Filter | Select subset based on criteria | Validation, conditional routing |
| Reduce | Aggregate to single value | Summation, counting, accumulation |
| FlatMap | Transform to multiple items | Expansion, decomposition |
| Group | Organize by key | Classification, bucketing |
| Sort | Order by criteria | Ranking, prioritization |
| Window | Batch by time or count | Temporal aggregation, batching |
| Join | Combine from multiple sources | Enrichment, correlation |
Ruby Pipeline Methods
| Method | Category | Behavior |
|---|---|---|
| map | Transformer | Applies function to each element |
| select | Tester | Keeps elements matching predicate |
| reject | Tester | Removes elements matching predicate |
| reduce | Consumer | Aggregates elements to single value |
| flat_map | Transformer | Maps and flattens nested results |
| group_by | Transformer | Groups elements by key function |
| sort_by | Transformer | Orders elements by comparison |
| take | Consumer | Limits output to first n elements |
| lazy | Modifier | Enables lazy evaluation |
| each | Consumer | Iterates without transformation |
Execution Models
| Model | Characteristics | Advantages | Disadvantages |
|---|---|---|---|
| Push-based | Upstream pushes to downstream | Low latency, simple control flow | Requires backpressure handling |
| Pull-based | Downstream pulls from upstream | Natural backpressure, lazy evaluation | Higher latency, complex coordination |
| Synchronous | Sequential execution in caller thread | Simple debugging, predictable | Limited parallelism |
| Asynchronous | Concurrent execution in separate threads | High throughput, parallel processing | Complex synchronization, debugging |
| Streaming | Incremental processing | Memory efficient | Requires streaming-aware filters |
| Batch | Process complete datasets | Optimization opportunities | High memory usage, latency |
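The push and pull rows above can be contrasted in a few lines of Ruby (a hedged sketch; the stage names are illustrative):

```ruby
# Pull-based: the consumer drives. Nothing is produced until first(3) asks,
# so even a conceptually infinite source works.
naturals = Enumerator.new do |y|
  n = 0
  loop { y << (n += 1) }
end
pulled = naturals.lazy.map { |x| x * 2 }.first(3)
# pulled == [2, 4, 6] — only three items were ever generated

# Push-based: the producer drives, pushing each item through the stages.
stages = [->(x) { x * 2 }, ->(x) { x + 1 }]
sink = []
[1, 2, 3].each do |item|
  sink << stages.reduce(item) { |acc, stage| stage.call(acc) }
end
# sink == [3, 5, 7]
```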
Performance Characteristics
| Aspect | Impact | Optimization Technique |
|---|---|---|
| Data copying | Memory and CPU overhead | Use lazy evaluation, in-place modifications |
| Filter overhead | Method call costs | Fuse adjacent filters, reduce granularity |
| Buffering | Memory usage vs throughput | Tune buffer sizes, implement backpressure |
| Synchronization | Thread coordination costs | Minimize shared state, use lock-free structures |
| Serialization | Format conversion overhead | Use binary formats, avoid unnecessary conversion |
| State management | Memory and consistency costs | Keep filters stateless, externalize state |
Design Trade-offs
| Consideration | Option A | Option B |
|---|---|---|
| Filter granularity | Fine-grained: Better composability | Coarse-grained: Lower overhead |
| Data flow | Push: Lower latency | Pull: Better backpressure |
| Execution | Synchronous: Simpler | Asynchronous: Higher throughput |
| State | Stateless: Better parallelism | Stateful: More expressive |
| Coupling | Loose: More flexibility | Tight: Better performance |
| Error handling | Fail fast: Quick feedback | Continue: Better availability |
Implementation Checklist
| Task | Consideration |
|---|---|
| Define filter interface | Standard input/output contract, error handling protocol |
| Choose execution model | Synchronous vs asynchronous, push vs pull |
| Implement pipe mechanism | Buffering strategy, flow control, backpressure |
| Design filter composition | Builder pattern, method chaining, configuration |
| Handle errors | Error propagation strategy, recovery mechanisms |
| Manage state | Stateless filters preferred, externalize when needed |
| Optimize performance | Profile bottlenecks, fuse filters, parallelize |
| Add monitoring | Track throughput, latency, buffer usage |
| Implement testing | Unit test filters, integration test pipelines |
| Document pipeline | Filter purposes, data formats, configuration |