Rebuild channel index (#26080)

* WIP

* Added rebuild channels index functionality

* Added rough logic to send message to all sysadmins

* WIP

* WIP

* WIP

* Cleanup

* i18n fix

* reading through all pages of system admins

* Fixed webapp style

* i18n fix

* Added help text

* i18n fix

* i18n update

* Updated system console button action

* Updated snapshots

* some cleanup

* Updated snapshot

* Update server/channels/app/server.go

Co-authored-by: Daniel Espino García <larkox@gmail.com>

* fixed typo

* Refactoring to improve readibility

* moved index check to API later during config update

* Added some docs

* Updated get system bot

---------

Co-authored-by: Daniel Espino García <larkox@gmail.com>
This commit is contained in:
Harshil Sharma 2024-03-13 10:26:06 +05:30 committed by GitHub
parent ccc00b9077
commit 774df37464
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 493 additions and 117 deletions

View File

@ -167,6 +167,15 @@ func updateConfig(c *Context, w http.ResponseWriter, r *http.Request) {
}
}
// if ES autocomplete was enabled, we need to make sure that index has been checked.
// we need to stop enabling ES autocomplete otherwise.
if !*appCfg.ElasticsearchSettings.EnableAutocomplete && *cfg.ElasticsearchSettings.EnableAutocomplete {
if !c.App.SearchEngine().ElasticsearchEngine.IsAutocompletionEnabled() {
c.Err = model.NewAppError("updateConfig", "api.config.update.elasticsearch.autocomplete_cannot_be_enabled_error", nil, "", http.StatusBadRequest)
return
}
}
c.App.HandleMessageExportConfig(cfg, appCfg)
if appErr := cfg.IsValid(); appErr != nil {

View File

@ -71,7 +71,8 @@ func purgeElasticsearchIndexes(c *Context, w http.ResponseWriter, r *http.Reques
return
}
if err := c.App.PurgeElasticsearchIndexes(c.AppContext); err != nil {
specifiedIndexesQuery := r.URL.Query()["index"]
if err := c.App.PurgeElasticsearchIndexes(c.AppContext, specifiedIndexesQuery); err != nil {
c.Err = err
return
}

View File

@ -430,6 +430,7 @@ type AppIface interface {
AddDirectChannels(c request.CTX, teamID string, user *model.User) *model.AppError
AddLdapPrivateCertificate(fileData *multipart.FileHeader) *model.AppError
AddLdapPublicCertificate(fileData *multipart.FileHeader) *model.AppError
AddLicenseListener(listener func(oldLicense, newLicense *model.License)) string
AddRemoteCluster(rc *model.RemoteCluster) (*model.RemoteCluster, *model.AppError)
AddSamlIdpCertificate(fileData *multipart.FileHeader) *model.AppError
AddSamlPrivateCertificate(fileData *multipart.FileHeader) *model.AppError
@ -988,7 +989,7 @@ type AppIface interface {
Publish(message *model.WebSocketEvent)
PublishUserTyping(userID, channelID, parentId string) *model.AppError
PurgeBleveIndexes(c request.CTX) *model.AppError
PurgeElasticsearchIndexes(c request.CTX) *model.AppError
PurgeElasticsearchIndexes(c request.CTX, indexes []string) *model.AppError
QueryLogs(rctx request.CTX, page, perPage int, logFilter *model.LogFilter) (map[string][]string, *model.AppError)
ReadFile(path string) ([]byte, *model.AppError)
RecycleDatabaseConnection(rctx request.CTX)

View File

@ -16,16 +16,6 @@ import (
"github.com/mattermost/mattermost/server/v8/channels/store"
)
func (a *App) getSysAdminsEmailRecipients() ([]*model.User, *model.AppError) {
userOptions := &model.UserGetOptions{
Page: 0,
PerPage: 100,
Role: model.SystemAdminRoleId,
Inactive: false,
}
return a.GetUsersFromProfiles(userOptions)
}
func getCurrentPlanName(a *App) (string, *model.AppError) {
subscription, err := a.Cloud().GetSubscription("")
if err != nil {
@ -48,7 +38,7 @@ func getCurrentPlanName(a *App) (string, *model.AppError) {
}
func (a *App) SendPaymentFailedEmail(failedPayment *model.FailedPayment) *model.AppError {
sysAdmins, err := a.getSysAdminsEmailRecipients()
sysAdmins, err := a.getAllSystemAdmins()
if err != nil {
return err
}
@ -77,7 +67,7 @@ func getCurrentProduct(subscriptionProductID string, products []*model.Product)
}
func (a *App) SendDelinquencyEmail(emailToSend model.DelinquencyEmail) *model.AppError {
sysAdmins, aErr := a.getSysAdminsEmailRecipients()
sysAdmins, aErr := a.getAllSystemAdmins()
if aErr != nil {
return aErr
}
@ -161,7 +151,7 @@ func getNextBillingDateString() string {
}
func (a *App) SendUpgradeConfirmationEmail(isYearly bool) *model.AppError {
sysAdmins, e := a.getSysAdminsEmailRecipients()
sysAdmins, e := a.getAllSystemAdmins()
if e != nil {
return e
}
@ -220,7 +210,7 @@ func (a *App) SendUpgradeConfirmationEmail(isYearly bool) *model.AppError {
// SendNoCardPaymentFailedEmail
func (a *App) SendNoCardPaymentFailedEmail() *model.AppError {
sysAdmins, err := a.getSysAdminsEmailRecipients()
sysAdmins, err := a.getAllSystemAdmins()
if err != nil {
return err
}
@ -323,7 +313,7 @@ func (a *App) DoSubscriptionRenewalCheck() {
return
}
sysAdmins, aErr := a.getSysAdminsEmailRecipients()
sysAdmins, aErr := a.getAllSystemAdmins()
if aErr != nil {
a.Log().Error("Error getting sys admins", mlog.Err(aErr))
return

View File

@ -64,6 +64,10 @@ func (a *App) AddConfigListener(listener func(*model.Config, *model.Config)) str
return a.Srv().platform.AddConfigListener(listener)
}
func (a *App) AddLicenseListener(listener func(oldLicense, newLicense *model.License)) string {
return a.Srv().platform.AddLicenseListener(listener)
}
// Removes a listener function by the unique ID returned when AddConfigListener was called
func (a *App) RemoveConfigListener(id string) {
a.Srv().platform.RemoveConfigListener(id)

View File

@ -0,0 +1,142 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package app
import (
"errors"
"net/url"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/i18n"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/public/shared/request"
"github.com/mattermost/mattermost/server/v8/channels/utils"
)
func (a *App) initElasticsearchChannelIndexCheck() {
// the logic of when to perform the check has been derived from platform/searchengine.StartSearchEngine()
// Wherever we're starting the engine, we're checking the index mapping here.
a.elasticsearchChannelIndexCheckWithRetry()
a.AddConfigListener(func(oldConfig, newConfig *model.Config) {
if a.SearchEngine().ElasticsearchEngine == nil {
return
}
oldESConfig := oldConfig.ElasticsearchSettings
newESConfig := newConfig.ElasticsearchSettings
// if indexing is turned on, check.
if !*oldESConfig.EnableIndexing && *newESConfig.EnableIndexing {
a.elasticsearchChannelIndexCheckWithRetry()
} else if *newESConfig.EnableIndexing && (*oldESConfig.Password != *newESConfig.Password || *oldESConfig.Username != *newESConfig.Username || *oldESConfig.ConnectionURL != *newESConfig.ConnectionURL || *oldESConfig.Sniff != *newESConfig.Sniff) {
// ES client reconnects if credentials or address changes
a.elasticsearchChannelIndexCheckWithRetry()
}
})
a.AddLicenseListener(func(oldLicense, newLicense *model.License) {
if a.SearchEngine() == nil {
return
}
// if a license was added, and it has ES enabled-
if oldLicense == nil && newLicense != nil {
if a.SearchEngine().ElasticsearchEngine != nil {
a.elasticsearchChannelIndexCheckWithRetry()
}
}
})
}
func (a *App) elasticsearchChannelIndexCheckWithRetry() {
// this is being done async to not block license application and config
// processes as the listeners for those are called synchronously.
go func() {
// using progressive retry because ES client may take some time to connect and be ready.
_ = utils.LongProgressiveRetry(func() error {
if !*a.Config().ElasticsearchSettings.EnableIndexing {
a.Log().Debug("elasticsearchChannelIndexCheckWithRetry: skipping because elasticsearch indexing is disabled")
return nil
}
elastic := a.SearchEngine().ElasticsearchEngine
if elastic == nil {
a.Log().Debug("elasticsearchChannelIndexCheckWithRetry: skipping because elastic engine is nil")
return errors.New("retry")
}
if !elastic.IsActive() {
a.Log().Debug("elasticsearchChannelIndexCheckWithRetry: skipping because elastic.IsActive is false")
return errors.New("retry")
}
a.elasticsearchChannelIndexCheck()
return nil
})
}()
}
func (a *App) elasticsearchChannelIndexCheck() {
if needNotify := a.elasticChannelsIndexNeedNotifyAdmins(); !needNotify {
return
}
// notify all system admins
systemBot, appErr := a.GetSystemBot(request.EmptyContext(a.Log()))
if appErr != nil {
a.Log().Error("elasticsearchChannelIndexCheck: couldn't get system bot", mlog.Err(appErr))
return
}
sysAdmins, appErr := a.getAllSystemAdmins()
if appErr != nil {
a.Log().Error("elasticsearchChannelIndexCheck: error occurred fetching all system admins", mlog.Err(appErr))
}
elasticsearchSettingsSectionLink, err := url.JoinPath(*a.Config().ServiceSettings.SiteURL, "admin_console/environment/elasticsearch")
if err != nil {
a.Log().Error("elasticsearchChannelIndexCheck: error occurred constructing Elasticsearch system console section path")
return
}
// TODO include a link to changelog
postMessage := i18n.T("app.channel.elasticsearch_channel_index.notify_admin.message", map[string]interface{}{"ElasticsearchSection": elasticsearchSettingsSectionLink})
for _, sysAdmin := range sysAdmins {
var channel *model.Channel
channel, appErr = a.GetOrCreateDirectChannel(request.EmptyContext(a.Log()), sysAdmin.Id, systemBot.UserId)
if appErr != nil {
a.Log().Error("elasticsearchChannelIndexCheck: error occurred ensuring DM channel between system bot and sys admin", mlog.Err(appErr))
continue
}
post := &model.Post{
Message: postMessage,
UserId: systemBot.UserId,
ChannelId: channel.Id,
}
_, appErr = a.CreatePost(request.EmptyContext(a.Log()), post, channel, true, false)
if appErr != nil {
a.Log().Error("elasticsearchChannelIndexCheck: error occurred creating post", mlog.Err(appErr))
continue
}
}
}
func (a *App) elasticChannelsIndexNeedNotifyAdmins() bool {
elastic := a.SearchEngine().ElasticsearchEngine
if elastic == nil {
a.Log().Debug("elasticChannelsIndexNeedNotifyAdmins: skipping because elastic engine is nil")
return false
}
if elastic.IsChannelsIndexVerified() {
a.Log().Debug("elasticChannelsIndexNeedNotifyAdmins: skipping because channels index is verified")
return false
}
return true
}

View File

@ -50,12 +50,6 @@ func RegisterJobsElasticsearchIndexerInterface(f func(*Server) ejobs.IndexerJobI
jobsElasticsearchIndexerInterface = f
}
var jobsElasticsearchFixChannelIndexInterface func(*Server) ejobs.ElasticsearchFixChannelIndexInterface
func RegisterJobsElasticsearchFixChannelIndexInterface(f func(*Server) ejobs.ElasticsearchFixChannelIndexInterface) {
jobsElasticsearchFixChannelIndexInterface = f
}
var jobsLdapSyncInterface func(*App) ejobs.LdapSyncInterface
func RegisterJobsLdapSyncInterface(f func(*App) ejobs.LdapSyncInterface) {

View File

@ -554,31 +554,6 @@ func (s *Server) doPostPriorityConfigDefaultTrueMigration() {
}
}
func (s *Server) doElasticsearchFixChannelIndex(c request.CTX) {
s.AddLicenseListener(func(oldLicense, newLicense *model.License) {
s.elasticsearchFixChannelIndex(c, newLicense)
})
s.elasticsearchFixChannelIndex(c, s.License())
}
func (s *Server) elasticsearchFixChannelIndex(c request.CTX, license *model.License) {
if model.BuildEnterpriseReady != "true" || license == nil || !*license.Features.Elasticsearch {
mlog.Debug("Skipping triggering Elasticsearch channel index fix job as build is not Enterprise ready")
return
}
// If the migration is already marked as completed, don't do it again.
if _, err := s.Store().System().GetByName(model.MigrationKeyElasticsearchFixChannelIndex); err == nil {
mlog.Debug("Skipping triggering Elasticsearch channel index fix job as it is already marked completed in database")
return
}
if _, appErr := s.Jobs.CreateJobOnce(c, model.JobTypeElasticsearchFixChannelIndex, nil); appErr != nil {
mlog.Fatal("failed to start job for fixing Elasticsearch channels index", mlog.Err(appErr))
}
}
func (s *Server) doCloudS3PathMigrations(c request.CTX) {
// This migration is only applicable for cloud environments
if os.Getenv("MM_CLOUD_FILESTORE_BIFROST") == "" {
@ -672,7 +647,6 @@ func (s *Server) doAppMigrations() {
s.doFirstAdminSetupCompleteMigration()
s.doRemainingSchemaMigrations()
s.doPostPriorityConfigDefaultTrueMigration()
s.doElasticsearchFixChannelIndex(c)
s.doCloudS3PathMigrations(c)
s.doDeleteEmptyDraftsMigration(c)
s.doDeleteOrphanDraftsMigration(c)

View File

@ -247,6 +247,23 @@ func (a *OpenTracingAppLayer) AddLdapPublicCertificate(fileData *multipart.FileH
return resultVar0
}
func (a *OpenTracingAppLayer) AddLicenseListener(listener func(oldLicense, newLicense *model.License)) string {
origCtx := a.ctx
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.AddLicenseListener")
a.ctx = newCtx
a.app.Srv().Store().SetContext(newCtx)
defer func() {
a.app.Srv().Store().SetContext(origCtx)
a.ctx = origCtx
}()
defer span.Finish()
resultVar0 := a.app.AddLicenseListener(listener)
return resultVar0
}
func (a *OpenTracingAppLayer) AddPublicKey(name string, key io.Reader) *model.AppError {
origCtx := a.ctx
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.AddPublicKey")
@ -13818,7 +13835,7 @@ func (a *OpenTracingAppLayer) PurgeBleveIndexes(c request.CTX) *model.AppError {
return resultVar0
}
func (a *OpenTracingAppLayer) PurgeElasticsearchIndexes(c request.CTX) *model.AppError {
func (a *OpenTracingAppLayer) PurgeElasticsearchIndexes(c request.CTX, indexes []string) *model.AppError {
origCtx := a.ctx
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.PurgeElasticsearchIndexes")
@ -13830,7 +13847,7 @@ func (a *OpenTracingAppLayer) PurgeElasticsearchIndexes(c request.CTX) *model.Ap
}()
defer span.Finish()
resultVar0 := a.app.PurgeElasticsearchIndexes(c)
resultVar0 := a.app.PurgeElasticsearchIndexes(c, indexes)
if resultVar0 != nil {
span.LogFields(spanlog.Error(resultVar0))

View File

@ -36,18 +36,21 @@ func (a *App) SetSearchEngine(se *searchengine.Broker) {
a.ch.srv.platform.SearchEngine = se
}
func (a *App) PurgeElasticsearchIndexes(c request.CTX) *model.AppError {
func (a *App) PurgeElasticsearchIndexes(c request.CTX, indexes []string) *model.AppError {
engine := a.SearchEngine().ElasticsearchEngine
if engine == nil {
err := model.NewAppError("PurgeElasticsearchIndexes", "ent.elasticsearch.test_config.license.error", nil, "", http.StatusNotImplemented)
return err
}
if err := engine.PurgeIndexes(c); err != nil {
return err
var appErr *model.AppError
if len(indexes) > 0 {
appErr = engine.PurgeIndexList(c, indexes)
} else {
appErr = engine.PurgeIndexes(c)
}
return nil
return appErr
}
func (a *App) PurgeBleveIndexes(c request.CTX) *model.AppError {

View File

@ -490,6 +490,8 @@ func NewServer(options ...Option) (*Server, error) {
}
})
app.initElasticsearchChannelIndexCheck()
return s, nil
}
@ -1479,11 +1481,6 @@ func (s *Server) initJobs() {
s.Jobs.RegisterJobType(model.JobTypeElasticsearchPostIndexing, builder.MakeWorker(), nil)
}
if jobsElasticsearchFixChannelIndexInterface != nil {
builder := jobsElasticsearchFixChannelIndexInterface(s)
s.Jobs.RegisterJobType(model.JobTypeElasticsearchFixChannelIndex, builder.MakeWorker(), nil)
}
if jobsLdapSyncInterface != nil {
builder := jobsLdapSyncInterface(New(ServerConnector(s.Channels())))
s.Jobs.RegisterJobType(model.JobTypeLdapSync, builder.MakeWorker(), builder.MakeScheduler())

View File

@ -2864,3 +2864,13 @@ func (a *App) UserIsFirstAdmin(user *model.User) bool {
return true
}
func (a *App) getAllSystemAdmins() ([]*model.User, *model.AppError) {
userOptions := &model.UserGetOptions{
Page: 0,
PerPage: 500,
Role: model.SystemAdminRoleId,
Inactive: false,
}
return a.GetUsersFromProfiles(userOptions)
}

View File

@ -7,10 +7,20 @@ import (
"time"
)
var backoffTimeouts = []time.Duration{50 * time.Millisecond, 100 * time.Millisecond, 200 * time.Millisecond, 200 * time.Millisecond, 400 * time.Millisecond, 400 * time.Millisecond}
var shortBackoffTimeouts = []time.Duration{50 * time.Millisecond, 100 * time.Millisecond, 200 * time.Millisecond, 200 * time.Millisecond, 400 * time.Millisecond, 400 * time.Millisecond}
var longBackoffTimeouts = []time.Duration{500 * time.Millisecond, 1 * time.Second, 2 * time.Second, 5 * time.Second, 8 * time.Second, 10 * time.Second}
// ProgressiveRetry executes a BackoffOperation and waits an increasing time before retrying the operation.
func ProgressiveRetry(operation func() error) error {
return CustomProgressiveRetry(operation, shortBackoffTimeouts)
}
func LongProgressiveRetry(operation func() error) error {
return CustomProgressiveRetry(operation, longBackoffTimeouts)
}
func CustomProgressiveRetry(operation func() error, backoffTimeouts []time.Duration) error {
var err error
for attempts := 0; attempts < len(backoffTimeouts); attempts++ {

View File

@ -15,7 +15,3 @@ type ElasticsearchAggregatorInterface interface {
MakeWorker() model.Worker
MakeScheduler() Scheduler
}
type ElasticsearchFixChannelIndexInterface interface {
MakeWorker() model.Worker
}

View File

@ -1649,6 +1649,10 @@
"id": "api.config.reload_config.app_error",
"translation": "Failed to reload config."
},
{
"id": "api.config.update.elasticsearch.autocomplete_cannot_be_enabled_error",
"translation": "Channel autocomplete cannot be enabled as channel index schema is out of date. It is recommended to regenerate your channel index. See the Mattermost changelog for more information"
},
{
"id": "api.config.update_config.clear_siteurl.app_error",
"translation": "Site URL cannot be cleared."
@ -4754,6 +4758,10 @@
"id": "app.channel.delete.app_error",
"translation": "Unable to delete the channel."
},
{
"id": "app.channel.elasticsearch_channel_index.notify_admin.message",
"translation": "Your Elasticsearch channel index schema is out of date. It is recommended to regenerate your channel index.\nClick the `Rebuild Channels Index` button in [Elasticsearch section in System Console]({{.ElasticsearchSection}}) to fix the issue.\nSee Mattermost changelog for more information."
},
{
"id": "app.channel.get.existing.app_error",
"translation": "Unable to find the existing channel."
@ -7462,6 +7470,10 @@
"id": "bleveengine.purge_file_index.error",
"translation": "Failed to purge file indexes."
},
{
"id": "bleveengine.purge_list.not_implemented",
"translation": "Purge list feature is not available for Bleve."
},
{
"id": "bleveengine.purge_post_index.error",
"translation": "Failed to purge post indexes."
@ -7826,10 +7838,18 @@
"id": "ent.elasticsearch.post.get_posts_batch_for_indexing.error",
"translation": "Unable to get the posts batch for indexing."
},
{
"id": "ent.elasticsearch.purge_index.delete_failed",
"translation": "Failed to delete an Elasticsearch index"
},
{
"id": "ent.elasticsearch.purge_indexes.delete_failed",
"translation": "Failed to delete Elasticsearch index"
},
{
"id": "ent.elasticsearch.purge_indexes.unknown_index",
"translation": "Failed to delete an unknown index specified"
},
{
"id": "ent.elasticsearch.refresh_indexes.refresh_failed",
"translation": "Failed to refresh Elasticsearch indexes"

View File

@ -292,6 +292,10 @@ func (b *BleveEngine) PurgeIndexes(rctx request.CTX) *model.AppError {
return b.openIndexes()
}
func (b *BleveEngine) PurgeIndexList(rctx request.CTX, indexes []string) *model.AppError {
return model.NewAppError("Bleve.PurgeIndex", "bleveengine.purge_list.not_implemented", nil, "not implemented", http.StatusNotFound)
}
func (b *BleveEngine) DataRetentionDeleteIndexes(rctx request.CTX, cutoff time.Time) *model.AppError {
return nil
}
@ -331,3 +335,7 @@ func (b *BleveEngine) UpdateConfig(cfg *model.Config) {
}
b.cfg = cfg
}
func (b *BleveEngine) IsChannelsIndexVerified() bool {
return true
}

View File

@ -47,6 +47,8 @@ type SearchEngineInterface interface {
DeleteFilesBatch(rctx request.CTX, endTime, limit int64) *model.AppError
TestConfig(rctx request.CTX, cfg *model.Config) *model.AppError
PurgeIndexes(rctx request.CTX) *model.AppError
PurgeIndexList(rctx request.CTX, indexes []string) *model.AppError
RefreshIndexes(rctx request.CTX) *model.AppError
DataRetentionDeleteIndexes(rctx request.CTX, cutoff time.Time) *model.AppError
IsChannelsIndexVerified() bool
}

View File

@ -327,6 +327,20 @@ func (_m *SearchEngineInterface) IsAutocompletionEnabled() bool {
return r0
}
// IsChannelsIndexVerified provides a mock function with given fields:
func (_m *SearchEngineInterface) IsChannelsIndexVerified() bool {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// IsEnabled provides a mock function with given fields:
func (_m *SearchEngineInterface) IsEnabled() bool {
ret := _m.Called()
@ -383,6 +397,22 @@ func (_m *SearchEngineInterface) IsSearchEnabled() bool {
return r0
}
// PurgeIndexList provides a mock function with given fields: rctx, indexes
func (_m *SearchEngineInterface) PurgeIndexList(rctx request.CTX, indexes []string) *model.AppError {
ret := _m.Called(rctx, indexes)
var r0 *model.AppError
if rf, ok := ret.Get(0).(func(request.CTX, []string) *model.AppError); ok {
r0 = rf(rctx, indexes)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*model.AppError)
}
}
return r0
}
// PurgeIndexes provides a mock function with given fields: rctx
func (_m *SearchEngineInterface) PurgeIndexes(rctx request.CTX) *model.AppError {
ret := _m.Called(rctx)

View File

@ -12,7 +12,6 @@ const (
JobTypeMessageExport = "message_export"
JobTypeElasticsearchPostIndexing = "elasticsearch_post_indexing"
JobTypeElasticsearchPostAggregation = "elasticsearch_post_aggregation"
JobTypeElasticsearchFixChannelIndex = "elasticsearch_fix_channel_index"
JobTypeBlevePostIndexing = "bleve_post_indexing"
JobTypeLdapSync = "ldap_sync"
JobTypeMigrations = "migrations"

View File

@ -41,7 +41,6 @@ const (
MigrationKeyAddProductsBoardsPermissions = "products_boards"
MigrationKeyAddCustomUserGroupsPermissionRestore = "custom_groups_permission_restore"
MigrationKeyAddReadChannelContentPermissions = "read_channel_content_permissions"
MigrationKeyElasticsearchFixChannelIndex = "elasticsearch_fix_channel_index_migration"
MigrationKeyS3Path = "s3_path_migration"
MigrationKeyDeleteEmptyDrafts = "delete_empty_drafts_migration"
MigrationKeyDeleteOrphanDrafts = "delete_orphan_drafts_migration"

View File

@ -3,6 +3,7 @@
import * as AdminActions from 'mattermost-redux/actions/admin';
import {bindClientFunc} from 'mattermost-redux/actions/helpers';
import {createJob} from 'mattermost-redux/actions/jobs';
import * as TeamActions from 'mattermost-redux/actions/teams';
import * as UserActions from 'mattermost-redux/actions/users';
import {Client4} from 'mattermost-redux/client';
@ -12,7 +13,7 @@ import {trackEvent} from 'actions/telemetry_actions.jsx';
import {getOnNavigationConfirmed} from 'selectors/views/admin';
import store from 'stores/redux_store';
import {ActionTypes} from 'utils/constants';
import {ActionTypes, JobTypes} from 'utils/constants';
const dispatch = store.dispatch;
@ -316,8 +317,8 @@ export async function testS3Connection(success, error) {
}
}
export async function elasticsearchPurgeIndexes(success, error) {
const {data, error: err} = await dispatch(AdminActions.purgeElasticsearchIndexes());
export async function elasticsearchPurgeIndexes(success, error, indexes) {
const {data, error: err} = await dispatch(AdminActions.purgeElasticsearchIndexes(indexes));
if (data && success) {
success(data);
} else if (err && error) {
@ -325,6 +326,31 @@ export async function elasticsearchPurgeIndexes(success, error) {
}
}
export async function jobCreate(success, error, job) {
const {data, error: err} = await dispatch(createJob(job));
if (data && success) {
success(data);
} else if (err && error) {
error({id: err.server_error_id, ...err});
}
}
export async function rebuildChannelsIndex(success, error) {
await elasticsearchPurgeIndexes(undefined, error, ['channels']);
const job = {
type: JobTypes.ELASTICSEARCH_POST_INDEXING,
data: {
index_posts: 'false',
index_users: 'false',
index_files: 'false',
index_channels: 'true',
sub_type: 'channels_index_rebuild',
},
};
await jobCreate(undefined, error, job);
success();
}
export async function blevePurgeIndexes(success, error) {
const purgeBleveIndexes = bindClientFunc({
clientFunc: Client4.purgeBleveIndexes,

View File

@ -338,6 +338,50 @@ exports[`components/ElasticSearchSettings should match snapshot, disabled 1`] =
</div>
</div>
</div>
<RequestButton
buttonText={
<Memo(MemoizedFormattedMessage)
defaultMessage="Rebuild Channels Index"
id="admin.elasticsearch.rebuildChannelsIndex.title"
/>
}
disabled={true}
errorMessage={
Object {
"defaultMessage": "Failed to trigger channels index rebuild job: {error}",
"id": "admin.elasticsearch.rebuildIndexSuccessfully.error",
}
}
helpText={
<Memo(MemoizedFormattedMessage)
defaultMessage="This purges the channels index and re-indexes all channels in the database, from oldest to newest. Channel autocomplete is available during indexing but search results may be incomplete until the indexing job is complete.
<b>Note- Please ensure no other indexing job is in progress in the table above.</b>"
id="admin.elasticsearch.rebuildChannelsIndex.helpText"
values={
Object {
"b": [Function],
}
}
/>
}
id="rebuildChannelsIndexButton"
includeDetailedError={false}
label={
<Memo(MemoizedFormattedMessage)
defaultMessage="Rebuild Channels Index"
id="admin.elasticsearch.rebuildChannelsIndex.title"
/>
}
requestAction={[MockFunction]}
saveNeeded={false}
showSuccessMessage={true}
successMessage={
Object {
"defaultMessage": "Channels index rebuild job triggered successfully.",
"id": "admin.elasticsearch.rebuildIndexSuccessfully.success",
}
}
/>
<RequestButton
buttonText={
<Memo(MemoizedFormattedMessage)
@ -843,6 +887,50 @@ exports[`components/ElasticSearchSettings should match snapshot, enabled 1`] = `
</div>
</div>
</div>
<RequestButton
buttonText={
<Memo(MemoizedFormattedMessage)
defaultMessage="Rebuild Channels Index"
id="admin.elasticsearch.rebuildChannelsIndex.title"
/>
}
disabled={false}
errorMessage={
Object {
"defaultMessage": "Failed to trigger channels index rebuild job: {error}",
"id": "admin.elasticsearch.rebuildIndexSuccessfully.error",
}
}
helpText={
<Memo(MemoizedFormattedMessage)
defaultMessage="This purges the channels index and re-indexes all channels in the database, from oldest to newest. Channel autocomplete is available during indexing but search results may be incomplete until the indexing job is complete.
<b>Note- Please ensure no other indexing job is in progress in the table above.</b>"
id="admin.elasticsearch.rebuildChannelsIndex.helpText"
values={
Object {
"b": [Function],
}
}
/>
}
id="rebuildChannelsIndexButton"
includeDetailedError={false}
label={
<Memo(MemoizedFormattedMessage)
defaultMessage="Rebuild Channels Index"
id="admin.elasticsearch.rebuildChannelsIndex.title"
/>
}
requestAction={[MockFunction]}
saveNeeded={false}
showSuccessMessage={true}
successMessage={
Object {
"defaultMessage": "Channels index rebuild job triggered successfully.",
"id": "admin.elasticsearch.rebuildIndexSuccessfully.success",
}
}
/>
<RequestButton
buttonText={
<Memo(MemoizedFormattedMessage)

View File

@ -12,6 +12,7 @@ import SaveButton from 'components/save_button';
jest.mock('actions/admin_actions.jsx', () => {
return {
elasticsearchPurgeIndexes: jest.fn(),
rebuildChannelsIndex: jest.fn(),
elasticsearchTest: (config: AdminConfig, success: () => void) => success(),
};
});

View File

@ -8,7 +8,7 @@ import {FormattedMessage, defineMessage, defineMessages} from 'react-intl';
import type {AdminConfig} from '@mattermost/types/config';
import type {Job, JobType} from '@mattermost/types/jobs';
import {elasticsearchPurgeIndexes, elasticsearchTest} from 'actions/admin_actions.jsx';
import {elasticsearchPurgeIndexes, elasticsearchTest, rebuildChannelsIndex} from 'actions/admin_actions.jsx';
import ExternalLink from 'components/external_link';
@ -62,6 +62,9 @@ export const messages = defineMessages({
elasticsearch_test_button: {id: 'admin.elasticsearch.elasticsearch_test_button', defaultMessage: 'Test Connection'},
bulkIndexingTitle: {id: 'admin.elasticsearch.bulkIndexingTitle', defaultMessage: 'Bulk Indexing:'},
help: {id: 'admin.elasticsearch.createJob.help', defaultMessage: 'All users, channels and posts in the database will be indexed from oldest to newest. Elasticsearch is available during indexing but search results may be incomplete until the indexing job is complete.'},
rebuildChannelsIndexTitle: {id: 'admin.elasticsearch.rebuildChannelsIndexTitle', defaultMessage: 'Rebuild Channels Index'},
rebuildChannelIndexHelpText: {id: 'admin.elasticsearch.rebuildChannelsIndex.helpText', defaultMessage: 'This purges the channels index and re-indexes all channels in the database, from oldest to newest. Channel autocomplete is available during indexing but search results may be incomplete until the indexing job is complete.\n<b>Note- Please ensure no other indexing job is in progress in the table above.</b>'},
rebuildChannelsIndexButtonText: {id: 'admin.elasticsearch.rebuildChannelsIndex.title', defaultMessage: 'Rebuild Channels Index'},
purgeIndexesHelpText: {id: 'admin.elasticsearch.purgeIndexesHelpText', defaultMessage: 'Purging will entirely remove the indexes on the Elasticsearch server. Search results may be incomplete until a bulk index of the existing database is rebuilt.'},
purgeIndexesButton: {id: 'admin.elasticsearch.purgeIndexesButton', defaultMessage: 'Purge Index'},
label: {id: 'admin.elasticsearch.purgeIndexesButton.label', defaultMessage: 'Purge Indexes:'},
@ -197,8 +200,22 @@ export default class ElasticsearchSettings extends AdminSettings<Props, State> {
};
getExtraInfo(job: Job) {
let jobSubType = null;
if (job.data?.sub_type === 'channels_index_rebuild') {
jobSubType = (
<span>
{'. '}
<FormattedMessage
id='admin.elasticsearch.channelIndexRebuildJobTitle'
defaultMessage='Channels index rebuild job.'
/>
</span>
);
}
let jobProgress = null;
if (job.status === JobStatuses.IN_PROGRESS) {
return (
jobProgress = (
<FormattedMessage
id='admin.elasticsearch.percentComplete'
defaultMessage='{percent}% Complete'
@ -207,7 +224,7 @@ export default class ElasticsearchSettings extends AdminSettings<Props, State> {
);
}
return null;
return (<span>{jobProgress}{jobSubType}</span>);
}
renderTitle() {
@ -400,6 +417,29 @@ export default class ElasticsearchSettings extends AdminSettings<Props, State> {
</div>
</div>
</div>
<RequestButton
id='rebuildChannelsIndexButton'
requestAction={rebuildChannelsIndex}
helpText={
<FormattedMessage
{...messages.rebuildChannelIndexHelpText}
values={{
b: (chunks: React.ReactNode) => (<b>{chunks}</b>),
}}
/>
}
buttonText={<FormattedMessage {...messages.rebuildChannelsIndexButtonText}/>}
successMessage={defineMessage({
id: 'admin.elasticsearch.rebuildIndexSuccessfully.success',
defaultMessage: 'Channels index rebuild job triggered successfully.',
})}
errorMessage={defineMessage({
id: 'admin.elasticsearch.rebuildIndexSuccessfully.error',
defaultMessage: 'Failed to trigger channels index rebuild job: {error}',
})}
disabled={!this.state.canPurgeAndIndex || this.props.isDisabled!}
label={<FormattedMessage {...messages.rebuildChannelsIndexButtonText}/>}
/>
<RequestButton
id='purgeIndexesSection'
requestAction={elasticsearchPurgeIndexes}

View File

@ -29,6 +29,8 @@ export type Props = {
className?: string;
hideJobCreateButton?: boolean;
createJobButtonText: React.ReactNode;
hideTable?: boolean;
jobData?: any;
actions: {
getJobsByType: (jobType: JobType) => void;
cancelJob: (jobId: string) => Promise<ActionResult>;
@ -75,6 +77,7 @@ class JobTable extends React.PureComponent<Props> {
e.preventDefault();
const job = {
type: this.props.jobType,
data: this.props.jobData,
};
await this.props.actions.createJob(job);
@ -131,53 +134,56 @@ class JobTable extends React.PureComponent<Props> {
{this.props.createJobHelpText}
</div>
</div>
<div className='job-table__table'>
<table
className='table'
data-testid='jobTable'
>
<thead>
<tr>
<th className='cancel-button-field'/>
<th>
<FormattedMessage
id='admin.jobTable.headerStatus'
defaultMessage='Status'
/>
</th>
{showFilesColumn &&
{
!this.props.hideTable &&
<div className='job-table__table'>
<table
className='table'
data-testid='jobTable'
>
<thead>
<tr>
<th className='cancel-button-field'/>
<th>
<FormattedMessage
id='admin.jobTable.headerStatus'
defaultMessage='Status'
/>
</th>
{showFilesColumn &&
<th>
<FormattedMessage
id='admin.jobTable.headerFiles'
defaultMessage='Files'
/>
</th>
}
<th>
<FormattedMessage
id='admin.jobTable.headerFinishAt'
defaultMessage='Finish Time'
/>
</th>
<th>
<FormattedMessage
id='admin.jobTable.headerRunTime'
defaultMessage='Run Time'
/>
</th>
<th colSpan={3}>
<FormattedMessage
id='admin.jobTable.headerExtraInfo'
defaultMessage='Details'
/>
</th>
</tr>
</thead>
<tbody>
{items}
</tbody>
</table>
</div>
}
<th>
<FormattedMessage
id='admin.jobTable.headerFinishAt'
defaultMessage='Finish Time'
/>
</th>
<th>
<FormattedMessage
id='admin.jobTable.headerRunTime'
defaultMessage='Run Time'
/>
</th>
<th colSpan={3}>
<FormattedMessage
id='admin.jobTable.headerExtraInfo'
defaultMessage='Details'
/>
</th>
</tr>
</thead>
<tbody>
{items}
</tbody>
</table>
</div>
}
</div>
);
}

View File

@ -799,6 +799,7 @@
"admin.elasticsearch.caDescription": "(Optional) Custom Certificate Authority certificates for the Elasticsearch server. Leave this empty to use the default CAs from the operating system.",
"admin.elasticsearch.caExample": "E.g.: \"./elasticsearch/ca.pem\"",
"admin.elasticsearch.caTitle": "CA path:",
"admin.elasticsearch.channelIndexRebuildJobTitle": "Channels index rebuild job.",
"admin.elasticsearch.clientCertDescription": "(Optional) The client certificate for the connection to the Elasticsearch server in the PEM format.",
"admin.elasticsearch.clientCertExample": "E.g.: \"./elasticsearch/client-cert.pem\"",
"admin.elasticsearch.clientCertTitle": "Client Certificate path:",
@ -829,6 +830,11 @@
"admin.elasticsearch.purgeIndexesButton.label": "Purge Indexes:",
"admin.elasticsearch.purgeIndexesButton.success": "Indexes purged successfully.",
"admin.elasticsearch.purgeIndexesHelpText": "Purging will entirely remove the indexes on the Elasticsearch server. Search results may be incomplete until a bulk index of the existing database is rebuilt.",
"admin.elasticsearch.rebuildChannelsIndex.helpText": "This purges the channels index and re-indexes all channels in the database, from oldest to newest. Channel autocomplete is available during indexing but search results may be incomplete until the indexing job is complete.\n\n<b>Note- Please ensure no other indexing job is in progress in the table above.</b>",
"admin.elasticsearch.rebuildChannelsIndex.title": "Rebuild Channels Index",
"admin.elasticsearch.rebuildChannelsIndexTitle": "Rebuild Channels Index",
"admin.elasticsearch.rebuildIndexSuccessfully.error": "Failed to trigger channels index rebuild job.",
"admin.elasticsearch.rebuildIndexSuccessfully.success": "Channels index rebuild job triggered successfully.",
"admin.elasticsearch.skipTLSVerificationDescription": "When true, Mattermost will not require the Elasticsearch certificate to be signed by a trusted Certificate Authority.",
"admin.elasticsearch.skipTLSVerificationTitle": "Skip TLS Verification:",
"admin.elasticsearch.sniffDescription": "When true, sniffing finds and connects to all data nodes in your cluster automatically.",

View File

@ -358,9 +358,12 @@ export function testElasticsearch(config?: AdminConfig) {
});
}
export function purgeElasticsearchIndexes() {
export function purgeElasticsearchIndexes(indexes?: string[]) {
return bindClientFunc({
clientFunc: Client4.purgeElasticsearchIndexes,
params: [
indexes,
],
});
}

View File

@ -3369,9 +3369,9 @@ export default class Client4 {
);
};
purgeElasticsearchIndexes = () => {
purgeElasticsearchIndexes = (indexes?: string[]) => {
return this.doFetch<StatusOK>(
`${this.getBaseRoute()}/elasticsearch/purge_indexes`,
`${this.getBaseRoute()}/elasticsearch/purge_indexes${indexes && indexes.length > 0 ? '?index=' + indexes.join(',') : ''}`,
{method: 'post'},
);
};