Overview
Kappa Architecture represents a data processing architecture pattern that addresses the operational complexity of Lambda Architecture by proposing a single processing path. Jay Kreps introduced this approach in 2014 as a response to the dual-layer complexity of maintaining both batch and stream processing systems. The architecture centers on treating all data as streams and storing events in an immutable, replayable log.
The fundamental premise replaces Lambda Architecture's batch and speed layers with a unified stream processing layer. All data flows through a distributed log system where events are stored in order of arrival. This log serves as the source of truth, and all processing happens through stream processors that read from this log. When reprocessing becomes necessary due to code changes or error correction, the system creates a new stream processor instance that reads from the beginning of the log while the current processor continues serving live traffic.
Traditional database architectures model data as mutable state that changes through updates and deletes. Kappa Architecture inverts this model by treating the log of changes as primary and derived views as secondary. Applications generate events that append to the log, and stream processors transform these events into materialized views optimized for queries. The immutable log preserves complete history, enabling time-travel queries and reproducible data processing.
Consider an e-commerce analytics system. In Lambda Architecture, raw purchase events would feed into both a batch system that processes daily summaries and a stream processor handling real-time updates. Kappa Architecture eliminates this duplication by processing all events through a stream processor that maintains running totals. When the calculation logic changes, a new processor version starts reading from the log's beginning, computes updated totals in parallel with the live processor, and switches over once caught up.
# Kappa Architecture event flow
class OrderEvent
attr_reader :order_id, :amount, :timestamp
def initialize(order_id:, amount:, timestamp:)
@order_id = order_id
@amount = amount
@timestamp = timestamp
end
end
# Events append to immutable log
event_log = EventLog.new('orders')
event_log.append(OrderEvent.new(
order_id: 'ord-123',
amount: 99.99,
timestamp: Time.now
))
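The snippet above uses an `EventLog` class that it never defines. A minimal, hypothetical in-memory stand-in, useful only for experimenting with the pattern, might look like the sketch below. `append` returns the new event's offset, stored events are frozen so callers cannot mutate them, and `read_from` replays everything from a given offset onward.

```ruby
# Hypothetical in-memory stand-in for the EventLog used above.
# Not durable or distributed; it only models append-only, replayable storage.
class EventLog
  attr_reader :topic

  def initialize(topic)
    @topic = topic
    @events = []
  end

  # Appends an event and returns its offset (position in the log)
  def append(event)
    @events << event.freeze
    @events.size - 1
  end

  # Replays events starting at the given offset, yielding each with its offset
  def read_from(offset = 0)
    return enum_for(:read_from, offset) unless block_given?

    @events.each_with_index do |event, index|
      yield event, index if index >= offset
    end
  end

  def size
    @events.size
  end
end

log = EventLog.new('orders')
log.append({ order_id: 'ord-123', amount: 99.99 })
log.append({ order_id: 'ord-124', amount: 15.0 })
log.read_from(1) { |event, offset| puts "#{offset}: #{event[:order_id]}" }
# prints "1: ord-124"
```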
The architecture proves particularly suitable for event-driven systems, real-time analytics, and applications where data lineage and reprocessing capabilities matter. Organizations processing continuous data streams benefit from the simplified operational model and reduced code duplication.
Key Principles
Kappa Architecture builds on three foundational principles that distinguish it from traditional batch-oriented and hybrid architectures.
Immutable Event Log as Source of Truth
All data exists as a sequence of immutable events stored in the order they are appended. Applications generate events representing state changes, user actions, or system observations. These events are appended to a distributed log that keeps them indefinitely or for as long as its retention policy allows. The log is the authoritative record, and no process modifies events after they are written. Because events never change, replaying them through the same deterministic processing logic produces identical results.
require 'kafka'
require 'json'

class ImmutableLog
  def initialize(topic)
    @topic = topic
    @kafka = Kafka.new(['localhost:9092'])
    @producer = @kafka.producer
  end

  # Events are only ever appended; nothing here updates or deletes them
  def append(event)
    @producer.produce(
      event.to_json,
      topic: @topic,
      partition_key: event.id.to_s
    )
    @producer.deliver_messages
  end

  # Replays partition 0 from the given offset; a topic with several
  # partitions needs one seek per partition
  def read_from(offset)
    consumer = @kafka.consumer(group_id: 'processor')
    consumer.subscribe(@topic)
    consumer.seek(@topic, 0, offset)
    consumer.each_message do |message|
      yield JSON.parse(message.value)
    end
  end
end
Single Stream Processing Path
All computation happens through stream processors that read from the event log and produce derived views or trigger actions. The architecture eliminates separate batch and speed layers, unifying data processing in a single model. Stream processors consume events continuously, maintaining state that updates with each new event. This unified approach removes the complexity of reconciling results from different processing systems and eliminates code duplication between batch and streaming implementations.
Stream processors can be parallelized by partitioning the input log, with each processor instance handling a subset of partitions. This partitioning enables horizontal scaling while maintaining ordered processing within partitions. Processors checkpoint their progress, allowing recovery from failures by resuming from the last committed offset.
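A toy model can make the partitioning and checkpointing mechanics concrete. In this hypothetical sketch, a key's CRC32 hash (from Ruby's standard `zlib`, chosen only to keep the example dependency-free; Kafka's default partitioner uses murmur2) picks its partition, each partition preserves append order, and a consumer records the next offset to read per partition so that a replacement instance resumes where the previous one stopped.

```ruby
require 'zlib'

# Hypothetical partitioned log: each partition is an append-ordered array
class PartitionedLog
  attr_reader :partitions

  def initialize(partition_count)
    @partitions = Array.new(partition_count) { [] }
  end

  # The same key always maps to the same partition
  def partition_for(key)
    Zlib.crc32(key.to_s) % @partitions.size
  end

  def append(key, value)
    @partitions[partition_for(key)] << { key: key, value: value }
  end
end

# Hypothetical consumer that tracks the next offset to read per partition
class CheckpointingConsumer
  attr_reader :committed

  def initialize(log, assigned_partitions, committed = {})
    @log = log
    @assigned = assigned_partitions
    @committed = Hash.new(0)
    committed.each { |partition, offset| @committed[partition] = offset }
  end

  # Processes up to `limit` records per partition, advancing the checkpoint
  # only after each record is handled, so a restart never skips work
  def poll(limit: Float::INFINITY)
    @assigned.each do |partition|
      records = @log.partitions[partition]
      processed = 0
      while @committed[partition] < records.size && processed < limit
        yield partition, records[@committed[partition]]
        @committed[partition] += 1
        processed += 1
      end
    end
  end
end

log = PartitionedLog.new(4)
%w[a b a c a].each_with_index { |key, index| log.append(key, index) }

first = CheckpointingConsumer.new(log, [0, 1, 2, 3])
first_count = 0
first.poll(limit: 1) { first_count += 1 } # one record per partition, then "crash"

# A replacement instance resumes from the saved checkpoints
second = CheckpointingConsumer.new(log, [0, 1, 2, 3], first.committed)
resumed = []
second.poll { |_partition, record| resumed << record }
```

All records for key `a` land in one partition, so the resumed consumer still sees them in their original order.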
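require 'date'

class StreamProcessor
  CHECKPOINT_KEY = 'checkpoint:revenue'

  def initialize(log, view_store)
    @log = log
    @view_store = view_store # any store exposing get/set, such as a Redis client
  end

  def process
    @log.read_from(last_checkpoint) do |event|
      case event['type']
      when 'order_placed'
        update_revenue_view(event)
      when 'order_cancelled'
        adjust_revenue_view(event)
      end
      # Assumes the log reader adds each message's offset to the event
      checkpoint(event['offset'])
    end
  end

  private

  def update_revenue_view(event)
    add_to_revenue(event, event['amount'])
  end

  def adjust_revenue_view(event)
    add_to_revenue(event, -event['amount'])
  end

  def add_to_revenue(event, delta)
    key = "revenue:#{Date.parse(event['timestamp'])}"
    current = @view_store.get(key).to_f # stores return strings or nil
    @view_store.set(key, current + delta)
  end

  def last_checkpoint
    @view_store.get(CHECKPOINT_KEY).to_i
  end

  # Stores the next offset to read, so a restart resumes after this event
  def checkpoint(offset)
    @view_store.set(CHECKPOINT_KEY, offset + 1)
  end
end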
Reprocessing Through Log Replay
Changes to processing logic or corrections of errors require recomputing derived views. Rather than implementing separate backfill mechanisms, Kappa Architecture handles reprocessing by starting a new processor instance that reads from the log's beginning. This new processor runs in parallel with the current production processor, computing results from scratch. Once the new processor catches up to the log's current end, the application switches to using its output, and the old processor shuts down.
This approach guarantees that reprocessed results match what would have been produced if the new logic had been running from the start. The technique works because the log preserves the complete event history, and processing logic operates deterministically on this history.
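The replay-and-switch procedure can be sketched with plain Ruby objects. The event list, the two revenue functions, and the `ViewRouter` class below are all hypothetical: version 1 sums every order, version 2 also subtracts cancellations, and the router changes which view serves reads only after the new version has replayed the whole log. Because both functions are deterministic, replaying the same events always yields the same totals.

```ruby
# Hypothetical event history; in practice this is the retained log
EVENTS = [
  { type: 'order_placed', order_id: 1, amount: 50.0 },
  { type: 'order_placed', order_id: 2, amount: 30.0 },
  { type: 'order_cancelled', order_id: 2, amount: 30.0 }
].freeze

# Version 1 logic: ignores cancellations (the bug being fixed)
REVENUE_V1 = lambda do |total, event|
  event[:type] == 'order_placed' ? total + event[:amount] : total
end

# Version 2 logic: subtracts cancelled orders
REVENUE_V2 = lambda do |total, event|
  case event[:type]
  when 'order_placed' then total + event[:amount]
  when 'order_cancelled' then total - event[:amount]
  else total
  end
end

# Rebuilds a view from scratch by replaying the log from offset 0
def replay(events, logic)
  events.reduce(0.0) { |total, event| logic.call(total, event) }
end

# Hypothetical router: queries always read the current view
class ViewRouter
  attr_reader :current

  def initialize(view)
    @current = view
  end

  def promote(new_view)
    @current = new_view
  end
end

router = ViewRouter.new(replay(EVENTS, REVENUE_V1)) # live view: 80.0
candidate = replay(EVENTS, REVENUE_V2)             # built alongside it: 50.0
router.promote(candidate)                          # switch once fully caught up
```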
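class ReprocessingCoordinator
  # @log, @view_store and @current_processor are assumed to be set elsewhere;
  # create_temporary_store, promote_temporary_store, process_from_offset,
  # current_offset and latest_offset are hypothetical helpers
  def reprocess_with_new_logic(processor_version)
    new_processor = StreamProcessor.new(
      @log,
      create_temporary_store(processor_version)
    )
    # Replay from the beginning of the log in the background
    Thread.new do
      new_processor.process_from_offset(0)
    end
    # Wait until the new processor is within 1,000 events of the log head
    loop do
      break if new_processor.current_offset >= @log.latest_offset - 1000
      sleep(5)
    end
    # Switch readers to the new view; the new processor keeps consuming
    @view_store.atomically do
      promote_temporary_store(processor_version)
    end
    @current_processor.shutdown
    @current_processor = new_processor
  end
end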
The log serves as both the source of input events and the mechanism for state recovery. Because processors can recreate any derived view by replaying the relevant portion of the log, snapshots of processor state become an optimization for faster recovery rather than a requirement for correctness.
State management becomes a function of event processing. Processors maintain local state that updates with each event, but this state remains ephemeral and disposable. Losing processor state does not cause data loss because replaying from the log reconstructs the state. This property enables stateless or state-light processor designs that simplify scaling and recovery.
The architecture supports multiple simultaneous views of the same data. Different processors can read the same event log and produce different materialized views optimized for specific query patterns. An analytics system might maintain time-series aggregations, user-level summaries, and product-level metrics from a single stream of transaction events.
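To illustrate several views derived from one stream, the hypothetical sketch below feeds the same transaction events to three independent projections: revenue per day, spend per user, and units per product. Each projection keeps its own state, so any one of them could be dropped and rebuilt by replaying the log without touching the others.

```ruby
require 'date'

# Hypothetical transaction events shared by every projection
TRANSACTIONS = [
  { user_id: 'u1', product_id: 'p1', quantity: 2, amount: 20.0, date: Date.new(2024, 1, 1) },
  { user_id: 'u2', product_id: 'p1', quantity: 1, amount: 10.0, date: Date.new(2024, 1, 1) },
  { user_id: 'u1', product_id: 'p2', quantity: 3, amount: 45.0, date: Date.new(2024, 1, 2) }
].freeze

# Each projection folds the same events into a different query-optimized view
PROJECTIONS = {
  daily_revenue: ->(view, e) { view[e[:date]] += e[:amount] },
  user_spend: ->(view, e) { view[e[:user_id]] += e[:amount] },
  product_units: ->(view, e) { view[e[:product_id]] += e[:quantity] }
}.freeze

def build_views(events, projections)
  projections.transform_values do |apply|
    view = Hash.new(0)
    events.each { |event| apply.call(view, event) }
    view
  end
end

views = build_views(TRANSACTIONS, PROJECTIONS)
# views[:user_spend]['u1'] is 65.0; views[:product_units]['p1'] is 3
```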
Design Considerations
Selecting Kappa Architecture involves evaluating trade-offs between operational simplicity and specific processing requirements.
When Kappa Architecture Fits
Stream-first workloads where data arrives continuously and requires near-real-time processing benefit most from Kappa Architecture. Systems processing user interactions, sensor telemetry, application logs, or financial transactions align well with the streaming model. The architecture excels when queries primarily access recent data or time-windowed aggregations rather than requiring full historical scans.
Event-driven systems that model state changes as domain events match Kappa's event log naturally. Applications implementing event sourcing or CQRS patterns find the immutable log provides the event store and reprocessing capability these patterns require. The single processing path reduces complexity compared to maintaining separate batch and streaming implementations.
Organizations prioritizing operational simplicity over maximum flexibility gain from Kappa's unified model. Teams avoid the coordination overhead of keeping batch and streaming code synchronized and the infrastructure costs of running parallel processing systems. A single codebase and a single deployment model also leave fewer places for bugs to hide, which speeds up development.
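# Good fit: real-time analytics on streaming data
require 'redis'

class UserActivityAnalytics
  def initialize
    @activity_log = ImmutableLog.new('user_activity')
    @metrics_store = Redis.new
  end

  def process_activity_stream
    # Replays from offset 0 for simplicity; a long-running deployment
    # would resume from a stored checkpoint instead
    @activity_log.read_from(0) do |event|
      increment_metric("user:#{event['user_id']}:actions")
      increment_metric("feature:#{event['feature']}:usage")
      update_last_seen(event['user_id'], event['timestamp'])
    end
  end

  private

  def increment_metric(key)
    @metrics_store.incr(key)
  end

  def update_last_seen(user_id, timestamp)
    @metrics_store.hset('users:last_seen', user_id, timestamp)
  end
end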
When Lambda Architecture Remains Preferable
Complex analytical queries spanning full historical datasets often require batch processing's efficiency. Stream processors excel at incremental computation but struggle with queries demanding joins across large datasets or complex aggregations over years of history. Lambda Architecture's batch layer handles these cases more efficiently by leveraging distributed batch processing frameworks.
Use cases requiring exact results rather than approximate streaming answers favor Lambda's batch layer. While stream processors can maintain accurate counts and sums for bounded windows, computing exact distinct counts or complex percentiles across unbounded data proves challenging. Batch jobs that process complete datasets produce precise results without the approximations streaming systems sometimes require.
Existing infrastructure investments in batch processing systems may justify Lambda Architecture. Organizations with mature Hadoop or Spark deployments and teams experienced in batch processing might find extending these systems more practical than migrating to pure streaming. The transition costs and learning curve for stream processing could outweigh Kappa's operational benefits.
Log Retention and Storage Costs
Kappa Architecture's reprocessing capability depends on retaining the complete event log. Storage costs scale with retention period and event volume. High-velocity systems generating millions of events per second accumulate petabytes of log data over months. Organizations must balance reprocessing flexibility against storage expenses.
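A back-of-the-envelope estimate shows how quickly retention adds up. The inputs below are assumptions, not measurements: one million events per second, roughly 1 KB per event after compression, a replication factor of 3, and 30 days of retention.

```ruby
# Assumed workload; substitute real figures from your own system
EVENTS_PER_SECOND = 1_000_000
BYTES_PER_EVENT = 1_000 # ~1 KB after compression (assumption)
REPLICATION_FACTOR = 3
RETENTION_DAYS = 30
SECONDS_PER_DAY = 86_400

def retained_bytes(events_per_second:, bytes_per_event:, replication:, days:)
  events_per_second * bytes_per_event * SECONDS_PER_DAY * days * replication
end

total = retained_bytes(
  events_per_second: EVENTS_PER_SECOND,
  bytes_per_event: BYTES_PER_EVENT,
  replication: REPLICATION_FACTOR,
  days: RETENTION_DAYS
)

puts format('%.2f PB on disk', total / 1e15) # prints "7.78 PB on disk"
```

Under these assumptions the log accumulates about 2.6 PB of unique data every 30 days, or roughly 7.8 PB on disk once replicated.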
Tiered storage strategies help manage costs by moving older log segments to cheaper storage tiers. Recent data remains on fast SSDs for active processing, while historical segments migrate to object storage or cold storage. Reprocessing jobs read from appropriate tiers based on the data range needed.
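# KafkaCluster, S3Bucket and Glacier are hypothetical wrappers around the
# respective storage systems; 7.days and 90.days come from ActiveSupport
require 'active_support'
require 'active_support/core_ext/integer/time'

class TieredLogStorage
  def initialize
    @hot_storage = KafkaCluster.new('hot', retention: 7.days)
    @warm_storage = S3Bucket.new('warm', retention: 90.days)
    @cold_storage = Glacier.new('cold')
  end

  def archive_old_segments
    # Delete from the faster tier only after the copy has been verified
    @hot_storage.segments_older_than(7.days).each do |segment|
      @warm_storage.upload(segment)
      segment.delete if @warm_storage.verify(segment)
    end
    @warm_storage.segments_older_than(90.days).each do |segment|
      @cold_storage.archive(segment)
      segment.delete if @cold_storage.verify(segment)
    end
  end

  # Replays from start_date up to the head of the log: the oldest tier that
  # holds start_date is read first, then each newer tier in turn
  def reprocess_from_date(start_date, &block)
    tiers_from(start_date).each { |tier| tier.read_from(start_date, &block) }
  end

  private

  def tiers_from(start_date)
    age = Time.now - start_date.to_time
    return [@hot_storage] if age <= 7.days.to_i
    return [@warm_storage, @hot_storage] if age <= 90.days.to_i

    [@cold_storage, @warm_storage, @hot_storage]
  end
end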
Compaction policies reduce storage by consolidating events that supersede each other. For entity-oriented data where later events replace earlier versions, log compaction retains only the most recent state for each entity. This technique preserves the ability to reconstruct current state while discarding intermediate history.
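The effect of log compaction can be modeled in a few lines. This is a simplified sketch, not Kafka's implementation: it keeps the latest record per key, preserves the original offset order, and drops keys whose latest record is a tombstone (a `nil` value marking a delete). Kafka itself retains tombstones for a configurable period (`delete.retention.ms`) before removing them, so that consumers can still observe the delete.

```ruby
# Simplified record: offset in the log, entity key, and value (nil = tombstone)
Record = Struct.new(:offset, :key, :value)

# Expects records in offset order, as they appear in the log
def compact(records)
  latest = {}
  records.each { |record| latest[record.key] = record } # later offsets win
  latest.values
        .reject { |record| record.value.nil? }          # drop deleted entities
        .sort_by(&:offset)                              # keep log order
end

log = [
  Record.new(0, 'user-1', { email: 'a@example.com' }),
  Record.new(1, 'user-2', { email: 'b@example.com' }),
  Record.new(2, 'user-1', { email: 'a2@example.com' }),
  Record.new(3, 'user-2', nil)
]

compacted = compact(log)
# Only offset 2 survives: user-1's latest state; user-2 was deleted
```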
Schema Evolution and Compatibility
Event schemas evolve as applications change. Adding fields to events, changing data types, or restructuring nested objects impacts processors reading from the log. Schema compatibility strategies determine how processors handle events written with different schema versions.
Forward compatibility allows old processors to read new events by ignoring unknown fields. Backward compatibility lets new processors read old events by providing defaults for missing fields. Kappa Architecture processors typically require both forward and backward compatibility to support gradual rollouts and reprocessing.
require 'json'

class EventDeserializer
  # Upcasts every stored schema version to the current (v2) shape
  def deserialize(event_data)
    parsed = JSON.parse(event_data)
    schema_version = parsed['schema_version'] || 1

    case schema_version
    when 1
      # Old schema: amount as integer cents, no currency field
      {
        order_id: parsed['order_id'],
        amount: parsed['amount'] / 100.0,
        currency: 'USD' # Default for v1
      }
    when 2
      # New schema: amount as decimal with currency
      {
        order_id: parsed['order_id'],
        amount: parsed['amount'],
        currency: parsed['currency']
      }
    else
      raise ArgumentError, "Unsupported schema_version: #{schema_version}"
    end
  end
end
Schema registries provide centralized schema management and validation. Producers register schemas before writing events, and consumers fetch schemas when reading. The registry checks each new schema version against a configured compatibility mode, such as backward or full, and rejects versions that would break existing readers or writers.
State Size and Partitioning
Stream processors maintaining state face memory constraints as state grows. Processors holding aggregations, joined views, or entity caches must fit state in memory or spill to disk. Large state impacts performance and recovery time after failures.
Partitioning distributes state across multiple processor instances. Each instance handles a subset of keys, maintaining only the state for its partition. Proper partition key selection ensures even distribution and maintains processing guarantees. Keys representing entities should hash to consistent partitions to keep related state together.
require 'digest'

class PartitionedProcessor
  def initialize(partition_id, total_partitions)
    @partition_id = partition_id
    @total_partitions = total_partitions
    @local_state = Hash.new(0)
  end

  def process_event(event)
    # Only process events whose key hashes to this partition
    return unless handles_key?(event.key)

    update_local_state(event)
  end

  def handles_key?(key)
    hash_key(key) % @total_partitions == @partition_id
  end

  private

  # Example state update: count events per key
  def update_local_state(event)
    @local_state[event.key] += 1
  end

  # MD5 gives a hash that is stable across processes, unlike String#hash,
  # which Ruby seeds randomly per process
  def hash_key(key)
    Digest::MD5.hexdigest(key.to_s).to_i(16)
  end
end
External state stores supplement in-memory state for large datasets. Processors keep state in an embedded key-value store such as RocksDB (the default persistent store in Kafka Streams and an available state backend in Flink) or in a remote store such as Redis, loading only the keys the current event needs. This approach trades some performance for the ability to handle state larger than available memory.
Implementation Approaches
Multiple strategies exist for implementing Kappa Architecture, each with different trade-offs around complexity, performance, and operational characteristics.
Log-Centric Implementation
The log-centric approach uses a distributed log system as the primary infrastructure component. Apache Kafka dominates this space, providing distributed, partitioned, replicated logs with configurable retention. Producers append events to topics, and consumers read from topics using consumer groups that coordinate partition assignment.
This implementation separates the log from processing logic. The log provides durability, ordering, and replayability as infrastructure services. Processing applications consume from the log using client libraries, maintaining independence from the log implementation. This separation enables upgrading processors without touching the log infrastructure and vice versa.
require 'ruby-kafka'
require 'json'

class KafkaKappaSystem
  def initialize(brokers)
    @kafka = Kafka.new(brokers)
    @topics = {}
  end

  # retention is in seconds (30 days by default)
  def create_event_stream(name, partitions: 10, retention: 30 * 24 * 60 * 60)
    @kafka.create_topic(
      name,
      num_partitions: partitions,
      replication_factor: 3,
      # Apply retention to the topic itself; Kafka expects milliseconds
      config: { 'retention.ms' => (retention.to_i * 1000).to_s }
    )
    @topics[name] = {
      producer: @kafka.async_producer(
        delivery_threshold: 100,
        delivery_interval: 1
      ),
      retention: retention
    }
  end

  def write_event(stream, event)
    @topics[stream][:producer].produce(
      event.to_json,
      topic: stream,
      partition_key: event.partition_key
    )
  end

  def process_stream(stream, group_id, &block)
    consumer = @kafka.consumer(group_id: group_id)
    consumer.subscribe(stream)
    consumer.each_message(automatically_mark_as_processed: false) do |message|
      event = JSON.parse(message.value, symbolize_names: true)
      block.call(event)
      # Mark only after the handler succeeds
      consumer.mark_message_as_processed(message)
    end
  end

  # Flushes buffered events before exit
  def shutdown
    @topics.each_value { |topic| topic[:producer].shutdown }
  end
end
Multiple processing frameworks integrate with log systems. Stream processing frameworks like Apache Flink, Apache Samza, or Kafka Streams provide high-level abstractions for transforming and aggregating streams. These frameworks handle state management, fault tolerance, and exactly-once processing semantics. Simpler use cases might use direct consumer API calls without framework overhead.
Database Change Capture
Change Data Capture (CDC) treats database transaction logs as event streams. Applications write to traditional databases using standard database operations, and CDC tools extract changes from transaction logs and publish them to event streams. This approach bridges conventional database-centric architectures with streaming patterns.
CDC enables Kappa Architecture without requiring applications to write directly to event logs. Legacy applications continue using databases while streaming processors consume change events. The database transaction log becomes the source of truth, and the event stream provides a streaming view of changes.
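require 'kafka'
require 'json'

# Debezium runs inside Kafka Connect on the JVM, not in Ruby. A PostgreSQL
# connector is registered through the Kafka Connect REST API with settings
# such as connector.class=io.debezium.connector.postgresql.PostgresConnector,
# database.hostname, database.dbname, database.user, database.password,
# table.include.list and topic.prefix. It publishes change events to topics
# named <topic.prefix>.<schema>.<table>, which this class consumes.
class DatabaseCDC
  # Debezium op codes; 'r' marks rows read during the initial snapshot
  OPERATIONS = { 'c' => 'CREATE', 'r' => 'CREATE', 'u' => 'UPDATE', 'd' => 'DELETE' }.freeze

  def initialize(brokers, topics, group_id: 'cdc-processor')
    @consumer = Kafka.new(brokers).consumer(group_id: group_id)
    topics.each { |topic| @consumer.subscribe(topic) }
  end

  def stream_changes
    @consumer.each_message do |message|
      next if message.value.nil? # tombstone that follows a delete
      envelope = JSON.parse(message.value)
      change = envelope['payload'] || envelope # with or without embedded schema
      operation = OPERATIONS[change['op']]
      next unless operation # skip other op codes, such as truncates

      yield({
        operation: operation,
        table: change.dig('source', 'table'),
        before: change['before'],
        after: change['after'],
        timestamp: change['ts_ms']
      })
    end
  end
end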
# Stream processor consuming CDC events
class CDCProcessor
def process_database_changes(cdc_stream)
cdc_stream.stream_changes do |change|
case change[:operation]
when 'CREATE', 'UPDATE'
update_search_index(change[:after])
invalidate_cache(change[:table], change[:after]['id'])
when 'DELETE'
remove_from_search_index(change[:table], change[:before]['id'])
invalidate_cache(change[:table], change[:before]['id'])
end
end
end
end
CDC introduces latency between database writes and event availability. Transaction log parsing and event publishing add milliseconds to seconds of delay. Applications requiring immediate consistency between database state and derived views must account for this lag.
Event Sourcing Integration
Event sourcing stores all state changes as a sequence of events, making it naturally compatible with Kappa Architecture. Applications model domain logic as events that represent state transitions. These events append to an event store, which serves as both the system of record and the input stream for processors.
The event store functions as the immutable log in Kappa Architecture. Read models derive from the event stream through projection processors that build query-optimized views. Changing projections requires replaying events, which event sourcing architectures already support.
class EventSourcedKappa
def initialize
@event_store = EventStore.new
@projections = {}
end
def append_event(aggregate_id, event)
@event_store.append(
stream: "aggregate-#{aggregate_id}",
event: event,
expected_version: event.expected_version
)
end
def register_projection(name, &block)
@projections[name] = Projection.new(name, block)
end
def rebuild_projection(name)
projection = @projections[name]
projection.reset
@event_store.read_all_events do |event|
projection.apply(event)
end
end
end
# Example projection
class OrderAnalyticsProjection
def initialize
@daily_totals = {}
@product_counts = {}
end
def apply(event)
case event.type
when 'order_placed'
date = event.timestamp.to_date
@daily_totals[date] ||= 0
@daily_totals[date] += event.data[:total]
event.data[:items].each do |item|
@product_counts[item[:product_id]] ||= 0
@product_counts[item[:product_id]] += item[:quantity]
end
end
end
end
Event sourcing provides stronger guarantees around event ordering and completeness. Events for the same aggregate serialize through optimistic concurrency checks, preventing lost updates. This serialization maintains consistency that pure streaming architectures must implement separately.
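Optimistic concurrency can be sketched with an in-memory store. The `InMemoryEventStore` class and `ConcurrencyError` below are hypothetical: each append states the stream version the writer last saw, and the store rejects the write if another writer has appended in the meantime, so the losing writer must reload and retry instead of silently overwriting.

```ruby
# Hypothetical error raised when a writer's view of the stream is stale
class ConcurrencyError < StandardError; end

# Hypothetical in-memory event store with per-stream optimistic concurrency
class InMemoryEventStore
  def initialize
    @streams = Hash.new { |hash, name| hash[name] = [] }
    @mutex = Mutex.new
  end

  # Current version = number of events already in the stream
  def version(stream)
    @mutex.synchronize { @streams[stream].size }
  end

  # Appends only if the stream is still at expected_version; returns the new version
  def append(stream, event, expected_version:)
    @mutex.synchronize do
      actual = @streams[stream].size
      unless actual == expected_version
        raise ConcurrencyError, "expected version #{expected_version}, found #{actual}"
      end

      @streams[stream] << event
      actual + 1
    end
  end

  def read(stream)
    @mutex.synchronize { @streams[stream].dup }
  end
end

store = InMemoryEventStore.new
seen = store.version('order-1') # both writers load version 0
store.append('order-1', { type: 'order_placed' }, expected_version: seen)

begin
  # The second writer still believes the stream is at version 0
  store.append('order-1', { type: 'order_cancelled' }, expected_version: seen)
rescue ConcurrencyError => e
  puts "Rejected: #{e.message}" # prints "Rejected: expected version 0, found 1"
end
```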
Microservices with Event Streaming
Microservices architectures use event streaming for service communication and state synchronization. Each service publishes events describing state changes, and other services subscribe to relevant event streams. This pattern creates a distributed event log where each service contributes its events.
Services implement Kappa Architecture locally by treating incoming event streams as inputs and maintaining derived views specific to their needs. A recommendation service might consume purchase and view events to maintain user preference profiles. An inventory service consumes order events to update stock levels.
class OrderService
def initialize(event_bus)
@event_bus = event_bus
@orders = {}
end
def place_order(order_data)
order = Order.new(order_data)
@orders[order.id] = order
@event_bus.publish('orders', {
type: 'order_placed',
order_id: order.id,
user_id: order.user_id,
items: order.items,
total: order.total,
timestamp: Time.now
})
order
end
end
class InventoryService
def initialize(event_bus)
@event_bus = event_bus
@stock_levels = {}
@event_bus.subscribe('orders') do |event|
process_order_event(event)
end
end
def process_order_event(event)
return unless event[:type] == 'order_placed'
event[:items].each do |item|
product_id = item[:product_id]
@stock_levels[product_id] ||= load_current_stock(product_id) # hypothetical lookup
previous_level = @stock_levels[product_id]
@stock_levels[product_id] -= item[:quantity]
# Alert when stock first drops below the threshold, not on every later order
if previous_level >= 10 && @stock_levels[product_id] < 10
@event_bus.publish('inventory', {
type: 'low_stock_alert',
product_id: product_id,
current_level: @stock_levels[product_id]
})
end
end
end
end
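Both services depend on an `event_bus` object with `publish` and `subscribe`. For local experiments, a hypothetical in-process stand-in could look like the sketch below. It delivers each event synchronously to every subscriber of a topic and keeps the topic's history, whereas a production deployment would back the same interface with Kafka topics and consumer groups.

```ruby
# Hypothetical synchronous, in-process event bus for local testing
class InMemoryEventBus
  def initialize
    @subscribers = Hash.new { |hash, topic| hash[topic] = [] }
    @log = Hash.new { |hash, topic| hash[topic] = [] }
  end

  def subscribe(topic, &handler)
    @subscribers[topic] << handler
  end

  # Appends the event to the topic's history, then notifies subscribers
  def publish(topic, event)
    @log[topic] << event
    @subscribers[topic].each { |handler| handler.call(event) }
  end

  # Events retained per topic, so a late subscriber could replay them
  def history(topic)
    @log[topic].dup
  end
end

bus = InMemoryEventBus.new
alerts = []
bus.subscribe('inventory') { |event| alerts << event }
bus.publish('inventory', { type: 'low_stock_alert', product_id: 'p1', current_level: 4 })
```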
Service independence introduces challenges around event schema coordination and versioning. Changes to event formats impact all consuming services. Schema registries and API contracts help manage evolution, but distributed ownership complicates coordination compared to monolithic architectures.
Ruby Implementation
Ruby's ecosystem provides libraries and patterns for implementing Kappa Architecture components, though the Ruby community focuses more on request-response web applications than stream processing.
Kafka Integration
The ruby-kafka gem provides a native Ruby client for Apache Kafka, enabling both event production and consumption. The library supports producer configurations for throughput optimization and consumer groups for parallel processing.
require 'ruby-kafka'
require 'json'
require 'logger'
class RubyKafkaProducer
def initialize(brokers)
@kafka = Kafka.new(
seed_brokers: brokers,
client_id: 'ruby-producer',
logger: Logger.new($stdout)
)
# Async producer buffers messages for batch delivery
@producer = @kafka.async_producer(
delivery_threshold: 100, # Deliver after 100 messages
delivery_interval: 5, # Or after 5 seconds
max_buffer_size: 1000,
max_buffer_bytesize: 10_000_000
)
end
def produce_event(topic, event)
@producer.produce(
event.to_json,
topic: topic,
partition_key: event[:user_id]&.to_s,
headers: {
'event_type' => event[:type],
'timestamp' => Time.now.to_i.to_s
}
)
end
def shutdown
@producer.deliver_messages # Flush remaining messages
@producer.shutdown
end
end
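Consumer implementations handle message fetching, offset management, and rebalancing. The consumer below processes messages sequentially and marks each offset only after the message has been processed. This yields at-least-once delivery: after a crash, any messages processed since the last offset commit are delivered again.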
require 'json'
require 'logger'

class RubyKafkaConsumer
  def initialize(brokers, group_id, logger: Logger.new($stdout))
    @logger = logger
    @kafka = Kafka.new(seed_brokers: brokers)
    @consumer = @kafka.consumer(
      group_id: group_id,
      session_timeout: 30,
      offset_commit_interval: 10,
      offset_retention_time: 7 * 24 * 60 * 60 # 1 week
    )
  end

  def consume_events(topic, processor)
    @consumer.subscribe(topic, start_from_beginning: false)
    @consumer.each_message(automatically_mark_as_processed: false) do |message|
      begin
        event = JSON.parse(message.value, symbolize_names: true)
        processor.process(event)
        # Only mark after successful processing
        @consumer.mark_message_as_processed(message)
      rescue JSON::ParserError => e
        @logger.error("Invalid JSON in message: #{e.message}")
        # Skip malformed messages so they cannot block the partition
        @consumer.mark_message_as_processed(message)
      rescue StandardError => e
        @logger.error("Processing failed: #{e.message}")
        # Not marked: re-raising stops the consumer, and after a restart
        # it resumes from the last committed offset and retries this message
        raise
      end
    end
  end

  def stop
    @consumer.stop
  end
end
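Because offsets are committed only after processing, a crash can cause the same messages to be delivered again, so handlers need to tolerate duplicates. One common approach, sketched below with a hypothetical `IdempotentProcessor` that assumes every event carries a unique `:event_id`, records processed IDs and skips repeats. It responds to `process(event)`, so it could be passed as the `processor` argument to `consume_events`. Here the ID set lives in memory and grows without bound; a real deployment would persist it alongside the derived view (for example in Redis, with expiry) so both survive restarts together.

```ruby
require 'set'

# Hypothetical wrapper that makes a handler safe under at-least-once delivery
class IdempotentProcessor
  def initialize(handler)
    @handler = handler
    @processed_ids = Set.new # assumption: in memory only, for illustration
  end

  # Returns true if the event was applied, false if it was a duplicate
  def process(event)
    return false if @processed_ids.include?(event[:event_id])

    @handler.call(event)
    @processed_ids << event[:event_id]
    true
  end
end

revenue = Hash.new(0.0)
processor = IdempotentProcessor.new(->(e) { revenue[e[:date]] += e[:amount] })

event = { event_id: 'evt-1', date: '2024-01-01', amount: 25.0 }
processor.process(event)
processor.process(event) # redelivered after a crash; ignored
```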
Stream Processing with Karafka
Karafka provides a framework for building Kafka-based stream processing applications in Ruby. It handles consumer lifecycle management, routing, and worker coordination.
require 'karafka'

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = {
      'bootstrap.servers': 'localhost:9092',
      # Assignment strategy is a librdkafka client setting, not a topic route option
      'partition.assignment.strategy': 'range'
    }
    config.client_id = 'kappa-processor'
  end

  routes.draw do
    topic :orders do
      consumer OrdersConsumer
      max_wait_time 1_000
    end
    topic :user_events do
      consumer UserEventsConsumer
      max_messages 100
    end
  end
end
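require 'date'
require 'json'
require 'redis'

class OrdersConsumer < Karafka::BaseConsumer
  def consume
    messages.each do |message|
      event = JSON.parse(message.raw_payload, symbolize_names: true)
      case event[:type]
      when 'order_placed'
        update_revenue_metrics(event)
        update_product_sales(event)
      when 'order_cancelled'
        adjust_revenue_metrics(event)
      end
      mark_as_consumed(message)
    end
  end

  private

  # Lazily built clients keep setup out of initialize, which Karafka manages
  def redis
    @redis ||= Redis.new
  end

  def metrics
    @metrics ||= Metrics.new # hypothetical metrics client
  end

  # JSON timestamps arrive as strings, so parse them before taking the date
  def revenue_key(event)
    "revenue:#{Date.parse(event[:timestamp])}"
  end

  def update_revenue_metrics(event)
    redis.incrbyfloat(revenue_key(event), event[:total])
    metrics.increment('orders.placed')
  end

  def adjust_revenue_metrics(event)
    redis.incrbyfloat(revenue_key(event), -event[:total])
    metrics.increment('orders.cancelled')
  end

  def update_product_sales(event)
    event[:items].each do |item|
      redis.incrby("product:#{item[:product_id]}:sales", item[:quantity])
    end
  end
end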
State Management with Redis
Redis provides fast external state storage for stream processors. Processors maintain derived views in Redis data structures, enabling queries while processing continues.
class RedisStateStore
def initialize(redis_url = 'redis://localhost:6379')
@redis = Redis.new(url: redis_url)
end
# Atomic counter updates
def increment_counter(key, by: 1)
@redis.incrby(key, by)
end
# Time-series data with sorted sets
def record_metric(metric_name, value, timestamp = Time.now)
@redis.zadd(
"timeseries:#{metric_name}",
timestamp.to_f,
"#{timestamp.to_f}:#{value}"
)
end
def get_metric_range(metric_name, start_time, end_time)
@redis.zrangebyscore(
"timeseries:#{metric_name}",
start_time.to_f,
end_time.to_f
).map { |entry| entry.split(':').last.to_f }
end
# Aggregated views with hashes
def update_aggregation(key, field, value)
@redis.hincrby(key, field, value)
end
def get_aggregation(key)
@redis.hgetall(key).transform_values(&:to_i)
end
# Windowed aggregations with expiration
def update_windowed_metric(metric_name, window_duration, value)
window_key = current_window_key(metric_name, window_duration)
@redis.multi do |txn|
txn.incrbyfloat(window_key, value)
txn.expire(window_key, window_duration * 2)
end
end
private
def current_window_key(metric_name, window_duration)
window_start = (Time.now.to_i / window_duration) * window_duration
"window:#{metric_name}:#{window_start}"
end
end
Reprocessing Coordinator
Implementing reprocessing requires coordinating between old and new processor versions while maintaining service availability.
class ReprocessingCoordinator
def initialize(kafka_brokers, state_store)
@kafka_brokers = kafka_brokers
@state_store = state_store
@processors = {}
end
def start_reprocessing(processor_class, version)
old_processor = @processors[:current]
# Create new processor with temporary state
temp_state = create_versioned_state(version)
new_processor = processor_class.new(
brokers: @kafka_brokers,
group_id: "processor-v#{version}",
state_store: temp_state
)
# Start new processor from beginning
Thread.new do
new_processor.process_from_beginning
end
# Monitor catch-up progress
monitor_thread = Thread.new do
loop do
lag = calculate_lag(new_processor)
puts "Reprocessing lag: #{lag} messages"
if lag < 1000 # Near caught up
cutover(old_processor, new_processor, version)
break
end
sleep(10)
end
end
@processors[version] = {
processor: new_processor,
monitor: monitor_thread
}
end
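private
def create_versioned_state(version)
# Uses the Redis logical database number as the version namespace; a
# default Redis server only provides databases 0-15
RedisStateStore.new("redis://localhost:6379/#{version}")
end
def calculate_lag(processor)
# Assumes the processor exposes per-partition lag objects (hypothetical API)
consumer_lag = processor.consumer.lag
consumer_lag.values.sum { |partition_lag| partition_lag.lag }
end
def cutover(old_processor, new_processor, version)
# Brief pause so in-flight writes settle (a heuristic, not a guarantee)
sleep(2)
# Switch readers to the new version's state (hypothetical store method)
@state_store.switch_to_version(version)
# There is no old processor on the very first deployment
old_processor&.stop
# Promote new processor to current
@processors[:current] = new_processor
puts "Cutover complete to version #{version}"
end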
end
Event Schema Management
Schema validation and evolution require careful handling to maintain compatibility across processor versions.
require 'json'
require 'set'
require 'json-schema'
class EventSchemaRegistry
def initialize
@schemas = {}
load_schemas
end
def load_schemas
Dir.glob('schemas/*.json').each do |schema_file|
schema = JSON.parse(File.read(schema_file))
event_type = schema['title']
version = schema['version']
@schemas[event_type] ||= {}
@schemas[event_type][version] = schema
end
end
def validate_event(event_type, event_data)
raise "Unknown event type: #{event_type}" unless @schemas.key?(event_type)
version = event_data['schema_version'] || latest_version(event_type)
schema = @schemas[event_type][version]
raise "Unknown schema version #{version} for #{event_type}" unless schema
JSON::Validator.validate!(schema, event_data)
end
def latest_version(event_type)
@schemas[event_type].keys.max
end
def check_compatibility(event_type, new_schema)
current_version = latest_version(event_type)
current_schema = @schemas[event_type][current_version]
# Check backward compatibility
backward_compatible = check_backward_compatibility(current_schema, new_schema)
# Check forward compatibility
forward_compatible = check_forward_compatibility(current_schema, new_schema)
{
backward_compatible: backward_compatible,
forward_compatible: forward_compatible,
fully_compatible: backward_compatible && forward_compatible
}
end
private
def check_backward_compatibility(old_schema, new_schema)
# New schema can read data written with old schema
old_required = Set.new(old_schema['required'] || [])
new_required = Set.new(new_schema['required'] || [])
# New schema must not require fields that didn't exist
(new_required - old_required).empty?
end
def check_forward_compatibility(old_schema, new_schema)
# Old schema can read data written with new schema: every field the old
# schema requires must still be required by the new one. (This check
# ignores additionalProperties: false, which would also reject new fields.)
old_required = Set.new(old_schema['required'] || [])
new_required = Set.new(new_schema['required'] || [])
(old_required - new_required).empty?
end
end
Tools & Ecosystem
Several tools and libraries support building Kappa Architecture systems in Ruby and provide integration points with streaming infrastructure.
Apache Kafka
Kafka serves as the distributed log foundation for most Kappa Architecture implementations. The platform provides topics for organizing event streams, partitions for parallelism, and consumer groups for coordinated consumption.
Ruby applications interact with Kafka through client gems such as ruby-kafka or rdkafka-ruby for direct access, or through higher-level frameworks like Karafka. Operationally, Kafka exposes metrics over JMX, manages cluster metadata with ZooKeeper or, in newer releases, KRaft, and ships CLI tools for administration.
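Within a consumer group, each partition is consumed by at most one member at a time, which is why partition count bounds how far a group can scale. The sketch below models that idea with a simple round-robin assignment; `assign_partitions` is a hypothetical helper, not Kafka's assignor implementation, and real groups also rebalance when members join or leave.
```ruby
# Illustrative round-robin assignment of partitions to consumer-group members.
# Simplified model only; Kafka ships its own pluggable assignors.
def assign_partitions(partition_count, members)
  # Sort members so every instance computes the same assignment
  assignment = members.sort.to_h { |member| [member, []] }
  owners = assignment.keys
  partition_count.times do |partition|
    assignment[owners[partition % owners.size]] << partition
  end
  assignment
end

# worker-a gets partitions 0, 2, 4; worker-b gets 1, 3, 5
six = assign_partitions(6, %w[worker-b worker-a])
# With more members than partitions, the extra member stays idle
idle = assign_partitions(2, %w[a b c])
```
Members beyond the partition count receive nothing, so adding instances past that point does not add throughput.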
Event Store Databases
EventStoreDB provides a database purpose-built for event sourcing with native stream support. Events organize into streams with strong ordering guarantees and optimistic concurrency control.
require 'event_store_client'
# Client construction and method signatures differ across event_store_client
# releases; treat these calls as illustrative and check the gem's README.
class EventStoreKappa
  def initialize
    @client = EventStoreClient::Client.new(
      url: 'esdb://localhost:2113',
      verify_ssl: false # local development only
    )
  end
  def append_event(stream_name, event_type, data)
    event = EventStoreClient::DeserializedEvent.new(
      type: event_type,
      data: data,
      metadata: { timestamp: Time.now.to_i }
    )
    @client.append_to_stream(stream_name, event)
  end
  # Replays every stream from the beginning, then follows new events
  def subscribe_to_all(&block)
    @client.subscribe_to_all(from_position: :start) do |event|
      block.call({
        stream: event.stream_name,
        type: event.type,
        data: event.data,
        position: event.stream_position
      })
    end
  end
  # Server-side projection defined by a query; emit: true allows it to write events
  def create_projection(name, query)
    @client.create_projection(
      name: name,
      query: query,
      emit: true
    )
  end
end
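EventStoreDB's optimistic concurrency control lets a writer state which stream revision it expects, so two writers working from the same snapshot cannot silently interleave. The in-memory model below illustrates that check; `InMemoryEventStore` and `WrongExpectedVersion` are hypothetical names, not part of the event_store_client gem.
```ruby
# Hypothetical error raised when a stream moved on since the caller last read it
class WrongExpectedVersion < StandardError; end

# In-memory model of per-stream optimistic concurrency
class InMemoryEventStore
  def initialize
    @streams = Hash.new { |hash, name| hash[name] = [] }
    @lock = Mutex.new
  end

  # expected_version: position of the last event the caller saw (-1 for a new stream).
  # Returns the position of the appended event.
  def append(stream, event, expected_version:)
    @lock.synchronize do
      current = @streams[stream].size - 1
      unless current == expected_version
        raise WrongExpectedVersion, "expected #{expected_version}, stream at #{current}"
      end
      @streams[stream] << event
      @streams[stream].size - 1
    end
  end

  def read(stream)
    @lock.synchronize { @streams[stream].dup }
  end
end

store = InMemoryEventStore.new
store.append('order-1', { type: 'OrderPlaced' }, expected_version: -1)
store.append('order-1', { type: 'OrderPaid' }, expected_version: 0)
begin
  # A stale writer that still believes the stream is at position 0
  store.append('order-1', { type: 'OrderCancelled' }, expected_version: 0)
rescue WrongExpectedVersion => e
  puts "Conflict: #{e.message}"
end
```
On a conflict the writer re-reads the stream, re-checks its business rule, and retries with the new expected version.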
Stream Processing Frameworks
While Ruby lacks mature stream processing frameworks comparable to Kafka Streams or Apache Flink in the Java ecosystem, several tools enable stream processing patterns.
The Karafka framework provides consumer management, routing, and worker pools for Kafka-based processing, and it handles consumer lifecycle, rebalancing, and error recovery.
# Karafka consumer with error handling
# Hypothetical error class marking transient failures worth retrying
class RetryableError < StandardError; end
class RobustConsumer < Karafka::BaseConsumer
  def consume
    messages.each do |message|
      with_retry(max_attempts: 3) do
        process_message(message)
        mark_as_consumed(message)
      end
    end
  end
  private
  # parse_event, validate_event, apply_business_logic, persist_results,
  # and log_failure are application-specific helpers
  def process_message(message)
    event = parse_event(message)
    validate_event(event)
    apply_business_logic(event)
    persist_results(event)
  end
  def with_retry(max_attempts:)
    attempts = 0
    begin
      yield
    rescue RetryableError => e
      attempts += 1
      if attempts < max_attempts
        sleep(2**attempts) # Exponential backoff: 2 s, then 4 s
        retry
      else
        log_failure(e)
        raise
      end
    end
  end
end
State Storage Options
Stream processors require fast state access for maintaining aggregations and joined views. Ruby applications use various storage backends depending on requirements.
Redis excels for counters, sets, and time-series data with its atomic operations and data structure support. PostgreSQL provides ACID guarantees for state requiring transactional consistency. RocksDB offers embedded key-value storage with high performance for local state.
require 'redis'
require 'pg'
require 'json'
require 'rocksdb' # assumes a RocksDB binding such as the rocksdb-ruby gem
class MultiStoreStateManager
  def initialize
    @redis = Redis.new # Fast counters and caches
    @postgres = PG.connect(dbname: 'analytics') # Transactional state
    @rocksdb = RocksDB::DB.new('/tmp/rocksdb') # Local state; constructor varies by binding
  end
  def increment_counter(key)
    @redis.incr(key)
  end
  # Upsert; assumes a unique constraint on aggregates.entity_id
  def update_aggregate(entity_id, changes)
    @postgres.exec_params(
      'INSERT INTO aggregates (entity_id, data, updated_at)
       VALUES ($1, $2, NOW())
       ON CONFLICT (entity_id)
       DO UPDATE SET data = EXCLUDED.data, updated_at = NOW()',
      [entity_id, changes.to_json]
    )
  end
  def store_local_state(key, value)
    @rocksdb.put(key, value)
  end
  def get_local_state(key)
    @rocksdb.get(key)
  end
end
Monitoring and Observability
Monitoring stream processing systems requires tracking consumer lag, processing rates, and error rates. Kafka client libraries such as ruby-kafka and Karafka publish instrumentation events that can feed a metrics collector such as the Prometheus client.
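require 'prometheus/client'
class KafkaMetrics
  def initialize
    registry = Prometheus::Client.registry
    @messages_processed = registry.counter(
      :kafka_messages_processed_total,
      docstring: 'Total messages processed',
      labels: [:topic, :partition]
    )
    @processing_duration = registry.histogram(
      :kafka_message_processing_duration_seconds,
      docstring: 'Message processing duration',
      labels: [:topic]
    )
    @consumer_lag = registry.gauge(
      :kafka_consumer_lag,
      docstring: 'Consumer lag in messages',
      labels: [:topic, :partition]
    )
  end
  def track_message_processed(topic, partition)
    @messages_processed.increment(labels: { topic: topic, partition: partition })
  end
  def track_processing_duration(topic, duration)
    @processing_duration.observe(duration, labels: { topic: topic })
  end
  def update_lag(topic, partition, lag)
    @consumer_lag.set(lag, labels: { topic: topic, partition: partition })
  end
end
# Metrics live in the process-wide default registry and registering a name
# twice raises, so build them once at load time instead of per consumer
# (Karafka creates a consumer instance per partition).
KAFKA_METRICS = KafkaMetrics.new
class InstrumentedConsumer < Karafka::BaseConsumer
  def consume
    messages.each do |message|
      # Monotonic clock: durations stay correct if wall-clock time jumps
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      process_message(message) # application-specific
      duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
      KAFKA_METRICS.track_message_processed(message.topic, message.partition)
      KAFKA_METRICS.track_processing_duration(message.topic, duration)
      mark_as_consumed(message)
    end
  end
end
# Lag is not read from the consumer here; one option is the librdkafka
# statistics that Karafka emits, which report consumer_lag per partition
# (statistics must be enabled in the client configuration).
Karafka.monitor.subscribe('statistics.emitted') do |event|
  event[:statistics].fetch('topics', {}).each do |topic, topic_stats|
    topic_stats.fetch('partitions', {}).each do |partition, stats|
      lag = stats['consumer_lag']
      # Skip librdkafka's internal "-1" partition and unknown (-1) lag values
      next if partition == '-1' || lag.nil? || lag.negative?
      KAFKA_METRICS.update_lag(topic, partition.to_i, lag)
    end
  end
end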
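Consumer lag, the value the lag gauge above tracks, is the gap between a partition's log-end offset (high watermark) and the consumer group's committed offset. A minimal sketch of that arithmetic, assuming offsets have already been fetched into hashes keyed by partition; `consumer_lag` and `total_lag` are hypothetical helpers, and treating a missing commit as offset 0 is an assumption (the real starting point depends on the offset reset policy).
```ruby
# lag(partition) = log_end_offset - committed_offset, floored at zero
def consumer_lag(high_watermarks, committed_offsets)
  high_watermarks.to_h do |partition, log_end|
    committed = committed_offsets.fetch(partition, 0) # assumption: no commit yet => 0
    [partition, [log_end - committed, 0].max]
  end
end

def total_lag(lag_by_partition)
  lag_by_partition.values.sum
end

lag = consumer_lag({ 0 => 1_500, 1 => 980, 2 => 2_000 },
                   { 0 => 1_450, 1 => 980, 2 => 200 })
# partition 0 lags 50, partition 1 is caught up, partition 2 lags 1,800
```
Alerting on the per-partition values as well as the total matters: one stuck partition can hide behind healthy siblings in an aggregate.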
Performance Considerations
Kappa Architecture performance depends on log throughput, processing latency, and state access patterns.
Throughput and Partitioning
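Event throughput scales horizontally through partitioning. Each partition is an ordered sequence of events that can be consumed independently of the others. Because a consumer group assigns each partition to at most one member, the partition count caps how many consumer instances can process a topic in parallel; adding partitions raises that ceiling.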
Partition key selection impacts distribution. Hashing user IDs distributes load evenly across partitions while maintaining order for each user's events. Skewed distributions where some keys generate significantly more events than others create hot partitions that bottleneck throughput.
require 'digest'
class PartitioningStrategy
  def initialize(partition_count)
    @partition_count = partition_count
  end
  # Hash-based partitioning: the same key always maps to the same partition,
  # preserving per-key order. (Kafka clients apply their own partitioner;
  # MD5 here is illustrative.)
  def hash_partition(key)
    Digest::MD5.hexdigest(key.to_s).to_i(16) % @partition_count
  end
  # Range-based partitioning for time-series data: all events from the same
  # hour share one partition, so that partition is hot during that hour.
  def range_partition(timestamp)
    timestamp.hour % @partition_count
  end
  # Analyze partition balance, including partitions that received no events
  def analyze_distribution(events)
    counts = Array.new(@partition_count, 0)
    events.each { |event| counts[hash_partition(event[:key])] += 1 }
    avg = counts.sum / @partition_count.to_f
    {
      min: counts.min,
      max: counts.max,
      avg: avg,
      # max/avg stays finite when some partitions are empty (max/min would not)
      skew: avg.zero? ? 0.0 : counts.max / avg
    }
  end
end
Producer batching amortizes network overhead across multiple events. Buffering events in memory and sending batches reduces per-message cost but increases latency between event generation and log availability. Tuning batch size and timeout balances throughput against latency requirements.
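A minimal sketch of size-or-timeout batching, the trade-off Kafka producers expose through settings such as `batch.size` and `linger.ms`. `BatchingProducer` is a hypothetical class: the injected `sender` block stands in for the network request, and the injectable clock makes the timing deterministic for testing.
```ruby
# Flushes when the buffer reaches batch_size, or when the oldest buffered
# event has waited linger_seconds (checked by calling #poll periodically).
class BatchingProducer
  def initialize(batch_size:, linger_seconds:,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }, &sender)
    @batch_size = batch_size
    @linger = linger_seconds
    @clock = clock
    @sender = sender # receives each batch, e.g. one network request per batch
    @buffer = []
    @first_buffered_at = nil
  end

  def produce(event)
    @first_buffered_at ||= @clock.call
    @buffer << event
    flush if @buffer.size >= @batch_size
  end

  # Enforces the linger deadline; call from a timer or the event loop
  def poll
    flush if @first_buffered_at && @clock.call - @first_buffered_at >= @linger
  end

  def flush
    return if @buffer.empty?
    @sender.call(@buffer)
    @buffer = [] # new array, so the sender may keep a reference to the old one
    @first_buffered_at = nil
  end
end

now = 0.0
sent = []
producer = BatchingProducer.new(batch_size: 3, linger_seconds: 0.05, clock: -> { now }) { |batch| sent << batch }
5.times { |i| producer.produce(i) } # events 0-2 flush together as a full batch
now += 0.1
producer.poll                       # events 3-4 flush once the linger expires
```
Raising `batch_size` cuts per-request overhead; raising `linger_seconds` lets batches fill under light load but adds up to that much delay per event.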
Processing Latency
End-to-end latency encompasses event production, log replication, consumption, processing, and state updates. Each stage contributes to overall delay.
Producer acknowledgment settings trade durability for latency. Waiting for all in-sync replicas to acknowledge a write (acks=all) maximizes durability but adds milliseconds of replication latency to each send. Fire-and-forget sending (acks=0) minimizes producer latency but risks silent message loss.
Consumer fetch settings affect how quickly processors receive new events. A larger minimum fetch size reduces the number of fetch requests, but the broker holds each request until that much data has accumulated, which delays delivery when traffic is light. The maximum fetch wait time bounds that delay: once it elapses, the broker returns whatever data is available even if the minimum has not been reached.
require 'kafka'
require 'json'
class LatencyOptimizedConsumer
  def initialize(brokers, topic)
    @kafka = Kafka.new(seed_brokers: brokers)
    @consumer = @kafka.consumer(
      group_id: 'low-latency-processor',
      session_timeout: 10,  # seconds without heartbeats before eviction
      heartbeat_interval: 3 # seconds between heartbeats
    )
    @consumer.subscribe(topic)
  end
  def process_with_timing
    # ruby-kafka takes fetch tuning on each_message rather than consumer():
    # min_bytes: 1 returns as soon as any data exists, and max_wait_time
    # (in seconds) caps how long the broker holds a fetch (~100 ms here).
    @consumer.each_message(min_bytes: 1, max_wait_time: 0.1) do |message|
      receive_time = Time.now
      event = JSON.parse(message.value, symbolize_names: true)
      # Assumes producers embed an epoch-seconds :timestamp; clock skew
      # between producer and consumer hosts distorts this measurement.
      produce_time = Time.at(event[:timestamp])
      latency_ms = (receive_time - produce_time) * 1000
      process_event(event) # application-specific
      processing_time_ms = (Time.now - receive_time) * 1000
      record_metrics(latency_ms, processing_time_ms) # application-specific
    end
  end
end
State Access Patterns
Stream processors reading and writing state frequently face bottlenecks from state access latency. In-memory state provides microsecond access but limits state size to available memory. External stores enable larger state at the cost of network round trips.
Batching state operations amortizes connection overhead. Accumulating updates in memory and flushing periodically reduces database round trips. This batching trades freshness of materialized views for throughput.
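class BatchedStateUpdater
  def initialize(state_store, batch_size: 100, flush_interval: 5)
    @state_store = state_store # must respond to batch_update(updates)
    @batch_size = batch_size
    @flush_interval = flush_interval
    @pending_updates = []
    @last_flush = Time.now
    @mutex = Mutex.new # update and the timer thread share the buffer
    @timer = start_flush_timer
  end
  def update(key, value)
    @mutex.synchronize do
      @pending_updates << { key: key, value: value }
      flush_locked if should_flush?
    end
  end
  def flush
    @mutex.synchronize { flush_locked }
  end
  private
  # Caller must hold @mutex; writing under the lock keeps batches in order,
  # and a failed write leaves the updates buffered for the next attempt
  def flush_locked
    return if @pending_updates.empty?
    @state_store.batch_update(@pending_updates)
    @pending_updates = []
    @last_flush = Time.now
  end
  def should_flush?
    @pending_updates.size >= @batch_size ||
      Time.now - @last_flush >= @flush_interval
  end
  def start_flush_timer
    Thread.new do
      loop do
        sleep(@flush_interval)
        flush
      end
    end
  end
end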
Read caching reduces state access frequency for frequently accessed values. Maintaining a local cache of recently read state values avoids repeated database queries. Cache invalidation strategies ensure consistency between cache and authoritative state store.
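A sketch of a read-through cache combining the two invalidation strategies most often used for this: time-based expiry and explicit invalidation when the processor writes a key. `ReadThroughCache` is a hypothetical class; it is not thread-safe and assumes a single-threaded processor, and its loader block stands in for the authoritative store.
```ruby
# Read-through cache with TTL expiry, explicit invalidation, and LRU eviction
class ReadThroughCache
  def initialize(ttl:, max_entries: 10_000,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }, &loader)
    @ttl = ttl
    @max_entries = max_entries
    @clock = clock
    @loader = loader # called on a miss, e.g. a database lookup
    @entries = {}    # Ruby hashes keep insertion order, which drives LRU eviction
  end

  def get(key)
    entry = @entries.delete(key)
    if entry && @clock.call - entry[:loaded_at] < @ttl
      @entries[key] = entry # re-insert to mark as most recently used
      return entry[:value]
    end
    value = @loader.call(key)
    @entries[key] = { value: value, loaded_at: @clock.call }
    @entries.shift if @entries.size > @max_entries # evict least recently used
    value
  end

  # Call after writing key to the authoritative store
  def invalidate(key)
    @entries.delete(key)
  end
end

now = 0.0
loads = 0
cache = ReadThroughCache.new(ttl: 30, clock: -> { now }) { |key| loads += 1; "profile-#{key}" }
cache.get(42)        # miss: loader runs
cache.get(42)        # hit: served from memory
cache.invalidate(42)
cache.get(42)        # miss after invalidation
now += 60
cache.get(42)        # miss: entry older than ttl
```
The TTL bounds staleness when another process writes the store; explicit invalidation covers writes made by this processor.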
Backpressure and Flow Control
Processors that cannot keep up with event arrival rates accumulate lag. Backpressure mechanisms prevent memory exhaustion when processing falls behind ingestion.
Consumer pause and resume provides explicit flow control. Processors monitoring their queue depth can pause consumption when buffers fill and resume when caught up. This prevents out-of-memory conditions from unbounded queues.
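require 'kafka'
class BackpressureConsumer
  PAUSE_SECONDS = 1 # assumed pause length before the client retries a partition
  def initialize(kafka, max_queue_size: 1000)
    @consumer = kafka.consumer(group_id: 'backpressure-aware')
    @max_queue_size = max_queue_size
    @queue = Queue.new
  end
  def start_consuming(topic)
    @consumer.subscribe(topic)
    # Processing thread drains the in-memory buffer
    processor = Thread.new do
      loop { process_message(@queue.pop) } # application-specific
    end
    # Consumer thread: the ruby-kafka consumer should not be shared across
    # threads, so pause is only called here. With timeout:, the client resumes
    # the partition by itself; if the buffer is still full, the next message
    # from that partition pauses it again. Messages already fetched can push
    # the queue slightly past max_queue_size.
    # Offsets are marked when this block returns, before processing, so
    # buffered messages can be lost on a crash (at-most-once).
    consumer = Thread.new do
      @consumer.each_message do |message|
        @queue.push(message)
        next if @queue.size < @max_queue_size
        @consumer.pause(message.topic, message.partition, timeout: PAUSE_SECONDS)
        puts "Paused #{message.topic}/#{message.partition} due to backpressure"
      end
    end
    [consumer, processor]
  end
end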
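Pausing when buffers fill and resuming only once caught up is commonly implemented with two thresholds, so the consumer does not flap between paused and running around a single limit. The hypothetical `WatermarkFlowControl` below isolates that decision from any Kafka client; a low mark of half the high mark is an assumed choice.
```ruby
# High/low watermark flow control: pause at the high mark, resume only
# after the buffer drains below the low mark.
class WatermarkFlowControl
  attr_reader :paused

  def initialize(high:, low:)
    raise ArgumentError, 'low must be below high' unless low < high
    @high = high
    @low = low
    @paused = false
  end

  # Returns :pause, :resume, or nil (no change) for the observed queue depth
  def observe(queue_depth)
    if !@paused && queue_depth >= @high
      @paused = true
      :pause
    elsif @paused && queue_depth < @low
      @paused = false
      :resume
    end
  end
end

control = WatermarkFlowControl.new(high: 1000, low: 500)
actions = [200, 1000, 800, 600, 499, 700].map { |depth| control.observe(depth) }
# => [nil, :pause, nil, nil, :resume, nil]
```
The gap between the two marks sets how much work the processor absorbs before consumption restarts, trading buffer memory for fewer pause/resume cycles.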
Real-World Applications
Kappa Architecture proves effective for several production use cases where streaming semantics align with business requirements.
Real-Time Analytics Dashboards
Analytics systems displaying live metrics derive values from event streams without batch recomputation delays. User activity tracking, application monitoring, and business intelligence dashboards consume events and maintain running aggregations.
A web analytics system processes page view events to update visitor counts, session durations, and conversion funnels in real time. Stream processors maintain windowed metrics that reflect recent activity while older data rolls off.
require 'redis'
# Counters here are not idempotent: a replay should write to fresh keys
# (a new view version) rather than the live ones.
class RealTimeAnalytics
  def initialize
    @metrics_store = Redis.new
    @kafka_consumer = create_consumer('page_views') # application-specific helper
  end
  def process_page_views
    @kafka_consumer.consume_events do |event|
      update_visitor_counts(event)
      update_page_metrics(event)
      detect_conversion_funnels(event)
      update_session_data(event) # application-specific
    end
  end
  private
  # Time buckets come from the event's own timestamp (assumed epoch seconds)
  # rather than Time.now, so replaying the log rebuilds the same windows
  def event_time(event)
    Time.at(event[:timestamp]).utc
  end
  def update_visitor_counts(event)
    # Approximate unique visitors using HyperLogLog
    @metrics_store.pfadd('visitors:unique', event[:visitor_id])
    @metrics_store.incr('visitors:total') # counts page views, not distinct visitors
    # Windowed metrics for the event's hour
    hour_key = "visitors:#{event_time(event).strftime('%Y%m%d%H')}"
    @metrics_store.pfadd(hour_key, event[:visitor_id])
    @metrics_store.expire(hour_key, 86400)
  end
  def update_page_metrics(event)
    page_key = "page:#{event[:path]}"
    @metrics_store.hincrby(page_key, 'views', 1)
    @metrics_store.hset(page_key, 'last_view', event_time(event).to_i)
    # Scored by load time so percentiles can be read by rank; the member
    # must be unique per view
    @metrics_store.zadd(
      "#{page_key}:load_times",
      event[:load_time],
      "#{event[:visitor_id]}:#{event[:timestamp]}"
    )
  end
  def detect_conversion_funnels(event)
    return unless event[:conversion_event]
    funnel_key = "funnel:#{event[:visitor_id]}"
    @metrics_store.rpush(funnel_key, event[:path])
    @metrics_store.expire(funnel_key, 3600)
    if conversion_complete?(event[:path]) # application-specific
      @metrics_store.incr('conversions:completed')
      @metrics_store.hincrby('conversions:by_source', event[:source], 1)
    end
  end
end
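Windowed metrics that let older data roll off can be kept in fixed-size time buckets. The hypothetical `SlidingWindowCounter` below approximates a sliding window at bucket granularity and is driven by event time, so replaying the log produces the same counts.
```ruby
# Event-time sliding window counter built from fixed-size buckets.
# Buckets that end at or before (now - window) are dropped as time advances.
class SlidingWindowCounter
  def initialize(window_seconds:, bucket_seconds:)
    @window = window_seconds
    @bucket = bucket_seconds
    @buckets = Hash.new(0) # bucket start (epoch seconds) => count
  end

  def add(event_time, count = 1)
    @buckets[event_time.to_i / @bucket * @bucket] += count
  end

  # Sum of buckets overlapping (now - window, now]; accurate to one bucket
  def total(now)
    cutoff = now.to_i - @window
    @buckets.delete_if { |start, _| start + @bucket <= cutoff }
    @buckets.sum { |_, count| count }
  end
end

counter = SlidingWindowCounter.new(window_seconds: 300, bucket_seconds: 60)
[1_000, 1_030, 1_100, 1_250].each { |t| counter.add(t) }
recent = counter.total(1_260) # all four events fall inside the window
later = counter.total(1_400)  # the two oldest buckets have rolled off
```
Smaller buckets track the window boundary more precisely at the cost of more keys; the same shape maps onto Redis hashes or per-bucket keys with expiry.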
Event-Driven Microservices
Microservices coordinate through event streams rather than synchronous API calls. Services publish domain events describing state changes, and interested services consume relevant streams to maintain eventual consistency.
An e-commerce system uses events to coordinate order processing, inventory management, and fulfillment. When orders are placed, the order service publishes events consumed by inventory, payment, and shipping services.
require 'redis'
class OrderManagementSystem
  def initialize(event_bus)
    @event_bus = event_bus # assumed interface: consume(topic) { |event| } and publish(topic, event)
    @state_store = Redis.new
  end
  def process_order_lifecycle
    @event_bus.consume('orders') do |event|
      case event[:type]
      when 'order_placed'
        # Request payment only if every item passed the stock check
        initiate_payment(event) if validate_inventory(event)
      when 'payment_confirmed'
        allocate_inventory(event)
        schedule_fulfillment(event)
      when 'order_shipped'
        update_tracking(event)
        notify_customer(event)
      end
      update_order_state(event)
    end
  end
  private
  # Read-only pre-check: stock can change before allocation, so the
  # inventory service must still verify when it applies inventory_allocated
  def validate_inventory(event)
    event[:items].each do |item|
      available = @state_store.get("inventory:#{item[:product_id]}").to_i
      if available < item[:quantity]
        @event_bus.publish('orders', {
          type: 'order_rejected',
          order_id: event[:order_id],
          reason: 'insufficient_inventory',
          product_id: item[:product_id]
        })
        return false
      end
    end
    true
  end
  def allocate_inventory(event)
    @event_bus.publish('inventory', {
      type: 'inventory_allocated',
      order_id: event[:order_id],
      items: event[:items],
      timestamp: Time.now
    })
  end
end
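Services that coordinate through streams may see an event more than once, through redelivery after a failure or through a deliberate replay, so handlers such as inventory allocation should be idempotent. A minimal sketch, assuming every event carries a unique `event_id` (a hypothetical field) and keeping processed IDs in memory; `IdempotentHandler` is likewise hypothetical.
```ruby
require 'set'

# Skips events whose IDs were already applied so side effects happen once
class IdempotentHandler
  def initialize(&handler)
    @handler = handler
    # In production, persist these IDs atomically with the side effect
    @processed_ids = Set.new
  end

  # Returns true if the event was applied, false if it was a duplicate
  def handle(event)
    return false if @processed_ids.include?(event[:event_id])
    @handler.call(event)
    @processed_ids << event[:event_id]
    true
  end
end

reserved = Hash.new(0)
handler = IdempotentHandler.new { |e| reserved[e[:product_id]] += e[:quantity] }
event = { event_id: 'evt-1', product_id: 'sku-9', quantity: 2 }
first = handler.handle(event)  # applied
second = handler.handle(event) # redelivery ignored
```
If the ID is recorded separately from the side effect, a crash between the two steps can still apply an event twice, which is why the comment recommends an atomic write.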
Fraud Detection Systems
Fraud detection analyzes transaction patterns in real-time to identify suspicious activity. Stream processors maintain user behavior profiles and evaluate new transactions against learned patterns and rule engines.
Transaction events flow through multiple detection stages that compute risk scores, check velocity rules, and identify anomalous patterns. High-risk transactions trigger immediate review while normal transactions proceed.
require 'redis'
# FraudRulesEngine, AnomalyDetectionModel, and helpers such as consume_events,
# load_user_profile, and transaction_features are application-specific
class FraudDetectionProcessor
  def initialize
    @behavior_store = Redis.new
    @rules_engine = FraudRulesEngine.new
    @ml_model = AnomalyDetectionModel.load
  end
  def analyze_transaction_stream
    consume_events('transactions') do |transaction|
      risk_score = calculate_risk_score(transaction)
      if risk_score > 0.8
        block_transaction(transaction, risk_score)
      elsif risk_score > 0.5
        flag_for_review(transaction, risk_score)
      else
        approve_transaction(transaction)
      end
      update_user_profile(transaction)
    end
  end
  private
  # Each component score is expected in [0, 1], so the weighted sum is too
  def calculate_risk_score(transaction)
    velocity_score = check_transaction_velocity(transaction) # Velocity checks
    behavior_score = analyze_user_behavior(transaction)      # Behavioral analysis
    anomaly_score = @ml_model.predict(transaction_features(transaction)) # ML anomaly detection
    rules_score = @rules_engine.evaluate(transaction)        # Rules-based checks
    # Weighted combination
    velocity_score * 0.3 + behavior_score * 0.3 +
      anomaly_score * 0.2 + rules_score * 0.2
  end
  def check_transaction_velocity(transaction)
    user_id = transaction[:user_id]
    # Fixed 5-minute bucket keyed by the transaction's own timestamp (assumed
    # epoch seconds) instead of Time.now, so replays reproduce the same counts
    bucket = transaction[:timestamp].to_i / 300
    window_key = "velocity:#{user_id}:#{bucket}"
    count = @behavior_store.incr(window_key)
    @behavior_store.expire(window_key, 600)
    return 1.0 if count > 10 # More than 10 transactions in the bucket
    return 0.7 if count > 5
    0.1
  end
  def analyze_user_behavior(transaction)
    profile = load_user_profile(transaction[:user_id])
    std = profile[:std_amount].to_f
    # Deviation in standard deviations; guard against a zero spread
    amount_deviation = std.zero? ? 0.0 : (transaction[:amount] - profile[:avg_amount]).abs / std
    location_anomaly = profile[:usual_locations].include?(transaction[:location]) ? 0 : 1
    time_anomaly = usual_transaction_time?(transaction[:timestamp], profile) ? 0 : 0.5
    # Clamp so this component stays in [0, 1] like the others
    [amount_deviation / 3, location_anomaly, time_anomaly].max.clamp(0.0, 1.0)
  end
end
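The velocity check above counts transactions in fixed 5-minute buckets, so a burst that straddles a bucket boundary is split across two counts and can slip under the threshold. An exact sliding window avoids that by keeping recent timestamps per user. This in-memory sketch uses a hypothetical `VelocityTracker` and assumes each user's transactions arrive in timestamp order.
```ruby
# Exact sliding-window velocity: counts a user's transactions whose event
# time falls in (timestamp - window_seconds, timestamp]
class VelocityTracker
  def initialize(window_seconds: 300)
    @window = window_seconds
    @history = Hash.new { |hash, user| hash[user] = [] } # user => ordered timestamps
  end

  # Records the transaction and returns the count inside the window
  def record(user_id, timestamp)
    times = @history[user_id]
    times << timestamp
    times.shift while times.first <= timestamp - @window
    times.size
  end
end

tracker = VelocityTracker.new(window_seconds: 300)
counts = [0, 60, 120, 299, 301].map { |t| tracker.record('u1', t) }
# => [1, 2, 3, 4, 4]
```
Memory grows with transactions per user per window, which is usually small; a Redis sorted set scored by timestamp is a common way to hold the same data outside the process.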
Log Aggregation and Monitoring
Centralized logging systems aggregate application and infrastructure logs into searchable streams. Stream processors parse logs, extract metrics, detect error patterns, and trigger alerts.
Logs from distributed services flow into a central log stream. Processors enrich logs with metadata, compute error rates, and identify cascading failures across service boundaries.
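A minimal sketch of the error-rate computation, assuming log lines have already been parsed into hashes with hypothetical `:service` and `:level` fields; the 1% default threshold mirrors the error-rate alert in the monitoring metrics table. `error_rates` and `alerting_services` are hypothetical helpers, and a streaming version would apply them per time window.
```ruby
# Per-service error rate over a batch of parsed log records
def error_rates(records)
  totals = Hash.new(0)
  errors = Hash.new(0)
  records.each do |record|
    totals[record[:service]] += 1
    errors[record[:service]] += 1 if record[:level] == 'ERROR'
  end
  totals.to_h { |service, total| [service, errors[service].fdiv(total)] }
end

# Services whose error rate exceeds the threshold
def alerting_services(records, threshold: 0.01)
  error_rates(records).select { |_, rate| rate > threshold }.keys
end

logs = [
  { service: 'checkout', level: 'ERROR' },
  { service: 'checkout', level: 'INFO' },
  { service: 'search', level: 'INFO' },
  { service: 'search', level: 'INFO' }
]
rates = error_rates(logs)         # checkout: 0.5, search: 0.0
alerts = alerting_services(logs)  # only checkout crosses 1%
```
Comparing which services cross the threshold in the same window is a first step toward spotting failures that cascade across service boundaries.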
Reference
Architecture Components
| Component | Description | Purpose |
|---|---|---|
| Immutable Log | Ordered sequence of events stored durably | Source of truth for all data |
| Stream Processor | Application that reads events and produces outputs | Transforms events into views or actions |
| Materialized View | Derived dataset optimized for queries | Serves read queries efficiently |
| Event Producer | Application that generates events | Records state changes and observations |
| Consumer Group | Set of processors sharing partition consumption | Enables parallel processing |
| Checkpoint | Saved position in event stream | Allows resume after failures |
Comparison with Lambda Architecture
| Aspect | Kappa Architecture | Lambda Architecture |
|---|---|---|
| Processing Paths | Single stream processing path | Separate batch and speed layers |
| Code Duplication | Single implementation | Duplicate logic in batch and stream |
| Reprocessing | Replay events from log | Rerun batch jobs |
| Operational Complexity | Lower - one system to operate | Higher - coordinate two systems |
| Query Consistency | Eventually consistent | Requires merging batch and stream results |
| State Management | Stream processor state only | Batch layer state and stream state |
| Best For | Stream-native workloads | Complex batch analytics |
Event Log Characteristics
| Property | Description | Implementation Consideration |
|---|---|---|
| Immutability | Events never modified after writing | Design around append-only operations |
| Ordering | Events ordered within partitions | Use partition keys to maintain order |
| Retention | Duration events remain available | Balance reprocessing needs with storage costs |
| Durability | Events survive failures | Configure replication factor appropriately |
| Partitioning | Events distributed across partitions | Choose partition keys for even distribution |
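To make these properties concrete, here is a toy in-memory partitioned log: records are frozen once appended, offsets increase per partition, order holds only within a partition, and a count-based retention limit stands in for Kafka's time- and size-based retention policies. `PartitionedLog` is hypothetical, and CRC32 is used only because it is stable across processes (unlike `String#hash`).
```ruby
require 'zlib'

class PartitionedLog
  Record = Struct.new(:offset, :key, :value)

  def initialize(partition_count, retention: 1_000)
    @partitions = Array.new(partition_count) { [] }
    @next_offsets = Array.new(partition_count, 0)
    @retention = retention # max records kept per partition
  end

  # Appends and returns [partition, offset]; stored records are never modified
  def append(key, value)
    partition = Zlib.crc32(key.to_s) % @partitions.size
    offset = @next_offsets[partition]
    @partitions[partition] << Record.new(offset, key, value).freeze
    @next_offsets[partition] += 1
    # Retention drops the oldest records; surviving offsets do not change
    @partitions[partition].shift while @partitions[partition].size > @retention
    [partition, offset]
  end

  # Reads records at or after from_offset, e.g. resuming from a checkpoint
  def read(partition, from_offset: 0)
    @partitions[partition].select { |record| record.offset >= from_offset }
  end
end

log = PartitionedLog.new(3, retention: 2)
partition, _ = log.append('user-1', 'login')
log.append('user-1', 'view')
log.append('user-1', 'logout')
retained = log.read(partition).map(&:value)                 # "login" aged out
resumed = log.read(partition, from_offset: 2).map(&:offset)
```
Because every event for `user-1` hashes to one partition, its events stay in order; retention is what ultimately limits how far back a reprocessing job can replay.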
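ruby-kafka Configuration Options
| Setting | Purpose | Typical Value |
|---|---|---|
| delivery_threshold | Async producer: buffered messages that trigger a delivery | 100-1000 |
| delivery_interval | Async producer: seconds between timed deliveries | 1-10 |
| max_buffer_size | Producer: maximum messages held in the buffer | 1000-10000 |
| session_timeout | Consumer: seconds without heartbeats before the member leaves the group | 30 seconds |
| offset_commit_interval | Consumer: seconds between automatic offset commits | 10 seconds |
| max_wait_time | Consumer fetch: longest the broker holds a fetch, in seconds (passed to each_message or each_batch) | 0.1-0.5 seconds |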
State Store Selection Matrix
| Use Case | Recommended Store | Reason |
|---|---|---|
| Counters and metrics | Redis | Atomic operations, low latency |
| User profiles | Redis or PostgreSQL | Fast reads, structured data |
| Large aggregations | PostgreSQL | ACID guarantees, complex queries |
| Time-series data | InfluxDB or TimescaleDB | Optimized for time-series workloads |
| Local processor state | RocksDB | Embedded, high performance |
| Distributed coordination | etcd or ZooKeeper | Consensus and coordination primitives |
Common Processing Patterns
| Pattern | Description | Use Case |
|---|---|---|
| Map | Transform each event independently | Data enrichment, format conversion |
| Filter | Select subset of events | Routing, data selection |
| Aggregate | Combine events into summary | Counting, summing, averaging |
| Join | Combine events from multiple streams | Enrichment with reference data |
| Window | Group events by time periods | Time-based analytics |
| Deduplicate | Remove duplicate events | Idempotent processing |
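The patterns in this table map onto ordinary Enumerable operations. The sketch below applies them to a small in-memory batch with made-up example events; a stream processor applies the same logic incrementally, one event at a time, with state for the aggregate and window steps.
```ruby
events = [
  { id: 1, user: 'a', type: 'purchase', amount: 30, ts: 5 },
  { id: 2, user: 'b', type: 'view',     amount: 0,  ts: 12 },
  { id: 1, user: 'a', type: 'purchase', amount: 30, ts: 5 },  # duplicate delivery
  { id: 3, user: 'b', type: 'purchase', amount: 20, ts: 65 }
]
regions = { 'a' => 'EU', 'b' => 'US' } # reference data for the join

deduped   = events.uniq { |e| e[:id] }                              # Deduplicate
purchases = deduped.select { |e| e[:type] == 'purchase' }          # Filter
enriched  = purchases.map { |e| e.merge(region: regions[e[:user]]) } # Map + Join
by_region = enriched.group_by { |e| e[:region] }
                    .transform_values { |es| es.sum { |e| e[:amount] } } # Aggregate
per_minute = enriched.group_by { |e| e[:ts] / 60 }
                     .transform_values(&:size)                      # Window (60 s tumbling)
```
Deduplicating first keeps the redelivered purchase from inflating the aggregates downstream.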
Reprocessing Decision Matrix
| Scenario | Approach | Consideration |
|---|---|---|
| Bug fix in logic | Full replay | Ensure deterministic processing |
| Add new metric | Parallel processor | Minimize impact on live system |
| Schema migration | Versioned processor | Maintain compatibility during transition |
| Historical backfill | Batch from log | May require additional resources |
| Performance optimization | Shadow processing | Validate before cutover |
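The parallel-processor approach can be sketched with two processors reading the same in-memory log: v1 keeps serving while v2 replays from offset 0 with new logic, and reads switch to v2 once it has caught up. `Processor` is a hypothetical class, and the cutover assumes v2's logic is deterministic, as the bug-fix row requires.
```ruby
# Stream processor that folds log events into a materialized view
class Processor
  attr_reader :view, :offset

  def initialize(&logic)
    @logic = logic
    @view = Hash.new(0)
    @offset = 0 # next log offset to read
  end

  # Processes up to max_records new events; returns how many were read
  def poll(log, max_records: 100)
    batch = log[@offset, max_records] || []
    batch.each { |event| @logic.call(@view, event) }
    @offset += batch.size
    batch.size
  end

  def caught_up?(log)
    @offset >= log.size
  end
end

log = [
  { sku: 'A', qty: 2, price: 10 },
  { sku: 'B', qty: 1, price: 50 },
  { sku: 'A', qty: 1, price: 10 }
]
v1 = Processor.new { |view, e| view[e[:sku]] += e[:qty] }             # units sold
v2 = Processor.new { |view, e| view[e[:sku]] += e[:qty] * e[:price] } # revenue (new logic)
v1.poll(log)
serving = v1

log << { sku: 'B', qty: 2, price: 50 } # live traffic keeps arriving
v1.poll(log)
v2.poll(log, max_records: 2) until v2.caught_up?(log) # replay from the start
serving = v2 if v2.caught_up?(log) # cut over; v1 can now be retired
```
In a real deployment v2 writes to a separate output table or topic, and the cutover is a routing change, which makes rolling back to v1 cheap.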
Performance Optimization Techniques
| Technique | Impact | Trade-off |
|---|---|---|
| Increase partitions | Higher throughput | More coordination overhead |
| Batch state updates | Higher throughput, fewer round trips | Delayed view consistency |
| Local caching | Faster reads | Memory usage, consistency lag |
| Async processing | Better throughput | Complexity in error handling |
| Compression | Lower storage costs | CPU overhead |
| Consumer parallelization | Faster processing | Increased resource usage |
Monitoring Metrics
| Metric | Description | Alert Threshold |
|---|---|---|
| Consumer lag | Messages behind latest offset | Over 10000 for critical streams |
| Processing rate | Messages per second | Below expected rate |
| Error rate | Failed processing attempts | Over 1% |
| End-to-end latency | Time from produce to process | Over 10 seconds |
| State store latency | Read/write operation time | Over 100ms |
| Rebalance frequency | Consumer group rebalances | More than 1 per hour |