mirror of
https://github.com/discourse/discourse.git
synced 2025-02-25 18:55:32 -06:00
FIX: Add multisite support to Sidekiq::Pausable. (#6960)
Having a global Sidekiq pause switch is problematic because a site in the cluster can pause Sidekiq for the entire cluster.
This commit is contained in:
@@ -1,15 +1,19 @@
|
|||||||
require 'thread'
|
require 'thread'
|
||||||
|
|
||||||
class SidekiqPauser
|
class SidekiqPauser
|
||||||
|
TTL = 60
|
||||||
|
PAUSED_KEY = "sidekiq_is_paused_v2"
|
||||||
|
|
||||||
def initialize
|
def initialize
|
||||||
@mutex = Mutex.new
|
@mutex = Mutex.new
|
||||||
|
@dbs ||= Set.new
|
||||||
end
|
end
|
||||||
|
|
||||||
def pause!
|
def pause!
|
||||||
redis.setex paused_key, 60, "paused"
|
$redis.setex PAUSED_KEY, TTL, "paused"
|
||||||
|
|
||||||
@mutex.synchronize do
|
@mutex.synchronize do
|
||||||
@extend_lease_thread ||= extend_lease_thread
|
extend_lease_thread
|
||||||
sleep 0.001 while !paused?
|
sleep 0.001 while !paused?
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -17,38 +21,38 @@ class SidekiqPauser
|
|||||||
end
|
end
|
||||||
|
|
||||||
def paused?
|
def paused?
|
||||||
!!redis.get(paused_key)
|
!!$redis.get(PAUSED_KEY)
|
||||||
end
|
end
|
||||||
|
|
||||||
def unpause!
|
def unpause!
|
||||||
@mutex.synchronize do
|
@mutex.synchronize do
|
||||||
@extend_lease_thread = nil
|
@dbs.delete(RailsMultisite::ConnectionManagement.current_db)
|
||||||
|
@extend_lease_thread = nil if @dbs.size == 0
|
||||||
end
|
end
|
||||||
|
|
||||||
redis.del(paused_key)
|
$redis.del(PAUSED_KEY)
|
||||||
true
|
true
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def extend_lease_thread
|
def extend_lease_thread
|
||||||
Thread.new do
|
@dbs << RailsMultisite::ConnectionManagement.current_db
|
||||||
|
|
||||||
|
@extend_lease_thread ||= Thread.new do
|
||||||
while true do
|
while true do
|
||||||
break unless @mutex.synchronize { @extend_lease_thread }
|
break unless @mutex.synchronize { @extend_lease_thread }
|
||||||
redis.expire paused_key, 60
|
|
||||||
sleep(Rails.env.test? ? 0.01 : 30)
|
@dbs.each do |db|
|
||||||
|
RailsMultisite::ConnectionManagement.with_connection(db) do
|
||||||
|
$redis.expire PAUSED_KEY, TTL
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
sleep(Rails.env.test? ? 0.01 : TTL / 2)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def redis
|
|
||||||
$redis.without_namespace
|
|
||||||
end
|
|
||||||
|
|
||||||
def paused_key
|
|
||||||
"sidekiq_is_paused_v2"
|
|
||||||
end
|
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
module Sidekiq
|
module Sidekiq
|
||||||
@@ -74,7 +78,7 @@ class Sidekiq::Pausable
|
|||||||
end
|
end
|
||||||
|
|
||||||
def call(worker, msg, queue)
|
def call(worker, msg, queue)
|
||||||
if Sidekiq.paused? && !(Jobs::RunHeartbeat === worker)
|
if sidekiq_paused?(msg) && !(Jobs::RunHeartbeat === worker)
|
||||||
worker.class.perform_in(@delay, *msg['args'])
|
worker.class.perform_in(@delay, *msg['args'])
|
||||||
else
|
else
|
||||||
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||||
@@ -85,4 +89,14 @@ class Sidekiq::Pausable
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def sidekiq_paused?(msg)
|
||||||
|
if site_id = msg["args"]&.first&.dig("current_site_id")
|
||||||
|
RailsMultisite::ConnectionManagement.with_connection(site_id) do
|
||||||
|
Sidekiq.paused?
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|||||||
@@ -1,36 +0,0 @@
|
|||||||
require 'rails_helper'
|
|
||||||
require_dependency 'sidekiq/pausable'
|
|
||||||
|
|
||||||
describe Sidekiq do
|
|
||||||
after do
|
|
||||||
Sidekiq.unpause!
|
|
||||||
end
|
|
||||||
|
|
||||||
it "can pause and unpause" do
|
|
||||||
Sidekiq.pause!
|
|
||||||
expect(Sidekiq.paused?).to eq(true)
|
|
||||||
Sidekiq.unpause!
|
|
||||||
expect(Sidekiq.paused?).to eq(false)
|
|
||||||
end
|
|
||||||
|
|
||||||
it "can still run heartbeats when paused" do
|
|
||||||
Sidekiq.pause!
|
|
||||||
|
|
||||||
freeze_time 1.week.from_now
|
|
||||||
|
|
||||||
jobs = Sidekiq::ScheduledSet.new
|
|
||||||
|
|
||||||
Sidekiq::Testing.disable! do
|
|
||||||
jobs.clear
|
|
||||||
|
|
||||||
middleware = Sidekiq::Pausable.new
|
|
||||||
middleware.call(Jobs::RunHeartbeat.new, { "args" => [{}] }, "critical") do
|
|
||||||
"done"
|
|
||||||
end
|
|
||||||
|
|
||||||
jobs = Sidekiq::ScheduledSet.new
|
|
||||||
expect(jobs.size).to eq(0)
|
|
||||||
end
|
|
||||||
|
|
||||||
end
|
|
||||||
end
|
|
||||||
89
spec/multisite/pausable_spec.rb
Normal file
89
spec/multisite/pausable_spec.rb
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
require 'rails_helper'
|
||||||
|
require_dependency 'sidekiq/pausable'
|
||||||
|
|
||||||
|
RSpec.describe "Pausing/Unpausing Sidekiq", type: :multisite do
|
||||||
|
after do
|
||||||
|
$redis.flushall
|
||||||
|
end
|
||||||
|
|
||||||
|
describe '#pause!, #unpause! and #paused?' do
|
||||||
|
it "can pause and unpause" do
|
||||||
|
Sidekiq.pause!
|
||||||
|
expect(Sidekiq.paused?).to eq(true)
|
||||||
|
|
||||||
|
test_multisite_connection('second') do
|
||||||
|
expect(Sidekiq.paused?).to eq(false)
|
||||||
|
end
|
||||||
|
|
||||||
|
Sidekiq.unpause!
|
||||||
|
|
||||||
|
expect(Sidekiq.paused?).to eq(false)
|
||||||
|
|
||||||
|
test_multisite_connection('second') do
|
||||||
|
Sidekiq.pause!
|
||||||
|
expect(Sidekiq.paused?).to eq(true)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
RSpec.describe Sidekiq::Pausable do
|
||||||
|
after do
|
||||||
|
$redis.flushall
|
||||||
|
end
|
||||||
|
|
||||||
|
it "can still run heartbeats when paused" do
|
||||||
|
Sidekiq.pause!
|
||||||
|
|
||||||
|
freeze_time 1.week.from_now
|
||||||
|
|
||||||
|
jobs = Sidekiq::ScheduledSet.new
|
||||||
|
jobs.clear
|
||||||
|
middleware = Sidekiq::Pausable.new
|
||||||
|
|
||||||
|
middleware.call(Jobs::RunHeartbeat.new, { "args" => [{}] }, "critical") do
|
||||||
|
"done"
|
||||||
|
end
|
||||||
|
|
||||||
|
jobs = Sidekiq::ScheduledSet.new
|
||||||
|
expect(jobs.size).to eq(0)
|
||||||
|
end
|
||||||
|
|
||||||
|
describe 'when sidekiq is paused', type: :multisite do
|
||||||
|
let(:middleware) { Sidekiq::Pausable.new }
|
||||||
|
|
||||||
|
def call_middleware(db = RailsMultisite::ConnectionManagement::DEFAULT)
|
||||||
|
middleware.call(Jobs::PostAlert.new, {
|
||||||
|
"args" => [{ "current_site_id" => db }]
|
||||||
|
}, "critical") do
|
||||||
|
yield
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'should delay the job' do
|
||||||
|
Sidekiq.pause!
|
||||||
|
|
||||||
|
called = false
|
||||||
|
called2 = false
|
||||||
|
call_middleware { called = true }
|
||||||
|
|
||||||
|
expect(called).to eq(false)
|
||||||
|
|
||||||
|
test_multisite_connection('second') do
|
||||||
|
call_middleware('second') { called2 = true }
|
||||||
|
expect(called2).to eq(true)
|
||||||
|
end
|
||||||
|
|
||||||
|
Sidekiq.unpause!
|
||||||
|
call_middleware { called = true }
|
||||||
|
|
||||||
|
expect(called).to eq(true)
|
||||||
|
|
||||||
|
test_multisite_connection('second') do
|
||||||
|
Sidekiq.pause!
|
||||||
|
call_middleware('second') { called2 = false }
|
||||||
|
expect(called2).to eq(true)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
Reference in New Issue
Block a user