CrackedRuby CrackedRuby

Overview

Kappa Architecture represents a data processing architecture pattern that addresses the operational complexity of Lambda Architecture by proposing a single processing path. Jay Kreps introduced this approach in 2014 as a response to the dual-layer complexity of maintaining both batch and stream processing systems. The architecture centers on treating all data as streams and storing events in an immutable, replayable log.

The fundamental premise replaces Lambda Architecture's batch and speed layers with a unified stream processing layer. All data flows through a distributed log system where events are stored in order of arrival. This log serves as the source of truth, and all processing happens through stream processors that read from this log. When reprocessing becomes necessary due to code changes or error correction, the system creates a new stream processor instance that reads from the beginning of the log while the current processor continues serving live traffic.

Traditional database architectures model data as mutable state that changes through updates and deletes. Kappa Architecture inverts this model by treating the log of changes as primary and derived views as secondary. Applications generate events that append to the log, and stream processors transform these events into materialized views optimized for queries. The immutable log preserves complete history, enabling time-travel queries and reproducible data processing.

Consider an e-commerce analytics system. In Lambda Architecture, raw purchase events would feed into both a batch system that processes daily summaries and a stream processor handling real-time updates. Kappa Architecture eliminates this duplication by processing all events through a stream processor that maintains running totals. When the calculation logic changes, a new processor version starts reading from the log's beginning, computes updated totals in parallel with the live processor, and switches over once caught up.

# Kappa Architecture event flow
class OrderEvent
  attr_reader :order_id, :amount, :timestamp
  
  def initialize(order_id:, amount:, timestamp:)
    @order_id = order_id
    @amount = amount
    @timestamp = timestamp
  end
end

# Events append to immutable log
event_log = EventLog.new('orders')
event_log.append(OrderEvent.new(
  order_id: 'ord-123',
  amount: 99.99,
  timestamp: Time.now
))

The architecture proves particularly suitable for event-driven systems, real-time analytics, and applications where data lineage and reprocessing capabilities matter. Organizations processing continuous data streams benefit from the simplified operational model and reduced code duplication.

Key Principles

Kappa Architecture builds on three foundational principles that distinguish it from traditional batch-oriented and hybrid architectures.

Immutable Event Log as Source of Truth

All data exists as a sequence of immutable events stored in order of occurrence. Applications generate events representing state changes, user actions, or system observations. These events append to a distributed log that preserves them indefinitely or according to retention policies. The log functions as the authoritative record, and no process modifies events after writing. This immutability guarantees that replaying events produces identical results given the same processing logic.

class ImmutableLog
  def initialize(topic)
    @topic = topic
    @kafka = Kafka.new(['localhost:9092'])
    @producer = @kafka.producer
  end
  
  def append(event)
    @producer.produce(
      event.to_json,
      topic: @topic,
      partition_key: event.id
    )
    @producer.deliver_messages
  end
  
  # Events never deleted or modified, only added
  def read_from(offset)
    consumer = @kafka.consumer(group_id: 'processor')
    consumer.subscribe(@topic)
    consumer.seek(@topic, 0, offset)
    
    consumer.each_message do |message|
      yield JSON.parse(message.value)
    end
  end
end

Single Stream Processing Path

All computation happens through stream processors that read from the event log and produce derived views or trigger actions. The architecture eliminates separate batch and speed layers, unifying data processing in a single model. Stream processors consume events continuously, maintaining state that updates with each new event. This unified approach removes the complexity of reconciling results from different processing systems and eliminates code duplication between batch and streaming implementations.

Stream processors can be parallelized by partitioning the input log, with each processor instance handling a subset of partitions. This partitioning enables horizontal scaling while maintaining ordered processing within partitions. Processors checkpoint their progress, allowing recovery from failures by resuming from the last committed offset.

class StreamProcessor
  def initialize(log, view_store)
    @log = log
    @view_store = view_store
    @state = {}
  end
  
  def process
    @log.read_from(last_checkpoint) do |event|
      case event['type']
      when 'order_placed'
        update_revenue_view(event)
      when 'order_cancelled'
        adjust_revenue_view(event)
      end
      
      checkpoint(event['offset'])
    end
  end
  
  private
  
  def update_revenue_view(event)
    date = Date.parse(event['timestamp'])
    current = @view_store.get("revenue:#{date}") || 0
    @view_store.set("revenue:#{date}", current + event['amount'])
  end
end

Reprocessing Through Log Replay

Changes to processing logic or corrections of errors require recomputing derived views. Rather than implementing separate backfill mechanisms, Kappa Architecture handles reprocessing by starting a new processor instance that reads from the log's beginning. This new processor runs in parallel with the current production processor, computing results from scratch. Once the new processor catches up to the log's current end, the application switches to using its output, and the old processor shuts down.

This approach guarantees that reprocessed results match what would have been produced if the new logic had been running from the start. The technique works because the log preserves the complete event history, and processing logic operates deterministically on this history.

class ReprocessingCoordinator
  def reprocess_with_new_logic(processor_version)
    new_processor = StreamProcessor.new(
      @log,
      view_store: create_temporary_store(processor_version)
    )
    
    # Start from beginning of log
    reprocessing_task = Thread.new do
      new_processor.process_from_offset(0)
    end
    
    # Wait for catch-up
    loop do
      break if new_processor.current_offset >= @log.latest_offset - 1000
      sleep(5)
    end
    
    # Switch to new view
    @view_store.atomically do
      promote_temporary_store(processor_version)
    end
    
    @current_processor.shutdown
    @current_processor = new_processor
  end
end

The log functions as both the source of input events and the mechanism for state recovery. This dual role simplifies the architecture by eliminating the need for separate snapshot storage and restoration logic. Processors can recreate any derived view by replaying the relevant portion of the log.

State management becomes a function of event processing. Processors maintain local state that updates with each event, but this state remains ephemeral and disposable. Losing processor state does not cause data loss because replaying from the log reconstructs the state. This property enables stateless or state-light processor designs that simplify scaling and recovery.

The architecture supports multiple simultaneous views of the same data. Different processors can read the same event log and produce different materialized views optimized for specific query patterns. An analytics system might maintain time-series aggregations, user-level summaries, and product-level metrics from a single stream of transaction events.

Design Considerations

Selecting Kappa Architecture involves evaluating trade-offs between operational simplicity and specific processing requirements.

