CrackedRuby

Process Creation and Management

Overview

Ruby provides multiple approaches for creating and managing external processes, ranging from simple command execution to complex subprocess communication. The primary interface centers around the Process module and related kernel methods like spawn, system, exec, and backtick operators.

Process creation in Ruby operates through several distinct methods, each optimized for different use cases. The spawn method offers the most control, returning a process ID for later management. The system method blocks until completion and returns success status. The exec method replaces the current process entirely. Backticks and %x{} capture command output directly.

# Basic process creation methods
pid = spawn("sleep 5")          # Non-blocking, returns PID
system("echo hello")            # Blocking, returns true/false
exec("ruby script.rb")          # Replaces current process
output = `date`                 # Captures output

The Process module provides process management capabilities including waiting for child processes, sending signals, managing process groups, and handling process termination. Ruby does not reap child processes automatically: a child that has exited remains a zombie until the parent calls Process.wait or hands it to Process.detach, so explicit management is what prevents zombie processes and resource leaks.

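# Process management and control
pid = spawn("long_running_task.rb")
Process.kill("TERM", pid)       # Send termination signal
Process.wait(pid)               # Wait for completion and reap the child

# Alternatively, when the exit status does not matter:
# Process.detach(pid)           # Reaps in a background thread; do not also call Process.wait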
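A detail that is easy to miss when waiting on children: Process.wait returns the child's PID, and the Process::Status is made available through $? (or returned directly by Process.wait2). The sketch below illustrates this; the helper names are hypothetical, and RbConfig.ruby is used only as a convenient command that is known to exist.

```ruby
require "rbconfig"

# Runs a child Ruby that exits with the given code and returns
# [pid_from_spawn, pid_from_wait, status].
def wait_and_inspect(exit_code)
  pid = spawn(RbConfig.ruby, "-e", "exit #{exit_code}")
  waited_pid = Process.wait(pid) # Returns the PID, not the status
  [pid, waited_pid, $?]          # $? holds the Process::Status
end

# Same idea with Process.wait2, which returns [pid, status] directly.
def wait2_status(exit_code)
  pid = spawn(RbConfig.ruby, "-e", "exit #{exit_code}")
  _, status = Process.wait2(pid)
  status
end

pid, waited_pid, status = wait_and_inspect(3)
puts "wait returned #{waited_pid} (spawned #{pid}), exit status #{status.exitstatus}"
puts "wait2 success? #{wait2_status(0).success?}"
```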

Ruby's process management integrates with the underlying operating system's process model, providing access to process IDs, parent-child relationships, process groups, and session management. The implementation handles cross-platform differences between Unix-like systems and Windows, abstracting common operations while exposing platform-specific functionality when needed.
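To make the parent-child and process-group relationships concrete, the following sketch asks a child process to report its own identifiers. It assumes a Unix-like system; child_identity is a hypothetical helper, and the pgroup: true option is the standard spawn option for starting a new process group.

```ruby
require "rbconfig"

# Spawns a child that prints its own PID, parent PID, and process group,
# and returns those integers as reported by the child.
def child_identity(**spawn_options)
  reader, writer = IO.pipe
  code = 'print [Process.pid, Process.ppid, Process.getpgrp].join(" ")'
  pid = spawn(RbConfig.ruby, "-e", code, out: writer, **spawn_options)
  writer.close # Parent keeps only the read end
  child_pid, child_ppid, child_pgrp = reader.read.split.map(&:to_i)
  reader.close
  Process.wait(pid)
  { pid: child_pid, ppid: child_ppid, pgrp: child_pgrp }
end

same_group = child_identity
new_group  = child_identity(pgroup: true)

puts "parent pid=#{Process.pid} pgrp=#{Process.getpgrp}"
puts "child (inherited group): #{same_group}"
puts "child (own group):       #{new_group}"
```

With pgroup: true the child becomes the leader of a new group, so its process group ID equals its own PID.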

Basic Usage

Process creation begins with selecting the appropriate method based on whether you need output capture, process control, or simple command execution. The most flexible approach uses spawn with options for fine-grained control over process environment, working directory, and input/output redirection.

# Spawn with environment and options
pid = spawn(
  {"PATH" => "/usr/local/bin:/usr/bin"},
  "python3 script.py",
  chdir: "/tmp",
  out: "output.log",
  err: "error.log"
)

_, status = Process.wait2(pid)   # Process.wait alone returns only the PID
puts "Exit status: #{status.exitstatus}"

The system method handles simple cases where you need to execute a command and check whether it succeeded. Unlike spawn, system blocks until the process completes. It returns true for a zero exit status, false for a non-zero exit status, and nil when the command could not be executed at all, making it suitable for sequential operations.

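# Sequential command execution
# git diff --quiet exits 0 only when the working tree matches HEAD
# (git status exits 0 whether or not there are changes)
if system("git diff --quiet HEAD")
  puts "Repository is clean"
  system("git push origin main")
else
  puts "Repository has changes"
  exit 1
end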
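The three possible return values of system are worth distinguishing explicitly. The sketch below uses hypothetical helper names and a deliberately made-up command name; the exception: true option is available from Ruby 2.6 onward.

```ruby
require "rbconfig"

# Maps system's return value to a symbol:
# true (exit 0), false (non-zero exit), nil (command could not be run).
def describe_system_result(*command)
  case system(*command)
  when true  then :succeeded
  when false then :failed
  else            :not_executed
  end
end

# With exception: true, failures raise instead of returning false or nil.
def system_or_message(*command)
  system(*command, exception: true)
  "ok"
rescue SystemCallError, RuntimeError => e
  "#{e.class}: #{e.message}"
end

puts describe_system_result(RbConfig.ruby, "-e", "exit 0")
puts describe_system_result(RbConfig.ruby, "-e", "exit 2")
puts describe_system_result("no-such-command-for-this-example")
puts system_or_message("no-such-command-for-this-example")
```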

For capturing command output, backticks and %x{} provide convenient syntax, while the Open3 module offers more sophisticated input/output handling with separate streams for stdout and stderr.

# Output capture methods
current_branch = `git rev-parse --abbrev-ref HEAD`.strip
file_count = %x{find . -name "*.rb" | wc -l}.to_i

# Advanced I/O handling
require 'open3'
stdout, stderr, status = Open3.capture3("ruby -c script.rb")
puts "Syntax check: #{status.success? ? 'OK' : 'ERROR'}"
puts stderr unless stderr.empty?
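As a further illustration of Open3, the sketch below passes input through the stdin_data option of capture3 and merges the two output streams with capture2e. The helper names are hypothetical, and the child commands are small Ruby one-liners chosen only so the example is self-contained.

```ruby
require "open3"
require "rbconfig"

# Feeds text to a child Ruby and captures stdout and stderr separately.
def upcase_via_child(text)
  code = 'input = $stdin.read; $stderr.puts "bytes=#{input.bytesize}"; print input.upcase'
  stdout, stderr, status = Open3.capture3(RbConfig.ruby, "-e", code, stdin_data: text)
  { stdout: stdout, stderr: stderr.strip, ok: status.success? }
end

# capture2e returns stdout and stderr merged into a single string.
def merged_output(code)
  output, status = Open3.capture2e(RbConfig.ruby, "-e", code)
  [output, status.exitstatus]
end

p upcase_via_child("hello")
p merged_output('$stdout.puts "out"; $stdout.flush; $stderr.puts "err"')
```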

Process communication through pipes enables complex data processing workflows. Ruby provides IO.popen for bidirectional communication and Open3.popen3 for separate input, output, and error streams.

# Bidirectional process communication
IO.popen("sort", "r+") do |pipe|
  pipe.puts "zebra"
  pipe.puts "apple"
  pipe.puts "banana"
  pipe.close_write

  sorted_output = pipe.read
  puts sorted_output
end

# Multiple stream handling
Open3.popen3("wc -l") do |stdin, stdout, stderr, thread|
  stdin.puts "line one"
  stdin.puts "line two"
  stdin.close

  line_count = stdout.read.strip.to_i
  puts "Lines counted: #{line_count}"
end

Error Handling & Debugging

Process operations generate various types of errors requiring different handling strategies. Command execution failures manifest as non-zero exit status, while system-level errors like missing executables or permission issues raise exceptions.

# Comprehensive error handling
begin
  pid = spawn("nonexistent_command")
  _, status = Process.wait2(pid)  # wait2 returns [pid, status]

  unless status.success?
    puts "Command failed with exit code: #{status.exitstatus}"
    puts "Terminated by signal: #{status.termsig}" if status.signaled?
  end
rescue Errno::ENOENT => e
  puts "Command not found: #{e.message}"
rescue Errno::EACCES => e
  puts "Permission denied: #{e.message}"
rescue SystemCallError => e
  puts "System error: #{e.message}"
end
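Whether a missing executable raises an exception or shows up as an exit status depends on whether a shell is involved. The sketch below contrasts the two cases, assuming a POSIX /bin/sh; the helper names and the command name are made up for illustration.

```ruby
MISSING_COMMAND = "no-such-command-for-this-example"

# Without shell metacharacters the program is executed directly,
# so a missing executable raises Errno::ENOENT in the parent.
def spawn_direct(program, *args)
  pid = spawn(program, *args)
  _, status = Process.wait2(pid)
  "exit #{status.exitstatus}"
rescue Errno::ENOENT => e
  "raised #{e.class}"
end

# A command string with shell metacharacters is run through the shell,
# so the failure surfaces as the shell's "command not found" status 127.
def spawn_via_shell(command_line)
  pid = spawn(command_line, err: File::NULL)
  _, status = Process.wait2(pid)
  "exit #{status.exitstatus}"
end

puts spawn_direct(MISSING_COMMAND)
puts spawn_via_shell("#{MISSING_COMMAND} > /dev/null")
```

This is why rescuing Errno::ENOENT alone is not enough when commands are passed as shell strings: the exit status has to be checked as well.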

Process timeout handling prevents indefinite blocking when commands hang or run longer than expected. Ruby doesn't provide built-in timeout for process operations, requiring manual implementation with threads or the Timeout module.

# Process timeout implementation
require 'timeout'

def run_with_timeout(command, timeout_seconds)
  pid = spawn(command)

  begin
    Timeout.timeout(timeout_seconds) do
      Process.wait(pid)
    end
  rescue Timeout::Error
    Process.kill("KILL", pid)
    Process.wait(pid) rescue nil
    raise "Command timed out after #{timeout_seconds} seconds"
  end
end

# Usage with error recovery
begin
  run_with_timeout("slow_command.rb", 30)
rescue => e
  puts "Process execution failed: #{e.message}"
  # Implement fallback or retry logic
end
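An alternative to the Timeout module is to poll with Process::WNOHANG against a monotonic deadline. The following is a minimal sketch under that approach; wait_with_deadline and its poll_interval parameter are hypothetical names, not part of Ruby's API.

```ruby
require "rbconfig"

# Waits up to timeout_seconds for pid. Returns the Process::Status, or nil
# after killing and reaping the child if the deadline passes.
def wait_with_deadline(pid, timeout_seconds, poll_interval: 0.05)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds
  loop do
    # With WNOHANG, wait2 returns nil while the child is still running.
    result = Process.wait2(pid, Process::WNOHANG)
    return result.last if result
    break if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep(poll_interval)
  end
  Process.kill("KILL", pid)
  Process.wait(pid) # Reap the killed child so it does not linger as a zombie
  nil
end

fast = spawn(RbConfig.ruby, "-e", "exit 0")
puts "fast child finished: #{wait_with_deadline(fast, 5).inspect}"

slow = spawn(RbConfig.ruby, "-e", "sleep 30")
puts "slow child timed out: #{wait_with_deadline(slow, 0.2).nil?}"
```

Polling avoids raising an exception asynchronously into the waiting code, at the cost of up to one poll interval of extra latency.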

Signal handling complications arise when processes ignore termination signals or handle them in unexpected ways. Escalating from gentle termination to forced killing provides robust process cleanup.

# Robust process termination
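def terminate_process(pid)
  return unless process_running?(pid)

  # Try gentle termination first
  Process.kill("TERM", pid)

  # Wait briefly for graceful shutdown
  sleep(0.5)
  # Reap the child if it exited; WNOHANG returns nil while it is still running.
  # (kill(0) is not usable here: it still succeeds for an unreaped zombie.)
  return if Process.waitpid(pid, Process::WNOHANG)

  # Force termination if necessary
  Process.kill("KILL", pid)
  Process.wait(pid)
rescue Errno::ESRCH, Errno::ECHILD
  nil # Already gone, or already reaped elsewhere
end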

def process_running?(pid)
  Process.kill(0, pid)
  true
rescue Errno::ESRCH
  false
rescue Errno::EPERM
  true # Process exists but we can't signal it
end

Debugging process issues requires understanding the relationship between parent and child processes, signal propagation, and resource inheritance. Logging process lifecycle events and examining exit statuses provides insight into failure modes.

# Process debugging with detailed logging
class ProcessManager
  def initialize(logger)
    @logger = logger
  end

  def execute(command, **options)
    @logger.info("Executing: #{command}")
    @logger.debug("Options: #{options.inspect}")

    start_time = Time.now
    pid = spawn(command, **options)
    @logger.info("Process started: PID #{pid}")

    _, status = Process.wait2(pid)  # wait2 returns [pid, status]
    duration = Time.now - start_time

    @logger.info("Process completed: PID #{pid}, duration #{duration}s")

    if status.success?
      @logger.info("Success: exit code #{status.exitstatus}")
    else
      @logger.error("Failure: exit code #{status.exitstatus}")
      @logger.error("Signal: #{status.termsig}") if status.signaled?
    end

    status
  rescue => e
    @logger.error("Process error: #{e.class} - #{e.message}")
    raise
  end
end
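When examining exit statuses, note that exitstatus is nil for a process that was killed by a signal, so signaled? is best checked first. The sketch below shows one way to turn a Process::Status into a readable description; describe_status and run_and_describe are hypothetical helpers.

```ruby
require "rbconfig"

# Turns a Process::Status into a short human-readable description.
def describe_status(status)
  if status.signaled?
    "killed by SIG#{Signal.signame(status.termsig)}"
  elsif status.success?
    "succeeded"
  else
    "failed with exit code #{status.exitstatus}"
  end
end

# Runs a Ruby one-liner as a child process and describes how it ended.
def run_and_describe(code)
  pid = spawn(RbConfig.ruby, "-e", code)
  _, status = Process.wait2(pid)
  describe_status(status)
end

puts run_and_describe("exit 0")
puts run_and_describe("exit 4")
puts run_and_describe('Process.kill("KILL", Process.pid); sleep 5')
```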

Thread Safety & Concurrency

Process management in multithreaded Ruby applications requires careful coordination to prevent race conditions and resource conflicts. Multiple threads creating processes simultaneously can exhaust system resources or interfere with signal handling.

# Thread-safe process pool
require 'thread'

class ProcessPool
  def initialize(max_concurrent = 4)
    @max_concurrent = max_concurrent
    @slots_in_use = 0
    @active_processes = {}
    @mutex = Mutex.new
    @condition = ConditionVariable.new
  end

  # Returns a Thread whose #value is the child's Process::Status
  def execute(command, **options)
    acquire_slot # Blocks the caller until a slot is free

    Thread.new do
      pid = nil
      begin
        pid = spawn(command, **options)
        register_process(pid)
        _, status = Process.wait2(pid)
        status
      ensure
        release_slot(pid)
      end
    end
  end

  private

  # Check and reserve in one critical section so two callers
  # cannot both claim the last free slot
  def acquire_slot
    @mutex.synchronize do
      @condition.wait(@mutex) while @slots_in_use >= @max_concurrent
      @slots_in_use += 1
    end
  end

  def register_process(pid)
    @mutex.synchronize { @active_processes[pid] = Time.now }
  end

  def release_slot(pid)
    @mutex.synchronize do
      @active_processes.delete(pid) if pid
      @slots_in_use -= 1
      @condition.signal
    end
  end
end
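A simpler way to bound concurrency, sketched here as an alternative rather than a replacement, is a fixed set of worker threads pulling commands from a Queue. run_bounded is a hypothetical helper, commands are given as argv arrays, and Queue#close requires Ruby 2.3 or later.

```ruby
require "rbconfig"

# Runs each command (an argv array) with at most `workers` children alive
# at once. Returns the exit codes in the same order as the commands.
def run_bounded(commands, workers: 2)
  jobs = Queue.new
  commands.each_with_index { |cmd, index| jobs << [index, cmd] }
  jobs.close # pop returns nil once the closed queue is drained

  results = Array.new(commands.size)
  threads = Array.new(workers) do
    Thread.new do
      while (job = jobs.pop)
        index, cmd = job
        pid = spawn(*cmd)
        _, status = Process.wait2(pid)
        results[index] = status.exitstatus
      end
    end
  end

  threads.each(&:join)
  results
end

commands = [0, 1, 2, 3].map { |n| [RbConfig.ruby, "-e", "exit #{n}"] }
p run_bounded(commands, workers: 2)
```

Because each worker runs one child at a time, the number of workers is the concurrency limit and no condition variable is needed.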

Signal handling in multithreaded environments presents unique challenges since signals are delivered to the process, not individual threads. Ruby's signal handling mechanism interacts with thread scheduling, potentially causing unexpected behavior.

# Coordinated signal handling across threads
class ProcessCoordinator
  def initialize
    @processes = {}
    @shutdown = false
    @mutex = Mutex.new

    setup_signal_handlers
  end

  def spawn_managed(command, **options)
    return nil if @shutdown

    pid = spawn(command, **options)

    @mutex.synchronize do
      @processes[pid] = {
        command: command,
        started_at: Time.now,
        thread: Thread.current
      }
    end

    pid
  end

  def cleanup_all
    pids = nil

    @mutex.synchronize do
      pids = @processes.keys
      @shutdown = true
    end

    pids.each do |pid|
      terminate_process_gracefully(pid)
    end
  end

  private

  def setup_signal_handlers
    %w[INT TERM].each do |signal|
      Signal.trap(signal) do
        Thread.new { cleanup_all }
      end
    end
  end

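  def terminate_process_gracefully(pid)
    Process.kill("TERM", pid)

    # Escalate in a separate thread to avoid blocking the caller
    Thread.new do
      sleep(5)
      begin
        # WNOHANG returns nil while the child is still running
        unless Process.waitpid(pid, Process::WNOHANG)
          Process.kill("KILL", pid)
          Process.wait(pid) # Reap so the child does not remain a zombie
        end
      rescue Errno::ESRCH, Errno::ECHILD
        # Already exited or reaped elsewhere
      end

      @mutex.synchronize { @processes.delete(pid) }
    end
  rescue Errno::ESRCH
    # Process was already gone when TERM was sent
    @mutex.synchronize { @processes.delete(pid) }
  end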
end
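Another common way to move signal work out of the handler is the self-pipe pattern: the trap block writes a single byte to a pipe, and an ordinary thread reads it and reacts. The class below is a minimal sketch of that idea for a Unix-like system; SignalRelay and next_signal are hypothetical names.

```ruby
# Self-pipe pattern: the handler only writes a byte; a normal thread reacts.
class SignalRelay
  def initialize(*signal_names)
    @reader, @writer = IO.pipe
    @events = Queue.new

    signal_names.each do |name|
      Signal.trap(name) do |signo|
        # Trap context: do the bare minimum here (no Mutex, no Logger).
        @writer.write_nonblock([signo].pack("C"))
      end
    end

    # Ordinary thread: free to lock mutexes, log, or spawn processes.
    @thread = Thread.new do
      loop { @events << Signal.signame(@reader.readbyte) }
    end
  end

  # Blocks until a relayed signal name is available.
  def next_signal
    @events.pop
  end
end

relay = SignalRelay.new("USR1")
Process.kill("USR1", Process.pid)
puts "Handled outside trap context: #{relay.next_signal}"
```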

Concurrent process output handling prevents garbled output when multiple processes write simultaneously. Proper stream management and output buffering maintain data integrity.

# Thread-safe output collection
require 'open3'

class OutputCollector
  def initialize
    @outputs = {}
    @mutex = Mutex.new
  end

  def capture_output(command, identifier)
    Thread.new do
      Open3.popen3(command) do |stdin, stdout, stderr, thread|
        stdin.close

        output_thread = Thread.new do
          stdout.each_line do |line|
            store_output(identifier, :stdout, line)
          end
        end

        error_thread = Thread.new do
          stderr.each_line do |line|
            store_output(identifier, :stderr, line)
          end
        end

        output_thread.join
        error_thread.join
        thread.value
      end
    end
  end

  def get_output(identifier)
    @mutex.synchronize do
      @outputs[identifier]&.dup || { stdout: [], stderr: [] }
    end
  end

  private

  def store_output(identifier, stream, line)
    @mutex.synchronize do
      @outputs[identifier] ||= { stdout: [], stderr: [] }
      @outputs[identifier][stream] << line.chomp
    end
  end
end

Production Patterns

Production process management requires robust error recovery, resource monitoring, and graceful degradation when external commands fail. Process pools and queuing systems prevent resource exhaustion while maintaining system responsiveness.

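# Production-ready process manager
require 'singleton'
require 'logger'
require 'timeout'

# Error classes used by the manager
class ProcessError < StandardError; end
class ProcessTimeoutError < ProcessError; end
class ProcessLimitError < ProcessError; end

class ProductionProcessManager
  include Singleton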

  def initialize
    @process_count = 0
    @max_processes = ENV.fetch('MAX_PROCESSES', '10').to_i
    @process_timeout = ENV.fetch('PROCESS_TIMEOUT', '300').to_i
    @mutex = Mutex.new
    @logger = Logger.new('process_manager.log')

    setup_monitoring
  end

  def execute_command(command, **options)
    check_resource_limits

    @mutex.synchronize { @process_count += 1 }

    begin
      execute_with_monitoring(command, **options)
    ensure
      @mutex.synchronize { @process_count -= 1 }
    end
  end

  private

  def execute_with_monitoring(command, **options)
    start_time = Time.now
    @logger.info("Starting process: #{command}")

    begin
      pid = spawn(command, **options)
      _, status = Timeout.timeout(@process_timeout) { Process.wait2(pid) }

      duration = Time.now - start_time
      @logger.info("Process completed: #{command}, duration: #{duration}s, exit: #{status.exitstatus}")

      raise ProcessError, "Command failed: #{status.exitstatus}" unless status.success?
      status
    rescue Timeout::Error
      # The timeout only interrupts the wait; kill and reap the child explicitly
      Process.kill("KILL", pid) rescue nil
      Process.wait(pid) rescue nil
      @logger.error("Process timeout: #{command}")
      raise ProcessTimeoutError, "Process timed out after #{@process_timeout}s"
    rescue => e
      @logger.error("Process error: #{command}, error: #{e.message}")
      raise
    end
  end

  def check_resource_limits
    current_count = @mutex.synchronize { @process_count }

    if current_count >= @max_processes
      raise ProcessLimitError, "Maximum process limit reached: #{@max_processes}"
    end

    # Check system resources
    # Linux prints "load average:", macOS prints "load averages:"; no match yields 0.0
    load_average = `uptime`[/load averages?: ([\d.]+)/, 1].to_f
    if load_average > 5.0
      @logger.warn("High system load: #{load_average}")
    end
  end

  def setup_monitoring
    Thread.new do
      loop do
        sleep(60)
        log_system_status
      end
    end
  end

  def log_system_status
    current_count = @mutex.synchronize { @process_count }
    memory_usage = `ps -o pid,vsz,rss,comm -p #{Process.pid}`.lines.last

    @logger.info("System status - Active processes: #{current_count}, Memory: #{memory_usage}")
  end
end

Container and orchestration environments require special consideration for process lifecycle management, signal propagation, and resource constraints. Process managers must handle container shutdown signals and resource limits gracefully.

# Container-aware process management
class ContainerProcessManager
  def initialize
    @shutdown_requested = false
    @active_processes = []
    @mutex = Mutex.new

    setup_container_signal_handling
    setup_health_monitoring
  end

  def execute_with_health_check(command, **options)
    return false if @shutdown_requested

    pid = spawn(command, **options.merge(pgroup: true))

    @mutex.synchronize do
      @active_processes << pid
    end

    begin
      monitor_process_health(pid)
      _, status = Process.wait2(pid)  # wait2 returns [pid, status]
      status.success?
    ensure
      @mutex.synchronize do
        @active_processes.delete(pid)
      end
    end
  end

  def graceful_shutdown
    @shutdown_requested = true

    active_pids = @mutex.synchronize { @active_processes.dup }

    active_pids.each do |pid|
      # Send SIGTERM to process group
      Process.kill("-TERM", pid) rescue nil
    end

    # Wait for graceful shutdown
    sleep(10)

    # Force kill remaining processes
    active_pids.each do |pid|
      Process.kill("-KILL", pid) rescue nil
      Process.wait(pid) rescue nil
    end
  end

  private

  def setup_container_signal_handling
    %w[TERM INT].each do |signal|
      Signal.trap(signal) do
        Thread.new { graceful_shutdown }
      end
    end
  end

  def setup_health_monitoring
    Thread.new do
      loop do
        check_container_resources
        sleep(30)
      end
    end
  end

  def monitor_process_health(pid)
    Thread.new do
      loop do
        break unless process_running?(pid)

        # Check memory usage
        memory_info = `ps -p #{pid} -o rss=`.strip
        if memory_info.to_i > 1024 * 1024  # 1GB in KB
          puts "Warning: Process #{pid} using high memory: #{memory_info}KB"
        end

        sleep(10)
      end
    end
  end

  def check_container_resources
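    # Monitor container limits (cgroup v1 paths; cgroup v2 exposes
    # /sys/fs/cgroup/memory.current and /sys/fs/cgroup/memory.max instead)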
    memory_usage = File.read('/sys/fs/cgroup/memory/memory.usage_in_bytes').to_i rescue 0
    memory_limit = File.read('/sys/fs/cgroup/memory/memory.limit_in_bytes').to_i rescue 0

    if memory_limit > 0 && memory_usage > (memory_limit * 0.8)
      puts "Warning: Container memory usage at #{(memory_usage.to_f / memory_limit * 100).round(1)}%"
    end
  end
end
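The process-group signalling used in graceful_shutdown can be tried in isolation. The sketch below, which assumes a Unix-like system and uses hypothetical helper names, starts a child in its own group and terminates the group with a signal name prefixed by a minus sign.

```ruby
require "rbconfig"

# Starts a Ruby one-liner in its own process group and returns [pid, pgid].
def run_in_own_group(code)
  pid = spawn(RbConfig.ruby, "-e", code, pgroup: true)
  pgid = Process.getpgid(pid) # Equals pid because the child leads the group
  [pid, pgid]
end

# Sends TERM to every process in the group, then reaps the group leader.
def terminate_group(pgid, leader_pid)
  Process.kill("-TERM", pgid) # Leading "-" targets the whole process group
  _, status = Process.wait2(leader_pid)
  status
end

pid, pgid = run_in_own_group("sleep 60")
status = terminate_group(pgid, pid)
puts "leader #{pid} in group #{pgid} ended by signal #{status.termsig}"
```

Signalling the group reaches any grandchildren the command started, provided they have not moved themselves into a different group.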

Integration with web frameworks requires careful consideration of request-response cycles, background job processing, and resource cleanup. Process execution within web requests must handle timeouts and prevent resource leaks that could affect other requests.

# Web framework integration example
require 'securerandom'
require 'timeout'

class WebProcessController
  def initialize
    @background_jobs = Queue.new
    @job_processor = start_job_processor
  end

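  def execute_sync(command, timeout: 30)
    pid = spawn(command)
    begin
      _, status = Timeout.timeout(timeout) { Process.wait2(pid) }
      { success: status.success?, exit_code: status.exitstatus }
    rescue Timeout::Error
      # Kill and reap the child so it does not outlive the request
      Process.kill("KILL", pid) rescue nil
      Process.wait(pid) rescue nil
      { success: false, error: "Command timed out" }
    end
  rescue => e
    { success: false, error: e.message }
  end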

  def execute_async(command, callback: nil)
    job_id = SecureRandom.uuid

    @background_jobs << {
      id: job_id,
      command: command,
      callback: callback,
      created_at: Time.now
    }

    { job_id: job_id, status: :queued }
  end

  private

  def start_job_processor
    Thread.new do
      loop do
        job = @background_jobs.pop
        process_background_job(job)
      end
    end
  end

  def process_background_job(job)
    start_time = Time.now

    begin
      pid = spawn(job[:command])
      _, status = Process.wait2(pid)  # wait2 returns [pid, status]

      duration = Time.now - start_time
      result = {
        job_id: job[:id],
        success: status.success?,
        exit_code: status.exitstatus,
        duration: duration
      }

      job[:callback]&.call(result)
    rescue => e
      job[:callback]&.call({
        job_id: job[:id],
        success: false,
        error: e.message,
        duration: Time.now - start_time
      })
    end
  end
end

Common Pitfalls

Zombie process accumulation occurs when parent processes fail to wait for child process termination. Ruby's garbage collector does not reap children: a zombie persists until the parent waits for it or the parent itself exits. Explicit process management prevents process-table exhaustion and maintains system health.

# Zombie process prevention
require 'set'

class ProcessCleaner
  def initialize
    @child_pids = Set.new
    @mutex = Mutex.new

    setup_child_reaping
  end

  def spawn_managed(command, **options)
    pid = spawn(command, **options)

    @mutex.synchronize do
      @child_pids << pid
    end

    pid
  end

  private

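  def setup_child_reaping
    # Handle SIGCHLD to clean up zombies promptly. A Mutex cannot be locked
    # inside a trap handler (ThreadError), so hand the work to a thread.
    Signal.trap("CHLD") do
      Thread.new { reap_children }
    end

    # Periodic cleanup as backup
    Thread.new do
      loop do
        sleep(30)
        reap_children
      end
    end
  end

  def reap_children
    @mutex.synchronize do
      # Iterate over a copy because entries are removed along the way
      @child_pids.to_a.each do |pid|
        begin
          # With WNOHANG, waitpid2 returns nil while the child is still running
          wpid, _status = Process.waitpid2(pid, Process::WNOHANG)
          @child_pids.delete(pid) if wpid
        rescue Errno::ECHILD
          # Child already reaped
          @child_pids.delete(pid)
        end
      end
    end
  end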
end
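For fire-and-forget children, Process.detach is often the simplest way to avoid zombies: it starts a thread that waits for the child, and that thread's value is the child's Process::Status. The sketch below illustrates this; run_detached is a hypothetical helper.

```ruby
require "rbconfig"

# Spawns a Ruby one-liner and hands reaping to a Process.detach thread.
# Returns [pid, waiter_thread].
def run_detached(code)
  pid = spawn(RbConfig.ruby, "-e", code)
  waiter = Process.detach(pid) # Background thread that reaps the child
  [pid, waiter]
end

pid, waiter = run_detached("exit 7")
status = waiter.value # Joins the thread and returns the Process::Status
puts "child #{pid} exited with #{status.exitstatus}"

begin
  Process.wait(pid)
rescue Errno::ECHILD
  puts "child was already reaped by the detach thread"
end
```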

Signal handling complexity arises from timing issues, signal masking, and platform differences. Signals can be delivered at any time, interrupting normal program flow and potentially corrupting shared state.

# Robust signal handling with state protection
require 'securerandom'

class SignalSafeProcessor
  def initialize
    @processing = false
    @shutdown_requested = false
    @active_operations = {}
    @signal_queue = Queue.new

    setup_signal_handling
    start_signal_processor
  end

  def process_command(command)
    return false if @shutdown_requested

    operation_id = SecureRandom.uuid
    @active_operations[operation_id] = {
      command: command,
      started_at: Time.now,
      thread: Thread.current
    }

    begin
      execute_safely(command)
    ensure
      @active_operations.delete(operation_id)
    end
  end

  private

  def setup_signal_handling
    %w[INT TERM USR1 USR2].each do |signal|
      Signal.trap(signal) do |signo|
        # Don't process signals directly - queue them
        @signal_queue << { signal: signal, received_at: Time.now }
      end
    end
  end

  def start_signal_processor
    Thread.new do
      loop do
        signal_info = @signal_queue.pop
        handle_signal_safely(signal_info)
      end
    end
  end

  def handle_signal_safely(signal_info)
    case signal_info[:signal]
    when 'INT', 'TERM'
      puts "Shutdown signal received, finishing active operations..."
      @shutdown_requested = true

      # Wait for active operations to complete
      start_time = Time.now
      while !@active_operations.empty? && (Time.now - start_time) < 30
        sleep(0.1)
      end

      # Force terminate remaining operations
      @active_operations.each do |id, op|
        op[:thread].raise(Interrupt)
      end

      exit(0)
    when 'USR1'
      # Status signal
      puts "Active operations: #{@active_operations.size}"
    end
  end

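  def execute_safely(command)
    # Ruby exposes no sigprocmask API. The trap handlers above already defer
    # INT/TERM to the signal thread, so the parent is not interrupted here.
    # pgroup: true places the child in its own process group so that a
    # terminal Ctrl+C is not delivered to it directly.
    system(command, pgroup: true)
  end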
end
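The reason for queueing signals rather than handling them in place is that trap handlers run in a restricted context. As a small demonstration, the sketch below tries to lock a Mutex inside a handler and returns the resulting exception; mutex_error_in_trap is a hypothetical helper and USR1 is chosen arbitrarily.

```ruby
# Locks a Mutex inside a trap handler and returns what happened:
# the ThreadError that Ruby raises, or :locked if the lock succeeded.
def mutex_error_in_trap(signal = "USR1")
  mutex = Mutex.new
  outcome = nil

  previous = Signal.trap(signal) do
    begin
      mutex.synchronize { outcome = :locked }
    rescue ThreadError => e
      outcome = e
    end
  end

  Process.kill(signal, Process.pid)
  sleep(0.01) until outcome # The handler runs on the main thread
  outcome
ensure
  Signal.trap(signal, previous || "DEFAULT") # Restore the earlier handler
end

result = mutex_error_in_trap
puts "#{result.class}: #{result.message}" if result.is_a?(Exception)
```

The same restriction applies to anything that locks a Mutex internally, such as Logger, which is why handlers should only record that a signal arrived.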

Environment variable inheritance and modification can cause unexpected behavior when child processes receive different environments than expected. Explicit environment management prevents configuration conflicts.

# Safe environment handling
class EnvironmentManager
  def self.spawn_with_clean_env(command, custom_env = {})
    # Start with minimal environment
    clean_env = {
      'PATH' => ENV['PATH'],
      'HOME' => ENV['HOME'],
      'USER' => ENV['USER'],
      'LANG' => ENV['LANG'] || 'C'
    }

    # Add custom environment variables
    final_env = clean_env.merge(custom_env)

    # Validate environment values
    final_env.each do |key, value|
      unless value.nil? || value.is_a?(String)
        raise ArgumentError, "Environment variable #{key} must be string, got #{value.class}"
      end
    end

    # Without unsetenv_others the hash is merged into the inherited environment
    spawn(final_env, command, unsetenv_others: true)
  end

  def self.spawn_isolated(command, **options)
    # Complete environment isolation
    isolated_env = options.delete(:env) || {}

    # Ensure basic variables exist
    isolated_env['PATH'] ||= '/usr/bin:/bin'

    spawn(isolated_env, command, unsetenv_others: true, **options)
  end
end

# Usage with environment validation
begin
  pid = EnvironmentManager.spawn_with_clean_env(
    "python script.py",
    'PYTHONPATH' => '/opt/lib/python',
    'DEBUG' => 'true'
  )

  _, status = Process.wait2(pid)  # wait2 returns [pid, status]
  puts "Process completed with status: #{status.exitstatus}"
rescue ArgumentError => e
  puts "Environment error: #{e.message}"
end
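The difference between merging into the inherited environment and replacing it can be observed directly. The sketch below assumes /usr/bin/env exists, as it does on most Linux and macOS systems; child_env_names and the variable names are hypothetical.

```ruby
# Runs /usr/bin/env (assumed to exist) with the given environment hash
# and returns the variable names the child actually sees.
def child_env_names(env, **options)
  reader, writer = IO.pipe
  pid = spawn(env, "/usr/bin/env", out: writer, **options)
  writer.close # Parent keeps only the read end
  names = reader.read.lines.map { |line| line.split("=", 2).first }
  reader.close
  Process.wait(pid)
  names
end

ENV["PARENT_ONLY_EXAMPLE"] = "1"

merged   = child_env_names({ "EXTRA" => "x" })
isolated = child_env_names({ "EXTRA" => "x" }, unsetenv_others: true)

puts "merged sees parent variable:   #{merged.include?('PARENT_ONLY_EXAMPLE')}"
puts "isolated sees parent variable: #{isolated.include?('PARENT_ONLY_EXAMPLE')}"
```

By default the hash passed to spawn only adds to or overrides inherited variables; unsetenv_others: true is what clears everything else.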

File descriptor leakage occurs when spawned processes inherit open file descriptors from the parent process, potentially causing resource exhaustion or unexpected behavior. Explicit file descriptor management prevents these issues.

# File descriptor management
class SecureSpawner
  def self.spawn_secure(command, **options)
    # Close unnecessary file descriptors in child process
    options[:close_others] = true unless options.key?(:close_others)

    # Detach stdin from the parent's unless the caller redirected it
    options[:in] ||= File::NULL

    # Set process group to prevent signal propagation issues
    options[:pgroup] = true unless options.key?(:pgroup)

    pid = spawn(command, **options)

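    # Verify the child did not exit immediately. kill(0) still succeeds for an
    # unreaped zombie, so poll with WNOHANG, which returns the PID once the
    # child has exited and nil while it is still running.
    sleep(0.01)  # Brief pause to let process initialize
    if Process.waitpid(pid, Process::WNOHANG)
      raise "Process failed to start or died immediately (#{$?.inspect})"
    end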

    pid
  end

  def self.process_running?(pid)
    Process.kill(0, pid)
    true
  rescue Errno::ESRCH
    false
  rescue Errno::EPERM
    true  # Process exists but we can't signal it
  end

  # Safe file descriptor usage in process communication
  def self.communicate_safely(command)
    r, w = IO.pipe

    begin
      pid = spawn(command, out: w, close_others: true)
      w.close  # Close write end in parent

      output = r.read
      _, status = Process.wait2(pid)  # wait2 returns [pid, status]

      [output, status.success?]
    ensure
      r.close rescue nil
      w.close rescue nil
    end
  end
end

Reference

Process Creation Methods

Method | Parameters | Returns | Description
spawn(command, **opts) | command (String), options (Hash) | Integer (PID) | Creates new process, returns immediately
system(command, **opts) | command (String), options (Hash) | true, false, or nil | Blocks until completion; nil means the command could not be executed
exec(command, **opts) | command (String), options (Hash) | Does not return | Replaces current process
`command` (backticks) | command (String) | String | Captures and returns stdout
%x{command} | command (String) | String | Alternative syntax for capturing stdout
IO.popen(command, mode) | command (String), mode (String) | IO object | Creates pipe for bidirectional communication

Spawn Options

Option | Type | Description
:chdir | String | Working directory for child process
env (first positional argument, not an option) | Hash | Environment variables merged into the inherited environment
:in, :out, :err | String, IO, Array | Redirect standard streams
:close_others | Boolean | Close file descriptors other than stdin/stdout/stderr that are not explicitly redirected
:pgroup | Boolean, Integer | Process group ID or true for new group
:rlimit_* | Array | Resource limits (e.g., :rlimit_cpu => [60, 60])
:umask | Integer | File creation mask
:unsetenv_others | Boolean | Clear inherited environment variables, keeping only those passed in env

Process Management

Method | Parameters | Returns | Description
Process.wait(pid) | pid (Integer) | Integer (PID) | Waits for a specific child; the status is stored in $?
Process.wait2(pid) | pid (Integer) | Array | Returns [pid, status]
Process.waitall | None | Array | Waits for all child processes; returns [pid, status] pairs
Process.waitpid(pid, flags) | pid (Integer), flags (Integer) | Integer or nil | Same as Process.wait; with Process::WNOHANG returns nil if the child has not exited
Process.detach(pid) | pid (Integer) | Thread | Reaps the child in a background thread to prevent zombies
Process.kill(signal, pid) | signal (String/Integer), pid (Integer) | Integer | Sends signal; returns the number of processes signaled

Process Status

Method | Returns | Description
status.success? | Boolean or nil | True if exit status is 0; nil if the process did not exit normally
status.exitstatus | Integer or nil | Process exit status code; nil if terminated by a signal
status.signaled? | Boolean | True if terminated by signal
status.termsig | Integer or nil | Signal number that terminated process
status.stopped? | Boolean | True if process is stopped
status.stopsig | Integer or nil | Signal that stopped process
status.pid | Integer | Process ID

Signal Constants

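Signal | Value (Linux) | Description
"HUP" | 1 | Hangup
"INT" | 2 | Interrupt (Ctrl+C)
"QUIT" | 3 | Quit (Ctrl+\)
"KILL" | 9 | Kill (cannot be caught)
"TERM" | 15 | Terminate (default for the kill shell command)
"CONT" | 18 | Continue stopped process
"STOP" | 19 | Stop process (cannot be caught)
"USR1" | 10 | User-defined signal 1
"USR2" | 12 | User-defined signal 2

Numbers for CONT, STOP, USR1, and USR2 differ on macOS and BSD (19, 17, 30, and 31 respectively); Signal.list reports the values for the current platform.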
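Because several signal numbers are platform-specific, it is safer to look them up at runtime than to hard-code them. A short sketch using Signal.list and Signal.signame follows; signal_number is a hypothetical helper.

```ruby
# Looks up a signal number on the current platform by name.
def signal_number(name)
  Signal.list.fetch(name)
end

%w[HUP INT QUIT KILL TERM CONT STOP USR1 USR2].each do |name|
  puts format("%-5s %d", name, signal_number(name))
end

# Signal.signame performs the reverse lookup.
puts Signal.signame(signal_number("USR1"))
```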

Wait Flags

Flag | Value | Description
Process::WNOHANG | 1 | Return immediately if no child available
Process::WUNTRACED | 2 | Return for stopped children too

Process Information

Method | Returns | Description
Process.pid | Integer | Current process ID
Process.ppid | Integer | Parent process ID
Process.getpgrp | Integer | Process group ID
Process.getpriority(which, who) | Integer | Process priority
Process.setpriority(which, who, priority) | 0 | Set process priority
Process.uid | Integer | User ID
Process.gid | Integer | Group ID

Exception Classes

Exception | Description
Errno::ENOENT | Command or file not found
Errno::EACCES | Permission denied
Errno::E2BIG | Argument list too long
Errno::ENOEXEC | Not an executable file
Errno::ENOMEM | Out of memory
Errno::ECHILD | No child processes
Errno::ESRCH | No such process

Open3 Methods

Method | Parameters | Returns | Description
Open3.capture3(command) | command (String) | [String, String, Process::Status] | Capture stdout, stderr, status
Open3.popen3(command) | command (String) | Block yields stdin, stdout, stderr, wait thread | Separate I/O streams
Open3.pipeline(*commands) | commands (Array) | Array of Process::Status | Execute command pipeline and wait for it
Open3.pipeline_r(*commands) | commands (Array) | [IO, Array<Thread>] | Pipeline with readable end (stdout of the last command)
Open3.pipeline_w(*commands) | commands (Array) | [IO, Array<Thread>] | Pipeline with writable end (stdin of the first command)
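As an illustration of the pipeline methods, the sketch below connects two child processes with Open3.pipeline_r and reads the output of the last one. The commands are Ruby one-liners standing in for real tools, and sorted_unique_words is a hypothetical helper.

```ruby
require "open3"
require "rbconfig"

# Pipes a list of words through a second process that sorts and
# de-duplicates them. Returns [sorted_words, all_commands_succeeded].
def sorted_unique_words(words)
  producer = [RbConfig.ruby, "-e", "puts ARGV", *words]
  sorter   = [RbConfig.ruby, "-e", "puts $stdin.readlines.sort.uniq"]

  Open3.pipeline_r(producer, sorter) do |last_stdout, wait_threads|
    output = last_stdout.read.split("\n")
    statuses = wait_threads.map(&:value) # One Process::Status per command
    [output, statuses.all?(&:success?)]
  end
end

p sorted_unique_words(%w[pear apple pear fig])
```

Checking every status in wait_threads matters because a failure in an early stage of the pipeline does not necessarily stop the later stages.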