Overview
The Async gem provides a modern asynchronous I/O framework for Ruby applications. Built around an event-driven reactor pattern, Async enables developers to write concurrent code that can handle thousands of simultaneous I/O operations without blocking execution threads. The gem implements cooperative multitasking through fibers, allowing Ruby applications to achieve high concurrency while maintaining readable, sequential-style code.
The core component is the Async::Reactor class, which manages an event loop that schedules and executes asynchronous tasks. Tasks are represented by Async::Task objects that wrap fiber-based execution contexts. The framework and its companion gems (such as async-http and async-io, used in the examples below) provide async-compatible implementations of common I/O operations, including HTTP requests, file operations, and network communication.
require 'async'
Async do
puts "Starting async task"
Async::Task.current.sleep(1)
puts "Task completed after 1 second"
end
The Async gem integrates with Ruby's existing I/O classes either through wrapper classes (the async-io gem) or, on Ruby 3.0 and later with Async 2.x, through Ruby's fiber scheduler interface, which lets standard I/O calls suspend the current fiber. When a task reaches an I/O operation that would block, the reactor suspends that task's fiber and continues executing other tasks until the I/O operation completes. This keeps the CPU busy while I/O is pending and lets an application serve many concurrent requests from one thread.
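require 'async'
require 'async/http/internet'
Async do |task|
internet = Async::HTTP::Internet.new
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/3"
]
# One child task per request, so the requests run concurrently
requests = urls.map do |url|
task.async do
response = internet.get(url)
response.read # consume the body so the connection can be reused
response.status
end
end
# Wait for every request and collect the status codes
statuses = requests.map(&:wait)
puts "All requests completed: #{statuses.inspect}"
ensure
internet&.close
end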
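The suspend-and-resume behaviour described above can be illustrated without the gem at all. The following is a minimal sketch using plain Ruby fibers; `ToyScheduler` is a hypothetical name, and the round-robin loop is an assumption made for illustration, not how the Async reactor is implemented (a real reactor resumes fibers when their I/O or timers are ready).

```ruby
# A toy round-robin scheduler built on plain Ruby fibers.
# Illustration only; this is not the Async reactor.
class ToyScheduler
  attr_reader :log

  def initialize
    @ready = [] # fibers waiting for their next turn
    @log = []   # records the order in which steps ran
  end

  # Register a named unit of work that runs the given number of steps.
  def spawn(name, steps)
    @ready << Fiber.new do
      steps.times do |step|
        @log << "#{name} step #{step + 1}"
        Fiber.yield # suspension point: hand control back to the scheduler
      end
    end
  end

  # Resume each fiber in turn until all of them have finished.
  def run
    until @ready.empty?
      fiber = @ready.shift
      fiber.resume
      @ready << fiber if fiber.alive?
    end
  end
end

scheduler = ToyScheduler.new
scheduler.spawn("A", 2)
scheduler.spawn("B", 2)
scheduler.run
puts scheduler.log
```

The two units of work interleave at their suspension points even though only one thread is involved, which is the essence of cooperative multitasking.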
The framework distinguishes itself from thread-based concurrency by avoiding the overhead of OS thread context switching. Because all tasks in a reactor run on a single thread and switch only at well-defined suspension points, the data races caused by preemption do not arise, and code between two suspension points needs no locking. Logic that spans a suspension point can still race with other tasks, as discussed under Thread Safety & Concurrency.
Basic Usage
Creating asynchronous execution contexts requires the Async method. At the top level it creates a reactor if none is running, runs the provided block within a new task, and returns once every task in that reactor has finished. An unhandled exception fails the task that raised it rather than the whole reactor.
require 'async'
Async do
puts "Inside async block"
# Async operations go here
end
puts "After async block completes"
The Async::Task.current method provides access to the currently executing task, which offers methods for suspending execution, spawning child tasks, and managing task lifecycle. The sleep method demonstrates cooperative yielding, where the current task suspends and allows other tasks to execute.
require 'async'
Async do
puts "Task 1 starting"
# Spawn a child task
task2 = Async::Task.current.async do
puts "Task 2 starting"
Async::Task.current.sleep(2)
puts "Task 2 completing"
"result from task 2"
end
puts "Task 1 sleeping"
Async::Task.current.sleep(1)
puts "Task 1 resuming"
# Wait for child task to complete
result = task2.wait
puts "Received: #{result}"
end
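File I/O can be routed through async-compatible wrappers. The Async::IO module (from the async-io gem) provides non-blocking alternatives to Ruby's standard I/O classes. Reads from regular files may still block the reactor on some platforms, because operating systems generally report regular files as always ready; the largest gains come from sockets and pipes.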
require 'async'
require 'async/io'
Async do
# Asynchronous file reading
Async::IO::Generic.open('/etc/hosts', 'r') do |file|
content = file.read
puts "File size: #{content.bytesize} bytes"
end
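# Several file reads, one task each. File.read on a regular file may still
# block the reactor while it runs, depending on the Ruby version and selector backend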
files = ['file1.txt', 'file2.txt', 'file3.txt']
tasks = files.map do |filename|
Async::Task.current.async do
if File.exist?(filename)
content = File.read(filename)
puts "#{filename}: #{content.lines.count} lines"
end
end
end
# Wait for all file operations to complete
tasks.each(&:wait)
end
Network programming with Async involves using async-compatible socket implementations. The Async::IO::Socket classes provide non-blocking network communication that integrates seamlessly with the reactor pattern.
require 'async'
require 'async/io'
Async do |task|
# Create an async TCP server
server_task = task.async do
puts "Server listening on port 8080"
# accept binds, listens, and runs the block in a new task per connection
Async::IO::Endpoint.tcp('localhost', 8080).accept do |client|
data = client.read # returns once the client closes its write side
client.write("Echo: #{data}")
client.close
end
end
# Give server time to start
task.sleep(0.1)
# Create multiple client connections
5.times do |i|
task.async do
Async::IO::Endpoint.tcp('localhost', 8080).connect do |client|
client.write("Message #{i}")
client.close_write # signal end of request so the server's read returns
response = client.read
puts response
end
end
end
task.sleep(1)
server_task.stop
end
Thread Safety & Concurrency
The Async gem implements a single-threaded concurrency model where all tasks execute within the same OS thread. This design eliminates traditional thread safety concerns since no two tasks execute simultaneously. However, understanding task scheduling and yielding behavior is crucial for writing correct async code.
Tasks yield control at specific suspension points, primarily during I/O operations or explicit sleep calls. Between these points, task execution is atomic: no other task can interrupt or modify shared state. This characteristic simplifies state management but requires careful consideration of when tasks yield control.
require 'async'
class Counter
def initialize
@value = 0
end
def increment
# This operation is atomic - no other task can interrupt
old_value = @value
@value = old_value + 1
puts "Incremented from #{old_value} to #{@value}"
end
def slow_increment
old_value = @value
# Yielding here allows other tasks to run
Async::Task.current.sleep(0.01)
@value = old_value + 1
puts "Slow increment: #{old_value} -> #{@value}"
end
end
Async do |task|
counter = Counter.new
# These increments are safe - atomic execution
10.times do
task.async { counter.increment }
end
# These lose updates: every task reads @value before any of them writes it back
10.times do
task.async { counter.slow_increment }
end
task.sleep(1) # Wait for all tasks to complete
end
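The lost update caused by yielding in the middle of a read-modify-write can be reproduced deterministically with plain fibers. This is a minimal sketch that assumes `Fiber.yield` as a stand-in for the sleep inside slow_increment; it does not use the Async gem.

```ruby
# Two plain fibers perform a read-modify-write with a suspension point
# in the middle, mimicking slow_increment. Illustration only.
value = 0

workers = 2.times.map do
  Fiber.new do
    snapshot = value      # read
    Fiber.yield           # suspension point: the other fiber runs here
    value = snapshot + 1  # write based on a stale snapshot
  end
end

workers.each(&:resume) # both fibers read 0, then suspend
workers.each(&:resume) # both fibers write 1

puts "Expected 2, got #{value}"
```

Moving the read and the write to the same side of the suspension point, or guarding the whole sequence with a synchronization primitive, removes the problem.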
Even within a single thread, it is often useful to cap how many tasks touch a resource at once. The Async::Semaphore class limits the number of tasks that can hold it concurrently; additional tasks wait until a slot is released. It coordinates tasks inside one reactor and is not a cross-thread lock.
require 'async'
require 'async/semaphore'
# Shared resource protection
semaphore = Async::Semaphore.new(3) # Allow 3 concurrent accesses
Async do |task|
10.times do |i|
task.async do
semaphore.acquire do
puts "Task #{i} acquired semaphore"
# Simulate work that should be limited
Async::Task.current.sleep(1)
puts "Task #{i} releasing semaphore"
end
end
end
end
A reactor is bound to the thread that runs it, and its tasks and synchronization primitives are not safe to call from other threads. Tasks always execute within their reactor's thread. Cross-thread communication therefore goes through thread-safe data structures or message passing patterns, for example a Thread::Queue.
require 'async'
# Thread-safe communication between a thread and an async context.
# Assumes Ruby 3.x with Async 2.x, where Thread::Queue#pop suspends
# only the calling fiber instead of blocking the whole reactor.
queue = Thread::Queue.new
# Producer thread
producer_thread = Thread.new do
sleep(2) # Simulate work in another thread
queue.push("Data ready")
end
# Consumer in async context
Async do
puts "Waiting for notification"
result = queue.pop
puts "Received: #{result}"
end
producer_thread.join
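The message passing pattern mentioned above can be sketched with the standard library alone. The example below uses Thread::Queue, which is thread-safe; the item names are made up for illustration, and whether `pop` suspends only the current task inside a reactor depends on the Ruby and Async versions in use.

```ruby
# Message passing between a producer thread and a consumer, using only
# the standard library's thread-safe Thread::Queue.
queue = Thread::Queue.new

producer = Thread.new do
  3.times { |i| queue.push("item #{i}") }
  queue.close # no more items; pop returns nil once the queue is drained
end

received = []
while (item = queue.pop)
  received << item
end
producer.join

puts received.inspect
```

Closing the queue gives the consumer a clean end-of-stream signal without a separate flag shared between threads.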
Race conditions can still occur in async code when tasks make assumptions about the state of shared resources after yielding control. The key to avoiding these issues is minimizing the scope of operations that depend on shared state and using appropriate synchronization primitives when necessary.
require 'async'
require 'async/semaphore'
class AsyncSafeResource
def initialize
@data = {}
# A semaphore with a limit of 1 acts as a task-level mutex
@mutex = Async::Semaphore.new(1)
end
def update(key, value)
@mutex.acquire do
# Critical section - only one task at a time runs this block
old_value = @data[key]
# Even if this yields, the semaphore prevents other tasks
# from entering this critical section
Async::Task.current.sleep(0.01)
@data[key] = value
puts "Updated #{key}: #{old_value} -> #{value}"
end
end
def read(key)
@mutex.acquire do
@data[key]
end
end
end
Performance & Memory
The Async gem can deliver significant performance improvements for I/O-bound applications by avoiding per-connection threads and doing useful work during I/O waits. An async server can typically sustain many more concurrent connections than a thread-per-connection design, but the size of the gain depends heavily on the workload, so measure with your own traffic rather than relying on a headline multiplier.
Memory usage stays comparatively low because fibers are cheaper than OS threads. Ruby gives fiber stacks smaller default sizes than thread stacks, and both are tunable through environment variables such as RUBY_FIBER_VM_STACK_SIZE and RUBY_THREAD_VM_STACK_SIZE; exact figures vary by Ruby version and platform. This efficiency lets applications hold thousands of concurrent connections without exhausting system memory.
require 'async'
require 'async/http/internet'
require 'benchmark'
# Performance comparison: sequential vs concurrent requests
urls = Array.new(50) { "https://httpbin.org/delay/1" }
# Sequential approach: one task issues the requests one after another
sequential_time = Benchmark.realtime do
Async do
internet = Async::HTTP::Internet.new
urls.each do |url|
response = internet.get(url)
puts "Response code: #{response.status}"
response.read # drain the body so the connection can be reused
end
ensure
internet&.close
end
end
# Concurrent approach
concurrent_time = Benchmark.realtime do
Async do |task|
internet = Async::HTTP::Internet.new
tasks = urls.map do |url|
task.async do
response = internet.get(url)
puts "Response code: #{response.status}"
end
end
tasks.each(&:wait)
internet.close
end
end
puts "Sequential: #{sequential_time.round(2)}s"
puts "Concurrent: #{concurrent_time.round(2)}s"
puts "Speedup: #{(sequential_time / concurrent_time).round(2)}x"
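The claim that many fibers can coexist cheaply can be probed with the standard library. The sketch below creates a batch of suspended fibers and reads the process's resident set size; the count of 1,000 is an arbitrary figure chosen for illustration, and the `ps` call is assumed to be available (the code falls back to `nil` when it is not). No particular memory figure is implied.

```ruby
fiber_count = 1_000 # arbitrary figure chosen for this illustration

fibers = Array.new(fiber_count) do |i|
  Fiber.new do
    Fiber.yield i # suspend immediately; the fiber stays alive
    :done
  end
end

first_values = fibers.map(&:resume) # run each fiber up to its yield
alive = fibers.count(&:alive?)
puts "#{alive} fibers suspended at once"

# Resident set size via ps; nil when ps is unavailable on the platform.
rss_kb = begin
  Integer(`ps -o rss= -p #{Process.pid}`.strip)
rescue StandardError
  nil
end
puts "RSS: #{rss_kb} KB" if rss_kb

results = fibers.map(&:resume) # let every fiber finish
```

Running the same script with a larger count, or with threads in place of fibers, gives a rough feel for the difference on a given machine.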
Memory profiling reveals that async applications maintain stable memory usage patterns even under high load. The reactor's event loop efficiently manages fiber lifecycles, creating and destroying tasks as needed without significant memory fragmentation.
require 'async'
require 'async/http/internet'
require 'json'
def measure_memory
GC.start
`ps -o rss= -p #{Process.pid}`.to_i
end
puts "Initial memory: #{measure_memory} KB"
Async do |task|
internet = Async::HTTP::Internet.new
# Create many concurrent tasks
tasks = []
1000.times do |i|
tasks << task.async do
response = internet.get("https://httpbin.org/json")
data = response.read
# Process response data
JSON.parse(data) if data
end
# Measure memory every 100 tasks
if i % 100 == 0
puts "Memory after #{i + 1} tasks: #{measure_memory} KB"
end
end
# Wait for every request to finish before closing the client
tasks.each(&:wait)
internet.close
end
puts "Final memory: #{measure_memory} KB"
Optimizing async applications requires understanding when tasks yield control and minimizing unnecessary context switches. Batching operations and reducing the frequency of I/O calls can significantly improve throughput.
require 'async'
require 'async/http/internet'
require 'benchmark'
# Inefficient: sequential requests, each waiting for the previous one
Async do |task|
internet = Async::HTTP::Internet.new
inefficient_time = Benchmark.realtime do
100.times do |i|
response = internet.get("https://httpbin.org/json")
data = response.read
end
end
# Efficient: one task per request, all in flight together
efficient_time = Benchmark.realtime do
tasks = 100.times.map do |i|
task.async do
response = internet.get("https://httpbin.org/json")
response.read
end
end
results = tasks.map(&:wait)
end
puts "Inefficient: #{inefficient_time.round(2)}s"
puts "Efficient: #{efficient_time.round(2)}s"
internet.close
end
The reactor does not assign priorities to tasks. On each pass of its event loop it waits for I/O readiness and timer expiry, then resumes the tasks whose operations have completed while the rest stay suspended. Resuming only ready tasks keeps idle time low when many connections are open.
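Readiness-driven scheduling can be seen in miniature with `IO.select` and a pair of pipes. This is a sketch using only the standard library; the pipe names "slow" and "fast" are invented for the example, and the Async reactor relies on its own selector backends rather than this exact call.

```ruby
# Readiness-based multiplexing with IO.select and pipes.
readers = {} # read end => name
writers = {} # name => write end
%w[slow fast].each do |name|
  r, w = IO.pipe
  readers[r] = name
  writers[name] = w
end

# Only the "fast" pipe has data, so only it is reported as ready.
writers["fast"].write("hello")
ready, = IO.select(readers.keys, nil, nil, 1)
order = ready.map { |io| "#{readers[io]}: #{io.read_nonblock(16)}" }

# Now the "slow" pipe becomes ready as well; "fast" has been drained.
writers["slow"].write("world")
ready, = IO.select(readers.keys, nil, nil, 1)
order += ready.map { |io| "#{readers[io]}: #{io.read_nonblock(16)}" }

puts order
(readers.keys + writers.values).each(&:close)
```

The work is processed in the order the data arrives, not the order in which the pipes were created, which is the behaviour an event loop relies on.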
Connection pooling and resource reuse become critical performance factors in async applications. The framework provides built-in connection pooling for HTTP clients, but custom resource pools may be necessary for other types of I/O operations.
require 'async'
require 'async/pool/controller'
require 'async/pool/resource'
# Pooled resource standing in for a database connection.
# Async::Pool comes from the separate async-pool gem.
class DatabaseConnection < Async::Pool::Resource
attr_reader :id
def initialize
super
# Simulate database connection creation
@id = Random.rand(1000..9999)
puts "Created connection #{@id}"
end
def close
puts "Retired connection #{@id}"
super
end
end
Async do |task|
# At most 5 connections are created, on demand, and then reused
pool = Async::Pool::Controller.wrap(limit: 5) { DatabaseConnection.new }
# Multiple tasks sharing connection pool
20.times do |i|
task.async do
pool.acquire do |connection|
puts "Task #{i} using connection #{connection.id}"
Async::Task.current.sleep(0.5) # Simulate database query
end
end
end
end
Production Patterns
Production async applications require robust patterns for error handling, monitoring, and resource management. The reactor pattern naturally fits web server architectures, enabling applications to handle thousands of concurrent HTTP requests efficiently.
Web frameworks like Falcon leverage the Async gem to provide high-performance HTTP servers. Integrating async code with existing Rails applications requires careful consideration of ActiveRecord connections and other blocking operations that can disrupt the async execution model.
require 'async'
require 'async/http/server'
require 'async/http/endpoint'
require 'json' # for Hash#to_json
require 'time' # for Time#iso8601
# Production HTTP server with request handling
class ProductionServer
def initialize(port: 8080)
@endpoint = Async::HTTP::Endpoint.parse("http://localhost:#{port}")
@request_count = 0
@start_time = Time.now
end
def start
Async do |task|
# Health monitoring task
task.async do
loop do
Async::Task.current.sleep(30)
uptime = Time.now - @start_time
puts "Server uptime: #{uptime.round(2)}s, requests: #{@request_count}"
end
end
# HTTP server task
server = Async::HTTP::Server.for(@endpoint) do |request|
@request_count += 1
case request.path
when '/health'
handle_health_check(request)
when '/api/data'
handle_data_request(request)
else
Protocol::HTTP::Response[404, {}, ["Not Found"]]
end
rescue => error
puts "Request error: #{error.message}"
Protocol::HTTP::Response[500, {}, ["Internal Server Error"]]
end
puts "Server starting on #{@endpoint.url}"
server.run
end
end
private
def handle_health_check(request)
response_data = {
status: 'healthy',
uptime: Time.now - @start_time,
requests: @request_count
}.to_json
Protocol::HTTP::Response[200, { 'content-type' => 'application/json' }, [response_data]]
end
def handle_data_request(request)
# Simulate async data processing
Async::Task.current.sleep(0.1)
data = { message: 'Hello from async server', timestamp: Time.now.iso8601 }.to_json
Protocol::HTTP::Response[200, { 'content-type' => 'application/json' }, [data]]
end
end
server = ProductionServer.new
server.start
Graceful shutdown handling ensures that in-flight requests complete before the server terminates. Signal handlers can coordinate with the reactor to stop accepting new connections while allowing existing tasks to finish.
require 'async'
require 'async/http/server'
require 'async/http/endpoint'
class GracefulServer
def initialize
@running = true
@active_requests = 0
@shutdown_timeout = 30
end
def start
# Signal handlers only flip a flag; the drain runs inside the reactor
trap('TERM') { @running = false }
trap('INT') { @running = false }
Async do |task|
server = Async::HTTP::Server.for(Async::HTTP::Endpoint.parse('http://localhost:8080')) do |request|
@active_requests += 1
begin
# Return 503 if shutting down (next, not return, exits a block)
next Protocol::HTTP::Response[503, {}, ["Service Unavailable"]] unless @running
# Process request
handle_request(request)
ensure
@active_requests -= 1
end
end
# Watcher task: waits for the flag, drains, then stops the root task
task.async do |watcher|
watcher.sleep(0.1) while @running
drain(watcher)
task.stop
end
server.run
end
end
private
def drain(task)
puts "Shutdown initiated. Waiting for #{@active_requests} active requests..."
# Sleep via the task so in-flight requests keep running while we wait
deadline = Time.now + @shutdown_timeout
task.sleep(0.1) while @active_requests > 0 && Time.now < deadline
if @active_requests > 0
puts "Force shutdown: #{@active_requests} requests still active"
else
puts "Graceful shutdown complete"
end
end
def handle_request(request)
# Simulate request processing
Async::Task.current.sleep(rand(0.1..2.0))
Protocol::HTTP::Response[200, {}, ["Request processed"]]
end
end
Monitoring async applications requires tracking reactor health, task queue depths, and I/O operation latencies. Custom metrics collection can integrate with monitoring systems like Prometheus or StatsD.
require 'async'
require 'async/clock'
class AsyncMetrics
def initialize
@task_count = 0
@completed_tasks = 0
@total_execution_time = 0
@start_time = Async::Clock.now
end
def track_task(&block)
@task_count += 1
start_time = Async::Clock.now
begin
result = yield
@completed_tasks += 1
@total_execution_time += Async::Clock.now - start_time
result
rescue => error
puts "Task failed: #{error.message}"
raise
ensure
@task_count -= 1
end
end
def report_stats
uptime = Async::Clock.now - @start_time
avg_execution_time = @completed_tasks > 0 ? @total_execution_time / @completed_tasks : 0
{
uptime: uptime,
active_tasks: @task_count,
completed_tasks: @completed_tasks,
average_execution_time: avg_execution_time,
tasks_per_second: @completed_tasks / uptime
}
end
end
# Usage in production application
Async do |task|
metrics = AsyncMetrics.new
# Periodic metrics reporting
task.async do
loop do
Async::Task.current.sleep(60)
stats = metrics.report_stats
puts "Async metrics: #{stats}"
end
end
# Application tasks with metrics tracking
100.times do |i|
task.async do
metrics.track_task do
# Simulate work
Async::Task.current.sleep(rand(0.1..0.5))
puts "Completed task #{i}"
end
end
end
end
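Averages hide slow outliers, so latency tracking usually reports percentiles as well. The following is a minimal sketch of a recorder that times blocks with the monotonic clock and computes nearest-rank percentiles; `LatencyRecorder` is a hypothetical helper, not part of the Async gem, and the sample values are invented for the example.

```ruby
# Collects durations and reports nearest-rank percentiles.
# LatencyRecorder is a hypothetical helper, not part of the Async gem.
class LatencyRecorder
  def initialize
    @samples = []
  end

  # Time a block with the monotonic clock and store the elapsed seconds.
  def measure
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
  ensure
    @samples << Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  end

  # Record an already-measured duration in seconds.
  def record(seconds)
    @samples << seconds
  end

  # Nearest-rank percentile: the smallest sample such that at least
  # pct percent of all samples are less than or equal to it.
  def percentile(pct)
    return nil if @samples.empty?

    sorted = @samples.sort
    rank = (pct / 100.0 * sorted.size).ceil
    sorted[[rank, 1].max - 1]
  end
end

recorder = LatencyRecorder.new
[0.12, 0.30, 0.08, 0.95, 0.20].each { |seconds| recorder.record(seconds) }
p50 = recorder.percentile(50)
p95 = recorder.percentile(95)
puts "p50=#{p50}s p95=#{p95}s"
```

Inside a task, wrapping an I/O call in `recorder.measure { ... }` would capture its wall-clock latency, including time spent suspended.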
Error Handling & Debugging
Async applications present unique error handling challenges due to their concurrent nature and fiber-based execution model. Exceptions that occur within async tasks do not automatically propagate to the parent context, requiring explicit error handling strategies to prevent silent failures.
The fundamental pattern for async error handling involves using task objects to capture and propagate exceptions. When a task encounters an unhandled exception, the error is stored within the task object and raised when another task calls the wait method.
require 'async'
Async do |task|
# Task that will fail
failing_task = task.async do
Async::Task.current.sleep(1)
raise StandardError, "Something went wrong"
end
# Task that will succeed
success_task = task.async do
Async::Task.current.sleep(0.5)
"Success result"
end
begin
# This will raise the exception from failing_task
result = failing_task.wait
puts "This won't be reached"
rescue StandardError => error
puts "Caught error: #{error.message}"
end
# Success task continues independently
puts "Success result: #{success_task.wait}"
end
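The store-then-reraise behaviour of wait can be modelled in a few lines of plain Ruby. `ToyTask` below is a hypothetical stand-in that runs its block immediately rather than concurrently; it is a sketch of the idea only, not the implementation of Async::Task.

```ruby
# ToyTask runs its block right away and remembers either the result
# or the exception. Hypothetical class for illustration only.
class ToyTask
  def initialize
    @result = yield
  rescue StandardError => error
    @error = error # keep the failure instead of letting it propagate
  end

  # Return the stored result, or re-raise the stored exception.
  def wait
    raise @error if @error

    @result
  end
end

ok = ToyTask.new { 21 * 2 }
failed = ToyTask.new { raise ArgumentError, "bad input" }

puts ok.wait
message = begin
  failed.wait
rescue ArgumentError => error
  "Caught: #{error.message}"
end
puts message
```

Because the exception surfaces only at the call to wait, a task whose result is never waited on can fail without any caller noticing, which is why the document stresses explicit error handling.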
Handling multiple concurrent tasks requires collecting results and errors systematically. The pattern of mapping tasks and then waiting for results ensures that all tasks complete while properly handling any exceptions that occur.
require 'async'
require 'async/http/internet'
def process_urls_with_error_handling(urls)
Async do |task|
internet = Async::HTTP::Internet.new
tasks = urls.map do |url|
task.async do
begin
response = internet.get(url)
{ url: url, status: response.status, success: true }
rescue => error
{ url: url, error: error.message, success: false }
end
end
end
# Collect all results, including errors
results = tasks.map do |task_obj|
begin
task_obj.wait
rescue => error
{ url: "unknown", error: error.message, success: false }
end
end
internet.close
results
end.wait # Async returns a Task; wait yields the block's result array
end
urls = [
'https://httpbin.org/status/200',
'https://httpbin.org/status/404',
'https://invalid-domain-12345.com',
'https://httpbin.org/status/500'
]
results = process_urls_with_error_handling(urls)
results.each do |result|
if result[:success]
puts "#{result[:url]}: HTTP #{result[:status]}"
else
puts "#{result[:url]}: ERROR - #{result[:error]}"
end
end
Debugging async code requires different approaches than traditional synchronous debugging. Stack traces become more complex due to fiber switching, and debugger integration may not work as expected. Logging becomes crucial for understanding execution flow and identifying issues.
require 'async'
require 'logger'
class AsyncLogger
def initialize
@logger = Logger.new(STDOUT)
@logger.formatter = proc do |severity, datetime, progname, msg|
task_id = Async::Task.current?&.object_id || 'main' # current? returns nil outside a task
"[#{datetime}] #{severity} [Task:#{task_id}] #{msg}\n"
end
end
def info(message)
@logger.info(message)
end
def error(message)
@logger.error(message)
end
def debug(message)
@logger.debug(message)
end
end
# Usage in async context
Async do |task|
logger = AsyncLogger.new
logger.info("Starting async operation")
# Multiple tasks with logging
tasks = 5.times.map do |i|
task.async do
logger.debug("Task #{i} starting")
begin
# Simulate work that might fail
Async::Task.current.sleep(rand(0.1..0.5))
if rand < 0.3 # 30% chance of failure
raise "Random failure in task #{i}"
end
logger.info("Task #{i} completed successfully")
"Result #{i}"
rescue => error
logger.error("Task #{i} failed: #{error.message}")
raise
end
end
end
# Wait for all tasks with error handling
results = []
tasks.each_with_index do |task_obj, i|
begin
result = task_obj.wait
results << result
rescue => error
logger.error("Failed to get result from task #{i}")
results << nil
end
end
logger.info("All tasks completed. Results: #{results.compact.size}/#{tasks.size}")
end
Timeouts are best enforced by the reactor itself rather than by racing a hand-written timer task against the work, because an exception raised inside a separate timer task does not interrupt the task doing the work. Task#with_timeout cancels its block and raises Async::TimeoutError when the block runs longer than the given number of seconds.
require 'async'
# Example usage with the built-in Task#with_timeout
Async do |task|
begin
result = task.with_timeout(2) do
puts "Starting long operation"
task.sleep(3) # This will time out
"Operation completed"
end
puts "Result: #{result}"
rescue Async::TimeoutError => error
puts "Operation failed: #{error.message}"
end
end
Reference
Core Classes
Class | Purpose | Key Methods |
---|---|---|
Async::Reactor | Event loop management | #run, #stop, #stopped? |
Async::Task | Fiber-based execution context | #async, #wait, #stop, #sleep |
Async::Condition | Task synchronization primitive | #wait, #signal |
Async::Semaphore | Resource access control | #acquire, #release |
Async::Notification | One-time signaling mechanism | #wait, #signal |
Task Management
Method | Parameters | Returns | Description |
---|---|---|---|
Async { block } | Block | Task | Creates reactor and runs async block |
Task.current.async { block } | Block | Task | Spawns child task within current reactor |
Task#wait | None | Object | Waits for task completion, returns result |
Task#stop | None | nil | Stops task execution |
Task#sleep(duration) | duration (Numeric) | nil | Suspends task for specified seconds |
Task#yield | None | nil | Yields control to other tasks |
I/O Operations
Class | Purpose | Common Methods |
---|---|---|
Async::IO::Generic | Async file I/O wrapper | #open, #read, #write, #close |
Async::IO::Socket | Network socket operations | #connect, #bind, #listen, #accept |
Async::IO::Endpoint | Network endpoint abstraction | #tcp, #udp, #unix |
Async::HTTP::Internet | HTTP client interface | #get, #post, #put, #delete |
Synchronization Primitives
Primitive | Use Case | Methods | Example |
---|---|---|---|
Semaphore | Limit concurrent access | #acquire, #release | Database connection pooling |
Condition | Wait for specific conditions | #wait, #signal | Producer-consumer patterns |
Notification | One-time signaling | #wait, #signal | Startup completion |
Barrier | Synchronize multiple tasks | #wait | Batch processing coordination |
Error Classes
Exception | Inheritance | Description |
---|---|---|
Async::Stop | Exception | Clean task termination |
Async::TimeoutError | StandardError | Operation timeout exceeded |
Async::Wrapper::Cancelled | StandardError | Wait on a wrapped I/O object was cancelled |
Async::ReactorStopped | StandardError | Reactor no longer running |
Configuration Options
Option | Type | Default | Description |
---|---|---|---|
:reactor | Symbol | :auto | Reactor implementation (:auto, :nio, :epoll) |
:timeout | Numeric | nil | Default timeout for operations |
:buffer_size | Integer | 8192 | I/O buffer size in bytes |
:reuse_address | Boolean | true | Socket address reuse flag |
HTTP Client Methods
Method | Parameters | Returns | Description |
---|---|---|---|
#get(url, **options) | URL string, options hash | Response | HTTP GET request |
#post(url, body, **options) | URL, body, options | Response | HTTP POST request |
#put(url, body, **options) | URL, body, options | Response | HTTP PUT request |
#delete(url, **options) | URL, options | Response | HTTP DELETE request |
#head(url, **options) | URL, options | Response | HTTP HEAD request |
Reactor States
State | Description | Transitions |
---|---|---|
:starting | Reactor initializing | → :running |
:running | Processing events | → :stopping, :stopped |
:stopping | Graceful shutdown | → :stopped |
:stopped | No longer processing | Terminal state |
Task States
State | Description | Methods Available |
---|---|---|
:running | Currently executing | #stop, #async, #sleep |
:suspended | Waiting for I/O or timer | #stop |
:stopped | Terminated | #wait (returns result or raises) |
:failed | Terminated with exception | #wait (raises exception) |