Thread Groups

A comprehensive guide to Ruby's ThreadGroup class for organizing and managing collections of threads in concurrent applications.


Overview

Ruby's ThreadGroup class provides a mechanism for organizing threads into logical collections and applying group-level operations. Thread groups create boundaries for thread management, allowing developers to control thread behavior collectively rather than managing individual threads separately.

Every Ruby thread belongs to a thread group. The main thread starts in ThreadGroup::Default, and each new thread starts in the group of the thread that created it, so all threads end up in ThreadGroup::Default unless something moves them. The add method moves a thread into another group. A thread stays in its group until it is moved again or terminates.

The ThreadGroup class exposes a small API: add moves a thread into a group, list returns the group's live threads, and enclose (with its query enclosed?) stops threads from being added to or removed from the group. Thread groups do not provide scheduling control or execution prioritization; they function purely as organizational containers.

# Default thread group contains all threads initially
puts ThreadGroup::Default.list.size
# => 1 (main thread)

# Create a custom thread group
worker_group = ThreadGroup.new
puts worker_group.list.size
# => 0 (empty group)
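
Group inheritance is easiest to see with a parent thread that lives in a custom group. The following is a minimal sketch; child_group_for is a hypothetical helper name, not part of Ruby, and the gate queue only exists to keep the parent alive until it has been moved.

```ruby
# Hypothetical helper: returns the group a child thread starts in when its
# parent thread is a member of parent_group.
def child_group_for(parent_group)
  gate = Queue.new
  parent = Thread.new do
    gate.pop                                    # wait until the parent has been moved
    Thread.new { Thread.current.group }.value   # the child reports its own group
  end
  parent_group.add(parent)                      # move the (still waiting) parent
  gate << :go
  parent.value                                  # join the parent, return the child's group
end

custom = ThreadGroup.new
puts child_group_for(custom).equal?(custom)           # true: inherited from the parent
puts Thread.main.group.equal?(ThreadGroup::Default)   # true unless the main thread was moved
```

The child never calls add; it lands in the custom group only because its parent was already a member when it called Thread.new.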

Thread groups serve several use cases in concurrent Ruby applications. They isolate related threads for batch operations, fix a group's membership with enclose, and help debugging by grouping threads logically. Applications use thread groups to track worker thread pools and coordinate shutdown procedures across related threads.

# Creating threads in a custom group
processing_group = ThreadGroup.new

5.times do |i|
  thread = Thread.new { sleep(2) }
  processing_group.add(thread)
end

puts "Processing group has #{processing_group.list.size} threads"
# => Processing group has 5 threads

ThreadGroup#list reports only the live threads that currently belong to the group. Terminated threads drop out of the result on their own, so no explicit cleanup is needed.

Basic Usage

Creating thread groups begins with instantiating the ThreadGroup class. New groups start empty and accept threads through the add method. Threads can only belong to one group at a time, so adding a thread to a new group removes it from its previous group.

# Create a new thread group
download_group = ThreadGroup.new

# Start some threads and add them to the group
urls = ['http://api1.com', 'http://api2.com', 'http://api3.com']
threads = urls.map do |url|
  Thread.new do
    # Simulate API call
    sleep(rand(1..3))
    puts "Downloaded from #{url}"
  end
end

# Add all threads to the group
threads.each { |thread| download_group.add(thread) }

puts "Download group contains #{download_group.list.size} threads"
# => Download group contains 3 threads
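
The one-group-at-a-time rule can be checked directly. This sketch moves a single live thread through two groups; membership_after_moves is a hypothetical helper name, and the gate queue is only there to keep the worker alive while it is inspected.

```ruby
# Hypothetical helper: moves one live thread through two groups and reports
# where it is listed after each step.
def membership_after_moves
  gate = Queue.new
  worker = Thread.new { gate.pop }   # stays alive until released
  first = ThreadGroup.new
  second = ThreadGroup.new

  first.add(worker)
  after_first = [first.list.include?(worker), second.list.include?(worker)]

  second.add(worker)                 # implicitly leaves `first`
  after_second = [first.list.include?(worker), second.list.include?(worker)]
  current_is_second = worker.group.equal?(second)

  gate << :done
  worker.join
  { after_first: after_first, after_second: after_second, current_is_second: current_is_second }
end

p membership_after_moves
```

After the second add the worker is listed only in the second group, and Thread#group on the worker returns that group.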

The list method returns an array of the threads in the group that are still alive. Each call builds a new array, so the result is a snapshot: it does not change afterward, while later calls reflect threads that have completed or been moved to another group.

image_group = ThreadGroup.new
completed_threads = []

# Create threads with different execution times
10.times do |i|
  thread = Thread.new do
    sleep(i * 0.1)  # Some finish faster than others
    completed_threads << i
  end
  image_group.add(thread)
end

# Check group size periodically
3.times do
  sleep(0.3)
  puts "Active threads: #{image_group.list.size}"
end
# Typical output (exact counts depend on scheduling):
# => Active threads: 6
# => Active threads: 3
# => Active threads: 0

Thread groups provide the enclose method to fix a group's membership. Once a group is enclosed, adding a thread to it or moving one of its threads to another group raises a ThreadError. Threads started by existing members still join the group through inheritance. This makes enclose a guard against accidental membership changes after initialization rather than a security mechanism.

secure_group = ThreadGroup.new

# Add initial threads
2.times do |i|
  thread = Thread.new { sleep(1) }
  secure_group.add(thread)
end

# Enclose the group
secure_group.enclose

# This will raise ThreadError
begin
  new_thread = Thread.new { puts "Hello" }
  secure_group.add(new_thread)
rescue ThreadError => e
  puts "Cannot add thread: #{e.message}"
  # => Cannot add thread: can't move to the enclosed thread group
end
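
Enclosing a group also affects the threads already inside it. The sketch below, which assumes standard CRuby behavior, tries to move a member out of an enclosed group and then lets that member start a child thread; enclosed_group_behaviour is a hypothetical helper name.

```ruby
# Hypothetical helper: encloses a group with one member, tries to move that
# member out, and lets the member start a child thread.
def enclosed_group_behaviour
  gate = Queue.new
  sealed = ThreadGroup.new
  member = Thread.new do
    gate.pop
    Thread.new { Thread.current.group }.value   # child started inside the enclosed group
  end
  sealed.add(member)
  sealed.enclose

  move_error = begin
    ThreadGroup.new.add(member)                 # try to move the member out
    nil
  rescue ThreadError => e
    e.message
  end

  gate << :go
  child_group = member.value
  { enclosed: sealed.enclosed?, move_error: move_error, child_in_sealed: child_group.equal?(sealed) }
end

result = enclosed_group_behaviour
puts result[:move_error]        # message of the ThreadError raised by the move
puts result[:child_in_sealed]   # true: the child inherited the enclosed group
```

The move is rejected with a ThreadError, while the child thread still starts normally and inherits the enclosed group.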

Thread groups need no lifecycle management. When threads complete execution, they stop appearing in the group's list without explicit cleanup, which simplifies tracking the live workers of a thread pool.

batch_group = ThreadGroup.new

# Create a batch of short-lived threads
batch_size = 50
batch_size.times do |i|
  thread = Thread.new do
    result = i * i
    # Thread completes quickly
  end
  batch_group.add(thread)
end

puts "Initial batch size: #{batch_group.list.size}"
sleep(0.1)  # Let threads complete
puts "Final batch size: #{batch_group.list.size}"
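# Typical output (the initial count can be lower than 50 if some threads
# finish before list is called):
# => Initial batch size: 50
# => Final batch size: 0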
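
Because list only reports live threads, a group cannot be used to collect results from threads that have already finished. A minimal sketch of one way around this is to keep your own array of Thread objects next to the group; squares_in_group is a hypothetical helper name.

```ruby
# Hypothetical helper: runs n short tasks in a group and collects their results.
def squares_in_group(n)
  group = ThreadGroup.new
  threads = Array.new(n) do |i|
    Thread.new do
      group.add(Thread.current)   # the thread registers itself while it is alive
      i * i
    end
  end
  values = threads.map(&:value)   # value waits for each thread and returns its result
  { values: values, still_listed: group.list.size }
end

outcome = squares_in_group(5)
p outcome[:values]         # [0, 1, 4, 9, 16]
p outcome[:still_listed]   # 0
```

The group answers "which workers are still running", while the array answers "what did each worker produce".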

Thread Safety & Concurrency

Thread groups themselves are safe to use from several threads for basic operations like adding threads and listing members. In CRuby, add and list are short operations that run while the calling thread holds the global VM lock, so concurrent calls do not corrupt group membership.

shared_group = ThreadGroup.new
modification_threads = []

# Multiple threads adding to the same group
10.times do |i|
  modification_threads << Thread.new do
    worker = Thread.new { sleep(0.1) }
    shared_group.add(worker)  # Thread-safe operation
  end
end

modification_threads.each(&:join)
puts "Shared group size: #{shared_group.list.size}"
# => Shared group size: 10 (as long as the workers are still sleeping)
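
The count above depends on the workers still being alive when list is called. One way to make membership deterministic, sketched here under the assumption that workers may register themselves, is to have each worker call add on its own thread and signal when it has done so; self_registering_workers is a hypothetical helper name.

```ruby
# Hypothetical helper: starts `count` workers that add themselves to a group,
# then reports the group size while they run and after they finish.
def self_registering_workers(count)
  group = ThreadGroup.new
  registered = Queue.new
  release = Queue.new

  workers = Array.new(count) do
    Thread.new do
      group.add(Thread.current)   # concurrent adds from many threads
      registered << true          # signal that this worker is now a member
      release.pop                 # stay alive until released
    end
  end

  count.times { registered.pop }  # wait until every worker has registered
  size_while_running = group.list.size
  count.times { release << :go }
  workers.each(&:join)
  [size_while_running, group.list.size]
end

p self_registering_workers(10)    # [10, 0]
```

Queues handle the coordination here; the group itself only records membership.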

However, the array returned by list is only a snapshot. Group membership may change between the call to list and the moment the array is used, so code that acts on the array must tolerate threads that have since terminated or moved.

coordination_group = ThreadGroup.new
results = Queue.new

# Add worker threads to group
5.times do |i|
  thread = Thread.new do
    sleep(rand(0.1..0.5))
    results << "Worker #{i} completed"
  end
  coordination_group.add(thread)
end

# The array being iterated is a fixed snapshot, but the threads in it
# may terminate at any point during the iteration
coordination_group.list.each_with_index do |thread, index|
  # The thread may have finished since list was called
  if thread.alive?
    puts "Thread #{index} is still running"
  end
end

For reliable group operations, capture the thread list once and work with that snapshot. This approach prevents inconsistencies when group membership changes during processing.

task_group = ThreadGroup.new
status_mutex = Mutex.new
completed_count = 0

# Create tasks with shared state
8.times do |i|
  thread = Thread.new do
    sleep(rand(0.1..0.3))
    status_mutex.synchronize { completed_count += 1 }
  end
  task_group.add(thread)
end

# Capture snapshot for consistent operations
active_threads = task_group.list
puts "Starting with #{active_threads.size} threads"

# Wait for all threads in snapshot
active_threads.each(&:join)

status_mutex.synchronize do
  puts "Completed tasks: #{completed_count}"
  # => Completed tasks: 8
end

Thread groups work with other concurrency primitives like mutexes, condition variables, and queues. Groups provide the organizational structure while other primitives handle synchronization and communication.

producer_group = ThreadGroup.new
consumer_group = ThreadGroup.new
buffer = Queue.new
shutdown = false

# Producer threads
3.times do |i|
  producer = Thread.new do
    counter = 0
    until shutdown
      buffer << "Item #{i}-#{counter}"
      counter += 1
      sleep(0.1)
    end
  end
  producer_group.add(producer)
end

# Consumer threads
2.times do |i|
  consumer = Thread.new do
    until shutdown && buffer.empty?
      begin
        item = buffer.pop(true)  # Non-blocking
        puts "Consumer #{i} processed: #{item}"
      rescue ThreadError
        sleep(0.01)  # Queue empty
      end
    end
  end
  consumer_group.add(consumer)
end

sleep(0.5)
shutdown = true

# Wait for all producers and consumers
(producer_group.list + consumer_group.list).each(&:join)
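
The consumers above poll with a non-blocking pop and a shared flag. A possible alternative, sketched here as an assumption rather than as the article's method, is to let consumers block on pop and signal shutdown with Queue#close; process_with_closed_queue is a hypothetical helper name, and items must not be nil or false because nil marks the end of the stream.

```ruby
# Hypothetical helper: processes items with consumer threads that stop when
# the queue is closed and drained.
def process_with_closed_queue(items, consumers: 2)
  consumer_group = ThreadGroup.new
  buffer = Queue.new
  processed = Queue.new

  workers = Array.new(consumers) do
    Thread.new do
      consumer_group.add(Thread.current)
      # pop blocks while the queue is empty and returns nil once the
      # queue is closed and drained, which ends the loop.
      while (item = buffer.pop)
        processed << item.upcase
      end
    end
  end

  items.each { |item| buffer << item }
  buffer.close                      # no more items will arrive
  workers.each(&:join)
  Array.new(processed.size) { processed.pop }.sort
end

p process_with_closed_queue(%w[a b c d])   # ["A", "B", "C", "D"]
```

The worker threads are tracked in an array for joining, since consumers that have already finished would no longer appear in consumer_group.list.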

Error Handling & Debugging

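Thread groups simplify error handling by providing centralized access to the live threads of a related set. Applications can check thread status and coordinate error recovery across group members. Because a thread that has already failed no longer appears in list, code that needs to inspect failures must keep its own references to the threads.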

processing_group = ThreadGroup.new
error_log = Queue.new

# Create threads that might fail, keeping our own references:
# ThreadGroup#list only returns live threads, so a thread that has
# already failed would no longer appear there
threads = 5.times.map do |i|
  Thread.new do
    processing_group.add(Thread.current)
    Thread.current.report_on_exception = false  # Failures are reported below
    begin
      # Simulate work that might fail
      raise StandardError, "Process #{i} failed" if i.odd?
      sleep(0.2)
      "Process #{i} succeeded"
    rescue => e
      error_log << { thread_id: Thread.current.object_id, error: e.message }
      raise  # Re-raise to mark thread as failed
    end
  end
end

# Check for thread failures; join re-raises a thread's unhandled exception
failed_threads = []
threads.each do |thread|
  begin
    thread.join
  rescue => e
    failed_threads << { thread: thread, error: e }
  end
end

puts "Failed threads: #{failed_threads.size}"
until error_log.empty?
  error_info = error_log.pop(true) rescue nil
  puts "Error: #{error_info[:error]}" if error_info
end

Debugging thread groups involves inspecting thread states and tracking thread relationships. The list method provides current thread membership, while individual thread inspection reveals execution status.

debug_group = ThreadGroup.new
thread_info = {}

# Create threads with different behaviors
threads_data = [
  { name: 'fast', duration: 0.1 },
  { name: 'slow', duration: 1.0 },
  { name: 'error', duration: 0.5 }
]

threads_data.each do |data|
  thread = Thread.new do
    Thread.current[:name] = data[:name]
    if data[:name] == 'error'
      sleep(data[:duration] / 2)
      raise RuntimeError, "Intentional error"
    else
      sleep(data[:duration])
    end
  end
  debug_group.add(thread)
  thread_info[thread.object_id] = data
end

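# Debug thread status. group.list only contains live threads, so the two
# "Terminated" branches below are rarely reached through a group; they
# matter when inspecting threads you tracked yourself
def debug_thread_group(group, info)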
  puts "=== Thread Group Debug ==="
  group.list.each do |thread|
    thread_data = info[thread.object_id]
    status = case thread.status
             when 'run' then 'Running'
             when 'sleep' then 'Sleeping'
             when false then 'Terminated normally'
             when nil then 'Terminated with exception'
             else thread.status
             end
    
    puts "Thread #{thread_data[:name]}: #{status}"
    puts "  Alive: #{thread.alive?}"
    puts "  Stop: #{thread.stop?}" if thread.respond_to?(:stop?)
  end
end

# Monitor thread states
3.times do
  debug_thread_group(debug_group, thread_info)
  sleep(0.3)
  puts
end
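
The two terminated states can be observed on threads that are tracked outside a group. This is a small sketch; final_statuses is a hypothetical helper name, and report_on_exception is switched off only to keep the demo's stderr quiet.

```ruby
# Hypothetical helper: returns the final Thread#status of a thread that
# finished normally and of one that died from an exception.
def final_statuses
  ok = Thread.new { :done }
  failed = Thread.new do
    Thread.current.report_on_exception = false   # keep stderr quiet for the demo
    raise "boom"
  end

  ok.join
  begin
    failed.join            # join re-raises the exception that killed the thread
  rescue RuntimeError
    # expected: the worker's exception surfaces here
  end
  { ok: ok.status, failed: failed.status, alive: [ok.alive?, failed.alive?] }
end

statuses = final_statuses
p statuses[:ok]       # false
p statuses[:failed]   # nil
```

A status of false means the thread returned normally, and nil means it died from an unhandled exception.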

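Exception handling in thread groups requires careful consideration of error propagation. An unhandled exception terminates the thread that raised it but does not affect other group members or the main thread. It surfaces elsewhere only when another thread calls join or value on the failed thread, or when abort_on_exception is enabled. Since a dead thread no longer appears in list, keep references to the threads you intend to join.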

resilient_group = ThreadGroup.new

# Create threads that will fail at different times. Keep references:
# a thread that has died no longer appears in resilient_group.list
workers = 5.times.map do |i|
  Thread.new do
    resilient_group.add(Thread.current)
    Thread.current.report_on_exception = false  # Handled by the thread below
    sleep(i * 0.1)
    raise ArgumentError, "Worker #{i} encountered invalid data"
  end
end

exception_handler = Thread.new do
  workers.each do |thread|
    begin
      thread.join  # Re-raises the exception that killed the thread
    rescue => e
      puts "Caught exception from thread: #{e.message}"
      puts "Backtrace: #{e.backtrace.first(3).join(', ')}"

      # Could restart thread here or take corrective action
      puts "Thread #{thread.object_id} will be replaced"
    end
  end
end

exception_handler.join
puts "All threads processed with exception handling"

Common Pitfalls

Moving a thread with add takes effect immediately: from that point, Thread#group on the thread and list on both groups reflect the new membership. The pitfall is the window between Thread.new and add. During that window the thread runs in its creator's group, and any threads it spawns inherit that group rather than the intended one.

timing_group = ThreadGroup.new
coordination_results = []

# Pitfall: assuming the thread is in the target group from the start
thread = Thread.new do
  sleep(0.1)  # Thread starts in its creator's group (here, Default)
  coordination_results << "Thread sees group: #{Thread.current.group}"
end

# Add to group after thread starts
timing_group.add(thread)
puts "Added to group immediately"

thread.join
# The thread reports timing_group only because it slept past the add;
# without the sleep it could have reported its creator's group instead

Another common pitfall involves expecting thread groups to provide execution control. Thread groups organize threads but don't influence scheduling, priority, or execution order. Applications requiring execution control need additional synchronization mechanisms.

# Pitfall: Expecting group-based execution control
priority_group = ThreadGroup.new
execution_order = []

# These threads won't execute in group-addition order
%w[first second third].each_with_index do |name, i|
  thread = Thread.new do
    sleep(rand(0.01..0.03))  # Random timing
    execution_order << name
  end
  priority_group.add(thread)
end

priority_group.list.each(&:join)
puts "Execution order: #{execution_order}"
# => Execution order might be: ["third", "first", "second"]
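
When order matters, it has to come from explicit synchronization. One possible approach, shown as a sketch and not as the only option, is to have each thread wait for its predecessor with join; run_in_order is a hypothetical helper name.

```ruby
# Hypothetical helper: starts one thread per name and forces them to record
# their names in the given order by chaining joins.
def run_in_order(names)
  group = ThreadGroup.new
  order = Queue.new
  previous = nil

  threads = names.map do |name|
    wait_for = previous             # the thread started in the previous iteration
    previous = Thread.new do
      group.add(Thread.current)     # membership only, no effect on ordering
      wait_for&.join                # block until the predecessor has finished
      order << name
    end
  end

  threads.each(&:join)
  Array.new(order.size) { order.pop }
end

p run_in_order(%w[first second third])   # ["first", "second", "third"]
```

The group plays no part in the ordering; the join chain alone enforces it.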

The enclose method creates a permanent restriction that cannot be reversed. Applications that enclose groups prematurely lose the ability to add threads later, even when logically appropriate.

# Pitfall: Premature group enclosure
dynamic_group = ThreadGroup.new

# Add initial threads
2.times do |i|
  thread = Thread.new { sleep(0.1) }
  dynamic_group.add(thread)
end

# Enclose too early
dynamic_group.enclose

# Later need to add more threads fails
additional_work = Thread.new { puts "Additional work" }
begin
  dynamic_group.add(additional_work)
rescue ThreadError
  puts "Cannot expand enclosed group - must create new group"
  overflow_group = ThreadGroup.new
  overflow_group.add(additional_work)
end
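
Instead of rescuing the error, code can ask the group first with enclosed?. The following is a minimal sketch; add_with_overflow is a hypothetical helper name, and the fallback group is an assumption about how an application might want to handle the case.

```ruby
# Hypothetical helper: adds thread to group, or to overflow when group is
# enclosed. Returns the group that received the thread.
def add_with_overflow(group, overflow, thread)
  target = group.enclosed? ? overflow : group
  target.add(thread)
  target
end

gate = Queue.new
primary = ThreadGroup.new
overflow = ThreadGroup.new
primary.enclose

late_worker = Thread.new { gate.pop }   # stays alive until released
chosen = add_with_overflow(primary, overflow, late_worker)
puts chosen.equal?(overflow)            # true: primary was enclosed

gate << :done
late_worker.join
```

The check only covers the target group. A thread that currently sits in an enclosed group still cannot be moved anywhere.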

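The array returned by list is a snapshot that becomes stale as threads complete. The Thread objects in it remain valid, so iterating over a stale list does not raise, but decisions based on it (for example, how many workers are still busy) can be wrong. Check alive? or status at the point of use.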

# Pitfall: Using stale thread lists
volatile_group = ThreadGroup.new

# Add short-lived threads
10.times do |i|
  thread = Thread.new { sleep(i * 0.01) }  # Different lifespans
  volatile_group.add(thread)
end

# Capture list immediately
thread_list = volatile_group.list
sleep(0.05)  # Some threads complete

# Some entries have terminated since the snapshot was taken
thread_list.each do |thread|
  if thread.alive?  # Check before operations
    puts "Thread #{thread.object_id} status: #{thread.status}"
  else
    puts "Thread #{thread.object_id} already terminated"
  end
end

Memory leaks can occur when applications maintain references to thread groups after all member threads complete. Ruby's garbage collector cannot reclaim thread groups while external references exist, even when groups serve no purpose.

# Pitfall: Retaining unnecessary group references
# A constant is used because a method body cannot see top-level local variables
GROUP_REGISTRY = {}

def create_task_group(name)
  group = ThreadGroup.new
  3.times do |i|
    thread = Thread.new { sleep(0.1) }
    group.add(thread)
  end
  GROUP_REGISTRY[name] = group  # Permanent reference
  group
end

# Create many temporary groups
100.times { |i| create_task_group("batch_#{i}") }

# All threads complete but groups remain in memory
sleep(0.2)
puts "Registry size: #{GROUP_REGISTRY.size}"  # => 100

# Proper cleanup required
GROUP_REGISTRY.clear
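
Clearing everything is not always appropriate when some groups are still busy. A possible refinement, sketched here with a plain Hash standing in for the registry, drops only the groups that have no live threads left; prune_finished_groups is a hypothetical helper name.

```ruby
# Hypothetical helper: removes registry entries whose group has no live threads.
def prune_finished_groups(registry)
  registry.delete_if { |_name, group| group.list.empty? }
end

done_gate = Queue.new
busy_gate = Queue.new
done_group = ThreadGroup.new
busy_group = ThreadGroup.new
done_thread = Thread.new { done_gate.pop }
busy_thread = Thread.new { busy_gate.pop }
done_group.add(done_thread)
busy_group.add(busy_thread)

done_gate << :go
done_thread.join                       # done_group now has no live threads

registry = { "done" => done_group, "busy" => busy_group }
prune_finished_groups(registry)
p registry.keys                        # ["busy"]

busy_gate << :go
busy_thread.join
```

An empty list does not mean a group is finished for good, since threads can still be added to it later, so prune only groups that are known to be complete.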

Reference

ThreadGroup Class Methods

| Method | Parameters | Returns | Description |
|---|---|---|---|
| ThreadGroup.new | None | ThreadGroup | Creates a new empty thread group |

ThreadGroup Instance Methods

| Method | Parameters | Returns | Description |
|---|---|---|---|
| #add(thread) | thread (Thread) | ThreadGroup | Adds thread to the group, removing it from its previous group |
| #enclose | None | ThreadGroup | Prevents threads from being added to or removed from the group |
| #enclosed? | None | Boolean | Returns true if the group is enclosed |
| #list | None | Array<Thread> | Returns an array of the live threads in the group |

Constants

| Constant | Value | Description |
|---|---|---|
| ThreadGroup::Default | ThreadGroup | Group of the main thread; new threads inherit it unless their creator belongs to another group |

Exception Types

| Exception | Conditions | Description |
|---|---|---|
| ThreadError | Adding a thread to an enclosed group, or moving a thread out of one | Raised when attempting to change the membership of an enclosed group |
| TypeError | Invalid thread parameter | Raised when a non-Thread object is passed to add |

Thread Group States

| State | Description | Methods Affected |
|---|---|---|
| Open | Accepts new threads | add succeeds |
| Enclosed | Rejects new threads | add raises ThreadError |

Thread States in Groups

| Thread Status | Meaning | Group Behavior |
|---|---|---|
| 'run' | Currently executing | Included in list |
| 'sleep' | Waiting/blocked | Included in list |
| false | Terminated normally | Not returned by list |
| nil | Terminated with exception | Not returned by list |

Common Usage Patterns

# Worker pool pattern
pool = ThreadGroup.new
workers = 5.times.map do |i|
  Thread.new { worker_loop(i) }
end
workers.each { |w| pool.add(w) }

# Secure group pattern  
secure = ThreadGroup.new
trusted_threads.each { |t| secure.add(t) }
secure.enclose

# Batch processing pattern
batch = ThreadGroup.new
tasks.each do |task|
  thread = Thread.new { process(task) }
  batch.add(thread)
end
batch.list.each(&:join)

# Group monitoring pattern
def monitor_group(group)
  group.list.map do |thread|
    {
      id: thread.object_id,
      status: thread.status,
      alive: thread.alive?
    }
  end
end

Integration with Concurrent Primitives

| Primitive | Usage with ThreadGroup | Example |
|---|---|---|
| Mutex | Synchronize group operations | mutex.synchronize { group.list.each {...} } |
| Queue | Communication between group members | `queue = Queue.new; group.list.each { |
| ConditionVariable | Coordinate group activities | `cv.wait(mutex) until group.list.all? { |

Performance Characteristics

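| Operation | Time Complexity | Notes |
|---|---|---|
| add | O(1) | Constant time thread addition |
| list | O(n) | Linear in the number of live threads; CRuby scans all live threads, not only the group's members |
| enclose | O(1) | Constant time state change |
| enclosed? | O(1) | Constant time state check |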