CrackedRuby

Fiber Scheduler with IO Operations

Ruby's fiber scheduler enables non-blocking I/O through fiber-based cooperative concurrency in Ruby applications.

Overview

Ruby's Fiber Scheduler provides a mechanism for implementing non-blocking I/O operations using fibers instead of threads. The scheduler intercepts blocking I/O calls and yields control to other fibers when operations would block, allowing for cooperative concurrency without the overhead of thread switching.

Fiber::Scheduler is a documented interface rather than a class you can instantiate: core Ruby describes the hook methods, and any object that implements them can serve as a scheduler. When a scheduler is active, Ruby hooks into blocking operations such as socket reads and writes, IO#wait, and sleep calls made inside non-blocking fibers, and delegates the waiting to the scheduler implementation.

require 'async'

Async do |task|
  task.async do
    puts "Starting first operation"
    sleep 1
    puts "First operation complete"
  end
  
  task.async do
    puts "Starting second operation"  
    sleep 1
    puts "Second operation complete"
  end
end

The scheduler manages fiber execution by responding to blocking operations. When a fiber encounters a potentially blocking call, the scheduler can either handle it asynchronously or allow it to block. Common scheduler implementations include async, nio4r-based schedulers, and event-driven systems.

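Ruby integrates scheduler hooks directly into core I/O methods including IO#read, IO#write, Socket#accept, Kernel#sleep, and DNS resolution methods. The hooks fire only inside non-blocking fibers (the default for Fiber.new and Fiber.schedule) on a thread that has a scheduler set. The scheduler receives callbacks with operation details and can choose how to handle each blocking scenario.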
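Whether a hook can fire depends on the fiber's blocking flag, which Ruby exposes through Fiber.blocking?. The sketch below only inspects that flag and assumes Ruby 3.0 or newer; blocking_state is a hypothetical helper name, not part of Ruby.

```ruby
# Report how Ruby classifies the fiber that calls this helper.
# blocking_state is an illustrative name, not a Ruby API.
def blocking_state
  # Fiber.blocking? returns false in a non-blocking fiber and a truthy
  # value in a blocking one.
  Fiber.blocking? ? :blocking : :non_blocking
end

default_state  = Fiber.new { blocking_state }.resume                 # blocking: false is the default
explicit_state = Fiber.new(blocking: true) { blocking_state }.resume # opts out of scheduler hooks

puts "Fiber.new:                 #{default_state}"
puts "Fiber.new(blocking: true): #{explicit_state}"
```

Only the first kind of fiber hands its blocking calls to a scheduler; the second behaves like ordinary blocking code even when a scheduler is set.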

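class BasicScheduler
  def initialize
    @blocked = {}   # fiber => the object it is waiting on
    @ready = []     # fibers that may be resumed
  end
  
  # Called when a fiber must wait (for example on Mutex#lock or Queue#pop).
  def block(blocker, timeout = nil)
    @blocked[Fiber.current] = blocker
    Fiber.yield   # suspend until the event loop resumes this fiber
  end
  
  # Called when a previously blocked fiber may continue.
  def unblock(blocker, fiber)
    @blocked.delete(fiber)
    @ready << fiber
  end
  
  def run
    until @ready.empty? && @blocked.empty?
      @ready.shift.resume until @ready.empty?
      # Process I/O events and timers here. A complete scheduler also needs
      # io_wait and kernel_sleep before Fiber.set_scheduler accepts it.
    end
  end
end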

Schedulers must implement specific interface methods to handle different I/O scenarios. The io_wait method handles socket and file descriptor waiting, kernel_sleep manages time-based delays, and address_resolve handles DNS lookups. Each method receives relevant parameters and should return control appropriately.
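To make the hook flow concrete, here is a minimal sketch of a scheduler that supports Kernel#sleep and nothing else. It assumes Ruby 3.0 or newer; TinySleepScheduler and run_sleep_demo are hypothetical names, io_wait is deliberately left unimplemented, and a real scheduler would need far more care around I/O, timeouts, and thread safety.

```ruby
# A deliberately tiny scheduler: it handles sleep and nothing else.
# TinySleepScheduler and run_sleep_demo are illustrative names.
class TinySleepScheduler
  def initialize
    @sleepers = {} # fiber => monotonic time at which it should wake
  end

  # Hook behind Fiber.schedule: create a non-blocking fiber and start it.
  def fiber(&block)
    Fiber.new(blocking: false, &block).tap(&:resume)
  end

  # Hook behind Kernel#sleep inside a non-blocking fiber.
  def kernel_sleep(duration = nil)
    block(:sleep, duration)
  end

  # Park the current fiber; the event loop resumes it once the timeout passes.
  def block(_blocker, timeout = nil)
    @sleepers[Fiber.current] = now + timeout if timeout
    Fiber.yield
  end

  # Required by Fiber.set_scheduler, unused in this sketch.
  def unblock(_blocker, _fiber); end

  def io_wait(_io, _events, _timeout)
    raise NotImplementedError, "this sketch only handles sleep"
  end

  # Event loop: wait for the earliest deadline, then resume that fiber.
  def run
    until @sleepers.empty?
      fiber, wake_at = @sleepers.min_by { |_, time| time }
      delay = wake_at - now
      sleep(delay) if delay.positive? # runs on the blocking root fiber, so it really sleeps
      @sleepers.delete(fiber)
      fiber.resume
    end
  end

  # Ruby calls close when the scheduler is replaced or its thread exits.
  def close
    run
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

def run_sleep_demo
  events = []
  Thread.new do
    Fiber.set_scheduler(TinySleepScheduler.new)
    Fiber.schedule { events << "a start"; sleep 0.02; events << "a done" }
    Fiber.schedule { events << "b start"; sleep 0.05; events << "b done" }
  end.join # the close hook drives the event loop before the thread ends
  events
end

p run_sleep_demo # => ["a start", "b start", "a done", "b done"]
```

Both fibers start before either finishes, which shows that sleep suspended only the calling fiber rather than the whole thread.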

Basic Usage

Setting up a fiber scheduler requires choosing an implementation and establishing the scheduler context. The most common approach uses the async gem, which provides a production-ready scheduler with comprehensive I/O handling.

require 'async'
require 'async/http/internet'

Async do
  internet = Async::HTTP::Internet.new
  
  response = internet.get("https://httpbin.org/delay/2")
  puts "Received: #{response.read}"
ensure
  internet&.close
end

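Custom schedulers are plain Ruby classes. Fiber::Scheduler names the interface but is not a superclass to inherit from, so a scheduler defines the hook methods it supports directly. Fiber.set_scheduler requires at least block, unblock, kernel_sleep, and io_wait, and Fiber.schedule additionally needs the fiber hook. The scheduler becomes active for the current thread once it is passed to Fiber.set_scheduler.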

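class SimpleScheduler
  def initialize
    @readers = {}    # io => fiber waiting for that io to become readable
    @sleepers = {}   # fiber => monotonic time at which to wake it
    @ready = []      # fibers released by unblock
  end
  
  # Hook behind Fiber.schedule: create a non-blocking fiber and start it.
  def fiber(&block)
    Fiber.new(blocking: false, &block).tap(&:resume)
  end
  
  # Simplified: tracks read interest only and ignores the timeout.
  def io_wait(io, events, timeout)
    @readers[io] = Fiber.current
    Fiber.yield
  end
  
  def kernel_sleep(duration = nil)
    block(:sleep, duration)
  end
  
  def block(blocker, timeout = nil)
    @sleepers[Fiber.current] = now + timeout if timeout
    Fiber.yield
  ensure
    @sleepers.delete(Fiber.current)
  end
  
  # Simplified: assumes unblock is called on the scheduler's own thread.
  def unblock(blocker, fiber)
    @ready << fiber
  end
  
  def run
    until @readers.empty? && @sleepers.empty? && @ready.empty?
      @ready.shift.resume until @ready.empty?
      next if @readers.empty? && @sleepers.empty?
      wait = @sleepers.values.min&.then { |time| [time - now, 0].max }
      readable, = IO.select(@readers.keys, nil, nil, wait)
      readable&.each { |io| @readers.delete(io)&.resume(IO::READABLE) }
      @sleepers.select { |_, time| time <= now }.each_key { |f| f.resume if f.alive? }
    end
  end
  
  # Ruby calls close when the scheduler is replaced or its thread exits.
  def close
    run
  end
  
  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end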

Multiple fibers can run concurrently within a single scheduler context. Each fiber operates independently, yielding control when it encounters a blocking operation. Scheduling is cooperative, so the scheduler only regains control at those yield points: a fiber that performs a long CPU-bound computation delays every other fiber on the thread.

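scheduler = SimpleScheduler.new
Fiber.set_scheduler(scheduler)

# Fiber.new creates non-blocking fibers by default, so sleep inside them is
# routed to the scheduler instead of blocking the thread.
fibers = 5.times.map do |i|
  Fiber.new do
    puts "Fiber #{i} starting"
    sleep(rand(0.5..2.0))
    puts "Fiber #{i} completed"
  end
end

fibers.each(&:resume)   # each fiber runs until its first blocking call
scheduler.run           # drive the event loop until every fiber has finished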

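File operations are a weaker fit than sockets and pipes. The operating system reports regular files as always readable, so a readiness-based scheduler cannot overlap disk reads, and File.read usually blocks the thread unless the scheduler implements the optional io_read and io_write hooks (for example on top of io_uring). The code itself needs no changes either way, and errors are handled per fiber.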

Async do |task|
  files = %w[file1.txt file2.txt file3.txt]
  
  files.each do |filename|
    task.async do
      content = File.read(filename)
      puts "#{filename}: #{content.length} bytes"
    rescue Errno::ENOENT
      puts "#{filename}: not found"
    end
  end
end

Network operations benefit significantly from scheduler-based concurrency. Socket connections, HTTP requests, and database queries can run simultaneously without thread overhead. Each operation yields appropriately when waiting for network responses.
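The mechanism a scheduler automates can be sketched by hand with plain fibers, read_nonblock, and IO.select. In this sketch two pipes stand in for slow network peers, and read_all_concurrently is a hypothetical helper, not an API from Ruby or the async gem.

```ruby
# Each fiber reads one IO without blocking. When no data is available it
# yields the IO it is waiting on, and the loop resumes it once IO.select
# reports that IO as readable. read_all_concurrently is an illustrative name.
def read_all_concurrently(ios)
  results = Hash.new { |hash, io| hash[io] = +"" }
  waiting = {} # io => fiber parked until that io becomes readable

  ios.each do |io|
    fiber = Fiber.new do
      loop do
        chunk = io.read_nonblock(1024, exception: false)
        case chunk
        when :wait_readable then Fiber.yield(io) # would block: hand control back
        when nil then break                      # end of stream
        else results[io] << chunk
        end
      end
      nil # a finished fiber reports that it is waiting on nothing
    end
    parked_on = fiber.resume
    waiting[parked_on] = fiber if parked_on
  end

  until waiting.empty?
    readable, = IO.select(waiting.keys)
    readable.each do |io|
      fiber = waiting.delete(io)
      parked_on = fiber.resume
      waiting[parked_on] = fiber if parked_on
    end
  end

  results
end

# Usage: the writer answers the second pipe first, yet both reads complete.
r1, w1 = IO.pipe
r2, w2 = IO.pipe
writer = Thread.new do
  w2.write("second"); w2.close
  w1.write("first");  w1.close
end
data = read_all_concurrently([r1, r2])
writer.join
puts data[r1], data[r2]
```

A fiber scheduler performs the same bookkeeping behind ordinary blocking calls, so application code does not need to call read_nonblock or Fiber.yield itself.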

Thread Safety & Concurrency

Fiber schedulers operate within a single thread, eliminating many traditional thread safety concerns. However, shared state between fibers still requires careful consideration since fiber switching can occur at yield points during I/O operations.
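A small sketch of that hazard, using plain fibers with Fiber.yield standing in for an I/O wait between a check and an update. withdraw_concurrently and its keyword arguments are hypothetical names chosen for the illustration.

```ruby
# Two fibers withdraw from one shared balance. Fiber.yield stands in for an
# I/O wait that falls between the balance check and the update.
# withdraw_concurrently is an illustrative helper.
def withdraw_concurrently(balance:, amount:, recheck:)
  state = { balance: balance }

  fibers = 2.times.map do
    Fiber.new do
      if state[:balance] >= amount
        Fiber.yield # another fiber may run here and change the balance
        # With recheck enabled, validate again after regaining control.
        next if recheck && state[:balance] < amount
        state[:balance] -= amount
      end
    end
  end

  fibers.each(&:resume)                     # both fibers pass the check, then park
  fibers.each { |f| f.resume if f.alive? }  # both fibers continue past the yield
  state[:balance]
end

puts withdraw_concurrently(balance: 100, amount: 80, recheck: false) # => -60
puts withdraw_concurrently(balance: 100, amount: 80, recheck: true)  # => 20
```

No thread is involved, yet the unguarded version overdraws the balance because the check and the update are separated by a yield point.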

Scheduler implementations must still protect some of their internal state. Fibers on one thread never run in parallel, but the unblock hook can be called from another thread (for example when a thread pushes to a Queue that a fiber is waiting on), so the structures it touches need a mutex or a thread-safe collection, plus a way to wake the event loop.

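class ThreadSafeScheduler
  def initialize
    @io_map = {}     # touched only by the scheduler's own thread
    @ready = []      # shared with other threads through unblock
    @mutex = Mutex.new
    @wakeup_r, @wakeup_w = IO.pipe
  end
  
  # Simplified: treats every registration as read interest.
  def io_wait(io, events, timeout)
    @io_map[io] = { fiber: Fiber.current, events: events }
    Fiber.yield
  end
  
  # May be called from any thread, so guard @ready and wake the event loop.
  def unblock(blocker, fiber)
    @mutex.synchronize { @ready << fiber }
    @wakeup_w.write_nonblock('.', exception: false)
  end
  
  def process_ready_fibers
    ready = @mutex.synchronize { @ready.shift(@ready.size) }
    ready.each { |fiber| fiber.resume if fiber.alive? }
  end
  
  def process_ready_ios(timeout = nil)
    readable, = IO.select(@io_map.keys + [@wakeup_r], nil, nil, timeout)
    readable&.each do |io|
      if io.equal?(@wakeup_r)
        @wakeup_r.read_nonblock(1024, exception: false)   # drain wake-up bytes
      elsif (data = @io_map.delete(io))
        data[:fiber].resume(IO::READABLE)
      end
    end
  end
end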

Cross-fiber communication requires careful synchronization mechanisms. Fiber-safe queues, channels, or semaphores ensure proper message passing without race conditions. Direct fiber-to-fiber communication should avoid shared mutable state.

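class FiberChannel
  def initialize
    @queue = []             # messages nobody has asked for yet
    @waiting_readers = []   # fibers parked in receive
  end
  
  # Named push rather than send so that Object#send keeps working.
  def push(message)
    if (fiber = @waiting_readers.shift)
      fiber.resume(message)   # hand the message straight to a parked reader
    else
      @queue << message
    end
  end
  
  # Must be called from inside a fiber, because it may yield.
  def receive
    return @queue.shift unless @queue.empty?
    
    @waiting_readers << Fiber.current
    Fiber.yield   # returns the message that push passes to resume
  end
end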
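Message passing without any shared buffer can also be sketched with the values that resume and Fiber.yield exchange. The example uses plain fibers and no scheduler; build_upcaser is a hypothetical helper name.

```ruby
# The worker fiber receives each message as the return value of Fiber.yield
# and replies through the value it yields, so the two sides never share a
# mutable buffer. build_upcaser is an illustrative name.
def build_upcaser
  Fiber.new do |message|
    until message == :done
      # Reply with the result, then wait for the next message.
      message = Fiber.yield(message.upcase)
    end
    :closed
  end
end

worker = build_upcaser
replies = %w[alpha beta].map { |word| worker.resume(word) }
replies << worker.resume(:done)

p replies # => ["ALPHA", "BETA", :closed]
```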

Scheduler lifecycle management becomes critical in concurrent scenarios. Proper cleanup prevents resource leaks and ensures graceful shutdown. Schedulers should track active fibers and provide mechanisms for orderly termination.

require 'set'

class ManagedScheduler
  def initialize
    @active_fibers = Set.new
    @shutdown_requested = false
  end
  
  # Hook behind Fiber.schedule; tracks each fiber until its block finishes.
  def fiber(&block)
    fiber = Fiber.new(blocking: false) do
      @active_fibers.add(Fiber.current)
      begin
        block.call
      ensure
        @active_fibers.delete(Fiber.current)
      end
    end
    fiber.resume
    fiber
  end
  
  def shutdown
    @shutdown_requested = true
    # Fiber#kill requires Ruby 3.3+; it runs each fiber's ensure blocks.
    @active_fibers.to_a.each { |fiber| fiber.kill if fiber.alive? }
    @active_fibers.clear
  end
  
  def run
    # process_events stands in for the I/O and timer loop (not shown here).
    process_events until @shutdown_requested && @active_fibers.empty?
  end
end

Database connection pooling requires special attention in fiber-scheduled environments. Connection pools designed for threads may not work correctly with fiber switching. Fiber-aware connection management ensures proper resource allocation and prevents connection leaks.

class FiberConnectionPool
  # The factory block creates one connection, for example { PG.connect(...) }.
  def initialize(size = 10, &factory)
    @pool = Queue.new
    size.times { @pool << factory.call }
  end
  
  # Under a fiber scheduler, Queue#pop parks only the calling fiber until a
  # connection is released, so no manual fiber bookkeeping is needed.
  def acquire
    @pool.pop
  end
  
  def release(connection)
    @pool << connection
  end
  
  # Borrow a connection for the duration of the block and always return it.
  def with_connection
    connection = acquire
    yield connection
  ensure
    release(connection) if connection
  end
end

Performance & Memory

Fiber schedulers provide significant performance advantages over thread-based concurrency for I/O-intensive applications. Context switching between fibers incurs minimal overhead compared to thread switching, allowing applications to handle thousands of concurrent operations efficiently.
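A rough way to see the switching cost on a given machine is to time a ping-pong between the caller and one fiber, and the same ping-pong between the caller and one thread. This is a micro-benchmark sketch with hypothetical helper names (elapsed, fiber_round_trips, thread_round_trips); its numbers vary by Ruby version and platform and say nothing about real I/O workloads.

```ruby
ROUND_TRIPS = 10_000

# Wall-clock seconds spent in the given block.
def elapsed
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

# Ping-pong between the caller and one fiber via resume and Fiber.yield.
def fiber_round_trips(count)
  fiber = Fiber.new { loop { Fiber.yield } }
  count.times { fiber.resume }
  count
end

# Ping-pong between the caller and one thread via two queues.
def thread_round_trips(count)
  requests = Queue.new
  replies = Queue.new
  worker = Thread.new do
    replies << true while requests.pop # a nil request ends the loop
  end
  count.times do
    requests << true
    replies.pop
  end
  requests << nil
  worker.join
  count
end

fiber_time  = elapsed { fiber_round_trips(ROUND_TRIPS) }
thread_time = elapsed { thread_round_trips(ROUND_TRIPS) }
puts format("fibers:  %.4fs for %d round trips", fiber_time, ROUND_TRIPS)
puts format("threads: %.4fs for %d round trips", thread_time, ROUND_TRIPS)
```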

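Memory consumption scales favorably with fiber-based scheduling. A mostly idle fiber typically keeps far less memory resident than a thread, whose stacks are sized in the megabyte range by default. The exact figures depend on the Ruby version, the platform, and settings such as RUBY_FIBER_VM_STACK_SIZE and RUBY_FIBER_MACHINE_STACK_SIZE, so measure before relying on a specific number. The difference becomes substantial when managing hundreds or thousands of concurrent connections.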

require 'benchmark'
require 'async'

def measure_memory_usage
  before = `ps -o rss= -p #{Process.pid}`.to_i
  yield
  after = `ps -o rss= -p #{Process.pid}`.to_i
  after - before
end

# Fiber-based approach
fiber_memory = measure_memory_usage do
  Async do |task|
    1000.times do |i|
      task.async do
        sleep 0.1
        # Simulate I/O work
      end
    end
  end
end

puts "Fiber approach: #{fiber_memory}KB"

CPU utilization patterns differ significantly between threaded and fiber-based approaches. Schedulers maintain active processing on a single thread, avoiding context switching overhead but requiring efficient event loop implementations to prevent blocking.

Performance optimization focuses on minimizing scheduler overhead and maximizing I/O throughput. Efficient event polling, batch processing of ready operations, and careful memory management in scheduler data structures directly impact application performance.

class OptimizedScheduler
  BATCH_SIZE = 100
  
  def initialize
    @io_map = {}        # io => fiber waiting for that io to become readable
    @ready_queue = []
  end
  
  def process_events
    # A single IO.select call reports every readable IO at once.
    readable, = IO.select(@io_map.keys, nil, nil, 0.01) unless @io_map.empty?
    
    readable&.each do |io|
      fiber = @io_map.delete(io)
      @ready_queue << fiber if fiber
    end
    
    process_ready_fibers
  end
  
  # Resume at most BATCH_SIZE fibers per pass so that polling is not starved.
  def process_ready_fibers
    processed = 0
    while processed < BATCH_SIZE && (fiber = @ready_queue.shift)
      fiber.resume
      processed += 1
    end
  end
end

Profiling fiber-scheduled applications requires specialized tools and techniques. Traditional thread-based profilers may not accurately represent fiber execution patterns. Custom instrumentation and fiber-aware profiling help identify performance bottlenecks.

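require 'forwardable'

# Wraps another scheduler and records how often and how long fibers wait.
class InstrumentedScheduler
  extend Forwardable
  def_delegators :@scheduler, :block, :unblock, :fiber, :close
  
  def initialize(scheduler)
    @scheduler = scheduler
    @stats = { io_waits: 0, sleeps: 0, total_wait_time: 0.0 }
  end
  
  def io_wait(io, events, timeout)
    @stats[:io_waits] += 1
    timed { @scheduler.io_wait(io, events, timeout) }
  end
  
  def kernel_sleep(duration = nil)
    @stats[:sleeps] += 1
    timed { @scheduler.kernel_sleep(duration) }
  end
  
  def report_stats
    puts "Scheduler Statistics:"
    @stats.each { |key, value| puts "  #{key}: #{value}" }
  end
  
  private
  
  # Add the time the wrapped call spent waiting to the running total.
  def timed
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
  ensure
    @stats[:total_wait_time] += Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
  end
end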

Memory leak prevention requires careful attention to fiber lifecycle management and resource cleanup. Long-running schedulers must properly dispose of completed fiber contexts and avoid retaining references to finished operations.
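One way to avoid retaining finished fibers is to prune any bookkeeping collection with Fiber#alive?. The sketch below assumes the scheduler keeps its fibers in a simple registry; FiberRegistry is a hypothetical class, not part of Ruby or any gem.

```ruby
# Track fibers the event loop may still need to resume, and forget them as
# soon as they finish so a long-running loop does not accumulate dead fibers.
# FiberRegistry is an illustrative class.
class FiberRegistry
  def initialize
    @fibers = []
  end

  def track(fiber)
    @fibers << fiber
    fiber
  end

  # Drop terminated fibers and return how many were removed.
  def prune
    before = @fibers.size
    @fibers.select!(&:alive?)
    before - @fibers.size
  end

  def size
    @fibers.size
  end
end

registry = FiberRegistry.new
finished = registry.track(Fiber.new { :done })
parked   = registry.track(Fiber.new { Fiber.yield; :done })
finished.resume # runs to completion
parked.resume   # stops at Fiber.yield and stays alive

puts registry.prune # => 1
puts registry.size  # => 1
```

Calling prune once per event-loop pass, or deleting a fiber in an ensure block when it finishes, keeps the registry bounded by the number of live fibers.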

Production Patterns

Production deployments of fiber schedulers require robust error handling and recovery mechanisms. Failed fibers should not crash the entire scheduler, and proper logging helps diagnose issues in concurrent environments.

require 'logger'

class ProductionScheduler
  def initialize(logger: Logger.new($stdout))
    @logger = logger
    @error_handler = method(:default_error_handler)
  end
  
  # Hook behind Fiber.schedule: wrap each fiber body so that an unhandled
  # error is reported instead of propagating into the event loop.
  def fiber(&block)
    fiber = Fiber.new(blocking: false) do
      block.call
    rescue => error
      @error_handler.call(error, Fiber.current)
    end
    fiber.resume
    fiber
  end
  
  def default_error_handler(error, fiber)
    @logger.error "Fiber error: #{error.message}"
    @logger.error Array(error.backtrace).join("\n")
    # Optionally restart the fiber or perform cleanup
  end
  
  def set_error_handler(&block)
    @error_handler = block
  end
  
  # The remaining hooks (io_wait, kernel_sleep, block, unblock) are omitted.
end

Monitoring fiber scheduler performance in production requires custom metrics and observability tools. Key metrics include active fiber count, I/O wait times, scheduler loop iteration frequency, and error rates.

class MonitoredScheduler
  def initialize(metrics_reporter)
    @metrics = metrics_reporter
    @active_fibers = []   # maintained by the fiber hook (not shown)
    @start_time = monotonic_now
  end
  
  def run
    loop_count = 0
    
    loop do
      iteration_start = monotonic_now
      
      process_events   # assumed: one pass of the I/O and timer loop
      loop_count += 1
      
      if loop_count % 1000 == 0
        @metrics.gauge('scheduler.active_fibers', @active_fibers.size)
        @metrics.gauge('scheduler.loop_frequency', loop_count / (monotonic_now - @start_time))
      end
      
      @metrics.histogram('scheduler.iteration_time', monotonic_now - iteration_start)
      
      break if should_shutdown?   # assumed: application-defined stop condition
    end
  end
  
  private
  
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

Integration with application servers like Puma or Unicorn requires careful coordination between server processes and fiber schedulers. Each worker process should manage its own scheduler instance to avoid cross-process interference.

# config.ru
require 'async'

class Application
  def call(env)
    # Sync (from the async gem) runs the block inside a reactor on the
    # current thread, reusing one if it is already running, and returns
    # the block's result once it has finished.
    Sync do
      handle_request(env)
    end
  end
  
  private
  
  def handle_request(env)
    case env['PATH_INFO']
    when '/slow'
      # Simulate slow I/O operation
      sleep 2
      [200, {}, ['Slow response']]
    else
      [200, {}, ['Fast response']]
    end
  end
end

run Application.new

Health checks and readiness probes must account for scheduler state and fiber execution. Standard HTTP health endpoints should verify scheduler responsiveness and detect deadlock conditions.

require 'logger'

class HealthCheckScheduler
  def initialize(logger: Logger.new($stdout))
    @logger = logger
    @last_activity = Time.now
    @health_check_fiber = nil
  end
  
  # The event loop calls this whenever it resumes a fiber.
  def record_activity
    @last_activity = Time.now
  end
  
  # Call while this scheduler is installed so that sleep yields to other fibers.
  def start_health_monitoring
    @health_check_fiber = Fiber.schedule do
      loop do
        sleep 10
        check_scheduler_health
      end
    end
  end
  
  def check_scheduler_health
    time_since_activity = Time.now - @last_activity
    
    if time_since_activity > 30
      @logger.warn "Scheduler inactive for #{time_since_activity.round(1)}s"
      # Trigger alerting or restart logic
    end
  end
  
  def healthy?
    return false unless @health_check_fiber&.alive?
    
    (Time.now - @last_activity) < 60
  end
end

Database connection management in production requires special considerations for connection pooling, transaction handling, and connection health monitoring within fiber contexts.

require 'connection_pool'
require 'pg'

class DatabaseScheduler
  def initialize(pool_size = 20)
    @connection_pool = ConnectionPool.new(size: pool_size) do
      PG.connect(host: 'localhost', dbname: 'app_production')
    end
  end
  
  def with_connection
    # ConnectionPool#with checks a connection out and always returns it to
    # the pool, even when the block raises.
    @connection_pool.with do |connection|
      ensure_healthy(connection)
      yield connection
    end
  end
  
  private
  
  # Ping the server and reset the connection in place if the ping fails.
  def ensure_healthy(connection)
    connection.exec('SELECT 1')
  rescue PG::Error
    connection.reset
  end
end

Reference

Core Scheduler Methods

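| Method | Parameters | Returns | Description |
|---|---|---|---|
| #io_wait(io, events, timeout) | io (IO), events (Integer bitmask), timeout (Numeric or nil) | Integer or nil | Waits until io is ready for the specified events |
| #kernel_sleep(duration = nil) | duration (Numeric or nil) | nil | Non-blocking sleep for the specified duration |
| #block(blocker, timeout = nil) | blocker (Object), timeout (Numeric or nil) | nil | Generic blocking operation handler |
| #unblock(blocker, fiber) | blocker (Object), fiber (Fiber) | nil | Resumes a blocked fiber; may be called from another thread |
| #address_resolve(hostname) | hostname (String) | Array of address strings | Non-blocking DNS resolution |
| #timeout_after(duration, exception_class, *exception_arguments, &block) | duration (Numeric), exception_class (Class), exception_arguments (Array), block (Proc) | Object (the block's result) | Executes the block with a timeout |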

Fiber Scheduler Class Methods

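| Method | Parameters | Returns | Description |
|---|---|---|---|
| Fiber.set_scheduler(scheduler) | scheduler (object implementing the scheduler interface, or nil) | the scheduler | Sets the scheduler for the current thread |
| Fiber.scheduler | None | scheduler or nil | Returns the scheduler set for the current thread |
| Fiber.blocking? | None | false or 1 | Returns false inside a non-blocking fiber and 1 inside a blocking one |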

Event Constants

| Constant | Value | Description |
|---|---|---|
| IO::READABLE | 1 | IO ready for reading |
| IO::PRIORITY | 2 | Priority data available |
| IO::WRITABLE | 4 | IO ready for writing |
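The events argument passed to io_wait is a bitmask built from these constants, so a scheduler tests individual flags with bitwise AND. A short sketch, assuming Ruby 3.0 or newer where the constants are defined; EVENT_FLAGS and event_names are hypothetical names.

```ruby
# Decode the events bitmask that io_wait receives.
# EVENT_FLAGS and event_names are illustrative names.
EVENT_FLAGS = {
  readable: IO::READABLE,
  priority: IO::PRIORITY,
  writable: IO::WRITABLE
}.freeze

def event_names(events)
  EVENT_FLAGS.select { |_, flag| (events & flag).nonzero? }.keys
end

p event_names(IO::READABLE)                 # => [:readable]
p event_names(IO::READABLE | IO::WRITABLE)  # => [:readable, :writable]
```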

Common Scheduler Implementations

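| Library | Class | Features |
|---|---|---|
| async | Async::Scheduler | Full-featured production scheduler |
| nio4r | NIO::Selector | Event-driven I/O selector (a building block for schedulers, not a scheduler by itself) |
| evt | Evt::Scheduler | High-performance event scheduler |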

Error Classes

| Exception | Inherits From | Description |
|---|---|---|
| FiberError | StandardError | General fiber operation errors |
| IO::TimeoutError | IOError | I/O operation timeout (Ruby 3.2+) |

Performance Benchmarks

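Rough, illustrative figures; actual numbers vary with the Ruby version, platform, and workload.

| Operation | Threads (1000) | Fibers (1000) | Difference |
|---|---|---|---|
| Creation | ~2GB RAM | ~50MB RAM | 97% reduction |
| Context Switch | ~10μs | ~0.1μs | 100x faster |
| I/O Wait | Blocking | Non-blocking | N/A |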

Configuration Options

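Illustrative application-level values rather than built-in Ruby options. Ruby itself sizes fiber stacks through the RUBY_FIBER_VM_STACK_SIZE and RUBY_FIBER_MACHINE_STACK_SIZE environment variables.

| Setting | Default | Range | Description |
|---|---|---|---|
| Stack Size | 4KB | 4KB-1MB | Fiber stack allocation |
| Pool Size | CPU cores | 1-1000 | Connection pool size |
| Timeout | 30s | 1s-300s | Default I/O timeout |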

Debugging Commands

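| Method | Purpose | Example |
|---|---|---|
| Fiber.current | Get current fiber | puts Fiber.current.inspect |
| #backtrace | Fiber stack trace | fiber.backtrace |
| #alive? | Check whether the fiber can still be resumed | fiber.alive? |
| #inspect | Fiber state (created, resumed, suspended, terminated) | fiber.inspect |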

Integration Patterns

| Framework | Integration Method | Configuration |
|---|---|---|
| Rails | Middleware/Initializer | config.active_support.isolation_level = :fiber |
| Sidekiq | Custom server middleware | Override job execution |
| Puma | Server configuration | Multi-threaded with fiber schedulers |
| Sinatra | Before filter | Set scheduler per request |