CrackedRuby CrackedRuby

Overview

Stream processing refers to the continuous computation over unbounded data sets, where data arrives incrementally and results are produced as soon as possible. Unlike batch processing which operates on finite data sets with defined start and end points, stream processing handles data in motion, processing events as they occur.

The concept emerged from the need to handle high-velocity data sources such as sensor networks, financial trading systems, and user activity logs. Traditional batch processing systems introduced latency measured in hours or days, making them unsuitable for applications requiring immediate insights or rapid response to events.

Stream processing systems operate on the principle that data flows through a pipeline of processing stages. Each stage transforms, filters, aggregates, or enriches the data before passing it downstream. The processing model distinguishes between event time (when an event actually occurred) and processing time (when the system processes the event), a critical distinction for maintaining correctness in distributed systems.

# Conceptual stream processing flow
class StreamProcessor
  def initialize
    @source = EventSource.new
    @pipeline = []
  end

  def transform(&block)
    @pipeline << block
    self
  end

  def run
    @source.each_event do |event|
      result = @pipeline.reduce(event) { |data, stage| stage.call(data) }
      emit(result)
    end
  end
end

# Usage
processor = StreamProcessor.new
  .transform { |event| event.merge(timestamp: Time.now) }
  .transform { |event| event[:value] > 100 ? event : nil }
  .run

Modern stream processing systems must handle challenges including out-of-order data arrival, late-arriving data, exactly-once processing semantics, state management across distributed nodes, and failure recovery without data loss. These requirements drive architectural decisions and implementation complexity.

Key Principles

Event Time vs Processing Time

Event time represents when an event occurred in the real world, typically embedded in the event payload. Processing time represents when the stream processing system handles the event. The gap between these timestamps, known as event time skew, varies based on network latency, system load, and producer behavior. Processing systems must decide which time domain to use for operations like windowing and aggregation.

class Event
  attr_reader :data, :event_time, :processing_time

  def initialize(data, event_time)
    @data = data
    @event_time = event_time
    @processing_time = Time.now
  end

  def skew
    @processing_time - @event_time
  end
end

# Event arrives with embedded timestamp
event = Event.new({ sensor_id: 'temp_01', value: 22.5 }, Time.now - 300)
# => #<Event @event_time=2025-10-11 10:00:00, @processing_time=2025-10-11 10:05:00>
event.skew
# => 300 seconds

Windowing

Windows partition infinite streams into finite chunks for aggregation. Tumbling windows divide time into fixed, non-overlapping intervals. Sliding windows overlap, creating multiple windows for each event. Session windows group events separated by gaps of inactivity. Each windowing strategy serves different analytical requirements and impacts memory usage and computational complexity.

State Management

Stateful operations maintain information across multiple events. State can be local to a single processing node or distributed across a cluster. State management includes handling checkpointing for fault tolerance, state size limitations, and state access patterns. State must survive process restarts and node failures while maintaining consistency.

Backpressure

When downstream consumers process events slower than upstream producers generate them, the system experiences backpressure. Handling strategies include buffering (with bounded queues), dropping events, sampling, or signaling producers to slow down. Each approach trades off between system stability, data loss, and latency.

Exactly-Once Semantics

Stream processing systems offer different delivery guarantees. At-most-once delivery may lose events during failures. At-least-once delivery may process events multiple times. Exactly-once delivery ensures each event affects final results precisely once, requiring coordination between sources, processing, and sinks through distributed transactions or idempotent operations.

class ExactlyOnceProcessor
  def initialize(offset_store, output_store)
    @offset_store = offset_store
    @output_store = output_store
  end

  def process_event(event)
    offset = event[:offset]
    
    # Check if already processed
    return if @offset_store.processed?(offset)
    
    # Process within transaction
    @output_store.transaction do
      result = transform(event[:data])
      @output_store.write(result)
      @offset_store.mark_processed(offset)
    end
  end

  def transform(data)
    # Idempotent transformation
    data.merge(processed_at: Time.now.to_i)
  end
end

Watermarks

Watermarks represent the system's estimate of event time progress. A watermark at time T indicates no more events with event time less than T will arrive. Systems use watermarks to trigger window computations and determine when to emit results. Watermark generation balances completeness (waiting for late data) against latency (producing timely results).

