
CrackedRuby

Thread Creation and Management

This guide covers creating, managing, and synchronizing threads in Ruby for concurrent programming.


Overview

Ruby implements threads through the Thread class. In CRuby (the reference implementation), each Ruby thread is backed by a native OS thread and the scheduler switches between threads preemptively, but the Global VM Lock (GVL, often called the Global Interpreter Lock or GIL) allows only one thread to execute Ruby code at a time. Threads still run concurrently while waiting on I/O, because a blocked thread releases the lock.

The Thread class handles thread lifecycle, state management, and synchronization primitives. Ruby creates threads using Thread.new or Thread.start, passing a block containing the code to execute. Each thread maintains its own stack and local variables while sharing the process heap.

# Basic thread creation
thread = Thread.new do
  puts "Thread executing: #{Thread.current.object_id}"
end
thread.join
# prints something like "Thread executing: 8" (the object_id varies)

Thread#status reports a thread's state: "run" (running or runnable), "sleep" (sleeping or blocked waiting), "aborting" (being killed), false (terminated normally), or nil (terminated by an exception). The Thread.current method returns the currently executing thread, while Thread.main returns the main program thread.

# Thread state inspection
main_thread = Thread.main
current_thread = Thread.current
puts main_thread == current_thread  # => true (in main thread)
puts Thread.current.status  # => "run"
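A short sketch of the status values a thread passes through, assuming CRuby 2.5 or later (for report_on_exception). The variable names are illustrative.

```ruby
sleeper = Thread.new { sleep }   # sleeps until killed
ok      = Thread.new { :done }   # finishes normally
failed  = Thread.new do
  Thread.current.report_on_exception = false # skip the stderr report
  raise "boom"
end

Thread.pass until sleeper.status == "sleep"
p sleeper.status # => "sleep"

ok.join
begin
  failed.join # re-raises the thread's RuntimeError in the caller
rescue RuntimeError => e
  puts "failed thread raised: #{e.message}"
end

p ok.status     # => false (terminated normally)
p failed.status # => nil (terminated with an exception)

sleeper.kill
sleeper.join
```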

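Per-thread data is stored with Thread.current[key] syntax. Strictly speaking, these variables are fiber-local: each fiber running in a thread sees its own set. For values shared by every fiber of a thread, use Thread#thread_variable_get and Thread#thread_variable_set. Ruby provides synchronization primitives through the Mutex, ConditionVariable, Queue, and SizedQueue classes.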

# Thread-local storage
Thread.current[:user_id] = 12345
Thread.new do
  Thread.current[:user_id] = 67890
  puts Thread.current[:user_id]  # => 67890
end.join
puts Thread.current[:user_id]  # => 12345
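Because Thread#[] storage is fiber-local, a value set there is not visible from another fiber running on the same thread, while thread_variable_set values are. A minimal sketch; the :request_id and :tenant keys are hypothetical.

```ruby
Thread.current[:request_id] = "abc"                # fiber-local
Thread.current.thread_variable_set(:tenant, "acme") # truly thread-local

seen = Fiber.new do
  # A new fiber starts with empty Thread#[] storage...
  [Thread.current[:request_id], Thread.current.thread_variable_get(:tenant)]
end.resume

p seen # => [nil, "acme"]
```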

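An unhandled exception terminates only the thread that raised it. Since Ruby 2.5, Thread.report_on_exception defaults to true, so the exception is also printed to stderr, and it is re-raised in whichever thread later calls join or value on the dead thread. Setting Thread.abort_on_exception = true (globally, or per thread via Thread#abort_on_exception=) instead re-raises it in the main thread, aborting the program.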
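A minimal sketch of the default behavior, in which the exception is kept in the dead thread and re-raised by join. The per-thread report_on_exception = false only silences the stderr report so that the caller can handle the error itself.

```ruby
caught = nil

worker = Thread.new do
  Thread.current.report_on_exception = false # handled via join instead
  raise ArgumentError, "bad input"
end

begin
  worker.join # re-raises the worker's ArgumentError here
rescue ArgumentError => e
  caught = e.message
  puts "Worker failed: #{caught}" # prints "Worker failed: bad input"
end
```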

Basic Usage

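Creating a thread requires passing a block to Thread.new or Thread.start (omitting it raises ThreadError). The block contains the code to execute in the new thread. The new thread is runnable as soon as it is created, but the scheduler decides when it actually starts running relative to the caller.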

# Thread creation and execution
thread = Thread.new do
  5.times do |i|
    puts "Working: #{i}"
    sleep(0.1)
  end
  "completion_result"
end

# Continue main thread work
puts "Main thread continues"

# Wait for the thread to finish and fetch the block's return value
result = thread.value
puts "Thread returned: #{result}"  # => Thread returned: completion_result

The join method blocks the calling thread until the target thread finishes and returns the thread itself (or nil if a timeout expires first). To get the block's last expression, call value, which joins if necessary and then returns the result. Calling join or value on a thread that has already finished returns immediately.
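The difference between join and value, and the timeout behavior of join, can be sketched as follows (the sleep durations are arbitrary).

```ruby
slow = Thread.new do
  sleep(0.5)
  :finished
end

early = slow.join(0.05) # timeout expired: returns nil, the thread keeps running
p early                 # => nil

result = slow.value     # waits for completion, then returns the block's result
p result                # => :finished

p slow.join.equal?(slow) # => true (join returns the thread itself)
```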

# Multiple threads with join
threads = []

3.times do |i|
  threads << Thread.new do
    sleep(rand * 0.5)  # random delay between 0 and 0.5 seconds
    "Thread #{i} completed"
  end
end

# Wait for all threads and collect their return values
results = threads.map(&:value)
p results
# => ["Thread 0 completed", "Thread 1 completed", "Thread 2 completed"]

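Ruby threads do not need to be detached: Thread has no detach method (Process.detach is for child processes), and a thread that is never joined simply runs to completion and is garbage collected once nothing references it. Two caveats apply to such fire-and-forget threads: when the main thread exits, the program ends and all other threads are killed, and an exception in an unjoined thread surfaces only through report_on_exception.

# Fire-and-forget background thread (collect_metrics, send_to_analytics,
# and process_user_request stand in for application code)
Thread.new do
  log_data = collect_metrics
  send_to_analytics(log_data)
end

# Main thread continues without waiting
process_user_request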

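A thread terminates when its block finishes, when it raises an unhandled exception, or when it is stopped explicitly. Thread#kill (aliased as #terminate and #exit) stops another thread, and Thread.exit stops the current one. Ensure clauses still run, but the target can be interrupted between any two operations, possibly leaving shared state half-updated, so a cooperative stop flag that the thread checks at safe points is usually the better choice.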

# Thread termination patterns
worker = Thread.new do
  loop do
    break if Thread.current[:stop_requested]
    perform_work
    sleep(1)
  end
end

# Graceful shutdown
worker[:stop_requested] = true
worker.join

# Force termination (avoid in production)
# worker.kill
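Even a forced kill runs the thread's ensure clauses, which is the only cleanup hook a killed thread gets. A minimal sketch using a thread that sleeps until it is killed:

```ruby
cleaned_up = false

victim = Thread.new do
  begin
    sleep # wait until killed
  ensure
    cleaned_up = true # still runs when the thread is killed
  end
end

Thread.pass until victim.status == "sleep"
victim.kill
victim.join
p cleaned_up # => true
```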

Ruby supports passing arguments to thread blocks: the values given to Thread.new are evaluated in the calling thread and yielded to the block as block parameters. The thread receives references to the same objects, not copies, so a mutable argument is still shared with the caller.

# Thread arguments
def process_data(data, options = {})
  Thread.new(data, options) do |thread_data, thread_options|
    # thread_data and thread_options reference the caller's objects
    multiplier = thread_options.fetch(:multiplier, 1)
    result = thread_data.map { |item| item * multiplier }
    thread_options[:callback]&.call(result)
    result
  end
end

data = [1, 2, 3, 4, 5]
thread = process_data(data, multiplier: 2)
p thread.value  # => [2, 4, 6, 8, 10]
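Because Thread.new passes references rather than copies, a thread that mutates an argument changes the caller's object. A small sketch with a hypothetical settings hash; passing settings.dup (or a frozen object) would avoid the sharing.

```ruby
settings = { retries: 1 }

t = Thread.new(settings) do |opts|
  opts[:retries] += 1   # mutates the caller's hash
  opts.equal?(settings) # same object, not a copy
end

p t.value            # => true
p settings[:retries] # => 2
```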

Thread Safety & Concurrency

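Ruby's Global Interpreter Lock prevents Ruby code in multiple threads from running in parallel, so CPU-bound work gains nothing from extra threads, while threads waiting on I/O do overlap. The GIL does not make code thread-safe: the scheduler can switch threads between any two operations, so shared data structures still need explicit coordination.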

The Mutex class provides mutual exclusion for critical sections. Ruby blocks other threads attempting to acquire the same mutex until the holding thread releases it.

# Mutex for shared resource protection
class Counter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end
  
  def increment
    @mutex.synchronize do
      current = @count
      sleep(0.001)  # Simulate processing
      @count = current + 1
    end
  end
  
  def value
    @mutex.synchronize { @count }
  end
end

counter = Counter.new
threads = 10.times.map do
  Thread.new { 100.times { counter.increment } }
end

threads.each(&:join)
puts counter.value  # => 1000 (without mutex, often less)
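Mutex is not reentrant: locking it again from the thread that already holds it raises ThreadError instead of deadlocking silently. When code that holds a lock needs to call other code that takes the same lock, the standard library's Monitor class, which is reentrant, fits better. A minimal sketch:

```ruby
require 'monitor'

mutex = Mutex.new
recursive_error = false
begin
  mutex.synchronize { mutex.synchronize { :unreachable } }
rescue ThreadError
  recursive_error = true # the inner lock attempt raises
end
p recursive_error # => true

monitor = Monitor.new
depth = monitor.synchronize { monitor.synchronize { 2 } } # re-entry is allowed
p depth # => 2
```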

Race conditions occur when thread execution order affects program behavior. Ruby's thread scheduling is non-deterministic, making race conditions difficult to reproduce consistently.

# Race condition demonstration
shared_array = []
mutex = Mutex.new

threads = 5.times.map do |i|
  Thread.new do
    # Without synchronization - race condition
    100.times do
      # shared_array << i  # Unsafe
      
      # With synchronization - thread safe
      mutex.synchronize do
        shared_array << i
      end
    end
  end
end

threads.each(&:join)
puts shared_array.size  # => 500 (with mutex)

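ConditionVariable coordinates threads that share a mutex. A waiting thread calls wait(mutex), which releases the mutex while it sleeps and reacquires it before returning; another thread calls signal (wake one waiter) or broadcast (wake all waiters) after changing the shared state. Because a woken thread may find the condition false again, always recheck it in a loop, as the Buffer class below does with while.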

# Producer-consumer with condition variable
class Buffer
  def initialize(max_size = 5)
    @items = []
    @max_size = max_size
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
    @not_empty = ConditionVariable.new
  end
  
  def put(item)
    @mutex.synchronize do
      @not_full.wait(@mutex) while @items.size >= @max_size
      @items << item
      @not_empty.signal
    end
  end
  
  def get
    @mutex.synchronize do
      @not_empty.wait(@mutex) while @items.empty?
      item = @items.shift
      @not_full.signal
      item
    end
  end
end

buffer = Buffer.new

# Producer thread
producer = Thread.new do
  10.times do |i|
    buffer.put("item_#{i}")
    puts "Produced: item_#{i}"
  end
end

# Consumer thread
consumer = Thread.new do
  10.times do
    item = buffer.get
    puts "Consumed: #{item}"
  end
end

[producer, consumer].each(&:join)

The Queue and SizedQueue classes provide thread-safe data structures with built-in synchronization. Queue handles unlimited items while SizedQueue blocks producers when capacity is reached.

# Thread-safe queue operations
work_queue = SizedQueue.new(10)

# Worker threads
workers = 3.times.map do |id|
  Thread.new do
    while job = work_queue.pop
      puts "Worker #{id} processing: #{job}"
      sleep(0.1)  # Simulate work
    end
  end
end

# Producer adding work
producer = Thread.new do
  20.times do |i|
    work_queue << "job_#{i}"
  end
  # Signal completion
  3.times { work_queue << nil }
end

[producer, *workers].each(&:join)
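An alternative to nil sentinels is Queue#close (available since Ruby 2.3): once a closed queue is drained, pop returns nil, so workers exit without the producer needing to know how many workers exist. A sketch under that assumption, with doubling as a stand-in for real work:

```ruby
jobs = Queue.new
results = Queue.new

workers = 3.times.map do
  Thread.new do
    # pop blocks while the queue is open and empty; after close it
    # returns nil once drained, which ends the loop
    while (job = jobs.pop)
      results << job * 2
    end
  end
end

10.times { |i| jobs << i }
jobs.close
workers.each(&:join)

begin
  jobs << 99 # pushing to a closed queue raises
rescue ClosedQueueError
  puts "queue is closed"
end

doubled = Array.new(results.size) { results.pop }.sort
p doubled # => [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```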

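Deadlocks occur when threads wait indefinitely for resources held by other waiting threads. Ruby does not prevent them automatically; the usual defenses are acquiring locks in a consistent global order and using non-blocking acquisition (Mutex#try_lock) with back-off. When every thread is blocked, CRuby detects the situation and raises a fatal "No live threads left. Deadlock?" error in the main thread.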

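# Deadlock prevention with consistent lock ordering
class Account
  def initialize(id, balance)
    @id = id
    @balance = balance
    @mutex = Mutex.new
  end

  def transfer(amount, to_account)
    # Mutex is not reentrant, so a self-transfer would raise ThreadError
    return false if to_account.equal?(self)

    # Every transfer locks the two accounts in the same global order
    # (by object_id), so two opposite transfers can never each hold one
    # lock while waiting for the other
    first, second = [self, to_account].sort_by(&:object_id)

    first.mutex.synchronize do
      second.mutex.synchronize do
        return false if @balance < amount

        @balance -= amount
        to_account.deposit(amount)
        true
      end
    end
  end

  protected

  # Visible to other Account instances only, instead of reaching in via
  # instance_variable_get/instance_variable_set
  attr_reader :mutex

  def deposit(amount)
    @balance += amount
  end
end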
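Lock ordering is not the only option. The sketch below swaps in a different technique, try-and-back-off: take one lock, attempt the other with Mutex#try_lock, and release everything and retry if it is busy. The helper name with_both_locks is hypothetical, and under heavy contention this approach can livelock briefly, which the random back-off is meant to break up.

```ruby
# Hypothetical helper: run the block while holding both mutexes,
# acquired in whatever order the caller passes them
def with_both_locks(a, b)
  loop do
    a.synchronize do
      if b.try_lock # non-blocking: false if another thread holds b
        begin
          return yield
        ensure
          b.unlock
        end
      end
    end
    sleep(rand * 0.001) # a was released above; back off briefly, then retry
  end
end

m1 = Mutex.new
m2 = Mutex.new
total = 0

# Opposite acquisition orders would deadlock with plain nested synchronize
threads = [
  Thread.new { 1000.times { with_both_locks(m1, m2) { total += 1 } } },
  Thread.new { 1000.times { with_both_locks(m2, m1) { total += 1 } } }
]
threads.each(&:join)
p total # => 2000
```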

Performance & Memory

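Ruby threads consume memory for their stacks and thread objects. In CRuby each thread reserves a VM stack for Ruby frames and a native machine stack (1 MiB each by default on common 64-bit builds, adjustable through the RUBY_THREAD_VM_STACK_SIZE and RUBY_THREAD_MACHINE_STACK_SIZE environment variables); the operating system usually commits physical memory only for the pages a thread actually touches. Creating a thread also costs an OS thread creation plus Ruby-side initialization.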
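On CRuby the default stack reservations can be inspected at runtime. RubyVM::DEFAULT_PARAMS is CRuby-specific, so treat this as a diagnostic sketch rather than portable code:

```ruby
# CRuby only: default per-thread stack reservations, in bytes
params = RubyVM::DEFAULT_PARAMS
vm_stack = params[:thread_vm_stack_size]           # stack for Ruby frames
machine_stack = params[:thread_machine_stack_size] # native (C) stack

puts "VM stack:      #{vm_stack / 1024} KiB"
puts "Machine stack: #{machine_stack / 1024} KiB"
puts "Live threads:  #{Thread.list.size}"
```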

The Global Interpreter Lock limits CPU-bound threads to one core's worth of Ruby execution. I/O-bound code still benefits from threading, because a thread blocked in I/O (or in sleep) releases the GIL and lets other threads run.

# I/O-bound vs CPU-bound performance comparison
require 'benchmark'
require 'net/http'

# Naive trial-division primality test (deliberately CPU-heavy)
def prime?(n)
  n > 1 && (2...n).none? { |i| (n % i).zero? }
end

# I/O-bound - benefits from threading (waiting threads release the GIL)
def fetch_urls(urls)
  threads = urls.map do |url|
    Thread.new(url) { |thread_url| Net::HTTP.get_response(URI(thread_url)) }
  end
  threads.map(&:value)
end

# CPU-bound - limited by GIL
def calculate_primes(ranges)
  threads = ranges.map do |range|
    Thread.new(range) { |thread_range| thread_range.select { |n| prime?(n) } }
  end
  threads.map(&:value)
end

urls = ['http://example.com'] * 5
ranges = [(1..1000), (1001..2000), (2001..3000)]

Benchmark.bm(15) do |x|
  x.report('Sequential I/O:') { urls.map { |url| Net::HTTP.get_response(URI(url)) } }
  x.report('Threaded I/O:')   { fetch_urls(urls) }

  x.report('Sequential CPU:') { ranges.map { |r| r.select { |n| prime?(n) } } }
  x.report('Threaded CPU:')   { calculate_primes(ranges) }
end

A thread pool reduces creation overhead by reusing a fixed set of threads for many tasks. This amortizes start-up costs across operations and caps how many threads run at once.

# Simple thread pool implementation
class ThreadPool
  def initialize(size)
    @size = size
    @jobs = Queue.new
    @pool = Array.new(size) do
      Thread.new do
        catch(:exit) do
          loop do
            job, args, block = @jobs.pop
            throw :exit if job == :exit
            block.call(*args) if block
          end
        end
      end
    end
  end
  
  def schedule(*args, &block)
    @jobs << [:work, args, block]
  end
  
  def shutdown
    @size.times { @jobs << [:exit] }
    @pool.each(&:join)
  end
end

pool = ThreadPool.new(3)

# Schedule work
10.times do |i|
  pool.schedule(i) do |num|
    puts "Processing #{num} on #{Thread.current.object_id}"
    sleep(0.1)
  end
end

pool.shutdown

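A finished thread is garbage collected like any other object once nothing references it, together with its return value and thread-local data. Leaks come from references that outlive the work: a registry that keeps finished Thread objects, a long-lived thread that keeps accumulating data in thread-local storage, or connections and file handles that are never closed. Ruby's mark-and-sweep garbage collector reclaims circular references, so cycles alone are not the problem.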

# Memory management in long-running threads
class WorkerManager
  def initialize
    @workers = {}
    @mutex = Mutex.new
  end
  
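  def start_worker(id, &work_block)
    @mutex.synchronize do
      @workers[id] = Thread.new do
        begin
          work_block.call
        ensure
          # Deregister on exit so the registry does not keep finished
          # Thread objects (and everything they reference) alive. Only
          # delete our own entry, in case the id was reused meanwhile.
          # (This blocks until start_worker has stored the entry.)
          @mutex.synchronize do
            @workers.delete(id) if @workers[id].equal?(Thread.current)
          end
        end
      end
    end
  end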
  
  def active_count
    @mutex.synchronize { @workers.size }
  end
  
  def shutdown_all
    @mutex.synchronize do
      @workers.each_value(&:kill)
      @workers.clear
    end
  end
end

Thread-local variables consume memory per thread instance. Excessive thread-local storage multiplied across many threads creates significant memory overhead.

# Thread-local memory usage
class MemoryMonitor
  def self.measure_thread_memory
    GC.start
    before = GC.stat[:total_allocated_objects]
    
    threads = 100.times.map do |i|
      Thread.new do
        # Heavy thread-local data
        Thread.current[:large_data] = Array.new(1000) { |j| "data_#{i}_#{j}" }  # 1,000 distinct strings
        Thread.current[:cache] = Hash.new { |h, k| h[k] = [] }
        sleep(1)
      end
    end
    
    threads.each(&:join)
    GC.start
    after = GC.stat[:total_allocated_objects]
    
    puts "Objects allocated: #{after - before}"
  end
end

MemoryMonitor.measure_thread_memory

Production Patterns

Production thread management requires careful resource allocation, error handling, and monitoring. Thread pools provide predictable resource usage and prevent thread exhaustion under high load.

# Production-ready thread pool with error handling
class ProductionThreadPool
  class WorkerError < StandardError; end
  
  def initialize(size, max_queue: 1000)
    @size = size
    @max_queue = max_queue
    @jobs = SizedQueue.new(max_queue)
    @workers = []
    @shutdown = false
    @error_handler = method(:default_error_handler)
    @mutex = Mutex.new
    @stats = { processed: 0, failed: 0 }
    
    create_workers
  end
  
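  def submit(job_data, &block)
    raise 'Pool shutdown' if @shutdown
    # non_block = true makes push raise ThreadError when the queue is full
    # instead of blocking the caller
    @jobs.push([job_data, block], true)
  rescue ThreadError
    raise WorkerError, 'Queue full - system overloaded'
  end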
  
  def on_error(&block)
    @error_handler = block
  end
  
  def stats
    @mutex.synchronize { @stats.dup }
  end
  
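  def shutdown(timeout: 30)
    @shutdown = true
    # Closing the queue wakes idle workers: once the queued jobs are drained,
    # pop returns nil and each worker leaves its loop. Unlike pushing nil
    # sentinels, close never blocks on a full queue.
    @jobs.close

    deadline = Time.now + timeout
    @workers.each do |worker|
      remaining = deadline - Time.now
      # join returns nil if the timeout expires; kill whatever is still running
      worker.kill unless remaining > 0 && worker.join(remaining)
    end
  end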
  
  private
  
  def create_workers
    @size.times do |i|
      @workers << Thread.new do
        Thread.current.name = "worker-#{i}"
        
        loop do
          job_data, block = @jobs.pop
          break if job_data.nil?
          
          begin
            block.call(job_data)
            @mutex.synchronize { @stats[:processed] += 1 }
          rescue => error
            @mutex.synchronize { @stats[:failed] += 1 }
            @error_handler.call(error, job_data)
          end
        end
      end
    end
  end
  
  def default_error_handler(error, job_data)
    puts "Worker error: #{error.message} processing #{job_data.inspect}"
  end
end

# Usage in web application
pool = ProductionThreadPool.new(10, max_queue: 500)

pool.on_error do |error, job_data|
  # Log to monitoring service
  ErrorTracker.notify(error, context: { job: job_data })
end

# Handle background jobs
pool.submit(user_id: 123, action: 'send_email') do |job|
  EmailService.deliver(job[:user_id], job[:action])
end

# Monitor pool health
Thread.new do
  loop do
    stats = pool.stats
    Metrics.gauge('threadpool.processed', stats[:processed])
    Metrics.gauge('threadpool.failed', stats[:failed])
    sleep(60)
  end
end

Application servers like Puma and Unicorn implement different threading strategies. Puma uses threads within worker processes while Unicorn forks separate processes. Configuration must balance memory usage with request concurrency.

# config/puma.rb: Puma evaluates this file with its configuration DSL,
# so workers, threads, before_fork, and on_worker_boot are DSL methods
workers Integer(ENV.fetch("WEB_CONCURRENCY") { 2 })
threads_count = Integer(ENV.fetch("RAILS_MAX_THREADS") { 5 })
threads threads_count, threads_count

# Database connections must not be shared across fork: close them in the
# parent before forking...
before_fork do
  ActiveRecord::Base.connection_pool.disconnect!
end

# ...and open fresh ones in each worker process
on_worker_boot do
  ActiveRecord::Base.establish_connection
end

# Application code (Rails): periodic thread and memory logging
class ServerConfiguration
  def self.monitor_thread_usage
    Thread.new do
      loop do
        thread_count = Thread.list.size
        # Resident set size in kilobytes as reported by ps (Unix-like systems)
        memory_usage = `ps -o rss= -p #{Process.pid}`.strip.to_i

        Rails.logger.info({
          threads: thread_count,
          memory_kb: memory_usage,
          timestamp: Time.current
        })

        sleep(30)
      end
    end
  end
end

Database connection pooling prevents connection exhaustion in multi-threaded applications. Each thread acquires connections from a shared pool and returns them after use.

# Thread-safe database operations
class DatabaseWorker
  def initialize(connection_pool)
    @pool = connection_pool
  end
  
  def process_batch(records)
    threads = []
    
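    # Each thread holds one connection for its whole batch, so with more
    # batches than pool connections the extra threads wait in checkout
    # (and may raise ActiveRecord::ConnectionTimeoutError)
    records.each_slice(100) do |batch|
      threads << Thread.new(batch) do |thread_batch|
        @pool.with_connection do |conn|
          conn.transaction do
            thread_batch.each do |record|
              # Process individual record; exec_params is the pg gem's
              # PG::Connection API, so this assumes PostgreSQL
              conn.raw_connection.exec_params(
                "INSERT INTO processed_data (data) VALUES ($1)",
                [record.to_json]
              )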
            end
          end
        end
      rescue => error
        Rails.logger.error("Batch processing failed: #{error}")
        raise
      end
    end
    
    # Wait for all batches with timeout
    threads.each do |thread|
      thread.join(30) || thread.kill
    end
  end
end

# Usage in background job
class DataProcessor
  def perform(dataset_id)
    dataset = Dataset.find(dataset_id)
    worker = DatabaseWorker.new(ActiveRecord::Base.connection_pool)
    
    begin
      worker.process_batch(dataset.records)
      dataset.update(status: 'completed')
    rescue => error
      dataset.update(status: 'failed', error_message: error.message)
      raise
    end
  end
end

Graceful shutdown handling ensures threads complete current work before termination. Production applications must coordinate shutdown across multiple subsystems.

# Graceful shutdown coordinator
class ShutdownManager
  def initialize
    @subsystems = []
    @shutdown_initiated = false
    @mutex = Mutex.new
    
    trap_signals
  end
  
  def register(subsystem)
    @mutex.synchronize { @subsystems << subsystem }
  end
  
  def shutdown_initiated?
    @shutdown_initiated
  end
  
  private
  
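  def trap_signals
    %w[INT TERM].each do |signal|
      # Mutex#lock raises ThreadError inside a trap handler, so hand the
      # work to a new thread instead of calling initiate_shutdown directly
      Signal.trap(signal) { Thread.new { initiate_shutdown } }
    end
  end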
  
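  def initiate_shutdown(timeout: 30)
    @mutex.synchronize do
      return if @shutdown_initiated
      @shutdown_initiated = true
    end

    puts "Shutdown initiated..."

    subsystems = @mutex.synchronize { @subsystems.dup }
    threads = subsystems.map do |subsystem|
      Thread.new do
        subsystem.shutdown
      rescue => error
        puts "Subsystem shutdown error: #{error}"
      end
    end

    # Wait for the subsystems to finish, but no longer than the deadline
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    threads.each do |thread|
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      thread.join([remaining, 0].max)
    end

    exit(0)
  end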
end

shutdown_manager = ShutdownManager.new
shutdown_manager.register(pool)
# Any other object that responds to #shutdown can be registered the same way

Reference

Thread Class Methods

| Method | Parameters | Returns | Description |
|---|---|---|---|
| Thread.new(*args, &block) | *args (Object), &block (Proc) | Thread | Creates and starts a new thread running the block |
| Thread.start(*args, &block) | *args (Object), &block (Proc) | Thread | Like Thread.new, but does not call initialize on Thread subclasses |
| Thread.current | None | Thread | Returns the currently executing thread |
| Thread.main | None | Thread | Returns the main program thread |
| Thread.list | None | Array<Thread> | Returns all living (runnable or stopped) threads |
| Thread.kill(thread) | thread (Thread) | Thread | Terminates the specified thread |
| Thread.pass | None | nil | Hints the scheduler to run another thread |
| Thread.stop | None | nil | Puts the current thread to sleep until woken |

Thread Instance Methods

| Method | Parameters | Returns | Description |
|---|---|---|---|
| #join(timeout = nil) | timeout (Numeric, nil) | Thread, nil | Waits for completion; returns the thread, or nil if the timeout expires; re-raises the thread's exception |
| #value | None | Object | Joins if needed, then returns the block's result |
| #kill | None | Thread | Terminates the thread (ensure clauses still run) |
| #terminate | None | Thread | Alias for kill |
| #exit | None | Thread | Alias for kill (Thread.exit ends the current thread) |
| #status | None | String, false, nil | Returns the thread status (see Thread Status Values) |
| #alive? | None | Boolean | Returns true if the thread is running or sleeping |
| #stop? | None | Boolean | Returns true if the thread is dead or sleeping |
| #wakeup | None | Thread | Marks a sleeping thread as eligible for scheduling |
| #run | None | Thread | Wakes up the thread and invokes the scheduler |
| #priority | None | Integer | Returns thread priority (-3 to 3) |
| #priority=(val) | val (Integer) | Integer | Sets thread priority (a scheduling hint) |
| #safe_level | None | n/a | Removed in Ruby 3.0 along with $SAFE |
| #group | None | ThreadGroup, nil | Returns the thread's ThreadGroup |
| #[]=(key, value) | key (Symbol), value (Object) | Object | Sets a fiber-local variable |
| #[](key) | key (Symbol) | Object, nil | Gets a fiber-local variable |
| #key?(key) | key (Symbol) | Boolean | Tests if a fiber-local key exists |
| #keys | None | Array<Symbol> | Returns fiber-local variable keys |
| #thread_variable_get(key) | key (Symbol) | Object, nil | Gets a thread variable (shared by all fibers of the thread) |
| #thread_variable_set(key, val) | key (Symbol), val (Object) | Object | Sets a thread variable |
| #thread_variables | None | Array<Symbol> | Returns thread variable keys |
| #thread_variable?(key) | key (Symbol) | Boolean | Tests if a thread variable exists |

Thread Status Values

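| Status | Description |
|---|---|
| "run" | Thread is running or runnable |
| "sleep" | Thread is sleeping or blocked waiting (I/O, lock, join) |
| "aborting" | Thread is being killed |
| false | Thread terminated normally (including after kill) |
| nil | Thread terminated with an exception |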

Synchronization Classes

| Class | Purpose | Key Methods |
|---|---|---|
| Mutex | Mutual exclusion lock (not reentrant) | #lock, #unlock, #try_lock, #synchronize |
| ConditionVariable | Thread coordination | #wait, #signal, #broadcast |
| Queue | Thread-safe FIFO queue | #push, #pop, #close, #empty?, #size |
| SizedQueue | Bounded thread-safe queue | #push, #pop, #max=, #num_waiting |
| Monitor | Reentrant mutex with conditions (require 'monitor') | #synchronize, #new_cond |

Exception Handling

| Class | Description |
|---|---|
| ThreadError | General thread-related errors |
| FiberError | Fiber-related errors |
| ClosedQueueError | Operations on a closed queue |

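Class-Level Settings

| Setting | Description |
|---|---|
| Thread.abort_on_exception | If true, an unhandled exception in any thread is re-raised in the main thread, aborting the program (also settable per thread) |
| Thread.report_on_exception | If true (the default since Ruby 2.5), unhandled thread exceptions are reported to stderr (also settable per thread) |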

Common Patterns

| Pattern | Use Case |
|---|---|
| Thread Pool | Reuse threads for multiple tasks |
| Producer-Consumer | Coordinate data processing between threads |
| Worker Queue | Distribute work items across a thread pool |
| Fork-Join | Parallel processing with result aggregation |
| Pipeline | Sequential processing stages with buffers |
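The Pipeline pattern can be sketched with one thread per stage and bounded queues between stages; closing each queue propagates end-of-input downstream. The stages here (parsing, squaring) are placeholders for real work.

```ruby
raw    = SizedQueue.new(5) # input -> parser
parsed = SizedQueue.new(5) # parser -> squarer
output = []

parser = Thread.new do
  while (line = raw.pop)   # nil once raw is closed and drained
    parsed << Integer(line)
  end
  parsed.close             # tell the next stage no more input is coming
end

squarer = Thread.new do
  while (n = parsed.pop)
    output << n * n        # only this thread writes to output
  end
end

%w[1 2 3 4].each { |s| raw << s }
raw.close
[parser, squarer].each(&:join)
p output # => [1, 4, 9, 16]
```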