CrackedRuby logo

CrackedRuby

Queue and SizedQueue

Thread-safe queue implementations for concurrent Ruby applications using producer-consumer patterns.

Standard Library Data Structures
4.1.2

Overview

Ruby provides two thread-safe queue classes in the standard library: Queue and SizedQueue. Both classes implement FIFO (first-in, first-out) data structures that handle concurrent access from multiple threads without requiring explicit synchronization. The Queue class creates unbounded queues that grow dynamically as items are added, while SizedQueue creates bounded queues with a maximum capacity that blocks producers when the queue reaches its limit.

These classes live in the thread library and become available when required. The Queue class inherits from Thread::Queue, and SizedQueue inherits from Thread::SizedQueue. Both classes share the same basic interface for adding and removing items but differ in their capacity management and blocking behavior.

require 'thread'

# Unbounded queue - grows without limit
queue = Queue.new

# Bounded queue with maximum capacity of 5 items
sized_queue = SizedQueue.new(5)

# Basic operations work identically
queue.push("item 1")
sized_queue.push("item 1")

item = queue.pop
item = sized_queue.pop

The primary use case for these queues involves producer-consumer patterns where one or more threads generate work items while other threads process them. The thread-safe nature eliminates the need for manual synchronization with mutexes or other locking mechanisms. Ruby handles all internal synchronization automatically, making these classes the preferred choice for inter-thread communication.

require 'thread'

# Producer thread adds work items
producer = Thread.new do
  queue = Queue.new
  10.times { |i| queue.push("task_#{i}") }
  queue.close
end

# Consumer thread processes items
consumer = Thread.new do
  while (item = queue.pop)
    puts "Processing: #{item}"
  end
end

Basic Usage

The core operations for both Queue and SizedQueue involve adding items with push (or its aliases << and enq) and removing items with pop (or its alias deq). The push operation adds items to the rear of the queue, while pop removes and returns items from the front, maintaining FIFO ordering.

require 'thread'

queue = Queue.new

# Adding items (all equivalent)
queue.push("first")
queue << "second"
queue.enq("third")

# Removing items (all equivalent)
item1 = queue.pop      # => "first"
item2 = queue.deq      # => "second" 
item3 = queue.shift    # => "third"

The pop method blocks the calling thread when the queue is empty, waiting until another thread adds an item. This blocking behavior enables clean producer-consumer patterns without busy-waiting or polling. To avoid blocking, use pop with the non_block parameter set to true, which raises ThreadError instead of waiting.

require 'thread'

queue = Queue.new

# This will block indefinitely since queue is empty
# item = queue.pop

# Non-blocking pop raises exception when empty
begin
  item = queue.pop(true)  # non_block: true
rescue ThreadError
  puts "Queue is empty"
end

# Check queue state
puts queue.empty?    # => true
puts queue.length    # => 0
puts queue.size      # => 0 (alias for length)

SizedQueue introduces capacity management through its constructor parameter and additional blocking behavior. When a SizedQueue reaches its maximum capacity, push operations block until space becomes available through pop operations or queue closure.

require 'thread'

# Create queue with capacity of 2 items
sized_queue = SizedQueue.new(2)

# Fill to capacity
sized_queue.push("item1")
sized_queue.push("item2")

puts sized_queue.length  # => 2
puts sized_queue.max     # => 2

# This would block since queue is full
# sized_queue.push("item3")

# Non-blocking push raises exception when full
begin
  sized_queue.push("item3", true)  # non_block: true
rescue ThreadError
  puts "Queue is full"
end

# Make space and add item
item = sized_queue.pop   # => "item1"
sized_queue.push("item3") # Now succeeds

Both queue types support closing operations that prevent further additions while allowing existing items to be consumed. The close method stops accepting new items, and closed? reports the queue state. Attempting to push to a closed queue raises ClosedQueueError.

require 'thread'

queue = Queue.new
queue.push("existing item")
queue.close

puts queue.closed?       # => true
puts queue.empty?        # => false

item = queue.pop         # => "existing item" (still works)

begin
  queue.push("new item")
rescue ClosedQueueError
  puts "Cannot add to closed queue"
end

Thread Safety & Concurrency

