Overview
Fiber scheduling in Ruby (3.0 and later) provides a mechanism for cooperative multitasking and non-blocking I/O. The `Fiber::Scheduler` interface lets a custom scheduler manage fiber execution, so an application can handle thousands of concurrent operations without the overhead of one thread per operation.
Ruby's fiber scheduler works through hooks: the runtime delegates blocking operations to the scheduler object. When a non-blocking fiber encounters a blocking operation such as network I/O, the scheduler suspends that fiber and resumes it once the operation can complete, letting other fibers run in the meantime.
The core components are the `Fiber::Scheduler` interface, `Fiber.set_scheduler` for registering a scheduler on the current thread, and the hook methods Ruby calls during blocking operations. `Fiber::Scheduler` is not a class to inherit from: Ruby's documentation uses the name to describe the interface, and any object that implements the hooks can act as a scheduler. The interface defines hooks for I/O waits, sleeping, generic blocking (`Mutex`, `Queue`, and similar), timeouts, and process waits.
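```ruby
# Basic fiber scheduler skeleton: the hooks Ruby calls, with their contracts
class BasicScheduler
  # Wait until io is ready for events (a bitmask of IO::READABLE, IO::WRITABLE,
  # IO::PRIORITY) or timeout expires; return the ready events, or nil on timeout.
  def io_wait(io, events, timeout = nil) = raise(NotImplementedError)

  # Kernel#sleep; a nil duration means "sleep forever".
  def kernel_sleep(duration = nil) = raise(NotImplementedError)

  # Mutex, Queue, ConditionVariable, etc. park the current fiber here.
  def block(blocker, timeout = nil) = raise(NotImplementedError)

  # Make a fiber parked in #block runnable again (may be called from another thread).
  def unblock(blocker, fiber) = raise(NotImplementedError)

  # Used by Fiber.schedule: create a non-blocking fiber and start it immediately.
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end
end

# Registration succeeds because io_wait, kernel_sleep, block and unblock exist,
# but the stubs must do real work before scheduled fibers can rely on them:
# Fiber.set_scheduler(BasicScheduler.new)
```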
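Because the interface is checked when a scheduler is registered rather than through inheritance, an object missing a required hook is rejected up front. A minimal sketch, assuming Ruby 3.0 or later; `IncompleteScheduler` is a hypothetical class that defines only two of the four required hooks:

```ruby
require 'fiber'

# Hypothetical scheduler that lacks the required #block and #unblock hooks
class IncompleteScheduler
  def io_wait(_io, _events, _timeout = nil); end
  def kernel_sleep(_duration = nil); end
end

error = begin
  Fiber.set_scheduler(IncompleteScheduler.new)
  nil
rescue ArgumentError => e
  e
end

puts error.class  # prints "ArgumentError"
p Fiber.scheduler # prints "nil": nothing was registered
```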
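Ruby checks for an active scheduler whenever a non-blocking fiber performs a blocking operation, and delegates to it when one is set. Fibers created by `Fiber.schedule` are non-blocking; the thread's root fiber stays blocking, so code outside scheduled fibers behaves as before. The delegation is transparent to pure-Ruby code, so existing code that performs blocking I/O needs no changes, although C extensions that make blocking system calls themselves bypass the hooks and still block the whole thread.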
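```ruby
require 'net/http'
require 'uri'

# With a complete scheduler registered (such as SimpleScheduler below),
# these blocking calls suspend only the current fiber
Fiber.schedule do
  Net::HTTP.get(URI('https://example.com')) # waits via the scheduler's io_wait hook
  sleep(1)                                  # handled by the scheduler's kernel_sleep hook
end
```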
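Delegation only happens in non-blocking fibers. A quick way to see the difference, without any scheduler, is `Fiber.blocking?` (Ruby 3.0+), which is truthy in a thread's root fiber and `false` inside a fiber created with `blocking: false`, the kind of fiber `Fiber.schedule` creates:

```ruby
require 'fiber'

root_blocking = Fiber.blocking?
inner_blocking = Fiber.new(blocking: false) { Fiber.blocking? }.resume

p root_blocking  # truthy (an Integer) in the root fiber: scheduler hooks are not used here
p inner_blocking # prints "false": blocking calls here would go to a scheduler, if one is set
```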
The interface covers waiting for I/O readiness, `IO.select`, process waits, sleeps and timeouts, and blocking on synchronization primitives such as `Mutex` and `Queue`. Each operation type has a corresponding hook method that a scheduler implements to provide its own behavior; for optional hooks the scheduler does not define, Ruby falls back to its default behavior.
Basic Usage
Implementing a fiber scheduler means defining a class that responds to the scheduler hook methods: at minimum `io_wait`, `kernel_sleep`, `block`, and `unblock`, plus `fiber` to support `Fiber.schedule`. Ruby calls these methods when fibers encounter blocking operations, and the scheduler decides when each fiber resumes.
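```ruby
require 'fiber'

# A minimal single-threaded scheduler built on IO.select
class SimpleScheduler
  def initialize
    @ready = []     # fibers released by #unblock
    @readable = {}  # io => fiber waiting to read
    @writable = {}  # io => fiber waiting to write
    @waiting = {}   # fiber => monotonic deadline (sleeps and timeouts)
  end

  # Suspend the current fiber until io is ready or timeout expires.
  # Returns the ready events (IO::READABLE / IO::WRITABLE bits), or nil on timeout.
  def io_wait(io, events, timeout = nil)
    fiber = Fiber.current
    @readable[io] = fiber if events.anybits?(IO::READABLE)
    @writable[io] = fiber if events.anybits?(IO::WRITABLE)
    @waiting[fiber] = now + timeout if timeout
    Fiber.yield # the run loop resumes us with the ready events
  ensure
    @readable.delete(io)
    @writable.delete(io)
    @waiting.delete(fiber)
  end

  # Kernel#sleep; no argument means sleep until woken
  def kernel_sleep(duration = nil)
    block(:sleep, duration)
    true
  end

  # Mutex, Queue, etc. park the current fiber here until #unblock or the timeout.
  # Without a timeout the fiber is not tracked, so only #unblock can wake it.
  def block(_blocker, timeout = nil)
    fiber = Fiber.current
    @waiting[fiber] = now + timeout if timeout
    Fiber.yield
  ensure
    @waiting.delete(fiber)
  end

  # Same-thread only in this sketch (see Thread Safety & Concurrency)
  def unblock(_blocker, fiber)
    @ready << fiber
  end

  # Used by Fiber.schedule: create a non-blocking fiber and start it immediately
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  # Ruby calls #close when the thread exits, so pending fibers still finish
  def close
    run_loop
  end

  def run_loop
    until @ready.empty? && @readable.empty? && @writable.empty? && @waiting.empty?
      # Don't block while fibers are ready; otherwise wait for I/O or the next deadline
      timeout = @ready.empty? ? next_timeout : 0
      readable, writable = IO.select(@readable.keys, @writable.keys, [], timeout)

      selected = {} # fiber => value returned from its io_wait/block call
      readable&.each { |io| add_event(selected, @readable[io], IO::READABLE) }
      writable&.each { |io| add_event(selected, @writable[io], IO::WRITABLE) }
      current = now
      @waiting.each { |f, deadline| selected[f] = nil if deadline <= current && !selected.key?(f) }
      @ready.each { |f| selected[f] = nil unless selected.key?(f) }
      @ready.clear

      selected.each { |f, events| f.resume(events) if f.alive? }
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def next_timeout
    return nil if @waiting.empty?
    [@waiting.values.min - now, 0].max
  end

  def add_event(selected, fiber, event)
    selected[fiber] = (selected[fiber] || 0) | event if fiber
  end
end
```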
Setting up a scheduler requires calling `Fiber.set_scheduler` with a scheduler instance. Once set, all blocking operations within scheduled fibers delegate to the scheduler automatically.
```ruby
scheduler = SimpleScheduler.new
Fiber.set_scheduler(scheduler)

# Create scheduled fibers
3.times do |i|
  Fiber.schedule do
    puts "Fiber #{i} starting"
    sleep(rand(1..3))
    puts "Fiber #{i} finished"
  end
end

# Run the scheduler loop
scheduler.run_loop
```
`Fiber.schedule` hands its block to the scheduler's `fiber` hook. A typical `fiber` hook creates the fiber with `Fiber.new(blocking: false)` and resumes it at once, so the fiber starts running immediately and yields control at its first blocking operation, leaving the scheduler to decide when it continues.
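The start-immediately behavior is easiest to see with timers alone. The sketch below assumes Ruby 3.0+ and uses `TinyScheduler`, a hypothetical timer-only scheduler (sleep and `Mutex`/`Queue` blocking, no I/O readiness), to record the order in which two scheduled fibers run:

```ruby
require 'fiber'

# Hypothetical timer-only scheduler: supports sleep and Mutex/Queue blocking,
# but not I/O readiness.
class TinyScheduler
  def initialize
    @timers = {} # fiber => monotonic wake-up time
    @ready = []  # fibers released by #unblock
  end

  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume # start running right away
    fiber
  end

  def kernel_sleep(duration = nil)
    block(:sleep, duration)
    true
  end

  def block(_blocker, timeout = nil)
    fiber = Fiber.current
    @timers[fiber] = now + timeout if timeout
    Fiber.yield # back to whoever resumed us; #run resumes us later
  ensure
    @timers.delete(fiber)
  end

  def unblock(_blocker, fiber)
    @ready << fiber
  end

  def io_wait(_io, _events, _timeout = nil)
    raise NotImplementedError, 'I/O readiness is out of scope for this sketch'
  end

  def run
    until @ready.empty? && @timers.empty?
      if @ready.empty?
        delay = @timers.values.min - now
        sleep(delay) if delay.positive? # called from the blocking root fiber: a real sleep
      end
      current = now
      due = @timers.select { |_, wake_at| wake_at <= current }.keys
      (@ready.shift(@ready.size) + due).uniq.each { |f| f.resume if f.alive? }
    end
  end
  alias close run # Ruby calls #close when the thread exits

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

events = []
scheduler = TinyScheduler.new
Fiber.set_scheduler(scheduler)
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

Fiber.schedule { events << 'A start'; sleep(0.2); events << 'A end' }
Fiber.schedule { events << 'B start'; sleep(0.1); events << 'B end' }
events << 'both scheduled'
scheduler.run

elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
Fiber.set_scheduler(nil)
p events # prints ["A start", "B start", "both scheduled", "B end", "A end"]
```

Both fibers record their start before control returns to the caller, and the run takes roughly 0.2 s rather than 0.3 s because the two sleeps overlap.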
Schedulers handle multiple concurrent operations by maintaining state about suspended fibers and the conditions that will wake them. The scheduler's run loop continually checks for completed operations and resumes the appropriate fibers.
```ruby
# Multiple concurrent HTTP requests
require 'net/http'
require 'uri'

urls = [
  'https://httpbin.org/delay/1',
  'https://httpbin.org/delay/2',
  'https://httpbin.org/delay/3'
]
results = []

urls.each_with_index do |url, index|
  Fiber.schedule do
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    response = Net::HTTP.get_response(URI(url))
    duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
    results << { index: index, status: response.code, duration: duration }
  end
end

# The requests overlap, so total time tracks the slowest request rather than the sum
scheduler.run_loop
puts results.inspect
```
Thread Safety & Concurrency
A fiber scheduler belongs to a single thread (`Fiber.set_scheduler` sets it for the current thread only), which removes most traditional threading concerns. Fibers on that thread never run in parallel: control changes hands only when a fiber blocks and the scheduler resumes another one, so shared state needs care only across those switch points.

Scheduler hooks are therefore invoked one at a time, and most scheduler state needs no locking. The exception is `unblock`: when another thread releases a `Mutex` or pushes to a `Queue` that a scheduled fiber is waiting on, Ruby calls `unblock` on that other thread. Whatever `unblock` touches must be thread-safe, and the run loop needs a way to be woken while it waits in `IO.select`; a self-pipe is a common choice.
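```ruby
require 'fiber'

# Single-threaded scheduler whose #unblock may be called from other threads
class ThreadSafeScheduler
  def initialize
    @ready_fibers = Thread::Queue.new # thread-safe: filled by #unblock from any thread
    @wake_r, @wake_w = IO.pipe        # self-pipe that interrupts IO.select
    # Everything below is touched only by the scheduler's own thread
    @readable = {}                    # io => fiber
    @writable = {}                    # io => fiber
    @sleeping = {}                    # fiber => monotonic deadline
    @blocked = {}                     # fiber => true while parked without a timeout
  end

  def io_wait(io, events, timeout = nil)
    fiber = Fiber.current
    @readable[io] = fiber if events.anybits?(IO::READABLE)
    @writable[io] = fiber if events.anybits?(IO::WRITABLE)
    @sleeping[fiber] = now + timeout if timeout
    Fiber.yield # resumed with the ready events, or nil on timeout
  ensure
    @readable.delete(io)
    @writable.delete(io)
    @sleeping.delete(fiber)
  end

  def kernel_sleep(duration = nil)
    block(:sleep, duration)
    true
  end

  def block(_blocker, timeout = nil)
    fiber = Fiber.current
    if timeout
      @sleeping[fiber] = now + timeout
    else
      @blocked[fiber] = true
    end
    Fiber.yield
  ensure
    @sleeping.delete(fiber)
    @blocked.delete(fiber)
  end

  # May run on ANY thread, so it only touches the Queue and the pipe
  def unblock(_blocker, fiber)
    @ready_fibers << fiber
    @wake_w.write_nonblock('.', exception: false) # a full pipe already means "wake up"
  end

  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  def close
    run_loop
  end

  def run_loop
    # Blocked fibers keep the loop alive because another thread may unblock them
    until @ready_fibers.empty? && @readable.empty? && @writable.empty? &&
          @sleeping.empty? && @blocked.empty?
      timeout = @ready_fibers.empty? ? next_timeout : 0
      readable, writable = IO.select([@wake_r, *@readable.keys], @writable.keys, [], timeout)

      selected = {} # fiber => value to resume it with
      readable&.each do |io|
        if io.equal?(@wake_r)
          @wake_r.read_nonblock(1024, exception: false) # drain wake-up bytes
        else
          add_event(selected, @readable[io], IO::READABLE)
        end
      end
      writable&.each { |io| add_event(selected, @writable[io], IO::WRITABLE) }
      check_timeouts(selected)
      drain_ready_queue(selected)

      selected.each { |fiber, value| fiber.resume(value) if fiber.alive? }
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def next_timeout
    return nil if @sleeping.empty?
    [@sleeping.values.min - now, 0].max
  end

  def add_event(selected, fiber, event)
    selected[fiber] = (selected[fiber] || 0) | event if fiber
  end

  def check_timeouts(selected)
    current = now
    @sleeping.each { |f, deadline| selected[f] = nil if deadline <= current && !selected.key?(f) }
  end

  def drain_ready_queue(selected)
    loop do
      fiber = @ready_fibers.pop(true) # raises ThreadError once the queue is empty
      selected[fiber] = nil unless selected.key?(fiber)
    end
  rescue ThreadError
    nil
  end
end
```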
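To make the cross-thread case concrete, here is a sketch, assuming Ruby 3.0+, with a hypothetical `CrossThreadScheduler` that supports only blocking on `Mutex`/`Queue`. A scheduled fiber waits on a `Thread::Queue`, and a second thread pushes to it, so Ruby invokes `unblock` on that second thread. Using a `Thread::Queue` as the ready list doubles as the wake-up mechanism here: the run loop simply blocks in `pop`.

```ruby
require 'fiber'

# Hypothetical scheduler supporting only Mutex/Queue blocking, to show the one
# hook that can run on another thread: #unblock.
class CrossThreadScheduler
  def initialize
    @ready = Thread::Queue.new # thread-safe, because #unblock may run on any thread
    @blocked = {}              # fiber => true; only touched on the scheduler's thread
  end

  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  def block(_blocker, timeout = nil)
    raise NotImplementedError, 'timeouts are out of scope for this sketch' if timeout
    @blocked[Fiber.current] = true
    Fiber.yield
  ensure
    @blocked.delete(Fiber.current)
  end

  def unblock(_blocker, fiber)
    @ready.push(fiber) # wakes #run if it is waiting in Queue#pop
  end

  def kernel_sleep(_duration = nil)
    raise NotImplementedError, 'sleep is out of scope for this sketch'
  end

  def io_wait(_io, _events, _timeout = nil)
    raise NotImplementedError, 'I/O is out of scope for this sketch'
  end

  def run
    until @blocked.empty? && @ready.empty?
      fiber = @ready.pop # root fiber is blocking, so this parks the whole thread
      fiber.resume if fiber.alive?
    end
  end
  alias close run
end

scheduler = CrossThreadScheduler.new
Fiber.set_scheduler(scheduler)

inbox = Thread::Queue.new
received = nil
Fiber.schedule { received = inbox.pop } # empty queue: Ruby calls scheduler.block

producer = Thread.new do
  sleep(0.05)        # this thread has no scheduler, so it really sleeps
  inbox.push(:hello) # Ruby calls scheduler.unblock on this producer thread
end

scheduler.run
producer.join
Fiber.set_scheduler(nil)
p received # prints ":hello"
```

A scheduler that also waits on I/O cannot block in `Queue#pop`; it typically pairs the queue with a self-pipe that `IO.select` watches.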
Fiber scheduling differs from preemptive threading because fibers give up control voluntarily, and only at blocking operations. This cooperative model rules out many race conditions, but a fiber that runs CPU-bound code without ever blocking starves every other fiber on its thread.
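Shared state needs protection only when a fiber can be suspended in the middle of an update, that is, when a blocking call sits between reading a value and writing it back. Since Ruby 3.0, `Mutex` is fiber-aware, so holding one across such a call makes other fibers wait (through the scheduler's `block`/`unblock` hooks) instead of interleaving. The counter below never blocks inside `increment`, so it would be correct even without the lock; the `Mutex` matters when the critical section blocks or when other threads share the object.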
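```ruby
# Fiber-safe shared counter (also safe if other threads share it)
class FiberSafeCounter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize do
      @count += 1
    end
  end

  def value
    @mutex.synchronize { @count }
  end
end

counter = FiberSafeCounter.new

10.times do
  Fiber.schedule do
    100.times do
      counter.increment
      # sleep(0) goes through the scheduler's kernel_sleep hook, so other fibers can run.
      # A bare Fiber.yield would hand control back without telling the scheduler,
      # and the fiber would never be resumed.
      sleep(0)
    end
  end
end

scheduler.run_loop
puts "Final count: #{counter.value}" # prints "Final count: 1000"
```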
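The switch-point rule can be shown without any scheduler by driving two fibers by hand. In this sketch `Fiber.yield` stands in for a blocking call at which a scheduler would suspend the fiber; the first pair of fibers pauses between reading and writing the counter, while the second pair finishes its update before pausing. `run_round_robin` is a hypothetical helper:

```ruby
require 'fiber'

# Drive fibers round-robin by hand: resume each once, then each again
def run_round_robin(fibers)
  2.times { fibers.each { |f| f.resume if f.alive? } }
end

counter = 0
racy = Array.new(2) do
  Fiber.new do
    value = counter
    Fiber.yield         # switch point between the read and the write
    counter = value + 1
  end
end
run_round_robin(racy)
racy_result = counter

counter = 0
safe = Array.new(2) do
  Fiber.new do
    counter += 1        # read-modify-write with no switch point inside
    Fiber.yield
  end
end
run_round_robin(safe)
safe_result = counter

p racy_result # prints "1": one update was lost
p safe_result # prints "2"
```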
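Deadlocks arise when fibers wait indefinitely for conditions that can never be satisfied, for example two fibers each waiting on a `Queue` that only the other would fill. Give waits a timeout where possible (`io_wait` and `block` both receive one), and have the run loop notice when only parked fibers remain, with no ready fibers, pending I/O, or timers that could ever wake them.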
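Resource cleanup becomes critical when fibers terminate unexpectedly. Each hook should remove its bookkeeping in an `ensure` clause so that an exception raised into a waiting fiber does not leave stale entries behind, and code that owns sockets or pooled connections should release them in `ensure` blocks as well.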
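A small sketch of that cleanup rule, with no scheduler involved: a hypothetical `waiting` hash stands in for a scheduler's wait table, and `Fiber#raise` (Ruby 2.7+) injects an error into the suspended fiber the way a failing operation might. The `ensure` clause removes the registration whether the fiber finishes normally or unwinds with an exception:

```ruby
require 'fiber'

# Hypothetical wait registry, standing in for a scheduler's wait tables
waiting = {}

worker = Fiber.new do
  waiting[:socket] = Fiber.current # register, as an io_wait hook would
  Fiber.yield                      # suspended until the "scheduler" resumes it
ensure
  waiting.delete(:socket)          # runs on normal return and when an exception unwinds
end

worker.resume                      # runs up to Fiber.yield
begin
  worker.raise(IOError, 'connection reset') # raise inside the suspended fiber
rescue IOError => e
  puts "worker failed: #{e.message}"        # prints "worker failed: connection reset"
end

p waiting       # prints "{}"
p worker.alive? # prints "false"
```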
Performance & Memory
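For I/O-bound applications, fiber scheduling is usually much cheaper than one thread per task. A fiber's stacks are smaller than a thread's, and, as with threads, only the stack pages actually touched become resident, so an idle fiber typically costs a fraction of a thread's memory. Exact figures depend on the Ruby version, the platform, and the `RUBY_FIBER_VM_STACK_SIZE` and `RUBY_FIBER_MACHINE_STACK_SIZE` environment variables, so measure rather than rely on rules of thumb. Tens of thousands of concurrent fibers on one thread are usually practical.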
Switching between fibers happens in user space without a system call, so it is cheaper than a thread context switch, which goes through the kernel scheduler and, in CRuby, also involves handing over the GVL.
```ruby
require 'benchmark'

# Resident set size of this process in KB (uses ps, so Unix-like systems only)
def memory_usage
  GC.start
  `ps -o rss= -p #{Process.pid}`.to_i
end

# Thread-based approach: one thread per task
def thread_benchmark(count)
  start_memory = memory_usage
  peak_memory = nil
  time = Benchmark.realtime do
    threads = Array.new(count) do |i|
      Thread.new do
        sleep(0.1)
        i * 2
      end
    end
    peak_memory = memory_usage # sampled before joining, while threads are (mostly) still sleeping
    threads.each(&:join)
  end
  { time: time, memory: peak_memory - start_memory }
end

# Fiber-based approach: one scheduled fiber per task on the current thread
def fiber_benchmark(count)
  scheduler = SimpleScheduler.new
  Fiber.set_scheduler(scheduler)
  start_memory = memory_usage
  peak_memory = nil
  time = Benchmark.realtime do
    count.times do |i|
      Fiber.schedule do
        sleep(0.1)
        i * 2
      end
    end
    peak_memory = memory_usage # every fiber is suspended in sleep at this point
    scheduler.run_loop
  end
  { time: time, memory: peak_memory - start_memory }
ensure
  Fiber.set_scheduler(nil)
end

# Compare with 1000 concurrent operations; both timings include the GC.start
# inside memory_usage, so treat them as rough
thread_result = thread_benchmark(1000)
fiber_result = fiber_benchmark(1000)
puts "Threads: #{thread_result[:time]}s, #{thread_result[:memory]}KB memory"
puts "Fibers: #{fiber_result[:time]}s, #{fiber_result[:memory]}KB memory"
```
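The context-switch claim can be checked more directly than with the benchmark above. This sketch times plain `resume`/`Fiber.yield` round trips between the root fiber and one child fiber; no scheduler is involved, and the result depends heavily on hardware and Ruby version, so treat it as an order-of-magnitude indication only:

```ruby
require 'benchmark'

round_trips = 100_000

# A child fiber that hands control straight back every time it is resumed
ping = Fiber.new do
  loop { Fiber.yield }
end

elapsed = Benchmark.realtime do
  round_trips.times { ping.resume }
end

# Each round trip is two switches: into the fiber and back out
per_switch_us = elapsed / (round_trips * 2) * 1_000_000
puts format('%.3f µs per fiber switch', per_switch_us)
```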
Scheduler efficiency depends heavily on the run loop implementation. A busy-waiting loop wastes CPU cycles, while a loop that sleeps for long fixed intervals adds latency; a good run loop balances responsiveness against CPU use, for example by adapting how long it waits to how busy it is.
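```ruby
# Efficient scheduler with adaptive polling. Only the run loop is shown: the
# hook methods (io_wait, kernel_sleep, block, unblock, fiber) would register
# waits in @ios and @sleeping and push [fiber, resume_value] pairs onto
# @ready_fibers.
class PerformantScheduler
  MIN_POLL = 0.001 # seconds
  MAX_POLL = 0.1

  def initialize
    @ios = {}          # io => { fiber:, events: }
    @sleeping = []     # [{ fiber:, wake_time: }]
    @ready_fibers = [] # [fiber, resume_value] pairs
    @poll_timeout = MIN_POLL
    @max_batch_size = 100
  end

  def run_loop
    loop do
      batch_processed = process_ready_fibers
      io_ready = check_io_operations
      timeouts_ready = check_sleeping_fibers

      if batch_processed + io_ready + timeouts_ready == 0
        break if should_exit?
        # Idle: back off (up to MAX_POLL) but never past the next timer, and let
        # IO.select return early as soon as a watched IO becomes ready
        @poll_timeout = [@poll_timeout * 1.5, MAX_POLL].min
        wait = [@poll_timeout, time_until_next_wake].compact.min
        IO.select(*watched_ios, [], wait)
      else
        @poll_timeout = MIN_POLL # activity: return to responsive polling
      end
    end
  end

  private

  def process_ready_fibers
    processed = 0
    batch_size = [@ready_fibers.size, @max_batch_size].min
    batch_size.times do
      fiber, value = @ready_fibers.shift
      next unless fiber.alive?

      fiber.resume(value)
      processed += 1
    end
    processed
  end

  def check_io_operations
    return 0 if @ios.empty?

    readable, writable = IO.select(*watched_ios, [], 0)
    return 0 unless readable

    events = Hash.new(0)
    readable.each { |io| events[io] |= IO::READABLE }
    writable.each { |io| events[io] |= IO::WRITABLE }
    events.each do |io, ready|
      fiber_info = @ios.delete(io)
      @ready_fibers << [fiber_info[:fiber], ready] if fiber_info
    end
    events.size
  end

  def check_sleeping_fibers
    return 0 if @sleeping.empty?

    current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    woken = 0
    @sleeping.select! do |sleep_info|
      if current_time >= sleep_info[:wake_time]
        @ready_fibers << [sleep_info[:fiber], nil]
        woken += 1
        false
      else
        true
      end
    end
    woken
  end

  # Split waiting IOs by the events they asked for
  def watched_ios
    readers = @ios.select { |_, info| info[:events].anybits?(IO::READABLE) }.keys
    writers = @ios.select { |_, info| info[:events].anybits?(IO::WRITABLE) }.keys
    [readers, writers]
  end

  def time_until_next_wake
    return nil if @sleeping.empty?

    next_wake = @sleeping.map { |info| info[:wake_time] }.min
    [next_wake - Process.clock_gettime(Process::CLOCK_MONOTONIC), 0].max
  end

  def should_exit?
    @ready_fibers.empty? && @ios.empty? && @sleeping.empty?
  end
end
```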
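Instead of polling, a run loop can block in `IO.select` for exactly as long as the nearest timer allows: zero when fibers are already runnable, the time until the next deadline when timers are pending, and indefinitely (`nil`) when only I/O can wake anything. A sketch of that calculation as a standalone helper; `select_timeout` and its arguments are hypothetical names:

```ruby
# Hypothetical helper: how long a run loop may block in IO.select.
#   ready_count - fibers that can run right now
#   deadlines   - monotonic wake-up times of sleeping or timed waits
#   now         - current monotonic time
# Returns 0 (just poll), a number of seconds, or nil (wait for I/O only).
def select_timeout(ready_count, deadlines, now)
  return 0 if ready_count.positive? # runnable work: don't block at all
  return nil if deadlines.empty?    # nothing timed: block until an IO is ready
  [deadlines.min - now, 0].max      # wake exactly when the next timer is due
end

p select_timeout(2, [10.5], 10.0)        # prints "0"
p select_timeout(0, [], 10.0)            # prints "nil"
p select_timeout(0, [10.25, 12.0], 10.0) # prints "0.25"
p select_timeout(0, [9.0], 10.0)         # prints "0": an overdue timer fires at once
```

A run loop passes the result straight to `IO.select(readers, writers, [], timeout)`. A `nil` timeout is only safe when at least one IO is being watched; otherwise `IO.select` would block forever.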
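Long-running schedulers must also manage fiber lifecycle carefully. A finished fiber is garbage-collected like any other object once nothing references it, so the scheduler must drop every reference it holds (ready lists, wait tables, timers), including for fibers that died with an exception; otherwise those entries accumulate as a memory leak.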
Profiling fiber applications also needs different techniques from thread-based profiling. A fiber's wall-clock time includes the periods it spent suspended while other fibers ran, so per-request timings mix waiting with work; instrumenting the scheduler hooks to record how long each fiber stays suspended lets you separate the two.
Production Patterns
Production fiber scheduler implementations require robust error handling, monitoring capabilities, and integration with existing application frameworks. Real-world deployments also build connection pooling, request routing, and a graceful shutdown procedure around the scheduler.
Web applications benefit significantly from fiber scheduling when handling concurrent HTTP requests. The scheduler manages thousands of simultaneous connections without the memory overhead of thread-per-request architectures.
```ruby
# HTTP server sketch using fiber scheduling
require 'socket'
require 'set'
require 'json'

class FiberHTTPServer
  def initialize(host: 'localhost', port: 3000, max_connections: 10000)
    @host = host
    @port = port
    @max_connections = max_connections
    @server = TCPServer.new(@host, @port)
    @connections = Set.new
    @stats = { requests: 0, errors: 0, connections: 0 }
    @running = false
    @scheduler = nil
  end

  def start
    @running = true
    @scheduler = ProductionScheduler.new
    Fiber.set_scheduler(@scheduler)
    puts "Server starting on #{@host}:#{@port}"

    # Main accept loop
    Fiber.schedule { accept_loop }
    # Statistics reporting
    Fiber.schedule { stats_loop }

    @scheduler.run_loop
  end

  def stop
    @running = false
    @scheduler&.shutdown # run_loop exits once in-flight fibers finish
    @server.close        # stop accepting new connections
  end

  private

  def accept_loop
    while @running
      begin
        client = @server.accept
        if @connections.size >= @max_connections
          client.close
          next
        end
        @connections << client
        @stats[:connections] += 1
        Fiber.schedule { handle_client(client) }
      rescue => e
        break unless @running # the listening socket was closed by #stop
        puts "Accept error: #{e.message}"
        @stats[:errors] += 1
      end
    end
  end

  def handle_client(client)
    request = parse_request(client)
    return unless request # client disconnected before sending a request line

    response = process_request(request)
    send_response(client, response)
    @stats[:requests] += 1
  rescue => e
    puts "Request error: #{e.message}"
    @stats[:errors] += 1
  ensure
    client.close
    @connections.delete(client)
  end

  def parse_request(client)
    request_line = client.gets
    return nil unless request_line

    method, path, _version = request_line.strip.split(' ')
    headers = {}
    # gets returns nil if the client disconnects mid-headers
    while (line = client.gets&.strip) && !line.empty?
      key, value = line.split(': ', 2)
      headers[key.downcase] = value
    end
    { method: method, path: path, headers: headers }
  end

  def process_request(request)
    case request[:path]
    when '/health'
      { status: 200, body: 'OK' }
    when '/stats'
      { status: 200, body: @stats.to_json, headers: { 'content-type' => 'application/json' } }
    when '/slow'
      sleep(1) # Simulated slow operation
      { status: 200, body: 'Completed slow operation' }
    else
      { status: 404, body: 'Not Found' }
    end
  end

  def send_response(client, response)
    status_line = "HTTP/1.1 #{response[:status]} #{status_text(response[:status])}\r\n"
    headers = (response[:headers] || {}).merge(
      'content-length' => response[:body].bytesize.to_s,
      'connection' => 'close' # the socket is closed after every response
    )
    client.write(status_line)
    headers.each { |key, value| client.write("#{key}: #{value}\r\n") }
    client.write("\r\n")
    client.write(response[:body])
  end

  def status_text(code)
    case code
    when 200 then 'OK'
    when 404 then 'Not Found'
    when 500 then 'Internal Server Error'
    else 'Unknown'
    end
  end

  def stats_loop
    while @running
      sleep(10)
      puts "Stats: #{@stats}"
    end
  end
end
```
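```ruby
class ProductionScheduler
  MAX_IDLE_WAIT = 0.1 # seconds; bounds IO.select so the shutdown flag is re-checked

  def initialize
    @ios = {}          # io => { fiber:, events:, deadline: }
    @sleeping = []     # [{ fiber:, wake_time: }]
    @ready_fibers = [] # [fiber, value to resume it with]
    @shutdown = false
  end

  def io_wait(io, events, timeout = nil)
    fiber = Fiber.current
    deadline = timeout && now + timeout # monotonic, unaffected by wall-clock changes
    @ios[io] = { fiber: fiber, events: events, deadline: deadline }
    Fiber.yield # resumed with the ready events, or nil on timeout/close
  ensure
    @ios.delete(io)
  end

  def kernel_sleep(duration = nil)
    block(:sleep, duration)
    true
  end

  # Fibers parked without a timeout are not tracked; only #unblock wakes them
  def block(_blocker, timeout = nil)
    fiber = Fiber.current
    @sleeping << { fiber: fiber, wake_time: now + timeout } if timeout
    Fiber.yield
  ensure
    @sleeping.reject! { |info| info[:fiber] == fiber }
  end

  # Same-thread only; cross-thread use also needs a thread-safe queue and a wake-up pipe
  def unblock(_blocker, fiber)
    @ready_fibers << [fiber, nil]
  end

  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  def close
    shutdown
    run_loop
  end

  def run_loop
    while !@shutdown || has_active_fibers?
      process_ready_fibers
      check_io_timeouts
      check_sleeping_fibers
      next unless @ready_fibers.empty?

      # Block until an IO is ready or the next deadline, capped at MAX_IDLE_WAIT
      readers = @ios.select { |_, info| info[:events].anybits?(IO::READABLE) }.keys
      writers = @ios.select { |_, info| info[:events].anybits?(IO::WRITABLE) }.keys
      readable, writable = IO.select(readers, writers, [], next_wait)
      next unless readable

      events = Hash.new(0)
      readable.each { |io| events[io] |= IO::READABLE }
      writable.each { |io| events[io] |= IO::WRITABLE }
      events.each do |io, ready|
        info = @ios.delete(io)
        @ready_fibers << [info[:fiber], ready] if info
      end
    end
  end

  def shutdown
    @shutdown = true
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def process_ready_fibers
    ready, @ready_fibers = @ready_fibers, []
    ready.uniq! { |fiber, _| fiber } # never resume one fiber twice in a batch
    ready.each { |fiber, value| fiber.resume(value) if fiber.alive? }
  end

  # Wake fibers whose wait timed out or whose IO was closed (IO.select raises on closed IOs)
  def check_io_timeouts
    current = now
    @ios.delete_if do |io, info|
      expired = io.closed? || (info[:deadline] && current > info[:deadline])
      @ready_fibers << [info[:fiber], nil] if expired
      expired
    end
  end

  def check_sleeping_fibers
    current = now
    @sleeping.reject! do |info|
      due = current >= info[:wake_time]
      @ready_fibers << [info[:fiber], nil] if due
      due
    end
  end

  def next_wait
    deadlines = @sleeping.map { |info| info[:wake_time] } +
                @ios.values.filter_map { |info| info[:deadline] }
    return MAX_IDLE_WAIT if deadlines.empty?
    (deadlines.min - now).clamp(0, MAX_IDLE_WAIT)
  end

  def has_active_fibers?
    !@ready_fibers.empty? || !@ios.empty? || !@sleeping.empty?
  end
end
```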
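Database connection pooling with fiber schedulers needs special consideration. A pool that associates checked-out connections with the current thread (for example through `Thread#thread_variable_get`, or by keying on the thread object) hands the same connection to every fiber on that thread, and a pool that waits inside a C extension rather than on a scheduler-aware primitive such as `Thread::Queue` stalls the whole thread.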
```ruby
# Fiber-aware database connection pool
require 'set'

class FiberConnectionPool
  def initialize(size: 10, &connection_factory)
    @size = size
    @connection_factory = connection_factory
    @available = Thread::Queue.new # scheduler-aware: an empty pop parks only the calling fiber
    @busy = Set.new
    @mutex = Mutex.new # guards @busy; strictly needed only if threads share the pool
    # Pre-populate pool
    @size.times do
      @available << @connection_factory.call
    end
  end

  def with_connection
    connection = acquire_connection
    begin
      yield connection
    ensure
      release_connection(connection)
    end
  end

  def stats
    @mutex.synchronize do
      {
        total: @size,
        available: @available.size,
        busy: @busy.size
      }
    end
  end

  private

  def acquire_connection
    connection = @available.pop # waits via the scheduler's block/unblock hooks when empty
    @mutex.synchronize do
      @busy << connection
    end
    connection
  end

  def release_connection(connection)
    @mutex.synchronize do
      @busy.delete(connection)
    end
    @available << connection # wakes one waiting fiber, if any
  end
end

# Usage in a fiber-scheduled application. SomeDatabase is a placeholder for a
# driver whose socket I/O goes through Ruby's IO layer, and thus the scheduler.
db_pool = FiberConnectionPool.new(size: 20) do
  SomeDatabase.connect(host: 'localhost', port: 5432)
end

# 100 concurrent queries share 20 connections; the rest wait inside the pool
100.times do |i|
  Fiber.schedule do
    db_pool.with_connection do |conn|
      # Interpolating an Integer is safe here; use bound parameters for untrusted input
      result = conn.query("SELECT * FROM users WHERE id = #{i}")
      puts "Query #{i} returned #{result.count} rows"
    end
  end
end

scheduler.run_loop
```
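The thread-versus-fiber identity problem behind such pools is easy to demonstrate without a scheduler. In this sketch two fibers on the same thread each "check out" a connection string: `Thread.current[:conn]` is fiber-local in Ruby, so each fiber gets its own value, whereas `thread_variable_get`/`thread_variable_set` is shared by every fiber on the thread. The `:conn` key and the `conn-N` strings are illustrative:

```ruby
require 'fiber'

Thread.current.thread_variable_set(:conn, nil) # start from a clean slate

checkouts = Array.new(2) do |i|
  Fiber.new do
    thread = Thread.current
    {
      thread: thread,
      fiber_local: (thread[:conn] ||= "conn-#{i}"),
      thread_local: thread.thread_variable_get(:conn) ||
                    thread.thread_variable_set(:conn, "conn-#{i}")
    }
  end.resume
end

p checkouts.map { |c| c[:fiber_local] }  # prints ["conn-0", "conn-1"]
p checkouts.map { |c| c[:thread_local] } # prints ["conn-0", "conn-0"]
```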
Monitoring fiber schedulers requires custom metrics that track fiber lifecycle, scheduler efficiency, and resource utilization. Traditional thread-based monitoring tools provide limited insight into fiber behavior.
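One way to collect such metrics is to record them inside the hooks themselves. The sketch below, assuming Ruby 3.0+, is a hypothetical timer-only `MeteredScheduler` (no I/O readiness) that counts fibers created and finished, tracks the peak number of suspended fibers, and sums the time fibers spend suspended; the metric names are illustrative:

```ruby
require 'fiber'

# Hypothetical timer-only scheduler that records lifecycle and wait metrics
class MeteredScheduler
  attr_reader :metrics

  def initialize
    @timers = {}   # fiber => monotonic wake-up time
    @ready = []    # fibers released by #unblock
    @suspended = 0 # fibers currently parked in #block
    @metrics = Hash.new(0)
  end

  def fiber(&block)
    @metrics[:created] += 1
    fiber = Fiber.new(blocking: false) do
      block.call
    ensure
      @metrics[:finished] += 1 # counted even if the block raises
    end
    fiber.resume
    fiber
  end

  def kernel_sleep(duration = nil)
    block(:sleep, duration)
    true
  end

  def block(_blocker, timeout = nil)
    fiber = Fiber.current
    started = now
    @timers[fiber] = started + timeout if timeout
    @suspended += 1
    @metrics[:peak_suspended] = [@metrics[:peak_suspended], @suspended].max
    Fiber.yield
  ensure
    @suspended -= 1
    @timers.delete(fiber)
    @metrics[:suspended_seconds] += now - started
  end

  def unblock(_blocker, fiber)
    @ready << fiber
  end

  def io_wait(_io, _events, _timeout = nil)
    raise NotImplementedError, 'I/O readiness is out of scope for this sketch'
  end

  def run
    until @ready.empty? && @timers.empty?
      if @ready.empty?
        delay = @timers.values.min - now
        sleep(delay) if delay.positive? # blocking root fiber: a real sleep
      end
      current = now
      due = @timers.select { |_, wake_at| wake_at <= current }.keys
      (@ready.shift(@ready.size) + due).uniq.each { |f| f.resume if f.alive? }
    end
  end
  alias close run

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

scheduler = MeteredScheduler.new
Fiber.set_scheduler(scheduler)
3.times { Fiber.schedule { sleep(0.05) } }
scheduler.run
Fiber.set_scheduler(nil)
p scheduler.metrics
```

Because the three sleeps overlap, `suspended_seconds` (about 0.15 s summed across fibers) exceeds the wall-clock time of the run, which is exactly the waiting-versus-working distinction thread-level tools cannot see.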
Reference
Fiber::Scheduler Interface
`Fiber::Scheduler` documents the interface that custom schedulers implement; it is not a base class. Ruby calls these methods from non-blocking fibers when blocking operations occur.
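Method | Parameters | Returns | Description |
---|---|---|---|
#io_wait(io, events, timeout) | io (IO), events (Integer bitmask), timeout (Numeric/nil) | Integer (ready events) or falsy on timeout | Handle I/O waiting operations |
#kernel_sleep(duration = nil) | duration (Numeric/nil; nil sleeps forever) | nil | Handle sleep operations |
#block(blocker, timeout) | blocker (Object), timeout (Numeric/nil) | Object | Handle generic blocking (Mutex, Queue, etc.) |
#unblock(blocker, fiber) | blocker (Object), fiber (Fiber) | nil | Make a fiber parked in #block runnable; may be called from another thread |
#timeout_after(duration, exception_class, ...) | duration (Numeric), exception_class (Class), exception arguments, block | Result of the block | Handle Timeout.timeout operations |
#process_wait(pid, flags) | pid (Integer), flags (Integer) | Process::Status | Handle process waiting |
#io_select(readables, writables, exceptables, timeout) | Arrays of IO objects, timeout (Numeric/nil) | Array/nil | Handle IO.select operations |
#fiber(&block) | block (Proc), plus keyword options for Fiber.new | Fiber | Create and start a non-blocking fiber for Fiber.schedule |
#close | None | Object | Called when the thread exits or the scheduler is replaced; usually runs the event loop |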
Fiber Class Methods
Core methods for managing fiber scheduling and scheduler lifecycle.
Method | Parameters | Returns | Description |
---|---|---|---|
Fiber.set_scheduler(scheduler) | scheduler (object implementing the hooks, or nil) | The scheduler or nil | Set the current thread's scheduler; raises ArgumentError if a required hook is missing |
Fiber.scheduler | None | Scheduler or nil | Get the current thread's scheduler |
Fiber.schedule(&block) | block (Proc) | Fiber | Create a non-blocking fiber via the scheduler's #fiber hook; raises RuntimeError if no scheduler is set |
Fiber.blocking? | None | false or Integer | false if the current fiber is non-blocking, otherwise a positive Integer |
I/O Events Constants
Event types used with the `io_wait` method to specify the type of I/O readiness to wait for. The values are bit flags and can be combined with `|`.
Constant | Value | Description |
---|---|---|
IO::READABLE | 1 | Wait for readable data |
IO::PRIORITY | 2 | Wait for priority (out-of-band) data |
IO::WRITABLE | 4 | Wait for writable space |
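Because the constants are bit flags, a hook receives them combined and should test individual bits rather than compare for equality. A small sketch; `describe_events` is a hypothetical helper:

```ruby
require 'io/wait'

# Hypothetical helper: decode an io_wait events bitmask into symbols
def describe_events(events)
  { readable: IO::READABLE, priority: IO::PRIORITY, writable: IO::WRITABLE }
    .select { |_, bit| events.anybits?(bit) }
    .keys
end

p describe_events(IO::READABLE)                # prints [:readable]
p describe_events(IO::READABLE | IO::WRITABLE) # prints [:readable, :writable]
```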
Scheduler Hook Methods
Hook methods and whether Ruby requires them. `Fiber.set_scheduler` raises `ArgumentError` unless the required hooks are defined; the optional ones are used only when present.
Method | Purpose | Required |
---|---|---|
#io_wait | I/O operations | Yes |
#kernel_sleep | Sleep operations | Yes |
#block | Generic blocking | Yes |
#unblock | Unblock operations | Yes |
#fiber | Fiber.schedule | Only if Fiber.schedule is used |
#close | Cleanup at thread exit | No (recommended) |
#timeout_after | Timeout handling | No |
#process_wait | Process operations | No |
#io_select | Select operations | No |
Error Handling
Common exceptions that may occur during fiber scheduling operations.
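Exception | Trigger | Description |
---|---|---|
FiberError | Invalid fiber operations | Raised for misuse such as resuming a fiber that is already running |
FiberError | Dead fiber resume | Attempting to resume a terminated fiber |
FiberError | Yield from root fiber | Calling Fiber.yield outside any resumed fiber |
ArgumentError | Fiber.set_scheduler | The scheduler is missing a required hook |
RuntimeError | Fiber.schedule | No scheduler is set for the current thread |
SystemCallError | I/O failures | Operating system I/O errors |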
Performance Characteristics
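Typical relative characteristics of fiber scheduling compared to threading. Absolute numbers depend on the Ruby version, platform, and configuration, so benchmark your own workload.
Metric | Fibers | Threads |
---|---|---|
Memory per unit | Smaller stacks; idle fibers are cheap | Larger stacks (VM stack plus native thread stack) |
Context switch | User-space switch, no system call | Kernel scheduling plus, in CRuby, a GVL handoff |
Creation overhead | No OS thread is created | Creates a native OS thread |
Maximum concurrent | Tens of thousands or more are usually practical | Limited by memory and OS thread limits |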
Scheduler State Management
Recommended data structures and patterns for implementing scheduler state tracking.
State Type | Recommended Structure | Purpose |
---|---|---|
Ready fibers | Array or Queue | Fibers ready to resume |
Waiting I/O | Hash (IO => fiber info) | I/O wait tracking |
Sleeping fibers | Array of wake times | Time-based delays |
Blocked operations | Hash (blocker => fiber) | Generic blocking |
Integration Patterns
Common integration approaches for different application types.
Application Type | Scheduler Pattern | Key Considerations |
---|---|---|
Web servers | Event-driven with I/O polling | Connection limits, request routing |
Database apps | Connection pool aware | Pool sizing, query timeouts |
Background jobs | Queue-based processing | Job scheduling, error recovery |
API clients | Concurrent request batching | Rate limiting, timeout handling |