CrackedRuby CrackedRuby

Overview

Reactive programming represents a declarative programming paradigm focused on data streams and the propagation of change. The paradigm treats events, user inputs, API responses, and state changes as streams of values that flow through time, enabling systems to react automatically to changes in data sources.

The reactive model originated from functional reactive programming (FRP) research in the 1990s, with practical implementations emerging in the 2010s through libraries like RxJava and Rx.NET. The paradigm addresses challenges in asynchronous programming by providing compositional operators for transforming, filtering, and combining event streams.

At its core, reactive programming inverts traditional control flow. Instead of imperatively requesting data when needed, components subscribe to data streams and react when new values arrive. This inversion creates a push-based model where data producers drive execution flow rather than consumers polling for updates.

# Traditional imperative approach
def fetch_user_data(user_id)
  user = database.find_user(user_id)
  profile = api.fetch_profile(user.profile_id)
  posts = api.fetch_posts(user.id)
  { user: user, profile: profile, posts: posts }
end

# Reactive approach with observable streams
user_stream = Observable.from_database(:users)
profile_stream = user_stream.flat_map { |user| 
  Observable.from_api("/profiles/#{user.profile_id}") 
}
posts_stream = user_stream.flat_map { |user|
  Observable.from_api("/posts?user_id=#{user.id}")
}

combined = Observable.zip(user_stream, profile_stream, posts_stream)
combined.subscribe do |user, profile, posts|
  render_dashboard(user, profile, posts)
end

Reactive programming applies across domains including user interface development, real-time data processing, microservice orchestration, and event-driven systems. The paradigm excels in scenarios requiring coordination of multiple asynchronous operations, handling of continuous event streams, or responsive systems that adapt to changing data.

The reactive approach introduces complexity through abstraction but provides benefits in managing asynchronous operations, composing complex data flows, and maintaining responsive systems under varying load conditions.

Key Principles

Reactive programming builds on four fundamental concepts that define how data flows through systems and how components interact with changing data.

Observables represent data sources that emit values over time. An observable can emit zero or more values followed optionally by either a completion signal or an error. Observables remain dormant until an observer subscribes, at which point they begin producing values. This lazy evaluation model prevents unnecessary computation for unused streams.

# Observable that emits values over time
observable = Observable.create do |observer|
  observer.on_next(1)
  observer.on_next(2)
  observer.on_next(3)
  observer.on_completed
end

# Cold observable - computation happens per subscription
def create_cold_observable
  Observable.create do |observer|
    data = expensive_computation()
    observer.on_next(data)
    observer.on_completed
  end
end

# Hot observable - computation shared across subscriptions
def create_hot_observable
  subject = Subject.new
  Thread.new do
    loop do
      subject.on_next(sensor_reading())
      sleep(1)
    end
  end
  subject
end

Observers consume values emitted by observables through subscription. An observer implements three callbacks: on_next for handling emitted values, on_error for handling errors, and on_completed for handling stream completion. Observers can unsubscribe at any time to stop receiving values and free resources.

Operators transform, filter, combine, and manipulate observable streams. Operators take observables as input and return new observables, enabling functional composition of complex data transformations. Operators maintain the observable contract, propagating values, errors, and completion signals downstream.

# Transformation operators
numbers = Observable.from([1, 2, 3, 4, 5])

mapped = numbers.map { |n| n * 2 }
# Emits: 2, 4, 6, 8, 10

filtered = numbers.filter { |n| n.even? }
# Emits: 2, 4

scanned = numbers.scan(0) { |acc, n| acc + n }
# Emits: 1, 3, 6, 10, 15 (running sum)

Schedulers control execution context and threading for observable operations. Schedulers determine which thread emits values, processes transformations, and delivers results to observers. This separation of logic from execution context allows the same reactive pipeline to run synchronously in tests and asynchronously in production.

The marble diagram notation visualizes reactive streams over time, with each character representing a moment and symbols representing events:

Input:   --1--2--3--4--5--|
map(x2): --2--4--6--8--10-|
filter:  -----4-----8--10-|

-- : time passing
1  : emitted value
|  : completion
X  : error

Reactive streams follow specific lifecycle rules. Observables emit values through on_next, signal completion through on_completed or on_error, and emit no values after terminal events. Subscriptions return disposables that allow cancellation, preventing resource leaks in long-running streams.

Error handling in reactive streams differs from synchronous code. Errors propagate downstream automatically, terminating streams unless explicitly handled. Error operators like catch, retry, and on_error_resume_next provide recovery mechanisms while maintaining stream semantics.

# Error propagation and handling
observable = Observable.create do |observer|
  observer.on_next(1)
  observer.on_next(2)
  observer.on_error(StandardError.new("Failure"))
  observer.on_next(3)  # Never emitted
end

recovered = observable
  .catch { |error| Observable.just(-1) }
  .subscribe do |value|
    puts value
  end
# Prints: 1, 2, -1

Backpressure management handles situations where producers emit values faster than consumers can process them. Strategies include buffering values, dropping excess values, or signaling producers to slow down. Without backpressure handling, systems risk memory exhaustion or unbounded latency.

Ruby Implementation

Ruby implements reactive programming through several libraries and patterns, with RxRuby providing the most complete implementation of reactive extensions. Ruby's dynamic nature and block syntax align well with reactive operators, though performance characteristics differ from statically-typed implementations.

