Overview
Ruby implements threads through the Thread class. In CRuby (MRI), each Ruby thread is backed by a native OS thread and scheduled preemptively, but the Global VM Lock (GVL, commonly called the Global Interpreter Lock or GIL) allows only one thread to execute Ruby code at a time. Threads still make progress concurrently while one of them is blocked in I/O, because the lock is released during blocking operations. JRuby and TruffleRuby have no such lock and run threads in parallel.
The Thread class manages thread lifecycle and state, and the synchronization primitives live in its namespace (Thread::Mutex, Thread::Queue, and so on, with top-level aliases such as Mutex). Ruby creates threads using Thread.new or Thread.start, passing a block containing the code to execute. Each thread maintains its own stack and local variables while sharing the process heap.
# Basic thread creation
thread = Thread.new do
  puts "Thread executing: #{Thread.current.object_id}"
end
thread.join
# Prints: Thread executing: <object id, which varies per run>
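Thread#status reports a thread's state. It returns "run" while the thread executes and "sleep" while it sleeps or waits (for example on I/O or a lock). It returns "aborting" while the thread is being aborted, false once it has terminated normally, and nil if it terminated with an exception. A finished thread shows up as dead in its inspect output, but status never returns "dead". The Thread.current method returns the currently executing thread, while Thread.main returns the main program thread.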
# Thread state inspection
main_thread = Thread.main
current_thread = Thread.current
puts main_thread == current_thread # => true (in main thread)
puts Thread.current.status # => "run"
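The sketch below prints Thread#status at each step as one thread runs to a normal finish and another dies from an exception. It assumes CRuby, and the thread names are illustrative.

```ruby
Thread.report_on_exception = false # keep the deliberately failing thread quiet on stderr

sleeper = Thread.new { sleep } # sleeps until another thread wakes it
Thread.pass until sleeper.status == "sleep"
p sleeper.status # => "sleep"

sleeper.wakeup # `sleep` returns, so the block finishes
sleeper.join
p sleeper.status # => false (terminated normally)

failed = Thread.new { raise "boom" }
failed.join rescue nil # join re-raises the thread's exception; ignore it here
p failed.status # => nil (terminated with an exception)
```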
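Per-thread data is commonly stored with Thread.current[key]. Strictly speaking, Thread#[] and Thread#[]= are fiber-local: each fiber running in the thread sees its own set of values. For values shared by every fiber in a thread, use Thread#thread_variable_get and Thread#thread_variable_set. Ruby provides synchronization primitives through the Mutex, ConditionVariable, and Queue classes.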
# Thread-local storage
Thread.current[:user_id] = 12345
Thread.new do
  Thread.current[:user_id] = 67890
  puts Thread.current[:user_id] # => 67890
end.join
puts Thread.current[:user_id] # => 12345
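Fiber-local and thread-local storage only behave differently once a thread runs more than one fiber. The sketch below assumes a plain Fiber created in the main thread; the :request_id key is arbitrary.

```ruby
Thread.current[:request_id] = "fiber-local value"
Thread.current.thread_variable_set(:request_id, "thread-local value")

seen_in_fiber = Fiber.new do
  # A new fiber starts with its own, empty Thread#[] storage,
  # but it shares the thread's thread_variable_* storage
  [Thread.current[:request_id], Thread.current.thread_variable_get(:request_id)]
end.resume

p seen_in_fiber # => [nil, "thread-local value"]
```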
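An unhandled exception terminates only the thread that raised it. It does not end the program unless Thread.abort_on_exception (or the thread's own abort_on_exception) is set to true. The exception is re-raised in any thread that later calls join or value on the failed thread. Since Ruby 2.5 it is also printed to stderr by default (Thread.report_on_exception).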
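The following minimal sketch shows exception propagation, assuming the default Thread.abort_on_exception = false. The worker dies without stopping the program, and the error resurfaces in the thread that joins it.

```ruby
Thread.report_on_exception = false # the error is handled below, so skip the stderr report

worker = Thread.new { raise ArgumentError, "bad input" }

caught = begin
  worker.join
  nil
rescue ArgumentError => e
  e.message # join re-raised the worker's exception in the calling thread
end

puts "Main thread caught: #{caught}" # prints "Main thread caught: bad input"
```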
Basic Usage
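Creating threads in Ruby requires passing a block to Thread.new or Thread.start. The block contains the code to execute in the new thread. The thread becomes runnable as soon as it is created, but the scheduler decides when it actually runs, so its output can interleave with the main thread's output in any order.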
# Thread creation and execution
thread = Thread.new do
  5.times do |i|
    puts "Working: #{i}"
    sleep(0.1)
  end
  "completion_result"
end
# Continue main thread work
puts "Main thread continues"
# Wait for the thread to finish and fetch its result (value joins first)
result = thread.value
puts "Thread returned: #{result}"
The join method blocks the calling thread until the target thread finishes. It then returns the thread itself, or nil if an optional timeout in seconds expires first. To get the block's last expression, call value, which joins the thread if necessary. Calling join or value on a thread that has already finished returns immediately.
# Multiple threads with value
threads = []
3.times do |i|
  threads << Thread.new do
    sleep(rand * 0.5) # random delay between 0 and 0.5 seconds
    "Thread #{i} completed"
  end
end
# Wait for all threads and collect results in creation order
results = threads.map(&:value)
p results
# => ["Thread 0 completed", "Thread 1 completed", "Thread 2 completed"]
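join also accepts a timeout in seconds. In the sketch below, which uses arbitrary delays, join returns nil while the thread is still busy, and value then waits for the result.

```ruby
slow = Thread.new do
  sleep(0.5)
  :done
end

timed_out = slow.join(0.1)   # nil: the 0.1 s limit expires while the thread still sleeps
still_running = slow.alive?  # true
result = slow.value          # waits for the thread, then returns :done

p [timed_out, still_running, result] # => [nil, true, :done]
```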
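Unlike Process.detach for child processes, Thread has no detach method, and none is needed. A thread that is never joined keeps running until its block finishes, and it is garbage-collected once nothing references it. The catch is that when the main thread exits, Ruby kills every remaining thread, so fire-and-forget work can be cut off mid-task.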
# Fire-and-forget background thread (collect_metrics, send_to_analytics and
# process_user_request stand for application methods)
Thread.new do
  # Background processing
  log_data = collect_metrics
  send_to_analytics(log_data)
end
# Main thread continues without waiting
process_user_request
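A thread can terminate in several ways. Normal completion happens when the block finishes executing. Thread#kill, #terminate, and #exit are aliases that stop a thread from outside, although its ensure clauses still run. Thread.exit ends the current thread from within. A killed thread can be interrupted at any point, so the safer pattern is cooperative shutdown: the thread checks a flag and stops at a known point.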
# Thread termination patterns
worker = Thread.new do
  loop do
    break if Thread.current[:stop_requested]
    perform_work
    sleep(1)
  end
end
# Graceful shutdown
worker[:stop_requested] = true
worker.join
# Force termination (avoid in production)
# worker.kill
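The sketch below kills a sleeping worker and checks that its ensure clause still ran, which shows that kill unwinds the thread. A Queue carries the report back only because it is thread-safe; the names are illustrative.

```ruby
events = Queue.new

worker = Thread.new do
  begin
    sleep # block indefinitely, standing in for long-running work
  ensure
    events << :cleanup_ran # ensure clauses still run when the thread is killed
  end
end

Thread.pass until worker.status == "sleep"
worker.kill
worker.join

cleanup = events.pop
p cleanup        # => :cleanup_ran
p worker.alive?  # => false
```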
Ruby supports passing arguments to thread blocks: any arguments given to Thread.new are passed to the block as block parameters. They are evaluated in the calling thread before the new thread starts. The block receives references to the same objects, not copies.
# Thread arguments
def process_data(data, options = {})
  Thread.new(data, options) do |thread_data, thread_options|
    # thread_data and thread_options reference the caller's objects (no copies are made)
    multiplier = thread_options.fetch(:multiplier, 1)
    result = thread_data.map { |item| item * multiplier }
    thread_options[:callback]&.call(result)
    result
  end
end
data = [1, 2, 3, 4, 5]
thread = process_data(data, multiplier: 2)
p thread.value # => [2, 4, 6, 8, 10]
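Passing arguments matters most when a loop reuses one variable, as a for loop does. By contrast, each and times create a fresh block variable on every iteration. A small sketch:

```ruby
threads = []

for i in 1..3
  # `for` reuses a single variable `i`; passing it to Thread.new hands the
  # block the value `i` has right now instead of whatever it holds later
  threads << Thread.new(i) { |n| n * 10 }
end

values = threads.map(&:value)
p values # => [10, 20, 30]
```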
Thread Safety & Concurrency
Ruby's Global Interpreter Lock prevents true parallelism for CPU-bound tasks but allows concurrent execution during I/O operations. Thread safety requires careful coordination when multiple threads access shared data structures.
The Mutex
class provides mutual exclusion for critical sections. Ruby blocks other threads attempting to acquire the same mutex until the holding thread releases it.
# Mutex for shared resource protection
class Counter
  def initialize
    @count = 0
    @mutex = Mutex.new
  end
  def increment
    @mutex.synchronize do
      current = @count
      sleep(0.001) # Simulate processing
      @count = current + 1
    end
  end
  def value
    @mutex.synchronize { @count }
  end
end
counter = Counter.new
threads = 10.times.map do
  Thread.new { 100.times { counter.increment } }
end
threads.each(&:join)
puts counter.value # => 1000 (without mutex, often less)
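Mutex is not reentrant. A thread that tries to lock a Mutex it already holds gets a ThreadError instead of deadlocking silently. When nested locking is unavoidable, the standard library's Monitor allows re-entry. A short sketch comparing the two:

```ruby
require 'monitor'

mutex = Mutex.new
mutex_error = begin
  mutex.synchronize { mutex.synchronize { :unreachable } }
rescue ThreadError => e
  e.class # re-locking a Mutex the current thread already holds raises ThreadError
end

monitor = Monitor.new
monitor_result = monitor.synchronize { monitor.synchronize { :reentered } } # Monitor allows re-entry

p mutex_error    # => ThreadError
p monitor_result # => :reentered
```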
Race conditions occur when thread execution order affects program behavior. Ruby's thread scheduling is non-deterministic, making race conditions difficult to reproduce consistently.
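# Race condition demonstration
shared_array = []
mutex = Mutex.new
threads = 5.times.map do |i|
  Thread.new do
    100.times do
      # Without synchronization: shared_array << i
      # This often appears to work on CRuby, where the GVL happens to make a single
      # Array#<< atomic, but that is an implementation detail; it is unsafe on
      # JRuby or TruffleRuby and for any read-modify-write sequence.
      # With synchronization - thread safe
      mutex.synchronize do
        shared_array << i
      end
    end
  end
end
threads.each(&:join)
puts shared_array.size # => 500 (with mutex)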
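ConditionVariable enables thread coordination through wait/signal patterns. A thread calls wait(mutex) to release the mutex and sleep until another thread calls signal (wake one waiter) or broadcast (wake all waiters). The condition may no longer hold by the time a waiter reacquires the mutex, so always re-check it in a loop, as the while modifiers below do.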
# Producer-consumer with condition variable
class Buffer
  def initialize(max_size = 5)
    @items = []
    @max_size = max_size
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
    @not_empty = ConditionVariable.new
  end
  def put(item)
    @mutex.synchronize do
      @not_full.wait(@mutex) while @items.size >= @max_size
      @items << item
      @not_empty.signal
    end
  end
  def get
    @mutex.synchronize do
      @not_empty.wait(@mutex) while @items.empty?
      item = @items.shift
      @not_full.signal
      item
    end
  end
end
buffer = Buffer.new
# Producer thread
producer = Thread.new do
  10.times do |i|
    buffer.put("item_#{i}")
    puts "Produced: item_#{i}"
  end
end
# Consumer thread
consumer = Thread.new do
  10.times do
    item = buffer.get
    puts "Consumed: #{item}"
  end
end
[producer, consumer].each(&:join)
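ConditionVariable#wait also takes an optional timeout in seconds. The sketch below waits at most one second for a flag and re-checks it after every wakeup. The variable names and the 0.1 s setup delay are illustrative.

```ruby
mutex = Mutex.new
ready_cv = ConditionVariable.new
ready = false

signaler = Thread.new do
  sleep(0.1) # stand-in for some setup work
  mutex.synchronize do
    ready = true
    ready_cv.broadcast # wake every waiter
  end
end

deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 1.0
mutex.synchronize do
  # Re-check the condition after every wakeup and give up once the deadline passes
  until ready
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    break if remaining <= 0
    ready_cv.wait(mutex, remaining) # releases the mutex while waiting
  end
end
signaler.join

puts ready ? "ready" : "timed out" # prints "ready"
```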
The Queue and SizedQueue classes provide thread-safe data structures with built-in synchronization. Queue handles an unlimited number of items, while SizedQueue blocks producers when capacity is reached.
# Thread-safe queue operations
work_queue = SizedQueue.new(10)
# Worker threads
workers = 3.times.map do |id|
  Thread.new do
    # pop blocks until a job arrives; a nil job tells the worker to stop
    while (job = work_queue.pop)
      puts "Worker #{id} processing: #{job}"
      sleep(0.1) # Simulate work
    end
  end
end
# Producer adding work (blocks whenever 10 jobs are already waiting)
producer = Thread.new do
  20.times do |i|
    work_queue << "job_#{i}"
  end
  # Signal completion: one nil sentinel per worker
  3.times { work_queue << nil }
end
[producer, *workers].each(&:join)
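Since Ruby 2.3, Queue#close is an alternative to pushing one nil sentinel per worker. After close, pop returns nil as soon as the queue is empty. Below is a sketch of the same worker pattern, with squaring numbers as stand-in work.

```ruby
jobs = Queue.new
results = Queue.new

workers = 3.times.map do
  Thread.new do
    # pop blocks while the queue is empty and returns nil once it is closed and drained
    while (job = jobs.pop)
      results << job * job
    end
  end
end

5.times { |i| jobs << i }
jobs.close # replaces the nil sentinels: waiting workers wake up and receive nil
workers.each(&:join)

squares = []
squares << results.pop until results.empty?
p squares.sort # => [0, 1, 4, 9, 16]
```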
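Deadlocks occur when threads wait indefinitely for resources held by other waiting threads. Ruby does not prevent them for you. The usual defenses are acquiring locks in a consistent global order and using non-blocking or timed waits (Mutex#try_lock, ConditionVariable#wait with a timeout). When every thread is blocked, CRuby detects the situation and raises a fatal "No live threads left. Deadlock?" error in the main thread.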
# Deadlock prevention with consistent ordering
class Account
  attr_reader :id, :balance
  def initialize(id, balance)
    @id = id
    @balance = balance
    @mutex = Mutex.new
  end
  def transfer(amount, to_account)
    # Locking the same Mutex twice raises ThreadError, so refuse self-transfers
    return false if to_account.equal?(self)
    # Consistent lock ordering prevents deadlock: every thread locks the
    # account with the lower object_id first
    first, second = [self, to_account].sort_by(&:object_id)
    first.lock.synchronize do
      second.lock.synchronize do
        next false if @balance < amount
        @balance -= amount
        to_account.deposit(amount)
        true
      end
    end
  end
  protected
  # Visible to other Account instances only (replaces instance_variable_get/set)
  def lock
    @mutex
  end
  def deposit(amount)
    @balance += amount
  end
end
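Lock ordering is not the only option. Where a global order is awkward, a thread can take one lock, try the second with Mutex#try_lock, and back off if that fails. The helper with_both_locks below is hypothetical. It sketches that back-off technique and is not a library API.

```ruby
# Hypothetical helper: hold both locks or neither, so threads that take the
# same pair of locks in opposite orders cannot deadlock
def with_both_locks(first, second)
  loop do
    first.synchronize do
      if second.try_lock # non-blocking: returns false instead of waiting
        begin
          return yield
        ensure
          second.unlock
        end
      end
    end
    Thread.pass # `first` is released here; let the other thread finish, then retry
  end
end

a = Mutex.new
b = Mutex.new
count = 0

threads = [[a, b], [b, a]].map do |first, second|
  Thread.new do
    1_000.times { with_both_locks(first, second) { count += 1 } }
  end
end
threads.each(&:join)

p count # => 2000
```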
Performance & Memory
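CRuby threads are native OS threads, so each one reserves both a Ruby VM stack and a machine (C) stack. On 64-bit builds each defaults to about 1 MiB. The sizes can be tuned with the RUBY_THREAD_VM_STACK_SIZE and RUBY_THREAD_MACHINE_STACK_SIZE environment variables. Most operating systems commit stack pages only when they are touched, so an idle thread's resident footprint is far smaller. Creating a thread also costs a system call plus VM initialization.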
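On CRuby, the default per-thread stack reservations can be read at runtime from RubyVM::DEFAULT_PARAMS. This constant is CRuby-specific, and other implementations do not define it.

```ruby
# CRuby-specific: stack sizes reserved for each new thread, in bytes
params = RubyVM::DEFAULT_PARAMS
puts "VM stack per thread:      #{params[:thread_vm_stack_size]} bytes"
puts "Machine stack per thread: #{params[:thread_machine_stack_size]} bytes"
```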
The Global Interpreter Lock limits CPU-bound Ruby code to one core at a time, no matter how many threads run it. I/O-bound operations do benefit from threading, because CRuby releases the lock while a thread is blocked in a system call (and during sleep), letting other threads execute.
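The I/O effect can be observed without a network by using sleep, which releases the GVL just as blocking I/O does. A minimal sketch (exact timings depend on the machine):

```ruby
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)

# Five threads each wait 0.2 s; sleep releases the GVL the same way blocking I/O does
waiters = 5.times.map { Thread.new { sleep(0.2) } }
waiters.each(&:join)

elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
puts format("5 concurrent 0.2 s waits took %.2f s", elapsed) # roughly 0.2 s, not 1.0 s
```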
# I/O-bound vs CPU-bound performance comparison
require 'benchmark'
require 'net/http'
# I/O-bound - benefits from threading
def fetch_urls(urls)
  threads = urls.map do |url|
    Thread.new(url) do |thread_url|
      Net::HTTP.get_response(URI(thread_url))
    end
  end
  threads.map(&:value)
end
# Naive trial-division prime test (1 is not prime)
def prime?(n)
  n > 1 && (2...n).none? { |i| (n % i).zero? }
end
# CPU-bound - limited by GIL
def calculate_primes(ranges)
  threads = ranges.map do |range|
    Thread.new(range) do |thread_range|
      thread_range.select { |n| prime?(n) }
    end
  end
  threads.map(&:value)
end
urls = ['http://example.com'] * 5
ranges = [(1..1000), (1001..2000), (2001..3000)]
Benchmark.bm(15) do |x|
  x.report('Sequential I/O:') { urls.map { |url| Net::HTTP.get_response(URI(url)) } }
  x.report('Threaded I/O:') { fetch_urls(urls) }
  x.report('Sequential CPU:') { ranges.map { |r| r.select { |n| prime?(n) } } }
  x.report('Threaded CPU:') { calculate_primes(ranges) }
end
Thread pools reduce creation overhead by reusing existing threads for multiple tasks. Ruby's standard library has no thread pool. One is easy to build on Queue, and it spreads thread start-up costs across many tasks.
# Simple thread pool implementation
class ThreadPool
  def initialize(size)
    @size = size
    @jobs = Queue.new
    @pool = Array.new(size) do
      Thread.new do
        catch(:exit) do
          loop do
            job, args, block = @jobs.pop
            throw :exit if job == :exit
            # An exception raised by the block ends this worker thread
            block.call(*args) if block
          end
        end
      end
    end
  end
  def schedule(*args, &block)
    @jobs << [:work, args, block]
  end
  def shutdown
    # One :exit marker per worker; each worker stops when it pops one
    @size.times { @jobs << [:exit] }
    @pool.each(&:join)
  end
end
pool = ThreadPool.new(3)
# Schedule work
10.times do |i|
  pool.schedule(i) do |num|
    puts "Processing #{num} on #{Thread.current.object_id}"
    sleep(0.1)
  end
end
pool.shutdown
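Memory leaks occur when objects stay reachable after the work that needed them is done. Ruby's mark-and-sweep garbage collector reclaims circular references without help. The usual culprits are long-lived references instead: registries that keep finished Thread objects (a dead thread still holds its value and its fiber-local variables), caches that grow without bound, and connections or file handles that are never closed.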
# Memory management in long-running threads
class WorkerManager
  def initialize
    @workers = {}
    @mutex = Mutex.new
  end
  def start_worker(id, &work_block)
    @mutex.synchronize do
      @workers[id] = Thread.new do
        begin
          work_block.call
        ensure
          # Cleanup thread-local data
          Thread.current.keys.each do |key|
            Thread.current[key] = nil
          end
          # Remove from active workers
          @mutex.synchronize { @workers.delete(id) }
        end
      end
    end
  end
  def active_count
    @mutex.synchronize { @workers.size }
  end
  def shutdown_all
    @mutex.synchronize do
      @workers.each_value(&:kill)
      @workers.clear
    end
  end
end
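A dead Thread object still answers value and keeps its fiber-local variables. Anything that keeps the Thread reachable, such as a registry like @workers, therefore keeps that data alive too. A sketch, using an arbitrary 1 MB string as the payload:

```ruby
finished = Thread.new do
  Thread.current[:buffer] = "x" * 1_000_000 # about 1 MB of fiber-local data
  :result
end
finished.join

final_value = finished.value
buffer_size = finished[:buffer].bytesize # the dead thread still holds its locals
p [finished.alive?, final_value, buffer_size] # => [false, :result, 1000000]

# Dropping the last reference lets the GC reclaim the thread, its value and its locals
finished = nil
```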
Thread-local variables consume memory per thread instance. Excessive thread-local storage multiplied across many threads creates significant memory overhead.
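# Thread-local memory usage
class MemoryMonitor
  def self.measure_thread_memory
    GC.start
    before = GC.stat[:total_allocated_objects]
    threads = 100.times.map do |i|
      Thread.new do
        # Heavy fiber-local data: 1000 distinct strings per thread
        Thread.current[:large_data] = Array.new(1000) { |j| "data_#{i}_#{j}" }
        Thread.current[:cache] = Hash.new { |h, k| h[k] = [] }
        sleep(1)
      end
    end
    threads.each(&:join)
    GC.start
    after = GC.stat[:total_allocated_objects]
    # total_allocated_objects is a running total: this counts every allocation
    # made in the meantime, not the memory that is still retained
    puts "Objects allocated: #{after - before}"
    # Live slots include data still reachable through the finished threads in `threads`
    puts "Live heap slots: #{GC.stat[:heap_live_slots]}"
  end
end
MemoryMonitor.measure_thread_memory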
Production Patterns
Production thread management requires careful resource allocation, error handling, and monitoring. Thread pools provide predictable resource usage and prevent thread exhaustion under high load.
# Production-ready thread pool with error handling
class ProductionThreadPool
  class WorkerError < StandardError; end
  def initialize(size, max_queue: 1000)
    @size = size
    @max_queue = max_queue
    @jobs = SizedQueue.new(max_queue)
    @workers = []
    @shutdown = false
    @error_handler = method(:default_error_handler)
    @mutex = Mutex.new
    @stats = { processed: 0, failed: 0 }
    create_workers
  end
  def submit(job_data, &block)
    raise 'Pool shutdown' if @shutdown
    # non_block = true: raise ThreadError instead of waiting when the queue is full
    @jobs.push([job_data, block], true)
  rescue ThreadError
    raise WorkerError, 'Queue full - system overloaded'
  end
  def on_error(&block)
    @error_handler = block
  end
  def stats
    @mutex.synchronize { @stats.dup }
  end
  def shutdown(timeout: 30)
    @shutdown = true
    # Closing the queue lets workers finish queued jobs; pop then returns nil
    @jobs.close
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @workers.each do |worker|
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      # join returns nil if the worker is still busy when its time runs out
      worker.kill unless remaining > 0 && worker.join(remaining)
    end
  end
  private
  def create_workers
    @size.times do |i|
      @workers << Thread.new do
        Thread.current.name = "worker-#{i}"
        while (job = @jobs.pop) # nil once the queue is closed and empty
          job_data, block = job
          begin
            block.call(job_data)
            @mutex.synchronize { @stats[:processed] += 1 }
          rescue => error
            @mutex.synchronize { @stats[:failed] += 1 }
            @error_handler.call(error, job_data)
          end
        end
      end
    end
  end
  def default_error_handler(error, job_data)
    puts "Worker error: #{error.message} processing #{job_data.inspect}"
  end
end
# Usage in web application (ErrorTracker, EmailService and Metrics stand for
# the application's own services)
pool = ProductionThreadPool.new(10, max_queue: 500)
pool.on_error do |error, job_data|
  # Log to monitoring service
  ErrorTracker.notify(error, context: { job: job_data })
end
# Handle background jobs
pool.submit(user_id: 123, action: 'send_email') do |job|
  EmailService.deliver(job[:user_id], job[:action])
end
# Monitor pool health (Thread has no detach; the thread simply runs on its own)
Thread.new do
  loop do
    stats = pool.stats
    Metrics.gauge('threadpool.processed', stats[:processed])
    Metrics.gauge('threadpool.failed', stats[:failed])
    sleep(60)
  end
end
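The back-pressure that submit aims for comes from SizedQueue#push with non_block set to true. Instead of waiting when the queue is full, that call raises ThreadError. A minimal sketch of the behavior in isolation:

```ruby
queue = SizedQueue.new(1)
queue.push(:first)

rejected = begin
  queue.push(:second, true) # non_block = true: raise instead of waiting for space
  false
rescue ThreadError
  true
end

p rejected   # => true
p queue.size # => 1
```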
Application servers take different approaches to concurrency. Unicorn forks single-threaded worker processes. Puma runs a pool of threads in each process and, in clustered mode, in each of several forked workers. Configuration must balance memory usage (more processes) against request concurrency (more threads per process).
# config/puma.rb: Puma's configuration DSL (workers, threads, before_fork,
# on_worker_boot) is only available inside this file, not in application classes
workers Integer(ENV.fetch("WEB_CONCURRENCY") { 2 })
threads_count = Integer(ENV.fetch("RAILS_MAX_THREADS") { 5 })
threads threads_count, threads_count
# Connections must not be shared across fork: disconnect in the master,
# reconnect in each worker
before_fork do
  ActiveRecord::Base.connection_pool.disconnect!
end
on_worker_boot do
  ActiveRecord::Base.establish_connection
end
# Application code (for example an initializer): periodic thread and memory logging
class ServerConfiguration
  def self.monitor_thread_usage
    Thread.new do
      loop do
        thread_count = Thread.list.size
        # `ps -o rss=` prints only the resident set size in KB, with no header line
        memory_usage = `ps -o rss= -p #{Process.pid}`.to_i
        Rails.logger.info({
          threads: thread_count,
          memory_kb: memory_usage,
          timestamp: Time.current
        })
        sleep(30)
      end
    end
  end
end
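Database connection pooling prevents connection exhaustion in multi-threaded applications. Each thread checks a connection out of a shared pool and returns it after use. If more threads need a connection at once than the pool holds, the extra threads wait, and ActiveRecord eventually raises ActiveRecord::ConnectionTimeoutError.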
# Thread-safe database operations (assumes PostgreSQL through the pg gem)
class DatabaseWorker
  def initialize(connection_pool)
    @pool = connection_pool
  end
  def process_batch(records)
    threads = []
    records.each_slice(100) do |batch|
      threads << Thread.new(batch) do |thread_batch|
        # Each thread checks out its own connection and returns it when the block ends
        @pool.with_connection do |conn|
          conn.transaction do
            thread_batch.each do |record|
              # exec_params belongs to PG::Connection, reached through raw_connection
              conn.raw_connection.exec_params(
                "INSERT INTO processed_data (data) VALUES ($1)",
                [record.to_json]
              )
            end
          end
        end
      rescue => error
        Rails.logger.error("Batch processing failed: #{error}")
        raise
      end
    end
    # Wait for all batches; join re-raises a failed batch's exception,
    # and a batch still running after 30 seconds is killed
    threads.each do |thread|
      thread.join(30) || thread.kill
    end
  end
end
# Usage in background job (Dataset is the application's own model)
class DataProcessor
  def perform(dataset_id)
    dataset = Dataset.find(dataset_id)
    worker = DatabaseWorker.new(ActiveRecord::Base.connection_pool)
    begin
      worker.process_batch(dataset.records)
      dataset.update(status: 'completed')
    rescue => error
      dataset.update(status: 'failed', error_message: error.message)
      raise
    end
  end
end
Graceful shutdown handling ensures threads complete current work before termination. Production applications must coordinate shutdown across multiple subsystems.
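# Graceful shutdown coordinator
class ShutdownManager
  def initialize
    @subsystems = []
    @shutdown_initiated = false
    @mutex = Mutex.new
    trap_signals
  end
  # Subsystems must respond to #shutdown
  def register(subsystem)
    @mutex.synchronize { @subsystems << subsystem }
  end
  def shutdown_initiated?
    @shutdown_initiated
  end
  private
  def trap_signals
    %w[INT TERM].each do |signal|
      # Mutex#lock raises ThreadError inside a trap handler, so do the work in a new thread
      Signal.trap(signal) { Thread.new { initiate_shutdown } }
    end
  end
  def initiate_shutdown
    @mutex.synchronize do
      return if @shutdown_initiated
      @shutdown_initiated = true
    end
    puts "Shutdown initiated..."
    subsystems = @mutex.synchronize { @subsystems.dup }
    threads = subsystems.map do |subsystem|
      Thread.new do
        subsystem.shutdown
      rescue => error
        puts "Subsystem shutdown error: #{error}"
      end
    end
    # Wait for subsystems, up to 5 seconds in total
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 5
    threads.each do |thread|
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      thread.join(remaining) if remaining > 0
    end
    # This method runs in a helper thread, so end the process from the main thread
    Thread.main.raise(SystemExit.new(0))
  end
end
shutdown_manager = ShutdownManager.new
shutdown_manager.register(pool)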
Reference
Thread Class Methods
Method | Parameters | Returns | Description |
---|---|---|---|
Thread.new(*args, &block) | *args (Object), &block (Proc) | Thread | Creates and starts a new thread that runs the block with args |
Thread.start(*args, &block) | *args (Object), &block (Proc) | Thread | Same as Thread.new, except that a subclass's initialize is not called |
Thread.current | None | Thread | Returns the currently executing thread |
Thread.main | None | Thread | Returns the main program thread |
Thread.list | None | Array<Thread> | Returns all threads that are runnable or stopped |
Thread.kill(thread) | thread (Thread) | Thread | Terminates the specified thread |
Thread.pass | None | nil | Hints the scheduler to run another thread |
Thread.stop | None | nil | Puts the current thread to sleep until another thread wakes it |
Thread Instance Methods
Method | Parameters | Returns | Description |
---|---|---|---|
#join(timeout = nil) | timeout (Numeric, nil) | Thread, nil | Waits for the thread to finish; returns the thread, or nil if the timeout expires |
#value | None | Object | Joins the thread if needed and returns the block's result |
#kill | None | Thread | Terminates the thread (its ensure clauses still run) |
#terminate | None | Thread | Alias for kill |
#exit | None | Thread | Alias for kill (use Thread.exit to end the current thread) |
#status | None | String, false, nil | Returns the thread's status |
#alive? | None | Boolean | Returns true if the thread is running or sleeping |
#stop? | None | Boolean | Returns true if the thread is dead or sleeping |
#wakeup | None | Thread | Marks a sleeping thread as eligible for scheduling |
#run | None | Thread | Wakes up the thread and invokes the scheduler |
#priority | None | Integer | Returns the thread priority (-3 to 3 in CRuby; only a hint to the scheduler) |
#priority=(val) | val (Integer) | Integer | Sets the thread priority |
#safe_level | None | Integer | Obsolete: $SAFE support was removed in Ruby 3.0 |
#group | None | ThreadGroup, nil | Returns the thread's ThreadGroup, or nil if it is not in a group |
#[]=(key, value) | key (Symbol), value (Object) | Object | Sets a fiber-local variable |
#[](key) | key (Symbol) | Object, nil | Gets a fiber-local variable |
#key?(key) | key (Symbol) | Boolean | Tests whether a fiber-local key exists |
#keys | None | Array<Symbol> | Returns the fiber-local variable keys |
#thread_variable_get(key) | key (Symbol) | Object, nil | Gets a thread-local variable (shared by all fibers in the thread) |
#thread_variable_set(key, value) | key (Symbol), value (Object) | Object | Sets a thread-local variable |
#thread_variables | None | Array<Symbol> | Returns the thread-local variable keys |
#thread_variable?(key) | key (Symbol) | Boolean | Tests whether a thread-local variable exists |
Thread Status Values
Status | Description |
---|---|
"run" | Thread is currently running |
"sleep" | Thread is sleeping or waiting (for example on I/O or a lock) |
"aborting" | Thread is aborting |
false | Thread terminated normally |
nil | Thread terminated with an exception |
Synchronization Classes
Class | Purpose | Key Methods |
---|---|---|
Mutex | Mutual exclusion lock | #lock, #unlock, #try_lock, #synchronize |
ConditionVariable | Thread coordination | #wait, #signal, #broadcast |
Queue | Thread-safe FIFO queue | #push, #pop, #close, #empty?, #size |
SizedQueue | Bounded thread-safe queue | #push, #pop, #max=, #num_waiting |
Monitor | Reentrant mutex with conditions (require 'monitor') | #synchronize, #new_cond |
Exception Handling
Class | Description |
---|---|
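ThreadError | General thread-related errors, such as locking a Mutex twice from the same thread or a non-blocking push onto a full SizedQueue |
FiberError | Fiber-related errors |
ClosedQueueError | Raised when pushing onto a closed Queue or SizedQueue |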
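Process-wide Settings
Setting | Description |
---|---|
Thread.abort_on_exception | If true, an unhandled exception in any thread is re-raised in the main thread, ending the program (default: false) |
Thread.report_on_exception | If true, a thread that dies from an unhandled exception reports it to stderr (default: true since Ruby 2.5) |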
Common Patterns
Pattern | Use Case |
---|---|
Thread Pool | Reuse threads for multiple tasks |
Producer-Consumer | Coordinate data processing between threads |
Worker Queue | Distribute work items across thread pool |
Fork-Join | Parallel processing with result aggregation |
Pipeline | Sequential processing stages with buffers |