CrackedRuby

Mutex

Ruby's Mutex class provides mutual exclusion locks for synchronizing access to shared resources in multi-threaded applications.

Overview

Mutex implements a mutual exclusion lock that prevents multiple threads from simultaneously accessing the same resource. Ruby's Mutex class provides the fundamental synchronization primitive for thread-safe programming, ensuring that only one thread can hold the lock at any given time.

The Mutex class operates on a simple principle: when a thread acquires a mutex, other threads attempting to acquire the same mutex will block until the first thread releases it. This mechanism prevents race conditions and ensures data consistency in concurrent applications.

Mutex is a core class (also available as Thread::Mutex), so no require is needed, and it integrates with the Ruby thread scheduler. The class provides both blocking and non-blocking acquisition methods, along with automatic unlock handling through block-based synchronization.

# Basic mutex creation and usage
mutex = Mutex.new

# Thread-safe counter using mutex
counter = 0
mutex = Mutex.new

threads = 10.times.map do
  Thread.new do
    1000.times do
      mutex.synchronize { counter += 1 }
    end
  end
end

threads.each(&:join)
puts counter
# => 10000 (always consistent)

The primary use cases for Mutex include protecting shared variables, synchronizing access to files or network resources, implementing thread-safe data structures, and coordinating complex multi-threaded operations. Mutex works with Ruby's Thread class and forms the foundation for higher-level synchronization constructs.

# Protecting shared resource access
class ThreadSafeLogger
  def initialize
    @mutex = Mutex.new
    @log_entries = []
  end

  def log(message)
    @mutex.synchronize do
      @log_entries << "#{Time.now}: #{message}"
      File.write('app.log', @log_entries.last + "\n", mode: 'a')
    end
  end
end

Mutex integrates with Ruby's exception handling, automatically releasing locks when exceptions occur within synchronized blocks. This prevents deadlock situations that could arise from unreleased locks due to unexpected errors.
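
The exception behavior described above can be checked with a short sketch. The helper name released_after_error? is hypothetical and exists only for this illustration; it raises inside a synchronized block and then reports whether the lock is free again.

```ruby
# Returns true when the mutex is free again after its block raised.
# (released_after_error? is an illustrative helper, not part of Ruby.)
def released_after_error?(mutex)
  begin
    mutex.synchronize { raise ArgumentError, "simulated failure" }
  rescue ArgumentError
    # By the time we get here the exception has left the synchronized block
  end
  !mutex.locked?
end

mutex = Mutex.new
puts released_after_error?(mutex)  # the lock was released during unwinding
puts mutex.try_lock                # so it can be acquired again right away
```

Both lines print true: synchronize unlocks in an ensure-style cleanup, so no manual unlock is needed on the error path.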

Basic Usage

Creating a Mutex requires calling the constructor without arguments. Each Mutex instance represents a separate lock, and threads compete for access to individual mutex objects rather than a global locking mechanism.

# Creating and using a basic mutex
mutex = Mutex.new

# Manual lock/unlock pattern
mutex.lock
begin
  # Critical section code
  shared_data = modify_shared_resource()
ensure
  mutex.unlock
end

The synchronize method provides the preferred approach for mutex usage. This method automatically handles lock acquisition and release, including proper cleanup when exceptions occur within the synchronized block.

# Recommended synchronize pattern
mutex = Mutex.new
shared_array = []

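threads = 10.times.map do |i|
  Thread.new do
    mutex.synchronize do
      # Critical section is automatically protected
      shared_array << "Thread #{i} data"
      sleep(0.1)  # Simulate work
      puts "Added data from thread #{i}"
    end
  end
end

# Wait for every thread; otherwise the script may exit before they finish
threads.each(&:join)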

The try_lock method attempts to acquire the mutex without blocking. This method returns true if the lock was successfully acquired, or false if another thread currently holds the mutex. This pattern enables non-blocking synchronization strategies.

# Non-blocking mutex acquisition
mutex = Mutex.new
work_queue = []

worker_thread = Thread.new do
  loop do
    if mutex.try_lock
      begin
        unless work_queue.empty?
          item = work_queue.shift
          process_item(item)
        end
      ensure
        mutex.unlock
      end
    else
      # Mutex busy, do other work
      perform_background_tasks()
    end
    sleep(0.01)
  end
end

The locked? method returns the current lock status without attempting to acquire the mutex. This method proves useful for debugging and monitoring mutex states, though it should not be used for synchronization decisions due to race conditions between checking and acting.

# Checking mutex status
mutex = Mutex.new

Thread.new do
  mutex.lock
  puts "Lock acquired: #{mutex.locked?}"  # => true
  sleep(2)
  mutex.unlock
end

sleep(0.1)
puts "Lock status from main thread: #{mutex.locked?}"  # => true

Multiple mutexes can coordinate complex synchronization scenarios. However, acquiring multiple mutexes requires careful ordering to prevent deadlock situations where threads wait indefinitely for each other's locks.

# Multiple mutex coordination (careful ordering required)
mutex_a = Mutex.new
mutex_b = Mutex.new

# Always acquire mutexes in consistent order
def transfer_data(from, to, mutex_a, mutex_b)
  # Acquire locks in consistent order regardless of parameters
  first_mutex, second_mutex = [mutex_a, mutex_b].sort_by(&:object_id)
  
  first_mutex.synchronize do
    second_mutex.synchronize do
      # Safe to access both resources
      data = from.extract_data
      to.store_data(data)
    end
  end
end

Thread Safety & Concurrency

Mutex prevents race conditions by ensuring atomic access to shared resources. Race conditions occur when multiple threads access shared data simultaneously, leading to inconsistent or corrupted state. Mutex eliminates these conditions by serializing access to critical sections.

# Demonstrating race condition without mutex
class UnsafeCounter
  def initialize
    @count = 0
  end

  def increment
    # Race condition: read-modify-write not atomic
    current = @count
    @count = current + 1
  end

  def value
    @count
  end
end

# With proper mutex protection
class SafeCounter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize do
      @count += 1  # Now atomic
    end
  end

  def value
    @mutex.synchronize { @count }
  end
end

Deadlock prevention requires establishing consistent lock ordering when multiple mutexes are involved. Deadlocks occur when threads hold locks while waiting for other locks, creating circular dependencies. Ruby's only built-in detection covers the case where every thread in the process is blocked, which it reports as a fatal error; a deadlock among a subset of threads goes unnoticed, making prevention crucial.

# Deadlock-safe resource management
class BankAccount
  attr_reader :id

  def initialize(balance, id)
    @balance = balance
    @id = id
    @mutex = Mutex.new
  end

  def transfer_to(other_account, amount)
    # Prevent deadlock by always locking the account with the lower id first
    first_account, second_account = [self, other_account].sort_by(&:id)

    first_account.mutex.synchronize do
      second_account.mutex.synchronize do
        return false if @balance < amount

        @balance -= amount
        other_account.deposit(amount)
        true
      end
    end
  end

  protected

  # Protected so that only other BankAccount instances can reach the lock
  attr_reader :mutex

  # Caller must already hold this account's mutex
  def deposit(amount)
    @balance += amount
  end
end

Reader-writer patterns require careful consideration with Mutex, as standard mutexes provide exclusive access regardless of operation type. Multiple readers can safely access data simultaneously, but writers require exclusive access.

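# Reader-writer coordination with one mutex and a condition variable.
# A Ruby Mutex must be unlocked by the thread that locked it, so the classic
# "first reader locks, last reader unlocks" scheme raises ThreadError when
# those are different threads. Shared state plus a ConditionVariable avoids that.
class ThreadSafeCache
  def initialize
    @cache = {}
    @state_mutex = Mutex.new
    @state_changed = ConditionVariable.new
    @reader_count = 0
    @writing = false
  end

  def read(key)
    @state_mutex.synchronize do
      @state_changed.wait(@state_mutex) while @writing
      @reader_count += 1
    end

    begin
      @cache[key]
    ensure
      @state_mutex.synchronize do
        @reader_count -= 1
        @state_changed.broadcast if @reader_count == 0
      end
    end
  end

  def write(key, value)
    @state_mutex.synchronize do
      # Writers wait until no reader and no other writer is active
      @state_changed.wait(@state_mutex) while @writing || @reader_count > 0
      @writing = true
    end

    begin
      @cache[key] = value
    ensure
      @state_mutex.synchronize do
        @writing = false
        @state_changed.broadcast
      end
    end
  end
end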

Timeout handling prevents indefinite blocking when acquiring mutexes. While Mutex doesn't provide built-in timeout functionality, combining try_lock with timing mechanisms creates timeout behavior for critical applications.

# Mutex with timeout simulation
require 'timeout'  # defines Timeout::Error

class TimeoutMutex
  def initialize
    @mutex = Mutex.new
  end

  def synchronize_with_timeout(timeout_seconds)
    # The monotonic clock is unaffected by system clock adjustments
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds
    acquired = false

    until acquired || Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
      acquired = @mutex.try_lock
      sleep(0.001) unless acquired  # Brief pause between polling attempts
    end

    unless acquired
      raise Timeout::Error, "Failed to acquire mutex within #{timeout_seconds} seconds"
    end

    begin
      yield
    ensure
      @mutex.unlock
    end
  end
end

# Usage with timeout protection
timeout_mutex = TimeoutMutex.new

begin
  timeout_mutex.synchronize_with_timeout(5.0) do
    # Critical section with 5-second timeout
    perform_critical_operation()
  end
rescue Timeout::Error
  puts "Operation timed out - mutex was busy"
end

Performance & Memory

Mutex operations add overhead to every acquisition and release. An uncontended lock is comparatively cheap, but a contended lock suspends the waiting thread and forces a context switch once the lock becomes available. Understanding this overhead helps optimize critical performance paths.

Lock contention occurs when multiple threads frequently compete for the same mutex, causing threads to block and reducing parallel execution benefits. High contention scenarios require redesigning synchronization strategies rather than simply adding more locks.

# Measuring mutex overhead
require 'benchmark'

counter = 0
mutex = Mutex.new

# Without mutex (unsafe but fast)
time_without_mutex = Benchmark.measure do
  1000.times { counter += 1 }
end

counter = 0

# With mutex (safe but slower)
time_with_mutex = Benchmark.measure do
  1000.times do
    mutex.synchronize { counter += 1 }
  end
end

puts "Without mutex: #{time_without_mutex.real * 1000}ms"
puts "With mutex: #{time_with_mutex.real * 1000}ms"
puts "Overhead: #{((time_with_mutex.real / time_without_mutex.real) - 1) * 100}%"

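Lock-free alternatives can eliminate mutex overhead in specific scenarios. Atomic operations, immutable data structures, and thread-local storage reduce synchronization requirements while maintaining thread safety. Where a lock is still needed, partitioning the data across several mutexes spreads contention so that threads rarely compete for the same lock.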

# Reducing lock contention with partitioning
class PartitionedCounter
  def initialize(partitions = 4)
    @partitions = partitions
    @counters = Array.new(partitions) { { value: 0, mutex: Mutex.new } }
  end

  def increment
    # Distribute load across partitions
    partition = Thread.current.object_id % @partitions
    counter_data = @counters[partition]
    
    counter_data[:mutex].synchronize do
      counter_data[:value] += 1
    end
  end

  def total
    @counters.sum { |data| data[:mutex].synchronize { data[:value] } }
  end
end
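
Thread-local accumulation, one of the alternatives mentioned above, can be sketched as follows. This is a minimal illustration rather than a truly lock-free design: each thread counts in a private local variable and takes the lock only once to merge its result. The method name count_with_local_totals and its keyword arguments are made up for this example.

```ruby
# Each thread counts privately and merges once under the lock.
# (count_with_local_totals is an illustrative name, not a library method.)
def count_with_local_totals(thread_count:, increments_per_thread:)
  total = 0
  merge_mutex = Mutex.new

  threads = thread_count.times.map do
    Thread.new do
      local = 0                                   # visible only to this thread
      increments_per_thread.times { local += 1 }  # no locking on the hot path
      merge_mutex.synchronize { total += local }  # one lock acquisition per thread
    end
  end

  threads.each(&:join)
  total
end

puts count_with_local_totals(thread_count: 10, increments_per_thread: 1000)
# => 10000
```

Compared with locking on every increment, this version acquires the mutex 10 times instead of 10,000, at the cost of the shared total being updated only when a thread finishes.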

Memory usage for Mutex objects remains minimal, but accumulated lock contention can cause memory pressure through thread stack buildup. Blocked threads consume stack space while waiting for mutex acquisition, potentially leading to memory exhaustion under extreme contention.

# Monitoring mutex contention effects
class ContentionMonitor
  def initialize
    @mutex = Mutex.new
    @wait_times = []
    @wait_mutex = Mutex.new
  end

  def synchronized_operation
    start_time = Time.now
    
    @mutex.synchronize do
      wait_time = Time.now - start_time
      @wait_mutex.synchronize { @wait_times << wait_time }
      
      # Actual work
      yield if block_given?
    end
  end

  def average_wait_time
    @wait_mutex.synchronize do
      return 0.0 if @wait_times.empty?
      @wait_times.sum / @wait_times.length
    end
  end

  def max_wait_time
    @wait_mutex.synchronize { @wait_times.max || 0.0 }
  end
end

Optimization strategies focus on minimizing critical section duration and reducing lock frequency. Batching operations, using immutable data where possible, and employing lock-free algorithms for hot paths improve overall performance.

# Optimized batch processing to reduce lock frequency
class BatchProcessor
  def initialize(batch_size = 100)
    @batch_size = batch_size
    @buffer = []
    @mutex = Mutex.new
  end

  def add_item(item)
    batch_to_process = nil
    
    @mutex.synchronize do
      @buffer << item
      if @buffer.length >= @batch_size
        batch_to_process = @buffer.dup
        @buffer.clear
      end
    end

    # Process outside mutex to reduce critical section time
    if batch_to_process
      process_batch(batch_to_process)
    end
  end

  private

  def process_batch(items)
    # Expensive processing outside critical section
    items.each { |item| expensive_operation(item) }
  end
end

Production Patterns

Production applications require robust mutex usage patterns that handle failures gracefully while maintaining performance under load. Error recovery, monitoring, and debugging capabilities become essential for production deployments.

Connection pooling represents a common production pattern where mutexes coordinate access to shared resources like database connections, network sockets, or file handles.

# Production-ready connection pool
require 'timeout'  # defines Timeout::Error

class ConnectionPool
  def initialize(size, &connection_factory)
    @size = size
    @connections = []
    @available = []
    @mutex = Mutex.new
    @resource_condition = ConditionVariable.new
    @connection_factory = connection_factory

    # Pre-populate pool
    @size.times do
      conn = @connection_factory.call
      @connections << conn
      @available << conn
    end
  end

  def with_connection(timeout = 30)
    connection = acquire_connection(timeout)
    begin
      yield connection
    ensure
      release_connection(connection)
    end
  end

  private

  def acquire_connection(timeout)
    deadline = Time.now + timeout
    
    @mutex.synchronize do
      while @available.empty?
        remaining = deadline - Time.now
        raise Timeout::Error, "Connection acquisition timeout" if remaining <= 0
        
        @resource_condition.wait(@mutex, remaining)
      end
      
      @available.pop
    end
  end

  def release_connection(connection)
    @mutex.synchronize do
      @available << connection
      @resource_condition.signal
    end
  end
end
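
The pool above relies on ConditionVariable to park threads until a resource is returned. The wait and signal handshake on its own can be sketched like this; the hand_off method and its variable names are hypothetical and serve only to isolate the mechanism.

```ruby
# A consumer waits on a condition variable until a producer signals.
# (hand_off is an illustrative helper, not part of the pool above.)
def hand_off(value)
  mutex = Mutex.new
  item_ready = ConditionVariable.new
  items = []
  received = nil

  consumer = Thread.new do
    mutex.synchronize do
      # wait releases the mutex while sleeping and re-acquires it before
      # returning; the loop re-checks the condition after every wakeup.
      item_ready.wait(mutex) while items.empty?
      received = items.shift
    end
  end

  producer = Thread.new do
    mutex.synchronize do
      items << value
      item_ready.signal  # wake one waiting thread, if any
    end
  end

  [consumer, producer].each(&:join)
  received
end

puts hand_off(:connection).inspect
```

Because the consumer checks items.empty? while holding the mutex, the handshake works regardless of which thread runs first: if the producer wins the race, the consumer never waits at all.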

Graceful shutdown patterns ensure proper cleanup when applications terminate. Mutexes must be properly released and any pending operations completed before process termination.

# Graceful shutdown with mutex coordination
class GracefulService
  def initialize
    @running = true
    @shutdown_mutex = Mutex.new
    @worker_threads = []
    @pending_work = Queue.new
  end

  def start_workers(count = 4)
    count.times do |i|
      thread = Thread.new do
        worker_loop(i)
      end
      @worker_threads << thread
    end
  end

  def add_work(item)
    @shutdown_mutex.synchronize do
      return false unless @running
      @pending_work << item
      true
    end
  end

  def shutdown(timeout = 30)
    @shutdown_mutex.synchronize do
      @running = false
    end

    # Wait for workers to finish current tasks
    deadline = Time.now + timeout
    @worker_threads.each do |thread|
      remaining = deadline - Time.now
      thread.join(remaining > 0 ? remaining : 0)
    end

    # Force terminate any remaining threads
    @worker_threads.each { |t| t.kill if t.alive? }
  end

  private

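  def worker_loop(worker_id)
    loop do
      # A break inside the synchronize block would only exit that block,
      # so read the flag under the lock and break from the loop itself
      running = @shutdown_mutex.synchronize { @running }
      break unless running

      begin
        work_item = @pending_work.pop(true)  # Non-blocking; raises ThreadError when empty
        process_work_item(work_item, worker_id)
      rescue ThreadError
        # Queue empty, continue loop
        sleep(0.1)
      end
    end
  end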
end

Health monitoring requires tracking mutex contention and lock acquisition times. High contention indicates potential bottlenecks requiring architectural changes rather than tuning parameters.

# Production mutex monitoring
class MonitoredMutex
  def initialize(name)
    @name = name
    @mutex = Mutex.new
    @metrics = {
      acquisitions: 0,
      contentions: 0,
      total_wait_time: 0.0,
      max_wait_time: 0.0
    }
    @metrics_mutex = Mutex.new
  end

  def synchronize
    start_time = Time.now
    acquired_immediately = @mutex.try_lock

    unless acquired_immediately
      record_contention
      @mutex.lock
    end

    wait_time = Time.now - start_time
    record_acquisition(wait_time)

    begin
      yield
    ensure
      @mutex.unlock
    end
  end

  def metrics
    @metrics_mutex.synchronize { @metrics.dup }
  end

  def reset_metrics
    @metrics_mutex.synchronize do
      @metrics[:acquisitions] = 0
      @metrics[:contentions] = 0
      @metrics[:total_wait_time] = 0.0
      @metrics[:max_wait_time] = 0.0
    end
  end

  private

  def record_contention
    @metrics_mutex.synchronize do
      @metrics[:contentions] += 1
    end
  end

  def record_acquisition(wait_time)
    @metrics_mutex.synchronize do
      @metrics[:acquisitions] += 1
      @metrics[:total_wait_time] += wait_time
      @metrics[:max_wait_time] = [wait_time, @metrics[:max_wait_time]].max
    end
  end
end

Error recovery patterns handle exceptions that occur while holding mutexes. Proper exception handling ensures locks are released even when errors occur, preventing deadlocks and resource leaks.

# Robust error handling with mutex
# (risky_operation and log_error are application-defined placeholders)
class CircuitBreakerError < StandardError; end

class ErrorRecoveryProcessor
  def initialize
    @mutex = Mutex.new
    @success_count = 0
    @error_count = 0         # Total failures, used by error_rate
    @consecutive_errors = 0  # Failures since the last success
    @last_error = nil
  end

  def process_with_recovery(data)
    @mutex.synchronize do
      begin
        result = risky_operation(data)
        record_success
        result
      rescue => error
        handle_error(error)
        raise  # Re-raise after recording
      end
    end
  rescue => error
    # Error handling outside mutex
    log_error(error)
    nil
  end

  def error_rate
    @mutex.synchronize do
      total_operations = @error_count + @success_count
      return 0.0 if total_operations == 0
      @error_count.to_f / total_operations
    end
  end

  private

  def handle_error(error)
    @error_count += 1
    @consecutive_errors += 1
    @last_error = error

    # Circuit breaker logic
    if @consecutive_errors > 10
      raise CircuitBreakerError, "Too many consecutive errors"
    end
  end

  def record_success
    @success_count += 1
    @consecutive_errors = 0
    @last_error = nil
  end
end

Reference

Core Methods

| Method | Parameters | Returns | Description |
|---|---|---|---|
| Mutex.new | None | Mutex | Creates a new mutex instance |
| #lock | None | self | Acquires the mutex, blocking until available |
| #unlock | None | self | Releases the mutex |
| #try_lock | None | Boolean | Attempts to acquire mutex without blocking |
| #synchronize | Block | Object | Executes block with mutex locked |
| #locked? | None | Boolean | Returns true if mutex is currently locked |
| #sleep(timeout=nil) | timeout (Numeric) | Numeric | Sleeps while temporarily releasing mutex |

Thread State Methods

| Method | Returns | Description |
|---|---|---|
| #owned? | Boolean | Returns true if current thread owns the mutex |
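
The two less common methods in these tables, #owned? and #sleep, can be illustrated with a small sketch. The helper names ownership_report and sleep_releases_lock? are invented for this example; the Mutex methods they call are the ones listed above.

```ruby
# Records what #owned? reports before, during, and after holding the lock,
# and what a second thread observes while the lock is held.
def ownership_report(mutex)
  report = {}
  report[:before] = mutex.owned?
  mutex.synchronize do
    report[:inside] = mutex.owned?
    # Another thread sees the mutex as locked but does not own it
    report[:other_thread] = Thread.new { [mutex.locked?, mutex.owned?] }.value
  end
  report[:after] = mutex.owned?
  report
end

# Shows that Mutex#sleep gives up the lock while the owner is sleeping.
def sleep_releases_lock?(mutex)
  helper_ran = false
  helper = nil
  mutex.synchronize do
    helper = Thread.new { mutex.synchronize { helper_ran = true } }
    # #sleep releases the lock, sleeps, then re-acquires it on wakeup,
    # so the helper can enter its own synchronize block in the meantime.
    mutex.sleep(0.01) until helper_ran
  end
  helper.join
  helper_ran
end

p ownership_report(Mutex.new)
puts sleep_releases_lock?(Mutex.new)
```

Unlike #locked?, #owned? answers the question for the calling thread only, which makes it the safer check before a manual unlock.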

Exception Classes

| Exception | Inherits From | Description |
|---|---|---|
| ThreadError | StandardError | Raised for invalid mutex operations |

Common Usage Patterns

| Pattern | Code Template | Use Case |
|---|---|---|
| Basic synchronization | mutex.synchronize { code } | Protecting shared resources |
| Non-blocking check | if mutex.try_lock; code; mutex.unlock; end | Avoiding deadlock in complex scenarios |
| Manual lock/unlock | mutex.lock; begin; code; ensure; mutex.unlock; end | Complex control flow requirements |
| Status checking | puts "Locked: #{mutex.locked?}" | Debugging and monitoring |

Performance Characteristics

| Operation | Time Complexity | Notes |
|---|---|---|
| #lock | O(1) - O(∞) | Constant time if available, blocks if contended |
| #unlock | O(1) | Constant time operation |
| #try_lock | O(1) | Always constant time, never blocks |
| #synchronize | O(1) + block time | Adds minimal overhead to block execution |
| #locked? | O(1) | Constant time status check |

Thread Safety Guarantees

| Aspect | Guarantee |
|---|---|
| Mutual exclusion | Only one thread can hold mutex at a time |
| Memory visibility | Changes made in synchronized block visible to subsequent acquirers |
| Exception safety | Mutex automatically unlocked when exception propagates from synchronized block |
| Reentrancy | Not reentrant: a thread that tries to acquire a mutex it already owns gets a ThreadError |
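
The reentrancy row can be demonstrated with a short sketch. It uses Monitor from the standard library purely as a point of comparison, since Monitor allows nested acquisition by the same thread; the helper name nested_lock_result is hypothetical.

```ruby
require 'monitor'

# Tries to take the same lock twice from one thread.
# Returns the block result on success, or the error message on failure.
# (nested_lock_result is an illustrative helper, not a library method.)
def nested_lock_result(lock)
  lock.synchronize do
    lock.synchronize { :entered_twice }
  end
rescue ThreadError => e
  e.message
end

puts nested_lock_result(Mutex.new)    # prints the ThreadError message
puts nested_lock_result(Monitor.new)  # prints entered_twice
```

If a method that holds the lock needs to call another method that also synchronizes on it, either restructure the code so the inner method assumes the lock is held, or consider a reentrant lock such as Monitor.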

Common Error Conditions

| Error | Cause | Solution |
|---|---|---|
| ThreadError: Attempt to unlock a mutex which is not locked | Calling unlock on unlocked mutex | Use synchronize, or check owned? before a manual unlock |
| ThreadError: Attempt to unlock a mutex which is locked by another thread | Thread unlocking mutex it doesn't own | Only unlock from owning thread |
| Deadlock | Circular wait between threads | Establish consistent lock ordering |
| Performance degradation | High lock contention | Reduce critical section size or use lock partitioning |
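
The two ThreadError rows above can be reproduced with a small sketch. The helper thread_error_message is an assumption made for this example: it runs a block and returns the message of any ThreadError it raises, or nil if none occurs.

```ruby
# Runs the block and returns the ThreadError message, or nil if none is raised.
# (thread_error_message is an illustrative helper, not part of Ruby.)
def thread_error_message
  yield
  nil
rescue ThreadError => e
  e.message
end

# Case 1: unlocking a mutex that nobody holds
idle = Mutex.new
puts thread_error_message { idle.unlock }

# Case 2: unlocking from a thread that does not own the lock
held = Mutex.new
held.lock
puts Thread.new { thread_error_message { held.unlock } }.value
held.unlock  # the owning thread may unlock normally

# For comparison: synchronize never triggers either error
puts thread_error_message { idle.synchronize { :ok } }.inspect
```

Both failing cases come from manual lock and unlock calls, which is why synchronize is the recommended default.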