Files
mattermost/utils/file_backend_s3.go
Jonathan 375c0632fa PLT-7503: Create Message Export Scheduled Task and CLI Command (#7612)
* Created message export scheduled task

* Added CLI command to immediately kick off an export job

* Added email addresses for users joining and leaving the channel to the export

* Added support for both MySQL and PostgreSQL

* Fixing gofmt error

* Added a new ChannelMemberHistory store and associated tests

* Updating the ChannelMemberHistory channel as users create/join/leave channels

* Added user email to the message export object so it can be included in the actiance export xml

* Don't fail to log a leave event if a corresponding join event wasn't logged

* Adding copyright notices

* Adding message export settings to daily diagnostics report

* Added System Console integration for message export

* Cleaned up TODOs

* Made batch size configurable

* Added export from timestamp to CLI command

* Made ChannelMemberHistory table updates best effort

* Added a context-based timeout option to the message export CLI

* Minor PR updates/improvements

* Removed unnecessary fields from MessageExport object to reduce query overhead

* Removed JSON functions from the message export query in an effort to optimize performance

* Changed the way that channel member history queries and purges work to better account for edge cases

* Fixing a test I missed with the last refactor

* Added file copy functionality to file backend, improved config validation, added default config values

* Fixed file copy tests

* More concise use of the testing libraries

* Fixed context leak error

* Changed default export path to correctly place an 'export' directory under the 'data' directory

* Can't delete records from a read replica

* Fixed copy file tests

* Start job workers when license is applied, if configured to do so

* Suggestions from the PR

* Moar unit tests

* Fixed test imports
2017-11-30 09:07:04 -05:00

244 lines
7.5 KiB
Go

// Copyright (c) 2017-present Mattermost, Inc. All Rights Reserved.
// See License.txt for license information.
package utils
import (
"bytes"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"strings"
l4g "github.com/alecthomas/log4go"
s3 "github.com/minio/minio-go"
"github.com/minio/minio-go/pkg/credentials"
"github.com/mattermost/mattermost-server/model"
)
// S3FileBackend stores and retrieves files in an S3-compatible object store
// (AWS S3 or minio) via the minio-go client.
type S3FileBackend struct {
	endpoint  string // S3/minio server endpoint, e.g. "s3.amazonaws.com"
	accessKey string // access key ID used for authentication
	secretKey string // secret access key used for authentication
	secure    bool   // use HTTPS when true
	signV2    bool   // use AWS Signature Version 2 instead of the default V4
	region    string // bucket region; when set, disables automatic region lookup
	bucket    string // name of the bucket all operations target
	encrypt   bool   // request server-side encryption on stored objects
	trace     bool   // dump client request/response traces to stdout
}
// s3New builds a minio client from the backend's settings. Unlike s3.New(),
// it supports both signature v2 and v4 credentials (v4 is the default when
// signV2 is false) and honors a user-defined region, which disables the
// client's automatic region lookup. Request tracing to stdout is enabled
// when the backend is configured with trace.
func (b *S3FileBackend) s3New() (*s3.Client, error) {
	signature := credentials.SignatureV4
	if b.signV2 {
		signature = credentials.SignatureV2
	}
	creds := credentials.NewStatic(b.accessKey, b.secretKey, "", signature)
	client, err := s3.NewWithCredentials(b.endpoint, creds, b.secure, b.region)
	if err != nil {
		return nil, err
	}
	if b.trace {
		client.TraceOn(os.Stdout)
	}
	return client, nil
}
// TestConnection verifies that the backend can reach the S3/minio server and
// that the configured bucket exists, creating the bucket if it is missing.
func (b *S3FileBackend) TestConnection() *model.AppError {
	client, err := b.s3New()
	if err != nil {
		return model.NewAppError("TestFileConnection", "Bad connection to S3 or minio.", nil, err.Error(), http.StatusInternalServerError)
	}
	bucketFound, err := client.BucketExists(b.bucket)
	if err != nil {
		return model.NewAppError("TestFileConnection", "Error checking if bucket exists.", nil, err.Error(), http.StatusInternalServerError)
	}
	if !bucketFound {
		l4g.Warn("Bucket specified does not exist. Attempting to create...")
		if mkErr := client.MakeBucket(b.bucket, b.region); mkErr != nil {
			l4g.Error("Unable to create bucket.")
			return model.NewAppError("TestFileConnection", "Unable to create bucket", nil, mkErr.Error(), http.StatusInternalServerError)
		}
	}
	l4g.Info("Connection to S3 or minio is good. Bucket exists.")
	return nil
}
// ReadFile fetches the object stored at path in the configured bucket and
// returns its entire contents in memory.
func (b *S3FileBackend) ReadFile(path string) ([]byte, *model.AppError) {
	s3Clnt, err := b.s3New()
	if err != nil {
		return nil, model.NewAppError("ReadFile", "api.file.read_file.s3.app_error", nil, err.Error(), http.StatusInternalServerError)
	}
	minioObject, err := s3Clnt.GetObject(b.bucket, path)
	if err != nil {
		return nil, model.NewAppError("ReadFile", "api.file.read_file.s3.app_error", nil, err.Error(), http.StatusInternalServerError)
	}
	// Release the object stream even if the read below fails.
	defer minioObject.Close()
	// Idiom fix: early return on error instead of `else` after a
	// terminating `if` (per Go style, keep the happy path left-aligned).
	f, err := ioutil.ReadAll(minioObject)
	if err != nil {
		return nil, model.NewAppError("ReadFile", "api.file.read_file.s3.app_error", nil, err.Error(), http.StatusInternalServerError)
	}
	return f, nil
}
// CopyFile duplicates the object at oldPath to newPath within the bucket
// using a server-side copy, applying the backend's copy metadata.
func (b *S3FileBackend) CopyFile(oldPath, newPath string) *model.AppError {
	client, err := b.s3New()
	if err != nil {
		return model.NewAppError("copyFile", "api.file.write_file.s3.app_error", nil, err.Error(), http.StatusInternalServerError)
	}
	src := s3.NewSourceInfo(b.bucket, oldPath, nil)
	dst, err := s3.NewDestinationInfo(b.bucket, newPath, nil, s3CopyMetadata(b.encrypt))
	if err != nil {
		return model.NewAppError("copyFile", "api.file.write_file.s3.app_error", nil, err.Error(), http.StatusInternalServerError)
	}
	if copyErr := client.CopyObject(dst, src); copyErr != nil {
		return model.NewAppError("copyFile", "api.file.move_file.copy_within_s3.app_error", nil, copyErr.Error(), http.StatusInternalServerError)
	}
	return nil
}
// MoveFile relocates the object at oldPath to newPath within the bucket:
// a server-side copy followed by deletion of the original object.
func (b *S3FileBackend) MoveFile(oldPath, newPath string) *model.AppError {
	client, err := b.s3New()
	if err != nil {
		return model.NewAppError("moveFile", "api.file.write_file.s3.app_error", nil, err.Error(), http.StatusInternalServerError)
	}
	src := s3.NewSourceInfo(b.bucket, oldPath, nil)
	dst, err := s3.NewDestinationInfo(b.bucket, newPath, nil, s3CopyMetadata(b.encrypt))
	if err != nil {
		return model.NewAppError("moveFile", "api.file.write_file.s3.app_error", nil, err.Error(), http.StatusInternalServerError)
	}
	if copyErr := client.CopyObject(dst, src); copyErr != nil {
		return model.NewAppError("moveFile", "api.file.move_file.copy_within_s3.app_error", nil, copyErr.Error(), http.StatusInternalServerError)
	}
	if removeErr := client.RemoveObject(b.bucket, oldPath); removeErr != nil {
		return model.NewAppError("moveFile", "api.file.move_file.delete_from_s3.app_error", nil, removeErr.Error(), http.StatusInternalServerError)
	}
	return nil
}
// WriteFile uploads the byte slice f to the given path in the bucket. Image
// files (detected by extension) get their matching MIME Content-Type; all
// other files are stored as binary/octet-stream.
func (b *S3FileBackend) WriteFile(f []byte, path string) *model.AppError {
	client, err := b.s3New()
	if err != nil {
		return model.NewAppError("WriteFile", "api.file.write_file.s3.app_error", nil, err.Error(), http.StatusInternalServerError)
	}
	contentType := "binary/octet-stream"
	if ext := filepath.Ext(path); model.IsFileExtImage(ext) {
		contentType = model.GetImageMimeType(ext)
	}
	metaData := s3Metadata(b.encrypt, contentType)
	if _, err = client.PutObjectWithMetadata(b.bucket, path, bytes.NewReader(f), metaData, nil); err != nil {
		return model.NewAppError("WriteFile", "api.file.write_file.s3.app_error", nil, err.Error(), http.StatusInternalServerError)
	}
	return nil
}
// RemoveFile deletes the object stored at path from the bucket.
func (b *S3FileBackend) RemoveFile(path string) *model.AppError {
	client, err := b.s3New()
	if err != nil {
		return model.NewAppError("RemoveFile", "utils.file.remove_file.s3.app_error", nil, err.Error(), http.StatusInternalServerError)
	}
	if removeErr := client.RemoveObject(b.bucket, path); removeErr != nil {
		return model.NewAppError("RemoveFile", "utils.file.remove_file.s3.app_error", nil, removeErr.Error(), http.StatusInternalServerError)
	}
	return nil
}
// getPathsFromObjectInfos adapts a channel of object metadata into a channel
// of the corresponding object keys. The returned channel is closed once the
// input channel has been closed and fully drained.
func getPathsFromObjectInfos(in <-chan s3.ObjectInfo) <-chan string {
	out := make(chan string, 1)
	go func() {
		defer close(out)
		// range exits when the sender closes in, matching the manual
		// "receive until !ok" loop it replaces.
		for info := range in {
			out <- info.Key
		}
	}()
	return out
}
// ListDirectory returns the keys of the objects directly under the given
// path prefix (non-recursive listing), with surrounding slashes trimmed.
func (b *S3FileBackend) ListDirectory(path string) (*[]string, *model.AppError) {
	client, err := b.s3New()
	if err != nil {
		return nil, model.NewAppError("ListDirectory", "utils.file.list_directory.s3.app_error", nil, err.Error(), http.StatusInternalServerError)
	}
	// Closing doneCh on every exit path tells the listing goroutine to stop.
	doneCh := make(chan struct{})
	defer close(doneCh)
	var paths []string
	for object := range client.ListObjects(b.bucket, path, false, doneCh) {
		if object.Err != nil {
			return nil, model.NewAppError("ListDirectory", "utils.file.list_directory.s3.app_error", nil, object.Err.Error(), http.StatusInternalServerError)
		}
		paths = append(paths, strings.Trim(object.Key, "/"))
	}
	return &paths, nil
}
// RemoveDirectory deletes every object under the given path prefix
// (recursive listing piped into a bulk remove).
//
// Fix: the previous version cancelled the listing on the error path by
// sending on doneCh — a send that blocks forever if the listing goroutine
// has already stopped selecting on the channel — and only closed doneCh on
// the success path, leaking it on early return. Deferring close(doneCh)
// signals cancellation safely on every exit path, matching the pattern
// ListDirectory already uses.
func (b *S3FileBackend) RemoveDirectory(path string) *model.AppError {
	s3Clnt, err := b.s3New()
	if err != nil {
		return model.NewAppError("RemoveDirectory", "utils.file.remove_directory.s3.app_error", nil, err.Error(), http.StatusInternalServerError)
	}
	doneCh := make(chan struct{})
	defer close(doneCh)
	list := s3Clnt.ListObjects(b.bucket, path, true, doneCh)
	for removeErr := range s3Clnt.RemoveObjects(b.bucket, getPathsFromObjectInfos(list)) {
		if removeErr.Err != nil {
			return model.NewAppError("RemoveDirectory", "utils.file.remove_directory.s3.app_error", nil, removeErr.Err.Error(), http.StatusInternalServerError)
		}
	}
	return nil
}
// s3Metadata builds the header metadata attached to an S3 upload. The
// Content-Type header is set only when contentType is non-empty, and
// server-side AES256 encryption is requested only when encrypt is true.
func s3Metadata(encrypt bool, contentType string) map[string][]string {
	metaData := make(map[string][]string)
	if contentType != "" {
		// Fix: the original stored the literal string "contentType"
		// rather than the parameter's value, so every object was
		// uploaded with the bogus Content-Type "contentType".
		metaData["Content-Type"] = []string{contentType}
	}
	if encrypt {
		metaData["x-amz-server-side-encryption"] = []string{"AES256"}
	}
	return metaData
}
// s3CopyMetadata builds the replacement metadata for a server-side S3 copy.
// Server-side AES256 encryption is requested only when encrypt is true,
// mirroring s3Metadata.
//
// Fix: the original ignored the encrypt parameter and unconditionally set
// the encryption header, forcing AES256 on copies even when the backend was
// configured without encryption — inconsistent with s3Metadata, which gates
// the same header on the flag.
func s3CopyMetadata(encrypt bool) map[string]string {
	metaData := make(map[string]string)
	if encrypt {
		metaData["x-amz-server-side-encryption"] = "AES256"
	}
	return metaData
}