Barrier Synchronization

Barrier synchronization in Ruby provides coordination mechanisms for concurrent execution where multiple threads must reach a synchronization point before any can proceed.

Overview

Barrier synchronization coordinates multiple threads by creating synchronization points where execution pauses until all participating threads arrive. Ruby implements barrier synchronization through the concurrent-ruby gem's Concurrent::CyclicBarrier and Concurrent::CountDownLatch classes, along with built-in primitives like Mutex and ConditionVariable for custom implementations.

A barrier acts as a gate that remains closed until a predetermined number of threads reach it. Once all threads arrive, the barrier opens and allows all threads to proceed simultaneously. This pattern proves essential in parallel algorithms where threads must complete specific phases before moving to the next stage.

require 'concurrent'

barrier = Concurrent::CyclicBarrier.new(3)

3.times do |i|
  Thread.new do
    puts "Thread #{i} starting work"
    sleep(rand(1..3))  # Simulate work
    puts "Thread #{i} finished work, waiting at barrier"
    barrier.wait
    puts "Thread #{i} proceeding after barrier"
  end
end

sleep(5)  # Let threads complete

The CyclicBarrier resets automatically after all threads pass through, making it reusable for multiple synchronization rounds. The CountDownLatch provides single-use barrier functionality where threads wait until a counter reaches zero.

latch = Concurrent::CountDownLatch.new(2)

worker_threads = 2.times.map do |i|
  Thread.new do
    puts "Worker #{i} completing task"
    sleep(rand(1..2))
    latch.count_down
  end
end

main_thread = Thread.new do
  latch.wait
  puts "All workers completed, main thread proceeding"
end

(worker_threads + [main_thread]).each(&:join)

Ruby's barrier synchronization supports both blocking and non-blocking operations, timeout mechanisms, and exception handling across thread boundaries. These primitives form the foundation for complex concurrent algorithms including parallel map-reduce operations, phased computations, and coordinated resource management.
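
The Overview mentions building custom barriers from Mutex and ConditionVariable; the following is a minimal sketch of that approach. SimpleBarrier is an illustrative class, not part of concurrent-ruby; the generation counter makes the barrier reusable and guards against spurious wakeups.

class SimpleBarrier
  def initialize(parties)
    @parties = parties
    @waiting = 0
    @generation = 0
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  def wait
    @mutex.synchronize do
      generation = @generation
      @waiting += 1

      if @waiting == @parties
        # Last thread to arrive trips the barrier and starts a new generation
        @waiting = 0
        @generation += 1
        @cond.broadcast
      else
        # Sleep until the generation changes; the loop guards against
        # spurious wakeups from ConditionVariable#wait
        @cond.wait(@mutex) while generation == @generation
      end
    end
  end
end

barrier = SimpleBarrier.new(2)
threads = 2.times.map do |i|
  Thread.new do
    puts "Thread #{i} waiting"
    barrier.wait
    puts "Thread #{i} released"
  end
end
threads.each(&:join)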

Basic Usage

The Concurrent::CyclicBarrier creates reusable synchronization points for a fixed number of threads. Initialize with the required thread count and optional barrier action that executes when all threads arrive.

require 'concurrent'

# Create barrier for 4 threads
barrier = Concurrent::CyclicBarrier.new(4)

threads = 4.times.map do |id|
  Thread.new do
    # Phase 1: Individual work
    puts "Thread #{id}: Starting phase 1"
    sleep(rand(0.5..1.5))
    puts "Thread #{id}: Phase 1 complete"
    
    # Synchronization point
    barrier.wait
    
    # Phase 2: Coordinated work
    puts "Thread #{id}: Starting phase 2"
    sleep(rand(0.5..1.0))
    puts "Thread #{id}: All phases complete"
  end
end

threads.each(&:join)

The barrier action executes once when all threads arrive, running in one of the waiting threads. This proves useful for phase transitions or shared state updates.

barrier = Concurrent::CyclicBarrier.new(3) do
  puts "All threads reached barrier - transitioning to next phase"
end

data_processor = Array.new(3) { [] }

threads = 3.times.map do |worker_id|
  Thread.new do
    10.times do |round|
      # Process data chunk
      data_processor[worker_id] << "Round #{round} data"
      
      # Wait for all workers to complete round
      barrier.wait
      
      # All workers have completed this round
    end
  end
end

threads.each(&:join)
puts "Final data: #{data_processor}"

CountDownLatch provides one-time coordination where multiple threads signal completion to waiting threads. The latch starts with a count and decrements until reaching zero.

task_count = 5
latch = Concurrent::CountDownLatch.new(task_count)

# Create worker threads
workers = task_count.times.map do |task_id|
  Thread.new do
    puts "Task #{task_id}: Processing..."
    sleep(rand(1..3))
    puts "Task #{task_id}: Complete"
    latch.count_down  # Signal completion
  end
end

# Main thread waits for all tasks
puts "Waiting for all tasks to complete..."
latch.wait
puts "All tasks finished!"

workers.each(&:join)

Both barrier types support timeout operations to prevent indefinite blocking. The wait method accepts timeout values and returns true if successful or false if timeout occurred.

barrier = Concurrent::CyclicBarrier.new(2)

Thread.new do
  result = barrier.wait(2.0)  # Wait maximum 2 seconds
  if result
    puts "Barrier opened successfully"
  else
    puts "Timeout occurred waiting for barrier"
  end
end

sleep(3)  # Delay the second thread past the first thread's timeout
Thread.new { barrier.wait }
sleep(1)
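
CountDownLatch#wait follows the same timeout convention. A small sketch, assuming only one of the two required signals arrives before the deadline:

latch = Concurrent::CountDownLatch.new(2)

Thread.new do
  sleep(0.5)
  latch.count_down  # Only one of the two required signals arrives in time
end

if latch.wait(1.0)
  puts "Latch released"
else
  puts "Timed out with #{latch.count} count(s) remaining"
end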

Thread Safety & Concurrency

Barrier synchronization primitives maintain internal thread safety through atomic operations and proper memory synchronization. However, application code must handle shared state access correctly around barrier points.

The CyclicBarrier guarantees that barrier actions execute with proper memory visibility. All writes performed by threads before reaching the barrier become visible to threads proceeding after the barrier opens.

barrier = Concurrent::CyclicBarrier.new(3)
shared_data = Concurrent::Array.new

threads = 3.times.map do |id|
  Thread.new do
    # Phase 1: Each thread modifies shared data
    10.times { |i| shared_data << "Thread #{id}, item #{i}" }
    
    # Barrier ensures all modifications are visible
    barrier.wait
    
    # Phase 2: All threads see complete shared_data
    puts "Thread #{id} sees #{shared_data.length} items"
    
    barrier.wait
    
    # Phase 3: Safe to process complete dataset
    thread_items = shared_data.select { |item| item.include?("Thread #{id}") }
    puts "Thread #{id} processed #{thread_items.length} own items"
  end
end

threads.each(&:join)

Race conditions can occur when threads access shared resources without proper synchronization beyond the barrier points. Combine barriers with mutexes or thread-safe collections for complete safety.

barrier = Concurrent::CyclicBarrier.new(4)
mutex = Mutex.new
results = []

worker_threads = 4.times.map do |worker_id|
  Thread.new do
    local_results = []
    
    # Phase 1: Independent computation
    100.times { |i| local_results << worker_id * 100 + i }
    
    barrier.wait  # Synchronize completion of phase 1
    
    # Phase 2: Thread-safe result aggregation
    mutex.synchronize do
      results.concat(local_results)
    end
    
    barrier.wait  # Synchronize completion of phase 2
    
    # Phase 3: Process complete results
    worker_share = results.select { |r| r / 100 == worker_id }
    puts "Worker #{worker_id}: processed #{worker_share.length} results"
  end
end

worker_threads.each(&:join)
puts "Total results: #{results.length}"

Exception handling in barrier synchronization requires careful consideration. If a thread encounters an exception before reaching the barrier, other threads may wait indefinitely unless proper cleanup occurs.

barrier = Concurrent::CyclicBarrier.new(3)
exception_barrier = Concurrent::CountDownLatch.new(1)

threads = 3.times.map do |id|
  Thread.new do
    begin
      if id == 1
        raise StandardError, "Simulated worker failure"
      end
      
      sleep(rand(1..2))  # Simulate work
      puts "Thread #{id} reaching barrier"
      barrier.wait
      puts "Thread #{id} passed barrier"
      
    rescue => e
      puts "Thread #{id} failed: #{e.message}"
      exception_barrier.count_down  # Signal failure to other threads
      raise
    end
  end
end

# Monitor for exceptions
exception_thread = Thread.new do
  if exception_barrier.wait(3.0)
    puts "Exception detected, cleaning up remaining threads"
    threads.each { |t| t.raise(Interrupt) if t.alive? }
  end
end

threads.each do |t|
  begin
    t.join
  rescue Exception => e
    # Interrupt is not a StandardError, so a bare rescue would miss it
    puts "Thread terminated: #{e.class}"
  end
end

exception_thread.join

Barrier synchronization works effectively with other concurrency patterns. Thread pools can coordinate batch processing where worker threads synchronize between processing phases.

require 'concurrent'

class PhaseCoordinator
  def initialize(thread_count)
    @barrier = Concurrent::CyclicBarrier.new(thread_count)
    @phase_data = Concurrent::Hash.new
  end
  
  def wait_for_phase_completion(phase_name, data = nil)
    Thread.current[:phase_data] = data if data
    @barrier.wait
    
    # Collect data published by all participating threads
    all_data = Thread.list
      .select { |t| t[:phase_data] }
      .map { |t| t[:phase_data] }
    
    # Second barrier: no thread clears its slot until every
    # thread has finished collecting
    @barrier.wait
    
    Thread.current[:phase_data] = nil
    all_data
  end
end

coordinator = PhaseCoordinator.new(3)

threads = 3.times.map do |id|
  Thread.new do
    # Phase 1: Data collection
    local_data = Array.new(10) { rand(100) }
    collected_data = coordinator.wait_for_phase_completion("collection", local_data)
    
    # Phase 2: Data analysis
    analysis = collected_data.flatten.sum / collected_data.flatten.length.to_f
    analysis_results = coordinator.wait_for_phase_completion("analysis", analysis)
    
    # Phase 3: Report generation
    puts "Thread #{id}: Average from #{analysis_results.length} analyses: #{analysis_results.sum / analysis_results.length}"
    coordinator.wait_for_phase_completion("reporting")
  end
end

threads.each(&:join)

Error Handling & Debugging

Barrier synchronization introduces specific error conditions that require targeted debugging strategies. Thread starvation occurs when fewer threads than expected reach the barrier, causing indefinite blocking.

require 'concurrent'

class DebuggableBarrier
  def initialize(expected_threads, name = "barrier")
    @barrier = Concurrent::CyclicBarrier.new(expected_threads)
    @expected = expected_threads
    @name = name
    @waiting_threads = Concurrent::Set.new
  end
  
  def wait(timeout = nil)
    thread_id = Thread.current.object_id
    @waiting_threads.add(thread_id)
    
    puts "[#{@name}] Thread #{thread_id} waiting (#{@waiting_threads.size}/#{@expected})"
    
    begin
      if timeout
        result = @barrier.wait(timeout)
        unless result
          puts "[#{@name}] Timeout! Waiting threads: #{@waiting_threads.to_a}"
          debug_barrier_state
        end
        result
      else
        @barrier.wait
      end
    ensure
      @waiting_threads.delete(thread_id)
    end
  end
  
  private
  
  def debug_barrier_state
    puts "[#{@name}] Expected: #{@expected}, Currently waiting: #{@waiting_threads.size}"
    puts "[#{@name}] Waiting thread IDs: #{@waiting_threads.to_a}"
    puts "[#{@name}] All threads: #{Thread.list.map(&:object_id)}"
  end
end

# Demonstrate debugging thread starvation
barrier = DebuggableBarrier.new(3, "test_barrier")

threads = 2.times.map do |id|  # Only create 2 threads instead of 3
  Thread.new do
    puts "Thread #{id} starting"
    sleep(rand(1..2))
    barrier.wait(5.0)  # Will timeout
    puts "Thread #{id} completed"
  end
end

threads.each(&:join)

Exception propagation across barrier boundaries requires careful handling to prevent thread leakage and resource cleanup issues.

class ExceptionSafeBarrier
  def initialize(thread_count)
    @barrier = Concurrent::CyclicBarrier.new(thread_count)
    @exception_occurred = Concurrent::AtomicBoolean.new(false)
    @exception_details = Concurrent::AtomicReference.new
  end
  
  def wait_with_exception_handling
    raise_stored_exception
    
    # wait returns false when a failing thread resets the barrier,
    # so waiters are released instead of blocking forever
    raise_stored_exception unless @barrier.wait
    true
  end
  
  # Called by a thread that fails before reaching the barrier
  def record_failure(exception)
    # compare_and_set succeeds only once, so only the first
    # exception is stored
    if @exception_occurred.compare_and_set(false, true)
      @exception_details.set(exception)
    end
    
    # Reset releases any threads already blocked at the barrier
    @barrier.reset
  end
  
  def exception_occurred?
    @exception_occurred.value
  end
  
  def stored_exception
    @exception_details.value
  end
  
  private
  
  def raise_stored_exception
    stored = @exception_details.value
    raise stored if @exception_occurred.value && stored
  end
end

barrier = ExceptionSafeBarrier.new(4)

threads = 4.times.map do |id|
  Thread.new do
    failed_here = false
    
    begin
      sleep(rand(0.5..1.0))
      
      if id == 2
        failed_here = true
        raise StandardError, "Worker #{id} encountered error"
      end
      
      puts "Thread #{id} reaching barrier"
      barrier.wait_with_exception_handling
      puts "Thread #{id} passed barrier"
      
    rescue => e
      # Record the failure; for threads that merely received the
      # stored exception this is a harmless no-op store
      barrier.record_failure(e)
      puts "Thread #{id} handling exception: #{e.message}"
      
      if failed_here
        puts "Thread #{id} was the source of the exception"
      else
        puts "Thread #{id} received exception from another thread"
      end
      
      raise e
    end
  end
end

# Collect thread results and exceptions
results = threads.map do |thread|
  begin
    thread.join
    :success
  rescue => e
    { error: e.message, thread: thread.object_id }
  end
end

puts "Results: #{results}"

Memory leaks can occur when barrier objects accumulate references to completed threads. Implement cleanup strategies for long-running applications.

class ManagedBarrierPool
  def initialize
    @active_barriers = Concurrent::Map.new  # Concurrent::Map provides compute_if_absent
    @cleanup_thread = start_cleanup_thread
  end
  
  def get_barrier(key, thread_count)
    # compute_if_absent atomically creates the entry on first access
    # and returns the stored value on every call
    barrier_info = @active_barriers.compute_if_absent(key) do
      {
        barrier: Concurrent::CyclicBarrier.new(thread_count),
        created_at: Time.now,
        last_used: Time.now,
        use_count: 0
      }
    end
    
    barrier_info[:last_used] = Time.now
    barrier_info[:use_count] += 1
    
    barrier_info[:barrier]
  end
  
  def cleanup_stale_barriers(max_age = 300)  # 5 minutes
    cutoff_time = Time.now - max_age
    
    @active_barriers.each_pair do |key, info|
      if info[:last_used] < cutoff_time
        puts "Cleaning up stale barrier: #{key}"
        @active_barriers.delete(key)
      end
    end
  end
  
  def barrier_stats
    stats = {}
    @active_barriers.each_pair do |key, info|
      stats[key] = {
        age: Time.now - info[:created_at],
        last_used_ago: Time.now - info[:last_used],
        use_count: info[:use_count]
      }
    end
    stats
  end
  
  private
  
  def start_cleanup_thread
    Thread.new do
      begin
        loop do
          sleep(60)  # Cleanup every minute
          cleanup_stale_barriers
        end
      rescue => e
        puts "Cleanup thread error: #{e.message}"
        retry
      end
    end
  end
end

# Usage with automatic cleanup
pool = ManagedBarrierPool.new

# Simulate multiple barrier usage patterns
10.times do |batch|
  threads = 3.times.map do |id|
    Thread.new do
      barrier = pool.get_barrier("batch_#{batch % 3}", 3)
      
      sleep(rand(0.1..0.5))
      barrier.wait
      
      puts "Batch #{batch}, Thread #{id} completed"
    end
  end
  
  threads.each(&:join)
  sleep(0.1)
end

puts "Final barrier stats:"
puts pool.barrier_stats
sleep(2)  # Allow cleanup to run
puts "After cleanup:"
puts pool.barrier_stats

Performance & Memory

Barrier synchronization performance depends on thread count, contention patterns, and underlying system capabilities. Context switching overhead increases significantly with higher thread counts.

require 'benchmark'

def benchmark_barrier_performance(thread_counts, iterations = 1000)
  results = {}
  
  thread_counts.each do |count|
    time = Benchmark.measure do
      iterations.times do
        barrier = Concurrent::CyclicBarrier.new(count)
        
        threads = count.times.map do |id|
          Thread.new do
            barrier.wait
          end
        end
        
        threads.each(&:join)
      end
    end
    
    results[count] = {
      total_time: time.real,
      per_iteration: time.real / iterations,
      per_thread_per_iteration: time.real / (iterations * count)
    }
  end
  
  results
end

# Test scaling characteristics
thread_counts = [2, 4, 8, 16, 32]
results = benchmark_barrier_performance(thread_counts, 100)

puts "Barrier Performance Analysis:"
puts "Threads | Total(s) | Per-Iter(ms) | Per-Thread-Iter(ms)"
puts "-" * 55

results.each do |count, metrics|
  printf "%7d | %8.3f | %12.3f | %18.6f\n",
    count,
    metrics[:total_time],
    metrics[:per_iteration] * 1000,
    metrics[:per_thread_per_iteration] * 1000
end

Memory usage scales with thread count and barrier state. Monitor memory consumption in long-running applications with many barriers.

class MemoryAwareBarrier
  def initialize(thread_count, name = "barrier")
    @barrier = Concurrent::CyclicBarrier.new(thread_count)
    @name = name
    @wait_count = Concurrent::AtomicFixnum.new(0)
    @memory_samples = Concurrent::Array.new
  end
  
  def wait
    sample_memory_before = get_memory_usage
    @barrier.wait
    sample_memory_after = get_memory_usage
    
    wait_count = @wait_count.increment
    
    @memory_samples << {
      wait_number: wait_count,
      before_kb: sample_memory_before,
      after_kb: sample_memory_after,
      delta_kb: sample_memory_after - sample_memory_before
    }
  end
  
  def memory_report
    return "No samples collected" if @memory_samples.empty?
    
    samples = @memory_samples.to_a
    total_delta = samples.sum { |s| s[:delta_kb] }
    avg_delta = total_delta / samples.length.to_f
    
    "#{@name}: #{samples.length} waits, total Δ#{total_delta}KB, avg Δ#{avg_delta.round(2)}KB per wait"
  end
  
  private
  
  def get_memory_usage
    # Simplified memory measurement - in production use more sophisticated tools
    GC.stat(:heap_allocated_pages) * 16  # Approximate KB
  end
end

# Demonstrate memory tracking
barrier = MemoryAwareBarrier.new(4, "memory_test")

threads = 4.times.map do |id|
  Thread.new do
    10.times do |round|
      # Allocate some memory to observe patterns
      temp_data = Array.new(1000) { "data_#{id}_#{round}_#{rand(10000)}" }
      
      barrier.wait
      
      # Memory might be released here
      temp_data = nil
      GC.start if round % 5 == 0  # Periodic GC
    end
  end
end

threads.each(&:join)
puts barrier.memory_report

Optimize barrier usage by minimizing state sharing and reducing synchronization frequency. Batch operations where possible to reduce barrier overhead.

class BatchedBarrierProcessor
  def initialize(thread_count, batch_size = 10)
    @thread_count = thread_count
    @batch_size = batch_size
    @work_queue = Queue.new
    @result_queue = Queue.new
  end
  
  def process_items(items)
    # Divide items into batches
    items.each_slice(@batch_size) do |batch|
      @work_queue << batch
    end
    
    # Signal completion
    @thread_count.times { @work_queue << :done }
    
    # Start worker threads
    threads = @thread_count.times.map do |id|
      Thread.new { worker_loop(id) }
    end
    
    # Collect results
    results = []
    @thread_count.times do
      results.concat(@result_queue.pop)
    end
    
    threads.each(&:join)
    results.flatten
  end
  
  private
  
  def worker_loop(worker_id)
    local_results = []
    
    loop do
      batch = @work_queue.pop
      break if batch == :done
      
      # Process the batch independently; the point of batching is to
      # avoid per-item synchronization, so no barrier is used here --
      # the queues coordinate hand-off and completion
      batch_results = batch.map { |item| process_item(item, worker_id) }
      local_results.concat(batch_results)
    end
    
    @result_queue << local_results
  end
  
  def process_item(item, worker_id)
    # Simulate processing
    "#{item}_processed_by_#{worker_id}"
  end
end

# Compare performance: frequent vs batched synchronization
items = (1..1000).to_a

# Frequent synchronization approach
frequent_sync_time = Benchmark.measure do
  barrier = Concurrent::CyclicBarrier.new(4)
  
  threads = 4.times.map do |id|
    Thread.new do
      items.select.with_index { |_, i| i % 4 == id }.each do |item|
        processed = "#{item}_processed"
        barrier.wait  # Synchronize after each item
      end
    end
  end
  
  threads.each(&:join)
end

# Batched synchronization approach
batched_time = Benchmark.measure do
  processor = BatchedBarrierProcessor.new(4, 50)
  results = processor.process_items(items)
end

puts "Frequent sync: #{frequent_sync_time.real.round(3)}s"
puts "Batched sync: #{batched_time.real.round(3)}s"
puts "Speedup: #{(frequent_sync_time.real / batched_time.real).round(2)}x"

Production Patterns

Production barrier synchronization requires robust error handling, monitoring, and graceful degradation capabilities. Implement circuit breaker patterns for barrier failures.

class ProductionBarrierManager
  def initialize(thread_count, name, timeout = 30.0)
    @thread_count = thread_count
    @name = name
    @timeout = timeout
    @barrier = Concurrent::CyclicBarrier.new(thread_count)
    @metrics = initialize_metrics
    @circuit_breaker = CircuitBreaker.new
  end
  
  def wait_with_monitoring
    start_time = Time.now
    
    begin
      @metrics[:attempts] += 1
      
      if @circuit_breaker.open?
        @metrics[:circuit_breaker_rejects] += 1
        raise CircuitBreakerOpen, "Barrier #{@name} circuit breaker is open"
      end
      
      result = @barrier.wait(@timeout)
      
      if result
        @metrics[:successful_waits] += 1
        @circuit_breaker.record_success
      else
        @metrics[:timeouts] += 1
        @circuit_breaker.record_failure
        raise Concurrent::TimeoutError, "Barrier #{@name} timeout after #{@timeout}s"
      end
      
      duration = Time.now - start_time
      @metrics[:wait_times] << duration
      
      result
      
    rescue => e
      @metrics[:errors] += 1
      @circuit_breaker.record_failure
      log_error(e, start_time)
      raise
    end
  end
  
  def health_status
    {
      name: @name,
      thread_count: @thread_count,
      circuit_breaker_state: @circuit_breaker.state,
      metrics: @metrics.dup.tap { |m| m.delete(:wait_times) },
      avg_wait_time: @metrics[:wait_times].empty? ? 0 : 
        @metrics[:wait_times].sum / @metrics[:wait_times].length,
      success_rate: calculate_success_rate
    }
  end
  
  private
  
  def initialize_metrics
    {
      attempts: 0,
      successful_waits: 0,
      timeouts: 0,
      errors: 0,
      circuit_breaker_rejects: 0,
      wait_times: []
    }
  end
  
  def calculate_success_rate
    return 0.0 if @metrics[:attempts] == 0
    (@metrics[:successful_waits].to_f / @metrics[:attempts]) * 100
  end
  
  def log_error(error, start_time)
    duration = Time.now - start_time
    puts "[#{Time.now}] Barrier #{@name} error after #{duration.round(3)}s: #{error.class} - #{error.message}"
  end
end

# Raised when the breaker rejects a wait attempt (referenced above)
class CircuitBreakerOpen < StandardError; end

class CircuitBreaker
  def initialize(failure_threshold = 5, recovery_timeout = 60)
    @failure_threshold = failure_threshold
    @recovery_timeout = recovery_timeout
    @failure_count = 0
    @last_failure_time = nil
    @state = :closed  # :closed, :open, :half_open
  end
  
  def open?
    case @state
    when :closed
      false
    when :open
      if Time.now - @last_failure_time > @recovery_timeout
        @state = :half_open
        false
      else
        true
      end
    when :half_open
      false
    end
  end
  
  def record_success
    @failure_count = 0
    @state = :closed
  end
  
  def record_failure
    @failure_count += 1
    @last_failure_time = Time.now
    
    if @failure_count >= @failure_threshold
      @state = :open
    end
  end
  
  def state
    @state
  end
end

# Production usage example with monitoring
class DataPipelineStage
  def initialize(stage_name, parallelism = 4)
    @stage_name = stage_name
    @parallelism = parallelism
    @barrier_manager = ProductionBarrierManager.new(parallelism, stage_name)
    @processing_queue = Queue.new
    @results = Concurrent::Array.new
  end
  
  def process_batch(items)
    # Distribute work
    items.each_slice(items.length / @parallelism + 1) do |chunk|
      @processing_queue << chunk
    end
    
    # Start workers
    workers = @parallelism.times.map do |worker_id|
      Thread.new { worker_process(worker_id) }
    end
    
    # Wait for completion
    workers.each(&:join)
    
    # Return results and health status
    {
      results: @results.to_a.flatten,
      health: @barrier_manager.health_status
    }
  end
  
  private
  
  def worker_process(worker_id)
    chunk = @processing_queue.pop(true) rescue nil
    return unless chunk
    
    begin
      # Process chunk
      processed_items = chunk.map { |item| "#{@stage_name}_#{worker_id}_#{item}" }
      
      # Synchronize completion
      @barrier_manager.wait_with_monitoring
      
      # Store results
      @results.concat(processed_items)
      
    rescue => e
      puts "Worker #{worker_id} failed: #{e.message}"
      # Continue operation with degraded performance
    end
  end
end

# Simulate production pipeline
pipeline_stage = DataPipelineStage.new("data_transformation", 3)

begin
  result = pipeline_stage.process_batch((1..100).to_a)
  puts "Processed #{result[:results].length} items"
  puts "Health status: #{result[:health]}"
rescue => e
  puts "Pipeline stage failed: #{e.message}"
end

Implement graceful shutdown procedures that properly release waiting threads and clean up resources.

class GracefulBarrierShutdown
  def initialize(thread_count)
    @thread_count = thread_count
    @barrier = Concurrent::CyclicBarrier.new(thread_count)
    @shutdown_requested = Concurrent::AtomicBoolean.new(false)
    @active_threads = Concurrent::Set.new
  end
  
  def register_thread
    # Track participating threads; dead threads are filtered out
    # later via alive? checks (Thread objects have no at_exit hook)
    @active_threads.add(Thread.current)
  end
  
  def wait_or_shutdown
    register_thread
    
    if @shutdown_requested.value
      raise ShutdownRequested, "Barrier shutdown in progress"
    end
    
    @barrier.wait
  rescue ShutdownRequested
    puts "Thread #{Thread.current.object_id}: Shutdown requested, exiting gracefully"
    raise
  end
  
  def initiate_shutdown(timeout = 10.0)
    puts "Initiating graceful shutdown..."
    @shutdown_requested.set(true)
    
    # Wake up threads by sending interrupts
    @active_threads.each do |thread|
      thread.raise(ShutdownRequested) if thread.alive?
    end
    
    # Wait for threads to complete
    deadline = Time.now + timeout
    while @active_threads.any?(&:alive?) && Time.now < deadline
      sleep(0.1)
    end
    
    # Force terminate remaining threads
    remaining = @active_threads.select(&:alive?)
    unless remaining.empty?
      puts "Force terminating #{remaining.length} threads"
      remaining.each { |t| t.kill }
    end
    
    puts "Shutdown complete"
  end
  
  def active_thread_count
    @active_threads.count(&:alive?)
  end
end

class ShutdownRequested < StandardError; end

# Demonstrate graceful shutdown
shutdown_manager = GracefulBarrierShutdown.new(4)

# Start worker threads
workers = 4.times.map do |id|
  Thread.new do
    begin
      10.times do |round|
        puts "Worker #{id}: Round #{round}"
        sleep(rand(0.5..1.5))
        shutdown_manager.wait_or_shutdown
        puts "Worker #{id}: Completed round #{round}"
      end
    rescue ShutdownRequested
      puts "Worker #{id}: Received shutdown signal"
    end
  end
end

# Simulate shutdown after some time
shutdown_thread = Thread.new do
  sleep(3.0)  # Let workers run for 3 seconds
  shutdown_manager.initiate_shutdown
end

# Wait for all threads
workers.each do |worker|
  begin
    worker.join
  rescue => e
    puts "Worker terminated: #{e.class}"
  end
end

shutdown_thread.join
puts "All workers terminated. Active threads: #{shutdown_manager.active_thread_count}"

Reference

Concurrent::CyclicBarrier

Method | Parameters | Returns | Description
new(parties, &action) | parties (Integer), action (Proc, optional) | CyclicBarrier | Creates a barrier for the specified thread count with an optional barrier action
#wait(timeout = nil) | timeout (Numeric, optional) | Boolean | Blocks until all parties arrive; returns true on success, false on timeout or reset
#parties | None | Integer | Returns the number of threads required to trip the barrier
#number_waiting | None | Integer | Returns the number of threads currently waiting at the barrier
#broken? | None | Boolean | Returns true if the barrier is in a broken state due to exceptions
#reset | None | nil | Resets the barrier to its initial state, releasing any waiting threads (their wait returns false)
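
A quick sketch exercising the introspection methods above; the annotated return values are timing-dependent and illustrative:

barrier = Concurrent::CyclicBarrier.new(3)
barrier.parties          #=> 3

Thread.new { barrier.wait }
sleep(0.1)               # Give the thread time to block at the barrier
barrier.number_waiting   #=> 1
barrier.broken?          #=> false

barrier.reset            # Releases the waiting thread; its wait returns false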

Concurrent::CountDownLatch

Method | Parameters | Returns | Description
new(count) | count (Integer) | CountDownLatch | Creates latch with initial count
#wait(timeout = nil) | timeout (Numeric, optional) | Boolean | Blocks until count reaches zero; returns true on success, false on timeout
#count_down | None | nil | Decrements the count by one
#count | None | Integer | Returns current count value
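
The latch accessors in the same style (illustrative values):

latch = Concurrent::CountDownLatch.new(3)
latch.count        #=> 3
latch.count_down
latch.count        #=> 2
latch.wait(0.1)    #=> false (count has not reached zero)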

Exception Classes

Exception | Description | Common Causes
Concurrent::TimeoutError | Barrier wait operation timed out | Insufficient threads reaching barrier, deadlocked threads
Concurrent::BrokenBarrierError | Barrier broken due to exception or reset | Thread exception before barrier, explicit reset call
Concurrent::IllegalStateError | Invalid operation on latch/barrier | Count down below zero, wait on completed latch

Barrier States

State | CyclicBarrier | CountDownLatch | Description
Initial | Waiting slots open for all parties | Count > 0 | Ready to accept waiting threads
Waiting | Some threads at barrier | Count > 0, threads waiting | Partial completion
Triggered | All threads proceed | Count = 0 | Synchronization point reached
Reset | Back to initial state | N/A (single-use) | Ready for next synchronization cycle
Broken | Exception occurred | N/A | Unusable until reset

Performance Characteristics

Operation | Time Complexity | Memory Usage | Thread Safety
Barrier creation | O(1) | O(parties) | Constructor thread-safe
wait() operation | O(parties) | O(1) additional | Fully thread-safe
count_down() | O(1) | O(1) | Atomic operation
State queries | O(1) | O(1) | Lock-free reads

Timeout Behavior

Scenario | CyclicBarrier | CountDownLatch | Recovery Strategy
Timeout during wait | Returns false, barrier remains usable | Returns false, latch remains usable | Retry or fallback logic
Partial timeout | Some threads time out, others continue waiting | Threads continue waiting | Check barrier state
Exception during wait | Barrier becomes broken | Latch remains usable | Reset barrier or create new instance
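
A sketch of the retry strategy from the table; the attempt count and fallback are application-level choices, not library behavior:

barrier = Concurrent::CyclicBarrier.new(2)

# Retry a bounded number of times; wait returns false on timeout
# and the barrier remains usable for another attempt
success = 3.times.any? do |attempt|
  puts "Barrier wait attempt #{attempt + 1}"
  barrier.wait(5.0)
end

puts "Falling back to degraded mode" unless success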

Common Usage Patterns

# Producer-Consumer Coordination
latch = Concurrent::CountDownLatch.new(producer_count)
producers.each { |p| Thread.new { produce_data; latch.count_down } }
latch.wait  # Wait for all producers

# Phase-based Processing
barrier = Concurrent::CyclicBarrier.new(worker_count)
workers.each { |w| Thread.new { phase1; barrier.wait; phase2; barrier.wait } }

# Master-Worker Coordination
barrier = Concurrent::CyclicBarrier.new(workers + 1) do
  # Master thread coordination logic
end

Thread Safety Guarantees

  • Memory Visibility: All writes before barrier crossing are visible after
  • Ordering: No reordering of operations across barrier boundaries
  • Atomicity: Barrier state changes are atomic operations
  • Exception Safety: Proper cleanup on thread interruption or exceptions