CrackedRuby

Overview

Futures and promises provide abstractions for managing asynchronous computations and their eventual results. A future represents a read-only view of a value that will become available, while a promise represents the writable side that produces that value. These constructs decouple the creation of a result from its retrieval, allowing code to continue execution without blocking while waiting for long-running operations.

The concept originated in functional programming languages and distributed systems research in the 1970s and 1980s. Futures appeared in languages like MultiLisp and Act1, while promises emerged from work on distributed computing. Modern programming languages adopted these patterns to handle I/O operations, network requests, and parallel computations without blocking threads.

In synchronous code, function calls block until complete. When a function performs a network request, the calling thread waits for the response before proceeding. Futures and promises transform this model by returning immediately with a container object representing the eventual result. The calling code can continue execution, register callbacks for completion, or explicitly wait when the value becomes necessary.

# Synchronous blocking call
result = fetch_data_from_api(url)
process(result)

# Asynchronous with future
future = fetch_data_from_api_async(url)
# Execution continues immediately
do_other_work()
# Retrieve result when needed
result = future.value
process(result)

The distinction between futures and promises depends on the language and library implementation. Some systems use "future" for both readable and writable aspects, while others maintain separate types. The terminology varies, but the core concept remains consistent: representing values that don't exist yet but will exist in the future.

Key Principles

Futures and promises operate on three fundamental states: pending, fulfilled, and rejected. A computation starts in the pending state when created. It transitions to fulfilled when the operation completes successfully with a value, or to rejected when an error occurs. This state machine ensures each computation resolves exactly once.
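
The state machine described above can be sketched in a few lines of plain Ruby. The class name Resolution and its methods are hypothetical and exist only for illustration; thread safety is deliberately left out so the transitions stay easy to read:

```ruby
# Hypothetical illustration of the pending -> fulfilled/rejected state machine.
# Not thread-safe: it only shows that a settled state never changes again.
class Resolution
  attr_reader :state, :value, :reason

  def initialize
    @state = :pending
  end

  # Returns true if this call settled the computation, false if it was already settled.
  def fulfill(value)
    return false unless @state == :pending
    @value = value
    @state = :fulfilled
    true
  end

  def reject(reason)
    return false unless @state == :pending
    @reason = reason
    @state = :rejected
    true
  end
end

resolution = Resolution.new
resolution.fulfill(42)                        # settles the computation
resolution.reject(RuntimeError.new("late"))   # has no effect, already fulfilled
puts resolution.state
```

Both terminal states refuse further transitions, which is what "resolves exactly once" means in practice.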

The separation of concerns between producer and consumer defines the architecture. The producer holds the promise and controls when and how the value gets set. Consumers receive futures that provide read-only access to the eventual value. This separation prevents consumers from interfering with value production and maintains clear ownership boundaries.

# Producer creates promise and controls resolution
promise = Concurrent::Promise.new do
  expensive_computation()
end

# Consumer receives future interface
future = promise.execute
result = future.value  # Blocks until resolved

Non-blocking execution forms the core advantage. When code initiates an asynchronous operation, it receives a future immediately and continues executing other tasks. The runtime or thread pool handles the actual computation separately. This approach maximizes CPU utilization by preventing threads from idling during I/O operations.
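
A minimal sketch of this flow using only Ruby's core Thread class, whose value method blocks until the thread finishes and then returns the block's result. The helper name fetch_async and the sleep that stands in for I/O are assumptions made for illustration:

```ruby
# Hypothetical helper: starts the work on a new thread and returns at once.
# The returned Thread serves as a minimal future through Thread#value.
def fetch_async(delay)
  Thread.new do
    sleep delay      # stands in for a slow network or disk call
    :io_result
  end
end

future = fetch_async(0.2)      # returns immediately

# The caller keeps working while the other thread sleeps.
local_sum = (1..1_000).sum

result = future.value          # blocks only for whatever time remains
puts "local work: #{local_sum}, async result: #{result}"
```

A thread per operation is the simplest possible scheduler; libraries typically replace it with a shared thread pool so the number of threads stays bounded.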

Composition mechanisms allow combining multiple futures into complex workflows. Operations like chaining, mapping, and joining enable building sophisticated asynchronous pipelines. Each operation returns a new future and leaves the original unchanged, so one future can safely feed several downstream computations.

future1 = async_operation_1()
future2 = async_operation_2()

# Combine results from both futures
combined = Concurrent::Promises.zip(future1, future2).then do |result1, result2|
  merge(result1, result2)
end

Error propagation through the future chain ensures failures don't get lost. When a future rejects, the error travels through subsequent operations until handled explicitly. This automatic propagation prevents silent failures common in callback-based code.
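
The propagation rule can be illustrated with a small thread-backed sketch. ChainedFuture is a hypothetical class written for this example, not part of any library; it dedicates one thread to each stage, which a real implementation would avoid, but it shows how a failure skips then stages until a rescue stage handles it:

```ruby
# Hypothetical sketch: each stage runs on its own thread and reads the
# previous stage through Thread#value, which re-raises that stage's exception.
class ChainedFuture
  def initialize(&block)
    @thread = Thread.new do
      Thread.current.report_on_exception = false  # keep stderr quiet on failure
      block.call
    end
  end

  # Blocks until the stage finishes; re-raises its exception if it failed.
  def value
    @thread.value
  end

  # Runs the block on success. An upstream failure is re-raised by `value`
  # before the block is reached, so the stage is skipped and fails too.
  def then
    ChainedFuture.new { yield value }
  end

  # Runs the block only on failure, turning the error into an ordinary value.
  def rescue
    ChainedFuture.new do
      begin
        value
      rescue StandardError => e
        yield e
      end
    end
  end
end

steps = []
result = ChainedFuture.new { raise IOError, "disk unavailable" }
  .then { |v| steps << :transform; v * 2 }
  .then { |v| steps << :finalize; v + 1 }
  .rescue { |e| steps << :rescue; "fallback (#{e.message})" }
  .value

puts result
puts steps.inspect
```

Neither then block runs in this chain; only the rescue block records a step, and the final value is the fallback it returns.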

Thread safety requirements constrain implementation details. Multiple threads may attempt to resolve a promise or read from a future simultaneously. Implementations must use synchronization primitives to ensure state transitions occur atomically and values propagate correctly across memory barriers.

The happens-before relationship establishes ordering guarantees. Setting a promise's value happens-before any thread observing that value through the future. This memory model ensures visibility across threads without requiring explicit synchronization in user code.

Lazy versus eager evaluation determines when computation begins. Eager futures start execution immediately when created, while lazy futures defer work until a thread requests the value. The choice affects resource consumption and computation timing.
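
The difference can be sketched with core Ruby only. Here a plain Thread plays the eager role, and LazyValue is a hypothetical class for this example that runs its block on the first call to value and caches the result afterwards:

```ruby
# Hypothetical lazy wrapper: the block runs on first access, exactly once.
class LazyValue
  def initialize(&block)
    @block = block
    @mutex = Mutex.new
    @computed = false
  end

  def value
    @mutex.synchronize do
      unless @computed
        @value = @block.call
        @computed = true
      end
      @value
    end
  end
end

log = []
eager = Thread.new { log << :eager_started; 1 + 1 }    # work begins right away
lazy  = LazyValue.new { log << :lazy_started; 2 + 2 }  # nothing runs yet

eager.join
log_before_request = log.dup   # only the eager computation has run so far
lazy.value                     # the first request triggers the lazy computation

puts log_before_request.inspect
puts log.inspect
```

Note that this lazy sketch computes on the calling thread; a lazy future in a library would normally hand the work to an executor once it is requested.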

Ruby Implementation

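Ruby provides futures and promises through the concurrent-ruby gem, through core threading primitives (Thread#value blocks until a thread finishes and returns its result, much like a future), and through manual implementation using mutexes and condition variables. The concurrent-ruby library offers the most comprehensive and production-ready implementation.

concurrent-ruby ships two separate APIs. The older classes, Concurrent::Promise and Concurrent::Future, appear in most examples here; the newer Concurrent::Promises factory module (Concurrent::Promises.future, zip, any, delay) is intended to replace them. Objects from the two APIs are not interchangeable, so a combinator such as Concurrent::Promises.zip only accepts futures created through Concurrent::Promises. A Concurrent::Promise encapsulates an asynchronous computation and provides a future-like interface for retrieving its result. Creating one involves passing a block containing the computation: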

require 'concurrent-ruby'

promise = Concurrent::Promise.new do
  sleep(2)  # Simulate long operation
  compute_expensive_result()
end

promise.execute  # Starts async execution
result = promise.value  # Blocks until complete

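The Concurrent::Future class provides similar functionality, and Concurrent::Future.execute schedules the block as soon as the future is created. Unlike Concurrent::Promise, it has no then or rescue methods, so it suits standalone computations rather than chains: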

future = Concurrent::Future.execute do
  fetch_from_database()
end

# Execution starts immediately
other_work()

# Wait for result when needed
data = future.value

State inspection methods allow checking computation status without blocking. The pending?, fulfilled?, and rejected? methods return the current state. The complete? method returns true for either fulfilled or rejected states:

future = Concurrent::Future.execute { slow_operation() }

if future.pending?
  puts "Still computing"
end

future.wait  # Block until complete

if future.fulfilled?
  puts "Success: #{future.value}"
elsif future.rejected?
  puts "Failed: #{future.reason}"
end

Chaining operations through then creates pipelines of dependent computations. Each then call returns a new promise that resolves when the previous one completes and the transformation finishes. Chaining is available on Concurrent::Promise (and on futures from Concurrent::Promises), not on Concurrent::Future:

future = Concurrent::Promise.execute { fetch_user(id) }
  .then { |user| fetch_posts(user) }
  .then { |posts| transform_posts(posts) }
  .then { |data| render_response(data) }

final_result = future.value

Error handling uses rescue to catch exceptions and provide fallback values or alternative computations:

future = Concurrent::Promise.execute { risky_operation() }
  .rescue { |error| 
    log_error(error)
    default_value
  }

result = future.value  # Returns default if operation fails

The Concurrent::Promises module provides advanced composition operators. The zip function combines multiple futures into a single future containing an array of results:

user_future = Concurrent::Promises.future { fetch_user(id) }
posts_future = Concurrent::Promises.future { fetch_posts(id) }
comments_future = Concurrent::Promises.future { fetch_comments(id) }

combined = Concurrent::Promises.zip(user_future, posts_future, comments_future)
  .then { |user, posts, comments|
    build_profile(user, posts, comments)
  }

profile = combined.value

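The any combinator (an alias for any_resolved_future) resolves with the first future to finish, whether it was fulfilled or rejected. To take the first successful result and ignore earlier failures, use any_fulfilled_future:

primary = Concurrent::Promises.future { fetch_from_primary_db() }
secondary = Concurrent::Promises.future { fetch_from_secondary_db() }

fastest = Concurrent::Promises.any_fulfilled_future(primary, secondary)
data = fastest.value  # Uses whichever succeeds first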

Custom executors control how futures run. By default, futures use the global thread pool, but applications can specify custom executors for different workload characteristics:

io_executor = Concurrent::ThreadPoolExecutor.new(
  min_threads: 5,
  max_threads: 20,
  max_queue: 100
)

future = Concurrent::Future.execute(executor: io_executor) do
  network_request()
end

Manual implementation without libraries demonstrates the underlying concepts. A basic promise uses a mutex and condition variable for thread coordination:

class SimplePromise
  def initialize
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @resolved = false
    @value = nil
    @error = nil
  end

  def resolve(value)
    @mutex.synchronize do
      return if @resolved
      @value = value
      @resolved = true
      @condition.broadcast
    end
  end

  def reject(error)
    @mutex.synchronize do
      return if @resolved
      @error = error
      @resolved = true
      @condition.broadcast
    end
  end

  def value
    @mutex.synchronize do
      @condition.wait(@mutex) until @resolved
      raise @error if @error
      @value
    end
  end
end

# Usage
promise = SimplePromise.new

Thread.new do
  sleep(1)
  promise.resolve(compute_value())
end

result = promise.value  # Blocks until resolved

Practical Examples

Web service aggregation demonstrates futures' ability to parallelize independent requests. An application displaying user profile information needs data from multiple services: user details, posts, friends list, and recommendations. Sequential requests would sum the latency of each call, while futures enable concurrent fetching:

def build_user_profile(user_id)
  user = Concurrent::Promises.future { UserService.fetch(user_id) }
  posts = Concurrent::Promises.future { PostService.fetch_by_user(user_id) }
  friends = Concurrent::Promises.future { FriendService.fetch_friends(user_id) }
  recommendations = Concurrent::Promises.future { 
    RecommendationService.get_recommendations(user_id) 
  }

  Concurrent::Promises.zip(user, posts, friends, recommendations)
    .then { |u, p, f, r|
      # zip passes the resolved values to the block, not the futures
      ProfileData.new(
        user: u,
        posts: p,
        friends: f,
        recommendations: r
      )
    }
    .value
end

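Database query parallelization speeds up complex reports requiring data from multiple tables. Instead of running queries one after another, futures execute them concurrently. Each concurrent query needs its own database connection, so the connection pool must be at least as large as the number of queries in flight: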

def generate_sales_report(start_date, end_date)
  sales_future = Concurrent::Future.execute do
    database.query("SELECT * FROM sales WHERE date BETWEEN ? AND ?", 
                   start_date, end_date)
  end

  inventory_future = Concurrent::Future.execute do
    database.query("SELECT * FROM inventory WHERE updated >= ?", start_date)
  end

  customer_future = Concurrent::Future.execute do
    database.query("SELECT * FROM customers WHERE last_purchase >= ?", 
                   start_date)
  end

  sales = sales_future.value
  inventory = inventory_future.value
  customers = customer_future.value

  compile_report(sales, inventory, customers)
end

Background job processing with futures supports status polling while the work runs. A video transcoding service processes uploaded files asynchronously while keeping the client informed:

class VideoTranscoder
  def transcode_async(video_id)
    promise = Concurrent::Promise.new do
      video = Video.find(video_id)
      
      # Transcode in multiple quality levels
      qualities = ['720p', '1080p', '4k']
      results = qualities.map do |quality|
        Concurrent::Future.execute do
          transcode_to_quality(video, quality)
        end
      end

      # Wait for all qualities
      results.map(&:value)
    end

    promise.execute
    promise
  end

  def check_progress(promise)
    return :complete if promise.fulfilled?
    return :failed if promise.rejected?
    :processing
  end
end

# Usage
transcoder = VideoTranscoder.new
future = transcoder.transcode_async(video_id)

# Poll for completion
until future.complete?
  status = transcoder.check_progress(future)
  update_ui(status)
  sleep(1)
end

transcoded_files = future.value

Cache warming with futures preloads data during application startup without blocking initialization. The application starts serving requests while caches populate asynchronously:

class ApplicationCache
  def warm_caches
    futures = []

    # Concurrent::Promises.future is used here because zip (below) only
    # accepts futures created through the Concurrent::Promises API
    futures << Concurrent::Promises.future do
      @product_cache = Product.all.index_by(&:id)
      Rails.logger.info "Product cache loaded: #{@product_cache.size} items"
    end

    futures << Concurrent::Promises.future do
      @category_cache = Category.includes(:products).to_a
      Rails.logger.info "Category cache loaded: #{@category_cache.size} items"
    end

    futures << Concurrent::Promises.future do
      @config_cache = Configuration.all.to_h { |c| [c.key, c.value] }
      Rails.logger.info "Config cache loaded: #{@config_cache.size} items"
    end

    # Return future that completes when all caches ready
    Concurrent::Promises.zip(*futures).then do
      Rails.logger.info "All caches warmed"
    end
  end
end

# During initialization
cache = ApplicationCache.new
warm_future = cache.warm_caches

# Application continues starting
setup_routes()
initialize_middleware()

# Wait for caches before accepting requests
warm_future.wait
start_server()

Common Patterns

The callback pattern registers functions to execute upon future resolution. Instead of blocking to retrieve a value, code provides callbacks for success and failure cases:

# on_success and on_error are Concurrent::Promise methods
future = Concurrent::Promise.execute { fetch_data() }

future.on_success { |value| 
  process_result(value)
  notify_observers(value)
}

future.on_error { |error|
  log_error(error)
  alert_monitoring_system(error)
}

The timeout pattern prevents indefinite blocking when waiting for futures. Applications specify maximum wait times and handle timeout scenarios explicitly:

require 'timeout'

def fetch_with_timeout(url, timeout_seconds)
  future = Concurrent::Future.execute { HTTP.get(url) }

  # wait returns the future itself, so completion is checked explicitly
  future.wait(timeout_seconds)
  unless future.complete?
    raise Timeout::Error, "Request exceeded #{timeout_seconds} seconds"
  end

  future.value!  # Re-raises if the request itself failed
end

begin
  response = fetch_with_timeout(api_url, 5)
rescue Timeout::Error
  response = fetch_from_cache(api_url)
end
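
The same pattern can be sketched without any gem. Thread#join accepts a limit in seconds and returns nil when the limit elapses first, which gives a direct timeout check. The helper name value_within is hypothetical:

```ruby
require 'timeout'

# Hypothetical helper: waits up to `seconds` for the block's result.
def value_within(seconds, &block)
  worker = Thread.new(&block)
  # Thread#join(limit) returns the thread on completion, nil on timeout.
  if worker.join(seconds)
    worker.value
  else
    raise Timeout::Error, "no result within #{seconds} seconds"
  end
end

fast = value_within(1) { :done }

slow =
  begin
    value_within(0.05) { sleep 0.5; :too_late }
  rescue Timeout::Error
    :cached_fallback
  end

puts fast
puts slow
```

As with the library version, the timed-out worker keeps running in the background; the timeout only ends the wait.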

The fan-out/fan-in pattern distributes work across multiple futures and aggregates results. Batch processing operations split data into chunks, process them concurrently, and merge outputs:

def process_batch(items, chunk_size = 100)
  chunks = items.each_slice(chunk_size).to_a
  
  futures = chunks.map do |chunk|
    Concurrent::Future.execute do
      chunk.map { |item| process_item(item) }
    end
  end

  # value! re-raises if a chunk failed instead of silently returning nil
  futures.flat_map(&:value!)
end

# Process 10,000 items in parallel chunks
items = (1..10_000).to_a
processed = process_batch(items, 100)
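
A gem-free sketch of the same fan-out/fan-in shape, using one thread per chunk. The method name parallel_map_in_chunks is hypothetical, and a thread per chunk is only reasonable for a small number of chunks:

```ruby
# Hypothetical helper: applies the block to every item, one thread per chunk.
def parallel_map_in_chunks(items, chunk_size)
  workers = items.each_slice(chunk_size).map do |chunk|
    Thread.new { chunk.map { |item| yield item } }   # fan-out
  end
  # Fan-in: Thread#value waits for each worker; collecting in creation
  # order keeps the results in the same order as the input.
  workers.flat_map(&:value)
end

squares = parallel_map_in_chunks((1..10).to_a, 3) { |n| n * n }
puts squares.inspect
```

Because results are gathered in the order the chunks were created, the output order matches the input even when chunks finish at different times.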

The retry pattern handles transient failures by automatically retrying failed operations with exponential backoff:

def future_with_retry(max_attempts: 3, initial_delay: 1.0)
  attempt = 0
  
  future = Concurrent::Future.execute do
    begin
      attempt += 1
      yield
    rescue StandardError => e
      if attempt < max_attempts
        delay = initial_delay * (2 ** (attempt - 1))
        sleep(delay)
        retry
      else
        raise e
      end
    end
  end
  
  future
end

# Usage
future = future_with_retry(max_attempts: 3) do
  unreliable_api_call()
end

result = future.value
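
The backoff formula in the helper, initial_delay * (2 ** (attempt - 1)), doubles the pause after every failed attempt. A small sketch makes the resulting schedule concrete; backoff_schedule is a hypothetical helper that only lists the delays and performs no retries itself:

```ruby
# Hypothetical helper: the pause taken after each failed attempt.
# The last attempt is followed by no pause because the error is re-raised.
def backoff_schedule(max_attempts:, initial_delay:)
  (1...max_attempts).map { |attempt| initial_delay * (2**(attempt - 1)) }
end

puts backoff_schedule(max_attempts: 3, initial_delay: 1.0).inspect  # [1.0, 2.0]
puts backoff_schedule(max_attempts: 5, initial_delay: 0.5).inspect  # [0.5, 1.0, 2.0, 4.0]
```

With the defaults of three attempts and a one second initial delay, a call that always fails spends about three seconds sleeping before the final error surfaces.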

The circuit breaker pattern prevents cascading failures by stopping requests to failing services. After a threshold of failures, the circuit opens and fails fast without attempting the operation:

class CircuitOpenError < StandardError; end

class CircuitBreaker
  def initialize(failure_threshold: 5, timeout: 60)
    @failure_threshold = failure_threshold
    @timeout = timeout
    @failures = 0
    @last_failure_time = nil
    @state = :closed
  end

  def call(&block)
    if @state == :open && Time.now - @last_failure_time > @timeout
      @state = :half_open
    end

    if @state == :open
      return Concurrent::Promise.reject(
        CircuitOpenError.new("Circuit breaker is open")
      )
    end

    # Promise rather than Future: on_success and on_error are Promise methods
    future = Concurrent::Promise.execute(&block)
    
    future.on_success { reset_failures }
    future.on_error { record_failure }
    
    future
  end

  private

  def record_failure
    @failures += 1
    @last_failure_time = Time.now
    @state = :open if @failures >= @failure_threshold
  end

  def reset_failures
    @failures = 0
    @state = :closed
  end
end
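
Because the on_success and on_error callbacks run on pool threads, the failure counter and state in the class above can be updated by several threads at once. One hedged way to address this is to keep the bookkeeping behind a mutex. BreakerState is a hypothetical helper written for this sketch; it covers only the state tracking, not the execution of requests:

```ruby
# Hypothetical thread-safe bookkeeping for a circuit breaker.
class BreakerState
  def initialize(failure_threshold:, reset_after:)
    @failure_threshold = failure_threshold
    @reset_after = reset_after      # seconds the circuit stays open
    @failures = 0
    @opened_at = nil
    @mutex = Mutex.new
  end

  # True while the circuit is closed, or once the reset window has passed.
  def allow_request?(now = Time.now)
    @mutex.synchronize do
      @opened_at.nil? || now - @opened_at > @reset_after
    end
  end

  def record_failure(now = Time.now)
    @mutex.synchronize do
      @failures += 1
      @opened_at = now if @failures >= @failure_threshold
    end
  end

  def record_success
    @mutex.synchronize do
      @failures = 0
      @opened_at = nil
    end
  end
end

state = BreakerState.new(failure_threshold: 3, reset_after: 60)
5.times.map { Thread.new { state.record_failure } }.each(&:join)
puts state.allow_request?   # the circuit is open after concurrent failures
```

Passing the current time as an optional argument keeps the class easy to test without sleeping through the reset window.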

The memoized future pattern caches computation results to avoid redundant work. Multiple consumers can share a single future, with the computation executing only once:

class MemoizedFuture
  def initialize(&block)
    @block = block
    @future = nil
    @mutex = Mutex.new
  end

  def get
    @mutex.synchronize do
      @future ||= Concurrent::Future.execute(&@block)
    end
  end
end

# Usage - computation runs once despite multiple callers
expensive_computation = MemoizedFuture.new { compute_complex_result() }

future1 = expensive_computation.get
future2 = expensive_computation.get  # Returns same future
future3 = expensive_computation.get  # Returns same future

result = future1.value  # All three share this result

Error Handling & Edge Cases

Exception propagation through future chains requires careful handling to prevent error swallowing. When a promise rejects, the following then steps are skipped and the rejection passes down the chain until a rescue handles it:

future = Concurrent::Promise.execute { failing_operation() }
  .then { |result| transform(result) }  # Skipped if previous failed
  .then { |result| finalize(result) }   # Also skipped
  .rescue { |error| 
    # Handle error from any stage
    log_error(error)
    fallback_value
  }

result = future.value

Timeout handling prevents resource leaks when futures never resolve. The timeout mechanism doesn't cancel the underlying computation, only stops waiting for it. The operation continues executing in the background:

require 'timeout'

future = Concurrent::Future.execute { very_long_operation() }

begin
  # The timeout is a positional argument in seconds; wait returns the future itself
  future.wait(5)
  raise Timeout::Error, "Operation timed out" unless future.complete?
  future.value
rescue Timeout::Error
  # Future still executing in background thread
  # Clean up resources if possible
  cleanup_resources()
  raise
end

Cancellation requires explicit support since running futures cannot be forcibly stopped. The computation must periodically check a cancellation flag and exit gracefully:

class CancellableFuture
  def initialize(&block)
    @cancelled = Concurrent::AtomicBoolean.new(false)
    @future = Concurrent::Future.execute do
      block.call(@cancelled)
    end
  end

  def cancel
    @cancelled.make_true
  end

  def value
    @future.value
  end
end

# Usage
future = CancellableFuture.new do |cancelled|
  results = []
  data.each do |item|
    break if cancelled.true?  # Check flag; break leaves the loop (return is not valid in a block run on a pool thread)
    results << process(item)
  end
  results
end

# Cancel from another thread
future.cancel
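
A self-contained sketch of the same cooperative approach using only core Ruby. CancellableTask is a hypothetical class for this example; the block receives the task itself as a token and polls cancelled? between units of work:

```ruby
# Hypothetical cooperative-cancellation wrapper around a plain Thread.
class CancellableTask
  def initialize(&block)
    @cancelled = false
    @mutex = Mutex.new
    @thread = Thread.new { block.call(self) }
  end

  def cancel
    @mutex.synchronize { @cancelled = true }
  end

  def cancelled?
    @mutex.synchronize { @cancelled }
  end

  # Blocks until the work stops, either by finishing or by noticing the flag.
  def value
    @thread.value
  end
end

task = CancellableTask.new do |token|
  processed = []
  (1..1_000).each do |n|
    break if token.cancelled?   # leave the loop, keep the partial results
    processed << n
    sleep 0.001                 # stands in for one unit of real work
  end
  processed
end

sleep 0.02
task.cancel
partial = task.value
puts "processed #{partial.size} of 1000 items before cancellation"
```

Cancellation takes effect only at the next flag check, so the polling interval bounds how long a cancelled task keeps running.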

Race conditions occur when multiple threads resolve a promise simultaneously. Promise implementations must ensure atomic state transitions:

# Thread-safe promise resolution
class SafePromise
  def initialize
    @mutex = Mutex.new
    @resolved = false
    @value = nil
  end

  def try_resolve(value)
    @mutex.synchronize do
      return false if @resolved
      @value = value
      @resolved = true
      true  # Successfully resolved
    end
  end

  def value
    @mutex.synchronize { @value }
  end
end

promise = SafePromise.new

# Multiple threads try to resolve
threads = 10.times.map do |i|
  Thread.new { promise.try_resolve(i) }
end

threads.each(&:join)
# Exactly one thread succeeds (whichever acquires the mutex first); the others return false

Deadlock prevention requires keeping the dependency graph between futures free of cycles. Circular dependencies between futures cause threads to wait indefinitely:

# Deadlock scenario
future2 = nil  # Declared first so the block below can capture the variable
future1 = Concurrent::Future.new { future2.value }
future2 = Concurrent::Future.new { future1.value }

future1.execute
future2.execute

# Both futures wait for each other forever
# future1.value  # Deadlocks

Memory leaks occur when long-lived futures retain references to large objects. Every promise in a chain keeps its own resolved value for as long as it remains reachable, so intermediate results stay in memory even if only the final value matters:

# Retains all intermediate results - each promise in the chain holds its value
future = Concurrent::Promise.execute { load_large_dataset() }
  .then { |data| process_step1(data) }
  .then { |data| process_step2(data) }
  .then { |data| process_step3(data) }

# Better - run the steps in one block so intermediates are ordinary locals
# that become collectable once the block returns
future = Concurrent::Promise.execute do
  data = load_large_dataset()
  step1 = process_step1(data)
  step2 = process_step2(step1)
  process_step3(step2)
end

Thread pool exhaustion happens when blocking operations run in the future executor. Futures that wait for other futures consume threads from the pool, potentially causing deadlock:

# Dangerous - blocks thread pool thread
future1 = Concurrent::Future.execute do
  # This blocks a thread pool thread while waiting
  future2 = Concurrent::Future.execute { slow_operation() }
  future2.value  # Blocks thread
end

# Better - use chaining
future1 = Concurrent::Promise.execute { slow_operation() }
  .then { |result| process(result) }
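
The starvation effect can be reproduced with a deliberately tiny pool. TinyPool and nested_wait are hypothetical names for this sketch, which is built on Queue and Thread alone and is not how concurrent-ruby implements its executors:

```ruby
# Hypothetical fixed-size pool, small enough to show starvation clearly.
class TinyPool
  def initialize(size)
    @jobs = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        while (job = @jobs.pop)   # pop returns nil once the queue is closed and empty
          job.call
        end
      end
    end
  end

  # Schedules the block and returns a Queue that will receive its result.
  def submit(&block)
    mailbox = Queue.new
    @jobs << -> { mailbox << block.call }
    mailbox
  end

  def shutdown
    @jobs.close
    @workers.each(&:join)
  end
end

# Runs an outer task that waits up to 0.1s on an inner task in the same pool.
def nested_wait(pool_size)
  pool = TinyPool.new(pool_size)
  outer = pool.submit do
    inner = pool.submit { :inner_done }
    waiter = Thread.new { inner.pop }
    waiter.join(0.1) ? waiter.value : :starved
  end
  result = outer.pop
  pool.shutdown
  result
end

puts nested_wait(1)   # the only worker is busy running the outer task
puts nested_wait(2)   # a second worker is free to run the inner task
```

With one worker the inner task cannot start until the outer task gives up waiting; without the 0.1 second limit the outer task would wait forever.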

Unhandled rejections cause silent failures when no error handlers exist. Applications should ensure all futures have error handling paths:

# Bad - errors disappear
future = Concurrent::Future.execute { failing_operation() }
# If value or reason is never checked, the error never surfaces

# Good - explicit error handling
future = Concurrent::Promise.execute { failing_operation() }
  .rescue { |error|
    ErrorReporter.notify(error)
    default_value
  }

# Or wait with error handling; value! re-raises the rejection reason,
# whereas value returns nil for a rejected future
begin
  result = future.value!
rescue StandardError => e
  handle_error(e)
end

Design Considerations

Choosing between futures and callbacks depends on the composition complexity and error handling requirements. Callbacks work for simple async operations but become unwieldy with multiple dependencies. Futures provide composition operators that keep code readable:

# Callback hell
fetch_user(id) do |user, error|
  if error
    handle_error(error)
  else
    fetch_posts(user.id) do |posts, error|
      if error
        handle_error(error)
      else
        fetch_comments(posts) do |comments, error|
          if error
            handle_error(error)
          else
            render(user, posts, comments)
          end
        end
      end
    end
  end
end

# Future composition; each step passes along the values later steps need
Concurrent::Promise.execute { fetch_user(id) }
  .then { |user| [user, fetch_posts(user.id)] }
  .then { |user, posts| [user, posts, fetch_comments(posts)] }
  .then { |user, posts, comments| render(user, posts, comments) }
  .rescue { |error| handle_error(error) }

Synchronous versus asynchronous execution affects system throughput and resource utilization. Synchronous code blocks threads during I/O, limiting concurrent operations to thread count. Asynchronous futures enable higher concurrency by freeing threads during waits. The trade-off involves increased complexity and harder debugging.
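
A rough way to see the throughput difference is to time three simulated I/O waits run back to back and then run concurrently. The helper elapsed_seconds and the 0.1 second sleep standing in for a network call are assumptions for this sketch:

```ruby
# Hypothetical helper: wall-clock seconds taken by the block.
def elapsed_seconds
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

simulated_io = -> { sleep 0.1 }   # stands in for a network call

# Synchronous: each wait starts only after the previous one ends.
sequential = elapsed_seconds { 3.times { simulated_io.call } }

# Asynchronous: the three waits overlap on separate threads.
concurrent = elapsed_seconds do
  3.times.map { Thread.new(&simulated_io) }.each(&:join)
end

puts format("sequential: %.2fs, concurrent: %.2fs", sequential, concurrent)
```

The sequential total grows with the number of calls, while the concurrent total stays close to the duration of the slowest single call.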

Eager versus lazy evaluation determines when computation starts. Eager futures begin execution immediately, consuming resources regardless of whether results get used. Lazy futures defer work until requested, saving resources but adding latency when values become necessary:

# Eager - starts immediately
eager = Concurrent::Future.execute { expensive_computation() }
# Work starts even if never used

# Lazy - defers execution
lazy = Concurrent::Promises.delay { expensive_computation() }
# Work starts only when value requested
result = lazy.value

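Thread pool sizing affects performance characteristics. Too few threads cause queuing delays, while too many threads increase context switching overhead and memory consumption. Thread count should match workload type: CPU-bound work benefits from a thread count near the CPU core count, while I/O-bound work can use many more threads because most of them sit idle waiting on I/O. On CRuby the global VM lock allows only one thread to run Ruby code at a time, so extra threads mainly help I/O-bound work; CPU-bound work gains real parallelism from threads only on runtimes such as JRuby or TruffleRuby: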

# CPU-bound workload
cpu_executor = Concurrent::ThreadPoolExecutor.new(
  min_threads: 2,
  max_threads: Concurrent.processor_count,
  max_queue: 100
)

# I/O-bound workload
io_executor = Concurrent::ThreadPoolExecutor.new(
  min_threads: 10,
  max_threads: 100,
  max_queue: 1000
)

Error recovery strategy determines system resilience. Fast-fail approaches reject immediately when errors occur, propagating failures to callers. Retry with backoff attempts recovery before failing. Circuit breakers prevent cascading failures. The choice depends on error characteristics and system requirements.

Backpressure mechanisms prevent overwhelming downstream systems when producers generate work faster than consumers process it. Bounded queues block producers when full, providing natural backpressure. Unbounded queues risk memory exhaustion. Applications must choose queue sizes based on acceptable latency and memory constraints.
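
Ruby's core SizedQueue gives a compact illustration of a bounded queue: pushing blocks while the queue is full, which slows the producer to the consumer's pace. The method name bounded_pipeline and its return shape are hypothetical, chosen for this sketch:

```ruby
# Hypothetical demo: reports how far the producer got before the consumer
# started, plus everything the consumer eventually received.
def bounded_pipeline(capacity:, item_count:)
  queue = SizedQueue.new(capacity)
  produced = 0

  producer = Thread.new do
    item_count.times do |i|
      queue << i          # blocks while the queue already holds `capacity` items
      produced += 1
    end
    queue.close           # lets the consumer's pop return nil once drained
  end

  sleep 0.05              # no consumer yet: the producer runs until it blocks
  produced_before_consumer = produced

  consumed = []
  while (item = queue.pop)
    consumed << item
  end
  producer.join

  { produced_before_consumer: produced_before_consumer, consumed: consumed }
end

result = bounded_pipeline(capacity: 2, item_count: 5)
puts result.inspect
```

The producer cannot get more than the queue capacity ahead of the consumer, which is the bound on memory use that an unbounded queue lacks.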

Testing asynchronous code requires different approaches than synchronous code. Futures introduce nondeterminism and timing dependencies. Tests must wait for futures to complete and handle potential race conditions. Mock futures can isolate units from async dependencies:

# Testing with futures
def test_async_operation
  future = perform_async_operation()
  future.wait(5)  # Timeout is a positional argument in seconds
  
  assert future.fulfilled?
  assert_equal expected_result, future.value
end

# Mocking futures
def test_with_mock_future
  mock_future = Concurrent::Promises.fulfilled_future(mock_result)
  allow(service).to receive(:fetch_async).and_return(mock_future)
  
  result = process_with_service()
  assert_equal processed_mock_result, result
end

Reference

Future States

| State | Description | Transitions To |
|---|---|---|
| Pending | Computation not yet complete | Fulfilled or Rejected |
| Fulfilled | Successfully completed with value | Terminal state |
| Rejected | Failed with error | Terminal state |

Core Operations

| Operation | Purpose | Blocks |
|---|---|---|
| execute | Start async computation | No |
| value | Retrieve result | Yes, until resolved |
| wait | Wait for completion | Yes, until complete |
| wait(timeout) | Wait with timeout | Yes, up to timeout |
| pending? | Check if still computing | No |
| fulfilled? | Check if succeeded | No |
| rejected? | Check if failed | No |
| complete? | Check if done (either state) | No |

Composition Methods

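| Method | Description | Returns |
|---|---|---|
| then | Chain dependent computation | New future |
| rescue | Handle errors | New future |
| on_success | Register success callback (Concurrent::Promise) | New promise |
| on_error | Register error callback (Concurrent::Promise, alias of rescue) | New promise |
| zip | Combine multiple futures (Concurrent::Promises) | Future with array |
| any | Race multiple futures (Concurrent::Promises) | Future with first resolved result |
| delay | Create lazy future (Concurrent::Promises) | Deferred future |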

Concurrent-Ruby Promise Creation

# Execute immediately
future = Concurrent::Future.execute { computation() }

# Manual execution
promise = Concurrent::Promise.new { computation() }
promise.execute

# Fulfilled promise
fulfilled = Concurrent::Promises.fulfilled_future(value)

# Rejected promise
rejected = Concurrent::Promises.rejected_future(error)

# Lazy promise
lazy = Concurrent::Promises.delay { computation() }

Error Handling Patterns

| Pattern | Implementation | Use Case |
|---|---|---|
| Rescue chain | then().rescue() | Single fallback |
| Try-catch value | begin future.value! rescue end | Explicit handling |
| Default on error | rescue { default_value } | Fallback values |
| Error callback | on_error { handle } | Side effects only |
| Retry | rescue { retry_future } | Transient failures |

Common Configuration Options

| Option | Purpose | Typical Values |
|---|---|---|
| executor | Thread pool to use | Custom executor instance |
| timeout | Maximum wait time | Seconds (integer or float) |
| max_threads | Thread pool size | CPU count for CPU-bound, 20-100 for I/O |
| max_queue | Queue capacity | 100-1000 depending on memory |

State Transition Rules

# A promise can only resolve once (shown with SimplePromise from the manual implementation)
promise.resolve(value1)
promise.resolve(value2)  # Ignored - already resolved

# Rejected promises skip then() blocks
Concurrent::Promise.execute { raise "Error" }
  .then { |v| process(v) }  # Skipped
  .rescue { |e| handle(e) }  # Executes

# Rescue converts rejection to fulfillment
future = Concurrent::Promise.execute { raise "Error" }
  .rescue { |e| "default" }

future.value       # "default" (blocks until resolved)
future.fulfilled?  # true

Performance Characteristics

| Operation | Time Complexity | Notes |
|---|---|---|
| Create future | O(1) | Allocates promise object |
| Read value (resolved) | O(1) | Returns cached value |
| Read value (pending) | Unbounded | Blocks until resolution |
| Chain then() | O(1) | Creates new future |
| Zip n futures | O(n) | Waits for slowest |

Thread Safety Guarantees

| Aspect | Guarantee |
|---|---|
| Multiple resolve attempts | Only first succeeds |
| Reading from resolved future | Thread-safe, no locks |
| Callback registration | Thread-safe |
| Value visibility | Happens-before relationship established |
| Exception propagation | Safely crosses thread boundaries |