Overview
Ruby's Fiber scheduler interface provides a mechanism for implementing custom cooperative concurrency systems. The scheduler controls how fibers yield and resume execution, replacing Ruby's default blocking behavior with non-blocking alternatives. Ruby calls scheduler methods when non-blocking fibers perform potentially blocking operations such as I/O, sleep, or mutex acquisition.
The scheduler interface consists of several hook methods that Ruby invokes during fiber execution. These methods receive operation details and return values that determine execution flow. A scheduler can choose to handle operations asynchronously, delegate to other systems, or fall back to default behavior.
class BasicScheduler
  def io_wait(io, events, timeout)
    # Handle I/O readiness waiting
  end

  def kernel_sleep(duration)
    # Handle sleep operations
  end

  def block(blocker, timeout = nil)
    # Handle generic blocking operations
  end
end
Ruby activates a scheduler by assigning it to the current thread with Fiber.set_scheduler. The scheduler then handles blocking operations for non-blocking fibers created on that thread, typically via Fiber.schedule. When the thread terminates, Ruby invokes the scheduler's close method and deactivates it.
Fiber.set_scheduler(BasicScheduler.new)

Fiber.schedule do
  # Scheduler now handles blocking operations in this non-blocking fiber
  IO.read('/dev/zero', 1024) # May invoke I/O hooks such as scheduler.io_wait
  sleep(1)                   # Invokes scheduler.kernel_sleep
end
The scheduler interface emerged to support async I/O libraries and event-driven programming models. Custom schedulers enable integration with external event loops, efficient I/O multiplexing, and specialized concurrency patterns.
Basic Usage
Creating a functional scheduler requires implementing the core interface methods. The io_wait method handles I/O readiness checking, receiving an IO object, an event mask, and an optional timeout. Return the events that became ready when the I/O operation can proceed, or false to indicate a timeout.
class EventLoopScheduler
  def initialize
    @ios = {}
    @running = false
  end

  def io_wait(io, events, timeout)
    @ios[io] = { events: events, timeout: timeout }
    start_loop unless @running
    # Return immediately for demo purposes - a real implementation
    # would suspend the fiber until the IO becomes ready
    true
  end

  private

  def start_loop
    @running = true
    # Event loop implementation would go here
  end
end
The kernel_sleep method receives a duration in seconds and controls sleep behavior. Schedulers can implement precise timing, integrate with event loops, or provide non-blocking alternatives. The typical implementation records a deadline for the current fiber, yields control, and resumes the fiber once the deadline passes.
class TimerScheduler
  def initialize
    @timers = []
  end

  def kernel_sleep(duration)
    return false if duration <= 0

    end_time = Time.now + duration
    @timers << { end_time: end_time, fiber: Fiber.current }
    # Yield control back to the scheduler loop
    Fiber.yield
    true
  end

  def process_timers
    current_time = Time.now
    ready_timers = @timers.select { |timer| timer[:end_time] <= current_time }
    # Remove timers before resuming so a fiber that sleeps again
    # is not double-processed
    @timers -= ready_timers
    ready_timers.each { |timer| timer[:fiber].resume }
  end
end
The block method handles generic blocking operations such as mutex acquisition or condition variable waits. The blocker parameter carries context about the blocking operation, while timeout specifies the maximum wait duration.
class MutexScheduler
  def initialize
    @blocked_fibers = []
  end

  def block(blocker, timeout = nil)
    case blocker
    when Mutex
      handle_mutex_block(blocker, timeout)
    when ConditionVariable
      handle_condition_block(blocker, timeout)
    else
      false # Signal that the operation was not handled
    end
  end

  private

  def handle_mutex_block(mutex, timeout)
    @blocked_fibers << {
      fiber: Fiber.current,
      mutex: mutex,
      timeout: timeout
    }
    Fiber.yield
    true
  end

  def handle_condition_block(cv, timeout)
    # Analogous bookkeeping for condition variables would go here
    Fiber.yield
    true
  end
end
Schedulers must implement the close method to clean up resources when the scheduler is deactivated, such as when its thread exits. This method receives no parameters and should finish outstanding work, cancel pending operations, and release any held resources.
class ResourceScheduler
  def initialize
    @connections = []
    @timers = []
  end

  def close
    @connections.each(&:close)
    @timers.each(&:cancel)
    @connections.clear
    @timers.clear
  end
end
Thread Safety & Concurrency
Fiber schedulers operate within a single thread but must handle concurrent fiber switching safely. Ruby can switch between fibers at scheduler method boundaries, creating race conditions if schedulers mutate shared state across those switches.
Scheduler methods are never preempted mid-statement, but any internal Fiber.yield hands control to other fibers. Data structures accessed across fiber switches therefore require careful design to prevent corruption or inconsistent states.
require 'concurrent' # concurrent-ruby gem provides Concurrent::Hash

class ThreadSafeScheduler
  def initialize
    @io_registry = Concurrent::Hash.new
    @timer_queue = Queue.new
    @mutex = Mutex.new
  end

  def io_wait(io, events, timeout)
    # Atomic registration prevents race conditions
    @mutex.synchronize do
      @io_registry[io] = {
        fiber: Fiber.current,
        events: events,
        timeout: timeout,
        registered_at: Time.now
      }
    end
    Fiber.yield # Safe yield point
    @io_registry.delete(io)
    true
  end

  def process_io_events
    ready_ios = []
    @mutex.synchronize do
      @io_registry.each do |io, data|
        ready_ios << data[:fiber] if io_ready?(io, data[:events])
      end
    end
    ready_ios.each(&:resume)
  end
end
Multiple fibers sharing a scheduler create complex interaction patterns: state changed by one fiber affects operations in other fibers, so proper isolation prevents unintended cross-fiber interference.
class IsolatedScheduler
  def initialize
    @fiber_contexts = {}
  end

  def io_wait(io, events, timeout)
    context = fiber_context(Fiber.current)
    context[:pending_io] << { io: io, events: events }
    # Process only this fiber's I/O operations
    process_fiber_io(Fiber.current)
    true
  end

  def kernel_sleep(duration)
    context = fiber_context(Fiber.current)
    context[:sleep_until] = Time.now + duration
    # Yielding only affects the current fiber
    Fiber.yield
    context.delete(:sleep_until)
    true
  end

  private

  def fiber_context(fiber)
    @fiber_contexts[fiber] ||= {
      pending_io: [],
      timers: [],
      blocked_on: nil
    }
  end

  def process_fiber_io(fiber)
    context = fiber_context(fiber)
    ready_operations = context[:pending_io].select { |op| io_ready?(op[:io], op[:events]) }
    unless ready_operations.empty?
      context[:pending_io] -= ready_operations
      fiber.resume if fiber != Fiber.current
    end
  end
end
Schedulers integrating with external event systems must synchronize between Ruby's fiber execution and external callbacks. External events arriving during fiber execution require queuing and processing at safe yield points.
class EventSystemScheduler
  def initialize
    @event_queue = Queue.new
    @external_system = ExternalEventSystem.new
    @external_system.on_event { |event| @event_queue << event }
  end

  def io_wait(io, events, timeout)
    request_id = @external_system.register_io(io, events)
    loop do
      begin
        event = @event_queue.pop(true) # Non-blocking pop raises ThreadError when empty
        if event.request_id == request_id
          @external_system.unregister(request_id)
          return event.success?
        else
          # Re-queue events belonging to other fibers
          @event_queue << event
        end
      rescue ThreadError
        # Queue empty; fall through to yield
      end
      Fiber.yield # Allow other fibers to process
    end
  end
end
Nested scheduler scenarios occur when code running under one scheduler starts a new thread with its own scheduler, since each scheduler is bound to a single thread. The outer scheduler must account for operations that cross into the inner context and avoid interfering with it.
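A minimal sketch of such nesting, assuming hypothetical OuterScheduler and InnerScheduler classes:
Fiber.set_scheduler(OuterScheduler.new)

Fiber.schedule do
  inner = Thread.new do
    Fiber.set_scheduler(InnerScheduler.new) # Independent, per-thread scheduler
    Fiber.schedule { sleep(0.1) }           # Handled by InnerScheduler
  end
  inner.join # Thread#join in a non-blocking fiber goes through the outer scheduler
end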
Advanced Usage
Complex scheduler implementations often compose multiple scheduling strategies, delegating different operation types to specialized handlers. This approach maintains modularity while supporting diverse concurrency patterns within a single scheduler.
class CompositeScheduler
  def initialize
    @io_handler = SelectIOHandler.new
    @timer_handler = HeapTimerHandler.new
    @mutex_handler = FairMutexHandler.new
    @fallback_handler = DefaultHandler.new
  end

  def io_wait(io, events, timeout)
    case io
    when TCPSocket, UDPSocket
      @io_handler.wait_socket(io, events, timeout)
    when File
      @io_handler.wait_file(io, events, timeout)
    else
      @fallback_handler.io_wait(io, events, timeout)
    end
  end

  def kernel_sleep(duration)
    if duration < 0.001 # Sub-millisecond sleeps
      @timer_handler.precise_sleep(duration)
    else
      @timer_handler.standard_sleep(duration)
    end
  end

  def block(blocker, timeout = nil)
    handler = case blocker
              when Mutex, ConditionVariable then @mutex_handler
              else @fallback_handler
              end
    handler.block(blocker, timeout)
  end

  def close
    [@io_handler, @timer_handler, @mutex_handler, @fallback_handler].each(&:close)
  end
end
Adaptive schedulers modify behavior based on runtime conditions, switching strategies as workload characteristics change. These schedulers monitor performance metrics and automatically optimize execution patterns.
class AdaptiveScheduler
  def initialize
    @strategies = {
      low_latency: LowLatencyStrategy.new,
      high_throughput: HighThroughputStrategy.new,
      balanced: BalancedStrategy.new
    }
    @current_strategy = :balanced
    @metrics = PerformanceMetrics.new
  end

  def io_wait(io, events, timeout)
    start_time = Time.now
    result = current_handler.io_wait(io, events, timeout)
    @metrics.record_io_wait(Time.now - start_time)
    adapt_strategy if @metrics.sample_count % 100 == 0
    result
  end

  def kernel_sleep(duration)
    current_handler.kernel_sleep(duration)
  end

  private

  def current_handler
    @strategies[@current_strategy]
  end

  def adapt_strategy
    latency = @metrics.average_latency
    throughput = @metrics.operations_per_second

    new_strategy = if latency > 0.1
                     :low_latency
                   elsif throughput < 1000
                     :high_throughput
                   else
                     :balanced
                   end

    if new_strategy != @current_strategy
      @current_strategy = new_strategy
      @metrics.reset
    end
  end
end
Priority-based scheduling implements fiber ordering based on importance or deadline requirements. This pattern suits applications with mixed workload priorities or real-time constraints.
class PriorityScheduler
  def initialize
    @priority_queues = Hash.new { |h, k| h[k] = [] }
    @fiber_priorities = {}
  end

  def set_fiber_priority(fiber, priority)
    @fiber_priorities[fiber] = priority
  end

  def io_wait(io, events, timeout)
    fiber = Fiber.current
    priority = @fiber_priorities[fiber] || 0
    @priority_queues[priority] << {
      fiber: fiber,
      io: io,
      events: events,
      timeout: timeout,
      created_at: Time.now
    }
    Fiber.yield
    true
  end

  def process_ready_fibers
    # Process highest-priority fibers first
    @priority_queues.keys.sort.reverse_each do |priority|
      queue = @priority_queues[priority]
      ready_requests = queue.select { |req| io_ready?(req[:io], req[:events]) }
      ready_requests.each do |request|
        queue.delete(request)
        request[:fiber].resume
      end
    end
  end

  def schedule_next_fiber
    # Skip priorities whose queues have drained
    highest_priority = @priority_queues.reject { |_, q| q.empty? }.keys.max
    return unless highest_priority

    next_request = @priority_queues[highest_priority].shift
    next_request[:fiber].resume if next_request
  end
end
Cooperative batching optimizes I/O operations by grouping similar requests and processing them together. This approach reduces system call overhead and improves throughput for I/O-intensive workloads.
class BatchingScheduler
  def initialize
    @io_batches = Hash.new { |h, k| h[k] = [] }
    @batch_size = 32
    @batch_timeout = 0.001
  end

  def io_wait(io, events, timeout)
    batch_key = "#{events}_#{timeout}"
    batch = @io_batches[batch_key]
    batch << {
      fiber: Fiber.current,
      io: io,
      events: events,
      timeout: timeout,
      added_at: Time.now
    }
    if batch.size >= @batch_size
      process_batch(batch_key)
    else
      schedule_batch_timeout(batch_key)
    end
    Fiber.yield
    true
  end

  private

  def process_batch(batch_key)
    batch = @io_batches[batch_key]
    return if batch.empty?

    ios = batch.map { |req| req[:io] }
    events = batch.first[:events]
    timeout = batch.map { |req| req[:timeout] }.compact.min
    ready_ios = select_multiple(ios, events, timeout)

    # Partition instead of mutating the batch while iterating it
    ready, pending = batch.partition { |req| ready_ios.include?(req[:io]) }
    @io_batches[batch_key] = pending # Non-ready requests stay queued
    ready.each { |request| request[:fiber].resume }
  end

  def schedule_batch_timeout(batch_key)
    # Caveat: resuming a fiber from another thread raises FiberError;
    # a production scheduler would signal its own event loop instead
    Thread.new do
      sleep(@batch_timeout)
      process_batch(batch_key)
    end
  end
end
Performance & Memory
Scheduler performance directly impacts application responsiveness and resource utilization. Efficient data structures for tracking fiber states and pending operations minimize overhead during context switches and operation dispatching.
Hash-based lookup structures provide O(1) access for fiber identification but consume memory proportional to active fiber count. Array-based structures reduce memory overhead but increase lookup time for large fiber populations.
class OptimizedScheduler
  def initialize
    # Arrays give better cache locality for sequential scans
    @io_requests = []
    @timer_requests = []
    # Hash for O(1) fiber lookups when needed
    @fiber_index = {}
    @request_pool = ObjectPool.new(IORequest, initial_size: 100)
  end

  def io_wait(io, events, timeout)
    request = @request_pool.acquire
    request.setup(Fiber.current, io, events, timeout)
    @io_requests << request
    @fiber_index[Fiber.current] = request
    Fiber.yield
    @io_requests.delete(request)
    @fiber_index.delete(Fiber.current)
    @request_pool.release(request)
    true
  end

  def process_io_efficiently
    # Process in chunks to maintain cache efficiency
    chunk_size = 64
    @io_requests.each_slice(chunk_size) do |chunk|
      ready_requests = chunk.select { |req| io_ready?(req.io, req.events) }
      ready_requests.each { |req| req.fiber.resume }
    end
  end
end

class ObjectPool
  def initialize(klass, initial_size: 10)
    @klass = klass
    @available = Array.new(initial_size) { klass.new }
  end

  def acquire
    # Allocate a fresh object only when the pool is exhausted
    @available.empty? ? @klass.new : @available.pop
  end

  def release(object)
    object.reset if object.respond_to?(:reset)
    @available << object
  end
end
Memory allocation patterns affect garbage collection pressure. Schedulers creating many temporary objects during operation processing increase GC frequency and pause times. Object pooling and reuse strategies reduce allocation overhead.
class LowAllocationScheduler
  def initialize
    @io_buffer = Array.new(1000) # Pre-allocated buffer
    @io_count = 0
    @reusable_events = IOEvent.new # Single reusable event object
  end

  def io_wait(io, events, timeout)
    # Reuse buffer slots instead of allocating new arrays
    index = @io_count
    @io_buffer[index] = {
      fiber: Fiber.current,
      io: io.fileno, # Store only the file descriptor number
      events: events,
      timeout: timeout
    }
    @io_count += 1
    Fiber.yield
    # Compact the buffer by moving the last element into the freed slot.
    # Caveat: compaction invalidates indices held by other suspended fibers;
    # a real implementation would track slot positions explicitly.
    @io_count -= 1
    @io_buffer[index] = @io_buffer[@io_count] if index < @io_count
    true
  end

  def select_ready_ios
    return if @io_count == 0

    # Reuse the event object instead of creating new ones
    @reusable_events.reset
    (0...@io_count).each do |i|
      request = @io_buffer[i]
      @reusable_events.add_ready(request[:fiber]) if io_ready_fd?(request[:io], request[:events])
    end
    @reusable_events.resume_all
  end
end
Scheduler overhead measurement requires careful profiling to identify bottlenecks. Common performance issues include excessive system calls, inefficient data structure operations, and unnecessary object allocations during hot paths.
class ProfilingScheduler
  def initialize
    @base_scheduler = HighPerformanceScheduler.new
    @metrics = {
      io_wait_calls: 0, io_wait_time: 0.0,
      sleep_calls: 0, sleep_time: 0.0,
      block_calls: 0, block_time: 0.0
    }
  end

  def io_wait(io, events, timeout)
    time_operation(:io_wait) { @base_scheduler.io_wait(io, events, timeout) }
  end

  def kernel_sleep(duration)
    time_operation(:sleep) { @base_scheduler.kernel_sleep(duration) }
  end

  def block(blocker, timeout = nil)
    time_operation(:block) { @base_scheduler.block(blocker, timeout) }
  end

  def performance_report
    total_calls = @metrics.values_at(:io_wait_calls, :sleep_calls, :block_calls).sum
    total_time = @metrics.values_at(:io_wait_time, :sleep_time, :block_time).sum
    return {} if total_calls.zero? || total_time.zero? # Avoid division by zero

    {
      total_operations: total_calls,
      total_scheduler_time: total_time,
      average_operation_time: total_time / total_calls,
      io_wait_percentage: (@metrics[:io_wait_time] / total_time) * 100,
      operations_per_second: total_calls / total_time
    }
  end

  private

  def time_operation(name)
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    result = yield
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
    @metrics[:"#{name}_calls"] += 1
    @metrics[:"#{name}_time"] += elapsed
    result
  end
end
Scaling characteristics vary significantly between scheduler implementations. Event-driven schedulers handle thousands of concurrent fibers efficiently, while polling-based schedulers degrade with increased fiber counts due to O(n) operation costs.
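A rough way to observe these scaling characteristics is to time increasing fiber counts under a given scheduler. This sketch assumes a hypothetical MyScheduler class and relies on the scheduler driving pending fibers to completion when its thread exits:
require 'benchmark'

def run_fibers(scheduler, count)
  Thread.new do
    Fiber.set_scheduler(scheduler)
    count.times { Fiber.schedule { sleep(0.001) } }
  end.join # The scheduler's close drives remaining fibers to completion
end

[100, 1_000, 10_000].each do |n|
  elapsed = Benchmark.realtime { run_fibers(MyScheduler.new, n) }
  puts format('%6d fibers: %.3fs', n, elapsed)
end
An event-driven scheduler should show roughly flat per-fiber cost across these runs, while a polling scheduler's total time grows superlinearly.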
Reference
Core Scheduler Methods
Method | Parameters | Returns | Description |
---|---|---|---|
#io_wait(io, events, timeout) | io (IO), events (Integer), timeout (Numeric/nil) | Ready events or false | Handle I/O waiting operations |
#kernel_sleep(duration) | duration (Numeric) | Boolean | Handle sleep operations |
#block(blocker, timeout) | blocker (Object), timeout (Numeric/nil) | Boolean | Handle blocking operations |
#unblock(blocker, fiber) | blocker (Object), fiber (Fiber) | void | Unblock a specific fiber (may be called from another thread) |
#close | none | void | Clean up resources and finish remaining fibers |
#fiber(&block) | block | Fiber | Create a non-blocking fiber (invoked by Fiber.schedule) |
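The block and unblock hooks work as a pair: block suspends the current fiber, and unblock, which may arrive from another thread, marks it runnable again. A minimal sketch using a thread-safe queue for wakeups:
class WakeupScheduler
  def initialize
    @ready = Thread::Queue.new # Thread-safe because unblock may run on another thread
  end

  def block(blocker, timeout = nil)
    Fiber.yield # Suspend until unblock re-schedules this fiber
    true
  end

  def unblock(blocker, fiber)
    @ready << fiber # Queue the fiber for resumption by the scheduler loop
  end

  def run_ready
    until @ready.empty?
      @ready.pop(true).resume # Non-blocking pop; ThreadError if drained concurrently
    end
  rescue ThreadError
    nil
  end
end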
IO Event Constants
Constant | Value | Description |
---|---|---|
IO::READABLE | 1 | IO ready for reading |
IO::PRIORITY | 2 | IO has priority data |
IO::WRITABLE | 4 | IO ready for writing |
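These values mirror the underlying poll(2) flags, so masks combine and test with bitwise operators; ready below stands for a hypothetical mask returned by io_wait:
events = IO::READABLE | IO::WRITABLE   # Wait for read or write readiness
readable = (ready & IO::READABLE) != 0 # Test which events were reported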
Scheduler Activation Methods
Method | Parameters | Returns | Description |
---|---|---|---|
Fiber.set_scheduler(scheduler) | scheduler (Object/nil) | Object/nil | Set the current thread's scheduler |
Fiber.scheduler | none | Object/nil | Get the scheduler set for the current thread |
Fiber.current_scheduler | none | Object/nil | Get the scheduler if the current fiber is non-blocking |
Common Blocker Types
Class | Usage | Scheduler Method |
---|---|---|
Mutex | Mutex locking | #block(mutex, timeout) |
ConditionVariable | Condition waiting | #block(cv, timeout) |
Queue | Queue operations | #block(queue, timeout) |
Thread | Thread joining | #block(thread, timeout) |
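For example, a contended Mutex routes through these hooks when a scheduler is set on the current thread; a sketch of the flow:
mutex = Mutex.new
mutex.lock # Held by the main fiber

Fiber.schedule do
  mutex.synchronize do # Contended, so Ruby invokes scheduler.block(mutex, nil)
    # Runs after the owner releases the lock and the fiber is unblocked
  end
end

mutex.unlock # Invokes scheduler.unblock(mutex, waiting_fiber)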
Error Handling
Error Class | Condition | Handling Strategy |
---|---|---|
FiberError | Invalid fiber operation | Check fiber state before operations |
IOError | I/O operation failed | Validate file descriptors and permissions |
SystemCallError | System call failure | Implement retry logic with backoff |
Timeout::Error | Operation timeout | Set appropriate timeout values |
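A retry-with-backoff helper for transient system call failures might look like the following sketch; the attempt count and delays are arbitrary choices:
def with_backoff(attempts: 3, initial_delay: 0.01)
  delay = initial_delay
  attempts.times do |attempt|
    begin
      return yield
    rescue SystemCallError
      raise if attempt == attempts - 1
      sleep(delay) # Under a scheduler, this yields rather than blocking the thread
      delay *= 2   # Exponential backoff
    end
  end
end

# Usage: with_backoff { io.read_nonblock(1024) }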
Implementation Patterns
# Basic scheduler template
class CustomScheduler
  def io_wait(io, events, timeout)
    # Implementation required for non-blocking I/O
    false # Returning false signals a timeout
  end

  def kernel_sleep(duration)
    # Typically: record a deadline, Fiber.yield, resume the fiber later
  end

  def block(blocker, timeout = nil)
    # Typically: suspend the current fiber until a matching unblock call
  end

  def close
    # Cleanup: finish remaining fibers and release resources
  end
end
# Scheduler registration pattern
Fiber.set_scheduler(CustomScheduler.new)

Fiber.schedule do
  # Scheduler handles blocking operations in this fiber and its children
end
# Resource cleanup pattern
begin
  Fiber.set_scheduler(scheduler)
  # Fiber operations
ensure
  Fiber.set_scheduler(nil) # Deactivate the scheduler
  scheduler.close if scheduler.respond_to?(:close)
end
Performance Considerations
Operation | Cost | Optimization Strategy |
---|---|---|
Fiber switching | Low | Minimize unnecessary yields |
I/O polling | Medium | Batch I/O operations when possible |
System calls | High | Cache results and reduce call frequency |
Object allocation | Variable | Use object pools for frequent allocations |
Thread Safety Notes
- Scheduler methods execute on a single thread
- Fiber switching can occur at method boundaries
- Shared scheduler state requires synchronization
- External event integration needs thread-safe queuing
- Resource cleanup must handle concurrent access