Overview
ConditionVariable provides a mechanism for threads to wait for specific conditions and signal other threads when those conditions change. Ruby's ConditionVariable class works in conjunction with Mutex to create thread-safe coordination patterns where threads block until other threads signal that certain states have been reached.
The ConditionVariable class implements the classic condition variable synchronization primitive found in many threading systems. Threads acquire a mutex, check a condition, and if the condition is not met, they wait on the condition variable while atomically releasing the mutex. Other threads can then signal waiting threads when they modify shared state and conditions become true.
Ruby's implementation requires explicit mutex management. Threads must hold a mutex lock when calling wait methods, and the same mutex must be used consistently across all operations on shared state associated with the condition variable.
```ruby
# Basic condition variable setup
mutex = Mutex.new
condition = ConditionVariable.new
shared_resource = []

# Consumer thread waits for data
consumer = Thread.new do
  mutex.synchronize do
    while shared_resource.empty?
      condition.wait(mutex)
    end
    item = shared_resource.shift
    puts "Consumed: #{item}"
  end
end

# Producer thread adds data and signals
producer = Thread.new do
  sleep(1) # Simulate work
  mutex.synchronize do
    shared_resource << "data"
    condition.signal
  end
end

[producer, consumer].each(&:join)
```
The condition variable automatically handles the complex atomic operations of releasing the mutex during wait and reacquiring it when signaled. This prevents race conditions that would occur if these operations were performed separately.
Ruby provides three primary methods for condition variable operations: wait blocks the current thread until signaled, signal wakes up one waiting thread, and broadcast wakes up all waiting threads. These methods form the foundation for coordinating thread execution based on shared state conditions.
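A compact sketch of all three calls, with an illustrative done flag; the commented-out form shows the optional timeout argument to wait.

```ruby
mutex = Mutex.new
cv = ConditionVariable.new
done = false

waiters = 2.times.map do
  Thread.new do
    mutex.synchronize do
      # cv.wait(mutex, 1.5) would cap each wait at 1.5 seconds
      cv.wait(mutex) until done
    end
  end
end

sleep(0.1) # give both waiters time to block
mutex.synchronize do
  done = true
  cv.signal    # wakes one waiting thread
  cv.broadcast # wakes any threads still waiting
end
waiters.each(&:join)
```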
Basic Usage
ConditionVariable operations center around the wait-signal pattern where threads wait for conditions to be met and other threads signal when those conditions change. The most common pattern involves a mutex protecting shared state and a condition variable coordinating access based on that state.
Creating and using condition variables requires establishing the synchronization context first. Threads must hold the associated mutex before calling wait methods, and the same mutex must be used for all related operations.
```ruby
require 'thread'

# Producer-consumer with condition variable
buffer = []
buffer_max = 5
mutex = Mutex.new
not_full = ConditionVariable.new
not_empty = ConditionVariable.new

producer = Thread.new do
  10.times do |i|
    mutex.synchronize do
      while buffer.length >= buffer_max
        not_full.wait(mutex)
      end
      buffer << "item_#{i}"
      puts "Produced item_#{i}, buffer size: #{buffer.length}"
      not_empty.signal
    end
    sleep(0.1)
  end
end

consumer = Thread.new do
  10.times do
    mutex.synchronize do
      while buffer.empty?
        not_empty.wait(mutex)
      end
      item = buffer.shift
      puts "Consumed #{item}, buffer size: #{buffer.length}"
      not_full.signal
    end
    sleep(0.2)
  end
end

producer.join
consumer.join
```
The wait method releases the mutex atomically and suspends the calling thread until another thread calls signal or broadcast on the same condition variable. When the thread wakes up, it reacquires the mutex before continuing execution.
Signaling operations communicate condition changes to waiting threads. The signal method wakes up one waiting thread, while broadcast wakes up all waiting threads. Threads that wake up must recheck their conditions since multiple threads might be competing for the same resources.
```ruby
# Multiple consumers with broadcast
results = []
work_queue = [1, 2, 3, 4, 5]
mutex = Mutex.new
work_available = ConditionVariable.new
work_complete = ConditionVariable.new

# Multiple worker threads
workers = 3.times.map do |worker_id|
  Thread.new do
    loop do
      task = nil
      mutex.synchronize do
        while work_queue && work_queue.empty?
          work_available.wait(mutex)
        end
        task = work_queue.shift if work_queue
      end
      break if task.nil? # Shutdown: the queue was set to nil

      # Process the task outside of the mutex
      result = task * task
      mutex.synchronize do
        results << result
        work_complete.signal
      end
    end
  end
end

# Wait for all work to complete
mutex.synchronize do
  while results.length < 5
    work_complete.wait(mutex)
  end
  work_queue = nil          # Signal shutdown
  work_available.broadcast  # Wake all workers so they can exit
end

workers.each(&:join)
puts "Results: #{results.sort}"
```
The condition variable handles the complex synchronization automatically. When wait is called, the current thread releases the mutex and blocks. When signal or broadcast is called, waiting threads are awakened and attempt to reacquire the mutex before continuing.
Proper condition variable usage requires checking conditions in loops rather than simple if statements. This accounts for spurious wakeups and scenarios where multiple threads compete for the same resources after being signaled.
Thread Safety & Concurrency
ConditionVariable operations are thread-safe by design, but proper usage requires understanding the coordination between mutexes and condition variables. The condition variable itself is thread-safe, but the shared state it coordinates must be protected by the associated mutex.
Multiple threads can safely call wait, signal, and broadcast on the same condition variable simultaneously. The condition variable manages its internal state to handle concurrent access without race conditions. However, threads must follow the protocol of holding the mutex when calling wait and when accessing shared state.
```ruby
# Thread-safe counter with condition-based waiting
class CountdownLatch
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @condition = ConditionVariable.new
  end

  def countdown
    @mutex.synchronize do
      @count -= 1
      @condition.broadcast if @count <= 0
    end
  end

  def wait
    @mutex.synchronize do
      while @count > 0
        @condition.wait(@mutex)
      end
    end
  end

  def count
    @mutex.synchronize { @count }
  end
end

latch = CountdownLatch.new(3)

# Multiple threads waiting for countdown
waiters = 5.times.map do |i|
  Thread.new do
    puts "Thread #{i} waiting..."
    latch.wait
    puts "Thread #{i} proceeding!"
  end
end

# Multiple threads counting down
counters = 3.times.map do |i|
  Thread.new do
    sleep(rand)
    puts "Countdown #{i}"
    latch.countdown
  end
end

(waiters + counters).each(&:join)
```
Spurious wakeups can occur where threads wake up from wait without an explicit signal or broadcast. This behavior is implementation-dependent and can happen due to system-level interrupts or other threading system events. Code must handle spurious wakeups by rechecking conditions in loops.
The atomicity of wait operations prevents race conditions between releasing the mutex and blocking the thread. Without this atomicity, other threads could signal the condition variable in the window between mutex release and thread blocking, causing missed signals.
```ruby
# Handling multiple condition variables safely
class ReadWriteCoordinator
  def initialize
    @readers = 0
    @writers = 0
    @write_requests = 0
    @mutex = Mutex.new
    @read_ready = ConditionVariable.new
    @write_ready = ConditionVariable.new
  end

  def acquire_read_lock
    @mutex.synchronize do
      while @writers > 0 || @write_requests > 0
        @read_ready.wait(@mutex)
      end
      @readers += 1
    end
  end

  def release_read_lock
    @mutex.synchronize do
      @readers -= 1
      @write_ready.broadcast if @readers == 0
    end
  end

  def acquire_write_lock
    @mutex.synchronize do
      @write_requests += 1
      while @readers > 0 || @writers > 0
        @write_ready.wait(@mutex)
      end
      @write_requests -= 1
      @writers += 1
    end
  end

  def release_write_lock
    @mutex.synchronize do
      @writers -= 1
      @write_ready.broadcast
      @read_ready.broadcast
    end
  end
end

coordinator = ReadWriteCoordinator.new
shared_data = "initial"

# Multiple readers
readers = 3.times.map do |i|
  Thread.new do
    coordinator.acquire_read_lock
    puts "Reader #{i}: #{shared_data}"
    sleep(0.1)
    coordinator.release_read_lock
  end
end

# Single writer
writer = Thread.new do
  sleep(0.05)
  coordinator.acquire_write_lock
  shared_data = "updated"
  puts "Writer updated data"
  coordinator.release_write_lock
end

(readers + [writer]).each(&:join)
```
Signal delivery is not guaranteed to be fair or ordered. When multiple threads are waiting on the same condition variable, signal wakes up one thread arbitrarily. Applications requiring specific ordering must implement additional coordination mechanisms.
Broadcast operations wake up all waiting threads simultaneously, but threads must still acquire the mutex sequentially before proceeding. This creates a thundering herd effect where all threads wake up but only one can proceed at a time, potentially causing performance issues with many waiting threads.
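When each state change satisfies exactly one waiter, calling signal once per change sidesteps the herd. The snippet below is a small illustrative sketch; the queue, not_empty, and consumer names are assumptions made for the example.

```ruby
mutex = Mutex.new
not_empty = ConditionVariable.new
queue = []

consumers = 4.times.map do
  Thread.new do
    mutex.synchronize do
      not_empty.wait(mutex) while queue.empty?
      puts "got #{queue.shift}"
    end
  end
end

sleep(0.1) # let the consumers block first
mutex.synchronize do
  4.times do |i|
    queue << i
    not_empty.signal # one targeted wake-up per item instead of a broadcast
  end
end
consumers.each(&:join)
```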
Error Handling & Debugging
ConditionVariable operations can encounter several error conditions that require specific handling strategies. The most common errors involve incorrect mutex usage, deadlocks, and coordination failures between threads.
The primary error occurs when calling wait without holding the associated mutex. Ruby raises a ThreadError when wait is called from a thread that does not own the mutex. This error prevents common programming mistakes that would lead to undefined behavior.
```ruby
# Incorrect usage - raises ThreadError
mutex = Mutex.new
condition = ConditionVariable.new

begin
  condition.wait(mutex) # Not holding the mutex
rescue ThreadError => e
  puts "Error: #{e.message}" # exact message varies by Ruby version
end

# Correct usage
mutex.synchronize do
  condition.wait(mutex, 0.5) # Holding the mutex through synchronize
  # (times out here after 0.5s since nothing signals in this snippet)
end
```
Deadlock detection and prevention requires careful analysis of mutex acquisition order and condition variable dependencies. Circular waiting patterns can develop when multiple condition variables and mutexes interact in complex ways.
```ruby
# Deadlock-prone pattern
mutex1 = Mutex.new
mutex2 = Mutex.new
condition1 = ConditionVariable.new
condition2 = ConditionVariable.new

# Thread A acquires mutex1, then waits for a condition on mutex2
thread_a = Thread.new do
  mutex1.synchronize do
    puts "Thread A has mutex1"
    mutex2.synchronize do
      condition2.wait(mutex2) # Could deadlock: mutex1 is still held
    end
  end
end

# Thread B acquires mutex2, then waits for a condition on mutex1
thread_b = Thread.new do
  mutex2.synchronize do
    puts "Thread B has mutex2"
    mutex1.synchronize do
      condition1.wait(mutex1) # Could deadlock: mutex2 is still held
    end
  end
end

# Better pattern - consistent ordering
def safe_dual_wait(mutex1, mutex2, condition1, condition2)
  # Always acquire the mutexes in the same order
  first, second = [mutex1, mutex2].sort_by(&:object_id)
  first.synchronize do
    second.synchronize do
      if first == mutex1
        condition1.wait(mutex1)
      else
        condition2.wait(mutex2)
      end
    end
  end
  # Note: wait releases only its own mutex, so the other mutex stays held
  # during the wait. Simpler still is one mutex shared by both conditions.
end
```
Timeout handling for condition variable waits prevents threads from waiting indefinitely when expected signals never arrive. ConditionVariable#wait accepts an optional timeout in seconds as its second argument, but its return value does not reliably distinguish a signal from a timeout, so a wrapper typically tracks a deadline and reports which case occurred.
```ruby
# Implementing a timeout-aware wait on top of ConditionVariable
class TimedConditionVariable
  def initialize
    @condition = ConditionVariable.new
  end

  # Returns true if (probably) signaled, false if the timeout elapsed.
  def wait(mutex, timeout = nil)
    if timeout
      timed_wait(mutex, timeout)
    else
      @condition.wait(mutex)
      true
    end
  end

  def signal
    @condition.signal
  end

  def broadcast
    @condition.broadcast
  end

  private

  def timed_wait(mutex, timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @condition.wait(mutex, timeout) # built-in timeout support
    # wait does not report why it returned, so approximate: waking before
    # the deadline counts as a signal, waking at or after it as a timeout.
    Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
  end
end

# Usage with timeout handling
mutex = Mutex.new
timed_condition = TimedConditionVariable.new
result = nil

waiter = Thread.new do
  mutex.synchronize do
    success = timed_condition.wait(mutex, 2.0)
    result = success ? "Signaled" : "Timeout"
  end
end

# Signal after 1 second (should succeed)
signaler = Thread.new do
  sleep(1)
  timed_condition.signal
end

waiter.join
signaler.join
puts result # => "Signaled"
```
Debugging condition variable issues requires tracing thread states and signal patterns. Lost signals occur when signals are sent before threads begin waiting, and missed wakeups happen when threads fail to recheck conditions after spurious wakeups.
Signal ordering problems arise in complex coordination patterns where the sequence of signals affects correctness. Debugging requires logging signal and wait operations with timestamps and thread identifiers to reconstruct the execution timeline.
```ruby
# Debug wrapper for condition variables
class DebuggingConditionVariable
  def initialize(name)
    @name = name
    @condition = ConditionVariable.new
    @signal_count = 0
    @wait_count = 0
  end

  def wait(mutex)
    @wait_count += 1
    log "waiting on #{@name} (wait ##{@wait_count})"
    @condition.wait(mutex)
    log "resumed from #{@name}"
  end

  def signal
    @signal_count += 1
    log "signaling #{@name} (signal ##{@signal_count})"
    @condition.signal
  end

  def broadcast
    @signal_count += 1
    log "broadcasting #{@name} (broadcast ##{@signal_count})"
    @condition.broadcast
  end

  private

  def log(message)
    puts "[#{Time.now.strftime('%H:%M:%S.%3N')}] #{Thread.current.object_id} #{message}"
  end
end
```
Performance & Memory
ConditionVariable operations have specific performance characteristics that affect application scalability and resource usage. The wait operation involves thread context switches and kernel calls that can impact performance in high-frequency coordination scenarios.
Signal operations are generally fast since they only mark waiting threads as ready to run without performing immediate thread switches. However, broadcast operations can cause thundering herd problems when many threads are waiting, as all threads wake up simultaneously and compete for the same mutex.
```ruby
# Performance comparison of signal vs broadcast
require 'benchmark'

# Block num_threads waiters on one condition variable, then measure how long
# it takes to wake them all with repeated signal calls versus one broadcast.
def benchmark_wakeup(num_threads, use_broadcast)
  mutex = Mutex.new
  condition = ConditionVariable.new
  released = false

  threads = num_threads.times.map do
    Thread.new do
      mutex.synchronize do
        condition.wait(mutex) until released
      end
    end
  end

  sleep(0.1) # give every waiter time to block

  Benchmark.realtime do
    mutex.synchronize { released = true }
    if use_broadcast
      condition.broadcast # one call wakes the whole herd
    else
      # Keep signaling until every waiter has resumed; robust even if a
      # thread had not finished blocking when measurement started.
      until threads.none?(&:alive?)
        condition.signal
        Thread.pass
      end
    end
    threads.each(&:join)
  end
end

puts "Signal approach:    #{benchmark_wakeup(20, false).round(6)}s"
puts "Broadcast approach: #{benchmark_wakeup(20, true).round(6)}s"
```
Memory usage for condition variables is minimal for the objects themselves, but waiting threads consume stack memory and system resources. Each waiting thread maintains its stack and thread control block, which can become significant with many waiting threads.
Context switching overhead becomes prominent when threads frequently wait and signal on condition variables. Each wait operation involves switching from user space to kernel space and back, with associated CPU cache effects and scheduling overhead.
```ruby
# Memory-efficient producer-consumer with bounded waiting
class BoundedBuffer
  def initialize(capacity, max_waiters = 10)
    @buffer = []
    @capacity = capacity
    @max_waiters = max_waiters
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
    @not_full = ConditionVariable.new
    @waiting_producers = 0
    @waiting_consumers = 0
  end

  def put(item)
    @mutex.synchronize do
      raise "Too many waiting producers" if @waiting_producers >= @max_waiters
      @waiting_producers += 1
      begin
        while @buffer.length >= @capacity
          @not_full.wait(@mutex)
        end
        @buffer << item
        @not_empty.signal
      ensure
        @waiting_producers -= 1
      end
    end
  end

  def take
    @mutex.synchronize do
      raise "Too many waiting consumers" if @waiting_consumers >= @max_waiters
      @waiting_consumers += 1
      begin
        while @buffer.empty?
          @not_empty.wait(@mutex)
        end
        item = @buffer.shift
        @not_full.signal
        item
      ensure
        @waiting_consumers -= 1
      end
    end
  end

  def stats
    @mutex.synchronize do
      {
        buffer_size: @buffer.length,
        waiting_producers: @waiting_producers,
        waiting_consumers: @waiting_consumers
      }
    end
  end
end
```
Lock contention can become severe when many threads compete for the same mutex associated with a condition variable. This creates bottlenecks where threads spend more time waiting for mutex access than performing actual work.
Batching operations can improve performance by reducing the frequency of wait and signal operations. Instead of signaling after each individual change, threads can accumulate changes and signal periodically or when thresholds are met.
```ruby
# Batched signaling for better performance
class BatchedWorkQueue
  def initialize(batch_size = 5)
    @queue = []
    @batch_size = batch_size
    @mutex = Mutex.new
    @work_available = ConditionVariable.new
    @pending_signals = 0
  end

  def add_work(item)
    @mutex.synchronize do
      @queue << item
      @pending_signals += 1
      # Signal in batches to reduce contention
      if @pending_signals >= @batch_size
        @work_available.broadcast
        @pending_signals = 0
      end
    end
  end

  def get_work(timeout = 1.0)
    @mutex.synchronize do
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
      while @queue.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @work_available.wait(@mutex, remaining) # never block past the deadline
      end
      @queue.shift
    end
  end

  def flush_signals
    @mutex.synchronize do
      if @pending_signals > 0
        @work_available.broadcast
        @pending_signals = 0
      end
    end
  end
end
```
CPU usage patterns with condition variables depend heavily on the wait-to-work ratio. Applications that wait frequently relative to processing time will show low CPU utilization, while those with short waits and long processing periods will maintain higher CPU usage.
Profiling condition variable usage requires monitoring thread states, wait times, and signal frequencies. High-frequency signaling with short waits can indicate inefficient coordination patterns that might benefit from alternative synchronization approaches.
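One way to collect those numbers is a thin wrapper that times each wait call. The class below is an illustrative sketch, not a standard API; it follows the same mutex-ownership rules as ConditionVariable itself.

```ruby
class ProfiledConditionVariable
  def initialize
    @condition = ConditionVariable.new
    @total_wait = 0.0
    @wait_calls = 0
  end

  # Caller must hold `mutex`, exactly as with a plain ConditionVariable,
  # so the counters below are updated under that same mutex.
  def wait(mutex, timeout = nil)
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    @condition.wait(mutex, timeout)
  ensure
    @total_wait += Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    @wait_calls += 1
  end

  def signal
    @condition.signal
  end

  def broadcast
    @condition.broadcast
  end

  # Average blocked time per wait call
  def average_wait
    @wait_calls.zero? ? 0.0 : @total_wait / @wait_calls
  end
end
```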
Common Pitfalls
Lost signal problems occur when threads call signal before any threads are waiting on the condition variable. Unlike some synchronization primitives, condition variables do not queue signals, so signals sent to condition variables with no waiting threads are lost permanently.
```ruby
# Lost signal scenario
mutex = Mutex.new
condition = ConditionVariable.new
flag = false

# Producer signals before any consumer is waiting
producer = Thread.new do
  mutex.synchronize do
    flag = true
    condition.signal # No thread is waiting, so this signal is discarded
  end
end
producer.join

# A consumer that waits unconditionally never sees the earlier signal
consumer = Thread.new do
  mutex.synchronize do
    condition.wait(mutex) # Blocks forever - signals are not queued
    puts "Consumer proceeding"
  end
end
# consumer.join # Would block indefinitely

# Guarding the wait with the shared state avoids the problem:
#   condition.wait(mutex) while !flag
```
The spurious wakeup problem requires condition checks in loops rather than simple if statements. Threads can wake up from wait without an explicit signal, and multiple threads might wake up from a single signal when only one should proceed.
```ruby
# Incorrect - using if instead of while
mutex = Mutex.new
condition = ConditionVariable.new
resource_available = false

# Wrong approach
Thread.new do
  mutex.synchronize do
    if !resource_available # Should be 'while'
      condition.wait(mutex)
    end
    # Might execute even when resource_available is still false
    puts "Using resource"
  end
end

# Correct approach
Thread.new do
  mutex.synchronize do
    while !resource_available # Recheck the condition after every wakeup
      condition.wait(mutex)
    end
    puts "Using resource safely"
  end
end
```
Mutex ownership violations happen when threads call wait on a condition variable without properly holding the associated mutex. This creates undefined behavior and Ruby raises ThreadError to prevent these mistakes.
```ruby
# Mutex ownership pitfall
mutex = Mutex.new
condition = ConditionVariable.new

# Wrong - releasing the mutex before wait
Thread.new do
  mutex.lock
  puts "Have lock"
  mutex.unlock
  begin
    condition.wait(mutex) # Error - this thread no longer owns the mutex
  rescue ThreadError => e
    puts "Cannot wait: #{e.message}"
  end
end

# Correct - keep mutex ownership across the wait
Thread.new do
  mutex.synchronize do
    puts "Have lock through synchronize"
    condition.wait(mutex) # Safe - mutex held via synchronize
    # (No signaler in this snippet; shown only for the ownership pattern)
  end
end
```
The thundering herd problem occurs with broadcast operations when many threads are waiting and all wake up simultaneously to compete for the same resources. This creates unnecessary context switching and contention.
```ruby
# Thundering herd with broadcast
mutex = Mutex.new
condition = ConditionVariable.new
ready = false
shared_counter = 0

# Many threads waiting for the same resource
waiters = 100.times.map do
  Thread.new do
    got_resource = false
    mutex.synchronize do
      while !ready
        condition.wait(mutex)
      end
      # All 100 threads wake up, but only one finds a resource
      if shared_counter > 0
        shared_counter -= 1
        got_resource = true
      end
    end
    puts "Thread #{Thread.current.object_id} got resource" if got_resource
  end
end

# Producer broadcasts to all
Thread.new do
  sleep(0.1)
  mutex.synchronize do
    ready = true
    shared_counter = 1 # Only one resource available
    condition.broadcast # Wakes all 100 threads unnecessarily
  end
end

waiters.each(&:join)
```
Race conditions can develop between condition checks and wait calls when threads don't properly maintain mutex ownership throughout the entire check-and-wait sequence. Interrupting threads between condition evaluation and wait can cause missed signals.
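A sketch of that race follows, with illustrative ready and condition names; releasing the mutex between the check and the wait opens a window in which the signal can be lost.

```ruby
mutex = Mutex.new
condition = ConditionVariable.new
ready = false

# BROKEN: the state is checked under the mutex, but the mutex is released
# before wait is called, so a signal can land in the gap and be lost.
risky_waiter = Thread.new do
  needs_wait = mutex.synchronize { !ready }
  # <- another thread may set ready = true and signal right here
  mutex.synchronize do
    condition.wait(mutex) if needs_wait # may now block forever
  end
end

# CORRECT: check and wait inside one synchronize block, rechecking in a loop
safe_waiter = Thread.new do
  mutex.synchronize do
    condition.wait(mutex) until ready
  end
end

Thread.new do
  mutex.synchronize do
    ready = true
    condition.broadcast
  end
end

safe_waiter.join # risky_waiter is deliberately not joined; it can hang
```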
Nested condition variables create complex coordination patterns that are prone to deadlocks and missed signals. When multiple condition variables depend on overlapping shared state, the interaction between different waiting and signaling patterns can become unpredictable.
```ruby
# Dangerous nested condition pattern
class ProblematicCoordinator
  def initialize
    @state1 = false
    @state2 = false
    @mutex1 = Mutex.new
    @mutex2 = Mutex.new
    @condition1 = ConditionVariable.new
    @condition2 = ConditionVariable.new
  end

  def wait_for_both
    # Problematic - nested mutex acquisition with different orders
    @mutex1.synchronize do
      while !@state1
        @condition1.wait(@mutex1)
      end
      @mutex2.synchronize do
        while !@state2
          @condition2.wait(@mutex2) # Potential deadlock: @mutex1 is still held
        end
      end
    end
  end

  def signal_both
    # Different order of acquisition
    @mutex2.synchronize do
      @state2 = true
      @condition2.signal
    end
    @mutex1.synchronize do
      @state1 = true
      @condition1.signal
    end
  end
end
```
Signal starvation happens when specific threads consistently miss signals due to scheduling or timing issues. Fair queueing is not guaranteed with condition variables, so some threads might wait indefinitely while others consistently receive signals.
Memory leaks can occur with condition variables when threads wait indefinitely due to programming errors, consuming system resources without bound. Proper timeout handling and resource cleanup help prevent these scenarios.
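As a sketch of such timeout handling, the optional timeout argument to wait can bound how long a thread stays blocked; the deadline and ready names below are illustrative.

```ruby
mutex = Mutex.new
condition = ConditionVariable.new
ready = false

satisfied = mutex.synchronize do
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 0.5
  loop do
    break true if ready
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    break false if remaining <= 0
    condition.wait(mutex, remaining) # never blocks past the deadline
  end
end

puts satisfied ? "condition met" : "gave up after timeout"
# => "gave up after timeout" (nothing signals in this standalone sketch)
```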
Reference
Core Classes and Methods
| Method | Parameters | Returns | Description |
|---|---|---|---|
| ConditionVariable.new | None | ConditionVariable | Creates a new condition variable |
| #wait(mutex, timeout = nil) | mutex (Mutex), optional timeout in seconds | self | Waits for a signal while releasing the mutex; returns after timeout seconds even without a signal |
| #signal | None | self | Wakes up one waiting thread |
| #broadcast | None | self | Wakes up all waiting threads |
Error Types
| Error | Condition | Description |
|---|---|---|
| ThreadError | wait without mutex ownership | Raised when calling wait without holding the mutex |
| ThreadError | Mutex mismatch | Raised when wait is called with the wrong mutex |
Thread States
| State | Description | Transition |
|---|---|---|
| Running | Thread executing normally | wait → Waiting |
| Waiting | Thread blocked on condition variable | signal/broadcast → Ready |
| Ready | Thread ready to run, waiting for mutex | Mutex acquired → Running |
Usage Patterns
| Pattern | Use Case | Key Characteristics |
|---|---|---|
| Producer-Consumer | Thread coordination with shared buffer | Wait when full/empty, signal on changes |
| Reader-Writer | Multiple readers, exclusive writers | Multiple condition variables for different states |
| Barrier | Wait for multiple threads to reach a point | Broadcast when all threads are ready (see the sketch below) |
| Pool | Resource management with limited resources | Signal when resources become available |
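The barrier row is the one pattern not demonstrated earlier. The sketch below is a minimal illustrative implementation; SimpleBarrier and its generation counter are assumptions made for this example, not a standard Ruby class.

```ruby
# A minimal barrier: threads block until `parties` of them have arrived.
class SimpleBarrier
  def initialize(parties)
    @parties = parties
    @arrived = 0
    @generation = 0 # lets the barrier be reused across rounds
    @mutex = Mutex.new
    @condition = ConditionVariable.new
  end

  def wait
    @mutex.synchronize do
      generation = @generation
      @arrived += 1
      if @arrived == @parties
        @arrived = 0
        @generation += 1
        @condition.broadcast # the last arrival releases everyone
      else
        @condition.wait(@mutex) while generation == @generation
      end
    end
  end
end

barrier = SimpleBarrier.new(3)
3.times.map { |i|
  Thread.new do
    sleep(rand * 0.1)
    puts "thread #{i} arrived"
    barrier.wait
    puts "thread #{i} released"
  end
}.each(&:join)
```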
Best Practices
| Practice | Rationale | Implementation |
|---|---|---|
| Always check conditions in loops | Handles spurious wakeups | cv.wait(mutex) until condition |
| Hold the mutex during the entire check-wait sequence | Prevents race conditions | Use a mutex.synchronize block |
| Signal after state changes | Ensures waiting threads see updates | Modify state, then call signal |
| Use consistent mutex ordering | Prevents deadlocks | Always acquire mutexes in the same order |
Performance Considerations
| Factor | Impact | Mitigation |
|---|---|---|
| Context switching overhead | High with frequent wait/signal | Batch operations, reduce signaling frequency |
| Thundering herd with broadcast | CPU spikes, contention | Use signal instead of broadcast when possible |
| Lock contention | Serialized access to shared state | Minimize critical section duration |
| Memory usage per waiting thread | System resource consumption | Limit the number of concurrently waiting threads |
Common Error Patterns
| Anti-Pattern | Problem | Solution |
|---|---|---|
| if instead of while for condition checks | Spurious wakeups, race conditions | Always use while loops |
| Signaling without a state change | Wasted wakeups, confusion | Only signal after meaningful state changes |
| Multiple mutexes with one condition variable | Deadlock potential, complexity | Use a single mutex per condition variable |
| Broadcast for a single resource | Thundering herd effect | Use signal for exclusive resources |
Integration Examples
| Framework | Usage Pattern | Key Points |
|---|---|---|
| Thread pools | Work distribution | Signal when new work arrives |
| Event systems | Notification coordination | Broadcast for multiple subscribers |
| Cache invalidation | Coordinated refresh | Signal when the cache needs refreshing |
| Database connections | Pool management | Signal when connections become available |