Overview
Thread pools address the overhead of creating and destroying threads for each task by maintaining a collection of reusable worker threads. Ruby implements thread pools through the `Thread` class combined with thread-safe data structures like `Queue` and `SizedQueue`. The pattern separates task submission from task execution, allowing applications to control concurrency levels and resource usage.
Ruby's thread pool implementations typically consist of three components: a work queue that holds pending tasks, a collection of worker threads that consume tasks from the queue, and a coordination mechanism that manages thread lifecycle and graceful shutdown. The `Thread` class provides the fundamental building blocks, while `Queue` and `SizedQueue` handle thread-safe task distribution.
```ruby
# Queue and SizedQueue are part of core Ruby; `require 'thread'` is no
# longer needed on modern Rubies.
class BasicThreadPool
  def initialize(size = 4)
    @jobs = Queue.new
    @pool = Array.new(size) do
      Thread.new do
        # Queue#pop blocks while the queue is empty and returns nil once
        # the queue is closed and drained, which ends the loop.
        while (job = @jobs.pop)
          job.call
        end
      end
    end
  end

  def submit(&block)
    @jobs << block
  end

  def shutdown
    @jobs.close        # wake workers; pop returns nil once drained
    @pool.each(&:join)
  end
end
```
Thread pools excel in scenarios with many short-lived tasks, I/O-bound operations, and situations where thread creation overhead impacts performance. Ruby applications commonly use thread pools for web request processing, background job execution, and parallel data processing tasks.
The Global Interpreter Lock (GIL) in MRI Ruby limits true parallelism for CPU-bound tasks, making thread pools most effective for I/O-bound operations where threads spend time waiting for external resources. Alternative Ruby implementations like JRuby and TruffleRuby provide better parallelism characteristics for CPU-intensive workloads.
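This distinction is easy to observe by timing the same workload serially and across threads. A minimal sketch (timings vary by machine and Ruby implementation):

```ruby
require 'benchmark'

def timed
  Benchmark.realtime { yield }.round(2)
end

# I/O-bound work (modeled as sleep): threads overlap the waiting.
io_serial   = timed { 4.times { sleep(0.2) } }
io_threaded = timed { Array.new(4) { Thread.new { sleep(0.2) } }.each(&:join) }

# CPU-bound work: on MRI the GIL serializes Ruby bytecode execution.
cpu_work     = ->(n) { n.times { Math.sqrt(rand) } }
cpu_serial   = timed { 4.times { cpu_work.call(2_000_000) } }
cpu_threaded = timed { Array.new(4) { Thread.new { cpu_work.call(2_000_000) } }.each(&:join) }

puts "I/O: serial #{io_serial}s vs threaded #{io_threaded}s"   # threads win
puts "CPU: serial #{cpu_serial}s vs threaded #{cpu_threaded}s" # roughly equal on MRI
```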
Basic Usage
Creating a thread pool starts with defining the pool size and initializing worker threads. The pool size determines the maximum number of concurrent tasks and should balance system resources against desired parallelism. A typical implementation uses a `Queue` to distribute work and maintains worker threads in a loop that processes tasks until shutdown.
```ruby
class WorkerPool
  def initialize(size = 4)
    @size = size
    @jobs = Queue.new
    @shutdown = false
    @workers = []
    create_workers
  end

  def submit(&block)
    raise "Pool is shutdown" if @shutdown
    @jobs << block
  end

  private

  def create_workers
    @size.times do |i|
      @workers << Thread.new do
        loop do
          job = @jobs.pop           # blocks until a job arrives
          break if job == :shutdown
          begin
            job.call
          rescue => e
            warn "Worker #{i} error: #{e.message}"
          end
        end
      end
    end
  end
end
```
Task submission involves adding callable objects to the work queue. The `submit` method accepts blocks that worker threads execute asynchronously. Each worker thread runs a continuous loop, popping jobs from the queue and executing them until it receives a shutdown signal.
```ruby
pool = WorkerPool.new(3)

# Submit various types of work
5.times do |i|
  pool.submit do
    puts "Processing task #{i} on #{Thread.current}"
    sleep(rand(1..3))
    puts "Completed task #{i}"
  end
end

# Process some I/O operations
urls = %w[http://example.com http://google.com http://github.com]
urls.each do |url|
  pool.submit do
    # Simulated HTTP request
    sleep(0.5)
    puts "Fetched #{url}"
  end
end
```
Graceful shutdown requires signaling worker threads to stop processing new jobs and waiting for current jobs to complete. The shutdown process typically involves stopping job submission, sending shutdown signals to workers, and joining worker threads to ensure completion.
```ruby
class ManagedThreadPool
  def initialize(size = 4)
    @size = size
    @jobs = Queue.new
    @shutdown = false
    @workers = create_workers
  end

  def submit(&block)
    raise "Pool is shutdown" if @shutdown
    @jobs << block
  end

  def shutdown
    @shutdown = true
    # Send one shutdown signal per worker
    @size.times { @jobs << :shutdown }
    # Wait for all workers to finish their current job and exit
    @workers.each(&:join)
  end

  def shutdown!
    # Forceful shutdown: kills workers mid-job; use only as a last resort
    @shutdown = true
    @workers.each(&:kill)
  end

  private

  def create_workers
    Array.new(@size) do
      Thread.new do
        while (job = @jobs.pop)
          break if job == :shutdown
          job.call rescue nil   # swallows errors; see Production Patterns
        end
      end
    end
  end
end
```
The `SizedQueue` class provides backpressure by limiting the number of queued jobs, preventing memory issues when job submission outpaces processing. This bounded queue blocks submission when full, creating natural throttling.
```ruby
class BoundedThreadPool
  def initialize(size = 4, max_queue = 10)
    @size = size
    @jobs = SizedQueue.new(max_queue)
    @workers = Array.new(size) do
      Thread.new do
        while (job = @jobs.pop)
          break if job == :shutdown
          job.call rescue nil
        end
      end
    end
  end

  def submit(&block)
    @jobs << block # Blocks when the queue is full
  end

  def shutdown
    @size.times { @jobs << :shutdown }
    @workers.each(&:join)
  end
end
```
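The blocking behavior is easy to observe: with slow workers and a small queue, `submit` pauses the caller until space frees up. A quick demonstration (timings illustrative):

```ruby
pool = BoundedThreadPool.new(2, 4)

10.times do |i|
  t = Time.now
  pool.submit do
    sleep(0.5)           # slow consumer
    puts "done #{i}"
  end
  waited = Time.now - t
  puts "submitted #{i} (blocked #{waited.round(2)}s)" # later submissions block
end

pool.shutdown
```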
Thread Safety & Concurrency
Thread pools require careful synchronization to prevent race conditions and ensure data integrity. Ruby's `Queue` and `SizedQueue` classes provide thread-safe operations, but shared state between tasks needs explicit synchronization using mutexes, condition variables, or other primitives.

Worker threads access shared resources concurrently, creating potential race conditions when multiple threads modify the same data. The `Mutex` class provides mutual exclusion, ensuring only one thread executes a protected code section at a time.
```ruby
class SafeCounter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end

  def increment
    @mutex.synchronize { @count += 1 }
  end

  def value
    @mutex.synchronize { @count }
  end
end

# Thread-safe usage in a pool
class ThreadSafePool
  def initialize(size = 4)
    @jobs = Queue.new
    @results = SafeCounter.new
    @workers = create_workers(size)
  end

  def submit(&block)
    @jobs << block
  end

  def process_batch(items)
    items.each do |item|
      submit do
        process_item(item)
        @results.increment   # safe across concurrent workers
      end
    end
  end

  def completed
    @results.value
  end

  private

  def create_workers(size)
    Array.new(size) do
      Thread.new do
        while (job = @jobs.pop)
          break if job == :shutdown
          job.call
        end
      end
    end
  end

  def process_item(item)
    sleep(0.1)   # Simulate processing work
    item.upcase
  end
end
```
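A short usage sketch, assuming string items:

```ruby
pool = ThreadSafePool.new(4)
pool.process_batch(%w[alpha beta gamma delta epsilon])
sleep(1)              # crude wait; a real pool would expose shutdown/join
puts pool.completed   # => 5 once all items have been processed
```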
Condition variables coordinate thread execution when workers need to wait for specific conditions. The `ConditionVariable` class works with mutexes to implement the waiting and signaling patterns common in producer-consumer scenarios.
```ruby
class ProducerConsumerPool
  def initialize(worker_count = 3)
    @buffer = []
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @running = true
    @workers = Array.new(worker_count) do
      Thread.new { worker_loop }
    end
  end

  def add_work(item)
    @mutex.synchronize do
      @buffer << item
      @condition.signal    # wake one waiting worker
    end
  end

  def shutdown
    @mutex.synchronize do
      @running = false
      @condition.broadcast # wake all workers so they can exit
    end
    @workers.each(&:join)
  end

  private

  def worker_loop
    loop do
      item = nil
      @mutex.synchronize do
        # Re-check the condition on every wakeup (guards against spurious wakeups)
        @condition.wait(@mutex) while @buffer.empty? && @running
        item = @buffer.shift
      end
      # A nil item means the buffer is drained and the pool is shutting down.
      # (A bare `break` inside the synchronize block would only exit that
      # block, not this loop, so the check happens out here.)
      break if item.nil?
      process_work(item)
    end
  end

  def process_work(item)
    puts "Processing #{item} on #{Thread.current}"
    sleep(rand(0.5..1.5))
  end
end
```
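Typical usage: producers call `add_work` from any thread, and `shutdown` drains remaining items before returning:

```ruby
pool = ProducerConsumerPool.new(3)
10.times { |i| pool.add_work("item-#{i}") }
pool.shutdown   # blocks until the buffer is drained and workers exit
```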
Atomic operations prevent race conditions for simple numeric updates without explicit locking in user code. Core Ruby does not expose atomic counters, so the example below guards its counter with a `Mutex`; the concurrent-ruby gem provides dedicated primitives such as `Concurrent::AtomicFixnum` and `Concurrent::AtomicReference`. (On MRI the GIL makes some simple operations appear atomic, but this is an implementation detail and should not be relied upon.)
```ruby
class AtomicPool
  def initialize(size = 4)
    @completed_jobs = 0
    @jobs = Queue.new
    @completion_mutex = Mutex.new
    @workers = Array.new(size) do
      Thread.new { worker_loop }
    end
  end

  def submit(&job)
    @jobs << job
  end

  def completed_count
    @completion_mutex.synchronize { @completed_jobs }
  end

  private

  def worker_loop
    while (job = @jobs.pop)
      break if job == :shutdown
      begin
        job.call
        @completion_mutex.synchronize { @completed_jobs += 1 }
      rescue => e
        handle_error(e)
      end
    end
  end

  def handle_error(error)
    warn "Job failed: #{error.class}: #{error.message}"
  end
end
```
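With the concurrent-ruby gem available, the mutex-wrapped counter can be replaced by `Concurrent::AtomicFixnum`, which supports atomic increments. A sketch, assuming the gem is installed (`gem install concurrent-ruby`):

```ruby
require 'concurrent'

completed = Concurrent::AtomicFixnum.new(0)
jobs = Queue.new

workers = Array.new(4) do
  Thread.new do
    while (job = jobs.pop)
      break if job == :shutdown
      job.call
      completed.increment   # atomic; no mutex needed
    end
  end
end

100.times { jobs << -> { Math.sqrt(rand) } }
4.times { jobs << :shutdown }
workers.each(&:join)
puts completed.value        # => 100
```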
Thread-local storage isolates data per thread, preventing conflicts when workers need thread-specific state. `Thread.current[:key]` provides simple per-thread storage (strictly speaking these variables are fiber-local, which amounts to the same thing when each worker runs a single fiber), while `Thread#thread_variable_get` and `#thread_variable_set` offer true thread-local variables.
```ruby
class ThreadLocalPool
  def initialize(size = 4)
    @jobs = Queue.new
    @workers = Array.new(size) do |i|
      Thread.new do
        Thread.current[:worker_id] = i
        Thread.current[:processed] = 0
        worker_loop
      end
    end
  end

  def submit(&block)
    @jobs << block
  end

  def worker_stats
    @workers.map do |worker|
      {
        id: worker[:worker_id],        # Thread#[] is readable from outside the thread
        processed: worker[:processed], # may be nil if the thread hasn't started yet
        alive: worker.alive?
      }
    end
  end

  private

  def worker_loop
    while (job = @jobs.pop)
      break if job == :shutdown
      job.call
      Thread.current[:processed] += 1  # only the owning thread writes this
    end
  end
end
```
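Submitting a batch and then inspecting the per-worker counters:

```ruby
pool = ThreadLocalPool.new(4)
20.times { pool.submit { sleep(0.05) } }
sleep(0.5)   # let workers drain the queue
pp pool.worker_stats
# => [{id: 0, processed: 5, alive: true}, {id: 1, ...}, ...] (counts vary per run)
```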
Performance & Memory
Thread pool performance depends on pool size, task characteristics, and system resources. The optimal pool size balances resource usage with throughput, typically ranging from the number of CPU cores for CPU-bound tasks to higher values for I/O-bound operations. Benchmarking different pool sizes reveals performance characteristics for specific workloads.
Memory usage in thread pools includes thread stack space, queued job objects, and shared data structures. Each thread consumes stack space (typically 1-8MB per thread in Ruby), making large pools memory-intensive. The job queue grows with submitted tasks, requiring monitoring in high-throughput scenarios.
```ruby
require 'benchmark'

class BenchmarkedPool
  def initialize(size)
    @size = size
    @jobs = Queue.new
    @completed = 0
    @mutex = Mutex.new
    @workers = create_workers
  end

  def self.benchmark_sizes(work_count = 1000)
    [1, 2, 4, 8, 16, 32].each do |size|
      time = Benchmark.measure do
        pool = new(size)
        work_count.times do |i|
          pool.submit do
            sleep(0.01)      # Simulate I/O work
            Math.sqrt(i)
          end
        end
        pool.shutdown        # waits for all jobs to finish
      end
      puts "Pool size #{size}: #{time.real.round(2)}s"
    end
  end

  def submit(&block)
    @jobs << block
  end

  def shutdown
    @size.times { @jobs << :shutdown }
    @workers.each(&:join)
  end

  private

  def create_workers
    Array.new(@size) do
      Thread.new do
        while (job = @jobs.pop)
          break if job == :shutdown
          job.call
          @mutex.synchronize { @completed += 1 }
        end
      end
    end
  end
end
```
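Running the comparison (numbers depend on the machine; with 0.01 s of simulated I/O per job, time should drop roughly in proportion to pool size until overhead dominates):

```ruby
BenchmarkedPool.benchmark_sizes(500)
# Pool size 1: ~5.0s
# Pool size 2: ~2.5s
# Pool size 4: ~1.3s
# ...
```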
GC pressure increases with thread pools due to frequent object creation and cross-thread references. Jobs submitted as blocks create closure objects, while results and intermediate objects add to memory pressure. Monitoring GC statistics reveals the impact of threading patterns.
```ruby
class MemoryAwarePool
  def initialize(size = 4, gc_interval = 100)
    @size = size
    @jobs = Queue.new
    @processed = 0
    @mutex = Mutex.new
    @gc_interval = gc_interval
    @workers = create_workers
  end

  def submit(&block)
    @jobs << block
  end

  def stats
    {
      processed: @mutex.synchronize { @processed },
      queue_size: @jobs.size,
      gc_count: GC.count,
      memory_usage_kb: `ps -o rss= -p #{$$}`.to_i  # RSS in KB; Unix-only
    }
  end

  private

  def create_workers
    Array.new(@size) do
      Thread.new do
        local_count = 0
        while (job = @jobs.pop)
          break if job == :shutdown
          job.call
          local_count += 1
          @mutex.synchronize { @processed += 1 }  # shared counter needs the lock
          # Periodic GC trades throughput for a flatter memory profile
          GC.start if local_count % @gc_interval == 0
        end
      end
    end
  end
end
```
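Sampling `stats` periodically shows whether the forced GC keeps RSS in check under allocation-heavy work:

```ruby
pool = MemoryAwarePool.new(4, 50)
1_000.times { pool.submit { "x" * 10_000 } }  # allocation-heavy jobs
sleep(1)
pp pool.stats
# => {processed: ..., queue_size: ..., gc_count: ..., memory_usage_kb: ...}
```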
Queue sizing affects memory usage and system behavior under load. Unbounded queues consume memory proportional to submission rate minus processing rate, potentially causing memory exhaustion. Bounded queues provide backpressure but may block submission threads.
```ruby
class AdaptivePool
  def initialize(size = 4, initial_queue_size = 10)
    @size = size
    @jobs = SizedQueue.new(initial_queue_size)
    @stats_mutex = Mutex.new
    @submission_times = []
    @workers = create_workers
  end

  def submit(&block)
    start_time = Time.now
    begin
      @jobs.push(block, true)   # non-blocking push raises ThreadError when full
    rescue ThreadError
      # Queue full: fall back to a blocking push and record how long we waited.
      # This is the natural hook for retry logic or adaptive resizing.
      @jobs.push(block)
    end
    record_submission_time(Time.now - start_time)
  end

  def queue_utilization
    (@jobs.size.to_f / @jobs.max).round(2)
  end

  private

  def create_workers
    Array.new(@size) do
      Thread.new do
        while (job = @jobs.pop)
          break if job == :shutdown
          job.call rescue nil
        end
      end
    end
  end

  def record_submission_time(duration)
    @stats_mutex.synchronize do
      @submission_times << duration
      @submission_times = @submission_times.last(100) # Keep recent history
    end
  end

  def avg_submission_time
    @stats_mutex.synchronize do
      return 0 if @submission_times.empty?
      @submission_times.sum / @submission_times.size
    end
  end
end
```
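Watching `queue_utilization` while submitting faster than the workers can drain shows backpressure building:

```ruby
pool = AdaptivePool.new(2, 10)
50.times do |i|
  pool.submit { sleep(0.1) }
  puts "utilization after #{i + 1} submissions: #{pool.queue_utilization}"
end
```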
Production Patterns
Production thread pools require robust error handling, monitoring, graceful degradation, and operational visibility. Applications typically implement circuit breakers, health checks, and metrics collection to maintain reliability under varying load conditions.
Error isolation prevents individual job failures from affecting other tasks or worker threads. Each worker catches and handles exceptions locally, logging errors while continuing to process subsequent jobs. Dead letter queues capture failed jobs for retry or analysis.
```ruby
require 'logger'

class ProductionPool
  def initialize(size = 4, max_retries = 3)
    @size = size
    @max_retries = max_retries
    @jobs = Queue.new
    @failed_jobs = Queue.new          # dead letter queue
    @stats = ThreadSafeStats.new
    @logger = Logger.new($stdout)
    @workers = create_workers
    @monitor_thread = create_monitor
  end

  def submit(job_data, &block)
    job = Job.new(job_data, block, @max_retries)
    @jobs << job
    @stats.increment(:submitted)
  end

  def failed_jobs_count
    @failed_jobs.size
  end

  def health_check
    {
      active_workers: @workers.count(&:alive?),
      queue_size: @jobs.size,
      failed_jobs: failed_jobs_count,
      stats: @stats.snapshot
    }
  end

  private

  class Job
    attr_reader :data, :block, :retries_left, :created_at

    def initialize(data, block, max_retries)
      @data = data
      @block = block
      @retries_left = max_retries
      @created_at = Time.now
    end

    def execute
      @block.call(@data)
    end

    def retry?
      @retries_left > 0
    end

    def decrement_retries
      @retries_left -= 1
    end
  end

  def create_workers
    Array.new(@size) { |i| create_worker(i) }
  end

  def create_worker(i)
    Thread.new do
      @logger.info "Worker #{i} started"
      while (job = @jobs.pop)
        break if job == :shutdown
        process_job(job, i)
      end
      @logger.info "Worker #{i} shutting down"
    end
  end

  def process_job(job, worker_id)
    start_time = Time.now
    job.execute
    @stats.increment(:completed)
    @stats.record_duration(Time.now - start_time)
  rescue => e
    @logger.error "Worker #{worker_id} job failed: #{e.message}"
    @logger.error e.backtrace.join("\n")
    @stats.increment(:failed)
    if job.retry?
      job.decrement_retries
      @jobs << job               # requeue for another attempt
      @stats.increment(:retried)
    else
      @failed_jobs << job        # retries exhausted: dead letter queue
    end
  end

  def create_monitor
    Thread.new do
      loop do
        sleep(60)
        report_metrics
      end
    end
  end

  def report_metrics
    @logger.info "Pool health: #{health_check}"
    # Replace workers that died
    @workers.each_with_index do |worker, i|
      unless worker.alive?
        @logger.warn "Replacing dead worker #{i}"
        @workers[i] = create_worker(i)
      end
    end
  end
end
```
```ruby
class ThreadSafeStats
  def initialize
    @data = Hash.new(0)
    @mutex = Mutex.new
  end

  def increment(key)
    @mutex.synchronize { @data[key] += 1 }
  end

  def record_duration(duration)
    @mutex.synchronize do
      @data[:total_duration] += duration
      @data[:duration_count] += 1
    end
  end

  def snapshot
    @mutex.synchronize { @data.dup }
  end
end
```
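Submitting jobs with data and polling `health_check` might look like the following (`process_record` is a hypothetical stand-in for application logic):

```ruby
pool = ProductionPool.new(4, 2)

100.times do |i|
  pool.submit(id: i) do |data|
    raise "transient failure" if rand < 0.05   # ~5% of jobs fail and get retried
    process_record(data)                       # hypothetical application method
  end
end

sleep(5)
pp pool.health_check
# => {active_workers: 4, queue_size: 0, failed_jobs: 0,
#     stats: {submitted: 100, completed: 98, failed: 3, retried: 3, ...}}
```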
Circuit breaker patterns prevent cascading failures when external dependencies become unavailable. The pool monitors failure rates and temporarily stops processing certain types of jobs when failure thresholds exceed acceptable levels.
```ruby
class CircuitBreakerPool
  CIRCUIT_STATES = %i[closed open half_open].freeze

  class CircuitOpenError < StandardError; end

  def initialize(size = 4, failure_threshold = 5, timeout = 60)
    @size = size
    @failure_threshold = failure_threshold
    @timeout = timeout
    @circuit_state = :closed
    @failure_count = 0
    @last_failure_time = nil
    @mutex = Mutex.new
    @jobs = Queue.new
    @workers = create_workers
  end

  def submit(job_type, &block)
    if circuit_open?(job_type)
      raise CircuitOpenError, "Circuit breaker open for #{job_type}"
    end
    @jobs << [job_type, block]
  end

  private

  # This sketch tracks one shared circuit; a per-type implementation would
  # keep separate state keyed by job_type.
  def circuit_open?(_job_type)
    @mutex.synchronize do
      case @circuit_state
      when :closed
        false
      when :open
        if Time.now - @last_failure_time > @timeout
          @circuit_state = :half_open   # allow a trial request through
          false
        else
          true
        end
      when :half_open
        false
      end
    end
  end

  def record_success
    @mutex.synchronize do
      @failure_count = 0
      @circuit_state = :closed if @circuit_state == :half_open
    end
  end

  def record_failure
    @mutex.synchronize do
      @failure_count += 1
      @last_failure_time = Time.now
      @circuit_state = :open if @failure_count >= @failure_threshold
    end
  end

  def create_workers
    Array.new(@size) do
      Thread.new do
        while (job_data = @jobs.pop)
          break if job_data == :shutdown
          _job_type, block = job_data
          begin
            block.call
            record_success
          rescue => e
            record_failure
            warn "Job failed: #{e.message}"  # re-raising here would kill the worker
          end
        end
      end
    end
  end
end
```
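Callers treat `CircuitOpenError` as a fast-fail signal and degrade gracefully instead of queueing more doomed work (`charge_customer` and `enqueue_for_later` are hypothetical application methods):

```ruby
pool = CircuitBreakerPool.new(4, 5, 30)   # 5 failures trip the circuit for 30s

begin
  pool.submit(:payment_api) { charge_customer }  # hypothetical external call
rescue CircuitBreakerPool::CircuitOpenError
  enqueue_for_later   # hypothetical fallback: retry once the circuit half-opens
end
```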
Integration with monitoring systems provides operational visibility into thread pool behavior. Metrics collection tracks throughput, latency, error rates, and resource utilization for performance analysis and alerting.
```ruby
class MonitoredPool
  def initialize(size = 4, metrics_client = nil)
    @size = size
    @metrics = metrics_client || SimpleMetrics.new
    @jobs = Queue.new
    @workers = create_workers
    @start_time = Time.now
    setup_periodic_reporting
  end

  def submit(&block)
    @metrics.increment('pool.jobs.submitted')
    @jobs << block
  end

  def shutdown
    @size.times { @jobs << :shutdown }
    @workers.each(&:join)
    @metrics.gauge('pool.status', 'shutdown')
  end

  private

  def create_workers
    Array.new(@size) do
      Thread.new do
        @metrics.increment('pool.workers.started')
        while (job = @jobs.pop)
          break if job == :shutdown
          process_with_metrics(job)
        end
        @metrics.increment('pool.workers.stopped')
      end
    end
  end

  def process_with_metrics(job)
    start_time = Time.now
    job.call
    @metrics.increment('pool.jobs.completed')
    @metrics.timing('pool.job.duration', Time.now - start_time)
  rescue => e
    @metrics.increment('pool.jobs.failed')
    warn "Job failed: #{e.message}"  # re-raising would kill the worker thread
  end

  def setup_periodic_reporting
    Thread.new do
      loop do
        sleep(30)
        report_pool_metrics
      end
    end
  end

  def report_pool_metrics
    @metrics.gauge('pool.queue.size', @jobs.size)
    @metrics.gauge('pool.workers.alive', @workers.count(&:alive?))
    @metrics.gauge('pool.uptime', Time.now - @start_time)
  end
end
```
```ruby
class SimpleMetrics
  def initialize
    @data = Hash.new(0)
    @mutex = Mutex.new
  end

  def increment(key, value = 1)
    @mutex.synchronize { @data[key] += value }
  end

  def gauge(key, value)
    @mutex.synchronize { @data[key] = value }
  end

  def timing(key, duration)
    gauge("#{key}.ms", (duration * 1000).round(2))
  end

  def snapshot
    @mutex.synchronize { @data.dup }
  end
end
```
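Wiring the pool to the in-process metrics client and dumping a snapshot after shutdown:

```ruby
metrics = SimpleMetrics.new
pool = MonitoredPool.new(4, metrics)

200.times { pool.submit { sleep(0.01) } }
pool.shutdown

pp metrics.snapshot
# => {"pool.jobs.submitted"=>200, "pool.jobs.completed"=>200,
#     "pool.job.duration.ms"=>10.42, "pool.status"=>"shutdown", ...}
```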
Reference
Core Classes and Methods
| Class | Purpose | Key Methods |
|---|---|---|
| `Thread` | Creates and manages individual threads | `.new`, `#join`, `#kill`, `#alive?`, `#value` |
| `Queue` | Thread-safe FIFO queue for job distribution | `#push` (`<<`), `#pop`, `#size`, `#empty?` |
| `SizedQueue` | Bounded queue with backpressure | `#max`, `#max=`, plus inherited `Queue` methods |
| `Mutex` | Mutual exclusion lock | `#synchronize`, `#lock`, `#unlock`, `#owned?` |
| `ConditionVariable` | Thread coordination primitive | `#wait`, `#signal`, `#broadcast` |
Thread Pool Implementation Patterns
| Pattern | Use Case | Implementation |
|---|---|---|
| Basic Pool | Simple task execution | Fixed workers + unbounded queue |
| Bounded Pool | Memory-conscious processing | Fixed workers + `SizedQueue` |
| Adaptive Pool | Variable load handling | Dynamic worker adjustment |
| Priority Pool | Task prioritization | Multiple queues by priority |
| Circuit Breaker Pool | Fault tolerance | Failure tracking + circuit states |
Queue Types and Characteristics
| Queue Type | Thread Safety | Blocking Behavior | Memory Usage |
|---|---|---|---|
| `Queue` | Yes | Blocks on `pop` when empty | Unbounded |
| `SizedQueue` | Yes | Blocks on `push` when full, `pop` when empty | Bounded |
| `Array` | No | Non-blocking | Dynamic |
| Custom implementations | Depends | Configurable | Varies |
Worker Thread Lifecycle States
| State | Description | Transitions |
|---|---|---|
| `new` | Thread created but not started | → `run` |
| `run` | Actively processing jobs | → `sleep`, `stop` |
| `sleep` | Waiting for jobs or conditions | → `run`, `stop` |
| `stop` | Gracefully shutting down | → `dead` |
| `dead` | Thread terminated | Final state |
Common Configuration Options
| Option | Default | Range | Impact |
|---|---|---|---|
| Pool Size | 4 | 1-100+ | Concurrency level, memory usage |
| Queue Size | Unlimited | 1-10000+ | Memory usage, backpressure |
| Timeout | None | 1s-300s | Responsiveness, resource cleanup |
| Max Retries | 0 | 0-10 | Fault tolerance, processing time |
Error Handling Strategies
| Strategy | Implementation | Pros | Cons |
|---|---|---|---|
| Ignore | `rescue nil` | Simple, fast | Silent failures |
| Log Only | `rescue => e; log(e)` | Visible errors | No recovery |
| Retry | Requeue failed jobs | Fault tolerance | Potential loops |
| Dead Letter | Separate failed-job queue | Analysis capability | Additional complexity |
| Circuit Breaker | Failure threshold tracking | Prevents cascading failures | Complex logic |
Performance Tuning Guidelines
| Metric | Measurement | Optimization Strategy |
|---|---|---|
| Throughput | Jobs/second | Increase pool size, optimize job logic |
| Latency | Time to completion | Reduce queue wait time, faster processing |
| Memory | RSS, heap usage | Bound queues, periodic GC |
| CPU Usage | System load | Balance pool size with available cores |
| I/O Wait | Thread blocking time | Increase pool size for I/O-bound tasks |
Thread Safety Checklist
| Concern | Solution | Implementation |
|---|---|---|
| Shared mutable state | Mutex synchronization | `@mutex.synchronize { }` |
| Atomic operations | Thread-safe primitives | `Queue`, `SizedQueue` |
| Resource cleanup | Ensure proper shutdown | Join threads, close resources |
| Exception isolation | Per-worker error handling | `rescue` in worker loops |
| Memory visibility | Proper synchronization | Use mutexes or atomic classes |
Monitoring Metrics
| Metric Category | Key Indicators | Collection Method |
|---|---|---|
| Throughput | Jobs submitted/completed/failed per second | Counter increments |
| Latency | Job processing time, queue wait time | Timer measurements |
| Queue Health | Queue size, utilization percentage | Gauge snapshots |
| Worker Status | Active workers, thread lifecycle events | State tracking |
| Error Rates | Failure percentage, retry counts | Error counters |
| Resource Usage | Memory, CPU, thread count | System metrics |