When Kappa Architecture Fits

Stream-first workloads where data arrives continuously and requires near-real-time processing benefit most from Kappa Architecture. Systems processing user interactions, sensor telemetry, application logs, or financial transactions align well with the streaming model. The architecture excels when queries primarily access recent data or time-windowed aggregations rather than requiring full historical scans.

Event-driven systems that model state changes as domain events match Kappa's event log naturally. Applications implementing event sourcing or CQRS patterns find the immutable log provides the event store and reprocessing capability these patterns require. The single processing path reduces complexity compared to maintaining separate batch and streaming implementations.

Organizations prioritizing operational simplicity over maximum flexibility gain from Kappa's unified model. Teams avoid the coordination overhead of keeping batch and streaming code synchronized and the infrastructure costs of running parallel processing systems. The reduced surface area for bugs and the simplified deployment model accelerate development velocity.

# Good fit: Real-time analytics on streaming data
class UserActivityAnalytics
  def initialize
    @activity_log = ImmutableLog.new('user_activity')
    @metrics_store = Redis.new
  end
  
  def process_activity_stream
    StreamProcessor.new(@activity_log, @metrics_store).process do |event|
      increment_metric("user:#{event['user_id']}:actions")
      increment_metric("feature:#{event['feature']}:usage")
      update_last_seen(event['user_id'], event['timestamp'])
    end
  end
end

When Lambda Architecture Remains Preferable

Complex analytical queries spanning full historical datasets often require batch processing's efficiency. Stream processors excel at incremental computation but struggle with queries demanding joins across large datasets or complex aggregations over years of history. Lambda Architecture's batch layer handles these cases more efficiently by leveraging distributed batch processing frameworks.

Use cases requiring exact results rather than approximate streaming answers favor Lambda's batch layer. While stream processors can maintain accurate counts and sums for bounded windows, computing exact distinct counts or complex percentiles across unbounded data proves challenging. Batch jobs that process complete datasets produce precise results without the approximations streaming systems sometimes require.

Existing infrastructure investments in batch processing systems may justify Lambda Architecture. Organizations with mature Hadoop or Spark deployments and teams experienced in batch processing might find extending these systems more practical than migrating to pure streaming. The transition costs and learning curve for stream processing could outweigh Kappa's operational benefits.

Log Retention and Storage Costs

Kappa Architecture's reprocessing capability depends on retaining the complete event log. Storage costs scale with retention period and event volume. High-velocity systems generating millions of events per second accumulate petabytes of log data over months. Organizations must balance reprocessing flexibility against storage expenses.

Tiered storage strategies help manage costs by moving older log segments to cheaper storage tiers. Recent data remains on fast SSDs for active processing, while historical segments migrate to object storage or cold storage. Reprocessing jobs read from appropriate tiers based on the data range needed.

class TieredLogStorage
  def initialize
    @hot_storage = KafkaCluster.new('hot', retention: 7.days)
    @warm_storage = S3Bucket.new('warm', retention: 90.days)
    @cold_storage = Glacier.new('cold')
  end
  
  def archive_old_segments
    @hot_storage.segments_older_than(7.days).each do |segment|
      @warm_storage.upload(segment)
      segment.delete if @warm_storage.verify(segment)
    end
    
    @warm_storage.segments_older_than(90.days).each do |segment|
      @cold_storage.archive(segment)
      segment.delete if @cold_storage.verify(segment)
    end
  end
  
  def reprocess_from_date(start_date)
    # Route reads to appropriate tier
    tier = storage_tier_for_date(start_date)
    tier.read_from(start_date)
  end
end

Compaction policies reduce storage by consolidating events that supersede each other. For entity-oriented data where later events replace earlier versions, log compaction retains only the most recent state for each entity. This technique preserves the ability to reconstruct current state while discarding intermediate history.

Schema Evolution and Compatibility

Event schemas evolve as applications change. Adding fields to events, changing data types, or restructuring nested objects impacts processors reading from the log. Schema compatibility strategies determine how processors handle events written with different schema versions.

Forward compatibility allows old processors to read new events by ignoring unknown fields. Backward compatibility lets new processors read old events by providing defaults for missing fields. Kappa Architecture processors typically require both forward and backward compatibility to support gradual rollouts and reprocessing.

class EventDeserializer
  def deserialize(event_data, expected_version)
    parsed = JSON.parse(event_data)
    schema_version = parsed['schema_version'] || 1
    
    case schema_version
    when 1
      # Old schema: amount as integer cents
      {
        order_id: parsed['order_id'],
        amount: parsed['amount'] / 100.0,
        currency: 'USD' # Default for v1
      }
    when 2
      # New schema: amount as decimal with currency
      {
        order_id: parsed['order_id'],
        amount: parsed['amount'],
        currency: parsed['currency']
      }
    end
  end
end

Schema registries provide centralized schema management and validation. Producers register schemas before writing events, and consumers fetch schemas when reading. This centralization enables schema evolution policies and compatibility checking. The registry validates that schema changes maintain compatibility, preventing breaking changes.

State Size and Partitioning

Stream processors maintaining state face memory constraints as state grows. Processors holding aggregations, joined views, or entity caches must fit state in memory or spill to disk. Large state impacts performance and recovery time after failures.

Partitioning distributes state across multiple processor instances. Each instance handles a subset of keys, maintaining only the state for its partition. Proper partition key selection ensures even distribution and maintains processing guarantees. Keys representing entities should hash to consistent partitions to keep related state together.

class PartitionedProcessor
  def initialize(partition_id, total_partitions)
    @partition_id = partition_id
    @total_partitions = total_partitions
    @local_state = {}
  end
  
  def process_event(event)
    # Only process if event belongs to this partition
    event_partition = hash_key(event.key) % @total_partitions
    return unless event_partition == @partition_id
    
    update_local_state(event)
  end
  
  def handles_key?(key)
    hash_key(key) % @total_partitions == @partition_id
  end
  
  private
  
  def hash_key(key)
    Digest::MD5.hexdigest(key).to_i(16)
  end
end

External state stores supplement in-memory state for large datasets. Processors store state in key-value databases like RocksDB or Redis, accessing only the needed subset during processing. This approach trades some performance for the ability to handle state larger than available memory.

Implementation Approaches

