REFACTOR: extract scheduler to the mini_scheduler gem

This commit is contained in:
Neil Lalonde
2018-07-31 17:12:55 -04:00
parent fd29ecb91a
commit 4ad7ce70ce
16 changed files with 28 additions and 1106 deletions

View File

@@ -43,6 +43,8 @@ module Discourse
# other desired context.
# See app/jobs/base.rb for the error_context function.
def self.handle_job_exception(ex, context = {}, parent_logger = nil)
return if ex.class == Jobs::HandledExceptionWrapper
context ||= {}
parent_logger ||= SidekiqExceptionHandler

View File

@@ -1,360 +0,0 @@
# Initially we used sidetiq, this was a problem:
#
# 1. No mechnism to add "randomisation" into job execution
# 2. No stats about previous runs or failures
# 3. Dependency on ice_cube gem causes runaway CPU
require_dependency 'distributed_mutex'
module Scheduler
class Manager
attr_accessor :random_ratio, :redis, :enable_stats
class Runner
def initialize(manager)
@stopped = false
@mutex = Mutex.new
@queue = Queue.new
@manager = manager
@reschedule_orphans_thread = Thread.new do
while !@stopped
sleep 1.minute
@mutex.synchronize do
reschedule_orphans
end
end
end
@keep_alive_thread = Thread.new do
while !@stopped
@mutex.synchronize do
keep_alive
end
sleep (@manager.keep_alive_duration / 2)
end
end
@thread = Thread.new do
while !@stopped
process_queue
end
end
end
def keep_alive
@manager.keep_alive
rescue => ex
Discourse.handle_job_exception(ex, message: "Scheduling manager keep-alive")
end
def reschedule_orphans
@manager.reschedule_orphans!
rescue => ex
Discourse.handle_job_exception(ex, message: "Scheduling manager orphan rescheduler")
end
def hostname
@hostname ||= begin
`hostname`
rescue
"unknown"
end
end
def process_queue
klass = @queue.deq
return unless klass
# hack alert, I need to both deq and set @running atomically.
@running = true
failed = false
start = Time.now.to_f
info = @mutex.synchronize { @manager.schedule_info(klass) }
stat = nil
error = nil
begin
info.prev_result = "RUNNING"
@mutex.synchronize { info.write! }
if @manager.enable_stats
RailsMultisite::ConnectionManagement.with_connection("default") do
stat = SchedulerStat.create!(
name: klass.to_s,
hostname: hostname,
pid: Process.pid,
started_at: Time.zone.now,
live_slots_start: GC.stat[:heap_live_slots]
)
end
end
klass.new.perform
rescue => e
if e.class != Jobs::HandledExceptionWrapper
Discourse.handle_job_exception(e, message: "Running a scheduled job", job: klass)
end
error = "#{e.class}: #{e.message} #{e.backtrace.join("\n")}"
failed = true
end
duration = ((Time.now.to_f - start) * 1000).to_i
info.prev_duration = duration
info.prev_result = failed ? "FAILED" : "OK"
info.current_owner = nil
if stat
RailsMultisite::ConnectionManagement.with_connection("default") do
stat.update!(
duration_ms: duration,
live_slots_finish: GC.stat[:heap_live_slots],
success: !failed,
error: error
)
DiscourseEvent.trigger(:scheduled_job_ran, stat)
end
end
attempts(3) do
@mutex.synchronize { info.write! }
end
rescue => ex
Discourse.handle_job_exception(ex, message: "Processing scheduled job queue")
ensure
@running = false
ActiveRecord::Base.connection_handler.clear_active_connections!
end
def stop!
return if @stopped
@mutex.synchronize do
@stopped = true
@keep_alive_thread.kill
@reschedule_orphans_thread.kill
@keep_alive_thread.join
@reschedule_orphans_thread.join
enq(nil)
kill_thread = Thread.new do
sleep 0.5
@thread.kill
end
@thread.join
kill_thread.kill
kill_thread.join
end
end
def enq(klass)
@queue << klass
end
def wait_till_done
while !@queue.empty? && !(@queue.num_waiting > 0)
sleep 0.001
end
# this is a hack, but is only used for test anyway
sleep 0.001
while @running
sleep 0.001
end
end
def attempts(n)
n.times {
begin
yield; break
rescue
sleep Random.rand
end
}
end
end
def self.without_runner(redis = nil)
self.new(redis, skip_runner: true)
end
def initialize(redis = nil, options = nil)
@redis = $redis || redis
@random_ratio = 0.1
unless options && options[:skip_runner]
@runner = Runner.new(self)
self.class.current = self
end
@hostname = options && options[:hostname]
@manager_id = SecureRandom.hex
if options && options.key?(:enable_stats)
@enable_stats = options[:enable_stats]
else
@enable_stats = true
end
end
def self.current
@current
end
def self.current=(manager)
@current = manager
end
def hostname
@hostname ||= `hostname`.strip
end
def schedule_info(klass)
ScheduleInfo.new(klass, self)
end
def next_run(klass)
schedule_info(klass).next_run
end
def ensure_schedule!(klass)
lock do
schedule_info(klass).schedule!
end
end
def remove(klass)
lock do
schedule_info(klass).del!
end
end
def reschedule_orphans!
lock do
reschedule_orphans_on!
reschedule_orphans_on!(hostname)
end
end
def reschedule_orphans_on!(hostname = nil)
redis.zrange(Manager.queue_key(hostname), 0, -1).each do |key|
klass = get_klass(key)
next unless klass
info = schedule_info(klass)
if ['QUEUED', 'RUNNING'].include?(info.prev_result) &&
(info.current_owner.blank? || !redis.get(info.current_owner))
info.prev_result = 'ORPHAN'
info.next_run = Time.now.to_i
info.write!
end
end
end
def get_klass(name)
name.constantize
rescue NameError
nil
end
def tick
lock do
schedule_next_job
schedule_next_job(hostname)
end
end
def schedule_next_job(hostname = nil)
(key, due), _ = redis.zrange Manager.queue_key(hostname), 0, 0, withscores: true
return unless key
if due.to_i <= Time.now.to_i
klass = get_klass(key)
unless klass
# corrupt key, nuke it (renamed job or something)
redis.zrem Manager.queue_key(hostname), key
return
end
info = schedule_info(klass)
info.prev_run = Time.now.to_i
info.prev_result = "QUEUED"
info.prev_duration = -1
info.next_run = nil
info.current_owner = identity_key
info.schedule!
@runner.enq(klass)
end
end
def blocking_tick
tick
@runner.wait_till_done
end
def stop!
@runner.stop!
self.class.current = nil
end
def keep_alive_duration
60
end
def keep_alive
redis.setex identity_key, keep_alive_duration, ""
end
def lock
DistributedMutex.new(Manager.lock_key).synchronize do
yield
end
end
def self.discover_schedules
# hack for developemnt reloader is crazytown
# multiple classes with same name can be in
# object space
unique = Set.new
schedules = []
ObjectSpace.each_object(Scheduler::Schedule) do |schedule|
if schedule.scheduled?
next if unique.include?(schedule.to_s)
schedules << schedule
unique << schedule.to_s
end
end
schedules
end
@mutex = Mutex.new
def self.seq
@mutex.synchronize do
@i ||= 0
@i += 1
end
end
def identity_key
@identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
end
def self.lock_key
"_scheduler_lock_"
end
def self.queue_key(hostname = nil)
if hostname
"_scheduler_queue_#{hostname}_"
else
"_scheduler_queue_"
end
end
def self.schedule_key(klass, hostname = nil)
if hostname
"_scheduler_#{klass}_#{hostname}"
else
"_scheduler_#{klass}"
end
end
end
end

