FIX: Add multisite support to Sidekiq::Pausable. (#6960)

Having a global Sidekiq pause switch is problematic because a site in
the cluster can pause Sidekiq for the entire cluster.
This commit is contained in:
Guo Xiang Tan
2019-02-14 10:22:40 +09:00
committed by Sam
parent ba2fb2024f
commit 53d592ad3b
3 changed files with 121 additions and 54 deletions

View File

@@ -1,15 +1,19 @@
require 'thread' require 'thread'
class SidekiqPauser class SidekiqPauser
TTL = 60
PAUSED_KEY = "sidekiq_is_paused_v2"
def initialize def initialize
@mutex = Mutex.new @mutex = Mutex.new
@dbs ||= Set.new
end end
def pause! def pause!
redis.setex paused_key, 60, "paused" $redis.setex PAUSED_KEY, TTL, "paused"
@mutex.synchronize do @mutex.synchronize do
@extend_lease_thread ||= extend_lease_thread extend_lease_thread
sleep 0.001 while !paused? sleep 0.001 while !paused?
end end
@@ -17,38 +21,38 @@ class SidekiqPauser
end end
def paused? def paused?
!!redis.get(paused_key) !!$redis.get(PAUSED_KEY)
end end
def unpause! def unpause!
@mutex.synchronize do @mutex.synchronize do
@extend_lease_thread = nil @dbs.delete(RailsMultisite::ConnectionManagement.current_db)
@extend_lease_thread = nil if @dbs.size == 0
end end
redis.del(paused_key) $redis.del(PAUSED_KEY)
true true
end end
private private
def extend_lease_thread def extend_lease_thread
Thread.new do @dbs << RailsMultisite::ConnectionManagement.current_db
@extend_lease_thread ||= Thread.new do
while true do while true do
break unless @mutex.synchronize { @extend_lease_thread } break unless @mutex.synchronize { @extend_lease_thread }
redis.expire paused_key, 60
sleep(Rails.env.test? ? 0.01 : 30) @dbs.each do |db|
RailsMultisite::ConnectionManagement.with_connection(db) do
$redis.expire PAUSED_KEY, TTL
end
end
sleep(Rails.env.test? ? 0.01 : TTL / 2)
end end
end end
end end
def redis
$redis.without_namespace
end
def paused_key
"sidekiq_is_paused_v2"
end
end end
module Sidekiq module Sidekiq
@@ -74,7 +78,7 @@ class Sidekiq::Pausable
end end
def call(worker, msg, queue) def call(worker, msg, queue)
if Sidekiq.paused? && !(Jobs::RunHeartbeat === worker) if sidekiq_paused?(msg) && !(Jobs::RunHeartbeat === worker)
worker.class.perform_in(@delay, *msg['args']) worker.class.perform_in(@delay, *msg['args'])
else else
start = Process.clock_gettime(Process::CLOCK_MONOTONIC) start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@@ -85,4 +89,14 @@ class Sidekiq::Pausable
end end
end end
private
def sidekiq_paused?(msg)
if site_id = msg["args"]&.first&.dig("current_site_id")
RailsMultisite::ConnectionManagement.with_connection(site_id) do
Sidekiq.paused?
end
end
end
end end

View File

@@ -1,36 +0,0 @@
require 'rails_helper'
require_dependency 'sidekiq/pausable'
describe Sidekiq do
after do
Sidekiq.unpause!
end
it "can pause and unpause" do
Sidekiq.pause!
expect(Sidekiq.paused?).to eq(true)
Sidekiq.unpause!
expect(Sidekiq.paused?).to eq(false)
end
it "can still run heartbeats when paused" do
Sidekiq.pause!
freeze_time 1.week.from_now
jobs = Sidekiq::ScheduledSet.new
Sidekiq::Testing.disable! do
jobs.clear
middleware = Sidekiq::Pausable.new
middleware.call(Jobs::RunHeartbeat.new, { "args" => [{}] }, "critical") do
"done"
end
jobs = Sidekiq::ScheduledSet.new
expect(jobs.size).to eq(0)
end
end
end

View File

@@ -0,0 +1,89 @@
require 'rails_helper'
require_dependency 'sidekiq/pausable'
RSpec.describe "Pausing/Unpausing Sidekiq", type: :multisite do
after do
$redis.flushall
end
describe '#pause!, #unpause! and #paused?' do
it "can pause and unpause" do
Sidekiq.pause!
expect(Sidekiq.paused?).to eq(true)
test_multisite_connection('second') do
expect(Sidekiq.paused?).to eq(false)
end
Sidekiq.unpause!
expect(Sidekiq.paused?).to eq(false)
test_multisite_connection('second') do
Sidekiq.pause!
expect(Sidekiq.paused?).to eq(true)
end
end
end
end
RSpec.describe Sidekiq::Pausable do
after do
$redis.flushall
end
it "can still run heartbeats when paused" do
Sidekiq.pause!
freeze_time 1.week.from_now
jobs = Sidekiq::ScheduledSet.new
jobs.clear
middleware = Sidekiq::Pausable.new
middleware.call(Jobs::RunHeartbeat.new, { "args" => [{}] }, "critical") do
"done"
end
jobs = Sidekiq::ScheduledSet.new
expect(jobs.size).to eq(0)
end
describe 'when sidekiq is paused', type: :multisite do
let(:middleware) { Sidekiq::Pausable.new }
def call_middleware(db = RailsMultisite::ConnectionManagement::DEFAULT)
middleware.call(Jobs::PostAlert.new, {
"args" => [{ "current_site_id" => db }]
}, "critical") do
yield
end
end
it 'should delay the job' do
Sidekiq.pause!
called = false
called2 = false
call_middleware { called = true }
expect(called).to eq(false)
test_multisite_connection('second') do
call_middleware('second') { called2 = true }
expect(called2).to eq(true)
end
Sidekiq.unpause!
call_middleware { called = true }
expect(called).to eq(true)
test_multisite_connection('second') do
Sidekiq.pause!
call_middleware('second') { called2 = false }
expect(called2).to eq(true)
end
end
end
end