Process Pools

Managing and implementing process pools in Ruby applications.

Overview

Process pools in Ruby provide concurrent execution by spawning multiple child processes to handle work in parallel. Ruby implements process pools through the Process module, fork system calls, and process management patterns that distribute tasks across worker processes while maintaining parent-child communication.

The core mechanism relies on Process.fork to create child processes, Process.spawn for more controlled process creation, and inter-process communication through pipes, signals, or shared files. Unlike thread pools, which share memory, process pools provide complete isolation between workers, eliminating concerns about the Global VM Lock (GVL, commonly called the GIL) and shared-state corruption.

# Basic process pool with fork
pids = []
4.times do
  pid = fork do
    # Worker process code
    puts "Worker #{Process.pid} processing task"
    exit(0)
  end
  pids << pid
end

# Wait for all workers to complete
pids.each { |pid| Process.waitpid(pid) }
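
Process.spawn covers the "more controlled process creation" mentioned above: it launches an external command instead of a Ruby block and returns the child pid immediately. A minimal sketch, assuming a ruby executable is on the PATH:

# Spawn external commands instead of forking a Ruby block
pids = 4.times.map do |i|
  Process.spawn("ruby", "-e", "sleep 0.1; puts 'worker #{i} finished'")
end

# Reap every spawned worker
pids.each { |pid| Process.waitpid(pid) }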

Ruby's process pools excel at CPU-intensive work that the GVL would otherwise serialize and at scenarios requiring fault isolation; they also suit I/O-heavy workloads, although threads are often sufficient there. The Process module provides essential methods such as waitpid, kill, and detach for lifecycle management.

# Process pool with communication over a pipe
reader, writer = IO.pipe
pid = fork do
  reader.close                              # child only writes
  writer.puts "Result from worker #{Process.pid}"
  writer.close
end
writer.close                                # parent only reads
result = reader.gets
reader.close
Process.waitpid(pid)
puts result

Process pools handle resource-intensive operations without blocking the main process, distribute work across CPU cores effectively, and provide failure isolation where one process crash doesn't affect others.
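
When the parent does not need a result and should not block, Process.detach reaps the child in the background instead of waitpid. A minimal sketch:

# Fire-and-forget worker: detach prevents a zombie without blocking the parent
pid = fork do
  sleep 2 # long-running background work
end

Process.detach(pid) # returns a thread that waits on the child
puts "Parent continues immediately (worker pid #{pid})"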

Basic Usage

Creating a basic process pool starts with spawning worker processes and distributing tasks among them. The fundamental pattern involves forking multiple processes, assigning work to each, and coordinating their execution.

class SimpleProcessPool
  def initialize(size = 4)
    @size = size
    @workers = []
  end

  def execute(tasks)
    tasks.each_slice((tasks.length / @size.to_f).ceil) do |chunk|
      pid = fork do
        chunk.each do |task|
          process_task(task)
        end
        exit(0)
      end
      @workers << pid
    end

    wait_for_completion
  end

  private

  def wait_for_completion
    @workers.each { |pid| Process.waitpid(pid) }
    @workers.clear
  end

  def process_task(task)
    # Task processing logic
    sleep(rand(0.1..0.5)) # Simulate work
    puts "Processed task: #{task}"
  end
end

pool = SimpleProcessPool.new(3)
pool.execute([1, 2, 3, 4, 5, 6, 7, 8, 9])

Inter-process communication requires establishing channels before forking. A single pipe is unidirectional, so the example below opens a pair of pipes per worker to get bidirectional communication between parent and child.

require 'json'

class CommunicatingPool
  def initialize(worker_count = 4)
    @worker_count = worker_count
    @pipes = []
    @pids = []
  end

  def start
    @worker_count.times do |i|
      parent_read, child_write = IO.pipe
      child_read, parent_write = IO.pipe

      pid = fork do
        parent_read.close
        parent_write.close
        worker_loop(child_read, child_write, i)
      end

      child_read.close
      child_write.close
      @pipes << { read: parent_read, write: parent_write }
      @pids << pid
    end
  end

  def send_task(worker_id, data)
    pipe = @pipes[worker_id][:write]
    pipe.puts(data.to_json)
    pipe.flush # pipe writes are buffered; flush so the worker sees the task immediately
  end

  def receive_result(worker_id)
    JSON.parse(@pipes[worker_id][:read].gets)
  end

  private

  def worker_loop(input, output, worker_id)
    while line = input.gets
      task_data = JSON.parse(line)
      result = perform_work(task_data, worker_id)
      output.puts(result.to_json)
      output.flush # return the result without waiting for the buffer to fill
    end
  end

  def perform_work(data, worker_id)
    { worker_id: worker_id, result: data['value'] * 2, processed_at: Time.now }
  end
end

Process pools handle task distribution by maintaining queues and worker assignments. The parent process coordinates work distribution while workers execute tasks independently.

require 'json'

class QueuedProcessPool
  def initialize(pool_size = 4)
    @pool_size = pool_size
    @task_queue = []
    @workers = {}
    @results = {}
  end

  def submit_tasks(tasks)
    @task_queue.concat(tasks)
    spawn_workers
    distribute_tasks
    collect_results
  end

  private

  def spawn_workers
    @pool_size.times do |i|
      task_pipe_r, task_pipe_w = IO.pipe
      result_pipe_r, result_pipe_w = IO.pipe

      pid = fork do
        task_pipe_w.close
        result_pipe_r.close
        worker_process(task_pipe_r, result_pipe_w)
      end

      task_pipe_r.close
      result_pipe_w.close

      @workers[i] = {
        pid: pid,
        task_pipe: task_pipe_w,
        result_pipe: result_pipe_r
      }
    end
  end

  def distribute_tasks
    @task_queue.each_with_index do |task, index|
      worker_id = index % @pool_size
      @workers[worker_id][:task_pipe].puts(task.to_json)
    end

    # Signal end of tasks
    @workers.each { |_, worker| worker[:task_pipe].close }
  end

  def worker_process(task_input, result_output)
    while line = task_input.gets
      task = JSON.parse(line)
      result = execute_task(task)
      result_output.puts(result.to_json)
    end
    result_output.close
  end

  def execute_task(task)
    # Process the task
    { id: task['id'], result: task['data'].upcase, worker_pid: Process.pid }
  end

  def collect_results
    # Read each worker's results until its pipe reaches EOF, then reap it
    results = []
    @workers.each_value do |worker|
      while line = worker[:result_pipe].gets
        results << JSON.parse(line)
      end
      Process.waitpid(worker[:pid])
    end
    results
  end
end

Advanced Usage

Advanced process pool implementations provide sophisticated worker management, dynamic scaling, and robust error recovery mechanisms. These patterns handle complex scenarios including worker recycling, load balancing, and graceful shutdown procedures.

require 'securerandom'

class ManagedProcessPool
  attr_reader :active_workers, :completed_tasks

  def initialize(min_workers: 2, max_workers: 8, max_tasks_per_worker: 100)
    @min_workers = min_workers
    @max_workers = max_workers
    @max_tasks_per_worker = max_tasks_per_worker
    @workers = {}
    @task_queue = Queue.new
    @result_queue = Queue.new
    @active_workers = 0
    @completed_tasks = 0
    @shutdown = false
  end

  def start
    spawn_initial_workers
    start_manager_thread
  end

  def submit(task)
    raise "Pool is shutting down" if @shutdown
    @task_queue << task
    scale_up_if_needed
  end

  def shutdown(timeout: 30)
    @shutdown = true
    @task_queue << :shutdown

    # Graceful shutdown with timeout
    shutdown_start = Time.now
    while @active_workers > 0 && (Time.now - shutdown_start) < timeout
      sleep(0.1)
    end

    # Force kill remaining workers
    @workers.each do |_, worker|
      Process.kill('KILL', worker[:pid]) rescue nil
    end
  end

  private

  def spawn_initial_workers
    @min_workers.times { spawn_worker }
  end

  def spawn_worker
    return if @active_workers >= @max_workers

    parent_conn, child_conn = create_connection_pair

    pid = fork do
      parent_conn.close
      worker_main_loop(child_conn)
    end

    child_conn.close
    worker_id = SecureRandom.uuid

    @workers[worker_id] = {
      pid: pid,
      connection: parent_conn,
      tasks_processed: 0,
      started_at: Time.now
    }

    @active_workers += 1
    monitor_worker(worker_id)
  end

  def create_connection_pair
    parent_read, child_write = IO.pipe
    child_read, parent_write = IO.pipe

    parent_conn = WorkerConnection.new(parent_read, parent_write)
    child_conn = WorkerConnection.new(child_read, child_write)

    [parent_conn, child_conn]
  end

  def worker_main_loop(connection)
    tasks_processed = 0

    loop do
      task = connection.receive
      break if task == :shutdown || task == :recycle

      begin
        result = process_task(task)
        connection.send({ status: :success, result: result, worker_pid: Process.pid })
        tasks_processed += 1
      rescue => e
        connection.send({
          status: :error,
          error: e.message,
          worker_pid: Process.pid
        })
      end

      break if tasks_processed >= @max_tasks_per_worker
    end
  ensure
    connection.close
  end

  def monitor_worker(worker_id)
    Thread.new do
      worker = @workers[worker_id]

      loop do
        begin
          result = worker[:connection].receive_nonblock
          @result_queue << result
          worker[:tasks_processed] += 1

          # Recycle worker if it's processed too many tasks
          if worker[:tasks_processed] >= @max_tasks_per_worker
            recycle_worker(worker_id)
            break
          end
        rescue IO::WaitReadable
          sleep(0.01)
        rescue EOFError
          # Worker died
          cleanup_worker(worker_id)
          break
        end
      end
    end
  end

  def recycle_worker(worker_id)
    worker = @workers[worker_id]
    worker[:connection].send(:recycle)
    Process.waitpid(worker[:pid])
    cleanup_worker(worker_id)
    spawn_worker if @active_workers < @min_workers
  end

  def cleanup_worker(worker_id)
    worker = @workers.delete(worker_id)
    worker[:connection].close if worker
    @active_workers -= 1
  end
end

class WorkerConnection
  def initialize(read_io, write_io)
    @read_io = read_io
    @write_io = write_io
  end

  def send(data)
    message = Marshal.dump(data)
    @write_io.write([message.length].pack('L'))
    @write_io.write(message)
    @write_io.flush
  end

  def receive
    length_data = @read_io.read(4)
    return nil unless length_data

    length = length_data.unpack('L')[0]
    message_data = @read_io.read(length)
    Marshal.load(message_data)
  end

  def receive_nonblock
    length_data = @read_io.read_nonblock(4)
    length = length_data.unpack('L')[0]
    message_data = @read_io.read_nonblock(length)
    Marshal.load(message_data)
  end

  def close
    @read_io.close unless @read_io.closed?
    @write_io.close unless @write_io.closed?
  end
end

Load balancing distributes tasks based on worker availability and performance metrics. Advanced pools track worker statistics and adjust task assignments accordingly.

class LoadBalancedPool
  def initialize(worker_count: 4)
    @worker_count = worker_count
    @workers = []
    @worker_stats = {}
    @task_assignments = {}
  end

  def start
    create_workers
    start_load_monitor
  end

  def execute_batch(tasks)
    task_assignments = calculate_optimal_distribution(tasks)

    task_assignments.each do |worker_id, worker_tasks|
      assign_tasks_to_worker(worker_id, worker_tasks)
    end

    wait_for_batch_completion
  end

  private

  def calculate_optimal_distribution(tasks)
    # Calculate distribution based on worker performance
    worker_weights = @workers.map.with_index do |_, i|
      stats = @worker_stats[i] || { avg_time: 1.0, success_rate: 1.0 }
      weight = stats[:success_rate] / stats[:avg_time]
      [i, weight]
    end.to_h

    total_weight = worker_weights.values.sum
    distribution = {}

    tasks.each_with_index do |task, index|
      # Assign based on weighted round-robin
      weight_position = (index.to_f / tasks.length) * total_weight

      cumulative_weight = 0
      selected_worker = worker_weights.find do |worker_id, weight|
        cumulative_weight += weight
        cumulative_weight >= weight_position
      end&.first || 0

      distribution[selected_worker] ||= []
      distribution[selected_worker] << task
    end

    distribution
  end

  def start_load_monitor
    Thread.new do
      loop do
        update_worker_statistics
        rebalance_if_needed
        sleep(1)
      end
    end
  end

  def update_worker_statistics
    @workers.each_with_index do |worker, i|
      begin
        stats = worker[:connection].get_stats_nonblock
        @worker_stats[i] = stats if stats
      rescue IO::WaitReadable
        # No stats available yet
      end
    end
  end
end

Thread Safety & Concurrency

Process pools provide inherent isolation between workers, eliminating shared memory concerns that plague thread-based concurrency. Each process operates with its own memory space, preventing race conditions and data corruption between workers.

require 'json'

class IsolatedProcessPool
  def initialize(pool_size: 4)
    @pool_size = pool_size
    @workers = []
    @shared_state_file = "/tmp/process_pool_state_#{Process.pid}"
    initialize_shared_state
  end

  def start
    @pool_size.times do |worker_id|
      spawn_isolated_worker(worker_id)
    end
  end

  def execute_with_shared_state(tasks)
    # Distribute tasks across workers
    task_chunks = tasks.each_slice((tasks.length / @pool_size.to_f).ceil).to_a

    pids = task_chunks.map.with_index do |chunk, worker_id|
      fork do
        worker_with_file_coordination(chunk, worker_id)
      end
    end

    # Wait for all workers and collect results
    pids.each { |pid| Process.waitpid(pid) }
    read_final_state
  end

  private

  def initialize_shared_state
    File.write(@shared_state_file, { counter: 0, results: [] }.to_json)
  end

  def worker_with_file_coordination(tasks, worker_id)
    tasks.each do |task|
      result = process_task(task, worker_id)

      # File-based coordination for shared state
      update_shared_state do |state|
        state['counter'] += 1
        state['results'] << result
        state
      end
    end
  end

  def update_shared_state
    # File locking for atomic updates
    File.open(@shared_state_file, 'r+') do |file|
      file.flock(File::LOCK_EX)

      begin
        current_state = JSON.parse(file.read)
        updated_state = yield(current_state)

        file.rewind
        file.write(updated_state.to_json)
        file.truncate(file.pos)
      ensure
        file.flock(File::LOCK_UN)
      end
    end
  end

  def process_task(task, worker_id)
    {
      task_id: task[:id],
      worker_id: worker_id,
      worker_pid: Process.pid,
      result: expensive_computation(task[:data]),
      processed_at: Time.now.to_f
    }
  end

  def expensive_computation(data)
    # Simulate CPU-intensive work
    (1..data[:iterations]).reduce(0) { |sum, i| sum + Math.sqrt(i) }
  end

  def read_final_state
    # Final shared state written cooperatively by all workers
    JSON.parse(File.read(@shared_state_file))
  end
end

Signal-based coordination manages worker lifecycle and handles interruption scenarios gracefully. Process pools implement signal handlers for clean shutdown and worker management.

class SignalManagedPool
  SIGNALS = %w[INT TERM USR1 USR2].freeze

  def initialize(worker_count: 4)
    @worker_count = worker_count
    @workers = {}
    @master_pid = Process.pid
    @shutdown_requested = false
    setup_signal_handlers
  end

  def start
    spawn_workers
    master_loop
  end

  private

  def setup_signal_handlers
    # Graceful shutdown
    %w[INT TERM].each do |signal|
      Signal.trap(signal) do
        @shutdown_requested = true
        graceful_shutdown
      end
    end

    # Worker management signals
    Signal.trap('USR1') { add_worker }
    Signal.trap('USR2') { remove_worker }
    Signal.trap('CHLD') { handle_child_death }
  end

  def spawn_workers
    @worker_count.times do |i|
      spawn_worker(i)
    end
  end

  def spawn_worker(worker_id)
    pid = fork do
      setup_worker_signals
      worker_main_loop(worker_id)
    end

    @workers[worker_id] = {
      pid: pid,
      started_at: Time.now,
      tasks_processed: 0
    }
  end

  def setup_worker_signals
    # Worker-specific signal handling
    Signal.trap('TERM') { exit(0) }
    Signal.trap('USR1') { dump_worker_status }

    # Reset other signals to default
    %w[INT USR2 CHLD].each { |sig| Signal.trap(sig, 'DEFAULT') }
  end

  def worker_main_loop(worker_id)
    loop do
      begin
        task = receive_task
        break if task.nil?

        process_task_with_signals(task, worker_id)
        send_heartbeat(worker_id)
      rescue => e
        log_worker_error(worker_id, e)
        sleep(0.1) # Brief pause before continuing
      end
    end
  end

  def process_task_with_signals(task, worker_id)
    start_time = Time.now

    # Set up a timeout via SIGALRM. Ruby has no Kernel#alarm, so a helper
    # thread delivers the signal to this worker process after the deadline.
    Signal.trap('ALRM') { raise "Task #{task[:id]} timed out" }
    timer = Thread.new do
      sleep(task[:timeout] || 30)
      Process.kill('ALRM', Process.pid)
    end

    begin
      result = execute_task(task)
      send_result(worker_id, result, Time.now - start_time)
    ensure
      timer.kill # Cancel the timeout
      Signal.trap('ALRM', 'DEFAULT')
    end
  end

  def master_loop
    loop do
      break if @shutdown_requested && @workers.empty?

      check_worker_health
      handle_task_queue
      sleep(0.1)
    end
  end

  def graceful_shutdown
    puts "Initiating graceful shutdown..."

    # Send TERM signal to all workers
    @workers.each do |_, worker|
      Process.kill('TERM', worker[:pid]) rescue nil
    end

    # Wait for workers with timeout
    shutdown_timeout = Time.now + 30
    until @workers.empty? || Time.now > shutdown_timeout
      sleep(0.1)
    end

    # Force kill remaining workers
    @workers.each do |_, worker|
      Process.kill('KILL', worker[:pid]) rescue nil
    end
  end

  def handle_child_death
    loop do
      pid, status = Process.waitpid2(-1, Process::WNOHANG)
      break unless pid

      worker_id = @workers.find { |_, w| w[:pid] == pid }&.first
      if worker_id
        handle_worker_death(worker_id, status)
      end
    end
  rescue Errno::ECHILD
    # No child processes
  end

  def handle_worker_death(worker_id, status)
    worker = @workers.delete(worker_id)

    unless @shutdown_requested
      if status.signaled?
        puts "Worker #{worker_id} killed by signal #{status.termsig}"
      elsif status.exitstatus != 0
        puts "Worker #{worker_id} exited with status #{status.exitstatus}"
      end

      # Respawn worker if not shutting down
      spawn_worker(worker_id)
    end
  end
end

Performance & Memory

Process pools optimize performance through parallelization but introduce overhead from process creation, inter-process communication, and memory duplication. Understanding these tradeoffs guides effective implementation decisions.

require 'benchmark'

class PerformanceAnalyzedPool
  def initialize(worker_count: 4)
    @worker_count = worker_count
    @metrics = {
      task_times: [],
      memory_usage: [],
      process_overhead: {},
      communication_latency: []
    }
  end

  def benchmark_execution(tasks, &task_processor)
    puts "Benchmarking with #{@worker_count} workers, #{tasks.length} tasks"

    # Memory baseline
    baseline_memory = get_process_memory

    execution_time = Benchmark.realtime do
      execute_with_metrics(tasks, &task_processor)
    end

    peak_memory = @metrics[:memory_usage].max
    memory_overhead = peak_memory - baseline_memory

    print_performance_report(execution_time, memory_overhead)
  end

  def execute_with_metrics(tasks, &task_processor)
    task_chunks = distribute_tasks(tasks)
    memory_monitor = start_memory_monitoring

    @worker_pids = task_chunks.map.with_index do |chunk, worker_id|
      fork do
        worker_with_profiling(chunk, worker_id, &task_processor)
      end
    end

    # Monitor parent process performance
    parent_monitor = Thread.new { monitor_parent_process }

    @worker_pids.each { |pid| Process.waitpid(pid) }
    memory_monitor.kill
    parent_monitor.kill

    aggregate_worker_metrics
  end

  private

  def distribute_tasks(tasks)
    chunk_size = (tasks.length / @worker_count.to_f).ceil
    tasks.each_slice(chunk_size).to_a
  end

  def worker_with_profiling(tasks, worker_id, &task_processor)
    worker_metrics = {
      worker_id: worker_id,
      worker_pid: Process.pid,
      start_memory: get_process_memory,
      task_times: [],
      communication_overhead: []
    }

    tasks.each do |task|
      task_start = Time.now

      result = if task_processor
        task_processor.call(task)
      else
        default_task_processing(task)
      end

      task_duration = Time.now - task_start
      worker_metrics[:task_times] << task_duration

      # Measure communication overhead
      comm_start = Time.now
      report_task_completion(worker_id, result)
      comm_duration = Time.now - comm_start
      worker_metrics[:communication_overhead] << comm_duration
    end

    worker_metrics[:end_memory] = get_process_memory
    worker_metrics[:memory_growth] = worker_metrics[:end_memory] - worker_metrics[:start_memory]

    save_worker_metrics(worker_metrics)
  end

  def start_memory_monitoring
    Thread.new do
      loop do
        total_memory = get_total_pool_memory
        @metrics[:memory_usage] << total_memory
        sleep(0.1)
      end
    end
  end

  def get_process_memory
    # Get RSS (Resident Set Size) in KB
    `ps -o rss= -p #{Process.pid}`.to_i
  end

  def get_total_pool_memory
    # Sum RSS of the parent process plus all worker processes
    pool_pids = [Process.pid] + Array(@worker_pids)
    pool_pids.sum { |pid| `ps -o rss= -p #{pid}`.to_i rescue 0 }
  end

  def print_performance_report(execution_time, memory_overhead)
    avg_task_time = @metrics[:task_times].sum / @metrics[:task_times].length
    tasks_per_second = @metrics[:task_times].length / execution_time
    avg_comm_latency = @metrics[:communication_latency].sum / @metrics[:communication_latency].length

    puts <<~REPORT

      Performance Report:
      ==================
      Total execution time: #{execution_time.round(3)}s
      Tasks per second: #{tasks_per_second.round(2)}
      Average task time: #{avg_task_time.round(4)}s
      Memory overhead: #{memory_overhead}KB
      Communication latency: #{avg_comm_latency.round(4)}s

      Worker Distribution:
    REPORT

    print_worker_performance_breakdown
  end

  def print_worker_performance_breakdown
    worker_files = Dir.glob("/tmp/worker_metrics_*.json")

    worker_files.each do |file|
      metrics = JSON.parse(File.read(file))
      avg_time = metrics['task_times'].sum / metrics['task_times'].length

      puts "  Worker #{metrics['worker_id']} (PID #{metrics['worker_pid']}):"
      puts "    Tasks processed: #{metrics['task_times'].length}"
      puts "    Average time: #{avg_time.round(4)}s"
      puts "    Memory growth: #{metrics['memory_growth']}KB"
      puts "    Comm overhead: #{(metrics['communication_overhead'].sum * 1000).round(2)}ms"
    end

    # Cleanup metrics files
    worker_files.each { |file| File.delete(file) }
  end

  def save_worker_metrics(metrics)
    filename = "/tmp/worker_metrics_#{metrics[:worker_id]}.json"
    File.write(filename, metrics.to_json)
  end
end

# Comparative performance analysis
class PoolPerformanceComparison
  def compare_pool_sizes(tasks, pool_sizes = [1, 2, 4, 8])
    puts "Comparing pool sizes: #{pool_sizes.join(', ')}"

    results = pool_sizes.map do |size|
      puts "\nTesting pool size: #{size}"

      pool = PerformanceAnalyzedPool.new(worker_count: size)

      execution_time = Benchmark.realtime do
        pool.execute_with_metrics(tasks) do |task|
          # CPU-intensive task simulation
          (1..task[:complexity]).reduce(0) { |sum, i| sum + Math.sqrt(i) }
        end
      end

      {
        pool_size: size,
        execution_time: execution_time,
        throughput: tasks.length / execution_time
      }
    end

    print_comparison_results(results)
  end

  private

  def print_comparison_results(results)
    puts "\nPerformance Comparison Results:"
    puts "================================"

    results.each do |result|
      puts "Pool size #{result[:pool_size]}: " \
           "#{result[:execution_time].round(3)}s " \
           "(#{result[:throughput].round(2)} tasks/s)"
    end

    best_result = results.max_by { |r| r[:throughput] }
    puts "\nBest performance: Pool size #{best_result[:pool_size]}"
  end
end

Memory optimization strategies focus on minimizing process overhead and managing shared resources effectively. Copy-on-write memory semantics reduce initial memory consumption in forked processes.

class MemoryOptimizedPool
  def initialize(worker_count: 4, preload_data: true)
    @worker_count = worker_count
    @shared_data = preload_data ? load_shared_resources : nil
    @copy_on_write_optimized = true
  end

  def execute_memory_efficient(tasks)
    # Pre-fork optimization: load shared data before forking
    # to take advantage of copy-on-write semantics
    if @copy_on_write_optimized
      preload_immutable_data
      disable_gc_during_fork
    end

    pids = spawn_optimized_workers(tasks)

    # Re-enable GC in parent
    GC.enable if @copy_on_write_optimized

    pids.each { |pid| Process.waitpid(pid) }
  end

  private

  def preload_immutable_data
    # Load read-only data that will be shared across processes
    @lookup_tables = load_lookup_tables
    @configuration = load_configuration
    @templates = load_templates

    # Freeze remaining strings so copy-on-write pages are less likely to be dirtied
    ObjectSpace.each_object(String) { |str| str.freeze unless str.frozen? }
  end

  def disable_gc_during_fork
    # Disable GC to prevent copy-on-write violations during fork
    GC.disable
  end

  def spawn_optimized_workers(tasks)
    task_distribution = distribute_tasks_efficiently(tasks)

    task_distribution.map.with_index do |worker_tasks, worker_id|
      fork do
        # Re-enable GC in worker process
        GC.enable

        # Set worker-specific GC settings
        optimize_worker_gc

        # Process assigned tasks
        process_worker_tasks(worker_tasks, worker_id)
      end
    end
  end

  def optimize_worker_gc
    # Tune GC for worker process characteristics
    GC.start(full_mark: false) # Initial cleanup

    # Adjust GC parameters for the worker workload. GC.tune is not part of
    # standard MRI (hence the guard); on stock Ruby these knobs are usually set
    # via RUBY_GC_* environment variables before the process boots.
    if defined?(GC.tune)
      GC.tune(
        malloc_limit: 16_777_216,     # 16MB
        heap_slots_increment: 10_000,
        heap_slots_growth_factor: 1.8,
        oldmalloc_limit: 16_777_216
      )
    end
  end

  def distribute_tasks_efficiently(tasks)
    # Balance task distribution based on estimated memory usage
    estimated_memory = tasks.map { |task| estimate_task_memory(task) }

    # Sort tasks by memory usage for better distribution
    sorted_indices = (0...tasks.length).sort_by { |i| -estimated_memory[i] }

    # Distribute using round-robin on sorted tasks
    distribution = Array.new(@worker_count) { [] }
    sorted_indices.each_with_index do |task_index, i|
      worker_id = i % @worker_count
      distribution[worker_id] << tasks[task_index]
    end

    distribution
  end

  def estimate_task_memory(task)
    # Rough memory estimation based on task characteristics
    base_memory = 1024 # Base task overhead
    data_memory = task.to_s.bytesize * 2 # Rough data size multiplication

    base_memory + data_memory
  end

  def process_worker_tasks(tasks, worker_id)
    tasks.each_with_index do |task, index|
      process_single_task(task)

      # Periodic memory management
      if (index + 1) % 100 == 0
        perform_incremental_gc
      end
    end

    # Final cleanup
    GC.start(full_mark: true)
  end

  def perform_incremental_gc
    # Incremental GC to manage memory growth
    3.times { GC.start(full_mark: false) }
  end
end

Error Handling & Debugging

Process pool error handling requires managing failures in both parent coordination and individual worker processes. Robust implementations detect worker crashes, handle communication failures, and provide comprehensive debugging information.

require 'json'
require 'set'

class RobustProcessPool
  class PoolError < StandardError; end
  class WorkerCrashError < PoolError; end
  class CommunicationError < PoolError; end
  class TaskTimeoutError < PoolError; end

  def initialize(worker_count: 4, max_retries: 3, timeout: 30)
    @worker_count = worker_count
    @max_retries = max_retries
    @timeout = timeout
    @workers = {}
    @failed_tasks = []
    @error_log = []
  end

  def execute_with_resilience(tasks)
    setup_error_monitoring

    begin
      execute_tasks_with_recovery(tasks)
    rescue => e
      handle_critical_failure(e)
      raise
    ensure
      cleanup_and_report_errors
    end
  end

  private

  def execute_tasks_with_recovery(tasks)
    remaining_tasks = tasks.dup
    attempt = 0

    while !remaining_tasks.empty? && attempt < @max_retries
      attempt += 1
      puts "Execution attempt #{attempt}/#{@max_retries}"

      failed_tasks = execute_attempt(remaining_tasks)

      if failed_tasks.empty?
        break
      else
        remaining_tasks = failed_tasks
        log_attempt_failure(attempt, failed_tasks)
        sleep(0.5 * (2 ** (attempt - 1))) # Exponential backoff between attempts
      end
    end

    raise PoolError, "Failed to complete tasks after #{@max_retries} attempts" unless remaining_tasks.empty?
  end

  def execute_attempt(tasks)
    workers = spawn_monitored_workers
    failed_tasks = []

    begin
      task_assignments = distribute_tasks_with_monitoring(tasks, workers)

      # Monitor execution with timeout
      execution_results = monitor_execution_with_timeout(task_assignments)

      # Identify failed tasks
      failed_tasks = identify_failed_tasks(tasks, execution_results)

    rescue => e
      log_execution_error("Execution attempt failed", e)
      failed_tasks = tasks # Retry all tasks
    ensure
      cleanup_workers(workers)
    end

    failed_tasks
  end

  def spawn_monitored_workers
    workers = {}

    @worker_count.times do |worker_id|
      begin
        worker = create_monitored_worker(worker_id)
        workers[worker_id] = worker
      rescue => e
        log_worker_creation_error(worker_id, e)
        # Continue with fewer workers rather than failing completely
      end
    end

    raise PoolError, "Failed to create any workers" if workers.empty?
    workers
  end

  def create_monitored_worker(worker_id)
    parent_conn, child_conn = create_reliable_connection

    pid = fork do
      Signal.trap('TERM') { exit(0) } # Graceful shutdown
      parent_conn.close

      begin
        worker_loop_with_error_handling(child_conn, worker_id)
      rescue => e
        error_data = {
          worker_id: worker_id,
          error: e.message,
          backtrace: e.backtrace,
          pid: Process.pid
        }

        # Try to report error before dying
        begin
          child_conn.send({ type: :worker_error, data: error_data })
        rescue
          # Connection might be broken
        end

        exit(1)
      ensure
        child_conn.close
      end
    end

    child_conn.close

    {
      pid: pid,
      connection: parent_conn,
      started_at: Time.now,
      status: :active,
      tasks_assigned: 0,
      tasks_completed: 0
    }
  end

  def worker_loop_with_error_handling(connection, worker_id)
    loop do
      begin
        # Receive task with timeout
        task = connection.receive_with_timeout(@timeout)
        break if task.nil? || task[:type] == :shutdown

        # Process with error isolation
        result = process_task_safely(task, worker_id)
        connection.send({ type: :result, data: result })

      rescue CommunicationError => e
        # Communication failure - log and exit
        puts "Worker #{worker_id}: Communication failed - #{e.message}"
        break
      rescue TaskTimeoutError => e
        # Task timeout - report and continue
        connection.send({
          type: :task_timeout,
          data: { task_id: task[:id], worker_id: worker_id }
        })
      rescue => e
        # Task processing error - report and continue
        connection.send({
          type: :task_error,
          data: {
            task_id: task[:id],
            worker_id: worker_id,
            error: e.message,
            backtrace: e.backtrace
          }
        })
      end
    end
  end

  def process_task_safely(task, worker_id)
    start_time = Time.now

    # Set up task timeout using thread
    timeout_thread = Thread.new do
      sleep(@timeout)
      Thread.main.raise(TaskTimeoutError, "Task #{task[:id]} timed out")
    end

    begin
      result = execute_task(task)

      {
        task_id: task[:id],
        worker_id: worker_id,
        result: result,
        execution_time: Time.now - start_time,
        status: :success
      }
    ensure
      timeout_thread.kill
    end
  end

  def monitor_execution_with_timeout(task_assignments)
    start_time = Time.now
    results = {}
    active_workers = task_assignments.keys.to_set

    while !active_workers.empty? && (Time.now - start_time) < (@timeout * 2)
      active_workers.each do |worker_id|
        worker = @workers[worker_id]
        next unless worker

        begin
          message = worker[:connection].receive_nonblock
          handle_worker_message(worker_id, message, results)

          # Remove worker from active set if it's done
          if message[:type] == :worker_complete
            active_workers.delete(worker_id)
          end

        rescue IO::WaitReadable
          # No message available
        rescue EOFError
          # Worker died
          handle_worker_death(worker_id, active_workers)
        end
      end

      sleep(0.01)
    end

    # Handle timeout
    if !active_workers.empty?
      log_timeout_error(active_workers)
      raise PoolError, "Execution timed out with workers: #{active_workers.to_a}"
    end

    results
  end

  def handle_worker_message(worker_id, message, results)
    case message[:type]
    when :result
      results[message[:data][:task_id]] = message[:data]
    when :task_error
      @failed_tasks << message[:data]
      log_task_error(message[:data])
    when :task_timeout
      @failed_tasks << message[:data]
      log_task_timeout(message[:data])
    when :worker_error
      log_worker_error(message[:data])
      raise WorkerCrashError, "Worker #{worker_id} crashed: #{message[:data][:error]}"
    end
  end

  def log_execution_error(context, error)
    @error_log << {
      timestamp: Time.now,
      type: :execution_error,
      context: context,
      error: error.message,
      backtrace: error.backtrace
    }
  end

  def log_task_error(error_data)
    @error_log << {
      timestamp: Time.now,
      type: :task_error,
      task_id: error_data[:task_id],
      worker_id: error_data[:worker_id],
      error: error_data[:error],
      backtrace: error_data[:backtrace]
    }
  end

  def cleanup_and_report_errors
    unless @error_log.empty?
      puts "\nError Summary:"
      puts "=" * 50

      @error_log.group_by { |e| e[:type] }.each do |error_type, errors|
        puts "#{error_type.to_s.upcase}: #{errors.length} occurrences"
        errors.first(3).each do |error|
          puts "  - #{error[:error]} (#{error[:timestamp]})"
        end
        puts "  ... and #{errors.length - 3} more" if errors.length > 3
      end
    end

    # Write detailed error log to file for analysis
    write_error_log_to_file if @error_log.any?
  end

  def write_error_log_to_file
    filename = "process_pool_errors_#{Time.now.strftime('%Y%m%d_%H%M%S')}.log"
    File.write(filename, @error_log.map(&:to_json).join("\n"))
    puts "\nDetailed error log written to: #{filename}"
  end
end

class ReliableConnection
  def initialize(read_io, write_io)
    @read_io = read_io
    @write_io = write_io
    @message_id = 0
  end

  def send(data)
    @message_id += 1
    message = {
      id: @message_id,
      timestamp: Time.now.to_f,
      payload: data
    }

    serialized = Marshal.dump(message)
    header = [serialized.length].pack('L')

    begin
      @write_io.write(header + serialized)
      @write_io.flush
    rescue Errno::EPIPE, Errno::ECONNRESET
      raise CommunicationError, "Connection broken during send"
    end
  end

  def receive_with_timeout(timeout)
    ready = IO.select([@read_io], nil, nil, timeout)
    raise CommunicationError, "Receive timeout" unless ready

    header = @read_io.read(4)
    raise CommunicationError, "Connection closed" unless header

    length = header.unpack('L')[0]
    data = @read_io.read(length)

    message = Marshal.load(data)
    message[:payload]
  rescue Errno::ECONNRESET, EOFError
    raise CommunicationError, "Connection reset by peer"
  end
end

Reference

Core Process Methods

| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| Process.fork { block } | Block (optional) | Integer (pid) or nil | Creates a child process; returns pid in parent, nil in child |
| Process.spawn(command, **options) | Command (String/Array), options (Hash) | Integer (pid) | Spawns a new process with the specified command and options |
| Process.waitpid(pid, flags=0) | pid (Integer), flags (Integer) | Integer (pid) | Waits for a specific process to terminate |
| Process.waitpid2(pid, flags=0) | pid (Integer), flags (Integer) | [Integer, Process::Status] | Returns pid and status object |
| Process.wait | None | Integer (pid) | Waits for any child process to terminate |
| Process.detach(pid) | pid (Integer) | Thread | Creates a thread that reaps the process, preventing zombies |
| Process.kill(signal, *pids) | signal (String/Integer), pids (Integers) | Integer (count) | Sends a signal to the specified processes |
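
A short sketch combining these calls; the commands are illustrative:

# Spawn a worker, inspect its exit status, and detach a long-lived helper
pid = Process.spawn("ruby", "-e", "exit 7")
_, status = Process.waitpid2(pid)
puts status.exitstatus        # => 7

helper = Process.spawn("sleep", "60")
Process.detach(helper)        # background reaper thread, no zombie left behind
Process.kill("TERM", helper)  # ask the helper to stop early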

Process Status Methods

| Method | Parameters | Returns | Description |
|--------|------------|---------|-------------|
| status.exited? | None | Boolean | True if process exited normally |
| status.exitstatus | None | Integer or nil | Exit status code if exited normally |
| status.signaled? | None | Boolean | True if process terminated by signal |
| status.termsig | None | Integer or nil | Signal number that terminated the process |
| status.stopsig | None | Integer or nil | Signal number that stopped the process |
| status.success? | None | Boolean or nil | True if process exited with status 0, nil if it did not exit normally |
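
After Process.waitpid2, the status object distinguishes normal exits from signal deaths:

pid = fork { exit 3 }
_, status = Process.waitpid2(pid)

if status.signaled?
  puts "killed by signal #{status.termsig}"
elsif status.exited?
  puts "exited with #{status.exitstatus}" # => exited with 3
end
puts status.success?                      # => false (non-zero exit)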

Common Signals

| Signal | Typical Number (Linux) | Description |
|--------|------------------------|-------------|
| TERM | 15 | Termination request (graceful) |
| KILL | 9 | Force kill (cannot be caught) |
| INT | 2 | Interrupt (Ctrl-C) |
| USR1 | 10 | User-defined signal 1 |
| USR2 | 12 | User-defined signal 2 |
| CHLD | 17 | Child status changed |
| ALRM | 14 | Alarm timer expired |

Ruby does not define per-signal constants; numbers vary by platform and are resolved through Signal.list, so prefer signal names ("TERM") over raw integers.
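
A small sketch of resolving and sending signals by name:

# Look up the platform-specific number, then signal a child by name
puts Signal.list["TERM"] # => 15 on Linux

pid = fork do
  Signal.trap("TERM") { exit 0 } # graceful shutdown handler
  sleep
end

sleep 0.1                  # give the child time to install its handler
Process.kill("TERM", pid)
Process.waitpid(pid)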

Process Spawn Options

| Option | Type | Description |
|--------|------|-------------|
| :in, :out, :err | IO, :close, String (filename) | Redirect standard streams |
| :unsetenv_others | Boolean | Clear environment variables other than those specified |
| :pgroup | true or Integer | Create a new process group or join the given group |
| :rlimit_* | Integer or [Integer, Integer] | Resource limits (cpu, fsize, etc.) as a soft limit or [soft, hard] pair |
| :umask | Integer | File creation mask |
| :chdir | String | Working directory for the child process |
| :close_others | Boolean | Close file descriptors other than 0, 1, 2 |
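
These options combine on a single Process.spawn call; the script and paths here are illustrative:

pid = Process.spawn(
  "ruby", "worker.rb",        # hypothetical worker script
  out: "/tmp/worker.log",     # redirect stdout to a log file
  err: [:child, :out],        # merge stderr into stdout
  chdir: "/tmp",              # run from a known working directory
  pgroup: true,               # start a new process group
  rlimit_cpu: 60              # cap CPU time at 60 seconds
)
Process.waitpid(pid)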

Inter-Process Communication Patterns

| Pattern | Use Case | Pros | Cons |
|---------|----------|------|------|
| Pipes | Simple parent-child communication | Fast, built-in | Unidirectional per pipe, limited to related processes |
| Named Pipes (FIFO) | Process-to-process messaging | Persistent, file-based | Requires filesystem cleanup |
| Shared Files | State coordination | Simple, debuggable | I/O overhead, needs locking to avoid races |
| Signals | Process control, notifications | Immediate, lightweight | Carry almost no data, platform differences |
| Sockets | Network-like IPC | Full-duplex, can cross machines | Setup complexity |
| Message Queues | Async messaging | Buffered, persistent | Platform-specific |
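
UNIXSocket.pair gives a full-duplex channel between parent and child without the two-pipe setup shown earlier. A minimal sketch:

require 'socket'

parent_sock, child_sock = UNIXSocket.pair

pid = fork do
  parent_sock.close
  task = child_sock.gets                 # read a task from the parent
  child_sock.puts("done: #{task.chomp}") # reply on the same socket
  child_sock.close
end

child_sock.close
parent_sock.puts("resize-images")
puts parent_sock.gets                    # => "done: resize-images"
Process.waitpid(pid)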

Process Pool Configuration Parameters

| Parameter | Typical Range | Description |
|-----------|---------------|-------------|
| Pool size | 2-16 processes | Number of worker processes |
| Max tasks per worker | 100-1000 tasks | Tasks processed before a worker is recycled |
| Task timeout | 30-300 seconds | Maximum task execution time |
| Startup timeout | 5-30 seconds | Worker initialization timeout |
| Shutdown timeout | 10-60 seconds | Graceful shutdown wait time |
| Max retries | 3-5 attempts | Failed task retry limit |
| Backoff multiplier | 1.5-2.0 | Exponential factor applied to retry delays |
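
Expressed as a configuration hash for a hypothetical pool class, these parameters might look like:

# Illustrative defaults only; the keys mirror the table, not a specific library
POOL_CONFIG = {
  pool_size: 4,               # usually close to the number of CPU cores
  max_tasks_per_worker: 500,  # recycle workers to contain memory growth
  task_timeout: 60,           # seconds before a task is abandoned
  startup_timeout: 10,        # seconds to wait for a worker to boot
  shutdown_timeout: 30,       # grace period before force-killing workers
  max_retries: 3,             # attempts per failed task
  backoff_multiplier: 2.0     # retry delay grows by this factor
}.freeze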

Common Process Pool Errors

| Error | Cause | Resolution |
|-------|-------|------------|
| Errno::ECHILD | No child processes to wait for | Check process lifecycle management |
| Errno::ESRCH | Process doesn't exist | Verify PID validity before operations |
| Errno::EPERM | Permission denied for process operation | Check user permissions and process ownership |
| Errno::EPIPE | Broken pipe during communication | Implement connection error handling |
| Hung worker (no Ruby error raised) | Worker process hangs | Implement task timeouts and monitoring |
| Zombie processes | Child processes not reaped | Use Process.detach or proper waitpid calls |
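
A short sketch of defensive reaping and signalling that covers the most common failure modes:

def reap_children
  loop do
    pid, status = Process.waitpid2(-1, Process::WNOHANG)
    break unless pid
    warn "worker #{pid} exited: #{status.inspect}"
  end
rescue Errno::ECHILD
  # No children left to reap
end

def signal_worker(pid, sig = "TERM")
  Process.kill(sig, pid)
rescue Errno::ESRCH, Errno::EPERM => e
  warn "could not signal #{pid}: #{e.class}"
end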

Memory Optimization Strategies

| Strategy | Implementation | Memory Impact |
|----------|----------------|---------------|
| Copy-on-write | Load shared data before fork | Reduces initial memory by 50-80% |
| Worker recycling | Limit tasks per worker process | Prevents memory leak accumulation |
| GC tuning | Adjust heap parameters per worker | Reduces GC overhead by 20-40% |
| String deduplication | Freeze shared strings | Saves 10-30% on string-heavy workloads |
| Lazy loading | Load resources on demand | Reduces baseline memory usage |
| Process affinity | Bind processes to CPU cores | Improves cache locality |