Overview
Pipes provide inter-process communication (IPC) through a kernel-maintained buffer that connects the output of one process to the input of another. The operating system manages the buffer, handles synchronization, and enforces read/write ordering. Two distinct types exist: anonymous pipes, which connect related processes through inheritance, and named pipes (FIFOs), which exist as filesystem entries accessible to unrelated processes.
Anonymous pipes create a unidirectional data channel between a parent process and its children. When a process forks, both parent and child inherit the pipe's file descriptors, establishing a communication path. The kernel maintains the pipe buffer in memory, typically 4KB to 64KB depending on the operating system, and blocks readers when the buffer is empty or writers when the buffer is full.
Named pipes extend the pipe concept to unrelated processes by creating a special file in the filesystem. The file serves as a rendezvous point where multiple processes can open read or write ends. The operating system handles the connection and data transfer without storing data on disk—the "file" exists only as a reference point.
# Anonymous pipe connecting parent and child
reader, writer = IO.pipe
if fork
# Parent process
writer.close
data = reader.read
reader.close
puts "Received: #{data}"
else
# Child process
reader.close
writer.write("Message from child")
writer.close
end
The pipe abstraction appears in command-line shells as the vertical bar operator, connecting standard output of one program to standard input of another. This mechanism forms the foundation of Unix philosophy: small programs that perform specific tasks and communicate through text streams.
Key Principles
Pipes implement several fundamental principles that define their behavior and limitations. Understanding these principles clarifies when pipes provide appropriate solutions and when alternative IPC mechanisms better fit requirements.
Unidirectional Data Flow: Each anonymous pipe supports data movement in a single direction. One process writes to the write end while another reads from the read end. Bidirectional communication requires creating two pipes, reversing the read and write roles for each direction. This constraint simplifies kernel implementation and prevents deadlock scenarios where both processes simultaneously attempt incompatible operations.
Byte Stream Semantics: Pipes transmit raw bytes without preserving message boundaries. If a writer sends three separate writes of "ABC", "DEF", and "GHI", the reader might receive "ABCDEFGHI" in one read operation or split across multiple reads like "ABCD", "EF", and "GHI". Applications requiring message framing must implement their own delimiters or length prefixes.
Blocking Behavior: Read operations block when no data exists in the buffer until a writer produces data or closes the write end. Write operations block when the buffer fills until a reader consumes data or closes the read end. This automatic flow control prevents fast writers from overwhelming slow readers, though it introduces potential deadlock conditions if processes wait on each other incorrectly.
Automatic Cleanup: When all write ends close, readers receive end-of-file (EOF). When all read ends close, writers receive SIGPIPE signals or EPIPE errors. The kernel tracks reference counts for both ends and performs cleanup when the final reference disappears. This mechanism prevents resource leaks even if processes terminate unexpectedly.
Buffer Management: The kernel allocates a fixed-capacity buffer for each pipe. On Linux the default capacity is 64KB (16 pages of 4KB), and an unprivileged process can raise it with F_SETPIPE_SZ up to the limit in /proc/sys/fs/pipe-max-size, which defaults to 1MB. The buffer resides in kernel memory, not user space, so every read or write goes through a system call. Buffer size affects performance: larger buffers reduce context switches but consume more kernel memory.
# Demonstrating byte stream semantics - no message boundaries
reader, writer = IO.pipe
# Writer sends three separate messages
writer.write("First")
writer.write("Second")
writer.write("Third")
writer.close
# The reader sees one continuous stream, not three messages
data = reader.read(20) # Returns "FirstSecondThird": read(20) stops at EOF after 16 bytes
reader.close
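Because the stream carries no boundaries, the framing mentioned under Byte Stream Semantics has to come from the application. The following is a minimal sketch of length-prefixed framing; the helper names `write_frame` and `read_frame` and the 4-byte big-endian header are assumptions chosen for illustration, not part of any Ruby API.

```ruby
# Length-prefixed framing over a pipe (helper names are illustrative).
def write_frame(io, payload)
  # 4-byte big-endian length header followed by the payload bytes
  io.write([payload.bytesize].pack("N") + payload.b)
end

def read_frame(io)
  header = io.read(4) # read(n) blocks until n bytes arrive or EOF
  return nil if header.nil? || header.bytesize < 4

  io.read(header.unpack1("N"))
end

reader, writer = IO.pipe
reader.binmode
writer.binmode

%w[First Second Third].each { |msg| write_frame(writer, msg) }
writer.close

frames = []
while (frame = read_frame(reader))
  frames << frame
end
reader.close

puts frames.inspect
```

Each frame comes back as a separate string regardless of how the kernel grouped the underlying bytes. A newline delimiter works just as well for text protocols; the length prefix is the usual choice when payloads may contain arbitrary bytes.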
Named Pipe Semantics: Named pipes add filesystem-based discovery to pipe semantics. Creating a FIFO makes a special file that processes can open by path. Opening a named pipe for reading blocks until another process opens it for writing, and vice versa. This handshake ensures both ends exist before data transfer begins.
Capacity Limits: The PIPE_BUF constant defines the maximum atomic write size. POSIX requires it to be at least 512 bytes; Linux uses 4096. A write of PIPE_BUF bytes or fewer is atomic: the kernel places it in the pipe as one contiguous sequence, so concurrent writers cannot interleave within it. Writes larger than PIPE_BUF may be split, allowing data from other writers to land in between.
Synchronization: Pipes provide implicit synchronization through blocking. A reader waiting for data automatically suspends until data arrives. A writer blocked on a full buffer resumes when space becomes available. This eliminates the need for explicit locks or semaphores in simple producer-consumer scenarios.
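Both limits discussed in this section can be inspected from Ruby. The sketch below assumes a platform where the `etc` extension defines `Etc::PC_PIPE_BUF` (it guards for the case where it does not) and estimates the buffer capacity by filling the pipe with non-blocking writes until the kernel refuses more data.

```ruby
require 'etc'

reader, writer = IO.pipe

# fpathconf(_PC_PIPE_BUF): largest write guaranteed to be atomic on this pipe
pipe_buf = writer.pathconf(Etc::PC_PIPE_BUF) if defined?(Etc::PC_PIPE_BUF)

# Fill the pipe without blocking to estimate how much it holds
chunk = "x" * 1024
capacity = 0
begin
  loop { capacity += writer.write_nonblock(chunk) }
rescue IO::WaitWritable
  # The buffer is full; a blocking write would suspend here
end

puts "PIPE_BUF: #{pipe_buf.inspect} bytes"
puts "Measured capacity: #{capacity} bytes"

writer.close
reader.close
```

The measured figure depends on the operating system and on how the kernel accounts for partially filled pages, so treat it as an estimate rather than an exact constant.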
Implementation Approaches
Selecting between anonymous pipes and named pipes depends on process relationships, discovery requirements, and lifetime constraints. Each approach offers distinct advantages for specific communication patterns.
Anonymous Pipes with Fork: This approach creates a pipe before forking, giving both parent and child inherited file descriptors. The pattern works well for parent-child communication where the parent launches the child and manages its lifecycle. After forking, each process closes the unused end, establishing a unidirectional channel.
# Parent-to-child communication pattern
reader, writer = IO.pipe
pid = fork do
writer.close # Child closes write end
message = reader.read
reader.close
puts "Child received: #{message}"
exit
end
reader.close # Parent closes read end
writer.write("Configuration data")
writer.close
Process.wait(pid)
This approach limits communication to related processes. The pipe has no name or filesystem presence, preventing unrelated processes from joining the conversation. The pipe automatically disappears when both processes close their ends, requiring no cleanup code.
Anonymous Pipes with Exec: Combining pipes with exec replaces the child process image while keeping the pipe connected. Ruby marks the descriptors returned by IO.pipe close-on-exec, so the child first redirects a standard stream onto the pipe with reopen; standard input, output, and error survive the exec boundary. This pattern connects a Ruby script to external programs, capturing their output or feeding their input.
# Capturing output from external command
reader, writer = IO.pipe
pid = fork do
reader.close
STDOUT.reopen(writer)
writer.close
exec("ls", "-la")
end
writer.close
output = reader.read
reader.close
Process.wait(pid)
puts "Program output:\n#{output}"
Bidirectional Anonymous Pipes: Creating two pipes enables full-duplex communication between processes. One pipe carries parent-to-child messages while the other carries child-to-parent messages. Each process closes both unused ends, leaving two open descriptors: one for reading, one for writing.
# Full-duplex communication
parent_read, child_write = IO.pipe
child_read, parent_write = IO.pipe
pid = fork do
parent_read.close
parent_write.close
# Child receives request
request = child_read.read(100)
child_read.close
# Child sends response
child_write.write("Processed: #{request}")
child_write.close
exit
end
child_read.close
child_write.close
# Parent sends request
parent_write.write("Data to process")
parent_write.close
# Parent receives response
response = parent_read.read
parent_read.close
Process.wait(pid)
Named Pipes for Unrelated Processes: Named pipes solve the discovery problem by creating a filesystem entry that processes can open by path. This approach suits scenarios where processes start independently and lack parent-child relationships. The FIFO acts as a rendezvous point with a well-known location.
# Process A: Writer
fifo_path = "/tmp/myfifo"
File.mkfifo(fifo_path, 0644) unless File.exist?(fifo_path)
File.open(fifo_path, "w") do |fifo|
fifo.write("Message through named pipe")
end
# Process B: Reader (separate program)
fifo_path = "/tmp/myfifo"
File.open(fifo_path, "r") do |fifo|
message = fifo.read
puts "Received: #{message}"
end
File.unlink(fifo_path) # Clean up FIFO
Named pipes support multiple readers or multiple writers, though behavior becomes complex. Multiple readers compete for data, with each read consuming bytes that other readers cannot access. Multiple writers may interleave data unless each write is no larger than PIPE_BUF.
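The multiple-writer case can be sketched with an anonymous pipe, which follows the same atomicity rule. The example assumes each record is a short line sent with a single write call, well under PIPE_BUF; the record format is made up for illustration.

```ruby
reader, writer = IO.pipe

pids = 3.times.map do |n|
  fork do
    reader.close
    # One short line per write call, far below PIPE_BUF, so each record is atomic
    20.times { |i| writer.write("writer=#{n} seq=#{i}\n") }
    writer.close
    exit!(0) # leave immediately without running at_exit handlers
  end
end

writer.close # the parent keeps only the read end
lines = reader.readlines(chomp: true)
reader.close
pids.each { |pid| Process.wait(pid) }

intact = lines.all? { |line| line.match?(/\Awriter=\d seq=\d+\z/) }
puts "#{lines.size} records, all intact: #{intact}"
```

Records from different writers arrive in an unpredictable order, but no record is cut in half. Raising the record size above PIPE_BUF removes that guarantee.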
Pipeline Composition: Chaining multiple processes through pipes creates data processing pipelines. Each process reads from its input pipe, transforms the data, and writes to its output pipe. The pattern scales to arbitrary numbers of stages, with the kernel managing buffers between each pair.
# Three-stage pipeline: generator -> transformer -> consumer
r1, w1 = IO.pipe
r2, w2 = IO.pipe
# Stage 1: Generator
pid1 = fork do
r1.close
r2.close
w2.close
10.times { |i| w1.puts(i) }
w1.close
end
# Stage 2: Transformer
pid2 = fork do
w1.close
r2.close
r1.each_line { |line| w2.puts(line.to_i * 2) }
r1.close
w2.close
end
# Stage 3: Consumer (main process)
w1.close
w2.close
r1.close
r2.each_line { |line| puts "Result: #{line}" }
r2.close
Process.wait(pid1)
Process.wait(pid2)
Ruby Implementation
Ruby provides pipe support through the IO class and File module, exposing both anonymous and named pipe creation with standard file descriptor operations. The implementation wraps POSIX pipe system calls with Ruby's IO abstractions, handling buffering, encoding, and cross-platform differences.
Creating Anonymous Pipes: The IO.pipe method creates a pipe and returns two IO objects representing the read and write ends. These objects support standard IO operations: read, write, puts, gets, each_line, and close. Ruby closes the underlying file descriptors when the objects are closed explicitly or garbage collected; explicit close is preferable because readers only see EOF once every write end is closed.
reader, writer = IO.pipe
# Check pipe properties
puts reader.fileno # File descriptor number for read end
puts writer.fileno # File descriptor number for write end
puts reader.stat.pipe? # true - identifies as pipe
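IO.pipe also accepts a block, which scopes the lifetime of both ends: any end still open when the block exits is closed automatically, and the block's value is returned. A short sketch (the `ends` variable exists only to inspect the objects afterwards):

```ruby
ends = nil

line = IO.pipe do |reader, writer|
  ends = [reader, writer]
  writer.puts("scoped message")
  writer.close # closing early is fine; the block only closes ends still open
  reader.gets
end

puts line
puts ends.map(&:closed?).inspect
```

The block form is convenient for short-lived pipes inside one process. It is less useful across fork, where each process still has to close the ends it does not use.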
Writing and Reading: IO objects returned by IO.pipe support the usual reading and writing methods. The write end is created in sync mode, so write passes bytes to the kernel immediately; puts does the same and appends a newline. Read methods include read (everything until EOF), read(n) (up to n bytes, blocking until n bytes arrive or EOF), gets (one line), and each_line (iterator).
reader, writer = IO.pipe
# Writer in separate thread
Thread.new do
writer.puts("Line 1")
writer.puts("Line 2")
writer.puts("Line 3")
writer.close
end
# Reader processes lines
reader.each_line do |line|
puts "Received: #{line.chomp}"
end
reader.close
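Non-blocking Operations: Setting the O_NONBLOCK flag changes blocking behavior: a read on an empty pipe or a write on a full pipe fails with EAGAIN instead of blocking. Ruby's read_nonblock and write_nonblock surface this as IO::EAGAINWaitReadable and IO::EAGAINWaitWritable, which can be rescued through the IO::WaitReadable and IO::WaitWritable modules. This mode enables select/poll-based multiplexing across multiple pipes.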
require 'fcntl'
reader, writer = IO.pipe
reader.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
begin
data = reader.read_nonblock(100)
puts "Read: #{data}"
rescue IO::WaitReadable
puts "No data available yet"
# Could use IO.select([reader]) to wait
end
writer.close
reader.close
Process Integration: The IO.popen method combines fork, pipe creation, and exec into a single operation. It launches an external command with its standard input or output connected through a pipe, returning an IO object for communication.
# Capturing command output
IO.popen(["ls", "-la"], "r") do |pipe|
pipe.each_line do |line|
puts "Output: #{line}"
end
end
# Sending input to command
IO.popen(["sort"], "w") do |pipe|
pipe.puts("zebra")
pipe.puts("apple")
pipe.puts("monkey")
end
The mode parameter determines direction: "r" for reading command output, "w" for writing to command input, or "r+" for bidirectional communication. The block form automatically closes the pipe and waits for the child process.
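The "r+" mode is easy to get wrong, because the command usually waits for EOF on its input before producing output. A minimal sketch, assuming a `sort` executable is on the PATH:

```ruby
sorted = IO.popen(["sort"], "r+") do |pipe|
  %w[zebra apple monkey].each { |word| pipe.puts(word) }
  pipe.close_write # send EOF so sort starts emitting its result
  pipe.read
end

puts sorted
```

Writing everything before reading is only safe for small inputs. With large data the command may fill its output pipe while the script is still writing, so the two directions then need separate threads or a select loop.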
Named Pipe Creation: File.mkfifo, part of core Ruby since version 2.3, creates a named pipe directly; older versions can shell out to the mkfifo command. Once created, named pipes open like regular files with File.open, though a plain read-only or write-only open blocks until the other end is opened.
fifo_path = "/tmp/ruby_fifo"
# Create the FIFO unless it already exists
File.mkfifo(fifo_path) unless File.exist?(fifo_path)
# Writer process
fork do
File.open(fifo_path, "w") do |fifo|
fifo.puts("Message 1")
sleep 1
fifo.puts("Message 2")
end
end
# Reader process (main)
File.open(fifo_path, "r") do |fifo|
fifo.each_line do |line|
puts "Read: #{line.chomp}"
end
end
Process.wait
File.unlink(fifo_path)
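Pipe Capacity Management: On Linux, the fcntl operations F_GETPIPE_SZ and F_SETPIPE_SZ query and change a pipe's kernel buffer capacity, affecting performance and memory usage. Ruby reaches them through IO#fcntl. The example checks that the Fcntl module defines the constants before using them, since they are not available on every platform or Ruby build.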
require 'fcntl'
reader, writer = IO.pipe
# Query current pipe capacity (Linux only)
if defined?(Fcntl::F_GETPIPE_SZ)
capacity = reader.fcntl(Fcntl::F_GETPIPE_SZ)
puts "Pipe capacity: #{capacity} bytes"
# Attempt to increase capacity
begin
new_capacity = reader.fcntl(Fcntl::F_SETPIPE_SZ, 65536)
puts "New capacity: #{new_capacity} bytes"
rescue Errno::EPERM
puts "Permission denied for capacity change"
end
end
reader.close
writer.close
Binary Data Handling: Pipes transmit bytes without interpretation. Setting binary mode prevents Ruby from performing encoding conversions or newline translations, essential for transmitting non-text data.
reader, writer = IO.pipe
reader.binmode
writer.binmode
# Write binary data
binary_data = [0x89, 0x50, 0x4E, 0x47].pack("C*") # PNG signature
writer.write(binary_data)
writer.close
# Read preserves exact bytes
received = reader.read
puts received.bytes.map { |b| "0x%02X" % b }.join(" ")
reader.close
Select-based Multiplexing: IO.select monitors multiple pipes simultaneously, returning when any become readable or writable. This approach handles multiple communication channels without threading.
pipes = 3.times.map { IO.pipe }
readers = pipes.map(&:first)
writers = pipes.map(&:last)
# Write to random pipe from thread
Thread.new do
sleep 0.5
writers.sample.puts("Random message")
writers.each(&:close)
end
# Wait for any reader to have data
ready_readers, _, _ = IO.select(readers)
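ready_readers.each do |reader|
line = reader.gets
# A pipe closed without data is also reported as readable; gets then returns nil
puts "Pipe #{readers.index(reader)} says: #{line}" if line
end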
readers.each(&:close)
Error Handling: Pipes generate specific exceptions for error conditions. Errno::EPIPE occurs when writing to a pipe with no readers. IO::EAGAINWaitReadable and IO::EAGAINWaitWritable signal non-blocking operation would block.
reader, writer = IO.pipe
reader.close # Close read end
begin
writer.write("Data")
rescue Errno::EPIPE => e
puts "Broken pipe: #{e.message}"
ensure
writer.close
end
Practical Examples
Pipes solve real-world problems in process coordination, output capture, data streaming, and inter-program communication. These examples demonstrate complete implementations addressing common requirements.
Log Aggregator: Multiple worker processes send log messages through pipes to a central collector that timestamps and writes to a single file. This pattern prevents log interleaving and coordinates access to shared resources.
require 'time'
log_file = "aggregated.log"
readers = []
# Spawn 3 worker processes
3.times do |worker_id|
  reader, writer = IO.pipe
  readers << reader
  fork do
    readers.each(&:close) # Workers only write; close every inherited read end
    5.times do |i|
      writer.puts("Worker #{worker_id}: Task #{i} completed")
      sleep rand(0.1..0.5)
    end
    writer.close
  end
  writer.close
end
# Aggregator reads from all pipes until every worker has closed its write end
buffers = Hash.new { |hash, reader| hash[reader] = +"" }
File.open(log_file, "w") do |log|
  until readers.empty?
    ready, = IO.select(readers)
    ready.each do |reader|
      begin
        buffers[reader] << reader.read_nonblock(1024)
        # Log complete lines only; a partial line waits for the next read
        while (line = buffers[reader].slice!(/\A.*\n/))
          log.puts("[#{Time.now.iso8601}] #{line.chomp}")
        end
        log.flush
      rescue IO::WaitReadable
        # Nothing to read after all; try again on the next pass
      rescue EOFError
        # Worker finished: stop watching this pipe
        reader.close
        readers.delete(reader)
      end
    end
  end
end
Process.waitall
Command Pipeline Builder: Chains multiple external commands through pipes, similar to shell pipelines but with programmatic control and error handling.
def pipeline(*commands)
pipes = (commands.size - 1).times.map { IO.pipe }
pids = []
commands.each_with_index do |cmd, index|
pids << fork do
# Connect input pipe
if index > 0
STDIN.reopen(pipes[index - 1][0])
end
# Connect output pipe
if index < commands.size - 1
STDOUT.reopen(pipes[index][1])
end
# Close all pipe ends in child
pipes.each { |r, w| r.close; w.close }
exec(*cmd)
end
end
# Close all pipes in parent
pipes.each { |r, w| r.close; w.close }
# Wait for all commands
pids.each { |pid| Process.wait(pid) }
end
# Usage: equivalent to "cat file.txt | grep error | wc -l"
pipeline(
["cat", "file.txt"],
["grep", "error"],
["wc", "-l"]
)
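Ruby's standard library covers the same ground with Open3.pipeline_r, which starts the commands, wires the pipes between them, and yields the last command's output. The sketch below assumes `printf`, `sort`, and `tr` executables are available; the command chain is made up for illustration.

```ruby
require 'open3'

output = nil
statuses = nil

commands = [
  ["printf", "pear\napple\nfig\n"],
  ["sort"],
  ["tr", "a-z", "A-Z"]
]

Open3.pipeline_r(*commands) do |last_stdout, wait_threads|
  output = last_stdout.read
  statuses = wait_threads.map(&:value) # one Process::Status per command
end

puts output
puts "All succeeded: #{statuses.all?(&:success?)}"
```

The hand-written builder remains useful when each stage needs custom setup between fork and exec; Open3 is the shorter route when the stages are plain commands.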
Producer-Consumer with Backpressure: Implements a work queue where producers generate tasks faster than consumers process them. The pipe's limited buffer provides natural backpressure, blocking producers when consumers fall behind.
TASK_COUNT = 100
reader, writer = IO.pipe
# Producer generates tasks
producer = Thread.new do
TASK_COUNT.times do |i|
task = "Task #{i}: #{rand(1000)}"
writer.puts(task)
puts "Produced: #{task}"
end
writer.close
end
# Consumer processes tasks slowly
consumer = Thread.new do
reader.each_line do |task|
puts "Processing: #{task.chomp}"
sleep 0.1 # Simulate slow processing
end
reader.close
end
producer.join
consumer.join
puts "Completed #{TASK_COUNT} tasks"
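Named Pipe Service: Creates a pair of named pipes, one for requests and one for responses, and serves one client at a time. A single FIFO carries data in one direction only; if both sides open the same FIFO for reading and writing, each can read back its own message.
# Server process
require 'json'
request_path = "/tmp/service_requests"
response_path = "/tmp/service_responses"
[request_path, response_path].each { |path| File.mkfifo(path) unless File.exist?(path) }
puts "Service listening on #{request_path}"
loop do
  # Blocks until a client opens the request FIFO for writing
  request = File.open(request_path, "r") { |fifo| fifo.gets }
  # A client that closes without sending anything shuts the service down
  break if request.nil?
  data = JSON.parse(request)
  result = case data["command"]
           when "upcase" then data["text"].upcase
           when "reverse" then data["text"].reverse
           else "Unknown command"
           end
  # Blocks until the client opens the response FIFO for reading
  File.open(response_path, "w") { |fifo| fifo.puts(JSON.generate(result: result)) }
end
[request_path, response_path].each { |path| File.unlink(path) }
# Client process
require 'json'
request_path = "/tmp/service_requests"
response_path = "/tmp/service_responses"
request = { command: "upcase", text: "hello world" }
File.open(request_path, "w") { |fifo| fifo.puts(JSON.generate(request)) }
response = File.open(response_path, "r") { |fifo| fifo.gets }
result = JSON.parse(response)
puts "Response: #{result["result"]}"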
Streaming Data Processor: Reads a large file line by line through a pipe, transforms each line in a separate thread, and writes the results through a second pipe.
def stream_process(input_file, output_file)
reader, writer = IO.pipe
processed_reader, processed_writer = IO.pipe
# Reader thread: feeds data into pipeline
reader_thread = Thread.new do
File.open(input_file, "r") do |file|
file.each_line do |line|
writer.puts(line)
end
end
writer.close
end
# Processor thread: transforms data
processor_thread = Thread.new do
reader.each_line do |line|
processed = line.chomp.upcase.reverse # chomp first so the newline is not reversed to the front
processed_writer.puts(processed)
end
reader.close
processed_writer.close
end
# Writer thread: saves results
writer_thread = Thread.new do
File.open(output_file, "w") do |file|
processed_reader.each_line do |line|
file.puts(line)
end
end
processed_reader.close
end
reader_thread.join
processor_thread.join
writer_thread.join
end
stream_process("input.txt", "output.txt")
Integration & Interoperability
Pipes integrate with shell environments, containerized systems, network protocols, and cross-language applications. Understanding integration patterns enables pipes to bridge disparate technologies.
Shell Command Integration: Ruby scripts commonly launch shell commands and capture output through pipes. The backtick operator, %x{}, and IO.popen all create pipes behind the scenes, connecting Ruby code to shell utilities.
# Different methods of shell integration
output1 = `ls -la` # Backticks
output2 = %x{ps aux | grep ruby} # Percent-x
output3 = IO.popen("df -h") { |io| io.read } # Explicit pipe; popen returns the block's value
# More control with Open3
require 'open3'
stdout, stderr, status = Open3.capture3("command arg1 arg2")
puts "Exit code: #{status.exitstatus}"
puts "Output: #{stdout}"
puts "Errors: #{stderr}"
Docker Container Communication: Named pipes enable communication between host and container processes when bind-mounted into the container on a Linux host. The pipe exists on the host filesystem but remains accessible inside the container. The container must run concurrently with the host reader, because each side's open blocks until the other side opens the pipe.
# Host process creating named pipe
fifo_path = "/tmp/docker_pipe"
File.mkfifo(fifo_path) unless File.exist?(fifo_path)
# Start the container without waiting; its write blocks until the host opens the pipe
pid = spawn("docker", "run", "--rm", "-v", "#{fifo_path}:/pipe", "alpine",
            "sh", "-c", "echo Container message > /pipe")
# Host process reading from pipe
File.open(fifo_path, "r") do |fifo|
  message = fifo.read
  puts "From container: #{message}"
end
Process.wait(pid)
File.unlink(fifo_path)
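Systemd Service Integration: Named pipes work with systemd services for structured logging or command interfaces. A socket unit can declare the FIFO with the ListenFIFO= directive so that systemd creates it before the service starts, which takes care of initialization order.
# Service writing to pipe
fifo_path = "/run/myservice.fifo"
loop do
  begin
    # A non-blocking write-only open raises ENXIO when no reader has the FIFO open
    File.open(fifo_path, File::WRONLY | File::NONBLOCK) do |fifo|
      fifo.puts("Service status: operational")
    end
    sleep 5
  rescue Errno::ENXIO, Errno::EPIPE
    # No reader connected yet, or the reader went away mid-write
    sleep 1
  end
end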
Database Query Streaming: Large query results stream through pipes so the consumer never holds the entire dataset in memory. The database client writes rows to a pipe while processing code reads them incrementally. The pg gem's single-row mode keeps the client from buffering the whole result before the first row is written.
require 'pg'
require 'json'
reader, writer = IO.pipe
# Background thread executes query
query_thread = Thread.new do
  begin
    conn = PG.connect(dbname: 'mydb')
    conn.send_query("SELECT * FROM large_table")
    conn.set_single_row_mode # Deliver rows one at a time instead of one buffered result
    conn.get_result.stream_each do |row|
      writer.puts(row.to_json)
    end
    conn.close
  ensure
    writer.close # Always signal EOF, even if the query fails
  end
end
# Main thread processes results incrementally
reader.each_line do |line|
  row = JSON.parse(line)
  # Process row without loading entire result set
  puts "Processing: #{row['id']}"
end
reader.close
query_thread.join
Named Pipe Protocol Implementation: Multiple processes communicate through a protocol layered on named pipes, with message framing and command dispatch.
require 'json'
class PipeProtocol
def initialize(fifo_path)
@fifo_path = fifo_path
system("mkfifo", @fifo_path) unless File.exist?(@fifo_path)
end
def send_message(type, payload)
File.open(@fifo_path, "w") do |fifo|
message = { type: type, payload: payload, timestamp: Time.now.to_i }
fifo.puts(JSON.generate(message))
end
end
def receive_message
File.open(@fifo_path, "r") do |fifo|
line = fifo.gets
return nil if line.nil?
JSON.parse(line, symbolize_names: true)
end
end
def cleanup
File.unlink(@fifo_path) if File.exist?(@fifo_path)
end
end
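# Usage: the sender runs in a child process, because opening a FIFO for
# writing blocks until another process opens it for reading
protocol = PipeProtocol.new("/tmp/protocol_fifo")
pid = fork { protocol.send_message("event", { action: "start", value: 42 }) }
message = protocol.receive_message
Process.wait(pid)
puts "Received #{message[:type]}: #{message[:payload]}"
protocol.cleanup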
Cross-Language Communication: Pipes enable Ruby to communicate with programs written in other languages. Both processes use standard file descriptors, making the implementation language irrelevant.
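# Ruby process: one pipe carries input to Python, a second carries its output back
to_python_r, to_python_w = IO.pipe
from_python_r, from_python_w = IO.pipe
pid = fork do
  to_python_w.close
  from_python_r.close
  # Attach the pipes to the standard streams that survive exec
  STDIN.reopen(to_python_r)
  STDOUT.reopen(from_python_w)
  to_python_r.close
  from_python_w.close
  exec("python3", "-c", "import sys; print('From Python:', sys.stdin.read())")
end
to_python_r.close
from_python_w.close
to_python_w.puts("Message from Ruby")
to_python_w.close # EOF lets sys.stdin.read() return
output = from_python_r.read
from_python_r.close
Process.wait(pid)
puts "Python responded: #{output}"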
Common Pitfalls
Pipes introduce subtle issues through blocking behavior, buffer limitations, and cleanup requirements. Recognizing these pitfalls prevents deadlocks, data corruption, and resource leaks.
Deadlock from Bidirectional Single Pipe: Attempting bidirectional communication through one pipe fails because both processes share the same read end and the same write end. A process can read back its own message, and no read ever sees EOF because both processes keep a write end open, so read(100) waits for bytes that never arrive.
# WRONG: Causes deadlock
reader, writer = IO.pipe
pid = fork do
# Child writes then reads
writer.write("From child")
message = reader.read(100) # Blocks forever
end
# Parent writes then reads
writer.write("From parent")
message = reader.read(100) # Blocks forever
Process.wait(pid)
The solution requires two pipes or careful ordering where one side writes completely before the other writes.
# CORRECT: Two pipes for bidirectional communication
to_child_r, to_child_w = IO.pipe
to_parent_r, to_parent_w = IO.pipe
pid = fork do
to_child_w.close
to_parent_r.close
to_parent_w.write("From child")
to_parent_w.close
message = to_child_r.read
to_child_r.close
end
to_child_r.close
to_parent_w.close
to_child_w.write("From parent")
to_child_w.close
message = to_parent_r.read
to_parent_r.close
Process.wait(pid)
Buffer Overflow Blocking: Writing more data than the pipe buffer holds blocks the writer until a reader consumes data. If the writer blocks without closing the write end and no reader exists, the process hangs indefinitely.
# WRONG: Writes huge data without reading
reader, writer = IO.pipe
# Attempt to write 10MB to 64KB buffer
large_data = "x" * (10 * 1024 * 1024)
writer.write(large_data) # Blocks when buffer fills
writer.close
data = reader.read
reader.close
The solution splits writing and reading into separate threads or processes, or uses non-blocking operations with select.
# CORRECT: Write in background thread
reader, writer = IO.pipe
writer_thread = Thread.new do
large_data = "x" * (10 * 1024 * 1024)
writer.write(large_data)
writer.close
end
data = reader.read
reader.close
writer_thread.join
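The non-blocking alternative mentioned above can be sketched in a single thread: IO.select reports which end can make progress, and the loop writes or reads a bounded chunk each pass. The chunk sizes are arbitrary choices for illustration.

```ruby
reader, writer = IO.pipe
payload = "x" * (1024 * 1024) # 1 MB, larger than a default pipe buffer
offset = 0
received = String.new

loop do
  write_set = writer.closed? ? [] : [writer]
  readable, writable, = IO.select([reader], write_set)

  if writable.include?(writer)
    begin
      offset += writer.write_nonblock(payload.byteslice(offset, 16_384))
    rescue IO::WaitWritable
      # Buffer filled between select and write; retry on the next pass
    end
    writer.close if offset >= payload.bytesize
  end

  if readable.include?(reader)
    begin
      received << reader.read_nonblock(65_536)
    rescue IO::WaitReadable
      # Nothing to read yet
    rescue EOFError
      break # writer closed and every byte has been consumed
    end
  end
end

reader.close
puts "Transferred #{received.bytesize} bytes"
```

The loop never blocks on a full buffer because it drains the pipe between writes. The thread-based version is shorter; the select version avoids threads and extends naturally to several pipes.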
Forgotten File Descriptor Closure: Failing to close pipe ends prevents EOF detection and leaks file descriptors. If a parent process keeps both ends open after forking, the child's closure of the write end does not signal EOF to readers.
# WRONG: Parent keeps write end open
reader, writer = IO.pipe
pid = fork do
# Child closes write end properly
writer.close
reader.close
end
# Parent never closes write end
data = reader.read # Hangs forever waiting for EOF
reader.close
Process.wait(pid)
Each process must close the ends it does not use. After forking, the parent closes its copy of the write end if only the child writes.
# CORRECT: Close unused ends
reader, writer = IO.pipe
pid = fork do
  reader.close # Child does not read
  writer.write("done")
  writer.close # Child finished writing
end
writer.close # Parent closes its copy of the write end
data = reader.read # Returns "done" once the child closes its write end
reader.close
Process.wait(pid)
Named Pipe Opening Deadlock: Opening a named pipe for reading blocks until a writer opens it, and opening for writing blocks until a reader opens it. If every process opens the same end, for example two writers and no reader, they all block forever.
# WRONG: Both processes try to open for writing first
fifo_path = "/tmp/deadlock_fifo"
system("mkfifo", fifo_path)
# Process 1
fork do
writer = File.open(fifo_path, "w") # Blocks waiting for reader
writer.write("message")
writer.close
end
# Process 2
fork do
writer = File.open(fifo_path, "w") # Blocks waiting for reader
writer.write("message")
writer.close
end
Process.waitall
The solution assigns complementary roles: one process opens for reading while the other opens for writing, or uses non-blocking open operations.
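The non-blocking open can be sketched as follows, using a FIFO in a temporary directory (the path is created only for this demonstration). A write-only non-blocking open fails immediately with ENXIO when no reader exists, while a read-only non-blocking open succeeds at once.

```ruby
require 'tmpdir'

no_reader_error = nil
message = nil

Dir.mktmpdir do |dir|
  fifo_path = File.join(dir, "demo_fifo")
  File.mkfifo(fifo_path)

  # Write-only + NONBLOCK fails fast when no reader has the FIFO open
  begin
    File.open(fifo_path, File::WRONLY | File::NONBLOCK).close
  rescue Errno::ENXIO => e
    no_reader_error = e.class
  end

  # Read-only + NONBLOCK returns immediately, even without a writer
  reader = File.open(fifo_path, File::RDONLY | File::NONBLOCK)
  writer = File.open(fifo_path, File::WRONLY | File::NONBLOCK) # succeeds now
  writer.puts("hello")
  writer.close
  message = reader.gets
  reader.close
end

puts "Open without reader raised: #{no_reader_error}"
puts "Message: #{message}"
```

A process can use the ENXIO failure to retry later or report that no peer is listening, instead of hanging in open.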
Broken Pipe Signal Handling: Writing to a pipe with no readers generates SIGPIPE, whose default action terminates the process. The Ruby interpreter installs its own SIGPIPE handler at startup, so a Ruby writer is not killed by the signal; the write raises Errno::EPIPE instead. An unrescued Errno::EPIPE still ends the script. This occurs when readers close early or crash.
# WRONG: Unrescued Errno::EPIPE terminates the script
reader, writer = IO.pipe
reader.close # Close reader immediately
writer.write("Data") # Raises Errno::EPIPE; nothing rescues it
writer.close # Never reached
Rescue Errno::EPIPE around writes to keep the process running. Ignoring SIGPIPE explicitly, as the next example does, additionally protects code paths where the default signal action has been restored.
# CORRECT: Handle broken pipe
Signal.trap("PIPE", "IGNORE") # Prevent signal termination
reader, writer = IO.pipe
reader.close
begin
writer.write("Data")
rescue Errno::EPIPE => e
puts "Pipe broken: #{e.message}"
ensure
writer.close
end
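The same handling applies when the reader is a separate process that exits early. In this sketch the child reads a single line and quits, while the parent tries to send far more data than a default pipe buffer holds; the line count is arbitrary.

```ruby
reader, writer = IO.pipe

pid = fork do
  writer.close
  reader.gets # consume one line, then quit early
  reader.close
  exit!(0)
end

reader.close # the parent must not keep a read end, or EPIPE never fires
written = 0
broken = false

begin
  100_000.times do |i|
    writer.puts("line #{i}")
    written += 1
  end
rescue Errno::EPIPE
  broken = true # the only reader is gone; stop producing
ensure
  writer.close
  Process.wait(pid)
end

puts "Stopped after #{written} lines (broken pipe: #{broken})"
```

How many lines get through before the error depends on the pipe capacity and on scheduling, so only the fact of the interruption is predictable.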
Message Boundary Corruption: Multiple writers interleaving data larger than PIPE_BUF corrupt message boundaries. Each writer's data may split, allowing other writers to insert data mid-message.
# WRONG: Large writes from multiple processes
reader, writer = IO.pipe
3.times do
fork do
# Write 10KB message (exceeds PIPE_BUF)
message = "Process #{Process.pid}: " + ("x" * 10000)
writer.write(message)
writer.close
end
end
writer.close
data = reader.read
reader.close
Process.waitall
# Data contains interleaved fragments
The solution keeps each message within PIPE_BUF bytes or funnels all messages through a single writer.
Select Starvation: IO.select reports every ready pipe, so starvation comes from how the caller services the result. A loop that handles only the first ready pipe, or drains one busy pipe completely before selecting again, can leave a quieter pipe waiting indefinitely.
# WRONG: Starves pipe 2 while pipe 1 keeps producing data
pipes = [IO.pipe, IO.pipe]
readers = pipes.map(&:first)
writers = pipes.map(&:last)
# Pipe 1 constantly generates data
Thread.new do
  loop { writers[0].write("x") }
end
# Pipe 2 rarely generates data
Thread.new do
  sleep 1
  writers[1].write("important")
end
# Reader services only the first ready pipe on every pass
loop do
  ready, _, _ = IO.select(readers)
  # readers[1] is typically skipped whenever readers[0] is also ready
  ready.first.read_nonblock(1)
end
Service every pipe that IO.select returns on each pass, and cap how much is read from any single pipe before moving on.
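A fair loop can be sketched as follows: every ready pipe is visited once per pass and at most a fixed number of bytes is read from each. The pipe names and the 1024-byte cap are illustrative assumptions.

```ruby
busy_r, busy_w = IO.pipe
quiet_r, quiet_w = IO.pipe

busy_w.write("x" * 10_000) # plenty of pending data
quiet_w.write("important")
[busy_w, quiet_w].each(&:close)

open_readers = [busy_r, quiet_r]
received = Hash.new { |hash, key| hash[key] = String.new }
service_order = []

until open_readers.empty?
  ready, = IO.select(open_readers)
  # Visit every ready pipe once per pass, reading a bounded amount from each
  ready.each do |reader|
    begin
      received[reader] << reader.read_nonblock(1024)
      service_order << reader
    rescue IO::WaitReadable
      next
    rescue EOFError
      reader.close
      open_readers.delete(reader)
    end
  end
end

puts "busy: #{received[busy_r].bytesize} bytes, quiet: #{received[quiet_r]}"
puts "quiet pipe first served at read ##{service_order.index(quiet_r) + 1}"
```

The quiet pipe is served during the first pass even though the busy pipe still has thousands of bytes pending, because no pipe is allowed to monopolize a pass.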
Reference
Core Methods
| Method | Description | Returns |
|---|---|---|
| IO.pipe | Creates anonymous pipe | Array of [reader, writer] |
| IO.popen | Opens pipe to command | IO object |
| IO.select | Multiplexes multiple IOs | Array of [readable, writable, errors], or nil on timeout |
| File.mkfifo | Creates named pipe (core since Ruby 2.3) | 0 |
| IO#read | Reads until EOF, or up to n bytes with read(n) | String |
| IO#read_nonblock | Reads without blocking | String or raises IO::WaitReadable |
| IO#write | Writes data to pipe | Integer (bytes written) |
| IO#write_nonblock | Writes without blocking | Integer or raises IO::WaitWritable |
| IO#close | Closes pipe end | nil |
| IO#eof? | Checks for end of file | Boolean |
| IO#fcntl | Controls file descriptor flags | Integer |
Pipe Properties
| Property | Description | Typical Value |
|---|---|---|
| Buffer Size | Kernel pipe buffer capacity | 64KB default on Linux |
| PIPE_BUF | Maximum atomic write size | 4096 bytes on Linux (POSIX minimum: 512) |
| Max Pipes | Per-process pipe limit | Limited by file descriptor ulimit |
| Persistence | Lifetime of pipe | Until all ends close |
Opening Modes
| Mode | Direction | Blocking Behavior |
|---|---|---|
| r | Read only | Blocks until writer opens |
| w | Write only | Blocks until reader opens |
| r+ | Read and write | Does not block on Linux; unspecified by POSIX |
Error Conditions
| Error | Cause | Handling |
|---|---|---|
| Errno::EPIPE | Writing to pipe with no readers | Rescue the exception |
| Errno::EAGAIN | Non-blocking operation would block | Retry after IO.select |
| Errno::ENXIO | Non-blocking write-only open of a named pipe with no reader | Retry later or open the read end first |
| EOFError | read_nonblock, readpartial, or sysread at end of stream | Rescue and treat as end of input |
| Errno::EEXIST | Creating existing named pipe | Check existence before creation |
File Descriptor Flags
| Flag | Purpose | Operation |
|---|---|---|
| O_NONBLOCK | Non-blocking mode | fcntl(F_SETFL) |
| FD_CLOEXEC | Close on exec | fcntl(F_SETFD) |
| F_GETFL | Get status flags | fcntl(F_GETFL) |
| F_SETFL | Set status flags | fcntl(F_SETFL) |
| F_GETPIPE_SZ | Get buffer size (Linux) | fcntl(F_GETPIPE_SZ) |
| F_SETPIPE_SZ | Set buffer size (Linux) | fcntl(F_SETPIPE_SZ) |
Common Patterns
| Pattern | Pipe Configuration | Use Case |
|---|---|---|
| Parent-to-Child | 1 pipe, parent writes | Configuration passing |
| Child-to-Parent | 1 pipe, child writes | Result collection |
| Bidirectional | 2 pipes, each direction | Request-response |
| Pipeline | N-1 pipes for N processes | Data transformation chain |
| Fan-out | Multiple children read same pipe | Work distribution |
| Fan-in | Multiple children write to parent | Result aggregation |
Signal Handling
| Signal | Generated When | Default Action |
|---|---|---|
| SIGPIPE | Writing to pipe with no readers | Terminate process |
Platform Differences
| Feature | Linux | macOS | Windows |
|---|---|---|---|
| Named Pipes | FIFO in filesystem | FIFO in filesystem | Named pipe server |
| Buffer Size | 64KB default, up to 1MB unprivileged | 16KB, grows to 64KB | 512 bytes to 1MB |
| F_SETPIPE_SZ | Supported | Not supported | N/A |
| O_DIRECT | Supported (limited) | Not supported | Not supported |