SECURITY: Don't allow a particular site to monopolize the defer queue

This commit is contained in:
Daniel Waterworth 2023-07-28 12:53:51 +01:00 committed by David Taylor
parent 0736611423
commit 26e267478d
No known key found for this signature in database
GPG Key ID: 46904C18B1D3F434
5 changed files with 361 additions and 8 deletions

View File

@ -23,13 +23,25 @@ module Hijack
transfer_timings = MethodProfiler.transfer
io = hijack.call
scheduled = Concurrent::Promises.resolvable_event
begin
Scheduler::Defer.later(
"hijack #{params["controller"]} #{params["action"]} #{info}",
force: false,
&scheduled.method(:resolve)
)
rescue WorkQueue::WorkQueueFull
return render plain: "", status: 503
end
# duplicate headers so other middleware does not mess with it
# on the way down the stack
original_headers = response.headers.dup
Scheduler::Defer.later("hijack #{params["controller"]} #{params["action"]} #{info}") do
io = hijack.call
scheduled.on_resolution! do
MethodProfiler.start(transfer_timings)
begin
Thread.current[Logster::Logger::LOGSTER_ENV] = env
@ -148,6 +160,7 @@ module Hijack
tempfiles&.each(&:close!)
end
end
# not leaked out, we use 418 ... I am a teapot to denote that we are hijacked
render plain: "", status: 418
else

View File

@ -8,7 +8,11 @@ module Scheduler
def initialize
@async = !Rails.env.test?
@queue = Queue.new
@queue =
WorkQueue::ThreadSafeWrapper.new(
WorkQueue::FairQueue.new(500) { WorkQueue::BoundedQueue.new(10) },
)
@mutex = Mutex.new
@stats_mutex = Mutex.new
@paused = false
@ -23,7 +27,7 @@ module Scheduler
end
def length
@queue.length
@queue.size
end
def stats
@ -44,7 +48,7 @@ module Scheduler
@async = val
end
def later(desc = nil, db = RailsMultisite::ConnectionManagement.current_db, &blk)
def later(desc = nil, db = RailsMultisite::ConnectionManagement.current_db, force: true, &blk)
@stats_mutex.synchronize do
stats = (@stats[desc] ||= { queued: 0, finished: 0, duration: 0, errors: 0 })
stats[:queued] += 1
@ -52,7 +56,7 @@ module Scheduler
if @async
start_thread if !@thread&.alive? && !@paused
@queue << [db, blk, desc]
@queue.push({ key: db, task: [db, blk, desc] }, force: force)
else
blk.call
end
@ -71,7 +75,7 @@ module Scheduler
end
def do_all_work
do_work(_non_block = true) while !@queue.empty?
do_work(non_block = true) while !@queue.empty?
end
private
@ -89,7 +93,8 @@ module Scheduler
# using non_block to match Ruby #deq
def do_work(non_block = false)
db, job, desc = @queue.deq(non_block)
db, job, desc = @queue.shift(block: !non_block)[:task]
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
db ||= RailsMultisite::ConnectionManagement::DEFAULT

111
lib/work_queue.rb Normal file
View File

@ -0,0 +1,111 @@
# frozen_string_literal: true
require "monitor"
module WorkQueue
class WorkQueueFull < StandardError
end
class ThreadSafeWrapper
include MonitorMixin
def initialize(queue)
mon_initialize
@queue = queue
@has_items = new_cond
end
def push(task, force:)
synchronize do
previously_empty = @queue.empty?
@queue.push(task, force: force)
@has_items.signal if previously_empty
end
end
def shift(block:)
synchronize do
loop do
if task = @queue.shift
break task
elsif block
@has_items.wait
else
break nil
end
end
end
end
def empty?
synchronize { @queue.empty? }
end
def size
synchronize { @queue.size }
end
end
class FairQueue
attr_reader :size
def initialize(limit, &blk)
@limit = limit
@size = 0
@elements = Hash.new { |h, k| h[k] = blk.call }
end
def push(task, force:)
raise WorkQueueFull if !force && @size >= @limit
key, task = task.values_at(:key, :task)
@elements[key].push(task, force: force)
@size += 1
nil
end
def shift
unless @elements.empty?
key, queue = @elements.shift
task = queue.shift
@elements[key] = queue unless queue.empty?
@size -= 1
{ key: key, task: task }
end
end
def empty?
@elements.empty?
end
end
class BoundedQueue
def initialize(limit)
@limit = limit
@elements = []
end
def push(task, force:)
raise WorkQueueFull if !force && @elements.size >= @limit
@elements << task
nil
end
def shift
@elements.shift
end
def empty?
@elements.empty?
end
def size
@elements.size
end
end
end

View File

@ -225,4 +225,12 @@ RSpec.describe Hijack do
expect(ran).to eq(false)
end
it "handles the queue being full" do
Scheduler::Defer.stubs(:later).raises(WorkQueue::WorkQueueFull.new)
tester.hijack_test {}
expect(tester.response.status).to eq(503)
end
end

216
spec/lib/work_queue_spec.rb Normal file
View File

