Resque

Overview

Resque creates, processes, and manages background jobs using Redis as the data store. Jobs are defined as Ruby classes with a perform method and queued for asynchronous execution by worker processes. The system separates job creation from execution, allowing web applications to remain responsive while computationally expensive or time-consuming tasks run in the background.

The architecture consists of three primary components: job classes that define work to be performed, queues that store serialized job data in Redis, and worker processes that pull jobs from queues and execute them. Redis serves as both the job storage mechanism and the communication layer between job producers and consumers.

# Define a job class
class EmailJob
  @queue = :email

  def self.perform(user_id, message)
    user = User.find(user_id)
    UserMailer.notification(user, message).deliver_now
  end
end

# Queue a job
Resque.enqueue(EmailJob, user.id, "Welcome to our service!")

Job data serializes to JSON before storage in Redis, limiting arguments to JSON-compatible types. Resque also provides a web interface for monitoring queues, workers, and failed jobs; it displays real-time statistics about job processing, queue depths, and worker status.
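The web interface ships as a Sinatra application; in a Rails app one common approach is to mount it in the routes file (a minimal sketch using the Resque::Server class bundled with the gem):

# config/routes.rb
require 'resque/server'

Rails.application.routes.draw do
  mount Resque::Server.new, at: '/resque'
end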

# Worker execution
require 'resque'
worker = Resque::Worker.new(:email, :upload, :cleanup)
worker.work

The system handles job failures by moving failed jobs to a separate failed queue with error details and stack traces. Workers can be configured to process specific queues, multiple queues with priority ordering, or all available queues. Resque supports plugins that extend functionality through hooks and callbacks.
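Failed jobs can be retried or discarded programmatically through the Resque::Failure API (shown with the default failure backend; the index arguments refer to positions in the failed list):

# Retry the first job in the failed list, then drop its failure record
Resque::Failure.requeue(0)
Resque::Failure.remove(0)

# Discard all failure records
Resque::Failure.clear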

Basic Usage

Job classes require two elements: a @queue class-level instance variable specifying the target queue and a perform class method containing the job logic. Queue names use symbols and should reflect the job category or priority level.

class DataExportJob
  @queue = :data_export

  def self.perform(export_id, user_id)
    export = Export.find(export_id)
    user = User.find(user_id)

    csv_data = generate_csv(export.parameters)
    export.update(status: 'completed', file_path: save_csv(csv_data))

    ExportMailer.completed(user, export).deliver_now
  end

  # `private` does not affect methods defined with `def self.`;
  # use private_class_method to hide class methods
  private_class_method def self.generate_csv(parameters)
    # CSV generation logic
  end

  private_class_method def self.save_csv(data)
    # File storage logic
  end
end

Queue jobs using Resque.enqueue with the job class and any required arguments. Arguments must be JSON-serializable; ActiveRecord objects and other complex Ruby types do not survive the round trip. Pass object identifiers rather than objects themselves to avoid serialization failures and stale data problems.

# Correct - pass IDs
Resque.enqueue(DataExportJob, export.id, current_user.id)

# Incorrect - objects don't serialize properly
Resque.enqueue(DataExportJob, export, current_user)

Start worker processes from the command line or programmatically. Workers specify which queues to process and can handle multiple queues with priority ordering. Queue names in the worker definition determine processing order, with leftmost queues receiving highest priority.

# Command line worker
# QUEUE=data_export,email rake resque:work

# Programmatic worker
worker = Resque::Worker.new(:high_priority, :normal, :low_priority)
worker.verbose = true
worker.work(5) # Poll every 5 seconds

Configure Resque to connect to Redis by setting the Redis instance or connection URL. The default configuration connects to localhost:6379 without authentication. Production environments typically require explicit Redis configuration.

# Set Redis connection
Resque.redis = Redis.new(host: 'redis-server', port: 6379, db: 2)

# Or use URL format
Resque.redis = 'redis://username:password@redis-server:6379/2'

# Inspect connection and processing statistics
puts Resque.info

Advanced Usage

Complex job workflows often require job chaining, conditional execution, and dynamic queue assignment. Create parent jobs that spawn child jobs based on processing results or external conditions.

class BatchProcessorJob
  @queue = :batch_processing

  def self.perform(batch_id, chunk_size = 100)
    batch = Batch.find(batch_id)

    batch.items.find_in_batches(batch_size: chunk_size) do |chunk|
      chunk.each do |item|
        # Process items individually or in sub-batches
        case item.processing_type
        when 'image'
          Resque.enqueue_to(:image_processing, ImageProcessJob, item.id)
        when 'document'
          Resque.enqueue_to(:document_processing, DocumentJob, item.id)
        else
          Resque.enqueue_to(:default_processing, GenericJob, item.id)
        end
      end
    end

    # Queue completion check job (enqueue_in requires the resque-scheduler gem)
    Resque.enqueue_in(30.seconds, BatchCompletionJob, batch_id)
  end
end

Implement job priorities using multiple queues and worker configurations. Workers check queues in the order specified, draining higher-priority queues before pulling from lower-priority ones; a job that is already running is never preempted.

class PriorityWorkerManager
  QUEUE_PRIORITIES = {
    critical: [:critical],
    high: [:critical, :high],
    normal: [:critical, :high, :normal],
    background: [:critical, :high, :normal, :background]
  }.freeze

  def self.spawn_workers(priority_level, count = 1)
    queues = QUEUE_PRIORITIES[priority_level]

    count.times do |i|
      pid = fork do
        worker = Resque::Worker.new(*queues)
        worker.verbose = true
        worker.log "Worker #{i+1} starting on #{queues.join(', ')}"
        worker.work
      end

      Process.detach(pid)
    end
  end
end

Create job middleware using hooks to add functionality like logging, metrics collection, or job modification. Hooks execute at specific points in the job lifecycle, providing extensibility without modifying job classes directly.

module JobTimingMiddleware
  def around_perform_timing(*args)
    start_time = Time.current
    yield
  ensure
    duration = Time.current - start_time
    job_name = name # the module is extended, so `self` here is the job class
    Rails.logger.info "Job #{job_name} completed in #{duration.round(3)}s"
    Metrics.timing("jobs.#{job_name.underscore}.duration", duration)
  end
end

class TimedJob
  extend JobTimingMiddleware
  @queue = :timed_operations

  def self.perform(operation_id)
    operation = Operation.find(operation_id)
    operation.execute!
  end
end

Implement job deduplication to prevent duplicate jobs from accumulating in queues. Store job signatures in Redis with expiration times to track recently queued jobs and skip duplicates.

require 'digest'

module JobDeduplication
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def perform_with_dedup(*args)
      job_key = dedup_key(*args)

      # exists? (redis-rb >= 4.2) returns a boolean; bare exists returns an Integer
      return false if Resque.redis.exists?(job_key)

      Resque.redis.setex(job_key, dedup_expiry, true)
      perform(*args)
    end

    def enqueue_deduplicated(*args)
      job_key = dedup_key(*args)

      return false if Resque.redis.exists?(job_key)

      Resque.redis.setex(job_key, dedup_expiry, true)
      Resque.enqueue(self, *args)
    end

    private

    def dedup_key(*args)
      "dedup:#{name}:#{Digest::SHA1.hexdigest(args.to_json)}"
    end

    def dedup_expiry
      3600 # 1 hour
    end
  end
end

Error Handling & Debugging

Failed jobs move to the failed queue with complete error information including exception class, message, backtrace, and original job data. Access failed jobs through the Resque web interface or programmatically for analysis and reprocessing.

# Examine failed jobs
failed_jobs = Resque::Failure.all(0, 10) # Get first 10 failed jobs

failed_jobs.each do |job|
  puts "Job: #{job['payload']['class']}"
  puts "Args: #{job['payload']['args'].inspect}"
  puts "Error: #{job['error']}"
  puts "Failed at: #{job['failed_at']}"
  puts "Backtrace: #{job['backtrace'].join("\n")}"
end

Implement custom error handling within job classes to manage recoverable errors and provide graceful degradation. Catch specific exceptions and decide whether to retry, log, or fail completely.

class RobustApiJob
  @queue = :api_calls

  def self.perform(endpoint, data, retry_count = 0)
    response = make_api_call(endpoint, data)
    process_response(response)
  rescue Net::OpenTimeout, Net::ReadTimeout, Net::HTTPFatalError => e
    if retry_count < 3
      # Scheduled retry; enqueue_in requires the resque-scheduler gem
      Resque.enqueue_in(
        exponential_backoff(retry_count),
        self,
        endpoint,
        data,
        retry_count + 1
      )
    else
      notify_administrators(e, endpoint, data)
      raise e # Job will go to failed queue
    end
  rescue Net::HTTPClientException => e
    # Don't retry client errors, but log them
    Rails.logger.error "API client error: #{e.message}"
    raise e
  end

  private_class_method def self.exponential_backoff(attempt)
    (2 ** attempt) * 60 # 1, 2, 4 minutes for attempts 0, 1, 2
  end
end

Create monitoring jobs that check system health and job queue status. These jobs can detect stuck queues, excessive failure rates, or worker problems and trigger alerts.

class QueueMonitorJob
  @queue = :monitoring

  def self.perform
    queue_stats = collect_queue_statistics
    worker_stats = collect_worker_statistics
    failure_stats = collect_failure_statistics

    check_queue_depths(queue_stats)
    check_worker_health(worker_stats)
    check_failure_rates(failure_stats)
  end

  private_class_method def self.collect_queue_statistics
    Resque.queues.map do |queue|
      {
        name: queue,
        size: Resque.size(queue),
        oldest_job: oldest_job_age(queue)
      }
    end
  end

  private_class_method def self.check_queue_depths(stats)
    stats.each do |queue_stat|
      if queue_stat[:size] > queue_threshold(queue_stat[:name])
        alert_queue_backup(queue_stat)
      end

      if queue_stat[:oldest_job] > age_threshold
        alert_stale_jobs(queue_stat)
      end
    end
  end
end

Use job-specific logging to track execution details and aid in debugging. Structured logging helps identify patterns in job failures and performance issues.

class LoggedJob
  @queue = :logged_operations

  def self.perform(operation_id, context = {})
    # stdlib Logger methods accept a single argument, so pass one hash per event
    logger.info(
      message: "Job started",
      job: name,
      operation_id: operation_id,
      context: context
    )

    begin
      result = execute_operation(operation_id, context)

      logger.info(
        message: "Job completed successfully",
        job: name,
        operation_id: operation_id,
        result_summary: summarize_result(result)
      )
    rescue => e
      logger.error(
        message: "Job failed",
        job: name,
        operation_id: operation_id,
        error_class: e.class.name,
        error_message: e.message,
        context: context
      )
      raise
    end
  end

  private_class_method def self.logger
    @logger ||= Logger.new('log/jobs.log').tap do |log|
      log.formatter = proc do |severity, datetime, progname, msg|
        JSON.generate(
          timestamp: datetime.iso8601,
          level: severity,
          message: msg
        ) + "\n"
      end
    end
  end
end

Performance & Memory

Job memory usage accumulates over time in long-running worker processes. Monitor memory consumption and implement worker restart strategies to prevent memory leaks from degrading system performance.

class MemoryAwareWorker < Resque::Worker
  MAX_MEMORY_MB = 512
  MAX_JOBS_PROCESSED = 1000

  def initialize(*queues)
    super
    @jobs_processed = 0
    @start_memory = current_memory_usage
  end

  def work(interval = 5)
    super(interval) do |job|
      yield job if block_given?

      @jobs_processed += 1

      if should_restart?
        log "Worker restart triggered - memory: #{current_memory_usage}MB, jobs: #{@jobs_processed}"
        shutdown
      end
    end
  end

  private

  def should_restart?
    memory_threshold_exceeded? || job_count_threshold_exceeded?
  end

  def memory_threshold_exceeded?
    current_memory_usage > MAX_MEMORY_MB
  end

  def job_count_threshold_exceeded?
    @jobs_processed >= MAX_JOBS_PROCESSED
  end

  def current_memory_usage
    # `ps -o rss=` prints resident set size in KB with no header line
    `ps -o rss= -p #{Process.pid}`.to_i / 1024
  end
end

Optimize job performance by minimizing database queries, caching frequently accessed data, and using efficient algorithms. Profile jobs to identify bottlenecks and optimize accordingly.

class OptimizedBulkUpdateJob
  @queue = :bulk_operations

  def self.perform(model_name, ids, updates)
    model_class = model_name.constantize

    # Batch database operations
    model_class.transaction do
      # Use single query instead of individual updates
      model_class.where(id: ids).update_all(updates)

      # Batch cache invalidation
      cache_keys = ids.map { |id| "#{model_name}:#{id}" }
      Rails.cache.delete_multi(cache_keys)

      # Trigger callbacks efficiently
      if model_class.respond_to?(:after_bulk_update)
        model_class.after_bulk_update(ids, updates)
      end
    end
  end
end

Implement job chunking for large dataset processing to control memory usage and provide progress tracking. Split large operations into manageable pieces that can be processed independently.

class ChunkedDataProcessorJob
  @queue = :data_processing
  CHUNK_SIZE = 1000

  def self.perform(dataset_id, offset = 0, chunk_size = CHUNK_SIZE)
    dataset = Dataset.find(dataset_id)

    records = dataset.records.limit(chunk_size).offset(offset)
    return if records.empty?

    # Process current chunk
    results = process_records(records)
    update_progress(dataset, offset, results)

    # Queue next chunk
    if records.size == chunk_size
      Resque.enqueue(self, dataset_id, offset + chunk_size, chunk_size)
    else
      finalize_processing(dataset)
    end
  end

  private_class_method def self.process_records(records)
    records.map do |record|
      # Process individual record
      transform_record(record)
    end
  end

  private_class_method def self.update_progress(dataset, offset, results)
    processed_count = offset + results.size
    total_count = dataset.total_record_count
    progress_percentage = (processed_count.to_f / total_count * 100).round(2)

    dataset.update(
      processed_count: processed_count,
      progress_percentage: progress_percentage,
      last_processed_at: Time.current
    )
  end
end

Configure Redis appropriately for job queue workloads. Use persistent storage configurations and monitor Redis memory usage to prevent job loss during system failures.

# Redis configuration for job queues
redis_config = {
  host: ENV.fetch('REDIS_HOST', 'localhost'),
  port: ENV.fetch('REDIS_PORT', 6379).to_i,
  db: ENV.fetch('REDIS_DB', 0).to_i,
  driver: :hiredis, # faster C-based Redis driver

  # Timeouts and retries
  connect_timeout: 30,
  read_timeout: 30,
  write_timeout: 30,
  reconnect_attempts: 3
}

# Each Resque worker is a single-threaded process and needs only one
# connection; Redis.new does not accept a :pool option, so wrap the client
# in a ConnectionPool only when sharing it across threads.
Resque.redis = Redis.new(redis_config)

Production Patterns

Deploy Resque workers as separate processes or containers, isolated from web application processes. Use process managers like systemd, supervisor, or container orchestrators to maintain worker availability and handle restarts.

# systemd service configuration for Resque workers
# /etc/systemd/system/resque-worker@.service

class ResqueWorkerManager
  def self.generate_systemd_config(queue_name, worker_count = 1)
    config = <<~SYSTEMD
      [Unit]
      Description=Resque Worker for #{queue_name} queue (%i)
      After=redis.service

      [Service]
      Type=simple
      User=deploy
      WorkingDirectory=/var/www/app/current
      Environment=RAILS_ENV=production
      Environment=QUEUE=#{queue_name}
      Environment=PIDFILE=/var/www/app/shared/tmp/pids/resque_#{queue_name}_%i.pid

      ExecStart=/usr/local/bin/bundle exec rake resque:work
      # Note: USR2 pauses job pickup in Resque; CONT resumes
      ExecReload=/bin/kill -USR2 $MAINPID

      Restart=always
      RestartSec=10
      StandardOutput=syslog
      StandardError=syslog
      SyslogIdentifier=resque-#{queue_name}-%i

      [Install]
      WantedBy=multi-user.target
    SYSTEMD

    puts config
  end
end

Implement comprehensive monitoring using job metrics, queue depth tracking, and worker health checks. Monitor both system-level metrics and business-specific job outcomes.

require 'singleton'

class ResqueMetrics
  include Singleton

  def initialize
    @statsd = Statsd.new('localhost', 8125)
    @last_stats = {}
  end

  def collect_and_report
    current_stats = gather_statistics
    calculate_rates(current_stats)
    report_metrics(current_stats)
    @last_stats = current_stats
  end

  private

  def gather_statistics
    {
      queues: collect_queue_stats,
      workers: collect_worker_stats,
      failed_jobs: Resque::Failure.count,
      processed_jobs: Resque::Stat[:processed],
      redis_info: Resque.info
    }
  end

  def collect_queue_stats
    Resque.queues.map do |queue|
      {
        name: queue,
        size: Resque.size(queue),
        latency: calculate_queue_latency(queue)
      }
    end
  end

  def report_metrics(stats)
    # Queue metrics
    stats[:queues].each do |queue|
      @statsd.gauge("resque.queue.#{queue[:name]}.size", queue[:size])
      @statsd.gauge("resque.queue.#{queue[:name]}.latency", queue[:latency])
    end

    # Worker metrics
    @statsd.gauge('resque.workers.total', stats[:workers][:total])
    @statsd.gauge('resque.workers.working', stats[:workers][:working])

    # Job metrics
    @statsd.gauge('resque.jobs.failed', stats[:failed_jobs])
    @statsd.gauge('resque.jobs.processed', stats[:processed_jobs])
  end
end

Set up job result tracking and business metric collection. Track not just technical metrics but also business outcomes and job success rates for different job types.

class BusinessMetricsJob
  @queue = :metrics_collection

  def self.perform(metric_type, time_period)
    case metric_type
    when 'email_delivery'
      collect_email_metrics(time_period)
    when 'data_export'
      collect_export_metrics(time_period)
    when 'image_processing'
      collect_processing_metrics(time_period)
    end
  end

  private_class_method def self.collect_email_metrics(period)
    start_time = period.hours.ago

    sent_count = EmailJob.completed.where(created_at: start_time..).count
    failed_count = EmailJob.failed.where(created_at: start_time..).count

    # Guard against division by zero when no jobs ran in the period
    total = sent_count + failed_count
    success_rate = total.zero? ? 100.0 : sent_count.to_f / total * 100

    Metrics.gauge('business.email.success_rate', success_rate)
    Metrics.count('business.email.sent', sent_count)
    Metrics.count('business.email.failed', failed_count)
  end

  private_class_method def self.collect_processing_metrics(period)
    jobs = ImageProcessJob.where(created_at: period.hours.ago..)

    avg_processing_time = jobs.completed.average(:processing_duration) || 0
    throughput = jobs.completed.count / period.to_f # jobs per hour

    Metrics.gauge('business.image_processing.avg_duration', avg_processing_time)
    Metrics.gauge('business.image_processing.throughput', throughput)
  end
end

Implement graceful shutdown handling for worker processes to ensure jobs complete properly during deployments or system maintenance.

class GracefulWorker < Resque::Worker
  def initialize(*queues)
    super
    @shutdown_requested = false # set before traps so a handler never races it
    register_signal_handlers
  end

  def work(interval = 5)
    startup

    until @shutdown_requested
      if job = reserve
        working_on(job)
        perform(job)
        done_working
      else
        procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
        sleep(interval)
      end
    end

    shutdown_gracefully
  end

  private

  def register_signal_handlers
    trap('TERM') { initiate_shutdown }
    trap('INT')  { initiate_shutdown }
    trap('USR2') { initiate_restart }
  end

  def initiate_shutdown
    # Only set a flag: Logger's mutex can raise ThreadError inside a trap
    @shutdown_requested = true
  end

  def shutdown_gracefully
    log "Worker shutting down gracefully"
    unregister_worker
    Resque.redis.quit
  end

  def initiate_restart
    @shutdown_requested = true
    # After shutdown, the process manager restarts the worker
  end
end

Reference

Core Classes and Methods

| Class/Method | Parameters | Returns | Description |
|---|---|---|---|
| Resque.enqueue(klass, *args) | klass (Class), *args (JSON-serializable) | Boolean | Queues job with specified arguments |
| Resque.enqueue_to(queue, klass, *args) | queue (Symbol), klass (Class), *args | Boolean | Queues job to specific queue |
| Resque.enqueue_in(seconds, klass, *args) | seconds (Integer), klass (Class), *args | Boolean | Schedules job for future execution (requires resque-scheduler) |
| Resque.dequeue(klass, *args) | klass (Class), *args | Integer | Removes matching jobs from queue |
| Resque.size(queue) | queue (Symbol) | Integer | Returns number of jobs in queue |
| Resque.peek(queue, start, count) | queue (Symbol), start (Integer), count (Integer) | Array | Returns jobs from queue without removing |
| Resque::Worker.new(*queues) | *queues (Symbols) | Worker | Creates worker for specified queues |
| Worker#work(interval) | interval (Numeric) | nil | Starts worker with polling interval |
| Worker#shutdown | None | nil | Gracefully shuts down worker |
| Resque::Failure.all(offset, limit) | offset (Integer), limit (Integer) | Array | Returns failed job records |
| Resque::Failure.clear | None | nil | Removes all failed jobs |
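A few of these in combination (illustrative values, assuming the EmailJob class from the Overview):

Resque.enqueue(EmailJob, 42, "Hello")   # push onto EmailJob's @queue
Resque.size(:email)                     # => 1
Resque.peek(:email, 0, 5)               # inspect jobs without dequeuing
Resque.dequeue(EmailJob, 42, "Hello")   # remove matching jobs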

Job Class Requirements

| Component | Type | Required | Description |
|---|---|---|---|
| @queue | Symbol | Yes | Class-level instance variable specifying target queue |
| perform | Class method | Yes | Contains job execution logic |
| Arguments | JSON-serializable | Yes | All job arguments must serialize to JSON |

Queue Operations

| Method | Purpose | Example |
|---|---|---|
| Resque.queues | List all queues | Resque.queues # => ["high", "normal", "low"] |
| Resque.remove_queue(queue) | Delete queue and jobs | Resque.remove_queue(:old_queue) |
| Resque.size(queue) | Get queue depth | Resque.size(:email) |

Worker Configuration

| Property | Type | Default | Description |
|---|---|---|---|
| verbose | Boolean | false | Enable detailed logging |
| very_verbose | Boolean | false | Enable debug-level logging |
| term_timeout | Integer | 4 | Seconds to wait before SIGKILL |
| term_child | Boolean | false | Send TERM to job process |
| run_at_exit_hooks | Boolean | false | Run at_exit hooks on shutdown |

Redis Configuration

| Option | Type | Description | Example |
|---|---|---|---|
| host | String | Redis server hostname | 'redis.example.com' |
| port | Integer | Redis server port | 6379 |
| db | Integer | Redis database number | 0 |
| namespace | String | Key prefix for Resque data | 'myapp:jobs' |
| password | String | Redis authentication password | 'secret123' |
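The namespace option comes from the redis-namespace gem rather than redis-rb itself; a minimal sketch of wiring it up:

require 'redis'
require 'redis/namespace'

redis = Redis.new(host: 'redis.example.com', port: 6379)
Resque.redis = Redis::Namespace.new('myapp:jobs', redis: redis)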

Hook Methods

| Hook | Timing | Parameters | Use Case |
|---|---|---|---|
| before_perform | Before job execution | *args | Validation, logging, setup |
| after_perform | After successful job | *args | Cleanup, metrics, notifications |
| around_perform | Wraps job execution | *args | Timing, error handling, transactions |
| on_failure | After job failure | exception, *args | Error logging, alerts, cleanup |
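Hooks are class methods on the job whose names begin with the hook name; Resque finds them by prefix, so one job can define several. A minimal sketch:

class AuditedJob
  @queue = :audited

  def self.perform(record_id)
    # job logic
  end

  # Runs before perform; raising Resque::Job::DontPerform here aborts the job
  def self.before_perform_log(*args)
    Resque.logger.info "AuditedJob starting: #{args.inspect}"
  end

  # Runs when perform raises; receives the exception plus the original args
  def self.on_failure_alert(exception, *args)
    Resque.logger.error "AuditedJob failed: #{exception.message}"
  end
end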

Error Classes

| Exception | Parent Class | Description |
|---|---|---|
| Resque::TermException | SignalException | Worker received termination signal |
| Resque::DirtyExit | RuntimeError | Job process exited uncleanly |
| Resque::PruneDeadWorkerDirtyExit | Resque::DirtyExit | Dead worker was pruned while still holding a job |

Web Interface Routes

| Path | Purpose |
|---|---|
| / | Overview dashboard |
| /queues | Queue listing and management |
| /workers | Worker status and management |
| /failed | Failed job inspection and retry |
| /stats | System statistics and graphs |

Environment Variables

| Variable | Default | Description |
|---|---|---|
| QUEUE | None | Comma-separated queue names for worker |
| QUEUES | None | Alternative to QUEUE variable |
| REDIS_URL | redis://localhost:6379 | Redis connection URL |
| RESQUE_WEB_HTTP_BASIC_AUTH | None | Username:password for web interface |
| INTERVAL | 5 | Worker polling interval in seconds |
| PIDFILE | None | Path for worker process ID file |
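These variables combine on the command line when starting workers, as in this example:

# Drain :critical first, then :high, polling every second
QUEUE=critical,high INTERVAL=1 PIDFILE=tmp/pids/resque1.pid bundle exec rake resque:work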