mirror of
https://github.com/mattermost/mattermost.git
synced 2025-02-25 18:55:24 -06:00
[GH-25492] Add the request context to all public methods in server/channels/app/admin.go (#25498)
This commit is contained in:
parent
6e3f0ab928
commit
6cf7115e1f
@ -25,7 +25,7 @@ func getClusterStatus(c *Context, w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
infos := c.App.GetClusterStatus()
|
||||
infos := c.App.GetClusterStatus(c.AppContext)
|
||||
js, err := json.Marshal(infos)
|
||||
if err != nil {
|
||||
c.Err = model.NewAppError("getClusterStatus", "api.marshal_error", nil, "", http.StatusInternalServerError).Wrap(err)
|
||||
|
@ -225,7 +225,7 @@ func testEmail(c *Context, w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
appErr := c.App.TestEmail(c.AppContext.Session().UserId, cfg)
|
||||
appErr := c.App.TestEmail(c.AppContext, c.AppContext.Session().UserId, cfg)
|
||||
if appErr != nil {
|
||||
c.Err = appErr
|
||||
return
|
||||
@ -252,7 +252,7 @@ func testSiteURL(c *Context, w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
appErr := c.App.TestSiteURL(siteURL)
|
||||
appErr := c.App.TestSiteURL(c.AppContext, siteURL)
|
||||
if appErr != nil {
|
||||
c.Err = appErr
|
||||
return
|
||||
@ -487,7 +487,7 @@ func getLatestVersion(c *Context, w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
resp, appErr := c.App.GetLatestVersion("https://api.github.com/repos/mattermost/mattermost-server/releases/latest")
|
||||
resp, appErr := c.App.GetLatestVersion(c.AppContext, "https://api.github.com/repos/mattermost/mattermost-server/releases/latest")
|
||||
if appErr != nil {
|
||||
c.Err = appErr
|
||||
return
|
||||
|
@ -111,23 +111,23 @@ func AddLocalLogs(logData map[string][]string, s *Server, page, perPage int, ser
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *App) QueryLogs(c request.CTX, page, perPage int, logFilter *model.LogFilter) (map[string][]string, *model.AppError) {
|
||||
return a.Srv().QueryLogs(c, page, perPage, logFilter)
|
||||
func (a *App) QueryLogs(rctx request.CTX, page, perPage int, logFilter *model.LogFilter) (map[string][]string, *model.AppError) {
|
||||
return a.Srv().QueryLogs(rctx, page, perPage, logFilter)
|
||||
}
|
||||
|
||||
func (a *App) GetLogs(c request.CTX, page, perPage int) ([]string, *model.AppError) {
|
||||
return a.Srv().GetLogs(c, page, perPage)
|
||||
func (a *App) GetLogs(rctx request.CTX, page, perPage int) ([]string, *model.AppError) {
|
||||
return a.Srv().GetLogs(rctx, page, perPage)
|
||||
}
|
||||
|
||||
func (s *Server) GetLogsSkipSend(page, perPage int, logFilter *model.LogFilter) ([]string, *model.AppError) {
|
||||
return s.platform.GetLogsSkipSend(page, perPage, logFilter)
|
||||
}
|
||||
|
||||
func (a *App) GetLogsSkipSend(page, perPage int, logFilter *model.LogFilter) ([]string, *model.AppError) {
|
||||
func (a *App) GetLogsSkipSend(rctx request.CTX, page, perPage int, logFilter *model.LogFilter) ([]string, *model.AppError) {
|
||||
return a.Srv().GetLogsSkipSend(page, perPage, logFilter)
|
||||
}
|
||||
|
||||
func (a *App) GetClusterStatus() []*model.ClusterInfo {
|
||||
func (a *App) GetClusterStatus(rctx request.CTX) []*model.ClusterInfo {
|
||||
infos := make([]*model.ClusterInfo, 0)
|
||||
|
||||
if a.Cluster() != nil {
|
||||
@ -145,18 +145,18 @@ func (s *Server) InvalidateAllCachesSkipSend() {
|
||||
s.platform.InvalidateAllCachesSkipSend()
|
||||
}
|
||||
|
||||
func (a *App) RecycleDatabaseConnection(c request.CTX) {
|
||||
c.Logger().Info("Attempting to recycle database connections.")
|
||||
func (a *App) RecycleDatabaseConnection(rctx request.CTX) {
|
||||
rctx.Logger().Info("Attempting to recycle database connections.")
|
||||
|
||||
// This works by setting 10 seconds as the max conn lifetime for all DB connections.
|
||||
// This allows in gradually closing connections as they expire. In future, we can think
|
||||
// of exposing this as a param from the REST api.
|
||||
a.Srv().Store().RecycleDBConnections(10 * time.Second)
|
||||
|
||||
c.Logger().Info("Finished recycling database connections.")
|
||||
rctx.Logger().Info("Finished recycling database connections.")
|
||||
}
|
||||
|
||||
func (a *App) TestSiteURL(siteURL string) *model.AppError {
|
||||
func (a *App) TestSiteURL(rctx request.CTX, siteURL string) *model.AppError {
|
||||
url := fmt.Sprintf("%s/api/v4/system/ping", siteURL)
|
||||
res, err := http.Get(url)
|
||||
if err != nil || res.StatusCode != 200 {
|
||||
@ -170,7 +170,7 @@ func (a *App) TestSiteURL(siteURL string) *model.AppError {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *App) TestEmail(userID string, cfg *model.Config) *model.AppError {
|
||||
func (a *App) TestEmail(rctx request.CTX, userID string, cfg *model.Config) *model.AppError {
|
||||
if *cfg.EmailSettings.SMTPServer == "" {
|
||||
return model.NewAppError("testEmail", "api.admin.test_email.missing_server", nil, i18n.T("api.context.invalid_param.app_error", map[string]any{"Name": "SMTPServer"}), http.StatusBadRequest)
|
||||
}
|
||||
@ -201,7 +201,7 @@ func (a *App) TestEmail(userID string, cfg *model.Config) *model.AppError {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *App) GetLatestVersion(latestVersionUrl string) (*model.GithubReleaseInfo, *model.AppError) {
|
||||
func (a *App) GetLatestVersion(rctx request.CTX, latestVersionUrl string) (*model.GithubReleaseInfo, *model.AppError) {
|
||||
var cachedLatestVersion *model.GithubReleaseInfo
|
||||
if cacheErr := latestVersionCache.Get("latest_version_cache", &cachedLatestVersion); cacheErr == nil {
|
||||
return cachedLatestVersion, nil
|
||||
@ -237,6 +237,6 @@ func (a *App) GetLatestVersion(latestVersionUrl string) (*model.GithubReleaseInf
|
||||
return releaseInfoResponse, nil
|
||||
}
|
||||
|
||||
func (a *App) ClearLatestVersionCache() {
|
||||
func (a *App) ClearLatestVersionCache(rctx request.CTX) {
|
||||
latestVersionCache.Remove("latest_version_cache")
|
||||
}
|
||||
|
@ -37,13 +37,13 @@ func TestGetLatestVersion(t *testing.T) {
|
||||
defer ts.Close()
|
||||
|
||||
t.Run("get latest mm version happy path", func(t *testing.T) {
|
||||
_, err := th.App.GetLatestVersion(ts.URL)
|
||||
_, err := th.App.GetLatestVersion(th.Context, ts.URL)
|
||||
require.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("get latest mm version from cache", func(t *testing.T) {
|
||||
th.App.ClearLatestVersionCache()
|
||||
originalResult, err := th.App.GetLatestVersion(ts.URL)
|
||||
th.App.ClearLatestVersionCache(th.Context)
|
||||
originalResult, err := th.App.GetLatestVersion(th.Context, ts.URL)
|
||||
require.Nil(t, err)
|
||||
|
||||
// Call same function but mock the GET request to return a different result.
|
||||
@ -66,14 +66,14 @@ func TestGetLatestVersion(t *testing.T) {
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
cachedResult, err := th.App.GetLatestVersion(updatedServer.URL)
|
||||
cachedResult, err := th.App.GetLatestVersion(th.Context, updatedServer.URL)
|
||||
require.Nil(t, err)
|
||||
|
||||
require.Equal(t, originalResult.TagName, cachedResult.TagName, "did not get cached result")
|
||||
})
|
||||
|
||||
t.Run("get latest mm version error from external", func(t *testing.T) {
|
||||
th.App.ClearLatestVersionCache()
|
||||
th.App.ClearLatestVersionCache(th.Context)
|
||||
errorServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte(`
|
||||
@ -84,7 +84,7 @@ func TestGetLatestVersion(t *testing.T) {
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
_, appErr := th.App.GetLatestVersion(errorServer.URL)
|
||||
_, appErr := th.App.GetLatestVersion(th.Context, errorServer.URL)
|
||||
require.NotNil(t, appErr)
|
||||
})
|
||||
}
|
||||
|
@ -459,7 +459,7 @@ type AppIface interface {
|
||||
CheckUserPreflightAuthenticationCriteria(rctx request.CTX, user *model.User, mfaToken string) *model.AppError
|
||||
CheckWebConn(userID, connectionID string) *platform.CheckConnResult
|
||||
ClearChannelMembersCache(c request.CTX, channelID string) error
|
||||
ClearLatestVersionCache()
|
||||
ClearLatestVersionCache(rctx request.CTX)
|
||||
ClearSessionCacheForAllUsers()
|
||||
ClearSessionCacheForAllUsersSkipClusterSend()
|
||||
ClearSessionCacheForUser(userID string)
|
||||
@ -630,7 +630,7 @@ type AppIface interface {
|
||||
GetChannelsUserNotIn(c request.CTX, teamID string, userID string, offset int, limit int) (model.ChannelList, *model.AppError)
|
||||
GetCloudSession(token string) (*model.Session, *model.AppError)
|
||||
GetClusterId() string
|
||||
GetClusterStatus() []*model.ClusterInfo
|
||||
GetClusterStatus(rctx request.CTX) []*model.ClusterInfo
|
||||
GetCommand(commandID string) (*model.Command, *model.AppError)
|
||||
GetCommonTeamIDsForTwoUsers(userID, otherUserID string) ([]string, *model.AppError)
|
||||
GetComplianceFile(job *model.Compliance) ([]byte, *model.AppError)
|
||||
@ -684,9 +684,9 @@ type AppIface interface {
|
||||
GetJobsByTypes(c request.CTX, jobTypes []string, offset int, limit int) ([]*model.Job, *model.AppError)
|
||||
GetJobsByTypesPage(c request.CTX, jobType []string, page int, perPage int) ([]*model.Job, *model.AppError)
|
||||
GetLatestTermsOfService() (*model.TermsOfService, *model.AppError)
|
||||
GetLatestVersion(latestVersionUrl string) (*model.GithubReleaseInfo, *model.AppError)
|
||||
GetLogs(c request.CTX, page, perPage int) ([]string, *model.AppError)
|
||||
GetLogsSkipSend(page, perPage int, logFilter *model.LogFilter) ([]string, *model.AppError)
|
||||
GetLatestVersion(rctx request.CTX, latestVersionUrl string) (*model.GithubReleaseInfo, *model.AppError)
|
||||
GetLogs(rctx request.CTX, page, perPage int) ([]string, *model.AppError)
|
||||
GetLogsSkipSend(rctx request.CTX, page, perPage int, logFilter *model.LogFilter) ([]string, *model.AppError)
|
||||
GetMemberCountsByGroup(rctx request.CTX, channelID string, includeTimezones bool) ([]*model.ChannelMemberCountByGroup, *model.AppError)
|
||||
GetMessageForNotification(post *model.Post, translateFunc i18n.TranslateFunc) string
|
||||
GetMultipleEmojiByName(c request.CTX, names []string) ([]*model.Emoji, *model.AppError)
|
||||
@ -954,9 +954,9 @@ type AppIface interface {
|
||||
PublishUserTyping(userID, channelID, parentId string) *model.AppError
|
||||
PurgeBleveIndexes() *model.AppError
|
||||
PurgeElasticsearchIndexes() *model.AppError
|
||||
QueryLogs(c request.CTX, page, perPage int, logFilter *model.LogFilter) (map[string][]string, *model.AppError)
|
||||
QueryLogs(rctx request.CTX, page, perPage int, logFilter *model.LogFilter) (map[string][]string, *model.AppError)
|
||||
ReadFile(path string) ([]byte, *model.AppError)
|
||||
RecycleDatabaseConnection(c request.CTX)
|
||||
RecycleDatabaseConnection(rctx request.CTX)
|
||||
RegenCommandToken(cmd *model.Command) (*model.Command, *model.AppError)
|
||||
RegenOutgoingWebhookToken(hook *model.OutgoingWebhook) (*model.OutgoingWebhook, *model.AppError)
|
||||
RegenerateOAuthAppSecret(app *model.OAuthApp) (*model.OAuthApp, *model.AppError)
|
||||
@ -1097,11 +1097,11 @@ type AppIface interface {
|
||||
TeamMembersToRemove(teamID *string) ([]*model.TeamMember, *model.AppError)
|
||||
TelemetryId() string
|
||||
TestElasticsearch(cfg *model.Config) *model.AppError
|
||||
TestEmail(userID string, cfg *model.Config) *model.AppError
|
||||
TestEmail(rctx request.CTX, userID string, cfg *model.Config) *model.AppError
|
||||
TestFileStoreConnection() *model.AppError
|
||||
TestFileStoreConnectionWithConfig(cfg *model.FileSettings) *model.AppError
|
||||
TestLdap(rctx request.CTX) *model.AppError
|
||||
TestSiteURL(siteURL string) *model.AppError
|
||||
TestSiteURL(rctx request.CTX, siteURL string) *model.AppError
|
||||
Timezones() *timezones.Timezones
|
||||
ToggleMuteChannel(c request.CTX, channelID, userID string) (*model.ChannelMember, *model.AppError)
|
||||
TotalWebsocketConnections() int
|
||||
|
@ -1460,7 +1460,7 @@ func (a *OpenTracingAppLayer) ClearChannelMembersCache(c request.CTX, channelID
|
||||
return resultVar0
|
||||
}
|
||||
|
||||
func (a *OpenTracingAppLayer) ClearLatestVersionCache() {
|
||||
func (a *OpenTracingAppLayer) ClearLatestVersionCache(rctx request.CTX) {
|
||||
origCtx := a.ctx
|
||||
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.ClearLatestVersionCache")
|
||||
|
||||
@ -1472,7 +1472,7 @@ func (a *OpenTracingAppLayer) ClearLatestVersionCache() {
|
||||
}()
|
||||
|
||||
defer span.Finish()
|
||||
a.app.ClearLatestVersionCache()
|
||||
a.app.ClearLatestVersionCache(rctx)
|
||||
}
|
||||
|
||||
func (a *OpenTracingAppLayer) ClearSessionCacheForAllUsers() {
|
||||
@ -5844,7 +5844,7 @@ func (a *OpenTracingAppLayer) GetClusterPluginStatuses() (model.PluginStatuses,
|
||||
return resultVar0, resultVar1
|
||||
}
|
||||
|
||||
func (a *OpenTracingAppLayer) GetClusterStatus() []*model.ClusterInfo {
|
||||
func (a *OpenTracingAppLayer) GetClusterStatus(rctx request.CTX) []*model.ClusterInfo {
|
||||
origCtx := a.ctx
|
||||
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.GetClusterStatus")
|
||||
|
||||
@ -5856,7 +5856,7 @@ func (a *OpenTracingAppLayer) GetClusterStatus() []*model.ClusterInfo {
|
||||
}()
|
||||
|
||||
defer span.Finish()
|
||||
resultVar0 := a.app.GetClusterStatus()
|
||||
resultVar0 := a.app.GetClusterStatus(rctx)
|
||||
|
||||
return resultVar0
|
||||
}
|
||||
@ -7210,7 +7210,7 @@ func (a *OpenTracingAppLayer) GetLatestTermsOfService() (*model.TermsOfService,
|
||||
return resultVar0, resultVar1
|
||||
}
|
||||
|
||||
func (a *OpenTracingAppLayer) GetLatestVersion(latestVersionUrl string) (*model.GithubReleaseInfo, *model.AppError) {
|
||||
func (a *OpenTracingAppLayer) GetLatestVersion(rctx request.CTX, latestVersionUrl string) (*model.GithubReleaseInfo, *model.AppError) {
|
||||
origCtx := a.ctx
|
||||
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.GetLatestVersion")
|
||||
|
||||
@ -7222,7 +7222,7 @@ func (a *OpenTracingAppLayer) GetLatestVersion(latestVersionUrl string) (*model.
|
||||
}()
|
||||
|
||||
defer span.Finish()
|
||||
resultVar0, resultVar1 := a.app.GetLatestVersion(latestVersionUrl)
|
||||
resultVar0, resultVar1 := a.app.GetLatestVersion(rctx, latestVersionUrl)
|
||||
|
||||
if resultVar1 != nil {
|
||||
span.LogFields(spanlog.Error(resultVar1))
|
||||
@ -7254,7 +7254,7 @@ func (a *OpenTracingAppLayer) GetLdapGroup(rctx request.CTX, ldapGroupID string)
|
||||
return resultVar0, resultVar1
|
||||
}
|
||||
|
||||
func (a *OpenTracingAppLayer) GetLogs(c request.CTX, page int, perPage int) ([]string, *model.AppError) {
|
||||
func (a *OpenTracingAppLayer) GetLogs(rctx request.CTX, page int, perPage int) ([]string, *model.AppError) {
|
||||
origCtx := a.ctx
|
||||
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.GetLogs")
|
||||
|
||||
@ -7266,7 +7266,7 @@ func (a *OpenTracingAppLayer) GetLogs(c request.CTX, page int, perPage int) ([]s
|
||||
}()
|
||||
|
||||
defer span.Finish()
|
||||
resultVar0, resultVar1 := a.app.GetLogs(c, page, perPage)
|
||||
resultVar0, resultVar1 := a.app.GetLogs(rctx, page, perPage)
|
||||
|
||||
if resultVar1 != nil {
|
||||
span.LogFields(spanlog.Error(resultVar1))
|
||||
@ -7276,7 +7276,7 @@ func (a *OpenTracingAppLayer) GetLogs(c request.CTX, page int, perPage int) ([]s
|
||||
return resultVar0, resultVar1
|
||||
}
|
||||
|
||||
func (a *OpenTracingAppLayer) GetLogsSkipSend(page int, perPage int, logFilter *model.LogFilter) ([]string, *model.AppError) {
|
||||
func (a *OpenTracingAppLayer) GetLogsSkipSend(rctx request.CTX, page int, perPage int, logFilter *model.LogFilter) ([]string, *model.AppError) {
|
||||
origCtx := a.ctx
|
||||
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.GetLogsSkipSend")
|
||||
|
||||
@ -7288,7 +7288,7 @@ func (a *OpenTracingAppLayer) GetLogsSkipSend(page int, perPage int, logFilter *
|
||||
}()
|
||||
|
||||
defer span.Finish()
|
||||
resultVar0, resultVar1 := a.app.GetLogsSkipSend(page, perPage, logFilter)
|
||||
resultVar0, resultVar1 := a.app.GetLogsSkipSend(rctx, page, perPage, logFilter)
|
||||
|
||||
if resultVar1 != nil {
|
||||
span.LogFields(spanlog.Error(resultVar1))
|
||||
@ -13505,7 +13505,7 @@ func (a *OpenTracingAppLayer) PurgeElasticsearchIndexes() *model.AppError {
|
||||
return resultVar0
|
||||
}
|
||||
|
||||
func (a *OpenTracingAppLayer) QueryLogs(c request.CTX, page int, perPage int, logFilter *model.LogFilter) (map[string][]string, *model.AppError) {
|
||||
func (a *OpenTracingAppLayer) QueryLogs(rctx request.CTX, page int, perPage int, logFilter *model.LogFilter) (map[string][]string, *model.AppError) {
|
||||
origCtx := a.ctx
|
||||
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.QueryLogs")
|
||||
|
||||
@ -13517,7 +13517,7 @@ func (a *OpenTracingAppLayer) QueryLogs(c request.CTX, page int, perPage int, lo
|
||||
}()
|
||||
|
||||
defer span.Finish()
|
||||
resultVar0, resultVar1 := a.app.QueryLogs(c, page, perPage, logFilter)
|
||||
resultVar0, resultVar1 := a.app.QueryLogs(rctx, page, perPage, logFilter)
|
||||
|
||||
if resultVar1 != nil {
|
||||
span.LogFields(spanlog.Error(resultVar1))
|
||||
@ -13549,7 +13549,7 @@ func (a *OpenTracingAppLayer) ReadFile(path string) ([]byte, *model.AppError) {
|
||||
return resultVar0, resultVar1
|
||||
}
|
||||
|
||||
func (a *OpenTracingAppLayer) RecycleDatabaseConnection(c request.CTX) {
|
||||
func (a *OpenTracingAppLayer) RecycleDatabaseConnection(rctx request.CTX) {
|
||||
origCtx := a.ctx
|
||||
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.RecycleDatabaseConnection")
|
||||
|
||||
@ -13561,7 +13561,7 @@ func (a *OpenTracingAppLayer) RecycleDatabaseConnection(c request.CTX) {
|
||||
}()
|
||||
|
||||
defer span.Finish()
|
||||
a.app.RecycleDatabaseConnection(c)
|
||||
a.app.RecycleDatabaseConnection(rctx)
|
||||
}
|
||||
|
||||
func (a *OpenTracingAppLayer) RegenCommandToken(cmd *model.Command) (*model.Command, *model.AppError) {
|
||||
@ -16855,7 +16855,7 @@ func (a *OpenTracingAppLayer) TestElasticsearch(cfg *model.Config) *model.AppErr
|
||||
return resultVar0
|
||||
}
|
||||
|
||||
func (a *OpenTracingAppLayer) TestEmail(userID string, cfg *model.Config) *model.AppError {
|
||||
func (a *OpenTracingAppLayer) TestEmail(rctx request.CTX, userID string, cfg *model.Config) *model.AppError {
|
||||
origCtx := a.ctx
|
||||
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.TestEmail")
|
||||
|
||||
@ -16867,7 +16867,7 @@ func (a *OpenTracingAppLayer) TestEmail(userID string, cfg *model.Config) *model
|
||||
}()
|
||||
|
||||
defer span.Finish()
|
||||
resultVar0 := a.app.TestEmail(userID, cfg)
|
||||
resultVar0 := a.app.TestEmail(rctx, userID, cfg)
|
||||
|
||||
if resultVar0 != nil {
|
||||
span.LogFields(spanlog.Error(resultVar0))
|
||||
@ -16943,7 +16943,7 @@ func (a *OpenTracingAppLayer) TestLdap(rctx request.CTX) *model.AppError {
|
||||
return resultVar0
|
||||
}
|
||||
|
||||
func (a *OpenTracingAppLayer) TestSiteURL(siteURL string) *model.AppError {
|
||||
func (a *OpenTracingAppLayer) TestSiteURL(rctx request.CTX, siteURL string) *model.AppError {
|
||||
origCtx := a.ctx
|
||||
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.TestSiteURL")
|
||||
|
||||
@ -16955,7 +16955,7 @@ func (a *OpenTracingAppLayer) TestSiteURL(siteURL string) *model.AppError {
|
||||
}()
|
||||
|
||||
defer span.Finish()
|
||||
resultVar0 := a.app.TestSiteURL(siteURL)
|
||||
resultVar0 := a.app.TestSiteURL(rctx, siteURL)
|
||||
|
||||
if resultVar0 != nil {
|
||||
span.LogFields(spanlog.Error(resultVar0))
|
||||
|
@ -15,7 +15,7 @@ import (
|
||||
)
|
||||
|
||||
type BatchMigrationWorkerAppIFace interface {
|
||||
GetClusterStatus() []*model.ClusterInfo
|
||||
GetClusterStatus(rctx request.CTX) []*model.ClusterInfo
|
||||
}
|
||||
|
||||
// BatchMigrationWorker processes database migration jobs in batches to help avoid table locks.
|
||||
@ -110,11 +110,11 @@ func (worker *BatchMigrationWorker) IsEnabled(_ *model.Config) bool {
|
||||
|
||||
// checkIsClusterInSync returns true if all nodes in the cluster are running the same version,
|
||||
// logging a warning on the first mismatch found.
|
||||
func (worker *BatchMigrationWorker) checkIsClusterInSync() bool {
|
||||
clusterStatus := worker.app.GetClusterStatus()
|
||||
func (worker *BatchMigrationWorker) checkIsClusterInSync(rctx request.CTX) bool {
|
||||
clusterStatus := worker.app.GetClusterStatus(rctx)
|
||||
for i := 1; i < len(clusterStatus); i++ {
|
||||
if clusterStatus[i].SchemaVersion != clusterStatus[0].SchemaVersion {
|
||||
worker.logger.Warn(
|
||||
rctx.Logger().Warn(
|
||||
"Worker: cluster not in sync",
|
||||
mlog.String("schema_version_a", clusterStatus[0].SchemaVersion),
|
||||
mlog.String("schema_version_b", clusterStatus[1].SchemaVersion),
|
||||
@ -172,7 +172,7 @@ func (worker *BatchMigrationWorker) DoJob(job *model.Job) {
|
||||
// Ensure the cluster remains in sync, otherwise we restart the job to
|
||||
// ensure a complete migration. Technically, the cluster could go out of
|
||||
// sync briefly within a batch, but we accept that risk.
|
||||
if !worker.checkIsClusterInSync() {
|
||||
if !worker.checkIsClusterInSync(c) {
|
||||
worker.logger.Warn("Worker: Resetting job")
|
||||
worker.resetJob(logger, job)
|
||||
return
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/mattermost/mattermost/server/public/model"
|
||||
"github.com/mattermost/mattermost/server/public/shared/request"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/jobs"
|
||||
"github.com/mattermost/mattermost/server/v8/channels/store"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@ -19,7 +20,7 @@ type MockApp struct {
|
||||
clusterInfo []*model.ClusterInfo
|
||||
}
|
||||
|
||||
func (ma MockApp) GetClusterStatus() []*model.ClusterInfo {
|
||||
func (ma MockApp) GetClusterStatus(rctx request.CTX) []*model.ClusterInfo {
|
||||
return ma.clusterInfo
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user