Overview
Non-blocking I/O operations in Ruby allow programs to initiate I/O requests that return immediately, regardless of whether the operation has completed. Ruby implements non-blocking I/O through several mechanisms: the *_nonblock method family (IO#read_nonblock, IO#write_nonblock, TCPServer#accept_nonblock), IO.select, the io/wait helpers (IO#wait_readable and IO#wait_writable), and higher-level abstractions like Fibers and event loops.
The core principle involves setting file descriptors to non-blocking mode, where read and write operations return immediately if no data is available or the operation cannot complete. When an operation cannot proceed, Ruby raises IO::EAGAINWaitReadable or IO::EAGAINWaitWritable rather than blocking the thread.
require 'socket'
require 'fcntl'
server = TCPServer.new('localhost', 3000)
server.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
begin
client = server.accept_nonblock
puts "New client connected: #{client}"
rescue IO::EAGAINWaitReadable
puts "No pending connections"
end
Ruby's IO.select method provides multiplexed I/O monitoring, allowing programs to wait for multiple file descriptors to become ready for reading or writing:
readable, writable = IO.select([server], [], [], 0.1)
if readable&.include?(server)
client = server.accept_nonblock
end
The Fiber class works with non-blocking I/O to create cooperative multitasking patterns. When a non-blocking operation would block, the fiber can yield control to other fibers, creating async-like behavior:
fiber = Fiber.new do
begin
data = socket.read_nonblock(1024)
process_data(data)
rescue IO::EAGAINWaitReadable
Fiber.yield(:wait_readable)
retry
end
end
Basic Usage
Non-blocking I/O operations begin with configuring file descriptors for non-blocking behavior. The fcntl system call sets the O_NONBLOCK flag, making subsequent operations return immediately rather than waiting for completion.
require 'socket'
require 'fcntl'
socket = TCPSocket.new('example.com', 80)
socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
# Non-blocking write
begin
bytes_written = socket.write_nonblock("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
puts "Wrote #{bytes_written} bytes"
rescue IO::EAGAINWaitWritable
puts "Socket not ready for writing"
end
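The bare F_SETFL call above replaces the descriptor's status flags rather than adding to them. A safer pattern reads the current flags first, and the io/nonblock extension wraps the same operation behind IO#nonblock=; the *_nonblock methods also switch the descriptor into non-blocking mode on their own, so the explicit fcntl call is often unnecessary. A minimal sketch:
require 'socket'
require 'fcntl'
require 'io/nonblock'
socket = TCPSocket.new('example.com', 80)
# Preserve any existing status flags instead of overwriting them
flags = socket.fcntl(Fcntl::F_GETFL)
socket.fcntl(Fcntl::F_SETFL, flags | Fcntl::O_NONBLOCK)
# Equivalent, using the io/nonblock extension
socket.nonblock = true
puts socket.nonblock?  # => true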
Reading data non-blockingly requires handling the absence of available data gracefully. The read_nonblock method reads up to the specified number of bytes, raising an exception if no data is available:
begin
data = socket.read_nonblock(1024)
puts "Received: #{data.length} bytes"
rescue IO::EAGAINWaitReadable
puts "No data available for reading"
rescue EOFError
puts "Connection closed by peer"
end
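The *_nonblock methods also accept an exception: false keyword argument (Ruby 2.1+) that returns :wait_readable or :wait_writable instead of raising, and nil at end of file, which avoids using exceptions for flow control:
case result = socket.read_nonblock(1024, exception: false)
when :wait_readable
  puts "No data available for reading"
when nil
  puts "Connection closed by peer"
else
  puts "Received: #{result.length} bytes"
end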
The IO.select method monitors multiple file descriptors simultaneously, returning arrays of descriptors ready for reading, writing, or having exceptional conditions. The timeout parameter prevents indefinite blocking:
sockets = [socket1, socket2, socket3]
readable, writable, exceptional = IO.select(sockets, sockets, sockets, 1.0)
readable&.each do |sock|
begin
data = sock.read_nonblock(1024)
handle_incoming_data(sock, data)
rescue IO::EAGAINWaitReadable
# Handle race condition where select indicated ready but no data available
end
end
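When only a single descriptor needs monitoring, the io/wait extension is simpler than IO.select: IO#wait_readable and IO#wait_writable block for at most the given timeout, returning a truthy value once the descriptor is ready or nil on timeout. A small sketch using one of the sockets above:
require 'io/wait'
if socket1.wait_readable(1.0)
  data = socket1.read_nonblock(1024, exception: false)
  handle_incoming_data(socket1, data) if data.is_a?(String)
else
  puts "Timed out waiting for data"
end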
File I/O also supports non-blocking operations, though the behavior differs from network sockets. Regular files generally don't block on reads or writes, but pipes, FIFOs, and device files can benefit from non-blocking operations:
require 'fcntl'
pipe_read, pipe_write = IO.pipe
pipe_read.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
begin
data = pipe_read.read_nonblock(1024)
puts "Read from pipe: #{data}"
rescue IO::EAGAINWaitReadable
puts "Pipe empty"
end
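The write side shows the complementary condition: a pipe's kernel buffer is finite (commonly 64 KB on Linux), so repeated write_nonblock calls eventually raise IO::EAGAINWaitWritable once the buffer fills. A minimal sketch:
pipe_read, pipe_write = IO.pipe
# write_nonblock switches the descriptor into non-blocking mode itself
total = 0
begin
  loop do
    total += pipe_write.write_nonblock('x' * 4096)
  end
rescue IO::EAGAINWaitWritable
  puts "Pipe buffer full after #{total} bytes"
end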
Thread Safety & Concurrency
Non-blocking I/O operations interact with Ruby's Global Interpreter Lock (GIL) and threading model in specific ways. While I/O operations can release the GIL, non-blocking operations complete quickly and may not provide the same opportunities for thread switching as blocking operations.
Thread safety concerns arise when multiple threads access the same file descriptor. Ruby's I/O objects include internal synchronization, but application-level coordination remains necessary for consistent behavior:
require 'fcntl'
require 'thread'
class ThreadSafeSocket
def initialize(socket)
@socket = socket
@read_mutex = Mutex.new
@write_mutex = Mutex.new
@socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
end
def safe_read_nonblock(maxlen)
@read_mutex.synchronize do
@socket.read_nonblock(maxlen)
end
rescue IO::EAGAINWaitReadable
# Nothing to read yet; propagate so the caller can select and retry
raise
end
def safe_write_nonblock(data)
@write_mutex.synchronize do
@socket.write_nonblock(data)
end
rescue IO::EAGAINWaitWritable
# Send buffer full; propagate so the caller can select and retry
raise
end
end
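A brief usage sketch, assuming a reachable HTTP endpoint: one thread writes through the wrapper while another reads, each waiting via io/wait when a wait condition propagates:
require 'socket'
require 'io/wait'
raw = TCPSocket.new('example.com', 80)  # hypothetical endpoint
shared = ThreadSafeSocket.new(raw)
writer = Thread.new do
  begin
    shared.safe_write_nonblock("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
  rescue IO::EAGAINWaitWritable
    raw.wait_writable
    retry
  end
end
reader = Thread.new do
  begin
    puts shared.safe_read_nonblock(1024)
  rescue IO::EAGAINWaitReadable
    raw.wait_readable
    retry
  rescue EOFError
    # peer closed the connection before sending anything
  end
end
[writer, reader].each(&:join)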
Fiber-based concurrency provides cooperative multitasking that works naturally with non-blocking I/O. Fibers can yield when I/O operations would block, allowing other fibers to execute:
class FiberScheduler
def initialize
@readable_fibers = {}
@writable_fibers = {}
end
def wait_readable(io, fiber)
@readable_fibers[io] = fiber
end
def wait_writable(io, fiber)
@writable_fibers[io] = fiber
end
def run
loop do
readable, writable = IO.select(@readable_fibers.keys, @writable_fibers.keys, [], 0.1)
readable&.each do |io|
fiber = @readable_fibers.delete(io)
fiber.resume if fiber
end
writable&.each do |io|
fiber = @writable_fibers.delete(io)
fiber.resume if fiber
end
break if @readable_fibers.empty? && @writable_fibers.empty?
end
end
end
def async_read(socket, scheduler, &on_data)
Fiber.new do
begin
data = socket.read_nonblock(1024)
on_data.call(data)
rescue IO::EAGAINWaitReadable
# Register interest, hand control back, and retry once resumed
scheduler.wait_readable(socket, Fiber.current)
Fiber.yield
retry
end
end
end
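A wiring sketch for the two pieces above, assuming a server is listening locally: the fiber is created with a block that receives the data, resumed once to attempt the first read, and then driven by the scheduler loop:
require 'socket'
scheduler = FiberScheduler.new
socket = TCPSocket.new('localhost', 3000)  # hypothetical server
reader = async_read(socket, scheduler) do |data|
  puts "Got #{data.bytesize} bytes"
end
reader.resume   # first attempt; registers with the scheduler if it would block
scheduler.run   # resumes the fiber once the socket becomes readable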
Actor-pattern implementations often combine non-blocking I/O with message passing between threads. Each actor manages its own file descriptors while communicating through thread-safe queues:
require 'fcntl'
require 'thread'
class IOActor
def initialize
@inbox = Queue.new
@sockets = []
@running = true
@thread = Thread.new { run }
end
def add_socket(socket)
socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@inbox.push([:add_socket, socket])
end
def stop
@inbox.push([:stop])
@thread.join
end
private
def run
while @running
# Process messages from inbox
while !@inbox.empty?
command, data = @inbox.pop(true) rescue nil
next unless command
case command
when :add_socket
@sockets << data
when :stop
@running = false
end
end
# Handle I/O on registered sockets
if @sockets.empty?
sleep 0.05 # Nothing to monitor yet; avoid spinning
next
end
readable, = IO.select(@sockets, [], [], 0.1)
readable&.each do |socket|
handle_socket_data(socket)
end
end
end
def handle_socket_data(socket)
begin
data = socket.read_nonblock(1024)
process_received_data(socket, data)
rescue IO::EAGAINWaitReadable
# Socket became unready between select and read
rescue EOFError
@sockets.delete(socket)
socket.close
end
end
end
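A usage sketch: IOActor calls a process_received_data hook that the class above never defines, so this example supplies one in a small subclass and feeds it a few accepted connections (the port is arbitrary, and the accept calls block until clients actually connect):
require 'socket'
class EchoActor < IOActor
  private
  def process_received_data(socket, data)
    socket.write_nonblock("Echo: #{data}")
  rescue IO::EAGAINWaitWritable
    # Drop the echo if the client is not ready; acceptable for a sketch
  end
end
actor = EchoActor.new
server = TCPServer.new('localhost', 3000)
3.times { actor.add_socket(server.accept) }
sleep 5   # let the actor service the sockets for a while
actor.stop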
Performance & Memory
Non-blocking I/O provides performance benefits in scenarios involving multiple concurrent connections or when I/O operations experience variable latency. The primary advantage comes from avoiding thread creation overhead and reducing context switching costs.
Memory usage patterns differ significantly between blocking and non-blocking approaches. Blocking I/O typically requires one thread per connection, with each thread consuming stack space (typically 1-8MB). Non-blocking I/O can handle thousands of connections with a single thread:
require 'benchmark'
require 'fcntl'
require 'socket'
def blocking_server(port, max_connections)
server = TCPServer.new('localhost', port)
threads = []
max_connections.times do |i|
threads << Thread.new do
client = server.accept
data = client.readpartial(1024)
client.write("Echo: #{data}")
client.close
end
end
threads.each(&:join)
end
def nonblocking_server(port, max_connections)
server = TCPServer.new('localhost', port)
server.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
clients = []
processed = 0
while processed < max_connections
# Accept new connections
begin
while client = server.accept_nonblock
client.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
clients << { socket: client, state: :reading, buffer: '' }
end
rescue IO::EAGAINWaitReadable
end
# Process existing connections
clients.reject! do |client_data|
socket = client_data[:socket]
begin
case client_data[:state]
when :reading
data = socket.read_nonblock(1024)
client_data[:buffer] << data
if client_data[:buffer].include?("\n")
client_data[:response] = "Echo: #{client_data[:buffer]}"
client_data[:state] = :writing
end
when :writing
written = socket.write_nonblock(client_data[:response])
client_data[:response] = client_data[:response][written..-1]
if client_data[:response].empty?
socket.close
processed += 1
next true # Remove from clients array
end
end
rescue IO::EAGAINWaitReadable, IO::EAGAINWaitWritable
# Socket not ready, continue in next iteration
rescue EOFError
socket.close
processed += 1
next true # Remove from clients array
end
false # Keep in clients array
end
end
end
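The benchmark require above becomes useful once the servers are actually driven by clients. A timing sketch for the non-blocking variant, using a hypothetical client loop (the port and connection count are arbitrary):
def drive_clients(port, count)
  count.times.map do
    Thread.new do
      client = TCPSocket.new('localhost', port)
      client.write("hello\n")
      client.read   # read the echo until the server closes the socket
      client.close
    end
  end.each(&:join)
end
elapsed = Benchmark.realtime do
  server_thread = Thread.new { nonblocking_server(3002, 100) }
  sleep 0.1   # give the server a moment to bind before clients connect
  drive_clients(3002, 100)
  server_thread.join
end
puts format("Echoed 100 connections in %.2fs", elapsed)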
Buffer management becomes critical in non-blocking scenarios. Since operations may complete partially, applications must handle incomplete reads and writes:
require 'fcntl'
class BufferedSocket
def initialize(socket)
@socket = socket
@socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@read_buffer = ''
@write_buffer = ''
end
def buffered_read_line
loop do
if line_end = @read_buffer.index("\n")
line = @read_buffer[0..line_end]
@read_buffer = @read_buffer[(line_end + 1)..-1]
return line
end
begin
data = @socket.read_nonblock(1024)
@read_buffer << data
rescue IO::EAGAINWaitReadable
return nil # No complete line available
rescue EOFError
# Return whatever was buffered exactly once; subsequent calls get nil
remaining = @read_buffer
@read_buffer = ''
return remaining.empty? ? nil : remaining
end
end
end
def buffered_write(data)
@write_buffer << data
flush_write_buffer
end
def flush_write_buffer
return true if @write_buffer.empty?
begin
written = @socket.write_nonblock(@write_buffer)
@write_buffer = @write_buffer[written..-1]
@write_buffer.empty?
rescue IO::EAGAINWaitWritable
false # Buffer not fully flushed
end
end
def has_buffered_data?
!@read_buffer.empty?
end
def write_buffer_size
@write_buffer.length
end
end
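A short usage sketch, assuming a line-oriented server is listening locally (the PING exchange is hypothetical):
require 'socket'
buffered = BufferedSocket.new(TCPSocket.new('localhost', 3000))
buffered.buffered_write("PING\n")
sleep 0.01 until buffered.flush_write_buffer   # keep flushing until drained
line = nil
sleep 0.01 until (line = buffered.buffered_read_line)
puts "Server replied: #{line.chomp}"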
CPU usage patterns in non-blocking I/O applications differ from those of blocking applications. An event loop that polls with very short timeouts consumes CPU even when no I/O is occurring, so idle applications benefit from backing off the polling interval:
class OptimizedEventLoop
def initialize
@sockets = {}
@last_activity = Time.now
@sleep_duration = 0.001 # Start with 1ms sleep
end
def add_socket(socket, handler)
socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@sockets[socket] = handler
end
def run
loop do
activity_found = false
# Selecting every socket for writability would report "ready" almost always
# and defeat the adaptive back-off, so only readiness to read is monitored
readable, = IO.select(@sockets.keys, [], [], @sleep_duration)
readable&.each do |socket|
handler = @sockets[socket]
handler.call(socket) if handler
activity_found = true
end
# Adaptive sleep duration based on activity
if activity_found
@last_activity = Time.now
@sleep_duration = 0.001
else
time_since_activity = Time.now - @last_activity
@sleep_duration = [time_since_activity * 0.1, 0.1].min
end
end
end
end
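A usage sketch that registers a listening socket whose handler accepts clients and installs an echo handler for each (port 3000 is an arbitrary choice):
require 'socket'
require 'fcntl'
event_loop = OptimizedEventLoop.new
server = TCPServer.new('localhost', 3000)
echo_handler = lambda do |client|
  data = client.read_nonblock(1024, exception: false)
  # NOTE: the loop above has no deregistration API, so a production version
  # would also need to drop sockets once they reach EOF (data == nil)
  client.write("Echo: #{data}") if data.is_a?(String)
end
event_loop.add_socket(server, lambda { |listener|
  client = listener.accept_nonblock(exception: false)
  event_loop.add_socket(client, echo_handler) if client.is_a?(TCPSocket)
})
event_loop.run   # blocks; run it from a dedicated thread if needed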
Error Handling & Debugging
Non-blocking I/O introduces specific error conditions that require careful handling. The most common exceptions are IO::EAGAINWaitReadable and IO::EAGAINWaitWritable, which indicate that an operation cannot proceed immediately but may succeed later.
These exceptions mix in the IO::WaitReadable and IO::WaitWritable modules respectively, so programs can rescue the general wait condition without knowing the underlying system error:
# NetworkError, ConnectionResetError, BrokenPipeError, and ReadTimeoutError
# are application-defined here; they are not Ruby built-ins
class NetworkError < StandardError; end
class ConnectionResetError < NetworkError; end
class BrokenPipeError < NetworkError; end
class ReadTimeoutError < NetworkError; end
def robust_nonblocking_read(socket, maxlen)
retries = 0
max_retries = 3
begin
data = socket.read_nonblock(maxlen)
return data
rescue IO::WaitReadable
retries += 1
if retries <= max_retries
IO.select([socket], nil, nil, 0.1) # Wait up to 100ms for readability
retry
else
raise ReadTimeoutError, "Socket not ready after #{max_retries} retries"
end
rescue EOFError
return nil # Connection closed gracefully
rescue Errno::ECONNRESET
raise ConnectionResetError, "Connection reset by peer"
rescue Errno::EPIPE
raise BrokenPipeError, "Broken pipe - peer closed connection"
rescue SystemCallError => e
raise NetworkError, "Unexpected system error: #{e.message}"
end
end
Race conditions commonly occur between IO.select indicating readiness and the actual I/O operation. The file descriptor may become unavailable in the brief interval between these operations:
class RobustEventLoop
def initialize
@sockets = []
@error_count = Hash.new(0)
end
def add_socket(socket)
socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@sockets << socket
end
def run
loop do
# Selecting every socket for writability would report them ready on almost
# every pass and inflate the error counters, so only read readiness is polled
ready_to_read, = IO.select(@sockets, [], [], 1.0)
ready_to_read&.each do |socket|
handle_socket_with_error_tracking(socket)
end
cleanup_errored_sockets
end
end
private
def handle_socket_with_error_tracking(socket)
begin
data = socket.read_nonblock(1024)
process_data(socket, data)
@error_count.delete(socket) # Reset error count on successful read
rescue IO::WaitReadable
# Race condition: socket became unready after select
@error_count[socket] += 1
rescue EOFError
@sockets.delete(socket)
@error_count.delete(socket)
socket.close
rescue StandardError => e
@error_count[socket] += 1
log_error(socket, e)
end
end
def cleanup_errored_sockets
@error_count.each do |socket, count|
if count > 10
puts "Removing problematic socket after #{count} errors"
@sockets.delete(socket)
@error_count.delete(socket)
socket.close rescue nil
end
end
end
def log_error(socket, error)
puts "Error on #{socket}: #{error.class} - #{error.message}"
puts error.backtrace.first(5) if error.backtrace
end
end
Debugging non-blocking I/O applications requires different techniques than traditional blocking code. State inspection becomes crucial since operations may be partially completed:
class DebuggableNonblockingServer
def initialize(port)
@server = TCPServer.new('localhost', port)
@server.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@clients = {}
@stats = {
connections_accepted: 0,
bytes_read: 0,
bytes_written: 0,
errors: Hash.new(0)
}
@debug_mode = ENV['DEBUG'] == 'true'
end
def run
loop do
accept_new_connections
process_existing_connections
print_debug_info if @debug_mode
sleep 0.01
end
end
private
def accept_new_connections
loop do
begin
client_socket = @server.accept_nonblock
client_socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
client_id = client_socket.object_id
@clients[client_id] = {
socket: client_socket,
state: :reading,
buffer: '',
bytes_read: 0,
bytes_written: 0,
created_at: Time.now
}
@stats[:connections_accepted] += 1
debug_log "Accepted connection #{client_id}"
rescue IO::WaitReadable
break # No more pending connections
rescue StandardError => e
@stats[:errors][e.class] += 1
debug_log "Error accepting connection: #{e.message}"
break
end
end
end
def process_existing_connections
@clients.each do |client_id, client_data|
begin
case client_data[:state]
when :reading
data = client_data[:socket].read_nonblock(1024)
client_data[:buffer] << data
client_data[:bytes_read] += data.length
@stats[:bytes_read] += data.length
if client_data[:buffer].include?("\n")
client_data[:response] = "Echo: #{client_data[:buffer]}"
client_data[:state] = :writing
debug_log "Client #{client_id} ready for writing"
end
when :writing
written = client_data[:socket].write_nonblock(client_data[:response])
client_data[:response] = client_data[:response][written..-1]
client_data[:bytes_written] += written
@stats[:bytes_written] += written
if client_data[:response].empty?
client_data[:socket].close
@clients.delete(client_id)
debug_log "Client #{client_id} connection closed"
end
end
rescue IO::WaitReadable, IO::WaitWritable
# Socket not ready, will try again next iteration
rescue EOFError
client_data[:socket].close
@clients.delete(client_id)
debug_log "Client #{client_id} disconnected"
rescue StandardError => e
@stats[:errors][e.class] += 1
client_data[:socket].close rescue nil
@clients.delete(client_id)
debug_log "Error with client #{client_id}: #{e.message}"
end
end
end
def print_debug_info
return unless @debug_mode && Time.now.to_i % 5 == 0 # Every 5 seconds
puts "\n=== Debug Info ==="
puts "Active connections: #{@clients.count}"
puts "Total accepted: #{@stats[:connections_accepted]}"
puts "Bytes read: #{@stats[:bytes_read]}"
puts "Bytes written: #{@stats[:bytes_written]}"
puts "Errors: #{@stats[:errors]}"
@clients.each do |client_id, data|
age = Time.now - data[:created_at]
puts " Client #{client_id}: #{data[:state]}, age: #{age.round(2)}s, " \
"read: #{data[:bytes_read]}, written: #{data[:bytes_written]}"
end
puts "=================="
end
def debug_log(message)
puts "[#{Time.now}] #{message}" if @debug_mode
end
end
Production Patterns
Production deployments of non-blocking I/O applications require careful consideration of resource limits, monitoring, and graceful degradation. Operating system limits on file descriptors often become the primary constraint for high-concurrency applications.
Connection pooling and resource management become critical when handling thousands of simultaneous connections. Applications must track and limit resource usage to prevent system exhaustion:
class ProductionServer
MAX_CONNECTIONS = 10000
CONNECTION_TIMEOUT = 300 # 5 minutes
def initialize(port)
@server = TCPServer.new('localhost', port)
@server.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@connections = {}
@connection_count = 0
@last_cleanup = Time.now
# Raise the file descriptor limit (may require elevated privileges)
begin
Process.setrlimit(Process::RLIMIT_NOFILE, 65536, 65536)
rescue Errno::EPERM
# Keep the inherited limit if it cannot be raised
end
# Initialize metrics
@metrics = {
total_connections: 0,
active_connections: 0,
bytes_processed: 0,
errors_per_minute: CircularBuffer.new(60),
response_times: CircularBuffer.new(1000)
}
# Start background threads
start_metrics_thread
start_cleanup_thread
end
def run
trap('INT') { shutdown_gracefully }
trap('TERM') { shutdown_gracefully }
puts "Server starting on port #{@server.addr[1]} (PID: #{Process.pid})"
loop do
accept_connections
process_connections
sleep 0.001 # Yield briefly to prevent CPU spinning
end
rescue SystemExit
puts "Server shutting down gracefully..."
end
private
def accept_connections
loop do
if @connection_count >= MAX_CONNECTIONS
# Reject new connections when at capacity
begin
client = @server.accept_nonblock
client.write("HTTP/1.1 503 Service Unavailable\r\n\r\n")
client.close
rescue IO::WaitReadable
break
end
next
end
begin
client_socket = @server.accept_nonblock
setup_client_connection(client_socket)
rescue IO::WaitReadable
break # No pending connections
rescue StandardError => e
record_error(e)
break
end
end
end
def setup_client_connection(socket)
socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
# Configure socket options for production
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1)
connection_id = socket.object_id
@connections[connection_id] = {
socket: socket,
created_at: Time.now,
last_activity: Time.now,
bytes_read: 0,
bytes_written: 0,
state: :reading,
request_buffer: '',
response_queue: []
}
@connection_count += 1
@metrics[:total_connections] += 1
@metrics[:active_connections] = @connection_count
end
def process_connections
return if @connections.empty?
# Use select to efficiently wait for ready sockets
readable = @connections.values.map { |conn| conn[:socket] }
writable = @connections.values.select { |conn| !conn[:response_queue].empty? }.map { |conn| conn[:socket] }
ready_read, ready_write = IO.select(readable, writable, [], 0.1)
ready_read&.each { |socket| handle_read(socket) }
ready_write&.each { |socket| handle_write(socket) }
end
def handle_read(socket)
connection = @connections[socket.object_id]
return unless connection
start_time = Time.now
begin
data = socket.read_nonblock(4096)
connection[:request_buffer] << data
connection[:bytes_read] += data.length
connection[:last_activity] = Time.now
@metrics[:bytes_processed] += data.length
# Process complete HTTP requests
while request = extract_http_request(connection[:request_buffer])
response = process_http_request(request)
connection[:response_queue] << response
end
rescue IO::WaitReadable
# Socket not ready, will try again later
rescue EOFError
close_connection(socket.object_id)
rescue StandardError => e
record_error(e)
close_connection(socket.object_id)
ensure
response_time = Time.now - start_time
@metrics[:response_times].add(response_time)
end
end
def handle_write(socket)
connection = @connections[socket.object_id]
return unless connection
return if connection[:response_queue].empty?
begin
response = connection[:response_queue].first
written = socket.write_nonblock(response)
connection[:bytes_written] += written
connection[:last_activity] = Time.now
if written == response.length
connection[:response_queue].shift
# Close connection after response (HTTP/1.0 style)
close_connection(socket.object_id) if connection[:response_queue].empty?
else
connection[:response_queue][0] = response[written..-1]
end
rescue IO::WaitWritable
# Socket not ready for writing
rescue StandardError => e
record_error(e)
close_connection(socket.object_id)
end
end
def extract_http_request(buffer)
end_of_headers = buffer.index("\r\n\r\n")
return nil unless end_of_headers
request = buffer[0..(end_of_headers + 3)]
buffer.replace(buffer[(end_of_headers + 4)..-1] || '')
request
end
def process_http_request(request)
# Simple HTTP response
body = "Hello, World! Time: #{Time.now}\n"
"HTTP/1.1 200 OK\r\n" \
"Content-Length: #{body.length}\r\n" \
"Content-Type: text/plain\r\n" \
"Connection: close\r\n" \
"\r\n" \
"#{body}"
end
def close_connection(connection_id)
connection = @connections.delete(connection_id)
return unless connection
connection[:socket].close rescue nil
@connection_count -= 1
@metrics[:active_connections] = @connection_count
end
def start_cleanup_thread
Thread.new do
loop do
sleep 30 # Run cleanup every 30 seconds
cleanup_stale_connections
end
end
end
def cleanup_stale_connections
now = Time.now
stale_connections = @connections.select do |_, conn|
(now - conn[:last_activity]) > CONNECTION_TIMEOUT
end
stale_connections.each do |connection_id, _|
puts "Closing stale connection: #{connection_id}"
close_connection(connection_id)
end
end
def start_metrics_thread
Thread.new do
loop do
sleep 60 # Log metrics every minute
log_metrics
end
end
end
def log_metrics
avg_response_time = @metrics[:response_times].average
errors_last_minute = @metrics[:errors_per_minute].sum
puts "[METRICS] Active: #{@metrics[:active_connections]}, " \
"Total: #{@metrics[:total_connections]}, " \
"Avg Response: #{avg_response_time&.round(4)}s, " \
"Errors/min: #{errors_last_minute}, " \
"Bytes: #{@metrics[:bytes_processed]}"
end
def record_error(error)
@metrics[:errors_per_minute].add(1)
puts "[ERROR] #{error.class}: #{error.message}"
end
def shutdown_gracefully
puts "Shutting down server..."
# Close all client connections
@connections.each { |id, _| close_connection(id) }
# Close server socket
@server.close rescue nil
exit(0)
end
end
class CircularBuffer
def initialize(size)
@size = size
@buffer = []
@index = 0
end
def add(value)
if @buffer.length < @size
@buffer << value
else
@buffer[@index] = value
@index = (@index + 1) % @size
end
end
def sum
@buffer.sum
end
def average
return nil if @buffer.empty?
@buffer.sum.to_f / @buffer.length
end
end
Load balancing and horizontal scaling patterns often distribute non-blocking I/O servers across multiple processes or machines. Process-based scaling avoids GIL limitations while maintaining the benefits of event-driven architecture:
require 'etc'
class ScalableServerCluster
def initialize(port, worker_count = nil)
@port = port
@worker_count = worker_count || [4, Etc.nprocessors].min
@workers = []
@master_pid = Process.pid
end
def start
puts "Starting server cluster with #{@worker_count} workers on port #{@port}"
# Create shared server socket
# Shared listening socket, inherited by every forked worker
@server = TCPServer.new('localhost', @port)
@server.listen(1024) # Large listen backlog
# Start worker processes
@worker_count.times do |i|
pid = fork do
Process.setproctitle("worker-#{i}")
worker = ProductionServer.new(nil) # Pass nil since we have server socket
worker.instance_variable_set(:@server, @server)
worker.run
end
@workers << { pid: pid, worker_id: i }
puts "Started worker #{i} (PID: #{pid})"
end
# Master process monitoring
setup_signal_handlers
monitor_workers
end
private
def setup_signal_handlers
trap('INT') { shutdown_cluster }
trap('TERM') { shutdown_cluster }
trap('CHLD') { handle_worker_death }
end
def monitor_workers
loop do
sleep 5
check_worker_health
end
end
def check_worker_health
@workers.each do |worker|
unless process_exists?(worker[:pid])
puts "Worker #{worker[:worker_id]} (PID: #{worker[:pid]}) died, restarting..."
restart_worker(worker)
end
end
end
def restart_worker(dead_worker)
# Reuse the shared listening socket; binding the port again would fail
# with EADDRINUSE while the original socket is still open
new_pid = fork do
Process.setproctitle("worker-#{dead_worker[:worker_id]}")
worker = ProductionServer.new(nil)
worker.instance_variable_set(:@server, @server)
worker.run
end
dead_worker[:pid] = new_pid
puts "Restarted worker #{dead_worker[:worker_id]} (new PID: #{new_pid})"
end
def process_exists?(pid)
Process.getpgid(pid)
true
rescue Errno::ESRCH
false
end
def handle_worker_death
# Reap zombie processes
loop do
pid = Process.waitpid(-1, Process::WNOHANG)
break unless pid
puts "Reaped worker process #{pid}"
end
rescue Errno::ECHILD
# No child processes
end
def shutdown_cluster
puts "Shutting down cluster..."
@workers.each do |worker|
begin
Process.kill('TERM', worker[:pid])
rescue Errno::ESRCH
# Process already dead
end
end
# Wait for workers to shut down gracefully
timeout = Time.now + 10
while Time.now < timeout && @workers.any? { |w| process_exists?(w[:pid]) }
sleep 0.1
end
# Force kill any remaining workers
@workers.each do |worker|
begin
Process.kill('KILL', worker[:pid])
rescue Errno::ESRCH
end
end
exit(0)
end
end
Reference
Core Methods
Method | Parameters | Returns | Description |
---|---|---|---|
IO#read_nonblock(maxlen, outbuf=nil) | maxlen (Integer), outbuf (String) | String | Read up to maxlen bytes without blocking |
IO#write_nonblock(string) | string (String) | Integer | Write string without blocking; returns bytes written |
TCPServer#accept_nonblock(**opts) | exception (Boolean) | TCPSocket | Accept a connection without blocking |
IO.select(read_ios, write_ios=[], error_ios=[], timeout=nil) | Arrays of IO objects, timeout (Numeric) | Array of three Arrays, or nil on timeout | Monitor multiple IO objects for readiness |
IO#fcntl(cmd, arg=0) | cmd (Integer), arg (Integer) | Integer | Perform control operations on the file descriptor |
Exception Hierarchy
Exception | Ancestry | Description |
---|---|---|
IO::WaitReadable | Module mixed into read-wait exceptions | Marks conditions where a read should be retried once the IO is readable |
IO::WaitWritable | Module mixed into write-wait exceptions | Marks conditions where a write should be retried once the IO is writable |
IO::EAGAINWaitReadable | Errno::EAGAIN + IO::WaitReadable | No data available for reading |
IO::EAGAINWaitWritable | Errno::EAGAIN + IO::WaitWritable | Cannot write data immediately |
IO::EWOULDBLOCKWaitReadable | Errno::EWOULDBLOCK + IO::WaitReadable | Operation would block (reading) |
IO::EWOULDBLOCKWaitWritable | Errno::EWOULDBLOCK + IO::WaitWritable | Operation would block (writing) |
File Control Constants
Constant | Value | Description |
---|---|---|
Fcntl::F_GETFL | Platform-specific | Get file descriptor flags |
Fcntl::F_SETFL | Platform-specific | Set file descriptor flags |
Fcntl::O_NONBLOCK | Platform-specific | Non-blocking I/O flag |
Fcntl::O_RDONLY | 0 | Read-only access mode |
Fcntl::O_WRONLY | 1 | Write-only access mode |
Fcntl::O_RDWR | 2 | Read-write access mode |
Socket Options
Method / Constant | Parameters | Description |
---|---|---|
Socket#setsockopt(level, optname, optval) | level (Integer), optname (Integer), optval | Set a socket option |
Socket::SOL_SOCKET | - | Socket level for generic socket options |
Socket::IPPROTO_TCP | - | TCP protocol level |
Socket::TCP_NODELAY | - | Disable Nagle's algorithm |
Socket::SO_KEEPALIVE | - | Enable keep-alive messages |
IO.select Return Values
Position | Content | Description |
---|---|---|
[0] | Array<IO> | IO objects ready for reading |
[1] | Array<IO> | IO objects ready for writing |
[2] | Array<IO> | IO objects with exceptional conditions |
Note that IO.select returns nil as a whole when the timeout expires before any descriptor becomes ready.
Common Error Codes
Error | Errno (Linux) | Common Cause |
---|---|---|
EAGAIN | 11 | Resource temporarily unavailable |
EWOULDBLOCK | 11 | Operation would block (usually the same value as EAGAIN) |
ECONNRESET | 104 | Connection reset by peer |
EPIPE | 32 | Broken pipe (write to closed socket) |
EBADF | 9 | Bad file descriptor |
EINTR | 4 | Interrupted system call |
Buffer Size Guidelines
Operation Type | Recommended Size | Rationale |
---|---|---|
Socket read | 4096-8192 bytes | Balance between system calls and memory usage |
File read | 64KB-1MB | Larger buffers reduce system call overhead |
Socket write | 4096-65536 bytes | Avoid partial writes while limiting memory |
Pipe operations | 4096 bytes | Match typical pipe buffer size |
Performance Considerations
Scenario | Blocking I/O | Non-blocking I/O | Recommendation |
---|---|---|---|
< 100 connections | Lower CPU overhead | Higher CPU overhead | Use blocking I/O |
100-1000 connections | High memory usage | Moderate memory usage | Consider non-blocking |
> 1000 connections | Excessive memory/threads | Efficient resource usage | Use non-blocking I/O |
CPU-bound tasks | Better thread utilization | Single-threaded bottleneck | Use blocking with threads |
I/O-bound tasks | Thread context switching | Efficient event loop | Use non-blocking I/O |