RxRuby implements the ReactiveX specification for Ruby, providing observables, operators, and schedulers. The library integrates with Ruby's threading model and supports both MRI and JRuby.

require 'rx'

# Creating observables
range_obs = Rx::Observable.range(1, 5)
array_obs = Rx::Observable.from_array([1, 2, 3])
interval_obs = Rx::Observable.interval(1.0)

# Creating from blocks
custom_obs = Rx::Observable.create do |observer|
  begin
    # Emit values
    observer.on_next("value 1")
    observer.on_next("value 2")
    observer.on_completed
  rescue => e
    observer.on_error(e)
  end
  
  # Return disposable for cleanup
  Rx::Subscription.create { puts "Cleanup" }
end

# Subscribing with observer
observer = Rx::Observer.configure do |o|
  o.on_next { |value| puts "Next: #{value}" }
  o.on_error { |error| puts "Error: #{error}" }
  o.on_completed { puts "Completed" }
end

subscription = custom_obs.subscribe(observer)

Ruby's block syntax simplifies operator chains compared to method chaining in other languages. Operators accept blocks for transformation logic, maintaining Ruby idioms while providing reactive semantics.

# Operator composition with Ruby blocks
source = Rx::Observable.from_array([
  { id: 1, name: "Alice", age: 30 },
  { id: 2, name: "Bob", age: 25 },
  { id: 3, name: "Charlie", age: 35 }
])

result = source
  .filter { |person| person[:age] > 26 }
  .map { |person| person[:name].upcase }
  .take(2)
  .to_a
  .wait
# => ["ALICE", "CHARLIE"]

Subjects serve as both observable and observer, enabling manual value emission and multicasting to multiple subscribers. RxRuby provides several subject types with different replay behaviors.

# Subject for manual emission
subject = Rx::Subject.new

subscription1 = subject.subscribe { |x| puts "Observer 1: #{x}" }
subscription2 = subject.subscribe { |x| puts "Observer 2: #{x}" }

subject.on_next(1)
subject.on_next(2)
# Both observers receive values

# ReplaySubject caches values for late subscribers
replay = Rx::ReplaySubject.new(2)  # Buffer size 2
replay.on_next(1)
replay.on_next(2)
replay.on_next(3)

replay.subscribe { |x| puts "Late subscriber: #{x}" }
# Prints: "Late subscriber: 2", "Late subscriber: 3"

# BehaviorSubject maintains current value
behavior = Rx::BehaviorSubject.new(0)
behavior.subscribe { |x| puts "Initial: #{x}" }
# Immediately prints: "Initial: 0"

behavior.on_next(1)
# Prints: "Initial: 1"

Schedulers in RxRuby control threading and execution timing. The library provides immediate, current thread, and thread pool schedulers, with support for custom implementations.

# Scheduler types
immediate = Rx::ImmediateScheduler.instance
current_thread = Rx::CurrentThreadScheduler.instance
thread_pool = Rx::ThreadPoolScheduler.new

# Observing on different schedulers
source = Rx::Observable.interval(0.1)
  .take(5)
  .observe_on(thread_pool)
  .subscribe do |value|
    puts "Value: #{value} on thread #{Thread.current.object_id}"
  end

# Subscribe on scheduler controls subscription execution
cold_observable = Rx::Observable.create do |observer|
  puts "Subscribed on thread #{Thread.current.object_id}"
  observer.on_next(1)
  observer.on_completed
end

cold_observable
  .subscribe_on(thread_pool)
  .subscribe { |x| puts "Received: #{x}" }

Ruby's integration with EventMachine and Celluloid provides alternative reactive patterns. EventMachine offers callback-based asynchronous I/O, while Celluloid provides actor-based concurrency with reactive capabilities.

# EventMachine reactive I/O
require 'eventmachine'

EM.run do
  http = EM::HttpRequest.new('http://api.example.com/data').get
  
  http.callback do
    data = JSON.parse(http.response)
    puts "Received: #{data}"
    EM.stop
  end
  
  http.errback do
    puts "Request failed"
    EM.stop
  end
end

# Celluloid actor with reactive behavior
require 'celluloid'

class DataProcessor
  include Celluloid
  
  def initialize
    @subscribers = []
  end
  
  def subscribe(&block)
    @subscribers << block
  end
  
  def process(data)
    result = transform(data)
    @subscribers.each { |sub| sub.call(result) }
  end
  
  private
  
  def transform(data)
    data.upcase
  end
end

Memory management in Ruby's reactive implementations requires attention to subscription disposal. Long-lived observables can accumulate subscriptions and closures, preventing garbage collection. Explicit disposal prevents leaks.

# Proper subscription management
class DataFeed
  def initialize
    @subscriptions = []
  end
  
  def start
    subscription = Rx::Observable.interval(1.0)
      .subscribe { |tick| process_tick(tick) }
    @subscriptions << subscription
  end
  
  def stop
    @subscriptions.each(&:dispose)
    @subscriptions.clear
  end
  
  private
  
  def process_tick(tick)
    puts "Processing tick #{tick}"
  end
end

# Automatic disposal with blocks
Rx::Observable.interval(0.1)
  .take(5)
  .subscribe_with_disposable do |subscription|
    subscription.on_next { |x| puts x }
    
    # Return cleanup logic
    -> { puts "Subscription disposed" }
  end

