From ae1d8c50da9906646de7b8ea3c18939ce50e024e Mon Sep 17 00:00:00 2001 From: Alan Guo Xiang Tan Date: Thu, 13 Jun 2024 14:24:44 +0800 Subject: [PATCH] DEV: Add ability to log a warning when Sidekiq job runs for too long (#27446) This commits introduces the `sidekiq_report_long_running_jobs_minutes` global setting which allows a site administrator to log a warning in the Rails log when a Sidekiq job has been running for too long. The warning is logged with the backtrace of the thread that is processing the Sidekiq job to make it easier to figure out what a sidekiq job is stuck on. --- config/discourse_defaults.conf | 3 + config/initializers/100-sidekiq.rb | 5 ++ lib/sidekiq_long_running_job_logger.rb | 81 +++++++++++++++++++ .../sidekiq_long_running_job_logger_spec.rb | 80 ++++++++++++++++++ 4 files changed, 169 insertions(+) create mode 100644 lib/sidekiq_long_running_job_logger.rb create mode 100644 spec/lib/sidekiq_long_running_job_logger_spec.rb diff --git a/config/discourse_defaults.conf b/config/discourse_defaults.conf index 51e7d5aed07..e8ad28e2c58 100644 --- a/config/discourse_defaults.conf +++ b/config/discourse_defaults.conf @@ -190,6 +190,9 @@ serve_static_assets = false # number of sidekiq workers (launched via unicorn master) sidekiq_workers = 5 +# Logs Sidekiq jobs that have been running for longer than the configured number of minutes to the Rails log +sidekiq_report_long_running_jobs_minutes = + # connection reaping helps keep connection counts down, postgres # will not work properly with huge numbers of open connections # reap connections from pool that are older than 30 seconds diff --git a/config/initializers/100-sidekiq.rb b/config/initializers/100-sidekiq.rb index 3ce1927722f..6048203e6c0 100644 --- a/config/initializers/100-sidekiq.rb +++ b/config/initializers/100-sidekiq.rb @@ -2,6 +2,7 @@ require "sidekiq/pausable" require "sidekiq_logster_reporter" +require "sidekiq_long_running_job_logger" Sidekiq.configure_client { |config| config.redis = Discourse.sidekiq_redis_config } @@ -9,6 +10,10 @@ Sidekiq.configure_server do |config| config.redis = Discourse.sidekiq_redis_config config.server_middleware { |chain| chain.add Sidekiq::Pausable } + + if stuck_sidekiq_job_minutes = GlobalSetting.sidekiq_report_long_running_jobs_minutes + config.on(:startup) { SidekiqLongRunningJobLogger.new(stuck_sidekiq_job_minutes:).start } + end end if Sidekiq.server? diff --git a/lib/sidekiq_long_running_job_logger.rb b/lib/sidekiq_long_running_job_logger.rb new file mode 100644 index 00000000000..e6152d0c137 --- /dev/null +++ b/lib/sidekiq_long_running_job_logger.rb @@ -0,0 +1,81 @@ +# frozen_string_literal: true + +class SidekiqLongRunningJobLogger + attr_reader :thread + + def initialize(stuck_sidekiq_job_minutes:) + @mutex = Mutex.new + @stop_requested = false + + # Assume that setting the value of `stuck_sidekiq_job_minutes` lower than 0 is a mistake and set it to 1. This makes + # the code in this class easier to reason about. + @stuck_sidekiq_job_minutes = stuck_sidekiq_job_minutes <= 0 ? 1 : stuck_sidekiq_job_minutes.ceil + end + + def start + @thread ||= + begin + hostname = Discourse.os_hostname + seconds_to_sleep_between_checks = (@stuck_sidekiq_job_minutes * 60) / 2 + + Thread.new do + loop do + break if self.stop_requested? + + begin + current_long_running_jobs = Set.new + + Sidekiq::Workers.new.each do |process_id, thread_id, work| + next unless process_id.start_with?(hostname) + + if Time.at(work["run_at"]).to_i >= + (Time.now - (60 * @stuck_sidekiq_job_minutes)).to_i + next + end + + jid = work.dig("payload", "jid") + current_long_running_jobs << jid + + next if @seen_long_running_jobs&.include?(jid) + + if thread = Thread.list.find { |t| t["sidekiq_tid"] == thread_id } + Rails.logger.warn(<<~MSG) + Sidekiq job `#{work.dig("payload", "class")}` has been running for more than #{@stuck_sidekiq_job_minutes} minutes + #{thread.backtrace.join("\n")} + MSG + end + end + + @seen_long_running_jobs = current_long_running_jobs + + yield if block_given? + rescue => error + Discourse.warn_exception( + error, + message: "Unexpected error in SidekiqLongRunningJobChecker thread", + ) + end + + sleep seconds_to_sleep_between_checks + end + end + end + end + + # Used for testing to stop the thread. In production, the thread is expected to live for the lifetime of the process. + def stop + @mutex.synchronize { @stop_requested = true } + + if @thread + @thread.wakeup + @thread.join + @thread = nil + end + end + + private + + def stop_requested? + @mutex.synchronize { @stop_requested } + end +end diff --git a/spec/lib/sidekiq_long_running_job_logger_spec.rb b/spec/lib/sidekiq_long_running_job_logger_spec.rb new file mode 100644 index 00000000000..2b6486a9742 --- /dev/null +++ b/spec/lib/sidekiq_long_running_job_logger_spec.rb @@ -0,0 +1,80 @@ +# frozen_string_literal: true + +require "sidekiq_long_running_job_logger" + +RSpec.describe SidekiqLongRunningJobLogger do + before do + @orig_logger = Rails.logger + Rails.logger = @fake_logger = FakeLogger.new + end + + after { Rails.logger = @orig_logger } + + it "logs long-running jobs" do + hostname = Discourse.os_hostname + stuck_sidekiq_job_minutes = 10 + + Sidekiq::Workers + .expects(:new) + .returns( + [ + [ + "#{hostname}:1234", + "some_sidekiq_id", + { + "run_at" => (Time.now - (60 * (stuck_sidekiq_job_minutes + 1))).to_i, + "payload" => { + "jid" => "job_1", + "class" => "TestWorker", + }, + }, + ], + [ + "#{hostname}:1234", + "some_other_sidekiq_id", + { + "run_at" => Time.now.to_i, + "payload" => { + "jid" => "job_2", + "class" => "AnotherWorker", + }, + }, + ], + ], + ) + .twice + + thread = mock("Thread") + thread.expects(:[]).with("sidekiq_tid").returns("some_sidekiq_id").once + thread.expects(:backtrace).returns(%w[line lines]).once + Thread.expects(:list).returns([thread]).once + + begin + checker = described_class.new(stuck_sidekiq_job_minutes:) + + loops = 0 + + checker.start { loops += 1 } + + wait_for { loops == 1 } + + expect(@fake_logger.warnings.size).to eq(1) + + expect(@fake_logger.warnings).to include( + "Sidekiq job `TestWorker` has been running for more than 10 minutes\nline\nlines\n", + ) + + checker.thread.wakeup # Force the thread to run the next loop + + wait_for { loops == 2 } + + expect(@fake_logger.warnings.size).to eq(1) + + expect(@fake_logger.warnings).to include( + "Sidekiq job `TestWorker` has been running for more than 10 minutes\nline\nlines\n", + ) + ensure + checker.stop + end + end +end