CrackedRuby logo

CrackedRuby

Process Communication

Process communication mechanisms in Ruby for inter-process coordination, data exchange, and external program interaction.

Core Built-in Classes Process and System
2.10.2

Overview

Ruby provides several mechanisms for processes to communicate with each other and with external programs. The core facilities center around the Process class for spawning and managing child processes, IO operations for reading and writing data streams, and the Open3 module for advanced process control with separate input, output, and error streams.

Process communication in Ruby operates through standard Unix concepts: pipes, file descriptors, signals, and process hierarchies. Ruby wraps these system-level primitives in object-oriented interfaces that handle platform differences and provide error handling.

The primary classes involved are Process for process lifecycle management, IO and its subclasses for data streams, and Open3 for complex process interaction patterns. Ruby also provides PTY for pseudo-terminal communication and various signal handling mechanisms.

# Basic process spawning and communication
pid = Process.spawn("echo 'Hello from child'")
Process.wait(pid)
# => 12345

# Pipe-based communication
reader, writer = IO.pipe
if fork
  writer.close
  puts reader.read
  reader.close
else
  reader.close
  writer.write "Data from child process"
  writer.close
end

Ruby handles process communication through several patterns: one-way data flow using pipes, bidirectional communication through paired pipes or sockets, and signal-based coordination for process control events.

Basic Usage

Process spawning forms the foundation of Ruby's process communication. The Process.spawn method creates new processes with configurable input/output redirection, environment variables, and working directories.

# Spawn with output capture
pid = Process.spawn("ls -la", out: "/tmp/listing.txt")
Process.wait(pid)
File.read("/tmp/listing.txt")  # Contains directory listing

Pipe communication allows data exchange between parent and child processes. Ruby's IO.pipe creates linked file descriptors where data written to one end appears at the other end.

# Parent-child communication via pipes
reader, writer = IO.pipe

child_pid = fork do
  reader.close  # Close unused end in child
  
  10.times do |i|
    writer.puts "Message #{i} from child"
    sleep 0.1
  end
  
  writer.close
end

writer.close  # Close unused end in parent

while line = reader.gets
  puts "Parent received: #{line.chomp}"
end

reader.close
Process.wait(child_pid)

The Open3 module provides standardized access to process input, output, and error streams simultaneously. This approach works well for programs requiring interactive communication or error stream monitoring.

require 'open3'

# Bidirectional communication with a subprocess
stdin, stdout, stderr, wait_thr = Open3.popen3("sort")

# Send data to the subprocess
["zebra", "apple", "banana"].each do |word|
  stdin.puts word
end
stdin.close

# Read sorted results
sorted_output = stdout.read.split("\n")
puts sorted_output  # => ["apple", "banana", "zebra"]

stdout.close
stderr.close
wait_thr.value  # Wait for process completion

Signal-based communication provides event notification between processes. Ruby's signal handling allows processes to respond to system events and custom notifications.

# Signal handling for process coordination
child_pid = fork do
  Signal.trap("USR1") do
    puts "Child received custom signal"
  end
  
  sleep 10  # Wait for signals
end

sleep 1  # Let child set up signal handler
Process.kill("USR1", child_pid)
Process.wait(child_pid)

Advanced Usage

Complex process communication scenarios often require multiple communication channels, process pools, or sophisticated data serialization. Ruby provides several patterns for these advanced requirements.

Process pools manage multiple worker processes sharing a common communication interface. This pattern distributes work across processes while maintaining coordination through the parent process.

require 'json'