Ruby implements both Queue and SizedQueue with internal mutexes and condition variables that handle all synchronization automatically. Multiple threads can call push and pop operations concurrently without requiring external synchronization. The implementation guarantees atomic operations and maintains FIFO ordering even under heavy concurrent access.

The blocking behavior of these queues provides natural flow control in producer-consumer scenarios. When consumers process items faster than producers create them, consumers wait on empty queues. When producers create items faster than consumers process them, the system behavior differs between queue types: Queue grows unboundedly while SizedQueue blocks producers at capacity.

require 'thread'

queue = Queue.new
completed = []
mutex = Mutex.new

# Multiple producer threads
producers = 3.times.map do |i|
  Thread.new do
    10.times do |j|
      queue.push("producer_#{i}_item_#{j}")
      sleep(0.01)  # Simulate work
    end
  end
end

# Multiple consumer threads
consumers = 2.times.map do |i|
  Thread.new do
    loop do
      item = queue.pop(true) rescue break  # Non-blocking
      mutex.synchronize { completed << item }
      sleep(0.02)  # Simulate processing
    end
  end
end

# Wait for producers to finish
producers.each(&:join)

# Give consumers time to finish remaining items
sleep(1)

puts "Completed #{completed.length} items"

SizedQueue provides backpressure mechanisms that prevent runaway memory consumption when producers outpace consumers. The blocking behavior of push operations creates a natural throttling effect, causing fast producers to wait for slow consumers to catch up.

require 'thread'

# Small queue creates backpressure
sized_queue = SizedQueue.new(3)
processing_times = []

# Fast producer
producer = Thread.new do
  start_time = Time.now
  20.times do |i|
    sized_queue.push("item_#{i}")
    processing_times << Time.now - start_time
  end
end

# Slow consumer
consumer = Thread.new do
  20.times do
    item = sized_queue.pop
    puts "Processing: #{item}"
    sleep(0.1)  # Simulate slow processing
  end
end

producer.join
consumer.join

# Producer was throttled by consumer speed
puts "Producer blocked #{processing_times.count { |t| t > 0.05 }} times"

Race conditions cannot occur with proper queue usage since Ruby handles all internal state management. However, external state accessed by queue consumers still requires appropriate synchronization. The queue operations themselves are atomic, but combining multiple queue operations or accessing shared variables requires additional coordination.

require 'thread'

# Safe: Queue operations are atomic
queue = Queue.new
counter = 0
mutex = Mutex.new

# Multiple threads incrementing counter
threads = 5.times.map do
  Thread.new do
    100.times do
      item = "work_item"
      queue.push(item)
      
      # External state needs synchronization
      mutex.synchronize { counter += 1 }
    end
  end
end

threads.each(&:join)
puts "Queue size: #{queue.size}"
puts "Counter: #{counter}"

Deadlock situations can occur when threads hold multiple resources or when queue operations interact with other synchronization primitives. Avoid holding locks while performing blocking queue operations, and maintain consistent ordering when acquiring multiple locks.

require 'thread'

# Potential deadlock scenario
queue1 = Queue.new
queue2 = Queue.new
mutex1 = Mutex.new
mutex2 = Mutex.new

# Thread A: acquires mutex1 then waits on queue1
thread_a = Thread.new do
  mutex1.synchronize do
    puts "Thread A has mutex1"
    item = queue1.pop  # Could block forever
    mutex2.synchronize { puts "Thread A has both mutexes" }
  end
end

# Thread B: acquires mutex2 then waits on queue2
thread_b = Thread.new do
  mutex2.synchronize do
    puts "Thread B has mutex2"
    item = queue2.pop  # Could block forever
    mutex1.synchronize { puts "Thread B has both mutexes" }
  end
end

# Better approach: avoid holding locks during blocking operations

Performance & Memory

Queue objects consume memory proportional to the number of stored items since they maintain an internal array that grows dynamically. Each queued item adds a reference to the internal storage, and memory usage increases linearly with queue depth. Ruby's garbage collector can reclaim memory from popped items, but the internal array may retain capacity for performance reasons.

SizedQueue provides memory usage bounds through capacity limits, making it suitable for scenarios where memory consumption must remain predictable. The fixed capacity prevents unbounded growth but may impact throughput when producers consistently exceed the queue's capacity and experience blocking delays.

require 'thread'
require 'benchmark'

