2014-08-18 23:04:58 -05:00
|
|
|
require 'thread'
|
|
|
|
|
|
|
|
class SidekiqPauser
|
2019-02-13 19:22:40 -06:00
|
|
|
TTL = 60
|
|
|
|
PAUSED_KEY = "sidekiq_is_paused_v2"
|
|
|
|
|
2014-08-18 23:04:58 -05:00
|
|
|
def initialize
|
|
|
|
@mutex = Mutex.new
|
2019-02-13 19:22:40 -06:00
|
|
|
@dbs ||= Set.new
|
2014-08-18 23:04:58 -05:00
|
|
|
end
|
|
|
|
|
2019-02-18 20:55:53 -06:00
|
|
|
def pause!(value = "paused")
|
|
|
|
$redis.setex PAUSED_KEY, TTL, value
|
2018-07-09 20:53:25 -05:00
|
|
|
|
2014-08-18 23:04:58 -05:00
|
|
|
@mutex.synchronize do
|
2019-02-13 19:22:40 -06:00
|
|
|
extend_lease_thread
|
2014-08-18 23:04:58 -05:00
|
|
|
end
|
2014-02-12 22:27:04 -06:00
|
|
|
|
|
|
|
true
|
|
|
|
end
|
|
|
|
|
2014-08-18 23:04:58 -05:00
|
|
|
def paused?
|
2019-02-13 19:22:40 -06:00
|
|
|
!!$redis.get(PAUSED_KEY)
|
2014-02-12 22:27:04 -06:00
|
|
|
end
|
|
|
|
|
2019-02-13 20:33:58 -06:00
|
|
|
def unpause_all!
|
|
|
|
@mutex.synchronize do
|
|
|
|
@dbs = Set.new
|
|
|
|
stop_extend_lease_thread
|
|
|
|
end
|
|
|
|
|
|
|
|
RailsMultisite::ConnectionManagement.each_connection do
|
|
|
|
unpause! if paused?
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
def paused_dbs
|
|
|
|
dbs = []
|
2019-02-18 20:55:53 -06:00
|
|
|
|
2019-02-13 20:33:58 -06:00
|
|
|
RailsMultisite::ConnectionManagement.each_connection do
|
|
|
|
dbs << RailsMultisite::ConnectionManagement.current_db if paused?
|
|
|
|
end
|
2019-02-18 20:55:53 -06:00
|
|
|
|
2019-02-13 20:33:58 -06:00
|
|
|
dbs
|
|
|
|
end
|
|
|
|
|
2014-08-18 23:04:58 -05:00
|
|
|
def unpause!
|
|
|
|
@mutex.synchronize do
|
2019-02-13 19:22:40 -06:00
|
|
|
@dbs.delete(RailsMultisite::ConnectionManagement.current_db)
|
2019-02-13 20:33:58 -06:00
|
|
|
stop_extend_lease_thread if @dbs.size == 0
|
2014-08-18 23:04:58 -05:00
|
|
|
end
|
|
|
|
|
2019-02-13 19:22:40 -06:00
|
|
|
$redis.del(PAUSED_KEY)
|
2014-02-12 22:27:04 -06:00
|
|
|
true
|
|
|
|
end
|
|
|
|
|
|
|
|
private
|
|
|
|
|
2019-02-13 20:33:58 -06:00
|
|
|
def stop_extend_lease_thread
|
|
|
|
# should always be called from a mutex
|
|
|
|
if t = @extend_lease_thread
|
|
|
|
@extend_lease_thread = nil
|
|
|
|
while t.alive?
|
2019-02-18 23:33:34 -06:00
|
|
|
begin
|
|
|
|
t.wakeup
|
|
|
|
rescue ThreadError => e
|
|
|
|
unless e.message =~ /killed thread/
|
|
|
|
raise e
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
2019-02-13 20:33:58 -06:00
|
|
|
sleep 0
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
2014-08-19 00:50:17 -05:00
|
|
|
def extend_lease_thread
|
2019-02-13 20:33:58 -06:00
|
|
|
# should always be called from a mutex
|
2019-02-13 19:22:40 -06:00
|
|
|
@dbs << RailsMultisite::ConnectionManagement.current_db
|
2019-02-18 20:55:53 -06:00
|
|
|
|
2019-02-13 19:22:40 -06:00
|
|
|
@extend_lease_thread ||= Thread.new do
|
2014-08-19 00:50:17 -05:00
|
|
|
while true do
|
2019-02-13 20:33:58 -06:00
|
|
|
break if !@extend_lease_thread
|
2014-08-18 23:04:58 -05:00
|
|
|
|
2019-02-13 19:22:40 -06:00
|
|
|
@dbs.each do |db|
|
|
|
|
RailsMultisite::ConnectionManagement.with_connection(db) do
|
2019-02-13 20:33:58 -06:00
|
|
|
if !$redis.expire(PAUSED_KEY, TTL)
|
|
|
|
# if it was unpaused in another process we got to remove the
|
|
|
|
# bad key
|
|
|
|
@dbs.delete(db)
|
|
|
|
end
|
2019-02-13 19:22:40 -06:00
|
|
|
end
|
|
|
|
end
|
2014-08-19 00:50:17 -05:00
|
|
|
|
2019-02-13 19:22:40 -06:00
|
|
|
sleep(Rails.env.test? ? 0.01 : TTL / 2)
|
|
|
|
end
|
|
|
|
end
|
2014-02-12 22:27:04 -06:00
|
|
|
end
|
|
|
|
end
|
|
|
|
|
2014-08-18 23:04:58 -05:00
|
|
|
module Sidekiq
|
|
|
|
@pauser = SidekiqPauser.new
|
2019-02-18 20:55:53 -06:00
|
|
|
|
|
|
|
def self.pause!(key = nil)
|
|
|
|
key ? @pauser.pause!(key) : @pauser.pause!
|
2014-08-18 23:04:58 -05:00
|
|
|
end
|
|
|
|
|
|
|
|
def self.paused?
|
|
|
|
@pauser.paused?
|
|
|
|
end
|
|
|
|
|
|
|
|
def self.unpause!
|
|
|
|
@pauser.unpause!
|
|
|
|
end
|
2019-02-13 20:33:58 -06:00
|
|
|
|
|
|
|
def self.unpause_all!
|
|
|
|
@pauser.unpause_all!
|
|
|
|
end
|
|
|
|
|
|
|
|
def self.paused_dbs
|
|
|
|
@pauser.paused_dbs
|
|
|
|
end
|
2014-08-18 23:04:58 -05:00
|
|
|
end
|
|
|
|
|
2014-02-12 22:27:04 -06:00
|
|
|
# server middleware that will reschedule work whenever Sidekiq is paused
|
|
|
|
class Sidekiq::Pausable
|
|
|
|
|
|
|
|
def initialize(delay = 5.seconds)
|
|
|
|
@delay = delay
|
|
|
|
end
|
|
|
|
|
|
|
|
def call(worker, msg, queue)
|
2019-02-13 19:22:40 -06:00
|
|
|
if sidekiq_paused?(msg) && !(Jobs::RunHeartbeat === worker)
|
2016-01-11 11:31:28 -06:00
|
|
|
worker.class.perform_in(@delay, *msg['args'])
|
2014-02-12 22:27:04 -06:00
|
|
|
else
|
2017-10-23 01:30:17 -05:00
|
|
|
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
|
|
|
result = yield
|
|
|
|
duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
|
|
|
|
DiscourseEvent.trigger(:sidekiq_job_ran, worker, msg, queue, duration)
|
|
|
|
result
|
2014-02-12 22:27:04 -06:00
|
|
|
end
|
|
|
|
end
|
|
|
|
|
2019-02-13 19:22:40 -06:00
|
|
|
private
|
|
|
|
|
|
|
|
def sidekiq_paused?(msg)
|
|
|
|
if site_id = msg["args"]&.first&.dig("current_site_id")
|
|
|
|
RailsMultisite::ConnectionManagement.with_connection(site_id) do
|
|
|
|
Sidekiq.paused?
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
2014-02-12 22:27:04 -06:00
|
|
|
end
|