Implementation Approaches

Event-Driven Architecture

Event-driven stream processing reacts to each event individually. Processing nodes subscribe to event sources, apply transformations, and publish results to downstream topics. This approach provides low latency but requires careful state management and coordination.

Event-driven systems typically use message brokers as the backbone. Producers publish events to topics, consumers subscribe to topics and process events independently. The broker handles buffering, routing, and delivery guarantees. This decoupling allows independent scaling of producers and consumers.

┌──────────┐     ┌──────────┐     ┌──────────┐
│ Producer │────▶│  Broker  │────▶│ Consumer │
│          │     │  (Topic) │     │          │
└──────────┘     └──────────┘     └──────────┘
                 ┌──────────┐
                 │ Consumer │
                 │    2     │
                 └──────────┘

Micro-Batching

Micro-batching collects events into small batches before processing. Each micro-batch receives a batch ID and processes atomically. This approach simplifies exactly-once semantics and state management compared to pure event-by-event processing, at the cost of increased latency.

The batch size determines the latency-throughput tradeoff. Smaller batches reduce latency but increase processing overhead. Larger batches amortize overhead across more events but delay results. Typical micro-batch intervals range from 100 milliseconds to several seconds.

class MicroBatchProcessor
  def initialize(batch_interval: 1.0)
    @batch_interval = batch_interval
    @batch = []
    @batch_mutex = Mutex.new
  end

  def add_event(event)
    @batch_mutex.synchronize { @batch << event }
  end

  def run
    loop do
      sleep @batch_interval
      process_batch
    end
  end

  private

  def process_batch
    batch = @batch_mutex.synchronize do
      current = @batch.dup
      @batch.clear
      current
    end

    return if batch.empty?

    # Process batch atomically
    results = batch.map { |event| transform(event) }
    emit_batch(results)
  end

  def transform(event)
    event.merge(batch_id: generate_batch_id)
  end
end

Dataflow Programming

Dataflow models express computation as directed acyclic graphs where nodes represent operations and edges represent data flow. The runtime schedules operations based on data availability. This approach provides clear semantics for parallelization and optimization but requires building or adopting a dataflow framework.

Dataflow systems separate logical computation from physical execution. The same dataflow graph can execute on a single machine or distributed cluster. The runtime handles partitioning, distribution, and fault tolerance based on the graph structure and data characteristics.

Lambda Architecture

Lambda architecture combines batch and stream processing layers. The batch layer processes complete historical data to produce accurate views. The speed layer processes recent data with approximate algorithms to provide low-latency results. The serving layer merges both views for queries.

This approach addresses the difficulty of achieving both correctness and low latency in pure stream processing. The batch layer corrects any errors from the speed layer. The cost includes operational complexity and the need to implement logic twice.

Kappa Architecture

Kappa architecture eliminates the batch layer, using only stream processing with event replay capability. All data flows through the streaming system. Reprocessing historical data involves replaying events from the log. This simplifies the architecture but requires the streaming system to handle both low-latency and high-throughput processing.

Ruby Implementation

Ruby provides several libraries for stream processing, ranging from low-level message broker clients to higher-level streaming frameworks. The ecosystem includes production-grade tools for building streaming applications.

Karafka Framework

Karafka provides a Ruby framework for processing Apache Kafka streams. It handles consumer group management, message deserialization, error handling, and integrates with Ruby's ecosystem.

# Karafka consumer for processing user events
class UserEventsConsumer < Karafka::BaseConsumer
  def consume
    messages.each do |message|
      event = JSON.parse(message.payload)
      process_event(event)
    end
  end

  private

  def process_event(event)
    case event['type']
    when 'user_signup'
      handle_signup(event)
    when 'user_login'
      handle_login(event)
    else
      logger.warn("Unknown event type: #{event['type']}")
    end
  end

  def handle_signup(event)
    user_id = event['user_id']
    
    # Stateful processing: track signups per hour
    current_hour = Time.now.strftime('%Y-%m-%d-%H')
    counter_key = "signups:#{current_hour}"
    
    Redis.current.incr(counter_key)
    Redis.current.expire(counter_key, 7200)
    
    # Emit derived event
    produce_async(
      topic: 'user-analytics',
      payload: {
        type: 'signup_processed',
        user_id: user_id,
        timestamp: event['timestamp']
      }.to_json
    )
  end

  def handle_login(event)
    user_id = event['user_id']
    session_key = "sessions:#{user_id}"
    
    # Store session state
    Redis.current.setex(session_key, 3600, event['session_id'])
  end
