Overview
Sidekiq provides background job processing for Ruby applications through Redis-backed queues and multi-threaded workers. Jobs execute asynchronously outside the request-response cycle, improving application performance and user experience.
The core components include job classes, workers, queues, and the Sidekiq server process. Job classes include the `Sidekiq::Job` module and define work to be performed later. Workers execute these jobs concurrently using threads. Queues organize jobs by priority and category. The server process manages worker threads and job execution.
```ruby
class EmailJob
  include Sidekiq::Job

  def perform(user_id, template)
    user = User.find(user_id)
    UserMailer.welcome(user, template).deliver_now
  end
end

# Enqueue job for background processing
EmailJob.perform_async(123, 'welcome')
```
Sidekiq requires Redis for job storage and coordination between processes. Jobs serialize as JSON, limiting arguments to basic data types. The server process runs separately from web applications, scaling independently based on workload requirements.
```ruby
# Configure Redis connection
Sidekiq.configure_server do |config|
  config.redis = { url: 'redis://localhost:6379/0' }
end

Sidekiq.configure_client do |config|
  config.redis = { url: 'redis://localhost:6379/0' }
end
```
Jobs execute with configurable retry logic, dead job handling, and queue prioritization. The web interface provides monitoring and management capabilities for job queues, worker processes, and failed jobs.
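The web UI ships with Sidekiq as a mountable Rack application. A minimal sketch of exposing it in a Rails app (protect the route in production, since the dashboard permits destructive actions):

```ruby
# config/routes.rb
require 'sidekiq/web'

Rails.application.routes.draw do
  # Wrap in an authentication constraint in production -- the dashboard
  # can retry, delete, and kill jobs
  mount Sidekiq::Web => '/sidekiq'
end
```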
Basic Usage
Job classes define the work to be performed in background processes. The `perform` method contains the job logic and accepts serializable arguments.
```ruby
class DataProcessingJob
  include Sidekiq::Job

  def perform(file_path, options = {})
    processor = DataProcessor.new(file_path)
    processor.clean_data if options['clean']
    processor.validate_data if options['validate']
    processor.save_results
  end
end

# Enqueue with different argument types
DataProcessingJob.perform_async('/tmp/data.csv', { 'clean' => true })
DataProcessingJob.perform_async('/tmp/raw.json', { 'validate' => true, 'clean' => false })
```
The `perform_async` method queues jobs immediately for background execution. Arguments must serialize to JSON, so they are limited to strings, numbers, booleans, nil, arrays, and hashes; note that hash keys come back as strings when the job executes.
```ruby
class ReportJob
  include Sidekiq::Job

  def perform(report_type, user_ids, date_range)
    report = ReportGenerator.new(report_type)
    report.include_users(user_ids)
    report.set_date_range(date_range['start'], date_range['end'])
    report.generate_and_send
  end
end

# Queue job with complex data structures
ReportJob.perform_async(
  'monthly_sales',
  [1, 2, 3, 4, 5],
  { 'start' => '2024-01-01', 'end' => '2024-01-31' }
)
```
Jobs can be scheduled for future execution using the `perform_in` or `perform_at` methods. This enables delayed job processing and scheduled tasks.
```ruby
class ReminderJob
  include Sidekiq::Job

  def perform(user_id, message)
    user = User.find(user_id)
    NotificationService.send_reminder(user, message)
  end
end

# Schedule job for future execution
ReminderJob.perform_in(24.hours, user.id, 'Your trial expires tomorrow')
ReminderJob.perform_at(1.week.from_now, user.id, 'Weekly newsletter available')
```
Queue names organize jobs by type and priority. The default queue handles standard jobs, while custom queues enable prioritization and resource allocation.
```ruby
class UrgentJob
  include Sidekiq::Job
  sidekiq_options queue: 'high_priority'

  def perform(alert_data)
    AlertSystem.process_urgent(alert_data)
  end
end

class MaintenanceJob
  include Sidekiq::Job
  sidekiq_options queue: 'maintenance'

  def perform(task_name)
    MaintenanceRunner.execute(task_name)
  end
end

# Jobs route to specified queues; use string keys so arguments
# round-trip through JSON unchanged
UrgentJob.perform_async({ 'severity' => 'critical', 'message' => 'System overload' })
MaintenanceJob.perform_async('cleanup_temp_files')
```
Advanced Usage
Sidekiq provides extensive configuration options for job behavior, retry logic, and queue management. Job classes can specify custom retry counts, queue assignments, backtrace capture, and dead-set behavior.
```ruby
class ComplexProcessingJob
  include Sidekiq::Job
  sidekiq_options queue: 'heavy_processing',
                  retry: 3,
                  backtrace: true,
                  dead: false

  def perform(dataset_id, processing_options)
    dataset = Dataset.find(dataset_id)
    processor = ProcessingEngine.new(dataset)
    processor.apply_transformations(processing_options['transformations'])

    if processing_options['validate_results']
      processor.run_validation_suite
    end

    processor.save_processed_data
    NotificationService.job_completed(dataset_id, 'processing')
  end
end
```
Middleware allows custom logic to run before and after job execution. Server middleware wraps job execution, while client middleware wraps job enqueueing.
```ruby
class JobLoggingMiddleware
  # Server middleware receives the job instance, the job payload hash,
  # and the queue name
  def call(job_instance, job_payload, queue)
    job_class = job_payload['class']
    job_id = job_payload['jid']
    Rails.logger.info "Starting job #{job_class} with ID #{job_id}"
    start_time = Time.current
    yield
    duration = Time.current - start_time
    Rails.logger.info "Completed job #{job_class} in #{duration.round(2)}s"
  rescue => e
    Rails.logger.error "Job #{job_class} failed: #{e.message}"
    raise
  end
end

# Register middleware
Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.add JobLoggingMiddleware
  end
end
```
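Client middleware follows the same chain pattern but wraps `perform_async`: its `call` receives the job class, payload, queue name, and Redis pool, and the push is cancelled if the middleware never yields. A minimal sketch (the `enqueued_at_host` field is illustrative, not a Sidekiq convention):

```ruby
require 'socket'

class JobMetadataMiddleware
  def call(job_class, job_payload, queue, redis_pool)
    # Stamp the payload with the enqueuing host; extra keys ride along
    # with the job hash into Redis
    job_payload['enqueued_at_host'] = Socket.gethostname
    yield # continue the push; skipping yield cancels the job
  end
end

Sidekiq.configure_client do |config|
  config.client_middleware do |chain|
    chain.add JobMetadataMiddleware
  end
end
```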
Sidekiq Pro adds job batches for coordinating related work and tracking completion status. Batches group jobs and provide callbacks for success or failure scenarios.
```ruby
class BatchProcessingJob
  include Sidekiq::Job

  def perform(file_chunk, batch_id)
    chunk_processor = ChunkProcessor.new(file_chunk)
    results = chunk_processor.process

    # Store results with batch reference
    BatchResult.create(
      batch_id: batch_id,
      chunk_data: file_chunk,
      processed_data: results
    )
  end
end

# Create and populate batch (Sidekiq Pro)
batch = Sidekiq::Batch.new
batch.description = "Process large dataset"
batch.on(:success, BatchCompletionJob)
batch.jobs do
  large_dataset.chunks.each do |chunk|
    BatchProcessingJob.perform_async(chunk, batch.bid)
  end
end
```
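The `:success` callback names a class that must respond to `on_success(status, options)`. A sketch of what `BatchCompletionJob` might look like, with `DatasetAssembler` standing in for whatever follow-up work applies:

```ruby
class BatchCompletionJob
  # Sidekiq Pro invokes this once every job in the batch has succeeded;
  # `status` is a Sidekiq::Batch::Status, `options` is the hash passed to #on
  def on_success(status, options)
    Rails.logger.info "Batch #{status.bid} finished: #{status.total} jobs"
    DatasetAssembler.merge_results(status.bid) # hypothetical follow-up step
  end
end
```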
Cron-like scheduling enables periodic job execution using the sidekiq-cron gem. Jobs run on specified schedules without external cron dependencies.
```ruby
# Add to initializer
Sidekiq::Cron::Job.create(
  name: 'Daily cleanup',
  cron: '0 2 * * *',
  class: 'DailyMaintenanceJob'
)

Sidekiq::Cron::Job.create(
  name: 'Hourly metrics',
  cron: '0 * * * *',
  class: 'MetricsCollectionJob',
  args: ['system_metrics']
)
```
```ruby
class DailyMaintenanceJob
  include Sidekiq::Job

  def perform
    cleanup_temp_files
    archive_old_logs
    update_search_indexes
  end

  private

  def cleanup_temp_files
    Dir.glob('/tmp/app_temp_*').each { |file| File.delete(file) }
  end

  def archive_old_logs
    LogArchiver.archive_logs_older_than(30.days)
  end

  def update_search_indexes
    SearchIndexer.refresh_all_indexes
  end
end
```
Error Handling & Debugging
Sidekiq provides comprehensive error handling through retry mechanisms, dead job queues, and exception tracking. Failed jobs retry automatically with an escalating backoff delay (roughly `(count ** 4) + 15` seconds plus random jitter by default), then move to the dead set once retries are exhausted.
```ruby
class FileProcessingJob
  include Sidekiq::Job
  sidekiq_options retry: 5

  def perform(file_path, processing_type)
    file_processor = FileProcessor.new(file_path)

    case processing_type
    when 'image'
      process_image_file(file_processor)
    when 'document'
      process_document_file(file_processor)
    else
      raise ArgumentError, "Unknown processing type: #{processing_type}"
    end
  rescue FileNotFoundError
    # Don't retry if the file doesn't exist: rescuing without re-raising
    # marks the job as successful, so Sidekiq discards it
    Rails.logger.error "File not found, discarding job: #{file_path}"
  rescue NetworkTimeoutError
    # Log and re-raise so Sidekiq retries
    Rails.logger.warn "Network timeout processing #{file_path}, will retry"
    raise
  rescue ProcessingError => e
    # Custom retry logic based on error severity
    if e.severity == 'fatal'
      Rails.logger.error "Fatal processing error, discarding job: #{e.message}"
    else
      raise # Allow normal retry
    end
  end

  private

  def process_image_file(processor)
    processor.resize_image
    processor.optimize_compression
    processor.save_processed_image
  end

  def process_document_file(processor)
    processor.extract_text
    processor.index_content
    processor.save_metadata
  end
end
```
Custom retry logic enables job-specific failure handling. The `sidekiq_retry_in` class macro registers a block that computes the delay (in seconds) before the next retry, based on the attempt count and the exception raised.
```ruby
class APIIntegrationJob
  include Sidekiq::Job
  sidekiq_options retry: 10

  def perform(api_endpoint, payload)
    api_client = APIClient.new(api_endpoint)
    response = api_client.post(payload)

    unless response.success?
      raise APIError.new("API call failed: #{response.error_message}")
    end

    process_api_response(response.data)
  end

  # Registered as a block: Sidekiq honors the class macro form, not a
  # bare class method. `count` is zero-based.
  sidekiq_retry_in do |count, exception|
    case exception
    when APIRateLimitError
      # Back off quadratically for rate limit errors ((count + 1)^2 minutes)
      ((count + 1) ** 2) * 60
    when APIMaintenanceError
      # Fixed delay for maintenance windows
      30.minutes
    when APITimeoutError
      # Quick retries for timeouts, then fall back to 10 minutes
      [10, 30, 60, 300][count] || 600
    else
      # Polynomial backoff similar to Sidekiq's default
      (count ** 4) + 15
    end
  end

  private

  def process_api_response(data)
    DataProcessor.new(data).process_and_store
  end
end
```
The `sidekiq_retries_exhausted` hook runs when a job exceeds its retry limit. This enables cleanup, notification, or alternative processing paths.
```ruby
class CriticalDataJob
  include Sidekiq::Job
  sidekiq_options retry: 3

  def perform(critical_data_id)
    data = CriticalData.find(critical_data_id)
    processor = CriticalDataProcessor.new(data)
    processor.validate_and_process
  end

  # Registered as a block via the class macro, like sidekiq_retry_in
  sidekiq_retries_exhausted do |job_payload, exception|
    data_id = job_payload['args'].first

    # Log failure for monitoring
    Rails.logger.error "Critical job failed permanently: #{data_id}"

    # Mark data as failed in database
    CriticalData.find(data_id).update(
      processing_status: 'failed',
      failure_reason: exception.message,
      failed_at: Time.current
    )

    # Send alert to operations team
    AlertService.critical_job_failed(data_id, exception.message)

    # Optionally queue fallback processing
    FallbackProcessingJob.perform_async(data_id)
  end
end
```
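For a process-wide equivalent of this hook, Sidekiq also exposes death handlers that fire whenever any job lands in the dead set; a minimal sketch (`DeadJobNotifier` is a hypothetical alerting helper):

```ruby
Sidekiq.configure_server do |config|
  # Runs for every job that exhausts its retries, regardless of class
  config.death_handlers << ->(job, exception) do
    Rails.logger.error "#{job['class']} #{job['jid']} died: #{exception.message}"
    DeadJobNotifier.notify(job, exception) # hypothetical
  end
end
```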
Debugging failed jobs involves examining job arguments, exception details, and execution context. The web interface provides detailed failure information and manual retry capabilities.
```ruby
class DebuggableJob
  include Sidekiq::Job
  sidekiq_options backtrace: 20 # Store 20 lines of backtrace with failures

  def perform(complex_data)
    Rails.logger.info "Processing job with data: #{complex_data.inspect}"

    # Add debugging checkpoints
    Rails.logger.debug "Starting validation phase"
    validate_input_data(complex_data)

    Rails.logger.debug "Starting processing phase"
    result = process_complex_data(complex_data)

    Rails.logger.debug "Processing completed successfully"
    result
  rescue => e
    # Log detailed error context
    Rails.logger.error "Job failed at step: #{@current_step}"
    Rails.logger.error "Input data: #{complex_data.inspect}"
    Rails.logger.error "Error: #{e.class.name} - #{e.message}"
    raise # Re-raise for Sidekiq handling
  end

  private

  def validate_input_data(data)
    @current_step = 'validation'
    # Validation logic with detailed logging
  end

  def process_complex_data(data)
    @current_step = 'processing'
    # Processing logic with checkpoint logging
  end
end
```
Production Patterns
Production Sidekiq deployments require careful configuration for reliability, monitoring, and scalability. Worker processes should be monitored and automatically restarted on failure.
```ruby
# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
  config.redis = {
    url: ENV['REDIS_URL'],
    network_timeout: 5,
    pool_timeout: 5,
    size: ENV.fetch('SIDEKIQ_REDIS_POOL_SIZE', 25).to_i
  }

  config.concurrency = ENV.fetch('SIDEKIQ_CONCURRENCY', 25).to_i

  # Production error handling
  config.error_handlers << lambda do |exception, context|
    Rails.logger.error "Sidekiq job failed: #{exception.message}"
    Rails.logger.error "Context: #{context.inspect}"

    # Send to error tracking service
    ErrorTracker.notify(exception, context)

    # Custom alerting for critical jobs
    if context[:job] && context[:job]['queue'] == 'critical'
      AlertService.critical_job_failed(context[:job], exception)
    end
  end
end

Sidekiq.configure_client do |config|
  config.redis = {
    url: ENV['REDIS_URL'],
    network_timeout: 5,
    pool_timeout: 5,
    size: ENV.fetch('SIDEKIQ_CLIENT_REDIS_POOL_SIZE', 5).to_i
  }
end
```
Queue prioritization ensures critical jobs process before lower-priority work. Worker processes can be configured to handle specific queue combinations.
```ruby
# Different worker processes for different queue priorities:
#   High priority worker: sidekiq -q critical,2 -q high,1
#   Standard worker:      sidekiq -q default,2 -q low,1
#   Maintenance worker:   sidekiq -q maintenance

class SystemAlertJob
  include Sidekiq::Job
  sidekiq_options queue: 'critical', retry: false

  def perform(alert_type, alert_data)
    case alert_type
    when 'security_breach'
      SecurityTeam.immediate_alert(alert_data)
      SystemLockdown.initiate if alert_data['severity'] == 'critical'
    when 'system_failure'
      OperationsTeam.page_on_call(alert_data)
      FailoverSystem.activate if alert_data['auto_failover']
    end
  end
end

class NewsletterJob
  include Sidekiq::Job
  sidekiq_options queue: 'low', retry: 5

  def perform(subscriber_batch)
    subscriber_batch.each do |subscriber_id|
      subscriber = Subscriber.find(subscriber_id)
      NewsletterMailer.weekly_digest(subscriber).deliver_now
      sleep(0.1) # Rate limiting
    end
  end
end
```
Health checks monitor Sidekiq processes and queue depths. Automated alerts notify operations teams when queues back up or workers fail.
```ruby
class SidekiqHealthCheck
  # Defined inside `class << self` so the `private` below actually applies;
  # `private` has no effect on methods defined with `def self.`
  class << self
    def check_system_health
      health_report = {
        redis_connected: redis_healthy?,
        workers_running: workers_healthy?,
        queue_depths: queue_depth_analysis,
        failed_jobs: failed_job_analysis,
        memory_usage: memory_analysis
      }

      send_alerts_if_needed(health_report)
      health_report
    end

    private

    def redis_healthy?
      Sidekiq.redis { |conn| conn.ping == 'PONG' }
    rescue
      false
    end

    def workers_healthy?
      worker_count = Sidekiq::ProcessSet.new.size
      expected_workers = ENV.fetch('EXPECTED_WORKER_COUNT', 3).to_i
      worker_count >= expected_workers
    end

    def queue_depth_analysis
      Sidekiq::Queue.all.map do |queue|
        depth = queue.size
        {
          name: queue.name,
          depth: depth,
          latency: queue.latency,
          alert: depth > queue_threshold(queue.name)
        }
      end
    end

    def queue_threshold(queue_name)
      thresholds = {
        'critical' => 10,
        'high' => 100,
        'default' => 1000,
        'low' => 5000
      }
      thresholds[queue_name] || 500
    end

    def failed_job_analysis
      {
        retrying: Sidekiq::RetrySet.new.size,
        dead: Sidekiq::DeadSet.new.size,
        recent_failures: recent_failure_count
      }
    end

    def memory_analysis
      stats = Sidekiq::Stats.new
      {
        processed: stats.processed,
        failed: stats.failed,
        busy: stats.workers_size,
        processes: stats.processes_size
      }
    end
  end
end
```
```ruby
# Schedule regular health checks
class HealthCheckJob
  include Sidekiq::Job

  def perform
    health_report = SidekiqHealthCheck.check_system_health
    HealthMetrics.record(health_report)
  end
end
```
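Assuming the sidekiq-cron gem from the scheduling section above, the check can run on a fixed cadence:

```ruby
# config/initializers/sidekiq.rb (assumes sidekiq-cron)
Sidekiq::Cron::Job.create(
  name: 'Sidekiq health check',
  cron: '*/5 * * * *', # every five minutes
  class: 'HealthCheckJob'
)
```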
Rails integration patterns handle job enqueueing in web requests efficiently. Jobs should be queued after successful database transactions to maintain consistency.
```ruby
class OrdersController < ApplicationController
  def create
    @order = Order.new(order_params)
    @order.save! # save! wraps its own transaction

    # Enqueue jobs only after the transaction has committed, so workers
    # never race a record that isn't visible to them yet
    @order.line_items.each do |item|
      InventoryUpdateJob.perform_async(item.product_id, item.quantity)
    end

    # Send confirmation email
    OrderConfirmationJob.perform_async(@order.id)

    # Update analytics
    AnalyticsJob.perform_async('order_created', @order.attributes)

    # Schedule follow-up tasks
    FollowUpEmailJob.perform_in(3.days, @order.id, 'review_request')

    redirect_to @order, notice: 'Order created successfully!'
  rescue ActiveRecord::RecordInvalid
    render :new, status: :unprocessable_entity
  end

  private

  def order_params
    params.require(:order).permit(:customer_id, :shipping_address,
                                  line_items_attributes: [:product_id, :quantity])
  end
end
```
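A sturdier variant pushes the enqueue-after-commit rule into the model with an `after_commit` callback, so every code path that creates an order behaves the same; a minimal sketch:

```ruby
class Order < ApplicationRecord
  has_many :line_items

  # after_commit fires only once the transaction is durable, so a worker
  # can safely Order.find(id) immediately
  after_commit :enqueue_confirmation, on: :create

  private

  def enqueue_confirmation
    OrderConfirmationJob.perform_async(id)
  end
end
```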
Reference
Core Job Methods
| Method | Parameters | Returns | Description |
|---|---|---|---|
| `perform_async(*args)` | Variable arguments | String (job ID) | Enqueues job for immediate background processing |
| `perform_in(interval, *args)` | `interval` (Time/Numeric), arguments | String (job ID) | Schedules job for future execution after interval |
| `perform_at(timestamp, *args)` | `timestamp` (Time), arguments | String (job ID) | Schedules job for execution at specific time |
| `perform(*args)` | Variable arguments | Object | Executes job synchronously in current process |
Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
| `queue` | String | `'default'` | Queue name for job routing |
| `retry` | Integer/Boolean | `25` | Number of retry attempts; `false` disables retries |
| `backtrace` | Integer/Boolean | `false` | Lines of backtrace to store; `true` stores all |
| `dead` | Boolean | `true` | Whether failed jobs move to the dead set |
| `tags` | Array&lt;String&gt; | `[]` | Tags for job organization and filtering |
Middleware Interface
| Method | Parameters | Returns | Description |
|---|---|---|---|
| `call(worker, job, queue)` | Worker instance, job hash, queue name | Object | Server middleware execution wrapper |
| `call(job_class, job, queue, redis_pool)` | Job class, job hash, queue, Redis pool | Object | Client middleware execution wrapper |
Queue Operations
| Method | Parameters | Returns | Description |
|---|---|---|---|
| `Sidekiq::Queue.new(name)` | `name` (String) | `Sidekiq::Queue` | Creates queue interface object |
| `#size` | None | Integer | Returns number of jobs in queue |
| `#latency` | None | Float | Returns seconds since the oldest job was enqueued |
| `#clear` | None | Integer | Removes all jobs from queue |
| `#each` | Block | Enumerator | Iterates through jobs in queue |
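A short console sketch of these queue operations (requires `sidekiq/api`):

```ruby
require 'sidekiq/api'

queue = Sidekiq::Queue.new('default')
puts "#{queue.size} jobs, oldest waiting #{queue.latency.round}s"

# Each yielded job is a Sidekiq::JobRecord wrapping the JSON payload
queue.each do |job|
  job.delete if job.klass == 'NewsletterJob' # prune a job class, e.g. after a bad deploy
end
```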
Stats and Monitoring
| Method | Parameters | Returns | Description |
|---|---|---|---|
| `Sidekiq::Stats.new` | None | `Sidekiq::Stats` | Creates stats interface object |
| `#processed` | None | Integer | Cumulative count of processed jobs (stored in Redis) |
| `#failed` | None | Integer | Cumulative count of failed jobs (stored in Redis) |
| `#workers_size` | None | Integer | Number of currently busy workers |
| `#processes_size` | None | Integer | Number of running Sidekiq processes |
Error Handling Hooks
| Hook | Parameters | Description |
|---|---|---|
| `sidekiq_retry_in` block | Retry count (zero-based), exception instance | Returns seconds until the next retry |
| `sidekiq_retries_exhausted` block | Job payload hash, exception | Called once all retries are exhausted |
Redis Configuration
| Parameter | Type | Description |
|---|---|---|
| `url` | String | Redis connection URL |
| `namespace` | String | Redis key namespace prefix (requires the redis-namespace gem; no longer supported in Sidekiq 7) |
| `pool_timeout` | Integer | Connection pool checkout timeout in seconds |
| `network_timeout` | Integer | Redis network operation timeout in seconds |
| `size` | Integer | Connection pool size |
Common Error Classes
| Error | Inheritance | Description |
|---|---|---|
| `Sidekiq::JobRetry::Skip` | `RuntimeError` | Internal exception that bypasses retry handling; rescuing an error and returning is the supported way to skip retries |
| `Sidekiq::Shutdown` | `Interrupt` | Raised in running jobs when the Sidekiq server is shutting down |
| `Redis::ConnectionError` | `RuntimeError` | Redis connection failure |
| `Redis::TimeoutError` | `RuntimeError` | Redis operation timeout |
Environment Variables
| Variable | Default | Description |
|---|---|---|
| `REDIS_URL` | `redis://localhost:6379/0` | Redis server connection string |
| `SIDEKIQ_CONCURRENCY` | `25` | Number of worker threads per process (read by the initializer above) |
| `SIDEKIQ_TIMEOUT` | `25` | Seconds to wait for running jobs to finish during shutdown |
| `SIDEKIQ_REDIS_POOL_SIZE` | Same as concurrency | Redis connection pool size (read by the initializer above) |