View File

@@ -1,37 +0,0 @@
module Scheduler::Schedule
def daily(options = nil)
if options
@daily = options
end
@daily
end
def every(duration = nil)
if duration
@every = duration
if manager = Scheduler::Manager.current
manager.ensure_schedule!(self)
end
end
@every
end
# schedule job indepndently on each host (looking at hostname)
def per_host
@per_host = true
end
def is_per_host
@per_host
end
def schedule_info
manager = Scheduler::Manager.without_runner
manager.schedule_info self
end
def scheduled?
!!@every || !!@daily
end
end

View File

@@ -1,138 +0,0 @@
module Scheduler
class ScheduleInfo
attr_accessor :next_run,
:prev_run,
:prev_duration,
:prev_result,
:current_owner
def initialize(klass, manager)
@klass = klass
@manager = manager
data = nil
if data = @manager.redis.get(key)
data = JSON.parse(data)
end
if data
@next_run = data["next_run"]
@prev_run = data["prev_run"]
@prev_result = data["prev_result"]
@prev_duration = data["prev_duration"]
@current_owner = data["current_owner"]
end
rescue
# corrupt redis
@next_run = @prev_run = @prev_result = @prev_duration = @current_owner = nil
end
# this means the schedule is going to fire, it is setup correctly
# invalid schedules are fixed by running "schedule!"
# this happens automatically after if fire by the manager
def valid?
return false unless @next_run
(!@prev_run && @next_run < Time.now.to_i + 5.minutes) || valid_every? || valid_daily?
end
def valid_every?
return false unless @klass.every
!!@prev_run &&
@prev_run <= Time.now.to_i &&
@next_run < @prev_run + @klass.every * (1 + @manager.random_ratio)
end
def valid_daily?
return false unless @klass.daily
return true if !@prev_run && @next_run && @next_run <= (Time.zone.now + 1.day).to_i
!!@prev_run &&
@prev_run <= Time.zone.now.to_i &&
@next_run < @prev_run + 1.day
end
def schedule_every!
if !valid? && @prev_run
mixup = @klass.every * @manager.random_ratio
mixup = (mixup * Random.rand - mixup / 2).to_i
@next_run = @prev_run + mixup + @klass.every
end
if !valid?
@next_run = Time.now.to_i + 5.minutes * Random.rand
end
end
def schedule_daily!
return if valid?
at = @klass.daily[:at] || 0
today_begin = Time.zone.now.midnight.to_i
today_offset = DateTime.now.seconds_since_midnight
# If it's later today
if at > today_offset
@next_run = today_begin + at
else
# Otherwise do it tomorrow
@next_run = today_begin + 1.day + at
end
end
def schedule!
if @klass.every
schedule_every!
elsif @klass.daily
schedule_daily!
end
write!
end
def write!
clear!
redis.set key, {
next_run: @next_run,
prev_run: @prev_run,
prev_duration: @prev_duration,
prev_result: @prev_result,
current_owner: @current_owner
}.to_json
redis.zadd queue_key, @next_run, @klass if @next_run
end
def del!
clear!
@next_run = @prev_run = @prev_result = @prev_duration = @current_owner = nil
end
def key
if @klass.is_per_host
Manager.schedule_key(@klass, @manager.hostname)
else
Manager.schedule_key(@klass)
end
end
def queue_key
if @klass.is_per_host
Manager.queue_key(@manager.hostname)
else
Manager.queue_key
end
end
def redis
@manager.redis
end
private
def clear!
redis.del key
redis.zrem queue_key, @klass
end
end
end

