Removed Elasticsearch and Opensearch channel index schedma check (#30102)

This commit is contained in:
Harshil Sharma
2025-02-06 11:50:37 +05:30
committed by GitHub
parent 8b65771a31
commit 35e776d805
9 changed files with 2 additions and 419 deletions

View File

@@ -1,153 +0,0 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package app
import (
"errors"
"net/url"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/i18n"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/public/shared/request"
"github.com/mattermost/mattermost/server/v8/channels/utils"
)
func (a *App) initElasticsearchChannelIndexCheck() {
// the logic of when to perform the check has been derived from platform/searchengine.StartSearchEngine()
// Wherever we're starting the engine, we're checking the index mapping here.
a.Log().Debug("initElasticsearchChannelIndexCheck: calling elasticsearchChannelIndexCheckWithRetry before setting up config and license change listeners...")
a.elasticsearchChannelIndexCheckWithRetry()
a.AddConfigListener(func(oldConfig, newConfig *model.Config) {
if a.SearchEngine().ElasticsearchEngine == nil {
return
}
oldESConfig := oldConfig.ElasticsearchSettings
newESConfig := newConfig.ElasticsearchSettings
// if indexing is turned on, check.
if !*oldESConfig.EnableIndexing && *newESConfig.EnableIndexing {
a.Log().Debug("initElasticsearchChannelIndexCheck: calling elasticsearchChannelIndexCheckWithRetry from config change listener as ES indexing was turned from 'off' to 'on'")
a.elasticsearchChannelIndexCheckWithRetry()
} else if *newESConfig.EnableIndexing && (*oldESConfig.Password != *newESConfig.Password || *oldESConfig.Username != *newESConfig.Username || *oldESConfig.ConnectionURL != *newESConfig.ConnectionURL || *oldESConfig.Sniff != *newESConfig.Sniff) {
// ES client reconnects if credentials or address changes
a.Log().Debug("initElasticsearchChannelIndexCheck: calling elasticsearchChannelIndexCheckWithRetry from config change listener one of the Elasticsearch config param changed")
a.elasticsearchChannelIndexCheckWithRetry()
}
})
a.AddLicenseListener(func(oldLicense, newLicense *model.License) {
if a.SearchEngine() == nil {
return
}
// if a license was added, and it has ES enabled-
if oldLicense == nil && newLicense != nil {
if a.SearchEngine().ElasticsearchEngine != nil {
a.Log().Debug("initElasticsearchChannelIndexCheck: calling elasticsearchChannelIndexCheckWithRetry from license change listener")
a.elasticsearchChannelIndexCheckWithRetry()
}
}
})
}
func (a *App) elasticsearchChannelIndexCheckWithRetry() {
// this is being done async to not block license application and config
// processes as the listeners for those are called synchronously.
go func() {
// using progressive retry because ES client may take some time to connect and be ready.
_ = utils.LongProgressiveRetry(func() error {
a.Log().Debug("elasticsearchChannelIndexCheckWithRetry: attempting to check channel index state...")
if !*a.Config().ElasticsearchSettings.EnableIndexing {
a.Log().Debug("elasticsearchChannelIndexCheckWithRetry: skipping because elasticsearch indexing is disabled")
return nil
}
elastic := a.SearchEngine().ElasticsearchEngine
if elastic == nil {
a.Log().Debug("elasticsearchChannelIndexCheckWithRetry: skipping because elastic engine is nil")
return errors.New("retry")
}
if !elastic.IsActive() {
a.Log().Debug("elasticsearchChannelIndexCheckWithRetry: skipping because elastic.IsActive is false")
return errors.New("retry")
}
a.Log().Debug("elasticsearchChannelIndexCheckWithRetry: checking channel index state...")
a.elasticsearchChannelIndexCheck()
return nil
})
}()
}
func (a *App) elasticsearchChannelIndexCheck() {
a.Log().Debug("elasticsearchChannelIndexCheck: checking if there is a need to notify the admins...")
if needNotify := a.elasticChannelsIndexNeedNotifyAdmins(); !needNotify {
a.Log().Debug("elasticsearchChannelIndexCheck: index is verified, no need to notify admins.")
return
}
a.Log().Debug("elasticsearchChannelIndexCheck: index is not verified, need to notify admins.")
// notify all system admins
systemBot, appErr := a.GetSystemBot(request.EmptyContext(a.Log()))
if appErr != nil {
a.Log().Error("elasticsearchChannelIndexCheck: couldn't get system bot", mlog.Err(appErr))
return
}
sysAdmins, appErr := a.getAllSystemAdmins()
if appErr != nil {
a.Log().Error("elasticsearchChannelIndexCheck: error occurred fetching all system admins", mlog.Err(appErr))
}
elasticsearchSettingsSectionLink, err := url.JoinPath(*a.Config().ServiceSettings.SiteURL, "admin_console/environment/elasticsearch")
if err != nil {
a.Log().Error("elasticsearchChannelIndexCheck: error occurred constructing Elasticsearch system console section path")
return
}
// TODO include a link to changelog
postMessage := i18n.T("app.channel.elasticsearch_channel_index.notify_admin.message", map[string]any{"ElasticsearchSection": elasticsearchSettingsSectionLink})
for _, sysAdmin := range sysAdmins {
var channel *model.Channel
channel, appErr = a.GetOrCreateDirectChannel(request.EmptyContext(a.Log()), sysAdmin.Id, systemBot.UserId)
if appErr != nil {
a.Log().Error("elasticsearchChannelIndexCheck: error occurred ensuring DM channel between system bot and sys admin", mlog.Err(appErr))
continue
}
post := &model.Post{
Message: postMessage,
UserId: systemBot.UserId,
ChannelId: channel.Id,
}
_, appErr = a.CreatePost(request.EmptyContext(a.Log()), post, channel, model.CreatePostFlags{TriggerWebhooks: true})
if appErr != nil {
a.Log().Error("elasticsearchChannelIndexCheck: error occurred creating post", mlog.Err(appErr))
continue
}
}
}
func (a *App) elasticChannelsIndexNeedNotifyAdmins() bool {
elastic := a.SearchEngine().ElasticsearchEngine
if elastic == nil {
a.Log().Debug("elasticChannelsIndexNeedNotifyAdmins: skipping because elastic engine is nil")
return false
}
if elastic.IsChannelsIndexVerified() {
a.Log().Debug("elasticChannelsIndexNeedNotifyAdmins: skipping because channels index is verified")
return false
}
return true
}

View File

@@ -502,8 +502,6 @@ func NewServer(options ...Option) (*Server, error) {
}
})
app.initElasticsearchChannelIndexCheck()
return s, nil
}