# Memory usage comparison
def measure_memory_usage(queue, items)
  GC.start
  before = GC.stat[:total_allocated_objects]
  
  items.times { |i| queue.push("item_#{i}") }
  
  GC.start
  after = GC.stat[:total_allocated_objects]
  after - before
end

unbounded_queue = Queue.new
bounded_queue = SizedQueue.new(1000)

unbounded_memory = measure_memory_usage(unbounded_queue, 10_000)
bounded_memory = measure_memory_usage(bounded_queue, 1000)

puts "Unbounded queue memory: #{unbounded_memory} objects"
puts "Bounded queue memory: #{bounded_memory} objects"

Throughput performance depends on contention levels and the ratio of producers to consumers. Single producer/single consumer scenarios achieve maximum throughput since they minimize lock contention. Multiple producers or consumers introduce synchronization overhead that reduces overall throughput but enables parallel processing.

require 'thread'
require 'benchmark'

def benchmark_queue_operations(queue, producers, consumers, items_per_producer)
  start_time = Time.now
  
  # Create producer threads
  producer_threads = producers.times.map do |i|
    Thread.new do
      items_per_producer.times do |j|
        queue.push("producer_#{i}_item_#{j}")
      end
    end
  end
  
  # Create consumer threads
  processed = 0
  mutex = Mutex.new
  total_items = producers * items_per_producer
  
  consumer_threads = consumers.times.map do
    Thread.new do
      while processed < total_items
        begin
          item = queue.pop(true)
          mutex.synchronize { processed += 1 }
        rescue ThreadError
          sleep(0.001)  # Queue empty, brief pause
        end
      end
    end
  end
  
  # Wait for completion
  producer_threads.each(&:join)
  consumer_threads.each(&:join)
  
  end_time = Time.now
  total_items / (end_time - start_time)
end

# Test different configurations
configurations = [
  { producers: 1, consumers: 1 },
  { producers: 4, consumers: 1 },
  { producers: 1, consumers: 4 },
  { producers: 4, consumers: 4 }
]

queue = Queue.new
configurations.each do |config|
  throughput = benchmark_queue_operations(queue, config[:producers], config[:consumers], 10_000)
  puts "#{config[:producers]}P/#{config[:consumers]}C: #{throughput.round(2)} items/sec"
  queue.clear  # Reset for next test
end

Lock contention increases with the number of concurrent threads accessing the queue. Ruby's internal synchronization uses mutexes that serialize access to queue operations. Higher contention levels reduce throughput and increase latency variance as threads wait for lock acquisition.

require 'thread'

# Measure contention impact
def measure_contention_delay(queue, thread_count, operations_per_thread)
  delays = []
  mutex = Mutex.new
  
  threads = thread_count.times.map do
    Thread.new do
      operations_per_thread.times do
        start_time = Time.now
        queue.push("item")
        item = queue.pop
        delay = Time.now - start_time
        
        mutex.synchronize { delays << delay }
      end
    end
  end
  
  threads.each(&:join)
  delays
end

queue = Queue.new

[1, 2, 4, 8, 16].each do |thread_count|
  delays = measure_contention_delay(queue, thread_count, 1000)
  avg_delay = delays.sum / delays.length
  max_delay = delays.max
  
  puts "#{thread_count} threads: avg=#{(avg_delay * 1000).round(3)}ms, max=#{(max_delay * 1000).round(3)}ms"
end

Production Patterns

Background job processing represents the most common production use of Ruby queues. Web applications push work items onto queues while separate worker processes consume and execute jobs. This pattern decouples request handling from time-consuming operations, improving response times and system scalability.

require 'thread'
require 'json'