View File

@@ -1,7 +0,0 @@
module Scheduler
end
require_dependency 'scheduler/schedule'
require_dependency 'scheduler/schedule_info'
require_dependency 'scheduler/manager'
require_dependency 'scheduler/defer'

View File

@@ -1,47 +0,0 @@
<header class="row">
<div class="col-sm-12">
<h3>Scheduler History</h3>
</div>
</header>
<div class="container">
<div class="row">
<div class="col-md-9">
<% if @scheduler_stats.length > 0 %>
<table class="table table-striped table-bordered table-white" style="width: 100%; margin: 0; table-layout:fixed;">
<thead>
<th style="width: 30%">Job Name</th>
<th style="width: 15%">Hostname:Pid</th>
<th style="width: 15%">Live Slots delta</th>
<th style="width: 15%">Started At</th>
<th style="width: 15%">Duration</th>
<th style="width: 15%"></th>
</thead>
<tbody>
<% @scheduler_stats.each do |stat| %>
<tr>
<td><%= stat.name %></td>
<td><%= stat.hostname %>:<%= stat.pid %></td>
<td>
<% if stat.live_slots_start && stat.live_slots_finish %>
<%= stat.live_slots_finish - stat.live_slots_start %>
<% end %>
</td>
<td><%= sane_time stat.started_at %></td>
<td><%= sane_duration stat.duration_ms %></td>
<td>
<% if stat.success.nil? %>
RUNNING
<% elsif !stat.success %>
FAILED
<% end %>
</td>
</tr>
<% end %>
</tbody>
</table>
<% end %>
</div>
</div>
</div>

View File

