module PgJobs

Simple ActiveJob worker for PostgreSQL using LISTEN/NOTIFY and SKIP LOCKED.

Supports most ActiveJob features, such as multiple queues, priorities, and wait times.

To use this as your Rails job queue, add this to your environment configuration (config/environments/production.rb):

config.active_job.queue_adapter = :pg_jobs

Then run one or more workers for the default queue with

bin/rails runner PgJobs.work

or for other queues with

bin/rails runner "PgJobs.work(:my_queue)"

Requires PostgreSQL 9.5 or later, the first release to support SKIP LOCKED.
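As a rough illustration of why SKIP LOCKED matters when several workers share one queue, the sketch below builds the kind of query a worker could use to claim a single due job without blocking on rows that other workers have already locked. The table name `pg_jobs`, the `id` column, and the helper `next_job_sql` are assumptions made for this example; the columns `queue_name`, `priority`, and `scheduled_for` are the ones set by `enqueue`. This is not necessarily the query PgJobs itself issues.

```ruby
# Hypothetical helper: builds a parameterized query that claims one due job.
# The table name and the id column are assumed, not taken from the PgJobs source.
def next_job_sql(table = 'pg_jobs')
  <<~SQL
    SELECT id
    FROM #{table}
    WHERE queue_name = $1
      AND (scheduled_for IS NULL OR scheduled_for <= now())
    ORDER BY priority ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED
  SQL
end

puts next_job_sql
```

The row lock taken by FOR UPDATE is held until the surrounding transaction ends, so a worker would run a query like this inside a transaction that also covers the job's execution or removal.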

Constants

VERSION

Public Class Methods

enqueue(job, scheduled_for = nil)

Enqueue a new job to run at a given time or immediately

@param job [ActiveJob::Base] The ActiveJob job object to schedule
@param scheduled_for [Integer,Time] Timestamp when the job should be executed. Use nil if the job should be run immediately.

# File lib/pg_jobs.rb, line 72
def self.enqueue(job, scheduled_for = nil)
  PgJob.create!(job_data: job.serialize,
                scheduled_for: scheduled_for && Time.at(scheduled_for),
                priority: job.priority || 100,
                queue_name: job.queue_name || 'default')
end
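
The defaults applied by `enqueue` can be tried out without a database. The following is a minimal sketch that mirrors the three expressions from the method body; `FakeJob` and `job_attributes` are hypothetical names introduced only for this example and are not part of PgJobs.

```ruby
# Hypothetical stand-in for an ActiveJob instance; only the two readers
# that enqueue consults are modelled.
FakeJob = Struct.new(:priority, :queue_name)

# Mirrors the attribute expressions used in enqueue, minus the database write.
def job_attributes(job, scheduled_for = nil)
  {
    scheduled_for: scheduled_for && Time.at(scheduled_for), # nil means "run immediately"
    priority: job.priority || 100,                          # default priority
    queue_name: job.queue_name || 'default'                 # default queue
  }
end

p job_attributes(FakeJob.new(nil, nil))
p job_attributes(FakeJob.new(5, 'mailers'), 1_700_000_000) # epoch seconds become a Time
p job_attributes(FakeJob.new(5, 'mailers'), Time.now + 60) # a Time keeps the same instant
```

Because `Time.at` accepts both numeric epoch values and Time objects, callers can pass either form for `scheduled_for`.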
execute_job(pg_job)

Execute a PgJob instance. Calls `ActiveJob::Base.execute`.

# File lib/pg_jobs.rb, line 80
def self.execute_job(pg_job)
  ActiveJob::Base.execute(pg_job.job_data)
rescue => e
  Rails.logger.error do
    "[pg_jobs] [#{pg_job.queue_name}] [#{pg_job.job_data['job_id']}] " \
    "Error while executing job: #{e}\n" + e.backtrace.join("\n")
  end
end
work(queue_name = 'default', timeout: 10, exit_signals: %w[INT TERM])

Run a worker process for a given queue name. It first runs all jobs in the queue that are due, ordered by priority (lowest value first), and then waits for PostgreSQL NOTIFY events to pick up new jobs. To catch jobs that are scheduled for a later time, it also wakes up at the interval given by the timeout parameter and checks for jobs that have become due in the meantime.
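
The selection rule described above (only due jobs, lowest priority value first) can be sketched in memory as follows. `QueuedJob` and `due_jobs` are hypothetical names for this example; the real worker selects jobs in the database rather than in Ruby.

```ruby
# Hypothetical in-memory job record for illustration only.
QueuedJob = Struct.new(:name, :priority, :scheduled_for)

# Returns the jobs that are due at `now`, lowest priority value first.
# A nil scheduled_for means the job may run immediately.
def due_jobs(jobs, now = Time.now)
  jobs
    .select { |job| job.scheduled_for.nil? || job.scheduled_for <= now }
    .sort_by(&:priority)
end

now = Time.at(1_000)
jobs = [
  QueuedJob.new('report', 100, nil),
  QueuedJob.new('email', 10, nil),
  QueuedJob.new('cleanup', 1, Time.at(1_005)) # becomes due five seconds later
]

p due_jobs(jobs, now).map(&:name)      # => ["email", "report"]
p due_jobs(jobs, now + 10).map(&:name) # => ["cleanup", "email", "report"]
```

The second call stands in for a later wake-up of the worker: once the scheduled time has passed, the delayed job is picked up and, having the lowest priority value, runs first.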

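Handles the signals listed in exit_signals (INT and TERM by default) for graceful shutdown. If a job is running when a signal arrives, the worker lets that job finish and then exits. If no job is running, the signal is raised as a SignalException right away, which ends the wait for new jobs.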
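
The "finish the current job, then stop" part of the shutdown behaviour can be sketched in plain Ruby. This is a simplified illustration, not the PgJobs implementation: the trap handler only sets a flag, the method name `run_until_signalled` is hypothetical, and USR1 is used instead of INT or TERM so that trying the example does not terminate the surrounding process. It assumes a platform that supports USR1, such as Linux or macOS.

```ruby
# Runs each job in turn. A trapped signal only sets a flag, and the loop
# checks that flag after the current job has finished.
def run_until_signalled(jobs, exit_signals: %w[USR1])
  exit_requested = false
  # Install the handlers and remember the previous ones.
  previous = exit_signals.map { |sig| [sig, Signal.trap(sig) { exit_requested = true }] }

  completed = []
  jobs.each do |job|
    completed << job.call
    break if exit_requested
  end
  completed
ensure
  # Restore whatever handlers were installed before the loop started.
  previous&.each { |sig, handler| Signal.trap(sig, handler || 'DEFAULT') }
end

jobs = [
  -> { :first },
  lambda do
    Process.kill('USR1', Process.pid) # the signal arrives while this job runs
    sleep 0.1                         # the job still runs to completion
    :second
  end,
  -> { :third }                       # not started, the loop has already stopped
]

p run_until_signalled(jobs)
```

Keeping the handler this small matters because only a limited set of operations is safe inside a trap context, which is also why the worker writes its message to STDERR instead of using the logger.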

@param queue_name [String] The name of the queue to work on
@param timeout [Integer] Interval in seconds to check for due jobs
@param exit_signals [Array<String>] Array of signal names for graceful exit

# File lib/pg_jobs.rb, line 39
def self.work(queue_name = 'default', timeout: 10, exit_signals: %w[INT TERM])
  exit_signal = false
  job_running = false

  exit_signals.each do |signal|
    Signal.trap(signal) do
      raise SignalException, signal unless job_running

      # Put this message to STDERR because the logger cannot be used in a trap context
      STDERR.puts "Received signal #{signal}, waiting for current job to finish"
      exit_signal = true
    end
  end

  Rails.logger.info do
    "[pg_jobs] [#{queue_name}] " \
    "Starting pg_jobs worker for queue '#{queue_name}' with wait timeout #{timeout} seconds"
  end

  PgJob.yield_jobs(queue_name, timeout) do |pg_job|
    job_running = true
    execute_job(pg_job)
    job_running = false

    break if exit_signal
  end
end