Overview
Ruby's ThreadGroup class provides a mechanism for organizing threads into logical collections and applying group-level operations. Thread groups create boundaries for thread management, allowing developers to control thread behavior collectively rather than managing individual threads separately.
Every Ruby thread belongs to exactly one thread group. The main thread belongs to `ThreadGroup::Default`, and each new thread starts in the group of the thread that created it. Threads can be moved to another group with the `add` method. A thread stays in its group until it is moved or terminates.
The ThreadGroup class supports two primary operations: adding threads to a group with `add`, and enclosing a group with `enclose`, which prevents threads from being moved into or out of it. Thread groups do not provide scheduling control or execution prioritization; they function purely as organizational containers.
# Default thread group contains all threads initially
puts ThreadGroup::Default.list.size
# => 1 (main thread)
# Create a custom thread group
worker_group = ThreadGroup.new
puts worker_group.list.size
# => 0 (empty group)
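Group inheritance can be checked directly. The following is a minimal sketch, assuming it runs from the main thread of a plain script; the variable names are illustrative only.

```ruby
# A new thread starts in the group of the thread that created it.
creator_group = Thread.current.group # ThreadGroup::Default in a plain script
child_group = Thread.new { Thread.current.group }.value
puts child_group.equal?(creator_group) # => true

# A thread started by a member of a custom group inherits that group.
custom = ThreadGroup.new
grandchild_group = Thread.new do
  custom.add(Thread.current) # move this thread into the custom group
  Thread.new { Thread.current.group }.value
end.value
puts grandchild_group.equal?(custom) # => true
```

`Thread#value` joins the thread and returns the result of its block, which keeps the sketch free of sleeps.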
Thread groups serve several use cases in concurrent Ruby applications. They isolate related threads for batch operations, guard group membership against later changes through enclosure, and aid debugging by grouping threads logically. Applications use thread groups to track worker thread pools and to coordinate shutdown across related threads.
# Creating threads in a custom group
processing_group = ThreadGroup.new
5.times do
  thread = Thread.new { sleep(2) }
  processing_group.add(thread)
end
puts "Processing group has #{processing_group.list.size} threads"
# => Processing group has 5 threads
A ThreadGroup exposes methods for inspecting and changing membership. The `list` method reports only threads that are still alive, so terminated threads drop out of a group without explicit cleanup.
Basic Usage
Creating thread groups begins with instantiating the ThreadGroup class. New groups start empty and accept threads through the `add` method. A thread can belong to only one group at a time, so adding a thread to a new group removes it from its previous group.
# Create a new thread group
download_group = ThreadGroup.new
# Start some threads and add them to the group
urls = ['http://api1.com', 'http://api2.com', 'http://api3.com']
threads = urls.map do |url|
  Thread.new do
    # Simulate API call
    sleep(rand(1..3))
    puts "Downloaded from #{url}"
  end
end
# Add all threads to the group
threads.each { |thread| download_group.add(thread) }
puts "Download group contains #{download_group.list.size} threads"
# => Download group contains 3 threads
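The one-group-at-a-time rule can be observed by moving a single thread between two groups. This is a small sketch; the group names and the `gate` queue, which only keeps the thread alive, are illustrative.

```ruby
first_group = ThreadGroup.new
second_group = ThreadGroup.new
gate = Queue.new

# The thread blocks on the queue so it stays alive while it is moved.
mover = Thread.new { gate.pop }

first_group.add(mover)
in_first_before = first_group.list.include?(mover)

second_group.add(mover) # moving it removes it from first_group
in_first_after = first_group.list.include?(mover)
in_second = mover.group.equal?(second_group)

puts in_first_before # => true
puts in_first_after  # => false
puts in_second       # => true

gate << :done
mover.join
```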
The `list` method returns an array of the threads in the group that are alive at the time of the call. Each call builds a new array, so a previously returned array is a snapshot: it does not change as threads complete or move between groups.
image_group = ThreadGroup.new
completed_threads = []
# Create threads with different execution times
10.times do |i|
  thread = Thread.new do
    sleep(i * 0.1) # Some finish faster than others
    completed_threads << i
  end
  image_group.add(thread)
end
# Check group size periodically
3.times do
  sleep(0.3)
  puts "Active threads: #{image_group.list.size}"
end
# Output depends on timing, for example:
# => Active threads: 7
# => Active threads: 4
# => Active threads: 0
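The difference between a previously returned array and a fresh call to `list` can be shown without relying on sleep timing. This is a minimal sketch; the `release` queue is only a device to control when the worker finishes.

```ruby
group = ThreadGroup.new
release = Queue.new

worker = Thread.new { release.pop } # stays alive until released
group.add(worker)

snapshot = group.list # array built at the time of this call
release << :go
worker.join           # the worker has terminated once join returns
fresh = group.list    # a new call builds a new array

puts snapshot.size         # => 1
puts fresh.size            # => 0
puts snapshot.first.alive? # => false
```

The old array still holds the thread object even though the thread is dead, which is why code working from a snapshot should check `alive?` before assuming a thread is running.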
Thread groups provide the `enclose` method to lock membership. Once a group is enclosed, calling `add` to move a thread into it or out of it raises a ThreadError. Threads started by a member of an enclosed group still join it, because new threads inherit their creator's group. Enclosing a group guards against accidental membership changes after initialization.
secure_group = ThreadGroup.new
# Add initial threads
2.times do
  thread = Thread.new { sleep(1) }
  secure_group.add(thread)
end
# Enclose the group
secure_group.enclose
# This will raise ThreadError
begin
  new_thread = Thread.new { puts "Hello" }
  secure_group.add(new_thread)
rescue ThreadError => e
  puts "Cannot add thread: #{e.message}"
  # => Cannot add thread: can't move to the enclosed thread group
end
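Enclosure also applies in the other direction, and it does not stop members from starting threads. The sketch below illustrates both points; the names `sealed`, `gate`, and `member` are made up for the example.

```ruby
sealed = ThreadGroup.new
gate = Queue.new

# The member waits for a signal, then starts a child and reports the child's group.
member = Thread.new do
  gate.pop
  Thread.new { Thread.current.group }.value
end
sealed.add(member)
sealed.enclose

# Moving a member out of an enclosed group is rejected.
move_error = begin
  ThreadGroup.new.add(member)
  nil
rescue ThreadError => e
  e.message
end

gate << :go
child_group = member.value # group of the thread started after enclosure

puts move_error
puts child_group.equal?(sealed) # => true
```

The child thread lands in the enclosed group because it is created there rather than moved with `add`.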
Thread groups need no lifecycle bookkeeping. When a thread finishes, it no longer appears in the result of `list`, so no explicit cleanup is required. This simplifies thread pool implementations that only need to know which workers are still running.
batch_group = ThreadGroup.new
# Create a batch of short-lived threads
batch_size = 50
batch_size.times do |i|
  thread = Thread.new do
    sleep(0.05) # Stay alive long enough to be counted
    i * i
  end
  batch_group.add(thread)
end
puts "Initial batch size: #{batch_group.list.size}"
sleep(0.2) # Let threads complete
puts "Final batch size: #{batch_group.list.size}"
# Typical output (timing-dependent):
# => Initial batch size: 50
# => Final batch size: 0
Thread Safety & Concurrency
Basic operations such as adding threads and listing group members can be called from several threads at once. In CRuby, `add` and `list` are implemented in C and run while holding the Global VM Lock, so concurrent calls do not corrupt group membership. ThreadGroup offers no way to make a sequence of such calls atomic.
shared_group = ThreadGroup.new
modification_threads = []
# Multiple threads adding to the same group
10.times do
  modification_threads << Thread.new do
    worker = Thread.new { sleep(0.1) }
    shared_group.add(worker) # Safe to call concurrently
  end
end
modification_threads.each(&:join)
puts "Shared group size: #{shared_group.list.size}"
# => Shared group size: 10 (while the workers are still sleeping)
However, the array returned by `list` is only a snapshot. Group membership can change between the method call and the use of the array: a listed thread may have terminated or moved to another group by the time the code inspects it.
coordination_group = ThreadGroup.new
results = Queue.new
# Add worker threads to group
5.times do |i|
  thread = Thread.new do
    sleep(rand(0.1..0.5))
    results << "Worker #{i} completed"
  end
  coordination_group.add(thread)
end
# Race condition: a listed thread may finish at any point during iteration
coordination_group.list.each_with_index do |thread, index|
  # The thread can terminate between the alive? check and the puts
  if thread.alive?
    puts "Thread #{index} is still running"
  end
end
For reliable group operations, capture the thread list once and work with that snapshot. This approach prevents inconsistencies when group membership changes during processing.
task_group = ThreadGroup.new
status_mutex = Mutex.new
completed_count = 0
# Create tasks with shared state
8.times do
  thread = Thread.new do
    sleep(rand(0.1..0.3))
    status_mutex.synchronize { completed_count += 1 }
  end
  task_group.add(thread)
end
# Capture snapshot for consistent operations
active_threads = task_group.list
puts "Starting with #{active_threads.size} threads"
# Wait for all threads in snapshot
active_threads.each(&:join)
status_mutex.synchronize do
  puts "Completed tasks: #{completed_count}"
  # => Completed tasks: 8
end
Thread groups work with other concurrency primitives like mutexes, condition variables, and queues. Groups provide the organizational structure while other primitives handle synchronization and communication.
producer_group = ThreadGroup.new
consumer_group = ThreadGroup.new
buffer = Queue.new
shutdown = false
# Producer threads
3.times do |i|
  producer = Thread.new do
    counter = 0
    until shutdown
      buffer << "Item #{i}-#{counter}"
      counter += 1
      sleep(0.1)
    end
  end
  producer_group.add(producer)
end
# Consumer threads
2.times do |i|
  consumer = Thread.new do
    until shutdown && buffer.empty?
      begin
        item = buffer.pop(true) # Non-blocking
        puts "Consumer #{i} processed: #{item}"
      rescue ThreadError
        sleep(0.01) # Queue empty
      end
    end
  end
  consumer_group.add(consumer)
end
sleep(0.5)
shutdown = true
# Wait for all producers and consumers
(producer_group.list + consumer_group.list).each(&:join)
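When group members might not stop on their own, shutdown can be bounded with a deadline. The helper below is a hypothetical sketch, not part of ThreadGroup: `shutdown_group` and its `timeout` parameter are names invented for this example. It relies only on `Thread#join` with a time limit and `Thread#kill`.

```ruby
# Hypothetical helper: wait up to `timeout` seconds in total for the threads
# currently in `group`, then kill any that are still running.
# Returns the number of threads that had to be killed.
def shutdown_group(group, timeout: 1.0)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  members = group.list # snapshot of the living members
  members.each do |thread|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    thread.join([remaining, 0].max) # join(limit) returns nil on timeout
  end
  stragglers = members.select(&:alive?)
  stragglers.each(&:kill)
  stragglers.each(&:join)
  stragglers.size
end

workers = ThreadGroup.new
workers.add(Thread.new { sleep(0.05) }) # finishes in time
workers.add(Thread.new { sleep })       # never finishes on its own

killed = shutdown_group(workers, timeout: 0.5)
puts "Killed #{killed} straggler(s)"
```

Killing a thread skips whatever work it had left, so a cooperative signal such as the `shutdown` flag above is preferable when the workers can check one.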
Error Handling & Debugging
Thread groups simplify error handling by giving access to all living threads of a group at once. Because `list` omits terminated threads, code that needs to inspect failures must keep its own references to the threads and join them to observe their exceptions.
processing_group = ThreadGroup.new
error_log = Queue.new
threads = []
# Create threads that might fail
5.times do |i|
  thread = Thread.new do
    begin
      # Simulate work that might fail
      raise StandardError, "Process #{i} failed" if i.odd?
      sleep(0.2)
      "Process #{i} succeeded"
    rescue => e
      error_log << { thread_id: Thread.current.object_id, error: e.message }
      raise # Re-raise to mark thread as failed
    end
  end
  processing_group.add(thread)
  threads << thread # Keep a reference: list omits terminated threads
end
# Check for thread failures using the retained references
failed_threads = []
threads.each do |thread|
  begin
    thread.join
  rescue => e
    failed_threads << { thread: thread, error: e }
  end
end
puts "Failed threads: #{failed_threads.size}"
# => Failed threads: 2
until error_log.empty?
  error_info = error_log.pop(true) rescue nil
  puts "Error: #{error_info[:error]}" if error_info
end
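Debugging thread groups involves inspecting thread states and tracking thread relationships. The `list` method provides the current living members, while individual thread inspection reveals execution status. To see how a thread terminated, inspect a retained reference to it, since terminated threads no longer appear in `list`.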
debug_group = ThreadGroup.new
thread_info = {}
# Create threads with different behaviors
threads_data = [
  { name: 'fast', duration: 0.1 },
  { name: 'slow', duration: 1.0 },
  { name: 'error', duration: 0.5 }
]
threads_data.each do |data|
  thread = Thread.new do
    Thread.current[:name] = data[:name]
    if data[:name] == 'error'
      sleep(data[:duration] / 2)
      raise RuntimeError, "Intentional error"
    else
      sleep(data[:duration])
    end
  end
  debug_group.add(thread)
  thread_info[thread] = data # Keyed by thread so dead threads stay visible
end
# Debug thread status. Iterates over the retained references because
# group.list omits threads that have already terminated.
def debug_thread_group(group, info)
  puts "=== Thread Group Debug ==="
  listed = group.list
  info.each do |thread, thread_data|
    status = case thread.status
             when 'run' then 'Running'
             when 'sleep' then 'Sleeping'
             when false then 'Terminated normally'
             when nil then 'Terminated with exception'
             else thread.status
             end
    puts "Thread #{thread_data[:name]}: #{status}"
    puts " Alive: #{thread.alive?}"
    puts " Stop: #{thread.stop?}"
    puts " Listed in group: #{listed.include?(thread)}"
  end
end
# Monitor thread states
3.times do
  debug_thread_group(debug_group, thread_info)
  sleep(0.3)
  puts
end
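Exception handling in thread groups requires careful consideration of error propagation. By default, an unhandled exception terminates only the thread that raised it; other group members and the main thread keep running. The exception is re-raised in any thread that calls `join` or `value` on the failed thread, and it reaches the main thread immediately only when `abort_on_exception` is enabled.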
resilient_group = ThreadGroup.new
workers = []
# Create threads that will fail at different times
5.times do |i|
  thread = Thread.new do
    sleep(i * 0.1)
    raise ArgumentError, "Worker #{i} encountered invalid data"
  end
  resilient_group.add(thread)
  workers << thread
end
# Supervise retained references: resilient_group.list omits dead threads,
# so it cannot reveal which threads died from an exception
exception_handler = Thread.new do
  pending = workers.dup
  until pending.empty?
    sleep(0.1)
    dead, pending = pending.partition { |t| !t.alive? }
    dead.each do |thread|
      begin
        thread.join # This will re-raise the exception
      rescue => e
        puts "Caught exception from thread: #{e.message}"
        puts "Backtrace: #{e.backtrace.first(3).join(', ')}"
        # Could restart thread here or take corrective action
        puts "Thread #{thread.object_id} will be replaced"
      end
    end
  end
end
exception_handler.join
puts "All threads processed with exception handling"
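How a thread ended can be read from a retained reference after the fact. The sketch below shows the two terminal values of `Thread#status` and the re-raise on `join`; the thread names are illustrative, and `report_on_exception` is switched off only to keep stderr quiet.

```ruby
ok = Thread.new { :fine }
bad = Thread.new do
  Thread.current.report_on_exception = false # silence the stderr report
  raise ArgumentError, "boom"
end

ok.join
captured = begin
  bad.join # re-raises the exception that terminated the thread
  nil
rescue ArgumentError => e
  e
end

puts ok.status.inspect  # => false
puts bad.status.inspect # => nil
puts captured.message   # => boom
```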
Common Pitfalls
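A thread starts in the group of the thread that created it and changes group only when `add` is called. Between `Thread.new` and `add`, the new thread may already be running in its original group, and any threads it starts in that window inherit the original group. Code that assumes the thread was in the custom group from its first instruction contains a subtle race.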
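timing_group = ThreadGroup.new
coordination_results = []
# Pitfall: Assuming the thread is in the custom group from the start
thread = Thread.new do
  # The thread starts in its creator's group (here ThreadGroup::Default)
  coordination_results << "At start, in timing_group? #{Thread.current.group == timing_group}"
  sleep(0.1)
  coordination_results << "After add, in timing_group? #{Thread.current.group == timing_group}"
end
# Add to group after the thread has been created
timing_group.add(thread)
puts "Added to group immediately"
thread.join
puts coordination_results
# The first result depends on timing: the thread may run before or after add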
Another common pitfall involves expecting thread groups to provide execution control. Thread groups organize threads but don't influence scheduling, priority, or execution order. Applications requiring execution control need additional synchronization mechanisms.
# Pitfall: Expecting group-based execution control
priority_group = ThreadGroup.new
execution_order = []
# These threads won't execute in group-addition order
%w[first second third].each do |name|
  thread = Thread.new do
    sleep(rand(0.01..0.03)) # Random timing
    execution_order << name
  end
  priority_group.add(thread)
end
priority_group.list.each(&:join)
puts "Execution order: #{execution_order}"
# => Execution order might be: ["third", "first", "second"]
The `enclose` method creates a permanent restriction: ThreadGroup has no method to reverse it. Applications that enclose a group prematurely lose the ability to add threads to it later, even when logically appropriate, and can no longer move its members to another group.
# Pitfall: Premature group enclosure
dynamic_group = ThreadGroup.new
# Add initial threads
2.times do
  thread = Thread.new { sleep(0.1) }
  dynamic_group.add(thread)
end
# Enclose too early
dynamic_group.enclose
# A later attempt to add more threads fails
additional_work = Thread.new { puts "Additional work" }
begin
  dynamic_group.add(additional_work)
rescue ThreadError
  puts "Cannot expand enclosed group - must create new group"
  overflow_group = ThreadGroup.new
  overflow_group.add(additional_work)
end
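Code that cannot know in advance whether a group is enclosed can check with `enclosed?` before moving a thread. The helper below is a hypothetical sketch; `try_add` is not part of ThreadGroup.

```ruby
# Hypothetical helper: move `thread` into `group` unless enclosure would reject it.
def try_add(group, thread)
  return false if group.enclosed?         # destination accepts no new members
  return false if thread.group&.enclosed? # source does not release its members
  group.add(thread)
  true
end

gate = Queue.new
worker = Thread.new { gate.pop } # stays alive until released

open_group = ThreadGroup.new
sealed_group = ThreadGroup.new.enclose # enclose returns the group itself

added_to_open = try_add(open_group, worker)
added_to_sealed = try_add(sealed_group, worker)
puts added_to_open   # => true
puts added_to_sealed # => false

gate << :done
worker.join
```

The check and the `add` call are two separate steps, so code that shares groups between threads should still be prepared to rescue ThreadError.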
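The array returned by `list` is a snapshot that becomes stale as threads complete. Iterating over a stale list does not raise by itself, but code that assumes every listed thread is still running acts on outdated information when threads terminate between list capture and iteration.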
# Pitfall: Using stale thread lists
volatile_group = ThreadGroup.new
# Add short-lived threads
10.times do |i|
  thread = Thread.new { sleep(i * 0.01) } # Different lifespans
  volatile_group.add(thread)
end
# Capture list immediately
thread_list = volatile_group.list
sleep(0.05) # Some threads complete
# The captured list now contains threads that have terminated
thread_list.each do |thread|
  if thread.alive? # Check before operations
    puts "Thread #{thread.object_id} status: #{thread.status}"
  else
    puts "Thread #{thread.object_id} already terminated"
  end
end
Memory leaks can occur when applications maintain references to thread groups after all member threads complete. Ruby's garbage collector cannot reclaim thread groups while external references exist, even when groups serve no purpose.
# Pitfall: Retaining unnecessary group references
GROUP_REGISTRY = {} # A constant, because a method body cannot see top-level locals
def create_task_group(name)
  group = ThreadGroup.new
  3.times do
    thread = Thread.new { sleep(0.1) }
    group.add(thread)
  end
  GROUP_REGISTRY[name] = group # Permanent reference
  group
end
# Create many temporary groups
100.times { |i| create_task_group("batch_#{i}") }
# All threads complete but groups remain in memory
sleep(0.2)
puts "Registry size: #{GROUP_REGISTRY.size}" # => 100
# Proper cleanup required
GROUP_REGISTRY.clear
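Instead of clearing everything, a registry can drop only the groups that no longer have living threads. This is a minimal sketch; the `registry` hash and the batch names are illustrative.

```ruby
registry = {}

3.times do |n|
  group = ThreadGroup.new
  group.add(Thread.new { sleep(0.01) })
  registry["batch_#{n}"] = group
end

# Wait for the members that are still running.
registry.each_value { |group| group.list.each(&:join) }

# Drop groups without living threads. An empty group could still receive
# threads later, so this only suits groups that are used once.
registry.delete_if { |_name, group| group.list.empty? }
puts registry.size # => 0
```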
Reference
ThreadGroup Class Methods
Method | Parameters | Returns | Description |
---|---|---|---|
`ThreadGroup.new` | None | `ThreadGroup` | Creates a new empty thread group |
ThreadGroup Instance Methods
Method | Parameters | Returns | Description |
---|---|---|---|
`#add(thread)` | `thread` (Thread) | `ThreadGroup` | Moves thread into the group, removing it from its previous group; raises ThreadError if either group is enclosed |
`#enclose` | None | `ThreadGroup` | Prevents threads from being added to or removed from the group with `add` |
`#enclosed?` | None | `Boolean` | Returns true if the group is enclosed |
`#list` | None | `Array<Thread>` | Returns an array of the living threads in the group |
Constants
Constant | Value | Description |
---|---|---|
`ThreadGroup::Default` | `ThreadGroup` | Group of the main thread; new threads inherit their creator's group, so threads belong to it unless moved |
Exception Types
Exception | Conditions | Description |
---|---|---|
`ThreadError` | Adding a thread to an enclosed group, or moving a thread out of one | Raised when `add` would change the membership of an enclosed group |
`TypeError` | Invalid thread parameter | Raised when a non-Thread object is passed to `add` |
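The behavior of `add` with an invalid argument can be checked in a few lines. The sketch below reflects CRuby, where a non-Thread argument is rejected with TypeError; the variable names are illustrative.

```ruby
group = ThreadGroup.new

# A non-Thread argument is rejected.
rejected_with = begin
  group.add("not a thread")
  nil
rescue TypeError => e
  e.class
end
puts rejected_with.inspect # => TypeError

# A successful add returns the group itself.
gate = Queue.new
worker = Thread.new { gate.pop }
returned = group.add(worker)
puts returned.equal?(group) # => true

gate << :done
worker.join
```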
Thread Group States
State | Description | Methods Affected |
---|---|---|
Open | Accepts membership changes | add succeeds |
Enclosed | Rejects threads moved in or out | add raises ThreadError |
Thread States in Groups
Thread Status | Meaning | Group Behavior |
---|---|---|
`'run'` | Currently executing | Included in `list` |
`'sleep'` | Waiting/blocked | Included in `list` |
`false` | Terminated normally | Omitted from `list` |
`nil` | Terminated with exception | Omitted from `list` |
Common Usage Patterns
# Worker pool pattern
pool = ThreadGroup.new
workers = 5.times.map do |i|
  Thread.new { worker_loop(i) }
end
workers.each { |w| pool.add(w) }
# Secure group pattern
secure = ThreadGroup.new
trusted_threads.each { |t| secure.add(t) }
secure.enclose
# Batch processing pattern
batch = ThreadGroup.new
tasks.each do |task|
  thread = Thread.new { process(task) }
  batch.add(thread)
end
batch.list.each(&:join)
# Group monitoring pattern
def monitor_group(group)
  group.list.map do |thread|
    {
      id: thread.object_id,
      status: thread.status,
      alive: thread.alive?
    }
  end
end
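The patterns above leave helpers such as `worker_loop` and `process` undefined. As one possible way to fill in the worker pool pattern, the sketch below uses a `Queue` for jobs and lets each worker add itself to the pool before doing any work; the squaring job and the `nil` stop signal are assumptions made for illustration.

```ruby
jobs = Queue.new
results = Queue.new
pool = ThreadGroup.new

workers = 3.times.map do
  Thread.new do
    pool.add(Thread.current) # join the pool before doing any work
    while (job = jobs.pop)   # a nil job is the stop signal
      results << job * job
    end
  end
end

5.times { |n| jobs << n }
workers.size.times { jobs << nil } # one stop signal per worker
workers.each(&:join)

squares = []
squares << results.pop until results.empty?
puts squares.sort.inspect # => [0, 1, 4, 9, 16]
```

Having each worker call `pool.add(Thread.current)` as its first statement means any thread the worker starts later inherits the pool as its group.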
Integration with Concurrent Primitives
Primitive | Usage with ThreadGroup | Example |
---|---|---|
`Mutex` | Synchronize group operations | `mutex.synchronize { group.list.each {...} }` |
`Queue` | Communication between group members | `queue = Queue.new; group.list.each { …` |
`ConditionVariable` | Coordinate group activities | `cv.wait(mutex) until group.list.all? { …` |
Performance Characteristics
Operation | Time Complexity | Notes |
---|---|---|
`add` | O(1) | Constant time thread addition |
`list` | O(n) | Linear in number of active threads |
`enclose` | O(1) | Constant time state change |
`enclosed?` | O(1) | Constant time state check |