CrackedRuby

Publish-Subscribe Pattern

Overview

The Publish-Subscribe pattern establishes a one-to-many communication model where publishers emit messages without knowledge of subscribers, and subscribers receive messages without knowledge of publishers. An intermediary component, typically called a broker or event bus, mediates the flow of messages between these parties. This decoupling creates systems where components interact through events rather than direct method calls.

The pattern originated in message-oriented middleware systems during the 1980s and gained prominence with distributed systems and event-driven architectures. Unlike the Observer pattern where subjects maintain references to observers, publish-subscribe introduces a complete separation where publishers and subscribers operate independently of each other's existence, lifecycle, or location.

The core mechanism works through topics or channels. Publishers send messages to named topics, and subscribers register interest in specific topics. The broker matches publications to subscriptions and delivers messages accordingly. This indirection enables dynamic system composition where publishers and subscribers can be added or removed without affecting other components.

# Simple pub-sub demonstration
class EventBroker
  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
  end

  def subscribe(topic, &handler)
    @subscriptions[topic] << handler
  end

  def publish(topic, data)
    @subscriptions[topic].each { |handler| handler.call(data) }
  end
end

broker = EventBroker.new
broker.subscribe(:user_created) { |user| puts "Welcome email sent to #{user[:email]}" }
broker.subscribe(:user_created) { |user| puts "Analytics tracked: #{user[:id]}" }

broker.publish(:user_created, { id: 123, email: "user@example.com" })
# => Welcome email sent to user@example.com
# => Analytics tracked: 123

The pattern applies across different scales, from in-process event handling to distributed message queues spanning multiple services. Each implementation shares the core principle of decoupled communication through named topics.

Key Principles

The publish-subscribe pattern operates on several fundamental principles that distinguish it from other messaging patterns. These principles govern how messages flow through the system and how components interact.

Decoupling: Publishers and subscribers remain independent. Publishers emit events without knowing which subscribers exist or how many will receive the message. Subscribers register for topics without knowing which publishers will produce messages. This separation allows components to evolve independently and reduces interdependencies across the system.

Topic-based routing: Messages travel through named channels called topics. Publishers address messages to topics rather than specific recipients. Subscribers express interest in topics rather than connecting to specific publishers. The broker uses topic names to determine which subscribers receive each message. Topic hierarchies enable pattern-based subscriptions where a single subscription can match multiple related topics.

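Asynchronous communication: Message delivery typically occurs asynchronously, especially when the broker is a separate process or queue. Publishers continue execution immediately after emitting a message without waiting for subscriber processing. Subscribers process messages when they arrive, independent of publisher timing. This temporal decoupling prevents publishers from blocking on slow subscribers and enables parallel message processing. Minimal in-process brokers, including the examples in this article, often call handlers synchronously inside publish; they keep the structural decoupling but not the temporal one.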

One-to-many delivery: A single published message reaches multiple subscribers. The broker duplicates messages for each matching subscription. This multicast behavior differs from point-to-point messaging where each message has exactly one recipient. The number of message copies depends on subscriber registrations at the time of publication.

Dynamic subscription: Subscribers can register and unregister from topics at runtime. New subscribers immediately begin receiving messages for their registered topics. Removed subscribers stop receiving messages. This dynamic behavior enables runtime system reconfiguration without restarting components or modifying publisher code.

Message immutability: Published messages typically remain unchanged during delivery. Each subscriber receives the same message content. This principle prevents subscribers from interfering with each other through message modification. Immutable messages also enable features like message replay and audit trails.
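
One way to enforce message immutability in plain Ruby is to freeze the message before delivery. The helper below is a minimal sketch: deep_freeze is a hypothetical name rather than a standard library method, and it only walks hashes and arrays.

```ruby
# Hypothetical helper: recursively freezes hashes, arrays, and their contents.
def deep_freeze(obj)
  case obj
  when Hash
    obj.each { |key, value| deep_freeze(key); deep_freeze(value) }
  when Array
    obj.each { |item| deep_freeze(item) }
  end
  obj.freeze
end

message = deep_freeze({ order_id: 456, items: [{ sku: "ABC" }] })

# A subscriber that tries to mutate the shared message gets a FrozenError.
mutation_blocked =
  begin
    message[:items] << { sku: "XYZ" }
    false
  rescue FrozenError
    true
  end

message.frozen?               # => true
message[:items].first.frozen? # => true
mutation_blocked              # => true
```

Freezing lets every subscriber share one object safely; copying per subscriber is the alternative when handlers need to annotate their own copy.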

The broker component centralizes routing logic and maintains subscription state. Brokers range from simple in-process objects to distributed message queues with persistence and delivery guarantees. The broker's responsibilities include:

  • Maintaining topic-to-subscriber mappings
  • Routing messages from publishers to matching subscribers
  • Managing subscriber lifecycles and handling unsubscriptions
  • Enforcing message delivery semantics (at-most-once, at-least-once, exactly-once)
  • Providing message persistence and replay capabilities in advanced implementations
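
Delivery semantics also shape subscriber code. Under at-least-once delivery the same message may arrive more than once, so subscribers are usually written to be idempotent. The sketch below assumes each message carries a unique :id field; that field and the IdempotentSubscriber class are illustrative, not part of any particular broker.

```ruby
require 'set'

# Hypothetical subscriber that ignores redelivered messages.
class IdempotentSubscriber
  attr_reader :total

  def initialize
    @seen_ids = Set.new
    @total = 0
  end

  def handle(message)
    # Set#add? returns nil when the id is already present, i.e. a duplicate.
    return unless @seen_ids.add?(message[:id])

    @total += message[:amount]
  end
end

sub = IdempotentSubscriber.new
sub.handle({ id: "m1", amount: 10 })
sub.handle({ id: "m1", amount: 10 }) # redelivery of the same message
sub.handle({ id: "m2", amount: 5 })

sub.total # => 15
```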

Message flow follows a consistent sequence: publishers emit messages to topics, the broker matches topics to subscriptions, the broker delivers messages to matched subscribers, and subscribers process messages independently. In an asynchronous broker, each step proceeds without blocking the others; in a synchronous in-process broker, the steps run back to back inside the publish call.

# Demonstrating core principles
class TopicBroker
  def initialize
    @topics = Hash.new { |h, k| h[k] = [] }
  end

  def subscribe(topic, subscriber)
    @topics[topic] << subscriber unless @topics[topic].include?(subscriber)
  end

  def unsubscribe(topic, subscriber)
    @topics[topic].delete(subscriber)
  end

  def publish(topic, message)
    @topics[topic].each do |subscriber|
      subscriber.handle(message.dup) # Shallow copy per subscriber; nested objects are still shared
    end
  end

  def subscriber_count(topic)
    @topics[topic].size
  end
end

class Subscriber
  attr_reader :received

  def initialize(name)
    @name = name
    @received = []
  end

  def handle(message)
    @received << message
    message[:processed_by] = @name # Isolated copy allows modification
  end
end

broker = TopicBroker.new
sub1 = Subscriber.new("Analytics")
sub2 = Subscriber.new("Logger")

broker.subscribe(:order_placed, sub1)
broker.subscribe(:order_placed, sub2)

broker.publish(:order_placed, { order_id: 456, total: 99.99 })
# => Both subscribers receive independent message copies

sub1.received.first[:processed_by]  # => "Analytics"
sub2.received.first[:processed_by]  # => "Logger"
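
The brokers above call handlers synchronously inside publish. To illustrate the asynchronous delivery principle, here is a minimal sketch that hands messages to a single worker thread through Ruby's built-in Queue. AsyncBroker and its shutdown method are hypothetical names, and the sketch assumes all subscriptions are registered before publishing starts, since the subscription hash is not synchronized.

```ruby
# Hypothetical AsyncBroker: publish enqueues, a worker thread delivers.
class AsyncBroker
  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
    @queue = Queue.new
    @worker = Thread.new { deliver_loop }
  end

  def subscribe(topic, &handler)
    @subscriptions[topic] << handler
  end

  # Returns immediately; handlers run later on the worker thread.
  def publish(topic, message)
    @queue << [topic, message]
    nil
  end

  # Stop accepting messages and wait until queued ones are delivered.
  def shutdown
    @queue.close
    @worker.join
  end

  private

  def deliver_loop
    # Queue#pop returns nil once the queue is closed and empty.
    while (item = @queue.pop)
      topic, message = item
      # fetch avoids creating hash entries from the worker thread.
      @subscriptions.fetch(topic, []).each { |handler| handler.call(message) }
    end
  end
end

broker = AsyncBroker.new
received = []
broker.subscribe(:order_placed) { |order| received << order[:order_id] }

broker.publish(:order_placed, { order_id: 456 })
broker.publish(:order_placed, { order_id: 457 })
broker.shutdown

received # => [456, 457]
```

A single worker preserves publication order; adding more workers would increase throughput at the cost of that ordering.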

Ruby Implementation

Ruby provides multiple approaches for implementing publish-subscribe patterns, from the built-in Observable module to custom implementations and third-party gems. Each approach offers different trade-offs between simplicity and features.

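Observable Module: Ruby's standard library includes an Observable module that implements a basic observer pattern. (In Ruby 3.4 and later, observer ships as a bundled gem, so projects that use Bundler need to list it in the Gemfile.) While technically observer rather than pure pub-sub, it demonstrates the core concepts: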

require 'observer'

class EventPublisher
  include Observable

  def emit_event(event_type, data)
    changed
    notify_observers(event_type, data)
  end
end

class EventSubscriber
  def initialize(name)
    @name = name
  end

  def update(event_type, data)
    puts "#{@name} received #{event_type}: #{data}"
  end
end

publisher = EventPublisher.new
subscriber1 = EventSubscriber.new("Subscriber A")
subscriber2 = EventSubscriber.new("Subscriber B")

publisher.add_observer(subscriber1)
publisher.add_observer(subscriber2)

publisher.emit_event(:user_login, { user_id: 789 })
# => Subscriber A received user_login: {:user_id=>789}
# => Subscriber B received user_login: {:user_id=>789}

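The Observable module has limitations: observers register directly with each observable object, so the two sides are not decoupled; it doesn't support topic-based routing; and by default it calls an update method on every observer (add_observer accepts a different method name as its second argument). For true pub-sub with topics, custom implementations work better.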

Custom Topic-Based Implementation: A topic-based broker implementation provides more flexibility:

class PubSubBroker
  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
    @subscription_ids = {}
  end

  def subscribe(topic, &block)
    subscription_id = generate_id
    @subscriptions[topic] << { id: subscription_id, handler: block }
    @subscription_ids[subscription_id] = topic
    subscription_id
  end

  def unsubscribe(subscription_id)
    topic = @subscription_ids.delete(subscription_id)
    return unless topic

    @subscriptions[topic].reject! { |sub| sub[:id] == subscription_id }
  end

  def publish(topic, event)
    @subscriptions[topic].each do |subscription|
      subscription[:handler].call(event)
    end
  end

  def clear_topic(topic)
    @subscriptions[topic].each { |sub| @subscription_ids.delete(sub[:id]) }
    @subscriptions.delete(topic)
  end

  private

  def generate_id
    # Per-broker counter: unlike timestamp + rand, it cannot collide
    @next_id = @next_id.to_i + 1
    "sub_#{@next_id}"
  end
end

broker = PubSubBroker.new

# Subscribe to different topics
analytics_sub = broker.subscribe(:page_view) do |event|
  puts "Analytics: Page #{event[:page]} viewed"
end

logger_sub = broker.subscribe(:page_view) do |event|
  puts "Logger: [#{Time.now}] #{event[:page]}"
end

error_sub = broker.subscribe(:error) do |event|
  puts "Error handler: #{event[:message]}"
end

broker.publish(:page_view, { page: "/dashboard", user_id: 123 })
# => Analytics: Page /dashboard viewed
# => Logger: [2025-01-15 10:30:00 -0500] /dashboard

broker.publish(:error, { message: "Database timeout" })
# => Error handler: Database timeout

broker.unsubscribe(logger_sub)
broker.publish(:page_view, { page: "/profile", user_id: 123 })
# => Analytics: Page /profile viewed
# (Logger no longer receives events)

Wildcard Topic Matching: Advanced implementations support pattern-based topic subscriptions:

class PatternBroker
  def initialize
    @subscriptions = []
  end

  def subscribe(pattern, &block)
    subscription_id = generate_id
    regex = pattern_to_regex(pattern)
    @subscriptions << { id: subscription_id, pattern: regex, handler: block }
    subscription_id
  end

  def publish(topic, event)
    @subscriptions.each do |sub|
      sub[:handler].call(topic, event) if sub[:pattern].match?(topic)
    end
  end

  def unsubscribe(subscription_id)
    @subscriptions.reject! { |sub| sub[:id] == subscription_id }
  end

  private

  def pattern_to_regex(pattern)
    # Convert wildcard pattern to regex
    # * matches single segment, ** matches multiple segments
    escaped = Regexp.escape(pattern)
    regex_pattern = escaped.gsub('\*\*', '.*').gsub('\*', '[^.]+')
    Regexp.new("^#{regex_pattern}$")
  end

  def generate_id
    # Per-broker counter: unlike timestamp + rand, it cannot collide
    @next_id = @next_id.to_i + 1
    "sub_#{@next_id}"
  end
end

broker = PatternBroker.new

# Subscribe to patterns
# "user.**" spans any number of segments; "user.*" would match only one
broker.subscribe("user.**") do |topic, event|
  puts "User event on #{topic}: #{event[:action]}"
end

broker.subscribe("user.*.created") do |topic, event|
  puts "Creation event: #{event[:entity]}"
end

broker.subscribe("**") do |topic, event|
  puts "Audit: #{topic}"
end

broker.publish("user.account.created", { entity: "Account", action: "create" })
# => User event on user.account.created: create
# => Creation event: Account
# => Audit: user.account.created

broker.publish("order.placed", { order_id: 999 })
# => Audit: order.placed
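
The difference between the two wildcards is easy to miss, so the sketch below reuses the conversion logic from PatternBroker as a standalone method and checks a few topics against it. The method is lifted out of the class purely for illustration.

```ruby
# Same conversion as PatternBroker#pattern_to_regex, extracted for experimentation.
def pattern_to_regex(pattern)
  escaped = Regexp.escape(pattern)
  # Replace the escaped "**" first so the single "*" rule does not consume it.
  Regexp.new("^#{escaped.gsub('\*\*', '.*').gsub('\*', '[^.]+')}$")
end

single = pattern_to_regex("user.*")
multi  = pattern_to_regex("user.**")

single.match?("user.created")         # => true
single.match?("user.account.created") # => false (two segments after "user")
multi.match?("user.account.created")  # => true
single.match?("userXcreated")         # => false (the dot is matched literally)
```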

Thread-Safe Implementation: For concurrent access, thread safety becomes critical:

require 'monitor'

class ThreadSafeBroker
  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
    @monitor = Monitor.new
  end

  def subscribe(topic, &block)
    @monitor.synchronize do
      subscription_id = generate_id
      @subscriptions[topic] << { id: subscription_id, handler: block }
      subscription_id
    end
  end

  def publish(topic, event)
    handlers = @monitor.synchronize do
      @subscriptions[topic].map { |sub| sub[:handler] }
    end

    # Execute handlers outside the lock to prevent deadlock
    handlers.each { |handler| handler.call(event) }
  end

  def unsubscribe(subscription_id)
    @monitor.synchronize do
      @subscriptions.each do |topic, subs|
        subs.reject! { |sub| sub[:id] == subscription_id }
      end
    end
  end

  private

  def generate_id
    # Called from inside synchronize, so the counter update is thread-safe
    @next_id = @next_id.to_i + 1
    "sub_#{@next_id}"
  end
end
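
To see the snapshot-then-call approach under load, the following sketch publishes from several threads at once. SnapshotBroker is a trimmed-down, hypothetical variant of the broker above (no subscription IDs), and the handler guards its own shared state with a separate Mutex, because the broker deliberately does not hold its lock while handlers run.

```ruby
require 'monitor'

# Hypothetical trimmed-down broker: copy the handler list under the lock,
# then invoke handlers outside it.
class SnapshotBroker
  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
    @monitor = Monitor.new
  end

  def subscribe(topic, &block)
    @monitor.synchronize { @subscriptions[topic] << block }
  end

  def publish(topic, event)
    handlers = @monitor.synchronize { @subscriptions[topic].dup }
    handlers.each { |handler| handler.call(event) }
  end
end

broker = SnapshotBroker.new
counter = 0
counter_lock = Mutex.new

# Handlers run on the publishing thread, so they protect their own state.
broker.subscribe(:tick) { |n| counter_lock.synchronize { counter += n } }

threads = 8.times.map do
  Thread.new { 100.times { broker.publish(:tick, 1) } }
end
threads.each(&:join)

counter # => 800
```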

Wisper Gem: The Wisper gem provides a production-ready pub-sub implementation:

# gem install wisper
require 'wisper'
require 'ostruct' # OpenStruct is used for the sample order below

class OrderProcessor
  include Wisper::Publisher

  def process_order(order)
    # Business logic
    validate_order(order)
    charge_payment(order)

    # Publish events
    broadcast(:order_processed, order)
    broadcast(:inventory_updated, order.items)
  end

  private

  def validate_order(order)
    # Validation logic
  end

  def charge_payment(order)
    # Payment logic
  end
end

class EmailNotifier
  def order_processed(order)
    puts "Sending confirmation email for order #{order.id}"
  end
end

class InventoryManager
  def inventory_updated(items)
    puts "Updating inventory for #{items.size} items"
  end
end

processor = OrderProcessor.new
processor.subscribe(EmailNotifier.new)
processor.subscribe(InventoryManager.new)

order = OpenStruct.new(id: 123, items: [{sku: "ABC", qty: 2}])
processor.process_order(order)
# => Sending confirmation email for order 123
# => Updating inventory for 1 items

Practical Examples

The publish-subscribe pattern applies to numerous real-world scenarios across different application types. These examples demonstrate how the pattern solves specific architectural challenges.

E-commerce Order Processing: Order placement triggers multiple independent operations:

class OrderBroker
  def initialize
    @handlers = Hash.new { |h, k| h[k] = [] }
  end

  def on(event, &block)
    @handlers[event] << block
  end

  def emit(event, data)
    @handlers[event].each { |handler| handler.call(data) }
  end
end

class OrderService
  attr_reader :broker

  def initialize(broker)
    @broker = broker
  end

  def place_order(order_data)
    order = create_order(order_data)
    @broker.emit(:order_placed, order)
    order
  end

  private

  def create_order(data)
    { id: rand(10000), items: data[:items], total: data[:total], customer_id: data[:customer_id] }
  end
end

class PaymentProcessor
  def initialize(broker)
    broker.on(:order_placed) { |order| process_payment(order) }
  end

  def process_payment(order)
    puts "Processing payment of $#{order[:total]} for order #{order[:id]}"
    # Payment gateway integration
  end
end

class InventoryService
  def initialize(broker)
    broker.on(:order_placed) { |order| update_inventory(order) }
  end

  def update_inventory(order)
    puts "Reserving inventory for order #{order[:id]}: #{order[:items].size} items"
    # Inventory database update
  end
end

class NotificationService
  def initialize(broker)
    broker.on(:order_placed) { |order| send_confirmation(order) }
  end

  def send_confirmation(order)
    puts "Sending order confirmation for #{order[:id]} to customer #{order[:customer_id]}"
    # Email/SMS service integration
  end
end

class AnalyticsTracker
  def initialize(broker)
    broker.on(:order_placed) { |order| track_order(order) }
  end

  def track_order(order)
    puts "Recording analytics: order_value=$#{order[:total]}, items=#{order[:items].size}"
    # Analytics platform API call
  end
end

# Wire up the system
broker = OrderBroker.new
order_service = OrderService.new(broker)
payment = PaymentProcessor.new(broker)
inventory = InventoryService.new(broker)
notifications = NotificationService.new(broker)
analytics = AnalyticsTracker.new(broker)

# Place an order
order = order_service.place_order(
  items: [{ sku: "WIDGET-1", quantity: 2, price: 29.99 }],
  total: 59.98,
  customer_id: 456
)
# => Processing payment of $59.98 for order 7834
# => Reserving inventory for order 7834: 1 items
# => Sending order confirmation for 7834 to customer 456
# => Recording analytics: order_value=$59.98, items=1

This architecture enables adding new order-related functionality without modifying the OrderService. New services simply subscribe to the order_placed event.

Real-time Dashboard Updates: Applications with multiple dashboard widgets subscribe to relevant data changes:

class DataBroker
  def initialize
    @topics = Hash.new { |h, k| h[k] = [] }
    @client_connections = {}
  end

  def connect_client(client_id)
    @client_connections[client_id] = []
    client_id
  end

  def subscribe_client(client_id, topic, &callback)
    subscription = { client: client_id, callback: callback }
    @topics[topic] << subscription
    # Register the client on first use so callers need not call connect_client first
    (@client_connections[client_id] ||= []) << { topic: topic, subscription: subscription }
  end

  def disconnect_client(client_id)
    @client_connections[client_id]&.each do |sub_info|
      @topics[sub_info[:topic]].delete(sub_info[:subscription])
    end
    @client_connections.delete(client_id)
  end

  def publish(topic, data)
    @topics[topic].each do |subscription|
      subscription[:callback].call(data)
    end
  end
end

class DashboardWidget
  def initialize(id, broker, topics)
    @id = id
    @broker = broker
    @data = {}
    
    topics.each do |topic|
      @broker.subscribe_client(@id, topic) { |data| update(topic, data) }
    end
  end

  def update(topic, data)
    @data[topic] = data
    puts "Widget #{@id} updated #{topic}: #{data}"
  end

  def current_data
    @data
  end
end

# Simulation
broker = DataBroker.new

# Different widgets subscribe to different metrics
sales_widget = DashboardWidget.new("sales_01", broker, ["sales.total", "sales.today"])
traffic_widget = DashboardWidget.new("traffic_01", broker, ["traffic.visitors", "traffic.pageviews"])
system_widget = DashboardWidget.new("system_01", broker, ["system.cpu", "system.memory"])

# Data sources publish updates
broker.publish("sales.total", { amount: 125430, currency: "USD" })
broker.publish("sales.today", { amount: 8450, currency: "USD" })
broker.publish("traffic.visitors", { count: 1547, unique: 1203 })
broker.publish("system.cpu", { usage: 45.3, cores: 8 })

# => Widget sales_01 updated sales.total: {:amount=>125430, :currency=>"USD"}
# => Widget sales_01 updated sales.today: {:amount=>8450, :currency=>"USD"}
# => Widget traffic_01 updated traffic.visitors: {:count=>1547, :unique=>1203}
# => Widget system_01 updated system.cpu: {:usage=>45.3, :cores=>8}

Plugin System: Applications with plugin architectures use pub-sub to allow plugins to react to core events:

class PluginBroker
  def initialize
    @hooks = Hash.new { |h, k| h[k] = [] }
    @plugins = []
  end

  def register_plugin(plugin)
    @plugins << plugin
    plugin.initialize_hooks(self)
  end

  def register_hook(event, priority: 50, &callback)
    @hooks[event] << { priority: priority, callback: callback }
    @hooks[event].sort_by! { |hook| -hook[:priority] }
  end

  def trigger(event, context = {})
    results = []
    @hooks[event].each do |hook|
      result = hook[:callback].call(context)
      results << result
      break if result == :stop
    end
    results
  end
end

class Plugin
  def initialize(name)
    @name = name
  end

  def initialize_hooks(broker)
    # Override in subclasses
  end
end

class SecurityPlugin < Plugin
  def initialize_hooks(broker)
    broker.register_hook(:before_request, priority: 100) do |context|
      validate_token(context[:request])
    end

    broker.register_hook(:after_response) do |context|
      log_access(context[:request], context[:response])
    end
  end

  private

  def validate_token(request)
    if request[:token].nil? || request[:token].empty?
      puts "Security: Rejected request without token"
      :stop
    else
      puts "Security: Token validated"
      :continue
    end
  end

  def log_access(request, response)
    puts "Security: Logged access to #{request[:path]} (#{response[:status]})"
  end
end

class CachingPlugin < Plugin
  def initialize_hooks(broker)
    @cache = {}

    broker.register_hook(:before_request, priority: 75) do |context|
      check_cache(context)
    end

    broker.register_hook(:after_response, priority: 90) do |context|
      store_cache(context)
    end
  end

  private

  def check_cache(context)
    cached = @cache[context[:request][:path]]
    if cached
      puts "Cache: Hit for #{context[:request][:path]}"
      context[:response] = cached
      :stop
    else
      puts "Cache: Miss for #{context[:request][:path]}"
      :continue
    end
  end

  def store_cache(context)
    @cache[context[:request][:path]] = context[:response]
    puts "Cache: Stored #{context[:request][:path]}"
  end
end

# Application setup
broker = PluginBroker.new
broker.register_plugin(SecurityPlugin.new("security"))
broker.register_plugin(CachingPlugin.new("cache"))

# Simulate request processing
def process_request(broker, request)
  context = { request: request, response: nil }
  
  result = broker.trigger(:before_request, context)
  return context[:response] if result.include?(:stop)

  context[:response] = { status: 200, body: "Data for #{request[:path]}" }
  broker.trigger(:after_response, context)
  
  context[:response]
end

# First request
process_request(broker, { path: "/api/users", token: "abc123" })
# => Security: Token validated
# => Cache: Miss for /api/users
# => Cache: Stored /api/users
# => Security: Logged access to /api/users (200)

# Second request (cached)
process_request(broker, { path: "/api/users", token: "abc123" })
# => Security: Token validated
# => Cache: Hit for /api/users

# Unauthorized request
process_request(broker, { path: "/api/admin", token: nil })
# => Security: Rejected request without token

Design Considerations

Selecting publish-subscribe requires evaluating trade-offs between decoupling benefits and complexity costs. Several factors influence whether pub-sub fits a specific architectural scenario.

Decoupling vs. Visibility: Publish-subscribe maximizes decoupling by removing direct dependencies between publishers and subscribers. Publishers emit events without knowledge of downstream consumers, enabling independent evolution of components. However, this decoupling reduces visibility into system behavior. Tracing event flows becomes harder when connections exist implicitly through topics rather than explicitly through method calls. The trade-off becomes significant in systems where understanding causality matters for debugging or compliance.

Direct method calls provide clear execution paths visible in stack traces. Pub-sub distributes execution across multiple independent handlers, making it harder to understand what happens when an event fires. This characteristic makes pub-sub less appropriate for core business logic where explicit control flow aids comprehension.

Temporal Coupling: Synchronous communication couples sender and receiver temporally: the sender waits for the receiver to complete. Pub-sub breaks temporal coupling when delivery is asynchronous, which is the norm for broker-backed systems. Publishers then continue execution immediately after emitting events without waiting for subscriber processing.

This asynchrony benefits performance when subscriber processing involves slow operations like network calls or file I/O. The publisher avoids blocking on these operations. However, asynchrony complicates error handling and transaction management. Publishers cannot directly handle exceptions from subscribers or coordinate transactional boundaries across event handlers.

Failure Handling Complexity: When a subscriber fails, the publisher typically remains unaware. This independence means publishers cannot retry failed operations or take corrective action. Applications requiring reliable delivery need additional infrastructure: message queues with persistence, retry mechanisms, dead letter queues for failed messages, and monitoring to detect processing failures.

Simple in-process pub-sub implementations often use fire-and-forget semantics where delivery failures go unnoticed. This approach works for non-critical events like analytics tracking but fails for events requiring guaranteed processing like payment confirmations.
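
In a naive synchronous broker, an exception raised by one handler propagates to the publisher and prevents later handlers from running. A minimal sketch of isolating failures, assuming a hypothetical IsolatingBroker that records errors instead of re-raising them:

```ruby
# Hypothetical broker that keeps one failing subscriber from affecting others.
class IsolatingBroker
  attr_reader :failures

  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
    @failures = []
  end

  def subscribe(topic, &handler)
    @subscriptions[topic] << handler
  end

  def publish(topic, event)
    @subscriptions[topic].each do |handler|
      begin
        handler.call(event)
      rescue StandardError => e
        # Record the failure and continue with the remaining handlers.
        @failures << { topic: topic, event: event, error: e }
      end
    end
  end
end

broker = IsolatingBroker.new
delivered = []

broker.subscribe(:payment) { |_event| raise ArgumentError, "gateway down" }
broker.subscribe(:payment) { |event| delivered << event[:id] }

broker.publish(:payment, { id: 1 })

delivered                             # => [1]
broker.failures.size                  # => 1
broker.failures.first[:error].message # => "gateway down"
```

Recording failures makes them observable, but it does not make delivery reliable; critical events still need retries or persistence.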

Ordering Guarantees: Pub-sub implementations vary in message ordering guarantees. Simple implementations deliver messages to subscribers in publication order within a single topic. However, when multiple publishers emit to the same topic concurrently, message ordering becomes undefined. Distributed pub-sub systems like message queues may deliver messages out of order due to network latency, partitioning, or retry logic.

Applications requiring strict ordering need either to serialize all publications to a topic or implement application-level sequence numbers and reordering logic in subscribers. This complexity makes pub-sub less suitable for scenarios where event sequence matters critically.
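
The sequence-number approach can be sketched as a subscriber that buffers early arrivals until the gap is filled. The :seq and :payload fields and the SequencedSubscriber class are assumptions made for this illustration; a real system would also need a policy for messages that never arrive.

```ruby
# Hypothetical subscriber that processes messages strictly in :seq order.
class SequencedSubscriber
  attr_reader :processed

  def initialize
    @next_seq = 1   # sequence number expected next
    @pending = {}   # out-of-order messages keyed by sequence number
    @processed = []
  end

  def handle(message)
    @pending[message[:seq]] = message

    # Drain every buffered message that is now contiguous with the stream.
    while (ready = @pending.delete(@next_seq))
      @processed << ready[:payload]
      @next_seq += 1
    end
  end
end

sub = SequencedSubscriber.new
[3, 1, 2].each { |seq| sub.handle({ seq: seq, payload: "event-#{seq}" }) }

sub.processed # => ["event-1", "event-2", "event-3"]
```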

Testing Complexity: Direct method calls enable straightforward testing through mocking or stubbing dependencies. Pub-sub testing requires setting up brokers, registering test subscribers, and verifying asynchronous event delivery. Integration tests become more complex when multiple subscribers interact through events.

Testing individual publishers and subscribers in isolation remains straightforward. Testing the complete system behavior requires infrastructure to trigger events and verify subscriber actions. This overhead makes pub-sub less attractive for simple scenarios where a few direct method calls would suffice.
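
Testing a publisher in isolation can be as simple as passing in a stand-in broker that records what was published. The RecordingBroker and SignupService classes below are hypothetical and exist only to show the shape of such a test.

```ruby
# Hypothetical test double: records publications instead of delivering them.
class RecordingBroker
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(topic, payload)
    @published << [topic, payload]
  end
end

# Hypothetical publisher under test.
class SignupService
  def initialize(broker)
    @broker = broker
  end

  def register(email)
    user = { email: email }
    @broker.publish(:user_created, user)
    user
  end
end

broker = RecordingBroker.new
SignupService.new(broker).register("user@example.com")

broker.published # => [[:user_created, { email: "user@example.com" }]]
```

Because the publisher depends only on an object that responds to publish, no real subscribers or threads are needed to verify what it emits.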

When Pub-Sub Fits:

The pattern suits scenarios with these characteristics:

  • Multiple components need to react to the same event
  • Components should evolve independently without coordinating changes
  • Event producers should not depend on consumers
  • The system benefits from runtime composition of behaviors
  • Asynchronous processing improves performance
  • Component reuse across different contexts matters

When Alternatives Work Better:

Direct method calls or dependency injection work better when:

  • Event flows require clear visibility for debugging
  • Operations need synchronous error handling
  • Transaction boundaries span event handlers
  • Strict message ordering matters
  • Testing complexity outweighs decoupling benefits
  • The system has simple, stable component relationships

Design Patterns Comparison:

# Direct method call - tight coupling, clear flow
class OrderProcessor
  def initialize(payment_processor, inventory_manager)
    @payment_processor = payment_processor
    @inventory_manager = inventory_manager
  end

  def process(order)
    @payment_processor.charge(order)
    @inventory_manager.reserve(order.items)
  end
end

# Pub-sub - loose coupling, implicit flow
class OrderProcessor
  def initialize(event_bus)
    @event_bus = event_bus
  end

  def process(order)
    @event_bus.publish(:order_placed, order)
  end
end

The direct method call version makes dependencies explicit and execution flow clear. The pub-sub version decouples OrderProcessor from downstream services but obscures what happens after order placement.

Hybrid Approaches: Many systems combine patterns, using pub-sub for optional cross-cutting concerns while keeping core business logic in direct method calls:

class OrderProcessor
  def initialize(payment_processor, inventory_manager, event_bus)
    @payment_processor = payment_processor
    @inventory_manager = inventory_manager
    @event_bus = event_bus
  end

  def process(order)
    # Critical path uses direct calls
    @payment_processor.charge(order)
    @inventory_manager.reserve(order.items)
    
    # Optional features use pub-sub
    @event_bus.publish(:order_placed, order)
    
    order
  end
end

This hybrid keeps payment and inventory operations explicit while allowing optional subscribers like analytics or notifications to attach through events.

Common Patterns

Several established patterns build on the publish-subscribe foundation to solve recurring architectural challenges. These patterns extend basic pub-sub with additional capabilities or constraints.

Event Sourcing Integration: Event sourcing stores system state as a sequence of events. Combining event sourcing with pub-sub creates systems where the event store acts as the ultimate source of truth, and pub-sub broadcasts state changes to interested parties:

class EventStore
  def initialize(broker)
    @events = []
    @broker = broker
  end

  def append(event_type, data)
    event = {
      id: generate_id,
      type: event_type,
      data: data,
      timestamp: Time.now,
      version: @events.size + 1
    }
    
    @events << event
    @broker.publish(event_type, event)
    event
  end

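  # Re-publishes stored events through the broker. Every current subscriber
  # receives them again, so replay only into freshly created projections.
  # from_version is a zero-based offset into the event log.
  def replay(from_version: 0)
    @events.drop(from_version).each do |event|
      @broker.publish(event[:type], event)
    end
  end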

  def events_of_type(type)
    @events.select { |e| e[:type] == type }
  end

  private

  def generate_id
    "evt_#{Time.now.to_i}_#{rand(10000)}"
  end
end

class AccountProjection
  attr_reader :balance

  def initialize(broker, event_store)
    @balance = 0
    @broker = broker
    
    broker.subscribe(:account_credited) { |event| credit(event) }
    broker.subscribe(:account_debited) { |event| debit(event) }
    
    # Rebuild state from event store
    event_store.replay
  end

  private

  def credit(event)
    @balance += event[:data][:amount]
  end

  def debit(event)
    @balance -= event[:data][:amount]
  end
end

broker = EventBroker.new
store = EventStore.new(broker)

# Create projection
account = AccountProjection.new(broker, store)

# Append events
store.append(:account_credited, { amount: 100 })
store.append(:account_debited, { amount: 30 })

account.balance  # => 70
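
Rebuilding state does not have to go through the broker. As a minimal sketch, a projection can also be computed by folding over the stored events directly, which avoids re-notifying other subscribers. The event hashes below mirror the shape used by EventStore; the sample data is made up for illustration.

```ruby
# Sample event log in the same shape EventStore produces (illustrative data).
events = [
  { type: :account_credited, data: { amount: 100 } },
  { type: :account_debited,  data: { amount: 30 } },
  { type: :account_credited, data: { amount: 5 } }
]

# Fold the log into a balance without publishing anything.
balance = events.reduce(0) do |sum, event|
  case event[:type]
  when :account_credited then sum + event[:data][:amount]
  when :account_debited  then sum - event[:data][:amount]
  else sum # ignore event types this projection does not care about
  end
end

balance # => 75
```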

Request-Reply Pattern: While pub-sub typically operates one-way, request-reply adds response capabilities using correlation IDs and reply topics:

class RequestReplyBroker
  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
    @pending_requests = {}
  end

  def subscribe(topic, &handler)
    @subscriptions[topic] << handler
  end

  def request(topic, request_data, timeout: 5)
    correlation_id = generate_id
    reply_topic = "reply_#{correlation_id}"
    
    # Setup reply handler
    promise = nil
    subscribe(reply_topic) do |response|
      promise = response
    end

    # Send request
    request = request_data.merge(
      correlation_id: correlation_id,
      reply_to: reply_topic
    )
    publish(topic, request)

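    # Wait for reply. Handlers run synchronously here, so the reply has
    # usually arrived already; the loop only matters for async handlers.
    start_time = Time.now
    until promise || (Time.now - start_time > timeout)
      sleep 0.01
    end

    # Remove the one-off reply subscription so it does not accumulate
    @subscriptions.delete(reply_topic)

    raise "Request timeout" unless promise
    promise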
  end

  def reply(request, response_data)
    publish(request[:reply_to], response_data)
  end

  def publish(topic, data)
    @subscriptions[topic].each { |handler| handler.call(data) }
  end

  private

  def generate_id
    "req_#{Time.now.to_i}_#{rand(10000)}"
  end
end

broker = RequestReplyBroker.new

# Service handles requests
broker.subscribe(:user_query) do |request|
  user = { id: request[:user_id], name: "User#{request[:user_id]}", email: "user#{request[:user_id]}@example.com" }
  broker.reply(request, user)
end

# Client makes request
response = broker.request(:user_query, { user_id: 123 })
# => {:id=>123, :name=>"User123", :email=>"user123@example.com"}

Priority-Based Delivery: Some events require processing before others. Priority-based pub-sub enables ordered handler execution:

class PriorityBroker
  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
  end

  def subscribe(topic, priority: 0, &handler)
    @subscriptions[topic] << { priority: priority, handler: handler }
    @subscriptions[topic].sort_by! { |sub| -sub[:priority] }
  end

  def publish(topic, event)
    @subscriptions[topic].each do |sub|
      sub[:handler].call(event)
    end
  end
end

broker = PriorityBroker.new

broker.subscribe(:request, priority: 100) { |e| puts "Authentication" }
broker.subscribe(:request, priority: 75) { |e| puts "Rate limiting" }
broker.subscribe(:request, priority: 50) { |e| puts "Logging" }
broker.subscribe(:request, priority: 0) { |e| puts "Request processing" }

broker.publish(:request, { path: "/api/data" })
# => Authentication
# => Rate limiting
# => Logging
# => Request processing

Filtered Subscriptions: Subscribers may want only specific events within a topic based on event attributes:

class FilteringBroker
  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
  end

  def subscribe(topic, filter: nil, &handler)
    @subscriptions[topic] << {
      filter: filter || proc { true },
      handler: handler
    }
  end

  def publish(topic, event)
    @subscriptions[topic].each do |sub|
      sub[:handler].call(event) if sub[:filter].call(event)
    end
  end
end

broker = FilteringBroker.new

# Subscribe with filters
broker.subscribe(:order, filter: ->(e) { e[:total] > 100 }) do |order|
  puts "High-value order: $#{order[:total]}"
end

broker.subscribe(:order, filter: ->(e) { e[:country] == "US" }) do |order|
  puts "US order: #{order[:id]}"
end

broker.subscribe(:order) do |order|
  puts "All orders: #{order[:id]}"
end

broker.publish(:order, { id: 1, total: 150, country: "US" })
# => High-value order: $150
# => US order: 1
# => All orders: 1

broker.publish(:order, { id: 2, total: 50, country: "US" })
# => US order: 2
# => All orders: 2

Dead Letter Channel: Failed message processing needs special handling to prevent message loss:

class ResilientBroker
  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
    @dead_letters = []
    @max_retries = 3
  end

  def subscribe(topic, &handler)
    @subscriptions[topic] << handler
  end

  def publish(topic, event)
    @subscriptions[topic].each do |handler|
      attempt = 0
      begin
        attempt += 1
        handler.call(event)
      rescue StandardError => e
        if attempt < @max_retries
          retry
        else
          handle_dead_letter(topic, event, e)
        end
      end
    end
  end

  attr_reader :dead_letters

  private

  def handle_dead_letter(topic, event, error)
    @dead_letters << {
      topic: topic,
      event: event,
      error: error.message,
      timestamp: Time.now
    }
    puts "Dead letter: #{topic} - #{error.message}"
  end
end

broker = ResilientBroker.new

broker.subscribe(:data_process) do |event|
  raise "Processing failed" if event[:corrupt]
  puts "Processed: #{event[:id]}"
end

broker.publish(:data_process, { id: 1, corrupt: false })
# => Processed: 1

broker.publish(:data_process, { id: 2, corrupt: true })
# => Dead letter: data_process - Processing failed

broker.dead_letters.size  # => 1

Implementation Approaches

Different architectural contexts demand different pub-sub implementation strategies. The choice depends on scale, reliability requirements, and system distribution.

In-Process Event Bus: Single-process applications use in-memory brokers for simple, fast event distribution within an application boundary. This approach minimizes latency and eliminates external dependencies but limits scalability to single-process execution.

In-process implementations work well for desktop applications, command-line tools, or monolithic web applications where all components run in the same process. Event delivery reduces to ordinary method calls behind a pub-sub interface. The broker keeps subscriber lists in memory and invokes handlers either synchronously on the publisher's thread or asynchronously on worker threads, in which case the subscriber lists and job queues must be thread-safe.

Performance characteristics favor in-process pub-sub when message volume stays moderate and event processing completes quickly. Synchronous handler execution blocks publishers until all subscribers complete, creating backpressure when subscribers perform slow operations. Asynchronous execution using thread pools or actor models prevents blocking but adds complexity in error handling and testing.
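The asynchronous variant can be sketched with Ruby's built-in Queue and a few worker threads. AsyncBroker, its workers parameter, and the errors queue are illustrative names rather than part of any library, and the sketch assumes all subscriptions are registered before publishing starts:

```ruby
# Hypothetical AsyncBroker: handlers run on worker threads, not on the publisher's thread.
class AsyncBroker
  attr_reader :errors

  def initialize(workers: 2)
    @subscriptions = Hash.new { |h, k| h[k] = [] }
    @jobs = Queue.new     # thread-safe FIFO of [handler, event] pairs
    @errors = Queue.new   # handler failures are collected instead of raised
    @threads = Array.new(workers) { Thread.new { work_loop } }
  end

  def subscribe(topic, &handler)
    @subscriptions[topic] << handler
  end

  # Returns as soon as one job per subscriber has been enqueued.
  def publish(topic, event)
    @subscriptions[topic].each { |handler| @jobs << [handler, event] }
  end

  # Lets the workers finish the queued jobs, then stops them.
  def shutdown
    @threads.size.times { @jobs << :stop }
    @threads.each(&:join)
  end

  private

  def work_loop
    while (job = @jobs.pop) != :stop
      handler, event = job
      begin
        handler.call(event)
      rescue StandardError => e
        @errors << e
      end
    end
  end
end

broker = AsyncBroker.new
results = Queue.new
broker.subscribe(:report) { |e| sleep 0.05; results << e[:id] }

broker.publish(:report, { id: 1 })  # returns before the handler finishes
broker.shutdown                     # waits for queued work
results.size  # => 1
```

Collecting exceptions in a queue is one way to address the error-handling difficulty mentioned above: a failing handler cannot raise into the publisher, so failures have to be surfaced somewhere else.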

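Message Queue Integration: Distributed systems requiring reliable, persistent message delivery integrate with messaging systems such as RabbitMQ, Redis, or Amazon SQS. Depending on configuration, these systems provide durability, delivery guarantees, and multi-consumer distribution across process boundaries. The guarantees differ by product and mode: Redis's classic PUBLISH/SUBSCRIBE commands, for example, are fire-and-forget and drop messages for disconnected subscribers, while Redis Streams retain them.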

Message queues decouple publishers and subscribers in both time and space: they can run on different hosts, start and stop independently, and process messages at different rates. When queues and messages are configured as durable, the broker persists messages to disk, so delivery survives a subscriber crash or temporary disconnect.

This approach introduces operational complexity through queue server deployment and monitoring. Message serialization and network hops increase latency compared to in-process delivery. In distributed architectures that require guaranteed message delivery, the reliability and scalability benefits usually outweigh these costs.
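The temporal decoupling and acknowledgement behavior can be illustrated without a real queue server. The following is a minimal in-memory sketch; BufferedQueue is a hypothetical class, and an Array stands in for the on-disk storage a real broker would use:

```ruby
# Hypothetical in-memory stand-in for a durable queue.
class BufferedQueue
  def initialize
    @pending = Hash.new { |h, k| h[k] = [] }  # topic => messages awaiting delivery
  end

  # Publishing succeeds even when no consumer is connected.
  def publish(topic, message)
    @pending[topic] << message
  end

  # Delivers buffered messages in order and returns how many were acknowledged.
  # A message is removed only after the block returns without raising; on
  # failure it stays at the head of the queue for a later attempt.
  def consume(topic)
    delivered = 0
    until @pending[topic].empty?
      message = @pending[topic].first
      begin
        yield message
      rescue StandardError
        break
      end
      @pending[topic].shift
      delivered += 1
    end
    delivered
  end

  def depth(topic)
    @pending[topic].size
  end
end

queue = BufferedQueue.new
queue.publish(:orders, { id: 1 })  # no consumer is running yet
queue.publish(:orders, { id: 2 })
queue.depth(:orders)  # => 2

queue.consume(:orders) { |m| puts "Handling order #{m[:id]}" }
# => Handling order 1
# => Handling order 2
```

Removing a message only after the handler succeeds gives at-least-once behavior: a crash between handling and removal leads to redelivery rather than loss.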

HTTP Webhooks: Systems exposing events to external parties or third-party integrations often implement pub-sub through HTTP webhooks. Publishers POST event data to subscriber-provided HTTP endpoints when events occur.

Webhooks work across organizational boundaries where subscribers control their infrastructure independently. The pattern enables event-driven integration between services without shared message queue infrastructure. However, webhooks require retry logic, failure handling, and subscriber endpoint validation to handle network issues and ensure reliable delivery.
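A webhook publisher with retry and exponential backoff might look like the sketch below. WebhookPublisher and its parameters are hypothetical; the transport is injectable so that the retry logic can be exercised without a network, and the default transport uses Net::HTTP.post from the standard library:

```ruby
require "net/http"
require "json"
require "uri"

# Hypothetical webhook publisher: POSTs each event to subscriber-provided URLs.
class WebhookPublisher
  # Default transport: returns the HTTP status code as an Integer.
  DEFAULT_TRANSPORT = lambda do |url, body|
    response = Net::HTTP.post(URI(url), body, { "Content-Type" => "application/json" })
    response.code.to_i
  end

  def initialize(max_attempts: 3, base_delay: 0.5, transport: DEFAULT_TRANSPORT)
    @endpoints = Hash.new { |h, k| h[k] = [] }
    @max_attempts = max_attempts
    @base_delay = base_delay
    @transport = transport
  end

  def subscribe(topic, url)
    @endpoints[topic] << url
  end

  # Returns { url => true/false } indicating whether each delivery succeeded.
  def publish(topic, event)
    body = JSON.generate(topic: topic, data: event)
    @endpoints[topic].to_h { |url| [url, deliver(url, body)] }
  end

  private

  def deliver(url, body)
    @max_attempts.times do |attempt|
      begin
        status = @transport.call(url, body)
        return true if status.between?(200, 299)
      rescue StandardError
        # Network errors count as a failed attempt.
      end
      # Exponential backoff between attempts: base, 2x base, 4x base, ...
      sleep(@base_delay * (2**attempt)) if attempt < @max_attempts - 1
    end
    false
  end
end
```

Deliveries that still fail after the last attempt are only reported as false here; a production system would typically record them for later redelivery and verify that a URL belongs to the subscriber before accepting it.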

Database as Event Store: Applications needing event history or audit trails implement pub-sub by writing events to a database table and polling or using database triggers to notify subscribers. This approach combines event persistence with pub-sub distribution.

Database storage provides queryable event history and enables event replay for rebuilding projections or debugging. Polling introduces latency between event publication and subscriber notification. Database triggers or change data capture mechanisms reduce latency but add database complexity.
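The polling variant can be sketched as follows. To stay runnable without a database, an Array stands in for the events table; EventStore and PollingSubscriber are hypothetical names, and the comments note the SQL each method would roughly correspond to:

```ruby
# Hypothetical EventStore: an Array stands in for an append-only events table.
class EventStore
  Event = Struct.new(:id, :topic, :data)

  def initialize
    @events = []
  end

  # Roughly an INSERT with an auto-incrementing id.
  def append(topic, data)
    event = Event.new(@events.size + 1, topic, data)
    @events << event
    event.id
  end

  # Roughly: SELECT * FROM events WHERE topic = ? AND id > ? ORDER BY id
  def since(topic, last_id)
    @events.select { |e| e.topic == topic && e.id > last_id }
  end
end

# Each subscriber remembers the id of the last event it handled.
class PollingSubscriber
  attr_reader :last_id

  def initialize(store, topic, &handler)
    @store = store
    @topic = topic
    @handler = handler
    @last_id = 0
  end

  # Call periodically; returns the number of events processed.
  def poll
    events = @store.since(@topic, @last_id)
    events.each do |event|
      @handler.call(event.data)
      @last_id = event.id
    end
    events.size
  end

  # Rewind to replay history, for example to rebuild a projection.
  def replay_from(id)
    @last_id = id
  end
end

store = EventStore.new
sub = PollingSubscriber.new(store, :order) { |data| puts "Order #{data[:id]}" }

store.append(:order, { id: 1 })
store.append(:order, { id: 2 })
sub.poll
# => Order 1
# => Order 2
sub.poll  # => 0 (nothing new since the last poll)
```

The polling interval bounds the notification latency described above, and the stored offset is what makes replay possible.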

Hybrid Architecture: Production systems often combine approaches, using in-process pub-sub for low-latency local events while publishing critical events to message queues for reliable cross-service communication:

class HybridBroker
  def initialize(local_broker, message_queue)
    @local = local_broker
    @queue = message_queue
  end

  def subscribe_local(topic, &handler)
    @local.subscribe(topic, &handler)
  end

  def subscribe_distributed(topic, &handler)
    @queue.subscribe(topic, &handler)
  end

  def publish_local(topic, event)
    @local.publish(topic, event)
  end

  def publish_distributed(topic, event)
    @queue.publish(topic, event)
  end

  def publish_both(topic, event)
    publish_local(topic, event)
    publish_distributed(topic, event)
  end
end

This pattern routes high-frequency, non-critical events through the local broker while sending critical business events requiring durability and cross-service delivery through the message queue. The separation balances performance and reliability based on event criticality.
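One way to make that routing decision explicit is a small router that picks the transport per topic. In this sketch, SimpleBroker stands in for both transports and CriticalityRouter with its critical_topics list is a hypothetical addition, not part of HybridBroker:

```ruby
# Minimal in-memory broker used as a stand-in for both transports.
class SimpleBroker
  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
  end

  def subscribe(topic, &handler)
    @subscriptions[topic] << handler
  end

  def publish(topic, event)
    @subscriptions[topic].each { |handler| handler.call(event) }
  end
end

# Hypothetical router: topics listed as critical go to the queue, the rest stay local.
class CriticalityRouter
  def initialize(local:, queue:, critical_topics:)
    @local = local
    @queue = queue
    @critical = critical_topics
  end

  def publish(topic, event)
    target = @critical.include?(topic) ? @queue : @local
    target.publish(topic, event)
  end
end

local = SimpleBroker.new
queue = SimpleBroker.new  # a real system would wrap a message queue client here
router = CriticalityRouter.new(local: local, queue: queue, critical_topics: [:payment_captured])

local.subscribe(:cache_hit) { |e| puts "local: #{e[:key]}" }
queue.subscribe(:payment_captured) { |e| puts "queued: #{e[:order_id]}" }

router.publish(:cache_hit, { key: "user:1" })
# => local: user:1
router.publish(:payment_captured, { order_id: 42 })
# => queued: 42
```

Keeping the list of critical topics in one place means publishers do not need to know which transport carries their events.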

Stream Processing: High-throughput applications requiring real-time analytics or complex event processing integrate pub-sub with streaming platforms like Apache Kafka or Amazon Kinesis. These systems optimize for high-volume event streams through partitioning and parallel consumption. Delivery is typically at-least-once; Kafka additionally supports exactly-once processing through idempotent producers and transactions.

Stream processing enables operations like windowed aggregations, event correlation, and stateful transformations on event streams. The architecture suits applications that process very high event volumes under low-latency requirements, such as real-time monitoring, fraud detection, or recommendation systems.
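A windowed aggregation, the simplest of these operations, can be sketched in plain Ruby. TumblingWindowCounter is a hypothetical class that counts events per key in fixed, non-overlapping windows; a streaming platform would run the same logic per partition with managed state:

```ruby
# Hypothetical TumblingWindowCounter: per-key counts in fixed, non-overlapping windows.
class TumblingWindowCounter
  def initialize(window_seconds)
    @window = window_seconds
    @counts = Hash.new { |h, k| h[k] = Hash.new(0) }  # window start => key => count
  end

  # timestamp is seconds since an arbitrary origin (Integer or Float).
  def add(key, timestamp)
    window_start = (timestamp.to_i / @window) * @window
    @counts[window_start][key] += 1
  end

  # Returns windows in chronological order.
  def results
    @counts.sort_by { |start, _| start }
           .map { |start, counts| { window_start: start, counts: counts } }
  end
end

counter = TumblingWindowCounter.new(60)
[[0, :login], [15, :login], [59, :purchase], [60, :login], [125, :login]].each do |ts, type|
  counter.add(type, ts)
end

counter.results.each do |window|
  summary = window[:counts].map { |key, n| "#{key}=#{n}" }.join(", ")
  puts "#{window[:window_start]}s: #{summary}"
end
# => 0s: login=2, purchase=1
# => 60s: login=1
# => 120s: login=1
```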

Reference

Core Components

| Component | Responsibility | Implementation Options |
|---|---|---|
| Publisher | Emits events to topics | Service objects, domain models, controllers |
| Subscriber | Processes published events | Handler objects, service objects, workers |
| Broker | Routes events from publishers to subscribers | In-process object, message queue, event store |
| Topic | Named channel for related events | String identifier, hierarchical namespace, pattern |
| Event | Message payload | Hash, object, serialized data |

Message Delivery Semantics

| Semantic | Guarantee | Use Case |
|---|---|---|
| At-most-once | Delivered zero or one time, may lose messages | Non-critical analytics, logging |
| At-least-once | Delivered one or more times, duplicates possible | Idempotent operations, event processing |
| Exactly-once | Delivered exactly one time | Financial transactions, state changes |
| Best-effort | No guarantees, fire-and-forget | Low-priority notifications |
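Under at-least-once delivery a subscriber may receive the same message more than once, so handlers are commonly made idempotent. A minimal sketch, assuming each message carries a unique message_id field (a hypothetical name that the brokers in this document do not set):

```ruby
require "set"

# Hypothetical wrapper that makes a handler safe under at-least-once delivery
# by remembering which message ids it has already processed.
class IdempotentHandler
  def initialize(&handler)
    @handler = handler
    @seen = Set.new  # a production system would keep this in durable storage
  end

  # Returns true if the handler ran, false if the message was a duplicate.
  def call(message)
    id = message.fetch(:message_id)
    return false if @seen.include?(id)

    @handler.call(message)
    @seen << id  # recorded only after success, so failed messages can be redelivered
    true
  end
end

balance = 0
credit = IdempotentHandler.new { |m| balance += m[:amount] }

credit.call({ message_id: "m-1", amount: 50 })  # => true
credit.call({ message_id: "m-1", amount: 50 })  # => false (redelivery ignored)
balance  # => 50
```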

Topic Naming Conventions

| Convention | Example | Description |
|---|---|---|
| Dot notation | user.account.created | Hierarchical namespace with segments |
| Colon notation | user:login | Category and action pairs |
| Slash notation | /orders/placed | Path-like structure |
| Event suffix | OrderPlacedEvent | Class-based naming |
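Hierarchical names pay off when subscribers can match on patterns. The sketch below assumes a wildcard syntax in which "*" matches exactly one segment and a trailing "#" matches zero or more remaining segments; TopicPattern is a hypothetical helper and the syntax is a choice made for this example:

```ruby
# Hypothetical matcher for dot-notation topics.
# "*" matches exactly one segment; a trailing "#" matches zero or more segments.
module TopicPattern
  def self.match?(pattern, topic)
    pattern_parts = pattern.split(".")
    topic_parts = topic.split(".")

    pattern_parts.each_with_index do |part, i|
      return true if part == "#" && i == pattern_parts.size - 1
      return false if i >= topic_parts.size
      return false unless part == "*" || part == topic_parts[i]
    end

    # Without a trailing "#", both must have the same number of segments.
    pattern_parts.size == topic_parts.size
  end
end

TopicPattern.match?("user.account.created", "user.account.created")  # => true
TopicPattern.match?("user.*.created", "user.account.created")        # => true
TopicPattern.match?("user.*", "user.account.created")                # => false
TopicPattern.match?("user.#", "user.account.created")                # => true
TopicPattern.match?("order.#", "user.account.created")               # => false
```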

Pattern Variations

| Pattern | Characteristics | When to Use |
|---|---|---|
| Fan-out | One publisher, multiple subscribers | Event broadcast, notifications |
| Fan-in | Multiple publishers, one subscriber | Log aggregation, monitoring |
| Request-reply | Synchronous request with response | Remote procedure calls, queries |
| Filtered | Subscribers receive subset of events | Content-based routing |
| Priority | Handlers run in priority order | Rate limiting, authentication |

Common Event Types

| Event Type | Example | Typical Data |
|---|---|---|
| Domain event | OrderPlaced | Aggregate ID, state changes |
| Integration event | PaymentProcessed | External system response |
| System event | ApplicationStarted | Timestamp, version |
| Command event | ProcessRefund | Action to perform, parameters |

Ruby Gems

| Gem | Purpose | Features |
|---|---|---|
| wisper | Object-level pub-sub | Clean API, global subscriptions, async options |
| concurrent-ruby | Thread-safe structures | Async promises, actors, channels |
| bunny | RabbitMQ client | Reliable delivery, routing, persistence |
| redis | Redis client (Redis as message broker) | Pub-sub commands, streams, persistence |

Error Handling Strategies

| Strategy | Approach | Trade-offs |
|---|---|---|
| Ignore | Log and continue | Simple but loses messages |
| Retry | Attempt delivery N times | May succeed eventually, delays processing |
| Dead letter | Move to failure queue | Preserves messages, requires monitoring |
| Circuit breaker | Stop after threshold | Prevents cascade failures |
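The circuit breaker strategy is the only one in the table not demonstrated earlier in this document. A minimal sketch follows; CircuitBreakerHandler and its parameters are hypothetical, and the clock is injectable so the cool-down can be tested without waiting:

```ruby
# Hypothetical circuit breaker around a single subscriber handler.
class CircuitBreakerHandler
  attr_reader :state

  DEFAULT_CLOCK = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

  def initialize(failure_threshold: 3, reset_after: 30, clock: DEFAULT_CLOCK, &handler)
    @handler = handler
    @failure_threshold = failure_threshold
    @reset_after = reset_after  # cool-down in seconds before a trial call
    @clock = clock
    @failures = 0
    @state = :closed
    @opened_at = nil
  end

  # Returns :ok, :failed, or :skipped (circuit open, handler not invoked).
  def call(event)
    if @state == :open
      return :skipped if @clock.call - @opened_at < @reset_after
      @state = :half_open  # allow one trial call after the cool-down
    end

    @handler.call(event)
    @failures = 0
    @state = :closed
    :ok
  rescue StandardError
    @failures += 1
    if @state == :half_open || @failures >= @failure_threshold
      @state = :open
      @opened_at = @clock.call
    end
    :failed
  end
end
```

While the circuit is open the handler is not invoked at all, which protects a struggling downstream service from additional load. Skipped events are dropped by this sketch, so in practice it would be combined with a dead letter queue.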

Testing Approaches

| Approach | Technique | Example |
|---|---|---|
| Spy subscriber | Record received events | Verify event data and call count |
| Mock broker | Replace with test double | Control event delivery timing |
| Synchronous mode | Execute handlers inline | Deterministic test execution |
| Event capture | Collect published events | Assert on event sequence |
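A spy subscriber needs very little code. In this sketch the EventBroker from the overview is repeated so the example runs on its own, and SpySubscriber is a hypothetical test helper that records every event it receives:

```ruby
# The simple broker from the overview, repeated so this sketch is self-contained.
class EventBroker
  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
  end

  def subscribe(topic, &handler)
    @subscriptions[topic] << handler
  end

  def publish(topic, data)
    @subscriptions[topic].each { |handler| handler.call(data) }
  end
end

# Hypothetical test helper that records every event delivered to it.
class SpySubscriber
  attr_reader :events

  def initialize
    @events = []
  end

  def record(event)
    @events << event
  end

  def call_count
    @events.size
  end

  # Lets the spy be passed with & wherever a handler block is expected.
  def to_proc
    method(:record).to_proc
  end
end

broker = EventBroker.new
spy = SpySubscriber.new
broker.subscribe(:user_created, &spy)

broker.publish(:user_created, { id: 123 })
broker.publish(:user_deleted, { id: 123 })  # different topic, not recorded

spy.call_count          # => 1
spy.events.first[:id]   # => 123
```

Because the broker in this sketch delivers synchronously, assertions can run immediately after publish. With an asynchronous broker the test would first have to wait for the workers to finish.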

Performance Characteristics

| Factor | Impact | Mitigation |
|---|---|---|
| Synchronous delivery | Blocks publisher | Use async delivery, thread pools |
| Handler count | Linear time increase | Prioritize handlers, filter subscriptions |
| Event size | Memory and serialization cost | Minimize payload, use references |
| Queue depth | Memory consumption | Backpressure, rate limiting |