Overview
Ruby provides two thread-safe queue classes: `Queue` and `SizedQueue`. Both implement FIFO (first-in, first-out) data structures that handle concurrent access from multiple threads without requiring explicit synchronization. The `Queue` class creates unbounded queues that grow dynamically as items are added, while `SizedQueue` creates bounded queues with a maximum capacity that block producers when the queue reaches its limit.

Historically these classes lived in the `thread` standard library; on modern Ruby they are built-in core classes, so no `require` is needed and the examples below omit it. `Queue` is an alias for `Thread::Queue`, `SizedQueue` is an alias for `Thread::SizedQueue`, and `SizedQueue` inherits from `Queue`. Both classes share the same basic interface for adding and removing items but differ in their capacity management and blocking behavior.
```ruby
# Queue and SizedQueue are core classes on modern Ruby; no require needed
puts Queue == Thread::Queue  # => true
puts SizedQueue.superclass   # => Thread::Queue

# Unbounded queue - grows without limit
queue = Queue.new

# Bounded queue with maximum capacity of 5 items
sized_queue = SizedQueue.new(5)

# Basic operations work identically
queue.push("item 1")
sized_queue.push("item 1")

item = queue.pop
item = sized_queue.pop
```
The primary use case for these queues involves producer-consumer patterns where one or more threads generate work items while other threads process them. The thread-safe nature eliminates the need for manual synchronization with mutexes or other locking mechanisms. Ruby handles all internal synchronization automatically, making these classes the preferred choice for inter-thread communication.
```ruby
queue = Queue.new

# Producer thread adds work items, then closes the queue
producer = Thread.new do
  10.times { |i| queue.push("task_#{i}") }
  queue.close
end

# Consumer thread processes items; pop returns nil once the
# queue is closed and drained, which ends the loop
consumer = Thread.new do
  while (item = queue.pop)
    puts "Processing: #{item}"
  end
end

[producer, consumer].each(&:join)
```
Basic Usage
The core operations for both `Queue` and `SizedQueue` involve adding items with `push` (or its aliases `<<` and `enq`) and removing items with `pop` (or its aliases `deq` and `shift`). The `push` operation adds items to the rear of the queue, while `pop` removes and returns items from the front, maintaining FIFO ordering.
```ruby
queue = Queue.new

# Adding items (all equivalent)
queue.push("first")
queue << "second"
queue.enq("third")

# Removing items (all equivalent)
item1 = queue.pop    # => "first"
item2 = queue.deq    # => "second"
item3 = queue.shift  # => "third"
```
The `pop` method blocks the calling thread when the queue is empty, waiting until another thread adds an item. This blocking behavior enables clean producer-consumer patterns without busy-waiting or polling. To avoid blocking, call `pop` with the `non_block` parameter set to `true`, which raises `ThreadError` instead of waiting.
```ruby
queue = Queue.new

# This would block indefinitely since the queue is empty
# item = queue.pop

# Non-blocking pop raises ThreadError when empty
begin
  item = queue.pop(true) # non_block = true
rescue ThreadError
  puts "Queue is empty"
end

# Check queue state
puts queue.empty?  # => true
puts queue.length  # => 0
puts queue.size    # => 0 (alias for length)
```
`SizedQueue` introduces capacity management through its constructor parameter and additional blocking behavior. When a `SizedQueue` reaches its maximum capacity, `push` operations block until space becomes available through `pop` operations or queue closure.
```ruby
# Create queue with capacity of 2 items
sized_queue = SizedQueue.new(2)

# Fill to capacity
sized_queue.push("item1")
sized_queue.push("item2")
puts sized_queue.length # => 2
puts sized_queue.max    # => 2

# This would block since the queue is full
# sized_queue.push("item3")

# Non-blocking push raises ThreadError when full
begin
  sized_queue.push("item3", true) # non_block = true
rescue ThreadError
  puts "Queue is full"
end

# Make space and add the item
item = sized_queue.pop    # => "item1"
sized_queue.push("item3") # Now succeeds
```
Both queue types support closing, which prevents further additions while allowing existing items to be consumed. The `close` method stops accepting new items, and `closed?` reports the queue state. Attempting to push to a closed queue raises `ClosedQueueError`.
```ruby
queue = Queue.new
queue.push("existing item")
queue.close

puts queue.closed? # => true
puts queue.empty?  # => false

item = queue.pop # => "existing item" (still works)

begin
  queue.push("new item")
rescue ClosedQueueError
  puts "Cannot add to closed queue"
end
```
Thread Safety & Concurrency
Ruby implements both `Queue` and `SizedQueue` with internal mutexes and condition variables that handle all synchronization automatically. Multiple threads can call `push` and `pop` concurrently without requiring external synchronization. The implementation guarantees atomic operations and maintains FIFO ordering even under heavy concurrent access.
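A minimal sketch of that guarantee: several threads push concurrently with no external locking, and every item still arrives exactly once.

```ruby
queue = Queue.new

# Eight threads push 1,000 items each with no external locks
threads = 8.times.map do
  Thread.new { 1_000.times { queue.push(:item) } }
end
threads.each(&:join)

puts queue.size # => 8000 -- no items lost or duplicated
```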
The blocking behavior of these queues provides natural flow control in producer-consumer scenarios. When consumers process items faster than producers create them, consumers wait on empty queues. When producers create items faster than consumers process them, the behavior differs between queue types: `Queue` grows without bound while `SizedQueue` blocks producers at capacity.
```ruby
queue = Queue.new
completed = []
mutex = Mutex.new

# Multiple producer threads
producers = 3.times.map do |i|
  Thread.new do
    10.times do |j|
      queue.push("producer_#{i}_item_#{j}")
      sleep(0.01) # Simulate work
    end
  end
end

# Multiple consumer threads: blocking pop returns nil
# once the queue is closed and drained
consumers = 2.times.map do
  Thread.new do
    while (item = queue.pop)
      mutex.synchronize { completed << item }
      sleep(0.02) # Simulate processing
    end
  end
end

producers.each(&:join)
queue.close # Lets consumers exit once remaining items drain
consumers.each(&:join)

puts "Completed #{completed.length} items"
```
`SizedQueue` provides backpressure that prevents runaway memory consumption when producers outpace consumers. The blocking behavior of `push` creates a natural throttling effect, causing fast producers to wait for slow consumers to catch up.
```ruby
# Small queue creates backpressure
sized_queue = SizedQueue.new(3)
push_durations = []

# Fast producer: time each push individually to detect blocking
producer = Thread.new do
  20.times do |i|
    started = Time.now
    sized_queue.push("item_#{i}") # Blocks while the queue is full
    push_durations << Time.now - started
  end
end

# Slow consumer
consumer = Thread.new do
  20.times do
    item = sized_queue.pop
    puts "Processing: #{item}"
    sleep(0.1) # Simulate slow processing
  end
end

producer.join
consumer.join

# Producer was throttled by consumer speed
puts "Producer blocked #{push_durations.count { |t| t > 0.05 }} times"
```
Race conditions cannot occur with proper queue usage since Ruby handles all internal state management. However, external state accessed by queue consumers still requires appropriate synchronization. The queue operations themselves are atomic, but combining multiple queue operations or accessing shared variables requires additional coordination.
```ruby
# Safe: queue operations are atomic
queue = Queue.new
counter = 0
mutex = Mutex.new

# Multiple threads pushing and incrementing a shared counter
threads = 5.times.map do
  Thread.new do
    100.times do
      queue.push("work_item")
      # External state still needs its own synchronization
      mutex.synchronize { counter += 1 }
    end
  end
end
threads.each(&:join)

puts "Queue size: #{queue.size}" # => 500
puts "Counter: #{counter}"       # => 500
```
Deadlock situations can occur when threads hold multiple resources or when queue operations interact with other synchronization primitives. Avoid holding locks while performing blocking queue operations, and maintain consistent ordering when acquiring multiple locks.
```ruby
# Potential deadlock scenario
queue1 = Queue.new
queue2 = Queue.new
mutex1 = Mutex.new
mutex2 = Mutex.new

# Thread A: acquires mutex1, then blocks on queue1 while holding it
thread_a = Thread.new do
  mutex1.synchronize do
    puts "Thread A has mutex1"
    item = queue1.pop # Could block forever
    mutex2.synchronize { puts "Thread A has both mutexes" }
  end
end

# Thread B: acquires mutex2, then blocks on queue2 while holding it
thread_b = Thread.new do
  mutex2.synchronize do
    puts "Thread B has mutex2"
    item = queue2.pop # Could block forever
    mutex1.synchronize { puts "Thread B has both mutexes" }
  end
end

# Better approach: avoid holding locks during blocking operations
```
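A minimal sketch of that safer structure, reusing the queues and mutexes above: perform the blocking `pop` first, while holding no locks, and take a mutex only for the shared-state update itself.

```ruby
# Safer: block on the queue while holding no locks at all
thread_a_safe = Thread.new do
  item = queue1.pop # May block, but no mutex is held here
  mutex1.synchronize do
    # ...update state guarded by mutex1 using item...
  end
end
```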
Performance & Memory
`Queue` objects consume memory proportional to the number of stored items since they maintain an internal array that grows dynamically. Each queued item adds a reference to the internal storage, and memory usage increases linearly with queue depth. Ruby's garbage collector can reclaim memory from popped items, but the internal array may retain capacity for performance reasons.
`SizedQueue` provides memory usage bounds through capacity limits, making it suitable for scenarios where memory consumption must remain predictable. The fixed capacity prevents unbounded growth but may reduce throughput when producers consistently hit the capacity limit and block.
```ruby
# Compare allocations for filling each queue type
def measure_allocations(queue, items)
  GC.start
  before = GC.stat[:total_allocated_objects]
  items.times { |i| queue.push("item_#{i}") }
  GC.start
  after = GC.stat[:total_allocated_objects]
  after - before # Counts objects allocated, not live memory
end

unbounded_queue = Queue.new
bounded_queue = SizedQueue.new(1000)

# The bounded queue is filled only to capacity so push never blocks
unbounded_allocations = measure_allocations(unbounded_queue, 10_000)
bounded_allocations = measure_allocations(bounded_queue, 1000)

puts "Unbounded queue: #{unbounded_allocations} objects allocated"
puts "Bounded queue: #{bounded_allocations} objects allocated"
```
Throughput performance depends on contention levels and the ratio of producers to consumers. Single producer/single consumer scenarios achieve maximum throughput since they minimize lock contention. Multiple producers or consumers introduce synchronization overhead that reduces overall throughput but enables parallel processing.
```ruby
def benchmark_queue_operations(queue, producers, consumers, items_per_producer)
  start_time = Time.now

  # Producer threads
  producer_threads = producers.times.map do |i|
    Thread.new do
      items_per_producer.times do |j|
        queue.push("producer_#{i}_item_#{j}")
      end
    end
  end

  # Consumer threads count processed items under a mutex
  processed = 0
  mutex = Mutex.new
  total_items = producers * items_per_producer

  consumer_threads = consumers.times.map do
    Thread.new do
      while mutex.synchronize { processed } < total_items
        begin
          queue.pop(true)
          mutex.synchronize { processed += 1 }
        rescue ThreadError
          sleep(0.001) # Queue empty, brief pause
        end
      end
    end
  end

  producer_threads.each(&:join)
  consumer_threads.each(&:join)

  total_items / (Time.now - start_time)
end

# Test different configurations
configurations = [
  { producers: 1, consumers: 1 },
  { producers: 4, consumers: 1 },
  { producers: 1, consumers: 4 },
  { producers: 4, consumers: 4 }
]

queue = Queue.new
configurations.each do |config|
  throughput = benchmark_queue_operations(queue, config[:producers], config[:consumers], 10_000)
  puts "#{config[:producers]}P/#{config[:consumers]}C: #{throughput.round(2)} items/sec"
  queue.clear # Reset for the next run
end
```
Lock contention increases with the number of concurrent threads accessing the queue. Ruby's internal synchronization uses mutexes that serialize access to queue operations. Higher contention levels reduce throughput and increase latency variance as threads wait for lock acquisition.
```ruby
# Measure contention impact
def measure_contention_delay(queue, thread_count, operations_per_thread)
  delays = []
  mutex = Mutex.new

  threads = thread_count.times.map do
    Thread.new do
      operations_per_thread.times do
        start_time = Time.now
        queue.push("item")
        queue.pop
        delay = Time.now - start_time
        mutex.synchronize { delays << delay }
      end
    end
  end

  threads.each(&:join)
  delays
end

queue = Queue.new
[1, 2, 4, 8, 16].each do |thread_count|
  delays = measure_contention_delay(queue, thread_count, 1000)
  avg_delay = delays.sum / delays.length
  max_delay = delays.max
  puts "#{thread_count} threads: avg=#{(avg_delay * 1000).round(3)}ms, max=#{(max_delay * 1000).round(3)}ms"
end
```
Production Patterns
Background job processing represents the most common production use of Ruby queues. Web applications push work items onto queues while separate worker processes consume and execute jobs. This pattern decouples request handling from time-consuming operations, improving response times and system scalability.
```ruby
require 'securerandom'

class JobQueue
  def initialize(max_size = 1000)
    @queue = SizedQueue.new(max_size)
    @workers = []
    @shutdown = false
  end

  def enqueue_job(job_type, payload)
    job = {
      id: SecureRandom.uuid,
      type: job_type,
      payload: payload,
      enqueued_at: Time.now
    }
    @queue.push(job)
    job[:id]
  end

  def start_workers(worker_count = 4)
    worker_count.times do |i|
      @workers << Thread.new { worker_loop(i) }
    end
  end

  def shutdown
    @shutdown = true
    @queue.close
    @workers.each(&:join)
  end

  private

  def worker_loop(worker_id)
    until @shutdown
      begin
        job = @queue.pop(true) # Non-blocking
        process_job(job, worker_id)
      rescue ThreadError
        # Queue empty (or closed and drained): brief pause
        sleep(0.1) unless @shutdown
      end
    end
  end

  def process_job(job, worker_id)
    start_time = Time.now
    puts "Worker #{worker_id} processing job #{job[:id]}"
    case job[:type]
    when 'email'
      send_email(job[:payload])
    when 'image_resize'
      resize_image(job[:payload])
    when 'report_generation'
      generate_report(job[:payload])
    else
      puts "Unknown job type: #{job[:type]}"
    end
    duration = Time.now - start_time
    puts "Job #{job[:id]} completed in #{duration.round(2)}s"
  rescue => e
    puts "Job #{job[:id]} failed: #{e.message}"
  end

  def send_email(payload)
    sleep(0.5) # Simulate email sending
  end

  def resize_image(payload)
    sleep(1.0) # Simulate image processing
  end

  def generate_report(payload)
    sleep(2.0) # Simulate report generation
  end
end

# Usage in a web application
job_queue = JobQueue.new(500)
job_queue.start_workers(8)

# Enqueue jobs from web requests
job_id = job_queue.enqueue_job('email', { to: 'user@example.com', subject: 'Welcome' })
job_id = job_queue.enqueue_job('image_resize', { path: '/uploads/image.jpg', size: '300x300' })

# Graceful shutdown
at_exit { job_queue.shutdown }
```
Batch processing systems use queues to coordinate work distribution across multiple processing nodes. Producer processes scan for work and populate queues while consumer processes execute batch operations. This pattern supports horizontal scaling by adding more consumer processes or nodes.
```ruby
class BatchProcessor
  def initialize(batch_size: 10, max_queue_size: 1000)
    @batch_size = batch_size
    @work_queue = SizedQueue.new(max_queue_size)
    @batch_queue = Queue.new
    @processors = []
    @batcher = nil
  end

  def add_work_item(item)
    @work_queue.push(item)
  end

  def start_processing(processor_count: 4)
    # Start batch creation thread
    @batcher = Thread.new { create_batches }

    # Start batch processor threads
    processor_count.times do |i|
      @processors << Thread.new { process_batches(i) }
    end
  end

  def shutdown
    @work_queue.close
    @batcher&.join # create_batches closes @batch_queue when it finishes
    @processors.each(&:join)
  end

  private

  def create_batches
    current_batch = []
    loop do
      begin
        item = @work_queue.pop(true)
        current_batch << item
        if current_batch.length >= @batch_size
          @batch_queue.push(current_batch)
          current_batch = []
        end
      rescue ThreadError
        # Work queue momentarily empty: flush any partial batch
        # (a non-blocking pop raises ThreadError even on a closed queue,
        # so check closed? and empty? explicitly to decide when to stop)
        if current_batch.any?
          @batch_queue.push(current_batch)
          current_batch = []
        end
        break if @work_queue.closed? && @work_queue.empty?
        sleep(0.1)
      end
    end
    @batch_queue.close
  end

  def process_batches(processor_id)
    loop do
      begin
        batch = @batch_queue.pop(true)
        process_batch(batch, processor_id)
      rescue ThreadError
        break if @batch_queue.closed? && @batch_queue.empty?
        sleep(0.1) # Batch queue empty
      end
    end
  end

  def process_batch(batch, processor_id)
    puts "Processor #{processor_id} handling batch of #{batch.length} items"
    # Simulate per-item processing
    batch.each { sleep(0.01) }
    puts "Processor #{processor_id} completed batch"
  end
end

# Usage
processor = BatchProcessor.new(batch_size: 20)
processor.start_processing(processor_count: 6)

# Add work items
1000.times do |i|
  processor.add_work_item("work_item_#{i}")
end

sleep(5) # Let processing continue
processor.shutdown
```
Monitoring queue metrics provides visibility into system health and performance. Key metrics include queue depth, processing rates, and worker utilization. Applications should expose these metrics through monitoring endpoints or logging systems.
```ruby
require 'json'

class MonitoredQueue
  def initialize(max_size: 1000)
    @queue = SizedQueue.new(max_size)
    @metrics = {
      enqueued_total: 0,
      dequeued_total: 0,
      processing_time_total: 0,
      error_total: 0
    }
    @metrics_mutex = Mutex.new
  end

  def enqueue(item)
    @metrics_mutex.synchronize { @metrics[:enqueued_total] += 1 }
    @queue.push(item)
  end

  def dequeue_and_process(&block)
    item = @queue.pop
    start_time = Time.now
    begin
      result = block.call(item)
      processing_time = Time.now - start_time
      @metrics_mutex.synchronize do
        @metrics[:dequeued_total] += 1
        @metrics[:processing_time_total] += processing_time
      end
      result
    rescue
      @metrics_mutex.synchronize { @metrics[:error_total] += 1 }
      raise
    end
  end

  def metrics
    @metrics_mutex.synchronize do
      current_metrics = @metrics.dup
      current_metrics[:queue_depth] = @queue.length
      current_metrics[:queue_utilization] = @queue.length.to_f / @queue.max
      if current_metrics[:dequeued_total] > 0
        current_metrics[:avg_processing_time] =
          current_metrics[:processing_time_total] / current_metrics[:dequeued_total]
      end
      current_metrics
    end
  end

  def metrics_json
    JSON.pretty_generate(metrics)
  end
end

# Usage with monitoring
monitored_queue = MonitoredQueue.new(max_size: 100)

# Producer thread
producer = Thread.new do
  200.times do |i|
    monitored_queue.enqueue("task_#{i}")
    sleep(0.01)
  end
end

# Consumer threads
consumers = 3.times.map do
  Thread.new do
    loop do
      begin
        monitored_queue.dequeue_and_process do |item|
          # Simulate work with occasional errors
          sleep(0.05)
          raise "Processing error" if rand(100) < 5 # ~5% error rate
          "processed_#{item}"
        end
      rescue => e
        puts "Error processing item: #{e.message}"
      end
    end
  end
end

# Metrics reporting thread
reporter = Thread.new do
  loop do
    sleep(2)
    puts "\n--- Queue Metrics ---"
    puts monitored_queue.metrics_json
  end
end

# Let the system run
producer.join
sleep(10)
consumers.each(&:kill) # Demo-only: kill is abrupt; prefer close-based shutdown
reporter.kill

puts "\nFinal Metrics:"
puts monitored_queue.metrics_json
```
Error Handling & Debugging
Queue operations can raise several exception types that applications must handle appropriately. `ThreadError` occurs during non-blocking operations when queues are empty (for `pop`) or full (for `SizedQueue#push`). `ClosedQueueError` occurs when attempting to push items onto closed queues. These exceptions represent normal operational conditions rather than error states.
```ruby
def safe_queue_operations
  queue = Queue.new
  sized_queue = SizedQueue.new(2)

  # Handle empty queue with non-blocking pop
  begin
    item = queue.pop(true) # non_block = true
  rescue ThreadError => e
    puts "Queue empty: #{e.message}"
    item = nil
  end

  # Handle full sized queue with non-blocking push
  sized_queue.push("item1")
  sized_queue.push("item2") # Now full
  begin
    sized_queue.push("item3", true) # non_block = true
  rescue ThreadError => e
    puts "Queue full: #{e.message}"
  end

  # Handle closed queue operations
  queue.close
  begin
    queue.push("new_item")
  rescue ClosedQueueError => e
    puts "Queue closed: #{e.message}"
  end

  # Pop still works on a closed queue until it is empty,
  # but any push raises ClosedQueueError -- so check first
  unless queue.closed?
    queue.push("item")
  end
end

safe_queue_operations
```
Timeout scenarios occur when threads wait indefinitely on blocking queue operations. Before Ruby 3.2, the queue classes provided no built-in timeout mechanism, so applications had to implement timeout handling using separate threads or the `Timeout` module, though the latter approach can introduce complications with thread cleanup.
```ruby
require 'timeout'

class TimeoutQueue
  def initialize(queue)
    @queue = queue
  end

  def pop_with_timeout(timeout_seconds)
    result = nil
    exception = nil

    # Use a separate thread to avoid Timeout module issues
    worker = Thread.new do
      begin
        result = @queue.pop
      rescue => e
        exception = e
      end
    end

    if worker.join(timeout_seconds)
      # Thread completed within the timeout
      raise exception if exception
      result
    else
      # Thread timed out
      worker.kill
      raise Timeout::Error, "Queue pop operation timed out after #{timeout_seconds} seconds"
    end
  end

  def push_with_timeout(item, timeout_seconds)
    exception = nil
    worker = Thread.new do
      begin
        @queue.push(item)
      rescue => e
        exception = e
      end
    end

    unless worker.join(timeout_seconds)
      worker.kill
      raise Timeout::Error, "Queue push operation timed out after #{timeout_seconds} seconds"
    end
    raise exception if exception
  end
end

# Usage example
queue = Queue.new
timeout_queue = TimeoutQueue.new(queue)

# This will time out since the queue is empty
begin
  item = timeout_queue.pop_with_timeout(2.0)
rescue Timeout::Error => e
  puts "Operation timed out: #{e.message}"
end

# Add an item and try again
queue.push("test_item")
item = timeout_queue.pop_with_timeout(2.0) # Succeeds immediately
puts "Retrieved: #{item}"
```
Debugging queue-related issues requires understanding internal queue state and thread interactions. Common debugging approaches include logging queue operations, monitoring queue depth over time, and tracking thread states during blocking operations.
```ruby
require 'logger'

class DebuggingQueue
  def initialize(name, max_size: nil)
    @name = name
    @queue = max_size ? SizedQueue.new(max_size) : Queue.new
    @logger = Logger.new(STDOUT)
    @logger.level = Logger::DEBUG
    @operation_count = 0
    @mutex = Mutex.new
  end

  def push(item, non_block: false)
    operation_id = next_operation_id
    thread_id = Thread.current.object_id
    @logger.debug "#{@name}[#{operation_id}] Thread #{thread_id}: Attempting push of #{item.inspect}"
    @logger.debug "#{@name}[#{operation_id}] Queue state before push: size=#{@queue.size}, closed=#{@queue.closed?}"
    if @queue.is_a?(SizedQueue)
      @logger.debug "#{@name}[#{operation_id}] SizedQueue max capacity: #{@queue.max}"
    end

    start_time = Time.now
    begin
      if @queue.is_a?(SizedQueue)
        @queue.push(item, non_block) # Only SizedQueue#push accepts non_block
      else
        @queue.push(item)            # Queue#push never blocks
      end
      duration = Time.now - start_time
      @logger.debug "#{@name}[#{operation_id}] Push completed in #{duration.round(4)}s"
    rescue => e
      duration = Time.now - start_time
      @logger.error "#{@name}[#{operation_id}] Push failed after #{duration.round(4)}s: #{e.class} - #{e.message}"
      raise
    end
  end

  def pop(non_block: false)
    operation_id = next_operation_id
    thread_id = Thread.current.object_id
    @logger.debug "#{@name}[#{operation_id}] Thread #{thread_id}: Attempting pop"
    @logger.debug "#{@name}[#{operation_id}] Queue state before pop: size=#{@queue.size}, closed=#{@queue.closed?}"

    start_time = Time.now
    begin
      item = @queue.pop(non_block)
      duration = Time.now - start_time
      @logger.debug "#{@name}[#{operation_id}] Pop completed in #{duration.round(4)}s, retrieved: #{item.inspect}"
      item
    rescue => e
      duration = Time.now - start_time
      @logger.error "#{@name}[#{operation_id}] Pop failed after #{duration.round(4)}s: #{e.class} - #{e.message}"
      raise
    end
  end

  def close
    @logger.info "#{@name}: Closing queue"
    @queue.close
  end

  def debug_state
    state = {
      size: @queue.size,
      closed: @queue.closed?,
      empty: @queue.empty?
    }
    if @queue.is_a?(SizedQueue)
      state[:max] = @queue.max
      state[:full] = @queue.size >= @queue.max
    end
    @logger.info "#{@name} current state: #{state}"
    state
  end

  private

  def next_operation_id
    @mutex.synchronize { @operation_count += 1 }
  end
end

# Debugging example with a blocked producer
debug_queue = DebuggingQueue.new("TestQueue", max_size: 2)

# Fill queue to capacity
debug_queue.push("item1")
debug_queue.push("item2")
debug_queue.debug_state

# This thread will block on push
blocker = Thread.new do
  debug_queue.push("item3") # Blocks until space is available
end

sleep(1) # Let the thread start and block

# Check thread state
puts "Blocker thread state: #{blocker.status}" # => "sleep"

# Unblock by consuming an item
item = debug_queue.pop
puts "Unblocked with: #{item}"
blocker.join
debug_queue.debug_state
```
Memory leaks can occur when queue references prevent garbage collection of queued objects. Applications should monitor memory usage and ensure that long-lived queues do not accumulate references to large objects unnecessarily.
```ruby
# Example of a potential memory leak: the queue pins every object
def demonstrate_memory_leak
  queue = Queue.new

  # Large objects stay referenced by the queue, so GC cannot reclaim them
  1000.times do |i|
    queue.push(Array.new(10_000) { |j| "data_#{i}_#{j}" })
  end

  puts "Queue size: #{queue.size}"
  # heap_live_slots reflects live objects; total_allocated_objects
  # only ever increases, so it cannot show reclamation
  GC.start
  puts "Live slots with full queue: #{GC.stat[:heap_live_slots]}"

  # Clear the queue to release the references
  queue.clear
  GC.start
  puts "Live slots after queue clear: #{GC.stat[:heap_live_slots]}"
end

# Better approach: bound the queue so items are processed as they arrive
def demonstrate_memory_efficiency
  queue = SizedQueue.new(10) # At most 10 large objects in flight
  processed_count = 0

  # Consumer processes items immediately, releasing references
  consumer = Thread.new do
    while (item = queue.pop)
      processed_count += 1
      break if processed_count >= 1000
    end
  end

  # Producer blocks whenever the consumer falls 10 items behind
  1000.times do |i|
    queue.push(Array.new(10_000) { |j| "data_#{i}_#{j}" })
  end

  consumer.join
  puts "Processed #{processed_count} items with bounded memory usage"
end

demonstrate_memory_leak
demonstrate_memory_efficiency
```
Reference
Queue Class Methods

| Method | Parameters | Returns | Description |
|---|---|---|---|
| `Queue.new` | None | `Queue` | Creates new unbounded queue |

SizedQueue Class Methods

| Method | Parameters | Returns | Description |
|---|---|---|---|
| `SizedQueue.new(max)` | `max` (Integer) | `SizedQueue` | Creates bounded queue with maximum capacity |

Instance Methods (Both Classes)

| Method | Parameters | Returns | Description |
|---|---|---|---|
| `#push(object)` | `object` (Any) | `self` | Adds object to rear of queue (`SizedQueue#push` also accepts `non_block`; see below) |
| `#<<(object)` | `object` (Any) | `self` | Alias for `push` (always blocking) |
| `#enq(object)` | `object` (Any) | `self` | Alias for `push` |
| `#pop(non_block=false)` | `non_block` (Boolean) | Object | Removes and returns object from front |
| `#deq(non_block=false)` | `non_block` (Boolean) | Object | Alias for `pop` |
| `#shift(non_block=false)` | `non_block` (Boolean) | Object | Alias for `pop` |
| `#length` | None | Integer | Returns current number of objects |
| `#size` | None | Integer | Alias for `length` |
| `#empty?` | None | Boolean | Returns `true` if queue contains no objects |
| `#clear` | None | `self` | Removes all objects from queue |
| `#close` | None | `self` | Prevents further additions to queue |
| `#closed?` | None | Boolean | Returns `true` if queue is closed |

SizedQueue-Specific Methods

| Method | Parameters | Returns | Description |
|---|---|---|---|
| `#push(object, non_block=false)` | `object` (Any), `non_block` (Boolean) | `self` | Blocks when full; raises `ThreadError` instead when `non_block` is true |
| `#max` | None | Integer | Returns maximum queue capacity |
| `#max=(number)` | `number` (Integer) | Integer | Sets maximum queue capacity |

Exception Classes

| Exception | Inheritance | Raised When |
|---|---|---|
| `ThreadError` | `StandardError` | Non-blocking operation cannot complete |
| `ClosedQueueError` | `StopIteration` | Push attempted on closed queue |

Thread States During Queue Operations

| Operation | Queue State | Thread State | Behavior |
|---|---|---|---|
| `push` | Not full, open | Running | Returns immediately |
| `push` | Full (`SizedQueue`) | Sleep | Blocks until space available |
| `push` | Closed | Running | Raises `ClosedQueueError` |
| `push(object, true)` | Full | Running | Raises `ThreadError` |
| `pop` | Not empty | Running | Returns immediately |
| `pop` | Empty | Sleep | Blocks until item available |
| `pop(true)` | Empty | Running | Raises `ThreadError` |

Memory and Performance Characteristics

| Aspect | Queue | SizedQueue |
|---|---|---|
| Memory growth | Unbounded | Bounded by `max` |
| Push throughput | High (no capacity checks) | Limited by capacity |
| Memory predictability | Low | High |
| Backpressure | None | Automatic |
| Suitable for | High-throughput scenarios | Memory-constrained environments |

Thread Safety Guarantees

| Operation | Atomicity | Ordering | Synchronization |
|---|---|---|---|
| Single `push` | Atomic | FIFO maintained | Internal mutex |
| Single `pop` | Atomic | FIFO maintained | Internal mutex |
| Multiple operations | Not atomic | FIFO per operation | Manual coordination required |
| State queries (`length`, `empty?`) | Atomic snapshot | Consistent with queue state | Internal mutex |