end

Ruby-Kafka Client

The ruby-kafka gem provides low-level access to Kafka clusters. Applications build custom processing logic on top of the client library.

require 'kafka'

class StreamProcessor
  def initialize(brokers)
    @kafka = Kafka.new(brokers)
    @consumer = @kafka.consumer(group_id: 'processing-group')
    @producer = @kafka.producer
  end

  def subscribe_and_process(topic)
    @consumer.subscribe(topic)

    @consumer.each_message do |message|
      process_message(message)
    end
  end

  private

  def process_message(message)
    data = JSON.parse(message.value)
    
    # Transform data
    result = transform(data)
    
    # Write to output topic with same partition key
    @producer.produce(
      result.to_json,
      topic: 'processed-events',
      partition_key: message.key
    )
    
    # Deliver messages periodically
    @producer.deliver_messages if Time.now.to_i % 5 == 0
  end

  def transform(data)
    data.merge(
      processed_at: Time.now.iso8601,
      processor_id: Socket.gethostname
    )
  end

  def shutdown
    @producer.shutdown
    @consumer.stop
  end
end

Windowing Implementation

Ruby applications implement windowing logic explicitly since most Ruby message broker clients don't provide built-in windowing operators.

class TumblingWindowProcessor
  def initialize(window_size_seconds)
    @window_size = window_size_seconds
    @windows = Hash.new { |h, k| h[k] = [] }
    @window_mutex = Mutex.new
  end

  def add_event(event)
    window_key = window_for_event(event)
    
    @window_mutex.synchronize do
      @windows[window_key] << event
    end
    
    # Trigger window if complete
    trigger_window(window_key) if window_complete?(window_key)
  end

  private

  def window_for_event(event)
    timestamp = event[:event_time].to_i
    (timestamp / @window_size) * @window_size
  end

  def window_complete?(window_key)
    current_window = (Time.now.to_i / @window_size) * @window_size
    window_key < current_window
  end

  def trigger_window(window_key)
    events = @window_mutex.synchronize do
      @windows.delete(window_key) || []
    end

    return if events.empty?

    # Compute aggregation
    result = {
      window_start: Time.at(window_key),
      window_end: Time.at(window_key + @window_size),
      event_count: events.size,
      total_value: events.sum { |e| e[:value] }
    }

    emit_window_result(result)
  end
end

# Usage
processor = TumblingWindowProcessor.new(60) # 60-second windows

# Process events
processor.add_event({ event_time: Time.now - 120, value: 100 })
processor.add_event({ event_time: Time.now - 90, value: 150 })
processor.add_event({ event_time: Time.now - 30, value: 75 })

Session Window Implementation

Session windows group events separated by inactivity gaps. This requires tracking timeout periods per key.

class SessionWindowProcessor
  def initialize(timeout_seconds)
    @timeout = timeout_seconds
    @sessions = {}
    @mutex = Mutex.new
  end

  def process_event(event)
    key = event[:key]
    
    @mutex.synchronize do
      session = @sessions[key]
      
      if session.nil? || session_expired?(session)
        # Start new session
        close_session(key) if session
        @sessions[key] = new_session(event)
      else
        # Extend existing session
        session[:events] << event
        session[:last_event_time] = event[:event_time]
      end
    end
  end

  private

  def session_expired?(session)
    Time.now - session[:last_event_time] > @timeout
  end

  def new_session(event)
    {
      key: event[:key],
      events: [event],
      start_time: event[:event_time],
      last_event_time: event[:event_time]
    }
  end

  def close_session(key)
    session = @sessions.delete(key)
    return unless session

    emit_session({
      key: key,
      duration: session[:last_event_time] - session[:start_time],
      event_count: session[:events].size,
      events: session[:events]
    })
  end

  def check_timeouts
    @mutex.synchronize do
      @sessions.select { |_, s| session_expired?(s) }.each_key do |key|
        close_session(key)
      end
    end
  end
end

Stateful Processing with Redis

