mirror of
https://github.com/discourse/discourse.git
synced 2025-02-25 18:55:32 -06:00
PERF: Don't allow a single user to monopolize the defer queue (#25593)
This commit is contained in:
parent
67229a7739
commit
30922855f2
@ -29,6 +29,7 @@ module Hijack
|
|||||||
Scheduler::Defer.later(
|
Scheduler::Defer.later(
|
||||||
"hijack #{params["controller"]} #{params["action"]} #{info}",
|
"hijack #{params["controller"]} #{params["action"]} #{info}",
|
||||||
force: false,
|
force: false,
|
||||||
|
current_user: current_user&.id,
|
||||||
&scheduled.method(:resolve)
|
&scheduled.method(:resolve)
|
||||||
)
|
)
|
||||||
rescue WorkQueue::WorkQueueFull
|
rescue WorkQueue::WorkQueueFull
|
||||||
|
@ -10,7 +10,9 @@ module Scheduler
|
|||||||
@async = !Rails.env.test?
|
@async = !Rails.env.test?
|
||||||
@queue =
|
@queue =
|
||||||
WorkQueue::ThreadSafeWrapper.new(
|
WorkQueue::ThreadSafeWrapper.new(
|
||||||
WorkQueue::FairQueue.new(500) { WorkQueue::BoundedQueue.new(100) },
|
WorkQueue::FairQueue.new(:site, 500) do
|
||||||
|
WorkQueue::FairQueue.new(:user, 100) { WorkQueue::BoundedQueue.new(50) }
|
||||||
|
end,
|
||||||
)
|
)
|
||||||
|
|
||||||
@mutex = Mutex.new
|
@mutex = Mutex.new
|
||||||
@ -48,7 +50,13 @@ module Scheduler
|
|||||||
@async = val
|
@async = val
|
||||||
end
|
end
|
||||||
|
|
||||||
def later(desc = nil, db = RailsMultisite::ConnectionManagement.current_db, force: true, &blk)
|
def later(
|
||||||
|
desc = nil,
|
||||||
|
db = RailsMultisite::ConnectionManagement.current_db,
|
||||||
|
force: true,
|
||||||
|
current_user: nil,
|
||||||
|
&blk
|
||||||
|
)
|
||||||
@stats_mutex.synchronize do
|
@stats_mutex.synchronize do
|
||||||
stats = (@stats[desc] ||= { queued: 0, finished: 0, duration: 0, errors: 0 })
|
stats = (@stats[desc] ||= { queued: 0, finished: 0, duration: 0, errors: 0 })
|
||||||
stats[:queued] += 1
|
stats[:queued] += 1
|
||||||
@ -56,7 +64,7 @@ module Scheduler
|
|||||||
|
|
||||||
if @async
|
if @async
|
||||||
start_thread if !@thread&.alive? && !@paused
|
start_thread if !@thread&.alive? && !@paused
|
||||||
@queue.push({ key: db, task: [db, blk, desc] }, force: force)
|
@queue.push({ site: db, user: current_user, db: db, job: blk, desc: desc }, force: force)
|
||||||
else
|
else
|
||||||
blk.call
|
blk.call
|
||||||
end
|
end
|
||||||
@ -93,7 +101,7 @@ module Scheduler
|
|||||||
|
|
||||||
# using non_block to match Ruby #deq
|
# using non_block to match Ruby #deq
|
||||||
def do_work(non_block = false)
|
def do_work(non_block = false)
|
||||||
db, job, desc = @queue.shift(block: !non_block)[:task]
|
db, job, desc = @queue.shift(block: !non_block).values_at(:db, :job, :desc)
|
||||||
|
|
||||||
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||||
db ||= RailsMultisite::ConnectionManagement::DEFAULT
|
db ||= RailsMultisite::ConnectionManagement::DEFAULT
|
||||||
|
@ -51,15 +51,16 @@ module WorkQueue
|
|||||||
class FairQueue
|
class FairQueue
|
||||||
attr_reader :size
|
attr_reader :size
|
||||||
|
|
||||||
def initialize(limit, &blk)
|
def initialize(key, limit, &blk)
|
||||||
@limit = limit
|
@limit = limit
|
||||||
@size = 0
|
@size = 0
|
||||||
|
@key = key
|
||||||
@elements = Hash.new { |h, k| h[k] = blk.call }
|
@elements = Hash.new { |h, k| h[k] = blk.call }
|
||||||
end
|
end
|
||||||
|
|
||||||
def push(task, force:)
|
def push(task, force:)
|
||||||
raise WorkQueueFull if !force && @size >= @limit
|
raise WorkQueueFull if !force && @size >= @limit
|
||||||
key, task = task.values_at(:key, :task)
|
key = task[@key]
|
||||||
@elements[key].push(task, force: force)
|
@elements[key].push(task, force: force)
|
||||||
@size += 1
|
@size += 1
|
||||||
nil
|
nil
|
||||||
@ -72,10 +73,8 @@ module WorkQueue
|
|||||||
task = queue.shift
|
task = queue.shift
|
||||||
|
|
||||||
@elements[key] = queue unless queue.empty?
|
@elements[key] = queue unless queue.empty?
|
||||||
|
|
||||||
@size -= 1
|
@size -= 1
|
||||||
|
task
|
||||||
{ key: key, task: task }
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -112,4 +112,40 @@ RSpec.describe Scheduler::Defer do
|
|||||||
|
|
||||||
expect(s).to eq("good")
|
expect(s).to eq("good")
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe "#later" do
|
||||||
|
let!(:ivar) { Concurrent::IVar.new }
|
||||||
|
let!(:responses) { Thread::Queue.new }
|
||||||
|
|
||||||
|
def later(db, current_user, request)
|
||||||
|
@defer.later(nil, db, current_user: current_user) do
|
||||||
|
ivar.value
|
||||||
|
responses.push([db, current_user, request])
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
it "runs jobs in a fair order" do
|
||||||
|
later("site1", 1, 1)
|
||||||
|
later("site1", 1, 2)
|
||||||
|
later("site1", 2, 3)
|
||||||
|
later("site2", 3, 4)
|
||||||
|
later("site2", 4, 5)
|
||||||
|
later("site2", 4, 6)
|
||||||
|
|
||||||
|
ivar.set(nil)
|
||||||
|
|
||||||
|
result = 6.times.map { responses.shift }
|
||||||
|
|
||||||
|
expect(result).to eq(
|
||||||
|
[
|
||||||
|
["site1", 1, 1],
|
||||||
|
["site2", 3, 4],
|
||||||
|
["site1", 2, 3],
|
||||||
|
["site2", 4, 5],
|
||||||
|
["site1", 1, 2],
|
||||||
|
["site2", 4, 6],
|
||||||
|
],
|
||||||
|
)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
@ -74,7 +74,7 @@ end
|
|||||||
|
|
||||||
RSpec.describe WorkQueue::FairQueue do
|
RSpec.describe WorkQueue::FairQueue do
|
||||||
subject(:queue) do
|
subject(:queue) do
|
||||||
WorkQueue::FairQueue.new(global_limit) { WorkQueue::BoundedQueue.new(per_key_limit) }
|
WorkQueue::FairQueue.new(:key, global_limit) { WorkQueue::BoundedQueue.new(per_key_limit) }
|
||||||
end
|
end
|
||||||
|
|
||||||
let(:global_limit) { 5 }
|
let(:global_limit) { 5 }
|
||||||
|
Loading…
Reference in New Issue
Block a user