Multiple strategies exist for implementing Kappa Architecture, each with different trade-offs around complexity, performance, and operational characteristics.

Log-Centric Implementation

The log-centric approach uses a distributed log system as the primary infrastructure component. Apache Kafka dominates this space, providing distributed, partitioned, replicated logs with configurable retention. Producers append events to topics, and consumers read from topics using consumer groups that coordinate partition assignment.

This implementation separates the log from processing logic. The log provides durability, ordering, and replayability as infrastructure services. Processing applications consume from the log using client libraries, maintaining independence from the log implementation. This separation enables upgrading processors without touching the log infrastructure and vice versa.

require 'ruby-kafka'

class KafkaKappaSystem
  def initialize(brokers)
    @kafka = Kafka.new(brokers)
    @topics = {}
  end
  
  def create_event_stream(name, partitions: 10, retention: 30.days)
    @kafka.create_topic(name, num_partitions: partitions, replication_factor: 3)
    @topics[name] = {
      producer: @kafka.async_producer(
        delivery_threshold: 100,
        delivery_interval: 1
      ),
      retention: retention
    }
  end
  
  def write_event(stream, event)
    @topics[stream][:producer].produce(
      event.to_json,
      topic: stream,
      partition_key: event.partition_key
    )
  end
  
  def process_stream(stream, group_id, &block)
    consumer = @kafka.consumer(group_id: group_id)
    consumer.subscribe(stream)
    
    consumer.each_message(automatically_mark_as_processed: false) do |message|
      event = JSON.parse(message.value, symbolize_names: true)
      block.call(event)
      consumer.mark_message_as_processed(message)
    end
  end
end

Multiple processing frameworks integrate with log systems. Stream processing frameworks like Apache Flink, Apache Samza, or Kafka Streams provide high-level abstractions for transforming and aggregating streams. These frameworks handle state management, fault tolerance, and exactly-once processing semantics. Simpler use cases might use direct consumer API calls without framework overhead.

Database Change Capture

Change Data Capture (CDC) treats database transaction logs as event streams. Applications write to traditional databases using standard database operations, and CDC tools extract changes from transaction logs and publish them to event streams. This approach bridges conventional database-centric architectures with streaming patterns.

CDC enables Kappa Architecture without requiring applications to write directly to event logs. Legacy applications continue using databases while streaming processors consume change events. The database transaction log becomes the source of truth, and the event stream provides a streaming view of changes.

require 'debezium'

class DatabaseCDC
  def initialize(database_config)
    @connector = Debezium::Connector.new(
      connector_class: 'io.debezium.connector.postgresql.PostgresConnector',
      tasks_max: 1,
      database_hostname: database_config[:host],
      database_dbname: database_config[:database],
      database_user: database_config[:user],
      database_password: database_config[:password],
      table_include_list: database_config[:tables]
    )
  end
  
  def stream_changes
    @connector.start do |change_event|
      {
        operation: change_event.operation, # CREATE, UPDATE, DELETE
        table: change_event.table,
        before: change_event.before_values,
        after: change_event.after_values,
        timestamp: change_event.timestamp
      }
    end
  end
end

# Stream processor consuming CDC events
class CDCProcessor
  def process_database_changes(cdc_stream)
    cdc_stream.stream_changes do |change|
      case change[:operation]
      when 'CREATE', 'UPDATE'
        update_search_index(change[:after])
        invalidate_cache(change[:table], change[:after]['id'])
      when 'DELETE'
        remove_from_search_index(change[:table], change[:before]['id'])
        invalidate_cache(change[:table], change[:before]['id'])
      end
    end
  end
end

CDC introduces latency between database writes and event availability. Transaction log parsing and event publishing add milliseconds to seconds of delay. Applications requiring immediate consistency between database state and derived views must account for this lag.

Event Sourcing Integration

Event sourcing stores all state changes as a sequence of events, making it naturally compatible with Kappa Architecture. Applications model domain logic as events that represent state transitions. These events append to an event store, which serves as both the system of record and the input stream for processors.

The event store functions as the immutable log in Kappa Architecture. Read models derive from the event stream through projection processors that build query-optimized views. Changing projections requires replaying events, which event sourcing architectures already support.

class EventSourcedKappa
  def initialize
    @event_store = EventStore.new
    @projections = {}
  end
  
  def append_event(aggregate_id, event)
    @event_store.append(
      stream: "aggregate-#{aggregate_id}",
      event: event,
      expected_version: event.expected_version
    )
  end
  
  def register_projection(name, &block)
    @projections[name] = Projection.new(name, block)
  end
  
  def rebuild_projection(name)
    projection = @projections[name]
    projection.reset
    
    @event_store.read_all_events do |event|
      projection.apply(event)
    end
  end
end

# Example projection
class OrderAnalyticsProjection
  def initialize
    @daily_totals = {}
    @product_counts = {}
  end
  
  def apply(event)
    case event.type
    when 'order_placed'
      date = event.timestamp.to_date
      @daily_totals[date] ||= 0
      @daily_totals[date] += event.data[:total]
      
      event.data[:items].each do |item|
        @product_counts[item[:product_id]] ||= 0
        @product_counts[item[:product_id]] += item[:quantity]
      end
    end
  end
end

Event sourcing provides stronger guarantees around event ordering and completeness. Events for the same aggregate serialize through optimistic concurrency checks, preventing lost updates. This serialization maintains consistency that pure streaming architectures must implement separately.

Microservices with Event Streaming

Microservices architectures use event streaming for service communication and state synchronization. Each service publishes events describing state changes, and other services subscribe to relevant event streams. This pattern creates a distributed event log where each service contributes its events.

Services implement Kappa Architecture locally by treating incoming event streams as inputs and maintaining derived views specific to their needs. A recommendation service might consume purchase and view events to maintain user preference profiles. An inventory service consumes order events to update stock levels.

class OrderService
  def initialize(event_bus)
    @event_bus = event_bus
    @orders = {}
  end
  
  def place_order(order_data)
    order = Order.new(order_data)
    @orders[order.id] = order
    
    @event_bus.publish('orders', {
      type: 'order_placed',
      order_id: order.id,
      user_id: order.user_id,
      items: order.items,
      total: order.total,
      timestamp: Time.now
    })
    
    order
  end
end