Redis provides distributed state storage for Ruby stream processors, supporting operations like counters, sets, and sorted sets with atomic operations.

class StatefulStreamProcessor
  def initialize(redis)
    @redis = redis
  end

  def process_click_event(event)
    user_id = event[:user_id]
    page = event[:page]
    timestamp = event[:timestamp].to_i

    # Track unique visitors per page
    @redis.sadd("page:#{page}:visitors", user_id)

    # Track click count per user
    @redis.incr("user:#{user_id}:clicks")

    # Track recent pages in sorted set (score = timestamp)
    @redis.zadd("user:#{user_id}:recent_pages", timestamp, page)
    @redis.zremrangebyrank("user:#{user_id}:recent_pages", 0, -11) # Keep last 10

    # Sliding window counter: clicks in last hour
    hour_ago = timestamp - 3600
    @redis.zremrangebyscore("user:#{user_id}:hourly_clicks", '-inf', hour_ago)
    @redis.zadd("user:#{user_id}:hourly_clicks", timestamp, SecureRandom.uuid)

    # Check if user is highly active
    hourly_clicks = @redis.zcard("user:#{user_id}:hourly_clicks")
    emit_alert(user_id) if hourly_clicks > 100
  end

  def get_user_stats(user_id)
    {
      total_clicks: @redis.get("user:#{user_id}:clicks").to_i,
      hourly_clicks: @redis.zcard("user:#{user_id}:hourly_clicks"),
      recent_pages: @redis.zrevrange("user:#{user_id}:recent_pages", 0, -1)
    }
  end
end

Design Considerations

Stream vs Batch Processing

Choose stream processing when results must be available within seconds or minutes of event occurrence. Applications requiring real-time dashboards, fraud detection, or immediate alerting benefit from streaming. Batch processing suits analytical workloads where results can wait hours or days, such as daily reports or monthly aggregations.

Stream processing introduces complexity in state management, fault tolerance, and testing compared to batch processing. Batch systems process complete datasets with well-defined boundaries, simplifying reasoning about correctness. Streaming systems must handle continuously arriving data, out-of-order events, and unbounded state.

Cost considerations differ between approaches. Stream processing requires continuously running infrastructure, incurring constant operational costs. Batch processing can use ephemeral clusters, paying only during execution. However, stream processing may reduce overall costs by enabling faster business decisions or preventing issues through early detection.

Message Ordering Guarantees

Maintaining event order impacts system design. Kafka preserves order within partitions but not across partitions. Processing events from multiple partitions in parallel breaks global ordering. Applications requiring strict ordering must either use a single partition (limiting throughput) or implement application-level coordination.

Many use cases tolerate relaxed ordering. Processing user events from different users doesn't require cross-user ordering. Processing events from a single user requires ordering within that user's event stream. Partition keys group related events into the same partition, maintaining relevant ordering while allowing parallel processing.

# Partition key determines ordering scope
class EventProducer
  def send_user_event(user_id, event_data)
    # All events for same user go to same partition
    producer.produce(
      event_data.to_json,
      topic: 'user-events',
      partition_key: user_id.to_s
    )
  end

  def send_global_event(event_data)
    # No partition key: random partition assignment
    producer.produce(
      event_data.to_json,
      topic: 'system-events'
    )
  end
end

State Management Strategies

Local state stored in process memory provides fast access but doesn't survive restarts. Applications must rebuild state from source topics on startup. External state stores like Redis or PostgreSQL survive restarts but introduce network latency and additional failure modes.

State size determines viable approaches. Small state fits in memory with periodic snapshotting to durable storage. Large state requires external storage or distributed state backends. State growth rate affects long-term viability; unbounded state requires compaction or time-based expiration.

Scaling Considerations

Stream processing scales horizontally by partitioning topics and distributing partitions across consumer instances. Each partition has a single active consumer in a consumer group. Adding consumers increases parallelism up to the partition count. Systems requiring more parallelism need more partitions, but partition count impacts broker performance and coordination overhead.

Stateful processing complicates scaling. State partitioning must align with message partitioning. Redistributing partitions during scaling requires migrating state between instances. Some frameworks handle this automatically; others require manual state migration.

Common Patterns

Event Sourcing