Practical Examples

Reactive programming solves real-world problems involving asynchronous data coordination, event stream processing, and responsive system design. The following examples demonstrate practical applications with complete implementations.

Real-time search with debouncing prevents excessive API calls by delaying search execution until user input stabilizes. This pattern applies to autocomplete, live validation, and any scenario where rapid input changes should not trigger immediate processing.

require 'rx'

class SearchBox
  def initialize(search_api)
    @search_api = search_api
    @input_subject = Rx::Subject.new
    
    setup_reactive_search
  end
  
  def on_input_change(query)
    @input_subject.on_next(query)
  end
  
  private
  
  def setup_reactive_search
    @input_subject
      .debounce(0.3)  # Wait 300ms after last input
      .filter { |query| query.length >= 3 }  # Minimum query length
      .distinct_until_changed  # Skip duplicate consecutive queries
      .flat_map_latest do |query|
        # Cancel previous search when new query arrives
        Rx::Observable.start do
          @search_api.search(query)
        end.catch { |error| 
          Rx::Observable.just({ error: error.message, results: [] })
        }
      end
      .observe_on(Rx::ImmediateScheduler.instance)
      .subscribe do |results|
        display_results(results)
      end
  end
  
  def display_results(results)
    if results[:error]
      puts "Error: #{results[:error]}"
    else
      puts "Found #{results.length} results"
      results.each { |r| puts "  - #{r[:title]}" }
    end
  end
end

# Usage
api = SearchAPI.new
search_box = SearchBox.new(api)

# Simulating rapid user input
search_box.on_input_change("r")
sleep(0.1)
search_box.on_input_change("re")
sleep(0.1)
search_box.on_input_change("rea")
sleep(0.1)
search_box.on_input_change("reac")
sleep(0.1)
search_box.on_input_change("react")
# Only "react" triggers search after 300ms delay

Event aggregation from multiple sources combines data streams from different origins, coordinating timing and handling partial failures. This pattern applies to dashboard updates, monitoring systems, and any scenario requiring synchronized data from multiple services.

class DashboardAggregator
  def initialize(user_service, metrics_service, notification_service)
    @user_service = user_service
    @metrics_service = metrics_service
    @notification_service = notification_service
  end
  
  def fetch_dashboard_data(user_id)
    # Create independent observables for each service
    user_obs = Rx::Observable.start do
      @user_service.fetch_user(user_id)
    end.catch { |error| 
      Rx::Observable.just({ error: error.message })
    }
    
    metrics_obs = Rx::Observable.start do
      @metrics_service.fetch_metrics(user_id)
    end.catch { |error|
      Rx::Observable.just([])  # Return empty on error
    }
    
    notifications_obs = Rx::Observable.start do
      @notification_service.fetch_notifications(user_id)
    end.timeout(2.0) { 
      Rx::Observable.just([])  # Fallback if service slow
    }
    
    # Combine all streams, waiting for all to complete
    Rx::Observable.zip(user_obs, metrics_obs, notifications_obs)
      .map do |user, metrics, notifications|
        {
          user: user,
          metrics: metrics,
          notifications: notifications,
          loaded_at: Time.now
        }
      end
  end
end

# Usage with subscription
aggregator = DashboardAggregator.new(user_svc, metrics_svc, notif_svc)

aggregator.fetch_dashboard_data(123).subscribe(
  lambda { |data| render_dashboard(data) },
  lambda { |error| show_error(error) },
  lambda { puts "Dashboard loaded" }
)

Rate-limited API consumption processes items from a queue while respecting API rate limits, handling backpressure when producers outpace consumers. This pattern applies to batch processing, webhook handling, and any scenario with throughput constraints.

class RateLimitedProcessor
  def initialize(api_client, requests_per_second: 10)
    @api_client = api_client
    @rate_limit_interval = 1.0 / requests_per_second
    @queue = Queue.new
    @subject = Rx::Subject.new
    
    setup_processing_pipeline
  end
  
  def enqueue(item)
    @queue << item
    @subject.on_next(item)
  end
  
  def stop
    @subject.on_completed
  end
  
  private
  
  def setup_processing_pipeline
    @subject
      .buffer_with_time_or_count(@rate_limit_interval, 1)
      .flat_map do |items|
        # Process one item at rate-limited pace
        item = items.first
        Rx::Observable.start do
          process_item(item)
        end
      end
      .retry(3)  # Retry failed requests up to 3 times
      .subscribe(
        lambda { |result| handle_success(result) },
        lambda { |error| handle_error(error) },
        lambda { puts "Processing completed" }
      )
  end
  
  def process_item(item)
    result = @api_client.post("/items", item)
    { item: item, result: result, status: :success }
  rescue => error
    { item: item, error: error, status: :failed }
  end
  
  def handle_success(result)
    if result[:status] == :success
      puts "Processed item #{result[:item][:id]}"
    else
      puts "Failed to process item #{result[:item][:id]}: #{result[:error]}"
    end
  end
  
  def handle_error(error)
    puts "Pipeline error: #{error.message}"
  end
end

# Usage
processor = RateLimitedProcessor.new(api_client, requests_per_second: 5)

100.times do |i|
  processor.enqueue({ id: i, data: "Item #{i}" })