View File

@@ -2922,13 +2922,3 @@ func (a *App) UserIsFirstAdmin(rctx request.CTX, user *model.User) bool {
return true
}
func (a *App) getAllSystemAdmins() ([]*model.User, *model.AppError) {
userOptions := &model.UserGetOptions{
Page: 0,
PerPage: 500,
Role: model.SystemAdminRoleId,
Inactive: false,
}
return a.GetUsersFromProfiles(userOptions)
}

View File

@@ -44,14 +44,6 @@ type ElasticsearchInterfaceImpl struct {
bulkProcessor *Bulk
Platform *platform.PlatformService
// This flag is for indicating if channel index's mappings
// has been verified, and if so, what was the result.
//
// value = 0 indicates it has NOT BEEN CHECKED
// value = 1 indicates index has been checked and has CORRECT mappings
// value = 2 indicates index has been checked and it has INCORRECT mappings
channelIndexVerified int32
}
func getJSONOrErrorStr(obj any) string {
@@ -87,25 +79,7 @@ func (es *ElasticsearchInterfaceImpl) IsSearchEnabled() bool {
}
func (es *ElasticsearchInterfaceImpl) IsAutocompletionEnabled() bool {
// if we encounter the index mappings haven't been checked, we check it once and store result.
// While in most cases the flag would have been set in the `Start` function,
// There's a case if you call the update config API and enable ES and autocomplete at the same time, it's not set
// so we're checking if it's unset here and trying to check the index.
if atomic.LoadInt32(&es.channelIndexVerified) == 0 {
es.Platform.Log().Debug("Elasticsearch.IsAutocompletionEnabled: channel index has not been verified yet, checking index now")
es.checkChannelIndex()
}
return *es.Platform.Config().ElasticsearchSettings.EnableAutocomplete && atomic.LoadInt32(&es.channelIndexVerified) == 1
}
func (es *ElasticsearchInterfaceImpl) IsChannelsIndexVerified() bool {
if atomic.LoadInt32(&es.channelIndexVerified) == 0 {
es.Platform.Log().Debug("Elasticsearch.IsChannelsIndexVerified: channel index has not been verified yet, checking index now")
es.checkChannelIndex()
}
return atomic.LoadInt32(&es.channelIndexVerified) == 1
return *es.Platform.Config().ElasticsearchSettings.EnableAutocomplete
}
func (es *ElasticsearchInterfaceImpl) IsIndexingSync() bool {
@@ -191,10 +165,6 @@ func (es *ElasticsearchInterfaceImpl) Start() *model.AppError {
return model.NewAppError("Elasticsearch.start", "ent.elasticsearch.create_template_file_info_if_not_exists.template_create_failed", map[string]any{"Backend": model.ElasticsearchSettingsESBackend}, "", http.StatusInternalServerError).Wrap(err)
}
if atomic.LoadInt32(&es.channelIndexVerified) == 0 {
es.checkChannelIndex()
}
atomic.StoreInt32(&es.ready, 1)
return nil
@@ -1876,83 +1846,3 @@ func checkMaxVersion(client *elastic.TypedClient, cfg *model.Config) (string, in
}
return resp.Version.Int, major, nil
}
// checkChannelIndex checks if channel index's mapping is correct.
// See Jira issue https://mattermost.atlassian.net/browse/MM-49257
func (es *ElasticsearchInterfaceImpl) checkChannelIndex() {
es.Platform.Log().Debug("Elasticsearch.checkChannelIndex: checking if channel index field is of correct type")
isCorrect, err := es.isFieldCorrect()
if err != nil {
return
}
if isCorrect {
es.Platform.Log().Debug("Elasticsearch.checkChannelIndex: channel index field is correct")
atomic.StoreInt32(&es.channelIndexVerified, 1)
} else {
es.Platform.Log().Debug("Elasticsearch.checkChannelIndex: channel index field is incorrect")
atomic.StoreInt32(&es.channelIndexVerified, 2)
}
}
func (es *ElasticsearchInterfaceImpl) isFieldCorrect() (bool, error) {
// We want to check if channel index's "type" field is of type "keyword".
// If the index is in incorrect state, the field would be of type "text".
es.Platform.Log().Debug("Elasticsearch.isFieldCorrect: querying ES to check if field is correct")
ctx, cancel := context.WithTimeout(
context.Background(),
time.Duration(*es.Platform.Config().ElasticsearchSettings.RequestTimeoutSeconds)*time.Second,
)
defer cancel()
indexName := *es.Platform.Config().ElasticsearchSettings.IndexPrefix + common.IndexBaseChannels
indexMappingInterface, err := es.client.Indices.GetFieldMapping("type").Index(indexName).Do(ctx)
if err != nil {
// The case of channels index not existing is fine,
// as whenever the index will be created, it will be created
// with the correct mappings.
elasticErr, ok := err.(*types.ElasticsearchError)
if ok && elasticErr.Status == http.StatusNotFound {
es.Platform.Logger().Debug("Elasticsearch isFieldCorrect: channel index doesn't exist", mlog.Err(err))
return true, nil
}
es.Platform.Logger().Error("Elasticsearch: Failed to fetch channels index template", mlog.Err(err))
return false, err
}
// this struct is declared here because
// it's not used anywhere outside this function
type channelsTypeFieldMapping struct {
Mappings struct {
Type struct {
Mapping struct {
Type struct {
Type string
}
}
}
}
}
mappingInterface := indexMappingInterface[indexName]
mappingBytes, err := json.Marshal(mappingInterface)
if err != nil {
es.Platform.Logger().Error("Elasticsearch: Failed to marshal Elasticsearch index field mapping", mlog.Err(err))
return false, err
}
es.Platform.Log().Debug("Elasticsearch.isFieldCorrect: channel index type field mapping queried successfully", mlog.String("mapping", string(mappingBytes)))
var mapping channelsTypeFieldMapping
err = json.Unmarshal(mappingBytes, &mapping)
if err != nil {
es.Platform.Logger().Error("Elasticsearch: Failed to unmarshal Elasticsearch index field mapping", mlog.Err(err))
return false, err
}
es.Platform.Logger().Debug("Elasticsearch: Found type of type field as", mlog.String("type", mapping.Mappings.Type.Mapping.Type.Type))
return mapping.Mappings.Type.Mapping.Type.Type == "keyword", nil
}

View File

@@ -46,14 +46,6 @@ type OpensearchInterfaceImpl struct {
bulkProcessor *Bulk
Platform *platform.PlatformService
// This flag is for indicating if channel index's mappings
// has been verified, and if so, what was the result.
//
// value = 0 indicates it has NOT BEEN CHECKED
// value = 1 indicates index has been checked and has CORRECT mappings
// value = 2 indicates index has been checked and it has INCORRECT mappings
channelIndexVerified int32
}
func getJSONOrErrorStr(obj any) string {
@@ -89,25 +81,7 @@ func (os *OpensearchInterfaceImpl) IsSearchEnabled() bool {
}
func (os *OpensearchInterfaceImpl) IsAutocompletionEnabled() bool {
// if we encounter the index mappings haven't been checked, we check it once and store result.
// While in most cases the flag would have been set in the `Start` function,
// There's a case if you call the update config API and enable ES and autocomplete at the same time, it's not set
// so we're checking if its unset here and trying to check the index.
if atomic.LoadInt32(&os.channelIndexVerified) == 0 {
os.Platform.Log().Debug("IsAutocompletionEnabled: channel index has not been verified yet, checking index now")
os.checkChannelIndex()
}
return *os.Platform.Config().ElasticsearchSettings.EnableAutocomplete && atomic.LoadInt32(&os.channelIndexVerified) == 1
}
func (os *OpensearchInterfaceImpl) IsChannelsIndexVerified() bool {
if atomic.LoadInt32(&os.channelIndexVerified) == 0 {
os.Platform.Log().Debug("OpenSearch.IsChannelsIndexVerified: channel index has not been verified yet, checking index now")
os.checkChannelIndex()
}
return atomic.LoadInt32(&os.channelIndexVerified) == 1
return *os.Platform.Config().ElasticsearchSettings.EnableAutocomplete
}
func (os *OpensearchInterfaceImpl) IsIndexingSync() bool {
@@ -213,10 +187,6 @@ func (os *OpensearchInterfaceImpl) Start() *model.AppError {
return model.NewAppError("Opensearch.start", "ent.elasticsearch.create_template_file_info_if_not_exists.template_create_failed", map[string]any{"Backend": model.ElasticsearchSettingsOSBackend}, "", http.StatusInternalServerError).Wrap(err)
}
if atomic.LoadInt32(&os.channelIndexVerified) == 0 {
os.checkChannelIndex()
}
atomic.StoreInt32(&os.ready, 1)
return nil
@@ -2025,88 +1995,3 @@ func checkMaxVersion(client *opensearchapi.Client) (string, int, *model.AppError
}
return resp.Version.Number, major, nil
}
// checkChannelIndex checks if channel index's mapping is correct.
// See Jira issue https://mattermost.atlassian.net/browse/MM-49257
func (os *OpensearchInterfaceImpl) checkChannelIndex() {
os.Platform.Log().Debug("Opensearch.checkChannelIndex: checking if channel index field is of correct type")
isCorrect, err := os.isFieldCorrect()
if err != nil {
return
}
if isCorrect {
os.Platform.Log().Debug("Opensearch.checkChannelIndex: channel index field is correct")
atomic.StoreInt32(&os.channelIndexVerified, 1)
} else {
os.Platform.Log().Debug("Opensearch.checkChannelIndex: channel index field is incorrect")
atomic.StoreInt32(&os.channelIndexVerified, 2)
}
}
func (os *OpensearchInterfaceImpl) isFieldCorrect() (bool, error) {
// We want to check if channel index's "type" field is of type "keyword".
// If the index is in incorrect state, the field would be of type "text".
os.Platform.Log().Debug("Opensearch.isFieldCorrect: querying ES to check if field is correct")
ctx, cancel := context.WithTimeout(
context.Background(),
time.Duration(*os.Platform.Config().ElasticsearchSettings.RequestTimeoutSeconds)*time.Second,
)
defer cancel()
var mappingFieldResp map[string]struct {
Mappings json.RawMessage `json:"mappings"`
}
indexName := *os.Platform.Config().ElasticsearchSettings.IndexPrefix + common.IndexBaseChannels
httpResp, err := os.client.Client.Do(ctx, &opensearchapi.MappingFieldReq{
Fields: []string{"type"},
Indices: []string{indexName},
}, &mappingFieldResp)
if err != nil {
os.Platform.Logger().Error("Opensearch: Failed to fetch channels index template", mlog.Err(err))
return false, err
}
// The case of channels index not existing is fine,
// as whenever the index will be created, it will be created
// with the correct mappings.
if httpResp != nil && httpResp.StatusCode == http.StatusNotFound {
os.Platform.Logger().Debug("Opensearch isFieldCorrect: channel index doesn't exist", mlog.Err(err))
return true, nil
}
// this struct is declared here because
// it's not used anywhere outside this function
type channelsTypeFieldMapping struct {
Mappings struct {
Type struct {
Mapping struct {
Type struct {
Type string
}
}
}
}
}
mappingInterface := mappingFieldResp[indexName]
mappingBytes, err := json.Marshal(mappingInterface)
if err != nil {
os.Platform.Logger().Error("Opensearch: Failed to marshal Opensearch index field mapping", mlog.Err(err))
return false, err
}
os.Platform.Log().Debug("Opensearch.isFieldCorrect: channel index type field mapping queried successfully", mlog.String("mapping", string(mappingBytes)))
var mapping channelsTypeFieldMapping
err = json.Unmarshal(mappingBytes, &mapping)
if err != nil {
os.Platform.Logger().Error("Opensearch: Failed to unmarshal Opensearch index field mapping", mlog.Err(err))
return false, err
}
os.Platform.Logger().Debug("Opensearch: Found type of type field as", mlog.String("type", mapping.Mappings.Type.Mapping.Type.Type))
return mapping.Mappings.Type.Mapping.Type.Type == "keyword", nil
}

View File

@@ -4610,10 +4610,6 @@
"id": "app.channel.delete.app_error",
"translation": "Unable to delete the channel."
},
{
"id": "app.channel.elasticsearch_channel_index.notify_admin.message",
"translation": "Your search channel index schema is out of date. We recommend regenerating your channel index.\nClick the `Rebuild Channels Index` button in the [Elasticsearch page via the System Console]({{.ElasticsearchSection}}) to fix this issue.\nSee the Mattermost changelog for more information."
},
{
"id": "app.channel.get.existing.app_error",
"translation": "Unable to find the existing channel {{.channel_id}}."

View File

@@ -335,7 +335,3 @@ func (b *BleveEngine) UpdateConfig(cfg *model.Config) {
}
b.cfg = cfg
}
func (b *BleveEngine) IsChannelsIndexVerified() bool {
return true
}

View File

@@ -50,5 +50,4 @@ type SearchEngineInterface interface {
PurgeIndexList(rctx request.CTX, indexes []string) *model.AppError
RefreshIndexes(rctx request.CTX) *model.AppError
DataRetentionDeleteIndexes(rctx request.CTX, cutoff time.Time) *model.AppError
IsChannelsIndexVerified() bool
}

View File

@@ -407,24 +407,6 @@ func (_m *SearchEngineInterface) IsAutocompletionEnabled() bool {
return r0
}
// IsChannelsIndexVerified provides a mock function with given fields:
func (_m *SearchEngineInterface) IsChannelsIndexVerified() bool {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for IsChannelsIndexVerified")
}
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// IsEnabled provides a mock function with given fields:
func (_m *SearchEngineInterface) IsEnabled() bool {
ret := _m.Called()