Work Queues

Work Queues in Ruby provide thread-safe FIFO data structures for coordinating work between multiple threads through the Queue (Thread::Queue) and Thread::SizedQueue classes.


Overview

Ruby implements work queues through the Thread::Queue class (also available as top-level Queue) and its capacity-limited variant, Thread::SizedQueue. These classes provide First-In-First-Out (FIFO) data structures with built-in synchronization for multi-threaded applications.

Queue is an alias for Thread::Queue; the two constants refer to the same class. Thread::SizedQueue extends it with a capacity limit, blocking producers when the queue reaches maximum size and automatically resuming them when space becomes available.
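
The alias relationship is easy to confirm, since both constants reference the same class object (true in current CRuby):

# Queue and Thread::Queue are the same class
Queue.equal?(Thread::Queue)    # => true

# SizedQueue subclasses Queue, adding a capacity limit
Thread::SizedQueue.superclass  # => Thread::Queue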

Work queues excel at decoupling producers from consumers, enabling asynchronous processing patterns. Producers add work items without waiting for completion, while consumers retrieve and process items at their own pace. This separation allows applications to handle varying workloads efficiently.

# Queue is built into core Ruby; no require is needed

# Basic queue creation
queue = Queue.new
sized_queue = Thread::SizedQueue.new(10)

# Producer thread
producer = Thread.new do
  5.times { |i| queue.push("task_#{i}") }
  queue.close  # signal that no more work is coming
end

# Consumer thread
consumer = Thread.new do
  while task = queue.pop  # returns nil once the queue is closed and empty
    puts "Processing: #{task}"
  end
end

[producer, consumer].each(&:join)

The queue classes handle synchronization automatically, eliminating the need for manual mutex management. Multiple threads can safely push and pop items without race conditions. When queues are empty, consumer threads block until new items arrive, providing efficient waiting mechanisms without CPU spinning.

Ruby's work queues support any object type as queue items. Applications commonly store work units as hashes containing job data, method references, or custom objects representing tasks. The queue implementation handles object references, allowing complex data structures to flow through the processing pipeline.

# Complex work items
work_queue = Queue.new

# Adding different types of work
work_queue << { type: :email, to: 'user@example.com', body: 'Hello' }
work_queue << { type: :image, path: '/tmp/image.jpg', resize: '100x100' }
work_queue << proc { puts "Lambda work item executed" }
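
A consumer can branch on the shape of each work item. A minimal dispatch sketch (the handler logic here is illustrative):

# Drain the queue and dispatch on item type
until work_queue.empty?
  item = work_queue.pop
  case item
  when Proc
    item.call
  when Hash
    puts "Dispatching #{item[:type]} job"
  end
end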

Basic Usage

Queue operations center around push and pop methods, with several aliases providing familiar terminology. The push method (also available as << and enq) adds items to the queue tail. The pop method (also available as deq and shift) removes items from the queue head, maintaining FIFO order.

queue = Queue.new

# Adding items (all equivalent)
queue.push("first")
queue << "second" 
queue.enq("third")

# Removing items (all equivalent)  
item1 = queue.pop
item2 = queue.deq
item3 = queue.shift

p [item1, item2, item3]
# => ["first", "second", "third"]

The pop method blocks when queues are empty, causing threads to wait until new items arrive. This blocking behavior enables efficient consumer patterns without polling loops. Passing true as the non_block parameter overrides blocking: pop then raises ThreadError when the queue is empty.

queue = Queue.new

# Blocking pop (waits indefinitely)
Thread.new do
  item = queue.pop
  puts "Got: #{item}"
end

# Non-blocking pop  
begin
  item = queue.pop(true)  # non_block = true
rescue ThreadError
  puts "Queue empty"
end

Thread::SizedQueue adds capacity management: the limit is set at construction and can be adjusted later through the max= method. When a queue reaches maximum size, push operations block until space becomes available. This backpressure mechanism prevents memory exhaustion when producers outpace consumers.

sized_queue = Thread::SizedQueue.new(3)

# Fill to capacity
3.times { |i| sized_queue.push("item_#{i}") }
puts sized_queue.size # => 3

# This push blocks because the queue is full
blocked_producer = Thread.new do
  sized_queue.push("blocked_item")
  puts "Finally added blocked item"
end
sleep(0.1)  # give the producer time to block

# Make space by consuming
consumed = sized_queue.pop
puts "Consumed: #{consumed}"
blocked_producer.join
# "Finally added blocked item" prints after space is created

Queue inspection methods provide visibility into queue state. The size method returns current item count, empty? checks for zero items, and length acts as a size alias. These methods help applications monitor queue depth and make processing decisions.

queue = Queue.new
queue.push("one")
queue.push("two")

puts queue.size     # => 2
puts queue.length   # => 2 (alias)
puts queue.empty?   # => false

queue.pop
queue.pop
puts queue.empty?   # => true

The clear method removes all items from queues, providing reset functionality. This operation executes atomically, ensuring consistent state even when multiple threads access the queue simultaneously.

queue = Queue.new
10.times { |i| queue.push(i) }
puts queue.size  # => 10

queue.clear
puts queue.size  # => 0
puts queue.empty?  # => true

Thread Safety & Concurrency

Ruby's Queue classes provide complete thread safety through internal synchronization mechanisms. Multiple threads can simultaneously push and pop items without external synchronization requirements. The implementation uses condition variables and mutexes internally, handling all race condition scenarios automatically.

Concurrent access patterns emerge naturally from queue thread safety. Producer threads add work items while consumer threads process them, creating pipeline architectures. Applications can scale by adding multiple consumer threads to handle increased workloads without modifying queue access patterns.

shared_queue = Queue.new
processed_count = 0
count_mutex = Mutex.new

# Multiple producers
producers = 3.times.map do |producer_id|
  Thread.new do
    10.times do |task_id|
      work_item = "producer_#{producer_id}_task_#{task_id}"
      shared_queue.push(work_item)
      puts "Produced: #{work_item}"
    end
  end
end

# Multiple consumers
consumers = 2.times.map do |consumer_id|
  Thread.new do
    while item = shared_queue.pop  # nil once the queue is closed and drained
      sleep(0.1)  # Simulate processing
      count_mutex.synchronize { processed_count += 1 }
      puts "Consumer #{consumer_id} processed: #{item}"
    end
  end
end

producers.each(&:join)
shared_queue.close  # wakes blocked consumers after the queue drains
consumers.each(&:join)
puts "Total processed: #{processed_count}"

Thread::SizedQueue backpressure mechanisms create natural flow control in concurrent systems. When queues reach capacity, producer threads automatically block until consumers create space. This self-regulating behavior prevents memory exhaustion and maintains system stability under varying load conditions.

sized_queue = Thread::SizedQueue.new(5)

# Fast producer
producer = Thread.new do
  100.times do |i|
    sized_queue.push("heavy_task_#{i}")  # blocks automatically when the queue is full
  end
  sized_queue.close  # signal that production is finished
end

# Slower consumer
consumer = Thread.new do
  while item = sized_queue.pop  # returns nil once closed and drained
    sleep(0.05)  # Simulate slower processing
    puts "Processed: #{item}"
  end
end

producer.join
consumer.join

Queue blocking semantics require careful shutdown planning in concurrent applications. Consumer threads blocked on empty queues won't exit naturally, requiring explicit signals or sentinel values. Applications commonly use poison pill patterns, pushing special termination objects to signal consumer shutdown.

SHUTDOWN_SIGNAL = :shutdown
work_queue = Queue.new

# Consumer with shutdown handling
consumer = Thread.new do
  while (item = work_queue.pop) != SHUTDOWN_SIGNAL
    puts "Processing: #{item}"
    sleep(0.1)
  end
  puts "Consumer shutting down"
end

# Producer with shutdown signal
producer = Thread.new do
  10.times { |i| work_queue.push("task_#{i}") }
  work_queue.push(SHUTDOWN_SIGNAL)
end

producer.join
consumer.join
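
Since Ruby 2.3, Queue#close provides a built-in alternative to sentinel objects: once a queue is closed and drained, pop returns nil, so consumers exit their loops naturally. The same shutdown with close:

work_queue = Queue.new

consumer = Thread.new do
  while item = work_queue.pop  # nil once the queue is closed and empty
    puts "Processing: #{item}"
  end
  puts "Consumer shutting down"
end

10.times { |i| work_queue.push("task_#{i}") }
work_queue.close  # pushing after close raises ClosedQueueError

consumer.join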

Exception handling in concurrent queue processing requires thread-local considerations. Exceptions in consumer threads don't propagate to producer threads automatically. Applications must implement explicit error reporting mechanisms, often using separate error queues or shared error tracking structures.

work_queue = Queue.new
error_queue = Queue.new

consumer = Thread.new do
  while item = work_queue.pop
    begin
      # Simulate processing that might fail
      raise "Processing failed" if item.include?("error")
      puts "Successfully processed: #{item}"
    rescue => e
      error_info = { item: item, error: e.message, thread: Thread.current }
      error_queue.push(error_info)
    end
  end
end

# Error monitoring thread
error_monitor = Thread.new do
  while error_info = error_queue.pop
    puts "ERROR - Item: #{error_info[:item]}, Message: #{error_info[:error]}"
  end
end

Production Patterns

Production work queue implementations typically integrate with job processing frameworks like Sidekiq, Resque, or Delayed::Job. These systems persist jobs in external stores such as Redis or a database and layer retry logic and monitoring interfaces on top. Applications often combine in-process queues for immediate, non-durable processing with external systems for durable background jobs.

# Hybrid immediate/background processing
class JobProcessor
  def initialize
    @immediate_queue = Queue.new
    @background_jobs = []
    start_immediate_processor
  end
  
  def enqueue_immediate(job)
    @immediate_queue.push(job)
  end
  
  def enqueue_background(job_class, *args)
    # Integrate with Sidekiq or similar
    @background_jobs << { class: job_class, args: args, queued_at: Time.now }
  end
  
  private
  
  def start_immediate_processor
    Thread.new do
      while job = @immediate_queue.pop
        begin
          job.call
        rescue => e
          log_error(e, job)
        end
      end
    end
  end
  
  def log_error(error, job)
    puts "Job failed: #{error.message}"
  end
end

processor = JobProcessor.new
processor.enqueue_immediate(proc { puts "Immediate task" })

Monitoring and observability patterns center around queue depth tracking and processing rate measurement. Production systems typically expose metrics through monitoring endpoints, allowing operations teams to track queue backlogs and identify processing bottlenecks.

class MonitoredQueue
  def initialize(name, max_size = nil)
    @name = name
    @queue = max_size ? Thread::SizedQueue.new(max_size) : Queue.new
    @enqueued_count = 0
    @processed_count = 0
    @error_count = 0
    @mutex = Mutex.new
  end
  
  def push(item)
    @queue.push(item)
    @mutex.synchronize { @enqueued_count += 1 }
  end
  
  def pop
    item = @queue.pop
    @mutex.synchronize { @processed_count += 1 }
    item
  rescue => e
    @mutex.synchronize { @error_count += 1 }
    raise
  end
  
  def metrics
    @mutex.synchronize do
      {
        name: @name,
        current_size: @queue.size,
        enqueued_total: @enqueued_count,
        processed_total: @processed_count,
        error_total: @error_count,
        success_rate: @processed_count.to_f / [@enqueued_count, 1].max
      }
    end
  end
end

# Usage in web application
monitored_queue = MonitoredQueue.new("email_queue", 100)

# Metrics endpoint (Sinatra/Rails)
get '/metrics' do
  content_type 'application/json'
  monitored_queue.metrics.to_json
end

Graceful shutdown patterns ensure work completion before application termination. Production systems typically implement signal handlers that stop accepting new work while allowing existing items to complete processing.

class GracefulWorker
  def initialize(queue_size = 50)
    @queue = Thread::SizedQueue.new(queue_size)
    @workers = []
    @shutdown = false
    @mutex = Mutex.new
    
    setup_signal_handlers
    start_workers
  end
  
  def enqueue(work_item)
    return false if @shutdown
    @queue.push(work_item)
    true
  end
  
  def shutdown(timeout = 30)
    puts "Initiating graceful shutdown..."
    @mutex.synchronize { @shutdown = true }
    
    # Signal all workers to stop
    @workers.size.times { @queue.push(:shutdown) }
    
    # Wait for workers to complete
    @workers.each { |worker| worker.join(timeout) }
    
    puts "Shutdown complete"
  end
  
  private
  
  def setup_signal_handlers
    # Mutex#synchronize cannot run inside a trap handler, so the
    # trap delegates shutdown to a fresh thread
    Signal.trap('TERM') { Thread.new { shutdown } }
    Signal.trap('INT')  { Thread.new { shutdown } }
  end
  
  def start_workers(worker_count = 3)
    worker_count.times do |i|
      @workers << Thread.new do
        while (item = @queue.pop) != :shutdown
          process_item(item)
        end
        puts "Worker #{i} shut down"
      end
    end
  end
  
  def process_item(item)
    # Actual work processing
    sleep(0.1)  # Simulate work
    puts "Processed: #{item}"
  end
end
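
A usage sketch, driving the worker directly instead of through signals:

worker = GracefulWorker.new
20.times { |i| worker.enqueue("job_#{i}") }
worker.shutdown  # or send SIGTERM / SIGINT to the process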

Performance & Memory

Queue performance characteristics vary significantly based on usage patterns and queue types. Basic Queue objects excel at high-throughput scenarios with minimal overhead, while Thread::SizedQueue introduces additional synchronization costs due to capacity management. Memory usage grows linearly with queue depth, making monitoring essential for long-running applications.

Benchmarks highlight the cost differences between the queue variants. Push operations execute in amortized constant time regardless of queue size, while pop operations may block depending on queue state. Non-blocking operations avoid thread scheduling overhead but require exception handling.

require 'benchmark'

def benchmark_queue_operations
  queue = Queue.new
  # Capacity must cover every push; a smaller limit would block
  # this single-threaded benchmark forever
  sized_queue = Thread::SizedQueue.new(1_000_000)

  Benchmark.bm(22) do |x|
    # Push performance
    x.report("Queue push (1M):") do
      1_000_000.times { |i| queue.push(i) }
    end

    x.report("SizedQueue push (1M):") do
      1_000_000.times { |i| sized_queue.push(i) }
    end

    # Pop performance
    x.report("Queue pop (1M):") do
      1_000_000.times { queue.pop }
    end

    x.report("SizedQueue pop (1M):") do
      1_000_000.times { sized_queue.pop }
    end
  end
end

benchmark_queue_operations

Memory profiling reveals queue storage overhead and garbage collection behavior. Queue objects maintain internal arrays that resize dynamically, causing memory allocation spikes during growth periods. Applications processing large objects should monitor total memory consumption, not just queue depth.

require 'objspace'

def analyze_queue_memory
  queue = Queue.new
  
  # Baseline memory
  GC.start
  initial_objects = ObjectSpace.count_objects[:TOTAL]
  
  # Add large objects
  1000.times do |i|
    large_object = { 
      id: i, 
      data: "x" * 1000,  # 1KB string
      timestamp: Time.now,
      metadata: (1..100).to_a
    }
    queue.push(large_object)
  end
  
  # Memory after adding objects
  GC.start
  after_objects = ObjectSpace.count_objects[:TOTAL]
  
  puts "Objects before: #{initial_objects}"
  puts "Objects after: #{after_objects}"  
  puts "Objects added: #{after_objects - initial_objects}"
  puts "Queue size: #{queue.size}"
  
  # Compare shallow container overhead; memsize_of reports only the
  # container itself, not the objects its entries reference
  array = []
  1000.times { |i| array << "item_#{i}" }

  queue_memory = ObjectSpace.memsize_of(queue)
  array_memory = ObjectSpace.memsize_of(array)

  puts "Queue memory: #{queue_memory} bytes"
  puts "Array memory: #{array_memory} bytes"
end

analyze_queue_memory

Scaling patterns address queue performance under high concurrency and large data volumes. A multiple-queue strategy distributes load across several queue instances, reducing lock contention between threads. Worker pool sizing also affects throughput, with optimal counts depending on workload characteristics and system resources.

class PerformantQueueSystem
  def initialize(queue_count: 4, workers_per_queue: 2)
    @queues = Array.new(queue_count) { Queue.new }
    @workers = []
    @stats = { processed: 0, errors: 0 }
    @stats_mutex = Mutex.new
    
    start_workers(workers_per_queue)
  end
  
  def enqueue(item)
    # Distribute items across queues using hash
    queue_index = item.hash % @queues.size
    @queues[queue_index].push(item)
  end
  
  def stats
    @stats_mutex.synchronize { @stats.dup }
  end
  
  private
  
  def start_workers(workers_per_queue)
    @queues.each_with_index do |queue, queue_index|
      workers_per_queue.times do |worker_index|
        @workers << Thread.new do
          worker_loop(queue, "#{queue_index}-#{worker_index}")
        end
      end
    end
  end
  
  def worker_loop(queue, worker_id)
    while item = queue.pop
      begin
        process_item(item)
        @stats_mutex.synchronize { @stats[:processed] += 1 }
      rescue => e
        @stats_mutex.synchronize { @stats[:errors] += 1 }
        puts "Worker #{worker_id} error: #{e.message}"
      end
    end
  end
  
  def process_item(item)
    # Simulate CPU-bound work
    result = 0
    1000.times { result += rand(100) }
    result
  end
end

# Performance testing
system = PerformantQueueSystem.new
start_time = Time.now

# Enqueue work items
10_000.times { |i| system.enqueue("task_#{i}") }

# Wait for processing (crude; a production test would track completion)
sleep(5)
end_time = Time.now

stats = system.stats
puts "Processed: #{stats[:processed]} items"
puts "Errors: #{stats[:errors]} items"
puts "Rate: #{stats[:processed] / (end_time - start_time)} items/second"

Reference

Core Classes

| Class | Description | Thread Safe | Capacity |
| ----- | ----------- | ----------- | -------- |
| Queue | Basic FIFO queue | Yes | Unlimited |
| Thread::Queue | Canonical name; Queue is its alias | Yes | Unlimited |
| Thread::SizedQueue | Capacity-limited queue | Yes | Configurable |

Queue Methods

| Method | Parameters | Returns | Description |
| ------ | ---------- | ------- | ----------- |
| ::new | None | Queue | Creates an empty queue |
| #push(obj) | obj (Any) | Queue | Adds object to the queue tail |
| #<<(obj) | obj (Any) | Queue | Alias for push |
| #enq(obj) | obj (Any) | Queue | Alias for push |
| #pop(non_block = false) | non_block (Boolean) | Object | Removes object from the queue head |
| #deq(non_block = false) | non_block (Boolean) | Object | Alias for pop |
| #shift(non_block = false) | non_block (Boolean) | Object | Alias for pop |
| #size | None | Integer | Returns current item count |
| #length | None | Integer | Alias for size |
| #empty? | None | Boolean | Returns true if the queue has no items |
| #clear | None | Queue | Removes all items |
| #num_waiting | None | Integer | Returns count of threads waiting on the queue |
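
num_waiting counts the threads currently blocked on the queue, which helps spot idle consumers:

queue = Queue.new
2.times { Thread.new { queue.pop } }
sleep(0.1)  # let both threads block

puts queue.num_waiting  # => 2
queue.close             # wakes both threads; each pop returns nil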

SizedQueue Methods

| Method | Parameters | Returns | Description |
| ------ | ---------- | ------- | ----------- |
| ::new(max) | max (Integer) | SizedQueue | Creates a queue with a capacity limit |
| #max | None | Integer | Returns maximum capacity |
| #max=(number) | number (Integer) | Integer | Sets maximum capacity |
| #push(obj, non_block = false) | obj (Any), non_block (Boolean) | SizedQueue | Adds object, blocks if full |
| #pop(non_block = false) | non_block (Boolean) | Object | Removes object from the head |

Exception Types

| Exception | Condition | Recovery Strategy |
| --------- | --------- | ----------------- |
| ThreadError | Non-blocking pop on an empty queue | Check queue state or retry |
| ThreadError | Non-blocking push on a full SizedQueue | Wait or increase capacity |
| ClosedQueueError | push on a closed queue | Stop producing after close |
| ArgumentError | Invalid capacity value | Use a positive integer |

Blocking Behavior

| Operation | Empty Queue | Full SizedQueue | Non-blocking Mode |
| --------- | ----------- | --------------- | ----------------- |
| pop | Blocks indefinitely | N/A | Raises ThreadError |
| push | Returns immediately | Blocks until space is available | Raises ThreadError |
| size | Returns 0 | Returns the current count (equal to max) | Returns immediately |
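
A compact demonstration of the non-blocking column in both directions:

sized_queue = Thread::SizedQueue.new(1)
sized_queue.push(:only_item)

begin
  sized_queue.push(:overflow, true)  # non_block = true on a full queue
rescue ThreadError
  puts "Queue full"
end

sized_queue.pop
begin
  sized_queue.pop(true)  # non_block = true on an empty queue
rescue ThreadError
  puts "Queue empty"
end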

Thread Coordination Patterns

| Pattern | Implementation | Use Case |
| ------- | -------------- | -------- |
| Producer-Consumer | Multiple threads push/pop | Workload distribution |
| Pipeline | Chain of queues | Multi-stage processing |
| Fan-out | One producer, multiple consumers | Parallel processing |
| Fan-in | Multiple producers, one consumer | Result aggregation |
| Poison Pill | Special sentinel object | Graceful shutdown |
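
The pipeline pattern chains queues so that one stage's consumer acts as the next stage's producer. A two-stage sketch:

stage1 = Queue.new
stage2 = Queue.new

# Stage 1: transform raw items and feed stage 2
transformer = Thread.new do
  while item = stage1.pop
    stage2.push(item.upcase)
  end
  stage2.close
end

# Stage 2: final processing
printer = Thread.new do
  while item = stage2.pop
    puts "Finished: #{item}"
  end
end

%w[alpha beta gamma].each { |w| stage1.push(w) }
stage1.close

[transformer, printer].each(&:join)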

Performance Characteristics

| Operation | Time Complexity | Memory Impact | Blocking Potential |
| --------- | --------------- | ------------- | ------------------ |
| push | O(1) | Grows with item count and size | SizedQueue only |
| pop | O(1) | Shrinks as items are consumed | Empty queues |
| size | O(1) | None | Never |
| clear | O(n) | Releases all item references | Never |