class ProcessPool
  def initialize(worker_count = 4)
    @workers = []
    @work_queue = []
    
    worker_count.times do |i|
      reader, writer = IO.pipe
      child_reader, child_writer = IO.pipe
      
      pid = fork do
        # Child process: close parent ends
        writer.close
        child_reader.close
        
        # Worker loop
        while data = reader.gets
          task = JSON.parse(data.chomp)
          result = process_task(task)
          child_writer.puts(JSON.dump(result))
        end
        
        reader.close
        child_writer.close
      end
      
      # Parent: close child ends
      reader.close
      child_writer.close
      
      @workers << {
        pid: pid,
        input: writer,
        output: child_reader
      }
    end
  end
  
  def submit_work(task)
    worker = @workers.sample  # Simple round-robin would be better
    worker[:input].puts(JSON.dump(task))
    
    # Read result
    result_json = worker[:output].gets
    JSON.parse(result_json.chomp)
  end
  
  def shutdown
    @workers.each do |worker|
      worker[:input].close
      worker[:output].close
      Process.wait(worker[:pid])
    end
  end
  
  private
  
  def process_task(task)
    # Simulate work
    { result: task['input'] * 2, worker_pid: Process.pid }
  end
end

# Usage
pool = ProcessPool.new(2)
result = pool.submit_work({ 'input' => 21 })
puts result  # => {"result"=>42, "worker_pid"=>12346}
pool.shutdown

Bidirectional streaming communication handles continuous data exchange between processes. This pattern works for implementing custom protocols or streaming data processors.

require 'open3'

class StreamProcessor
  def initialize(command)
    @stdin, @stdout, @stderr, @wait_thr = Open3.popen3(command)
    @output_thread = start_output_reader
  end
  
  def send_data(data)
    @stdin.puts data
    @stdin.flush
  end
  
  def close_input
    @stdin.close
  end
  
  def wait_completion
    @output_thread.join
    @wait_thr.value
  end
  
  private
  
  def start_output_reader
    Thread.new do
      while line = @stdout.gets
        yield line.chomp if block_given?
      end
    rescue IOError
      # Output stream closed
    ensure
      @stdout.close
    end
  end
end

# Process streaming data through external command
processor = StreamProcessor.new("grep ERROR") do |error_line|
  puts "Found error: #{error_line}"
end

# Send log lines to the grep process
log_lines = [
  "INFO: Application started",
  "ERROR: Database connection failed", 
  "WARN: Retrying connection",
  "ERROR: Authentication timeout"
]

log_lines.each { |line| processor.send_data(line) }
processor.close_input
processor.wait_completion

Named pipes (FIFOs) enable communication between unrelated processes through filesystem-based channels. Ruby handles named pipe creation and access through standard file operations.

require 'pathname'

# Create named pipe for cross-process communication
fifo_path = "/tmp/ruby_communication_pipe"
system("mkfifo #{fifo_path}")

# Producer process
producer_pid = fork do
  File.open(fifo_path, "w") do |pipe|
    20.times do |i|
      data = { timestamp: Time.now, counter: i, message: "Update #{i}" }
      pipe.puts JSON.dump(data)
      sleep 0.5
    end
  end
end

# Consumer process  
consumer_pid = fork do
  File.open(fifo_path, "r") do |pipe|
    while line = pipe.gets
      data = JSON.parse(line.chomp)
      puts "Received: #{data['message']} at #{data['timestamp']}"
    end
  end
end

# Wait for both processes
Process.wait(producer_pid)
Process.wait(consumer_pid)

# Cleanup
File.unlink(fifo_path)

Error Handling & Debugging

Process communication introduces several error conditions: process crashes, broken pipes, signal delivery failures, and resource exhaustion. Ruby provides mechanisms to detect and handle these conditions gracefully.

Process exit status monitoring prevents silent failures in child processes. The Process::Status object contains detailed information about how a process terminated.

require 'open3'

def safe_process_execution(command, input_data = nil)
  begin
    stdin, stdout, stderr, wait_thr = Open3.popen3(command)
    
    if input_data
      stdin.write(input_data)
      stdin.close
    end
    
    output = stdout.read
    errors = stderr.read
    status = wait_thr.value
    
    stdout.close
    stderr.close
    
    unless status.success?
      raise ProcessExecutionError.new(
        "Command failed: #{command}", 
        status.exitstatus, 
        errors
      )
    end
    
    output
    
  rescue Errno::ENOENT
    raise ProcessExecutionError.new("Command not found: #{command}")
  rescue Errno::EPIPE
    raise ProcessCommunicationError.new("Broken pipe during execution")
  rescue IOError => e
    raise ProcessCommunicationError.new("IO error: #{e.message}")
  end
