CrackedRuby

Pipe and Filter Architecture

Overview

Pipe and Filter Architecture organizes systems as a sequence of processing components (filters) that transform data as it passes through connecting channels (pipes). Each filter operates independently, reading input from one or more pipes, performing a specific transformation, and writing output to outgoing pipes. The architecture derives its name from Unix shell pipelines, where commands connect via the pipe operator to form data processing chains.

This pattern addresses the challenge of building flexible data processing systems that remain maintainable as complexity grows. Traditional monolithic processors become difficult to modify and test as requirements change. Pipe and Filter Architecture decomposes processing into discrete, single-purpose components that combine to achieve complex transformations. Each filter implements one transformation step, making the system easier to understand, test, and modify.

The pattern applies across multiple domains. Unix commands form processing pipelines through shell operators. Compilers organize as multi-stage pipelines progressing from lexical analysis through code generation. Data processing systems use filter chains to cleanse, transform, and aggregate information. Web servers apply middleware filters to requests and responses. Message processing systems route and transform messages through filter networks.

# Unix-style pipeline concept
input_data
  .map { |item| filter_one(item) }
  .select { |item| filter_two(item) }
  .map { |item| filter_three(item) }

The architecture imposes constraints that shape system structure. Filters remain independent without shared state. Data flows in one direction through the pipeline. Each filter exposes uniform interfaces for connecting to pipes. These constraints enable composability—filters combine in different arrangements to create varied processing behaviors without modifying individual components.

Key Principles

The Pipe and Filter Architecture rests on several fundamental concepts that define how components interact and data moves through the system.

Filters perform transformations on data. Each filter implements a single, well-defined operation without knowledge of upstream or downstream components. Filters read input, apply transformations, and produce output. The filter remains unaware of data sources or destinations—it only knows the data format it receives and produces. This isolation enables reuse across different pipelines and independent testing of transformation logic.

Filters fall into distinct categories based on their behavior. Producer filters generate data without input, serving as pipeline sources. Examples include file readers, database queries, sensors, or data generators. Transformer filters accept input, modify it, and produce output. These perform operations like format conversion, calculation, validation, or enrichment. Consumer filters accept input without producing output, serving as pipeline endpoints. Examples include file writers, database inserts, displays, or logging components. Tester filters examine input and produce filtered subsets based on criteria, acting as conditional gates in the pipeline.
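
As a minimal sketch (names and operations hypothetical), each category maps to a small Ruby callable:

```ruby
# Hypothetical one-line filters, one per category
producer    = -> { [3, 1, 4, 1, 5] }                  # Producer: generates data, no input
transformer = ->(items) { items.map { |n| n + 1 } }   # Transformer: modifies each item
tester      = ->(items) { items.select(&:even?) }     # Tester: passes a filtered subset
consumer    = ->(items) { items.each { |n| puts n } } # Consumer: endpoint, no output

consumer.call(tester.call(transformer.call(producer.call)))
# prints 4, 2, 2 and 6, one per line
```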

Pipes connect filters by transmitting data between components. A pipe links one filter's output to another filter's input, establishing data flow direction. Pipes enforce the communication protocol between filters, ensuring output format from one filter matches input expectations of the next. The pipe handles data buffering, flow control, and potentially format adaptation between components.

Pipes implement different connection patterns. Sequential pipes form linear chains where data passes through each filter in order. Split pipes duplicate data to multiple downstream filters for parallel processing. Merge pipes combine outputs from multiple filters into a single stream. Feedback pipes route output back to earlier pipeline stages for iterative processing.
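
A sequential chain is the default; the split and merge patterns can be sketched with plain arrays standing in for pipes:

```ruby
source = [1, 2, 3, 4]

# Split: the same data feeds two downstream branches
doubled = source.map { |n| n * 2 }
squared = source.map { |n| n * n }

# Merge: recombine both branch outputs into one stream
merged = doubled.zip(squared).flatten
# => [2, 1, 4, 4, 6, 9, 8, 16]
```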

Data flow moves in one direction through the pipeline without backtracking. Filters cannot query upstream components for additional data or send acknowledgments backward. This unidirectional constraint simplifies reasoning about system behavior and prevents circular dependencies. Data arrives at each filter in the order produced by upstream components, though buffering may introduce delays.

The architecture supports two primary execution models. Push-based flow has upstream filters actively send data to downstream components. When a filter produces output, it immediately pushes data to the next filter, which processes it and pushes to subsequent stages. This model minimizes latency but may overwhelm slow downstream filters. Pull-based flow has downstream filters request data from upstream components. When a filter needs input, it pulls from the preceding filter, which may pull from its predecessors. This model naturally handles backpressure but adds latency from request-response cycles.
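
The two models can be contrasted in a few lines: push drives a callback chain eagerly, while pull defers work behind an Enumerator until the consumer asks.

```ruby
# Push: upstream eagerly pushes each item into the next stage
push_sink   = []
push_filter = ->(item) { push_sink << item.upcase }
%w[a b c].each { |item| push_filter.call(item) }

# Pull: downstream demands items; the block runs only as items are requested
pull_source = Enumerator.new do |y|
  %w[a b c].each { |item| y << item.upcase }
end
first_item = pull_source.next  # pulls exactly one item
```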

Filter independence mandates that no filter depends on implementation details of other filters. Filters communicate solely through pipes using agreed-upon data formats. A filter cannot directly invoke methods on other filters, access their internal state, or make assumptions about their presence. This independence allows replacing or rearranging filters without cascade modifications.

Composability emerges from standardized interfaces and independence. The same filter participates in multiple pipelines serving different purposes. Filters combine in various orders to create different processing behaviors. New pipelines assemble from existing filters without code changes. This composability accelerates development and improves code reuse.

Design Considerations

Pipe and Filter Architecture suits specific problem domains while imposing constraints that make it unsuitable for others. Understanding when to apply this pattern requires evaluating data flow characteristics, processing requirements, and system qualities.

The pattern excels when processing follows a clear sequence of transformation steps. Each step represents an independent operation that can execute without knowledge of other steps. The data undergoes successive modifications as it progresses through stages. Examples include compilers transforming source code through parsing, optimization, and code generation; ETL systems extracting, cleansing, and loading data; image processing applying filters like blur, sharpen, and color correction; log processing parsing, filtering, and aggregating events.

Systems requiring high composability benefit from this architecture. When new processing behaviors emerge by rearranging existing filters, development accelerates. When filters are reused across multiple pipelines, maintenance effort decreases. When testing individual filters in isolation improves test coverage, quality increases. This composability advantage grows with the number of distinct filters and pipelines in the system.

The architecture supports parallel processing when filters operate independently without shared state. Multiple instances of the same filter process different data items concurrently. Different filters in the pipeline execute simultaneously on different data items. This parallelism improves throughput on multi-core systems and distributed environments. However, achieving effective parallelism requires careful buffer sizing and flow control to prevent bottlenecks.

Dataflow characteristics determine pattern suitability. The architecture works well with streaming data that arrives continuously and processes incrementally. Batch processing systems where data flows through distinct transformation stages also fit naturally. However, systems requiring random access to data, complex navigation through data structures, or frequent backtracking face challenges. The unidirectional data flow constraint makes it difficult to implement algorithms requiring multiple passes or data lookups.

Interaction patterns influence architectural choice. Systems with primarily data transformation operations match the pattern well. Systems requiring complex control flow, conditional branching based on multiple data sources, or coordination between components face difficulties. The filter independence constraint makes it hard to implement transactions spanning multiple filters or maintain consistency across parallel branches.

Performance trade-offs require evaluation. The architecture introduces overhead from data copying between filters and buffering in pipes. Each filter adds latency as data traverses the pipeline. Systems requiring minimal latency or processing massive data volumes may find this overhead prohibitive. However, the architecture enables optimization opportunities through parallel execution, filter fusion, and pipeline restructuring that can outweigh the overhead.

State management presents challenges in this architecture. Filters should remain stateless between data items to ensure independence and enable parallelism. Transformations requiring state—such as aggregations, joins, or windowing operations—require careful design. Options include embedding state within data items, implementing stateful filters with explicit state management, or using external state stores. Each approach involves trade-offs between filter independence and transformation expressiveness.
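
A sketch of the stateful-filter option (class name hypothetical): the state stays inside the instance, so the pipe protocol is unchanged, but the filter can no longer be parallelized or reordered freely.

```ruby
class RunningTotalFilter
  def initialize
    @total = 0  # explicit internal state, carried across items
  end

  def call(input)
    input.map { |n| @total += n }  # emit cumulative sums
  end
end

RunningTotalFilter.new.call([1, 2, 3, 4])
# => [1, 3, 6, 10]
```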

Error handling complexity increases with pipeline length. When a filter encounters an error, deciding how to handle it depends on system requirements. Options include propagating errors downstream as special data items, logging errors and continuing processing, halting the pipeline, or routing problematic data to error-handling filters. The error strategy must balance fault tolerance with processing guarantees.
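
The route-to-error-handling option can be sketched by tagging failures as data instead of raising (helper name hypothetical):

```ruby
# Failed items become tagged records; a downstream filter splits them off
safe_parse = ->(lines) {
  lines.map do |line|
    { ok: true, value: Integer(line) }
  rescue ArgumentError
    { ok: false, error: "not a number", raw: line }  # route, do not halt
  end
}

results   = safe_parse.call(["10", "oops", "7"])
good, bad = results.partition { |r| r[:ok] }
# good values: [10, 7]; bad: one tagged record for "oops"
```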

Alternative patterns address similar problems with different trade-offs. Layered Architecture organizes systems as hierarchical layers where each layer provides services to the layer above. This pattern suits systems with clear abstraction levels but lacks the composability and parallel processing of Pipe and Filter. Microservices Architecture decomposes systems into independently deployable services communicating through APIs. This provides deployment flexibility and technology diversity but adds operational complexity. Event-Driven Architecture connects components through asynchronous event propagation, supporting loose coupling but introducing complexity in reasoning about system behavior.

Selecting between patterns depends on primary system goals. Choose Pipe and Filter when transformation composability and parallel processing dominate requirements. Choose Layered when abstraction levels and dependency management matter most. Choose Microservices when deployment independence and team autonomy take priority. Choose Event-Driven when asynchronous processing and component decoupling drive decisions.

Ruby Implementation

Ruby's enumerable methods and object model naturally express Pipe and Filter concepts. The language provides multiple approaches for building pipelines, from simple method chains to explicit filter classes.

Method chaining creates pipelines by connecting enumerable operations. Each method acts as a filter, transforming data and passing results to the next method. This approach works well for straightforward pipelines with standard transformations.

# Simple pipeline using enumerable methods
data = ["alpha", "beta", "gamma", "delta"]

result = data
  .map(&:upcase)                    # Transform filter
  .select { |s| s.length > 4 }      # Tester filter
  .map { |s| s.reverse }            # Transform filter
  .first(2)                         # Consumer filter

# => ["AHPLA", "AMMAG"]

Proc objects encapsulate filters as callable objects that combine through composition. This approach provides more explicit filter definitions while maintaining Ruby's functional style.

# Filters as Proc objects
uppercase_filter = ->(items) { items.map(&:upcase) }
length_filter = ->(items) { items.select { |s| s.length > 4 } }
reverse_filter = ->(items) { items.map(&:reverse) }

# Compose filters into pipeline
pipeline = ->(data) {
  reverse_filter.call(
    length_filter.call(
      uppercase_filter.call(data)
    )
  )
}

result = pipeline.call(["alpha", "beta", "gamma", "delta"])
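
The nested calls above can be flattened with Ruby's built-in `Proc#>>` (Ruby 2.6+), which composes callables left to right in data-flow order:

```ruby
uppercase_filter = ->(items) { items.map(&:upcase) }
length_filter    = ->(items) { items.select { |s| s.length > 4 } }
reverse_filter   = ->(items) { items.map(&:reverse) }

# >> reads in the same direction the data flows
pipeline = uppercase_filter >> length_filter >> reverse_filter

pipeline.call(["alpha", "beta", "gamma", "delta"])
# => ["AHPLA", "AMMAG", "ATLED"]
```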

Filter classes provide reusable components with explicit interfaces. Each filter class implements a standard interface for processing data, enabling composition and substitution.

# Base filter interface
class Filter
  def call(input)
    raise NotImplementedError
  end
end

# Producer filter: generates data
class DataSource < Filter
  def initialize(data)
    @data = data
  end
  
  def call(input = nil)
    @data
  end
end

# Transformer filter: modifies data
class UppercaseTransformer < Filter
  def call(input)
    input.map(&:upcase)
  end
end

class LengthFilter < Filter
  def initialize(min_length)
    @min_length = min_length
  end
  
  def call(input)
    input.select { |s| s.length > @min_length }
  end
end

class ReverseTransformer < Filter
  def call(input)
    input.map(&:reverse)
  end
end

# Pipeline composition
class Pipeline
  def initialize
    @filters = []
  end
  
  def add_filter(filter)
    @filters << filter
    self
  end
  
  def execute(input = nil)
    @filters.reduce(input) { |data, filter| filter.call(data) }
  end
end

# Build and execute pipeline
pipeline = Pipeline.new
  .add_filter(DataSource.new(["alpha", "beta", "gamma", "delta"]))
  .add_filter(UppercaseTransformer.new)
  .add_filter(LengthFilter.new(4))
  .add_filter(ReverseTransformer.new)

result = pipeline.execute
# => ["AHPLA", "AMMAG", "ATLED"]

Enumerator objects enable lazy evaluation for efficient memory usage with large datasets. Filters yield results incrementally instead of materializing entire collections.

# Lazy pipeline with Enumerator
class LazyFilter
  def initialize(source)
    @source = source
  end
  
  def each
    return enum_for(:each) unless block_given?
    
    @source.each do |item|
      result = transform(item)
      yield result if result
    end
  end
  
  def transform(item)
    item  # Override in subclasses
  end
end

class LazyUppercase < LazyFilter
  def transform(item)
    item.upcase
  end
end

class LazyLengthFilter < LazyFilter
  def initialize(source, min_length)
    super(source)
    @min_length = min_length
  end
  
  def transform(item)
    item if item.length > @min_length
  end
end

class LazyReverse < LazyFilter
  def transform(item)
    item.reverse
  end
end

# Build lazy pipeline
data = ["alpha", "beta", "gamma", "delta"]
pipeline = LazyReverse.new(
  LazyLengthFilter.new(
    LazyUppercase.new(data),
    4
  )
)

# Data processes on-demand
pipeline.each do |item|
  puts item  # Processes one item at a time
end
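
The standard library provides the same on-demand behavior through `Enumerator::Lazy`, with no custom classes:

```ruby
data = ["alpha", "beta", "gamma", "delta"]

result = data.lazy
  .map(&:upcase)                  # nothing executes yet
  .select { |s| s.length > 4 }
  .map(&:reverse)
  .first(2)                       # forces evaluation, stops after two results
# => ["AHPLA", "AMMAG"]
```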

Concurrent pipelines process data in parallel using threads or fibers. This approach improves throughput for CPU-bound transformations on multi-core systems.

require 'concurrent'

class ConcurrentPipeline
  def initialize(thread_pool_size = 4)
    @filters = []
    @pool = Concurrent::FixedThreadPool.new(thread_pool_size)
  end
  
  def add_filter(filter)
    @filters << filter
    self
  end
  
  def execute(input_data)
    # Process items concurrently through pipeline
    futures = input_data.map do |item|
      Concurrent::Future.execute(executor: @pool) do
        # Stop applying filters once an item is filtered out (nil);
        # otherwise later filters would raise NoMethodError on nil
        @filters.reduce(item) { |data, filter| data && filter.call(data) }
      end
    end
    
    # Collect results
    futures.map(&:value)
  ensure
    @pool.shutdown
    @pool.wait_for_termination
  end
end

# Single-item filters for concurrent processing
class ItemUppercase < Filter
  def call(input)
    input.upcase
  end
end

class ItemLengthCheck < Filter
  def initialize(min_length)
    @min_length = min_length
  end
  
  def call(input)
    input if input.length > @min_length
  end
end

class ItemReverse < Filter
  def call(input)
    input.reverse
  end
end

# Execute concurrent pipeline
pipeline = ConcurrentPipeline.new(4)
  .add_filter(ItemUppercase.new)
  .add_filter(ItemLengthCheck.new(4))
  .add_filter(ItemReverse.new)

result = pipeline.execute(["alpha", "beta", "gamma", "delta"])
# => ["AHPLA", nil, "AMMAG", "ATLED"]

Named pipe pattern implements explicit pipe objects that handle data buffering and flow control between filters.

require 'thread'

class Pipe
  def initialize(buffer_size = 10)
    @queue = SizedQueue.new(buffer_size)  # bounded buffer provides backpressure
    @closed = false
  end
  
  def write(data)
    raise "Pipe closed" if @closed
    @queue.push(data)
  end
  
  def read
    @queue.pop
  end
  
  def close
    @closed = true
    @queue.push(:end_of_stream)
  end
  
  def closed?
    @closed
  end
end

class ThreadedFilter
  def initialize(input_pipe, output_pipe, &block)
    @input_pipe = input_pipe
    @output_pipe = output_pipe
    @block = block
  end
  
  def start
    Thread.new do
      loop do
        data = @input_pipe.read
        break if data == :end_of_stream
        
        result = process(data)
        @output_pipe.write(result) if result
      end
      
      @output_pipe.close
    end
  end
  
  def process(data)
    @block ? @block.call(data) : data  # supplied block, or identity pass-through
  end
end

# Example usage with pipes
pipe1 = Pipe.new
pipe2 = Pipe.new
pipe3 = Pipe.new

filter1 = ThreadedFilter.new(pipe1, pipe2) do |data|
  data.upcase
end

filter2 = ThreadedFilter.new(pipe2, pipe3) do |data|
  data.reverse if data.length > 4
end

# Start filters
t1 = filter1.start
t2 = filter2.start

# Write data to pipeline
["alpha", "beta", "gamma"].each { |item| pipe1.write(item) }
pipe1.close

# Read results
results = []
loop do
  data = pipe3.read
  break if data == :end_of_stream
  results << data
end

[t1, t2].each(&:join)
# results => ["AHPLA", "AMMAG"]

Practical Examples

Log Processing Pipeline

Log analysis systems process streams of log entries through multiple transformation stages. This example demonstrates a pipeline that parses, filters, enriches, and aggregates log data.

class LogEntry
  attr_accessor :timestamp, :level, :message, :metadata
  
  def initialize(timestamp, level, message, metadata = {})
    @timestamp = timestamp
    @level = level
    @message = message
    @metadata = metadata
  end
end

# Producer: Read log lines
class LogReader < Filter
  def initialize(log_lines)
    @log_lines = log_lines
  end
  
  def call(input = nil)
    @log_lines
  end
end

# Parser: Convert strings to structured entries
require 'time'  # needed for Time.parse

class LogParser < Filter
  PATTERN = /^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.+)$/
  
  def call(input)
    input.map do |line|
      if match = line.match(PATTERN)
        LogEntry.new(
          Time.parse(match[1]),
          match[2],
          match[3]
        )
      end
    end.compact
  end
end

# Filter: Select entries matching criteria
class LogLevelFilter < Filter
  def initialize(levels)
    @levels = levels
  end
  
  def call(input)
    input.select { |entry| @levels.include?(entry.level) }
  end
end

# Enrichment: Add metadata
class LogEnricher < Filter
  def initialize(enrichment_data)
    @enrichment_data = enrichment_data
  end
  
  def call(input)
    input.map do |entry|
      # Extract user ID from message
      if entry.message =~ /user_id=(\d+)/
        user_id = $1
        entry.metadata[:user_name] = @enrichment_data[user_id]
      end
      entry
    end
  end
end

# Aggregator: Group and count
class LogAggregator < Filter
  def call(input)
    input.group_by(&:level).transform_values(&:count)
  end
end

# Execute pipeline
log_lines = [
  "2025-01-15 10:23:45 [INFO] User logged in user_id=123",
  "2025-01-15 10:24:12 [ERROR] Database connection failed",
  "2025-01-15 10:24:45 [WARN] High memory usage detected",
  "2025-01-15 10:25:03 [ERROR] Request timeout user_id=456",
  "2025-01-15 10:25:30 [INFO] Cache invalidated"
]

enrichment = { "123" => "alice", "456" => "bob" }

pipeline = Pipeline.new
  .add_filter(LogReader.new(log_lines))
  .add_filter(LogParser.new)
  .add_filter(LogLevelFilter.new(["ERROR", "WARN"]))
  .add_filter(LogEnricher.new(enrichment))
  .add_filter(LogAggregator.new)

result = pipeline.execute
# => {"ERROR"=>2, "WARN"=>1}

Data ETL Pipeline

Extract, Transform, Load operations follow the Pipe and Filter pattern naturally. This example shows a pipeline that extracts data from multiple sources, transforms it, and loads into a target system.

# Extract filters
class DatabaseExtractor < Filter
  def initialize(connection)
    @connection = connection
  end
  
  def call(input = nil)
    # Simulated database query
    [
      { id: 1, name: "Product A", price: 100, category: "electronics" },
      { id: 2, name: "Product B", price: 50, category: "books" },
      { id: 3, name: "Product C", price: 200, category: "electronics" }
    ]
  end
end

class APIExtractor < Filter
  def initialize(endpoint)
    @endpoint = endpoint
  end
  
  def call(input = nil)
    # Simulated API response
    [
      { product_id: 1, inventory: 50, warehouse: "east" },
      { product_id: 2, inventory: 100, warehouse: "west" },
      { product_id: 3, inventory: 25, warehouse: "east" }
    ]
  end
end

# Transform filters
class DataJoiner < Filter
  def initialize(join_key, secondary_data)
    @join_key = join_key
    @secondary_data = secondary_data
  end
  
  def call(input)
    input.map do |record|
      join_value = record[@join_key]
      secondary = @secondary_data.find { |r| r[:product_id] == join_value }
      record.merge(secondary || {})
    end
  end
end

class PriceCalculator < Filter
  TAX_RATE = 0.08
  
  def call(input)
    input.map do |record|
      if record[:price]
        record[:price_with_tax] = (record[:price] * (1 + TAX_RATE)).round(2)
      end
      record
    end
  end
end

class CategoryFilter < Filter
  def initialize(allowed_categories)
    @allowed_categories = allowed_categories
  end
  
  def call(input)
    input.select { |record| @allowed_categories.include?(record[:category]) }
  end
end

# Transform to output format
class OutputFormatter < Filter
  def call(input)
    input.map do |record|
      {
        product_name: record[:name],
        final_price: record[:price_with_tax],
        stock_level: record[:inventory],
        location: record[:warehouse]
      }
    end
  end
end

# Execute ETL pipeline
db_data = DatabaseExtractor.new("db_connection").call
api_data = APIExtractor.new("api.example.com").call

pipeline = Pipeline.new
  .add_filter(DataSource.new(db_data))
  .add_filter(DataJoiner.new(:id, api_data))
  .add_filter(PriceCalculator.new)
  .add_filter(CategoryFilter.new(["electronics"]))
  .add_filter(OutputFormatter.new)

result = pipeline.execute
# => [
#   {product_name: "Product A", final_price: 108.0, stock_level: 50, location: "east"},
#   {product_name: "Product C", final_price: 216.0, stock_level: 25, location: "east"}
# ]

Image Processing Pipeline

Image transformations compose naturally as filter chains. This example demonstrates basic image operations through a pipeline structure.

class Image
  attr_accessor :pixels, :width, :height
  
  def initialize(width, height, pixels)
    @width = width
    @height = height
    @pixels = pixels
  end
  
  def get_pixel(x, y)
    @pixels[y * @width + x]
  end
  
  def set_pixel(x, y, value)
    @pixels[y * @width + x] = value
  end
end

# Brightness adjustment filter
class BrightnessFilter < Filter
  def initialize(adjustment)
    @adjustment = adjustment
  end
  
  def call(input)
    input.pixels = input.pixels.map { |p| (p + @adjustment).clamp(0, 255) }
    input
  end
end

# Contrast adjustment filter
class ContrastFilter < Filter
  def initialize(factor)
    @factor = factor
  end
  
  def call(input)
    input.pixels = input.pixels.map do |p|
      ((p - 128) * @factor + 128).clamp(0, 255).round
    end
    input
  end
end

# Blur filter (simple box blur)
class BlurFilter < Filter
  def initialize(radius = 1)
    @radius = radius
  end
  
  def call(input)
    output_pixels = input.pixels.dup
    
    input.height.times do |y|
      input.width.times do |x|
        sum = 0
        count = 0
        
        (-@radius..@radius).each do |dy|
          (-@radius..@radius).each do |dx|
            nx, ny = x + dx, y + dy
            if nx >= 0 && nx < input.width && ny >= 0 && ny < input.height
              sum += input.get_pixel(nx, ny)
              count += 1
            end
          end
        end
        
        output_pixels[y * input.width + x] = sum / count
      end
    end
    
    input.pixels = output_pixels
    input
  end
end

# Threshold filter (binarization)
class ThresholdFilter < Filter
  def initialize(threshold = 128)
    @threshold = threshold
  end
  
  def call(input)
    input.pixels = input.pixels.map { |p| p >= @threshold ? 255 : 0 }
    input
  end
end

# Create and process image
image = Image.new(4, 4, [
  100, 120, 130, 140,
  110, 125, 135, 145,
  120, 130, 140, 150,
  130, 140, 150, 160
])

pipeline = Pipeline.new
  .add_filter(BrightnessFilter.new(20))
  .add_filter(ContrastFilter.new(1.2))
  .add_filter(BlurFilter.new(1))
  .add_filter(ThresholdFilter.new(140))

result = pipeline.execute(image)
# => Image with processed pixels

Real-World Applications

Unix Command Pipeline

Operating systems implement Pipe and Filter Architecture at the shell level. Commands connect through pipes to perform complex text processing operations. Each command acts as an independent filter processing standard input and producing standard output.

# Ruby implementation of Unix-style pipeline
class ShellCommand < Filter
  def initialize(command)
    @command = command
  end
  
  def call(input)
    IO.popen(@command, "r+") do |pipe|
      pipe.write(input.join("\n"))
      pipe.close_write
      pipe.read.split("\n")
    end
  end
end

# Example: Process log files similar to:
# cat access.log | grep "ERROR" | cut -d' ' -f1 | sort | uniq -c
log_data = [
  "2025-01-15 ERROR Database timeout",
  "2025-01-15 INFO Request completed",
  "2025-01-16 ERROR Connection failed",
  "2025-01-15 ERROR Database timeout"
]

# Lambdas respond to #call, so they slot in wherever a Filter does
pipeline = Pipeline.new
  .add_filter(DataSource.new(log_data))
  .add_filter(->(input) { input.select { |line| line.include?("ERROR") } })  # grep
  .add_filter(->(input) { input.map { |line| line.split[0] } })              # cut
  .add_filter(->(input) { input.sort })                                      # sort
  .add_filter(->(input) { input.tally })                                     # uniq -c

result = pipeline.execute
# => {"2025-01-15"=>2, "2025-01-16"=>1}

Web Request Processing

Web frameworks use middleware chains as Pipe and Filter implementations. Each middleware component processes requests and responses, handling concerns like authentication, logging, compression, and caching independently.

class RackMiddleware
  def initialize(app)
    @app = app
  end
  
  def call(env)
    @app.call(env)
  end
end

class RequestLogger < RackMiddleware
  def call(env)
    start_time = Time.now
    status, headers, body = @app.call(env)
    duration = Time.now - start_time
    
    puts "#{env['REQUEST_METHOD']} #{env['PATH_INFO']} - #{status} (#{duration}s)"
    [status, headers, body]
  end
end

class AuthenticationMiddleware < RackMiddleware
  def call(env)
    token = env['HTTP_AUTHORIZATION']
    
    unless valid_token?(token)
      return [401, {'Content-Type' => 'text/plain'}, ['Unauthorized']]
    end
    
    @app.call(env)
  end
  
  def valid_token?(token)
    # Authentication logic
    token && token.start_with?('Bearer ')
  end
end

class CompressionMiddleware < RackMiddleware
  def call(env)
    status, headers, body = @app.call(env)
    
    if env['HTTP_ACCEPT_ENCODING']&.include?('gzip')
      compressed_body = compress(body)
      headers['Content-Encoding'] = 'gzip'
      headers['Content-Length'] = compressed_body.length.to_s
      [status, headers, [compressed_body]]
    else
      [status, headers, body]
    end
  end
  
  def compress(body)
    # Stub for the example — a real implementation would use Zlib.gzip(body.join)
    body.join
  end
end

# Application endpoint
app = ->(env) { [200, {'Content-Type' => 'text/plain'}, ['Hello World']] }

# Build middleware stack (pipeline)
stack = CompressionMiddleware.new(
  AuthenticationMiddleware.new(
    RequestLogger.new(app)
  )
)

# Process request
env = {
  'REQUEST_METHOD' => 'GET',
  'PATH_INFO' => '/api/users',
  'HTTP_AUTHORIZATION' => 'Bearer token123',
  'HTTP_ACCEPT_ENCODING' => 'gzip'
}

response = stack.call(env)

Message Processing System

Distributed systems route messages through filter chains for transformation, routing, and delivery. Each filter handles specific aspects like validation, enrichment, routing decisions, or protocol conversion.

class Message
  attr_accessor :id, :type, :payload, :metadata
  
  def initialize(id, type, payload, metadata = {})
    @id = id
    @type = type
    @payload = payload
    @metadata = metadata
  end
end

class MessageValidator < Filter
  REQUIRED_FIELDS = [:id, :type, :payload]
  
  def call(input)
    input.select do |message|
      REQUIRED_FIELDS.all? { |field| message.send(field) }
    end
  end
end

class MessageEnricher < Filter
  def initialize(user_service)
    @user_service = user_service
  end
  
  def call(input)
    input.map do |message|
      if user_id = message.payload[:user_id]
        user_data = @user_service.fetch(user_id)
        message.metadata[:user_name] = user_data[:name]
        message.metadata[:user_tier] = user_data[:tier]
      end
      message
    end
  end
end

class MessageRouter < Filter
  def initialize(routes)
    @routes = routes
  end
  
  def call(input)
    input.map do |message|
      route = @routes[message.type]
      message.metadata[:destination] = route if route
      message
    end
  end
end

class MessageTransformer < Filter
  def call(input)
    input.map do |message|
      case message.type
      when :order
        transform_order(message)
      when :notification
        transform_notification(message)
      else
        message
      end
    end
  end
  
  def transform_order(message)
    message.payload[:total] = message.payload[:items].sum { |i| i[:price] * i[:quantity] }
    message
  end
  
  def transform_notification(message)
    message.payload[:formatted_text] = "Alert: #{message.payload[:text]}"
    message
  end
end

class PriorityFilter < Filter
  def call(input)
    input.sort_by do |message|
      tier = message.metadata[:user_tier] || 'standard'
      { 'premium' => 0, 'standard' => 1, 'basic' => 2 }.fetch(tier, 3)  # unknown tiers sort last
    end
  end
end

# Simulated user service
user_service = Struct.new(:data) do
  def fetch(id)
    data[id] || { name: 'Unknown', tier: 'basic' }
  end
end.new({
  123 => { name: 'Alice', tier: 'premium' },
  456 => { name: 'Bob', tier: 'standard' }
})

# Create messages
messages = [
  Message.new(1, :order, { user_id: 123, items: [{ price: 10, quantity: 2 }] }),
  Message.new(2, :notification, { user_id: 456, text: "System maintenance" }),
  Message.new(3, :order, { user_id: 789, items: [{ price: 5, quantity: 3 }] })
]

# Build processing pipeline
routes = { order: 'order-queue', notification: 'notification-queue' }

pipeline = Pipeline.new
  .add_filter(DataSource.new(messages))
  .add_filter(MessageValidator.new)
  .add_filter(MessageEnricher.new(user_service))
  .add_filter(MessageTransformer.new)
  .add_filter(MessageRouter.new(routes))
  .add_filter(PriorityFilter.new)

result = pipeline.execute
# => Processed and sorted messages with enriched metadata

Stream Processing System

Real-time data analytics systems process event streams through filter chains. This example shows windowed aggregation and anomaly detection on streaming data.

require 'time'

class Event
  attr_accessor :timestamp, :sensor_id, :value
  
  def initialize(timestamp, sensor_id, value)
    @timestamp = timestamp
    @sensor_id = sensor_id
    @value = value
  end
end

class WindowAggregator < Filter
  def initialize(window_seconds)
    @window_seconds = window_seconds
  end
  
  def call(input)
    windows = input.group_by do |event|
      (event.timestamp.to_i / @window_seconds) * @window_seconds
    end
    
    windows.map do |window_start, events|
      {
        window_start: Time.at(window_start),
        sensor_id: events.first.sensor_id, # windows are keyed by time only, so this assumes one sensor per stream
        count: events.size,
        avg_value: events.sum(&:value) / events.size.to_f,
        max_value: events.map(&:value).max,
        min_value: events.map(&:value).min
      }
    end
  end
end

class AnomalyDetector < Filter
  def initialize(threshold_stddev = 2.0)
    @threshold = threshold_stddev
  end
  
  def call(input)
    values = input.map { |w| w[:avg_value] }
    mean = values.sum / values.size.to_f
    variance = values.sum { |v| (v - mean) ** 2 } / values.size.to_f
    stddev = Math.sqrt(variance)
    
    input.map do |window|
      # Guard against zero standard deviation to avoid dividing by zero
      z_score = stddev.zero? ? 0.0 : (window[:avg_value] - mean) / stddev
      window[:anomaly] = z_score.abs > @threshold
      window[:z_score] = z_score.round(2)
      window
    end
  end
end

class AlertGenerator < Filter
  def call(input)
    input.select { |window| window[:anomaly] }
         .map do |window|
      {
        alert_time: window[:window_start],
        sensor_id: window[:sensor_id],
        value: window[:avg_value],
        severity: window[:z_score].abs > 3 ? 'critical' : 'warning'
      }
    end
  end
end

# Generate sample event stream
base_time = Time.now
events = (0..20).map do |i|
  value = 50 + rand(-5..5)
  value += 30 if i == 15  # Anomaly
  Event.new(base_time + i * 10, 'sensor-1', value)
end

pipeline = Pipeline.new
  .add_filter(DataSource.new(events))
  .add_filter(WindowAggregator.new(60))
  .add_filter(AnomalyDetector.new(2.0))
  .add_filter(AlertGenerator.new)

alerts = pipeline.execute
# => Alert records for anomalous windows

Performance Considerations

Pipeline architectures introduce overhead from data movement between filters and execution coordination. Understanding performance characteristics enables optimization for specific workload requirements.

Data copying between filters impacts memory usage and execution time. Each filter receives input, transforms it, and produces output. Immutable data structures require copying at each stage. Large datasets passing through many filters accumulate significant copying overhead.

# Inefficient: Creates new array at each stage
def inefficient_pipeline(data)
  data
    .map { |x| x * 2 }        # Copy 1
    .select { |x| x > 10 }     # Copy 2
    .map { |x| x + 1 }         # Copy 3
    .take(100)                 # Copy 4
end

# Efficient: Lazy evaluation minimizes copies
def efficient_pipeline(data)
  data
    .lazy                       # Enable lazy evaluation
    .map { |x| x * 2 }
    .select { |x| x > 10 }
    .map { |x| x + 1 }
    .take(100)                  # Only materializes needed items
    .force
end

# Benchmark difference
require 'benchmark'
data = (1..1_000_000).to_a

Benchmark.bm do |x|
  x.report("eager") { inefficient_pipeline(data) }
  x.report("lazy")  { efficient_pipeline(data) }
end

Filter granularity affects performance trade-offs. Fine-grained filters provide composability but increase overhead from method calls and data passing. Coarse-grained filters reduce overhead but decrease reusability and parallel processing opportunities.

# Fine-grained: More overhead, better composability
class AddTax < Filter
  def call(input)
    input.map { |price| price * 1.08 }
  end
end

class RoundPrice < Filter
  def call(input)
    input.map { |price| price.round(2) }
  end
end

class FormatCurrency < Filter
  def call(input)
    input.map { |price| "$#{price}" }
  end
end

# Coarse-grained: Less overhead, reduced composability
class PriceFormatter < Filter
  def call(input)
    input.map { |price| "$#{(price * 1.08).round(2)}" }
  end
end

Buffering strategies balance throughput and latency. Small buffers reduce memory usage but increase synchronization overhead. Large buffers improve throughput but increase latency and memory consumption. Optimal buffer size depends on filter processing rates and data arrival patterns.
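A bounded pipe can be sketched with Ruby's standard SizedQueue, whose capacity is exactly the buffer-size knob described above. This is a minimal illustration; the BufferedPipe class, its sentinel, and the capacity of 4 are assumptions for the sketch, not part of the earlier examples.

```ruby
# Bounded pipe between two pipeline stages. The SizedQueue capacity is the
# buffer-size tuning knob: push blocks once `capacity` items are waiting.
class BufferedPipe
  DONE = Object.new # sentinel marking end of stream

  def initialize(capacity)
    @queue = SizedQueue.new(capacity)
  end

  def write(item)
    @queue.push(item) # blocks when the buffer is full
  end

  def close
    @queue.push(DONE)
  end

  def each
    while (item = @queue.pop) != DONE
      yield item
    end
  end
end

pipe = BufferedPipe.new(4) # small buffer: low memory, more producer stalls

producer = Thread.new do
  (1..10).each { |n| pipe.write(n * 2) }
  pipe.close
end

results = []
pipe.each { |n| results << n }
producer.join
results # => [2, 4, 6, ..., 20]
```

Raising the capacity lets the producer run further ahead before blocking, trading memory and latency for throughput, which is the tuning decision described above.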

Parallel execution improves throughput when filters operate independently. Multiple instances of one filter can process different data items concurrently (data parallelism), and different filters can run simultaneously on successive items flowing through the pipeline (pipeline parallelism). Effective parallelism requires balanced filter processing rates to prevent bottlenecks.

require 'concurrent'

# Sequential processing
def sequential_pipeline(data, filters)
  data.map do |item|
    filters.reduce(item) { |acc, filter| filter.call(acc) }
  end
end

# Parallel processing of items
def parallel_pipeline(data, filters)
  thread_pool = Concurrent::FixedThreadPool.new(4)
  
  futures = data.map do |item|
    Concurrent::Future.execute(executor: thread_pool) do
      filters.reduce(item) { |acc, filter| filter.call(acc) }
    end
  end
  
  results = futures.map(&:value)
  thread_pool.shutdown
  thread_pool.wait_for_termination
  results
end

# Benchmark
data = (1..1000).to_a
filters = [
  Filter.new { |x| x * 2 },
  Filter.new { |x| Math.sqrt(x) },
  Filter.new { |x| x.round(2) }
]

Benchmark.bm do |x|
  x.report("sequential") { sequential_pipeline(data, filters) }
  x.report("parallel")   { parallel_pipeline(data, filters) }
end

Filter fusion combines adjacent filters into single operations to reduce overhead. The optimization eliminates intermediate data structures and method call overhead while preserving pipeline semantics.

# Before fusion: Three separate filters
pipeline1 = Pipeline.new
  .add_filter(Filter.new { |data| data.map { |x| x * 2 } })
  .add_filter(Filter.new { |data| data.map { |x| x + 1 } })
  .add_filter(Filter.new { |data| data.map { |x| x ** 2 } })

# After fusion: Single combined filter
pipeline2 = Pipeline.new
  .add_filter(Filter.new { |data| 
    data.map { |x| ((x * 2) + 1) ** 2 }
  })

# Fusion eliminates intermediate arrays and iterations

Memory efficiency improves through streaming approaches. Processing items incrementally instead of materializing entire collections reduces memory footprint. Enumerators and lazy evaluation enable memory-efficient pipelines on large datasets.

class StreamingFilter
  def initialize(input_stream)
    @input = input_stream
  end
  
  def each
    return enum_for(:each) unless block_given?
    
    @input.each do |item|
      result = process(item)
      yield result if result
    end
  end
  
  def process(item)
    item
  end
end

# Process large file without loading into memory
def process_large_file(filename)
  File.foreach(filename)
    .lazy
    .map { |line| parse_line(line) }
    .select { |record| valid_record?(record) }
    .map { |record| transform_record(record) }
    .each { |record| write_output(record) }
end

Backpressure handling prevents fast producers from overwhelming slow consumers. Bounded buffers between filters limit memory growth. Blocking writes on full buffers provide natural flow control. This coordination ensures system stability under varying load conditions.
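This blocking-write flow control can be demonstrated directly: with a bounded queue, a producer that is arbitrarily fast still cannot get more than the buffer capacity ahead of the consumer. A minimal sketch; the capacity of 3, the item count, and the :done sentinel are illustrative choices.

```ruby
capacity = 3
queue = SizedQueue.new(capacity) # bounded pipe between producer and consumer
max_depth = 0

producer = Thread.new do
  100.times do |i|
    queue.push(i) # blocks while the consumer lags by `capacity` items
    max_depth = [max_depth, queue.size].max
  end
  queue.push(:done) # sentinel: end of stream
end

consumed = []
until (item = queue.pop) == :done
  consumed << item # consumer drains at its own pace
end
producer.join

max_depth <= capacity # => true: the producer never outruns the buffer
```

Because SizedQueue#push suspends the producer thread whenever the queue is full, memory growth stays bounded without any explicit coordination code in the filters themselves.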

Reference

Pipeline Components

| Component | Description | Characteristics |
|---|---|---|
| Producer Filter | Generates data without input | Acts as data source, initiates pipeline execution |
| Transformer Filter | Modifies input data | Stateless transformation, one-to-one mapping |
| Tester Filter | Filters based on criteria | Selective pass-through, predicate-based |
| Consumer Filter | Processes without output | Acts as data sink, terminates pipeline branch |
| Splitter Filter | Duplicates to multiple outputs | Enables parallel processing paths |
| Merger Filter | Combines multiple inputs | Synchronizes parallel branches |

Filter Patterns

| Pattern | Implementation | Use Case |
|---|---|---|
| Map | Transform each item | Data conversion, calculation, formatting |
| Filter | Select subset based on criteria | Validation, conditional routing |
| Reduce | Aggregate to single value | Summation, counting, accumulation |
| FlatMap | Transform to multiple items | Expansion, decomposition |
| Group | Organize by key | Classification, bucketing |
| Sort | Order by criteria | Ranking, prioritization |
| Window | Batch by time or count | Temporal aggregation, batching |
| Join | Combine from multiple sources | Enrichment, correlation |

Ruby Pipeline Methods

| Method | Category | Behavior |
|---|---|---|
| `map` | Transformer | Applies function to each element |
| `select` | Tester | Keeps elements matching predicate |
| `reject` | Tester | Removes elements matching predicate |
| `reduce` | Consumer | Aggregates elements to single value |
| `flat_map` | Transformer | Maps and flattens nested results |
| `group_by` | Transformer | Groups elements by key function |
| `sort_by` | Transformer | Orders elements by comparison |
| `take` | Consumer | Limits output to first n elements |
| `lazy` | Modifier | Enables lazy evaluation |
| `each` | Consumer | Iterates without transformation |
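The categories above compose in a single chain. A small worked example over hypothetical order records; the data and field names are illustrative, not drawn from the earlier examples.

```ruby
orders = [
  { customer: 'alice', items: [10, 25], status: :paid },
  { customer: 'bob',   items: [5],      status: :pending },
  { customer: 'alice', items: [40],     status: :paid }
]

# tester -> transformer -> transformer -> consumer, per the table above
totals = orders
  .select { |o| o[:status] == :paid }                           # keep paid orders
  .flat_map { |o| o[:items].map { |amt| [o[:customer], amt] } } # expand to [customer, amount] pairs
  .group_by { |customer, _| customer }                          # bucket pairs by customer
  .map { |customer, pairs| [customer, pairs.reduce(0) { |sum, (_, amt)| sum + amt }] }
  .to_h

totals # => { "alice" => 75 }
```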

Execution Models

| Model | Characteristics | Advantages | Disadvantages |
|---|---|---|---|
| Push-based | Upstream pushes to downstream | Low latency, simple control flow | Requires backpressure handling |
| Pull-based | Downstream pulls from upstream | Natural backpressure, lazy evaluation | Higher latency, complex coordination |
| Synchronous | Sequential execution in caller thread | Simple debugging, predictable | Limited parallelism |
| Asynchronous | Concurrent execution in separate threads | High throughput, parallel processing | Complex synchronization, debugging |
| Streaming | Incremental processing | Memory efficient | Requires streaming-aware filters |
| Batch | Process complete datasets | Optimization opportunities | High memory usage, latency |
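Ruby makes the push/pull distinction concrete: an Enumerator is a pull-based pipe where the consumer drives execution, which is why even an unbounded producer is safe under lazy evaluation. A minimal sketch:

```ruby
# Pull-based source: the block is suspended between pulls, so the
# infinite loop only runs as far as the consumer asks.
naturals = Enumerator.new do |yielder|
  n = 0
  loop { yielder << (n += 1) }
end

squares = naturals.lazy.map { |x| x * x }.first(3)
squares # => [1, 4, 9]
```

The consumer's `first(3)` pulls exactly three values through the map stage; in a push-based design the same unbounded source would need explicit backpressure to avoid overwhelming downstream filters.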

Performance Characteristics

| Aspect | Impact | Optimization Technique |
|---|---|---|
| Data copying | Memory and CPU overhead | Use lazy evaluation, in-place modifications |
| Filter overhead | Method call costs | Fuse adjacent filters, reduce granularity |
| Buffering | Memory usage vs throughput | Tune buffer sizes, implement backpressure |
| Synchronization | Thread coordination costs | Minimize shared state, use lock-free structures |
| Serialization | Format conversion overhead | Use binary formats, avoid unnecessary conversion |
| State management | Memory and consistency costs | Keep filters stateless, externalize state |

Design Trade-offs

| Consideration | Option A | Option B |
|---|---|---|
| Filter granularity | Fine-grained: better composability | Coarse-grained: lower overhead |
| Data flow | Push: lower latency | Pull: better backpressure |
| Execution | Synchronous: simpler | Asynchronous: higher throughput |
| State | Stateless: better parallelism | Stateful: more expressive |
| Coupling | Loose: more flexibility | Tight: better performance |
| Error handling | Fail fast: quick feedback | Continue: better availability |

Implementation Checklist

| Task | Consideration |
|---|---|
| Define filter interface | Standard input/output contract, error handling protocol |
| Choose execution model | Synchronous vs asynchronous, push vs pull |
| Implement pipe mechanism | Buffering strategy, flow control, backpressure |
| Design filter composition | Builder pattern, method chaining, configuration |
| Handle errors | Error propagation strategy, recovery mechanisms |
| Manage state | Stateless filters preferred, externalize when needed |
| Optimize performance | Profile bottlenecks, fuse filters, parallelize |
| Add monitoring | Track throughput, latency, buffer usage |
| Implement testing | Unit test filters, integration test pipelines |
| Document pipeline | Filter purposes, data formats, configuration |