Event Loops

Event loops provide non-blocking I/O and asynchronous execution patterns for Ruby applications through reactor-based concurrency models.

Overview

Event loops implement the reactor pattern in Ruby, managing I/O operations and callbacks without blocking the main execution thread. Ruby's event loop implementations coordinate between system-level I/O multiplexing and application-level callback execution, creating single-threaded concurrency through cooperative multitasking.

The core concept revolves around an event dispatcher that monitors multiple I/O sources simultaneously. When data becomes available or operations complete, the loop executes registered callbacks. This approach eliminates thread synchronization overhead while handling thousands of concurrent connections.

Ruby provides event loop functionality through several implementations. The Async gem offers a modern fiber-based approach, while EventMachine provides a more traditional callback-oriented model. The built-in IO.select forms the foundation for custom event loop implementations.

require 'async'

Async do
  task = Async do
    sleep(1)  # Non-blocking: Async installs a fiber scheduler
    puts "Async operation completed"
  end
  
  puts "Non-blocking execution continues"
  task.wait
end
# => Non-blocking execution continues
# => Async operation completed
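
To make the reactor idea concrete, here is a minimal hand-rolled dispatcher built directly on IO.select, the primitive mentioned above. This is a sketch rather than production code: it registers one callback per IO and runs a single loop iteration, dispatching whichever descriptors become readable.

# Minimal single-threaded reactor: register a callback per IO,
# then dispatch callbacks as IO.select reports readiness
readers = {}  # IO => callback

r1, w1 = IO.pipe
r2, w2 = IO.pipe

readers[r1] = ->(io) { puts "pipe 1: #{io.read_nonblock(64)}" }
readers[r2] = ->(io) { puts "pipe 2: #{io.read_nonblock(64)}" }

w1.write("hello")
w2.write("world")

# One iteration of the event loop: block until any registered IO is readable
ready, = IO.select(readers.keys, nil, nil, 1)
(ready || []).each { |io| readers[io].call(io) }

A production loop would run this iteration repeatedly and also track writability and timer deadlines.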

Event loops excel at I/O-bound operations like network requests, file operations, and database queries. They transform blocking operations into asynchronous tasks that yield control during waiting periods. This design pattern maximizes CPU utilization by interleaving computation with I/O operations.

require 'async/http'

Async do
  internet = Async::HTTP::Internet.new
  
  # Internet#get returns a Response directly, so wrap each request in
  # its own task to run the fetches concurrently
  responses = [
    "https://api.example1.com",
    "https://api.example2.com",
    "https://api.example3.com"
  ].map { |url| Async { internet.get(url) } }.map(&:wait)
  
  responses.each { |response| puts response.status }
ensure
  internet&.close
end

The fiber-based approach in modern Ruby event loops creates lightweight execution contexts. Each fiber represents an independent execution state that can be suspended and resumed. The event loop scheduler coordinates fiber execution based on I/O readiness and timer events.
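
A bare Fiber demonstrates the suspend-and-resume mechanics that these schedulers build on; the sketch below uses only the core Fiber API.

# Fiber.yield suspends the fiber and returns control to the resumer;
# Fiber#resume continues execution from the last yield point
worker = Fiber.new do
  puts "Step 1"
  Fiber.yield  # An event loop would park here until I/O is ready
  puts "Step 2"
  Fiber.yield
  puts "Step 3"
end

worker.resume   # => Step 1
puts "Control returned to caller"
worker.resume   # => Step 2
worker.resume   # => Step 3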

Basic Usage

Event loop programming begins with creating an event loop context and defining asynchronous tasks within that context. The Async gem provides the most straightforward entry point for event loop programming in modern Ruby applications.

require 'async'

# Basic event loop with single task
Async do
  puts "Starting async task"
  sleep(0.1)  # Non-blocking: suspended via the fiber scheduler
  puts "Task completed"
end

puts "Main thread continues immediately"

The Async block creates an event loop context where asynchronous operations execute cooperatively. Tasks within the block yield control during I/O operations, allowing other tasks to progress. The loop terminates when all tasks complete or encounter unhandled exceptions, and a top-level Async block is itself synchronous: it returns only after its event loop drains, which is why the final puts above runs last.
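
Because a top-level Async block returns its Async::Task, call wait on the result to retrieve the block's return value (or re-raise its error):

require 'async'

# Async returns the task; Task#wait yields the block's value
task = Async { 21 * 2 }
puts task.wait  # => 42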

Multiple concurrent tasks execute within the same event loop through nested Async blocks. Each nested block creates an independent task that runs concurrently with other tasks in the same loop.

require 'async'

Async do |task|
  # Start multiple concurrent tasks
  tasks = 5.times.map do |i|
    task.async do
      puts "Task #{i} starting"
      sleep(rand(0.1..0.5))
      puts "Task #{i} completed"
      i * 2
    end
  end
  
  # Wait for all tasks and collect results
  results = tasks.map(&:wait)
  puts "All tasks completed: #{results}"
end

File I/O operations integrate with event loops through the fiber scheduler, which hooks Ruby's blocking I/O primitives. When an operation would wait, the scheduler suspends the current fiber and resumes it once the operation can proceed, letting other tasks run in the meantime.

require 'async'

Async do |task|
  # Concurrent file operations
  files = ['file1.txt', 'file2.txt', 'file3.txt']
  
  file_tasks = files.map do |filename|
    task.async do
      # Inside Async, Ruby's blocking I/O goes through the fiber
      # scheduler, so a waiting write yields to other tasks
      File.write(filename, "Data for #{filename}\n")
      puts "Wrote #{filename}"
    end
  end
  
  file_tasks.each(&:wait)
  puts "All file operations completed"
end

Network operations represent the primary use case for event loops. HTTP clients and servers benefit significantly from non-blocking I/O patterns, handling multiple connections without thread proliferation.

require 'async/http'

Async do
  internet = Async::HTTP::Internet.new
  
  # Concurrent HTTP requests
  urls = [
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/2', 
    'https://httpbin.org/delay/1'
  ]
  
  start_time = Time.now
  
  responses = urls.map do |url|
    Async do
      response = internet.get(url)
      puts "#{url}: #{response.status}"
      response
    end
  end.map(&:wait)
  
  puts "Total time: #{Time.now - start_time} seconds"
ensure
  internet&.close
end

Thread Safety & Concurrency

Event loops in Ruby operate within single threads but provide concurrency through cooperative multitasking. This model eliminates most thread safety concerns while requiring careful attention to fiber coordination and shared state management.

Fiber-based concurrency in event loops shares memory space between all executing tasks. Variables defined outside task blocks remain accessible to all tasks, creating potential race conditions at yield points. Critical sections require explicit synchronization mechanisms.

require 'async'
require 'async/semaphore'

class SharedCounter
  def initialize
    @count = 0
    @mutex = Async::Semaphore.new(1)
  end
  
  def increment
    @mutex.acquire do
      current = @count
      sleep(0.001)  # Simulate processing delay
      @count = current + 1
    end
  end
  
  attr_reader :count
end

Async do |task|
  counter = SharedCounter.new
  
  # Multiple tasks incrementing shared counter
  tasks = 100.times.map do
    task.async { counter.increment }
  end
  
  tasks.each(&:wait)
  puts "Final count: #{counter.count}"  # Should be 100
end

Task communication patterns in event loops rely on channels, queues, and condition variables rather than traditional thread synchronization primitives. The Async gem provides fiber-safe implementations of these coordination mechanisms.

require 'async'
require 'async/queue'

Async do |task|
  queue = Async::Queue.new
  
  # Producer task
  producer = task.async do
    10.times do |i|
      queue.enqueue("Item #{i}")
      sleep(0.1)
    end
    queue.enqueue(nil)  # Signal completion
  end
  
  # Consumer task
  consumer = task.async do
    # dequeue returns the nil sentinel once the producer finishes
    while (item = queue.dequeue)
      puts "Processing: #{item}"
      sleep(0.05)  # Simulate work
    end
  end
  
  [producer, consumer].each(&:wait)
end

The event loop scheduler coordinates fiber execution through a run queue driven by I/O readiness and timer expiration. Tasks voluntarily yield control during blocking operations, allowing the scheduler to resume whichever tasks are ready.

require 'async'
require 'async/barrier'

Async do |task|
  # Demonstrate fiber coordination
  barrier = Async::Barrier.new
  
  5.times do |i|
    barrier.async do
      puts "Task #{i} starting"
      sleep(rand(0.1..0.3))
      puts "Task #{i} reaching barrier"
    end
  end
  
  puts "Waiting for all tasks to reach barrier"
  barrier.wait
  puts "All tasks synchronized"
end

Memory sharing between fibers requires careful state management. Fibers share the same heap, although each fiber has its own stack. Because only one fiber runs at a time, low-level data races cannot occur, but mutable state still needs coordination across yield points.

require 'async'
require 'async/condition'

def perform_computation(id)
  sleep(rand(0.1..0.5))  # Non-blocking under the fiber scheduler
  id ** 2
end

class TaskCoordinator
  def initialize
    @results = {}
    @condition = Async::Condition.new
  end
  
  def store_result(task_id, result)
    @results[task_id] = result
    @condition.signal if @results.size >= 3  # Resumes all waiting tasks
  end
  
  def wait_for_completion
    @condition.wait while @results.size < 3
    @results.values
  end
end

Async do |task|
  coordinator = TaskCoordinator.new
  
  # Start worker tasks
  3.times do |i|
    task.async do
      result = perform_computation(i)
      coordinator.store_result(i, result)
    end
  end
  
  # Wait for all results
  results = coordinator.wait_for_completion
  puts "Collected results: #{results}"
end

Performance & Memory

Event loops optimize performance by eliminating thread creation overhead and context switching costs. A single event loop can handle thousands of concurrent operations using minimal system resources compared to traditional thread-per-connection models.

Memory usage in event loop applications grows far more slowly with concurrency than in threaded designs. Each fiber starts with a small stack, typically a few kilobytes of resident memory that grows on demand, while native Ruby threads reserve roughly 1MB of stack each. The difference becomes significant when handling thousands of concurrent connections.

require 'async'
require 'benchmark'

def measure_memory(&block)
  GC.start
  before = GC.stat[:total_allocated_objects]
  block.call
  GC.start
  after = GC.stat[:total_allocated_objects]
  after - before
end

# Compare event loop vs thread performance
puts "Event Loop Memory Usage:"
event_loop_objects = measure_memory do
  Async do |task|
    1000.times.map do |i|
      task.async do
        sleep(0.001)
        i * 2
      end
    end.map(&:wait)
  end
end

puts "Objects allocated: #{event_loop_objects}"

# Thread-based equivalent would create 1000 threads
puts "\nThread Memory Usage (rough estimate):"
puts "1000 threads × ~1MB reserved stack ≈ 1GB"
puts "1000 fibers × a few KB resident ≈ a few MB"

CPU utilization efficiency stems from the cooperative multitasking model. Tasks yield control during I/O operations rather than consuming CPU cycles waiting. The event loop maximizes CPU usage by scheduling computation-ready tasks immediately.

require 'async'
require 'async/http'
require 'benchmark'

# Performance comparison for concurrent HTTP requests
urls = Array.new(50) { |i| "https://httpbin.org/delay/0.1?id=#{i}" }

# Event loop approach
event_loop_time = Benchmark.realtime do
  Async do
    internet = Async::HTTP::Internet.new
    
    responses = urls.map do |url|
      Async { internet.get(url) }
    end.map(&:wait)
    
    puts "Event loop processed #{responses.size} requests"
  ensure
    internet&.close
  end
end

puts "Event loop time: #{event_loop_time.round(2)} seconds"
puts "Theoretical sequential time: #{urls.size * 0.1} seconds"
puts "Speedup factor: #{(urls.size * 0.1 / event_loop_time).round(1)}x"

I/O multiplexing performance depends on the underlying system implementation. Event loops in Ruby utilize epoll on Linux, kqueue on BSD systems, and select on other platforms. These system calls monitor hundreds of file descriptors with minimal overhead.

require 'async'

class PerformanceMonitor
  def initialize
    @start_time = Time.now
    @operation_count = 0
  end
  
  def record_operation
    @operation_count += 1
  end
  
  def statistics
    elapsed = Time.now - @start_time
    ops_per_second = @operation_count / elapsed
    {
      elapsed: elapsed.round(2),
      operations: @operation_count,
      ops_per_second: ops_per_second.round(1)
    }
  end
end

Async do |task|
  monitor = PerformanceMonitor.new
  
  # Simulate high-frequency operations
  1000.times do |i|
    task.async do
      # Simulate I/O operation
      sleep(0.001)
      monitor.record_operation
    end
  end
  
  # Monitor performance every second
  stats_task = task.async do
    loop do
      sleep(1)
      stats = monitor.statistics
      puts "Operations: #{stats[:operations]}, Rate: #{stats[:ops_per_second]}/sec"
      break if stats[:operations] >= 1000
    end
  end
  
  stats_task.wait
  puts "Final statistics: #{monitor.statistics}"
end

Memory allocation patterns in event loops favor small, frequent allocations over large buffer pools. The garbage collector handles fiber stack cleanup efficiently, but developers should monitor object allocation rates during high-throughput operations.
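
One straightforward way to watch allocation rates is to sample GC.stat around a workload. The sketch below uses standard CRuby counters; the async workload is only illustrative.

require 'async'

# Sample CRuby allocation counters around an async workload
before = GC.stat(:total_allocated_objects)
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)

Async do |task|
  100.times.map { task.async { sleep(0.01) } }.each(&:wait)
end

elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
allocated = GC.stat(:total_allocated_objects) - before
puts "Allocated #{allocated} objects (#{(allocated / elapsed).round} objects/sec)"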

Production Patterns

Production event loop applications require robust error handling, monitoring, and graceful shutdown mechanisms. Long-running services must handle connection failures, resource exhaustion, and system signal management without losing data or corrupting state.

Server applications using event loops typically implement connection pooling and resource management patterns. Database connections, HTTP clients, and file handles require careful lifecycle management to prevent resource leaks.

require 'async'
require 'async/semaphore'
require 'async/http'

class HTTPService
  def initialize(max_connections: 100)
    @semaphore = Async::Semaphore.new(max_connections)
    @internet = nil
  end
  
  def start
    @internet = Async::HTTP::Internet.new
  end
  
  def stop
    @internet&.close
    @internet = nil
  end
  
  def fetch_with_limit(url)
    @semaphore.acquire do
      @internet.get(url)
    end
  end
end

# Production server pattern
Async do |task|
  service = HTTPService.new(max_connections: 50)
  service.start
  
  # Handle graceful shutdown
  shutdown = false
  trap("TERM") { shutdown = true }
  trap("INT") { shutdown = true }
  
  # Main service loop
  until shutdown
    begin
      # Process incoming requests
      urls = get_pending_urls()  # Hypothetical method
      
      request_tasks = urls.map do |url|
        task.async do
          response = service.fetch_with_limit(url)
          process_response(response)  # Hypothetical method
        end
      end
      
      request_tasks.each(&:wait)
      sleep(0.1)  # Brief pause between batches
      
    rescue => error
      puts "Service error: #{error.message}"
      # Log error but continue operation
    end
  end
  
  puts "Shutting down gracefully..."
ensure
  service&.stop
end

Monitoring and observability in event loop applications focus on fiber counts, queue depths, and I/O operation latencies. Custom metrics collection helps identify performance bottlenecks and resource constraints.

require 'async'

class EventLoopMonitor
  def initialize
    @metrics = {
      active_tasks: 0,
      completed_tasks: 0,
      failed_tasks: 0,
      average_latency: 0.0
    }
    @latency_samples = []
  end
  
  def start_task
    @metrics[:active_tasks] += 1
    Time.now
  end
  
  def complete_task(start_time, success: true)
    @metrics[:active_tasks] -= 1
    
    if success
      @metrics[:completed_tasks] += 1
    else
      @metrics[:failed_tasks] += 1
    end
    
    latency = Time.now - start_time
    @latency_samples << latency
    @latency_samples = @latency_samples.last(100)  # Keep recent samples
    
    @metrics[:average_latency] = 
      @latency_samples.sum / @latency_samples.size
  end
  
  def report
    total_tasks = @metrics[:completed_tasks] + @metrics[:failed_tasks]
    success_rate = total_tasks.zero? ? 0.0 : @metrics[:completed_tasks].to_f / total_tasks * 100
    
    puts "Active: #{@metrics[:active_tasks]}, " \
         "Completed: #{@metrics[:completed_tasks]}, " \
         "Failed: #{@metrics[:failed_tasks]}, " \
         "Success: #{success_rate.round(1)}%, " \
         "Avg Latency: #{(@metrics[:average_latency] * 1000).round(1)}ms"
  end
end

Async do |task|
  monitor = EventLoopMonitor.new
  
  # Start monitoring task
  monitoring_task = task.async do
    loop do
      sleep(5)
      monitor.report
    end
  end
  
  # Simulate production workload
  100.times do |i|
    task.async do
      start_time = monitor.start_task
      
      begin
        # Simulate variable latency work
        sleep(rand(0.01..0.1))
        success = rand > 0.1  # 90% success rate
        
        raise "Simulated error" unless success
        
        monitor.complete_task(start_time, success: true)
      rescue => error
        monitor.complete_task(start_time, success: false)
      end
    end
  end
  
  # Let monitoring run for a bit
  sleep(10)
  monitoring_task.stop
  monitor.report
end

Error recovery patterns in production event loops implement circuit breakers, retry mechanisms, and fallback strategies. These patterns prevent cascading failures and maintain service availability during partial system failures.

require 'async'

class CircuitBreaker
  def initialize(failure_threshold: 5, timeout: 30)
    @failure_threshold = failure_threshold
    @timeout = timeout
    @failure_count = 0
    @last_failure_time = nil
    @state = :closed  # :closed, :open, :half_open
  end
  
  def call
    case @state
    when :open
      if Time.now - @last_failure_time > @timeout
        @state = :half_open
        attempt_call { yield }
      else
        raise "Circuit breaker is open"
      end
    when :half_open
      attempt_call { yield }
    when :closed
      attempt_call { yield }
    end
  end
  
  private
  
  def attempt_call
    result = yield
    on_success
    result
  rescue => error
    on_failure
    raise error
  end
  
  def on_success
    @failure_count = 0
    @state = :closed
  end
  
  def on_failure
    @failure_count += 1
    @last_failure_time = Time.now
    
    if @failure_count >= @failure_threshold
      @state = :open
    end
  end
end

# Production usage with circuit breaker
Async do |task|
  breaker = CircuitBreaker.new(failure_threshold: 3, timeout: 10)
  
  # Simulate service calls with failures
  20.times do |i|
    task.async do
      begin
        result = breaker.call do
          # Simulate unreliable service
          if rand < 0.3  # 30% failure rate
            raise "Service unavailable"
          end
          
          sleep(0.1)
          "Success #{i}"
        end
        
        puts "Request #{i}: #{result}"
      rescue => error
        puts "Request #{i} failed: #{error.message}"
      end
    end
    
    sleep(0.5)  # Space out requests
  end
end
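
Circuit breakers pair naturally with the retry mechanisms mentioned earlier. The sketch below is a minimal retry helper with exponential backoff; the attempts and base_delay parameters are illustrative, and sleep is non-blocking inside Async, so waiting retries do not stall other tasks.

require 'async'

# Retry a block with exponential backoff between attempts
def with_retries(attempts: 3, base_delay: 0.1)
  tries = 0
  begin
    yield
  rescue
    tries += 1
    raise if tries >= attempts
    sleep(base_delay * (2 ** (tries - 1)))  # 0.1s, 0.2s, 0.4s, ...
    retry
  end
end

Async do
  result = with_retries(attempts: 4) do
    raise "Transient failure" if rand < 0.5  # Simulated flaky call
    "Recovered"
  end
  puts result
rescue => error
  puts "Gave up: #{error.message}"
end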

Error Handling & Debugging

Event loop error handling requires different approaches than traditional synchronous code. Exceptions in asynchronous tasks propagate through the fiber execution context, requiring explicit error boundaries and cleanup mechanisms.

Unhandled exceptions in event loop tasks terminate the specific fiber but may not crash the entire event loop. The Async gem provides exception handling mechanisms that capture errors from failed tasks while allowing other tasks to continue execution.

require 'async'

Async do |task|
  # Task that will fail
  failing_task = task.async do
    sleep(0.1)
    raise StandardError, "Task failed!"
  end
  
  # Task that will succeed
  success_task = task.async do
    sleep(0.2)
    "Success!"
  end
  
  # Handle errors from individual tasks
  begin
    result1 = failing_task.wait
  rescue => error
    puts "Caught error from failing task: #{error.message}"
  end
  
  begin
    result2 = success_task.wait
    puts "Success task returned: #{result2}"
  rescue => error
    puts "Unexpected error: #{error.message}"
  end
end

Error propagation in nested task hierarchies follows the fiber parent-child relationship. Child task errors bubble up to parent tasks unless explicitly caught, potentially terminating entire task trees.

require 'async'

class TaskManager
  def initialize
    @errors = []
  end
  
  def run_with_error_collection
    Async do |parent_task|
      # Create error boundary for child tasks
      child_tasks = 10.times.map do |i|
        parent_task.async do
          begin
            perform_work(i)
          rescue => error
            @errors << { task_id: i, error: error }
            nil  # Return nil to indicate failure
          end
        end
      end
      
      # Wait for all tasks and filter results
      results = child_tasks.map(&:wait).compact
      
      puts "Completed tasks: #{results.size}"
      puts "Failed tasks: #{@errors.size}"
      
      @errors.each do |error_info|
        puts "Task #{error_info[:task_id]} failed: #{error_info[:error].message}"
      end
      
      results
    end.wait  # Return the results array, not the Task object
  end
  
  private
  
  def perform_work(task_id)
    sleep(0.1)
    
    # Randomly fail some tasks
    if rand < 0.3
      raise "Task #{task_id} encountered an error"
    end
    
    "Result from task #{task_id}"
  end
end

manager = TaskManager.new
results = manager.run_with_error_collection
puts "Final results count: #{results.size}"

Debugging asynchronous code requires specialized techniques for tracing execution flow across fiber boundaries. Stack traces in event loops show fiber switching points rather than traditional call hierarchies.

require 'async'

module AsyncDebugger
  def self.trace_execution(label)
    puts "[#{Time.now.strftime('%H:%M:%S.%3N')}] #{label} - Fiber: #{Fiber.current.object_id}"
    
    if block_given?
      begin
        result = yield
        puts "[#{Time.now.strftime('%H:%M:%S.%3N')}] #{label} completed"
        result
      rescue => error
        puts "[#{Time.now.strftime('%H:%M:%S.%3N')}] #{label} failed: #{error.message}"
        puts error.backtrace.first(5).map { |line| "  #{line}" }.join("\n")
        raise error
      end
    end
  end
end

Async do |task|
  # Debug complex async operation
  result = AsyncDebugger.trace_execution("Main operation") do
    subtasks = 3.times.map do |i|
      task.async do
        AsyncDebugger.trace_execution("Subtask #{i}") do
          sleep(rand(0.1..0.3))
          
          if i == 1  # Simulate error in one subtask
            raise "Subtask #{i} failed"
          end
          
          "Result #{i}"
        end
      end
    end
    
    # Collect results with error handling
    results = []
    subtasks.each_with_index do |subtask, index|
      begin
        results << subtask.wait
      rescue => error
        puts "Subtask #{index} error handled: #{error.message}"
        results << nil
      end
    end
    
    results.compact
  end
  
  puts "Final results: #{result}"
end

Timeout handling in event loops prevents tasks from running indefinitely. The Async gem provides Task#with_timeout, which cancels a long-running block and raises Async::TimeoutError when the duration elapses.

require 'async'

class TimeoutHandler
  def initialize(default_timeout: 5.0)
    @default_timeout = default_timeout
  end
  
  # Async::Task#with_timeout interrupts the block and raises
  # Async::TimeoutError once the duration elapses
  def with_timeout(timeout: @default_timeout, &block)
    Async::Task.current.with_timeout(timeout, &block)
  end
end

Async do |task|
  handler = TimeoutHandler.new(default_timeout: 2.0)
  
  # Operation that completes within timeout
  begin
    result = handler.with_timeout do
      sleep(1.0)
      "Quick operation"
    end
    puts "Result: #{result}"
  rescue Async::TimeoutError => error
    puts "Timeout error: #{error.message}"
  end
  
  # Operation that exceeds timeout
  begin
    result = handler.with_timeout(timeout: 1.0) do
      sleep(3.0)
      "Slow operation"
    end
    puts "Result: #{result}"
  rescue Async::TimeoutError => error
    puts "Timeout error: #{error.message}"
  end
end

Reference

Core Classes and Methods

| Class/Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| Async { } | Block | Task | Creates event loop context and executes block |
| Kernel#sleep(duration) | duration (Numeric) | nil | Non-blocking inside an Async context (fiber scheduler) |
| Task#async { } | Block | Task | Creates concurrent task within current context |
| Task#wait | None | Object | Blocks until task completes, returns result |
| Task#stop | None | nil | Cancels task execution |
| Task#finished? | None | Boolean | Checks if task has completed |
| Task#running? | None | Boolean | Checks if task is currently executing |

Synchronization Primitives

| Class | Constructor | Purpose |
| --- | --- | --- |
| Async::Semaphore | new(limit) | Limits concurrent access to resources |
| Async::Barrier | new | Synchronization point for multiple tasks |
| Async::Condition | new | Allows tasks to wait for specific conditions |
| Async::Queue | new | Unbounded queue for task communication (Async::LimitedQueue.new(limit) bounds it) |
| Async::Notification | new | One-time signaling between tasks |

Semaphore Operations

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| #acquire { } | Block | Object | Acquires semaphore, executes block, releases |
| #async { } | Block | Task | Creates a task that runs while holding the semaphore |
| #count | None | Integer | Current available permits |
| #limit | None | Integer | Maximum permits |

Queue Operations

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| #enqueue(item) | item (Object) | nil | Adds item to queue |
| #dequeue | None | Object | Removes and returns item, waiting if the queue is empty |
| #empty? | None | Boolean | Checks if queue is empty |
| #size | None | Integer | Number of items in queue |

HTTP Client (Async::HTTP)

| Class | Method | Parameters | Returns |
| --- | --- | --- | --- |
| Internet | #get(url, headers = nil, body = nil) | url (String), optional headers and body | Response |
| Internet | #post(url, headers = nil, body = nil) | url (String), optional headers and body | Response |
| Internet | #put(url, headers = nil, body = nil) | url (String), optional headers and body | Response |
| Internet | #delete(url, headers = nil, body = nil) | url (String), optional headers and body | Response |
| Internet | #close | None | nil |

Response Object

| Method | Returns | Description |
| --- | --- | --- |
| #status | Integer | HTTP status code |
| #headers | Headers | Response headers object |
| #body | Body | Streaming response body |
| #read | String | Reads the full response body |
| #close | nil | Closes the response stream |

Exception Classes

| Exception | Inheritance | Description |
| --- | --- | --- |
| Async::Stop | Exception | Task cancellation signal |
| Async::TimeoutError | StandardError | Operation timeout exceeded |
| Async::Wrapper::Cancelled | StandardError | I/O operation cancelled (async-io gem) |

Task States

| State | Description | Methods |
| --- | --- | --- |
| :running | Task is executing | #running? returns true |
| :complete | Task finished successfully | #finished? returns true |
| :failed | Task terminated with exception | #finished? returns true |
| :stopped | Task was cancelled | #stopped? returns true |

Performance Monitoring

| Metric | Method | Description |
| --- | --- | --- |
| Active Tasks | Async::Task.current.children.count | Number of running child tasks |
| Fiber Count | ObjectSpace.each_object(Fiber).count | Total fibers in memory |
| GC Stats | GC.stat | Memory allocation statistics |

Common Patterns

| Pattern | Code Template | Use Case |
| --- | --- | --- |
| Task Group | tasks = n.times.map { task.async { } }; tasks.map(&:wait) | Parallel execution |
| Resource Pool | semaphore.acquire { resource.use } | Limited resource access |
| Producer-Consumer | queue.enqueue(item); item = queue.dequeue | Task communication |
| Circuit Breaker | breaker.call { risky_operation } | Fault tolerance |
| Timeout Wrapper | task.with_timeout(duration) { operation } | Operation timeouts |