Overview
Ruby provides several mechanisms for processes to communicate with each other and with external programs. The core facilities center around the Process
class for spawning and managing child processes, IO
operations for reading and writing data streams, and the Open3
module for advanced process control with separate input, output, and error streams.
Process communication in Ruby operates through standard Unix concepts: pipes, file descriptors, signals, and process hierarchies. Ruby wraps these system-level primitives in object-oriented interfaces that handle platform differences and provide error handling.
The primary classes involved are Process
for process lifecycle management, IO
and its subclasses for data streams, and Open3
for complex process interaction patterns. Ruby also provides PTY
for pseudo-terminal communication and various signal handling mechanisms.
# Basic process spawning and communication
pid = Process.spawn("echo 'Hello from child'")
Process.wait(pid)
# => 12345
# Pipe-based communication
reader, writer = IO.pipe
if fork
writer.close
puts reader.read
reader.close
else
reader.close
writer.write "Data from child process"
writer.close
end
Ruby handles process communication through several patterns: one-way data flow using pipes, bidirectional communication through paired pipes or sockets, and signal-based coordination for process control events.
Basic Usage
Process spawning forms the foundation of Ruby's process communication. The Process.spawn
method creates new processes with configurable input/output redirection, environment variables, and working directories.
# Spawn with output capture
pid = Process.spawn("ls -la", out: "/tmp/listing.txt")
Process.wait(pid)
File.read("/tmp/listing.txt") # Contains directory listing
Pipe communication allows data exchange between parent and child processes. Ruby's IO.pipe
creates linked file descriptors where data written to one end appears at the other end.
# Parent-child communication via pipes
reader, writer = IO.pipe
child_pid = fork do
reader.close # Close unused end in child
10.times do |i|
writer.puts "Message #{i} from child"
sleep 0.1
end
writer.close
end
writer.close # Close unused end in parent
while line = reader.gets
puts "Parent received: #{line.chomp}"
end
reader.close
Process.wait(child_pid)
The Open3
module provides standardized access to process input, output, and error streams simultaneously. This approach works well for programs requiring interactive communication or error stream monitoring.
require 'open3'
# Bidirectional communication with a subprocess
stdin, stdout, stderr, wait_thr = Open3.popen3("sort")
# Send data to the subprocess
["zebra", "apple", "banana"].each do |word|
stdin.puts word
end
stdin.close
# Read sorted results
sorted_output = stdout.read.split("\n")
puts sorted_output # => ["apple", "banana", "zebra"]
stdout.close
stderr.close
wait_thr.value # Wait for process completion
Signal-based communication provides event notification between processes. Ruby's signal handling allows processes to respond to system events and custom notifications.
# Signal handling for process coordination
child_pid = fork do
Signal.trap("USR1") do
puts "Child received custom signal"
end
sleep 10 # Wait for signals
end
sleep 1 # Let child set up signal handler
Process.kill("USR1", child_pid)
Process.wait(child_pid)
Advanced Usage
Complex process communication scenarios often require multiple communication channels, process pools, or sophisticated data serialization. Ruby provides several patterns for these advanced requirements.
Process pools manage multiple worker processes sharing a common communication interface. This pattern distributes work across processes while maintaining coordination through the parent process.
require 'json'
class ProcessPool
def initialize(worker_count = 4)
@workers = []
@work_queue = []
worker_count.times do |i|
reader, writer = IO.pipe
child_reader, child_writer = IO.pipe
pid = fork do
# Child process: close parent ends
writer.close
child_reader.close
# Worker loop
while data = reader.gets
task = JSON.parse(data.chomp)
result = process_task(task)
child_writer.puts(JSON.dump(result))
end
reader.close
child_writer.close
end
# Parent: close child ends
reader.close
child_writer.close
@workers << {
pid: pid,
input: writer,
output: child_reader
}
end
end
def submit_work(task)
worker = @workers.sample # Simple round-robin would be better
worker[:input].puts(JSON.dump(task))
# Read result
result_json = worker[:output].gets
JSON.parse(result_json.chomp)
end
def shutdown
@workers.each do |worker|
worker[:input].close
worker[:output].close
Process.wait(worker[:pid])
end
end
private
def process_task(task)
# Simulate work
{ result: task['input'] * 2, worker_pid: Process.pid }
end
end
# Usage
pool = ProcessPool.new(2)
result = pool.submit_work({ 'input' => 21 })
puts result # => {"result"=>42, "worker_pid"=>12346}
pool.shutdown
Bidirectional streaming communication handles continuous data exchange between processes. This pattern works for implementing custom protocols or streaming data processors.
require 'open3'
class StreamProcessor
def initialize(command)
@stdin, @stdout, @stderr, @wait_thr = Open3.popen3(command)
@output_thread = start_output_reader
end
def send_data(data)
@stdin.puts data
@stdin.flush
end
def close_input
@stdin.close
end
def wait_completion
@output_thread.join
@wait_thr.value
end
private
def start_output_reader
Thread.new do
while line = @stdout.gets
yield line.chomp if block_given?
end
rescue IOError
# Output stream closed
ensure
@stdout.close
end
end
end
# Process streaming data through external command
processor = StreamProcessor.new("grep ERROR") do |error_line|
puts "Found error: #{error_line}"
end
# Send log lines to the grep process
log_lines = [
"INFO: Application started",
"ERROR: Database connection failed",
"WARN: Retrying connection",
"ERROR: Authentication timeout"
]
log_lines.each { |line| processor.send_data(line) }
processor.close_input
processor.wait_completion
Named pipes (FIFOs) enable communication between unrelated processes through filesystem-based channels. Ruby handles named pipe creation and access through standard file operations.
require 'pathname'
# Create named pipe for cross-process communication
fifo_path = "/tmp/ruby_communication_pipe"
system("mkfifo #{fifo_path}")
# Producer process
producer_pid = fork do
File.open(fifo_path, "w") do |pipe|
20.times do |i|
data = { timestamp: Time.now, counter: i, message: "Update #{i}" }
pipe.puts JSON.dump(data)
sleep 0.5
end
end
end
# Consumer process
consumer_pid = fork do
File.open(fifo_path, "r") do |pipe|
while line = pipe.gets
data = JSON.parse(line.chomp)
puts "Received: #{data['message']} at #{data['timestamp']}"
end
end
end
# Wait for both processes
Process.wait(producer_pid)
Process.wait(consumer_pid)
# Cleanup
File.unlink(fifo_path)
Error Handling & Debugging
Process communication introduces several error conditions: process crashes, broken pipes, signal delivery failures, and resource exhaustion. Ruby provides mechanisms to detect and handle these conditions gracefully.
Process exit status monitoring prevents silent failures in child processes. The Process::Status
object contains detailed information about how a process terminated.
require 'open3'
def safe_process_execution(command, input_data = nil)
begin
stdin, stdout, stderr, wait_thr = Open3.popen3(command)
if input_data
stdin.write(input_data)
stdin.close
end
output = stdout.read
errors = stderr.read
status = wait_thr.value
stdout.close
stderr.close
unless status.success?
raise ProcessExecutionError.new(
"Command failed: #{command}",
status.exitstatus,
errors
)
end
output
rescue Errno::ENOENT
raise ProcessExecutionError.new("Command not found: #{command}")
rescue Errno::EPIPE
raise ProcessCommunicationError.new("Broken pipe during execution")
rescue IOError => e
raise ProcessCommunicationError.new("IO error: #{e.message}")
end
end
class ProcessExecutionError < StandardError
attr_reader :exit_code, :stderr_output
def initialize(message, exit_code = nil, stderr_output = nil)
super(message)
@exit_code = exit_code
@stderr_output = stderr_output
end
end
class ProcessCommunicationError < StandardError; end
# Usage with error handling
begin
result = safe_process_execution("nonexistent_command")
rescue ProcessExecutionError => e
puts "Process error: #{e.message}"
puts "Exit code: #{e.exit_code}" if e.exit_code
puts "Stderr: #{e.stderr_output}" if e.stderr_output
end
Pipe error handling addresses common issues like broken pipes, blocked writes, and premature process termination. Ruby raises specific exceptions for different pipe failure modes.
class RobustPipeCommunicator
def initialize
@reader, @writer = IO.pipe
@child_pid = nil
end
def start_child_process
@child_pid = fork do
@writer.close # Close writer in child
begin
while data = @reader.gets
# Simulate processing that might fail
if data.include?("error")
raise "Simulated processing error"
end
puts "Child processed: #{data.chomp}"
end
rescue => e
warn "Child process error: #{e.message}"
exit(1)
ensure
@reader.close
end
end
@reader.close # Close reader in parent
end
def send_data(data)
begin
@writer.puts(data)
@writer.flush
rescue Errno::EPIPE
# Child process terminated unexpectedly
check_child_status
raise ProcessCommunicationError.new("Child process terminated")
rescue IOError => e
raise ProcessCommunicationError.new("Write error: #{e.message}")
end
end
def close_and_wait
begin
@writer.close
rescue IOError
# Writer already closed
end
if @child_pid
pid, status = Process.wait2(@child_pid)
unless status.success?
raise ProcessExecutionError.new(
"Child process failed",
status.exitstatus
)
end
end
end
private
def check_child_status
return unless @child_pid
begin
pid, status = Process.waitpid2(@child_pid, Process::WNOHANG)
if pid # Child has exited
@child_pid = nil
return status
end
rescue Errno::ECHILD
# Child already reaped
@child_pid = nil
end
nil
end
end
# Usage with comprehensive error handling
communicator = RobustPipeCommunicator.new
communicator.start_child_process
begin
communicator.send_data("normal data\n")
communicator.send_data("error data\n") # This will cause child to fail
communicator.send_data("more data\n") # This might raise EPIPE
rescue ProcessCommunicationError => e
puts "Communication error: #{e.message}"
ensure
begin
communicator.close_and_wait
rescue ProcessExecutionError => e
puts "Child process failed: #{e.message}"
end
end
Debugging process communication issues requires visibility into data flow, process states, and timing relationships. Ruby provides several approaches for instrumenting communication channels.
class DebuggableProcessCommunicator
def initialize(debug: false)
@debug = debug
@log_file = debug ? File.open("/tmp/process_debug.log", "w") : nil
end
def execute_with_logging(command, input_data = nil)
log("Starting command: #{command}")
begin
stdin, stdout, stderr, wait_thr = Open3.popen3(command)
if input_data
log("Sending input: #{input_data.bytesize} bytes")
stdin.write(input_data)
stdin.close
end
# Read with timeout to avoid hanging
output = read_with_timeout(stdout, 30)
errors = read_with_timeout(stderr, 5)
status = wait_thr.value
log("Process completed: exit=#{status.exitstatus}")
{ output: output, errors: errors, status: status }
rescue => e
log("Exception: #{e.class}: #{e.message}")
raise
ensure
[stdin, stdout, stderr].each do |io|
io.close rescue IOError
end
end
end
private
def read_with_timeout(io, timeout)
if IO.select([io], nil, nil, timeout)
io.read
else
log("Read timeout after #{timeout} seconds")
""
end
end
def log(message)
return unless @debug
timestamp = Time.now.strftime("%Y-%m-%d %H:%M:%S.%3N")
log_entry = "[#{timestamp}] #{message}\n"
@log_file.write(log_entry)
@log_file.flush
warn(log_entry.chomp)
end
end
# Usage for debugging communication issues
debugger = DebuggableProcessCommunicator.new(debug: true)
result = debugger.execute_with_logging("sort", "zebra\napple\nbanana\n")
puts "Output: #{result[:output]}"
Production Patterns
Production environments require robust process communication patterns that handle failures gracefully, provide monitoring capabilities, and scale across multiple servers. Ruby applications commonly implement these patterns for background job processing, microservice communication, and external system integration.
Background job processing systems use process communication for task distribution and result collection. This pattern isolates CPU-intensive or potentially unstable work from the main application process.
require 'json'
require 'socket'
class ProductionJobProcessor
def initialize(worker_count: 4, queue_path: "/tmp/job_queue")
@worker_count = worker_count
@queue_path = queue_path
@workers = []
@running = false
@stats = {
jobs_processed: 0,
jobs_failed: 0,
workers_restarted: 0
}
end
def start
@running = true
setup_signal_handlers
start_workers
monitor_workers
end
def stop
@running = false
@workers.each { |w| Process.kill("TERM", w[:pid]) rescue nil }
@workers.each { |w| Process.wait(w[:pid]) rescue nil }
end
def enqueue_job(job_data)
UNIXSocket.open(@queue_path) do |socket|
socket.write(JSON.dump(job_data) + "\n")
end
rescue Errno::ECONNREFUSED
# Queue server not running
false
end
def stats
@stats.dup
end
private
def start_workers
@worker_count.times do |worker_id|
spawn_worker(worker_id)
end
end
def spawn_worker(worker_id)
reader, writer = IO.pipe
pid = fork do
writer.close
worker_loop(worker_id, reader)
end
reader.close
@workers[worker_id] = {
pid: pid,
control_pipe: writer,
started_at: Time.now,
jobs_processed: 0
}
end
def worker_loop(worker_id, control_pipe)
Signal.trap("TERM") { exit(0) }
UNIXServer.open("#{@queue_path}.worker.#{worker_id}") do |server|
while @running
begin
client = server.accept_nonblock
job_json = client.gets
next unless job_json
job_data = JSON.parse(job_json.chomp)
result = process_job(job_data)
client.write(JSON.dump(result) + "\n")
control_pipe.puts("job_completed")
rescue IO::WaitReadable
sleep(0.1)
rescue => e
control_pipe.puts("job_failed:#{e.message}")
ensure
client&.close
end
end
end
rescue => e
warn "Worker #{worker_id} crashed: #{e.message}"
exit(1)
end
def process_job(job_data)
# Simulate job processing
case job_data['type']
when 'email'
send_email(job_data['recipient'], job_data['subject'])
when 'image_resize'
resize_image(job_data['path'], job_data['dimensions'])
when 'data_export'
export_data(job_data['query'], job_data['format'])
else
raise "Unknown job type: #{job_data['type']}"
end
end
def monitor_workers
Thread.new do
while @running
@workers.each_with_index do |worker, index|
next unless worker
begin
pid, status = Process.waitpid2(worker[:pid], Process::WNOHANG)
if pid # Worker exited
if status.success?
@stats[:jobs_processed] += worker[:jobs_processed]
else
@stats[:jobs_failed] += 1
end
@stats[:workers_restarted] += 1
spawn_worker(index) # Restart worker
end
rescue Errno::ECHILD
# Worker already reaped, restart it
spawn_worker(index)
end
end
sleep(1)
end
end
end
def setup_signal_handlers
Signal.trap("INT") { stop }
Signal.trap("TERM") { stop }
Signal.trap("USR1") do
puts "Current stats: #{stats}"
end
end
end
# Production usage with monitoring
processor = ProductionJobProcessor.new(worker_count: 8)
# Start in background thread
processing_thread = Thread.new { processor.start }
# Enqueue jobs
processor.enqueue_job({
type: 'email',
recipient: 'user@example.com',
subject: 'Welcome'
})
# Check stats periodically
Thread.new do
loop do
puts processor.stats
sleep(30)
end
end
Microservice communication patterns handle service-to-service communication in distributed systems. This approach uses process spawning for service isolation and standard communication protocols.
require 'socket'
require 'json'
class MicroserviceGateway
def initialize(services_config)
@services = {}
@health_check_interval = 30
services_config.each { |name, config| register_service(name, config) }
start_health_monitoring
end
def call_service(service_name, method, params = {})
service = @services[service_name]
raise "Service not found: #{service_name}" unless service
raise "Service unhealthy: #{service_name}" unless service[:healthy]
request = {
method: method,
params: params,
request_id: generate_request_id
}
begin
UNIXSocket.open(service[:socket_path]) do |socket|
socket.write(JSON.dump(request) + "\n")
response_json = socket.gets
response = JSON.parse(response_json.chomp)
if response['error']
raise ServiceError.new(response['error'])
else
response['result']
end
end
rescue Errno::ECONNREFUSED, Errno::ENOENT
service[:healthy] = false
restart_service(service_name)
raise ServiceUnavailableError.new("Service unavailable: #{service_name}")
end
end
private
def register_service(name, config)
socket_path = "/tmp/service_#{name}.sock"
pid = fork do
start_service_process(name, config, socket_path)
end
@services[name] = {
pid: pid,
socket_path: socket_path,
config: config,
healthy: true,
last_health_check: Time.now
}
# Give service time to start
sleep(1)
end
def start_service_process(name, config, socket_path)
File.unlink(socket_path) if File.exist?(socket_path)
UNIXServer.open(socket_path) do |server|
service_instance = create_service_instance(config)
while true
client = server.accept
request_json = client.gets
next unless request_json
begin
request = JSON.parse(request_json.chomp)
result = service_instance.send(request['method'], request['params'])
response = { result: result }
rescue => e
response = { error: e.message }
end
client.write(JSON.dump(response) + "\n")
client.close
end
end
rescue => e
warn "Service #{name} crashed: #{e.message}"
exit(1)
end
def create_service_instance(config)
case config['type']
when 'calculator'
CalculatorService.new
when 'database'
DatabaseService.new(config['connection_string'])
else
raise "Unknown service type: #{config['type']}"
end
end
def start_health_monitoring
Thread.new do
loop do
@services.each { |name, service| check_service_health(name) }
sleep(@health_check_interval)
end
end
end
def check_service_health(service_name)
begin
call_service(service_name, 'health_check')
@services[service_name][:healthy] = true
@services[service_name][:last_health_check] = Time.now
rescue => e
@services[service_name][:healthy] = false
end
end
def restart_service(service_name)
service = @services[service_name]
Process.kill("TERM", service[:pid]) rescue nil
Process.wait(service[:pid]) rescue nil
register_service(service_name, service[:config])
end
def generate_request_id
"#{Time.now.to_f}-#{rand(10000)}"
end
end
class ServiceError < StandardError; end
class ServiceUnavailableError < StandardError; end
# Example service implementations
class CalculatorService
def add(params)
params['a'] + params['b']
end
def multiply(params)
params['a'] * params['b']
end
def health_check(params = {})
'ok'
end
end
# Usage in production environment
gateway = MicroserviceGateway.new({
'calculator' => { 'type' => 'calculator' },
'database' => { 'type' => 'database', 'connection_string' => 'postgres://...' }
})
result = gateway.call_service('calculator', 'add', { 'a' => 5, 'b' => 3 })
puts result # => 8
Reference
Process Class Methods
Method | Parameters | Returns | Description |
---|---|---|---|
Process.spawn(command, **options) |
command (String), options (Hash) |
Integer |
Spawns new process, returns PID |
Process.fork(&block) |
block (optional) | Integer or nil |
Creates child process, returns PID in parent, nil in child |
Process.wait(pid = -1) |
pid (Integer) |
Integer |
Waits for process completion, returns PID |
Process.wait2(pid = -1) |
pid (Integer) |
Array |
Returns [PID, Process::Status] |
Process.waitpid(pid, flags = 0) |
pid (Integer), flags (Integer) |
Integer |
Waits for specific process |
Process.kill(signal, *pids) |
signal (String/Integer), pids (Integer) |
Integer |
Sends signal to processes |
Process.daemon(nochdir = nil, noclose = nil) |
nochdir (Boolean), noclose (Boolean) |
Integer |
Daemonizes current process |
IO Pipe Methods
Method | Parameters | Returns | Description |
---|---|---|---|
IO.pipe(encoding = nil) |
encoding (String/Encoding) |
Array |
Creates pipe pair [reader, writer] |
IO.popen(command, mode = "r") |
command (String), mode (String) |
IO |
Opens pipe to subprocess |
#close_read |
None | nil |
Closes read end of pipe |
#close_write |
None | nil |
Closes write end of pipe |
#closed? |
None | Boolean |
Checks if pipe is closed |
Open3 Module Methods
Method | Parameters | Returns | Description |
---|---|---|---|
Open3.popen3(command, **opts) |
command (String), options (Hash) |
Array |
Returns [stdin, stdout, stderr, wait_thr] |
Open3.popen2(command, **opts) |
command (String), options (Hash) |
Array |
Returns [stdin, stdout, wait_thr] |
Open3.popen2e(command, **opts) |
command (String), options (Hash) |
Array |
Returns [stdin, stdout_stderr, wait_thr] |
Open3.capture2(command, **opts) |
command (String), options (Hash) |
Array |
Returns [stdout, Process::Status] |
Open3.capture3(command, **opts) |
command (String), options (Hash) |
Array |
Returns [stdout, stderr, Process::Status] |
Open3.pipeline(*commands) |
commands (Array) | Array |
Chains commands with pipes |
Process Spawn Options
Option | Type | Description |
---|---|---|
:in |
IO , String , Array |
Input redirection |
:out |
IO , String , Array |
Output redirection |
:err |
IO , String , Array |
Error redirection |
:chdir |
String |
Working directory |
:umask |
Integer |
File creation mask |
:pgroup |
true , Integer |
Process group |
:env |
Hash |
Environment variables |
:rlimit_* |
Integer , Array |
Resource limits |
:unsetenv_others |
Boolean |
Clear inherited environment |
:close_others |
Boolean |
Close non-standard file descriptors |
Signal Names
Signal | Value | Description |
---|---|---|
"TERM" |
15 | Termination request |
"KILL" |
9 | Force termination |
"INT" |
2 | Interrupt (Ctrl+C) |
"HUP" |
1 | Hangup |
"USR1" |
10 | User-defined signal 1 |
"USR2" |
12 | User-defined signal 2 |
"CHLD" |
17 | Child status changed |
"STOP" |
19 | Stop process |
"CONT" |
18 | Continue process |
Process::Status Methods
Method | Parameters | Returns | Description |
---|---|---|---|
#success? |
None | Boolean |
Process exited successfully |
#exitstatus |
None | Integer or nil |
Exit status code |
#signaled? |
None | Boolean |
Terminated by signal |
#termsig |
None | Integer or nil |
Terminating signal number |
#stopped? |
None | Boolean |
Process stopped |
#stopsig |
None | Integer or nil |
Stop signal number |
#coredump? |
None | Boolean |
Core dump generated |
Common Error Classes
Error | Description |
---|---|
Errno::ECHILD |
No child processes |
Errno::EPIPE |
Broken pipe |
Errno::ENOENT |
Command not found |
Errno::ECONNREFUSED |
Connection refused |
IOError |
Input/output error |
SystemCallError |
System call failed |
Process Wait Flags
Flag | Value | Description |
---|---|---|
Process::WNOHANG |
1 | Non-blocking wait |
Process::WUNTRACED |
2 | Report stopped children |
Process::WCONTINUED |
8 | Report continued children |