end

class ProcessExecutionError < StandardError
  attr_reader :exit_code, :stderr_output
  
  def initialize(message, exit_code = nil, stderr_output = nil)
    super(message)
    @exit_code = exit_code
    @stderr_output = stderr_output
  end
end

class ProcessCommunicationError < StandardError; end

# Usage with error handling
begin
  result = safe_process_execution("nonexistent_command")
rescue ProcessExecutionError => e
  puts "Process error: #{e.message}"
  puts "Exit code: #{e.exit_code}" if e.exit_code
  puts "Stderr: #{e.stderr_output}" if e.stderr_output
end

Pipe error handling addresses common issues like broken pipes, blocked writes, and premature process termination. Ruby raises specific exceptions for different pipe failure modes.

class RobustPipeCommunicator
  def initialize
    @reader, @writer = IO.pipe
    @child_pid = nil
  end
  
  def start_child_process
    @child_pid = fork do
      @writer.close  # Close writer in child
      
      begin
        while data = @reader.gets
          # Simulate processing that might fail
          if data.include?("error")
            raise "Simulated processing error"
          end
          
          puts "Child processed: #{data.chomp}"
        end
      rescue => e
        warn "Child process error: #{e.message}"
        exit(1)
      ensure
        @reader.close
      end
    end
    
    @reader.close  # Close reader in parent
  end
  
  def send_data(data)
    begin
      @writer.puts(data)
      @writer.flush
    rescue Errno::EPIPE
      # Child process terminated unexpectedly
      check_child_status
      raise ProcessCommunicationError.new("Child process terminated")
    rescue IOError => e
      raise ProcessCommunicationError.new("Write error: #{e.message}")
    end
  end
  
  def close_and_wait
    begin
      @writer.close
    rescue IOError
      # Writer already closed
    end
    
    if @child_pid
      pid, status = Process.wait2(@child_pid)
      unless status.success?
        raise ProcessExecutionError.new(
          "Child process failed", 
          status.exitstatus
        )
      end
    end
  end
  
  private
  
  def check_child_status
    return unless @child_pid
    
    begin
      pid, status = Process.waitpid2(@child_pid, Process::WNOHANG)
      if pid  # Child has exited
        @child_pid = nil
        return status
      end
    rescue Errno::ECHILD
      # Child already reaped
      @child_pid = nil
    end
    
    nil
  end
end

# Usage with comprehensive error handling
communicator = RobustPipeCommunicator.new
communicator.start_child_process

begin
  communicator.send_data("normal data\n")
  communicator.send_data("error data\n")  # This will cause child to fail
  communicator.send_data("more data\n")   # This might raise EPIPE
rescue ProcessCommunicationError => e
  puts "Communication error: #{e.message}"
ensure
  begin
    communicator.close_and_wait
  rescue ProcessExecutionError => e
    puts "Child process failed: #{e.message}"
  end
end

Debugging process communication issues requires visibility into data flow, process states, and timing relationships. Ruby provides several approaches for instrumenting communication channels.

class DebuggableProcessCommunicator
  def initialize(debug: false)
    @debug = debug
    @log_file = debug ? File.open("/tmp/process_debug.log", "w") : nil
  end
  
  def execute_with_logging(command, input_data = nil)
    log("Starting command: #{command}")
    
    begin
      stdin, stdout, stderr, wait_thr = Open3.popen3(command)
      
      if input_data
        log("Sending input: #{input_data.bytesize} bytes")
        stdin.write(input_data)
        stdin.close
      end
      
      # Read with timeout to avoid hanging
      output = read_with_timeout(stdout, 30)
      errors = read_with_timeout(stderr, 5)
      
      status = wait_thr.value
      log("Process completed: exit=#{status.exitstatus}")
      
      { output: output, errors: errors, status: status }
      
    rescue => e
      log("Exception: #{e.class}: #{e.message}")
      raise
    ensure
      [stdin, stdout, stderr].each do |io|
        io.close rescue IOError
      end
    end
  end
  
  private
  
  def read_with_timeout(io, timeout)
    if IO.select([io], nil, nil, timeout)
      io.read
    else
      log("Read timeout after #{timeout} seconds")
      ""
    end
  end
  
  def log(message)
    return unless @debug
    
    timestamp = Time.now.strftime("%Y-%m-%d %H:%M:%S.%3N")
    log_entry = "[#{timestamp}] #{message}\n"
    
    @log_file.write(log_entry)
    @log_file.flush
    warn(log_entry.chomp)
  end
