## Overview
Process pools in Ruby provide concurrent execution by spawning multiple child processes to handle work in parallel. Ruby implements process pools through the `Process` module, the `fork` system call, and process-management patterns that distribute tasks across worker processes while maintaining parent-child communication.

The core mechanism relies on `Process.fork` to create child processes, `Process.spawn` for more controlled process creation, and inter-process communication through pipes, signals, or shared files. Unlike thread pools, which share memory, process pools provide complete isolation between workers, eliminating concerns about the Global VM Lock (GVL, often called the GIL) and shared-state corruption.
```ruby
# Basic process pool with fork
pids = []

4.times do
  pid = fork do
    # Worker process code
    puts "Worker #{Process.pid} processing task"
    exit(0)
  end
  pids << pid
end

# Wait for all workers to complete
pids.each { |pid| Process.waitpid(pid) }
```
Ruby's process pools excel at CPU-intensive tasks, which the GVL would otherwise serialize within a single process, and at scenarios requiring fault isolation. (Purely I/O-bound work often does fine in threads, since the GVL is released during blocking I/O.) The `Process` module provides essential lifecycle-management methods such as `waitpid`, `kill`, and `detach`.
```ruby
# Process pool with communication
reader, writer = IO.pipe

pid = fork do
  reader.close
  writer.puts "Result from worker #{Process.pid}"
  writer.close
end

writer.close
result = reader.gets
reader.close
Process.waitpid(pid)
```
Process pools handle resource-intensive operations without blocking the main process, distribute work across CPU cores effectively, and provide failure isolation where one process crash doesn't affect others.
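The isolation claim is easy to verify: a worker that raises takes down only its own process, and the parent can inspect each `Process::Status` afterwards. A minimal sketch (the failing worker index is arbitrary):

```ruby
# One deliberately failing worker does not disturb its siblings
pids = 4.times.map do |i|
  fork do
    raise "boom" if i == 2   # simulate a crash in one worker
    puts "Worker #{i} (#{Process.pid}) finished"
  end
end

pids.each do |pid|
  _, status = Process.waitpid2(pid)
  puts "#{pid}: #{status.success? ? 'ok' : "failed (#{status.exitstatus})"}"
end
```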
## Basic Usage
Creating a basic process pool starts with spawning worker processes and distributing tasks among them. The fundamental pattern involves forking multiple processes, assigning work to each, and coordinating their execution.
```ruby
class SimpleProcessPool
  def initialize(size = 4)
    @size = size
    @workers = []
  end

  def execute(tasks)
    tasks.each_slice((tasks.length / @size.to_f).ceil) do |chunk|
      pid = fork do
        chunk.each do |task|
          process_task(task)
        end
        exit(0)
      end
      @workers << pid
    end

    wait_for_completion
  end

  private

  def wait_for_completion
    @workers.each { |pid| Process.waitpid(pid) }
    @workers.clear
  end

  def process_task(task)
    # Task processing logic
    sleep(rand(0.1..0.5)) # Simulate work
    puts "Processed task: #{task}"
  end
end

pool = SimpleProcessPool.new(3)
pool.execute([1, 2, 3, 4, 5, 6, 7, 8, 9])
```
Inter-process communication requires establishing channels before forking, so that both processes inherit the file descriptors. Pipes are unidirectional, so bidirectional parent-child communication takes a pair of them.
```ruby
require 'json'

class CommunicatingPool
  def initialize(worker_count = 4)
    @worker_count = worker_count
    @pipes = []
    @pids = []
  end

  def start
    @worker_count.times do |i|
      # One pipe per direction: pipes are unidirectional
      parent_read, child_write = IO.pipe
      child_read, parent_write = IO.pipe

      pid = fork do
        # Close the parent's ends in the child
        parent_read.close
        parent_write.close
        worker_loop(child_read, child_write, i)
      end

      # Close the child's ends in the parent
      child_read.close
      child_write.close

      @pipes << { read: parent_read, write: parent_write }
      @pids << pid
    end
  end

  def send_task(worker_id, data)
    @pipes[worker_id][:write].puts(data.to_json)
  end

  def receive_result(worker_id)
    JSON.parse(@pipes[worker_id][:read].gets)
  end

  private

  def worker_loop(input, output, worker_id)
    while line = input.gets
      task_data = JSON.parse(line)
      result = perform_work(task_data, worker_id)
      output.puts(result.to_json)
    end
  end

  def perform_work(data, worker_id)
    { worker_id: worker_id, result: data['value'] * 2, processed_at: Time.now }
  end
end
```
Process pools handle task distribution by maintaining queues and worker assignments. The parent process coordinates work distribution while workers execute tasks independently.
```ruby
require 'json'

class QueuedProcessPool
  def initialize(pool_size = 4)
    @pool_size = pool_size
    @task_queue = []
    @workers = {}
  end

  def submit_tasks(tasks)
    @task_queue.concat(tasks)
    spawn_workers
    distribute_tasks
    collect_results
  end

  private

  def spawn_workers
    @pool_size.times do |i|
      task_pipe_r, task_pipe_w = IO.pipe
      result_pipe_r, result_pipe_w = IO.pipe

      pid = fork do
        task_pipe_w.close
        result_pipe_r.close
        worker_process(task_pipe_r, result_pipe_w)
      end

      task_pipe_r.close
      result_pipe_w.close

      @workers[i] = {
        pid: pid,
        task_pipe: task_pipe_w,
        result_pipe: result_pipe_r
      }
    end
  end

  def distribute_tasks
    @task_queue.each_with_index do |task, index|
      worker_id = index % @pool_size
      @workers[worker_id][:task_pipe].puts(task.to_json)
    end

    # Closing the task pipes signals end of tasks (workers see EOF)
    @workers.each { |_, worker| worker[:task_pipe].close }
    @task_queue.clear
  end

  def collect_results
    results = []

    @workers.each do |_, worker|
      # Read until the worker closes its result pipe and exits
      while line = worker[:result_pipe].gets
        results << JSON.parse(line)
      end
      worker[:result_pipe].close
      Process.waitpid(worker[:pid])
    end

    results
  end

  def worker_process(task_input, result_output)
    while line = task_input.gets
      task = JSON.parse(line)
      result = execute_task(task)
      result_output.puts(result.to_json)
    end
    result_output.close
  end

  def execute_task(task)
    # Process the task
    { id: task['id'], result: task['data'].upcase, worker_pid: Process.pid }
  end
end
```
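A usage sketch, assuming tasks are hashes with the `id` and `data` keys that `execute_task` expects:

```ruby
pool = QueuedProcessPool.new(3)
tasks = [
  { id: 1, data: 'alpha' },
  { id: 2, data: 'beta' },
  { id: 3, data: 'gamma' }
]

results = pool.submit_tasks(tasks)
results.each { |r| puts "Task #{r['id']} -> #{r['result']} (PID #{r['worker_pid']})" }
```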
## Advanced Usage
Advanced process pool implementations provide sophisticated worker management, dynamic scaling, and robust error recovery mechanisms. These patterns handle complex scenarios including worker recycling, load balancing, and graceful shutdown procedures.
```ruby
require 'securerandom'

class ManagedProcessPool
  attr_reader :active_workers, :completed_tasks

  def initialize(min_workers: 2, max_workers: 8, max_tasks_per_worker: 100)
    @min_workers = min_workers
    @max_workers = max_workers
    @max_tasks_per_worker = max_tasks_per_worker
    @workers = {}
    @task_queue = Queue.new
    @result_queue = Queue.new
    @active_workers = 0
    @completed_tasks = 0
    @shutdown = false
  end

  def start
    spawn_initial_workers
    start_manager_thread # elided: drains @task_queue and dispatches to worker connections
  end

  def submit(task)
    raise "Pool is shutting down" if @shutdown

    @task_queue << task
    scale_up_if_needed # elided: calls spawn_worker when queue depth exceeds a threshold
  end

  def shutdown(timeout: 30)
    @shutdown = true
    @task_queue << :shutdown

    # Graceful shutdown with timeout
    shutdown_start = Time.now
    while @active_workers > 0 && (Time.now - shutdown_start) < timeout
      sleep(0.1)
    end

    # Force kill remaining workers
    @workers.each do |_, worker|
      Process.kill('KILL', worker[:pid]) rescue nil
    end
  end

  private

  def spawn_initial_workers
    @min_workers.times { spawn_worker }
  end

  def spawn_worker
    return if @active_workers >= @max_workers

    parent_conn, child_conn = create_connection_pair

    pid = fork do
      parent_conn.close
      worker_main_loop(child_conn)
    end

    child_conn.close
    worker_id = SecureRandom.uuid
    @workers[worker_id] = {
      pid: pid,
      connection: parent_conn,
      tasks_processed: 0,
      started_at: Time.now
    }
    @active_workers += 1

    monitor_worker(worker_id)
  end

  def create_connection_pair
    parent_read, child_write = IO.pipe
    child_read, parent_write = IO.pipe

    parent_conn = WorkerConnection.new(parent_read, parent_write)
    child_conn = WorkerConnection.new(child_read, child_write)

    [parent_conn, child_conn]
  end

  def worker_main_loop(connection)
    tasks_processed = 0

    loop do
      task = connection.receive
      break if task == :shutdown || task == :recycle

      begin
        result = process_task(task) # elided: application-specific task handler
        connection.send({ status: :success, result: result, worker_pid: Process.pid })
        tasks_processed += 1
      rescue => e
        connection.send({
          status: :error,
          error: e.message,
          worker_pid: Process.pid
        })
      end

      break if tasks_processed >= @max_tasks_per_worker
    end
  ensure
    connection.close
  end

  def monitor_worker(worker_id)
    Thread.new do
      worker = @workers[worker_id]

      loop do
        begin
          result = worker[:connection].receive_nonblock
          @result_queue << result
          worker[:tasks_processed] += 1

          # Recycle worker if it's processed too many tasks
          if worker[:tasks_processed] >= @max_tasks_per_worker
            recycle_worker(worker_id)
            break
          end
        rescue IO::WaitReadable
          sleep(0.01)
        rescue EOFError
          # Worker died
          cleanup_worker(worker_id)
          break
        end
      end
    end
  end

  def recycle_worker(worker_id)
    worker = @workers[worker_id]
    worker[:connection].send(:recycle)
    Process.waitpid(worker[:pid])
    cleanup_worker(worker_id)
    spawn_worker if @active_workers < @min_workers
  end

  def cleanup_worker(worker_id)
    worker = @workers.delete(worker_id)
    worker[:connection].close if worker
    @active_workers -= 1
  end
end
```
```ruby
class WorkerConnection
  def initialize(read_io, write_io)
    @read_io = read_io
    @write_io = write_io
  end

  def send(data)
    # Length-prefixed framing: 4-byte native-endian length, then the payload
    message = Marshal.dump(data)
    @write_io.write([message.length].pack('L'))
    @write_io.write(message)
    @write_io.flush
  end

  def receive
    length_data = @read_io.read(4)
    return nil unless length_data

    length = length_data.unpack('L')[0]
    message_data = @read_io.read(length)
    Marshal.load(message_data)
  end

  def receive_nonblock
    # Simplification: assumes a whole frame is readable at once;
    # a production version would buffer partial reads
    length_data = @read_io.read_nonblock(4)
    length = length_data.unpack('L')[0]
    message_data = @read_io.read_nonblock(length)
    Marshal.load(message_data)
  end

  def close
    @read_io.close unless @read_io.closed?
    @write_io.close unless @write_io.closed?
  end
end
```
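Driving the pool might look like the following sketch. It relies on the elided helpers above (`start_manager_thread`, `scale_up_if_needed`, `process_task`), so it illustrates the intended API rather than a complete runnable program:

```ruby
pool = ManagedProcessPool.new(min_workers: 2, max_workers: 4, max_tasks_per_worker: 50)
pool.start

100.times { |i| pool.submit({ id: i, payload: "task-#{i}" }) }

pool.shutdown(timeout: 10)
```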
Load balancing distributes tasks based on worker availability and performance metrics. Advanced pools track worker statistics and adjust task assignments accordingly.
```ruby
class LoadBalancedPool
  def initialize(worker_count: 4)
    @worker_count = worker_count
    @workers = []
    @worker_stats = {}
    @task_assignments = {}
  end

  def start
    create_workers # elided: spawn workers with stat-reporting connections
    start_load_monitor
  end

  def execute_batch(tasks)
    task_assignments = calculate_optimal_distribution(tasks)

    task_assignments.each do |worker_id, worker_tasks|
      assign_tasks_to_worker(worker_id, worker_tasks) # elided: write tasks to the worker's pipe
    end

    wait_for_batch_completion # elided: collect results and reap workers
  end

  private

  def calculate_optimal_distribution(tasks)
    # Weight each worker by success rate per unit of average task time
    worker_weights = @workers.map.with_index do |_, i|
      stats = @worker_stats[i] || { avg_time: 1.0, success_rate: 1.0 }
      weight = stats[:success_rate] / stats[:avg_time]
      [i, weight]
    end.to_h

    total_weight = worker_weights.values.sum
    distribution = {}

    tasks.each_with_index do |task, index|
      # Assign based on weighted round-robin
      weight_position = (index.to_f / tasks.length) * total_weight
      cumulative_weight = 0

      selected_worker = worker_weights.find do |worker_id, weight|
        cumulative_weight += weight
        cumulative_weight >= weight_position
      end&.first || 0

      distribution[selected_worker] ||= []
      distribution[selected_worker] << task
    end

    distribution
  end

  def start_load_monitor
    Thread.new do
      loop do
        update_worker_statistics
        rebalance_if_needed # elided: reassign queued tasks when weights shift
        sleep(1)
      end
    end
  end

  def update_worker_statistics
    @workers.each_with_index do |worker, i|
      begin
        # get_stats_nonblock is assumed to be provided by the worker connection
        stats = worker[:connection].get_stats_nonblock
        @worker_stats[i] = stats if stats
      rescue IO::WaitReadable
        # No stats available yet
      end
    end
  end
end
```
## Thread Safety & Concurrency
Process pools provide inherent isolation between workers, eliminating shared memory concerns that plague thread-based concurrency. Each process operates with its own memory space, preventing race conditions and data corruption between workers.
```ruby
require 'json'

class IsolatedProcessPool
  def initialize(pool_size: 4)
    @pool_size = pool_size
    @workers = []
    @shared_state_file = "/tmp/process_pool_state_#{Process.pid}"
    initialize_shared_state
  end

  def start
    @pool_size.times do |worker_id|
      spawn_isolated_worker(worker_id) # elided: long-lived worker variant
    end
  end

  def execute_with_shared_state(tasks)
    # Distribute tasks across workers
    task_chunks = tasks.each_slice((tasks.length / @pool_size.to_f).ceil).to_a

    pids = task_chunks.map.with_index do |chunk, worker_id|
      fork do
        worker_with_file_coordination(chunk, worker_id)
      end
    end

    # Wait for all workers and collect results
    pids.each { |pid| Process.waitpid(pid) }
    read_final_state
  end

  private

  def initialize_shared_state
    File.write(@shared_state_file, { counter: 0, results: [] }.to_json)
  end

  def read_final_state
    JSON.parse(File.read(@shared_state_file))
  end

  def worker_with_file_coordination(tasks, worker_id)
    tasks.each do |task|
      result = process_task(task, worker_id)

      # File-based coordination for shared state
      update_shared_state do |state|
        state['counter'] += 1
        state['results'] << result
        state
      end
    end
  end

  def update_shared_state
    # File locking for atomic read-modify-write updates
    File.open(@shared_state_file, 'r+') do |file|
      file.flock(File::LOCK_EX)
      begin
        current_state = JSON.parse(file.read)
        updated_state = yield(current_state)
        file.rewind
        file.write(updated_state.to_json)
        file.truncate(file.pos)
      ensure
        file.flock(File::LOCK_UN)
      end
    end
  end

  def process_task(task, worker_id)
    {
      task_id: task[:id],
      worker_id: worker_id,
      worker_pid: Process.pid,
      result: expensive_computation(task[:data]),
      processed_at: Time.now.to_f
    }
  end

  def expensive_computation(data)
    # Simulate CPU-intensive work
    (1..data[:iterations]).reduce(0) { |sum, i| sum + Math.sqrt(i) }
  end
end
```
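A hypothetical run, using the task shape `process_task` expects:

```ruby
pool = IsolatedProcessPool.new(pool_size: 4)
tasks = Array.new(8) { |i| { id: i, data: { iterations: 100_000 } } }

final_state = pool.execute_with_shared_state(tasks)
puts "Counter: #{final_state['counter']}, results: #{final_state['results'].length}"
```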
Signal-based coordination manages worker lifecycle and handles interruption scenarios gracefully. Process pools implement signal handlers for clean shutdown and worker management.
```ruby
require 'timeout'

class SignalManagedPool
  SIGNALS = %w[INT TERM USR1 USR2].freeze

  def initialize(worker_count: 4)
    @worker_count = worker_count
    @workers = {}
    @master_pid = Process.pid
    @shutdown_requested = false
    setup_signal_handlers
  end

  def start
    spawn_workers
    master_loop
  end

  private

  def setup_signal_handlers
    # Graceful shutdown. Trap handlers should stay small in real code
    # (no Mutex, minimal allocation); this one is kept inline for clarity.
    %w[INT TERM].each do |signal|
      Signal.trap(signal) do
        @shutdown_requested = true
        graceful_shutdown
      end
    end

    # Worker management signals
    Signal.trap('USR1') { add_worker }    # elided: spawn_worker with the next free id
    Signal.trap('USR2') { remove_worker } # elided: TERM one worker and reap it
    Signal.trap('CHLD') { handle_child_death }
  end

  def spawn_workers
    @worker_count.times do |i|
      spawn_worker(i)
    end
  end

  def spawn_worker(worker_id)
    pid = fork do
      setup_worker_signals
      worker_main_loop(worker_id)
    end

    @workers[worker_id] = {
      pid: pid,
      started_at: Time.now,
      tasks_processed: 0
    }
  end

  def setup_worker_signals
    # Worker-specific signal handling
    Signal.trap('TERM') { exit(0) }
    Signal.trap('USR1') { dump_worker_status } # elided: print/queue diagnostics

    # Reset other signals to default
    %w[INT USR2 CHLD].each { |sig| Signal.trap(sig, 'DEFAULT') }
  end

  def worker_main_loop(worker_id)
    loop do
      begin
        task = receive_task # elided: read the next task from the parent
        break if task.nil?

        process_task_with_signals(task, worker_id)
        send_heartbeat(worker_id) # elided: liveness message to the parent
      rescue => e
        log_worker_error(worker_id, e) # elided: error logging
        sleep(0.1) # Brief pause before continuing
      end
    end
  end

  def process_task_with_signals(task, worker_id)
    start_time = Time.now

    # Ruby core does not bind alarm(2), so SIGALRM is raised from a
    # timer thread instead of a kernel alarm
    Signal.trap('ALRM') { raise Timeout::Error, "Task timeout" }
    timer = Thread.new do
      sleep(task[:timeout] || 30)
      Process.kill('ALRM', Process.pid)
    end

    begin
      result = execute_task(task)
      send_result(worker_id, result, Time.now - start_time) # elided: report to parent
    ensure
      timer.kill # Cancel timeout
      Signal.trap('ALRM', 'DEFAULT')
    end
  end

  def master_loop
    loop do
      break if @shutdown_requested && @workers.empty?

      check_worker_health # elided: heartbeat bookkeeping
      handle_task_queue   # elided: dispatch queued tasks
      sleep(0.1)
    end
  end

  def graceful_shutdown
    puts "Initiating graceful shutdown..."

    # Send TERM signal to all workers
    @workers.each do |_, worker|
      Process.kill('TERM', worker[:pid]) rescue nil
    end

    # Wait for workers with timeout
    shutdown_timeout = Time.now + 30
    until @workers.empty? || Time.now > shutdown_timeout
      sleep(0.1)
    end

    # Force kill remaining workers
    @workers.each do |_, worker|
      Process.kill('KILL', worker[:pid]) rescue nil
    end
  end

  def handle_child_death
    loop do
      pid, status = Process.waitpid2(-1, Process::WNOHANG)
      break unless pid

      worker_id = @workers.find { |_, w| w[:pid] == pid }&.first
      if worker_id
        handle_worker_death(worker_id, status)
      end
    end
  rescue Errno::ECHILD
    # No child processes
  end

  def handle_worker_death(worker_id, status)
    @workers.delete(worker_id)

    unless @shutdown_requested
      if status.termsig
        puts "Worker #{worker_id} killed by signal #{status.termsig}"
      elsif status.exitstatus != 0
        puts "Worker #{worker_id} exited with status #{status.exitstatus}"
      end

      # Respawn worker if not shutting down
      spawn_worker(worker_id)
    end
  end
end
```
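Because the master traps `USR1` and `USR2`, pool size can be adjusted at runtime from another process. The PID below is illustrative (in practice it would come from a pidfile or a process listing):

```ruby
master_pid = 12_345                # hypothetical master PID, e.g. read from a pidfile

Process.kill('USR1', master_pid)   # ask the master to add a worker
Process.kill('USR2', master_pid)   # ask the master to remove a worker
Process.kill('TERM', master_pid)   # request graceful shutdown
```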
## Performance & Memory
Process pools optimize performance through parallelization but introduce overhead from process creation, inter-process communication, and memory duplication. Understanding these tradeoffs guides effective implementation decisions.
```ruby
require 'benchmark'
require 'json'

class PerformanceAnalyzedPool
  def initialize(worker_count: 4)
    @worker_count = worker_count
    @worker_pids = []
    @metrics = {
      task_times: [],
      memory_usage: [],
      process_overhead: {},
      communication_latency: []
    }
  end

  def benchmark_execution(tasks, &task_processor)
    puts "Benchmarking with #{@worker_count} workers, #{tasks.length} tasks"

    # Memory baseline
    baseline_memory = get_process_memory

    execution_time = Benchmark.realtime do
      execute_with_metrics(tasks, &task_processor)
    end

    peak_memory = @metrics[:memory_usage].max || baseline_memory
    memory_overhead = peak_memory - baseline_memory

    print_performance_report(execution_time, memory_overhead)
  end

  def execute_with_metrics(tasks, &task_processor)
    task_chunks = distribute_tasks(tasks)

    @worker_pids = task_chunks.map.with_index do |chunk, worker_id|
      fork do
        worker_with_profiling(chunk, worker_id, &task_processor)
      end
    end

    # Sample pool memory while the workers run
    memory_monitor = start_memory_monitoring

    # Monitor parent process performance
    parent_monitor = Thread.new { monitor_parent_process }

    @worker_pids.each { |pid| Process.waitpid(pid) }
    memory_monitor.kill
    parent_monitor.kill

    aggregate_worker_metrics
  end

  private

  def distribute_tasks(tasks)
    chunk_size = (tasks.length / @worker_count.to_f).ceil
    tasks.each_slice(chunk_size).to_a
  end

  def worker_with_profiling(tasks, worker_id, &task_processor)
    worker_metrics = {
      worker_id: worker_id,
      worker_pid: Process.pid,
      start_memory: get_process_memory,
      task_times: [],
      communication_overhead: []
    }

    tasks.each do |task|
      task_start = Time.now

      result = if task_processor
        task_processor.call(task)
      else
        default_task_processing(task)
      end

      task_duration = Time.now - task_start
      worker_metrics[:task_times] << task_duration

      # Measure communication overhead
      comm_start = Time.now
      report_task_completion(worker_id, result)
      comm_duration = Time.now - comm_start
      worker_metrics[:communication_overhead] << comm_duration
    end

    worker_metrics[:end_memory] = get_process_memory
    worker_metrics[:memory_growth] = worker_metrics[:end_memory] - worker_metrics[:start_memory]

    save_worker_metrics(worker_metrics)
  end

  def default_task_processing(task)
    task # no-op default used when no block is given
  end

  def report_task_completion(worker_id, result)
    # elided: send the result back to the parent (pipe, socket, or file)
  end

  def monitor_parent_process
    # elided: sample parent CPU/GC statistics while workers run
  end

  def start_memory_monitoring
    Thread.new do
      loop do
        total_memory = get_total_pool_memory
        @metrics[:memory_usage] << total_memory
        sleep(0.1)
      end
    end
  end

  def get_process_memory
    # Get RSS (Resident Set Size) in KB; Unix-specific
    `ps -o rss= -p #{Process.pid}`.to_i
  end

  def get_total_pool_memory
    # Sum memory usage of the parent plus all pool processes
    ([Process.pid] + @worker_pids).sum do |pid|
      `ps -o rss= -p #{pid}`.to_i rescue 0
    end
  end

  def aggregate_worker_metrics
    Dir.glob("/tmp/worker_metrics_*.json").each do |file|
      metrics = JSON.parse(File.read(file))
      @metrics[:task_times].concat(metrics['task_times'])
      @metrics[:communication_latency].concat(metrics['communication_overhead'])
    end
  end

  def print_performance_report(execution_time, memory_overhead)
    avg_task_time = @metrics[:task_times].sum / @metrics[:task_times].length
    tasks_per_second = @metrics[:task_times].length / execution_time
    avg_comm_latency = @metrics[:communication_latency].sum / @metrics[:communication_latency].length

    puts <<~REPORT
      Performance Report:
      ==================
      Total execution time: #{execution_time.round(3)}s
      Tasks per second: #{tasks_per_second.round(2)}
      Average task time: #{avg_task_time.round(4)}s
      Memory overhead: #{memory_overhead}KB
      Communication latency: #{avg_comm_latency.round(4)}s

      Worker Distribution:
    REPORT

    print_worker_performance_breakdown
  end

  def print_worker_performance_breakdown
    worker_files = Dir.glob("/tmp/worker_metrics_*.json")

    worker_files.each do |file|
      metrics = JSON.parse(File.read(file))
      avg_time = metrics['task_times'].sum / metrics['task_times'].length

      puts "  Worker #{metrics['worker_id']} (PID #{metrics['worker_pid']}):"
      puts "    Tasks processed: #{metrics['task_times'].length}"
      puts "    Average time: #{avg_time.round(4)}s"
      puts "    Memory growth: #{metrics['memory_growth']}KB"
      puts "    Comm overhead: #{(metrics['communication_overhead'].sum * 1000).round(2)}ms"
    end

    # Cleanup metrics files
    worker_files.each { |file| File.delete(file) }
  end

  def save_worker_metrics(metrics)
    filename = "/tmp/worker_metrics_#{metrics[:worker_id]}.json"
    File.write(filename, metrics.to_json)
  end
end
```
```ruby
# Comparative performance analysis
class PoolPerformanceComparison
  def compare_pool_sizes(tasks, pool_sizes = [1, 2, 4, 8])
    puts "Comparing pool sizes: #{pool_sizes.join(', ')}"

    results = pool_sizes.map do |size|
      puts "\nTesting pool size: #{size}"
      pool = PerformanceAnalyzedPool.new(worker_count: size)

      execution_time = Benchmark.realtime do
        pool.execute_with_metrics(tasks) do |task|
          # CPU-intensive task simulation
          (1..task[:complexity]).reduce(0) { |sum, i| sum + Math.sqrt(i) }
        end
      end

      {
        pool_size: size,
        execution_time: execution_time,
        throughput: tasks.length / execution_time
      }
    end

    print_comparison_results(results)
  end

  private

  def print_comparison_results(results)
    puts "\nPerformance Comparison Results:"
    puts "================================"

    results.each do |result|
      puts "Pool size #{result[:pool_size]}: " \
           "#{result[:execution_time].round(3)}s " \
           "(#{result[:throughput].round(2)} tasks/s)"
    end

    best_result = results.max_by { |r| r[:throughput] }
    puts "\nBest performance: Pool size #{best_result[:pool_size]}"
  end
end
```
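Assuming tasks carry the `:complexity` key the benchmark block expects, a comparison run looks like:

```ruby
tasks = Array.new(200) { |i| { id: i, complexity: 50_000 } }
PoolPerformanceComparison.new.compare_pool_sizes(tasks)
```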
Memory optimization strategies focus on minimizing process overhead and managing shared resources effectively. Copy-on-write memory semantics reduce initial memory consumption in forked processes.
```ruby
class MemoryOptimizedPool
  def initialize(worker_count: 4, preload_data: true)
    @worker_count = worker_count
    @shared_data = preload_data ? load_shared_resources : nil # elided loader
    @copy_on_write_optimized = true
  end

  def execute_memory_efficient(tasks)
    # Pre-fork optimization: load shared data before forking
    # to take advantage of copy-on-write semantics
    if @copy_on_write_optimized
      preload_immutable_data
      disable_gc_during_fork
    end

    pids = spawn_optimized_workers(tasks)

    # Re-enable GC in parent
    GC.enable if @copy_on_write_optimized

    pids.each { |pid| Process.waitpid(pid) }
  end

  private

  def preload_immutable_data
    # Load read-only data that will be shared across processes
    # (loaders elided: application-specific)
    @lookup_tables = load_lookup_tables
    @configuration = load_configuration
    @templates = load_templates

    # Freeze shared strings so workers don't mutate (and thereby copy) them
    ObjectSpace.each_object(String) { |str| str.freeze unless str.frozen? }
  end

  def disable_gc_during_fork
    # Disable GC to prevent copy-on-write page invalidation around fork
    GC.disable
  end

  def spawn_optimized_workers(tasks)
    task_distribution = distribute_tasks_efficiently(tasks)

    task_distribution.map.with_index do |worker_tasks, worker_id|
      fork do
        # Re-enable GC in worker process
        GC.enable

        # Set worker-specific GC settings
        optimize_worker_gc

        # Process assigned tasks
        process_worker_tasks(worker_tasks, worker_id)
      end
    end
  end

  def optimize_worker_gc
    # Tune GC for worker process characteristics
    GC.start(full_mark: false) # Initial cleanup

    # GC.tune is not part of core Ruby; the guard keeps this portable.
    # In practice, tuning is usually done via RUBY_GC_* environment variables.
    if defined?(GC.tune)
      GC.tune(
        malloc_limit: 16_777_216, # 16MB
        heap_slots_increment: 10_000,
        heap_slots_growth_factor: 1.8,
        oldmalloc_limit: 16_777_216
      )
    end
  end

  def distribute_tasks_efficiently(tasks)
    # Balance task distribution based on estimated memory usage
    estimated_memory = tasks.map { |task| estimate_task_memory(task) }

    # Sort tasks by memory usage for better distribution
    sorted_indices = (0...tasks.length).sort_by { |i| -estimated_memory[i] }

    # Distribute using round-robin on sorted tasks
    distribution = Array.new(@worker_count) { [] }
    sorted_indices.each_with_index do |task_index, i|
      worker_id = i % @worker_count
      distribution[worker_id] << tasks[task_index]
    end

    distribution
  end

  def estimate_task_memory(task)
    # Rough memory estimation based on task characteristics
    base_memory = 1024 # Base task overhead
    data_memory = task.to_s.bytesize * 2 # Rough data size multiplier
    base_memory + data_memory
  end

  def process_worker_tasks(tasks, worker_id)
    tasks.each_with_index do |task, index|
      process_single_task(task) # elided: application-specific task handler

      # Periodic memory management
      if (index + 1) % 100 == 0
        perform_incremental_gc
      end
    end

    # Final cleanup
    GC.start(full_mark: true)
  end

  def perform_incremental_gc
    # Minor GC passes to manage memory growth
    3.times { GC.start(full_mark: false) }
  end
end
```
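Copy-on-write savings can be observed directly by comparing a child's RSS right after `fork` with its RSS after it writes into inherited data. A rough, Unix-only sketch using `ps`, as the profiling code above does:

```ruby
rss = ->(pid) { `ps -o rss= -p #{pid}`.to_i } # resident set size in KB

shared = Array.new(1_000_000) { |i| i.to_s }  # sizable data, loaded pre-fork

pid = fork do
  before = rss.call(Process.pid)
  shared.each { |s| s << 'x' }                # writing forces pages to be copied
  after = rss.call(Process.pid)
  puts "child RSS: #{before}KB -> #{after}KB"
end
Process.waitpid(pid)
```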
## Error Handling & Debugging
Process pool error handling requires managing failures in both parent coordination and individual worker processes. Robust implementations detect worker crashes, handle communication failures, and provide comprehensive debugging information.
```ruby
require 'set' # for Enumerable#to_set

class RobustProcessPool
  class PoolError < StandardError; end
  class WorkerCrashError < PoolError; end
  class CommunicationError < PoolError; end
  class TaskTimeoutError < PoolError; end

  def initialize(worker_count: 4, max_retries: 3, timeout: 30)
    @worker_count = worker_count
    @max_retries = max_retries
    @timeout = timeout
    @workers = {}
    @failed_tasks = []
    @error_log = []
  end

  def execute_with_resilience(tasks)
    setup_error_monitoring # elided: subscribe to process-level error hooks

    begin
      execute_tasks_with_recovery(tasks)
    rescue => e
      handle_critical_failure(e) # elided: alerting/cleanup for unrecoverable errors
      raise
    ensure
      cleanup_and_report_errors
    end
  end

  private

  def execute_tasks_with_recovery(tasks)
    remaining_tasks = tasks.dup
    attempt = 0

    while !remaining_tasks.empty? && attempt < @max_retries
      attempt += 1
      puts "Execution attempt #{attempt}/#{@max_retries}"

      failed_tasks = execute_attempt(remaining_tasks)

      if failed_tasks.empty?
        break
      else
        remaining_tasks = failed_tasks
        log_attempt_failure(attempt, failed_tasks) # elided: structured logging
        sleep(attempt * 0.5) # Linear backoff between attempts
      end
    end

    raise PoolError, "Failed to complete tasks after #{@max_retries} attempts" unless remaining_tasks.empty?
  end

  def execute_attempt(tasks)
    @workers = spawn_monitored_workers
    failed_tasks = []

    begin
      task_assignments = distribute_tasks_with_monitoring(tasks, @workers) # elided: round-robin over connections

      # Monitor execution with timeout
      execution_results = monitor_execution_with_timeout(task_assignments)

      # Identify failed tasks
      failed_tasks = identify_failed_tasks(tasks, execution_results) # elided: diff submitted vs. completed
    rescue => e
      log_execution_error("Execution attempt failed", e)
      failed_tasks = tasks # Retry all tasks
    ensure
      cleanup_workers(@workers) # elided: close connections and reap pids
    end

    failed_tasks
  end

  def spawn_monitored_workers
    workers = {}

    @worker_count.times do |worker_id|
      begin
        worker = create_monitored_worker(worker_id)
        workers[worker_id] = worker
      rescue => e
        log_worker_creation_error(worker_id, e) # elided: structured logging
        # Continue with fewer workers rather than failing completely
      end
    end

    raise PoolError, "Failed to create any workers" if workers.empty?
    workers
  end

  def create_reliable_connection
    parent_read, child_write = IO.pipe
    child_read, parent_write = IO.pipe

    [ReliableConnection.new(parent_read, parent_write),
     ReliableConnection.new(child_read, child_write)]
  end

  def create_monitored_worker(worker_id)
    parent_conn, child_conn = create_reliable_connection

    pid = fork do
      Signal.trap('TERM') { exit(0) } # Graceful shutdown
      parent_conn.close

      begin
        worker_loop_with_error_handling(child_conn, worker_id)
      rescue => e
        error_data = {
          worker_id: worker_id,
          error: e.message,
          backtrace: e.backtrace,
          pid: Process.pid
        }

        # Try to report error before dying
        begin
          child_conn.send({ type: :worker_error, data: error_data })
        rescue
          # Connection might be broken
        end

        exit(1)
      ensure
        child_conn.close
      end
    end

    child_conn.close

    {
      pid: pid,
      connection: parent_conn,
      started_at: Time.now,
      status: :active,
      tasks_assigned: 0,
      tasks_completed: 0
    }
  end

  def worker_loop_with_error_handling(connection, worker_id)
    loop do
      begin
        # Receive task with timeout
        task = connection.receive_with_timeout(@timeout)
        break if task.nil? || task[:type] == :shutdown

        # Process with error isolation
        result = process_task_safely(task, worker_id)
        connection.send({ type: :result, data: result })
      rescue CommunicationError => e
        # Communication failure - log and exit
        puts "Worker #{worker_id}: Communication failed - #{e.message}"
        break
      rescue TaskTimeoutError => e
        # Task timeout - report and continue
        connection.send({
          type: :task_timeout,
          data: { task_id: task[:id], worker_id: worker_id }
        })
      rescue => e
        # Task processing error - report and continue
        connection.send({
          type: :task_error,
          data: {
            task_id: task[:id],
            worker_id: worker_id,
            error: e.message,
            backtrace: e.backtrace
          }
        })
      end
    end
  end

  def process_task_safely(task, worker_id)
    start_time = Time.now

    # Set up task timeout using a watchdog thread
    timeout_thread = Thread.new do
      sleep(@timeout)
      Thread.main.raise(TaskTimeoutError, "Task #{task[:id]} timed out")
    end

    begin
      result = execute_task(task) # elided: application-specific task handler
      {
        task_id: task[:id],
        worker_id: worker_id,
        result: result,
        execution_time: Time.now - start_time,
        status: :success
      }
    ensure
      timeout_thread.kill
    end
  end

  def monitor_execution_with_timeout(task_assignments)
    start_time = Time.now
    results = {}
    active_workers = task_assignments.keys.to_set

    while !active_workers.empty? && (Time.now - start_time) < (@timeout * 2)
      active_workers.each do |worker_id|
        worker = @workers[worker_id]
        next unless worker

        begin
          message = worker[:connection].receive_nonblock
          handle_worker_message(worker_id, message, results)

          # Remove worker from active set once it reports completion
          if message[:type] == :worker_complete
            active_workers.delete(worker_id)
          end
        rescue IO::WaitReadable
          # No message available
        rescue EOFError
          # Worker died
          handle_worker_death(worker_id, active_workers) # elided: reap and deactivate
        end
      end

      sleep(0.01)
    end

    # Handle timeout
    if !active_workers.empty?
      log_timeout_error(active_workers) # elided: structured logging
      raise PoolError, "Execution timed out with workers: #{active_workers.to_a}"
    end

    results
  end

  def handle_worker_message(worker_id, message, results)
    case message[:type]
    when :result
      results[message[:data][:task_id]] = message[:data]
    when :task_error
      @failed_tasks << message[:data]
      log_task_error(message[:data])
    when :task_timeout
      @failed_tasks << message[:data]
      log_task_timeout(message[:data]) # elided: structured logging
    when :worker_error
      log_worker_error(message[:data]) # elided: structured logging
      raise WorkerCrashError, "Worker #{worker_id} crashed: #{message[:data][:error]}"
    end
  end

  def log_execution_error(context, error)
    @error_log << {
      timestamp: Time.now,
      type: :execution_error,
      context: context,
      error: error.message,
      backtrace: error.backtrace
    }
  end

  def log_task_error(error_data)
    @error_log << {
      timestamp: Time.now,
      type: :task_error,
      task_id: error_data[:task_id],
      worker_id: error_data[:worker_id],
      error: error_data[:error],
      backtrace: error_data[:backtrace]
    }
  end

  def cleanup_and_report_errors
    unless @error_log.empty?
      puts "\nError Summary:"
      puts "=" * 50

      @error_log.group_by { |e| e[:type] }.each do |error_type, errors|
        puts "#{error_type.to_s.upcase}: #{errors.length} occurrences"
        errors.first(3).each do |error|
          puts "  - #{error[:error]} (#{error[:timestamp]})"
        end
        puts "  ... and #{errors.length - 3} more" if errors.length > 3
      end
    end

    # Write detailed error log to file for analysis
    write_error_log_to_file if @error_log.any?
  end

  def write_error_log_to_file
    filename = "process_pool_errors_#{Time.now.strftime('%Y%m%d_%H%M%S')}.log"
    File.write(filename, @error_log.map(&:to_json).join("\n"))
    puts "\nDetailed error log written to: #{filename}"
  end
end
```
```ruby
class ReliableConnection
  def initialize(read_io, write_io)
    @read_io = read_io
    @write_io = write_io
    @message_id = 0
  end

  def send(data)
    @message_id += 1
    message = {
      id: @message_id,
      timestamp: Time.now.to_f,
      payload: data
    }

    serialized = Marshal.dump(message)
    header = [serialized.length].pack('L')

    begin
      @write_io.write(header + serialized)
      @write_io.flush
    rescue Errno::EPIPE, Errno::ECONNRESET
      raise RobustProcessPool::CommunicationError, "Connection broken during send"
    end
  end

  def receive_with_timeout(timeout)
    ready = IO.select([@read_io], nil, nil, timeout)
    raise RobustProcessPool::CommunicationError, "Receive timeout" unless ready

    header = @read_io.read(4)
    raise RobustProcessPool::CommunicationError, "Connection closed" unless header

    length = header.unpack('L')[0]
    data = @read_io.read(length)
    message = Marshal.load(data)
    message[:payload]
  rescue Errno::ECONNRESET, EOFError
    raise RobustProcessPool::CommunicationError, "Connection reset by peer"
  end

  def receive_nonblock
    # Signal "nothing to read yet" the same way IO#read_nonblock does
    raise IO::EAGAINWaitReadable unless IO.select([@read_io], nil, nil, 0)

    receive_with_timeout(0.1)
  end

  def close
    @read_io.close unless @read_io.closed?
    @write_io.close unless @write_io.closed?
  end
end
```
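Wiring a connection pair across `fork` follows the same two-pipe pattern used throughout this section; a minimal echo sketch:

```ruby
parent_read, child_write = IO.pipe
child_read, parent_write = IO.pipe

parent = ReliableConnection.new(parent_read, parent_write)
child  = ReliableConnection.new(child_read, child_write)

pid = fork do
  parent.close                           # drop the parent's ends in the child
  msg = child.receive_with_timeout(5)
  child.send({ echo: msg })
end

child.close                              # drop the child's ends in the parent
parent.send('ping')
puts parent.receive_with_timeout(5).inspect # => {:echo=>"ping"}
Process.waitpid(pid)
```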
## Reference
### Core Process Methods
Method | Parameters | Returns | Description |
---|---|---|---|
`Process.fork { block }` | Block (optional) | `Integer` (pid) or `nil` | Creates a child process; returns the pid in the parent, `nil` in the child |
`Process.spawn(command, **options)` | Command (String/Array), options (Hash) | `Integer` (pid) | Spawns a new process with the specified command and options |
`Process.waitpid(pid, flags = 0)` | pid (Integer), flags (Integer) | `Integer` (pid) | Waits for a specific process to terminate |
`Process.waitpid2(pid, flags = 0)` | pid (Integer), flags (Integer) | `[Integer, Process::Status]` | Returns the pid and a status object |
`Process.wait` | None | `Integer` (pid) | Waits for any child process to terminate |
`Process.detach(pid)` | pid (Integer) | `Thread` | Creates a thread that waits for the process, preventing zombies |
`Process.kill(signal, *pids)` | signal (String/Integer), pids (Integers) | `Integer` (count) | Sends a signal to the specified processes |
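A short example combining these methods (the `sleep` command stands in for real work):

```ruby
pid = Process.spawn('sleep', '10')
waiter = Process.detach(pid)   # reaper thread prevents a zombie
Process.kill('TERM', pid)      # ask the child to stop
status = waiter.value          # Process::Status from the detach thread
puts "signaled? #{status.signaled?} (signal #{status.termsig})"
```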
### Process Status Methods
Method | Parameters | Returns | Description |
---|---|---|---|
`status.exited?` | None | `Boolean` | True if process exited normally |
`status.exitstatus` | None | `Integer` or `nil` | Exit status code if exited normally |
`status.signaled?` | None | `Boolean` | True if process was terminated by a signal |
`status.termsig` | None | `Integer` or `nil` | Signal number that terminated the process |
`status.stopsig` | None | `Integer` or `nil` | Signal number that stopped the process |
`status.success?` | None | `Boolean` | True if process exited with status 0 |
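Inspecting a status object returned by `Process.waitpid2`:

```ruby
pid = fork { exit(3) }
_, status = Process.waitpid2(pid)

status.exited?    # => true
status.exitstatus # => 3
status.signaled?  # => false
status.success?   # => false (nonzero exit)
```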
### Signals

Signal | Number (Linux) | Description |
---|---|---|
`TERM` | 15 | Termination request (graceful) |
`KILL` | 9 | Force kill (cannot be caught) |
`INT` | 2 | Interrupt (Ctrl-C) |
`USR1` | 10 | User-defined signal 1 |
`USR2` | 12 | User-defined signal 2 |
`CHLD` | 17 | Child status changed |
`ALRM` | 14 | Alarm timer expired |

Ruby has no `Signal::TERM`-style constants: signals are passed by name (`'TERM'`) or number to `Signal.trap` and `Process.kill`. The numbers above are Linux values and vary by platform (on macOS, for example, `USR1` is 30 and `CHLD` is 20).
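Ruby resolves signal names to the platform's actual numbers via `Signal.list`, which is the portable way to look them up:

```ruby
Signal.list.slice('TERM', 'KILL', 'USR1', 'CHLD')
# => {"TERM"=>15, "KILL"=>9, "USR1"=>10, "CHLD"=>17}  (Linux values)
```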
### Process Spawn Options
Option | Type | Description |
---|---|---|
`:in`, `:out`, `:err` | `IO`, `:close`, `String` | Redirect standard streams |
`:unsetenv_others` | `Boolean` | Clear environment variables other than those specified |
`:pgroup` | `Integer`, `true` | Process group ID, or `true` to create a new group |
`:rlimit_*` | `Integer`, `[Integer, Integer]` | Resource limits (cpu, fsize, etc.) |
`:umask` | `Integer` | File creation mask |
`:chdir` | `String` | Working directory for the child process |
`:close_others` | `Boolean` | Close file descriptors other than 0, 1, 2 |
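Several options combined in one call; the script name, paths, and limits are illustrative:

```ruby
pid = Process.spawn(
  'ruby', 'worker.rb',
  out: '/tmp/worker.log',   # redirect stdout to a file
  err: [:child, :out],      # merge stderr into stdout
  chdir: '/var/app',        # working directory for the child
  pgroup: true,             # new process group (signal the group as a unit)
  rlimit_cpu: 60,           # cap CPU time at 60 seconds
  close_others: true        # don't leak inherited descriptors
)
Process.waitpid(pid)
```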
### Inter-Process Communication Patterns
Pattern | Use Case | Pros | Cons |
---|---|---|---|
Pipes | Simple bidirectional communication | Fast, built-in | Limited to parent-child |
Named Pipes (FIFO) | Process-to-process messaging | Persistent, file-based | Requires filesystem cleanup |
Shared Files | State coordination | Simple, debuggable | I/O overhead, race conditions |
Signals | Process control, notifications | Immediate, lightweight | Limited data, platform differences |
Sockets | Network-like IPC | Full-duplex, networked | Setup complexity |
Message Queues | Async messaging | Buffered, persistent | Platform-specific |
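For full-duplex IPC over a single descriptor per end, `UNIXSocket.pair` (a socketpair) is a common alternative to two pipes:

```ruby
require 'socket'

parent_sock, child_sock = UNIXSocket.pair

pid = fork do
  parent_sock.close
  task = child_sock.gets
  child_sock.puts("done: #{task.strip}")
end

child_sock.close
parent_sock.puts('task-1')
puts parent_sock.gets # => "done: task-1"
Process.waitpid(pid)
```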
### Process Pool Configuration Parameters
Parameter | Typical Range | Description |
---|---|---|
Pool Size | 2-16 processes | Number of worker processes |
Max Tasks/Worker | 100-1000 tasks | Tasks before worker recycling |
Task Timeout | 30-300 seconds | Maximum task execution time |
Startup Timeout | 5-30 seconds | Worker initialization timeout |
Shutdown Timeout | 10-60 seconds | Graceful shutdown wait time |
Max Retries | 3-5 attempts | Failed task retry limit |
Backoff Multiplier | 1.5-2.0 | Retry delay exponential factor |
### Common Process Pool Errors
Error | Cause | Resolution |
---|---|---|
Errno::ECHILD | No child processes to wait for | Check process lifecycle management |
Errno::ESRCH | Process doesn't exist | Verify PID validity before operations |
Errno::EPERM | Permission denied for process operation | Check user permissions and process ownership |
Errno::EPIPE | Broken pipe during communication | Implement connection error handling |
Hung worker (no exit status) | Worker process hangs | Implement task timeouts and monitoring |
Zombie processes | Child processes not reaped | Use Process.detach or proper waitpid calls |
### Memory Optimization Strategies
Strategy | Implementation | Memory Impact |
---|---|---|
Copy-on-Write | Load shared data before fork | Reduces initial memory by 50-80% |
Worker Recycling | Limit tasks per worker process | Prevents memory leaks accumulation |
GC Tuning | Adjust heap parameters per worker | Reduces GC overhead by 20-40% |
String Deduplication | Freeze shared strings | Saves 10-30% on string-heavy workloads |
Lazy Loading | Load resources on demand | Reduces baseline memory usage |
Process Affinity | Bind processes to CPU cores | Improves cache locality |