FEATURE: push post rebake regular task to low priority queue

This allows us to run regular rebakes without starving the normal queue.

It additionally adds the ability to specify queue with `Jobs.enqueue` so
we can specifically queue a job with lower priority using the `queue` arg.
This commit is contained in:
Sam 2019-01-09 08:57:20 +11:00
parent cfb8e157a2
commit e08a3f719c
4 changed files with 46 additions and 18 deletions

View File

@ -197,14 +197,28 @@ module Jobs
# If we are able to queue a job, do it # If we are able to queue a job, do it
if SiteSetting.queue_jobs? if SiteSetting.queue_jobs?
if opts[:delay_for].present? hash = {
klass.perform_in(opts.delete(:delay_for), opts) 'class' => klass,
else 'args' => [opts]
Sidekiq::Client.enqueue(klass, opts) }
if delay = opts.delete(:delay_for)
if delay.to_f > 0
hash['at'] = Time.now.to_f + delay.to_f
end
end end
if queue = opts.delete(:queue)
hash['queue'] = queue
end
klass.client_push(hash)
else else
# Otherwise execute the job right away # Otherwise execute the job right away
opts.delete(:delay_for) opts.delete(:delay_for)
opts.delete(:queue)
opts[:sync_exec] = true opts[:sync_exec] = true
if Rails.env == "development" if Rails.env == "development"
Scheduler::Defer.later("job") do Scheduler::Defer.later("job") do

View File

@ -15,7 +15,7 @@ module Jobs
(@call_count % 24) == 1 (@call_count % 24) == 1
end end
def execute(args) def execute(args = nil)
# Feature topics in categories # Feature topics in categories
CategoryFeaturedTopic.feature_topics(batched: true) CategoryFeaturedTopic.feature_topics(batched: true)
@ -29,7 +29,7 @@ module Jobs
# Forces rebake of old posts where needed, as long as no system avatars need updating # Forces rebake of old posts where needed, as long as no system avatars need updating
if !SiteSetting.automatically_download_gravatars || !UserAvatar.where("last_gravatar_download_attempt IS NULL").limit(1).first if !SiteSetting.automatically_download_gravatars || !UserAvatar.where("last_gravatar_download_attempt IS NULL").limit(1).first
problems = Post.rebake_old(SiteSetting.rebake_old_posts_count) problems = Post.rebake_old(SiteSetting.rebake_old_posts_count, priority: :low)
problems.each do |hash| problems.each do |hash|
post_id = hash[:post].id post_id = hash[:post].id
Discourse.handle_job_exception(hash[:ex], error_context(args, "Rebaking post id #{post_id}", post_id: post_id)) Discourse.handle_job_exception(hash[:ex], error_context(args, "Rebaking post id #{post_id}", post_id: post_id))

View File

@ -515,7 +515,7 @@ class Post < ActiveRecord::Base
PostRevisor.new(self).revise!(updated_by, changes, opts) PostRevisor.new(self).revise!(updated_by, changes, opts)
end end
def self.rebake_old(limit) def self.rebake_old(limit, priority: :normal)
limiter = RateLimiter.new( limiter = RateLimiter.new(
nil, nil,
@ -534,7 +534,7 @@ class Post < ActiveRecord::Base
break if !limiter.can_perform? break if !limiter.can_perform?
post = Post.find(id) post = Post.find(id)
post.rebake! post.rebake!(priority: priority)
begin begin
limiter.performed! limiter.performed!
@ -560,15 +560,13 @@ class Post < ActiveRecord::Base
problems problems
end end
def rebake!(opts = nil) def rebake!(invalidate_broken_images: false, invalidate_oneboxes: false, priority: nil)
opts ||= {} new_cooked = cook(raw, topic_id: topic_id, invalidate_oneboxes: invalidate_oneboxes)
new_cooked = cook(raw, topic_id: topic_id, invalidate_oneboxes: opts.fetch(:invalidate_oneboxes, false))
old_cooked = cooked old_cooked = cooked
update_columns(cooked: new_cooked, baked_at: Time.new, baked_version: BAKED_VERSION) update_columns(cooked: new_cooked, baked_at: Time.new, baked_version: BAKED_VERSION)
if opts.fetch(:invalidate_broken_images, false) if invalidate_broken_images
custom_fields.delete(BROKEN_IMAGES) custom_fields.delete(BROKEN_IMAGES)
save_custom_fields save_custom_fields
end end
@ -578,7 +576,7 @@ class Post < ActiveRecord::Base
QuotedPost.extract_from(self) QuotedPost.extract_from(self)
# make sure we trigger the post process # make sure we trigger the post process
trigger_post_process(bypass_bump: true) trigger_post_process(bypass_bump: true, priority: priority)
publish_change_to_clients!(:rebaked) publish_change_to_clients!(:rebaked)
@ -692,7 +690,7 @@ class Post < ActiveRecord::Base
end end
# Enqueue post processing for this post # Enqueue post processing for this post
def trigger_post_process(bypass_bump: false) def trigger_post_process(bypass_bump: false, priority: :normal)
args = { args = {
post_id: id, post_id: id,
bypass_bump: bypass_bump, bypass_bump: bypass_bump,
@ -700,6 +698,11 @@ class Post < ActiveRecord::Base
args[:image_sizes] = image_sizes if image_sizes.present? args[:image_sizes] = image_sizes if image_sizes.present?
args[:invalidate_oneboxes] = true if invalidate_oneboxes.present? args[:invalidate_oneboxes] = true if invalidate_oneboxes.present?
args[:cooking_options] = self.cooking_options args[:cooking_options] = self.cooking_options
if priority == :low
args[:queue] = 'low'
end
Jobs.enqueue(:process_post, args) Jobs.enqueue(:process_post, args)
DiscourseEvent.trigger(:after_trigger_post_process, self) DiscourseEvent.trigger(:after_trigger_post_process, self)
end end

View File

@ -13,13 +13,24 @@ describe Jobs::PeriodicalUpdates do
SiteSetting.automatically_download_gravatars = false SiteSetting.automatically_download_gravatars = false
post = create_post post = create_post
post.update_columns(baked_at: Time.new(2000, 1, 1), baked_version: -1) post.update_columns(baked_at: Time.new(2000, 1, 1), baked_version: -1)
described_class.new.execute({})
Sidekiq::Testing.fake! do
Jobs::ProcessPost.jobs.clear
Jobs::PeriodicalUpdates.new.execute
jobs = Jobs::ProcessPost.jobs
expect(jobs.length).to eq(1)
expect(jobs[0]["queue"]).to eq("low")
end
post.reload post.reload
expect(post.baked_at).to be > 1.day.ago expect(post.baked_at).to be > 1.day.ago
baked = post.baked_at baked = post.baked_at
described_class.new.execute({})
# does not rebake
Jobs::PeriodicalUpdates.new.execute
post.reload post.reload
expect(post.baked_at).to eq(baked) expect(post.baked_at).to eq(baked)
end end