Overview
Batch processing executes a series of jobs on a computer without manual intervention, processing accumulated data in groups rather than individually as transactions occur. The system collects data over a period, then processes the entire batch during scheduled windows, typically during off-peak hours to minimize impact on interactive systems.
This processing model contrasts with real-time or stream processing where each transaction receives immediate handling. Batch processing trades immediacy for efficiency, processing thousands or millions of records in a single operation with optimized resource utilization. Financial institutions use batch processing for end-of-day transaction reconciliation, e-commerce platforms for order fulfillment, and data warehouses for ETL operations.
The batch processing pattern emerged from early computing constraints when interactive processing proved impractical for large datasets. Modern batch systems inherit these core principles while adding distributed computing capabilities, fault tolerance, and integration with real-time systems. Organizations continue adopting batch processing for operations where data freshness requirements permit delays measured in minutes or hours rather than milliseconds.
# Simple batch processing example
class OrderProcessor
  def process_batch(orders)
    orders.each_slice(100) do |batch|
      # Each group of 100 orders commits or rolls back together
      ActiveRecord::Base.transaction do
        batch.each { |order| process_order(order) }
      end
    end
  end
end
Batch processing systems typically separate into three phases: data collection, processing, and output generation. The collection phase accumulates input records from various sources into staging areas. Processing applies business logic, transformations, and computations to the collected data. Output generation writes results to databases, files, or downstream systems.
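The three phases can be sketched in plain Ruby. This is a minimal illustration under stated assumptions: in-memory arrays of hashes stand in for real input sources, and an array of CSV-style strings stands in for the output sink. The ThreePhaseBatch class and the account/amount record shape are hypothetical.

```ruby
# Hypothetical three-phase batch: collect, process, output
class ThreePhaseBatch
  def initialize(sources)
    @sources = sources # each source is any Enumerable of record hashes
  end

  def run
    staged = collect
    results = process(staged)
    output(results)
  end

  private

  # Collection: gather records from every source into one staging array
  def collect
    @sources.flat_map(&:to_a)
  end

  # Processing: apply business logic to the whole staged set at once
  def process(staged)
    staged.group_by { |record| record[:account] }
          .transform_values { |rows| rows.sum { |row| row[:amount] } }
  end

  # Output: render results in a form a downstream system can consume
  def output(totals)
    totals.sort.map { |account, total| "#{account},#{total}" }
  end
end

sources = [
  [{ account: 'A', amount: 10 }, { account: 'B', amount: 5 }],
  [{ account: 'A', amount: 7 }]
]
puts ThreePhaseBatch.new(sources).run
# A,17
# B,5
```

Keeping each phase in its own method lets you swap the collection source or the output target without touching the business logic.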
Key Principles
Batch processing operates on several fundamental principles that distinguish it from other processing paradigms. The principle of deferred execution postpones processing until a complete dataset accumulates, allowing the system to optimize operations across the entire batch rather than optimizing individual transactions. This deferral enables strategies like sorted processing, bulk operations, and resource pooling impossible with per-transaction processing.
Atomicity at the batch level ensures that processing either completes successfully for all records or fails without partial updates. Unlike interactive systems where atomicity applies to individual transactions, batch systems treat the entire batch as a single unit of work. Failed batches trigger rollback mechanisms that restore the system to its pre-batch state or mark failed records for reprocessing.
class PayrollBatch
  def process
    ActiveRecord::Base.transaction do
      employees.each do |employee|
        calculate_pay(employee)
        generate_payslip(employee)
        record_payment(employee)
      end
      # Database writes commit together or roll back together; side effects
      # outside the database (emails, files) are not undone by a rollback
    end
  rescue => error
    log_failure(error)
    notify_administrators
    raise
  end
end
Resource optimization concentrates system resources on batch processing during scheduled windows. The system loads data into memory, establishes database connections, and initializes processing contexts once per batch rather than per record. This amortizes setup costs across thousands of operations, dramatically reducing per-record overhead compared to interactive processing.
The principle of idempotency allows batch jobs to execute multiple times with identical results, critical for recovery from failures. Idempotent batch operations detect previously processed records and skip reprocessing, or they structure operations such that repeated execution produces the same outcome. Systems achieve idempotency through unique identifiers, timestamps, or status tracking.
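Here is a minimal sketch of idempotency through status tracking. It assumes each input record carries a unique id. The IdempotentLedger class is hypothetical. A production job would persist the set of applied ids in the same transaction as the balances rather than keep it in memory.

```ruby
require 'set'

# Hypothetical ledger that applies each payment at most once, keyed by id
class IdempotentLedger
  attr_reader :balances

  def initialize
    @balances = Hash.new(0)
    @applied_ids = Set.new # status tracking: ids already processed
  end

  def apply_batch(payments)
    payments.each do |payment|
      # Skip records that were already applied on an earlier run
      next if @applied_ids.include?(payment[:id])

      @balances[payment[:account]] += payment[:amount]
      @applied_ids << payment[:id]
    end
    self
  end
end

payments = [
  { id: 'p1', account: 'A', amount: 100 },
  { id: 'p2', account: 'A', amount: 50 }
]
ledger = IdempotentLedger.new
ledger.apply_batch(payments)
ledger.apply_batch(payments) # a rerun after a failure changes nothing
ledger.balances['A']         # stays 150, not 300
```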
Checkpointing preserves progress at intervals during batch execution. When processing millions of records, checkpoint mechanisms record completed work, enabling restart from the last checkpoint rather than reprocessing the entire batch after failures. Checkpoints balance recovery granularity against checkpoint overhead.
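class LargeBatchProcessor
  CHECKPOINT_INTERVAL = 1000

  # find_in_batches walks the relation in primary-key order, so the last id
  # of each chunk marks everything before it as done
  def process_with_checkpoints(records)
    records.find_in_batches(batch_size: CHECKPOINT_INTERVAL) do |chunk|
      process_chunk(chunk)
      save_checkpoint(chunk.last.id)
    end
  end

  # After a failure, process only the records after the last saved id
  def resume_from_checkpoint(records)
    last_processed_id = load_checkpoint || 0
    process_with_checkpoints(records.where('id > ?', last_processed_id))
  end
end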
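The restart behavior can be simulated without a database. This sketch rests on several assumptions. MemoryCheckpointStore and ChunkedJob are hypothetical names. The checkpoint lives in memory rather than in durable storage. A crash is simulated by raising on a chosen id.

```ruby
# Hypothetical checkpoint store; a real job would persist this in a table or file
class MemoryCheckpointStore
  def initialize
    @last_id = nil
  end

  def load
    @last_id
  end

  def save(id)
    @last_id = id
  end
end

class ChunkedJob
  CHUNK_SIZE = 3

  attr_reader :processed

  def initialize(store, fail_on: nil)
    @store = store     # responds to #load and #save(id)
    @fail_on = fail_on # id that simulates a crash, for demonstration only
    @processed = []
  end

  def run(records)
    last_id = @store.load || 0
    # Skip everything up to and including the last checkpointed id
    pending = records.sort_by { |r| r[:id] }.select { |r| r[:id] > last_id }
    pending.each_slice(CHUNK_SIZE) do |chunk|
      chunk.each do |record|
        raise "simulated crash at #{record[:id]}" if record[:id] == @fail_on
        @processed << record[:id]
      end
      @store.save(chunk.last[:id]) # checkpoint only after the whole chunk
    end
  end
end

records = (1..8).map { |i| { id: i } }
store = MemoryCheckpointStore.new

first = ChunkedJob.new(store, fail_on: 5)
begin
  first.run(records)
rescue RuntimeError => e
  puts e.message # simulated crash at 5
end

second = ChunkedJob.new(store)
second.run(records) # resumes after id 3, the last saved checkpoint
```

Record 4 runs twice because its chunk failed before the checkpoint was saved. For that reason, checkpointed batch steps should also be idempotent.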
Error isolation contains failures to individual records or sub-batches without aborting the entire batch. Systems log failed records to error queues for manual review or automated retry while allowing successfully processed records to commit. This principle prevents single malformed records from blocking processing of millions of valid records.
The scheduling principle executes batch jobs during predetermined windows based on data availability, resource constraints, and business requirements. Schedulers coordinate dependencies between batches, ensuring upstream jobs complete before downstream consumers execute. Schedule-driven execution provides predictability for capacity planning and SLA management.
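Ruby's tsort library performs a topological sort, which is enough to sketch dependency-aware ordering. The job names and the JobGraph wrapper below are hypothetical. A real scheduler would also track completion status and run independent jobs concurrently.

```ruby
require 'tsort'

# Hypothetical job graph: each job lists the upstream jobs it depends on
class JobGraph
  include TSort

  def initialize(dependencies)
    @dependencies = dependencies
  end

  # TSort calls these two methods to walk the graph
  def tsort_each_node(&block)
    @dependencies.each_key(&block)
  end

  def tsort_each_child(job, &block)
    @dependencies.fetch(job, []).each(&block)
  end
end

graph = JobGraph.new({
  'extract_orders' => [],
  'extract_customers' => [],
  'build_facts' => %w[extract_orders extract_customers],
  'refresh_reports' => ['build_facts']
})

# Upstream jobs come before the jobs that depend on them; a dependency
# cycle raises TSort::Cyclic instead of producing an order
run_order = graph.tsort
# => ["extract_orders", "extract_customers", "build_facts", "refresh_reports"]
```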
Ruby Implementation
Ruby provides multiple approaches for implementing batch processing, from simple iteration patterns to sophisticated frameworks. The language's blocks and enumerable methods offer natural batch processing constructs, while gems extend capabilities for distributed processing and job scheduling.
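The Enumerable#each_slice method partitions any in-memory collection into fixed-size batches and underpins memory-efficient processing of large datasets. For database-backed collections, ActiveRecord's find_each applies the same idea at the query level: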
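class UserEmailBatch
  BATCH_SIZE = 500

  def send_newsletter
    # Loads users 500 rows at a time instead of all at once
    User.find_each(batch_size: BATCH_SIZE) do |user|
      # newsletter is a hypothetical mailer action; deliver_later enqueues
      # the email through ActiveJob instead of sending it inline
      NewsletterMailer.newsletter(user).deliver_later
    end
  end
end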
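When the collection is already in memory, each_slice alone provides the batching. Here is a small sketch with hypothetical addresses:

```ruby
# Hypothetical addresses standing in for a subscriber list held in memory
addresses = (1..1_050).map { |i| "user#{i}@example.com" }

batch_sizes = []
addresses.each_slice(500) do |batch|
  # A real job would hand each batch to a mailer or bulk API here
  batch_sizes << batch.size
end

batch_sizes # => [500, 500, 50]
```

The last slice is simply shorter, so no special handling is needed for a remainder.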
ActiveRecord provides find_each and find_in_batches methods that retrieve records in batches, preventing memory exhaustion when processing millions of database rows. These methods handle pagination internally, yielding batches to the processing block:
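class ReportGenerator
  def generate_monthly_report
    total_revenue = 0
    record_count = 0

    # Accumulate running totals one batch at a time; per-batch averages
    # would not combine into a correct report-wide figure
    Report.find_in_batches(batch_size: 1000) do |batch|
      total_revenue += batch.sum(&:revenue)
      record_count += batch.size
    end

    persist_statistics(
      total_revenue: total_revenue,
      # fdiv avoids integer division; guard against an empty dataset
      average_value: record_count.zero? ? 0 : total_revenue.fdiv(record_count),
      processed_at: Time.current
    )
  end
end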
The Sidekiq gem implements background job processing with batch operations through its Pro version. Sidekiq batches track completion of related jobs, triggering callbacks when all jobs finish:
class OrderFulfillmentBatch
  def process_orders(order_ids)
    batch = Sidekiq::Batch.new
    batch.on(:success, self.class, 'batch_id' => batch.bid)
    batch.jobs do
      order_ids.each do |order_id|
        FulfillmentWorker.perform_async(order_id)
      end
    end
  end

  def on_success(status, options)
    NotificationService.notify_completion(options['batch_id'])
  end
end
Ruby's Parallel gem enables concurrent batch processing across multiple threads or processes:
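require 'parallel'
require 'mini_magick'

class ImageProcessor
  def process_images(image_paths)
    # Four forked worker processes each resize a share of the images
    Parallel.each(image_paths, in_processes: 4) do |path|
      image = MiniMagick::Image.open(path)
      image.resize '800x600'
      image.write output_path(path) # output_path is an application helper
    end
  end
end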
The concurrent-ruby gem provides thread-safe data structures and executors for building custom batch processors:
require 'concurrent'

class DataTransformer
  def transform_batch(records)
    pool = Concurrent::FixedThreadPool.new(5)
    # Promises API, the successor to Concurrent::Future
    futures = records.map do |record|
      Concurrent::Promises.future_on(pool, record) { |r| transform_record(r) }
    end
    # value! re-raises a failed transformation instead of returning nil
    futures.map(&:value!)
  ensure
    pool&.shutdown
    pool&.wait_for_termination
  end
end
ActiveJob provides a backend-agnostic interface for background jobs. A single job can stream a large dataset in batches, as in this CSV export:
require 'csv'

class DataExportJob < ApplicationJob
  queue_as :exports

  def perform(export_id)
    export = Export.find(export_id)

    CSV.open(export.file_path, 'wb') do |csv|
      # Load 5,000 rows at a time so memory stays flat regardless of export size
      export.scope.find_in_batches(batch_size: 5000) do |batch|
        batch.each { |record| csv << record.to_csv_row }
      end
    end

    export.mark_completed!
  end
end
Custom batch processors often implement retry logic with exponential backoff:
class ResilientBatchProcessor
  MAX_RETRIES = 3

  def process_with_retry(batch)
    retries = 0
    begin
      process_batch(batch)
    rescue StandardError => error
      retries += 1
      if retries <= MAX_RETRIES
        sleep(2 ** retries) # waits 2, 4, then 8 seconds
        retry
      else
        handle_failure(batch, error)
        raise
      end
    end
  end
end
Implementation Approaches
Batch processing implementations vary based on data volume, processing complexity, and latency requirements. Selection between approaches depends on factors including fault tolerance needs, resource constraints, and integration requirements.
Sequential processing executes batch operations in order on a single thread or process. This approach suits datasets fitting in memory with processing times under several minutes. Sequential processing simplifies error handling and maintains processing order:
class SequentialProcessor
  def process(records)
    results = []
    errors = []

    records.each do |record|
      begin
        results << process_record(record)
      rescue => error
        errors << { record: record, error: error }
      end
    end

    { results: results, errors: errors }
  end
end
Sequential processing avoids concurrency complications but scales poorly with increasing data volumes. Processing time grows linearly with record count, limiting throughput on large datasets.
Parallel processing distributes batch operations across multiple threads or processes, reducing total processing time through concurrent execution. This approach requires thread-safe operations and coordination mechanisms.
The implementation partitions the batch into chunks, assigns chunks to workers, and aggregates results. Parallelism introduces complexity around shared state, resource contention, and result ordering.
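The three steps (partition, assign, aggregate) can be sketched with plain Ruby threads and a work queue. This is an illustrative helper, not a library API; parallel_map_in_chunks is a hypothetical name. Under CRuby's GVL, threads like these speed up I/O-bound work such as API calls, while CPU-bound work needs processes.

```ruby
# Partition records into chunks, let worker threads pull chunks from a queue,
# then reassemble the results in the original order
def parallel_map_in_chunks(records, workers: 4, chunk_size: 100, &transform)
  # Pair each chunk with its position so results can be put back in order
  chunks = records.each_slice(chunk_size).each_with_index.to_a
  queue = Queue.new
  chunks.each { |pair| queue << pair }
  queue.close # pop returns nil once the closed queue is drained

  results = Array.new(chunks.size)
  lock = Mutex.new

  threads = Array.new(workers) do
    Thread.new do
      while (pair = queue.pop)
        chunk, index = pair
        mapped = chunk.map(&transform)
        lock.synchronize { results[index] = mapped }
      end
    end
  end

  threads.each(&:join) # re-raises an exception from any worker
  results.flatten(1)
end

squares = parallel_map_in_chunks((1..10).to_a, workers: 3, chunk_size: 4) { |n| n * n }
# => [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```

Because workers pull chunks from a shared queue instead of receiving fixed assignments, a thread that finishes early simply takes the next chunk.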
Staged processing breaks batch operations into sequential stages connected by queues. Each stage processes records independently, passing results to the next stage. This pipeline architecture enables specialized processing at each stage:
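class PipelineProcessor
  QUEUE_CAPACITY = 100

  def process
    # Bounded queues block a fast producer until the next stage catches up
    stage1_queue = SizedQueue.new(QUEUE_CAPACITY)
    stage2_queue = SizedQueue.new(QUEUE_CAPACITY)

    # Stage 1: Extract and validate
    extractor = Thread.new do
      extract_records.each { |record| stage1_queue << record }
    ensure
      stage1_queue.close # signal end of input, even after an error
    end

    # Stage 2: Transform
    transformer = Thread.new do
      # pop returns nil once the queue is closed and drained
      while (record = stage1_queue.pop)
        stage2_queue << transform(record)
      end
    ensure
      stage1_queue.close # unblock the extractor if this stage fails
      stage2_queue.close
    end

    # Stage 3: Load, on the calling thread
    while (record = stage2_queue.pop)
      load_to_database(record)
    end

    # Re-raise any exception from the worker threads
    transformer.join
    extractor.join
  end
end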
Staged processing enables independent scaling of each stage and natural fault isolation. When the queues between stages are bounded, a slow stage creates backpressure that keeps upstream stages from overwhelming downstream processors.
Distributed batch processing partitions work across multiple machines, processing data in parallel across a cluster. Frameworks like Hadoop or Spark implement distributed batch processing, though Ruby applications typically integrate with these systems rather than implementing distribution natively.
Micro-batching processes small batches continuously rather than accumulating large batches for periodic processing. This hybrid approach reduces latency while maintaining batch processing efficiency:
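class MicroBatchProcessor
  BATCH_SIZE = 100
  BATCH_TIMEOUT = 5.seconds

  def start
    @queue = []
    @last_flush = Time.current

    loop do
      # fetch_next_record is a hypothetical source that returns nil on timeout
      record = fetch_next_record(timeout: 1)
      @queue << record if record

      # Flush when the batch is full or BATCH_TIMEOUT has passed since the last flush
      should_flush = @queue.size >= BATCH_SIZE ||
                     Time.current - @last_flush > BATCH_TIMEOUT

      if should_flush && @queue.any?
        # Hand off the current array and start a fresh one, so process_batch
        # can keep a reference without it being cleared underneath it
        batch = @queue
        @queue = []
        process_batch(batch)
        @last_flush = Time.current
      end
    end
  end
end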
Micro-batching balances latency against throughput, processing records in small groups as they arrive rather than waiting for complete batch accumulation.
Event-driven batch processing triggers batch operations in response to events rather than schedules. Events might include file arrivals, queue depth thresholds, or external system notifications. This approach processes data as soon as sufficient volume accumulates rather than waiting for scheduled windows.
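Here is a minimal sketch of event-driven triggering. It assumes two kinds of events: individual record arrivals, which act as a queue-depth trigger, and an explicit input-complete signal, such as a file landing. EventTriggeredBatcher and its method names are hypothetical.

```ruby
# Hypothetical trigger: buffers incoming records and starts a batch run when
# the buffer reaches a depth threshold or an input-complete event arrives
class EventTriggeredBatcher
  attr_reader :runs

  def initialize(depth_threshold:, &processor)
    @depth_threshold = depth_threshold
    @processor = processor
    @buffer = []
    @runs = 0
  end

  # Event: a record arrived, for example from a queue consumer
  def record_arrived(record)
    @buffer << record
    flush if @buffer.size >= @depth_threshold
  end

  # Event: an external signal says the input is complete, such as a file
  # landing in an upload directory
  def input_complete
    flush
  end

  private

  def flush
    return if @buffer.empty?

    batch = @buffer
    @buffer = []
    @runs += 1
    @processor.call(batch)
  end
end

batches = []
batcher = EventTriggeredBatcher.new(depth_threshold: 3) { |batch| batches << batch }
(1..7).each { |n| batcher.record_arrived(n) }
batcher.input_complete
batches # => [[1, 2, 3], [4, 5, 6], [7]]
```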
Practical Examples
E-commerce order fulfillment demonstrates typical batch processing patterns. Orders accumulate throughout the day, then batch processing generates picking lists, updates inventory, and creates shipping labels during evening processing windows:
class OrderFulfillmentBatch
  def process_daily_orders
    orders = Order.unfulfilled.where('created_at >= ?', 24.hours.ago)

    orders.find_in_batches(batch_size: 500) do |batch|
      ready_order_ids = []

      ActiveRecord::Base.transaction do
        inventory_updates = []
        shipping_labels = []

        batch.each do |order|
          # Validate inventory availability
          unless check_inventory(order)
            order.mark_backorder!
            next
          end

          # Reserve inventory and generate a shipping label
          inventory_updates.concat(reserve_items(order))
          shipping_labels << create_shipping_label(order)
          order.mark_ready_to_ship!
          ready_order_ids << order.id
        end

        # Bulk operations (import comes from the activerecord-import gem)
        InventoryItem.import(inventory_updates, on_duplicate_key_update: [:quantity])
        ShippingLabel.import(shipping_labels)
      end

      # Notify the warehouse only after the commit, and only about orders
      # that are ready to ship (backordered orders are excluded)
      WarehouseNotifier.send_picking_list(ready_order_ids) if ready_order_ids.any?
    end
  end
end
Financial reconciliation processes transactions in batches to match payments, detect discrepancies, and generate reports. The batch runs after business hours when transaction volume stabilizes:
class PaymentReconciliationBatch
  def reconcile_daily_payments
    date = Date.yesterday
    internal_payments = Payment.where(processed_date: date)
    # Index processor records by transaction id for constant-time lookups
    external_by_id = fetch_processor_records(date).index_by(&:transaction_id)

    reconciliation_results = {
      matched: [],
      missing_internal: [],
      missing_external: [],
      amount_mismatch: []
    }

    # Match records; remove each external record once it has been paired,
    # so mismatched records are not also reported as missing
    internal_payments.find_each do |payment|
      external = external_by_id.delete(payment.external_id)

      if external.nil?
        reconciliation_results[:missing_external] << payment
      elsif external.amount != payment.amount
        reconciliation_results[:amount_mismatch] << {
          payment: payment,
          external: external,
          difference: payment.amount - external.amount
        }
      else
        reconciliation_results[:matched] << payment
      end
    end

    # Remaining external records have no internal match
    reconciliation_results[:missing_internal] = external_by_id.values

    discrepancies = reconciliation_results.except(:matched)
    discrepancies_found = discrepancies.values.any?(&:any?)

    # Generate report
    ReconciliationReport.create!(
      date: date,
      matched_count: reconciliation_results[:matched].size,
      discrepancies: discrepancies,
      status: discrepancies_found ? 'requires_review' : 'clean'
    )

    notify_finance_team if discrepancies_found
  end
end
Data warehouse ETL operations extract data from source systems, transform formats, and load into analytical databases. The batch processes overnight when source system load decreases:
class CustomerDataWarehouseBatch
  def execute
    extraction_timestamp = Time.current
    snapshot_date = Date.current

    # Extract from multiple sources
    customers = extract_customers
    orders = extract_orders
    support_tickets = extract_support_tickets

    # Group once up front instead of rescanning all orders and tickets per customer
    orders_by_customer = orders.group_by(&:customer_id)
    tickets_by_customer = support_tickets.group_by(&:customer_id)

    # Transform and enrich
    enriched_data = customers.map do |customer|
      customer_orders = orders_by_customer.fetch(customer.id, [])
      customer_tickets = tickets_by_customer.fetch(customer.id, [])
      {
        customer_id: customer.id,
        date: snapshot_date,
        total_orders: customer_orders.size,
        total_spent: customer_orders.sum(&:total),
        open_tickets: customer_tickets.count(&:open?),
        customer_score: calculate_score(customer, customer_orders, customer_tickets),
        extracted_at: extraction_timestamp
      }
    end

    # Load to warehouse; deleting today's rows first makes reruns idempotent.
    # CustomerFact is assumed to be a model connected to the warehouse database.
    CustomerFact.transaction do
      CustomerFact.where(date: snapshot_date).delete_all
      enriched_data.each_slice(1000) do |chunk|
        CustomerFact.insert_all(chunk)
      end
    end

    # Update materialized views (PostgreSQL)
    CustomerFact.connection.execute('REFRESH MATERIALIZED VIEW customer_segments')
  end
end
Email campaign processing sends newsletters to subscriber lists in batches to avoid overwhelming mail servers and respect rate limits:
class NewsletterBatch
  BATCH_SIZE = 100
  DELAY_BETWEEN_BATCHES = 2 # seconds

  def send_campaign(campaign_id)
    campaign = Campaign.find(campaign_id)
    subscribers = campaign.active_subscribers
    total_sent = 0
    total_failed = 0

    subscribers.find_in_batches(batch_size: BATCH_SIZE) do |batch|
      batch.each do |subscriber|
        begin
          # EmailDeliveryService is an application service; naming its method
          # deliver avoids overriding Ruby's built-in Object#send
          EmailDeliveryService.deliver(
            to: subscriber.email,
            subject: campaign.subject,
            body: personalize_content(campaign.body, subscriber)
          )
          total_sent += 1
        rescue => error
          log_delivery_failure(subscriber, error)
          total_failed += 1
        end
      end

      # Rate limiting
      sleep DELAY_BETWEEN_BATCHES
    end

    campaign.update!(
      sent_at: Time.current,
      total_sent: total_sent,
      total_failed: total_failed
    )
  end
end
Design Considerations
Selecting batch processing over alternative approaches requires evaluating latency tolerance, resource constraints, and data characteristics. Batch processing excels when deferred processing provides acceptable user experience and enables significant efficiency gains.
Latency requirements determine batch processing viability. Operations tolerating minutes or hours of delay suit batch processing, while real-time requirements demand streaming or synchronous processing. Financial end-of-day processing accepts overnight latency, but fraud detection requires sub-second response times.
Consider the business impact of delayed processing. Customer-facing operations often require immediate feedback, while internal analytics and reporting tolerate batch delays. Mixed approaches combine real-time processing for critical operations with batch processing for supplementary tasks.
Data volume influences batch processing suitability. Small datasets processed frequently may incur unnecessary overhead from batch infrastructure, while large datasets benefit from batch optimizations. Batching pays off when fixed setup costs are large relative to the work done per record, because one setup is spread across many records. Typical setup costs include opening connections, loading reference data, and scheduling a job.
The break-even volume depends on processing complexity and setup cost, so it is best measured rather than assumed. An operation with expensive setup may benefit from batching at a few hundred records, while cheap operations may need thousands before the savings become noticeable.
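The amortization argument can be made concrete. Suppose a batch pays a fixed setup cost S once and each record costs c to process. For a batch of B records, the effective cost per record is then S / B + c. The sketch below evaluates that formula with made-up numbers (50 ms setup, 0.5 ms per record), not measurements.

```ruby
# Effective cost per record when a fixed setup cost is shared by a batch:
#   cost_per_record = setup_ms / batch_size + work_ms
def cost_per_record(setup_ms:, work_ms:, batch_size:)
  setup_ms.fdiv(batch_size) + work_ms
end

# Illustrative numbers only
[1, 10, 100, 1_000].map do |size|
  cost_per_record(setup_ms: 50, work_ms: 0.5, batch_size: size).round(2)
end
# => [50.5, 5.5, 1.0, 0.55]
```

The setup share shrinks quickly and then stops mattering. By 1,000 records it is under 10% of the per-record cost in this example, so larger batches buy little.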
Resource utilization patterns affect architecture decisions. Batch processing concentrates resource usage during processing windows, enabling resource sharing with interactive systems during off-peak hours. Organizations with predictable load patterns can right-size infrastructure for peak batch processing rather than maintaining excess capacity.
Batch processing enables resource optimizations impossible with per-transaction processing:
class OptimizedBatchProcessor
  def process_with_caching(records)
    # Load reference data once
    reference_data = load_reference_data

    # Establish a single database connection and prepare the statement once;
    # establish_connection and INSERT_SQL are placeholders for the driver in use
    connection = establish_connection
    statement = connection.prepare(INSERT_SQL)

    records.each do |record|
      enriched = enrich_with_reference(record, reference_data)
      statement.execute(enriched)
    end
  ensure
    # Release resources even if a record fails partway through
    statement&.close
    connection&.close
  end
end
Error handling complexity increases with batch size. Large batches require sophisticated error handling to prevent single failures from blocking thousands of records. Small batches simplify error handling but reduce efficiency gains.
Consider whether partial success meets requirements. Some domains require all-or-nothing semantics, while others tolerate processing a subset of records. All-or-nothing semantics demand transactional guarantees or complex compensation logic.
Idempotency requirements shape implementation complexity. Systems requiring exactly-once processing need deduplication mechanisms, state tracking, and coordination protocols. At-least-once processing accepts simpler implementations with retry logic.
Data dependencies between batch operations require careful scheduling. Complex dependency graphs need orchestration frameworks, while independent batches run in parallel. Consider whether batches must execute in order or can run concurrently.
Scalability patterns differ between vertical and horizontal scaling. Vertical scaling increases resources on single machines, while horizontal scaling distributes across clusters. Ruby batch processors typically scale vertically within process limits, then horizontally through job queues distributing work across workers.
Performance Considerations
Batch processing performance depends on optimizing I/O operations, minimizing per-record overhead, and maximizing parallelism. Performance tuning balances throughput against latency, resource utilization, and system stability.
Database batching dramatically reduces round-trip latency by executing multiple operations in single statements. INSERT operations benefit most from batching:
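class BulkInserter
  # Inefficient: one INSERT and one database round-trip per record
  def insert_one_by_one(records)
    records.each { |record| Record.create(record) }
  end

  # Efficient: one multi-row INSERT per 1,000 records. insert_all skips
  # validations and callbacks, so validate the data beforehand if needed.
  def insert_in_bulk(records)
    records.each_slice(1000) do |batch|
      Record.insert_all(batch)
    end
  end
end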
Bulk inserts cut network round-trips and per-statement overhead. Speedups of an order of magnitude or more over row-by-row inserts are common on PostgreSQL and MySQL, though the actual gain depends on row width, indexes, and network latency.
Connection pooling amortizes connection establishment costs across multiple operations. Database connections require TCP handshakes, authentication, and session initialization, overhead measured in milliseconds per connection:
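require 'connection_pool'

class PooledBatchProcessor
  def initialize
    # Up to 10 connections shared by all worker threads; a thread waits at
    # most 5 seconds for a free one. Database.establish_connection stands in
    # for the driver's connect call.
    @pool = ConnectionPool.new(size: 10, timeout: 5) do
      Database.establish_connection
    end
  end

  def process_batch(records)
    # Check out one connection for the whole batch, then return it to the pool
    @pool.with do |connection|
      records.each { |record| connection.execute(process_sql(record)) }
    end
  end
end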
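A toy implementation on Thread::SizedQueue shows what the pool buys. TinyPool and FakeConnection are hypothetical stand-ins. Unlike the connection_pool gem, this sketch creates every connection eagerly and has no checkout timeout.

```ruby
# Toy fixed-size pool: connections are created once and reused by every caller
class TinyPool
  attr_reader :created

  def initialize(size:, &factory)
    @available = SizedQueue.new(size)
    @created = 0
    size.times do
      @available << factory.call
      @created += 1
    end
  end

  # Check a connection out, yield it, and always return it to the pool
  def with
    conn = @available.pop # blocks while every connection is checked out
    yield conn
  ensure
    @available << conn if conn
  end
end

# Hypothetical connection whose setup we want to pay only a few times
FakeConnection = Struct.new(:id) do
  def execute(sql)
    "#{id}: #{sql}"
  end
end

next_id = 0
pool = TinyPool.new(size: 3) { FakeConnection.new(next_id += 1) }

threads = 20.times.map do |i|
  Thread.new { pool.with { |conn| conn.execute("SELECT #{i}") } }
end
results = threads.map(&:value)
pool.created # => 3 (twenty operations shared three connections)
```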
Memory management prevents out-of-memory errors during large batch processing. Stream through datasets in batches rather than loading them into memory all at once:
class MemoryEfficientProcessor
  # Inefficient: Record.all.each loads every row into memory before iterating
  def process_all_at_once
    Record.all.each { |record| process(record) }
  end

  # Efficient: find_each loads 1,000 rows at a time
  def process_in_batches
    Record.find_each(batch_size: 1000) do |record|
      process(record)
    end
  end
end
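The same streaming idea applies to flat files. Here is a minimal sketch using only the standard library; BatchedFileReader is a hypothetical name. File.foreach without a block returns an Enumerator that reads one line at a time, so each_slice holds at most one batch in memory.

```ruby
require 'tempfile'

module BatchedFileReader
  # Yields arrays of up to batch_size lines; returns an Enumerator without a block
  def self.each_batch(path, batch_size, &block)
    File.foreach(path, chomp: true).each_slice(batch_size, &block)
  end
end

sizes, first_batch = Tempfile.create('records') do |file|
  10.times { |i| file.puts("record-#{i}") }
  file.flush

  batches = BatchedFileReader.each_batch(file.path, 4)
  [batches.map(&:size), batches.first]
end

sizes       # => [4, 4, 2]
first_batch # => ["record-0", "record-1", "record-2", "record-3"]
```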
Parallel processing uses multiple CPU cores for CPU-bound operations. CRuby's Global VM Lock (GVL, often called the GIL) allows only one thread to execute Ruby code at a time. This limits thread-based parallelism for CPU-intensive work, so process-based parallelism is usually the better fit:
require 'parallel'

class ParallelBatchProcessor
  def process_cpu_intensive(records)
    Parallel.map(records, in_processes: 8) do |record|
      cpu_intensive_transformation(record)
    end
  end
end
Process-based parallelism avoids GIL contention but increases memory usage since each process maintains separate memory space. Thread-based parallelism suits I/O-bound operations where threads spend time waiting for external resources.
Batch size tuning balances memory usage, transaction overhead, and failure granularity. Larger batches reduce per-batch overhead but increase memory consumption and failure impact. Smaller batches increase overhead but improve failure isolation:
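# Benchmark different batch sizes. records and process_batch stand in for the
# real job; run this against a copy of the data, since every pass does real work.
require 'benchmark'

[100, 500, 1000, 5000].each do |batch_size|
  time = Benchmark.realtime do
    records.each_slice(batch_size) do |batch|
      process_batch(batch)
    end
  end
  puts "Batch size #{batch_size}: #{time.round(2)} seconds"
end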
Optimal batch size depends on record size, processing complexity, and database characteristics. Start with 1000 records and adjust based on profiling results.
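Index optimization accelerates batch operations involving lookups or joins. Sometimes a batch performs many lookups on an otherwise unindexed column. In that case, a temporary index created before processing and dropped afterward can pay off, provided the time spent building the index is less than the time it saves: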
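class IndexOptimizedBatch
  # PostgreSQL syntax. CREATE INDEX CONCURRENTLY cannot run inside a
  # transaction block, so call this method outside one.
  def process_with_temporary_index
    # Index the table that the per-record lookups actually query
    connection.execute('CREATE INDEX CONCURRENTLY idx_temp ON related_records (lookup_field)')

    records.find_each do |record|
      # Queries benefit from the temporary index
      related = RelatedRecord.where(lookup_field: record.key)
      process_with_related(record, related)
    end
  ensure
    # Drop the index even if processing fails partway through
    connection.execute('DROP INDEX CONCURRENTLY IF EXISTS idx_temp')
  end
end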
Caching reference data eliminates repeated database queries for data referenced by each record. Load reference data once before batch processing:
class CachedReferenceProcessor
  def process_with_cache(records)
    # Load all reference data once
    categories = Category.all.index_by(&:id)
    price_rules = PriceRule.active.to_a

    records.each do |record|
      category = categories[record.category_id]
      applicable_rules = price_rules.select { |r| r.applies_to?(record) }
      process_with_references(record, category, applicable_rules)
    end
  end
end
Error Handling & Edge Cases
Batch processing error handling ensures system stability, data integrity, and operational visibility when failures occur. Robust error handling distinguishes production-ready batch systems from prototypes.
Record-level error handling isolates failures to individual records, preventing cascading failures across the entire batch:
class ResilientBatchProcessor
  def process_with_error_handling(records)
    results = {
      successful: [],
      failed: []
    }

    records.each do |record|
      begin
        result = process_record(record)
        results[:successful] << { record: record, result: result }
      rescue StandardError => error
        results[:failed] << {
          record: record,
          error: error.message,
          backtrace: error.backtrace.first(5)
        }
        ErrorLogger.log(error, record: record.id)
      end
    end

    # Process failed records
    handle_failures(results[:failed]) if results[:failed].any?

    results
  end
end
Record-level isolation allows partial batch success, critical for large batches where complete reprocessing would waste resources.
Transactional boundaries ensure atomicity within logical units. Batch processing typically requires transactions at the sub-batch level rather than the entire batch:
class TransactionalBatchProcessor
  TRANSACTION_SIZE = 100

  def process_with_transactions(records)
    records.each_slice(TRANSACTION_SIZE) do |chunk|
      ActiveRecord::Base.transaction do
        chunk.each { |record| process_record(record) }
      end
    rescue StandardError => error
      # The transaction has already rolled this chunk back and re-raised the
      # error. (Rescuing ActiveRecord::Rollback would never fire, because
      # transaction swallows it.) Log and continue with the next chunk.
      log_transaction_failure(chunk, error)
    end
  end
end
Transaction size balances atomicity guarantees against rollback costs. Large transactions lock resources longer and increase rollback overhead during failures.
Dead letter queues store records that fail repeatedly after retry attempts. These queues enable manual review and correction without blocking batch progress:
class DeadLetterBatchProcessor
  MAX_RETRIES = 3

  def process_with_dlq(records, batch_id:)
    records.each do |record|
      retry_count = 0
      begin
        process_record(record)
      rescue StandardError => error
        retry_count += 1
        if retry_count < MAX_RETRIES
          sleep(2 ** retry_count) # back off 2, then 4 seconds
          retry
        else
          # DeadLetterQueue is an application-defined store for manual review
          DeadLetterQueue.add(
            record: record,
            error: error.message,
            retry_count: retry_count,
            original_batch_id: batch_id
          )
        end
      end
    end
  end
end
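Timeout handling prevents individual records from blocking batch progress indefinitely. Set timeouts on external service calls and long-running operations. Prefer the timeouts built into client libraries where they exist. Timeout.timeout raises into the block from a separate thread at an arbitrary point, which can leave connections or other state inconsistent: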
require 'timeout'

class TimeoutBatchProcessor
  RECORD_TIMEOUT = 30 # seconds

  def process_with_timeout(records)
    records.each do |record|
      begin
        # Raises Timeout::Error if the block runs longer than RECORD_TIMEOUT
        Timeout.timeout(RECORD_TIMEOUT) do
          process_record(record)
        end
      rescue Timeout::Error
        log_timeout(record) # skip this record and continue with the next
      end
    end
  end
end
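Here is a sketch of client-level timeouts using Net::HTTP from the standard library. The endpoint and the build_client and fetch_record helpers are hypothetical. The HTTP client enforces its own connect and read limits, and a timeout surfaces as an ordinary exception that the per-record rescue can handle.

```ruby
require 'net/http'
require 'uri'

# Build a client whose connect and read limits are enforced by Net::HTTP itself
def build_client(uri, open_timeout: 5, read_timeout: 30)
  Net::HTTP.new(uri.host, uri.port).tap do |http|
    http.use_ssl = uri.scheme == 'https'
    http.open_timeout = open_timeout # seconds to establish the connection
    http.read_timeout = read_timeout # seconds to wait for each read
  end
end

# Treat a timeout like any other per-record failure: log it and move on
def fetch_record(http, path)
  http.request(Net::HTTP::Get.new(path))
rescue Net::OpenTimeout, Net::ReadTimeout => e
  warn "timed out fetching #{path}: #{e.class}"
  nil
end

client = build_client(URI('https://api.example.com/records'))
# Inside the batch loop: response = fetch_record(client, "/records/#{id}")
```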
Data validation prevents corrupt or malformed data from entering processing pipelines. Validate records before processing to fail fast:
class ValidatingBatchProcessor
  def process_with_validation(records, batch_id:)
    valid_records = []
    invalid_records = []

    records.each do |record|
      validation_result = validate_record(record)
      if validation_result.valid?
        valid_records << record
      else
        invalid_records << {
          record: record,
          errors: validation_result.errors
        }
      end
    end

    # Process valid records
    process_batch(valid_records) if valid_records.any?

    # Report invalid records, if there are any
    return if invalid_records.empty?

    ValidationFailureReport.create!(
      batch_id: batch_id,
      invalid_count: invalid_records.size,
      failures: invalid_records
    )
  end
end
Duplicate detection prevents reprocessing records when batch jobs run multiple times. Track processed records using idempotency keys:
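class IdempotentBatchProcessor
  def process_idempotent(records)
    records.each do |record|
      idempotency_key = generate_key(record)
      next if ProcessedRecord.exists?(idempotency_key: idempotency_key)

      begin
        # Commit the database work and the key together. A unique index on
        # idempotency_key turns a concurrent duplicate into a rollback
        # instead of a second successful run.
        ActiveRecord::Base.transaction do
          process_record(record)
          ProcessedRecord.create!(
            idempotency_key: idempotency_key,
            record_id: record.id,
            processed_at: Time.current
          )
        end
      rescue ActiveRecord::RecordNotUnique
        # Another worker processed this record first
      rescue StandardError => error
        log_processing_error(record, error)
      end
    end
  end
end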
Partial failure recovery enables batch restart from the last successful checkpoint rather than reprocessing the entire batch:
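class CheckpointedBatchProcessor
  # Assumes records arrive in the same stable order on every run. Up to 999
  # records may be reprocessed after a crash, so process_record should be
  # idempotent.
  def process_with_checkpoints(records)
    checkpoint = load_checkpoint || 0

    records.drop(checkpoint).each_with_index do |record, index|
      process_record(record)
      if (index + 1) % 1000 == 0
        save_checkpoint(checkpoint + index + 1)
      end
    end

    clear_checkpoint
  end
end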
Tools & Ecosystem
Ruby's batch processing ecosystem includes frameworks for job scheduling, background processing, and distributed computing. Tool selection depends on batch complexity, scale requirements, and infrastructure constraints.
Sidekiq is one of the most widely used Ruby background job processors, known for its reliability and performance. The commercial editions extend it: Sidekiq Pro adds batches, and Sidekiq Enterprise adds rate limiting and further reliability features:
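class SidekiqBatchJob
  include Sidekiq::Worker # aliased as Sidekiq::Job in newer releases

  def perform(record_ids)
    batch = Sidekiq::Batch.new
    batch.description = 'Process records batch'
    batch.on(:success, self.class, 'notification' => true)
    batch.on(:death, self.class, 'alert' => true)
    batch.jobs do
      record_ids.each do |id|
        RecordProcessor.perform_async(id)
      end
    end
  end

  def on_success(status, options)
    NotificationService.notify_success if options['notification']
  end

  # Each registered event needs a matching handler; :death fires when a job
  # in the batch exhausts its retries. AlertService is an application service.
  def on_death(status, options)
    AlertService.notify_batch_death(status.bid) if options['alert']
  end
end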
Sidekiq uses Redis for job storage and coordination, providing visibility into job status and retry management through its web interface.
Delayed Job offers database-backed job queueing without external dependencies. Its simplicity suits applications avoiding additional infrastructure:
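class BatchProcessor
  class << self
    def perform(batch_id)
      batch = Batch.find(batch_id)
      batch.records.find_each do |record|
        process_record(record)
      end
      batch.mark_completed!
    end

    # handle_asynchronously wraps methods of the class it is called in, so a
    # class method needs it inside class << self
    handle_asynchronously :perform
  end
end

# Queue the job (returns immediately; a worker runs it later)
BatchProcessor.perform(batch_id)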
Delayed Job stores jobs in the application database, eliminating Redis dependency but potentially impacting database performance under high job volumes.
Resque provides Redis-backed job processing with fork-based isolation. By default, a worker forks a child process for each job, so a memory leak or crash in one job does not affect the worker itself:
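class ResqueBatchJob
  @queue = :batch_processing

  # Arguments are serialized to JSON, so pass ids rather than objects
  def self.perform(record_ids)
    # Load records in batches instead of issuing one query per id
    Record.where(id: record_ids).find_each do |record|
      process_record(record)
    end
  end
end

# Enqueue
Resque.enqueue(ResqueBatchJob, record_ids)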
Whenever manages cron job scheduling with Ruby DSL, generating crontab entries from readable configuration:
# config/schedule.rb
every 1.day, at: '2:00 am' do
  runner 'DailyBatchProcessor.execute'
end

every :monday, at: '3:00 am' do
  runner 'WeeklyReportBatch.generate'
end
Rufus-scheduler provides in-process job scheduling without external dependencies:
require 'rufus-scheduler'

scheduler = Rufus::Scheduler.new

scheduler.cron '0 2 * * *' do
  DailyBatch.process
end

scheduler.every '1h' do
  HourlyBatch.process
end

# In a standalone script, keep the process alive so the jobs can fire
scheduler.join
ActiveJob abstracts job queueing, supporting multiple backends including Sidekiq, Delayed Job, and Resque:
class BatchProcessingJob < ApplicationJob
  queue_as :batch

  def perform(batch_id)
    batch = Batch.find(batch_id)
    batch.records.find_each { |record| process(record) }
  end
end

# Enqueue with any supported backend
BatchProcessingJob.perform_later(batch_id)
ETL tools like Kiba provide extract-transform-load frameworks for data processing pipelines:
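require 'kiba'
require 'time'

# CsvSource and DatabaseDestination are application-defined classes; Kiba
# expects sources to implement #each and destinations to implement #write
job = Kiba.parse do
  source CsvSource, filename: 'input.csv'

  transform do |row|
    row[:email] = row[:email].to_s.downcase
    row[:created_at] = Time.parse(row[:date])
    row
  end

  destination DatabaseDestination, table: :users
end

Kiba.run(job)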
Good Job provides a multithreaded, Postgres-based job queue alternative to Redis-dependent solutions:
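# Requires config.active_job.queue_adapter = :good_job
class BatchJob < ApplicationJob
  # Pass ids rather than records so job arguments stay small
  def perform(record_ids)
    Record.where(id: record_ids).find_each do |record|
      process_record(record)
    end
  end
end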
Good Job stores jobs in Postgres, leveraging advisory locks for coordination and LISTEN/NOTIFY for real-time job scheduling.
Reference
Batch Processing Patterns
| Pattern | Description | Use Case |
|---|---|---|
| Sequential | Process records in order on single thread | Small datasets, order-dependent operations |
| Parallel | Distribute processing across threads or processes | CPU-intensive transformations, large datasets |
| Pipeline | Multi-stage processing with queues between stages | Complex transformations, independent stages |
| Micro-batch | Process small batches continuously | Near real-time requirements with batch efficiency |
| Event-driven | Trigger batches based on events | Variable arrival rates, just-in-time processing |
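As an example of the Pipeline pattern, the sketch below connects two stages with thread-safe queues, each stage running in its own thread. start_stage and the DONE sentinel are hypothetical names; the sentinel tells each stage to forward end-of-stream and stop.

```ruby
# Sentinel marking the end of the stream
DONE = :done

# Starts a stage thread that transforms items from input into output
def start_stage(input, output, &transform)
  Thread.new do
    while (item = input.pop) != DONE
      output << transform.call(item)
    end
    output << DONE # propagate end-of-stream to the next stage
  end
end

raw = Queue.new
parsed = Queue.new
records = Queue.new

stages = [
  start_stage(raw, parsed) { |line| line.split(',') },
  start_stage(parsed, records) { |(name, qty)| { name: name.strip, qty: Integer(qty.strip) } }
]

['widget, 3', 'gadget, 5'].each { |line| raw << line }
raw << DONE
stages.each(&:join)

results = []
while (item = records.pop) != DONE
  results << item
end
```

Because each stage has a single thread and queues are FIFO, results keep input order; adding threads to a stage would raise throughput at the cost of ordering.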
Performance Optimization Techniques
| Technique | Impact | Implementation |
|---|---|---|
| Bulk database operations | Far fewer round trips than per-record saves | Use insert_all, upsert_all, update_all, delete_all (these skip validations and callbacks) |
| Connection pooling | Reduce connection overhead | Configure pool size based on concurrency |
| Caching reference data | Eliminate repeated queries | Load reference data once per batch |
| Parallel processing | Utilize multiple cores | Use the Parallel gem or process pools; in MRI, threads help mainly with I/O-bound work |
| Transaction batching | Reduce commit overhead | Group operations into larger transactions |
| Index optimization | Accelerate lookups | Create temporary indexes for batch queries |
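The caching technique can be made concrete by counting queries. The sketch below uses a hypothetical TaxRateRepository standing in for a reference table: looking up the rate per order costs one query per record, while loading the table once per batch costs a single query and then uses in-memory hash lookups.

```ruby
# Hypothetical reference-data repository that counts its queries
class TaxRateRepository
  attr_reader :query_count

  def initialize(rates)
    @rates = rates
    @query_count = 0
  end

  def find_rate(region)
    @query_count += 1
    @rates.fetch(region)
  end

  def all_rates
    @query_count += 1
    @rates.dup
  end
end

RATES = { 'EU' => 0.2, 'US' => 0.1 }.freeze
orders = [
  { id: 1, region: 'EU', amount: 100.0 },
  { id: 2, region: 'US', amount: 50.0 },
  { id: 3, region: 'EU', amount: 20.0 }
]

# Per-record lookup: one query per order
naive_repo = TaxRateRepository.new(RATES)
naive = orders.map { |o| o[:amount] * naive_repo.find_rate(o[:region]) }

# Cached: load reference data once per batch, then use hash lookups
cached_repo = TaxRateRepository.new(RATES)
rates = cached_repo.all_rates
cached = orders.map { |o| o[:amount] * rates.fetch(o[:region]) }

p naive_repo.query_count  # → 3
p cached_repo.query_count # → 1
```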
Error Handling Strategies
| Strategy | Behavior | Trade-offs |
|---|---|---|
| Fail fast | Abort batch on first error | Simplest to implement; one bad record stops the whole batch |
| Record isolation | Continue processing after errors | Partial success, complex error tracking |
| Transaction chunks | Atomic sub-batches | Balanced atomicity and progress |
| Retry with backoff | Retry failed operations | Handles transient failures, adds latency |
| Dead letter queue | Store persistent failures | Prevents blocking, requires manual review |
| Checkpoint recovery | Resume from last checkpoint | Minimize reprocessing, adds complexity |
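Several of these strategies combine naturally. The sketch below applies record isolation (a failure does not stop the batch), retry with exponential backoff for errors assumed to be transient, and a dead letter queue for records that still fail. TransientError, process_with_retries, and the injectable sleeper parameter are hypothetical; the sleeper lets tests record delays instead of sleeping.

```ruby
# Hypothetical error class for failures worth retrying (timeouts, deadlocks)
class TransientError < StandardError; end

def process_with_retries(records, max_attempts: 3, base_delay: 0.01, sleeper: ->(s) { sleep(s) })
  succeeded = []
  dead_letters = []

  records.each do |record|
    attempts = 0
    begin
      attempts += 1
      succeeded << yield(record)
    rescue TransientError => e
      if attempts < max_attempts
        sleeper.call(base_delay * (2**(attempts - 1))) # 0.01, 0.02, 0.04, ...
        retry
      end
      dead_letters << { record: record, error: e.message, attempts: attempts }
    rescue StandardError => e
      # Permanent errors (e.g. invalid data) go straight to the dead letter queue
      dead_letters << { record: record, error: e.message, attempts: attempts }
    end
  end

  [succeeded, dead_letters]
end

calls = Hash.new(0)
delays = []
ok, dlq = process_with_retries([1, 2, 3], sleeper: ->(s) { delays << s }) do |n|
  calls[n] += 1
  raise TransientError, 'timeout' if n == 2 && calls[n] == 1 # fails once, then succeeds
  raise ArgumentError, 'invalid format' if n == 3            # always fails
  n * 10
end
p ok # → [10, 20]
```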
Ruby Batch Processing Gems
| Gem | Purpose | Key Features |
|---|---|---|
| Sidekiq | Background job processing | Redis-backed, multithreaded, web UI; batches require Sidekiq Pro |
| Delayed Job | Database-backed jobs | Uses the application database, no Redis required |
| Resque | Redis-backed jobs | Fork-per-job isolation, failure handling |
| GoodJob | Postgres-backed jobs | Multithreaded, advisory locks, LISTEN/NOTIFY |
| ActiveJob | Framework abstraction | Backend-agnostic, Rails integration |
| Whenever | Cron job scheduling | Ruby DSL for crontab |
| Rufus-scheduler | In-process scheduling | Runs inside the Ruby process, no cron needed |
| Parallel | Parallel processing | Thread and process-based parallelism |
| Kiba | ETL framework | Extract-transform-load pipelines |
Batch Size Guidelines
| Data Volume (records) | Recommended Batch Size | Rationale |
|---|---|---|
| < 1,000 | 100-500 | Balance overhead and simplicity |
| 1,000-100,000 | 500-2,000 | Optimize memory and transaction costs |
| 100,000-1,000,000 | 1,000-5,000 | Checkpoint granularity vs overhead |
| > 1,000,000 | 2,000-10,000 | Maximize throughput, manage memory |
Common Failure Scenarios
| Scenario | Detection | Recovery |
|---|---|---|
| Database deadlock | Exception during commit | Retry with exponential backoff |
| Out of memory | Process termination | Reduce batch size, stream processing |
| External service timeout | Request timeout | Implement circuit breaker, retry logic |
| Invalid data format | Validation error | Skip record, log to error queue |
| Duplicate processing | Idempotency check | Track processed records by key |
| Partial network failure | Connection error | Checkpoint progress, resume from checkpoint |
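Checkpointing and idempotency tracking work together: the checkpoint skips slices that already completed, and the processed-key set prevents reprocessing records from a slice that was interrupted partway through. The sketch below keeps both in memory and uses a hypothetical CheckpointedBatch class; a real system would persist them in durable storage.

```ruby
require 'set'

# Hypothetical checkpointed processor over fixed-size slices
class CheckpointedBatch
  attr_reader :checkpoint, :processed_keys

  def initialize(slice_size:)
    @slice_size = slice_size
    @checkpoint = 0           # index of the next slice to process
    @processed_keys = Set.new # record IDs already handled
  end

  # Resumes from the checkpoint; progress survives an exception in the block
  def run(records)
    slices = records.each_slice(@slice_size).to_a
    slices.drop(@checkpoint).each do |slice|
      slice.each do |record|
        next if @processed_keys.include?(record[:id]) # idempotency check
        yield record
        @processed_keys << record[:id]
      end
      @checkpoint += 1 # persist durably in a real system
    end
  end
end

records = (1..10).map { |i| { id: i } }
batch = CheckpointedBatch.new(slice_size: 3)
handled = []
fail_once = true

begin
  batch.run(records) do |r|
    if r[:id] == 5 && fail_once
      fail_once = false
      raise 'connection reset' # simulated partial network failure
    end
    handled << r[:id]
  end
rescue RuntimeError
  # First run stops partway through the second slice
end

batch.run(records) { |r| handled << r[:id] } # resume from checkpoint
p handled # → [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```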
Transaction Isolation Levels
| Level | Behavior | Batch Processing Impact |
|---|---|---|
| Read uncommitted | Dirty reads allowed | Least isolation, risky for financial data; PostgreSQL treats it as read committed |
| Read committed | No dirty reads | Good balance for most batches; PostgreSQL default |
| Repeatable read | Consistent snapshot | Prevents non-repeatable reads; MySQL InnoDB default |
| Serializable | Full isolation | Maximum consistency; expect more blocking or serialization failures that need retries |
Monitoring Metrics
| Metric | Purpose | Alert Threshold |
|---|---|---|
| Processing duration | Detect slowdowns | >2x baseline |
| Error rate | Track failures | >5% failed records |
| Queue depth | Monitor backlog | >10,000 pending jobs |
| Memory usage | Prevent OOM | >80% available memory |
| Success rate | Overall health | <95% success |
| Throughput | Performance tracking | <50% expected rate |
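The alert thresholds can be encoded directly as checks over collected batch statistics. In the sketch below, BatchStats and alerts_for are hypothetical, and the sketch assumes that error rate is measured per record while success rate is measured per job run, so the two checks are not redundant.

```ruby
# Hypothetical batch statistics; field names are illustrative
BatchStats = Struct.new(
  :duration_s, :baseline_duration_s,
  :failed_records, :total_records,
  :queue_depth, :memory_used_ratio,
  :jobs_succeeded, :jobs_total,
  :throughput, :expected_throughput,
  keyword_init: true
)

# Returns the alerts whose thresholds from the table are exceeded
def alerts_for(stats)
  error_rate = stats.failed_records.fdiv(stats.total_records)
  success_rate = stats.jobs_succeeded.fdiv(stats.jobs_total)

  alerts = []
  alerts << :slow         if stats.duration_s > 2 * stats.baseline_duration_s
  alerts << :error_rate   if error_rate > 0.05
  alerts << :queue_depth  if stats.queue_depth > 10_000
  alerts << :memory       if stats.memory_used_ratio > 0.80
  alerts << :success_rate if success_rate < 0.95
  alerts << :throughput   if stats.throughput < 0.5 * stats.expected_throughput
  alerts
end

stats = BatchStats.new(
  duration_s: 900, baseline_duration_s: 300,
  failed_records: 20, total_records: 1_000,
  queue_depth: 120, memory_used_ratio: 0.65,
  jobs_succeeded: 49, jobs_total: 50,
  throughput: 800, expected_throughput: 1_000
)
p alerts_for(stats) # → [:slow]
```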