mirror of
https://github.com/discourse/discourse.git
synced 2025-02-25 18:55:32 -06:00
FEATURE: Use amazon s3 inventory to manage upload stats (#6867)
This commit is contained in:
@@ -32,9 +32,7 @@ module BackupRestore
|
||||
end
|
||||
|
||||
def download_file(filename, destination_path, failure_message = nil)
|
||||
unless @s3_helper.object(filename).download_file(destination_path)
|
||||
raise failure_message&.to_s || "Failed to download file"
|
||||
end
|
||||
@s3_helper.download_file(filename, destination_path, failure_message)
|
||||
end
|
||||
|
||||
def upload_file(filename, source_path, content_type)
|
||||
|
@@ -463,6 +463,11 @@ module Discourse
|
||||
end
|
||||
end
|
||||
|
||||
DiscourseEvent.on(:site_setting_saved) do |site_setting|
|
||||
name = site_setting.name.to_s
|
||||
Jobs.enqueue(:update_s3_inventory) if name.include?("s3_inventory") || name == "s3_upload_bucket"
|
||||
end
|
||||
|
||||
def self.current_user_provider
|
||||
@current_user_provider || Auth::DefaultCurrentUserProvider
|
||||
end
|
||||
|
@@ -124,8 +124,14 @@ module FileStore
|
||||
end
|
||||
|
||||
def list_missing_uploads(skip_optimized: false)
|
||||
list_missing(Upload, "original/")
|
||||
list_missing(OptimizedImage, "optimized/") unless skip_optimized
|
||||
if SiteSetting.enable_s3_inventory
|
||||
require 's3_inventory'
|
||||
S3Inventory.new(s3_helper, :upload).list_missing
|
||||
S3Inventory.new(s3_helper, :optimized).list_missing unless skip_optimized
|
||||
else
|
||||
list_missing(Upload, "original/")
|
||||
list_missing(OptimizedImage, "optimized/") unless skip_optimized
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
@@ -140,7 +146,7 @@ module FileStore
|
||||
verified_ids = []
|
||||
|
||||
files.each do |f|
|
||||
id = model.where("url LIKE '%#{f.key}'").pluck(:id).first if f.size > 0
|
||||
id = model.where("url LIKE '%#{f.key}' AND etag = '#{f.etag}'").pluck(:id).first
|
||||
verified_ids << id if id.present?
|
||||
marker = f.key
|
||||
end
|
||||
@@ -150,7 +156,7 @@ module FileStore
|
||||
files = @s3_helper.list(prefix, marker)
|
||||
end
|
||||
|
||||
missing_uploads = model.where("id NOT IN (SELECT val FROM verified_ids)")
|
||||
missing_uploads = model.joins('LEFT JOIN verified_ids ON verified_ids.val = id').where("verified_ids.val IS NULL")
|
||||
missing_count = missing_uploads.count
|
||||
|
||||
if missing_count > 0
|
||||
|
@@ -205,6 +205,20 @@ class S3Helper
|
||||
opts
|
||||
end
|
||||
|
||||
def download_file(filename, destination_path, failure_message = nil)
|
||||
unless object(filename).download_file(destination_path)
|
||||
raise failure_message&.to_s || "Failed to download file"
|
||||
end
|
||||
end
|
||||
|
||||
def s3_client
|
||||
@s3_client ||= Aws::S3::Client.new(@s3_options)
|
||||
end
|
||||
|
||||
def s3_inventory_path(path = 'inventory')
|
||||
get_path_for_s3_upload(path)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def default_s3_options
|
||||
@@ -228,10 +242,6 @@ class S3Helper
|
||||
File.join("uploads", RailsMultisite::ConnectionManagement.current_db, "/")
|
||||
end
|
||||
|
||||
def s3_client
|
||||
@s3_client ||= Aws::S3::Client.new(@s3_options)
|
||||
end
|
||||
|
||||
def s3_resource
|
||||
Aws::S3::Resource.new(client: s3_client)
|
||||
end
|
||||
|
193
lib/s3_inventory.rb
Normal file
193
lib/s3_inventory.rb
Normal file
@@ -0,0 +1,193 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require "aws-sdk-s3"
|
||||
require "csv"
|
||||
|
||||
class S3Inventory
|
||||
|
||||
attr_reader :inventory_id, :csv_filename, :model
|
||||
|
||||
CSV_KEY_INDEX ||= 1
|
||||
CSV_ETAG_INDEX ||= 2
|
||||
INVENTORY_PREFIX ||= "inventory"
|
||||
|
||||
def initialize(s3_helper, type)
|
||||
@s3_helper = s3_helper
|
||||
|
||||
if type == :upload
|
||||
@inventory_id = "original"
|
||||
@model = Upload
|
||||
elsif type == :optimized
|
||||
@inventory_id = "optimized"
|
||||
@model = OptimizedImage
|
||||
end
|
||||
end
|
||||
|
||||
def file
|
||||
@file ||= unsorted_files.sort_by { |file| -file.last_modified.to_i }.first
|
||||
end
|
||||
|
||||
def list_missing
|
||||
if file.blank?
|
||||
error("Failed to list inventory from S3")
|
||||
return
|
||||
end
|
||||
|
||||
DistributedMutex.synchronize("s3_inventory_list_missing_#{inventory_id}") do
|
||||
current_db = RailsMultisite::ConnectionManagement.current_db
|
||||
timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S")
|
||||
@tmp_directory = File.join(Rails.root, "tmp", INVENTORY_PREFIX, current_db, timestamp)
|
||||
@archive_filename = File.join(@tmp_directory, File.basename(file.key))
|
||||
@csv_filename = @archive_filename[0...-3]
|
||||
|
||||
FileUtils.mkdir_p(@tmp_directory)
|
||||
download_inventory_file_to_tmp_directory
|
||||
decompress_inventory_file
|
||||
|
||||
begin
|
||||
table_name = "#{inventory_id}_inventory"
|
||||
connection = ActiveRecord::Base.connection.raw_connection
|
||||
connection.exec("CREATE TEMP TABLE #{table_name}(key text UNIQUE, etag text PRIMARY KEY)")
|
||||
connection.copy_data("COPY #{table_name} FROM STDIN CSV") do
|
||||
CSV.foreach(csv_filename, headers: false) do |row|
|
||||
connection.put_copy_data("#{row[CSV_KEY_INDEX]},#{row[CSV_ETAG_INDEX]}\n")
|
||||
end
|
||||
end
|
||||
|
||||
uploads = model.where("created_at < ?", file.last_modified)
|
||||
missing_uploads = uploads.joins("LEFT JOIN #{table_name} ON #{table_name}.etag = #{model.table_name}.etag").where("#{table_name}.etag is NULL")
|
||||
|
||||
if (missing_count = missing_uploads.count) > 0
|
||||
missing_uploads.select(:id, :url).find_each do |upload|
|
||||
log upload.url
|
||||
end
|
||||
|
||||
log "#{missing_count} of #{uploads.count} #{model.name.underscore.pluralize} are missing"
|
||||
end
|
||||
ensure
|
||||
connection.exec("DROP TABLE #{table_name}") unless connection.nil?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def download_inventory_file_to_tmp_directory
|
||||
log "Downloading inventory file to tmp directory..."
|
||||
failure_message = "Failed to inventory file to tmp directory."
|
||||
|
||||
@s3_helper.download_file(file.key, @archive_filename, failure_message)
|
||||
end
|
||||
|
||||
def decompress_inventory_file
|
||||
log "Decompressing inventory file, this may take a while..."
|
||||
|
||||
FileUtils.cd(@tmp_directory) do
|
||||
Discourse::Utils.execute_command('gzip', '--decompress', @archive_filename, failure_message: "Failed to decompress inventory file.")
|
||||
end
|
||||
end
|
||||
|
||||
def update_bucket_policy
|
||||
@s3_helper.s3_client.put_bucket_policy(
|
||||
bucket: bucket_name,
|
||||
policy: {
|
||||
"Version": "2012-10-17",
|
||||
"Statement": [
|
||||
{
|
||||
"Sid": "InventoryAndAnalyticsPolicy",
|
||||
"Effect": "Allow",
|
||||
"Principal": { "Service": "s3.amazonaws.com" },
|
||||
"Action": ["s3:PutObject"],
|
||||
"Resource": ["arn:aws:s3:::#{inventory_root_path}/*"],
|
||||
"Condition": {
|
||||
"ArnLike": {
|
||||
"aws:SourceArn": "arn:aws:s3:::#{bucket_name}"
|
||||
},
|
||||
"StringEquals": {
|
||||
"s3:x-amz-acl": "bucket-owner-full-control"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}.to_json
|
||||
)
|
||||
end
|
||||
|
||||
def update_bucket_inventory_configuration
|
||||
@s3_helper.s3_client.put_bucket_inventory_configuration(
|
||||
bucket: bucket_name,
|
||||
id: inventory_id,
|
||||
inventory_configuration: inventory_configuration,
|
||||
use_accelerate_endpoint: false
|
||||
)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def inventory_configuration
|
||||
filter_prefix = inventory_id
|
||||
destination_prefix = File.join(INVENTORY_PREFIX, inventory_id)
|
||||
|
||||
if bucket_folder_path.present?
|
||||
filter_prefix = File.join(bucket_folder_path, filter_prefix)
|
||||
destination_prefix = File.join(bucket_folder_path, destination_prefix)
|
||||
end
|
||||
|
||||
{
|
||||
destination: {
|
||||
s3_bucket_destination: {
|
||||
bucket: "arn:aws:s3:::#{bucket_name}",
|
||||
prefix: destination_prefix,
|
||||
format: "CSV"
|
||||
}
|
||||
},
|
||||
filter: {
|
||||
prefix: filter_prefix
|
||||
},
|
||||
is_enabled: SiteSetting.enable_s3_inventory,
|
||||
id: inventory_id,
|
||||
included_object_versions: "Current",
|
||||
optional_fields: ["ETag"],
|
||||
schedule: {
|
||||
frequency: "Daily"
|
||||
}
|
||||
}
|
||||
end
|
||||
|
||||
def bucket_name
|
||||
@s3_helper.s3_bucket_name
|
||||
end
|
||||
|
||||
def bucket_folder_path
|
||||
@s3_helper.s3_bucket_folder_path
|
||||
end
|
||||
|
||||
def unsorted_files
|
||||
objects = []
|
||||
|
||||
@s3_helper.list(File.join(inventory_path, "data")).each do |obj|
|
||||
if obj.key.match?(/\.csv\.gz$/i)
|
||||
objects << obj
|
||||
end
|
||||
end
|
||||
|
||||
objects
|
||||
rescue Aws::Errors::ServiceError => e
|
||||
log("Failed to list inventory from S3", e)
|
||||
end
|
||||
|
||||
def inventory_path
|
||||
File.join(inventory_root_path, inventory_id)
|
||||
end
|
||||
|
||||
def inventory_root_path
|
||||
File.join(bucket_name, bucket_folder_path || "", INVENTORY_PREFIX)
|
||||
end
|
||||
|
||||
def log(message, ex = nil)
|
||||
puts(message)
|
||||
Rails.logger.error("#{ex}\n" + (ex.backtrace || []).join("\n")) if ex
|
||||
end
|
||||
|
||||
def error(message)
|
||||
log(message, StandardError.new(message))
|
||||
end
|
||||
end
|
@@ -53,6 +53,10 @@ module SiteSettings::Validations
|
||||
validate_error :s3_upload_bucket_is_required if new_val == "t" && SiteSetting.s3_upload_bucket.blank?
|
||||
end
|
||||
|
||||
def validate_enable_s3_inventory(new_val)
|
||||
validate_error :enable_s3_uploads_is_required if new_val == "t" && !SiteSetting.enable_s3_uploads?
|
||||
end
|
||||
|
||||
def validate_backup_location(new_val)
|
||||
return unless new_val == BackupLocationSiteSetting::S3
|
||||
validate_error(:s3_backup_requires_s3_settings, setting_name: "s3_backup_bucket") if SiteSetting.s3_backup_bucket.blank?
|
||||
|
Reference in New Issue
Block a user