Overview
Thread safety in Ruby means that code behaves correctly when multiple threads access it at the same time. Ruby offers several tools for writing thread-safe code, including mutexes, condition variables, atomic operations, and thread-safe data structures. In MRI (CRuby), the Global VM Lock (GVL, often called the GIL) lets only one thread execute Ruby code at a time, so CPU-bound work does not run in parallel. The GVL does not guarantee that compound operations such as `@count += 1` are atomic, however, so shared mutable state still needs synchronization.
Race conditions arise when multiple threads access and modify shared data without proper synchronization. The program's behavior then depends on the relative timing of thread execution. Consider this unsafe counter:
```ruby
class UnsafeCounter
  def initialize
    @count = 0
  end

  def increment
    @count += 1 # Not atomic: read, add, write
  end

  def value
    @count
  end
end

counter = UnsafeCounter.new
threads = 10.times.map do
  Thread.new { 1000.times { counter.increment } }
end
threads.each(&:join)
puts counter.value # Can be less than 10,000; the result depends on thread timing
```
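The race above depends on timing, and on MRI it may rarely show up in such a tight loop. The following sketch makes the lost update reproducible. It uses a plain local variable and adds a short `sleep` between the read and the write, which lets other threads run in the gap. The `sleep` is there only to expose the bug:

```ruby
count = 0

threads = 10.times.map do
  Thread.new do
    10.times do
      current = count      # read the shared value
      sleep(0.001)         # let other threads run between the read and the write
      count = current + 1  # write back a value that may already be stale
    end
  end
end
threads.each(&:join)

puts count # typically far below 100: concurrent increments overwrite each other
```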
The `Thread` class provides the foundation for concurrent execution. Ruby ships several synchronization primitives. The core classes are `Mutex` for mutual exclusion, `ConditionVariable` for thread coordination, and `Queue` and `SizedQueue` for thread-safe communication. `Monitor`, from the `monitor` standard library, provides reentrant locking.
The third-party concurrent-ruby gem adds the `Concurrent::` namespace, which contains further thread-safe data structures and synchronization tools. These include `Concurrent::Hash`, `Concurrent::Array`, `Concurrent::Atom`, and various executor services.
The key principle in thread safety is controlling access to shared mutable state. Immutable objects are inherently thread-safe since they cannot change after creation. When mutation is necessary, synchronization ensures only one thread modifies data at a time.
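In Ruby, freezing is the simplest way to get that guarantee. This sketch uses a hypothetical `SETTINGS` constant. Because `freeze` is shallow, each nested object has to be frozen explicitly:

```ruby
# Deep-freeze by hand: freeze is shallow, so each nested object needs its own call
SETTINGS = {
  hosts: ["db1.example".freeze, "db2.example".freeze].freeze,
  pool_size: 5
}.freeze

# Readers need no lock because nothing can change underneath them
sizes = 4.times.map { Thread.new { SETTINGS[:hosts].size } }.map(&:value)

rejected = false
begin
  SETTINGS[:hosts] << "db3.example" # any mutation attempt raises
rescue FrozenError
  rejected = true
end

puts sizes.inspect # => [2, 2, 2, 2]
puts rejected      # => true
```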
Basic Usage
The `Mutex` class provides the most common synchronization mechanism. A mutex ensures exclusive access to a critical section of code, because only one thread can hold a mutex lock at any time:
```ruby
# Mutex is a core class, so no require is needed
class SafeCounter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize do
      @count += 1
    end
  end

  def value
    @mutex.synchronize do
      @count
    end
  end
end
```
The `synchronize` method acquires the lock, executes the block, and releases the lock even if an exception occurs. This pattern prevents race conditions by serializing access to the `@count` variable.
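To see the exception guarantee in isolation, this small sketch raises inside the block and then checks `Mutex#locked?`:

```ruby
mutex = Mutex.new

begin
  mutex.synchronize do
    raise ArgumentError, "boom" # simulate a failure inside the critical section
  end
rescue ArgumentError => e
  puts "Rescued: #{e.message}" # => Rescued: boom
end

# synchronize released the lock on the way out, so other threads are not blocked
puts mutex.locked? # => false
```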
`Queue` provides thread-safe communication between threads. Producers push items and consumers pop them, and `pop` blocks while the queue is empty. The queue handles synchronization internally:
```ruby
queue = Queue.new

producer = Thread.new do
  5.times do |i|
    queue << "item #{i}"
    puts "Produced: item #{i}"
    sleep(0.1)
  end
end

consumer = Thread.new do
  5.times do
    item = queue.pop
    puts "Consumed: #{item}"
  end
end

[producer, consumer].each(&:join)
```
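A fixed `5.times` loop only works when the consumer knows the item count in advance. For an open-ended stream, `Queue#close` (available since Ruby 2.3) is one common approach. Once the queue is closed and drained, `pop` returns `nil` instead of blocking, so consumers can loop until they receive `nil`. This sketch assumes that no item is itself `nil`:

```ruby
queue = Queue.new
results = Queue.new

consumers = 3.times.map do |id|
  Thread.new do
    # pop blocks while the queue is open and empty; returns nil once closed and drained
    while (item = queue.pop)
      results << [id, item]
    end
  end
end

10.times { |i| queue << i }
queue.close # signal "no more items"; pushing now raises ClosedQueueError
consumers.each(&:join)

puts results.size # => 10
```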
`SizedQueue` extends `Queue` with a maximum capacity. When the queue is full, producers block until space becomes available:
```ruby
buffer = SizedQueue.new(2)

producer = Thread.new do
  10.times do |i|
    buffer << i # blocks while the buffer already holds 2 items
    puts "Added #{i} to buffer"
  end
end

consumer = Thread.new do
  sleep(1) # Simulate slow consumer
  10.times do
    item = buffer.pop
    puts "Processed #{item}"
    sleep(0.2)
  end
end

[producer, consumer].each(&:join)
```
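When a producer should not block, `SizedQueue#push` accepts a second `non_block` argument. If it is `true` and the queue is full, `push` raises `ThreadError` immediately instead of waiting. A minimal sketch:

```ruby
buffer = SizedQueue.new(2)
buffer.push(:a)
buffer.push(:b)

dropped = []
begin
  buffer.push(:c, true) # non_block = true: raise immediately if the queue is full
rescue ThreadError
  dropped << :c # e.g. drop, log, or retry later instead of blocking the producer
end

puts buffer.size     # => 2
puts dropped.inspect # => [:c]
```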
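Per-thread variables give each thread its own copy of a value, so no synchronization is needed. Values stored with `Thread.current[]` are, strictly speaking, fiber-local. For values that all fibers of a thread share, use `thread_variable_get` and `thread_variable_set`: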
```ruby
Thread.current[:user_id] = 123

def current_user_id
  Thread.current[:user_id]
end

threads = 3.times.map do |i|
  Thread.new do
    Thread.current[:user_id] = i + 1
    puts "Thread #{Thread.current.object_id}: #{current_user_id}"
  end
end
threads.each(&:join)
```
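The difference between fiber-local and thread-local storage matters once code runs inside fibers. External enumeration with `Enumerator#next` is one example. In this sketch, a new fiber on the same thread does not see the `Thread.current[]` value, but it does see the thread variable:

```ruby
Thread.current[:request_id] = "req-1"                    # fiber-local
Thread.current.thread_variable_set(:request_id, "req-1") # thread-local

fiber_value, thread_value = Fiber.new do
  # A new fiber on the same thread starts with empty fiber-local storage
  [Thread.current[:request_id], Thread.current.thread_variable_get(:request_id)]
end.resume

p fiber_value  # => nil
p thread_value # => "req-1"
```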
The `Monitor` class provides reentrant locking, which allows the same thread to acquire the lock multiple times:
```ruby
require 'monitor'

class BankAccount
  def initialize(balance)
    @balance = balance
    @monitor = Monitor.new
  end

  def withdraw(amount)
    @monitor.synchronize do
      return false if @balance < amount
      @balance -= amount
      log_transaction("withdraw", amount)
      true
    end
  end

  private

  def log_transaction(type, amount)
    @monitor.synchronize do # Reentrant - same thread can acquire again
      puts "#{type}: #{amount}, balance: #{@balance}"
    end
  end
end
```
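A class can also mix in `MonitorMixin`, which adds `synchronize` (and `new_cond`) directly to the object. The sketch below uses a hypothetical `Inventory` class. Its `initialize` must call `super()` so the mixin can set up its monitor:

```ruby
require 'monitor'

class Inventory
  include MonitorMixin

  def initialize
    super() # lets MonitorMixin initialize the underlying monitor
    @stock = Hash.new(0)
  end

  def add(item, quantity)
    synchronize do
      @stock[item] += quantity
      total # reentrant: total takes the same monitor again
    end
  end

  def total
    synchronize { @stock.values.sum }
  end
end

inventory = Inventory.new
threads = 4.times.map { Thread.new { 250.times { inventory.add(:widget, 1) } } }
threads.each(&:join)
puts inventory.total # => 1000
```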
Advanced Usage
Complex thread-safe patterns often involve multiple synchronization primitives working together. The producer-consumer pattern with condition signaling demonstrates coordinated thread communication:
```ruby
class BoundedBuffer
  def initialize(capacity)
    @buffer = []
    @capacity = capacity
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
    @not_empty = ConditionVariable.new
  end

  def put(item)
    @mutex.synchronize do
      # Re-check in a loop: a woken thread may find the condition false again
      while @buffer.size >= @capacity
        @not_full.wait(@mutex)
      end
      @buffer << item
      @not_empty.signal
    end
  end

  def take
    @mutex.synchronize do
      while @buffer.empty?
        @not_empty.wait(@mutex)
      end
      item = @buffer.shift
      @not_full.signal
      item
    end
  end

  def size
    @mutex.synchronize { @buffer.size }
  end
end

buffer = BoundedBuffer.new(5)

producers = 2.times.map do |id|
  Thread.new do
    10.times do |i|
      buffer.put("P#{id}-#{i}")
      puts "Producer #{id} added item #{i}"
      sleep(rand * 0.1) # Kernel#rand(0.1) would return 0...1.0, not 0...0.1
    end
  end
end

consumers = 3.times.map do |id|
  Thread.new do
    loop do
      item = buffer.take
      break if item == :done # stop marker, one per consumer
      puts "Consumer #{id} got: #{item}"
      sleep(rand * 0.2)
    end
  end
end

producers.each(&:join)
consumers.size.times { buffer.put(:done) } # every consumer receives its own marker
consumers.each(&:join)
```
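Read-write locks allow multiple concurrent readers while giving writers exclusive access. Ruby's core and standard library do not include a read-write lock, but one can be built from a mutex and condition variables. The concurrent-ruby gem also provides `Concurrent::ReadWriteLock`. The version below gives waiting writers priority over new readers: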
```ruby
class ReadWriteLock
  def initialize
    @mutex = Mutex.new
    @readers_cond = ConditionVariable.new
    @writers_cond = ConditionVariable.new
    @readers = 0
    @writers = 0 # writers waiting for the lock
    @writing = false
  end

  def read_lock
    @mutex.synchronize do
      while @writing || @writers > 0
        @readers_cond.wait(@mutex)
      end
      @readers += 1
    end
  end

  def read_unlock
    @mutex.synchronize do
      @readers -= 1
      @writers_cond.signal if @readers == 0
    end
  end

  def write_lock
    @mutex.synchronize do
      @writers += 1
      while @readers > 0 || @writing
        @writers_cond.wait(@mutex)
      end
      @writing = true
      @writers -= 1
    end
  end

  def write_unlock
    @mutex.synchronize do
      @writing = false
      @writers_cond.broadcast
      @readers_cond.broadcast
    end
  end

  # Block forms that always release the lock, even if the block raises
  def with_read_lock
    read_lock
    begin
      yield
    ensure
      read_unlock
    end
  end

  def with_write_lock
    write_lock
    begin
      yield
    ensure
      write_unlock
    end
  end
end
```
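Double-checked locking provides thread-safe lazy initialization. The pattern checks whether the instance exists twice. The first check runs without the lock and keeps later calls cheap. The second check runs while holding the lock, in case another thread created the instance in the meantime: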
```ruby
# A class named Singleton would clash with the stdlib Singleton module
class LazySingleton
  @@instance = nil
  @@mutex = Mutex.new

  def self.instance
    return @@instance if @@instance # First check (unsynchronized)
    @@mutex.synchronize do
      return @@instance if @@instance # Second check (synchronized)
      @@instance = new
    end
  end

  private_class_method :new
end
```
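When a class needs only a single, lazily created instance, the standard library already provides a `Singleton` module. It makes `new` private and supplies a thread-safe `instance` method. A sketch with a hypothetical `AppSettings` class:

```ruby
require 'singleton'

class AppSettings
  include Singleton # provides AppSettings.instance and hides AppSettings.new

  attr_reader :created_at

  def initialize
    @created_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

# Every thread receives the same object, even when they race on first access
instances = 8.times.map { Thread.new { AppSettings.instance } }.map(&:value)
puts instances.uniq.size # => 1
```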
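A thread-safe observer pattern needs careful coordination between subjects and observers. The subject below guards its observer list with a mutex but notifies observers from a copy, outside the lock: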
```ruby
class ThreadSafeSubject
  def initialize
    @observers = []
    @mutex = Mutex.new
  end

  def add_observer(observer)
    @mutex.synchronize do
      @observers << observer unless @observers.include?(observer)
    end
  end

  def remove_observer(observer)
    @mutex.synchronize do
      @observers.delete(observer)
    end
  end

  def notify_observers(event)
    observers_copy = nil
    @mutex.synchronize do
      observers_copy = @observers.dup
    end

    # Notify outside the lock to prevent deadlock
    observers_copy.each do |observer|
      begin
        observer.update(self, event)
      rescue => e
        # Handle observer errors without affecting others
        puts "Observer error: #{e.message}"
      end
    end
  end
end
```
Error Handling & Debugging
Race conditions create non-deterministic bugs that are difficult to reproduce and debug. Two strategies help. The first is logging with thread identification, as in the counter below. The second is acquiring locks in a consistent order to prevent deadlocks:
```ruby
class DebuggableCounter
  def initialize
    @count = 0
    @mutex = Mutex.new
    @debug = true
  end

  def increment
    thread_id = Thread.current.object_id
    log("Thread #{thread_id} requesting lock")
    @mutex.synchronize do
      log("Thread #{thread_id} acquired lock, count: #{@count}")
      old_value = @count
      sleep(0.001) if @debug # Widens the read-write gap; would expose a race without the mutex
      @count = old_value + 1
      log("Thread #{thread_id} incremented to #{@count}")
    end
    log("Thread #{thread_id} released lock")
  end

  private

  def log(message)
    puts "#{Time.now.strftime('%H:%M:%S.%3N')} - #{message}" if @debug
  end
end
```
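Raw `object_id` values are hard to read in logs. Since Ruby 2.3, a thread can be named with `Thread#name=`. This sketch tags log lines with hypothetical worker names. It collects the lines in a `Queue`, so each line is stored as one whole entry:

```ruby
log = Queue.new

workers = 3.times.map do |i|
  Thread.new do
    Thread.current.name = "worker-#{i}" # also shows up in Thread#inspect
    log << "[#{Thread.current.name}] started"
  end
end
workers.each(&:join)

lines = []
lines << log.pop until log.empty?
puts lines.sort
```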
Deadlock occurs when threads wait for each other's locks indefinitely. The classic solution is establishing a consistent lock ordering:
```ruby
class Account
  @@next_id = 0
  @@id_mutex = Mutex.new

  attr_reader :balance

  def initialize(balance)
    @balance = balance
    @mutex = Mutex.new
    @@id_mutex.synchronize do
      @id = @@next_id += 1
    end
  end

  def transfer(amount, to_account)
    # Locking the same Mutex twice would raise ThreadError
    return false if to_account.equal?(self)

    # Always acquire locks in ID order, so opposite transfers (A -> B and
    # B -> A) can never each hold one lock while waiting for the other
    first, second = [self, to_account].sort_by { |account| account.id }
    first.mutex.synchronize do
      second.mutex.synchronize do
        return false if @balance < amount
        @balance -= amount
        to_account.add_to_balance(amount)
        true
      end
    end
  end

  protected

  # Protected: callable only from other Account instances
  attr_reader :id, :mutex

  # Caller must already hold this account's mutex
  def add_to_balance(amount)
    @balance += amount
  end
end
```
Exceptions raised inside synchronized blocks need careful handling. The lock is released automatically, but shared state can be left half-updated. The service below records each failure explicitly before re-raising:
```ruby
class RobustService
  def initialize
    @tasks = []
    @mutex = Mutex.new
    @failed_tasks = []
  end

  def process_task(task)
    @mutex.synchronize do
      @tasks << task
      begin
        result = perform_work(task)
        @tasks.delete(task)
        result
      rescue StandardError => e
        # Move failed task to separate collection
        @tasks.delete(task)
        @failed_tasks << { task: task, error: e, timestamp: Time.now }
        # Re-raise to notify caller
        raise
      end
    end
  end

  def retry_failed_tasks
    failed_copy = nil
    @mutex.synchronize do
      failed_copy = @failed_tasks.dup
      @failed_tasks.clear
    end

    failed_copy.each do |failed|
      begin
        process_task(failed[:task])
        puts "Retry succeeded for task: #{failed[:task]}"
      rescue StandardError => e
        puts "Retry failed for task: #{failed[:task]}, error: #{e.message}"
      end
    end
  end

  private

  def perform_work(task)
    # Simulate work that might fail
    raise "Task failed" if task[:should_fail]
    "Result for #{task[:name]}"
  end
end
```
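Timeout mechanisms prevent threads from waiting indefinitely. Wrapping a blocking `pop` in `Timeout.timeout` on a helper thread is fragile. If the timeout fires just after the helper has dequeued an item, that item is silently lost. Waiting on a `ConditionVariable` with a deadline avoids the helper thread, and `Timeout::Error` still reports the failure to callers. On Ruby 3.2 and later, `Queue#pop` also accepts a `timeout:` keyword and returns `nil` when the timeout expires.

```ruby
require 'timeout' # only for the Timeout::Error class

class TimeoutQueue
  def initialize
    @items = []
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      @items.push(item)
      @not_empty.signal
    end
  end

  def pop_with_timeout(timeout_seconds)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds
    @mutex.synchronize do
      # Loop: wait can return early (spurious wakeup, or another consumer took the item)
      while @items.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        if remaining <= 0
          raise Timeout::Error, "Queue pop timed out after #{timeout_seconds} seconds"
        end
        @not_empty.wait(@mutex, remaining)
      end
      @items.shift
    end
  end
end
```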
Performance & Memory
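Thread synchronization adds overhead through lock acquisition, lock contention, and context switching. Measuring this impact requires careful benchmarking. The benchmark below runs on a single thread, so it measures only the uncontended cost of each approach: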
```ruby
require 'benchmark'
require 'concurrent' # concurrent-ruby gem; required up front so load time is not measured

class PerformanceCounter
  def initialize(use_mutex = true)
    @count = 0
    @mutex = use_mutex ? Mutex.new : nil
  end

  def increment
    if @mutex
      @mutex.synchronize { @count += 1 }
    else
      @count += 1 # Unsafe but faster
    end
  end

  def value
    if @mutex
      @mutex.synchronize { @count }
    else
      @count
    end
  end
end

# Benchmark different approaches (single thread, so no contention)
Benchmark.bm(22) do |x|
  iterations = 100_000

  x.report("No synchronization") do
    counter = PerformanceCounter.new(false)
    iterations.times { counter.increment }
  end

  x.report("Mutex synchronization") do
    counter = PerformanceCounter.new(true)
    iterations.times { counter.increment }
  end

  x.report("Atomic operations") do
    counter = Concurrent::AtomicFixnum.new(0)
    iterations.times { counter.increment }
  end
end
```
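A single-threaded benchmark never contends for the mutex. The following is a rough sketch of a contended measurement: several threads increment one shared counter, and the total is timed with the monotonic clock. The thread and iteration counts are arbitrary. Absolute numbers depend heavily on the Ruby implementation and the machine, so treat them as relative only:

```ruby
THREADS = 4
ITERATIONS = 25_000

count = 0
mutex = Mutex.new

start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
workers = THREADS.times.map do
  Thread.new do
    ITERATIONS.times { mutex.synchronize { count += 1 } }
  end
end
workers.each(&:join)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start

puts "#{count} increments across #{THREADS} threads in #{elapsed.round(3)}s"
```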
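Specialized concurrent collections can reduce locking overhead. The concurrent-ruby gem provides several, but not all of them are lock-free. On MRI, `Concurrent::Hash` is essentially a plain `Hash` that relies on the GVL, while `Concurrent::Map` is designed for concurrent access. The following compares hash implementations under concurrent access:

```ruby
require 'concurrent'
require 'monitor'

# Runs a mixed read/write workload from several threads; returns elapsed seconds.
# `store` must respond to [] and []=; if `lock` is given, it wraps every access.
def benchmark_hash_access(store, lock: nil, thread_count: 4)
  1000.times { |i| store[i] = "value#{i}" } # Pre-populate

  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  threads = thread_count.times.map do |thread_id|
    Thread.new do
      1000.times do |i|
        key = rand(1000)
        access = lambda do
          if rand < 0.5
            store[key] = "updated#{thread_id}-#{i}"
          else
            store[key]
          end
        end
        lock ? lock.synchronize(&access) : access.call
      end
    end
  end
  threads.each(&:join)
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

puts "Standard Hash (no lock): #{benchmark_hash_access({})}"
puts "Concurrent::Hash: #{benchmark_hash_access(Concurrent::Hash.new)}"
puts "Concurrent::Map: #{benchmark_hash_access(Concurrent::Map.new)}"
puts "Hash + Monitor: #{benchmark_hash_access({}, lock: Monitor.new)}"
```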
Memory usage patterns differ between thread synchronization approaches. Mutexes consume minimal memory, while thread-safe collections may use more memory for internal coordination:
```ruby
require 'objspace'
require 'monitor'
require 'concurrent'

# Rough estimate: change in ObjectSpace.memsize_of_all while the objects are
# still referenced. The figure is approximate and includes the holding Array.
def measure_memory_usage(description)
  GC.start
  before = ObjectSpace.memsize_of_all
  retained = yield # keep the objects alive through the second measurement
  GC.start
  after = ObjectSpace.memsize_of_all
  puts "#{description}: ~#{after - before} bytes for #{retained.size} objects"
end

# Compare memory usage of different approaches
measure_memory_usage("1000 mutexes") do
  Array.new(1000) { Mutex.new }
end

measure_memory_usage("1000 concurrent hashes") do
  Array.new(1000) { Concurrent::Hash.new }
end

measure_memory_usage("1000 standard hashes with monitors") do
  Array.new(1000) { {}.extend(MonitorMixin) }
end
```
Connection pooling demonstrates how thread safety affects resource management. Proper synchronization ensures efficient resource utilization:
```ruby
class ConnectionPool
  def initialize(size, &connection_factory)
    @size = size
    @factory = connection_factory
    @pool = Queue.new
    @created = 0
    @mutex = Mutex.new
    @checked_out = {}
  end

  def with_connection
    connection = checkout
    begin
      yield connection
    ensure
      checkin(connection)
    end
  end

  def checkout
    connection = nil
    @mutex.synchronize do
      if @pool.empty? && @created < @size
        connection = @factory.call
        @created += 1
      end
    end
    connection ||= @pool.pop

    @mutex.synchronize do
      @checked_out[connection] = Thread.current
    end
    connection
  end

  def checkin(connection)
    @mutex.synchronize do
      if @checked_out[connection] == Thread.current
        @checked_out.delete(connection)
        @pool.push(connection)
      else
        raise "Connection not checked out by current thread"
      end
    end
  end

  def stats
    @mutex.synchronize do
      {
        size: @size,
        created: @created,
        available: @pool.size,
        checked_out: @checked_out.size
      }
    end
  end
end
```
Common Pitfalls
The most common thread safety mistake is assuming that an operation is atomic when it is not. Even a one-line compound assignment expands into several steps:

```ruby
# This looks atomic but isn't
@count += 1

# Equivalent to:
# temp = @count
# temp = temp + 1
# @count = temp
# Multiple threads can interfere between these steps
```
Sharing mutable objects between threads without proper synchronization leads to data corruption:
```ruby
# Dangerous: sharing a mutable array without a lock
shared_array = []
threads = 10.times.map do
  Thread.new do
    100.times { |i| shared_array << i } # Race condition
  end
end
threads.each(&:join)
# Not guaranteed to be 1000: without a GVL (e.g. JRuby) items can be lost or
# errors raised; MRI serializing Array#<< is an implementation detail
puts shared_array.size

# Safe: use a queue for thread-safe sharing
queue = Queue.new
threads = 10.times.map do
  Thread.new do
    100.times { |i| queue << i }
  end
end
threads.each(&:join)

result = []
result << queue.pop until queue.empty?
puts result.size # Always 1000
```
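Double locking happens when a synchronized method calls another method that synchronizes on the same lock. `Mutex` is not reentrant, so instead of letting the thread wait on itself forever, Ruby raises `ThreadError` as soon as the thread tries to lock the mutex again: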
```ruby
class ProblematicClass
  def initialize
    @mutex = Mutex.new
    @data = []
  end

  def add_item(item)
    @mutex.synchronize do
      validate_item(item) # This also acquires the same mutex
      @data << item
    end
  end

  def validate_item(item)
    @mutex.synchronize do # Raises ThreadError: this thread already holds @mutex
      raise "Invalid item" if item.nil?
    end
  end
end

# Solution: extract non-synchronized validation
class FixedClass
  def initialize
    @mutex = Mutex.new
    @data = []
  end

  def add_item(item)
    validate_item(item) # Validate outside lock
    @mutex.synchronize do
      @data << item
    end
  end

  private

  def validate_item(item)
    raise "Invalid item" if item.nil?
  end
end
```
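A quick sketch contrasting the non-reentrant `Mutex`, where nested locking raises `ThreadError`, with the reentrant `Monitor`, which is the alternative when nested locking is intentional:

```ruby
require 'monitor'

mutex = Mutex.new
error = nil
begin
  mutex.synchronize { mutex.synchronize { :unreachable } } # re-locking a held Mutex
rescue ThreadError => e
  error = e
end
puts error.class   # => ThreadError
puts mutex.locked? # => false (the outer synchronize released it)

monitor = Monitor.new
result = monitor.synchronize { monitor.synchronize { :nested_ok } } # reentrant
puts result        # => nested_ok
```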
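Using instance variables across threads without synchronization can also cause visibility problems. Ruby does not define a formal memory model, so on implementations without a GVL, such as JRuby, a write made by one thread is not guaranteed to become visible to another. MRI's GVL masks this in practice, but portable code should not rely on that: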
```ruby
class VisibilityProblem
  def initialize
    @flag = false
  end

  def start_worker
    Thread.new do
      sleep(1)
      @flag = true # Not guaranteed to become visible to other threads
      puts "Flag set"
    end
  end

  def wait_for_flag
    until @flag # May never observe the update on some implementations
      sleep(0.1)
    end
    puts "Flag detected"
  end
end

# Solution: use proper synchronization
class VisibilitySolution
  def initialize
    @flag = false
    @mutex = Mutex.new
    @condition = ConditionVariable.new
  end

  def start_worker
    Thread.new do
      sleep(1)
      @mutex.synchronize do
        @flag = true
        @condition.signal
      end
      puts "Flag set"
    end
  end

  def wait_for_flag
    @mutex.synchronize do
      @condition.wait(@mutex) until @flag # loop guards against spurious wakeups
    end
    puts "Flag detected"
  end
end
```
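For a one-shot signal like this, a `Queue` can also serve as a simple latch, because `push` and `pop` synchronize internally. A minimal sketch:

```ruby
ready = Queue.new

worker = Thread.new do
  sleep(0.1)    # simulate setup work
  ready << true # publish the signal; the pop below wakes up
end

signaled = ready.pop # blocks until the worker pushes
worker.join
puts signaled # => true
```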
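A related pitfall involves exceptions inside synchronized loops. `synchronize` always releases the lock, so the lock is not the problem. An exception raised partway through the loop, however, can leave the protected data half-processed: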
```ruby
# Problematic: exception leaves object in inconsistent state
class UnsafeProcessor
  def initialize
    @items = []
    @mutex = Mutex.new
  end

  def process_items
    @mutex.synchronize do
      # Deleting from @items while iterating over it also skips elements
      @items.each do |item|
        process_item(item)
        @items.delete(item) # If process_item raises, the loop aborts mid-way
      end
    end
  end

  private

  def process_item(item)
    raise "Processing failed" if item == :bad_item
  end
end

# Safe: handle exceptions appropriately
class SafeProcessor
  def initialize
    @items = []
    @mutex = Mutex.new
    @processed = []
    @failed = []
  end

  def process_items
    @mutex.synchronize do
      items_to_process = @items.dup
      @items.clear
      items_to_process.each do |item|
        begin
          process_item(item)
          @processed << item
        rescue StandardError => e
          @failed << { item: item, error: e }
        end
      end
    end
  end

  private

  def process_item(item)
    raise "Processing failed" if item == :bad_item
  end
end
```
Reference
Core Classes and Methods
Class | Purpose | Thread Safety |
---|---|---|
Thread | Creates and manages concurrent threads | Threads share memory; shared state still needs synchronization |
Mutex | Mutual exclusion locking | Thread-safe by design; not reentrant |
Monitor | Reentrant mutual exclusion (require 'monitor') | Thread-safe; the owning thread may lock it again |
ConditionVariable | Thread coordination and signaling | Must be used with a Mutex |
Queue | Thread-safe FIFO queue | Fully thread-safe |
SizedQueue | Bounded thread-safe queue | Fully thread-safe |
Mutex Methods

Method | Parameters | Returns | Description |
---|---|---|---|
Mutex.new | None | Mutex | Creates a new mutex |
#lock | None | self | Acquires the lock, blocking until it is available |
#unlock | None | self | Releases the lock |
#try_lock | None | Boolean | Acquires the lock if it is free; returns false without blocking otherwise |
#synchronize(&block) | Block | Block result | Executes the block with the lock held; releases it even if the block raises |
#locked? | None | Boolean | Returns true if any thread holds the lock |
#owned? | None | Boolean | Returns true if the current thread holds the lock |
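Queue Methods

Method | Parameters | Returns | Description |
---|---|---|---|
Queue.new | None | Queue | Creates an unbounded queue |
#push(obj) | Object | self | Adds an item to the queue |
#<<(obj) | Object | self | Alias for push |
#pop(non_block=false) | Boolean | Object | Removes and returns an item; blocks while empty unless non_block is true, in which case it raises ThreadError |
#shift(non_block=false) | Boolean | Object | Alias for pop |
#empty? | None | Boolean | Returns true if the queue is empty |
#size | None | Integer | Returns the number of items |
#length | None | Integer | Alias for size |
#clear | None | self | Removes all items |
#close | None | self | Closes the queue; push then raises ClosedQueueError, and pop returns nil once the queue is empty |
#closed? | None | Boolean | Returns true if the queue has been closed |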
SizedQueue Methods

Method | Parameters | Returns | Description |
---|---|---|---|
SizedQueue.new(max) | Integer | SizedQueue | Creates a bounded queue |
#max | None | Integer | Returns the maximum size |
#max=(size) | Integer | Integer | Sets the maximum size |
#push(obj, non_block=false) | Object, Boolean | self | Adds an item; blocks while full unless non_block is true, in which case it raises ThreadError |
#pop(non_block=false) | Boolean | Object | Removes an item; blocks while empty |
ConditionVariable Methods

Method | Parameters | Returns | Description |
---|---|---|---|
ConditionVariable.new | None | ConditionVariable | Creates a condition variable |
#wait(mutex, timeout=nil) | Mutex, Numeric | self | Releases mutex, waits for a signal or the timeout, then reacquires mutex |
#signal | None | self | Wakes one waiting thread |
#broadcast | None | self | Wakes all waiting threads |
Thread Methods

Method | Parameters | Returns | Description |
---|---|---|---|
Thread.new(&block) | Block | Thread | Creates and starts a new thread |
Thread.current | None | Thread | Returns the current thread |
Thread.list | None | Array | Returns all living threads |
Thread.kill(thread) | Thread | Thread | Terminates the specified thread |
#join(limit=nil) | Numeric | Thread or nil | Waits for the thread to finish; returns nil if the limit expires first |
#alive? | None | Boolean | Returns true if the thread is running or sleeping (not yet finished) |
#stop? | None | Boolean | Returns true if the thread is dead or sleeping |
#kill | None | Thread | Terminates the receiver thread |
#value | None | Object | Waits for the thread to finish and returns its block's value |
Thread-Local Variables

Method | Parameters | Returns | Description |
---|---|---|---|
Thread.current[:key] | Symbol/String | Object | Gets a fiber-local value |
Thread.current[:key] = value | Symbol/String, Object | Object | Sets a fiber-local value |
#thread_variable_get(key) | Symbol/String | Object | Gets a thread-local value shared by all fibers in the thread |
#thread_variable_set(key, value) | Symbol/String, Object | Object | Sets a thread-local value |
Common Patterns

Initialization Pattern:

```ruby
class Config
  @@instance = nil
  @@mutex = Mutex.new

  def self.instance
    return @@instance if @@instance
    @@mutex.synchronize do
      @@instance ||= new
    end
  end

  private_class_method :new
end
```

Producer-Consumer Pattern:

```ruby
queue = Queue.new
# produce_item and process_item stand in for application code
producer = Thread.new { loop { queue << produce_item } }
consumer = Thread.new { loop { process_item(queue.pop) } }
```

Worker Pool Pattern:

```ruby
work_queue = Queue.new
# process stands in for application code
workers = Array.new(4) { Thread.new { loop { process(work_queue.pop) } } }
```
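Exception Classes

Exception | Cause | Prevention |
---|---|---|
ThreadError | Invalid thread operation, such as re-locking a Mutex the thread already holds or a non-blocking pop on an empty queue | Check state before operations; use Monitor when reentrancy is needed |
Timeout::Error | Operation exceeded its time limit | Set appropriate timeouts |
fatal ("No live threads left. Deadlock?") | MRI detected that every thread is blocked forever | Use consistent lock ordering |
ClosedQueueError | Pushing to a closed Queue (a subclass of StopIteration) | Stop producers before closing the queue |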
Performance Considerations
Operation | Cost | Alternative |
---|---|---|
Mutex synchronization | Medium | Lock-free data structures |
Queue operations | Low | Direct variable access (unsafe) |
Thread creation | High | Thread pools |
Context switching | Medium | Reduce thread count |
Lock contention | High | Reduce critical section size |