CrackedRuby

Fiber Scheduling

A complete guide to implementing and managing fiber scheduling in Ruby for concurrent and non-blocking I/O operations.


Overview

Fiber scheduling in Ruby provides a mechanism for implementing cooperative multitasking and non-blocking I/O operations. The Fiber::Scheduler interface allows custom schedulers to manage fiber execution, enabling applications to handle thousands of concurrent operations without traditional threading overhead.

Ruby's fiber scheduler operates through a hook-based system where the runtime delegates blocking operations to scheduler implementations. When a fiber encounters a blocking operation like network I/O, the scheduler can suspend the fiber and resume it when the operation completes, allowing other fibers to execute in the meantime.

The core components are the Fiber::Scheduler interface, Fiber.set_scheduler for registering a scheduler on the current thread, and the hooks that Ruby calls during blocking operations. Fiber::Scheduler is a documented set of hook methods rather than a class to inherit from: any object that responds to the hooks can act as a scheduler. The interface covers I/O, sleeping, timeouts, process waits, and generic blocking on primitives such as Mutex and Queue.

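# Basic fiber scheduler setup (hook skeleton only; it logs calls and never suspends a fiber)
class BasicScheduler
  def initialize
    @ios = {}
    @timeouts = []
  end

  # Called when a non-blocking fiber would block on I/O
  def io_wait(io, events, timeout = nil)
    puts "Waiting for I/O on #{io} with events #{events}"
  end

  # Called for Kernel#sleep inside a non-blocking fiber
  def kernel_sleep(duration = nil)
    puts "Sleeping for #{duration} seconds"
  end

  # Fiber.set_scheduler raises ArgumentError unless #block and #unblock exist
  def block(blocker, timeout = nil)
  end

  def unblock(blocker, fiber)
  end

  # Called by Fiber.schedule to create and start a non-blocking fiber
  def fiber(&block)
    Fiber.new(blocking: false, &block).tap(&:resume)
  end
end

Fiber.set_scheduler(BasicScheduler.new)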
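Fiber.set_scheduler validates the object it is given before installing it. The following sketch shows that check in isolation; the class names IncompleteScheduler and CompleteScheduler are made up for illustration, and the hook bodies are empty stubs rather than a working scheduler.

```ruby
# Implements only two of the four hooks that Fiber.set_scheduler checks for.
class IncompleteScheduler
  def io_wait(io, events, timeout = nil); end
  def kernel_sleep(duration = nil); end
end

# Adding the two missing hooks (as no-op stubs) is enough to pass the check.
class CompleteScheduler < IncompleteScheduler
  def block(blocker, timeout = nil); end
  def unblock(blocker, fiber); end
end

rejected =
  begin
    Fiber.set_scheduler(IncompleteScheduler.new)
    false
  rescue ArgumentError => e
    puts "Rejected: #{e.message}"
    true
  end

Fiber.set_scheduler(CompleteScheduler.new)
accepted = Fiber.scheduler.is_a?(CompleteScheduler)

# Detach so the rest of the program runs without a scheduler.
Fiber.set_scheduler(nil)
```

Passing the check only means the methods exist. A scheduler with stub hooks like these would not actually suspend or resume anything.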

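Ruby checks for an active scheduler during blocking operations and delegates control when one exists and the current fiber is non-blocking, which is the case for fibers created with Fiber.schedule. This delegation happens transparently, so code running inside such fibers needs no changes. The same call made from the main fiber, or from a fiber created with blocking: true, still blocks the thread.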

# Scheduler automatically handles blocking operations
require 'net/http'

Fiber.schedule do
  Net::HTTP.get(URI('https://example.com'))  # Socket waits go through io_wait
  sleep(1)  # Goes through kernel_sleep
end
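To make the delegation rule concrete, the sketch below records which sleep calls reach the scheduler. RecordingScheduler is a hypothetical name, and its kernel_sleep hook only logs the call instead of suspending the fiber, so this is a probe rather than a usable scheduler.

```ruby
class RecordingScheduler
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Called by Fiber.schedule: create a non-blocking fiber and start it.
  def fiber(&block)
    Fiber.new(blocking: false, &block).tap(&:resume)
  end

  # Record the call and return at once instead of suspending the fiber.
  def kernel_sleep(duration = nil)
    @calls << [:kernel_sleep, duration]
  end

  # Stubs required by Fiber.set_scheduler.
  def io_wait(io, events, timeout = nil); end
  def block(blocker, timeout = nil); end
  def unblock(blocker, fiber); end
end

scheduler = RecordingScheduler.new
Fiber.set_scheduler(scheduler)

sleep(0)                        # main fiber is blocking: the hook is bypassed
main_blocking = Fiber.blocking? # truthy in the main fiber

scheduled_blocking = nil
Fiber.schedule do
  scheduled_blocking = Fiber.blocking? # false in a scheduled fiber
  sleep(0)                             # delegated to kernel_sleep
end

Fiber.set_scheduler(nil)
puts scheduler.calls.inspect
```

Only the sleep inside Fiber.schedule shows up in the recorded calls, which matches the rule that hooks apply to non-blocking fibers.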

The scheduler interface supports various operation types including file I/O, network operations, process management, and time-based delays. Each operation type has corresponding scheduler methods that implementations can override to provide custom behavior.

Basic Usage

Implementing a fiber scheduler requires defining a class that responds to specific scheduler hook methods. Ruby calls these methods when fibers encounter blocking operations, allowing the scheduler to manage execution flow.

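require 'fiber'
require 'io/wait'

class SimpleScheduler
  def initialize
    @ready_fibers = []
    @ios = {}        # io => { fiber:, events: }
    @sleeping = []   # { fiber:, wake_time: }
  end

  # Called by Fiber.schedule: create a non-blocking fiber and start it
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  def io_wait(io, events, timeout = nil)
    fiber = Fiber.current
    @ios[io] = { fiber: fiber, events: events }

    # Suspend current fiber until run_loop sees the IO become ready
    # (timeout is ignored in this simple version)
    Fiber.yield

    # A truthy result tells Ruby the IO is ready and the operation can be retried
    events
  ensure
    # Cleanup when resumed
    @ios.delete(io)
  end

  def kernel_sleep(duration)
    fiber = Fiber.current
    wake_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration
    @sleeping << { fiber: fiber, wake_time: wake_time }

    Fiber.yield
  end

  # Required by Fiber.set_scheduler. Called when a fiber waits on a Mutex,
  # Queue, Thread#join, etc.; the fiber stays parked until #unblock
  def block(blocker, timeout = nil)
    Fiber.yield
  end

  # Called when the blocker is released (not thread-safe in this version)
  def unblock(blocker, fiber)
    @ready_fibers << fiber
  end

  def run_loop
    until @ready_fibers.empty? && @ios.empty? && @sleeping.empty?
      # Resume ready fibers
      @ready_fibers.shift.resume until @ready_fibers.empty?
      next if @ios.empty? && @sleeping.empty?

      # Check for ready I/O operations, honoring the requested event type
      readers = @ios.select { |_io, info| (info[:events] & IO::READABLE) != 0 }.keys
      writers = @ios.select { |_io, info| (info[:events] & IO::WRITABLE) != 0 }.keys
      ready_ios = IO.select(readers, writers, [], poll_timeout)
      if ready_ios
        (ready_ios[0] + ready_ios[1]).uniq.each do |io|
          fiber_info = @ios.delete(io)
          @ready_fibers << fiber_info[:fiber] if fiber_info
        end
      end

      # Wake sleeping fibers
      current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      @sleeping.select! do |sleep_info|
        if current_time >= sleep_info[:wake_time]
          @ready_fibers << sleep_info[:fiber]
          false
        else
          true
        end
      end
    end
  end

  private

  # Wait in IO.select until the next wake-up instead of spinning
  def poll_timeout
    return 0 unless @ready_fibers.empty?
    return nil if @sleeping.empty? # Only I/O is pending: wait until it is ready

    wake_time = @sleeping.map { |sleep_info| sleep_info[:wake_time] }.min
    [wake_time - Process.clock_gettime(Process::CLOCK_MONOTONIC), 0].max
  end
end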

Setting up a scheduler requires calling Fiber.set_scheduler with a scheduler instance. Once set, all blocking operations within scheduled fibers delegate to the scheduler automatically.

scheduler = SimpleScheduler.new
Fiber.set_scheduler(scheduler)

# Create scheduled fibers
3.times do |i|
  Fiber.schedule do
    puts "Fiber #{i} starting"
    sleep(rand(1..3))
    puts "Fiber #{i} finished"
  end
end

# Run the scheduler loop
scheduler.run_loop

The Fiber.schedule method creates fibers that automatically use the active scheduler. These fibers start immediately but yield control when encountering blocking operations, allowing the scheduler to manage execution.
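The start-immediately, yield-on-block behavior can be observed with a scheduler that supports only sleep. The sketch below is a minimal illustration under that assumption; SleepOnlyScheduler and its run method are hypothetical names, and the I/O and blocking hooks are stubs.

```ruby
class SleepOnlyScheduler
  def initialize
    @sleeping = [] # [wake_time, fiber] pairs
  end

  # Called by Fiber.schedule: the fiber runs until its first blocking call.
  def fiber(&block)
    Fiber.new(blocking: false, &block).tap(&:resume)
  end

  def kernel_sleep(duration)
    @sleeping << [now + duration, Fiber.current]
    Fiber.yield
  end

  # Stubs required by Fiber.set_scheduler; unused in this sketch.
  def io_wait(io, events, timeout = nil); end
  def block(blocker, timeout = nil); end
  def unblock(blocker, fiber); end

  # Resume fibers in wake-time order until none are left.
  def run
    until @sleeping.empty?
      @sleeping.sort_by!(&:first)
      wake_time, fiber = @sleeping.shift
      delay = wake_time - now
      sleep(delay) if delay > 0 # real sleep: run is called from the blocking main fiber
      fiber.resume
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

log = []
scheduler = SleepOnlyScheduler.new
Fiber.set_scheduler(scheduler)

[0.03, 0.01, 0.02].each_with_index do |delay, i|
  Fiber.schedule do
    log << "start #{i}"
    sleep(delay)
    log << "finish #{i}"
  end
end
log << "all scheduled"

scheduler.run
Fiber.set_scheduler(nil)
puts log
```

Every fiber logs its start before Fiber.schedule returns, and the finish lines appear in order of sleep duration (1, 2, 0) rather than creation order.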

Schedulers handle multiple concurrent operations by maintaining state about suspended fibers and the conditions that will wake them. The scheduler's run loop continually checks for completed operations and resumes the appropriate fibers.

# Multiple concurrent HTTP requests
require 'net/http'

urls = [
  'https://httpbin.org/delay/1',
  'https://httpbin.org/delay/2', 
  'https://httpbin.org/delay/3'
]

results = []

urls.each_with_index do |url, index|
  Fiber.schedule do
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    response = Net::HTTP.get_response(URI(url))
    duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
    results << { index: index, status: response.code, duration: duration }
  end
end

scheduler.run_loop
puts results.inspect

Thread Safety & Concurrency

Fiber schedulers operate within a single thread context, eliminating many traditional threading concerns while introducing scheduler-specific synchronization requirements. The scheduler manages cooperative multitasking where fibers voluntarily yield control, requiring careful handling of shared state.

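Scheduler implementations still need to protect their internal data structures in one case: hooks such as unblock can be called from another thread, for example when a thread pushes to a Queue that a fiber is waiting on. Fibers on the scheduler's own thread never run simultaneously, so the remaining hooks interleave only at the points where a fiber yields.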

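class ThreadSafeScheduler
  def initialize
    @mutex = Mutex.new                 # Guards @ios and @sleeping
    @ios = {}
    @sleeping = []
    @blocked = 0                       # Fibers parked in #block (scheduler thread only)
    @ready_fibers = Thread::Queue.new  # Thread-safe: #unblock may push from another thread
  end

  # Called by Fiber.schedule: create a non-blocking fiber and start it
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  def io_wait(io, events, timeout = nil)
    fiber = Fiber.current

    @mutex.synchronize do
      @ios[io] = { fiber: fiber, events: events, timeout: timeout }
    end

    Fiber.yield

    @mutex.synchronize do
      @ios.delete(io)
    end

    events  # Truthy result tells Ruby the IO is ready
  end

  def kernel_sleep(duration)
    fiber = Fiber.current
    wake_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration

    @mutex.synchronize do
      @sleeping << { fiber: fiber, wake_time: wake_time }
    end

    Fiber.yield
  end

  # Required hook: park a fiber waiting on a Mutex, Queue, Thread#join, etc.
  # (timeout is ignored in this version)
  def block(blocker, timeout = nil)
    @blocked += 1
    Fiber.yield
    true
  ensure
    @blocked -= 1
  end

  # Required hook: may run on another thread, so it only touches the queue
  def unblock(blocker, fiber)
    @ready_fibers << fiber
  end

  def run_loop
    loop do
      # Process ready fibers (pop(true) is the non-blocking form)
      until @ready_fibers.empty?
        fiber = @ready_fibers.pop(true)
        fiber.resume if fiber.alive?
      end

      # Check I/O and timeouts under lock. The exit condition is returned from
      # the block because a break here would only leave synchronize, not the loop.
      done = @mutex.synchronize do
        check_ios
        check_timeouts
        @ios.empty? && @sleeping.empty? && @ready_fibers.empty? && @blocked.zero?
      end
      break if done

      sleep(0.001)  # Small delay to prevent busy waiting
    end
  end

  private

  def check_ios
    return if @ios.empty?

    # Poll each IO for the event type its fiber asked for
    readers = @ios.select { |_io, info| (info[:events] & IO::READABLE) != 0 }.keys
    writers = @ios.select { |_io, info| (info[:events] & IO::WRITABLE) != 0 }.keys
    ready_ios = IO.select(readers, writers, [], 0)
    return unless ready_ios

    (ready_ios[0] + ready_ios[1]).uniq.each do |io|
      fiber_info = @ios.delete(io)
      @ready_fibers << fiber_info[:fiber] if fiber_info
    end
  end

  def check_timeouts
    current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    @sleeping.select! do |sleep_info|
      if current_time >= sleep_info[:wake_time]
        @ready_fibers << sleep_info[:fiber]
        false
      else
        true
      end
    end
  end
end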

Fiber scheduling differs from preemptive threading because fibers yield control voluntarily. This cooperative model eliminates many race conditions but requires fibers to yield regularly to maintain system responsiveness.

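Cross-fiber communication needs explicit synchronization only when a critical section contains a blocking operation. A fiber cannot be switched out in the middle of plain Ruby code such as @count += 1; the scheduler regains control only when the fiber sleeps, waits on I/O, or blocks on a primitive. Shared state that is read, followed by a blocking call, and then written does need protection, because other fibers run in between.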

# Fiber-safe shared counter
class FiberSafeCounter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize do
      @count += 1
    end
  end

  def value
    @mutex.synchronize { @count }
  end
end

counter = FiberSafeCounter.new

10.times do |i|
  Fiber.schedule do
    100.times do
      counter.increment
      sleep(0)  # Voluntary yield point; a bare Fiber.yield would never be resumed by the scheduler
    end
  end
end

scheduler.run_loop
puts "Final count: #{counter.value}"
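To show where a fiber switch can and cannot happen, the sketch below compares a read-modify-write that spans a blocking call with one that does not. It assumes a deliberately tiny sleep-only scheduler; YieldPointScheduler and its run method are hypothetical names.

```ruby
class YieldPointScheduler
  def initialize
    @sleeping = [] # [wake_time, fiber] pairs
  end

  def fiber(&block)
    Fiber.new(blocking: false, &block).tap(&:resume)
  end

  def kernel_sleep(duration)
    @sleeping << [now + duration, Fiber.current]
    Fiber.yield
  end

  # Stubs required by Fiber.set_scheduler; unused in this sketch.
  def io_wait(io, events, timeout = nil); end
  def block(blocker, timeout = nil); end
  def unblock(blocker, fiber); end

  def run
    until @sleeping.empty?
      @sleeping.sort_by!(&:first)
      wake_time, fiber = @sleeping.shift
      delay = wake_time - now
      sleep(delay) if delay > 0 # real sleep on the blocking main fiber
      fiber.resume
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

scheduler = YieldPointScheduler.new
Fiber.set_scheduler(scheduler)

# Unsafe: a blocking call sits between the read and the write.
racy_total = 0
3.times do
  Fiber.schedule do
    snapshot = racy_total
    sleep(0)                  # other fibers run here and read the same value
    racy_total = snapshot + 1
  end
end
scheduler.run

# Safe: the read-modify-write contains no blocking call.
safe_total = 0
3.times do
  Fiber.schedule do
    sleep(0)
    safe_total += 1
  end
end
scheduler.run

Fiber.set_scheduler(nil)
puts "racy: #{racy_total}, safe: #{safe_total}"
```

All three fibers in the first group read 0 before any of them writes, so two updates are lost. In the second group each increment completes before another fiber can run.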

Deadlock prevention requires careful ordering of scheduler operations and avoiding situations where fibers wait indefinitely for conditions that cannot be satisfied. Schedulers should implement timeout mechanisms and detect circular dependencies.

Resource cleanup becomes critical when fibers terminate unexpectedly. Schedulers must track allocated resources and ensure proper cleanup when fibers complete or encounter errors.
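One possible cleanup policy is sketched below: the scheduler's fiber hook wraps each block so that an unhandled error is recorded instead of propagating into the run loop, while ensure blocks inside the fiber still release resources. CapturingScheduler, its errors list, and the run method are assumptions made for this example, not part of Ruby's interface.

```ruby
class CapturingScheduler
  attr_reader :errors

  def initialize
    @errors = []
    @sleeping = [] # [wake_time, fiber] pairs
  end

  # Wrap every scheduled block so one failing fiber cannot crash the loop.
  def fiber(&block)
    fiber = Fiber.new(blocking: false) do
      block.call
    rescue StandardError => e
      @errors << e # hypothetical policy: record the error and carry on
    end
    fiber.resume
    fiber
  end

  def kernel_sleep(duration)
    @sleeping << [now + duration, Fiber.current]
    Fiber.yield
  end

  # Stubs required by Fiber.set_scheduler; unused in this sketch.
  def io_wait(io, events, timeout = nil); end
  def block(blocker, timeout = nil); end
  def unblock(blocker, fiber); end

  def run
    until @sleeping.empty?
      @sleeping.sort_by!(&:first)
      wake_time, fiber = @sleeping.shift
      delay = wake_time - now
      sleep(delay) if delay > 0 # real sleep on the blocking main fiber
      fiber.resume
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

scheduler = CapturingScheduler.new
Fiber.set_scheduler(scheduler)
events = []

Fiber.schedule do
  begin
    sleep(0.01)
    raise IOError, 'connection reset'
  ensure
    events << :socket_released # runs even though the fiber fails
  end
end

Fiber.schedule do
  sleep(0.02)
  events << :other_fiber_done # unaffected by the failure above
end

scheduler.run
Fiber.set_scheduler(nil)
puts "errors: #{scheduler.errors.map(&:class)}, events: #{events}"
```

Without the rescue in the fiber hook, the IOError would surface at the fiber.resume call inside run and stop the loop before the second fiber finished.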

Performance & Memory

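Fiber scheduling can provide significant performance advantages over one-thread-per-task designs for I/O-bound applications. A fiber's stack is committed on demand, so an idle fiber usually occupies far less memory than a native thread, whose stack reservation is commonly in the 1-8MB range. The often-quoted figure of roughly 4KB per fiber should be treated as a rough estimate: actual usage depends on the Ruby version, the platform, and how deep each fiber's call stack grows. In practice this lets a single process keep tens of thousands of concurrent operations in flight.

Context switching between fibers occurs entirely in user space, avoiding the kernel scheduler and, in CRuby, the hand-off of the global VM lock that a thread switch involves. Both kinds of switch are typically measured in microseconds or less, so the practical gain comes less from raw switch time than from avoiding per-thread memory and lock contention at high concurrency.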

require 'benchmark'

# Memory comparison between threads and fibers
def memory_usage
  GC.start
  `ps -o rss= -p #{Process.pid}`.to_i
end

# Thread-based approach
def thread_benchmark(count)
  start_memory = memory_usage
  threads = []
  
  time = Benchmark.realtime do
    count.times do |i|
      threads << Thread.new do
        sleep(0.1)
        i * 2
      end
    end
    
    threads.each(&:join)
  end
  
  end_memory = memory_usage
  { time: time, memory: end_memory - start_memory }
end

# Fiber-based approach
def fiber_benchmark(count)
  scheduler = SimpleScheduler.new
  Fiber.set_scheduler(scheduler)
  
  start_memory = memory_usage
  
  time = Benchmark.realtime do
    count.times do |i|
      Fiber.schedule do
        sleep(0.1)
        i * 2
      end
    end
    
    scheduler.run_loop
  end
  
  end_memory = memory_usage
  { time: time, memory: end_memory - start_memory }
end

# Compare performance with 1000 concurrent operations
thread_result = thread_benchmark(1000)
fiber_result = fiber_benchmark(1000)

puts "Threads: #{thread_result[:time]}s, #{thread_result[:memory]}KB memory"
puts "Fibers: #{fiber_result[:time]}s, #{fiber_result[:memory]}KB memory"
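Fiber switch cost can also be measured directly rather than taken from published figures. The following micro-benchmark is a rough sketch: it times resume/yield round trips on a single fiber with no scheduler involved, and the iteration count is an arbitrary choice.

```ruby
require 'benchmark'

round_trips = 100_000

# A fiber that does nothing except hand control straight back.
ping = Fiber.new { loop { Fiber.yield } }

elapsed = Benchmark.realtime do
  round_trips.times { ping.resume } # each iteration is one resume plus one yield
end

microseconds_per_round_trip = elapsed / round_trips * 1_000_000
puts format('%.3f microseconds per resume/yield round trip', microseconds_per_round_trip)
```

The result varies with hardware and Ruby version, and it excludes the bookkeeping a real scheduler performs on each switch.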

Scheduler efficiency depends heavily on the run loop implementation. Busy-waiting schedulers waste CPU cycles, while overly passive schedulers introduce latency. Optimal schedulers balance responsiveness with CPU efficiency.

# Efficient scheduler with adaptive polling
# Inherits the hook methods (io_wait, kernel_sleep, ...) and state from SimpleScheduler above
class PerformantScheduler < SimpleScheduler
  def initialize
    super
    @poll_timeout = 0.001
    @max_batch_size = 100
  end

  def run_loop
    loop do
      batch_processed = process_ready_fibers
      io_ready = check_io_operations
      timeouts_ready = check_sleeping_fibers
      
      # Adaptive polling - increase timeout when less active
      if batch_processed + io_ready + timeouts_ready == 0
        @poll_timeout = [@poll_timeout * 1.5, 0.1].min
        break if should_exit?
        sleep(@poll_timeout)
      else
        @poll_timeout = 0.001  # Reset to responsive polling
      end
    end
  end

  private

  def process_ready_fibers
    processed = 0
    batch_size = [@ready_fibers.size, @max_batch_size].min
    
    batch_size.times do
      fiber = @ready_fibers.shift
      if fiber&.alive?
        fiber.resume
        processed += 1
      end
    end
    
    processed
  end

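  def check_io_operations
    return 0 if @ios.empty?

    # Poll each IO for the event type its fiber asked for
    readers = @ios.select { |_io, info| (info[:events] & IO::READABLE) != 0 }.keys
    writers = @ios.select { |_io, info| (info[:events] & IO::WRITABLE) != 0 }.keys
    ready_ios = IO.select(readers, writers, [], 0)
    return 0 unless ready_ios

    ready = (ready_ios[0] + ready_ios[1]).uniq
    ready.each do |io|
      fiber_info = @ios.delete(io)
      @ready_fibers << fiber_info[:fiber] if fiber_info
    end

    ready.size
  end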

  def check_sleeping_fibers
    return 0 if @sleeping.empty?
    
    current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    woken = 0
    
    @sleeping.select! do |sleep_info|
      if current_time >= sleep_info[:wake_time]
        @ready_fibers << sleep_info[:fiber]
        woken += 1
        false
      else
        true
      end
    end
    
    woken
  end

  def should_exit?
    @ready_fibers.empty? && @ios.empty? && @sleeping.empty?
  end
end
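An alternative to polling at a fixed or adaptive interval is to let IO.select wait until the earliest wake-up time. The helper below sketches that calculation; select_timeout is a hypothetical function, and it assumes sleeping entries are hashes with a :wake_time key as in the schedulers above.

```ruby
# How long the run loop may wait in IO.select:
#   0   when fibers are already ready to run
#   nil when nothing is sleeping (wait until I/O arrives; the caller must
#       only pass nil to IO.select while it is watching at least one IO)
#   otherwise the time until the earliest wake-up, never negative
def select_timeout(ready_fibers, sleeping, now)
  return 0 unless ready_fibers.empty?
  return nil if sleeping.empty?

  earliest = sleeping.map { |entry| entry[:wake_time] }.min
  [earliest - now, 0].max
end

# Usage: wait on a pipe that never becomes readable until the wake-up is due.
reader, writer = IO.pipe
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
sleeping = [{ fiber: nil, wake_time: now + 0.05 }]

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
result = IO.select([reader], [], [], select_timeout([], sleeping, now))
waited = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

puts "select returned #{result.inspect} after #{(waited * 1000).round}ms"
reader.close
writer.close
```

With this approach the loop wakes exactly when there is work to do, at the cost of recomputing the minimum on each pass; a priority queue keyed by wake time would avoid the linear scan.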

Memory management in fiber schedulers requires attention to fiber lifecycle and resource cleanup. Long-running schedulers must prevent memory leaks from accumulated fiber state and ensure garbage collection of completed fibers.

Profiling fiber applications requires different techniques than traditional thread-based profiling. The cooperative nature of fibers means blocking operations yield immediately, affecting timing measurements and requiring scheduler-aware profiling tools.

Production Patterns

Production fiber scheduler implementations require robust error handling, monitoring capabilities, and integration with existing application frameworks. Real-world schedulers handle connection pooling, request routing, and graceful shutdown procedures.

Web applications benefit significantly from fiber scheduling when handling concurrent HTTP requests. The scheduler manages thousands of simultaneous connections without the memory overhead of thread-per-request architectures.

# HTTP server sketch with fiber scheduling (a starting point, not hardened for production)
require 'socket'
require 'set'
require 'json'

class FiberHTTPServer
  def initialize(host: 'localhost', port: 3000, max_connections: 10000)
    @host = host
    @port = port
    @max_connections = max_connections
    @server = TCPServer.new(@host, @port)
    @connections = Set.new
    @stats = { requests: 0, errors: 0, connections: 0 }
    @running = false
  end

  def start
    @running = true
    @scheduler = ProductionScheduler.new
    Fiber.set_scheduler(@scheduler)

    puts "Server starting on #{@host}:#{@port}"

    # Main accept loop
    Fiber.schedule do
      accept_loop
    end

    # Statistics reporting
    Fiber.schedule do
      stats_loop
    end

    @scheduler.run_loop
  end

  def stop
    @running = false
    @server.close
    @scheduler&.shutdown  # Lets run_loop return once the remaining fibers finish
  end

  private

  def accept_loop
    while @running
      begin
        client = @server.accept
        
        if @connections.size >= @max_connections
          client.close
          next
        end

        @connections << client
        @stats[:connections] += 1

        Fiber.schedule do
          handle_client(client)
        end
      rescue => e
        puts "Accept error: #{e.message}"
        @stats[:errors] += 1
      end
    end
  end

  def handle_client(client)
    begin
      request = parse_request(client)
      return unless request  # Client disconnected before sending a request line

      response = process_request(request)
      send_response(client, response)
      @stats[:requests] += 1
    rescue => e
      puts "Request error: #{e.message}"
      @stats[:errors] += 1
    ensure
      client.close
      @connections.delete(client)
    end
  end

  def parse_request(client)
    request_line = client.gets
    return nil unless request_line

    method, path, _version = request_line.strip.split(' ')
    headers = {}

    # Read headers until the blank line; stop early if the client hangs up
    while (line = client.gets) && !(line = line.strip).empty?
      key, value = line.split(': ', 2)
      headers[key.downcase] = value
    end

    { method: method, path: path, headers: headers }
  end

  def process_request(request)
    case request[:path]
    when '/health'
      { status: 200, body: 'OK' }
    when '/stats'
      { status: 200, body: @stats.to_json, headers: { 'content-type' => 'application/json' } }
    when '/slow'
      sleep(1)  # Simulated slow operation
      { status: 200, body: 'Completed slow operation' }
    else
      { status: 404, body: 'Not Found' }
    end
  end

  def send_response(client, response)
    status_line = "HTTP/1.1 #{response[:status]} #{status_text(response[:status])}\r\n"
    headers = response[:headers] || {}
    headers['content-length'] = response[:body].bytesize.to_s

    client.write(status_line)
    headers.each { |key, value| client.write("#{key}: #{value}\r\n") }
    client.write("\r\n")
    client.write(response[:body])
  end

  def status_text(code)
    case code
    when 200 then 'OK'
    when 404 then 'Not Found'
    when 500 then 'Internal Server Error'
    else 'Unknown'
    end
  end

  def stats_loop
    while @running
      sleep(10)
      puts "Stats: #{@stats}"
    end
  end
end

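class ProductionScheduler
  def initialize
    @ios = {}
    @sleeping = []
    @ready_fibers = []  # [fiber, value passed to resume] pairs
    @shutdown = false
  end

  # Called by Fiber.schedule: create a non-blocking fiber and start it
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  def io_wait(io, events, timeout = nil)
    fiber = Fiber.current
    deadline = timeout ? monotonic_now + timeout : nil

    @ios[io] = { fiber: fiber, events: events, deadline: deadline }
    # run_loop resumes the fiber with the ready events, or false after a timeout
    Fiber.yield
  ensure
    @ios.delete(io)
  end

  def kernel_sleep(duration)
    @sleeping << { fiber: Fiber.current, wake_time: monotonic_now + duration }
    Fiber.yield
  end

  # Required hook: park a fiber waiting on a Mutex, Queue, Thread#join, etc.
  # (timeout is ignored in this version)
  def block(blocker, timeout = nil)
    Fiber.yield
  end

  # Required hook. Not thread-safe: assumes the blocker is released on this thread
  def unblock(blocker, fiber)
    @ready_fibers << [fiber, true]
  end

  def run_loop
    while !@shutdown || has_active_fibers?
      process_ready_fibers
      check_io_timeouts
      check_sleeping_fibers
      poll_io unless @ios.empty?

      sleep(0.001) if @ready_fibers.empty? && @ios.empty?
    end
  end

  def shutdown
    @shutdown = true
  end

  private

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def process_ready_fibers
    while (entry = @ready_fibers.shift)
      fiber, value = entry
      fiber.resume(value) if fiber.alive?
    end
  end

  def poll_io
    # A closed IO would make IO.select raise; wake its fiber so the pending call fails there
    @ios.select { |io, _info| io.closed? }.each do |io, info|
      @ios.delete(io)
      @ready_fibers << [info[:fiber], info[:events]]
    end

    readers = @ios.select { |_io, info| (info[:events] & IO::READABLE) != 0 }.keys
    writers = @ios.select { |_io, info| (info[:events] & IO::WRITABLE) != 0 }.keys
    return if readers.empty? && writers.empty?

    ready_ios = IO.select(readers, writers, [], 0.001)
    return unless ready_ios

    ready_events = Hash.new(0)
    ready_ios[0].each { |io| ready_events[io] |= IO::READABLE }
    ready_ios[1].each { |io| ready_events[io] |= IO::WRITABLE }

    ready_events.each do |io, events|
      fiber_info = @ios.delete(io)
      @ready_fibers << [fiber_info[:fiber], events] if fiber_info
    end
  end

  def check_io_timeouts
    return if @ios.empty?

    current_time = monotonic_now
    expired = @ios.select { |_io, info| info[:deadline] && current_time > info[:deadline] }

    expired.each do |io, info|
      @ios.delete(io)
      @ready_fibers << [info[:fiber], false]  # false tells Ruby the wait timed out
    end
  end

  def check_sleeping_fibers
    return if @sleeping.empty?

    current_time = monotonic_now

    @sleeping.select! do |sleep_info|
      if current_time >= sleep_info[:wake_time]
        @ready_fibers << [sleep_info[:fiber], nil]
        false
      else
        true
      end
    end
  end

  def has_active_fibers?
    !@ready_fibers.empty? || !@ios.empty? || !@sleeping.empty?
  end
end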

Database connection pooling with fiber schedulers requires special consideration. Traditional connection pools designed for threads may not work correctly with fiber scheduling due to different concurrency assumptions.

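# Fiber-aware database connection pool
# Queue#pop suspends only the calling fiber when the scheduler implements #block and #unblock
require 'set'

class FiberConnectionPool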
  def initialize(size: 10, &connection_factory)
    @size = size
    @connection_factory = connection_factory
    @available = Queue.new
    @busy = Set.new
    @mutex = Mutex.new

    # Pre-populate pool
    @size.times do
      @available << @connection_factory.call
    end
  end

  def with_connection
    connection = acquire_connection
    
    begin
      yield connection
    ensure
      release_connection(connection)
    end
  end

  def stats
    @mutex.synchronize do
      {
        total: @size,
        available: @available.size,
        busy: @busy.size
      }
    end
  end

  private

  def acquire_connection
    connection = @available.pop  # Blocks until available
    
    @mutex.synchronize do
      @busy << connection
    end
    
    connection
  end

  def release_connection(connection)
    @mutex.synchronize do
      @busy.delete(connection)
    end
    
    @available << connection
  end
end

# Usage in fiber-scheduled application
db_pool = FiberConnectionPool.new(size: 20) do
  # Connection factory
  SomeDatabase.connect(host: 'localhost', port: 5432)
end

# Handle multiple concurrent database queries
100.times do |i|
  Fiber.schedule do
    db_pool.with_connection do |conn|
      result = conn.query("SELECT * FROM users WHERE id = #{i}")
      puts "Query #{i} returned #{result.count} rows"
    end
  end
end
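A Queue-based pool relies on the scheduler's block and unblock hooks: popping from an empty queue parks the fiber, and pushing a connection back releases it. The sketch below shows that interaction with symbols standing in for connections. PoolDemoScheduler and its run method are hypothetical, I/O is left out, and unblock is assumed to be called only from the scheduler's own thread.

```ruby
class PoolDemoScheduler
  def initialize
    @ready = []    # fibers released by #unblock
    @sleeping = [] # [wake_time, fiber] pairs
  end

  def fiber(&block)
    Fiber.new(blocking: false, &block).tap(&:resume)
  end

  def kernel_sleep(duration)
    @sleeping << [now + duration, Fiber.current]
    Fiber.yield
  end

  # Queue#pop on an empty queue ends up here: park the fiber.
  def block(blocker, timeout = nil)
    Fiber.yield
    true
  end

  # Queue#push calls this for a parked fiber (same-thread use only here).
  def unblock(blocker, fiber)
    @ready << fiber
  end

  def io_wait(io, events, timeout = nil)
    raise NotImplementedError, 'I/O is outside the scope of this sketch'
  end

  def run
    until @ready.empty? && @sleeping.empty?
      @ready.shift.resume until @ready.empty?
      next if @sleeping.empty?

      @sleeping.sort_by!(&:first)
      wake_time, fiber = @sleeping.shift
      delay = wake_time - now
      sleep(delay) if delay > 0 # real sleep on the blocking main fiber
      fiber.resume
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

scheduler = PoolDemoScheduler.new
Fiber.set_scheduler(scheduler)

pool = Thread::Queue.new
pool << :conn_a
pool << :conn_b # two placeholder "connections"

in_use = 0
peak = 0
served = []

5.times do |i|
  Fiber.schedule do
    conn = pool.pop # parks this fiber while the pool is empty
    in_use += 1
    peak = [peak, in_use].max
    sleep(0.01)     # stands in for a query
    served << i
    in_use -= 1
    pool << conn    # wakes one parked fiber, if any
  end
end

scheduler.run
Fiber.set_scheduler(nil)
puts "served #{served.size} jobs with at most #{peak} connections in use"
```

Five fibers share two connections, and the peak never exceeds the pool size. A pool used from several threads would need a thread-safe ready list in unblock.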

Monitoring fiber schedulers requires custom metrics that track fiber lifecycle, scheduler efficiency, and resource utilization. Traditional thread-based monitoring tools provide limited insight into fiber behavior.

Reference

Fiber::Scheduler Interface

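The Fiber::Scheduler interface defines the hook methods that custom schedulers implement. It documents an interface rather than a class to inherit from, so schedulers are plain classes that define these methods. Ruby calls them when a non-blocking fiber performs a blocking operation.

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| #io_wait(io, events, timeout) | io (IO), events (Integer bitmask), timeout (Numeric/nil) | Integer of ready events, or nil/false on timeout | Wait until the IO is ready for the requested events |
| #kernel_sleep(duration = nil) | duration (Numeric/nil) | Not used | Handle Kernel#sleep |
| #block(blocker, timeout = nil) | blocker (Object), timeout (Numeric/nil) | Boolean | Park the current fiber for Mutex, Queue, Thread#join and similar waits |
| #unblock(blocker, fiber) | blocker (Object), fiber (Fiber) | Not used | Mark a fiber parked by #block as runnable; may be called from another thread |
| #timeout_after(duration, exception_class, *exception_arguments, &block) | duration (Numeric), exception_class (Class), block | Result of the block | Run the block with a time limit (used by Timeout.timeout) |
| #process_wait(pid, flags) | pid (Integer), flags (Integer) | Process::Status | Handle process waiting |
| #io_select(readables, writables, exceptables, timeout) | Arrays of IO objects, timeout (Numeric/nil) | Array/nil | Handle IO.select operations |
| #fiber(&block) | block (Proc) | Fiber | Create and start a non-blocking fiber; called by Fiber.schedule |
| #close | None | Not used | Called when the thread exits or the scheduler is replaced |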

Fiber Class Methods

Core methods for managing fiber scheduling and scheduler lifecycle.

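| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| Fiber.set_scheduler(scheduler) | scheduler (object implementing the hooks, or nil) | The scheduler | Set the current thread's scheduler; nil removes it |
| Fiber.scheduler | None | Scheduler object or nil | Get the current thread's scheduler |
| Fiber.schedule(&block) | block (Proc) | Fiber | Create and start a non-blocking fiber through the scheduler's #fiber hook; raises RuntimeError if no scheduler is set |
| Fiber.blocking? | None | false or Integer | false in a non-blocking fiber, 1 in a blocking fiber |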

I/O Events Constants

Event types used with io_wait method to specify the type of I/O operation to wait for.

| Constant | Value | Description |
| --- | --- | --- |
| IO::READABLE | 1 | Wait for readable data |
| IO::PRIORITY | 2 | Wait for priority data |
| IO::WRITABLE | 4 | Wait for writable space |
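The events argument is a bitmask, so a scheduler tests individual flags with bitwise AND rather than comparing for equality. A short sketch follows; event_names is a hypothetical helper, and the constants are referenced by name so the code does not depend on their numeric values.

```ruby
# Translate an events bitmask into the names of the flags it contains.
def event_names(mask)
  names = []
  names << :readable if (mask & IO::READABLE) != 0
  names << :priority if (mask & IO::PRIORITY) != 0
  names << :writable if (mask & IO::WRITABLE) != 0
  names
end

# A fiber may wait for several conditions at once.
requested = IO::READABLE | IO::WRITABLE

puts event_names(requested).inspect
puts event_names(IO::PRIORITY).inspect
```

A scheduler's io_wait hook can use the same tests to decide which IO.select set an IO belongs in.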

Scheduler Hook Methods

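Hooks that a scheduler implements, and whether Ruby requires them. Fiber.set_scheduler raises ArgumentError if any of the four required hooks is missing.

| Method | Purpose | Required |
| --- | --- | --- |
| #io_wait | I/O operations | Yes |
| #kernel_sleep | Sleep operations | Yes |
| #block | Generic blocking | Yes |
| #unblock | Unblock operations | Yes |
| #fiber | Fiber creation | Needed for Fiber.schedule |
| #timeout_after | Timeout handling | No |
| #process_wait | Process operations | No |
| #io_select | Select operations | No |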

Error Handling

Common exceptions that may occur during fiber scheduling operations.

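| Exception | Trigger | Description |
| --- | --- | --- |
| FiberError | Invalid fiber operations | Raised for misuse of the Fiber API |
| FiberError | Dead fiber resume | Attempting to resume a terminated fiber |
| FiberError | Invalid yield | Calling Fiber.yield from the root fiber |
| ArgumentError | Incomplete scheduler | Fiber.set_scheduler given an object that lacks a required hook |
| RuntimeError | Missing scheduler | Fiber.schedule called when no scheduler is set |
| SystemCallError | I/O failures | Operating system I/O errors |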

Performance Characteristics

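Order-of-magnitude estimates for fiber scheduling compared to threading. Actual values depend on the Ruby version, platform, and workload, so measure on the target system.

| Metric | Fibers | Threads | Improvement |
| --- | --- | --- | --- |
| Memory per unit | ~4KB (rough estimate, grows with stack depth) | ~1-8MB reserved stack | 250-2000x |
| Context switch time | ~1μs or less (user space) | A few μs (kernel-mediated, plus GVL hand-off) | Workload-dependent |
| Creation overhead | ~1μs | ~100μs | ~100x |
| Maximum concurrent | ~100k+ | ~1k-10k | 10-100x |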

Scheduler State Management

Recommended data structures and patterns for implementing scheduler state tracking.

| State Type | Recommended Structure | Purpose |
| --- | --- | --- |
| Ready fibers | Array or Queue | Fibers ready to resume |
| Waiting I/O | Hash (IO => fiber info) | I/O wait tracking |
| Sleeping fibers | Array of wake times | Time-based delays |
| Blocked operations | Hash (blocker => fiber) | Generic blocking |

Integration Patterns

Common integration approaches for different application types.

| Application Type | Scheduler Pattern | Key Considerations |
| --- | --- | --- |
| Web servers | Event-driven with I/O polling | Connection limits, request routing |
| Database apps | Connection pool aware | Pool sizing, query timeouts |
| Background jobs | Queue-based processing | Job scheduling, error recovery |
| API clients | Concurrent request batching | Rate limiting, timeout handling |