end

# Usage for debugging communication issues
debugger = DebuggableProcessCommunicator.new(debug: true)
result = debugger.execute_with_logging("sort", "zebra\napple\nbanana\n")
puts "Output: #{result[:output]}"

Production Patterns

Production environments require robust process communication patterns that handle failures gracefully, provide monitoring capabilities, and scale across multiple servers. Ruby applications commonly implement these patterns for background job processing, microservice communication, and external system integration.

Background job processing systems use process communication for task distribution and result collection. This pattern isolates CPU-intensive or potentially unstable work from the main application process.

require 'json'
require 'socket'

class ProductionJobProcessor
  def initialize(worker_count: 4, queue_path: "/tmp/job_queue")
    @worker_count = worker_count
    @queue_path = queue_path
    @workers = []
    @running = false
    @stats = {
      jobs_processed: 0,
      jobs_failed: 0,
      workers_restarted: 0
    }
  end
  
  def start
    @running = true
    setup_signal_handlers
    start_workers
    monitor_workers
  end
  
  def stop
    @running = false
    @workers.each { |w| Process.kill("TERM", w[:pid]) rescue nil }
    @workers.each { |w| Process.wait(w[:pid]) rescue nil }
  end
  
  def enqueue_job(job_data)
    UNIXSocket.open(@queue_path) do |socket|
      socket.write(JSON.dump(job_data) + "\n")
    end
  rescue Errno::ECONNREFUSED
    # Queue server not running
    false
  end
  
  def stats
    @stats.dup
  end
  
  private
  
  def start_workers
    @worker_count.times do |worker_id|
      spawn_worker(worker_id)
    end
  end
  
  def spawn_worker(worker_id)
    reader, writer = IO.pipe
    
    pid = fork do
      writer.close
      worker_loop(worker_id, reader)
    end
    
    reader.close
    @workers[worker_id] = { 
      pid: pid, 
      control_pipe: writer,
      started_at: Time.now,
      jobs_processed: 0
    }
  end
  
  def worker_loop(worker_id, control_pipe)
    Signal.trap("TERM") { exit(0) }
    
    UNIXServer.open("#{@queue_path}.worker.#{worker_id}") do |server|
      while @running
        begin
          client = server.accept_nonblock
          job_json = client.gets
          next unless job_json
          
          job_data = JSON.parse(job_json.chomp)
          result = process_job(job_data)
          
          client.write(JSON.dump(result) + "\n")
          control_pipe.puts("job_completed")
          
        rescue IO::WaitReadable
          sleep(0.1)
        rescue => e
          control_pipe.puts("job_failed:#{e.message}")
        ensure
          client&.close
        end
      end
    end
  rescue => e
    warn "Worker #{worker_id} crashed: #{e.message}"
    exit(1)
  end
  
  def process_job(job_data)
    # Simulate job processing
    case job_data['type']
    when 'email'
      send_email(job_data['recipient'], job_data['subject'])
    when 'image_resize'
      resize_image(job_data['path'], job_data['dimensions'])
    when 'data_export'
      export_data(job_data['query'], job_data['format'])
    else
      raise "Unknown job type: #{job_data['type']}"
    end
  end
  
  def monitor_workers
    Thread.new do
      while @running
        @workers.each_with_index do |worker, index|
          next unless worker
          
          begin
            pid, status = Process.waitpid2(worker[:pid], Process::WNOHANG)
            if pid  # Worker exited
              if status.success?
                @stats[:jobs_processed] += worker[:jobs_processed]
              else
                @stats[:jobs_failed] += 1
              end
              
              @stats[:workers_restarted] += 1
              spawn_worker(index)  # Restart worker
            end
          rescue Errno::ECHILD
            # Worker already reaped, restart it
            spawn_worker(index)
          end
        end
        
        sleep(1)
      end
    end
  end
  
  def setup_signal_handlers
    Signal.trap("INT") { stop }
    Signal.trap("TERM") { stop }
    
    Signal.trap("USR1") do
      puts "Current stats: #{stats}"
    end
  end
