mirror of
https://github.com/discourse/discourse.git
synced 2025-02-20 11:48:26 -06:00
PERF: message_bus will be deferred by server when flooded
The message_bus performs a fair amount of work prior to hijacking requests this change ensures that if there is a situation where the server is flooded message_bus will inform client to back off for 30 seconds + random(120 secs) This back-off is ultra cheap and happens very early in the middleware. It corrects a situation where a flood to message bus could cause the app to become unresponsive MessageBus update is here to ensure message_bus gem properly respects Retry-After header and status 429. Under normal state this code should never trigger, to disable raise the value of DISCOURSE_REJECT_MESSAGE_BUS_QUEUE_SECONDS, default is to tell message bus to go away if we are queueing for 100ms or longer
This commit is contained in:
parent
b500ef77d7
commit
1f47ed1ea3
@ -178,7 +178,7 @@ GEM
|
|||||||
mini_mime (>= 0.1.1)
|
mini_mime (>= 0.1.1)
|
||||||
maxminddb (0.1.22)
|
maxminddb (0.1.22)
|
||||||
memory_profiler (0.9.13)
|
memory_profiler (0.9.13)
|
||||||
message_bus (2.2.0)
|
message_bus (2.2.2)
|
||||||
rack (>= 1.1.3)
|
rack (>= 1.1.3)
|
||||||
metaclass (0.0.4)
|
metaclass (0.0.4)
|
||||||
method_source (0.9.2)
|
method_source (0.9.2)
|
||||||
|
@ -224,6 +224,10 @@ force_anonymous_min_queue_seconds = 1
|
|||||||
# only trigger anon if we see more than N requests for this path in last 10 seconds
|
# only trigger anon if we see more than N requests for this path in last 10 seconds
|
||||||
force_anonymous_min_per_10_seconds = 3
|
force_anonymous_min_per_10_seconds = 3
|
||||||
|
|
||||||
|
# if a message bus request queues for 100ms or longer, we will reject it and ask consumer
|
||||||
|
# to back off
|
||||||
|
reject_message_bus_queue_seconds = 0.1
|
||||||
|
|
||||||
# disable search if app server is queueing for longer than this (in seconds)
|
# disable search if app server is queueing for longer than this (in seconds)
|
||||||
disable_search_queue_threshold = 1
|
disable_search_queue_threshold = 1
|
||||||
|
|
||||||
|
@ -17,6 +17,14 @@ end
|
|||||||
def setup_message_bus_env(env)
|
def setup_message_bus_env(env)
|
||||||
return if env["__mb"]
|
return if env["__mb"]
|
||||||
|
|
||||||
|
::Middleware::RequestTracker.populate_request_queue_seconds!(env)
|
||||||
|
|
||||||
|
if queue_time = env["REQUEST_QUEUE_SECONDS"]
|
||||||
|
if queue_time > (GlobalSetting.reject_message_bus_queue_seconds).to_f
|
||||||
|
raise RateLimiter::LimitExceeded, 30 + (rand * 120).to_i
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
host = RailsMultisite::ConnectionManagement.host(env)
|
host = RailsMultisite::ConnectionManagement.host(env)
|
||||||
RailsMultisite::ConnectionManagement.with_hostname(host) do
|
RailsMultisite::ConnectionManagement.with_hostname(host) do
|
||||||
extra_headers = {
|
extra_headers = {
|
||||||
|
@ -139,17 +139,23 @@ class Middleware::RequestTracker
|
|||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def self.populate_request_queue_seconds!(env)
|
||||||
|
if !env['REQUEST_QUEUE_SECONDS']
|
||||||
|
if queue_start = env['HTTP_X_REQUEST_START']
|
||||||
|
queue_start = queue_start.split("t=")[1].to_f
|
||||||
|
queue_time = (Time.now.to_f - queue_start)
|
||||||
|
env['REQUEST_QUEUE_SECONDS'] = queue_time
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def call(env)
|
def call(env)
|
||||||
result = nil
|
result = nil
|
||||||
log_request = true
|
log_request = true
|
||||||
|
|
||||||
# doing this as early as possible so we have an
|
# doing this as early as possible so we have an
|
||||||
# accurate counter
|
# accurate counter
|
||||||
if queue_start = env['HTTP_X_REQUEST_START']
|
::Middleware::RequestTracker.populate_request_queue_seconds!(env)
|
||||||
queue_start = queue_start.split("t=")[1].to_f
|
|
||||||
queue_time = (Time.now.to_f - queue_start)
|
|
||||||
env['REQUEST_QUEUE_SECONDS'] = queue_time
|
|
||||||
end
|
|
||||||
|
|
||||||
request = Rack::Request.new(env)
|
request = Rack::Request.new(env)
|
||||||
|
|
||||||
|
@ -14,6 +14,31 @@ describe 'rate limiter integration' do
|
|||||||
RateLimiter.disable
|
RateLimiter.disable
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it "will rate limit message bus requests once queueing" do
|
||||||
|
freeze_time
|
||||||
|
|
||||||
|
global_setting :reject_message_bus_queue_seconds, 0.1
|
||||||
|
|
||||||
|
post "/message-bus/#{SecureRandom.hex}/poll", headers: {
|
||||||
|
"HTTP_X_REQUEST_START" => "t=#{Time.now.to_f - 0.2}"
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(response.status).to eq(429)
|
||||||
|
expect(response.headers['Retry-After']).to be > 29
|
||||||
|
end
|
||||||
|
|
||||||
|
it "will not rate limit when all is good" do
|
||||||
|
freeze_time
|
||||||
|
|
||||||
|
global_setting :reject_message_bus_queue_seconds, 0.1
|
||||||
|
|
||||||
|
post "/message-bus/#{SecureRandom.hex}/poll", headers: {
|
||||||
|
"HTTP_X_REQUEST_START" => "t=#{Time.now.to_f - 0.05}"
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(response.status).to eq(200)
|
||||||
|
end
|
||||||
|
|
||||||
it "will clear the token cookie if invalid" do
|
it "will clear the token cookie if invalid" do
|
||||||
name = Auth::DefaultCurrentUserProvider::TOKEN_COOKIE
|
name = Auth::DefaultCurrentUserProvider::TOKEN_COOKIE
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user