Overview
Fiber Scheduler provides a pluggable interface for implementing custom event loops that can handle non-blocking I/O operations in Ruby. The scheduler hooks into blocking operations like file I/O, network requests, and sleep calls, allowing them to yield control to other fibers while waiting for operations to complete.
Ruby exposes the scheduler through the Fiber.set_scheduler interface, which accepts any object that responds to specific hook methods; Fiber.scheduler returns the object currently installed for the thread. When a scheduler is active, blocking operations automatically check whether they can be handled asynchronously through the scheduler's methods.
The scheduler operates by intercepting calls to blocking operations and providing alternative implementations that yield fiber control instead of blocking the entire thread. This enables concurrent execution of multiple fibers within a single thread.
# Basic scheduler setup
class SimpleScheduler
  def initialize
    @ready_fibers = []
  end

  # Called when a fiber needs to block (e.g. on a Mutex). A real scheduler
  # keeps the fiber suspended until #unblock; this demo simply requeues it.
  def block(blocker, timeout = nil)
    resume_later(Fiber.current)
    Fiber.yield
  end

  def unblock(blocker, fiber)
    resume_later(fiber)
  end

  # Called when a non-blocking fiber sleeps; resumes it on the next run
  # cycle instead of waiting for the deadline.
  def kernel_sleep(duration = nil)
    resume_later(Fiber.current)
    Fiber.yield
  end

  # Minimal stub so Fiber.set_scheduler accepts this object
  def io_wait(io, events, timeout)
    events
  end

  def resume_later(fiber)
    @ready_fibers << fiber
  end

  def run
    while (fiber = @ready_fibers.shift)
      fiber.resume if fiber.alive?
    end
  end
end
The scheduler integrates with Ruby's existing fiber system, requiring no changes to existing code that uses blocking operations. Applications enable scheduling by calling Fiber.set_scheduler and wrapping work in fibers.
scheduler = SimpleScheduler.new
Fiber.set_scheduler(scheduler)

Fiber.new do
  # Routed to the scheduler's kernel_sleep hook
  sleep 1
  puts "Fiber completed"
end.resume

scheduler.run
Ruby defines specific hook methods that schedulers must implement to handle different types of blocking operations: io_wait, kernel_sleep, block, and unblock correspond to different blocking scenarios, and Fiber.set_scheduler expects the scheduler object to respond to them.
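As a minimal sketch of how that dispatch looks in practice (the class name and its immediate-resume behavior are illustrative, not a real event loop), the following tracing scheduler prints a line whenever Ruby routes a blocking operation to one of its hooks:

class TracingScheduler
  def initialize
    @ready = []
  end

  def kernel_sleep(duration = nil)
    puts "kernel_sleep(#{duration.inspect})"
    @ready << Fiber.current
    Fiber.yield
  end

  def io_wait(io, events, timeout)
    puts "io_wait(events=#{events}, timeout=#{timeout.inspect})"
    @ready << Fiber.current
    Fiber.yield
    events
  end

  def block(blocker, timeout = nil)
    puts "block(#{blocker.class})"
    Fiber.yield
  end

  def unblock(blocker, fiber)
    puts "unblock(#{blocker.class})"
    @ready << fiber
  end

  # Resume everything that was parked by a hook
  def run
    while (fiber = @ready.shift)
      fiber.resume if fiber.alive?
    end
  end
end

scheduler = TracingScheduler.new
Fiber.set_scheduler(scheduler)

Fiber.new { sleep 0.1 }.resume   # prints "kernel_sleep(0.1)"
scheduler.run                    # resumes the suspended fiber

Fiber.set_scheduler(nil)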
Basic Usage
Setting up a fiber scheduler requires implementing a scheduler class that responds to the required hook methods and registering it with Fiber.set_scheduler. The scheduler remains active until explicitly cleared or the thread ends.
class BasicScheduler
  def initialize
    @sleeping = []
    @start_time = Time.now
  end

  def kernel_sleep(duration)
    wakeup_time = current_time + duration
    @sleeping << [Fiber.current, wakeup_time]
    Fiber.yield
  end

  # Minimal stubs so Fiber.set_scheduler accepts this object;
  # they are not exercised in this example.
  def io_wait(io, events, timeout); events; end
  def block(blocker, timeout = nil); Fiber.yield; end
  def unblock(blocker, fiber); fiber; end

  def run_once
    current = current_time
    ready_fibers = []

    @sleeping.reject! do |fiber, wakeup_time|
      if current >= wakeup_time
        ready_fibers << fiber
        true
      end
    end

    ready_fibers.each(&:resume)
    !@sleeping.empty?
  end

  private

  def current_time
    Time.now - @start_time
  end
end
Creating concurrent fibers requires wrapping work in Fiber.new blocks and calling resume to start execution. The scheduler handles yielding between fibers when blocking operations occur.
scheduler = BasicScheduler.new
Fiber.set_scheduler(scheduler)

# Create multiple concurrent fibers
fibers = 3.times.map do |i|
  Fiber.new do
    puts "Fiber #{i} starting"
    sleep 0.5
    puts "Fiber #{i} continuing"
    sleep 0.3
    puts "Fiber #{i} finished"
  end
end

# Start all fibers
fibers.each(&:resume)

# Run the scheduler until all work completes
# (busy-waits for simplicity; a real loop would sleep until the next deadline)
while scheduler.run_once
end

Fiber.set_scheduler(nil)
The scheduler hook methods receive different parameters depending on the operation type. The kernel_sleep method receives a duration, while io_wait receives an IO object, an event mask, and a timeout.
class IOScheduler
  def initialize
    @io_waiting = {}
    @ready_fibers = []   # NOTE: a plain Array; see the thread-safety section
  end

  def io_wait(io, events, timeout)
    @io_waiting[io] = [Fiber.current, events]
    # A real implementation would register the IO with select/epoll/kqueue;
    # this is simplified for demonstration.
    Fiber.yield
    events
  end

  def kernel_sleep(duration)
    fiber = Fiber.current
    # Schedule the wakeup from a helper thread (demonstration only)
    Thread.new do
      sleep duration
      @ready_fibers << fiber
    end
    Fiber.yield
  end

  # Stubs so Fiber.set_scheduler accepts this object
  def block(blocker, timeout = nil); Fiber.yield; end
  def unblock(blocker, fiber); @ready_fibers << fiber; end

  def process_ready
    while (fiber = @ready_fibers.shift)
      fiber.resume if fiber.alive?
    end
  end
end
Managing fiber lifecycle requires careful handling of fiber creation, resumption, and completion. Fibers that encounter errors or complete normally should not be resumed again.
def safe_fiber_execution(&block)
  fiber = Fiber.new do
    begin
      block.call
    rescue => e
      puts "Fiber error: #{e.message}"
    ensure
      # Clean up fiber resources (remove_from_scheduler is a hypothetical
      # helper that would deregister the fiber from the scheduler)
      remove_from_scheduler(Fiber.current)
    end
  end
  fiber.resume
  fiber
end

# Usage with error handling
scheduler = IOScheduler.new
Fiber.set_scheduler(scheduler)

safe_fiber_execution do
  # This might raise an error
  file = File.open("nonexistent.txt")
  content = file.read
  file.close
end

scheduler.process_ready
Thread Safety & Concurrency
A fiber scheduler runs within a single thread, but it must still handle concurrent access to shared scheduler state once helper threads are used to wake fibers or signal completed I/O. Internal data structures touched from those threads need protection against race conditions.
class ThreadSafeScheduler
  def initialize
    @ready_fibers = []
    @sleeping_fibers = []
    @io_waiting = {}
    @mutex = Mutex.new
  end

  def add_ready_fiber(fiber)
    @mutex.synchronize do
      @ready_fibers << fiber unless @ready_fibers.include?(fiber)
    end
  end

  def schedule_sleep(fiber, duration)
    wakeup_time = Time.now + duration
    @mutex.synchronize do
      @sleeping_fibers << [fiber, wakeup_time]
      @sleeping_fibers.sort_by! { |_, time| time }
    end
  end

  def kernel_sleep(duration)
    fiber = Fiber.current
    schedule_sleep(fiber, duration)
    Fiber.yield
  end
end
Cross-thread scheduler coordination becomes necessary when background threads need to wake up sleeping fibers or notify the scheduler about completed I/O operations. Communication mechanisms must be thread-safe.
class MultiThreadScheduler
  def initialize
    @ready_queue = Queue.new   # Thread-safe queue
    @sleeping_fibers = []      # Populated by other hooks (not shown); access
                               # from the background thread must be synchronized
    @background_threads = []
    @running = true
  end

  def start_background_processor
    @background_threads << Thread.new do
      while @running
        sleep 0.01             # Small delay to prevent busy waiting
        check_sleeping_fibers
        process_io_events
      end
    end
  end

  def stop
    @running = false
    @background_threads.each(&:join)
  end

  def kernel_sleep(duration)
    fiber = Fiber.current
    # Schedule the wakeup from a helper thread
    # (one thread per sleep is for demonstration only)
    Thread.new do
      sleep duration
      @ready_queue << fiber
    end
    Fiber.yield
  end

  def run_ready_fibers
    until @ready_queue.empty?
      begin
        fiber = @ready_queue.pop(true)   # non-blocking pop
        fiber.resume if fiber.alive?
      rescue ThreadError
        break                            # Queue is empty
      end
    end
  end

  private

  def check_sleeping_fibers
    current_time = Time.now
    @sleeping_fibers.reject! do |fiber, wakeup_time|
      if current_time >= wakeup_time
        @ready_queue << fiber
        true
      end
    end
  end

  def process_io_events
    # Placeholder: poll for completed I/O and push ready fibers onto the queue
  end
end
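A short usage sketch (calling kernel_sleep directly for illustration; with Fiber.set_scheduler installed, an ordinary sleep would reach the hook the same way) shows the hand-off: helper threads push woken fibers onto the thread-safe queue, and the main thread drains it.

scheduler = MultiThreadScheduler.new

workers = 3.times.map do |i|
  Fiber.new do
    scheduler.kernel_sleep(0.2)
    puts "worker #{i} woke up"
  end
end

workers.each(&:resume)

# Drain the ready queue until every worker has finished
until workers.none?(&:alive?)
  scheduler.run_ready_fibers
  sleep 0.01
end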
Scheduler state isolation prevents fiber-specific data from leaking between different fibers. Each fiber's context must remain separate even when sharing the same scheduler instance.
class IsolatedScheduler
  def initialize
    @fiber_contexts = {}
    @global_state = {
      ready_fibers: [],
      io_operations: {}
    }
  end

  def get_fiber_context(fiber = Fiber.current)
    @fiber_contexts[fiber] ||= {
      sleep_until: nil,
      waiting_for_io: nil,
      custom_data: {}
    }
  end

  def kernel_sleep(duration)
    fiber = Fiber.current
    context = get_fiber_context(fiber)
    context[:sleep_until] = Time.now + duration
    Fiber.yield
  end

  def set_fiber_data(key, value)
    context = get_fiber_context
    context[:custom_data][key] = value
  end

  def get_fiber_data(key)
    context = get_fiber_context
    context[:custom_data][key]
  end

  def cleanup_fiber(fiber)
    @fiber_contexts.delete(fiber)
    @global_state[:ready_fibers].delete(fiber)
  end
end
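For example, two fibers sharing the same IsolatedScheduler instance keep independent values under the same key:

scheduler = IsolatedScheduler.new

fibers = 2.times.map do |i|
  Fiber.new do
    scheduler.set_fiber_data(:name, "fiber-#{i}")
    Fiber.yield
    # Each fiber sees only its own value despite the shared scheduler
    puts scheduler.get_fiber_data(:name)
  end
end

fibers.each(&:resume)   # both fibers store their own :name, then yield
fibers.each(&:resume)   # prints "fiber-0" then "fiber-1"
fibers.each { |fiber| scheduler.cleanup_fiber(fiber) }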
Deadlock prevention requires careful ordering of scheduler operations and avoiding circular dependencies between fibers. Schedulers must detect and handle potential deadlock scenarios.
require 'set'

class DeadlockSafeScheduler
  def initialize
    @dependency_graph = {}
    @blocked_fibers = {}
  end

  def block(blocker, timeout = nil)
    fiber = Fiber.current
    # Check for potential deadlocks before suspending
    if would_create_deadlock?(fiber, blocker)
      raise FiberError, "Potential deadlock detected"
    end

    record_dependency(fiber, blocker)
    @blocked_fibers[fiber] = blocker
    Fiber.yield
  end

  def unblock(blocker, fiber)
    return unless @blocked_fibers[fiber] == blocker

    @blocked_fibers.delete(fiber)
    remove_dependency(fiber, blocker)
    schedule_ready(fiber)   # hypothetical: queue the fiber for resumption
  end

  private

  def would_create_deadlock?(fiber, blocker)
    # Walk the blocker chain; a cycle back to the current fiber means deadlock
    visited = Set.new
    current = blocker
    while current && !visited.include?(current)
      visited << current
      return true if current == fiber
      current = @dependency_graph[current]
    end
    false
  end

  def record_dependency(fiber, blocker)
    @dependency_graph[fiber] = blocker
  end

  def remove_dependency(fiber, blocker)
    @dependency_graph.delete(fiber)
  end
end
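Calling the hooks directly (for illustration only; in real use Ruby invokes them via blocking operations), a cycle of two fibers waiting on each other trips the detection:

scheduler = DeadlockSafeScheduler.new
fiber_a = fiber_b = nil

fiber_a = Fiber.new { scheduler.block(fiber_b) }   # A waits on B
fiber_b = Fiber.new { scheduler.block(fiber_a) }   # B waits on A: a cycle

fiber_a.resume

begin
  fiber_b.resume
rescue FiberError => e
  puts e.message   # => "Potential deadlock detected"
end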
Performance & Memory
Scheduler performance directly impacts application throughput, particularly for I/O-bound operations. Efficient data structures and minimal overhead during fiber switching are essential for maintaining high performance under concurrent load.
class HighPerformanceScheduler
  def initialize
    @ready_fibers = []
    @sleeping_heap = []   # Min-heap keyed by wakeup time: O(log n) scheduling
    @io_ready = {}
    @fiber_pool = []      # Suspended fibers that may be resumed again later
  end

  def kernel_sleep(duration)
    wakeup_time = Time.now + duration
    # Insert into the heap, maintaining the heap property
    insert_sleeping_fiber(Fiber.current, wakeup_time)
    Fiber.yield
  end

  def get_pooled_fiber(&block)
    # NOTE: a finished Fiber cannot be restarted with a new block, so true
    # pooling requires long-lived worker fibers that loop over a job queue.
    # This simplified version always builds a fresh fiber.
    Fiber.new(&block)
  end

  def return_fiber_to_pool(fiber)
    # Only still-suspended (alive) fibers can be resumed again later
    @fiber_pool << fiber if fiber.alive? && @fiber_pool.size < 100
  end

  private

  def insert_sleeping_fiber(fiber, wakeup_time)
    @sleeping_heap << [wakeup_time, fiber]
    # Bubble up to maintain the min-heap property
    bubble_up(@sleeping_heap.size - 1)
  end

  def bubble_up(index)
    return if index == 0
    parent = (index - 1) / 2
    return if @sleeping_heap[parent][0] <= @sleeping_heap[index][0]

    @sleeping_heap[parent], @sleeping_heap[index] =
      @sleeping_heap[index], @sleeping_heap[parent]
    bubble_up(parent)
  end
end
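Insertion is only half of the heap; a sketch of the matching extraction side (method names are illustrative and would sit alongside bubble_up inside the class) pops every fiber whose deadline has passed and restores the heap property with a sift-down:

# Pop every fiber whose wakeup time has passed; O(log n) per extraction
def pop_due_fibers(now = Time.now)
  due = []
  while @sleeping_heap.any? && @sleeping_heap.first[0] <= now
    due << extract_min
  end
  due
end

def extract_min
  root = @sleeping_heap.first
  last = @sleeping_heap.pop
  unless @sleeping_heap.empty?
    @sleeping_heap[0] = last
    bubble_down(0)
  end
  root[1]   # the fiber stored with the earliest wakeup time
end

def bubble_down(index)
  size = @sleeping_heap.size
  loop do
    left     = 2 * index + 1
    right    = 2 * index + 2
    smallest = index
    smallest = left  if left  < size && @sleeping_heap[left][0]  < @sleeping_heap[smallest][0]
    smallest = right if right < size && @sleeping_heap[right][0] < @sleeping_heap[smallest][0]
    break if smallest == index

    @sleeping_heap[index], @sleeping_heap[smallest] =
      @sleeping_heap[smallest], @sleeping_heap[index]
    index = smallest
  end
end

A run loop can then call pop_due_fibers on each cycle and resume every returned fiber in turn.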
Memory usage optimization requires careful management of scheduler internal state and preventing memory leaks from completed fibers. Large numbers of concurrent fibers can consume significant memory if not properly managed.
require 'set'

class MemoryEfficientScheduler
  def initialize(max_fibers: 1000)
    @max_fibers = max_fibers
    @active_fibers = Set.new
    @completed_fibers = []
    @memory_pressure_threshold = 500
  end

  def create_managed_fiber(&block)
    if @active_fibers.size >= @max_fibers
      raise "Maximum fiber limit reached: #{@max_fibers}"
    end

    fiber = Fiber.new do
      begin
        block.call
      ensure
        cleanup_completed_fiber(Fiber.current)
      end
    end

    @active_fibers.add(fiber)
    check_memory_pressure
    fiber
  end

  def cleanup_completed_fiber(fiber)
    @active_fibers.delete(fiber)
    @completed_fibers << fiber

    # Drop references to completed fibers periodically so they can be GC'd
    if @completed_fibers.size > 100
      @completed_fibers.clear
      trigger_gc if memory_pressure?
    end
  end

  private

  def memory_pressure?
    @active_fibers.size > @memory_pressure_threshold
  end

  def trigger_gc
    GC.start if memory_pressure?
  end

  def check_memory_pressure
    if memory_pressure?
      puts "Memory pressure detected: #{@active_fibers.size} active fibers"
    end
  end
end
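Usage is straightforward; the fiber limit is enforced at creation time and fibers deregister themselves on completion:

scheduler = MemoryEfficientScheduler.new(max_fibers: 2)

first  = scheduler.create_managed_fiber { :work }
second = scheduler.create_managed_fiber { :more_work }

begin
  scheduler.create_managed_fiber { :too_much }
rescue => e
  puts e.message   # => "Maximum fiber limit reached: 2"
end

[first, second].each(&:resume)   # each fiber runs, then removes itself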
Benchmarking scheduler performance requires measuring both throughput and latency under various load conditions. Scheduler efficiency affects the overall application performance significantly.
class BenchmarkableScheduler
  def initialize
    @operation_counts = Hash.new(0)
    @operation_times = Hash.new { |h, k| h[k] = [] }
    @start_time = Time.now
  end

  def kernel_sleep(duration)
    start = Time.now
    fiber = Fiber.current
    # schedule_wakeup is a hypothetical helper that registers the fiber
    # with the underlying event loop
    schedule_wakeup(fiber, duration)
    Fiber.yield
    elapsed = Time.now - start
    record_operation(:sleep, elapsed)
  end

  def io_wait(io, events, timeout)
    start = Time.now
    fiber = Fiber.current
    # schedule_io_completion is likewise a hypothetical event-loop helper
    schedule_io_completion(fiber, io, events)
    Fiber.yield
    elapsed = Time.now - start
    record_operation(:io_wait, elapsed)
    events
  end

  def performance_report
    {
      uptime: Time.now - @start_time,
      operation_counts: @operation_counts,
      average_times: calculate_averages,
      peak_times: calculate_peaks
    }
  end

  private

  def record_operation(type, duration)
    @operation_counts[type] += 1
    @operation_times[type] << duration

    # Keep only recent measurements to prevent unbounded memory growth
    if @operation_times[type].size > 1000
      @operation_times[type] = @operation_times[type].last(500)
    end
  end

  def calculate_averages
    @operation_times.transform_values do |times|
      times.sum / times.size if times.any?
    end
  end

  def calculate_peaks
    @operation_times.transform_values(&:max)
  end
end
Scaling characteristics depend heavily on the number of concurrent fibers and the frequency of blocking operations. A scheduler should handle increasing load without its per-operation overhead growing with the number of fibers.
def benchmark_scheduler_scaling(scheduler_class)
  fiber_counts = [10, 50, 100, 500, 1000]
  results = {}

  fiber_counts.each do |count|
    scheduler = scheduler_class.new
    Fiber.set_scheduler(scheduler)
    start_time = Time.now

    fibers = count.times.map do |i|
      Fiber.new do
        # Simulate a mixed workload
        sleep 0.01
        # Simulate some computation
        1000.times { |j| j * i }
        sleep 0.01
      end
    end

    fibers.each(&:resume)

    # Run the scheduler until completion (has_work? and run_once are assumed
    # to be provided by the scheduler class under test)
    while scheduler.respond_to?(:has_work?) && scheduler.has_work?
      scheduler.run_once if scheduler.respond_to?(:run_once)
    end

    elapsed = Time.now - start_time
    results[count] = {
      time: elapsed,
      throughput: count / elapsed
    }

    Fiber.set_scheduler(nil)
  end

  results
end
Production Patterns
Production scheduler deployments require robust error handling, monitoring integration, and graceful degradation when the scheduler encounters problems. Applications must continue functioning even if the scheduler fails.
require 'set'

class ProductionScheduler
  def initialize(config = {})
    @config = {
      max_fibers: 1000,
      timeout_default: 30,
      error_handler: method(:default_error_handler),
      metrics_reporter: nil
    }.merge(config)
    @active_fibers = Set.new
    @error_count = 0
    @start_time = Time.now
  end

  def kernel_sleep(duration)
    fiber = Fiber.current
    @active_fibers.add(fiber)
    # Apply a maximum timeout limit (duration may be nil for untimed sleeps)
    clamped = duration ? [duration, @config[:timeout_default]].min : @config[:timeout_default]
    schedule_wakeup(fiber, clamped)   # hypothetical event-loop registration
    record_metric(:sleep_scheduled, duration: duration)
    Fiber.yield
  rescue => e
    handle_error(e, :kernel_sleep)
    # Fall back to a plain blocking sleep. A real implementation must ensure
    # this does not re-enter the scheduler hook (for example by running it
    # on a blocking fiber).
    Kernel.sleep(duration) if duration
  ensure
    @active_fibers.delete(Fiber.current)
  end

  def health_check
    {
      status: @error_count < 10 ? :healthy : :degraded,
      active_fibers: @active_fibers.size,
      error_count: @error_count,
      uptime: Time.now - @start_time,
      memory_usage: memory_usage_mb   # hypothetical helper
    }
  end

  def graceful_shutdown(timeout: 30)
    shutdown_start = Time.now
    # run_once is assumed to drive one cycle of the underlying event loop
    while @active_fibers.any? && (Time.now - shutdown_start) < timeout
      run_once
      sleep 0.1
    end

    if @active_fibers.any?
      puts "Warning: #{@active_fibers.size} fibers still active after shutdown"
    end
  end

  private

  def handle_error(error, context)
    @error_count += 1
    @config[:error_handler].call(error, context)
    # Trip a circuit breaker if too many errors accumulate
    disable_scheduler if @error_count > 50   # hypothetical helper
  end

  def default_error_handler(error, context)
    puts "Scheduler error in #{context}: #{error.message}"
    puts error.backtrace.first(5).join("\n")
  end

  def record_metric(event, data = {})
    return unless @config[:metrics_reporter]

    @config[:metrics_reporter].call({
      event: event,
      timestamp: Time.now,
      data: data
    })
  end
end
Web application integration requires careful coordination between the scheduler and the web server's threading model. Different web servers have varying compatibility with fiber schedulers.
class WebApplicationScheduler
  def self.configure_for_server(server_type)
    case server_type
    when :puma
      configure_puma_integration
    when :unicorn
      configure_unicorn_integration
    when :passenger
      configure_passenger_integration
    else
      configure_generic_integration
    end
  end

  def self.configure_puma_integration
    # Puma can use fiber scheduling inside worker processes. A scheduler is
    # per-thread, so each server thread that should use it must call
    # Fiber.set_scheduler itself, typically from hooks in config/puma.rb
    # (e.g. on_worker_boot) rather than from application code.
    new(
      max_fibers: 500, # Conservative limit per worker
      thread_safe: true
    )
  end

  def self.middleware_wrapper
    Proc.new do |app|
      ->(env) do
        if Fiber.scheduler
          # Run the request in a fiber when a scheduler is available.
          # The fiber must finish before the response is returned, otherwise
          # result is still nil when the middleware hands it back.
          result = nil
          fiber = Fiber.new do
            result = app.call(env)
          end
          fiber.resume
          result
        else
          app.call(env)
        end
      end
    end
  end
end

# Rails integration
class SchedulerRailsIntegration
  def self.setup
    Rails.application.configure do
      # Configure the scheduler during initialization
      config.before_initialize do
        scheduler = ProductionScheduler.new(
          max_fibers: Rails.env.production? ? 1000 : 100,
          error_handler: ->(error, context) {
            Rails.logger.error("Scheduler error in #{context}: #{error.message}")
          }
        )
        Fiber.set_scheduler(scheduler)
      end

      # Graceful shutdown
      at_exit do
        Fiber.scheduler&.graceful_shutdown
      end
    end
  end
end
Database connection pooling requires special consideration with fiber schedulers since multiple fibers may attempt to use database connections concurrently within the same thread.
# Assumes a superclass that implements #io_wait (for example, the
# BaseScheduler template shown in the Reference section)
class DatabaseAwareScheduler < BaseScheduler
  def initialize(db_pool)
    super()
    @db_pool = db_pool
    @fiber_connections = {}
  end

  def io_wait(io, events, timeout)
    # Keep this fiber's database connection checked out across the wait
    get_fiber_db_connection if database_io?(io)
    super
  end

  def get_fiber_db_connection
    fiber = Fiber.current
    @fiber_connections[fiber] ||= @db_pool.checkout
  end

  def release_fiber_db_connection
    fiber = Fiber.current
    if (connection = @fiber_connections.delete(fiber))
      @db_pool.checkin(connection)
    end
  end

  private

  def database_io?(io)
    # Placeholder heuristic: identify database sockets by their peer port.
    # A real implementation would track the sockets owned by the pool.
    io.respond_to?(:remote_address) && io.remote_address.ip_port == 5432
  rescue StandardError
    false
  end
end
Monitoring and observability require integration with application performance monitoring tools and custom metrics collection for scheduler-specific behavior.
# Assumes a base scheduler class (e.g. the BaseScheduler template from the
# Reference section) and a metrics backend exposing increment/histogram/
# timing/get; DefaultMetrics is a hypothetical stand-in.
require 'securerandom'

class ObservableScheduler < BaseScheduler
  def initialize(metrics_backend: nil)
    super()
    @metrics = metrics_backend || DefaultMetrics.new
    @operation_latencies = {}
    @fiber_lifecycle_events = []
  end

  def kernel_sleep(duration)
    start_time = Time.now
    @metrics.increment('scheduler.sleep.count')
    @metrics.histogram('scheduler.sleep.duration', duration)
    super
    actual_duration = Time.now - start_time
    @metrics.histogram('scheduler.sleep.actual_duration', actual_duration)
  end

  def create_instrumented_fiber(&block)
    fiber_id = SecureRandom.hex(8)
    start_time = Time.now
    @metrics.increment('scheduler.fiber.created')

    Fiber.new do
      begin
        @metrics.timing('scheduler.fiber.lifetime') do
          block.call
        end
      rescue => e
        @metrics.increment('scheduler.fiber.errors')
        raise
      ensure
        @metrics.increment('scheduler.fiber.completed')
        record_fiber_lifecycle(fiber_id, start_time, Time.now)
      end
    end
  end

  def performance_dashboard_data
    {
      # @active_fibers assumes the base scheduler tracks a fiber registry;
      # calculate_error_rate and process_memory_usage are hypothetical helpers
      active_fibers: @active_fibers&.size,
      total_sleep_operations: @metrics.get('scheduler.sleep.count'),
      average_sleep_duration: @metrics.get('scheduler.sleep.duration.avg'),
      error_rate: calculate_error_rate,
      memory_usage: process_memory_usage,
      recent_fiber_events: @fiber_lifecycle_events.last(100)
    }
  end

  private

  def record_fiber_lifecycle(fiber_id, start_time, end_time)
    event = {
      fiber_id: fiber_id,
      start_time: start_time,
      end_time: end_time,
      duration: end_time - start_time
    }
    @fiber_lifecycle_events << event
    @fiber_lifecycle_events.shift if @fiber_lifecycle_events.size > 1000
  end
end
Reference
Core Scheduler Interface
Method | Parameters | Returns | Description |
---|---|---|---|
Fiber.scheduler | None | Object or nil | Returns the scheduler last set for the current thread |
Fiber.current_scheduler | None | Object or nil | Returns the scheduler only when the current fiber is non-blocking |
Fiber.set_scheduler(scheduler) | scheduler (Object or nil) | Object | Sets (or clears, with nil) the fiber scheduler for the current thread |
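For example, using the SimpleScheduler class from the Overview:

Fiber.set_scheduler(SimpleScheduler.new)   # install for the current thread
Fiber.scheduler                            # => the SimpleScheduler instance
Fiber.set_scheduler(nil)                   # clear the scheduler again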
Required Scheduler Hook Methods
Method | Parameters | Returns | Description |
---|---|---|---|
#kernel_sleep(duration) | duration (Numeric or nil) | nil | Handle sleep operations |
#block(blocker, timeout) | blocker (Object), timeout (Numeric or nil) | nil | Handle general blocking operations |
#unblock(blocker, fiber) | blocker (Object), fiber (Fiber) | nil | Resume a blocked fiber (may be called from another thread) |
#io_wait(io, events, timeout) | io (IO), events (Integer), timeout (Numeric or nil) | Integer | Handle I/O waiting; returns the ready events |
Optional Scheduler Methods
Method | Parameters | Returns | Description |
---|---|---|---|
#io_read(io, buffer, length) | io (IO), buffer (IO::Buffer), length (Integer) | Integer | Handle read operations (Ruby falls back to io_wait-based I/O if absent) |
#io_write(io, buffer, length) | io (IO), buffer (IO::Buffer), length (Integer) | Integer | Handle write operations (Ruby falls back to io_wait-based I/O if absent) |
#address_resolve(hostname) | hostname (String) | Array<String> | Handle DNS resolution |
#timeout_after(duration, klass, message, &block) | duration (Numeric), klass (Class), message (String) | Object | Handle timeout operations; runs the block |
#process_wait(pid, flags) | pid (Integer), flags (Integer) | Process::Status | Handle process waiting |
I/O Wait Event Constants
Constant | Value | Description |
---|---|---|
IO::READABLE | 1 | Wait for read readiness |
IO::PRIORITY | 2 | Wait for priority (out-of-band) data |
IO::WRITABLE | 4 | Wait for write readiness |
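As a sketch of how these constants are typically consumed (this version parks the whole thread in IO.select, which is acceptable for a toy scheduler; a production implementation would multiplex many fibers over epoll or kqueue):

def io_wait(io, events, timeout)
  readers = events.anybits?(IO::READABLE) ? [io] : []
  writers = events.anybits?(IO::WRITABLE) ? [io] : []

  ready = IO.select(readers, writers, nil, timeout)
  return false unless ready   # timed out

  result = 0
  result |= IO::READABLE if ready[0].include?(io)
  result |= IO::WRITABLE if ready[1].include?(io)
  result
end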
Scheduler Implementation Patterns
# Basic scheduler template
class BaseScheduler
  def initialize
    @ready_fibers = []
    @sleeping_fibers = []
    @io_waiting = {}
  end

  # Required hook methods

  def kernel_sleep(duration)
    # Implementation for sleep handling
  end

  def io_wait(io, events, timeout)
    # Implementation for I/O waiting
  end

  def block(blocker, timeout)
    # Implementation for general blocking
  end

  def unblock(blocker, fiber)
    # Implementation for unblocking
  end

  # Scheduler management methods

  def run_once
    # Process one scheduling cycle
  end

  def close
    # Cleanup scheduler resources
  end
end
Error Handling Patterns
Error Type | When Raised | Handling Strategy |
---|---|---|
FiberError | Invalid fiber operation | Graceful degradation |
IOError | I/O operation failure | Retry or fallback |
Timeout::Error | Operation timeout | Cancel and cleanup |
SystemCallError | System call failure | Log and continue |
Common Scheduler Configurations
# These examples assume a Scheduler class whose constructor accepts the
# options shown.

# Development configuration
development_scheduler = Scheduler.new(
  max_fibers: 50,
  debug_mode: true,
  timeout_default: 5
)

# Production configuration
production_scheduler = Scheduler.new(
  max_fibers: 1000,
  timeout_default: 30,
  error_handler: Logger.method(:error),
  metrics_enabled: true
)

# Testing configuration
test_scheduler = Scheduler.new(
  max_fibers: 10,
  deterministic: true,
  timeout_default: 1
)
Performance Benchmarks
Operation | Baseline (ms) | Scheduled (ms) | Overhead |
---|---|---|---|
sleep(0.1) | 100.0 | 100.2 | 0.2% |
File.read | 5.0 | 5.1 | 2% |
Net::HTTP.get | 150.0 | 152.0 | 1.3% |
Fiber switching | 0.001 | 0.003 | 200% |
Memory Usage Guidelines
Scheduler Type | Memory per Fiber | Recommended Limit |
---|---|---|
Basic scheduler | 2-4 KB | 500 fibers |
I/O scheduler | 4-8 KB | 250 fibers |
Full-featured | 8-16 KB | 100 fibers |
Production | 16-32 KB | 50 fibers |
Integration Examples
# Async gem integration
require 'async'

Fiber.set_scheduler(Async::Scheduler.new)
# Use existing async patterns

# Custom scheduler bridging an existing event loop (EventMachine)
class EventMachineScheduler
  def initialize
    @em_running = EM.reactor_running?
  end

  def kernel_sleep(duration)
    fiber = Fiber.current   # capture before entering the timer callback
    EM::Timer.new(duration) { fiber.resume }
    Fiber.yield
  end
end