CrackedRuby

Thread Safety

Overview

Thread safety in Ruby refers to code that functions correctly when accessed by multiple threads simultaneously. Ruby and its ecosystem provide several mechanisms for writing thread-safe code, including mutexes, condition variables, atomic operations, and thread-safe data structures. The Global Interpreter Lock (GIL, called the Global VM Lock or GVL in CRuby) in MRI Ruby prevents Ruby threads from running Ruby code in parallel, so CPU-bound work gains no speedup from threads. The GIL does not make compound operations atomic, however, so shared mutable state still requires careful synchronization.

A race condition is a bug whose outcome depends on the relative timing of thread execution. It arises when multiple threads read and modify shared data without proper synchronization. Consider this unsafe counter:

class UnsafeCounter
  def initialize
    @count = 0
  end
  
  def increment
    @count += 1  # Not atomic: read, add, write
  end
  
  def value
    @count
  end
end

counter = UnsafeCounter.new
threads = 10.times.map do
  Thread.new { 1000.times { counter.increment } }
end
threads.each(&:join)

puts counter.value  # Not guaranteed to be 10,000; MRI's GVL often hides the race, other Rubies may not

The Thread class provides the foundation for concurrent execution. Ruby's core library includes several synchronization primitives: Mutex for mutual exclusion, ConditionVariable for thread coordination, and Queue and SizedQueue for thread-safe communication. The standard library adds Monitor (require 'monitor') for reentrant locking.

The third-party concurrent-ruby gem adds further thread-safe data structures and synchronization tools, including Concurrent::Hash, Concurrent::Array, Concurrent::Atom, and various executor services.

The key principle in thread safety is controlling access to shared mutable state. Immutable objects are inherently thread-safe since they cannot change after creation. When mutation is necessary, synchronization ensures only one thread modifies data at a time.
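A minimal illustration of the immutability point: a frozen object rejects mutation, so threads can read it without a lock. Because `freeze` is shallow, nested arrays and strings must be frozen as well; the `deep_freeze` helper below is a hypothetical name written for this sketch (Ruby 3.0+ also offers `Ractor.make_shareable`, which deep-freezes a structure).

```ruby
# Hypothetical helper: recursively freeze nested hashes, arrays, and their contents
def deep_freeze(obj)
  case obj
  when Hash
    obj.each { |key, value| deep_freeze(key); deep_freeze(value) }
  when Array
    obj.each { |element| deep_freeze(element) }
  end
  obj.freeze
end

SETTINGS = deep_freeze({ "hosts" => ["db1", "db2"], "retries" => 3 })

# Mutating shared frozen state fails loudly instead of racing
begin
  SETTINGS["hosts"] << "db3"
rescue FrozenError => e
  puts "Rejected: #{e.class}"
end

# Many threads can read the frozen structure with no synchronization
readers = 4.times.map { Thread.new { SETTINGS["hosts"].size } }
puts readers.map(&:value).inspect
```

Freezing moves the error from "silently corrupted data" to an immediate exception at the point of mutation.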

Basic Usage

The Mutex class provides the most common synchronization mechanism. A mutex ensures exclusive access to a critical section of code. Only one thread can hold a mutex lock at any time:

class SafeCounter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end
  
  def increment
    @mutex.synchronize do
      @count += 1
    end
  end
  
  def value
    @mutex.synchronize do
      @count
    end
  end
end

The synchronize method acquires the lock, executes the block, and releases the lock even if an exception occurs. This pattern prevents race conditions by serializing access to the @count variable.
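A short sketch confirming that an exception escaping `synchronize` does not leave the mutex locked, followed by a hand-written equivalent built from `lock`, `unlock`, and `ensure`. The `with_lock` method is a hypothetical name used only for illustration; it approximates what `synchronize` does rather than reproducing its implementation.

```ruby
mutex = Mutex.new

begin
  mutex.synchronize do
    raise ArgumentError, "failure inside the critical section"
  end
rescue ArgumentError => e
  puts "Caught: #{e.message}"
end

puts "Still locked? #{mutex.locked?}"

# Hypothetical helper: roughly what synchronize does for you
def with_lock(mutex)
  mutex.lock
  begin
    yield
  ensure
    mutex.unlock # runs whether the block returns normally or raises
  end
end

puts with_lock(mutex) { "ran with lock held: #{mutex.owned?}" }
```

Calling `lock` outside the `begin` matters: if acquiring the lock itself failed, the `ensure` clause would otherwise try to unlock a mutex this thread never held.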

Queue provides thread-safe communication between threads. Producers push items while consumers pop them. The queue handles synchronization internally:

queue = Queue.new

producer = Thread.new do
  5.times do |i|
    queue << "item #{i}"
    puts "Produced: item #{i}"
    sleep(0.1)
  end
end

consumer = Thread.new do
  5.times do
    item = queue.pop
    puts "Consumed: #{item}"
  end
end

[producer, consumer].each(&:join)
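The consumer above pops exactly five times, which ties it to the producer's item count. A common alternative, sketched here, is `Queue#close` (available since Ruby 2.3): once a queue is closed and drained, `pop` returns `nil`, so the consumer can simply loop until it sees `nil`. This assumes `nil` is never a legitimate work item.

```ruby
queue = Queue.new
consumed = []

consumer = Thread.new do
  # pop blocks while the queue is open and empty; returns nil once closed and drained
  while (item = queue.pop)
    consumed << item
  end
end

producer = Thread.new do
  5.times { |i| queue << "item #{i}" }
  queue.close # signal that no more items will arrive
end

[producer, consumer].each(&:join)
puts consumed.inspect

# Pushing onto a closed queue raises ClosedQueueError
begin
  queue << "late item"
rescue ClosedQueueError => e
  puts "Rejected: #{e.class}"
end
```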

SizedQueue extends Queue with a maximum capacity. When full, producers block until space becomes available:

buffer = SizedQueue.new(2)

producer = Thread.new do
  10.times do |i|
    buffer << i
    puts "Added #{i} to buffer"
  end
end

consumer = Thread.new do
  sleep(1)  # Simulate slow consumer
  10.times do
    item = buffer.pop
    puts "Processed #{item}"
    sleep(0.2)
  end
end

[producer, consumer].each(&:join)
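When a producer should shed or defer work rather than stall, `SizedQueue#push` accepts a second `non_block` argument; with `true`, a full queue raises `ThreadError` instead of blocking. The sketch below uses only that positional flag (Ruby 3.2 and later also accept a `timeout:` keyword on `push` and `pop`).

```ruby
buffer = SizedQueue.new(2)
buffer.push(:a)
buffer.push(:b)

rejected = false
begin
  buffer.push(:c, true) # non_block = true: raise instead of waiting for space
rescue ThreadError
  rejected = true
end
puts "Full buffer rejected push: #{rejected}"

buffer.pop            # frees one slot (removes :a)
buffer.push(:c, true) # succeeds immediately now that there is room
puts buffer.size
```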

Thread-local variables provide each thread with its own copy of a variable, eliminating the need for synchronization:

Thread.current[:user_id] = 123

def current_user_id
  Thread.current[:user_id]
end

threads = 3.times.map do |i|
  Thread.new do
    Thread.current[:user_id] = i + 1
    puts "Thread #{Thread.current.object_id}: #{current_user_id}"
  end
end
threads.each(&:join)
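Strictly speaking, `Thread#[]` (as in `Thread.current[:user_id]`) stores fiber-local values: a new Fiber running on the same thread starts with empty storage. `Thread#thread_variable_get` and `#thread_variable_set` store values shared by every fiber on the thread. A minimal demonstration of the difference:

```ruby
thread = Thread.new do
  Thread.current[:request_id] = "fiber-local"
  Thread.current.thread_variable_set(:request_id, "thread-local")

  # A new Fiber runs on the same thread but gets its own fiber-local storage
  Fiber.new do
    [Thread.current[:request_id], Thread.current.thread_variable_get(:request_id)]
  end.resume
end

p thread.value # => [nil, "thread-local"]
```

The distinction matters once fibers are involved, for example with external enumerators (`Enumerator#next`) or fiber-based schedulers.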

The Monitor class provides reentrant locking, allowing the same thread to acquire the lock multiple times:

require 'monitor'

class BankAccount
  def initialize(balance)
    @balance = balance
    @monitor = Monitor.new
  end
  
  def withdraw(amount)
    @monitor.synchronize do
      return false if @balance < amount
      @balance -= amount
      log_transaction("withdraw", amount)
      true
    end
  end
  
  private
  
  def log_transaction(type, amount)
    @monitor.synchronize do  # Reentrant - same thread can acquire again
      puts "#{type}: #{amount}, balance: #{@balance}"
    end
  end
end
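The reentrancy difference is easy to observe directly. Ruby's Mutex is not reentrant, so a thread that tries to lock a mutex it already holds gets a `ThreadError` immediately, while a Monitor tracks its owner and a nesting count:

```ruby
require 'monitor'

mutex = Mutex.new
monitor = Monitor.new

# Re-entering a Mutex from the thread that holds it raises ThreadError at once
mutex_error = nil
begin
  mutex.synchronize { mutex.synchronize { :unreachable } }
rescue ThreadError => e
  mutex_error = e
end
puts "Mutex: #{mutex_error.class}"

# A Monitor allows the owning thread to acquire it again
monitor_result = monitor.synchronize { monitor.synchronize { :nested_ok } }
puts "Monitor: #{monitor_result}"
```

Reentrancy is convenient when synchronized methods call each other, as in BankAccount above, at the cost of slightly more bookkeeping per acquisition.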

Advanced Usage

Complex thread-safe patterns often involve multiple synchronization primitives working together. The producer-consumer pattern with condition signaling demonstrates coordinated thread communication:

class BoundedBuffer
  def initialize(capacity)
    @buffer = []
    @capacity = capacity
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
    @not_empty = ConditionVariable.new
  end
  
  def put(item)
    @mutex.synchronize do
      while @buffer.size >= @capacity
        @not_full.wait(@mutex)
      end
      
      @buffer << item
      @not_empty.signal
    end
  end
  
  def take
    @mutex.synchronize do
      while @buffer.empty?
        @not_empty.wait(@mutex)
      end
      
      item = @buffer.shift
      @not_full.signal
      item
    end
  end
  
  def size
    @mutex.synchronize { @buffer.size }
  end
end

buffer = BoundedBuffer.new(5)

producers = 2.times.map do |id|
  Thread.new do
    10.times do |i|
      buffer.put("P#{id}-#{i}")
      puts "Producer #{id} added item #{i}"
      sleep(rand * 0.1)  # Random delay up to 0.1 s
    end
  end
end

consumers = 3.times.map do |id|
  Thread.new do
    loop do
      item = buffer.take
      break if item == :done  # Sentinel: no more work
      puts "Consumer #{id} got: #{item}"
      sleep(rand * 0.2)
    end
  end
end

producers.each(&:join)
consumers.size.times { buffer.put(:done) }  # One sentinel per consumer
consumers.each(&:join)

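Read-write locks allow multiple concurrent readers while ensuring exclusive writer access. Ruby's core and standard libraries do not include one (the concurrent-ruby gem provides Concurrent::ReadWriteLock), but one can be built from a mutex and condition variables. This version gives waiting writers priority, so a steady stream of readers cannot starve them: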

class ReadWriteLock
  def initialize
    @mutex = Mutex.new
    @readers_cond = ConditionVariable.new
    @writers_cond = ConditionVariable.new
    @readers = 0
    @writers = 0
    @writing = false
  end
  
  def read_lock
    @mutex.synchronize do
      while @writing || @writers > 0
        @readers_cond.wait(@mutex)
      end
      @readers += 1
    end
  end
  
  def read_unlock
    @mutex.synchronize do
      @readers -= 1
      @writers_cond.signal if @readers == 0
    end
  end
  
  def write_lock
    @mutex.synchronize do
      @writers += 1
      while @readers > 0 || @writing
        @writers_cond.wait(@mutex)
      end
      @writing = true
      @writers -= 1
    end
  end
  
  def write_unlock
    @mutex.synchronize do
      @writing = false
      @writers_cond.broadcast
      @readers_cond.broadcast
    end
  end
end

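Double-checked locking implements lazy initialization in a thread-safe manner while minimizing synchronization overhead. The first check, made without the lock, skips locking once the instance exists; the second check, made while holding the lock, stops two threads that both saw nil from each creating an instance: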

class LazySingleton  # Named to avoid clashing with the stdlib Singleton module
  @@instance = nil
  @@mutex = Mutex.new
  
  def self.instance
    return @@instance if @@instance  # First check (unsynchronized)
    
    @@mutex.synchronize do
      return @@instance if @@instance  # Second check (synchronized)
      @@instance = new
    end
  end
  
  private_class_method :new
end
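For the common case, the standard library's `Singleton` module already provides lazy, thread-safe creation: including it makes `new` private and adds an `instance` method whose first call is guarded by a mutex. A short sketch, where `Registry` is a hypothetical class name:

```ruby
require 'singleton'

class Registry
  include Singleton # makes .new private and adds a thread-safe .instance

  def initialize
    @created_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

# Threads racing to fetch the instance all receive the same object
instances = 8.times.map { Thread.new { Registry.instance } }.map(&:value)
puts instances.map(&:object_id).uniq.size

begin
  Registry.new
rescue NoMethodError
  puts "Registry.new is private"
end
```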

The thread-safe observer pattern requires careful coordination between subjects and observers:

class ThreadSafeSubject
  def initialize
    @observers = []
    @mutex = Mutex.new
  end
  
  def add_observer(observer)
    @mutex.synchronize do
      @observers << observer unless @observers.include?(observer)
    end
  end
  
  def remove_observer(observer)
    @mutex.synchronize do
      @observers.delete(observer)
    end
  end
  
  def notify_observers(event)
    observers_copy = nil
    @mutex.synchronize do
      observers_copy = @observers.dup
    end
    
    # Notify outside the lock to prevent deadlock
    observers_copy.each do |observer|
      begin
        observer.update(self, event)
      rescue => e
        # Handle observer errors without affecting others
        puts "Observer error: #{e.message}"
      end
    end
  end
end

Error Handling & Debugging

Race conditions create non-deterministic bugs that are difficult to reproduce and debug. The primary debugging strategy involves adding logging with thread identification and using consistent locking order to prevent deadlocks:

class DebuggableCounter
  def initialize
    @count = 0
    @mutex = Mutex.new
    @debug = true
  end
  
  def increment
    thread_id = Thread.current.object_id
    log("Thread #{thread_id} requesting lock")
    
    @mutex.synchronize do
      log("Thread #{thread_id} acquired lock, count: #{@count}")
      old_value = @count
      sleep(0.001) if @debug  # Widen the read-modify-write window; the lock keeps it safe
      @count = old_value + 1
      log("Thread #{thread_id} incremented to #{@count}")
    end
    
    log("Thread #{thread_id} released lock")
  end
  
  private
  
  def log(message)
    puts "#{Time.now.strftime('%H:%M:%S.%3N')} - #{message}" if @debug
  end
end
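To confirm that a lock is actually what prevents a race, it helps to reproduce the race on purpose. This sketch removes the mutex and sleeps between the read and the write, which forces a thread switch inside the read-modify-write sequence; a subclass then adds the mutex back. The class and method names are hypothetical.

```ruby
# Deliberately racy: the sleep lets other threads read the same old_value
class RacyCounter
  attr_reader :count

  def initialize
    @count = 0
  end

  def increment
    old_value = @count
    sleep(0.001) # other threads run here and read the same old_value
    @count = old_value + 1
  end
end

# Same logic with the whole read-modify-write inside a mutex
class GuardedCounter < RacyCounter
  def initialize
    super
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize { super }
  end
end

def run(counter)
  10.times.map { Thread.new { 10.times { counter.increment } } }.each(&:join)
  counter.count
end

puts "Racy:    #{run(RacyCounter.new)}"    # typically far below 100
puts "Guarded: #{run(GuardedCounter.new)}" # always 100
```

The forced switch makes a rare timing bug show up on almost every run, which is far easier to debug than an occasional miscount.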

Deadlock occurs when threads wait for each other's locks indefinitely. The classic solution is establishing a consistent lock ordering:

class Account
  @@next_id = 0
  @@id_mutex = Mutex.new

  attr_reader :id, :balance  # balance is read without locking; fine for display only

  def initialize(balance)
    @balance = balance
    @mutex = Mutex.new

    @@id_mutex.synchronize do
      @id = @@next_id += 1
    end
  end

  def transfer(amount, to_account)
    raise ArgumentError, "cannot transfer to the same account" if to_account.equal?(self)

    # Always acquire locks in ID order, whichever direction the money moves,
    # so two opposite transfers cannot each hold one lock and wait for the other
    first, second = [self, to_account].sort_by(&:id)

    first.mutex.synchronize do
      second.mutex.synchronize do
        return false if @balance < amount

        @balance -= amount
        to_account.credit(amount)
        true
      end
    end
  end

  protected

  attr_reader :mutex

  # Caller must already hold this account's mutex
  def credit(amount)
    @balance += amount
  end
end
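The ID-ordering idea in Account#transfer generalizes to any set of locks: sort them by one global key, then acquire them in that order. In this sketch the helper name `lock_in_order` is hypothetical and ordering by `object_id` is an arbitrary but consistent choice.

```ruby
# Hypothetical helper: acquire every mutex in one global order, then run the block
def lock_in_order(mutexes, &block)
  first, *rest = mutexes.uniq.sort_by(&:object_id) # uniq avoids recursive locking
  return yield if first.nil?

  first.synchronize { lock_in_order(rest, &block) }
end

a = Mutex.new
b = Mutex.new
total = 0

# The threads name the locks in opposite orders; naive nested synchronize calls
# could deadlock here, but sorting makes both threads lock in the same order
t1 = Thread.new { 1_000.times { lock_in_order([a, b]) { total += 1 } } }
t2 = Thread.new { 1_000.times { lock_in_order([b, a]) { total += 1 } } }
[t1, t2].each(&:join)

puts total
```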

Exception handling within synchronized blocks requires careful consideration. Exceptions can leave shared state in inconsistent conditions:

class RobustService
  def initialize
    @tasks = []
    @mutex = Mutex.new
    @failed_tasks = []
  end
  
  def process_task(task)
    @mutex.synchronize do
      @tasks << task
      
      begin
        result = perform_work(task)
        @tasks.delete(task)
        result
      rescue StandardError => e
        # Move failed task to separate collection
        @tasks.delete(task)
        @failed_tasks << { task: task, error: e, timestamp: Time.now }
        
        # Re-raise to notify caller
        raise
      end
    end
  end
  
  def retry_failed_tasks
    failed_copy = nil
    @mutex.synchronize do
      failed_copy = @failed_tasks.dup
      @failed_tasks.clear
    end
    
    failed_copy.each do |failed|
      begin
        process_task(failed[:task])
        puts "Retry succeeded for task: #{failed[:task]}"
      rescue StandardError => e
        puts "Retry failed for task: #{failed[:task]}, error: #{e.message}"
      end
    end
  end
  
  private
  
  def perform_work(task)
    # Simulate work that might fail
    raise "Task failed" if task[:should_fail]
    "Result for #{task[:name]}"
  end
end

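Timeout mechanisms prevent threads from waiting indefinitely. One approach wraps a blocking Queue#pop in Ruby's Timeout module. Timeout.timeout raises asynchronously inside the block it guards, so use it sparingly around code that holds locks: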

require 'timeout'

class TimeoutQueue
  def initialize
    @queue = Queue.new
  end
  
  def pop_with_timeout(timeout_seconds)
    result = nil
    thread = Thread.new { result = @queue.pop }
    
    begin
      Timeout.timeout(timeout_seconds) do
        thread.join
      end
      result
    rescue Timeout::Error
      thread.kill
      raise "Queue pop timed out after #{timeout_seconds} seconds"
    end
  end
  
  def push(item)
    @queue.push(item)
  end
end
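Spawning and killing a helper thread per call has a subtle hazard: if the helper pops an item just as the timeout fires, that item is discarded. A sketch that avoids both the extra thread and `Timeout`, relying on the timeout argument of `ConditionVariable#wait` and a monotonic deadline. `TimedQueue` is a hypothetical name; on Ruby 3.2 and later, `Queue#pop(timeout: seconds)` offers similar behavior directly.

```ruby
# Hypothetical queue whose pop gives up after timeout_seconds and returns nil
class TimedQueue
  def initialize
    @items = []
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      @items << item
      @not_empty.signal
    end
  end

  def pop(timeout_seconds)
    deadline = monotonic_now + timeout_seconds
    @mutex.synchronize do
      while @items.empty?
        remaining = deadline - monotonic_now
        return nil if remaining <= 0

        # Releases the mutex while waiting; may wake early, so the loop re-checks
        @not_empty.wait(@mutex, remaining)
      end
      @items.shift
    end
  end

  private

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

queue = TimedQueue.new
p queue.pop(0.05) # nil once about 50 ms pass with no item

producer = Thread.new { sleep(0.02); queue.push(:job) }
p queue.pop(1)    # returns :job as soon as the push signals
producer.join
```

Because the item is removed while the mutex is held, a pop either returns it to the caller or leaves it in the queue; it is never lost in between.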

Performance & Memory

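Thread synchronization introduces overhead from acquiring locks, lock contention, and context switching. The benchmark below runs in a single thread, so it measures only the uncontended cost of each approach; measuring contention requires a multi-threaded benchmark. Concurrent::AtomicFixnum comes from the concurrent-ruby gem: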

require 'benchmark'

class PerformanceCounter
  def initialize(use_mutex = true)
    @count = 0
    @mutex = use_mutex ? Mutex.new : nil
  end
  
  def increment
    if @mutex
      @mutex.synchronize { @count += 1 }
    else
      @count += 1  # Unsafe but faster
    end
  end
  
  def value
    if @mutex
      @mutex.synchronize { @count }
    else
      @count
    end
  end
end

# Benchmark different approaches
Benchmark.bm(20) do |x|
  iterations = 100_000
  
  x.report("No synchronization") do
    counter = PerformanceCounter.new(false)
    iterations.times { counter.increment }
  end
  
  x.report("Mutex synchronization") do
    counter = PerformanceCounter.new(true)
    iterations.times { counter.increment }
  end
  
  require 'concurrent'  # concurrent-ruby gem; required here so loading it is not timed
  x.report("Atomic operations") do
    counter = Concurrent::AtomicFixnum.new(0)
    iterations.times { counter.increment }
  end
end

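Thread-safe collections trade some speed for safety, and their internals vary by Ruby implementation. In current versions of the concurrent-ruby gem, Concurrent::Hash is not lock-free: on MRI it is plain Hash (relying on the GVL), while on JRuby every call is synchronized. Measure under a realistic workload rather than assume one structure is faster: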

require 'concurrent'

# Compare hash performance under concurrent access
def benchmark_hash_access(hash_class, thread_count = 4)
  hash = hash_class.new
  
  # Pre-populate
  1000.times { |i| hash[i] = "value#{i}" }
  
  start_time = Time.now
  
  threads = thread_count.times.map do |thread_id|
    Thread.new do
      1000.times do |i|
        key = rand(1000)
        if rand < 0.5
          hash[key] = "updated#{thread_id}-#{i}"
        else
          hash[key]
        end
      end
    end
  end
  
  threads.each(&:join)
  Time.now - start_time
end

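require 'monitor'

# Minimal wrapper that guards every read and write with a Monitor
class SynchronizedHash
  def initialize
    @hash = {}
    @monitor = Monitor.new
  end

  def [](key)
    @monitor.synchronize { @hash[key] }
  end

  def []=(key, value)
    @monitor.synchronize { @hash[key] = value }
  end
end

# Test different hash implementations
puts "Standard Hash: #{benchmark_hash_access(Hash)}"
puts "Concurrent Hash: #{benchmark_hash_access(Concurrent::Hash)}"
puts "Synchronized Hash: #{benchmark_hash_access(SynchronizedHash)}"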

Memory usage patterns differ between thread synchronization approaches. Mutexes consume minimal memory, while thread-safe collections may use more memory for internal coordination:

require 'objspace'
require 'monitor'
require 'concurrent'  # concurrent-ruby gem; loaded before measuring

def measure_memory_usage(description)
  GC.start
  allocated_before = GC.stat(:total_allocated_objects)

  retained = yield  # The block returns the objects it built, keeping them alive

  allocated = GC.stat(:total_allocated_objects) - allocated_before
  # memsize_of approximates each object's own footprint (not objects it references)
  bytes = retained.sum { |obj| ObjectSpace.memsize_of(obj) }

  puts "#{description}:"
  puts "  Objects allocated: #{allocated}"
  puts "  Approx. size of retained objects: #{bytes} bytes"
end

# Compare memory usage of different approaches
measure_memory_usage("1000 mutexes") do
  Array.new(1000) { Mutex.new }
end

measure_memory_usage("1000 concurrent hashes") do
  Array.new(1000) { Concurrent::Hash.new }
end

measure_memory_usage("1000 standard hashes with monitors") do
  Array.new(1000) { {}.extend(MonitorMixin) }
end

Connection pooling demonstrates how thread safety affects resource management. Proper synchronization ensures efficient resource utilization:

class ConnectionPool
  def initialize(size, &connection_factory)
    @size = size
    @factory = connection_factory
    @pool = Queue.new
    @created = 0
    @mutex = Mutex.new
    @checked_out = {}
  end
  
  def with_connection
    connection = checkout
    begin
      yield connection
    ensure
      checkin(connection)
    end
  end
  
  def checkout
    connection = nil
    @mutex.synchronize do
      if @pool.empty? && @created < @size
        connection = @factory.call
        @created += 1
      end
    end
    
    connection ||= @pool.pop
    @mutex.synchronize do
      @checked_out[connection] = Thread.current
    end
    connection
  end
  
  def checkin(connection)
    @mutex.synchronize do
      if @checked_out[connection] == Thread.current
        @checked_out.delete(connection)
        @pool.push(connection)
      else
        raise "Connection not checked out by current thread"
      end
    end
  end
  
  def stats
    @mutex.synchronize do
      {
        size: @size,
        created: @created,
        available: @pool.size,
        checked_out: @checked_out.size
      }
    end
  end
end

Common Pitfalls

The most common thread safety mistake is assuming operations are atomic when they are not. Even a one-line compound assignment expands into several steps:

# This looks atomic but isn't
@count += 1

# Equivalent to:
# temp = @count
# temp = temp + 1  
# @count = temp

# Multiple threads can interfere between these steps

Sharing mutable objects between threads without proper synchronization leads to data corruption:

# Dangerous: sharing mutable array
shared_array = []
threads = 10.times.map do
  Thread.new do
    100.times { |i| shared_array << i }  # Race condition
  end
end
threads.each(&:join)
puts shared_array.size  # Not guaranteed to be 1000; MRI's GVL usually hides this, JRuby can lose elements or raise

# Safe: use queue for thread-safe sharing
queue = Queue.new
threads = 10.times.map do
  Thread.new do
    100.times { |i| queue << i }
  end
end
threads.each(&:join)

result = []
result << queue.pop until queue.empty?
puts result.size  # Always 1000

Double-locking occurs when code that already holds a lock calls another method that acquires the same lock. Ruby's Mutex is not reentrant, so the nested acquisition raises ThreadError ("deadlock; recursive locking") instead of proceeding:

class ProblematicClass
  def initialize
    @mutex = Mutex.new
    @data = []
  end
  
  def add_item(item)
    @mutex.synchronize do
      validate_item(item)  # This also acquires the same mutex
      @data << item
    end
  end
  
  def validate_item(item)
    @mutex.synchronize do  # Raises ThreadError when called from add_item
      raise "Invalid item" if item.nil?
    end
  end
end

# Solution: extract non-synchronized validation
class FixedClass
  def initialize
    @mutex = Mutex.new
    @data = []
  end
  
  def add_item(item)
    validate_item(item)  # Validate outside lock
    @mutex.synchronize do
      @data << item
    end
  end
  
  private
  
  def validate_item(item)
    raise "Invalid item" if item.nil?
  end
end

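Using instance variables across threads without synchronization can create visibility problems, where a change made by one thread is not guaranteed to become visible to another. On MRI the GVL makes such writes visible in practice, but Ruby does not promise this across implementations, and runtimes without a GVL, such as JRuby, may offer weaker guarantees: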

class VisibilityProblem
  def initialize
    @flag = false
  end
  
  def start_worker
    Thread.new do
      sleep(1)
      @flag = true  # Change may not be visible to other threads
      puts "Flag set"
    end
  end
  
  def wait_for_flag
    until @flag  # May loop forever due to visibility issues
      sleep(0.1)
    end
    puts "Flag detected"
  end
end

# Solution: use proper synchronization
class VisibilitySolution
  def initialize
    @flag = false
    @mutex = Mutex.new
    @condition = ConditionVariable.new
  end
  
  def start_worker
    Thread.new do
      sleep(1)
      @mutex.synchronize do
        @flag = true
        @condition.signal
      end
      puts "Flag set"
    end
  end
  
  def wait_for_flag
    @mutex.synchronize do
      @condition.wait(@mutex) until @flag
    end
    puts "Flag detected"
  end
end

Exceptions inside synchronized blocks need equal care. Mutex#synchronize always releases the lock when an exception escapes, but the data it protects can be left half-updated:

# Problematic: exception leaves object in inconsistent state
class UnsafeProcessor
  def initialize
    @items = []
    @mutex = Mutex.new
  end
  
  def process_items
    @mutex.synchronize do
      @items.each do |item|
        process_item(item)
        @items.delete(item)  # Skipped if process_item raises; deleting during each also skips elements
      end
    end
  end
  
  private
  
  def process_item(item)
    raise "Processing failed" if item == :bad_item
  end
end

# Safe: handle exceptions appropriately
class SafeProcessor
  def initialize
    @items = []
    @mutex = Mutex.new
    @processed = []
    @failed = []
  end
  
  def process_items
    @mutex.synchronize do
      items_to_process = @items.dup
      @items.clear
      
      items_to_process.each do |item|
        begin
          process_item(item)
          @processed << item
        rescue StandardError => e
          @failed << { item: item, error: e }
        end
      end
    end
  end

  private

  def process_item(item)
    raise "Processing failed" if item == :bad_item
  end
end
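UnsafeProcessor also has a bug unrelated to threads: deleting from an array while iterating over it with `each` skips elements, because `each` advances by index while `delete` shifts the remaining elements down. A two-line demonstration:

```ruby
items = [1, 2, 3, 4]
items.each { |item| items.delete(item) } # mutates the array being iterated
p items # => [2, 4]
```

SafeProcessor avoids this by iterating over a copy (`@items.dup`) and clearing the original before processing.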

Reference

Core Classes and Methods

| Class | Purpose | Thread Safety |
|---|---|---|
| Thread | Basic thread creation and management | Each thread has its own stack; heap objects are shared |
| Mutex | Mutual exclusion locking | Thread-safe by design; not reentrant |
| Monitor | Reentrant mutual exclusion (require 'monitor') | Thread-safe; allows nested locking by the owning thread |
| ConditionVariable | Thread coordination and signaling | Must be used with a Mutex |
| Queue | Thread-safe FIFO queue | Fully thread-safe |
| SizedQueue | Bounded thread-safe queue | Fully thread-safe |

Mutex Methods

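| Method | Parameters | Returns | Description |
|---|---|---|---|
| Mutex.new | None | Mutex | Creates a new mutex |
| #lock | None | self | Acquires the lock, blocking until it is available |
| #unlock | None | self | Releases the lock |
| #try_lock | None | Boolean | Attempts to acquire the lock without blocking |
| #synchronize(&block) | Block | Block result | Executes the block with the lock held |
| #locked? | None | Boolean | Returns true if any thread holds the lock |
| #owned? | None | Boolean | Returns true if the current thread holds the lock |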

Queue Methods

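| Method | Parameters | Returns | Description |
|---|---|---|---|
| Queue.new | None | Queue | Creates an unbounded queue |
| #push(obj) | Object | self | Adds an item to the queue |
| #<<(obj) | Object | self | Alias for push |
| #pop(non_block=false) | Boolean | Object | Removes and returns an item; blocks while empty (returns nil once closed and empty), or raises ThreadError if non_block is true |
| #shift(non_block=false) | Boolean | Object | Alias for pop |
| #empty? | None | Boolean | Returns true if the queue is empty |
| #size | None | Integer | Returns the number of items |
| #length | None | Integer | Alias for size |
| #clear | None | self | Removes all items |
| #close | None | self | Closes the queue; later pushes raise ClosedQueueError |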

SizedQueue Methods

| Method | Parameters | Returns | Description |
|---|---|---|---|
| SizedQueue.new(max) | Integer | SizedQueue | Creates a bounded queue |
| #max | None | Integer | Returns the maximum size |
| #max=(size) | Integer | Integer | Sets the maximum size |
| #push(obj, non_block=false) | Object, Boolean | self | Adds an item; blocks if full, or raises ThreadError if non_block is true |
| #pop(non_block=false) | Boolean | Object | Removes an item; blocks if empty |

ConditionVariable Methods

Method Parameters Returns Description
#new None ConditionVariable Creates condition variable
#wait(mutex, timeout=nil) Mutex, Numeric self Waits for signal
#signal None self Wakes one waiting thread
#broadcast None self Wakes all waiting threads

Thread Methods

| Method | Parameters | Returns | Description |
|---|---|---|---|
| Thread.new(&block) | Block | Thread | Creates and starts a new thread |
| Thread.current | None | Thread | Returns the current thread |
| Thread.list | None | Array | Returns all living threads |
| Thread.kill(thread) | Thread | Thread | Terminates the specified thread |
| #join(limit=nil) | Numeric | Thread or nil | Waits for the thread to finish; returns nil if limit seconds elapse first |
| #alive? | None | Boolean | Returns true if the thread is running or sleeping (not terminated) |
| #stop? | None | Boolean | Returns true if the thread is dead or sleeping |
| #kill | None | Thread | Terminates the receiving thread |
| #value | None | Object | Waits for the thread to finish and returns its block's value |

Thread-Local Variables

| Method | Parameters | Returns | Description |
|---|---|---|---|
| Thread.current[:key] | Symbol/String | Object | Gets a fiber-local value |
| Thread.current[:key] = value | Symbol/String, Object | Object | Sets a fiber-local value |
| #thread_variable_get(key) | Symbol/String | Object | Gets a thread-local variable, shared by all fibers in the thread |
| #thread_variable_set(key, value) | Symbol/String, Object | Object | Sets a thread-local variable |

Common Patterns

Initialization Pattern:

class AppConfig  # Hypothetical name; class variables must live inside a class or module
  @@instance = nil
  @@mutex = Mutex.new

  def self.instance
    return @@instance if @@instance
    @@mutex.synchronize do
      @@instance ||= new
    end
  end
end

Producer-Consumer Pattern:

queue = Queue.new
producer = Thread.new { loop { queue << produce_item } }
consumer = Thread.new { loop { process_item(queue.pop) } }

Worker Pool Pattern:

work_queue = Queue.new
workers = Array.new(4) { Thread.new { loop { process(work_queue.pop) } } }

Exception Classes

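| Exception | Cause | Prevention |
|---|---|---|
| ThreadError | Invalid thread or lock operation, e.g. recursive Mutex locking, unlocking a mutex the thread does not own, or a non-blocking pop on an empty queue | Check state before operating, or use Monitor for reentrant locking |
| Timeout::Error | Operation exceeded time limit | Set appropriate timeouts |
| fatal ("No live threads left. Deadlock?") | Every thread is blocked forever on locks or queues | Use consistent lock ordering |
| ClosedQueueError (subclass of StopIteration) | Push onto a closed Queue or SizedQueue | Stop producers before closing the queue |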

Performance Considerations

| Operation | Cost | Alternative |
|---|---|---|
| Mutex synchronization | Medium | Lock-free data structures |
| Queue operations | Low | Direct variable access (unsafe) |
| Thread creation | High | Thread pools |
| Context switching | Medium | Reduce thread count |
| Lock contention | High | Reduce critical section size |