From 9919ee1900db7ad582cb3e5a4714fec78891f2c3 Mon Sep 17 00:00:00 2001 From: Vinoth Kannan Date: Tue, 13 Aug 2019 11:29:31 +0530 Subject: [PATCH] FIX: remove the tmp inventory files after the s3 uploads check. --- lib/s3_inventory.rb | 104 +++++++++++++++------------ spec/components/s3_inventory_spec.rb | 14 ++-- 2 files changed, 62 insertions(+), 56 deletions(-) diff --git a/lib/s3_inventory.rb b/lib/s3_inventory.rb index 501cb91b06e..ebe40fddf11 100644 --- a/lib/s3_inventory.rb +++ b/lib/s3_inventory.rb @@ -31,50 +31,58 @@ class S3Inventory end DistributedMutex.synchronize("s3_inventory_list_missing_#{type}") do - download_inventory_files_to_tmp_directory - decompress_inventory_files + begin + files.each do |file| + next if File.exists?(file[:filename][0...-3]) - multisite_prefix = "uploads/#{RailsMultisite::ConnectionManagement.current_db}/" - ActiveRecord::Base.transaction do - begin - connection.exec("CREATE TEMP TABLE #{table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))") - connection.copy_data("COPY #{table_name} FROM STDIN CSV") do - files.each do |file| - CSV.foreach(file[:filename][0...-3], headers: false) do |row| - key = row[CSV_KEY_INDEX] - next if Rails.configuration.multisite && key.exclude?(multisite_prefix) - url = File.join(Discourse.store.absolute_base_url, key) - connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n") + download_inventory_file_to_tmp_directory(file) + decompress_inventory_file(file) + end + + multisite_prefix = "uploads/#{RailsMultisite::ConnectionManagement.current_db}/" + ActiveRecord::Base.transaction do + begin + connection.exec("CREATE TEMP TABLE #{table_name}(url text UNIQUE, etag text, PRIMARY KEY(etag, url))") + connection.copy_data("COPY #{table_name} FROM STDIN CSV") do + files.each do |file| + CSV.foreach(file[:filename][0...-3], headers: false) do |row| + key = row[CSV_KEY_INDEX] + next if Rails.configuration.multisite && key.exclude?(multisite_prefix) + url = File.join(Discourse.store.absolute_base_url, key) + connection.put_copy_data("#{url},#{row[CSV_ETAG_INDEX]}\n") + end end end - end - # backfilling etags - connection.async_exec("UPDATE #{model.table_name} - SET etag = #{table_name}.etag - FROM #{table_name} - WHERE #{model.table_name}.etag IS NULL - AND #{model.table_name}.url = #{table_name}.url") + # backfilling etags + connection.async_exec("UPDATE #{model.table_name} + SET etag = #{table_name}.etag + FROM #{table_name} + WHERE #{model.table_name}.etag IS NULL + AND #{model.table_name}.url = #{table_name}.url") - list_missing_post_uploads if type == "original" + list_missing_post_uploads if type == "original" - uploads = (model == Upload) ? model.by_users.where("created_at < ?", inventory_date) : model - missing_uploads = uploads - .joins("LEFT JOIN #{table_name} ON #{table_name}.etag = #{model.table_name}.etag") - .where("#{table_name}.etag IS NULL AND #{model.table_name}.etag IS NOT NULL") + uploads = (model == Upload) ? model.by_users.where("created_at < ?", inventory_date) : model + missing_uploads = uploads + .joins("LEFT JOIN #{table_name} ON #{table_name}.etag = #{model.table_name}.etag") + .where("#{table_name}.etag IS NULL AND #{model.table_name}.etag IS NOT NULL") - if (missing_count = missing_uploads.count) > 0 - missing_uploads.select(:id, :url).find_each do |upload| - log upload.url + if (missing_count = missing_uploads.count) > 0 + missing_uploads.select(:id, :url).find_each do |upload| + log upload.url + end + + log "#{missing_count} of #{uploads.count} #{model.name.underscore.pluralize} are missing" end - log "#{missing_count} of #{uploads.count} #{model.name.underscore.pluralize} are missing" + Discourse.stats.set("missing_s3_#{model.table_name}", missing_count) + ensure + connection.exec("DROP TABLE #{table_name}") unless connection.nil? end - - Discourse.stats.set("missing_s3_#{model.table_name}", missing_count) - ensure - connection.exec("DROP TABLE #{table_name}") unless connection.nil? end + ensure + cleanup! end end end @@ -118,22 +126,18 @@ class S3Inventory log "#{missing[:count]} post uploads are missing." end - def download_inventory_files_to_tmp_directory - files.each do |file| - next if File.exists?(file[:filename]) + def download_inventory_file_to_tmp_directory(file) + return if File.exists?(file[:filename]) - log "Downloading inventory file '#{file[:key]}' to tmp directory..." - failure_message = "Failed to inventory file '#{file[:key]}' to tmp directory." + log "Downloading inventory file '#{file[:key]}' to tmp directory..." + failure_message = "Failed to inventory file '#{file[:key]}' to tmp directory." - @s3_helper.download_file(file[:key], file[:filename], failure_message) - end + @s3_helper.download_file(file[:key], file[:filename], failure_message) end - def decompress_inventory_files - files.each do |file| - log "Decompressing inventory file '#{file[:filename]}', this may take a while..." - Discourse::Utils.execute_command('gzip', '--decompress', file[:filename], failure_message: "Failed to decompress inventory file '#{file[:filename]}'.", chdir: tmp_directory) - end + def decompress_inventory_file(file) + log "Decompressing inventory file '#{file[:filename]}', this may take a while..." + Discourse::Utils.execute_command('gzip', '--decompress', file[:filename], failure_message: "Failed to decompress inventory file '#{file[:filename]}'.", chdir: tmp_directory) end def update_bucket_policy @@ -173,6 +177,13 @@ class S3Inventory private + def cleanup! + files.each do |file| + File.delete(file[:filename]) if File.exists?(file[:filename]) + File.delete(file[:filename][0...-3]) if File.exists?(file[:filename][0...-3]) + end + end + def connection @connection ||= ActiveRecord::Base.connection.raw_connection end @@ -202,8 +213,7 @@ class S3Inventory def tmp_directory @tmp_directory ||= begin current_db = RailsMultisite::ConnectionManagement.current_db - timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S") - directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db, timestamp) + directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db) FileUtils.mkdir_p(directory) directory end diff --git a/spec/components/s3_inventory_spec.rb b/spec/components/s3_inventory_spec.rb index a63913a7b74..d29a3029993 100644 --- a/spec/components/s3_inventory_spec.rb +++ b/spec/components/s3_inventory_spec.rb @@ -48,6 +48,8 @@ describe "S3Inventory" do next_marker: "eyJNYXJrZXIiOiBudWxsLCAiYm90b190cnVuY2F0ZV9hbW91bnQiOiAyfQ==" } }) + + inventory.stubs(:cleanup!) end it "should raise error if an inventory file is not found" do @@ -67,9 +69,7 @@ describe "S3Inventory" do Fabricate(:upload, etag: "ETag2", created_at: Time.now) Fabricate(:upload, created_at: 2.days.ago) - inventory.expects(:download_inventory_files_to_tmp_directory) - inventory.expects(:decompress_inventory_files) - inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(2) + inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(3) inventory.expects(:inventory_date).returns(Time.now) output = capture_stdout do @@ -87,9 +87,7 @@ describe "S3Inventory" do ] files.each { |file| Fabricate(:upload, url: file[0]) } - inventory.expects(:download_inventory_files_to_tmp_directory) - inventory.expects(:decompress_inventory_files) - inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(2) + inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(3) output = capture_stdout do expect { inventory.backfill_etags_and_list_missing }.to change { Upload.where(etag: nil).count }.by(-2) @@ -111,9 +109,7 @@ describe "S3Inventory" do post.link_post_uploads upload.delete - inventory.expects(:download_inventory_files_to_tmp_directory) - inventory.expects(:decompress_inventory_files) - inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(2) + inventory.expects(:files).returns([{ key: "Key", filename: "#{csv_filename}.gz" }]).times(3) output = capture_stdout do inventory.backfill_etags_and_list_missing