Overview
Monitor provides a reentrant mutual exclusion mechanism for Ruby threads. Unlike a basic Mutex, a Monitor allows the same thread to acquire the lock multiple times without deadlocking. Ruby ships Monitor in the monitor standard library, which provides both a standalone Monitor class and the MonitorMixin module for adding synchronization to other objects.
Internally, a Monitor pairs a mutex with a record of the owning thread and a count of how many times that thread has acquired the lock; the lock is released only when that count returns to zero. This reentrant behavior makes Monitor safer for scenarios where methods may call other synchronized methods on the same object.
require 'monitor'
class Counter
include MonitorMixin
def initialize
super
@value = 0
end
def increment
synchronize do
@value += 1
end
end
def value
synchronize { @value }
end
end
counter = Counter.new
counter.increment
puts counter.value # => 1
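The difference between a reentrant and a non-reentrant lock can be seen by nesting acquisitions on the same thread. The following is a minimal sketch using a standalone Monitor object and a plain Mutex; the variable names are illustrative only.

```ruby
require 'monitor'

mutex = Mutex.new
monitor = Monitor.new

# A Mutex refuses a second lock from the thread that already owns it.
mutex_result =
  begin
    mutex.synchronize { mutex.synchronize { :nested_ok } }
  rescue ThreadError => e
    "ThreadError: #{e.message}"
  end

# A Monitor counts nested acquisitions by the owning thread instead.
monitor_result = monitor.synchronize { monitor.synchronize { :nested_ok } }

puts mutex_result
puts monitor_result.inspect # => :nested_ok
```

The outer Mutex#synchronize still releases the lock when the ThreadError propagates, so the mutex is usable again afterwards.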
Monitor supports condition variables through the wait and signal methods, enabling threads to coordinate based on specific conditions. The new_cond method creates condition variables associated with the monitor.
require 'monitor'
class Buffer
include MonitorMixin
def initialize(max_size)
super()
@items = []
@max_size = max_size
@not_full = new_cond
@not_empty = new_cond
end
def put(item)
synchronize do
while @items.size >= @max_size
@not_full.wait
end
@items << item
@not_empty.signal
end
end
def get
synchronize do
while @items.empty?
@not_empty.wait
end
item = @items.shift
@not_full.signal
item
end
end
end
Ruby provides two primary ways to use Monitor: mixing MonitorMixin into classes or extending existing objects with Monitor functionality. The mixin approach integrates synchronization directly into class definitions, while object extension adds synchronization to existing instances.
Basic Usage
MonitorMixin provides the core synchronization interface through the synchronize method, which accepts a block and ensures exclusive access while executing that block. The including class gains access to synchronization primitives without explicitly managing lock acquisition and release.
require 'monitor'
class BankAccount
include MonitorMixin
def initialize(balance)
super()
@balance = balance
end
def withdraw(amount)
synchronize do
raise "Insufficient funds" if @balance < amount
sleep(0.01) # Simulate processing time
@balance -= amount
end
end
def deposit(amount)
synchronize do
@balance += amount
end
end
def balance
synchronize { @balance }
end
end
account = BankAccount.new(100)
account.deposit(50)
account.withdraw(30)
puts account.balance # => 120
The extend approach adds Monitor functionality to existing objects without modifying their class definition. This proves useful when working with objects from external libraries or when synchronization requirements emerge after object creation.
require 'monitor'
hash = {}
hash.extend(MonitorMixin)
# Thread-safe hash operations
hash.synchronize do
hash[:key1] = "value1"
hash[:key2] = "value2"
end
result = hash.synchronize { hash.keys.sort }
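Extending an object only adds synchronize and related methods; ordinary calls such as hash[:key] stay unsynchronized unless they are wrapped in a synchronize block. As a rough sketch (the counts hash and the thread count are made up for illustration), several threads can update one extended hash safely as long as every read-modify-write goes through synchronize:

```ruby
require 'monitor'

counts = Hash.new(0)
counts.extend(MonitorMixin)

threads = 4.times.map do
  Thread.new do
    # Each read-modify-write happens inside the hash's own monitor.
    250.times { counts.synchronize { counts[:hits] += 1 } }
  end
end
threads.each(&:join)

total = counts.synchronize { counts[:hits] }
puts total # => 1000
```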
Monitor supports nested synchronization calls within the same thread without causing deadlocks. This reentrant behavior allows synchronized methods to call other synchronized methods safely.
class SafeCounter
include MonitorMixin
def initialize
super()
@count = 0
end
def increment
synchronize do
@count += 1
log_change # Calls another synchronized method
end
end
def log_change
synchronize do # Safe - same thread already holds the lock
puts "Count changed to #{@count}"
end
end
def count
synchronize { @count }
end
end
Condition variables coordinate thread communication by allowing threads to wait for specific conditions and signal when those conditions change. The new_cond method creates condition variables bound to the monitor.
class WorkQueue
include MonitorMixin
def initialize
super()
@queue = []
@workers_available = new_cond
end
def add_work(item)
synchronize do
@queue << item
@workers_available.signal
end
end
def get_work
synchronize do
while @queue.empty?
@workers_available.wait
end
@queue.shift
end
end
end
Thread Safety & Concurrency
Monitor provides thread-safe access to shared data structures through its reentrant locking mechanism. The synchronization occurs at the object level, meaning multiple threads can safely access different Monitor-protected objects concurrently while ensuring exclusive access within each object.
require 'monitor'
class ThreadSafeArray
include MonitorMixin
def initialize
super()
@array = []
end
def <<(item)
synchronize do
@array << item
end
end
def pop
synchronize do
@array.pop
end
end
def size
synchronize { @array.size }
end
def each
# Create a copy to avoid holding the lock during iteration
items = synchronize { @array.dup }
items.each { |item| yield item }
end
end
# Multiple threads can safely modify the same array
array = ThreadSafeArray.new
threads = 10.times.map do |i|
Thread.new do
100.times { array << i }
end
end
threads.each(&:join)
puts array.size # => 1000
Condition variables enable complex synchronization patterns where threads must wait for specific states. The wait method releases the monitor lock and suspends the thread until another thread calls signal or broadcast on the same condition variable; the lock is reacquired before wait returns.
class ProducerConsumer
include MonitorMixin
def initialize(capacity)
super()
@buffer = []
@capacity = capacity
@not_full = new_cond
@not_empty = new_cond
@finished = false
end
def produce(item)
synchronize do
while @buffer.size >= @capacity && !@finished
@not_full.wait
end
return if @finished
@buffer << item
@not_empty.signal
end
end
def consume
synchronize do
while @buffer.empty? && !@finished
@not_empty.wait
end
if @buffer.empty?
nil # No more items
else
item = @buffer.shift
@not_full.signal
item
end
end
end
def finish
synchronize do
@finished = true
@not_full.broadcast
@not_empty.broadcast
end
end
end
pc = ProducerConsumer.new(5)
producer = Thread.new do
10.times { |i| pc.produce("item-#{i}") }
pc.finish
end
consumer = Thread.new do
while (item = pc.consume)
puts "Consumed: #{item}"
end
end
[producer, consumer].each(&:join)
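That wait gives up the lock while the thread is suspended can be observed directly. In this sketch (all names are hypothetical), the main thread can only enter the monitor because the waiting thread has released it inside wait_until; the events array records the order in which things happen.

```ruby
require 'monitor'

lock = Monitor.new
ready = lock.new_cond
flag = false
events = []
entered = Queue.new # handshake so main knows the waiter is inside the monitor

waiter = Thread.new do
  lock.synchronize do
    events << :waiter_entered
    entered << true
    ready.wait_until { flag } # releases the lock while suspended
    events << :waiter_resumed
  end
end

entered.pop # block until the waiter holds the monitor
lock.synchronize do
  # Reaching this point means the waiter released the lock inside wait.
  events << :signaller_entered
  flag = true
  ready.signal
end
waiter.join

p events # => [:waiter_entered, :signaller_entered, :waiter_resumed]
```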
Monitor makes repeated acquisition by the same thread safe, but it does not order acquisitions across different monitors, so developers must still consider potential deadlock scenarios when multiple monitors interact. Always acquire monitors in a consistent order across all threads to avoid circular dependencies.
# Safe: Consistent lock ordering
class TransferManager
def self.transfer(from_account, to_account, amount)
# Always acquire locks in account ID order
first, second = [from_account, to_account].sort_by(&:id)
first.synchronize do
second.synchronize do
from_account.withdraw(amount)
to_account.deposit(amount)
end
end
end
end
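The mon_try_enter method attempts non-blocking lock acquisition, returning false immediately if another thread holds the lock. This approach helps avoid deadlocks in scenarios where blocking would be problematic. Classes that include MonitorMixin use the mon_-prefixed names (mon_try_enter, mon_exit); a bare exit inside such a class would call Kernel#exit and terminate the program.
class NonBlockingCounter
include MonitorMixin
def initialize
super()
@count = 0
end
def increment_if_available
if mon_try_enter
begin
@count += 1
@count
ensure
mon_exit # Release the lock even if the increment raises
end
else
nil # Could not acquire lock
end
end
end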
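Non-blocking acquisition is easiest to observe with two threads. The sketch below uses a standalone Monitor object, where the methods are named try_enter and exit (classes that include MonitorMixin use mon_try_enter and mon_exit instead). The two queues are only a handshake device for this example.

```ruby
require 'monitor'

lock = Monitor.new
held = Queue.new
release = Queue.new

holder = Thread.new do
  lock.synchronize do
    held << true # tell the main thread the lock is taken
    release.pop  # keep holding until the main thread says to let go
  end
end

held.pop
busy_attempt = lock.try_enter # another thread owns the lock right now
release << true
holder.join

free_attempt = lock.try_enter # the lock is free again
lock.exit if free_attempt

puts busy_attempt # => false
puts free_attempt # => true
```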
Performance & Memory
Monitor incurs performance overhead compared to unsynchronized access due to lock acquisition, release, and potential thread blocking. The overhead scales with lock contention: more threads competing for the same monitor means longer wait times and more context switching.
require 'monitor'
require 'benchmark'
class UnsafeCounter
def initialize
@count = 0
end
def increment
@count += 1
end
def count
@count
end
end
class SafeCounter
include MonitorMixin
def initialize
super()
@count = 0
end
def increment
synchronize { @count += 1 }
end
def count
synchronize { @count }
end
end
# Performance comparison
iterations = 100_000
unsafe_counter = UnsafeCounter.new
safe_counter = SafeCounter.new
Benchmark.bm(15) do |x|
x.report("Unsafe:") { iterations.times { unsafe_counter.increment } }
x.report("Safe:") { iterations.times { safe_counter.increment } }
end
# Illustrative output; absolute timings vary with hardware and Ruby version
# user system total real
# Unsafe: 0.010000 0.000000 0.010000 ( 0.008234)
# Safe: 0.050000 0.000000 0.050000 ( 0.048456)
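Memory usage increases with Monitor due to additional synchronization state stored per object. Each object that includes MonitorMixin carries extra instance variables for the lock state, owner thread, and recursion count (recent Ruby versions keep these in an internal Monitor object referenced from the instance).
require 'monitor'
require 'objspace' # Provides ObjectSpace.memsize_of
# Memory usage comparison (memsize_of counts only the object itself, not objects it references)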
class PlainObject
def initialize
@data = Array.new(1000) { rand(100) }
end
end
class MonitoredObject
include MonitorMixin
def initialize
super()
@data = Array.new(1000) { rand(100) }
end
end
# Check object memory footprint
def object_size(obj)
ObjectSpace.memsize_of(obj)
end
plain = PlainObject.new
monitored = MonitoredObject.new
puts "Plain object: #{object_size(plain)} bytes"
puts "Monitored object: #{object_size(monitored)} bytes"
# Monitored objects consume additional memory for synchronization
Lock contention severely impacts performance when many threads compete for the same monitor. Design systems to minimize shared state and reduce the duration of synchronized blocks.
# Poor performance: High contention
class HighContentionLogger
include MonitorMixin
def initialize
super()
@logs = []
end
def log(message)
synchronize do
# Expensive operation while holding lock
formatted_message = format_message(message)
@logs << formatted_message
write_to_file(formatted_message) # I/O while locked!
end
end
end
# Better performance: Minimize synchronized sections
class LowContentionLogger
include MonitorMixin
def initialize
super()
@logs = []
end
def log(message)
# Format outside synchronized block
formatted_message = format_message(message)
synchronize do
@logs << formatted_message
end
# I/O outside synchronized block
write_to_file(formatted_message)
end
end
Thread pools reduce the overhead of thread creation and destruction when using Monitor-protected resources. Reusing threads amortizes thread start-up cost across many tasks, while a monitor-protected task queue hands work to the waiting workers.
require 'monitor'
class ThreadPoolExample
include MonitorMixin
def initialize(pool_size)
super()
@tasks = []
@not_empty = new_cond
@shutdown = false
@workers = pool_size.times.map do
Thread.new { worker_loop }
end
end
def submit(&block)
synchronize do
@tasks << block
@not_empty.signal
end
end
def shutdown
synchronize do
@shutdown = true
@not_empty.broadcast
end
@workers.each(&:join)
end
private
def worker_loop
loop do
task = synchronize do
while @tasks.empty? && !@shutdown
@not_empty.wait
end
break if @shutdown && @tasks.empty?
@tasks.shift
end
break unless task
task.call
end
end
end
Common Pitfalls
Deadlocks occur when threads acquire multiple monitors in different orders, creating circular dependencies. Always establish a consistent lock ordering protocol across the application to prevent this scenario.
require 'monitor'
class Account
include MonitorMixin
attr_reader :id, :balance
def initialize(id, balance)
super()
@id = id
@balance = balance
end
def withdraw(amount)
synchronize do
raise "Insufficient funds" if @balance < amount
@balance -= amount
end
end
def deposit(amount)
synchronize { @balance += amount }
end
end
# DEADLOCK RISK: Inconsistent lock ordering
def risky_transfer(from, to, amount)
from.synchronize do
to.synchronize do # Different threads may acquire in different orders
from.withdraw(amount)
to.deposit(amount)
end
end
end
# SAFE: Consistent lock ordering
def safe_transfer(from, to, amount)
first, second = [from, to].sort_by(&:id)
first.synchronize do
second.synchronize do
from.withdraw(amount)
to.deposit(amount)
end
end
end
Exception handling within synchronized blocks can leave resources in inconsistent states. Always use proper cleanup mechanisms and consider the state implications of exceptions.
require 'set' # Set is not loaded automatically before Ruby 3.2
class ResourceManager
include MonitorMixin
def initialize
super()
@resources = []
@allocated = Set.new
end
# PROBLEMATIC: Exception can leave resource allocated
def unsafe_allocate
synchronize do
resource = @resources.pop
@allocated.add(resource)
# If this raises an exception, resource remains in @allocated
# but might not be properly initialized
resource.initialize_connection
resource
end
end
# SAFE: Exception handling preserves consistency
def safe_allocate
synchronize do
resource = @resources.pop
begin
resource.initialize_connection
@allocated.add(resource)
resource
rescue => e
@resources.push(resource) # Return to pool
raise
end
end
end
def release(resource)
synchronize do
@allocated.delete(resource)
resource.cleanup
@resources.push(resource)
end
end
end
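The lock itself is not the problem when an exception escapes: synchronize releases the monitor on the way out, so only the application state needs manual cleanup. A minimal sketch with a standalone Monitor (the error message is made up):

```ruby
require 'monitor'

lock = Monitor.new

begin
  lock.synchronize do
    raise ArgumentError, 'simulated failure inside the critical section'
  end
rescue ArgumentError => e
  error_message = e.message
end

# The monitor was released while the exception unwound the block.
locked_after_error = lock.mon_locked?
puts locked_after_error # => false
```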
Condition variable misuse leads to lost signals and indefinite waiting. Always check conditions in a loop rather than using simple if statements, as spurious wakeups can occur.
class BoundedQueue
include MonitorMixin
def initialize(capacity)
super()
@items = []
@capacity = capacity
@not_full = new_cond
@not_empty = new_cond
end
# WRONG: Single condition check
def unsafe_put(item)
synchronize do
if @items.size >= @capacity # Should be 'while'
@not_full.wait
end
@items << item
@not_empty.signal
end
end
# CORRECT: Loop-based condition checking
def safe_put(item)
synchronize do
while @items.size >= @capacity # Handles spurious wakeups
@not_full.wait
end
@items << item
@not_empty.signal
end
end
end
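The loop can also be written with the wait_while and wait_until helpers on the condition variable, which re-check the block after every wakeup. The class below is a hypothetical, cut-down queue written only to show the idiom; it is not a replacement for the BoundedQueue above.

```ruby
require 'monitor'

class TinyBoundedQueue
  include MonitorMixin

  def initialize(capacity)
    super()
    @items = []
    @capacity = capacity
    @not_full = new_cond
    @not_empty = new_cond
  end

  def put(item)
    synchronize do
      # Equivalent to: while full; @not_full.wait; end
      @not_full.wait_while { @items.size >= @capacity }
      @items << item
      @not_empty.signal
    end
  end

  def take
    synchronize do
      @not_empty.wait_until { !@items.empty? }
      item = @items.shift
      @not_full.signal
      item
    end
  end
end

queue = TinyBoundedQueue.new(2)
producer = Thread.new { 5.times { |i| queue.put(i) } }
received = Array.new(5) { queue.take }
producer.join

p received # => [0, 1, 2, 3, 4]
```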
Lock duration minimization prevents performance bottlenecks and reduces contention. Perform expensive operations outside synchronized blocks when possible.
class EmailQueue
include MonitorMixin
def initialize
super()
@queue = []
end
# POOR: Long-running operation holds lock
def slow_process
synchronize do
while @queue.any?
email = @queue.shift
send_email(email) # Network I/O while holding lock!
log_delivery(email) # File I/O while holding lock!
end
end
end
# BETTER: Minimize lock duration
def fast_process
loop do
email = synchronize do
@queue.shift # Quick operation
end
break unless email
# Expensive operations outside lock
send_email(email)
log_delivery(email)
end
end
end
Production Patterns
Web applications frequently use Monitor for thread-safe caching mechanisms, ensuring cache consistency across multiple request threads while maintaining good performance characteristics.
require 'monitor'
class ApplicationCache
include MonitorMixin
def initialize(max_size: 1000, ttl: 3600)
super()
@cache = {}
@access_times = {}
@max_size = max_size
@ttl = ttl
# Counters backing the hit_ratio reported by stats
@hits = 0
@misses = 0
end
def get(key)
synchronize do
cleanup_expired
if @cache.key?(key) && !expired?(key)
@hits += 1
@access_times[key] = Time.now
@cache[key]
else
@misses += 1
nil
end
end
end
def set(key, value)
synchronize do
cleanup_expired
evict_if_full
@cache[key] = value
@access_times[key] = Time.now
end
end
def stats
synchronize do
lookups = @hits + @misses
{
size: @cache.size,
max_size: @max_size,
hit_ratio: lookups.zero? ? 0.0 : @hits.to_f / lookups
}
end
end
private
def expired?(key)
Time.now - @access_times[key] > @ttl
end
def cleanup_expired
current_time = Time.now
@cache.keys.each do |key|
if current_time - @access_times[key] > @ttl
@cache.delete(key)
@access_times.delete(key)
end
end
end
def evict_if_full
return unless @cache.size >= @max_size
oldest_key = @access_times.min_by { |_, time| time }[0]
@cache.delete(oldest_key)
@access_times.delete(oldest_key)
end
end
# Usage in Rails controller
class ProductsController < ApplicationController
CACHE = ApplicationCache.new(max_size: 500)
def show
product = CACHE.get("product_#{params[:id]}")
unless product
product = Product.find(params[:id])
CACHE.set("product_#{params[:id]}", product)
end
render json: product
end
end
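In the controller above, the lookup and the store are two separate critical sections, so two requests that miss on the same key can both load the record. One way to close that gap is a get-or-compute method that runs inside a single synchronize block. The TinyCache class and its fetch method below are hypothetical and only sketch the idea:

```ruby
require 'monitor'

class TinyCache
  include MonitorMixin

  def initialize
    super()
    @store = {}
  end

  # Look up and populate in one critical section so that threads missing
  # on the same key do not each run the block.
  def fetch(key)
    synchronize do
      return @store[key] if @store.key?(key)
      @store[key] = yield
    end
  end
end

cache = TinyCache.new
loads = 0
threads = 8.times.map do
  Thread.new do
    cache.fetch(:product_1) do
      loads += 1 # runs under the cache's monitor
      'product'
    end
  end
end
values = threads.map(&:value)

puts loads # => 1
```

The trade-off is that the block runs while the lock is held, so a slow load delays every other cache user; whether that is acceptable depends on how expensive a duplicate load would be.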
Background job systems can use Monitor to coordinate work distribution among multiple worker threads within a process, so that each job is handed to only one worker at a time and failures are retried or recorded.
require 'monitor'
require 'securerandom' # Provides SecureRandom.uuid used in enqueue
class JobQueue
include MonitorMixin
def initialize
super()
@pending = []
@processing = {}
@failed = []
@not_empty = new_cond
@shutdown = false
@worker_threads = []
end
def start_workers(count)
synchronize do
count.times do |i|
@worker_threads << Thread.new { worker_loop("worker-#{i}") }
end
end
end
def enqueue(job_data)
job = {
id: SecureRandom.uuid,
data: job_data,
created_at: Time.now,
attempts: 0,
max_attempts: 3
}
synchronize do
@pending << job
@not_empty.signal
end
end
def shutdown
synchronize do
@shutdown = true
@not_empty.broadcast
end
@worker_threads.each(&:join)
end
def stats
synchronize do
{
pending: @pending.size,
processing: @processing.size,
failed: @failed.size,
workers: @worker_threads.size
}
end
end
private
def worker_loop(worker_id)
loop do
job = synchronize do
while @pending.empty? && !@shutdown
@not_empty.wait
end
break if @shutdown && @pending.empty?
job = @pending.shift
@processing[job[:id]] = {
job: job,
worker: worker_id,
started_at: Time.now
}
job
end
break unless job
begin
process_job(job)
synchronize do
@processing.delete(job[:id])
end
rescue => e
handle_job_failure(job, e)
end
end
end
def process_job(job)
# Simulate job processing
sleep(rand(0.1..0.5))
# Simulate occasional failures
raise "Random failure" if rand < 0.1
puts "Processed job #{job[:id]}"
end
def handle_job_failure(job, error)
synchronize do
@processing.delete(job[:id])
job[:attempts] += 1
job[:last_error] = error.message
if job[:attempts] < job[:max_attempts]
@pending << job
@not_empty.signal
else
@failed << job
puts "Job #{job[:id]} failed permanently: #{error.message}"
end
end
end
end
# Production usage
queue = JobQueue.new
queue.start_workers(4)
# Enqueue jobs from web requests
100.times { |i| queue.enqueue({action: "process_order", order_id: i}) }
# Monitor queue health
Thread.new do
loop do
puts queue.stats
sleep(5)
end
end
Database connection pooling uses Monitor to manage connection lifecycle, ensuring connections are properly allocated, returned, and maintained across multiple threads.
require 'monitor'
class ConnectionPool
include MonitorMixin
def initialize(size: 10, timeout: 5)
super()
@size = size
@timeout = timeout
@available = []
@allocated = {}
@created = 0
@connection_available = new_cond
end
def with_connection
connection = checkout
begin
yield connection
ensure
checkin(connection)
end
end
def checkout
synchronize do
loop do
if @available.any?
conn = @available.pop
@allocated[conn] = Thread.current
return conn
elsif @created < @size
conn = create_connection
@created += 1
@allocated[conn] = Thread.current
return conn
else
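# Wait for a connection, passing only the time remaining to each wait
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
while @available.empty? && @created >= @size
remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
raise ConnectionTimeoutError if remaining <= 0
@connection_available.wait(remaining)
end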
end
end
end
end
def checkin(connection)
synchronize do
@allocated.delete(connection)
if connection.valid?
@available << connection
else
@created -= 1
connection.close
end
# Wake one waiter either way: a connection or a creation slot is now free
@connection_available.signal
end
end
def pool_status
synchronize do
{
size: @size,
available: @available.size,
allocated: @allocated.size,
created: @created
}
end
end
private
def create_connection
# Simulate database connection creation
connection = Object.new
connection.define_singleton_method(:valid?) { true }
connection.define_singleton_method(:close) { puts "Connection closed" }
connection
end
class ConnectionTimeoutError < StandardError; end
end
# Usage in web application
DB_POOL = ConnectionPool.new(size: 20, timeout: 5)
def handle_request
DB_POOL.with_connection do |conn|
# Database operations
puts "Using connection: #{conn.object_id}"
sleep(0.1) # Simulate query time
end
end
Reference
MonitorMixin Methods
Method | Parameters | Returns | Description |
---|---|---|---|
synchronize(&block) | block (Proc) | Object | Executes block with exclusive monitor access (alias of mon_synchronize) |
new_cond | None | MonitorMixin::ConditionVariable | Creates condition variable bound to monitor |
mon_try_enter | None | Boolean | Non-blocking lock acquisition attempt (alias try_mon_enter) |
mon_enter | None | nil | Explicit lock acquisition (use with mon_exit) |
mon_exit | None | nil | Explicit lock release (use with mon_enter) |
mon_locked? | None | Boolean | Returns true if any thread holds monitor |
mon_owned? | None | Boolean | Returns true if current thread holds monitor |
The standalone Monitor class also offers the unprefixed names enter, exit, and try_enter.
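The two predicates differ in whose point of view they take. A short sketch with a standalone Monitor records what the owning thread, a second thread, and the main thread after release each observe (the states hash is just a scratchpad for this example):

```ruby
require 'monitor'

lock = Monitor.new
states = {}

lock.synchronize do
  states[:owner] = { locked: lock.mon_locked?, owned: lock.mon_owned? }
  # Another thread sees the monitor as locked, but does not own it.
  Thread.new do
    states[:other] = { locked: lock.mon_locked?, owned: lock.mon_owned? }
  end.join
end
states[:after] = { locked: lock.mon_locked?, owned: lock.mon_owned? }

p states[:owner] # => {locked: true, owned: true} (hash formatting varies by Ruby version)
p states[:other]
p states[:after]
```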
MonitorMixin::ConditionVariable Methods
Method | Parameters | Returns | Description |
---|---|---|---|
wait(timeout = nil) | timeout (Numeric, optional) | Boolean | Waits for condition signal or timeout |
wait_while(&block) | block (Proc) | nil | Waits while block returns truthy value |
wait_until(&block) | block (Proc) | nil | Waits until block returns truthy value |
signal | None | self | Wakes one waiting thread |
broadcast | None | self | Wakes all waiting threads |
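Because a timed wait can return for several reasons, a common pattern is to loop against a deadline and re-check the condition each time. The helper below, wait_with_deadline, is a hypothetical name introduced for this sketch; it must be called while holding the monitor that owns the condition variable.

```ruby
require 'monitor'

def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical helper: returns true once the block is truthy,
# or false if `timeout` seconds pass first.
def wait_with_deadline(cond, timeout)
  deadline = monotonic_now + timeout
  until yield
    remaining = deadline - monotonic_now
    return false if remaining <= 0
    cond.wait(remaining)
  end
  true
end

lock = Monitor.new
changed = lock.new_cond
ready = false

# Nobody sets `ready`, so this gives up after about 50 ms.
timed_out = lock.synchronize { wait_with_deadline(changed, 0.05) { ready } }

setter = Thread.new do
  lock.synchronize do
    ready = true
    changed.signal
  end
end
signalled = lock.synchronize { wait_with_deadline(changed, 2) { ready } }
setter.join

puts timed_out # => false
puts signalled # => true
```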
Monitor vs Mutex Comparison
Feature | Monitor | Mutex |
---|---|---|
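Reentrant | Yes | No (recursive locking raises ThreadError) |
Condition Variables | Built in via new_cond | Via a separate Thread::ConditionVariable |
Thread Ownership | Tracked (mon_owned?) | Tracked (owned?) |
Deadlock Risk | Lower (nested acquisition by the owner is allowed) | Higher (nested acquisition by the owner fails) |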
Performance | Slower | Faster |
Memory Usage | Higher | Lower |
Exception Classes
Exception | Inherits From | Description |
---|---|---|
ThreadError | StandardError | Raised when a thread that does not own the monitor tries to release it or to use one of its condition variables |
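Ownership violations can be reproduced in a few lines. This sketch uses a standalone Monitor and simply captures the exceptions for inspection:

```ruby
require 'monitor'

lock = Monitor.new
cond = lock.new_cond

exit_error =
  begin
    lock.exit # this thread never entered the monitor
    nil
  rescue ThreadError => e
    e
  end

signal_error =
  begin
    cond.signal # condition variables require holding the monitor
    nil
  rescue ThreadError => e
    e
  end

puts exit_error.class   # => ThreadError
puts signal_error.class # => ThreadError
```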
Usage Patterns
Pattern | Use Case | Implementation |
---|---|---|
Resource Pool | Limited resource management | synchronize with availability tracking |
Producer-Consumer | Work queue coordination | Condition variables with wait/signal |
Cache | Thread-safe data storage | synchronize around get/set operations |
State Machine | Coordinated state transitions | Condition variables for state changes |
Performance Characteristics
Operation | Complexity | Notes |
---|---|---|
Lock acquisition | O(1) | Constant time when uncontended |
Reentrant lock | O(1) | Counter increment only |
Condition wait | O(1) | Thread suspension |
Condition signal | O(1) | Single thread wakeup |
Condition broadcast | O(n) | Wakes all waiting threads |
Common Configuration Options
Setting | Type | Default | Description |
---|---|---|---|
Timeout | Numeric | nil | Wait timeout in seconds (nil = forever) |
Queue Depth | Integer | Unlimited | Maximum pending condition waiters |
Lock Recursion | Integer | Unlimited | Maximum reentrant acquisitions |
Thread Safety Guarantees
Guarantee | Description |
---|---|
Mutual Exclusion | Only one thread at a time executes code synchronized on a given monitor |
Visibility | Changes made in synchronized blocks are visible to other threads |
Atomicity | Synchronized operations appear atomic to other threads |
Ordering | Operations within synchronized blocks maintain program order |