CrackedRuby

Overview

Condition variables are synchronization primitives that allow threads to suspend execution and release a lock atomically, waiting for a condition to be signaled by another thread. They solve a fundamental problem in concurrent programming: how to coordinate threads when one thread must wait for a specific state change that another thread will cause.

Without condition variables, threads must use busy-waiting (continuously checking a condition in a loop), which wastes CPU cycles. Condition variables provide an efficient mechanism for threads to sleep until notified, enabling event-driven synchronization where threads wait for specific events rather than polling for state changes.
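As a rough illustration of the cost difference, the sketch below counts how often a polling thread rechecks a flag compared with how often a condition-variable waiter wakes up. The names polls and wakeups and the 0.1 second delay are made up for this example, and the exact counts depend on the machine and scheduler.

```ruby
mutex = Mutex.new
cv = ConditionVariable.new
ready = false
polls = 0     # Checks made by the busy-waiting thread
wakeups = 0   # Times the condition-variable waiter returned from wait

# Busy-waiting: rechecks the flag as fast as it can
poller = Thread.new do
  until mutex.synchronize { ready }
    polls += 1
    Thread.pass  # Yields, but the thread stays runnable and keeps using CPU
  end
end

# Condition variable: sleeps until notified
waiter = Thread.new do
  mutex.synchronize do
    until ready
      cv.wait(mutex)
      wakeups += 1
    end
  end
end

sleep 0.1  # Simulated delay before the state change
mutex.synchronize do
  ready = true
  cv.signal
end

[poller, waiter].each(&:join)
puts "busy-wait checks: #{polls}, condition-variable wakeups: #{wakeups}"
```

On a typical run the polling thread performs many thousands of checks, while the waiter wakes about once.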

Condition variables always work in conjunction with a mutex. The mutex protects the shared state that defines the condition, while the condition variable provides the wait and notification mechanism. This pairing ensures that checking the condition and waiting on it occurs atomically, preventing race conditions.

require 'thread'

mutex = Mutex.new
condition = ConditionVariable.new
queue = []

# Producer thread
Thread.new do
  10.times do |i|
    mutex.synchronize do
      queue << i
      condition.signal  # Notify waiting consumer
    end
    sleep 0.1
  end
end

# Consumer thread
Thread.new do
  10.times do
    mutex.synchronize do
      while queue.empty?
        condition.wait(mutex)  # Release mutex and wait
      end
      item = queue.shift
      puts "Consumed: #{item}"
    end
  end
end

sleep 2

The condition variable bridges the gap between thread communication and state synchronization. When a thread waits on a condition variable, it atomically releases its held mutex and enters a waiting state. When another thread signals the condition variable, one or more waiting threads wake up, reacquire the mutex, and continue execution. This atomic release-and-wait operation prevents the race condition where a notification could be lost between checking the condition and waiting.
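The release-and-reacquire behavior described above can be observed directly. The following is a minimal sketch, with an events array introduced only for this illustration, in which the main thread takes the mutex while the waiter is still inside its synchronize block.

```ruby
mutex = Mutex.new
cv = ConditionVariable.new
ready = false
events = []  # Only appended to while holding the mutex

waiter = Thread.new do
  mutex.synchronize do
    events << :waiter_locked
    until ready
      cv.wait(mutex)  # Mutex is released while this call sleeps
    end
    events << :waiter_resumed  # Mutex is held again here
  end
end

# Poll until the waiter has taken the lock once and then let go of it.
sleep 0.01 until mutex.synchronize { events.include?(:waiter_locked) }

# The waiter has not left its synchronize block, yet this lock succeeds.
# That is only possible because wait released the mutex.
mutex.synchronize do
  events << :main_locked
  ready = true
  cv.signal
end

waiter.join
p events  # => [:waiter_locked, :main_locked, :waiter_resumed]
```

The :waiter_resumed entry always comes last because the waiter cannot return from wait until the main thread has left its synchronize block.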

Key Principles

Condition variables operate on three fundamental operations: wait, signal, and broadcast. Each operation has specific semantics that maintain the correctness of concurrent programs.

The wait operation is the most complex. When a thread calls wait on a condition variable, three things happen atomically: the thread checks that it holds the associated mutex, releases the mutex, and enters a sleep state on the condition variable's wait queue. The atomicity is critical because it prevents the window where a thread could release the mutex and another thread could signal the condition before the first thread enters the wait state. When the waiting thread is awakened, it must reacquire the mutex before returning from the wait call.

mutex = Mutex.new
cv = ConditionVariable.new
ready = false

# Waiting thread
Thread.new do
  mutex.synchronize do
    while !ready  # Always use while loop, not if
      cv.wait(mutex)  # Atomically releases mutex and waits
    end
    # Mutex is held again here
    puts "Condition satisfied"
  end
end

# Signaling thread
Thread.new do
  sleep 0.5
  mutex.synchronize do
    ready = true
    cv.signal  # Wake one waiting thread
  end
end

sleep 1

The signal operation wakes exactly one thread from the condition variable's wait queue. If no threads are waiting, the signal has no effect and is lost. The awakened thread does not immediately resume execution; it first must reacquire the mutex that it released during the wait call. This means if the signaling thread still holds the mutex when it calls signal, the awakened thread will block waiting for the mutex.

The broadcast operation wakes all threads waiting on the condition variable. This is necessary when the state change that occurred might satisfy conditions for multiple waiting threads, or when the condition needs to be reevaluated by all waiters. After a broadcast, all awakened threads compete to reacquire the mutex, and only one thread at a time can proceed while others wait for the mutex.

mutex = Mutex.new
cv = ConditionVariable.new
counter = 0
threshold = 5

# Multiple waiting threads
3.times do |i|
  Thread.new do
    mutex.synchronize do
      while counter < threshold
        puts "Thread #{i} waiting"
        cv.wait(mutex)
      end
      puts "Thread #{i} proceeding with counter=#{counter}"
    end
  end
end

# Thread that updates condition
Thread.new do
  sleep 1
  mutex.synchronize do
    counter = 10
    cv.broadcast  # Wake all waiting threads
  end
end

sleep 2

Spurious wakeups are a critical concept with condition variables. A waiting thread can wake up from a wait call even when no signal or broadcast occurred. This happens due to implementation details in the underlying operating system or runtime. For this reason, wait calls must always be in a while loop that rechecks the actual condition, never in a simple if statement. The pattern is always:

mutex.synchronize do
  while !condition_is_true
    cv.wait(mutex)
  end
  # Proceed with work
end

The predicate (condition being tested) must be protected by the same mutex used with the condition variable. This ensures that checking the condition and waiting on the condition variable are atomic with respect to threads that modify the condition and signal the condition variable. Without this protection, a race condition exists where a thread could check the condition (finding it false), then be preempted before calling wait, while another thread makes the condition true and signals, causing the signal to be lost.

Ruby Implementation

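Ruby provides the ConditionVariable class (also reachable as Thread::ConditionVariable), which works with Mutex objects to provide thread synchronization. On current Ruby versions both classes are available without loading anything, so the require 'thread' lines in the examples are optional. The implementation follows the standard condition variable semantics on top of Ruby's threading model, which is built on native system threads.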

The basic API consists of three methods: wait, signal, and broadcast. The wait method accepts a mutex parameter and optionally a timeout value. The method releases the mutex, waits for a signal or timeout, then reacquires the mutex before returning.

require 'thread'

mutex = Mutex.new
cv = ConditionVariable.new
data = []
max_size = 5

# Producer with bounded buffer
producer = Thread.new do
  10.times do |i|
    mutex.synchronize do
      while data.size >= max_size
        puts "Producer waiting - buffer full"
        cv.wait(mutex)
      end
      data << i
      puts "Produced: #{i}, buffer size: #{data.size}"
      cv.signal  # Signal consumer
    end
    sleep rand(0.1..0.3)
  end
end

# Consumer
consumer = Thread.new do
  10.times do
    mutex.synchronize do
      while data.empty?
        puts "Consumer waiting - buffer empty"
        cv.wait(mutex)
      end
      item = data.shift
      puts "Consumed: #{item}, buffer size: #{data.size}"
      cv.signal  # Signal producer
    end
    sleep rand(0.1..0.3)
  end
end

producer.join
consumer.join

Ruby's ConditionVariable#wait accepts an optional timeout in seconds. When a timeout is specified, the method returns once the time has elapsed even if no signal occurred. The return value has differed across Ruby versions (some older releases return the condition variable itself, which is always truthy), so the example below tracks a deadline and rechecks the predicate instead of testing the result. This allows bounded waiting without the risk of blocking forever.

mutex = Mutex.new
cv = ConditionVariable.new
resource_available = false

Thread.new do
  mutex.synchronize do
    # Track a deadline instead of relying on wait's return value
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 2.0

    until resource_available
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break if remaining <= 0  # Timed out
      cv.wait(mutex, remaining)  # Wait for at most the time left
    end

    if resource_available
      puts "Resource acquired"
    else
      puts "Timeout waiting for resource"
      # Handle timeout case
    end
  end
end

sleep 1
mutex.synchronize do
  resource_available = true
  cv.signal
end

sleep 3

The broadcast method wakes all waiting threads, which then compete for the mutex. This is essential when multiple threads might be affected by a state change or when implementing barriers where all threads must reach a synchronization point.

mutex = Mutex.new
cv = ConditionVariable.new
barrier_count = 0
thread_count = 4

threads = thread_count.times.map do |i|
  Thread.new do
    # Phase 1: Do some work
    sleep rand(0.5..1.5)
    puts "Thread #{i} reached barrier"
    
    # Wait at barrier
    mutex.synchronize do
      barrier_count += 1
      if barrier_count == thread_count
        # Last thread to arrive
        cv.broadcast  # Release all waiting threads
      else
        while barrier_count < thread_count
          cv.wait(mutex)
        end
      end
    end
    
    # Phase 2: Continue after all threads synchronized
    puts "Thread #{i} past barrier"
  end
end

threads.each(&:join)
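The barrier above works once, because barrier_count is never reset. One common way to make it reusable is to wait on a generation number rather than on the count. The sketch below assumes that approach; CyclicBarrier and its method names are illustrative and not part of Ruby's standard library.

```ruby
class CyclicBarrier
  def initialize(parties)
    @parties = parties
    @mutex = Mutex.new
    @cv = ConditionVariable.new
    @arrived = 0
    @generation = 0
  end

  # Blocks until `parties` threads have called wait for the current generation.
  def wait
    @mutex.synchronize do
      generation = @generation
      @arrived += 1
      if @arrived == @parties
        # Last arrival: reset the count, start a new generation, release everyone
        @arrived = 0
        @generation += 1
        @cv.broadcast
      else
        # Waiting on the generation (not the count) survives the reset above
        @cv.wait(@mutex) while generation == @generation
      end
    end
  end
end

barrier = CyclicBarrier.new(3)
log = Queue.new  # Thread-safe record of [phase, thread] pairs

threads = 3.times.map do |i|
  Thread.new do
    2.times do |phase|
      log << [phase, i]
      barrier.wait  # Nobody starts the next phase until all three arrive
    end
  end
end
threads.each(&:join)

phases = []
phases << log.pop.first until log.empty?
p phases  # => [0, 0, 0, 1, 1, 1]
```

Every phase 0 entry is logged before any phase 1 entry, since no thread can pass the barrier until all three have logged and arrived.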

Ruby's condition variables integrate with the Mutex class to ensure proper synchronization. The mutex must be locked when calling wait, and wait atomically unlocks it. The mutex is always relocked before wait returns, even if an exception is raised during the wait.

class ThreadSafeQueue
  def initialize
    @mutex = Mutex.new
    @cv = ConditionVariable.new
    @queue = []
    @closed = false
  end
  
  def push(item)
    @mutex.synchronize do
      raise "Queue closed" if @closed
      @queue << item
      @cv.signal  # Wake one waiting consumer
    end
  end
  
  def pop
    @mutex.synchronize do
      while @queue.empty? && !@closed
        @cv.wait(@mutex)
      end
      
      if @queue.empty? && @closed
        nil  # Queue drained and closed
      else
        @queue.shift
      end
    end
  end
  
  def close
    @mutex.synchronize do
      @closed = true
      @cv.broadcast  # Wake all waiting consumers
    end
  end
end

queue = ThreadSafeQueue.new

# Producer threads
3.times do |i|
  Thread.new do
    5.times do |j|
      queue.push("Item-#{i}-#{j}")
      sleep 0.1
    end
  end
end

# Consumer threads
2.times do |i|
  Thread.new do
    loop do
      item = queue.pop
      break if item.nil?
      puts "Consumer #{i} got: #{item}"
    end
  end
end

sleep 2
queue.close
sleep 1
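The relocking guarantee mentioned before the queue example can be checked with Thread#raise and Mutex#owned?. This is a small sketch rather than a recommended way to interrupt threads; the waiting and held_in_rescue variables exist only for the demonstration.

```ruby
mutex = Mutex.new
cv = ConditionVariable.new
waiting = false
held_in_rescue = nil

waiter = Thread.new do
  mutex.synchronize do
    begin
      waiting = true
      loop { cv.wait(mutex) }  # No predicate: only the exception ends this loop
    rescue RuntimeError
      # wait has reacquired the mutex before the exception reaches this handler
      held_in_rescue = mutex.owned?
    end
  end
end

# Seeing waiting == true under the mutex means the waiter released it in wait.
sleep 0.01 until mutex.synchronize { waiting }

waiter.raise(RuntimeError, "interrupted")
waiter.join

puts "mutex held inside rescue: #{held_in_rescue}"
```

Because the mutex is held again when the exception propagates, the enclosing synchronize block can unlock it normally on the way out.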

Practical Examples

The producer-consumer pattern is the canonical use case for condition variables. Multiple producers generate data and multiple consumers process it, with a bounded buffer between them. Condition variables coordinate when the buffer is full or empty.

class BoundedBuffer
  def initialize(capacity)
    @capacity = capacity
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
    @not_empty = ConditionVariable.new
    @buffer = []
  end
  
  def produce(item)
    @mutex.synchronize do
      while @buffer.size >= @capacity
        @not_full.wait(@mutex)
      end
      
      @buffer << item
      @not_empty.signal
    end
  end
  
  def consume
    @mutex.synchronize do
      while @buffer.empty?
        @not_empty.wait(@mutex)
      end
      
      item = @buffer.shift
      @not_full.signal
      item
    end
  end
end

buffer = BoundedBuffer.new(3)

# Multiple producers
producers = 2.times.map do |id|
  Thread.new do
    5.times do |i|
      item = "P#{id}-#{i}"
      buffer.produce(item)
      puts "Produced: #{item}"
      sleep rand(0.1..0.5)
    end
  end
end

# Multiple consumers
consumers = 3.times.map do |id|
  Thread.new do
    4.times do
      item = buffer.consume
      puts "Consumer #{id} consumed: #{item}"
      sleep rand(0.2..0.6)
    end
  end
end

(producers + consumers).each(&:join)

A read-write lock implementation demonstrates multiple condition variables working together. Multiple readers can hold the lock simultaneously, but writers need exclusive access. Condition variables coordinate when readers can proceed and when writers can proceed.

class ReadWriteLock
  def initialize
    @mutex = Mutex.new
    @readers_cv = ConditionVariable.new
    @writers_cv = ConditionVariable.new
    @readers = 0
    @writers = 0
    @write_requests = 0
  end
  
  def read_lock
    @mutex.synchronize do
      # Wait while any writer is active or waiting
      while @writers > 0 || @write_requests > 0
        @readers_cv.wait(@mutex)
      end
      @readers += 1
    end
  end
  
  def read_unlock
    @mutex.synchronize do
      @readers -= 1
      if @readers == 0
        @writers_cv.signal  # Wake a waiting writer
      end
    end
  end
  
  def write_lock
    @mutex.synchronize do
      @write_requests += 1
      
      # Wait while any readers or writers are active
      while @readers > 0 || @writers > 0
        @writers_cv.wait(@mutex)
      end
      
      @write_requests -= 1
      @writers += 1
    end
  end
  
  def write_unlock
    @mutex.synchronize do
      @writers -= 1
      
      # Prefer waiting writers, but if none, wake readers
      if @write_requests > 0
        @writers_cv.signal
      else
        @readers_cv.broadcast
      end
    end
  end
end

lock = ReadWriteLock.new
counter = 0

# Reader threads
readers = 5.times.map do |id|
  Thread.new do
    3.times do
      lock.read_lock
      value = counter
      puts "Reader #{id} read: #{value}"
      sleep rand(0.1..0.2)
      lock.read_unlock
      sleep rand(0.1..0.3)
    end
  end
end

# Writer threads
writers = 2.times.map do |id|
  Thread.new do
    3.times do
      lock.write_lock
      counter += 1
      puts "Writer #{id} wrote: #{counter}"
      sleep rand(0.1..0.2)
      lock.write_unlock
      sleep rand(0.2..0.4)
    end
  end
end

(readers + writers).each(&:join)

Thread pools use condition variables to coordinate between worker threads waiting for tasks and a scheduler distributing work. Workers wait when no tasks are available, and the scheduler signals when new work arrives.

class ThreadPool
  def initialize(size)
    @mutex = Mutex.new
    @cv = ConditionVariable.new
    @queue = []
    @shutdown = false
    
    @workers = size.times.map do |id|
      Thread.new { worker_loop(id) }
    end
  end
  
  def submit(&block)
    @mutex.synchronize do
      raise "Pool shutdown" if @shutdown
      @queue << block
      @cv.signal  # Wake one worker
    end
  end
  
  def shutdown
    @mutex.synchronize do
      @shutdown = true
      @cv.broadcast  # Wake all workers to exit
    end
    @workers.each(&:join)
  end
  
  private
  
  def worker_loop(id)
    loop do
      task = nil
      
      @mutex.synchronize do
        while @queue.empty? && !@shutdown
          @cv.wait(@mutex)
        end
        
        return if @shutdown && @queue.empty?
        task = @queue.shift unless @queue.empty?
      end
      
      if task
        begin
          task.call
        rescue => e
          puts "Worker #{id} error: #{e.message}"
        end
      end
    end
  end
end

pool = ThreadPool.new(3)

10.times do |i|
  pool.submit do
    puts "Task #{i} executing on thread #{Thread.current.object_id}"
    sleep rand(0.5..1.0)
    puts "Task #{i} completed"
  end
end

sleep 6
pool.shutdown

Common Patterns

The wait-in-loop pattern is fundamental to correct condition variable usage. The condition must be rechecked after waking from wait because of spurious wakeups and because another thread might have consumed the state change between the signal and the current thread reacquiring the mutex.

# CORRECT: Always use while loop
mutex.synchronize do
  while !condition_satisfied?
    cv.wait(mutex)
  end
  # Safe to proceed
end

# INCORRECT: Never use if statement
mutex.synchronize do
  if !condition_satisfied?  # Wrong!
    cv.wait(mutex)
  end
  # Condition might not be satisfied here
end

The signal-under-lock pattern keeps the mutex held while changing the shared state and calling signal or broadcast, so a waiter can never check the old state and then miss the notification. Signaling without holding the mutex is legal, but changing the state without it opens exactly that race and can cause missed wakeups.

# Preferred pattern: change state and signal while holding mutex
mutex.synchronize do
  state_changed = true
  cv.signal  # Mutex still held
end

# Risky: state changed without the mutex, so a waiter can miss the wakeup
state_changed = true
cv.signal  # No mutex - potential race

The predicate-encapsulation pattern wraps condition checking in a method to ensure consistent condition evaluation. This prevents errors where different parts of code check the condition differently.

class SharedState
  def initialize
    @mutex = Mutex.new
    @cv = ConditionVariable.new
    @count = 0
    @target = 10
  end
  
  def wait_for_target
    @mutex.synchronize do
      while !target_reached?
        @cv.wait(@mutex)
      end
    end
  end
  
  def increment
    @mutex.synchronize do
      @count += 1
      @cv.broadcast if target_reached?  # Wake every waiter; each rechecks the predicate
    end
  end
  
  private
  
  def target_reached?
    @count >= @target
  end
end

The timeout-with-predicate pattern combines timeout waits with predicate rechecking to handle both spurious wakeups and actual timeouts correctly.

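def wait_with_timeout(mutex, cv, timeout)
  # Monotonic clock: unaffected by wall-clock adjustments
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  mutex.synchronize do
    until condition_satisfied?
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      return false if remaining <= 0  # Timeout expired

      # Wait might return early (spurious wakeup), so loop and recompute
      cv.wait(mutex, remaining)
    end
    true  # Condition satisfied
  end
end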

The broadcast-on-state-change pattern uses broadcast when a state change might affect multiple waiting threads, even if the application logic expects only one thread to proceed. This prevents deadlocks when the assumption about single waiters is wrong.

class StateManager
  def initialize
    @mutex = Mutex.new
    @cv = ConditionVariable.new
    @state = :initial
  end
  
  def transition_to(new_state)
    @mutex.synchronize do
      @state = new_state
      @cv.broadcast  # Wake all - let them recheck conditions
    end
  end
  
  def wait_for_state(desired_state)
    @mutex.synchronize do
      while @state != desired_state
        @cv.wait(@mutex)
      end
    end
  end
end

The two-phase-wait pattern uses two condition variables to coordinate bidirectional synchronization, where each side waits for the other and signals back when ready.

class Rendezvous
  def initialize
    @mutex = Mutex.new
    @cv_a = ConditionVariable.new
    @cv_b = ConditionVariable.new
    @a_ready = false
    @b_ready = false
  end
  
  def arrive_a(data)
    @mutex.synchronize do
      @data_from_a = data
      @a_ready = true
      @cv_b.signal  # Tell B that A is ready
      
      while !@b_ready
        @cv_a.wait(@mutex)  # Wait for B
      end
      
      @data_from_b
    end
  end
  
  def arrive_b(data)
    @mutex.synchronize do
      @data_from_b = data
      @b_ready = true
      @cv_a.signal  # Tell A that B is ready
      
      while !@a_ready
        @cv_b.wait(@mutex)  # Wait for A
      end
      
      @data_from_a
    end
  end
end

Common Pitfalls

Lost wakeup occurs when a signal happens before a thread calls wait. Since signals are not queued, the waiting thread never wakes up. This happens when the condition is checked and found false, but before wait is called, another thread makes the condition true and signals.

# INCORRECT: Race condition leading to lost wakeup
if !condition  # Check outside mutex
  mutex.synchronize do
    cv.wait(mutex)  # Signal might have occurred between check and wait
  end
end

# CORRECT: Check condition while holding mutex
mutex.synchronize do
  while !condition
    cv.wait(mutex)  # Check and wait are atomic
  end
end

Spurious wakeup ignorance happens when code uses if instead of while for condition checking. The thread wakes, assumes the condition is true, and proceeds incorrectly. This leads to race conditions where state assumptions are violated.

# INCORRECT: Vulnerable to spurious wakeups
mutex.synchronize do
  if queue.empty?
    cv.wait(mutex)
  end
  item = queue.shift  # Might still be empty!
end

# CORRECT: Recheck after wakeup
mutex.synchronize do
  while queue.empty?
    cv.wait(mutex)
  end
  item = queue.shift  # Guaranteed non-empty
end
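The if version also fails without any spurious wakeup, whenever more threads are woken than there are items. The sketch below deliberately uses the incorrect if form with two consumers and a single item; the waiting counter is an addition that makes sure both consumers are blocked before the broadcast.

```ruby
mutex = Mutex.new
cv = ConditionVariable.new
queue = []
waiting = 0
results = []

consumers = 2.times.map do
  Thread.new do
    mutex.synchronize do
      if queue.empty?          # Deliberately wrong: the check runs only once
        waiting += 1
        cv.wait(mutex)
      end
      results << queue.shift   # nil when another consumer got there first
    end
  end
end

# Wait until both consumers are blocked inside wait.
sleep 0.01 until mutex.synchronize { waiting == 2 }

mutex.synchronize do
  queue << :job
  cv.broadcast  # Wakes both consumers, but there is only one item
end

consumers.each(&:join)
p results
```

One consumer receives :job and the other shifts nil from an empty queue. With while in place of if, the second consumer would find the queue empty again and go back to waiting.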

Deadlock from signal-without-broadcast occurs when several threads wait on the same condition variable and a state change that could release more than one of them is announced with a single signal. Only one thread wakes up. If it is not the thread whose predicate became true, or it does not pass the notification on, the remaining waiters stay blocked.

# INCORRECT: Can deadlock with multiple waiters
mutex.synchronize do
  shared_resource_available = true
  cv.signal  # Only wakes one thread, others stuck
end

# CORRECT: Wake all when multiple might proceed
mutex.synchronize do
  shared_resource_available = true
  cv.broadcast  # All threads recheck condition
end
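To make the difference visible, the following sketch parks three threads on one condition, opens the gate with a single signal, and only later broadcasts. The names gate_open, waiting, and proceeded are illustrative.

```ruby
mutex = Mutex.new
cv = ConditionVariable.new
gate_open = false
waiting = 0
proceeded = 0

waiters = 3.times.map do
  Thread.new do
    mutex.synchronize do
      waiting += 1
      cv.wait(mutex) until gate_open
      proceeded += 1
    end
  end
end

# Make sure all three threads are blocked in wait before notifying.
sleep 0.01 until mutex.synchronize { waiting == 3 }

mutex.synchronize do
  gate_open = true
  cv.signal  # The predicate is now true for everyone, but one thread is woken
end

sleep 0.01 until mutex.synchronize { proceeded >= 1 }
sleep 0.1  # Give any other waiter a chance to run (it normally cannot)
after_signal = mutex.synchronize { proceeded }

mutex.synchronize { cv.broadcast }  # Release the threads that are still asleep
waiters.each(&:join)

puts "proceeded after signal: #{after_signal}, after broadcast: #{proceeded}"
```

Barring spurious wakeups, only one thread gets past the gate after the signal. The other two stay blocked with their predicate already true until the broadcast arrives.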

Mutex-mismatch occurs when the mutex used with wait differs from the mutex protecting the condition. This breaks atomicity and allows race conditions.

# INCORRECT: Different mutexes
mutex1 = Mutex.new
mutex2 = Mutex.new
cv = ConditionVariable.new

Thread.new do
  mutex1.synchronize do
    while !ready
      cv.wait(mutex2)  # Wrong mutex! Raises ThreadError because mutex2 is not held
    end
  end
end

Thread.new do
  mutex2.synchronize do
    ready = true
    cv.signal
  end
end
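A corrected version, sketched below, uses one mutex both to guard ready and as the argument to wait. The second half shows what Ruby does when wait is given a mutex the calling thread does not hold; the other and error variables are only there for the demonstration.

```ruby
mutex = Mutex.new  # The only mutex: guards `ready` and is passed to wait
cv = ConditionVariable.new
ready = false
observed = nil

waiter = Thread.new do
  mutex.synchronize do
    cv.wait(mutex) until ready
    observed = ready
  end
end

signaler = Thread.new do
  mutex.synchronize do
    ready = true
    cv.signal
  end
end

[waiter, signaler].each(&:join)

# Passing a mutex that the current thread does not hold is rejected.
other = Mutex.new
error = begin
  mutex.synchronize { cv.wait(other, 0.1) }
  nil
rescue ThreadError => e
  e
end

puts "waiter observed ready=#{observed}"
puts "wait with an unheld mutex raised: #{error.class}"
```

The error makes the simplest form of the mismatch easy to spot. The subtler case, where a thread holds both mutexes and the state is guarded by the wrong one, raises nothing and has to be caught by review.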

The forgotten-signal pitfall happens when code modifies the condition but forgets to signal the condition variable. Waiting threads remain blocked indefinitely even though their condition is satisfied.

class Counter
  def initialize
    @mutex = Mutex.new
    @cv = ConditionVariable.new
    @count = 0
  end
  
  # INCORRECT: Missing signal
  def increment_bad
    @mutex.synchronize do
      @count += 1
      # Forgot to signal!
    end
  end
  
  # CORRECT: Signal after state change
  def increment_good
    @mutex.synchronize do
      @count += 1
      @cv.signal
    end
  end
  
  def wait_for(target)
    @mutex.synchronize do
      while @count < target
        @cv.wait(@mutex)
      end
    end
  end
end

Releasing-mutex-early happens when code manually unlocks the mutex before the condition variable operation completes, breaking the atomic release-and-wait contract.

# INCORRECT: Manual unlock breaks atomicity
mutex.lock
if !condition
  mutex.unlock  # Wrong!
  cv.wait(mutex)  # Will raise error - mutex not held
end

# CORRECT: Let wait handle mutex atomically
mutex.synchronize do
  while !condition
    cv.wait(mutex)  # Handles unlock and relock
  end
end

Condition-without-state occurs when using condition variables for pure signaling without shared state. This fails because signals are not queued - if no thread is waiting, the signal is lost.

# INCORRECT: No shared state
cv = ConditionVariable.new
mutex = Mutex.new

Thread.new do
  cv.signal  # Lost if no one waiting yet
end

Thread.new do
  mutex.synchronize do
    cv.wait(mutex)  # Might wait forever
  end
end

# CORRECT: Use shared state
cv = ConditionVariable.new
mutex = Mutex.new
signaled = false

Thread.new do
  mutex.synchronize do
    signaled = true
    cv.signal
  end
end

Thread.new do
  mutex.synchronize do
    while !signaled
      cv.wait(mutex)
    end
  end
end

Reference

Core Operations

| Operation | Description | Mutex Requirement |
| --- | --- | --- |
| wait(mutex) | Release mutex atomically and wait for signal | Must hold mutex |
| wait(mutex, timeout) | Wait with timeout in seconds; return value varies by Ruby version, so recheck the predicate against a deadline | Must hold mutex |
| signal | Wake one waiting thread | Recommended to hold mutex |
| broadcast | Wake all waiting threads | Recommended to hold mutex |

State Transitions

| Thread State | Trigger | Next State | Mutex Status |
| --- | --- | --- | --- |
| Running | Calls wait | Waiting | Released |
| Waiting | Receives signal | Ready | Attempting reacquire |
| Waiting | Timeout expires | Ready | Attempting reacquire |
| Ready | Acquires mutex | Running | Held |

Wait Loop Template

mutex.synchronize do
  while !condition_predicate?
    cv.wait(mutex)
  end
  # Condition guaranteed true here
  # Mutex is held
end

Signal Under Lock Template

mutex.synchronize do
  modify_shared_state()
  
  if single_waiter_expected?
    cv.signal
  else
    cv.broadcast
  end
end

Timeout Wait Template

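def wait_until_satisfied(mutex, cv, timeout_seconds)
  mutex.synchronize do
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds

    until condition_satisfied?
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      return :timeout if remaining <= 0  # return exits the enclosing method

      cv.wait(mutex, remaining)
    end

    :success
  end
end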

Common Synchronization Patterns

| Pattern | Use Case | Condition Variables |
| --- | --- | --- |
| Producer-Consumer | Bounded buffer coordination | Two: not_full and not_empty |
| Read-Write Lock | Multiple readers or exclusive writer | Two: readers and writers |
| Barrier | Wait for all threads to reach point | One: barrier_cv |
| Thread Pool | Workers wait for tasks | One: task_available |
| Semaphore | Resource counting | One: resource_available |
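The semaphore row is the only pattern in this table without an example elsewhere in this guide. A minimal sketch follows, assuming a permit counter guarded by one mutex and one condition variable; CountingSemaphore and with_permit are illustrative names, not a standard library API.

```ruby
class CountingSemaphore
  def initialize(permits)
    @permits = permits
    @mutex = Mutex.new
    @resource_available = ConditionVariable.new
  end

  def acquire
    @mutex.synchronize do
      @resource_available.wait(@mutex) while @permits.zero?
      @permits -= 1
    end
  end

  def release
    @mutex.synchronize do
      @permits += 1
      @resource_available.signal  # One permit freed, so one waiter can proceed
    end
  end

  # Runs the block while holding a permit and releases it even on error.
  def with_permit
    acquire
    begin
      yield
    ensure
      release
    end
  end
end

semaphore = CountingSemaphore.new(2)
stats = Mutex.new  # Separate lock for the bookkeeping below
active = 0
peak = 0

threads = 6.times.map do
  Thread.new do
    semaphore.with_permit do
      stats.synchronize do
        active += 1
        peak = [peak, active].max
      end
      sleep 0.05  # Simulated work
      stats.synchronize { active -= 1 }
    end
  end
end
threads.each(&:join)

puts "peak concurrency: #{peak}"
```

A single signal per release is sufficient here, since each released permit lets exactly one waiter proceed. The peak never exceeds the two permits handed out.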

Decision Matrix

| Scenario | Use Signal | Use Broadcast |
| --- | --- | --- |
| Single waiter expected | Yes | Optional |
| Multiple potential waiters | No | Yes |
| All waiters should recheck | No | Yes |
| Implementing barrier | No | Yes |
| Queue with one consumer | Yes | No |
| State change affects many | No | Yes |
| Shutdown notification | No | Yes |

Debugging Checklist

| Check | Correct Behavior | Common Error |
| --- | --- | --- |
| Wait in loop | while loop used | if statement used |
| Mutex consistency | Same mutex for condition and state | Different mutexes |
| Signal presence | Signal called after state change | Signal forgotten |
| Predicate existence | Shared state exists | Pure signaling without state |
| Timeout handling | Recheck condition after timeout | Assume timeout means failure |
| Exception safety | Mutex reacquired on exception | Manual unlock |

Ruby-Specific Details

| Aspect | Behavior | Notes |
| --- | --- | --- |
| Thread safety | Internally synchronized | Safe for concurrent use |
| Exception handling | Mutex reacquired even on exception | Always cleanup-safe |
| Timeout precision | System-dependent | Track a deadline; a monotonic clock avoids wall-clock jumps |
| Signal semantics | Lost if no waiters | Not queued |
| Broadcast cost | Wakes all waiters | Consider thundering herd |
| GVL interaction | Releases GVL during wait | Allows other Ruby threads to run |

Performance Characteristics

| Operation | Relative Cost | Scalability |
| --- | --- | --- |
| signal | Low | O(1) |
| broadcast | Medium | O(number of waiters) |
| wait | High | Requires context switch |
| wait with timeout | Higher | Additional timer management |
| Spurious wakeup | High | Requires full recheck |