@ -0,0 +1,216 @@
# frozen_string_literal: true
RSpec.describe WorkQueue::BoundedQueue do
subject(:queue) { WorkQueue::BoundedQueue.new(3) }
let(:task1) { "Task 1" }
let(:task2) { "Task 2" }
let(:task3) { "Task 3" }
let(:task4) { "Task 4" }
describe "#push" do
context "when the queue is not full" do
it "adds the task to the queue" do
queue.push(task1, force: false)
expect(queue.size).to eq(1)
end
end
context "when the queue is full" do
before do
queue.push(task1, force: false)
queue.push(task2, force: false)
queue.push(task3, force: false)
end
it "adds the task to the queue if force parameter is true" do
expect { queue.push(task4, force: true) }.not_to raise_error
expect(queue.size).to eq(4)
end
it "raises an error if the force parameter is false" do
expect { queue.push(task4, force: false) }.to raise_error(WorkQueue::WorkQueueFull)
end
end
end
describe "#shift" do
it "removes and returns the first task from the queue" do
queue.push(task1, force: false)
queue.push(task2, force: false)
expect(queue.shift).to eq(task1)
expect(queue.shift).to eq(task2)
expect(queue.size).to eq(0)
expect(queue).to be_empty
end
it "returns nil when the queue is empty" do
shifted_task = queue.shift
expect(shifted_task).to be_nil
end
end
describe "#empty?" do
it "returns true if the queue is empty" do
expect(queue).to be_empty
end
it "returns false if the queue is not empty" do
queue.push(task1, force: false)
expect(queue).not_to be_empty
end
end
describe "#size" do
it "returns the number of tasks in the queue" do
queue.push(task1, force: false)
queue.push(task2, force: false)
expect(queue.size).to eq(2)
end
end
end
RSpec.describe WorkQueue::FairQueue do
subject(:queue) do
WorkQueue::FairQueue.new(global_limit) { WorkQueue::BoundedQueue.new(per_key_limit) }
end
let(:global_limit) { 5 }
let(:per_key_limit) { 3 }
let(:key1) { :key1 }
let(:key2) { :key2 }
let(:key3) { :key3 }
let(:task1) { "task1" }
let(:task2) { "task2" }
let(:task3) { "task3" }
let(:task4) { "task4" }
let(:task5) { "task5" }
let(:task6) { "task6" }
describe "#push" do
context "when no previous tasks exist for the key" do
it "adds the task to the queue" do
queue.push({ key: key1, task: task1 }, force: false)
expect(queue.size).to eq(1)
end
end
context "when the global limit is reached" do
before do
queue.push({ key: key1, task: task1 }, force: false)
queue.push({ key: key2, task: task2 }, force: false)
queue.push({ key: key3, task: task3 }, force: false)
queue.push({ key: key1, task: task4 }, force: false)
queue.push({ key: key2, task: task5 }, force: false)
end
it "raises an error if the force parameter is false" do
expect { queue.push({ key: key3, task: task6 }, force: false) }.to raise_error(
WorkQueue::WorkQueueFull,
)
end
it "adds the task to the queue if the force parameter is true" do
queue.push({ key: key3, task: task6 }, force: true)
expect(queue.size).to eq(6)
end
end
end
describe "#shift" do
it "removes and returns tasks in FIFO order when the keys are different" do
queue.push({ key: key1, task: task1 }, force: false)
queue.push({ key: key2, task: task2 }, force: false)
queue.push({ key: key3, task: task3 }, force: false)
expect(queue.shift).to eq({ key: key1, task: task1 })
expect(queue.shift).to eq({ key: key2, task: task2 })
expect(queue.shift).to eq({ key: key3, task: task3 })
expect(queue.size).to eq(0)
expect(queue).to be_empty
end
it "removes and returns tasks in FIFO order by key when the keys are the same" do
queue.push({ key: key1, task: task1 }, force: false)
queue.push({ key: key1, task: task3 }, force: false)
queue.push({ key: key2, task: task2 }, force: false)
queue.push({ key: key2, task: task4 }, force: false)
expect(queue.shift).to eq({ key: key1, task: task1 })
expect(queue.shift).to eq({ key: key2, task: task2 })
expect(queue.shift).to eq({ key: key1, task: task3 })
expect(queue.shift).to eq({ key: key2, task: task4 })
expect(queue.size).to eq(0)
expect(queue).to be_empty
end
it "returns nil when the queue is empty" do
shifted_task = queue.shift
expect(shifted_task).to be_nil
end
end
describe "#empty?" do
it "returns true if the queue is empty" do
expect(queue).to be_empty
end
it "returns false if the queue is not empty" do
queue.push({ key: key1, task: task1 }, force: false)
expect(queue).not_to be_empty
end
end
describe "#size" do
it "returns the number of tasks in the queue" do
queue.push({ key: key1, task: task1 }, force: false)
queue.push({ key: key2, task: task2 }, force: false)
expect(queue.size).to eq(2)
end
end
end
RSpec.describe WorkQueue::ThreadSafeWrapper do
subject(:queue) { WorkQueue::ThreadSafeWrapper.new(WorkQueue::BoundedQueue.new(3)) }
let(:task) { "task1" }
describe "#push" do
it "delegates the push operation to the inner queue" do
queue.push(task, force: false)
expect(queue).not_to be_empty
end
end
describe "#shift" do
context "when block is true" do
it "waits until an item is available and then returns it" do
result = nil
thread = Thread.new { result = queue.shift(block: true) }
expect(thread).to be_alive
queue.push(task, force: false)
thread.join
expect(result).to eq(task)
end
end
context "when block is false" do
it "returns nil immediately if no item is available" do
shifted_task = queue.shift(block: false)
expect(shifted_task).to be_nil
end
it "returns the first available item if one is present" do
queue.push(task, force: false)
shifted_task = queue.shift(block: false)
expect(shifted_task).to eq(task)
end
end
end
end