Fiber Scheduler Performance

Ruby's fiber scheduler implementation for non-blocking I/O operations and asynchronous concurrency patterns.

Overview

Fiber schedulers in Ruby provide a mechanism for implementing non-blocking I/O operations through cooperative multitasking. The scheduler intercepts blocking operations like file reads, network requests, and sleep calls, suspending the current fiber and yielding control to other fibers until the operation completes.

Ruby implements fiber schedulers through the Fiber.set_scheduler method, which accepts any object implementing the scheduler interface. The scheduler receives callbacks when fibers attempt blocking operations, enabling the scheduler to manage multiple concurrent operations without blocking the entire thread.

require 'async'
require 'net/http'

# Basic fiber scheduler setup
Async do |task|
  # This runs in a fiber managed by the Async scheduler
  response = task.async do
    Net::HTTP.get(URI('https://example.com'))
  end
  puts response.wait
end

The scheduler interface defines methods like io_wait, kernel_sleep, block, and unblock that handle different types of blocking operations. When a fiber calls a method that would normally block, Ruby checks for an active scheduler and delegates the operation to it instead of blocking the thread.

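# Interface skeleton only: it shows the four hooks Fiber.set_scheduler requires.
# The bodies below do not suspend anything; a real scheduler parks the
# calling fiber and resumes it from an event loop.
class BasicScheduler
  def io_wait(io, events, timeout)
    # Should suspend the fiber until io is ready, then return the ready events
    IO.select([io], [], [], timeout) ? events : false
  end

  def kernel_sleep(duration = nil)
    # Should park the fiber and resume it once duration has elapsed
  end

  def block(blocker, timeout = nil)
    # Should park the fiber until unblock is called or the timeout expires
  end

  def unblock(blocker, fiber)
    # Should mark fiber as ready to resume
  end
end

Fiber.set_scheduler(BasicScheduler.new)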

Fiber schedulers excel in I/O-bound applications where traditional threading would create significant overhead. The scheduler maintains a queue of suspended fibers and resumes them when their blocking operations complete, achieving high concurrency with minimal memory overhead compared to thread-based approaches.
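
The queue-and-resume behaviour described above can be sketched with the standard library alone. The class below is a hypothetical, sleep-only scheduler written for illustration (SleepOnlyScheduler and run_sleep_demo are not part of Ruby or the async gem). It assumes Ruby 3.0 or later and deliberately leaves I/O unimplemented.

```ruby
# Minimal sleep-only scheduler: a sketch, not a production event loop.
class SleepOnlyScheduler
  def initialize
    @ready = []     # fibers released by unblock, waiting to be resumed
    @sleeping = {}  # fiber => monotonic wake-up time
  end

  # Hook used by Fiber.schedule to start a non-blocking fiber.
  def fiber(&block)
    fiber = Fiber.new(blocking: false, &block)
    fiber.resume
    fiber
  end

  # Hook for Kernel#sleep inside a non-blocking fiber.
  def kernel_sleep(duration = nil)
    block(:sleep, duration)
  end

  # Hook for Mutex, Queue and similar primitives: park the current fiber.
  # A fiber parked without a timeout stays parked until unblock is called.
  def block(_blocker, timeout = nil)
    @sleeping[Fiber.current] = timeout ? now + timeout : Float::INFINITY
    Fiber.yield
  end

  # Hook called when a parked fiber may continue.
  def unblock(_blocker, fiber)
    @sleeping.delete(fiber)
    @ready << fiber
  end

  # Required by Fiber.set_scheduler; I/O is out of scope for this sketch.
  def io_wait(_io, _events, _timeout)
    raise NotImplementedError, "SleepOnlyScheduler does not handle I/O"
  end

  # Called by Ruby when the scheduler is replaced or the thread exits.
  def close
    run
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # Event loop: resume fibers in wake-up order until none remain.
  def run
    until @ready.empty? && @sleeping.empty?
      @ready.shift.resume until @ready.empty?
      next if @sleeping.empty?

      fiber, wake_at = @sleeping.min_by { |_, time| time }
      delay = wake_at - now
      sleep(delay) if delay.positive? # runs in the blocking main fiber
      @sleeping.delete(fiber)
      fiber.resume
    end
  end
end

# Schedules two sleeping fibers and reports completion order and elapsed time.
def run_sleep_demo
  order = []
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  Fiber.set_scheduler(SleepOnlyScheduler.new)
  Fiber.schedule { sleep(0.3); order << :slow }
  Fiber.schedule { sleep(0.15); order << :fast }
  Fiber.set_scheduler(nil) # triggers close, which drains the event loop

  [order, Process.clock_gettime(Process::CLOCK_MONOTONIC) - started]
end
```

Calling run_sleep_demo returns the completion order and the elapsed time. Both sleeps overlap, so the total should sit close to the longer sleep rather than the sum of the two.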

Basic Usage

Setting up a fiber scheduler requires implementing the scheduler interface methods and registering the scheduler with the current thread. The async gem provides a production-ready scheduler implementation that handles most common use cases.

require 'async'
require 'async/http/internet'

# HTTP requests with fiber scheduler
Async do
  internet = Async::HTTP::Internet.new
  
  # Each get suspends this fiber while it waits, but the two requests
  # still run one after the other because they share a single task
  response1 = internet.get('https://api.github.com/users/octocat')
  response2 = internet.get('https://api.github.com/users/defunkt')
  
  puts response1.read
  puts response2.read
ensure
  internet&.close
end

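File reads go through the same scheduler hooks: IO#read and similar methods hand control to the scheduler when the descriptor is not ready. Regular files always report as ready to select and epoll, though, so reads from local disk usually still block the thread unless the scheduler backend supports asynchronous file I/O (for example io_uring on Linux). The pattern pays off mainly for sockets and pipes.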

Async do |task|
  # Each read runs in its own task
  files = ['file1.txt', 'file2.txt', 'file3.txt']
  
  tasks = files.map do |filename|
    task.async do
      File.read(filename)
    end
  end
  
  # Wait for each task in turn; results come back in the order of files
  contents = tasks.map(&:wait)
  puts contents
end

Database operations integrate with fiber schedulers through drivers that yield to the scheduler while a query is in flight. The pg gem (1.3 and later) cooperates with an active scheduler directly; the example below uses the async-postgres adapter on top of it.

require 'async'
require 'async/postgres'

Async do
  endpoint = Async::Postgres::Endpoint.new("postgresql://localhost/mydb")
  
  endpoint.with do |connection|
    # The fiber yields while each query is in flight; queries on a single
    # connection still run one at a time
    result1 = connection.async_exec("SELECT * FROM users WHERE active = true")
    result2 = connection.async_exec("SELECT COUNT(*) FROM orders")
    
    puts result1.to_a
    puts result2.to_a
  end
end

Nested async blocks create child tasks that inherit the parent scheduler context. Child tasks can spawn their own async operations, creating a hierarchy of concurrent operations.

Async do |parent_task|
  parent_task.async do |child_task|
    # Nested async operations
    subtasks = [
      child_task.async { sleep(1); puts "Task 1" },
      child_task.async { sleep(2); puts "Task 2" },
      child_task.async { sleep(0.5); puts "Task 3" }
    ]

    # Waiting on a task does not wait for its children, so wait on each one
    subtasks.each(&:wait)
  end.wait

  puts "All child tasks completed"
end
# Output: Task 3, Task 1, Task 2, All child tasks completed

Thread Safety & Concurrency

Fiber schedulers operate within a single thread but manage multiple fibers, creating concurrency without traditional threading concerns. However, shared state between fibers requires careful management since fiber switches can occur at any point during I/O operations.

require 'async'

class Counter
  def initialize
    @value = 0
    @mutex = Mutex.new
  end
  
  def increment
    @mutex.synchronize do
      current = @value
      # sleep suspends this fiber, but the Mutex stays locked, so other
      # fibers calling increment wait instead of reading a stale value
      sleep(0.001) # Simulated I/O operation
      @value = current + 1
    end
  end
  
  attr_reader :value
end

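# Shared-state demonstration: without the Mutex every fiber would read 0
Async do |task|
  counter = Counter.new

  # Multiple fibers incrementing shared state
  tasks = 10.times.map do
    task.async { counter.increment }
  end

  tasks.each(&:wait)
  puts counter.value # 10 with the Mutex in place
end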
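
The lost update that the Mutex guards against can be reproduced with bare fibers and no scheduler at all. In this sketch Fiber.yield stands in for the point where an I/O wait would switch fibers; the method names are made up for illustration.

```ruby
# Read, switch, then write: every fiber reads the counter before any writes.
def lost_update_total(fiber_count)
  counter = 0
  fibers = Array.new(fiber_count) do
    Fiber.new do
      current = counter      # read
      Fiber.yield            # switch point, as an I/O wait would be
      counter = current + 1  # write based on a stale read
    end
  end

  fibers.each(&:resume) # first pass: every fiber reads 0 and suspends
  fibers.each(&:resume) # second pass: every fiber writes 1
  counter
end

# Same switch point, but the read-modify-write has no switch inside it.
def atomic_update_total(fiber_count)
  counter = 0
  fibers = Array.new(fiber_count) do
    Fiber.new do
      Fiber.yield
      counter += 1
    end
  end

  2.times { fibers.each(&:resume) }
  counter
end
```

With ten fibers, lost_update_total(10) returns 1 while atomic_update_total(10) returns 10: the only difference is whether a switch point sits between the read and the write.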

Unlike preemptive threading, fiber schedulers only switch contexts during blocking operations, reducing but not eliminating race conditions. Mutations that span multiple operations still require synchronization.

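# Works for fibers that are driven manually with resume. Under a scheduler,
# prefer Thread::Queue or Async::Queue, which suspend through the scheduler.
class SafeFiberQueue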
  def initialize
    @items = []
    @waiting_fibers = []
  end
  
  def push(item)
    @items << item
    # Wake up a waiting fiber if any
    if fiber = @waiting_fibers.shift
      fiber.resume
    end
  end
  
  def pop
    return @items.shift unless @items.empty?
    
    # Suspend current fiber until item available
    @waiting_fibers << Fiber.current
    Fiber.yield
    @items.shift
  end
end

Cross-fiber communication often uses channels or queues that handle fiber suspension and resumption. The scheduler coordinates these operations without exposing low-level fiber management.

require 'async'
require 'async/queue'

Async do |task|
  queue = Async::Queue.new

  # Producer fiber
  task.async do
    5.times do |i|
      queue.enqueue("Message #{i}")
      sleep(0.1)
    end
    queue.enqueue(nil) # nil marks the end of the stream
  end

  # Consumer fiber
  task.async do
    while message = queue.dequeue
      puts "Received: #{message}"
    end
  end
end

Scheduler implementations must handle fiber lifecycle management correctly to prevent memory leaks and ensure proper cleanup. Dead fibers should be removed from scheduler data structures.

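class CustomScheduler
  def initialize
    @waiting_fibers = {}
    @io_objects = {}
  end

  # Ruby calls io_wait(io, events, timeout) from inside the waiting fiber
  def io_wait(io, events, timeout)
    fiber = Fiber.current
    @waiting_fibers[fiber] = { io: io, events: events, timeout: timeout }
    @io_objects[io] = fiber

    # Hypothetical helper (not shown): hands control to the event loop and
    # returns the ready events once the fiber is resumed
    suspend_until_ready(fiber)
  ensure
    # Fiber has no cleanup hook, so drop the bookkeeping here; ensure also
    # runs when an exception is raised into the suspended fiber
    cleanup_fiber(fiber)
  end

  private

  def cleanup_fiber(fiber)
    if io_info = @waiting_fibers.delete(fiber)
      @io_objects.delete(io_info[:io])
    end
  end
end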
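
As a complement to cleaning up on wake-up, a scheduler can also sweep its tables for fibers that have already finished. The sketch below uses Fiber#alive? for that check; WaitRegistry and prune_demo are hypothetical names, not part of any library.

```ruby
# Tracks which fiber waits on which IO and drops entries for finished fibers.
class WaitRegistry
  def initialize
    @waiting = {} # fiber => io it is waiting on
  end

  def register(fiber, io)
    @waiting[fiber] = io
  end

  # Removes entries whose fiber has terminated; returns how many were removed.
  def prune
    before = @waiting.size
    @waiting.delete_if { |fiber, _io| !fiber.alive? }
    before - @waiting.size
  end

  def size
    @waiting.size
  end
end

# Registers one finished and one suspended fiber, then prunes.
def prune_demo
  registry = WaitRegistry.new
  finished = Fiber.new { :done }
  parked = Fiber.new { Fiber.yield }

  finished.resume # runs to completion
  parked.resume   # suspended at Fiber.yield, still alive

  registry.register(finished, $stdin)
  registry.register(parked, $stdout)

  [registry.prune, registry.size]
end
```

prune_demo returns [1, 1]: the finished fiber is removed and the suspended one stays registered.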

Performance & Memory

Fiber schedulers provide significant performance advantages for I/O-bound applications by eliminating thread creation overhead and reducing context switching costs. A single thread can manage thousands of concurrent fibers with minimal memory usage.

require 'async'
require 'async/http/internet'
require 'benchmark'
require 'net/http'

# Performance comparison: sequential vs concurrent requests
urls = Array.new(100) { |i| "https://httpbin.org/delay/#{rand(3)}" }

# Sequential execution
sequential_time = Benchmark.measure do
  urls.each do |url|
    Net::HTTP.get(URI(url))
  end
end

# Concurrent execution with fiber scheduler
concurrent_time = Benchmark.measure do
  Async do
    internet = Async::HTTP::Internet.new
    
    tasks = urls.map do |url|
      Async { internet.get(url) }
    end
    
    tasks.map(&:wait)
  ensure
    internet&.close
  end
end

puts "Sequential: #{sequential_time.real}s"
puts "Concurrent: #{concurrent_time.real}s"
# Illustrative only: sequential time is roughly the sum of all delays,
# while concurrent time approaches the slowest single request

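Memory usage grows linearly with the number of concurrent fibers, as it does with threads, but the cost per unit is much lower: a fiber needs only its own stack, which the operating system commits lazily, while a thread also carries kernel scheduling state and a larger default stack. The often-quoted figures of roughly 4KB per fiber and 2MB per thread are rough illustrations; actual numbers depend on the Ruby version, the platform, and stack-size settings such as RUBY_FIBER_VM_STACK_SIZE.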

require 'async'

# Memory usage comparison
def measure_memory
  GC.start
  `ps -o rss= -p #{Process.pid}`.to_i
end

baseline_memory = measure_memory

# Create 1000 concurrent fibers
Async do |task|
  1000.times do
    task.async do
      sleep(10) # Keep fibers alive for measurement
    end
  end
  
  fiber_memory = measure_memory
  puts "Memory per fiber: #{(fiber_memory - baseline_memory).fdiv(1000).round(1)} KB"
  # ps reports RSS in KB; the result varies with Ruby version, platform, and stack settings
end
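
The claim that one thread can hold many suspended fibers at once can be checked without the async gem. This sketch creates bare fibers, parks all of them, and then lets them finish; run_many_fibers is an illustrative name and the count of 1000 simply mirrors the example above.

```ruby
# Parks `count` fibers in the current thread, then resumes them all.
# Returns [fibers suspended at the same time, fibers that completed].
def run_many_fibers(count)
  completed = 0
  fibers = Array.new(count) do
    Fiber.new do
      Fiber.yield    # park, as a fiber waiting on I/O would
      completed += 1
    end
  end

  fibers.each(&:resume)               # start every fiber; each one parks
  suspended = fibers.count(&:alive?)  # all are alive and waiting
  fibers.each(&:resume)               # let every fiber finish

  [suspended, completed]
end
```

run_many_fibers(1000) returns [1000, 1000]. Wrapping the call in the measure_memory helper gives a rough per-fiber figure for the local Ruby build.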

Scheduler overhead varies based on implementation complexity and the number of I/O operations being managed. The async gem scheduler optimizes performance through efficient event loop management and fiber pooling.

require 'async'
require 'benchmark/ips'

# Scheduler overhead measurement
Benchmark.ips do |x|
  x.config(time: 5, warmup: 2)
  
  x.report("direct sleep") do
    sleep(0.001)
  end
  
  x.report("scheduled sleep") do
    Async do
      sleep(0.001)
    end
  end
  
  x.compare!
end
# Each top-level Async block also starts and stops an event loop, so the gap
# between the two reports is an upper bound on per-operation scheduler overhead

Large-scale concurrent operations require careful resource management to prevent excessive memory usage from accumulated results or pending operations.

require 'async'
require 'async/semaphore'
require 'net/http'

class BatchProcessor
  def initialize(concurrency_limit: 100)
    @semaphore = Async::Semaphore.new(concurrency_limit)
  end
  
  def process_urls(urls)
    Async do |task|
      urls.each_slice(1000) do |batch|
        batch_tasks = batch.map do |url|
          @semaphore.async do
            process_single_url(url)
          end
        end
        
        # Process in batches to limit memory usage
        batch_results = batch_tasks.map(&:wait)
        yield batch_results
      end
    end
  end
  
  private
  
  def process_single_url(url)
    # Process individual URL
    Net::HTTP.get(URI(url))
  end
end

Production Patterns

Production applications using fiber schedulers typically implement connection pooling, error recovery, and monitoring to handle real-world deployment challenges. The async gem provides production-ready patterns for common scenarios.

require 'async'
require 'async/http/internet'
require 'async/semaphore'
require 'json'

class APIClient
  def initialize(pool_size: 10, timeout: 30)
    # Internet keeps persistent connections per host; the semaphore caps
    # how many requests are in flight at once
    @internet = Async::HTTP::Internet.new
    @semaphore = Async::Semaphore.new(pool_size)
    @timeout = timeout
  end

  def fetch_user_data(user_ids)
    Async do
      user_ids.map do |user_id|
        @semaphore.async do |subtask|
          subtask.with_timeout(@timeout) do
            response = @internet.get("https://api.example.com/users/#{user_id}")
            JSON.parse(response.read)
          end
        end
      end.map(&:wait)
    end.wait # wait re-raises task errors so the rescue below can see them
  rescue Async::TimeoutError => e
    Rails.logger.error("API timeout: #{e.message}")
    raise
  end

  def close
    @internet.close
  end
end

Database connection management becomes critical in production environments where connection limits constrain concurrent operations. Connection pools must integrate with the fiber scheduler.

require 'async'
require 'async/postgres'

class DatabaseService
  def initialize(database_url, pool_size: 25)
    @endpoint = Async::Postgres::Endpoint.new(database_url)
    @pool_size = pool_size
  end
  
  def execute_batch_queries(queries)
    Async do |task|
      @endpoint.with_pool(limit: @pool_size) do |pool|
        tasks = queries.map do |query|
          task.async do
            pool.with do |connection|
              connection.async_exec(query[:sql], query[:params])
            end
          end
        end
        
        tasks.map(&:wait)
      end
    end
  end
  
  def health_check
    Async do
      @endpoint.with do |connection|
        result = connection.async_exec("SELECT 1")
        result.ntuples > 0
      end
    end.wait # wait re-raises task errors so the rescue below can see them
  rescue => e
    Rails.logger.error("Database health check failed: #{e.message}")
    false
  end
end

Monitoring and observability require specialized approaches since traditional thread-based monitoring tools don't capture fiber-level metrics accurately.

require 'async'

class PerformanceMonitor
  def initialize
    @metrics = {
      active_fibers: 0,
      completed_operations: 0,
      failed_operations: 0,
      average_response_time: 0
    }
  end
  
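  def track_operation(name)
    # Monotonic clock: unaffected by wall-clock adjustments
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    @metrics[:active_fibers] += 1

    begin
      result = yield
      @metrics[:completed_operations] += 1
      result
    rescue
      @metrics[:failed_operations] += 1
      raise
    ensure
      duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
      update_average_response_time(duration)
      @metrics[:active_fibers] -= 1

      # Log metrics after every 100 completed operations
      completed = @metrics[:completed_operations]
      log_metrics if completed.positive? && (completed % 100).zero?
    end
  end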
  
  private
  
  def update_average_response_time(duration)
    total_ops = @metrics[:completed_operations] + @metrics[:failed_operations]
    current_avg = @metrics[:average_response_time]
    @metrics[:average_response_time] = (current_avg * (total_ops - 1) + duration) / total_ops
  end
  
  def log_metrics
    Rails.logger.info("Fiber metrics: #{@metrics}")
  end
end
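
The running average kept by update_average_response_time follows the incremental mean formula new_avg = (old_avg * (n - 1) + sample) / n. The standalone sketch below (incremental_mean is an illustrative name) applies the same update to a list of durations so it can be compared against the ordinary mean.

```ruby
# Folds samples into a running mean one at a time, without storing them.
def incremental_mean(samples)
  mean = 0.0
  samples.each_with_index do |sample, index|
    count = index + 1
    mean = (mean * (count - 1) + sample) / count
  end
  mean
end

# Ordinary mean over the whole list, for comparison.
def batch_mean(samples)
  samples.sum / samples.size.to_f
end
```

For durations such as [0.2, 0.4, 0.9], both methods agree on 0.5 up to floating-point rounding, which is why the monitor does not need to keep every sample.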

Load balancing across multiple processes requires careful coordination since fiber schedulers operate within single processes. Process-level distribution handles scaling beyond single-process limits.

# config/puma.rb - Production configuration
require 'async'

# Per-worker setup; this hook prepares connections and does not install a scheduler itself
on_worker_boot do
  # Initialize scheduler-aware connection pools
  ActiveRecord::Base.establish_connection
  
  # Setup Redis connection pool for scheduler
  $redis_pool = ConnectionPool.new(size: 25) do
    Redis.new(url: ENV['REDIS_URL'])
  end
end

# Application controller integration
class ApplicationController < ActionController::Base
  around_action :with_fiber_scheduler
  
  private
  
  def with_fiber_scheduler
    # wait re-raises errors from the task so the rescue below can log them
    Async do
      yield
    end.wait
  rescue => e
    Rails.logger.error("Fiber scheduler error: #{e.message}")
    raise
  end
end

Error Handling & Debugging

Error handling in fiber-scheduled code requires understanding how exceptions propagate through the scheduler and async operations. With the async gem, an unhandled exception fails the task that raised it and is re-raised in any task that calls wait on it. An error that nobody waits for is only logged, which makes it easy to miss.

require 'async'

# Exception propagation in fiber hierarchies
Async do |parent_task|
  begin
    child_task = parent_task.async do
      sleep(1)
      raise StandardError, "Child fiber error"
    end
    
    child_task.wait # Exception propagates here
  rescue StandardError => e
    puts "Caught child error: #{e.message}"
    # Parent continues executing
  end
  
  puts "Parent task completed"
end
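
Bare fibers follow a comparable rule, with resume playing the role that wait plays for tasks: an exception raised inside the fiber surfaces at the call that resumed it. A minimal sketch using only core Ruby (resume_and_report is an illustrative name):

```ruby
# Resumes a fiber that raises and reports where the error surfaced.
def resume_and_report
  child = Fiber.new do
    raise ArgumentError, "child fiber error"
  end

  child.resume # the exception from inside the fiber is raised here
  "no error"
rescue ArgumentError => e
  "caught: #{e.message}"
end
```

resume_and_report returns "caught: child fiber error", and the caller carries on after the rescue just as the parent task does above.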

Timeout handling becomes complex when dealing with nested async operations. Each level of nesting can have its own timeout, requiring careful coordination to prevent resource leaks.

require 'async'

class TimeoutHandler
  def self.with_nested_timeouts
    Async do |task|
      begin
        # Outer timeout: 10 seconds total, tagged with its own message
        task.with_timeout(10, Async::TimeoutError, "overall timeout") do
          results = []

          # Process items with individual timeouts
          %w[item1 item2 item3].each do |item|
            result = task.with_timeout(3, Async::TimeoutError, "item timeout") do
              process_item(item)
            end
            results << result
          end

          results
        end
      rescue Async::TimeoutError => e
        # The message passed to with_timeout identifies which timeout triggered
        if e.message.include?('overall')
          puts "Overall operation timed out"
        else
          puts "Individual item timed out"
        end

        # Cleanup partial results
        cleanup_resources
        raise
      end
    end
  end

  def self.process_item(item)
    # Simulated processing that might timeout
    sleep(rand(5))
    "processed #{item}"
  end

  def self.cleanup_resources
    # Close connections, release locks, etc.
    puts "Cleaning up resources after timeout"
  end

  # A bare `private` does not apply to `def self.` methods
  private_class_method :process_item, :cleanup_resources
end
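
Another way to tell nested timeouts apart is to give each level its own exception class. The sketch below shows the idea with the standard library's Timeout.timeout, which stands in here for task.with_timeout; OverallTimeout, ItemTimeout and classify_timeout are hypothetical names, and carrying the pattern over to the async gem is an assumption to check against the installed version.

```ruby
require 'timeout'

# One exception class per timeout level makes the source unambiguous.
class OverallTimeout < StandardError; end
class ItemTimeout < StandardError; end

# Runs `work` seconds of simulated processing under two nested timeouts
# and reports which one fired, if any.
def classify_timeout(outer:, inner:, work:)
  Timeout.timeout(outer, OverallTimeout) do
    Timeout.timeout(inner, ItemTimeout) do
      sleep(work)
    end
  end
  :completed
rescue OverallTimeout
  :overall_timed_out
rescue ItemTimeout
  :item_timed_out
end
```

For example, classify_timeout(outer: 1.0, inner: 0.05, work: 0.5) reports :item_timed_out, and swapping the two limits reports :overall_timed_out. The rescue clauses never have to inspect message text.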

Debugging fiber-based code requires specialized techniques since traditional stack traces don't show the full execution context across fiber boundaries.

require 'async'

class FiberDebugger
  def self.enable_debugging
    # Thread.current[:key] is fiber-local, so a stack stored there would be
    # invisible inside tasks; thread variables are shared by every fiber
    Thread.current.thread_variable_set(:fiber_stack, [])
  end

  def self.fiber_stack
    Thread.current.thread_variable_get(:fiber_stack)
  end

  def self.debug_async_operation(name)
    fiber_stack << {
      name: name,
      fiber: Fiber.current,
      backtrace: caller
    }

    begin
      result = yield
      puts "#{name} completed successfully"
      result
    rescue => e
      print_fiber_stack_trace(e)
      raise
    ensure
      fiber_stack.pop
    end
  end

  def self.print_fiber_stack_trace(error)
    puts "Error in fiber hierarchy:"
    puts "  #{error.class}: #{error.message}"

    fiber_stack.reverse.each_with_index do |frame, index|
      puts "  #{index}: #{frame[:name]} (Fiber #{frame[:fiber].object_id})"
      frame[:backtrace].first(3).each do |line|
        puts "    #{line}"
      end
    end
  end

  # A bare `private` does not apply to `def self.` methods
  private_class_method :fiber_stack, :print_fiber_stack_trace
end

# Usage example
FiberDebugger.enable_debugging

Async do |task|
  FiberDebugger.debug_async_operation("API batch processing") do
    task.async do
      FiberDebugger.debug_async_operation("Individual API call") do
        # This will provide enhanced debugging if an error occurs
        raise "API call failed"
      end
    end.wait
  end
end
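
Debug state that has to be visible across fibers needs care about where it is stored: Thread.current[:key] is fiber-local in Ruby, while thread variables are shared by every fiber on the thread. The sketch below shows the difference with core Ruby only; storage_visibility and the key names are illustrative.

```ruby
# Sets a value through both storage APIs in the calling fiber, then reads
# each one back from inside a new fiber on the same thread.
def storage_visibility
  Thread.current[:demo_trace] = "set by caller"                       # fiber-local
  Thread.current.thread_variable_set(:demo_trace, "set by caller")    # thread-wide

  Fiber.new do
    {
      fiber_local: Thread.current[:demo_trace],
      thread_variable: Thread.current.thread_variable_get(:demo_trace)
    }
  end.resume
ensure
  # Leave no state behind in the calling fiber or thread
  Thread.current[:demo_trace] = nil
  Thread.current.thread_variable_set(:demo_trace, nil)
end
```

storage_visibility returns a hash whose :fiber_local entry is nil and whose :thread_variable entry is "set by caller". A debugging stack kept in Thread.current[...] would therefore look empty, or be missing entirely, inside a task.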

Resource leaks in scheduler-based code often manifest as fibers that never terminate or I/O objects that remain open. Implementing fiber lifecycle tracking helps identify these issues.

class ResourceTracker
  def initialize
    @active_resources = {}
  end

  # Registers a resource for the current fiber. Fiber has no completion
  # hook, so callers release resources explicitly, typically from an
  # ensure block at the end of the task body.
  def track_fiber_resource(resource_type, resource)
    fiber_id = Fiber.current.object_id
    @active_resources[fiber_id] ||= []
    @active_resources[fiber_id] << { type: resource_type, resource: resource }
  end

  # Closes and forgets everything the current fiber registered
  def release_fiber_resources
    cleanup_fiber_resources(Fiber.current.object_id)
  end

  def report_resource_usage
    puts "Active fibers: #{@active_resources.size}"
    puts "Resources by type:"

    resource_counts = Hash.new(0)
    @active_resources.values.flatten.each do |item|
      resource_counts[item[:type]] += 1
    end

    resource_counts.each do |type, count|
      puts "  #{type}: #{count}"
    end
  end

  private

  def cleanup_fiber_resources(fiber_id)
    resources = @active_resources.delete(fiber_id) || []
    resources.each do |item|
      case item[:type]
      when :http_connection, :file_handle
        item[:resource].close rescue nil
      when :database_connection
        item[:resource].disconnect rescue nil
      end
    end
  end
end

Reference

Core Scheduler Interface

| Method | Parameters | Returns | Description |
|---|---|---|---|
| Fiber.set_scheduler(scheduler) | scheduler (Object) | Object | Sets scheduler for current thread |
| Fiber.scheduler | None | Object or nil | Returns current thread's scheduler |
| scheduler.io_wait(io, events, timeout) | io (IO), events (Integer), timeout (Float or nil) | Integer (ready events) | Handle I/O waiting for the calling fiber |
| scheduler.kernel_sleep(duration) | duration (Float or nil) | void | Handle non-blocking sleep |
| scheduler.block(blocker, timeout) | blocker (Object), timeout (Float or nil) | void | Block the calling fiber on a custom condition |
| scheduler.unblock(blocker, fiber) | blocker (Object), fiber (Fiber) | void | Unblock specific fiber |

Async Gem Methods

| Method | Parameters | Returns | Description |
|---|---|---|---|
| Async { block } | block (Proc) | Task | Creates async task with scheduler |
| Async(annotation) { block } | annotation (String), block (Proc) | Task | Creates annotated async task |
| task.async { block } | block (Proc) | Task | Creates child async task |
| task.wait | None | Object | Waits for task completion |
| task.with_timeout(duration) { block } | duration (Float), block (Proc) | Object | Executes block with timeout |
| task.sleep(duration) | duration (Float) | void | Non-blocking sleep |

Connection and Pool Classes

| Class | Purpose | Key Methods |
|---|---|---|
| Async::HTTP::Internet | HTTP client with connection pooling | get, post, put, delete, close |
| Async::Pool | Generic resource pool | acquire, release, close |
| Async::Semaphore | Concurrency limiter | acquire, async, count, limit |
| Async::Queue | Fiber communication queue | enqueue, dequeue |
| Async::Postgres::Endpoint | PostgreSQL connection endpoint | with, with_pool, close |

Exception Hierarchy

Exception
├── Async::Stop
└── StandardError
    ├── Async::TimeoutError
    ├── Async::Wrapper::Cancelled (async 1.x)
    └── IOError (scheduler I/O failures)

Performance Characteristics

| Operation Type | Scheduler Overhead | Memory Usage | Concurrency Limit |
|---|---|---|---|
| HTTP Requests | 5-10% | ~4KB per fiber | 1000s concurrent |
| Database Queries | 2-5% | ~6KB per fiber | Limited by pool size |
| File Operations | <2% | ~3KB per fiber | OS file descriptor limit |
| Sleep/Timers | <1% | ~2KB per fiber | 10000s concurrent |

Scheduler Implementation Checklist

  • Implement required interface methods (io_wait, kernel_sleep, block, unblock)
  • Handle fiber lifecycle and cleanup
  • Manage I/O multiplexing (select/poll/epoll)
  • Implement timeout mechanisms
  • Track active fibers and resources
  • Handle scheduler shutdown gracefully
  • Provide debugging and monitoring hooks
  • Keep scheduler state thread-safe (unblock can be called from another thread)

Common Configuration Patterns

# Production Async HTTP setup
# Accepted option names vary across async-http versions, so only retries
# is set here; apply per-request timeouts with task.with_timeout
internet = Async::HTTP::Internet.new(retries: 3)

# Database connection pool
endpoint = Async::Postgres::Endpoint.new(
  "postgresql://user:pass@host/db",
  pool_size: 25,
  timeout: 10
)

# Semaphore for rate limiting (the limit is a positional argument)
limiter = Async::Semaphore.new(50, parent: Async::Task.current)

Debugging Environment Variables

| Variable | Effect | Values |
|---|---|---|
| ASYNC_DEBUG | Enable debug logging | 1, true |
| ASYNC_POOL_DEBUG | Pool allocation tracking | 1, true |
| ASYNC_METHOD_MISSING_DEBUG | Method resolution debugging | 1, true |
| RUBY_FIBER_VM_STACK_SIZE | Fiber stack size | Integer (bytes) |