mirror of
https://github.com/discourse/discourse.git
synced 2024-11-21 08:34:17 -06:00
DEV: Refactor uploads_importer
script (#29292)
* DEV: Implement uploads command entrypoint - Set up Thor UploadsCommand for CLI - First pass at modularizing various parts of the existing `uploads_import` script * DEV: First attempt at modularizing missing uploads fixer task Move missing upload fix to a dedicated uploads task implementation unit * DEV: First attempt at modularizing missing uploads uploader task Move uploader to a dedicated uploads task implementation unit * DEV: First attempt at modularizing missing uploads optimizer task Move optimizer to a dedicated uploads task implementation unit * DEV: Various follow-up fixes to get optimization working - Start threads early - Improve "log" message formatting - Add missing `copy_to_tempfile` method on "uploader" task * DEV: Refactor a bit more Deduplicate and move most of threading primitives to base task as-is * DEV: Remove redundant condition in uploads db migration * DEV: More deduplication Move task retry logic to base class and tidy up other implementation details carried over from the existing script
This commit is contained in:
parent
4b8b7b2fb5
commit
6c91148db8
1
migrations/.gitignore
vendored
1
migrations/.gitignore
vendored
@ -4,3 +4,4 @@
|
||||
tmp/*
|
||||
private/
|
||||
Gemfile.lock
|
||||
/config/upload.yml
|
@ -19,5 +19,5 @@ module Migrations
|
||||
end
|
||||
end
|
||||
|
||||
CommandLineInterface.start
|
||||
Dir.chdir(Rails.root) { CommandLineInterface.start } # rubocop:disable Discourse/NoChdir
|
||||
end
|
||||
|
50
migrations/config/upload.yml.sample
Normal file
50
migrations/config/upload.yml.sample
Normal file
@ -0,0 +1,50 @@
|
||||
source_db_path: "/path/to/your/db.sqlite3"
|
||||
output_db_path: "/path/to/your/uploads.sqlite3"
|
||||
|
||||
root_paths:
|
||||
- "/path/to/your/files"
|
||||
- "/path/to/more/files"
|
||||
|
||||
# Files that are downloaded from URLs are cached in this directory.
|
||||
download_cache_path: "/path/to/downloaded/files"
|
||||
|
||||
# The number of threads to use for processing uploads is calculated as:
|
||||
# thread_count = [number of cores] * [thread_count_factor]
|
||||
# The thread count will be doubled if uploads are stored on S3 because there's a higher latency.
|
||||
thread_count_factor: 1.5
|
||||
|
||||
# Delete uploads from the output database that are not found in the source database.
|
||||
delete_surplus_uploads: false
|
||||
|
||||
# Delete uploads from the output database that do not have a Discourse upload record.
|
||||
delete_missing_uploads: false
|
||||
|
||||
# Check if files are missing in the upload store and update the database accordingly.
|
||||
# Set to false and re-run the script afterwards if you want to create new uploads for missing files.
|
||||
fix_missing: false
|
||||
|
||||
# Create optimized images for post uploads and avatars.
|
||||
create_optimized_images: false
|
||||
|
||||
site_settings:
|
||||
authorized_extensions: "*"
|
||||
max_attachment_size_kb: 102_400
|
||||
max_image_size_kb: 102_400
|
||||
|
||||
enable_s3_uploads: true
|
||||
s3_upload_bucket: "your-bucket-name"
|
||||
s3_region: "your-region"
|
||||
s3_access_key_id: "your-access-key-id"
|
||||
s3_secret_access_key: "your-secret-access-key"
|
||||
s3_cdn_url: "https://your-cdn-url.com"
|
||||
|
||||
# Set this to true if the site is a multisite and configure the `multisite_db_name` accordingly
|
||||
multisite: false
|
||||
multisite_db_name: "default"
|
||||
|
||||
# Sometimes a file can be found at one of many locations. Here's a list of transformations that can
|
||||
# be applied to the path to try and find the file. The first transformation that results in a file
|
||||
# being found will be used.
|
||||
path_replacements:
|
||||
# - ["/foo/", "/bar"]
|
||||
# - ["/foo/", "/bar/baz/"]
|
@ -0,0 +1,18 @@
|
||||
CREATE TABLE uploads
|
||||
(
|
||||
id TEXT PRIMARY KEY NOT NULL,
|
||||
upload JSON_TEXT,
|
||||
markdown TEXT,
|
||||
skip_reason TEXT
|
||||
);
|
||||
|
||||
CREATE TABLE optimized_images
|
||||
(
|
||||
id TEXT PRIMARY KEY NOT NULL,
|
||||
optimized_images JSON_TEXT
|
||||
);
|
||||
|
||||
CREATE TABLE downloads (
|
||||
id TEXT PRIMARY KEY NOT NULL,
|
||||
original_filename TEXT NOT NULL
|
||||
);
|
@ -3,9 +3,46 @@
|
||||
module Migrations::CLI::UploadCommand
|
||||
def self.included(thor)
|
||||
thor.class_eval do
|
||||
desc "upload", "Upload a file"
|
||||
desc "upload", "Upload media uploads"
|
||||
option :settings,
|
||||
type: :string,
|
||||
desc: "Uploads settings file path",
|
||||
default: "./migrations/config/upload.yml",
|
||||
aliases: "-s",
|
||||
banner: "path"
|
||||
option :fix_missing, type: :boolean, desc: "Fix missing uploads"
|
||||
option :optimize, type: :boolean, desc: "Optimize uploads"
|
||||
def upload
|
||||
puts "Uploading..."
|
||||
puts "Starting uploads..."
|
||||
|
||||
validate_settings_file!
|
||||
settings = load_settings
|
||||
|
||||
::Migrations::Uploader::Uploads.perform!(settings)
|
||||
|
||||
puts ""
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def load_settings
|
||||
settings = ::Migrations::SettingsParser.parse!(options.settings)
|
||||
merge_settings_from_cli_args!(settings)
|
||||
|
||||
settings
|
||||
end
|
||||
|
||||
# Overlays command-line flags onto the settings loaded from the YAML file.
#
# A flag overrides the file value only when it was actually given. The check
# is against nil rather than `present?`, because `false.present?` is false and
# an explicit `false` from the command line would otherwise be discarded,
# making it impossible to turn off an option the file enables.
#
# @param settings [#[]=] settings object, updated in place
def merge_settings_from_cli_args!(settings)
  settings[:fix_missing] = options.fix_missing unless options.fix_missing.nil?
  settings[:create_optimized_images] = options.optimize unless options.optimize.nil?
end
|
||||
|
||||
def validate_settings_file!
|
||||
path = options.settings
|
||||
|
||||
if !File.exist?(path)
|
||||
raise ::Migrations::NoSettingsFound, "Settings file not found: #{path}"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -10,6 +10,9 @@ require "zeitwerk"
|
||||
require_relative "converters"
|
||||
|
||||
module Migrations
|
||||
class NoSettingsFound < StandardError
|
||||
end
|
||||
|
||||
def self.root_path
|
||||
@root_path ||= File.expand_path("..", __dir__)
|
||||
end
|
||||
|
81
migrations/lib/settings_parser.rb
Normal file
81
migrations/lib/settings_parser.rb
Normal file
@ -0,0 +1,81 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Migrations
|
||||
class SettingsParser
|
||||
class InvalidYaml < StandardError
|
||||
end
|
||||
class ValidationError < StandardError
|
||||
end
|
||||
|
||||
REQUIRED_KEYS = %i[source_db_path output_db_path root_paths]
|
||||
|
||||
def initialize(options)
|
||||
@options = options
|
||||
|
||||
validate!
|
||||
end
|
||||
|
||||
def [](key)
|
||||
@options[key]
|
||||
end
|
||||
|
||||
def []=(key, value)
|
||||
@options[key] = value
|
||||
end
|
||||
|
||||
def fetch(key, default)
|
||||
@options.fetch(key, default)
|
||||
end
|
||||
|
||||
def self.parse!(path)
|
||||
new(YAML.load_file(path, symbolize_names: true))
|
||||
rescue Psych::SyntaxError => e
|
||||
raise InvalidYaml.new(e.message)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def validate!
|
||||
validate_required_keys
|
||||
validate_paths
|
||||
validate_options
|
||||
end
|
||||
|
||||
# Ensures the parsed settings are a mapping that contains every key listed in
# REQUIRED_KEYS.
#
# An empty (or scalar-only) YAML document does not parse to a Hash; that case
# is reported as a ValidationError instead of failing with NoMethodError when
# `keys` is called on it.
#
# @raise [ValidationError] if the settings are not a Hash or keys are missing
def validate_required_keys
  unless @options.is_a?(Hash)
    raise ValidationError, "Settings must be a mapping of keys to values"
  end

  missing = REQUIRED_KEYS - @options.keys

  raise ValidationError, "Missing required keys: #{missing.join(", ")}" if missing.any?
end
|
||||
|
||||
def validate_paths
|
||||
%i[source_db_path output_db_path].each do |key|
|
||||
path = @options[key]
|
||||
|
||||
next unless path
|
||||
|
||||
dir = File.dirname(path)
|
||||
raise ValidationError, "Directory not writable: #{dir}" unless File.writable?(dir)
|
||||
end
|
||||
|
||||
if !@options[:root_paths].is_a?(Array)
|
||||
raise ValidationError, "Root paths must be an array of paths"
|
||||
end
|
||||
|
||||
@options[:root_paths].each do |path|
|
||||
raise ValidationError, "Directory not readable: #{path}" unless File.readable?(path)
|
||||
end
|
||||
end
|
||||
|
||||
def validate_options
|
||||
validate_thread_count_factor if @options[:thread_count_factor]
|
||||
end
|
||||
|
||||
def validate_thread_count_factor
|
||||
count = @options[:thread_count_factor]
|
||||
|
||||
unless count.is_a?(Numeric) && count.positive?
|
||||
raise ValidationError, "Thread count factor must be numeric and positive"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
80
migrations/lib/uploader/site_settings.rb
Normal file
80
migrations/lib/uploader/site_settings.rb
Normal file
@ -0,0 +1,80 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Migrations::Uploader
|
||||
class SiteSettings
|
||||
class S3UploadsConfigurationError < StandardError
|
||||
end
|
||||
|
||||
def initialize(options)
|
||||
@options = options
|
||||
end
|
||||
|
||||
def configure!
|
||||
configure_basic_uploads
|
||||
configure_multisite if @options[:multisite]
|
||||
configure_s3 if @options[:enable_s3_uploads]
|
||||
end
|
||||
|
||||
def self.configure!(options)
|
||||
new(options).configure!
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def configure_basic_uploads
|
||||
SiteSetting.clean_up_uploads = false
|
||||
SiteSetting.authorized_extensions = @options[:authorized_extensions]
|
||||
SiteSetting.max_attachment_size_kb = @options[:max_attachment_size_kb]
|
||||
SiteSetting.max_image_size_kb = @options[:max_image_size_kb]
|
||||
end
|
||||
|
||||
def configure_multisite
|
||||
# rubocop:disable Discourse/NoDirectMultisiteManipulation
|
||||
Rails.configuration.multisite = true
|
||||
# rubocop:enable Discourse/NoDirectMultisiteManipulation
|
||||
|
||||
RailsMultisite::ConnectionManagement.class_eval do
|
||||
def self.current_db_override=(value)
|
||||
@current_db_override = value
|
||||
end
|
||||
def self.current_db
|
||||
@current_db_override
|
||||
end
|
||||
end
|
||||
|
||||
RailsMultisite::ConnectionManagement.current_db_override = @options[:multisite_db_name]
|
||||
end
|
||||
|
||||
def configure_s3
|
||||
SiteSetting.s3_access_key_id = @options[:s3_access_key_id]
|
||||
SiteSetting.s3_secret_access_key = @options[:s3_secret_access_key]
|
||||
SiteSetting.s3_upload_bucket = @options[:s3_upload_bucket]
|
||||
SiteSetting.s3_region = @options[:s3_region]
|
||||
SiteSetting.s3_cdn_url = @options[:s3_cdn_url]
|
||||
SiteSetting.enable_s3_uploads = true
|
||||
|
||||
if SiteSetting.enable_s3_uploads != true
|
||||
raise S3UploadsConfigurationError, "Failed to enable S3 uploads"
|
||||
end
|
||||
|
||||
verify_s3_uploads_configuration!
|
||||
end
|
||||
|
||||
def verify_s3_uploads_configuration!
|
||||
Tempfile.open("discourse-s3-test") do |tmpfile|
|
||||
tmpfile.write("test")
|
||||
tmpfile.rewind
|
||||
|
||||
upload =
|
||||
UploadCreator.new(tmpfile, "discourse-s3-test.txt").create_for(Discourse::SYSTEM_USER_ID)
|
||||
|
||||
unless upload.present? && upload.persisted? && upload.errors.blank? &&
|
||||
upload.url.start_with?("//")
|
||||
raise S3UploadsConfigurationError, "Failed to upload to S3"
|
||||
end
|
||||
|
||||
upload.destroy
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
140
migrations/lib/uploader/tasks/base.rb
Normal file
140
migrations/lib/uploader/tasks/base.rb
Normal file
@ -0,0 +1,140 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require "etc"
|
||||
require "colored2"
|
||||
|
||||
module Migrations::Uploader
|
||||
module Tasks
|
||||
class Base
|
||||
class NotImplementedError < StandardError
|
||||
end
|
||||
|
||||
TRANSACTION_SIZE = 1000
|
||||
QUEUE_SIZE = 1000
|
||||
DEFAULT_THREAD_FACTOR = 1.5
|
||||
|
||||
attr_reader :uploads_db,
|
||||
:intermediate_db,
|
||||
:settings,
|
||||
:work_queue,
|
||||
:status_queue,
|
||||
:discourse_store,
|
||||
:error_count,
|
||||
:current_count,
|
||||
:missing_count,
|
||||
:skipped_count
|
||||
|
||||
def initialize(databases, settings)
|
||||
@uploads_db = databases[:uploads_db]
|
||||
@intermediate_db = databases[:intermediate_db]
|
||||
|
||||
@settings = settings
|
||||
|
||||
@work_queue = SizedQueue.new(QUEUE_SIZE)
|
||||
@status_queue = SizedQueue.new(QUEUE_SIZE)
|
||||
@discourse_store = Discourse.store
|
||||
|
||||
@error_count = 0
|
||||
@current_count = 0
|
||||
@missing_count = 0
|
||||
@skipped_count = 0
|
||||
end
|
||||
|
||||
def run!
|
||||
raise NotImplementedError
|
||||
end
|
||||
|
||||
def self.run!(databases, settings)
|
||||
new(databases, settings).run!
|
||||
end
|
||||
|
||||
protected
|
||||
|
||||
def handle_status_update
|
||||
raise NotImplementedError
|
||||
end
|
||||
|
||||
def enqueue_jobs
|
||||
raise NotImplementedError
|
||||
end
|
||||
|
||||
def instantiate_task_resource
|
||||
{}
|
||||
end
|
||||
|
||||
def start_status_thread
|
||||
Thread.new do
|
||||
while !(result = status_queue.pop).nil?
|
||||
handle_status_update(result)
|
||||
log_status
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def start_consumer_threads
|
||||
thread_count.times.map { |index| consumer_thread(index) }
|
||||
end
|
||||
|
||||
def consumer_thread(index)
|
||||
Thread.new do
|
||||
Thread.current.name = "worker-#{index}"
|
||||
resource = instantiate_task_resource
|
||||
|
||||
while (row = work_queue.pop)
|
||||
process_upload(row, resource)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def start_producer_thread
|
||||
Thread.new { enqueue_jobs }
|
||||
end
|
||||
|
||||
def thread_count
|
||||
@thread_count ||= calculate_thread_count
|
||||
end
|
||||
|
||||
def add_multisite_prefix(path)
|
||||
return path if !Rails.configuration.multisite
|
||||
|
||||
File.join("uploads", RailsMultisite::ConnectionManagement.current_db, path)
|
||||
end
|
||||
|
||||
def file_exists?(path)
|
||||
if discourse_store.external?
|
||||
discourse_store.object_from_path(path).exists?
|
||||
else
|
||||
File.exist?(File.join(discourse_store.public_dir, path))
|
||||
end
|
||||
end
|
||||
|
||||
def with_retries(max: 3)
|
||||
count = 0
|
||||
|
||||
loop do
|
||||
result = yield
|
||||
break result if result
|
||||
|
||||
count += 1
|
||||
break nil if count >= max
|
||||
|
||||
sleep(calculate_backoff(count))
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def calculate_backoff(retry_count)
|
||||
0.25 * retry_count
|
||||
end
|
||||
|
||||
# Computes the number of consumer threads:
#   cores * thread_count_factor * (2 when the store is external, else 1)
#
# The factor comes from the settings and falls back to DEFAULT_THREAD_FACTOR.
# The result is truncated to an integer but never drops below 1: a small
# factor on a machine with few cores would otherwise yield zero consumers,
# so nothing would drain the work queue.
#
# @return [Integer] thread count, always >= 1
def calculate_thread_count
  base = Etc.nprocessors
  thread_count_factor = settings.fetch(:thread_count_factor, DEFAULT_THREAD_FACTOR)
  store_factor = discourse_store.external? ? 2 : 1

  [(base * thread_count_factor * store_factor).to_i, 1].max
end
|
||||
end
|
||||
end
|
||||
end
|
84
migrations/lib/uploader/tasks/fixer.rb
Normal file
84
migrations/lib/uploader/tasks/fixer.rb
Normal file
@ -0,0 +1,84 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Migrations::Uploader
|
||||
module Tasks
|
||||
class Fixer < Base
|
||||
def run!
|
||||
return if max_count.zero?
|
||||
|
||||
puts "Fixing missing uploads..."
|
||||
|
||||
status_thread = start_status_thread
|
||||
consumer_threads = start_consumer_threads
|
||||
producer_thread = start_producer_thread
|
||||
|
||||
producer_thread.join
|
||||
work_queue.close
|
||||
consumer_threads.each(&:join)
|
||||
status_queue.close
|
||||
status_thread.join
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def max_count
|
||||
@max_count ||=
|
||||
uploads_db.db.query_single_splat("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
|
||||
end
|
||||
|
||||
def enqueue_jobs
|
||||
uploads_db
|
||||
.db
|
||||
.query(
|
||||
"SELECT id, upload FROM uploads WHERE upload IS NOT NULL ORDER BY rowid DESC",
|
||||
) { |row| work_queue << row }
|
||||
end
|
||||
|
||||
def instantiate_task_resource
|
||||
OpenStruct.new(url: "")
|
||||
end
|
||||
|
||||
def handle_status_update(result)
|
||||
@current_count += 1
|
||||
|
||||
case result[:status]
|
||||
when :ok
|
||||
# ignore
|
||||
when :error
|
||||
@error_count += 1
|
||||
puts " Error in #{result[:id]}"
|
||||
when :missing
|
||||
@missing_count += 1
|
||||
puts " Missing #{result[:id]}"
|
||||
|
||||
uploads_db.db.execute("DELETE FROM uploads WHERE id = ?", result[:id])
|
||||
Upload.delete_by(id: result[:upload_id])
|
||||
end
|
||||
end
|
||||
|
||||
def process_upload(row, fake_upload)
|
||||
upload = JSON.parse(row[:upload], symbolize_names: true)
|
||||
fake_upload.url = upload[:url]
|
||||
path = add_multisite_prefix(discourse_store.get_path_for_upload(fake_upload))
|
||||
|
||||
status = file_exists?(path) ? :ok : :missing
|
||||
|
||||
update_status_queue(row, upload, status)
|
||||
rescue StandardError => error
|
||||
puts error.message
|
||||
status = :error
|
||||
update_status_queue(row, upload, status)
|
||||
end
|
||||
|
||||
# Pushes the outcome for one row onto the status queue.
#
# `upload` may be nil: the rescue path of `process_upload` calls this when
# parsing the stored upload JSON failed, so the upload id is looked up
# defensively instead of raising a second error inside the worker thread.
#
# @param row [Hash] row read from the uploads table (uses `:id`)
# @param upload [Hash, nil] parsed upload attributes, if available
# @param status [Symbol] status to report for the row
def update_status_queue(row, upload, status)
  upload_id = upload ? upload[:id] : nil

  status_queue << { id: row[:id], upload_id: upload_id, status: status }
end
|
||||
|
||||
def log_status
|
||||
error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
|
||||
print "\r%7d / %7d (%s, %s missing)" %
|
||||
[current_count, max_count, error_count_text, missing_count]
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
244
migrations/lib/uploader/tasks/optimizer.rb
Normal file
244
migrations/lib/uploader/tasks/optimizer.rb
Normal file
@ -0,0 +1,244 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Migrations::Uploader
|
||||
module Tasks
|
||||
class Optimizer < Base
|
||||
def initialize(databases, settings)
|
||||
super(databases, settings)
|
||||
|
||||
initialize_existing_ids_tracking_sets
|
||||
initialize_discourse_resources
|
||||
@max_count = 0
|
||||
end
|
||||
|
||||
def run!
|
||||
puts "", "Creating optimized images..."
|
||||
|
||||
disable_optimized_image_lock
|
||||
|
||||
start_tracking_sets_loader_threads.each(&:join)
|
||||
status_thread = start_status_thread
|
||||
consumer_threads = start_consumer_threads
|
||||
producer_thread = start_producer_thread
|
||||
|
||||
producer_thread.join
|
||||
work_queue.close
|
||||
consumer_threads.each(&:join)
|
||||
status_queue.close
|
||||
status_thread.join
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def initialize_existing_ids_tracking_sets
|
||||
@optimized_upload_ids = Set.new
|
||||
@post_upload_ids = Set.new
|
||||
@avatar_upload_ids = Set.new
|
||||
end
|
||||
|
||||
def initialize_discourse_resources
|
||||
@avatar_sizes = Discourse.avatar_sizes
|
||||
@system_user = Discourse.system_user
|
||||
@category_id = Category.last.id
|
||||
end
|
||||
|
||||
def disable_optimized_image_lock
|
||||
# allow more than 1 thread to optimize images at the same time
|
||||
OptimizedImage.lock_per_machine = false
|
||||
end
|
||||
|
||||
def start_tracking_sets_loader_threads
|
||||
[
|
||||
start_optimized_upload_ids_loader_thread,
|
||||
start_post_upload_ids_loader_thread,
|
||||
start_avatar_upload_ids_loader_thread,
|
||||
start_max_count_loader_thread,
|
||||
]
|
||||
end
|
||||
|
||||
def handle_status_update(params)
|
||||
@current_count += 1
|
||||
|
||||
case params.delete(:status)
|
||||
when :ok
|
||||
uploads_db.insert(<<~SQL, params)
|
||||
INSERT INTO optimized_images (id, optimized_images)
|
||||
VALUES (:id, :optimized_images)
|
||||
SQL
|
||||
when :error
|
||||
@error_count += 1
|
||||
when :skipped
|
||||
@skipped_count += 1
|
||||
end
|
||||
end
|
||||
|
||||
def start_optimized_upload_ids_loader_thread
|
||||
Thread.new do
|
||||
@uploads_db
|
||||
.db
|
||||
.query("SELECT id FROM optimized_images") { |row| @optimized_upload_ids << row[:id] }
|
||||
end
|
||||
end
|
||||
|
||||
def start_post_upload_ids_loader_thread
|
||||
Thread.new do
|
||||
sql = <<~SQL
|
||||
SELECT upload_ids
|
||||
FROM posts
|
||||
WHERE upload_ids IS NOT NULL
|
||||
SQL
|
||||
|
||||
@intermediate_db
|
||||
.db
|
||||
.query(sql) { |row| JSON.parse(row[:upload_ids]).each { |id| @post_upload_ids << id } }
|
||||
end
|
||||
end
|
||||
|
||||
def start_avatar_upload_ids_loader_thread
|
||||
Thread.new do
|
||||
sql = <<~SQL
|
||||
SELECT avatar_upload_id
|
||||
FROM users
|
||||
WHERE avatar_upload_id IS NOT NULL
|
||||
SQL
|
||||
|
||||
@intermediate_db.db.query(sql) { |row| @avatar_upload_ids << row[:avatar_upload_id] }
|
||||
end
|
||||
end
|
||||
|
||||
def start_max_count_loader_thread
|
||||
Thread.new do
|
||||
@max_count =
|
||||
@uploads_db.db.query_single_splat(
|
||||
"SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL",
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
def enqueue_jobs
|
||||
sql = <<~SQL
|
||||
SELECT id AS upload_id, upload ->> 'sha1' AS upload_sha1, markdown
|
||||
FROM uploads
|
||||
WHERE upload IS NOT NULL
|
||||
ORDER BY rowid
|
||||
SQL
|
||||
|
||||
@uploads_db
|
||||
.db
|
||||
.query(sql) do |row|
|
||||
upload_id = row[:upload_id]
|
||||
|
||||
if @optimized_upload_ids.include?(upload_id) || !row[:markdown].start_with?("![")
|
||||
status_queue << { id: row[:upload_id], status: :skipped }
|
||||
next
|
||||
end
|
||||
|
||||
if @post_upload_ids.include?(upload_id)
|
||||
row[:type] = "post"
|
||||
work_queue << row
|
||||
elsif @avatar_upload_ids.include?(upload_id)
|
||||
row[:type] = "avatar"
|
||||
work_queue << row
|
||||
else
|
||||
status_queue << { id: row[:upload_id], status: :skipped }
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def start_consumer_threads
|
||||
Jobs.run_immediately!
|
||||
|
||||
super
|
||||
end
|
||||
|
||||
def log_status
|
||||
error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
|
||||
|
||||
print "\r%7d / %7d (%s, %d skipped)" %
|
||||
[current_count, @max_count, error_count_text, skipped_count]
|
||||
end
|
||||
|
||||
def instantiate_task_resource
|
||||
PostCreator.new(
|
||||
@system_user,
|
||||
raw: "Topic created by uploads_importer",
|
||||
acting_user: @system_user,
|
||||
skip_validations: true,
|
||||
title: "Topic created by uploads_importer - #{SecureRandom.hex}",
|
||||
archetype: Archetype.default,
|
||||
category: @category_id,
|
||||
).create!
|
||||
end
|
||||
|
||||
def process_upload(row, post)
|
||||
result = with_retries { attempt_optimization(row, post) }
|
||||
status_queue << (result || { id: row[:upload_id], status: :error })
|
||||
end
|
||||
|
||||
def attempt_optimization(row, post)
|
||||
upload = Upload.find_by(sha1: row[:upload_sha1])
|
||||
optimized_images = create_optimized_images(row[:type], row[:markdown], upload, post)
|
||||
|
||||
return if optimized_images.blank?
|
||||
|
||||
processed_optimized_images = process_optimized_images(optimized_images)
|
||||
|
||||
if images_valid?(processed_optimized_images)
|
||||
{
|
||||
id: row[:upload_id],
|
||||
optimized_images: serialize_optimized_images(processed_optimized_images),
|
||||
status: :ok,
|
||||
}
|
||||
end
|
||||
end
|
||||
|
||||
def images_valid?(images)
|
||||
!images.nil? && images.all?(&:present?) && images.all?(&:persisted?) &&
|
||||
images.all? { |o| o.errors.blank? }
|
||||
end
|
||||
|
||||
def serialize_optimized_images(images)
|
||||
images.presence&.to_json(only: OptimizedImage.column_names)
|
||||
end
|
||||
|
||||
# Creates the optimized images for one upload.
#
# For "post" uploads the given post's raw is replaced with the upload markdown
# and the post is rebaked; the optimized images recorded for the upload are
# then returned. For "avatar" uploads one optimized image is created per
# configured avatar size. Any other type yields nil.
#
# Errors are logged and swallowed so the caller can retry. The backtrace is
# printed via `Exception#backtrace`; there is no `stacktrace` method, and
# calling it raised NoMethodError from inside this handler.
#
# @param type [String] "post" or "avatar"
# @param markdown [String] upload markdown used as the post's raw
# @param upload [Upload] upload record to optimize
# @param post [Post] scratch post used for rebaking
# @return [Array, nil] optimized images, or nil on error / unknown type
def create_optimized_images(type, markdown, upload, post)
  case type
  when "post"
    post.update_columns(baked_at: nil, cooked: "", raw: markdown)
    post.reload
    post.rebake!

    OptimizedImage.where(upload_id: upload.id).to_a
  when "avatar"
    @avatar_sizes.map { |size| OptimizedImage.create_for(upload, size, size) }
  end
rescue StandardError => e
  puts e.message
  puts e.backtrace

  nil
end
|
||||
|
||||
def process_optimized_images(images)
|
||||
begin
|
||||
images.map! do |image|
|
||||
next if image.blank?
|
||||
|
||||
image_path = add_multisite_prefix(discourse_store.get_path_for_optimized_image(image))
|
||||
|
||||
unless file_exists?(image_path)
|
||||
image.destroy
|
||||
image = nil
|
||||
end
|
||||
|
||||
image
|
||||
end
|
||||
rescue StandardError
|
||||
images = nil
|
||||
end
|
||||
|
||||
images
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
327
migrations/lib/uploader/tasks/uploader.rb
Normal file
327
migrations/lib/uploader/tasks/uploader.rb
Normal file
@ -0,0 +1,327 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Migrations::Uploader
|
||||
module Tasks
|
||||
class Uploader < Base
|
||||
MAX_FILE_SIZE = 1.gigabyte
|
||||
|
||||
UploadMetadata = Struct.new(:original_filename, :origin_url, :description)
|
||||
|
||||
def run!
|
||||
puts "", "Uploading uploads..."
|
||||
|
||||
process_existing_uploads
|
||||
|
||||
status_thread = start_status_thread
|
||||
consumer_threads = start_consumer_threads
|
||||
producer_thread = start_producer_thread
|
||||
|
||||
producer_thread.join
|
||||
work_queue.close
|
||||
consumer_threads.each(&:join)
|
||||
status_queue.close
|
||||
status_thread.join
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def process_existing_uploads
|
||||
delete_missing_uploads if settings[:delete_missing_uploads]
|
||||
initialize_existing_ids_tracking_sets
|
||||
handle_surplus_uploads if surplus_upload_ids.any?
|
||||
|
||||
@max_count = (@source_existing_ids - @output_existing_ids).size
|
||||
@source_existing_ids = nil
|
||||
|
||||
puts "Found #{@output_existing_ids.size} existing uploads. #{@max_count} are missing."
|
||||
end
|
||||
|
||||
def initialize_existing_ids_tracking_sets
|
||||
@output_existing_ids = load_existing_ids(uploads_db.db, Set.new)
|
||||
@source_existing_ids = load_existing_ids(intermediate_db.db, Set.new)
|
||||
end
|
||||
|
||||
def load_existing_ids(db, set)
|
||||
db.query("SELECT id FROM uploads") { |row| set << row[:id] }
|
||||
|
||||
set
|
||||
end
|
||||
|
||||
def handle_surplus_uploads
|
||||
if settings[:delete_surplus_uploads]
|
||||
puts "Deleting #{surplus_upload_ids.size} uploads from output database..."
|
||||
|
||||
surplus_upload_ids.each_slice(TRANSACTION_SIZE) do |ids|
|
||||
placeholders = (["?"] * ids.size).join(",")
|
||||
uploads_db.db.execute(<<~SQL, ids)
|
||||
DELETE FROM uploads
|
||||
WHERE id IN (#{placeholders})
|
||||
SQL
|
||||
end
|
||||
|
||||
@output_existing_ids -= surplus_upload_ids
|
||||
else
|
||||
puts "Found #{surplus_upload_ids.size} surplus uploads in output database. " \
|
||||
"Run with `delete_surplus_uploads: true` to delete them."
|
||||
end
|
||||
|
||||
@surplus_upload_ids = nil
|
||||
end
|
||||
|
||||
def surplus_upload_ids
|
||||
@surplus_upload_ids ||= @output_existing_ids - @source_existing_ids
|
||||
end
|
||||
|
||||
# Records the outcome of one processed upload in the uploads database and
# updates the progress counters.
#
# `:skipped` and `:error` are control keys: they are removed from `params`
# when inspected, and the remaining keys are bound to the INSERT statement.
# A result without an upload that was not skipped counts as an error.
# Failures while inserting are logged and counted rather than raised.
#
# @param params [Hash] status hash produced by a worker
def handle_status_update(params)
  @current_count += 1

  begin
    if params.delete(:skipped) == true
      @skipped_count += 1
    elsif (error_message = params.delete(:error)) || params[:upload].nil?
      @error_count += 1
      puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
    end

    uploads_db.insert(<<~SQL, params)
      INSERT INTO uploads (id, upload, markdown, skip_reason)
      VALUES (:id, :upload, :markdown, :skip_reason)
    SQL
  rescue StandardError => e
    # The closing parenthesis was previously duplicated in this message.
    puts "", "Failed to insert upload: #{params[:id]} (#{e.message})", ""
    @error_count += 1
  end
end
|
||||
|
||||
def enqueue_jobs
|
||||
intermediate_db
|
||||
.db
|
||||
.query("SELECT * FROM uploads ORDER BY id") do |row|
|
||||
work_queue << row if @output_existing_ids.exclude?(row[:id])
|
||||
end
|
||||
end
|
||||
|
||||
# Locates the file for an upload row on disk.
#
# Each configured root path is tried in order: first with the row's relative
# path as-is, then with each `path_replacements` pair applied to the relative
# path. The first existing file wins.
#
# `path_replacements` is optional. A YAML key with only commented-out entries
# parses to nil, so it is coerced with `Array()` instead of calling `each` on
# nil.
#
# @param row [Hash] upload row (uses `:relative_path` and `:filename`)
# @return [String, nil] path of the first match, or nil if none exists
def find_file_in_paths(row)
  relative_path = row[:relative_path] || ""
  replacements = Array(settings[:path_replacements])

  settings[:root_paths].each do |root_path|
    path = File.join(root_path, relative_path, row[:filename])

    return path if File.exist?(path)

    replacements.each do |from, to|
      path = File.join(root_path, relative_path.sub(from, to), row[:filename])

      return path if File.exist?(path)
    end
  end

  nil
end
|
||||
|
||||
def handle_missing_file(row)
|
||||
status_queue << { id: row[:id], upload: nil, skipped: true, skip_reason: "file not found" }
|
||||
end
|
||||
|
||||
def process_upload(row, _)
|
||||
metadata = build_metadata(row)
|
||||
data_file = nil
|
||||
path = nil
|
||||
|
||||
if row[:data].present?
|
||||
data_file = Tempfile.new("discourse-upload", binmode: true)
|
||||
data_file.write(row[:data])
|
||||
data_file.rewind
|
||||
path = data_file.path
|
||||
elsif row[:url].present?
|
||||
path, metadata.original_filename = download_file(url: row[:url], id: row[:id])
|
||||
metadata.origin_url = row[:url]
|
||||
return if !path
|
||||
else
|
||||
path = find_file_in_paths(row)
|
||||
return handle_missing_file(row) if path.nil?
|
||||
end
|
||||
|
||||
error_message = nil
|
||||
result =
|
||||
with_retries do
|
||||
upload =
|
||||
copy_to_tempfile(path) do |file|
|
||||
begin
|
||||
UploadCreator.new(
|
||||
file,
|
||||
metadata.original_filename,
|
||||
type: row[:type],
|
||||
origin: metadata.origin_url,
|
||||
).create_for(Discourse::SYSTEM_USER_ID)
|
||||
rescue StandardError => e
|
||||
error_message = e.message
|
||||
nil
|
||||
end
|
||||
end
|
||||
|
||||
if (upload_okay = upload.present? && upload.persisted? && upload.errors.blank?)
|
||||
upload_path = add_multisite_prefix(discourse_store.get_path_for_upload(upload))
|
||||
|
||||
unless file_exists?(upload_path)
|
||||
upload.destroy
|
||||
upload = nil
|
||||
upload_okay = false
|
||||
end
|
||||
end
|
||||
|
||||
if upload_okay
|
||||
{
|
||||
id: row[:id],
|
||||
upload: upload.attributes.to_json,
|
||||
markdown:
|
||||
UploadMarkdown.new(upload).to_markdown(display_name: metadata.description),
|
||||
skip_reason: nil,
|
||||
}
|
||||
else
|
||||
error_message =
|
||||
upload&.errors&.full_messages&.join(", ") || error_message || "unknown error"
|
||||
nil
|
||||
end
|
||||
end
|
||||
|
||||
if result.nil?
|
||||
status_queue << {
|
||||
id: row[:id],
|
||||
upload: nil,
|
||||
markdown: nil,
|
||||
error: "too many retries: #{error_message}",
|
||||
skip_reason: "too many retries",
|
||||
}
|
||||
else
|
||||
status_queue << result
|
||||
end
|
||||
rescue StandardError => e
|
||||
status_queue << {
|
||||
id: row[:id],
|
||||
upload: nil,
|
||||
markdown: nil,
|
||||
error: e.message,
|
||||
skip_reason: "error",
|
||||
}
|
||||
ensure
|
||||
data_file&.close!
|
||||
end
|
||||
|
||||
def build_metadata(row)
|
||||
UploadMetadata.new(
|
||||
original_filename: row[:display_filename] || row[:filename],
|
||||
description: row[:description].presence,
|
||||
)
|
||||
end
|
||||
|
||||
def delete_missing_uploads
|
||||
puts "Deleting missing uploads from uploads database..."
|
||||
|
||||
uploads_db.db.execute(<<~SQL)
|
||||
DELETE FROM uploads
|
||||
WHERE upload IS NULL
|
||||
SQL
|
||||
end
|
||||
|
||||
# Downloads `url` into the download cache and returns where it was stored.
#
# A cached file is reused when it exists on disk and its original filename is
# recorded in the `downloads` table. Otherwise the response is streamed to the
# cache path; the filename is derived from the first response chunk. Downloads
# larger than MAX_FILE_SIZE are aborted and the partial file is removed.
#
# Fixes over the previous version:
# - the oversized file is removed with `File.unlink(path)`; `File` instances
#   have no `unlink` method, so the old call raised NoMethodError
# - the file handle is closed in an `ensure`, so it no longer leaks when an
#   error is raised while streaming
#
# @param url [String] URL to download
# @param id [String] upload id, used for the cache file name and DB record
# @param retry_count [Integer] unused here; kept for interface compatibility
# @return [Array(String, String), nil] cache path and original filename, or
#   nil when nothing was downloaded
def download_file(url:, id:, retry_count: 0)
  path = download_cache_path(id)
  original_filename = nil

  if File.exist?(path) && (original_filename = get_original_filename(id))
    return path, original_filename
  end

  fd = FinalDestination.new(url)
  file = nil

  begin
    fd.get do |response, chunk, uri|
      if file.nil?
        check_response!(response, uri)
        original_filename = extract_filename_from_response(response, uri)
        file = File.open(path, "wb")
      end

      file.write(chunk)

      if file.size > MAX_FILE_SIZE
        file.close
        File.unlink(path)
        file = nil
        throw :done
      end
    end
  ensure
    # Runs on success and on error; `file` is nil when nothing was kept.
    file.close if file && !file.closed?
  end

  if file
    uploads_db.insert(
      "INSERT INTO downloads (id, original_filename) VALUES (?, ?)",
      [id, original_filename],
    )
    return path, original_filename
  end

  nil
end
|
||||
|
||||
def download_cache_path(id)
|
||||
id = id.gsub("/", "_").gsub("=", "-")
|
||||
File.join(settings[:download_cache_path], id)
|
||||
end
|
||||
|
||||
def get_original_filename(id)
|
||||
uploads_db.db.query_single_splat("SELECT original_filename FROM downloads WHERE id = ?", id)
|
||||
end
|
||||
|
||||
def check_response!(response, uri)
|
||||
if uri.blank?
|
||||
code = response.code.to_i
|
||||
|
||||
if code >= 400
|
||||
raise "#{code} Error"
|
||||
else
|
||||
throw :done
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Determines a filename for a downloaded file.
#
# Order of preference:
# 1. the Content-Disposition header: `filename*=UTF-8''...`, then a quoted
#    `filename="..."`, then an unquoted `filename=...`
# 2. the basename of the URI path
# 3. the literal "file"
#
# If the chosen name has no extension and the response declares a content
# type, the extension MiniMime reports for that type is appended.
#
# The unquoted form previously matched but had no capture group, so it was
# ignored; the quoted form was greedy and could swallow following parameters.
# Each form now has its own capturing pattern.
#
# @param response [#header, #content_type] HTTP response
# @param uri [URI] final URI of the download
# @return [String] filename
def extract_filename_from_response(response, uri)
  filename =
    if (header = response.header["Content-Disposition"].presence)
      disposition_filename =
        header[/filename\*=UTF-8''(\S+)\b/i, 1] || header[/filename="([^"]+)"/i, 1] ||
          header[/filename=([^\s;"]+)/i, 1]

      URI.decode_www_form_component(disposition_filename) if disposition_filename.present?
    end

  filename = File.basename(uri.path).presence || "file" if filename.blank?

  if File.extname(filename).blank? && response.content_type.present?
    ext = MiniMime.lookup_by_content_type(response.content_type)&.extension
    filename = "#{filename}.#{ext}" if ext.present?
  end

  filename
end
|
||||
|
||||
def copy_to_tempfile(source_path)
|
||||
extension = File.extname(source_path)
|
||||
|
||||
Tempfile.open(["discourse-upload", extension]) do |tmpfile|
|
||||
File.open(source_path, "rb") { |source_stream| IO.copy_stream(source_stream, tmpfile) }
|
||||
tmpfile.rewind
|
||||
yield(tmpfile)
|
||||
end
|
||||
end
|
||||
|
||||
def log_status
|
||||
error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
|
||||
print "\r%7d / %7d (%s, %s skipped)" %
|
||||
[current_count, @max_count, error_count_text, skipped_count]
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
80
migrations/lib/uploader/uploads.rb
Normal file
80
migrations/lib/uploader/uploads.rb
Normal file
@ -0,0 +1,80 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module Migrations::Uploader
|
||||
class Uploads
|
||||
attr_reader :settings, :databases
|
||||
|
||||
def initialize(settings)
|
||||
@settings = settings
|
||||
@databases = setup_databases
|
||||
configure_services
|
||||
end
|
||||
|
||||
def perform!
|
||||
tasks = build_task_pipeline
|
||||
tasks.each { |task| task.run!(databases, settings) }
|
||||
ensure
|
||||
cleanup_resources
|
||||
end
|
||||
|
||||
def self.perform!(settings = {})
|
||||
new(settings).perform!
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def build_task_pipeline
|
||||
[].tap do |tasks|
|
||||
tasks << Tasks::Fixer if settings[:fix_missing]
|
||||
tasks << Tasks::Uploader
|
||||
tasks << Tasks::Optimizer if settings[:create_optimized_images]
|
||||
end
|
||||
end
|
||||
|
||||
def setup_databases
|
||||
run_uploads_db_migrations
|
||||
|
||||
{
|
||||
uploads_db: create_database_connection(:uploads),
|
||||
intermediate_db: create_database_connection(:intermediate),
|
||||
}
|
||||
end
|
||||
|
||||
def create_database_connection(type)
|
||||
path = type == :uploads ? settings[:output_db_path] : settings[:source_db_path]
|
||||
|
||||
# TODO: Using "raw" db connection here for now
|
||||
# Investigate using ::Migrations::Database::IntermediateDB.setup(db)
|
||||
# Should we have a ::Migrations::Database::UploadsDB.setup(db)?
|
||||
::Migrations::Database.connect(path)
|
||||
end
|
||||
|
||||
def run_uploads_db_migrations
|
||||
::Migrations::Database.migrate(
|
||||
settings[:output_db_path],
|
||||
migrations_path: ::Migrations::Database::UPLOADS_DB_SCHEMA_PATH,
|
||||
)
|
||||
end
|
||||
|
||||
def configure_services
|
||||
configure_logging
|
||||
configure_site_settings
|
||||
end
|
||||
|
||||
def configure_logging
|
||||
@original_exifr_logger = EXIFR.logger
|
||||
|
||||
# disable logging for EXIFR which is used by ImageOptim
|
||||
EXIFR.logger = Logger.new(nil)
|
||||
end
|
||||
|
||||
def configure_site_settings
|
||||
SiteSettings.configure!(settings[:site_settings])
|
||||
end
|
||||
|
||||
def cleanup_resources
|
||||
databases.values.each(&:close)
|
||||
EXIFR.logger = @original_exifr_logger
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue
Block a user