end

sleep(25)  # Allow processing to complete
processor.stop

WebSocket message stream processing handles real-time bidirectional communication with message filtering, transformation, and error recovery. This pattern applies to chat applications, live updates, and collaborative editing.

require 'faye/websocket'
require 'rx'

class WebSocketStream
  def initialize(url)
    @url = url
    @message_subject = Rx::Subject.new
    @connection_subject = Rx::BehaviorSubject.new(:disconnected)
  end
  
  def connect
    EM.run do
      @ws = Faye::WebSocket::Client.new(@url)
      
      @ws.on :open do |event|
        @connection_subject.on_next(:connected)
        puts "WebSocket connected"
      end
      
      @ws.on :message do |event|
        @message_subject.on_next(JSON.parse(event.data))
      end
      
      @ws.on :close do |event|
        @connection_subject.on_next(:disconnected)
        @message_subject.on_completed
        puts "WebSocket closed"
      end
      
      @ws.on :error do |event|
        @message_subject.on_error(StandardError.new(event.message))
      end
    end
  end
  
  def messages
    @message_subject
  end
  
  def connection_status
    @connection_subject
  end
  
  def send_message(data)
    @ws.send(JSON.generate(data)) if @ws
  end
  
  def disconnect
    @ws.close if @ws
  end
end

# Processing messages with reactive operators
ws = WebSocketStream.new('ws://example.com/stream')
ws.connect

# Filter and transform specific message types
chat_messages = ws.messages
  .filter { |msg| msg['type'] == 'chat' }
  .map { |msg| { user: msg['user'], text: msg['text'], time: Time.now } }
  .subscribe { |msg| display_chat_message(msg) }

# Aggregate metrics from stream
metrics = ws.messages
  .filter { |msg| msg['type'] == 'metric' }
  .window_with_time(5.0)  # 5-second windows
  .flat_map do |window|
    window.reduce([]) { |acc, msg| acc << msg }
  end
  .subscribe { |msgs| update_metrics_dashboard(msgs) }

# Monitor connection status
ws.connection_status
  .distinct_until_changed
  .subscribe do |status|
    if status == :disconnected
      puts "Connection lost, attempting reconnection..."
      sleep(5)
      ws.connect
    end
  end

Common Patterns

Reactive programming employs established patterns for composing asynchronous operations, managing data flow, and handling errors. These patterns address recurring challenges in stream-based architectures.

Observable creation patterns determine how streams originate and emit values. Cold observables create new data sources per subscription, providing isolation but duplicating work. Hot observables share a single data source across subscriptions, reducing redundancy but coupling subscribers.

# Cold observable - independent execution per subscriber
def create_cold_http_observable(url)
  Rx::Observable.create do |observer|
    begin
      response = HTTP.get(url)
      observer.on_next(response.body)
      observer.on_completed
    rescue => error
      observer.on_error(error)
    end
  end
end

cold = create_cold_http_observable('http://api.example.com/data')
subscription1 = cold.subscribe { |data| process_data(data) }
subscription2 = cold.subscribe { |data| cache_data(data) }
# Makes two HTTP requests

# Hot observable - shared execution
def create_hot_sensor_observable(sensor)
  subject = Rx::Subject.new
  
  Thread.new do
    loop do
      reading = sensor.read
      subject.on_next(reading)
      sleep(1)
    end
  end
  
  subject
end

hot = create_hot_sensor_observable(temperature_sensor)
subscription1 = hot.subscribe { |temp| log_temperature(temp) }
sleep(5)
subscription2 = hot.subscribe { |temp| alert_if_high(temp) }
# Both share same sensor readings, second subscriber misses first 5 seconds

Transformation pipelines chain operators to process data through multiple stages. Each operator receives an observable and returns a new observable, maintaining immutability and enabling functional composition.

# Multi-stage data transformation pipeline
def process_sensor_data(sensor_stream)
  sensor_stream
    .map { |reading| { value: reading[:value], timestamp: Time.now } }
    .filter { |data| data[:value] > 0 }  # Remove invalid readings
    .buffer_with_time(5.0)  # Collect 5 seconds of readings
    .map { |buffer| calculate_average(buffer) }
    .scan([]) { |history, avg| history << avg; history.last(10) }  # Keep last 10 averages
    .map { |history| detect_trend(history) }
    .distinct_until_changed  # Only emit when trend changes
end

# Flattening strategies for nested observables
def fetch_user_details(user_ids)
  Rx::Observable.from_array(user_ids)
    .flat_map do |id|
      # flat_map flattens and merges concurrent requests
      fetch_user_observable(id)
    end
    # Results arrive in completion order, not input order
end

def fetch_user_details_ordered(user_ids)
  Rx::Observable.from_array(user_ids)
    .concat_map do |id|
      # concat_map maintains order but processes sequentially
      fetch_user_observable(id)
    end
    # Results arrive in input order
end

def fetch_latest_user_details(user_id_stream)
  user_id_stream
    .flat_map_latest do |id|
      # flat_map_latest cancels previous request when new id arrives
      fetch_user_observable(id)
    end
    # Only most recent request completes
end

Error handling strategies determine how streams respond to failures. Different strategies suit different reliability requirements and business logic.

