Sidekiq

Overview

Sidekiq provides background job processing for Ruby applications through Redis-backed queues and multi-threaded workers. Jobs execute asynchronously outside the request-response cycle, improving application performance and user experience.

The core components include job classes, workers, queues, and the Sidekiq server process. Job classes include the Sidekiq::Job module and define work to be performed later. Worker threads execute these jobs concurrently. Queues organize jobs by priority and category. The server process manages the worker threads and job execution.

class EmailJob
  include Sidekiq::Job

  def perform(user_id, template)
    user = User.find(user_id)
    UserMailer.welcome(user, template).deliver_now
  end
end

# Enqueue job for background processing
EmailJob.perform_async(123, 'welcome')

Sidekiq requires Redis for job storage and coordination between processes. Jobs serialize as JSON, limiting arguments to basic data types. The server process runs separately from web applications, scaling independently based on workload requirements.
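
Because arguments round-trip through JSON, pass identifiers rather than live objects. A quick sketch of the distinction, reusing EmailJob from above:

user = User.find(123)

# Good: JSON-native types survive the round-trip
EmailJob.perform_async(user.id, 'welcome')

# Bad: a model instance and a Symbol do not deserialize correctly, and
# recent Sidekiq versions reject non-JSON argument types outright
# EmailJob.perform_async(user, :welcome)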

# Configure Redis connection
Sidekiq.configure_server do |config|
  config.redis = { url: 'redis://localhost:6379/0' }
end

Sidekiq.configure_client do |config|
  config.redis = { url: 'redis://localhost:6379/0' }
end

Jobs execute with configurable retry logic, dead job handling, and queue prioritization. The web interface provides monitoring and management capabilities for job queues, worker processes, and failed jobs.
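
The web interface ships in the sidekiq gem and mounts as a Rack application. A minimal Rails routing sketch; in production the route should sit behind authentication:

# config/routes.rb
require 'sidekiq/web'

Rails.application.routes.draw do
  mount Sidekiq::Web => '/sidekiq'
end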

Basic Usage

Job classes define the work to be performed in background processes. The perform method contains the job logic and accepts serializable arguments.

class DataProcessingJob
  include Sidekiq::Job

  def perform(file_path, options = {})
    processor = DataProcessor.new(file_path)
    processor.clean_data if options['clean']
    processor.validate_data if options['validate']
    processor.save_results
  end
end

# Enqueue with different argument types
DataProcessingJob.perform_async('/tmp/data.csv', { 'clean' => true })
DataProcessingJob.perform_async('/tmp/raw.json', { 'validate' => true, 'clean' => false })

The perform_async method queues jobs immediately for background execution. Arguments must serialize to JSON, supporting strings, numbers, booleans, arrays, and hashes.

class ReportJob
  include Sidekiq::Job

  def perform(report_type, user_ids, date_range)
    report = ReportGenerator.new(report_type)
    report.include_users(user_ids)
    report.set_date_range(date_range['start'], date_range['end'])
    report.generate_and_send
  end
end

# Queue job with complex data structures
ReportJob.perform_async(
  'monthly_sales',
  [1, 2, 3, 4, 5],
  { 'start' => '2024-01-01', 'end' => '2024-01-31' }
)

Jobs can be scheduled for future execution using perform_in or perform_at methods. This enables delayed job processing and scheduled tasks.

class ReminderJob
  include Sidekiq::Job

  def perform(user_id, message)
    user = User.find(user_id)
    NotificationService.send_reminder(user, message)
  end
end

# Schedule job for future execution
ReminderJob.perform_in(24.hours, user.id, 'Your trial expires tomorrow')
ReminderJob.perform_at(1.week.from_now, user.id, 'Weekly newsletter available')

Queue names organize jobs by type and priority. The default queue handles standard jobs, while custom queues enable prioritization and resource allocation.

class UrgentJob
  include Sidekiq::Job
  sidekiq_options queue: 'high_priority'

  def perform(alert_data)
    AlertSystem.process_urgent(alert_data)
  end
end

class MaintenanceJob
  include Sidekiq::Job
  sidekiq_options queue: 'maintenance'

  def perform(task_name)
    MaintenanceRunner.execute(task_name)
  end
end

# Jobs route to specified queues
UrgentJob.perform_async({ 'severity' => 'critical', 'message' => 'System overload' })
MaintenanceJob.perform_async('cleanup_temp_files')

Advanced Usage

Sidekiq provides extensive configuration options for job behavior, retry logic, and queue management. Job classes can specify custom retry counts, queue assignments, backtrace storage, and dead set behavior.

class ComplexProcessingJob
  include Sidekiq::Job
  sidekiq_options queue: 'heavy_processing', 
                  retry: 3, 
                  backtrace: true,
                  dead: false

  def perform(dataset_id, processing_options)
    dataset = Dataset.find(dataset_id)
    
    processor = ProcessingEngine.new(dataset)
    processor.apply_transformations(processing_options['transformations'])
    
    if processing_options['validate_results']
      processor.run_validation_suite
    end
    
    processor.save_processed_data
    NotificationService.job_completed(dataset_id, 'processing')
  end
end

Middleware allows custom logic to run before and after job execution. Server middleware wraps job execution, while client middleware wraps job enqueueing.

class JobLoggingMiddleware
  # Server middleware receives the job instance, the payload hash, and the
  # queue name, and must yield to run the job
  def call(job_instance, job_payload, queue)
    job_class = job_payload['class']
    job_id = job_payload['jid']
    
    Rails.logger.info "Starting job #{job_class} with ID #{job_id}"
    start_time = Time.current
    
    yield
    
    duration = Time.current - start_time
    Rails.logger.info "Completed job #{job_class} in #{duration.round(2)}s"
  rescue => e
    Rails.logger.error "Job #{job_class} failed: #{e.message}"
    raise
  end
end

# Register middleware
Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.add JobLoggingMiddleware
  end
end
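
Client middleware uses a four-argument signature because it runs at enqueue time, before the payload reaches Redis. A sketch that stamps each payload as it is enqueued; the 'enqueued_from' key is an arbitrary example, not a Sidekiq convention:

require 'socket'

class EnqueueTaggingMiddleware
  def call(job_class, job_payload, queue, redis_pool)
    # Annotate the payload before it is written to Redis
    job_payload['enqueued_from'] = Socket.gethostname
    yield
  end
end

Sidekiq.configure_client do |config|
  config.client_middleware do |chain|
    chain.add EnqueueTaggingMiddleware
  end
end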

Sidekiq Pro supports job batches for coordinating related work and tracking completion status. Batches group jobs and provide callbacks for success or failure scenarios.

class BatchProcessingJob
  include Sidekiq::Job

  def perform(file_chunk, batch_id)
    chunk_processor = ChunkProcessor.new(file_chunk)
    results = chunk_processor.process
    
    # Store results with batch reference
    BatchResult.create(
      batch_id: batch_id,
      chunk_data: file_chunk,
      processed_data: results
    )
  end
end

# Create and populate a batch (Sidekiq::Batch requires Sidekiq Pro)
batch = Sidekiq::Batch.new
batch.description = "Process large dataset"
batch.on(:success, BatchCompletionJob)

batch.jobs do
  large_dataset.chunks.each do |chunk|
    BatchProcessingJob.perform_async(chunk, batch.bid)
  end
end
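
The :success callback above points at a plain class that Sidekiq Pro invokes once every job in the batch succeeds. A minimal sketch of that receiver, assuming the Sidekiq Pro callback signature:

class BatchCompletionJob
  def on_success(status, options)
    # status exposes batch metadata such as bid and total job count
    Rails.logger.info "Batch #{status.bid} finished: #{status.total} jobs processed"
  end
end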

Cron-like scheduling enables periodic job execution using the sidekiq-cron gem. Jobs run on specified schedules without external cron dependencies.

# Add to initializer
Sidekiq::Cron::Job.create(
  name: 'Daily cleanup',
  cron: '0 2 * * *',
  class: 'DailyMaintenanceJob'
)

Sidekiq::Cron::Job.create(
  name: 'Hourly metrics',
  cron: '0 * * * *',
  class: 'MetricsCollectionJob',
  args: ['system_metrics']
)

class DailyMaintenanceJob
  include Sidekiq::Job
  
  def perform
    cleanup_temp_files
    archive_old_logs
    update_search_indexes
  end
  
  private
  
  def cleanup_temp_files
    Dir.glob('/tmp/app_temp_*').each { |file| File.delete(file) }
  end
  
  def archive_old_logs
    LogArchiver.archive_logs_older_than(30.days)
  end
  
  def update_search_indexes
    SearchIndexer.refresh_all_indexes
  end
end

Error Handling & Debugging

Sidekiq provides comprehensive error handling through retry mechanisms, dead job queues, and exception tracking. Jobs automatically retry on failure with exponential backoff.

class FileProcessingJob
  include Sidekiq::Job
  sidekiq_options retry: 5

  def perform(file_path, processing_type)
    file_processor = FileProcessor.new(file_path)
    
    case processing_type
    when 'image'
      process_image_file(file_processor)
    when 'document'
      process_document_file(file_processor)
    else
      raise ArgumentError, "Unknown processing type: #{processing_type}"
    end
  rescue FileNotFoundError => e
    # Don't retry if the file doesn't exist; rescuing without re-raising
    # marks the job as done and prevents further attempts
    Rails.logger.error "File not found, giving up: #{file_path}"
  rescue NetworkTimeoutError => e
    # Log and re-raise so Sidekiq retries with backoff
    Rails.logger.warn "Network timeout processing #{file_path}, will retry"
    raise
  rescue ProcessingError => e
    # Custom retry logic based on error severity
    if e.severity == 'fatal'
      Rails.logger.error "Fatal processing error, giving up: #{e.message}"
    else
      raise # Allow normal retry
    end
  end
  
  private
  
  def process_image_file(processor)
    processor.resize_image
    processor.optimize_compression
    processor.save_processed_image
  end
  
  def process_document_file(processor)
    processor.extract_text
    processor.index_content
    processor.save_metadata
  end
end

Custom retry logic enables job-specific failure handling. The sidekiq_retry_in hook, declared as a class-level block, calculates retry delays from the attempt count and exception type.

class APIIntegrationJob
  include Sidekiq::Job
  sidekiq_options retry: 10

  def perform(api_endpoint, payload)
    api_client = APIClient.new(api_endpoint)
    response = api_client.post(payload)
    
    unless response.success?
      raise APIError.new("API call failed: #{response.error_message}")
    end
    
    process_api_response(response.data)
  end
  
  # Declared as a class-level block; count starts at 0 for the first retry
  sidekiq_retry_in do |count, exception|
    case exception
    when APIRateLimitError
      # Wait increasingly long for rate limit errors (in minutes)
      ((count + 1) ** 2) * 60
    when APIMaintenanceError
      # Fixed delay for maintenance windows
      30 * 60
    when APITimeoutError
      # Quick retry ladder for timeouts
      [10, 30, 60, 300][count] || 600
    else
      # Default-style exponential backoff
      (count ** 4) + 15
    end
  end
  
  private
  
  def process_api_response(data)
    DataProcessor.new(data).process_and_store
  end
end

The sidekiq_retries_exhausted hook, also declared as a class-level block, handles jobs that exceed their retry limit. This enables cleanup, notification, or alternative processing paths.

class CriticalDataJob
  include Sidekiq::Job
  sidekiq_options retry: 3

  def perform(critical_data_id)
    data = CriticalData.find(critical_data_id)
    processor = CriticalDataProcessor.new(data)
    processor.validate_and_process
  end
  
  # Declared as a class-level block, not an instance method
  sidekiq_retries_exhausted do |job_payload, exception|
    data_id = job_payload['args'].first
    
    # Log failure for monitoring
    Rails.logger.error "Critical job failed permanently: #{data_id}"
    
    # Mark data as failed in database
    CriticalData.find(data_id).update(
      processing_status: 'failed',
      failure_reason: exception.message,
      failed_at: Time.current
    )
    
    # Send alert to operations team
    AlertService.critical_job_failed(data_id, exception.message)
    
    # Optionally queue fallback processing
    FallbackProcessingJob.perform_async(data_id)
  end
end

Debugging failed jobs involves examining job arguments, exception details, and execution context. The web interface provides detailed failure information and manual retry capabilities.

class DebuggableJob
  include Sidekiq::Job
  sidekiq_options backtrace: 20 # Store 20 lines of backtrace with failures

  def perform(complex_data)
    begin
      Rails.logger.info "Processing job with data: #{complex_data.inspect}"
      
      # Add debugging checkpoints
      Rails.logger.debug "Starting validation phase"
      validate_input_data(complex_data)
      
      Rails.logger.debug "Starting processing phase"
      result = process_complex_data(complex_data)
      
      Rails.logger.debug "Processing completed successfully"
      result
    rescue => e
      # Log detailed error context
      Rails.logger.error "Job failed at step: #{@current_step}"
      Rails.logger.error "Input data: #{complex_data.inspect}"
      Rails.logger.error "Error: #{e.class.name} - #{e.message}"
      
      raise # Re-raise for Sidekiq handling
    end
  end
  
  private
  
  def validate_input_data(data)
    @current_step = 'validation'
    # Validation logic with detailed logging
  end
  
  def process_complex_data(data)
    @current_step = 'processing'
    # Processing logic with checkpoint logging
  end
end

Production Patterns

Production Sidekiq deployments require careful configuration for reliability, monitoring, and scalability. Worker processes should be monitored and automatically restarted on failure.

# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
  config.redis = { 
    url: ENV['REDIS_URL'],
    network_timeout: 5,
    pool_timeout: 5,
    size: ENV.fetch('SIDEKIQ_REDIS_POOL_SIZE', 25).to_i
  }
  
  config.concurrency = ENV.fetch('SIDEKIQ_CONCURRENCY', 25).to_i
  
  # Production error handling
  config.error_handlers << lambda do |exception, context|
    Rails.logger.error "Sidekiq job failed: #{exception.message}"
    Rails.logger.error "Context: #{context.inspect}"
    
    # Send to error tracking service
    ErrorTracker.notify(exception, context)
    
    # Custom alerting for critical jobs
    if context[:job] && context[:job]['queue'] == 'critical'
      AlertService.critical_job_failed(context[:job], exception)
    end
  end
end

Sidekiq.configure_client do |config|
  config.redis = { 
    url: ENV['REDIS_URL'],
    network_timeout: 5,
    pool_timeout: 5,
    size: ENV.fetch('SIDEKIQ_CLIENT_REDIS_POOL_SIZE', 5).to_i
  }
end
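
Beyond per-job error handlers, a death handler runs once when a job exhausts its retries and lands in the dead set, which makes it a natural place for last-resort alerting:

Sidekiq.configure_server do |config|
  config.death_handlers << lambda do |job_payload, exception|
    Rails.logger.error "#{job_payload['class']} #{job_payload['jid']} died: #{exception.message}"
  end
end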

Queue prioritization ensures critical jobs process before lower-priority work. Worker processes can be configured to handle specific queue combinations.

# Different worker processes for different queue priorities
# High priority worker: sidekiq -q critical,2 -q high,1
# Standard worker: sidekiq -q default,2 -q low,1
# Maintenance worker: sidekiq -q maintenance

class SystemAlertJob
  include Sidekiq::Job
  sidekiq_options queue: 'critical', retry: false

  def perform(alert_type, alert_data)
    case alert_type
    when 'security_breach'
      SecurityTeam.immediate_alert(alert_data)
      SystemLockdown.initiate if alert_data['severity'] == 'critical'
    when 'system_failure'
      OperationsTeam.page_on_call(alert_data)
      FailoverSystem.activate if alert_data['auto_failover']
    end
  end
end

class NewsletterJob
  include Sidekiq::Job
  sidekiq_options queue: 'low', retry: 5

  def perform(subscriber_batch)
    subscriber_batch.each do |subscriber_id|
      subscriber = Subscriber.find(subscriber_id)
      NewsletterMailer.weekly_digest(subscriber).deliver_now
      sleep(0.1) # Rate limiting
    end
  end
end

Health checks monitor Sidekiq processes and queue depths. Automated alerts notify operations teams when queues back up or workers fail.

class SidekiqHealthCheck
  def self.check_system_health
    health_report = {
      redis_connected: redis_healthy?,
      workers_running: workers_healthy?,
      queue_depths: queue_depth_analysis,
      failed_jobs: failed_job_analysis,
      memory_usage: memory_analysis
    }
    
    send_alerts_if_needed(health_report)
    health_report
  end
  
  def self.redis_healthy?
    Sidekiq.redis { |conn| conn.ping == 'PONG' }
  rescue
    false
  end
  
  def self.workers_healthy?
    worker_count = Sidekiq::ProcessSet.new.size
    expected_workers = ENV.fetch('EXPECTED_WORKER_COUNT', 3).to_i
    
    worker_count >= expected_workers
  end
  
  def self.queue_depth_analysis
    Sidekiq::Queue.all.map do |queue|
      depth = queue.size
      latency = queue.latency
      
      {
        name: queue.name,
        depth: depth,
        latency: latency,
        alert: depth > queue_threshold(queue.name)
      }
    end
  end
  
  def self.queue_threshold(queue_name)
    thresholds = {
      'critical' => 10,
      'high' => 100,
      'default' => 1000,
      'low' => 5000
    }
    thresholds[queue_name] || 500
  end
  
  def self.failed_job_analysis
    retry_set = Sidekiq::RetrySet.new
    dead_set = Sidekiq::DeadSet.new
    
    {
      retrying: retry_set.size,
      dead: dead_set.size,
      recent_failures: recent_failure_count
    }
  end
  
  def self.recent_failure_count
    # Failures recorded today, from Sidekiq's daily stats history
    Sidekiq::Stats::History.new(1).failed.values.sum
  end
  
  def self.memory_analysis
    stats = Sidekiq::Stats.new
    {
      processed: stats.processed,
      failed: stats.failed,
      busy: stats.workers_size,
      processes: stats.processes_size
    }
  end
  
  def self.send_alerts_if_needed(report)
    # Hypothetical alerting hook on the AlertService used elsewhere
    backed_up = report[:queue_depths].select { |queue| queue[:alert] }
    AlertService.queue_backlog(backed_up) if backed_up.any?
  end
  
  # `private` has no effect on `def self.` methods, so mark them explicitly
  private_class_method :redis_healthy?, :workers_healthy?,
                       :queue_depth_analysis, :queue_threshold,
                       :failed_job_analysis, :recent_failure_count,
                       :memory_analysis, :send_alerts_if_needed
end

# Schedule regular health checks
class HealthCheckJob
  include Sidekiq::Job
  
  def perform
    health_report = SidekiqHealthCheck.check_system_health
    HealthMetrics.record(health_report)
  end
end
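
Wiring the job to an actual schedule reuses the sidekiq-cron approach shown earlier; the five-minute interval is an arbitrary choice:

Sidekiq::Cron::Job.create(
  name: 'Sidekiq health check',
  cron: '*/5 * * * *',
  class: 'HealthCheckJob'
)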

Rails integration patterns handle job enqueueing in web requests efficiently. Jobs should be queued after successful database transactions to maintain consistency.

class OrdersController < ApplicationController
  def create
    @order = Order.new(order_params)
    
    Order.transaction do
      @order.save!
    end
    
    # Enqueue only after the transaction commits; jobs queued inside the
    # block could run before the data is visible to workers
    @order.line_items.each do |item|
      InventoryUpdateJob.perform_async(item.product_id, item.quantity)
    end
    
    # Send confirmation email
    OrderConfirmationJob.perform_async(@order.id)
    
    # Update analytics (as_json keeps the arguments JSON-serializable)
    AnalyticsJob.perform_async('order_created', @order.attributes.as_json)
    
    # Schedule follow-up tasks
    FollowUpEmailJob.perform_in(3.days, @order.id, 'review_request')
    
    redirect_to @order, notice: 'Order created successfully!'
  rescue ActiveRecord::RecordInvalid
    render :new, status: :unprocessable_entity
  end
  
  private
  
  def order_params
    params.require(:order).permit(:customer_id, :shipping_address, 
                                  line_items_attributes: [:product_id, :quantity])
  end
end
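
An alternative that makes the ordering automatic is enqueueing from an after_commit callback, so a job can never observe uncommitted state. A sketch on the Order model, assuming the jobs from the controller above:

class Order < ApplicationRecord
  after_commit :enqueue_confirmation, on: :create
  
  private
  
  def enqueue_confirmation
    OrderConfirmationJob.perform_async(id)
  end
end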

Reference

Core Job Methods

perform_async(*args) -> String (job ID)
    Enqueues the job for immediate background processing.

perform_in(interval, *args) -> String (job ID)
    Schedules the job to run after the interval (a Time or a Numeric number of seconds).

perform_at(timestamp, *args) -> String (job ID)
    Schedules the job to run at the given Time.

perform(*args) -> Object
    Instance method containing the job logic; call it directly for synchronous execution in the current process.

Configuration Options

queue (String, default 'default')
    Queue name for job routing.

retry (Integer or Boolean, default 25)
    Number of retry attempts; false disables retries.

backtrace (Integer or Boolean, default false)
    Lines of backtrace to store with failures; true stores the full backtrace.

dead (Boolean, default true)
    Whether jobs that exhaust their retries move to the dead set.

tags (Array of Strings, default [])
    Tags for job organization and filtering.

Middleware Interface

Server middleware: call(job_instance, job_payload, queue) { ... }
    Wraps job execution on the server; must yield to continue the chain.

Client middleware: call(job_class, job_payload, queue, redis_pool) { ... }
    Wraps job enqueueing on the client; must yield to continue the chain.

Queue Operations

Sidekiq::Queue.new(name) -> Sidekiq::Queue
    Creates an interface object for the named queue.

#size -> Integer
    Number of jobs currently in the queue.

#latency -> Float
    Seconds the oldest job has been waiting in the queue.

#clear
    Removes all jobs from the queue.

#each
    Iterates over the jobs in the queue.
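
A short usage sketch of these queue operations; ObsoleteJob is a hypothetical class name:

queue = Sidekiq::Queue.new('default')
puts "#{queue.size} jobs, oldest waiting #{queue.latency.round(1)}s"

# Delete enqueued jobs of a class that no longer exists
queue.each { |job| job.delete if job.klass == 'ObsoleteJob' }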

Stats and Monitoring

Sidekiq::Stats.new -> Sidekiq::Stats
    Creates a stats interface object.

#processed -> Integer
    Lifetime count of processed jobs, stored in Redis.

#failed -> Integer
    Lifetime count of failed jobs, stored in Redis.

#workers_size -> Integer
    Number of jobs currently being executed across all processes.

#processes_size -> Integer
    Number of running Sidekiq processes.

Error Handling Hooks

sidekiq_retry_in { |count, exception| ... }
    Class-level block that returns the number of seconds until the next retry.

sidekiq_retries_exhausted { |job_payload, exception| ... }
    Class-level block called once when all retries are exhausted.

Redis Configuration

url (String)
    Redis connection URL.

namespace (String)
    Redis key namespace prefix; requires the redis-namespace gem and is no longer supported in Sidekiq 7+.

pool_timeout (Integer)
    Seconds to wait when checking a connection out of the pool.

network_timeout (Integer)
    Seconds to wait for Redis network operations.

size (Integer)
    Connection pool size.

Common Error Classes

Sidekiq::Shutdown < Interrupt
    Raised in jobs still running when the Sidekiq process shuts down.

Redis::ConnectionError < RuntimeError
    Redis connection failure.

Redis::TimeoutError < RuntimeError
    Redis operation timeout.

Environment Variables

REDIS_URL (default redis://localhost:6379/0)
    Redis server connection string.

SIDEKIQ_CONCURRENCY (default 25)
    Worker threads per process, as read by the initializer above.

SIDEKIQ_TIMEOUT (default 25)
    Seconds to wait for running jobs to finish during shutdown.

SIDEKIQ_REDIS_POOL_SIZE (default: same as concurrency)
    Redis connection pool size, as read by the initializer above.