Overview
Condition variables are synchronization primitives that allow threads to suspend execution and release a lock atomically, waiting for a condition to be signaled by another thread. They solve a fundamental problem in concurrent programming: how to coordinate threads when one thread must wait for a specific state change that another thread will cause.
Without condition variables, threads must use busy-waiting (continuously checking a condition in a loop), which wastes CPU cycles. Condition variables provide an efficient mechanism for threads to sleep until notified, enabling event-driven synchronization where threads wait for specific events rather than polling for state changes.
Condition variables always work in conjunction with a mutex. The mutex protects the shared state that defines the condition, while the condition variable provides the wait and notification mechanism. This pairing ensures that checking the condition and waiting on it occurs atomically, preventing race conditions.
require 'thread'
mutex = Mutex.new
condition = ConditionVariable.new
queue = []
# Producer thread
Thread.new do
10.times do |i|
mutex.synchronize do
queue << i
condition.signal # Notify waiting consumer
end
sleep 0.1
end
end
# Consumer thread
Thread.new do
10.times do
mutex.synchronize do
while queue.empty?
condition.wait(mutex) # Release mutex and wait
end
item = queue.shift
puts "Consumed: #{item}"
end
end
end
sleep 2
The condition variable bridges the gap between thread communication and state synchronization. When a thread waits on a condition variable, it atomically releases its held mutex and enters a waiting state. When another thread signals the condition variable, one or more waiting threads wake up, reacquire the mutex, and continue execution. This atomic release-and-wait operation prevents the race condition where a notification could be lost between checking the condition and waiting.
Key Principles
Condition variables operate on three fundamental operations: wait, signal, and broadcast. Each operation has specific semantics that maintain the correctness of concurrent programs.
The wait operation is the most complex. When a thread calls wait on a condition variable, three things happen atomically: the thread checks that it holds the associated mutex, releases the mutex, and enters a sleep state on the condition variable's wait queue. The atomicity is critical because it prevents the window where a thread could release the mutex and another thread could signal the condition before the first thread enters the wait state. When the waiting thread is awakened, it must reacquire the mutex before returning from the wait call.
mutex = Mutex.new
cv = ConditionVariable.new
ready = false
# Waiting thread
Thread.new do
mutex.synchronize do
while !ready # Always use while loop, not if
cv.wait(mutex) # Atomically releases mutex and waits
end
# Mutex is held again here
puts "Condition satisfied"
end
end
# Signaling thread
Thread.new do
sleep 0.5
mutex.synchronize do
ready = true
cv.signal # Wake one waiting thread
end
end
sleep 1
The signal operation wakes exactly one thread from the condition variable's wait queue. If no threads are waiting, the signal has no effect and is lost. The awakened thread does not immediately resume execution; it first must reacquire the mutex that it released during the wait call. This means if the signaling thread still holds the mutex when it calls signal, the awakened thread will block waiting for the mutex.
The broadcast operation wakes all threads waiting on the condition variable. This is necessary when the state change that occurred might satisfy conditions for multiple waiting threads, or when the condition needs to be reevaluated by all waiters. After a broadcast, all awakened threads compete to reacquire the mutex, and only one thread at a time can proceed while others wait for the mutex.
mutex = Mutex.new
cv = ConditionVariable.new
counter = 0
threshold = 5
# Multiple waiting threads
3.times do |i|
Thread.new do
mutex.synchronize do
while counter < threshold
puts "Thread #{i} waiting"
cv.wait(mutex)
end
puts "Thread #{i} proceeding with counter=#{counter}"
end
end
end
# Thread that updates condition
Thread.new do
sleep 1
mutex.synchronize do
counter = 10
cv.broadcast # Wake all waiting threads
end
end
sleep 2
Spurious wakeups are a critical concept with condition variables. A waiting thread can wake up from a wait call even when no signal or broadcast occurred. This happens due to implementation details in the underlying operating system or runtime. For this reason, wait calls must always be in a while loop that rechecks the actual condition, never in a simple if statement. The pattern is always:
mutex.synchronize do
while !condition_is_true
cv.wait(mutex)
end
# Proceed with work
end
The predicate (condition being tested) must be protected by the same mutex used with the condition variable. This ensures that checking the condition and waiting on the condition variable are atomic with respect to threads that modify the condition and signal the condition variable. Without this protection, a race condition exists where a thread could check the condition (finding it false), then be preempted before calling wait, while another thread makes the condition true and signals, causing the signal to be lost.
Ruby Implementation
Ruby provides the ConditionVariable class in the standard library, which works with Mutex objects to provide thread synchronization. The implementation follows the standard condition variable semantics but with Ruby's threading model built on native system threads.
The basic API consists of three methods: wait, signal, and broadcast. The wait method accepts a mutex parameter and optionally a timeout value. The method releases the mutex, waits for a signal or timeout, then reacquires the mutex before returning.
require 'thread'
mutex = Mutex.new
cv = ConditionVariable.new
data = []
max_size = 5
# Producer with bounded buffer
producer = Thread.new do
10.times do |i|
mutex.synchronize do
while data.size >= max_size
puts "Producer waiting - buffer full"
cv.wait(mutex)
end
data << i
puts "Produced: #{i}, buffer size: #{data.size}"
cv.signal # Signal consumer
end
sleep rand(0.1..0.3)
end
end
# Consumer
consumer = Thread.new do
10.times do
mutex.synchronize do
while data.empty?
puts "Consumer waiting - buffer empty"
cv.wait(mutex)
end
item = data.shift
puts "Consumed: #{item}, buffer size: #{data.size}"
cv.signal # Signal producer
end
sleep rand(0.1..0.3)
end
end
producer.join
consumer.join
Ruby's ConditionVariable#wait accepts an optional timeout parameter. When a timeout is specified, the method returns even if no signal occurred, and the return value indicates whether a timeout occurred. This allows implementing bounded waiting without deadlock risk.
mutex = Mutex.new
cv = ConditionVariable.new
resource_available = false
Thread.new do
mutex.synchronize do
timeout_occurred = false
until resource_available || timeout_occurred
timeout_occurred = !cv.wait(mutex, 2.0) # Wait up to 2 seconds
if timeout_occurred && !resource_available
puts "Timeout waiting for resource"
# Handle timeout case
break
end
end
if resource_available
puts "Resource acquired"
end
end
end
sleep 1
mutex.synchronize do
resource_available = true
cv.signal
end
sleep 3
The broadcast method wakes all waiting threads, which then compete for the mutex. This is essential when multiple threads might be affected by a state change or when implementing barriers where all threads must reach a synchronization point.
mutex = Mutex.new
cv = ConditionVariable.new
barrier_count = 0
thread_count = 4
threads = thread_count.times.map do |i|
Thread.new do
# Phase 1: Do some work
sleep rand(0.5..1.5)
puts "Thread #{i} reached barrier"
# Wait at barrier
mutex.synchronize do
barrier_count += 1
if barrier_count == thread_count
# Last thread to arrive
cv.broadcast # Release all waiting threads
else
while barrier_count < thread_count
cv.wait(mutex)
end
end
end
# Phase 2: Continue after all threads synchronized
puts "Thread #{i} past barrier"
end
end
threads.each(&:join)
Ruby's condition variables integrate with the Mutex class to ensure proper synchronization. The mutex must be locked when calling wait, and wait atomically unlocks it. The mutex is always relocked before wait returns, even if an exception is raised during the wait.
class ThreadSafeQueue
def initialize
@mutex = Mutex.new
@cv = ConditionVariable.new
@queue = []
@closed = false
end
def push(item)
@mutex.synchronize do
raise "Queue closed" if @closed
@queue << item
@cv.signal # Wake one waiting consumer
end
end
def pop
@mutex.synchronize do
while @queue.empty? && !@closed
@cv.wait(@mutex)
end
if @queue.empty? && @closed
nil # Queue drained and closed
else
@queue.shift
end
end
end
def close
@mutex.synchronize do
@closed = true
@cv.broadcast # Wake all waiting consumers
end
end
end
queue = ThreadSafeQueue.new
# Producer threads
3.times do |i|
Thread.new do
5.times do |j|
queue.push("Item-#{i}-#{j}")
sleep 0.1
end
end
end
# Consumer threads
2.times do |i|
Thread.new do
loop do
item = queue.pop
break if item.nil?
puts "Consumer #{i} got: #{item}"
end
end
end
sleep 2
queue.close
sleep 1
Practical Examples
The producer-consumer pattern is the canonical use case for condition variables. Multiple producers generate data and multiple consumers process it, with a bounded buffer between them. Condition variables coordinate when the buffer is full or empty.
class BoundedBuffer
def initialize(capacity)
@capacity = capacity
@mutex = Mutex.new
@not_full = ConditionVariable.new
@not_empty = ConditionVariable.new
@buffer = []
end
def produce(item)
@mutex.synchronize do
while @buffer.size >= @capacity
@not_full.wait(@mutex)
end
@buffer << item
@not_empty.signal
end
end
def consume
@mutex.synchronize do
while @buffer.empty?
@not_empty.wait(@mutex)
end
item = @buffer.shift
@not_full.signal
item
end
end
end
buffer = BoundedBuffer.new(3)
# Multiple producers
producers = 2.times.map do |id|
Thread.new do
5.times do |i|
item = "P#{id}-#{i}"
buffer.produce(item)
puts "Produced: #{item}"
sleep rand(0.1..0.5)
end
end
end
# Multiple consumers
consumers = 3.times.map do |id|
Thread.new do
4.times do
item = buffer.consume
puts "Consumer #{id} consumed: #{item}"
sleep rand(0.2..0.6)
end
end
end
(producers + consumers).each(&:join)
A read-write lock implementation demonstrates multiple condition variables working together. Multiple readers can hold the lock simultaneously, but writers need exclusive access. Condition variables coordinate when readers can proceed and when writers can proceed.
class ReadWriteLock
def initialize
@mutex = Mutex.new
@readers_cv = ConditionVariable.new
@writers_cv = ConditionVariable.new
@readers = 0
@writers = 0
@write_requests = 0
end
def read_lock
@mutex.synchronize do
# Wait while any writer is active or waiting
while @writers > 0 || @write_requests > 0
@readers_cv.wait(@mutex)
end
@readers += 1
end
end
def read_unlock
@mutex.synchronize do
@readers -= 1
if @readers == 0
@writers_cv.signal # Wake a waiting writer
end
end
end
def write_lock
@mutex.synchronize do
@write_requests += 1
# Wait while any readers or writers are active
while @readers > 0 || @writers > 0
@writers_cv.wait(@mutex)
end
@write_requests -= 1
@writers += 1
end
end
def write_unlock
@mutex.synchronize do
@writers -= 1
# Prefer waiting writers, but if none, wake readers
if @write_requests > 0
@writers_cv.signal
else
@readers_cv.broadcast
end
end
end
end
lock = ReadWriteLock.new
counter = 0
# Reader threads
readers = 5.times.map do |id|
Thread.new do
3.times do
lock.read_lock
value = counter
puts "Reader #{id} read: #{value}"
sleep rand(0.1..0.2)
lock.read_unlock
sleep rand(0.1..0.3)
end
end
end
# Writer threads
writers = 2.times.map do |id|
Thread.new do
3.times do
lock.write_lock
counter += 1
puts "Writer #{id} wrote: #{counter}"
sleep rand(0.1..0.2)
lock.write_unlock
sleep rand(0.2..0.4)
end
end
end
(readers + writers).each(&:join)
Thread pools use condition variables to coordinate between worker threads waiting for tasks and a scheduler distributing work. Workers wait when no tasks are available, and the scheduler signals when new work arrives.
class ThreadPool
def initialize(size)
@mutex = Mutex.new
@cv = ConditionVariable.new
@queue = []
@shutdown = false
@workers = size.times.map do |id|
Thread.new { worker_loop(id) }
end
end
def submit(&block)
@mutex.synchronize do
raise "Pool shutdown" if @shutdown
@queue << block
@cv.signal # Wake one worker
end
end
def shutdown
@mutex.synchronize do
@shutdown = true
@cv.broadcast # Wake all workers to exit
end
@workers.each(&:join)
end
private
def worker_loop(id)
loop do
task = nil
@mutex.synchronize do
while @queue.empty? && !@shutdown
@cv.wait(@mutex)
end
return if @shutdown && @queue.empty?
task = @queue.shift unless @queue.empty?
end
if task
begin
task.call
rescue => e
puts "Worker #{id} error: #{e.message}"
end
end
end
end
end
pool = ThreadPool.new(3)
10.times do |i|
pool.submit do
puts "Task #{i} executing on thread #{Thread.current.object_id}"
sleep rand(0.5..1.0)
puts "Task #{i} completed"
end
end
sleep 6
pool.shutdown
Common Patterns
The wait-in-loop pattern is fundamental to correct condition variable usage. The condition must be rechecked after waking from wait because of spurious wakeups and because another thread might have consumed the state change between the signal and the current thread reacquiring the mutex.
# CORRECT: Always use while loop
mutex.synchronize do
while !condition_satisfied?
cv.wait(mutex)
end
# Safe to proceed
end
# INCORRECT: Never use if statement
mutex.synchronize do
if !condition_satisfied? # Wrong!
cv.wait(mutex)
end
# Condition might not be satisfied here
end
The signal-under-lock pattern keeps the mutex held when calling signal or broadcast. This ensures the condition variable operation is atomic with the state change. While it is legal to signal without holding the mutex, doing so can lead to subtle race conditions and missed wakeups.
# Preferred pattern: signal while holding mutex
mutex.synchronize do
state_changed = true
cv.signal # Mutex still held
end
# Acceptable but can miss wakeups
state_changed = true
cv.signal # No mutex - potential race
The predicate-encapsulation pattern wraps condition checking in a method to ensure consistent condition evaluation. This prevents errors where different parts of code check the condition differently.
class SharedState
def initialize
@mutex = Mutex.new
@cv = ConditionVariable.new
@count = 0
@target = 10
end
def wait_for_target
@mutex.synchronize do
while !target_reached?
@cv.wait(@mutex)
end
end
end
def increment
@mutex.synchronize do
@count += 1
cv.signal if target_reached?
end
end
private
def target_reached?
@count >= @target
end
end
The timeout-with-predicate pattern combines timeout waits with predicate rechecking to handle both spurious wakeups and actual timeouts correctly.
def wait_with_timeout(mutex, cv, timeout)
deadline = Time.now + timeout
mutex.synchronize do
until condition_satisfied?
remaining = deadline - Time.now
return false if remaining <= 0 # Timeout expired
# Wait might return early (spurious wakeup)
cv.wait(mutex, remaining)
end
true # Condition satisfied
end
end
The broadcast-on-state-change pattern uses broadcast when a state change might affect multiple waiting threads, even if the application logic expects only one thread to proceed. This prevents deadlocks when the assumption about single waiters is wrong.
class StateManager
def initialize
@mutex = Mutex.new
@cv = ConditionVariable.new
@state = :initial
end
def transition_to(new_state)
@mutex.synchronize do
@state = new_state
@cv.broadcast # Wake all - let them recheck conditions
end
end
def wait_for_state(desired_state)
@mutex.synchronize do
while @state != desired_state
@cv.wait(@mutex)
end
end
end
end
The two-phase-wait pattern uses two condition variables to coordinate bidirectional synchronization, where each side waits for the other and signals back when ready.
class Rendezvous
def initialize
@mutex = Mutex.new
@cv_a = ConditionVariable.new
@cv_b = ConditionVariable.new
@a_ready = false
@b_ready = false
end
def arrive_a(data)
@mutex.synchronize do
@data_from_a = data
@a_ready = true
@cv_b.signal # Tell B that A is ready
while !@b_ready
@cv_a.wait(@mutex) # Wait for B
end
@data_from_b
end
end
def arrive_b(data)
@mutex.synchronize do
@data_from_b = data
@b_ready = true
@cv_a.signal # Tell A that B is ready
while !@a_ready
@cv_b.wait(@mutex) # Wait for A
end
@data_from_a
end
end
end
Common Pitfalls
Lost wakeup occurs when a signal happens before a thread calls wait. Since signals are not queued, the waiting thread never wakes up. This happens when the condition is checked and found false, but before wait is called, another thread makes the condition true and signals.
# INCORRECT: Race condition leading to lost wakeup
if !condition # Check outside mutex
mutex.synchronize do
cv.wait(mutex) # Signal might have occurred between check and wait
end
end
# CORRECT: Check condition while holding mutex
mutex.synchronize do
while !condition
cv.wait(mutex) # Check and wait are atomic
end
end
Spurious wakeup ignorance happens when code uses if instead of while for condition checking. The thread wakes, assumes the condition is true, and proceeds incorrectly. This leads to race conditions where state assumptions are violated.
# INCORRECT: Vulnerable to spurious wakeups
mutex.synchronize do
if queue.empty?
cv.wait(mutex)
end
item = queue.shift # Might still be empty!
end
# CORRECT: Recheck after wakeup
mutex.synchronize do
while queue.empty?
cv.wait(mutex)
end
item = queue.shift # Guaranteed non-empty
end
Deadlock from signal-without-broadcast occurs when multiple threads wait for a condition but only one signal is sent when the condition benefits multiple threads. The signaled thread might not be the one that should proceed, and it doesn't signal others.
# INCORRECT: Can deadlock with multiple waiters
mutex.synchronize do
shared_resource_available = true
cv.signal # Only wakes one thread, others stuck
end
# CORRECT: Wake all when multiple might proceed
mutex.synchronize do
shared_resource_available = true
cv.broadcast # All threads recheck condition
end
Mutex-mismatch occurs when the mutex used with wait differs from the mutex protecting the condition. This breaks atomicity and allows race conditions.
# INCORRECT: Different mutexes
mutex1 = Mutex.new
mutex2 = Mutex.new
cv = ConditionVariable.new
Thread.new do
mutex1.synchronize do
while !ready
cv.wait(mutex2) # Wrong mutex!
end
end
end
Thread.new do
mutex2.synchronize do
ready = true
cv.signal
end
end
The forgotten-signal pitfall happens when code modifies the condition but forgets to signal the condition variable. Waiting threads remain blocked indefinitely even though their condition is satisfied.
class Counter
def initialize
@mutex = Mutex.new
@cv = ConditionVariable.new
@count = 0
end
# INCORRECT: Missing signal
def increment_bad
@mutex.synchronize do
@count += 1
# Forgot to signal!
end
end
# CORRECT: Signal after state change
def increment_good
@mutex.synchronize do
@count += 1
@cv.signal
end
end
def wait_for(target)
@mutex.synchronize do
while @count < target
@cv.wait(@mutex)
end
end
end
end
Releasing-mutex-early happens when code manually unlocks the mutex before the condition variable operation completes, breaking the atomic release-and-wait contract.
# INCORRECT: Manual unlock breaks atomicity
mutex.lock
if !condition
mutex.unlock # Wrong!
cv.wait(mutex) # Will raise error - mutex not held
end
# CORRECT: Let wait handle mutex atomically
mutex.synchronize do
while !condition
cv.wait(mutex) # Handles unlock and relock
end
end
Condition-without-state occurs when using condition variables for pure signaling without shared state. This fails because signals are not queued - if no thread is waiting, the signal is lost.
# INCORRECT: No shared state
cv = ConditionVariable.new
mutex = Mutex.new
Thread.new do
cv.signal # Lost if no one waiting yet
end
Thread.new do
mutex.synchronize do
cv.wait(mutex) # Might wait forever
end
end
# CORRECT: Use shared state
cv = ConditionVariable.new
mutex = Mutex.new
signaled = false
Thread.new do
mutex.synchronize do
signaled = true
cv.signal
end
end
Thread.new do
mutex.synchronize do
while !signaled
cv.wait(mutex)
end
end
end
Reference
Core Operations
| Operation | Description | Mutex Requirement |
|---|---|---|
| wait(mutex) | Release mutex atomically and wait for signal | Must hold mutex |
| wait(mutex, timeout) | Wait with timeout in seconds, returns false on timeout | Must hold mutex |
| signal | Wake one waiting thread | Recommended to hold mutex |
| broadcast | Wake all waiting threads | Recommended to hold mutex |
State Transitions
| Thread State | Trigger | Next State | Mutex Status |
|---|---|---|---|
| Running | Calls wait | Waiting | Released |
| Waiting | Receives signal | Ready | Attempting reacquire |
| Waiting | Timeout expires | Ready | Attempting reacquire |
| Ready | Acquires mutex | Running | Held |
Wait Loop Template
mutex.synchronize do
while !condition_predicate?
cv.wait(mutex)
end
# Condition guaranteed true here
# Mutex is held
end
Signal Under Lock Template
mutex.synchronize do
modify_shared_state()
if single_waiter_expected?
cv.signal
else
cv.broadcast
end
end
Timeout Wait Template
mutex.synchronize do
deadline = Time.now + timeout_seconds
until condition_satisfied?
remaining = deadline - Time.now
return :timeout if remaining <= 0
cv.wait(mutex, remaining)
end
:success
end
Common Synchronization Patterns
| Pattern | Use Case | Condition Variables |
|---|---|---|
| Producer-Consumer | Bounded buffer coordination | Two: not_full and not_empty |
| Read-Write Lock | Multiple readers or exclusive writer | Two: readers and writers |
| Barrier | Wait for all threads to reach point | One: barrier_cv |
| Thread Pool | Workers wait for tasks | One: task_available |
| Semaphore | Resource counting | One: resource_available |
Decision Matrix
| Scenario | Use Signal | Use Broadcast |
|---|---|---|
| Single waiter expected | Yes | Optional |
| Multiple potential waiters | No | Yes |
| All waiters should recheck | No | Yes |
| Implementing barrier | No | Yes |
| Queue with one consumer | Yes | No |
| State change affects many | No | Yes |
| Shutdown notification | No | Yes |
Debugging Checklist
| Check | Correct Behavior | Common Error |
|---|---|---|
| Wait in loop | while loop used | if statement used |
| Mutex consistency | Same mutex for condition and state | Different mutexes |
| Signal presence | Signal called after state change | Signal forgotten |
| Predicate existence | Shared state exists | Pure signaling without state |
| Timeout handling | Recheck condition after timeout | Assume timeout means failure |
| Exception safety | Mutex reacquired on exception | Manual unlock |
Ruby-Specific Details
| Aspect | Behavior | Notes |
|---|---|---|
| Thread safety | Internally synchronized | Safe for concurrent use |
| Exception handling | Mutex reacquired even on exception | Always cleanup-safe |
| Timeout precision | System-dependent | Use Time.now for accuracy |
| Signal semantics | Lost if no waiters | Not queued |
| Broadcast cost | Wakes all waiters | Consider thundering herd |
| GVL interaction | Releases GVL during wait | Allows other Ruby threads to run |
Performance Characteristics
| Operation | Relative Cost | Scalability |
|---|---|---|
| signal | Low | O(1) |
| broadcast | Medium | O(number of waiters) |
| wait | High | Requires context switch |
| wait with timeout | Higher | Additional timer management |
| Spurious wakeup | High | Requires full recheck |