Event sourcing stores all state changes as immutable events. The current state derives from replaying events from the beginning. This pattern provides complete audit history and enables temporal queries. Stream processing naturally aligns with event sourcing by processing the event log.

class EventSourcedAccount
  def initialize(account_id, event_stream)
    @account_id = account_id
    @balance = 0
    @event_stream = event_stream
    
    rebuild_state
  end

  def debit(amount)
    event = {
      type: 'account_debited',
      account_id: @account_id,
      amount: amount,
      timestamp: Time.now.to_i
    }
    
    @event_stream.append(event)
    apply_event(event)
  end

  def credit(amount)
    event = {
      type: 'account_credited',
      account_id: @account_id,
      amount: amount,
      timestamp: Time.now.to_i
    }
    
    @event_stream.append(event)
    apply_event(event)
  end

  private

  def rebuild_state
    @event_stream.read(@account_id).each do |event|
      apply_event(event)
    end
  end

  def apply_event(event)
    case event[:type]
    when 'account_debited'
      @balance -= event[:amount]
    when 'account_credited'
      @balance += event[:amount]
    end
  end
end

CQRS (Command Query Responsibility Segregation)

CQRS separates write operations (commands) from read operations (queries). Commands modify state by appending events. Stream processors build read-optimized views from events. This pattern enables independent scaling of writes and reads, with specialized data models for each.

Stream processors maintain materialized views by consuming events and updating query databases. Multiple views serve different query patterns. View updates occur asynchronously from command processing, introducing eventual consistency.

Change Data Capture

CDC streams database changes as events, transforming databases into event sources. Applications consume database change events for cache invalidation, search index updates, or cross-system synchronization. CDC decouples data producers from consumers without modifying application code.

class CDCProcessor
  def process_change_event(event)
    table = event[:table]
    operation = event[:operation]
    data = event[:data]

    case table
    when 'users'
      handle_user_change(operation, data)
    when 'orders'
      handle_order_change(operation, data)
    end
  end

  private

  def handle_user_change(operation, data)
    user_id = data['id']

    case operation
    when 'INSERT', 'UPDATE'
      # Update search index
      SearchIndex.upsert(user_id, data)
      
      # Invalidate cache
      Cache.delete("user:#{user_id}")
      
      # Emit to analytics
      produce_event('user-updated', data)
    when 'DELETE'
      SearchIndex.delete(user_id)
      Cache.delete("user:#{user_id}")
    end
  end
end

Stream Joins

Joins combine related events from multiple streams. Stream-to-stream joins require windowing to bound the join. Stream-to-table joins enrich events with reference data. Temporal joins consider event time alignment.

class StreamJoiner
  def initialize(window_size)
    @window_size = window_size
    @left_buffer = {}
    @right_buffer = {}
  end

  def add_left(event)
    key = event[:join_key]
    @left_buffer[key] = event
    
    perform_join(key)
    cleanup_expired(key)
  end

  def add_right(event)
    key = event[:join_key]
    @right_buffer[key] = event
    
    perform_join(key)
    cleanup_expired(key)
  end

  private

  def perform_join(key)
    left = @left_buffer[key]
    right = @right_buffer[key]
    
    return unless left && right
    
    # Check if events within window
    time_diff = (left[:event_time] - right[:event_time]).abs
    return if time_diff > @window_size
    
    emit_joined({
      left: left,
      right: right,
      join_key: key
    })
  end
end

Saga Pattern

Sagas coordinate multi-step transactions across services using compensating actions. Each step emits events. Failures trigger compensating events to undo completed steps. Stream processors orchestrate saga execution by consuming step events and emitting next steps or compensations.

Performance Considerations

Throughput Optimization

Stream processing throughput depends on several factors. Batch size affects throughput: larger batches amortize per-message overhead. Network round trips dominate latency for small messages; batching reduces round trip count. Producer batching accumulates messages before sending. Consumer prefetching retrieves multiple messages per request.

class OptimizedProducer
  def initialize(kafka)
    @producer = kafka.producer(
      max_buffer_size: 10_000,        # Buffer up to 10k messages
      max_buffer_bytesize: 10_000_000, # Or 10MB
      delivery_interval: 5             # Send every 5 seconds
    )
  end

  def send_event(event)
    @producer.produce(
      event.to_json,
      topic: 'events'
    )
    # Messages buffered, not sent immediately
  end

  def flush
    @producer.deliver_messages # Send accumulated messages
  end