class JobQueue
  def initialize(max_size = 1000)
    @queue = SizedQueue.new(max_size)
    @workers = []
    @shutdown = false
  end
  
  def enqueue_job(job_type, payload)
    job = {
      id: SecureRandom.uuid,
      type: job_type,
      payload: payload,
      enqueued_at: Time.now
    }
    
    @queue.push(job)
    job[:id]
  end
  
  def start_workers(worker_count = 4)
    worker_count.times do |i|
      @workers << Thread.new do
        worker_loop(i)
      end
    end
  end
  
  def shutdown
    @shutdown = true
    @queue.close
    @workers.each(&:join)
  end
  
  private
  
  def worker_loop(worker_id)
    while !@shutdown
      begin
        job = @queue.pop(true)  # Non-blocking
        process_job(job, worker_id)
      rescue ThreadError
        # Queue empty, brief pause
        sleep(0.1) unless @shutdown
      rescue ClosedQueueError
        break  # Queue closed, worker should exit
      end
    end
  end
  
  def process_job(job, worker_id)
    start_time = Time.now
    puts "Worker #{worker_id} processing job #{job[:id]}"
    
    case job[:type]
    when 'email'
      send_email(job[:payload])
    when 'image_resize'
      resize_image(job[:payload])
    when 'report_generation'
      generate_report(job[:payload])
    else
      puts "Unknown job type: #{job[:type]}"
    end
    
    duration = Time.now - start_time
    puts "Job #{job[:id]} completed in #{duration.round(2)}s"
  rescue => e
    puts "Job #{job[:id]} failed: #{e.message}"
  end
  
  def send_email(payload)
    sleep(0.5)  # Simulate email sending
  end
  
  def resize_image(payload)
    sleep(1.0)  # Simulate image processing
  end
  
  def generate_report(payload)
    sleep(2.0)  # Simulate report generation
  end
end

# Usage in web application
job_queue = JobQueue.new(500)
job_queue.start_workers(8)

# Enqueue jobs from web requests
job_id = job_queue.enqueue_job('email', { to: 'user@example.com', subject: 'Welcome' })
job_id = job_queue.enqueue_job('image_resize', { path: '/uploads/image.jpg', size: '300x300' })

# Graceful shutdown
at_exit { job_queue.shutdown }

Batch processing systems use queues to coordinate work distribution across multiple processing nodes. Producer processes scan for work and populate queues while consumer processes execute batch operations. This pattern supports horizontal scaling by adding more consumer processes or nodes.

require 'thread'

class BatchProcessor
  def initialize(batch_size: 10, max_queue_size: 1000)
    @batch_size = batch_size
    @work_queue = SizedQueue.new(max_queue_size)
    @batch_queue = Queue.new
    @processors = []
    @batcher = nil
  end
  
  def add_work_item(item)
    @work_queue.push(item)
  end
  
  def start_processing(processor_count: 4)
    # Start batch creation thread
    @batcher = Thread.new { create_batches }
    
    # Start batch processor threads
    processor_count.times do |i|
      @processors << Thread.new { process_batches(i) }
    end
  end
  
  def shutdown
    @work_queue.close
    @batcher&.join
    @batch_queue.close
    @processors.each(&:join)
  end
  
  private
  
  def create_batches
    current_batch = []
    
    while !@work_queue.closed?
      begin
        item = @work_queue.pop(true)
        current_batch << item
        
        if current_batch.length >= @batch_size
          @batch_queue.push(current_batch)
          current_batch = []
        end
      rescue ThreadError
        # Work queue empty
        if current_batch.any?
          @batch_queue.push(current_batch)
          current_batch = []
        end
        sleep(0.1)
      rescue ClosedQueueError
        # Send final batch if any items remain
        @batch_queue.push(current_batch) if current_batch.any?
        break
      end
    end
    
    @batch_queue.close
  end
  
  def process_batches(processor_id)
    while !@batch_queue.closed?
      begin
        batch = @batch_queue.pop(true)
        process_batch(batch, processor_id)
      rescue ThreadError
        sleep(0.1)  # Batch queue empty
      rescue ClosedQueueError
        break
      end
    end
  end
  
  def process_batch(batch, processor_id)
    puts "Processor #{processor_id} handling batch of #{batch.length} items"
    
    # Simulate batch processing
    batch.each do |item|
      # Process individual item
      sleep(0.01)
    end
    
    puts "Processor #{processor_id} completed batch"
  end
end

# Usage
processor = BatchProcessor.new(batch_size: 20)
processor.start_processing(processor_count: 6)

# Add work items
1000.times do |i|
  processor.add_work_item("work_item_#{i}")
end

sleep(5)  # Let processing continue
processor.shutdown

Monitoring queue metrics provides visibility into system health and performance. Key metrics include queue depth, processing rates, and worker utilization. Applications should expose these metrics through monitoring endpoints or logging systems.

require 'thread'
require 'json'