@@ -1,73 +0,0 @@
<header class="row">
<% if Sidekiq.paused? %>
<div class="col-sm-12">
<div class="alert alert-danger text-center">
<h2>SIDEKIQ IS PAUSED!</h2>
</div>
</div>
<% end %>
<div class="col-sm-12">
<h3>Recurring Jobs <a style='font-size:50%; margin-left: 30px' href='scheduler/history'>history</a></h3>
</div>
</header>
<div class="container">
<div class="row">
<div class="col-md-9">
<% if @schedules.length > 0 %>
<table class="table table-striped table-bordered table-white" style="width: 100%; margin: 0; table-layout:fixed;">
<thead>
<th style="width: 30%">Worker</th>
<th style="width: 15%">Last Run</th>
<th style="width: 15%">Last Result</th>
<th style="width: 15%">Last Duration</th>
<th style="width: 15%">Last Owner</th>
<th style="width: 15%">Next Run Due</th>
<th style="width: 10%">Actions</th>
</thead>
<% @schedules.each do |schedule| %>
<% @info = schedule.schedule_info %>
<tr>
<td>
<%= schedule %>
<td>
<% prev = @info.prev_run %>
<% if prev.nil? %>
Never
<% else %>
<%= relative_time(Time.at(prev)) %>
<% end %>
</td>
<td>
<%= @info.prev_result %>
</td>
<td>
<%= sane_duration @info.prev_duration %>
</td>
<td>
<%= @info.current_owner %>
</td>
<td>
<% next_run = @info.next_run %>
<% if next_run.nil? %>
Not Scheduled Yet
<% else %>
<%= relative_time(Time.at(next_run)) %>
<% end %>
</td>
<td>
<form action="<%= "#{root_path}scheduler/#{schedule}/trigger" %>" method="post">
<%= csrf_tag if respond_to?(:csrf_tag) %>
<input class="btn btn-danger btn-small" type="submit" name="trigger" value="Trigger" data-confirm="Are you sure you want to trigger this job?" />
</form>
</td>
</tr>
<% end %>
</table>
<% else %>
<div class="alert alert-success">No recurring jobs found.</div>
<% end %>
</div>
</div>
</div>

View File

@@ -1,65 +0,0 @@
# Based off sidetiq https://github.com/tobiassvn/sidetiq/blob/master/lib/sidetiq/web.rb
module Scheduler
module Web
VIEWS = File.expand_path('views', File.dirname(__FILE__)) unless defined? VIEWS
def self.registered(app)
app.helpers do
def sane_time(time)
return unless time
time
end
def sane_duration(duration)
return unless duration
if duration < 1000
"#{duration}ms"
elsif duration < 60 * 1000
"#{'%.2f' % (duration / 1000.0)} secs"
end
end
end
app.get "/scheduler" do
RailsMultisite::ConnectionManagement.with_connection("default") do
@manager = Scheduler::Manager.without_runner
@schedules = Scheduler::Manager.discover_schedules.sort do |a, b|
a_next = a.schedule_info.next_run
b_next = b.schedule_info.next_run
if a_next && b_next
a_next <=> b_next
elsif a_next
-1
else
1
end
end
erb File.read(File.join(VIEWS, 'scheduler.erb')), locals: { view_path: VIEWS }
end
end
app.get "/scheduler/history" do
@scheduler_stats = SchedulerStat.order('started_at desc').limit(200)
erb File.read(File.join(VIEWS, 'history.erb')), locals: { view_path: VIEWS }
end
app.post "/scheduler/:name/trigger" do
halt 404 unless (name = params[:name])
RailsMultisite::ConnectionManagement.with_connection("default") do
klass = name.constantize
info = klass.schedule_info
info.next_run = Time.now.to_i
info.write!
redirect "#{root_path}scheduler"
end
end
end
end
end
Sidekiq::Web.register(Scheduler::Web)
Sidekiq::Web.tabs["Scheduler"] = "scheduler"

View File

@@ -28,7 +28,7 @@ end
desc "run every task the scheduler knows about in that order, use only for debugging"
task 'scheduler:run_all' => :environment do
Scheduler::Manager.discover_schedules.each do |schedule|
MiniScheduler::Manager.discover_schedules.each do |schedule|
puts "Running #{schedule}"
time { schedule.new.execute({}) }
end