CrackedRuby

Delayed Job

Overview

Delayed Job provides asynchronous job processing for Ruby applications through database-backed queues. The gem stores jobs as records in a database table and processes them through worker processes that poll for available work. Jobs execute in separate processes from the main application, enabling long-running tasks without blocking web requests.

The core architecture centers around the Delayed::Job model class and Delayed::Worker processing engine. Jobs get serialized into the database with YAML, including the target object, method name, and arguments. Workers continuously poll the jobs table for available work, deserialize job data, and execute the specified methods.

# Enqueue a job to run later
user = User.find(123)
user.delay.send_welcome_email

# Create a job with priority and run time
Delayed::Job.enqueue(EmailJob.new(user.id), priority: 10, run_at: 2.hours.from_now)

# Inspect a queued job (successful jobs are deleted from the table by default)
job = Delayed::Job.last
job.failed_at  # => nil unless the job has permanently failed
job.attempts   # => number of execution attempts so far

Delayed Job adds a delay method to every object, including ActiveRecord models. The proxy it returns converts the next method call into an asynchronous job. The system handles job serialization, retry logic, and failure tracking automatically.

The database table structure includes columns for job handler (serialized YAML), priority, attempts, run time, and error tracking. A worker claims a job by atomically setting its locked_at and locked_by columns, which prevents two workers from running the same job in multi-worker environments.

# Database columns for delayed_jobs table
# id, priority, attempts, handler, last_error, run_at, locked_at, failed_at, locked_by, queue, created_at, updated_at

# Custom job class
class ReportGenerationJob
  def initialize(report_id)
    @report_id = report_id
  end
  
  def perform
    report = Report.find(@report_id)
    report.generate_pdf
    report.send_to_recipients
  end
end
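
To make the serialization step concrete, here is a minimal sketch of a job object making a round trip through YAML, in plain Ruby without the gem. SketchReportJob is a hypothetical stand-in, and the handler text Delayed Job actually writes may differ in detail.

```ruby
require 'yaml'

# Hypothetical job class used only for this sketch.
class SketchReportJob
  attr_reader :report_id

  def initialize(report_id)
    @report_id = report_id
  end

  def perform
    "generated report #{@report_id}"
  end
end

# Serialize the job object, roughly what ends up in the handler column.
handler = YAML.dump(SketchReportJob.new(42))

# Deserialize it again, as a worker would before calling perform.
# Psych 4 (Ruby 3.1+) restricts YAML.load, so use unsafe_load when it exists.
restored = YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(handler) : YAML.load(handler)

puts handler
puts restored.perform
```

Only instance variables survive the round trip, which is why job classes usually store plain values such as record IDs.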

Basic Usage

The delay method provides the simplest way to defer method execution on any object. This approach works with ActiveRecord models, plain Ruby objects, and class methods. The delay proxy captures the method call and converts it into a serialized job entry.

# Defer instance method execution
user = User.find(1)
user.delay.calculate_statistics
user.delay.send_notification("Account updated")

# Defer class method execution
User.delay.cleanup_inactive_accounts
Mailer.delay.send_weekly_digest

# Schedule jobs for future execution
user.delay(run_at: 1.day.from_now).expire_trial_account
user.delay(priority: 5).process_urgent_request
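
The capture step can be pictured with a small proxy built on method_missing. This is a simplified sketch, not the gem's implementation; SketchDelayProxy, SketchUser, and the in-memory pending_jobs array are hypothetical names introduced for illustration.

```ruby
# Simplified, hypothetical model of a delay proxy.
class SketchDelayProxy
  def initialize(target, queue, options = {})
    @target = target
    @queue = queue
    @options = options
  end

  # Record the call instead of running it.
  def method_missing(name, *args)
    return super unless @target.respond_to?(name)

    @queue << { object: @target, method_name: name, args: args, options: @options }
    nil
  end

  def respond_to_missing?(name, include_private = false)
    @target.respond_to?(name, include_private) || super
  end
end

class SketchUser
  def send_notification(message)
    "sent: #{message}"
  end
end

pending_jobs = []
user = SketchUser.new
SketchDelayProxy.new(user, pending_jobs, priority: 5).send_notification("Account updated")

# Later, a worker replays the recorded call.
job = pending_jobs.first
result = job[:object].public_send(job[:method_name], *job[:args])
puts result
```

In the real gem the recorded call is written to the jobs table rather than kept in memory, but the shape of the captured data (object, method name, arguments) is the same idea.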

The Delayed::Job.enqueue method accepts job objects that respond to perform. This approach provides more control over job behavior and enables custom job classes with complex initialization logic.

class DataImportJob
  def initialize(file_path, user_id)
    @file_path = file_path
    @user_id = user_id
  end
  
  def perform
    user = User.find(@user_id)
    ImportService.new(@file_path, user).execute
  end
end

# Enqueue custom job instances
Delayed::Job.enqueue(DataImportJob.new("/tmp/import.csv", current_user.id))
Delayed::Job.enqueue(DataImportJob.new("/tmp/large.csv", current_user.id), priority: 0, queue: 'imports')

Job prioritization uses numeric values where lower numbers indicate higher priority. The default priority is 0, with negative numbers for urgent jobs and positive numbers for lower priority work.

# Priority examples
Delayed::Job.enqueue(UrgentJob.new, priority: -10)  # Highest priority
Delayed::Job.enqueue(NormalJob.new, priority: 0)    # Default priority  
Delayed::Job.enqueue(CleanupJob.new, priority: 100) # Lowest priority

# Queue-based job organization
user.delay(queue: 'emails').send_welcome_message
user.delay(queue: 'reports').generate_monthly_report
user.delay(queue: 'maintenance').cleanup_old_data
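
The effect of priority, run time, and queue on pickup order can be sketched with an in-memory list. SketchJob and next_jobs are hypothetical names; a real backend expresses the same selection as a database query.

```ruby
# Hypothetical in-memory stand-in for rows in the jobs table.
SketchJob = Struct.new(:name, :priority, :run_at, :queue)

now = Time.now
jobs = [
  SketchJob.new('cleanup', 100, now - 60, 'maintenance'),
  SketchJob.new('welcome', 0, now - 10, 'emails'),
  SketchJob.new('urgent', -10, now - 5, 'emails'),
  SketchJob.new('future', -20, now + 3600, 'emails'),
  SketchJob.new('digest', 0, now - 30, 'emails')
]

# Select jobs that are due, optionally restricted to some queues,
# ordered by priority (lowest number first) and then by run_at.
def next_jobs(jobs, now:, queues: nil)
  due = jobs.select { |job| job.run_at <= now }
  due = due.select { |job| queues.include?(job.queue) } if queues
  due.sort_by { |job| [job.priority, job.run_at] }
end

all_order = next_jobs(jobs, now: now).map(&:name)
email_order = next_jobs(jobs, now: now, queues: ['emails']).map(&:name)

puts all_order.inspect
puts email_order.inspect
```

The job named future has the lowest priority number but is skipped because its run_at has not arrived yet.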

Worker processes handle job execution through the Delayed::Worker class. Workers poll the database for available jobs, execute them, and update job records with results or error information.

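# Configure worker behavior before starting; these settings are class-level
# and shared by every worker in the process
Delayed::Worker.max_attempts = 5
Delayed::Worker.max_run_time = 4.hours
Delayed::Worker.sleep_delay = 10

# Start a worker programmatically (start blocks while the worker polls)
worker = Delayed::Worker.new
worker.name = "worker-#{Process.pid}"
worker.start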
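
The poll-and-claim cycle can be modeled in memory to show why claiming must be atomic. This is a sketch under stated assumptions: SketchRow and SketchQueue are hypothetical, and the Mutex stands in for the atomic update a database backend would perform on the locking columns.

```ruby
# Hypothetical in-memory model of claiming jobs.
SketchRow = Struct.new(:id, :locked_by, :done)

class SketchQueue
  def initialize(rows)
    @rows = rows
    @mutex = Mutex.new
  end

  # Atomically mark the first unlocked, unfinished row as claimed.
  def claim(worker_name)
    @mutex.synchronize do
      row = @rows.find { |r| r.locked_by.nil? && !r.done }
      row.locked_by = worker_name if row
      row
    end
  end
end

rows = (1..20).map { |id| SketchRow.new(id, nil, false) }
queue = SketchQueue.new(rows)
names = %w[worker-1 worker-2 worker-3]
claimed = names.to_h { |name| [name, []] }

workers = names.map do |name|
  Thread.new do
    # Poll until nothing is left; a real worker would sleep and keep polling.
    while (row = queue.claim(name))
      row.done = true
      claimed[name] << row.id
    end
  end
end
workers.each(&:join)

puts claimed.values.flatten.sort.inspect
```

Because each claim happens inside the lock, every job ID appears exactly once across the three workers.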

Advanced Usage

Custom job classes enable complex initialization patterns, error handling, and performance optimizations. Jobs can implement additional methods beyond perform to control retry behavior, error handling, and resource cleanup.

class BatchProcessingJob
  def initialize(batch_id, options = {})
    @batch_id = batch_id
    @chunk_size = options[:chunk_size] || 1000
    @retry_failed = options[:retry_failed] || false
  end
  
  def perform
    batch = ProcessingBatch.find(@batch_id)
    batch.records.find_in_batches(batch_size: @chunk_size) do |chunk|
      process_chunk(chunk)
    end
    batch.mark_completed!
  end
  
  # Control maximum retry attempts
  def max_attempts
    3
  end
  
  # Custom failure handling
  def failure(job)
    batch = ProcessingBatch.find(@batch_id)
    batch.mark_failed!(job.last_error)
    AdminMailer.delay.job_failure_notification(@batch_id, job.last_error)
  end
  
  # Reschedule logic for temporary failures
  def reschedule_at(time, attempts)
    case attempts
    when 1 then 1.minute.from_now
    when 2 then 10.minutes.from_now
    else 1.hour.from_now
    end
  end
  
  private
  
  def process_chunk(records)
    records.each { |record| record.process! }
  end
end

Job hooks provide lifecycle callbacks for logging, monitoring, and cleanup operations. These methods execute at specific points during job processing and enable custom behavior without modifying core worker logic.

class MonitoredJob
  def initialize(task_id)
    @task_id = task_id
  end
  
  def perform
    task = Task.find(@task_id)
    task.execute_complex_operation
  end
  
  # Called before job execution
  def before(job)
    Rails.logger.info "Starting job #{job.id} for task #{@task_id}"
    @start_time = Time.current
  end
  
  # Called only after perform finishes without raising
  # (the after hook, by contrast, runs whether the job succeeded or failed)
  def success(job)
    duration = Time.current - @start_time
    Rails.logger.info "Job #{job.id} completed in #{duration.round(2)} seconds"
    Metrics.record_job_duration(duration)
  end
  
  # Called when job encounters errors
  def error(job, exception)
    Rails.logger.error "Job #{job.id} failed: #{exception.message}"
    ErrorTracker.notify(exception, job_id: job.id, task_id: @task_id)
  end
  
  # Called when job reaches max attempts
  def failure(job)
    task = Task.find(@task_id)
    task.mark_failed!
    AdminMailer.delay(priority: -5).critical_job_failure(@task_id, job.last_error)
  end
end
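
The order in which hooks fire is easier to see in a small simulation. The sketch below is a simplified model based on how the gem wraps perform, not its actual code; SketchHookedJob and sketch_invoke are hypothetical names. In this model, success runs only when perform does not raise, error runs only when it does, and after runs in both cases.

```ruby
# Hypothetical job that records every hook call in order.
class SketchHookedJob
  attr_reader :events

  def initialize(should_fail)
    @should_fail = should_fail
    @events = []
  end

  def perform
    @events << :perform
    raise 'boom' if @should_fail
  end

  %i[before success error after].each do |hook|
    define_method(hook) { |*_args| @events << hook }
  end
end

# Simplified model of how a worker wraps perform with hooks.
def sketch_invoke(job_object, job_record = nil)
  job_object.before(job_record)
  job_object.perform
  job_object.success(job_record)
rescue StandardError => e
  job_object.error(job_record, e)
  raise
ensure
  job_object.after(job_record)
end

ok = SketchHookedJob.new(false)
sketch_invoke(ok)

failing = SketchHookedJob.new(true)
begin
  sketch_invoke(failing)
rescue RuntimeError
  # A real worker would record the error and schedule a retry here.
end

puts ok.events.inspect
puts failing.events.inspect
```

The failure hook is not part of this sequence; it is invoked separately once a job has exhausted its attempts.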

Queue-based job routing enables worker specialization and resource allocation. Different worker processes can handle specific queues, allowing separation of CPU-intensive, I/O-bound, and time-sensitive operations.

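# Configure queue-specific workers. Worker options are stored as class-level
# settings, so run each worker in its own process.

# Process 1: email worker, short polling interval
email_worker = Delayed::Worker.new(queues: ['emails', 'notifications'], sleep_delay: 5)
email_worker.start

# Process 2: report worker, longer polling interval and run time limit
Delayed::Worker.max_run_time = 2.hours
report_worker = Delayed::Worker.new(queues: ['reports'], sleep_delay: 30)
report_worker.start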

# Enqueue jobs to specific queues
User.delay(queue: 'emails').send_password_reset(user_id)
Report.delay(queue: 'reports').generate_annual_summary(year)

Controlling what a job serializes keeps payloads small and avoids objects that don't serialize cleanly with YAML. Storing a class name and record ID instead of a full object reduces the size of the handler column and lets the job load fresh data when it runs.

class OptimizedJob
  def initialize(model_class, model_id, action, options = {})
    @model_class = model_class.to_s
    @model_id = model_id
    @action = action.to_s
    @options = options
  end
  
  def perform
    model = @model_class.constantize.find(@model_id)
    model.public_send(@action, @options)
  end
  
  # Display name for admin interface
  def display_name
    "#{@model_class}##{@action} for ID #{@model_id}"
  end
end

# Usage: pass the class and ID rather than the loaded object, then enqueue
Delayed::Job.enqueue(OptimizedJob.new(User, user.id, :send_welcome_email, template: 'premium'))
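
The size difference can be measured directly for plain Ruby objects. This sketch uses hypothetical classes (SketchAccount, SketchHeavyJob, SketchLightJob) and plain YAML.dump; Delayed Job applies its own handling to ActiveRecord objects, so treat the numbers as an illustration of the principle only.

```ruby
require 'yaml'

# Hypothetical record-like object carrying a lot of state.
class SketchAccount
  attr_reader :id

  def initialize(id)
    @id = id
    @name = 'Example User'
    @preferences = { 'theme' => 'dark', 'locale' => 'en' }
    @history = Array.new(50) { |i| "event-#{i}" }
  end
end

# Job that embeds the whole object in its payload.
class SketchHeavyJob
  def initialize(account)
    @account = account
  end
end

# Job that stores only what is needed to look the record up again.
class SketchLightJob
  def initialize(model_class, model_id)
    @model_class = model_class.to_s
    @model_id = model_id
  end
end

account = SketchAccount.new(7)
heavy_payload = YAML.dump(SketchHeavyJob.new(account))
light_payload = YAML.dump(SketchLightJob.new(SketchAccount, account.id))

puts "heavy: #{heavy_payload.bytesize} bytes"
puts "light: #{light_payload.bytesize} bytes"
```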

Error Handling & Debugging

Delayed Job retries failed jobs automatically with an increasing delay: by default a job is rescheduled 5 seconds + N ** 4 after a failure, where N is the number of attempts so far. The system tracks the attempt count, the most recent error message, and the failure timestamp, and it stops retrying once max_attempts is reached.

# Configure global retry behavior
Delayed::Worker.max_attempts = 3
Delayed::Worker.max_run_time = 4.hours
Delayed::Worker.delay_jobs = !Rails.env.test?

# Custom retry logic in job classes
class NetworkServiceJob
  def initialize(endpoint, data)
    @endpoint = endpoint
    @data = data
  end
  
  def perform
    response = HTTPService.post(@endpoint, @data)
    handle_response(response)
  rescue Net::OpenTimeout, Net::ReadTimeout, Net::HTTPError => e
    # Re-raise so the worker records the error and schedules a retry
    raise e
  rescue JSON::ParserError => e
    # Swallow parse errors: the job then counts as completed and is not retried
    Rails.logger.error "JSON parse failed for #{@endpoint}: #{e.message}"
  end
  
  def max_attempts
    5
  end
  
  def reschedule_at(time, attempts)
    # Quadratic backoff with jitter
    delay = (attempts ** 2) * 30 + rand(30)
    time + delay.seconds
  end
end
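
To compare retry schedules, the delays can be tabulated with two small functions. default_retry_delay follows the documented default of 5 seconds + N ** 4, and custom_retry_delay mirrors the reschedule_at formula in NetworkServiceJob with the random jitter left out; both function names are introduced here for illustration.

```ruby
# Delay in seconds before the next retry under the documented default,
# where attempts is the number of attempts made so far.
def default_retry_delay(attempts)
  (attempts**4) + 5
end

# The quadratic schedule from NetworkServiceJob, without the jitter.
def custom_retry_delay(attempts)
  (attempts**2) * 30
end

(1..5).each do |attempts|
  puts format('attempt %d: default %4ds, custom %4ds',
              attempts, default_retry_delay(attempts), custom_retry_delay(attempts))
end
```

The default starts out shorter than the custom schedule but grows much faster, which matters when max_attempts is left at a high value.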

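Failed job investigation requires examining job records, error messages, and execution context. Each job record stores the most recent error message and backtrace in last_error, along with the attempt count. Permanently failed jobs remain in the table only when Delayed::Worker.destroy_failed_jobs is set to false.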

# Query failed jobs
failed_jobs = Delayed::Job.where.not(failed_at: nil)
recent_failures = Delayed::Job.where('failed_at > ?', 1.hour.ago)

# Examine specific failure
job = Delayed::Job.find(123)
puts job.last_error       # Exception message and backtrace
puts job.attempts         # Number of execution attempts
puts job.failed_at        # Timestamp of final failure
puts job.handler          # Serialized job data (YAML)

# Inspect job payload (payload_object deserializes the handler column)
begin
  job_object = job.payload_object
  puts job_object.class
  puts job_object.instance_variables
rescue Delayed::DeserializationError => e
  puts "Deserialization failed: #{e.message}"
end

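Exception handling within jobs determines retry behavior and error reporting. Raising an exception from perform triggers the retry mechanism. Rescuing an exception without re-raising it lets the job finish normally, so it is removed from the queue and never retried; the return value of perform is ignored.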

class DataProcessingJob
  def initialize(file_path)
    @file_path = file_path
  end
  
  # FileNotFoundError and DataValidationError are application-defined error classes
  def perform
    validate_file_exists!
    process_data_file
  rescue FileNotFoundError
    # Don't retry missing files: swallowing the error lets the job complete
    Rails.logger.error "File not found: #{@file_path}"
  rescue DataValidationError => e
    # Retry validation errors a limited number of times, then give up quietly
    Rails.logger.warn "Validation failed (attempt #{current_attempts}): #{e.message}"
    raise e if should_retry_validation?
  rescue StandardError => e
    # Log unexpected errors and retry
    Rails.logger.error "Unexpected error processing #{@file_path}: #{e.message}"
    ErrorTracker.notify(e, file_path: @file_path)
    raise e
  end
  
  # Keep a reference to the job record so perform can read its attempt count
  def before(job)
    @job = job
  end
  
  private
  
  def current_attempts
    @job ? @job.attempts : 0
  end
  
  def should_retry_validation?
    current_attempts < 2
  end
  
  def validate_file_exists!
    raise FileNotFoundError unless File.exist?(@file_path)
  end
end
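
The difference between raising and swallowing an exception can be demonstrated with a simplified model of a worker's retry bookkeeping. Everything here is hypothetical (SketchTransientError, SketchFlakyJob, SketchSwallowingJob, sketch_run) and retries happen immediately, whereas a real worker reschedules the job for later.

```ruby
class SketchTransientError < StandardError; end

# Fails with an exception until the given attempt, then succeeds.
class SketchFlakyJob
  def initialize(succeed_on_attempt)
    @succeed_on_attempt = succeed_on_attempt
    @calls = 0
  end

  def perform
    @calls += 1
    raise SketchTransientError, "attempt #{@calls} failed" if @calls < @succeed_on_attempt
  end
end

# Swallows its own error, so the worker never learns that anything went wrong.
class SketchSwallowingJob
  def perform
    raise SketchTransientError, 'ignored'
  rescue SketchTransientError
    false
  end
end

# Runs a job until it finishes without raising or exhausts max_attempts.
def sketch_run(job, max_attempts:)
  attempts = 0
  while attempts < max_attempts
    attempts += 1
    begin
      job.perform
      return { status: :completed, attempts: attempts }
    rescue StandardError
      next
    end
  end
  { status: :failed, attempts: attempts }
end

recovered = sketch_run(SketchFlakyJob.new(3), max_attempts: 5)
exhausted = sketch_run(SketchFlakyJob.new(10), max_attempts: 5)
swallowed = sketch_run(SketchSwallowingJob.new, max_attempts: 5)

puts recovered.inspect
puts exhausted.inspect
puts swallowed.inspect
```

The swallowing job is reported as completed after a single attempt even though it returned false, which is why errors that should be retried must be re-raised.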

Job debugging often requires reproducing execution context and examining serialization issues. Creating debug methods and logging strategies helps identify problems in complex job hierarchies.

class DebuggableJob
  def initialize(params)
    @params = params
    @debug = Rails.env.development?
  end
  
  def perform
    log_debug "Starting job with params: #{@params.inspect}"
    
    result = execute_main_logic
    
    log_debug "Job completed successfully with result: #{result.inspect}"
    result
  rescue => e
    log_error "Job failed with exception: #{e.class} - #{e.message}"
    log_error "Backtrace: #{e.backtrace.join("\n")}" if @debug
    log_error "Job params: #{@params.inspect}"
    
    # Capture additional context
    capture_debug_context(e)
    raise e
  end
  
  private
  
  def log_debug(message)
    Rails.logger.debug "[#{self.class}] #{message}" if @debug
  end
  
  def log_error(message)
    Rails.logger.error "[#{self.class}] #{message}"
  end
  
  def capture_debug_context(exception)
    context = {
      job_class: self.class.name,
      params: @params,
      ruby_version: RUBY_VERSION,
      rails_version: Rails.version,
      timestamp: Time.current.iso8601
    }
    
    Rails.logger.error "Debug context: #{context.to_json}"
  end
end

Production Patterns

Worker process management requires careful consideration of resource allocation, monitoring, and graceful shutdown procedures. Production deployments typically use process managers like systemd, upstart, or specialized tools like god or monit.

# config/initializers/delayed_job.rb
Delayed::Worker.destroy_failed_jobs = false
Delayed::Worker.sleep_delay = 60
Delayed::Worker.max_attempts = 3
Delayed::Worker.max_run_time = 4.hours
Delayed::Worker.default_queue_name = 'default'
Delayed::Worker.delay_jobs = !Rails.env.test?
Delayed::Worker.raise_signal_exceptions = :term

# Assign a default priority to each queue; jobs enqueued without an explicit
# priority use the value configured for their queue
if Rails.env.production?
  Delayed::Worker.queue_attributes = {
    'emails' => { priority: -10 },
    'reports' => { priority: 0 },
    'maintenance' => { priority: 10 }
  }
end

Database optimization becomes critical as job volume increases. Proper indexing, table maintenance, and query optimization prevent performance degradation in high-throughput environments.

# Migration adding indexes for queue-specific polling and failure queries.
# The generated delayed_jobs migration already indexes [:priority, :run_at].
class OptimizeDelayedJobsTable < ActiveRecord::Migration[6.1]
  def up
    add_index :delayed_jobs, [:queue, :priority, :run_at], name: 'delayed_jobs_queue_priority'
    add_index :delayed_jobs, :failed_at
    add_index :delayed_jobs, :locked_at
    add_index :delayed_jobs, :locked_by
    
    # PostgreSQL function that removes old failed jobs. Rows with attempts > 0
    # and no failed_at are still waiting for a retry, so they are kept.
    execute <<-SQL
      CREATE OR REPLACE FUNCTION cleanup_old_jobs() 
      RETURNS INTEGER AS $$
      DECLARE
        deleted_count INTEGER;
      BEGIN
        DELETE FROM delayed_jobs 
        WHERE failed_at IS NOT NULL
        AND failed_at < NOW() - INTERVAL '7 days';
        
        GET DIAGNOSTICS deleted_count = ROW_COUNT;
        RETURN deleted_count;
      END;
      $$ LANGUAGE plpgsql;
    SQL
  end
  
  def down
    execute 'DROP FUNCTION IF EXISTS cleanup_old_jobs()'
    remove_index :delayed_jobs, name: 'delayed_jobs_queue_priority'
    remove_index :delayed_jobs, :failed_at
    remove_index :delayed_jobs, :locked_at
    remove_index :delayed_jobs, :locked_by
  end
end

Monitoring and alerting infrastructure tracks job processing rates, failure patterns, and queue depths. Integration with monitoring systems enables proactive identification of processing bottlenecks and system issues.

class MonitoringJob
  def self.collect_metrics
    metrics = {
      total_jobs: Delayed::Job.count,
      pending_jobs: Delayed::Job.where(locked_at: nil, failed_at: nil).count,
      failed_jobs: Delayed::Job.where.not(failed_at: nil).count,
      locked_jobs: Delayed::Job.where.not(locked_at: nil).count,
      queue_depths: queue_depths,
      oldest_pending: oldest_pending_job_age
    }
    
    # Send to monitoring service
    StatsD.gauge('delayed_job.total', metrics[:total_jobs])
    StatsD.gauge('delayed_job.pending', metrics[:pending_jobs])
    StatsD.gauge('delayed_job.failed', metrics[:failed_jobs])
    
    # Alert on concerning conditions
    if metrics[:pending_jobs] > 1000
      AlertService.notify('High job queue depth', metrics)
    end
    
    if metrics[:oldest_pending] > 1.hour.to_i
      AlertService.notify('Jobs not processing', metrics)
    end
    
    metrics
  end
  
  def self.queue_depths
    Delayed::Job.where(locked_at: nil, failed_at: nil)
                .group(:queue)
                .count
  end
  
  def self.oldest_pending_job_age
    oldest = Delayed::Job.where(locked_at: nil, failed_at: nil)
                         .minimum(:created_at)
    oldest ? Time.current - oldest : 0
  end
  
  # The private keyword does not apply to methods defined with def self.
  private_class_method :queue_depths, :oldest_pending_job_age
end

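# Schedule a monitoring run. This enqueues a single job; re-enqueue it after
# each run or use a scheduler to collect metrics on a regular basis.
MonitoringJob.delay(queue: 'monitoring', run_at: 1.minute.from_now).collect_metrics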

Deployment strategies must account for job processing during application updates. Graceful worker shutdown and job migration prevent data loss and ensure processing continuity.

# lib/tasks/delayed_job.rake
namespace :delayed_job do
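  desc 'Gracefully stop all workers and wait for current jobs to complete'
  task graceful_stop: :environment do
    # Assumes workers were started with bin/delayed_job, which writes
    # pid files to tmp/pids by default
    pid_files = Dir[Rails.root.join('tmp', 'pids', 'delayed_job*.pid').to_s]
    pids = pid_files.map { |file| File.read(file).to_i }
    puts "Found #{pids.count} worker pid files"
    
    # Signal 0 checks whether a process exists without affecting it
    alive = lambda do |pid|
      Process.kill(0, pid)
      true
    rescue Errno::ESRCH
      false
    end
    
    pids.each do |pid|
      puts "Sending TERM signal to worker (PID: #{pid})"
      Process.kill('TERM', pid)
    rescue Errno::ESRCH
      puts "Worker #{pid} already stopped"
    end
    
    # Wait for workers to finish current jobs
    timeout = 300 # 5 minutes
    start_time = Time.current
    running = pids.select(&alive)
    
    while running.any? && Time.current - start_time < timeout
      puts "#{running.count} workers still running, waiting..."
      sleep 5
      running = running.select(&alive)
    end
    
    if running.empty?
      puts "All workers stopped gracefully"
    else
      puts "Timed out waiting for #{running.count} workers"
    end
  end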
  
  desc 'Clean up old failed jobs'
  task cleanup: :environment do
    cutoff = 7.days.ago
    
    # Successful jobs are deleted automatically. Rows with attempts > 0 and no
    # failed_at are waiting for a retry, so only permanently failed jobs are removed.
    failed = Delayed::Job.where('failed_at IS NOT NULL AND failed_at < ?', cutoff)
    
    puts "Deleting #{failed.count} failed jobs older than #{cutoff}"
    failed.delete_all
  end
end

Reference

Core Classes and Methods

| Class/Method | Parameters | Returns | Description |
|---|---|---|---|
| Delayed::Job.enqueue(job, **opts) | job (Object), priority (Integer), run_at (Time), queue (String) | Delayed::Job | Enqueues job object for processing |
| Object#delay(**opts) | priority (Integer), run_at (Time), queue (String) | Delayed::DelayProxy | Creates delayed method proxy |
| Delayed::Worker.new(options = {}) | options (Hash) | Delayed::Worker | Creates new worker instance |
| Delayed::Worker#start | None | void | Starts worker processing loop |
| Delayed::Worker#reschedule(job, time = nil) | job (Delayed::Job), time (Time) | void | Reschedules a failed job for retry, or marks it failed once max attempts are reached |
| Delayed::Job#payload_object | None | Object | Deserializes and returns job object |

Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| max_attempts | Integer | 25 | Maximum retry attempts per job |
| max_run_time | Duration | 4.hours | Maximum execution time per job |
| sleep_delay | Integer (seconds) | 5 | Worker polling interval |
| default_queue_name | String | nil | Default queue for jobs without queue specified |
| delay_jobs | Boolean | true | Enable/disable job queueing (false runs synchronously) |
| destroy_failed_jobs | Boolean | true | Automatically delete permanently failed jobs |
| logger | Logger | Rails.logger | Logger instance for worker output |
| raise_signal_exceptions | Boolean or Symbol | false | Signal handling behavior (:term raises on TERM only, true raises on TERM and INT) |

Job Status Fields

| Field | Type | Description |
|---|---|---|
| priority | Integer | Job priority (lower numbers = higher priority) |
| attempts | Integer | Number of execution attempts |
| handler | Text | Serialized job object (YAML) |
| last_error | Text | Exception message and backtrace from last failure |
| run_at | DateTime | Earliest execution time |
| locked_at | DateTime | Timestamp when worker claimed job |
| failed_at | DateTime | Timestamp of final failure |
| locked_by | String | Worker identifier that claimed job |
| queue | String | Queue name for job routing |

Custom Job Interface

| Method | Required | Parameters | Returns | Description |
|---|---|---|---|---|
| perform | Yes | None | Any | Main job execution method |
| max_attempts | No | None | Integer | Override global max attempts |
| reschedule_at(time, attempts) | No | time (Time), attempts (Integer) | Time | Custom retry schedule |
| failure(job) | No | job (Delayed::Job) | void | Called when job permanently fails |
| before(job) | No | job (Delayed::Job) | void | Called before job execution |
| success(job) | No | job (Delayed::Job) | void | Called after successful execution |
| after(job) | No | job (Delayed::Job) | void | Called after execution, whether the job succeeded or raised |
| error(job, exception) | No | job (Delayed::Job), exception (Exception) | void | Called when job raises exception |
| display_name | No | None | String | Human-readable job description |

Queue Configuration

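| Queue Attribute | Type | Description |
|---|---|---|
| priority | Integer | Default priority for jobs enqueued to the queue without an explicit priority |

Retry limits and failed-job cleanup are configured globally through Delayed::Worker or per job class through the custom job interface, not per queue.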

Worker Command Line Options

| Option | Type | Description |
|---|---|---|
| --queue=name | String | Process specific queue only |
| --queues=a,b | String | Process a comma-separated list of queues |
| --min-priority=n | Integer | Process jobs with priority >= n |
| --max-priority=n | Integer | Process jobs with priority <= n |
| --sleep-delay=n | Integer | Polling interval in seconds |
| --read-ahead=n | Integer | Number of jobs to read ahead |
| --exit-on-complete | Boolean | Exit worker after processing all jobs |

Maximum attempts and maximum run time are set in Ruby through Delayed::Worker.max_attempts and Delayed::Worker.max_run_time rather than on the command line.

Error Types

| Exception | Retry Behavior | Description |
|---|---|---|
| StandardError | Yes | Generic errors trigger retry |
| Delayed::DeserializationError | No | Job data corruption, cannot retry |
| SystemExit | No | Intentional process termination |
| SignalException | No | Process received shutdown signal |
| Interrupt | No | Process interrupted (Ctrl+C) |