Overview
Barrier synchronization coordinates multiple threads by creating synchronization points where execution pauses until all participating threads arrive. Ruby implements barrier synchronization through the concurrent-ruby
gem's Concurrent::CyclicBarrier
and Concurrent::CountDownLatch
classes, along with built-in primitives like Mutex
and ConditionVariable
for custom implementations.
A barrier acts as a gate that remains closed until a predetermined number of threads reach it. Once all threads arrive, the barrier opens and allows all threads to proceed simultaneously. This pattern proves essential in parallel algorithms where threads must complete specific phases before moving to the next stage.
require 'concurrent'
barrier = Concurrent::CyclicBarrier.new(3)
3.times do |i|
Thread.new do
puts "Thread #{i} starting work"
sleep(rand(1..3)) # Simulate work
puts "Thread #{i} finished work, waiting at barrier"
barrier.wait
puts "Thread #{i} proceeding after barrier"
end
end
sleep(5) # Let threads complete
The CyclicBarrier
resets automatically after all threads pass through, making it reusable for multiple synchronization rounds. The CountDownLatch
provides single-use barrier functionality where threads wait until a counter reaches zero.
latch = Concurrent::CountDownLatch.new(2)
worker_threads = 2.times.map do |i|
Thread.new do
puts "Worker #{i} completing task"
sleep(rand(1..2))
latch.count_down
end
end
main_thread = Thread.new do
latch.wait
puts "All workers completed, main thread proceeding"
end
[worker_threads, main_thread].flatten.each(&:join)
Ruby's barrier synchronization supports both blocking and non-blocking operations, timeout mechanisms, and exception handling across thread boundaries. These primitives form the foundation for complex concurrent algorithms including parallel map-reduce operations, phased computations, and coordinated resource management.
Basic Usage
The Concurrent::CyclicBarrier
creates reusable synchronization points for a fixed number of threads. Initialize with the required thread count and optional barrier action that executes when all threads arrive.
require 'concurrent'
# Create barrier for 4 threads
barrier = Concurrent::CyclicBarrier.new(4)
threads = 4.times.map do |id|
Thread.new do
# Phase 1: Individual work
puts "Thread #{id}: Starting phase 1"
sleep(rand(0.5..1.5))
puts "Thread #{id}: Phase 1 complete"
# Synchronization point
barrier.wait
# Phase 2: Coordinated work
puts "Thread #{id}: Starting phase 2"
sleep(rand(0.5..1.0))
puts "Thread #{id}: All phases complete"
end
end
threads.each(&:join)
The barrier action executes once when all threads arrive, running in one of the waiting threads. This proves useful for phase transitions or shared state updates.
barrier = Concurrent::CyclicBarrier.new(3) do
puts "All threads reached barrier - transitioning to next phase"
end
data_processor = Array.new(3) { [] }
threads = 3.times.map do |worker_id|
Thread.new do
10.times do |round|
# Process data chunk
data_processor[worker_id] << "Round #{round} data"
# Wait for all workers to complete round
barrier.wait
# All workers have completed this round
end
end
end
threads.each(&:join)
puts "Final data: #{data_processor}"
CountDownLatch
provides one-time coordination where multiple threads signal completion to waiting threads. The latch starts with a count and decrements until reaching zero.
task_count = 5
latch = Concurrent::CountDownLatch.new(task_count)
# Create worker threads
workers = task_count.times.map do |task_id|
Thread.new do
puts "Task #{task_id}: Processing..."
sleep(rand(1..3))
puts "Task #{task_id}: Complete"
latch.count_down # Signal completion
end
end
# Main thread waits for all tasks
puts "Waiting for all tasks to complete..."
latch.wait
puts "All tasks finished!"
workers.each(&:join)
Both barrier types support timeout operations to prevent indefinite blocking. The wait
method accepts timeout values and returns true
if successful or false
if timeout occurred.
barrier = Concurrent::CyclicBarrier.new(2)
Thread.new do
result = barrier.wait(2.0) # Wait maximum 2 seconds
if result
puts "Barrier opened successfully"
else
puts "Timeout occurred waiting for barrier"
end
end
sleep(3) # Simulate delayed second thread
Thread.new { barrier.wait }
sleep(1)
Thread Safety & Concurrency
Barrier synchronization primitives maintain internal thread safety through atomic operations and proper memory synchronization. However, application code must handle shared state access correctly around barrier points.
The CyclicBarrier
guarantees that barrier actions execute with proper memory visibility. All writes performed by threads before reaching the barrier become visible to threads proceeding after the barrier opens.
barrier = Concurrent::CyclicBarrier.new(3)
shared_data = Concurrent::Array.new
threads = 3.times.map do |id|
Thread.new do
# Phase 1: Each thread modifies shared data
10.times { |i| shared_data << "Thread #{id}, item #{i}" }
# Barrier ensures all modifications are visible
barrier.wait
# Phase 2: All threads see complete shared_data
puts "Thread #{id} sees #{shared_data.length} items"
barrier.wait
# Phase 3: Safe to process complete dataset
thread_items = shared_data.select { |item| item.include?("Thread #{id}") }
puts "Thread #{id} processed #{thread_items.length} own items"
end
end
threads.each(&:join)
Race conditions can occur when threads access shared resources without proper synchronization beyond the barrier points. Combine barriers with mutexes or thread-safe collections for complete safety.
barrier = Concurrent::CyclicBarrier.new(4)
mutex = Mutex.new
results = []
worker_threads = 4.times.map do |worker_id|
Thread.new do
local_results = []
# Phase 1: Independent computation
100.times { |i| local_results << worker_id * 100 + i }
barrier.wait # Synchronize completion of phase 1
# Phase 2: Thread-safe result aggregation
mutex.synchronize do
results.concat(local_results)
end
barrier.wait # Synchronize completion of phase 2
# Phase 3: Process complete results
worker_share = results.select { |r| r / 100 == worker_id }
puts "Worker #{worker_id}: processed #{worker_share.length} results"
end
end
worker_threads.each(&:join)
puts "Total results: #{results.length}"
Exception handling in barrier synchronization requires careful consideration. If a thread encounters an exception before reaching the barrier, other threads may wait indefinitely unless proper cleanup occurs.
barrier = Concurrent::CyclicBarrier.new(3)
exception_barrier = Concurrent::CountDownLatch.new(1)
threads = 3.times.map do |id|
Thread.new do
begin
if id == 1
raise StandardError, "Simulated worker failure"
end
sleep(rand(1..2)) # Simulate work
puts "Thread #{id} reaching barrier"
barrier.wait
puts "Thread #{id} passed barrier"
rescue => e
puts "Thread #{id} failed: #{e.message}"
exception_barrier.count_down # Signal failure to other threads
raise
end
end
end
# Monitor for exceptions
exception_thread = Thread.new do
if exception_barrier.wait(3.0)
puts "Exception detected, cleaning up remaining threads"
threads.each { |t| t.raise(Interrupt) if t.alive? }
end
end
threads.each do |t|
begin
t.join
rescue => e
puts "Thread terminated: #{e.class}"
end
end
exception_thread.join
Barrier synchronization works effectively with other concurrency patterns. Thread pools can coordinate batch processing where worker threads synchronize between processing phases.
require 'concurrent'
class PhaseCoordinator
def initialize(thread_count)
@barrier = Concurrent::CyclicBarrier.new(thread_count)
@phase_data = Concurrent::Hash.new
end
def wait_for_phase_completion(phase_name, data = nil)
Thread.current[:phase_data] = data if data
@barrier.wait
# Collect data from all threads
all_data = Thread.list
.select { |t| t[:phase_data] }
.map { |t| t[:phase_data] }
.compact
Thread.current[:phase_data] = nil
all_data
end
end
coordinator = PhaseCoordinator.new(3)
threads = 3.times.map do |id|
Thread.new do
# Phase 1: Data collection
local_data = Array.new(10) { rand(100) }
collected_data = coordinator.wait_for_phase_completion("collection", local_data)
# Phase 2: Data analysis
analysis = collected_data.flatten.sum / collected_data.flatten.length.to_f
analysis_results = coordinator.wait_for_phase_completion("analysis", analysis)
# Phase 3: Report generation
puts "Thread #{id}: Average from #{analysis_results.length} analyses: #{analysis_results.sum / analysis_results.length}"
coordinator.wait_for_phase_completion("reporting")
end
end
threads.each(&:join)
Error Handling & Debugging
Barrier synchronization introduces specific error conditions that require targeted debugging strategies. Thread starvation occurs when fewer threads than expected reach the barrier, causing indefinite blocking.
require 'timeout'
class DebuggableBarrier
def initialize(expected_threads, name = "barrier")
@barrier = Concurrent::CyclicBarrier.new(expected_threads)
@expected = expected_threads
@name = name
@waiting_threads = Concurrent::Set.new
end
def wait(timeout = nil)
thread_id = Thread.current.object_id
@waiting_threads.add(thread_id)
puts "[#{@name}] Thread #{thread_id} waiting (#{@waiting_threads.size}/#{@expected})"
begin
if timeout
result = @barrier.wait(timeout)
unless result
puts "[#{@name}] Timeout! Waiting threads: #{@waiting_threads.to_a}"
debug_barrier_state
end
result
else
@barrier.wait
end
ensure
@waiting_threads.delete(thread_id)
end
end
private
def debug_barrier_state
puts "[#{@name}] Expected: #{@expected}, Currently waiting: #{@waiting_threads.size}"
puts "[#{@name}] Waiting thread IDs: #{@waiting_threads.to_a}"
puts "[#{@name}] All threads: #{Thread.list.map(&:object_id)}"
end
end
# Demonstrate debugging thread starvation
barrier = DebuggableBarrier.new(3, "test_barrier")
threads = 2.times.map do |id| # Only create 2 threads instead of 3
Thread.new do
puts "Thread #{id} starting"
sleep(rand(1..2))
barrier.wait(5.0) # Will timeout
puts "Thread #{id} completed"
end
end
threads.each(&:join)
Exception propagation across barrier boundaries requires careful handling to prevent thread leakage and resource cleanup issues.
class ExceptionSafeBarrier
def initialize(thread_count)
@barrier = Concurrent::CyclicBarrier.new(thread_count)
@exception_occurred = Concurrent::AtomicBoolean.new(false)
@exception_details = Concurrent::AtomicReference.new
end
def wait_with_exception_handling
if @exception_occurred.value
stored_exception = @exception_details.value
raise stored_exception if stored_exception
end
@barrier.wait
rescue => e
# Store first exception that occurs
unless @exception_occurred.compare_and_set(false, true)
@exception_details.set(e)
end
# Wake up other waiting threads by signaling exception state
raise e
end
def exception_occurred?
@exception_occurred.value
end
def stored_exception
@exception_details.value
end
end
barrier = ExceptionSafeBarrier.new(4)
threads = 4.times.map do |id|
Thread.new do
begin
sleep(rand(0.5..1.0))
if id == 2
raise StandardError, "Worker #{id} encountered error"
end
puts "Thread #{id} reaching barrier"
barrier.wait_with_exception_handling
puts "Thread #{id} passed barrier"
rescue => e
puts "Thread #{id} handling exception: #{e.message}"
# Check if this thread caused the exception or received it
if barrier.stored_exception == e
puts "Thread #{id} was the source of the exception"
else
puts "Thread #{id} received exception from another thread"
end
raise e
end
end
end
# Collect thread results and exceptions
results = threads.map do |thread|
begin
thread.join
:success
rescue => e
{ error: e.message, thread: thread.object_id }
end
end
puts "Results: #{results}"
Memory leaks can occur when barrier objects accumulate references to completed threads. Implement cleanup strategies for long-running applications.
class ManagedBarrierPool
def initialize
@active_barriers = Concurrent::Hash.new
@cleanup_thread = start_cleanup_thread
end
def get_barrier(key, thread_count)
@active_barriers.compute_if_absent(key) do
{
barrier: Concurrent::CyclicBarrier.new(thread_count),
created_at: Time.now,
last_used: Time.now,
use_count: 0
}
end
barrier_info = @active_barriers[key]
barrier_info[:last_used] = Time.now
barrier_info[:use_count] += 1
barrier_info[:barrier]
end
def cleanup_stale_barriers(max_age = 300) # 5 minutes
cutoff_time = Time.now - max_age
@active_barriers.each_pair do |key, info|
if info[:last_used] < cutoff_time
puts "Cleaning up stale barrier: #{key}"
@active_barriers.delete(key)
end
end
end
def barrier_stats
@active_barriers.transform_values do |info|
{
age: Time.now - info[:created_at],
last_used_ago: Time.now - info[:last_used],
use_count: info[:use_count]
}
end
end
private
def start_cleanup_thread
Thread.new do
loop do
sleep(60) # Cleanup every minute
cleanup_stale_barriers
end
rescue => e
puts "Cleanup thread error: #{e.message}"
retry
end
end
end
# Usage with automatic cleanup
pool = ManagedBarrierPool.new
# Simulate multiple barrier usage patterns
10.times do |batch|
threads = 3.times.map do |id|
Thread.new do
barrier = pool.get_barrier("batch_#{batch % 3}", 3)
sleep(rand(0.1..0.5))
barrier.wait
puts "Batch #{batch}, Thread #{id} completed"
end
end
threads.each(&:join)
sleep(0.1)
end
puts "Final barrier stats:"
puts pool.barrier_stats
sleep(2) # Allow cleanup to run
puts "After cleanup:"
puts pool.barrier_stats
Performance & Memory
Barrier synchronization performance depends on thread count, contention patterns, and underlying system capabilities. Context switching overhead increases significantly with higher thread counts.
require 'benchmark'
def benchmark_barrier_performance(thread_counts, iterations = 1000)
results = {}
thread_counts.each do |count|
time = Benchmark.measure do
iterations.times do
barrier = Concurrent::CyclicBarrier.new(count)
threads = count.times.map do |id|
Thread.new do
barrier.wait
end
end
threads.each(&:join)
end
end
results[count] = {
total_time: time.real,
per_iteration: time.real / iterations,
per_thread_per_iteration: time.real / (iterations * count)
}
end
results
end
# Test scaling characteristics
thread_counts = [2, 4, 8, 16, 32]
results = benchmark_barrier_performance(thread_counts, 100)
puts "Barrier Performance Analysis:"
puts "Threads | Total(s) | Per-Iter(ms) | Per-Thread-Iter(ms)"
puts "-" * 55
results.each do |count, metrics|
printf "%7d | %8.3f | %12.3f | %18.6f\n",
count,
metrics[:total_time],
metrics[:per_iteration] * 1000,
metrics[:per_thread_per_iteration] * 1000
end
Memory usage scales with thread count and barrier state. Monitor memory consumption in long-running applications with many barriers.
class MemoryAwareBarrier
def initialize(thread_count, name = "barrier")
@barrier = Concurrent::CyclicBarrier.new(thread_count)
@name = name
@wait_count = Concurrent::AtomicFixnum.new(0)
@memory_samples = Concurrent::Array.new
end
def wait
sample_memory_before = get_memory_usage
@barrier.wait
sample_memory_after = get_memory_usage
wait_count = @wait_count.increment
@memory_samples << {
wait_number: wait_count,
before_kb: sample_memory_before,
after_kb: sample_memory_after,
delta_kb: sample_memory_after - sample_memory_before
}
end
def memory_report
return "No samples collected" if @memory_samples.empty?
samples = @memory_samples.value
total_delta = samples.sum { |s| s[:delta_kb] }
avg_delta = total_delta / samples.length.to_f
"#{@name}: #{samples.length} waits, total Δ#{total_delta}KB, avg Δ#{avg_delta.round(2)}KB per wait"
end
private
def get_memory_usage
# Simplified memory measurement - in production use more sophisticated tools
GC.stat(:heap_allocated_pages) * 16 # Approximate KB
end
end
# Demonstrate memory tracking
barrier = MemoryAwareBarrier.new(4, "memory_test")
threads = 4.times.map do |id|
Thread.new do
10.times do |round|
# Allocate some memory to observe patterns
temp_data = Array.new(1000) { "data_#{id}_#{round}_#{rand(10000)}" }
barrier.wait
# Memory might be released here
temp_data = nil
GC.start if round % 5 == 0 # Periodic GC
end
end
end
threads.each(&:join)
puts barrier.memory_report
Optimize barrier usage by minimizing state sharing and reducing synchronization frequency. Batch operations where possible to reduce barrier overhead.
class BatchedBarrierProcessor
def initialize(thread_count, batch_size = 10)
@thread_count = thread_count
@batch_size = batch_size
@work_queue = Queue.new
@result_queue = Queue.new
end
def process_items(items)
# Divide items into batches
items.each_slice(@batch_size) do |batch|
@work_queue << batch
end
# Signal completion
@thread_count.times { @work_queue << :done }
# Start worker threads
threads = @thread_count.times.map do |id|
Thread.new { worker_loop(id) }
end
# Collect results
results = []
@thread_count.times do
results.concat(@result_queue.pop)
end
threads.each(&:join)
results.flatten
end
private
def worker_loop(worker_id)
local_results = []
barrier = Concurrent::CyclicBarrier.new(@thread_count)
loop do
batch = @work_queue.pop
break if batch == :done
# Process batch independently
batch_results = batch.map { |item| process_item(item, worker_id) }
local_results.concat(batch_results)
# Synchronize batch completion (optional - for coordinated processing)
barrier.wait if batch.length == @batch_size
end
@result_queue << local_results
end
def process_item(item, worker_id)
# Simulate processing
"#{item}_processed_by_#{worker_id}"
end
end
# Compare performance: frequent vs batched synchronization
items = (1..1000).to_a
# Frequent synchronization approach
frequent_sync_time = Benchmark.measure do
barrier = Concurrent::CyclicBarrier.new(4)
threads = 4.times.map do |id|
Thread.new do
items.select.with_index { |_, i| i % 4 == id }.each do |item|
processed = "#{item}_processed"
barrier.wait # Synchronize after each item
end
end
end
threads.each(&:join)
end
# Batched synchronization approach
batched_time = Benchmark.measure do
processor = BatchedBarrierProcessor.new(4, 50)
results = processor.process_items(items)
end
puts "Frequent sync: #{frequent_sync_time.real.round(3)}s"
puts "Batched sync: #{batched_time.real.round(3)}s"
puts "Speedup: #{(frequent_sync_time.real / batched_time.real).round(2)}x"
Production Patterns
Production barrier synchronization requires robust error handling, monitoring, and graceful degradation capabilities. Implement circuit breaker patterns for barrier failures.
class ProductionBarrierManager
def initialize(thread_count, name, timeout = 30.0)
@thread_count = thread_count
@name = name
@timeout = timeout
@barrier = Concurrent::CyclicBarrier.new(thread_count)
@metrics = initialize_metrics
@circuit_breaker = CircuitBreaker.new
end
def wait_with_monitoring
start_time = Time.now
begin
@metrics[:attempts] += 1
if @circuit_breaker.open?
@metrics[:circuit_breaker_rejects] += 1
raise CircuitBreakerOpen, "Barrier #{@name} circuit breaker is open"
end
result = @barrier.wait(@timeout)
if result
@metrics[:successful_waits] += 1
@circuit_breaker.record_success
else
@metrics[:timeouts] += 1
@circuit_breaker.record_failure
raise TimeoutError, "Barrier #{@name} timeout after #{@timeout}s"
end
duration = Time.now - start_time
@metrics[:wait_times] << duration
result
rescue => e
@metrics[:errors] += 1
@circuit_breaker.record_failure
log_error(e, start_time)
raise
end
end
def health_status
{
name: @name,
thread_count: @thread_count,
circuit_breaker_state: @circuit_breaker.state,
metrics: @metrics.dup.tap { |m| m.delete(:wait_times) },
avg_wait_time: @metrics[:wait_times].empty? ? 0 :
@metrics[:wait_times].sum / @metrics[:wait_times].length,
success_rate: calculate_success_rate
}
end
private
def initialize_metrics
{
attempts: 0,
successful_waits: 0,
timeouts: 0,
errors: 0,
circuit_breaker_rejects: 0,
wait_times: []
}
end
def calculate_success_rate
return 0.0 if @metrics[:attempts] == 0
(@metrics[:successful_waits].to_f / @metrics[:attempts]) * 100
end
def log_error(error, start_time)
duration = Time.now - start_time
puts "[#{Time.now}] Barrier #{@name} error after #{duration.round(3)}s: #{error.class} - #{error.message}"
end
end
class CircuitBreaker
def initialize(failure_threshold = 5, recovery_timeout = 60)
@failure_threshold = failure_threshold
@recovery_timeout = recovery_timeout
@failure_count = 0
@last_failure_time = nil
@state = :closed # :closed, :open, :half_open
end
def open?
case @state
when :closed
false
when :open
if Time.now - @last_failure_time > @recovery_timeout
@state = :half_open
false
else
true
end
when :half_open
false
end
end
def record_success
@failure_count = 0
@state = :closed
end
def record_failure
@failure_count += 1
@last_failure_time = Time.now
if @failure_count >= @failure_threshold
@state = :open
end
end
def state
@state
end
end
# Production usage example with monitoring
class DataPipelineStage
def initialize(stage_name, parallelism = 4)
@stage_name = stage_name
@parallelism = parallelism
@barrier_manager = ProductionBarrierManager.new(parallelism, stage_name)
@processing_queue = Queue.new
@results = Concurrent::Array.new
end
def process_batch(items)
# Distribute work
items.each_slice(items.length / @parallelism + 1) do |chunk|
@processing_queue << chunk
end
# Start workers
workers = @parallelism.times.map do |worker_id|
Thread.new { worker_process(worker_id) }
end
# Wait for completion
workers.each(&:join)
# Return results and health status
{
results: @results.to_a.flatten,
health: @barrier_manager.health_status
}
end
private
def worker_process(worker_id)
chunk = @processing_queue.pop(true) rescue nil
return unless chunk
begin
# Process chunk
processed_items = chunk.map { |item| "#{@stage_name}_#{worker_id}_#{item}" }
# Synchronize completion
@barrier_manager.wait_with_monitoring
# Store results
@results.concat(processed_items)
rescue => e
puts "Worker #{worker_id} failed: #{e.message}"
# Continue operation with degraded performance
end
end
end
# Simulate production pipeline
pipeline_stage = DataPipelineStage.new("data_transformation", 3)
begin
result = pipeline_stage.process_batch((1..100).to_a)
puts "Processed #{result[:results].length} items"
puts "Health status: #{result[:health]}"
rescue => e
puts "Pipeline stage failed: #{e.message}"
end
Implement graceful shutdown procedures that properly release waiting threads and clean up resources.
class GracefulBarrierShutdown
def initialize(thread_count)
@thread_count = thread_count
@barrier = Concurrent::CyclicBarrier.new(thread_count)
@shutdown_requested = Concurrent::AtomicBoolean.new(false)
@active_threads = Concurrent::Set.new
end
def register_thread
@active_threads.add(Thread.current)
Thread.current.at_exit { @active_threads.delete(Thread.current) }
end
def wait_or_shutdown
register_thread
if @shutdown_requested.value
raise ShutdownRequested, "Barrier shutdown in progress"
end
@barrier.wait
rescue ShutdownRequested
puts "Thread #{Thread.current.object_id}: Shutdown requested, exiting gracefully"
raise
end
def initiate_shutdown(timeout = 10.0)
puts "Initiating graceful shutdown..."
@shutdown_requested.set(true)
# Wake up threads by sending interrupts
@active_threads.each do |thread|
thread.raise(ShutdownRequested) if thread.alive?
end
# Wait for threads to complete
deadline = Time.now + timeout
while @active_threads.any?(&:alive?) && Time.now < deadline
sleep(0.1)
end
# Force terminate remaining threads
remaining = @active_threads.select(&:alive?)
unless remaining.empty?
puts "Force terminating #{remaining.length} threads"
remaining.each { |t| t.kill }
end
puts "Shutdown complete"
end
def active_thread_count
@active_threads.count(&:alive?)
end
end
class ShutdownRequested < StandardError; end
# Demonstrate graceful shutdown
shutdown_manager = GracefulBarrierShutdown.new(4)
# Start worker threads
workers = 4.times.map do |id|
Thread.new do
begin
10.times do |round|
puts "Worker #{id}: Round #{round}"
sleep(rand(0.5..1.5))
shutdown_manager.wait_or_shutdown
puts "Worker #{id}: Completed round #{round}"
end
rescue ShutdownRequested
puts "Worker #{id}: Received shutdown signal"
end
end
end
# Simulate shutdown after some time
shutdown_thread = Thread.new do
sleep(3.0) # Let workers run for 3 seconds
shutdown_manager.initiate_shutdown
end
# Wait for all threads
workers.each do |worker|
begin
worker.join
rescue => e
puts "Worker terminated: #{e.class}"
end
end
shutdown_thread.join
puts "All workers terminated. Active threads: #{shutdown_manager.active_thread_count}"
Reference
Concurrent::CyclicBarrier
Method | Parameters | Returns | Description |
---|---|---|---|
new(parties, &action) |
parties (Integer), action (Proc, optional) |
CyclicBarrier |
Creates barrier for specified thread count with optional action |
#wait(timeout = nil) |
timeout (Numeric, optional) |
Boolean |
Blocks until all parties arrive; returns true on success, false on timeout |
#parties |
None | Integer |
Returns the number of threads required to trip the barrier |
#number_waiting |
None | Integer |
Returns the number of threads currently waiting at the barrier |
#broken? |
None | Boolean |
Returns true if barrier is in broken state due to exceptions |
#reset |
None | nil |
Resets barrier to initial state, breaking any waiting threads |
Concurrent::CountDownLatch
Method | Parameters | Returns | Description |
---|---|---|---|
new(count) |
count (Integer) |
CountDownLatch |
Creates latch with initial count |
#wait(timeout = nil) |
timeout (Numeric, optional) |
Boolean |
Blocks until count reaches zero; returns true on success, false on timeout |
#count_down |
None | nil |
Decrements the count by one |
#count |
None | Integer |
Returns current count value |
Exception Classes
Exception | Description | Common Causes |
---|---|---|
Concurrent::TimeoutError |
Barrier wait operation timed out | Insufficient threads reaching barrier, deadlocked threads |
Concurrent::BrokenBarrierError |
Barrier broken due to exception or reset | Thread exception before barrier, explicit reset call |
Concurrent::IllegalStateError |
Invalid operation on latch/barrier | Count down below zero, wait on completed latch |
Barrier States
State | CyclicBarrier | CountDownLatch | Description |
---|---|---|---|
Initial | parties waiting slots |
Count > 0 | Ready to accept waiting threads |
Waiting | Some threads at barrier | Count > 0, threads waiting | Partial completion |
Triggered | All threads proceed | Count = 0 | Synchronization point reached |
Reset | Back to initial state | N/A (single-use) | Ready for next synchronization cycle |
Broken | Exception occurred | N/A | Unusable until reset |
Performance Characteristics
Operation | Time Complexity | Memory Usage | Thread Safety |
---|---|---|---|
Barrier creation | O(1) | O(parties) | Constructor thread-safe |
wait() operation |
O(parties) | O(1) additional | Fully thread-safe |
count_down() |
O(1) | O(1) | Atomic operation |
State queries | O(1) | O(1) | Lock-free reads |
Timeout Behavior
Scenario | CyclicBarrier | CountDownLatch | Recovery Strategy |
---|---|---|---|
Timeout during wait | Returns false , barrier remains usable |
Returns false , latch remains usable |
Retry or fallback logic |
Partial timeout | Some threads timeout, others continue waiting | Threads continue waiting | Check barrier state |
Exception during wait | Barrier becomes broken | Latch remains usable | Reset barrier or create new instance |
Common Usage Patterns
# Producer-Consumer Coordination
latch = Concurrent::CountDownLatch.new(producer_count)
producers.each { |p| Thread.new { produce_data; latch.count_down } }
latch.wait # Wait for all producers
# Phase-based Processing
barrier = Concurrent::CyclicBarrier.new(worker_count)
workers.each { |w| Thread.new { phase1; barrier.wait; phase2; barrier.wait } }
# Master-Worker Coordination
barrier = Concurrent::CyclicBarrier.new(workers + 1) do
# Master thread coordination logic
end
Thread Safety Guarantees
- Memory Visibility: All writes before barrier crossing are visible after
- Ordering: No reordering of operations across barrier boundaries
- Atomicity: Barrier state changes are atomic operations
- Exception Safety: Proper cleanup on thread interruption or exceptions