From 6c91148db80f32318811952604f308b1c6d583fd Mon Sep 17 00:00:00 2001 From: Selase Krakani <849886+s3lase@users.noreply.github.com> Date: Thu, 31 Oct 2024 13:31:12 +0000 Subject: [PATCH] DEV: Refactor `uploads_importer` script (#29292) * DEV: Implement uploads command entrypoint - Setup Thor UploadsCommand for CLI - First pass at modularizing various parts of the existing `uploads_import` script * DEV: First attempt at modularizing missing uploads fixer task Move missing upload fix to a dedicated uploads task implementation unit * DEV: First attempt at modularizing missing uploads uploader task Move uploader to a dedicated uploads task implementation unit * DEV: First attempt at modularizing missing uploads optimizer task Move optimizer to a dedicated uploads task implementation unit * DEV: Various follow up fixes to get optimization working - Start threads early - Improve "log" message formatting - Add missing `copy_to_tempfile` method on "uploader" task * DEV: Refactor a bit more Deduplicate and move most of threading primitives to base task as-is * DEV: Remove redundant condition in uploads db migration * DEV: More deduplication Move task retry logic to base class and tidy up other implementation details carried over from the existing script --- migrations/.gitignore | 1 + migrations/bin/cli | 2 +- migrations/config/upload.yml.sample | 50 +++ .../db/uploads_db_schema/100-base-schema.sql | 18 + migrations/lib/cli/upload_command.rb | 41 ++- migrations/lib/migrations.rb | 3 + migrations/lib/settings_parser.rb | 81 +++++ migrations/lib/uploader/site_settings.rb | 80 +++++ migrations/lib/uploader/tasks/base.rb | 140 ++++++++ migrations/lib/uploader/tasks/fixer.rb | 84 +++++ migrations/lib/uploader/tasks/optimizer.rb | 244 +++++++++++++ migrations/lib/uploader/tasks/uploader.rb | 327 ++++++++++++++++++ migrations/lib/uploader/uploads.rb | 80 +++++ 13 files changed, 1148 insertions(+), 3 deletions(-) create mode 100644 migrations/config/upload.yml.sample create mode 
100644 migrations/lib/settings_parser.rb create mode 100644 migrations/lib/uploader/site_settings.rb create mode 100644 migrations/lib/uploader/tasks/base.rb create mode 100644 migrations/lib/uploader/tasks/fixer.rb create mode 100644 migrations/lib/uploader/tasks/optimizer.rb create mode 100644 migrations/lib/uploader/tasks/uploader.rb create mode 100644 migrations/lib/uploader/uploads.rb diff --git a/migrations/.gitignore b/migrations/.gitignore index 702cfb5955d..ceb138b9512 100644 --- a/migrations/.gitignore +++ b/migrations/.gitignore @@ -4,3 +4,4 @@ tmp/* private/ Gemfile.lock +/config/upload.yml \ No newline at end of file diff --git a/migrations/bin/cli b/migrations/bin/cli index e6a5ea96b37..f34f76fafc5 100755 --- a/migrations/bin/cli +++ b/migrations/bin/cli @@ -19,5 +19,5 @@ module Migrations end end - CommandLineInterface.start + Dir.chdir(Rails.root) { CommandLineInterface.start } # rubocop:disable Discourse/NoChdir end diff --git a/migrations/config/upload.yml.sample b/migrations/config/upload.yml.sample new file mode 100644 index 00000000000..8c311c6a28c --- /dev/null +++ b/migrations/config/upload.yml.sample @@ -0,0 +1,50 @@ +source_db_path: "/path/to/your/db.sqlite3" +output_db_path: "/path/to/your/uploads.sqlite3" + +root_paths: + - "/path/to/your/files" + - "/path/to/more/files" + +# Files that are downloaded from URLs are cached in this directory. +download_cache_path: "/path/to/downloaded/files" + +# The number of threads to use for processing uploads is calculated as: +# thread_count = [number of cores] * [thread_count_factor] +# The thread count will be doubled if uploads are stored on S3 because there's a higher latency. +thread_count_factor: 1.5 + +# Delete uploads from the output database that are not found in the source database. +delete_surplus_uploads: false + +# Delete uploads from the output database that do not have a Discourse upload record. 
+delete_missing_uploads: false + +# Check if files are missing in the upload store and update the database accordingly. +# Set to false and re-run the script afterwards if you want to create new uploads for missing files. +fix_missing: false + +# Create optimized images for post uploads and avatars. +create_optimized_images: false + +site_settings: + authorized_extensions: "*" + max_attachment_size_kb: 102_400 + max_image_size_kb: 102_400 + + enable_s3_uploads: true + s3_upload_bucket: "your-bucket-name" + s3_region: "your-region" + s3_access_key_id: "your-access-key-id" + s3_secret_access_key: "your-secret-access-key" + s3_cdn_url: "https://your-cdn-url.com" + + # Set this to true if the site is a multisite and configure the `multisite_db_name` accordingly + multisite: false + multisite_db_name: "default" + +# Sometimes a file can be found at one of many locations. Here's a list of transformations that can +# be applied to the path to try and find the file. The first transformation that results in a file +# being found will be used. 
+path_replacements:
+#  - ["/foo/", "/bar"]
+#  - ["/foo/", "/bar/baz/"]
\ No newline at end of file
diff --git a/migrations/db/uploads_db_schema/100-base-schema.sql b/migrations/db/uploads_db_schema/100-base-schema.sql
index e69de29bb2d..4defb1ef14b 100644
--- a/migrations/db/uploads_db_schema/100-base-schema.sql
+++ b/migrations/db/uploads_db_schema/100-base-schema.sql
@@ -0,0 +1,18 @@
+CREATE TABLE uploads
+(
+    id          TEXT PRIMARY KEY NOT NULL,
+    upload      JSON_TEXT,
+    markdown    TEXT,
+    skip_reason TEXT
+);
+
+CREATE TABLE optimized_images
+(
+    id               TEXT PRIMARY KEY NOT NULL,
+    optimized_images JSON_TEXT
+);
+
+CREATE TABLE downloads (
+    id                TEXT PRIMARY KEY NOT NULL,
+    original_filename TEXT NOT NULL
+);
diff --git a/migrations/lib/cli/upload_command.rb b/migrations/lib/cli/upload_command.rb
index f85777518ce..61210fef671 100644
--- a/migrations/lib/cli/upload_command.rb
+++ b/migrations/lib/cli/upload_command.rb
@@ -3,9 +3,46 @@
 module Migrations::CLI::UploadCommand
   def self.included(thor)
     thor.class_eval do
-      desc "upload", "Upload a file"
+      desc "upload", "Upload media uploads"
+      option :settings,
+             type: :string,
+             desc: "Uploads settings file path",
+             default: "./migrations/config/upload.yml",
+             aliases: "-s",
+             banner: "path"
+      option :fix_missing, type: :boolean, desc: "Fix missing uploads"
+      option :optimize, type: :boolean, desc: "Optimize uploads"
       def upload
-        puts "Uploading..."
+        puts "Starting uploads..."
+
+        validate_settings_file!
+        settings = load_settings
+
+        ::Migrations::Uploader::Uploads.perform!(settings)
+
+        puts ""
+      end
+
+      private
+
+      def load_settings # parses the YAML settings file, then lets CLI flags override it
+        settings = ::Migrations::SettingsParser.parse!(options.settings)
+        merge_settings_from_cli_args!(settings)
+
+        settings
+      end
+
+      def merge_settings_from_cli_args!(settings) # nil = flag not given; an explicit false must still override the file
+        settings[:fix_missing] = options.fix_missing if !options.fix_missing.nil?
+        settings[:create_optimized_images] = options.optimize if !options.optimize.nil?
+      end
+
+      def validate_settings_file!
+ path = options.settings + + if !File.exist?(path) + raise ::Migrations::NoSettingsFound, "Settings file not found: #{path}" + end end end end diff --git a/migrations/lib/migrations.rb b/migrations/lib/migrations.rb index ebc969a6088..cb51a43c4b8 100644 --- a/migrations/lib/migrations.rb +++ b/migrations/lib/migrations.rb @@ -10,6 +10,9 @@ require "zeitwerk" require_relative "converters" module Migrations + class NoSettingsFound < StandardError + end + def self.root_path @root_path ||= File.expand_path("..", __dir__) end diff --git a/migrations/lib/settings_parser.rb b/migrations/lib/settings_parser.rb new file mode 100644 index 00000000000..173df438977 --- /dev/null +++ b/migrations/lib/settings_parser.rb @@ -0,0 +1,81 @@ +# frozen_string_literal: true + +module Migrations + class SettingsParser + class InvalidYaml < StandardError + end + class ValidationError < StandardError + end + + REQUIRED_KEYS = %i[source_db_path output_db_path root_paths] + + def initialize(options) + @options = options + + validate! + end + + def [](key) + @options[key] + end + + def []=(key, value) + @options[key] = value + end + + def fetch(key, default) + @options.fetch(key, default) + end + + def self.parse!(path) + new(YAML.load_file(path, symbolize_names: true)) + rescue Psych::SyntaxError => e + raise InvalidYaml.new(e.message) + end + + private + + def validate! + validate_required_keys + validate_paths + validate_options + end + + def validate_required_keys + missing = REQUIRED_KEYS - @options.keys + + raise ValidationError, "Missing required keys: #{missing.join(", ")}" if missing.any? 
+ end + + def validate_paths + %i[source_db_path output_db_path].each do |key| + path = @options[key] + + next unless path + + dir = File.dirname(path) + raise ValidationError, "Directory not writable: #{dir}" unless File.writable?(dir) + end + + if !@options[:root_paths].is_a?(Array) + raise ValidationError, "Root paths must be an array of paths" + end + + @options[:root_paths].each do |path| + raise ValidationError, "Directory not readable: #{path}" unless File.readable?(path) + end + end + + def validate_options + validate_thread_count_factor if @options[:thread_count_factor] + end + + def validate_thread_count_factor + count = @options[:thread_count_factor] + + unless count.is_a?(Numeric) && count.positive? + raise ValidationError, "Thread count factor must be numeric and positive" + end + end + end +end diff --git a/migrations/lib/uploader/site_settings.rb b/migrations/lib/uploader/site_settings.rb new file mode 100644 index 00000000000..bd6a8a92a6f --- /dev/null +++ b/migrations/lib/uploader/site_settings.rb @@ -0,0 +1,80 @@ +# frozen_string_literal: true + +module Migrations::Uploader + class SiteSettings + class S3UploadsConfigurationError < StandardError + end + + def initialize(options) + @options = options + end + + def configure! + configure_basic_uploads + configure_multisite if @options[:multisite] + configure_s3 if @options[:enable_s3_uploads] + end + + def self.configure!(options) + new(options).configure! 
+ end + + private + + def configure_basic_uploads + SiteSetting.clean_up_uploads = false + SiteSetting.authorized_extensions = @options[:authorized_extensions] + SiteSetting.max_attachment_size_kb = @options[:max_attachment_size_kb] + SiteSetting.max_image_size_kb = @options[:max_image_size_kb] + end + + def configure_multisite + # rubocop:disable Discourse/NoDirectMultisiteManipulation + Rails.configuration.multisite = true + # rubocop:enable Discourse/NoDirectMultisiteManipulation + + RailsMultisite::ConnectionManagement.class_eval do + def self.current_db_override=(value) + @current_db_override = value + end + def self.current_db + @current_db_override + end + end + + RailsMultisite::ConnectionManagement.current_db_override = @options[:multisite_db_name] + end + + def configure_s3 + SiteSetting.s3_access_key_id = @options[:s3_access_key_id] + SiteSetting.s3_secret_access_key = @options[:s3_secret_access_key] + SiteSetting.s3_upload_bucket = @options[:s3_upload_bucket] + SiteSetting.s3_region = @options[:s3_region] + SiteSetting.s3_cdn_url = @options[:s3_cdn_url] + SiteSetting.enable_s3_uploads = true + + if SiteSetting.enable_s3_uploads != true + raise S3UploadsConfigurationError, "Failed to enable S3 uploads" + end + + verify_s3_uploads_configuration! + end + + def verify_s3_uploads_configuration! + Tempfile.open("discourse-s3-test") do |tmpfile| + tmpfile.write("test") + tmpfile.rewind + + upload = + UploadCreator.new(tmpfile, "discourse-s3-test.txt").create_for(Discourse::SYSTEM_USER_ID) + + unless upload.present? && upload.persisted? && upload.errors.blank? 
&& + upload.url.start_with?("//") + raise S3UploadsConfigurationError, "Failed to upload to S3" + end + + upload.destroy + end + end + end +end diff --git a/migrations/lib/uploader/tasks/base.rb b/migrations/lib/uploader/tasks/base.rb new file mode 100644 index 00000000000..71b68a1ba58 --- /dev/null +++ b/migrations/lib/uploader/tasks/base.rb @@ -0,0 +1,140 @@ +# frozen_string_literal: true + +require "etc" +require "colored2" + +module Migrations::Uploader + module Tasks + class Base + class NotImplementedError < StandardError + end + + TRANSACTION_SIZE = 1000 + QUEUE_SIZE = 1000 + DEFAULT_THREAD_FACTOR = 1.5 + + attr_reader :uploads_db, + :intermediate_db, + :settings, + :work_queue, + :status_queue, + :discourse_store, + :error_count, + :current_count, + :missing_count, + :skipped_count + + def initialize(databases, settings) + @uploads_db = databases[:uploads_db] + @intermediate_db = databases[:intermediate_db] + + @settings = settings + + @work_queue = SizedQueue.new(QUEUE_SIZE) + @status_queue = SizedQueue.new(QUEUE_SIZE) + @discourse_store = Discourse.store + + @error_count = 0 + @current_count = 0 + @missing_count = 0 + @skipped_count = 0 + end + + def run! + raise NotImplementedError + end + + def self.run!(databases, settings) + new(databases, settings).run! + end + + protected + + def handle_status_update + raise NotImplementedError + end + + def enqueue_jobs + raise NotImplementedError + end + + def instantiate_task_resource + {} + end + + def start_status_thread + Thread.new do + while !(result = status_queue.pop).nil? 
+            handle_status_update(result)
+            log_status
+          end
+        end
+      end
+
+      def start_consumer_threads
+        thread_count.times.map { |index| consumer_thread(index) }
+      end
+
+      def consumer_thread(index)
+        Thread.new do
+          Thread.current.name = "worker-#{index}"
+          resource = instantiate_task_resource
+
+          while (row = work_queue.pop)
+            process_upload(row, resource)
+          end
+        end
+      end
+
+      def start_producer_thread
+        Thread.new { enqueue_jobs }
+      end
+
+      def thread_count
+        @thread_count ||= calculate_thread_count
+      end
+
+      def add_multisite_prefix(path)
+        return path if !Rails.configuration.multisite
+
+        File.join("uploads", RailsMultisite::ConnectionManagement.current_db, path)
+      end
+
+      def file_exists?(path)
+        if discourse_store.external?
+          discourse_store.object_from_path(path).exists?
+        else
+          File.exist?(File.join(discourse_store.public_dir, path))
+        end
+      end
+
+      def with_retries(max: 3)
+        count = 0
+
+        loop do
+          result = yield
+          break result if result
+
+          count += 1
+          break nil if count >= max
+
+          sleep(calculate_backoff(count))
+        end
+      end
+
+      private
+
+      def calculate_backoff(retry_count)
+        0.25 * retry_count
+      end
+
+      def calculate_thread_count # worker count = cores * configured factor, doubled for an external store
+        base = Etc.nprocessors
+        thread_count_factor = settings[:thread_count_factor] || DEFAULT_THREAD_FACTOR # key may exist with a nil value
+        store_factor = discourse_store.external? ? 2 : 1
+
+        [(base * thread_count_factor * store_factor).to_i, 1].max # 0 workers would leave the producer blocked on the full queue
+      end
+    end
+  end
+end
diff --git a/migrations/lib/uploader/tasks/fixer.rb b/migrations/lib/uploader/tasks/fixer.rb
new file mode 100644
index 00000000000..20b2bee09e9
--- /dev/null
+++ b/migrations/lib/uploader/tasks/fixer.rb
@@ -0,0 +1,84 @@
+# frozen_string_literal: true
+
+module Migrations::Uploader
+  module Tasks
+    class Fixer < Base
+      def run!
+        return if max_count.zero?
+
+        puts "Fixing missing uploads..."
+
+        status_thread = start_status_thread
+        consumer_threads = start_consumer_threads
+        producer_thread = start_producer_thread
+
+        producer_thread.join
+        work_queue.close
+        consumer_threads.each(&:join)
+        status_queue.close
+        status_thread.join
+      end
+
+      private
+
+      def max_count
+        @max_count ||=
+          uploads_db.db.query_single_splat("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
+      end
+
+      def enqueue_jobs
+        uploads_db
+          .db
+          .query(
+            "SELECT id, upload FROM uploads WHERE upload IS NOT NULL ORDER BY rowid DESC",
+          ) { |row| work_queue << row }
+      end
+
+      def instantiate_task_resource
+        OpenStruct.new(url: "")
+      end
+
+      def handle_status_update(result)
+        @current_count += 1
+
+        case result[:status]
+        when :ok
+          # ignore
+        when :error
+          @error_count += 1
+          puts " Error in #{result[:id]}"
+        when :missing
+          @missing_count += 1
+          puts " Missing #{result[:id]}"
+
+          uploads_db.db.execute("DELETE FROM uploads WHERE id = ?", result[:id])
+          Upload.delete_by(id: result[:upload_id])
+        end
+      end
+
+      def process_upload(row, fake_upload) # reports :ok / :missing / :error for one stored upload row
+        upload = JSON.parse(row[:upload], symbolize_names: true)
+        fake_upload.url = upload[:url]
+        path = add_multisite_prefix(discourse_store.get_path_for_upload(fake_upload))
+
+        status = file_exists?(path) ? :ok : :missing
+
+        update_status_queue(row, upload, status)
+      rescue StandardError => error # `upload` is still nil here when JSON.parse itself failed
+        puts error.message
+        status = :error
+        update_status_queue(row, upload, status)
+      end
+
+      def update_status_queue(row, upload, status) # `upload` may be nil on the error path
+        status_queue << { id: row[:id], upload_id: upload&.dig(:id), status: status }
+      end
+
+      def log_status
+        error_count_text = error_count > 0 ?
"#{error_count} errors".red : "0 errors" + print "\r%7d / %7d (%s, %s missing)" % + [current_count, max_count, error_count_text, missing_count] + end + end + end +end diff --git a/migrations/lib/uploader/tasks/optimizer.rb b/migrations/lib/uploader/tasks/optimizer.rb new file mode 100644 index 00000000000..bfe871974a2 --- /dev/null +++ b/migrations/lib/uploader/tasks/optimizer.rb @@ -0,0 +1,244 @@ +# frozen_string_literal: true + +module Migrations::Uploader + module Tasks + class Optimizer < Base + def initialize(databases, settings) + super(databases, settings) + + initialize_existing_ids_tracking_sets + initialize_discourse_resources + @max_count = 0 + end + + def run! + puts "", "Creating optimized images..." + + disable_optimized_image_lock + + start_tracking_sets_loader_threads.each(&:join) + status_thread = start_status_thread + consumer_threads = start_consumer_threads + producer_thread = start_producer_thread + + producer_thread.join + work_queue.close + consumer_threads.each(&:join) + status_queue.close + status_thread.join + end + + private + + def initialize_existing_ids_tracking_sets + @optimized_upload_ids = Set.new + @post_upload_ids = Set.new + @avatar_upload_ids = Set.new + end + + def initialize_discourse_resources + @avatar_sizes = Discourse.avatar_sizes + @system_user = Discourse.system_user + @category_id = Category.last.id + end + + def disable_optimized_image_lock + # allow more than 1 thread to optimized images at the same time + OptimizedImage.lock_per_machine = false + end + + def start_tracking_sets_loader_threads + [ + start_optimized_upload_ids_loader_thread, + start_post_upload_ids_loader_thread, + start_avatar_upload_ids_loader_thread, + start_max_count_loader_thread, + ] + end + + def handle_status_update(params) + @current_count += 1 + + case params.delete(:status) + when :ok + uploads_db.insert(<<~SQL, params) + INSERT INTO optimized_images (id, optimized_images) + VALUES (:id, :optimized_images) + SQL + when :error + @error_count 
+= 1
+        when :skipped
+          @skipped_count += 1
+        end
+      end
+
+      def start_optimized_upload_ids_loader_thread
+        Thread.new do
+          @uploads_db
+            .db
+            .query("SELECT id FROM optimized_images") { |row| @optimized_upload_ids << row[:id] }
+        end
+      end
+
+      def start_post_upload_ids_loader_thread
+        Thread.new do
+          sql = <<~SQL
+            SELECT upload_ids
+              FROM posts
+             WHERE upload_ids IS NOT NULL
+          SQL
+
+          @intermediate_db
+            .db
+            .query(sql) { |row| JSON.parse(row[:upload_ids]).each { |id| @post_upload_ids << id } }
+        end
+      end
+
+      def start_avatar_upload_ids_loader_thread
+        Thread.new do
+          sql = <<~SQL
+            SELECT avatar_upload_id
+              FROM users
+             WHERE avatar_upload_id IS NOT NULL
+          SQL
+
+          @intermediate_db.db.query(sql) { |row| @avatar_upload_ids << row[:avatar_upload_id] }
+        end
+      end
+
+      def start_max_count_loader_thread
+        Thread.new do
+          @max_count =
+            @uploads_db.db.query_single_splat(
+              "SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL",
+            )
+        end
+      end
+
+      def enqueue_jobs
+        sql = <<~SQL
+          SELECT id AS upload_id, upload ->> 'sha1' AS upload_sha1, markdown
+            FROM uploads
+           WHERE upload IS NOT NULL
+           ORDER BY rowid
+        SQL
+
+        @uploads_db
+          .db
+          .query(sql) do |row|
+            upload_id = row[:upload_id]
+
+            if @optimized_upload_ids.include?(upload_id) || !row[:markdown].start_with?("![")
+              status_queue << { id: row[:upload_id], status: :skipped }
+              next
+            end
+
+            if @post_upload_ids.include?(upload_id)
+              row[:type] = "post"
+              work_queue << row
+            elsif @avatar_upload_ids.include?(upload_id)
+              row[:type] = "avatar"
+              work_queue << row
+            else
+              status_queue << { id: row[:upload_id], status: :skipped }
+            end
+          end
+      end
+
+      def start_consumer_threads
+        Jobs.run_immediately!
+
+        super
+      end
+
+      def log_status
+        error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
+
+        print "\r%7d / %7d (%s, %d skipped)" %
+                [current_count, @max_count, error_count_text, skipped_count]
+      end
+
+      def instantiate_task_resource
+        PostCreator.new(
+          @system_user,
+          raw: "Topic created by uploads_importer",
+          acting_user: @system_user,
+          skip_validations: true,
+          title: "Topic created by uploads_importer - #{SecureRandom.hex}",
+          archetype: Archetype.default,
+          category: @category_id,
+        ).create!
+      end
+
+      def process_upload(row, post)
+        result = with_retries { attempt_optimization(row, post) }
+        status_queue << (result || { id: row[:upload_id], status: :error })
+      end
+
+      def attempt_optimization(row, post)
+        upload = Upload.find_by(sha1: row[:upload_sha1])
+        optimized_images = create_optimized_images(row[:type], row[:markdown], upload, post)
+
+        return if optimized_images.blank?
+
+        processed_optimized_images = process_optimized_images(optimized_images)
+
+        if images_valid?(processed_optimized_images)
+          {
+            id: row[:upload_id],
+            optimized_images: serialize_optimized_images(processed_optimized_images),
+            status: :ok,
+          }
+        end
+      end
+
+      def images_valid?(images)
+        !images.nil? && images.all?(&:present?) && images.all?(&:persisted?) &&
+          images.all? { |o| o.errors.blank? }
+      end
+
+      def serialize_optimized_images(images)
+        images.presence&.to_json(only: OptimizedImage.column_names)
+      end
+
+      def create_optimized_images(type, markdown, upload, post) # returns the optimized images, or nil on any error
+        case type
+        when "post"
+          post.update_columns(baked_at: nil, cooked: "", raw: markdown)
+          post.reload
+          post.rebake!
+
+          OptimizedImage.where(upload_id: upload.id).to_a
+        when "avatar"
+          @avatar_sizes.map { |size| OptimizedImage.create_for(upload, size, size) }
+        end
+      rescue StandardError => e
+        puts e.message
+        puts e.backtrace # was e.stacktrace, which does not exist and raised NoMethodError inside this rescue
+
+        nil
+      end
+
+      def process_optimized_images(images)
+        begin
+          images.map! do |image|
+            next if image.blank?
+ + image_path = add_multisite_prefix(discourse_store.get_path_for_optimized_image(image)) + + unless file_exists?(image_path) + image.destroy + image = nil + end + + image + end + rescue StandardError + images = nil + end + + images + end + end + end +end diff --git a/migrations/lib/uploader/tasks/uploader.rb b/migrations/lib/uploader/tasks/uploader.rb new file mode 100644 index 00000000000..7775317f540 --- /dev/null +++ b/migrations/lib/uploader/tasks/uploader.rb @@ -0,0 +1,327 @@ +# frozen_string_literal: true + +module Migrations::Uploader + module Tasks + class Uploader < Base + MAX_FILE_SIZE = 1.gigabyte + + UploadMetadata = Struct.new(:original_filename, :origin_url, :description) + + def run! + puts "", "Uploading uploads..." + + process_existing_uploads + + status_thread = start_status_thread + consumer_threads = start_consumer_threads + producer_thread = start_producer_thread + + producer_thread.join + work_queue.close + consumer_threads.each(&:join) + status_queue.close + status_thread.join + end + + private + + def process_existing_uploads + delete_missing_uploads if settings[:delete_missing_uploads] + initialize_existing_ids_tracking_sets + handle_surplus_uploads if surplus_upload_ids.any? + + @max_count = (@source_existing_ids - @output_existing_ids).size + @source_existing_ids = nil + + puts "Found #{@output_existing_ids.size} existing uploads. #{@max_count} are missing." + end + + def initialize_existing_ids_tracking_sets + @output_existing_ids = load_existing_ids(uploads_db.db, Set.new) + @source_existing_ids = load_existing_ids(intermediate_db.db, Set.new) + end + + def load_existing_ids(db, set) + db.query("SELECT id FROM uploads") { |row| set << row[:id] } + + set + end + + def handle_surplus_uploads + if settings[:delete_surplus_uploads] + puts "Deleting #{surplus_upload_ids.size} uploads from output database..." 
+
+          surplus_upload_ids.each_slice(TRANSACTION_SIZE) do |ids|
+            placeholders = (["?"] * ids.size).join(",")
+            uploads_db.db.execute(<<~SQL, ids)
+              DELETE FROM uploads
+              WHERE id IN (#{placeholders})
+            SQL
+          end
+
+          @output_existing_ids -= surplus_upload_ids
+        else
+          puts "Found #{surplus_upload_ids.size} surplus uploads in output database. " \
+                 "Run with `delete_surplus_uploads: true` to delete them."
+        end
+
+        @surplus_upload_ids = nil
+      end
+
+      def surplus_upload_ids
+        @surplus_upload_ids ||= @output_existing_ids - @source_existing_ids
+      end
+
+      def handle_status_update(params)
+        @current_count += 1
+
+        begin
+          if params.delete(:skipped) == true
+            @skipped_count += 1
+          elsif (error_message = params.delete(:error)) || params[:upload].nil?
+            @error_count += 1
+            puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
+          end
+
+          uploads_db.insert(<<~SQL, params)
+            INSERT INTO uploads (id, upload, markdown, skip_reason)
+            VALUES (:id, :upload, :markdown, :skip_reason)
+          SQL
+        rescue StandardError => e
+          puts "", "Failed to insert upload: #{params[:id]} (#{e.message})", ""
+          @error_count += 1
+        end
+      end
+
+      def enqueue_jobs
+        intermediate_db
+          .db
+          .query("SELECT * FROM uploads ORDER BY id") do |row|
+            work_queue << row if @output_existing_ids.exclude?(row[:id])
+          end
+      end
+
+      def find_file_in_paths(row) # first existing path across root_paths and path_replacements, else nil
+        relative_path = row[:relative_path] || ""
+
+        settings[:root_paths].each do |root_path|
+          path = File.join(root_path, relative_path, row[:filename])
+
+          return path if File.exist?(path)
+
+          Array(settings[:path_replacements]).each do |from, to| # nil when the YAML key has no entries
+            path = File.join(root_path, relative_path.sub(from, to), row[:filename])
+
+            return path if File.exist?(path)
+          end
+        end
+
+        nil
+      end
+
+      def handle_missing_file(row)
+        status_queue << { id: row[:id], upload: nil, skipped: true, skip_reason: "file not found" }
+      end
+
+      def process_upload(row, _)
+        metadata = build_metadata(row)
+        data_file = nil
+        path = nil
+
+        if row[:data].present?
+ data_file = Tempfile.new("discourse-upload", binmode: true) + data_file.write(row[:data]) + data_file.rewind + path = data_file.path + elsif row[:url].present? + path, metadata.original_filename = download_file(url: row[:url], id: row[:id]) + metadata.origin_url = row[:url] + return if !path + else + path = find_file_in_paths(row) + return handle_missing_file(row) if path.nil? + end + + error_message = nil + result = + with_retries do + upload = + copy_to_tempfile(path) do |file| + begin + UploadCreator.new( + file, + metadata.original_filename, + type: row[:type], + origin: metadata.origin_url, + ).create_for(Discourse::SYSTEM_USER_ID) + rescue StandardError => e + error_message = e.message + nil + end + end + + if (upload_okay = upload.present? && upload.persisted? && upload.errors.blank?) + upload_path = add_multisite_prefix(discourse_store.get_path_for_upload(upload)) + + unless file_exists?(upload_path) + upload.destroy + upload = nil + upload_okay = false + end + end + + if upload_okay + { + id: row[:id], + upload: upload.attributes.to_json, + markdown: + UploadMarkdown.new(upload).to_markdown(display_name: metadata.description), + skip_reason: nil, + } + else + error_message = + upload&.errors&.full_messages&.join(", ") || error_message || "unknown error" + nil + end + end + + if result.nil? + status_queue << { + id: row[:id], + upload: nil, + markdown: nil, + error: "too many retries: #{error_message}", + skip_reason: "too many retries", + } + else + status_queue << result + end + rescue StandardError => e + status_queue << { + id: row[:id], + upload: nil, + markdown: nil, + error: e.message, + skip_reason: "error", + } + ensure + data_file&.close! + end + + def build_metadata(row) + UploadMetadata.new( + original_filename: row[:display_filename] || row[:filename], + description: row[:description].presence, + ) + end + + def delete_missing_uploads + puts "Deleting missing uploads from uploads database..." 
+ + uploads_db.db.execute(<<~SQL) + DELETE FROM uploads + WHERE upload IS NULL + SQL + end + + def download_file(url:, id:, retry_count: 0) + path = download_cache_path(id) + original_filename = nil + + if File.exist?(path) && (original_filename = get_original_filename(id)) + return path, original_filename + end + + fd = FinalDestination.new(url) + file = nil + + fd.get do |response, chunk, uri| + if file.nil? + check_response!(response, uri) + original_filename = extract_filename_from_response(response, uri) + file = File.open(path, "wb") + end + + file.write(chunk) + + if file.size > MAX_FILE_SIZE + file.close + file.unlink + file = nil + throw :done + end + end + + if file + file.close + uploads_db.insert( + "INSERT INTO downloads (id, original_filename) VALUES (?, ?)", + [id, original_filename], + ) + return path, original_filename + end + + nil + end + + def download_cache_path(id) + id = id.gsub("/", "_").gsub("=", "-") + File.join(settings[:download_cache_path], id) + end + + def get_original_filename(id) + uploads_db.db.query_single_splat("SELECT original_filename FROM downloads WHERE id = ?", id) + end + + def check_response!(response, uri) + if uri.blank? + code = response.code.to_i + + if code >= 400 + raise "#{code} Error" + else + throw :done + end + end + end + + def extract_filename_from_response(response, uri) + filename = + if (header = response.header["Content-Disposition"].presence) + disposition_filename = + header[/filename\*=UTF-8''(\S+)\b/i, 1] || header[/filename=(?:"(.+)"|[^\s;]+)/i, 1] + if disposition_filename.present? + URI.decode_www_form_component(disposition_filename) + else + nil + end + end + + filename = File.basename(uri.path).presence || "file" if filename.blank? + + if File.extname(filename).blank? && response.content_type.present? + ext = MiniMime.lookup_by_content_type(response.content_type)&.extension + filename = "#{filename}.#{ext}" if ext.present? 
+ end + + filename + end + + def copy_to_tempfile(source_path) + extension = File.extname(source_path) + + Tempfile.open(["discourse-upload", extension]) do |tmpfile| + File.open(source_path, "rb") { |source_stream| IO.copy_stream(source_stream, tmpfile) } + tmpfile.rewind + yield(tmpfile) + end + end + + def log_status + error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors" + print "\r%7d / %7d (%s, %s skipped)" % + [current_count, @max_count, error_count_text, skipped_count] + end + end + end +end diff --git a/migrations/lib/uploader/uploads.rb b/migrations/lib/uploader/uploads.rb new file mode 100644 index 00000000000..931de237695 --- /dev/null +++ b/migrations/lib/uploader/uploads.rb @@ -0,0 +1,80 @@ +# frozen_string_literal: true + +module Migrations::Uploader + class Uploads + attr_reader :settings, :databases + + def initialize(settings) + @settings = settings + @databases = setup_databases + configure_services + end + + def perform! + tasks = build_task_pipeline + tasks.each { |task| task.run!(databases, settings) } + ensure + cleanup_resources + end + + def self.perform!(settings = {}) + new(settings).perform! + end + + private + + def build_task_pipeline + [].tap do |tasks| + tasks << Tasks::Fixer if settings[:fix_missing] + tasks << Tasks::Uploader + tasks << Tasks::Optimizer if settings[:create_optimized_images] + end + end + + def setup_databases + run_uploads_db_migrations + + { + uploads_db: create_database_connection(:uploads), + intermediate_db: create_database_connection(:intermediate), + } + end + + def create_database_connection(type) + path = type == :uploads ? settings[:output_db_path] : settings[:source_db_path] + + # TODO: Using "raw" db connection here for now + # Investigate using ::Migrations::Database::IntermediateDB.setup(db) + # Should we have a ::Migrations::Database::UploadsDB.setup(db)? 
+ ::Migrations::Database.connect(path) + end + + def run_uploads_db_migrations + ::Migrations::Database.migrate( + settings[:output_db_path], + migrations_path: ::Migrations::Database::UPLOADS_DB_SCHEMA_PATH, + ) + end + + def configure_services + configure_logging + configure_site_settings + end + + def configure_logging + @original_exifr_logger = EXIFR.logger + + # disable logging for EXIFR which is used by ImageOptim + EXIFR.logger = Logger.new(nil) + end + + def configure_site_settings + SiteSettings.configure!(settings[:site_settings]) + end + + def cleanup_resources + databases.values.each(&:close) + EXIFR.logger = @original_exifr_logger + end + end +end