discourse/script/bulk_import/uploads_importer.rb

804 lines
24 KiB
Ruby
Raw Normal View History

# frozen_string_literal: true
puts "Loading application..."
require_relative "../../config/environment"
require "etc"
require "colored2"
begin
require "sqlite3"
rescue LoadError
STDERR.puts "",
"ERROR: Failed to load required gems.",
"",
"You need to enable the `generic_import` group in your Gemfile.",
"Execute the following command to do so:",
"",
"\tbundle config set --local with generic_import && bundle install",
""
exit 1
end
module BulkImport
MAX_FILE_SIZE = 1.gigabyte
UploadMetadata = Struct.new(:original_filename, :origin_url, :description)
class UploadsImporter
TRANSACTION_SIZE = 1000
QUEUE_SIZE = 1000
def initialize(settings_path)
@settings = YAML.load_file(settings_path, symbolize_names: true)
@settings[:path_replacements] ||= []
@root_paths = @settings[:root_paths]
@output_db = create_connection(@settings[:output_db_path])
initialize_output_db
configure_site_settings
end
def run
# disable logging for EXIFR which is used by ImageOptim
EXIFR.logger = Logger.new(nil)
if @settings[:fix_missing]
@source_db = create_connection(@settings[:output_db_path])
puts "Fixing missing uploads..."
fix_missing
else
@source_db = create_connection(@settings[:source_db_path])
puts "Uploading uploads..."
upload_files
puts "", "Creating optimized images..."
create_optimized_images if @settings[:create_optimized_images]
end
puts ""
ensure
close
end
def upload_files
queue = SizedQueue.new(QUEUE_SIZE)
consumer_threads = []
if @settings[:delete_missing_uploads]
puts "Deleting missing uploads from output database..."
@output_db.execute(<<~SQL)
DELETE FROM uploads
WHERE upload IS NULL
SQL
end
output_existing_ids = Set.new
query("SELECT id FROM uploads", @output_db).tap do |result_set|
result_set.each { |row| output_existing_ids << row["id"] }
result_set.close
end
source_existing_ids = Set.new
query("SELECT id FROM uploads", @source_db).tap do |result_set|
result_set.each { |row| source_existing_ids << row["id"] }
result_set.close
end
if (surplus_upload_ids = output_existing_ids - source_existing_ids).any?
if @settings[:delete_surplus_uploads]
puts "Deleting #{surplus_upload_ids.size} uploads from output database..."
surplus_upload_ids.each_slice(TRANSACTION_SIZE) do |ids|
placeholders = (["?"] * ids.size).join(",")
@output_db.execute(<<~SQL, ids)
DELETE FROM uploads
WHERE id IN (#{placeholders})
SQL
end
output_existing_ids -= surplus_upload_ids
else
puts "Found #{surplus_upload_ids.size} surplus uploads in output database. " \
"Run with `delete_surplus_uploads: true` to delete them."
end
surplus_upload_ids = nil
end
max_count = (source_existing_ids - output_existing_ids).size
source_existing_ids = nil
puts "Found #{output_existing_ids.size} existing uploads. #{max_count} are missing."
producer_thread =
Thread.new do
query("SELECT * FROM uploads ORDER BY id", @source_db).tap do |result_set|
result_set.each { |row| queue << row if output_existing_ids.exclude?(row["id"]) }
result_set.close
end
end
status_queue = SizedQueue.new(QUEUE_SIZE)
status_thread =
Thread.new do
error_count = 0
skipped_count = 0
current_count = 0
while !(params = status_queue.pop).nil?
begin
if params.delete(:skipped) == true
skipped_count += 1
elsif (error_message = params.delete(:error)) || params[:upload].nil?
error_count += 1
puts "", "Failed to create upload: #{params[:id]} (#{error_message})", ""
end
insert(<<~SQL, params)
INSERT INTO uploads (id, upload, markdown, skip_reason)
VALUES (:id, :upload, :markdown, :skip_reason)
SQL
rescue StandardError => e
puts "", "Failed to insert upload: #{params[:id]} (#{e.message}))", ""
error_count += 1
end
current_count += 1
error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
print "\r%7d / %7d (%s, %d skipped)" %
[current_count, max_count, error_count_text, skipped_count]
end
end
(Etc.nprocessors * @settings[:thread_count_factor]).to_i.times do |index|
consumer_threads << Thread.new do
Thread.current.name = "worker-#{index}"
store = Discourse.store
while (row = queue.pop)
begin
data_file = nil
path = nil
metadata =
UploadMetadata.new(
original_filename: row["display_filename"] || row["filename"],
description: row["description"].presence,
)
if row["data"].present?
data_file = Tempfile.new("discourse-upload", binmode: true)
data_file.write(row["data"])
data_file.rewind
path = data_file.path
elsif row["url"].present?
path, metadata.original_filename = download_file(url: row["url"], id: row["id"])
metadata.origin_url = row["url"]
next if !path
else
relative_path = row["relative_path"] || ""
file_exists = false
@root_paths.each do |root_path|
path = File.join(root_path, relative_path, row["filename"])
break if (file_exists = File.exist?(path))
@settings[:path_replacements].each do |from, to|
path = File.join(root_path, relative_path.sub(from, to), row["filename"])
break if (file_exists = File.exist?(path))
end
end
if !file_exists
status_queue << {
id: row["id"],
upload: nil,
skipped: true,
skip_reason: "file not found",
}
next
end
end
retry_count = 0
loop do
error_message = nil
upload =
copy_to_tempfile(path) do |file|
begin
UploadCreator.new(
file,
metadata.original_filename,
type: row["type"],
origin: metadata.origin_url,
).create_for(Discourse::SYSTEM_USER_ID)
rescue StandardError => e
error_message = e.message
nil
end
end
if (upload_okay = upload.present? && upload.persisted? && upload.errors.blank?)
upload_path = add_multisite_prefix(store.get_path_for_upload(upload))
file_exists =
if store.external?
store.object_from_path(upload_path).exists?
else
File.exist?(File.join(store.public_dir, upload_path))
end
unless file_exists
upload.destroy
upload = nil
upload_okay = false
end
end
if upload_okay
status_queue << {
id: row["id"],
upload: upload.attributes.to_json,
markdown:
UploadMarkdown.new(upload).to_markdown(display_name: metadata.description),
skip_reason: nil,
}
break
elsif retry_count >= 3
error_message ||= upload&.errors&.full_messages&.join(", ") || "unknown error"
status_queue << {
id: row["id"],
upload: nil,
markdown: nil,
error: "too many retries: #{error_message}",
skip_reason: "too many retries",
}
break
end
retry_count += 1
sleep 0.25 * retry_count
end
rescue StandardError => e
status_queue << {
id: row["id"],
upload: nil,
markdown: nil,
error: e.message,
skip_reason: "error",
}
ensure
data_file&.close!
end
end
end
end
producer_thread.join
queue.close
consumer_threads.each(&:join)
status_queue.close
status_thread.join
end
def download_file(url:, id:, retry_count: 0)
path = download_cache_path(id)
original_filename = nil
if File.exist?(path) && (original_filename = get_original_filename(id))
return path, original_filename
end
fd = FinalDestination.new(url)
file = nil
fd.get do |response, chunk, uri|
if file.nil?
check_response!(response, uri)
original_filename = extract_filename_from_response(response, uri)
file = File.open(path, "wb")
end
file.write(chunk)
if file.size > MAX_FILE_SIZE
file.close
file.unlink
file = nil
throw :done
end
end
if file
file.close
insert(
"INSERT INTO downloads (id, original_filename) VALUES (?, ?)",
[id, original_filename],
)
return path, original_filename
end
nil
end
def download_cache_path(id)
id = id.gsub("/", "_").gsub("=", "-")
File.join(@settings[:download_cache_path], id)
end
def get_original_filename(id)
@output_db.get_first_value("SELECT original_filename FROM downloads WHERE id = ?", id)
end
def check_response!(response, uri)
if uri.blank?
code = response.code.to_i
if code >= 400
raise "#{code} Error"
else
throw :done
end
end
end
def extract_filename_from_response(response, uri)
filename =
if (header = response.header["Content-Disposition"].presence)
disposition_filename =
header[/filename\*=UTF-8''(\S+)\b/i, 1] || header[/filename=(?:"(.+)"|[^\s;]+)/i, 1]
disposition_filename.present? ? URI.decode_www_form_component(disposition_filename) : nil
end
filename = File.basename(uri.path).presence || "file" if filename.blank?
if File.extname(filename).blank? && response.content_type.present?
ext = MiniMime.lookup_by_content_type(response.content_type)&.extension
filename = "#{filename}.#{ext}" if ext.present?
end
filename
end
def fix_missing
queue = SizedQueue.new(QUEUE_SIZE)
consumer_threads = []
max_count =
@source_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
producer_thread =
Thread.new do
query(
"SELECT id, upload FROM uploads WHERE upload IS NOT NULL ORDER BY rowid DESC",
@source_db,
).tap do |result_set|
result_set.each { |row| queue << row }
result_set.close
end
end
status_queue = SizedQueue.new(QUEUE_SIZE)
status_thread =
Thread.new do
error_count = 0
current_count = 0
missing_count = 0
while !(result = status_queue.pop).nil?
current_count += 1
case result[:status]
when :ok
# ignore
when :error
error_count += 1
puts "Error in #{result[:id]}"
when :missing
missing_count += 1
puts "Missing #{result[:id]}"
@output_db.execute("DELETE FROM uploads WHERE id = ?", result[:id])
Upload.delete_by(id: result[:upload_id])
end
error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
print "\r%7d / %7d (%s, %s missing)" %
[current_count, max_count, error_count_text, missing_count]
end
end
store = Discourse.store
(Etc.nprocessors * @settings[:thread_count_factor] * 2).to_i.times do |index|
consumer_threads << Thread.new do
Thread.current.name = "worker-#{index}"
fake_upload = OpenStruct.new(url: "")
while (row = queue.pop)
begin
upload = JSON.parse(row["upload"])
fake_upload.url = upload["url"]
path = add_multisite_prefix(store.get_path_for_upload(fake_upload))
file_exists =
if store.external?
store.object_from_path(path).exists?
else
File.exist?(File.join(store.public_dir, path))
end
if file_exists
status_queue << { id: row["id"], upload_id: upload["id"], status: :ok }
else
status_queue << { id: row["id"], upload_id: upload["id"], status: :missing }
end
rescue StandardError => e
puts e.message
status_queue << { id: row["id"], upload_id: upload["id"], status: :error }
end
end
end
end
producer_thread.join
queue.close
consumer_threads.each(&:join)
status_queue.close
status_thread.join
end
def create_optimized_images
init_threads = []
optimized_upload_ids = Set.new
post_upload_ids = Set.new
avatar_upload_ids = Set.new
max_count = 0
# allow more than 1 thread to optimized images at the same time
OptimizedImage.lock_per_machine = false
init_threads << Thread.new do
query("SELECT id FROM optimized_images", @output_db).tap do |result_set|
result_set.each { |row| optimized_upload_ids << row["id"] }
result_set.close
end
end
init_threads << Thread.new do
sql = <<~SQL
SELECT upload_ids
FROM posts
WHERE upload_ids IS NOT NULL
SQL
query(sql, @source_db).tap do |result_set|
result_set.each do |row|
JSON.parse(row["upload_ids"]).each { |id| post_upload_ids << id }
end
result_set.close
end
end
init_threads << Thread.new do
sql = <<~SQL
SELECT avatar_upload_id
FROM users
WHERE avatar_upload_id IS NOT NULL
SQL
query(sql, @source_db).tap do |result_set|
result_set.each { |row| avatar_upload_ids << row["avatar_upload_id"] }
result_set.close
end
end
init_threads << Thread.new do
max_count =
@output_db.get_first_value("SELECT COUNT(*) FROM uploads WHERE upload IS NOT NULL")
end
init_threads.each(&:join)
status_queue = SizedQueue.new(QUEUE_SIZE)
status_thread =
Thread.new do
error_count = 0
current_count = 0
skipped_count = 0
while !(params = status_queue.pop).nil?
current_count += 1
case params.delete(:status)
when :ok
insert(<<~SQL, params)
INSERT INTO optimized_images (id, optimized_images)
VALUES (:id, :optimized_images)
SQL
when :error
error_count += 1
when :skipped
skipped_count += 1
end
error_count_text = error_count > 0 ? "#{error_count} errors".red : "0 errors"
print "\r%7d / %7d (%s, %d skipped)" %
[current_count, max_count, error_count_text, skipped_count]
end
end
queue = SizedQueue.new(QUEUE_SIZE)
consumer_threads = []
producer_thread =
Thread.new do
sql = <<~SQL
SELECT id AS upload_id, upload ->> 'sha1' AS upload_sha1, markdown
FROM uploads
WHERE upload IS NOT NULL
ORDER BY rowid
SQL
query(sql, @output_db).tap do |result_set|
result_set.each do |row|
upload_id = row["upload_id"]
if optimized_upload_ids.include?(upload_id) || !row["markdown"].start_with?("![")
status_queue << { id: row["upload_id"], status: :skipped }
next
end
if post_upload_ids.include?(upload_id)
row["type"] = "post"
queue << row
elsif avatar_upload_ids.include?(upload_id)
row["type"] = "avatar"
queue << row
else
status_queue << { id: row["upload_id"], status: :skipped }
end
end
result_set.close
end
end
avatar_sizes = Discourse.avatar_sizes
store = Discourse.store
remote_factor = store.external? ? 2 : 1
Jobs.run_immediately!
(Etc.nprocessors * @settings[:thread_count_factor] * remote_factor).to_i.times do |index|
consumer_threads << Thread.new do
Thread.current.name = "worker-#{index}"
post =
PostCreator.new(
Discourse.system_user,
raw: "Topic created by uploads_importer",
acting_user: Discourse.system_user,
skip_validations: true,
title: "Topic created by uploads_importer - #{SecureRandom.hex}",
archetype: Archetype.default,
category: Category.last.id,
).create!
while (row = queue.pop)
retry_count = 0
loop do
upload = Upload.find_by(sha1: row["upload_sha1"])
optimized_images =
begin
case row["type"]
when "post"
post.update_columns(baked_at: nil, cooked: "", raw: row["markdown"])
post.reload
post.rebake!
OptimizedImage.where(upload_id: upload.id).to_a
when "avatar"
avatar_sizes.map { |size| OptimizedImage.create_for(upload, size, size) }
end
rescue StandardError => e
puts e.message
puts e.stacktrace
nil
end
begin
if optimized_images.present?
optimized_images.map! do |optimized_image|
next if optimized_image.blank?
optimized_image_path =
add_multisite_prefix(store.get_path_for_optimized_image(optimized_image))
file_exists =
if store.external?
store.object_from_path(optimized_image_path).exists?
else
File.exist?(File.join(store.public_dir, optimized_image_path))
end
unless file_exists
optimized_image.destroy
optimized_image = nil
end
optimized_image
end
end
rescue StandardError
optimized_images = nil
end
optimized_images_okay =
!optimized_images.nil? && optimized_images.all?(&:present?) &&
optimized_images.all?(&:persisted?) &&
optimized_images.all? { |o| o.errors.blank? }
if optimized_images_okay
status_queue << {
id: row["upload_id"],
optimized_images: optimized_images.presence&.to_json,
status: :ok,
}
break
elsif retry_count >= 3
status_queue << { id: row["upload_id"], status: :error }
break
end
retry_count += 1
sleep 0.25 * retry_count
end
end
end
end
producer_thread.join
queue.close
consumer_threads.each(&:join)
status_queue.close
status_thread.join
end
private
def create_connection(path)
sqlite = SQLite3::Database.new(path, results_as_hash: true)
sqlite.busy_timeout = 60_000 # 60 seconds
sqlite.journal_mode = "WAL"
sqlite.synchronous = "off"
sqlite
end
def query(sql, db)
db.prepare(sql).execute
end
def initialize_output_db
@statement_counter = 0
@output_db.execute(<<~SQL)
CREATE TABLE IF NOT EXISTS uploads (
id TEXT PRIMARY KEY NOT NULL,
upload JSON_TEXT,
markdown TEXT,
skip_reason TEXT
)
SQL
@output_db.execute(<<~SQL)
CREATE TABLE IF NOT EXISTS optimized_images (
id TEXT PRIMARY KEY NOT NULL,
optimized_images JSON_TEXT
)
SQL
@output_db.execute(<<~SQL)
CREATE TABLE IF NOT EXISTS downloads (
id TEXT PRIMARY KEY NOT NULL,
original_filename TEXT NOT NULL
)
SQL
end
def insert(sql, bind_vars = [])
@output_db.transaction if @statement_counter == 0
@output_db.execute(sql, bind_vars)
if (@statement_counter += 1) > TRANSACTION_SIZE
@output_db.commit
@statement_counter = 0
end
end
def close
@source_db.close if @source_db
if @output_db
@output_db.commit if @output_db.transaction_active?
@output_db.close
end
end
def copy_to_tempfile(source_path)
extension = File.extname(source_path)
Tempfile.open(["discourse-upload", extension]) do |tmpfile|
File.open(source_path, "rb") { |source_stream| IO.copy_stream(source_stream, tmpfile) }
tmpfile.rewind
yield(tmpfile)
end
end
def configure_site_settings
settings = @settings[:site_settings]
SiteSetting.clean_up_uploads = false
SiteSetting.authorized_extensions = settings[:authorized_extensions]
SiteSetting.max_attachment_size_kb = settings[:max_attachment_size_kb]
SiteSetting.max_image_size_kb = settings[:max_image_size_kb]
if settings[:multisite]
# rubocop:disable Discourse/NoDirectMultisiteManipulation
Rails.configuration.multisite = true
# rubocop:enable Discourse/NoDirectMultisiteManipulation
RailsMultisite::ConnectionManagement.class_eval do
def self.current_db_override=(value)
@current_db_override = value
end
def self.current_db
@current_db_override
end
end
RailsMultisite::ConnectionManagement.current_db_override = settings[:multisite_db_name]
end
if settings[:enable_s3_uploads]
SiteSetting.s3_access_key_id = settings[:s3_access_key_id]
SiteSetting.s3_secret_access_key = settings[:s3_secret_access_key]
SiteSetting.s3_upload_bucket = settings[:s3_upload_bucket]
SiteSetting.s3_region = settings[:s3_region]
SiteSetting.s3_cdn_url = settings[:s3_cdn_url]
SiteSetting.enable_s3_uploads = true
raise "Failed to enable S3 uploads" if SiteSetting.enable_s3_uploads != true
Tempfile.open("discourse-s3-test") do |tmpfile|
tmpfile.write("test")
tmpfile.rewind
upload =
UploadCreator.new(tmpfile, "discourse-s3-test.txt").create_for(
Discourse::SYSTEM_USER_ID,
)
unless upload.present? && upload.persisted? && upload.errors.blank? &&
upload.url.start_with?("//")
raise "Failed to upload to S3"
end
upload.destroy
end
end
end
def add_multisite_prefix(path)
if Rails.configuration.multisite
File.join("uploads", RailsMultisite::ConnectionManagement.current_db, path)
else
path
end
end
end
end
# bundle exec ruby script/bulk_import/uploads_importer.rb /path/to/uploads_importer.yml
BulkImport::UploadsImporter.new(ARGV.first).run