mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
168 lines
5.3 KiB
Go
168 lines
5.3 KiB
Go
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
|
// See License.txt for license information.
|
|
|
|
package app
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/mattermost/mattermost-server/model"
|
|
)
|
|
|
|
func (a *App) bulkImportWorker(dryRun bool, wg *sync.WaitGroup, lines <-chan LineImportWorkerData, errors chan<- LineImportWorkerError) {
|
|
for line := range lines {
|
|
if err := a.ImportLine(line.LineImportData, dryRun); err != nil {
|
|
errors <- LineImportWorkerError{err, line.LineNumber}
|
|
}
|
|
}
|
|
wg.Done()
|
|
}
|
|
|
|
func (a *App) BulkImport(fileReader io.Reader, dryRun bool, workers int) (*model.AppError, int) {
|
|
scanner := bufio.NewScanner(fileReader)
|
|
lineNumber := 0
|
|
|
|
a.Srv.Store.LockToMaster()
|
|
defer a.Srv.Store.UnlockFromMaster()
|
|
|
|
errorsChan := make(chan LineImportWorkerError, (2*workers)+1) // size chosen to ensure it never gets filled up completely.
|
|
var wg sync.WaitGroup
|
|
var linesChan chan LineImportWorkerData
|
|
lastLineType := ""
|
|
|
|
for scanner.Scan() {
|
|
decoder := json.NewDecoder(strings.NewReader(scanner.Text()))
|
|
lineNumber++
|
|
|
|
var line LineImportData
|
|
if err := decoder.Decode(&line); err != nil {
|
|
return model.NewAppError("BulkImport", "app.import.bulk_import.json_decode.error", nil, err.Error(), http.StatusBadRequest), lineNumber
|
|
} else {
|
|
if lineNumber == 1 {
|
|
importDataFileVersion, apperr := processImportDataFileVersionLine(line)
|
|
if apperr != nil {
|
|
return apperr, lineNumber
|
|
}
|
|
|
|
if importDataFileVersion != 1 {
|
|
return model.NewAppError("BulkImport", "app.import.bulk_import.unsupported_version.error", nil, "", http.StatusBadRequest), lineNumber
|
|
}
|
|
} else {
|
|
if line.Type != lastLineType {
|
|
if lastLineType != "" {
|
|
// Changing type. Clear out the worker queue before continuing.
|
|
close(linesChan)
|
|
wg.Wait()
|
|
|
|
// Check no errors occurred while waiting for the queue to empty.
|
|
if len(errorsChan) != 0 {
|
|
err := <-errorsChan
|
|
return err.Error, err.LineNumber
|
|
}
|
|
}
|
|
|
|
// Set up the workers and channel for this type.
|
|
lastLineType = line.Type
|
|
linesChan = make(chan LineImportWorkerData, workers)
|
|
for i := 0; i < workers; i++ {
|
|
wg.Add(1)
|
|
go a.bulkImportWorker(dryRun, &wg, linesChan, errorsChan)
|
|
}
|
|
}
|
|
|
|
select {
|
|
case linesChan <- LineImportWorkerData{line, lineNumber}:
|
|
case err := <-errorsChan:
|
|
close(linesChan)
|
|
wg.Wait()
|
|
return err.Error, err.LineNumber
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// No more lines. Clear out the worker queue before continuing.
|
|
close(linesChan)
|
|
wg.Wait()
|
|
|
|
// Check no errors occurred while waiting for the queue to empty.
|
|
if len(errorsChan) != 0 {
|
|
err := <-errorsChan
|
|
return err.Error, err.LineNumber
|
|
}
|
|
|
|
if err := scanner.Err(); err != nil {
|
|
return model.NewAppError("BulkImport", "app.import.bulk_import.file_scan.error", nil, err.Error(), http.StatusInternalServerError), 0
|
|
}
|
|
|
|
return nil, 0
|
|
}
|
|
|
|
func processImportDataFileVersionLine(line LineImportData) (int, *model.AppError) {
|
|
if line.Type != "version" || line.Version == nil {
|
|
return -1, model.NewAppError("BulkImport", "app.import.process_import_data_file_version_line.invalid_version.error", nil, "", http.StatusBadRequest)
|
|
}
|
|
|
|
return *line.Version, nil
|
|
}
|
|
|
|
func (a *App) ImportLine(line LineImportData, dryRun bool) *model.AppError {
|
|
switch {
|
|
case line.Type == "scheme":
|
|
if line.Scheme == nil {
|
|
return model.NewAppError("BulkImport", "app.import.import_line.null_scheme.error", nil, "", http.StatusBadRequest)
|
|
} else {
|
|
return a.ImportScheme(line.Scheme, dryRun)
|
|
}
|
|
case line.Type == "team":
|
|
if line.Team == nil {
|
|
return model.NewAppError("BulkImport", "app.import.import_line.null_team.error", nil, "", http.StatusBadRequest)
|
|
} else {
|
|
return a.ImportTeam(line.Team, dryRun)
|
|
}
|
|
case line.Type == "channel":
|
|
if line.Channel == nil {
|
|
return model.NewAppError("BulkImport", "app.import.import_line.null_channel.error", nil, "", http.StatusBadRequest)
|
|
} else {
|
|
return a.ImportChannel(line.Channel, dryRun)
|
|
}
|
|
case line.Type == "user":
|
|
if line.User == nil {
|
|
return model.NewAppError("BulkImport", "app.import.import_line.null_user.error", nil, "", http.StatusBadRequest)
|
|
} else {
|
|
return a.ImportUser(line.User, dryRun)
|
|
}
|
|
case line.Type == "post":
|
|
if line.Post == nil {
|
|
return model.NewAppError("BulkImport", "app.import.import_line.null_post.error", nil, "", http.StatusBadRequest)
|
|
} else {
|
|
return a.ImportPost(line.Post, dryRun)
|
|
}
|
|
case line.Type == "direct_channel":
|
|
if line.DirectChannel == nil {
|
|
return model.NewAppError("BulkImport", "app.import.import_line.null_direct_channel.error", nil, "", http.StatusBadRequest)
|
|
} else {
|
|
return a.ImportDirectChannel(line.DirectChannel, dryRun)
|
|
}
|
|
case line.Type == "direct_post":
|
|
if line.DirectPost == nil {
|
|
return model.NewAppError("BulkImport", "app.import.import_line.null_direct_post.error", nil, "", http.StatusBadRequest)
|
|
} else {
|
|
return a.ImportDirectPost(line.DirectPost, dryRun)
|
|
}
|
|
case line.Type == "emoji":
|
|
if line.Emoji == nil {
|
|
return model.NewAppError("BulkImport", "app.import.import_line.null_emoji.error", nil, "", http.StatusBadRequest)
|
|
} else {
|
|
return a.ImportEmoji(line.Emoji, dryRun)
|
|
}
|
|
default:
|
|
return model.NewAppError("BulkImport", "app.import.import_line.unknown_line_type.error", map[string]interface{}{"Type": line.Type}, "", http.StatusBadRequest)
|
|
}
|
|
}
|