Overview
Futures and promises provide abstractions for managing asynchronous computations and their eventual results. A future represents a read-only view of a value that will become available, while a promise represents the writable side that produces that value. These constructs decouple the creation of a result from its retrieval, allowing code to continue execution without blocking while waiting for long-running operations.
The concept originated in functional programming languages and distributed systems research in the 1970s and 1980s. Futures appeared in languages like MultiLisp and Act1, while promises emerged from work on distributed computing. Modern programming languages adopted these patterns to handle I/O operations, network requests, and parallel computations without blocking threads.
In synchronous code, function calls block until complete. When a function performs a network request, the calling thread waits for the response before proceeding. Futures and promises transform this model by returning immediately with a container object representing the eventual result. The calling code can continue execution, register callbacks for completion, or explicitly wait when the value becomes necessary.
# Synchronous blocking call
result = fetch_data_from_api(url)
process(result)
# Asynchronous with future
future = fetch_data_from_api_async(url)
# Execution continues immediately
do_other_work()
# Retrieve result when needed
result = future.value
process(result)
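Ruby's core Thread class already behaves like a minimal eager future. Thread.new returns at once, and Thread#value blocks until the block finishes, then returns its result or re-raises its exception. The sketch below uses a hypothetical fetch_data_async that simulates a slow request with sleep. The URL is a placeholder, and nothing is actually fetched.

```ruby
# Hypothetical stand-in for fetch_data_from_api_async: the thread acts as the "future"
def fetch_data_async(url)
  Thread.new do
    sleep(0.2) # simulate network latency
    "response from #{url}"
  end
end

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
future = fetch_data_async("https://example.com/api")
returned_after = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

other_work = (1..1_000).sum # runs while the simulated request is in flight
result = future.value       # blocks until the thread finishes, then returns its result
```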
The distinction between futures and promises depends on the language and library implementation. Some systems use "future" for both readable and writable aspects, while others maintain separate types. The terminology varies, but the core concept remains consistent: representing values that don't exist yet but will exist in the future.
Key Principles
Futures and promises operate on three fundamental states: pending, fulfilled, and rejected. A computation starts in the pending state when created. It transitions to fulfilled when the operation completes successfully with a value, or to rejected when an error occurs. Fulfilled and rejected are terminal, so each computation resolves at most once; a computation that nothing ever resolves simply stays pending.
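Here is a minimal sketch of that state machine, using a hypothetical FutureState class. It is single-threaded and not thread-safe; locking is a separate concern. Each state lists the states it may move to, and the terminal states list none, so a second resolution attempt is refused.

```ruby
# Hypothetical FutureState: the pending -> fulfilled/rejected lifecycle (not thread-safe)
class FutureState
  TRANSITIONS = {
    pending: %i[fulfilled rejected],
    fulfilled: [], # terminal
    rejected: []   # terminal
  }.freeze

  attr_reader :state, :value, :reason

  def initialize
    @state = :pending
  end

  def fulfill(value)
    transition(:fulfilled) { @value = value }
  end

  def reject(reason)
    transition(:rejected) { @reason = reason }
  end

  private

  # Applies the transition only if the current state allows it.
  # Returns true on success, false if the future already resolved.
  def transition(to)
    return false unless TRANSITIONS[@state].include?(to)
    yield
    @state = to
    true
  end
end

s = FutureState.new
first = s.fulfill(42)
second = s.reject(RuntimeError.new("late")) # refused: already fulfilled
```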
The separation of concerns between producer and consumer defines the architecture. The producer holds the promise and controls when and how the value gets set. Consumers receive futures that provide read-only access to the eventual value. This separation prevents consumers from interfering with value production and maintains clear ownership boundaries.
# Producer defines the computation that will resolve the promise
promise = Concurrent::Promise.new do
  expensive_computation()
end
# execute schedules the work and returns the same object; consumers
# should use only its read side (value and the state predicates)
future = promise.execute
result = future.value # Blocks until resolved
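Because Promise#execute hands back the same object, the example above does not enforce a read-only consumer side. The following stdlib-only sketch shows the split described earlier, using hypothetical names WritablePromise and WritablePromise::Future (these are not concurrent-ruby classes). The producer keeps the promise, and consumers receive a view that can only read. In concurrent-ruby, Concurrent::Promises.resolvable_future plays the producer role.

```ruby
# Writable promise with a separate read-only future view (illustrative, stdlib only)
class WritablePromise
  # Consumer-facing side: exposes #value but offers no way to resolve.
  class Future
    def initialize(promise)
      @promise = promise
    end

    def value
      @promise.send(:await) # calls the promise's private blocking reader
    end
  end

  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @resolved = false
    @value = nil
  end

  def future
    Future.new(self)
  end

  # Producer side: sets the value once and wakes every waiting reader.
  def fulfill(value)
    @mutex.synchronize do
      return false if @resolved
      @value = value
      @resolved = true
      @cond.broadcast
      true
    end
  end

  private

  def await
    @mutex.synchronize do
      @cond.wait(@mutex) until @resolved
      @value
    end
  end
end

promise = WritablePromise.new
future = promise.future # handed to consumers
producer = Thread.new { promise.fulfill(21 * 2) }
result = future.value   # blocks until the producer fulfills
producer.join
```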
Non-blocking execution forms the core advantage. When code initiates an asynchronous operation, it receives a future immediately and continues executing other tasks while a thread pool or runtime performs the computation. The calling thread can overlap its own work with I/O instead of sitting idle until that I/O completes.
Composition mechanisms allow combining multiple futures into complex workflows. Operations like chaining, mapping, and joining build asynchronous pipelines. Each operation returns a new future and leaves its inputs unchanged, so a resolved future can be shared safely among several consumers.
future1 = async_operation_1()
future2 = async_operation_2()
# Combine results from both futures
combined = Concurrent::Promises.zip(future1, future2).then do |result1, result2|
merge(result1, result2)
end
Error propagation through the future chain keeps failures from getting lost. When a future rejects, the error travels through subsequent operations until a handler deals with it explicitly. In callback-based code, by contrast, every callback must check for and forward errors by hand, and a single forgotten check silently drops the failure.
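The following stdlib-only sketch makes the propagation rule concrete. It builds a hypothetical TinyFuture on top of Thread; this is not concurrent-ruby's implementation. The sketch relies on Thread#value re-raising a thread's exception. When a then step's source has failed, the step re-raises before its block runs, so the block is skipped. The rescue step turns the rejection back into a value.

```ruby
# Hypothetical TinyFuture: a thread-backed future with then/rescue (illustrative only)
class TinyFuture
  # Runs the block on a new thread; the thread's outcome is the future's outcome.
  def self.run(&block)
    thread = Thread.new do
      Thread.current.report_on_exception = false # errors surface through #value
      block.call
    end
    new(thread)
  end

  def initialize(thread)
    @thread = thread
  end

  # If the source failed, source.value re-raises, so the block is skipped
  # and the new future fails with the same error.
  def then(&block)
    source = @thread
    TinyFuture.run { block.call(source.value) }
  end

  # Turns a failure into a value; a successful source passes through unchanged.
  def rescue(&block)
    source = @thread
    TinyFuture.run do
      begin
        source.value
      rescue StandardError => e
        block.call(e)
      end
    end
  end

  def value
    @thread.value
  end
end

steps_run = Queue.new
recovered = TinyFuture.run { raise ArgumentError, "boom" }
  .then { |v| steps_run << :step1; v }
  .then { |v| steps_run << :step2; v }
  .rescue { |e| "recovered from #{e.message}" }
  .value
doubled = TinyFuture.run { 21 }.then { |v| v * 2 }.value
```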
Thread safety requirements constrain implementation details. Multiple threads may attempt to resolve a promise or read from a future simultaneously. Implementations must use synchronization primitives to ensure state transitions occur atomically and values propagate correctly across memory barriers.
The happens-before relationship establishes ordering guarantees. Setting a promise's value happens-before any thread observing that value through the future. This memory model ensures visibility across threads without requiring explicit synchronization in user code.
Lazy versus eager evaluation determines when computation begins. Eager futures start execution immediately when created, while lazy futures defer work until a thread requests the value. The choice affects resource consumption and computation timing.
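The difference can be sketched with plain threads, using a hypothetical LazyFuture class. The eager computation starts as soon as its thread is created. The lazy one starts only on the first call to value and then runs exactly once, however many times the value is requested.

```ruby
# Hypothetical LazyFuture: starts its thread on the first #value call only
class LazyFuture
  def initialize(&block)
    @block = block
    @mutex = Mutex.new
    @thread = nil
  end

  def value
    # The mutex ensures concurrent first callers start only one thread
    thread = @mutex.synchronize { @thread ||= Thread.new(&@block) }
    thread.value
  end

  def started?
    @mutex.synchronize { !@thread.nil? }
  end
end

runs = Queue.new
eager = Thread.new { runs << :eager; :eager_result } # eager: work begins now
lazy = LazyFuture.new { runs << :lazy; :lazy_result } # lazy: nothing runs yet
eager.join

started_before_request = lazy.started?
first = lazy.value
second = lazy.value # reuses the same thread, so the block ran once
```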
Ruby Implementation
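Ruby's standard library has no built-in future type. Futures and promises come from the concurrent-ruby gem, or they can be built manually from Thread, Mutex, and ConditionVariable. The concurrent-ruby library offers the most comprehensive and production-ready implementation. It ships two APIs: older classes such as Concurrent::Future and Concurrent::Promise, and the newer Concurrent::Promises module, which provides a richer set of composition operators (then, rescue, zip, any).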
The Concurrent::Promise class wraps an asynchronous computation and provides a future-like interface for retrieving its result. Creating a promise involves passing a block containing the computation. The work starts only when execute is called:
require 'concurrent' # the concurrent-ruby gem is loaded as 'concurrent'
promise = Concurrent::Promise.new do
  sleep(2) # Simulate long operation
  compute_expensive_result()
end
promise.execute # Starts async execution
result = promise.value # Blocks until complete
The Concurrent::Future class provides similar functionality. Its execute class method creates the future and starts it in one call:
future = Concurrent::Future.execute do
fetch_from_database()
end
# Execution starts immediately
other_work()
# Wait for result when needed
data = future.value
State inspection methods allow checking computation status without blocking. The pending?, fulfilled?, and rejected? methods return the current state. The complete? method returns true for either fulfilled or rejected states:
future = Concurrent::Future.execute { slow_operation() }
if future.pending?
  puts "Still computing"
end
future.wait # Block until complete
if future.fulfilled?
  puts "Success: #{future.value}"
elsif future.rejected?
  puts "Failed: #{future.reason}"
end
Chaining operations through then creates pipelines of dependent computations. Each then call returns a new future that resolves when the previous future completes and the transformation finishes. Concurrent::Future itself has no then method. Chaining is available on Concurrent::Promises futures and on the older Concurrent::Promise class:
future = Concurrent::Promises.future { fetch_user(id) }
  .then { |user| fetch_posts(user) }
  .then { |posts| transform_posts(posts) }
  .then { |data| render_response(data) }
final_result = future.value! # value! raises if any step failed
Error handling uses rescue to catch exceptions and provide fallback values or alternative computations. Like then, rescue is not available on Concurrent::Future; it is provided by the Concurrent::Promises API:
future = Concurrent::Promises.future { risky_operation() }
  .rescue { |error|
    log_error(error)
    default_value
  }
result = future.value # Returns default_value if risky_operation raises
The Concurrent::Promises module provides advanced composition operators. The zip function combines multiple futures into a single future containing an array of results:
user_future = Concurrent::Promises.future { fetch_user(id) }
posts_future = Concurrent::Promises.future { fetch_posts(id) }
comments_future = Concurrent::Promises.future { fetch_comments(id) }
combined = Concurrent::Promises.zip(user_future, posts_future, comments_future)
.then { |user, posts, comments|
build_profile(user, posts, comments)
}
profile = combined.value
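The any combinator (an alias for any_resolved_future) resolves with whichever future resolves first, even if that future was rejected. To take the first successful result and ignore earlier failures, use any_fulfilled_future:
primary = Concurrent::Promises.future { fetch_from_primary_db() }
secondary = Concurrent::Promises.future { fetch_from_secondary_db() }
fastest = Concurrent::Promises.any_fulfilled_future(primary, secondary)
data = fastest.value # Uses whichever succeeds first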
Custom executors control how futures run. By default, futures use the global thread pool, but applications can specify custom executors for different workload characteristics:
io_executor = Concurrent::ThreadPoolExecutor.new(
min_threads: 5,
max_threads: 20,
max_queue: 100
)
future = Concurrent::Future.execute(executor: io_executor) do
network_request()
end
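The following is a minimal sketch of what an executor does, using a hypothetical FixedPool class that is not concurrent-ruby's ThreadPoolExecutor. A fixed set of worker threads pulls tasks from a shared Queue, and each submission returns a handle whose value blocks until the task finishes. ThreadPoolExecutor provides features this sketch omits, such as growing between min_threads and max_threads and bounding its queue with max_queue.

```ruby
# Hypothetical FixedPool: a fixed number of workers fed by a Queue (illustrative only)
class FixedPool
  # Handle for one submitted task; #value blocks until the task finishes.
  class Handle
    def initialize(outcome_queue)
      @outcome_queue = outcome_queue
      @mutex = Mutex.new
    end

    def value
      status, payload = @mutex.synchronize { @outcome ||= @outcome_queue.pop }
      raise payload if status == :error # re-raise the task's exception in the caller
      payload
    end
  end

  def initialize(size)
    @jobs = Queue.new
    @workers = Array.new(size) do
      Thread.new do
        while (job = @jobs.pop) # nil once the queue is closed and empty
          job.call
        end
      end
    end
  end

  # Enqueues the block and returns a Handle for its eventual result.
  def submit(&block)
    outcome = Queue.new
    job = lambda do
      begin
        outcome << [:ok, block.call]
      rescue StandardError => e
        outcome << [:error, e] # keep the worker alive; report through the handle
      end
    end
    @jobs << job
    Handle.new(outcome)
  end

  # Stops accepting work and waits for queued jobs to drain.
  def shutdown
    @jobs.close
    @workers.each(&:join)
  end
end

pool = FixedPool.new(2) # at most two tasks run at once
handles = (1..5).map { |i| pool.submit { i * i } }
squares = handles.map(&:value)
failing = pool.submit { raise ArgumentError, "bad input" }
error = begin
  failing.value
  nil
rescue ArgumentError => e
  e
end
pool.shutdown
```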
Manual implementation without libraries demonstrates the underlying concepts. A basic promise uses a mutex and condition variable for thread coordination:
class SimplePromise
  def initialize
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @resolved = false
    @value = nil
    @error = nil
  end
  def resolve(value)
    @mutex.synchronize do
      return if @resolved
      @value = value
      @resolved = true
      @condition.broadcast
    end
  end
  def reject(error)
    @mutex.synchronize do
      return if @resolved
      @error = error
      @resolved = true
      @condition.broadcast
    end
  end
  def value
    @mutex.synchronize do
      @condition.wait(@mutex) until @resolved
      raise @error if @error
      @value
    end
  end
end
# Usage
promise = SimplePromise.new
Thread.new do
  sleep(1)
  promise.resolve(compute_value())
end
result = promise.value # Blocks until resolved
Practical Examples
Web service aggregation demonstrates futures' ability to parallelize independent requests. An application displaying user profile information needs data from multiple services: user details, posts, friends list, and recommendations. Sequential requests would sum the latency of each call, while futures enable concurrent fetching:
def build_user_profile(user_id)
  user = Concurrent::Promises.future { UserService.fetch(user_id) }
  posts = Concurrent::Promises.future { PostService.fetch_by_user(user_id) }
  friends = Concurrent::Promises.future { FriendService.fetch_friends(user_id) }
  recommendations = Concurrent::Promises.future {
    RecommendationService.get_recommendations(user_id)
  }
  # zip passes the fulfilled values (not the futures) to the block
  Concurrent::Promises.zip(user, posts, friends, recommendations)
    .then { |user_data, post_list, friend_list, recs|
      ProfileData.new(
        user: user_data,
        posts: post_list,
        friends: friend_list,
        recommendations: recs
      )
    }
    .value! # Raises if any request failed
end
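The latency claim can be checked with a stdlib-only sketch in which a hypothetical simulated_call stands in for each service request. Four calls of about 0.1 seconds each, started on separate threads, finish in roughly the time of the slowest call instead of the 0.4-second sum.

```ruby
# Hypothetical stand-in for one remote call; name and latency are made up
def simulated_call(name, latency)
  sleep(latency) # sleep releases the GVL, as blocking network I/O does
  "#{name} data"
end

services = %w[user posts friends recommendations]
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)

threads = services.map { |name| Thread.new { simulated_call(name, 0.1) } } # fan-out
results = threads.map(&:value)                                              # fan-in
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
```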
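Database query parallelization speeds up complex reports requiring data from multiple tables. Instead of running queries one after another, futures execute them concurrently. Each future needs its own connection, for example one checked out from a pool, because most database client connections cannot run queries from several threads at once: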
def generate_sales_report(start_date, end_date)
  # `database` is assumed to give each thread its own pooled connection
  sales_future = Concurrent::Future.execute do
    database.query("SELECT * FROM sales WHERE date BETWEEN ? AND ?",
                   start_date, end_date)
  end
  inventory_future = Concurrent::Future.execute do
    database.query("SELECT * FROM inventory WHERE updated >= ?", start_date)
  end
  customer_future = Concurrent::Future.execute do
    database.query("SELECT * FROM customers WHERE last_purchase >= ?",
                   start_date)
  end
  # value! re-raises a failed query instead of returning nil
  sales = sales_future.value!
  inventory = inventory_future.value!
  customers = customer_future.value!
  compile_report(sales, inventory, customers)
end
Background job processing with futures lets callers poll for status while the work continues. A video transcoding service processes uploaded files asynchronously while keeping the client informed:
class VideoTranscoder
  def transcode_async(video_id)
    promise = Concurrent::Promise.new do
      video = Video.find(video_id)
      # Transcode in multiple quality levels
      qualities = ['720p', '1080p', '4k']
      results = qualities.map do |quality|
        Concurrent::Future.execute do
          transcode_to_quality(video, quality)
        end
      end
      # Wait for all qualities; value! rejects the promise if any transcode fails
      results.map(&:value!)
    end
    promise.execute
    promise
  end
  def check_progress(promise)
    return :complete if promise.fulfilled?
    return :failed if promise.rejected?
    :processing
  end
end
# Usage
transcoder = VideoTranscoder.new
future = transcoder.transcode_async(video_id)
# Poll for completion
until future.complete?
  status = transcoder.check_progress(future)
  update_ui(status)
  sleep(1)
end
transcoded_files = future.value
Cache warming with futures preloads data during application startup without blocking initialization. The application starts serving requests while caches populate asynchronously:
class ApplicationCache
  def warm_caches
    # Loaders run on pool threads; in Rails, database access off the main
    # thread should hold its own connection (e.g. connection_pool.with_connection)
    futures = []
    futures << Concurrent::Promises.future do
      @product_cache = Product.all.index_by(&:id)
      Rails.logger.info "Product cache loaded: #{@product_cache.size} items"
    end
    futures << Concurrent::Promises.future do
      @category_cache = Category.includes(:products).to_a
      Rails.logger.info "Category cache loaded: #{@category_cache.size} items"
    end
    futures << Concurrent::Promises.future do
      @config_cache = Configuration.all.to_h { |c| [c.key, c.value] }
      Rails.logger.info "Config cache loaded: #{@config_cache.size} items"
    end
    # Return a future that completes when all caches are ready
    # (Promises.zip accepts only Concurrent::Promises futures)
    Concurrent::Promises.zip(*futures).then do
      Rails.logger.info "All caches warmed"
    end
  end
end
# During initialization
cache = ApplicationCache.new
warm_future = cache.warm_caches
# Application continues starting
setup_routes()
initialize_middleware()
# Wait for caches before accepting requests; wait! raises if a loader failed
warm_future.wait!
start_server()
Common Patterns
The callback pattern registers functions to execute upon future resolution. Instead of blocking to retrieve a value, code provides callbacks for success and failure cases:
future = Concurrent::Promises.future { fetch_data() }
# Callbacks run on the executor after the future resolves; each returns the future
future.on_fulfillment { |value|
  process_result(value)
  notify_observers(value)
}
future.on_rejection { |error|
  log_error(error)
  alert_monitoring_system(error)
}
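One subtlety such APIs must handle is a callback registered after the future has already resolved, which still has to run. The following stdlib-only sketch uses a hypothetical CallbackPromise class; its on_success/on_error names mirror the prose and do not come from any library. Registration and resolution share a mutex, late callbacks run immediately, and callbacks execute outside the lock so they can safely touch the promise.

```ruby
# Hypothetical CallbackPromise: callbacks that also fire when registered late
class CallbackPromise
  def initialize
    @mutex = Mutex.new
    @state = :pending
    @payload = nil
    @callbacks = []
  end

  def on_success(&callback)
    register(:fulfilled, callback)
  end

  def on_error(&callback)
    register(:rejected, callback)
  end

  def fulfill(value)
    resolve(:fulfilled, value)
  end

  def reject(error)
    resolve(:rejected, error)
  end

  private

  # Queues the callback while pending; runs it immediately if already resolved.
  def register(kind, callback)
    ready, state, payload = @mutex.synchronize do
      if @state == :pending
        @callbacks << [kind, callback]
        [false]
      else
        [true, @state, @payload]
      end
    end
    callback.call(payload) if ready && state == kind
    self
  end

  # Resolves once; callbacks run outside the lock.
  def resolve(state, payload)
    callbacks = @mutex.synchronize do
      return false unless @state == :pending
      @state = state
      @payload = payload
      @callbacks.dup
    end
    callbacks.each { |kind, callback| callback.call(payload) if kind == state }
    true
  end
end

events = []
promise = CallbackPromise.new
promise.on_success { |value| events << [:early, value] }
promise.on_error { |error| events << [:error, error] }
promise.fulfill(1)
promise.on_success { |value| events << [:late, value] } # runs immediately
second_resolution = promise.reject(RuntimeError.new("too late"))
```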
The timeout pattern prevents indefinite blocking when waiting for futures. Applications specify maximum wait times and handle timeout scenarios explicitly:
require 'timeout'
def fetch_with_timeout(url, timeout_seconds)
  future = Concurrent::Promises.future { HTTP.get(url) }
  # Given a timeout, wait returns false if the future is still pending
  if future.wait(timeout_seconds)
    future.value! # Re-raises if the request itself failed
  else
    raise Timeout::Error, "Request exceeded #{timeout_seconds} seconds"
  end
end
begin
  response = fetch_with_timeout(api_url, 5)
rescue Timeout::Error
  response = fetch_from_cache(api_url)
end
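Under the hood, a timed wait is a condition-variable wait with a deadline. The following stdlib-only sketch uses a hypothetical TimedCell class that raises Timeout::Error when the deadline passes. The wait loops because ConditionVariable#wait may return before the value is set. It measures time with the monotonic clock, so wall-clock adjustments cannot shorten or stretch the wait.

```ruby
require 'timeout'

# Hypothetical TimedCell: a one-shot value whose reads take a deadline
class TimedCell
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @set = false
    @value = nil
  end

  def set(value)
    @mutex.synchronize do
      return false if @set # first write wins
      @value = value
      @set = true
      @cond.broadcast
      true
    end
  end

  # Waits up to `seconds`, re-checking because a wait may end early.
  def value(seconds)
    deadline = now + seconds
    @mutex.synchronize do
      until @set
        remaining = deadline - now
        raise Timeout::Error, "not resolved within #{seconds}s" if remaining <= 0
        @cond.wait(@mutex, remaining)
      end
      @value
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

cell = TimedCell.new
timed_out = begin
  cell.value(0.05)
  false
rescue Timeout::Error
  true
end

Thread.new { sleep(0.02); cell.set(:ready) }
result = cell.value(1.0)
```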
The fan-out/fan-in pattern distributes work across multiple futures and aggregates results. Batch processing operations split data into chunks, process them concurrently, and merge outputs:
def process_batch(items, chunk_size = 100)
  chunks = items.each_slice(chunk_size).to_a
  futures = chunks.map do |chunk|
    Concurrent::Future.execute do
      chunk.map { |item| process_item(item) }
    end
  end
  # Fan-in: value! re-raises a failed chunk's error instead of returning nil
  futures.flat_map(&:value!)
end
# Process 10,000 items as 100 concurrent chunks of 100
# (on CRuby the GVL limits real parallelism to I/O-bound process_item work)
items = (1..10_000).to_a
processed = process_batch(items, 100)
The retry pattern handles transient failures by automatically retrying failed operations with exponential backoff:
def future_with_retry(max_attempts: 3, initial_delay: 1.0, &block)
  Concurrent::Future.execute do
    attempt = 0
    begin
      attempt += 1
      block.call
    rescue StandardError
      raise if attempt >= max_attempts
      # Exponential backoff: initial_delay, then 2x, 4x, ...
      # (sleep occupies a pool thread during the delay)
      sleep(initial_delay * (2**(attempt - 1)))
      retry
    end
  end
end
# Usage
future = future_with_retry(max_attempts: 3) do
  unreliable_api_call()
end
result = future.value! # Raises the last error if every attempt failed
The circuit breaker pattern prevents cascading failures by stopping requests to failing services. After a threshold of failures, the circuit opens and fails fast without attempting the operation:
class CircuitOpenError < StandardError; end
class CircuitBreaker
  def initialize(failure_threshold: 5, timeout: 60)
    @failure_threshold = failure_threshold
    @timeout = timeout
    @failures = 0
    @last_failure_time = nil
    @state = :closed
    @mutex = Mutex.new # callbacks run on pool threads, so guard shared state
  end
  def call(&block)
    open = @mutex.synchronize do
      if @state == :open && Time.now - @last_failure_time > @timeout
        @state = :half_open # let a trial request through
      end
      @state == :open
    end
    if open
      return Concurrent::Promises.rejected_future(
        CircuitOpenError.new("Circuit breaker is open")
      )
    end
    future = Concurrent::Promises.future(&block)
    future.on_fulfillment { reset_failures }
    future.on_rejection { record_failure }
    future
  end
  private
  def record_failure
    @mutex.synchronize do
      @failures += 1
      @last_failure_time = Time.now
      @state = :open if @failures >= @failure_threshold
    end
  end
  def reset_failures
    @mutex.synchronize do
      @failures = 0
      @state = :closed
    end
  end
end
The memoized future pattern caches computation results to avoid redundant work. Multiple consumers can share a single future, with the computation executing only once:
class MemoizedFuture
  def initialize(&block)
    @block = block
    @future = nil
    @mutex = Mutex.new
  end
  def get
    @mutex.synchronize do
      @future ||= Concurrent::Future.execute(&@block)
    end
  end
end
# Usage - computation runs once despite multiple callers
expensive_computation = MemoizedFuture.new { compute_complex_result() }
future1 = expensive_computation.get
future2 = expensive_computation.get # Returns same future
future3 = expensive_computation.get # Returns same future
result = future1.value # All three share this result
Error Handling & Edge Cases
Exception propagation through future chains requires careful handling to prevent error swallowing. When a future rejects, every later then step is skipped, and the rejection passes down the chain until a rescue handles it:
future = Concurrent::Promises.future { failing_operation() }
  .then { |result| transform(result) } # Skipped if previous failed
  .then { |result| finalize(result) } # Also skipped
  .rescue { |error|
    # Handles an error from any earlier stage
    log_error(error)
    fallback_value
  }
result = future.value
Timeouts keep callers from blocking forever on a future that never resolves, but they do not cancel the computation; they only stop waiting for it. The operation keeps executing in the background and keeps holding its thread and any resources it acquired:
require 'timeout'
future = Concurrent::Promises.future { very_long_operation() }
begin
  # wait(5) returns false if the future is still pending after 5 seconds
  raise Timeout::Error, "Operation timed out" unless future.wait(5)
  future.value!
rescue Timeout::Error
  # Future still executing in background thread
  # Clean up resources if possible
  cleanup_resources()
  raise
end
Cancellation requires explicit support since running futures cannot be forcibly stopped. The computation must periodically check a cancellation flag and exit gracefully:
class CancellableFuture
  def initialize(&block)
    @cancelled = Concurrent::AtomicBoolean.new(false)
    @future = Concurrent::Future.execute do
      block.call(@cancelled)
    end
  end
  def cancel
    @cancelled.make_true
  end
  def value
    @future.value
  end
end
# Usage
future = CancellableFuture.new do |cancelled|
  results = []
  data.each do |item|
    break if cancelled.true? # Check flag; `return` here would raise LocalJumpError
    results << process(item)
  end
  results # Partial results if cancelled
end
# Cancel from another thread
future.cancel
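The same cooperative pattern works without concurrent-ruby. In this stdlib-only sketch, a hypothetical CancelToken guards its flag with a Mutex in place of Concurrent::AtomicBoolean, and the worker checks the flag between items. The worker exits the loop with break, which leaves each and keeps the partial results. The doubling step stands in for process(item).

```ruby
# Hypothetical CancelToken: a stdlib stand-in for Concurrent::AtomicBoolean
class CancelToken
  def initialize
    @mutex = Mutex.new
    @cancelled = false
  end

  def cancel!
    @mutex.synchronize { @cancelled = true }
  end

  def cancelled?
    @mutex.synchronize { @cancelled }
  end
end

token = CancelToken.new
first_item_done = Queue.new

worker = Thread.new do
  processed = []
  (1..1_000).each do |item|
    break if token.cancelled? # checked between units of work
    processed << item * 2     # stand-in for process(item)
    first_item_done << true if item == 1
    sleep(0.001)              # simulated work per item
  end
  processed # partial results if cancelled
end

first_item_done.pop # wait until the worker has made progress
token.cancel!
partial = worker.value
```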
Race conditions occur when multiple threads resolve a promise simultaneously. Promise implementations must ensure atomic state transitions:
# Thread-safe promise resolution
class SafePromise
  def initialize
    @mutex = Mutex.new
    @resolved = false
    @value = nil
  end
  def try_resolve(value)
    @mutex.synchronize do
      return false if @resolved
      @value = value
      @resolved = true
      true # Successfully resolved
    end
  end
  def value
    @mutex.synchronize { @value }
  end
end
promise = SafePromise.new
# Multiple threads try to resolve
threads = 10.times.map do |i|
  Thread.new { promise.try_resolve(i) }
end
threads.each(&:join)
# Exactly one thread (whichever acquires the mutex first) returns true; the rest return false
Deadlock prevention requires avoiding circular dependencies between futures. When two futures each wait for the other's value, the threads running them wait indefinitely:
# Deadlock scenario
future1 = future2 = nil # Declare both locals first so each block can see the other
future1 = Concurrent::Future.new { future2.value }
future2 = Concurrent::Future.new { future1.value }
future1.execute
future2.execute
# Both futures wait for each other forever
# future1.value # Deadlocks
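Memory leaks occur when long-lived references keep large intermediate results alive. Every future in a chain stores its resolved value for as long as that future object remains reachable, so holding on to an early stage keeps its output in memory even if only the final value matters:
# Risky - the first future's value (the full dataset) stays in memory
# as long as anything references `loaded`
loaded = Concurrent::Promises.future { load_large_dataset() }
result = loaded
  .then { |data| process_step1(data) }
  .then { |data| process_step2(data) }
  .then { |data| process_step3(data) }
# Better - keep large intermediates inside one block so no future stores them
result = Concurrent::Promises.future do
  data = load_large_dataset()
  step1 = process_step1(data)
  data = nil # Drop the only reference so the dataset can be collected
  process_step3(process_step2(step1))
end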
Thread pool exhaustion happens when blocking operations run in the future executor. A future that blocks waiting for another future occupies a pool thread while it waits. If every thread in a bounded pool does this, the inner futures never get a thread, and the pool deadlocks:
# Dangerous - blocks a pool thread while waiting
future1 = Concurrent::Promises.future do
  future2 = Concurrent::Promises.future { slow_operation() }
  future2.value # Blocks this pool thread until future2 resolves
end
# Better - chain instead of waiting
future1 = Concurrent::Promises.future { slow_operation() }
  .then { |result| process(result) }
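The starvation is easy to reproduce with a hand-rolled pool. In this stdlib-only sketch, the hypothetical run_nested method builds a pool of Queue-fed worker threads. It submits an outer job that enqueues an inner job and blocks on its result, and it gives up after a timeout. With one worker the inner job never runs; with two, it completes. Extra threads only postpone the problem until the nesting depth reaches the pool size.

```ruby
# Hypothetical run_nested: outer job waits on an inner job in the same pool.
# Returns the inner result, or nil if nothing finished within wait_seconds.
def run_nested(worker_count, wait_seconds)
  jobs = Queue.new
  workers = Array.new(worker_count) do
    Thread.new do
      while (job = jobs.pop)
        job.call
      end
    end
  end

  done = Queue.new
  outer = lambda do
    inner_result = Queue.new
    jobs << -> { inner_result << :inner_result } # queued behind the outer job
    done << inner_result.pop                     # outer job holds its worker while waiting
  end
  jobs << outer

  waiter = Thread.new { done.pop }
  waiter.join(wait_seconds) ? waiter.value : nil
ensure
  # Kill any threads still blocked so the sketch never hangs
  [*workers, waiter].compact.each(&:kill)
end

starved = run_nested(1, 0.2)   # one worker: the inner job never gets a thread
completed = run_nested(2, 1.0) # a second worker runs the inner job
```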
Unhandled rejections cause silent failures when no error handlers exist. Calling value on a rejected future returns nil instead of raising. Applications should therefore either attach a handler or retrieve results with value!, which re-raises the rejection reason:
# Bad - errors disappear
future = Concurrent::Promises.future { failing_operation() }
# If not retrieved, the error never surfaces; future.value would just return nil
# Good - explicit error handling
future = Concurrent::Promises.future { failing_operation() }
  .rescue { |error|
    ErrorReporter.notify(error)
    default_value
  }
# Or retrieve with value!, which raises the rejection reason
future = Concurrent::Promises.future { failing_operation() }
begin
  result = future.value!
rescue StandardError => e
  handle_error(e)
end
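Plain Ruby threads fail in the same way. A thread that dies with an exception stores it until someone calls join or value. Ruby prints a report by default, since Thread.report_on_exception has defaulted to true since Ruby 2.5. This sketch switches that report off to mimic a future whose error nobody reads.

```ruby
failing = Thread.new do
  Thread.current.report_on_exception = false # suppress Ruby's default stderr report
  raise IOError, "disk full"
end

sleep(0.01) while failing.alive?     # wait without joining, so nothing is re-raised yet
died_silently = failing.status.nil?  # nil status means "terminated by an exception"

surfaced = begin
  failing.value # only now does the error reach the caller
  nil
rescue IOError => e
  e.message
end
```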
Design Considerations
Choosing between futures and callbacks depends on the composition complexity and error handling requirements. Callbacks work for simple async operations but become unwieldy with multiple dependencies. Futures provide composition operators that keep code readable:
# Callback hell
fetch_user(id) do |user, error|
  if error
    handle_error(error)
  else
    fetch_posts(user.id) do |posts, error|
      if error
        handle_error(error)
      else
        fetch_comments(posts) do |comments, error|
          if error
            handle_error(error)
          else
            render(user, posts, comments)
          end
        end
      end
    end
  end
end
# Future composition (each step passes along the values later steps need)
Concurrent::Promises.future { fetch_user(id) }
  .then { |user| [user, fetch_posts(user.id)] }
  .then { |user, posts| [user, posts, fetch_comments(posts)] }
  .then { |user, posts, comments| render(user, posts, comments) }
  .rescue { |error| handle_error(error) }
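Synchronous versus asynchronous execution affects system throughput and resource utilization. Synchronous code blocks the calling thread during I/O, so concurrency is limited to the number of threads issuing requests. Futures hand the blocking work to an executor and let the caller continue. With thread-pool futures, the pool threads still block on I/O, so concurrency is bounded by pool size rather than by the number of callers. The trade-off involves increased complexity and harder debugging.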
Eager versus lazy evaluation determines when computation starts. Eager futures begin execution immediately, consuming resources regardless of whether results get used. Lazy futures defer work until requested, saving resources but adding latency when values become necessary:
# Eager - starts immediately
eager = Concurrent::Future.execute { expensive_computation() }
# Work starts even if never used
# Lazy - defers execution
lazy = Concurrent::Promises.delay { expensive_computation() }
# Work starts only when value requested
result = lazy.value
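Thread pool sizing affects performance characteristics. Too few threads cause queuing delays, while too many threads increase context switching overhead and memory consumption. Thread count should match workload type: CPU-bound work benefits from a thread count near the CPU core count, while I/O-bound work can use many more threads. On CRuby, the global VM lock (GVL) lets only one thread run Ruby code at a time. Extra threads therefore speed up I/O-bound work but not CPU-bound Ruby code, whereas JRuby and TruffleRuby run threads in parallel: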
# CPU-bound workload
cpu_executor = Concurrent::ThreadPoolExecutor.new(
min_threads: 2,
max_threads: Concurrent.processor_count,
max_queue: 100
)
# I/O-bound workload
io_executor = Concurrent::ThreadPoolExecutor.new(
min_threads: 10,
max_threads: 100,
max_queue: 1000
)
Error recovery strategy determines system resilience. Fast-fail approaches reject immediately when errors occur, propagating failures to callers. Retry with backoff attempts recovery before failing. Circuit breakers prevent cascading failures. The choice depends on error characteristics and system requirements.
Backpressure mechanisms prevent overwhelming downstream systems when producers generate work faster than consumers process it. Bounded queues push back on producers when full, either by blocking them or by rejecting new work. Once max_queue is reached, concurrent-ruby's ThreadPoolExecutor applies its fallback_policy (:abort, :discard, or :caller_runs). Unbounded queues risk memory exhaustion. Applications must choose queue sizes based on acceptable latency and memory constraints.
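Ruby's standard library ships a bounded queue with blocking backpressure: SizedQueue#push waits while the queue is full. The following minimal sketch uses a two-slot buffer. The producer stalls after two items and resumes only as a consumer drains the queue.

```ruby
queue = SizedQueue.new(2) # bounded buffer: at most two pending items
pushed = Queue.new

producer = Thread.new do
  5.times do |i|
    queue.push(i) # blocks while the buffer is full
    pushed << i
  end
end

# Wait until the producer is parked on the full queue
sleep(0.001) until producer.status == "sleep"
pushed_while_blocked = pushed.size

consumed = Array.new(5) { queue.pop } # draining the queue unblocks the producer
producer.join
```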
Testing asynchronous code requires different approaches than synchronous code. Futures introduce nondeterminism and timing dependencies. Tests must wait for futures to complete and handle potential race conditions. Mock futures can isolate units from async dependencies:
# Testing with futures (Minitest; assumes perform_async_operation returns a Concurrent::Promises future)
def test_async_operation
  future = perform_async_operation()
  assert future.wait(5), "future did not resolve within 5 seconds"
  assert future.fulfilled?
  assert_equal expected_result, future.value
end
# Stubbing an async dependency with an already-fulfilled future (requires 'minitest/mock')
def test_with_mock_future
  mock_future = Concurrent::Promises.fulfilled_future(mock_result)
  service.stub(:fetch_async, mock_future) do
    result = process_with_service()
    assert_equal processed_mock_result, result
  end
end
Reference
Future States
| State | Description | Transitions To |
|---|---|---|
| Pending | Computation not yet complete | Fulfilled or Rejected |
| Fulfilled | Successfully completed with value | Terminal state |
| Rejected | Failed with error | Terminal state |
Core Operations
| Operation | Purpose | Blocks |
|---|---|---|
| execute | Start async computation (Concurrent::Future, Concurrent::Promise) | No |
| value | Retrieve result; returns nil if rejected | Yes, until resolved |
| value! | Retrieve result; raises the rejection reason | Yes, until resolved |
| wait | Wait for completion | Yes, until complete |
| wait(timeout) | Wait with timeout | Yes, up to timeout |
| pending? | Check if still computing | No |
| fulfilled? | Check if succeeded | No |
| rejected? | Check if failed | No |
| complete? | Check if done (either state); resolved? in Concurrent::Promises | No |
Composition Methods
| Method | Description | Returns |
|---|---|---|
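| then | Chain dependent computation | New future |
| rescue | Handle errors | New future |
| on_fulfillment | Register success callback | Self |
| on_rejection | Register error callback | Self |
| zip | Combine multiple futures | Future whose value is an array |
| any | Race multiple futures | Future resolved by the first to resolve (fulfilled or rejected) |
| any_fulfilled_future | Race for the first success | Future with the first fulfilled value |
| delay | Create lazy future | Future that starts when first awaited |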
Concurrent-Ruby Promise Creation
# Execute immediately
future = Concurrent::Future.execute { computation() }
# Manual execution
promise = Concurrent::Promise.new { computation() }
promise.execute
# Fulfilled promise
fulfilled = Concurrent::Promises.fulfilled_future(value)
# Rejected promise
rejected = Concurrent::Promises.rejected_future(error)
# Lazy promise
lazy = Concurrent::Promises.delay { computation() }
Error Handling Patterns
| Pattern | Implementation | Use Case |
|---|---|---|
| Rescue chain | then().rescue() | Single fallback |
| Try-catch value! | begin future.value! rescue end | Explicit handling |
| Default on error | rescue { default_value } | Fallback values |
| Error callback | on_rejection { handle } | Side effects only |
| Retry | rescue { retry_future }.flat | Transient failures |
Common Configuration Options
| Option | Purpose | Typical Values |
|---|---|---|
| executor | Thread pool to use | Custom executor instance |
| timeout | Maximum wait time | Seconds (integer or float) |
| max_threads | Thread pool size | CPU count for CPU-bound, 20-100 for I/O |
| max_queue | Queue capacity | 100-1000 depending on memory |
State Transition Rules
# A resolvable future can only resolve once
promise = Concurrent::Promises.resolvable_future
promise.fulfill(value1)
promise.fulfill(value2, false) # Returns false; without the flag it raises Concurrent::MultipleAssignmentError
# Rejected futures skip then() blocks
Concurrent::Promises.future { raise "Error" }
  .then { |v| process(v) } # Skipped
  .rescue { |e| handle(e) } # Executes
# Rescue converts rejection to fulfillment
future = Concurrent::Promises.future { raise "Error" }
  .rescue { |e| "default" }
future.value # "default" (blocks until resolved)
future.fulfilled? # true
Performance Characteristics
| Operation | Time Complexity | Notes |
|---|---|---|
| Create future | O(1) | Allocates promise object |
| Read value (resolved) | O(1) | Returns cached value |
| Read value (pending) | O(1) once resolved | Blocks until the value is set |
| Chain then() | O(1) | Creates new future |
| Zip n futures | O(n) | Waits for slowest |
Thread Safety Guarantees
| Aspect | Guarantee |
|---|---|
| Multiple resolve attempts | Only the first succeeds; later attempts fail or raise |
| Reading from resolved future | Thread-safe |
| Callback registration | Thread-safe |
| Value visibility | Happens-before relationship established |
| Exception propagation | Safely crosses thread boundaries |