Overview
Worker Threads provide true parallel execution in Ruby by running code on separate operating system threads that are not serialized by the Global VM Lock (GVL). Each worker operates in an isolated execution context with its own object space, so CPU-intensive operations run concurrently with the main thread and with other worker threads.
Ruby implements Worker Threads through the Ractor class, which creates actor-like execution contexts that communicate through message passing. Each Ractor maintains its own object space, preventing shared mutable state and eliminating most thread-safety concerns that plague traditional Ruby threading.
# Create a worker thread for CPU-intensive calculation
worker = Ractor.new do
  def prime?(n)
    return false if n < 2
    (2..Math.sqrt(n)).none? { |i| n % i == 0 }
  end

  # Receive work from the main thread
  while (number = Ractor.receive)
    result = prime?(number)
    Ractor.yield result
  end
end
# Send work to worker and get result
worker.send(97)
puts worker.take # => true
Ractors operate on a share-nothing architecture: objects cannot be freely passed between threads. Shareable objects, such as immutable values, cross Ractor boundaries by reference, while mutable objects are deep-copied by default or explicitly moved. This design prevents data races and eliminates the need for locks in most scenarios.
# Frozen (immutable) objects are shared by reference
name = "World".freeze
worker = Ractor.new(name) do |n|
  "Hello, #{n}!" # The block's return value becomes the result
end
puts worker.take # => "Hello, World!"
# Mutable objects are deep-copied by default, or moved explicitly
data = [1, 2, 3, 4, 5]
worker = Ractor.new do
  numbers = Ractor.receive
  numbers.sum
end

worker.send(data, move: true) # Transfer ownership instead of copying
puts worker.take # => 15
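Once ownership has been moved, the sender must not touch the object again. A minimal sketch continuing the example above: Ruby replaces the moved object with a placeholder that raises on any access.

# After the move, the sender's reference is invalid: accessing it
# raises Ractor::MovedError
begin
  data.sum
rescue Ractor::MovedError
  puts "data now belongs to the worker"
end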
Basic Usage
Creating Worker Threads involves instantiating Ractors with blocks that define their behavior. The main thread communicates with workers through send and receive operations, while workers return results using Ractor.yield or by terminating with a return value.
# Basic worker creation and communication
calculator = Ractor.new do
  loop do
    operation, a, b = Ractor.receive

    case operation
    when :add
      Ractor.yield(a + b)
    when :multiply
      Ractor.yield(a * b)
    when :stop
      break
    end
  end
end
# Send operations to worker
calculator.send([:add, 10, 5])
puts calculator.take # => 15
calculator.send([:multiply, 4, 6])
puts calculator.take # => 24
calculator.send([:stop])
Workers can be configured with initial arguments passed during creation. These arguments are delivered with the same semantics as send: shareable objects pass by reference, while mutable objects are copied into the worker, preserving the isolation model.
# Worker with initialization parameters
file_processor = Ractor.new("data.txt", /\d+/) do |filename, pattern|
  File.readlines(filename).map do |line|
    line.scan(pattern).map(&:to_i)
  end.flatten
end
numbers = file_processor.take
puts numbers.sum
Multiple workers can operate concurrently to process different parts of a workload. The main thread distributes work among available workers and collects results as they complete.
# Parallel processing with multiple workers
def create_worker_pool(size)
  Array.new(size) do |i|
    Ractor.new(i) do |worker_id|
      loop do
        work = Ractor.receive
        break if work == :shutdown

        # Simulate CPU-intensive work
        result = work.map { |n| n ** 2 }
        Ractor.yield [worker_id, result]
      end
    end
  end
end
workers = create_worker_pool(3)
data_chunks = [[1,2,3], [4,5,6], [7,8,9]]
# Distribute work
data_chunks.each_with_index do |chunk, i|
  workers[i].send(chunk)
end

# Collect results
results = workers.map(&:take)
results.each do |worker_id, values|
  puts "Worker #{worker_id}: #{values.join(', ')}"
end

# Shutdown workers
workers.each { |w| w.send(:shutdown) }
Worker threads can also function as independent services that process streams of incoming work rather than single tasks.
# Long-running worker service
log_processor = Ractor.new do
  processed_count = 0

  loop do
    message = Ractor.receive

    case message
    when Hash
      # Process log entry
      level = message[:level]
      content = message[:message]
      timestamp = Time.now.strftime("%Y-%m-%d %H:%M:%S")
      puts "[#{timestamp}] #{level.upcase}: #{content}"
      processed_count += 1
    when :status
      Ractor.yield processed_count
    when :shutdown
      break
    end
  end
end
# Send log messages
log_processor.send(level: :info, message: "Application started")
log_processor.send(level: :error, message: "Database connection failed")
# Check status
log_processor.send(:status)
puts "Processed #{log_processor.take} messages"
log_processor.send(:shutdown)
Thread Safety & Concurrency
Worker Threads achieve thread safety through complete isolation rather than synchronization mechanisms. Each Ractor operates in its own memory space with no shared mutable state, eliminating data races and deadlock conditions that affect traditional threaded programs.
Objects passed between Ractors must satisfy shareability requirements. Immutable objects like numbers, symbols, and frozen strings can be shared freely. Mutable objects are deep-copied by default or have their ownership transferred through the move option.
# Demonstrating object shareability
shareable_data = {
  number: 42,
  symbol: :status,
  frozen_string: "Hello".freeze,
  frozen_array: [1, 2, 3].freeze
}

worker = Ractor.new do
  data = Ractor.receive
  puts "Received: #{data.inspect}"
  data[:number] * 2
end

worker.send(shareable_data) # The hash is copied; its frozen values are shared
result = worker.take
puts result # => 84
Ractor::IsolationError is raised when code reaches across the isolation boundary, for example when a non-main Ractor reads a constant that references a mutable object. This prevents accidental state corruption that could occur with traditional threading approaches.
# Reading a mutable constant from a worker violates isolation
SETTINGS = { retries: 3 } # mutable, therefore not shareable

worker = Ractor.new do
  SETTINGS[:retries]
rescue Ractor::IsolationError => e
  "isolation violated: #{e.message}"
end

puts worker.take

# Solution: freeze the data so it becomes shareable
LIMITS = { retries: 3 }.freeze
puts Ractor.new { LIMITS[:retries] }.take # => 3
Complex coordination patterns require careful design of message protocols between workers and the main thread. Workers can maintain internal state safely since no other thread can access their memory space.
# Coordinated worker pool with state management
class WorkerPool
  def initialize(size)
    @workers = Array.new(size) do |i|
      create_worker(i)
    end
    @next_worker = 0
  end

  def submit_work(task)
    worker = @workers[@next_worker]
    @next_worker = (@next_worker + 1) % @workers.size
    worker.send(task, move: true)
    worker
  end

  def shutdown
    @workers.each { |w| w.send(:terminate) }
  end

  private

  def create_worker(id)
    Ractor.new(id) do |worker_id|
      # Helpers must live inside the block: the worker cannot call
      # instance methods of the pool across the isolation boundary
      process_task = ->(task) { task.map { |x| Math.sin(x) }.sum }

      completed_tasks = 0

      loop do
        task = Ractor.receive
        break if task == :terminate

        # Process task with access to worker-local state
        result = process_task.call(task)
        completed_tasks += 1

        Ractor.yield({
          worker_id: worker_id,
          result: result,
          completed_count: completed_tasks
        })
      end
    end
  end
end
pool = WorkerPool.new(4)
tasks = Array.new(10) { Array.new(100) { rand } }
# Submit all tasks and collect results
futures = tasks.map { |task| pool.submit_work(task) }
results = futures.map(&:take)
results.each do |result|
  puts "Worker #{result[:worker_id]}: #{result[:result].round(4)} (#{result[:completed_count]} tasks)"
end
pool.shutdown
Pipeline architectures can chain multiple workers where each stage processes data and passes results to the next stage. This pattern maintains isolation while enabling complex data transformations.
# Multi-stage processing pipeline
def create_pipeline
  # Stage 1: Data validation and parsing
  parser = Ractor.new do
    loop do
      raw_data = Ractor.receive
      break if raw_data == :shutdown

      # Validate and parse input
      parsed = raw_data.select { |item| item.is_a?(String) && !item.empty? }
                       .map { |s| s.strip.downcase }
      Ractor.yield parsed
    end
  end

  # Stage 2: Data transformation
  transformer = Ractor.new do
    loop do
      data = Ractor.receive
      break if data == :shutdown

      # Transform data
      transformed = data.map do |word|
        {
          word: word,
          length: word.length,
          vowel_count: word.count('aeiou'),
          hash: word.hash
        }
      end
      Ractor.yield transformed
    end
  end

  # Stage 3: Aggregation
  aggregator = Ractor.new do
    total_words = 0
    total_length = 0

    loop do
      data = Ractor.receive
      break if data == :shutdown

      total_words += data.length
      total_length += data.sum { |item| item[:length] }

      Ractor.yield({
        batch_size: data.length,
        total_processed: total_words,
        average_length: total_length.to_f / total_words
      })
    end
  end

  [parser, transformer, aggregator]
end
pipeline = create_pipeline
raw_data = ["Hello", "", "World", "Ruby", " Programming ", nil, "Ractor"]
# Process through pipeline
pipeline[0].send(raw_data, move: true)
parsed = pipeline[0].take
pipeline[1].send(parsed, move: true)
transformed = pipeline[1].take
pipeline[2].send(transformed, move: true)
stats = pipeline[2].take
puts "Processed #{stats[:batch_size]} words"
puts "Total processed: #{stats[:total_processed]}"
puts "Average length: #{stats[:average_length].round(2)}"
# Shutdown pipeline
pipeline.each { |stage| stage.send(:shutdown) }
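In the version above, the main thread shuttles results between stages. Because Ractor objects are themselves shareable, stages can also be wired directly to one another. A minimal sketch (the stage names are illustrative, not part of the example above):

# Downstream stage prints whatever arrives
sink = Ractor.new do
  loop do
    msg = Ractor.receive
    break if msg == :shutdown
    puts "sink got: #{msg.inspect}"
  end
end

# Ractor objects are shareable, so the downstream stage can be
# handed to the upstream stage at creation time
doubler = Ractor.new(sink) do |downstream|
  loop do
    msg = Ractor.receive
    if msg == :shutdown
      downstream.send(:shutdown)
      break
    end
    downstream.send(msg * 2)
  end
end

doubler.send(21)
doubler.send(:shutdown)
sleep 0.1 # demo only: give the sink a moment to print before exit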
Performance & Memory
Worker Threads excel at CPU-intensive tasks that can be parallelized across multiple cores. Each Ractor is scheduled on its own operating system thread and holds its own lock, so Ruby code in different Ractors executes in parallel without contending for a single global VM lock.
Memory usage scales with the number of Ractors since each maintains its own object space and interpreter state. A typical Ractor consumes 1-2 MB of base memory before loading application code and data.
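One rough, platform-dependent way to observe that overhead is to compare the process's resident set size before and after spawning idle Ractors. The sketch below reads /proc, so it is Linux-only, and the numbers vary by Ruby version:

# Ballpark per-Ractor memory cost (Linux-only: reads /proc)
def rss_kb
  File.read("/proc/self/status")[/VmRSS:\s+(\d+)/, 1].to_i
end

before = rss_kb
ractors = 10.times.map { Ractor.new { Ractor.receive } }
puts "~#{(rss_kb - before) / 10} KB per idle Ractor"
ractors.each { |r| r.send(:done) } # let the workers finish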
# Memory and performance measurement utilities
def measure_memory_usage
  GC.start
  # Note: Actual memory measurement would require external tools
  # This demonstrates the pattern for monitoring
  puts "Memory usage: #{GC.stat(:heap_live_slots)} live objects"
end

def benchmark_parallel_vs_sequential
  data = Array.new(100_000) { rand(1000) }

  # Sequential processing
  start_time = Time.now
  sequential_result = data.map { |n| Math.sqrt(n) * Math.sin(n) }.sum
  sequential_time = Time.now - start_time

  # Parallel processing
  measure_memory_usage
  start_time = Time.now

  chunk_size = data.size / 4
  chunks = data.each_slice(chunk_size).to_a

  workers = chunks.map do
    Ractor.new do
      work = Ractor.receive
      work.map { |n| Math.sqrt(n) * Math.sin(n) }.sum
    end
  end

  chunks.each_with_index { |chunk, i| workers[i].send(chunk, move: true) }
  parallel_result = workers.map(&:take).sum
  parallel_time = Time.now - start_time
  measure_memory_usage

  puts "Sequential: #{sequential_time.round(4)}s, Result: #{sequential_result.round(4)}"
  puts "Parallel: #{parallel_time.round(4)}s, Result: #{parallel_result.round(4)}"
  puts "Speedup: #{(sequential_time / parallel_time).round(2)}x"
end
benchmark_parallel_vs_sequential
Optimal performance requires balancing the number of workers with available CPU cores and task characteristics. Too many workers can cause excessive context switching overhead, while too few underutilize available parallelism.
# Dynamic worker pool sizing based on workload
require 'etc'

class AdaptiveWorkerPool
  def initialize(min_workers: 1, max_workers: nil)
    @min_workers = min_workers
    @max_workers = max_workers || Etc.nprocessors
    @workers = []
    @task_queue = Queue.new
    @results = Queue.new

    # Start with minimum workers
    scale_to(@min_workers)
  end

  def process_batch(tasks, timeout: 60)
    start_time = Time.now
    submitted_count = 0

    # Submit all tasks
    tasks.each do |task|
      @task_queue << task
      submitted_count += 1
    end

    # Monitor progress and adapt worker count
    results = []
    while results.size < submitted_count && (Time.now - start_time) < timeout
      result = @results.pop(timeout: 1) # returns nil if nothing arrives in time
      if result
        results << result
      elsif @task_queue.size > @workers.size && @workers.size < @max_workers
        # Queue is backing up and capacity remains: scale up
        add_worker
      end
    end

    results
  end

  private

  def scale_to(count)
    add_worker while @workers.size < count
  end

  def add_worker
    worker = Ractor.new(@task_queue, @results) do |task_queue, results|
      while (task = task_queue.pop)
        break if task == :shutdown

        # Process task (simulated CPU-intensive work)
        processed = task.map { |x| (1..1000).map { |i| Math.sin(x * i) }.sum }
        results << processed
      end
    end

    @workers << worker
    puts "Scaled to #{@workers.size} workers"
  end
end
# Note: This example is conceptual. Queue objects cannot cross Ractor
# boundaries, so a real implementation would replace the shared queues
# with Ractor messaging, as in the sketch below.
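For contrast, a minimal pool that needs no shared queues at all: each worker yields its result, and the main thread collects whichever finishes first with Ractor.select. This version runs as-is:

# Pure message-passing pool: no Queue, no shared state
workers = 4.times.map do |i|
  Ractor.new(i) do |id|
    task = Ractor.receive
    Ractor.yield([id, task.sum { |x| Math.sin(x) }])
  end
end

tasks = Array.new(4) { Array.new(10_000) { rand } }
workers.zip(tasks).each { |w, t| w.send(t, move: true) }

# Ractor.select returns the first available [ractor, value] pair
pending = workers.dup
until pending.empty?
  ractor, (id, result) = Ractor.select(*pending)
  pending.delete(ractor)
  puts "worker #{id} => #{result.round(4)}"
end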
Memory management becomes critical when processing large datasets since each Ractor maintains separate object spaces. Strategies include streaming processing, chunked data handling, and explicit garbage collection coordination.
# Memory-efficient streaming processor
class StreamProcessor
  def initialize(worker_count: 4)
    @workers = create_worker_pool(worker_count)
    @current_worker = 0
  end

  def process_large_file(filename, chunk_size: 1000)
    File.foreach(filename).each_slice(chunk_size) do |chunk|
      worker = next_worker

      # Send chunk to worker and immediately get result
      worker.send(chunk, move: true)
      result = worker.take

      # Process result immediately to avoid memory buildup
      yield result if block_given?

      # Trigger GC periodically
      GC.start if rand < 0.1
    end
  end

  private

  def create_worker_pool(count)
    Array.new(count) do
      Ractor.new do
        # Worker-local helper: the processor's instance methods are
        # not visible inside the isolated block
        analyze_line = lambda do |line|
          words = line.split
          {
            word_count: words.size,
            char_count: line.length,
            complexity: words.map(&:length).sum.to_f / words.size
          }
        end

        loop do
          chunk = Ractor.receive
          break if chunk == :shutdown

          # Process chunk with limited memory footprint
          processed = chunk.map(&:strip)
                           .reject(&:empty?)
                           .map { |line| analyze_line.call(line) }

          # Force local GC before yielding
          GC.start
          Ractor.yield processed
        end
      end
    end
  end

  def next_worker
    worker = @workers[@current_worker]
    @current_worker = (@current_worker + 1) % @workers.size
    worker
  end
end
# Usage for processing large files efficiently
processor = StreamProcessor.new(worker_count: 4)
total_stats = { lines: 0, words: 0, chars: 0 }
processor.process_large_file("large_file.txt") do |batch_results|
  batch_results.each do |stats|
    total_stats[:lines] += 1
    total_stats[:words] += stats[:word_count]
    total_stats[:chars] += stats[:char_count]
  end

  # Print progress without accumulating memory
  puts "Processed #{total_stats[:lines]} lines..." if total_stats[:lines] % 10000 == 0
end
puts "Final stats: #{total_stats}"
Error Handling & Debugging
Worker Thread error handling requires different strategies than traditional threading. An unhandled exception terminates its Ractor and surfaces on the caller's next take wrapped in Ractor::RemoteError, so workers that should keep running must rescue their own exceptions and communicate error states through the message passing interface.
# Robust error handling in worker threads
def create_resilient_worker
  Ractor.new do
    loop do
      begin
        task = Ractor.receive
        break if task == :shutdown

        case task[:type]
        when :calculate
          result = risky_calculation(task[:data])
          Ractor.yield({ status: :success, result: result })
        when :process_file
          content = File.read(task[:filename])
          processed = content.lines.map(&:strip).reject(&:empty?)
          Ractor.yield({ status: :success, result: processed })
        else
          Ractor.yield({ status: :error, error: "Unknown task type: #{task[:type]}" })
        end
      rescue StandardError => e
        # Capture exception details for the main thread
        error_info = {
          status: :error,
          error_class: e.class.name,
          error_message: e.message,
          backtrace: e.backtrace[0..5] # Limit backtrace size
        }
        Ractor.yield(error_info)
      end
    end
  end
end

def risky_calculation(data)
  raise ArgumentError, "Data cannot be negative" if data < 0
  raise RuntimeError, "Calculation overflow" if data > 1000

  Math.sqrt(data) * 100
end
# Using resilient worker with comprehensive error handling
worker = create_resilient_worker
test_cases = [
  { type: :calculate, data: 16 },                      # Should succeed
  { type: :calculate, data: -5 },                      # Will raise ArgumentError
  { type: :calculate, data: 2000 },                    # Will raise RuntimeError
  { type: :invalid, data: 42 },                        # Unknown task type
  { type: :process_file, filename: "nonexistent.txt" } # File error
]

test_cases.each_with_index do |task, i|
  puts "Test case #{i + 1}:"
  worker.send(task)
  response = worker.take

  case response[:status]
  when :success
    puts "  Success: #{response[:result]}"
  when :error
    puts "  Error: #{response[:error_class]} - #{response[:error_message]}"
    puts "  Backtrace: #{response[:backtrace]&.first}" if response[:backtrace]
  end
end
worker.send(:shutdown)
Debugging Worker Threads presents unique challenges since traditional debugging tools cannot inspect across Ractor boundaries. Effective debugging relies on structured logging, state reporting mechanisms, and careful message protocol design.
# Debugging support for worker threads
module WorkerDebugging
  def self.create_debug_worker(worker_id)
    Ractor.new(worker_id) do |id|
      debug_log = []
      processed_count = 0
      start_time = Time.now

      loop do
        message = Ractor.receive
        timestamp = Time.now

        case message
        when Hash
          if message[:debug_command]
            # Module methods remain callable because modules are
            # shareable; they must not touch mutable outside state
            WorkerDebugging.handle_debug_command(message, debug_log, processed_count, start_time)
          else
            # Normal work processing
            debug_log << { time: timestamp, action: "received_work", data: message.keys }

            begin
              result = WorkerDebugging.process_work(message)
              processed_count += 1
              debug_log << { time: timestamp, action: "completed_work", result_size: result.size }
              Ractor.yield({ status: :success, result: result, worker_id: id })
            rescue => e
              debug_log << { time: timestamp, action: "error", error: e.class.name }
              Ractor.yield({
                status: :error,
                error: e.message,
                worker_id: id,
                debug_log: debug_log.last(5) # Recent activity
              })
            end
          end
        when :shutdown
          debug_log << { time: timestamp, action: "shutdown" }
          break
        end

        # Limit debug log size
        debug_log = debug_log.last(100) if debug_log.size > 100
      end
    end
  end

  def self.handle_debug_command(message, debug_log, processed_count, start_time)
    case message[:debug_command]
    when :status
      status = {
        processed_count: processed_count,
        uptime: Time.now - start_time,
        recent_activity: debug_log.last(5)
      }
      Ractor.yield({ debug_status: status })
    when :log_dump
      Ractor.yield({ debug_log: debug_log })
    when :memory_stats
      # Note: In practice, this would use more sophisticated memory inspection
      stats = {
        gc_stat: GC.stat.slice(:count, :heap_live_slots, :heap_free_slots),
        object_count: ObjectSpace.count_objects
      }
      Ractor.yield({ memory_stats: stats })
    end
  end

  def self.process_work(data)
    # Simulate work that might fail
    data[:items].map do |item|
      raise "Invalid item" if item.nil?
      item.to_s.upcase
    end
  end
end
# Using debug-enabled workers
workers = Array.new(3) { |i| WorkerDebugging.create_debug_worker(i) }
# Send some work
work_items = [
  { items: ["hello", "world"] },
  { items: ["ruby", nil, "ractor"] }, # Contains invalid item
  { items: (1..5).to_a }
]

work_items.each_with_index do |work, i|
  worker = workers[i % workers.size]
  worker.send(work)
  result = worker.take
  puts "Work #{i}: #{result[:status]}"

  if result[:status] == :error
    puts "  Error: #{result[:error]}"
    puts "  Recent activity:"
    result[:debug_log]&.each do |entry|
      puts "    #{entry[:time].strftime('%H:%M:%S')} - #{entry[:action]}: #{entry.except(:time, :action)}"
    end
  end
end

# Query worker status
workers.each_with_index do |worker, i|
  worker.send({ debug_command: :status })
  status = worker.take
  puts "Worker #{i} status:"
  puts "  Processed: #{status[:debug_status][:processed_count]} tasks"
  puts "  Uptime: #{status[:debug_status][:uptime].round(2)} seconds"
end
workers.each { |w| w.send(:shutdown) }
Timeout handling prevents workers from hanging indefinitely on problematic tasks. Implementation requires careful coordination between timeout detection and worker cleanup.
# Timeout-aware worker management
require 'timeout' # for the Timeout::Error class used below

class TimeoutWorkerManager
  def initialize(default_timeout: 30)
    @default_timeout = default_timeout
    @active_workers = {}
  end

  def execute_with_timeout(task, timeout: nil)
    timeout ||= @default_timeout
    worker = create_monitored_worker(task, timeout)
    worker_id = worker.object_id
    @active_workers[worker_id] = { worker: worker, start_time: Time.now }

    begin
      # The worker enforces the timeout internally and always
      # yields a status hash, so this take eventually returns
      result = worker.take
      @active_workers.delete(worker_id)
      result
    rescue Ractor::ClosedError
      @active_workers.delete(worker_id)
      { status: :error, error: "Worker terminated unexpectedly" }
    rescue => e
      cleanup_worker(worker_id)
      { status: :error, error: "Execution error: #{e.message}" }
    end
  end

  def monitor_timeouts
    Thread.new do
      loop do
        current_time = Time.now

        @active_workers.each do |worker_id, info|
          if current_time - info[:start_time] > @default_timeout
            puts "Worker #{worker_id} exceeded timeout, cleaning up..."
            cleanup_worker(worker_id)
          end
        end

        sleep 1
      end
    end
  end

  private

  def create_monitored_worker(task, timeout)
    Ractor.new(task, timeout) do |work, time_limit|
      start_time = Time.now

      # The helper lives inside the block: the manager's instance
      # methods are not reachable across the isolation boundary
      perform_with_checks = lambda do
        work[:items].map.with_index do |item, i|
          # Check timeout periodically during long operations
          if i % 10 == 0 && (Time.now - start_time) > time_limit
            raise Timeout::Error, "Operation exceeded #{time_limit} seconds"
          end

          # Simulate varying work duration
          sleep(work[:delay] || 0.01)
          item * 2
        end
      end

      begin
        result = perform_with_checks.call
        Ractor.yield({ status: :success, result: result, duration: Time.now - start_time })
      rescue => e
        Ractor.yield({
          status: :error,
          error: e.message,
          duration: Time.now - start_time
        })
      end
    end
  end

  def cleanup_worker(worker_id)
    if (info = @active_workers.delete(worker_id))
      # Note: In practice, forceful termination of Ractors
      # may require additional coordination
      puts "Cleaned up worker #{worker_id} after #{(Time.now - info[:start_time]).round(2)}s"
    end
  end
end
# Using timeout-aware workers
manager = TimeoutWorkerManager.new(default_timeout: 5)
timeout_monitor = manager.monitor_timeouts

test_tasks = [
  { items: (1..10).to_a, delay: 0.1 },  # Should complete
  { items: (1..100).to_a, delay: 0.1 }, # Will time out
  { items: [nil], delay: 0 }            # Raises NoMethodError (nil * 2)
]

test_tasks.each_with_index do |task, i|
  puts "Executing task #{i + 1}:"
  result = manager.execute_with_timeout(task, timeout: 3)

  puts "  Status: #{result[:status]}"
  puts "  Duration: #{result[:duration]&.round(2)}s" if result[:duration]
  puts "  Error: #{result[:error]}" if result[:error]
  puts "  Result size: #{result[:result]&.size}" if result[:result]
end

timeout_monitor.kill
Production Patterns
Production deployment of Worker Threads requires careful consideration of resource management, monitoring, error recovery, and integration with existing application architectures. Worker pools must be sized appropriately for the deployment environment and workload characteristics.
# Production-ready worker pool implementation
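# NOTE: conceptual sketch. Queue and Hash instances cannot actually be
# shared across Ractor boundaries; treat @task_queue and @results as
# stand-ins for a message-based dispatcher (compare the Ractor.select
# pattern shown earlier).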
require 'etc'
require 'securerandom'

class ProductionWorkerPool
  attr_reader :stats

  def initialize(config = {})
    @min_workers = config[:min_workers] || 2
    @max_workers = config[:max_workers] || [Etc.nprocessors * 2, 8].min
    @idle_timeout = config[:idle_timeout] || 300
    @task_timeout = config[:task_timeout] || 60

    @workers = {}
    @task_queue = Queue.new
    @results = {}
    @stats = initialize_stats
    @monitor_thread = nil
    @shutdown = false

    start_monitoring
    scale_to(@min_workers)
  end

  def submit_async(task_data)
    task_id = SecureRandom.uuid
    @stats[:tasks_submitted] += 1
    @task_queue << { id: task_id, data: task_data, submitted_at: Time.now }
    task_id
  end

  def get_result(task_id, timeout: @task_timeout)
    deadline = Time.now + timeout

    while Time.now < deadline
      if (result = @results.delete(task_id))
        @stats[:tasks_completed] += 1
        return result
      end
      sleep 0.01
    end

    @stats[:tasks_timeout] += 1
    { status: :timeout, error: "Task #{task_id} timed out after #{timeout}s" }
  end

  def health_check
    {
      status: @shutdown ? :shutdown : :healthy,
      workers: {
        active: @workers.size,
        min: @min_workers,
        max: @max_workers
      },
      queue: {
        pending: @task_queue.size,
        processing: @workers.count { |_, info| info[:current_task] }
      },
      stats: @stats.dup
    }
  end

  def shutdown(timeout: 30)
    @shutdown = true
    @monitor_thread&.kill

    # Stop accepting new work
    @task_queue.clear

    # Wait for workers to complete current tasks
    deadline = Time.now + timeout
    while @workers.any? && Time.now < deadline
      @workers.each { |_, info| info[:worker].send(:shutdown) if info[:worker] }
      sleep 0.1
      cleanup_completed_workers
    end

    # Force cleanup of remaining workers
    @workers.clear
    puts "Worker pool shutdown complete"
  end
  private

  def initialize_stats
    {
      tasks_submitted: 0,
      tasks_completed: 0,
      tasks_failed: 0,
      tasks_timeout: 0,
      workers_created: 0,
      workers_destroyed: 0,
      average_task_time: 0.0
    }
  end

  def start_monitoring
    @monitor_thread = Thread.new do
      loop do
        break if @shutdown

        begin
          manage_worker_pool
          cleanup_completed_workers
          update_performance_stats
          sleep 1
        rescue => e
          puts "Monitor error: #{e.message}"
          sleep 5
        end
      end
    end
  end

  def scale_to(count)
    add_worker while @workers.size < count
  end

  def manage_worker_pool
    queue_size = @task_queue.size
    active_workers = @workers.size

    # Scale up if the queue is building up
    if queue_size > active_workers && active_workers < @max_workers
      add_worker
    end

    # Scale down if workers are idle
    if queue_size == 0 && active_workers > @min_workers
      idle_workers = @workers.select do |_, info|
        info[:current_task].nil? &&
          (Time.now - info[:last_task_at]) > @idle_timeout
      end

      if idle_workers.any?
        worker_id, _info = idle_workers.first
        remove_worker(worker_id)
      end
    end
  end
  def add_worker
    worker_id = SecureRandom.uuid

    # Conceptual: a real Ractor cannot receive the shared queue and
    # results hash; work would have to arrive via send/receive instead
    worker = Ractor.new(worker_id, @task_queue, @results) do |id, queue, results|
      loop do
        begin
          task = queue.pop(timeout: 1)
          break if task == :shutdown || task.nil?

          start_time = Time.now

          # Process the actual work
          result = perform_task(task[:data])
          duration = Time.now - start_time

          results[task[:id]] = {
            status: :success,
            result: result,
            duration: duration,
            worker_id: id
          }
        rescue => e
          results[task[:id]] = {
            status: :error,
            error: e.message,
            error_class: e.class.name,
            worker_id: id
          }
        end
      end
    end

    @workers[worker_id] = {
      worker: worker,
      created_at: Time.now,
      last_task_at: Time.now,
      current_task: nil
    }

    @stats[:workers_created] += 1
    puts "Added worker #{worker_id} (total: #{@workers.size})"
  end

  def remove_worker(worker_id)
    if (info = @workers.delete(worker_id))
      info[:worker].send(:shutdown)
      @stats[:workers_destroyed] += 1
      puts "Removed worker #{worker_id} (total: #{@workers.size})"
    end
  end

  def cleanup_completed_workers
    @workers.delete_if do |worker_id, info|
      begin
        info[:worker].take # Conceptual check; take blocks until the worker yields or exits
        false # Keep worker if still active
      rescue Ractor::ClosedError
        @stats[:workers_destroyed] += 1
        true # Remove closed worker
      rescue Ractor::EmptyError
        false # Worker is alive but no message available
      end
    end
  end

  def update_performance_stats
    # Calculate average task completion time
    completed = @stats[:tasks_completed]
    if completed > 0
      # This would need more sophisticated timing tracking in practice
      @stats[:average_task_time] = calculate_average_task_time
    end
  end

  def calculate_average_task_time
    # Placeholder for actual timing calculation
    0.5
  end
  def perform_task(data)
    # Example task processing - replace with actual business logic
    case data[:type]
    when :image_resize
      process_image(data[:image_data], data[:dimensions])
    when :data_analysis
      analyze_dataset(data[:dataset])
    when :report_generation
      generate_report(data[:parameters])
    else
      raise "Unknown task type: #{data[:type]}"
    end
  end

  def process_image(image_data, dimensions)
    # Simulate image processing
    sleep(0.1 + rand * 0.2)
    "Processed image to #{dimensions[:width]}x#{dimensions[:height]}"
  end

  def analyze_dataset(dataset)
    # Simulate data analysis
    sleep(0.2 + rand * 0.3)
    {
      rows_processed: dataset[:rows],
      insights: ["Pattern A detected", "Anomaly in sector B"],
      confidence: rand * 0.4 + 0.6
    }
  end

  def generate_report(parameters)
    # Simulate report generation
    sleep(0.5 + rand * 0.5)
    {
      report_id: SecureRandom.uuid,
      pages: rand(10) + 5,
      format: parameters[:format] || "PDF"
    }
  end
end
# Production usage example
pool = ProductionWorkerPool.new(
  min_workers: 2,
  max_workers: 6,
  idle_timeout: 120,
  task_timeout: 30
)

# Submit various types of work
tasks = [
  { type: :image_resize, image_data: "...", dimensions: { width: 800, height: 600 } },
  { type: :data_analysis, dataset: { rows: 10000, columns: 50 } },
  { type: :report_generation, parameters: { format: "HTML", template: "standard" } }
]

task_ids = tasks.map { |task| pool.submit_async(task) }

# Monitor and collect results
task_ids.each do |task_id|
  result = pool.get_result(task_id)
  puts "Task #{task_id[0..7]}: #{result[:status]}"
  puts "  Result: #{result[:result]}" if result[:result]
  puts "  Duration: #{result[:duration]&.round(3)}s" if result[:duration]
end

# Health monitoring
puts "\nPool Health:"
health = pool.health_check
puts "  Status: #{health[:status]}"
puts "  Active workers: #{health[:workers][:active]}"
puts "  Queue size: #{health[:queue][:pending]}"
puts "  Tasks completed: #{health[:stats][:tasks_completed]}"

sleep 2 # Let monitoring cycle run
pool.shutdown
Integration with web frameworks requires careful request lifecycle management and resource cleanup to prevent memory leaks and worker exhaustion.
# Rails integration pattern (conceptual)
class BackgroundJobProcessor
  def self.instance
    @instance ||= new
  end

  def initialize
    @worker_pool = ProductionWorkerPool.new(
      min_workers: Rails.env.production? ? 4 : 2,
      max_workers: Rails.env.production? ? 12 : 4
    )

    # Graceful shutdown hooks
    at_exit { @worker_pool.shutdown }

    if defined?(Rails)
      Rails.application.config.after_initialize do
        register_shutdown_hooks
      end
    end
  end

  def process_async(job_class, *args)
    task_data = {
      type: :background_job,
      job_class: job_class.name,
      arguments: args,
      environment: Rails.env.to_s,
      request_id: Current.request_id # If using request tracking
    }

    @worker_pool.submit_async(task_data)
  end

  def wait_for_result(task_id, timeout: 30)
    @worker_pool.get_result(task_id, timeout: timeout)
  end

  def status
    @worker_pool.health_check
  end

  private

  def register_shutdown_hooks
    # Graceful shutdown for various deployment scenarios
    %w[TERM INT].each do |signal|
      Signal.trap(signal) do
        puts "Received #{signal}, shutting down worker pool..."
        @worker_pool.shutdown(timeout: 30)
        exit
      end
    end
  end
end

# Usage in Rails controllers
class ReportsController < ApplicationController
  def generate
    processor = BackgroundJobProcessor.instance
    task_id = processor.process_async(ReportGenerationJob, params[:report_type], current_user.id)

    render json: { task_id: task_id, status: "processing" }
  end

  def status
    task_id = params[:task_id]
    result = BackgroundJobProcessor.instance.wait_for_result(task_id, timeout: 1)

    case result[:status]
    when :success
      render json: { status: "completed", result: result[:result] }
    when :timeout
      render json: { status: "processing" }
    when :error
      render json: { status: "error", error: result[:error] }
    end
  end
end

# Health check endpoint
class HealthController < ApplicationController
  def worker_status
    status = BackgroundJobProcessor.instance.status
    render json: status
  end
end
Reference
Core Classes and Methods
Class/Method | Parameters | Returns | Description |
---|---|---|---|
`Ractor.new(&block)` | Block, optional args | `Ractor` | Creates new worker thread with isolated memory space |
`Ractor.new(arg1, arg2, &block)` | Arguments, block | `Ractor` | Creates worker with initialization arguments |
`#send(object, move: false)` | Object, move option | `Ractor` | Sends message to worker (copies or moves object) |
`#take` | None | `Object` | Blocks until worker yields result |
`Ractor.receive` | None | `Object` | Receives message in worker (blocks until message available) |
`Ractor.yield(object)` | Object | `nil` | Sends result from worker to main thread |
`#close_incoming` | None | `Ractor` | Prevents further messages to worker |
`#close_outgoing` | None | `Ractor` | Prevents worker from sending more messages |
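A short sketch of the close methods from the table: once a worker's inbox is closed, further send calls raise Ractor::ClosedError.

worker = Ractor.new { Ractor.receive }

worker.send(:first)
worker.close_incoming

begin
  worker.send(:second)
rescue Ractor::ClosedError
  puts "inbox closed, message rejected"
end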
Communication Patterns
Pattern | Use Case | Implementation |
---|---|---|
Request-Response | Single task processing | `worker.send(task); result = worker.take` |
Producer-Consumer | Stream processing | Worker loops with `Ractor.receive` |
Pipeline | Multi-stage processing | Chain workers where each processes and forwards |
Broadcast | Fan-out distribution | Send same data to multiple workers |
Worker Pool | Load balancing | Distribute tasks among available workers |
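Most of these patterns appear earlier in this section; Broadcast is the exception, so here is a minimal fan-out sketch in which one frozen (shareable) payload is sent to every worker:

payload = [1, 2, 3].freeze # shareable, so it is passed by reference

workers = 3.times.map do |i|
  Ractor.new(i) do |id|
    data = Ractor.receive
    [id, data.sum]
  end
end

workers.each { |w| w.send(payload) }
workers.each { |w| p w.take } # => [0, 6], [1, 6], [2, 6]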
Object Shareability Rules
Type | Shareable | Notes |
---|---|---|
`Integer`, `Float` | ✓ | Immutable numeric types |
`String` (frozen) | ✓ | Frozen strings are shareable |
`String` (mutable) | ✗ | Must use `move: true` or freeze |
`Symbol` | ✓ | Symbols are immutable |
`Array` (frozen, all elements shareable) | ✓ | Deep immutability required |
`Hash` (frozen, all keys/values shareable) | ✓ | Deep immutability required |
`Class`, `Module` | ✓ | Class and module objects |
`Proc`, `Method` | ✗ | Cannot cross Ractor boundaries |
Custom objects | ✗ | Unless explicitly made Ractor-safe |
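Shareability can be checked at runtime with Ractor.shareable?, and deep freezing can be automated with Ractor.make_shareable:

Ractor.shareable?(42)            # => true
Ractor.shareable?("text")        # => false (mutable string)
Ractor.shareable?("text".freeze) # => true
Ractor.shareable?([+"a"].freeze) # => false (frozen array, mutable element)

config = { host: "example.com", ports: [80, 443] }
Ractor.make_shareable(config)    # deep-freezes the hash and its contents
Ractor.shareable?(config)        # => true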
Exception Types
Exception | Cause | Resolution |
---|---|---|
`Ractor::IsolationError` | Crossing the isolation boundary (e.g. reading an unshareable constant from a worker) | Make the object shareable or pass it as a message |
`Ractor::ClosedError` | Sending to closed Ractor | Check Ractor state before sending |
`Ractor::EmptyError` | No message available (non-blocking operations) | Handle gracefully or use blocking operations |
`Ractor::UnsafeError` | Accessing unsafe operations | Avoid thread-unsafe operations in workers |
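One related case: when a worker dies with an unhandled exception, the caller's next take raises Ractor::RemoteError, carrying the original exception as its cause:

worker = Ractor.new { raise ArgumentError, "boom" }

begin
  worker.take
rescue Ractor::RemoteError => e
  puts e.cause.class   # => ArgumentError
  puts e.cause.message # => boom
end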
Performance Considerations
Factor | Impact | Recommendation |
---|---|---|
Worker count | CPU utilization vs overhead | Start with CPU core count, adjust based on workload |
Message size | Memory usage and copy overhead | Minimize message payload, use streaming for large data |
Object copying | Performance penalty for large objects | Use `move: true` when possible |
GC coordination | Memory pressure across workers | Monitor heap usage, trigger GC strategically |
Task granularity | Context switching overhead | Balance task size with parallelization benefits |
Configuration Options
# Worker pool configuration
{
  min_workers: 2,         # Minimum active workers
  max_workers: 8,         # Maximum workers under load
  idle_timeout: 300,      # Seconds before idle worker removal
  task_timeout: 60,       # Maximum seconds per task
  queue_size_limit: 1000, # Maximum queued tasks
  shutdown_timeout: 30    # Graceful shutdown time limit
}

# Message passing options
worker.send(data, move: true) # Transfer ownership to the worker
worker.send(data)             # Deep-copy (default for unshareable objects)
worker.send(data.freeze)      # Share by reference if deeply immutable

# Ractor creation patterns
Ractor.new { }                   # Simple worker
Ractor.new(config) { |cfg| }     # Parameterized worker
Ractor.new(name: "worker-1") { } # Named worker (useful when debugging)