end

# Production usage with monitoring
processor = ProductionJobProcessor.new(worker_count: 8)

# Start in background thread
processing_thread = Thread.new { processor.start }

# Enqueue jobs
processor.enqueue_job({
  type: 'email',
  recipient: 'user@example.com',
  subject: 'Welcome'
})

# Check stats periodically
Thread.new do
  loop do
    puts processor.stats
    sleep(30)
  end
end

Microservice communication patterns handle service-to-service communication in distributed systems. This approach uses process spawning for service isolation and standard communication protocols.

require 'socket'
require 'json'

class MicroserviceGateway
  def initialize(services_config)
    @services = {}
    @health_check_interval = 30
    services_config.each { |name, config| register_service(name, config) }
    start_health_monitoring
  end
  
  def call_service(service_name, method, params = {})
    service = @services[service_name]
    raise "Service not found: #{service_name}" unless service
    raise "Service unhealthy: #{service_name}" unless service[:healthy]
    
    request = {
      method: method,
      params: params,
      request_id: generate_request_id
    }
    
    begin
      UNIXSocket.open(service[:socket_path]) do |socket|
        socket.write(JSON.dump(request) + "\n")
        response_json = socket.gets
        
        response = JSON.parse(response_json.chomp)
        
        if response['error']
          raise ServiceError.new(response['error'])
        else
          response['result']
        end
      end
    rescue Errno::ECONNREFUSED, Errno::ENOENT
      service[:healthy] = false
      restart_service(service_name)
      raise ServiceUnavailableError.new("Service unavailable: #{service_name}")
    end
  end
  
  private
  
  def register_service(name, config)
    socket_path = "/tmp/service_#{name}.sock"
    
    pid = fork do
      start_service_process(name, config, socket_path)
    end
    
    @services[name] = {
      pid: pid,
      socket_path: socket_path,
      config: config,
      healthy: true,
      last_health_check: Time.now
    }
    
    # Give service time to start
    sleep(1)
  end
  
  def start_service_process(name, config, socket_path)
    File.unlink(socket_path) if File.exist?(socket_path)
    
    UNIXServer.open(socket_path) do |server|
      service_instance = create_service_instance(config)
      
      while true
        client = server.accept
        request_json = client.gets
        next unless request_json
        
        begin
          request = JSON.parse(request_json.chomp)
          result = service_instance.send(request['method'], request['params'])
          response = { result: result }
        rescue => e
          response = { error: e.message }
        end
        
        client.write(JSON.dump(response) + "\n")
        client.close
      end
    end
  rescue => e
    warn "Service #{name} crashed: #{e.message}"
    exit(1)
  end
  
  def create_service_instance(config)
    case config['type']
    when 'calculator'
      CalculatorService.new
    when 'database'
      DatabaseService.new(config['connection_string'])
    else
      raise "Unknown service type: #{config['type']}"
    end
  end
  
  def start_health_monitoring
    Thread.new do
      loop do
        @services.each { |name, service| check_service_health(name) }
        sleep(@health_check_interval)
      end
    end
  end
  
  def check_service_health(service_name)
    begin
      call_service(service_name, 'health_check')
      @services[service_name][:healthy] = true
      @services[service_name][:last_health_check] = Time.now
    rescue => e
      @services[service_name][:healthy] = false
    end
  end
  
  def restart_service(service_name)
    service = @services[service_name]
    Process.kill("TERM", service[:pid]) rescue nil
    Process.wait(service[:pid]) rescue nil
    
    register_service(service_name, service[:config])
  end
  
  def generate_request_id
    "#{Time.now.to_f}-#{rand(10000)}"
  end
end

