Overview
Fiber schedulers in Ruby (available since Ruby 3.0) provide a mechanism for implementing non-blocking I/O through cooperative multitasking. The scheduler intercepts potentially blocking operations such as socket and pipe reads and writes, sleep calls, and waits on mutexes or queues. It suspends the current fiber and runs other fibers until the operation can complete.
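Ruby implements fiber schedulers through the Fiber.set_scheduler method. It installs a scheduler for the current thread and accepts any object implementing the scheduler interface. The scheduler receives callbacks only when non-blocking fibers attempt blocking operations. Non-blocking is the default for Fiber.new, and it is what Fiber.schedule creates. Each thread's root fiber is blocking, so code running directly on it bypasses the scheduler. This lets the scheduler manage many concurrent operations without blocking the entire thread.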
require 'async'
require 'net/http'

# Basic fiber scheduler setup
Async do |task|
  # This block runs in a fiber managed by the Async scheduler
  request = task.async do
    Net::HTTP.get(URI('https://example.com'))
  end
  # wait suspends this fiber until the child task returns the body
  puts request.wait
end
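The scheduler interface defines hook methods such as io_wait, kernel_sleep, block, and unblock, which handle different types of blocking operations. Fiber.set_scheduler raises ArgumentError if any of these four is missing. Optional hooks include fiber, which Fiber.schedule uses, and close, which Ruby calls when the thread exits. When a non-blocking fiber calls a method that would normally block, Ruby checks for an active scheduler and delegates the operation to it instead of blocking the thread.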
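# Interface sketch of the four hooks Fiber.set_scheduler requires; a working
# scheduler also needs an event loop that resumes suspended fibers.
class BasicScheduler
  # Called as io_wait(io, events, timeout); the waiting fiber is Fiber.current.
  # Register io, Fiber.yield, and return the ready events (or false on timeout).
  def io_wait(io, events, timeout) = raise(NotImplementedError)

  # Record a wake-up time and Fiber.yield; calling Kernel#sleep here would
  # re-enter this hook, and spawning a thread would return without sleeping.
  def kernel_sleep(duration = nil) = raise(NotImplementedError)

  # Suspend the current fiber until unblock(blocker, fiber) or the timeout
  def block(blocker, timeout = nil) = raise(NotImplementedError)

  # Mark fiber ready to resume; may be called from another thread
  def unblock(blocker, fiber) = raise(NotImplementedError)
end

Fiber.set_scheduler(BasicScheduler.new)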
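The following sketch makes the delegation visible. It installs a hypothetical TracingScheduler that only records kernel_sleep calls and returns immediately instead of waiting. It assumes Ruby 3.0 or later and is not a usable scheduler, since the other required hooks just raise. It shows that sleep on the thread's root fiber runs normally, while the same call inside Fiber.schedule is routed to the scheduler.

```ruby
# Hypothetical demo class: records which hooks Ruby calls. Not a usable
# scheduler, because kernel_sleep returns without waiting.
class TracingScheduler
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Fiber.schedule delegates here: create a non-blocking fiber and start it
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  def kernel_sleep(duration = nil)
    @calls << [:kernel_sleep, duration]
    true # skip the actual wait so the demo finishes instantly
  end

  # Required by Fiber.set_scheduler, unused in this demo
  def io_wait(io, events, timeout) = raise(NotImplementedError)
  def block(blocker, timeout = nil) = raise(NotImplementedError)
  def unblock(blocker, fiber) = raise(NotImplementedError)
  def close; end # Ruby calls this when the thread exits
end

scheduler = TracingScheduler.new
Thread.new do
  Fiber.set_scheduler(scheduler)
  sleep(0.01)                 # root fiber is blocking: real sleep, no hook call
  Fiber.schedule { sleep(5) } # non-blocking fiber: routed to kernel_sleep
end.join

p scheduler.calls # => [[:kernel_sleep, 5]]
```

Fiber.schedule depends on the optional fiber hook, which is why the sketch defines it.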
Fiber schedulers excel in I/O-bound applications where traditional threading would create significant overhead. The scheduler maintains a queue of suspended fibers and resumes them when their blocking operations complete, achieving high concurrency with minimal memory overhead compared to thread-based approaches.
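A working scheduler pairs each hook with an event loop. The sketch below uses a hypothetical class, MiniScheduler, as a minimal single-threaded version for Ruby 3.0+:

- Each hook parks the calling fiber with Fiber.yield.
- run resumes a fiber when its IO is ready (via IO.select), its deadline passes, or unblock marks it ready.

The sketch assumes unblock is only called from the scheduler's own thread. It also skips optional hooks such as io_read and timeout_after.

```ruby
require 'fiber'

# Minimal single-threaded scheduler sketch (Ruby 3.0+). Assumes unblock is
# only called from this scheduler's own thread.
class MiniScheduler
  def initialize
    @readable = {} # io => fiber waiting for it to become readable
    @writable = {} # io => fiber waiting for it to become writable
    @waiting  = {} # fiber => monotonic deadline
    @ready    = [] # fibers unblocked and due to be resumed
  end

  def now = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # Fiber.schedule: start a non-blocking fiber right away
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  def kernel_sleep(duration = nil)
    block(:sleep, duration)
    true
  end

  # Park the current fiber until unblock or the timeout, whichever is first
  def block(_blocker, timeout = nil)
    @waiting[Fiber.current] = now + timeout if timeout
    Fiber.yield
  ensure
    @waiting.delete(Fiber.current)
  end

  def unblock(_blocker, fiber)
    @ready << fiber
  end

  # Returns the ready events (passed in by run) or false on timeout
  def io_wait(io, events, timeout)
    fiber = Fiber.current
    @readable[io] = fiber unless (events & IO::READABLE).zero?
    @writable[io] = fiber unless (events & IO::WRITABLE).zero?
    @waiting[fiber] = now + timeout if timeout
    Fiber.yield
  ensure
    @readable.delete(io)
    @writable.delete(io)
    @waiting.delete(fiber)
  end

  # Event loop: resume fibers whose IO is ready, whose deadline passed,
  # or that were unblocked, until nothing is left to wait for
  def run
    while @readable.any? || @writable.any? || @waiting.any? || @ready.any?
      deadline = @waiting.values.min
      timeout = @ready.any? ? 0 : deadline && [deadline - now, 0].max
      readable, writable = IO.select(@readable.keys, @writable.keys, [], timeout)
      readable&.each { |io| @readable[io]&.resume(IO::READABLE) }
      writable&.each { |io| @writable[io]&.resume(IO::WRITABLE) }
      @waiting.select { |_, at| at <= now }.each_key { |f| f.resume(false) }
      ready, @ready = @ready, []
      ready.each { |f| f.resume if f.alive? }
    end
  end

  def close = run # Ruby calls close when the thread exits
end

order = []
Thread.new do
  scheduler = MiniScheduler.new
  Fiber.set_scheduler(scheduler)
  Fiber.schedule { sleep(0.2); order << :slow }
  Fiber.schedule { sleep(0.1); order << :fast }
  scheduler.run # close at thread exit then finds nothing left to do
end.join
p order # => [:fast, :slow]
```

Here scheduler.run drives the loop explicitly. Production schedulers such as the one in the async gem add a wake-up mechanism for cross-thread unblock, faster native selectors, and error handling.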
Basic Usage
Setting up a fiber scheduler requires implementing the scheduler interface methods and registering the scheduler with the current thread. The async gem provides a production-ready scheduler implementation that handles most common use cases.
require 'async'
require 'async/http/internet'

# HTTP requests with fiber scheduler
Async do |task|
  internet = Async::HTTP::Internet.new
  # Two internet.get calls in a row would run one after the other;
  # a child task per request lets the requests overlap
  requests = %w[octocat defunkt].map do |user|
    task.async { internet.get("https://api.github.com/users/#{user}").read }
  end
  requests.each { |request| puts request.wait }
ensure
  internet&.close
end
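When a scheduler is active, IO#read, File.read, and similar methods can be routed through it, via io_wait or via io_read where the scheduler implements that hook. Regular files always report as ready, however. File reads therefore only become truly asynchronous when the scheduler's backend supports asynchronous file I/O, such as io_uring on Linux; otherwise each read briefly blocks the thread. Pipes and sockets benefit fully.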
require 'async'

Async do |task|
  files = ['file1.txt', 'file2.txt', 'file3.txt']
  # One child task per file
  tasks = files.map do |filename|
    task.async do
      File.read(filename)
    end
  end
  # Wait for each task in turn; results keep the order of files
  contents = tasks.map(&:wait)
  puts contents
end
Database operations integrate with fiber schedulers through drivers that wait on their sockets via the scheduler. The pg gem with async-postgres provides non-blocking PostgreSQL access, although each connection still runs one query at a time.
require 'async'
require 'async/postgres'

Async do
  endpoint = Async::Postgres::Endpoint.new("postgresql://localhost/mydb")
  endpoint.with do |connection|
    # One connection processes one query at a time, so these two queries run
    # sequentially; overlapping them needs a separate connection per task
    result1 = connection.async_exec("SELECT * FROM users WHERE active = true")
    result2 = connection.async_exec("SELECT COUNT(*) FROM orders")
    puts result1.to_a
    puts result2.to_a
  end
end
Nested async blocks create child tasks that inherit the parent scheduler context. Child tasks can spawn their own async operations, creating a hierarchy of concurrent operations.
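Async do |parent_task|
  parent_task.async do |child_task|
    # Nested async operations
    grandchildren = [
      child_task.async { sleep(1); puts "Task 1" },
      child_task.async { sleep(2); puts "Task 2" },
      child_task.async { sleep(0.5); puts "Task 3" }
    ]
    # Task#wait covers only a task's own block, not its children, so wait
    # for each grandchild before this child task finishes
    grandchildren.each(&:wait)
  end.wait
  puts "All child tasks completed"
end
# Output: Task 3, Task 1, Task 2, All child tasks completed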
Thread Safety & Concurrency
Fiber schedulers operate within a single thread but manage multiple fibers, creating concurrency without preemptive thread switching. Shared state between fibers still requires care, however. A fiber switch can occur at any blocking operation: I/O, sleep, or a wait on a mutex or queue.
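require 'async'

class Counter
  def initialize
    @value = 0
    @mutex = Mutex.new
  end

  def increment
    # Since Ruby 3.0, Mutex locks are owned per fiber, so other fibers wait here
    @mutex.synchronize do
      current = @value
      # A fiber switch occurs here because sleep is a blocking operation
      sleep(0.001) # Simulated I/O operation
      @value = current + 1
    end
  end

  attr_reader :value
end

# Without the mutex, every fiber would read 0 before any write and the final
# value would be 1; with it, the read-modify-write is serialized
Async do |task|
  counter = Counter.new
  tasks = Array.new(10) { task.async { counter.increment } }
  tasks.each(&:wait)
  puts counter.value # expected: 10
end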
Unlike preemptive threading, fiber schedulers switch contexts only at blocking operations. This reduces race conditions but does not eliminate them. A read-modify-write that spans a blocking call still requires synchronization. Code with no blocking call between the read and the write cannot be interleaved.
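The lost-update problem does not need a scheduler to reproduce. In the sketch below, Fiber.yield marks the switch point explicitly, standing in for the blocking call inside a scheduled fiber; run_increments is a hypothetical helper. When the switch falls between the read and the write, every fiber reads the same starting value and nine of the ten updates are lost.

```ruby
# Fiber.yield plays the role of a blocking call that lets other fibers run.
def run_increments(count, switch_inside:)
  value = 0
  fibers = Array.new(count) do
    Fiber.new do
      current = value
      Fiber.yield if switch_inside      # switch between read and write
      value = current + 1
      Fiber.yield unless switch_inside  # switch only after the update
    end
  end
  fibers.each(&:resume)                     # first step of every fiber
  fibers.each { |f| f.resume if f.alive? }  # second step
  value
end

p run_increments(10, switch_inside: true)  # => 1 (lost updates)
p run_increments(10, switch_inside: false) # => 10
```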
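# Assumes a fiber scheduler is active (Fiber.scheduler is non-nil). Waking
# fibers through block/unblock keeps the scheduler in control, instead of
# resuming another fiber directly from push. Ruby's Thread::Queue already
# cooperates with the scheduler this way; this class shows the mechanism.
class SafeFiberQueue
  def initialize
    @items = []
    @waiting_fibers = []
  end

  def push(item)
    @items << item
    # Wake up a waiting fiber, if any
    if (fiber = @waiting_fibers.shift)
      Fiber.scheduler.unblock(self, fiber)
    end
  end

  def pop
    # Re-check after waking: another fiber may have taken the item first
    while @items.empty?
      @waiting_fibers << Fiber.current
      begin
        Fiber.scheduler.block(self, nil)
      ensure
        @waiting_fibers.delete(Fiber.current)
      end
    end
    @items.shift
  end
end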
Cross-fiber communication often uses channels or queues that handle fiber suspension and resumption. The scheduler coordinates these operations without exposing low-level fiber management.
require 'async'
require 'async/queue'

Async do |task|
  queue = Async::Queue.new

  # Producer fiber
  task.async do
    5.times do |i|
      queue.enqueue("Message #{i}")
      sleep(0.1)
    end
    queue.enqueue(nil) # nil marks the end of the stream
  end

  # Consumer fiber: dequeue suspends until an item is available
  task.async do
    while (message = queue.dequeue)
      puts "Received: #{message}"
    end
  end
end
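Underneath, queues like this rely on the fiber suspend-and-resume primitive. Without a scheduler, the consumer can drive a producer fiber directly, as in this core-Ruby sketch. A scheduler-backed queue instead suspends the consumer through the scheduler (for Ruby's Thread::Queue, via block and unblock), so neither side has to hold a reference to the other.

```ruby
# Producer as a raw fiber: each Fiber.yield hands one message to the
# consumer and suspends the producer until the consumer asks for more.
producer = Fiber.new do
  3.times { |i| Fiber.yield("Message #{i}") }
  nil # returning nil marks the end of the stream
end

received = []
while (message = producer.resume)
  received << message
end

p received # => ["Message 0", "Message 1", "Message 2"]
```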
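Scheduler implementations must manage fiber lifecycles correctly to prevent memory leaks and ensure proper cleanup. Ruby's Fiber class has no cleanup-callback API. Entries for a waiting fiber are therefore usually removed in an ensure clause around its suspension point. That clause runs whether the fiber is resumed normally or has an exception raised into it.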
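# Lifecycle-handling fragment; other required hooks are omitted
class CustomScheduler
  def initialize
    @waiting_fibers = {}
    @io_objects = {}
  end

  # Ruby calls io_wait(io, events, timeout); the waiting fiber is Fiber.current
  def io_wait(io, events, timeout)
    fiber = Fiber.current
    @waiting_fibers[fiber] = { io: io, events: events, timeout: timeout }
    @io_objects[io] = fiber
    # Suspend until the event loop (not shown) resumes with events or false
    Fiber.yield
  ensure
    # Runs on normal resume and when an exception is raised into the fiber
    cleanup_fiber(fiber)
  end

  private

  def cleanup_fiber(fiber)
    if (io_info = @waiting_fibers.delete(fiber))
      @io_objects.delete(io_info[:io])
    end
  end
end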
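Cleanup in an ensure clause only runs if the suspended fiber is eventually resumed or has an exception raised into it. A fiber that is simply dropped never runs it. One way for a scheduler's close to unwind fibers that are still parked is Fiber#raise (Ruby 2.7+). It is shown here with a plain hash standing in for scheduler state.

```ruby
registry = {} # stands in for a scheduler's table of waiting fibers

waiter = Fiber.new do
  registry[Fiber.current] = :waiting_for_io
  Fiber.yield # parked, as a scheduler hook would do
ensure
  registry.delete(Fiber.current)
end

waiter.resume
p registry.size # => 1

begin
  # Raise at the Fiber.yield point; ensure runs, then the error propagates back
  waiter.raise(IOError, "scheduler closed")
rescue IOError => e
  puts "propagated to caller: #{e.message}"
end

p registry.size # => 0
p waiter.alive? # => false
```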
Performance & Memory
Fiber schedulers provide significant performance advantages for I/O-bound applications by eliminating thread creation overhead and reducing context switching costs. A single thread can manage thousands of concurrent fibers with minimal memory usage.
require 'async'
require 'async/http/internet'
require 'benchmark'
require 'net/http'

# Performance comparison: sequential vs concurrent requests
urls = Array.new(100) { "https://httpbin.org/delay/#{rand(3)}" }

# Sequential execution
sequential_time = Benchmark.measure do
  urls.each do |url|
    Net::HTTP.get(URI(url))
  end
end

# Concurrent execution with fiber scheduler
concurrent_time = Benchmark.measure do
  Async do |task|
    internet = Async::HTTP::Internet.new
    tasks = urls.map do |url|
      task.async { internet.get(url).read }
    end
    tasks.each(&:wait)
  ensure
    internet&.close
  end
end

puts "Sequential: #{sequential_time.real}s"
puts "Concurrent: #{concurrent_time.real}s"
# Sequential time is near the sum of the delays (about 100 s on average here);
# concurrent time is near the longest single delay, plus network overhead
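Memory usage grows linearly with the number of concurrent units in both models, but a fiber costs much less than a thread. Fiber stacks are taken from a pool, and only the pages a fiber actually touches count toward resident memory. Each thread also carries a native OS thread with its own, larger stack reservation. Exact figures depend on the Ruby version and platform, so measure them rather than relying on a fixed number.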
require 'async'

# Resident set size in KB (ps reports RSS in kilobytes on Linux and macOS)
def measure_memory
  GC.start
  `ps -o rss= -p #{Process.pid}`.to_i
end

baseline_memory = measure_memory

# Create 1000 concurrent fibers
Async do |task|
  tasks = Array.new(1000) do
    task.async do
      sleep(10) # Keep fibers alive for measurement
    end
  end
  fiber_memory = measure_memory
  puts "Memory per fiber: #{((fiber_memory - baseline_memory) / 1000.0).round(1)} KB"
  # The result varies with Ruby version, platform, and what each fiber touches
  tasks.each(&:stop) # no need to wait out the 10 s sleeps
end
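The claim that one thread can hold thousands of suspended fibers can be checked with core Ruby alone. This sketch parks 1,000 fibers mid-computation with Fiber.yield, much as a scheduler parks fibers on pending I/O, then resumes each one to completion. It does not measure memory.

```ruby
fibers = Array.new(1_000) do |i|
  Fiber.new do
    partial = i * 2 # local state kept on this fiber's own stack
    Fiber.yield     # suspend, as a scheduler would while I/O is pending
    partial + 1
  end
end

fibers.each(&:resume)          # all 1,000 fibers are now suspended at once
results = fibers.map(&:resume) # resume each one to completion

p results.first(3) # => [1, 3, 5]
p results.sum      # => 1000000
```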
Scheduler overhead varies with implementation complexity and the number of I/O operations being managed. The async gem's scheduler is built on the io-event gem, which uses native selectors such as epoll, kqueue, or io_uring. Ruby itself pools fiber stacks, so creating fibers stays cheap.
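require 'async'
require 'benchmark/ips'

# Scheduler overhead measurement
Benchmark.ips do |x|
  x.config(time: 5, warmup: 2)

  x.report("direct sleep") do
    sleep(0.001)
  end

  # Each iteration also creates and tears down a reactor, so this bounds
  # per-task setup cost rather than the cost of a single hook call
  x.report("scheduled sleep") do
    Async do
      sleep(0.001)
    end
  end

  x.compare!
end
# The gap between the two iteration rates approximates reactor and scheduling
# overhead; that fixed cost shrinks relative to longer I/O waits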
Large-scale concurrent operations require careful resource management to prevent excessive memory usage from accumulated results or pending operations.
require 'async'
require 'async/semaphore'
require 'net/http'

class BatchProcessor
  def initialize(concurrency_limit: 100)
    # At most concurrency_limit requests run at once
    @semaphore = Async::Semaphore.new(concurrency_limit)
  end

  # Yields the results of each batch of up to 1000 URLs
  def process_urls(urls)
    Async do
      urls.each_slice(1000) do |batch|
        batch_tasks = batch.map do |url|
          @semaphore.async do
            process_single_url(url)
          end
        end
        # Finish the whole batch before starting the next, so only one
        # batch of results is held in memory at a time
        batch_results = batch_tasks.map(&:wait)
        yield batch_results
      end
    end
  end

  private

  def process_single_url(url)
    # Net::HTTP uses Ruby's socket I/O, so its waits go through the scheduler
    Net::HTTP.get(URI(url))
  end
end
Production Patterns
Production applications using fiber schedulers typically implement connection pooling, error recovery, and monitoring to handle real-world deployment challenges. The async gem provides production-ready patterns for common scenarios.
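require 'async'
require 'async/http/internet'
require 'async/semaphore'
require 'json'

class APIClient
  def initialize(concurrency: 10, timeout: 30)
    # Async::HTTP::Internet already keeps per-host connection pools, so one
    # instance is shared and a semaphore caps the requests in flight
    @internet = Async::HTTP::Internet.new
    @semaphore = Async::Semaphore.new(concurrency)
    @timeout = timeout
  end

  # Returns parsed JSON bodies in the same order as user_ids
  def fetch_user_data(user_ids)
    Async do
      tasks = user_ids.map do |user_id|
        @semaphore.async do |task|
          task.with_timeout(@timeout) do
            response = @internet.get("https://api.example.com/users/#{user_id}")
            JSON.parse(response.read)
          end
        end
      end
      tasks.map(&:wait)
    end.wait # return the results and re-raise failures here
  rescue Async::TimeoutError => e
    Rails.logger.error("API timeout: #{e.message}")
    raise
  end

  def close
    @internet.close
  end
end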
Database connection management becomes critical in production environments where connection limits constrain concurrent operations. Connection pools must integrate with the fiber scheduler.
require 'async'
require 'async/postgres'

class DatabaseService
  def initialize(database_url, pool_size: 25)
    @endpoint = Async::Postgres::Endpoint.new(database_url)
    @pool_size = pool_size
  end

  # Runs each query in its own task, with at most pool_size connections open
  def execute_batch_queries(queries)
    Async do |task|
      @endpoint.with_pool(limit: @pool_size) do |pool|
        tasks = queries.map do |query|
          task.async do
            pool.with do |connection|
              connection.async_exec(query[:sql], query[:params])
            end
          end
        end
        tasks.map(&:wait)
      end
    end.wait # return the results rather than the Task
  end

  def health_check
    # wait re-raises failures from inside the task so the rescue below sees them
    Async do
      @endpoint.with do |connection|
        result = connection.async_exec("SELECT 1")
        result.ntuples > 0
      end
    end.wait
  rescue => e
    Rails.logger.error("Database health check failed: #{e.message}")
    false
  end
end
Monitoring and observability require specialized approaches since traditional thread-based monitoring tools don't capture fiber-level metrics accurately.
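require 'async'

class PerformanceMonitor
  def initialize
    @metrics = {
      active_fibers: 0,
      completed_operations: 0,
      failed_operations: 0,
      average_response_time: 0.0
    }
  end

  # Safe without a lock on one thread: no blocking call sits between the
  # reads and writes of @metrics, so fibers cannot interleave there
  def track_operation(name)
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    @metrics[:active_fibers] += 1
    outcome = nil
    begin
      result = yield
      outcome = :completed_operations
      result
    rescue StandardError
      outcome = :failed_operations
      raise
    ensure
      @metrics[:active_fibers] -= 1
      # Operations stopped by non-StandardError exceptions (e.g. Async::Stop)
      # are not counted, which keeps the average consistent with the counts
      if outcome
        @metrics[outcome] += 1
        update_average_response_time(Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time)
        # Log metrics every 100 completed operations
        completed = @metrics[:completed_operations]
        log_metrics if outcome == :completed_operations && (completed % 100).zero?
      end
    end
  end

  private

  def update_average_response_time(duration)
    total_ops = @metrics[:completed_operations] + @metrics[:failed_operations]
    current_avg = @metrics[:average_response_time]
    @metrics[:average_response_time] = (current_avg * (total_ops - 1) + duration) / total_ops
  end

  def log_metrics
    Rails.logger.info("Fiber metrics: #{@metrics}")
  end
end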
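The update in update_average_response_time is the standard incremental mean, avg_n = (avg_(n-1) * (n - 1) + x_n) / n. The sketch below checks it against a rearranged form, avg_n = avg_(n-1) + (x_n - avg_(n-1)) / n, which avoids multiplying by the growing count. The helper names and sample durations are made up for illustration.

```ruby
# Form used by update_average_response_time
def mean_by_total(avg, x, n) = (avg * (n - 1) + x) / n
# Rearranged form: nudge the running mean toward the new sample
def mean_by_delta(avg, x, n) = avg + (x - avg) / n

durations = [0.120, 0.080, 0.200, 0.050] # seconds
a = b = 0.0
durations.each.with_index(1) do |d, n|
  a = mean_by_total(a, d, n)
  b = mean_by_delta(b, d, n)
end

exact = durations.sum / durations.size
p [a, b, exact].map { |v| v.round(6) } # => [0.1125, 0.1125, 0.1125]
```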
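Scaling beyond one process requires coordination at the process level, because a fiber scheduler only manages fibers within a single thread. With a thread-based server such as Puma, wrapping a request in a reactor gives concurrency only within that request. Fiber-based servers such as Falcon run each request in its own fiber.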
# config/puma.rb - Production configuration
require 'connection_pool'
require 'redis'

# Runs in each forked worker: re-establish connections after fork
on_worker_boot do
  ActiveRecord::Base.establish_connection
  # Shared Redis connection pool for this worker
  $redis_pool = ConnectionPool.new(size: 25) do
    Redis.new(url: ENV['REDIS_URL'])
  end
end

# Application controller integration
# In config/application.rb (Rails 7.0+), set
# config.active_support.isolation_level = :fiber so per-request state follows fibers
class ApplicationController < ActionController::Base
  around_action :with_fiber_scheduler

  private

  def with_fiber_scheduler
    # Sync runs the block inside a reactor and re-raises any exception,
    # so Rails' normal error handling still applies
    Sync do
      yield
    end
  rescue => e
    Rails.logger.error("Fiber scheduler error: #{e.message}")
    raise
  end
end
Error Handling & Debugging
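Error handling in fiber-scheduled code requires understanding how exceptions propagate through the scheduler and async operations. In the async gem, an exception raised in a child task fails that task and is re-raised wherever the task is waited on. If nothing waits on it, the exception is logged rather than propagated, and sibling tasks keep running.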
require 'async'

# Exception propagation in fiber hierarchies
Async do |parent_task|
  begin
    child_task = parent_task.async do
      sleep(1)
      raise StandardError, "Child fiber error"
    end
    child_task.wait # Exception propagates here
  rescue StandardError => e
    puts "Caught child error: #{e.message}"
    # Parent continues executing
  end
  puts "Parent task completed"
end
Timeout handling becomes complex when dealing with nested async operations. Each level of nesting can have its own timeout, requiring careful coordination to prevent resource leaks.
require 'async'

class TimeoutHandler
  # Distinct class so per-item timeouts can be told apart from the outer one
  ItemTimeout = Class.new(StandardError)

  def self.with_nested_timeouts
    Async do |task|
      # Outer timeout: 10 seconds total
      task.with_timeout(10) do
        %w[item1 item2 item3].map do |item|
          # Inner timeout: 3 seconds per item, raising ItemTimeout
          task.with_timeout(3, ItemTimeout) { process_item(item) }
        rescue ItemTimeout
          puts "Individual item timed out: #{item}"
          nil
        end
      end
    rescue Async::TimeoutError
      # Only the outer timeout raises Async::TimeoutError here
      puts "Overall operation timed out"
      cleanup_resources
      raise
    end
  end

  def self.process_item(item)
    # Simulated processing that might time out
    sleep(rand(5))
    "processed #{item}"
  end

  def self.cleanup_resources
    # Close connections, release locks, etc.
    puts "Cleaning up resources after timeout"
  end

  # `private` does not apply to methods defined with `def self.`
  private_class_method :process_item, :cleanup_resources
end
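Another way to keep nested timeouts coordinated is to derive each inner timeout from what remains of the outer budget, so an inner wait can never outlive the outer deadline. inner_timeout below is a hypothetical helper. In real code, now would come from Process.clock_gettime(Process::CLOCK_MONOTONIC), and the result would be passed to task.with_timeout.

```ruby
# Cap each inner timeout by the time left before the outer deadline
def inner_timeout(outer_deadline, per_item, now:)
  remaining = outer_deadline - now
  raise "outer deadline already passed" if remaining <= 0
  [per_item, remaining].min
end

start = 0.0
deadline = start + 10 # 10 s outer budget, as in TimeoutHandler
p inner_timeout(deadline, 3, now: start + 1)   # => 3 (plenty of budget left)
p inner_timeout(deadline, 3, now: start + 8.5) # => 1.5 (only 1.5 s remain)
```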
Debugging fiber-based code requires specialized techniques since traditional stack traces don't show the full execution context across fiber boundaries.
require 'async'

class FiberDebugger
  # Thread#[] is fiber-local, so a stack stored with it in the root fiber is
  # nil inside Async tasks. thread_variable_get/set is shared by all fibers
  # on the thread; interleaving tasks can therefore see each other's frames.
  def self.enable_debugging
    Thread.current.thread_variable_set(:fiber_stack, [])
  end

  def self.fiber_stack
    Thread.current.thread_variable_get(:fiber_stack) || enable_debugging
  end

  def self.debug_async_operation(name)
    frame = { name: name, fiber: Fiber.current, backtrace: caller }
    fiber_stack << frame
    begin
      result = yield
      puts "#{name} completed successfully"
      result
    rescue => e
      print_fiber_stack_trace(e)
      raise
    ensure
      # Remove this exact frame; another fiber may have pushed one after it
      fiber_stack.delete_if { |f| f.equal?(frame) }
    end
  end

  def self.print_fiber_stack_trace(error)
    puts "Error in fiber hierarchy:"
    puts "  #{error.class}: #{error.message}"
    fiber_stack.reverse.each_with_index do |frame, index|
      puts "  #{index}: #{frame[:name]} (Fiber #{frame[:fiber].object_id})"
      frame[:backtrace].first(3).each do |line|
        puts "    #{line}"
      end
    end
  end
  private_class_method :print_fiber_stack_trace
end

# Usage example
FiberDebugger.enable_debugging

Async do |task|
  FiberDebugger.debug_async_operation("API batch processing") do
    task.async do
      FiberDebugger.debug_async_operation("Individual API call") do
        # The rescue in debug_async_operation prints both frames
        raise "API call failed"
      end
    end.wait
  end
end
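Thread#[] stores values per fiber rather than per thread, which matters for any debugging helper that keeps state across Async tasks. A value written in one fiber is invisible in another. Thread variables (thread_variable_set and thread_variable_get) are shared by every fiber on the thread. The difference is easy to confirm:

```ruby
Thread.current[:tag] = :root                     # fiber-local
Thread.current.thread_variable_set(:tag, :root)  # thread-wide

seen = Fiber.new do
  [Thread.current[:tag], Thread.current.thread_variable_get(:tag)]
end.resume

p seen # => [nil, :root]
```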
Resource leaks in scheduler-based code often manifest as fibers that never terminate or I/O objects that remain open. Implementing fiber lifecycle tracking helps identify these issues.
class ResourceTracker
  def initialize
    @active_resources = {} # fiber => [{ type:, resource: }, ...]
  end

  def track_fiber_resource(resource_type, resource)
    (@active_resources[Fiber.current] ||= []) << { type: resource_type, resource: resource }
    resource
  end

  # Fiber has no finalizer hook, so cleanup is tied to a block: resources
  # tracked by this fiber inside the block are closed when it exits, even
  # on exceptions
  def fiber_scope
    yield
  ensure
    cleanup_fiber_resources(Fiber.current)
  end

  def report_resource_usage
    puts "Active fibers: #{@active_resources.size}"
    puts "Resources by type:"
    resource_counts = Hash.new(0)
    @active_resources.values.flatten.each do |item|
      resource_counts[item[:type]] += 1
    end
    resource_counts.each do |type, count|
      puts "  #{type}: #{count}"
    end
  end

  private

  def cleanup_fiber_resources(fiber)
    resources = @active_resources.delete(fiber) || []
    resources.each do |item|
      case item[:type]
      when :http_connection, :file_handle
        item[:resource].close rescue nil
      when :database_connection
        item[:resource].disconnect rescue nil
      end
    end
  end
end
Reference
Core Scheduler Interface
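Method | Parameters | Returns | Description |
---|---|---|---|
Fiber.set_scheduler(scheduler) | scheduler (Object or nil) | Object | Sets the scheduler for the current thread |
Fiber.scheduler | None | Object or nil | Returns the current thread's scheduler |
scheduler.io_wait(io, events, timeout) | io (IO), events (Integer), timeout (Numeric or nil) | Integer or false | Waits until io is ready; returns the ready events, or false on timeout |
scheduler.kernel_sleep(duration = nil) | duration (Numeric or nil) | void | Handles non-blocking sleep |
scheduler.block(blocker, timeout = nil) | blocker (Object), timeout (Numeric or nil) | void | Suspends the current fiber until unblocked or timed out |
scheduler.unblock(blocker, fiber) | blocker (Object), fiber (Fiber) | void | Marks a blocked fiber as ready; may be called from another thread |
scheduler.fiber(&block) | block (Proc) | Fiber | Creates and starts a non-blocking fiber (used by Fiber.schedule) |
scheduler.close | None | void | Called when the thread exits; should finish outstanding fibers |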
Async Gem Methods
Method | Parameters | Returns | Description |
---|---|---|---|
Async { block } | block (Proc) | Task | Creates an async task, starting a reactor if none is running |
Async(annotation: "name") { block } | annotation (String), block (Proc) | Task | Creates an annotated async task |
task.async { block } | block (Proc) | Task | Creates a child async task |
task.wait | None | Object | Waits for the task's block to finish and returns its result (re-raises its exception) |
task.with_timeout(duration) { block } | duration (Numeric), block (Proc) | Object | Runs the block, raising Async::TimeoutError if it exceeds duration |
task.sleep(duration) | duration (Numeric) | void | Non-blocking sleep (Kernel#sleep also works inside a task) |
Connection and Pool Classes
Class | Purpose | Key Methods |
---|---|---|
Async::HTTP::Internet | HTTP client with per-host connection pooling | get, post, put, delete, close |
Async::Pool::Controller | Generic resource pool (async-pool gem) | acquire, release, close |
Async::Semaphore | Concurrency limiter | acquire, async, count, limit |
Async::Queue | Queue for passing values between tasks | enqueue, dequeue, each |
Async::Postgres::Endpoint | PostgreSQL connection endpoint | with, with_pool, close |
Exception Hierarchy
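Exception
├── Async::Stop (raised by Task#stop; not caught by a bare rescue)
└── StandardError
    ├── Async::TimeoutError
    ├── Async::Wrapper::Cancelled
    └── IOError, SystemCallError (I/O failures surface as ordinary Ruby exceptions)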
Performance Characteristics
Operation Type | Scheduler Overhead | Memory Usage | Concurrency Limit |
---|---|---|---|
HTTP Requests | 5-10% | ~4KB per fiber | 1000s concurrent |
Database Queries | 2-5% | ~6KB per fiber | Limited by pool size |
File Operations | <2% | ~3KB per fiber | OS file descriptor limit |
Sleep/Timers | <1% | ~2KB per fiber | 10000s concurrent |
Scheduler Implementation Checklist
- Implement the required interface methods (io_wait, kernel_sleep, block, unblock), plus fiber if Fiber.schedule is used
- Handle fiber lifecycle and cleanup
- Manage I/O multiplexing (select/poll/epoll)
- Implement timeout mechanisms
- Track active fibers and resources
- Handle scheduler shutdown gracefully in close
- Provide debugging and monitoring hooks
- Make unblock thread-safe, since other threads may call it
Common Configuration Patterns
# Production Async HTTP setup
internet = Async::HTTP::Internet.new(
pool_size: 100,
timeout: 30,
retries: 3,
keep_alive: true
)
# Database connection pool
endpoint = Async::Postgres::Endpoint.new(
"postgresql://user:pass@host/db",
pool_size: 25,
timeout: 10
)
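# Semaphore for rate limiting: the limit is positional and parent is optional
# (Async::Task.current must be called inside a running task)
limiter = Async::Semaphore.new(50, parent: Async::Task.current)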
Debugging Environment Variables
Variable | Effect | Values |
---|---|---|
ASYNC_DEBUG | Enable debug logging | 1, true |
ASYNC_POOL_DEBUG | Pool allocation tracking | 1, true |
ASYNC_METHOD_MISSING_DEBUG | Method resolution debugging | 1, true |
RUBY_FIBER_VM_STACK_SIZE | VM stack size allocated for each fiber | Integer (bytes) |
RUBY_FIBER_MACHINE_STACK_SIZE | Native (machine) stack size allocated for each fiber | Integer (bytes) |