Overview
Ruby's Fiber Scheduler provides a mechanism for implementing non-blocking I/O operations using fibers instead of threads. The scheduler intercepts blocking I/O calls and yields control to other fibers when operations would block, allowing for cooperative concurrency without the overhead of thread switching.
Fiber::Scheduler is not a base class you inherit from. The Ruby documentation uses that name to describe the interface, introduced in Ruby 3.0, that a scheduler object must implement. When a scheduler is active, Ruby automatically hooks into blocking operations performed by non-blocking fibers, such as socket reads, I/O waits, and sleep calls, and delegates the actual scheduling to the scheduler's hook methods.
```ruby
require 'async'

# Both tasks sleep concurrently, so the whole block takes about 1 second, not 2.
Async do |task|
  task.async do
    puts "Starting first operation"
    sleep 1
    puts "First operation complete"
  end

  task.async do
    puts "Starting second operation"
    sleep 1
    puts "Second operation complete"
  end
end
```
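Because a scheduler is simply an object that responds to the hook methods, the delegation is easy to observe. The sketch below is illustrative (the LoggingScheduler name is not part of any library): it records hook calls instead of scheduling anything, and its kernel_sleep returns at once, so sleep inside a non-blocking fiber finishes immediately while the call is logged. The other core hooks are stubbed so the object satisfies the interface.

```ruby
# Illustrative only: records hook calls instead of actually scheduling fibers.
class LoggingScheduler
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Called for Kernel#sleep in a non-blocking fiber; returning at once skips the wait.
  def kernel_sleep(duration = nil)
    @calls << [:kernel_sleep, duration]
  end

  # The remaining core hooks are stubs so the object satisfies the interface.
  def io_wait(io, events, timeout)
    @calls << [:io_wait, events]
    events
  end

  def block(blocker, timeout = nil)
    @calls << [:block, timeout]
    true
  end

  def unblock(blocker, fiber)
    @calls << [:unblock]
  end

  # Called by Fiber.set_scheduler(nil) and when the thread exits.
  def close
    @calls << [:close]
  end
end

scheduler = LoggingScheduler.new
Fiber.set_scheduler(scheduler)
Fiber.new { sleep 0.25 }.resume  # non-blocking fiber: sleep goes to kernel_sleep
sleep 0.01                       # main fiber is blocking: an ordinary sleep, no hook
Fiber.set_scheduler(nil)         # clearing the scheduler closes it

p scheduler.calls
```

The sleep in the main fiber never reaches the scheduler, because the thread's main fiber is a blocking fiber.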
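The scheduler manages fiber execution by responding to blocking operations. When a non-blocking fiber reaches a potentially blocking call, Ruby invokes the matching scheduler hook, which usually suspends the fiber and resumes it once the operation can proceed. The thread's main fiber and fibers created with Fiber.new(blocking: true) bypass the scheduler and block the thread as usual. Common scheduler implementations include the async gem, schedulers built on nio4r, and other event-driven systems.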
Ruby integrates scheduler hooks directly into core I/O methods including IO#read, IO#write, Socket#accept, Kernel#sleep, and DNS resolution methods. The scheduler receives callbacks with operation details and can choose how to handle each blocking scenario.
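The sketch below makes the IO#read case visible, assuming Ruby 3.0 or later, where IO objects default to non-blocking mode. Reading from an empty pipe inside a non-blocking fiber does not stall the thread; Ruby calls io_wait with IO::READABLE instead. RecordingScheduler is a hypothetical name, and the fiber is resumed by hand rather than by an event loop.

```ruby
# Hypothetical scheduler that parks a fiber in io_wait until it is resumed by hand.
class RecordingScheduler
  attr_reader :waits

  def initialize
    @waits = []
  end

  def io_wait(io, events, timeout)
    @waits << events
    Fiber.yield   # suspend; whoever resumes us signals that io may be ready
    events        # report the requested events as ready
  end

  def kernel_sleep(duration = nil) = Fiber.yield
  def block(blocker, timeout = nil) = Fiber.yield
  def unblock(blocker, fiber) = nil
  def close = nil
end

reader_io, writer_io = IO.pipe
scheduler = RecordingScheduler.new
Fiber.set_scheduler(scheduler)

reader = Fiber.new { reader_io.read(5) }
reader.resume             # pipe is empty: read calls io_wait, which yields back here
writer_io.write("hello")  # the main fiber is blocking, so this write bypasses the scheduler
data = reader.resume      # io_wait returns, read retries and completes

Fiber.set_scheduler(nil)
puts "io_wait events: #{scheduler.waits.inspect}, data: #{data.inspect}"
```

A real scheduler would register the fiber with a selector inside io_wait and resume it when the operating system reports the pipe readable, instead of relying on the caller.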
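```ruby
# Toy illustration of the block/unblock handshake, not a complete scheduler.
class BasicScheduler
  def initialize
    @blocked = {}  # fiber => the object it is waiting on
    @ready = []    # fibers that may be resumed
  end

  # Called when a non-blocking fiber waits on a Mutex, Queue, Thread#join, etc.
  def block(blocker, timeout = nil)
    @blocked[Fiber.current] = blocker
    Fiber.yield
    true
  end

  # Called when the wait is over. Ruby may invoke this from another thread,
  # so a real implementation must synchronize access to @ready.
  def unblock(blocker, fiber)
    @blocked.delete(fiber)
    @ready << fiber
  end

  def run
    until @ready.empty? && @blocked.empty?
      @ready.shift.resume while @ready.any?
      # Process I/O events and timeouts here; without that step, a fiber
      # that is never unblocked keeps this loop spinning.
    end
  end
end
```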
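Thread::Queue is one of the core classes that drives these two hooks. In this sketch (QueueDemoScheduler is hypothetical and resumes fibers by hand), popping from an empty queue in a non-blocking fiber calls block, and pushing from the main fiber calls unblock with the waiting fiber, which the scheduler can then resume.

```ruby
# Hypothetical scheduler that only supports block/unblock, driven by hand.
class QueueDemoScheduler
  attr_reader :events, :ready

  def initialize
    @events = []  # which hooks fired, in order
    @ready = []   # fibers handed to us by unblock
  end

  def block(blocker, timeout = nil)
    @events << :block
    Fiber.yield   # park the fiber until someone resumes it
    true
  end

  def unblock(blocker, fiber)
    @events << :unblock
    @ready << fiber
  end

  def kernel_sleep(duration = nil) = Fiber.yield
  def io_wait(io, events, timeout) = Fiber.yield
  def close = nil
end

queue = Thread::Queue.new
scheduler = QueueDemoScheduler.new
Fiber.set_scheduler(scheduler)

consumer = Fiber.new { queue.pop }
consumer.resume                       # queue is empty: pop calls block
queue << :job                         # push from the main fiber: Ruby calls unblock with the consumer
value = scheduler.ready.shift.resume  # pop retries, finds :job, and the fiber returns it

Fiber.set_scheduler(nil)
p value
```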
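Schedulers implement specific hook methods to handle different blocking scenarios. The io_wait hook handles waiting on sockets and other file descriptors, kernel_sleep manages time-based delays, block and unblock cover waits on a Mutex, Queue, or Thread#join, and address_resolve (Ruby 3.1+) handles DNS lookups. Each hook receives the details of the operation, suspends the calling fiber until the operation can proceed, and then returns a result; io_wait, for example, returns the events that became ready.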
Basic Usage
Setting up a fiber scheduler requires choosing an implementation and establishing the scheduler context. The most common approach uses the async gem, which provides a production-ready scheduler with comprehensive I/O handling.
```ruby
require 'async'
require 'async/http/internet'

Async do
  internet = Async::HTTP::Internet.new
  response = internet.get("https://httpbin.org/delay/2")
  puts "Received: #{response.read}"
ensure
  internet&.close
end
```
Custom schedulers do not inherit from anything: Fiber::Scheduler documents the interface but is not a class defined at runtime. Any object that implements the hook methods can be passed to Fiber.set_scheduler, which installs it as the scheduler for the current thread.
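```ruby
# Minimal single-threaded scheduler: supports io_wait and kernel_sleep only.
# Per-call timeouts, several waiters on one IO, and block/unblock are omitted.
class SimpleScheduler
  def initialize
    @readable = {}  # io => fiber waiting for the IO to become readable
    @writable = {}  # io => fiber waiting for the IO to become writable
    @sleepers = {}  # fiber => monotonic time at which it should wake
  end

  def io_wait(io, events, timeout)
    waiting = (events & IO::READABLE).zero? ? @writable : @readable
    waiting[io] = Fiber.current
    Fiber.yield
    events # simplification: report every requested event as ready
  end

  def kernel_sleep(duration = nil)
    @sleepers[Fiber.current] = now + duration if duration
    Fiber.yield
  end

  # Ruby requires these hooks; Mutex/Queue waits are not supported in this sketch.
  def block(blocker, timeout = nil) = raise(NotImplementedError)
  def unblock(blocker, fiber) = raise(NotImplementedError)

  # Ruby calls close when the scheduler is replaced or the thread exits.
  def close = run

  def run
    until @readable.empty? && @writable.empty? && @sleepers.empty?
      # Wait for I/O, but no longer than the earliest sleeper's deadline.
      timeout = @sleepers.values.min&.then { |deadline| [deadline - now, 0].max }
      readable, writable = IO.select(@readable.keys, @writable.keys, nil, timeout)
      readable&.each { |io| @readable.delete(io).resume }
      writable&.each { |io| @writable.delete(io).resume }
      @sleepers.select { |_, deadline| deadline <= now }.each_key do |fiber|
        @sleepers.delete(fiber)
        fiber.resume
      end
    end
  end

  private

  def now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
```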
Multiple fibers can make progress concurrently under one scheduler. Each fiber runs until it reaches a blocking operation, at which point the scheduler switches to another ready fiber. Scheduling is cooperative: the scheduler cannot preempt a fiber, so a fiber that computes for a long time without sleeping or performing I/O delays every other fiber on the thread.
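```ruby
scheduler = SimpleScheduler.new
Fiber.set_scheduler(scheduler)

# Fibers created with Fiber.new are non-blocking, so their sleep calls go to the scheduler.
fibers = 5.times.map do |i|
  Fiber.new do
    puts "Fiber #{i} starting"
    sleep(rand(0.5..2.0))
    puts "Fiber #{i} completed"
  end
end

fibers.each(&:resume) # each fiber runs until its first sleep
scheduler.run         # total time is roughly the longest sleep, not the sum
```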
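Because fibers switch only inside scheduler hooks, a fiber that never sleeps or waits on I/O runs to completion before any other fiber gets a turn. The hypothetical RoundRobinScheduler below treats every sleep as "go to the back of the queue", which is enough to show the ordering; it has no I/O or timer support.

```ruby
# Hypothetical round-robin scheduler: every sleep re-queues the fiber; no I/O support.
class RoundRobinScheduler
  def initialize
    @ready = []
  end

  def kernel_sleep(duration = nil)
    @ready << Fiber.current  # go to the back of the line
    Fiber.yield
  end

  def io_wait(io, events, timeout) = raise(NotImplementedError)
  def block(blocker, timeout = nil) = raise(NotImplementedError)
  def unblock(blocker, fiber) = raise(NotImplementedError)

  def run
    @ready.shift.resume until @ready.empty?
  end
  alias close run
end

log = []
scheduler = RoundRobinScheduler.new
Fiber.set_scheduler(scheduler)

Fiber.new { 3.times { |i| log << "io-#{i}"; sleep 0 } }.resume
Fiber.new { log << "cpu-start"; 200_000.times { Math.sqrt(2) }; log << "cpu-end" }.resume

scheduler.run
Fiber.set_scheduler(nil)
p log
```

The CPU-bound fiber's start and end entries land next to each other: it never calls a hook, so the scheduler has no opportunity to switch away from it.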
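File operations behave differently from sockets under a fiber scheduler. Operating systems report regular files as always ready, so a file read never asks the scheduler to wait: with a readiness-based backend, File.read completes without yielding, and the reads in the example below effectively run one after another. Overlapping file reads requires a scheduler that implements the io_read and io_write hooks with an asynchronous file backend, such as io_uring on Linux.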
```ruby
require 'async'

Async do |task|
  files = %w[file1.txt file2.txt file3.txt]

  files.each do |filename|
    task.async do
      content = File.read(filename)
      puts "#{filename}: #{content.length} bytes"
    rescue Errno::ENOENT
      puts "#{filename}: not found"
    end
  end
end
```
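Counting io_wait calls is a quick way to check whether a read can yield at all. In this sketch (CountingScheduler and the temporary file are illustrative), reading a regular file inside a non-blocking fiber completes without a single io_wait call, because the read never reports that it would block.

```ruby
require 'tmpdir'

# Hypothetical scheduler that only counts io_wait calls.
class CountingScheduler
  attr_reader :io_waits

  def initialize
    @io_waits = 0
  end

  def io_wait(io, events, timeout)
    @io_waits += 1
    events  # pretend the IO is ready
  end

  def kernel_sleep(duration = nil) = nil
  def block(blocker, timeout = nil) = true
  def unblock(blocker, fiber) = nil
  def close = nil
end

path = File.join(Dir.tmpdir, "fiber_demo_#{Process.pid}.txt")
File.write(path, "x" * 100_000)

scheduler = CountingScheduler.new
Fiber.set_scheduler(scheduler)
size = Fiber.new { File.read(path).bytesize }.resume  # runs to completion without yielding
Fiber.set_scheduler(nil)
File.delete(path)

puts "read #{size} bytes with #{scheduler.io_waits} io_wait calls"
```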
Network operations benefit most from scheduler-based concurrency. Socket connections, HTTP requests, and database queries can overlap without extra threads, provided the client library waits on its sockets through Ruby's IO methods so that the scheduler's hooks are invoked. Each operation then yields while it waits for a network response.
Thread Safety & Concurrency
Fiber schedulers operate within a single thread, eliminating many traditional thread safety concerns. However, shared state between fibers still requires careful consideration since fiber switching can occur at yield points during I/O operations.
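Scheduler implementations must still protect internal data structures that other threads can reach. Fibers on one thread never run simultaneously, but Ruby can call unblock from a different thread, for example when that thread releases a Mutex or pushes to a Queue that one of this scheduler's fibers is waiting on. State touched from such calls needs a Mutex or a thread-safe structure such as Thread::Queue, plus a way to wake the event loop.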
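```ruby
# Fragment: only the I/O bookkeeping is shown. Fibers on this thread never run
# at the same time, so the Mutex matters only for state other threads can reach.
class ThreadSafeScheduler
  def initialize
    @io_map = {}  # io => { fiber:, events: }
    @mutex = Thread::Mutex.new
  end

  def io_wait(io, events, timeout)
    @mutex.synchronize do
      @io_map[io] = { fiber: Fiber.current, events: events }
    end
    Fiber.yield
    events
  end

  def process_ready_ios
    # Take ready entries out under the lock, then resume outside it, so a resumed
    # fiber can call io_wait (and take the lock) again without deadlocking.
    ready = @mutex.synchronize do
      @io_map.select { |io, data| io_ready?(io, data[:events]) }
             .each_key { |io| @io_map.delete(io) }
    end
    ready.each_value { |data| data[:fiber].resume }
  end

  private

  # Zero-timeout poll of a single IO for the requested events.
  def io_ready?(io, events)
    readers = (events & IO::READABLE).zero? ? [] : [io]
    writers = (events & IO::WRITABLE).zero? ? [] : [io]
    !IO.select(readers, writers, nil, 0).nil?
  end
end
```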
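The cross-thread case is where locking actually matters. This sketch (CrossThreadScheduler is hypothetical and supports only block/unblock) parks a fiber on an empty Thread::Queue, then lets a second thread push to it. Ruby invokes unblock on that second thread, so the scheduler touches only a Thread::Queue and a self-pipe there; the pipe byte wakes the event loop's IO.select, and the fiber is resumed back on its own thread.

```ruby
# Hypothetical scheduler covering one concern: unblock arriving from another thread.
# io_wait and kernel_sleep are deliberately unsupported.
class CrossThreadScheduler
  def initialize
    @pending = Thread::Queue.new  # thread-safe hand-off of unblocked fibers
    @wake_r, @wake_w = IO.pipe    # self-pipe used to interrupt IO.select
    @blocked = 0                  # fibers currently parked in #block
  end

  # Runs on the scheduler's own thread, inside the waiting fiber.
  def block(blocker, timeout = nil)
    @blocked += 1
    Fiber.yield
    @blocked -= 1
    true
  end

  # May run on any thread, so it only touches the queue and the pipe.
  def unblock(blocker, fiber)
    @pending << fiber
    @wake_w.write_nonblock(".", exception: false)
  end

  def io_wait(io, events, timeout) = raise(NotImplementedError)
  def kernel_sleep(duration = nil) = raise(NotImplementedError)

  def run
    while @blocked > 0
      IO.select([@wake_r])                         # sleep until some thread calls unblock
      @wake_r.read_nonblock(64, exception: false)  # drain the wake-up bytes
      @pending.pop.resume until @pending.empty?
    end
  end

  def close
    run
    @wake_r.close
    @wake_w.close
  end
end

jobs = Thread::Queue.new
scheduler = CrossThreadScheduler.new
Fiber.set_scheduler(scheduler)

result = nil
Fiber.new { result = jobs.pop }.resume                      # empty queue: fiber parks in #block
producer = Thread.new { sleep 0.05; jobs << :from_thread }  # this push calls #unblock on the producer thread

scheduler.run
producer.join
Fiber.set_scheduler(nil)
puts "received #{result.inspect}"
```

The self-pipe is a common way to interrupt a select-style wait; event libraries usually expose an equivalent wake-up call.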
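Cross-fiber communication needs a hand-off mechanism that suspends only the waiting fiber. Under an active scheduler, Thread::Queue already provides this: pop on an empty queue suspends the calling fiber through the block hook, and push wakes it through unblock. A hand-rolled channel like the one below is only needed when fibers are driven manually with resume and yield, without a scheduler. Either way, fibers should exchange messages rather than share mutable state.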
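```ruby
# Channel for fibers driven manually with resume/yield (no scheduler involved).
# #receive must be called from a fiber other than the thread's root fiber.
class FiberChannel
  def initialize
    @messages = []         # buffered messages nobody is waiting for yet
    @waiting_readers = []  # fibers suspended in #receive
  end

  # Named #push rather than #send so Object#send is not overridden.
  def push(message)
    if (reader = @waiting_readers.shift)
      reader.resume(message)  # hand the message straight to the waiting fiber
    else
      @messages << message
    end
  end

  def receive
    return @messages.shift unless @messages.empty?

    @waiting_readers << Fiber.current
    Fiber.yield # returns the value passed to reader.resume
  end
end
```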
Scheduler lifecycle management becomes critical in concurrent scenarios. Proper cleanup prevents resource leaks and ensures graceful shutdown. Schedulers should track active fibers and provide mechanisms for orderly termination.
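```ruby
require 'set'

# Lifecycle-tracking fragment. The io_wait, kernel_sleep, block, and unblock
# hooks are omitted, and process_events stands in for the event-loop step.
class ManagedScheduler
  def initialize
    @active_fibers = Set.new
    @shutdown_requested = false
  end

  # The hook behind Fiber.schedule: start the block in a tracked, non-blocking fiber.
  def fiber(&work)
    raise "scheduler is shutting down" if @shutdown_requested

    fiber = Fiber.new(blocking: false) do
      @active_fibers.add(Fiber.current)
      begin
        work.call
      ensure
        @active_fibers.delete(Fiber.current)
      end
    end
    fiber.resume
    fiber
  end

  def shutdown
    @shutdown_requested = true
    # Fiber#kill (Ruby 3.3+) runs each fiber's ensure clauses, which remove it
    # from the set; iterate over a copy because the set changes underneath.
    @active_fibers.to_a.each(&:kill)
  end

  def run
    process_events until @active_fibers.empty?
  end

  # Ruby calls close when the scheduler is replaced or the thread exits.
  def close
    run
  end
end
```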
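Fiber.schedule is the public entry point for the fiber hook: it hands the block to the current thread's scheduler and returns whatever the hook returns. In this sketch (TrackingScheduler is hypothetical), the block starts running immediately, and control returns to the caller of Fiber.schedule at the fiber's first suspension point; closing the scheduler then lets the fiber finish.

```ruby
# Hypothetical scheduler showing the fiber hook behind Fiber.schedule.
class TrackingScheduler
  attr_reader :scheduled

  def initialize
    @scheduled = []
    @ready = []
  end

  # Fiber.schedule { ... } calls this with the block.
  def fiber(&work)
    fiber = Fiber.new(blocking: false, &work)
    @scheduled << fiber
    fiber.resume  # start immediately, as the interface expects
    fiber
  end

  def kernel_sleep(duration = nil)
    @ready << Fiber.current  # simplification: wake on close, ignoring duration
    Fiber.yield
  end

  def io_wait(io, events, timeout) = raise(NotImplementedError)
  def block(blocker, timeout = nil) = raise(NotImplementedError)
  def unblock(blocker, fiber) = raise(NotImplementedError)

  def close
    @ready.shift.resume until @ready.empty?
  end
end

order = []
scheduler = TrackingScheduler.new
Fiber.set_scheduler(scheduler)

task = Fiber.schedule do
  order << :started
  sleep 1  # suspends: control returns to the caller of Fiber.schedule
  order << :finished
end
order << :caller_continues

Fiber.set_scheduler(nil)  # close resumes the parked fiber so it can finish
p order
```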
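Database connection pooling requires special attention in fiber-scheduled environments. Pools that lease connections per thread can hand the same connection to several fibers on one thread, and pools that wait with a thread-blocking primitive stall every fiber on that thread. Under an active scheduler, Thread::Queue avoids both problems: pop on an empty queue suspends only the calling fiber, and push wakes one waiting fiber.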
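```ruby
# Fiber-aware pool sketch. Under an active fiber scheduler, Thread::Queue#pop
# suspends only the calling fiber (through the block/unblock hooks), so the
# queue does the waiting. The caller supplies the connection factory block.
class FiberConnectionPool
  def initialize(size = 10, &create_connection)
    @pool = Thread::Queue.new
    size.times { @pool << create_connection.call }
  end

  # Waits until a connection is free; only this fiber is suspended meanwhile.
  def acquire
    @pool.pop
  end

  # Returns the connection and wakes one waiting fiber, if any.
  def release(connection)
    @pool << connection
  end

  def with_connection
    connection = acquire
    yield connection
  ensure
    release(connection) if connection
  end
end
```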
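Contention for a pool can be simulated with a single shared item. In this sketch, a Thread::Queue holding one placeholder connection (the :conn symbol) is shared by two fibers under a hypothetical PoolDemoScheduler; the second fiber is parked by the block hook until the first returns the connection, at which point Ruby calls unblock.

```ruby
# Hypothetical scheduler: block/unblock plus a sleep that simply re-queues the fiber.
class PoolDemoScheduler
  def initialize
    @ready = []
  end

  def block(blocker, timeout = nil)
    Fiber.yield  # parked until unblock puts this fiber on @ready
    true
  end

  def unblock(blocker, fiber)
    @ready << fiber
  end

  def kernel_sleep(duration = nil)
    @ready << Fiber.current
    Fiber.yield
  end

  def io_wait(io, events, timeout) = raise(NotImplementedError)

  def run
    @ready.shift.resume until @ready.empty?
  end
  alias close run
end

pool = Thread::Queue.new
pool << :conn  # one placeholder "connection"
log = []

scheduler = PoolDemoScheduler.new
Fiber.set_scheduler(scheduler)

2.times do |i|
  Fiber.new do
    conn = pool.pop  # fiber 1 finds the pool empty and is parked via #block
    log << "fiber #{i} got #{conn}"
    sleep 0          # stand-in for a query; lets the other fiber run
    pool << conn     # wakes a waiting fiber, if any, through #unblock
    log << "fiber #{i} released"
  end.resume
end

scheduler.run
Fiber.set_scheduler(nil)
puts log
```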
Performance & Memory
Fiber schedulers provide significant performance advantages over thread-based concurrency for I/O-intensive applications. Context switching between fibers incurs minimal overhead compared to thread switching, allowing applications to handle thousands of concurrent operations efficiently.
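Memory consumption also scales favorably with fiber-based scheduling. Each fiber reserves a VM stack and a machine stack, sized by the RUBY_FIBER_VM_STACK_SIZE and RUBY_FIBER_MACHINE_STACK_SIZE environment variables. These defaults are smaller than the corresponding thread settings, and the operating system commits stack pages only as they are touched, so resident memory per idle fiber is usually far below the reserved size. The difference becomes substantial when managing hundreds or thousands of concurrent connections.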
```ruby
require 'async'

# Resident set size delta (KB) around a block; rough, and relies on `ps` (Linux/macOS).
def measure_memory_usage
  before = `ps -o rss= -p #{Process.pid}`.to_i
  yield
  after = `ps -o rss= -p #{Process.pid}`.to_i
  after - before
end

# Fiber-based approach: 1000 concurrent tasks, each simulating 100 ms of I/O.
fiber_memory = measure_memory_usage do
  Async do |task|
    1000.times do
      task.async do
        sleep 0.1 # stand-in for waiting on I/O
      end
    end
  end
end

puts "Fiber approach: #{fiber_memory}KB"
```
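To check the context-switch claim on a given machine and Ruby version, a rough micro-benchmark is enough. This sketch times plain Fiber#resume / Fiber.yield round trips (no scheduler involved) against handing a token to another thread and back through Thread::Queue. Results vary widely across platforms, so no expected figures are given.

```ruby
# Rough micro-benchmark; absolute numbers depend heavily on hardware and Ruby version.
ROUNDS = 10_000

def elapsed_seconds
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

# Fiber round trip: resume into the fiber, Fiber.yield back out.
ping = Fiber.new { loop { Fiber.yield } }
fiber_time = elapsed_seconds { ROUNDS.times { ping.resume } }

# Thread round trip: pass a token to a worker thread and wait for it to come back.
to_worker = Thread::Queue.new
from_worker = Thread::Queue.new
worker = Thread.new { loop { from_worker << to_worker.pop } }
thread_time = elapsed_seconds do
  ROUNDS.times do
    to_worker << :ping
    from_worker.pop
  end
end
worker.kill

fiber_us = fiber_time / ROUNDS * 1_000_000
thread_us = thread_time / ROUNDS * 1_000_000
puts format("fiber round trip:  %.3f us", fiber_us)
puts format("thread round trip: %.3f us", thread_us)
```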
CPU utilization patterns differ significantly between threaded and fiber-based approaches. All fibers under one scheduler run on a single thread, which avoids thread context switches but means the event loop must never block: CPU-heavy work in any fiber delays every other fiber on that thread.
Performance optimization focuses on minimizing scheduler overhead and maximizing I/O throughput. Efficient event polling, batch processing of ready operations, and careful memory management in scheduler data structures directly impact application performance.
```ruby
# Batching fragment: io_wait (not shown) is assumed to fill @io_map with
# io => fiber entries, and the other required hooks are omitted.
class OptimizedScheduler
  BATCH_SIZE = 100

  def initialize
    @io_map = {}       # io => fiber waiting for it to become readable
    @ready_queue = []  # fibers whose I/O is ready
  end

  def process_events
    # A single IO.select call reports every ready IO at once instead of polling each.
    readable, = IO.select(@io_map.keys, nil, nil, 0.01)
    readable&.each do |io|
      fiber = @io_map.delete(io)
      @ready_queue << fiber if fiber
    end
    process_ready_fibers
  end

  # Resume at most BATCH_SIZE fibers per pass so timers and newly
  # ready I/O get a turn even when many fibers are runnable.
  def process_ready_fibers
    BATCH_SIZE.times do
      fiber = @ready_queue.shift or break
      fiber.resume
    end
  end
end
```
Profiling fiber-scheduled applications requires specialized tools and techniques. Traditional thread-based profilers may not accurately represent fiber execution patterns. Custom instrumentation and fiber-aware profiling help identify performance bottlenecks.
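```ruby
# Decorator sketch: wraps another scheduler object and counts and times its hooks.
# Optional hooks of the wrapped scheduler, such as io_read or address_resolve,
# are not forwarded here; add delegators for any your scheduler implements.
class InstrumentedScheduler
  def initialize(inner)
    @inner = inner
    @stats = { io_waits: 0, sleeps: 0, total_wait_time: 0.0 }
  end

  def io_wait(io, events, timeout)
    @stats[:io_waits] += 1
    timed { @inner.io_wait(io, events, timeout) }
  end

  def kernel_sleep(duration = nil)
    @stats[:sleeps] += 1
    timed { @inner.kernel_sleep(duration) }
  end

  # Remaining hooks are delegated unchanged.
  def block(blocker, timeout = nil) = @inner.block(blocker, timeout)
  def unblock(blocker, fiber) = @inner.unblock(blocker, fiber)
  def fiber(&work) = @inner.fiber(&work)
  def close = @inner.close

  def report_stats
    puts "Scheduler Statistics:"
    @stats.each { |key, value| puts "  #{key}: #{value}" }
  end

  private

  # Measures how long the calling fiber stayed suspended inside the hook.
  def timed
    start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
  ensure
    @stats[:total_wait_time] += Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
  end
end
```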
Memory leak prevention requires careful attention to fiber lifecycle management and resource cleanup. Long-running schedulers must properly dispose of completed fiber contexts and avoid retaining references to finished operations.
Production Patterns
Production deployments of fiber schedulers require robust error handling and recovery mechanisms. Failed fibers should not crash the entire scheduler, and proper logging helps diagnose issues in concurrent environments.
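```ruby
require 'logger'

# Error-isolation fragment: only the fiber hook is shown; io_wait, kernel_sleep,
# block, unblock, and close still need implementing.
class ProductionScheduler
  def initialize(logger: Logger.new($stdout))
    @logger = logger
    @error_handler = method(:default_error_handler)
  end

  # Hook used by Fiber.schedule: run the block in a new non-blocking fiber and
  # contain any exception so it does not unwind into the event loop.
  def fiber(&work)
    fiber = Fiber.new(blocking: false) do
      work.call
    rescue => error
      @error_handler.call(error, Fiber.current)
    end
    fiber.resume
    fiber
  end

  def default_error_handler(error, fiber)
    @logger.error "Fiber error: #{error.message}"
    @logger.error error.backtrace&.join("\n")
    # Optionally restart the work or perform cleanup here
  end

  def set_error_handler(&block)
    @error_handler = block
  end
end
```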
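The containment idea can be checked with a small self-contained scheduler. In this sketch (ContainingScheduler and its errors list are hypothetical, with no I/O or timer support), an exception raised by a block passed to Fiber.schedule is rescued inside the fiber, so the caller and later fibers keep running.

```ruby
# Hypothetical scheduler that contains fiber errors; no I/O or timer support.
class ContainingScheduler
  attr_reader :errors

  def initialize
    @errors = []
  end

  def fiber(&work)
    fiber = Fiber.new(blocking: false) do
      work.call
    rescue => error
      @errors << error  # a real scheduler would log and report this
    end
    fiber.resume
    fiber
  end

  def kernel_sleep(duration = nil) = raise(NotImplementedError)
  def io_wait(io, events, timeout) = raise(NotImplementedError)
  def block(blocker, timeout = nil) = raise(NotImplementedError)
  def unblock(blocker, fiber) = raise(NotImplementedError)
  def close = nil
end

scheduler = ContainingScheduler.new
Fiber.set_scheduler(scheduler)

results = []
Fiber.schedule { raise ArgumentError, "bad input" }
Fiber.schedule { results << :still_running }

Fiber.set_scheduler(nil)
puts "contained: #{scheduler.errors.map(&:message).inspect}, results: #{results.inspect}"
```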
Monitoring fiber scheduler performance in production requires custom metrics and observability tools. Key metrics include active fiber count, I/O wait times, scheduler loop iteration frequency, and error rates.
```ruby
# Monitoring fragment: process_events, should_shutdown?, and @active_fibers are
# assumed to be provided by the rest of the scheduler. metrics_reporter is any
# object with gauge(name, value) and histogram(name, value) methods.
class MonitoredScheduler
  REPORT_INTERVAL = 1000 # loop iterations between gauge updates

  def initialize(metrics_reporter)
    @metrics = metrics_reporter
    @start_time = now
  end

  def run
    loop_count = 0
    loop do
      iteration_start = now
      process_events
      loop_count += 1

      if (loop_count % REPORT_INTERVAL).zero?
        @metrics.gauge('scheduler.active_fibers', @active_fibers.size)
        @metrics.gauge('scheduler.loop_frequency', loop_count / (now - @start_time))
      end

      @metrics.histogram('scheduler.iteration_time', now - iteration_start)
      break if should_shutdown?
    end
  end

  private

  # Monotonic clock: unaffected by wall-clock adjustments, unlike Time.now.
  def now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
```
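Integration with application servers like Puma or Unicorn requires careful coordination, because Fiber.set_scheduler applies to the current thread only. Unicorn handles one request at a time per worker process, while Puma serves requests from a thread pool, so every thread that runs fibers needs its own scheduler instance; sharing one instance across threads is unsafe.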
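```ruby
# config.ru
# Sketch: one scheduler per request. AsyncScheduler is a placeholder for any
# object implementing the scheduler hooks, including #fiber and #close.
class Application
  def call(env)
    response = nil
    Fiber.set_scheduler(AsyncScheduler.new)
    begin
      # Runs the handler in a non-blocking fiber, so any further fibers it
      # schedules can overlap their sleeps and I/O.
      Fiber.schedule { response = handle_request(env) }
    ensure
      # Clearing the scheduler calls its #close, which is expected to run the
      # event loop until the scheduled fiber has finished.
      Fiber.set_scheduler(nil)
    end
    response
  end

  private

  def handle_request(env)
    case env['PATH_INFO']
    when '/slow'
      sleep 2 # simulate a slow I/O operation
      [200, {}, ['Slow response']]
    else
      [200, {}, ['Fast response']]
    end
  end
end

run Application.new
```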
Health checks and readiness probes must account for scheduler state and fiber execution. Standard HTTP health endpoints should verify scheduler responsiveness and detect deadlock conditions.
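```ruby
# Health-check fragment: the scheduler hooks are omitted, and the event loop is
# assumed to call record_activity on every iteration. A stalled event loop also
# stalls every fiber on its thread, so the watchdog runs on a separate Thread.
class HealthCheckScheduler
  WARN_AFTER = 30      # seconds without event-loop activity before warning
  UNHEALTHY_AFTER = 60 # seconds without activity before reporting unhealthy

  def initialize(logger:)
    @logger = logger
    @last_activity = now
    @watchdog = nil
  end

  # Call from the event loop, not from the health check itself.
  def record_activity
    @last_activity = now
  end

  def start_health_monitoring
    @watchdog = Thread.new do
      loop do
        sleep 10 # new threads have no scheduler, so this is an ordinary sleep
        check_scheduler_health
      end
    end
  end

  def check_scheduler_health
    idle = now - @last_activity
    return unless idle > WARN_AFTER

    @logger.warn "Scheduler inactive for #{idle.round(1)}s"
    # Trigger alerting or restart logic here
  end

  def healthy?
    return false unless @watchdog&.alive?

    (now - @last_activity) < UNHEALTHY_AFTER
  end

  private

  def now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
```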
Database connection management in production requires special considerations for connection pooling, transaction handling, and connection health monitoring within fiber contexts.
```ruby
require 'connection_pool'
require 'pg'

# Helper for fiber-scheduled code (not itself a scheduler): hands out
# health-checked PG connections from the connection_pool gem.
class DatabaseScheduler
  def initialize(pool_size = 20)
    @connection_pool = ConnectionPool.new(size: pool_size) do
      PG.connect(host: 'localhost', dbname: 'app_production')
    end
  end

  def with_connection
    @connection_pool.with do |connection|
      ensure_healthy(connection)
      yield connection
    end
  end

  private

  # PG::Connection#reset re-establishes the connection in place, so the pool
  # keeps handing out the same, now repaired, object.
  def ensure_healthy(connection)
    connection.exec('SELECT 1')
  rescue PG::Error
    connection.reset
  end
end
```
Reference
Core Scheduler Methods
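Method | Parameters | Returns | Description |
---|---|---|---|
#io_wait(io, events, timeout) | io (IO), events (Integer), timeout (Numeric or nil) | Integer or nil | Suspends the fiber until io is ready for the requested events; returns the ready events |
#kernel_sleep(duration = nil) | duration (Numeric or nil) | nil | Non-blocking sleep; a nil duration sleeps indefinitely |
#block(blocker, timeout = nil) | blocker (Object), timeout (Numeric or nil) | Boolean | Suspends the fiber for Mutex, Queue, Thread#join, and similar waits |
#unblock(blocker, fiber) | blocker (Object), fiber (Fiber) | nil | Marks a blocked fiber ready to resume; may be called from another thread |
#address_resolve(hostname) | hostname (String) | Array of String | Non-blocking DNS resolution |
#timeout_after(duration, exception_class, *exception_arguments, &block) | duration (Numeric), exception_class (Class), block (Proc) | Object | Runs the block, raising exception_class if it takes longer than duration |
#fiber(&block) | block (Proc) | Fiber | Creates and starts a non-blocking fiber; called by Fiber.schedule |
#close | None | nil | Called when the scheduler is replaced or its thread exits; expected to run remaining fibers to completion |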
Fiber Scheduler Class Methods
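Method | Parameters | Returns | Description |
---|---|---|---|
Fiber.set_scheduler(scheduler) | scheduler (object implementing the hooks, or nil) | The scheduler | Sets the scheduler for the current thread, closing any previous one |
Fiber.scheduler | None | Scheduler or nil | Returns the scheduler set for the current thread |
Fiber.schedule { ... } | block | Fiber (as returned by the fiber hook) | Runs the block in a new non-blocking fiber via the current thread's scheduler |
Fiber.blocking? | None | false or Integer | Returns false when the current fiber is non-blocking, meaning its blocking operations go to the scheduler |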
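The blocking and non-blocking modes are easy to observe directly. In Ruby 3.x the thread's main fiber is blocking, Fiber.new creates non-blocking fibers by default, and Fiber.blocking? returns false in a non-blocking fiber or an Integer otherwise:

```ruby
# Fiber.blocking? is false in a non-blocking fiber and an Integer in a blocking one.
main_mode = Fiber.blocking?
non_blocking_mode = Fiber.new { Fiber.blocking? }.resume
blocking_mode = Fiber.new(blocking: true) { Fiber.blocking? }.resume

puts "main fiber:      #{main_mode.inspect}"
puts "Fiber.new:       #{non_blocking_mode.inspect}"
puts "blocking: true:  #{blocking_mode.inspect}"
```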
Event Constants
Constant | Value | Description |
---|---|---|
IO::READABLE | 1 | IO ready for reading |
IO::PRIORITY | 2 | Priority (out-of-band) data available |
IO::WRITABLE | 4 | IO ready for writing |
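Because the event constants are bit flags, an io_wait implementation typically tests the events argument with bitwise AND. A quick check of the values, with a hypothetical describe_events helper that decodes a mask:

```ruby
# IO event flags are bit masks; io_wait receives a combination of them.
def describe_events(events)
  names = []
  names << :readable unless (events & IO::READABLE).zero?
  names << :priority unless (events & IO::PRIORITY).zero?
  names << :writable unless (events & IO::WRITABLE).zero?
  names
end

p [IO::READABLE, IO::PRIORITY, IO::WRITABLE]
p describe_events(IO::READABLE | IO::WRITABLE)
```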
Common Scheduler Implementations
Library | Class | Features |
---|---|---|
async | Async::Scheduler | Full-featured production scheduler |
nio4r | NIO::Selector | Event-driven I/O selector; a building block for schedulers rather than a scheduler itself |
evt | Evt::Scheduler | High-performance event scheduler |
Error Classes
Exception | Inherits From | Description |
---|---|---|
FiberError | StandardError | General fiber operation errors |
IO::TimeoutError | IOError | I/O operation exceeded the timeout set with IO#timeout= (Ruby 3.2+) |
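IO::TimeoutError is raised when a blocking operation exceeds the per-IO timeout set with IO#timeout= (both available from Ruby 3.2). A minimal demonstration on an empty pipe, with no scheduler involved:

```ruby
# Requires Ruby 3.2+ (IO#timeout= and IO::TimeoutError).
reader, writer = IO.pipe
reader.timeout = 0.05  # seconds; nil (the default) means wait forever

error = begin
  reader.read(1)       # nothing is ever written, so this would otherwise block forever
  nil
rescue IO::TimeoutError => e
  e
end

puts "#{error.class}: #{error.message}"
writer.close
reader.close
```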
Performance Benchmarks
Figures are approximate and vary with platform, Ruby version, and stack-size settings.

Operation | Threads (1000) | Fibers (1000) | Difference |
---|---|---|---|
Creation | ~2GB RAM | ~50MB RAM | 97% reduction |
Context Switch | ~10μs | ~0.1μs | 100x faster |
I/O Wait | Blocking | Non-blocking | N/A |
Configuration Options
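Setting | Default | Description |
---|---|---|
RUBY_FIBER_VM_STACK_SIZE (environment variable) | Platform-dependent | Ruby VM stack reserved for each fiber |
RUBY_FIBER_MACHINE_STACK_SIZE (environment variable) | Platform-dependent | Machine (C) stack reserved for each fiber |
Connection pool size | Application-defined | Set by the application or pool library |
IO#timeout= (Ruby 3.2+) | nil (no timeout) | Per-IO timeout; exceeding it raises IO::TimeoutError |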
Debugging Commands
Method | Purpose | Example |
---|---|---|
Fiber.current | Get current fiber | puts Fiber.current.inspect |
#backtrace | Fiber stack trace | fiber.backtrace |
#alive? | Whether the fiber has not yet terminated | fiber.alive? |
#inspect | Fiber state (created, resumed, suspended, or terminated) | fiber.inspect |
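These calls can be combined to inspect a parked fiber. In this sketch, a fiber suspends itself with Fiber.yield (no scheduler involved), and alive?, inspect, and backtrace are captured before and after it finishes:

```ruby
worker = Fiber.new do
  Fiber.yield :parked  # suspend so the fiber can be inspected from outside
  :done
end

worker.resume
suspended_alive = worker.alive?
suspended_state = worker.inspect
suspended_trace = worker.backtrace  # frames where the fiber is currently stopped

worker.resume
finished_alive = worker.alive?
finished_state = worker.inspect

puts suspended_state, suspended_trace.first, finished_state
```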
Integration Patterns
Framework | Integration Method | Configuration |
---|---|---|
Rails | Application configuration | config.active_support.isolation_level = :fiber (Rails 7.0+) |
Sidekiq | Custom server middleware | Override job execution |
Puma | Per-request code in the Rack app | Each Puma thread needs its own scheduler |
Sinatra | Before filter | Set scheduler per request |