Overview
The Publish-Subscribe pattern establishes a one-to-many communication model where publishers emit messages without knowledge of subscribers, and subscribers receive messages without knowledge of publishers. An intermediary component, typically called a broker or event bus, mediates the flow of messages between these parties. This decoupling creates systems where components interact through events rather than direct method calls.
The pattern originated in message-oriented middleware systems during the 1980s and gained prominence with distributed systems and event-driven architectures. Unlike the Observer pattern where subjects maintain references to observers, publish-subscribe introduces a complete separation where publishers and subscribers operate independently of each other's existence, lifecycle, or location.
The core mechanism works through topics or channels. Publishers send messages to named topics, and subscribers register interest in specific topics. The broker matches publications to subscriptions and delivers messages accordingly. This indirection enables dynamic system composition where publishers and subscribers can be added or removed without affecting other components.
# Simple pub-sub demonstration
class EventBroker
def initialize
@subscriptions = Hash.new { |h, k| h[k] = [] }
end
def subscribe(topic, &handler)
@subscriptions[topic] << handler
end
def publish(topic, data)
@subscriptions[topic].each { |handler| handler.call(data) }
end
end
broker = EventBroker.new
broker.subscribe(:user_created) { |user| puts "Welcome email sent to #{user[:email]}" }
broker.subscribe(:user_created) { |user| puts "Analytics tracked: #{user[:id]}" }
broker.publish(:user_created, { id: 123, email: "user@example.com" })
# => Welcome email sent to user@example.com
# => Analytics tracked: 123
The pattern applies across different scales, from in-process event handling to distributed message queues spanning multiple services. Each implementation shares the core principle of decoupled communication through named topics.
Key Principles
The publish-subscribe pattern operates on several fundamental principles that distinguish it from other messaging patterns. These principles govern how messages flow through the system and how components interact.
Decoupling: Publishers and subscribers remain independent. Publishers emit events without knowing which subscribers exist or how many will receive the message. Subscribers register for topics without knowing which publishers will produce messages. This separation allows components to evolve independently and reduces interdependencies across the system.
Topic-based routing: Messages travel through named channels called topics. Publishers address messages to topics rather than specific recipients. Subscribers express interest in topics rather than connecting to specific publishers. The broker uses topic names to determine which subscribers receive each message. Topic hierarchies enable pattern-based subscriptions where a single subscription can match multiple related topics.
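Asynchronous communication: In distributed brokers, message delivery typically occurs asynchronously. Publishers continue execution immediately after emitting a message without waiting for subscriber processing, and subscribers process messages when they arrive, independent of publisher timing. This temporal decoupling prevents publishers from blocking on slow subscribers and enables parallel message processing. Simple in-process brokers, such as the EventBroker shown in the Overview, usually invoke handlers synchronously inside publish, so they provide the structural decoupling but not the temporal one unless a queue or worker thread is added.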
One-to-many delivery: A single published message reaches multiple subscribers. The broker duplicates messages for each matching subscription. This multicast behavior differs from point-to-point messaging where each message has exactly one recipient. The number of message copies depends on subscriber registrations at the time of publication.
Dynamic subscription: Subscribers can register and unregister from topics at runtime. New subscribers immediately begin receiving messages for their registered topics. Removed subscribers stop receiving messages. This dynamic behavior enables runtime system reconfiguration without restarting components or modifying publisher code.
Message immutability: Published messages should remain unchanged during delivery, so that each subscriber receives the same message content. This principle prevents subscribers from interfering with each other through message modification. Brokers typically enforce it either by freezing the message or by handing each subscriber its own copy. Immutable messages also enable features like message replay and audit trails.
The broker component centralizes routing logic and maintains subscription state. Brokers range from simple in-process objects to distributed message queues with persistence and delivery guarantees. The broker's responsibilities include:
- Maintaining topic-to-subscriber mappings
- Routing messages from publishers to matching subscribers
- Managing subscriber lifecycles and handling unsubscriptions
- Enforcing message delivery semantics (at-most-once, at-least-once, exactly-once)
- Providing message persistence and replay capabilities in advanced implementations
Message flow follows a consistent sequence: publishers emit messages to topics, the broker matches topics to subscriptions, the broker delivers messages to matched subscribers, and subscribers process messages independently. With an asynchronous broker, each step proceeds without blocking the others; a synchronous in-process broker performs all four steps inside the publish call.
# Demonstrating core principles
class TopicBroker
def initialize
@topics = Hash.new { |h, k| h[k] = [] }
end
def subscribe(topic, subscriber)
@topics[topic] << subscriber unless @topics[topic].include?(subscriber)
end
def unsubscribe(topic, subscriber)
@topics[topic].delete(subscriber)
end
def publish(topic, message)
@topics[topic].each do |subscriber|
subscriber.handle(message.dup) # Shallow copy per subscriber; nested objects are still shared
end
end
def subscriber_count(topic)
@topics[topic].size
end
end
class Subscriber
attr_reader :received
def initialize(name)
@name = name
@received = []
end
def handle(message)
@received << message
message[:processed_by] = @name # Isolated copy allows modification
end
end
broker = TopicBroker.new
sub1 = Subscriber.new("Analytics")
sub2 = Subscriber.new("Logger")
broker.subscribe(:order_placed, sub1)
broker.subscribe(:order_placed, sub2)
broker.publish(:order_placed, { order_id: 456, total: 99.99 })
# Each subscriber receives its own (shallow) copy of the message
sub1.received.first[:processed_by] # => "Analytics"
sub2.received.first[:processed_by] # => "Logger"
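Because `dup` copies only the top-level hash, nested arrays and hashes in a message are still shared between subscribers. One alternative is to freeze the message recursively so that no subscriber can modify it. The following is a minimal sketch of that idea, assuming messages consist only of hashes, arrays, and scalar values; `deep_freeze` and `FreezingBroker` are hypothetical names introduced for illustration.

```ruby
# Recursively freeze a message made of hashes, arrays, and scalar values.
# Note: this also freezes the object the publisher passed in.
def deep_freeze(value)
  case value
  when Hash
    value.each { |k, v| deep_freeze(k); deep_freeze(v) }
  when Array
    value.each { |item| deep_freeze(item) }
  end
  value.freeze
end

class FreezingBroker
  def initialize
    @topics = Hash.new { |h, k| h[k] = [] }
  end

  def subscribe(topic, &handler)
    @topics[topic] << handler
  end

  def publish(topic, message)
    frozen = deep_freeze(message)
    @topics[topic].each { |handler| handler.call(frozen) }
  end
end

broker = FreezingBroker.new
errors = []
seen = []

# First subscriber tries to mutate a nested array and fails.
broker.subscribe(:order_placed) do |msg|
  begin
    msg[:items] << "extra"
  rescue FrozenError => e
    errors << e.class
  end
end

# Second subscriber still sees the original content.
broker.subscribe(:order_placed) { |msg| seen << msg[:items].size }

broker.publish(:order_placed, { order_id: 456, items: ["WIDGET-1"] })
errors # => [FrozenError]
seen   # => [1]
```

Freezing avoids the cost of copying but turns accidental mutation into a runtime error, so subscribers that need a scratch copy must create one themselves.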
Ruby Implementation
Ruby provides multiple approaches for implementing publish-subscribe patterns, from the built-in Observable module to custom implementations and third-party gems. Each approach offers different trade-offs between simplicity and features.
Observable Module: Ruby's standard library includes an Observable module that implements a basic observer pattern. While technically observer rather than pure pub-sub, it demonstrates the core concepts:
require 'observer' # A bundled gem since Ruby 3.4; list it in the Gemfile when using Bundler
class EventPublisher
include Observable
def emit_event(event_type, data)
changed
notify_observers(event_type, data)
end
end
class EventSubscriber
def initialize(name)
@name = name
end
def update(event_type, data)
puts "#{@name} received #{event_type}: #{data}"
end
end
publisher = EventPublisher.new
subscriber1 = EventSubscriber.new("Subscriber A")
subscriber2 = EventSubscriber.new("Subscriber B")
publisher.add_observer(subscriber1)
publisher.add_observer(subscriber2)
publisher.emit_event(:user_login, { user_id: 789 })
# => Subscriber A received user_login: {:user_id=>789}
# => Subscriber B received user_login: {:user_id=>789}
The Observable module has limitations: observers must register directly with each observable object, so subscribers still depend on publishers; it doesn't support topic-based routing; and by default it requires observers to implement an update method. For true pub-sub with topics, custom implementations work better.
Custom Topic-Based Implementation: A topic-based broker implementation provides more flexibility:
class PubSubBroker
def initialize
@subscriptions = Hash.new { |h, k| h[k] = [] }
@subscription_ids = {}
end
def subscribe(topic, &block)
subscription_id = generate_id
@subscriptions[topic] << { id: subscription_id, handler: block }
@subscription_ids[subscription_id] = topic
subscription_id
end
def unsubscribe(subscription_id)
topic = @subscription_ids.delete(subscription_id)
return unless topic
@subscriptions[topic].reject! { |sub| sub[:id] == subscription_id }
end
def publish(topic, event)
@subscriptions[topic].each do |subscription|
subscription[:handler].call(event)
end
end
def clear_topic(topic)
@subscriptions[topic].each { |sub| @subscription_ids.delete(sub[:id]) }
@subscriptions.delete(topic)
end
private
def generate_id
"sub_#{Time.now.to_i}_#{rand(10000)}"
end
end
broker = PubSubBroker.new
# Subscribe to different topics
analytics_sub = broker.subscribe(:page_view) do |event|
puts "Analytics: Page #{event[:page]} viewed"
end
logger_sub = broker.subscribe(:page_view) do |event|
puts "Logger: [#{Time.now}] #{event[:page]}"
end
error_sub = broker.subscribe(:error) do |event|
puts "Error handler: #{event[:message]}"
end
broker.publish(:page_view, { page: "/dashboard", user_id: 123 })
# => Analytics: Page /dashboard viewed
# => Logger: [2025-01-15 10:30:00 -0500] /dashboard
broker.publish(:error, { message: "Database timeout" })
# => Error handler: Database timeout
broker.unsubscribe(logger_sub)
broker.publish(:page_view, { page: "/profile", user_id: 123 })
# => Analytics: Page /profile viewed
# (Logger no longer receives events)
Wildcard Topic Matching: Advanced implementations support pattern-based topic subscriptions:
class PatternBroker
def initialize
@subscriptions = []
end
def subscribe(pattern, &block)
subscription_id = generate_id
regex = pattern_to_regex(pattern)
@subscriptions << { id: subscription_id, pattern: regex, handler: block }
subscription_id
end
def publish(topic, event)
@subscriptions.each do |sub|
sub[:handler].call(topic, event) if sub[:pattern].match?(topic)
end
end
def unsubscribe(subscription_id)
@subscriptions.reject! { |sub| sub[:id] == subscription_id }
end
private
def pattern_to_regex(pattern)
# Convert wildcard pattern to regex
# * matches exactly one dot-separated segment; ** matches any run of characters, dots included
escaped = Regexp.escape(pattern)
regex_pattern = escaped.gsub('\*\*', '.*').gsub('\*', '[^.]+')
Regexp.new("\\A#{regex_pattern}\\z") # \A and \z anchor the whole string; ^ and $ only anchor lines
end
def generate_id
"sub_#{Time.now.to_i}_#{rand(10000)}"
end
end
broker = PatternBroker.new
# Subscribe to patterns
broker.subscribe("user.**") do |topic, event| # "user.*" would match only two-segment topics such as "user.created"
puts "User event on #{topic}: #{event[:action]}"
end
broker.subscribe("user.*.created") do |topic, event|
puts "Creation event: #{event[:entity]}"
end
broker.subscribe("**") do |topic, event|
puts "Audit: #{topic}"
end
broker.publish("user.account.created", { entity: "Account", action: "create" })
# => User event on user.account.created: create
# => Creation event: Account
# => Audit: user.account.created
broker.publish("order.placed", { order_id: 999 })
# => Audit: order.placed
Thread-Safe Implementation: For concurrent access, thread safety becomes critical:
require 'monitor'
class ThreadSafeBroker
def initialize
@subscriptions = Hash.new { |h, k| h[k] = [] }
@monitor = Monitor.new
end
def subscribe(topic, &block)
@monitor.synchronize do
subscription_id = generate_id
@subscriptions[topic] << { id: subscription_id, handler: block }
subscription_id
end
end
def publish(topic, event)
handlers = @monitor.synchronize do
@subscriptions[topic].map { |sub| sub[:handler] }
end
# Execute handlers outside the lock so slow handlers do not block other threads
handlers.each { |handler| handler.call(event) }
end
def unsubscribe(subscription_id)
@monitor.synchronize do
@subscriptions.each do |topic, subs|
subs.reject! { |sub| sub[:id] == subscription_id }
end
end
end
private
def generate_id
"sub_#{Time.now.to_i}_#{rand(10000)}"
end
end
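The snapshot step in publish matters most when the lock is not reentrant. Monitor allows the owning thread to re-enter, but a plain Mutex raises ThreadError if a handler calls subscribe while publish still holds the lock. The sketch below illustrates the same copy-then-call approach with a Mutex; `SnapshotBroker` is a hypothetical name, and the example only demonstrates that handlers can safely subscribe during a publish.

```ruby
class SnapshotBroker
  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
    @lock = Mutex.new # non-reentrant, unlike Monitor
  end

  def subscribe(topic, &block)
    @lock.synchronize { @subscriptions[topic] << block }
    nil
  end

  def publish(topic, event)
    # Copy the handler list under the lock, then release it before calling out.
    handlers = @lock.synchronize { @subscriptions[topic].dup }
    handlers.each { |handler| handler.call(event) }
  end

  def subscriber_count(topic)
    @lock.synchronize { @subscriptions[topic].size }
  end
end

broker = SnapshotBroker.new

# This handler registers another (no-op) handler while a publish is in progress.
broker.subscribe(:tick) do |_event|
  broker.subscribe(:tick) { |_e| }
end

# Publish from several threads at once.
threads = 4.times.map { Thread.new { broker.publish(:tick, {}) } }
threads.each(&:join)

broker.subscriber_count(:tick) # => 5 (the original handler plus one per publish)
```

Handlers added during a publish are not part of that publish's snapshot, so they first run on a later publish.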
Wisper Gem: The Wisper gem provides a production-ready pub-sub implementation:
# gem install wisper
require 'wisper'
require 'ostruct' # OpenStruct is used for the sample order below
class OrderProcessor
include Wisper::Publisher
def process_order(order)
# Business logic
validate_order(order)
charge_payment(order)
# Publish events
broadcast(:order_processed, order)
broadcast(:inventory_updated, order.items)
end
private
def validate_order(order)
# Validation logic
end
def charge_payment(order)
# Payment logic
end
end
class EmailNotifier
def order_processed(order)
puts "Sending confirmation email for order #{order.id}"
end
end
class InventoryManager
def inventory_updated(items)
puts "Updating inventory for #{items.size} items"
end
end
processor = OrderProcessor.new
processor.subscribe(EmailNotifier.new)
processor.subscribe(InventoryManager.new)
order = OpenStruct.new(id: 123, items: [{sku: "ABC", qty: 2}])
processor.process_order(order)
# => Sending confirmation email for order 123
# => Updating inventory for 1 items
Practical Examples
The publish-subscribe pattern applies to numerous real-world scenarios across different application types. These examples demonstrate how the pattern solves specific architectural challenges.
E-commerce Order Processing: Order placement triggers multiple independent operations:
class OrderBroker
def initialize
@handlers = Hash.new { |h, k| h[k] = [] }
end
def on(event, &block)
@handlers[event] << block
end
def emit(event, data)
@handlers[event].each { |handler| handler.call(data) }
end
end
class OrderService
attr_reader :broker
def initialize(broker)
@broker = broker
end
def place_order(order_data)
order = create_order(order_data)
@broker.emit(:order_placed, order)
order
end
private
def create_order(data)
{ id: rand(10000), items: data[:items], total: data[:total], customer_id: data[:customer_id] }
end
end
class PaymentProcessor
def initialize(broker)
broker.on(:order_placed) { |order| process_payment(order) }
end
def process_payment(order)
puts "Processing payment of $#{order[:total]} for order #{order[:id]}"
# Payment gateway integration
end
end
class InventoryService
def initialize(broker)
broker.on(:order_placed) { |order| update_inventory(order) }
end
def update_inventory(order)
puts "Reserving inventory for order #{order[:id]}: #{order[:items].size} items"
# Inventory database update
end
end
class NotificationService
def initialize(broker)
broker.on(:order_placed) { |order| send_confirmation(order) }
end
def send_confirmation(order)
puts "Sending order confirmation for #{order[:id]} to customer #{order[:customer_id]}"
# Email/SMS service integration
end
end
class AnalyticsTracker
def initialize(broker)
broker.on(:order_placed) { |order| track_order(order) }
end
def track_order(order)
puts "Recording analytics: order_value=$#{order[:total]}, items=#{order[:items].size}"
# Analytics platform API call
end
end
# Wire up the system
broker = OrderBroker.new
order_service = OrderService.new(broker)
payment = PaymentProcessor.new(broker)
inventory = InventoryService.new(broker)
notifications = NotificationService.new(broker)
analytics = AnalyticsTracker.new(broker)
# Place an order
order = order_service.place_order(
items: [{ sku: "WIDGET-1", quantity: 2, price: 29.99 }],
total: 59.98,
customer_id: 456
)
# => Processing payment of $59.98 for order 7834
# => Reserving inventory for order 7834: 1 items
# => Sending order confirmation for 7834 to customer 456
# => Recording analytics: order_value=$59.98, items=1
This architecture enables adding new order-related functionality without modifying the OrderService. New services simply subscribe to the order_placed event.
Real-time Dashboard Updates: Applications with multiple dashboard widgets subscribe to relevant data changes:
class DataBroker
def initialize
@topics = Hash.new { |h, k| h[k] = [] }
@client_connections = {}
end
def connect_client(client_id)
@client_connections[client_id] = []
client_id
end
def subscribe_client(client_id, topic, &callback)
subscription = { client: client_id, callback: callback }
@topics[topic] << subscription
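# Create the client's entry on first use so subscribing works without a prior connect_client call
(@client_connections[client_id] ||= []) << { topic: topic, subscription: subscription }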
end
def disconnect_client(client_id)
@client_connections[client_id]&.each do |sub_info|
@topics[sub_info[:topic]].delete(sub_info[:subscription])
end
@client_connections.delete(client_id)
end
def publish(topic, data)
@topics[topic].each do |subscription|
subscription[:callback].call(data)
end
end
end
class DashboardWidget
def initialize(id, broker, topics)
@id = id
@broker = broker
@data = {}
topics.each do |topic|
@broker.subscribe_client(@id, topic) { |data| update(topic, data) }
end
end
def update(topic, data)
@data[topic] = data
puts "Widget #{@id} updated #{topic}: #{data}"
end
def current_data
@data
end
end
# Simulation
broker = DataBroker.new
# Different widgets subscribe to different metrics
sales_widget = DashboardWidget.new("sales_01", broker, ["sales.total", "sales.today"])
traffic_widget = DashboardWidget.new("traffic_01", broker, ["traffic.visitors", "traffic.pageviews"])
system_widget = DashboardWidget.new("system_01", broker, ["system.cpu", "system.memory"])
# Data sources publish updates
broker.publish("sales.total", { amount: 125430, currency: "USD" })
broker.publish("sales.today", { amount: 8450, currency: "USD" })
broker.publish("traffic.visitors", { count: 1547, unique: 1203 })
broker.publish("system.cpu", { usage: 45.3, cores: 8 })
# => Widget sales_01 updated sales.total: {:amount=>125430, :currency=>"USD"}
# => Widget sales_01 updated sales.today: {:amount=>8450, :currency=>"USD"}
# => Widget traffic_01 updated traffic.visitors: {:count=>1547, :unique=>1203}
# => Widget system_01 updated system.cpu: {:usage=>45.3, :cores=>8}
Plugin System: Applications with plugin architectures use pub-sub to allow plugins to react to core events:
class PluginBroker
def initialize
@hooks = Hash.new { |h, k| h[k] = [] }
@plugins = []
end
def register_plugin(plugin)
@plugins << plugin
plugin.initialize_hooks(self)
end
def register_hook(event, priority: 50, &callback)
@hooks[event] << { priority: priority, callback: callback }
@hooks[event].sort_by! { |hook| -hook[:priority] }
end
def trigger(event, context = {})
results = []
@hooks[event].each do |hook|
result = hook[:callback].call(context)
results << result
break if result == :stop
end
results
end
end
class Plugin
def initialize(name)
@name = name
end
def initialize_hooks(broker)
# Override in subclasses
end
end
class SecurityPlugin < Plugin
def initialize_hooks(broker)
broker.register_hook(:before_request, priority: 100) do |context|
validate_token(context[:request])
end
broker.register_hook(:after_response) do |context|
log_access(context[:request], context[:response])
end
end
private
def validate_token(request)
if request[:token].nil? || request[:token].empty?
puts "Security: Rejected request without token"
:stop
else
puts "Security: Token validated"
:continue
end
end
def log_access(request, response)
puts "Security: Logged access to #{request[:path]} (#{response[:status]})"
end
end
class CachingPlugin < Plugin
def initialize_hooks(broker)
@cache = {}
broker.register_hook(:before_request, priority: 75) do |context|
check_cache(context)
end
broker.register_hook(:after_response, priority: 90) do |context|
store_cache(context)
end
end
private
def check_cache(context)
cached = @cache[context[:request][:path]]
if cached
puts "Cache: Hit for #{context[:request][:path]}"
context[:response] = cached
:stop
else
puts "Cache: Miss for #{context[:request][:path]}"
:continue
end
end
def store_cache(context)
@cache[context[:request][:path]] = context[:response]
puts "Cache: Stored #{context[:request][:path]}"
end
end
# Application setup
broker = PluginBroker.new
broker.register_plugin(SecurityPlugin.new("security"))
broker.register_plugin(CachingPlugin.new("cache"))
# Simulate request processing
def process_request(broker, request)
context = { request: request, response: nil }
result = broker.trigger(:before_request, context)
return context[:response] if result.include?(:stop)
context[:response] = { status: 200, body: "Data for #{request[:path]}" }
broker.trigger(:after_response, context)
context[:response]
end
# First request
process_request(broker, { path: "/api/users", token: "abc123" })
# => Security: Token validated
# => Cache: Miss for /api/users
# => Cache: Stored /api/users
# => Security: Logged access to /api/users (200)
# Second request (cached)
process_request(broker, { path: "/api/users", token: "abc123" })
# => Security: Token validated
# => Cache: Hit for /api/users
# Unauthorized request
process_request(broker, { path: "/api/admin", token: nil })
# => Security: Rejected request without token
Design Considerations
Selecting publish-subscribe requires evaluating trade-offs between decoupling benefits and complexity costs. Several factors influence whether pub-sub fits a specific architectural scenario.
Decoupling vs. Visibility: Publish-subscribe maximizes decoupling by removing direct dependencies between publishers and subscribers. Publishers emit events without knowledge of downstream consumers, enabling independent evolution of components. However, this decoupling reduces visibility into system behavior. Tracing event flows becomes harder when connections exist implicitly through topics rather than explicitly through method calls. The trade-off becomes significant in systems where understanding causality matters for debugging or compliance.
Direct method calls provide clear execution paths visible in stack traces. Pub-sub distributes execution across multiple independent handlers, making it harder to understand what happens when an event fires. This characteristic makes pub-sub less appropriate for core business logic where explicit control flow aids comprehension.
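Some of the lost visibility can be recovered by having the broker record which handlers it invoked for each event. The following is a rough sketch of that idea, not a full tracing solution; `TracingBroker`, the `name:` argument, and the shape of the trace entries are assumptions made for illustration.

```ruby
class TracingBroker
  attr_reader :trace

  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
    @trace = []
  end

  # Each subscription carries a human-readable name for the trace.
  def subscribe(topic, name:, &handler)
    @subscriptions[topic] << { name: name, handler: handler }
  end

  def publish(topic, event)
    @subscriptions[topic].each do |sub|
      # Proc#source_location reports where the handler block was defined.
      file, line = sub[:handler].source_location
      @trace << { topic: topic, subscriber: sub[:name], defined_at: "#{file}:#{line}" }
      sub[:handler].call(event)
    end
  end
end

broker = TracingBroker.new
broker.subscribe(:order_placed, name: "billing") { |order| order[:id] }
broker.subscribe(:order_placed, name: "analytics") { |order| order[:total] }
broker.publish(:order_placed, { id: 1, total: 59.98 })

broker.trace.map { |entry| entry[:subscriber] } # => ["billing", "analytics"]
```

In a larger system the same hook could write to a structured log instead of an in-memory array.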
Temporal Coupling: Synchronous communication couples sender and receiver temporally: the sender waits for the receiver to complete. Pub-sub breaks temporal coupling when the broker delivers asynchronously, which is the norm for message queues but must be added explicitly to a simple in-process broker. With asynchronous delivery, publishers continue execution immediately after emitting events without waiting for subscriber processing.
This asynchrony benefits performance when subscriber processing involves slow operations like network calls or file I/O. The publisher avoids blocking on these operations. However, asynchrony complicates error handling and transaction management. Publishers cannot directly handle exceptions from subscribers or coordinate transactional boundaries across event handlers.
Failure Handling Complexity: When a subscriber fails, the publisher typically remains unaware. This independence means publishers cannot retry failed operations or take corrective action. Applications requiring reliable delivery need additional infrastructure: message queues with persistence, retry mechanisms, dead letter queues for failed messages, and monitoring to detect processing failures.
Simple in-process pub-sub implementations often use fire-and-forget semantics where delivery failures go unnoticed. This approach works for non-critical events like analytics tracking but fails for events requiring guaranteed processing like payment confirmations.
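In the simple synchronous brokers shown earlier, the situation is slightly different: an exception raised by one handler propagates to the publisher and stops delivery to the remaining handlers. A minimal sketch of isolating handler failures follows. `IsolatingBroker` and its `on_error` callback are hypothetical names; the callback stands in for whatever logging or monitoring an application uses.

```ruby
class IsolatingBroker
  def initialize(on_error: ->(topic, event, error) {})
    @subscriptions = Hash.new { |h, k| h[k] = [] }
    @on_error = on_error
  end

  def subscribe(topic, &handler)
    @subscriptions[topic] << handler
  end

  def publish(topic, event)
    @subscriptions[topic].each do |handler|
      begin
        handler.call(event)
      rescue StandardError => e
        # Report the failure, then keep delivering to the other handlers.
        @on_error.call(topic, event, e)
      end
    end
  end
end

failures = []
delivered = []
broker = IsolatingBroker.new(
  on_error: ->(topic, _event, error) { failures << [topic, error.message] }
)

broker.subscribe(:order_placed) { |_order| raise "analytics unavailable" }
broker.subscribe(:order_placed) { |order| delivered << order[:id] }
broker.publish(:order_placed, { id: 42 })

delivered # => [42]
failures  # => [[:order_placed, "analytics unavailable"]]
```

This keeps one faulty subscriber from affecting the others, but the failed event is still lost for that subscriber unless the callback stores or retries it.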
Ordering Guarantees: Pub-sub implementations vary in message ordering guarantees. Simple implementations deliver messages to subscribers in publication order within a single topic. However, when multiple publishers emit to the same topic concurrently, message ordering becomes undefined. Distributed pub-sub systems like message queues may deliver messages out of order due to network latency, partitioning, or retry logic.
Applications requiring strict ordering need either to serialize all publications to a topic or implement application-level sequence numbers and reordering logic in subscribers. This complexity makes pub-sub less suitable for scenarios where event sequence matters critically.
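The sequence-number approach can be sketched as a small buffer on the subscriber side. The example assumes each event carries a gap-free integer `:seq` assigned by the publisher, starting at 1; `Resequencer` is a hypothetical name.

```ruby
class Resequencer
  def initialize(first_seq: 1, &handler)
    @next_seq = first_seq
    @pending = {}
    @handler = handler
  end

  # Accepts events in any order; invokes the handler in sequence order.
  def receive(event)
    seq = event[:seq]
    return if seq < @next_seq # duplicate or already-processed event

    @pending[seq] = event
    # Release every buffered event that is now next in line.
    while (ready = @pending.delete(@next_seq))
      @handler.call(ready)
      @next_seq += 1
    end
  end
end

processed = []
resequencer = Resequencer.new { |event| processed << event[:seq] }

[2, 3, 1, 5, 4].each { |seq| resequencer.receive({ seq: seq }) }
processed # => [1, 2, 3, 4, 5]
```

The buffer grows without bound if a sequence number never arrives, so a real subscriber would also need a timeout or gap-handling policy.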
Testing Complexity: Direct method calls enable straightforward testing through mocking or stubbing dependencies. Pub-sub testing requires setting up brokers, registering test subscribers, and verifying asynchronous event delivery. Integration tests become more complex when multiple subscribers interact through events.
Testing individual publishers and subscribers in isolation remains straightforward. Testing the complete system behavior requires infrastructure to trigger events and verify subscriber actions. This overhead makes pub-sub less attractive for simple scenarios where a few direct method calls would suffice.
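For synchronous in-process brokers, a publisher can be tested with a subscriber that simply records what it receives. The sketch below uses plain `raise` checks to stay independent of any test framework; `TestBroker`, `SignupService`, and `RecordingSubscriber` are hypothetical names, and in a real suite the final checks would be framework assertions.

```ruby
# Minimal synchronous broker, similar in spirit to the EventBroker in the Overview.
class TestBroker
  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
  end

  def subscribe(topic, &handler)
    @subscriptions[topic] << handler
  end

  def publish(topic, data)
    @subscriptions[topic].each { |handler| handler.call(data) }
  end
end

# Hypothetical publisher under test.
class SignupService
  def initialize(broker)
    @broker = broker
  end

  def register(email)
    user = { email: email.downcase }
    @broker.publish(:user_created, user)
    user
  end
end

# Test double that records every event delivered on one topic.
class RecordingSubscriber
  attr_reader :events

  def initialize(broker, topic)
    @events = []
    broker.subscribe(topic) { |event| @events << event }
  end
end

broker = TestBroker.new
recorder = RecordingSubscriber.new(broker, :user_created)
SignupService.new(broker).register("User@Example.com")

raise "expected one event" unless recorder.events.size == 1
raise "unexpected payload" unless recorder.events.first == { email: "user@example.com" }
```

With an asynchronous broker the test would additionally have to wait for delivery, for example by draining the broker before checking the recorder.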
When Pub-Sub Fits:
The pattern suits scenarios with these characteristics:
- Multiple components need to react to the same event
- Components should evolve independently without coordinating changes
- Event producers should not depend on consumers
- The system benefits from runtime composition of behaviors
- Asynchronous processing improves performance
- Component reuse across different contexts matters
When Alternatives Work Better:
Direct method calls or dependency injection work better when:
- Event flows require clear visibility for debugging
- Operations need synchronous error handling
- Transaction boundaries span event handlers
- Strict message ordering matters
- Testing complexity outweighs decoupling benefits
- The system has simple, stable component relationships
Design Patterns Comparison:
# Direct method call - tight coupling, clear flow
class OrderProcessor
def initialize(payment_processor, inventory_manager)
@payment_processor = payment_processor
@inventory_manager = inventory_manager
end
def process(order)
@payment_processor.charge(order)
@inventory_manager.reserve(order.items)
end
end
# Pub-sub - loose coupling, implicit flow
class OrderProcessor
def initialize(event_bus)
@event_bus = event_bus
end
def process(order)
@event_bus.publish(:order_placed, order)
end
end
The direct method call version makes dependencies explicit and execution flow clear. The pub-sub version decouples OrderProcessor from downstream services but obscures what happens after order placement.
Hybrid Approaches: Many systems combine patterns, using pub-sub for optional cross-cutting concerns while keeping core business logic in direct method calls:
class OrderProcessor
def initialize(payment_processor, inventory_manager, event_bus)
@payment_processor = payment_processor
@inventory_manager = inventory_manager
@event_bus = event_bus
end
def process(order)
# Critical path uses direct calls
@payment_processor.charge(order)
@inventory_manager.reserve(order.items)
# Optional features use pub-sub
@event_bus.publish(:order_placed, order)
order
end
end
This hybrid keeps payment and inventory operations explicit while allowing optional subscribers like analytics or notifications to attach through events.
Common Patterns
Several established patterns build on the publish-subscribe foundation to solve recurring architectural challenges. These patterns extend basic pub-sub with additional capabilities or constraints.
Event Sourcing Integration: Event sourcing stores system state as a sequence of events. Combining event sourcing with pub-sub creates systems where the event store acts as the ultimate source of truth, and pub-sub broadcasts state changes to interested parties:
class EventStore
def initialize(broker)
@events = []
@broker = broker
end
def append(event_type, data)
event = {
id: generate_id,
type: event_type,
data: data,
timestamp: Time.now,
version: @events.size + 1
}
@events << event
@broker.publish(event_type, event)
event
end
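def replay(from_version: 0)
# from_version is a zero-based offset: 0 replays everything, 1 skips the first event.
# Replayed events are republished to every current subscriber of each topic.
(@events[from_version..-1] || []).each do |event|
@broker.publish(event[:type], event)
end
end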
def events_of_type(type)
@events.select { |e| e[:type] == type }
end
private
def generate_id
"evt_#{Time.now.to_i}_#{rand(10000)}"
end
end
class AccountProjection
attr_reader :balance
def initialize(broker, event_store)
@balance = 0
@broker = broker
broker.subscribe(:account_credited) { |event| credit(event) }
broker.subscribe(:account_debited) { |event| debit(event) }
# Rebuild state from event store
event_store.replay
end
private
def credit(event)
@balance += event[:data][:amount]
end
def debit(event)
@balance -= event[:data][:amount]
end
end
broker = EventBroker.new
store = EventStore.new(broker)
# Create projection
account = AccountProjection.new(broker, store)
# Append events
store.append(:account_credited, { amount: 100 })
store.append(:account_debited, { amount: 30 })
account.balance # => 70
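One caveat with replaying through the broker is that every existing subscriber receives the historical events again, so a projection that is already up to date would double-count them when a second projection triggers a replay. A possible variation, sketched here with hypothetical names (`ReplayableStore`, `each_event`, `on_append`, `BalanceProjection`), lets each projection read history privately and then follow live events.

```ruby
class ReplayableStore
  def initialize
    @events = []
    @listeners = []
  end

  # Register a live listener for events appended from now on.
  def on_append(&listener)
    @listeners << listener
  end

  def append(type, data)
    event = { type: type, data: data, version: @events.size + 1 }
    @events << event
    @listeners.each { |listener| listener.call(event) }
    event
  end

  # Yield stored events with version > after_version to the caller only.
  def each_event(after_version: 0)
    @events.each { |event| yield event if event[:version] > after_version }
  end
end

class BalanceProjection
  attr_reader :balance

  def initialize(store)
    @balance = 0
    store.each_event { |event| apply(event) } # catch up privately
    store.on_append { |event| apply(event) }  # then follow live events
  end

  private

  def apply(event)
    case event[:type]
    when :account_credited then @balance += event[:data][:amount]
    when :account_debited  then @balance -= event[:data][:amount]
    end
  end
end

store = ReplayableStore.new
store.append(:account_credited, { amount: 100 })
first = BalanceProjection.new(store)
store.append(:account_debited, { amount: 30 })
second = BalanceProjection.new(store) # built later, reads both stored events

first.balance  # => 70
second.balance # => 70
```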
Request-Reply Pattern: While pub-sub typically operates one-way, request-reply adds response capabilities using correlation IDs and reply topics:
class RequestReplyBroker
def initialize
@subscriptions = Hash.new { |h, k| h[k] = [] }
@pending_requests = {}
end
def subscribe(topic, &handler)
@subscriptions[topic] << handler
end
def request(topic, request_data, timeout: 5)
correlation_id = generate_id
reply_topic = "reply_#{correlation_id}"
# Setup reply handler
promise = nil
subscribe(reply_topic) do |response|
promise = response
end
# Send request
request = request_data.merge(
correlation_id: correlation_id,
reply_to: reply_topic
)
publish(topic, request)
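# Wait for reply (only needed when the responder replies from another thread)
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
until promise || Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
sleep 0.01
end
raise "Request timeout" unless promise
promise
ensure
# Remove the one-off reply subscription so reply topics do not accumulate
@subscriptions.delete(reply_topic)
end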
def reply(request, response_data)
publish(request[:reply_to], response_data)
end
def publish(topic, data)
@subscriptions[topic].each { |handler| handler.call(data) }
end
private
def generate_id
"req_#{Time.now.to_i}_#{rand(10000)}"
end
end
broker = RequestReplyBroker.new
# Service handles requests
broker.subscribe(:user_query) do |request|
user = { id: request[:user_id], name: "User#{request[:user_id]}", email: "user#{request[:user_id]}@example.com" }
broker.reply(request, user)
end
# Client makes request
response = broker.request(:user_query, { user_id: 123 })
# => {:id=>123, :name=>"User123", :email=>"user123@example.com"}
Priority-Based Delivery: Some events require processing before others. Priority-based pub-sub enables ordered handler execution:
class PriorityBroker
def initialize
@subscriptions = Hash.new { |h, k| h[k] = [] }
end
def subscribe(topic, priority: 0, &handler)
@subscriptions[topic] << { priority: priority, handler: handler }
@subscriptions[topic].sort_by! { |sub| -sub[:priority] }
end
def publish(topic, event)
@subscriptions[topic].each do |sub|
sub[:handler].call(event)
end
end
end
broker = PriorityBroker.new
broker.subscribe(:request, priority: 100) { |e| puts "Authentication" }
broker.subscribe(:request, priority: 75) { |e| puts "Rate limiting" }
broker.subscribe(:request, priority: 50) { |e| puts "Logging" }
broker.subscribe(:request, priority: 0) { |e| puts "Request processing" }
broker.publish(:request, { path: "/api/data" })
# => Authentication
# => Rate limiting
# => Logging
# => Request processing
Filtered Subscriptions: Subscribers may want only specific events within a topic based on event attributes:
class FilteringBroker
def initialize
@subscriptions = Hash.new { |h, k| h[k] = [] }
end
def subscribe(topic, filter: nil, &handler)
@subscriptions[topic] << {
filter: filter || proc { true },
handler: handler
}
end
def publish(topic, event)
@subscriptions[topic].each do |sub|
sub[:handler].call(event) if sub[:filter].call(event)
end
end
end
broker = FilteringBroker.new
# Subscribe with filters
broker.subscribe(:order, filter: ->(e) { e[:total] > 100 }) do |order|
puts "High-value order: $#{order[:total]}"
end
broker.subscribe(:order, filter: ->(e) { e[:country] == "US" }) do |order|
puts "US order: #{order[:id]}"
end
broker.subscribe(:order) do |order|
puts "All orders: #{order[:id]}"
end
broker.publish(:order, { id: 1, total: 150, country: "US" })
# => High-value order: $150
# => US order: 1
# => All orders: 1
broker.publish(:order, { id: 2, total: 50, country: "US" })
# => US order: 2
# => All orders: 2
Dead Letter Channel: Failed message processing needs special handling to prevent message loss:
class ResilientBroker
  attr_reader :dead_letters
  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
    @dead_letters = []
    @max_retries = 3
  end
  def subscribe(topic, &handler)
    @subscriptions[topic] << handler
  end
  def publish(topic, event)
    @subscriptions[topic].each do |handler|
      attempt = 0
      begin
        attempt += 1
        handler.call(event)
      rescue StandardError => e
        if attempt < @max_retries
          retry
        else
          handle_dead_letter(topic, event, e)
        end
      end
    end
  end
  private
  def handle_dead_letter(topic, event, error)
    @dead_letters << {
      topic: topic,
      event: event,
      error: error.message,
      timestamp: Time.now
    }
    puts "Dead letter: #{topic} - #{error.message}"
  end
end
broker = ResilientBroker.new
broker.subscribe(:data_process) do |event|
  raise "Processing failed" if event[:corrupt]
  puts "Processed: #{event[:id]}"
end
broker.publish(:data_process, { id: 1, corrupt: false })
# => Processed: 1
broker.publish(:data_process, { id: 2, corrupt: true })
# => Dead letter: data_process - Processing failed
broker.dead_letters.size # => 1
Implementation Approaches
Different architectural contexts demand different pub-sub implementation strategies. The choice depends on scale, reliability requirements, and system distribution.
In-Process Event Bus: Single-process applications use in-memory brokers for simple, fast event distribution within an application boundary. This approach minimizes latency and eliminates external dependencies but limits scalability to single-process execution.
In-process implementations work well for desktop applications, command-line tools, or monolithic web applications where all components run in the same process. Events are distributed through direct method calls wrapped in pub-sub semantics. The broker keeps subscriber lists in memory and invokes handlers either synchronously on the publisher's thread or asynchronously on worker threads fed by a thread-safe queue.
Performance characteristics favor in-process pub-sub when message volume stays moderate and event processing completes quickly. Synchronous handler execution blocks publishers until all subscribers complete, creating backpressure when subscribers perform slow operations. Asynchronous execution using thread pools or actor models prevents blocking but adds complexity in error handling and testing.
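The asynchronous variant described above can be sketched with Ruby's built-in `Queue` and a small pool of worker threads. `AsyncBroker` and its `shutdown` method are hypothetical names used for illustration, and the sketch assumes subscriptions are registered before publishing starts:

```ruby
# Hypothetical AsyncBroker: publish only enqueues work, so the publisher
# never waits for slow subscribers. Handlers run on worker threads.
class AsyncBroker
  def initialize(workers: 2)
    @subscriptions = Hash.new { |h, k| h[k] = [] }
    @jobs = Queue.new     # thread-safe FIFO from the Ruby core library
    @errors = Queue.new
    @threads = Array.new(workers) { Thread.new { work_loop } }
  end

  def subscribe(topic, &handler)
    @subscriptions[topic] << handler
  end

  # Returns immediately; each handler call becomes a queued job.
  def publish(topic, event)
    @subscriptions[topic].each { |handler| @jobs << [handler, event] }
  end

  # Lets queued jobs drain, stops the workers, and returns collected errors.
  def shutdown
    @threads.size.times { @jobs << :stop }
    @threads.each(&:join)
    Array.new(@errors.size) { @errors.pop }
  end

  private

  def work_loop
    loop do
      job = @jobs.pop
      break if job == :stop
      handler, event = job
      begin
        handler.call(event)
      rescue StandardError => e
        @errors << e # a failing handler must not kill the worker
      end
    end
  end
end

broker = AsyncBroker.new
results = Queue.new
broker.subscribe(:report) { |e| results << "built #{e[:id]}" }
broker.subscribe(:report) { |e| raise "boom" if e[:id] == 2 }
broker.publish(:report, { id: 1 })
broker.publish(:report, { id: 2 })
errors = broker.shutdown
```

Because handlers run on other threads, exceptions no longer propagate to the publisher. Collecting them in a queue is one way to keep failures visible, and it illustrates the extra error-handling work that asynchronous delivery brings.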
Message Queue Integration: Distributed systems requiring reliable, persistent message delivery integrate with message queue systems like RabbitMQ, Redis Streams, or Amazon SQS. These systems provide durability, delivery guarantees, and multi-consumer distribution across process boundaries. Plain Redis pub/sub, by contrast, does not persist messages, so subscribers that are offline at publish time miss them.
Message queues decouple publishers and subscribers in both time and space: they can run on different hosts, start and stop independently, and process messages at different rates. The queue persists messages to disk, ensuring delivery even if subscribers crash or disconnect temporarily.
This approach introduces operational complexity through queue server deployment and monitoring. Message serialization overhead increases latency compared to in-process delivery. However, the reliability and scalability benefits outweigh these costs in distributed architectures requiring guaranteed message delivery.
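The acknowledgement mechanism behind this reliability can be illustrated without a real queue server. The sketch below is an in-memory stand-in, not the API of RabbitMQ, Redis, or SQS; `AckQueue` and its method names are assumptions made for the example:

```ruby
# Hypothetical in-memory stand-in for a durable queue. A real broker would
# persist messages to disk and track acknowledgements per consumer.
class AckQueue
  def initialize
    @pending = []    # published but not yet delivered
    @in_flight = {}  # delivered, awaiting acknowledgement
    @next_id = 0
  end

  # Publishing succeeds even when no consumer is running.
  def publish(payload)
    @next_id += 1
    @pending << { id: @next_id, payload: payload }
    @next_id
  end

  # Hands the next message to a consumer but keeps it until acknowledged.
  def receive
    message = @pending.shift
    @in_flight[message[:id]] = message if message
    message
  end

  def ack(id)
    @in_flight.delete(id)
  end

  # Simulates a consumer crash or timeout: unacknowledged messages go back
  # to the front of the queue for redelivery.
  def requeue_unacked
    @pending.unshift(*@in_flight.values)
    @in_flight.clear
  end

  def depth
    @pending.size + @in_flight.size
  end
end

queue = AckQueue.new
queue.publish({ order_id: 1 }) # no subscriber is running yet
queue.publish({ order_id: 2 })

first = queue.receive
queue.ack(first[:id])          # processed successfully

second = queue.receive         # consumer "crashes" before acknowledging
queue.requeue_unacked
redelivered = queue.receive    # the same message is delivered again
```

Redelivery after a missed acknowledgement is what produces at-least-once semantics, which is why handlers behind a queue are usually written to tolerate duplicates.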
HTTP Webhooks: Systems exposing events to external parties or third-party integrations often implement pub-sub through HTTP webhooks. Publishers POST event data to subscriber-provided HTTP endpoints when events occur.
Webhooks work across organizational boundaries where subscribers control their infrastructure independently. The pattern enables event-driven integration between services without shared message queue infrastructure. However, webhooks require retry logic, failure handling, and subscriber endpoint validation to handle network issues and ensure reliable delivery.
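A minimal sketch of webhook delivery with retries and payload signing follows. `WebhookPublisher`, the `X-Signature` header name, and the injected `transport` callable are all assumptions for illustration; in production the transport would wrap an HTTP client such as `Net::HTTP`:

```ruby
require "json"
require "openssl"

# Hypothetical WebhookPublisher. `transport` is any callable that takes
# (url, body, headers) and returns an HTTP status code.
class WebhookPublisher
  def initialize(transport:, secret:, max_attempts: 3, base_delay: 0.5)
    @transport = transport
    @secret = secret
    @max_attempts = max_attempts
    @base_delay = base_delay
    @endpoints = Hash.new { |h, k| h[k] = [] }
  end

  def register(topic, url)
    @endpoints[topic] << url
  end

  # Returns a hash of url => true/false indicating delivery success.
  def publish(topic, event)
    body = JSON.generate({ topic: topic, data: event })
    # The HMAC signature lets the subscriber verify where the payload came from.
    signature = OpenSSL::HMAC.hexdigest("SHA256", @secret, body)
    headers = { "Content-Type" => "application/json", "X-Signature" => signature }
    @endpoints[topic].to_h { |url| [url, deliver(url, body, headers)] }
  end

  private

  # Retries with exponential backoff; any non-2xx status or raised error
  # counts as a failed attempt.
  def deliver(url, body, headers)
    @max_attempts.times do |attempt|
      status = begin
        @transport.call(url, body, headers)
      rescue StandardError
        nil
      end
      return true if status && status.between?(200, 299)
      sleep(@base_delay * (2**attempt)) if attempt < @max_attempts - 1
    end
    false
  end
end

# A fake transport that fails once per URL, then succeeds.
calls = Hash.new(0)
flaky = lambda do |url, _body, _headers|
  calls[url] += 1
  calls[url] < 2 ? 503 : 200
end

publisher = WebhookPublisher.new(transport: flaky, secret: "s3cret", base_delay: 0)
publisher.register(:order_placed, "https://example.com/hooks/orders")
result = publisher.publish(:order_placed, { id: 42 })
# => {"https://example.com/hooks/orders"=>true}
```

Injecting the transport keeps the retry logic testable without network access. Endpoints that still fail after the last attempt would typically be routed to a dead letter store like the one shown earlier.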
Database as Event Store: Applications needing event history or audit trails implement pub-sub by writing events to a database table and notifying subscribers through polling or database triggers. This approach combines event persistence with pub-sub distribution.
Database storage provides queryable event history and enables event replay for rebuilding projections or debugging. Polling introduces latency between event publication and subscriber notification. Database triggers or change data capture mechanisms reduce latency but add database complexity.
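The polling variant can be sketched with an Array standing in for the events table. `EventStore`, `PollingSubscriber`, and the cursor-per-subscriber design are assumptions for illustration; a real implementation would issue SQL queries and persist each cursor:

```ruby
# Hypothetical EventStore: an Array stands in for an append-only events
# table with an auto-incrementing id column.
class EventStore
  def initialize
    @rows = []
  end

  def append(topic, data)
    @rows << { id: @rows.size + 1, topic: topic, data: data }
  end

  # Equivalent in spirit to: SELECT * FROM events WHERE id > ? ORDER BY id
  def read_after(last_id)
    @rows.select { |row| row[:id] > last_id }
  end
end

# Each subscriber tracks its own cursor, so it can fall behind, catch up,
# or replay from the start without affecting other subscribers.
class PollingSubscriber
  attr_reader :cursor

  def initialize(store, topic, &handler)
    @store = store
    @topic = topic
    @handler = handler
    @cursor = 0
  end

  # One polling pass; a real worker would call this on a timer.
  def poll
    @store.read_after(@cursor).each do |row|
      @handler.call(row[:data]) if row[:topic] == @topic
      @cursor = row[:id]
    end
  end

  # Rewinds the cursor and reprocesses the full history.
  def replay
    @cursor = 0
    poll
  end
end

store = EventStore.new
totals = []
subscriber = PollingSubscriber.new(store, :order_placed) { |o| totals << o[:total] }

store.append(:order_placed, { total: 30 })
store.append(:user_created, { id: 7 })
subscriber.poll
store.append(:order_placed, { total: 12 })
subscriber.poll
totals            # => [30, 12]
subscriber.cursor # => 3
```

The interval between `poll` calls is the latency cost mentioned above, and `replay` is the payoff: a projection can be rebuilt from history at any time.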
Hybrid Architecture: Production systems often combine approaches, using in-process pub-sub for low-latency local events while publishing critical events to message queues for reliable cross-service communication:
class HybridBroker
  def initialize(local_broker, message_queue)
    @local = local_broker
    @queue = message_queue
  end
  def subscribe_local(topic, &handler)
    @local.subscribe(topic, &handler)
  end
  def subscribe_distributed(topic, &handler)
    @queue.subscribe(topic, &handler)
  end
  def publish_local(topic, event)
    @local.publish(topic, event)
  end
  def publish_distributed(topic, event)
    @queue.publish(topic, event)
  end
  def publish_both(topic, event)
    publish_local(topic, event)
    publish_distributed(topic, event)
  end
end
This pattern routes high-frequency, non-critical events through the local broker while sending critical business events requiring durability and cross-service delivery through the message queue. The separation balances performance and reliability based on event criticality.
Stream Processing: High-throughput applications requiring real-time analytics or complex event processing integrate pub-sub with stream processing platforms like Apache Kafka or AWS Kinesis. These systems optimize for high-volume event streams through partitioning and parallel processing. Delivery is typically at-least-once by default; Kafka can additionally provide exactly-once processing semantics through idempotent producers and transactions.
Stream processing enables operations like windowed aggregations, event correlation, and stateful transformations on event streams. The architecture suits applications processing millions of events per second with low latency requirements like real-time monitoring, fraud detection, or recommendation systems.
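Two of these ideas, windowed aggregation and key-based partitioning, can be sketched in plain Ruby. `TumblingWindowCounter` and `partition_for` are hypothetical names, timestamps are plain integers in seconds, and the byte-sum hash is a deliberate simplification; real platforms use stronger hash functions and distributed state:

```ruby
# Hypothetical sketch: per-key counts over fixed, non-overlapping
# (tumbling) time windows.
class TumblingWindowCounter
  def initialize(window_seconds)
    @window_seconds = window_seconds
    @counts = Hash.new(0)
  end

  # Assigns the event to the window that contains its timestamp.
  def add(key, timestamp)
    window_start = timestamp - (timestamp % @window_seconds)
    @counts[[window_start, key]] += 1
  end

  def count(window_start, key)
    @counts[[window_start, key]]
  end
end

# Events with the same key always map to the same partition, so per-key
# state can live on a single worker.
def partition_for(key, partition_count)
  key.to_s.sum % partition_count
end

counter = TumblingWindowCounter.new(60)
events = [["card-1", 5], ["card-1", 42], ["card-2", 50], ["card-1", 61]]
events.each { |key, timestamp| counter.add(key, timestamp) }

counter.count(0, "card-1")  # => 2
counter.count(60, "card-1") # => 1
```

A fraud detector, for example, could flag any key whose count within one window exceeds a threshold.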
Reference
Core Components
| Component | Responsibility | Implementation Options |
|---|---|---|
| Publisher | Emits events to topics | Service objects, domain models, controllers |
| Subscriber | Processes published events | Handler objects, service objects, workers |
| Broker | Routes events from publishers to subscribers | In-process object, message queue, event store |
| Topic | Named channel for related events | String identifier, hierarchical namespace, pattern |
| Event | Message payload | Hash, object, serialized data |
Message Delivery Semantics
| Semantic | Guarantee | Use Case |
|---|---|---|
| At-most-once | Delivered zero or one time, may lose messages | Non-critical analytics, logging |
| At-least-once | Delivered one or more times, duplicates possible | Idempotent operations, event processing |
| Exactly-once | Delivered exactly one time | Financial transactions, state changes |
| Best-effort | No guarantees, fire-and-forget | Low-priority notifications |
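At-least-once delivery is commonly paired with idempotent handlers so that duplicates are harmless. The sketch below assumes every event carries a unique `:event_id` and keeps processed IDs in an in-memory `Set`; both are assumptions for illustration, and a real deployment would store processed IDs durably:

```ruby
require "set"

# Hypothetical IdempotentHandler: wraps a handler so that a redelivered
# event (same :event_id) is processed only once.
class IdempotentHandler
  def initialize(&handler)
    @handler = handler
    @seen = Set.new
  end

  def call(event)
    id = event.fetch(:event_id)
    return :duplicate if @seen.include?(id)
    @handler.call(event)
    @seen << id # record only after success so failures can be retried
    :processed
  end
end

balance = 0
credit = IdempotentHandler.new { |e| balance += e[:amount] }
credit.call({ event_id: "evt-1", amount: 50 }) # => :processed
credit.call({ event_id: "evt-1", amount: 50 }) # => :duplicate
balance # => 50
```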
Topic Naming Conventions
| Convention | Example | Description |
|---|---|---|
| Dot notation | user.account.created | Hierarchical namespace with segments |
| Colon notation | user:login | Category and action pairs |
| Slash notation | /orders/placed | Path-like structure |
| Event suffix | OrderPlacedEvent | Class-based naming |
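Hierarchical dot notation makes pattern subscriptions possible. The sketch below borrows the wildcard convention of AMQP topic exchanges, where `*` matches exactly one segment and `#` matches zero or more. `TopicMatcher` is a hypothetical helper, and as a simplification it only supports `#` as the final segment:

```ruby
# Hypothetical TopicMatcher for dot-notation topics.
# "*" matches exactly one segment; a trailing "#" matches zero or more.
module TopicMatcher
  def self.match?(pattern, topic)
    match_segments(pattern.split("."), topic.split("."))
  end

  def self.match_segments(pattern, topic)
    return topic.empty? if pattern.empty?
    head, *rest = pattern
    return true if head == "#" && rest.empty?
    return false if topic.empty?
    (head == "*" || head == topic.first) && match_segments(rest, topic.drop(1))
  end
  private_class_method :match_segments
end

TopicMatcher.match?("user.*.created", "user.account.created") # => true
TopicMatcher.match?("user.*", "user.account.created")         # => false
TopicMatcher.match?("user.#", "user.account.created")         # => true
```

A broker could store patterns alongside handlers and call `match?` at publish time instead of looking up exact topic names.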
Pattern Variations
| Pattern | Characteristics | When to Use |
|---|---|---|
| Fan-out | One publisher, multiple subscribers | Event broadcast, notifications |
| Fan-in | Multiple publishers, one subscriber | Log aggregation, monitoring |
| Request-reply | Synchronous request with response | Remote procedure calls, queries |
| Filtered | Subscribers receive subset of events | Content-based routing |
| Priority | Handlers invoked in priority order | Middleware-style chains, such as authentication before rate limiting |
Common Event Types
| Event Type | Example | Typical Data |
|---|---|---|
| Domain event | OrderPlaced | Aggregate ID, state changes |
| Integration event | PaymentProcessed | External system response |
| System event | ApplicationStarted | Timestamp, version |
| Command event | ProcessRefund | Action to perform, parameters |
Ruby Gems
| Gem | Purpose | Features |
|---|---|---|
| wisper | Object-level pub-sub | Clean API, global subscriptions, async options |
| concurrent-ruby | Concurrency toolkit | Thread pools, futures and promises, thread-safe collections |
| bunny | RabbitMQ client | Reliable delivery, routing, persistence |
| redis | Redis client | Pub-sub commands, streams |
Error Handling Strategies
| Strategy | Approach | Trade-offs |
|---|---|---|
| Ignore | Log and continue | Simple but loses messages |
| Retry | Attempt delivery N times | May succeed eventually, delays processing |
| Dead letter | Move to failure queue | Preserves messages, requires monitoring |
| Circuit breaker | Stop after threshold | Prevents cascade failures |
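Retry and dead-letter handling appear in the ResilientBroker example earlier; the circuit breaker strategy can be sketched as a wrapper around a single handler. `CircuitBreaker`, its parameters, and the injectable `clock` are assumptions for illustration:

```ruby
# Hypothetical CircuitBreaker: after `threshold` consecutive failures the
# handler is skipped until `cooldown` seconds have passed.
class CircuitBreaker
  class OpenError < StandardError; end

  def initialize(threshold: 3, cooldown: 30,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) },
                 &handler)
    @threshold = threshold
    @cooldown = cooldown
    @clock = clock
    @handler = handler
    @failures = 0
    @opened_at = nil
  end

  def call(event)
    raise OpenError, "circuit open, event skipped" if blocked?
    result = @handler.call(event)
    @failures = 0 # any success closes the circuit fully
    result
  rescue OpenError
    raise
  rescue StandardError
    @failures += 1
    @opened_at = @clock.call if @failures >= @threshold
    raise
  end

  private

  def blocked?
    return false unless @opened_at
    return true if @clock.call - @opened_at < @cooldown
    # Cooldown elapsed: allow one trial call; one more failure reopens.
    @opened_at = nil
    @failures = @threshold - 1
    false
  end
end

now = 0.0 # fake clock so the example runs without waiting
breaker = CircuitBreaker.new(threshold: 2, cooldown: 10, clock: -> { now }) do |event|
  raise "downstream unavailable" if event[:fail]
  :ok
end

outcomes = [true, true, true].map do |should_fail|
  begin
    breaker.call({ fail: should_fail })
  rescue CircuitBreaker::OpenError
    :skipped
  rescue StandardError
    :failed
  end
end
outcomes # => [:failed, :failed, :skipped]

now = 11.0
recovered = breaker.call({ fail: false }) # => :ok
```

Events skipped while the circuit is open are not retried by this sketch, so it is usually combined with a dead letter store or a durable queue.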
Testing Approaches
| Approach | Technique | Example |
|---|---|---|
| Spy subscriber | Record received events | Verify event data and call count |
| Mock broker | Replace with test double | Control event delivery timing |
| Synchronous mode | Execute handlers inline | Deterministic test execution |
| Event capture | Collect published events | Assert on event sequence |
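The spy subscriber technique needs very little code. The sketch below redefines a minimal broker equivalent to the `EventBroker` from the overview so that it runs on its own; `SpySubscriber` is a hypothetical test helper:

```ruby
# Minimal synchronous broker, equivalent to the overview's EventBroker.
class EventBroker
  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
  end

  def subscribe(topic, &handler)
    @subscriptions[topic] << handler
  end

  def publish(topic, data)
    @subscriptions[topic].each { |handler| handler.call(data) }
  end
end

# Hypothetical SpySubscriber: records every event it receives.
class SpySubscriber
  attr_reader :events

  def initialize
    @events = []
  end

  # Allows passing the spy with `&spy` wherever a block is expected.
  def to_proc
    ->(event) { @events << event }
  end

  def call_count
    @events.size
  end
end

broker = EventBroker.new
spy = SpySubscriber.new
broker.subscribe(:user_created, &spy)

broker.publish(:user_created, { id: 1 })
broker.publish(:user_created, { id: 2 })
broker.publish(:user_deleted, { id: 1 }) # spy is not subscribed to this topic

spy.call_count                # => 2
spy.events.map { |e| e[:id] } # => [1, 2]
```

Because this broker delivers synchronously, the assertions can run immediately after `publish`, which is the point of the synchronous mode row in the table.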
Performance Characteristics
| Factor | Impact | Mitigation |
|---|---|---|
| Synchronous delivery | Blocks publisher | Use async delivery, thread pools |
| Handler count | Linear time increase | Prioritize handlers, filter subscriptions |
| Event size | Memory and serialization cost | Minimize payload, use references |
| Queue depth | Memory consumption | Backpressure, rate limiting |
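Queue depth can be capped with Ruby's built-in `SizedQueue`, which blocks producers when full. The sketch below shows both a blocking publish (backpressure) and a non-blocking one that sheds load; `BoundedBroker` and its method names are assumptions for illustration:

```ruby
# Hypothetical BoundedBroker: a SizedQueue limits how many events may wait.
class BoundedBroker
  attr_reader :dropped

  def initialize(capacity:)
    @queue = SizedQueue.new(capacity)
    @dropped = 0
  end

  # Blocking publish: the publisher waits while the queue is full.
  def publish(event)
    @queue.push(event)
  end

  # Non-blocking publish: drops the event instead of waiting.
  def try_publish(event)
    @queue.push(event, true) # non_block = true raises ThreadError when full
    true
  rescue ThreadError
    @dropped += 1
    false
  end

  def depth
    @queue.size
  end

  # Removes and returns everything currently queued.
  def drain
    events = []
    events << @queue.pop until @queue.empty?
    events
  end
end

broker = BoundedBroker.new(capacity: 2)
accepted = [1, 2, 3].map { |n| broker.try_publish(n) }
accepted       # => [true, true, false]
broker.depth   # => 2
broker.dropped # => 1
```

Whether to block or to drop depends on the event: blocking preserves every event at the cost of slowing publishers, while dropping protects publisher latency and suits at-most-once traffic.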