# Retry with exponential backoff
def fetch_with_retry(url, max_retries: 3)
  Rx::Observable.start { HTTP.get(url) }
    .retry_when do |errors|
      errors
        .zip(Rx::Observable.range(1, max_retries)) { |error, attempt| attempt }
        .flat_map do |attempt|
          delay = 2 ** attempt  # Exponential backoff
          puts "Retry attempt #{attempt} after #{delay} seconds"
          Rx::Observable.timer(delay)
        end
    end
end

# Fallback to alternative source on error
def fetch_with_fallback(primary_url, fallback_url)
  Rx::Observable.start { HTTP.get(primary_url) }
    .on_error_resume_next do |error|
      puts "Primary source failed: #{error.message}"
      Rx::Observable.start { HTTP.get(fallback_url) }
    end
end

# Continue stream despite individual item failures
def process_items_resilient(items)
  Rx::Observable.from_array(items)
    .flat_map do |item|
      Rx::Observable.start { process_item(item) }
        .catch do |error|
          # Log error but don't terminate stream
          puts "Failed to process #{item}: #{error.message}"
          Rx::Observable.empty
        end
    end
end

# Timeout with fallback value
def fetch_with_timeout(url, timeout_seconds: 5, default_value: nil)
  Rx::Observable.start { HTTP.get(url) }
    .timeout(timeout_seconds) do
      Rx::Observable.just(default_value)
    end
end

Backpressure management controls flow when producers emit faster than consumers process. Without backpressure handling, systems accumulate unbounded queues leading to memory exhaustion.

# Buffering strategy - accumulate values
def buffer_strategy(fast_producer)
  fast_producer
    .buffer_with_time_or_count(1.0, 100)  # Buffer up to 1 second or 100 items
    .subscribe do |buffer|
      process_batch(buffer)
    end
end

# Sampling strategy - take periodic samples
def sampling_strategy(fast_producer)
  fast_producer
    .sample(0.5)  # Sample every 500ms
    .subscribe do |value|
      process_sample(value)
    end
end

# Throttling strategy - rate limit emissions
def throttle_strategy(fast_producer)
  fast_producer
    .throttle_first(1.0)  # Take first item per second, drop rest
    .subscribe do |value|
      process_value(value)
    end
end

# Dropping strategy - discard excess values
def drop_strategy(fast_producer)
  fast_producer
    .take_last(10)  # Keep only last 10 items
    .subscribe do |value|
      process_value(value)
    end
end

Resource cleanup patterns ensure proper disposal of subscriptions, connections, and system resources. Missing cleanup causes resource leaks in long-running applications.

# Composite disposable for multiple subscriptions
class DataMonitor
  def initialize
    @disposables = Rx::CompositeDisposable.new
  end
  
  def start
    subscription1 = temperature_stream.subscribe { |t| log_temperature(t) }
    subscription2 = pressure_stream.subscribe { |p| log_pressure(p) }
    subscription3 = humidity_stream.subscribe { |h| log_humidity(h) }
    
    @disposables.add(subscription1)
    @disposables.add(subscription2)
    @disposables.add(subscription3)
  end
  
  def stop
    @disposables.dispose  # Disposes all subscriptions
  end
end

# Automatic disposal with take_until
def monitor_until_signal(data_stream, stop_signal)
  data_stream
    .take_until(stop_signal)  # Automatically disposes when stop_signal emits
    .subscribe { |data| process_data(data) }
end

# Resource management with ensure block
def with_managed_stream(url)
  connection = nil
  
  Rx::Observable.create do |observer|
    begin
      connection = WebSocket.connect(url)
      
      connection.on_message do |msg|
        observer.on_next(msg)
      end
      
      Rx::Subscription.create do
        connection.close if connection
      end
    rescue => error
      observer.on_error(error)
    end
  end
end

Design Considerations

Reactive programming introduces architectural decisions regarding when to adopt the paradigm, how to structure reactive components, and what trade-offs to accept. The paradigm suits specific problem domains while adding complexity inappropriate for simpler scenarios.

Problem domain suitability determines whether reactive programming provides benefits over alternatives. The paradigm excels with asynchronous event streams, multiple concurrent operations requiring coordination, and systems requiring backpressure management. Applications processing real-time data feeds, coordinating microservices, or handling complex user interactions benefit from reactive abstractions.

Reactive programming adds overhead for simple request-response patterns, straightforward CRUD operations, or synchronous workflows. Imperative code remains clearer for sequential operations without asynchronous concerns. Introducing reactive abstractions for problems solvable with simpler patterns increases cognitive load without corresponding benefits.

# Unnecessary reactive complexity for simple case
def fetch_user_simple(id)
  # Simple synchronous approach is clearer
  user = User.find(id)
  { user: user, status: :found }
rescue ActiveRecord::RecordNotFound
  { user: nil, status: :not_found }
end

# Appropriate reactive usage for complex coordination
def fetch_dashboard_reactive(user_id)
  # Multiple async sources benefit from reactive coordination
  user_obs = Rx::Observable.start { User.find(user_id) }
  posts_obs = Rx::Observable.start { Post.where(user_id: user_id).limit(10) }
  friends_obs = Rx::Observable.start { Friendship.where(user_id: user_id) }
  
  Rx::Observable.zip(user_obs, posts_obs, friends_obs)
    .timeout(5.0)
    .map { |user, posts, friends| build_dashboard(user, posts, friends) }
