Overview
Command Query Responsibility Segregation (CQRS) separates the responsibility for handling commands (writes) from queries (reads) in a system. Instead of using a single model to handle both operations, CQRS employs separate models optimized for their specific purposes. Commands modify state and return no data, while queries return data without modifying state.
The pattern emerged from Bertrand Meyer's Command-Query Separation (CQS) principle, which states that methods should either change state or return data, never both. CQRS extends this principle from the method level to the architectural level, creating separate paths through the application for commands and queries.
Traditional CRUD applications use the same model for reads and writes, leading to compromises. A single model must satisfy conflicting requirements: write operations need validation and business logic, while read operations need performance and specific data shapes. CQRS eliminates these compromises by allowing each side to evolve independently.
# Traditional approach - single model handles everything
class User < ApplicationRecord
validates :email, presence: true
def full_profile
# Complex query for displaying profile
end
def update_preferences(prefs)
# Complex write with validation
end
end
# CQRS approach - separate concerns
class CreateUserCommand
def execute(data)
# Handle write with validation
end
end
class UserProfileQuery
def fetch(user_id)
# Optimized read without write concerns
end
end
The pattern proves particularly valuable in complex domains where read and write patterns differ significantly, where scalability requirements vary between operations, or where business logic complexity makes a unified model difficult to maintain.
Key Principles
CQRS operates on distinct separation between commands and queries, each with specific characteristics and responsibilities. Commands represent intentions to change state, carrying all necessary information to perform an operation. They return no data beyond success or failure indicators. Queries retrieve data without side effects, returning information in forms optimized for consumption.
The command side enforces business rules and maintains consistency. Commands flow through a command bus or handler that validates, processes, and applies changes. The system may reject commands that violate invariants, providing feedback about constraint violations. Command handlers interact with domain models that encapsulate business logic and ensure valid state transitions.
The query side optimizes for read performance and specific use cases. Queries access read models designed for particular views or reports, avoiding navigation through complex object graphs. Read models contain denormalized data shaped for presentation, eliminating joins and computations at query time. The system may maintain multiple read models from the same underlying data, each optimized for different scenarios.
# Command - changes state, returns nothing meaningful
class RegisterUser
attr_reader :email, :name
def initialize(email:, name:)
@email = email
@name = name
end
end
class RegisterUserHandler
def call(command)
User.create!(
email: command.email,
name: command.name
)
# Returns success/failure, not data
end
end
# Query - retrieves data, no side effects
class UserDashboardQuery
def execute(user_id)
# Returns denormalized data optimized for display
{
user: user_data(user_id),
recent_orders: recent_orders(user_id),
recommendations: recommendations(user_id)
}
end
private
def user_data(user_id)
ReadModels::UserSummary.find(user_id)
end
end
Data flow differs between sides. Commands typically follow a synchronous path where the system validates input, applies changes, and returns status. The system may publish events after successful command processing, notifying interested components of state changes. Queries access read models directly without triggering business logic or modifying state.
Eventual consistency often emerges in CQRS implementations. Write models and read models may temporarily diverge as the system propagates changes. The command side updates the write model immediately, while read models update asynchronously through event handlers or scheduled processes. Applications must accommodate this delay, typically through UI feedback or explicit refresh mechanisms.
The separation enables independent scaling. Read-heavy applications can scale the query side with caching, replicas, and optimized data stores while maintaining a simpler write side. Write-heavy applications can optimize command processing without burdening it with complex read requirements. Each side can use appropriate technologies and infrastructure.
# Command and query use different stores
class CommandSide
def write_model
# Uses transactional database
@write_model ||= PostgresConnection.new
end
end
class QuerySide
def read_model
# Uses optimized read store
@read_model ||= RedisConnection.new
end
end
Design Considerations
CQRS introduces complexity that must justify its benefits. The pattern suits complex domains where business logic significantly differs between reading and writing, where read and write patterns have different scaling requirements, or where multiple views of the same data serve different purposes. Simple CRUD applications gain little from CQRS and suffer from unnecessary complexity.
Eventual consistency presents the primary challenge. Users must tolerate delays between writing data and seeing updates in queries. Applications must communicate this delay clearly, perhaps showing loading states or stale data indicators. Critical operations may require synchronous updates or compensating mechanisms to maintain user trust.
# Handle eventual consistency in UI
class OrderController < ApplicationController
def create
command = CreateOrder.new(order_params)
command_bus.execute(command)
# Don't immediately query - may not be updated
flash[:notice] = "Order submitted. Processing..."
redirect_to orders_path(status: :processing)
end
def show
query = OrderQuery.new
@order = query.find(params[:id])
# Indicate if data may be stale
@stale = @order.updated_at < 5.seconds.ago
end
end
Data duplication increases storage requirements. Read models replicate information from write models, formatted for specific views. Multiple read models multiply this duplication. Storage costs must be weighed against performance benefits. Stale read models add another dimension, requiring strategies to detect and rebuild corrupted or outdated data.
Operational complexity grows with separate models. Debugging requires tracing through command handlers, event publishers, event handlers, and read model updates. Deployment must coordinate schema changes across models. Monitoring must track command processing, event delivery, and read model freshness. Teams need expertise in distributed system patterns.
# Monitoring eventual consistency lag
class EventProcessingMonitor
def check_lag
last_command = CommandLog.maximum(:created_at)
last_processed = EventProcessor.maximum(:processed_at)
lag_seconds = last_command - last_processed
alert if lag_seconds > threshold
end
end
The pattern fits naturally with event sourcing but does not require it. Event sourcing stores state changes as events rather than current state, providing an audit trail and enabling temporal queries. CQRS and event sourcing complement each other: commands generate events, events update read models. However, CQRS works with traditional persistence, updating both write and read models through conventional means.
Domain complexity drives CQRS adoption. Systems with rich business rules, multiple user roles with different views, or complex validation workflows benefit from separated concerns. E-commerce platforms handle order placement differently than order history queries. Financial systems validate transactions strictly while supporting various reporting needs. Collaborative systems track changes meticulously while presenting current state simply.
Testing becomes more involved. Command handlers need unit tests verifying business logic and integration tests confirming persistence. Event handlers require tests ensuring correct read model updates. End-to-end tests must verify eventual consistency paths. Test infrastructure must support asynchronous processing and multiple data stores.
# Testing command and query independently
RSpec.describe RegisterUserHandler do
it "creates user with valid data" do
command = RegisterUser.new(email: "test@example.com")
handler.call(command)
expect(User.exists?(email: "test@example.com")).to be true
end
end
RSpec.describe UserQuery do
it "returns user summary" do
create_user_in_read_model(id: 1)
result = query.execute(1)
expect(result[:email]).to eq("test@example.com")
end
end
Technology choices differ between sides. Write models use databases emphasizing consistency and transactions. Read models prioritize query performance, potentially using document stores, caching layers, or search engines. The query side might employ Elasticsearch for full-text search or Redis for high-speed access while the command side uses PostgreSQL for ACID guarantees.
Ruby Implementation
Ruby implements CQRS through command objects, handler classes, and query objects. Commands encapsulate operations as plain Ruby objects, handlers process commands and modify state, and queries retrieve data from optimized read models. Several gems provide infrastructure, though many teams build custom implementations tailored to their needs.
Command objects represent intentions with explicit attributes. They validate inputs and carry all information needed to perform operations. Commands remain immutable after creation, preventing accidental modification during processing.
class CreateOrder
include ActiveModel::Model
include ActiveModel::Validations
attr_accessor :user_id, :items, :shipping_address
validates :user_id, presence: true
validates :items, presence: true
validate :items_available
def items_available
items.each do |item|
unless Inventory.available?(item[:product_id], item[:quantity])
errors.add(:items, "#{item[:product_id]} not available")
end
end
end
end
Command handlers execute business logic. They receive validated commands, interact with domain models, persist changes, and publish events. Handlers remain focused on a single command type, simplifying testing and maintenance.
class CreateOrderHandler
def initialize(repository: OrderRepository.new,
event_bus: EventBus.instance)
@repository = repository
@event_bus = event_bus
end
def call(command)
order = Order.new(
user_id: command.user_id,
items: command.items,
shipping_address: command.shipping_address
)
order.calculate_total
order.reserve_inventory
@repository.save(order)
@event_bus.publish(OrderCreated.new(
order_id: order.id,
user_id: order.user_id,
total: order.total
))
Result.success(order.id)
rescue => e
Result.failure(e.message)
end
end
A command bus routes commands to appropriate handlers. It provides a single entry point for all commands, centralizing cross-cutting concerns like logging, authorization, and transaction management.
class CommandBus
def initialize
@handlers = {}
end
def register(command_class, handler)
@handlers[command_class] = handler
end
def execute(command)
return failure("Invalid command") unless command.valid?
handler = @handlers[command.class]
return failure("No handler") unless handler
ApplicationRecord.transaction do
handler.call(command)
end
end
private
def failure(message)
Result.failure(message)
end
end
# Configuration
bus = CommandBus.new
bus.register(CreateOrder, CreateOrderHandler.new)
bus.register(CancelOrder, CancelOrderHandler.new)
Query objects encapsulate read operations. They access read models directly, apply filtering and sorting, and return data formatted for consumption. Queries avoid business logic, focusing purely on data retrieval.
class OrderHistoryQuery
def initialize(connection: ReadDatabase.connection)
@connection = connection
end
def execute(user_id:, limit: 50, offset: 0)
@connection.exec_params(<<-SQL, [user_id, limit, offset])
SELECT
o.id,
o.created_at,
o.total,
o.status,
json_agg(
json_build_object(
'product_name', oi.product_name,
'quantity', oi.quantity,
'price', oi.price
)
) as items
FROM orders_read_model o
JOIN order_items_read_model oi ON oi.order_id = o.id
WHERE o.user_id = $1
GROUP BY o.id
ORDER BY o.created_at DESC
LIMIT $2 OFFSET $3
SQL
end
end
Event handlers update read models asynchronously. They subscribe to domain events published after command processing, transforming events into read model updates. Ruby applications often use Sidekiq or other background job processors for this purpose.
class OrderCreatedHandler
def call(event)
ReadModels::OrderSummary.create(
id: event.order_id,
user_id: event.user_id,
total: event.total,
status: 'pending',
created_at: event.occurred_at
)
UserStatistics.increment_order_count(event.user_id)
end
end
# Event subscription
EventBus.subscribe(OrderCreated, OrderCreatedHandler.new)
Rails applications integrate CQRS through service objects and concerns. Controllers dispatch commands through the command bus and queries through query objects, remaining thin orchestration layers.
class OrdersController < ApplicationController
def create
command = CreateOrder.new(order_params)
result = command_bus.execute(command)
if result.success?
render json: { order_id: result.value }, status: :created
else
render json: { errors: result.error }, status: :unprocessable_entity
end
end
def index
query = OrderHistoryQuery.new
orders = query.execute(
user_id: current_user.id,
limit: params[:limit],
offset: params[:offset]
)
render json: orders
end
private
def command_bus
@command_bus ||= CommandBus.instance
end
end
The arkency/command_bus gem provides a lightweight command bus implementation. It handles command routing, validation, and result handling with minimal configuration.
# Gemfile
gem 'arkency-command_bus'
# Setup
command_bus = Arkency::CommandBus.new
command_bus.register(CreateOrder, CreateOrderHandler.new)
# Usage
result = command_bus.call(CreateOrder.new(order_params))
RailsEventStore offers comprehensive event sourcing and CQRS infrastructure. It stores events, manages subscriptions, and provides projection capabilities for building read models.
# Gemfile
gem 'rails_event_store'
# Configuration
Rails.application.configure do
config.to_prepare do
Rails.configuration.event_store = RailsEventStore::Client.new
end
end
# Publishing events
event_store = Rails.configuration.event_store
event = OrderCreated.new(data: {
order_id: order.id,
user_id: user.id
})
event_store.publish(event, stream_name: "Order$#{order.id}")
# Subscribing to events
event_store.subscribe(OrderCreatedHandler.new, to: [OrderCreated])
Read model projections transform event streams into queryable views. Projections replay events to build initial state and process new events as they arrive.
class OrderSummaryProjection
def initialize(event_store)
@event_store = event_store
end
def rebuild
ReadModels::OrderSummary.delete_all
@event_store.read.each do |event|
handle(event)
end
end
def handle(event)
case event
when OrderCreated
handle_order_created(event)
when OrderShipped
handle_order_shipped(event)
when OrderCancelled
handle_order_cancelled(event)
end
end
private
def handle_order_created(event)
ReadModels::OrderSummary.create(
id: event.data[:order_id],
user_id: event.data[:user_id],
status: 'created'
)
end
def handle_order_shipped(event)
summary = ReadModels::OrderSummary.find(event.data[:order_id])
summary.update(status: 'shipped', shipped_at: event.metadata[:timestamp])
end
end
Implementation Approaches
Simple CQRS maintains a single database with separate command and query paths. Commands update the write model through business logic. Queries read from the same database using optimized queries or denormalized tables. This approach provides CQRS benefits without operational complexity.
# Single database, separate paths
class CommandSide
def create_order(command)
order = Order.create!(command.attributes)
OrderSummary.create_from_order(order) # Denormalized read model
order.id
end
end
class QuerySide
def order_summary(order_id)
OrderSummary.find(order_id) # Read from denormalized table
end
end
The read model exists as denormalized tables or materialized views in the same database. Database triggers or application code keep read models synchronized with write models. This approach minimizes operational overhead while providing query optimization.
Separate database CQRS uses distinct databases for commands and queries. The write database optimizes for consistency and transactions. The read database optimizes for query performance, perhaps using different technology altogether. Changes propagate from write to read database through events or replication.
class DatabaseConfiguration
def self.write_db
ActiveRecord::Base.establish_connection(
adapter: 'postgresql',
database: 'app_write',
host: 'write-db.example.com'
)
end
def self.read_db
ActiveRecord::Base.establish_connection(
adapter: 'postgresql',
database: 'app_read',
host: 'read-db.example.com',
replica: true
)
end
end
class WriteModel < ActiveRecord::Base
establish_connection :write_db
end
class ReadModel < ActiveRecord::Base
establish_connection :read_db
end
Event-driven CQRS publishes events after command processing. Event handlers update read models asynchronously. This approach provides loose coupling and scales better but introduces eventual consistency. Background job processors handle event delivery and retry failed updates.
class EventDrivenCommandHandler
def call(command)
result = nil
ActiveRecord::Base.transaction do
result = process_command(command)
publish_events(result.events)
end
result
end
private
def publish_events(events)
events.each do |event|
EventPublisher.publish_async(event)
end
end
end
class EventPublisher
def self.publish_async(event)
UpdateReadModelJob.perform_later(event.to_json)
end
end
class UpdateReadModelJob < ApplicationJob
queue_as :events
def perform(event_json)
event = JSON.parse(event_json)
handler = EventHandlerRegistry.handler_for(event['type'])
handler.call(event)
end
end
Multiple read models serve different query needs. One read model might optimize for list views, another for detailed displays, and a third for reporting. Each subscribes to relevant events and maintains its own optimized structure.
# List view read model - minimal data
class OrderListReadModel
def self.handle_order_created(event)
create(
order_id: event[:order_id],
created_at: event[:created_at],
total: event[:total],
status: event[:status]
)
end
end
# Detail view read model - comprehensive data
class OrderDetailReadModel
def self.handle_order_created(event)
create(
order_id: event[:order_id],
user: event[:user_data],
items: event[:items],
shipping_address: event[:shipping_address],
billing_address: event[:billing_address],
total: event[:total],
status: event[:status]
)
end
end
# Analytics read model - aggregated data
class OrderAnalyticsReadModel
def self.handle_order_created(event)
date = event[:created_at].to_date
increment_counter(date, :order_count)
increment_counter(date, :revenue, event[:total])
end
end
Event sourcing stores all state changes as events rather than current state. Commands generate events, which persist to an event store. Read models rebuild from event streams. This approach provides complete audit trails and temporal queries but adds complexity.
class EventSourcedOrder
attr_reader :id, :events
def initialize(id)
@id = id
@events = []
end
def create(user_id:, items:)
apply OrderCreated.new(
order_id: @id,
user_id: user_id,
items: items
)
end
def ship(tracking_number)
raise "Cannot ship cancelled order" if @cancelled
apply OrderShipped.new(
order_id: @id,
tracking_number: tracking_number
)
end
def apply(event)
@events << event
case event
when OrderCreated
@user_id = event.user_id
@items = event.items
when OrderShipped
@shipped = true
when OrderCancelled
@cancelled = true
end
end
def self.rebuild_from_events(events)
order = new(events.first.order_id)
events.each { |event| order.apply(event) }
order
end
end
Saga patterns coordinate long-running processes across multiple aggregates. A saga listens for events and issues commands in response, orchestrating complex workflows. CQRS facilitates sagas by providing clear command and event boundaries.
class OrderFulfillmentSaga
def initialize(command_bus)
@command_bus = command_bus
end
def handle_order_created(event)
@command_bus.execute(
ReserveInventory.new(
order_id: event.order_id,
items: event.items
)
)
end
def handle_inventory_reserved(event)
@command_bus.execute(
ProcessPayment.new(
order_id: event.order_id,
amount: event.amount
)
)
end
def handle_payment_processed(event)
@command_bus.execute(
ShipOrder.new(order_id: event.order_id)
)
end
def handle_payment_failed(event)
@command_bus.execute(
ReleaseInventory.new(order_id: event.order_id)
)
@command_bus.execute(
CancelOrder.new(
order_id: event.order_id,
reason: 'Payment failed'
)
)
end
end
Practical Examples
An e-commerce platform demonstrates CQRS separating order processing from order history queries. Customers place orders through commands that validate inventory, calculate totals, and reserve stock. The system publishes events after successful order creation. Order history queries access read models optimized for display without navigating complex relationships.
# Command for creating orders
class PlaceOrder
include ActiveModel::Model
attr_accessor :customer_id, :items, :shipping_address, :billing_address
validates :customer_id, :items, :shipping_address, :billing_address,
presence: true
validate :items_in_stock
def items_in_stock
items.each do |item|
available = Inventory.check_availability(
item[:product_id],
item[:quantity]
)
unless available
errors.add(:items, "#{item[:product_id]} out of stock")
end
end
end
end
# Command handler
class PlaceOrderHandler
def initialize(inventory:, pricing:, repository:)
@inventory = inventory
@pricing = pricing
@repository = repository
end
def call(command)
# Calculate order total
total = @pricing.calculate_total(command.items)
# Reserve inventory
reservation_id = @inventory.reserve(command.items)
# Create order
order = Order.new(
customer_id: command.customer_id,
items: command.items,
shipping_address: command.shipping_address,
billing_address: command.billing_address,
total: total,
reservation_id: reservation_id
)
@repository.save(order)
# Publish event
EventBus.publish(OrderPlaced.new(
order_id: order.id,
customer_id: order.customer_id,
items: order.items,
total: order.total,
placed_at: Time.current
))
order.id
rescue => e
@inventory.release(reservation_id) if reservation_id
raise
end
end
# Event handler updates read model
class OrderPlacedHandler
def call(event)
OrderReadModel.create!(
id: event.order_id,
customer_id: event.customer_id,
item_count: event.items.size,
total: event.total,
status: 'pending',
placed_at: event.placed_at
)
event.items.each do |item|
OrderItemReadModel.create!(
order_id: event.order_id,
product_id: item[:product_id],
product_name: item[:product_name],
quantity: item[:quantity],
price: item[:price]
)
end
end
end
# Query for order history
class CustomerOrderHistoryQuery
def execute(customer_id, page: 1, per_page: 20)
OrderReadModel
.where(customer_id: customer_id)
.includes(:order_items)
.order(placed_at: :desc)
.page(page)
.per(per_page)
.map do |order|
{
id: order.id,
placed_at: order.placed_at,
item_count: order.item_count,
total: order.total,
status: order.status,
items: order.order_items.map do |item|
{
product_name: item.product_name,
quantity: item.quantity,
price: item.price
}
end
}
end
end
end
A collaborative document editing system uses CQRS to handle concurrent edits and display current document state. Commands represent edit operations, validated against current version. The system publishes events for each accepted edit. Read models maintain the current document text optimized for rendering, updated through event handlers that apply operations.
# Edit command
class ApplyDocumentEdit
attr_reader :document_id, :user_id, :operation, :version
def initialize(document_id:, user_id:, operation:, version:)
@document_id = document_id
@user_id = user_id
@operation = operation # e.g., { type: 'insert', position: 10, text: 'hello' }
@version = version
end
end
# Command handler with conflict detection
class ApplyDocumentEditHandler
def call(command)
document = DocumentWriteModel.find(command.document_id)
# Check version for optimistic locking
if document.version != command.version
return Result.failure('Version conflict - document changed')
end
# Validate operation
unless valid_operation?(command.operation, document)
return Result.failure('Invalid operation')
end
# Record edit
edit = DocumentEdit.create!(
document_id: command.document_id,
user_id: command.user_id,
operation: command.operation,
version: document.version + 1
)
document.update!(version: edit.version)
# Publish event
EventBus.publish(DocumentEdited.new(
document_id: command.document_id,
user_id: command.user_id,
operation: command.operation,
version: edit.version,
timestamp: edit.created_at
))
Result.success(edit.version)
end
end
# Read model for current document state
class DocumentStateReadModel < ApplicationRecord
def self.handle_document_edited(event)
doc = find_or_create_by(document_id: event.document_id)
# Apply operation to current text
doc.current_text = apply_operation(
doc.current_text,
event.operation
)
doc.version = event.version
doc.last_edited_by = event.user_id
doc.last_edited_at = event.timestamp
doc.save!
end
def self.apply_operation(text, operation)
case operation['type']
when 'insert'
text.insert(operation['position'], operation['text'])
when 'delete'
text.slice!(operation['position'], operation['length'])
when 'replace'
text[operation['position'], operation['length']] = operation['text']
end
text
end
end
# Query for document display
class DocumentDisplayQuery
def execute(document_id)
doc = DocumentStateReadModel.find_by(document_id: document_id)
{
id: doc.document_id,
text: doc.current_text,
version: doc.version,
last_edited_by: doc.last_edited_by,
last_edited_at: doc.last_edited_at,
editors: recent_editors(document_id)
}
end
private
def recent_editors(document_id)
DocumentEdit
.where(document_id: document_id)
.where('created_at > ?', 1.hour.ago)
.select(:user_id)
.distinct
.limit(10)
end
end
A financial trading system demonstrates CQRS separating trade execution from position queries. Trade commands undergo strict validation before execution, checking account balances, position limits, and market conditions. The system records each trade immediately in the write model. Position queries aggregate trades into current holdings, profit/loss, and margin calculations, updated asynchronously from trade events.
# Trade execution command
class ExecuteTrade
attr_reader :account_id, :symbol, :quantity, :price, :side
validates :account_id, :symbol, :quantity, :price, :side, presence: true
validates :side, inclusion: { in: %w[buy sell] }
validates :quantity, numericality: { greater_than: 0 }
validate :sufficient_funds, if: -> { side == 'buy' }
validate :sufficient_position, if: -> { side == 'sell' }
def sufficient_funds
required = quantity * price
available = Account.find(account_id).available_funds
errors.add(:quantity, 'Insufficient funds') if required > available
end
def sufficient_position
current = Position.find_by(account_id: account_id, symbol: symbol)&.quantity || 0
errors.add(:quantity, 'Insufficient position') if quantity > current
end
end
# Trade handler
class ExecuteTradeHandler
def call(command)
ActiveRecord::Base.transaction do
trade = Trade.create!(
account_id: command.account_id,
symbol: command.symbol,
quantity: command.quantity,
price: command.price,
side: command.side,
executed_at: Time.current
)
# Update account balance
Account.find(command.account_id).update_balance_for_trade(trade)
# Publish event
EventBus.publish(TradeExecuted.new(
trade_id: trade.id,
account_id: trade.account_id,
symbol: trade.symbol,
quantity: trade.quantity,
price: trade.price,
side: trade.side,
executed_at: trade.executed_at
))
trade.id
end
end
end
# Read model for position summary
class PositionSummaryReadModel < ApplicationRecord
def self.handle_trade_executed(event)
position = find_or_initialize_by(
account_id: event.account_id,
symbol: event.symbol
)
case event.side
when 'buy'
position.quantity += event.quantity
position.total_cost += event.quantity * event.price
when 'sell'
position.quantity -= event.quantity
realized_pnl = event.quantity * (event.price - position.average_cost)
position.realized_pnl += realized_pnl
end
position.average_cost = position.total_cost / position.quantity if position.quantity > 0
position.last_trade_at = event.executed_at
position.save!
end
end
# Query for account positions
class AccountPositionsQuery
def execute(account_id)
positions = PositionSummaryReadModel
.where(account_id: account_id)
.where('quantity > 0')
positions.map do |position|
current_price = MarketData.current_price(position.symbol)
unrealized_pnl = position.quantity * (current_price - position.average_cost)
{
symbol: position.symbol,
quantity: position.quantity,
average_cost: position.average_cost,
current_price: current_price,
market_value: position.quantity * current_price,
unrealized_pnl: unrealized_pnl,
realized_pnl: position.realized_pnl,
total_pnl: unrealized_pnl + position.realized_pnl
}
end
end
end
Common Patterns
Task-based UI pattern shapes commands around user intentions rather than CRUD operations. Instead of generic update commands, the system defines specific commands like ApproveExpense, DenyExpense, RequestRevision. Each command captures the business intent clearly, enabling better validation and audit trails.
# Task-based commands reflect business operations
class ApproveExpense
attr_reader :expense_id, :approver_id, :notes
end
class DenyExpense
attr_reader :expense_id, :approver_id, :reason
end
class RequestRevision
attr_reader :expense_id, :approver_id, :requested_changes
end
# Not generic CRUD
class UpdateExpense
attr_reader :expense_id, :attributes # Loses business intent
end
Event-carried state transfer includes all necessary data in events rather than requiring queries. Event handlers receive complete information to update read models without additional database calls. This pattern reduces coupling and improves performance but increases event size.
class OrderPlacedWithFullData
attr_reader :order_id, :customer_id, :customer_name, :customer_email,
:items, :subtotal, :tax, :shipping_cost, :total,
:shipping_address, :billing_address, :placed_at
def initialize(order)
@order_id = order.id
@customer_id = order.customer_id
@customer_name = order.customer.name
@customer_email = order.customer.email
@items = order.items.map do |item|
{
product_id: item.product_id,
product_name: item.product.name,
product_sku: item.product.sku,
quantity: item.quantity,
unit_price: item.unit_price,
total_price: item.total_price
}
end
@subtotal = order.subtotal
@tax = order.tax
@shipping_cost = order.shipping_cost
@total = order.total
@shipping_address = order.shipping_address.to_h
@billing_address = order.billing_address.to_h
@placed_at = order.placed_at
end
end
# Handler needs no additional queries
class OrderPlacedHandler
def call(event)
OrderReadModel.create!(
id: event.order_id,
customer_id: event.customer_id,
customer_name: event.customer_name,
# All data available in event
)
end
end
Snapshot pattern periodically captures aggregate state to avoid replaying long event streams. The system stores snapshots at intervals, then replays only events after the most recent snapshot when rebuilding aggregates. This pattern improves performance for event-sourced systems with long histories.
class AggregateWithSnapshots
def self.load(aggregate_id)
snapshot = Snapshot.latest_for(aggregate_id)
aggregate = if snapshot
deserialize_from_snapshot(snapshot)
else
new(aggregate_id)
end
events_since_snapshot(aggregate_id, snapshot&.version || 0).each do |event|
aggregate.apply(event)
end
aggregate
end
def save
events_since_last_snapshot.each do |event|
EventStore.append(event)
end
create_snapshot_if_needed
end
private
def create_snapshot_if_needed
return unless should_snapshot?
Snapshot.create!(
aggregate_id: id,
aggregate_type: self.class.name,
version: version,
state: serialize_state
)
end
def should_snapshot?
# Snapshot every 100 events
version % 100 == 0
end
end
Denormalization pattern optimizes read models for specific queries by duplicating and precomputing data. Multiple read models may contain overlapping data shaped differently. This pattern trades storage and consistency complexity for query performance.
# Multiple read models from same source
class ProductListReadModel < ApplicationRecord
# Optimized for product listings
# Fields: id, name, price, thumbnail_url, rating, availability
end
class ProductDetailReadModel < ApplicationRecord
# Optimized for product detail page
# Fields: id, name, description, price, images (json),
# specifications (json), reviews_summary (json)
end
class ProductSearchReadModel < ApplicationRecord
# Optimized for search
# Fields: id, name, searchable_text, category_ids (array),
# tags (array), price_range
end
# Event handler updates all three
class ProductUpdatedHandler
def call(event)
update_list_model(event)
update_detail_model(event)
update_search_model(event)
end
end
Inbox pattern ensures exactly-once event processing by tracking which events each handler has processed. The system stores incoming events in an inbox table with unique identifiers. Handlers check the inbox before processing and mark events complete afterward, preventing duplicate processing.
class EventInbox < ApplicationRecord
# Columns: event_id (unique), handler_name, status, processed_at
end
class IdempotentEventHandler
def call(event)
# Check if already processed
return if EventInbox.exists?(
event_id: event.id,
handler_name: self.class.name,
status: 'completed'
)
# Record processing started
inbox_entry = EventInbox.create!(
event_id: event.id,
handler_name: self.class.name,
status: 'processing'
)
begin
process_event(event)
# Mark completed
inbox_entry.update!(
status: 'completed',
processed_at: Time.current
)
rescue => e
inbox_entry.update!(status: 'failed', error: e.message)
raise
end
end
end
Versioning pattern handles evolving event schemas over time. Events include version numbers, and handlers support multiple versions. The system can upgrade old events to new formats or maintain backward-compatible handlers.
class VersionedEvent
attr_reader :version, :data
def initialize(version:, data:)
@version = version
@data = data
end
end
class OrderCreatedHandler
def call(event)
case event.version
when 1
handle_v1(event.data)
when 2
handle_v2(event.data)
else
raise "Unsupported event version: #{event.version}"
end
end
private
def handle_v1(data)
# Original format: simple order data
create_read_model_from_v1(data)
end
def handle_v2(data)
# New format: includes customer preferences
create_read_model_from_v2(data)
end
end
Reference
Command Components
| Component | Purpose | Characteristics |
|---|---|---|
| Command Object | Encapsulate write operations | Immutable, validated, carries intent |
| Command Handler | Execute business logic | Stateless, focused on single command |
| Command Bus | Route commands to handlers | Centralized entry point, transaction management |
| Domain Model | Enforce business rules | Rich behavior, encapsulates invariants |
| Repository | Persist aggregates | Abstract storage concerns |
| Event Publisher | Notify of state changes | Asynchronous, reliable delivery |
Query Components
| Component | Purpose | Characteristics |
|---|---|---|
| Query Object | Encapsulate read operations | Immutable, no side effects |
| Read Model | Store denormalized data | Optimized for specific views |
| Query Handler | Execute data retrieval | Direct database access, no business logic |
| Projection | Transform events to read models | Replay capability, idempotent |
| View Model | Shape data for presentation | Specific to use case |
Event Types
| Event | Command | Purpose |
|---|---|---|
| OrderCreated | CreateOrder | Record order placement |
| OrderShipped | ShipOrder | Track shipment initiation |
| OrderCancelled | CancelOrder | Handle cancellation |
| PaymentProcessed | ProcessPayment | Confirm payment completion |
| InventoryReserved | ReserveInventory | Reserve stock for order |
| UserRegistered | RegisterUser | Record new user signup |
Read Model Update Strategies
| Strategy | Mechanism | Consistency | Complexity |
|---|---|---|---|
| Synchronous | Immediate update in transaction | Strong | Low |
| Event-driven | Process events asynchronously | Eventual | Medium |
| Scheduled | Periodic batch updates | Eventual | Low |
| Streaming | Real-time event stream processing | Near real-time | High |
| Pull-based | Query source on demand | Strong | Low |
Consistency Patterns
| Pattern | Description | Use Case |
|---|---|---|
| Strong Consistency | Read reflects all writes immediately | Financial transactions, inventory |
| Eventual Consistency | Reads eventually reflect writes | Analytics, recommendations |
| Read Your Writes | Users see their own changes | User profiles, settings |
| Monotonic Reads | Successive reads never go backward | Activity feeds, notifications |
| Bounded Staleness | Maximum lag defined | Real-time dashboards with acceptable delay |
Ruby Gems for CQRS
| Gem | Purpose | Features |
|---|---|---|
| rails_event_store | Event sourcing and CQRS infrastructure | Event store, subscriptions, projections |
| arkency-command_bus | Command routing and handling | Simple routing, validation support |
| dry-transaction | Transaction composition | Step-based command processing |
| eventide-postgres | Event sourcing for PostgreSQL | Message store, event streams |
| sequent | Event sourcing and CQRS framework | Aggregates, projections, workflows |
Implementation Decisions
| Decision | Options | Considerations |
|---|---|---|
| Database Strategy | Single database, separate databases, different technologies | Operational complexity vs optimization |
| Consistency Model | Synchronous, asynchronous, hybrid | User experience vs scalability |
| Event Storage | Same database, dedicated event store, message queue | Query capabilities, durability |
| Read Model Updates | Immediate, background jobs, stream processing | Latency requirements, throughput |
| Command Validation | In command, in handler, distributed | Performance, maintainability |
Performance Optimization
| Technique | Implementation | Benefit |
|---|---|---|
| Caching | Redis, Memcached for read models | Reduce database load |
| Materialized Views | Database-level precomputed queries | Faster complex queries |
| Read Replicas | Multiple read databases | Scale query throughput |
| Event Batching | Process multiple events together | Reduce update overhead |
| Denormalization | Duplicate data in read models | Eliminate joins |
| Indexing | Appropriate indexes on read models | Query performance |
Testing Strategies
| Test Type | Focus | Approach |
|---|---|---|
| Command Handler Tests | Business logic correctness | Unit test with mocked dependencies |
| Event Handler Tests | Read model updates | Integration test with test database |
| Query Tests | Data retrieval accuracy | Integration test with fixtures |
| End-to-End Tests | Complete workflows | Test command through query |
| Contract Tests | Event schema compatibility | Verify event structure |
| Load Tests | Performance under load | Simulate high traffic scenarios |