Overview
Ruby implements work queues through the Queue class and its variants namespaced under the Thread class. These classes provide First-In-First-Out (FIFO) data structures with built-in synchronization for multi-threaded applications.
Thread::Queue is an alias of Queue, providing the same functionality under an explicit Thread namespace. Thread::SizedQueue extends it with capacity limits, blocking producers when the queue reaches maximum size and automatically resuming them when space becomes available.
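The alias relationship is easy to verify directly; on recent CRuby versions both constants resolve to the same class object:
puts Queue.equal?(Thread::Queue) # => true (same class object)
puts Thread::SizedQueue.ancestors.include?(Thread::Queue) # => true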
Work queues excel at decoupling producers from consumers, enabling asynchronous processing patterns. Producers add work items without waiting for completion, while consumers retrieve and process items at their own pace. This separation allows applications to handle varying workloads efficiently.
# Queue and Thread::SizedQueue are built into core Ruby;
# require 'thread' is no longer necessary
queue = Queue.new
sized_queue = Thread::SizedQueue.new(10)

# Producer thread
producer = Thread.new do
  5.times { |i| queue.push("task_#{i}") }
  queue.close # signals consumers that no more items will arrive
end

# Consumer thread: pop returns nil once the closed queue drains
consumer = Thread.new do
  while (task = queue.pop)
    puts "Processing: #{task}"
  end
end

producer.join
consumer.join
The queue classes handle synchronization automatically, eliminating the need for manual mutex management. Multiple threads can safely push and pop items without race conditions. When queues are empty, consumer threads block until new items arrive, providing efficient waiting mechanisms without CPU spinning.
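A minimal sketch demonstrates the guarantee: ten threads push concurrently with no external lock, and every item arrives.
queue = Queue.new
threads = 10.times.map do |i|
  Thread.new do
    100.times { |j| queue.push("thread#{i}_item#{j}") }
  end
end
threads.each(&:join)
puts queue.size # => 1000; no items lost, no external mutex needed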
Ruby's work queues support any object type as queue items. Applications commonly store work units as hashes containing job data, method references, or custom objects representing tasks. The queue stores object references, so complex data structures flow through the processing pipeline without copying.
# Complex work items
work_queue = Queue.new
# Adding different types of work
work_queue << { type: :email, to: 'user@example.com', body: 'Hello' }
work_queue << { type: :image, path: '/tmp/image.jpg', resize: '100x100' }
work_queue << proc { puts "Lambda work item executed" }
Basic Usage
Queue operations center around push and pop methods, with several aliases providing familiar terminology. The push method (also available as << and enq) adds items to the queue tail. The pop method (also available as deq and shift) removes items from the queue head, maintaining FIFO order.
queue = Queue.new
# Adding items (all equivalent)
queue.push("first")
queue << "second"
queue.enq("third")
# Removing items (all equivalent)
item1 = queue.pop
item2 = queue.deq
item3 = queue.shift
p [item1, item2, item3]
# => ["first", "second", "third"]
The pop method blocks when queues are empty, causing threads to wait until new items arrive. This blocking behavior enables efficient consumer patterns without polling loops. Applications can override blocking with the non_block parameter, which raises ThreadError when queues are empty.
queue = Queue.new

# Blocking pop (waits indefinitely)
Thread.new do
  item = queue.pop
  puts "Got: #{item}"
end

# Non-blocking pop
begin
  item = queue.pop(true) # non_block = true
rescue ThreadError
  puts "Queue empty"
end
Thread::SizedQueue adds capacity management: the maximum size is set at construction and can be adjusted later through the max= method. When queues reach maximum size, push operations block until space becomes available. This backpressure mechanism prevents memory exhaustion when producers outpace consumers.
sized_queue = Thread::SizedQueue.new(3)

# Fill to capacity
3.times { |i| sized_queue.push("item_#{i}") }
puts sized_queue.size # => 3

# This push blocks until the queue has space
blocked_producer = Thread.new do
  sized_queue.push("blocked_item")
  puts "Finally added blocked item"
end

# Make space by consuming
consumed = sized_queue.pop
puts "Consumed: #{consumed}"
blocked_producer.join # "Finally added blocked item" prints once space exists
Queue inspection methods provide visibility into queue state. The size method returns current item count, empty? checks for zero items, and length acts as a size alias. These methods help applications monitor queue depth and make processing decisions.
queue = Queue.new
queue.push("one")
queue.push("two")
puts queue.size # => 2
puts queue.length # => 2 (alias)
puts queue.empty? # => false
queue.pop
queue.pop
puts queue.empty? # => true
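The related num_waiting method reports how many threads are currently blocked waiting on the queue, which helps when diagnosing stalled consumers. A small sketch (again, the sleep only gives the consumer time to block):
queue = Queue.new
consumer = Thread.new { queue.pop } # blocks on the empty queue
sleep(0.1) # illustrative pause so the consumer has time to block
puts queue.num_waiting # => 1
queue.push(:wake) # unblocks the consumer
consumer.join
puts queue.num_waiting # => 0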
The clear method removes all items from queues, providing reset functionality. This operation executes atomically, ensuring consistent state even when multiple threads access the queue simultaneously.
queue = Queue.new
10.times { |i| queue.push(i) }
puts queue.size # => 10
queue.clear
puts queue.size # => 0
puts queue.empty? # => true
Thread Safety & Concurrency
Ruby's Queue classes provide thread safety through internal synchronization. Multiple threads can simultaneously push and pop items without external locking. The implementation uses condition variables and mutexes internally, so each individual operation is free of race conditions; check-then-act sequences spanning multiple calls (such as empty? followed by pop) still require care.
Concurrent access patterns emerge naturally from queue thread safety. Producer threads add work items while consumer threads process them, creating pipeline architectures. Applications can scale by adding multiple consumer threads to handle increased workloads without modifying queue access patterns.
shared_queue = Queue.new
processed_count = 0
count_mutex = Mutex.new

# Multiple producers
producers = 3.times.map do |producer_id|
  Thread.new do
    10.times do |task_id|
      work_item = "producer_#{producer_id}_task_#{task_id}"
      shared_queue.push(work_item)
      puts "Produced: #{work_item}"
    end
  end
end

# Multiple consumers: blocking pop avoids busy-waiting, and a
# closed queue returns nil once drained, ending each loop
consumers = 2.times.map do |consumer_id|
  Thread.new do
    while (item = shared_queue.pop)
      sleep(0.1) # Simulate processing
      count_mutex.synchronize { processed_count += 1 }
      puts "Consumer #{consumer_id} processed: #{item}"
    end
  end
end

producers.each(&:join)
shared_queue.close # lets consumers drain the queue and exit
consumers.each(&:join)
puts "Total processed: #{processed_count}"
Thread::SizedQueue backpressure mechanisms create natural flow control in concurrent systems. When queues reach capacity, producer threads automatically block until consumers create space. This self-regulating behavior prevents memory exhaustion and maintains system stability under varying load conditions.
sized_queue = Thread::SizedQueue.new(5)

# Fast producer
producer = Thread.new do
  100.times do |i|
    sized_queue.push("heavy_task_#{i}")
    # push blocks automatically when the queue is full
  end
  sized_queue.close # safer than a shared "done" flag, which races with pop
end

# Slower consumer: pop returns nil once the closed queue drains
consumer = Thread.new do
  while (item = sized_queue.pop)
    sleep(0.05) # Simulate slower processing
    puts "Processed: #{item}"
  end
end

producer.join
consumer.join
Queue blocking semantics require careful shutdown planning in concurrent applications. Consumer threads blocked on empty queues won't exit naturally, requiring explicit signals or sentinel values. Applications commonly use poison pill patterns, pushing special termination objects to signal consumer shutdown.
SHUTDOWN_SIGNAL = :shutdown
work_queue = Queue.new

# Consumer with shutdown handling
consumer = Thread.new do
  while (item = work_queue.pop) != SHUTDOWN_SIGNAL
    puts "Processing: #{item}"
    sleep(0.1)
  end
  puts "Consumer shutting down"
end

# Producer with shutdown signal
producer = Thread.new do
  10.times { |i| work_queue.push("task_#{i}") }
  work_queue.push(SHUTDOWN_SIGNAL)
end

producer.join
consumer.join
Exception handling in concurrent queue processing requires thread-local considerations. Exceptions in consumer threads don't propagate to producer threads automatically. Applications must implement explicit error reporting mechanisms, often using separate error queues or shared error tracking structures.
work_queue = Queue.new
error_queue = Queue.new

consumer = Thread.new do
  while (item = work_queue.pop)
    begin
      # Simulate processing that might fail
      raise "Processing failed" if item.include?("error")
      puts "Successfully processed: #{item}"
    rescue => e
      error_info = { item: item, error: e.message, thread: Thread.current }
      error_queue.push(error_info)
    end
  end
  error_queue.close # no more errors once the work queue drains
end

# Error monitoring thread
error_monitor = Thread.new do
  while (error_info = error_queue.pop)
    puts "ERROR - Item: #{error_info[:item]}, Message: #{error_info[:error]}"
  end
end

work_queue.push("good_item")
work_queue.push("error_item")
work_queue.close
consumer.join
error_monitor.join
Production Patterns
Production work queue implementations typically integrate with job processing frameworks like Sidekiq, Resque, or DelayedJob. These systems add persistence, retry logic, and monitoring interfaces on top of their own worker pools. Applications often combine in-process queues for immediate work with such external systems for durable background jobs.
# Hybrid immediate/background processing
class JobProcessor
  def initialize
    @immediate_queue = Queue.new
    @background_jobs = []
    start_immediate_processor
  end

  def enqueue_immediate(job)
    @immediate_queue.push(job)
  end

  def enqueue_background(job_class, *args)
    # Integrate with Sidekiq or similar
    @background_jobs << { class: job_class, args: args, queued_at: Time.now }
  end

  private

  def start_immediate_processor
    Thread.new do
      while (job = @immediate_queue.pop)
        begin
          job.call
        rescue => e
          log_error(e, job)
        end
      end
    end
  end

  def log_error(error, job)
    puts "Job failed: #{error.message}"
  end
end

processor = JobProcessor.new
processor.enqueue_immediate(proc { puts "Immediate task" })
Monitoring and observability patterns center around queue depth tracking and processing rate measurement. Production systems typically expose metrics through monitoring endpoints, allowing operations teams to track queue backlogs and identify processing bottlenecks.
class MonitoredQueue
  def initialize(name, max_size = nil)
    @name = name
    @queue = max_size ? Thread::SizedQueue.new(max_size) : Queue.new
    @enqueued_count = 0
    @processed_count = 0
    @error_count = 0
    @mutex = Mutex.new
  end

  def push(item)
    @queue.push(item)
    @mutex.synchronize { @enqueued_count += 1 }
  end

  def pop
    item = @queue.pop
    @mutex.synchronize { @processed_count += 1 }
    item
  rescue
    @mutex.synchronize { @error_count += 1 }
    raise
  end

  def metrics
    @mutex.synchronize do
      {
        name: @name,
        current_size: @queue.size,
        enqueued_total: @enqueued_count,
        processed_total: @processed_count,
        error_total: @error_count,
        success_rate: @processed_count.to_f / [@enqueued_count, 1].max
      }
    end
  end
end

# Usage in web application
monitored_queue = MonitoredQueue.new("email_queue", 100)

# Metrics endpoint (Sinatra-style routing)
get '/metrics' do
  content_type 'application/json'
  monitored_queue.metrics.to_json
end
Graceful shutdown patterns ensure work completion before application termination. Production systems typically implement signal handlers that stop accepting new work while allowing existing items to complete processing.
class GracefulWorker
  def initialize(queue_size = 50)
    @queue = Thread::SizedQueue.new(queue_size)
    @workers = []
    @shutdown = false
    @mutex = Mutex.new
    setup_signal_handlers
    start_workers
  end

  def enqueue(work_item)
    return false if @shutdown
    @queue.push(work_item)
    true
  end

  def shutdown(timeout = 30)
    puts "Initiating graceful shutdown..."
    @mutex.synchronize { @shutdown = true }
    # Signal all workers to stop
    @workers.size.times { @queue.push(:shutdown) }
    # Wait for workers to complete
    @workers.each { |worker| worker.join(timeout) }
    puts "Shutdown complete"
  end

  private

  def setup_signal_handlers
    # Mutex#synchronize cannot run inside a trap handler, so hand
    # shutdown off to a fresh thread instead of calling it directly
    Signal.trap('TERM') { Thread.new { shutdown } }
    Signal.trap('INT') { Thread.new { shutdown } }
  end

  def start_workers(worker_count = 3)
    worker_count.times do |i|
      @workers << Thread.new do
        while (item = @queue.pop) != :shutdown
          process_item(item)
        end
        puts "Worker #{i} shut down"
      end
    end
  end

  def process_item(item)
    # Actual work processing
    sleep(0.1) # Simulate work
    puts "Processed: #{item}"
  end
end
Performance & Memory
Queue performance characteristics vary significantly based on usage patterns and queue types. Basic Queue objects excel at high-throughput scenarios with minimal overhead, while Thread::SizedQueue introduces additional synchronization costs due to capacity management. Memory usage grows linearly with queue depth, making monitoring essential for long-running applications.
Benchmarks reveal the cost differences between the queue types. Push operations execute in constant time regardless of queue size, while pop operations may block depending on queue state. Non-blocking operations avoid thread scheduling overhead but require exception handling.
require 'benchmark'

def benchmark_queue_operations
  queue = Queue.new
  # Capacity matches the item count so these single-threaded
  # pushes never block (a smaller limit would deadlock the benchmark)
  sized_queue = Thread::SizedQueue.new(1_000_000)

  Benchmark.bm(22) do |x|
    # Push performance
    x.report("Queue push (1M):") do
      1_000_000.times { |i| queue.push(i) }
    end
    x.report("SizedQueue push (1M):") do
      1_000_000.times { |i| sized_queue.push(i) }
    end

    # Pop performance
    x.report("Queue pop (1M):") do
      1_000_000.times { queue.pop }
    end
    x.report("SizedQueue pop (1M):") do
      1_000_000.times { sized_queue.pop }
    end
  end
end

benchmark_queue_operations
Memory profiling reveals queue storage overhead and garbage collection behavior. Queue objects maintain internal arrays that resize dynamically, causing memory allocation spikes during growth periods. Applications processing large objects should monitor total memory consumption, not just queue depth.
require 'objspace'

def analyze_queue_memory
  queue = Queue.new

  # Baseline memory
  GC.start
  initial_objects = ObjectSpace.count_objects[:TOTAL]

  # Add large objects
  1000.times do |i|
    large_object = {
      id: i,
      data: "x" * 1000, # 1KB string
      timestamp: Time.now,
      metadata: (1..100).to_a
    }
    queue.push(large_object)
  end

  # Memory after adding objects
  GC.start
  after_objects = ObjectSpace.count_objects[:TOTAL]

  puts "Objects before: #{initial_objects}"
  puts "Objects after: #{after_objects}"
  puts "Objects added: #{after_objects - initial_objects}"
  puts "Queue size: #{queue.size}"

  # Compare container overhead; note memsize_of is shallow and
  # excludes the items themselves
  array = []
  1000.times { |i| array << "item_#{i}" }

  queue_memory = ObjectSpace.memsize_of(queue)
  array_memory = ObjectSpace.memsize_of(array)

  puts "Queue memory: #{queue_memory} bytes"
  puts "Array memory: #{array_memory} bytes"
end

analyze_queue_memory
Scaling patterns address queue performance under high concurrency and large data volumes. A multiple-queue strategy distributes load across several queue instances, reducing contention between threads. Worker pool sizing also affects throughput, with optimal counts depending on workload characteristics and system resources.
class PerformantQueueSystem
  def initialize(queue_count: 4, workers_per_queue: 2)
    @queues = Array.new(queue_count) { Queue.new }
    @workers = []
    @stats = { processed: 0, errors: 0 }
    @stats_mutex = Mutex.new
    start_workers(workers_per_queue)
  end

  def enqueue(item)
    # Distribute items across queues using hash
    queue_index = item.hash % @queues.size
    @queues[queue_index].push(item)
  end

  def stats
    @stats_mutex.synchronize { @stats.dup }
  end

  private

  def start_workers(workers_per_queue)
    @queues.each_with_index do |queue, queue_index|
      workers_per_queue.times do |worker_index|
        @workers << Thread.new do
          worker_loop(queue, "#{queue_index}-#{worker_index}")
        end
      end
    end
  end

  def worker_loop(queue, worker_id)
    while (item = queue.pop)
      begin
        process_item(item)
        @stats_mutex.synchronize { @stats[:processed] += 1 }
      rescue => e
        @stats_mutex.synchronize { @stats[:errors] += 1 }
        puts "Worker #{worker_id} error: #{e.message}"
      end
    end
  end

  def process_item(item)
    # Simulate CPU-bound work
    result = 0
    1000.times { result += rand(100) }
    result
  end
end
# Performance testing
system = PerformantQueueSystem.new
start_time = Time.now

# Enqueue work items
10_000.times { |i| system.enqueue("task_#{i}") }

# Wait for processing (crude; a production test would poll stats)
sleep(5)
end_time = Time.now

stats = system.stats
puts "Processed: #{stats[:processed]} items"
puts "Errors: #{stats[:errors]} items"
puts "Rate: #{(stats[:processed] / (end_time - start_time)).round(1)} items/second"
Reference
Core Classes
Class | Description | Thread Safe | Capacity |
---|---|---|---|
Queue | Basic FIFO queue | Yes | Unlimited |
Thread::Queue | Alias for Queue | Yes | Unlimited |
Thread::SizedQueue | Capacity-limited queue | Yes | Configurable |
Queue Methods
Method | Parameters | Returns | Description |
---|---|---|---|
::new | None | Queue | Creates empty queue |
#push(obj) | obj (Any) | Queue | Adds object to queue tail |
#<<(obj) | obj (Any) | Queue | Alias for push |
#enq(obj) | obj (Any) | Queue | Alias for push |
#pop(non_block=false) | non_block (Boolean) | Object | Removes object from queue head |
#deq(non_block=false) | non_block (Boolean) | Object | Alias for pop |
#shift(non_block=false) | non_block (Boolean) | Object | Alias for pop |
#size | None | Integer | Returns current item count |
#length | None | Integer | Alias for size |
#empty? | None | Boolean | Returns true if queue has no items |
#clear | None | Queue | Removes all items |
#close | None | Queue | Closes the queue; pop returns nil once drained |
#closed? | None | Boolean | Returns true if the queue is closed |
#num_waiting | None | Integer | Returns count of waiting threads |
SizedQueue Methods
Method | Parameters | Returns | Description |
---|---|---|---|
::new(max) | max (Integer) | SizedQueue | Creates queue with capacity limit |
#max | None | Integer | Returns maximum capacity |
#max=(number) | number (Integer) | Integer | Sets maximum capacity |
#push(obj, non_block=false) | obj (Any), non_block (Boolean) | SizedQueue | Adds object, blocks if full |
#pop(non_block=false) | non_block (Boolean) | Object | Removes object from head |
Exception Types
Exception | Condition | Recovery Strategy |
---|---|---|
ThreadError | Non-blocking pop on empty queue | Check queue state or retry |
ThreadError | Non-blocking push on full SizedQueue | Wait or increase capacity |
ArgumentError | Invalid capacity value | Use positive integer |
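The full-queue ThreadError from the table above can be reproduced with a non-blocking push; a minimal sketch:
sq = Thread::SizedQueue.new(1)
sq.push(:first)
begin
  sq.push(:second, true) # non_block = true on a full queue
rescue ThreadError
  puts "SizedQueue full"
end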
Blocking Behavior
Operation | Empty Queue | Full SizedQueue | Non-blocking Mode |
---|---|---|---|
pop | Blocks indefinitely | N/A | Raises ThreadError |
push | Returns immediately | Blocks until space available | Raises ThreadError |
size | Returns 0 | Returns max capacity | Returns immediately |
Thread Coordination Patterns
Pattern | Implementation | Use Case |
---|---|---|
Producer-Consumer | Multiple threads push/pop | Workload distribution |
Pipeline | Chain of queues | Multi-stage processing |
Fan-out | One producer, multiple consumers | Parallel processing |
Fan-in | Multiple producers, one consumer | Result aggregation |
Poison Pill | Special sentinel object | Graceful shutdown |
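As a sketch of the Pipeline row, two queues chained so each stage consumes from one and feeds the next; closing a queue propagates shutdown downstream:
stage1 = Queue.new
stage2 = Queue.new

upcaser = Thread.new do
  while (raw = stage1.pop)
    stage2.push(raw.upcase) # stage 1: transform
  end
  stage2.close # propagate shutdown downstream
end

printer = Thread.new do
  while (item = stage2.pop)
    puts "Final: #{item}" # stage 2: output
  end
end

%w[alpha beta gamma].each { |word| stage1.push(word) }
stage1.close
[upcaser, printer].each(&:join)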
Performance Characteristics
Operation | Time Complexity | Memory Impact | Blocking Potential |
---|---|---|---|
push | O(1) | Increases with item size | SizedQueue only |
pop | O(1) | Decreases with consumption | Empty queues |
size | O(1) | None | Never |
clear | O(n) | Releases all items | Never |