CrackedRuby

Monitor

Documentation for Ruby's Monitor class providing reentrant mutex synchronization for thread-safe access to shared resources.

Overview

Monitor provides a reentrant mutual exclusion mechanism for Ruby threads. Unlike basic mutexes, Monitor allows the same thread to acquire the lock multiple times without deadlocking. Ruby implements Monitor through the monitor standard library, which wraps objects with synchronization primitives.

The Monitor class extends the functionality of Mutex by tracking which thread currently holds the lock and maintaining a count of how many times that thread has acquired it. This reentrant behavior makes Monitor safer for scenarios where methods may call other synchronized methods on the same object.

require 'monitor'

class Counter
  include MonitorMixin
  
  def initialize
    super()
    @value = 0
  end
  
  def increment
    synchronize do
      @value += 1
    end
  end
  
  def value
    synchronize { @value }
  end
end

counter = Counter.new
counter.increment
puts counter.value # => 1
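
The contrast with Mutex can be seen in a minimal sketch that uses a standalone Monitor instance instead of the mixin. The variable names are illustrative; the sketch relies only on the fact that Ruby's Mutex raises ThreadError when the thread that holds it tries to lock it again.

```ruby
require 'monitor'

mutex = Mutex.new
monitor = Monitor.new

# Re-locking a Mutex from the thread that already holds it raises ThreadError.
mutex_result = begin
  mutex.synchronize { mutex.synchronize { :entered } }
rescue ThreadError => e
  e.class
end

# A Monitor counts nested acquisitions by the owning thread instead.
monitor_result = monitor.synchronize { monitor.synchronize { :entered } }

puts mutex_result   # ThreadError
puts monitor_result # entered
```

The outer synchronize on the Mutex still releases the lock when the exception propagates, so the mutex is left unlocked after the rescue.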

Monitor supports condition variables through the wait and signal methods, enabling threads to coordinate based on specific conditions. The new_cond method creates condition variables associated with the monitor.

require 'monitor'

class Buffer
  include MonitorMixin
  
  def initialize(max_size)
    super()
    @items = []
    @max_size = max_size
    @not_full = new_cond
    @not_empty = new_cond
  end
  
  def put(item)
    synchronize do
      while @items.size >= @max_size
        @not_full.wait
      end
      @items << item
      @not_empty.signal
    end
  end
  
  def get
    synchronize do
      while @items.empty?
        @not_empty.wait
      end
      item = @items.shift
      @not_full.signal
      item
    end
  end
end

Ruby provides two primary ways to use Monitor: mixing MonitorMixin into classes or extending existing objects with Monitor functionality. The mixin approach integrates synchronization directly into class definitions, while object extension adds synchronization to existing instances.

Basic Usage

MonitorMixin provides the core synchronization interface through the synchronize method, which accepts a block and ensures exclusive access while executing that block. The mixing class gains access to synchronization primitives without explicitly managing lock acquisition and release.

require 'monitor'

class BankAccount
  include MonitorMixin
  
  def initialize(balance)
    super()
    @balance = balance
  end
  
  def withdraw(amount)
    synchronize do
      raise "Insufficient funds" if @balance < amount
      sleep(0.01) # Simulate processing time
      @balance -= amount
    end
  end
  
  def deposit(amount)
    synchronize do
      @balance += amount
    end
  end
  
  def balance
    synchronize { @balance }
  end
end

account = BankAccount.new(100)
account.deposit(50)
account.withdraw(30)
puts account.balance # => 120

The extend approach adds Monitor functionality to existing objects without modifying their class definition. This proves useful when working with objects from external libraries or when synchronization requirements emerge after object creation.

require 'monitor'

hash = {}
hash.extend(MonitorMixin)

# Thread-safe hash operations
hash.synchronize do
  hash[:key1] = "value1"
  hash[:key2] = "value2"
end

result = hash.synchronize { hash.keys.sort }
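
As a rough illustration of the extend approach under concurrent load, the sketch below has several threads perform a read-modify-write on one extended hash. The name tally and the thread and iteration counts are arbitrary choices for the example.

```ruby
require 'monitor'

tally = Hash.new(0)
tally.extend(MonitorMixin)

threads = 4.times.map do
  Thread.new do
    1000.times do
      # The read and the write happen under one lock acquisition.
      tally.synchronize { tally[:hits] += 1 }
    end
  end
end

threads.each(&:join)
puts tally[:hits] # 4000
```

Extending an object only adds the lock; it does not make the object's own methods synchronized. Every caller has to go through synchronize for the protection to hold.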

Monitor supports nested synchronization calls within the same thread without causing deadlocks. This reentrant behavior allows synchronized methods to call other synchronized methods safely.

class SafeCounter
  include MonitorMixin
  
  def initialize
    super()
    @count = 0
  end
  
  def increment
    synchronize do
      @count += 1
      log_change # Calls another synchronized method
    end
  end
  
  def log_change
    synchronize do # Safe - same thread already holds the lock
      puts "Count changed to #{@count}"
    end
  end
  
  def count
    synchronize { @count }
  end
end

Condition variables coordinate thread communication by allowing threads to wait for specific conditions and signal when those conditions change. The new_cond method creates condition variables bound to the monitor.

class WorkQueue
  include MonitorMixin
  
  def initialize
    super()
    @queue = []
    @workers_available = new_cond
  end
  
  def add_work(item)
    synchronize do
      @queue << item
      @workers_available.signal
    end
  end
  
  def get_work
    synchronize do
      while @queue.empty?
        @workers_available.wait
      end
      @queue.shift
    end
  end
end

Thread Safety & Concurrency

Monitor provides thread-safe access to shared data structures through its reentrant locking mechanism. The synchronization occurs at the object level, meaning multiple threads can safely access different Monitor-protected objects concurrently while ensuring exclusive access within each object.

require 'monitor'

class ThreadSafeArray
  include MonitorMixin
  
  def initialize
    super()
    @array = []
  end
  
  def <<(item)
    synchronize do
      @array << item
    end
  end
  
  def pop
    synchronize do
      @array.pop
    end
  end
  
  def size
    synchronize { @array.size }
  end
  
  def each
    # Create a copy to avoid holding the lock during iteration
    items = synchronize { @array.dup }
    items.each { |item| yield item }
  end
end

# Multiple threads can safely modify the same array
array = ThreadSafeArray.new
threads = 10.times.map do |i|
  Thread.new do
    100.times { array << i }
  end
end

threads.each(&:join)
puts array.size # => 1000

Condition variables enable complex synchronization patterns where threads must wait for specific states. The wait method releases the monitor lock and suspends the thread until another thread calls signal or broadcast on the same condition variable.

class ProducerConsumer
  include MonitorMixin
  
  def initialize(capacity)
    super()
    @buffer = []
    @capacity = capacity
    @not_full = new_cond
    @not_empty = new_cond
    @finished = false
  end
  
  def produce(item)
    synchronize do
      while @buffer.size >= @capacity && !@finished
        @not_full.wait
      end
      return if @finished
      
      @buffer << item
      @not_empty.signal
    end
  end
  
  def consume
    synchronize do
      while @buffer.empty? && !@finished
        @not_empty.wait
      end
      
      if @buffer.empty?
        nil # No more items
      else
        item = @buffer.shift
        @not_full.signal
        item
      end
    end
  end
  
  def finish
    synchronize do
      @finished = true
      @not_full.broadcast
      @not_empty.broadcast
    end
  end
end

pc = ProducerConsumer.new(5)

producer = Thread.new do
  10.times { |i| pc.produce("item-#{i}") }
  pc.finish
end

consumer = Thread.new do
  while (item = pc.consume)
    puts "Consumed: #{item}"
  end
end

[producer, consumer].each(&:join)

Monitor's reentrancy keeps a thread from deadlocking against itself, but it does nothing to order acquisitions across different monitors. When code holds more than one monitor at a time, acquire them in the same order in every thread to avoid circular waits.

# Safe: Consistent lock ordering
class TransferManager
  def self.transfer(from_account, to_account, amount)
    # Always acquire locks in account ID order
    first, second = [from_account, to_account].sort_by(&:id)
    
    first.synchronize do
      second.synchronize do
        from_account.withdraw(amount)
        to_account.deposit(amount)
      end
    end
  end
end

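The mon_try_enter method (try_enter on a standalone Monitor instance) attempts non-blocking lock acquisition, returning false immediately if another thread holds the lock. Pair a successful call with mon_exit; inside a class that includes MonitorMixin, a bare exit would call Kernel#exit and terminate the program. This approach avoids blocking in scenarios where waiting would be problematic.

class NonBlockingCounter
  include MonitorMixin
  
  def initialize
    super()
    @count = 0
  end
  
  def increment_if_available
    if mon_try_enter
      begin
        @count += 1
        @count
      ensure
        mon_exit # Release the lock even if the increment raises
      end
    else
      nil # Could not acquire lock
    end
  end
end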
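
To see the non-blocking behavior in isolation, the following sketch uses a standalone Monitor, whose try_enter and exit methods correspond to mon_try_enter and mon_exit on the mixin. The two Queue objects are only there to make the timing deterministic and are not part of the Monitor API.

```ruby
require 'monitor'

lock = Monitor.new
holding = Queue.new # signals that the helper thread owns the lock
release = Queue.new # tells the helper thread to let go

holder = Thread.new do
  lock.synchronize do
    holding << true
    release.pop
  end
end

holding.pop
contended = lock.try_enter   # another thread holds the monitor
release << true
holder.join

uncontended = lock.try_enter # the monitor is free now
lock.exit if uncontended

puts contended   # false
puts uncontended # true
```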

Performance & Memory

Monitor incurs performance overhead compared to unsynchronized access due to lock acquisition, release, and potential thread blocking. The overhead scales with lock contention: the more threads compete for the same monitor, the longer they wait and the more context switches occur.

require 'monitor'
require 'benchmark'

class UnsafeCounter
  def initialize
    @count = 0
  end
  
  def increment
    @count += 1
  end
  
  def count
    @count
  end
end

class SafeCounter
  include MonitorMixin
  
  def initialize
    super()
    @count = 0
  end
  
  def increment
    synchronize { @count += 1 }
  end
  
  def count
    synchronize { @count }
  end
end

# Performance comparison
iterations = 100_000

unsafe_counter = UnsafeCounter.new
safe_counter = SafeCounter.new

Benchmark.bm(15) do |x|
  x.report("Unsafe:") { iterations.times { unsafe_counter.increment } }
  x.report("Safe:") { iterations.times { safe_counter.increment } }
end

# Sample results (timings vary by machine and Ruby version); Monitor adds measurable overhead
#                      user     system      total        real
# Unsafe:          0.010000   0.000000   0.010000 (  0.008234)
# Safe:            0.050000   0.000000   0.050000 (  0.048456)

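Memory usage increases with Monitor because each object carries additional synchronization state. Including MonitorMixin adds instance variables to every instance; depending on the Ruby version, these hold either the lock state, owner thread, and recursion count directly or a reference to an internal Monitor object that tracks them.

require 'monitor'
require 'objspace' # Provides ObjectSpace.memsize_of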

# Memory usage comparison
class PlainObject
  def initialize
    @data = Array.new(1000) { rand(100) }
  end
end

class MonitoredObject
  include MonitorMixin
  
  def initialize
    super()
    @data = Array.new(1000) { rand(100) }
  end
end

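# Check object memory footprint.
# memsize_of counts only the object's own slot, not objects it references
# (such as @data or the monitor's internal state), so the gap may be small.
def object_size(obj)
  ObjectSpace.memsize_of(obj)
end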

plain = PlainObject.new
monitored = MonitoredObject.new

puts "Plain object: #{object_size(plain)} bytes"
puts "Monitored object: #{object_size(monitored)} bytes"
# Monitored objects consume additional memory for synchronization

Lock contention severely impacts performance when many threads compete for the same monitor. Design systems to minimize shared state and reduce the duration of synchronized blocks.

# Poor performance: High contention
class HighContentionLogger
  include MonitorMixin
  
  def initialize
    super()
    @logs = []
  end
  
  def log(message)
    synchronize do
      # Expensive operation while holding lock
      formatted_message = format_message(message)
      @logs << formatted_message
      write_to_file(formatted_message) # I/O while locked!
    end
  end
end

# Better performance: Minimize synchronized sections
class LowContentionLogger
  include MonitorMixin
  
  def initialize
    super()
    @logs = []
  end
  
  def log(message)
    # Format outside synchronized block
    formatted_message = format_message(message)
    
    synchronize do
      @logs << formatted_message
    end
    
    # I/O outside synchronized block
    write_to_file(formatted_message)
  end
end

Thread pools avoid the overhead of creating and destroying a thread for every task. A fixed set of workers pulls tasks from a Monitor-protected queue, so the cost of thread setup is amortized across many operations.

require 'monitor'

class ThreadPoolExample
  include MonitorMixin
  
  def initialize(pool_size)
    super()
    @tasks = []
    @not_empty = new_cond
    @shutdown = false
    
    @workers = pool_size.times.map do
      Thread.new { worker_loop }
    end
  end
  
  def submit(&block)
    synchronize do
      @tasks << block
      @not_empty.signal
    end
  end
  
  def shutdown
    synchronize do
      @shutdown = true
      @not_empty.broadcast
    end
    @workers.each(&:join)
  end
  
  private
  
  def worker_loop
    loop do
      task = synchronize do
        while @tasks.empty? && !@shutdown
          @not_empty.wait
        end
        break if @shutdown && @tasks.empty?
        @tasks.shift
      end
      
      break unless task
      task.call
    end
  end
end

Common Pitfalls

Deadlocks occur when threads acquire multiple monitors in different orders, creating circular dependencies. Always establish a consistent lock ordering protocol across the application to prevent this scenario.

require 'monitor'

class Account
  include MonitorMixin
  attr_reader :id, :balance
  
  def initialize(id, balance)
    super()
    @id = id
    @balance = balance
  end
  
  def withdraw(amount)
    synchronize do
      raise "Insufficient funds" if @balance < amount
      @balance -= amount
    end
  end
  
  def deposit(amount)
    synchronize { @balance += amount }
  end
end

# DEADLOCK RISK: Inconsistent lock ordering
def risky_transfer(from, to, amount)
  from.synchronize do
    to.synchronize do # Different threads may acquire in different orders
      from.withdraw(amount)
      to.deposit(amount)
    end
  end
end

# SAFE: Consistent lock ordering
def safe_transfer(from, to, amount)
  first, second = [from, to].sort_by(&:id)
  first.synchronize do
    second.synchronize do
      from.withdraw(amount)
      to.deposit(amount)
    end
  end
end
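
A small stress sketch can illustrate why the ordered version is safe when transfers run in opposite directions at once. Wallet, adjust, and ordered_transfer are hypothetical names introduced for this example; they mirror the Account and safe_transfer code above without depending on it.

```ruby
require 'monitor'

class Wallet
  include MonitorMixin
  attr_reader :id, :balance

  def initialize(id, balance)
    super()
    @id = id
    @balance = balance
  end

  def adjust(delta)
    synchronize { @balance += delta }
  end
end

# Locks are always taken lowest id first, whichever way the money flows.
def ordered_transfer(from, to, amount)
  first, second = [from, to].sort_by(&:id)
  first.synchronize do
    second.synchronize do
      from.adjust(-amount)
      to.adjust(amount)
    end
  end
end

a = Wallet.new(1, 1_000)
b = Wallet.new(2, 1_000)

threads = [
  Thread.new { 500.times { ordered_transfer(a, b, 1) } },
  Thread.new { 500.times { ordered_transfer(b, a, 1) } }
]
threads.each(&:join)

puts a.balance + b.balance # 2000
```

Both threads request wallet 1 before wallet 2, so neither can hold one lock while waiting for the other in the reverse order.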

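Exceptions raised within synchronized blocks can leave resources in inconsistent states. synchronize releases the lock when an exception propagates, but it does not undo changes made before the exception. Order operations so that shared state is updated only after the risky step succeeds, or restore it in a rescue clause.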

require 'monitor'
require 'set' # Needed for Set on Ruby versions that do not autoload it

class ResourceManager
  include MonitorMixin
  
  def initialize
    super()
    @resources = []
    @allocated = Set.new
  end
  
  # PROBLEMATIC: Exception can leave resource allocated
  def unsafe_allocate
    synchronize do
      resource = @resources.pop
      @allocated.add(resource)
      
      # If this raises an exception, resource remains in @allocated
      # but might not be properly initialized
      resource.initialize_connection
      resource
    end
  end
  
  # SAFE: Exception handling preserves consistency
  def safe_allocate
    synchronize do
      resource = @resources.pop
      begin
        resource.initialize_connection
        @allocated.add(resource)
        resource
      rescue => e
        @resources.push(resource) # Return to pool
        raise
      end
    end
  end
  
  def release(resource)
    synchronize do
      @allocated.delete(resource)
      resource.cleanup
      @resources.push(resource)
    end
  end
end
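
The split between lock release and state rollback can be shown with a short sketch. The balance variable and the "boom" error are invented for illustration only.

```ruby
require 'monitor'

lock = Monitor.new
balance = 100

begin
  lock.synchronize do
    balance -= 30  # State change happens first
    raise "boom"   # Then something fails
  end
rescue RuntimeError
  # The exception has already unwound through synchronize here.
end

puts lock.mon_locked? # false
puts balance          # 70
```

The monitor is free again after the failure, but the partial update to balance is still in place, which is why the safe_allocate pattern above restores state explicitly.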

Condition variable misuse leads to lost signals and indefinite waiting. Always check conditions in a loop rather than using simple if statements, as spurious wakeups can occur.

class BoundedQueue
  include MonitorMixin
  
  def initialize(capacity)
    super()
    @items = []
    @capacity = capacity
    @not_full = new_cond
    @not_empty = new_cond
  end
  
  # WRONG: Single condition check
  def unsafe_put(item)
    synchronize do
      if @items.size >= @capacity # Should be 'while'
        @not_full.wait
      end
      @items << item
      @not_empty.signal
    end
  end
  
  # CORRECT: Loop-based condition checking
  def safe_put(item)
    synchronize do
      while @items.size >= @capacity # Handles spurious wakeups
        @not_full.wait
      end
      @items << item
      @not_empty.signal
    end
  end
end
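
The condition variables returned by new_cond also offer wait_while and wait_until, which wrap the re-check loop. The sketch below is one way to use wait_while; Mailbox, deliver, and receive are hypothetical names chosen for the example.

```ruby
require 'monitor'

class Mailbox
  include MonitorMixin

  def initialize
    super()
    @messages = []
    @arrived = new_cond
  end

  def deliver(message)
    synchronize do
      @messages << message
      @arrived.signal
    end
  end

  def receive
    synchronize do
      # Re-evaluates the block after every wakeup, like an explicit while loop.
      @arrived.wait_while { @messages.empty? }
      @messages.shift
    end
  end
end

box = Mailbox.new
reader = Thread.new { box.receive }
box.deliver("hello")
puts reader.value # hello
```

The result is the same whether the reader starts waiting before or after the message is delivered, because the condition is checked before the first wait.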

Lock duration minimization prevents performance bottlenecks and reduces contention. Perform expensive operations outside synchronized blocks when possible.

class EmailQueue
  include MonitorMixin
  
  def initialize
    super()
    @queue = []
  end
  
  # POOR: Long-running operation holds lock
  def slow_process
    synchronize do
      while @queue.any?
        email = @queue.shift
        send_email(email) # Network I/O while holding lock!
        log_delivery(email) # File I/O while holding lock!
      end
    end
  end
  
  # BETTER: Minimize lock duration
  def fast_process
    loop do
      email = synchronize do
        @queue.shift # Quick operation
      end
      
      break unless email
      
      # Expensive operations outside lock
      send_email(email)
      log_delivery(email)
    end
  end
end

Production Patterns

Web applications frequently use Monitor for thread-safe caching mechanisms, ensuring cache consistency across multiple request threads while maintaining good performance characteristics.

require 'monitor'

class ApplicationCache
  include MonitorMixin
  
  def initialize(max_size: 1000, ttl: 3600)
    super()
    @cache = {}
    @access_times = {}
    @max_size = max_size
    @ttl = ttl
    @hits = 0   # Lookups served from the cache
    @misses = 0 # Lookups that found no usable entry
  end
  
  def get(key)
    synchronize do
      cleanup_expired
      
      if @cache.key?(key) && !expired?(key)
        @hits += 1
        @access_times[key] = Time.now
        @cache[key]
      else
        @misses += 1
        nil
      end
    end
  end
  
  def set(key, value)
    synchronize do
      cleanup_expired
      evict_if_full
      
      @cache[key] = value
      @access_times[key] = Time.now
    end
  end
  
  def stats
    synchronize do
      lookups = @hits + @misses
      {
        size: @cache.size,
        max_size: @max_size,
        hit_ratio: lookups.zero? ? 0.0 : @hits.to_f / lookups
      }
    end
  end
  
  private
  
  def expired?(key)
    Time.now - @access_times[key] > @ttl
  end
  
  def cleanup_expired
    current_time = Time.now
    @cache.keys.each do |key|
      if current_time - @access_times[key] > @ttl
        @cache.delete(key)
        @access_times.delete(key)
      end
    end
  end
  
  def evict_if_full
    return unless @cache.size >= @max_size
    
    oldest_key = @access_times.min_by { |_, time| time }[0]
    @cache.delete(oldest_key)
    @access_times.delete(oldest_key)
  end
end

# Usage in Rails controller
class ProductsController < ApplicationController
  CACHE = ApplicationCache.new(max_size: 500)
  
  def show
    product = CACHE.get("product_#{params[:id]}")
    
    unless product
      product = Product.find(params[:id])
      CACHE.set("product_#{params[:id]}", product)
    end
    
    render json: product
  end
end
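
A separate get followed by set leaves a window in which two request threads can both miss and both load the same record. One possible way to close that window is a fetch method that checks and fills under a single lock acquisition. TinyCache and fetch are hypothetical names for this sketch, not part of the ApplicationCache class above.

```ruby
require 'monitor'

class TinyCache
  include MonitorMixin

  def initialize
    super()
    @store = {}
  end

  # Returns the cached value, or runs the block once and stores its result.
  def fetch(key)
    synchronize do
      return @store[key] if @store.key?(key)
      @store[key] = yield
    end
  end
end

cache = TinyCache.new
loads = 0

threads = 5.times.map do
  Thread.new do
    cache.fetch(:product) do
      loads += 1 # Runs inside the lock, so the count is safe
      "widget"
    end
  end
end
results = threads.map(&:value)

puts loads        # 1
puts results.uniq # widget
```

The trade-off is that the loader block runs while the lock is held, so a slow load blocks every other cache user. Whether that is acceptable depends on how expensive duplicate loads are compared with the added contention.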

Background job systems can use Monitor to coordinate work distribution among multiple worker threads within a single process, ensuring each job is handed to only one worker at a time and handling failure scenarios gracefully.

require 'monitor'
require 'securerandom' # Provides SecureRandom.uuid for job IDs

class JobQueue
  include MonitorMixin
  
  def initialize
    super()
    @pending = []
    @processing = {}
    @failed = []
    @not_empty = new_cond
    @shutdown = false
    @worker_threads = []
  end
  
  def start_workers(count)
    synchronize do
      count.times do |i|
        @worker_threads << Thread.new { worker_loop("worker-#{i}") }
      end
    end
  end
  
  def enqueue(job_data)
    job = {
      id: SecureRandom.uuid,
      data: job_data,
      created_at: Time.now,
      attempts: 0,
      max_attempts: 3
    }
    
    synchronize do
      @pending << job
      @not_empty.signal
    end
  end
  
  def shutdown
    synchronize do
      @shutdown = true
      @not_empty.broadcast
    end
    @worker_threads.each(&:join)
  end
  
  def stats
    synchronize do
      {
        pending: @pending.size,
        processing: @processing.size,
        failed: @failed.size,
        workers: @worker_threads.size
      }
    end
  end
  
  private
  
  def worker_loop(worker_id)
    loop do
      job = synchronize do
        while @pending.empty? && !@shutdown
          @not_empty.wait
        end
        
        break if @shutdown && @pending.empty?
        
        job = @pending.shift
        @processing[job[:id]] = {
          job: job,
          worker: worker_id,
          started_at: Time.now
        }
        job
      end
      
      break unless job
      
      begin
        process_job(job)
        
        synchronize do
          @processing.delete(job[:id])
        end
      rescue => e
        handle_job_failure(job, e)
      end
    end
  end
  
  def process_job(job)
    # Simulate job processing
    sleep(rand(0.1..0.5))
    
    # Simulate occasional failures
    raise "Random failure" if rand < 0.1
    
    puts "Processed job #{job[:id]}"
  end
  
  def handle_job_failure(job, error)
    synchronize do
      @processing.delete(job[:id])
      job[:attempts] += 1
      job[:last_error] = error.message
      
      if job[:attempts] < job[:max_attempts]
        @pending << job
        @not_empty.signal
      else
        @failed << job
        puts "Job #{job[:id]} failed permanently: #{error.message}"
      end
    end
  end
end

# Production usage
queue = JobQueue.new
queue.start_workers(4)

# Enqueue jobs from web requests
100.times { |i| queue.enqueue({action: "process_order", order_id: i}) }

# Monitor queue health
Thread.new do
  loop do
    puts queue.stats
    sleep(5)
  end
end

Database connection pooling uses Monitor to manage connection lifecycle, ensuring connections are properly allocated, returned, and maintained across multiple threads.

require 'monitor'

class ConnectionPool
  include MonitorMixin
  
  def initialize(size: 10, timeout: 5)
    super()
    @size = size
    @timeout = timeout
    @available = []
    @allocated = {}
    @created = 0
    @connection_available = new_cond
  end
  
  def with_connection
    connection = checkout
    begin
      yield connection
    ensure
      checkin(connection)
    end
  end
  
  def checkout
    synchronize do
      loop do
        if @available.any?
          conn = @available.pop
          @allocated[conn] = Thread.current
          return conn
        elsif @created < @size
          conn = create_connection
          @created += 1
          @allocated[conn] = Thread.current
          return conn
        else
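          # Wait for a connection to free up, bounded by @timeout seconds
          deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
          
          while @available.empty? && @created >= @size
            remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
            raise ConnectionTimeoutError if remaining <= 0
            @connection_available.wait(remaining)
          end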
        end
      end
    end
  end
  
  def checkin(connection)
    synchronize do
      @allocated.delete(connection)
      
      if connection.valid?
        @available << connection
        @connection_available.signal
      else
        @created -= 1
        connection.close
      end
    end
  end
  
  def pool_status
    synchronize do
      {
        size: @size,
        available: @available.size,
        allocated: @allocated.size,
        created: @created
      }
    end
  end
  
  private
  
  def create_connection
    # Simulate database connection creation
    connection = Object.new
    connection.define_singleton_method(:valid?) { true }
    connection.define_singleton_method(:close) { puts "Connection closed" }
    connection
  end
  
  class ConnectionTimeoutError < StandardError; end
end

# Usage in web application
DB_POOL = ConnectionPool.new(size: 20, timeout: 5)

def handle_request
  DB_POOL.with_connection do |conn|
    # Database operations
    puts "Using connection: #{conn.object_id}"
    sleep(0.1) # Simulate query time
  end
end

Reference

MonitorMixin Methods

| Method | Parameters | Returns | Description |
|---|---|---|---|
| synchronize(&block) | block (Proc) | Block's return value | Executes block with exclusive monitor access |
| new_cond | None | MonitorMixin::ConditionVariable | Creates condition variable bound to monitor |
| mon_try_enter | None | Boolean | Non-blocking lock acquisition attempt (try_enter on a Monitor instance) |
| mon_enter | None | nil | Explicit lock acquisition, use with mon_exit (enter on a Monitor instance) |
| mon_exit | None | nil | Explicit lock release, use with mon_enter (exit on a Monitor instance) |
| mon_locked? | None | Boolean | Returns true if any thread holds monitor |
| mon_owned? | None | Boolean | Returns true if current thread holds monitor |
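
The two predicates are easy to confuse, so a brief sketch may help. It assumes a standalone Monitor and records what the owning thread and a second thread each observe; the states hash is just a container for the example.

```ruby
require 'monitor'

lock = Monitor.new
states = {}

lock.synchronize do
  states[:owner_locked] = lock.mon_locked?
  states[:owner_owned]  = lock.mon_owned?

  # A second thread sees the monitor as locked, but not as its own.
  Thread.new do
    states[:other_locked] = lock.mon_locked?
    states[:other_owned]  = lock.mon_owned?
  end.join
end

states[:after_locked] = lock.mon_locked?

puts states
```

The helper thread only inspects the monitor and never tries to enter it, so joining it while holding the lock does not block.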

MonitorMixin::ConditionVariable Methods

| Method | Parameters | Returns | Description |
|---|---|---|---|
| wait(timeout = nil) | timeout (Numeric, optional) | Boolean | Releases the monitor, waits for a signal or the timeout, then reacquires the monitor |
| wait_while(&block) | block (Proc) | nil | Waits while block returns truthy value |
| wait_until(&block) | block (Proc) | nil | Waits until block returns truthy value |
| signal | None | self | Wakes one waiting thread |
| broadcast | None | self | Wakes all waiting threads |
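
When a wait needs an overall time limit, one approach is to track a deadline and pass only the remaining time to each wait call, so repeated wakeups cannot stretch the total. The helper wait_with_deadline below is a hypothetical function written for this sketch, not part of the monitor library, and it must be called while holding the monitor that created the condition variable.

```ruby
require 'monitor'

# Hypothetical helper: waits until the block is truthy or timeout seconds pass.
# Returns true if the condition was met, false on timeout.
def wait_with_deadline(cond, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until yield
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    return false if remaining <= 0
    cond.wait(remaining)
  end
  true
end

lock = Monitor.new
ready = lock.new_cond
flag = false

# Nobody sets the flag, so this gives up after roughly 0.05 seconds.
timed_out = lock.synchronize { wait_with_deadline(ready, 0.05) { flag } }

setter = Thread.new do
  lock.synchronize do
    flag = true
    ready.signal
  end
end

# This time another thread sets the flag and signals.
signalled = lock.synchronize { wait_with_deadline(ready, 2) { flag } }
setter.join

puts timed_out # false
puts signalled # true
```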

Monitor vs Mutex Comparison

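| Feature | Monitor | Mutex |
|---|---|---|
| Reentrant | Yes | No |
| Condition Variables | Built in (new_cond) | Via a separate Thread::ConditionVariable |
| Thread Ownership | Tracked (mon_owned?) | Tracked (owned?) |
| Deadlock Risk | Lower (no self-deadlock on re-entry) | Higher (recursive locking raises ThreadError) |
| Performance | Slower | Faster |
| Memory Usage | Higher | Lower |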

Exception Classes

| Exception | Inherits From | Description |
|---|---|---|
| ThreadError | StandardError | Raised on invalid thread operations, such as releasing a monitor or using its condition variables from a thread that does not own it |

Usage Patterns

| Pattern | Use Case | Implementation |
|---|---|---|
| Resource Pool | Limited resource management | synchronize with availability tracking |
| Producer-Consumer | Work queue coordination | Condition variables with wait/signal |
| Cache | Thread-safe data storage | synchronize around get/set operations |
| State Machine | Coordinated state transitions | Condition variables for state changes |

Performance Characteristics

| Operation | Complexity | Notes |
|---|---|---|
| Lock acquisition | O(1) | Constant time when uncontended |
| Reentrant lock | O(1) | Counter increment only |
| Condition wait | O(1) | Thread suspension |
| Condition signal | O(1) | Single thread wakeup |
| Condition broadcast | O(n) | Wakes all waiting threads |

Common Configuration Options

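| Setting | Type | Default | Description |
|---|---|---|---|
| Timeout | Numeric | nil | Argument to a condition variable's wait, in seconds (nil = forever) |
| Queue Depth | Not configurable | Unlimited | No built-in cap on pending condition waiters |
| Lock Recursion | Not configurable | Unlimited | No built-in cap on reentrant acquisitions |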

Thread Safety Guarantees

| Guarantee | Description |
|---|---|
| Mutual Exclusion | Only one thread executes synchronized code |
| Visibility | Changes made in synchronized blocks are visible to other threads |
| Atomicity | Synchronized operations appear atomic to other threads |
| Ordering | Operations within synchronized blocks maintain program order |