class InventoryService
  def initialize(event_bus)
    @event_bus = event_bus
    @stock_levels = {}
    
    @event_bus.subscribe('orders') do |event|
      process_order_event(event)
    end
  end
  
  def process_order_event(event)
    return unless event[:type] == 'order_placed'
    
    event[:items].each do |item|
      @stock_levels[item[:product_id]] ||= load_current_stock(item[:product_id])
      @stock_levels[item[:product_id]] -= item[:quantity]
      
      if @stock_levels[item[:product_id]] < 10
        @event_bus.publish('inventory', {
          type: 'low_stock_alert',
          product_id: item[:product_id],
          current_level: @stock_levels[item[:product_id]]
        })
      end
    end
  end
end

Service independence introduces challenges around event schema coordination and versioning. Changes to event formats impact all consuming services. Schema registries and API contracts help manage evolution, but distributed ownership complicates coordination compared to monolithic architectures.

Ruby Implementation

Ruby's ecosystem provides libraries and patterns for implementing Kappa Architecture components, though the Ruby community focuses more on request-response web applications than stream processing.

Kafka Integration

The ruby-kafka gem provides a native Ruby client for Apache Kafka, enabling both event production and consumption. The library supports producer configurations for throughput optimization and consumer groups for parallel processing.

require 'ruby-kafka'

class RubyKafkaProducer
  def initialize(brokers)
    @kafka = Kafka.new(
      seed_brokers: brokers,
      client_id: 'ruby-producer',
      logger: Logger.new($stdout)
    )
    
    # Async producer buffers messages for batch delivery
    @producer = @kafka.async_producer(
      delivery_threshold: 100,     # Deliver after 100 messages
      delivery_interval: 5,         # Or after 5 seconds
      max_buffer_size: 1000,
      max_buffer_bytesize: 10_000_000
    )
  end
  
  def produce_event(topic, event)
    @producer.produce(
      event.to_json,
      topic: topic,
      partition_key: event[:user_id]&.to_s,
      headers: {
        'event_type' => event[:type],
        'timestamp' => Time.now.to_i.to_s
      }
    )
  end
  
  def shutdown
    @producer.deliver_messages  # Flush remaining messages
    @producer.shutdown
  end
end

Consumer implementations handle message fetching, offset management, and rebalancing. The synchronous consumer API processes messages sequentially with manual offset commits for delivery guarantees.

class RubyKafkaConsumer
  def initialize(brokers, group_id)
    @kafka = Kafka.new(seed_brokers: brokers)
    @consumer = @kafka.consumer(
      group_id: group_id,
      session_timeout: 30,
      offset_commit_interval: 10,
      offset_retention_time: 7 * 24 * 60 * 60  # 1 week
    )
  end
  
  def consume_events(topic, processor)
    @consumer.subscribe(topic, start_from_beginning: false)
    
    @consumer.each_message(automatically_mark_as_processed: false) do |message|
      begin
        event = JSON.parse(message.value, symbolize_names: true)
        processor.process(event)
        
        # Only commit after successful processing
        @consumer.mark_message_as_processed(message)
      rescue JSON::ParserError => e
        Logger.error("Invalid JSON in message: #{e.message}")
        # Skip malformed messages
        @consumer.mark_message_as_processed(message)
      rescue StandardError => e
        Logger.error("Processing failed: #{e.message}")
        # Don't mark as processed - will retry
        raise
      end
    end
  end
  
  def stop
    @consumer.stop
  end
end

Stream Processing with Karafka

Karafka provides a framework for building Kafka-based stream processing applications in Ruby. It handles consumer lifecycle management, routing, and worker coordination.

require 'karafka'

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = { 'bootstrap.servers': 'localhost:9092' }
    config.client_id = 'kappa-processor'
  end
  
  routes.draw do
    topic :orders do
      consumer OrdersConsumer
      partition_assignment_strategy :range
      max_wait_time 1_000
    end
    
    topic :user_events do
      consumer UserEventsConsumer
      max_messages 100
    end
  end
end

class OrdersConsumer < Karafka::BaseConsumer
  def initialize
    super
    @redis = Redis.new
    @metrics = Metrics.new
  end
  
  def consume
    messages.each do |message|
      event = JSON.parse(message.raw_payload, symbolize_names: true)
      
      case event[:type]
      when 'order_placed'
        update_revenue_metrics(event)
        update_product_sales(event)
      when 'order_cancelled'
        adjust_revenue_metrics(event)
      end
      
      mark_as_consumed(message)
    end
  end
  
  private
  
  def update_revenue_metrics(event)
    date_key = "revenue:#{event[:timestamp].to_date}"
    @redis.incrbyfloat(date_key, event[:total])
    @metrics.increment('orders.placed')
  end
  
  def update_product_sales(event)
    event[:items].each do |item|
      product_key = "product:#{item[:id]}:sales"
      @redis.incrby(product_key, item[:quantity])
    end
  end
end

State Management with Redis

Redis provides fast external state storage for stream processors. Processors maintain derived views in Redis data structures, enabling queries while processing continues.

class RedisStateStore
  def initialize(redis_url = 'redis://localhost:6379')
    @redis = Redis.new(url: redis_url)
  end
  
  # Atomic counter updates
  def increment_counter(key, by: 1)
    @redis.incrby(key, by)
  end
  
  # Time-series data with sorted sets
  def record_metric(metric_name, value, timestamp = Time.now)
    @redis.zadd(
      "timeseries:#{metric_name}",
      timestamp.to_f,
      "#{timestamp.to_f}:#{value}"
    )
  end
  
  def get_metric_range(metric_name, start_time, end_time)
    @redis.zrangebyscore(
      "timeseries:#{metric_name}",
      start_time.to_f,
      end_time.to_f
    ).map { |entry| entry.split(':').last.to_f }
  end
  
  # Aggregated views with hashes
  def update_aggregation(key, field, value)
    @redis.hincrby(key, field, value)
  end
  
  def get_aggregation(key)
    @redis.hgetall(key).transform_values(&:to_i)
  end
  
  # Windowed aggregations with expiration
  def update_windowed_metric(metric_name, window_duration, value)
    window_key = current_window_key(metric_name, window_duration)
    @redis.multi do |txn|
      txn.incrbyfloat(window_key, value)
      txn.expire(window_key, window_duration * 2)
    end
  end
  
  private
  
  def current_window_key(metric_name, window_duration)
    window_start = (Time.now.to_i / window_duration) * window_duration
    "window:#{metric_name}:#{window_start}"
  end