end

Abstraction trade-offs balance expressiveness against comprehension difficulty. Reactive operators provide concise composition but create indirection between cause and effect. Developers must understand operator semantics, stream lifecycle, and subscription timing. Teams without reactive programming experience face steeper learning curves compared to callback or promise-based approaches.

Debugging reactive pipelines requires different techniques than imperative code. Stack traces point to operator composition rather than business logic. Observable chains execute asynchronously, complicating mental models of execution flow. Logging and instrumentation points must be inserted into streams to observe values and timing.

# Reactive pipeline debugging
def debuggable_pipeline(source)
  source
    .do_on_next { |x| puts "After source: #{x}" }
    .map { |x| transform(x) }
    .do_on_next { |x| puts "After transform: #{x}" }
    .filter { |x| valid?(x) }
    .do_on_next { |x| puts "After filter: #{x}" }
    .subscribe(
      lambda { |x| puts "Final: #{x}" },
      lambda { |e| puts "Error: #{e.message}\n#{e.backtrace.join("\n")}" }
    )
end

Performance characteristics differ between reactive implementations and direct imperative code. Reactive abstractions add overhead through operator chains, subscription management, and scheduler coordination. For CPU-bound operations on small datasets, imperative loops often outperform reactive pipelines. Reactive benefits appear with I/O-bound operations, concurrent processing, or complex coordination logic.

Memory usage patterns in reactive systems depend on operator selection. Buffering operators accumulate values in memory, risking exhaustion without bounds. Stateful operators maintain internal state across emissions. Cold observables duplicate resources per subscription. Understanding memory implications of operator chains prevents resource leaks.

# Memory-efficient reactive patterns
def memory_conscious_processing(large_dataset)
  # Avoid buffering entire dataset
  Rx::Observable.from_enumerable(large_dataset)
    .map { |item| process_item(item) }
    .filter { |result| result[:valid] }
    .take(100)  # Limit result size
    .subscribe { |result| handle_result(result) }
    
  # vs memory-intensive approach
  Rx::Observable.from_enumerable(large_dataset)
    .to_a  # Loads entire dataset into memory
    .wait
    .map { |item| process_item(item) }  # Array operations, not reactive
end

Testing reactive code requires different strategies than synchronous code. Tests must handle asynchronous execution, timing dependencies, and subscription management. Test schedulers provide deterministic timing, replacing real-time delays with virtual time.

require 'rspec'

RSpec.describe DataProcessor do
  let(:scheduler) { Rx::TestScheduler.new }
  
  it 'debounces rapid inputs' do
    inputs = scheduler.create_hot_observable(
      on_next(100, 'a'),
      on_next(150, 'ab'),
      on_next(200, 'abc'),  # This one should emit after debounce
      on_next(500, 'abcd')  # This one too
    )
    
    results = []
    processor = DataProcessor.new(inputs, debounce: 100)
    
    scheduler.schedule_absolute(0) do
      processor.output.subscribe { |x| results << x }
    end
    
    scheduler.start
    
    expect(results).to eq(['abc', 'abcd'])
  end
end

Incremental adoption strategies allow introducing reactive programming without rewriting entire codebases. Reactive components can wrap existing APIs, exposing observable interfaces while maintaining backward compatibility. Teams adopt reactive patterns in high-value scenarios, evaluating benefits before broader application.

Observable interfaces can coexist with callback and promise-based code through adapter patterns. Conversion operators bridge between reactive and non-reactive code, enabling gradual migration.

# Wrapping callback API with observable
def wrap_callback_api(api)
  Rx::Observable.create do |observer|
    api.fetch do |result, error|
      if error
        observer.on_error(error)
      else
        observer.on_next(result)
        observer.on_completed
      end
    end
  end
end

# Converting promise to observable
def promise_to_observable(promise)
  Rx::Observable.create do |observer|
    promise.then do |value|
      observer.on_next(value)
      observer.on_completed
    end.catch do |error|
      observer.on_error(error)
    end
  end
end

# Converting observable to callback
def observable_to_callback(observable, &block)
  observable.subscribe(
    lambda { |value| block.call(value, nil) },
    lambda { |error| block.call(nil, error) }
  )
end

Tools & Ecosystem

Ruby's reactive programming ecosystem includes libraries implementing reactive extensions, supporting tools for testing and debugging, and integration with existing Ruby frameworks and tools.

RxRuby serves as the primary reactive extensions library for Ruby, implementing the ReactiveX specification. The library provides observables, operators, schedulers, and subjects with semantics matching other ReactiveX implementations. RxRuby supports MRI Ruby and JRuby, with threading models adapted to each platform.

Installation through RubyGems:

# Gemfile
gem 'rx', '~> 0.0.3'

# Installation
bundle install

RxRuby operators cover creation, transformation, filtering, combination, error handling, utility functions, and conditional operations. The library follows ReactiveX operator naming conventions with Ruby idioms where appropriate.

# Core RxRuby operators by category

# Creation operators
Rx::Observable.just(value)
Rx::Observable.from_array([1, 2, 3])
Rx::Observable.range(1, 10)
Rx::Observable.interval(1.0)
Rx::Observable.timer(5.0)
Rx::Observable.create { |observer| }

