From dc8c6b8958cd6bd7bea7ab8ba3683f6b381e7020 Mon Sep 17 00:00:00 2001
From: Gerhard Schlager
Date: Sun, 10 Dec 2023 22:38:03 +0100
Subject: [PATCH] DEV: Lots of improvements to the generic_bulk import script

Notable changes:

* Imports a lot more tables from core and plugins
  * site settings
  * uploads with necessary upload references
  * groups and group members
  * user profiles
  * user options
  * user fields & values
  * muted users
  * user notes (plugin)
  * user followers (plugin)
  * user avatars
  * tag groups and tags
  * tag users (notification settings for tags / user)
  * category permissions
  * polls with options and votes
  * post votes (plugin)
  * solutions (plugin)
  * gamification scores (plugin)
  * events (plugin)
  * badges and badge groupings
  * user badges
  * optimized images
  * topic users (notification settings for topics)
  * post custom fields
  * permalinks and permalink normalizations
* It creates the `migration_mappings` table, which is used to store the
  mappings for a handful of imported tables
* Detects duplicate group names and renames them
* Pre-cooking for attachments, images and mentions
* Outputs instructions when gems are missing
* Supports importing uploads from a DB generated by `uploads_importer.rb`
* Checks that all required plugins exist and enables them if needed
* A couple of optimizations and additions in `import.rake`
---
 lib/tasks/import.rake              |  140 +-
 script/bulk_import/base.rb         |  879 +++++++++++-
 script/bulk_import/generic_bulk.rb | 2116 ++++++++++++++++++++++++++--
 3 files changed, 2926 insertions(+), 209 deletions(-)

diff --git a/lib/tasks/import.rake b/lib/tasks/import.rake
index 79eb54726d1..df49e09301d 100644
--- a/lib/tasks/import.rake
+++ b/lib/tasks/import.rake
@@ -15,6 +15,7 @@ task "import:ensure_consistency" => :environment do
   insert_user_stats
   insert_user_visits
   insert_draft_sequences
+  insert_automatic_group_users
 
   update_user_stats
   update_posts
@@ -24,8 +25,11 @@ task "import:ensure_consistency" => :environment do
   update_groups
   update_tag_stats
   update_topic_users
+  update_topic_featured_users
 
   create_category_definitions
 
+  # run_jobs
+
   log "Done!"
 end
 
@@ -213,11 +217,16 @@ end
 def insert_user_stats
   log "Inserting user stats..."
 
-  DB.exec <<-SQL
+  DB.exec <<~SQL
     INSERT INTO user_stats (user_id, new_since)
-    SELECT id, created_at
-    FROM users
-    ON CONFLICT DO NOTHING
+    SELECT id, created_at
+    FROM users u
+    WHERE NOT EXISTS (
+      SELECT 1
+      FROM user_stats us
+      WHERE us.user_id = u.id
+    )
+    ON CONFLICT DO NOTHING
   SQL
 end
 
@@ -247,6 +256,40 @@ def insert_draft_sequences
   SQL
 end
 
+def insert_automatic_group_users
+  Group::AUTO_GROUPS.each do |group_name, group_id|
+    user_condition =
+      case group_name
+      when :everyone
+        "TRUE"
+      when :admins
+        "id > 0 AND admin AND NOT staged"
+      when :moderators
+        "id > 0 AND moderator AND NOT staged"
+      when :staff
+        "id > 0 AND (moderator OR admin) AND NOT staged"
+      when :trust_level_1, :trust_level_2, :trust_level_3, :trust_level_4
+        "id > 0 AND trust_level >= :min_trust_level AND NOT staged"
+      when :trust_level_0
+        "id > 0 AND NOT staged"
+      end
+
+    DB.exec(<<~SQL, group_id: group_id, min_trust_level: group_id - 10)
+      INSERT INTO group_users (group_id, user_id, created_at, updated_at)
+      SELECT :group_id, id, NOW(), NOW()
+      FROM users u
+      WHERE #{user_condition}
+        AND NOT EXISTS (
+          SELECT 1
+          FROM group_users gu
+          WHERE gu.group_id = :group_id AND gu.user_id = u.id
+        )
+    SQL
+
+    Group.reset_user_count(Group.find(group_id))
+  end
+end
+
 def update_user_stats
   log "Updating user stats..."
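# NOTE: `insert_automatic_group_users` above relies on Discourse assigning the
# ids 10..14 to the automatic trust-level groups TL0..TL4, which is why
# `min_trust_level: group_id - 10` recovers the trust level from the group id
# (e.g. Group::AUTO_GROUPS[:trust_level_2] == 12, and 12 - 10 == 2). A minimal
# sanity check, as a sketch assuming those constants:
#
#   (0..4).each do |tl|
#     id = Group::AUTO_GROUPS[:"trust_level_#{tl}"]
#     raise "unexpected id #{id} for trust_level_#{tl}" unless id - 10 == tl
#   end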
@@ -404,9 +447,9 @@ def update_users
     GROUP BY p.user_id
   )
   UPDATE users
-  SET first_seen_at = X.min_created_at
-    , last_seen_at = X.max_created_at
-    , last_posted_at = X.max_created_at
+  SET first_seen_at = LEAST(first_seen_at, X.min_created_at)
+    , last_seen_at = GREATEST(last_seen_at, X.max_created_at)
+    , last_posted_at = GREATEST(last_posted_at, X.max_created_at)
   FROM X
   WHERE id = X.user_id
     AND (COALESCE(first_seen_at, '1970-01-01') <> X.min_created_at
@@ -459,9 +502,15 @@ def update_topic_users
   SQL
 end
 
+def update_topic_featured_users
+  log "Updating topic featured users..."
+  TopicFeaturedUsers.ensure_consistency!
+end
+
 def create_category_definitions
   log "Creating category definitions"
   Category.ensure_consistency!
+  Site.clear_cache
 end
 
 def log(message)
@@ -654,3 +703,80 @@ task "import:update_avatars_from_sso" => :environment do
   status_queue.close
   status_thread.join
 end
+
+def run_jobs
+  log "Running jobs"
+
+  Jobs::EnsureDbConsistency.new.execute({})
+  Jobs::DirectoryRefreshOlder.new.execute({})
+  Jobs::DirectoryRefreshDaily.new.execute({})
+  Jobs::ReindexSearch.new.execute({})
+  Jobs::TopRefreshToday.new.execute({})
+  Jobs::TopRefreshOlder.new.execute({})
+  Jobs::Weekly.new.execute({})
+end
+
+desc "Rebake posts that contain polls"
+task "import:rebake_uncooked_posts_with_polls" => :environment do
+  log "Rebaking posts with polls"
+
+  Jobs.run_immediately!
+
+  posts =
+    Post.where("EXISTS (SELECT 1 FROM polls WHERE polls.post_id = posts.id)").where(
+      "baked_version <> ? or baked_version IS NULL",
+      Post::BAKED_VERSION,
+    )
+
+  max_count = posts.count
+  current_count = 0
+
+  posts.find_each(order: :desc) do |post|
+    post.rebake!
+    current_count += 1
+    print "\r%7d / %7d" % [current_count, max_count]
+  end
+end
+
+desc "Rebake posts that contain events"
+task "import:rebake_uncooked_posts_with_events" => :environment do
+  log "Rebaking posts with events"
+
+  Jobs.run_immediately!
+
+  posts =
+    Post.where(
+      "EXISTS (SELECT 1 FROM discourse_post_event_events WHERE discourse_post_event_events.id = posts.id)",
+    ).where("baked_version <> ? or baked_version IS NULL", Post::BAKED_VERSION)
+
+  max_count = posts.count
+  current_count = 0
+
+  posts.find_each(order: :desc) do |post|
+    post.rebake!
+    current_count += 1
+    print "\r%7d / %7d" % [current_count, max_count]
+  end
+end
+
+desc "Rebake posts that have a specific tag"
+task "import:rebake_uncooked_posts_with_tag", [:tag_name] => :environment do |_task, args|
+  log "Rebaking posts with tag"
+
+  Jobs.run_immediately!
+
+  posts =
+    Post.where(
+      "EXISTS (SELECT 1 FROM topic_tags JOIN tags ON tags.id = topic_tags.tag_id WHERE topic_tags.topic_id = posts.topic_id AND tags.name = ?)",
+      args[:tag_name],
+    ).where("baked_version <> ? or baked_version IS NULL", Post::BAKED_VERSION)
+
+  max_count = posts.count
+  current_count = 0
+
+  posts.find_each(order: :desc) do |post|
+    post.rebake!
+    current_count += 1
+    print "\r%7d / %7d" % [current_count, max_count]
+  end
+end
diff --git a/script/bulk_import/base.rb b/script/bulk_import/base.rb
index cc4b607fb1d..9e5be0ab6a7 100644
--- a/script/bulk_import/base.rb
+++ b/script/bulk_import/base.rb
@@ -87,16 +87,20 @@ class BulkImport::Base
   end
 
   def run
+    start_time = Time.now
+
     puts "Starting..."
     Rails.logger.level = 3 # :error, so that we don't create log files that are many GB
 
     preload_i18n
+    create_migration_mappings_table
     fix_highest_post_numbers
     load_imported_ids
     load_indexes
     execute
     fix_primary_keys
     execute_after
-    puts "Done! Now run the 'import:ensure_consistency' rake task."
+    puts "Done!
(#{((Time.now - start_time) / 60).to_i} minutes)" + puts "Now run the 'import:ensure_consistency' rake task." end def preload_i18n @@ -106,6 +110,20 @@ class BulkImport::Base ActiveSupport::Inflector.transliterate("test") end + MAPPING_TYPES = Enum.new(upload: 1, badge: 2, poll: 3, poll_option: 4) + + def create_migration_mappings_table + puts "Creating migration mappings table..." + @raw_connection.exec <<~SQL + CREATE TABLE IF NOT EXISTS migration_mappings ( + original_id VARCHAR(255) NOT NULL, + type INTEGER NOT NULL, + discourse_id VARCHAR(255) NOT NULL, + PRIMARY KEY (original_id, type) + ) + SQL + end + def fix_highest_post_numbers puts "Fixing highest post numbers..." @raw_connection.exec <<-SQL @@ -152,7 +170,6 @@ class BulkImport::Base puts "Loading imported user ids..." @users, imported_user_ids = imported_ids("user") @last_imported_user_id = imported_user_ids.max || -1 - @pre_existing_user_ids = Set.new puts "Loading imported category ids..." @categories, imported_category_ids = imported_ids("category") @@ -189,10 +206,25 @@ class BulkImport::Base map end + def load_index(type) + map = {} + + @raw_connection.send_query( + "SELECT original_id, discourse_id FROM migration_mappings WHERE type = #{type}", + ) + @raw_connection.set_single_row_mode + + @raw_connection.get_result.stream_each { |row| map[row["original_id"]] = row["discourse_id"] } + + @raw_connection.get_result + + map + end + def load_indexes puts "Loading groups indexes..." @last_group_id = last_id(Group) - @group_names = Group.unscoped.pluck(:name).map(&:downcase).to_set + group_names = Group.unscoped.pluck(:name).map(&:downcase).to_set puts "Loading users indexes..." @last_user_id = last_id(User) @@ -200,22 +232,29 @@ class BulkImport::Base @last_sso_record_id = last_id(SingleSignOnRecord) @emails = UserEmail.pluck(:email, :user_id).to_h @external_ids = SingleSignOnRecord.pluck(:external_id, :user_id).to_h - @usernames_lower = User.unscoped.pluck(:username_lower).to_set + @usernames_and_groupnames_lower = User.unscoped.pluck(:username_lower).to_set.merge(group_names) + @anonymized_user_suffixes = + DB.query_single( + "SELECT SUBSTRING(username_lower, 5)::BIGINT FROM users WHERE username_lower ~* '^anon\\d+$'", + ).to_set @mapped_usernames = UserCustomField .joins(:user) .where(name: "import_username") .pluck("user_custom_fields.value", "users.username") .to_h + @last_user_avatar_id = last_id(UserAvatar) + @last_upload_id = last_id(Upload) puts "Loading categories indexes..." @last_category_id = last_id(Category) + @last_category_group_id = last_id(CategoryGroup) @highest_category_position = Category.unscoped.maximum(:position) || 0 @category_names = Category .unscoped .pluck(:parent_category_id, :name) - .map { |pci, name| "#{pci}-#{name}" } + .map { |pci, name| "#{pci}-#{name.downcase}" } .to_set puts "Loading topics indexes..." @@ -229,6 +268,21 @@ class BulkImport::Base puts "Loading post actions indexes..." @last_post_action_id = last_id(PostAction) + + puts "Loading upload indexes..." + @uploads_mapping = load_index(MAPPING_TYPES[:upload]) + @uploads_by_sha1 = Upload.pluck(:sha1, :id).to_h + @upload_urls_by_id = Upload.pluck(:id, :url).to_h + + puts "Loading badge indexes..." + @badge_mapping = load_index(MAPPING_TYPES[:badge]) + @last_badge_id = last_id(Badge) + + puts "Loading poll indexes..." + @poll_mapping = load_index(MAPPING_TYPES[:poll]) + @poll_option_mapping = load_index(MAPPING_TYPES[:poll_option]) + @last_poll_id = last_id(Poll) + @last_poll_option_id = last_id(PollOption) end def use_bbcode_to_md? 
@@ -261,6 +315,11 @@ class BulkImport::Base if @last_category_id > 0 @raw_connection.exec("SELECT setval('#{Category.sequence_name}', #{@last_category_id})") end + if @last_category_group_id > 0 + @raw_connection.exec( + "SELECT setval('#{CategoryGroup.sequence_name}', #{@last_category_group_id})", + ) + end if @last_topic_id > 0 @raw_connection.exec("SELECT setval('#{Topic.sequence_name}', #{@last_topic_id})") end @@ -270,6 +329,21 @@ class BulkImport::Base if @last_post_action_id > 0 @raw_connection.exec("SELECT setval('#{PostAction.sequence_name}', #{@last_post_action_id})") end + if @last_user_avatar_id > 0 + @raw_connection.exec("SELECT setval('#{UserAvatar.sequence_name}', #{@last_user_avatar_id})") + end + if @last_upload_id > 0 + @raw_connection.exec("SELECT setval('#{Upload.sequence_name}', #{@last_upload_id})") + end + if @last_badge_id > 0 + @raw_connection.exec("SELECT setval('#{Badge.sequence_name}', #{@last_badge_id})") + end + if @last_poll_id > 0 + @raw_connection.exec("SELECT setval('#{Poll.sequence_name}', #{@last_poll_id})") + end + if @last_poll_option_id > 0 + @raw_connection.exec("SELECT setval('#{PollOption.sequence_name}', #{@last_poll_option_id})") + end end def group_id_from_imported_id(id) @@ -292,6 +366,18 @@ class BulkImport::Base @posts[id.to_i] end + def upload_id_from_original_id(id) + @uploads_mapping[id.to_s]&.to_i + end + + def upload_id_from_sha1(sha1) + @uploads_by_sha1[sha1] + end + + def upload_url_from_id(id) + @upload_urls_by_id[id] + end + def post_number_from_imported_id(id) post_id = post_id_from_imported_id(id) post_id && @post_number_by_post_id[post_id] @@ -302,7 +388,32 @@ class BulkImport::Base post_id && @topic_id_by_post_id[post_id] end - GROUP_COLUMNS ||= %i[id name title bio_raw bio_cooked created_at updated_at] + def badge_id_from_original_id(id) + @badge_mapping[id.to_s]&.to_i + end + + def poll_id_from_original_id(id) + @poll_mapping[id.to_s]&.to_i + end + + def poll_option_id_from_original_id(id) + @poll_option_mapping[id.to_s]&.to_i + end + + GROUP_COLUMNS ||= %i[ + id + name + full_name + title + bio_raw + bio_cooked + visibility_level + members_visibility_level + mentionable_level + messageable_level + created_at + updated_at + ] USER_COLUMNS ||= %i[ id @@ -319,6 +430,7 @@ class BulkImport::Base primary_group_id suspended_at suspended_till + last_seen_at last_emailed_at created_at updated_at @@ -344,6 +456,10 @@ class BulkImport::Base digest_attempted_at ] + USER_HISTORY_COLUMNS ||= %i[action acting_user_id target_user_id details created_at updated_at] + + USER_AVATAR_COLUMNS ||= %i[id user_id custom_upload_id created_at updated_at] + USER_PROFILE_COLUMNS ||= %i[user_id location website bio_raw bio_cooked views] USER_SSO_RECORD_COLUMNS ||= %i[ @@ -361,8 +477,55 @@ class BulkImport::Base external_card_background_url ] + USER_OPTION_COLUMNS ||= %i[ + user_id + mailing_list_mode + mailing_list_mode_frequency + email_level + email_messages_level + email_previous_replies + email_in_reply_to + email_digests + digest_after_minutes + include_tl0_in_digests + automatically_unpin_topics + enable_quoting + external_links_in_new_tab + dynamic_favicon + new_topic_duration_minutes + auto_track_topics_after_msecs + notification_level_when_replying + like_notification_frequency + skip_new_user_tips + hide_profile_and_presence + sidebar_link_to_filtered_list + sidebar_show_count_of_new_items + timezone + ] + + USER_FOLLOWER_COLUMNS ||= %i[user_id follower_id level created_at updated_at] + GROUP_USER_COLUMNS ||= %i[group_id user_id created_at 
updated_at] + USER_CUSTOM_FIELD_COLUMNS ||= %i[user_id name value created_at updated_at] + + POST_CUSTOM_FIELD_COLUMNS ||= %i[post_id name value created_at updated_at] + + TOPIC_CUSTOM_FIELD_COLUMNS ||= %i[topic_id name value created_at updated_at] + + USER_ACTION_COLUMNS ||= %i[ + action_type + user_id + target_topic_id + target_post_id + target_user_id + acting_user_id + created_at + updated_at + ] + + MUTED_USER_COLUMNS ||= %i[user_id muted_user_id created_at updated_at] + CATEGORY_COLUMNS ||= %i[ id name @@ -372,10 +535,18 @@ class BulkImport::Base description position parent_category_id + read_restricted + uploaded_logo_id created_at updated_at ] + CATEGORY_CUSTOM_FIELD_COLUMNS ||= %i[category_id name value created_at updated_at] + + CATEGORY_GROUP_COLUMNS ||= %i[id category_id group_id permission_type created_at updated_at] + + CATEGORY_TAG_GROUP_COLUMNS ||= %i[category_id tag_group_id created_at updated_at] + TOPIC_COLUMNS ||= %i[ id archetype @@ -389,6 +560,7 @@ class BulkImport::Base closed pinned_at views + subtype created_at bumped_at updated_at @@ -436,6 +608,148 @@ class BulkImport::Base TOPIC_TAG_COLUMNS ||= %i[topic_id tag_id created_at updated_at] + TOPIC_USER_COLUMNS ||= %i[ + user_id + topic_id + last_read_post_number + last_visited_at + first_visited_at + notification_level + notifications_changed_at + notifications_reason_id + total_msecs_viewed + ] + + TAG_USER_COLUMNS ||= %i[tag_id user_id notification_level created_at updated_at] + + UPLOAD_COLUMNS ||= %i[ + id + user_id + original_filename + filesize + width + height + url + created_at + updated_at + sha1 + origin + retain_hours + extension + thumbnail_width + thumbnail_height + etag + secure + access_control_post_id + original_sha1 + animated + verification_status + security_last_changed_at + security_last_changed_reason + dominant_color + ] + + UPLOAD_REFERENCE_COLUMNS ||= %i[upload_id target_type target_id created_at updated_at] + + OPTIMIZED_IMAGE_COLUMNS ||= %i[ + sha1 + extension + width + height + upload_id + url + filesize + etag + version + created_at + updated_at + ] + + POST_VOTING_VOTE_COLUMNS ||= %i[user_id votable_type votable_id direction created_at] + + BADGE_COLUMNS ||= %i[ + id + name + description + badge_type_id + badge_grouping_id + long_description + image_upload_id + created_at + updated_at + multiple_grant + query + ] + + USER_BADGE_COLUMNS ||= %i[badge_id user_id granted_at granted_by_id seq post_id created_at] + + GAMIFICATION_SCORE_EVENT_COLUMNS ||= %i[user_id date points description created_at updated_at] + + POST_EVENT_COLUMNS ||= %i[ + id + status + original_starts_at + original_ends_at + deleted_at + raw_invitees + name + url + custom_fields + reminders + recurrence + timezone + minimal + ] + + POST_EVENT_DATES_COLUMNS ||= %i[ + event_id + starts_at + ends_at + reminder_counter + event_will_start_sent_at + event_started_sent_at + finished_at + created_at + updated_at + ] + + POLL_COLUMNS ||= %i[ + id + post_id + name + close_at + type + status + results + visibility + min + max + step + anonymous_voters + created_at + updated_at + chart_type + groups + title + ] + + POLL_OPTION_COLUMNS ||= %i[id poll_id digest html anonymous_votes created_at updated_at] + + POLL_VOTE_COLUMNS ||= %i[poll_id poll_option_id user_id created_at updated_at] + + PLUGIN_STORE_ROW_COLUMNS ||= %i[plugin_name key type_name value] + + PERMALINK_COLUMNS ||= %i[ + url + topic_id + post_id + category_id + tag_id + external_url + created_at + updated_at + ] + def create_groups(rows, &block) create_records(rows, 
"group", GROUP_COLUMNS, &block) end @@ -453,51 +767,179 @@ class BulkImport::Base def create_user_emails(rows, &block) create_records(rows, "user_email", USER_EMAIL_COLUMNS, &block) end + def create_user_stats(rows, &block) create_records(rows, "user_stat", USER_STAT_COLUMNS, &block) end + + def create_user_histories(rows, &block) + create_records(rows, "user_history", USER_HISTORY_COLUMNS, &block) + end + + def create_user_avatars(rows, &block) + create_records(rows, "user_avatar", USER_AVATAR_COLUMNS, &block) + end + def create_user_profiles(rows, &block) create_records(rows, "user_profile", USER_PROFILE_COLUMNS, &block) end + + def create_user_options(rows, &block) + create_records(rows, "user_option", USER_OPTION_COLUMNS, &block) + end + + def create_user_followers(rows, &block) + create_records(rows, "user_follower", USER_FOLLOWER_COLUMNS, &block) + end + def create_single_sign_on_records(rows, &block) create_records(rows, "single_sign_on_record", USER_SSO_RECORD_COLUMNS, &block) end + + def create_user_custom_fields(rows, &block) + create_records(rows, "user_custom_field", USER_CUSTOM_FIELD_COLUMNS, &block) + end + + def create_muted_users(rows, &block) + create_records(rows, "muted_user", MUTED_USER_COLUMNS, &block) + end + def create_group_users(rows, &block) create_records(rows, "group_user", GROUP_USER_COLUMNS, &block) end + def create_categories(rows, &block) create_records(rows, "category", CATEGORY_COLUMNS, &block) end + + def create_category_custom_fields(rows, &block) + create_records(rows, "category_custom_field", CATEGORY_CUSTOM_FIELD_COLUMNS, &block) + end + + def create_category_groups(rows, &block) + create_records(rows, "category_group", CATEGORY_GROUP_COLUMNS, &block) + end + + def create_category_tag_groups(rows, &block) + create_records(rows, "category_tag_group", CATEGORY_TAG_GROUP_COLUMNS, &block) + end + def create_topics(rows, &block) create_records(rows, "topic", TOPIC_COLUMNS, &block) end + def create_posts(rows, &block) create_records(rows, "post", POST_COLUMNS, &block) end + def create_post_actions(rows, &block) create_records(rows, "post_action", POST_ACTION_COLUMNS, &block) end + def create_topic_allowed_users(rows, &block) create_records(rows, "topic_allowed_user", TOPIC_ALLOWED_USER_COLUMNS, &block) end + def create_topic_tags(rows, &block) create_records(rows, "topic_tag", TOPIC_TAG_COLUMNS, &block) end + def create_topic_users(rows, &block) + create_records(rows, "topic_user", TOPIC_USER_COLUMNS, &block) + end + + def create_tag_users(rows, &block) + create_records(rows, "tag_user", TAG_USER_COLUMNS, &block) + end + + def create_uploads(rows, &block) + create_records_with_mapping(rows, "upload", UPLOAD_COLUMNS, &block) + end + + def create_upload_references(rows, &block) + create_records(rows, "upload_reference", UPLOAD_REFERENCE_COLUMNS, &block) + end + + def create_optimized_images(rows, &block) + create_records(rows, "optimized_image", OPTIMIZED_IMAGE_COLUMNS, &block) + end + + def create_post_voting_votes(rows, &block) + create_records(rows, "post_voting_vote", POST_VOTING_VOTE_COLUMNS, &block) + end + + def create_post_custom_fields(rows, &block) + create_records(rows, "post_custom_field", POST_CUSTOM_FIELD_COLUMNS, &block) + end + + def create_topic_custom_fields(rows, &block) + create_records(rows, "topic_custom_field", TOPIC_CUSTOM_FIELD_COLUMNS, &block) + end + + def create_user_actions(rows, &block) + create_records(rows, "user_action", USER_ACTION_COLUMNS, &block) + end + + def create_badges(rows, &block) + create_records_with_mapping(rows, 
"badge", BADGE_COLUMNS, &block) + end + + def create_user_badges(rows, &block) + create_records(rows, "user_badge", USER_BADGE_COLUMNS, &block) + end + + def create_gamification_score_events(rows, &block) + create_records(rows, "gamification_score_event", GAMIFICATION_SCORE_EVENT_COLUMNS, &block) + end + + def create_post_events(rows, &block) + create_records(rows, "discourse_post_event_events", POST_EVENT_COLUMNS, &block) + end + + def create_post_event_dates(rows, &block) + create_records(rows, "discourse_calendar_post_event_dates", POST_EVENT_DATES_COLUMNS, &block) + end + + def create_polls(rows, &block) + create_records_with_mapping(rows, "poll", POLL_COLUMNS, &block) + end + + def create_poll_options(rows, &block) + create_records_with_mapping(rows, "poll_option", POLL_OPTION_COLUMNS, &block) + end + + def create_poll_votes(rows, &block) + create_records(rows, "poll_vote", POLL_VOTE_COLUMNS, &block) + end + + def create_plugin_store_rows(rows, &block) + create_records(rows, "plugin_store_row", PLUGIN_STORE_ROW_COLUMNS, &block) + end + + def create_permalinks(rows, &block) + create_records(rows, "permalink", PERMALINK_COLUMNS, &block) + end + def process_group(group) @groups[group[:imported_id].to_i] = group[:id] = @last_group_id += 1 group[:name] = fix_name(group[:name]) - unless @group_names.add?(group[:name].downcase) + unless @usernames_and_groupnames_lower.add?(group[:name].downcase) group_name = group[:name] + "_1" - group_name.next! until @group_names.add?(group_name.downcase) + group_name.next! until @usernames_and_groupnames_lower.add?(group_name.downcase) group[:name] = group_name end group[:title] = group[:title].scrub.strip.presence if group[:title].present? group[:bio_raw] = group[:bio_raw].scrub.strip.presence if group[:bio_raw].present? group[:bio_cooked] = pre_cook(group[:bio_raw]) if group[:bio_raw].present? + + group[:visibility_level] ||= Group.visibility_levels[:public] + group[:members_visibility_level] ||= Group.visibility_levels[:public] + group[:mentionable_level] ||= Group::ALIAS_LEVELS[:nobody] + group[:messageable_level] ||= Group::ALIAS_LEVELS[:nobody] + group[:created_at] ||= NOW group[:updated_at] ||= group[:created_at] group @@ -507,8 +949,7 @@ class BulkImport::Base if user[:email].present? user[:email].downcase! - if existing_user_id = @emails[user[:email]] - @pre_existing_user_ids << existing_user_id + if (existing_user_id = @emails[user[:email]]) @users[user[:imported_id].to_i] = existing_user_id user[:skip] = true return user @@ -516,8 +957,7 @@ class BulkImport::Base end if user[:external_id].present? - if existing_user_id = @external_ids[user[:external_id]] - @pre_existing_user_ids << existing_user_id + if (existing_user_id = @external_ids[user[:external_id]]) @users[user[:imported_id].to_i] = existing_user_id user[:skip] = true return user @@ -526,7 +966,7 @@ class BulkImport::Base @users[user[:imported_id].to_i] = user[:id] = @last_user_id += 1 - imported_username = user[:username].dup + imported_username = user[:original_username].presence || user[:username].dup user[:username] = fix_name(user[:username]).presence || random_username @@ -536,9 +976,9 @@ class BulkImport::Base end # unique username_lower - unless @usernames_lower.add?(user[:username].downcase) + unless @usernames_and_groupnames_lower.add?(user[:username].downcase) username = user[:username] + "_1" - username.next! until @usernames_lower.add?(username.downcase) + username.next! 
until @usernames_and_groupnames_lower.add?(username.downcase)
       user[:username] = username
     end
 
@@ -562,11 +1002,7 @@ class BulkImport::Base
   end
 
   def process_user_email(user_email)
-    user_id = @users[user_email[:imported_user_id].to_i]
-    return { skip: true } if @pre_existing_user_ids.include?(user_id)
-
     user_email[:id] = @last_user_email_id += 1
-    user_email[:user_id] = user_id
     user_email[:primary] = true
     user_email[:created_at] ||= NOW
     user_email[:updated_at] ||= user_email[:created_at]
@@ -581,10 +1017,7 @@ end
 
   def process_user_stat(user_stat)
-    user_id = @users[user_stat[:imported_user_id].to_i]
-    return { skip: true } if @pre_existing_user_ids.include?(user_id)
-
-    user_stat[:user_id] = user_id
+    user_stat[:user_id] = user_id_from_imported_id(user_stat[:imported_user_id])
     user_stat[:topics_entered] ||= 0
     user_stat[:time_read] ||= 0
     user_stat[:days_visited] ||= 0
@@ -599,21 +1032,62 @@
     user_stat
   end
 
-  def process_user_profile(user_profile)
-    return { skip: true } if @pre_existing_user_ids.include?(user_profile[:user_id])
+  def process_user_history(history)
+    history[:created_at] ||= NOW
+    history[:updated_at] ||= NOW
+    history
+  end
 
+  def process_muted_user(muted_user)
+    muted_user[:created_at] ||= NOW
+    muted_user[:updated_at] ||= NOW
+    muted_user
+  end
+
+  def process_user_profile(user_profile)
     user_profile[:bio_raw] = (user_profile[:bio_raw].presence || "").scrub.strip.presence
     user_profile[:bio_cooked] = pre_cook(user_profile[:bio_raw]) if user_profile[:bio_raw].present?
     user_profile[:views] ||= 0
     user_profile
   end
 
-  def process_single_sign_on_record(sso_record)
-    user_id = @users[sso_record[:imported_user_id].to_i]
-    return { skip: true } if @pre_existing_user_ids.include?(user_id)
+  USER_OPTION_DEFAULTS = {
+    mailing_list_mode: SiteSetting.default_email_mailing_list_mode,
+    mailing_list_mode_frequency: SiteSetting.default_email_mailing_list_mode_frequency,
+    email_level: SiteSetting.default_email_level,
+    email_messages_level: SiteSetting.default_email_messages_level,
+    email_previous_replies: SiteSetting.default_email_previous_replies,
+    email_in_reply_to: SiteSetting.default_email_in_reply_to,
+    email_digests: SiteSetting.default_email_digest_frequency.to_i > 0,
+    digest_after_minutes: SiteSetting.default_email_digest_frequency,
+    include_tl0_in_digests: SiteSetting.default_include_tl0_in_digests,
+    automatically_unpin_topics: SiteSetting.default_topics_automatic_unpin,
+    enable_quoting: SiteSetting.default_other_enable_quoting,
+    external_links_in_new_tab: SiteSetting.default_other_external_links_in_new_tab,
+    dynamic_favicon: SiteSetting.default_other_dynamic_favicon,
+    new_topic_duration_minutes: SiteSetting.default_other_new_topic_duration_minutes,
+    auto_track_topics_after_msecs: SiteSetting.default_other_auto_track_topics_after_msecs,
+    notification_level_when_replying: SiteSetting.default_other_notification_level_when_replying,
+    like_notification_frequency: SiteSetting.default_other_like_notification_frequency,
+    skip_new_user_tips: SiteSetting.default_other_skip_new_user_tips,
+    hide_profile_and_presence: SiteSetting.default_hide_profile_and_presence,
+    sidebar_link_to_filtered_list: SiteSetting.default_sidebar_link_to_filtered_list,
+    sidebar_show_count_of_new_items: SiteSetting.default_sidebar_show_count_of_new_items,
+  }
 
+  def process_user_option(user_option)
+    USER_OPTION_DEFAULTS.each { |key, value| user_option[key] = value if user_option[key].nil?
} + user_option + end + + def process_user_follower(user_follower) + user_follower[:created_at] ||= NOW + user_follower[:updated_at] ||= NOW + user_follower + end + + def process_single_sign_on_record(sso_record) sso_record[:id] = @last_sso_record_id += 1 - sso_record[:user_id] = user_id sso_record[:last_payload] ||= "" sso_record[:created_at] = NOW sso_record[:updated_at] = NOW @@ -627,20 +1101,36 @@ class BulkImport::Base end def process_category(category) - if category[:existing_id].present? - @categories[category[:imported_id].to_i] = category[:existing_id] + if (existing_category_id = category[:existing_id]).present? + if existing_category_id.is_a?(String) + existing_category_id = SiteSetting.get(category[:existing_id]) + end + + @categories[category[:imported_id].to_i] = existing_category_id category[:skip] = true return category end category[:id] ||= @last_category_id += 1 @categories[category[:imported_id].to_i] ||= category[:id] - category[:name] = category[:name][0...50].scrub.strip - # TODO: unique name - category[:name_lower] = category[:name].downcase - category[:slug] ||= Slug.ascii_generator(category[:name_lower]) + + next_number = 1 + original_name = name = category[:name][0...50].scrub.strip + + while @category_names.include?("#{category[:parent_category_id]}-#{name.downcase}") + name = "#{original_name[0...50 - next_number.to_s.length]}#{next_number}" + next_number += 1 + end + + @category_names << "#{category[:parent_category_id]}-#{name.downcase}" + name_lower = name.downcase + + category[:name] = name + category[:name_lower] = name_lower + category[:slug] ||= Slug.ascii_generator(name_lower) category[:description] = (category[:description] || "").scrub.strip.presence category[:user_id] ||= Discourse::SYSTEM_USER_ID + category[:read_restricted] = false if category[:read_restricted].nil? 
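# Worked example of the renaming loop above: two subcategories named "News"
# under the same parent (Discourse id 5) produce the keys "5-news" and then
# "5-news1", so the second category is created as "News1" ("News2", and so on).
# The numeric suffix is spliced into the 50-character name budget, e.g.:
#
#   original_name = "A" * 50
#   original_name[0...50 - "12".length] + "12"  # => 48 "A"s followed by "12", still 50 chars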
category[:created_at] ||= NOW category[:updated_at] ||= category[:created_at] @@ -654,6 +1144,25 @@ class BulkImport::Base category end + def process_category_custom_field(field) + field[:created_at] ||= NOW + field[:updated_at] ||= NOW + field + end + + def process_category_group(category_group) + category_group[:id] = @last_category_group_id += 1 + category_group[:created_at] = NOW + category_group[:updated_at] = NOW + category_group + end + + def process_category_tag_group(category_tag_group) + category_tag_group[:created_at] = NOW + category_tag_group[:updated_at] = NOW + category_tag_group + end + def process_topic(topic) @topics[topic[:imported_id].to_i] = topic[:id] = @last_topic_id += 1 topic[:archetype] ||= Archetype.default @@ -692,12 +1201,26 @@ class BulkImport::Base end post[:raw] = normalize_text(post[:raw]) post[:like_count] ||= 0 + post[:score] ||= 0 post[:cooked] = pre_cook post[:raw] post[:hidden] ||= false post[:word_count] = post[:raw].scan(/[[:word:]]+/).size post[:created_at] ||= NOW post[:last_version_at] = post[:created_at] post[:updated_at] ||= post[:created_at] + + if post[:raw].bytes.include?(0) + STDERR.puts "Skipping post with original ID #{post[:imported_id]} because `raw` contains null bytes" + post[:skip] = true + end + + post[:reply_to_post_number] = nil if post[:reply_to_post_number] == 1 + + if post[:cooked].bytes.include?(0) + STDERR.puts "Skipping post with original ID #{post[:imported_id]} because `cooked` contains null bytes" + post[:skip] = true + end + post end @@ -722,6 +1245,61 @@ class BulkImport::Base topic_tag end + def process_topic_user(topic_user) + topic_user + end + + def process_tag_user(tag_user) + tag_user[:created_at] = NOW + tag_user[:updated_at] = NOW + tag_user + end + + def process_upload(upload) + if (existing_upload_id = upload_id_from_sha1(upload[:sha1])) + @imported_records[upload[:original_id]] = existing_upload_id + @uploads_mapping[upload[:original_id]] = existing_upload_id + return { skip: true } + end + + upload[:id] = @last_upload_id += 1 + upload[:user_id] ||= Discourse::SYSTEM_USER_ID + upload[:created_at] ||= NOW + upload[:updated_at] ||= NOW + + @imported_records[upload[:original_id]] = upload[:id] + @uploads_mapping[upload[:original_id]] = upload[:id] + @uploads_by_sha1[upload[:sha1]] = upload[:id] + @upload_urls_by_id[upload[:id]] = upload[:url] + + upload + end + + def process_upload_reference(upload_reference) + upload_reference[:created_at] ||= NOW + upload_reference[:updated_at] ||= NOW + upload_reference + end + + def process_optimized_image(optimized_image) + optimized_image[:user_id] ||= Discourse::SYSTEM_USER_ID + optimized_image[:created_at] ||= NOW + optimized_image[:updated_at] ||= NOW + optimized_image + end + + def process_post_voting_vote(vote) + vote[:created_at] ||= NOW + vote + end + + def process_user_avatar(avatar) + avatar[:id] = @last_user_avatar_id += 1 + avatar[:created_at] ||= NOW + avatar[:updated_at] ||= NOW + avatar + end + def process_raw(original_raw) raw = original_raw.dup # fix whitespaces @@ -870,30 +1448,146 @@ class BulkImport::Base raw end - def create_records(rows, name, columns) + def process_user_custom_field(field) + field[:created_at] ||= NOW + field[:updated_at] ||= NOW + field + end + + def process_post_custom_field(field) + field[:created_at] ||= NOW + field[:updated_at] ||= NOW + field + end + + def process_topic_custom_field(field) + field[:created_at] ||= NOW + field[:updated_at] ||= NOW + field + end + + def process_user_action(user_action) + user_action[:created_at] 
||= NOW + user_action[:updated_at] ||= NOW + user_action + end + + def process_badge(badge) + badge[:id] = @last_badge_id += 1 + badge[:created_at] ||= NOW + badge[:updated_at] ||= NOW + badge[:multiple_grant] = false if badge[:multiple_grant].nil? + + @imported_records[badge[:original_id].to_s] = badge[:id] + @badge_mapping[badge[:original_id].to_s] = badge[:id] + + badge + end + + def process_user_badge(user_badge) + user_badge[:granted_at] ||= NOW + user_badge[:granted_by_id] ||= Discourse::SYSTEM_USER_ID + user_badge[:created_at] ||= user_badge[:granted_at] + user_badge + end + + def process_gamification_score_event(score_event) + score_event[:created_at] ||= NOW + score_event[:updated_at] ||= NOW + score_event + end + + def process_discourse_post_event_events(post_event) + post_event + end + + def process_discourse_calendar_post_event_dates(post_event_date) + post_event_date[:created_at] ||= NOW + post_event_date[:updated_at] ||= NOW + post_event_date + end + + def process_poll(poll) + poll[:id] = @last_poll_id += 1 + poll[:created_at] ||= NOW + poll[:updated_at] ||= NOW + + @imported_records[poll[:original_id].to_s] = poll[:id] + @poll_mapping[poll[:original_id].to_s] = poll[:id] + + poll + end + + def process_poll_option(poll_option) + poll_option[:id] = id = @last_poll_option_id += 1 + poll_option[:created_at] ||= NOW + poll_option[:updated_at] ||= NOW + poll_option[:anonymous_votes] ||= nil + + poll_option[:digest] = Digest::MD5.hexdigest([poll_option[:html]].to_json) + + poll_option[:original_ids] + .map(&:to_s) + .each do |original_id| + @imported_records[original_id] = id + @poll_option_mapping[original_id] = id + end + + poll_option + end + + def process_poll_vote(poll_vote) + poll_vote[:created_at] ||= NOW + poll_vote[:updated_at] ||= NOW + poll_vote + end + + def process_plugin_store_row(plugin_store_row) + plugin_store_row + end + + def process_permalink(permalink) + permalink[:created_at] ||= NOW + permalink[:updated_at] ||= NOW + permalink + end + + def create_records(all_rows, name, columns, &block) start = Time.now imported_ids = [] process_method_name = "process_#{name}" - sql = "COPY #{name.pluralize} (#{columns.map { |c| "\"#{c}\"" }.join(",")}) FROM STDIN" + rows_created = 0 - @raw_connection.copy_data(sql, @encoder) do - rows.each do |row| - begin - next unless mapped = yield(row) - processed = send(process_method_name, mapped) - imported_ids << mapped[:imported_id] unless mapped[:imported_id].nil? - imported_ids |= mapped[:imported_ids] unless mapped[:imported_ids].nil? - @raw_connection.put_copy_data columns.map { |c| processed[c] } unless processed[:skip] - rows_created += 1 - if rows_created % 100 == 0 - print "\r%7d - %6d/sec" % [rows_created, rows_created.to_f / (Time.now - start)] + all_rows.each_slice(1_000) do |rows| + sql = "COPY #{name.pluralize} (#{columns.map { |c| "\"#{c}\"" }.join(",")}) FROM STDIN" + + begin + @raw_connection.copy_data(sql, @encoder) do + rows.each do |row| + begin + if (mapped = yield(row)) + processed = send(process_method_name, mapped) + imported_ids << mapped[:imported_id] unless mapped[:imported_id].nil? + imported_ids |= mapped[:imported_ids] unless mapped[:imported_ids].nil? 
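# The slices of 1,000 rows above bound the blast radius of a bad row: a failed
# COPY aborts only its own batch, and the outer rescue (below) prints the first
# row of the failing batch before re-raising. A standalone sketch of the same
# streaming-COPY pattern with the pg gem (`conn` is an assumed PG::Connection):
#
#   encoder = PG::TextEncoder::CopyRow.new
#   conn.copy_data("COPY users (id, username) FROM STDIN", encoder) do
#     conn.put_copy_data [1, "alice"]
#   end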
+ unless processed[:skip] + @raw_connection.put_copy_data columns.map { |c| processed[c] } + end + end + rows_created += 1 + if rows_created % 100 == 0 + print "\r%7d - %6d/sec" % [rows_created, rows_created.to_f / (Time.now - start)] + end + rescue => e + puts "\n" + puts "ERROR: #{e.message}" + puts e.backtrace.join("\n") + end end - rescue => e - puts "\n" - puts "ERROR: #{e.message}" - puts e.backtrace.join("\n") end + rescue => e + puts "First Row: #{rows.first.inspect}" + raise e end end @@ -902,14 +1596,23 @@ class BulkImport::Base end id_mapping_method_name = "#{name}_id_from_imported_id".freeze - return unless respond_to?(id_mapping_method_name) + return true unless respond_to?(id_mapping_method_name) create_custom_fields(name, "id", imported_ids) do |imported_id| { record_id: send(id_mapping_method_name, imported_id), value: imported_id } end + true rescue => e # FIXME: errors catched here stop the rest of the COPY puts e.message puts e.backtrace.join("\n") + false + end + + def create_records_with_mapping(all_rows, name, columns, &block) + @imported_records = {} + if create_records(all_rows, name, columns, &block) + store_mappings(MAPPING_TYPES[name.to_sym], @imported_records) + end end def create_custom_fields(table, name, rows) @@ -924,6 +1627,17 @@ class BulkImport::Base end end + def store_mappings(type, rows) + return if rows.empty? + + sql = "COPY migration_mappings (original_id, type, discourse_id) FROM STDIN" + @raw_connection.copy_data(sql, @encoder) do + rows.each do |original_id, discourse_id| + @raw_connection.put_copy_data [original_id, type, discourse_id] + end + end + end + def create_upload(user_id, path, source_filename) @uploader.create_upload(user_id, path, source_filename) end @@ -935,6 +1649,7 @@ class BulkImport::Base def fix_name(name) name.scrub! if name && !name.valid_encoding? return if name.blank? + # TODO Support Unicode if allowed in site settings and try to reuse logic from UserNameSuggester if possible name = ActiveSupport::Inflector.transliterate(name) name.gsub!(/[^\w.-]+/, "_") name.gsub!(/^\W+/, "") @@ -954,16 +1669,16 @@ class BulkImport::Base end def pre_cook(raw) - cooked = raw - + # TODO Check if this is still up-to-date # Convert YouTube URLs to lazyYT DOMs before being transformed into links - cooked.gsub!(%r{\nhttps\://www.youtube.com/watch\?v=(\w+)\n}) do - video_id = $1 - result = <<-HTML + cooked = + raw.gsub(%r{\nhttps\://www.youtube.com/watch\?v=(\w+)\n}) do + video_id = $1 + result = <<-HTML
- HTML - result.strip - end + HTML + result.strip + end cooked = @markdown.render(cooked).scrub.strip @@ -999,7 +1714,49 @@ class BulkImport::Base end end + # Attachments + cooked.gsub!(%r{(.*?)\|attachment}) do + upload_base62, filename = $1, $2 + %{#{filename}} + end + + # Images + cooked.gsub!(%r{}) do + short_url, alt, width, height = $1, $2, $3, $4 + upload_sha1 = Upload.sha1_from_short_url(short_url) + upload_base62 = Upload.base62_sha1(upload_sha1) + upload_id = @uploads_by_sha1[upload_sha1] + upload_url = @upload_urls_by_id[upload_id] + cdn_url = Discourse.store.cdn_url(upload_url) + + attributes = +%{loading="lazy"} + attributes << %{ alt="#{alt}"} if alt.present? + attributes << %{ width="#{width}"} if width.present? + attributes << %{ height="#{height}"} if height.present? + if width.present? && height.present? + attributes << %{ style="aspect-ratio: #{width} / #{height};"} + end + + %{@#{name}| + elsif Group.where("LOWER(name) = ?", normalized_name).exists? + %|@#{name}| + else + "@#{name}" + end + end + + # TODO Check if scrub or strip is inserting \x00 which is causing Postgres COPY to fail cooked.scrub.strip + cooked.gsub!(/\x00/, "") + cooked end def user_avatar(user) diff --git a/script/bulk_import/generic_bulk.rb b/script/bulk_import/generic_bulk.rb index 08ef615e73c..f3ad6e6227f 100644 --- a/script/bulk_import/generic_bulk.rb +++ b/script/bulk_import/generic_bulk.rb @@ -1,16 +1,29 @@ # frozen_string_literal: true -require_relative "base" -require "sqlite3" -require "json" +begin + require_relative "base" + require "sqlite3" + require "json" +rescue LoadError + STDERR.puts "", + "ERROR: Failed to load required gems.", + "", + "You need to enable the `generic_import` group in your Gemfile.", + "Execute the following command to do so:", + "", + "\tbundle config set --local with generic_import && bundle install", + "" + exit 1 +end class BulkImport::Generic < BulkImport::Base AVATAR_DIRECTORY = ENV["AVATAR_DIRECTORY"] UPLOAD_DIRECTORY = ENV["UPLOAD_DIRECTORY"] - def initialize(db_path) + def initialize(db_path, uploads_db_path = nil) super() - @db = create_connection(db_path) + @source_db = create_connection(db_path) + @uploads_db = create_connection(uploads_db_path) if uploads_db_path end def start @@ -25,41 +38,165 @@ class BulkImport::Generic < BulkImport::Base end def execute - import_categories + enable_required_plugins + import_site_settings + + import_uploads + + # needs to happen before users, because keeping group names is more important than usernames + import_groups + import_users import_user_emails + import_user_profiles + import_user_options + import_user_fields + import_user_field_values import_single_sign_on_records + import_muted_users + import_user_histories + import_user_notes + import_user_note_counts + import_user_followers + + import_user_avatars + update_uploaded_avatar_id + + import_group_members + + import_tag_groups + import_tags + import_tag_users + + import_categories + import_category_custom_fields + import_category_tag_groups + import_category_permissions + import_topics import_posts + import_post_custom_fields + + import_polls + import_poll_options + import_poll_votes + + import_topic_tags import_topic_allowed_users + import_likes + import_votes + import_answers + import_gamification_scores + import_post_events + + import_badge_groupings + import_badges + import_user_badges + + import_upload_references + import_optimized_images + + import_topic_users + update_topic_users + import_user_stats - import_tags + + import_permalink_normalizations + 
import_permalinks + end + + def execute_after + import_category_about_topics + + @source_db.close + @uploads_db.close if @uploads_db + end + + def enable_required_plugins + puts "", "Enabling required plugins..." + + required_plugin_names = @source_db.get_first_value(<<~SQL)&.then(&JSON.method(:parse)) + SELECT value + FROM config + WHERE name = 'enable_required_plugins' + SQL + + return if required_plugin_names.blank? + + plugins_by_name = Discourse.plugins_by_name + + required_plugin_names.each do |plugin_name| + if (plugin = plugins_by_name[plugin_name]) + if !plugin.enabled? && plugin.configurable? + SiteSetting.set(plugin.enabled_site_setting, true) + end + puts " #{plugin_name} plugin enabled" + else + puts " ERROR: The #{plugin_name} plugin is required, but not installed." + exit 1 + end + end + end + + def import_site_settings + puts "", "Importing site settings..." + + rows = query(<<~SQL) + SELECT name, value, action + FROM site_settings + ORDER BY ROWID + SQL + + all_settings = SiteSetting.all_settings + + rows.each do |row| + name = row["name"].to_sym + setting = all_settings.find { |s| s[:setting] == name } + next unless setting + + case row["action"] + when "update" + SiteSetting.set_and_log(name, row["value"]) + when "append" + raise "Cannot append to #{name} setting" if setting[:type] != "list" + items = (SiteSetting.get(name) || "").split("|") + items << row["value"] unless items.include?(row["value"]) + SiteSetting.set_and_log(name, items.join("|")) + end + end + + rows.close end def import_categories - puts "Importing categories..." + puts "", "Importing categories..." categories = query(<<~SQL) - WITH RECURSIVE tree(id, parent_category_id, name, description, color, text_color, read_restricted, slug, - old_relative_url, existing_id, level, rowid) AS ( - SELECT c.id, c.parent_category_id, c.name, c.description, c.color, c.text_color, c.read_restricted, c.slug, - c.old_relative_url, c.existing_id, 0 AS level, c.ROWID - FROM categories c - WHERE c.parent_category_id IS NULL - UNION - SELECT c.id, c.parent_category_id, c.name, c.description, c.color, c.text_color, c.read_restricted, c.slug, - c.old_relative_url, c.existing_id, tree.level + 1 AS level, c.ROWID - FROM categories c, - tree - WHERE c.parent_category_id = tree.id - ) - SELECT * - FROM tree - ORDER BY level, rowid + WITH + RECURSIVE + tree AS ( + SELECT c.id, c.parent_category_id, c.name, c.description, c.color, c.text_color, c.read_restricted, + c.slug, c.existing_id, c.position, c.logo_upload_id, 0 AS level + FROM categories c + WHERE c.parent_category_id IS NULL + UNION ALL + SELECT c.id, c.parent_category_id, c.name, c.description, c.color, c.text_color, c.read_restricted, + c.slug, c.existing_id, c.position, c.logo_upload_id, tree.level + 1 AS level + FROM categories c, + tree + WHERE c.parent_category_id = tree.id + ) + SELECT id, parent_category_id, name, description, color, text_color, read_restricted, slug, existing_id, logo_upload_id, + COALESCE(position, + ROW_NUMBER() OVER (PARTITION BY parent_category_id ORDER BY parent_category_id NULLS FIRST, name)) AS position + FROM tree + ORDER BY level, position, id SQL create_categories(categories) do |row| + next if category_id_from_imported_id(row["id"]).present? + { imported_id: row["id"], existing_id: row["existing_id"], @@ -68,20 +205,176 @@ class BulkImport::Generic < BulkImport::Base parent_category_id: row["parent_category_id"] ? 
category_id_from_imported_id(row["parent_category_id"]) : nil, slug: row["slug"], + read_restricted: row["read_restricted"], + uploaded_logo_id: + row["logo_upload_id"] ? upload_id_from_original_id(row["logo_upload_id"]) : nil, } end + + categories.close end - def import_users - puts "Importing users..." + def import_category_about_topics + puts "", %|Creating "About..." topics for categories...| + start_time = Time.now + Category.ensure_consistency! + Site.clear_cache - users = query(<<~SQL) - SELECT ROWID, * - FROM users + categories = query(<<~SQL) + SELECT id, about_topic_title + FROM categories + WHERE about_topic_title IS NOT NULL + ORDER BY id + SQL + + categories.each do |row| + if (about_topic_title = row["about_topic_title"]).present? + if (category_id = category_id_from_imported_id(row["id"])) + topic = Category.find(category_id).topic + topic.title = about_topic_title + topic.save!(validate: false) + end + end + end + + categories.close + + puts " Creating took #{(Time.now - start_time).to_i} seconds." + end + + def import_category_custom_fields + puts "", "Importing category custom fields..." + + category_custom_fields = query(<<~SQL) + SELECT * + FROM category_custom_fields + ORDER BY category_id, name + SQL + + field_names = + query("SELECT DISTINCT name FROM category_custom_fields") { _1.map { |row| row["name"] } } + existing_category_custom_fields = + CategoryCustomField.where(name: field_names).pluck(:category_id, :name).to_set + + create_category_custom_fields(category_custom_fields) do |row| + category_id = category_id_from_imported_id(row["category_id"]) + next if category_id.nil? + + next if existing_category_custom_fields.include?([category_id, row["name"]]) + + { category_id: category_id, name: row["name"], value: row["value"] } + end + + category_custom_fields.close + end + + def import_category_tag_groups + puts "", "Importing category tag groups..." + + category_tag_groups = query(<<~SQL) + SELECT c.id AS category_id, t.value AS tag_group_id + FROM categories c, + JSON_EACH(c.tag_group_ids) t + ORDER BY category_id, tag_group_id + SQL + + existing_category_tag_groups = CategoryTagGroup.pluck(:category_id, :tag_group_id).to_set + + create_category_tag_groups(category_tag_groups) do |row| + category_id = category_id_from_imported_id(row["category_id"]) + tag_group_id = @tag_group_mapping[row["tag_group_id"]] + + next unless category_id && tag_group_id + next unless existing_category_tag_groups.add?([category_id, tag_group_id]) + + { category_id: category_id, tag_group_id: tag_group_id } + end + + category_tag_groups.close + end + + def import_category_permissions + puts "", "Importing category permissions..." + + permissions = query(<<~SQL) + SELECT c.id AS category_id, p.value -> 'group_id' AS group_id, p.value -> 'permission_type' AS permission_type + FROM categories c, + JSON_EACH(c.permissions) p + SQL + + existing_category_group_ids = CategoryGroup.pluck(:category_id, :group_id).to_set + + create_category_groups(permissions) do |row| + category_id = category_id_from_imported_id(row["category_id"]) + group_id = group_id_from_imported_id(row["group_id"]) + next if existing_category_group_ids.include?([category_id, group_id]) + + { category_id: category_id, group_id: group_id, permission_type: row["permission_type"] } + end + + permissions.close + end + + def import_groups + puts "", "Importing groups..." 
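# Groups are imported before users (see the comment in #execute) because both
# share one registry of lower-cased names: whichever name is claimed first
# wins, and process_group/process_user append a numeric suffix via
# String#next! until the name is free. Plain-Ruby behaviour of that suffixing,
# for reference:
#
#   "sales_1".next!   # => "sales_2"
#   "sales_19".next!  # => "sales_20"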
+ + groups = query(<<~SQL) + SELECT * + FROM groups + ORDER BY id + SQL + + create_groups(groups) do |row| + next if group_id_from_imported_id(row["id"]).present? + + { + imported_id: row["id"], + name: row["name"], + full_name: row["full_name"], + visibility_level: row["visibility_level"], + members_visibility_level: row["members_visibility_level"], + mentionable_level: row["mentionable_level"], + messageable_level: row["messageable_level"], + } + end + + groups.close + end + + def import_group_members + puts "", "Importing group members..." + + group_members = query(<<~SQL) + SELECT * + FROM group_members ORDER BY ROWID SQL + existing_group_user_ids = GroupUser.pluck(:group_id, :user_id).to_set + + create_group_users(group_members) do |row| + group_id = group_id_from_imported_id(row["group_id"]) + user_id = user_id_from_imported_id(row["user_id"]) + next if existing_group_user_ids.include?([group_id, user_id]) + + { group_id: group_id, user_id: user_id } + end + + group_members.close + end + + def import_users + puts "", "Importing users..." + + users = query(<<~SQL) + SELECT * + FROM users + ORDER BY id + SQL + create_users(users) do |row| + next if user_id_from_imported_id(row["id"]).present? + sso_record = JSON.parse(row["sso_record"]) if row["sso_record"].present? if row["suspension"].present? @@ -90,97 +383,249 @@ class BulkImport::Generic < BulkImport::Base suspended_till = suspension["suspended_till"] end + if row["anonymized"] == 1 + while true + anon_suffix = (SecureRandom.random_number * 100_000_000).to_i + break if !@anonymized_user_suffixes.include?(anon_suffix) + end + + row["username"] = "anon_#{anon_suffix}" + row["email"] = "#{row["username"]}#{UserAnonymizer::EMAIL_SUFFIX}" + row["name"] = nil + row["registration_ip_address"] = nil + + @anonymized_user_suffixes << anon_suffix + end + { imported_id: row["id"], username: row["username"], + original_username: row["original_username"], name: row["name"], email: row["email"], - external_id: sso_record&.fetch("external_id"), + external_id: sso_record&.fetch("external_id", nil), created_at: to_datetime(row["created_at"]), + last_seen_at: to_datetime(row["last_seen_at"]), admin: row["admin"], moderator: row["moderator"], suspended_at: suspended_at, suspended_till: suspended_till, + registration_ip_address: row["registration_ip_address"], } end + + users.close end def import_user_emails puts "", "Importing user emails..." + existing_user_ids = UserEmail.pluck(:user_id).to_set + users = query(<<~SQL) - SELECT ROWID, id, email, created_at + SELECT id, email, created_at FROM users - ORDER BY ROWID + ORDER BY id SQL create_user_emails(users) do |row| + user_id = user_id_from_imported_id(row["id"]) + next if user_id && existing_user_ids.include?(user_id) + + { user_id: user_id, email: row["email"], created_at: to_datetime(row["created_at"]) } + end + + users.close + end + + def import_user_profiles + puts "", "Importing user profiles..." + + users = query(<<~SQL) + SELECT id, bio + FROM users + WHERE bio IS NOT NULL + ORDER BY id + SQL + + existing_user_ids = UserProfile.pluck(:user_id).to_set + + create_user_profiles(users) do |row| + user_id = user_id_from_imported_id(row["id"]) + next if user_id && existing_user_ids.include?(user_id) + + { user_id: user_id, bio_raw: row["bio"] } + end + + users.close + end + + def import_user_options + puts "", "Importing user options..." 
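# Only the few columns selected below come from the source database; the
# remaining UserOption columns are back-filled by process_user_option in
# base.rb from USER_OPTION_DEFAULTS, which mirrors the site's
# `SiteSetting.default_*` values. A sketch of that merge semantics:
#
#   row = { user_id: 1, timezone: "UTC", email_level: nil }
#   USER_OPTION_DEFAULTS.each { |key, value| row[key] = value if row[key].nil? }
#   row[:timezone]     # => "UTC" (kept)
#   row[:email_level]  # => SiteSetting.default_email_level (filled in)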
+ + users = query(<<~SQL) + SELECT id, timezone, email_level, email_messages_level, email_digests + FROM users + WHERE timezone IS NOT NULL + ORDER BY id + SQL + + existing_user_ids = UserOption.pluck(:user_id).to_set + + create_user_options(users) do |row| + user_id = user_id_from_imported_id(row["id"]) + next if user_id && existing_user_ids.include?(user_id) + { - # FIXME: using both "imported_id" and "imported_user_id" and should be replaced by just "imported_id" - imported_id: row["id"], - imported_user_id: row["id"], - email: row["email"], - created_at: to_datetime(row["created_at"]), + user_id: user_id, + timezone: row["timezone"], + email_level: row["email_level"], + email_messages_level: row["email_messages_level"], + email_digests: row["email_digests"], } end + + users.close + end + + def import_user_fields + puts "", "Importing user fields..." + + user_fields = query(<<~SQL) + SELECT * + FROM user_fields + ORDER BY ROWID + SQL + + existing_user_field_names = UserField.pluck(:name).to_set + + user_fields.each do |row| + next if existing_user_field_names.include?(row["name"]) + + options = row.delete("options") + field = UserField.create!(row) + + if options.present? + JSON.parse(options).each { |option| field.user_field_options.create!(value: option) } + end + end + + user_fields.close + end + + def import_user_field_values + puts "", "Importing user field values..." + + discourse_field_mapping = UserField.pluck(:name, :id).to_h + + user_fields = query("SELECT id, name FROM user_fields") + + field_id_mapping = + user_fields + .map do |row| + discourse_field_id = discourse_field_mapping[row["name"]] + field_name = "#{User::USER_FIELD_PREFIX}#{discourse_field_id}" + [row["id"], field_name] + end + .to_h + + user_fields.close + + # TODO make restriction to non-anonymized users configurable + values = query(<<~SQL) + SELECT v.* + FROM user_field_values v + JOIN users u ON v.user_id = u.id + WHERE u.anonymized = FALSE + SQL + + existing_user_fields = + UserCustomField.where("name LIKE '#{User::USER_FIELD_PREFIX}%'").pluck(:user_id, :name).to_set + + create_user_custom_fields(values) do |row| + user_id = user_id_from_imported_id(row["user_id"]) + field_name = field_id_mapping[row["field_id"]] + next if user_id && field_name && existing_user_fields.include?([user_id, field_name]) + + { user_id: user_id, name: field_name, value: row["value"] } + end + + values.close end def import_single_sign_on_records puts "", "Importing SSO records..." users = query(<<~SQL) - SELECT ROWID, id, sso_record + SELECT id, sso_record FROM users WHERE sso_record IS NOT NULL - ORDER BY ROWID + ORDER BY id SQL + existing_user_ids = SingleSignOnRecord.pluck(:user_id).to_set + create_single_sign_on_records(users) do |row| + user_id = user_id_from_imported_id(row["id"]) + next if user_id && existing_user_ids.include?(user_id) + sso_record = JSON.parse(row["sso_record"], symbolize_names: true) - # FIXME: using both "imported_id" and "imported_user_id" and should be replaced by just "imported_id" - sso_record[:imported_id] = row["id"] - sso_record[:imported_user_id] = row["id"] + sso_record[:user_id] = user_id sso_record end + + users.close end def import_topics - puts "Importing topics..." + puts "", "Importing topics..." topics = query(<<~SQL) - SELECT ROWID, * + SELECT * FROM topics - ORDER BY ROWID + ORDER BY id SQL create_topics(topics) do |row| + unless row["category_id"] && (category_id = category_id_from_imported_id(row["category_id"])) + next + end + + next if topic_id_from_imported_id(row["id"]).present? 
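# The two guards above keep this step idempotent: topics whose category was
# never imported are skipped, and re-running the script after a crash will not
# duplicate topics that already made it in, because topic_id_from_imported_id
# is backed by the imported_id custom fields written during create_records:
#
#   topic_id_from_imported_id(42)  # => nil on the first run, the new topics.id afterwards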
+ { archetype: row["private_message"] ? Archetype.private_message : Archetype.default, imported_id: row["id"], title: row["title"], user_id: user_id_from_imported_id(row["user_id"]), created_at: to_datetime(row["created_at"]), - category_id: category_id_from_imported_id(row["category_id"]), + category_id: category_id, closed: to_boolean(row["closed"]), views: row["views"], + subtype: row["subtype"], } end + + topics.close end def import_topic_allowed_users - puts "Importing topic_allowed_users..." + # FIXME: This is not working correctly because it imports only the first user from the list! + # Groups are ignored completely. And there is no check for existing records. + + puts "", "Importing topic_allowed_users..." topics = query(<<~SQL) - SELECT ROWID, * + SELECT * FROM topics WHERE private_message IS NOT NULL - ORDER BY ROWID + ORDER BY id SQL added = 0 create_topic_allowed_users(topics) do |row| - next unless topic_id = topic_id_from_imported_id(row["id"]) + next unless (topic_id = topic_id_from_imported_id(row["id"])) imported_user_id = JSON.parse(row["private_message"])["user_ids"].first user_id = user_id_from_imported_id(imported_user_id) added += 1 @@ -191,164 +636,1553 @@ class BulkImport::Generic < BulkImport::Base } end - puts "", "Added #{added} topic_allowed_users records." + topics.close + + puts " Added #{added} topic_allowed_users records." end def import_posts - puts "Importing posts..." + puts "", "Importing posts..." posts = query(<<~SQL) - SELECT ROWID, * + SELECT * FROM posts - ORDER BY topic_id, post_number + ORDER BY topic_id, id SQL + group_names = Group.pluck(:id, :name).to_h + # TODO: Investigate feasibility of loading all users on large sites + user_names = User.pluck(:id, :username).to_h + create_posts(posts) do |row| next if row["raw"].blank? - next unless topic_id = topic_id_from_imported_id(row["topic_id"]) + next unless (topic_id = topic_id_from_imported_id(row["topic_id"])) + next if post_id_from_imported_id(row["id"]).present? + + # TODO Ensure that we calculate the `like_count` if the column is empty, but the DB contains likes. + # Otherwise #import_user_stats will not be able to calculate the correct `likes_received` value. { imported_id: row["id"], topic_id: topic_id, user_id: user_id_from_imported_id(row["user_id"]), created_at: to_datetime(row["created_at"]), - raw: row["raw"], + raw: post_raw(row, group_names, user_names), like_count: row["like_count"], + reply_to_post_number: + row["reply_to_post_id"] ? 
post_number_from_imported_id(row["reply_to_post_id"]) : nil,
+      }
+    end
+
+    posts.close
+  end
+
+  def post_raw(row, group_names, user_names)
+    raw = row["raw"]
+    placeholders = row["placeholders"]&.then { |json| JSON.parse(json) }
+
+    if (polls = placeholders&.fetch("polls", nil))
+      poll_mapping = polls.map { |poll| [poll["poll_id"], poll["placeholder"]] }.to_h
+
+      poll_details = query(<<~SQL, { post_id: row["id"] })
+        SELECT p.*, ROW_NUMBER() OVER (PARTITION BY p.post_id, p.name ORDER BY p.id) AS seq,
+               JSON_GROUP_ARRAY(DISTINCT TRIM(po.text)) AS options
+        FROM polls p
+             JOIN poll_options po ON p.id = po.poll_id
+        WHERE p.post_id = :post_id
+        ORDER BY p.id, po.position, po.id
+      SQL
+
+      poll_details.each do |poll|
+        if (placeholder = poll_mapping[poll["id"]])
+          raw.gsub!(placeholder, poll_bbcode(poll))
+        end
+      end
+
+      poll_details.close
+    end
+
+    if (mentions = placeholders&.fetch("mentions", nil))
+      mentions.each do |mention|
+        name =
+          if mention["type"] == "user"
+            user_names[user_id_from_imported_id(mention["id"])]
+          elsif mention["type"] == "group"
+            group_names[group_id_from_imported_id(mention["id"])]
+          end
+
+        puts "#{mention["type"]} not found -- #{mention["id"]}" unless name
+        raw.gsub!(mention["placeholder"], "@#{name}")
+      end
+    end
+
+    if (event = placeholders&.fetch("event", nil))
+      event_details = @source_db.get_first_row(<<~SQL, { event_id: event["event_id"] })
+        SELECT *
+        FROM events
+        WHERE id = :event_id
+      SQL
+
+      raw.gsub!(event["placeholder"], event_bbcode(event_details)) if event_details
+    end
+
+    if row["upload_ids"].present? && @uploads_db
+      upload_ids = JSON.parse(row["upload_ids"])
+      upload_ids_placeholders = (["?"] * upload_ids.size).join(",")
+
+      query(
+        "SELECT id, markdown FROM uploads WHERE id IN (#{upload_ids_placeholders})",
+        upload_ids,
+        db: @uploads_db,
+      ).tap do |result_set|
+        result_set.each { |upload| raw.gsub!("[upload|#{upload["id"]}]", upload["markdown"] || "") }
+        result_set.close
+      end
+    end
+
+    raw
+  end
+
+  def process_raw(original_raw)
+    original_raw
+  end
+
+  def poll_name(row)
+    name = +(row["name"] || "poll")
+    name << "-#{row["seq"]}" if row["seq"] > 1
+    name
+  end
+
+  def poll_bbcode(row)
+    return unless defined?(::Poll)
+
+    name = poll_name(row)
+    type = ::Poll.types.key(row["type"])
+    # Compare raw enum values for the boolean flags: `::Poll.types.key` returns
+    # the enum *name*, which would never equal the integer enum *value*.
+    regular_type = row["type"] == ::Poll.types[:regular]
+    number_type = row["type"] == ::Poll.types[:number]
+    result_visibility = ::Poll.results.key(row["results"])
+    min = row["min"]
+    max = row["max"]
+    step = row["step"]
+    chart_type = ::Poll.chart_types.key(row["chart_type"])
+    groups = row["groups"]
+    auto_close = to_datetime(row["close_at"])
+    title = row["title"]
+    options = JSON.parse(row["options"])
+
+    text = +"[poll"
+    text << " name=#{name}" if name != "poll"
+    text << " type=#{type}"
+    text << " results=#{result_visibility}"
+    text << " min=#{min}" if min && !regular_type
+    text << " max=#{max}" if max && !regular_type
+    text << " step=#{step}" if step && number_type
+    text << " public=true" if row["visibility"] == ::Poll.visibilities[:everyone]
+    text << " chartType=#{chart_type}" if chart_type.present? && !regular_type
+    text << " groups=#{groups.join(",")}" if groups.present?
+    text << " close=#{auto_close.utc.iso8601}" if auto_close
+    text << "]\n"
+    text << "# #{title}\n" if title.present?
+    text << options.map { |o| "* #{o}" }.join("\n") if options.present? && !number_type
+    text << "\n[/poll]\n"
+    text
+  end
+
+  def event_bbcode(event)
+    return unless defined?(::DiscoursePostEvent)
+
+    starts_at = to_datetime(event["starts_at"])
+    ends_at = to_datetime(event["ends_at"])
+    # `statuses` maps names to integer values; look up the name for the stored value.
+    status = ::DiscoursePostEvent::Event.statuses.key(event["status"])&.to_s
+    name =
+      if (name = event["name"].presence)
+        name.ljust(::DiscoursePostEvent::Event::MIN_NAME_LENGTH, ".").truncate(
+          ::DiscoursePostEvent::Event::MAX_NAME_LENGTH,
+        )
+      end
+    url = event["url"]
+    custom_fields = event["custom_fields"] ? JSON.parse(event["custom_fields"]) : nil
+
+    text = +"[event"
+    text << %{ start="#{starts_at.utc.strftime("%Y-%m-%d %H:%M")}"} if starts_at
+    text << %{ end="#{ends_at.utc.strftime("%Y-%m-%d %H:%M")}"} if ends_at
+    text << %{ timezone="UTC"}
+    text << %{ status="#{status}"} if status
+    text << %{ name="#{name}"} if name
+    text << %{ url="#{url}"} if url
+    custom_fields.each { |key, value| text << %{ #{key}="#{value}"} } if custom_fields
+    text << "]\n"
+    text << "[/event]\n"
+    text
+  end
+
+  def import_post_custom_fields
+    puts "", "Importing post custom fields..."
+
+    post_custom_fields = query(<<~SQL)
+      SELECT *
+      FROM post_custom_fields
+      ORDER BY post_id, name
+    SQL
+
+    field_names =
+      query("SELECT DISTINCT name FROM post_custom_fields") { _1.map { |row| row["name"] } }
+    existing_post_custom_fields =
+      PostCustomField.where(name: field_names).pluck(:post_id, :name).to_set
+
+    create_post_custom_fields(post_custom_fields) do |row|
+      post_id = post_id_from_imported_id(row["post_id"])
+      next if post_id.nil?
+
+      next if existing_post_custom_fields.include?([post_id, row["name"]])
+
+      { post_id: post_id, name: row["name"], value: row["value"] }
+    end
+
+    post_custom_fields.close
+  end
+
+  def import_polls
+    unless defined?(::Poll)
+      puts "", "Skipping polls, because the poll plugin is not installed."
+      return
+    end
+
+    puts "", "Importing polls..."
+
+    polls = query(<<~SQL)
+      SELECT *, ROW_NUMBER() OVER (PARTITION BY post_id, name ORDER BY id) AS seq
+      FROM polls
+      ORDER BY id
+    SQL
+
+    create_polls(polls) do |row|
+      next if poll_id_from_original_id(row["id"]).present?
+
+      post_id = post_id_from_imported_id(row["post_id"])
+      next unless post_id
+
+      {
+        original_id: row["id"],
+        post_id: post_id,
+        name: poll_name(row),
+        closed_at: to_datetime(row["closed_at"]),
+        type: row["type"],
+        status: row["status"],
+        results: row["results"],
+        visibility: row["visibility"],
+        min: row["min"],
+        max: row["max"],
+        step: row["step"],
+        anonymous_voters: row["anonymous_voters"],
+        created_at: to_datetime(row["created_at"]),
+        chart_type: row["chart_type"],
+        groups: row["groups"],
+        title: row["title"],
+      }
+    end
+
+    polls.close
+
+    puts "", "Importing polls into post custom fields..."
+
+    polls = query(<<~SQL)
+      SELECT post_id, MIN(created_at) AS created_at
+      FROM polls
+      GROUP BY post_id
+      ORDER BY post_id
+    SQL
+
+    field_name = DiscoursePoll::HAS_POLLS
+    value = "true"
+    existing_fields = PostCustomField.where(name: field_name).pluck(:post_id).to_set
+
+    create_post_custom_fields(polls) do |row|
+      next unless (post_id = post_id_from_imported_id(row["post_id"]))
+      next if existing_fields.include?(post_id)
+
+      {
+        post_id: post_id,
+        name: field_name,
+        value: value,
+        created_at: to_datetime(row["created_at"]),
+      }
+    end
+
+    polls.close
+  end
+
+  def import_poll_options
+    unless defined?(::Poll)
+      puts "", "Skipping poll options, because the poll plugin is not installed."
+      return
+    end
+
+    puts "", "Importing poll options..."
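+
+    # A worked example of the grouping below: if the source contains options
+    # "Yes " (id 1) and "Yes" (id 2) for the same poll, TRIM + GROUP BY collapse
+    # them into one Discourse option and GROUP_CONCAT keeps "1,2", so both
+    # source ids map to the single created option via original_ids.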
+    poll_options = query(<<~SQL)
+      SELECT poll_id, TRIM(text) AS text, MIN(created_at) AS created_at, GROUP_CONCAT(id) AS option_ids
+      FROM poll_options
+      GROUP BY 1, 2
+      ORDER BY poll_id, position, id
+    SQL
+
+    create_poll_options(poll_options) do |row|
+      poll_id = poll_id_from_original_id(row["poll_id"])
+      next unless poll_id
+
+      option_ids = row["option_ids"].split(",")
+      # Skip options that were already imported in a previous run. (A bare `next`
+      # inside `each` would only skip that iteration, not this row.)
+      next if option_ids.all? { |option_id| poll_option_id_from_original_id(option_id).present? }
+
+      {
+        original_ids: option_ids,
+        poll_id: poll_id,
+        html: row["text"],
+        created_at: to_datetime(row["created_at"]),
+      }
+    end
+
+    poll_options.close
+  end
+
+  def import_poll_votes
+    unless defined?(::Poll)
+      puts "", "Skipping poll votes, because the poll plugin is not installed."
+      return
+    end
+
+    puts "", "Importing poll votes..."
+
+    poll_votes = query(<<~SQL)
+      SELECT po.poll_id, pv.poll_option_id, pv.user_id, pv.created_at
+      FROM poll_votes pv
+           JOIN poll_options po ON pv.poll_option_id = po.id
+      ORDER BY pv.poll_option_id, pv.user_id
+    SQL
+
+    existing_poll_votes = PollVote.pluck(:poll_option_id, :user_id).to_set
+
+    create_poll_votes(poll_votes) do |row|
+      poll_id = poll_id_from_original_id(row["poll_id"])
+      poll_option_id = poll_option_id_from_original_id(row["poll_option_id"])
+      user_id = user_id_from_imported_id(row["user_id"])
+      next unless poll_id && poll_option_id && user_id
+
+      next unless existing_poll_votes.add?([poll_option_id, user_id])
+
+      {
+        poll_id: poll_id,
+        poll_option_id: poll_option_id,
+        user_id: user_id,
+        created_at: to_datetime(row["created_at"]),
+      }
+    end
+
+    poll_votes.close
+  end
 
   def import_likes
-    puts "Importing likes..."
-
-    @imported_likes = Set.new
+    puts "", "Importing likes..."
 
     likes = query(<<~SQL)
-      SELECT ROWID, *
-      FROM likes
-      ORDER BY ROWID
+      SELECT post_id, user_id, created_at
+      FROM likes
+      ORDER BY post_id, user_id
     SQL
 
+    post_action_type_id = PostActionType.types[:like]
+    existing_likes =
+      PostAction.where(post_action_type_id: post_action_type_id).pluck(:post_id, :user_id).to_set
+
     create_post_actions(likes) do |row|
       post_id = post_id_from_imported_id(row["post_id"])
       user_id = user_id_from_imported_id(row["user_id"])
-      next if post_id.nil? || user_id.nil?
-      next if @imported_likes.add?([post_id, user_id]).nil?
+      next unless post_id && user_id
+      next unless existing_likes.add?([post_id, user_id])
 
       {
-        # FIXME: missing imported_id
-        post_id: post_id_from_imported_id(row["post_id"]),
-        user_id: user_id_from_imported_id(row["user_id"]),
-        post_action_type_id: 2,
+        post_id: post_id,
+        user_id: user_id,
+        post_action_type_id: post_action_type_id,
         created_at: to_datetime(row["created_at"]),
       }
     end
+
+    likes.close
+
+    puts "", "Updating like counts of topics..."
+    start_time = Time.now
+
+    DB.exec(<<~SQL)
+      WITH
+        likes AS (
+          SELECT topic_id, SUM(like_count) AS like_count FROM posts WHERE like_count > 0 GROUP BY topic_id
+        )
+      UPDATE topics
+      SET like_count = likes.like_count
+      FROM likes
+      WHERE topics.id = likes.topic_id
+        AND topics.like_count <> likes.like_count
+    SQL
+
+    puts "  Update took #{(Time.now - start_time).to_i} seconds."
+  end
+
+  def import_topic_users
+    puts "", "Importing topic users..."
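+
+    # NOTE: the existing-record check below is per topic, not per (user, topic)
+    # pair: once a topic has any topic_users row, every source row for that
+    # topic is skipped. This keeps the lookup set small on large imports.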
+ + topic_users = query(<<~SQL) + SELECT * + FROM topic_users + ORDER BY user_id, topic_id + SQL + + existing_topics = TopicUser.pluck(:topic_id).to_set + + create_topic_users(topic_users) do |row| + user_id = user_id_from_imported_id(row["user_id"]) + topic_id = topic_id_from_imported_id(row["topic_id"]) + next unless user_id && topic_id + next if existing_topics.include?(topic_id) + + { + user_id: user_id, + topic_id: topic_id, + last_read_post_number: row["last_read_post_number"], + last_visited_at: to_datetime(row["last_visited_at"]), + first_visited_at: to_datetime(row["first_visited_at"]), + notification_level: row["notification_level"], + notifications_changed_at: to_datetime(row["notifications_changed_at"]), + notifications_reason_id: + row["notifications_reason_id"] || TopicUser.notification_reasons[:user_changed], + total_msecs_viewed: row["total_msecs_viewed"] || 0, + } + end + + topic_users.close + end + + def update_topic_users + puts "", "Updating topic users..." + + start_time = Time.now + + params = { + post_action_type_id: PostActionType.types[:like], + msecs_viewed_per_post: 10_000, + notification_level_topic_created: NotificationLevels.topic_levels[:watching], + notification_level_posted: NotificationLevels.topic_levels[:tracking], + reason_topic_created: TopicUser.notification_reasons[:created_topic], + reason_posted: TopicUser.notification_reasons[:created_post], + } + + DB.exec(<<~SQL, params) + INSERT INTO topic_users (user_id, topic_id, posted, last_read_post_number, first_visited_at, last_visited_at, + notification_level, notifications_changed_at, notifications_reason_id, total_msecs_viewed, + last_posted_at) + SELECT p.user_id, p.topic_id, TRUE AS posted, MAX(p.post_number) AS last_read_post_number, + MIN(p.created_at) AS first_visited_at, MAX(p.created_at) AS last_visited_at, + CASE WHEN MIN(p.post_number) = 1 THEN :notification_level_topic_created + ELSE :notification_level_posted END AS notification_level, MIN(p.created_at) AS notifications_changed_at, + CASE WHEN MIN(p.post_number) = 1 THEN :reason_topic_created ELSE :reason_posted END AS notifications_reason_id, + MAX(p.post_number) * :msecs_viewed_per_post AS total_msecs_viewed, MAX(p.created_at) AS last_posted_at + FROM posts p + JOIN topics t ON p.topic_id = t.id + WHERE p.user_id > 0 + AND p.deleted_at IS NULL + AND NOT p.hidden + AND t.deleted_at IS NULL + AND t.visible + GROUP BY p.user_id, p.topic_id + ON CONFLICT (user_id, topic_id) DO UPDATE SET posted = excluded.posted, + last_read_post_number = GREATEST(topic_users.last_read_post_number, excluded.last_read_post_number), + first_visited_at = LEAST(topic_users.first_visited_at, excluded.first_visited_at), + last_visited_at = GREATEST(topic_users.last_visited_at, excluded.last_visited_at), + notification_level = GREATEST(topic_users.notification_level, excluded.notification_level), + notifications_changed_at = CASE WHEN COALESCE(excluded.notification_level, 0) > COALESCE(topic_users.notification_level, 0) + THEN COALESCE(excluded.notifications_changed_at, topic_users.notifications_changed_at) + ELSE topic_users.notifications_changed_at END, + notifications_reason_id = CASE WHEN COALESCE(excluded.notification_level, 0) > COALESCE(topic_users.notification_level, 0) + THEN COALESCE(excluded.notifications_reason_id, topic_users.notifications_reason_id) + ELSE topic_users.notifications_reason_id END, + total_msecs_viewed = CASE WHEN topic_users.total_msecs_viewed = 0 + THEN excluded.total_msecs_viewed + ELSE topic_users.total_msecs_viewed END, + 
last_posted_at = GREATEST(topic_users.last_posted_at, excluded.last_posted_at) + SQL + + DB.exec(<<~SQL, params) + INSERT INTO topic_users (user_id, topic_id, last_read_post_number, first_visited_at, last_visited_at, total_msecs_viewed, liked) + SELECT pa.user_id, p.topic_id, MAX(p.post_number) AS last_read_post_number, MIN(pa.created_at) AS first_visited_at, + MAX(pa.created_at) AS last_visited_at, MAX(p.post_number) * :msecs_viewed_per_post AS total_msecs_viewed, + TRUE AS liked + FROM post_actions pa + JOIN posts p ON pa.post_id = p.id + JOIN topics t ON p.topic_id = t.id + WHERE pa.post_action_type_id = :post_action_type_id + AND pa.user_id > 0 + AND pa.deleted_at IS NULL + AND p.deleted_at IS NULL + AND NOT p.hidden + AND t.deleted_at IS NULL + AND t.visible + GROUP BY pa.user_id, p.topic_id + ON CONFLICT (user_id, topic_id) DO UPDATE SET last_read_post_number = GREATEST(topic_users.last_read_post_number, excluded.last_read_post_number), + first_visited_at = LEAST(topic_users.first_visited_at, excluded.first_visited_at), + last_visited_at = GREATEST(topic_users.last_visited_at, excluded.last_visited_at), + total_msecs_viewed = CASE WHEN topic_users.total_msecs_viewed = 0 + THEN excluded.total_msecs_viewed + ELSE topic_users.total_msecs_viewed END, + liked = excluded.liked + SQL + + puts " Updated topic users in #{(Time.now - start_time).to_i} seconds." end def import_user_stats - puts "Importing user stats..." + puts "", "Importing user stats..." - users = query(<<~SQL) - WITH posts_counts AS ( - SELECT COUNT(p.id) AS count, p.user_id - FROM posts p GROUP BY p.user_id - ), - topic_counts AS ( - SELECT COUNT(t.id) AS count, t.user_id - FROM topics t GROUP BY t.user_id - ), - first_post AS ( - SELECT MIN(p.created_at) AS created_at, p.user_id - FROM posts p GROUP BY p.user_id ORDER BY p.created_at ASC - ) - SELECT u.id AS user_id, u.created_at, pc.count AS posts, tc.count AS topics, fp.created_at AS first_post - FROM users u - JOIN posts_counts pc ON u.id = pc.user_id - JOIN topic_counts tc ON u.id = tc.user_id - JOIN first_post fp ON u.id = fp.user_id + start_time = Time.now + + # TODO Merge with #update_user_stats from import.rake and check if there are privacy concerns + # E.g. maybe we need to exclude PMs from the calculation? 
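+    # One possible refinement for the TODO above (untested sketch): exclude
+    # messages by keeping only regular topics in the post_stats CTE below, e.g.
+    #   JOIN topics t ON t.id = p.topic_id AND t.archetype = 'regular'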
+ + DB.exec(<<~SQL) + WITH + post_stats AS ( + SELECT p.user_id, COUNT(p.id) AS post_count, MIN(p.created_at) AS first_post_created_at, + SUM(like_count) AS likes_received + FROM posts p + GROUP BY p.user_id + ), + topic_stats AS ( + SELECT t.user_id, COUNT(t.id) AS topic_count FROM topics t GROUP BY t.user_id + ), + like_stats AS ( + SELECT pa.user_id, COUNT(*) AS likes_given + FROM post_actions pa + WHERE pa.post_action_type_id = 2 + GROUP BY pa.user_id + ) + INSERT + INTO user_stats (user_id, new_since, post_count, topic_count, first_post_created_at, likes_received, likes_given) + SELECT u.id, u.created_at AS new_since, COALESCE(ps.post_count, 0) AS post_count, + COALESCE(ts.topic_count, 0) AS topic_count, ps.first_post_created_at, + COALESCE(ps.likes_received, 0) AS likes_received, COALESCE(ls.likes_given, 0) AS likes_given + FROM users u + LEFT JOIN post_stats ps ON u.id = ps.user_id + LEFT JOIN topic_stats ts ON u.id = ts.user_id + LEFT JOIN like_stats ls ON u.id = ls.user_id + WHERE NOT EXISTS ( + SELECT 1 + FROM user_stats us + WHERE us.user_id = u.id + ) + ON CONFLICT DO NOTHING SQL - create_user_stats(users) do |row| - user = { - imported_id: row["user_id"], - imported_user_id: row["user_id"], - new_since: to_datetime(row["created_at"]), - post_count: row["posts"], - topic_count: row["topics"], - first_post_created_at: to_datetime(row["first_post"]), - } + puts " Imported user stats in #{(Time.now - start_time).to_i} seconds." + end - likes_received = @db.execute(<<~SQL) - SELECT COUNT(l.id) AS likes_received - FROM likes l JOIN posts p ON l.post_id = p.id - WHERE p.user_id = #{row["user_id"]} - SQL + def import_muted_users + puts "", "Importing muted users..." - user[:likes_received] = row["likes_received"] if likes_received + muted_users = query(<<~SQL) + SELECT * + FROM muted_users + SQL - likes_given = @db.execute(<<~SQL) - SELECT COUNT(l.id) AS likes_given - FROM likes l - WHERE l.user_id = #{row["user_id"]} - SQL + existing_user_ids = MutedUser.pluck(:user_id).to_set - user[:likes_given] = row["likes_given"] if likes_given + create_muted_users(muted_users) do |row| + user_id = user_id_from_imported_id(row["user_id"]) + next if user_id && existing_user_ids.include?(user_id) - user + { user_id: user_id, muted_user_id: user_id_from_imported_id(row["muted_user_id"]) } end + + muted_users.close + end + + def import_user_histories + puts "", "Importing user histories..." + + user_histories = query(<<~SQL) + SELECT id, JSON_EXTRACT(suspension, '$.reason') AS reason + FROM users + WHERE suspension IS NOT NULL + SQL + + action_id = UserHistory.actions[:suspend_user] + existing_user_ids = UserHistory.where(action: action_id).pluck(:target_user_id).to_set + + create_user_histories(user_histories) do |row| + user_id = user_id_from_imported_id(row["id"]) + next if user_id && existing_user_ids.include?(user_id) + + { + action: action_id, + acting_user_id: Discourse::SYSTEM_USER_ID, + target_user_id: user_id, + details: row["reason"], + } + end + + user_histories.close + end + + def import_user_notes + puts "", "Importing user notes..." + + unless defined?(::DiscourseUserNotes) + puts " Skipping import of user notes because the plugin is not installed." 
+ return + end + + user_notes = query(<<~SQL) + SELECT user_id, + JSON_GROUP_ARRAY(JSON_OBJECT('raw', raw, 'created_by', created_by_user_id, 'created_at', + created_at)) AS note_json_text + FROM user_notes + GROUP BY user_id + ORDER BY user_id, id + SQL + + existing_user_ids = + PluginStoreRow + .where(plugin_name: "user_notes") + .pluck(:key) + .map { |key| key.delete_prefix("notes:").to_i } + .to_set + + create_plugin_store_rows(user_notes) do |row| + user_id = user_id_from_imported_id(row["user_id"]) + next if !user_id || existing_user_ids.include?(user_id) + + notes = JSON.parse(row["note_json_text"], symbolize_names: true) + notes.each do |note| + note[:id] = SecureRandom.hex(16) + note[:user_id] = user_id + note[:created_by] = ( + if note[:created_by] + user_id_from_imported_id(note[:created_by]) + else + Discourse::SYSTEM_USER_ID + end + ) + note[:created_at] = to_datetime(note[:created_at]) + end + + { + plugin_name: "user_notes", + key: "notes:#{user_id}", + type_name: "JSON", + value: notes.to_json, + } + end + + user_notes.close + end + + def import_user_note_counts + puts "", "Importing user note counts..." + + unless defined?(::DiscourseUserNotes) + puts " Skipping import of user notes because the plugin is not installed." + return + end + + user_note_counts = query(<<~SQL) + SELECT user_id, COUNT(*) AS count + FROM user_notes + GROUP BY user_id + ORDER BY user_id + SQL + + existing_user_ids = UserCustomField.where(name: "user_notes_count").pluck(:user_id).to_set + + create_user_custom_fields(user_note_counts) do |row| + user_id = user_id_from_imported_id(row["user_id"]) + next if !user_id || existing_user_ids.include?(user_id) + + { user_id: user_id, name: "user_notes_count", value: row["count"].to_s } + end + + user_note_counts.close + end + + def import_user_followers + puts "", "Importing user followers..." + + unless defined?(::Follow) + puts " Skipping import of user followers because the plugin is not installed." + return + end + + user_followers = query(<<~SQL) + SELECT * + FROM user_followers + ORDER BY user_id, follower_id + SQL + + existing_followers = UserFollower.pluck(:user_id, :follower_id).to_set + notification_level = Follow::Notification.levels[:watching] + + create_user_followers(user_followers) do |row| + user_id = user_id_from_imported_id(row["user_id"]) + follower_id = user_id_from_imported_id(row["follower_id"]) + + next if !user_id || !follower_id || existing_followers.include?([user_id, follower_id]) + + { + user_id: user_id, + follower_id: follower_id, + level: notification_level, + created_at: to_datetime(row["created_at"]), + } + end + + user_followers.close + end + + def import_uploads + return if !@uploads_db + + puts "", "Importing uploads..." + + uploads = query(<<~SQL, db: @uploads_db) + SELECT id, upload + FROM uploads + WHERE upload IS NOT NULL + ORDER BY rowid + SQL + + create_uploads(uploads) do |row| + next if upload_id_from_original_id(row["id"]).present? + + upload = JSON.parse(row["upload"], symbolize_names: true) + upload[:original_id] = row["id"] + upload + end + + uploads.close + end + + def import_optimized_images + return if !@uploads_db + + puts "", "Importing optimized images..." 
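+
+    # The intermediate uploads DB stores every size variant of an upload in one
+    # JSON array; JSON_EACH below fans that array out into one row per variant.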
+ + optimized_images = query(<<~SQL, db: @uploads_db) + SELECT oi.id AS upload_id, x.value AS optimized_image + FROM optimized_images oi, + JSON_EACH(oi.optimized_images) x + WHERE optimized_images IS NOT NULL + ORDER BY oi.rowid, x.value -> 'id' + SQL + + DB.exec(<<~SQL) + DELETE + FROM optimized_images oi + WHERE EXISTS ( + SELECT 1 + FROM migration_mappings mm + WHERE mm.type = 1 + AND mm.discourse_id::BIGINT = oi.upload_id + ) + SQL + + existing_optimized_images = OptimizedImage.pluck(:upload_id, :height, :width).to_set + + create_optimized_images(optimized_images) do |row| + upload_id = upload_id_from_original_id(row["upload_id"]) + next unless upload_id + + optimized_image = JSON.parse(row["optimized_image"], symbolize_names: true) + + unless existing_optimized_images.add?( + [upload_id, optimized_image[:height], optimized_image[:width]], + ) + next + end + + optimized_image[:upload_id] = upload_id + optimized_image + end + + optimized_images.close + end + + def import_user_avatars + return if !@uploads_db + + puts "", "Importing user avatars..." + + avatars = query(<<~SQL) + SELECT id, avatar_upload_id + FROM users + WHERE avatar_upload_id IS NOT NULL + ORDER BY id + SQL + + existing_user_ids = UserAvatar.pluck(:user_id).to_set + + create_user_avatars(avatars) do |row| + user_id = user_id_from_imported_id(row["id"]) + upload_id = upload_id_from_original_id(row["avatar_upload_id"]) + next if !upload_id || !user_id || existing_user_ids.include?(user_id) + + { user_id: user_id, custom_upload_id: upload_id } + end + + avatars.close + end + + def import_upload_references + puts "", "Importing upload references for user avatars..." + start_time = Time.now + DB.exec(<<~SQL) + INSERT INTO upload_references (upload_id, target_type, target_id, created_at, updated_at) + SELECT ua.custom_upload_id, 'UserAvatar', ua.id, ua.created_at, ua.updated_at + FROM user_avatars ua + WHERE ua.custom_upload_id IS NOT NULL + AND NOT EXISTS ( + SELECT 1 + FROM upload_references ur + WHERE ur.upload_id = ua.custom_upload_id + AND ur.target_type = 'UserAvatar' + AND ur.target_id = ua.id + ) + ON CONFLICT DO NOTHING + SQL + puts " Import took #{(Time.now - start_time).to_i} seconds." + + puts "", "Importing upload references for categories..." + start_time = Time.now + DB.exec(<<~SQL) + INSERT INTO upload_references (upload_id, target_type, target_id, created_at, updated_at) + SELECT upload_id, 'Category', target_id, created_at, updated_at + FROM ( + SELECT uploaded_logo_id AS upload_id, id AS target_id, created_at, updated_at + FROM categories + WHERE uploaded_logo_id IS NOT NULL + UNION + SELECT uploaded_logo_dark_id AS upload_id, id AS target_id, created_at, updated_at + FROM categories + WHERE uploaded_logo_dark_id IS NOT NULL + UNION + SELECT uploaded_background_id AS upload_id, id AS target_id, created_at, updated_at + FROM categories + WHERE uploaded_background_id IS NOT NULL + ) x + WHERE NOT EXISTS ( + SELECT 1 + FROM upload_references ur + WHERE ur.upload_id = x.upload_id + AND ur.target_type = 'Category' + AND ur.target_id = x.target_id + ) + ON CONFLICT DO NOTHING + SQL + puts " Import took #{(Time.now - start_time).to_i} seconds." + + puts "", "Importing upload references for badges..." 
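+    # As above, upload_references mark these uploads as in use so that cleanup
+    # jobs do not consider them orphaned; NOT EXISTS keeps the backfill idempotent.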
+ start_time = Time.now + DB.exec(<<~SQL) + INSERT INTO upload_references (upload_id, target_type, target_id, created_at, updated_at) + SELECT image_upload_id, 'Badge', id, created_at, updated_at + FROM badges b + WHERE image_upload_id IS NOT NULL + AND NOT EXISTS ( + SELECT 1 + FROM upload_references ur + WHERE ur.upload_id = b.image_upload_id + AND ur.target_type = 'Badge' + AND ur.target_id = b.id + ) + ON CONFLICT DO NOTHING + SQL + puts " Import took #{(Time.now - start_time).to_i} seconds." + + puts "", "Importing upload references for posts..." + post_uploads = query(<<~SQL) + SELECT p.id AS post_id, u.value AS upload_id + FROM posts p, + JSON_EACH(p.upload_ids) u + WHERE upload_ids IS NOT NULL + SQL + + existing_upload_references = + UploadReference.where(target_type: "Post").pluck(:upload_id, :target_id).to_set + + create_upload_references(post_uploads) do |row| + upload_id = upload_id_from_original_id(row["upload_id"]) + post_id = post_id_from_imported_id(row["post_id"]) + + next unless upload_id && post_id + next unless existing_upload_references.add?([upload_id, post_id]) + + { upload_id: upload_id, target_type: "Post", target_id: post_id } + end + + post_uploads.close + end + + def update_uploaded_avatar_id + puts "", "Updating user's uploaded_avatar_id column..." + + start_time = Time.now + + DB.exec(<<~SQL) + UPDATE users u + SET uploaded_avatar_id = ua.custom_upload_id + FROM user_avatars ua + WHERE u.uploaded_avatar_id IS NULL + AND u.id = ua.user_id + AND ua.custom_upload_id IS NOT NULL + SQL + + puts " Update took #{(Time.now - start_time).to_i} seconds." + end + + def import_tag_groups + puts "", "Importing tag groups..." + + SiteSetting.tags_listed_by_group = true + + @tag_group_mapping = {} + + tag_groups = query(<<~SQL) + SELECT * + FROM tag_groups + ORDER BY id + SQL + + tag_groups.each do |row| + tag_group = TagGroup.find_or_create_by!(name: row["name"]) + @tag_group_mapping[row["id"]] = tag_group.id + + if (permissions = row["permissions"]) + tag_group.permissions = + JSON + .parse(permissions) + .map do |p| + group_id = p["existing_group_id"] || group_id_from_imported_id(p["group_id"]) + group_id ? [group_id, p["permission_type"]] : nil + end + .compact + tag_group.save! + end + end + + tag_groups.close end def import_tags puts "", "Importing tags..." - tags = - query("SELECT id as topic_id, tags FROM topics") - .map do |r| - next unless r["tags"] - [r["topic_id"], JSON.parse(r["tags"]).uniq] + SiteSetting.max_tag_length = 100 if SiteSetting.max_tag_length < 100 + + @tag_mapping = {} + + tags = query(<<~SQL) + SELECT * + FROM tags + ORDER BY id + SQL + + tags.each do |row| + cleaned_tag_name = DiscourseTagging.clean_tag(row["name"]) + tag = Tag.find_or_create_by!(name: cleaned_tag_name) + @tag_mapping[row["id"]] = tag.id + + if row["tag_group_id"] + TagGroupMembership.find_or_create_by!( + tag_id: tag.id, + tag_group_id: @tag_group_mapping[row["tag_group_id"]], + ) + end + end + + tags.close + end + + def import_topic_tags + puts "", "Importing topic tags..." + + if !@tag_mapping + puts " Skipping import of topic tags because tags have not been imported." 
+ return + end + + topic_tags = query(<<~SQL) + SELECT * + FROM topic_tags + ORDER BY topic_id, tag_id + SQL + + existing_topic_tags = TopicTag.pluck(:topic_id, :tag_id).to_set + + create_topic_tags(topic_tags) do |row| + topic_id = topic_id_from_imported_id(row["topic_id"]) + tag_id = @tag_mapping[row["tag_id"]] + + next unless topic_id && tag_id + next unless existing_topic_tags.add?([topic_id, tag_id]) + + { topic_id: topic_id, tag_id: tag_id } + end + + topic_tags.close + end + + def import_votes + puts "", "Importing votes for posts..." + + unless defined?(::PostVoting) + puts " Skipping import of votes for posts because the plugin is not installed." + return + end + + votes = query(<<~SQL) + SELECT * + FROM votes + WHERE votable_type = 'Post' + SQL + + votable_type = "Post" + existing_votes = + PostVotingVote.where(votable_type: votable_type).pluck(:user_id, :votable_id).to_set + + create_post_voting_votes(votes) do |row| + user_id = user_id_from_imported_id(row["user_id"]) + post_id = post_id_from_imported_id(row["votable_id"]) + + next unless user_id && post_id + next unless existing_votes.add?([user_id, post_id]) + + { + user_id: user_id, + direction: row["direction"], + votable_type: votable_type, + votable_id: post_id, + created_at: to_datetime(row["created_at"]), + } + end + + votes.close + + puts "", "Updating vote counts of posts..." + + start_time = Time.now + + DB.exec(<<~SQL) + WITH + votes AS ( + SELECT votable_id AS post_id, SUM(CASE direction WHEN 'up' THEN 1 ELSE -1 END) AS vote_count + FROM post_voting_votes + GROUP BY votable_id + ) + UPDATE posts + SET qa_vote_count = votes.vote_count + FROM votes + WHERE votes.post_id = posts.id + AND votes.vote_count <> posts.qa_vote_count + SQL + + puts " Update took #{(Time.now - start_time).to_i} seconds." + end + + def import_answers + puts "", "Importing solutions into post custom fields..." + + solutions = query(<<~SQL) + SELECT * + FROM solutions + ORDER BY topic_id + SQL + + field_name = "is_accepted_answer" + value = "true" + existing_fields = PostCustomField.where(name: field_name).pluck(:post_id).to_set + + create_post_custom_fields(solutions) do |row| + next unless (post_id = post_id_from_imported_id(row["post_id"])) + next unless existing_fields.add?(post_id) + + { + post_id: post_id, + name: field_name, + value: value, + created_at: to_datetime(row["created_at"]), + } + end + + puts "", "Importing solutions into topic custom fields..." + + solutions.reset + + field_name = "accepted_answer_post_id" + existing_fields = TopicCustomField.where(name: field_name).pluck(:topic_id).to_set + + create_topic_custom_fields(solutions) do |row| + post_id = post_id_from_imported_id(row["post_id"]) + topic_id = topic_id_from_imported_id(row["topic_id"]) + + next unless post_id && topic_id + next unless existing_fields.add?(topic_id) + + { + topic_id: topic_id, + name: field_name, + value: post_id.to_s, + created_at: to_datetime(row["created_at"]), + } + end + + puts "", "Importing solutions into user actions..." + + existing_fields = nil + solutions.reset + + action_type = UserAction::SOLVED + existing_actions = UserAction.where(action_type: action_type).pluck(:target_post_id).to_set + + create_user_actions(solutions) do |row| + post_id = post_id_from_imported_id(row["post_id"]) + next unless post_id && existing_actions.add?(post_id) + + topic_id = topic_id_from_imported_id(row["topic_id"]) + user_id = user_id_from_imported_id(row["user_id"]) + next unless topic_id && user_id + + acting_user_id = row["acting_user_id"] ? 
user_id_from_imported_id(row["acting_user_id"]) : nil + + { + action_type: action_type, + user_id: user_id, + target_topic_id: topic_id, + target_post_id: post_id, + acting_user_id: acting_user_id || Discourse::SYSTEM_USER_ID, + } + end + + solutions.close + end + + def import_gamification_scores + puts "", "Importing gamification scores..." + + unless defined?(::DiscourseGamification) + puts " Skipping import of gamification scores because the plugin is not installed." + return + end + + # TODO Make this configurable + from_date = Date.today + DiscourseGamification::GamificationLeaderboard.all.each do |leaderboard| + leaderboard.update!(from_date: from_date) + end + + scores = query(<<~SQL) + SELECT * + FROM gamification_score_events + ORDER BY id + SQL + + # TODO Better way of detecting existing scores? + existing_scores = DiscourseGamification::GamificationScoreEvent.pluck(:user_id, :date).to_set + + create_gamification_score_events(scores) do |row| + user_id = user_id_from_imported_id(row["user_id"]) + next unless user_id + + date = to_date(row["date"]) || from_date + next if existing_scores.include?([user_id, date]) + + { + user_id: user_id, + date: date, + points: row["points"], + description: row["description"], + created_at: to_datetime(row["created_at"]), + } + end + + scores.close + end + + def import_post_events + puts "", "Importing events..." + + unless defined?(::DiscoursePostEvent) + puts " Skipping import of events because the plugin is not installed." + return + end + + post_events = query(<<~SQL) + SELECT * + FROM events + ORDER BY id + SQL + + default_custom_fields = "{}" + timezone = "UTC" + public_group_invitees = "{#{::DiscoursePostEvent::Event::PUBLIC_GROUP}}" + standalone_invitees = "{}" + + existing_events = DiscoursePostEvent::Event.pluck(:id).to_set + + create_post_events(post_events) do |row| + post_id = post_id_from_imported_id(row["post_id"]) + next if !post_id || existing_events.include?(post_id) + + { + id: post_id, + status: row["status"], + original_starts_at: to_datetime(row["starts_at"]), + original_ends_at: to_datetime(row["ends_at"]), + name: row["name"], + url: row["url"] ? row["url"][0..999] : nil, + custom_fields: row["custom_fields"] || default_custom_fields, + timezone: timezone, + raw_invitees: + ( + if row["status"] == ::DiscoursePostEvent::Event.statuses[:public] + public_group_invitees + else + standalone_invitees + end + ), + } + end + + puts "", "Importing event dates..." + + post_events.reset + existing_events = DiscoursePostEvent::EventDate.pluck(:event_id).to_set + + create_post_event_dates(post_events) do |row| + post_id = post_id_from_imported_id(row["post_id"]) + next if !post_id || existing_events.include?(post_id) + + { + event_id: post_id, + starts_at: to_datetime(row["starts_at"]), + ends_at: to_datetime(row["ends_at"]), + } + end + + puts "", "Importing topic event custom fields..." 
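+
+    # The events plugin reads start/end times from topic custom fields
+    # (TOPIC_POST_EVENT_STARTS_AT / TOPIC_POST_EVENT_ENDS_AT), so both fields
+    # are backfilled from the same result set, which is rewound via reset.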
+ + post_events.reset + field_name = DiscoursePostEvent::TOPIC_POST_EVENT_STARTS_AT + existing_fields = TopicCustomField.where(name: field_name).pluck(:topic_id).to_set + + create_topic_custom_fields(post_events) do |row| + date = to_datetime(row["starts_at"]) + next unless date + + topic_id = topic_id_from_imported_post_id(row["post_id"]) + next if !topic_id || existing_fields.include?(topic_id) + + { topic_id: topic_id, name: field_name, value: date.utc.strftime("%Y-%m-%d %H:%M:%S") } + end + + post_events.reset + field_name = DiscoursePostEvent::TOPIC_POST_EVENT_ENDS_AT + existing_fields = TopicCustomField.where(name: field_name).pluck(:topic_id).to_set + + create_topic_custom_fields(post_events) do |row| + date = to_datetime(row["ends_at"]) + next unless date + + topic_id = topic_id_from_imported_post_id(row["post_id"]) + next if !topic_id || existing_fields.include?(topic_id) + + { topic_id: topic_id, name: field_name, value: date.utc.strftime("%Y-%m-%d %H:%M:%S") } + end + + post_events.close + end + + def import_tag_users + puts "", "Importing tag users..." + + tag_users = query(<<~SQL) + SELECT * + FROM tag_users + ORDER BY tag_id, user_id + SQL + + existing_tag_users = TagUser.distinct.pluck(:user_id).to_set + + create_tag_users(tag_users) do |row| + tag_id = @tag_mapping[row["tag_id"]] + user_id = user_id_from_imported_id(row["user_id"]) + + next unless tag_id && user_id + next if existing_tag_users.include?(user_id) + + { tag_id: tag_id, user_id: user_id, notification_level: row["notification_level"] } + end + + tag_users.close + end + + def import_badge_groupings + puts "", "Importing badge groupings..." + + rows = query(<<~SQL) + SELECT DISTINCT badge_group + FROM badges + ORDER BY badge_group + SQL + + @badge_group_mapping = {} + max_position = BadgeGrouping.maximum(:position) || 0 + + rows.each do |row| + grouping = + BadgeGrouping.find_or_create_by!(name: row["badge_group"]) do |bg| + bg.position = max_position += 1 end - .compact + @badge_group_mapping[row["badge_group"]] = grouping.id + end - tag_mapping = {} + rows.close + end - tags - .map(&:last) - .flatten - .compact - .uniq - .each do |tag_name| - cleaned_tag_name = DiscourseTagging.clean_tag(tag_name) - tag = Tag.find_by_name(cleaned_tag_name) || Tag.create!(name: cleaned_tag_name) - tag_mapping[tag_name] = tag.id + def import_badges + puts "", "Importing badges..." + + badges = query(<<~SQL) + SELECT * + FROM badges + ORDER BY id + SQL + + existing_badge_names = Badge.pluck(:name).to_set + + create_badges(badges) do |row| + next if badge_id_from_original_id(row["id"]).present? + + badge_name = row["name"] + unless existing_badge_names.add?(badge_name) + badge_name = badge_name + "_1" + badge_name.next! until existing_badge_names.add?(badge_name) end - tags_disaggregated = - tags - .map do |topic_id, tags_of_topic| - tags_of_topic.map { |t| { topic_id: topic_id, tag_id: tag_mapping.fetch(t) } } - end - .flatten - - create_topic_tags(tags_disaggregated) do |row| - next unless topic_id = topic_id_from_imported_id(row[:topic_id]) - - { topic_id: topic_id, tag_id: row[:tag_id] } + { + original_id: row["id"], + name: badge_name, + description: row["description"], + badge_type_id: row["badge_type_id"], + badge_grouping_id: @badge_group_mapping[row["badge_group"]], + long_description: row["long_description"], + image_upload_id: + row["image_upload_id"] ? 
upload_id_from_original_id(row["image_upload_id"]) : nil, + query: row["query"], + } end + + badges.close + end + + def import_user_badges + puts "", "Importing user badges..." + + user_badges = query(<<~SQL) + SELECT user_id, badge_id, granted_at, + ROW_NUMBER() OVER (PARTITION BY user_id, badge_id ORDER BY granted_at) - 1 AS seq + FROM user_badges + ORDER BY user_id, badge_id, granted_at + SQL + + existing_user_badges = UserBadge.distinct.pluck(:user_id, :badge_id, :seq).to_set + + create_user_badges(user_badges) do |row| + user_id = user_id_from_imported_id(row["user_id"]) + badge_id = badge_id_from_original_id(row["badge_id"]) + seq = row["seq"] + + next unless user_id && badge_id + next if existing_user_badges.include?([user_id, badge_id, seq]) + + { user_id: user_id, badge_id: badge_id, granted_at: to_datetime(row["granted_at"]), seq: seq } + end + + user_badges.close + + puts "", "Updating badge grant counts..." + start_time = Time.now + + DB.exec(<<~SQL) + WITH + grants AS ( + SELECT badge_id, COUNT(*) AS grant_count FROM user_badges GROUP BY badge_id + ) + + UPDATE badges + SET grant_count = grants.grant_count + FROM grants + WHERE badges.id = grants.badge_id + AND badges.grant_count <> grants.grant_count + SQL + + puts " Update took #{(Time.now - start_time).to_i} seconds." + end + + def import_permalink_normalizations + puts "", "Importing permalink normalizations..." + + start_time = Time.now + + rows = query(<<~SQL) + SELECT normalization + FROM permalink_normalizations + ORDER BY normalization + SQL + + normalizations = SiteSetting.permalink_normalizations + normalizations = normalizations.blank? ? [] : normalizations.split("|") + + rows.each do |row| + normalization = row["normalization"] + normalizations << normalization unless normalizations.include?(normalization) + end + + SiteSetting.permalink_normalizations = normalizations.join("|") + rows.close + + puts " Import took #{(Time.now - start_time).to_i} seconds." + end + + def import_permalinks + puts "", "Importing permalinks for topics..." + + rows = query(<<~SQL) + SELECT id, old_relative_url + FROM topics + WHERE old_relative_url IS NOT NULL + ORDER BY id + SQL + + existing_permalinks = Permalink.where("topic_id IS NOT NULL").pluck(:topic_id).to_set + + create_permalinks(rows) do |row| + topic_id = topic_id_from_imported_id(row["id"]) + next if !topic_id || existing_permalinks.include?(topic_id) + + { url: row["old_relative_url"], topic_id: topic_id } + end + + rows.close + + puts "", "Importing permalinks for posts..." + + rows = query(<<~SQL) + SELECT id, old_relative_url + FROM posts + WHERE old_relative_url IS NOT NULL + ORDER BY id + SQL + + existing_permalinks = Permalink.where("post_id IS NOT NULL").pluck(:post_id).to_set + + create_permalinks(rows) do |row| + post_id = post_id_from_imported_id(row["id"]) + next if !post_id || existing_permalinks.include?(post_id) + + { url: row["old_relative_url"], post_id: post_id } + end + + rows.close + + puts "", "Importing permalinks for categories..." 
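+
+    # Same pattern as topics and posts above; Discourse falls back to permalink
+    # lookup when a request would otherwise 404, so old category URLs keep working.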
+    rows = query(<<~SQL)
+      SELECT id, old_relative_url
+      FROM categories
+      WHERE old_relative_url IS NOT NULL
+      ORDER BY id
+    SQL
+
+    existing_permalinks = Permalink.where("category_id IS NOT NULL").pluck(:category_id).to_set
+
+    create_permalinks(rows) do |row|
+      category_id = category_id_from_imported_id(row["id"])
+      next if !category_id || existing_permalinks.include?(category_id)
+
+      { url: row["old_relative_url"], category_id: category_id }
+    end
+
+    rows.close
+
+    if @tag_mapping
+      puts "", "Importing permalinks for tags..."
+
+      rows = query(<<~SQL)
+        SELECT id, old_relative_url
+        FROM tags
+        WHERE old_relative_url IS NOT NULL
+        ORDER BY id
+      SQL
+
+      existing_permalinks = Permalink.where("tag_id IS NOT NULL").pluck(:tag_id).to_set
+
+      create_permalinks(rows) do |row|
+        tag_id = @tag_mapping[row["id"]]
+        next if !tag_id || existing_permalinks.include?(tag_id)
+
+        { url: row["old_relative_url"], tag_id: tag_id }
+      end
+
+      rows.close
+    else
+      puts "  Skipping import of tag permalinks because tags have not been imported."
+    end
+
+    puts "", "Importing permalinks for external/relative URLs..."
+
+    rows = query(<<~SQL)
+      SELECT url, external_url
+      FROM permalinks
+      WHERE external_url IS NOT NULL
+      ORDER BY url
+    SQL
+
+    existing_permalinks = Permalink.where("external_url IS NOT NULL").pluck(:external_url).to_set
+
+    create_permalinks(rows) do |row|
+      next if existing_permalinks.include?(row["external_url"])
+
+      { url: row["url"], external_url: row["external_url"] }
+    end
+
+    rows.close
+  end
 
   def create_connection(path)
     sqlite = SQLite3::Database.new(path, results_as_hash: true)
     sqlite.busy_timeout = 60_000 # 60 seconds
-    sqlite.auto_vacuum = "full"
-    sqlite.foreign_keys = true
     sqlite.journal_mode = "wal"
     sqlite.synchronous = "normal"
     sqlite
   end
 
-  def query(sql)
-    @db.prepare(sql).execute
+  def query(sql, *bind_vars, db: @source_db)
+    result_set = db.prepare(sql).execute(*bind_vars)
+
+    if block_given?
+      result = yield result_set
+      result_set.close
+      result
+    else
+      result_set
+    end
   end
 
   def to_date(text)
@@ -364,4 +2198,4 @@ class BulkImport::Generic < BulkImport::Base
   end
 end
 
-BulkImport::Generic.new(ARGV.first).start
+BulkImport::Generic.new(ARGV[0], ARGV[1]).start
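+
+# Example invocation (illustrative paths; the optional second argument is the
+# uploads DB produced by uploads_importer.rb):
+#   script/bulk_import/generic_bulk.rb path/to/intermediate.db path/to/uploads.db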