CrackedRuby

Fiber Scheduler Interface

Ruby's Fiber Scheduler interface enables non-blocking I/O through fiber-based concurrency and pluggable scheduler implementations.

Overview

The Fiber Scheduler Interface provides a standardized protocol for implementing non-blocking I/O operations in Ruby through fiber scheduling. Ruby delegates blocking operations to registered schedulers, which can implement asynchronous alternatives using event loops, thread pools, or other concurrency mechanisms.

The interface centers around Fiber.scheduler, which returns the scheduler installed on the current thread, or nil if none is set. Ruby consults this scheduler when a non-blocking fiber encounters a potentially blocking operation such as socket I/O, a pipe read, or a sleep call. Schedulers implement specific hook methods that Ruby invokes instead of performing the blocking operation directly.

require 'async'

Async do
  # Within this block, Async scheduler handles I/O
  puts "Scheduler: #{Fiber.scheduler.class}"
  # => Async::Scheduler
end

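The scheduler interface defines several hook methods. Fiber.set_scheduler requires io_wait, kernel_sleep, block, and unblock; hooks such as fiber (needed by Fiber.schedule) and close are added as required:

class BasicScheduler
  def io_wait(io, events, timeout = nil)
    # Handle I/O readiness
  end
  
  def kernel_sleep(duration = nil)
    # Handle sleep operations
  end
  
  def block(blocker, timeout = nil)
    # Handle generic blocking
  end
  
  def unblock(blocker, fiber)
    # Mark a blocked fiber as ready to resume
  end
end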
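To make the hooks concrete, here is a minimal sketch of a scheduler that supports only sleep. It assumes Ruby 3.0 or later; SleepOnlyScheduler and run_sleep_demo are illustrative names, not part of Ruby or any gem. The close hook doubles as the event loop, and Ruby invokes it when the thread exits.

```ruby
# A deliberately small scheduler: it supports sleep and Fiber.schedule only.
class SleepOnlyScheduler
  def initialize
    @timers = {} # fiber => monotonic wake-up time
  end

  # Fiber.schedule calls this hook to create and start the fiber.
  def fiber(&block)
    Fiber.new(blocking: false, &block).tap(&:resume)
  end

  # Kernel#sleep inside a non-blocking fiber lands here.
  # (sleep without a duration is not handled in this sketch.)
  def kernel_sleep(duration)
    @timers[Fiber.current] = now + duration
    Fiber.yield
  end

  # Required by Fiber.set_scheduler but unused in this sketch.
  def io_wait(io, events, timeout = nil)
    raise NotImplementedError
  end

  def block(blocker, timeout = nil)
    raise NotImplementedError
  end

  def unblock(blocker, fiber)
    raise NotImplementedError
  end

  # Event loop: wake sleeping fibers in wake-time order until none remain.
  def close
    until @timers.empty?
      fiber, wake_at = @timers.min_by { |_, time| time }
      delay = wake_at - now
      sleep(delay) if delay > 0 # the root fiber is blocking, so this really sleeps
      @timers.delete(fiber)
      fiber.resume
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

def run_sleep_demo
  order = []
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  Thread.new do
    Fiber.set_scheduler(SleepOnlyScheduler.new)
    Fiber.schedule { sleep 0.2; order << :slow }
    Fiber.schedule { sleep 0.1; order << :fast }
  end.join
  [order, Process.clock_gettime(Process::CLOCK_MONOTONIC) - started]
end

order, elapsed = run_sleep_demo
puts "order=#{order.inspect} elapsed=#{elapsed.round(2)}s"
# order is [:fast, :slow]: both sleeps overlap instead of running back to back
```

Because both fibers sleep at the same time, the total runtime should be close to the longest sleep rather than the sum of the two.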

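Ruby's core and standard library integrate with schedulers for operations including socket connections, HTTP requests through Net::HTTP, sleep, and process waiting. When a scheduler is active and the code runs in a non-blocking fiber, these operations yield control to other fibers instead of blocking the thread. Reads and writes on regular files usually still block unless the scheduler implements the io_read and io_write hooks.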

Basic Usage

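Setting a scheduler involves calling Fiber.set_scheduler on the thread that should use it. The scheduler stays active for that thread until it is replaced or the thread exits, and it serves every non-blocking fiber on the thread, such as those created with Fiber.schedule.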

require 'async'

# Create and set scheduler
Async do |task|
  puts "Current scheduler: #{Fiber.scheduler}"
  
  # Non-blocking sleep
  sleep 0.1
  puts "Sleep completed without blocking thread"
  
  # File I/O; regular files may still block unless the scheduler offloads them
  File.read('example.txt')
  puts "File read completed"
end
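The per-thread scope of a scheduler can be checked without any gem. The sketch below assumes Ruby 3.x; NullScheduler is a hypothetical class that defines only the four hooks Fiber.set_scheduler checks for and does no real work.

```ruby
# Smallest object Fiber.set_scheduler accepts: the four required hooks.
class NullScheduler
  def io_wait(io, events, timeout = nil); end
  def kernel_sleep(duration = nil); end
  def block(blocker, timeout = nil); end
  def unblock(blocker, fiber); end
end

def scheduler_visibility
  # The scheduler is installed on this thread only.
  installed = Thread.new do
    Fiber.set_scheduler(NullScheduler.new)
    Fiber.scheduler.class
  end.value

  # A different thread has no scheduler of its own.
  elsewhere = Thread.new { Fiber.scheduler }.value

  [installed, elsewhere]
end

p scheduler_visibility # => [NullScheduler, nil]
```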

The Async gem provides a complete scheduler implementation. Within an Async block, Ruby automatically uses the Async scheduler for supported operations:

require 'async'
require 'async/http/internet'

Async do
  internet = Async::HTTP::Internet.new
  
  # Multiple concurrent requests
  responses = 5.times.map do |i|
    Async do
      response = internet.get("https://httpbin.org/delay/1")
      puts "Request #{i} completed: #{response.status}"
      response
    end
  end
  
  # Wait for all responses
  responses.each(&:wait)
ensure
  internet&.close
end

Schedulers handle different types of blocking operations through specific hook methods. The io_wait method manages I/O readiness, for example while a server waits in accept:

require 'socket'
require 'async'

Async do |task|
  server = TCPServer.new('localhost', 3000)
  puts "Server listening on port 3000"
  
  loop do
    # accept suspends only this task while no client is waiting
    socket = server.accept
    
    # Handle each connection in its own task
    task.async do
      socket.gets  # Read the request line
      socket.write("HTTP/1.1 200 OK\r\nContent-Length: 11\r\nConnection: close\r\n\r\nHello World")
    ensure
      socket.close
    end
  end
end

Ruby checks for an active scheduler before performing blocking operations. Without a scheduler, or inside a blocking fiber, operations block the current thread normally. With a scheduler and a non-blocking fiber, Ruby invokes the appropriate hook method and expects the scheduler to manage the operation asynchronously.
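Whether a fiber is blocking can be inspected directly. The following sketch, assuming Ruby 3.0 or later, uses a hypothetical helper named fiber_blocking_report that runs on a fresh thread so no scheduler is present.

```ruby
def fiber_blocking_report
  Thread.new do
    {
      # No scheduler has been installed on this thread.
      scheduler: Fiber.scheduler,
      # The root fiber of a thread is always blocking.
      root_fiber_blocking: Fiber.current.blocking?,
      # Fiber.new creates non-blocking fibers by default.
      default_fiber_blocking: Fiber.new { Fiber.current.blocking? }.resume,
      # blocking: true opts a fiber out of scheduler hooks.
      explicit_blocking: Fiber.new(blocking: true) { Fiber.current.blocking? }.resume
    }
  end.value
end

p fiber_blocking_report
```

Only the fiber reported as non-blocking would have its sleep and I/O calls routed to a scheduler.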

Advanced Usage

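Custom scheduler implementations require defining the scheduler interface methods based on the specific concurrency model. Each method receives parameters describing the blocking operation and returns control when the operation completes. A fiber can only be resumed on the thread that created it, so the thread-pool sketch below lets workers hand finished operations back through a queue instead of resuming fibers themselves.

class ThreadPoolScheduler
  def initialize(pool_size = 4)
    # Create the queues first: worker threads start reading @queue immediately
    @queue = Queue.new
    @ready = Queue.new  # [fiber, result] pairs handed back by the workers
    @pending = 0        # Operations in flight; touched only by the owning thread
    @pool = pool_size.times.map { Thread.new { worker_loop } }
  end
  
  def io_wait(io, events, timeout = nil)
    suspend([:io_wait, Fiber.current, io, events, timeout])
  end
  
  def kernel_sleep(duration = nil)
    suspend([:sleep, Fiber.current, duration])
  end
  
  # Fiber.set_scheduler also requires block and unblock
  # (the timeout is ignored in this sketch)
  def block(blocker, timeout = nil)
    @pending += 1
    Fiber.yield
  end
  
  def unblock(blocker, fiber)
    @ready << [fiber, true]  # Queue is thread-safe, so any thread may call this
  end
  
  # Used by Fiber.schedule
  def fiber(&block)
    Fiber.new(blocking: false, &block).tap(&:resume)
  end
  
  # Runs on the thread that owns the scheduler and resumes fibers there
  def run
    while @pending > 0
      fiber, result = @ready.pop
      @pending -= 1
      fiber.resume(result) if fiber.alive?
    end
  end
  
  def close
    run
    @pool.size.times { @queue << [:shutdown] }
    @pool.each(&:join)
  end
  
  private
  
  def suspend(task)
    @pending += 1
    @queue << task
    Fiber.yield  # Returns the value that #run passes to resume
  end
  
  def worker_loop
    while (task = @queue.pop)
      case task[0]
      when :io_wait
        handle_io_wait(*task[1..-1])
      when :sleep
        handle_sleep(*task[1..-1])
      when :shutdown
        break
      end
    end
  end
  
  def handle_io_wait(fiber, io, events, timeout)
    readers = (events & IO::READABLE).zero? ? [] : [io]
    writers = (events & IO::WRITABLE).zero? ? [] : [io]
    readable, writable = IO.select(readers, writers, [], timeout)
    
    # io_wait is expected to return the ready events, or a falsy value on timeout
    ready = 0
    ready |= IO::READABLE if readable&.any?
    ready |= IO::WRITABLE if writable&.any?
    schedule_fiber(fiber, ready.zero? ? false : ready)
  end
  
  def handle_sleep(fiber, duration)
    duration ? sleep(duration) : sleep
    schedule_fiber(fiber, nil)
  end
  
  def schedule_fiber(fiber, result)
    # Only enqueue here; #run resumes the fiber on its own thread
    @ready << [fiber, result]
  end
end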
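As a single-threaded contrast to the thread-pool approach, io_wait can also be served by one IO.select loop. This is a minimal sketch assuming Ruby 3.0 or later on a Unix-like system; SelectScheduler and run_pipe_demo are illustrative names, the hook handles readability only, and timeouts are ignored.

```ruby
require 'io/wait'

class SelectScheduler
  def initialize
    @readers = {} # io => fiber waiting for it to become readable
  end

  def fiber(&block)
    Fiber.new(blocking: false, &block).tap(&:resume)
  end

  # Park the fiber until #close sees the IO become readable.
  def io_wait(io, events, timeout = nil)
    @readers[io] = Fiber.current
    Fiber.yield # evaluates to the event mask passed to resume
  end

  # Required by Fiber.set_scheduler but unused in this sketch.
  def kernel_sleep(duration = nil)
    raise NotImplementedError
  end

  def block(blocker, timeout = nil)
    raise NotImplementedError
  end

  def unblock(blocker, fiber)
    raise NotImplementedError
  end

  # Event loop: wait for any parked IO, then resume its fiber.
  def close
    until @readers.empty?
      readable, = IO.select(@readers.keys)
      readable.each { |io| @readers.delete(io).resume(IO::READABLE) }
    end
  end
end

def run_pipe_demo
  received = []
  reader, writer = IO.pipe
  Thread.new do
    Fiber.set_scheduler(SelectScheduler.new)
    Fiber.schedule do
      reader.wait_readable # suspends this fiber through io_wait
      received << reader.gets
    end
    Fiber.schedule do
      received << :writer_ran
      writer.puts "ping"
      writer.close
    end
  end.join
  reader.close
  received
end

p run_pipe_demo # => [:writer_ran, "ping\n"]
```

The reader fiber starts first but parks in io_wait, which lets the writer fiber run before the data is consumed.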

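The block and unblock hooks cover waits that no other hook handles, such as Mutex#lock, Queue#pop, and Thread#join. The blocker argument identifies what the fiber is waiting on and is informational only:

class CustomScheduler
  def initialize
    @ready = Queue.new  # Thread-safe, because unblock may run on another thread
  end
  
  def block(blocker, timeout = nil)
    # Suspend until unblock hands this fiber back to the run loop.
    # Timeout handling is omitted here; a full implementation would
    # resume the fiber and return false once the timeout expires.
    Fiber.yield
    true
  end
  
  def unblock(blocker, fiber)
    # Never resume the fiber here; queue it for the scheduler's own thread
    @ready << fiber
  end
  
  # Resume every fiber that has been unblocked so far
  def run
    @ready.pop.resume until @ready.empty?
  end
end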
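The interplay of block and unblock is easiest to see with a Queue shared by two fibers. The sketch below assumes Ruby 3.0 or later; QueueScheduler and run_queue_demo are hypothetical names, and timeouts are not handled.

```ruby
class QueueScheduler
  def initialize
    @ready = Queue.new # fibers that unblock has released
  end

  def fiber(&block)
    Fiber.new(blocking: false, &block).tap(&:resume)
  end

  # Called when a fiber must wait, for example in Queue#pop on an empty queue.
  def block(blocker, timeout = nil)
    Fiber.yield
    true
  end

  # Called when the wait is over, for example by Queue#push.
  def unblock(blocker, fiber)
    @ready << fiber
  end

  # Required by Fiber.set_scheduler but unused in this sketch.
  def kernel_sleep(duration = nil)
    raise NotImplementedError
  end

  def io_wait(io, events, timeout = nil)
    raise NotImplementedError
  end

  # Resume released fibers on the scheduler's own thread.
  def close
    @ready.pop.resume until @ready.empty?
  end
end

def run_queue_demo
  log = []
  Thread.new do
    Fiber.set_scheduler(QueueScheduler.new)
    queue = Queue.new
    Fiber.schedule { log << [:popped, queue.pop] }        # blocks: queue is empty
    Fiber.schedule { queue << :job; log << [:pushed, :job] } # releases the first fiber
  end.join
  log
end

p run_queue_demo # => [[:pushed, :job], [:popped, :job]]
```

The first fiber is suspended inside Queue#pop, the second fiber's push triggers unblock, and the released fiber finishes when the scheduler's close hook runs at thread exit.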

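Wrapping a scheduler adds behavior such as logging without changing the underlying concurrency strategy. A scheduler is set per thread, so every non-blocking fiber on that thread shares it; a different strategy needs its own thread:

require 'async'
require 'io/wait'

class LoggingScheduler
  def initialize(base_scheduler)
    @base = base_scheduler
  end
  
  def io_wait(io, events, timeout = nil)
    puts "IO wait: #{io.inspect}, events: #{events}"
    @base.io_wait(io, events, timeout)
  end
  
  def kernel_sleep(duration = nil)
    puts "Sleep: #{duration} seconds"
    @base.kernel_sleep(duration)
  end
  
  # Forward every other hook (block, unblock, fiber, close, ...) unchanged
  def respond_to_missing?(name, include_private = false)
    @base.respond_to?(name, include_private) || super
  end
  
  def method_missing(name, *args, **options, &block)
    return super unless @base.respond_to?(name)
    @base.public_send(name, *args, **options, &block)
  end
end

# Install the wrapper on its own thread. Calling Fiber.set_scheduler inside
# an Async block would replace and close the scheduler that is running it.
Thread.new do
  Fiber.set_scheduler(LoggingScheduler.new(Async::Scheduler.new))
  reader, writer = IO.pipe
  
  Fiber.schedule do
    sleep 0.1  # Logs sleep operation
    writer.puts "done"
    writer.close
  end
  
  Fiber.schedule do
    reader.wait_readable  # Logs I/O wait
    reader.gets
  end
end.join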

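Scheduler composition enables combining different scheduling strategies. The sketch below shows only how hooks are dispatched; a usable version must also implement block and unblock and drive each delegate's event loop on the owning thread: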

class CompositeScheduler
  def initialize(*schedulers)
    @schedulers = schedulers
    @current = 0
  end
  
  def io_wait(io, events, timeout = nil)
    scheduler = next_scheduler
    scheduler.io_wait(io, events, timeout)
  end
  
  def kernel_sleep(duration)
    scheduler = next_scheduler  
    scheduler.kernel_sleep(duration)
  end
  
  private
  
  def next_scheduler
    scheduler = @schedulers[@current]
    @current = (@current + 1) % @schedulers.size
    scheduler
  end
end

Thread Safety & Concurrency

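Scheduler implementations must handle concurrent access from multiple fibers running within the same thread. While fibers execute cooperatively within a single thread, schedulers often interact with thread-safe data structures and external resources. Ruby may call unblock from any thread, and a fiber can only be resumed on the thread that created it, so background threads must hand results back to the owning thread rather than resume fibers themselves.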

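require 'concurrent'  # concurrent-ruby gem, for Concurrent::Map

class ThreadSafeScheduler
  def initialize
    @pending_io = Concurrent::Map.new
    @timers = Concurrent::Map.new
    @ready = Queue.new  # [fiber, result] pairs to resume on the owning thread
  end
  
  def io_wait(io, events, timeout = nil)
    fiber = Fiber.current
    
    # Concurrent::Map is already thread-safe, so no extra mutex is needed
    @pending_io[fiber] = {
      io: io,
      events: events, 
      timeout: timeout,
      started_at: Time.now
    }
    
    # Schedule I/O monitoring
    schedule_io_check(fiber)
    Fiber.yield
  end
  
  def kernel_sleep(duration)
    fiber = Fiber.current
    wake_time = Time.now + duration
    @timers[fiber] = wake_time
    
    schedule_timer_check(fiber, wake_time)
    Fiber.yield
  end
  
  # Call from the thread that owns the fibers, for example in its run loop
  def resume_ready
    until @ready.empty?
      fiber, result = @ready.pop
      fiber.resume(result) if fiber.alive?
    end
  end
  
  private
  
  def schedule_io_check(fiber)
    Thread.new do
      info = @pending_io[fiber]
      next unless info  # return is not allowed inside a thread block
      
      ready = IO.select([info[:io]], [], [], info[:timeout])
      @pending_io.delete(fiber)
      
      # io_wait returns the ready events, or false on timeout
      resume_fiber(fiber, ready ? info[:events] : false)
    end
  end
  
  def schedule_timer_check(fiber, wake_time)
    Thread.new do
      delay = wake_time - Time.now
      sleep(delay) if delay > 0
      @timers.delete(fiber)
      resume_fiber(fiber, nil)
    end
  end
  
  def resume_fiber(fiber, result)
    # Fibers cannot be resumed from another thread, so hand the fiber
    # back to the owning thread through the thread-safe queue
    @ready << [fiber, result]
  end
end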

Cross-thread coordination requires careful handling when schedulers interact with background threads:

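require 'securerandom'
require 'concurrent'  # concurrent-ruby gem, for Concurrent::Map

class EventLoopScheduler
  def initialize
    # Set up shared state before starting the thread that uses it
    @command_queue = Queue.new
    @responses = Concurrent::Map.new
    @ready = Queue.new  # Fibers whose I/O is ready, resumed by #run_ready
    @event_loop = Thread.new { run_event_loop }
  end
  
  def io_wait(io, events, timeout = nil)
    fiber = Fiber.current
    request_id = SecureRandom.uuid
    
    # Send command to event loop thread
    @command_queue << {
      type: :io_wait,
      id: request_id,
      fiber: fiber,
      io: io,
      events: events,
      timeout: timeout
    }
    
    # Wait until #run_ready resumes this fiber
    Fiber.yield
    
    # Retrieve result
    @responses.delete(request_id)
  end
  
  # Call on the thread that owns the fibers to resume those that are ready
  def run_ready
    @ready.pop.resume until @ready.empty?
  end
  
  def close
    @command_queue << { type: :shutdown }
    @event_loop.join
  end
  
  private
  
  def run_event_loop
    ios = {}
    
    loop do
      # Drain pending commands without blocking
      until @command_queue.empty?
        command = @command_queue.pop
        case command[:type]
        when :io_wait
          ios[command[:io]] = command
        when :shutdown
          return
        end
      end
      
      # Check read readiness; with no registered IOs this just waits 10 ms
      ready = IO.select(ios.keys, [], [], 0.01)
      next unless ready
      
      ready[0].each do |io|
        command = ios.delete(io)
        
        # Record the result, then hand the fiber back to its own thread
        @responses[command[:id]] = IO::READABLE
        @ready << command[:fiber]
      end
    end
  end
end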

Fiber-local state requires special consideration in scheduler implementations. Each fiber maintains separate state, but schedulers often need to coordinate across fibers:

require 'concurrent'  # concurrent-ruby gem, for Concurrent::Map

class StatefulScheduler
  def initialize
    @fiber_states = Concurrent::Map.new
  end
  
  def io_wait(io, events, timeout = nil)
    fiber = Fiber.current
    state = get_fiber_state(fiber)
    
    state[:pending_operations] << {
      type: :io_wait,
      io: io,
      events: events,
      started_at: Time.now
    }
    
    # Perform operation
    result = perform_io_wait(io, events, timeout)
    
    # Update state
    state[:completed_operations] += 1
    state[:pending_operations].pop
    
    result
  end
  
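  private
  
  def perform_io_wait(io, events, timeout)
    # Blocking fallback for illustration; a real scheduler would suspend the fiber
    readers = (events & IO::READABLE).zero? ? [] : [io]
    writers = (events & IO::WRITABLE).zero? ? [] : [io]
    IO.select(readers, writers, [], timeout) ? events : false
  end
  
  def get_fiber_state(fiber)
    @fiber_states.compute_if_absent(fiber) do
      {
        pending_operations: [],
        completed_operations: 0,
        created_at: Time.now
      }
    end
  end
end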
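The difference between fiber-local and thread-wide state can be shown with core Ruby alone. In this sketch, fiber_local_demo is a hypothetical helper; Thread#[] stores values per fiber, while thread variables are shared by all fibers on the thread.

```ruby
def fiber_local_demo
  Thread.new do
    Thread.current[:request_id] = "outer"               # fiber-local storage
    Thread.current.thread_variable_set(:tenant, "acme") # thread-wide storage

    # A new fiber starts with empty fiber-local storage
    # but sees the same thread variables.
    Fiber.new do
      {
        fiber_local: Thread.current[:request_id],
        thread_wide: Thread.current.thread_variable_get(:tenant)
      }
    end.resume
  end.value
end

result = fiber_local_demo
puts "fiber_local=#{result[:fiber_local].inspect} thread_wide=#{result[:thread_wide].inspect}"
# fiber_local is nil and thread_wide is "acme"
```

State that a scheduler needs across fibers therefore belongs in the scheduler object or in thread variables, not in Thread#[].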

Performance & Memory

Scheduler performance directly impacts application throughput since all blocking operations flow through scheduler methods. Efficient implementations minimize overhead and avoid unnecessary allocations.

require 'benchmark'
require 'async'

# Compare blocking vs non-blocking I/O
def benchmark_io_operations(count = 1000)
  file_content = "x" * 1024  # 1KB content
  
  # Blocking I/O
  blocking_time = Benchmark.realtime do
    count.times do |i|
      File.write("temp_#{i}.txt", file_content)
      File.read("temp_#{i}.txt")
      File.delete("temp_#{i}.txt")
    end
  end
  
  # Non-blocking I/O with scheduler
  nonblocking_time = Benchmark.realtime do
    Async do
      count.times do |i|
        Async do
          File.write("temp_async_#{i}.txt", file_content)
          File.read("temp_async_#{i}.txt") 
          File.delete("temp_async_#{i}.txt")
        end
      end
    end
  end
  
  puts "Blocking I/O: #{blocking_time.round(2)}s"
  puts "Non-blocking I/O: #{nonblocking_time.round(2)}s"
  puts "Speedup: #{(blocking_time / nonblocking_time).round(2)}x"
end

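benchmark_io_operations(100)
# Example output (illustrative; results vary by machine and scheduler backend,
# and regular-file I/O may show little or no speedup):
# Blocking I/O: 0.45s
# Non-blocking I/O: 0.12s
# Speedup: 3.75x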

Memory usage patterns differ significantly between scheduler implementations. Event loop schedulers typically maintain minimal per-operation overhead:

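class MemoryEfficientScheduler
  def initialize
    @io_operations = {}  # Reuse hash
    @sleep_operations = {}  # fiber => monotonic wake time
    @object_pool = []  # Reuse operation objects
  end
  
  def io_wait(io, events, timeout = nil)
    # Reuse operation object to reduce allocations
    operation = @object_pool.pop || {}
    operation[:io] = io
    operation[:events] = events
    operation[:timeout] = timeout
    operation[:fiber] = Fiber.current
    
    @io_operations[io] = operation
    
    result = wait_for_io(operation)
    
    # Return object to pool
    @io_operations.delete(io)
    operation.clear
    @object_pool << operation if @object_pool.size < 100
    
    result
  end
  
  def kernel_sleep(duration)
    # A monotonic Float timestamp avoids allocating Time objects
    @sleep_operations[Fiber.current] = now + duration
    Fiber.yield  # #run resumes this fiber once its wake time has passed
  end
  
  # Required by Fiber.set_scheduler; this sketch does not support
  # Mutex, Queue, or Thread#join waits
  def block(blocker, timeout = nil)
    raise NotImplementedError, "block is not supported"
  end
  
  def unblock(blocker, fiber)
    raise NotImplementedError, "unblock is not supported"
  end
  
  # Used by Fiber.schedule
  def fiber(&block)
    Fiber.new(blocking: false, &block).tap(&:resume)
  end
  
  # Resume sleeping fibers in wake-time order until none remain
  def run
    until @sleep_operations.empty?
      fiber, wake_time = @sleep_operations.min_by { |_, time| time }
      delay = wake_time - now
      sleep(delay) if delay > 0
      @sleep_operations.delete(fiber)
      fiber.resume
    end
  end
  
  # Called by Ruby when the thread exits
  def close
    run
  end
  
  private
  
  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
  
  def wait_for_io(operation)
    # Blocking fallback that honors the requested events
    io, events = operation[:io], operation[:events]
    readers = events.anybits?(IO::READABLE) ? [io] : []
    writers = events.anybits?(IO::WRITABLE) ? [io] : []
    IO.select(readers, writers, [], operation[:timeout]) ? events : false
  end
end

# Measure allocations
def measure_scheduler_memory
  GC.disable
  
  # total_allocated_objects counts allocations; count_objects[:TOTAL] counts heap slots
  start_objects = GC.stat(:total_allocated_objects)
  
  # Schedulers are per thread and only serve non-blocking fibers
  Thread.new do
    Fiber.set_scheduler(MemoryEfficientScheduler.new)
    
    # Perform operations
    Fiber.schedule do
      1000.times { sleep 0.001 }
    end
  end.join
  
  end_objects = GC.stat(:total_allocated_objects)
  
  puts "Objects created: #{end_objects - start_objects}"
ensure
  GC.enable
end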

Scheduler selection significantly impacts performance characteristics. Different schedulers excel in different scenarios:

require 'benchmark'

def compare_scheduler_performance
  operations = 10000
  
  # Test with different scheduler types. Each scheduler must implement the
  # full interface (io_wait, kernel_sleep, block, unblock, fiber, and close).
  schedulers = [
    ['Thread Pool', ThreadPoolScheduler.new(8)],
    ['Event Loop', EventLoopScheduler.new], 
    ['Memory Efficient', MemoryEfficientScheduler.new]
  ]
  
  schedulers.each do |name, scheduler|
    time = Benchmark.realtime do
      # Run on a dedicated thread; thread exit closes the scheduler,
      # which drives any fibers that are still waiting
      Thread.new do
        Fiber.set_scheduler(scheduler)
        
        Fiber.schedule do
          # Mix of I/O and sleep operations
          operations.times do |i|
            if i % 2 == 0
              sleep 0.001
            else
              File.read('/dev/null') rescue nil
            end
          end
        end
      end.join
    end
    
    puts "#{name}: #{time.round(3)}s"
  end
end

compare_scheduler_performance
# Example output (illustrative; timings vary by machine):
# Thread Pool: 2.341s
# Event Loop: 1.892s
# Memory Efficient: 1.205s

Reference

Core Methods

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| Fiber.scheduler | None | Scheduler or nil | Returns the scheduler set on the current thread |
| Fiber.set_scheduler(scheduler) | scheduler (Object) | Scheduler | Sets the scheduler for the current thread |
| Fiber.schedule { block } | block (Block) | Fiber | Creates and resumes a non-blocking fiber through the scheduler's fiber hook |

Scheduler Interface Methods

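| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| #io_wait(io, events, timeout) | io (IO), events (Integer), timeout (Numeric or nil) | Integer mask of ready events, falsy on timeout | Handles I/O readiness waiting |
| #kernel_sleep(duration) | duration (Numeric or nil) | nil | Handles sleep operations |
| #block(blocker, timeout) | blocker (Object), timeout (Numeric or nil) | Boolean | Suspends the current fiber until unblock or timeout |
| #unblock(blocker, fiber) | blocker (Object), fiber (Fiber) | nil | Marks a blocked fiber as ready to resume; must be thread-safe |
| #fiber(&block) | block (Block) | Fiber | Creates and runs a non-blocking fiber for Fiber.schedule |
| #close | None | nil | Runs remaining work and cleans up scheduler resources |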

I/O Events Constants

| Constant | Value | Description |
| --- | --- | --- |
| IO::READABLE | 1 | Socket/file readable |
| IO::PRIORITY | 2 | Priority data available |
| IO::WRITABLE | 4 | Socket/file writable |
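The events argument is a bitmask, so hooks combine and test these constants with bitwise operators. The helpers below are a small sketch; EVENT_NAMES, decode_events, and poll_events are illustrative names rather than Ruby APIs, and poll_events builds the kind of mask an io_wait hook is expected to return.

```ruby
EVENT_NAMES = {
  IO::READABLE => :readable,
  IO::WRITABLE => :writable,
  IO::PRIORITY => :priority
}.freeze

# Turn an event mask into a list of names.
def decode_events(mask)
  EVENT_NAMES.select { |bit, _name| mask.anybits?(bit) }.values
end

# Check which of the requested events are ready; returns false on timeout.
def poll_events(io, events, timeout = 0)
  readers = events.anybits?(IO::READABLE) ? [io] : []
  writers = events.anybits?(IO::WRITABLE) ? [io] : []
  readable, writable = IO.select(readers, writers, [], timeout)

  ready = 0
  ready |= IO::READABLE if readable&.any?
  ready |= IO::WRITABLE if writable&.any?
  ready.zero? ? false : ready
end

p decode_events(IO::READABLE | IO::WRITABLE) # => [:readable, :writable]

reader, writer = IO.pipe
p poll_events(reader, IO::READABLE)                 # => false (nothing written yet)
writer.write("x")
p decode_events(poll_events(reader, IO::READABLE))  # => [:readable]
reader.close
writer.close
```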

Standard Library Integration

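| Library | Methods | Scheduler Support |
| --- | --- | --- |
| File | read, write, open | Partial (regular files block unless the scheduler implements io_read and io_write) |
| IO | select, read, write | Yes |
| Socket | accept, connect, recv, send | Yes |
| Net::HTTP | get, post, request | Yes |
| Process | wait, spawn | Partial |
| Kernel | sleep | Yes |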

Error Conditions

| Error | When Raised | Resolution |
| --- | --- | --- |
| FiberError | Fiber resumed incorrectly (dead fiber or double resume) | Check fiber state before resume |
| FiberError | Fiber resumed from a thread other than its own | Hand the fiber back to its owning thread before resuming |
| SystemCallError | I/O operation failure | Handle with appropriate rescue blocks |

Scheduler Implementation Checklist

class MinimalScheduler
  def io_wait(io, events, timeout = nil)
    # Required: handle I/O readiness
  end
  
  def kernel_sleep(duration = nil)
    # Required: handle sleep operations
  end
  
  def block(blocker, timeout = nil)
    # Required: suspend the current fiber until unblock or timeout
  end
  
  def unblock(blocker, fiber)
    # Required: mark the fiber ready to resume; may be called from any thread
  end
  
  def fiber(&block)
    # Needed for Fiber.schedule: create and resume a non-blocking fiber
  end
  
  def close
    # Optional: run remaining work and clean up resources
  end
end
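Ruby checks the interface when the scheduler is installed. The sketch below, assuming Ruby 3.x, uses hypothetical classes IncompleteScheduler and CompleteScheduler and a helper try_install to show that an object missing required hooks is rejected with ArgumentError.

```ruby
class IncompleteScheduler
  def io_wait(io, events, timeout = nil); end
  def kernel_sleep(duration = nil); end
  # block and unblock are missing on purpose
end

class CompleteScheduler < IncompleteScheduler
  def block(blocker, timeout = nil); end
  def unblock(blocker, fiber); end
end

# Try to install the scheduler on a throwaway thread.
# Returns :accepted, or the error message if Ruby rejects it.
def try_install(scheduler)
  Thread.new do
    Fiber.set_scheduler(scheduler)
    :accepted
  rescue ArgumentError => error
    error.message
  end.value
end

p try_install(IncompleteScheduler.new) # prints the ArgumentError message
p try_install(CompleteScheduler.new)   # => :accepted
```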

Performance Characteristics

| Scheduler Type | Memory Usage | CPU Overhead | Concurrency Model |
| --- | --- | --- | --- |
| Event Loop | Low | Low | Single-threaded |
| Thread Pool | Medium | Medium | Multi-threaded |
| Hybrid | Medium | Low | Mixed |
| Synchronous | High | High | Blocking |

Integration Examples

# Rails integration
class ApplicationController < ActionController::Base
  around_action :with_scheduler
  
  private
  
  def with_scheduler
    Async do
      yield
    end
  end
end

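# Rack middleware
class SchedulerMiddleware
  def initialize(app, scheduler_class = Async::Scheduler)
    @app = app
    @scheduler_class = scheduler_class
  end
  
  def call(env)
    response = nil
    
    # Use a dedicated thread so the server thread's own scheduler, if any,
    # is left untouched. Thread exit closes the scheduler, which runs the
    # request fiber to completion before join returns.
    Thread.new do
      Fiber.set_scheduler(@scheduler_class.new)
      Fiber.schedule { response = @app.call(env) }
    end.join
    
    response
  end
end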

# Testing with schedulers
RSpec.configure do |config|
  config.around(:each, :async) do |example|
    Async do
      example.run
    end
  end
end