Thread Pool Patterns

Thread pool patterns in Ruby provide controlled execution of concurrent tasks by managing a fixed number of worker threads that process jobs from a shared queue.


Overview

Thread pools address the overhead of creating and destroying threads for each task by maintaining a collection of reusable worker threads. Ruby implements thread pools through the Thread class combined with thread-safe data structures like Queue and SizedQueue. The pattern separates task submission from task execution, allowing applications to control concurrency levels and resource usage.

Ruby's thread pool implementations typically consist of three components: a work queue that holds pending tasks, a collection of worker threads that consume tasks from the queue, and a coordination mechanism that manages thread lifecycle and graceful shutdown. The Thread class provides the fundamental building blocks, while Queue and SizedQueue handle thread-safe task distribution.

# Queue and SizedQueue are part of core Ruby; no require is needed on modern versions

class BasicThreadPool
  def initialize(size = 4)
    @jobs = Queue.new
    @pool = Array.new(size) do
      Thread.new do
        # Queue#pop blocks until a job arrives; a nil job acts as a poison pill
        while (job = @jobs.pop)
          job.call
        end
      end
    end
  end

  def schedule(&block)
    @jobs << block
  end

  def shutdown
    @pool.size.times { @jobs << nil }  # one poison pill per worker
    @pool.each(&:join)
  end
end

Thread pools excel in scenarios with many short-lived tasks, I/O-bound operations, and situations where thread creation overhead impacts performance. Ruby applications commonly use thread pools for web request processing, background job execution, and parallel data processing tasks.

The Global Interpreter Lock (GIL) in MRI Ruby limits true parallelism for CPU-bound tasks, making thread pools most effective for I/O-bound operations where threads spend time waiting for external resources. Alternative Ruby implementations like JRuby and TruffleRuby provide better parallelism characteristics for CPU-intensive workloads.
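The difference is easy to observe with a rough benchmark: sleeping threads (standing in for blocking I/O) overlap fully, while CPU-bound threads under MRI largely serialize on the lock. A minimal sketch (timings are illustrative and vary by machine):

require 'benchmark'

# I/O-bound stand-in: sleep releases the lock, so the threads overlap
io_time = Benchmark.realtime do
  Array.new(4) { Thread.new { sleep(0.5) } }.each(&:join)
end
puts "Four 0.5s sleeps on four threads: #{io_time.round(2)}s"  # ~0.5s, not ~2s

# CPU-bound stand-in: under MRI only one thread computes at a time
cpu_time = Benchmark.realtime do
  Array.new(4) { Thread.new { 5_000_000.times { Math.sqrt(42) } } }.each(&:join)
end
puts "CPU-bound loops on four threads: #{cpu_time.round(2)}s"  # close to serial time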

Basic Usage

Creating a thread pool starts with defining the pool size and initializing worker threads. The pool size determines the maximum number of concurrent tasks and should balance system resources against desired parallelism. A typical implementation uses a Queue to distribute work and maintains worker threads in a loop that processes tasks until shutdown.

class WorkerPool
  def initialize(size = 4)
    @size = size
    @jobs = Queue.new
    @shutdown = false
    @workers = []
    
    create_workers
  end
  
  def submit(&block)
    raise "Pool is shut down" if @shutdown
    @jobs << block
  end
  
  def shutdown
    @shutdown = true
    @size.times { @jobs << :shutdown }  # one shutdown signal per worker
    @workers.each(&:join)
  end
  
  private
  
  def create_workers
    @size.times do |i|
      @workers << Thread.new do
        loop do
          job = @jobs.pop
          break if job == :shutdown
          
          begin
            job.call
          rescue => e
            puts "Worker #{i} error: #{e.message}"
          end
        end
      end
    end
  end
end

Task submission involves adding callable objects to the work queue. The submit method accepts blocks that worker threads execute asynchronously. Each worker thread runs a continuous loop, popping jobs from the queue and executing them until receiving a shutdown signal.

pool = WorkerPool.new(3)

# Submit various types of work
5.times do |i|
  pool.submit do
    puts "Processing task #{i} on #{Thread.current}"
    sleep(rand(1..3))
    puts "Completed task #{i}"
  end
end

# Process some I/O operations
urls = %w[http://example.com http://google.com http://github.com]
urls.each do |url|
  pool.submit do
    # Simulated HTTP request
    sleep(0.5)
    puts "Fetched #{url}"
  end
end

# Wait for all submitted jobs to finish, then stop the workers
pool.shutdown

Graceful shutdown requires signaling worker threads to stop processing new jobs and waiting for current jobs to complete. The shutdown process typically involves stopping job submission, sending shutdown signals to workers, and joining worker threads to ensure completion.

class ManagedThreadPool
  def initialize(size = 4)
    @size = size
    @jobs = Queue.new
    @shutdown = false
    @workers = create_workers
  end
  
  def submit(&block)
    raise "Pool is shutdown" if @shutdown
    @jobs << block
  end
  
  def shutdown
    @shutdown = true
    
    # Send shutdown signal to each worker
    @size.times { @jobs << :shutdown }
    
    # Wait for all workers to complete
    @workers.each(&:join)
  end
  
  def shutdown!
    # Forceful shutdown: kills workers immediately, abandoning in-flight jobs
    @shutdown = true
    @workers.each(&:kill)
  end
  
  private
  
  def create_workers
    Array.new(@size) do
      Thread.new do
        while job = @jobs.pop
          break if job == :shutdown
          job.call rescue nil  # swallow job errors so the worker survives
        end
      end
    end
  end
end

The SizedQueue class provides backpressure by limiting the number of queued jobs, preventing memory issues when job submission outpaces processing. This bounded queue blocks submission when full, creating natural throttling.

class BoundedThreadPool
  def initialize(size = 4, max_queue = 10)
    @size = size
    @jobs = SizedQueue.new(max_queue)
    @workers = Array.new(size) do
      Thread.new do
        while job = @jobs.pop
          break if job == :shutdown
          job.call rescue nil
        end
      end
    end
  end
  
  def submit(&block)
    @jobs << block  # Blocks when queue is full
  end
  
  def shutdown
    @size.times { @jobs << :shutdown }
    @workers.each(&:join)
  end
end
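To see the throttling, submit faster than the workers can drain the queue; once the queue fills, submit visibly pauses until a slot frees. A small sketch against the class above (timings are illustrative):

pool = BoundedThreadPool.new(2, 3)  # two workers, queue capacity three

10.times do |i|
  started = Time.now
  pool.submit { sleep(0.2) }  # each job takes ~200ms
  waited = ((Time.now - started) * 1000).round
  puts "submitted job #{i} after waiting #{waited}ms"
end

pool.shutdown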

Thread Safety & Concurrency

Thread pools require careful synchronization to prevent race conditions and ensure data integrity. Ruby's Queue and SizedQueue classes provide thread-safe operations, but shared state between tasks needs explicit synchronization using mutexes, condition variables, or other primitives.

Worker threads access shared resources concurrently, creating potential race conditions when multiple threads modify the same data. The Mutex class provides mutual exclusion, ensuring only one thread accesses protected code sections at a time.

class SafeCounter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end
  
  def increment
    @mutex.synchronize do
      @count += 1
    end
  end
  
  def value
    @mutex.synchronize { @count }
  end
end

# Thread-safe usage in pool
class ThreadSafePool
  def initialize(size = 4)
    @jobs = Queue.new
    @results = SafeCounter.new
    @workers = create_workers(size)
  end
  
  def process_batch(items)
    items.each do |item|
      submit do
        process_item(item)
        @results.increment
      end
    end
  end
  
  def completed
    @results.value
  end
  
  private
  
  def submit(&block)
    @jobs << block
  end
  
  def create_workers(size)
    Array.new(size) do
      Thread.new do
        while job = @jobs.pop
          break if job == :shutdown
          job.call
        end
      end
    end
  end
  
  def process_item(item)
    # Simulate processing work
    sleep(0.1)
    item.upcase
  end
end

Condition variables coordinate thread execution when workers need to wait for specific conditions. The ConditionVariable class works with mutexes to implement waiting and signaling patterns common in producer-consumer scenarios.

class ProducerConsumerPool
  def initialize(worker_count = 3)
    @buffer = []
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @running = true
    
    @workers = Array.new(worker_count) do
      Thread.new { worker_loop }
    end
  end
  
  def add_work(item)
    @mutex.synchronize do
      @buffer << item
      @condition.signal
    end
  end
  
  def shutdown
    @mutex.synchronize do
      @running = false
      @condition.broadcast
    end
    @workers.each(&:join)
  end
  
  private
  
  def worker_loop
    loop do
      item = nil
      
      @mutex.synchronize do
        while @buffer.empty? && @running
          @condition.wait(@mutex)
        end
        
        item = @buffer.shift
      end
      
      # A nil item means shutdown arrived while the buffer was empty.
      # (Note: break inside the synchronize block would only exit that
      # block, not this loop, so the stop condition is checked out here.
      # Buffered items are still drained after shutdown is requested.)
      return if item.nil?
      
      process_work(item)
    end
  end
  
  def process_work(item)
    puts "Processing #{item} on #{Thread.current}"
    sleep(rand(0.5..1.5))
  end
end
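Brief usage: queue some items, then shut down. With the loop above, workers drain whatever remains in the buffer before their threads exit:

pool = ProducerConsumerPool.new(2)

10.times { |i| pool.add_work("item-#{i}") }

# Broadcast wakes all waiting workers; buffered items drain before join returns
pool.shutdown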

Atomic operations prevent race conditions for simple numeric updates without explicit locking. Core Ruby does not guarantee atomicity for read-modify-write operations like +=, even under MRI's GVL, so portable code protects shared counters with a mutex or uses the concurrent-ruby gem's atomic classes such as Concurrent::AtomicFixnum and Concurrent::AtomicReference.

class AtomicPool
  def initialize(size = 4)
    @completed_jobs = 0
    @jobs = Queue.new
    @completion_mutex = Mutex.new
    
    @workers = Array.new(size) do
      Thread.new { worker_loop }
    end
  end
  
  def submit(&job)
    @jobs << job
  end
  
  def completed_count
    @completion_mutex.synchronize { @completed_jobs }
  end
  
  private
  
  def worker_loop
    while job = @jobs.pop
      break if job == :shutdown
      
      begin
        job.call
        @completion_mutex.synchronize { @completed_jobs += 1 }
      rescue => e
        handle_error(e)
      end
    end
  end
  
  def handle_error(error)
    warn "Job failed: #{error.class}: #{error.message}"
  end
end
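If the concurrent-ruby gem is available, the counter-plus-mutex pair can be replaced with an atomic integer. A minimal sketch of that variant (assumes gem install concurrent-ruby):

require 'concurrent'

# Concurrent::AtomicFixnum makes increment and value atomic without a lock
completed = Concurrent::AtomicFixnum.new(0)

workers = Array.new(4) do
  Thread.new do
    250.times { completed.increment }
  end
end
workers.each(&:join)

puts completed.value  # => 1000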

Thread-local storage isolates data per thread, preventing conflicts when workers need thread-specific state. Ruby's Thread#[] and Thread#[]= provide simple per-thread storage (strictly speaking, the values are fiber-local), while Thread#thread_variable_get and Thread#thread_variable_set offer true thread-local variables.

class ThreadLocalPool
  def initialize(size = 4)
    @jobs = Queue.new
    @workers = Array.new(size) do |i|
      Thread.new do
        Thread.current[:worker_id] = i
        Thread.current[:processed] = 0
        worker_loop
      end
    end
  end
  
  def submit(&block)
    @jobs << block
  end
  
  def worker_stats
    @workers.map do |worker|
      {
        id: worker[:worker_id],
        processed: worker[:processed],
        alive: worker.alive?
      }
    end
  end
  
  private
  
  def worker_loop
    while job = @jobs.pop
      break if job == :shutdown
      
      job.call
      Thread.current[:processed] += 1
    end
  end
end
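Usage sketch: submit some work, give the workers a moment, then read the per-thread counters from outside (Thread#[] permits cross-thread reads; the exact counts vary run to run):

pool = ThreadLocalPool.new(2)
20.times { pool.submit { sleep(0.05) } }
sleep(1)  # let the workers make progress

pp pool.worker_stats
# e.g. [{id: 0, processed: 11, alive: true}, {id: 1, processed: 9, alive: true}]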

Performance & Memory

Thread pool performance depends on pool size, task characteristics, and system resources. The optimal pool size balances resource usage with throughput, typically ranging from the number of CPU cores for CPU-bound tasks to higher values for I/O-bound operations. Benchmarking different pool sizes reveals performance characteristics for specific workloads.

Memory usage in thread pools includes thread stack space, queued job objects, and shared data structures. Each thread reserves stack space (on the order of 1 MB per thread in MRI by default), making large pools memory-intensive. The job queue grows with submitted tasks, requiring monitoring in high-throughput scenarios.
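A rough way to gauge per-thread overhead is to compare the process's resident set size before and after spawning idle threads. A sketch assuming a POSIX system where ps -o rss= reports kilobytes (reserved stack pages are not resident until touched, so this underestimates the reservation):

before = `ps -o rss= -p #{$$}`.to_i
threads = Array.new(50) { Thread.new { sleep } }
sleep(0.5)  # give the threads time to start
after = `ps -o rss= -p #{$$}`.to_i

puts "~#{(after - before) / 50} KB resident per thread"
threads.each(&:kill)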

require 'benchmark'

class BenchmarkedPool
  def initialize(size)
    @size = size
    @jobs = Queue.new
    @completed = 0
    @mutex = Mutex.new
    @workers = create_workers
  end
  
  def self.benchmark_sizes(work_count = 1000)
    sizes = [1, 2, 4, 8, 16, 32]
    
    sizes.each do |size|
      time = Benchmark.measure do
        pool = new(size)
        
        work_count.times do |i|
          pool.submit do
            # Simulate I/O work
            sleep(0.01)
            Math.sqrt(i)
          end
        end
        
        pool.shutdown
      end
      
      puts "Pool size #{size}: #{time.real.round(2)}s"
    end
  end
  
  def submit(&block)
    @jobs << block
  end
  
  def shutdown
    @size.times { @jobs << :shutdown }
    @workers.each(&:join)
  end
  
  private
  
  def create_workers
    Array.new(@size) do
      Thread.new do
        while job = @jobs.pop
          break if job == :shutdown
          
          job.call
          @mutex.synchronize { @completed += 1 }
        end
      end
    end
  end
end

GC pressure increases with thread pools due to frequent object creation and cross-thread references. Jobs submitted as blocks create closure objects, while results and intermediate objects add to memory pressure. Monitoring GC statistics reveals the impact of threading patterns.

class MemoryAwarePool
  def initialize(size = 4, gc_interval = 100)
    @size = size
    @jobs = Queue.new
    @processed = 0
    @mutex = Mutex.new  # guards the shared @processed counter
    @gc_interval = gc_interval
    @workers = create_workers
  end
  
  def submit(&block)
    @jobs << block
  end
  
  def stats
    {
      processed: @mutex.synchronize { @processed },
      queue_size: @jobs.size,
      gc_count: GC.count,
      memory_usage: `ps -o rss= -p #{$$}`.to_i  # resident KB on POSIX systems
    }
  end
  
  private
  
  def create_workers
    Array.new(@size) do
      Thread.new do
        processed = 0  # per-worker count, no lock needed
        
        while job = @jobs.pop
          break if job == :shutdown
          
          job.call
          processed += 1
          @mutex.synchronize { @processed += 1 }  # shared count needs the lock
          
          # Periodic GC to manage memory
          GC.start if processed % @gc_interval == 0
        end
      end
    end
  end
end

Queue sizing affects memory usage and system behavior under load. Unbounded queues consume memory proportional to submission rate minus processing rate, potentially causing memory exhaustion. Bounded queues provide backpressure but may block submission threads.

class AdaptivePool
  def initialize(size = 4, initial_queue_size = 10)
    @size = size
    @jobs = SizedQueue.new(initial_queue_size)
    @queue_size = initial_queue_size
    @workers = create_workers
    @stats_mutex = Mutex.new
    @submission_times = []
  end
  
  def submit(&block)
    start_time = Time.now
    
    begin
      # Non-blocking push raises ThreadError when the queue is full;
      # a plain << would block instead of raising
      @jobs.push(block, true)
      record_submission_time(Time.now - start_time)
    rescue ThreadError => e
      # Queue full - could implement retry logic or adaptive resizing
      raise "Pool queue full: #{e.message}"
    end
  end
  
  def queue_utilization
    (@jobs.size.to_f / @jobs.max).round(2)
  end
  
  private
  
  def create_workers
    Array.new(@size) do
      Thread.new do
        while job = @jobs.pop
          break if job == :shutdown
          job.call rescue nil
        end
      end
    end
  end
  
  def record_submission_time(duration)
    @stats_mutex.synchronize do
      @submission_times << duration
      @submission_times = @submission_times.last(100)  # Keep recent history
    end
  end
  
  def avg_submission_time
    @stats_mutex.synchronize do
      return 0 if @submission_times.empty?
      @submission_times.sum / @submission_times.size
    end
  end
end
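Usage sketch showing both the rejection path and the utilization reading (sleep time and counts are illustrative):

pool = AdaptivePool.new(2, 5)

20.times do |i|
  begin
    pool.submit { sleep(0.1) }
  rescue RuntimeError => e
    puts "job #{i} rejected: #{e.message}"
  end
end

puts "queue utilization: #{pool.queue_utilization}"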

Production Patterns

Production thread pools require robust error handling, monitoring, graceful degradation, and operational visibility. Applications typically implement circuit breakers, health checks, and metrics collection to maintain reliability under varying load conditions.

Error isolation prevents individual job failures from affecting other tasks or worker threads. Each worker catches and handles exceptions locally, logging errors while continuing to process subsequent jobs. Dead letter queues capture failed jobs for retry or analysis.

require 'logger'

class ProductionPool
  def initialize(size = 4, max_retries = 3)
    @size = size
    @max_retries = max_retries
    @jobs = Queue.new
    @failed_jobs = Queue.new
    @stats = ThreadSafeStats.new
    @logger = Logger.new(STDOUT)
    @workers = create_workers
    @monitor_thread = create_monitor
  end
  
  def submit(job_data, &block)
    job = Job.new(job_data, block, @max_retries)
    @jobs << job
    @stats.increment(:submitted)
  end
  
  def failed_jobs_count
    @failed_jobs.size
  end
  
  def health_check
    {
      active_workers: @workers.count(&:alive?),
      queue_size: @jobs.size,
      failed_jobs: failed_jobs_count,
      stats: @stats.snapshot
    }
  end
  
  private
  
  class Job
    attr_reader :data, :block, :retries_left, :created_at
    
    def initialize(data, block, max_retries)
      @data = data
      @block = block
      @retries_left = max_retries
      @created_at = Time.now
    end
    
    def execute
      @block.call(@data)
    end
    
    def retry?
      @retries_left > 0
    end
    
    def decrement_retries
      @retries_left -= 1
    end
  end
  
  def create_workers
    Array.new(@size) do |i|
      Thread.new do
        @logger.info "Worker #{i} started"
        
        while job = @jobs.pop
          break if job == :shutdown
          
          process_job(job, i)
        end
        
        @logger.info "Worker #{i} shutting down"
      end
    end
  end
  
  def process_job(job, worker_id)
    start_time = Time.now
    
    begin
      job.execute
      @stats.increment(:completed)
      @stats.record_duration(Time.now - start_time)
    rescue => e
      @logger.error "Worker #{worker_id} job failed: #{e.message}"
      @logger.error e.backtrace.join("\n")
      
      @stats.increment(:failed)
      
      if job.retry?
        job.decrement_retries
        @jobs << job
        @stats.increment(:retried)
      else
        @failed_jobs << job
      end
    end
  end
  
  def create_monitor
    Thread.new do
      loop do
        sleep(60)
        report_metrics
      end
    end
  end
  
  def report_metrics
    health = health_check
    @logger.info "Pool health: #{health}"
    
    # Replace workers that died
    @workers.each_with_index do |worker, i|
      unless worker.alive?
        @logger.warn "Replacing dead worker #{i}"
        @workers[i] = create_replacement_worker(i)
      end
    end
  end
  
  def create_replacement_worker(i)
    Thread.new do
      @logger.info "Replacement worker #{i} started"
      
      while job = @jobs.pop
        break if job == :shutdown
        
        process_job(job, i)
      end
    end
  end
end

class ThreadSafeStats
  def initialize
    @data = {}
    @mutex = Mutex.new
  end
  
  def increment(key)
    @mutex.synchronize do
      @data[key] = (@data[key] || 0) + 1
    end
  end
  
  def record_duration(duration)
    @mutex.synchronize do
      @data[:total_duration] = (@data[:total_duration] || 0) + duration
      @data[:duration_count] = (@data[:duration_count] || 0) + 1
    end
  end
  
  def snapshot
    @mutex.synchronize { @data.dup }
  end
end
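Usage sketch: submit pairs a payload with its processing block, and health_check returns a point-in-time snapshot. The user-ID payload and failure rate here are made up for illustration:

pool = ProductionPool.new(4)

10.times do |n|
  pool.submit(n) do |user_id|
    raise "transient error" if rand < 0.2  # fails ~20% of the time, then retries
    puts "processed user #{user_id}"
  end
end

sleep(1)
pp pool.health_check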

Circuit breaker patterns prevent cascading failures when external dependencies become unavailable. The pool tracks failures and temporarily rejects new submissions once they cross a threshold. (The example below keeps a single global circuit; a per-job-type variant would track state in a hash keyed by job type.)

class CircuitBreakerPool
  CIRCUIT_STATES = %i[closed open half_open].freeze
  
  def initialize(size = 4, failure_threshold = 5, timeout = 60)
    @size = size
    @failure_threshold = failure_threshold
    @timeout = timeout
    @circuit_state = :closed
    @failure_count = 0
    @last_failure_time = nil
    @mutex = Mutex.new
    
    @jobs = Queue.new
    @workers = create_workers
  end
  
  def submit(job_type, &block)
    if circuit_open?(job_type)
      raise CircuitOpenError, "Circuit breaker open for #{job_type}"
    end
    
    @jobs << [job_type, block]
  end
  
  private
  
  class CircuitOpenError < StandardError; end
  
  def circuit_open?(job_type)
    @mutex.synchronize do
      case @circuit_state
      when :closed
        false
      when :open
        if Time.now - @last_failure_time > @timeout
          @circuit_state = :half_open
          false
        else
          true
        end
      when :half_open
        false
      end
    end
  end
  
  def record_success
    @mutex.synchronize do
      @failure_count = 0
      @circuit_state = :closed if @circuit_state == :half_open
    end
  end
  
  def record_failure
    @mutex.synchronize do
      @failure_count += 1
      @last_failure_time = Time.now
      
      if @failure_count >= @failure_threshold
        @circuit_state = :open
      end
    end
  end
  
  def create_workers
    Array.new(@size) do
      Thread.new do
        while job_data = @jobs.pop
          break if job_data == :shutdown
          
          job_type, block = job_data
          
          begin
            block.call
            record_success
          rescue
            # Record the failure but keep going; re-raising here would
            # kill the worker thread
            record_failure
          end
        end
      end
    end
  end
end
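Callers handle the open circuit explicitly. A sketch where fetch_remote_data and serve_cached_response are hypothetical application helpers and :http_fetch is an arbitrary label:

pool = CircuitBreakerPool.new(2, 3, 30)

begin
  pool.submit(:http_fetch) { fetch_remote_data }
rescue CircuitBreakerPool::CircuitOpenError
  serve_cached_response  # fall back while the circuit is open
end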

Integration with monitoring systems provides operational visibility into thread pool behavior. Metrics collection tracks throughput, latency, error rates, and resource utilization for performance analysis and alerting.

class MonitoredPool
  def initialize(size = 4, metrics_client = nil)
    @size = size
    @metrics = metrics_client || SimpleMetrics.new
    @jobs = Queue.new
    @workers = create_workers
    @start_time = Time.now
    
    setup_periodic_reporting
  end
  
  def submit(&block)
    @metrics.increment('pool.jobs.submitted')
    @jobs << block
  end
  
  def shutdown
    @size.times { @jobs << :shutdown }
    @workers.each(&:join)
    @metrics.gauge('pool.status', 'shutdown')
  end
  
  private
  
  def create_workers
    Array.new(@size) do |i|
      Thread.new do
        @metrics.increment('pool.workers.started')
        
        while job = @jobs.pop
          break if job == :shutdown
          process_with_metrics(job)
        end
        
        @metrics.increment('pool.workers.stopped')
      end
    end
  end
  
  def process_with_metrics(job)
    start_time = Time.now
    
    begin
      job.call
      @metrics.increment('pool.jobs.completed')
      @metrics.timing('pool.job.duration', Time.now - start_time)
    rescue
      # Count the failure but keep the worker alive; re-raising here
      # would terminate the worker thread
      @metrics.increment('pool.jobs.failed')
    end
  end
  
  def setup_periodic_reporting
    Thread.new do
      loop do
        sleep(30)
        report_pool_metrics
      end
    end
  end
  
  def report_pool_metrics
    @metrics.gauge('pool.queue.size', @jobs.size)
    @metrics.gauge('pool.workers.alive', @workers.count(&:alive?))
    @metrics.gauge('pool.uptime', Time.now - @start_time)
  end
end

class SimpleMetrics
  def initialize
    @data = Hash.new(0)
    @mutex = Mutex.new
  end
  
  def increment(key, value = 1)
    @mutex.synchronize { @data[key] += value }
  end
  
  def gauge(key, value)
    @mutex.synchronize { @data[key] = value }
  end
  
  def timing(key, duration)
    gauge("#{key}.ms", (duration * 1000).round(2))
  end
  
  def snapshot
    @mutex.synchronize { @data.dup }
  end
end
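Wiring the two together: submit some work, shut the pool down, then inspect the accumulated counters:

metrics = SimpleMetrics.new
pool = MonitoredPool.new(4, metrics)

100.times { pool.submit { sleep(0.01) } }
pool.shutdown

pp metrics.snapshot
# e.g. {"pool.jobs.submitted"=>100, "pool.workers.started"=>4, ...}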

Reference

Core Classes and Methods

| Class | Purpose | Key Methods |
|---|---|---|
| Thread | Creates and manages individual threads | #new, #join, #kill, #alive?, #value |
| Queue | Thread-safe FIFO queue for job distribution | #push (<<), #pop, #size, #empty? |
| SizedQueue | Bounded queue with backpressure | #max, #max=, inherits Queue methods |
| Mutex | Mutual exclusion lock | #synchronize, #lock, #unlock, #owned? |
| ConditionVariable | Thread coordination primitive | #wait, #signal, #broadcast |

Thread Pool Implementation Patterns

| Pattern | Use Case | Implementation |
|---|---|---|
| Basic Pool | Simple task execution | Fixed workers + unbounded queue |
| Bounded Pool | Memory-conscious processing | Fixed workers + SizedQueue |
| Adaptive Pool | Variable load handling | Dynamic worker adjustment |
| Priority Pool | Task prioritization | Multiple queues by priority |
| Circuit Breaker Pool | Fault tolerance | Failure tracking + circuit states |

Queue Types and Characteristics

| Queue Type | Thread Safety | Blocking Behavior | Memory Usage |
|---|---|---|---|
| Queue | Yes | Blocks on pop when empty | Unbounded |
| SizedQueue | Yes | Blocks on push when full, pop when empty | Bounded |
| Array | No | Non-blocking | Dynamic |
| Custom implementations | Depends | Configurable | Varies |

Worker Thread Lifecycle States

| State | Description | Transitions |
|---|---|---|
| new | Thread created but not started | run |
| run | Actively processing jobs | sleep, stop |
| sleep | Waiting for jobs or conditions | run, stop |
| stop | Gracefully shutting down | dead |
| dead | Thread terminated | Final state |

Common Configuration Options

| Option | Default | Range | Impact |
|---|---|---|---|
| Pool Size | 4 | 1-100+ | Concurrency level, memory usage |
| Queue Size | Unlimited | 1-10000+ | Memory usage, backpressure |
| Timeout | None | 1s-300s | Responsiveness, resource cleanup |
| Max Retries | 0 | 0-10 | Fault tolerance, processing time |

Error Handling Strategies

| Strategy | Implementation | Pros | Cons |
|---|---|---|---|
| Ignore | rescue nil | Simple, fast | Silent failures |
| Log Only | rescue => e; log(e) | Visible errors | No recovery |
| Retry | Requeue failed jobs | Fault tolerance | Potential loops |
| Dead Letter | Separate failed job queue | Analysis capability | Additional complexity |
| Circuit Breaker | Failure threshold tracking | Prevents cascading failures | Complex logic |

Performance Tuning Guidelines

| Metric | Measurement | Optimization Strategy |
|---|---|---|
| Throughput | Jobs/second | Increase pool size, optimize job logic |
| Latency | Time to completion | Reduce queue wait time, faster processing |
| Memory | RSS, heap usage | Bound queues, periodic GC |
| CPU Usage | System load | Balance pool size with available cores |
| I/O Wait | Thread blocking time | Increase pool size for I/O-bound tasks |

Thread Safety Checklist

| Concern | Solution | Implementation |
|---|---|---|
| Shared mutable state | Mutex synchronization | @mutex.synchronize { } |
| Atomic operations | Thread-safe primitives | Queue, SizedQueue |
| Resource cleanup | Ensure proper shutdown | Join threads, close resources |
| Exception isolation | Per-worker error handling | rescue in worker loops |
| Memory visibility | Proper synchronization | Use mutexes or atomic classes |

Monitoring Metrics

| Metric Category | Key Indicators | Collection Method |
|---|---|---|
| Throughput | Jobs submitted/completed/failed per second | Counter increments |
| Latency | Job processing time, queue wait time | Timer measurements |
| Queue Health | Queue size, utilization percentage | Gauge snapshots |
| Worker Status | Active workers, thread lifecycle events | State tracking |
| Error Rates | Failure percentage, retry counts | Error counters |
| Resource Usage | Memory, CPU, thread count | System metrics |