Building Custom Fiber Schedulers

A guide to creating custom scheduler implementations for Ruby's Fiber system to control cooperative concurrency execution flow.

Overview

Ruby's Fiber scheduler interface provides a mechanism for implementing custom cooperative concurrency systems. The scheduler controls how fibers yield execution and resume, replacing Ruby's default blocking behavior with non-blocking alternatives. Ruby calls scheduler methods when non-blocking fibers perform potentially blocking operations such as I/O, sleep, or synchronization primitives like Mutex.

The scheduler interface consists of several hook methods that Ruby invokes during fiber execution. These methods receive operation details and return values that determine execution flow. A scheduler can choose to handle operations asynchronously, delegate to other systems, or fall back to default behavior.

class BasicScheduler
  def io_wait(io, events, timeout)
    # Handle I/O readiness waiting
  end
  
  def kernel_sleep(duration)
    # Handle sleep operations
  end
  
  def block(blocker, timeout = nil)
    # Handle blocking operations (Mutex, Queue, Thread#join, ...)
  end
  
  def unblock(blocker, fiber)
    # Wake a fiber previously suspended by block
  end
  
  def fiber(&block)
    # Invoked by Fiber.schedule to create and start a non-blocking fiber
  end
  
  def close
    # Drain pending fibers when the thread exits
  end
end

Ruby activates a scheduler per thread via Fiber.set_scheduler. Once set, the scheduler handles blocking operations for every non-blocking fiber on that thread, including fibers started with Fiber.schedule. When the thread exits, Ruby calls the scheduler's close method so it can drain pending work; the scheduler is not inherited by newly created threads.

Fiber.set_scheduler(BasicScheduler.new)

Fiber.schedule do
  # Blocking operations inside this non-blocking fiber route to the scheduler
  reader, writer = IO.pipe
  sleep(1)               # calls scheduler.kernel_sleep
  writer.write("ping")
  reader.wait_readable   # calls scheduler.io_wait
end

The scheduler interface, introduced in Ruby 3.0, emerged to support async I/O libraries and event-driven programming models. Custom schedulers enable integration with external event loops, efficient I/O multiplexing, and specialized concurrency patterns.

Basic Usage

Creating a functional scheduler requires implementing the core interface methods: io_wait, kernel_sleep, block, unblock, and close. The io_wait method handles I/O readiness checking, receiving the IO object, an event mask built from IO::READABLE, IO::WRITABLE, and IO::PRIORITY, and an optional timeout. It should suspend the fiber until the I/O is ready and return the subset of events that became ready, or false on timeout.

class EventLoopScheduler
  def initialize
    @ios = {}
    @running = false
  end
  
  def io_wait(io, events, timeout)
    @ios[io] = { events: events, timeout: timeout }
    start_loop unless @running
    # Return immediately for the demo -- a real implementation would
    # suspend the fiber and return the events that became ready
    events
  end
  
  private
  
  def start_loop
    @running = true
    # Event loop implementation would go here
  end
end

The kernel_sleep method receives a duration in seconds (nil means sleep indefinitely) and controls sleep behavior. Schedulers can implement precise timing or integrate with event loops, but they must suspend the calling fiber themselves, typically by recording a deadline and yielding; once a scheduler is installed there is no automatic fallback to Ruby's blocking sleep.

class TimerScheduler
  def initialize
    @timers = []
  end
  
  def kernel_sleep(duration)
    # nil (sleep forever) is not handled in this sketch
    return if duration <= 0
    
    end_time = Time.now + duration
    @timers << { end_time: end_time, fiber: Fiber.current }
    
    # Yield control until process_timers resumes this fiber
    Fiber.yield
  end
  
  def process_timers
    current_time = Time.now
    ready_timers = @timers.select { |timer| timer[:end_time] <= current_time }
    
    ready_timers.each do |timer|
      timer[:fiber].resume
    end
    
    @timers -= ready_timers
  end
end
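
Something must drive process_timers. A hypothetical driver loop makes the control flow concrete; kernel_sleep is invoked directly here because no scheduler is installed:

scheduler = TimerScheduler.new

worker = Fiber.new do
  scheduler.kernel_sleep(0.25)  # parks the fiber via Fiber.yield
  puts "timer fired"
end

worker.resume  # runs until the yield inside kernel_sleep

while worker.alive?
  scheduler.process_timers
  sleep(0.01)  # plain sleep: this code runs outside the scheduler
end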

The block method handles generic blocking operations like mutex acquisition or condition variable waits. The blocker parameter identifies what the fiber is waiting on and is informational only; timeout specifies the maximum wait duration. The method returns true when the fiber was unblocked and false when the timeout elapsed.

class MutexScheduler
  def initialize
    @blocked_fibers = []
  end
  
  def block(blocker, timeout = nil)
    case blocker
    when Mutex
      handle_mutex_block(blocker, timeout)
    when ConditionVariable
      handle_condition_block(blocker, timeout)
    else
      suspend_current(blocker, timeout)  # generic fallback
    end
  end
  
  def unblock(blocker, fiber)
    # Called when the blocker releases, possibly from another thread;
    # a real implementation queues the fiber for resumption in its loop
    @blocked_fibers.reject! { |entry| entry[:fiber] == fiber }
  end
  
  private
  
  def handle_mutex_block(mutex, timeout)
    suspend_current(mutex, timeout)
  end
  
  def handle_condition_block(cv, timeout)
    suspend_current(cv, timeout)
  end
  
  def suspend_current(blocker, timeout)
    @blocked_fibers << { 
      fiber: Fiber.current, 
      blocker: blocker, 
      timeout: timeout 
    }
    Fiber.yield
    true  # true = unblocked; false would signal that the timeout elapsed
  end
end

Schedulers should implement the close method to clean up when deactivated. Ruby calls close when the owning thread exits; the method receives no parameters and should run any remaining fibers to completion, release held resources, and cancel pending operations.

class ResourceScheduler
  def initialize
    @connections = []
    @timers = []
  end
  
  def close
    @connections.each(&:close)
    @timers.each(&:cancel)
    @connections.clear
    @timers.clear
  end
end
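
These hooks only make sense together with a run loop that resumes suspended fibers. The sketch below, using illustrative names rather than any library's API, ties the pieces into a minimal complete scheduler built on IO.select; production schedulers such as the async gem handle timeouts, errors, and many edge cases this version omits.

class MiniScheduler
  def initialize
    @readable = {}                # io => fiber waiting to read
    @writable = {}                # io => fiber waiting to write
    @waiting  = {}                # fiber => monotonic wake-up time
    @blocked  = {}                # fiber => blocker (informational)
    @ready    = Thread::Queue.new # fibers unblocked, possibly cross-thread
  end
  
  # Invoked by Fiber.schedule: create and start a non-blocking fiber
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end
  
  def io_wait(io, events, timeout)
    # timeout handling is omitted in this sketch
    @readable[io] = Fiber.current if events.anybits?(IO::READABLE)
    @writable[io] = Fiber.current if events.anybits?(IO::WRITABLE)
    Fiber.yield
    events  # report the requested events as ready
  end
  
  def kernel_sleep(duration)
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    @waiting[Fiber.current] = duration ? now + duration : Float::INFINITY
    Fiber.yield
  end
  
  def block(blocker, timeout = nil)
    @blocked[Fiber.current] = blocker
    Fiber.yield
    true  # timeout handling is omitted
  end
  
  def unblock(blocker, fiber)
    @blocked.delete(fiber)
    @ready << fiber  # thread-safe; the run loop performs the resume
  end
  
  # Ruby calls close when the thread exits: drain all pending work
  def close
    run
  end
  
  private
  
  def run
    until @readable.empty? && @writable.empty? &&
          @waiting.empty? && @blocked.empty? && @ready.empty?
      readable, writable, = IO.select(@readable.keys, @writable.keys, [], 0.1)
      
      Array(readable).each do |io|
        @writable.delete(io)          # drop any paired registration
        @readable.delete(io)&.resume
      end
      Array(writable).each do |io|
        @readable.delete(io)
        @writable.delete(io)&.resume
      end
      
      now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      @waiting.select { |_, at| at <= now }.each_key do |fiber|
        @waiting.delete(fiber)
        fiber.resume
      end
      
      @ready.size.times { @ready.pop.resume }
    end
  end
end

Thread.new do
  Fiber.set_scheduler(MiniScheduler.new)
  
  Fiber.schedule { sleep(0.1); puts "A done" }
  Fiber.schedule { sleep(0.2); puts "B done" }
end.join  # close drains both fibers before the thread exits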

Thread Safety & Concurrency

A fiber scheduler serves a single thread, but it must still handle concurrent fiber switching safely. Ruby can switch between fibers whenever scheduler methods yield, creating race-like interleavings if schedulers maintain shared state without careful design.

Ruby never preempts a fiber mid-expression; switches happen only at explicit yield or transfer points. Scheduler methods, however, are full of such points, so data structures accessed across fiber switches require careful design to prevent corruption or inconsistent state.

require 'concurrent'  # concurrent-ruby gem, provides Concurrent::Hash

class ThreadSafeScheduler
  def initialize
    @io_registry = Concurrent::Hash.new
    @timer_queue = Queue.new
    @mutex = Mutex.new
  end
  
  def io_wait(io, events, timeout)
    # Atomic registration prevents race conditions
    @mutex.synchronize do
      @io_registry[io] = {
        fiber: Fiber.current,
        events: events,
        timeout: timeout,
        registered_at: Time.now
      }
    end
    
    Fiber.yield  # Safe yield point
    @io_registry.delete(io)
    events  # report the requested events as ready (sketch)
  end
  
  def process_io_events
    ready_ios = []
    
    @mutex.synchronize do
      @io_registry.each do |io, data|
        if io_ready?(io, data[:events])
          ready_ios << data[:fiber]
        end
      end
    end
    
    ready_ios.each(&:resume)
  end
end

Multiple fibers sharing a scheduler create complex interaction patterns. Scheduler state changes from one fiber affect operations in other fibers. Implementing proper isolation prevents unintended cross-fiber interference.

class IsolatedScheduler
  def initialize
    @fiber_contexts = {}
  end
  
  def io_wait(io, events, timeout)
    context = fiber_context(Fiber.current)
    context[:pending_io] << { io: io, events: events }
    
    # Process only this fiber's I/O operations; a real implementation
    # would yield here until the operation becomes ready
    process_fiber_io(Fiber.current)
    events
  end
  
  def kernel_sleep(duration)
    context = fiber_context(Fiber.current)
    context[:sleep_until] = Time.now + duration
    
    # Yield only affects current fiber
    Fiber.yield
    context.delete(:sleep_until)
    true
  end
  
  private
  
  def fiber_context(fiber)
    @fiber_contexts[fiber] ||= {
      pending_io: [],
      timers: [],
      blocked_on: nil
    }
  end
  
  def process_fiber_io(fiber)
    context = fiber_context(fiber)
    ready_operations = context[:pending_io].select { |op| io_ready?(op[:io], op[:events]) }
    
    unless ready_operations.empty?
      context[:pending_io] -= ready_operations
      fiber.resume if fiber != Fiber.current
    end
  end
end

Schedulers integrating with external event systems must synchronize between Ruby's fiber execution and external callbacks. External events arriving during fiber execution require queuing and processing at safe yield points.

class EventSystemScheduler
  def initialize
    @event_queue = Queue.new
    # ExternalEventSystem stands in for any callback-based event library
    @external_system = ExternalEventSystem.new
    @external_system.on_event { |event| @event_queue << event }
  end
  
  def io_wait(io, events, timeout)
    request_id = @external_system.register_io(io, events)
    
    loop do
      begin
        event = @event_queue.pop(true)  # non-blocking pop
      rescue ThreadError                # queue empty
        Fiber.yield                     # allow other fibers to run
        next
      end
      
      if event.request_id == request_id
        @external_system.unregister(request_id)
        return event.success?
      else
        # Re-queue events destined for other fibers
        @event_queue << event
      end
    end
  end
end

Scheduler scope is per thread: all fibers created on a thread share that thread's scheduler, and newly spawned threads start with no scheduler at all. Code that mixes schedulers therefore does so across threads, and each thread's scheduler must drain its own pending operations before the thread exits.
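
A small sketch shows the per-thread boundary, reusing the hypothetical MiniScheduler from earlier:

Fiber.set_scheduler(MiniScheduler.new)  # scheduler for the current thread

Thread.new do
  Fiber.scheduler  # => nil: schedulers are not inherited by new threads
  
  Fiber.set_scheduler(MiniScheduler.new)  # this thread's own scheduler
  Fiber.schedule { sleep(0.05) }
end.join  # the child thread's scheduler drains its fiber on exit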

Advanced Usage

Complex scheduler implementations often compose multiple scheduling strategies, delegating different operation types to specialized handlers. This approach maintains modularity while supporting diverse concurrency patterns within a single scheduler.

class CompositeScheduler
  def initialize
    @io_handler = SelectIOHandler.new
    @timer_handler = HeapTimerHandler.new
    @mutex_handler = FairMutexHandler.new
    @fallback_handler = DefaultHandler.new
  end
  
  def io_wait(io, events, timeout)
    case io
    when TCPSocket, UDPSocket
      @io_handler.wait_socket(io, events, timeout)
    when File
      @io_handler.wait_file(io, events, timeout) 
    else
      @fallback_handler.io_wait(io, events, timeout)
    end
  end
  
  def kernel_sleep(duration)
    if duration < 0.001  # Sub-millisecond sleeps
      @timer_handler.precise_sleep(duration)
    else
      @timer_handler.standard_sleep(duration)
    end
  end
  
  def block(blocker, timeout = nil)
    handler = case blocker
             when Mutex then @mutex_handler
             when ConditionVariable then @mutex_handler
             else @fallback_handler
             end
    
    handler.block(blocker, timeout)
  end
  
  def close
    [@io_handler, @timer_handler, @mutex_handler, @fallback_handler].each(&:close)
  end
end

Adaptive schedulers modify behavior based on runtime conditions, switching strategies as workload characteristics change. These schedulers monitor performance metrics and automatically optimize execution patterns.

class AdaptiveScheduler
  def initialize
    @strategies = {
      low_latency: LowLatencyStrategy.new,
      high_throughput: HighThroughputStrategy.new,
      balanced: BalancedStrategy.new
    }
    @current_strategy = :balanced
    @metrics = PerformanceMetrics.new
  end
  
  def io_wait(io, events, timeout)
    start_time = Time.now
    result = current_handler.io_wait(io, events, timeout)
    @metrics.record_io_wait(Time.now - start_time)
    
    adapt_strategy if @metrics.sample_count % 100 == 0
    result
  end
  
  def kernel_sleep(duration)
    current_handler.kernel_sleep(duration)
  end
  
  private
  
  def current_handler
    @strategies[@current_strategy]
  end
  
  def adapt_strategy
    latency = @metrics.average_latency
    throughput = @metrics.operations_per_second
    
    new_strategy = if latency > 0.1
                    :low_latency
                  elsif throughput < 1000
                    :high_throughput
                  else
                    :balanced
                  end
    
    if new_strategy != @current_strategy
      @current_strategy = new_strategy
      @metrics.reset
    end
  end
end

Priority-based scheduling implements fiber ordering based on importance or deadline requirements. This pattern suits applications with mixed workload priorities or real-time constraints.

class PriorityScheduler
  def initialize
    @priority_queues = Hash.new { |h, k| h[k] = [] }
    @fiber_priorities = {}
  end
  
  def set_fiber_priority(fiber, priority)
    @fiber_priorities[fiber] = priority
  end
  
  def io_wait(io, events, timeout)
    fiber = Fiber.current
    priority = @fiber_priorities[fiber] || 0
    
    @priority_queues[priority] << {
      fiber: fiber,
      io: io,
      events: events,
      timeout: timeout,
      created_at: Time.now
    }
    
    Fiber.yield
    true
  end
  
  def process_ready_fibers
    # Process highest priority fibers first
    @priority_queues.keys.sort.reverse.each do |priority|
      queue = @priority_queues[priority]
      ready_requests = queue.select { |req| io_ready?(req[:io], req[:events]) }
      
      ready_requests.each do |request|
        queue.delete(request)
        request[:fiber].resume
      end
    end
  end
  
  def schedule_next_fiber
    highest_priority = @priority_queues.keys.max
    return unless highest_priority
    
    next_request = @priority_queues[highest_priority].shift
    next_request[:fiber].resume if next_request
  end
end
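
Usage then amounts to tagging fibers before they park themselves. A sketch, assuming the scheduler also implements the fiber hook so Fiber.schedule works; handle_heartbeat and rebuild_cache are placeholder methods:

scheduler = PriorityScheduler.new
Fiber.set_scheduler(scheduler)

Fiber.schedule do
  scheduler.set_fiber_priority(Fiber.current, 10)  # urgent work
  handle_heartbeat
end

Fiber.schedule do
  scheduler.set_fiber_priority(Fiber.current, -5)  # background work
  rebuild_cache
end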

Cooperative batching optimizes I/O operations by grouping similar requests and processing them together. This approach reduces system call overhead and improves throughput for I/O-intensive workloads.

class BatchingScheduler
  def initialize
    @io_batches = Hash.new { |h, k| h[k] = [] }
    @expired_batches = Queue.new  # batch keys whose timeout elapsed
    @batch_size = 32
    @batch_timeout = 0.001
  end
  
  def io_wait(io, events, timeout)
    batch_key = "#{events}_#{timeout}"
    batch = @io_batches[batch_key]
    
    batch << {
      fiber: Fiber.current,
      io: io,
      events: events,
      timeout: timeout,
      added_at: Time.now
    }
    
    if batch.size >= @batch_size
      process_batch(batch_key)
    else
      schedule_batch_timeout(batch_key)
    end
    
    Fiber.yield
    true
  end
  
  private
  
  def process_batch(batch_key)
    # Detach the batch so re-queued requests land in a fresh array;
    # subtracting the batch from itself (as a naive version might)
    # would discard every re-queued request
    batch = @io_batches.delete(batch_key)
    return if batch.nil? || batch.empty?
    
    ios = batch.map { |req| req[:io] }
    events = batch.first[:events]
    timeout = batch.map { |req| req[:timeout] }.compact.min
    
    ready_ios = select_multiple(ios, events, timeout)
    
    batch.each do |request|
      if ready_ios.include?(request[:io])
        request[:fiber].resume
      else
        # Re-queue requests that are not yet ready
        @io_batches[batch_key] << request
      end
    end
  end
  
  def schedule_batch_timeout(batch_key)
    # Fibers cannot be resumed from another thread, so the timer only
    # records the expiry; the scheduler's own loop is expected to pop
    # @expired_batches and call process_batch from the owning thread
    Thread.new do
      sleep(@batch_timeout)
      @expired_batches << batch_key
    end
  end
end
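
The select_multiple helper is left undefined above. One plausible shape for it, built on IO.select, might be:

def select_multiple(ios, events, timeout)
  readers = events.anybits?(IO::READABLE) ? ios : []
  writers = events.anybits?(IO::WRITABLE) ? ios : []
  
  ready_read, ready_write, = IO.select(readers, writers, [], timeout)
  Array(ready_read) | Array(ready_write)  # unique set of ready IOs
end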

Performance & Memory

Scheduler performance directly impacts application responsiveness and resource utilization. Efficient data structures for tracking fiber states and pending operations minimize overhead during context switches and operation dispatching.

Hash-based lookup structures provide O(1) access for fiber identification but consume memory proportional to active fiber count. Array-based structures reduce memory overhead but increase lookup time for large fiber populations.

class OptimizedScheduler
  def initialize
    # Use arrays for better cache locality
    @io_requests = []
    @timer_requests = []
    
    # Hash for O(1) fiber lookups when needed
    @fiber_index = {}
    # IORequest is an assumed reusable request record (not shown)
    @request_pool = ObjectPool.new(IORequest, initial_size: 100)
  end
  
  def io_wait(io, events, timeout)
    request = @request_pool.acquire
    request.setup(Fiber.current, io, events, timeout)
    
    @io_requests << request
    @fiber_index[Fiber.current] = request
    
    Fiber.yield
    
    @io_requests.delete(request)
    @fiber_index.delete(Fiber.current)
    @request_pool.release(request)
    true
  end
  
  def process_io_efficiently
    # Process in chunks to maintain cache efficiency
    chunk_size = 64
    @io_requests.each_slice(chunk_size) do |chunk|
      ready_requests = chunk.select { |req| io_ready?(req.io, req.events) }
      ready_requests.each { |req| req.fiber.resume }
    end
  end
end

class ObjectPool
  def initialize(klass, initial_size: 10)
    @klass = klass
    @available = Array.new(initial_size) { klass.new }
  end
  
  def acquire
    # Grow on demand when the pool runs dry
    @available.pop || @klass.new
  end
  
  def release(object)
    object.reset if object.respond_to?(:reset)
    @available << object
  end
end

Memory allocation patterns affect garbage collection pressure. Schedulers creating many temporary objects during operation processing increase GC frequency and pause times. Object pooling and reuse strategies reduce allocation overhead.

class LowAllocationScheduler
  def initialize
    @io_buffer = Array.new(1000)  # Pre-allocated buffer
    @io_count = 0
    # IOEvent is an assumed reusable event-collection object (not shown)
    @reusable_events = IOEvent.new
  end
  
  def io_wait(io, events, timeout)
    # Reuse buffer space instead of creating arrays
    index = @io_count
    @io_buffer[index] = {
      fiber: Fiber.current,
      io: io.fileno,  # Store file descriptor number only
      events: events,
      timeout: timeout
    }
    @io_count += 1
    
    Fiber.yield
    
    # Compact buffer by moving the last element into the freed slot.
    # Caveat: this invalidates the saved index of the moved request;
    # a production version needs stable handles or a free list.
    @io_count -= 1
    @io_buffer[index] = @io_buffer[@io_count] if index < @io_count
    true
  end
  
  def select_ready_ios
    return if @io_count == 0
    
    # Reuse event object instead of creating new ones
    @reusable_events.reset
    
    (0...@io_count).each do |i|
      request = @io_buffer[i]
      fd = request[:io]
      
      if io_ready_fd?(fd, request[:events])
        @reusable_events.add_ready(request[:fiber])
      end
    end
    
    @reusable_events.resume_all
  end
end

Scheduler overhead measurement requires careful profiling to identify bottlenecks. Common performance issues include excessive system calls, inefficient data structure operations, and unnecessary object allocations during hot paths.

class ProfilingScheduler
  def initialize
    # HighPerformanceScheduler stands in for any wrapped implementation
    @base_scheduler = HighPerformanceScheduler.new
    @metrics = {
      io_wait_calls: 0,
      io_wait_time: 0.0,
      sleep_calls: 0,
      sleep_time: 0.0,
      block_calls: 0,
      block_time: 0.0
    }
  end
  
  def io_wait(io, events, timeout)
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    result = @base_scheduler.io_wait(io, events, timeout)
    end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    
    @metrics[:io_wait_calls] += 1
    @metrics[:io_wait_time] += (end_time - start_time)
    result
  end
  
  def kernel_sleep(duration)
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    result = @base_scheduler.kernel_sleep(duration)
    end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    
    @metrics[:sleep_calls] += 1
    @metrics[:sleep_time] += (end_time - start_time)
    result
  end
  
  def performance_report
    total_calls = @metrics.values_at(:io_wait_calls, :sleep_calls, :block_calls).sum
    total_time = @metrics.values_at(:io_wait_time, :sleep_time, :block_time).sum
    return {} if total_calls.zero? || total_time.zero?  # avoid NaN/Infinity
    
    {
      total_operations: total_calls,
      total_scheduler_time: total_time,
      average_operation_time: total_time / total_calls,
      io_wait_percentage: (@metrics[:io_wait_time] / total_time) * 100,
      operations_per_second: total_calls / total_time
    }
  end
end

Scaling characteristics vary significantly between scheduler implementations. Event-driven schedulers handle thousands of concurrent fibers efficiently, while polling-based schedulers degrade with increased fiber counts due to O(n) operation costs.
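
A rough way to probe those scaling characteristics is to time batches of sleeping fibers under the scheduler in question; this sketch reuses the hypothetical MiniScheduler from earlier:

require 'benchmark'

def time_fibers(scheduler, count)
  Benchmark.realtime do
    Thread.new do
      Fiber.set_scheduler(scheduler)
      count.times { Fiber.schedule { sleep(0.01) } }
    end.join  # thread exit runs the scheduler's close, draining fibers
  end
end

[100, 1_000, 10_000].each do |count|
  puts "#{count} fibers: #{time_fibers(MiniScheduler.new, count).round(3)}s"
end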

Reference

Core Scheduler Methods

Method | Parameters | Returns | Description
#io_wait(io, events, timeout) | io (IO), events (Integer mask), timeout (Numeric/nil) | Integer/false | Wait for I/O readiness; returns the ready events, or false on timeout
#kernel_sleep(duration) | duration (Numeric/nil) | void | Suspend the fiber for duration seconds; nil sleeps indefinitely
#block(blocker, timeout) | blocker (Object), timeout (Numeric/nil) | Boolean | Suspend until unblocked (true) or the timeout elapses (false)
#unblock(blocker, fiber) | blocker (Object), fiber (Fiber) | void | Wake a fiber suspended by block; may be called from another thread
#close | none | void | Drain pending fibers and release resources when the thread exits
#fiber(&block) | block (Proc) | Fiber | Create and schedule a non-blocking fiber; invoked by Fiber.schedule

IO Event Constants

Constant | Value | Description
IO::READABLE | 1 | IO ready for reading
IO::PRIORITY | 2 | IO has urgent (out-of-band) data
IO::WRITABLE | 4 | IO ready for writing

Scheduler Activation Methods

Method | Parameters | Returns | Description
Fiber.set_scheduler(scheduler) | scheduler (Object/nil) | Object/nil | Set the current thread's scheduler; nil removes it
Fiber.scheduler | none | Object/nil | Get the current thread's scheduler
Fiber.schedule { } | block | Fiber | Create and run a non-blocking fiber through the scheduler's fiber hook

Common Blocker Types

Class | Usage | Scheduler Method
Mutex | Mutex locking | #block(mutex, timeout)
ConditionVariable | Condition waiting | #block(cv, timeout)
Queue | Queue operations | #block(queue, timeout)
Thread | Thread joining | #block(thread, timeout)

Error Handling

Error Class | Condition | Handling Strategy
FiberError | Invalid fiber operation (e.g. resuming across threads) | Check fiber state before operations
IOError | I/O operation failed | Validate file descriptors and permissions
SystemCallError | System call failure | Implement retry logic with backoff
Timeout::Error | Operation timeout | Set appropriate timeout values

Implementation Patterns

# Basic scheduler template
class CustomScheduler
  def io_wait(io, events, timeout)
    # Suspend the fiber until io is ready, then return
    # the subset of events that became ready (false on timeout)
  end
  
  def kernel_sleep(duration)
    # Suspend the fiber for duration seconds (nil means indefinitely)
  end
  
  def block(blocker, timeout = nil)
    # Suspend until unblock is called or the timeout elapses;
    # return true if unblocked, false on timeout
  end
  
  def unblock(blocker, fiber)
    # Wake a fiber previously suspended by block;
    # may be invoked from another thread
  end
  
  def close
    # Run remaining fibers to completion and release resources
  end
end

# Scheduler registration pattern
Fiber.set_scheduler(CustomScheduler.new)  # active for the current thread

Fiber.schedule do
  # Non-blocking fiber; blocking operations route through the scheduler
end

# Resource cleanup pattern
begin
  Fiber.set_scheduler(scheduler)
  # Fiber operations
ensure
  Fiber.set_scheduler(nil)  # Deactivate scheduler
  scheduler.close if scheduler.respond_to?(:close)
end

Performance Considerations

Operation | Cost | Optimization Strategy
Fiber switching | Low | Minimize unnecessary yields
I/O polling | Medium | Batch I/O operations when possible
System calls | High | Cache results and reduce call frequency
Object allocation | Variable | Use object pools for frequent allocations

Thread Safety Notes

  • A scheduler serves exactly one thread; each thread installs its own
  • Fiber switching occurs only at explicit yield and transfer points
  • State shared across those switch points still requires careful design
  • External event integration needs thread-safe queuing
  • Resource cleanup must handle concurrent access
  • unblock may arrive from another thread; only the owning thread may resume a fiber (see the sketch below)
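
The last two notes combine into a common pattern, sketched here with hypothetical names: unblock only enqueues the fiber, and the scheduler's own thread performs the resume.

class CrossThreadUnblock
  def initialize
    @ready = Thread::Queue.new  # thread-safe handoff queue
  end
  
  def unblock(blocker, fiber)
    @ready << fiber  # safe to call from any thread
  end
  
  # Called from the scheduler's own thread, e.g. once per loop iteration
  def drain_ready
    @ready.size.times { @ready.pop.resume }
  end
end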