CrackedRuby

Overview

Bulk operations execute multiple actions in a single request or transaction rather than processing them one at a time. A system that needs to insert 10,000 database records can either execute 10,000 individual INSERT statements or combine them into batched operations. The difference in execution time often spans orders of magnitude.

The concept applies across domains: database operations, API requests, file processing, message queue operations, and data transformations. Each context involves the same fundamental trade-off between granular control and operational efficiency. Individual operations provide immediate feedback and simple error handling but consume excessive resources. Bulk operations sacrifice some flexibility to achieve dramatic performance improvements.

Consider a web application that imports user data from a CSV file containing 50,000 rows. Processing each row with a separate database INSERT statement might take 30 minutes and generate 50,000 network round trips. Converting this to bulk inserts of 1,000 records per batch reduces execution time to under 2 minutes with only 50 database calls. The system handles the same data with identical results but consumes 93% less time.

The pattern extends beyond databases. REST APIs frequently implement bulk endpoints that accept arrays of resources. Cloud storage services provide batch upload operations. Message brokers support publishing multiple messages atomically. Data processing pipelines batch records through transformation stages. Each implementation optimizes for network efficiency, resource utilization, and throughput.

Bulk operations introduce complexity around partial failures, transaction boundaries, and memory management. A batch of 1,000 operations might succeed for 997 records and fail for 3. The system must detect these partial failures, identify which specific operations failed, and decide whether to retry them, roll back the entire batch, or accept partial success. These concerns shape bulk operation design across all contexts.

Key Principles

Batching groups multiple operations into a single unit of work. Instead of executing operations as they arrive, the system accumulates them until reaching a threshold based on count, size, or time. A database bulk insert might batch 500 records, while an API bulk update might batch based on payload size reaching 1MB. Time-based batching executes after a fixed interval regardless of accumulated count, ensuring operations don't wait indefinitely during low-traffic periods.
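
The count and size thresholds can be combined in a single buffer. The following is a minimal sketch rather than a production implementation: `ThresholdBuffer`, its parameter names, and the use of JSON byte size as the size measure are assumptions made for illustration.

```ruby
require 'json'

# Hypothetical buffer that flushes when either threshold is reached.
class ThresholdBuffer
  def initialize(max_count:, max_bytes:, &on_flush)
    @max_count = max_count
    @max_bytes = max_bytes
    @on_flush = on_flush
    @items = []
    @bytes = 0
  end

  # Adds an item and flushes if the count or byte threshold has been reached.
  def add(item)
    @items << item
    @bytes += item.to_json.bytesize
    flush if @items.size >= @max_count || @bytes >= @max_bytes
  end

  # Hands the accumulated items to the callback and resets the buffer.
  def flush
    return if @items.empty?

    batch = @items
    @items = []
    @bytes = 0
    @on_flush.call(batch)
  end
end

flushed = []
buffer = ThresholdBuffer.new(max_count: 3, max_bytes: 1_000) { |batch| flushed << batch }
5.times { |i| buffer.add({ id: i }) }
buffer.flush # flush the remainder, for example at shutdown
p flushed.map(&:size) # => [3, 2]
```

The explicit final `flush` matters: without it, items that never reach a threshold would stay in the buffer.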

Batch size directly impacts both performance and resource usage. Small batches of 10-50 operations reduce memory consumption and limit blast radius when errors occur, but don't fully capitalize on bulk operation efficiencies. Large batches of 5,000-10,000 operations maximize throughput but risk memory exhaustion and make error recovery more difficult. The optimal size depends on operation type, data characteristics, available memory, and error tolerance.

Atomicity determines whether a batch succeeds or fails as a single unit. Atomic batches either complete entirely or roll back completely, never leaving the system in a partially updated state. Database transactions provide this guarantee naturally—a bulk insert of 1,000 records either commits all 1,000 or commits none. Non-atomic batches process each operation independently, allowing partial success where 900 records succeed and 100 fail.

Atomicity choice depends on requirements. Financial transactions typically require atomic batches where partial completion creates accounting inconsistencies. Log ingestion tolerates non-atomic batches where losing 1% of records during processing is acceptable. Systems often combine approaches, using atomic batches for critical operations and non-atomic batches for non-critical bulk processing.
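
The difference between the two modes can be sketched without a database. In this illustration a plain Hash stands in for the data store and lambdas stand in for operations; both method names are hypothetical, and a real system would rely on database transactions rather than copying data.

```ruby
# Atomic: stage changes on a copy and publish them only if every operation succeeds.
def apply_atomic(store, operations)
  working = store.dup
  operations.each { |op| op.call(working) }
  store.replace(working)
  true
rescue StandardError
  false # the original store is untouched
end

# Non-atomic: apply each operation independently and count the successes.
def apply_non_atomic(store, operations)
  operations.count do |op|
    op.call(store)
    true
  rescue StandardError
    false
  end
end

credit = ->(s) { s[:alice] += 10 }
bad    = ->(_s) { raise ArgumentError, 'insufficient funds' }
debit  = ->(s) { s[:bob] -= 10 }

atomic_store = { alice: 0, bob: 50 }
p apply_atomic(atomic_store, [credit, bad, debit]) # => false
p atomic_store.values                              # => [0, 50]

partial_store = { alice: 0, bob: 50 }
p apply_non_atomic(partial_store, [credit, bad, debit]) # => 2
p partial_store.values                                  # => [10, 40]
```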

Error propagation defines how failures in bulk operations surface to calling code. Fail-fast approaches abort the entire batch on the first error, returning immediately with partial work potentially committed. Fail-complete approaches process all operations regardless of individual failures, collecting errors to return after batch completion. The approach impacts retry logic, debugging, and system behavior under partial failures.
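
A small sketch of the two propagation styles, using hypothetical method names and an in-memory list instead of a real bulk API:

```ruby
# Fail-fast: stop at the first failure; later items are never attempted.
def process_fail_fast(items)
  items.each_with_index do |item, index|
    yield item
  rescue StandardError => e
    return { processed: index, errors: [{ index: index, message: e.message }] }
  end
  { processed: items.size, errors: [] }
end

# Fail-complete: attempt every item and report all failures at the end.
def process_fail_complete(items)
  errors = []
  items.each_with_index do |item, index|
    yield item
  rescue StandardError => e
    errors << { index: index, message: e.message }
  end
  { processed: items.size - errors.size, errors: errors }
end

parse = ->(s) { Integer(s) } # raises ArgumentError for non-numeric input
inputs = %w[1 2 x 4 y]

fast = process_fail_fast(inputs, &parse)
p fast[:processed] # => 2

complete = process_fail_complete(inputs, &parse)
p complete[:processed]                      # => 3
p complete[:errors].map { |e| e[:index] }   # => [2, 4]
```

The fail-complete result carries the index of every failed item, which is what retry logic needs in order to resubmit only the failures.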

Memory management constrains bulk operation design. Loading 100,000 records into memory for batch processing might consume gigabytes of RAM. Streaming approaches process records incrementally, maintaining a small working set in memory while iterating through the full dataset. Chunking divides large batches into smaller sub-batches processed sequentially, balancing memory usage against the overhead of multiple database round trips or API calls.
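
Streaming and chunking can be combined with a plain Enumerator. In this sketch `record_stream` is a hypothetical stand-in for a file reader or database cursor that produces rows on demand, so only one chunk exists in memory at a time.

```ruby
# Hypothetical record source: generates rows lazily instead of building an array.
def record_stream(total)
  Enumerator.new do |yielder|
    total.times { |i| yielder << { id: i + 1 } }
  end
end

# Consumes the stream chunk by chunk and returns the size of each chunk.
def process_in_chunks(stream, chunk_size)
  chunk_sizes = []
  stream.each_slice(chunk_size) do |chunk|
    chunk_sizes << chunk.size # replace with the real bulk operation
  end
  chunk_sizes
end

p process_in_chunks(record_stream(2_500), 1_000) # => [1000, 1000, 500]
```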

Idempotency ensures that processing the same bulk operation multiple times produces the same result as processing it once. When bulk operations fail partway through, the system must decide whether to retry the entire batch or only the failed subset. Idempotent operations can safely retry the full batch without checking for duplicates. Non-idempotent operations require tracking which specific operations succeeded to avoid processing them twice.

Consider bulk email sending where 800 of 1,000 emails send successfully before a network error. Retrying the full batch without idempotency sends 800 users duplicate emails. Idempotent design with deduplication based on message IDs ensures each user receives exactly one email regardless of retries. The principle applies equally to database operations, API calls, and file processing.
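
The deduplication idea can be sketched as follows. `IdempotentSender` is hypothetical, the delivery is simulated by appending to an array, and the in-memory Set stands in for what would need to be durable storage in a real system.

```ruby
require 'set'

# Hypothetical sender that skips message IDs it has already delivered.
class IdempotentSender
  attr_reader :deliveries

  def initialize
    @sent_ids = Set.new # assumption: a real system would persist this
    @deliveries = []
  end

  # Delivers each message at most once, keyed by its :id.
  def send_batch(messages)
    messages.each do |message|
      next if @sent_ids.include?(message[:id])

      @deliveries << message[:to] # simulated delivery
      @sent_ids << message[:id]   # record the ID only after delivery succeeds
    end
  end
end

sender = IdempotentSender.new
messages = (1..5).map { |i| { id: "reset-#{i}", to: "user#{i}@example.com" } }

sender.send_batch(messages.first(3)) # first attempt gets through three messages
sender.send_batch(messages)          # retry of the full batch
p sender.deliveries.size # => 5
```

Because the retry skips the three IDs already recorded, resubmitting the whole batch is safe and no per-item bookkeeping is needed in the caller.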

Transaction boundaries define the scope of atomic operations in bulk processing. A single database transaction can encompass an entire batch, providing atomicity but potentially holding locks for extended periods. Multiple smaller transactions reduce lock contention but complicate rollback if later operations in the batch fail. Two-phase commit protocols coordinate transactions across multiple systems but add latency and failure modes.

Ruby Implementation

Ruby applications get bulk operation support from ActiveRecord (part of Rails), the core Enumerable module, and third-party gems. ActiveRecord's bulk methods optimize database operations, while Enumerable handles in-memory transformations and aggregations.

ActiveRecord Bulk Insert uses insert_all to add multiple records in a single SQL statement:

# Individual inserts - slow
users.each do |user_data|
  User.create(user_data)
end

# Bulk insert - fast
User.insert_all([
  { name: 'Alice', email: 'alice@example.com', created_at: Time.current },
  { name: 'Bob', email: 'bob@example.com', created_at: Time.current },
  { name: 'Charlie', email: 'charlie@example.com', created_at: Time.current }
])
# Generates: INSERT INTO users (name, email, created_at) VALUES ('Alice', ...), ('Bob', ...), ('Charlie', ...)

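insert_all returns an ActiveRecord::Result rather than a count, and it bypasses model validations and callbacks. Rows that violate a unique constraint are skipped silently; insert_all! raises ActiveRecord::RecordNotUnique instead. Use unique_by to name the unique index used for conflict detection (supported on PostgreSQL and SQLite). To update existing rows on conflict, use upsert_all, optionally with update_only to restrict which columns change:

User.insert_all(
  user_records,
  unique_by: :email
)
# Skips records where email already exists (requires a unique index on email)

User.upsert_all(
  user_records,
  unique_by: :email,
  update_only: [:name]
)
# Updates name for existing emails and inserts the rest (update_only requires Rails 7.0+)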

Bulk Update operations use update_all for uniform updates or upsert_all for mixed insert/update scenarios:

# Update all records matching a condition
User.where(status: 'pending').update_all(status: 'active', activated_at: Time.current)
# Single UPDATE statement affects multiple rows; skips validations and callbacks
# and does not touch updated_at unless you set it explicitly

# Upsert - insert new records or update existing ones
User.upsert_all([
  { name: 'Alice Updated', email: 'alice@example.com' },
  { name: 'New User', email: 'new@example.com' }
], unique_by: :email)
# Updates the row whose email already exists and inserts the other
# (unique_by: :email assumes a unique index on email; PostgreSQL and SQLite only)

Bulk Delete removes multiple records efficiently:

# Delete records matching criteria
User.where('created_at < ?', 1.year.ago).delete_all
# Single DELETE statement; skips callbacks and dependent: options (use destroy_all if those must run)

# Delete specific records by IDs
User.where(id: [1, 5, 7, 12, 24]).delete_all
# DELETE FROM users WHERE id IN (1, 5, 7, 12, 24)

Chunked Processing handles datasets too large for memory using find_each and find_in_batches:

# Process records in batches
User.find_each(batch_size: 1000) do |user|
  user.process_some_operation
end
# Loads 1000 records at a time, processes each individually

# Process entire batches
User.find_in_batches(batch_size: 1000) do |batch|
  # batch is an array of up to 1000 User objects
  email_service.send_bulk(batch.map(&:email))
end

Enumerable Bulk Operations transform and aggregate data efficiently:

# Bulk transformation
results = [1, 2, 3, 4, 5].map { |n| n * 2 }
# => [2, 4, 6, 8, 10]

# Bulk filtering and mapping
active_emails = users
  .select { |u| u.active? }
  .map { |u| u.email }

# Bulk reduction
total = orders.sum(&:amount)
# Calculates sum in Ruby memory

# Database-level bulk reduction (preferred for large datasets)
total = Order.sum(:amount)
# Executes: SELECT SUM(amount) FROM orders

Bulk HTTP Requests with concurrent processing:

require 'concurrent' # provided by the concurrent-ruby gem
require 'net/http'

urls = ['http://api.example.com/resource1', 'http://api.example.com/resource2']
pool = Concurrent::FixedThreadPool.new(5)
promises = urls.map do |url|
  Concurrent::Promise.execute(executor: pool) do
    Net::HTTP.get(URI(url))
  end
end

results = promises.map(&:value)
pool.shutdown

Bulk Redis Operations use pipelining:

require 'redis'

redis = Redis.new
redis.pipelined do |pipeline|
  (1..1000).each do |i|
    pipeline.set("key:#{i}", "value#{i}")
  end
end
# Sends all 1000 commands at once, waits for all responses

Practical Examples

CSV Import with Bulk Database Inserts

A system imports product data from a CSV file containing 50,000 rows. Processing each row individually takes 45 minutes. Bulk inserts reduce this to 90 seconds:

require 'csv'

products = []
batch_size = 1000

CSV.foreach('products.csv', headers: true) do |row|
  products << {
    sku: row['sku'],
    name: row['name'],
    price: row['price'].to_f,
    created_at: Time.current,
    updated_at: Time.current
  }
  
  if products.size >= batch_size
    Product.insert_all(products)
    products.clear
  end
end

# Insert remaining records
Product.insert_all(products) if products.any?

This approach loads 1,000 records into memory, inserts them as a batch, clears the array, and repeats. Memory usage stays constant regardless of file size. Each batch executes a single INSERT statement with 1,000 value sets.

Bulk API Updates with Error Collection

An integration service synchronizes 5,000 user records with an external API. The API provides a bulk endpoint accepting up to 100 records per request. Some records fail validation:

class UserSyncService
  BATCH_SIZE = 100
  
  def sync_users(users)
    results = { success: [], errors: [] }
    
    users.each_slice(BATCH_SIZE) do |batch|
      response = api_client.bulk_update(batch.map(&:to_api_format))
      
      response['results'].each_with_index do |result, index|
        user = batch[index]
        if result['success']
          results[:success] << user.id
        else
          results[:errors] << {
            user_id: user.id,
            error: result['error_message']
          }
        end
      end
    end
    
    results
  end
  
  private
  
  def api_client
    @api_client ||= ExternalApiClient.new
  end
end

service = UserSyncService.new
results = service.sync_users(User.where(sync_pending: true))

puts "Synced: #{results[:success].count}"
puts "Failed: #{results[:errors].count}"
results[:errors].each do |error|
  puts "User #{error[:user_id]}: #{error[:error]}"
end

The service processes 5,000 users in 50 API calls instead of 5,000 individual calls. Failed records collect in an errors array for review and retry without blocking successful records.

Bulk Email Sending with Rate Limiting

An application sends password reset emails to 20,000 users. The email service limits bulk sends to 500 recipients per request with a rate limit of 10 requests per minute:

class BulkEmailService
  MAX_RECIPIENTS_PER_BATCH = 500
  MAX_REQUESTS_PER_MINUTE = 10
  
  def send_password_resets(users)
    sent_count = 0
    request_count = 0
    minute_start = Time.now
    
    users.each_slice(MAX_RECIPIENTS_PER_BATCH) do |batch|
      # Rate limiting
      if request_count >= MAX_REQUESTS_PER_MINUTE
        elapsed = Time.now - minute_start
        sleep(60 - elapsed) if elapsed < 60
        minute_start = Time.now
        request_count = 0
      end
      
      email_service.send_bulk(
        batch.map { |u| { to: u.email, template: 'password_reset', data: { token: u.reset_token } } }
      )
      
      sent_count += batch.size
      request_count += 1
      
      Rails.logger.info "Sent #{sent_count}/#{users.size} password reset emails"
    end
    
    sent_count
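  end

  private

  # EmailServiceClient is a placeholder for the application's email API client
  def email_service
    @email_service ||= EmailServiceClient.new
  end
end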

The service respects API rate limits while maximizing throughput. Each batch sends to 500 users. After 10 batches (5,000 emails), it pauses until the minute elapses before continuing.
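
The numbers in this scenario can be checked with a short calculation. `rate_limit_plan` is a hypothetical helper; it estimates the request count and the minimum number of pauses, ignoring the time the requests themselves take.

```ruby
# Estimates how many requests and rate-limit pauses a bulk send needs.
def rate_limit_plan(recipients:, per_request:, requests_per_minute:)
  requests = (recipients.to_f / per_request).ceil
  windows = (requests.to_f / requests_per_minute).ceil
  { requests: requests, windows: windows, pauses: [windows - 1, 0].max }
end

plan = rate_limit_plan(recipients: 20_000, per_request: 500, requests_per_minute: 10)
puts "#{plan[:requests]} requests in #{plan[:windows]} one-minute windows (#{plan[:pauses]} pauses)"
# prints "40 requests in 4 one-minute windows (3 pauses)"
```

With three pauses, sending to 20,000 users takes at least about three minutes regardless of how fast each request completes.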

Bulk Data Transformation with Parallel Processing

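A reporting system transforms 100,000 raw log entries into aggregated metrics. Sequential processing takes 20 minutes. Splitting the work into chunks handled by a thread pool can shorten this, with the gain bounded by the number of workers and by how much of the work can actually run concurrently: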

require 'concurrent' # provided by the concurrent-ruby gem

class LogAggregator
  def aggregate_logs(log_entries)
    chunk_size = 5000
    thread_pool = Concurrent::FixedThreadPool.new(4)
    
    chunks = log_entries.each_slice(chunk_size).to_a
    promises = chunks.map do |chunk|
      Concurrent::Promise.execute(executor: thread_pool) do
        process_chunk(chunk)
      end
    end
    
    aggregated_results = promises.flat_map(&:value)
    thread_pool.shutdown
    
    aggregated_results
  end
  
  private
  
  def process_chunk(chunk)
    chunk.group_by { |entry| entry[:user_id] }.map do |user_id, entries|
      {
        user_id: user_id,
        total_requests: entries.size,
        total_duration: entries.sum { |e| e[:duration] },
        error_count: entries.count { |e| e[:status] >= 400 }
      }
    end
  end
end

aggregator = LogAggregator.new
metrics = aggregator.aggregate_logs(LogEntry.all)
Metric.insert_all(metrics)

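The aggregator divides 100,000 entries into 20 chunks of 5,000 records, and four worker threads take chunks from the pool. Each chunk is aggregated independently, and the per-chunk results are concatenated into a single array. A user whose entries span several chunks therefore appears once per chunk, so the partial rows must be merged by user_id before insertion if one row per user is required. Because the aggregation is CPU-bound Ruby code, MRI's global VM lock limits how much the threads can overlap; larger gains need I/O-bound work or a runtime with parallel threads such as JRuby.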
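
Chunk-level aggregation produces one partial row per user per chunk, so a final merge step is needed when a user's entries are spread across chunks. A minimal sketch of that merge, assuming the same hash shape that process_chunk returns (`merge_partials` itself is a hypothetical helper):

```ruby
# Combines per-chunk partial aggregates into one row per user.
def merge_partials(partials)
  partials.group_by { |row| row[:user_id] }.map do |user_id, rows|
    {
      user_id: user_id,
      total_requests: rows.sum { |r| r[:total_requests] },
      total_duration: rows.sum { |r| r[:total_duration] },
      error_count: rows.sum { |r| r[:error_count] }
    }
  end
end

partials = [
  { user_id: 1, total_requests: 3, total_duration: 120, error_count: 1 }, # from chunk 1
  { user_id: 2, total_requests: 1, total_duration: 40, error_count: 0 },  # from chunk 1
  { user_id: 1, total_requests: 2, total_duration: 60, error_count: 0 }   # from chunk 2
]

merged = merge_partials(partials)
p merged.size                                           # => 2
p merged.find { |r| r[:user_id] == 1 }[:total_requests] # => 5
```

Sums and counts merge cleanly this way; aggregates such as averages would need to carry their numerator and denominator through the partial rows.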

Common Patterns

Fixed-Size Batching processes a specific number of operations per batch regardless of data characteristics:

def process_in_batches(items, batch_size)
  items.each_slice(batch_size) do |batch|
    process_batch(batch)
  end
end

process_in_batches(users, 1000)

This pattern works well when operation cost is uniform across items. Each batch takes roughly the same time to process, making progress predictable. The pattern fails when item size varies significantly—batching 1,000 small records versus 1,000 large records produces different memory and processing characteristics.

Size-Based Batching accumulates items until total size reaches a threshold:

def batch_by_size(items, max_size_bytes)
  batches = []
  current_batch = []
  current_size = 0
  
  items.each do |item|
    item_size = item.to_json.bytesize
    
    if current_size + item_size > max_size_bytes && current_batch.any?
      batches << current_batch
      current_batch = []
      current_size = 0
    end
    
    current_batch << item
    current_size += item_size
  end
  
  batches << current_batch if current_batch.any?
  batches
end

batches = batch_by_size(documents, 1_048_576) # 1MB batches

API endpoints often enforce payload size limits. HTTP requests larger than 1MB might be rejected. Size-based batching ensures each batch stays under the limit regardless of record count. A batch might contain 2,000 small records or 50 large records.

Time-Window Batching accumulates operations over a fixed time period:

class TimeWindowBatcher
  def initialize(window_seconds, &processor)
    @window_seconds = window_seconds
    @processor = processor
    @buffer = []
    @mutex = Mutex.new
    @last_flush = Time.now
    
    start_flush_thread
  end
  
  def add(item)
    @mutex.synchronize do
      @buffer << item
    end
  end
  
  private
  
  def start_flush_thread
    Thread.new do
      loop do
        sleep @window_seconds
        flush
      end
    end
  end
  
  def flush
    items = @mutex.synchronize do
      batch = @buffer.dup
      @buffer.clear
      batch
    end
    
    @processor.call(items) if items.any?
  end
end

batcher = TimeWindowBatcher.new(5) do |items|
  Database.insert_all(items)
end

# Items accumulate for 5 seconds, then flush
batcher.add(record1)
batcher.add(record2)
# ... 5 seconds elapse ...
# Processor receives all accumulated items

This pattern suits systems with variable load. During high traffic, batches accumulate many items and flush every 5 seconds. During low traffic, batches contain few items but still flush regularly, preventing indefinite delays.
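
A background flush thread raises a shutdown question: items added since the last flush are lost if the process exits before the next window. The following sketch shows one way to add an explicit stop step. `StoppableBatcher` is a hypothetical variant written for illustration, using a ConditionVariable so that stopping does not have to wait out the current window.

```ruby
# Time-window batcher with an explicit shutdown step that drains the buffer.
class StoppableBatcher
  def initialize(window_seconds, &processor)
    @window_seconds = window_seconds
    @processor = processor
    @buffer = []
    @stopped = false
    @mutex = Mutex.new
    @signal = ConditionVariable.new
    @thread = Thread.new { run }
  end

  def add(item)
    @mutex.synchronize { @buffer << item }
  end

  # Wakes the flush thread, lets it drain the buffer, and waits for it to exit.
  def stop
    @mutex.synchronize do
      @stopped = true
      @signal.signal
    end
    @thread.join
  end

  private

  def run
    loop do
      stopped = @mutex.synchronize do
        # Sleep for one window, or until stop signals the condition variable.
        @signal.wait(@mutex, @window_seconds) unless @stopped
        @stopped
      end
      flush
      break if stopped
    end
  end

  # Swaps the buffer under the lock, then processes outside it so add never blocks on I/O.
  def flush
    items = @mutex.synchronize do
      batch = @buffer
      @buffer = []
      batch
    end
    @processor.call(items) if items.any?
  end
end

received = []
batcher = StoppableBatcher.new(60) { |items| received.concat(items) }
batcher.add(:a)
batcher.add(:b)
batcher.stop # returns after the final flush, without waiting 60 seconds
p received # => [:a, :b]
```

Items added after `stop` returns are not processed in this sketch; a fuller version might reject them or flush once more.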

Staged Pipeline Batching chains multiple bulk operations:

class BulkPipeline
  def initialize
    @stages = []
  end
  
  def add_stage(&block)
    @stages << block
    self
  end
  
  def process(items, batch_size)
    items.each_slice(batch_size) do |batch|
      @stages.reduce(batch) do |data, stage|
        stage.call(data)
      end
    end
  end
end

pipeline = BulkPipeline.new
  .add_stage { |batch| batch.map { |item| transform(item) } }
  .add_stage { |batch| batch.select { |item| valid?(item) } }
  .add_stage { |batch| Database.insert_all(batch); batch }

pipeline.process(raw_data, 500)

Each stage transforms the batch and passes results to the next stage. The pattern maintains batching benefits throughout the pipeline. A batch of 500 records flows through transformation, validation, and persistence without breaking into individual operations.

Parallel Batch Processing distributes batches across multiple workers:

require 'concurrent' # provided by the concurrent-ruby gem

def parallel_batch_process(items, batch_size, thread_count)
  thread_pool = Concurrent::FixedThreadPool.new(thread_count)
  futures = []
  
  items.each_slice(batch_size) do |batch|
    futures << Concurrent::Future.execute(executor: thread_pool) do
      process_batch(batch)
    end
  end
  
  results = futures.map(&:value)
  thread_pool.shutdown
  
  results
end

results = parallel_batch_process(records, 1000, 4)

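Four worker threads pull batches from the pool's queue. A dataset of 10,000 records splits into ten batches of 1,000; if batches take similar time, they complete in roughly three waves (four, four, then two), so the best case is about three tenths of the sequential time rather than exactly one quarter. That best case assumes the work can actually overlap: on MRI the global VM lock lets only one thread execute Ruby code at a time, so threads help mainly when batches spend their time waiting on I/O such as database or HTTP calls.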

Error Handling & Edge Cases

Partial Batch Failures occur when some operations in a batch succeed while others fail. The system must identify which operations failed and decide how to proceed:

def bulk_insert_with_fallback(records)
  begin
    # insert_all! raises on a unique violation; plain insert_all would skip duplicates silently
    Model.insert_all!(records)
  rescue ActiveRecord::RecordNotUnique => e
    # Bulk insert failed, try inserting individually to identify problematic records
    failed_records = []
    
    records.each do |record|
      begin
        Model.create!(record)
      rescue ActiveRecord::RecordNotUnique
        failed_records << record
      end
    end
    
    Rails.logger.warn "#{failed_records.size} records failed during bulk insert"
    raise unless failed_records.empty?
  end
end

When a bulk insert fails due to a constraint violation, the code falls back to individual inserts. This identifies exactly which records caused failures. The approach trades performance for precision—the fallback is slower but provides detailed error information.

Transaction Rollback with Partial Progress

Database transactions ensure atomicity but create complexity when bulk operations span multiple transactions:

def bulk_process_with_checkpoints(items, batch_size)
  progress = { processed: 0, failed: 0, last_successful_id: nil }
  
  items.each_slice(batch_size) do |batch|
    begin
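      ActiveRecord::Base.transaction do
        batch.each { |item| process_item(item) }
      end
      # Record progress only after the transaction commits, so a rolled-back
      # batch never advances the checkpoint
      progress[:processed] += batch.size
      progress[:last_successful_id] = batch.last.id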
    rescue => e
      progress[:failed] += batch.size
      Rails.logger.error "Batch failed: #{e.message}. Last successful ID: #{progress[:last_successful_id]}"
      # Continue with next batch or abort based on requirements
      raise if abort_on_error?
    end
  end
  
  progress
end

Each batch runs in its own transaction. If a batch fails, previous batches remain committed. The system tracks the last successfully processed ID, enabling resume from the failure point on retry rather than reprocessing everything.

Memory Exhaustion Prevention

Loading excessive data into memory causes out-of-memory errors:

# Dangerous - loads entire table into memory
all_records = User.all.to_a
process_bulk(all_records)

# Safe - streams records in batches
User.find_each(batch_size: 1000) do |user|
  # Process individual user
end

# Safe - accumulates batch, processes, clears
batch = []
User.find_each(batch_size: 1000) do |user|
  batch << user
  
  if batch.size >= 100
    process_bulk(batch)
    batch.clear
  end
end
process_bulk(batch) if batch.any?

The streaming approach loads 1,000 records at a time from the database, processes them, and loads the next batch. Memory usage stays constant regardless of table size. The batch accumulation variant maintains a 100-record working set while streaming through the full dataset.

Deadlock Resolution

Concurrent bulk operations can create database deadlocks when they modify overlapping record sets:

def bulk_update_with_retry(records, max_retries = 3)
  attempt = 0
  
  begin
    ActiveRecord::Base.transaction do
      records.each_slice(100) do |batch|
        Model.where(id: batch.map(&:id)).update_all(status: 'processed')
      end
    end
  rescue ActiveRecord::Deadlocked => e
    attempt += 1
    if attempt < max_retries
      sleep(Random.rand(0.1..0.5)) # Random backoff
      retry
    else
      raise
    end
  end
end

When a deadlock occurs, the code waits a random interval then retries. Random backoff prevents two competing operations from repeatedly deadlocking in lockstep. After three attempts, the error propagates for investigation.
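
The retry logic can be pulled into a reusable helper. The sketch below swaps the fixed random interval for exponential backoff with jitter, a common variation; `with_retries`, its parameters, and `FakeDeadlock` are hypothetical names, and the sleeper is injectable so the example runs without real delays.

```ruby
# Retries the block on error_class, doubling the base delay each time and adding jitter.
def with_retries(error_class, max_attempts: 3, base_delay: 0.1, sleeper: ->(s) { sleep(s) })
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue error_class
    raise if attempt >= max_attempts

    delay = base_delay * (2**(attempt - 1)) # 0.1, 0.2, 0.4, ...
    sleeper.call(delay + rand * delay)      # add up to 100% random jitter
    retry
  end
end

# Stand-in for ActiveRecord::Deadlocked so the sketch runs without Rails.
class FakeDeadlock < StandardError; end

calls = 0
result = with_retries(FakeDeadlock, sleeper: ->(_s) {}) do |attempt|
  calls += 1
  raise FakeDeadlock if attempt < 3

  :committed
end
p [result, calls] # => [:committed, 3]
```

In a Rails application the block would wrap the transaction and `error_class` would be ActiveRecord::Deadlocked.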

Timeout Handling

Long-running bulk operations risk exceeding timeout limits:

require 'timeout'

def bulk_process_with_timeout(items, batch_size, timeout_seconds)
  start_time = Time.now
  processed = []
  
  items.each_slice(batch_size) do |batch|
    elapsed = Time.now - start_time
    remaining = timeout_seconds - elapsed
    
    if remaining <= 0
      Rails.logger.warn "Timeout exceeded after processing #{processed.size}/#{items.size} items"
      break
    end
    
    begin
      Timeout.timeout(remaining) do
        process_batch(batch)
        processed.concat(batch)
      end
    rescue Timeout::Error
      Rails.logger.error "Batch timed out with #{remaining}s remaining"
      break
    end
  end
  
  processed
end

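The code checks elapsed time before each batch and stops once the overall budget is used up. Each batch runs under Timeout.timeout with the remaining time as its limit, so a slow batch is interrupted at the overall deadline rather than running past it. Timeout.timeout can interrupt code at any point, so process_batch should leave data consistent if it is cut short, for example by running inside a transaction. The function returns the successfully processed items, allowing partial progress to be recorded.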
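
A gentler alternative is to check a deadline only between batches and never interrupt a running one. The sketch below also measures time on the monotonic clock, which is unaffected by system clock adjustments, unlike Time.now. `process_until_deadline` is a hypothetical helper; the trade-off is that the last batch may overrun the budget.

```ruby
# Processes batches until the time budget is spent; returns the number of items processed.
def process_until_deadline(items, batch_size, budget_seconds)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + budget_seconds
  processed = 0

  items.each_slice(batch_size) do |batch|
    # Checked between batches only: a batch that has started always finishes.
    break if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    yield batch
    processed += batch.size
  end

  processed
end

done = process_until_deadline((1..10).to_a, 3, 60) { |batch| batch.sum }
p done # => 10
```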

Invalid Data Handling

Bulk operations with invalid data need validation strategies:

def bulk_insert_with_validation(records)
  valid_records = []
  invalid_records = []
  
  records.each do |record|
    model = Model.new(record)
    if model.valid?
      valid_records << record
    else
      invalid_records << { record: record, errors: model.errors.full_messages }
    end
  end
  
  Model.insert_all(valid_records) if valid_records.any?
  
  {
    inserted: valid_records.size,
    invalid: invalid_records
  }
end

Pre-validation separates valid records from invalid ones. Valid records proceed to bulk insert. Invalid records collect with error messages for logging or retry after correction. This prevents a single invalid record from blocking an entire batch.
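
One gap in per-record validation is worth illustrating: a uniqueness check against the database does not see other unsaved records in the same batch, so two new rows with the same key can both pass. A minimal sketch of an in-batch duplicate check, using a hypothetical helper and plain hashes:

```ruby
# Splits records into first occurrences and in-batch duplicates of the given key.
def split_in_batch_duplicates(records, key)
  seen = {}
  records.partition do |record|
    value = record[key]
    first_occurrence = !seen.key?(value)
    seen[value] = true
    first_occurrence
  end
end

records = [
  { email: 'a@example.com' },
  { email: 'b@example.com' },
  { email: 'a@example.com' }
]

unique, duplicates = split_in_batch_duplicates(records, :email)
p unique.size     # => 2
p duplicates.size # => 1
```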

Performance Considerations

Database Round-Trip Reduction

Individual operations incur network latency for each database call. Bulk operations amortize this cost across multiple records:

require 'benchmark'

# Individual inserts
time_individual = Benchmark.realtime do
  1000.times do |i|
    User.create(name: "User #{i}", email: "user#{i}@example.com")
  end
end
# => ~8.5 seconds (1000 network round trips)

# Bulk insert
records = 1000.times.map do |i|
  { name: "User #{i}", email: "user#{i}@example.com", created_at: Time.current }
end

time_bulk = Benchmark.realtime do
  User.insert_all(records)
end
# => ~0.15 seconds (1 network round trip)

puts "Individual: #{time_individual}s"
puts "Bulk: #{time_bulk}s"
puts "Speedup: #{(time_individual / time_bulk).round(1)}x"
# Individual: 8.5s
# Bulk: 0.15s
# Speedup: 56.7x

Each individual insert requires a network packet to the database and a response packet back. At 8ms per round trip, 1,000 inserts take 8 seconds just for network overhead. Bulk inserts batch the data into a single request, eliminating 999 round trips.
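
The arithmetic generalizes to any batch size. The helper below is a hypothetical back-of-the-envelope model that counts only network round trips and ignores database execution time.

```ruby
# Estimated network overhead in seconds for inserting records at a given batch size.
def round_trip_overhead(records:, batch_size:, round_trip_ms:)
  round_trips = (records.to_f / batch_size).ceil
  round_trips * round_trip_ms / 1000.0
end

[1, 100, 1_000].each do |size|
  seconds = round_trip_overhead(records: 1_000, batch_size: size, round_trip_ms: 8)
  puts "batch size #{size}: #{seconds}s"
end
# prints:
# batch size 1: 8.0s
# batch size 100: 0.08s
# batch size 1000: 0.008s
```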

Batch Size Optimization

Optimal batch size balances memory usage, network efficiency, and database load:

require 'benchmark'

def benchmark_batch_sizes(record_count)
  records = record_count.times.map { |i| { name: "User #{i}" } }
  batch_sizes = [100, 500, 1000, 2000, 5000]
  
  results = batch_sizes.map do |size|
    time = Benchmark.realtime do
      records.each_slice(size) do |batch|
        Model.insert_all(batch)
      end
    end
    
    [size, time, record_count / time]
  end
  
  results.each do |size, time, throughput|
    puts "Batch size #{size}: #{time.round(2)}s (#{throughput.round(0)} records/sec)"
  end
end

benchmark_batch_sizes(10_000)
# Batch size 100: 2.45s (4082 records/sec)
# Batch size 500: 0.89s (11236 records/sec)
# Batch size 1000: 0.52s (19231 records/sec)
# Batch size 2000: 0.48s (20833 records/sec)
# Batch size 5000: 0.51s (19608 records/sec)

Performance improves significantly from batch size 100 to 1,000 as network round trips decrease from 100 to 10. Beyond 1,000, gains diminish as database parsing and transaction overhead become the bottleneck. At 5,000, performance slightly degrades due to increased memory usage and transaction lock duration.

Index Considerations

Indexes slow down bulk inserts as the database updates each index for every inserted row:

class AddIndexManagement
  def bulk_load_with_index_management(records)
    # Disable non-unique index maintenance before the bulk insert.
    # DISABLE KEYS only takes effect on MyISAM tables; InnoDB ignores it.
    ActiveRecord::Base.connection.execute(
      "ALTER TABLE users DISABLE KEYS"
    ) if mysql?
    
    records.each_slice(5000) do |batch|
      User.insert_all(batch)
    end
    
    # Re-enable and rebuild indexes
    ActiveRecord::Base.connection.execute(
      "ALTER TABLE users ENABLE KEYS"
    ) if mysql?
  end
  
  private
  
  def mysql?
    ActiveRecord::Base.connection.adapter_name.downcase.include?('mysql')
  end
end

For PostgreSQL, dropping indexes before bulk loading and recreating them afterward can be faster for large datasets:

def bulk_load_postgres(records)
  # get_indexes and recreate_indexes are application-defined helpers, not Rails methods
  indexes = get_indexes('users')
  
  # Drop indexes
  indexes.each { |idx| ActiveRecord::Base.connection.execute("DROP INDEX #{idx}") }
  
  # Bulk insert
  records.each_slice(5000) { |batch| User.insert_all(batch) }
  
  # Recreate indexes
  recreate_indexes('users', indexes)
end

This optimization makes sense when loading millions of records. For thousands of records, the overhead of dropping and recreating indexes exceeds the time saved during insertion.

Connection Pool Saturation

Concurrent bulk operations can exhaust database connection pools:

# config/database.yml
production:
  pool: 25  # Maximum concurrent connections
  
# Bulk processing with connection management
def parallel_bulk_process(items, batch_size, max_workers)
  # Limit workers to connection pool size minus buffer for web requests,
  # but always keep at least one worker
  pool_size = ActiveRecord::Base.connection_pool.size
  effective_workers = [[max_workers, pool_size - 5].min, 1].max

  thread_pool = Concurrent::FixedThreadPool.new(effective_workers)

  # Schedule every batch first; calling .value inside the loop would
  # block on each batch in turn and serialize the work
  promises = items.each_slice(batch_size).map do |batch|
    Concurrent::Promise.execute(executor: thread_pool) do
      ActiveRecord::Base.connection_pool.with_connection do
        process_batch(batch)
      end
    end
  end

  results = promises.map(&:value)
  thread_pool.shutdown
  results
end

Each worker thread needs a database connection. Spawning 50 worker threads with a pool size of 25 causes 25 threads to block waiting for available connections. Limiting workers to pool size minus a buffer for web requests prevents connection starvation.

Query Plan Impact

Bulk operations can trigger different query plans than single-row operations:

-- Single row operation uses index
UPDATE users SET status = 'active' WHERE id = 123;
-- Uses: INDEX SCAN on users_pkey

-- Bulk operation may use sequential scan
UPDATE users SET status = 'active' WHERE id IN (1,2,3,...10000);
-- Uses: SEQUENTIAL SCAN on users (depending on optimizer)

When updating thousands of rows, the database optimizer might choose a sequential table scan over an index scan. This is faster for large result sets but slower for small ones. Batching bulk updates into smaller groups can encourage index usage:

def bulk_update_optimized(ids, updates)
  ids.each_slice(500) do |batch|
    Model.where(id: batch).update_all(updates)
  end
end

Reference

Bulk Operation Methods

| Method | Purpose | Returns |
| --- | --- | --- |
| insert_all | Insert multiple records | ActiveRecord::Result |
| upsert_all | Insert or update records | ActiveRecord::Result |
| update_all | Update matching records | Integer count of affected rows |
| delete_all | Delete matching records | Integer count of deleted rows |
| find_each | Iterate records in batches | Yields each record |
| find_in_batches | Iterate batches of records | Yields each batch as an array |

Configuration Parameters

| Parameter | Description | Typical Range |
| --- | --- | --- |
| batch_size | Records per batch | 100-5000 |
| thread_pool_size | Concurrent workers | 2-10 |
| timeout_seconds | Maximum operation duration | 30-300 |
| max_retries | Retry attempts on failure | 3-5 |
| backoff_seconds | Delay between retries | 0.1-5.0 |

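insert_all and upsert_all Options

| Option | Values | Effect |
| --- | --- | --- |
| unique_by | Column name(s) or index name | Conflict detection key (PostgreSQL and SQLite) |
| on_duplicate | :update or custom SQL via Arel.sql | Update behavior on conflict (upsert_all only) |
| returning | Column name(s) | Columns to return after insert (adapter support varies) |
| update_only | Column name(s) | Columns to update on conflict (upsert_all only) |
| record_timestamps | true, false | Set created_at/updated_at |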

Batch Processing Patterns

| Pattern | Use Case | Trade-off |
| --- | --- | --- |
| Fixed-size batching | Uniform records | Simple but may exceed memory or size limits |
| Size-based batching | Variable record sizes | Complex but respects size constraints |
| Time-window batching | Streaming data | Ensures timely processing but may batch varying counts |
| Parallel batching | CPU-intensive operations | Maximum throughput but higher resource usage |
| Staged pipeline | Multi-step processing | Maintains batching efficiency but more complex |

Error Handling Strategies

| Strategy | Behavior | Best For |
| --- | --- | --- |
| Fail-fast | Abort on first error | Critical operations requiring atomicity |
| Fail-complete | Process all, collect errors | Non-critical bulk operations |
| Retry with fallback | Retry failed items individually | Operations with occasional failures |
| Checkpoint and resume | Save progress, resume on failure | Long-running batch jobs |
| Dead letter queue | Move failed items for later processing | Asynchronous bulk processing |

Performance Characteristics

| Operation | Time per individual operation | Bulk time for 1,000 records | Speedup |
| --- | --- | --- | --- |
| Database insert | 8ms | 150ms | 53x |
| API call | 100ms | 2000ms (10 batches) | 50x |
| File write | 5ms | 80ms | 63x |
| Redis SET | 1ms | 20ms | 50x |
| Email send | 200ms | 5000ms (2 batches) | 40x |

Optimal Batch Sizes by Operation

| Operation Type | Recommended Size | Reason |
| --- | --- | --- |
| Database insert | 500-2000 | Balance memory and transaction duration |
| Database update | 100-500 | Prevent lock contention |
| API bulk endpoint | Per API limit | Respect service constraints |
| File processing | 1000-5000 | Amortize I/O overhead |
| Message queue publish | 100-1000 | Balance latency and throughput |
| Memory transformation | Based on available RAM | Prevent out-of-memory errors |

Common Bulk Operation Errors

| Error | Cause | Solution |
| --- | --- | --- |
| ActiveRecord::RecordNotUnique | Duplicate key violation | Use upsert_all or handle individually |
| ActiveRecord::Deadlocked | Concurrent updates lock conflict | Retry with random backoff |
| PG::QueryCanceled | Statement timeout exceeded | Reduce batch size or increase timeout |
| NoMemoryError | Batch too large for available RAM | Decrease batch size or stream records |
| PG::ConnectionBad | Connection lost during operation | Implement retry with reconnection |
| ActiveRecord::StatementInvalid | SQL syntax or constraint error | Validate data before bulk operation |