Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Name your workers with ease #35

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 36 additions & 20 deletions lib/delayed/job.rb
@@ -1,3 +1,5 @@
require 'timeout'

module Delayed

class DeserializationError < StandardError
Expand All @@ -6,8 +8,11 @@ class DeserializationError < StandardError
# A job object that is persisted to the database.
# Contains the work object as a YAML field.
class Job < ActiveRecord::Base
MAX_ATTEMPTS = 25
MAX_RUN_TIME = 4.hours
@@max_attempts = 25
@@max_run_time = 4.hours

cattr_accessor :max_attempts, :max_run_time

set_table_name :delayed_jobs

# By default failed jobs are destroyed after too many attempts.
Expand Down Expand Up @@ -63,33 +68,40 @@ def payload_object=(object)
# Reschedule the job in the future (when a job fails).
# Uses an exponential scale depending on the number of failed attempts.
def reschedule(message, backtrace = [], time = nil)
if self.attempts < MAX_ATTEMPTS
if max_attempts_exceeded?
max_attempts_exceeded
else
time ||= Job.db_time_now + (attempts ** 4) + 5

self.attempts += 1
self.run_at = time
self.last_error = message + "\n" + backtrace.join("\n")
self.unlock
save!
else
logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
destroy_failed_jobs ? destroy : update_attribute(:failed_at, Delayed::Job.db_time_now)
save!
end
end

# Reports whether this job's attempt counter is strictly greater than
# max_attempts (a counter equal to the limit is still within bounds).
def max_attempts_exceeded?
  attempts_so_far = attempts
  attempts_so_far > max_attempts
end

# Gives up on this job: logs the permanent removal, then either destroys the
# record (when destroy_failed_jobs is truthy) or stamps failed_at with the
# current database time. Returns whatever the chosen call returns.
def max_attempts_exceeded
  logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."

  if destroy_failed_jobs
    destroy
  else
    update_attribute(:failed_at, Delayed::Job.db_time_now)
  end
end

# Try to run one job. Returns true/false (work done/work failed) or nil if job can't be locked.
def run_with_lock(max_run_time, worker_name)
logger.info "* [JOB] aquiring lock on #{name}"
logger.info "* [JOB] acquiring lock on #{name}"
unless lock_exclusively!(max_run_time, worker_name)
# We did not get the lock, some other worker process must have
logger.warn "* [JOB] failed to aquire exclusive lock for #{name}"
logger.warn "* [JOB] failed to acquire exclusive lock for #{name}"
return nil # no work done
end

begin
raise "Attempted to run this job for a #{attempts} time which exceeds the max allowed of #{max_attempts}" if max_attempts_exceeded?
runtime = Benchmark.realtime do
invoke_job # TODO: raise error if takes longer than max_run_time
Timeout.timeout(max_run_time.to_i) { invoke_job }
destroy
end
# TODO: warn if runtime > max_run_time ?
Expand Down Expand Up @@ -117,8 +129,7 @@ def self.enqueue(*args, &block)
end

# Find a few candidate jobs to run (in case some immediately get locked by others).
# Return in random order prevent everyone trying to do same head job at once.
def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
def self.find_available(limit = 5, max_run_time = max_run_time)

time_now = db_time_now

Expand All @@ -138,16 +149,14 @@ def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)

conditions.unshift(sql)

records = ActiveRecord::Base.silence do
ActiveRecord::Base.silence do
find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
end

records.sort_by { rand() }
end

# Run the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve_and_run_one_job(max_run_time = MAX_RUN_TIME)
def self.reserve_and_run_one_job(max_run_time = max_run_time)

# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job, we try the next one.
# This leads to a more even distribution of jobs across the worker processes.
Expand All @@ -165,11 +174,11 @@ def lock_exclusively!(max_run_time, worker = worker_name)
now = self.class.db_time_now
affected_rows = if locked_by != worker
# We don't own this job so we will update the locked_by name and the locked_at
self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
self.class.update_all(["locked_at = ?, locked_by = ?, attempts = ?", now, worker, self.attempts += 1], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now])
else
# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
self.class.update_all(["locked_at = ?, attempts = ?", now, self.attempts += 1], ["id = ? and locked_by = ?", id, worker])
end
if affected_rows == 1
self.locked_at = now
Expand All @@ -190,6 +199,13 @@ def unlock
# Logs a failed job's error, then tries to submit it through
# RailzScout.submit_bug and logs the case number from the response.
# A StandardError raised while submitting is logged rather than re-raised.
def log_exception(error)
  logger.error "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts"
  logger.error(error)

  begin
    response = RailzScout.submit_bug(error, nil, nil)
    logger.error("* [JOB] #{name} posted case #{response[:case_number]} to Fogbugz")
  rescue StandardError => post_failure
    logger.error("* [JOB] #{name} FogBugz post raised an error: #{post_failure}")
    # This logs the backtrace of the job's own error, not of the failed post.
    logger.error(error.backtrace)
  end
end

# Do num jobs and return stats on success/failure.
Expand Down Expand Up @@ -233,7 +249,7 @@ def deserialize(source)
return handler if handler.respond_to?(:perform)

raise DeserializationError,
'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
'Job failed to load: Unknown handler. Try to manually require the appropriate file.'
rescue TypeError, LoadError, NameError => e
raise DeserializationError,
"Job failed to load: #{e.message}. Try to manually require the required file."
Expand Down
1 change: 1 addition & 0 deletions lib/delayed/worker.rb
Expand Up @@ -11,6 +11,7 @@ class Worker

# Builds a worker from an options hash.
# :quiet is stored on the instance; :worker_name, :min_priority and
# :max_priority are copied onto Delayed::Job, in that order, but only when the
# key is present in options (a present key with a nil value is still assigned).
def initialize(options={})
  @quiet = options[:quiet]

  [:worker_name, :min_priority, :max_priority].each do |setting|
    next unless options.has_key?(setting)
    Delayed::Job.send("#{setting}=", options[setting])
  end
end
Expand Down