# Transformation operators
observable.map { |x| x * 2 }
observable.flat_map { |x| Rx::Observable.just(x) }
observable.scan(0) { |acc, x| acc + x }
observable.buffer_with_count(5)
observable.window_with_time(1.0)

# Filtering operators
observable.filter { |x| x > 0 }
observable.take(10)
observable.skip(5)
observable.distinct
observable.distinct_until_changed
observable.debounce(0.3)
observable.throttle_first(1.0)

# Combination operators
Rx::Observable.merge(obs1, obs2)
Rx::Observable.concat(obs1, obs2)
Rx::Observable.zip(obs1, obs2) { |a, b| [a, b] }
observable.combine_latest(other) { |a, b| [a, b] }

# Error handling operators
observable.catch { |error| Rx::Observable.just(default) }
observable.retry(3)
observable.on_error_resume_next { other_observable }
observable.timeout(5.0) { Rx::Observable.empty }

Celluloid provides actor-based concurrency with reactive capabilities. Actors process messages asynchronously, maintaining internal state while coordinating with other actors. Celluloid integrates with RxRuby for reactive message processing.

require 'celluloid/current'

class SensorActor
  include Celluloid
  
  def initialize
    @subscribers = []
    @reading = 0
  end
  
  def subscribe(&block)
    @subscribers << block
  end
  
  def update_reading(value)
    @reading = value
    @subscribers.each { |subscriber| subscriber.call(value) }
  end
  
  def current_reading
    @reading
  end
end

# Usage
sensor = SensorActor.new
sensor.subscribe { |reading| puts "Reading: #{reading}" }
sensor.update_reading(42)

Concurrent-Ruby provides abstractions for concurrent programming including promises, futures, and thread pools. The library complements reactive programming by supplying lower-level concurrency primitives.

require 'concurrent'

# Promises as alternative to observables
promise = Concurrent::Promise.execute do
  perform_computation()
end

promise.then { |result| process_result(result) }
  .rescue { |error| handle_error(error) }

# Converting between promises and observables
def promise_to_observable(promise)
  Rx::Observable.create do |observer|
    promise.on_success { |value| observer.on_next(value); observer.on_completed }
    promise.on_failure { |error| observer.on_error(error) }
  end
end

EventMachine handles asynchronous I/O through a reactor pattern. While not strictly reactive programming, EventMachine provides callback-based asynchronous operations that can be wrapped in observables.

require 'eventmachine'
require 'em-http-request'

def http_observable(url)
  Rx::Observable.create do |observer|
    EM.run do
      http = EM::HttpRequest.new(url).get
      
      http.callback do
        observer.on_next(http.response)
        observer.on_completed
        EM.stop
      end
      
      http.errback do
        observer.on_error(StandardError.new("Request failed"))
        EM.stop
      end
    end
  end
end

Testing tools for reactive code include RxRuby's TestScheduler for deterministic timing, RSpec matchers for observable assertions, and debugging utilities.

# TestScheduler for deterministic tests
require 'rx'

scheduler = Rx::TestScheduler.new

# Create observable with scheduled emissions
source = scheduler.create_hot_observable(
  Rx::Recorded.on_next(100, 'A'),
  Rx::Recorded.on_next(200, 'B'),
  Rx::Recorded.on_next(300, 'C'),
  Rx::Recorded.on_completed(400)
)

# Record subscriptions
results = scheduler.create_observer
subscription = source.subscribe(results)

# Advance virtual time
scheduler.start

# Assert results
expect(results.messages.length).to eq(4)
expect(results.messages[0].value.value).to eq('A')

Debugging reactive streams requires specialized approaches since traditional debuggers struggle with asynchronous execution and operator composition. Logging operators, side-effect operators, and visualization tools aid understanding.

# Debugging operators
observable
  .do_on_next { |x| puts "Value: #{x}" }
  .do_on_error { |e| puts "Error: #{e}" }
  .do_on_completed { puts "Completed" }
  .do_on_subscribe { puts "Subscribed" }
  .do_on_terminate { puts "Terminated" }

# Timing instrumentation
observable
  .timestamp  # Adds timestamp to each value
  .time_interval  # Measures time between emissions
  .subscribe { |timed_value| puts "Value: #{timed_value.value} at #{timed_value.timestamp}" }

Integration with Rails applies reactive patterns to web applications, background jobs, and real-time features. ActionCable provides WebSocket support that combines with reactive streams for real-time updates.

# Reactive ActionCable channel
class SensorChannel < ApplicationCable::Channel
  def subscribed
    sensor_stream = SensorService.reactive_stream
    
    @subscription = sensor_stream
      .sample(1.0)  # Send updates every second
      .subscribe do |reading|
        transmit(reading: reading)
      end
  end
  
  def unsubscribed
    @subscription&.dispose
  end
end

Reference

Observable Creation Methods

Method Description Use Case
create Build custom observable with observer callbacks Custom data sources requiring manual emission control
just Emit single value then complete Wrapping single values in observable context
from_array Emit array elements sequentially Converting collections to streams
range Emit sequence of integers Generating numeric sequences for testing or iteration
interval Emit incrementing numbers at time intervals Periodic polling or heartbeat signals
timer Emit single value after delay Delayed execution or timeout implementation
empty Complete immediately without emitting Placeholder streams or conditional logic
never Never emit or complete Testing subscription lifecycle
throw Immediately emit error Error injection for testing
defer Defer observable creation until subscription Late binding or per-subscription customization
start Execute block on subscription and emit result Wrapping synchronous operations asynchronously

