Overview
Delayed Job provides asynchronous job processing for Ruby applications through database-backed queues. The gem stores jobs as records in a database table and processes them through worker processes that poll for available work. Jobs execute in separate processes from the main application, enabling long-running tasks without blocking web requests.
The core architecture centers around the Delayed::Job
model class and Delayed::Worker
processing engine. Jobs get serialized into the database with YAML, including the target object, method name, and arguments. Workers continuously poll the jobs table for available work, deserialize job data, and execute the specified methods.
# Enqueue a job to run later
user = User.find(123)
user.delay.send_welcome_email
# Create a job with priority and run time
Delayed::Job.enqueue(EmailJob.new(user.id), priority: 10, run_at: 2.hours.from_now)
# Check job status
job = Delayed::Job.last
job.failed_at # => nil if successful
job.attempts # => number of execution attempts
Delayed Job integrates with ActiveRecord models through the delay
method proxy, which converts any method call into an asynchronous job. The system handles job serialization, worker management, retry logic, and failure tracking automatically.
The database table structure includes columns for job handler (serialized YAML), priority, attempts, run time, and error tracking. Workers claim jobs atomically using database locks to prevent race conditions in multi-worker environments.
# Database columns for delayed_jobs table
# id, priority, attempts, handler, last_error, run_at, locked_at, failed_at, locked_by, queue, created_at, updated_at
# Custom job class
class ReportGenerationJob
def initialize(report_id)
@report_id = report_id
end
def perform
report = Report.find(@report_id)
report.generate_pdf
report.send_to_recipients
end
end
Basic Usage
The delay
method provides the simplest way to defer method execution on any object. This approach works with ActiveRecord models, plain Ruby objects, and class methods. The delay proxy captures the method call and converts it into a serialized job entry.
# Defer instance method execution
user = User.find(1)
user.delay.calculate_statistics
user.delay.send_notification("Account updated")
# Defer class method execution
User.delay.cleanup_inactive_accounts
Mailer.delay.send_weekly_digest
# Schedule jobs for future execution
user.delay(run_at: 1.day.from_now).expire_trial_account
user.delay(priority: 5).process_urgent_request
The Delayed::Job.enqueue
method accepts job objects that respond to perform
. This approach provides more control over job behavior and enables custom job classes with complex initialization logic.
class DataImportJob
def initialize(file_path, user_id)
@file_path = file_path
@user_id = user_id
end
def perform
user = User.find(@user_id)
ImportService.new(@file_path, user).execute
end
end
# Enqueue custom job instances
Delayed::Job.enqueue(DataImportJob.new("/tmp/import.csv", current_user.id))
Delayed::Job.enqueue(DataImportJob.new("/tmp/large.csv", current_user.id), priority: 0, queue: 'imports')
Job prioritization uses numeric values where lower numbers indicate higher priority. The default priority is 0, with negative numbers for urgent jobs and positive numbers for lower priority work.
# Priority examples
Delayed::Job.enqueue(UrgentJob.new, priority: -10) # Highest priority
Delayed::Job.enqueue(NormalJob.new, priority: 0) # Default priority
Delayed::Job.enqueue(CleanupJob.new, priority: 100) # Lowest priority
# Queue-based job organization
user.delay(queue: 'emails').send_welcome_message
user.delay(queue: 'reports').generate_monthly_report
user.delay(queue: 'maintenance').cleanup_old_data
Worker processes handle job execution through the Delayed::Worker
class. Workers poll the database for available jobs, execute them, and update job records with results or error information.
# Start a worker programmatically
worker = Delayed::Worker.new
worker.name = "worker-#{Process.pid}"
worker.start
# Configure worker behavior
worker.max_attempts = 5
worker.max_run_time = 4.hours
worker.sleep_delay = 10.seconds
Advanced Usage
Custom job classes enable complex initialization patterns, error handling, and performance optimizations. Jobs can implement additional methods beyond perform
to control retry behavior, error handling, and resource cleanup.
class BatchProcessingJob
def initialize(batch_id, options = {})
@batch_id = batch_id
@chunk_size = options[:chunk_size] || 1000
@retry_failed = options[:retry_failed] || false
end
def perform
batch = ProcessingBatch.find(@batch_id)
batch.records.find_in_batches(batch_size: @chunk_size) do |chunk|
process_chunk(chunk)
end
batch.mark_completed!
end
# Control maximum retry attempts
def max_attempts
3
end
# Custom failure handling
def failure(job)
batch = ProcessingBatch.find(@batch_id)
batch.mark_failed!(job.last_error)
AdminMailer.delay.job_failure_notification(@batch_id, job.last_error)
end
# Reschedule logic for temporary failures
def reschedule_at(time, attempts)
case attempts
when 1 then 1.minute.from_now
when 2 then 10.minutes.from_now
else 1.hour.from_now
end
end
private
def process_chunk(records)
records.each { |record| record.process! }
end
end
Job hooks provide lifecycle callbacks for logging, monitoring, and cleanup operations. These methods execute at specific points during job processing and enable custom behavior without modifying core worker logic.
class MonitoredJob
def initialize(task_id)
@task_id = task_id
end
def perform
task = Task.find(@task_id)
task.execute_complex_operation
end
# Called before job execution
def before(job)
Rails.logger.info "Starting job #{job.id} for task #{@task_id}"
@start_time = Time.current
end
# Called after successful execution
def after(job)
duration = Time.current - @start_time
Rails.logger.info "Job #{job.id} completed in #{duration.round(2)} seconds"
Metrics.record_job_duration(duration)
end
# Called when job encounters errors
def error(job, exception)
Rails.logger.error "Job #{job.id} failed: #{exception.message}"
ErrorTracker.notify(exception, job_id: job.id, task_id: @task_id)
end
# Called when job reaches max attempts
def failure(job)
task = Task.find(@task_id)
task.mark_failed!
AdminMailer.delay(priority: -5).critical_job_failure(@task_id, job.last_error)
end
end
Queue-based job routing enables worker specialization and resource allocation. Different worker processes can handle specific queues, allowing separation of CPU-intensive, I/O-bound, and time-sensitive operations.
# Configure queue-specific workers
class EmailWorker < Delayed::Worker
def initialize
super
@queues = ['emails', 'notifications']
@sleep_delay = 5.seconds
end
end
class ReportWorker < Delayed::Worker
def initialize
super
@queues = ['reports']
@sleep_delay = 30.seconds
@max_run_time = 2.hours
end
end
# Enqueue jobs to specific queues
User.delay(queue: 'emails').send_password_reset(user_id)
Report.delay(queue: 'reports').generate_annual_summary(year)
Job serialization customization enables handling of complex objects and optimizing database storage. Custom serialization reduces payload size and handles objects that don't serialize cleanly with YAML.
class OptimizedJob
def initialize(model_class, model_id, action, options = {})
@model_class = model_class.to_s
@model_id = model_id
@action = action.to_s
@options = options
end
def perform
model = @model_class.constantize.find(@model_id)
model.public_send(@action, @options)
end
# Display name for admin interface
def display_name
"#{@model_class}##{@action} for ID #{@model_id}"
end
end
# Usage with complex objects
OptimizedJob.new(User, user.id, :send_welcome_email, template: 'premium')
Error Handling & Debugging
Delayed Job implements automatic retry logic with exponential backoff for failed jobs. The system tracks attempt counts, error messages, and failure timestamps to provide comprehensive error information and prevent infinite retry loops.
# Configure global retry behavior
Delayed::Worker.max_attempts = 3
Delayed::Worker.max_run_time = 4.hours
Delayed::Worker.delay_jobs = !Rails.env.test?
# Custom retry logic in job classes
class NetworkServiceJob
def initialize(endpoint, data)
@endpoint = endpoint
@data = data
end
def perform
response = HTTPService.post(@endpoint, @data)
handle_response(response)
rescue Net::TimeoutError, Net::HTTPError => e
# Re-raise for automatic retry
raise e
rescue JSON::ParserError => e
# Don't retry parse errors
Rails.logger.error "JSON parse failed for #{@endpoint}: #{e.message}"
return false
end
def max_attempts
5
end
def reschedule_at(time, attempts)
# Exponential backoff with jitter
delay = (attempts ** 2) * 30 + rand(30)
time + delay.seconds
end
end
Failed job investigation requires examining job records, error messages, and execution context. The database stores complete error information including backtraces and attempt history.
# Query failed jobs
failed_jobs = Delayed::Job.where.not(failed_at: nil)
recent_failures = Delayed::Job.where('failed_at > ?', 1.hour.ago)
# Examine specific failure
job = Delayed::Job.find(123)
puts job.last_error # Exception message and backtrace
puts job.attempts # Number of execution attempts
puts job.failed_at # Timestamp of final failure
puts job.handler # Serialized job data (YAML)
# Inspect job payload
begin
job_object = YAML.load(job.handler)
puts job_object.class
puts job_object.instance_variables
rescue => e
puts "Deserialization failed: #{e.message}"
end
Exception handling within jobs determines retry behavior and error reporting. Raising exceptions triggers the retry mechanism, while returning false marks jobs as failed without retries.
class DataProcessingJob
def initialize(file_path)
@file_path = file_path
end
def perform
validate_file_exists!
process_data_file
rescue FileNotFoundError
# Don't retry missing files
Rails.logger.error "File not found: #{@file_path}"
return false
rescue DataValidationError => e
# Retry validation errors up to max attempts
Rails.logger.warn "Validation failed (attempt #{current_attempts}): #{e.message}"
raise e if should_retry_validation?
return false
rescue StandardError => e
# Log unexpected errors and retry
Rails.logger.error "Unexpected error processing #{@file_path}: #{e.message}"
ErrorTracker.notify(e, file_path: @file_path)
raise e
end
private
def current_attempts
# Access current job context (Rails 3.2+)
Thread.current[:delayed_job]&.attempts || 0
end
def should_retry_validation?
current_attempts < 2
end
def validate_file_exists!
raise FileNotFoundError unless File.exist?(@file_path)
end
end
Job debugging often requires reproducing execution context and examining serialization issues. Creating debug methods and logging strategies helps identify problems in complex job hierarchies.
class DebuggableJob
def initialize(params)
@params = params
@debug = Rails.env.development?
end
def perform
log_debug "Starting job with params: #{@params.inspect}"
result = execute_main_logic
log_debug "Job completed successfully with result: #{result.inspect}"
result
rescue => e
log_error "Job failed with exception: #{e.class} - #{e.message}"
log_error "Backtrace: #{e.backtrace.join("\n")}" if @debug
log_error "Job params: #{@params.inspect}"
# Capture additional context
capture_debug_context(e)
raise e
end
private
def log_debug(message)
Rails.logger.debug "[#{self.class}] #{message}" if @debug
end
def log_error(message)
Rails.logger.error "[#{self.class}] #{message}"
end
def capture_debug_context(exception)
context = {
job_class: self.class.name,
params: @params,
ruby_version: RUBY_VERSION,
rails_version: Rails.version,
timestamp: Time.current.iso8601
}
Rails.logger.error "Debug context: #{context.to_json}"
end
end
Production Patterns
Worker process management requires careful consideration of resource allocation, monitoring, and graceful shutdown procedures. Production deployments typically use process managers like systemd, upstart, or specialized tools like god or monit.
# config/initializers/delayed_job.rb
Delayed::Worker.destroy_failed_jobs = false
Delayed::Worker.sleep_delay = 60
Delayed::Worker.max_attempts = 3
Delayed::Worker.max_run_time = 4.hours
Delayed::Worker.default_queue_name = 'default'
Delayed::Worker.delay_jobs = !Rails.env.test?
Delayed::Worker.raise_signal_exceptions = :term
# Configure different worker types
if Rails.env.production?
# Email workers - high frequency, low resource
Delayed::Worker.queue_attributes = {
'emails' => { priority_range: (-10..10) },
'reports' => { priority_range: (-5..5) },
'maintenance' => { priority_range: (0..100) }
}
end
Database optimization becomes critical as job volume increases. Proper indexing, table maintenance, and query optimization prevent performance degradation in high-throughput environments.
# Migration for optimized delayed_jobs table
class OptimizeDelayedJobsTable < ActiveRecord::Migration[6.1]
def change
add_index :delayed_jobs, [:priority, :run_at], name: 'delayed_jobs_priority'
add_index :delayed_jobs, [:queue, :priority, :run_at], name: 'delayed_jobs_queue_priority'
add_index :delayed_jobs, :failed_at
add_index :delayed_jobs, :locked_at
add_index :delayed_jobs, :locked_by
# Regular cleanup job
execute <<-SQL
CREATE OR REPLACE FUNCTION cleanup_old_jobs()
RETURNS INTEGER AS $$
DECLARE
deleted_count INTEGER;
BEGIN
DELETE FROM delayed_jobs
WHERE (failed_at IS NOT NULL OR (attempts > 0 AND locked_at IS NULL))
AND created_at < NOW() - INTERVAL '7 days';
GET DIAGNOSTICS deleted_count = ROW_COUNT;
RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;
SQL
end
end
Monitoring and alerting infrastructure tracks job processing rates, failure patterns, and queue depths. Integration with monitoring systems enables proactive identification of processing bottlenecks and system issues.
class MonitoringJob
def self.collect_metrics
metrics = {
total_jobs: Delayed::Job.count,
pending_jobs: Delayed::Job.where(locked_at: nil, failed_at: nil).count,
failed_jobs: Delayed::Job.where.not(failed_at: nil).count,
locked_jobs: Delayed::Job.where.not(locked_at: nil).count,
queue_depths: queue_depths,
oldest_pending: oldest_pending_job_age
}
# Send to monitoring service
StatsD.gauge('delayed_job.total', metrics[:total_jobs])
StatsD.gauge('delayed_job.pending', metrics[:pending_jobs])
StatsD.gauge('delayed_job.failed', metrics[:failed_jobs])
# Alert on concerning conditions
if metrics[:pending_jobs] > 1000
AlertService.notify('High job queue depth', metrics)
end
if metrics[:oldest_pending] > 1.hour.to_i
AlertService.notify('Jobs not processing', metrics)
end
metrics
end
private
def self.queue_depths
Delayed::Job.where(locked_at: nil, failed_at: nil)
.group(:queue)
.count
end
def self.oldest_pending_job_age
oldest = Delayed::Job.where(locked_at: nil, failed_at: nil)
.minimum(:created_at)
oldest ? Time.current - oldest : 0
end
end
# Schedule regular monitoring
MonitoringJob.delay(queue: 'monitoring', run_at: 1.minute.from_now).collect_metrics
Deployment strategies must account for job processing during application updates. Graceful worker shutdown and job migration prevent data loss and ensure processing continuity.
# lib/tasks/delayed_job.rake
namespace :delayed_job do
desc 'Gracefully stop all workers and wait for current jobs to complete'
task graceful_stop: :environment do
workers = Delayed::Worker.running_workers
puts "Found #{workers.count} running workers"
workers.each do |worker|
puts "Sending TERM signal to worker #{worker.name} (PID: #{worker.pid})"
Process.kill('TERM', worker.pid.to_i)
rescue Errno::ESRCH
puts "Worker #{worker.name} already stopped"
end
# Wait for workers to finish current jobs
timeout = 300 # 5 minutes
start_time = Time.current
while Time.current - start_time < timeout
running = Delayed::Worker.running_workers.count
break if running == 0
puts "#{running} workers still running, waiting..."
sleep 5
end
puts "All workers stopped gracefully"
end
desc 'Clean up old completed and failed jobs'
task cleanup: :environment do
cutoff = 7.days.ago
completed = Delayed::Job.where('failed_at IS NULL AND locked_at IS NULL AND attempts > 0 AND created_at < ?', cutoff)
failed = Delayed::Job.where('failed_at IS NOT NULL AND failed_at < ?', cutoff)
puts "Deleting #{completed.count} completed jobs older than #{cutoff}"
completed.delete_all
puts "Deleting #{failed.count} failed jobs older than #{cutoff}"
failed.delete_all
end
end
Reference
Core Classes and Methods
Class/Method | Parameters | Returns | Description |
---|---|---|---|
Delayed::Job.enqueue(job, **opts) |
job (Object), priority (Integer), run_at (Time), queue (String) |
Delayed::Job |
Enqueues job object for processing |
Object#delay(**opts) |
priority (Integer), run_at (Time), queue (String) |
Delayed::PerformableMethod |
Creates delayed method proxy |
Delayed::Worker.new(**opts) |
options (Hash) |
Delayed::Worker |
Creates new worker instance |
Delayed::Worker#start |
None | void |
Starts worker processing loop |
Delayed::Job#reschedule(time = nil) |
time (Time) |
Boolean |
Reschedules failed job for retry |
Delayed::Job#reload_handler |
None | Object |
Deserializes and returns job object |
Configuration Options
Option | Type | Default | Description |
---|---|---|---|
max_attempts |
Integer | 25 | Maximum retry attempts per job |
max_run_time |
Duration | 4.hours | Maximum execution time per job |
sleep_delay |
Duration | 5.seconds | Worker polling interval |
default_queue_name |
String | nil | Default queue for jobs without queue specified |
delay_jobs |
Boolean | true | Enable/disable job queueing (false runs synchronously) |
destroy_failed_jobs |
Boolean | true | Automatically delete permanently failed jobs |
logger |
Logger | Rails.logger | Logger instance for worker output |
raise_signal_exceptions |
Symbol | :term | Signal handling behavior (:term, :int, :quit) |
Job Status Fields
Field | Type | Description |
---|---|---|
priority |
Integer | Job priority (lower numbers = higher priority) |
attempts |
Integer | Number of execution attempts |
handler |
Text | Serialized job object (YAML) |
last_error |
Text | Exception message and backtrace from last failure |
run_at |
DateTime | Earliest execution time |
locked_at |
DateTime | Timestamp when worker claimed job |
failed_at |
DateTime | Timestamp of final failure |
locked_by |
String | Worker identifier that claimed job |
queue |
String | Queue name for job routing |
Custom Job Interface
Method | Required | Parameters | Returns | Description |
---|---|---|---|---|
perform |
Yes | None | Any |
Main job execution method |
max_attempts |
No | None | Integer |
Override global max attempts |
reschedule_at(time, attempts) |
No | time (Time), attempts (Integer) |
Time |
Custom retry schedule |
failure(job) |
No | job (Delayed::Job) |
void |
Called when job permanently fails |
before(job) |
No | job (Delayed::Job) |
void |
Called before job execution |
after(job) |
No | job (Delayed::Job) |
void |
Called after successful execution |
error(job, exception) |
No | job (Delayed::Job), exception (Exception) |
void |
Called when job raises exception |
display_name |
No | None | String |
Human-readable job description |
Queue Configuration
Queue Attribute | Type | Description |
---|---|---|
priority_range |
Range | Allowed priority values for queue |
max_attempts |
Integer | Queue-specific retry limit |
destroy_failed_jobs |
Boolean | Auto-delete failed jobs in queue |
Worker Command Line Options
Option | Type | Description |
---|---|---|
--queue=name |
String | Process specific queue only |
--min-priority=n |
Integer | Process jobs with priority >= n |
--max-priority=n |
Integer | Process jobs with priority <= n |
--sleep-delay=n |
Integer | Polling interval in seconds |
--max-attempts=n |
Integer | Maximum retry attempts |
--max-runtime=n |
Integer | Maximum job runtime in seconds |
--read-ahead=n |
Integer | Number of jobs to read ahead |
--exit-on-complete |
Boolean | Exit worker after processing all jobs |
Error Types
Exception | Retry Behavior | Description |
---|---|---|
StandardError |
Yes | Generic errors trigger retry |
Delayed::DeserializationError |
No | Job data corruption, cannot retry |
SystemExit |
No | Intentional process termination |
SignalException |
No | Process received shutdown signal |
Interrupt |
No | Process interrupted (Ctrl+C) |