end

Reprocessing Coordinator

Implementing reprocessing requires coordinating between old and new processor versions while maintaining service availability.

class ReprocessingCoordinator
  def initialize(kafka_brokers, state_store)
    @kafka_brokers = kafka_brokers
    @state_store = state_store
    @processors = {}
  end
  
  def start_reprocessing(processor_class, version)
    old_processor = @processors[:current]
    
    # Create new processor with temporary state
    temp_state = create_versioned_state(version)
    new_processor = processor_class.new(
      brokers: @kafka_brokers,
      group_id: "processor-v#{version}",
      state_store: temp_state
    )
    
    # Start new processor from beginning
    Thread.new do
      new_processor.process_from_beginning
    end
    
    # Monitor catch-up progress
    monitor_thread = Thread.new do
      loop do
        lag = calculate_lag(new_processor)
        puts "Reprocessing lag: #{lag} messages"
        
        if lag < 1000  # Near caught up
          cutover(old_processor, new_processor, version)
          break
        end
        
        sleep(10)
      end
    end
    
    @processors[version] = {
      processor: new_processor,
      monitor: monitor_thread
    }
  end
  
  private
  
  def create_versioned_state(version)
    RedisStateStore.new("redis://localhost:6379/#{version}")
  end
  
  def calculate_lag(processor)
    consumer_lag = processor.consumer.lag
    consumer_lag.values.sum { |partition_lag| partition_lag.lag }
  end
  
  def cutover(old_processor, new_processor, version)
    # Brief pause for final synchronization
    sleep(2)
    
    # Atomic switch of state store references
    @state_store.switch_to_version(version)
    
    # Stop old processor
    old_processor.stop
    
    # Promote new processor to current
    @processors[:current] = new_processor
    
    puts "Cutover complete to version #{version}"
  end
end

Event Schema Management

Schema validation and evolution require careful handling to maintain compatibility across processor versions.

require 'json-schema'

class EventSchemaRegistry
  def initialize
    @schemas = {}
    load_schemas
  end
  
  def load_schemas
    Dir.glob('schemas/*.json').each do |schema_file|
      schema = JSON.parse(File.read(schema_file))
      event_type = schema['title']
      version = schema['version']
      
      @schemas[event_type] ||= {}
      @schemas[event_type][version] = schema
    end
  end
  
  def validate_event(event_type, event_data)
    version = event_data['schema_version'] || latest_version(event_type)
    schema = @schemas.dig(event_type, version)
    
    raise "Unknown event type: #{event_type}" unless schema
    
    JSON::Validator.validate!(schema, event_data)
  end
  
  def latest_version(event_type)
    @schemas[event_type].keys.max
  end
  
  def check_compatibility(event_type, new_schema)
    current_version = latest_version(event_type)
    current_schema = @schemas[event_type][current_version]
    
    # Check backward compatibility
    backward_compatible = check_backward_compatibility(current_schema, new_schema)
    
    # Check forward compatibility
    forward_compatible = check_forward_compatibility(current_schema, new_schema)
    
    {
      backward_compatible: backward_compatible,
      forward_compatible: forward_compatible,
      fully_compatible: backward_compatible && forward_compatible
    }
  end
  
  private
  
  def check_backward_compatibility(old_schema, new_schema)
    # New schema can read data written with old schema
    old_required = Set.new(old_schema['required'] || [])
    new_required = Set.new(new_schema['required'] || [])
    
    # New schema must not require fields that didn't exist
    (new_required - old_required).empty?
  end
  
  def check_forward_compatibility(old_schema, new_schema)
    # Old schema can read data written with new schema
    old_properties = old_schema['properties'].keys
    new_properties = new_schema['properties'].keys
    
    # No removal of properties that old schema expects
    (Set.new(old_properties) - Set.new(new_properties)).empty?
  end
end

Tools & Ecosystem

Several tools and libraries support building Kappa Architecture systems in Ruby and provide integration points with streaming infrastructure.

Apache Kafka

Kafka serves as the distributed log foundation for most Kappa Architecture implementations. The platform provides topics for organizing event streams, partitions for parallelism, and consumer groups for coordinated consumption.

Ruby applications interact with Kafka through the ruby-kafka gem for direct client access or higher-level frameworks like Karafka. Kafka's operational tooling includes monitoring via JMX metrics, configuration management through ZooKeeper or KRaft, and CLI tools for administration.

Event Store Databases

EventStoreDB provides a database purpose-built for event sourcing with native stream support. Events organize into streams with strong ordering guarantees and optimistic concurrency control.

require 'event_store_client'

class EventStoreKappa
  def initialize
    @client = EventStoreClient::Client.new(
      url: 'esdb://localhost:2113',
      verify_ssl: false
    )
  end
  
  def append_event(stream_name, event_type, data)
    event = EventStoreClient::DeserializedEvent.new(
      type: event_type,
      data: data,
      metadata: { timestamp: Time.now.to_i }
    )
    
    @client.append_to_stream(stream_name, event)
  end
  
  def subscribe_to_all(&block)
    @client.subscribe_to_all(from_position: :start) do |event|
      block.call({
        stream: event.stream_name,
        type: event.type,
        data: event.data,
        position: event.stream_position
      })
    end
  end
  
  def create_projection(name, query)
    @client.create_projection(
      name: name,
      query: query,
      emit: true
    )
  end
end

Stream Processing Frameworks

While Ruby lacks mature stream processing frameworks comparable to Java's ecosystem, several tools enable stream processing patterns.

The karafka framework provides consumer management, routing, and worker pools for Kafka-based processing. It handles consumer lifecycle, rebalancing, and error recovery.

# Karafka consumer with error handling
class RobustConsumer < Karafka::BaseConsumer
  def consume
    messages.each do |message|
      with_retry(max_attempts: 3) do
        process_message(message)
        mark_as_consumed(message)
      end
    end
  end
  
  private
  
  def process_message(message)
    event = parse_event(message)
    validate_event(event)
    apply_business_logic(event)
    persist_results(event)
  end
  
  def with_retry(max_attempts:)
    attempts = 0
    begin
      yield
    rescue RetryableError => e
      attempts += 1
      if attempts < max_attempts
        sleep(2**attempts)  # Exponential backoff
        retry
      else
        log_failure(e)
        raise
      end
    end
  end
