Overview
The Actor Model is a mathematical model of concurrent computation in which actors are the universal primitives. Each actor is an isolated computational entity that processes messages sequentially from its mailbox and maintains private state that no other actor can access directly. Actors communicate exclusively through asynchronous message passing, which removes shared state and the synchronization problems that come with it in lock-based concurrent programming.
Carl Hewitt introduced the Actor Model in 1973 as a theoretical framework for concurrent computation. The model addresses fundamental challenges in distributed systems: how to manage state safely across multiple computational units, how to handle failures gracefully, and how to scale systems horizontally. Unlike thread-based concurrency models that rely on locks and shared memory, the Actor Model enforces isolation and asynchronous communication as core architectural constraints.
In the Actor Model, an actor can perform three fundamental operations when it receives a message: create new actors, send messages to other actors it knows about, and determine how to handle the next message. This simplicity belies the model's power for building distributed systems that span multiple machines, containers, or processes. The model naturally maps to distributed environments because actors already communicate through messages rather than shared memory.
The Actor Model gained practical prominence through Erlang, a language designed for telecommunications systems requiring extreme reliability. Modern implementations exist across many languages, including Akka for the JVM, Orleans for .NET, and various Ruby libraries. Each implementation adds practical features like supervision trees, routing strategies, and distributed actor registries while maintaining the core principles of isolation and message passing.
# Conceptual actor structure
class TemperatureSensor
def initialize
@readings = []
@mailbox = Queue.new
start_processing
end
def send_message(message)
@mailbox.push(message)
end
private
def start_processing
Thread.new do
loop do
message = @mailbox.pop
handle_message(message)
end
end
end
def handle_message(message)
case message[:type]
when :record
@readings << message[:temperature]
when :get_average
# Guard against an empty list, which would otherwise yield NaN
avg = @readings.empty? ? nil : @readings.sum / @readings.size.to_f
message[:reply_to].send_message({ type: :average, value: avg })
end
end
end
This example shows the basic structure of an actor: a mailbox for receiving messages, isolated state (the readings array), and sequential message processing. The actor never exposes its internal state directly, and all communication happens through messages.
Key Principles
The Actor Model rests on several foundational principles that distinguish it from other concurrency models. Understanding these principles clarifies why the model behaves as it does and what guarantees it provides.
Encapsulation and Isolation: Each actor maintains private state that no other actor can access or modify. Because only the owning actor ever touches that state, data races on it cannot occur and no locks are needed to protect it. When an actor needs information from another actor's state, it must send a message and wait for a response. This forced decoupling makes actors naturally suitable for distribution across network boundaries, since the programming model remains the same whether actors run in the same process or on different continents.
Asynchronous Message Passing: Actors communicate by sending messages asynchronously. The sending actor does not block waiting for a response; it continues processing other messages. This asynchronicity enables high concurrency and prevents cascading blocking scenarios where one slow actor slows down an entire system. Message sends complete immediately from the sender's perspective, with the message placed in the recipient's mailbox for later processing.
Sequential Message Processing: Each actor processes messages from its mailbox one at a time, in the order received (though some implementations allow priority mailboxes). This sequential processing means the actor never needs to worry about concurrent access to its own state. The actor's internal logic runs single-threaded, dramatically simplifying reasoning about state changes. While the system as a whole runs concurrently through many actors, each individual actor remains sequential.
Location Transparency: A well-designed actor system treats local and remote actors identically. The code sending a message to an actor doesn't know or care whether that actor runs in the same process, a different process on the same machine, or on a remote machine. This transparency enables systems to start small and scale horizontally by distributing actors across machines without changing application code.
Actor Lifecycle: Actors have a defined lifecycle: creation, message processing, and termination. During creation, an actor sets up its initial state. During message processing, it handles messages and modifies its state accordingly. Termination can occur gracefully (the actor decides to stop) or through supervision (another actor terminates it). Modern actor systems add supervision trees, where parent actors monitor child actors and restart them on failure.
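The lifecycle described above (creation, message processing, termination, and restart by a supervisor) can be sketched without any actor library. The following is a minimal illustration, not how any particular framework implements supervision: SupervisedCounter, tell, join, and the :crash message are hypothetical names chosen for this sketch, and it assumes Ruby 3.0 or later for pattern matching. A restart here simply discards the failed actor's state while keeping its mailbox.

```ruby
# A library-free sketch of the actor lifecycle: creation, message
# processing, restart on failure, and graceful termination.
class SupervisedCounter
  attr_reader :restarts

  def initialize
    @mailbox = Queue.new
    @restarts = 0
    @thread = Thread.new { supervise }
  end

  # Asynchronous send: enqueue and return immediately.
  def tell(message)
    @mailbox << message
    self
  end

  # Block until the actor has processed :stop and its thread has exited.
  def join
    @thread.join
  end

  private

  # Supervision: if the actor body raises, count the failure and start
  # the body again with fresh state.
  def supervise
    run
  rescue StandardError
    @restarts += 1
    retry
  end

  def run
    count = 0 # creation: private state is initialised here
    loop do
      case @mailbox.pop
      in :increment then count += 1
      in [:report, reply_queue] then reply_queue << count
      in :crash then raise 'simulated failure'
      in :stop then break # graceful termination
      end
    end
  end
end

actor = SupervisedCounter.new
replies = Queue.new
actor.tell(:increment).tell(:increment).tell([:report, replies])
before_crash = replies.pop
actor.tell(:crash).tell(:increment).tell([:report, replies])
after_crash = replies.pop
actor.tell(:stop)
actor.join
```

After the simulated crash the counter starts again from zero, because the restart began with fresh state; messages still waiting in the mailbox were preserved and processed by the restarted body.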
# Demonstrating asynchronous message passing
class BankAccount
def initialize(initial_balance)
@balance = initial_balance
@mailbox = Queue.new
@running = true
start_processing
end
def deposit(amount, reply_to)
send_message({ type: :deposit, amount: amount, reply_to: reply_to })
end
def withdraw(amount, reply_to)
send_message({ type: :withdraw, amount: amount, reply_to: reply_to })
end
def stop
send_message({ type: :stop })
end
private
def send_message(message)
@mailbox.push(message)
end
def start_processing
Thread.new do
while @running
message = @mailbox.pop
handle_message(message)
end
end
end
def handle_message(message)
case message[:type]
when :deposit
@balance += message[:amount]
message[:reply_to]&.send_message({
type: :success,
balance: @balance
})
when :withdraw
if @balance >= message[:amount]
@balance -= message[:amount]
message[:reply_to]&.send_message({
type: :success,
balance: @balance
})
else
message[:reply_to]&.send_message({
type: :insufficient_funds,
balance: @balance
})
end
when :stop
@running = false
end
end
end
class AccountManager
def initialize
@mailbox = Queue.new
start_processing
end
def send_message(message)
@mailbox.push(message)
end
private
def start_processing
Thread.new do
loop do
message = @mailbox.pop
puts "Transaction result: #{message[:type]}, Balance: #{message[:balance]}"
end
end
end
end
# Usage
manager = AccountManager.new
account = BankAccount.new(1000)
account.deposit(500, manager)
account.withdraw(200, manager)
account.withdraw(2000, manager) # Will fail
This example demonstrates how actors communicate asynchronously through messages. The bank account actor processes deposits and withdrawals sequentially, ensuring its balance remains consistent without locks. The account manager actor receives responses asynchronously, decoupled from when the account actually processes each transaction.
Message Ordering Guarantees: The original Actor Model makes no promise about the order in which messages arrive, but most practical implementations, Erlang and Akka among them, add one. Messages sent from Actor A to Actor B arrive in the order sent, though messages from different senders have no guaranteed ordering relative to each other. This property, called per-sender message ordering, simplifies reasoning about message sequences from the same source while allowing flexibility in handling messages from multiple sources.
# Demonstrating message ordering
class OrderProcessor
def initialize
@orders = []
@mailbox = Queue.new
start_processing
end
def send_message(message)
@mailbox.push(message)
end
private
def start_processing
Thread.new do
loop do
message = @mailbox.pop
handle_message(message)
end
end
end
def handle_message(message)
case message[:type]
when :place_order
@orders << message[:order]
puts "Order placed: #{message[:order][:id]}"
when :cancel_order
@orders.reject! { |o| o[:id] == message[:order_id] }
puts "Order cancelled: #{message[:order_id]}"
end
end
end
processor = OrderProcessor.new
# Messages from the same sender arrive in order
processor.send_message({ type: :place_order, order: { id: 1, item: 'Widget' } })
processor.send_message({ type: :place_order, order: { id: 2, item: 'Gadget' } })
processor.send_message({ type: :cancel_order, order_id: 1 })
sleep 0.1 # Give actor time to process
# Output will show orders 1, 2, then cancellation of 1, always in this sequence
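The example above has a single sender. To see what per-sender ordering does and does not promise, the following sketch lets two sender threads write into one mailbox. It is an in-process illustration only: the FIFO behaviour comes from Ruby's Queue, whereas a distributed runtime has to provide the same guarantee over the network. The names mailbox, received, and per_sender are chosen for this sketch.

```ruby
# Two senders push numbered messages into one mailbox. Queue is FIFO, so each
# sender's messages keep their relative order; the interleaving between the
# two senders depends on thread scheduling and may differ from run to run.
mailbox = Queue.new
received = []

consumer = Thread.new do
  while (message = mailbox.pop) != :done
    received << message
  end
end

senders = %i[a b].map do |sender|
  Thread.new do
    5.times { |seq| mailbox << { from: sender, seq: seq } }
  end
end
senders.each(&:join)
mailbox << :done
consumer.join

# Group the sequence numbers by sender, in arrival order.
per_sender = received.group_by { |m| m[:from] }
                     .transform_values { |msgs| msgs.map { |m| m[:seq] } }
```

Each sender's sequence numbers come out ascending, while the combined order in received is not determined by the program.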
Ruby Implementation
Ruby provides several libraries implementing the Actor Model, each with different trade-offs. The Concurrent Ruby project offers a widely used implementation, shipped in its companion concurrent-ruby-edge gem, that provides both low-level actor primitives and higher-level abstractions. Celluloid was historically popular but is no longer actively maintained. Understanding Ruby's implementation options helps select the right tool for specific requirements.
Concurrent Ruby Actors: The concurrent-ruby-edge gem implements actors through the Concurrent::Actor module, which is loaded with require 'concurrent-edge'. This implementation handles thread management, mailbox queuing, and message dispatch automatically. Actors in Concurrent Ruby support both fire-and-forget messages and request-response patterns through futures.
require 'concurrent-edge' # Concurrent::Actor ships in the concurrent-ruby-edge gem
class Counter < Concurrent::Actor::Context
def initialize(initial_value = 0)
@count = initial_value
end
def on_message(message)
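# Pattern matching (case/in, Ruby 3.0+) is needed for the array message:
# with case/when, [:add, Integer] would be compared by == and never match.
case message
in :increment
@count += 1
in :decrement
@count -= 1
in :value
@count
in [:add, Integer => amount]
@count += amount
else
puts "Unknown message: #{message}"
end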
end
end
# Create actor
counter = Concurrent::Actor.spawn(Counter, :counter, 10)
# Send fire-and-forget messages
counter << :increment
counter << :increment
counter << [:add, 5]
# Send request and wait for response
result = counter.ask(:value).value
puts "Current count: #{result}" # => 17
# Actors automatically handle concurrency
threads = 10.times.map do
Thread.new do
100.times { counter << :increment }
end
end
threads.each(&:join)
final_count = counter.ask(:value).value
puts "Final count after concurrent increments: #{final_count}" # => 1017
This example shows Concurrent Ruby's actor implementation. The << operator sends asynchronous messages (fire-and-forget), while ask sends a message and returns a future that resolves when the actor responds. The actor handles all messages sequentially, maintaining consistency even under concurrent message sends from multiple threads.
Actor References and Communication: Concurrent Ruby actors communicate through actor references. When creating an actor, the system returns a reference that other code uses to send messages. This reference can be passed to other actors, enabling dynamic communication topologies. The reference itself is thread-safe and can be shared freely across threads.
require 'concurrent-edge' # Concurrent::Actor ships in the concurrent-ruby-edge gem
class Logger < Concurrent::Actor::Context
def on_message(message)
puts "[#{Time.now}] #{message[:level]}: #{message[:text]}"
end
end
class Worker < Concurrent::Actor::Context
def initialize(logger_ref)
@logger = logger_ref
end
def on_message(message)
case message
when :do_work
@logger << { level: 'INFO', text: 'Starting work' }
sleep(rand(0.1..0.5)) # Simulate work
@logger << { level: 'INFO', text: 'Work completed' }
:done
when :error_work
@logger << { level: 'ERROR', text: 'Work failed' }
raise 'Simulated failure'
end
end
end
# Create logger actor
logger = Concurrent::Actor.spawn(Logger, :logger)
# Create worker actors with logger reference
workers = 5.times.map do |i|
Concurrent::Actor.spawn(Worker, "worker_#{i}", logger)
end
# Send work to all workers
workers.each { |w| w << :do_work }
sleep 1 # Let work complete
This example demonstrates passing actor references. Each worker actor receives the logger's reference during initialization and uses it to send log messages. The logger actor remains decoupled from the workers; it doesn't know which actors send it messages.
Supervision and Error Handling: Concurrent Ruby actors support supervision through the supervision strategy pattern. Parent actors can monitor child actors and decide how to handle failures: restart the actor, escalate to a higher-level supervisor, or stop the actor. This capability makes actor systems self-healing.
require 'concurrent-edge' # Concurrent::Actor ships in the concurrent-ruby-edge gem
class DatabaseConnection < Concurrent::Actor::Context
def initialize
connect
end
def on_message(message)
case message
when :query
raise 'Connection lost' if rand < 0.3 # Simulate failures
{ result: 'Data fetched successfully' }
end
end
private
def connect
puts "Connecting to database..."
@connected = true
end
end
class DatabaseSupervisor < Concurrent::Actor::Context
def initialize
@db = spawn_db_connection
end
def on_message(message)
case message
when :query
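begin
# ask returns a future; value!(1) waits up to 1 second, raises if the
# query failed, and returns nil if the timeout elapses first.
@db.ask(:query).value!(1)
rescue => e
puts "Query failed: #{e.message}, restarting connection..."
@db << :terminate! # termination is requested by message
@db = spawn_db_connection
{ error: 'Connection restarted, please retry' }
end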
end
end
private
def spawn_db_connection
Concurrent::Actor.spawn(DatabaseConnection, :db_connection)
end
end
supervisor = Concurrent::Actor.spawn(DatabaseSupervisor, :db_supervisor)
# Make several queries, some will fail and trigger restarts
10.times do |i|
result = supervisor.ask(:query).value
puts "Query #{i}: #{result}"
sleep 0.1
end
This example shows a supervisor pattern. The supervisor manages a database connection actor, catching failures and restarting the connection when needed. This pattern creates resilient systems that recover automatically from transient failures.
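Actor Pools: For workloads made of many independent, slow messages, Concurrent Ruby provides actor pools (Concurrent::Actor::Utils::Pool) that distribute messages across multiple actor instances. This pattern increases throughput by processing multiple messages concurrently while maintaining actor semantics for each individual actor. On MRI the global VM lock keeps Ruby code from executing in parallel across threads, so the gain comes mainly from blocking or I/O-bound work rather than from pure CPU-bound computation.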
require 'concurrent-edge' # Concurrent::Actor ships in the concurrent-ruby-edge gem
class ImageProcessor < Concurrent::Actor::Context
def on_message(message)
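# case/in is required here: case/when compares arrays with ==,
# so [:process, String] would never match [:process, "image_0.jpg"].
case message
in [:process, String => filename]
# Simulate image processing
sleep(rand(0.5..1.5))
{ filename: filename, processed: true, thread: Thread.current.object_id }
else
nil # ignore unknown messages
end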
end
end
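# Create pool with 4 workers; the block builds one worker per index
pool = Concurrent::Actor::Utils::Pool.spawn!('image_pool', 4) do |index|
ImageProcessor.spawn(name: "image_processor_#{index}", supervise: true)
end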
# Process multiple images concurrently
images = 10.times.map { |i| "image_#{i}.jpg" }
futures = images.map do |img|
pool.ask([:process, img])
end
# Wait for all processing to complete
results = futures.map(&:value)
results.each do |result|
puts "#{result[:filename]} processed on thread #{result[:thread]}"
end
# Notice different thread IDs showing concurrent processing
Common Patterns
Actor systems have established patterns that solve recurring design challenges. These patterns provide tested solutions for common scenarios like request aggregation, work distribution, and state management.
Request-Reply Pattern: The most basic actor pattern involves sending a message and expecting a response. The sender includes a reference to itself (or a reply-to actor) in the message, and the receiver sends its response to that reference. This pattern implements synchronous-style communication over the asynchronous actor infrastructure.
require 'concurrent-edge' # Concurrent::Actor ships in the concurrent-ruby-edge gem
class Calculator < Concurrent::Actor::Context
def on_message(message)
# case/in (Ruby 3.0+) matches on array shape and element types;
# case/when would compare the arrays with == and never match.
case message
in [:add, Numeric => a, Numeric => b]
a + b
in [:multiply, Numeric => a, Numeric => b]
a * b
in [:divide, Numeric => a, Numeric => b]
return { error: 'Division by zero' } if b == 0
a / b.to_f
else
nil # ignore unknown messages
end
end
end
calculator = Concurrent::Actor.spawn(Calculator, :calculator)
# Using ask for request-reply
result = calculator.ask([:add, 10, 5]).value
puts "10 + 5 = #{result}"
result = calculator.ask([:multiply, 4, 7]).value
puts "4 * 7 = #{result}"
result = calculator.ask([:divide, 10, 0]).value
puts "10 / 0 = #{result}" # => {:error=>"Division by zero"}
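The Calculator example relies on ask, which hides the reply address inside a future. The explicit form described above, where the request itself carries a reply-to reference, can be sketched in plain Ruby. This is an illustration under assumptions: AdderActor, the :id correlation field, and the use of a bare Queue as the sender's mailbox are all hypothetical choices for this sketch.

```ruby
require 'securerandom'

# Receiver: replies to whatever mailbox the request names in :reply_to,
# echoing the request's :id so the caller can match replies to requests.
class AdderActor
  def initialize
    @mailbox = Queue.new
    Thread.new { loop { handle(@mailbox.pop) } }
  end

  # Asynchronous send: enqueue and return immediately.
  def tell(message)
    @mailbox << message
  end

  private

  def handle(message)
    sum = message[:a] + message[:b]
    message[:reply_to] << { id: message[:id], result: sum }
  end
end

adder = AdderActor.new
reply_mailbox = Queue.new # stands in for the sender's own mailbox

# Send two requests without waiting in between.
first_id = SecureRandom.uuid
second_id = SecureRandom.uuid
adder.tell({ id: first_id, a: 10, b: 5, reply_to: reply_mailbox })
adder.tell({ id: second_id, a: 4, b: 7, reply_to: reply_mailbox })

# Collect both replies and index them by correlation id.
replies = 2.times.map { reply_mailbox.pop }.to_h { |r| [r[:id], r[:result]] }
```

The correlation id matters once several requests are in flight: replies are matched by id rather than by arrival order, so the sender does not depend on the receiver answering in sequence.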
Router Pattern: A router actor distributes work among a pool of worker actors based on routing logic. Common routing strategies include round-robin, random selection, smallest mailbox (least busy), and content-based routing. This pattern balances load and increases system throughput.
require 'concurrent-edge' # Concurrent::Actor ships in the concurrent-ruby-edge gem
class TaskWorker < Concurrent::Actor::Context
def initialize(worker_id)
@worker_id = worker_id
@tasks_completed = 0
end
def on_message(message)
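# case/in binds the task hash; with case/when the array would be compared
# by == and message[:duration] would index the array, not the hash.
case message
in [:execute, Hash => task]
sleep(task[:duration]) # Simulate work
@tasks_completed += 1
{ worker_id: @worker_id, task: task[:name], completed: @tasks_completed }
in :stats
{ worker_id: @worker_id, completed: @tasks_completed }
else
nil # ignore unknown messages
end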
end
end
class RoundRobinRouter < Concurrent::Actor::Context
def initialize(worker_count)
@workers = worker_count.times.map do |i|
Concurrent::Actor.spawn(TaskWorker, "worker_#{i}", i)
end
@current_index = 0
end
def on_message(message)
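case message
in [:execute, Hash] # array pattern; case/when would never match here
worker = @workers[@current_index]
@current_index = (@current_index + 1) % @workers.size
# Blocking on the reply keeps the example simple, but it means the
# router handles one task at a time.
worker.ask(message).value
in :stats
@workers.map { |w| w.ask(:stats).value }
else
nil # ignore unknown messages
end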
end
end
router = Concurrent::Actor.spawn(RoundRobinRouter, :router, 3)
# Send tasks that will be distributed across workers
tasks = [
{ name: 'Task 1', duration: 0.1 },
{ name: 'Task 2', duration: 0.2 },
{ name: 'Task 3', duration: 0.1 },
{ name: 'Task 4', duration: 0.15 },
{ name: 'Task 5', duration: 0.1 }
]
results = tasks.map { |task| router.ask([:execute, task]).value }
results.each { |r| puts "#{r[:task]} completed by worker #{r[:worker_id]}" }
# Check distribution
stats = router.ask(:stats).value
stats.each { |s| puts "Worker #{s[:worker_id]}: #{s[:completed]} tasks" }
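Round-robin is only one of the strategies listed above. As a rough sketch of smallest-mailbox routing, the following plain-Ruby version picks the worker with the fewest queued messages. QueueWorker, SmallestMailboxRouter, mailbox_size, and route are hypothetical names for this illustration, and the router is a plain object rather than an actor to keep the sketch short.

```ruby
# A worker that exposes its mailbox size so a router can find the least busy one.
class QueueWorker
  attr_reader :id

  def initialize(id, results)
    @id = id
    @results = results
    @mailbox = Queue.new
    Thread.new { loop { work(@mailbox.pop) } }
  end

  def tell(task)
    @mailbox << task
  end

  def mailbox_size
    @mailbox.size
  end

  private

  def work(task)
    sleep(task[:duration]) # simulate work
    @results << { task: task[:name], worker_id: @id }
  end
end

class SmallestMailboxRouter
  def initialize(workers)
    @workers = workers
  end

  # Route to the worker with the fewest queued messages. The size is a
  # snapshot and may already be stale when the message is enqueued.
  def route(task)
    target = @workers.min_by(&:mailbox_size)
    target.tell(task)
    target.id
  end
end

results = Queue.new
workers = [QueueWorker.new(0, results), QueueWorker.new(1, results)]
router = SmallestMailboxRouter.new(workers)

# Preload worker 0 so that its mailbox is clearly the longer one.
3.times { |i| workers[0].tell({ name: "slow-#{i}", duration: 0.2 }) }
routed_to = 2.times.map { |i| router.route({ name: "quick-#{i}", duration: 0.01 }) }
```

Both quick tasks go to worker 1 while worker 0 is still busy with its backlog. Unlike round-robin, this strategy needs some view of each worker's load, which is cheap in one process but requires extra reporting once workers are remote.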
Pipeline Pattern: A pipeline connects actors in sequence, where each actor performs one stage of processing and forwards the result to the next actor. This pattern maps naturally to data transformation workflows and stream processing.
require 'concurrent-edge' # Concurrent::Actor ships in the concurrent-ruby-edge gem
class DataValidator < Concurrent::Actor::Context
def initialize(next_actor)
@next = next_actor
end
def on_message(message)
return { error: 'Invalid data' } unless message.is_a?(Hash) && message[:value]
@next << message
end
end
class DataTransformer < Concurrent::Actor::Context
def initialize(next_actor)
@next = next_actor
end
def on_message(message)
transformed = message.merge(
value: message[:value].upcase,
transformed_at: Time.now
)
@next << transformed
end
end
class DataPersister < Concurrent::Actor::Context
def initialize(results_actor)
@results = results_actor
end
def on_message(message)
# Simulate saving to database
saved = message.merge(id: rand(1000), persisted: true)
@results << saved
end
end
class ResultCollector < Concurrent::Actor::Context
def initialize
@results = []
end
def on_message(message)
case message
when Hash
@results << message
when :get_results
@results.dup # hand out a copy so callers never share the actor's internal array
end
end
end
# Build pipeline from right to left
collector = Concurrent::Actor.spawn(ResultCollector, :collector)
persister = Concurrent::Actor.spawn(DataPersister, :persister, collector)
transformer = Concurrent::Actor.spawn(DataTransformer, :transformer, persister)
validator = Concurrent::Actor.spawn(DataValidator, :validator, transformer)
# Send data through pipeline
data_items = [
{ value: 'hello' },
{ value: 'world' },
'invalid', # Will be rejected
{ value: 'pipeline' }
]
data_items.each { |item| validator << item }
sleep 0.5 # Let pipeline complete
results = collector.ask(:get_results).value
puts "Processed #{results.size} items:"
results.each { |r| puts " #{r[:value]} (ID: #{r[:id]})" }
Scatter-Gather Pattern: This pattern distributes a request to multiple actors (scatter), then collects and aggregates their responses (gather). The pattern handles parallel processing where results from multiple sources need combination.
require 'concurrent-edge' # Concurrent::Actor ships in the concurrent-ruby-edge gem
class PriceChecker < Concurrent::Actor::Context
def initialize(store_name, base_price)
@store_name = store_name
@base_price = base_price
end
def on_message(message)
case message
when :check_price
# Simulate checking price with variable delay
sleep(rand(0.1..0.3))
variation = rand(-0.1..0.1)
price = @base_price * (1 + variation)
{ store: @store_name, price: price.round(2) }
end
end
end
class PriceAggregator < Concurrent::Actor::Context
def initialize(checkers)
@checkers = checkers
end
def on_message(message)
case message
when :get_best_price
futures = @checkers.map { |checker| checker.ask(:check_price) }
prices = futures.map(&:value)
best = prices.min_by { |p| p[:price] }
{
best_price: best,
all_prices: prices.sort_by { |p| p[:price] },
average: (prices.sum { |p| p[:price] } / prices.size).round(2)
}
end
end
end
# Create price checkers for different stores
checkers = [
Concurrent::Actor.spawn(PriceChecker, :checker1, 'Store A', 99.99),
Concurrent::Actor.spawn(PriceChecker, :checker2, 'Store B', 99.99),
Concurrent::Actor.spawn(PriceChecker, :checker3, 'Store C', 99.99),
Concurrent::Actor.spawn(PriceChecker, :checker4, 'Store D', 99.99)
]
aggregator = Concurrent::Actor.spawn(PriceAggregator, :aggregator, checkers)
result = aggregator.ask(:get_best_price).value
puts "Best price: #{result[:best_price][:store]} at $#{result[:best_price][:price]}"
puts "Average price: $#{result[:average]}"
puts "\nAll prices:"
result[:all_prices].each do |p|
puts " #{p[:store]}: $#{p[:price]}"
end
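The aggregator above waits for every checker, so one slow source delays the whole answer. A common refinement is to gather under a deadline and work with whatever has arrived. The following is a minimal plain-Ruby sketch of that idea, with threads standing in for actors; scatter_gather, its timeout: parameter, and the store prices are hypothetical values chosen for illustration.

```ruby
# Scatter: query every source in its own thread. Gather: wait until a shared
# deadline, keep whatever arrived, and report the sources that did not answer.
def scatter_gather(sources, timeout:)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  threads = sources.map { |name, fetch| [name, Thread.new { fetch.call }] }

  results = {}
  missing = []
  threads.each do |name, thread|
    remaining = [deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC), 0].max
    if thread.join(remaining) # nil when the thread is still running
      results[name] = thread.value
    else
      thread.kill # give up on the straggler
      missing << name
    end
  end
  { results: results, missing: missing }
end

sources = {
  'Store A' => -> { sleep 0.05; 101.50 },
  'Store B' => -> { sleep 0.05; 97.25 },
  'Store C' => -> { sleep 2; 90.00 } # too slow for the deadline below
}
outcome = scatter_gather(sources, timeout: 0.5)
best = outcome[:results].min_by { |_store, price| price }
```

Store C would have offered the lowest price, but it misses the deadline, so the best price is chosen among the answers that arrived in time. Whether a partial answer is acceptable is a design decision that depends on the application.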
Practical Examples
Real-world actor systems demonstrate how the model handles complex scenarios. These examples show complete implementations solving practical problems.
Chat Room Server: A chat room application illustrates multiple actors coordinating to manage state and communication. The system includes room actors that maintain participant lists and message history, user actors representing connected clients, and a lobby actor managing room creation.
require 'concurrent-edge' # Concurrent::Actor ships in the concurrent-ruby-edge gem
class ChatUser < Concurrent::Actor::Context
def initialize(username)
@username = username
@current_room = nil
end
def on_message(message)
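# case/in (Ruby 3.0+) matches the array shape and binds its elements;
# case/when would compare the arrays with == and never match.
case message
in [:joined_room, Concurrent::Actor::Reference => room]
@current_room = room
{ status: :ok, message: "Joined room" }
in [:message, Hash => msg]
puts "[#{msg[:room]}] #{msg[:from]}: #{msg[:text]}"
{ status: :delivered }
in [:left_room]
@current_room = nil
{ status: :ok }
else
nil # ignore unknown messages
end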
end
end
class ChatRoom < Concurrent::Actor::Context
def initialize(room_name)
@room_name = room_name
@participants = {}
@message_history = []
end
def on_message(message)
# case/in (Ruby 3.0+) matches the array shape and binds its elements;
# case/when would compare the arrays with == and never match.
case message
in [:join, String => username, Concurrent::Actor::Reference => user_actor]
@participants[username] = user_actor
# Send history to new user
@message_history.last(10).each do |msg|
user_actor << [:message, msg]
end
broadcast(username, "#{username} joined the room", exclude: username)
user_actor << [:joined_room, Concurrent::Actor.current]
{ status: :joined, participant_count: @participants.size }
in [:send_message, String => username, String => text]
return { error: 'Not a participant' } unless @participants[username]
msg = { room: @room_name, from: username, text: text, time: Time.now }
@message_history << msg
broadcast(username, text)
{ status: :sent }
in [:leave, String => username]
user_actor = @participants.delete(username)
user_actor << [:left_room] if user_actor
broadcast(username, "#{username} left the room", exclude: username)
{ status: :left }
in :info
{
name: @room_name,
participants: @participants.keys,
message_count: @message_history.size
}
else
nil # ignore unknown messages
end
end
private
def broadcast(from_username, text, exclude: nil)
msg = { room: @room_name, from: from_username, text: text }
@participants.each do |username, actor|
actor << [:message, msg] unless username == exclude
end
end
end
class ChatLobby < Concurrent::Actor::Context
def initialize
@rooms = {}
end
def on_message(message)
# case/in (Ruby 3.0+) matches the array shape and binds its elements;
# case/when would compare the arrays with == and never match.
case message
in [:create_room, String => room_name]
return { error: 'Room exists' } if @rooms[room_name]
room = Concurrent::Actor.spawn(ChatRoom, "room_#{room_name}", room_name)
@rooms[room_name] = room
{ status: :created, room: room }
in [:get_room, String => room_name]
room = @rooms[room_name]
room ? { status: :found, room: room } : { error: 'Room not found' }
in :list_rooms
room_infos = @rooms.values.map { |room| room.ask(:info).value }
{ rooms: room_infos }
else
nil # ignore unknown messages
end
end
end
# Create lobby and users
lobby = Concurrent::Actor.spawn(ChatLobby, :lobby)
alice = Concurrent::Actor.spawn(ChatUser, :alice, 'Alice')
bob = Concurrent::Actor.spawn(ChatUser, :bob, 'Bob')
charlie = Concurrent::Actor.spawn(ChatUser, :charlie, 'Charlie')
# Create and join room
lobby.ask([:create_room, 'general']).value
room_result = lobby.ask([:get_room, 'general']).value
room = room_result[:room]
room.ask([:join, 'Alice', alice]).value
room.ask([:join, 'Bob', bob]).value
# Send messages
room << [:send_message, 'Alice', 'Hello everyone!']
room << [:send_message, 'Bob', 'Hi Alice!']
sleep 0.1 # Let messages propagate
# Charlie joins and receives history
room.ask([:join, 'Charlie', charlie]).value
room << [:send_message, 'Charlie', 'Hey, what did I miss?']
sleep 0.1
# Check room info
info = room.ask(:info).value
puts "\nRoom '#{info[:name]}' has #{info[:participants].size} participants"
puts "Participants: #{info[:participants].join(', ')}"
Job Queue System: A distributed job queue demonstrates actor-based task processing with priority handling, retry logic, and worker management. The system distributes jobs across workers and handles failures gracefully.
require 'concurrent-edge' # Concurrent::Actor ships in the concurrent-ruby-edge gem
class Job
attr_reader :id, :type, :payload, :priority, :attempts
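# priority is a keyword argument so that Job.new(..., priority: 8) sets an
# Integer; as a positional parameter it would receive the hash { priority: 8 }.
def initialize(id, type, payload, priority: 5)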
@id = id
@type = type
@payload = payload
@priority = priority
@attempts = 0
@max_attempts = 3
end
def attempt!
@attempts += 1
end
def failed?
@attempts >= @max_attempts
end
end
class Worker < Concurrent::Actor::Context
def initialize(worker_id, queue)
@worker_id = worker_id
@queue = queue
@jobs_completed = 0
@jobs_failed = 0
end
def on_message(message)
case message
in [:execute, Job => job] # array pattern; case/when would never match here
begin
result = execute_job(job)
@jobs_completed += 1
@queue << [:job_completed, job, result, @worker_id]
{ status: :success, job_id: job.id }
rescue => e
@jobs_failed += 1
@queue << [:job_failed, job, e.message, @worker_id]
{ status: :failed, job_id: job.id, error: e.message }
end
in :stats
{
worker_id: @worker_id,
completed: @jobs_completed,
failed: @jobs_failed
}
else
nil # ignore unknown messages
end
end
private
def execute_job(job)
case job.type
when :email
sleep(0.1) # Simulate sending email
raise 'SMTP error' if rand < 0.2 # 20% failure rate
{ sent: true, recipient: job.payload[:to] }
when :report
sleep(0.2) # Simulate generating report
{ generated: true, pages: rand(10..100) }
when :backup
sleep(0.15)
raise 'Disk full' if rand < 0.1 # 10% failure rate
{ backed_up: true, size_mb: rand(100..1000) }
end
end
end
class JobQueue < Concurrent::Actor::Context
def initialize(worker_count)
@pending_jobs = []
@failed_jobs = []
@completed_jobs = []
@workers = worker_count.times.map do |i|
Concurrent::Actor.spawn(Worker, "worker_#{i}", i, Concurrent::Actor.current)
end
@worker_index = 0
start_dispatcher
end
def on_message(message)
# case/in (Ruby 3.0+) matches the array shape and binds its elements;
# case/when would compare the arrays with == and never match.
case message
in [:enqueue, Job => job]
insert_by_priority(job)
{ status: :enqueued, job_id: job.id, position: @pending_jobs.index(job) }
in :dispatch
dispatch_next
in [:job_completed, Job => job, Hash => result, Integer => worker_id]
@completed_jobs << { job: job, result: result, worker: worker_id }
in [:job_failed, Job => job, String => error, Integer => worker_id]
job.attempt!
if job.failed?
@failed_jobs << { job: job, error: error }
puts "Job #{job.id} permanently failed after #{job.attempts} attempts"
else
puts "Job #{job.id} failed (attempt #{job.attempts}), requeueing..."
insert_by_priority(job)
end
in :stats
{
pending: @pending_jobs.size,
completed: @completed_jobs.size,
failed: @failed_jobs.size,
worker_stats: @workers.map { |w| w.ask(:stats).value }
}
else
nil # ignore unknown messages
end
end
private
# Keeps @pending_jobs sorted with the highest priority number first
def insert_by_priority(job)
index = @pending_jobs.bsearch_index { |j| j.priority <= job.priority } || @pending_jobs.size
@pending_jobs.insert(index, job)
end
# The timer thread only sends :dispatch messages. All queue state is
# touched inside on_message, so the actor's isolation is preserved.
def start_dispatcher
queue_ref = Concurrent::Actor.current
Thread.new do
loop do
queue_ref << :dispatch
sleep 0.05
end
end
end
# Hands the highest-priority pending job to the next worker, round-robin
def dispatch_next
return if @pending_jobs.empty?
job = @pending_jobs.shift
worker = @workers[@worker_index]
@worker_index = (@worker_index + 1) % @workers.size
worker << [:execute, job]
end
end
# Create queue with 3 workers
queue = Concurrent::Actor.spawn(JobQueue, :queue, 3)
# Enqueue various jobs with different priorities
jobs = [
Job.new(1, :email, { to: 'user@example.com' }, priority: 8),
Job.new(2, :report, { type: 'monthly' }, priority: 5),
Job.new(3, :backup, { target: '/data' }, priority: 10),
Job.new(4, :email, { to: 'admin@example.com' }, priority: 9),
Job.new(5, :report, { type: 'daily' }, priority: 3),
Job.new(6, :backup, { target: '/logs' }, priority: 7)
]
jobs.each { |job| queue << [:enqueue, job] }
# Let jobs process
sleep 3
# Get statistics
stats = queue.ask(:stats).value
puts "\nQueue Statistics:"
puts " Pending: #{stats[:pending]}"
puts " Completed: #{stats[:completed]}"
puts " Failed: #{stats[:failed]}"
puts "\nWorker Statistics:"
stats[:worker_stats].each do |ws|
puts " Worker #{ws[:worker_id]}: #{ws[:completed]} completed, #{ws[:failed]} failed"
end
Design Considerations
Selecting the Actor Model requires evaluating trade-offs against other concurrency models. Understanding when actors fit naturally and when alternative approaches work better guides architectural decisions.
When Actors Excel: Actor systems handle scenarios involving isolated state, asynchronous processing, and distribution across network boundaries. Web servers processing independent requests benefit from actor isolation, where each request handler runs as an actor managing its own state. Real-time systems like chat servers, gaming backends, and IoT platforms map naturally to actors because they involve many independent entities communicating asynchronously.
Systems requiring fault tolerance benefit from actor supervision trees. When a component can fail and restart without affecting the entire system, actors provide a natural structure. Financial transaction processors use actors to isolate transaction state, ensuring one failed transaction doesn't corrupt others. The supervision hierarchy lets failed actors restart while maintaining system availability.
When Actors Struggle: Actor systems add complexity for problems that don't involve concurrency or distribution. A single-threaded command-line utility gains no benefit from actors and suffers from added indirection and message-passing overhead. Simple CRUD applications often work better with traditional thread-per-request models or even single-threaded event loops.
Problems requiring shared mutable state across many actors face challenges. If 100 actors all need to read and update the same counter, the counter becomes a bottleneck whether implemented as an actor (serializing access) or as shared memory (requiring locks). Database-backed applications already have a concurrency model (database transactions), and adding actors can create impedance mismatch.
Actors don't eliminate the need to think about ordering and consistency. While individual actors process messages sequentially, multiple actors interacting still face distributed systems challenges. Banking systems can't simply make each account an actor without carefully handling transfer operations that span two accounts. The classic two-phase commit problems remain, just expressed through actor messages rather than shared memory.
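To make the transfer problem concrete, the following plain-Ruby sketch moves money between two account actors using a compensating action: if the credit is refused, the debit is refunded. This is a saga-style compensation, not two-phase commit, and it is only an illustration. AccountActor, the accepts_deposits: flag, the :refund message, and the transfer helper are hypothetical names for this sketch.

```ruby
class AccountActor
  def initialize(balance, accepts_deposits: true)
    @balance = balance
    @accepts_deposits = accepts_deposits
    @mailbox = Queue.new
    Thread.new { loop { handle(*@mailbox.pop) } }
  end

  # Send a request and block on a private reply queue (request-reply).
  def ask(type, amount = 0)
    reply = Queue.new
    @mailbox << [type, amount, reply]
    reply.pop
  end

  private

  def handle(type, amount, reply)
    case type
    when :withdraw
      ok = @balance >= amount
      @balance -= amount if ok
      reply << ok
    when :deposit
      @balance += amount if @accepts_deposits
      reply << @accepts_deposits
    when :refund # compensation: always accepted
      @balance += amount
      reply << true
    when :balance
      reply << @balance
    end
  end
end

# Coordinator: debit first, then credit; undo the debit if the credit fails.
def transfer(from, to, amount)
  return :insufficient_funds unless from.ask(:withdraw, amount)
  return :ok if to.ask(:deposit, amount)

  from.ask(:refund, amount) # compensating action
  :rolled_back
end

alice = AccountActor.new(100)
bob = AccountActor.new(50)
closed = AccountActor.new(0, accepts_deposits: false)

first = transfer(alice, bob, 30)     # both steps succeed
second = transfer(alice, closed, 30) # credit refused, debit is refunded
third = transfer(alice, bob, 500)    # debit refused
```

Between the debit and the refund the money exists in neither account, and another actor querying the balances at that moment would see the intermediate state. Each account stays internally consistent, but the invariant that spans both accounts has to be protected by the protocol, which is the point the paragraph above makes.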
Granularity Decisions: Choosing actor boundaries requires balancing isolation benefits against coordination costs. Very fine-grained actors (one per domain object) maximize isolation but increase message-passing overhead and complicate object relationships. Very coarse-grained actors (one per major subsystem) reduce message overhead but sacrifice concurrency and fault isolation.
require 'concurrent-edge' # Concurrent::Actor ships in the concurrent-ruby-edge gem
# Fine-grained: Each order line item is an actor
class OrderLineActor < Concurrent::Actor::Context
def initialize(product_id, quantity, price)
@product_id = product_id
@quantity = quantity
@price = price
end
def on_message(message)
case message
in :subtotal
@quantity * @price
in [:update_quantity, Integer => quantity] # array pattern; case/when would never match
@quantity = quantity
else
nil # ignore unknown messages
end
end
end
# Coarse-grained: Entire order is one actor
class OrderActor < Concurrent::Actor::Context
def initialize
@line_items = []
end
def on_message(message)
# `in` patterns check element types and bind them; `when` with an array
# literal compares with == and would never match a real message.
case message
in [:add_item, Hash => item]
@line_items << item
in :total
@line_items.sum { |item| item[:quantity] * item[:price] }
in [:update_quantity, Integer => product_id, Integer => new_quantity]
item = @line_items.find { |i| i[:product_id] == product_id }
item[:quantity] = new_quantity if item
else
nil # ignore unknown messages
end
end
end
# Trade-off: Fine-grained allows concurrent line item updates but requires
# coordination for order-level operations. Coarse-grained simplifies order
# operations but serializes all updates.
State Management Strategies: Actor state remains encapsulated, but persistence requires explicit design. Actors can maintain all state in memory (fast but volatile), persist state changes to a database (durable but slower), or use event sourcing where actors rebuild state from event logs. Event sourcing works well with actors because messages naturally represent events.
Memory-only actors work for ephemeral state like active sessions or request-scoped data. Database-backed actors suit durable entities like user accounts or product catalogs. Event-sourced actors suit systems with auditability requirements or a need to reconstruct state as of any point in time, since replaying the log up to a given event reproduces the state at that moment. Each strategy involves trade-offs between performance, durability, and complexity.
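As a rough illustration of the event-sourcing strategy, the sketch below derives an actor's state only from an append-only event log, so a restarted actor can replay the log to recover. It is plain Ruby without an actor library. `EventSourcedCart`, its commands, and its event shapes are hypothetical, and the in-memory array stands in for durable storage.

```ruby
# Hypothetical event-sourced cart: state is derived only from the event log.
class EventSourcedCart
  attr_reader :items

  def initialize(event_log = [])
    @event_log = event_log # stands in for durable, append-only storage
    @items = Hash.new(0)
    @event_log.each { |event| apply(event) } # rebuild state on start
  end

  # Command handler: validate, record the event, then apply it.
  def receive(command)
    event =
      case command
      in [:add, String => sku, Integer => qty] if qty.positive?
        { type: :item_added, sku: sku, qty: qty }
      in [:remove, String => sku] if @items.key?(sku)
        { type: :item_removed, sku: sku }
      else
        return [:rejected, command]
      end
    @event_log << event
    apply(event)
    [:ok, event]
  end

  private

  # State transition shared by live commands and replay.
  def apply(event)
    case event
    in { type: :item_added, sku:, qty: }
      @items[sku] += qty
    in { type: :item_removed, sku: }
      @items.delete(sku)
    end
  end
end

log = []
cart = EventSourcedCart.new(log)
cart.receive([:add, 'sku-1', 2])
cart.receive([:add, 'sku-2', 1])
cart.receive([:remove, 'sku-2'])
rejected = cart.receive([:remove, 'missing'])

# A "restarted" actor replays the same log and reaches the same state.
recovered = EventSourcedCart.new(log.dup)
# Replaying a prefix of the log reconstructs an earlier state.
earlier = EventSourcedCart.new(log.first(2))
```

Rejected commands never reach the log, so replay only ever sees events that were valid when they were recorded.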
Distribution Decisions: Location transparency lets actors move between processes or machines, but network distribution introduces new failure modes. Distributed actors must handle network partitions, message loss (if using unreliable transports), and increased latency. Local actors avoid these issues but can't scale beyond one machine.
Starting with local actors and adding distribution later works when the actor system design maintains location transparency. Avoiding assumptions about message delivery timing or actor co-location makes the transition smoother. Some systems begin with all actors local and selectively distribute specific actors as load demands, rather than immediately building a distributed system.
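The co-location assumption can be made concrete with two reference types that expose the same `tell` method. In this sketch the "remote" one forces every message through serialization. It is plain Ruby, and `Recorder`, `LocalRef`, and `SimulatedRemoteRef` are hypothetical names. `Marshal` only simulates a wire format here; it is not how any particular actor framework ships messages.

```ruby
# Hypothetical actor stand-in that records what it receives.
class Recorder
  attr_reader :received

  def initialize
    @received = []
  end

  def receive(message)
    @received << message
    :ok
  end
end

# Delivers the very same object to the target.
class LocalRef
  def initialize(actor)
    @actor = actor
  end

  def tell(message)
    @actor.receive(message)
  end
end

# Simulates a network hop: the message must survive serialization.
class SimulatedRemoteRef
  def initialize(actor)
    @actor = actor
  end

  def tell(message)
    wire = Marshal.dump(message)
    @actor.receive(Marshal.load(wire))
  end
end

local_target = Recorder.new
remote_target = Recorder.new
refs = [LocalRef.new(local_target), SimulatedRemoteRef.new(remote_target)]

payload = { order_id: 42, items: ['sku-1'] }
refs.each { |ref| ref.tell([:order_placed, payload]) }

# Plain data arrives with the same content on both paths...
same_content = local_target.received == remote_target.received
# ...but only the local path shares object identity with the sender.
local_shares  = local_target.received[0][1].equal?(payload)
remote_shares = remote_target.received[0][1].equal?(payload)

# A message that captures code cannot cross the simulated wire.
remote_rejects_proc =
  begin
    refs[1].tell([:run, -> { 1 + 1 }])
    false
  rescue TypeError
    true
  end
```

Messages built from plain data behave the same on both paths. Messages that rely on shared object identity or embedded procs work only while the actors stay in one process.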
Error Handling & Edge Cases
Actor systems handle failures differently from traditional exception-based error handling. Understanding failure modes and recovery strategies makes actor systems reliable.
Supervision Strategies: Supervisors monitor child actors and respond to failures through supervision strategies. The one-for-one strategy restarts only the failed actor, preserving other children. The one-for-all strategy restarts all children when any child fails, useful when children depend on each other. The rest-for-one strategy restarts the failed actor and any actors started after it, handling dependency chains.
require 'concurrent-edge' # Concurrent::Actor ships in the concurrent-ruby-edge gem
class TemperatureSensor < Concurrent::Actor::Context
def initialize(sensor_id)
@sensor_id = sensor_id
@reading_count = 0
end
def on_message(message)
case message
when :read
@reading_count += 1
# Simulate sensor failures
raise "Sensor #{@sensor_id} malfunction" if rand < 0.15
{ sensor_id: @sensor_id, temperature: rand(15.0..30.0).round(1) }
when :stats
{ sensor_id: @sensor_id, readings: @reading_count }
end
end
end
class SensorSupervisor < Concurrent::Actor::Context
def initialize(sensor_count)
@sensors = sensor_count.times.map do |i|
spawn_sensor(i)
end
@restart_counts = Hash.new(0)
end
def on_message(message)
case message
in [:read, Integer => sensor_idx]
begin
# ask returns a future; value! waits up to 0.5 s and re-raises a sensor error
@sensors[sensor_idx].ask(:read).value!(0.5)
rescue => e
puts "Sensor #{sensor_idx} failed: #{e.message}"
# One-for-one supervision: restart only the failed sensor
@restart_counts[sensor_idx] += 1
if @restart_counts[sensor_idx] <= 3
puts "Restarting sensor #{sensor_idx}..."
# Harmless if the sensor already stopped itself after the error
@sensors[sensor_idx] << :terminate!
@sensors[sensor_idx] = spawn_sensor(sensor_idx)
retry # read again from the fresh sensor
else
{ error: "Sensor #{sensor_idx} failed permanently" }
end
end
in :stats
@sensors.map.with_index do |sensor, i|
begin
sensor.ask(:stats).value.merge(restarts: @restart_counts[i])
rescue
{ sensor_id: i, status: 'failed', restarts: @restart_counts[i] }
end
end
else
nil # ignore unknown messages
end
end
private
def spawn_sensor(sensor_id)
Concurrent::Actor.spawn(TemperatureSensor, "sensor_#{sensor_id}", sensor_id)
end
end
supervisor = Concurrent::Actor.spawn(SensorSupervisor, :supervisor, 3)
# Read from sensors repeatedly, some will fail and restart
20.times do |i|
# value! re-raises a failure from the supervisor; plain value would return nil instead
result = supervisor.ask([:read, i % 3]).value! rescue { error: 'failed' }
puts "Reading #{i}: #{result}"
sleep 0.1
end
# Check statistics showing restart counts
stats = supervisor.ask(:stats).value
puts "\nSensor Statistics:"
stats.each do |s|
puts " Sensor #{s[:sensor_id]}: #{s[:readings]} readings, #{s[:restarts]} restarts"
end
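The three supervision strategies differ only in which children get restarted after a failure. The function below sketches that choice in plain Ruby. `children_to_restart` and the child names are hypothetical, and the sketch assumes the supervisor keeps its children ordered by start time, oldest first.

```ruby
# Returns the children a supervisor would restart for a given strategy.
# `children` is ordered by start time, oldest first (an assumption of this sketch).
def children_to_restart(strategy, children, failed)
  index = children.index(failed)
  raise ArgumentError, "unknown child: #{failed}" unless index

  case strategy
  when :one_for_one  then [failed]
  when :one_for_all  then children.dup
  when :rest_for_one then children[index..]
  else raise ArgumentError, "unknown strategy: #{strategy}"
  end
end

children = %i[db_connection cache request_handler]

one  = children_to_restart(:one_for_one, children, :cache)
all  = children_to_restart(:one_for_all, children, :cache)
rest = children_to_restart(:rest_for_one, children, :cache)
```

With `:cache` failing, one-for-one selects only `:cache`, one-for-all selects all three children, and rest-for-one selects `:cache` plus `:request_handler`, which was started after it.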
Message Loss and Delivery: Actor systems differ in message delivery guarantees. At-most-once delivery (send and forget) provides no guarantee that the message arrives. At-least-once delivery ensures messages arrive but might deliver them more than once. Exactly-once delivery (the hardest to implement) guarantees that each message is processed exactly once.
Most actor frameworks provide at-most-once delivery for local actors, where message loss occurs only on actor termination. Distributed actor systems face network-level message loss and typically implement at-least-once delivery with idempotency to handle duplicates.
require 'concurrent-edge' # Concurrent::Actor ships in the concurrent-ruby-edge gem
require 'set'
class IdempotentHandler < Concurrent::Actor::Context
def initialize
@processed_messages = Set.new
@results = {}
end
def on_message(message)
# `in` checks element types and binds them; `when` with an array literal never matches
case message
in [:process, String => message_id, Hash => data]
# Check if already processed (idempotency)
if @processed_messages.include?(message_id)
puts "Duplicate message #{message_id}, returning cached result"
return @results[message_id]
end
# Process message
result = perform_processing(data)
# Record processing
@processed_messages.add(message_id)
@results[message_id] = result
result
else
nil # ignore unknown messages
end
end
private
def perform_processing(data)
sleep 0.1 # Simulate work
{ processed: true, data: data, timestamp: Time.now }
end
end
handler = Concurrent::Actor.spawn(IdempotentHandler, :handler)
# Simulate sending duplicate messages (as might happen with at-least-once delivery)
handler << [:process, 'msg-001', { value: 100 }]
sleep 0.15
handler << [:process, 'msg-001', { value: 100 }] # Duplicate
result = handler.ask([:process, 'msg-002', { value: 200 }]).value
# First msg-001 processes, second is ignored due to idempotency
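The sending side of at-least-once delivery can be sketched as "resend until acknowledged", which is where the duplicates come from in the first place. The example is plain Ruby. `LossyTransport` and `send_at_least_once` are hypothetical, and the transport deterministically loses the first few acknowledgements so the behavior is repeatable.

```ruby
# Hypothetical transport that loses the first `drops` acknowledgements.
# The message itself arrives, but the sender cannot know that and resends.
class LossyTransport
  attr_reader :delivered

  def initialize(drops:)
    @drops = drops
    @delivered = []
  end

  # Delivers the message; returns :ack, or nil when the ack is "lost".
  def deliver(message)
    @delivered << message
    if @drops.positive?
      @drops -= 1
      nil
    else
      :ack
    end
  end
end

# At-least-once: resend until acknowledged or attempts run out.
# Returns the number of attempts that were needed.
def send_at_least_once(transport, message, max_attempts: 5)
  max_attempts.times do |attempt|
    return attempt + 1 if transport.deliver(message) == :ack
  end
  raise "no ack after #{max_attempts} attempts"
end

transport = LossyTransport.new(drops: 2)
attempts = send_at_least_once(transport, [:process, 'msg-001', { value: 100 }])

# The receiver saw the same message id several times.
copies_received = transport.delivered.size
distinct_ids = transport.delivered.map { |m| m[1] }.uniq
```

The receiver ends up with several copies of `msg-001`, so the handler has to deduplicate by message id to keep processing idempotent.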
Deadlock Prevention: Actors avoid traditional deadlocks because they don't hold locks, but message-level deadlocks can occur when actors wait for responses in a cycle. If Actor A sends to B and blocks waiting for the response while B sends to A and blocks waiting for its response, neither can proceed. Preventing these cycles requires either avoiding synchronous message sends (using fire-and-forget instead) or carefully analyzing actor communication patterns.
require 'concurrent-edge' # Concurrent::Actor ships in the concurrent-ruby-edge gem
require 'securerandom'
# BAD: Potential deadlock
class ActorA < Concurrent::Actor::Context
def initialize(actor_b)
@actor_b = actor_b
end
def on_message(message)
case message
when :start
# This can deadlock if B tries to ask back to A
result = @actor_b.ask(:request_from_a).value
result
when :request_from_b
'Response from A'
end
end
end
# GOOD: Deadlock-free with callback pattern
class ImprovedActorA < Concurrent::Actor::Context
def initialize
@pending_callbacks = {}
end
def on_message(message)
# `in` checks element types and binds them; `when` with an array literal never matches
case message
in [:start, Concurrent::Actor::Reference => actor_b]
request_id = SecureRandom.uuid
@pending_callbacks[request_id] = Time.now
# Send the request with a reference to this actor as the reply address
actor_b << [:request_from_a, request_id, Concurrent::Actor.current]
{ status: :sent, request_id: request_id }
in [:response, String => request_id, result]
@pending_callbacks.delete(request_id)
{ status: :completed, result: result }
else
nil # ignore unknown messages
end
end
end
class ImprovedActorB < Concurrent::Actor::Context
def on_message(message)
case message
in [:request_from_a, String => request_id, Concurrent::Actor::Reference => reply_to]
# Process and send response back asynchronously
result = { data: 'Processed' }
reply_to << [:response, request_id, result]
else
nil # ignore unknown messages
end
end
end
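Analyzing communication patterns for ask cycles can be partly mechanized. The sketch below assumes the blocking asks between actors have been written down as a graph by hand, then runs a depth-first search for a cycle. `find_ask_cycle`, `visit_ask_node`, and the actor names are hypothetical; nothing here inspects a running actor system.

```ruby
# `asks` maps each actor name to the actors it blocks on with a synchronous ask.
# Returns one cycle as an array of names, or nil when the graph is acyclic.
def find_ask_cycle(asks)
  state = {} # nil = unvisited, :visiting = on the current path, :done = explored
  asks.each_key do |node|
    cycle = visit_ask_node(asks, node, [], state)
    return cycle if cycle
  end
  nil
end

def visit_ask_node(asks, node, path, state)
  # Reaching a node that is still on the current path closes a cycle.
  return path[path.index(node)..] + [node] if state[node] == :visiting
  return nil if state[node] == :done

  state[node] = :visiting
  asks.fetch(node, []).each do |target|
    cycle = visit_ask_node(asks, target, path + [node], state)
    return cycle if cycle
  end
  state[node] = :done
  nil
end

risky = { a: [:b], b: [:c], c: [:a] }
safe  = { a: %i[b c], b: [:c], c: [] }

risky_cycle = find_ask_cycle(risky)
safe_cycle  = find_ask_cycle(safe)
```

Any edge on a reported cycle is a candidate for conversion to fire-and-forget with a reply message, as in the callback pattern.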
Back Pressure: When a fast producer sends messages faster than a slow consumer can process them, the consumer's mailbox grows without bound. Implementing back pressure prevents memory exhaustion. Strategies include bounded mailboxes that block or drop messages when full, flow control where consumers signal readiness, or priority schemes that process important messages first.
require 'concurrent-edge' # Concurrent::Actor ships in the concurrent-ruby-edge gem
class BoundedMailboxConsumer < Concurrent::Actor::Context
MAX_QUEUE_SIZE = 10
def initialize
# Thread::Queue is thread-safe, so the actor and its worker can share it
@queue = Queue.new
# One long-lived worker; Queue#pop blocks while the queue is empty
@worker = Thread.new { process_queue }
end
def on_message(message)
case message
in [:process, item]
if @queue.size >= MAX_QUEUE_SIZE
return { status: :rejected, reason: 'queue full' }
end
@queue << item
{ status: :accepted, queue_size: @queue.size }
in :drain
# The worker drains continuously; report how much work is still queued
{ status: :draining, queue_size: @queue.size }
else
nil # ignore unknown messages
end
end
private
def process_queue
loop do
item = @queue.pop
sleep 0.1 # Simulate slow processing
puts "Processed: #{item}"
end
end
end
consumer = Concurrent::Actor.spawn(BoundedMailboxConsumer, :consumer)
# Try to send more messages than queue can hold
results = 15.times.map do |i|
consumer.ask([:process, "Item #{i}"]).value
end
accepted = results.count { |r| r[:status] == :accepted }
rejected = results.count { |r| r[:status] == :rejected }
puts "\nAccepted: #{accepted}, Rejected: #{rejected}"
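Flow control where the consumer signals readiness is often built on credits: the producer may send only as many messages as the consumer has granted. The sketch below is plain, single-threaded Ruby. `CreditedProducer`, `SlowConsumer`, and the `grant` protocol are hypothetical names for illustration.

```ruby
# Producer may only send while it holds credits granted by the consumer.
class CreditedProducer
  attr_reader :sent

  def initialize(consumer)
    @consumer = consumer
    @credits = 0
    @pending = [] # work waiting for credit
    @sent = 0
  end

  def produce(item)
    @pending << item
    flush
  end

  # Called on behalf of the consumer to signal room for `count` more messages.
  def grant(count)
    @credits += count
    flush
  end

  def backlog
    @pending.size
  end

  private

  # Sends as many pending items as the current credit allows.
  def flush
    while @credits.positive? && @pending.any?
      @credits -= 1
      @sent += 1
      @consumer.receive(@pending.shift)
    end
  end
end

class SlowConsumer
  attr_reader :mailbox

  def initialize
    @mailbox = []
  end

  def receive(item)
    @mailbox << item
  end

  # Removes and returns the oldest item in the mailbox.
  def process_one
    @mailbox.shift
  end
end

consumer = SlowConsumer.new
producer = CreditedProducer.new(consumer)

producer.grant(3) # consumer announces room for 3 messages
10.times { |i| producer.produce("Item #{i}") }
peak_mailbox = consumer.mailbox.size # bounded by the granted credit

consumer.process_one
producer.grant(1) # one slot freed, so one more message flows
```

The consumer's mailbox stays bounded, but the backlog moves to the producer, which still has to slow down, buffer to disk, or shed load.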
Reference
Actor Model implementation reference for Ruby development.
Core Concepts
| Concept | Definition | Key Property |
|---|---|---|
| Actor | Isolated computational unit with private state and mailbox | Processes messages sequentially |
| Message | Asynchronous communication unit sent between actors | Immutable and ordered per sender |
| Mailbox | Queue holding messages for an actor to process | FIFO ordering (typically) |
| Reference | Handle to an actor for sending messages | Location transparent |
| Supervision | Parent-child relationship for fault tolerance | Automatic restart on failure |
Message Delivery Guarantees
| Guarantee | Meaning | Use Case |
|---|---|---|
| At-most-once | Message might not arrive | Fire-and-forget notifications |
| At-least-once | Message arrives one or more times | Idempotent operations |
| Exactly-once | Message processes exactly once | Critical transactions |
Concurrent Ruby Actor API
| Method | Purpose | Returns |
|---|---|---|
| Concurrent::Actor.spawn | Create new actor | Actor reference |
| reference << message | Send async message (fire-and-forget) | The reference itself |
| reference.ask(message) | Send message and get future | Future |
| reference.ask!(message) | Send message and block for the result (for a timeout, use ask(message).value!(seconds)) | Result or raises |
| reference.ask!(:terminate!) | Stop actor | Boolean |
| Concurrent::Actor.current | Get current actor reference | Reference or nil |
Common Actor Patterns
| Pattern | Description | When to Use |
|---|---|---|
| Request-Reply | Send message and wait for response | Synchronous-style operations |
| Router | Distribute messages across worker pool | Load balancing |
| Pipeline | Chain actors for sequential processing | Data transformation workflows |
| Scatter-Gather | Send to multiple actors and aggregate results | Parallel queries |
| Publisher-Subscriber | Broadcast messages to subscribers | Event distribution |
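
As one example from the table, Scatter-Gather can be sketched with plain threads standing in for worker actors: the query goes to every worker in parallel and the replies are gathered into one result. `scatter_gather` and the warehouse handlers are hypothetical, and a failing worker is reported as an error entry rather than aborting the whole query.

```ruby
# Scatter a query to several workers in parallel, then gather their replies.
# `workers` maps a name to a callable; threads stand in for worker actors.
def scatter_gather(workers, query, timeout: 1.0)
  replies = Queue.new
  threads = workers.map do |name, handler|
    Thread.new do
      replies << [name, handler.call(query)]
    rescue StandardError => e
      replies << [name, { error: e.message }]
    end
  end
  threads.each { |t| t.join(timeout) }
  # Collect whatever arrived in time; slower workers are simply left out.
  Array.new(replies.size) { replies.pop }.to_h
end

inventory = {
  warehouse_a: ->(sku) { { sku: sku, stock: 4 } },
  warehouse_b: ->(sku) { { sku: sku, stock: 0 } },
  warehouse_c: ->(_sku) { raise 'offline' }
}

results = scatter_gather(inventory, 'sku-1')
total_stock = results.values.sum { |reply| reply.fetch(:stock, 0) }
```

The gather step has to decide what a missing or failed reply means. Here it counts as zero stock, which may or may not be the right default for a given domain.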
Supervision Strategies
| Strategy | Behavior | Application |
|---|---|---|
| One-for-One | Restart only failed child | Independent children |
| One-for-All | Restart all children when one fails | Interdependent children |
| Rest-for-One | Restart failed child and those started after | Sequential dependencies |
Actor System Design Checklist
| Consideration | Question | Impact |
|---|---|---|
| Granularity | How many actors per domain concept? | Message overhead vs isolation |
| State | Where does persistent state live? | Durability vs performance |
| Distribution | Which actors need location transparency? | Scalability vs complexity |
| Failure handling | What supervision strategy per actor tree? | Fault tolerance approach |
| Backpressure | How to handle mailbox overflow? | Memory usage and stability |
Performance Characteristics
| Operation | Typical Cost | Optimization |
|---|---|---|
| Message send (local) | Microseconds | Use fire-and-forget for throughput |
| Message send (remote) | Milliseconds | Batch messages when possible |
| Actor creation | Microseconds | Pool actors for frequent creation |
| Mailbox operation | O(1) enqueue/dequeue | Monitor mailbox sizes |
| State access | Memory read | Keep hot state in actor memory |
Common Implementation Patterns
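require 'concurrent-edge' # Concurrent::Actor ships in the concurrent-ruby-edge gem
# Actor with timeout handling
class TimeoutActor < Concurrent::Actor::Context
def on_message(message)
case message
in [:request, data, Concurrent::Actor::Reference => reply_to, Numeric => timeout]
# Run off the actor's thread so the mailbox keeps draining
Thread.new do
result = process_with_timeout(data, timeout)
reply_to << [:response, result]
end
{ status: :processing }
else
nil # ignore unknown messages
end
end
private
def process_with_timeout(data, timeout)
future = Concurrent::Promises.future(data) do |input|
# Processing logic goes here; this placeholder echoes the input
input
end
# value returns the fallback hash if the future is still pending after
# `timeout` seconds; wait(timeout) only returns true/false and never raises
future.value(timeout, { error: 'timeout' })
end
end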
# Actor with state persistence
class PersistentActor < Concurrent::Actor::Context
def initialize(actor_id, db)
@actor_id = actor_id
@db = db
@state = load_state
end
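def on_message(message)
# `in` checks element types and binds them; `when` with an array literal never matches
case message
in [:update, Hash => changes]
@state.merge!(changes)
save_state
{ status: :updated }
else
nil # ignore unknown messages
end
end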
private
def load_state
@db.get(@actor_id) || {}
end
def save_state
@db.set(@actor_id, @state)
end
end