Move job files so they live underneath app/ and not in lib/
Introduce a new setting, email_always, which forces emails to be sent to users regardless of their presence on the site
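Note that this diff shows only the files removed from lib/; the email_always behaviour itself lands in the relocated copies under app/jobs/ and is not visible below. As a rough, hypothetical sketch (not the actual patch), the setting would bypass the "seen recently" skip in the user email job, along these lines:

  # Hypothetical sketch: SiteSetting.email_always? is the new boolean setting.
  # When it is enabled, the job no longer skips users who were seen recently.
  seen_recently = user.last_seen_at.present? &&
                  user.last_seen_at > SiteSetting.email_time_window_mins.minutes.ago
  seen_recently = false if SiteSetting.email_always?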
lib/jobs.rb | 218 lines removed
@@ -1,218 +0,0 @@
module Jobs

  def self.queued
    Sidekiq::Stats.new.enqueued
  end

  def self.last_job_performed_at
    Sidekiq.redis do |r|
      int = r.get('last_job_perform_at')
      int ? Time.at(int.to_i) : nil
    end
  end

  def self.num_email_retry_jobs
    Sidekiq::RetrySet.new.select { |job| job.klass =~ /Email$/ }.size
  end

  class Base

    class Instrumenter

      def self.stats
        Thread.current[:db_stats] ||= Stats.new
      end

      class Stats
        attr_accessor :query_count, :duration_ms

        def initialize
          @query_count = 0
          @duration_ms = 0
        end
      end

      def call(name, start, finish, message_id, values)
        stats = Instrumenter.stats
        stats.query_count += 1
        stats.duration_ms += (((finish - start).to_f) * 1000).to_i
      end
    end

    include Sidekiq::Worker

    def initialize
      @db_duration = 0
    end

    def log(*args)
      puts args
      args.each do |arg|
        Rails.logger.info "#{Time.now.to_formatted_s(:db)}: [#{self.class.name.upcase}] #{arg}"
      end
      true
    end

    def self.delayed_perform(opts={})
      self.new.perform(opts)
    end

    def execute(opts={})
      raise "Overwrite me!"
    end

    def last_db_duration
      @db_duration || 0
    end

    def ensure_db_instrumented
      @@instrumented ||= begin
        ActiveSupport::Notifications.subscribe('sql.active_record', Instrumenter.new)
        true
      end
    end

    def perform(*args)
      ensure_db_instrumented
      opts = args.extract_options!.with_indifferent_access

      if SiteSetting.queue_jobs?
        Sidekiq.redis do |r|
          r.set('last_job_perform_at', Time.now.to_i)
        end
      end

      if opts.delete(:sync_exec)
        if opts.has_key?(:current_site_id) && opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db
          raise ArgumentError.new("You can't connect to another database when executing a job synchronously.")
        else
          return execute(opts)
        end
      end

      dbs =
        if opts[:current_site_id]
          [opts[:current_site_id]]
        else
          RailsMultisite::ConnectionManagement.all_dbs
        end

      total_db_time = 0
      dbs.each do |db|
        begin
          thread_exception = nil
          # NOTE: This looks odd, in fact it looks crazy but there is a reason
          # A bug in therubyracer means that under certain conditions running in a fiber
          # can cause the whole v8 context to corrupt so much that it will hang sidekiq
          #
          # If you are brave and want to try to fix this either in celluloid or therubyracer, the repro is:
          #
          # 1. Create a big Discourse db: (you can start from script/profile_db_generator.rb)
          # 2. Queue a ton of jobs, eg: User.pluck(:id).each{|id| Jobs.enqueue(:user_email, type: :digest, user_id: id)};
          # 3. Run sidekiq
          #
          # The issue only happens in Ruby 2.0 for some reason, you start getting V8::Error with no context
          #
          # See: https://github.com/cowboyd/therubyracer/issues/206
          #
          # The restricted stack space of fibers opens a bunch of risks up, by avoiding them altogether
          # we can mitigate giving up a very marginal amount of throughput
          #
          # Ideally we could just tell sidekiq to avoid fibers

          t = Thread.new do
            begin
              RailsMultisite::ConnectionManagement.establish_connection(db: db)
              I18n.locale = SiteSetting.default_locale
              execute(opts)
            rescue => e
              thread_exception = e
            ensure
              ActiveRecord::Base.connection_handler.clear_active_connections!
              total_db_time += Instrumenter.stats.duration_ms
            end
          end
          t.join

          raise thread_exception if thread_exception
        end
      end

    ensure
      ActiveRecord::Base.connection_handler.clear_active_connections!
      @db_duration = total_db_time
    end

  end

  class Scheduled < Base
    include Sidetiq::Schedulable
  end

  def self.enqueue(job_name, opts={})
    klass = "Jobs::#{job_name.to_s.camelcase}".constantize

    # Unless we want to work on all sites
    unless opts.delete(:all_sites)
      opts[:current_site_id] ||= RailsMultisite::ConnectionManagement.current_db
    end

    # If we are able to queue a job, do it
    if SiteSetting.queue_jobs?
      if opts[:delay_for].present?
        klass.delay_for(opts.delete(:delay_for)).delayed_perform(opts)
      else
        Sidekiq::Client.enqueue(klass, opts)
      end
    else
      # Otherwise execute the job right away
      opts.delete(:delay_for)
      opts[:sync_exec] = true
      klass.new.perform(opts)
    end

  end

  def self.enqueue_in(secs, job_name, opts={})
    enqueue(job_name, opts.merge!(delay_for: secs))
  end

  def self.enqueue_at(datetime, job_name, opts={})
    enqueue_in( [(datetime - Time.zone.now).to_i, 0].max, job_name, opts )
  end

  def self.cancel_scheduled_job(job_name, params={})
    jobs = scheduled_for(job_name, params)
    return false if jobs.empty?
    jobs.each { |job| job.delete }
    true
  end

  def self.scheduled_for(job_name, params={})
    job_class = "Jobs::#{job_name.to_s.camelcase}"
    Sidekiq::ScheduledSet.new.select do |scheduled_job|
      if scheduled_job.klass == 'Sidekiq::Extensions::DelayedClass'
        job_args = YAML.load(scheduled_job.args[0])
        job_args_class, _, (job_args_params, *) = job_args
        if job_args_class.to_s == job_class && job_args_params
          matched = true
          params.each do |key, value|
            unless job_args_params[key] == value
              matched = false
              break
            end
          end
          matched
        else
          false
        end
      else
        false
      end
    end
  end
end

# Require all jobs
Dir["#{Rails.root}/lib/jobs/*"].each {|file| require_dependency file }
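For context, the queueing API defined in lib/jobs.rb above is driven from application code roughly like this. This is an illustrative sketch, not code from this commit; the job names and argument keys come from the jobs' execute methods shown later in this diff:

  # Enqueue a digest email for a user on the current site; it runs when a Sidekiq
  # worker picks it up, or synchronously when SiteSetting.queue_jobs is disabled.
  Jobs.enqueue(:user_email, type: :digest, user_id: user.id)

  # Schedule the auto-close job to run in five minutes via Sidekiq's delay_for.
  Jobs.enqueue_in(5.minutes, :close_topic, topic_id: topic.id, user_id: topic.user_id)

  # Remove any matching delayed job from Sidekiq's scheduled set.
  Jobs.cancel_scheduled_job(:close_topic, topic_id: topic.id)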
@@ -1,12 +0,0 @@
module Jobs

  class CategoryStats < Jobs::Scheduled
    recurrence { daily.hour_of_day(4) }

    def execute(args)
      Category.update_stats
    end

  end

end
@@ -1,15 +0,0 @@
module Jobs
  class CloseTopic < Jobs::Base

    def execute(args)
      topic = Topic.where(id: args[:topic_id]).first
      if topic and topic.auto_close_at and !topic.closed? and !topic.deleted_at
        closer = User.where(id: args[:user_id]).first
        if Guardian.new(closer).can_moderate?(topic)
          topic.update_status('autoclosed', true, closer)
        end
      end
    end

  end
end
@@ -1,16 +0,0 @@
module Jobs
  class DashboardStats < Jobs::Scheduled
    recurrence { hourly.minute_of_hour(0,30) }

    def execute(args)
      stats_json = AdminDashboardData.fetch_stats.as_json

      # Add some extra time to the expiry so that the next job run has plenty of time to
      # finish before previous cached value expires.
      $redis.setex AdminDashboardData.stats_cache_key, (AdminDashboardData.recalculate_interval + 5).minutes, stats_json.to_json

      stats_json
    end

  end
end
@@ -1,10 +0,0 @@
module Jobs
  # various consistency checks
  class DestroyOldDeletionStubs < Jobs::Scheduled
    recurrence { hourly.minute_of_hour(0, 30) }

    def execute(args)
      PostDestroyer.destroy_stubs
    end
  end
end
@@ -1,30 +0,0 @@
require_dependency 'avatar_detector'

module Jobs

  class DetectAvatars < Jobs::Scheduled
    recurrence { daily.hour_of_day(8) }

    def execute(args)
      return unless SiteSetting.detect_custom_avatars?

      # Find a random sampling of users of trust level 1 or higher who don't have a custom avatar.
      user_stats = UserStat.where('user_stats.has_custom_avatar = false AND users.trust_level > 0')
                           .includes(:user)
                           .order("random()")
                           .limit(SiteSetting.max_daily_gravatar_crawls)

      if user_stats.present?
        user_stats.each do |us|
          us.update_column(:has_custom_avatar, true) if AvatarDetector.new(us.user).has_custom_avatar?
          UserHistory.create!(
            action: UserHistory.actions[:checked_for_custom_avatar],
            target_user_id: us.user_id
          )
        end
      end
    end

  end

end
@@ -1,30 +0,0 @@
module Jobs

  # A daily job that will enqueue digest emails to be sent to users
  class EnqueueDigestEmails < Jobs::Scheduled
    recurrence { daily.hour_of_day(6) }

    def execute(args)
      target_user_ids.each do |user_id|
        Jobs.enqueue(:user_email, type: :digest, user_id: user_id)
      end
    end

    def target_user_ids
      # Users who want to receive emails and haven't been emailed in the last day
      query = User.real
                  .where(email_digests: true, active: true)
                  .where("COALESCE(last_emailed_at, '2010-01-01') <= CURRENT_TIMESTAMP - ('1 DAY'::INTERVAL * digest_after_days)")
                  .where("COALESCE(last_seen_at, '2010-01-01') <= CURRENT_TIMESTAMP - ('1 DAY'::INTERVAL * digest_after_days)")

      # If the site requires approval, make sure the user is approved
      if SiteSetting.must_approve_users?
        query = query.where("approved OR moderator OR admin")
      end

      query.pluck(:id)
    end

  end

end
@@ -1,14 +0,0 @@
module Jobs
  # various consistency checks
  class EnsureDbConsistency < Jobs::Scheduled
    recurrence { daily.hour_of_day(2) }

    def execute(args)
      TopicUser.ensure_consistency!
      UserVisit.ensure_consistency!
      Group.refresh_automatic_groups!
      Notification.ensure_consistency!
      UserAction.ensure_consistency!
    end
  end
end
@@ -1,115 +0,0 @@
require_dependency 'export/json_encoder'
require_dependency 'export/export'
require_dependency 'import/import'

module Jobs

  class Exporter < Jobs::Base

    sidekiq_options retry: false

    def execute(args)
      raise Import::ImportInProgressError if Import::is_import_running?
      raise Export::ExportInProgressError if Export::is_export_running?

      @format = args[:format] || :json

      @output_base_filename = File.absolute_path( args[:filename] || File.join( Rails.root, 'tmp', "export-#{Time.now.strftime('%Y-%m-%d-%H%M%S')}" ) )
      @output_base_filename = @output_base_filename[0...-3] if @output_base_filename[-3..-1] == '.gz'
      @output_base_filename = @output_base_filename[0...-4] if @output_base_filename[-4..-1] == '.tar'

      @user = args[:user_id] ? User.where(id: args[:user_id].to_i).first : nil

      start_export
      @encoder.write_schema_info( source: 'discourse', version: Export.current_schema_version )
      ordered_models_for_export.each do |model|
        log " #{model.table_name}"
        column_info = model.columns
        order_col = column_info.map(&:name).find {|x| x == 'id'} || order_columns_for(model)
        @encoder.write_table(model.table_name, column_info) do |num_rows_written|
          if order_col
            model.connection.select_rows("select * from #{model.table_name} order by #{order_col} limit #{batch_size} offset #{num_rows_written}")
          else
            # Take the rows in the order the database returns them
            log "WARNING: no order by clause is being used for #{model.name} (#{model.table_name}). Please update Jobs::Exporter order_columns_for for #{model.name}."
            model.connection.select_rows("select * from #{model.table_name} limit #{batch_size} offset #{num_rows_written}")
          end
        end
      end
      "#{@output_base_filename}.tar.gz"
    ensure
      finish_export
    end

    def ordered_models_for_export
      Export.models_included_in_export
    end

    def order_columns_for(model)
      @order_columns_for_hash ||= {
        'CategoryFeaturedTopic' => 'category_id, topic_id',
        'CategorySearchData' => 'category_id',
        'PostOneboxRender' => 'post_id, onebox_render_id',
        'PostReply' => 'post_id, reply_id',
        'PostSearchData' => 'post_id',
        'PostTiming' => 'topic_id, post_number, user_id',
        'SiteContent' => 'content_type',
        'TopicUser' => 'topic_id, user_id',
        'UserSearchData' => 'user_id',
        'View' => 'parent_id, parent_type, ip_address, viewed_at'
      }
      @order_columns_for_hash[model.name]
    end

    def batch_size
      1000
    end

    def start_export
      if @format == :json
        @encoder = Export::JsonEncoder.new
      else
        raise Export::FormatInvalidError
      end
      Export.set_export_started
      Discourse.enable_maintenance_mode
    end

    def finish_export
      if @encoder
        @encoder.finish
        create_tar_file
        @encoder.remove_tmp_directory('export')
      end
    ensure
      Export.set_export_is_not_running
      Discourse.disable_maintenance_mode
      send_notification
    end

    def create_tar_file
      filenames = @encoder.filenames

      FileUtils.cd( File.dirname(filenames.first) ) do
        `tar cvf #{@output_base_filename}.tar #{File.basename(filenames.first)}`
      end

      FileUtils.cd( File.join(Rails.root, 'public') ) do
        Upload.find_each do |upload|
          `tar rvf #{@output_base_filename}.tar #{upload.url[1..-1]}` unless upload.url[0,4] == 'http'
        end
      end

      `gzip #{@output_base_filename}.tar`

      true
    end

    def send_notification
      SystemMessage.new(@user).create('export_succeeded') if @user
      true
    end

  end

end
@@ -1,27 +0,0 @@
module Jobs

  class FeatureTopicUsers < Jobs::Base

    def execute(args)
      topic_id = args[:topic_id]
      raise Discourse::InvalidParameters.new(:topic_id) unless topic_id.present?

      topic = Topic.where(id: topic_id).first

      # there are 3 cases here
      # 1. topic was atomically nuked, this should be skipped
      # 2. topic was deleted, this should be skipped
      # 3. error an incorrect topic_id was sent

      unless topic.present?
        max_id = Topic.with_deleted.maximum(:id).to_i
        raise Discourse::InvalidParameters.new(:topic_id) if max_id < topic_id
        return
      end

      topic.feature_topic_users(args)
    end

  end

end
@@ -1,48 +0,0 @@
require "image_sorcery"

module Jobs

  class GenerateAvatars < Jobs::Base

    def execute(args)
      upload = Upload.where(id: args[:upload_id]).first
      return unless upload.present?

      external_copy = Discourse.store.download(upload) if Discourse.store.external?
      original_path = if Discourse.store.external?
        external_copy.path
      else
        Discourse.store.path_for(upload)
      end

      # we'll extract the first frame when it's a gif
      source = original_path
      source << "[0]" unless SiteSetting.allow_animated_avatars

      [120, 45, 32, 25, 20].each do |s|
        # handle retina too
        [s, s * 2].each do |size|
          # create a temp file with the same extension as the original
          temp_file = Tempfile.new(["discourse-avatar", File.extname(original_path)])
          temp_path = temp_file.path
          # create a centered square thumbnail
          if ImageSorcery.new(source).convert(temp_path, gravity: "center", thumbnail: "#{size}x#{size}^", extent: "#{size}x#{size}", background: "transparent")
            Discourse.store.store_avatar(temp_file, upload, size)
          end
          # close && remove temp file
          temp_file.close!
        end
      end

      # make sure we remove the cached copy from external stores
      external_copy.close! if Discourse.store.external?

      user = User.where(id: upload.user_id).first
      user.uploaded_avatar_template = Discourse.store.absolute_avatar_template(upload)
      user.save!

    end

  end

end
@@ -1,275 +0,0 @@
require_dependency 'import/json_decoder'
require_dependency 'import/import'
require_dependency 'import/adapter/base'
require_dependency 'directory_helper'

(Dir.entries(File.join( Rails.root, 'lib', 'import', 'adapter' )) - ['.', '..', 'base.rb']).each do |f|
  require_dependency "import/adapter/#{f}"
end

module Jobs

  class Importer < Jobs::Base

    include DirectoryHelper
    sidekiq_options retry: false

    BACKUP_SCHEMA = 'backup'

    def initialize
      @index_definitions = {}
      @format = :json
      @warnings = []
    end

    def execute(args)
      ordered_models_for_import.each { |model| model.primary_key } # a HACK to workaround cache problems

      raise Import::ImportDisabledError unless SiteSetting.allow_import?
      raise Import::ImportInProgressError if Import::is_import_running?
      raise Export::ExportInProgressError if Export::is_export_running?

      # Disable printing of NOTICE, DETAIL and other unimportant messages from postgresql
      User.exec_sql("SET client_min_messages TO WARNING")

      @format = args[:format] || :json
      @archive_filename = args[:filename]
      if args[:user_id]
        # After the import is done, we'll need to reload the user record and make sure it's the same person
        # before sending a notification
        user = User.where(id: args[:user_id].to_i).first
        @user_info = { user_id: user.id, email: user.email }
      else
        @user_info = nil
      end

      start_import
      backup_tables
      begin
        load_data
        create_indexes
        extract_uploads
      rescue
        log "Performing a ROLLBACK because something went wrong!"
        rollback
        raise
      end
    ensure
      finish_import
    end

    def ordered_models_for_import
      Export.models_included_in_export
    end

    def start_import
      if @format != :json
        raise Import::FormatInvalidError
      elsif @archive_filename.nil?
        raise Import::FilenameMissingError
      else
        extract_files
        @decoder = Import::JsonDecoder.new( File.join(tmp_directory('import'), 'tables.json') )
        Import.set_import_started
        Discourse.enable_maintenance_mode
      end
      self
    end

    def extract_files
      FileUtils.cd( tmp_directory('import') ) do
        `tar xvzf #{@archive_filename} tables.json`
      end
    end

    def backup_tables
      log " Backing up tables"
      ActiveRecord::Base.transaction do
        create_backup_schema
        ordered_models_for_import.each do |model|
          backup_and_setup_table( model )
        end
      end
      self
    end

    def create_backup_schema
      User.exec_sql("DROP SCHEMA IF EXISTS #{BACKUP_SCHEMA} CASCADE")
      User.exec_sql("CREATE SCHEMA #{BACKUP_SCHEMA}")
      self
    end

    def backup_and_setup_table( model )
      log " #{model.table_name}"
      @index_definitions[model.table_name] = model.exec_sql("SELECT indexdef FROM pg_indexes WHERE tablename = '#{model.table_name}' and schemaname = 'public';").map { |x| x['indexdef'] }
      model.exec_sql("ALTER TABLE #{model.table_name} SET SCHEMA #{BACKUP_SCHEMA}")
      model.exec_sql("CREATE TABLE #{model.table_name} (LIKE #{BACKUP_SCHEMA}.#{model.table_name} INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING COMMENTS INCLUDING STORAGE);")
      self
    end

    def load_data
      log " Importing data"
      @decoder.start(
        callbacks: {
          schema_info: method(:set_schema_info),
          table_data: method(:load_table)
        }
      )
      self
    end

    def set_schema_info(arg)
      if arg[:source] && arg[:source].downcase == 'discourse'
        if arg[:version] && arg[:version] <= Export.current_schema_version
          @export_schema_version = arg[:version]
          if arg[:table_count] == ordered_models_for_import.size
            true
          else
            raise Import::WrongTableCountError.new("Expected to find #{ordered_models_for_import.size} tables, but export file has #{arg[:table_count]} tables!")
          end
        elsif arg[:version].nil?
          raise ArgumentError.new("The schema version must be provided.")
        else
          raise Import::UnsupportedSchemaVersion.new("Export file is from a newer version of Discourse. Upgrade and run migrations to import this file.")
        end
      else
        raise Import::UnsupportedExportSource
      end
    end

    def load_table(table_name, fields_arg, row_data, row_count)
      fields = fields_arg.dup
      model = Export::models_included_in_export.find { |m| m.table_name == table_name }
      if model

        @adapters ||= Import.adapters_for_version( @export_schema_version )

        log " #{table_name}: #{row_count} rows"

        if @adapters[table_name]
          @adapters[table_name].each do |adapter|
            fields = adapter.apply_to_column_names(table_name, fields)
          end
        end

        if fields.size > model.columns.size
          raise Import::WrongFieldCountError.new("Table #{table_name} is expected to have #{model.columns.size} fields, but got #{fields.size}! Maybe your Discourse server is older than the server that this export file comes from?")
        end

        # If there are fewer fields in the data than the model has, then insert only those fields and
        # hope that the table uses default values or allows null for the missing columns.
        # If the table doesn't have defaults or is not nullable, then a migration adapter should have been created
        # along with the migration.

        column_info = model.columns

        col_num = -1
        rows = row_data.map do |row|
          if @adapters[table_name]
            @adapters[table_name].each do |adapter|
              row = adapter.apply_to_row(table_name, row)
            end
          end
          row
        end.transpose.map do |col_values|
          col_num += 1
          case column_info[col_num].type
          when :boolean
            col_values.map { |v| v.nil? ? nil : (v == 'f' ? false : true) }
          else
            col_values
          end
        end.transpose

        parameter_markers = fields.map {|x| "?"}.join(',')
        sql_stmt = "INSERT INTO #{table_name} (#{fields.join(',')}) VALUES (#{parameter_markers})"

        rows.each do |row|
          User.exec_sql(sql_stmt, *row)
        end

        true
      else
        add_warning "Export file contained an unrecognized table named: #{table_name}! It was ignored."
      end
    end

    def create_indexes
      log " Creating indexes"
      ordered_models_for_import.each do |model|
        log " #{model.table_name}"
        @index_definitions[model.table_name].each do |indexdef|
          model.exec_sql( indexdef )
        end

        # The indexdef statements don't create the primary keys, so we need to find the primary key and do it ourselves.
        pkey_index_def = @index_definitions[model.table_name].find { |ixdef| ixdef =~ / ([\S]{1,}_pkey) / }
        if pkey_index_def && pkey_index_name = / ([\S]{1,}_pkey) /.match(pkey_index_def)[1]
          model.exec_sql( "ALTER TABLE ONLY #{model.table_name} ADD PRIMARY KEY USING INDEX #{pkey_index_name}" )
        end

        if model.columns.map(&:name).include?('id')
          max_id = model.exec_sql("SELECT MAX(id) AS max FROM #{model.table_name}")[0]['max'].to_i + 1
          seq_name = "#{model.table_name}_id_seq"
          model.exec_sql("CREATE SEQUENCE #{seq_name} START WITH #{max_id} INCREMENT BY 1 NO MINVALUE NO MAXVALUE CACHE 1")
          model.exec_sql("ALTER SEQUENCE #{seq_name} OWNED BY #{model.table_name}.id")
          model.exec_sql("ALTER TABLE #{model.table_name} ALTER COLUMN id SET DEFAULT nextval('#{seq_name}')")
        end
      end
      self
    end

    def extract_uploads
      if `tar tf #{@archive_filename} | grep "uploads/"`.present?
        FileUtils.cd( File.join(Rails.root, 'public') ) do
          `tar -xz --keep-newer-files -f #{@archive_filename} uploads/`
        end
      end
    end

    def rollback
      ordered_models_for_import.each do |model|
        log " #{model.table_name}"
        model.exec_sql("DROP TABLE IF EXISTS #{model.table_name}") rescue nil
        begin
          model.exec_sql("ALTER TABLE #{BACKUP_SCHEMA}.#{model.table_name} SET SCHEMA public")
        rescue => e
          log " Failed to restore. #{e.class.name}: #{e.message}"
        end
      end
    end

    def finish_import
      Import.set_import_is_not_running
      Discourse.disable_maintenance_mode
      remove_tmp_directory('import')

      if @warnings.size > 0
        log "WARNINGS:"
        @warnings.each do |message|
          log " #{message}"
        end
      end

      # send_notification
    end

    def send_notification
      # Doesn't work. "WARNING: Can't mass-assign protected attributes: created_at"
      # Still a problem with the activerecord schema_cache I think.
      # if @user_info && @user_info[:user_id]
      #   user = User.where(id: @user_info[:user_id]).first
      #   if user && user.email == @user_info[:email]
      #     SystemMessage.new(user).create('import_succeeded')
      #   end
      # end
      true
    end

    def add_warning(message)
      @warnings << message
    end

  end

end
@@ -1,18 +0,0 @@
require_dependency 'email/sender'

module Jobs

  # Asynchronously send an email
  class InviteEmail < Jobs::Base

    def execute(args)
      raise Discourse::InvalidParameters.new(:invite_id) unless args[:invite_id].present?

      invite = Invite.where(id: args[:invite_id]).first
      message = InviteMailer.send_invite(invite)
      Email::Sender.new(message, :invite).send
    end

  end

end
@@ -1,31 +0,0 @@
module Jobs

  class NotifyMovedPosts < Jobs::Base

    def execute(args)
      raise Discourse::InvalidParameters.new(:post_ids) if args[:post_ids].blank?
      raise Discourse::InvalidParameters.new(:moved_by_id) if args[:moved_by_id].blank?

      # Make sure we don't notify the same user twice (in case multiple posts were moved at once.)
      users_notified = Set.new
      posts = Post.where(id: args[:post_ids]).where('user_id <> ?', args[:moved_by_id]).includes(:user, :topic)
      if posts.present?
        moved_by = User.where(id: args[:moved_by_id]).first

        posts.each do |p|
          unless users_notified.include?(p.user_id)
            p.user.notifications.create(notification_type: Notification.types[:moved_post],
                                        topic_id: p.topic_id,
                                        post_number: p.post_number,
                                        data: {topic_title: p.topic.title,
                                               display_username: moved_by.username}.to_json)
            users_notified << p.user_id
          end
        end
      end

    end

  end

end
@@ -1,20 +0,0 @@
require_dependency 'admin_user_index_query'

module Jobs

  class PendingUsersReminder < Jobs::Scheduled

    recurrence { daily.hour_of_day(9) }

    def execute(args)
      if SiteSetting.must_approve_users
        count = AdminUserIndexQuery.new({query: 'pending'}).find_users_query.count
        if count > 0
          GroupMessage.create(Group[:moderators].name, :pending_users_reminder, {limit_once_per: false, message_params: {count: count}})
        end
      end
    end

  end

end
@@ -1,32 +0,0 @@
require_dependency 'score_calculator'

module Jobs

  # This job will run on a regular basis to update statistics and denormalized data.
  # If it does not run, the site will not function properly.
  class PeriodicalUpdates < Jobs::Scheduled
    recurrence { hourly.minute_of_hour(3, 18, 33, 48) }

    def execute(args)

      # Update the average times
      Post.calculate_avg_time
      Topic.calculate_avg_time

      # Feature topics in categories
      CategoryFeaturedTopic.feature_topics

      # Update view counts for users
      User.update_view_counts

      # Update the scores of posts
      ScoreCalculator.new.calculate

      # Refresh Hot Topics
      HotTopic.refresh!

    end

  end

end
@@ -1,35 +0,0 @@
#
# Connects to a mailbox and checks for replies
#
require 'net/pop'
require_dependency 'email/receiver'

module Jobs
  class PollMailbox < Jobs::Scheduled
    recurrence { minutely }
    sidekiq_options retry: false

    def execute(args)
      if SiteSetting.pop3s_polling_enabled?
        poll_pop3s
      end
    end

    def poll_pop3s
      Net::POP3.enable_ssl(OpenSSL::SSL::VERIFY_NONE)
      Net::POP3.start(SiteSetting.pop3s_polling_host,
                      SiteSetting.pop3s_polling_port,
                      SiteSetting.pop3s_polling_username,
                      SiteSetting.pop3s_polling_password) do |pop|
        unless pop.mails.empty?
          pop.each do |mail|
            if Email::Receiver.new(mail.pop).process == Email::Receiver.results[:processed]
              mail.delete
            end
          end
        end
      end
    end

  end
end
@@ -1,26 +0,0 @@
require 'image_sizer'
require_dependency 'cooked_post_processor'

module Jobs

  class ProcessPost < Jobs::Base

    def execute(args)
      post = Post.where(id: args[:post_id]).first
      # two levels of deletion
      return unless post.present? && post.topic.present?

      if args[:cook].present?
        post.update_column(:cooked, post.cook(post.raw, topic_id: post.topic_id))
      end

      cp = CookedPostProcessor.new(post, args)
      cp.post_process

      # If we changed the document, save it
      post.update_column(:cooked, cp.html) if cp.dirty?
    end

  end

end
@@ -1,21 +0,0 @@
require 'image_sizer'
require_dependency 'system_message'

module Jobs

  class SendSystemMessage < Jobs::Base

    def execute(args)
      raise Discourse::InvalidParameters.new(:user_id) unless args[:user_id].present?
      raise Discourse::InvalidParameters.new(:message_type) unless args[:message_type].present?

      user = User.where(id: args[:user_id]).first
      return if user.blank?

      system_message = SystemMessage.new(user)
      system_message.create(args[:message_type])
    end

  end

end
@@ -1,18 +0,0 @@
require_dependency 'email/sender'

module Jobs

  # Asynchronously send an email
  class TestEmail < Jobs::Base

    def execute(args)

      raise Discourse::InvalidParameters.new(:to_address) unless args[:to_address].present?

      message = TestMailer.send_test(args[:to_address])
      Email::Sender.new(message, :test_message).send
    end

  end

end
@@ -1,76 +0,0 @@
require_dependency 'email/sender'

module Jobs

  # Asynchronously send an email to a user
  class UserEmail < Jobs::Base

    def execute(args)

      # Required parameters
      raise Discourse::InvalidParameters.new(:user_id) unless args[:user_id].present?
      raise Discourse::InvalidParameters.new(:type) unless args[:type].present?

      # Find the user
      user = User.where(id: args[:user_id]).first
      return if !user || user.is_banned?

      seen_recently = (user.last_seen_at.present? && user.last_seen_at > SiteSetting.email_time_window_mins.minutes.ago)

      email_args = {}

      if args[:post_id]

        # Don't email a user about a post when we've seen them recently.
        return if seen_recently

        post = Post.where(id: args[:post_id]).first
        return unless post.present?

        email_args[:post] = post
      end

      email_args[:email_token] = args[:email_token] if args[:email_token].present?

      notification = nil
      notification = Notification.where(id: args[:notification_id]).first if args[:notification_id].present?
      if notification.present?

        # Don't email a user about a post when we've seen them recently.
        return if seen_recently

        # Load the post if present
        email_args[:post] ||= notification.post
        email_args[:notification] = notification

        # Don't send email if the notification this email is about has already been read
        return if notification.read?
      end

      return if skip_email_for_post(email_args[:post], user)

      # Make sure that mailer exists
      raise Discourse::InvalidParameters.new(:type) unless UserNotifications.respond_to?(args[:type])

      message = UserNotifications.send(args[:type], user, email_args)
      # Update the to address if we have a custom one
      if args[:to_address].present?
        message.to = [args[:to_address]]
      end

      Email::Sender.new(message, args[:type], user).send
    end

    private

    # If this email has a related post, don't send an email if it's been deleted or seen recently.
    def skip_email_for_post(post, user)
      post &&
      (post.topic.blank? ||
       post.user_deleted? ||
       PostTiming.where(topic_id: post.topic_id, post_number: post.post_number, user_id: user.id).present?)
    end

  end

end
@@ -1,31 +0,0 @@
require_dependency 'discourse_hub'
require_dependency 'discourse_updates'

module Jobs
  class VersionCheck < Jobs::Scheduled
    recurrence { daily }

    def execute(args)
      if SiteSetting.version_checks? and (DiscourseUpdates.updated_at.nil? or DiscourseUpdates.updated_at < 1.minute.ago)
        begin
          should_send_email = (SiteSetting.new_version_emails and DiscourseUpdates.missing_versions_count and DiscourseUpdates.missing_versions_count == 0)

          json = DiscourseHub.discourse_version_check
          DiscourseUpdates.latest_version = json['latestVersion']
          DiscourseUpdates.critical_updates_available = json['criticalUpdates']
          DiscourseUpdates.missing_versions_count = json['missingVersionsCount']
          DiscourseUpdates.updated_at = Time.zone.now

          if should_send_email and json['missingVersionsCount'] > 0
            message = VersionMailer.send_notice
            Email::Sender.new(message, :new_version).send
          end
        rescue => e
          raise e unless Rails.env == 'development' # Fail version check silently in development mode
        end
      end
      true
    end

  end
end