class MonitoredQueue
  def initialize(max_size: 1000)
    @queue = SizedQueue.new(max_size)
    @metrics = {
      enqueued_total: 0,
      dequeued_total: 0,
      processing_time_total: 0,
      error_total: 0
    }
    @metrics_mutex = Mutex.new
  end
  
  def enqueue(item)
    @metrics_mutex.synchronize { @metrics[:enqueued_total] += 1 }
    @queue.push(item)
  end
  
  def dequeue_and_process(&block)
    item = @queue.pop
    start_time = Time.now
    
    begin
      result = block.call(item)
      processing_time = Time.now - start_time
      
      @metrics_mutex.synchronize do
        @metrics[:dequeued_total] += 1
        @metrics[:processing_time_total] += processing_time
      end
      
      result
    rescue => e
      @metrics_mutex.synchronize { @metrics[:error_total] += 1 }
      raise
    end
  end
  
  def metrics
    @metrics_mutex.synchronize do
      current_metrics = @metrics.dup
      current_metrics[:queue_depth] = @queue.length
      current_metrics[:queue_utilization] = @queue.length.to_f / @queue.max
      
      if current_metrics[:dequeued_total] > 0
        current_metrics[:avg_processing_time] = 
          current_metrics[:processing_time_total] / current_metrics[:dequeued_total]
      end
      
      current_metrics
    end
  end
  
  def metrics_json
    JSON.pretty_generate(metrics)
  end
end

# Usage with monitoring
monitored_queue = MonitoredQueue.new(max_size: 100)

# Producer thread
producer = Thread.new do
  200.times do |i|
    monitored_queue.enqueue("task_#{i}")
    sleep(0.01)
  end
end

# Consumer threads
consumers = 3.times.map do |i|
  Thread.new do
    loop do
      begin
        monitored_queue.dequeue_and_process do |item|
          # Simulate work with occasional errors
          sleep(0.05)
          raise "Processing error" if rand(100) < 5  # 5% error rate
          "processed_#{item}"
        end
      rescue => e
        puts "Error processing item: #{e.message}"
      end
    end
  end
end

# Metrics reporting thread
reporter = Thread.new do
  loop do
    sleep(2)
    puts "\n--- Queue Metrics ---"
    puts monitored_queue.metrics_json
  end
end

# Let system run
producer.join
sleep(10)
consumers.each(&:kill)
reporter.kill

puts "\nFinal Metrics:"
puts monitored_queue.metrics_json

Error Handling & Debugging

Queue operations can raise several exception types that applications must handle appropriately. ThreadError occurs during non-blocking operations when queues are empty (for pop) or full (for push in SizedQueue). ClosedQueueError occurs when attempting to push items onto closed queues. These exceptions represent normal operational conditions rather than error states.

require 'thread'

def safe_queue_operations
  queue = Queue.new
  sized_queue = SizedQueue.new(2)
  
  # Handle empty queue with non-blocking pop
  begin
    item = queue.pop(true)  # non_block: true
  rescue ThreadError => e
    puts "Queue empty: #{e.message}"
    item = nil
  end
  
  # Handle full sized queue with non-blocking push
  sized_queue.push("item1")
  sized_queue.push("item2")  # Now full
  
  begin
    sized_queue.push("item3", true)  # non_block: true
  rescue ThreadError => e
    puts "Queue full: #{e.message}"
  end
  
  # Handle closed queue operations
  queue.close
  
  begin
    queue.push("new_item")
  rescue ClosedQueueError => e
    puts "Queue closed: #{e.message}"
  end
  
  # Pop still works on closed queue until empty
  queue.push("existing") if queue.empty?  # This would raise ClosedQueueError
  
  # Safe way to check before operations
  unless queue.closed?
    queue.push("item")
  end
end

safe_queue_operations

Timeout scenarios occur when threads wait indefinitely on blocking queue operations. Ruby does not provide built-in timeout mechanisms for queue operations, so applications must implement timeout handling using separate threads or the Timeout module, though the latter approach can introduce complications with thread cleanup.

require 'thread'
require 'timeout'

