[MM-31247] Implement job to delete export files (#16793)

* Include filepaths for post attachments

* Cleanup

* Enable exporting file attachments

* Fix file import

* Enable zip export

* Support creating missing directories when unzipping

* Add test

* Add translations

* Export direct channel posts attachments

* Fix returned values order

Remove pointer to slice in return

* [MM-31597] Implement export process job (#16626)

* Implement export process job

* Add translations

* Remove unused value

* [MM-31249] Add /exports API endpoint (#16633)

* Implement API endpoints to list, download and delete export files

* Add endpoint for single resource

* Update i18n/en.json

Co-authored-by: Ibrahim Serdar Acikgoz <serdaracikgoz86@gmail.com>

* Update i18n/en.json

Co-authored-by: Ibrahim Serdar Acikgoz <serdaracikgoz86@gmail.com>

Co-authored-by: Ibrahim Serdar Acikgoz <serdaracikgoz86@gmail.com>

* Implement job to delete export files

* Fix app layers

* Fix typo

Co-authored-by: Ibrahim Serdar Acikgoz <serdaracikgoz86@gmail.com>
This commit is contained in:
Claudio Costa
2021-02-16 09:21:56 +01:00
committed by GitHub
parent cdd408b60e
commit ee5668b834
12 changed files with 243 additions and 1 deletions

View File

@@ -122,6 +122,13 @@ func (a *App) initJobs() {
if jobsImportDeleteInterface != nil {
a.srv.Jobs.ImportDelete = jobsImportDeleteInterface(a)
}
if jobsExportDeleteInterface != nil {
a.srv.Jobs.ExportDelete = jobsExportDeleteInterface(a)
}
if jobsExportProcessInterface != nil {
a.srv.Jobs.ExportProcess = jobsExportProcessInterface(a)
}
if jobsExportProcessInterface != nil {
a.srv.Jobs.ExportProcess = jobsExportProcessInterface(a)

View File

@@ -126,6 +126,12 @@ func RegisterJobsExportProcessInterface(f func(*App) tjobs.ExportProcessInterfac
jobsExportProcessInterface = f
}
var jobsExportDeleteInterface func(*App) tjobs.ExportDeleteInterface
func RegisterJobsExportDeleteInterface(f func(*App) tjobs.ExportDeleteInterface) {
jobsExportDeleteInterface = f
}
var productNoticesJobInterface func(*App) tjobs.ProductNoticesJobInterface
func RegisterProductNoticesJobInterface(f func(*App) tjobs.ProductNoticesJobInterface) {

View File

@@ -4096,7 +4096,7 @@
},
{
"id": "app.export.export_attachment.create_file.error",
"translation": "Failed to created file during export."
"translation": "Failed to create file during export."
},
{
"id": "app.export.export_attachment.mkdirall.error",

View File

@@ -30,4 +30,7 @@ import (
// This is a placeholder so this package can be imported in Team Edition when it will be otherwise empty.
_ "github.com/mattermost/mattermost-server/v5/jobs/export_process"
// This is a placeholder so this package can be imported in Team Edition when it will be otherwise empty.
_ "github.com/mattermost/mattermost-server/v5/jobs/export_delete"
)

View File

@@ -0,0 +1,51 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package export_delete
import (
"time"
"github.com/mattermost/mattermost-server/v5/app"
"github.com/mattermost/mattermost-server/v5/model"
)
const (
jobName = "ExportDelete"
schedFrequency = 24 * time.Hour
)
type Scheduler struct {
app *app.App
}
func (i *ExportDeleteInterfaceImpl) MakeScheduler() model.Scheduler {
return &Scheduler{i.app}
}
func (scheduler *Scheduler) Name() string {
return jobName + "Scheduler"
}
func (scheduler *Scheduler) JobType() string {
return model.JOB_TYPE_EXPORT_DELETE
}
func (scheduler *Scheduler) Enabled(cfg *model.Config) bool {
return *cfg.ExportSettings.Directory != "" && *cfg.ExportSettings.RetentionDays > 0
}
func (scheduler *Scheduler) NextScheduleTime(cfg *model.Config, now time.Time, pendingJobs bool, lastSuccessfulJob *model.Job) *time.Time {
nextTime := time.Now().Add(schedFrequency)
return &nextTime
}
func (scheduler *Scheduler) ScheduleJob(cfg *model.Config, pendingJobs bool, lastSuccessfulJob *model.Job) (*model.Job, *model.AppError) {
data := map[string]string{}
job, err := scheduler.app.Srv().Jobs.CreateJob(model.JOB_TYPE_EXPORT_DELETE, data)
if err != nil {
return nil, err
}
return job, nil
}

View File

@@ -0,0 +1,137 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package export_delete
import (
"path/filepath"
"time"
"github.com/mattermost/mattermost-server/v5/app"
"github.com/mattermost/mattermost-server/v5/jobs"
tjobs "github.com/mattermost/mattermost-server/v5/jobs/interfaces"
"github.com/mattermost/mattermost-server/v5/mlog"
"github.com/mattermost/mattermost-server/v5/model"
)
func init() {
app.RegisterJobsExportDeleteInterface(func(a *app.App) tjobs.ExportDeleteInterface {
return &ExportDeleteInterfaceImpl{a}
})
}
type ExportDeleteInterfaceImpl struct {
app *app.App
}
type ExportDeleteWorker struct {
name string
stopChan chan struct{}
stoppedChan chan struct{}
jobsChan chan model.Job
jobServer *jobs.JobServer
app *app.App
}
func (i *ExportDeleteInterfaceImpl) MakeWorker() model.Worker {
return &ExportDeleteWorker{
name: "ExportDelete",
stopChan: make(chan struct{}),
stoppedChan: make(chan struct{}),
jobsChan: make(chan model.Job),
jobServer: i.app.Srv().Jobs,
app: i.app,
}
}
func (w *ExportDeleteWorker) JobChannel() chan<- model.Job {
return w.jobsChan
}
func (w *ExportDeleteWorker) Run() {
mlog.Debug("Worker started", mlog.String("worker", w.name))
defer func() {
mlog.Debug("Worker finished", mlog.String("worker", w.name))
close(w.stoppedChan)
}()
for {
select {
case <-w.stopChan:
mlog.Debug("Worker received stop signal", mlog.String("worker", w.name))
return
case job := <-w.jobsChan:
mlog.Debug("Worker received a new candidate job.", mlog.String("worker", w.name))
w.doJob(&job)
}
}
}
func (w *ExportDeleteWorker) Stop() {
mlog.Debug("Worker stopping", mlog.String("worker", w.name))
close(w.stopChan)
<-w.stoppedChan
}
func (w *ExportDeleteWorker) doJob(job *model.Job) {
if claimed, err := w.jobServer.ClaimJob(job); err != nil {
mlog.Warn("Worker experienced an error while trying to claim job",
mlog.String("worker", w.name),
mlog.String("job_id", job.Id),
mlog.String("error", err.Error()))
return
} else if !claimed {
return
}
exportPath := *w.app.Config().ExportSettings.Directory
retentionTime := time.Duration(*w.app.Config().ExportSettings.RetentionDays) * 24 * time.Hour
exports, appErr := w.app.ListDirectory(exportPath)
if appErr != nil {
w.setJobError(job, appErr)
return
}
var hasErrs bool
for i := range exports {
filename := filepath.Base(exports[i])
modTime, appErr := w.app.FileModTime(filepath.Join(exportPath, filename))
if appErr != nil {
mlog.Debug("Worker: Failed to get file modification time",
mlog.Err(appErr), mlog.String("export", exports[i]))
hasErrs = true
continue
}
if time.Now().After(modTime.Add(retentionTime)) {
// remove file data from storage.
if appErr := w.app.RemoveFile(exports[i]); appErr != nil {
mlog.Debug("Worker: Failed to remove file",
mlog.Err(appErr), mlog.String("export", exports[i]))
hasErrs = true
continue
}
}
}
if hasErrs {
mlog.Warn("Worker: errors occurred")
}
mlog.Info("Worker: Job is complete", mlog.String("worker", w.name), mlog.String("job_id", job.Id))
w.setJobSuccess(job)
}
func (w *ExportDeleteWorker) setJobSuccess(job *model.Job) {
if err := w.app.Srv().Jobs.SetJobSuccess(job); err != nil {
mlog.Error("Worker: Failed to set success for job", mlog.String("worker", w.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
w.setJobError(job, err)
}
}
func (w *ExportDeleteWorker) setJobError(job *model.Job, appError *model.AppError) {
if err := w.app.Srv().Jobs.SetJobError(job, appError); err != nil {
mlog.Error("Worker: Failed to set job error", mlog.String("worker", w.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
}
}

View File

@@ -0,0 +1,11 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package interfaces
import "github.com/mattermost/mattermost-server/v5/model"
type ExportDeleteInterface interface {
MakeWorker() model.Worker
MakeScheduler() model.Scheduler
}

View File

@@ -170,6 +170,13 @@ func (watcher *Watcher) PollAndNotify() {
default:
}
}
} else if job.Type == model.JOB_TYPE_EXPORT_DELETE {
if watcher.workers.ExportDelete != nil {
select {
case watcher.workers.ExportDelete.JobChannel() <- *job:
default:
}
}
} else if job.Type == model.JOB_TYPE_CLOUD {
if watcher.workers.Cloud != nil {
select {

View File

@@ -82,6 +82,10 @@ func (srv *JobServer) InitSchedulers() *Schedulers {
schedulers.schedulers = append(schedulers.schedulers, importDeleteInterface.MakeScheduler())
}
if exportDeleteInterface := srv.ExportDelete; exportDeleteInterface != nil {
schedulers.schedulers = append(schedulers.schedulers, exportDeleteInterface.MakeScheduler())
}
schedulers.nextRunTimes = make([]*time.Time, len(schedulers.schedulers))
return schedulers
}

View File

@@ -33,6 +33,7 @@ type JobServer struct {
ImportProcess tjobs.ImportProcessInterface
ImportDelete tjobs.ImportDeleteInterface
ExportProcess tjobs.ExportProcessInterface
ExportDelete tjobs.ExportDeleteInterface
Cloud ejobs.CloudJobInterface
}

View File

@@ -30,6 +30,7 @@ type Workers struct {
ImportProcess model.Worker
ImportDelete model.Worker
ExportProcess model.Worker
ExportDelete model.Worker
Cloud model.Worker
listenerId string
@@ -97,6 +98,10 @@ func (srv *JobServer) InitWorkers() *Workers {
workers.ExportProcess = exportProcessInterface.MakeWorker()
}
if exportDeleteInterface := srv.ExportDelete; exportDeleteInterface != nil {
workers.ExportDelete = exportDeleteInterface.MakeWorker()
}
if cloudInterface := srv.Cloud; cloudInterface != nil {
workers.Cloud = cloudInterface.MakeWorker()
}
@@ -164,6 +169,10 @@ func (workers *Workers) Start() *Workers {
go workers.ExportProcess.Run()
}
if workers.ExportDelete != nil {
go workers.ExportDelete.Run()
}
if workers.Cloud != nil {
go workers.Cloud.Run()
}
@@ -289,6 +298,10 @@ func (workers *Workers) Stop() *Workers {
workers.ExportProcess.Stop()
}
if workers.ExportDelete != nil {
workers.ExportDelete.Stop()
}
if workers.Cloud != nil {
workers.Cloud.Stop()
}

View File

@@ -25,6 +25,7 @@ const (
JOB_TYPE_IMPORT_PROCESS = "import_process"
JOB_TYPE_IMPORT_DELETE = "import_delete"
JOB_TYPE_EXPORT_PROCESS = "export_process"
JOB_TYPE_EXPORT_DELETE = "export_delete"
JOB_TYPE_CLOUD = "cloud"
JOB_STATUS_PENDING = "pending"
@@ -72,6 +73,7 @@ func (j *Job) IsValid() *AppError {
case JOB_TYPE_IMPORT_PROCESS:
case JOB_TYPE_IMPORT_DELETE:
case JOB_TYPE_EXPORT_PROCESS:
case JOB_TYPE_EXPORT_DELETE:
case JOB_TYPE_CLOUD:
default:
return NewAppError("Job.IsValid", "model.job.is_valid.type.app_error", nil, "id="+j.Id, http.StatusBadRequest)