end

State Storage Options

Stream processors require fast state access for maintaining aggregations and joined views. Ruby applications use various storage backends depending on requirements.

Redis excels for counters, sets, and time-series data with its atomic operations and data structure support. PostgreSQL provides ACID guarantees for state requiring transactional consistency. RocksDB offers embedded key-value storage with high performance for local state.

class MultiStoreStateManager
  def initialize
    @redis = Redis.new  # Fast counters and caches
    @postgres = PG.connect(dbname: 'analytics')  # Transactional state
    @rocksdb = RocksDB::DB.new('/tmp/rocksdb')  # Local state
  end
  
  def increment_counter(key)
    @redis.incr(key)
  end
  
  def update_aggregate(entity_id, changes)
    @postgres.exec_params(
      'INSERT INTO aggregates (entity_id, data, updated_at) 
       VALUES ($1, $2, NOW())
       ON CONFLICT (entity_id) 
       DO UPDATE SET data = $2, updated_at = NOW()',
      [entity_id, changes.to_json]
    )
  end
  
  def store_local_state(key, value)
    @rocksdb.put(key, value)
  end
  
  def get_local_state(key)
    @rocksdb.get(key)
  end
end

Monitoring and Observability

Monitoring stream processing systems requires tracking consumer lag, processing rates, and error rates. The ruby-kafka gem provides instrumentation hooks for metrics collection.

require 'prometheus/client'

class KafkaMetrics
  def initialize
    @registry = Prometheus::Client.registry
    
    @messages_processed = @registry.counter(
      :kafka_messages_processed_total,
      docstring: 'Total messages processed',
      labels: [:topic, :partition]
    )
    
    @processing_duration = @registry.histogram(
      :kafka_message_processing_duration_seconds,
      docstring: 'Message processing duration',
      labels: [:topic]
    )
    
    @consumer_lag = @registry.gauge(
      :kafka_consumer_lag,
      docstring: 'Consumer lag in messages',
      labels: [:topic, :partition]
    )
  end
  
  def track_message_processed(topic, partition)
    @messages_processed.increment(labels: { topic: topic, partition: partition })
  end
  
  def track_processing_duration(topic, duration)
    @processing_duration.observe(duration, labels: { topic: topic })
  end
  
  def update_lag(topic, partition, lag)
    @consumer_lag.set(lag, labels: { topic: topic, partition: partition })
  end
end

class InstrumentedConsumer < Karafka::BaseConsumer
  def initialize
    super
    @metrics = KafkaMetrics.new
  end
  
  def consume
    messages.each do |message|
      start_time = Time.now
      
      process_message(message)
      
      duration = Time.now - start_time
      @metrics.track_message_processed(message.topic, message.partition)
      @metrics.track_processing_duration(message.topic, duration)
      
      mark_as_consumed(message)
    end
    
    update_lag_metrics
  end
  
  private
  
  def update_lag_metrics
    lag_info = consumer.lag
    lag_info.each do |topic, partitions|
      partitions.each do |partition, lag|
        @metrics.update_lag(topic, partition, lag)
      end
    end
  end
end

Performance Considerations

Kappa Architecture performance depends on log throughput, processing latency, and state access patterns.

Throughput and Partitioning

Event throughput scales horizontally through partitioning. Each partition provides an ordered sequence of events that processes independently. Increasing partition count enables adding more consumer instances for parallel processing.

Partition key selection impacts distribution. Hashing user IDs distributes load evenly across partitions while maintaining order for each user's events. Skewed distributions where some keys generate significantly more events than others create hot partitions that bottleneck throughput.

class PartitioningStrategy
  def initialize(partition_count)
    @partition_count = partition_count
  end
  
  # Hash-based partitioning for even distribution
  def hash_partition(key)
    Digest::MD5.hexdigest(key.to_s).to_i(16) % @partition_count
  end
  
  # Range-based partitioning for time-series data
  def range_partition(timestamp)
    hour_bucket = timestamp.hour
    hour_bucket % @partition_count
  end
  
  # Analyze partition balance
  def analyze_distribution(events)
    distribution = Hash.new(0)
    
    events.each do |event|
      partition = hash_partition(event[:key])
      distribution[partition] += 1
    end
    
    {
      min: distribution.values.min,
      max: distribution.values.max,
      avg: distribution.values.sum / distribution.size.to_f,
      skew: distribution.values.max.to_f / distribution.values.min
    }
  end
end

Producer batching amortizes network overhead across multiple events. Buffering events in memory and sending batches reduces per-message cost but increases latency between event generation and log availability. Tuning batch size and timeout balances throughput against latency requirements.

Processing Latency

End-to-end latency encompasses event production, log replication, consumption, processing, and state updates. Each stage contributes to overall delay.

Producer acknowledgment settings trade durability for latency. Waiting for replication to multiple brokers before acknowledging writes increases durability but adds milliseconds of latency. Fire-and-forget sending minimizes producer latency but risks message loss.

Consumer fetch settings affect how quickly processors receive new events. Larger fetch sizes reduce polling overhead but delay processing of small batches. Fetch wait time controls how long consumers wait for minimum batch sizes before returning partial batches.

class LatencyOptimizedConsumer
  def initialize(brokers, topic)
    @kafka = Kafka.new(seed_brokers: brokers)
    @consumer = @kafka.consumer(
      group_id: 'low-latency-processor',
      # Fetch configuration for low latency
      fetch_max_wait_time: 100,      # Max 100ms wait
      fetch_min_bytes: 1,             # Return immediately with any data
      session_timeout: 10,
      heartbeat_interval: 3
    )
    @consumer.subscribe(topic)
  end
  
  def process_with_timing
    @consumer.each_message do |message|
      receive_time = Time.now
      event = JSON.parse(message.value, symbolize_names: true)
      
      # Calculate end-to-end latency
      produce_time = Time.at(event[:timestamp])
      latency_ms = (receive_time - produce_time) * 1000
      
      process_event(event)
      
      processing_time_ms = (Time.now - receive_time) * 1000
      
      record_metrics(latency_ms, processing_time_ms)
    end
  end
end

State Access Patterns