class TimeoutQueue
  def initialize(queue)
    @queue = queue
  end
  
  def pop_with_timeout(timeout_seconds)
    result = nil
    exception = nil
    
    # Use a separate thread to avoid Timeout module issues
    worker = Thread.new do
      begin
        result = @queue.pop
      rescue => e
        exception = e
      end
    end
    
    if worker.join(timeout_seconds)
      # Thread completed within timeout
      raise exception if exception
      result
    else
      # Thread timed out
      worker.kill
      raise TimeoutError, "Queue pop operation timed out after #{timeout_seconds} seconds"
    end
  end
  
  def push_with_timeout(item, timeout_seconds)
    exception = nil
    
    worker = Thread.new do
      begin
        @queue.push(item)
      rescue => e
        exception = e
      end
    end
    
    unless worker.join(timeout_seconds)
      worker.kill
      raise TimeoutError, "Queue push operation timed out after #{timeout_seconds} seconds"
    end
    
    raise exception if exception
  end
end

# Usage example
queue = Queue.new
timeout_queue = TimeoutQueue.new(queue)

# This will timeout since queue is empty
begin
  item = timeout_queue.pop_with_timeout(2.0)
rescue TimeoutError => e
  puts "Operation timed out: #{e.message}"
end

# Add item and try again
queue.push("test_item")
item = timeout_queue.pop_with_timeout(2.0)  # Succeeds immediately
puts "Retrieved: #{item}"

Debugging queue-related issues requires understanding internal queue state and thread interactions. Common debugging approaches include logging queue operations, monitoring queue depth over time, and tracking thread states during blocking operations.

require 'thread'
require 'logger'

class DebuggingQueue
  def initialize(name, max_size: nil)
    @name = name
    @queue = max_size ? SizedQueue.new(max_size) : Queue.new
    @logger = Logger.new(STDOUT)
    @logger.level = Logger::DEBUG
    @operation_count = 0
    @mutex = Mutex.new
  end
  
  def push(item, non_block: false)
    operation_id = next_operation_id
    thread_id = Thread.current.object_id
    
    @logger.debug "#{@name}[#{operation_id}] Thread #{thread_id}: Attempting push of #{item.inspect}"
    @logger.debug "#{@name}[#{operation_id}] Queue state before push: size=#{@queue.size}, closed=#{@queue.closed?}"
    
    if @queue.is_a?(SizedQueue)
      @logger.debug "#{@name}[#{operation_id}] SizedQueue max capacity: #{@queue.max}"
    end
    
    start_time = Time.now
    
    begin
      @queue.push(item, non_block)
      duration = Time.now - start_time
      @logger.debug "#{@name}[#{operation_id}] Push completed in #{duration.round(4)}s"
    rescue => e
      duration = Time.now - start_time
      @logger.error "#{@name}[#{operation_id}] Push failed after #{duration.round(4)}s: #{e.class} - #{e.message}"
      raise
    end
  end
  
  def pop(non_block: false)
    operation_id = next_operation_id
    thread_id = Thread.current.object_id
    
    @logger.debug "#{@name}[#{operation_id}] Thread #{thread_id}: Attempting pop"
    @logger.debug "#{@name}[#{operation_id}] Queue state before pop: size=#{@queue.size}, closed=#{@queue.closed?}"
    
    start_time = Time.now
    
    begin
      item = @queue.pop(non_block)
      duration = Time.now - start_time
      @logger.debug "#{@name}[#{operation_id}] Pop completed in #{duration.round(4)}s, retrieved: #{item.inspect}"
      item
    rescue => e
      duration = Time.now - start_time
      @logger.error "#{@name}[#{operation_id}] Pop failed after #{duration.round(4)}s: #{e.class} - #{e.message}"
      raise
    end
  end
  
  def close
    @logger.info "#{@name}: Closing queue"
    @queue.close
  end
  
  def debug_state
    state = {
      size: @queue.size,
      closed: @queue.closed?,
      empty: @queue.empty?
    }
    
    if @queue.is_a?(SizedQueue)
      state[:max] = @queue.max
      state[:full] = @queue.size >= @queue.max
    end
    
    @logger.info "#{@name} current state: #{state}"
    state
  end
  
  private
  
  def next_operation_id
    @mutex.synchronize { @operation_count += 1 }
  end
end

# Debugging example with potential deadlock
debug_queue = DebuggingQueue.new("TestQueue", max_size: 2)

# Fill queue to capacity
debug_queue.push("item1")
debug_queue.push("item2")
debug_queue.debug_state

# This thread will block on push
blocker = Thread.new do
  debug_queue.push("item3")  # Blocks until space available
end

