Overview
Resque creates, processes, and manages background jobs using Redis as the data store. Jobs are defined as Ruby classes with a perform method and queued for asynchronous execution by worker processes. The system separates job creation from execution, allowing web applications to remain responsive while computationally expensive or time-consuming tasks run in the background.
The architecture consists of three primary components: job classes that define work to be performed, queues that store serialized job data in Redis, and worker processes that pull jobs from queues and execute them. Redis serves as both the job storage mechanism and the communication layer between job producers and consumers.
# Define a job class
class EmailJob
@queue = :email
def self.perform(user_id, message)
user = User.find(user_id)
UserMailer.notification(user, message).deliver_now
end
end
# Queue a job
Resque.enqueue(EmailJob, user.id, "Welcome to our service!")
Resque provides a web interface for monitoring queues, workers, and failed jobs. The interface displays current statistics about job processing, queue depths, and worker status. Job data is serialized to JSON before it is stored in Redis, which limits arguments to JSON-compatible types.
# Worker execution
require 'resque'
worker = Resque::Worker.new(:email, :upload, :cleanup)
worker.work
The system handles job failures by moving failed jobs to a separate failed queue with error details and stack traces. Workers can be configured to process specific queues, multiple queues with priority ordering, or all available queues. Resque supports plugins that extend functionality through hooks and callbacks.
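The producer/queue/worker flow described above can be sketched without Redis. This is a simplified model rather than Resque's implementation: the array stands in for a Redis list, GreetingJob and the enqueue/work_one helpers are hypothetical names, and the payload mirrors the class/args JSON shape that Resque stores for each job.

```ruby
require 'json'

# Hypothetical job class; records deliveries in memory instead of sending mail.
class GreetingJob
  def self.delivered
    @delivered ||= []
  end

  def self.perform(user_id, message)
    delivered << "user #{user_id}: #{message}"
  end
end

# Producer side: serialize the class name and arguments, then push onto the queue.
def enqueue(queue, job_class, *args)
  queue.push(JSON.generate({ 'class' => job_class.name, 'args' => args }))
end

# Consumer side: pop a payload, resolve the class by name, and call perform.
def work_one(queue)
  raw = queue.shift
  return false unless raw

  payload = JSON.parse(raw)
  Object.const_get(payload['class']).perform(*payload['args'])
  true
end

email_queue = [] # stand-in for the Redis list backing the :email queue
enqueue(email_queue, GreetingJob, 7, 'Welcome!')
work_one(email_queue)
puts GreetingJob.delivered.inspect
```

Because only the class name and arguments cross the queue, the producer and the worker need to share nothing except the job class definition and access to the same store.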
Basic Usage
Job classes require two elements: a @queue class-level instance variable naming the target queue and a perform class method containing the job logic. Queue names are usually written as symbols (Resque stores them as strings) and should reflect the job category or priority level.
class DataExportJob
@queue = :data_export
def self.perform(export_id, user_id)
export = Export.find(export_id)
user = User.find(user_id)
csv_data = generate_csv(export.parameters)
export.update(status: 'completed', file_path: save_csv(csv_data))
ExportMailer.completed(user, export).deliver_now
end
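# Helper methods. A bare private does not apply to def self. methods; use private_class_method to hide them.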
def self.generate_csv(parameters)
# CSV generation logic
end
def self.save_csv(data)
# File storage logic
end
end
Queue jobs using Resque.enqueue with the job class and any required arguments. Arguments must be JSON-serializable, so arbitrary Ruby objects cannot be passed directly. Pass object identifiers rather than the objects themselves to avoid serialization issues and stale data.
# Correct - pass IDs
Resque.enqueue(DataExportJob, export.id, current_user.id)
# Incorrect - objects don't serialize properly
Resque.enqueue(DataExportJob, export, current_user)
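To see why identifiers and plain values are the safe choice, the following sketch approximates the encode/decode cycle that arguments go through between enqueue and perform. It assumes the standard json library behaves like Resque's encoder for these types; round_trip is a hypothetical helper, not part of Resque.

```ruby
require 'json'

# Encode the arguments to JSON and decode them again, as happens between
# the enqueueing process and the worker.
def round_trip(*args)
  JSON.parse(JSON.generate(args))
end

decoded = round_trip(42, :weekly, { format: :csv }, Time.utc(2024, 1, 1))

# Integers survive unchanged, symbols and symbol keys come back as strings,
# and a Time comes back as a plain string rather than a Time object.
decoded.each { |value| puts "#{value.inspect} (#{value.class})" }
```

A perform method should therefore expect string keys and string values where the caller passed symbols, and should re-load records or parse timestamps itself.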
Start worker processes from the command line or programmatically. Workers specify which queues to process and can handle multiple queues with priority ordering. Queue names in the worker definition determine processing order, with leftmost queues receiving highest priority.
# Command line worker
# QUEUE=data_export,email rake resque:work
# Programmatic worker
worker = Resque::Worker.new(:high_priority, :normal, :low_priority)
worker.verbose = true
worker.work(5) # Poll every 5 seconds
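The left-to-right priority rule can be illustrated with a small in-memory model. This is a sketch of the ordering behavior only: the hash of arrays stands in for Redis lists, and reserve is a hypothetical helper that mimics how a worker picks its next job.

```ruby
# Return [queue_name, job] from the first non-empty queue, checking queues
# in the order given, or nil when every queue is empty.
def reserve(queue_order, queues)
  queue_order.each do |name|
    job = queues.fetch(name, []).shift
    return [name, job] if job
  end
  nil # a real worker would sleep for the poll interval here
end

queues = {
  'high_priority' => ['h1'],
  'normal'        => ['n1', 'n2'],
  'low_priority'  => ['l1']
}
order = %w[high_priority normal low_priority]

processed = []
while (reserved = reserve(order, queues))
  processed << reserved.last
  # A high-priority job that arrives mid-run is taken before the remaining normal work.
  queues['high_priority'] << 'h2' if reserved.last == 'n1'
end

puts processed.inspect # => ["h1", "n1", "h2", "n2", "l1"]
```

Note that a queue listed later is only served when every queue before it is empty, so a constantly busy high-priority queue can starve the others.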
Configure Resque to connect to Redis by setting the Redis instance or connection URL. The default configuration connects to localhost:6379 without authentication. Production environments typically require explicit Redis configuration.
# Set Redis connection
Resque.redis = Redis.new(host: 'redis-server', port: 6379, db: 2)
# Or use URL format
Resque.redis = 'redis://username:password@redis-server:6379/2'
# Inspect runtime statistics (pending, processed, workers, failed, Redis servers)
puts Resque.info
Advanced Usage
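Complex job workflows often require job chaining, conditional execution, and dynamic queue assignment. Create parent jobs that spawn child jobs based on processing results or external conditions. Delayed scheduling with Resque.enqueue_in, used below for the completion check, is provided by the resque-scheduler plugin rather than by Resque itself.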
class BatchProcessorJob
@queue = :batch_processing
def self.perform(batch_id, chunk_size = 100)
batch = Batch.find(batch_id)
batch.items.find_in_batches(batch_size: chunk_size) do |chunk|
chunk.each do |item|
# Process items individually or in sub-batches
case item.processing_type
when 'image'
Resque.enqueue_to(:image_processing, ImageProcessJob, item.id)
when 'document'
Resque.enqueue_to(:document_processing, DocumentJob, item.id)
else
Resque.enqueue_to(:default_processing, GenericJob, item.id)
end
end
end
# Queue completion check job
Resque.enqueue_in(30.seconds, BatchCompletionJob, batch_id)
end
end
Implement job priorities using multiple queues and worker configurations. Each time a worker is ready for a new job, it checks its queues in the order specified, so pending high-priority jobs are always picked up before lower-priority ones. A job that is already running is not interrupted.
class PriorityWorkerManager
QUEUE_PRIORITIES = {
critical: [:critical],
high: [:critical, :high],
normal: [:critical, :high, :normal],
background: [:critical, :high, :normal, :background]
}.freeze
def self.spawn_workers(priority_level, count = 1)
queues = QUEUE_PRIORITIES[priority_level]
count.times do |i|
pid = fork do
worker = Resque::Worker.new(*queues)
worker.verbose = true
worker.log "Worker #{i+1} starting on #{queues.join(', ')}"
worker.work
end
Process.detach(pid)
end
end
end
Create job middleware using hooks to add functionality like logging, metrics collection, or job modification. Hooks execute at specific points in the job lifecycle, providing extensibility without modifying job classes directly.
module JobTimingMiddleware
def around_perform_timing(*args)
start_time = Time.current
yield
ensure
duration = Time.current - start_time
job_name = name # hooks run as class methods, so self is already the job class
Rails.logger.info "Job #{job_name} completed in #{duration.round(3)}s"
Metrics.increment("jobs.#{job_name.underscore}.duration", duration)
end
end
class TimedJob
extend JobTimingMiddleware
@queue = :timed_operations
def self.perform(operation_id)
operation = Operation.find(operation_id)
operation.execute!
end
end
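The way around_perform hooks wrap a job can be sketched in plain Ruby. This is a simplified dispatcher under stated assumptions, not Resque's code: run_with_hooks, Timing, and ReportJob are hypothetical names, and hooks are found by method-name prefix and applied in sorted order.

```ruby
# Hook module meant to be extended into a job class.
module Timing
  def around_perform_timing(*_args)
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    yield
  ensure
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    # self is the job class here, so name returns the job's class name.
    log << "#{name} took #{elapsed.round(3)}s"
  end
end

class ReportJob
  extend Timing

  def self.log
    @log ||= []
  end

  def self.perform(report_id)
    log << "performed #{report_id}"
  end
end

# Wrap perform in every class method whose name starts with around_perform.
def run_with_hooks(job_class, *args)
  hooks = job_class.methods.grep(/\Aaround_perform/).sort
  innermost = -> { job_class.perform(*args) }
  wrapped = hooks.reverse.inject(innermost) do |inner, hook|
    -> { job_class.public_send(hook, *args, &inner) }
  end
  wrapped.call
end

run_with_hooks(ReportJob, 7)
puts ReportJob.log.inspect
```

Giving each hook a distinct suffix (for example around_perform_timing) lets several plugins add hooks to the same job class without overwriting one another.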
Implement job deduplication to prevent duplicate jobs from accumulating in queues. Store job signatures in Redis with expiration times to track recently queued jobs and skip duplicates.
module JobDeduplication
def self.included(base)
base.extend(ClassMethods)
end
module ClassMethods
def perform_with_dedup(*args)
# SET with NX and EX claims the key atomically and returns false if it already exists,
# which avoids the race between a separate existence check and write
return false unless Resque.redis.set(dedup_key(*args), 1, nx: true, ex: dedup_expiry)
perform(*args)
end
def enqueue_deduplicated(*args)
return false unless Resque.redis.set(dedup_key(*args), 1, nx: true, ex: dedup_expiry)
Resque.enqueue(self, *args)
end
private
def dedup_key(*args)
"dedup:#{name}:#{Digest::SHA1.hexdigest(args.to_json)}"
end
def dedup_expiry
3600 # 1 hour
end
end
end
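The signature-plus-expiry idea can be tried without a Redis server. In this sketch MemoryStore is a hypothetical in-memory stand-in for an atomic "set if absent with TTL" operation, and dedup_key follows the key format used above.

```ruby
require 'digest'
require 'json'

# In-memory stand-in for a store that supports "set if absent" with a TTL.
class MemoryStore
  def initialize
    @expires_at = {}
  end

  # Claims the key for ttl seconds; returns false while an earlier claim is still live.
  def set_if_absent(key, ttl, now: Time.now)
    current = @expires_at[key]
    return false if current && current > now

    @expires_at[key] = now + ttl
    true
  end
end

# Same job name and same arguments always produce the same key.
def dedup_key(job_name, args)
  "dedup:#{job_name}:#{Digest::SHA1.hexdigest(JSON.generate(args))}"
end

store = MemoryStore.new
key = dedup_key('DataExportJob', [12, 34])
puts store.set_if_absent(key, 3600) # first claim succeeds
puts store.set_if_absent(key, 3600) # duplicate within the hour is rejected
```

The expiry bounds how long a duplicate is suppressed, so it should be at least as long as the job normally waits in the queue.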
Error Handling & Debugging
Failed jobs move to the failed queue with complete error information including exception class, message, backtrace, and original job data. Access failed jobs through the Resque web interface or programmatically for analysis and reprocessing.
# Examine failed jobs
failed_jobs = Resque::Failure.all(0, 10) # Get first 10 failed jobs
failed_jobs.each do |job|
puts "Job: #{job['payload']['class']}"
puts "Args: #{job['payload']['args'].inspect}"
puts "Error: #{job['error']}"
puts "Failed at: #{job['failed_at']}"
puts "Backtrace: #{job['backtrace'].join("\n")}"
end
Implement custom error handling within job classes to manage recoverable errors and provide graceful degradation. Catch specific exceptions and decide whether to retry, log, or fail completely.
class RobustApiJob
  @queue = :api_calls

  MAX_RETRIES = 3

  # Assumes make_api_call raises on HTTP errors (for example via Net::HTTPResponse#value)
  # and that resque-scheduler is loaded to provide Resque.enqueue_in.
  def self.perform(endpoint, data, retry_count = 0)
    response = make_api_call(endpoint, data)
    process_response(response)
  rescue Net::OpenTimeout, Net::ReadTimeout, Net::HTTPFatalError => e
    # Timeouts and 5xx responses are usually transient, so retry with backoff
    if retry_count < MAX_RETRIES
      Resque.enqueue_in(exponential_backoff(retry_count), self, endpoint, data, retry_count + 1)
    else
      notify_administrators(e, endpoint, data)
      raise e # Job will go to failed queue
    end
  rescue Net::HTTPClientException => e
    # Don't retry 4xx client errors, but log them
    Rails.logger.error "API client error: #{e.message}"
    raise e
  end

  def self.exponential_backoff(attempt)
    (2**attempt) * 60 # 60, 120, 240 seconds (1, 2, 4 minutes)
  end
end
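The retry delays follow directly from the backoff formula. The sketch below reproduces it as a standalone function; the base and max_delay parameters are assumptions added for illustration and are not part of the job class above.

```ruby
# Delay in seconds before retry number attempt (0-based), capped at max_delay.
def exponential_backoff(attempt, base: 60, max_delay: 3600)
  [(2**attempt) * base, max_delay].min
end

(0..3).each do |attempt|
  puts "attempt #{attempt}: wait #{exponential_backoff(attempt)}s"
end
```

With a 60-second base the first three retries wait 60, 120, and 240 seconds; the cap keeps later attempts from being pushed out indefinitely.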
Create monitoring jobs that check system health and job queue status. These jobs can detect stuck queues, excessive failure rates, or worker problems and trigger alerts.
class QueueMonitorJob
@queue = :monitoring
def self.perform
queue_stats = collect_queue_statistics
worker_stats = collect_worker_statistics
failure_stats = collect_failure_statistics
check_queue_depths(queue_stats)
check_worker_health(worker_stats)
check_failure_rates(failure_stats)
end
# Helper methods (a bare private has no effect on def self. methods)
def self.collect_queue_statistics
Resque.queues.map do |queue|
{
name: queue,
size: Resque.size(queue),
oldest_job: oldest_job_age(queue)
}
end
end
def self.check_queue_depths(stats)
stats.each do |queue_stat|
if queue_stat[:size] > queue_threshold(queue_stat[:name])
alert_queue_backup(queue_stat)
end
if queue_stat[:oldest_job] > age_threshold
alert_stale_jobs(queue_stat)
end
end
end
end
Use job-specific logging to track execution details and aid in debugging. Structured logging helps identify patterns in job failures and performance issues.
class LoggedJob
  @queue = :logged_operations

  def self.perform(operation_id, context = {})
    # Logger#info takes a single message argument, so pass one hash per entry
    logger.info(message: "Job started", job: name, operation_id: operation_id, context: context)
    result = execute_operation(operation_id, context)
    logger.info(
      message: "Job completed successfully",
      job: name,
      operation_id: operation_id,
      result_summary: summarize_result(result)
    )
  rescue => e
    logger.error(
      message: "Job failed",
      job: name,
      operation_id: operation_id,
      error_class: e.class.name,
      error_message: e.message,
      context: context
    )
    raise
  end

  def self.logger
    @logger ||= Logger.new('log/jobs.log').tap do |log|
      # Emit one JSON object per line, merging the fields passed by the caller
      log.formatter = proc do |severity, datetime, _progname, msg|
        fields = msg.is_a?(Hash) ? msg : { message: msg.to_s }
        JSON.generate({ timestamp: datetime.iso8601, level: severity }.merge(fields)) + "\n"
      end
    end
  end
end
Performance & Memory
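Memory usage can accumulate over time in long-running worker processes. By default Resque forks a child process for each job, so memory allocated by a job is released when the child exits; growth matters most in the long-lived parent process or when forking is disabled. Monitor memory consumption and implement worker restart strategies to keep leaks from degrading system performance.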
class MemoryAwareWorker < Resque::Worker
MAX_MEMORY_MB = 512
MAX_JOBS_PROCESSED = 1000
def initialize(*queues)
super
@jobs_processed = 0
@start_memory = current_memory_usage
end
def work(interval = 5)
super(interval) do |job|
yield job if block_given?
@jobs_processed += 1
if should_restart?
log "Worker restart triggered - memory: #{current_memory_usage}MB, jobs: #{@jobs_processed}"
shutdown
end
end
end
private
def should_restart?
memory_threshold_exceeded? || job_count_threshold_exceeded?
end
def memory_threshold_exceeded?
current_memory_usage > MAX_MEMORY_MB
end
def job_count_threshold_exceeded?
@jobs_processed >= MAX_JOBS_PROCESSED
end
def current_memory_usage
`ps -o pid,rss -p #{Process.pid}`.split("\n").last.split.last.to_i / 1024
end
end
Optimize job performance by minimizing database queries, caching frequently accessed data, and using efficient algorithms. Profile jobs to identify bottlenecks and optimize accordingly.
class OptimizedBulkUpdateJob
@queue = :bulk_operations
def self.perform(model_name, ids, updates)
model_class = model_name.constantize
# Batch database operations
model_class.transaction do
# Use single query instead of individual updates
model_class.where(id: ids).update_all(updates)
# Batch cache invalidation
cache_keys = ids.map { |id| "#{model_name}:#{id}" }
Rails.cache.delete_multi(cache_keys)
# update_all skips ActiveRecord callbacks, so run any bulk hook explicitly
if model_class.respond_to?(:after_bulk_update)
model_class.after_bulk_update(ids, updates)
end
end
end
end
Implement job chunking for large dataset processing to control memory usage and provide progress tracking. Split large operations into manageable pieces that can be processed independently.
class ChunkedDataProcessorJob
@queue = :data_processing
CHUNK_SIZE = 1000
def self.perform(dataset_id, offset = 0, chunk_size = CHUNK_SIZE)
dataset = Dataset.find(dataset_id)
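# A stable order keeps offset-based chunks from overlapping or skipping rows
records = dataset.records.order(:id).limit(chunk_size).offset(offset)
# An empty chunk means the previous one ended exactly on a chunk boundary
return finalize_processing(dataset) if records.empty?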
# Process current chunk
results = process_records(records)
update_progress(dataset, offset, results)
# Queue next chunk
if records.size == chunk_size
Resque.enqueue(self, dataset_id, offset + chunk_size, chunk_size)
else
finalize_processing(dataset)
end
end
# Helper methods (a bare private has no effect on def self. methods)
def self.process_records(records)
records.map do |record|
# Process individual record
transform_record(record)
end
end
def self.update_progress(dataset, offset, results)
processed_count = offset + results.size
total_count = dataset.total_record_count
progress_percentage = (processed_count.to_f / total_count * 100).round(2)
dataset.update(
processed_count: processed_count,
progress_percentage: progress_percentage,
last_processed_at: Time.current
)
end
end
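The offsets, chunk sizes, and progress values that the chunked job steps through can be computed up front. This sketch uses a hypothetical chunk_plan helper and plain integers in place of database records.

```ruby
# List each chunk's offset, size, and cumulative progress percentage.
def chunk_plan(total, chunk_size)
  (0...total).step(chunk_size).map do |offset|
    size = [chunk_size, total - offset].min
    progress = ((offset + size).to_f / total * 100).round(2)
    { offset: offset, size: size, progress: progress }
  end
end

chunk_plan(2500, 1000).each { |chunk| puts chunk.inspect }
```

For 2,500 records and a chunk size of 1,000 this yields offsets 0, 1000, and 2000, with the last chunk holding 500 records. When the total is an exact multiple of the chunk size, the final chunk is full, so a job that decides whether to continue by comparing the chunk's size to chunk_size will enqueue one more run that finds no records.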
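Configure Redis appropriately for job queue workloads. Enable persistence (RDB snapshots or the append-only file) and monitor Redis memory usage to prevent job loss during system failures; an eviction policy that discards keys under memory pressure can silently drop queued jobs.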
# Redis configuration for job queues
redis_config = {
  host: ENV['REDIS_HOST'],
  port: ENV.fetch('REDIS_PORT', 6379).to_i,
  db: ENV.fetch('REDIS_DB', 0).to_i,
  driver: :hiredis, # Faster C-based Redis driver; requires the hiredis gem
  # Timeouts (seconds) and retries
  connect_timeout: 30,
  read_timeout: 30,
  write_timeout: 30,
  reconnect_attempts: 3,
  reconnect_delay: 1.5
}
# Each Resque worker process holds its own connection, so no connection pool is configured here
Resque.redis = Redis.new(redis_config)
Production Patterns
Deploy Resque workers as separate processes or containers, isolated from web application processes. Use process managers like systemd, supervisor, or container orchestrators to maintain worker availability and handle restarts.
# systemd service configuration for Resque workers
# /etc/systemd/system/resque-worker@.service
class ResqueWorkerManager
def self.generate_systemd_config(queue_name, worker_count = 1)
config = <<~SYSTEMD
[Unit]
Description=Resque Worker for #{queue_name} queue (%i)
After=redis.service
[Service]
Type=simple
User=deploy
WorkingDirectory=/var/www/app/current
Environment=RAILS_ENV=production
Environment=QUEUE=#{queue_name}
Environment=PIDFILE=/var/www/app/shared/tmp/pids/resque_#{queue_name}_%i.pid
ExecStart=/usr/local/bin/bundle exec rake resque:work
# QUIT lets the worker finish its current job before exiting (USR2 only pauses a Resque worker)
KillSignal=SIGQUIT
Restart=always
RestartSec=10
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=resque-#{queue_name}-%i
[Install]
WantedBy=multi-user.target
SYSTEMD
puts config
end
end
Implement comprehensive monitoring using job metrics, queue depth tracking, and worker health checks. Monitor both system-level metrics and business-specific job outcomes.
class ResqueMetrics
include Singleton
def initialize
@statsd = Statsd.new('localhost', 8125)
@last_stats = {}
end
def collect_and_report
current_stats = gather_statistics
calculate_rates(current_stats)
report_metrics(current_stats)
@last_stats = current_stats
end
private
def gather_statistics
{
queues: collect_queue_stats,
workers: collect_worker_stats,
failed_jobs: Resque::Failure.count,
processed_jobs: Resque::Stat[:processed],
redis_info: Resque.info
}
end
def collect_queue_stats
Resque.queues.map do |queue|
{
name: queue,
size: Resque.size(queue),
latency: calculate_queue_latency(queue)
}
end
end
def report_metrics(stats)
# Queue metrics
stats[:queues].each do |queue|
@statsd.gauge("resque.queue.#{queue[:name]}.size", queue[:size])
@statsd.gauge("resque.queue.#{queue[:name]}.latency", queue[:latency])
end
# Worker metrics
@statsd.gauge('resque.workers.total', stats[:workers][:total])
@statsd.gauge('resque.workers.working', stats[:workers][:working])
# Job metrics
@statsd.gauge('resque.jobs.failed', stats[:failed_jobs])
@statsd.gauge('resque.jobs.processed', stats[:processed_jobs])
end
end
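Set up job result tracking and business metric collection. Track not just technical metrics but also business outcomes and job success rates for different job types. Resque does not keep a record of successfully completed jobs, so the example assumes outcomes are recorded in database-backed models that expose completed and failed scopes.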
class BusinessMetricsJob
@queue = :metrics_collection
def self.perform(metric_type, time_period)
case metric_type
when 'email_delivery'
collect_email_metrics(time_period)
when 'data_export'
collect_export_metrics(time_period)
when 'image_processing'
collect_processing_metrics(time_period)
end
end
# Helper methods (a bare private has no effect on def self. methods)
def self.collect_email_metrics(period)
start_time = period.hours.ago
sent_count = EmailJob.completed.where(created_at: start_time..).count
failed_count = EmailJob.failed.where(created_at: start_time..).count
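total_count = sent_count + failed_count
# Avoid reporting NaN when no emails were attempted in the period
success_rate = total_count.zero? ? 0.0 : sent_count.to_f / total_count * 100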
Metrics.gauge('business.email.success_rate', success_rate)
Metrics.count('business.email.sent', sent_count)
Metrics.count('business.email.failed', failed_count)
end
def self.collect_processing_metrics(period)
jobs = ImageProcessJob.where(created_at: period.hours.ago..)
avg_processing_time = jobs.completed.average(:processing_duration) || 0
throughput = jobs.completed.count / period.to_f # jobs per hour
Metrics.gauge('business.image_processing.avg_duration', avg_processing_time)
Metrics.gauge('business.image_processing.throughput', throughput)
end
end
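The success-rate arithmetic is easy to get wrong for empty periods, because dividing a float by zero yields NaN rather than raising. A minimal sketch, with success_rate as a hypothetical helper that returns nil when there is nothing to measure:

```ruby
# Percentage of successful jobs, or nil when no jobs ran in the period.
def success_rate(succeeded, failed)
  total = succeeded + failed
  return nil if total.zero?

  (succeeded.to_f / total * 100).round(2)
end

puts success_rate(95, 5).inspect # => 95.0
puts success_rate(0, 0).inspect  # => nil
```

Returning nil lets the caller skip the gauge for an idle period instead of reporting a misleading 0% or NaN.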
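Implement graceful shutdown handling for worker processes to ensure jobs complete properly during deployments or system maintenance. Resque workers already finish the current job before exiting when sent QUIT; the subclass below sketches the same behavior for TERM and INT.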
class GracefulWorker < Resque::Worker
def initialize(*queues)
super
register_signal_handlers
@shutdown_requested = false
end
def work(interval = 5)
startup
until @shutdown_requested
if job = reserve
working_on(job)
perform(job)
done_working
else
procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
sleep(interval)
end
end
shutdown_gracefully
end
private
def register_signal_handlers
trap('TERM') { initiate_shutdown }
trap('INT') { initiate_shutdown }
trap('USR2') { initiate_restart }
end
def initiate_shutdown
log "Shutdown signal received, finishing current job..."
@shutdown_requested = true
end
def shutdown_gracefully
log "Worker shutting down gracefully"
unregister_worker
Resque.redis.quit
end
def initiate_restart
log "Restart signal received"
@shutdown_requested = true
# After shutdown, process manager will restart worker
end
end
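The core of the pattern is a signal handler that only sets a flag, with the work loop checking that flag between jobs. The standalone sketch below shows this using Ruby's Signal.trap; drain is a hypothetical helper, the job list is an in-memory array, and the process sends TERM to itself to simulate a deploy.

```ruby
# Process jobs until the list is empty or TERM has been received.
# The handler only sets a flag, so the job in progress always completes.
def drain(jobs)
  shutdown_requested = false
  previous_handler = Signal.trap('TERM') { shutdown_requested = true }
  processed = []
  until shutdown_requested || jobs.empty?
    job = jobs.shift
    yield job if block_given?
    processed << job
  end
  processed
ensure
  # Restore whatever handler was installed before this method ran.
  Signal.trap('TERM', previous_handler || 'DEFAULT')
end

pending = %w[a b c d]
done = drain(pending) do |job|
  if job == 'b'
    # Simulate a deploy sending TERM while job "b" is running.
    Process.kill('TERM', Process.pid)
    sleep 0.05 # give the handler a moment to run
  end
end
puts "processed: #{done.inspect}, left in queue: #{pending.inspect}"
```

Jobs that were not started stay in the queue for the next worker, which is the behavior a process manager relies on when it restarts workers during a deployment.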
Reference
Core Classes and Methods
Class/Method | Parameters | Returns | Description |
---|---|---|---|
Resque.enqueue(klass, *args) | klass (Class), *args (JSON-serializable) | Boolean | Queues job with specified arguments |
Resque.enqueue_to(queue, klass, *args) | queue (Symbol), klass (Class), *args | Boolean | Queues job to specific queue |
Resque.enqueue_in(seconds, klass, *args) | seconds (Integer), klass (Class), *args | Boolean | Schedules job for future execution (requires resque-scheduler) |
Resque.dequeue(klass, *args) | klass (Class), *args | Integer | Removes matching jobs from queue |
Resque.size(queue) | queue (Symbol) | Integer | Returns number of jobs in queue |
Resque.peek(queue, start, count) | queue (Symbol), start (Integer), count (Integer) | Array | Returns jobs from queue without removing |
Resque::Worker.new(*queues) | *queues (Symbols) | Worker | Creates worker for specified queues |
Worker#work(interval) | interval (Integer) | nil | Starts worker with polling interval |
Worker#shutdown | None | nil | Gracefully shuts down worker |
Resque::Failure.all(offset, limit) | offset (Integer), limit (Integer) | Array | Returns failed job records |
Resque::Failure.clear | None | nil | Removes all failed jobs |
Job Class Requirements
Component | Type | Required | Description |
---|---|---|---|
@queue | Symbol | Yes | Class-level instance variable specifying target queue |
perform | Class method | Yes | Contains job execution logic |
Arguments | JSON-serializable | Yes | All job arguments must serialize to JSON |
Queue Operations
Method | Purpose | Example |
---|---|---|
Resque.queues | List all queues | ["high", "normal", "low"] |
Resque.remove_queue(queue) | Delete queue and jobs | Resque.remove_queue(:old_queue) |
Resque.size(queue) | Get queue depth | Resque.size(:email) |
Worker Configuration
Property | Type | Default | Description |
---|---|---|---|
verbose | Boolean | false | Enable detailed logging |
very_verbose | Boolean | false | Enable debug-level logging |
term_timeout | Integer | 4 | Seconds to wait before SIGKILL |
term_child | Boolean | false | Send TERM to job process |
run_at_exit_hooks | Boolean | false | Run at_exit hooks on shutdown |
Redis Configuration
Option | Type | Description | Example |
---|---|---|---|
host | String | Redis server hostname | 'redis.example.com' |
port | Integer | Redis server port | 6379 |
db | Integer | Redis database number | 0 |
namespace | String | Key prefix for Resque data | 'myapp:jobs' |
password | String | Redis authentication password | 'secret123' |
Hook Methods
Hook | Timing | Parameters | Use Case |
---|---|---|---|
before_perform | Before job execution | *args | Validation, logging, setup |
after_perform | After successful job | *args | Cleanup, metrics, notifications |
around_perform | Wraps job execution | *args | Timing, error handling, transactions |
on_failure | After job failure | exception, *args | Error logging, alerts, cleanup |
Error Classes
Exception | Parent Class | Description |
---|---|---|
Resque::TermException | SignalException | Worker received termination signal |
Resque::DirtyExit | RuntimeError | Job process exited uncleanly |
Resque::PruneDeadWorkerDirtyExit | Resque::DirtyExit | Worker was pruned as dead while still processing a job |
Web Interface Routes
Path | Purpose |
---|---|
/ | Overview dashboard |
/queues | Queue listing and management |
/workers | Worker status and management |
/failed | Failed job inspection and retry |
/stats | System statistics and graphs |
Environment Variables
Variable | Default | Description |
---|---|---|
QUEUE | None | Comma-separated queue names for worker |
QUEUES | None | Alternative to QUEUE variable |
REDIS_URL | redis://localhost:6379 | Redis connection URL |
RESQUE_WEB_HTTP_BASIC_AUTH | None | Username:password for web interface |
INTERVAL | 5 | Worker polling interval in seconds |
PIDFILE | None | Path for worker process ID file |