class ServiceError < StandardError; end
class ServiceUnavailableError < StandardError; end

# Example service implementations
class CalculatorService
  def add(params)
    params['a'] + params['b']
  end
  
  def multiply(params)  
    params['a'] * params['b']
  end
  
  def health_check(params = {})
    'ok'
  end
end

# Usage in production environment
gateway = MicroserviceGateway.new({
  'calculator' => { 'type' => 'calculator' },
  'database' => { 'type' => 'database', 'connection_string' => 'postgres://...' }
})

result = gateway.call_service('calculator', 'add', { 'a' => 5, 'b' => 3 })
puts result  # => 8

Reference

Process Class Methods

Method Parameters Returns Description
Process.spawn(command, **options) command (String), options (Hash) Integer Spawns new process, returns PID
Process.fork(&block) block (optional) Integer or nil Creates child process, returns PID in parent, nil in child
Process.wait(pid = -1) pid (Integer) Integer Waits for process completion, returns PID
Process.wait2(pid = -1) pid (Integer) Array Returns [PID, Process::Status]
Process.waitpid(pid, flags = 0) pid (Integer), flags (Integer) Integer Waits for specific process
Process.kill(signal, *pids) signal (String/Integer), pids (Integer) Integer Sends signal to processes
Process.daemon(nochdir = nil, noclose = nil) nochdir (Boolean), noclose (Boolean) Integer Daemonizes current process

IO Pipe Methods

Method Parameters Returns Description
IO.pipe(encoding = nil) encoding (String/Encoding) Array Creates pipe pair [reader, writer]
IO.popen(command, mode = "r") command (String), mode (String) IO Opens pipe to subprocess
#close_read None nil Closes read end of pipe
#close_write None nil Closes write end of pipe
#closed? None Boolean Checks if pipe is closed

Open3 Module Methods

Method Parameters Returns Description
Open3.popen3(command, **opts) command (String), options (Hash) Array Returns [stdin, stdout, stderr, wait_thr]
Open3.popen2(command, **opts) command (String), options (Hash) Array Returns [stdin, stdout, wait_thr]
Open3.popen2e(command, **opts) command (String), options (Hash) Array Returns [stdin, stdout_stderr, wait_thr]
Open3.capture2(command, **opts) command (String), options (Hash) Array Returns [stdout, Process::Status]
Open3.capture3(command, **opts) command (String), options (Hash) Array Returns [stdout, stderr, Process::Status]
Open3.pipeline(*commands) commands (Array) Array Chains commands with pipes

Process Spawn Options

Option Type Description
:in IO, String, Array Input redirection
:out IO, String, Array Output redirection
:err IO, String, Array Error redirection
:chdir String Working directory
:umask Integer File creation mask
:pgroup true, Integer Process group
:env Hash Environment variables
:rlimit_* Integer, Array Resource limits
:unsetenv_others Boolean Clear inherited environment
:close_others Boolean Close non-standard file descriptors

Signal Names

Signal Value Description
"TERM" 15 Termination request
"KILL" 9 Force termination
"INT" 2 Interrupt (Ctrl+C)
"HUP" 1 Hangup
"USR1" 10 User-defined signal 1
"USR2" 12 User-defined signal 2
"CHLD" 17 Child status changed
"STOP" 19 Stop process
"CONT" 18 Continue process

Process::Status Methods

Method Parameters Returns Description
#success? None Boolean Process exited successfully
#exitstatus None Integer or nil Exit status code
#signaled? None Boolean Terminated by signal
#termsig None Integer or nil Terminating signal number
#stopped? None Boolean Process stopped
#stopsig None Integer or nil Stop signal number
#coredump? None Boolean Core dump generated

Common Error Classes

Error Description
Errno::ECHILD No child processes
Errno::EPIPE Broken pipe
Errno::ENOENT Command not found
Errno::ECONNREFUSED Connection refused
IOError Input/output error
SystemCallError System call failed

Process Wait Flags

Flag Value Description
Process::WNOHANG 1 Non-blocking wait
Process::WUNTRACED 2 Report stopped children
Process::WCONTINUED 8 Report continued children