From 71a38542a406eb32707782fd293c212878e9d2ab Mon Sep 17 00:00:00 2001 From: Sam Date: Wed, 12 Feb 2014 13:32:34 +1100 Subject: [PATCH] FEATURE: automatic orphan recovery BUGFIX: improve scheduler robustness, in case redis is disconnected during operation If sidekiq is terminated while task is running, it will be picked up and ran again New owner on tasks to help debugging better #stop semantics for tests --- lib/scheduler/manager.rb | 146 ++++++++++++++++++---- lib/scheduler/schedule_info.rb | 11 +- lib/scheduler/views/scheduler.erb | 4 + spec/components/scheduler/manager_spec.rb | 42 +++++++ 4 files changed, 177 insertions(+), 26 deletions(-) diff --git a/lib/scheduler/manager.rb b/lib/scheduler/manager.rb index b1449973a11..838c4db9d98 100644 --- a/lib/scheduler/manager.rb +++ b/lib/scheduler/manager.rb @@ -12,32 +12,78 @@ module Scheduler class Runner def initialize(manager) + @mutex = Mutex.new @queue = Queue.new @manager = manager + @reschedule_orphans_thread = Thread.new do + while true + sleep 1.minute + @mutex.synchronize do + reschedule_orphans + end + end + end + @keep_alive_thread = Thread.new do + while true + @mutex.synchronize do + keep_alive + end + sleep (@manager.keep_alive_duration / 2) + end + end @thread = Thread.new do while true - klass = @queue.deq - failed = false - start = Time.now.to_f - info = @manager.schedule_info(klass) - begin - info.prev_result = "RUNNING" - info.write! - klass.new.perform - rescue => e - Scheduler::Manager.handle_exception(e) - failed = true - end - duration = ((Time.now.to_f - start) * 1000).to_i - info.prev_duration = duration - info.prev_result = failed ? "FAILED" : "OK" - info.write! + process_queue end end end + def keep_alive + @manager.keep_alive + rescue => ex + Scheduler::Manager.handle_exception(ex) + end + + def reschedule_orphans + @manager.reschedule_orphans! + rescue => ex + Scheduler::Manager.handle_exception(ex) + end + + def process_queue + klass = @queue.deq + # hack alert, I need to both deq and set @running atomically. + @running = true + failed = false + start = Time.now.to_f + info = @mutex.synchronize { @manager.schedule_info(klass) } + begin + info.prev_result = "RUNNING" + @mutex.synchronize { info.write! } + klass.new.perform + rescue => e + Scheduler::Manager.handle_exception(e) + failed = true + end + duration = ((Time.now.to_f - start) * 1000).to_i + info.prev_duration = duration + info.prev_result = failed ? "FAILED" : "OK" + info.current_owner = nil + attempts(3) do + @mutex.synchronize { info.write! } + end + rescue => ex + Scheduler::Manager.handle_exception(ex) + ensure + @running = false + end + def stop! - @thread.kill + @mutex.synchronize do + @thread.kill + @keep_alive_thread.kill + @reschedule_orphans_thread.kill + end end def enq(klass) @@ -48,7 +94,23 @@ module Scheduler while !@queue.empty? && !(@queue.num_waiting > 0) sleep 0.001 end + # this is a hack, but is only used for test anyway + sleep 0.001 + while @running + sleep 0.001 + end end + + def attempts(n) + n.times { + begin + yield; break + rescue + sleep Random.rand + end + } + end + end def self.without_runner(redis=nil) @@ -94,22 +156,42 @@ module Scheduler end end + def reschedule_orphans! + lock do + redis.zrange(Manager.queue_key, 0, -1).each do |key| + klass = get_klass(key) + next unless klass + info = schedule_info(klass) + + if ['QUEUED', 'RUNNING'].include?(info.prev_result) && + (info.current_owner.blank? || !redis.get(info.current_owner)) + info.prev_result = 'ORPHAN' + info.next_run = Time.now.to_i + info.write! + end + end + end + end + + def get_klass(name) + name.constantize + rescue NameError + nil + end + def tick lock do (key, due), _ = redis.zrange Manager.queue_key, 0, 0, withscores: true return unless key if due.to_i <= Time.now.to_i - klass = begin - key.constantize - rescue NameError - nil - end + klass = get_klass(key) return unless klass info = schedule_info(klass) info.prev_run = Time.now.to_i info.prev_result = "QUEUED" info.prev_duration = -1 info.next_run = nil + info.current_owner = identity_key info.schedule! @runner.enq(klass) end @@ -126,6 +208,13 @@ module Scheduler self.class.current = nil end + def keep_alive_duration + 60 + end + + def keep_alive + redis.setex identity_key, keep_alive_duration, "" + end def lock got_lock = false @@ -157,6 +246,7 @@ module Scheduler redis.del Manager.lock_key end + def self.discover_schedules schedules = [] ObjectSpace.each_object(Scheduler::Schedule) do |schedule| @@ -165,6 +255,18 @@ module Scheduler schedules end + @mutex = Mutex.new + def self.seq + @mutex.synchronize do + @i ||= 0 + @i += 1 + end + end + + def identity_key + @identity_key ||= "_scheduler_#{`hostname`}:#{Process.pid}:#{self.class.seq}" + end + def self.lock_key "_scheduler_lock_" end diff --git a/lib/scheduler/schedule_info.rb b/lib/scheduler/schedule_info.rb index d8aeaa71fed..18d12147be9 100644 --- a/lib/scheduler/schedule_info.rb +++ b/lib/scheduler/schedule_info.rb @@ -3,7 +3,8 @@ module Scheduler attr_accessor :next_run, :prev_run, :prev_duration, - :prev_result + :prev_result, + :current_owner def initialize(klass, manager) @klass = klass @@ -21,10 +22,11 @@ module Scheduler @prev_run = data["prev_run"] @prev_result = data["prev_result"] @prev_duration = data["prev_duration"] + @current_owner = data["current_owner"] end rescue # corrupt redis - @next_run = @prev_run = @prev_result = @prev_duration = nil + @next_run = @prev_run = @prev_result = @prev_duration = @current_owner = nil end def valid? @@ -57,14 +59,15 @@ module Scheduler next_run: @next_run, prev_run: @prev_run, prev_duration: @prev_duration, - prev_result: @prev_result + prev_result: @prev_result, + current_owner: @current_owner }.to_json redis.zadd Manager.queue_key, @next_run , @klass end def del! clear! - @next_run = @prev_run = @prev_result = @prev_duration = nil + @next_run = @prev_run = @prev_result = @prev_duration = @current_owner = nil end def key diff --git a/lib/scheduler/views/scheduler.erb b/lib/scheduler/views/scheduler.erb index dcd3e415742..b46bd4c2f5b 100644 --- a/lib/scheduler/views/scheduler.erb +++ b/lib/scheduler/views/scheduler.erb @@ -15,6 +15,7 @@ Last Run Last Result Last Duration + Last Owner Next Run Due Actions @@ -37,6 +38,9 @@ <%= @info.prev_duration %> + + <%= @info.current_owner %> + <% next_run = @info.next_run %> <% if next_run.nil? %> diff --git a/spec/components/scheduler/manager_spec.rb b/spec/components/scheduler/manager_spec.rb index b1ff8e0ee57..2f9dbe8e524 100644 --- a/spec/components/scheduler/manager_spec.rb +++ b/spec/components/scheduler/manager_spec.rb @@ -23,19 +23,60 @@ describe Scheduler::Manager do sleep 0.001 end end + + class SuperLongJob + extend ::Scheduler::Schedule + + every 10.minutes + + def perform + sleep 1000 + end + end end let(:manager) { Scheduler::Manager.new(DiscourseRedis.new) } before do + $redis.del manager.class.lock_key $redis.del manager.class.queue_key + manager.remove(Testing::RandomJob) + manager.remove(Testing::SuperLongJob) end after do manager.stop! + manager.remove(Testing::RandomJob) + manager.remove(Testing::SuperLongJob) + end + + describe '#sync' do + + it 'increases' do + Scheduler::Manager.seq.should == Scheduler::Manager.seq - 1 + end end describe '#tick' do + + it 'should recover from crashed manager' do + + info = manager.schedule_info(Testing::SuperLongJob) + info.next_run = Time.now.to_i - 1 + info.write! + + manager.tick + manager.stop! + + $redis.del manager.identity_key + + manager = Scheduler::Manager.new(DiscourseRedis.new) + manager.reschedule_orphans! + + info = manager.schedule_info(Testing::SuperLongJob) + info.next_run.should <= Time.now.to_i + end + it 'should only run pending job once' do Testing::RandomJob.runs = 0 @@ -48,6 +89,7 @@ describe Scheduler::Manager do Thread.new do manager = Scheduler::Manager.new(DiscourseRedis.new) manager.blocking_tick + manager.stop! end end.map(&:join)