Worker Threads

This guide covers Worker Threads in Ruby, including parallel execution patterns, communication mechanisms, and production deployment strategies.

Overview

Worker Threads provide true parallel execution in Ruby by running code on separate operating system threads that are not serialized by the Global VM Lock (GVL, often called the GIL). Each worker runs in its own isolated execution context with a private object space, so CPU-intensive operations proceed concurrently with the main thread and other workers.

Ruby implements Worker Threads through the Ractor class, which creates actor-like execution contexts within the same process that communicate through message passing. Each Ractor maintains its own object space, preventing shared mutable state and eliminating most thread-safety concerns that plague traditional Ruby threading.

# Create a worker thread for CPU-intensive calculation
worker = Ractor.new do
  def prime?(n)
    return false if n < 2
    (2..Math.sqrt(n)).none? { |i| n % i == 0 }
  end
  
  # Receive work from main thread
  while number = Ractor.receive
    result = prime?(number)
    Ractor.yield result
  end
end

# Send work to worker and get result
worker.send(97)
puts worker.take # => true

worker.send(nil) # a falsy message ends the worker's receive loop

Ractors operate on a share-nothing architecture where objects cannot be freely passed between threads. Shareable objects (deeply immutable values and other Ractor-safe objects) cross boundaries directly; everything else is deep-copied or explicitly moved. This design prevents data races and eliminates the need for locks in most scenarios.

# Initialization arguments are copied into the worker
worker = Ractor.new("World") do |name|
  "Hello, #{name}!" # the block's return value becomes the final message
end

puts worker.take # => "Hello, World!"

# Mutable objects must be moved or copied
data = [1, 2, 3, 4, 5]
worker = Ractor.new do
  numbers = Ractor.receive
  numbers.sum
end

worker.send(data, move: true) # Transfer ownership
puts worker.take # => 15
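
After a move, the sending side loses access to the object: any method call on the original reference raises Ractor::MovedError. A minimal sketch:

# Accessing an object after moving it raises an error
data = [1, 2, 3]
worker = Ractor.new { Ractor.receive.sum }
worker.send(data, move: true)

begin
  data.sum # the original reference now points at a moved object
rescue Ractor::MovedError => e
  puts "Object was moved: #{e.message}"
end

puts worker.take # => 6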

Basic Usage

Creating Worker Threads involves instantiating Ractors with blocks that define their behavior. The main thread communicates with workers through send and receive operations, while workers return results using Ractor.yield or by terminating with a return value.

# Basic worker creation and communication
calculator = Ractor.new do
  loop do
    operation, a, b = Ractor.receive
    case operation
    when :add
      Ractor.yield(a + b)
    when :multiply
      Ractor.yield(a * b)
    when :stop
      break
    end
  end
end

# Send operations to worker
calculator.send([:add, 10, 5])
puts calculator.take # => 15

calculator.send([:multiply, 4, 6])
puts calculator.take # => 24

calculator.send([:stop])

Workers can be configured with initial arguments passed during creation. These arguments are delivered like messages: shareable objects cross the boundary directly, while unshareable objects are deep-copied into the worker.

# Worker with initialization parameters
file_processor = Ractor.new("data.txt", /\d+/) do |filename, pattern|
  File.readlines(filename).map do |line|
    line.scan(pattern).map(&:to_i)
  end.flatten
end

numbers = file_processor.take
puts numbers.sum

Multiple workers can operate concurrently to process different parts of a workload. The main thread distributes work among available workers and collects results as they complete.

# Parallel processing with multiple workers
def create_worker_pool(size)
  Array.new(size) do |i|
    Ractor.new(i) do |worker_id|
      loop do
        work = Ractor.receive
        break if work == :shutdown
        
        # Simulate CPU-intensive work
        result = work.map { |n| n ** 2 }
        Ractor.yield [worker_id, result]
      end
    end
  end
end

workers = create_worker_pool(3)
data_chunks = [[1,2,3], [4,5,6], [7,8,9]]

# Distribute work
data_chunks.each_with_index do |chunk, i|
  workers[i].send(chunk)
end

# Collect results
results = workers.map(&:take)
results.each do |worker_id, values|
  puts "Worker #{worker_id}: #{values.join(', ')}"
end

# Shutdown workers
workers.each { |w| w.send(:shutdown) }

Worker threads can also function as independent services that process streams of incoming work rather than single tasks.

# Long-running worker service
log_processor = Ractor.new do
  processed_count = 0
  
  loop do
    message = Ractor.receive
    case message
    when Hash
      # Process log entry
      level = message[:level]
      content = message[:message]
      timestamp = Time.now.strftime("%Y-%m-%d %H:%M:%S")
      
      puts "[#{timestamp}] #{level.upcase}: #{content}"
      processed_count += 1
    when :status
      Ractor.yield processed_count
    when :shutdown
      break
    end
  end
end

# Send log messages (braces required so the hash is not parsed as keywords)
log_processor.send({ level: :info, message: "Application started" })
log_processor.send({ level: :error, message: "Database connection failed" })

# Check status
log_processor.send(:status)
puts "Processed #{log_processor.take} messages"

log_processor.send(:shutdown)

Thread Safety & Concurrency

Worker Threads achieve thread safety through complete isolation rather than synchronization mechanisms. Each Ractor operates in its own memory space with no shared mutable state, eliminating the data races and lock-based deadlocks that affect traditional threaded programs.

Objects passed between Ractors must satisfy shareability requirements. Immutable objects such as numbers, symbols, and frozen strings can be shared freely. Mutable objects are deep-copied by default when sent, or can have their ownership transferred with the move: true option.
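
Shareability can be checked directly with Ractor.shareable?, and Ractor.make_shareable deep-freezes a structure so it can cross boundaries without copying:

# Checking and establishing shareability
Ractor.shareable?(42)               # => true
Ractor.shareable?("abc")            # => false (mutable string)
Ractor.shareable?("abc".freeze)     # => true

nested = Ractor.make_shareable({ a: [1, 2] }) # deep-freezes hash and array
Ractor.shareable?(nested)           # => true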

# Shareable values inside a mutable container
message_data = {
  number: 42,
  symbol: :status,
  frozen_string: "Hello".freeze,
  frozen_array: [1, 2, 3].freeze
}

worker = Ractor.new do
  data = Ractor.receive
  puts "Received: #{data.inspect}"
  data[:number] * 2
end

worker.send(message_data) # the hash itself is not frozen, so it is deep-copied
result = worker.take
puts result # => 84

Ractor::IsolationError is raised when code inside a Ractor reaches for unshareable state outside its boundary, for example a constant that references a mutable object. This prevents the accidental state sharing that plagues traditional threading approaches.

# Unshareable outer state raises isolation errors
SETTINGS = { retries: 3 } # not frozen, therefore not shareable

worker = Ractor.new do
  SETTINGS[:retries] # raises Ractor::IsolationError inside the worker
rescue Ractor::IsolationError => e
  "Cannot read unshareable constant: #{e.message}"
end

puts worker.take

# Solution: deep-freeze the object before the worker starts
# Ractor.make_shareable(SETTINGS) makes the constant lookup legal

Complex coordination patterns require careful design of message protocols between workers and the main thread. Workers can maintain internal state safely since no other thread can access their memory space.

# Coordinated worker pool with state management
class WorkerPool
  def initialize(size)
    @workers = Array.new(size) do |i|
      create_worker(i)
    end
    @next_worker = 0
  end
  
  def submit_work(task)
    worker = @workers[@next_worker]
    @next_worker = (@next_worker + 1) % @workers.size
    
    worker.send(task, move: true)
    worker
  end
  
  def shutdown
    @workers.each { |w| w.send(:terminate) }
  end
  
  private
  
  def create_worker(id)
    Ractor.new(id) do |worker_id|
      # Helpers must live inside the block: instance methods of the
      # enclosing class are not visible across the Ractor boundary
      process_task = ->(task) { task.map { |x| Math.sin(x) }.sum }
      completed_tasks = 0
      
      loop do
        task = Ractor.receive
        break if task == :terminate
        
        # Process task with access to worker-local state
        result = process_task.call(task)
        completed_tasks += 1
        
        Ractor.yield({
          worker_id: worker_id,
          result: result,
          completed_count: completed_tasks
        })
      end
    end
  end
end

pool = WorkerPool.new(4)
tasks = Array.new(10) { Array.new(100) { rand } }

# Submit all tasks and collect results
futures = tasks.map { |task| pool.submit_work(task) }
results = futures.map(&:take)

results.each do |result|
  puts "Worker #{result[:worker_id]}: #{result[:result].round(4)} (#{result[:completed_count]} tasks)"
end

pool.shutdown

Pipeline architectures can chain multiple workers where each stage processes data and passes results to the next stage. This pattern maintains isolation while enabling complex data transformations.

# Multi-stage processing pipeline
def create_pipeline
  # Stage 1: Data validation and parsing
  parser = Ractor.new do
    loop do
      raw_data = Ractor.receive
      break if raw_data == :shutdown
      
      # Validate and parse input
      parsed = raw_data.select { |item| item.is_a?(String) && !item.empty? }
                       .map { |s| s.strip.downcase }
      
      Ractor.yield parsed
    end
  end
  
  # Stage 2: Data transformation
  transformer = Ractor.new do
    loop do
      data = Ractor.receive
      break if data == :shutdown
      
      # Transform data
      transformed = data.map do |word|
        {
          word: word,
          length: word.length,
          vowel_count: word.count('aeiou'),
          hash: word.hash
        }
      end
      
      Ractor.yield transformed
    end
  end
  
  # Stage 3: Aggregation
  aggregator = Ractor.new do
    total_words = 0
    total_length = 0
    
    loop do
      data = Ractor.receive
      break if data == :shutdown
      
      total_words += data.length
      total_length += data.sum { |item| item[:length] }
      
      Ractor.yield({
        batch_size: data.length,
        total_processed: total_words,
        average_length: total_length.to_f / total_words
      })
    end
  end
  
  [parser, transformer, aggregator]
end

pipeline = create_pipeline
raw_data = ["Hello", "", "World", "Ruby", "  Programming  ", nil, "Ractor"]

# Process through pipeline
pipeline[0].send(raw_data, move: true)
parsed = pipeline[0].take

pipeline[1].send(parsed, move: true)
transformed = pipeline[1].take

pipeline[2].send(transformed, move: true)
stats = pipeline[2].take

puts "Processed #{stats[:batch_size]} words"
puts "Total processed: #{stats[:total_processed]}"
puts "Average length: #{stats[:average_length].round(2)}"

# Shutdown pipeline
pipeline.each { |stage| stage.send(:shutdown) }

Performance & Memory

Worker Threads excel at CPU-intensive tasks that can be parallelized across multiple cores. Each Ractor runs on its own operating system thread with an independent lock, enabling true parallel execution without GVL contention.

Memory usage scales with the number of Ractors, since each maintains its own object space and GC-visible state. The per-Ractor baseline overhead is modest (typically on the order of a megabyte or less), but it grows quickly once workers allocate application data.

# Memory and performance measurement utilities
def measure_memory_usage
  GC.start
  # Note: Actual memory measurement would require external tools
  # This demonstrates the pattern for monitoring
  puts "Memory usage: #{GC.stat(:heap_live_slots)} live objects"
end

def benchmark_parallel_vs_sequential
  data = Array.new(100_000) { rand(1000) }
  
  # Sequential processing
  start_time = Time.now
  sequential_result = data.map { |n| Math.sqrt(n) * Math.sin(n) }.sum
  sequential_time = Time.now - start_time
  
  # Parallel processing
  measure_memory_usage
  start_time = Time.now
  
  chunk_size = data.size / 4
  chunks = data.each_slice(chunk_size).to_a
  
  workers = chunks.map do |chunk|
    Ractor.new do
      work = Ractor.receive
      work.map { |n| Math.sqrt(n) * Math.sin(n) }.sum
    end
  end
  
  chunks.each_with_index { |chunk, i| workers[i].send(chunk, move: true) }
  parallel_result = workers.map(&:take).sum
  parallel_time = Time.now - start_time
  
  measure_memory_usage
  
  puts "Sequential: #{sequential_time.round(4)}s, Result: #{sequential_result.round(4)}"
  puts "Parallel:   #{parallel_time.round(4)}s, Result: #{parallel_result.round(4)}"
  puts "Speedup: #{(sequential_time / parallel_time).round(2)}x"
end

benchmark_parallel_vs_sequential

Optimal performance requires balancing the number of workers with available CPU cores and task characteristics. Too many workers can cause excessive context switching overhead, while too few underutilize available parallelism.

# Dynamic worker pool sizing based on workload (conceptual sketch)
require 'etc'

class AdaptiveWorkerPool
  def initialize(min_workers: 1, max_workers: nil)
    @min_workers = min_workers
    @max_workers = max_workers || Etc.nprocessors
    @workers = []
    @task_queue = Queue.new
    @results = Queue.new
    
    # Start with minimum workers
    scale_to(@min_workers)
  end
  
  def process_batch(tasks, timeout: 60)
    start_time = Time.now
    submitted_count = 0
    
    # Submit all tasks
    tasks.each do |task|
      @task_queue << task
      submitted_count += 1
    end
    
    # Monitor progress and adapt worker count
    results = []
    while results.size < submitted_count && (Time.now - start_time) < timeout
      result = @results.pop(timeout: 1) # returns nil on timeout (Ruby 3.2+)
      if result
        results << result
      elsif @task_queue.size > @workers.size && @workers.size < @max_workers
        # Queue is backing up with nothing arriving; add capacity
        add_worker
      end
    end
    
    results
  end
  
  private
  
  def scale_to(count)
    while @workers.size < count
      add_worker
    end
  end
  
  def add_worker
    # NOTE: Thread::Queue objects are not Ractor-shareable; this shows the
    # intended shape only (see the note below the class)
    worker = Ractor.new(@task_queue, @results) do |task_queue, results|
      while task = task_queue.pop
        break if task == :shutdown
        
        # Simulate CPU-intensive work; helper logic must be defined
        # inside the Ractor, not on the enclosing class
        processed = task.map { |x| (1..1000).map { |i| Math.sin(x * i) }.sum }
        results << processed
      end
    end
    
    @workers << worker
    puts "Scaled to #{@workers.size} workers"
  end
end

# Note: this example shows the concept only. Thread::Queue instances cannot
# cross Ractor boundaries, so a full implementation would route tasks and
# results through Ractor messages (or a dedicated dispatcher Ractor).

Memory management becomes critical when processing large datasets since each Ractor maintains separate object spaces. Strategies include streaming processing, chunked data handling, and explicit garbage collection coordination.

# Memory-efficient streaming processor
class StreamProcessor
  def initialize(worker_count: 4)
    @workers = create_worker_pool(worker_count)
    @current_worker = 0
  end
  
  def process_large_file(filename, chunk_size: 1000)
    File.foreach(filename).each_slice(chunk_size) do |chunk|
      worker = next_worker
      
      # Send chunk to worker and immediately get result
      worker.send(chunk.dup, move: true)
      result = worker.take
      
      # Process result immediately to avoid memory buildup
      yield result if block_given?
      
      # Trigger GC periodically
      GC.start if rand < 0.1
    end
  end
  
  private
  
  def create_worker_pool(count)
    Array.new(count) do
      Ractor.new do
        loop do
          chunk = Ractor.receive
          break if chunk == :shutdown
          
          # Process chunk with limited memory footprint
          processed = chunk.map(&:strip)
                          .reject(&:empty?)
                          .map { |line| analyze_line(line) }
          
          # Encourage collection before yielding (GC state is process-wide)
          GC.start
          Ractor.yield processed
        end
      end
    end
  end
  
  def next_worker
    worker = @workers[@current_worker]
    @current_worker = (@current_worker + 1) % @workers.size
    worker
  end
  
  def analyze_line(line)
    words = line.split
    {
      word_count: words.size,
      char_count: line.length,
      complexity: words.map(&:length).sum.to_f / words.size
    }
  end
end

# Usage for processing large files efficiently
processor = StreamProcessor.new(worker_count: 4)
total_stats = { lines: 0, words: 0, chars: 0 }

processor.process_large_file("large_file.txt") do |batch_results|
  batch_results.each do |stats|
    total_stats[:lines] += 1
    total_stats[:words] += stats[:word_count]
    total_stats[:chars] += stats[:char_count]
  end
  
  # Print progress without accumulating memory
  puts "Processed #{total_stats[:lines]} lines..." if total_stats[:lines] % 10000 == 0
end

puts "Final stats: #{total_stats}"

Error Handling & Debugging

Worker Thread error handling differs from traditional threading: an unhandled exception terminates its Ractor and surfaces in the caller as a Ractor::RemoteError when take is called. Workers that should survive bad input must rescue their own exceptions and report error states through the message-passing interface.

# Robust error handling in worker threads
def create_resilient_worker
  Ractor.new do
    loop do
      begin
        task = Ractor.receive
        break if task == :shutdown
        
        case task[:type]
        when :calculate
          result = risky_calculation(task[:data])
          Ractor.yield({ status: :success, result: result })
        when :process_file
          content = File.read(task[:filename])
          processed = content.lines.map(&:strip).reject(&:empty?)
          Ractor.yield({ status: :success, result: processed })
        else
          Ractor.yield({ status: :error, error: "Unknown task type: #{task[:type]}" })
        end
        
      rescue StandardError => e
        # Capture exception details for main thread
        error_info = {
          status: :error,
          error_class: e.class.name,
          error_message: e.message,
          backtrace: e.backtrace[0..5] # Limit backtrace size
        }
        Ractor.yield(error_info)
      end
    end
  end
end

def risky_calculation(data)
  raise ArgumentError, "Data cannot be negative" if data < 0
  raise RuntimeError, "Calculation overflow" if data > 1000
  
  Math.sqrt(data) * 100
end

# Using resilient worker with comprehensive error handling
worker = create_resilient_worker

test_cases = [
  { type: :calculate, data: 16 },     # Should succeed
  { type: :calculate, data: -5 },     # Will raise ArgumentError
  { type: :calculate, data: 2000 },   # Will raise RuntimeError
  { type: :invalid, data: 42 },       # Unknown task type
  { type: :process_file, filename: "nonexistent.txt" } # File error
]

test_cases.each_with_index do |task, i|
  puts "Test case #{i + 1}:"
  worker.send(task)
  
  response = worker.take
  case response[:status]
  when :success
    puts "  Success: #{response[:result]}"
  when :error
    puts "  Error: #{response[:error_class]} - #{response[:error_message]}"
    puts "  Backtrace: #{response[:backtrace]&.first}" if response[:backtrace]
  end
end

worker.send(:shutdown)

Debugging Worker Threads presents unique challenges since traditional debugging tools cannot inspect across Ractor boundaries. Effective debugging relies on structured logging, state reporting mechanisms, and careful message protocol design.

# Debugging support for worker threads
module WorkerDebugging
  def self.create_debug_worker(worker_id)
    Ractor.new(worker_id) do |id|
      debug_log = []
      processed_count = 0
      start_time = Time.now
      
      loop do
        message = Ractor.receive
        timestamp = Time.now
        
        case message
        when Hash
          if message[:debug_command]
            # Module methods need an explicit receiver inside the Ractor
            WorkerDebugging.handle_debug_command(message, debug_log, processed_count, start_time)
          else
            # Normal work processing
            debug_log << { time: timestamp, action: "received_work", data: message.keys }
            
            begin
              result = WorkerDebugging.process_work(message)
              processed_count += 1
              
              debug_log << { time: timestamp, action: "completed_work", result_size: result.size }
              Ractor.yield({ status: :success, result: result, worker_id: id })
              
            rescue => e
              debug_log << { time: timestamp, action: "error", error: e.class.name }
              Ractor.yield({ 
                status: :error, 
                error: e.message, 
                worker_id: id,
                debug_log: debug_log.last(5) # Recent activity
              })
            end
          end
          
        when :shutdown
          debug_log << { time: timestamp, action: "shutdown" }
          break
        end
        
        # Limit debug log size
        debug_log = debug_log.last(100) if debug_log.size > 100
      end
    end
  end
  
  def self.handle_debug_command(message, debug_log, processed_count, start_time)
    case message[:debug_command]
    when :status
      status = {
        processed_count: processed_count,
        uptime: Time.now - start_time,
        recent_activity: debug_log.last(5)
      }
      Ractor.yield({ debug_status: status })
      
    when :log_dump
      Ractor.yield({ debug_log: debug_log })
      
    when :memory_stats
      # Note: In practice, this would use more sophisticated memory inspection
      stats = {
        gc_stat: GC.stat.slice(:count, :heap_live_slots, :heap_free_slots),
        object_count: ObjectSpace.count_objects
      }
      Ractor.yield({ memory_stats: stats })
    end
  end
  
  def self.process_work(data)
    # Simulate work that might fail
    data[:items].map do |item|
      raise "Invalid item" if item.nil?
      item.to_s.upcase
    end
  end
end

# Using debug-enabled workers
workers = Array.new(3) { |i| WorkerDebugging.create_debug_worker(i) }

# Send some work
work_items = [
  { items: ["hello", "world"] },
  { items: ["ruby", nil, "ractor"] }, # Contains invalid item
  { items: (1..5).to_a }
]

work_items.each_with_index do |work, i|
  worker = workers[i % workers.size]
  worker.send(work)
  
  result = worker.take
  puts "Work #{i}: #{result[:status]}"
  
  if result[:status] == :error
    puts "  Error: #{result[:error]}"
    puts "  Recent activity:"
    result[:debug_log]&.each do |entry|
      puts "    #{entry[:time].strftime('%H:%M:%S')} - #{entry[:action]}: #{entry.except(:time, :action)}"
    end
  end
end

# Query worker status
workers.each_with_index do |worker, i|
  worker.send({ debug_command: :status })
  status = worker.take
  
  puts "Worker #{i} status:"
  puts "  Processed: #{status[:debug_status][:processed_count]} tasks"
  puts "  Uptime: #{status[:debug_status][:uptime].round(2)} seconds"
end

workers.each { |w| w.send(:shutdown) }

Timeout handling prevents workers from hanging indefinitely on problematic tasks. Implementation requires careful coordination between timeout detection and worker cleanup.

# Timeout-aware worker management
class TimeoutWorkerManager
  def initialize(default_timeout: 30)
    @default_timeout = default_timeout
    @active_workers = {}
  end
  
  def execute_with_timeout(task, timeout: nil)
    timeout ||= @default_timeout
    
    worker = create_monitored_worker(task, timeout)
    worker_id = worker.object_id
    @active_workers[worker_id] = { worker: worker, start_time: Time.now }
    
    begin
      # Ractor#take has no built-in timeout; the worker enforces its own
      # deadline while processing (see create_monitored_worker below)
      result = worker.take
      @active_workers.delete(worker_id)
      result
      
    rescue Ractor::ClosedError
      @active_workers.delete(worker_id)
      { status: :error, error: "Worker terminated unexpectedly" }
      
    rescue => e
      cleanup_worker(worker_id)
      { status: :error, error: "Timeout or execution error: #{e.message}" }
    end
  end
  
  def monitor_timeouts
    Thread.new do
      loop do
        current_time = Time.now
        
        @active_workers.each do |worker_id, info|
          if current_time - info[:start_time] > @default_timeout
            puts "Worker #{worker_id} exceeded timeout, cleaning up..."
            cleanup_worker(worker_id)
          end
        end
        
        sleep 1
      end
    end
  end
  
  private
  
  def create_monitored_worker(task, timeout)
    Ractor.new(task, timeout) do |work, time_limit|
      start_time = Time.now
      
      begin
        # The deadline check lives inside the Ractor: methods defined on
        # the enclosing class are not visible across the boundary
        result = work[:items].map.with_index do |item, i|
          # Check the timeout every few items during long operations
          if i % 10 == 0 && (Time.now - start_time) > time_limit
            raise "Operation exceeded #{time_limit} seconds"
          end
          
          # Simulate varying work duration
          sleep(work[:delay] || 0.01)
          item * 2
        end
        
        Ractor.yield({ status: :success, result: result, duration: Time.now - start_time })
      rescue => e
        Ractor.yield({ 
          status: :error, 
          error: e.message,
          duration: Time.now - start_time 
        })
      end
    end
  end
  
  def cleanup_worker(worker_id)
    if info = @active_workers.delete(worker_id)
      begin
        # Attempt graceful cleanup
        # Note: In practice, forceful termination of Ractors
        # may require additional coordination
        puts "Cleaned up worker #{worker_id} after #{(Time.now - info[:start_time]).round(2)}s"
      rescue => e
        puts "Error during worker cleanup: #{e.message}"
      end
    end
  end
end

# Using timeout-aware workers
manager = TimeoutWorkerManager.new(default_timeout: 5)
timeout_monitor = manager.monitor_timeouts

test_tasks = [
  { items: (1..10).to_a, delay: 0.1 },      # Should complete
  { items: (1..100).to_a, delay: 0.1 },     # Will timeout
  { items: ["error"], delay: 0 }            # Will raise exception
]

test_tasks.each_with_index do |task, i|
  puts "Executing task #{i + 1}:"
  result = manager.execute_with_timeout(task, timeout: 3)
  
  puts "  Status: #{result[:status]}"
  puts "  Duration: #{result[:duration]&.round(2)}s" if result[:duration]
  puts "  Error: #{result[:error]}" if result[:error]
  puts "  Result size: #{result[:result]&.size}" if result[:result]
end

timeout_monitor.kill

Production Patterns

Production deployment of Worker Threads requires careful consideration of resource management, monitoring, error recovery, and integration with existing application architectures. Worker pools must be sized appropriately for the deployment environment and workload characteristics.

# Production-oriented worker pool (conceptual sketch: Thread::Queue and Hash
# instances cannot cross Ractor boundaries, so a real implementation would
# route tasks and results through Ractor messages instead)
require 'etc'
require 'securerandom'

class ProductionWorkerPool
  attr_reader :stats
  
  def initialize(config = {})
    @min_workers = config[:min_workers] || 2
    @max_workers = config[:max_workers] || [Etc.nprocessors * 2, 8].min
    @idle_timeout = config[:idle_timeout] || 300
    @task_timeout = config[:task_timeout] || 60
    
    @workers = {}
    @task_queue = Queue.new
    @results = {}
    @stats = initialize_stats
    @monitor_thread = nil
    @shutdown = false
    
    start_monitoring
    scale_to(@min_workers)
  end
  
  def submit_async(task_data)
    task_id = SecureRandom.uuid
    
    @stats[:tasks_submitted] += 1
    @task_queue << { id: task_id, data: task_data, submitted_at: Time.now }
    
    task_id
  end
  
  def get_result(task_id, timeout: @task_timeout)
    deadline = Time.now + timeout
    
    while Time.now < deadline
      if result = @results.delete(task_id)
        @stats[:tasks_completed] += 1
        return result
      end
      
      sleep 0.01
    end
    
    @stats[:tasks_timeout] += 1
    { status: :timeout, error: "Task #{task_id} timed out after #{timeout}s" }
  end
  
  def health_check
    {
      status: @shutdown ? :shutdown : :healthy,
      workers: {
        active: @workers.size,
        min: @min_workers,
        max: @max_workers
      },
      queue: {
        pending: @task_queue.size,
        processing: @workers.count { |_, info| info[:current_task] }
      },
      stats: @stats.dup
    }
  end
  
  def shutdown(timeout: 30)
    @shutdown = true
    @monitor_thread&.kill
    
    # Stop accepting new work
    @task_queue.clear
    
    # Wait for workers to complete current tasks
    deadline = Time.now + timeout
    while @workers.any? && Time.now < deadline
      @workers.each { |_, info| info[:worker].send(:shutdown) if info[:worker] }
      sleep 0.1
      cleanup_completed_workers
    end
    
    # Force cleanup remaining workers
    @workers.clear
    puts "Worker pool shutdown complete"
  end
  
  private
  
  def initialize_stats
    {
      tasks_submitted: 0,
      tasks_completed: 0,
      tasks_failed: 0,
      tasks_timeout: 0,
      workers_created: 0,
      workers_destroyed: 0,
      average_task_time: 0.0
    }
  end
  
  def start_monitoring
    @monitor_thread = Thread.new do
      loop do
        break if @shutdown
        
        begin
          manage_worker_pool
          cleanup_completed_workers
          update_performance_stats
          
          sleep 1
        rescue => e
          puts "Monitor error: #{e.message}"
          sleep 5
        end
      end
    end
  end
  
  def manage_worker_pool
    queue_size = @task_queue.size
    active_workers = @workers.size
    
    # Scale up if queue is building up
    if queue_size > active_workers && active_workers < @max_workers
      add_worker
    end
    
    # Scale down if workers are idle
    if queue_size == 0 && active_workers > @min_workers
      idle_workers = @workers.select do |_, info|
        info[:current_task].nil? && 
        (Time.now - info[:last_task_at]) > @idle_timeout
      end
      
      if idle_workers.any?
        worker_id, info = idle_workers.first
        remove_worker(worker_id)
      end
    end
  end
  
  def add_worker
    worker_id = SecureRandom.uuid
    
    worker = Ractor.new(worker_id, @task_queue, @results) do |id, queue, results|
      loop do
        begin
          task = queue.pop(timeout: 1) # nil on timeout (Ruby 3.2+)
          break if task == :shutdown
          next if task.nil? # idle poll; keep the worker alive
          
          start_time = Time.now
          
          # Process the actual work (the task logic must be reachable
          # from inside the Ractor in a real implementation)
          result = perform_task(task[:data])
          
          duration = Time.now - start_time
          
          results[task[:id]] = {
            status: :success,
            result: result,
            duration: duration,
            worker_id: id
          }
          
        rescue => e
          results[task[:id]] = {
            status: :error,
            error: e.message,
            error_class: e.class.name,
            worker_id: id
          }
        end
      end
    end
    
    @workers[worker_id] = {
      worker: worker,
      created_at: Time.now,
      last_task_at: Time.now,
      current_task: nil
    }
    
    @stats[:workers_created] += 1
    puts "Added worker #{worker_id} (total: #{@workers.size})"
  end
  
  def remove_worker(worker_id)
    if info = @workers.delete(worker_id)
      info[:worker].send(:shutdown)
      @stats[:workers_destroyed] += 1
      puts "Removed worker #{worker_id} (total: #{@workers.size})"
    end
  end
  
  def cleanup_completed_workers
    # Illustrative only: Ractor#take blocks until a value arrives, so a
    # real implementation would multiplex liveness checks via Ractor.select
    @workers.delete_if do |worker_id, info|
      begin
        info[:worker].take
        false # keep worker if it yielded and is still running
      rescue Ractor::ClosedError
        @stats[:workers_destroyed] += 1
        true # remove terminated worker
      end
    end
  end
  
  def update_performance_stats
    # Calculate average task completion time
    completed = @stats[:tasks_completed]
    if completed > 0
      # This would need more sophisticated timing tracking in practice
      @stats[:average_task_time] = calculate_average_task_time
    end
  end
  
  def calculate_average_task_time
    # Placeholder for actual timing calculation
    0.5
  end
  
  def perform_task(data)
    # Example task processing - replace with actual business logic
    case data[:type]
    when :image_resize
      process_image(data[:image_data], data[:dimensions])
    when :data_analysis
      analyze_dataset(data[:dataset])
    when :report_generation
      generate_report(data[:parameters])
    else
      raise "Unknown task type: #{data[:type]}"
    end
  end
  
  def process_image(image_data, dimensions)
    # Simulate image processing
    sleep(0.1 + rand * 0.2)
    "Processed image to #{dimensions[:width]}x#{dimensions[:height]}"
  end
  
  def analyze_dataset(dataset)
    # Simulate data analysis
    sleep(0.2 + rand * 0.3)
    {
      rows_processed: dataset[:rows],
      insights: ["Pattern A detected", "Anomaly in sector B"],
      confidence: rand * 0.4 + 0.6
    }
  end
  
  def generate_report(parameters)
    # Simulate report generation
    sleep(0.5 + rand * 0.5)
    {
      report_id: SecureRandom.uuid,
      pages: rand(10) + 5,
      format: parameters[:format] || "PDF"
    }
  end
end

# Production usage example
pool = ProductionWorkerPool.new(
  min_workers: 2,
  max_workers: 6,
  idle_timeout: 120,
  task_timeout: 30
)

# Submit various types of work
tasks = [
  { type: :image_resize, image_data: "...", dimensions: { width: 800, height: 600 } },
  { type: :data_analysis, dataset: { rows: 10000, columns: 50 } },
  { type: :report_generation, parameters: { format: "HTML", template: "standard" } }
]

task_ids = tasks.map { |task| pool.submit_async(task) }

# Monitor and collect results
task_ids.each do |task_id|
  result = pool.get_result(task_id)
  puts "Task #{task_id[0..7]}: #{result[:status]}"
  puts "  Result: #{result[:result]}" if result[:result]
  puts "  Duration: #{result[:duration]&.round(3)}s" if result[:duration]
end

# Health monitoring
puts "\nPool Health:"
health = pool.health_check
puts "  Status: #{health[:status]}"
puts "  Active workers: #{health[:workers][:active]}"
puts "  Queue size: #{health[:queue][:pending]}"
puts "  Tasks completed: #{health[:stats][:tasks_completed]}"

sleep 2 # Let monitoring cycle run

pool.shutdown

Integration with web frameworks requires careful request lifecycle management and resource cleanup to prevent memory leaks and worker exhaustion.

# Rails integration pattern (conceptual)
class BackgroundJobProcessor
  def self.instance
    @instance ||= new
  end
  
  def initialize
    @worker_pool = ProductionWorkerPool.new(
      min_workers: Rails.env.production? ? 4 : 2,
      max_workers: Rails.env.production? ? 12 : 4
    )
    
    # Graceful shutdown hooks
    at_exit { @worker_pool.shutdown }
    
    if defined?(Rails)
      Rails.application.config.after_initialize do
        register_shutdown_hooks
      end
    end
  end
  
  def process_async(job_class, *args)
    task_data = {
      type: :background_job,
      job_class: job_class.name,
      arguments: args,
      environment: Rails.env.to_s,
      request_id: Current.request_id # If using request tracking
    }
    
    @worker_pool.submit_async(task_data)
  end
  
  def wait_for_result(task_id, timeout: 30)
    @worker_pool.get_result(task_id, timeout: timeout)
  end
  
  def status
    @worker_pool.health_check
  end
  
  private
  
  def register_shutdown_hooks
    # Graceful shutdown for various deployment scenarios
    %w[TERM INT].each do |signal|
      Signal.trap(signal) do
        puts "Received #{signal}, shutting down worker pool..."
        @worker_pool.shutdown(timeout: 30)
        exit
      end
    end
  end
end

# Usage in Rails controllers
class ReportsController < ApplicationController
  def generate
    processor = BackgroundJobProcessor.instance
    task_id = processor.process_async(ReportGenerationJob, params[:report_type], current_user.id)
    
    render json: { task_id: task_id, status: "processing" }
  end
  
  def status
    task_id = params[:task_id]
    result = BackgroundJobProcessor.instance.wait_for_result(task_id, timeout: 1)
    
    case result[:status]
    when :success
      render json: { status: "completed", result: result[:result] }
    when :timeout
      render json: { status: "processing" }
    when :error
      render json: { status: "error", error: result[:error] }
    end
  end
end

# Health check endpoint
class HealthController < ApplicationController
  def worker_status
    status = BackgroundJobProcessor.instance.status
    render json: status
  end
end

Reference

Core Classes and Methods

| Class/Method                   | Parameters           | Returns | Description                                                |
|--------------------------------|----------------------|---------|------------------------------------------------------------|
| Ractor.new(&block)             | Block, optional args | Ractor  | Creates new worker thread with isolated memory space       |
| Ractor.new(arg1, arg2, &block) | Arguments, block     | Ractor  | Creates worker with initialization arguments               |
| #send(object, move: false)     | Object, move option  | Ractor  | Sends message to worker (copies or moves object)           |
| #take                          | None                 | Object  | Blocks until worker yields result                          |
| Ractor.receive                 | None                 | Object  | Receives message in worker (blocks until one is available) |
| Ractor.yield(object)           | Object               | nil     | Sends result from worker to main thread                    |
| #close_incoming                | None                 | Ractor  | Prevents further messages to worker                        |
| #close_outgoing                | None                 | Ractor  | Prevents worker from sending more messages                 |
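
A minimal round trip through this API, for reference:

echo = Ractor.new do
  msg = Ractor.receive       # blocks until a message arrives
  Ractor.yield(msg.upcase)   # intermediate result
  :finished                  # the block's return value is the final message
end

echo.send("ping")
puts echo.take # => "PING"
puts echo.take # => :finished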

Communication Patterns

| Pattern           | Use Case               | Implementation                                   |
|-------------------|------------------------|--------------------------------------------------|
| Request-Response  | Single task processing | worker.send(task); result = worker.take          |
| Producer-Consumer | Stream processing      | Worker loops with Ractor.receive                 |
| Pipeline          | Multi-stage processing | Chain workers where each processes and forwards  |
| Broadcast         | Fan-out distribution   | Send same data to multiple workers               |
| Worker Pool       | Load balancing         | Distribute tasks among available workers         |
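
Broadcast is the one pattern not demonstrated earlier; a minimal sketch, assuming the payload is made shareable so no per-worker copies are needed:

# Fan the same shareable payload out to several workers
payload = Ractor.make_shareable({ op: :ping })

workers = Array.new(3) do
  Ractor.new { Ractor.receive[:op] }
end

workers.each { |w| w.send(payload) }
workers.each { |w| puts w.take } # prints "ping" three times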

Object Shareability Rules

| Type                                     | Shareable | Notes                              |
|------------------------------------------|-----------|------------------------------------|
| Integer, Float                           | Yes       | Immutable numeric types            |
| String (frozen)                          | Yes       | Frozen strings are shareable       |
| String (mutable)                         | No        | Must use move: true or freeze      |
| Symbol                                   | Yes       | Symbols are immutable              |
| Array (frozen, all elements shareable)   | Yes       | Deep immutability required         |
| Hash (frozen, all keys/values shareable) | Yes       | Deep immutability required         |
| Class, Module                            | Yes       | Class and module objects           |
| Proc, Method                             | No        | Cannot cross Ractor boundaries     |
| Custom objects                           | No        | Unless explicitly made Ractor-safe |
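
For the last row, Ractor.make_shareable can deep-freeze a custom object whose state is itself shareable; a sketch using a hypothetical Point struct:

Point = Struct.new(:x, :y)

point = Point.new(1, 2)
Ractor.shareable?(point)     # => false (mutable struct)

Ractor.make_shareable(point) # freezes the struct; its Integers are shareable
Ractor.shareable?(point)     # => true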

Exception Types

| Exception              | Cause                                          | Resolution                                    |
|------------------------|------------------------------------------------|-----------------------------------------------|
| Ractor::IsolationError | Sharing non-Ractor-safe object                 | Use move: true or make object shareable       |
| Ractor::ClosedError    | Sending to closed Ractor                       | Check Ractor state before sending             |
| Ractor::EmptyError     | No message available (non-blocking operations) | Handle gracefully or use blocking operations  |
| Ractor::UnsafeError    | Accessing unsafe operations                    | Avoid thread-unsafe operations in workers     |
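
Ractor::ClosedError is the one most programs must handle, since any send to a terminated worker raises it:

r = Ractor.new { :done }
r.take # => :done (the worker has now terminated)

begin
  r.send(:more)
rescue Ractor::ClosedError => e
  puts "Worker no longer accepts messages: #{e.message}"
end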

Performance Considerations

| Factor           | Impact                                | Recommendation                                          |
|------------------|---------------------------------------|---------------------------------------------------------|
| Worker count     | CPU utilization vs overhead           | Start with CPU core count, adjust based on workload     |
| Message size     | Memory usage and copy overhead        | Minimize message payload, use streaming for large data  |
| Object copying   | Performance penalty for large objects | Use move: true when possible                            |
| GC coordination  | Memory pressure across workers        | Monitor heap usage, trigger GC strategically            |
| Task granularity | Context switching overhead            | Balance task size with parallelization benefits         |
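
For the first row, a common starting heuristic is one worker per core, capped by the number of tasks:

require 'etc'

tasks = Array.new(10) { |i| i }

# One worker per core, never more workers than tasks
worker_count = [Etc.nprocessors, tasks.size].min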

Configuration Options

# Worker pool configuration
{
  min_workers: 2,              # Minimum active workers
  max_workers: 8,              # Maximum workers under load
  idle_timeout: 300,           # Seconds before idle worker removal
  task_timeout: 60,            # Maximum seconds per task
  queue_size_limit: 1000,      # Maximum queued tasks
  shutdown_timeout: 30         # Graceful shutdown time limit
}

# Message passing options
worker.send(data, move: true)            # Transfer ownership to the worker
worker.send(data)                        # Deep-copies unshareable objects (default)
worker.send(Ractor.make_shareable(data)) # Share without copying (deep-freezes)

# Ractor creation patterns
Ractor.new { }                        # Simple worker
Ractor.new(config) { |cfg| }          # Parameterized worker (cfg is copied in)
Ractor.new(name: "worker-1") { }      # Named worker (useful when debugging)