
CrackedRuby

Async Gem

A comprehensive guide to Ruby's Async gem for building scalable asynchronous I/O applications.


Overview

The Async gem provides a modern asynchronous I/O framework for Ruby applications. Built around an event-driven reactor pattern, Async enables developers to write concurrent code that can handle thousands of simultaneous I/O operations without blocking execution threads. The gem implements cooperative multitasking through fibers, allowing Ruby applications to achieve high concurrency while maintaining readable, sequential-style code.

The core component is the reactor (Async::Reactor, which in Async 2 is a subclass of Async::Scheduler). It runs an event loop that schedules and executes asynchronous tasks. Tasks are represented by Async::Task objects that wrap fiber-based execution contexts. Companion gems such as async-http build on this core to provide async-compatible HTTP clients and servers and other network protocols.

require 'async'

Async do
  puts "Starting async task"
  Async::Task.current.sleep(1)
  puts "Task completed after 1 second"
end
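Async tasks are built on Ruby fibers. The sketch below uses plain Ruby fibers only (no Async gem) and a hypothetical `round_robin` helper to show what cooperative multitasking means: each fiber runs until it voluntarily calls `Fiber.yield`, and a loop decides which fiber to resume next. In Async, the reactor plays the role of that loop, with the suspension points placed at I/O and timer waits.

```ruby
require 'fiber'

# Hypothetical helper: resume each fiber in turn until every one has finished
def round_robin(fibers)
  until fibers.empty?
    fiber = fibers.shift
    fiber.resume
    fibers.push(fiber) if fiber.alive? # Re-queue fibers that only suspended
  end
end

log = []
make_worker = lambda do |name|
  Fiber.new do
    3.times do |step|
      log << "#{name}#{step}"
      Fiber.yield # Cooperative suspension point: hand control back to the loop
    end
  end
end

round_robin([make_worker.call("a"), make_worker.call("b")])
puts log.join(" ") # => a0 b0 a1 b1 a2 b2
```

The two workers interleave only at their `Fiber.yield` calls; nothing preempts them in between.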

On Ruby 3.x, Async 2 hooks into Ruby's own I/O through the fiber scheduler interface (Fiber.set_scheduler), so standard classes such as sockets, pipes, and Kernel#sleep become non-blocking inside a task; Async 1.x relied on wrapper classes from the async-io gem instead. When a task performs an operation that would block, the reactor suspends that task's fiber and runs other tasks until the operation can proceed. This keeps the thread busy while I/O is pending and lets a single process serve many concurrent requests.

require 'async'
require 'async/http/internet'

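Async do |task|
  internet = Async::HTTP::Internet.new

  # Spawn one child task per request so the three requests run concurrently
  tasks = [1, 2, 3].map do |delay|
    task.async do
      response = internet.get("https://httpbin.org/delay/#{delay}")
      response.read # Consume the body so the connection is released
      response.status
    end
  end

  # Total time is roughly the slowest request (about 3s), not the sum (about 6s)
  statuses = tasks.map(&:wait)
  puts "All requests completed: #{statuses.inspect}"
ensure
  internet&.close
end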
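The suspend-and-resume cycle can be modeled in a few lines of plain Ruby. `ToyReactor` below is hypothetical and deliberately simplified: it uses a simulated clock instead of real timers or I/O readiness. A task "sleeps" by yielding its wake-up time, and the loop always resumes whichever task is due next. This is not how Async is implemented internally; it only illustrates the scheduling idea.

```ruby
require 'fiber'

# Hypothetical toy reactor: single-threaded, fiber-based, simulated clock
class ToyReactor
  attr_reader :now

  def initialize
    @now = 0.0
    @timers = [] # Entries are [wake_time, sequence, fiber]
    @sequence = 0
  end

  # Register a task; it starts running on the next loop iteration
  def spawn(&block)
    schedule(@now, Fiber.new(&block))
  end

  # Called from inside a task: suspend until `duration` simulated seconds pass
  def sleep(duration)
    Fiber.yield(@now + duration)
  end

  # Resume the task with the earliest wake-up time until none remain
  def run
    until @timers.empty?
      @timers.sort!
      wake_time, _sequence, fiber = @timers.shift
      @now = wake_time
      next_wake_time = fiber.resume
      schedule(next_wake_time, fiber) if fiber.alive?
    end
  end

  private

  def schedule(time, fiber)
    @sequence += 1 # Unique tie-breaker so fibers are never compared
    @timers << [time, @sequence, fiber]
  end
end

reactor = ToyReactor.new
log = []

[3, 1, 2].each do |delay|
  reactor.spawn do
    reactor.sleep(delay)
    log << "waited #{delay}s, finished at t=#{reactor.now}"
  end
end

reactor.run
puts log
puts "Total simulated time: #{reactor.now}" # => 3.0
```

Three tasks waiting 3, 1 and 2 simulated seconds finish after 3 seconds in total rather than 6, because their waits overlap.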

Compared with thread-based concurrency, this model avoids the cost of OS thread context switches. Because all tasks execute within a single thread and are never preempted, code between two suspension points cannot be interleaved with another task. Shared state that is read and written without an intervening suspension point therefore needs no lock; state that spans a suspension point can still race, as covered in Thread Safety & Concurrency.
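A quick plain-Ruby check of the single-thread claim: fibers resumed from the main thread all report that same thread as `Thread.current`, because switching between fibers happens in user space rather than through the OS scheduler.

```ruby
main_thread = Thread.current

# Resume three fibers; each one returns the thread it ran on
fiber_threads = Array.new(3) { Fiber.new { Thread.current }.resume }

same_thread = fiber_threads.all? { |thread| thread.equal?(main_thread) }
puts same_thread # => true
```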

Basic Usage

Asynchronous execution starts with the Kernel#Async method. When no reactor is running, it creates one, runs the provided block as the first task, and returns once the event loop has no more work, that is, when all tasks have finished.

require 'async'

Async do
  puts "Inside async block"
  # Async operations go here
end

puts "After async block completes"

The Async::Task.current method provides access to the currently executing task, which offers methods for suspending execution, spawning child tasks, and managing task lifecycle. The sleep method demonstrates cooperative yielding, where the current task suspends and allows other tasks to execute.

require 'async'

Async do
  puts "Task 1 starting"
  
  # Spawn a child task
  task2 = Async::Task.current.async do
    puts "Task 2 starting"
    Async::Task.current.sleep(2)
    puts "Task 2 completing"
    "result from task 2"
  end
  
  puts "Task 1 sleeping"
  Async::Task.current.sleep(1)
  puts "Task 1 resuming"
  
  # Wait for child task to complete
  result = task2.wait
  puts "Received: #{result}"
end

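File I/O inside a task uses Ruby's ordinary IO methods. On Ruby 3.x, Async 2 integrates with them through the fiber scheduler; the Async::IO wrapper classes from the async-io gem are only needed with Async 1.x. Keep in mind that readiness-based selectors cannot usefully wait on regular files (select reports them as always ready and epoll refuses to watch them), so disk reads usually still occupy the reactor for as long as the read takes. The concurrency gains come mainly from sockets, pipes, and timers.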

require 'async'

Async do |task|
  # Ordinary File methods work inside a task; no special wrapper is required
  content = File.read('/etc/hosts')
  puts "File size: #{content.bytesize} bytes"

  # Multiple concurrent file operations
  files = ['file1.txt', 'file2.txt', 'file3.txt']

  tasks = files.map do |filename|
    task.async do
      next unless File.exist?(filename)

      line_count = File.read(filename).lines.count
      puts "#{filename}: #{line_count} lines"
    end
  end

  # Wait for all file operations to complete
  tasks.each(&:wait)
end
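Whether a read can be overlapped with other work depends on readiness. The plain-Ruby check below uses IO.select on a temporary file: select(2) reports a regular file as ready immediately, so an event loop has nothing to wait for and the cost lies entirely in the read itself.

```ruby
require 'tempfile'

# Returns the block's value: whether the file was reported ready with a zero timeout
ready = Tempfile.create('async-demo') do |file|
  file.write("hello\n")
  file.flush
  readable, _writable, _errored = IO.select([file], nil, nil, 0)
  !readable.nil? && readable.include?(file)
end

puts ready # => true
```

Sockets and pipes behave differently: they only become readable when data actually arrives, which is exactly the situation a reactor can exploit.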

Network programming with Async 2 on Ruby 3.x can use Ruby's standard socket classes (TCPServer, TCPSocket) directly: the fiber scheduler makes their blocking calls suspend only the current task. Async 1.x provided Async::IO::Socket and Async::IO::Endpoint (async-io gem) for the same purpose.

require 'async'
require 'socket'

Async do |task|
  server = TCPServer.new('localhost', 8080)
  puts "Server listening on port 8080"

  # Accept connections in a background task; each client gets its own task
  server_task = task.async do
    loop do
      client = server.accept
      task.async do
        line = client.gets # Read one newline-terminated message
        client.puts("Echo: #{line}")
      ensure
        client.close
      end
    end
  end

  # Create multiple client connections concurrently
  clients = 5.times.map do |i|
    task.async do
      socket = TCPSocket.new('localhost', 8080)
      socket.puts("Message #{i}")
      puts socket.gets
    ensure
      socket&.close
    end
  end

  clients.each(&:wait)
  server_task.stop
  server.close
end

Thread Safety & Concurrency

The Async gem implements a single-threaded concurrency model where all tasks execute within the same OS thread. This design eliminates traditional thread safety concerns since no two tasks execute simultaneously. However, understanding task scheduling and yielding behavior is crucial for writing correct async code.

Tasks yield control at specific suspension points, primarily during I/O operations or explicit sleep calls. Between these points, task execution is atomic - no other task can interrupt or modify shared state. This characteristic simplifies state management but requires careful consideration of when tasks yield control.

require 'async'

class Counter
  def initialize
    @value = 0
  end
  
  def increment
    # This operation is atomic - no other task can interrupt
    old_value = @value
    @value = old_value + 1
    puts "Incremented from #{old_value} to #{@value}"
  end
  
  def slow_increment
    old_value = @value
    # Yielding here allows other tasks to run
    Async::Task.current.sleep(0.01)
    @value = old_value + 1
    puts "Slow increment: #{old_value} -> #{@value}"
  end
end

Async do |task|
  counter = Counter.new
  
  # These increments are safe - atomic execution
  10.times do
    task.async { counter.increment }
  end
  
  # These lose updates: each task reads @value, sleeps, then writes back a stale value + 1
  10.times do
    task.async { counter.slow_increment }
  end
  
  task.sleep(1) # Wait for all tasks to complete
end
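The lost updates in slow_increment can be reproduced with plain Ruby fibers and no Async gem. Each fiber below reads a shared value, suspends at a `Fiber.yield` (standing in for the sleep), and only then writes back. The second group moves the whole read-modify-write before the suspension point, which is the usual fix.

```ruby
# Model of Counter#slow_increment: read, suspend, then write
value = 0
racy_fibers = Array.new(10) do
  Fiber.new do
    old_value = value
    Fiber.yield # Suspension point between the read and the write
    value = old_value + 1
  end
end

racy_fibers.each(&:resume) # Phase 1: all ten read value == 0, then suspend
racy_fibers.each(&:resume) # Phase 2: all ten write 0 + 1
puts value # => 1 (nine increments were lost)

# Same work, but the read-modify-write finishes before the suspension point
safe_value = 0
safe_fibers = Array.new(10) do
  Fiber.new do
    safe_value += 1
    Fiber.yield
  end
end

safe_fibers.each(&:resume)
safe_fibers.each(&:resume)
puts safe_value # => 10
```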

Async::Semaphore limits how many tasks may use a resource at the same time. It coordinates tasks within a reactor, and therefore within one thread; it is not a lock for sharing state between threads.

require 'async'
require 'async/semaphore'

# Shared resource protection
semaphore = Async::Semaphore.new(3) # Allow 3 concurrent accesses

Async do |task|
  10.times do |i|
    task.async do
      semaphore.acquire do
        puts "Task #{i} acquired semaphore"
        # Simulate work that should be limited
        Async::Task.current.sleep(1)
        puts "Task #{i} releasing semaphore"
      end
    end
  end
end
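Conceptually, a counting semaphore is a pool of permits: acquiring takes one (waiting if none are left) and releasing returns it. The sketch below models that idea with stdlib threads and a `Thread::Queue` of tokens purely to show the mechanism; `PermitPool` is a hypothetical name, and Async::Semaphore applies the same idea to tasks inside one reactor.

```ruby
# Hypothetical PermitPool: a counting semaphore built from a queue of tokens
class PermitPool
  def initialize(limit)
    @permits = Thread::Queue.new
    limit.times { @permits << :permit }
  end

  # Take a permit (waiting while none are free), run the block, give it back
  def acquire
    permit = @permits.pop
    yield
  ensure
    @permits << permit if permit
  end
end

pool = PermitPool.new(3)
stats_lock = Mutex.new
active = 0
max_active = 0

workers = Array.new(10) do
  Thread.new do
    pool.acquire do
      stats_lock.synchronize do
        active += 1
        max_active = [max_active, active].max
      end
      sleep(0.05) # Simulate work that should be limited
      stats_lock.synchronize { active -= 1 }
    end
  end
end

workers.each(&:join)
puts "Most concurrent holders: #{max_active}" # never more than 3
```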

Each reactor runs on a single thread, and tasks spawned in it always execute on that thread. Async's own primitives, such as Async::Notification and Async::Condition, are designed for tasks within one reactor, so cross-thread communication should go through thread-safe structures. On Ruby 3.x, Thread::Queue is integrated with the fiber scheduler: popping it inside a task suspends only that task while another thread produces data.

require 'async'

# Thread-safe hand-off between a plain thread and an async task
queue = Thread::Queue.new

# Producer thread (runs outside the reactor)
producer_thread = Thread.new do
  sleep(2) # Simulate work in another thread
  queue << "Data ready"
end

# Consumer in async context
Async do
  puts "Waiting for data"
  result = queue.pop # Suspends this task, not the whole reactor
  puts "Received: #{result}"
end

producer_thread.join

Race conditions can still occur in async code when tasks make assumptions about the state of shared resources after yielding control. The key to avoiding these issues is minimizing the scope of operations that depend on shared state and using appropriate synchronization primitives when necessary.

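require 'async'

class AsyncSafeResource
  def initialize
    @data = {}
    # There is no Async::Mutex; since Ruby 3.0 the core Mutex is owned per
    # fiber and cooperates with the fiber scheduler, so a task waiting for
    # the lock suspends instead of blocking the reactor
    @mutex = Mutex.new
  end

  def update(key, value)
    @mutex.synchronize do
      # Critical section - other tasks cannot enter it concurrently
      old_value = @data[key]
      # Even though this yields, the mutex keeps other tasks
      # out of this critical section until it is released
      Async::Task.current.sleep(0.01)
      @data[key] = value
      puts "Updated #{key}: #{old_value.inspect} -> #{value}"
    end
  end

  def read(key)
    @mutex.synchronize do
      @data[key]
    end
  end
end

Async do |task|
  resource = AsyncSafeResource.new
  3.times { |i| task.async { resource.update(:counter, i) } }
end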
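Using the core Mutex here relies on a Ruby 3.0 change: a Mutex is owned by the fiber that locked it rather than by the thread. The plain-Ruby check below shows that ownership rule; it assumes Ruby 3.0 or later.

```ruby
mutex = Mutex.new
mutex.lock

# Ask a different fiber on the same thread whether it owns the lock
owned_elsewhere = Fiber.new { mutex.owned? }.resume
owned_here = mutex.owned?

mutex.unlock
puts owned_here      # => true
puts owned_elsewhere # => false on Ruby 3.0 and later
```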

Performance & Memory

For I/O-bound applications, the Async gem avoids per-connection thread overhead and keeps the CPU busy while I/O is pending. Because a waiting connection costs a fiber rather than an OS thread, one process can usually keep many more connections in flight than a thread-per-connection design; how many more depends heavily on the workload, so measure with realistic traffic.

Memory use stays low because fibers are much lighter than OS threads. CRuby reserves smaller stacks for fibers than its defaults for threads, allocates fiber stacks from a pool, and the operating system only commits the stack pages that are actually touched. This efficiency lets applications maintain thousands of concurrent connections without exhausting system memory.

require 'async'
require 'async/http/internet'
require 'benchmark'

# Performance comparison: sequential vs concurrent requests
urls = Array.new(50) { "https://httpbin.org/delay/1" }

# Sequential approach: one request at a time inside a single task
sequential_time = Benchmark.realtime do
  Async do
    internet = Async::HTTP::Internet.new

    urls.each do |url|
      response = internet.get(url)
      response.read # Consume the body so the connection can be reused
      puts "Response code: #{response.status}"
    end
  ensure
    internet&.close
  end
end

# Concurrent approach: one child task per request
concurrent_time = Benchmark.realtime do
  Async do |task|
    internet = Async::HTTP::Internet.new

    tasks = urls.map do |url|
      task.async do
        response = internet.get(url)
        response.read
        puts "Response code: #{response.status}"
      end
    end

    tasks.each(&:wait)
  ensure
    internet&.close
  end
end

puts "Sequential: #{sequential_time.round(2)}s"
puts "Concurrent: #{concurrent_time.round(2)}s"
puts "Speedup: #{(sequential_time / concurrent_time).round(2)}x"
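The claim that fibers are lighter than threads can be checked on CRuby, which exposes its default stack reservations in `RubyVM::DEFAULT_PARAMS`. This snippet is CRuby-specific; the values are reservations rather than resident memory, and they vary with platform, Ruby version, and the `RUBY_*_STACK_SIZE` environment variables.

```ruby
params = RubyVM::DEFAULT_PARAMS

# Print the default VM and machine stack reservations for threads and fibers
%i[thread_vm_stack_size thread_machine_stack_size
   fiber_vm_stack_size fiber_machine_stack_size].each do |key|
  puts format("%-26s %6d KB", key, params.fetch(key) / 1024)
end
```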

To check how memory behaves under load in your own application, sample the process's resident set size while many tasks are in flight. Readings that level off across samples suggest that completed tasks and their fibers are being reclaimed.

require 'async'
require 'async/http/internet'
require 'json'

def measure_memory
  GC.start
  `ps -o rss= -p #{Process.pid}`.to_i # Resident set size in KB
end

puts "Initial memory: #{measure_memory} KB"

Async do |task|
  internet = Async::HTTP::Internet.new

  # Create many concurrent tasks
  tasks = 1000.times.map do |i|
    child = task.async do
      response = internet.get("https://httpbin.org/json")
      data = response.read
      # Process response data
      JSON.parse(data) if data
    end

    # Sample memory every 100 tasks while requests are in flight
    puts "Memory after #{i + 1} tasks: #{measure_memory} KB" if (i % 100).zero?

    child
  end

  # Wait for every request before closing the shared client
  tasks.each(&:wait)
ensure
  internet&.close
end

puts "Final memory: #{measure_memory} KB"

Optimizing async applications starts with knowing where tasks yield control. The largest gains usually come from issuing independent I/O operations concurrently rather than one after another, and from reusing connections instead of opening new ones.

require 'async'
require 'async/http/internet'
require 'benchmark'

Async do |task|
  internet = Async::HTTP::Internet.new

  # Sequential: each request waits for the previous one to finish
  sequential_time = Benchmark.realtime do
    100.times do
      response = internet.get("https://httpbin.org/json")
      response.read
    end
  end

  # Concurrent: all requests are in flight at once, sharing pooled connections
  concurrent_time = Benchmark.realtime do
    tasks = 100.times.map do
      task.async do
        response = internet.get("https://httpbin.org/json")
        response.read
      end
    end

    tasks.each(&:wait)
  end

  puts "Sequential: #{sequential_time.round(2)}s"
  puts "Concurrent: #{concurrent_time.round(2)}s"
ensure
  internet&.close
end

The reactor resumes a task only when the event it is waiting for has occurred: a socket became readable or writable, a timer expired, or another task signalled it. Tasks that are still waiting cost no CPU time, so the thread stays busy as long as any task has work to do.
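Readiness-driven resumption can be seen with IO.select, the simplest readiness API in Ruby's standard library (Async's io-event selectors use epoll, kqueue, or io_uring, but the idea is the same). Two pipes stand in for two connections; only one has data, so only its reader is reported ready, and a task waiting on the other would stay suspended.

```ruby
# The slow writer is kept open, so the slow reader never sees data or EOF
slow_reader, _slow_writer = IO.pipe
fast_reader, fast_writer = IO.pipe
fast_writer.write("ready\n")

# Wait up to one second for either pipe to become readable
readable, = IO.select([slow_reader, fast_reader], nil, nil, 1)

puts readable.map { |io| io.equal?(fast_reader) ? "fast" : "slow" }.inspect # => ["fast"]
puts fast_reader.gets # => ready
```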

Connection pooling and resource reuse become critical performance factors in async applications. The framework provides built-in connection pooling for HTTP clients, but custom resource pools may be necessary for other types of I/O operations.

require 'async'
require 'async/pool'

# Resources managed by Async::Pool implement its resource interface;
# subclassing Async::Pool::Resource provides the defaults (concurrency, close, ...)
class FakeConnection < Async::Pool::Resource
  attr_reader :id

  def initialize
    super()
    # Simulate database connection creation
    @id = Random.rand(1000..9999)
    puts "Created connection #{@id}"
  end

  def close
    puts "Closed connection #{@id}"
    super
  end
end

Async do |task|
  # At most 5 connections are created; other tasks wait for one to be released
  pool = Async::Pool::Controller.wrap(limit: 5) { FakeConnection.new }

  # Multiple tasks sharing connection pool
  tasks = 20.times.map do |i|
    task.async do
      pool.acquire do |connection|
        puts "Task #{i} using connection #{connection.id}"
        Async::Task.current.sleep(0.5) # Simulate database query
      end
    end
  end

  tasks.each(&:wait)
ensure
  pool&.close
end

Production Patterns

Production async applications require robust patterns for error handling, monitoring, and resource management. The reactor pattern naturally fits web server architectures, enabling applications to handle thousands of concurrent HTTP requests efficiently.

Web servers such as Falcon build on the Async gem to provide high-performance HTTP serving. Integrating async code with existing Rails applications requires care with ActiveRecord connections and other blocking operations that can disrupt the async execution model.

require 'async'
require 'async/http/server'
require 'async/http/endpoint'
require 'json' # Hash#to_json
require 'time' # Time#iso8601

# Production HTTP server with request handling
class ProductionServer
  def initialize(port: 8080)
    @endpoint = Async::HTTP::Endpoint.parse("http://localhost:#{port}")
    @request_count = 0
    @start_time = Time.now
  end
  
  def start
    Async do |task|
      # Health monitoring task
      task.async do
        loop do
          Async::Task.current.sleep(30)
          uptime = Time.now - @start_time
          puts "Server uptime: #{uptime.round(2)}s, requests: #{@request_count}"
        end
      end
      
      # HTTP server task
      server = Async::HTTP::Server.for(@endpoint) do |request|
        @request_count += 1
        
        case request.path
        when '/health'
          handle_health_check(request)
        when '/api/data'
          handle_data_request(request)
        else
          Protocol::HTTP::Response[404, {}, ["Not Found"]]
        end
      rescue => error
        puts "Request error: #{error.message}"
        Protocol::HTTP::Response[500, {}, ["Internal Server Error"]]
      end
      
      puts "Server starting on #{@endpoint.url}"
      server.run
    end
  end
  
  private
  
  def handle_health_check(request)
    response_data = {
      status: 'healthy',
      uptime: Time.now - @start_time,
      requests: @request_count
    }.to_json
    
    Protocol::HTTP::Response[200, { 'content-type' => 'application/json' }, [response_data]]
  end
  
  def handle_data_request(request)
    # Simulate async data processing
    Async::Task.current.sleep(0.1)
    
    data = { message: 'Hello from async server', timestamp: Time.now.iso8601 }.to_json
    Protocol::HTTP::Response[200, { 'content-type' => 'application/json' }, [data]]
  end
end

server = ProductionServer.new
server.start

Graceful shutdown handling ensures that in-flight requests complete before the server terminates. Signal handlers should only record that shutdown was requested: they run on the reactor's thread, so sleeping or waiting inside one freezes every task, including the requests it is waiting for. A task inside the reactor can then stop accepting new work and let existing requests finish.

require 'async'
require 'async/http/server'
require 'async/http/endpoint'

class GracefulServer
  def initialize(shutdown_timeout: 30)
    @endpoint = Async::HTTP::Endpoint.parse('http://localhost:8080')
    @running = true
    @active_requests = 0
    @shutdown_timeout = shutdown_timeout
  end

  def start
    # Trap handlers only flip a flag; the waiting happens inside a task
    trap('TERM') { @running = false }
    trap('INT') { @running = false }

    Async do |task|
      server = Async::HTTP::Server.for(@endpoint) do |request|
        # Return 503 once shutdown has begun (`next`, not `return`, exits a block)
        next Protocol::HTTP::Response[503, {}, ["Service Unavailable"]] unless @running

        @active_requests += 1
        begin
          handle_request(request)
        ensure
          @active_requests -= 1
        end
      end

      server_task = task.async { server.run }

      # Poll the shutdown flag without blocking other tasks
      Async::Task.current.sleep(0.1) while @running

      drain_requests
      server_task.stop
    end
  end

  private

  def drain_requests
    puts "Shutdown initiated. Waiting for #{@active_requests} active requests..."
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @shutdown_timeout

    # Sleeping inside a task lets in-flight requests keep running meanwhile
    while @active_requests > 0 && Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
      Async::Task.current.sleep(0.1)
    end

    if @active_requests > 0
      puts "Force shutdown: #{@active_requests} requests still active"
    else
      puts "Graceful shutdown complete"
    end
  end

  def handle_request(request)
    # Simulate request processing
    Async::Task.current.sleep(rand(0.1..2.0))
    Protocol::HTTP::Response[200, {}, ["Request processed"]]
  end
end

GracefulServer.new.start
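A common way to keep signal handlers minimal is the self-pipe pattern: the handler only writes a byte to a pipe, and the event loop notices it through ordinary readiness checks and runs the shutdown logic itself. The plain-Ruby sketch below assumes a Unix-like system with SIGUSR2, sends the signal to its own process, and uses IO.select as a stand-in for the event loop.

```ruby
shutdown_reader, shutdown_writer = IO.pipe

# The handler does the minimum: record the request without blocking
previous_handler = Signal.trap('USR2') do
  shutdown_writer.write_nonblock('.', exception: false)
end

Process.kill('USR2', Process.pid) # Simulate an operator sending the signal

# Stand-in for the event loop: wait up to 5 seconds for the shutdown pipe
readable, = IO.select([shutdown_reader], nil, nil, 5)
shutdown_requested = !readable.nil?
puts shutdown_requested ? "Shutdown requested: stop accepting, then drain" : "No signal"

Signal.trap('USR2', previous_handler || 'DEFAULT') # Restore the earlier handler
```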

Monitoring async applications requires tracking reactor health, task queue depths, and I/O operation latencies. Custom metrics collection can integrate with monitoring systems like Prometheus or StatsD.

require 'async'
require 'async/clock'

class AsyncMetrics
  def initialize
    @task_count = 0
    @completed_tasks = 0
    @total_execution_time = 0
    @start_time = Async::Clock.now
  end
  
  def track_task(&block)
    @task_count += 1
    start_time = Async::Clock.now
    
    begin
      result = yield
      @completed_tasks += 1
      @total_execution_time += Async::Clock.now - start_time
      result
    rescue => error
      puts "Task failed: #{error.message}"
      raise
    ensure
      @task_count -= 1
    end
  end
  
  def report_stats
    uptime = Async::Clock.now - @start_time
    avg_execution_time = @completed_tasks > 0 ? @total_execution_time / @completed_tasks : 0
    
    {
      uptime: uptime,
      active_tasks: @task_count,
      completed_tasks: @completed_tasks,
      average_execution_time: avg_execution_time,
      tasks_per_second: @completed_tasks / uptime
    }
  end
end

# Usage in production application
Async do |task|
  metrics = AsyncMetrics.new

  # Periodic metrics reporting
  reporter = task.async do
    loop do
      Async::Task.current.sleep(60)
      puts "Async metrics: #{metrics.report_stats}"
    end
  end

  # Application tasks with metrics tracking
  workers = 100.times.map do |i|
    task.async do
      metrics.track_task do
        # Simulate work
        Async::Task.current.sleep(rand(0.1..0.5))
        puts "Completed task #{i}"
      end
    end
  end

  workers.each(&:wait)
  puts "Final metrics: #{metrics.report_stats}"
  reporter.stop # Otherwise the reporting loop keeps the reactor alive forever
end
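For the I/O latencies mentioned above, durations should come from the monotonic clock (the clock Async::Clock.now reads) rather than Time.now, which can jump when the system clock is adjusted. `LatencyRecorder` below is a hypothetical stdlib-only helper that stores samples and reports nearest-rank percentiles, the kind of figure usually exported to Prometheus or StatsD.

```ruby
# Hypothetical latency recorder based on the monotonic clock
class LatencyRecorder
  def initialize
    @samples = []
  end

  # Time a block and record its duration, even if the block raises
  def measure
    start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
  ensure
    record(Process.clock_gettime(Process::CLOCK_MONOTONIC) - start) if start
  end

  def record(seconds)
    @samples << seconds
  end

  def count
    @samples.size
  end

  # Nearest-rank percentile: smallest sample with at least p% of samples <= it
  def percentile(p)
    return nil if @samples.empty?

    sorted = @samples.sort
    rank = (p * sorted.size / 100.0).ceil
    sorted[[rank, 1].max - 1]
  end
end

recorder = LatencyRecorder.new
100.times { |i| recorder.record((i + 1) / 1000.0) } # 1 ms .. 100 ms
puts recorder.percentile(50) # => 0.05
puts recorder.percentile(99) # => 0.099

timed = LatencyRecorder.new
timed.measure { sleep(0.01) }
puts timed.count # => 1
```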

Error Handling & Debugging

Async applications present unique error handling challenges due to their concurrent nature and fiber-based execution model. Exceptions raised within a task do not automatically propagate to the parent task; a failure that nobody waits for typically surfaces only as a warning in Async's log output, so explicit error handling is needed to prevent silent failures.

The fundamental pattern for async error handling involves using task objects to capture and propagate exceptions. When a task encounters an unhandled exception, the error is stored within the task object and raised when another task calls the wait method.

require 'async'

Async do |task|
  # Task that will fail
  failing_task = task.async do
    Async::Task.current.sleep(1)
    raise StandardError, "Something went wrong"
  end
  
  # Task that will succeed
  success_task = task.async do
    Async::Task.current.sleep(0.5)
    "Success result"
  end
  
  begin
    # This will raise the exception from failing_task
    result = failing_task.wait
    puts "This won't be reached"
  rescue StandardError => error
    puts "Caught error: #{error.message}"
  end
  
  # Success task continues independently
  puts "Success result: #{success_task.wait}"
end
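The store-then-re-raise behaviour of wait can be sketched in plain Ruby. `ToyTask` is hypothetical: it runs a block in a fiber, keeps either the result or the exception, and surfaces the exception only when `wait` is called, mirroring how a failed Async task reports to whoever waits on it.

```ruby
# Hypothetical ToyTask: defers a block's exception until #wait
class ToyTask
  def initialize(&block)
    @result = nil
    @error = nil
    fiber = Fiber.new do
      @result = block.call
    rescue StandardError => error
      @error = error # Stored, not raised, so the creator keeps running
    end
    fiber.resume # Runs to completion here because the block never yields
  end

  # Return the stored result, or re-raise the stored exception in the caller
  def wait
    raise @error if @error

    @result
  end
end

ok = ToyTask.new { 21 * 2 }
failed = ToyTask.new { raise ArgumentError, "bad input" } # No exception yet

puts ok.wait # => 42
begin
  failed.wait
rescue ArgumentError => error
  puts "Caught on wait: #{error.message}" # => Caught on wait: bad input
end
```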

Handling multiple concurrent tasks requires collecting results and errors systematically. The pattern of mapping tasks and then waiting for results ensures that all tasks complete while properly handling any exceptions that occur.

require 'async'
require 'async/http/internet'

def process_urls_with_error_handling(urls)
  Async do |task|
    internet = Async::HTTP::Internet.new

    tasks = urls.map do |url|
      task.async do
        response = internet.get(url)
        response.read # Consume the body so the connection can be reused
        { url: url, status: response.status, success: true }
      rescue => error
        { url: url, error: error.message, success: false }
      end
    end

    # Collect all results, including errors; zip keeps each URL paired with its task
    urls.zip(tasks).map do |url, task_obj|
      task_obj.wait
    rescue => error
      { url: url, error: error.message, success: false }
    end
  ensure
    internet&.close
  end.wait # A top-level Async returns the task; #wait returns the block's value
end

urls = [
  'https://httpbin.org/status/200',
  'https://httpbin.org/status/404', 
  'https://invalid-domain-12345.com',
  'https://httpbin.org/status/500'
]

results = process_urls_with_error_handling(urls)
results.each do |result|
  if result[:success]
    puts "#{result[:url]}: HTTP #{result[:status]}"
  else
    puts "#{result[:url]}: ERROR - #{result[:error]}"
  end
end

Debugging async code requires different approaches than traditional synchronous debugging. Stack traces become more complex due to fiber switching, and debugger integration may not work as expected. Logging becomes crucial for understanding execution flow and identifying issues.

require 'async'
require 'logger'

class AsyncLogger
  def initialize
    @logger = Logger.new(STDOUT)
    @logger.formatter = proc do |severity, datetime, progname, msg|
      task_id = Async::Task.current?&.object_id || 'main' # current? returns nil outside a task
      "[#{datetime}] #{severity} [Task:#{task_id}] #{msg}\n"
    end
  end
  
  def info(message)
    @logger.info(message)
  end
  
  def error(message)
    @logger.error(message)
  end
  
  def debug(message)
    @logger.debug(message)
  end
end

# Usage in async context
Async do |task|
  logger = AsyncLogger.new
  
  logger.info("Starting async operation")
  
  # Multiple tasks with logging
  tasks = 5.times.map do |i|
    task.async do
      logger.debug("Task #{i} starting")
      
      begin
        # Simulate work that might fail
        Async::Task.current.sleep(rand(0.1..0.5))
        
        if rand < 0.3  # 30% chance of failure
          raise "Random failure in task #{i}"
        end
        
        logger.info("Task #{i} completed successfully")
        "Result #{i}"
      rescue => error
        logger.error("Task #{i} failed: #{error.message}")
        raise
      end
    end
  end
  
  # Wait for all tasks with error handling
  results = []
  tasks.each_with_index do |task_obj, i|
    begin
      result = task_obj.wait
      results << result
    rescue => error
      logger.error("Failed to get result from task #{i}")
      results << nil
    end
  end
  
  logger.info("All tasks completed. Results: #{results.compact.size}/#{tasks.size}")
end
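The idea behind AsyncLogger, tagging every line with the execution context that produced it, can be tried with the standard Logger alone. In this sketch the current fiber's `object_id` stands in for the current Async task, and output goes to a StringIO so it can be inspected.

```ruby
require 'fiber'
require 'logger'
require 'stringio'

output = StringIO.new
logger = Logger.new(output)
# Prefix each line with the id of the fiber that logged it
logger.formatter = proc do |severity, _time, _progname, message|
  "#{severity} [fiber:#{Fiber.current.object_id}] #{message}\n"
end

root_id = Fiber.current.object_id
child_fiber = Fiber.new do
  logger.info("inside child fiber")
  Fiber.current.object_id
end
child_id = child_fiber.resume
logger.info("back in root fiber")

lines = output.string.lines
puts lines
```

Because the formatter runs in the fiber that calls the logger, lines from different tasks carry different tags even though they share one Logger.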

Timeouts are best expressed with Task#with_timeout rather than by racing a separate timer task against the work. It arms a timer in the reactor and, if the block has not finished when the timer fires, raises Async::TimeoutError inside the waiting task, which also cancels the operation in progress.

require 'async'

Async do |task|
  begin
    # Task#with_timeout raises Async::TimeoutError inside this task
    # if the block is still running after the given number of seconds
    result = task.with_timeout(2) do
      puts "Starting long operation"
      Async::Task.current.sleep(3) # This will time out
      "Operation completed"
    end

    puts "Result: #{result}"
  rescue Async::TimeoutError => error
    puts "Operation failed: #{error.message}"
  end
end

Reference

Core Classes

Class | Purpose | Key Methods
Async::Reactor | Event loop management (a subclass of Async::Scheduler in Async 2) | #run, #stop, #stopped?
Async::Task | Fiber-based execution context | #async, #wait, #stop, #sleep
Async::Condition | Task synchronization primitive | #wait, #signal
Async::Semaphore | Limits concurrent access to a resource | #acquire, #release
Async::Notification | Like Condition, but #signal does not block the signalling task | #wait, #signal

Task Management

Method | Parameters | Returns | Description
Async { block } | Block | Task | Runs the block in a new task, creating a reactor if none is running
Async::Task.current.async { block } | Block | Task | Spawns a child task within the current reactor
Task#wait | None | Object | Waits for task completion; returns the result or raises the task's exception
Task#stop | None | nil | Stops task execution
Task#sleep(duration) | duration (Numeric) | nil | Suspends the task for the given number of seconds (deprecated in Async 2 in favor of Kernel#sleep)
Task#yield | None | nil | Yields control to other tasks

I/O Operations

Class | Purpose | Common Methods
Async::IO::Generic | Generic I/O wrapper (async-io gem, Async 1.x) | #open, #read, #write, #close
Async::IO::Socket | Network socket operations (async-io gem, Async 1.x) | #connect, #bind, #listen, #accept
Async::IO::Endpoint | Network endpoint abstraction (async-io gem, Async 1.x) | .tcp, .udp, .unix
Async::HTTP::Internet | HTTP client interface (async-http gem) | #get, #post, #put, #delete

Synchronization Primitives

Primitive | Use Case | Methods | Example
Semaphore | Limit concurrent access | #acquire, #release | Database connection pooling
Condition | Wait for specific conditions | #wait, #signal | Producer-consumer patterns
Notification | Signal waiting tasks without blocking the signaller | #wait, #signal | Startup completion
Barrier | Synchronize multiple tasks | #async, #wait | Batch processing coordination

Error Classes

Exception | Inheritance | Description
Async::Stop | Exception | Clean task termination
Async::TimeoutError | StandardError | Operation timeout exceeded
Async::Wrapper::Cancelled | StandardError | Raised by an Async 1.x I/O wrapper when a wait is cancelled
Async::Scheduler::ClosedError | RuntimeError | Operation attempted on a closed scheduler (Async 2)

Configuration Options

Option | Type | Default | Description
:reactor | Symbol | :auto | Reactor implementation (:auto, :nio, :epoll)
:timeout | Numeric | nil | Default timeout for operations
:buffer_size | Integer | 8192 | I/O buffer size in bytes
:reuse_address | Boolean | true | Socket address reuse flag

HTTP Client Methods

Method | Parameters | Returns | Description
#get(url, headers = nil) | URL string, optional headers | Response | HTTP GET request
#post(url, headers = nil, body = nil) | URL, headers, body | Response | HTTP POST request
#put(url, headers = nil, body = nil) | URL, headers, body | Response | HTTP PUT request
#delete(url, headers = nil) | URL, headers | Response | HTTP DELETE request
#head(url, headers = nil) | URL, headers | Response | HTTP HEAD request

Reactor States

State | Description | Transitions
:starting | Reactor initializing | :running
:running | Processing events | :stopping, :stopped
:stopping | Graceful shutdown | :stopped
:stopped | No longer processing | Terminal state

Task States

State | Description | Methods Available
:running | Currently executing | #stop, #async, #sleep
:suspended | Waiting for I/O or timer | #stop
:stopped | Terminated | #wait (returns result or raises)
:failed | Terminated with exception | #wait (raises exception)