CrackedRuby

ConditionVariable

A guide to using ConditionVariable for thread synchronization and coordination in Ruby applications.

Overview

ConditionVariable provides a mechanism for threads to wait for specific conditions and signal other threads when those conditions change. Ruby's ConditionVariable class works in conjunction with Mutex to create thread-safe coordination patterns where threads block until other threads signal that certain states have been reached.

The ConditionVariable class implements the classic condition variable synchronization primitive found in many threading systems. Threads acquire a mutex, check a condition, and if the condition is not met, they wait on the condition variable while atomically releasing the mutex. Other threads can then signal waiting threads when they modify shared state and conditions become true.

Ruby's implementation requires explicit mutex management. Threads must hold a mutex lock when calling wait methods, and the same mutex must be used consistently across all operations on shared state associated with the condition variable.

# Basic condition variable setup
mutex = Mutex.new
condition = ConditionVariable.new
shared_resource = []

# Consumer thread waits for data
consumer = Thread.new do
  mutex.synchronize do
    while shared_resource.empty?
      condition.wait(mutex)
    end
    item = shared_resource.shift
    puts "Consumed: #{item}"
  end
end

# Producer thread adds data and signals
producer = Thread.new do
  sleep(1) # Simulate work
  mutex.synchronize do
    shared_resource << "data"
    condition.signal
  end
end

# Wait for both threads; otherwise the program may exit before they finish
[consumer, producer].each(&:join)

The condition variable automatically handles the complex atomic operations of releasing the mutex during wait and reacquiring it when signaled. This prevents race conditions that would occur if these operations were performed separately.

Ruby provides three primary methods for condition variable operations: wait blocks the current thread until it is signaled (or until an optional timeout expires), signal wakes up one waiting thread, and broadcast wakes up all waiting threads. These methods form the foundation for coordinating thread execution based on shared state conditions.
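The following sketch shows how signal and broadcast differ. Three threads wait on one illustrative flag, gate_open, that every waiter cares about. The helper counters waiting and passed exist only for this demo. Barring a spurious wakeup, signal lets just one thread through even though all three could proceed, and broadcast releases the remaining two.

```ruby
mutex = Mutex.new
cv = ConditionVariable.new
gate_open = false # illustrative flag that every waiter is waiting on
waiting = 0
passed = 0

threads = 3.times.map do
  Thread.new do
    mutex.synchronize do
      waiting += 1
      cv.wait(mutex) until gate_open # predicate loop guards spurious wakeups
      waiting -= 1
      passed += 1
    end
  end
end

# wait releases the mutex, so once the main thread reads waiting == 3 under
# the lock, all three threads are parked inside wait
sleep(0.01) until mutex.synchronize { waiting == 3 }

mutex.synchronize do
  gate_open = true
  cv.signal # wakes one waiter, although all three could now proceed
end
sleep(0.01) until mutex.synchronize { passed == 1 }
waiting_after_signal = mutex.synchronize { waiting }

mutex.synchronize { cv.broadcast } # wakes every remaining waiter
threads.each(&:join)

puts "Still waiting after signal: #{waiting_after_signal}"
puts "Passed after broadcast: #{passed}"
```

Use signal when a single state change can satisfy only one waiter. Use broadcast when a change, like opening this gate, lets every waiter proceed.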

Basic Usage

ConditionVariable operations center around the wait-signal pattern where threads wait for conditions to be met and other threads signal when those conditions change. The most common pattern involves a mutex protecting shared state and a condition variable coordinating access based on that state.

Creating and using condition variables requires establishing the synchronization context first. Threads must hold the associated mutex before calling wait methods, and the same mutex must be used for all related operations.

require 'thread' # Optional: Mutex and ConditionVariable are built into modern Ruby

# Producer-consumer with condition variable
buffer = []
buffer_max = 5
mutex = Mutex.new
not_full = ConditionVariable.new
not_empty = ConditionVariable.new

producer = Thread.new do
  10.times do |i|
    mutex.synchronize do
      while buffer.length >= buffer_max
        not_full.wait(mutex)
      end
      buffer << "item_#{i}"
      puts "Produced item_#{i}, buffer size: #{buffer.length}"
      not_empty.signal
    end
    sleep(0.1)
  end
end

consumer = Thread.new do
  10.times do
    mutex.synchronize do
      while buffer.empty?
        not_empty.wait(mutex)
      end
      item = buffer.shift
      puts "Consumed #{item}, buffer size: #{buffer.length}"
      not_full.signal
    end
    sleep(0.2)
  end
end

producer.join
consumer.join

The wait method releases the mutex atomically and suspends the calling thread until another thread calls signal or broadcast on the same condition variable. When the thread wakes up, it reacquires the mutex before continuing execution.
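The release-and-reacquire behavior can be observed directly, as in the minimal sketch below. The flags ready and parked are illustrative. Mutex#owned? reports whether the calling thread currently holds the mutex.

```ruby
mutex = Mutex.new
cv = ConditionVariable.new
ready = false
parked = false
owned_after_wait = nil

waiter = Thread.new do
  mutex.synchronize do
    parked = true
    cv.wait(mutex) until ready      # releases mutex while blocked
    owned_after_wait = mutex.owned? # wait reacquired mutex before returning
  end
end

# The main thread can lock the same mutex while the waiter is still inside
# its synchronize block, which is only possible because wait released it
sleep(0.01) until mutex.synchronize { parked }

mutex.synchronize do
  ready = true
  cv.signal
end
waiter.join

puts "Mutex owned after wait returned: #{owned_after_wait}"
```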

Signaling operations communicate condition changes to waiting threads. The signal method wakes up one waiting thread, while broadcast wakes up all waiting threads. Threads that wake up must recheck their conditions since multiple threads might be competing for the same resources.

# Multiple consumers with broadcast
results = []
work_queue = [1, 2, 3, 4, 5]
shutdown = false
mutex = Mutex.new
work_available = ConditionVariable.new
work_complete = ConditionVariable.new

# Multiple worker threads
workers = 3.times.map do
  Thread.new do
    loop do
      task = nil
      mutex.synchronize do
        # Wait until there is work or shutdown has been requested
        while work_queue.empty? && !shutdown
          work_available.wait(mutex)
        end
        task = work_queue.shift # nil only when shut down with no work left
      end
      break if task.nil? # Leaves the loop, ending this worker thread
      
      # Process task outside of mutex
      result = task * task
      
      mutex.synchronize do
        results << result
        work_complete.signal
      end
    end
  end
end

# Wait for all work to complete
mutex.synchronize do
  while results.length < 5
    work_complete.wait(mutex)
  end
  shutdown = true # Request shutdown through shared state
  work_available.broadcast # Wake all workers so they can exit
end

workers.each(&:join)
puts "Results: #{results.sort}"

The condition variable handles the complex synchronization automatically. When wait is called, the current thread releases the mutex and blocks. When signal or broadcast is called, waiting threads are awakened and attempt to reacquire the mutex before continuing.

Proper condition variable usage requires checking conditions in loops rather than simple if statements. This accounts for spurious wakeups and scenarios where multiple threads compete for the same resources after being signaled.

Thread Safety & Concurrency

ConditionVariable operations are thread-safe by design, but proper usage requires understanding the coordination between mutexes and condition variables. The condition variable itself is thread-safe, but the shared state it coordinates must be protected by the associated mutex.

Multiple threads can safely call wait, signal, and broadcast on the same condition variable simultaneously. The condition variable manages its internal state to handle concurrent access without race conditions. However, threads must follow the protocol of holding the mutex when calling wait and when accessing shared state.

# Thread-safe counter with condition-based waiting
class CountdownLatch
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @condition = ConditionVariable.new
  end
  
  def countdown
    @mutex.synchronize do
      @count -= 1
      if @count <= 0
        @condition.broadcast
      end
    end
  end
  
  def wait
    @mutex.synchronize do
      while @count > 0
        @condition.wait(@mutex)
      end
    end
  end
  
  def count
    @mutex.synchronize { @count }
  end
end

latch = CountdownLatch.new(3)

# Multiple threads waiting for countdown
waiters = 5.times.map do |i|
  Thread.new do
    puts "Thread #{i} waiting..."
    latch.wait
    puts "Thread #{i} proceeding!"
  end
end

# Multiple threads counting down
counters = 3.times.map do |i|
  Thread.new do
    sleep(rand)
    puts "Countdown #{i}"
    latch.countdown
  end
end

(waiters + counters).each(&:join)

Spurious wakeups can occur where threads wake up from wait without an explicit signal or broadcast. This behavior is implementation-dependent and can happen due to system-level interrupts or other threading system events. Code must handle spurious wakeups by rechecking conditions in loops.

The atomicity of wait operations prevents race conditions between releasing the mutex and blocking the thread. Without this atomicity, other threads could signal the condition variable in the window between mutex release and thread blocking, causing missed signals.

# Handling multiple condition variables safely
class ReadWriteCoordinator
  def initialize
    @readers = 0
    @writers = 0
    @write_requests = 0
    @mutex = Mutex.new
    @read_ready = ConditionVariable.new
    @write_ready = ConditionVariable.new
  end
  
  def acquire_read_lock
    @mutex.synchronize do
      while @writers > 0 || @write_requests > 0
        @read_ready.wait(@mutex)
      end
      @readers += 1
    end
  end
  
  def release_read_lock
    @mutex.synchronize do
      @readers -= 1
      if @readers == 0
        @write_ready.broadcast
      end
    end
  end
  
  def acquire_write_lock
    @mutex.synchronize do
      @write_requests += 1
      while @readers > 0 || @writers > 0
        @write_ready.wait(@mutex)
      end
      @write_requests -= 1
      @writers += 1
    end
  end
  
  def release_write_lock
    @mutex.synchronize do
      @writers -= 1
      @write_ready.broadcast
      @read_ready.broadcast
    end
  end
end

coordinator = ReadWriteCoordinator.new
shared_data = "initial"

# Multiple readers
readers = 3.times.map do |i|
  Thread.new do
    coordinator.acquire_read_lock
    puts "Reader #{i}: #{shared_data}"
    sleep(0.1)
    coordinator.release_read_lock
  end
end

# Single writer
writer = Thread.new do
  sleep(0.05)
  coordinator.acquire_write_lock
  shared_data = "updated"
  puts "Writer updated data"
  coordinator.release_write_lock
end

(readers + [writer]).each(&:join)

Signal delivery does not guarantee which thread proceeds first. Ruby documents signal as waking the first thread in line, but the woken thread must still reacquire the mutex. Another thread that grabs the mutex first can consume the state change before it. Applications requiring strict ordering must implement additional coordination mechanisms.

Broadcast operations wake up all waiting threads simultaneously, but threads must still acquire the mutex sequentially before proceeding. This creates a thundering herd effect where all threads wake up but only one can proceed at a time, potentially causing performance issues with many waiting threads.

Error Handling & Debugging

ConditionVariable operations can encounter several error conditions that require specific handling strategies. The most common errors involve incorrect mutex usage, deadlocks, and coordination failures between threads.

The primary error occurs when calling wait without holding the associated mutex. Ruby raises a ThreadError when wait is called from a thread that does not own the mutex. This error prevents common programming mistakes that would lead to undefined behavior.

# Incorrect usage - raises ThreadError
mutex = Mutex.new
condition = ConditionVariable.new

begin
  condition.wait(mutex) # Not holding mutex
rescue ThreadError => e
  puts "Error: #{e.message}"
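  # => Error: Attempt to unlock a mutex which is not locked
end

# Correct usage
mutex.synchronize do
  # Holding mutex through synchronize; the 0.1s timeout only keeps this
  # standalone snippet from blocking forever, since nothing signals it
  condition.wait(mutex, 0.1)
end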

Deadlock detection and prevention requires careful analysis of mutex acquisition order and condition variable dependencies. Circular waiting patterns can develop when multiple condition variables and mutexes interact in complex ways.

# Deadlock-prone pattern
mutex1 = Mutex.new
mutex2 = Mutex.new
condition1 = ConditionVariable.new
condition2 = ConditionVariable.new

# Thread A acquires mutex1, waits for condition on mutex2
thread_a = Thread.new do
  mutex1.synchronize do
    puts "Thread A has mutex1"
    mutex2.synchronize do
      condition2.wait(mutex2) # Could deadlock
    end
  end
end

# Thread B acquires mutex2, waits for condition on mutex1
thread_b = Thread.new do
  mutex2.synchronize do
    puts "Thread B has mutex2"
    mutex1.synchronize do
      condition1.wait(mutex1) # Could deadlock
    end
  end
end

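# Better pattern - acquire both mutexes in one consistent order when
# updating shared state, and only call wait while holding the single
# mutex that the condition variable is paired with
def with_both_locks(mutex1, mutex2)
  # Always acquire mutexes in same order, regardless of argument order
  first, second = [mutex1, mutex2].sort_by(&:object_id)
  first.synchronize do
    second.synchronize { yield }
  end
end

def wait_until(mutex, condition)
  # Callers must not hold any other mutex here, so signalers can progress
  mutex.synchronize do
    condition.wait(mutex) until yield
  end
end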

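Timeout handling for condition variable waits prevents threads from waiting indefinitely when expected signals never arrive. ConditionVariable#wait accepts an optional second argument, a timeout in seconds: wait(mutex, timeout). A timed-out wait returns with the mutex reacquired, just like a signaled one. The reliable approach is therefore to recheck the condition and track a deadline rather than infer the outcome from how wait returned.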

# Deadline-based waiting built on the timeout argument of ConditionVariable#wait
class TimedConditionVariable
  def initialize
    @condition = ConditionVariable.new
  end
  
  # Waits (holding mutex) until the block returns true or timeout seconds pass.
  # Returns true if the condition became true, false if the timeout expired.
  def wait_until(mutex, timeout = nil)
    deadline = timeout && Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    until yield
      if deadline
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return false if remaining <= 0
        @condition.wait(mutex, remaining) # returns on signal, broadcast, or timeout
      else
        @condition.wait(mutex)
      end
    end
    true
  end
  
  def signal
    @condition.signal
  end
  
  def broadcast
    @condition.broadcast
  end
end

# Usage with timeout handling
mutex = Mutex.new
timed_condition = TimedConditionVariable.new
ready = false
result = nil

waiter = Thread.new do
  mutex.synchronize do
    success = timed_condition.wait_until(mutex, 2.0) { ready }
    result = success ? "Signaled" : "Timeout"
  end
end

# Update shared state and signal after 1 second (within the 2-second timeout)
signaler = Thread.new do
  sleep(1)
  mutex.synchronize do
    ready = true
    timed_condition.signal
  end
end

waiter.join
signaler.join
puts result # => "Signaled"

Debugging condition variable issues requires tracing thread states and signal patterns. Lost signals occur when a signal is sent before any thread is waiting and the waiter does not check shared state first. Premature wakeups cause bugs when threads proceed after a spurious or stolen wakeup without rechecking their condition.

Signal ordering problems arise in complex coordination patterns where the sequence of signals affects correctness. Debugging requires logging signal and wait operations with timestamps and thread identifiers to reconstruct the execution timeline.

# Debug wrapper for condition variables
class DebuggingConditionVariable
  def initialize(name)
    @name = name
    @condition = ConditionVariable.new
    @signal_count = 0
    @wait_count = 0
  end
  
  def wait(mutex)
    @wait_count += 1
    puts "[#{Time.now.strftime('%H:%M:%S.%3N')}] #{Thread.current.object_id} waiting on #{@name} (wait ##{@wait_count})"
    @condition.wait(mutex)
    puts "[#{Time.now.strftime('%H:%M:%S.%3N')}] #{Thread.current.object_id} resumed from #{@name}"
  end
  
  def signal
    @signal_count += 1
    puts "[#{Time.now.strftime('%H:%M:%S.%3N')}] #{Thread.current.object_id} signaling #{@name} (signal ##{@signal_count})"
    @condition.signal
  end
  
  def broadcast
    @signal_count += 1
    puts "[#{Time.now.strftime('%H:%M:%S.%3N')}] #{Thread.current.object_id} broadcasting #{@name} (broadcast ##{@signal_count})"
    @condition.broadcast
  end
end

Performance & Memory

ConditionVariable operations have specific performance characteristics that affect application scalability and resource usage. The wait operation involves thread context switches and kernel calls that can impact performance in high-frequency coordination scenarios.

Signal operations are generally fast since they only mark waiting threads as ready to run without performing immediate thread switches. However, broadcast operations can cause thundering herd problems when many threads are waiting, as all threads wake up simultaneously and compete for the same mutex.

# Performance comparison of signal vs broadcast
require 'benchmark'

# Workers wait for permits that the main thread hands out one at a time.
# With signal, each new permit wakes one waiter; with broadcast, every
# waiting thread wakes up and all but one go back to waiting.
def benchmark_condition_variable_pattern(num_threads, use_broadcast, rounds = 10)
  mutex = Mutex.new
  condition = ConditionVariable.new
  permits = 0

  Benchmark.realtime do
    threads = num_threads.times.map do
      Thread.new do
        rounds.times do
          mutex.synchronize do
            while permits.zero?
              condition.wait(mutex)
            end
            permits -= 1
          end
        end
      end
    end

    (num_threads * rounds).times do
      mutex.synchronize do
        permits += 1
        if use_broadcast
          condition.broadcast
        else
          condition.signal
        end
      end
    end

    threads.each(&:join)
  end
end

# Timings vary with Ruby version, scheduler, and hardware
signal_time = benchmark_condition_variable_pattern(20, false)
puts "Signal approach: #{signal_time.round(4)}s"

broadcast_time = benchmark_condition_variable_pattern(20, true)
puts "Broadcast approach: #{broadcast_time.round(4)}s"

Memory usage for condition variables is minimal for the objects themselves, but waiting threads consume stack memory and system resources. Each waiting thread maintains its stack and thread control block, which can become significant with many waiting threads.

Context switching overhead becomes prominent when threads frequently wait and signal on condition variables. Each wait operation involves switching from user space to kernel space and back, with associated CPU cache effects and scheduling overhead.

# Memory-efficient producer-consumer with bounded waiting
class BoundedBuffer
  def initialize(capacity, max_waiters = 10)
    @buffer = []
    @capacity = capacity
    @max_waiters = max_waiters
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
    @not_full = ConditionVariable.new
    @waiting_producers = 0
    @waiting_consumers = 0
  end
  
  def put(item)
    @mutex.synchronize do
      if @waiting_producers >= @max_waiters
        raise "Too many waiting producers"
      end
      
      @waiting_producers += 1
      begin
        while @buffer.length >= @capacity
          @not_full.wait(@mutex)
        end
        @buffer << item
        @not_empty.signal
      ensure
        @waiting_producers -= 1
      end
    end
  end
  
  def take
    @mutex.synchronize do
      if @waiting_consumers >= @max_waiters
        raise "Too many waiting consumers"
      end
      
      @waiting_consumers += 1
      begin
        while @buffer.empty?
          @not_empty.wait(@mutex)
        end
        item = @buffer.shift
        @not_full.signal
        item
      ensure
        @waiting_consumers -= 1
      end
    end
  end
  
  def stats
    @mutex.synchronize do
      {
        buffer_size: @buffer.length,
        waiting_producers: @waiting_producers,
        waiting_consumers: @waiting_consumers
      }
    end
  end
end

Lock contention can become severe when many threads compete for the same mutex associated with a condition variable. This creates bottlenecks where threads spend more time waiting for mutex access than performing actual work.

Batching operations can improve performance by reducing the frequency of wait and signal operations. Instead of signaling after each individual change, threads can accumulate changes and signal periodically or when thresholds are met.

# Batched signaling for better performance
class BatchedWorkQueue
  def initialize(batch_size = 5)
    @queue = []
    @batch_size = batch_size
    @mutex = Mutex.new
    @work_available = ConditionVariable.new
    @pending_signals = 0
  end
  
  def add_work(item)
    @mutex.synchronize do
      @queue << item
      @pending_signals += 1
      
      # Signal in batches to reduce contention
      if @pending_signals >= @batch_size
        @work_available.broadcast
        @pending_signals = 0
      end
    end
  end
  
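  def get_work(timeout = 1.0)
    @mutex.synchronize do
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
      
      while @queue.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        
        # Bounded wait: returns on broadcast or when the time runs out, so
        # unflushed batches cannot block consumers forever
        @work_available.wait(@mutex, remaining)
      end
      
      @queue.shift
    end
  end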
  
  def flush_signals
    @mutex.synchronize do
      if @pending_signals > 0
        @work_available.broadcast
        @pending_signals = 0
      end
    end
  end
end

CPU usage patterns with condition variables depend heavily on the wait-to-work ratio. Applications that wait frequently relative to processing time will show low CPU utilization, while those with short waits and long processing periods will maintain higher CPU usage.

Profiling condition variable usage requires monitoring thread states, wait times, and signal frequencies. High-frequency signaling with short waits can indicate inefficient coordination patterns that might benefit from alternative synchronization approaches.
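One way to gather these numbers is to wrap the condition variable, as in the sketch below. InstrumentedConditionVariable is a hypothetical helper, not part of Ruby. The sketch assumes every call is made while holding the associated mutex, which then also protects the counters.

```ruby
# Hypothetical profiling wrapper; not part of Ruby's standard library.
# Assumes callers hold the mutex, which also serializes the counter updates.
class InstrumentedConditionVariable
  attr_reader :waits, :signals, :total_wait_time

  def initialize
    @condition = ConditionVariable.new
    @waits = 0
    @signals = 0
    @total_wait_time = 0.0
  end

  def wait(mutex, timeout = nil)
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    result = @condition.wait(mutex, timeout)
    # wait has reacquired mutex here, so these updates are serialized
    @waits += 1
    @total_wait_time += Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    result
  end

  def signal
    @signals += 1
    @condition.signal
  end

  def broadcast
    @signals += 1
    @condition.broadcast
  end
end

mutex = Mutex.new
cv = InstrumentedConditionVariable.new
ready = false
parked = false

waiter = Thread.new do
  mutex.synchronize do
    parked = true
    cv.wait(mutex) until ready
  end
end

sleep(0.01) until mutex.synchronize { parked } # waiter is now inside wait
sleep(0.05) # let some wait time accumulate
mutex.synchronize do
  ready = true
  cv.signal
end
waiter.join

mutex.synchronize do
  puts "waits=#{cv.waits} signals=#{cv.signals} wait_time=#{cv.total_wait_time.round(3)}s"
end
```

A high signals-to-waits ratio, or many waits with tiny wait times, can point to the inefficient coordination described above.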

Common Pitfalls

Lost signal problems occur when threads call signal before any threads are waiting on the condition variable. Unlike some synchronization primitives, condition variables do not queue signals, so signals sent to condition variables with no waiting threads are lost permanently.

# Lost signal scenario
mutex = Mutex.new
condition = ConditionVariable.new
flag = false

# Producer signals before consumer waits
producer = Thread.new do
  mutex.synchronize do
    flag = true
    condition.signal # Signal sent but no one waiting, so it is discarded
  end
end

producer.join

# Broken consumer (not run here): waits without checking flag first, so it
# depends on a signal that was already sent and would never be woken
#   mutex.synchronize { condition.wait(mutex) }

# Correct consumer: checks shared state before waiting, so the lost
# signal does no harm
consumer = Thread.new do
  mutex.synchronize do
    while !flag # Already true, so the thread never waits
      condition.wait(mutex)
    end
    puts "Consumer proceeding"
  end
end

consumer.join

The spurious wakeup problem requires condition checks in loops rather than simple if statements. Threads can wake up from wait without an explicit signal. Even after a genuine signal, another thread may acquire the mutex first and consume the state change before the woken thread runs.

# Incorrect - using if instead of while
mutex = Mutex.new
condition = ConditionVariable.new
resource_available = false

# Wrong approach
Thread.new do
  mutex.synchronize do
    if !resource_available # Should be 'while'
      condition.wait(mutex)
    end
    # Might execute even when resource_available is still false
    puts "Using resource"
  end
end

# Correct approach
Thread.new do
  mutex.synchronize do
    while !resource_available # Recheck condition
      condition.wait(mutex)
    end
    puts "Using resource safely"
  end
end

Mutex ownership violations happen when threads call wait on a condition variable without holding the associated mutex. Some threading libraries leave this undefined; Ruby detects it and raises ThreadError.

# Mutex ownership pitfall
mutex = Mutex.new
condition = ConditionVariable.new

# Wrong - releasing mutex before wait
Thread.new do
  mutex.lock
  puts "Have lock"
  mutex.unlock
  
  begin
    condition.wait(mutex) # Error - don't own mutex
  rescue ThreadError => e
    puts "Cannot wait: #{e.message}"
  end
end

# Correct - keep mutex ownership
Thread.new do
  mutex.synchronize do
    puts "Have lock through synchronize"
    condition.wait(mutex) # Safe - mutex held via synchronize
  end
end

The thundering herd problem occurs with broadcast operations when many threads are waiting and all wake up simultaneously to compete for the same resources. This creates unnecessary context switching and contention.

# Thundering herd with broadcast
mutex = Mutex.new
condition = ConditionVariable.new
shared_counter = 0
wakeups = 0

# Many threads waiting for the same kind of resource
waiters = 100.times.map do
  Thread.new do
    mutex.synchronize do
      while shared_counter < 1
        condition.wait(mutex)
        wakeups += 1 # Count every wakeup, useful or not
      end
      shared_counter -= 1
    end
  end
end

# Producer releases one resource at a time but broadcasts to all
producer = Thread.new do
  100.times do
    sleep(0.001)
    mutex.synchronize do
      shared_counter += 1 # Only one resource available per round
      condition.broadcast # Wakes every remaining waiter; only one can use it
    end
  end
end

(waiters + [producer]).each(&:join)
puts "Wakeups needed to hand out 100 resources: #{wakeups}"

Race conditions can develop between condition checks and wait calls when threads don't properly maintain mutex ownership throughout the entire check-and-wait sequence. Interrupting threads between condition evaluation and wait can cause missed signals.
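The sketch below makes that race reproducible. Two Queue objects (checked and resume, illustrative names) force the bad interleaving every time: the broken thread reads ready without the mutex, the main thread sets the flag and signals in the gap, and the later wait sleeps until its timeout. The 0.2 s timeout is an assumption added only so the demo terminates.

```ruby
mutex = Mutex.new
cv = ConditionVariable.new
ready = false
checked = Queue.new # used only to force the bad interleaving every time
resume = Queue.new

# Broken: the condition is read without the mutex and wait happens later
broken = Thread.new do
  needs_wait = !ready # sees false, outside the lock
  checked << true
  resume.pop          # the main thread sets ready and signals in this gap
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  # 0.2s timeout so the demo finishes; without it, nothing would wake this thread
  mutex.synchronize { cv.wait(mutex, 0.2) if needs_wait }
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

checked.pop
mutex.synchronize do
  ready = true
  cv.signal # no thread is waiting yet, so this wakeup is lost
end
resume << true
slept = broken.value
puts "Broken waiter slept #{slept.round(2)}s although ready was already true"

# Correct: check and wait under one lock, so the update cannot slip between them
had_to_wait = mutex.synchronize do
  waited = false
  until ready
    waited = true
    cv.wait(mutex)
  end
  waited
end
puts "Correct waiter had to wait: #{had_to_wait}"
```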

Nested condition variables create complex coordination patterns that are prone to deadlocks and missed signals. When multiple condition variables depend on overlapping shared state, the interaction between different waiting and signaling patterns can become unpredictable.

# Dangerous nested condition pattern
class ProblematicCoordinator
  def initialize
    @state1 = false
    @state2 = false
    @mutex1 = Mutex.new
    @mutex2 = Mutex.new
    @condition1 = ConditionVariable.new
    @condition2 = ConditionVariable.new
  end
  
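  def wait_for_both
    # Problematic - holds @mutex1 while waiting on @condition2, so any code
    # that needs @mutex1 before it can set @state2 is blocked
    @mutex1.synchronize do
      while !@state1
        @condition1.wait(@mutex1)
      end
      
      @mutex2.synchronize do
        while !@state2
          @condition2.wait(@mutex2) # Releases only @mutex2; @mutex1 stays held
        end
      end
    end
  end
  
  def signal_both
    # Works only because it never holds both mutexes at once; a version that
    # locked @mutex1 and then @mutex2 to set @state2 would deadlock against
    # a thread inside wait_for_both
    @mutex2.synchronize do
      @state2 = true
      @condition2.signal
    end
    
    @mutex1.synchronize do
      @state1 = true
      @condition1.signal
    end
  end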
end
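One way to remove these hazards, assuming both flags can share a lock, is to guard them with a single mutex and one condition variable. Then no lock ordering can go wrong and no second lock is held during wait. SingleLockCoordinator, set_state1, and set_state2 are illustrative names for this sketch.

```ruby
# Hypothetical rework of ProblematicCoordinator: one mutex guards both flags
class SingleLockCoordinator
  def initialize
    @state1 = false
    @state2 = false
    @mutex = Mutex.new
    @changed = ConditionVariable.new # broadcast whenever either flag changes
  end

  def wait_for_both
    @mutex.synchronize do
      # One predicate covers both flags, rechecked after every wakeup
      @changed.wait(@mutex) until @state1 && @state2
    end
  end

  def set_state1
    @mutex.synchronize do
      @state1 = true
      @changed.broadcast # waiters may be interested in different flags
    end
  end

  def set_state2
    @mutex.synchronize do
      @state2 = true
      @changed.broadcast
    end
  end
end

coordinator = SingleLockCoordinator.new
waiter = Thread.new { coordinator.wait_for_both; :both_ready }
Thread.new { coordinator.set_state2 }.join
Thread.new { coordinator.set_state1 }.join
puts waiter.value
```

Broadcast is used here because, in a larger program, different threads might wait for different combinations of the flags.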

Signal starvation happens when specific threads consistently lose out due to scheduling or timing issues. Mutex reacquisition after a wakeup is not fair, so a woken thread can repeatedly find that other threads have already consumed the state change. Some threads might wait indefinitely while others consistently make progress.

Memory leaks can occur with condition variables when threads wait indefinitely due to programming errors, consuming system resources without bound. Proper timeout handling and resource cleanup help prevent these scenarios.

Reference

Core Classes and Methods

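| Method | Parameters | Returns | Description |
|---|---|---|---|
| ConditionVariable.new | None | ConditionVariable | Creates a new condition variable |
| #wait(mutex, timeout = nil) | mutex (Mutex), optional timeout in seconds | Result of Mutex#sleep | Releases mutex, waits for a signal, broadcast, or timeout, then reacquires mutex |
| #signal | None | self | Wakes up one waiting thread |
| #broadcast | None | self | Wakes up all waiting threads |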

Error Types

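| Error | Condition | Description |
|---|---|---|
| ThreadError | wait without mutex ownership | Raised when wait is called by a thread that has not locked the mutex |
| (none) | wait with a different mutex that the caller holds | Not detected: a ConditionVariable is not bound to one mutex, so this is a silent logic error |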

Thread States

| State | Description | Transition |
|---|---|---|
| Running | Thread executing normally | wait → Waiting |
| Waiting | Thread blocked on condition variable | signal/broadcast or timeout → Ready |
| Ready | Thread ready to run, waiting to reacquire the mutex | Mutex acquired → Running |

Usage Patterns

| Pattern | Use Case | Key Characteristics |
|---|---|---|
| Producer-Consumer | Thread coordination with shared buffer | Wait when full/empty, signal on changes |
| Reader-Writer | Multiple readers, exclusive writers | Multiple condition variables for different states |
| Barrier | Wait for multiple threads to reach a point | Broadcast when all threads are ready |
| Pool | Resource management with limited resources | Signal when resources become available |
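The Barrier row can be sketched as follows. Barrier is a hypothetical class written for illustration, not a Ruby standard class. The last thread to arrive broadcasts, and a generation counter lets the barrier be reused across rounds while still tolerating spurious wakeups.

```ruby
# Hypothetical reusable barrier (not a Ruby standard class)
class Barrier
  def initialize(parties)
    @parties = parties
    @arrived = 0
    @generation = 0
    @mutex = Mutex.new
    @all_arrived = ConditionVariable.new
  end

  # Blocks until `parties` threads have called wait in the current round
  def wait
    @mutex.synchronize do
      generation = @generation
      @arrived += 1
      if @arrived == @parties
        # Last arrival: reset for the next round, then release everyone
        @arrived = 0
        @generation += 1
        @all_arrived.broadcast
      else
        # The generation check tells a real release apart from a spurious
        # wakeup, and keeps early arrivals for the next round waiting
        @all_arrived.wait(@mutex) while generation == @generation
      end
    end
  end
end

barrier = Barrier.new(3)
log = Queue.new # thread-safe record of the order events happened in

threads = 3.times.map do
  Thread.new do
    2.times do |round|
      log << [:before, round]
      barrier.wait
      log << [:after, round]
    end
  end
end
threads.each(&:join)

events = Array.new(log.size) { log.pop }
2.times do |round|
  in_order = events.rindex([:before, round]) < events.index([:after, round])
  puts "Round #{round}: every arrival logged before any release: #{in_order}"
end
```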

Best Practices

| Practice | Rationale | Implementation |
|---|---|---|
| Always use while loops for conditions | Handles spurious wakeups | cv.wait(mutex) while !condition |
| Hold mutex during entire check-wait sequence | Prevents race conditions | Use mutex.synchronize block |
| Signal after state changes | Ensures waiting threads see updates | Modify state then call signal |
| Use consistent mutex ordering | Prevents deadlocks | Always acquire mutexes in same order |

Performance Considerations

| Factor | Impact | Mitigation |
|---|---|---|
| Context switching overhead | High with frequent wait/signal | Batch operations, reduce signaling frequency |
| Thundering herd with broadcast | CPU spikes, contention | Use signal instead of broadcast when possible |
| Lock contention | Serialized access to shared state | Minimize critical section duration |
| Memory usage per waiting thread | System resource consumption | Limit number of concurrent waiting threads |

Common Error Patterns

| Anti-Pattern | Problem | Solution |
|---|---|---|
| if instead of while for condition | Spurious wakeups, race conditions | Always use while loops |
| Signaling without state change | Wasted wakeups, confusion | Only signal after meaningful state changes |
| Multiple mutexes with condition variable | Deadlock potential, complexity | Use single mutex per condition variable |
| Broadcast for single resource | Thundering herd effect | Use signal for exclusive resources |

Integration Examples

| Scenario | Usage Pattern | Key Points |
|---|---|---|
| Thread pools | Work distribution | Signal when new work arrives |
| Event systems | Notification coordination | Broadcast for multiple subscribers |
| Cache invalidation | Coordinated refresh | Signal when cache needs refresh |
| Database connections | Pool management | Signal when connections available |
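The pool-management rows (Pool, Database connections) can be sketched as a small resource pool. ResourcePool, checkout, and checkin are hypothetical names, and the symbol :conn_a stands in for a real connection object. Checkout blocks until a resource is returned or an optional timeout passes. Checkin uses signal because one returned resource can satisfy only one waiter.

```ruby
# Hypothetical connection-style pool; names are illustrative only
class ResourcePool
  def initialize(resources)
    @available = resources.dup
    @mutex = Mutex.new
    @returned = ConditionVariable.new
  end

  # Returns a resource, or nil if none became free within timeout seconds
  def checkout(timeout = nil)
    @mutex.synchronize do
      deadline = timeout && Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
      while @available.empty?
        if deadline
          remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
          return nil if remaining <= 0
          @returned.wait(@mutex, remaining)
        else
          @returned.wait(@mutex)
        end
      end
      @available.pop
    end
  end

  def checkin(resource)
    @mutex.synchronize do
      @available.push(resource)
      @returned.signal # one returned resource can satisfy one waiter
    end
  end
end

pool = ResourcePool.new([:conn_a])
first = pool.checkout
timed_out = pool.checkout(0.05) # pool is empty, so this gives up
waiter = Thread.new { pool.checkout(1.0) }
sleep(0.05)
pool.checkin(first) # wakes the waiting thread
second = waiter.value
puts "first=#{first.inspect} timed_out=#{timed_out.inspect} second=#{second.inspect}"
```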