mirror of
https://github.com/discourse/discourse.git
synced 2024-11-21 16:38:15 -06:00
DEV: Add ability to log a warning when Sidekiq job runs for too long (#27446)
This commits introduces the `sidekiq_report_long_running_jobs_minutes` global setting which allows a site administrator to log a warning in the Rails log when a Sidekiq job has been running for too long. The warning is logged with the backtrace of the thread that is processing the Sidekiq job to make it easier to figure out what a sidekiq job is stuck on.
This commit is contained in:
parent
8058344f8e
commit
ae1d8c50da
@ -190,6 +190,9 @@ serve_static_assets = false
|
|||||||
# number of sidekiq workers (launched via unicorn master)
|
# number of sidekiq workers (launched via unicorn master)
|
||||||
sidekiq_workers = 5
|
sidekiq_workers = 5
|
||||||
|
|
||||||
|
# Logs Sidekiq jobs that have been running for longer than the configured number of minutes to the Rails log
|
||||||
|
sidekiq_report_long_running_jobs_minutes =
|
||||||
|
|
||||||
# connection reaping helps keep connection counts down, postgres
|
# connection reaping helps keep connection counts down, postgres
|
||||||
# will not work properly with huge numbers of open connections
|
# will not work properly with huge numbers of open connections
|
||||||
# reap connections from pool that are older than 30 seconds
|
# reap connections from pool that are older than 30 seconds
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
require "sidekiq/pausable"
|
require "sidekiq/pausable"
|
||||||
require "sidekiq_logster_reporter"
|
require "sidekiq_logster_reporter"
|
||||||
|
require "sidekiq_long_running_job_logger"
|
||||||
|
|
||||||
Sidekiq.configure_client { |config| config.redis = Discourse.sidekiq_redis_config }
|
Sidekiq.configure_client { |config| config.redis = Discourse.sidekiq_redis_config }
|
||||||
|
|
||||||
@ -9,6 +10,10 @@ Sidekiq.configure_server do |config|
|
|||||||
config.redis = Discourse.sidekiq_redis_config
|
config.redis = Discourse.sidekiq_redis_config
|
||||||
|
|
||||||
config.server_middleware { |chain| chain.add Sidekiq::Pausable }
|
config.server_middleware { |chain| chain.add Sidekiq::Pausable }
|
||||||
|
|
||||||
|
if stuck_sidekiq_job_minutes = GlobalSetting.sidekiq_report_long_running_jobs_minutes
|
||||||
|
config.on(:startup) { SidekiqLongRunningJobLogger.new(stuck_sidekiq_job_minutes:).start }
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
if Sidekiq.server?
|
if Sidekiq.server?
|
||||||
|
81
lib/sidekiq_long_running_job_logger.rb
Normal file
81
lib/sidekiq_long_running_job_logger.rb
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
class SidekiqLongRunningJobLogger
|
||||||
|
attr_reader :thread
|
||||||
|
|
||||||
|
def initialize(stuck_sidekiq_job_minutes:)
|
||||||
|
@mutex = Mutex.new
|
||||||
|
@stop_requested = false
|
||||||
|
|
||||||
|
# Assume that setting the value of `stuck_sidekiq_job_minutes` lower than 0 is a mistake and set it to 1. This makes
|
||||||
|
# the code in this class easier to reason about.
|
||||||
|
@stuck_sidekiq_job_minutes = stuck_sidekiq_job_minutes <= 0 ? 1 : stuck_sidekiq_job_minutes.ceil
|
||||||
|
end
|
||||||
|
|
||||||
|
def start
|
||||||
|
@thread ||=
|
||||||
|
begin
|
||||||
|
hostname = Discourse.os_hostname
|
||||||
|
seconds_to_sleep_between_checks = (@stuck_sidekiq_job_minutes * 60) / 2
|
||||||
|
|
||||||
|
Thread.new do
|
||||||
|
loop do
|
||||||
|
break if self.stop_requested?
|
||||||
|
|
||||||
|
begin
|
||||||
|
current_long_running_jobs = Set.new
|
||||||
|
|
||||||
|
Sidekiq::Workers.new.each do |process_id, thread_id, work|
|
||||||
|
next unless process_id.start_with?(hostname)
|
||||||
|
|
||||||
|
if Time.at(work["run_at"]).to_i >=
|
||||||
|
(Time.now - (60 * @stuck_sidekiq_job_minutes)).to_i
|
||||||
|
next
|
||||||
|
end
|
||||||
|
|
||||||
|
jid = work.dig("payload", "jid")
|
||||||
|
current_long_running_jobs << jid
|
||||||
|
|
||||||
|
next if @seen_long_running_jobs&.include?(jid)
|
||||||
|
|
||||||
|
if thread = Thread.list.find { |t| t["sidekiq_tid"] == thread_id }
|
||||||
|
Rails.logger.warn(<<~MSG)
|
||||||
|
Sidekiq job `#{work.dig("payload", "class")}` has been running for more than #{@stuck_sidekiq_job_minutes} minutes
|
||||||
|
#{thread.backtrace.join("\n")}
|
||||||
|
MSG
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@seen_long_running_jobs = current_long_running_jobs
|
||||||
|
|
||||||
|
yield if block_given?
|
||||||
|
rescue => error
|
||||||
|
Discourse.warn_exception(
|
||||||
|
error,
|
||||||
|
message: "Unexpected error in SidekiqLongRunningJobChecker thread",
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
sleep seconds_to_sleep_between_checks
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# Used for testing to stop the thread. In production, the thread is expected to live for the lifetime of the process.
|
||||||
|
def stop
|
||||||
|
@mutex.synchronize { @stop_requested = true }
|
||||||
|
|
||||||
|
if @thread
|
||||||
|
@thread.wakeup
|
||||||
|
@thread.join
|
||||||
|
@thread = nil
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def stop_requested?
|
||||||
|
@mutex.synchronize { @stop_requested }
|
||||||
|
end
|
||||||
|
end
|
80
spec/lib/sidekiq_long_running_job_logger_spec.rb
Normal file
80
spec/lib/sidekiq_long_running_job_logger_spec.rb
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
require "sidekiq_long_running_job_logger"
|
||||||
|
|
||||||
|
RSpec.describe SidekiqLongRunningJobLogger do
|
||||||
|
before do
|
||||||
|
@orig_logger = Rails.logger
|
||||||
|
Rails.logger = @fake_logger = FakeLogger.new
|
||||||
|
end
|
||||||
|
|
||||||
|
after { Rails.logger = @orig_logger }
|
||||||
|
|
||||||
|
it "logs long-running jobs" do
|
||||||
|
hostname = Discourse.os_hostname
|
||||||
|
stuck_sidekiq_job_minutes = 10
|
||||||
|
|
||||||
|
Sidekiq::Workers
|
||||||
|
.expects(:new)
|
||||||
|
.returns(
|
||||||
|
[
|
||||||
|
[
|
||||||
|
"#{hostname}:1234",
|
||||||
|
"some_sidekiq_id",
|
||||||
|
{
|
||||||
|
"run_at" => (Time.now - (60 * (stuck_sidekiq_job_minutes + 1))).to_i,
|
||||||
|
"payload" => {
|
||||||
|
"jid" => "job_1",
|
||||||
|
"class" => "TestWorker",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
[
|
||||||
|
"#{hostname}:1234",
|
||||||
|
"some_other_sidekiq_id",
|
||||||
|
{
|
||||||
|
"run_at" => Time.now.to_i,
|
||||||
|
"payload" => {
|
||||||
|
"jid" => "job_2",
|
||||||
|
"class" => "AnotherWorker",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
],
|
||||||
|
)
|
||||||
|
.twice
|
||||||
|
|
||||||
|
thread = mock("Thread")
|
||||||
|
thread.expects(:[]).with("sidekiq_tid").returns("some_sidekiq_id").once
|
||||||
|
thread.expects(:backtrace).returns(%w[line lines]).once
|
||||||
|
Thread.expects(:list).returns([thread]).once
|
||||||
|
|
||||||
|
begin
|
||||||
|
checker = described_class.new(stuck_sidekiq_job_minutes:)
|
||||||
|
|
||||||
|
loops = 0
|
||||||
|
|
||||||
|
checker.start { loops += 1 }
|
||||||
|
|
||||||
|
wait_for { loops == 1 }
|
||||||
|
|
||||||
|
expect(@fake_logger.warnings.size).to eq(1)
|
||||||
|
|
||||||
|
expect(@fake_logger.warnings).to include(
|
||||||
|
"Sidekiq job `TestWorker` has been running for more than 10 minutes\nline\nlines\n",
|
||||||
|
)
|
||||||
|
|
||||||
|
checker.thread.wakeup # Force the thread to run the next loop
|
||||||
|
|
||||||
|
wait_for { loops == 2 }
|
||||||
|
|
||||||
|
expect(@fake_logger.warnings.size).to eq(1)
|
||||||
|
|
||||||
|
expect(@fake_logger.warnings).to include(
|
||||||
|
"Sidekiq job `TestWorker` has been running for more than 10 minutes\nline\nlines\n",
|
||||||
|
)
|
||||||
|
ensure
|
||||||
|
checker.stop
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
Loading…
Reference in New Issue
Block a user