Core Transformation Operators

Operator Behavior Common Usage
map Transform each value Data transformation, type conversion
flat_map Transform to observable and flatten Async operations per value, request chaining
flat_map_latest Transform to observable, cancel previous Latest-value semantics, search-as-you-type
concat_map Transform to observable, maintain order Sequential processing maintaining order
scan Accumulate state across values Running totals, state machines
buffer_with_count Collect values into arrays of fixed size Batch processing fixed-size groups
buffer_with_time Collect values over time window Time-based aggregation
window_with_count Group into observable windows by count Nested stream processing by count
window_with_time Group into observable windows by time Nested stream processing by time
group_by Split stream into grouped observables Partitioning by key for parallel processing

Filtering Operators

Operator Behavior Common Usage
filter Emit only values matching predicate Conditional value passing
take Emit first n values then complete Limiting result size
take_last Emit last n values on completion Tail of stream
skip Skip first n values Ignoring initial values
distinct Emit only distinct values Deduplication across entire stream
distinct_until_changed Emit only when value differs from previous Deduplication of consecutive duplicates
debounce Emit value only after quiet period Input stabilization, search delay
throttle_first Emit first value per time window Rate limiting with immediate response
sample Emit most recent value at intervals Periodic sampling of high-frequency source
first Emit first value then complete Single-value extraction
last Emit last value on completion Final value extraction
element_at Emit value at specific index Index-based access

Combination Operators

Operator Behavior Common Usage
merge Interleave multiple observables Combining concurrent sources
concat Emit observables sequentially Sequential processing maintaining source order
zip Combine corresponding values from multiple sources Coordinating parallel operations
combine_latest Emit combination when any source emits Real-time derived values
with_latest_from Combine with latest from another source Enriching stream with context
start_with Prepend initial value Providing default or initial state
switch_latest Subscribe to latest observable Canceling outdated requests

Error Handling Operators

Operator Behavior Common Usage
catch Replace error with alternative observable Error recovery with fallback
retry Resubscribe on error Transient failure recovery
retry_when Custom retry logic with delays Exponential backoff, retry limits
on_error_resume_next Continue with next observable on error Graceful degradation
timeout Error if no emission within duration Detecting stalled streams

Scheduler Types

Scheduler Execution Context Use Case
ImmediateScheduler Current thread synchronously Testing, simple synchronous operations
CurrentThreadScheduler Current thread with queue Trampolining to prevent stack overflow
ThreadPoolScheduler Thread pool Concurrent processing, I/O operations
NewThreadScheduler New thread per operation Isolated execution contexts
TestScheduler Virtual time Deterministic testing with time control

Subject Types

Subject Behavior Use Case
Subject Basic publish-subscribe Manual event emission
ReplaySubject Caches and replays values Late subscriber initialization
BehaviorSubject Emits current value immediately State representation
AsyncSubject Emits last value on completion Single async result

Observable Lifecycle Events

Event Timing Handler
on_next Value emitted Process emitted values
on_error Error occurred Handle error, stream terminates
on_completed Stream finished Cleanup, stream terminates
on_subscribe Subscription created Initialize resources
on_dispose Subscription disposed Release resources

Common Marble Diagram Symbols

Symbol Meaning
-- Time passing
1 or a Emitted value
| Successful completion
X Error
# Subscription start
^ Unsubscription

Error Recovery Strategy Selection

Strategy When to Use
catch Fallback value or alternative source available
retry Transient errors likely to resolve
retry_when Custom backoff or limited attempts needed
on_error_resume_next Continue processing despite errors
timeout Detect and handle stalled operations

Backpressure Strategy Selection

Strategy When to Use
buffer Can accumulate values temporarily
sample Periodic snapshots sufficient
throttle Rate limiting with immediate first value
debounce Only final stable value needed
take_last Only most recent values matter
drop Missing values acceptable

Testing Pattern Examples

# TestScheduler setup
scheduler = Rx::TestScheduler.new

# Create cold observable
cold = scheduler.create_cold_observable(
  Rx::Recorded.on_next(100, 'A'),
  Rx::Recorded.on_completed(200)
)

# Create hot observable
hot = scheduler.create_hot_observable(
  Rx::Recorded.on_next(100, 'A'),
  Rx::Recorded.on_next(200, 'B')
)

# Record results
observer = scheduler.create_observer
cold.subscribe(observer)

# Run scheduler
scheduler.start

# Assert results
observer.messages  # Array of Rx::Recorded

Observable Memory Management

Pattern Implementation
Dispose single subscription subscription.dispose
Dispose multiple subscriptions composite_disposable.add(subscription); composite_disposable.dispose
Automatic disposal observable.take_until(signal)
Timeout disposal observable.timeout(seconds)
Resource cleanup Return Rx::Subscription.create with cleanup block

Integration Patterns

Pattern Ruby Implementation
Callback to Observable Rx::Observable.create wrapping callback API
Promise to Observable Subscribe to promise then/catch and forward to observer
Observable to Callback Subscribe and invoke callback with results/errors
EventMachine integration Wrap EM deferrable with observable creation
ActionCable integration Subscribe to observable in ActionCable channel