Mirror of https://github.com/mattermost/mattermost.git (synced 2025-02-25 18:55:24 -06:00)
[MM-57500] Streaming support for importing file attachments (#26629)
* Bulk import: stream file attachments uploads
* Add comment with context on buffer size
* Add file name to logs
* Use sha256 to do checksum
* Fix bad merge
* Fix import file
* Update test

Co-authored-by: Mattermost Build <build@mattermost.com>
This commit is contained in:
parent
ffc08858cf
commit
446c763fa8
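Note: the heart of this change is replacing the old read-everything-into-memory path (io.ReadAll into a []byte handed to DoUploadFile) with streaming: the open reader is handed straight to UploadFileX, and duplicate detection hashes both streams instead of comparing byte slices. A minimal, self-contained sketch of the streaming idea (illustrative only; the real implementation is compareFilesContent in the diff below, which hashes two readers concurrently):

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"io"
	"strings"
)

// hashStream shows the memory profile this commit is after: hashing an
// arbitrarily large reader costs only the copy buffer, never the whole file.
func hashStream(r io.Reader) ([]byte, error) {
	h := sha256.New()
	if _, err := io.Copy(h, r); err != nil {
		return nil, err
	}
	return h.Sum(nil), nil
}

func main() {
	// strings.NewReader stands in for an attachment stream (file or ZIP entry).
	sum, err := hashStream(strings.NewReader("attachment bytes"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%x\n", sum)
}
```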
@@ -633,6 +633,12 @@ func UploadFileSetRaw() func(t *UploadFileTask) {
 	}
 }
 
+func UploadFileSetExtractContent(value bool) func(t *UploadFileTask) {
+	return func(t *UploadFileTask) {
+		t.ExtractContent = value
+	}
+}
+
 type UploadFileTask struct {
 	Logger mlog.LoggerIFace
 
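UploadFileSetExtractContent follows the functional-options pattern the other UploadFileSet* helpers already use: each option is a closure that mutates the task before the upload runs. A self-contained sketch of the mechanism (task and setExtractContent are illustrative stand-ins, not the real types):

```go
package main

import "fmt"

// task is an illustrative stand-in for UploadFileTask.
type task struct {
	ExtractContent bool
}

// setExtractContent mirrors the shape of UploadFileSetExtractContent above:
// it returns a closure that mutates the task when applied.
func setExtractContent(value bool) func(*task) {
	return func(t *task) { t.ExtractContent = value }
}

func main() {
	// Extraction defaults to on; applying options in order can turn it off,
	// which is exactly how UploadFileX applies its opts slice.
	t := &task{ExtractContent: true}
	for _, opt := range []func(*task){setExtractContent(false)} {
		opt(t)
	}
	fmt.Println(t.ExtractContent) // false
}
```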
@@ -659,6 +665,10 @@ type UploadFileTask struct {
 	// the file. Plugins are still invoked.
 	Raw bool
 
+	// Whether or not to extract file attachments content
+	// This is used by the bulk import process.
+	ExtractContent bool
+
 	//=============================================================
 	// Internal state
 
@@ -729,14 +739,15 @@ func (a *App) UploadFileX(c request.CTX, channelID, name string, input io.Reader
 	))
 
 	t := &UploadFileTask{
-		Logger:      c.Logger(),
-		ChannelId:   filepath.Base(channelID),
-		Name:        filepath.Base(name),
-		Input:       input,
-		maxFileSize: *a.Config().FileSettings.MaxFileSize,
-		maxImageRes: *a.Config().FileSettings.MaxImageResolution,
-		imgDecoder:  a.ch.imgDecoder,
-		imgEncoder:  a.ch.imgEncoder,
+		Logger:         c.Logger(),
+		ChannelId:      filepath.Base(channelID),
+		Name:           filepath.Base(name),
+		Input:          input,
+		maxFileSize:    *a.Config().FileSettings.MaxFileSize,
+		maxImageRes:    *a.Config().FileSettings.MaxImageResolution,
+		imgDecoder:     a.ch.imgDecoder,
+		imgEncoder:     a.ch.imgEncoder,
+		ExtractContent: true,
 	}
 	for _, o := range opts {
 		o(t)
@@ -803,7 +814,7 @@ func (a *App) UploadFileX(c request.CTX, channelID, name string, input io.Reader
 		}
 	}
 
-	if *a.Config().FileSettings.ExtractContent {
+	if *a.Config().FileSettings.ExtractContent && t.ExtractContent {
 		infoCopy := *t.fileinfo
 		a.Srv().GoBuffered(func() {
 			err := a.ExtractContentFromFileInfo(c, &infoCopy)
@@ -1264,15 +1275,6 @@ func (a *App) SetFileSearchableContent(rctx request.CTX, fileID string, data str
 	return nil
 }
 
-func (a *App) getFileInfoIgnoreCloudLimit(rctx request.CTX, fileID string) (*model.FileInfo, *model.AppError) {
-	fileInfo, appErr := a.Srv().getFileInfo(fileID)
-	if appErr == nil {
-		a.generateMiniPreview(rctx, fileInfo)
-	}
-
-	return fileInfo, appErr
-}
-
 func (a *App) GetFileInfos(rctx request.CTX, page, perPage int, opt *model.GetFileInfosOptions) ([]*model.FileInfo, *model.AppError) {
 	fileInfos, err := a.Srv().Store().FileInfo().GetWithOptions(page, perPage, opt)
 	if err != nil {
@@ -1317,20 +1319,6 @@ func (a *App) GetFile(rctx request.CTX, fileID string) ([]byte, *model.AppError)
 	return data, nil
 }
 
-func (a *App) getFileIgnoreCloudLimit(rctx request.CTX, fileID string) ([]byte, *model.AppError) {
-	info, err := a.getFileInfoIgnoreCloudLimit(rctx, fileID)
-	if err != nil {
-		return nil, err
-	}
-
-	data, err := a.ReadFile(info.Path)
-	if err != nil {
-		return nil, err
-	}
-
-	return data, nil
-}
-
 func (a *App) CopyFileInfos(rctx request.CTX, userID string, fileIDs []string) ([]string, *model.AppError) {
 	var newFileIds []string
 
@@ -6,7 +6,7 @@ package app
 import (
 	"bytes"
 	"context"
-	"crypto/sha1"
+	"crypto/sha256"
 	"errors"
 	"fmt"
 	"io"
@@ -14,6 +14,7 @@ import (
 	"os"
 	"path"
 	"strings"
+	"sync"
 
 	"github.com/mattermost/mattermost/server/public/model"
 	"github.com/mattermost/mattermost/server/public/shared/mlog"
@@ -1272,10 +1273,65 @@ func (a *App) importReplies(rctx request.CTX, data []imports.ReplyImportData, po
 	return nil
 }
 
+func compareFilesContent(fileA, fileB io.Reader, bufSize int64) (bool, error) {
+	aHash := sha256.New()
+	bHash := sha256.New()
+
+	if bufSize == 0 {
+		// This buffer size was selected after some extensive benchmarking
+		// (BenchmarkCompareFilesContent) and it showed to provide
+		// a good compromise between processing speed and allocated memory,
+		// especially in the common case of the readers being part of an S3 stored ZIP file.
+		// See https://github.com/mattermost/mattermost/pull/26629 for full context.
+		bufSize = 1024 * 1024 * 2 // 2MB
+	}
+
+	var nA, nB int64
+	var errA, errB error
+	var wg sync.WaitGroup
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		var buf []byte
+		// If the reader has a WriteTo method (e.g. *os.File)
+		// we can avoid the buffer allocation.
+		if _, ok := fileA.(io.WriterTo); !ok {
+			buf = make([]byte, bufSize)
+		}
+		nA, errA = io.CopyBuffer(aHash, fileA, buf)
+	}()
+	go func() {
+		defer wg.Done()
+		var buf []byte
+		// If the reader has a WriteTo method (e.g. *os.File)
+		// we can avoid the buffer allocation.
+		if _, ok := fileB.(io.WriterTo); !ok {
+			buf = make([]byte, bufSize)
+		}
+		nB, errB = io.CopyBuffer(bHash, fileB, buf)
+	}()
+	wg.Wait()
+
+	if errA != nil {
+		return false, fmt.Errorf("failed to compare files: %w", errA)
+	}
+
+	if errB != nil {
+		return false, fmt.Errorf("failed to compare files: %w", errB)
+	}
+
+	if nA != nB {
+		return false, fmt.Errorf("size mismatch: %d != %d", nA, nB)
+	}
+
+	return bytes.Equal(aHash.Sum(nil), bHash.Sum(nil)), nil
+}
+
 func (a *App) importAttachment(rctx request.CTX, data *imports.AttachmentImportData, post *model.Post, teamID string, extractContent bool) (*model.FileInfo, *model.AppError) {
 	var (
-		name string
-		file io.Reader
+		name     string
+		file     io.ReadCloser
+		fileSize int64
 	)
 	if data.Data != nil {
 		zipFile, err := data.Data.Open()
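A subtlety behind the conditional buf allocation above: io.CopyBuffer does not use the supplied buffer when the source implements io.WriterTo (or the destination io.ReaderFrom), so allocating the 2MB buffer for such readers would be pure waste. A small self-contained illustration; the destination here is a hash, mirroring the goroutines above, since hash.Hash is not an io.ReaderFrom:

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"io"
	"os"
	"strings"
)

func main() {
	h := sha256.New() // hash.Hash is not an io.ReaderFrom

	// strings.Reader implements io.WriterTo, so CopyBuffer hands the copy
	// off to it and never touches the buffer; passing nil here is fine.
	if _, err := io.CopyBuffer(h, strings.NewReader("hello"), nil); err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}
	fmt.Printf("%x\n", h.Sum(nil))
}
```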
@@ -1284,7 +1340,8 @@ func (a *App) importAttachment(rctx request.CTX, data *imports.AttachmentImportD
 		}
 		defer zipFile.Close()
 		name = data.Data.Name
-		file = zipFile.(io.Reader)
+		fileSize = int64(data.Data.UncompressedSize64)
+		file = zipFile
+
+		rctx.Logger().Info("Preparing file upload from ZIP", mlog.String("file_name", name), mlog.Uint("file_size", data.Data.UncompressedSize64))
 	} else {
@@ -1296,58 +1353,81 @@ func (a *App) importAttachment(rctx request.CTX, data *imports.AttachmentImportD
 		name = realFile.Name()
 		file = realFile
 
-		fields := []mlog.Field{mlog.String("file_name", name)}
-		if info, err := realFile.Stat(); err != nil {
-			fields = append(fields, mlog.Int("file_size", info.Size()))
+		info, err := realFile.Stat()
+		if err != nil {
+			return nil, model.NewAppError("BulkImport", "app.import.attachment.file_stat.error", map[string]any{"FilePath": *data.Path}, "", http.StatusBadRequest).Wrap(err)
 		}
-		rctx.Logger().Info("Preparing file upload from file system", fields...)
+		fileSize = info.Size()
+
+		rctx.Logger().Info("Preparing file upload from file system", mlog.String("file_name", name), mlog.Int("file_size", info.Size()))
 	}
 
 	timestamp := utils.TimeFromMillis(post.CreateAt)
 
-	fileData, err := io.ReadAll(file)
-	if err != nil {
-		return nil, model.NewAppError("BulkImport", "app.import.attachment.read_file_data.error", map[string]any{"FilePath": *data.Path}, "", http.StatusBadRequest)
-	}
-
 	// Go over existing files in the post and see if there already exists a file with the same name, size and hash. If so - skip it
 	if post.Id != "" {
-		oldFiles, err := a.getFileInfosForPostIgnoreCloudLimit(rctx, post.Id, true, false)
+		oldFiles, err := a.Srv().Store().FileInfo().GetForPost(post.Id, true, false, true)
 		if err != nil {
 			return nil, model.NewAppError("BulkImport", "app.import.attachment.file_upload.error", map[string]any{"FilePath": *data.Path}, "", http.StatusBadRequest)
 		}
 		for _, oldFile := range oldFiles {
-			if oldFile.Name != path.Base(name) || oldFile.Size != int64(len(fileData)) {
+			if oldFile.Name != path.Base(name) || oldFile.Size != fileSize {
				continue
 			}
 
-			// check sha1
-			newHash := sha1.Sum(fileData)
-			oldFileData, err := a.getFileIgnoreCloudLimit(rctx, oldFile.Id)
-			if err != nil {
+			oldFileReader, appErr := a.FileReader(oldFile.Path)
+			if appErr != nil {
 				return nil, model.NewAppError("BulkImport", "app.import.attachment.file_upload.error", map[string]any{"FilePath": *data.Path}, "", http.StatusBadRequest)
 			}
-			oldHash := sha1.Sum(oldFileData)
+			defer oldFileReader.Close()
 
-			if bytes.Equal(oldHash[:], newHash[:]) {
-				rctx.Logger().Info("Skipping uploading of file because name already exists", mlog.String("file_name", name))
+			if ok, err := compareFilesContent(oldFileReader, file, 0); err != nil {
+				rctx.Logger().Error("Failed to compare files content", mlog.String("file_name", name), mlog.Err(err))
+			} else if ok {
+				rctx.Logger().Info("Skipping uploading of file because name already exists and content matches", mlog.String("file_name", name))
 				return oldFile, nil
 			}
+
+			rctx.Logger().Info("File contents don't match, will re-upload", mlog.String("file_name", name))
+
+			// Since compareFilesContent needs to read the whole file we need to
+			// either seek back (local file) or re-open it (zip file).
+			if f, ok := file.(*os.File); ok {
+				rctx.Logger().Info("File is *os.File, can seek", mlog.String("file_name", name))
+				if _, err := f.Seek(0, io.SeekStart); err != nil {
+					return nil, model.NewAppError("BulkImport", "app.import.attachment.seek_file.error", map[string]any{"FilePath": *data.Path}, "", http.StatusBadRequest).Wrap(err)
+				}
+			} else if data.Data != nil {
+				rctx.Logger().Info("File is from ZIP, can't seek, opening again", mlog.String("file_name", name))
+				file.Close()
+
+				f, err := data.Data.Open()
+				if err != nil {
+					return nil, model.NewAppError("BulkImport", "app.import.attachment.bad_file.error", map[string]any{"FilePath": *data.Path}, "", http.StatusBadRequest).Wrap(err)
+				}
+				defer f.Close()
+
+				file = f
+			}
+
+			break
 		}
 	}
 
 	rctx.Logger().Info("Uploading file with name", mlog.String("file_name", name))
 
-	fileInfo, appErr := a.DoUploadFile(rctx, timestamp, teamID, post.ChannelId, post.UserId, name, fileData, extractContent)
+	fileInfo, appErr := a.UploadFileX(rctx, post.ChannelId, name, file,
+		UploadFileSetTeamId(teamID),
+		UploadFileSetUserId(post.UserId),
+		UploadFileSetTimestamp(timestamp),
+		UploadFileSetContentLength(fileSize),
+		UploadFileSetExtractContent(extractContent),
+	)
 	if appErr != nil {
 		rctx.Logger().Error("Failed to upload file", mlog.Err(appErr), mlog.String("file_name", name))
 		return nil, appErr
 	}
 
-	if fileInfo.IsImage() && !fileInfo.IsSvg() {
-		a.HandleImages(rctx, []string{fileInfo.PreviewPath}, []string{fileInfo.ThumbnailPath}, [][]byte{fileData})
-	}
-
 	return fileInfo, nil
 }
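A hedged usage sketch of the new helper, written as a fragment in the same package as compareFilesContent (the file paths are placeholders). Because the helper consumes both readers, a caller that wants to reuse a stream afterward must seek back or reopen it, which is exactly what importAttachment does above on a content mismatch:

```go
// Usage sketch only; assumes compareFilesContent from the hunk above is in
// scope and that this runs inside a function returning error.
fileA, err := os.Open("a.bin") // placeholder path
if err != nil {
	return err
}
defer fileA.Close()

fileB, err := os.Open("b.bin") // placeholder path
if err != nil {
	return err
}
defer fileB.Close()

// bufSize 0 selects the benchmarked 2MB default; per the comment in the
// helper, readers with a WriteTo method avoid the buffer allocation entirely.
ok, err := compareFilesContent(fileA, fileB, 0)
if err != nil {
	return err
}
fmt.Println("contents match:", ok)
```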
@@ -6,6 +6,7 @@ package app
 import (
 	"archive/zip"
 	"context"
+	"io"
 	"os"
 	"path/filepath"
 	"strings"
@@ -21,6 +22,7 @@ import (
 	"github.com/mattermost/mattermost/server/v8/channels/testlib"
 	"github.com/mattermost/mattermost/server/v8/channels/utils"
 	"github.com/mattermost/mattermost/server/v8/channels/utils/fileutils"
+	"github.com/mattermost/mattermost/server/v8/platform/shared/filestore"
 )
 
 func TestImportImportScheme(t *testing.T) {
@@ -4383,6 +4385,85 @@ func TestImportPostAndRepliesWithAttachments(t *testing.T) {
 		assert.Contains(t, attachments[0].Path, "noteam")
 		AssertFileIdsInPost(attachments, th, t)
 	})
+
+	t.Run("import existing post with different attachment's content", func(t *testing.T) {
+		tmpDir := os.TempDir()
+		filePath := filepath.Join(tmpDir, "test_diff.png")
+
+		t.Run("different size", func(t *testing.T) {
+			testImage := filepath.Join(testsDir, "test.png")
+			imageData, err := os.ReadFile(testImage)
+			require.NoError(t, err)
+			err = os.WriteFile(filePath, imageData, 0644)
+			require.NoError(t, err)
+
+			data.Post.Attachments = &[]imports.AttachmentImportData{{Path: &filePath}}
+			data.Post.Replies = nil
+			data.Post.Message = model.NewString("new post")
+			errLine, appErr := th.App.importMultiplePostLines(th.Context, []imports.LineImportWorkerData{data}, false, true)
+			require.Nil(t, appErr)
+			require.Equal(t, 0, errLine)
+
+			attachments := GetAttachments(user3.Id, th, t)
+			require.Len(t, attachments, 2)
+			assert.Contains(t, attachments[1].Path, team.Id)
+			AssertFileIdsInPost(attachments[1:], th, t)
+
+			testImage = filepath.Join(testsDir, "test-data-graph.png")
+			imageData, err = os.ReadFile(testImage)
+			require.NoError(t, err)
+			err = os.WriteFile(filePath, imageData, 0644)
+			require.NoError(t, err)
+
+			data.Post.Attachments = &[]imports.AttachmentImportData{{Path: &filePath}}
+			data.Post.Replies = nil
+			errLine, appErr = th.App.importMultiplePostLines(th.Context, []imports.LineImportWorkerData{data}, false, true)
+			require.Nil(t, appErr)
+			require.Equal(t, 0, errLine)
+
+			attachments2 := GetAttachments(user3.Id, th, t)
+			require.NotEqual(t, attachments, attachments2)
+			require.Len(t, attachments2, 2)
+			assert.Contains(t, attachments2[1].Path, team.Id)
+			AssertFileIdsInPost(attachments2[1:], th, t)
+		})
+
+		t.Run("same size", func(t *testing.T) {
+			imageData, err := os.ReadFile(filepath.Join(testsDir, "test_img_diff_A.png"))
+			require.NoError(t, err)
+			err = os.WriteFile(filePath, imageData, 0644)
+			require.NoError(t, err)
+
+			data.Post.Attachments = &[]imports.AttachmentImportData{{Path: &filePath}}
+			data.Post.Replies = nil
+			data.Post.Message = model.NewString("new post2")
+			errLine, appErr := th.App.importMultiplePostLines(th.Context, []imports.LineImportWorkerData{data}, false, true)
+			require.Nil(t, appErr)
+			require.Equal(t, 0, errLine)
+
+			attachments := GetAttachments(user3.Id, th, t)
+			require.Len(t, attachments, 3)
+			assert.Contains(t, attachments[2].Path, team.Id)
+			AssertFileIdsInPost(attachments[2:], th, t)
+
+			imageData, err = os.ReadFile(filepath.Join(testsDir, "test_img_diff_B.png"))
+			require.NoError(t, err)
+			err = os.WriteFile(filePath, imageData, 0644)
+			require.NoError(t, err)
+
+			data.Post.Attachments = &[]imports.AttachmentImportData{{Path: &filePath}}
+			data.Post.Replies = nil
+			errLine, appErr = th.App.importMultiplePostLines(th.Context, []imports.LineImportWorkerData{data}, false, true)
+			require.Nil(t, appErr)
+			require.Equal(t, 0, errLine)
+
+			attachments2 := GetAttachments(user3.Id, th, t)
+			require.NotEqual(t, attachments, attachments2)
+			require.Len(t, attachments2, 3)
+			assert.Contains(t, attachments2[2].Path, team.Id)
+			AssertFileIdsInPost(attachments2[2:], th, t)
+		})
+	})
 }
 
 func TestImportDirectPostWithAttachments(t *testing.T) {
@@ -4574,7 +4655,6 @@ func TestZippedImportPostAndRepliesWithAttachments(t *testing.T) {
 
 	require.NotEmpty(t, testZipReader.File)
 	imageData := testZipReader.File[0]
-	require.NoError(t, err, "failed to copy test Image file into zip")
 
 	testMarkDown := filepath.Join(testsDir, "test-attachments.md")
 	data := imports.LineImportWorkerData{
@@ -4662,4 +4742,327 @@ func TestZippedImportPostAndRepliesWithAttachments(t *testing.T) {
 		assert.Contains(t, attachments[0].Path, "noteam")
 		AssertFileIdsInPost(attachments, th, t)
 	})
+
+	t.Run("import existing post with different attachment's content", func(t *testing.T) {
+		var fileA, fileB *zip.File
+		for _, f := range testZipReader.File {
+			if f.Name == "data/test_img_diff_A.png" {
+				fileA = f
+			} else if f.Name == "data/test_img_diff_B.png" {
+				fileB = f
+			}
+		}
+
+		require.NotNil(t, fileA)
+		require.NotNil(t, fileB)
+
+		data.Post.Attachments = &[]imports.AttachmentImportData{{Path: &fileA.Name, Data: fileA}}
+		data.Post.Message = model.NewString("new post")
+		data.Post.Replies = nil
+		errLine, err := th.App.importMultiplePostLines(th.Context, []imports.LineImportWorkerData{data}, false, true)
+		require.Nil(t, err)
+		require.Equal(t, 0, errLine)
+
+		attachments := GetAttachments(user3.Id, th, t)
+		require.Len(t, attachments, 2)
+		assert.Contains(t, attachments[1].Path, team.Id)
+		AssertFileIdsInPost(attachments[1:], th, t)
+
+		fileB.Name = fileA.Name
+		data.Post.Attachments = &[]imports.AttachmentImportData{{Path: &fileA.Name, Data: fileB}}
+		errLine, err = th.App.importMultiplePostLines(th.Context, []imports.LineImportWorkerData{data}, false, true)
+		require.Nil(t, err)
+		require.Equal(t, 0, errLine)
+
+		attachments = GetAttachments(user3.Id, th, t)
+		require.Len(t, attachments, 2)
+		assert.Contains(t, attachments[1].Path, team.Id)
+		AssertFileIdsInPost(attachments[1:], th, t)
+	})
 }
+
+func TestCompareFilesContent(t *testing.T) {
+	t.Run("empty", func(t *testing.T) {
+		ok, err := compareFilesContent(strings.NewReader(""), strings.NewReader(""), 0)
+		require.NoError(t, err)
+		require.True(t, ok)
+	})
+
+	t.Run("no match", func(t *testing.T) {
+		ok, err := compareFilesContent(strings.NewReader("fileA"), strings.NewReader("fileB"), 0)
+		require.NoError(t, err)
+		require.False(t, ok)
+	})
+
+	t.Run("match", func(t *testing.T) {
+		ok, err := compareFilesContent(strings.NewReader("fileA"), strings.NewReader("fileA"), 0)
+		require.NoError(t, err)
+		require.True(t, ok)
+	})
+}
+
+func BenchmarkCompareFilesContent(b *testing.B) {
+	tmpDir := os.TempDir()
+	fileAPath := filepath.Join(tmpDir, "fileA")
+	fileBPath := filepath.Join(tmpDir, "fileB")
+
+	fileA, err := os.Create(fileAPath)
+	require.NoError(b, err)
+	defer fileA.Close()
+	defer os.Remove(fileAPath)
+
+	fileB, err := os.Create(fileBPath)
+	require.NoError(b, err)
+	defer fileB.Close()
+	defer os.Remove(fileBPath)
+
+	fileSize := int64(1024 * 1024 * 1024) // 1GB
+
+	err = fileA.Truncate(fileSize)
+	require.NoError(b, err)
+	err = fileB.Truncate(fileSize)
+	require.NoError(b, err)
+
+	bufSizesMap := map[string]int64{
+		"32KB":  1024 * 32, // current default of io.Copy
+		"128KB": 1024 * 128,
+		"1MB":   1024 * 1024,
+		"2MB":   1024 * 1024 * 2,
+		"4MB":   1024 * 1024 * 4,
+		"8MB":   1024 * 1024 * 8,
+	}
+
+	fileSizesMap := map[string]int64{
+		"512KB": 1024 * 512,
+		"1MB":   1024 * 1024,
+		"10MB":  1024 * 1024 * 10,
+		"100MB": 1024 * 1024 * 100,
+		"1GB":   1024 * 1024 * 1000,
+	}
+
+	// To force order
+	bufSizeLabels := []string{"32KB", "128KB", "1MB", "2MB", "4MB", "8MB"}
+	fileSizeLabels := []string{"512KB", "1MB", "10MB", "100MB", "1GB"}
+
+	b.Run("plain", func(b *testing.B) {
+		b.Run("local", func(b *testing.B) {
+			b.ReportAllocs()
+			b.ResetTimer()
+			b.StopTimer()
+
+			for i := 0; i < b.N; i++ {
+				_, err := fileA.Seek(0, io.SeekStart)
+				require.NoError(b, err)
+				_, err = fileB.Seek(0, io.SeekStart)
+				require.NoError(b, err)
+
+				b.StartTimer()
+				ok, err := compareFilesContent(fileA, fileB, 0)
+				b.StopTimer()
+				require.NoError(b, err)
+				require.True(b, ok)
+			}
+		})
+
+		b.Run("s3", func(b *testing.B) {
+			th := SetupConfig(b, func(cfg *model.Config) {
+				cfg.FileSettings = model.FileSettings{
+					DriverName:                         model.NewString(model.ImageDriverS3),
+					AmazonS3AccessKeyId:                model.NewString(model.MinioAccessKey),
+					AmazonS3SecretAccessKey:            model.NewString(model.MinioSecretKey),
+					AmazonS3Bucket:                     model.NewString("comparefilescontentbucket"),
+					AmazonS3Endpoint:                   model.NewString("localhost:9000"),
+					AmazonS3Region:                     model.NewString(""),
+					AmazonS3PathPrefix:                 model.NewString(""),
+					AmazonS3SSL:                        model.NewBool(false),
+					AmazonS3RequestTimeoutMilliseconds: model.NewInt64(300 * 1000),
+				}
+			})
+			defer th.TearDown()
+
+			err := th.App.Srv().FileBackend().(*filestore.S3FileBackend).TestConnection()
+			require.NoError(b, err)
+
+			_, err = fileA.Seek(0, io.SeekStart)
+			require.NoError(b, err)
+			_, err = fileB.Seek(0, io.SeekStart)
+			require.NoError(b, err)
+
+			_, appErr := th.App.WriteFile(fileA, "compareFileA")
+			require.Nil(b, appErr)
+			defer th.App.RemoveFile("compareFileA")
+
+			_, appErr = th.App.WriteFile(fileB, "compareFileB")
+			require.Nil(b, appErr)
+			defer th.App.RemoveFile("compareFileB")
+
+			rdA, appErr := th.App.FileReader("compareFileA")
+			require.Nil(b, appErr)
+			defer rdA.Close()
+
+			rdB, appErr := th.App.FileReader("compareFileB")
+			require.Nil(b, appErr)
+			defer rdB.Close()
+
+			b.ResetTimer()
+
+			for _, fileSizeLabel := range fileSizeLabels {
+				fileSize := fileSizesMap[fileSizeLabel]
+				for _, bufSizeLabel := range bufSizeLabels {
+					bufSize := bufSizesMap[bufSizeLabel]
+					b.Run("bufSize-fileSize"+fileSizeLabel+"-bufSize"+bufSizeLabel, func(b *testing.B) {
+						b.ReportAllocs()
+						b.StopTimer()
+						for i := 0; i < b.N; i++ {
+							_, err := rdA.Seek(0, io.SeekStart)
+							require.NoError(b, err)
+							_, err = rdB.Seek(0, io.SeekStart)
+							require.NoError(b, err)
+
+							b.StartTimer()
+							ok, err := compareFilesContent(&io.LimitedReader{
+								R: rdA,
+								N: fileSize,
+							}, &io.LimitedReader{
+								R: rdB,
+								N: fileSize,
+							}, bufSize)
+							b.StopTimer()
+							require.NoError(b, err)
+							require.True(b, ok)
+						}
+					})
+				}
+			}
+		})
+	})
+
+	b.Run("zip", func(b *testing.B) {
+		zipFilePath := filepath.Join(tmpDir, "compareFiles.zip")
+		zipFile, err := os.Create(zipFilePath)
+		require.NoError(b, err)
+		defer zipFile.Close()
+		defer os.Remove(zipFilePath)
+
+		zipWr := zip.NewWriter(zipFile)
+
+		fileAZipWr, err := zipWr.CreateHeader(&zip.FileHeader{
+			Name:   "compareFileA",
+			Method: zip.Store,
+		})
+		require.NoError(b, err)
+		_, err = io.Copy(fileAZipWr, fileA)
+		require.NoError(b, err)
+
+		fileBZipWr, err := zipWr.CreateHeader(&zip.FileHeader{
+			Name:   "compareFileB",
+			Method: zip.Store,
+		})
+		require.NoError(b, err)
+		_, err = io.Copy(fileBZipWr, fileB)
+		require.NoError(b, err)
+
+		err = zipWr.Close()
+		require.NoError(b, err)
+
+		info, err := zipFile.Stat()
+		require.NoError(b, err)
+
+		zipFileSize := info.Size()
+
+		b.Run("local", func(b *testing.B) {
+			b.ResetTimer()
+
+			for _, label := range bufSizeLabels {
+				bufSize := bufSizesMap[label]
+				b.Run("bufSize-"+label, func(b *testing.B) {
+					b.ReportAllocs()
+					b.StopTimer()
+					for i := 0; i < b.N; i++ {
+						_, err := zipFile.Seek(0, io.SeekStart)
+						require.NoError(b, err)
+						zipRd, err := zip.NewReader(zipFile, zipFileSize)
+						require.NoError(b, err)
+
+						zipFileA, err := zipRd.Open("compareFileA")
+						require.NoError(b, err)
+
+						zipFileB, err := zipRd.Open("compareFileB")
+						require.NoError(b, err)
+
+						b.StartTimer()
+						ok, err := compareFilesContent(zipFileA, zipFileB, bufSize)
+						b.StopTimer()
+						require.NoError(b, err)
+						require.True(b, ok)
+					}
+				})
+			}
+		})
+
+		b.Run("s3", func(b *testing.B) {
+			th := SetupConfig(b, func(cfg *model.Config) {
+				cfg.FileSettings = model.FileSettings{
+					DriverName:                         model.NewString(model.ImageDriverS3),
+					AmazonS3AccessKeyId:                model.NewString(model.MinioAccessKey),
+					AmazonS3SecretAccessKey:            model.NewString(model.MinioSecretKey),
+					AmazonS3Bucket:                     model.NewString("comparefilescontentbucket"),
+					AmazonS3Endpoint:                   model.NewString("localhost:9000"),
+					AmazonS3Region:                     model.NewString(""),
+					AmazonS3PathPrefix:                 model.NewString(""),
+					AmazonS3SSL:                        model.NewBool(false),
+					AmazonS3RequestTimeoutMilliseconds: model.NewInt64(300 * 1000),
+				}
+			})
+			defer th.TearDown()
+
+			err := th.App.Srv().FileBackend().(*filestore.S3FileBackend).TestConnection()
+			require.NoError(b, err)
+
+			_, appErr := th.App.WriteFile(zipFile, "compareFiles.zip")
+			require.Nil(b, appErr)
+			defer th.App.RemoveFile("compareFiles.zip")
+
+			zipFileRd, appErr := th.App.FileReader("compareFiles.zip")
+			require.Nil(b, appErr)
+			defer zipFileRd.Close()
+
+			b.ResetTimer()
+
+			for _, fileSizeLabel := range fileSizeLabels {
+				fileSize := fileSizesMap[fileSizeLabel]
+				for _, bufSizeLabel := range bufSizeLabels {
+					bufSize := bufSizesMap[bufSizeLabel]
+					b.Run("bufSize-fileSize"+fileSizeLabel+"-bufSize"+bufSizeLabel, func(b *testing.B) {
+						b.ReportAllocs()
+						b.StopTimer()
+						for i := 0; i < b.N; i++ {
+							_, err := zipFileRd.Seek(0, io.SeekStart)
+							require.NoError(b, err)
+							zipRd, err := zip.NewReader(zipFileRd.(io.ReaderAt), zipFileSize)
+							require.NoError(b, err)
+
+							zipFileA, err := zipRd.Open("compareFileA")
+							require.NoError(b, err)
+
+							zipFileB, err := zipRd.Open("compareFileB")
+							require.NoError(b, err)
+
+							b.StartTimer()
+							ok, err := compareFilesContent(&io.LimitedReader{
+								R: zipFileA,
+								N: fileSize,
+							}, &io.LimitedReader{
+								R: zipFileB,
+								N: fileSize,
+							}, bufSize)
+							b.StopTimer()
+							require.NoError(b, err)
+							require.True(b, ok)
+						}
+					})
+				}
+			}
+		})
+	})
+}
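For anyone wanting to reproduce the numbers behind the 2MB default, a hypothetical invocation would be `go test -run='^$' -bench=BenchmarkCompareFilesContent -benchmem ./channels/app/` (the package path is assumed from the repo layout); note the s3 sub-benchmarks additionally expect a MinIO endpoint at localhost:9000, matching the FileSettings configured above.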
@@ -267,12 +267,16 @@ func TestImportProcessImportDataFileVersionLine(t *testing.T) {
 }
 
 func GetAttachments(userID string, th *TestHelper, t *testing.T) []*model.FileInfo {
+	t.Helper()
+
 	fileInfos, err := th.App.Srv().Store().FileInfo().GetForUser(userID)
 	require.NoError(t, err)
 	return fileInfos
 }
 
 func AssertFileIdsInPost(files []*model.FileInfo, th *TestHelper, t *testing.T) {
+	t.Helper()
+
 	postID := files[0].PostId
 	require.NotNil(t, postID)
 
@@ -1782,17 +1782,6 @@ func (a *App) GetFileInfosForPost(rctx request.CTX, postID string, fromMaster bo
 	return fileInfos, firstInaccessibleFileTime, nil
 }
 
-func (a *App) getFileInfosForPostIgnoreCloudLimit(rctx request.CTX, postID string, fromMaster bool, includeDeleted bool) ([]*model.FileInfo, *model.AppError) {
-	fileInfos, err := a.Srv().Store().FileInfo().GetForPost(postID, fromMaster, includeDeleted, true)
-	if err != nil {
-		return nil, model.NewAppError("getFileInfosForPostIgnoreCloudLimit", "app.file_info.get_for_post.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
-	}
-
-	a.generateMiniPreviewForInfos(rctx, fileInfos)
-
-	return fileInfos, nil
-}
-
 func (a *App) PostWithProxyAddedToImageURLs(post *model.Post) *model.Post {
 	if f := a.ImageProxyAdder(); f != nil {
 		return post.WithRewrittenImageURLs(f)
@@ -401,7 +401,7 @@ func (s *MmctlE2ETestSuite) TestImportValidateCmdF() {
 		s.Require().Equal(struct {
 			UnusedAttachments []string `json:"unused_attachments"`
 		}{
-			UnusedAttachments: []string{"data/test2.png"},
+			UnusedAttachments: []string{"data/test2.png", "data/test_img_diff_A.png", "data/test_img_diff_B.png"},
 		}, printer.GetLines()[1].(struct {
 			UnusedAttachments []string `json:"unused_attachments"`
 		}))
@@ -5410,13 +5410,17 @@
     "id": "app.import.attachment.bad_file.error",
     "translation": "Error reading the file at: \"{{.FilePath}}\""
   },
   {
+    "id": "app.import.attachment.file_stat.error",
+    "translation": "Error reading the file status: \"{{.FilePath}}\""
+  },
+  {
     "id": "app.import.attachment.file_upload.error",
     "translation": "Error uploading the file: \"{{.FilePath}}\""
   },
   {
-    "id": "app.import.attachment.read_file_data.error",
-    "translation": "Failed to read file attachment during import."
+    "id": "app.import.attachment.seek_file.error",
+    "translation": "Error seeking the file: \"{{.FilePath}}\""
   },
   {
     "id": "app.import.bulk_import.file_scan.error",
BIN  server/tests/test_img_diff_A.png  (new file, 285 B, binary not shown)
BIN  server/tests/test_img_diff_B.png  (new file, 285 B, binary not shown)