Overview
Stream processing refers to continuous computation over unbounded data sets, where data arrives incrementally and results are produced as soon as possible. Unlike batch processing, which operates on finite data sets with defined start and end points, stream processing handles data in motion, processing events as they occur.
The concept emerged from the need to handle high-velocity data sources such as sensor networks, financial trading systems, and user activity logs. Traditional batch processing systems introduced latency measured in hours or days, making them unsuitable for applications requiring immediate insights or rapid response to events.
Stream processing systems operate on the principle that data flows through a pipeline of processing stages. Each stage transforms, filters, aggregates, or enriches the data before passing it downstream. The processing model distinguishes between event time (when an event actually occurred) and processing time (when the system processes the event), a critical distinction for maintaining correctness in distributed systems.
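# Conceptual stream processing flow
class StreamProcessor
  def initialize(source)
    @source = source # any object that yields events via #each
    @pipeline = []
  end

  # Register a stage; a stage that returns nil drops the event (a filter)
  def transform(&block)
    @pipeline << block
    self
  end

  def run(&sink)
    @source.each do |event|
      result = @pipeline.reduce(event) do |data, stage|
        break nil if data.nil? # stop once an earlier stage filtered the event
        stage.call(data)
      end
      sink.call(result) unless result.nil?
    end
  end
end

# Usage
events = [{ value: 50 }, { value: 150 }]
StreamProcessor.new(events)
  .transform { |event| event.merge(timestamp: Time.now) }
  .transform { |event| event[:value] > 100 ? event : nil }
  .run { |result| puts result }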
Modern stream processing systems must handle challenges including out-of-order data arrival, late-arriving data, exactly-once processing semantics, state management across distributed nodes, and failure recovery without data loss. These requirements drive architectural decisions and implementation complexity.
Key Principles
Event Time vs Processing Time
Event time represents when an event occurred in the real world, typically embedded in the event payload. Processing time represents when the stream processing system handles the event. The gap between these timestamps, known as event time skew, varies based on network latency, system load, and producer behavior. Processing systems must decide which time domain to use for operations like windowing and aggregation.
class Event
  attr_reader :data, :event_time, :processing_time

  def initialize(data, event_time)
    @data = data
    @event_time = event_time
    @processing_time = Time.now # stamped when the processor receives the event
  end

  # Seconds between occurrence and processing, as a Float
  def skew
    @processing_time - @event_time
  end
end

# Event arrives with an embedded timestamp from five minutes ago
event = Event.new({ sensor_id: 'temp_01', value: 22.5 }, Time.now - 300)
event.skew
# => roughly 300.0 (slightly more, because Time.now is read again in the constructor)
Windowing
Windows partition infinite streams into finite chunks for aggregation. Tumbling windows divide time into fixed, non-overlapping intervals. Sliding windows have a fixed length but advance by a smaller step, so consecutive windows overlap and each event belongs to several windows. Session windows group events separated by gaps of inactivity. Each windowing strategy serves different analytical requirements and affects memory usage and computational complexity.
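The three strategies differ only in how an event's timestamp maps to windows. The following is a minimal sketch, assuming integer epoch-second timestamps and windows aligned to multiples of the window size (or slide); the function names are illustrative, not from any library.

```ruby
# Start time (epoch seconds) of the tumbling window that contains `ts`.
def tumbling_window(ts, size)
  (ts / size) * size
end

# Start times of every sliding window of length `size`, advancing by
# `slide`, that contains `ts`. Each event lands in about size / slide windows.
def sliding_windows(ts, size, slide)
  start = (ts / slide) * slide # latest window start at or before ts
  starts = []
  while start + size > ts
    starts.unshift(start)
    start -= slide
  end
  starts
end

# Group timestamps into sessions: a gap longer than `gap` starts a new session.
def sessions(timestamps, gap)
  timestamps.sort.slice_when { |a, b| b - a > gap }.to_a
end

p tumbling_window(125, 60)            # => 120
p sliding_windows(125, 60, 20)        # => [80, 100, 120]
p sessions([0, 10, 20, 100, 105], 30) # => [[0, 10, 20], [100, 105]]
```

Tumbling assignment is a single integer division, sliding assignment multiplies per-event work and memory by the overlap factor, and session boundaries depend on the data itself, which is why session windows need per-key state.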
State Management
Stateful operations maintain information across multiple events. State can be local to a single processing node or distributed across a cluster. State management covers checkpointing for fault tolerance, limits on state size, and state access patterns. State must survive process restarts and node failures while maintaining consistency.
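Checkpointing can be sketched in a few lines if the state is stored together with the offset of the last event it reflects. This sketch assumes a single process, a local file as durable storage, and a source that can replay from an offset (as a Kafka topic can); `CheckpointedCounter` is a hypothetical name.

```ruby
require 'json'
require 'tmpdir'

# Keeps per-key counts in memory and checkpoints them together with the
# offset of the last applied event, so a restart can resume from there.
class CheckpointedCounter
  attr_reader :counts, :offset

  def initialize(path)
    @path = path
    @counts = Hash.new(0)
    @offset = -1
    return unless File.exist?(path)

    snapshot = JSON.parse(File.read(path))
    @counts.update(snapshot['counts'])
    @offset = snapshot['offset']
  end

  # Apply an event unless the restored checkpoint already covers it.
  def apply(offset, key)
    return if offset <= @offset

    @counts[key] += 1
    @offset = offset
  end

  # Write to a temp file, then rename: a crash never leaves a half-written checkpoint.
  def checkpoint
    tmp = "#{@path}.tmp"
    File.write(tmp, JSON.generate('counts' => @counts, 'offset' => @offset))
    File.rename(tmp, @path)
  end
end

Dir.mktmpdir do |dir|
  path = File.join(dir, 'state.json')
  counter = CheckpointedCounter.new(path)
  counter.apply(0, 'a')
  counter.apply(1, 'b')
  counter.checkpoint
  counter.apply(2, 'a') # not checkpointed yet; simulate a crash here

  restored = CheckpointedCounter.new(path)
  restored.apply(1, 'b') # replayed from the source, skipped
  restored.apply(2, 'a') # replayed from the source, applied
  p restored.counts      # a => 2, b => 1
end
```

Because the offset is saved atomically with the state, replaying events after a restart does not double-count them.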
Backpressure
When downstream consumers process events slower than upstream producers generate them, the system experiences backpressure. Handling strategies include buffering (with bounded queues), dropping events, sampling, or signaling producers to slow down. Each approach trades off between system stability, data loss, and latency.
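Ruby's built-in `SizedQueue` demonstrates two of these strategies. This is a single-process sketch, assuming threads stand in for the producer and consumer stages; `offer` is a hypothetical helper name.

```ruby
# Bounded buffer between a fast producer and a slow consumer.
# SizedQueue#push blocks while the queue is full, which slows the producer
# down to the consumer's pace (the "signal producers" strategy).
queue = SizedQueue.new(5)
processed = []

consumer = Thread.new do
  while (event = queue.pop) != :done
    sleep 0.01 # simulate slow downstream work
    processed << event
  end
end

20.times { |i| queue.push(i) } # blocks whenever 5 events are waiting
queue.push(:done)
consumer.join
puts processed.size # prints 20

# Alternative: drop instead of blocking. push(obj, true) is non-blocking
# and raises ThreadError when the queue is full.
def offer(queue, event)
  queue.push(event, true)
  true
rescue ThreadError
  false # dropped; a real system would count this in a metric
end
```

Blocking keeps every event at the cost of upstream latency; `offer` keeps latency bounded at the cost of data loss.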
Exactly-Once Semantics
Stream processing systems offer different delivery guarantees. At-most-once delivery may lose events during failures. At-least-once delivery may process events multiple times. Exactly-once semantics ensure each event affects final results precisely once, even if it is delivered more than once internally; this requires coordination between sources, processing, and sinks through distributed transactions or idempotent operations.
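class ExactlyOnceProcessor
  # Assumes offset_store and output_store share one transactional backend
  # (e.g. the same database), so the output write and the offset record
  # commit or roll back together.
  def initialize(offset_store, output_store)
    @offset_store = offset_store
    @output_store = output_store
  end

  def process_event(event)
    offset = event[:offset]
    @output_store.transaction do
      # Check inside the transaction so a concurrent redelivery cannot
      # slip in between the check and the write
      next if @offset_store.processed?(offset)

      result = transform(event)
      @output_store.write(result)
      @offset_store.mark_processed(offset)
    end
  end

  private

  # Deterministic transformation: replaying the same event yields the same
  # output (stamping Time.now would not)
  def transform(event)
    event[:data].merge(source_offset: event[:offset])
  end
end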
Watermarks
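Watermarks represent the system's estimate of event time progress. A watermark at time T asserts that no more events with event time earlier than T are expected. Because most watermarks are heuristic, events can still arrive after the watermark has passed them; these are treated as late data, which is dropped or used to update results already emitted. Systems use watermarks to trigger window computations and decide when to emit results. Watermark generation balances completeness (waiting for late data) against latency (producing timely results).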
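A common heuristic derives the watermark from the largest event time seen so far minus a fixed allowance for disorder, similar in spirit to the bounded-out-of-orderness strategy offered by Apache Flink. The following sketch assumes integer timestamps and a fixed maximum delay; `WatermarkTracker` is a hypothetical class.

```ruby
# Bounded-out-of-orderness watermark: assume no event arrives more than
# `max_delay` seconds behind the newest event seen so far.
class WatermarkTracker
  def initialize(max_delay)
    @max_delay = max_delay
    @max_event_time = nil
  end

  def observe(event_time)
    @max_event_time = [@max_event_time, event_time].compact.max
  end

  # Events with event time below this value are treated as late.
  def watermark
    @max_event_time && @max_event_time - @max_delay
  end

  def late?(event_time)
    wm = watermark
    !wm.nil? && event_time < wm
  end

  # A window [start, start + size) can fire once the watermark reaches its end.
  def window_closed?(start, size)
    wm = watermark
    !wm.nil? && wm >= start + size
  end
end

tracker = WatermarkTracker.new(10)
[100, 95, 112, 104].each { |t| tracker.observe(t) }
p tracker.watermark              # => 102
p tracker.late?(101)             # => true
p tracker.window_closed?(40, 60) # => true (window ends at 100)
p tracker.window_closed?(60, 60) # => false (window ends at 120)
```

A larger `max_delay` admits more out-of-order events before they count as late, but holds every window open longer.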
Implementation Approaches
Event-Driven Architecture
Event-driven stream processing reacts to each event individually. Processing nodes subscribe to event sources, apply transformations, and publish results to downstream topics. This approach provides low latency but requires careful state management and coordination.
Event-driven systems typically use message brokers as the backbone. Producers publish events to topics, and consumers subscribe to topics and process events independently. The broker handles buffering, routing, and delivery guarantees. This decoupling allows independent scaling of producers and consumers.
┌──────────┐     ┌──────────┐     ┌──────────┐
│ Producer │────▶│  Broker  │────▶│ Consumer │
│          │     │ (Topic)  │     │          │
└──────────┘     └──────────┘     └──────────┘
                      │
                      ▼
                 ┌──────────┐
                 │ Consumer │
                 │    2     │
                 └──────────┘
Micro-Batching
Micro-batching collects events into small batches before processing. Each micro-batch receives a batch ID and is processed as a unit. This approach simplifies exactly-once semantics and state management compared to pure event-by-event processing, at the cost of increased latency.
The batch size determines the latency-throughput tradeoff. Smaller batches reduce latency but increase processing overhead. Larger batches amortize overhead across more events but delay results. Typical micro-batch intervals range from 100 milliseconds to several seconds.
class MicroBatchProcessor
  def initialize(batch_interval: 1.0, &sink)
    @batch_interval = batch_interval
    @batch = []
    @batch_mutex = Mutex.new
    @batch_id = 0
    @sink = sink # receives the results of each micro-batch
  end

  def add_event(event)
    @batch_mutex.synchronize { @batch << event }
  end

  def run
    loop do
      sleep @batch_interval
      process_batch
    end
  end

  private

  def process_batch
    # Swap in an empty buffer so producers are blocked only briefly
    batch = @batch_mutex.synchronize do
      current = @batch
      @batch = []
      current
    end
    return if batch.empty?

    # One ID per micro-batch, shared by every event in it
    @batch_id += 1
    results = batch.map { |event| transform(event, @batch_id) }
    @sink.call(results)
  end

  def transform(event, batch_id)
    event.merge(batch_id: batch_id)
  end
end
Dataflow Programming
Dataflow models express computation as directed acyclic graphs where nodes represent operations and edges represent data flow. The runtime schedules operations based on data availability. This approach provides clear semantics for parallelization and optimization but requires building or adopting a dataflow framework.
Dataflow systems separate logical computation from physical execution. The same dataflow graph can execute on a single machine or distributed cluster. The runtime handles partitioning, distribution, and fault tolerance based on the graph structure and data characteristics.
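A toy version shows the separation between the logical graph and its execution. This sketch assumes each node is a Ruby block that names its upstream inputs, and it executes nodes sequentially in topological order using Ruby's `tsort` library; `Dataflow` and `node` are hypothetical names. A distributed runtime could run independent nodes (here `:sum` and `:max`) in parallel on different machines without changing the graph.

```ruby
require 'tsort'

# Each node declares its inputs and a function from their outputs to its
# own output. Execution order comes from the edges, not declaration order.
class Dataflow
  include TSort

  def initialize
    @nodes = {}
  end

  def node(name, inputs = [], &fn)
    @nodes[name] = { inputs: inputs, fn: fn }
    self
  end

  # Push one record through the graph and return every node's output.
  def run(record)
    tsort.each_with_object({}) do |name, outputs|
      spec = @nodes.fetch(name)
      args = spec[:inputs].empty? ? [record] : spec[:inputs].map { |i| outputs.fetch(i) }
      outputs[name] = spec[:fn].call(*args)
    end
  end

  # TSort hooks: nodes are graph vertices, inputs are their dependencies.
  def tsort_each_node(&block)
    @nodes.each_key(&block)
  end

  def tsort_each_child(name, &block)
    @nodes.fetch(name)[:inputs].each(&block)
  end
end

flow = Dataflow.new
  .node(:parse) { |line| line.split(',').map(&:to_i) }
  .node(:sum, [:parse]) { |nums| nums.sum }
  .node(:max, [:parse]) { |nums| nums.max }
  .node(:report, [:sum, :max]) { |sum, max| "sum=#{sum} max=#{max}" }

puts flow.run('3,9,4')[:report] # prints sum=16 max=9
```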
Lambda Architecture
Lambda architecture combines batch and stream processing layers. The batch layer processes complete historical data to produce accurate views. The speed layer processes recent data with approximate algorithms to provide low-latency results. The serving layer merges both views for queries.
This approach addresses the difficulty of achieving both correctness and low latency in pure stream processing. The batch layer corrects any errors from the speed layer. The cost includes operational complexity and the need to implement logic twice.
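For an additive metric such as a count, the serving-layer merge can be as simple as summing the two views. This sketch assumes the batch view covers all events up to the last batch run and the speed view covers only events after that cut-off; the page names and numbers are illustrative.

```ruby
# Serving-layer merge for a count metric.
def serve(batch_view, speed_view)
  batch_view.merge(speed_view) { |_page, batch, speed| batch + speed }
end

batch_view = { 'page_a' => 1_000, 'page_b' => 250 } # exact, recomputed by the batch layer
speed_view = { 'page_a' => 12, 'page_c' => 3 }      # approximate, since the last batch run

p serve(batch_view, speed_view) # page_a => 1012, page_b => 250, page_c => 3
```

When the next batch run absorbs those recent events, the corresponding speed-layer data is discarded, which is how the batch layer corrects any speed-layer errors. Non-additive metrics such as distinct counts need a mergeable representation (for example, sets or sketches) instead of plain integers.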
Kappa Architecture
Kappa architecture eliminates the batch layer, using only stream processing with event replay capability. All data flows through the streaming system. Reprocessing historical data involves replaying events from the log. This simplifies the architecture but requires the streaming system to handle both low-latency and high-throughput processing.
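In a Kappa design, changing the processing logic means running the new version over the same log from the beginning. This sketch assumes an in-memory array standing in for a durable, replayable log (such as a Kafka topic with long retention); the event fields and projection names are hypothetical.

```ruby
# The log is the source of truth; a view is a fold over it.
LOG = [
  { offset: 0, user: 'alice', amount: 30 },
  { offset: 1, user: 'bob',   amount: 5 },
  { offset: 2, user: 'alice', amount: 20 }
].freeze

# Version 1 of the job: total spend per user.
def project_v1(log, from_offset = 0)
  log.drop_while { |e| e[:offset] < from_offset }
     .each_with_object(Hash.new(0)) { |e, totals| totals[e[:user]] += e[:amount] }
end

# Version 2 changes the logic: only purchases of 10 or more count.
def project_v2(log, from_offset = 0)
  log.drop_while { |e| e[:offset] < from_offset }
     .select { |e| e[:amount] >= 10 }
     .each_with_object(Hash.new(0)) { |e, totals| totals[e[:user]] += e[:amount] }
end

p project_v1(LOG) # alice => 50, bob => 5
p project_v2(LOG) # alice => 50
```

In production, the v2 job would replay from offset 0 into a new output table while v1 keeps serving, and readers would switch over once v2 has caught up with the head of the log.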
Ruby Implementation
Ruby provides several libraries for stream processing, ranging from low-level message broker clients to higher-level streaming frameworks. The ecosystem includes production-grade tools for building streaming applications.
Karafka Framework
Karafka provides a Ruby framework for processing Apache Kafka streams. It handles consumer group management, message deserialization (JSON by default), and error handling, and integrates with Rails applications.
# Karafka consumer for processing user events.
# Assumes a shared Redis client, e.g. REDIS = Redis.new(url: ENV['REDIS_URL'])
# (Redis.current was removed in redis-rb 5).
class UserEventsConsumer < Karafka::BaseConsumer
  def consume
    messages.each do |message|
      # Karafka's default deserializer already parses the JSON payload
      process_event(message.payload)
    end
  end

  private

  def process_event(event)
    case event['type']
    when 'user_signup'
      handle_signup(event)
    when 'user_login'
      handle_login(event)
    else
      Karafka.logger.warn("Unknown event type: #{event['type']}")
    end
  end

  def handle_signup(event)
    user_id = event['user_id']
    # Stateful processing: track signups per hour (bucketed by processing time)
    current_hour = Time.now.strftime('%Y-%m-%d-%H')
    counter_key = "signups:#{current_hour}"
    REDIS.multi do |tx|
      tx.incr(counter_key)
      tx.expire(counter_key, 7200)
    end
    # Emit derived event through Karafka's WaterDrop producer
    Karafka.producer.produce_async(
      topic: 'user-analytics',
      payload: {
        type: 'signup_processed',
        user_id: user_id,
        timestamp: event['timestamp']
      }.to_json
    )
  end

  def handle_login(event)
    # Store session state for one hour
    session_key = "sessions:#{event['user_id']}"
    REDIS.setex(session_key, 3600, event['session_id'])
  end
end
Ruby-Kafka Client
The ruby-kafka gem provides low-level access to Kafka clusters. Applications build custom processing logic on top of the client library.
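require 'kafka'
require 'json'
require 'socket'
require 'time' # for Time#iso8601

class StreamProcessor
  DELIVERY_INTERVAL = 5 # seconds between producer flushes

  def initialize(brokers)
    @kafka = Kafka.new(brokers)
    @consumer = @kafka.consumer(group_id: 'processing-group')
    @producer = @kafka.producer
    @last_delivery = Time.now
  end

  def subscribe_and_process(topic)
    @consumer.subscribe(topic)
    @consumer.each_message do |message|
      process_message(message)
    end
  ensure
    # Runs after shutdown stops the loop: flush whatever is still buffered
    @producer.deliver_messages
    @producer.shutdown
  end

  # Public so it can be called from a signal handler; ends each_message
  def shutdown
    @consumer.stop
  end

  private

  def process_message(message)
    data = JSON.parse(message.value)
    # Transform data
    result = transform(data)
    # Write to output topic with same partition key
    @producer.produce(
      result.to_json,
      topic: 'processed-events',
      partition_key: message.key
    )
    # Deliver at most once per interval. Messages still buffered are lost
    # if the process crashes, even though their input offsets may already
    # be committed.
    return if Time.now - @last_delivery < DELIVERY_INTERVAL

    @producer.deliver_messages
    @last_delivery = Time.now
  end

  def transform(data)
    data.merge(
      processed_at: Time.now.iso8601,
      processor_id: Socket.gethostname
    )
  end
end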
Windowing Implementation
Ruby applications implement windowing logic explicitly since most Ruby message broker clients don't provide built-in windowing operators.
class TumblingWindowProcessor
  def initialize(window_size_seconds, &sink)
    @window_size = window_size_seconds
    @windows = Hash.new { |h, k| h[k] = [] }
    @window_mutex = Mutex.new
    @sink = sink # receives one aggregate per closed window
  end

  def add_event(event)
    window_key = window_for_event(event)
    @window_mutex.synchronize do
      @windows[window_key] << event
    end
  end

  # Call periodically (e.g. from a timer thread). Emits every window that
  # ended before the current wall-clock window. Completeness is judged by
  # processing time; tolerating late events would require a watermark.
  def flush_complete_windows(now = Time.now)
    current_window = window_start(now.to_i)
    closed = @window_mutex.synchronize do
      keys = @windows.keys.select { |key| key < current_window }.sort
      keys.map { |key| [key, @windows.delete(key)] }
    end
    closed.each { |key, events| emit_window(key, events) }
  end

  private

  def window_start(timestamp)
    (timestamp / @window_size) * @window_size
  end

  def window_for_event(event)
    window_start(event[:event_time].to_i)
  end

  def emit_window(window_key, events)
    # Compute aggregation
    @sink.call({
      window_start: Time.at(window_key),
      window_end: Time.at(window_key + @window_size),
      event_count: events.size,
      total_value: events.sum { |e| e[:value] }
    })
  end
end

# Usage: 60-second windows, printing each aggregate
processor = TumblingWindowProcessor.new(60) { |result| p result }
# Process events
processor.add_event({ event_time: Time.now - 120, value: 100 })
processor.add_event({ event_time: Time.now - 90, value: 150 })
processor.add_event({ event_time: Time.now - 30, value: 75 })
processor.flush_complete_windows
Session Window Implementation
Session windows group events separated by inactivity gaps. This requires tracking timeout periods per key.
class SessionWindowProcessor
  def initialize(timeout_seconds, &sink)
    @timeout = timeout_seconds
    @sessions = {}
    @mutex = Mutex.new
    @sink = sink # receives each closed session
  end

  def process_event(event)
    key = event[:key]
    @mutex.synchronize do
      session = @sessions[key]
      if session && event[:event_time] - session[:last_event_time] <= @timeout
        # Gap is within the timeout: extend the existing session
        session[:events] << event
        session[:last_event_time] = [session[:last_event_time], event[:event_time]].max
      else
        # No open session, or the gap exceeded the timeout: start a new one
        close_session(key) if session
        @sessions[key] = new_session(event)
      end
    end
  end

  # Call periodically (e.g. from a timer thread) to close idle sessions.
  # Compares wall-clock time with event time, so this assumes the two
  # stay roughly aligned.
  def check_timeouts(now = Time.now)
    @mutex.synchronize do
      expired = @sessions.select { |_, s| now - s[:last_event_time] > @timeout }
      expired.each_key { |key| close_session(key) }
    end
  end

  private

  def new_session(event)
    {
      key: event[:key],
      events: [event],
      start_time: event[:event_time],
      last_event_time: event[:event_time]
    }
  end

  def close_session(key)
    session = @sessions.delete(key)
    return unless session

    @sink.call({
      key: key,
      duration: session[:last_event_time] - session[:start_time],
      event_count: session[:events].size,
      events: session[:events]
    })
  end
end
Stateful Processing with Redis
Redis provides distributed state storage for Ruby stream processors, supporting operations like counters, sets, and sorted sets with atomic operations.
require 'redis'
require 'securerandom'

class StatefulStreamProcessor
  ALERT_THRESHOLD = 100 # clicks per hour

  def initialize(redis, &on_alert)
    @redis = redis
    @on_alert = on_alert # called with user_id for highly active users
  end

  def process_click_event(event)
    user_id = event[:user_id]
    page = event[:page]
    timestamp = event[:timestamp].to_i
    recent_key = "user:#{user_id}:recent_pages"
    hourly_key = "user:#{user_id}:hourly_clicks"
    hour_ago = timestamp - 3600

    # Pipeline the writes: one network round trip instead of seven
    @redis.pipelined do |pipeline|
      # Track unique visitors per page
      pipeline.sadd("page:#{page}:visitors", user_id)
      # Track click count per user
      pipeline.incr("user:#{user_id}:clicks")
      # Track recent pages in sorted set (score = timestamp), keep last 10
      pipeline.zadd(recent_key, timestamp, page)
      pipeline.zremrangebyrank(recent_key, 0, -11)
      # Sliding window counter: drop clicks older than an hour, add this one
      pipeline.zremrangebyscore(hourly_key, '-inf', hour_ago)
      pipeline.zadd(hourly_key, timestamp, SecureRandom.uuid)
      pipeline.expire(hourly_key, 3600)
    end

    # Check if user is highly active
    hourly_clicks = @redis.zcard(hourly_key)
    @on_alert&.call(user_id) if hourly_clicks > ALERT_THRESHOLD
  end

  def get_user_stats(user_id)
    {
      total_clicks: @redis.get("user:#{user_id}:clicks").to_i,
      hourly_clicks: @redis.zcard("user:#{user_id}:hourly_clicks"),
      recent_pages: @redis.zrevrange("user:#{user_id}:recent_pages", 0, -1)
    }
  end
end
Design Considerations
Stream vs Batch Processing
Choose stream processing when results must be available within seconds or minutes of event occurrence. Applications requiring real-time dashboards, fraud detection, or immediate alerting benefit from streaming. Batch processing suits analytical workloads where results can wait hours or days, such as daily reports or monthly aggregations.
Stream processing introduces complexity in state management, fault tolerance, and testing compared to batch processing. Batch systems process complete datasets with well-defined boundaries, simplifying reasoning about correctness. Streaming systems must handle continuously arriving data, out-of-order events, and unbounded state.
Cost considerations differ between approaches. Stream processing requires continuously running infrastructure, incurring constant operational costs. Batch processing can use ephemeral clusters, paying only during execution. However, stream processing may reduce overall costs by enabling faster business decisions or preventing issues through early detection.
Message Ordering Guarantees
Maintaining event order impacts system design. Kafka preserves order within partitions but not across partitions. Processing events from multiple partitions in parallel breaks global ordering. Applications requiring strict ordering must either use a single partition (limiting throughput) or implement application-level coordination.
Many use cases tolerate relaxed ordering. Processing user events from different users doesn't require cross-user ordering. Processing events from a single user requires ordering within that user's event stream. Partition keys group related events into the same partition, maintaining relevant ordering while allowing parallel processing.
# Partition key determines ordering scope
require 'json'

class EventProducer
  def initialize(producer)
    @producer = producer # e.g. a ruby-kafka producer
  end

  def send_user_event(user_id, event_data)
    # All events for the same user go to the same partition
    @producer.produce(
      event_data.to_json,
      topic: 'user-events',
      partition_key: user_id.to_s
    )
  end

  def send_global_event(event_data)
    # No partition key: the partitioner picks a random partition
    @producer.produce(
      event_data.to_json,
      topic: 'system-events'
    )
  end
end
State Management Strategies
Local state stored in process memory provides fast access but doesn't survive restarts. Applications must rebuild state from source topics on startup. External state stores like Redis or PostgreSQL survive restarts but introduce network latency and additional failure modes.
State size determines viable approaches. Small state fits in memory with periodic snapshotting to durable storage. Large state requires external storage or distributed state backends. State growth rate affects long-term viability; unbounded state requires compaction or time-based expiration.
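Time-based expiration keeps state proportional to the number of recently active keys rather than every key ever seen. This is a minimal in-memory sketch, assuming the caller supplies timestamps (event time or processing time) and calls `expire` periodically; `ExpiringState` is a hypothetical name.

```ruby
# Per-key state that is evicted after `ttl_seconds` without updates.
class ExpiringState
  def initialize(ttl_seconds)
    @ttl = ttl_seconds
    @entries = {} # key => [value, last_update_time]
  end

  # Replace the value for `key` with the block's result, given the old value.
  def update(key, now)
    value, _ = @entries[key]
    @entries[key] = [yield(value), now]
  end

  def get(key)
    entry = @entries[key]
    entry && entry.first
  end

  # Drop entries not updated within the TTL.
  def expire(now)
    @entries.delete_if { |_, (_, updated_at)| now - updated_at > @ttl }
  end

  def size
    @entries.size
  end
end

state = ExpiringState.new(60)
state.update('alice', 0) { |count| (count || 0) + 1 }
state.update('bob', 30) { |count| (count || 0) + 1 }
state.update('alice', 50) { |count| (count || 0) + 1 }
state.expire(100)    # bob idle 70s > 60s: removed; alice idle 50s: kept
p state.get('alice') # => 2
p state.get('bob')   # => nil
```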
Scaling Considerations
Stream processing scales horizontally by partitioning topics and distributing partitions across consumer instances. Each partition has a single active consumer in a consumer group. Adding consumers increases parallelism up to the partition count. Systems requiring more parallelism need more partitions, but partition count impacts broker performance and coordination overhead.
Stateful processing complicates scaling. State partitioning must align with message partitioning. Redistributing partitions during scaling requires migrating state between instances. Some frameworks handle this automatically; others require manual state migration.
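The cap that partition count places on parallelism is easy to see with a simplified round-robin assignment. This sketch is not Kafka's actual assignor (Kafka ships range, round-robin, and sticky variants that differ in detail); `assign` is a hypothetical helper.

```ruby
# Round-robin assignment of partitions to the consumers of one group.
# Each partition gets exactly one consumer; a consumer may get several.
def assign(partitions, consumers)
  assignment = consumers.to_h { |c| [c, []] }
  partitions.each_with_index do |partition, i|
    assignment[consumers[i % consumers.size]] << partition
  end
  assignment
end

p assign((0...6).to_a, %w[c1 c2 c3])
# c1 => [0, 3], c2 => [1, 4], c3 => [2, 5]
p assign((0...2).to_a, %w[c1 c2 c3])
# c1 => [0], c2 => [1], c3 => [] (idle: parallelism is capped at the partition count)
```

For stateful processing, every partition that moves between consumers during a rebalance must take its state with it.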
Common Patterns
Event Sourcing
Event sourcing stores all state changes as immutable events. The current state derives from replaying events from the beginning. This pattern provides complete audit history and enables temporal queries. Stream processing naturally aligns with event sourcing by processing the event log.
class EventSourcedAccount
  attr_reader :balance

  def initialize(account_id, event_stream)
    @account_id = account_id
    @balance = 0
    @event_stream = event_stream
    rebuild_state
  end

  def debit(amount)
    record('account_debited', amount)
  end

  def credit(amount)
    record('account_credited', amount)
  end

  private

  # Persist the event first, then apply it to in-memory state
  def record(type, amount)
    event = {
      type: type,
      account_id: @account_id,
      amount: amount,
      timestamp: Time.now.to_i
    }
    @event_stream.append(event)
    apply_event(event)
  end

  # Current state is a replay of every stored event for this account
  def rebuild_state
    @event_stream.read(@account_id).each do |event|
      apply_event(event)
    end
  end

  def apply_event(event)
    case event[:type]
    when 'account_debited'
      @balance -= event[:amount]
    when 'account_credited'
      @balance += event[:amount]
    end
  end
end
CQRS (Command Query Responsibility Segregation)
CQRS separates write operations (commands) from read operations (queries). Commands modify state by appending events. Stream processors build read-optimized views from events. This pattern enables independent scaling of writes and reads, with specialized data models for each.
Stream processors maintain materialized views by consuming events and updating query databases. Multiple views serve different query patterns. View updates occur asynchronously from command processing, introducing eventual consistency.
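A read-side projection can be sketched as a class that applies events to a structure shaped for one query. This sketch assumes a single ordered input partition (so offsets increase monotonically) and an in-memory hash in place of a query database; the event types and `CustomerOrdersView` are hypothetical.

```ruby
# Maintains a view shaped for one query: order count and total per customer.
class CustomerOrdersView
  EMPTY = { order_count: 0, total_cents: 0 }.freeze

  def initialize
    @rows = Hash.new { |h, id| h[id] = EMPTY.dup }
    @last_offset = -1
  end

  # Skipping offsets already applied makes redelivered events harmless.
  def apply(offset, event)
    return if offset <= @last_offset

    @last_offset = offset
    row = @rows[event[:customer_id]]
    case event[:type]
    when 'order_placed'
      row[:order_count] += 1
      row[:total_cents] += event[:amount_cents]
    when 'order_cancelled'
      row[:order_count] -= 1
      row[:total_cents] -= event[:amount_cents]
    end
  end

  # Query side: a plain lookup, no joins or recomputation.
  def for_customer(id)
    @rows.fetch(id, EMPTY)
  end
end

view = CustomerOrdersView.new
view.apply(0, { type: 'order_placed', customer_id: 7, amount_cents: 1_500 })
view.apply(1, { type: 'order_placed', customer_id: 7, amount_cents: 500 })
view.apply(1, { type: 'order_placed', customer_id: 7, amount_cents: 500 }) # redelivery, ignored
view.apply(2, { type: 'order_cancelled', customer_id: 7, amount_cents: 500 })
p view.for_customer(7) # order_count 1, total_cents 1500
```

Until the projection has consumed an event, queries return the older state; that lag is the eventual consistency described above.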
Change Data Capture
CDC streams database changes as events, transforming databases into event sources. Applications consume database change events for cache invalidation, search index updates, or cross-system synchronization. CDC decouples data producers from consumers without modifying application code.
# SearchIndex, Cache, and produce_event stand in for application services
class CDCProcessor
  def process_change_event(event)
    table = event[:table]
    operation = event[:operation]
    data = event[:data]
    case table
    when 'users'
      handle_user_change(operation, data)
    when 'orders'
      handle_order_change(operation, data)
    end # changes to other tables are ignored
  end

  private

  def handle_user_change(operation, data)
    user_id = data['id']
    case operation
    when 'INSERT', 'UPDATE'
      # Update search index
      SearchIndex.upsert(user_id, data)
      # Invalidate cache
      Cache.delete("user:#{user_id}")
      # Emit to analytics
      produce_event('user-updated', data)
    when 'DELETE'
      SearchIndex.delete(user_id)
      Cache.delete("user:#{user_id}")
    end
  end

  def handle_order_change(operation, data)
    # Order-specific handling omitted; follows the same shape as above
  end
end
Stream Joins
Joins combine related events from multiple streams. Stream-to-stream joins require windowing to bound the join. Stream-to-table joins enrich events with reference data. Temporal joins consider event time alignment.
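class StreamJoiner
  def initialize(window_size, &sink)
    @window_size = window_size
    @left_buffer = Hash.new { |h, k| h[k] = [] }
    @right_buffer = Hash.new { |h, k| h[k] = [] }
    @sink = sink # receives each joined pair
  end

  def add_left(event)
    add(event, @left_buffer, @right_buffer) { |right| { left: event, right: right } }
  end

  def add_right(event)
    add(event, @right_buffer, @left_buffer) { |left| { left: left, right: event } }
  end

  private

  # Buffer the event, join it with every buffered event on the other side
  # whose event time is within the window, then evict expired events.
  # Buffers hold lists, so several events per key can match.
  def add(event, own, other, &build_pair)
    key = event[:join_key]
    own[key] << event
    other[key].each do |candidate|
      next if (event[:event_time] - candidate[:event_time]).abs > @window_size

      @sink.call(build_pair.call(candidate).merge(join_key: key))
    end
    cleanup_expired(key, event[:event_time])
  end

  # Drop events too old to match anything newer than `now`
  # (assumes events arrive roughly in event-time order)
  def cleanup_expired(key, now)
    [@left_buffer, @right_buffer].each do |buffer|
      buffer[key].reject! { |e| now - e[:event_time] > @window_size }
      buffer.delete(key) if buffer[key].empty?
    end
  end
end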
Saga Pattern
Sagas coordinate multi-step transactions across services using compensating actions. Each step emits events. Failures trigger compensating events to undo completed steps. Stream processors orchestrate saga execution by consuming step events and emitting next steps or compensations.
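The compensation logic can be sketched in isolation. In a streaming deployment each action would emit a command event and the orchestrator would wait for the reply event; this sketch assumes direct in-process calls instead so it stays self-contained. `Step`, `run_saga`, and the step names are hypothetical.

```ruby
# Each step pairs an action with a compensation. On failure, completed
# steps are compensated in reverse order.
Step = Struct.new(:name, :action, :compensation)

def run_saga(steps, log)
  completed = []
  steps.each do |step|
    step.action.call
    log << "#{step.name}: done"
    completed << step
  rescue StandardError => e
    log << "#{step.name}: failed (#{e.message})"
    completed.reverse_each do |done|
      done.compensation.call
      log << "#{done.name}: compensated"
    end
    return false
  end
  true
end

log = []
steps = [
  Step.new('reserve_stock', -> {}, -> {}),
  Step.new('charge_card', -> {}, -> {}),
  Step.new('schedule_shipping', -> { raise 'no courier available' }, -> {})
]
p run_saga(steps, log) # => false
puts log
# reserve_stock: done
# charge_card: done
# schedule_shipping: failed (no courier available)
# charge_card: compensated
# reserve_stock: compensated
```

Compensations must themselves be safe to retry, since a crash midway through rollback leaves the orchestrator to resume it.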
Performance Considerations
Throughput Optimization
Stream processing throughput depends on several factors. Batch size affects throughput: larger batches amortize per-message overhead. Network round trips dominate latency for small messages; batching reduces round trip count. Producer batching accumulates messages before sending. Consumer prefetching retrieves multiple messages per request.
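class OptimizedProducer
  def initialize(kafka)
    # delivery_interval is an async_producer option; a plain producer only
    # sends when deliver_messages is called
    @producer = kafka.async_producer(
      max_buffer_size: 10_000,         # Buffer up to 10k messages
      max_buffer_bytesize: 10_000_000, # Or 10MB
      delivery_interval: 5             # Background delivery every 5 seconds
    )
  end

  def send_event(event)
    @producer.produce(
      event.to_json,
      topic: 'events'
    )
    # Message queued for the background thread, not sent immediately
  end

  def flush
    @producer.deliver_messages # Trigger delivery of queued messages now
  end
end

class OptimizedConsumer
  def initialize(kafka)
    @consumer = kafka.consumer(
      group_id: 'processing-group',
      fetcher_max_queue_size: 50 # Prefetch up to 50 fetched batches
    )
  end

  def run(topic)
    @consumer.subscribe(topic)
    # max_wait_time is passed to the fetch loop, not the constructor
    @consumer.each_message(max_wait_time: 1) do |message| # Wait up to 1s per fetch
      yield message
    end
  end
end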
Latency Optimization
Reducing latency requires minimizing processing time per event and network overhead. Avoid blocking operations in hot paths. Batch database writes to reduce round trips. Use asynchronous I/O for network calls. Pipeline processing stages to overlap I/O and computation.
Local state provides microsecond access times compared to milliseconds for remote storage. Cache frequently accessed reference data in memory. Precompute derived values during idle periods. Partition state to avoid contention between parallel processors.
Memory Management
Unbounded buffering leads to out-of-memory errors under load. Implement bounded queues with backpressure signaling, and drop or sample events when buffers fill. Monitor memory usage and alert well before it approaches exhaustion.
Windowed operations accumulate events in memory. Window size and event rate determine memory requirements. Large windows or high event rates require memory-efficient data structures or external storage. Implement window eviction policies to bound memory usage.
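class MemoryBoundedProcessor
  attr_reader :dropped_count

  def initialize(max_buffer_size, &handler)
    @buffer = []
    @max_size = max_buffer_size
    @dropped_count = 0
    @mutex = Mutex.new
    @handler = handler # processes one event
  end

  # Returns false (and counts a drop) when the buffer is full
  def add_event(event)
    @mutex.synchronize do
      if @buffer.size >= @max_size
        @dropped_count += 1 # export as an events.dropped metric
        return false
      end
      @buffer << event
    end
    true
  end

  def process_batch(max = 100)
    batch = @mutex.synchronize { @buffer.shift(max) } # always an Array
    batch.each { |event| @handler.call(event) }
  end
end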
Partition Balancing
Uneven partition distribution causes load imbalance. Hot partitions receive disproportionate traffic, limiting throughput to the slowest consumer. Choosing partition keys requires understanding data distribution. User IDs may distribute unevenly if some users generate far more events. Random partitioning distributes load evenly but breaks ordering guarantees.
Monitor partition lag per consumer instance. Large lag variance indicates imbalance. Repartition data with more uniform keys. Use composite keys combining multiple attributes. Hash keys to distribute deterministically.
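Deterministic key hashing, composite keys, and salting a known hot key can be sketched together. This sketch assumes CRC32 modulo the partition count as the hash; ruby-kafka's default partitioner is also CRC32-based, while the Java client uses murmur2, so the same key can land on different partitions depending on the client. The helper names and the partition count of 12 are hypothetical.

```ruby
require 'zlib'

PARTITIONS = 12

# Deterministic key -> partition mapping.
def partition_for(key)
  Zlib.crc32(key) % PARTITIONS
end

# Composite key: spreads one tenant's traffic across partitions by user,
# while each user's events stay ordered within a single partition.
def composite_key(tenant_id, user_id)
  "#{tenant_id}:#{user_id}"
end

# Salting a known hot key spreads it over `buckets` partitions. This gives
# up ordering for that key, so consumers must merge or tolerate reordering.
def salted_key(key, sequence, buckets: 4)
  "#{key}##{sequence % buckets}"
end

p partition_for(composite_key('acme', 'u42')) # same value on every call
hot = (0...100).map { |i| partition_for(salted_key('celebrity_user', i)) }.uniq
p hot.size # distinct partitions now used by the hot key (at most 4)
```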
Compression
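Network bandwidth often limits throughput. Message compression reduces the bytes transmitted at the cost of CPU. Compression ratios depend on data characteristics: text formats such as JSON compress well, while already-compressed or encrypted binary payloads barely compress. Batch compression achieves better ratios than per-message compression because field names and values repeated across messages share one compression context.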
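The batch effect can be measured with Ruby's `Zlib`, using Deflate as a stand-in for the codecs a broker client offers. This sketch assumes 200 small JSON events with a hypothetical schema; the exact byte counts depend on the data, but the batched size is far smaller than the per-message total.

```ruby
require 'json'
require 'zlib'

# 200 similar JSON events, like one producer batch.
events = (1..200).map do |i|
  { user_id: i, action: 'page_view', page: "/products/#{i % 10}", ts: 1_700_000_000 + i }.to_json
end

raw_bytes = events.sum(&:bytesize)
per_message = events.sum { |e| Zlib::Deflate.deflate(e).bytesize }
batched = Zlib::Deflate.deflate(events.join("\n")).bytesize

puts "raw: #{raw_bytes}, per-message: #{per_message}, batched: #{batched}"
```

Compressing each small message alone barely helps because every message starts with an empty dictionary, whereas the batch repeats `"action":"page_view"` and similar fragments that Deflate encodes as short back-references.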
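require 'kafka'

kafka = Kafka.new(['localhost:9092']) # broker address is a placeholder

# Configure producer compression (:snappy requires the snappy gem)
producer = kafka.producer(
  compression_codec: :snappy, # Fast compression
  compression_threshold: 10   # Only compress message sets of 10+ messages
)
# Or use gzip for a better compression ratio
producer = kafka.producer(
  compression_codec: :gzip
)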
Tools & Ecosystem
Apache Kafka
Kafka provides distributed message streaming with high throughput and durability. Topics partition data across brokers. Consumer groups coordinate parallel processing. Kafka Connect integrates with external systems. Kafka Streams offers a stream processing DSL for JVM languages.
Ruby applications use the ruby-kafka gem for producer and consumer APIs. The Karafka framework provides Rails-like conventions for building Kafka applications. Deployment requires running a Kafka broker cluster, with KRaft for coordination (older releases use ZooKeeper).
RabbitMQ
RabbitMQ implements the AMQP 0-9-1 protocol for message queuing. Exchanges route messages to queues based on routing rules, and multiple binding patterns support various messaging topologies. RabbitMQ provides a management UI and plugins for monitoring.
The Bunny gem offers a Ruby client for RabbitMQ. RabbitMQ suits lower-throughput workloads compared to Kafka but provides more flexible routing. RabbitMQ queues delete messages once consumers acknowledge them; Kafka retains messages for a configurable period regardless of consumption.
Redis Streams
Redis Streams adds stream processing primitives to Redis. Consumer groups enable parallel processing. Commands support range queries, blocking reads, and automatic ID generation. Redis Streams suits applications already using Redis for caching or state storage.
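require 'redis'

redis = Redis.new
# Add to stream (entry ID is generated automatically)
redis.xadd('events', { user: 'alice', action: 'login' })
# Read up to 10 entries from the start of the stream
messages = redis.xread('events', '0', count: 10)
# Consumer group (creation fails with BUSYGROUP if it already exists)
begin
  redis.xgroup(:create, 'events', 'processors', '0')
rescue Redis::CommandError => e
  raise unless e.message.include?('BUSYGROUP')
end
# Read entries not yet delivered to this group, then acknowledge each one
entries = redis.xreadgroup('processors', 'consumer1', 'events', '>', count: 10)
entries.fetch('events', []).each do |id, fields|
  puts fields # process the entry
  redis.xack('events', 'processors', id)
end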
AWS Kinesis
Kinesis provides a managed streaming service on AWS. Kinesis Data Streams stores records in shards. Amazon Data Firehose (formerly Kinesis Data Firehose) delivers streams to destinations such as S3, Redshift, and OpenSearch. Kinesis Data Analytics, since renamed Amazon Managed Service for Apache Flink, runs Flink applications (and originally SQL queries) on streams.
Ruby applications use the aws-sdk-kinesis gem. Kinesis handles infrastructure management and scaling. Costs increase with throughput. Kinesis suits AWS-native applications requiring managed services.
Google Cloud Pub/Sub
Pub/Sub offers managed messaging on Google Cloud. Topics publish messages, subscriptions consume messages. Push subscriptions deliver messages via HTTP webhooks. Pull subscriptions let consumers poll for messages.
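require 'google/cloud/pubsub'
require 'json'

# Uses the google-cloud-pubsub 2.x API
pubsub = Google::Cloud::Pubsub.new
# Publish
topic = pubsub.topic('events')
topic.publish({ user: 'bob' }.to_json)
# Subscribe: listen returns a subscriber that must be started
subscription = pubsub.subscription('processor')
subscriber = subscription.listen do |message|
  process(message.data) # application-specific handler
  message.acknowledge!
end
subscriber.start
sleep # keep the process alive while messages arrive on background threads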
Monitoring Tools
Prometheus collects metrics from stream processors. Export lag, throughput, error rates, and processing latency. Kafka exporters expose broker and consumer metrics. Custom exporters expose application-specific metrics.
Grafana visualizes metrics with dashboards. Create alerts for high lag, dropped messages, or processing errors. Track trends over time to identify degradation.
OpenTelemetry instruments applications for distributed tracing. Trace event flow through processing pipeline. Identify bottlenecks and failure points. Export traces to Jaeger or Zipkin.
Reference
Core Concepts
| Concept | Description | Usage |
|---|---|---|
| Event Time | Timestamp when event occurred in real world | Use for accurate temporal analysis |
| Processing Time | Timestamp when system processes event | Use for operational metrics |
| Watermark | Estimate of event time progress | Triggers window computations |
| Window | Finite chunk of infinite stream | Enables aggregation over time ranges |
| State | Information maintained across events | Supports stateful operations like joins |
| Partition | Unit of parallelism and ordering | Determines scaling and ordering guarantees |
| Consumer Group | Set of consumers processing topic | Coordinates parallel processing |
| Offset | Position in topic partition | Tracks processing progress |
| Backpressure | Downstream slower than upstream | Prevents buffer overflow |
| Exactly-Once | Each event affects results once | Strongest delivery guarantee |
Windowing Types
| Type | Behavior | Use Case |
|---|---|---|
| Tumbling | Fixed-size, non-overlapping intervals | Hourly aggregations |
| Sliding | Fixed-size, overlapping intervals | Moving averages |
| Session | Events grouped by inactivity gaps | User session analysis |
| Global | Single window for entire stream | Final aggregation |
Delivery Guarantees
| Guarantee | Behavior | Tradeoffs |
|---|---|---|
| At-Most-Once | May lose events on failure | Lowest latency, simplest |
| At-Least-Once | May duplicate events on failure | Moderate complexity |
| Exactly-Once | Each event affects results once | Highest latency, most complex |
Kafka Configuration Parameters
| Parameter | Purpose | Typical Values |
|---|---|---|
| batch.size | Producer batch size in bytes | 16384 to 1048576 |
| linger.ms | Producer batch wait time | 0 to 100 |
| fetch.min.bytes | Consumer minimum fetch size | 1 to 1048576 |
| max.poll.records | Records per consumer poll | 100 to 1000 |
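| session.timeout.ms | Consumer heartbeat timeout | 10000 to 45000 (45000 default since Kafka 3.0) |
| enable.idempotence | Idempotent producer: no duplicates from retries within a partition | true (default since Kafka 3.0); exactly-once also requires transactions |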
| isolation.level | Consumer transaction isolation | read_uncommitted or read_committed |
Ruby Gem Comparison
| Gem | Purpose | Strengths |
|---|---|---|
| ruby-kafka | Kafka client library | Full protocol support, production-ready |
| karafka | Kafka application framework | Rails-like conventions, routing DSL |
| bunny | RabbitMQ client | AMQP support, flexible routing |
| aws-sdk-kinesis | AWS Kinesis client | AWS integration, managed service |
| google-cloud-pubsub | Google Pub/Sub client | GCP integration, push subscriptions |
Performance Metrics
| Metric | Measures | Target Range |
|---|---|---|
| Throughput | Events processed per second | Application-specific |
| Latency | End-to-end processing time | < 100ms for real-time |
| Lag | Unconsumed message count | < 1000 for low latency |
| Error Rate | Fraction of events that fail processing | < 0.1% of throughput |
| CPU Usage | Processor utilization | 60-80% sustained |
| Memory Usage | Process memory consumption | < 80% of available |
Common Operations
| Operation | Implementation | Complexity |
|---|---|---|
| Filter | Evaluate predicate per event | O(1) per event |
| Map | Transform each event | O(1) per event |
| FlatMap | Transform each event into zero or more events | O(k) per event for k outputs |
| Aggregate | Accumulate values in window | O(1) per event incrementally; O(w) to recompute a window of w events |
| Join | Combine streams by key | O(w1 + w2) for window sizes |
| Deduplicate | Remove duplicate events | O(1) per event with a hash set; O(n) memory for n remembered IDs |