CrackedRuby

Overview

Event sourcing treats the event log as the primary source of truth for application state. Instead of storing the current state of an entity in a database and overwriting it with each change, event sourcing persists every state change as an immutable event in an append-only log. The current state is derived by replaying these events from the start of the log.

This pattern emerged from domain-driven design and CQRS (Command Query Responsibility Segregation) communities, addressing limitations in traditional CRUD systems where historical context disappears after updates. When a bank account balance changes, traditional systems record only the new balance. Event sourcing records the deposit or withdrawal event itself, preserving the complete audit trail.

The event store serves as the database and can also act as a message bus. Applications write events to the store and read them back to reconstruct state. Each event represents a fact that occurred in the past and is named in the past tense: OrderPlaced, PaymentProcessed, ItemShipped. Events contain all data needed to apply the state change, including timestamps, user identifiers, and domain-specific payload.

# Traditional state storage
class Account
  def withdraw(amount)
    @balance -= amount
    save_to_database  # Overwrites previous balance
  end
end

# Event sourcing approach
class Account
  def withdraw(amount)
    event = MoneyWithdrawn.new(
      account_id: @id,
      amount: amount,
      timestamp: Time.now
    )
    apply(event)
    event_store.append(event)
  end
end

Event sourcing provides complete audit trails, temporal queries, and debugging capabilities. The event log enables time travel to any previous state, event replay for analytics, and reconstruction of state evolution. This pattern fits domains where history matters: financial systems, healthcare records, collaborative editing, compliance-heavy industries.

Key Principles

Event sourcing operates on several foundational principles that distinguish it from state-oriented persistence.

Immutability: Events never change after creation. An event represents something that happened in the past, and the past cannot be altered. If an event contains incorrect data, the system appends a compensating event rather than modifying the original. A PaymentProcessed event with the wrong amount requires a PaymentCorrected event, not editing the original.
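The compensating-event idea can be sketched in a few lines. This is a minimal illustration, not a prescribed design: the two Struct-based event types and the current_amounts helper are hypothetical names chosen for the example.

```ruby
# Hypothetical event types for illustration only.
PaymentProcessed = Struct.new(:payment_id, :amount, keyword_init: true)
PaymentCorrected = Struct.new(:payment_id, :corrected_amount, keyword_init: true)

# Events are frozen on creation, so later code cannot mutate them.
log = []
log << PaymentProcessed.new(payment_id: "p-1", amount: 500).freeze # wrong amount
log << PaymentCorrected.new(payment_id: "p-1", corrected_amount: 50).freeze

# Replaying both events yields the corrected amount,
# while the original (incorrect) event stays in the log.
def current_amounts(events)
  events.each_with_object({}) do |event, amounts|
    case event
    when PaymentProcessed then amounts[event.payment_id] = event.amount
    when PaymentCorrected then amounts[event.payment_id] = event.corrected_amount
    end
  end
end

amounts = current_amounts(log)
```

The log keeps both facts: that 500 was recorded, and that it was later corrected to 50. An audit can therefore show the mistake as well as the fix.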

Append-Only Storage: The event store accepts only insertions, never updates or deletes. This simplifies concurrent writes because no record is ever modified in place; the only remaining conflict is two writers appending to the same stream, which optimistic concurrency handles. Each event receives a sequence number or position, creating a total ordering within each aggregate stream. The append-only nature enables optimizations like file-based storage and sequential I/O patterns.

State as Projection: Current state exists as a projection derived from events. Applications rebuild state by replaying events in order and applying each to an initial empty state. State transitions follow deterministic rules: given the same sequence of events, replay produces identical state. This determinism enables testing, debugging, and reasoning about system behavior.

class ShoppingCart
  def initialize
    @items = {}  # product_id => quantity
  end

  def self.from_events(events)
    cart = new
    events.each { |event| cart.apply(event) }
    cart
  end

  def apply(event)
    case event
    when ItemAdded
      @items[event.product_id] = event.quantity
    when ItemRemoved
      @items.delete(event.product_id)
    when CartCleared
      @items.clear
    end
  end
end

Aggregate Boundaries: Events organize into streams, typically one stream per aggregate root in domain-driven design. An aggregate defines a consistency boundary. The Order aggregate has an orders-123 stream containing all events for that specific order. Stream isolation enables concurrent processing and prevents conflicts across aggregates.

Optimistic Concurrency: When appending events, the application specifies the expected stream version. If another process modified the stream concurrently, the append fails with a version conflict. The application loads current state, retries the operation, and appends with the new expected version. This approach avoids locks while maintaining consistency within aggregate boundaries.
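The load, retry, and append cycle might look like the following sketch. InMemoryStore, VersionConflict, and append_with_retry are hypothetical names invented for this example, and the store is an in-memory stand-in rather than a real event store.

```ruby
class VersionConflict < StandardError; end

# In-memory stand-in for an event store; only the version check matters here.
class InMemoryStore
  def initialize
    @streams = Hash.new { |hash, key| hash[key] = [] }
  end

  def read(stream_id)
    @streams[stream_id].dup
  end

  def append(stream_id, events, expected_version:)
    stream = @streams[stream_id]
    unless stream.size == expected_version
      raise VersionConflict, "expected #{expected_version}, got #{stream.size}"
    end
    stream.concat(events)
  end
end

# Reload, decide again, and re-append when another writer got there first.
def append_with_retry(store, stream_id, max_attempts: 3)
  attempts = 0
  begin
    attempts += 1
    history = store.read(stream_id)
    new_events = yield(history) # decide against the freshly loaded history
    store.append(stream_id, new_events, expected_version: history.size)
  rescue VersionConflict
    retry if attempts < max_attempts
    raise
  end
end

store = InMemoryStore.new
store.append("cart-1", [:item_added], expected_version: 0)

calls = 0
append_with_retry(store, "cart-1") do |history|
  calls += 1
  # Simulate a concurrent writer sneaking in during the first attempt.
  store.append("cart-1", [:item_added], expected_version: history.size) if calls == 1
  [:cart_cleared]
end
```

The block runs twice here: the first attempt hits the simulated conflict, and the second decides against the reloaded history and succeeds. Capping the attempts keeps a persistently contended stream from retrying forever.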

Event Evolution: As systems evolve, event schemas change. Event sourcing systems need strategies for handling old event formats. Options include upcasting (transforming old events to new format during read), versioned event types, or maintaining backward-compatible schemas. Events stored today must remain readable years later.

class EventUpcaster
  def upcast(event)
    case event
    when CustomerRegisteredV1
      CustomerRegisteredV2.new(
        customer_id: event.customer_id,
        email: event.email,
        name: event.name,
        phone: nil  # New field, defaults to nil
      )
    else
      event
    end
  end
end

The event store guarantees atomicity within a stream: either all events in a batch append successfully or none do. This atomic append provides the transaction boundary. Applications cannot atomically update multiple aggregate streams; that requires eventual consistency patterns like process managers or sagas.

Ruby Implementation

A Ruby implementation of event sourcing has four main components: event classes, aggregates that apply events, an event store for persistence, and projections for queries.

Event Definition: Events use plain Ruby classes or structs. Each event type represents a domain occurrence with named attributes. Events include identifiers, timestamps, and domain payload. Ruby's flexibility supports various serialization formats.

class Event
  attr_reader :aggregate_id, :timestamp, :metadata

  def initialize(aggregate_id:, timestamp: Time.now, metadata: {})
    @aggregate_id = aggregate_id
    @timestamp = timestamp
    @metadata = metadata
  end
end

class AccountOpened < Event
  attr_reader :initial_balance, :owner_name

  def initialize(aggregate_id:, initial_balance:, owner_name:, **args)
    super(aggregate_id: aggregate_id, **args)
    @initial_balance = initial_balance
    @owner_name = owner_name
  end
end

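class MoneyDeposited < Event
  attr_reader :amount

  def initialize(aggregate_id:, amount:, **args)
    super(aggregate_id: aggregate_id, **args)
    @amount = amount
  end
end

# Used by BankAccount#withdraw; same shape as MoneyDeposited
class MoneyWithdrawn < Event
  attr_reader :amount

  def initialize(aggregate_id:, amount:, **args)
    super(aggregate_id: aggregate_id, **args)
    @amount = amount
  end
end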

Aggregate Implementation: Aggregates maintain current state and expose commands that produce events. The aggregate applies each event to its internal state through an apply method, typically a case expression that dispatches on the event class.

class BankAccount
  attr_reader :id, :balance, :version, :uncommitted_events

  def initialize(id)
    @id = id
    @balance = 0
    @version = 0
    @uncommitted_events = []
  end

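  def open(initial_balance, owner_name)
    # Check the version, not the balance: an account opened with a zero
    # balance must still be rejected when opened a second time
    raise "Account already opened" if @version > 0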
    apply_new_event(
      AccountOpened.new(
        aggregate_id: @id,
        initial_balance: initial_balance,
        owner_name: owner_name
      )
    )
  end

  def deposit(amount)
    raise ArgumentError, "Amount must be positive" if amount <= 0
    apply_new_event(
      MoneyDeposited.new(aggregate_id: @id, amount: amount)
    )
  end

  def withdraw(amount)
    raise ArgumentError, "Amount must be positive" if amount <= 0
    raise "Insufficient funds" if @balance < amount
    apply_new_event(
      MoneyWithdrawn.new(aggregate_id: @id, amount: amount)
    )
  end

  def load_from_history(events)
    events.each { |event| apply_event(event) }
    @uncommitted_events.clear
    self
  end

  private

  def apply_new_event(event)
    apply_event(event)
    @uncommitted_events << event
  end

  def apply_event(event)
    case event
    when AccountOpened
      @balance = event.initial_balance
    when MoneyDeposited
      @balance += event.amount
    when MoneyWithdrawn
      @balance -= event.amount
    end
    @version += 1
  end
end

Event Store Interface: The event store provides methods to append events and read stream history. A simple file-based implementation demonstrates the core concepts.

require 'json'
require 'time'
require 'fileutils'

# Raised when the stream version differs from what the caller expected
class ConcurrencyError < StandardError; end

class FileEventStore
  def initialize(directory)
    @directory = directory
    FileUtils.mkdir_p(@directory)
  end

  def append_to_stream(stream_id, events, expected_version)
    stream_file = stream_path(stream_id)

    # 'a+' allows reading from the start while every write goes to the end
    File.open(stream_file, 'a+') do |file|
      file.flock(File::LOCK_EX)  # Serialize writers; released when the file closes

      # One serialized event per line, so the line count is the stream version
      file.rewind
      current_version = file.each_line.count

      if current_version != expected_version
        raise ConcurrencyError,
          "Expected version #{expected_version}, got #{current_version}"
      end

      events.each do |event|
        file.puts(serialize_event(event))
      end
    end
  end

  def read_stream(stream_id)
    stream_file = stream_path(stream_id)
    return [] unless File.exist?(stream_file)

    File.readlines(stream_file).map { |line| deserialize_event(line) }
  end

  private

  def stream_path(stream_id)
    File.join(@directory, "#{stream_id}.events")
  end

  def serialize_event(event)
    {
      type: event.class.name,
      data: event.instance_variables.each_with_object({}) do |var, hash|
        hash[var.to_s.delete('@')] = event.instance_variable_get(var)
      end
    }.to_json
  end

  def deserialize_event(json_line)
    data = JSON.parse(json_line)
    attributes = data['data'].transform_keys(&:to_sym)
    # JSON stores the timestamp as a string; turn it back into a Time
    attributes[:timestamp] = Time.parse(attributes[:timestamp]) if attributes[:timestamp]
    Object.const_get(data['type']).new(**attributes)
  end
end

Repository Pattern: A repository loads aggregates from the event store and saves uncommitted events back.

class AggregateRepository
  def initialize(event_store)
    @event_store = event_store
  end

  def load(aggregate_class, aggregate_id)
    events = @event_store.read_stream(aggregate_id)
    aggregate = aggregate_class.new(aggregate_id)
    aggregate.load_from_history(events)
    aggregate
  end

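  def save(aggregate)
    expected_version = aggregate.version - aggregate.uncommitted_events.size
    @event_store.append_to_stream(
      aggregate.id,
      aggregate.uncommitted_events,
      expected_version
    )
    # Clear the queue so a second save does not append the same events again
    aggregate.uncommitted_events.clear
  end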
end

Ruby gems like rails_event_store and eventide provide production-ready implementations with features like subscriptions, projections, and multiple storage backends. These libraries handle serialization, versioning, and concurrent access patterns.

Practical Examples

Event sourcing applies to domains where state history provides value beyond the current snapshot.

Order Processing System: An e-commerce order progresses through multiple states. Event sourcing captures each transition with full context.

# Keyword initializers are omitted for brevity; each follows the AccountOpened pattern
class OrderCreated < Event
  attr_reader :customer_id, :items, :total_amount
end

class PaymentReceived < Event
  attr_reader :payment_method, :transaction_id, :amount
end

class OrderShipped < Event
  attr_reader :shipping_carrier, :tracking_number
end

class Order
  attr_reader :id, :status, :items, :payment_status

  def initialize(id)
    @id = id
    @status = :draft
    @items = []
    @payment_status = :unpaid
  end

  def create(customer_id, items, total_amount)
    apply_new_event(
      OrderCreated.new(
        aggregate_id: @id,
        customer_id: customer_id,
        items: items,
        total_amount: total_amount
      )
    )
  end

  def receive_payment(payment_method, transaction_id, amount)
    raise "Order not created" if @status == :draft
    raise "Already paid" if @payment_status == :paid
    
    apply_new_event(
      PaymentReceived.new(
        aggregate_id: @id,
        payment_method: payment_method,
        transaction_id: transaction_id,
        amount: amount
      )
    )
  end

  def ship(carrier, tracking_number)
    raise "Payment required" unless @payment_status == :paid
    raise "Already shipped" if @status == :shipped
    
    apply_new_event(
      OrderShipped.new(
        aggregate_id: @id,
        shipping_carrier: carrier,
        tracking_number: tracking_number
      )
    )
  end

  private

  # apply_new_event and load_from_history are omitted; they match BankAccount

  def apply_event(event)
    case event
    when OrderCreated
      @status = :pending_payment
      @items = event.items
    when PaymentReceived
      @payment_status = :paid
      @status = :paid
    when OrderShipped
      @status = :shipped
    end
  end
end

This implementation provides complete order history. Customer service can see exactly when payment arrived, which items the order contained when it was created, and the full timeline of status changes.

Inventory Management: Track stock levels with full audit trails for compliance and debugging.

class StockReceived < Event
  attr_reader :product_id, :quantity, :supplier_id, :unit_cost
end

class StockReserved < Event
  attr_reader :order_id, :quantity
end

class StockReleased < Event
  attr_reader :order_id, :quantity, :reason
end

class InventoryItem
  attr_reader :product_id, :available, :reserved

  def initialize(product_id)
    @product_id = product_id
    @available = 0
    @reserved = 0
    @reservations = {}
  end

  def receive_stock(quantity, supplier_id, unit_cost)
    apply_new_event(
      StockReceived.new(
        aggregate_id: @product_id,
        product_id: @product_id,
        quantity: quantity,
        supplier_id: supplier_id,
        unit_cost: unit_cost
      )
    )
  end

  def reserve_for_order(order_id, quantity)
    raise "Insufficient stock" if @available < quantity
    
    apply_new_event(
      StockReserved.new(
        aggregate_id: @product_id,
        order_id: order_id,
        quantity: quantity
      )
    )
  end

  def release_reservation(order_id, reason)
    quantity = @reservations[order_id]
    raise "No reservation for order" unless quantity
    
    apply_new_event(
      StockReleased.new(
        aggregate_id: @product_id,
        order_id: order_id,
        quantity: quantity,
        reason: reason
      )
    )
  end

  private

  def apply_event(event)
    case event
    when StockReceived
      @available += event.quantity
    when StockReserved
      @available -= event.quantity
      @reserved += event.quantity
      @reservations[event.order_id] = event.quantity
    when StockReleased
      @reserved -= event.quantity
      @available += event.quantity
      @reservations.delete(event.order_id)
    end
  end
end

The event log answers questions traditional systems struggle with: "Who reserved this stock?", "When did inventory drop below threshold?", "What was the cost basis for historical shipments?"

Collaboration System: Document editing with full version history and conflict resolution.

class DocumentCreated < Event
  attr_reader :title, :created_by
end

class ContentUpdated < Event
  attr_reader :editor_id, :change_description, :new_content
end

class Document
  attr_reader :id, :title, :content, :editors

  def initialize(id)
    @id = id
    @content = ""
    @editors = Set.new
  end

  def create(title, created_by)
    apply_new_event(
      DocumentCreated.new(
        aggregate_id: @id,
        title: title,
        created_by: created_by
      )
    )
  end

  def update_content(editor_id, new_content, description)
    apply_new_event(
      ContentUpdated.new(
        aggregate_id: @id,
        editor_id: editor_id,
        change_description: description,
        new_content: new_content
      )
    )
  end

  private

  def apply_event(event)
    case event
    when DocumentCreated
      @title = event.title
      @editors << event.created_by
    when ContentUpdated
      @content = event.new_content
      @editors << event.editor_id
    end
  end
end

Event sourcing enables time travel through document versions, attribution of changes to specific editors, and diff generation between any two points in time.
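Time travel amounts to replaying only a prefix of the stream. The sketch below assumes a simplified, hypothetical ContentChanged event that carries its own timestamp, plus a content_at helper invented for the example.

```ruby
# Hypothetical, simplified event: only the fields this sketch needs.
ContentChanged = Struct.new(:editor_id, :new_content, :timestamp, keyword_init: true)

# Rebuild the document content as of a given moment by replaying
# only the events recorded up to that moment.
def content_at(events, moment)
  events
    .take_while { |event| event.timestamp <= moment }
    .reduce("") { |_content, event| event.new_content }
end

t0 = Time.utc(2024, 1, 1, 9, 0, 0)
history = [
  ContentChanged.new(editor_id: "ann", new_content: "Draft", timestamp: t0),
  ContentChanged.new(editor_id: "bob", new_content: "Draft v2", timestamp: t0 + 3600),
  ContentChanged.new(editor_id: "ann", new_content: "Final", timestamp: t0 + 7200)
]

earlier = content_at(history, t0 + 4000) # shortly after Bob's edit
latest  = content_at(history, t0 + 7200)
editors_so_far = history.take_while { |e| e.timestamp <= t0 + 4000 }.map(&:editor_id)
```

Since any two moments can be reconstructed this way, a diff between them is just a comparison of the two replayed states.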

Design Considerations

Event sourcing introduces architectural trade-offs that impact system design and operational characteristics.

When to Use Event Sourcing: Event sourcing fits domains where state transitions matter as much as current state. Financial systems, audit-heavy applications, and systems requiring temporal queries benefit from the pattern. If the business asks "how did we arrive at this state?" or "what was the state at a specific time?", event sourcing provides natural answers. Systems requiring compliance audit trails, undo/redo functionality, or event-driven architectures align well.

Conversely, domains with primarily CRUD operations see limited benefit. A user profile with name and email address requires only current values. The overhead of event sourcing outweighs benefits unless profile history holds business value. Systems with complex queries across multiple aggregates struggle with event sourcing's aggregate-centric view.

Complexity Trade-offs: Event sourcing increases implementation complexity compared to CRUD. Developers write separate code paths for commands (write) and queries (read). The learning curve steepens for teams unfamiliar with CQRS and DDD concepts. Testing requires event sequence verification rather than simple state assertions.

The pattern simplifies other aspects: audit trails emerge naturally, concurrent modifications handle cleanly within aggregate boundaries, and debugging becomes easier with complete state history. The trade-off favors domains where these benefits justify the complexity cost.

Storage Requirements: Event stores grow indefinitely as events accumulate. A frequently modified aggregate can generate thousands of events over several years. Storage space increases linearly with event volume. Systems need strategies for managing growth: archiving old events, implementing snapshots, or partitioning streams.

The append-only nature enables storage optimizations. Sequential writes perform well on spinning disks. File-based stores compress easily. Cloud object storage handles archival economically. The storage cost trades against the value of historical data.

Query Model Separation: Event sourcing works with CQRS to separate write and read models. Commands append events; queries read from projections built from events. This separation requires maintaining projections, handling eventual consistency, and synchronizing query models with event streams.

The separation enables query optimization independent of write model structure. Projections denormalize data for specific query patterns. Multiple projections from the same events serve different use cases. The read side scales independently from writes.
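As a rough illustration of several read models fed by one stream, the sketch below builds two projections from the same events. The event structs and projection classes are hypothetical and keep their data in memory, where a real system would write to a read database.

```ruby
# Hypothetical events for illustration.
OrderPlaced    = Struct.new(:order_id, :customer_id, :total, keyword_init: true)
OrderCancelled = Struct.new(:order_id, keyword_init: true)

# Read model 1: current status per order, e.g. for an order detail page.
class OrderStatusProjection
  attr_reader :statuses

  def initialize
    @statuses = {}
  end

  def handle(event)
    case event
    when OrderPlaced    then @statuses[event.order_id] = :placed
    when OrderCancelled then @statuses[event.order_id] = :cancelled
    end
  end
end

# Read model 2: revenue per customer, e.g. for a reporting dashboard.
class CustomerRevenueProjection
  attr_reader :revenue

  def initialize
    @revenue = Hash.new(0)
    @orders = {} # remembers each order so a cancellation can be reversed
  end

  def handle(event)
    case event
    when OrderPlaced
      @orders[event.order_id] = event
      @revenue[event.customer_id] += event.total
    when OrderCancelled
      placed = @orders.delete(event.order_id)
      @revenue[placed.customer_id] -= placed.total if placed
    end
  end
end

events = [
  OrderPlaced.new(order_id: "o-1", customer_id: "c-1", total: 40),
  OrderPlaced.new(order_id: "o-2", customer_id: "c-1", total: 25),
  OrderCancelled.new(order_id: "o-1")
]

status = OrderStatusProjection.new
revenue = CustomerRevenueProjection.new
events.each { |event| [status, revenue].each { |projection| projection.handle(event) } }
```

Each projection keeps only what its queries need, and either one can be dropped and rebuilt from the event stream without touching the other.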

Eventual Consistency: Event sourcing embraces eventual consistency across aggregate boundaries. When one aggregate's event triggers updates to another aggregate, processing happens asynchronously. The system remains temporarily inconsistent during processing.

Applications need strategies for handling inconsistency: UI feedback for pending operations, retry logic for failed projections, and business rules that accept delays. Some domains tolerate eventual consistency naturally (analytics, notifications); others require careful design (financial transactions, inventory allocation).

Event Schema Evolution: Events persist for the lifetime of the system, potentially years. Schema changes require migration strategies. Options include versioning events, transforming during read, or maintaining multiple event versions simultaneously. The chosen strategy impacts upgrade complexity and backward compatibility.

Careful initial event design reduces future changes. Including all relevant data in events avoids needing additional information later. Avoiding implementation details in events prevents coupling to specific implementations. The design needs foresight about potential schema changes.

Common Patterns

Several patterns combine with event sourcing to address specific challenges.

CQRS (Command Query Responsibility Segregation): CQRS separates write operations (commands) from read operations (queries). Commands modify aggregate state by appending events. Queries read from projections built from event streams. The pattern pairs naturally with event sourcing.

# Command side
class CreateOrderCommand
  attr_reader :order_id, :customer_id, :items

  def execute
    order = Order.new(@order_id)
    order.create(@customer_id, @items, calculate_total(@items))
    repository.save(order)
  end
end

# Query side - projection
class OrderSummaryProjection
  def handle(event)
    case event
    when OrderCreated
      create_summary(
        order_id: event.aggregate_id,
        customer_id: event.customer_id,
        status: 'pending',
        total: event.total_amount,
        created_at: event.timestamp
      )
    when OrderShipped
      update_summary(event.aggregate_id, status: 'shipped')
    end
  end

  private

  def create_summary(attributes)
    # Insert into read model database
    @database.insert(:order_summaries, attributes)
  end

  def update_summary(order_id, changes)
    # Update read model
    @database.update(:order_summaries, order_id, changes)
  end
end

CQRS enables independent optimization of reads and writes. Complex queries run against denormalized projections without impacting write performance. The read model scales separately from writes.

Snapshots: Replaying thousands of events on each aggregate load becomes expensive. Snapshots capture aggregate state at specific versions, so a load replays only the events recorded after the latest snapshot instead of the whole stream.

class SnapshotStore
  def save_snapshot(aggregate_id, state, version)
    snapshot = {
      aggregate_id: aggregate_id,
      state: serialize(state),
      version: version,
      created_at: Time.now
    }
    @database.insert(:snapshots, snapshot)
  end

  def load_snapshot(aggregate_id)
    row = @database.query(
      "SELECT * FROM snapshots WHERE aggregate_id = ? 
       ORDER BY version DESC LIMIT 1",
      aggregate_id
    ).first
    
    return nil unless row
    
    {
      state: deserialize(row['state']),
      version: row['version']
    }
  end
end

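# Inherits from AggregateRepository so that super in save appends the events
class SnapshotRepository < AggregateRepository
  def initialize(event_store, snapshot_store)
    super(event_store)
    @snapshot_store = snapshot_store
  end
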
  def load(aggregate_class, aggregate_id)
    snapshot = @snapshot_store.load_snapshot(aggregate_id)
    
    if snapshot
      aggregate = aggregate_class.restore_from_snapshot(
        aggregate_id,
        snapshot[:state]
      )
      start_version = snapshot[:version]
    else
      aggregate = aggregate_class.new(aggregate_id)
      start_version = 0
    end
    
    # Assumes a read_stream that accepts from_version and skips earlier events
    events = @event_store.read_stream(aggregate_id, from_version: start_version)
    aggregate.load_from_history(events)
    aggregate
  end

  def save(aggregate)
    super(aggregate)
    
    if should_snapshot?(aggregate)
      @snapshot_store.save_snapshot(
        aggregate.id,
        aggregate.state,
        aggregate.version
      )
    end
  end

  private

  def should_snapshot?(aggregate)
    aggregate.version % 100 == 0  # Snapshot every 100 events
  end
end

Snapshot frequency balances storage cost against load performance. Snapshotting every N events limits replay to at most N events. Applications tune N based on event size and load patterns.
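The repository above relies on an aggregate exposing state and restore_from_snapshot, which the listings do not define. One possible shape is sketched here with a deliberately tiny, hypothetical CounterAggregate whose events are plain integers.

```ruby
# Hypothetical aggregate: each "event" is just an integer amount.
class CounterAggregate
  attr_reader :id, :total, :version

  def initialize(id)
    @id = id
    @total = 0
    @version = 0
  end

  # Plain-data view of the aggregate, suitable for serializing into a snapshot.
  def state
    { total: @total, version: @version }
  end

  # Rebuild an aggregate directly from snapshot data, skipping replay.
  def self.restore_from_snapshot(id, state)
    aggregate = new(id)
    aggregate.instance_variable_set(:@total, state[:total])
    aggregate.instance_variable_set(:@version, state[:version])
    aggregate
  end

  def load_from_history(events)
    events.each do |amount|
      @total += amount
      @version += 1
    end
    self
  end
end

events = (1..250).to_a

# Full replay of all 250 events.
full = CounterAggregate.new("c-1").load_from_history(events)

# Snapshot taken at version 200, then only the remaining 50 events replayed.
snapshot = CounterAggregate.new("c-1").load_from_history(events.first(200)).state
restored = CounterAggregate.restore_from_snapshot("c-1", snapshot)
restored.load_from_history(events.drop(snapshot[:version]))
```

Both paths end in the same state, which is the property a snapshot must preserve: it is a cache of replay, never a second source of truth.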

Process Managers (Sagas): Coordinating multiple aggregates requires process managers that listen to events and issue commands. A payment process manager listens for OrderCreated events, initiates payment processing, and issues a MarkOrderPaid command on success.

class PaymentProcessManager
  def handle(event)
    case event
    when OrderCreated
      process = PaymentProcess.new(
        order_id: event.aggregate_id,
        amount: event.total_amount
      )
      process.start
      save_process(process)
      
      initiate_payment(
        order_id: event.aggregate_id,
        amount: event.total_amount
      )
      
    when PaymentAuthorized
      process = load_process(event.order_id)
      process.mark_authorized
      save_process(process)
      
      command = MarkOrderPaidCommand.new(order_id: event.order_id)
      command.execute
      
    when PaymentFailed
      process = load_process(event.order_id)
      process.mark_failed
      save_process(process)
      
      command = CancelOrderCommand.new(
        order_id: event.order_id,
        reason: "Payment failed"
      )
      command.execute
    end
  end
end

Process managers can persist their own state as events, making them resumable after failures. The manager tracks which steps completed and which remain, handling retries and compensation.
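Resumability can be illustrated with a process whose state is nothing more than its own step log. PaymentProcessState and the step names are hypothetical; this sketch shows only the bookkeeping, not payment handling.

```ruby
# Hypothetical process state: a log of completed steps, stored as symbols.
class PaymentProcessState
  attr_reader :log

  def initialize(log = [])
    @log = log.dup
  end

  def status
    @log.last || :not_started
  end

  # Record a step only once, so a redelivered event does not repeat side effects.
  # Returns true when the step is new, false when it was already recorded.
  def record(step)
    return false if @log.include?(step)

    @log << step
    true
  end
end

process = PaymentProcessState.new
process.record(:started)
process.record(:authorized)

# After a crash, rebuild the process from its persisted log and carry on.
resumed = PaymentProcessState.new(process.log)
duplicate = resumed.record(:authorized) # redelivery of the same event
resumed.record(:order_marked_paid)
```

A handler would issue its command only when record returns true, which makes redelivered events harmless.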

Event Enrichment: Some events need additional context not available in the aggregate. An enricher subscribes to events, adds contextual data, and publishes enriched events.

class OrderEventEnricher
  def handle(event)
    return unless event.is_a?(OrderShipped)
    
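    # Assumes the event exposes customer_id; the OrderShipped class shown
    # earlier does not, so resolve the customer through the order if needed
    customer = customer_service.find(event.customer_id)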
    enriched = OrderShippedEnriched.new(
      aggregate_id: event.aggregate_id,
      shipping_carrier: event.shipping_carrier,
      tracking_number: event.tracking_number,
      customer_email: customer.email,
      customer_name: customer.name,
      metadata: event.metadata
    )
    
    event_bus.publish(enriched)
  end
end

Enrichment decouples aggregates from external data requirements. The original event remains minimal. Downstream projections consume enriched events with all needed context.

Performance Considerations

Event sourcing's performance characteristics differ from traditional CRUD systems, with specific optimization opportunities and bottlenecks.

Write Performance: Appending an event is typically cheaper than updating state in place. Append-only storage uses sequential I/O, avoiding random seeks. A plain log file has no secondary indexes to update on write, whereas a SQL-backed store still maintains its indexes. File-based event stores achieve high throughput with simple appends.

Optimistic concurrency introduces retry overhead. Each command retried after a version conflict lowers effective throughput. Aggregate granularity impacts conflict rates: finer-grained aggregates reduce conflicts, while coarser aggregates increase them.

# Measure append throughput
require 'benchmark'

event_store = FileEventStore.new('./events')
events_per_aggregate = 1000
aggregate_count = 100

time = Benchmark.realtime do
  aggregate_count.times do |i|
    events = (1..events_per_aggregate).map do |j|
      MoneyDeposited.new(aggregate_id: "account-#{i}", amount: 100)
    end
    event_store.append_to_stream("account-#{i}", events, 0)
  end
end

total_events = aggregate_count * events_per_aggregate
puts "Appended #{total_events} events in #{time.round(2)}s"
puts "Throughput: #{(total_events / time).round} events/second"

Read Performance: Loading aggregates requires replaying all events. Long-lived aggregates with thousands of events experience slow loads. Snapshots mitigate this by limiting replay to recent events.

# Load performance with and without snapshots
require 'benchmark'

aggregate_id = 'account-1'
event_count = 10_000

# Without snapshot
time_without = Benchmark.realtime do
  events = event_store.read_stream(aggregate_id)
  account = BankAccount.new(aggregate_id)
  account.load_from_history(events)
end

# With snapshot (every 1000 events)
time_with = Benchmark.realtime do
  snapshot = snapshot_store.load_snapshot(aggregate_id)
  events = event_store.read_stream(
    aggregate_id,
    from_version: snapshot[:version]
  )
  account = BankAccount.restore_from_snapshot(
    aggregate_id,
    snapshot[:state]
  )
  account.load_from_history(events)
end

puts "Load without snapshot: #{(time_without * 1000).round}ms"
puts "Load with snapshot: #{(time_with * 1000).round}ms"
puts "Speedup: #{(time_without / time_with).round(1)}x"

Projection performance depends on event volume and projection complexity. Simple projections handle high throughput. Complex projections with multiple database queries per event become bottlenecks. Batch processing events improves projection throughput.
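Batching can be sketched as folding each slice of events in memory and writing to the read model once per slice. CountingStore and project_in_batches are hypothetical names; the store only counts write calls so the effect is visible.

```ruby
# Hypothetical read-model store that counts how often it is written to.
class CountingStore
  attr_reader :rows, :write_calls

  def initialize
    @rows = Hash.new(0)
    @write_calls = 0
  end

  def increment_all(deltas)
    @write_calls += 1
    deltas.each { |key, delta| @rows[key] += delta }
  end
end

def project_in_batches(events, store, batch_size: 100)
  events.each_slice(batch_size) do |batch|
    # Fold the whole batch in memory, then write once per batch.
    deltas = batch.each_with_object(Hash.new(0)) do |event, acc|
      acc[event[:account_id]] += event[:amount]
    end
    store.increment_all(deltas)
  end
end

# 1,000 deposit-like events spread over 10 accounts.
events = Array.new(1_000) { |i| { account_id: "account-#{i % 10}", amount: 5 } }
store = CountingStore.new
project_in_batches(events, store, batch_size: 100)
```

The 1,000 events result in 10 store writes instead of 1,000. The cost is that the read model lags by up to one batch.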

Storage Optimization: Event stores compress well due to structured, repetitive data. JSON events with repeated field names compress significantly when many events are compressed together; a single small event gains little. Binary formats like Protocol Buffers reduce storage further.

# Event size comparison
require 'json'
require 'zlib'

event = MoneyDeposited.new(
  aggregate_id: 'account-123',
  amount: 100,
  timestamp: Time.now
)

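# serialize_event is private to FileEventStore, so build the same shape here
json = {
  type: event.class.name,
  data: {
    aggregate_id: event.aggregate_id,
    amount: event.amount,
    timestamp: event.timestamp.to_s
  }
}.to_json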
compressed = Zlib::Deflate.deflate(json)

puts "JSON size: #{json.bytesize} bytes"
puts "Compressed size: #{compressed.bytesize} bytes"
puts "Compression ratio: #{(json.bytesize.to_f / compressed.bytesize).round(1)}x"

Archiving old events to cold storage reduces costs. Active events stay in fast storage; historical events move to object storage. Queries spanning archived events fetch from multiple storage tiers.

Scaling Patterns: Event stores partition by aggregate ID. Hash-based partitioning distributes aggregates across shards. Each shard handles independent streams, enabling horizontal scaling.

require 'zlib'

class PartitionedEventStore
  def initialize(partitions)
    @partitions = partitions
    @stores = @partitions.times.map { |i| FileEventStore.new("./partition-#{i}") }
  end

  def append_to_stream(stream_id, events, expected_version)
    partition = partition_for_stream(stream_id)
    @stores[partition].append_to_stream(stream_id, events, expected_version)
  end

  def read_stream(stream_id)
    partition = partition_for_stream(stream_id)
    @stores[partition].read_stream(stream_id)
  end

  private

  # String#hash is seeded per Ruby process, so it would route a stream to a
  # different partition after a restart. CRC32 is stable across processes.
  def partition_for_stream(stream_id)
    Zlib.crc32(stream_id.to_s) % @partitions
  end
end

Projection scaling uses competing consumers. Multiple instances subscribe to events, each processing a subset of streams. Consumer groups coordinate to avoid processing the same events twice.
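One simple way to split streams among consumers is a stable hash of the stream ID. The sketch assumes a fixed consumer count and uses a hypothetical StreamConsumer class; real consumer groups also handle membership changes, which are out of scope here.

```ruby
require 'zlib'

# Each consumer owns the streams whose stable hash maps to its index.
class StreamConsumer
  attr_reader :handled

  def initialize(index, consumer_count)
    @index = index
    @consumer_count = consumer_count
    @handled = []
  end

  def owns?(stream_id)
    Zlib.crc32(stream_id) % @consumer_count == @index
  end

  # Every consumer sees every event but processes only its own streams.
  def offer(stream_id, event)
    @handled << [stream_id, event] if owns?(stream_id)
  end
end

consumers = Array.new(3) { |i| StreamConsumer.new(i, 3) }
feed = (1..30).map { |n| ["order-#{n % 7}", "event-#{n}"] }
feed.each do |stream_id, event|
  consumers.each { |consumer| consumer.offer(stream_id, event) }
end
```

Because a given stream always maps to the same consumer, each event is processed exactly once and per-stream ordering is preserved.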

Reference

Core Event Sourcing Concepts

| Concept | Description | Purpose |
| --- | --- | --- |
| Event | Immutable record of state change | Captures what happened in past tense |
| Event Store | Append-only log of events | Primary source of truth for state |
| Aggregate | Consistency boundary containing related state | Ensures state changes remain consistent |
| Stream | Ordered sequence of events for one aggregate | Maintains event ordering per aggregate |
| Projection | Denormalized view built from events | Optimizes queries for specific use cases |
| Snapshot | Cached aggregate state at specific version | Reduces event replay time |

Event Store Operations

| Operation | Description | Concurrency |
| --- | --- | --- |
| Append | Add events to stream end | Optimistic with version check |
| Read Stream | Get all events for aggregate | No locking required |
| Read All | Get events across all streams | Read-only, no locks |
| Subscribe | Receive events as published | Multiple subscribers allowed |

Aggregate Lifecycle Methods

| Method | Purpose | When Called |
| --- | --- | --- |
| command methods | Accept business operations | User or system initiates action |
| apply_new_event | Apply and queue uncommitted event | Inside command methods |
| apply_event | Update state from event | During replay and new events |
| load_from_history | Rebuild state from events | When loading from store |
| uncommitted_events | Return events not yet persisted | Before saving to store |

Event Design Guidelines

| Guideline | Rationale | Example |
| --- | --- | --- |
| Past tense naming | Events represent completed facts | OrderShipped not ShipOrder |
| Include all data | Avoid querying during replay | Include item details not just IDs |
| Immutable structure | Events never change | Use new event for corrections |
| Version events | Support schema evolution | Add version field or use inheritance |

Snapshot Strategy Options

| Strategy | When to Snapshot | Trade-offs |
| --- | --- | --- |
| Fixed Interval | Every N events | Simple but may snapshot unnecessarily |
| Time-based | Every N hours | Predictable storage cost |
| Load-triggered | After reading many events | Adapts to usage patterns |
| Size-based | When stream exceeds size | Controls replay time directly |

Common Event Patterns

| Pattern | Description | Use Case |
| --- | --- | --- |
| Created/Updated/Deleted | Standard lifecycle | Most domain entities |
| Requested/Approved/Rejected | Approval workflow | Multi-step processes |
| Started/Completed/Failed | Async operations | Background jobs, external calls |
| Reserved/Confirmed/Cancelled | Two-phase operations | Inventory, bookings |

CQRS Projection Patterns

| Pattern | Description | Consistency |
| --- | --- | --- |
| Inline Projection | Update during event processing | Immediate but couples write/read |
| Async Projection | Subscribe and update separately | Eventual consistency |
| Batch Projection | Process events in batches | Higher latency, better throughput |
| Multiple Projections | Several views from same events | Each optimized for specific queries |

Version Conflict Resolution

| Strategy | When to Use | Implementation |
| --- | --- | --- |
| Retry | Conflicts rare | Reload, reapply, retry append |
| Merge | Concurrent edits compatible | Merge changes, append both |
| Last Write Wins | Order irrelevant | Accept conflict, overwrite |
| User Resolution | Cannot merge automatically | Return conflict to user |

Storage Backend Options

| Backend | Characteristics | Trade-offs |
| --- | --- | --- |
| File System | Simple, fast sequential I/O | Limited scalability |
| SQL Database | ACID guarantees, familiar | Write amplification with indexes |
| NoSQL Database | Horizontal scaling | Consistency guarantees vary |
| Object Storage | Low cost, unlimited capacity | Higher latency |
| Specialized Store | Optimized for events | Additional dependency |

Event Serialization Formats

| Format | Advantages | Disadvantages |
| --- | --- | --- |
| JSON | Human-readable, flexible schema | Larger size, slower parsing |
| MessagePack | Compact binary, fast | Not human-readable |
| Protocol Buffers | Strongly typed, efficient | Requires schema definition |
| Avro | Schema evolution support | Complexity overhead |

Monitoring Metrics

| Metric | What to Track | Why Important |
| --- | --- | --- |
| Append Latency | Time to write events | Indicates write performance |
| Replay Time | Time to load aggregate | User-facing latency |
| Stream Length | Events per aggregate | Snapshot decision criteria |
| Projection Lag | Events behind current | Query freshness indicator |
| Version Conflicts | Failed append count | Concurrency issues |
| Storage Growth | Events per day | Capacity planning |