Overview
Event loops implement the reactor pattern in Ruby, managing I/O operations and callbacks without blocking the main execution thread. Ruby's event loop implementations coordinate between system-level I/O multiplexing and application-level callback execution, creating single-threaded concurrency through cooperative multitasking.
The core concept revolves around an event dispatcher that monitors multiple I/O sources simultaneously. When data becomes available or operations complete, the loop executes registered callbacks. This approach eliminates thread synchronization overhead while handling thousands of concurrent connections.
Ruby provides event loop functionality through several implementations. The Async gem offers a modern fiber-based approach, while EventMachine provides a more traditional callback-oriented model. The built-in IO.select forms the foundation for custom event loop implementations.
require 'async'

Async do
  task = Async do
    sleep(1) # Non-blocking inside a task (async 2.x on the Ruby 3 fiber scheduler)
    puts "Async operation completed"
  end

  puts "Non-blocking execution continues"
  task.wait
end
# => Non-blocking execution continues
# => Async operation completed
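To make the IO.select foundation mentioned above concrete, here is a minimal sketch of a hand-rolled reactor built only on the standard library. The `TinyReactor` class and its method names are hypothetical, chosen for illustration; real event loops add timers, write readiness, and error handling on top of the same idea.

```ruby
# Minimal reactor sketch: callbacks keyed by IO object, dispatched via IO.select.
class TinyReactor
  def initialize
    @readers = {} # IO => callback invoked when that IO is readable
  end

  def on_readable(io, &callback)
    @readers[io] = callback
  end

  def remove(io)
    @readers.delete(io)
  end

  # Run until no IO objects remain registered.
  def run
    until @readers.empty?
      ready, = IO.select(@readers.keys) # blocks until at least one IO is readable
      ready.each { |io| @readers[io]&.call(io) }
    end
  end
end

reactor = TinyReactor.new
received = []

2.times do |i|
  reader, writer = IO.pipe
  writer.write("message #{i}")
  writer.close

  reactor.on_readable(reader) do |io|
    data = io.read_nonblock(1024, exception: false)
    if data.nil? # nil signals end of file
      reactor.remove(io)
      io.close
    elsif data != :wait_readable
      received << data
    end
  end
end

reactor.run
puts received.sort.inspect
```

The single `run` loop services both pipes without threads: each callback fires only when its descriptor has data, which is the reactor pattern in its smallest form.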
Event loops excel at I/O-bound operations like network requests, file operations, and database queries. They transform blocking operations into asynchronous tasks that yield control during waiting periods. This design pattern maximizes CPU utilization by interleaving computation with I/O operations.
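require 'async'
require 'async/http'

Async do
  internet = Async::HTTP::Internet.new
  urls = [
    "https://api.example1.com",
    "https://api.example2.com",
    "https://api.example3.com"
  ]

  # Wrap each request in its own task so the requests overlap;
  # a bare internet.get suspends the calling task until its response arrives
  responses = urls.map { |url| Async { internet.get(url) } }.map(&:wait)
  responses.each { |response| puts response.status }
ensure
  internet&.close
end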
The fiber-based approach in modern Ruby event loops creates lightweight execution contexts. Each fiber represents an independent execution state that can be suspended and resumed. The event loop scheduler coordinates fiber execution based on I/O readiness and timer events.
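The suspend-and-resume behavior described above can be seen with Ruby's built-in Fiber class alone. This is a small sketch without any scheduler; the variable names are illustrative.

```ruby
log = []

worker = Fiber.new do
  log << "worker: step 1"
  Fiber.yield            # suspend; control returns to whoever called resume
  log << "worker: step 2"
  :done
end

worker.resume            # runs until the first Fiber.yield
log << "main: worker is suspended"
result = worker.resume   # continues after Fiber.yield and runs to the end

puts log
puts "result=#{result.inspect}, alive=#{worker.alive?}"
```

An event loop scheduler automates the `resume` calls: instead of the main program deciding when to continue a fiber, the loop resumes it once the I/O or timer it was waiting on is ready.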
Basic Usage
Event loop programming begins with creating an event loop context and defining asynchronous tasks within that context. The Async gem provides the most straightforward entry point for event loop programming in modern Ruby applications.
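require 'async'

# Basic event loop with single task
Async do
  puts "Starting async task"
  sleep(0.1) # Non-blocking sleep inside a task
  puts "Task completed"
end

# A top-level Async block runs the event loop until its tasks finish,
# so this line prints after "Task completed"
puts "Event loop finished"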
The Async block creates an event loop context where asynchronous operations execute cooperatively. Tasks within the block yield control during I/O operations, allowing other tasks to progress. At the top level, the call returns once the loop terminates, which happens when all tasks have completed or failed with unhandled exceptions.
Multiple concurrent tasks execute within the same event loop through nested Async blocks or task.async calls. Each nested block creates an independent task that runs concurrently with other tasks in the same loop.
require 'async'

Async do |task|
  # Start multiple concurrent tasks
  tasks = 5.times.map do |i|
    task.async do
      puts "Task #{i} starting"
      sleep(rand(0.1..0.5)) # Non-blocking inside a task
      puts "Task #{i} completed"
      i * 2
    end
  end

  # Wait for all tasks and collect results
  results = tasks.map(&:wait)
  puts "All tasks completed: #{results}"
end
File I/O operations can run inside event loop tasks, but regular files behave differently from sockets and pipes: the operating system reports them as always ready, so readiness-based multiplexing alone does not make file reads and writes asynchronous. Whether a file operation yields to other tasks depends on the scheduler backend (for example, io_uring on Linux); otherwise it completes synchronously inside its task.
require 'async'

Async do |task|
  # File operations started from separate tasks
  files = ['file1.txt', 'file2.txt', 'file3.txt']
  file_tasks = files.map do |filename|
    task.async do
      # Plain File calls work inside a task; async 2.x needs no wrapper class
      File.open(filename, 'w') do |file|
        file.write("Data for #{filename}\n")
      end
      puts "Wrote #{filename}"
    end
  end

  file_tasks.each(&:wait)
  puts "All file operations completed"
end
Network operations represent the primary use case for event loops. HTTP clients and servers benefit significantly from non-blocking I/O patterns, handling multiple connections without thread proliferation.
require 'async'
require 'async/http'

Async do
  internet = Async::HTTP::Internet.new

  # Concurrent HTTP requests
  urls = [
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/2',
    'https://httpbin.org/delay/1'
  ]

  start_time = Time.now
  responses = urls.map do |url|
    Async do
      response = internet.get(url)
      puts "#{url}: #{response.status}"
      response
    end
  end.map(&:wait)

  puts "Total time: #{Time.now - start_time} seconds"
ensure
  internet&.close
end
Thread Safety & Concurrency
Event loops in Ruby operate within single threads but provide concurrency through cooperative multitasking. This model eliminates most thread safety concerns while requiring careful attention to fiber coordination and shared state management.
Fiber-based concurrency in event loops shares memory space between all executing tasks. Variables defined outside task blocks remain accessible to all tasks, creating potential race conditions during yield points. Critical sections require explicit synchronization mechanisms.
require 'async'
require 'async/semaphore'

class SharedCounter
  def initialize
    @count = 0
    @mutex = Async::Semaphore.new(1) # One permit: acts as a fiber-level mutex
  end

  def increment
    @mutex.acquire do
      current = @count
      sleep(0.001) # Simulate processing delay (a yield point)
      @count = current + 1
    end
  end

  attr_reader :count
end

Async do |task|
  counter = SharedCounter.new

  # Multiple tasks incrementing shared counter
  tasks = 100.times.map do
    task.async { counter.increment }
  end

  tasks.each(&:wait)
  puts "Final count: #{counter.count}" # Should be 100
end
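To see why the read-yield-write sequence in SharedCounter needs protection, the following sketch reproduces the lost update with plain fibers and no synchronization. `Fiber.yield` stands in for the I/O wait; the setup is illustrative and does not use the Async gem.

```ruby
count = 0

# Each fiber reads the counter, yields (standing in for an I/O wait),
# then writes back a value computed from its stale read.
fibers = 3.times.map do
  Fiber.new do
    current = count
    Fiber.yield
    count = current + 1
  end
end

fibers.each(&:resume) # every fiber reads 0, then suspends
fibers.each(&:resume) # every fiber writes 0 + 1

puts "Unsynchronized count: #{count}" # prints 1, not 3
```

No two fibers ever ran at the same instant, yet two of the three updates were lost. Holding a single-permit semaphore across the yield point, as SharedCounter does, forces each read-modify-write to finish before the next begins.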
Task communication patterns in event loops rely on channels, queues, and condition variables rather than traditional thread synchronization primitives. The Async gem provides fiber-safe implementations of these coordination mechanisms.
require 'async'
require 'async/queue'

Async do |task|
  queue = Async::Queue.new

  # Producer task
  producer = task.async do
    10.times do |i|
      queue.enqueue("Item #{i}")
      sleep(0.1)
    end
    queue.enqueue(nil) # Signal completion
  end

  # Consumer task: dequeue suspends the task until an item arrives,
  # and the nil sentinel ends the loop
  consumer = task.async do
    while (item = queue.dequeue)
      puts "Processing: #{item}"
      sleep(0.05) # Simulate work
    end
  end

  [producer, consumer].each(&:wait)
end
The event loop scheduler resumes a fiber when the I/O it is waiting on becomes ready or when its timer expires. Tasks voluntarily yield control during blocking operations, allowing the scheduler to run other tasks that are ready.
require 'async'
require 'async/barrier'

Async do
  # Demonstrate fiber coordination
  barrier = Async::Barrier.new

  5.times do |i|
    barrier.async do
      puts "Task #{i} starting"
      sleep(rand(0.1..0.3))
      puts "Task #{i} finished"
    end
  end

  puts "Waiting for all tasks to finish"
  barrier.wait # Returns once every task started through the barrier has completed
  puts "All tasks synchronized"
end
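The timer side of that scheduling can be sketched with plain fibers. In this toy model (the `ToyScheduler` class is hypothetical and much simpler than what the Async gem does), each fiber yields the number of seconds it wants to wait, and the loop always resumes the fiber whose deadline comes first.

```ruby
# Toy scheduler: fibers yield a delay in seconds; the loop resumes
# whichever fiber's wake-up time is earliest.
class ToyScheduler
  def initialize
    @waiting = []  # entries of [wake_time, sequence, fiber]
    @sequence = 0  # tie-breaker so equal wake times keep spawn order
  end

  def spawn(&block)
    schedule(Fiber.new(&block), 0)
  end

  def run
    until @waiting.empty?
      entry = @waiting.min_by { |wake_at, sequence, _fiber| [wake_at, sequence] }
      @waiting.delete(entry)
      wake_at, _sequence, fiber = entry

      delay = wake_at - now
      sleep(delay) if delay > 0  # nothing is runnable yet, so the loop idles
      requested = fiber.resume   # run until the fiber yields a delay or finishes
      schedule(fiber, requested) if fiber.alive?
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def schedule(fiber, delay)
    @sequence += 1
    @waiting << [now + delay, @sequence, fiber]
  end
end

order = []
scheduler = ToyScheduler.new
scheduler.spawn { order << "slow start"; Fiber.yield(0.05); order << "slow done" }
scheduler.spawn { order << "fast start"; Fiber.yield(0.01); order << "fast done" }
scheduler.run
puts order.inspect
```

Both fibers start before either finishes, and the one with the shorter delay completes first even though it was spawned second. A production scheduler adds I/O readiness to the same loop, waking fibers on whichever event arrives first.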
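Memory sharing between fibers requires careful state management. Like threads, fibers share the same heap, but each fiber has its own stack. Because a fiber gives up control only at explicit yield points, code between yield points runs without interference from other fibers; mutable state still needs coordination whenever an operation spans a yield point.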
require 'async'
require 'async/condition'

class TaskCoordinator
  def initialize
    @results = {}
    @condition = Async::Condition.new
  end

  def store_result(task_id, result)
    @results[task_id] = result
    @condition.signal if @results.size >= 3 # Wakes every waiting task
  end

  def wait_for_completion
    @condition.wait while @results.size < 3
    @results.values
  end
end

# Defined before the Async block, which runs as soon as it is reached
def perform_computation(id)
  sleep(rand(0.1..0.5)) # Non-blocking when called inside a task
  id ** 2
end

Async do |task|
  coordinator = TaskCoordinator.new

  # Start worker tasks
  3.times do |i|
    task.async do
      result = perform_computation(i)
      coordinator.store_result(i, result)
    end
  end

  # Wait for all results
  results = coordinator.wait_for_completion
  puts "Collected results: #{results}"
end
Performance & Memory
Event loops optimize performance by eliminating thread creation overhead and context switching costs. A single event loop can handle thousands of concurrent operations using minimal system resources compared to traditional thread-per-connection models.
Memory usage in event loop applications grows with the number of concurrent operations, but far more slowly than in thread-per-connection designs. An idle fiber keeps roughly 4KB of stack resident (the exact figure depends on Ruby version and platform), while native threads typically reserve 1-8MB of stack each. This difference becomes significant when handling thousands of concurrent connections.
require 'async'

# Counts objects allocated while the block runs. This is a proxy for
# allocation pressure, not a measurement of resident memory.
def measure_allocations
  GC.start
  before = GC.stat[:total_allocated_objects]
  yield
  GC.start
  after = GC.stat[:total_allocated_objects]
  after - before
end

# Compare event loop vs thread resource usage
puts "Event Loop Allocations:"
event_loop_objects = measure_allocations do
  Async do |task|
    1000.times.map do |i|
      task.async do
        sleep(0.001)
        i * 2
      end
    end.map(&:wait)
  end
end
puts "Objects allocated: #{event_loop_objects}"

# Thread-based equivalent would create 1000 threads
puts "\nThread Memory Usage (rough estimate, platform-dependent):"
puts "1000 threads × up to ~8MB reserved stack = up to ~8GB of address space"
puts "Event loop: 1000 fibers × ~4KB resident = ~4MB memory"
CPU utilization efficiency stems from the cooperative multitasking model. Tasks yield control during I/O operations rather than consuming CPU cycles waiting. The event loop maximizes CPU usage by scheduling computation-ready tasks immediately.
require 'async'
require 'async/http'
require 'benchmark'

# Performance comparison for concurrent HTTP requests
urls = Array.new(50) { |i| "https://httpbin.org/delay/0.1?id=#{i}" }

# Event loop approach
event_loop_time = Benchmark.realtime do
  Async do
    internet = Async::HTTP::Internet.new
    responses = urls.map do |url|
      Async { internet.get(url) }
    end.map(&:wait)
    puts "Event loop processed #{responses.size} requests"
  ensure
    internet&.close
  end
end

puts "Event loop time: #{event_loop_time.round(2)} seconds"
puts "Theoretical sequential time: #{urls.size * 0.1} seconds"
puts "Speedup factor: #{(urls.size * 0.1 / event_loop_time).round(1)}x"
I/O multiplexing performance depends on the underlying system implementation. Event loops in Ruby use epoll (or io_uring where the backend supports it) on Linux, kqueue on BSD systems and macOS, and select on other platforms. The epoll and kqueue interfaces monitor thousands of file descriptors with minimal overhead, whereas select scales poorly as the descriptor count grows.
require 'async'

class PerformanceMonitor
  def initialize
    @start_time = Time.now
    @operation_count = 0
  end

  def record_operation
    @operation_count += 1
  end

  def statistics
    elapsed = Time.now - @start_time
    ops_per_second = @operation_count / elapsed
    {
      elapsed: elapsed.round(2),
      operations: @operation_count,
      ops_per_second: ops_per_second.round(1)
    }
  end
end

Async do |task|
  monitor = PerformanceMonitor.new

  # Simulate high-frequency operations
  1000.times do
    task.async do
      # Simulate I/O operation
      sleep(0.001)
      monitor.record_operation
    end
  end

  # Monitor performance every second
  stats_task = task.async do
    loop do
      sleep(1)
      stats = monitor.statistics
      puts "Operations: #{stats[:operations]}, Rate: #{stats[:ops_per_second]}/sec"
      break if stats[:operations] >= 1000
    end
  end

  stats_task.wait
  puts "Final statistics: #{monitor.statistics}"
end
Memory allocation patterns in event loops favor small, frequent allocations over large buffer pools. The garbage collector handles fiber stack cleanup efficiently, but developers should monitor object allocation rates during high-throughput operations.
Production Patterns
Production event loop applications require robust error handling, monitoring, and graceful shutdown mechanisms. Long-running services must handle connection failures, resource exhaustion, and system signal management without losing data or corrupting state.
Server applications using event loops typically implement connection pooling and resource management patterns. Database connections, HTTP clients, and file handles require careful lifecycle management to prevent resource leaks.
require 'async'
require 'async/http'
require 'async/semaphore'

class HTTPService
  def initialize(max_connections: 100)
    @semaphore = Async::Semaphore.new(max_connections)
    @internet = nil
  end

  def start
    @internet = Async::HTTP::Internet.new
  end

  def stop
    @internet&.close
    @internet = nil
  end

  # At most max_connections requests are in flight at once
  def fetch_with_limit(url)
    @semaphore.acquire do
      @internet.get(url)
    end
  end
end

# Production server pattern
Async do |task|
  service = HTTPService.new(max_connections: 50)
  service.start

  # Handle graceful shutdown
  shutdown = false
  trap("TERM") { shutdown = true }
  trap("INT") { shutdown = true }

  # Main service loop
  until shutdown
    begin
      # Process incoming requests
      urls = get_pending_urls() # Hypothetical method
      request_tasks = urls.map do |url|
        task.async do
          response = service.fetch_with_limit(url)
          process_response(response) # Hypothetical method
        end
      end
      request_tasks.each(&:wait)
      sleep(0.1) # Brief pause between batches
    rescue => error
      puts "Service error: #{error.message}"
      # Log error but continue operation
    end
  end

  puts "Shutting down gracefully..."
ensure
  service&.stop
end
Monitoring and observability in event loop applications focus on fiber counts, queue depths, and I/O operation latencies. Custom metrics collection helps identify performance bottlenecks and resource constraints.
require 'async'

class EventLoopMonitor
  def initialize
    @metrics = {
      active_tasks: 0,
      completed_tasks: 0,
      failed_tasks: 0,
      average_latency: 0.0
    }
    @latency_samples = []
  end

  def start_task
    @metrics[:active_tasks] += 1
    Time.now
  end

  def complete_task(start_time, success: true)
    @metrics[:active_tasks] -= 1
    if success
      @metrics[:completed_tasks] += 1
    else
      @metrics[:failed_tasks] += 1
    end

    latency = Time.now - start_time
    @latency_samples << latency
    @latency_samples = @latency_samples.last(100) # Keep recent samples
    @metrics[:average_latency] =
      @latency_samples.sum / @latency_samples.size
  end

  def report
    total_tasks = @metrics[:completed_tasks] + @metrics[:failed_tasks]
    # Avoid dividing by zero before any task has finished
    success_rate = total_tasks.zero? ? 0.0 : @metrics[:completed_tasks].to_f / total_tasks * 100
    puts "Active: #{@metrics[:active_tasks]}, " \
         "Completed: #{@metrics[:completed_tasks]}, " \
         "Failed: #{@metrics[:failed_tasks]}, " \
         "Success: #{success_rate.round(1)}%, " \
         "Avg Latency: #{(@metrics[:average_latency] * 1000).round(1)}ms"
  end
end

Async do |task|
  monitor = EventLoopMonitor.new

  # Start monitoring task
  monitoring_task = task.async do
    loop do
      sleep(5)
      monitor.report
    end
  end

  # Simulate production workload
  100.times do
    task.async do
      start_time = monitor.start_task
      begin
        # Simulate variable latency work
        sleep(rand(0.01..0.1))
        success = rand > 0.1 # 90% success rate
        raise "Simulated error" unless success
        monitor.complete_task(start_time, success: true)
      rescue StandardError
        monitor.complete_task(start_time, success: false)
      end
    end
  end

  # Let monitoring run for a bit
  sleep(10)
  monitoring_task.stop
  monitor.report
end
Error recovery patterns in production event loops implement circuit breakers, retry mechanisms, and fallback strategies. These patterns prevent cascading failures and maintain service availability during partial system failures.
require 'async'

class CircuitBreaker
  def initialize(failure_threshold: 5, timeout: 30)
    @failure_threshold = failure_threshold
    @timeout = timeout
    @failure_count = 0
    @last_failure_time = nil
    @state = :closed # :closed, :open, :half_open
  end

  def call
    case @state
    when :open
      if Time.now - @last_failure_time > @timeout
        # Cool-down elapsed: let one trial call through
        @state = :half_open
        attempt_call { yield }
      else
        raise "Circuit breaker is open"
      end
    when :half_open
      attempt_call { yield }
    when :closed
      attempt_call { yield }
    end
  end

  private

  def attempt_call
    result = yield
    on_success
    result
  rescue => error
    on_failure
    raise error
  end

  def on_success
    @failure_count = 0
    @state = :closed
  end

  def on_failure
    @failure_count += 1
    @last_failure_time = Time.now
    if @failure_count >= @failure_threshold
      @state = :open
    end
  end
end

# Production usage with circuit breaker
Async do |task|
  breaker = CircuitBreaker.new(failure_threshold: 3, timeout: 10)

  # Simulate service calls with failures
  20.times do |i|
    task.async do
      begin
        result = breaker.call do
          # Simulate unreliable service
          if rand < 0.3 # 30% failure rate
            raise "Service unavailable"
          end
          sleep(0.1)
          "Success #{i}"
        end
        puts "Request #{i}: #{result}"
      rescue => error
        puts "Request #{i} failed: #{error.message}"
      end
    end
    sleep(0.5) # Space out requests
  end
end
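The retry mechanisms mentioned alongside circuit breakers can be sketched in a few lines of plain Ruby. The `with_retries` helper, its keyword names, and its defaults are illustrative assumptions, not part of the Async gem; the injectable `sleeper` exists only so the delays can be observed without waiting.

```ruby
# Retry a block with exponential backoff. Re-raises the last error
# once max_attempts is reached.
def with_retries(max_attempts: 3, base_delay: 0.01, sleeper: ->(seconds) { sleep(seconds) })
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue StandardError
    raise if attempt >= max_attempts
    sleeper.call(base_delay * (2**(attempt - 1))) # 1x, 2x, 4x, ... the base delay
    retry
  end
end

delays = []
result = with_retries(max_attempts: 4, sleeper: ->(seconds) { delays << seconds }) do |attempt|
  raise "Service unavailable" if attempt < 3 # fail twice, then succeed
  "succeeded on attempt #{attempt}"
end

puts result
puts delays.inspect
```

With the default `sleeper`, calling this helper inside a task should suspend only that task during the backoff, assuming a scheduler that intercepts `sleep`. Wrapping the retried call in a circuit breaker keeps retries from hammering a service that is already known to be down.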
Error Handling & Debugging
Event loop error handling requires different approaches than traditional synchronous code. Exceptions in asynchronous tasks propagate through the fiber execution context, requiring explicit error boundaries and cleanup mechanisms.
Unhandled exceptions in event loop tasks terminate the specific fiber but may not crash the entire event loop. The Async gem provides exception handling mechanisms that capture errors from failed tasks while allowing other tasks to continue execution.
require 'async'

Async do |task|
  # Task that will fail
  failing_task = task.async do
    sleep(0.1)
    raise StandardError, "Task failed!"
  end

  # Task that will succeed
  success_task = task.async do
    sleep(0.2)
    "Success!"
  end

  # Handle errors from individual tasks: wait re-raises the task's exception
  begin
    failing_task.wait
  rescue => error
    puts "Caught error from failing task: #{error.message}"
  end

  begin
    result = success_task.wait
    puts "Success task returned: #{result}"
  rescue => error
    puts "Unexpected error: #{error.message}"
  end
end
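Error propagation in nested task hierarchies follows the parent-child relationship between tasks. A child task's exception does not interrupt its parent automatically; it is re-raised in whichever task calls wait on the failed child. An uncaught error raised from wait then fails the waiting task too, so a single failure can cascade up a task tree unless an error boundary rescues it.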
require 'async'

class TaskManager
  def initialize
    @errors = []
  end

  def run_with_error_collection
    Async do |parent_task|
      # Create error boundary for child tasks
      child_tasks = 10.times.map do |i|
        parent_task.async do
          begin
            perform_work(i)
          rescue => error
            @errors << { task_id: i, error: error }
            nil # Return nil to indicate failure
          end
        end
      end

      # Wait for all tasks and filter results
      results = child_tasks.map(&:wait).compact
      puts "Completed tasks: #{results.size}"
      puts "Failed tasks: #{@errors.size}"
      @errors.each do |error_info|
        puts "Task #{error_info[:task_id]} failed: #{error_info[:error].message}"
      end
      results
    end
  end

  private

  def perform_work(task_id)
    sleep(0.1)
    # Randomly fail some tasks
    if rand < 0.3
      raise "Task #{task_id} encountered an error"
    end
    "Result from task #{task_id}"
  end
end

manager = TaskManager.new
# Async returns a Task; wait returns the value of the block
results = manager.run_with_error_collection.wait
puts "Final results count: #{results.size}"
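The mechanism underneath can be observed with a bare fiber: an exception raised inside a fiber surfaces at the `resume` call that was running it. Task#wait plays the analogous role for Async tasks. The sketch below uses only the standard library, and its names are illustrative.

```ruby
failing = Fiber.new do
  Fiber.yield :started
  raise ArgumentError, "boom inside fiber"
end

failing.resume # runs up to the Fiber.yield

message = begin
  failing.resume # the fiber raises; the error appears here in the caller
  "no error"
rescue ArgumentError => error
  "caught: #{error.message}"
end

puts message
puts "alive=#{failing.alive?}"
```

The fiber is finished after the exception, so it cannot be resumed again. This is why error boundaries belong inside the task body when the task is expected to keep working after a failure.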
Debugging asynchronous code requires specialized techniques for tracing execution flow across fiber boundaries. Stack traces in event loops show fiber switching points rather than traditional call hierarchies.
require 'async'

module AsyncDebugger
  def self.trace_execution(label)
    puts "[#{Time.now.strftime('%H:%M:%S.%3N')}] #{label} - Fiber: #{Fiber.current.object_id}"
    if block_given?
      begin
        result = yield
        puts "[#{Time.now.strftime('%H:%M:%S.%3N')}] #{label} completed"
        result
      rescue => error
        puts "[#{Time.now.strftime('%H:%M:%S.%3N')}] #{label} failed: #{error.message}"
        puts error.backtrace.first(5).map { |line| "  #{line}" }.join("\n")
        raise error
      end
    end
  end
end

Async do |task|
  # Debug complex async operation
  result = AsyncDebugger.trace_execution("Main operation") do
    subtasks = 3.times.map do |i|
      task.async do
        AsyncDebugger.trace_execution("Subtask #{i}") do
          sleep(rand(0.1..0.3))
          if i == 1 # Simulate error in one subtask
            raise "Subtask #{i} failed"
          end
          "Result #{i}"
        end
      end
    end

    # Collect results with error handling
    results = []
    subtasks.each_with_index do |subtask, index|
      begin
        results << subtask.wait
      rescue => error
        puts "Subtask #{index} error handled: #{error.message}"
        results << nil
      end
    end
    results.compact
  end

  puts "Final results: #{result}"
end
Timeout handling in event loops prevents tasks from running indefinitely. The Async gem provides timeout mechanisms that cancel long-running operations and raise timeout exceptions.
require 'async'

class TimeoutHandler
  def initialize(default_timeout: 5.0)
    @default_timeout = default_timeout
  end

  # Must be called from inside a running task.
  # Task#with_timeout raises Async::TimeoutError in the calling task
  # if the block has not finished within `timeout` seconds.
  def with_timeout(timeout: @default_timeout, &block)
    Async::Task.current.with_timeout(timeout, &block)
  end
end

Async do
  handler = TimeoutHandler.new(default_timeout: 2.0)

  # Operation that completes within timeout
  begin
    result = handler.with_timeout do
      sleep(1.0)
      "Quick operation"
    end
    puts "Result: #{result}"
  rescue Async::TimeoutError => error
    puts "Timeout error: #{error.message}"
  end

  # Operation that exceeds timeout
  begin
    result = handler.with_timeout(timeout: 1.0) do
      sleep(3.0)
      "Slow operation"
    end
    puts "Result: #{result}"
  rescue Async::TimeoutError => error
    puts "Timeout error: #{error.message}"
  end
end
Reference
Core Classes and Methods
| Class/Method | Parameters | Returns | Description |
|---|---|---|---|
| Async { } | Block | Task | Creates event loop context and executes block |
| sleep(duration) | duration (Numeric) | Integer | Non-blocking sleep for specified seconds when called inside a task |
| Task#async { } | Block | Task | Creates concurrent task within current context |
| Task#wait | None | Object | Suspends the caller until task completes, returns result |
| Task#stop | None | nil | Cancels task execution |
| Task#finished? | None | Boolean | Checks if task has completed |
| Task#running? | None | Boolean | Checks if task is currently executing |
Synchronization Primitives
| Class | Constructor | Purpose |
|---|---|---|
| Async::Semaphore | new(limit) | Limits concurrent access to resources |
| Async::Barrier | new | Synchronization point for multiple tasks |
| Async::Condition | new | Allows tasks to wait for specific conditions |
| Async::Queue | new | Fiber-safe queue for task communication (Async::LimitedQueue.new(limit) adds a size bound) |
| Async::Notification | new | One-time signaling between tasks |
Semaphore Operations
| Method | Parameters | Returns | Description |
|---|---|---|---|
| #acquire { } | Block | Object | Acquires semaphore, executes block, releases |
| #try_acquire { } | Block | Object or nil | Non-blocking acquire attempt |
| #count | None | Integer | Number of permits currently acquired |
| #limit | None | Integer | Maximum permits |
Queue Operations
| Method | Parameters | Returns | Description |
|---|---|---|---|
| #enqueue(item) | item (Object) | nil | Adds item to queue |
| #dequeue | None | Object | Removes and returns item from queue |
| #empty? | None | Boolean | Checks if queue is empty |
| #size | None | Integer | Number of items in queue |
HTTP Client (Async::HTTP)
| Class | Method | Parameters | Returns |
|---|---|---|---|
| Internet | #get(url, **options) | url (String), options (Hash) | Response |
| Internet | #post(url, **options) | url (String), options (Hash) | Response |
| Internet | #put(url, **options) | url (String), options (Hash) | Response |
| Internet | #delete(url, **options) | url (String), options (Hash) | Response |
| Internet | #close | None | nil |
Response Object
| Method | Returns | Description |
|---|---|---|
| #status | Integer | HTTP status code |
| #headers | Hash | Response headers |
| #body | String | Response body content |
| #read | String | Reads response body |
| #close | nil | Closes response stream |
Exception Classes
| Exception | Inheritance | Description |
|---|---|---|
| Async::Stop | Exception | Task cancellation signal |
| Async::TimeoutError | StandardError | Operation timeout exceeded |
| Async::Wrapper::Cancelled | StandardError | I/O operation cancelled |
Task States
| State | Description | Methods |
|---|---|---|
| :running | Task is executing | #running? returns true |
| :complete | Task finished successfully | #finished? returns true |
| :failed | Task terminated with exception | #finished? returns true |
| :stopped | Task was cancelled | #stopped? returns true |
Performance Monitoring
| Metric | Method | Description |
|---|---|---|
| Active Tasks | Async::Task.current.children.count | Number of running child tasks |
| Fiber Count | ObjectSpace.each_object(Fiber).count | Total fibers in memory |
| GC Stats | GC.stat | Memory allocation statistics |
Common Patterns
| Pattern | Code Template | Use Case |
|---|---|---|
| Task Group | tasks = n.times.map { task.async { } }; tasks.map(&:wait) | Parallel execution |
| Resource Pool | semaphore.acquire { resource.use } | Limited resource access |
| Producer-Consumer | queue.enqueue(item); item = queue.dequeue | Task communication |
| Circuit Breaker | breaker.call { risky_operation } | Fault tolerance |
| Timeout Wrapper | Async::TimeoutError rescue pattern | Operation timeouts |