end

class OptimizedConsumer
  def initialize(kafka)
    @consumer = kafka.consumer(
      group_id: 'processing-group',
      fetcher_max_queue_size: 50,     # Prefetch up to 50 batches
      max_wait_time: 1                # Wait up to 1s for full fetch
    )
  end
end

Latency Optimization

Reducing latency requires minimizing processing time per event and network overhead. Avoid blocking operations in hot paths. Batch database writes to reduce round trips. Use asynchronous I/O for network calls. Pipeline processing stages to overlap I/O and computation.

Local state provides microsecond access times compared to milliseconds for remote storage. Cache frequently accessed reference data in memory. Precompute derived values during idle periods. Partition state to avoid contention between parallel processors.

Memory Management

Unbounded buffering leads to out-of-memory errors under load. Implement bounded queues with backpressure signaling. Drop or sample events when buffers fill. Monitor memory usage and trigger garbage collection before exhaustion.

Windowed operations accumulate events in memory. Window size and event rate determine memory requirements. Large windows or high event rates require memory-efficient data structures or external storage. Implement window eviction policies to bound memory usage.

class MemoryBoundedProcessor
  def initialize(max_buffer_size)
    @buffer = []
    @max_size = max_buffer_size
    @dropped_count = 0
  end

  def add_event(event)
    if @buffer.size >= @max_size
      @dropped_count += 1
      emit_metric('events.dropped', 1)
      return false
    end

    @buffer << event
    true
  end

  def process_batch
    batch = @buffer.shift(100) || []
    batch.each { |event| process_event(event) }
  end
end

Partition Balancing

Uneven partition distribution causes load imbalance. Hot partitions receive disproportionate traffic, limiting throughput to the slowest consumer. Choosing partition keys requires understanding data distribution. User IDs may distribute unevenly if some users generate far more events. Random partitioning distributes load evenly but breaks ordering guarantees.

Monitor partition lag per consumer instance. Large lag variance indicates imbalance. Repartition data with more uniform keys. Use composite keys combining multiple attributes. Hash keys to distribute deterministically.

Compression

Network bandwidth often limits throughput. Message compression reduces bytes transmitted at CPU cost. Compression ratios depend on data characteristics: JSON compresses well, binary data less so. Batch compression achieves better ratios than per-message compression.

# Configure producer compression
producer = kafka.producer(
  compression_codec: :snappy,  # Fast compression
  compression_threshold: 1024  # Compress batches > 1KB
)

# Or use gzip for better compression ratio
producer = kafka.producer(
  compression_codec: :gzip
)

Tools & Ecosystem

Apache Kafka

Kafka provides distributed message streaming with high throughput and durability. Topics partition data across brokers. Consumer groups coordinate parallel processing. Kafka Connect integrates with external systems. Kafka Streams offers stream processing DSL for JVM languages.

Ruby applications use kafka-ruby gem for producer and consumer APIs. Karafka framework provides Rails-like conventions for building Kafka applications. Deployment requires running Kafka broker clusters and ZooKeeper or KRaft for coordination.

RabbitMQ

RabbitMQ implements AMQP protocol for message queuing. Exchanges route messages to queues based on routing rules. Multiple binding patterns support various messaging topologies. RabbitMQ provides management UI and plugins for monitoring.

Bunny gem offers Ruby client for RabbitMQ. RabbitMQ suits lower-throughput workloads compared to Kafka but provides more flexible routing. RabbitMQ deletes consumed messages; Kafka retains messages for configurable periods.

Redis Streams

Redis Streams adds stream processing primitives to Redis. Consumer groups enable parallel processing. Commands support range queries, blocking reads, and automatic ID generation. Redis Streams suits applications already using Redis for caching or state storage.

require 'redis'

redis = Redis.new

# Add to stream
redis.xadd('events', { user: 'alice', action: 'login' })

# Read from stream
messages = redis.xread('events', '0', count: 10)

# Consumer group
redis.xgroup(:create, 'events', 'processors', '0')
redis.xreadgroup('processors', 'consumer1', 'events', '>')

AWS Kinesis