Stream processors reading and writing state frequently face bottlenecks from state access latency. In-memory state provides microsecond access but limits state size to available memory. External stores enable larger state at the cost of network round trips.

Batching state operations amortizes connection overhead. Accumulating updates in memory and flushing periodically reduces database round trips. This batching trades freshness of materialized views for throughput.

class BatchedStateUpdater
  def initialize(state_store, batch_size: 100, flush_interval: 5)
    @state_store = state_store
    @batch_size = batch_size
    @flush_interval = flush_interval
    @pending_updates = []
    @last_flush = Time.now
    
    start_flush_timer
  end
  
  def update(key, value)
    @pending_updates << { key: key, value: value }
    
    flush if should_flush?
  end
  
  def flush
    return if @pending_updates.empty?
    
    @state_store.batch_update(@pending_updates)
    @pending_updates.clear
    @last_flush = Time.now
  end
  
  private
  
  def should_flush?
    @pending_updates.size >= @batch_size ||
      Time.now - @last_flush >= @flush_interval
  end
  
  def start_flush_timer
    Thread.new do
      loop do
        sleep(@flush_interval)
        flush
      end
    end
  end
end

Read caching reduces state access frequency for frequently accessed values. Maintaining a local cache of recently read state values avoids repeated database queries. Cache invalidation strategies ensure consistency between cache and authoritative state store.

Backpressure and Flow Control

Processors that cannot keep up with event arrival rates accumulate lag. Backpressure mechanisms prevent memory exhaustion when processing falls behind ingestion.

Consumer pause and resume provides explicit flow control. Processors monitoring their queue depth can pause consumption when buffers fill and resume when caught up. This prevents out-of-memory conditions from unbounded queues.

class BackpressureConsumer
  def initialize(kafka, max_queue_size: 1000)
    @consumer = kafka.consumer(group_id: 'backpressure-aware')
    @max_queue_size = max_queue_size
    @queue = Queue.new
    @paused = false
  end
  
  def start_consuming(topic)
    @consumer.subscribe(topic)
    
    # Consumer thread
    Thread.new do
      @consumer.each_message do |message|
        if @queue.size >= @max_queue_size && !@paused
          @consumer.pause(topic, message.partition)
          @paused = true
          puts "Paused due to backpressure"
        end
        
        @queue.push(message)
      end
    end
    
    # Processing thread
    Thread.new do
      loop do
        message = @queue.pop
        process_message(message)
        
        if @queue.size < @max_queue_size / 2 && @paused
          @consumer.resume(topic, message.partition)
          @paused = false
          puts "Resumed consumption"
        end
      end
    end
  end
end

Real-World Applications

Kappa Architecture proves effective for several production use cases where streaming semantics align with business requirements.

Real-Time Analytics Dashboards

Analytics systems displaying live metrics derive values from event streams without batch recomputation delays. User activity tracking, application monitoring, and business intelligence dashboards consume events and maintain running aggregations.

A web analytics system processes page view events to update visitor counts, session durations, and conversion funnels in real-time. Stream processors maintain windowed metrics that reflect recent activity while older data rolls off.

class RealTimeAnalytics
  def initialize
    @metrics_store = Redis.new
    @kafka_consumer = create_consumer('page_views')
  end
  
  def process_page_views
    @kafka_consumer.consume_events do |event|
      update_visitor_counts(event)
      update_page_metrics(event)
      detect_conversion_funnels(event)
      update_session_data(event)
    end
  end
  
  private
  
  def update_visitor_counts(event)
    # Real-time unique visitors using HyperLogLog
    @metrics_store.pfadd('visitors:unique', event[:visitor_id])
    @metrics_store.incr('visitors:total')
    
    # Windowed metrics for current hour
    hour_key = "visitors:#{Time.now.strftime('%Y%m%d%H')}"
    @metrics_store.pfadd(hour_key, event[:visitor_id])
    @metrics_store.expire(hour_key, 86400)
  end
  
  def update_page_metrics(event)
    page_key = "page:#{event[:path]}"
    @metrics_store.hincrby(page_key, 'views', 1)
    @metrics_store.hset(page_key, 'last_view', Time.now.to_i)
    
    # Track load time percentiles
    @metrics_store.zadd(
      "#{page_key}:load_times",
      event[:load_time],
      "#{Time.now.to_f}:#{event[:load_time]}"
    )
  end
  
  def detect_conversion_funnels(event)
    return unless event[:conversion_event]
    
    funnel_key = "funnel:#{event[:visitor_id]}"
    step = @metrics_store.llen(funnel_key)
    
    @metrics_store.rpush(funnel_key, event[:path])
    @metrics_store.expire(funnel_key, 3600)
    
    if conversion_complete?(event[:path])
      @metrics_store.incr('conversions:completed')
      @metrics_store.hincrby('conversions:by_source', event[:source], 1)
    end
  end
end

Event-Driven Microservices

Microservices coordinate through event streams rather than synchronous API calls. Services publish domain events describing state changes, and interested services consume relevant streams to maintain eventual consistency.

An e-commerce system uses events to coordinate order processing, inventory management, and fulfillment. When orders are placed, the order service publishes events consumed by inventory, payment, and shipping services.

class OrderManagementSystem
  def initialize(event_bus)
    @event_bus = event_bus
    @state_store = Redis.new
  end
  
  def process_order_lifecycle
    @event_bus.consume('orders') do |event|
      case event[:type]
      when 'order_placed'
        validate_inventory(event)
        initiate_payment(event)
      when 'payment_confirmed'
        allocate_inventory(event)
        schedule_fulfillment(event)
      when 'order_shipped'
        update_tracking(event)
        notify_customer(event)
      end
      
      update_order_state(event)
    end
  end
  
  private
  
  def validate_inventory(event)
    event[:items].each do |item|
      available = @state_store.get("inventory:#{item[:product_id]}").to_i
      
      if available < item[:quantity]
        @event_bus.publish('orders', {
          type: 'order_rejected',
          order_id: event[:order_id],
          reason: 'insufficient_inventory',
          product_id: item[:product_id]
        })
        return false
      end
    end
    
    true
  end
  
  def allocate_inventory(event)
    @event_bus.publish('inventory', {
      type: 'inventory_allocated',
      order_id: event[:order_id],
      items: event[:items],
      timestamp: Time.now
    })
  end
end

Fraud Detection Systems