sleep(1)  # Let thread start and block

# Check thread state
puts "Blocker thread state: #{blocker.status}"  # Should be "sleep"

# Unblock by consuming item
item = debug_queue.pop
puts "Unblocked with: #{item}"

blocker.join
debug_queue.debug_state

Memory leaks can occur when queue references prevent garbage collection of queued objects. Applications should monitor memory usage and ensure that long-lived queues do not accumulate references to large objects unnecessarily.

require 'thread'

# Example of potential memory leak
def demonstrate_memory_leak
  queue = Queue.new
  
  # Adding large objects that won't be garbage collected
  1000.times do |i|
    large_object = Array.new(10_000) { |j| "data_#{i}_#{j}" }
    queue.push(large_object)
  end
  
  # Queue holds references, preventing GC
  puts "Queue size: #{queue.size}"
  puts "Memory before GC: #{GC.stat[:total_allocated_objects]}"
  
  GC.start
  puts "Memory after GC: #{GC.stat[:total_allocated_objects]}"
  
  # Clear queue to allow GC
  queue.clear
  GC.start
  puts "Memory after queue clear: #{GC.stat[:total_allocated_objects]}"
end

# Better approach: process items as they're added
def demonstrate_memory_efficiency
  queue = Queue.new
  processed_count = 0
  
  # Consumer thread processes items immediately
  consumer = Thread.new do
    while (item = queue.pop)
      # Process and release reference
      processed_count += 1
      item = nil  # Explicit cleanup
      
      GC.start if processed_count % 100 == 0
      break if processed_count >= 1000
    end
  end
  
  # Producer adds items
  1000.times do |i|
    large_object = Array.new(10_000) { |j| "data_#{i}_#{j}" }
    queue.push(large_object)
  end
  
  consumer.join
  puts "Processed #{processed_count} items with bounded memory usage"
end

demonstrate_memory_leak
demonstrate_memory_efficiency

Reference

Queue Class Methods

Method Parameters Returns Description
Queue.new None Queue Creates new unbounded queue

SizedQueue Class Methods

Method Parameters Returns Description
SizedQueue.new(max) max (Integer) SizedQueue Creates bounded queue with maximum capacity

Instance Methods (Both Classes)

Method Parameters Returns Description
#push(object, non_block=false) object (Any), non_block (Boolean) self Adds object to rear of queue
#<<(object) object (Any) self Alias for push (always blocking)
#enq(object, non_block=false) object (Any), non_block (Boolean) self Alias for push
#pop(non_block=false) non_block (Boolean) Object Removes and returns object from front
#deq(non_block=false) non_block (Boolean) Object Alias for pop
#shift(non_block=false) non_block (Boolean) Object Alias for pop
#length None Integer Returns current number of objects
#size None Integer Alias for length
#empty? None Boolean Returns true if queue contains no objects
#clear None self Removes all objects from queue
#close None self Prevents further additions to queue
#closed? None Boolean Returns true if queue is closed

SizedQueue-Specific Methods

Method Parameters Returns Description
#max None Integer Returns maximum queue capacity
#max=(number) number (Integer) Integer Sets maximum queue capacity

Exception Classes

Exception Inheritance Raised When
ThreadError StandardError Non-blocking operation cannot complete
ClosedQueueError StopIteration Push attempted on closed queue

Thread States During Queue Operations

Operation Queue State Thread State Behavior
push Not full, open Running Returns immediately
push Full (SizedQueue) Sleep Blocks until space available
push Closed Running Raises ClosedQueueError
push(non_block: true) Full Running Raises ThreadError
pop Not empty Running Returns immediately
pop Empty Sleep Blocks until item available
pop(non_block: true) Empty Running Raises ThreadError

Memory and Performance Characteristics

Aspect Queue SizedQueue
Memory growth Unbounded Bounded by max
Push throughput High (no capacity checks) Limited by capacity
Memory predictability Low High
Backpressure None Automatic
Suitable for High-throughput scenarios Memory-constrained environments

Thread Safety Guarantees

Operation Atomicity Ordering Synchronization
Single push Atomic FIFO maintained Internal mutex
Single pop Atomic FIFO maintained Internal mutex
Multiple operations Not atomic FIFO per operation Manual coordination required
State queries (length, empty?) Atomic snapshot Consistent with queue state Internal mutex