CrackedRuby

Overview

Command Query Responsibility Segregation (CQRS) separates the responsibility for handling commands (writes) from queries (reads) in a system. Instead of using a single model to handle both operations, CQRS employs separate models optimized for their specific purposes. Commands modify state and return no data, while queries return data without modifying state.

The pattern emerged from Bertrand Meyer's Command-Query Separation (CQS) principle, which states that methods should either change state or return data, never both. CQRS extends this principle from the method level to the architectural level, creating separate paths through the application for commands and queries.
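
At the method level, CQS can be illustrated with a minimal sketch. The `Counter` class below is hypothetical and exists only for this illustration: one method changes state and returns nothing useful, the other returns data and changes nothing.

```ruby
# Hypothetical Counter class, used only to illustrate CQS at the method level.
class Counter
  def initialize
    @count = 0
  end

  # Command: changes state and returns nothing meaningful.
  def increment!
    @count += 1
    nil
  end

  # Query: returns data and leaves state untouched.
  def value
    @count
  end
end

counter = Counter.new
counter.increment!
counter.increment!
puts counter.value  # prints 2
```

CQRS applies the same split one level up: instead of separate methods on one object, the application has separate command and query paths.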

Traditional CRUD applications use the same model for reads and writes, leading to compromises. A single model must satisfy conflicting requirements: write operations need validation and business logic, while read operations need performance and specific data shapes. CQRS eliminates these compromises by allowing each side to evolve independently.

# Traditional approach - single model handles everything
class User < ApplicationRecord
  validates :email, presence: true
  
  def full_profile
    # Complex query for displaying profile
  end
  
  def update_preferences(prefs)
    # Complex write with validation
  end
end

# CQRS approach - separate concerns
class CreateUserCommand
  def execute(data)
    # Handle write with validation
  end
end

class UserProfileQuery
  def fetch(user_id)
    # Optimized read without write concerns
  end
end

The pattern proves particularly valuable in complex domains where read and write patterns differ significantly, where scalability requirements vary between operations, or where business logic complexity makes a unified model difficult to maintain.

Key Principles

CQRS rests on a strict separation between commands and queries, each with specific characteristics and responsibilities. Commands represent intentions to change state and carry all the information needed to perform an operation. They return no data beyond success or failure indicators. Queries retrieve data without side effects, returning information in forms optimized for consumption.

The command side enforces business rules and maintains consistency. Commands flow through a command bus or handler that validates, processes, and applies changes. The system may reject commands that violate invariants, providing feedback about constraint violations. Command handlers interact with domain models that encapsulate business logic and ensure valid state transitions.

The query side optimizes for read performance and specific use cases. Queries access read models designed for particular views or reports, avoiding navigation through complex object graphs. Read models contain denormalized data shaped for presentation, eliminating joins and computations at query time. The system may maintain multiple read models from the same underlying data, each optimized for different scenarios.

# Command - changes state, returns nothing meaningful
class RegisterUser
  attr_reader :email, :name
  
  def initialize(email:, name:)
    @email = email
    @name = name
  end
end

class RegisterUserHandler
  def call(command)
    User.create!(
      email: command.email,
      name: command.name
    )
    # create! raises on failure; return a bare success flag, not the user record
    true
  end
end

# Query - retrieves data, no side effects
class UserDashboardQuery
  def execute(user_id)
    # Returns denormalized data optimized for display
    {
      user: user_data(user_id),
      recent_orders: recent_orders(user_id),
      recommendations: recommendations(user_id)
    }
  end
  
  private
  
  def user_data(user_id)
    ReadModels::UserSummary.find(user_id)
  end
end

Data flow differs between sides. Commands typically follow a synchronous path where the system validates input, applies changes, and returns status. The system may publish events after successful command processing, notifying interested components of state changes. Queries access read models directly without triggering business logic or modifying state.

Eventual consistency often emerges in CQRS implementations. Write models and read models may temporarily diverge as the system propagates changes. The command side updates the write model immediately, while read models update asynchronously through event handlers or scheduled processes. Applications must accommodate this delay, typically through UI feedback or explicit refresh mechanisms.
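
The temporary divergence between the two sides can be shown with a small in-memory sketch. Every name here (`WriteStore`, `ReadStore`, the `:order_created` event) is an assumption made for illustration, and the array of pending events stands in for whatever queue or job processor a real system would use.

```ruby
# All names here are hypothetical and exist only for this illustration.
class WriteStore
  attr_reader :orders, :pending_events

  def initialize
    @orders = {}
    @pending_events = []   # stands in for a job queue or message broker
  end

  # Command side: the write model changes immediately, the event is only queued.
  def create_order(id, total)
    @orders[id] = { total: total }
    @pending_events << { type: :order_created, id: id, total: total }
    nil
  end
end

class ReadStore
  attr_reader :summaries

  def initialize
    @summaries = {}
  end

  # Runs later, e.g. in a background worker; drains the queued events.
  def catch_up(events)
    events.shift(events.size).each do |event|
      @summaries[event[:id]] = "Order #{event[:id]}: #{event[:total]}"
    end
  end
end

write_store = WriteStore.new
read_store = ReadStore.new

write_store.create_order(1, 250)
stale_view = read_store.summaries[1]   # nil: the read model has not caught up yet

read_store.catch_up(write_store.pending_events)
fresh_view = read_store.summaries[1]   # "Order 1: 250"
```

The window between `create_order` and `catch_up` is the delay the application has to accommodate.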

The separation enables independent scaling. Read-heavy applications can scale the query side with caching, replicas, and optimized data stores while maintaining a simpler write side. Write-heavy applications can optimize command processing without burdening it with complex read requirements. Each side can use appropriate technologies and infrastructure.

# Command and query use different stores
class CommandSide
  def write_model
    # Uses transactional database
    @write_model ||= PostgresConnection.new
  end
end

class QuerySide
  def read_model
    # Uses optimized read store
    @read_model ||= RedisConnection.new
  end
end

Design Considerations

CQRS introduces complexity that must justify its benefits. The pattern suits complex domains where business logic significantly differs between reading and writing, where read and write patterns have different scaling requirements, or where multiple views of the same data serve different purposes. Simple CRUD applications gain little from CQRS and suffer from unnecessary complexity.

Eventual consistency presents the primary challenge. Users must tolerate delays between writing data and seeing updates in queries. Applications must communicate this delay clearly, perhaps showing loading states or stale data indicators. Critical operations may require synchronous updates or compensating mechanisms to maintain user trust.

# Handle eventual consistency in UI
class OrderController < ApplicationController
  def create
    command = CreateOrder.new(order_params)
    command_bus.execute(command)
    
    # Don't immediately query - may not be updated
    flash[:notice] = "Order submitted. Processing..."
    redirect_to orders_path(status: :processing)
  end
  
  def show
    query = OrderQuery.new
    @order = query.find(params[:id])
    
    # Indicate if data may be stale
    @stale = @order.updated_at < 5.seconds.ago
  end
end

Data duplication increases storage requirements. Read models replicate information from write models, formatted for specific views. Multiple read models multiply this duplication. Storage costs must be weighed against performance benefits. Stale read models add another dimension, requiring strategies to detect and rebuild corrupted or outdated data.

Operational complexity grows with separate models. Debugging requires tracing through command handlers, event publishers, event handlers, and read model updates. Deployment must coordinate schema changes across models. Monitoring must track command processing, event delivery, and read model freshness. Teams need expertise in distributed system patterns.

# Monitoring eventual consistency lag
class EventProcessingMonitor
  def check_lag
    last_command = CommandLog.maximum(:created_at)
    last_processed = EventProcessor.maximum(:processed_at)
    
    lag_seconds = last_command - last_processed
    alert if lag_seconds > threshold
  end
end
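
The lag calculation itself can be isolated from the database so the edge cases are easy to test. The sketch below is one possible shape, not a prescribed one: the helper names and the 30-second default threshold are assumptions, and it treats "commands exist but nothing has been processed" as infinite lag.

```ruby
# Hypothetical helper: computes read-model lag in seconds from two timestamps.
# Returns 0.0 when no command has been logged yet, and Float::INFINITY when
# commands exist but no event has been processed.
def read_model_lag(last_command_at, last_processed_at)
  return 0.0 if last_command_at.nil?
  return Float::INFINITY if last_processed_at.nil?

  # Never report negative lag if processing is ahead of the command log.
  [last_command_at - last_processed_at, 0.0].max
end

# The threshold default is an arbitrary value chosen for this sketch.
def lagging?(last_command_at, last_processed_at, threshold_seconds: 30)
  read_model_lag(last_command_at, last_processed_at) > threshold_seconds
end

now = Time.now
lagging?(now, now - 45)   # => true, the read side is 45 seconds behind
lagging?(now, now - 5)    # => false
```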

The pattern fits naturally with event sourcing but does not require it. Event sourcing stores state changes as events rather than current state, providing an audit trail and enabling temporal queries. CQRS and event sourcing complement each other: commands generate events, events update read models. However, CQRS works with traditional persistence, updating both write and read models through conventional means.

Domain complexity drives CQRS adoption. Systems with rich business rules, multiple user roles with different views, or complex validation workflows benefit from separated concerns. E-commerce platforms handle order placement differently than order history queries. Financial systems validate transactions strictly while supporting various reporting needs. Collaborative systems track changes meticulously while presenting current state simply.

Testing becomes more involved. Command handlers need unit tests verifying business logic and integration tests confirming persistence. Event handlers require tests ensuring correct read model updates. End-to-end tests must verify eventual consistency paths. Test infrastructure must support asynchronous processing and multiple data stores.

# Testing command and query independently
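RSpec.describe RegisterUserHandler do
  let(:handler) { described_class.new }

  it "creates user with valid data" do
    # RegisterUser requires both email: and name:
    command = RegisterUser.new(email: "test@example.com", name: "Test User")
    handler.call(command)
    
    expect(User.exists?(email: "test@example.com")).to be true
  end
end

RSpec.describe UserQuery do
  let(:query) { described_class.new }

  it "returns user summary" do
    # create_user_in_read_model is a test helper that seeds the read store
    create_user_in_read_model(id: 1, email: "test@example.com")
    
    result = query.execute(1)
    expect(result[:email]).to eq("test@example.com")
  end
end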

Technology choices differ between sides. Write models use databases emphasizing consistency and transactions. Read models prioritize query performance, potentially using document stores, caching layers, or search engines. The query side might employ Elasticsearch for full-text search or Redis for high-speed access while the command side uses PostgreSQL for ACID guarantees.

Ruby Implementation

Ruby implements CQRS through command objects, handler classes, and query objects. Commands encapsulate operations as plain Ruby objects, handlers process commands and modify state, and queries retrieve data from optimized read models. Several gems provide infrastructure, though many teams build custom implementations tailored to their needs.

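Command objects represent intentions with explicit attributes. They validate inputs and carry all information needed to perform operations. Commands should be treated as immutable after creation to prevent accidental modification during processing. Note that attr_accessor, as used in the example below, generates setters, so immutability there holds only by convention.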

class CreateOrder
  include ActiveModel::Model
  include ActiveModel::Validations
  
  attr_accessor :user_id, :items, :shipping_address
  
  validates :user_id, presence: true
  validates :items, presence: true
  validate :items_available
  
  def items_available
    # The presence check does not stop later validators, so guard against nil
    Array(items).each do |item|
      unless Inventory.available?(item[:product_id], item[:quantity])
        errors.add(:items, "#{item[:product_id]} not available")
      end
    end
  end
end
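
Where immutability should be enforced rather than assumed, a plain Ruby command can freeze itself and its data on construction. This is a minimal sketch with a hypothetical `CreateOrderCommand` class; it leaves out the ActiveModel validations used above.

```ruby
# Hypothetical plain-Ruby command; no ActiveModel dependency.
class CreateOrderCommand
  attr_reader :user_id, :items

  def initialize(user_id:, items:)
    @user_id = user_id
    # Copy and freeze the nested data so callers cannot mutate it later.
    @items = items.map { |item| item.dup.freeze }.freeze
    freeze
  end
end

command = CreateOrderCommand.new(user_id: 7, items: [{ product_id: 1, quantity: 2 }])

begin
  command.items << { product_id: 99, quantity: 1 }
rescue FrozenError => e
  puts "rejected: #{e.class}"   # prints "rejected: FrozenError"
end
```

Copying each item before freezing keeps the caller's own hashes usable while the command's view of them stays fixed.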

Command handlers execute business logic. They receive validated commands, interact with domain models, persist changes, and publish events. Handlers remain focused on a single command type, simplifying testing and maintenance.

class CreateOrderHandler
  def initialize(repository: OrderRepository.new,
                 event_bus: EventBus.instance)
    @repository = repository
    @event_bus = event_bus
  end
  
  def call(command)
    order = Order.new(
      user_id: command.user_id,
      items: command.items,
      shipping_address: command.shipping_address
    )
    
    order.calculate_total
    order.reserve_inventory
    
    @repository.save(order)
    
    @event_bus.publish(OrderCreated.new(
      order_id: order.id,
      user_id: order.user_id,
      total: order.total
    ))
    
    Result.success(order.id)
  rescue => e
    Result.failure(e.message)
  end
end
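
The handler above returns `Result.success` and `Result.failure`, but the class itself is not shown. One minimal way to define it, assuming only the methods the examples in this document call (`success?`, `value`, `error`), might look like this:

```ruby
# Hypothetical Result value object covering the calls used in the examples:
# Result.success, Result.failure, #success?, #value and #error.
class Result
  attr_reader :value, :error

  def self.success(value = nil)
    new(success: true, value: value)
  end

  def self.failure(error)
    new(success: false, error: error)
  end

  def initialize(success:, value: nil, error: nil)
    @success = success
    @value = value
    @error = error
    freeze
  end

  def success?
    @success
  end

  def failure?
    !@success
  end
end

ok = Result.success(42)
ok.success?      # => true
ok.value         # => 42

failed = Result.failure("Inventory unavailable")
failed.success?  # => false
failed.error     # => "Inventory unavailable"
```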

A command bus routes commands to appropriate handlers. It provides a single entry point for all commands, centralizing cross-cutting concerns like logging, authorization, and transaction management.

class CommandBus
  def initialize
    @handlers = {}
  end
  
  def register(command_class, handler)
    @handlers[command_class] = handler
  end
  
  def execute(command)
    return failure("Invalid command") unless command.valid?
    
    handler = @handlers[command.class]
    return failure("No handler") unless handler
    
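    result = nil
    ApplicationRecord.transaction do
      result = handler.call(command)
      # Handlers return Result.failure instead of raising, so roll back explicitly
      raise ActiveRecord::Rollback unless result.success?
    end
    result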
  end
  
  private
  
  def failure(message)
    Result.failure(message)
  end
end

# Configuration
bus = CommandBus.new
bus.register(CreateOrder, CreateOrderHandler.new)
bus.register(CancelOrder, CancelOrderHandler.new)
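
Cross-cutting concerns such as logging can be layered around handlers without touching them. The sketch below is an in-memory illustration only: `MiddlewareCommandBus`, its `use` method, and the `Ping` command are assumptions of this example, not part of the CommandBus class above.

```ruby
# Hypothetical in-memory bus with middleware hooks.
class MiddlewareCommandBus
  def initialize
    @handlers = {}
    @middleware = []
  end

  def register(command_class, handler)
    @handlers[command_class] = handler
  end

  # A middleware is any callable taking (command, next_step).
  def use(middleware)
    @middleware << middleware
  end

  def execute(command)
    handler = @handlers.fetch(command.class) do
      raise ArgumentError, "No handler for #{command.class}"
    end

    # Build the call chain from the inside out so the first registered
    # middleware runs outermost.
    innermost = ->(cmd) { handler.call(cmd) }
    chain = @middleware.reverse.reduce(innermost) do |next_step, middleware|
      ->(cmd) { middleware.call(cmd, next_step) }
    end
    chain.call(command)
  end
end

Ping = Struct.new(:message)

log = []
bus = MiddlewareCommandBus.new
bus.use(lambda do |command, next_step|
  log << "start #{command.class}"
  result = next_step.call(command)
  log << "finish #{command.class}"
  result
end)
bus.register(Ping, ->(command) { command.message.upcase })

bus.execute(Ping.new("hello"))  # => "HELLO"
log                             # => ["start Ping", "finish Ping"]
```

Authorization or transaction handling could be added the same way, as further callables passed to `use`.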

Query objects encapsulate read operations. They access read models directly, apply filtering and sorting, and return data formatted for consumption. Queries avoid business logic, focusing purely on data retrieval.

class OrderHistoryQuery
  def initialize(connection: ReadDatabase.connection)
    @connection = connection
  end
  
  def execute(user_id:, limit: 50, offset: 0)
    @connection.exec_params(<<-SQL, [user_id, limit, offset])
      SELECT 
        o.id,
        o.created_at,
        o.total,
        o.status,
        json_agg(
          json_build_object(
            'product_name', oi.product_name,
            'quantity', oi.quantity,
            'price', oi.price
          )
        ) as items
      FROM orders_read_model o
      JOIN order_items_read_model oi ON oi.order_id = o.id
      WHERE o.user_id = $1
      GROUP BY o.id
      ORDER BY o.created_at DESC
      LIMIT $2 OFFSET $3
    SQL
  end
end

Event handlers update read models asynchronously. They subscribe to domain events published after command processing, transforming events into read model updates. Ruby applications often use Sidekiq or other background job processors for this purpose.

class OrderCreatedHandler
  def call(event)
    ReadModels::OrderSummary.create(
      id: event.order_id,
      user_id: event.user_id,
      total: event.total,
      status: 'pending',
      created_at: event.occurred_at
    )
    
    UserStatistics.increment_order_count(event.user_id)
  end
end

# Event subscription
EventBus.subscribe(OrderCreated, OrderCreatedHandler.new)
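
The `EventBus` used in these examples is not defined in the text. A minimal synchronous, in-memory version might look like the sketch below; it is an assumption for illustration, and a production setup would more likely hand events to a background job processor.

```ruby
# Hypothetical synchronous, in-memory EventBus.
class EventBus
  @subscribers = Hash.new { |hash, key| hash[key] = [] }

  class << self
    def subscribe(event_class, handler)
      @subscribers[event_class] << handler
    end

    # Calls every handler registered for the event's class, in order.
    def publish(event)
      @subscribers[event.class].each { |handler| handler.call(event) }
      nil
    end
  end
end

# Hypothetical event, defined as a Struct to keep the example self-contained.
OrderCreated = Struct.new(:order_id, :total, keyword_init: true)

summaries = {}
EventBus.subscribe(OrderCreated, ->(event) { summaries[event.order_id] = event.total })

EventBus.publish(OrderCreated.new(order_id: 1, total: 99))
summaries  # => { 1 => 99 }
```

Because handlers only need to respond to `call`, a lambda and an object such as `OrderCreatedHandler.new` can be subscribed interchangeably.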

Rails applications integrate CQRS through service objects and concerns. Controllers dispatch commands through the command bus and queries through query objects, remaining thin orchestration layers.

class OrdersController < ApplicationController
  def create
    command = CreateOrder.new(order_params)
    
    result = command_bus.execute(command)
    
    if result.success?
      render json: { order_id: result.value }, status: :created
    else
      render json: { errors: result.error }, status: :unprocessable_entity
    end
  end
  
  def index
    query = OrderHistoryQuery.new
    orders = query.execute(
      user_id: current_user.id,
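      # Passing nil would override the query's keyword defaults, so coerce here
      limit: params.fetch(:limit, 50).to_i,
      offset: params.fetch(:offset, 0).to_i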
    )
    
    render json: orders
  end
  
  private
  
  def command_bus
    @command_bus ||= CommandBus.instance
  end
end

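The arkency-command_bus gem (from the arkency/command_bus repository) provides a lightweight command bus implementation. It routes each command to the handler registered for its class with minimal configuration; validation and result handling remain the application's responsibility.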

# Gemfile
gem 'arkency-command_bus'

# Setup
command_bus = Arkency::CommandBus.new
command_bus.register(CreateOrder, CreateOrderHandler.new)

# Usage
result = command_bus.call(CreateOrder.new(order_params))

RailsEventStore offers comprehensive event sourcing and CQRS infrastructure. It stores events, manages subscriptions, and provides projection capabilities for building read models.

# Gemfile
gem 'rails_event_store'

# Configuration
Rails.application.configure do
  config.to_prepare do
    Rails.configuration.event_store = RailsEventStore::Client.new
  end
end

# Publishing events
event_store = Rails.configuration.event_store
event = OrderCreated.new(data: {
  order_id: order.id,
  user_id: user.id
})
event_store.publish(event, stream_name: "Order$#{order.id}")

# Subscribing to events
event_store.subscribe(OrderCreatedHandler.new, to: [OrderCreated])

Read model projections transform event streams into queryable views. Projections replay events to build initial state and process new events as they arrive.

class OrderSummaryProjection
  def initialize(event_store)
    @event_store = event_store
  end
  
  def rebuild
    ReadModels::OrderSummary.delete_all
    
    @event_store.read.each do |event|
      handle(event)
    end
  end
  
  def handle(event)
    case event
    when OrderCreated
      handle_order_created(event)
    when OrderShipped
      handle_order_shipped(event)
    when OrderCancelled
      handle_order_cancelled(event)
    end
  end
  
  private
  
  def handle_order_created(event)
    ReadModels::OrderSummary.create(
      id: event.data[:order_id],
      user_id: event.data[:user_id],
      status: 'created'
    )
  end
  
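  def handle_order_shipped(event)
    summary = ReadModels::OrderSummary.find(event.data[:order_id])
    summary.update(status: 'shipped', shipped_at: event.metadata[:timestamp])
  end
  
  # Referenced by #handle above; without it OrderCancelled raises NoMethodError
  def handle_order_cancelled(event)
    summary = ReadModels::OrderSummary.find(event.data[:order_id])
    summary.update(status: 'cancelled')
  end
end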

Implementation Approaches

Simple CQRS maintains a single database with separate command and query paths. Commands update the write model through business logic. Queries read from the same database using optimized queries or denormalized tables. This approach provides much of the benefit of CQRS with little added operational complexity.

# Single database, separate paths
class CommandSide
  def create_order(command)
    order = Order.create!(command.attributes)
    OrderSummary.create_from_order(order)  # Denormalized read model
    order.id
  end
end

class QuerySide
  def order_summary(order_id)
    OrderSummary.find(order_id)  # Read from denormalized table
  end
end

The read model exists as denormalized tables or materialized views in the same database. Database triggers or application code keep read models synchronized with write models. This approach minimizes operational overhead while providing query optimization.

Separate database CQRS uses distinct databases for commands and queries. The write database optimizes for consistency and transactions. The read database optimizes for query performance, perhaps using different technology altogether. Changes propagate from write to read database through events or replication.

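class DatabaseConfiguration
  # Return connection settings only. Calling establish_connection on
  # ActiveRecord::Base here would repoint every model in the application.
  def self.write_db
    {
      adapter: 'postgresql',
      database: 'app_write',
      host: 'write-db.example.com'
    }
  end
  
  def self.read_db
    {
      adapter: 'postgresql',
      database: 'app_read',
      host: 'read-db.example.com',
      replica: true
    }
  end
end

# Abstract base classes own one connection each; concrete models inherit from them
class WriteModel < ActiveRecord::Base
  self.abstract_class = true
  establish_connection DatabaseConfiguration.write_db
end

class ReadModel < ActiveRecord::Base
  self.abstract_class = true
  establish_connection DatabaseConfiguration.read_db
end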

Event-driven CQRS publishes events after command processing. Event handlers update read models asynchronously. This approach provides loose coupling and scales better but introduces eventual consistency. Background job processors handle event delivery and retry failed updates.

class EventDrivenCommandHandler
  def call(command)
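    result = nil
    
    ActiveRecord::Base.transaction do
      result = process_command(command)
    end
    
    # Enqueue only after the transaction commits; a job enqueued inside it
    # can run before the write is visible, or survive a rollback
    publish_events(result.events)
    
    result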
  end
  
  private
  
  def publish_events(events)
    events.each do |event|
      EventPublisher.publish_async(event)
    end
  end
end

class EventPublisher
  def self.publish_async(event)
    UpdateReadModelJob.perform_later(event.to_json)
  end
end

class UpdateReadModelJob < ApplicationJob
  queue_as :events
  
  def perform(event_json)
    # Symbolize keys so handlers can read event[:order_id] and similar
    event = JSON.parse(event_json, symbolize_names: true)
    handler = EventHandlerRegistry.handler_for(event[:type])
    handler.call(event)
  end
end

Multiple read models serve different query needs. One read model might optimize for list views, another for detailed displays, and a third for reporting. Each subscribes to relevant events and maintains its own optimized structure.

# List view read model - minimal data
class OrderListReadModel < ApplicationRecord
  def self.handle_order_created(event)
    create(
      order_id: event[:order_id],
      created_at: event[:created_at],
      total: event[:total],
      status: event[:status]
    )
  end
end

# Detail view read model - comprehensive data
class OrderDetailReadModel < ApplicationRecord
  def self.handle_order_created(event)
    create(
      order_id: event[:order_id],
      user: event[:user_data],
      items: event[:items],
      shipping_address: event[:shipping_address],
      billing_address: event[:billing_address],
      total: event[:total],
      status: event[:status]
    )
  end
end

# Analytics read model - aggregated data
class OrderAnalyticsReadModel
  def self.handle_order_created(event)
    date = event[:created_at].to_date
    
    increment_counter(date, :order_count)
    increment_counter(date, :revenue, event[:total])
  end
end

Event sourcing stores all state changes as events rather than current state. Commands generate events, which persist to an event store. Read models rebuild from event streams. This approach provides complete audit trails and temporal queries but adds complexity.

class EventSourcedOrder
  attr_reader :id, :events
  
  def initialize(id)
    @id = id
    @events = []
  end
  
  def create(user_id:, items:)
    apply OrderCreated.new(
      order_id: @id,
      user_id: user_id,
      items: items
    )
  end
  
  def ship(tracking_number)
    raise "Cannot ship cancelled order" if @cancelled
    
    apply OrderShipped.new(
      order_id: @id,
      tracking_number: tracking_number
    )
  end
  
  def apply(event)
    @events << event
    
    case event
    when OrderCreated
      @user_id = event.user_id
      @items = event.items
    when OrderShipped
      @shipped = true
    when OrderCancelled
      @cancelled = true
    end
  end
  
  def self.rebuild_from_events(events)
    order = new(events.first.order_id)
    events.each { |event| order.apply(event) }
    order
  end
end
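
One detail worth separating in an event-sourced aggregate is replaying stored history versus recording new changes: replayed events should rebuild state without being queued for saving again. The sketch below shows one way to draw that line. The `Order` class, the Struct-based events, and the `uncommitted_events` name are assumptions made for illustration, and `from_history` assumes a non-empty event list.

```ruby
# Hypothetical event structs and aggregate, for illustration only.
OrderCreated = Struct.new(:order_id, :items, keyword_init: true)
OrderCancelled = Struct.new(:order_id, keyword_init: true)

class Order
  attr_reader :id, :items, :uncommitted_events

  # Rebuilds state from stored events without re-recording them.
  def self.from_history(events)
    order = new(events.first.order_id)
    events.each { |event| order.send(:mutate, event) }
    order
  end

  def initialize(id)
    @id = id
    @items = []
    @cancelled = false
    @uncommitted_events = []
  end

  def cancelled?
    @cancelled
  end

  def cancel
    raise "Order already cancelled" if @cancelled

    record OrderCancelled.new(order_id: @id)
  end

  private

  # New behavior: remember the event for persistence, then apply it.
  def record(event)
    @uncommitted_events << event
    mutate(event)
  end

  # State transitions only; shared by replay and new changes.
  def mutate(event)
    case event
    when OrderCreated then @items = event.items
    when OrderCancelled then @cancelled = true
    end
  end
end

history = [OrderCreated.new(order_id: 1, items: [{ sku: "A", quantity: 2 }])]
order = Order.from_history(history)
order.uncommitted_events.size  # => 0, replayed events are not new
order.cancel
order.uncommitted_events.size  # => 1, only the cancellation needs saving
```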

Saga patterns coordinate long-running processes across multiple aggregates. A saga listens for events and issues commands in response, orchestrating complex workflows. CQRS facilitates sagas by providing clear command and event boundaries.

class OrderFulfillmentSaga
  def initialize(command_bus)
    @command_bus = command_bus
  end
  
  def handle_order_created(event)
    @command_bus.execute(
      ReserveInventory.new(
        order_id: event.order_id,
        items: event.items
      )
    )
  end
  
  def handle_inventory_reserved(event)
    @command_bus.execute(
      ProcessPayment.new(
        order_id: event.order_id,
        amount: event.amount
      )
    )
  end
  
  def handle_payment_processed(event)
    @command_bus.execute(
      ShipOrder.new(order_id: event.order_id)
    )
  end
  
  def handle_payment_failed(event)
    @command_bus.execute(
      ReleaseInventory.new(order_id: event.order_id)
    )
    
    @command_bus.execute(
      CancelOrder.new(
        order_id: event.order_id,
        reason: 'Payment failed'
      )
    )
  end
end
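
Because a saga only turns events into commands, it can be exercised with a bus that records commands instead of executing them. The sketch below checks the compensation path for a failed payment. `RecordingBus`, `PaymentFailureSaga`, and the Struct-based commands are hypothetical stand-ins for this illustration.

```ruby
# Hypothetical stand-ins: Struct-based messages and a bus that only records.
ReleaseInventory = Struct.new(:order_id, keyword_init: true)
CancelOrder = Struct.new(:order_id, :reason, keyword_init: true)
PaymentFailed = Struct.new(:order_id, keyword_init: true)

class RecordingBus
  attr_reader :commands

  def initialize
    @commands = []
  end

  # Captures the command instead of dispatching it to a handler.
  def execute(command)
    @commands << command
  end
end

class PaymentFailureSaga
  def initialize(command_bus)
    @command_bus = command_bus
  end

  # Compensation: release the reservation first, then cancel the order.
  def handle_payment_failed(event)
    @command_bus.execute(ReleaseInventory.new(order_id: event.order_id))
    @command_bus.execute(
      CancelOrder.new(order_id: event.order_id, reason: "Payment failed")
    )
  end
end

bus = RecordingBus.new
PaymentFailureSaga.new(bus).handle_payment_failed(PaymentFailed.new(order_id: 12))
bus.commands.map(&:class)  # => [ReleaseInventory, CancelOrder]
```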

Practical Examples

An e-commerce platform demonstrates CQRS separating order processing from order history queries. Customers place orders through commands that validate inventory, calculate totals, and reserve stock. The system publishes events after successful order creation. Order history queries access read models optimized for display without navigating complex relationships.

# Command for creating orders
class PlaceOrder
  include ActiveModel::Model
  
  attr_accessor :customer_id, :items, :shipping_address, :billing_address
  
  validates :customer_id, :items, :shipping_address, :billing_address,
            presence: true
  validate :items_in_stock
  
  def items_in_stock
    # The presence check does not stop later validators, so guard against nil
    Array(items).each do |item|
      available = Inventory.check_availability(
        item[:product_id],
        item[:quantity]
      )
      
      unless available
        errors.add(:items, "#{item[:product_id]} out of stock")
      end
    end
  end
end

# Command handler
class PlaceOrderHandler
  def initialize(inventory:, pricing:, repository:)
    @inventory = inventory
    @pricing = pricing
    @repository = repository
  end
  
  def call(command)
    # Calculate order total
    total = @pricing.calculate_total(command.items)
    
    # Reserve inventory
    reservation_id = @inventory.reserve(command.items)
    
    # Create order
    order = Order.new(
      customer_id: command.customer_id,
      items: command.items,
      shipping_address: command.shipping_address,
      billing_address: command.billing_address,
      total: total,
      reservation_id: reservation_id
    )
    
    @repository.save(order)
    
    # Publish event
    EventBus.publish(OrderPlaced.new(
      order_id: order.id,
      customer_id: order.customer_id,
      items: order.items,
      total: order.total,
      placed_at: Time.current
    ))
    
    order.id
  rescue => e
    @inventory.release(reservation_id) if reservation_id
    raise
  end
end

# Event handler updates read model
class OrderPlacedHandler
  def call(event)
    OrderReadModel.create!(
      id: event.order_id,
      customer_id: event.customer_id,
      item_count: event.items.size,
      total: event.total,
      status: 'pending',
      placed_at: event.placed_at
    )
    
    event.items.each do |item|
      OrderItemReadModel.create!(
        order_id: event.order_id,
        product_id: item[:product_id],
        product_name: item[:product_name],
        quantity: item[:quantity],
        price: item[:price]
      )
    end
  end
end

# Query for order history
class CustomerOrderHistoryQuery
  def execute(customer_id, page: 1, per_page: 20)
    OrderReadModel
      .where(customer_id: customer_id)
      .includes(:order_items)
      .order(placed_at: :desc)
      .page(page)
      .per(per_page)
      .map do |order|
        {
          id: order.id,
          placed_at: order.placed_at,
          item_count: order.item_count,
          total: order.total,
          status: order.status,
          items: order.order_items.map do |item|
            {
              product_name: item.product_name,
              quantity: item.quantity,
              price: item.price
            }
          end
        }
      end
  end
end

A collaborative document editing system uses CQRS to handle concurrent edits and display current document state. Commands represent edit operations, validated against current version. The system publishes events for each accepted edit. Read models maintain the current document text optimized for rendering, updated through event handlers that apply operations.

# Edit command
class ApplyDocumentEdit
  attr_reader :document_id, :user_id, :operation, :version
  
  def initialize(document_id:, user_id:, operation:, version:)
    @document_id = document_id
    @user_id = user_id
    @operation = operation  # e.g., { type: 'insert', position: 10, text: 'hello' }
    @version = version
  end
end

# Command handler with conflict detection
class ApplyDocumentEditHandler
  def call(command)
    document = DocumentWriteModel.find(command.document_id)
    
    # Check version for optimistic locking
    if document.version != command.version
      return Result.failure('Version conflict - document changed')
    end
    
    # Validate operation
    unless valid_operation?(command.operation, document)
      return Result.failure('Invalid operation')
    end
    
    # Record edit
    edit = DocumentEdit.create!(
      document_id: command.document_id,
      user_id: command.user_id,
      operation: command.operation,
      version: document.version + 1
    )
    
    document.update!(version: edit.version)
    
    # Publish event
    EventBus.publish(DocumentEdited.new(
      document_id: command.document_id,
      user_id: command.user_id,
      operation: command.operation,
      version: edit.version,
      timestamp: edit.created_at
    ))
    
    Result.success(edit.version)
  end
end
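
The version check only protects against conflicts if comparing the version and recording the edit happen as one atomic step. In the handler above that would normally mean a database transaction with a row lock (ActiveRecord offers `with_lock` for this). The in-memory sketch below uses a `Mutex` to show the same idea. `VersionedDocument` is a hypothetical stand-in for `DocumentWriteModel`.

```ruby
# Hypothetical in-memory document; stands in for DocumentWriteModel.
class VersionedDocument
  attr_reader :version, :edits

  def initialize
    @version = 0
    @edits = []
    @lock = Mutex.new
  end

  # Accepts the edit only if the caller saw the latest version.
  # Returns the new version, or nil on conflict.
  def apply_edit(operation, expected_version:)
    @lock.synchronize do
      return nil unless expected_version == @version

      @edits << operation
      @version += 1
    end
  end
end

doc = VersionedDocument.new
doc.apply_edit({ type: "insert", position: 0, text: "hi" }, expected_version: 0)  # => 1
doc.apply_edit({ type: "insert", position: 0, text: "yo" }, expected_version: 0)  # => nil, stale version
doc.apply_edit({ type: "insert", position: 2, text: "!" }, expected_version: 1)   # => 2
```

A client that receives a conflict would typically reload the document, rebase its operation on the new version, and retry.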

# Read model for current document state
class DocumentStateReadModel < ApplicationRecord
  def self.handle_document_edited(event)
    doc = find_or_create_by(document_id: event.document_id)
    
    # Apply operation to current text
    doc.current_text = apply_operation(
      doc.current_text,
      event.operation
    )
    
    doc.version = event.version
    doc.last_edited_by = event.user_id
    doc.last_edited_at = event.timestamp
    doc.save!
  end
  
  def self.apply_operation(text, operation)
    text = text.to_s.dup                   # nil-safe for new documents; work on a copy
    op = operation.transform_keys(&:to_s)  # accept symbol or string keys
    
    case op['type']
    when 'insert'
      text.insert(op['position'], op['text'])
    when 'delete'
      text.slice!(op['position'], op['length'])
    when 'replace'
      text[op['position'], op['length']] = op['text']
    end
    text
  end
end

# Query for document display
class DocumentDisplayQuery
  def execute(document_id)
    # find_by! raises RecordNotFound instead of failing later on a nil doc
    doc = DocumentStateReadModel.find_by!(document_id: document_id)
    
    {
      id: doc.document_id,
      text: doc.current_text,
      version: doc.version,
      last_edited_by: doc.last_edited_by,
      last_edited_at: doc.last_edited_at,
      editors: recent_editors(document_id)
    }
  end
  
  private
  
  def recent_editors(document_id)
    DocumentEdit
      .where(document_id: document_id)
      .where('created_at > ?', 1.hour.ago)
      .select(:user_id)
      .distinct
      .limit(10)
  end
end

A financial trading system demonstrates CQRS separating trade execution from position queries. Trade commands undergo strict validation before execution, checking account balances, position limits, and market conditions. The system records each trade immediately in the write model. Position queries aggregate trades into current holdings, profit/loss, and margin calculations, updated asynchronously from trade events.

# Trade execution command
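class ExecuteTrade
  # ActiveModel::Model supplies the validation macros and keyword initializer
  include ActiveModel::Model
  
  attr_accessor :account_id, :symbol, :quantity, :price, :side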
  
  validates :account_id, :symbol, :quantity, :price, :side, presence: true
  validates :side, inclusion: { in: %w[buy sell] }
  validates :quantity, numericality: { greater_than: 0 }
  validate :sufficient_funds, if: -> { side == 'buy' }
  validate :sufficient_position, if: -> { side == 'sell' }
  
  def sufficient_funds
    required = quantity * price
    available = Account.find(account_id).available_funds
    
    errors.add(:quantity, 'Insufficient funds') if required > available
  end
  
  def sufficient_position
    current = Position.find_by(account_id: account_id, symbol: symbol)&.quantity || 0
    
    errors.add(:quantity, 'Insufficient position') if quantity > current
  end
end

# Trade handler
class ExecuteTradeHandler
  def call(command)
    ActiveRecord::Base.transaction do
      trade = Trade.create!(
        account_id: command.account_id,
        symbol: command.symbol,
        quantity: command.quantity,
        price: command.price,
        side: command.side,
        executed_at: Time.current
      )
      
      # Update account balance
      Account.find(command.account_id).update_balance_for_trade(trade)
      
      # Publish event
      EventBus.publish(TradeExecuted.new(
        trade_id: trade.id,
        account_id: trade.account_id,
        symbol: trade.symbol,
        quantity: trade.quantity,
        price: trade.price,
        side: trade.side,
        executed_at: trade.executed_at
      ))
      
      trade.id
    end
  end
end

# Read model for position summary
class PositionSummaryReadModel < ApplicationRecord
  def self.handle_trade_executed(event)
    position = find_or_initialize_by(
      account_id: event.account_id,
      symbol: event.symbol
    )
    
    case event.side
    when 'buy'
      position.quantity += event.quantity
      position.total_cost += event.quantity * event.price
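    when 'sell'
      realized_pnl = event.quantity * (event.price - position.average_cost)
      position.realized_pnl += realized_pnl
      position.quantity -= event.quantity
      # Remove the sold shares' cost basis so average_cost is not inflated
      position.total_cost -= event.quantity * position.average_cost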
    end
    
    position.average_cost = position.total_cost / position.quantity if position.quantity > 0
    position.last_trade_at = event.executed_at
    position.save!
  end
end
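
The weighted-average arithmetic behind a position summary is easy to get wrong, so it helps to check it in isolation. The key point is that a sale must remove the cost basis of the shares sold, otherwise the average cost of the remaining shares drifts upward. The sketch below uses a hypothetical pure-Ruby `Position` struct with prices in integer cents.

```ruby
# Hypothetical pure-Ruby position; prices are integer cents to avoid float error.
Position = Struct.new(:quantity, :total_cost, :realized_pnl) do
  def average_cost
    quantity.zero? ? 0 : Rational(total_cost, quantity)
  end

  def buy(qty, price)
    self.quantity += qty
    self.total_cost += qty * price
  end

  def sell(qty, price)
    cost_basis = qty * average_cost
    self.realized_pnl += qty * price - cost_basis
    self.quantity -= qty
    self.total_cost -= cost_basis   # keeps average_cost stable after a sale
  end
end

position = Position.new(0, 0, 0)
position.buy(10, 1_000)    # 10 shares at 10.00
position.buy(10, 2_000)    # 10 shares at 20.00, average cost is now 15.00
position.sell(5, 1_800)    # sell 5 at 18.00

position.quantity            # => 15
position.average_cost.to_i   # => 1500, unchanged by the sale
position.realized_pnl.to_i   # => 1500, i.e. 5 * (18.00 - 15.00)
```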

# Query for account positions
class AccountPositionsQuery
  def execute(account_id)
    positions = PositionSummaryReadModel
      .where(account_id: account_id)
      .where('quantity > 0')
    
    positions.map do |position|
      current_price = MarketData.current_price(position.symbol)
      unrealized_pnl = position.quantity * (current_price - position.average_cost)
      
      {
        symbol: position.symbol,
        quantity: position.quantity,
        average_cost: position.average_cost,
        current_price: current_price,
        market_value: position.quantity * current_price,
        unrealized_pnl: unrealized_pnl,
        realized_pnl: position.realized_pnl,
        total_pnl: unrealized_pnl + position.realized_pnl
      }
    end
  end
end
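
The position arithmetic can be checked in isolation from ActiveRecord. The following is a minimal plain-Ruby sketch, not the read model itself: `PositionCalc` and its `apply` method are hypothetical names, and it assumes average-cost accounting, where a sell removes cost basis at the current average cost.

```ruby
require 'bigdecimal'

# Hypothetical in-memory stand-in for the position read model.
# Assumes average-cost accounting: a sell removes cost basis at the
# current average cost and leaves the average unchanged.
class PositionCalc
  attr_reader :quantity, :total_cost, :realized_pnl

  def initialize
    @quantity = 0
    @total_cost = BigDecimal('0')
    @realized_pnl = BigDecimal('0')
  end

  def average_cost
    @quantity.zero? ? BigDecimal('0') : @total_cost / @quantity
  end

  # `quantity` here is the traded quantity; the held quantity is @quantity.
  def apply(side:, quantity:, price:)
    price = BigDecimal(price.to_s)
    case side
    when 'buy'
      @quantity += quantity
      @total_cost += quantity * price
    when 'sell'
      raise ArgumentError, 'insufficient position' if quantity > @quantity

      cost_basis = quantity * average_cost
      @realized_pnl += quantity * price - cost_basis
      @total_cost -= cost_basis
      @quantity -= quantity
    else
      raise ArgumentError, "unknown side: #{side}"
    end
    self
  end
end

position = PositionCalc.new
position.apply(side: 'buy', quantity: 10, price: 100)
position.apply(side: 'buy', quantity: 10, price: 120)
position.apply(side: 'sell', quantity: 5, price: 130)
```

After the two buys the position holds 20 shares at an average cost of 110. Selling 5 at 130 realizes 5 × (130 − 110) = 100 and leaves 15 shares still at an average cost of 110.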

Common Patterns

Task-based UI pattern shapes commands around user intentions rather than CRUD operations. Instead of one generic update command, the system defines specific commands such as ApproveExpense, DenyExpense, and RequestRevision. Each command names the business intent explicitly, which allows intent-specific validation and produces a more meaningful audit trail.

# Task-based commands reflect business operations
class ApproveExpense
  attr_reader :expense_id, :approver_id, :notes
end

class DenyExpense
  attr_reader :expense_id, :approver_id, :reason
end

class RequestRevision
  attr_reader :expense_id, :approver_id, :requested_changes
end

# Not generic CRUD
class UpdateExpense
  attr_reader :expense_id, :attributes  # Loses business intent
end
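
To make the difference concrete, here is one way a task-based command could carry its own validation. This is a sketch under assumptions: the keyword-argument constructor, the `errors`/`valid?` methods, and `DenyExpenseHandler` with its array-backed audit log are all hypothetical, not part of any library.

```ruby
# Hypothetical fleshed-out version of the DenyExpense command.
class DenyExpense
  attr_reader :expense_id, :approver_id, :reason

  def initialize(expense_id:, approver_id:, reason:)
    @expense_id = expense_id
    @approver_id = approver_id
    @reason = reason.to_s.strip
    freeze # commands are immutable once built
  end

  # A denial must explain itself. A generic UpdateExpense command
  # has no way to express a rule tied to this one operation.
  def errors
    errs = []
    errs << 'reason is required' if reason.empty?
    errs
  end

  def valid?
    errors.empty?
  end
end

# Hypothetical handler: the audit entry names the business operation.
class DenyExpenseHandler
  def initialize(audit_log)
    @audit_log = audit_log
  end

  def call(command)
    raise ArgumentError, command.errors.join(', ') unless command.valid?

    @audit_log << "expense #{command.expense_id} denied by #{command.approver_id}: #{command.reason}"
    true
  end
end

audit_log = []
handler = DenyExpenseHandler.new(audit_log)
handler.call(DenyExpense.new(expense_id: 42, approver_id: 7, reason: 'Missing receipt'))
```

The audit log now records that an expense was denied and why, rather than that some attributes changed.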

Event-carried state transfer puts all the data a consumer needs into the event itself, so handlers do not have to query the source system. Event handlers receive complete information and can update read models without additional database calls. This pattern reduces coupling and improves performance but increases event size.

class OrderPlacedWithFullData
  attr_reader :order_id, :customer_id, :customer_name, :customer_email,
              :items, :subtotal, :tax, :shipping_cost, :total,
              :shipping_address, :billing_address, :placed_at
  
  def initialize(order)
    @order_id = order.id
    @customer_id = order.customer_id
    @customer_name = order.customer.name
    @customer_email = order.customer.email
    @items = order.items.map do |item|
      {
        product_id: item.product_id,
        product_name: item.product.name,
        product_sku: item.product.sku,
        quantity: item.quantity,
        unit_price: item.unit_price,
        total_price: item.total_price
      }
    end
    @subtotal = order.subtotal
    @tax = order.tax
    @shipping_cost = order.shipping_cost
    @total = order.total
    @shipping_address = order.shipping_address.to_h
    @billing_address = order.billing_address.to_h
    @placed_at = order.placed_at
  end
end

# Handler needs no additional queries
class OrderPlacedHandler
  def call(event)
    OrderReadModel.create!(
      id: event.order_id,
      customer_id: event.customer_id,
      customer_name: event.customer_name,
      # All data available in event
    )
  end
end

Snapshot pattern periodically captures aggregate state to avoid replaying long event streams. The system stores snapshots at intervals, then replays only events after the most recent snapshot when rebuilding aggregates. This pattern improves performance for event-sourced systems with long histories.

class AggregateWithSnapshots
  def self.load(aggregate_id)
    snapshot = Snapshot.latest_for(aggregate_id)
    
    aggregate = if snapshot
      deserialize_from_snapshot(snapshot)
    else
      new(aggregate_id)
    end
    
    events_since_snapshot(aggregate_id, snapshot&.version || 0).each do |event|
      aggregate.apply(event)
    end
    
    aggregate
  end
  
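  def save
    # Append only events raised since the aggregate was loaded.
    # uncommitted_events is assumed to be tracked by the aggregate;
    # re-appending already stored events would duplicate the stream.
    uncommitted_events.each do |event|
      EventStore.append(event)
    end
    
    create_snapshot_if_needed
  end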
  
  private
  
  def create_snapshot_if_needed
    return unless should_snapshot?
    
    Snapshot.create!(
      aggregate_id: id,
      aggregate_type: self.class.name,
      version: version,
      state: serialize_state
    )
  end
  
  def should_snapshot?
    # Snapshot every 100 events
    version % 100 == 0
  end
end
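
The load path can be exercised end to end without a database. The sketch below is an assumption-laden illustration: `Counter`, `CounterRepository`, the hash-based event format, and the interval of 3 are all hypothetical and chosen to keep the example short.

```ruby
# Hypothetical event-sourced aggregate: a running total.
class Counter
  attr_reader :value, :version

  def initialize(value: 0, version: 0)
    @value = value
    @version = version
  end

  def apply(event)
    @value += event[:amount]
    @version = event[:version]
  end
end

# Hypothetical in-memory event store plus snapshot store.
class CounterRepository
  SNAPSHOT_EVERY = 3 # small interval for illustration

  attr_reader :replayed_on_last_load

  def initialize
    @events = []    # append-only list of { version:, amount: }
    @snapshot = nil # latest snapshot as { version:, value: }
    @replayed_on_last_load = 0
  end

  def append(amount)
    version = @events.length + 1
    @events << { version: version, amount: amount }
    return unless (version % SNAPSHOT_EVERY).zero?

    counter = load
    @snapshot = { version: counter.version, value: counter.value }
  end

  # Start from the latest snapshot, then replay only newer events.
  def load
    counter =
      if @snapshot
        Counter.new(value: @snapshot[:value], version: @snapshot[:version])
      else
        Counter.new
      end
    newer = @events.select { |event| event[:version] > counter.version }
    newer.each { |event| counter.apply(event) }
    @replayed_on_last_load = newer.length
    counter
  end
end

repo = CounterRepository.new
(1..7).each { |amount| repo.append(amount) }
counter = repo.load
```

With seven events and a snapshot taken at version 6, the final load replays a single event instead of seven while arriving at the same state.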

Denormalization pattern optimizes read models for specific queries by duplicating and precomputing data. Multiple read models may contain overlapping data shaped differently. This pattern trades storage and consistency complexity for query performance.

# Multiple read models from same source
class ProductListReadModel < ApplicationRecord
  # Optimized for product listings
  # Fields: id, name, price, thumbnail_url, rating, availability
end

class ProductDetailReadModel < ApplicationRecord
  # Optimized for product detail page
  # Fields: id, name, description, price, images (json),
  #         specifications (json), reviews_summary (json)
end

class ProductSearchReadModel < ApplicationRecord
  # Optimized for search
  # Fields: id, name, searchable_text, category_ids (array),
  #         tags (array), price_range
end

# Event handler updates all three
class ProductUpdatedHandler
  def call(event)
    update_list_model(event)
    update_detail_model(event)
    update_search_model(event)
  end
end
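
As a rough illustration of one event feeding differently shaped read models, the sketch below uses plain hashes in place of database tables. `ProductProjections` and the event's hash keys are hypothetical.

```ruby
# Hypothetical in-memory projections; each hash stands in for a read model table.
class ProductProjections
  attr_reader :list, :search

  def initialize
    @list = {}
    @search = {}
  end

  def call(event)
    update_list(event)
    update_search(event)
  end

  private

  # Listing view: only the fields a product grid needs.
  def update_list(event)
    @list[event[:id]] = { name: event[:name], price: event[:price] }
  end

  # Search view: precomputed lowercase text so queries need no joins.
  def update_search(event)
    text = [event[:name], event[:description], *event[:tags]].join(' ').downcase
    @search[event[:id]] = { searchable_text: text, tags: event[:tags] }
  end
end

event = {
  id: 1,
  name: 'Trail Shoe',
  description: 'Lightweight runner',
  price: 89,
  tags: %w[running outdoor]
}

projections = ProductProjections.new
projections.call(event)
```

The name appears in both models. That duplication is the cost the pattern accepts in exchange for reads that need no further shaping.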

Inbox pattern guards against duplicate event processing by tracking which events each handler has processed. Message delivery is typically at-least-once, so the same event can arrive more than once. The system stores incoming events in an inbox table keyed by a unique identifier. Handlers check the inbox before processing and mark events complete afterward, so redelivered events are skipped.

class EventInbox < ApplicationRecord
  # Columns: event_id, handler_name, status, error, processed_at
  # Unique index on [event_id, handler_name] so each handler
  # tracks the same event independently
end

class IdempotentEventHandler
  def call(event)
    # Create the entry, or reuse the existing one on redelivery or retry.
    # create_or_find_by! relies on the unique index above.
    inbox_entry = EventInbox.create_or_find_by!(
      event_id: event.id,
      handler_name: self.class.name
    )
    
    # Skip events this handler has already completed
    return if inbox_entry.status == 'completed'
    
    begin
      # Record processing started
      inbox_entry.update!(status: 'processing')
      
      process_event(event)
      
      # Mark completed
      inbox_entry.update!(
        status: 'completed',
        processed_at: Time.current,
        error: nil
      )
    rescue => e
      inbox_entry.update!(status: 'failed', error: e.message)
      raise
    end
  end
end
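
The check-then-mark flow can be demonstrated without a database. In this sketch a `Set` stands in for the inbox table; `InMemoryInbox` and `process_once` are hypothetical names, and a real system would still need a unique database constraint to be safe across processes.

```ruby
require 'set'

# Hypothetical in-memory inbox: remembers completed (event, handler) pairs.
class InMemoryInbox
  def initialize
    @completed = Set.new
  end

  # Runs the block at most once per (event_id, handler_name) pair.
  # Returns true if the block ran, false for a duplicate.
  # If the block raises, the pair is not marked, so a retry can succeed.
  def process_once(event_id, handler_name)
    key = [event_id, handler_name]
    return false if @completed.include?(key)

    yield
    @completed << key
    true
  end
end

inbox = InMemoryInbox.new
emails_sent = 0

# The same event delivered twice triggers the side effect only once.
inbox.process_once('evt-1', 'EmailHandler') { emails_sent += 1 }
inbox.process_once('evt-1', 'EmailHandler') { emails_sent += 1 }
```

Keying on the handler name as well as the event id lets a second handler process the same event independently.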

Versioning pattern handles evolving event schemas over time. Events include version numbers, and handlers support multiple versions. The system can upgrade old events to new formats or maintain backward-compatible handlers.

class VersionedEvent
  attr_reader :version, :data
  
  def initialize(version:, data:)
    @version = version
    @data = data
  end
end

class OrderCreatedHandler
  def call(event)
    case event.version
    when 1
      handle_v1(event.data)
    when 2
      handle_v2(event.data)
    else
      raise "Unsupported event version: #{event.version}"
    end
  end
  
  private
  
  def handle_v1(data)
    # Original format: simple order data
    create_read_model_from_v1(data)
  end
  
  def handle_v2(data)
    # New format: includes customer preferences
    create_read_model_from_v2(data)
  end
end
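
The alternative mentioned above, upgrading old events to the newest format before handling, is often called upcasting. A minimal sketch follows, assuming that version 2 adds a `customer_preferences` key with an empty default; `UPCASTERS`, `CURRENT_VERSION`, and `upcast` are hypothetical names.

```ruby
CURRENT_VERSION = 2

# Hypothetical upcasters: each lambda converts data from version N to N + 1.
UPCASTERS = {
  1 => ->(data) { data.merge(customer_preferences: {}) }
}.freeze

# Applies upcasters in sequence until the event reaches CURRENT_VERSION,
# so handlers only ever need to understand the newest format.
def upcast(version:, data:)
  raise ArgumentError, "Unsupported event version: #{version}" if version > CURRENT_VERSION

  while version < CURRENT_VERSION
    upcaster = UPCASTERS.fetch(version) do
      raise ArgumentError, "No upcaster for version #{version}"
    end
    data = upcaster.call(data)
    version += 1
  end

  { version: version, data: data }
end

upgraded = upcast(version: 1, data: { order_id: 10 })
```

With upcasting, the handler's `case` statement collapses to a single branch, at the cost of maintaining one conversion step per schema change.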

Reference

Command Components

| Component | Purpose | Characteristics |
| --- | --- | --- |
| Command Object | Encapsulate write operations | Immutable, validated, carries intent |
| Command Handler | Execute business logic | Stateless, focused on single command |
| Command Bus | Route commands to handlers | Centralized entry point, transaction management |
| Domain Model | Enforce business rules | Rich behavior, encapsulates invariants |
| Repository | Persist aggregates | Abstract storage concerns |
| Event Publisher | Notify of state changes | Asynchronous, reliable delivery |
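
As a rough sketch of how the command bus component ties commands to handlers, the following routes by command class. `SimpleCommandBus`, the `RegisterUser` struct, and `RegisterUserHandler` are hypothetical and do not reflect the API of any gem listed in this reference.

```ruby
# Hypothetical minimal command bus: one handler per command class.
class SimpleCommandBus
  def initialize
    @handlers = {}
  end

  def register(command_class, handler)
    @handlers[command_class] = handler
  end

  def call(command)
    handler = @handlers.fetch(command.class) do
      raise ArgumentError, "No handler registered for #{command.class}"
    end
    handler.call(command)
  end
end

RegisterUser = Struct.new(:email, keyword_init: true)

class RegisterUserHandler
  attr_reader :registered

  def initialize
    @registered = []
  end

  def call(command)
    @registered << command.email
    nil # commands signal success or failure, they do not return query data
  end
end

bus = SimpleCommandBus.new
handler = RegisterUserHandler.new
bus.register(RegisterUser, handler)
bus.call(RegisterUser.new(email: 'ada@example.com'))
```

Because every write goes through `call`, cross-cutting concerns such as transactions or logging could be added in that one place.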

Query Components

| Component | Purpose | Characteristics |
| --- | --- | --- |
| Query Object | Encapsulate read operations | Immutable, no side effects |
| Read Model | Store denormalized data | Optimized for specific views |
| Query Handler | Execute data retrieval | Direct database access, no business logic |
| Projection | Transform events to read models | Replay capability, idempotent |
| View Model | Shape data for presentation | Specific to use case |

Event Types

| Event | Command | Purpose |
| --- | --- | --- |
| OrderCreated | CreateOrder | Record order placement |
| OrderShipped | ShipOrder | Track shipment initiation |
| OrderCancelled | CancelOrder | Handle cancellation |
| PaymentProcessed | ProcessPayment | Confirm payment completion |
| InventoryReserved | ReserveInventory | Reserve stock for order |
| UserRegistered | RegisterUser | Record new user signup |

Read Model Update Strategies

| Strategy | Mechanism | Consistency | Complexity |
| --- | --- | --- | --- |
| Synchronous | Immediate update in transaction | Strong | Low |
| Event-driven | Process events asynchronously | Eventual | Medium |
| Scheduled | Periodic batch updates | Eventual | Low |
| Streaming | Real-time event stream processing | Near real-time | High |
| Pull-based | Query source on demand | Strong | Low |

Consistency Patterns

| Pattern | Description | Use Case |
| --- | --- | --- |
| Strong Consistency | Read reflects all writes immediately | Financial transactions, inventory |
| Eventual Consistency | Reads eventually reflect writes | Analytics, recommendations |
| Read Your Writes | Users see their own changes | User profiles, settings |
| Monotonic Reads | Successive reads never go backward | Activity feeds, notifications |
| Bounded Staleness | Maximum lag defined | Real-time dashboards with acceptable delay |

Ruby Gems for CQRS

| Gem | Purpose | Features |
| --- | --- | --- |
| rails_event_store | Event sourcing and CQRS infrastructure | Event store, subscriptions, projections |
| arkency-command_bus | Command routing and handling | Simple routing, validation support |
| dry-transaction | Transaction composition | Step-based command processing |
| eventide-postgres | Event sourcing for PostgreSQL | Message store, event streams |
| sequent | Event sourcing and CQRS framework | Aggregates, projections, workflows |

Implementation Decisions

| Decision | Options | Considerations |
| --- | --- | --- |
| Database Strategy | Single database, separate databases, different technologies | Operational complexity vs optimization |
| Consistency Model | Synchronous, asynchronous, hybrid | User experience vs scalability |
| Event Storage | Same database, dedicated event store, message queue | Query capabilities, durability |
| Read Model Updates | Immediate, background jobs, stream processing | Latency requirements, throughput |
| Command Validation | In command, in handler, distributed | Performance, maintainability |

Performance Optimization

| Technique | Implementation | Benefit |
| --- | --- | --- |
| Caching | Redis, Memcached for read models | Reduce database load |
| Materialized Views | Database-level precomputed queries | Faster complex queries |
| Read Replicas | Multiple read databases | Scale query throughput |
| Event Batching | Process multiple events together | Reduce update overhead |
| Denormalization | Duplicate data in read models | Eliminate joins |
| Indexing | Appropriate indexes on read models | Query performance |

Testing Strategies

| Test Type | Focus | Approach |
| --- | --- | --- |
| Command Handler Tests | Business logic correctness | Unit test with mocked dependencies |
| Event Handler Tests | Read model updates | Integration test with test database |
| Query Tests | Data retrieval accuracy | Integration test with fixtures |
| End-to-End Tests | Complete workflows | Test command through query |
| Contract Tests | Event schema compatibility | Verify event structure |
| Load Tests | Performance under load | Simulate high traffic scenarios |