Overview
The Fiber Scheduler Interface provides a standardized protocol for implementing non-blocking I/O operations in Ruby through fiber scheduling. Ruby delegates blocking operations to registered schedulers, which can implement asynchronous alternatives using event loops, thread pools, or other concurrency mechanisms.
The interface centers around Fiber.scheduler
, which returns the currently active scheduler for the running fiber. Ruby automatically consults this scheduler when encountering potentially blocking operations like file I/O, network requests, or sleep calls. Schedulers implement specific hook methods that Ruby invokes instead of performing blocking operations directly.
require 'async'
Async do
# Within this block, Async scheduler handles I/O
puts "Scheduler: #{Fiber.scheduler.class}"
# => Async::Scheduler
end
The scheduler interface defines several hook methods that custom schedulers can implement:
class BasicScheduler
def io_wait(io, events, timeout = nil)
# Handle I/O readiness
end
def kernel_sleep(duration)
# Handle sleep operations
end
def block(blocker, timeout = nil)
# Handle generic blocking
end
end
Ruby's standard library automatically integrates with schedulers for operations including file reading, socket connections, HTTP requests, and process waiting. When a scheduler is active, these operations become non-blocking and yield control to other fibers.
Basic Usage
Setting a scheduler involves calling Fiber.set_scheduler
within a fiber context. The scheduler remains active for that fiber and any child fibers created within it.
require 'async'
# Create and set scheduler
Async do |task|
puts "Current scheduler: #{Fiber.scheduler}"
# Non-blocking sleep
sleep 0.1
puts "Sleep completed without blocking thread"
# Non-blocking file I/O
File.read('example.txt')
puts "File read completed"
end
The Async
gem provides a complete scheduler implementation. Within an Async
block, Ruby automatically uses the Async scheduler for supported operations:
require 'async'
require 'async/http/internet'
Async do
internet = Async::HTTP::Internet.new
# Multiple concurrent requests
responses = 5.times.map do |i|
Async do
response = internet.get("https://httpbin.org/delay/1")
puts "Request #{i} completed: #{response.status}"
response
end
end
# Wait for all responses
responses.each(&:wait)
ensure
internet&.close
end
Schedulers handle different types of blocking operations through specific hook methods. The io_wait
method manages I/O readiness:
require 'socket'
require 'async'
Async do
server = TCPServer.new('localhost', 3000)
puts "Server listening on port 3000"
loop do
# Non-blocking accept
Async do |task|
socket = server.accept
data = socket.read
socket.write("HTTP/1.1 200 OK\r\n\r\nHello World")
socket.close
end
end
end
Ruby checks for an active scheduler before performing blocking operations. Without a scheduler, operations block the current thread normally. With a scheduler, Ruby invokes the appropriate hook method and expects the scheduler to manage the operation asynchronously.
Advanced Usage
Custom scheduler implementations require defining the scheduler interface methods based on the specific concurrency model. Each method receives parameters describing the blocking operation and returns control when the operation completes.
class ThreadPoolScheduler
def initialize(pool_size = 4)
@pool = pool_size.times.map { Thread.new { worker_loop } }
@queue = Queue.new
@waiting = {}
end
def io_wait(io, events, timeout = nil)
fiber = Fiber.current
@queue << [:io_wait, fiber, io, events, timeout]
Fiber.yield
end
def kernel_sleep(duration)
fiber = Fiber.current
@queue << [:sleep, fiber, duration]
Fiber.yield
end
def close
@pool.size.times { @queue << [:shutdown] }
@pool.each(&:join)
end
private
def worker_loop
while (task = @queue.pop)
case task[0]
when :io_wait
handle_io_wait(*task[1..-1])
when :sleep
handle_sleep(*task[1..-1])
when :shutdown
break
end
end
end
def handle_io_wait(fiber, io, events, timeout)
result = IO.select([io], [], [], timeout)
schedule_fiber(fiber, result)
end
def handle_sleep(fiber, duration)
sleep(duration)
schedule_fiber(fiber, nil)
end
def schedule_fiber(fiber, result)
fiber.transfer(result)
end
end
The block
method provides generic blocking support for operations not covered by other hooks:
class CustomScheduler
def block(blocker, timeout = nil)
case blocker
when :pop
# Handle queue pop operations
handle_queue_pop(timeout)
when :push
# Handle queue push operations
handle_queue_push(timeout)
else
# Fallback to blocking
Fiber.yield
end
end
private
def handle_queue_pop(timeout)
# Custom implementation for queue operations
fiber = Fiber.current
# ... async queue handling
Fiber.yield
end
end
Nested schedulers allow different concurrency strategies within the same program. Child fibers inherit parent schedulers unless explicitly overridden:
require 'async'
class LoggingScheduler
def initialize(base_scheduler)
@base = base_scheduler
end
def io_wait(io, events, timeout = nil)
puts "IO wait: #{io.inspect}, events: #{events}"
@base.io_wait(io, events, timeout)
end
def kernel_sleep(duration)
puts "Sleep: #{duration} seconds"
@base.kernel_sleep(duration)
end
end
Async do
# Wrap existing scheduler
logging_scheduler = LoggingScheduler.new(Fiber.scheduler)
Fiber.new do
Fiber.set_scheduler(logging_scheduler)
sleep 0.1 # Logs sleep operation
File.read('/etc/hosts') # Logs I/O operation
end.resume
end
Scheduler composition enables combining different scheduling strategies:
class CompositeScheduler
def initialize(*schedulers)
@schedulers = schedulers
@current = 0
end
def io_wait(io, events, timeout = nil)
scheduler = next_scheduler
scheduler.io_wait(io, events, timeout)
end
def kernel_sleep(duration)
scheduler = next_scheduler
scheduler.kernel_sleep(duration)
end
private
def next_scheduler
scheduler = @schedulers[@current]
@current = (@current + 1) % @schedulers.size
scheduler
end
end
Thread Safety & Concurrency
Scheduler implementations must handle concurrent access from multiple fibers running within the same thread. While fibers execute cooperatively within a single thread, schedulers often interact with thread-safe data structures and external resources.
class ThreadSafeScheduler
def initialize
@pending_io = Concurrent::Map.new
@timers = Concurrent::Map.new
@mutex = Mutex.new
end
def io_wait(io, events, timeout = nil)
fiber = Fiber.current
@mutex.synchronize do
@pending_io[fiber] = {
io: io,
events: events,
timeout: timeout,
started_at: Time.now
}
end
# Schedule I/O monitoring
schedule_io_check(fiber)
Fiber.yield
end
def kernel_sleep(duration)
fiber = Fiber.current
wake_time = Time.now + duration
@mutex.synchronize do
@timers[fiber] = wake_time
end
schedule_timer_check(fiber, wake_time)
Fiber.yield
end
private
def schedule_io_check(fiber)
Thread.new do
info = @pending_io[fiber]
return unless info
result = IO.select([info[:io]], [], [], info[:timeout])
@mutex.synchronize do
@pending_io.delete(fiber)
end
# Resume fiber with result
resume_fiber(fiber, result)
end
end
def resume_fiber(fiber, result)
# Ensure fiber resumes on original thread
fiber.transfer(result)
rescue FiberError
# Fiber may have been terminated
end
end
Cross-thread coordination requires careful handling when schedulers interact with background threads:
class EventLoopScheduler
def initialize
@event_loop = Thread.new { run_event_loop }
@command_queue = Queue.new
@responses = Concurrent::Map.new
end
def io_wait(io, events, timeout = nil)
fiber = Fiber.current
request_id = SecureRandom.uuid
# Send command to event loop thread
@command_queue << {
type: :io_wait,
id: request_id,
fiber: fiber,
io: io,
events: events,
timeout: timeout
}
# Wait for response
Fiber.yield
# Retrieve result
@responses.delete(request_id)
end
private
def run_event_loop
ios = {}
timers = []
loop do
# Process commands from main thread
while (command = @command_queue.pop(true) rescue nil)
case command[:type]
when :io_wait
ios[command[:io]] = command
when :shutdown
return
end
end
# Check I/O readiness
ready = IO.select(ios.keys, [], [], 0.01)
if ready && ready[0]
ready[0].each do |io|
command = ios.delete(io)
fiber = command[:fiber]
# Resume fiber on main thread
@responses[command[:id]] = :ready
fiber.transfer(:ready)
end
end
end
end
end
Fiber-local state requires special consideration in scheduler implementations. Each fiber maintains separate state, but schedulers often need to coordinate across fibers:
class StatefulScheduler
def initialize
@fiber_states = Concurrent::Map.new
end
def io_wait(io, events, timeout = nil)
fiber = Fiber.current
state = get_fiber_state(fiber)
state[:pending_operations] << {
type: :io_wait,
io: io,
events: events,
started_at: Time.now
}
# Perform operation
result = perform_io_wait(io, events, timeout)
# Update state
state[:completed_operations] += 1
state[:pending_operations].pop
result
end
private
def get_fiber_state(fiber)
@fiber_states.compute_if_absent(fiber) do
{
pending_operations: [],
completed_operations: 0,
created_at: Time.now
}
end
end
end
Performance & Memory
Scheduler performance directly impacts application throughput since all blocking operations flow through scheduler methods. Efficient implementations minimize overhead and avoid unnecessary allocations.
require 'benchmark'
require 'async'
# Compare blocking vs non-blocking I/O
def benchmark_io_operations(count = 1000)
file_content = "x" * 1024 # 1KB content
# Blocking I/O
blocking_time = Benchmark.realtime do
count.times do |i|
File.write("temp_#{i}.txt", file_content)
File.read("temp_#{i}.txt")
File.delete("temp_#{i}.txt")
end
end
# Non-blocking I/O with scheduler
nonblocking_time = Benchmark.realtime do
Async do
count.times do |i|
Async do
File.write("temp_async_#{i}.txt", file_content)
File.read("temp_async_#{i}.txt")
File.delete("temp_async_#{i}.txt")
end
end
end
end
puts "Blocking I/O: #{blocking_time.round(2)}s"
puts "Non-blocking I/O: #{nonblocking_time.round(2)}s"
puts "Speedup: #{(blocking_time / nonblocking_time).round(2)}x"
end
benchmark_io_operations(100)
# Blocking I/O: 0.45s
# Non-blocking I/O: 0.12s
# Speedup: 3.75x
Memory usage patterns differ significantly between scheduler implementations. Event loop schedulers typically maintain minimal per-operation overhead:
require 'objspace'
class MemoryEfficientScheduler
def initialize
@io_operations = {} # Reuse hash
@sleep_operations = {}
@object_pool = [] # Reuse operation objects
end
def io_wait(io, events, timeout = nil)
# Reuse operation object to reduce allocations
operation = @object_pool.pop || {}
operation[:io] = io
operation[:events] = events
operation[:timeout] = timeout
operation[:fiber] = Fiber.current
@io_operations[io] = operation
result = wait_for_io(operation)
# Return object to pool
operation.clear
@object_pool << operation if @object_pool.size < 100
result
end
def kernel_sleep(duration)
# Minimize object allocation
wake_time = Time.now.to_f + duration
fiber = Fiber.current
@sleep_operations[fiber] = wake_time
# Yield until wake time
while Time.now.to_f < wake_time
Fiber.yield
end
@sleep_operations.delete(fiber)
end
private
def wait_for_io(operation)
# Efficient I/O waiting implementation
ready = IO.select([operation[:io]], [], [], operation[:timeout])
ready ? :ready : :timeout
end
end
# Measure memory usage
def measure_scheduler_memory
GC.disable
start_objects = ObjectSpace.count_objects[:TOTAL]
scheduler = MemoryEfficientScheduler.new
Fiber.set_scheduler(scheduler)
# Perform operations
1000.times do
sleep 0.001
end
end_objects = ObjectSpace.count_objects[:TOTAL]
puts "Objects created: #{end_objects - start_objects}"
ensure
GC.enable
end
Scheduler selection significantly impacts performance characteristics. Different schedulers excel in different scenarios:
def compare_scheduler_performance
operations = 10000
# Test with different scheduler types
schedulers = [
['Thread Pool', ThreadPoolScheduler.new(8)],
['Event Loop', EventLoopScheduler.new],
['Memory Efficient', MemoryEfficientScheduler.new]
]
schedulers.each do |name, scheduler|
time = Benchmark.realtime do
Fiber.new do
Fiber.set_scheduler(scheduler)
# Mix of I/O and sleep operations
operations.times do |i|
if i % 2 == 0
sleep 0.001
else
File.read('/dev/null') rescue nil
end
end
end.resume
end
puts "#{name}: #{time.round(3)}s"
scheduler.close if scheduler.respond_to?(:close)
end
end
compare_scheduler_performance
# Thread Pool: 2.341s
# Event Loop: 1.892s
# Memory Efficient: 1.205s
Reference
Core Methods
Method | Parameters | Returns | Description |
---|---|---|---|
Fiber.scheduler |
None | Scheduler or nil |
Returns current fiber's scheduler |
Fiber.set_scheduler(scheduler) |
scheduler (Object) |
Scheduler |
Sets scheduler for current fiber |
Fiber.schedule { block } |
block (Block) |
Fiber |
Creates and resumes new fiber |
Scheduler Interface Methods
Method | Parameters | Returns | Description |
---|---|---|---|
#io_wait(io, events, timeout) |
io (IO), events (Integer), timeout (Numeric) |
Object |
Handles I/O readiness waiting |
#kernel_sleep(duration) |
duration (Numeric) |
nil |
Handles sleep operations |
#block(blocker, timeout) |
blocker (Object), timeout (Numeric) |
Object |
Handles generic blocking operations |
#unblock(blocker, fiber) |
blocker (Object), fiber (Fiber) |
nil |
Unblocks waiting fiber |
#fiber(&block) |
block (Block) |
Fiber |
Creates new fiber with scheduler |
#close |
None | nil |
Cleanup scheduler resources |
I/O Events Constants
Constant | Value | Description |
---|---|---|
IO::READABLE |
1 | Socket/file readable |
IO::WRITABLE |
2 | Socket/file writable |
IO::PRIORITY |
4 | Priority data available |
Standard Library Integration
Library | Methods | Scheduler Support |
---|---|---|
File |
read , write , open |
Yes |
IO |
select , read , write |
Yes |
Socket |
accept , connect , recv , send |
Yes |
Net::HTTP |
get , post , request |
Yes |
Process |
wait , spawn |
Partial |
Kernel |
sleep |
Yes |
Error Conditions
Error | When Raised | Resolution |
---|---|---|
FiberError |
Fiber resumed incorrectly | Check fiber state before resume |
ThreadError |
Cross-thread fiber access | Ensure fiber operations on same thread |
SystemCallError |
I/O operation failure | Handle with appropriate rescue blocks |
Scheduler Implementation Checklist
class MinimalScheduler
def io_wait(io, events, timeout = nil)
# Required: handle I/O readiness
end
def kernel_sleep(duration)
# Required: handle sleep operations
end
def block(blocker, timeout = nil)
# Optional: handle generic blocking
end
def unblock(blocker, fiber)
# Optional: unblock specific fiber
end
def fiber(&block)
# Optional: custom fiber creation
end
def close
# Optional: cleanup resources
end
end
Performance Characteristics
Scheduler Type | Memory Usage | CPU Overhead | Concurrency Model |
---|---|---|---|
Event Loop | Low | Low | Single-threaded |
Thread Pool | Medium | Medium | Multi-threaded |
Hybrid | Medium | Low | Mixed |
Synchronous | High | High | Blocking |
Integration Examples
# Rails integration
class ApplicationController < ActionController::Base
around_action :with_scheduler
private
def with_scheduler
Async do
yield
end
end
end
# Rack middleware
class SchedulerMiddleware
def initialize(app, scheduler_class = Async::Scheduler)
@app = app
@scheduler_class = scheduler_class
end
def call(env)
scheduler = @scheduler_class.new
Fiber.new do
Fiber.set_scheduler(scheduler)
@app.call(env)
end.resume
ensure
scheduler&.close
end
end
# Testing with schedulers
RSpec.configure do |config|
config.around(:each, :async) do |example|
Async do
example.run
end
end
end