CrackedRuby

Non-blocking I/O

Non-blocking I/O in Ruby provides mechanisms to perform input/output operations without blocking the executing thread, enabling concurrent processing and improved application responsiveness.

Overview

Non-blocking I/O operations in Ruby allow programs to initiate I/O requests that return immediately, regardless of whether the operation completed. Ruby implements non-blocking I/O through several mechanisms: the *_nonblock method family (read_nonblock, write_nonblock, accept_nonblock), IO.select, IO#wait_readable and IO#wait_writable, and higher-level abstractions like Fibers and event loops.

The core principle involves setting file descriptors to non-blocking mode, where read and write operations return immediately if no data is available or if the operation cannot complete. The *_nonblock methods set O_NONBLOCK on the underlying descriptor themselves before making the system call. When an operation cannot proceed, Ruby raises IO::EAGAINWaitReadable or IO::EAGAINWaitWritable rather than blocking the thread.

require 'socket'
require 'fcntl'  # Provides the Fcntl constants used below

server = TCPServer.new('localhost', 3000)
server.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)

begin
  client = server.accept_nonblock
  puts "New client connected: #{client}"
rescue IO::EAGAINWaitReadable
  puts "No pending connections"
end

Ruby's IO.select method provides multiplexed I/O monitoring, allowing programs to wait for multiple file descriptors to become ready for reading or writing:

readable, writable = IO.select([server], [], [], 0.1)
if readable&.include?(server)
  client = server.accept_nonblock
end
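The return value of IO.select is worth seeing in isolation. The sketch below uses a pipe rather than a socket so it runs without a network; the helper name readable_within? is hypothetical and not part of Ruby. It relies on IO.select returning nil when the timeout expires and an array of three arrays otherwise.

```ruby
# Hypothetical helper: true if `io` becomes readable within `timeout` seconds.
def readable_within?(io, timeout)
  ready = IO.select([io], nil, nil, timeout)  # nil when the timeout expires
  !ready.nil? && ready[0].include?(io)
end

reader, writer = IO.pipe
puts readable_within?(reader, 0.05)  # nothing has been written yet
writer.write("ping")
puts readable_within?(reader, 0.05)  # data is now waiting in the pipe
```

Because the whole return value is nil on timeout, callers usually guard with the safe navigation operator (readable&.each) as the surrounding examples do.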

The Fiber class works with non-blocking I/O to create cooperative multitasking patterns. When a non-blocking operation would block, the fiber can yield control to other fibers, creating async-like behavior:

fiber = Fiber.new do
  begin
    data = socket.read_nonblock(1024)
    process_data(data)
  rescue IO::EAGAINWaitReadable
    Fiber.yield(:wait_readable)
    retry
  end
end
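The fragment above leaves socket and process_data undefined. A self-contained variant, sketched here with a pipe and a hypothetical helper named reading_fiber, shows the full cycle: the fiber yields when the read would block and finishes once it is resumed after data arrives.

```ruby
# Hypothetical helper: builds a fiber that appends one chunk from `io` to `sink`.
def reading_fiber(io, sink)
  Fiber.new do
    begin
      sink << io.read_nonblock(1024)
    rescue IO::WaitReadable
      Fiber.yield(:wait_readable)  # Hand control back to whoever resumed us
      retry                        # Resumed again: attempt the read once more
    end
    :done
  end
end

reader, writer = IO.pipe
received = []
fiber = reading_fiber(reader, received)

status = fiber.resume     # The pipe is empty, so the fiber yields
writer.write("hello")
IO.select([reader])       # Wait until the data is readable
status = fiber.resume if status == :wait_readable
puts "#{status}: #{received.inspect}"
```

The caller plays the role of a scheduler here: it decides when the fiber runs again, typically after IO.select reports the descriptor ready.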

Basic Usage

Non-blocking I/O operations begin with configuring file descriptors for non-blocking behavior. The fcntl system call sets the O_NONBLOCK flag, making subsequent operations return immediately rather than waiting for completion.

require 'socket'
require 'fcntl'

socket = TCPSocket.new('example.com', 80)
socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)

# Non-blocking write
begin
  bytes_written = socket.write_nonblock("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
  puts "Wrote #{bytes_written} bytes"
rescue IO::EAGAINWaitWritable
  puts "Socket not ready for writing"
end
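Calling fcntl with F_SETFL and only O_NONBLOCK replaces every other status flag on the descriptor. A more conservative sketch reads the current flags first and ORs the new one in; the helper name enable_nonblock is an assumption made for illustration.

```ruby
require 'fcntl'

# Hypothetical helper: turn on O_NONBLOCK without discarding existing flags.
def enable_nonblock(io)
  flags = io.fcntl(Fcntl::F_GETFL)                     # Current status flags
  io.fcntl(Fcntl::F_SETFL, flags | Fcntl::O_NONBLOCK)  # Add O_NONBLOCK to them
  io
end

reader, _writer = IO.pipe
enable_nonblock(reader)
nonblocking = (reader.fcntl(Fcntl::F_GETFL) & Fcntl::O_NONBLOCK) != 0
puts "O_NONBLOCK set: #{nonblocking}"
```

Since the *_nonblock methods set O_NONBLOCK themselves, the explicit fcntl call matters mainly when the descriptor is handed to code that uses the ordinary read and write methods.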

Reading data non-blockingly requires handling the absence of available data gracefully. The read_nonblock method attempts to read the specified number of bytes, raising an exception if no data is available:

begin
  data = socket.read_nonblock(1024)
  puts "Received: #{data.length} bytes"
rescue IO::EAGAINWaitReadable
  puts "No data available for reading"
rescue EOFError
  puts "Connection closed by peer"
end
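When exceptions are inconvenient for control flow, read_nonblock also accepts exception: false. In that mode it returns :wait_readable instead of raising when no data is available, and nil at end of file. The wrapper below is a minimal sketch; try_read and its return symbols are hypothetical names.

```ruby
# Hypothetical wrapper that maps read_nonblock's results to explicit states.
def try_read(io, maxlen = 1024)
  case (result = io.read_nonblock(maxlen, exception: false))
  when :wait_readable then :not_ready  # Would have raised IO::WaitReadable
  when nil            then :eof        # Would have raised EOFError
  else result                          # A String with the bytes that were read
  end
end

reader, writer = IO.pipe
p try_read(reader)   # pipe is still empty
writer.write("hello")
p try_read(reader)   # the bytes just written
writer.close
p try_read(reader)   # writer closed and pipe drained
```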

The IO.select method monitors multiple file descriptors simultaneously, returning arrays of descriptors ready for reading, writing, or having exceptional conditions. The timeout parameter prevents indefinite blocking:

sockets = [socket1, socket2, socket3]
readable, writable, exceptional = IO.select(sockets, sockets, sockets, 1.0)

readable&.each do |sock|
  begin
    data = sock.read_nonblock(1024)
    handle_incoming_data(sock, data)
  rescue IO::EAGAINWaitReadable
    # Handle race condition where select indicated ready but no data available
  end
end

File I/O also supports non-blocking operations, though the behavior differs from network sockets. Regular files generally don't block on reads or writes, but pipes, FIFOs, and device files can benefit from non-blocking operations:

require 'fcntl'

pipe_read, pipe_write = IO.pipe
pipe_read.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)

begin
  data = pipe_read.read_nonblock(1024)
  puts "Read from pipe: #{data}"
rescue IO::EAGAINWaitReadable
  puts "Pipe empty"
end
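The write side of a pipe shows the opposite condition: once the kernel buffer is full, a non-blocking write cannot proceed. The sketch below fills a pipe until write_nonblock reports :wait_writable; fill_pipe is a hypothetical helper, and the total it prints depends on the platform's pipe capacity.

```ruby
# Hypothetical helper: write fixed-size chunks until the pipe refuses more data.
def fill_pipe(writer, chunk_size = 4096)
  chunk = "x" * chunk_size
  total = 0
  loop do
    result = writer.write_nonblock(chunk, exception: false)
    return total if result == :wait_writable  # Kernel buffer is full
    total += result                           # Bytes accepted by this call
  end
end

reader, writer = IO.pipe
puts "Pipe accepted #{fill_pipe(writer)} bytes before it would block"
```

A blocking write at this point would stall the thread until a reader drained the pipe, which is exactly the situation non-blocking mode avoids.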

Thread Safety & Concurrency

Non-blocking I/O operations interact with Ruby's Global VM Lock (GVL, often called the GIL) and threading model in specific ways. Blocking I/O operations release the GVL while they wait, but non-blocking operations return quickly and so provide fewer opportunities for thread switching.

Thread safety concerns arise when multiple threads access the same file descriptor. Ruby's I/O objects include internal synchronization, but application-level coordination remains necessary for consistent behavior:

require 'socket'
require 'fcntl'

class ThreadSafeSocket
  def initialize(socket)
    @socket = socket
    @read_mutex = Mutex.new
    @write_mutex = Mutex.new
    @socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
  end

  def safe_read_nonblock(maxlen)
    @read_mutex.synchronize do
      @socket.read_nonblock(maxlen)
    end
    # IO::WaitReadable propagates to the caller, which should wait
    # (for example with IO.select) and then retry.
  end

  def safe_write_nonblock(data)
    @write_mutex.synchronize do
      @socket.write_nonblock(data)
    end
    # IO::WaitWritable propagates to the caller, which should wait
    # (for example with IO.select) and then retry.
  end
end

Fiber-based concurrency provides cooperative multitasking that works naturally with non-blocking I/O. Fibers can yield when I/O operations would block, allowing other fibers to execute:

class FiberScheduler
  def initialize
    @readable_fibers = {}
    @writable_fibers = {}
  end

  def wait_readable(io, fiber)
    @readable_fibers[io] = fiber
  end

  def wait_writable(io, fiber)
    @writable_fibers[io] = fiber
  end

  def run
    loop do
      readable, writable = IO.select(@readable_fibers.keys, @writable_fibers.keys, [], 0.1)
      
      readable&.each do |io|
        fiber = @readable_fibers.delete(io)
        fiber.resume if fiber
      end
      
      writable&.each do |io|
        fiber = @writable_fibers.delete(io)
        fiber.resume if fiber
      end
      
      break if @readable_fibers.empty? && @writable_fibers.empty?
    end
  end
end

def async_read(socket, scheduler)
  Fiber.new do
    begin
      data = socket.read_nonblock(1024)
      yield data
    rescue IO::EAGAINWaitReadable
      scheduler.wait_readable(socket, Fiber.current)
      Fiber.yield
      retry
    end
  end
end

Actor-pattern implementations often combine non-blocking I/O with message passing between threads. Each actor manages its own file descriptors while communicating through thread-safe queues:

require 'socket'
require 'fcntl'

class IOActor
  def initialize
    @inbox = Queue.new
    @sockets = []
    @running = true
    @thread = Thread.new { run }
  end

  def add_socket(socket)
    socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    @inbox.push([:add_socket, socket])
  end

  def stop
    @inbox.push([:stop])
    @thread.join
  end

  private

  def run
    while @running
      # Process messages from inbox
      while !@inbox.empty?
        command, data = @inbox.pop(true) rescue nil
        next unless command
        
        case command
        when :add_socket
          @sockets << data
        when :stop
          @running = false
        end
      end

      # Handle I/O on registered sockets
      if @sockets.empty?
        sleep 0.05  # Avoid spinning at full CPU while no sockets are registered
        next
      end
      
      readable, = IO.select(@sockets, [], [], 0.1)
      readable&.each do |socket|
        handle_socket_data(socket)
      end
    end
  end

  def handle_socket_data(socket)
    begin
      data = socket.read_nonblock(1024)
      process_received_data(socket, data)
    rescue IO::EAGAINWaitReadable
      # Socket became unready between select and read
    rescue EOFError
      @sockets.delete(socket)
      socket.close
    end
  end
end

Performance & Memory

Non-blocking I/O provides performance benefits in scenarios involving multiple concurrent connections or when I/O operations experience variable latency. The primary advantage comes from avoiding thread creation overhead and reducing context switching costs.

Memory usage patterns differ significantly between blocking and non-blocking approaches. Blocking I/O typically requires one thread per connection, and each thread reserves its own stack (on the order of a megabyte or more, depending on the platform and Ruby configuration). Non-blocking I/O can handle thousands of connections with a single thread:

require 'fcntl'
require 'socket'

def blocking_server(port, max_connections)
  server = TCPServer.new('localhost', port)
  threads = []
  
  max_connections.times do |i|
    threads << Thread.new do
      client = server.accept
      data = client.readpartial(1024)  # read(1024) would wait for 1024 bytes or EOF
      client.write("Echo: #{data}")
      client.close
    end
  end
  
  threads.each(&:join)
end

def nonblocking_server(port, max_connections)
  server = TCPServer.new('localhost', port)
  server.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
  
  clients = []
  processed = 0
  
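  while processed < max_connections
    # Wait briefly for activity instead of spinning at full CPU
    IO.select([server] + clients.map { |c| c[:socket] }, nil, nil, 0.05)

    # Accept new connections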
    begin
      while client = server.accept_nonblock
        client.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
        clients << { socket: client, state: :reading, buffer: '' }
      end
    rescue IO::EAGAINWaitReadable
    end
    
    # Process existing connections
    clients.reject! do |client_data|
      socket = client_data[:socket]
      
      begin
        case client_data[:state]
        when :reading
          data = socket.read_nonblock(1024)
          client_data[:buffer] << data
          if client_data[:buffer].include?("\n")
            client_data[:response] = "Echo: #{client_data[:buffer]}"
            client_data[:state] = :writing
          end
        when :writing
          written = socket.write_nonblock(client_data[:response])
          client_data[:response] = client_data[:response].byteslice(written..-1)  # written is a byte count
          if client_data[:response].empty?
            socket.close
            processed += 1
            next true  # Remove from clients array
          end
        end
      rescue IO::EAGAINWaitReadable, IO::EAGAINWaitWritable
        # Socket not ready, continue in next iteration
      rescue EOFError
        socket.close
        processed += 1
        next true  # Remove from clients array
      end
      
      false  # Keep in clients array
    end
  end
end

Buffer management becomes critical in non-blocking scenarios. Since operations may complete partially, applications must handle incomplete reads and writes:

class BufferedSocket
  def initialize(socket)
    @socket = socket
    @socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    @read_buffer = ''
    @write_buffer = ''
  end

  def buffered_read_line
    loop do
      if line_end = @read_buffer.index("\n")
        line = @read_buffer[0..line_end]
        @read_buffer = @read_buffer[(line_end + 1)..-1]
        return line
      end

      begin
        data = @socket.read_nonblock(1024)
        @read_buffer << data
      rescue IO::EAGAINWaitReadable
        return nil  # No complete line available
      rescue EOFError
        return @read_buffer.empty? ? nil : @read_buffer
      end
    end
  end

  def buffered_write(data)
    @write_buffer << data
    flush_write_buffer
  end

  def flush_write_buffer
    return true if @write_buffer.empty?

    begin
      written = @socket.write_nonblock(@write_buffer)
      @write_buffer = @write_buffer.byteslice(written..-1)  # written is a byte count
      @write_buffer.empty?
    rescue IO::EAGAINWaitWritable
      false  # Buffer not fully flushed
    end
  end

  def has_buffered_data?
    !@read_buffer.empty?
  end

  def write_buffer_size
    @write_buffer.length
  end
end
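The partial-write handling above can be condensed into a single loop. The following is a minimal sketch, not part of BufferedSocket: write_all is a hypothetical helper that keeps writing until every byte is accepted, waiting with IO.select whenever the descriptor is not writable. It slices by bytes because write_nonblock returns a byte count, which differs from the character count for multibyte strings.

```ruby
# Hypothetical helper: write all of `data`, waiting whenever the write would block.
def write_all(io, data, timeout: 1.0)
  remaining = data.b  # Binary copy so offsets are in bytes
  until remaining.empty?
    result = io.write_nonblock(remaining, exception: false)
    if result == :wait_writable
      # Block (up to timeout) until the descriptor can accept more data
      raise IOError, "write timed out" unless IO.select(nil, [io], nil, timeout)
    else
      remaining = remaining.byteslice(result..-1)  # Drop the bytes already written
    end
  end
  data.bytesize
end

reader, writer = IO.pipe
message = "h\u00e9llo w\u00f6rld\n"
puts "Wrote #{write_all(writer, message)} bytes for #{message.length} characters"
```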

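CPU usage patterns in non-blocking I/O applications differ from those of blocking applications. An event loop that polls without waiting, or with a very short timeout, consumes CPU continuously even when no I/O is occurring. Passing a timeout to IO.select and lengthening it while the loop is idle reduces that polling overhead: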

class OptimizedEventLoop
  def initialize
    @sockets = {}
    @last_activity = Time.now
    @sleep_duration = 0.001  # Start with 1ms sleep
  end

  def add_socket(socket, handler)
    socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    @sockets[socket] = handler
  end

  def run
    loop do
      activity_found = false
      # Watch for readability only: an idle socket is almost always writable,
      # so selecting on writability too would make select return immediately.
      readable, = IO.select(@sockets.keys, nil, nil, @sleep_duration)

      readable&.each do |socket|
        handler = @sockets[socket]
        handler.call(socket) if handler
        activity_found = true
      end
      
      # Adaptive sleep duration based on activity
      if activity_found
        @last_activity = Time.now
        @sleep_duration = 0.001
      else
        time_since_activity = Time.now - @last_activity
        @sleep_duration = [time_since_activity * 0.1, 0.1].min
      end
    end
  end
end
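For a single descriptor, IO#wait_readable offers a simpler way to idle without polling. The sketch below assumes the io/wait library (part of Ruby's standard distribution); wait_for_data is a hypothetical wrapper that reduces the result to a boolean.

```ruby
require 'io/wait'

# Hypothetical wrapper: sleep until `io` is readable or `timeout` seconds pass.
def wait_for_data(io, timeout)
  io.wait_readable(timeout) ? true : false  # wait_readable returns nil on timeout
end

reader, writer = IO.pipe
puts wait_for_data(reader, 0.05)  # no data arrives within the timeout
writer.write("x")
puts wait_for_data(reader, 0.05)  # data is already waiting
```

The process sleeps inside the kernel during the wait, so an idle loop built this way uses no CPU until data arrives or the timeout expires.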

Error Handling & Debugging

Non-blocking I/O introduces specific error conditions that require careful handling. The most common exceptions are IO::EAGAINWaitReadable and IO::EAGAINWaitWritable, which indicate that an operation cannot proceed immediately but may succeed later.

These exception classes include the IO::WaitReadable and IO::WaitWritable modules respectively, allowing programs to rescue the general case without specific knowledge of the underlying system error:

require 'timeout'

# Application-specific error classes (names are illustrative)
class NetworkError < StandardError; end
class ConnectionResetError < NetworkError; end
class BrokenPipeError < NetworkError; end

def robust_nonblocking_read(socket, maxlen)
  retries = 0
  max_retries = 3

  begin
    socket.read_nonblock(maxlen)
  rescue IO::WaitReadable
    retries += 1
    if retries <= max_retries
      IO.select([socket], nil, nil, 0.1)  # Wait up to 100ms
      retry
    else
      raise Timeout::Error, "Socket not ready after #{max_retries} retries"
    end
  rescue EOFError
    nil  # Connection closed gracefully
  rescue Errno::ECONNRESET
    raise ConnectionResetError, "Connection reset by peer"
  rescue Errno::EPIPE
    raise BrokenPipeError, "Broken pipe - peer closed connection"
  rescue SystemCallError => e
    raise NetworkError, "Unexpected system error: #{e.message}"
  end
end
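The order of rescue clauses matters because the would-block exceptions are also SystemCallError subclasses. The classifier below is a small sketch of that ordering; classify_io_error and the symbols it returns are hypothetical.

```ruby
# Hypothetical classifier: most specific conditions first, SystemCallError last.
def classify_io_error(error)
  case error
  when IO::WaitReadable then :retry_when_readable
  when IO::WaitWritable then :retry_when_writable
  when EOFError         then :closed
  when SystemCallError  then :system_error
  else :unknown
  end
end

reader, _writer = IO.pipe
begin
  reader.read_nonblock(1)
rescue StandardError => e
  puts "#{e.class} -> #{classify_io_error(e)}"
end
puts classify_io_error(Errno::ECONNRESET.new)
```

Checking SystemCallError before IO::WaitReadable would classify an ordinary would-block condition as a system error, since IO::EAGAINWaitReadable descends from Errno::EAGAIN.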

Race conditions commonly occur between IO.select indicating readiness and the actual I/O operation. The file descriptor may become unavailable in the brief interval between these operations:

class RobustEventLoop
  def initialize
    @sockets = []
    @error_count = Hash.new(0)
  end

  def add_socket(socket)
    socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    @sockets << socket
  end

  def run
    loop do
      # Only readability is monitored: the handler below performs reads, and
      # idle sockets are nearly always writable, which would defeat the timeout.
      ready_to_read, = IO.select(@sockets, nil, nil, 1.0)

      ready_to_read&.each do |socket|
        handle_socket_with_error_tracking(socket)
      end
      
      cleanup_errored_sockets
    end
  end

  private

  def handle_socket_with_error_tracking(socket)
    begin
      data = socket.read_nonblock(1024)
      process_data(socket, data)
      @error_count.delete(socket)  # Reset error count on successful read
    rescue IO::WaitReadable
      # Race condition: socket became unready after select
      @error_count[socket] += 1
    rescue EOFError
      @sockets.delete(socket)
      @error_count.delete(socket)
      socket.close
    rescue StandardError => e
      @error_count[socket] += 1
      log_error(socket, e)
    end
  end

  def cleanup_errored_sockets
    @error_count.each do |socket, count|
      if count > 10
        puts "Removing problematic socket after #{count} errors"
        @sockets.delete(socket)
        @error_count.delete(socket)
        socket.close rescue nil
      end
    end
  end

  def log_error(socket, error)
    puts "Error on #{socket}: #{error.class} - #{error.message}"
    puts error.backtrace.first(5) if error.backtrace
  end
end

Debugging non-blocking I/O applications requires different techniques than traditional blocking code. State inspection becomes crucial since operations may be partially completed:

class DebuggableNonblockingServer
  def initialize(port)
    @server = TCPServer.new('localhost', port)
    @server.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    @clients = {}
    @stats = {
      connections_accepted: 0,
      bytes_read: 0,
      bytes_written: 0,
      errors: Hash.new(0)
    }
    @debug_mode = ENV['DEBUG'] == 'true'
  end

  def run
    loop do
      accept_new_connections
      process_existing_connections
      print_debug_info if @debug_mode
      sleep 0.01
    end
  end

  private

  def accept_new_connections
    loop do
      begin
        client_socket = @server.accept_nonblock
        client_socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
        
        client_id = client_socket.object_id
        @clients[client_id] = {
          socket: client_socket,
          state: :reading,
          buffer: '',
          bytes_read: 0,
          bytes_written: 0,
          created_at: Time.now
        }
        
        @stats[:connections_accepted] += 1
        debug_log "Accepted connection #{client_id}"
        
      rescue IO::WaitReadable
        break  # No more pending connections
      rescue StandardError => e
        @stats[:errors][e.class] += 1
        debug_log "Error accepting connection: #{e.message}"
        break
      end
    end
  end

  def process_existing_connections
    @clients.each do |client_id, client_data|
      begin
        case client_data[:state]
        when :reading
          data = client_data[:socket].read_nonblock(1024)
          client_data[:buffer] << data
          client_data[:bytes_read] += data.length
          @stats[:bytes_read] += data.length
          
          if client_data[:buffer].include?("\n")
            client_data[:response] = "Echo: #{client_data[:buffer]}"
            client_data[:state] = :writing
            debug_log "Client #{client_id} ready for writing"
          end
          
        when :writing
          written = client_data[:socket].write_nonblock(client_data[:response])
          client_data[:response] = client_data[:response].byteslice(written..-1)  # written is a byte count
          client_data[:bytes_written] += written
          @stats[:bytes_written] += written
          
          if client_data[:response].empty?
            client_data[:socket].close
            @clients.delete(client_id)
            debug_log "Client #{client_id} connection closed"
          end
        end
        
      rescue IO::WaitReadable, IO::WaitWritable
        # Socket not ready, will try again next iteration
      rescue EOFError
        client_data[:socket].close
        @clients.delete(client_id)
        debug_log "Client #{client_id} disconnected"
      rescue StandardError => e
        @stats[:errors][e.class] += 1
        client_data[:socket].close rescue nil
        @clients.delete(client_id)
        debug_log "Error with client #{client_id}: #{e.message}"
      end
    end
  end

  def print_debug_info
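    now = Time.now
    # Print at most once every 5 seconds (the loop itself runs about every 10ms)
    return unless @debug_mode && (@last_debug_at.nil? || now - @last_debug_at >= 5)
    @last_debug_at = now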
    
    puts "\n=== Debug Info ==="
    puts "Active connections: #{@clients.count}"
    puts "Total accepted: #{@stats[:connections_accepted]}"
    puts "Bytes read: #{@stats[:bytes_read]}"
    puts "Bytes written: #{@stats[:bytes_written]}"
    puts "Errors: #{@stats[:errors]}"
    
    @clients.each do |client_id, data|
      age = Time.now - data[:created_at]
      puts "  Client #{client_id}: #{data[:state]}, age: #{age.round(2)}s, " \
           "read: #{data[:bytes_read]}, written: #{data[:bytes_written]}"
    end
    puts "=================="
  end

  def debug_log(message)
    puts "[#{Time.now}] #{message}" if @debug_mode
  end
end

Production Patterns

Production deployments of non-blocking I/O applications require careful consideration of resource limits, monitoring, and graceful degradation. Operating system limits on file descriptors often become the primary constraint for high-concurrency applications.
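Before sizing a connection limit, it helps to check what the process is actually allowed to open. The sketch below reads the soft and hard descriptor limits with Process.getrlimit; fd_limits is a hypothetical helper, and the call is only available on platforms that support resource limits.

```ruby
# Hypothetical helper: report the soft and hard limits on open file descriptors.
def fd_limits
  soft, hard = Process.getrlimit(Process::RLIMIT_NOFILE)
  { soft: soft, hard: hard }
end

limits = fd_limits
puts "File descriptors: soft limit #{limits[:soft]}, hard limit #{limits[:hard]}"
```

An unprivileged process can raise its soft limit up to the hard limit but not beyond, so a connection cap should stay below the soft limit with headroom for log files and other descriptors.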

Connection pooling and resource management become critical when handling thousands of simultaneous connections. Applications must track and limit resource usage to prevent system exhaustion:

class ProductionServer
  MAX_CONNECTIONS = 10000
  CONNECTION_TIMEOUT = 300  # 5 minutes
  
  def initialize(port)
    @server = TCPServer.new('localhost', port)
    @server.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    
    @connections = {}
    @connection_count = 0
    @last_cleanup = Time.now
    
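    # Raise the soft file-descriptor limit as far as the hard limit allows;
    # requesting more than the hard limit raises Errno::EPERM without privileges
    _soft, hard = Process.getrlimit(Process::RLIMIT_NOFILE)
    Process.setrlimit(Process::RLIMIT_NOFILE, [65536, hard].min, hard)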
    
    # Initialize metrics
    @metrics = {
      total_connections: 0,
      active_connections: 0,
      bytes_processed: 0,
      errors_per_minute: CircularBuffer.new(60),
      response_times: CircularBuffer.new(1000)
    }
    
    # Start background threads
    start_metrics_thread
    start_cleanup_thread
  end

  def run
    trap('INT') { shutdown_gracefully }
    trap('TERM') { shutdown_gracefully }
    
    puts "Server starting on port #{@server.addr[1]} (PID: #{Process.pid})"
    
    loop do
      accept_connections
      process_connections
      sleep 0.001  # Yield briefly to prevent CPU spinning
    end
  rescue SystemExit
    puts "Server shutting down gracefully..."
  end

  private

  def accept_connections
    loop do
      if @connection_count >= MAX_CONNECTIONS
        # Reject new connections when at capacity
        begin
          client = @server.accept_nonblock
          client.write("HTTP/1.1 503 Service Unavailable\r\n\r\n")
          client.close
        rescue IO::WaitReadable
          break
        end
        next
      end

      begin
        client_socket = @server.accept_nonblock
        setup_client_connection(client_socket)
      rescue IO::WaitReadable
        break  # No pending connections
      rescue StandardError => e
        record_error(e)
        break
      end
    end
  end

  def setup_client_connection(socket)
    socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    
    # Configure socket options for production
    socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
    socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1)
    
    connection_id = socket.object_id
    @connections[connection_id] = {
      socket: socket,
      created_at: Time.now,
      last_activity: Time.now,
      bytes_read: 0,
      bytes_written: 0,
      state: :reading,
      request_buffer: '',
      response_queue: []
    }
    
    @connection_count += 1
    @metrics[:total_connections] += 1
    @metrics[:active_connections] = @connection_count
  end

  def process_connections
    return if @connections.empty?

    # Use select to efficiently wait for ready sockets
    readable = @connections.values.map { |conn| conn[:socket] }
    writable = @connections.values.select { |conn| !conn[:response_queue].empty? }.map { |conn| conn[:socket] }
    
    ready_read, ready_write = IO.select(readable, writable, [], 0.1)
    
    ready_read&.each { |socket| handle_read(socket) }
    ready_write&.each { |socket| handle_write(socket) }
  end

  def handle_read(socket)
    connection = @connections[socket.object_id]
    return unless connection
    
    start_time = Time.now
    
    begin
      data = socket.read_nonblock(4096)
      connection[:request_buffer] << data
      connection[:bytes_read] += data.length
      connection[:last_activity] = Time.now
      @metrics[:bytes_processed] += data.length
      
      # Process complete HTTP requests
      while request = extract_http_request(connection[:request_buffer])
        response = process_http_request(request)
        connection[:response_queue] << response
      end
      
    rescue IO::WaitReadable
      # Socket not ready, will try again later
    rescue EOFError
      close_connection(socket.object_id)
    rescue StandardError => e
      record_error(e)
      close_connection(socket.object_id)
    ensure
      response_time = Time.now - start_time
      @metrics[:response_times].add(response_time)
    end
  end

  def handle_write(socket)
    connection = @connections[socket.object_id]
    return unless connection
    
    return if connection[:response_queue].empty?
    
    begin
      response = connection[:response_queue].first
      written = socket.write_nonblock(response)
      connection[:bytes_written] += written
      connection[:last_activity] = Time.now
      
      if written == response.bytesize  # write_nonblock returns a byte count
        connection[:response_queue].shift
        # Close connection after response (HTTP/1.0 style)
        close_connection(socket.object_id) if connection[:response_queue].empty?
      else
        connection[:response_queue][0] = response.byteslice(written..-1)
      end
      
    rescue IO::WaitWritable
      # Socket not ready for writing
    rescue StandardError => e
      record_error(e)
      close_connection(socket.object_id)
    end
  end

  def extract_http_request(buffer)
    end_of_headers = buffer.index("\r\n\r\n")
    return nil unless end_of_headers
    
    request = buffer[0..(end_of_headers + 3)]
    buffer.replace(buffer[(end_of_headers + 4)..-1] || '')
    request
  end

  def process_http_request(request)
    # Simple HTTP response
    body = "Hello, World! Time: #{Time.now}\n"
    "HTTP/1.1 200 OK\r\n" \
    "Content-Length: #{body.bytesize}\r\n" \
    "Content-Type: text/plain\r\n" \
    "Connection: close\r\n" \
    "\r\n" \
    "#{body}"
  end

  def close_connection(connection_id)
    connection = @connections.delete(connection_id)
    return unless connection
    
    connection[:socket].close rescue nil
    @connection_count -= 1
    @metrics[:active_connections] = @connection_count
  end

  def start_cleanup_thread
    Thread.new do
      loop do
        sleep 30  # Run cleanup every 30 seconds
        cleanup_stale_connections
      end
    end
  end

  def cleanup_stale_connections
    now = Time.now
    stale_connections = @connections.select do |_, conn|
      (now - conn[:last_activity]) > CONNECTION_TIMEOUT
    end
    
    stale_connections.each do |connection_id, _|
      puts "Closing stale connection: #{connection_id}"
      close_connection(connection_id)
    end
  end

  def start_metrics_thread
    Thread.new do
      loop do
        sleep 60  # Log metrics every minute
        log_metrics
      end
    end
  end

  def log_metrics
    avg_response_time = @metrics[:response_times].average
    errors_last_minute = @metrics[:errors_per_minute].sum
    
    puts "[METRICS] Active: #{@metrics[:active_connections]}, " \
         "Total: #{@metrics[:total_connections]}, " \
         "Avg Response: #{avg_response_time&.round(4)}s, " \
         "Errors/min: #{errors_last_minute}, " \
         "Bytes: #{@metrics[:bytes_processed]}"
  end

  def record_error(error)
    @metrics[:errors_per_minute].add(1)
    puts "[ERROR] #{error.class}: #{error.message}"
  end

  def shutdown_gracefully
    puts "Shutting down server..."
    
    # Close all client connections
    @connections.each { |id, _| close_connection(id) }
    
    # Close server socket
    @server.close rescue nil
    
    exit(0)
  end
end

class CircularBuffer
  def initialize(size)
    @size = size
    @buffer = []
    @index = 0
  end

  def add(value)
    if @buffer.length < @size
      @buffer << value
    else
      @buffer[@index] = value
      @index = (@index + 1) % @size
    end
  end

  def sum
    @buffer.sum
  end

  def average
    return nil if @buffer.empty?
    @buffer.sum.to_f / @buffer.length
  end
end

Load balancing and horizontal scaling patterns often distribute non-blocking I/O servers across multiple processes or machines. Process-based scaling avoids GIL limitations while maintaining the benefits of event-driven architecture:

require 'etc'

class ScalableServerCluster
  def initialize(port, worker_count = nil)
    @port = port
    @worker_count = worker_count || [4, Etc.nprocessors].min  # Etc.nprocessors reports available CPUs
    @workers = []
    @master_pid = Process.pid
  end

  def start
    puts "Starting server cluster with #{@worker_count} workers on port #{@port}"
    
    # Create shared server socket
    server = @server = TCPServer.new('localhost', @port)  # Kept in @server so restarted workers can reuse it
    server.listen(1024)  # Large listen backlog
    
    # Start worker processes
    @worker_count.times do |i|
      pid = fork do
        Process.setproctitle("worker-#{i}")
        worker = ProductionServer.new(nil)  # Pass nil since we have server socket
        worker.instance_variable_set(:@server, server)
        worker.run
      end
      
      @workers << { pid: pid, worker_id: i }
      puts "Started worker #{i} (PID: #{pid})"
    end
    
    # Master process monitoring
    setup_signal_handlers
    monitor_workers
  end

  private

  def setup_signal_handlers
    trap('INT') { shutdown_cluster }
    trap('TERM') { shutdown_cluster }
    trap('CHLD') { handle_worker_death }
  end

  def monitor_workers
    loop do
      sleep 5
      check_worker_health
    end
  end

  def check_worker_health
    @workers.each do |worker|
      unless process_exists?(worker[:pid])
        puts "Worker #{worker[:worker_id]} (PID: #{worker[:pid]}) died, restarting..."
        restart_worker(worker)
      end
    end
  end

  def restart_worker(dead_worker)
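    # Binding a second TCPServer to @port raises Errno::EADDRINUSE while the
    # master still holds the original listener, so reuse the listening socket
    # when it has been kept in @server.
    server = @server || TCPServer.new('localhost', @port)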
    
    new_pid = fork do
      Process.setproctitle("worker-#{dead_worker[:worker_id]}")
      worker = ProductionServer.new(nil)
      worker.instance_variable_set(:@server, server)
      worker.run
    end
    
    dead_worker[:pid] = new_pid
    puts "Restarted worker #{dead_worker[:worker_id]} (new PID: #{new_pid})"
  end

  def process_exists?(pid)
    Process.getpgid(pid)
    true
  rescue Errno::ESRCH
    false
  end

  def handle_worker_death
    # Reap zombie processes
    loop do
      pid = Process.waitpid(-1, Process::WNOHANG)
      break unless pid
      puts "Reaped worker process #{pid}"
    end
  rescue Errno::ECHILD
    # No child processes
  end

  def shutdown_cluster
    puts "Shutting down cluster..."
    
    @workers.each do |worker|
      begin
        Process.kill('TERM', worker[:pid])
      rescue Errno::ESRCH
        # Process already dead
      end
    end
    
    # Wait for workers to shut down gracefully
    timeout = Time.now + 10
    while Time.now < timeout && @workers.any? { |w| process_exists?(w[:pid]) }
      sleep 0.1
    end
    
    # Force kill any remaining workers
    @workers.each do |worker|
      begin
        Process.kill('KILL', worker[:pid])
      rescue Errno::ESRCH
      end
    end
    
    exit(0)
  end
end

Reference

Core Methods

Method | Parameters | Returns | Description
IO#read_nonblock(maxlen, outbuf = nil, exception: true) | maxlen (Integer), outbuf (String), exception (Boolean) | String | Read up to maxlen bytes without blocking
IO#write_nonblock(string, exception: true) | string (String), exception (Boolean) | Integer | Write string without blocking, returns bytes written
TCPServer#accept_nonblock(exception: true) | exception (Boolean) | TCPSocket | Accept connection without blocking
IO.select(read_ios, write_ios = nil, error_ios = nil, timeout = nil) | Arrays of IO objects, timeout (Numeric) | Array of three Arrays, or nil on timeout | Monitor multiple IO objects for readiness
IO#fcntl(cmd, arg = 0) | cmd (Integer), arg (Integer) | Integer | Perform control operations on file descriptor

Exception Hierarchy

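Exception | Parent / mixin | Description
IO::WaitReadable | None (module) | Mixed into exceptions raised when a read would block
IO::WaitWritable | None (module) | Mixed into exceptions raised when a write would block
IO::EAGAINWaitReadable | Errno::EAGAIN, includes IO::WaitReadable | No data available for reading
IO::EAGAINWaitWritable | Errno::EAGAIN, includes IO::WaitWritable | Cannot write data immediately
IO::EWOULDBLOCKWaitReadable | Errno::EWOULDBLOCK, includes IO::WaitReadable | Operation would block (reading); the same class as IO::EAGAINWaitReadable where EWOULDBLOCK equals EAGAIN
IO::EWOULDBLOCKWaitWritable | Errno::EWOULDBLOCK, includes IO::WaitWritable | Operation would block (writing); the same class as IO::EAGAINWaitWritable where EWOULDBLOCK equals EAGAIN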

File Control Constants

Constant | Value | Description
Fcntl::F_GETFL | Platform-specific | Get file descriptor flags
Fcntl::F_SETFL | Platform-specific | Set file descriptor flags
Fcntl::O_NONBLOCK | Platform-specific | Non-blocking I/O flag
Fcntl::O_RDONLY | 0 | Read-only access mode
Fcntl::O_WRONLY | 1 | Write-only access mode
Fcntl::O_RDWR | 2 | Read-write access mode

Socket Options

Method or constant | Parameters | Description
BasicSocket#setsockopt(level, optname, optval) | level (Integer), optname (Integer), optval | Set socket option
Socket::SOL_SOCKET | - | Socket level for generic socket options
Socket::IPPROTO_TCP | - | TCP protocol level
Socket::TCP_NODELAY | - | Disable Nagle's algorithm
Socket::SO_KEEPALIVE | - | Enable keep-alive messages

IO.select Return Values

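IO.select returns nil when the timeout expires; otherwise it returns an array with three elements:

Position | Content | Description
[0] | Array<IO> (possibly empty) | IO objects ready for reading
[1] | Array<IO> (possibly empty) | IO objects ready for writing
[2] | Array<IO> (possibly empty) | IO objects with exceptional conditions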

Common Error Codes

Error | Errno (Linux; values vary by platform) | Common Cause
EAGAIN | 11 | Resource temporarily unavailable
EWOULDBLOCK | 11 | Operation would block (usually same as EAGAIN)
ECONNRESET | 104 | Connection reset by peer
EPIPE | 32 | Broken pipe (write to closed socket)
EBADF | 9 | Bad file descriptor
EINTR | 4 | Interrupted system call
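
Because these numbers differ between operating systems, code should compare against the Errno classes rather than literal integers. The sketch below prints the values for the current platform; errno_table is a hypothetical helper built on the Errno constant that each Errno class defines.

```ruby
# Hypothetical helper: map error names to this platform's errno numbers.
def errno_table(names = %w[EAGAIN EWOULDBLOCK ECONNRESET EPIPE EBADF EINTR])
  names.to_h { |name| [name, Errno.const_get(name).const_get(:Errno)] }
end

errno_table.each { |name, value| puts format("%-12s %d", name, value) }
```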

Buffer Size Guidelines

Operation Type | Recommended Size | Rationale
Socket read | 4096-8192 bytes | Balance between system calls and memory usage
File read | 64KB-1MB | Larger buffers reduce system call overhead
Socket write | 4096-65536 bytes | Avoid partial writes while limiting memory
Pipe operations | 4096 bytes | Matches PIPE_BUF on Linux, the largest write guaranteed to be atomic

Performance Considerations

Scenario | Blocking I/O | Non-blocking I/O | Recommendation
< 100 connections | Lower CPU overhead | Higher CPU overhead | Use blocking I/O
100-1000 connections | High memory usage | Moderate memory usage | Consider non-blocking
> 1000 connections | Excessive memory/threads | Efficient resource usage | Use non-blocking I/O
CPU-bound tasks | Better thread utilization | Single-threaded bottleneck | Use blocking with threads
I/O-bound tasks | Thread context switching | Efficient event loop | Use non-blocking I/O