Kinesis provides managed streaming service on AWS. Kinesis Data Streams stores records in shards. Kinesis Data Firehose delivers streams to S3, Redshift, or Elasticsearch. Kinesis Data Analytics runs SQL queries on streams.

Ruby applications use aws-sdk-kinesis gem. Kinesis handles infrastructure management and scaling. Costs increase with throughput. Kinesis suits AWS-native applications requiring managed services.

Google Cloud Pub/Sub

Pub/Sub offers managed messaging on Google Cloud. Topics publish messages, subscriptions consume messages. Push subscriptions deliver messages via HTTP webhooks. Pull subscriptions let consumers poll for messages.

require 'google/cloud/pubsub'

pubsub = Google::Cloud::Pubsub.new

# Publish
topic = pubsub.topic('events')
topic.publish({ data: { user: 'bob' } }.to_json)

# Subscribe
subscription = topic.subscription('processor')
subscription.listen do |message|
  process(message.data)
  message.acknowledge!
end

Monitoring Tools

Prometheus collects metrics from stream processors. Export lag, throughput, error rates, and processing latency. Kafka exporters expose broker and consumer metrics. Custom exporters expose application-specific metrics.

Grafana visualizes metrics with dashboards. Create alerts for high lag, dropped messages, or processing errors. Track trends over time to identify degradation.

OpenTelemetry instruments applications for distributed tracing. Trace event flow through processing pipeline. Identify bottlenecks and failure points. Export traces to Jaeger or Zipkin.

Reference

Core Concepts

Concept Description Usage
Event Time Timestamp when event occurred in real world Use for accurate temporal analysis
Processing Time Timestamp when system processes event Use for operational metrics
Watermark Estimate of event time progress Triggers window computations
Window Finite chunk of infinite stream Enables aggregation over time ranges
State Information maintained across events Supports stateful operations like joins
Partition Unit of parallelism and ordering Determines scaling and ordering guarantees
Consumer Group Set of consumers processing topic Coordinates parallel processing
Offset Position in topic partition Tracks processing progress
Backpressure Downstream slower than upstream Prevents buffer overflow
Exactly-Once Each event affects results once Strongest delivery guarantee

Windowing Types

Type Behavior Use Case
Tumbling Fixed-size, non-overlapping intervals Hourly aggregations
Sliding Fixed-size, overlapping intervals Moving averages
Session Events grouped by inactivity gaps User session analysis
Global Single window for entire stream Final aggregation

Delivery Guarantees

Guarantee Behavior Tradeoffs
At-Most-Once May lose events on failure Lowest latency, simplest
At-Least-Once May duplicate events on failure Moderate complexity
Exactly-Once Each event processed once Highest latency, most complex

Kafka Configuration Parameters

Parameter Purpose Typical Values
batch.size Producer batch size in bytes 16384 to 1048576
linger.ms Producer batch wait time 0 to 100
fetch.min.bytes Consumer minimum fetch size 1 to 1048576
max.poll.records Records per consumer poll 100 to 1000
session.timeout.ms Consumer heartbeat timeout 10000 to 30000
enable.idempotence Enable exactly-once producer true for exactly-once
isolation.level Consumer transaction isolation read_uncommitted or read_committed

Ruby Gem Comparison

Gem Purpose Strengths
ruby-kafka Kafka client library Full protocol support, production-ready
karafka Kafka application framework Rails-like conventions, routing DSL
bunny RabbitMQ client AMQP support, flexible routing
aws-sdk-kinesis AWS Kinesis client AWS integration, managed service
google-cloud-pubsub Google Pub/Sub client GCP integration, push subscriptions

Performance Metrics

Metric Measures Target Range
Throughput Events processed per second Application-specific
Latency End-to-end processing time < 100ms for real-time
Lag Unconsumed message count < 1000 for low latency
Error Rate Failed events per second < 0.1% of throughput
CPU Usage Processor utilization 60-80% sustained
Memory Usage Process memory consumption < 80% of available

Common Operations

Operation Implementation Complexity
Filter Evaluate predicate per event O(1) per event
Map Transform each event O(1) per event
FlatMap Transform to multiple events O(n) per event
Aggregate Accumulate values in window O(w) for window size w
Join Combine streams by key O(w1 + w2) for window sizes
Deduplicate Remove duplicate events O(n) for n recent events