class Sidekiq::Processor
The Processor
is a standalone thread which:
-
fetches a job from Redis
-
executes the job
a. instantiate the Worker b. run the middleware chain c. call #perform
A Processor
can exit due to shutdown (processor_stopped) or due to an error during job execution (processor_died)
If an error occurs in the job execution, the Processor
calls the Manager
to create a new one to replace itself and exits.
Constants
- FAILURE
- PROCESSED
- WORKER_STATE
Attributes
job[R]
thread[R]
Public Class Methods
new(mgr)
click to toggle source
# File lib/sidekiq/processor.rb, line 32 def initialize(mgr) @mgr = mgr @down = false @done = false @job = nil @thread = nil @strategy = (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options) @reloader = Sidekiq.options[:reloader] @logging = (mgr.options[:job_logger] || Sidekiq::JobLogger).new @retrier = Sidekiq::JobRetry.new end
Public Instance Methods
kill(wait=false)
click to toggle source
# File lib/sidekiq/processor.rb, line 50 def kill(wait=false) @done = true return if !@thread # unlike the other actors, terminate does not wait # for the thread to finish because we don't know how # long the job will take to finish. Instead we # provide a `kill` method to call after the shutdown # timeout passes. @thread.raise ::Sidekiq::Shutdown @thread.value if wait end
start()
click to toggle source
# File lib/sidekiq/processor.rb, line 62 def start @thread ||= safe_thread("processor", &method(:run)) end
terminate(wait=false)
click to toggle source
# File lib/sidekiq/processor.rb, line 44 def terminate(wait=false) @done = true return if !@thread @thread.value if wait end
Private Instance Methods
cloned(thing)
click to toggle source
Deep clone the arguments passed to the worker so that if the job fails, what is pushed back onto Redis hasn't been mutated by the worker.
# File lib/sidekiq/processor.rb, line 263 def cloned(thing) Marshal.load(Marshal.dump(thing)) end
constantize(str)
click to toggle source
# File lib/sidekiq/processor.rb, line 267 def constantize(str) names = str.split('::') names.shift if names.empty? || names.first.empty? names.inject(Object) do |constant, name| # the false flag limits search for name to under the constant namespace # which mimics Rails' behaviour constant.const_defined?(name, false) ? constant.const_get(name, false) : constant.const_missing(name) end end
dispatch(job_hash, queue) { |worker| ... }
click to toggle source
# File lib/sidekiq/processor.rb, line 118 def dispatch(job_hash, queue) # since middleware can mutate the job hash # we clone here so we report the original # job structure to the Web UI pristine = cloned(job_hash) Sidekiq::Logging.with_job_hash_context(job_hash) do @retrier.global(pristine, queue) do @logging.call(job_hash, queue) do stats(pristine, queue) do # Rails 5 requires a Reloader to wrap code execution. In order to # constantize the worker and instantiate an instance, we have to call # the Reloader. It handles code loading, db connection management, etc. # Effectively this block denotes a "unit of work" to Rails. @reloader.call do klass = constantize(job_hash['class']) worker = klass.new worker.jid = job_hash['jid'] @retrier.local(worker, pristine, queue) do yield worker end end end end end end end
execute_job(worker, cloned_args)
click to toggle source
# File lib/sidekiq/processor.rb, line 191 def execute_job(worker, cloned_args) worker.perform(*cloned_args) end
fetch()
click to toggle source
# File lib/sidekiq/processor.rb, line 98 def fetch j = get_one if j && @done j.requeue nil else j end end
get_one()
click to toggle source
# File lib/sidekiq/processor.rb, line 87 def get_one begin work = @strategy.retrieve_work (logger.info { "Redis is online, #{::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - @down} sec downtime" }; @down = nil) if @down work rescue Sidekiq::Shutdown rescue => ex handle_fetch_exception(ex) end end
handle_fetch_exception(ex)
click to toggle source
# File lib/sidekiq/processor.rb, line 108 def handle_fetch_exception(ex) if !@down @down = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) logger.error("Error fetching job: #{ex}") handle_exception(ex) end sleep(1) nil end
process(work)
click to toggle source
# File lib/sidekiq/processor.rb, line 146 def process(work) jobstr = work.job queue = work.queue_name # Treat malformed JSON as a special case: job goes straight to the morgue. job_hash = nil begin job_hash = Sidekiq.load_json(jobstr) rescue => ex handle_exception(ex, { :context => "Invalid JSON for job", :jobstr => jobstr }) # we can't notify because the job isn't a valid hash payload. DeadSet.new.kill(jobstr, notify_failure: false) return work.acknowledge end ack = true begin dispatch(job_hash, queue) do |worker| Sidekiq.server_middleware.invoke(worker, job_hash, queue) do execute_job(worker, cloned(job_hash['args'])) end end rescue Sidekiq::Shutdown # Had to force kill this job because it didn't finish # within the timeout. Don't acknowledge the work since # we didn't properly finish it. ack = false rescue Sidekiq::JobRetry::Handled => h # this is the common case: job raised error and Sidekiq::JobRetry::Handled # signals that we created a retry successfully. We can acknowlege the job. e = h.cause ? h.cause : h handle_exception(e, { :context => "Job raised exception", :job => job_hash, :jobstr => jobstr }) raise e rescue Exception => ex # Unexpected error! This is very bad and indicates an exception that got past # the retry subsystem (e.g. network partition). We won't acknowledge the job # so it can be rescued when using Sidekiq Pro. ack = false handle_exception(ex, { :context => "Internal exception!", :job => job_hash, :jobstr => jobstr }) raise e ensure work.acknowledge if ack end end
process_one()
click to toggle source
# File lib/sidekiq/processor.rb, line 81 def process_one @job = fetch process(@job) if @job @job = nil end
run()
click to toggle source
# File lib/sidekiq/processor.rb, line 68 def run begin while !@done process_one end @mgr.processor_stopped(self) rescue Sidekiq::Shutdown @mgr.processor_stopped(self) rescue Exception => ex @mgr.processor_died(self, ex) end end
stats(job_hash, queue) { || ... }
click to toggle source
# File lib/sidekiq/processor.rb, line 245 def stats(job_hash, queue) tid = Sidekiq::Logging.tid WORKER_STATE.set(tid, {:queue => queue, :payload => job_hash, :run_at => Time.now.to_i }) begin yield rescue Exception FAILURE.incr raise ensure WORKER_STATE.delete(tid) PROCESSED.incr end end