Overview
Reactive programming represents a declarative programming paradigm focused on data streams and the propagation of change. The paradigm treats events, user inputs, API responses, and state changes as streams of values that flow through time, enabling systems to react automatically to changes in data sources.
The reactive model originated from functional reactive programming (FRP) research in the 1990s, with practical implementations emerging in the 2010s through libraries like RxJava and Rx.NET. The paradigm addresses challenges in asynchronous programming by providing compositional operators for transforming, filtering, and combining event streams.
At its core, reactive programming inverts traditional control flow. Instead of imperatively requesting data when needed, components subscribe to data streams and react when new values arrive. This inversion creates a push-based model where data producers drive execution flow rather than consumers polling for updates.
# Traditional imperative approach
def fetch_user_data(user_id)
user = database.find_user(user_id)
profile = api.fetch_profile(user.profile_id)
posts = api.fetch_posts(user.id)
{ user: user, profile: profile, posts: posts }
end
# Reactive approach with observable streams
user_stream = Observable.from_database(:users)
profile_stream = user_stream.flat_map { |user|
Observable.from_api("/profiles/#{user.profile_id}")
}
posts_stream = user_stream.flat_map { |user|
Observable.from_api("/posts?user_id=#{user.id}")
}
combined = Observable.zip(user_stream, profile_stream, posts_stream)
combined.subscribe do |user, profile, posts|
render_dashboard(user, profile, posts)
end
Reactive programming applies across domains including user interface development, real-time data processing, microservice orchestration, and event-driven systems. The paradigm excels in scenarios requiring coordination of multiple asynchronous operations, handling of continuous event streams, or responsive systems that adapt to changing data.
The reactive approach introduces complexity through abstraction but provides benefits in managing asynchronous operations, composing complex data flows, and maintaining responsive systems under varying load conditions.
Key Principles
Reactive programming builds on four fundamental concepts that define how data flows through systems and how components interact with changing data.
Observables represent data sources that emit values over time. An observable can emit zero or more values followed optionally by either a completion signal or an error. Observables remain dormant until an observer subscribes, at which point they begin producing values. This lazy evaluation model prevents unnecessary computation for unused streams.
# Observable that emits values over time
observable = Observable.create do |observer|
observer.on_next(1)
observer.on_next(2)
observer.on_next(3)
observer.on_completed
end
# Cold observable - computation happens per subscription
def create_cold_observable
Observable.create do |observer|
data = expensive_computation()
observer.on_next(data)
observer.on_completed
end
end
# Hot observable - computation shared across subscriptions
def create_hot_observable
subject = Subject.new
Thread.new do
loop do
subject.on_next(sensor_reading())
sleep(1)
end
end
subject
end
Observers consume values emitted by observables through subscription. An observer implements three callbacks: on_next for handling emitted values, on_error for handling errors, and on_completed for handling stream completion. Observers can unsubscribe at any time to stop receiving values and free resources.
Operators transform, filter, combine, and manipulate observable streams. Operators take observables as input and return new observables, enabling functional composition of complex data transformations. Operators maintain the observable contract, propagating values, errors, and completion signals downstream.
# Transformation operators
numbers = Observable.from([1, 2, 3, 4, 5])
mapped = numbers.map { |n| n * 2 }
# Emits: 2, 4, 6, 8, 10
filtered = numbers.filter { |n| n.even? }
# Emits: 2, 4
scanned = numbers.scan(0) { |acc, n| acc + n }
# Emits: 1, 3, 6, 10, 15 (running sum)
Schedulers control execution context and threading for observable operations. Schedulers determine which thread emits values, processes transformations, and delivers results to observers. This separation of logic from execution context allows the same reactive pipeline to run synchronously in tests and asynchronously in production.
The marble diagram notation visualizes reactive streams over time, with each character representing a moment and symbols representing events:
Input: --1--2--3--4--5--|
map(x2): --2--4--6--8--10-|
filter: -----4-----8--10-|
-- : time passing
1 : emitted value
| : completion
X : error
Reactive streams follow specific lifecycle rules. Observables emit values through on_next, signal completion through on_completed or on_error, and emit no values after terminal events. Subscriptions return disposables that allow cancellation, preventing resource leaks in long-running streams.
Error handling in reactive streams differs from synchronous code. Errors propagate downstream automatically, terminating streams unless explicitly handled. Error operators like catch, retry, and on_error_resume_next provide recovery mechanisms while maintaining stream semantics.
# Error propagation and handling
observable = Observable.create do |observer|
observer.on_next(1)
observer.on_next(2)
observer.on_error(StandardError.new("Failure"))
observer.on_next(3) # Never emitted
end
recovered = observable
.catch { |error| Observable.just(-1) }
.subscribe do |value|
puts value
end
# Prints: 1, 2, -1
Backpressure management handles situations where producers emit values faster than consumers can process them. Strategies include buffering values, dropping excess values, or signaling producers to slow down. Without backpressure handling, systems risk memory exhaustion or unbounded latency.
Ruby Implementation
Ruby implements reactive programming through several libraries and patterns, with RxRuby providing the most complete implementation of reactive extensions. Ruby's dynamic nature and block syntax align well with reactive operators, though performance characteristics differ from statically-typed implementations.
RxRuby implements the ReactiveX specification for Ruby, providing observables, operators, and schedulers. The library integrates with Ruby's threading model and supports both MRI and JRuby.
require 'rx'
# Creating observables
range_obs = Rx::Observable.range(1, 5)
array_obs = Rx::Observable.from_array([1, 2, 3])
interval_obs = Rx::Observable.interval(1.0)
# Creating from blocks
custom_obs = Rx::Observable.create do |observer|
begin
# Emit values
observer.on_next("value 1")
observer.on_next("value 2")
observer.on_completed
rescue => e
observer.on_error(e)
end
# Return disposable for cleanup
Rx::Subscription.create { puts "Cleanup" }
end
# Subscribing with observer
observer = Rx::Observer.configure do |o|
o.on_next { |value| puts "Next: #{value}" }
o.on_error { |error| puts "Error: #{error}" }
o.on_completed { puts "Completed" }
end
subscription = custom_obs.subscribe(observer)
Ruby's block syntax simplifies operator chains compared to method chaining in other languages. Operators accept blocks for transformation logic, maintaining Ruby idioms while providing reactive semantics.
# Operator composition with Ruby blocks
source = Rx::Observable.from_array([
{ id: 1, name: "Alice", age: 30 },
{ id: 2, name: "Bob", age: 25 },
{ id: 3, name: "Charlie", age: 35 }
])
result = source
.filter { |person| person[:age] > 26 }
.map { |person| person[:name].upcase }
.take(2)
.to_a
.wait
# => ["ALICE", "CHARLIE"]
Subjects serve as both observable and observer, enabling manual value emission and multicasting to multiple subscribers. RxRuby provides several subject types with different replay behaviors.
# Subject for manual emission
subject = Rx::Subject.new
subscription1 = subject.subscribe { |x| puts "Observer 1: #{x}" }
subscription2 = subject.subscribe { |x| puts "Observer 2: #{x}" }
subject.on_next(1)
subject.on_next(2)
# Both observers receive values
# ReplaySubject caches values for late subscribers
replay = Rx::ReplaySubject.new(2) # Buffer size 2
replay.on_next(1)
replay.on_next(2)
replay.on_next(3)
replay.subscribe { |x| puts "Late subscriber: #{x}" }
# Prints: "Late subscriber: 2", "Late subscriber: 3"
# BehaviorSubject maintains current value
behavior = Rx::BehaviorSubject.new(0)
behavior.subscribe { |x| puts "Initial: #{x}" }
# Immediately prints: "Initial: 0"
behavior.on_next(1)
# Prints: "Initial: 1"
Schedulers in RxRuby control threading and execution timing. The library provides immediate, current thread, and thread pool schedulers, with support for custom implementations.
# Scheduler types
immediate = Rx::ImmediateScheduler.instance
current_thread = Rx::CurrentThreadScheduler.instance
thread_pool = Rx::ThreadPoolScheduler.new
# Observing on different schedulers
source = Rx::Observable.interval(0.1)
.take(5)
.observe_on(thread_pool)
.subscribe do |value|
puts "Value: #{value} on thread #{Thread.current.object_id}"
end
# Subscribe on scheduler controls subscription execution
cold_observable = Rx::Observable.create do |observer|
puts "Subscribed on thread #{Thread.current.object_id}"
observer.on_next(1)
observer.on_completed
end
cold_observable
.subscribe_on(thread_pool)
.subscribe { |x| puts "Received: #{x}" }
Ruby's integration with EventMachine and Celluloid provides alternative reactive patterns. EventMachine offers callback-based asynchronous I/O, while Celluloid provides actor-based concurrency with reactive capabilities.
# EventMachine reactive I/O
require 'eventmachine'
EM.run do
http = EM::HttpRequest.new('http://api.example.com/data').get
http.callback do
data = JSON.parse(http.response)
puts "Received: #{data}"
EM.stop
end
http.errback do
puts "Request failed"
EM.stop
end
end
# Celluloid actor with reactive behavior
require 'celluloid'
class DataProcessor
include Celluloid
def initialize
@subscribers = []
end
def subscribe(&block)
@subscribers << block
end
def process(data)
result = transform(data)
@subscribers.each { |sub| sub.call(result) }
end
private
def transform(data)
data.upcase
end
end
Memory management in Ruby's reactive implementations requires attention to subscription disposal. Long-lived observables can accumulate subscriptions and closures, preventing garbage collection. Explicit disposal prevents leaks.
# Proper subscription management
class DataFeed
def initialize
@subscriptions = []
end
def start
subscription = Rx::Observable.interval(1.0)
.subscribe { |tick| process_tick(tick) }
@subscriptions << subscription
end
def stop
@subscriptions.each(&:dispose)
@subscriptions.clear
end
private
def process_tick(tick)
puts "Processing tick #{tick}"
end
end
# Automatic disposal with blocks
Rx::Observable.interval(0.1)
.take(5)
.subscribe_with_disposable do |subscription|
subscription.on_next { |x| puts x }
# Return cleanup logic
-> { puts "Subscription disposed" }
end
Practical Examples
Reactive programming solves real-world problems involving asynchronous data coordination, event stream processing, and responsive system design. The following examples demonstrate practical applications with complete implementations.
Real-time search with debouncing prevents excessive API calls by delaying search execution until user input stabilizes. This pattern applies to autocomplete, live validation, and any scenario where rapid input changes should not trigger immediate processing.
require 'rx'
class SearchBox
def initialize(search_api)
@search_api = search_api
@input_subject = Rx::Subject.new
setup_reactive_search
end
def on_input_change(query)
@input_subject.on_next(query)
end
private
def setup_reactive_search
@input_subject
.debounce(0.3) # Wait 300ms after last input
.filter { |query| query.length >= 3 } # Minimum query length
.distinct_until_changed # Skip duplicate consecutive queries
.flat_map_latest do |query|
# Cancel previous search when new query arrives
Rx::Observable.start do
@search_api.search(query)
end.catch { |error|
Rx::Observable.just({ error: error.message, results: [] })
}
end
.observe_on(Rx::ImmediateScheduler.instance)
.subscribe do |results|
display_results(results)
end
end
def display_results(results)
if results[:error]
puts "Error: #{results[:error]}"
else
puts "Found #{results.length} results"
results.each { |r| puts " - #{r[:title]}" }
end
end
end
# Usage
api = SearchAPI.new
search_box = SearchBox.new(api)
# Simulating rapid user input
search_box.on_input_change("r")
sleep(0.1)
search_box.on_input_change("re")
sleep(0.1)
search_box.on_input_change("rea")
sleep(0.1)
search_box.on_input_change("reac")
sleep(0.1)
search_box.on_input_change("react")
# Only "react" triggers search after 300ms delay
Event aggregation from multiple sources combines data streams from different origins, coordinating timing and handling partial failures. This pattern applies to dashboard updates, monitoring systems, and any scenario requiring synchronized data from multiple services.
class DashboardAggregator
def initialize(user_service, metrics_service, notification_service)
@user_service = user_service
@metrics_service = metrics_service
@notification_service = notification_service
end
def fetch_dashboard_data(user_id)
# Create independent observables for each service
user_obs = Rx::Observable.start do
@user_service.fetch_user(user_id)
end.catch { |error|
Rx::Observable.just({ error: error.message })
}
metrics_obs = Rx::Observable.start do
@metrics_service.fetch_metrics(user_id)
end.catch { |error|
Rx::Observable.just([]) # Return empty on error
}
notifications_obs = Rx::Observable.start do
@notification_service.fetch_notifications(user_id)
end.timeout(2.0) {
Rx::Observable.just([]) # Fallback if service slow
}
# Combine all streams, waiting for all to complete
Rx::Observable.zip(user_obs, metrics_obs, notifications_obs)
.map do |user, metrics, notifications|
{
user: user,
metrics: metrics,
notifications: notifications,
loaded_at: Time.now
}
end
end
end
# Usage with subscription
aggregator = DashboardAggregator.new(user_svc, metrics_svc, notif_svc)
aggregator.fetch_dashboard_data(123).subscribe(
lambda { |data| render_dashboard(data) },
lambda { |error| show_error(error) },
lambda { puts "Dashboard loaded" }
)
Rate-limited API consumption processes items from a queue while respecting API rate limits, handling backpressure when producers outpace consumers. This pattern applies to batch processing, webhook handling, and any scenario with throughput constraints.
class RateLimitedProcessor
def initialize(api_client, requests_per_second: 10)
@api_client = api_client
@rate_limit_interval = 1.0 / requests_per_second
@queue = Queue.new
@subject = Rx::Subject.new
setup_processing_pipeline
end
def enqueue(item)
@queue << item
@subject.on_next(item)
end
def stop
@subject.on_completed
end
private
def setup_processing_pipeline
@subject
.buffer_with_time_or_count(@rate_limit_interval, 1)
.flat_map do |items|
# Process one item at rate-limited pace
item = items.first
Rx::Observable.start do
process_item(item)
end
end
.retry(3) # Retry failed requests up to 3 times
.subscribe(
lambda { |result| handle_success(result) },
lambda { |error| handle_error(error) },
lambda { puts "Processing completed" }
)
end
def process_item(item)
result = @api_client.post("/items", item)
{ item: item, result: result, status: :success }
rescue => error
{ item: item, error: error, status: :failed }
end
def handle_success(result)
if result[:status] == :success
puts "Processed item #{result[:item][:id]}"
else
puts "Failed to process item #{result[:item][:id]}: #{result[:error]}"
end
end
def handle_error(error)
puts "Pipeline error: #{error.message}"
end
end
# Usage
processor = RateLimitedProcessor.new(api_client, requests_per_second: 5)
100.times do |i|
processor.enqueue({ id: i, data: "Item #{i}" })
end
sleep(25) # Allow processing to complete
processor.stop
WebSocket message stream processing handles real-time bidirectional communication with message filtering, transformation, and error recovery. This pattern applies to chat applications, live updates, and collaborative editing.
require 'faye/websocket'
require 'rx'
class WebSocketStream
def initialize(url)
@url = url
@message_subject = Rx::Subject.new
@connection_subject = Rx::BehaviorSubject.new(:disconnected)
end
def connect
EM.run do
@ws = Faye::WebSocket::Client.new(@url)
@ws.on :open do |event|
@connection_subject.on_next(:connected)
puts "WebSocket connected"
end
@ws.on :message do |event|
@message_subject.on_next(JSON.parse(event.data))
end
@ws.on :close do |event|
@connection_subject.on_next(:disconnected)
@message_subject.on_completed
puts "WebSocket closed"
end
@ws.on :error do |event|
@message_subject.on_error(StandardError.new(event.message))
end
end
end
def messages
@message_subject
end
def connection_status
@connection_subject
end
def send_message(data)
@ws.send(JSON.generate(data)) if @ws
end
def disconnect
@ws.close if @ws
end
end
# Processing messages with reactive operators
ws = WebSocketStream.new('ws://example.com/stream')
ws.connect
# Filter and transform specific message types
chat_messages = ws.messages
.filter { |msg| msg['type'] == 'chat' }
.map { |msg| { user: msg['user'], text: msg['text'], time: Time.now } }
.subscribe { |msg| display_chat_message(msg) }
# Aggregate metrics from stream
metrics = ws.messages
.filter { |msg| msg['type'] == 'metric' }
.window_with_time(5.0) # 5-second windows
.flat_map do |window|
window.reduce([]) { |acc, msg| acc << msg }
end
.subscribe { |msgs| update_metrics_dashboard(msgs) }
# Monitor connection status
ws.connection_status
.distinct_until_changed
.subscribe do |status|
if status == :disconnected
puts "Connection lost, attempting reconnection..."
sleep(5)
ws.connect
end
end
Common Patterns
Reactive programming employs established patterns for composing asynchronous operations, managing data flow, and handling errors. These patterns address recurring challenges in stream-based architectures.
Observable creation patterns determine how streams originate and emit values. Cold observables create new data sources per subscription, providing isolation but duplicating work. Hot observables share a single data source across subscriptions, reducing redundancy but coupling subscribers.
# Cold observable - independent execution per subscriber
def create_cold_http_observable(url)
Rx::Observable.create do |observer|
begin
response = HTTP.get(url)
observer.on_next(response.body)
observer.on_completed
rescue => error
observer.on_error(error)
end
end
end
cold = create_cold_http_observable('http://api.example.com/data')
subscription1 = cold.subscribe { |data| process_data(data) }
subscription2 = cold.subscribe { |data| cache_data(data) }
# Makes two HTTP requests
# Hot observable - shared execution
def create_hot_sensor_observable(sensor)
subject = Rx::Subject.new
Thread.new do
loop do
reading = sensor.read
subject.on_next(reading)
sleep(1)
end
end
subject
end
hot = create_hot_sensor_observable(temperature_sensor)
subscription1 = hot.subscribe { |temp| log_temperature(temp) }
sleep(5)
subscription2 = hot.subscribe { |temp| alert_if_high(temp) }
# Both share same sensor readings, second subscriber misses first 5 seconds
Transformation pipelines chain operators to process data through multiple stages. Each operator receives an observable and returns a new observable, maintaining immutability and enabling functional composition.
# Multi-stage data transformation pipeline
def process_sensor_data(sensor_stream)
sensor_stream
.map { |reading| { value: reading[:value], timestamp: Time.now } }
.filter { |data| data[:value] > 0 } # Remove invalid readings
.buffer_with_time(5.0) # Collect 5 seconds of readings
.map { |buffer| calculate_average(buffer) }
.scan([]) { |history, avg| history << avg; history.last(10) } # Keep last 10 averages
.map { |history| detect_trend(history) }
.distinct_until_changed # Only emit when trend changes
end
# Flattening strategies for nested observables
def fetch_user_details(user_ids)
Rx::Observable.from_array(user_ids)
.flat_map do |id|
# flat_map flattens and merges concurrent requests
fetch_user_observable(id)
end
# Results arrive in completion order, not input order
end
def fetch_user_details_ordered(user_ids)
Rx::Observable.from_array(user_ids)
.concat_map do |id|
# concat_map maintains order but processes sequentially
fetch_user_observable(id)
end
# Results arrive in input order
end
def fetch_latest_user_details(user_id_stream)
user_id_stream
.flat_map_latest do |id|
# flat_map_latest cancels previous request when new id arrives
fetch_user_observable(id)
end
# Only most recent request completes
end
Error handling strategies determine how streams respond to failures. Different strategies suit different reliability requirements and business logic.
# Retry with exponential backoff
def fetch_with_retry(url, max_retries: 3)
Rx::Observable.start { HTTP.get(url) }
.retry_when do |errors|
errors
.zip(Rx::Observable.range(1, max_retries)) { |error, attempt| attempt }
.flat_map do |attempt|
delay = 2 ** attempt # Exponential backoff
puts "Retry attempt #{attempt} after #{delay} seconds"
Rx::Observable.timer(delay)
end
end
end
# Fallback to alternative source on error
def fetch_with_fallback(primary_url, fallback_url)
Rx::Observable.start { HTTP.get(primary_url) }
.on_error_resume_next do |error|
puts "Primary source failed: #{error.message}"
Rx::Observable.start { HTTP.get(fallback_url) }
end
end
# Continue stream despite individual item failures
def process_items_resilient(items)
Rx::Observable.from_array(items)
.flat_map do |item|
Rx::Observable.start { process_item(item) }
.catch do |error|
# Log error but don't terminate stream
puts "Failed to process #{item}: #{error.message}"
Rx::Observable.empty
end
end
end
# Timeout with fallback value
def fetch_with_timeout(url, timeout_seconds: 5, default_value: nil)
Rx::Observable.start { HTTP.get(url) }
.timeout(timeout_seconds) do
Rx::Observable.just(default_value)
end
end
Backpressure management controls flow when producers emit faster than consumers process. Without backpressure handling, systems accumulate unbounded queues leading to memory exhaustion.
# Buffering strategy - accumulate values
def buffer_strategy(fast_producer)
fast_producer
.buffer_with_time_or_count(1.0, 100) # Buffer up to 1 second or 100 items
.subscribe do |buffer|
process_batch(buffer)
end
end
# Sampling strategy - take periodic samples
def sampling_strategy(fast_producer)
fast_producer
.sample(0.5) # Sample every 500ms
.subscribe do |value|
process_sample(value)
end
end
# Throttling strategy - rate limit emissions
def throttle_strategy(fast_producer)
fast_producer
.throttle_first(1.0) # Take first item per second, drop rest
.subscribe do |value|
process_value(value)
end
end
# Dropping strategy - discard excess values
def drop_strategy(fast_producer)
fast_producer
.take_last(10) # Keep only last 10 items
.subscribe do |value|
process_value(value)
end
end
Resource cleanup patterns ensure proper disposal of subscriptions, connections, and system resources. Missing cleanup causes resource leaks in long-running applications.
# Composite disposable for multiple subscriptions
class DataMonitor
def initialize
@disposables = Rx::CompositeDisposable.new
end
def start
subscription1 = temperature_stream.subscribe { |t| log_temperature(t) }
subscription2 = pressure_stream.subscribe { |p| log_pressure(p) }
subscription3 = humidity_stream.subscribe { |h| log_humidity(h) }
@disposables.add(subscription1)
@disposables.add(subscription2)
@disposables.add(subscription3)
end
def stop
@disposables.dispose # Disposes all subscriptions
end
end
# Automatic disposal with take_until
def monitor_until_signal(data_stream, stop_signal)
data_stream
.take_until(stop_signal) # Automatically disposes when stop_signal emits
.subscribe { |data| process_data(data) }
end
# Resource management with ensure block
def with_managed_stream(url)
connection = nil
Rx::Observable.create do |observer|
begin
connection = WebSocket.connect(url)
connection.on_message do |msg|
observer.on_next(msg)
end
Rx::Subscription.create do
connection.close if connection
end
rescue => error
observer.on_error(error)
end
end
end
Design Considerations
Reactive programming introduces architectural decisions regarding when to adopt the paradigm, how to structure reactive components, and what trade-offs to accept. The paradigm suits specific problem domains while adding complexity inappropriate for simpler scenarios.
Problem domain suitability determines whether reactive programming provides benefits over alternatives. The paradigm excels with asynchronous event streams, multiple concurrent operations requiring coordination, and systems requiring backpressure management. Applications processing real-time data feeds, coordinating microservices, or handling complex user interactions benefit from reactive abstractions.
Reactive programming adds overhead for simple request-response patterns, straightforward CRUD operations, or synchronous workflows. Imperative code remains clearer for sequential operations without asynchronous concerns. Introducing reactive abstractions for problems solvable with simpler patterns increases cognitive load without corresponding benefits.
# Unnecessary reactive complexity for simple case
def fetch_user_simple(id)
# Simple synchronous approach is clearer
user = User.find(id)
{ user: user, status: :found }
rescue ActiveRecord::RecordNotFound
{ user: nil, status: :not_found }
end
# Appropriate reactive usage for complex coordination
def fetch_dashboard_reactive(user_id)
# Multiple async sources benefit from reactive coordination
user_obs = Rx::Observable.start { User.find(user_id) }
posts_obs = Rx::Observable.start { Post.where(user_id: user_id).limit(10) }
friends_obs = Rx::Observable.start { Friendship.where(user_id: user_id) }
Rx::Observable.zip(user_obs, posts_obs, friends_obs)
.timeout(5.0)
.map { |user, posts, friends| build_dashboard(user, posts, friends) }
end
Abstraction trade-offs balance expressiveness against comprehension difficulty. Reactive operators provide concise composition but create indirection between cause and effect. Developers must understand operator semantics, stream lifecycle, and subscription timing. Teams without reactive programming experience face steeper learning curves compared to callback or promise-based approaches.
Debugging reactive pipelines requires different techniques than imperative code. Stack traces point to operator composition rather than business logic. Observable chains execute asynchronously, complicating mental models of execution flow. Logging and instrumentation points must be inserted into streams to observe values and timing.
# Reactive pipeline debugging
def debuggable_pipeline(source)
source
.do_on_next { |x| puts "After source: #{x}" }
.map { |x| transform(x) }
.do_on_next { |x| puts "After transform: #{x}" }
.filter { |x| valid?(x) }
.do_on_next { |x| puts "After filter: #{x}" }
.subscribe(
lambda { |x| puts "Final: #{x}" },
lambda { |e| puts "Error: #{e.message}\n#{e.backtrace.join("\n")}" }
)
end
Performance characteristics differ between reactive implementations and direct imperative code. Reactive abstractions add overhead through operator chains, subscription management, and scheduler coordination. For CPU-bound operations on small datasets, imperative loops often outperform reactive pipelines. Reactive benefits appear with I/O-bound operations, concurrent processing, or complex coordination logic.
Memory usage patterns in reactive systems depend on operator selection. Buffering operators accumulate values in memory, risking exhaustion without bounds. Stateful operators maintain internal state across emissions. Cold observables duplicate resources per subscription. Understanding memory implications of operator chains prevents resource leaks.
# Memory-efficient reactive patterns
def memory_conscious_processing(large_dataset)
# Avoid buffering entire dataset
Rx::Observable.from_enumerable(large_dataset)
.map { |item| process_item(item) }
.filter { |result| result[:valid] }
.take(100) # Limit result size
.subscribe { |result| handle_result(result) }
# vs memory-intensive approach
Rx::Observable.from_enumerable(large_dataset)
.to_a # Loads entire dataset into memory
.wait
.map { |item| process_item(item) } # Array operations, not reactive
end
Testing reactive code requires different strategies than synchronous code. Tests must handle asynchronous execution, timing dependencies, and subscription management. Test schedulers provide deterministic timing, replacing real-time delays with virtual time.
require 'rspec'
RSpec.describe DataProcessor do
let(:scheduler) { Rx::TestScheduler.new }
it 'debounces rapid inputs' do
inputs = scheduler.create_hot_observable(
on_next(100, 'a'),
on_next(150, 'ab'),
on_next(200, 'abc'), # This one should emit after debounce
on_next(500, 'abcd') # This one too
)
results = []
processor = DataProcessor.new(inputs, debounce: 100)
scheduler.schedule_absolute(0) do
processor.output.subscribe { |x| results << x }
end
scheduler.start
expect(results).to eq(['abc', 'abcd'])
end
end
Incremental adoption strategies allow introducing reactive programming without rewriting entire codebases. Reactive components can wrap existing APIs, exposing observable interfaces while maintaining backward compatibility. Teams adopt reactive patterns in high-value scenarios, evaluating benefits before broader application.
Observable interfaces can coexist with callback and promise-based code through adapter patterns. Conversion operators bridge between reactive and non-reactive code, enabling gradual migration.
# Wrapping callback API with observable
def wrap_callback_api(api)
Rx::Observable.create do |observer|
api.fetch do |result, error|
if error
observer.on_error(error)
else
observer.on_next(result)
observer.on_completed
end
end
end
end
# Converting promise to observable
def promise_to_observable(promise)
Rx::Observable.create do |observer|
promise.then do |value|
observer.on_next(value)
observer.on_completed
end.catch do |error|
observer.on_error(error)
end
end
end
# Converting observable to callback
def observable_to_callback(observable, &block)
observable.subscribe(
lambda { |value| block.call(value, nil) },
lambda { |error| block.call(nil, error) }
)
end
Tools & Ecosystem
Ruby's reactive programming ecosystem includes libraries implementing reactive extensions, supporting tools for testing and debugging, and integration with existing Ruby frameworks and tools.
RxRuby serves as the primary reactive extensions library for Ruby, implementing the ReactiveX specification. The library provides observables, operators, schedulers, and subjects with semantics matching other ReactiveX implementations. RxRuby supports MRI Ruby and JRuby, with threading models adapted to each platform.
Installation through RubyGems:
# Gemfile
gem 'rx', '~> 0.0.3'
# Installation
bundle install
RxRuby operators cover creation, transformation, filtering, combination, error handling, utility functions, and conditional operations. The library follows ReactiveX operator naming conventions with Ruby idioms where appropriate.
# Core RxRuby operators by category
# Creation operators
Rx::Observable.just(value)
Rx::Observable.from_array([1, 2, 3])
Rx::Observable.range(1, 10)
Rx::Observable.interval(1.0)
Rx::Observable.timer(5.0)
Rx::Observable.create { |observer| }
# Transformation operators
observable.map { |x| x * 2 }
observable.flat_map { |x| Rx::Observable.just(x) }
observable.scan(0) { |acc, x| acc + x }
observable.buffer_with_count(5)
observable.window_with_time(1.0)
# Filtering operators
observable.filter { |x| x > 0 }
observable.take(10)
observable.skip(5)
observable.distinct
observable.distinct_until_changed
observable.debounce(0.3)
observable.throttle_first(1.0)
# Combination operators
Rx::Observable.merge(obs1, obs2)
Rx::Observable.concat(obs1, obs2)
Rx::Observable.zip(obs1, obs2) { |a, b| [a, b] }
observable.combine_latest(other) { |a, b| [a, b] }
# Error handling operators
observable.catch { |error| Rx::Observable.just(default) }
observable.retry(3)
observable.on_error_resume_next { other_observable }
observable.timeout(5.0) { Rx::Observable.empty }
Celluloid provides actor-based concurrency with reactive capabilities. Actors process messages asynchronously, maintaining internal state while coordinating with other actors. Celluloid integrates with RxRuby for reactive message processing.
require 'celluloid/current'
class SensorActor
include Celluloid
def initialize
@subscribers = []
@reading = 0
end
def subscribe(&block)
@subscribers << block
end
def update_reading(value)
@reading = value
@subscribers.each { |subscriber| subscriber.call(value) }
end
def current_reading
@reading
end
end
# Usage
sensor = SensorActor.new
sensor.subscribe { |reading| puts "Reading: #{reading}" }
sensor.update_reading(42)
Concurrent-Ruby provides abstractions for concurrent programming including promises, futures, and thread pools. The library complements reactive programming by supplying lower-level concurrency primitives.
require 'concurrent'
# Promises as alternative to observables
promise = Concurrent::Promise.execute do
perform_computation()
end
promise.then { |result| process_result(result) }
.rescue { |error| handle_error(error) }
# Converting between promises and observables
def promise_to_observable(promise)
Rx::Observable.create do |observer|
promise.on_success { |value| observer.on_next(value); observer.on_completed }
promise.on_failure { |error| observer.on_error(error) }
end
end
EventMachine handles asynchronous I/O through a reactor pattern. While not strictly reactive programming, EventMachine provides callback-based asynchronous operations that can be wrapped in observables.
require 'eventmachine'
require 'em-http-request'
def http_observable(url)
Rx::Observable.create do |observer|
EM.run do
http = EM::HttpRequest.new(url).get
http.callback do
observer.on_next(http.response)
observer.on_completed
EM.stop
end
http.errback do
observer.on_error(StandardError.new("Request failed"))
EM.stop
end
end
end
end
Testing tools for reactive code include RxRuby's TestScheduler for deterministic timing, RSpec matchers for observable assertions, and debugging utilities.
# TestScheduler for deterministic tests
require 'rx'
scheduler = Rx::TestScheduler.new
# Create observable with scheduled emissions
source = scheduler.create_hot_observable(
Rx::Recorded.on_next(100, 'A'),
Rx::Recorded.on_next(200, 'B'),
Rx::Recorded.on_next(300, 'C'),
Rx::Recorded.on_completed(400)
)
# Record subscriptions
results = scheduler.create_observer
subscription = source.subscribe(results)
# Advance virtual time
scheduler.start
# Assert results
expect(results.messages.length).to eq(4)
expect(results.messages[0].value.value).to eq('A')
Debugging reactive streams requires specialized approaches since traditional debuggers struggle with asynchronous execution and operator composition. Logging operators, side-effect operators, and visualization tools aid understanding.
# Debugging operators
observable
.do_on_next { |x| puts "Value: #{x}" }
.do_on_error { |e| puts "Error: #{e}" }
.do_on_completed { puts "Completed" }
.do_on_subscribe { puts "Subscribed" }
.do_on_terminate { puts "Terminated" }
# Timing instrumentation
observable
.timestamp # Adds timestamp to each value
.time_interval # Measures time between emissions
.subscribe { |timed_value| puts "Value: #{timed_value.value} at #{timed_value.timestamp}" }
Integration with Rails applies reactive patterns to web applications, background jobs, and real-time features. ActionCable provides WebSocket support that combines with reactive streams for real-time updates.
# Reactive ActionCable channel
class SensorChannel < ApplicationCable::Channel
def subscribed
sensor_stream = SensorService.reactive_stream
@subscription = sensor_stream
.sample(1.0) # Send updates every second
.subscribe do |reading|
transmit(reading: reading)
end
end
def unsubscribed
@subscription&.dispose
end
end
Reference
Observable Creation Methods
| Method | Description | Use Case |
|---|---|---|
| create | Build custom observable with observer callbacks | Custom data sources requiring manual emission control |
| just | Emit single value then complete | Wrapping single values in observable context |
| from_array | Emit array elements sequentially | Converting collections to streams |
| range | Emit sequence of integers | Generating numeric sequences for testing or iteration |
| interval | Emit incrementing numbers at time intervals | Periodic polling or heartbeat signals |
| timer | Emit single value after delay | Delayed execution or timeout implementation |
| empty | Complete immediately without emitting | Placeholder streams or conditional logic |
| never | Never emit or complete | Testing subscription lifecycle |
| throw | Immediately emit error | Error injection for testing |
| defer | Defer observable creation until subscription | Late binding or per-subscription customization |
| start | Execute block on subscription and emit result | Wrapping synchronous operations asynchronously |
Core Transformation Operators
| Operator | Behavior | Common Usage |
|---|---|---|
| map | Transform each value | Data transformation, type conversion |
| flat_map | Transform to observable and flatten | Async operations per value, request chaining |
| flat_map_latest | Transform to observable, cancel previous | Latest-value semantics, search-as-you-type |
| concat_map | Transform to observable, maintain order | Sequential processing maintaining order |
| scan | Accumulate state across values | Running totals, state machines |
| buffer_with_count | Collect values into arrays of fixed size | Batch processing fixed-size groups |
| buffer_with_time | Collect values over time window | Time-based aggregation |
| window_with_count | Group into observable windows by count | Nested stream processing by count |
| window_with_time | Group into observable windows by time | Nested stream processing by time |
| group_by | Split stream into grouped observables | Partitioning by key for parallel processing |
Filtering Operators
| Operator | Behavior | Common Usage |
|---|---|---|
| filter | Emit only values matching predicate | Conditional value passing |
| take | Emit first n values then complete | Limiting result size |
| take_last | Emit last n values on completion | Tail of stream |
| skip | Skip first n values | Ignoring initial values |
| distinct | Emit only distinct values | Deduplication across entire stream |
| distinct_until_changed | Emit only when value differs from previous | Deduplication of consecutive duplicates |
| debounce | Emit value only after quiet period | Input stabilization, search delay |
| throttle_first | Emit first value per time window | Rate limiting with immediate response |
| sample | Emit most recent value at intervals | Periodic sampling of high-frequency source |
| first | Emit first value then complete | Single-value extraction |
| last | Emit last value on completion | Final value extraction |
| element_at | Emit value at specific index | Index-based access |
Combination Operators
| Operator | Behavior | Common Usage |
|---|---|---|
| merge | Interleave multiple observables | Combining concurrent sources |
| concat | Emit observables sequentially | Sequential processing maintaining source order |
| zip | Combine corresponding values from multiple sources | Coordinating parallel operations |
| combine_latest | Emit combination when any source emits | Real-time derived values |
| with_latest_from | Combine with latest from another source | Enriching stream with context |
| start_with | Prepend initial value | Providing default or initial state |
| switch_latest | Subscribe to latest observable | Canceling outdated requests |
Error Handling Operators
| Operator | Behavior | Common Usage |
|---|---|---|
| catch | Replace error with alternative observable | Error recovery with fallback |
| retry | Resubscribe on error | Transient failure recovery |
| retry_when | Custom retry logic with delays | Exponential backoff, retry limits |
| on_error_resume_next | Continue with next observable on error | Graceful degradation |
| timeout | Error if no emission within duration | Detecting stalled streams |
Scheduler Types
| Scheduler | Execution Context | Use Case |
|---|---|---|
| ImmediateScheduler | Current thread synchronously | Testing, simple synchronous operations |
| CurrentThreadScheduler | Current thread with queue | Trampolining to prevent stack overflow |
| ThreadPoolScheduler | Thread pool | Concurrent processing, I/O operations |
| NewThreadScheduler | New thread per operation | Isolated execution contexts |
| TestScheduler | Virtual time | Deterministic testing with time control |
Subject Types
| Subject | Behavior | Use Case |
|---|---|---|
| Subject | Basic publish-subscribe | Manual event emission |
| ReplaySubject | Caches and replays values | Late subscriber initialization |
| BehaviorSubject | Emits current value immediately | State representation |
| AsyncSubject | Emits last value on completion | Single async result |
Observable Lifecycle Events
| Event | Timing | Handler |
|---|---|---|
| on_next | Value emitted | Process emitted values |
| on_error | Error occurred | Handle error, stream terminates |
| on_completed | Stream finished | Cleanup, stream terminates |
| on_subscribe | Subscription created | Initialize resources |
| on_dispose | Subscription disposed | Release resources |
Common Marble Diagram Symbols
| Symbol | Meaning |
|---|---|
| -- | Time passing |
| 1 or a | Emitted value |
| | | Successful completion |
| X | Error |
| # | Subscription start |
| ^ | Unsubscription |
Error Recovery Strategy Selection
| Strategy | When to Use |
|---|---|
| catch | Fallback value or alternative source available |
| retry | Transient errors likely to resolve |
| retry_when | Custom backoff or limited attempts needed |
| on_error_resume_next | Continue processing despite errors |
| timeout | Detect and handle stalled operations |
Backpressure Strategy Selection
| Strategy | When to Use |
|---|---|
| buffer | Can accumulate values temporarily |
| sample | Periodic snapshots sufficient |
| throttle | Rate limiting with immediate first value |
| debounce | Only final stable value needed |
| take_last | Only most recent values matter |
| drop | Missing values acceptable |
Testing Pattern Examples
# TestScheduler setup
scheduler = Rx::TestScheduler.new
# Create cold observable
cold = scheduler.create_cold_observable(
Rx::Recorded.on_next(100, 'A'),
Rx::Recorded.on_completed(200)
)
# Create hot observable
hot = scheduler.create_hot_observable(
Rx::Recorded.on_next(100, 'A'),
Rx::Recorded.on_next(200, 'B')
)
# Record results
observer = scheduler.create_observer
cold.subscribe(observer)
# Run scheduler
scheduler.start
# Assert results
observer.messages # Array of Rx::Recorded
Observable Memory Management
| Pattern | Implementation |
|---|---|
| Dispose single subscription | subscription.dispose |
| Dispose multiple subscriptions | composite_disposable.add(subscription); composite_disposable.dispose |
| Automatic disposal | observable.take_until(signal) |
| Timeout disposal | observable.timeout(seconds) |
| Resource cleanup | Return Rx::Subscription.create with cleanup block |
Integration Patterns
| Pattern | Ruby Implementation |
|---|---|
| Callback to Observable | Rx::Observable.create wrapping callback API |
| Promise to Observable | Subscribe to promise then/catch and forward to observer |
| Observable to Callback | Subscribe and invoke callback with results/errors |
| EventMachine integration | Wrap EM deferrable with observable creation |
| ActionCable integration | Subscribe to observable in ActionCable channel |