Fraud detection analyzes transaction patterns in real-time to identify suspicious activity. Stream processors maintain user behavior profiles and evaluate new transactions against learned patterns and rule engines.

Transaction events flow through multiple detection stages that compute risk scores, check velocity rules, and identify anomalous patterns. High-risk transactions trigger immediate review while normal transactions proceed.

class FraudDetectionProcessor
  def initialize
    @behavior_store = Redis.new
    @rules_engine = FraudRulesEngine.new
    @ml_model = AnomalyDetectionModel.load
  end
  
  def analyze_transaction_stream
    consume_events('transactions') do |transaction|
      risk_score = calculate_risk_score(transaction)
      
      if risk_score > 0.8
        block_transaction(transaction, risk_score)
      elsif risk_score > 0.5
        flag_for_review(transaction, risk_score)
      else
        approve_transaction(transaction)
      end
      
      update_user_profile(transaction)
    end
  end
  
  private
  
  def calculate_risk_score(transaction)
    # Velocity checks
    velocity_score = check_transaction_velocity(transaction)
    
    # Behavioral analysis
    behavior_score = analyze_user_behavior(transaction)
    
    # ML anomaly detection
    anomaly_score = @ml_model.predict(transaction_features(transaction))
    
    # Rules-based checks
    rules_score = @rules_engine.evaluate(transaction)
    
    # Weighted combination
    (velocity_score * 0.3 + behavior_score * 0.3 + 
     anomaly_score * 0.2 + rules_score * 0.2)
  end
  
  def check_transaction_velocity(transaction)
    user_id = transaction[:user_id]
    window_key = "velocity:#{user_id}:#{Time.now.to_i / 300}"  # 5-min window
    
    count = @behavior_store.incr(window_key)
    @behavior_store.expire(window_key, 600)
    
    return 1.0 if count > 10  # More than 10 transactions in 5 minutes
    return 0.7 if count > 5
    0.1
  end
  
  def analyze_user_behavior(transaction)
    profile = load_user_profile(transaction[:user_id])
    
    # Compare to historical patterns
    amount_deviation = (transaction[:amount] - profile[:avg_amount]).abs / profile[:std_amount]
    location_anomaly = profile[:usual_locations].include?(transaction[:location]) ? 0 : 1
    time_anomaly = usual_transaction_time?(transaction[:timestamp], profile) ? 0 : 0.5
    
    [amount_deviation / 3, location_anomaly, time_anomaly].max
  end
end

Log Aggregation and Monitoring

Centralized logging systems aggregate application and infrastructure logs into searchable streams. Stream processors parse logs, extract metrics, detect error patterns, and trigger alerts.

Logs from distributed services flow into a central log stream. Processors enrich logs with metadata, compute error rates, and identify cascading failures across service boundaries.

Reference

Architecture Components

Component Description Purpose
Immutable Log Ordered sequence of events stored durably Source of truth for all data
Stream Processor Application that reads events and produces outputs Transforms events into views or actions
Materialized View Derived dataset optimized for queries Serves read queries efficiently
Event Producer Application that generates events Records state changes and observations
Consumer Group Set of processors sharing partition consumption Enables parallel processing
Checkpoint Saved position in event stream Allows resume after failures

Comparison with Lambda Architecture

Aspect Kappa Architecture Lambda Architecture
Processing Paths Single stream processing path Separate batch and speed layers
Code Duplication Single implementation Duplicate logic in batch and stream
Reprocessing Replay events from log Rerun batch jobs
Operational Complexity Lower - one system to operate Higher - coordinate two systems
Query Consistency Eventually consistent Requires merging batch and stream results
State Management Stream processor state only Batch layer state and stream state
Best For Stream-native workloads Complex batch analytics

Event Log Characteristics

Property Description Implementation Consideration
Immutability Events never modified after writing Design around append-only operations
Ordering Events ordered within partitions Use partition keys to maintain order
Retention Duration events remain available Balance reprocessing needs with storage costs
Durability Events survive failures Configure replication factor appropriately
Partitioning Events distributed across partitions Choose partition keys for even distribution

Ruby Kafka Configuration Options

Setting Purpose Typical Value
delivery_threshold Messages before flush 100-1000
delivery_interval Seconds before flush 1-10
max_buffer_size Max messages in buffer 1000-10000
session_timeout Consumer heartbeat timeout 30 seconds
offset_commit_interval Frequency of offset commits 10 seconds
fetch_max_wait_time Max wait for fetch 100-500 ms

State Store Selection Matrix

Use Case Recommended Store Reason
Counters and metrics Redis Atomic operations, low latency
User profiles Redis or PostgreSQL Fast reads, structured data
Large aggregations PostgreSQL ACID guarantees, complex queries
Time-series data InfluxDB or TimescaleDB Optimized for time-series workloads
Local processor state RocksDB Embedded, high performance
Distributed coordination etcd or ZooKeeper Consensus and coordination primitives

Common Processing Patterns

Pattern Description Use Case
Map Transform each event independently Data enrichment, format conversion
Filter Select subset of events Routing, data selection
Aggregate Combine events into summary Counting, summing, averaging
Join Combine events from multiple streams Enrichment with reference data
Window Group events by time periods Time-based analytics
Deduplicate Remove duplicate events Idempotent processing

Reprocessing Decision Matrix

Scenario Approach Consideration
Bug fix in logic Full replay Ensure deterministic processing
Add new metric Parallel processor Minimize impact on live system
Schema migration Versioned processor Maintain compatibility during transition
Historical backfill Batch from log May require additional resources
Performance optimization Shadow processing Validate before cutover

Performance Optimization Techniques

Technique Impact Trade-off
Increase partitions Higher throughput More coordination overhead
Batch state updates Reduced latency Delayed view consistency
Local caching Faster reads Memory usage, consistency lag
Async processing Better throughput Complexity in error handling
Compression Lower storage costs CPU overhead
Consumer parallelization Faster processing Increased resource usage

Monitoring Metrics

Metric Description Alert Threshold
Consumer lag Messages behind latest offset Over 10000 for critical streams
Processing rate Messages per second Below expected rate
Error rate Failed processing attempts Over 1%
End-to-end latency Time from produce to process Over 10 seconds
State store latency Read/write operation time Over 100ms
Rebalance frequency Consumer group rebalances More than 1 per hour