MM-12849 Moving all non request scoped items to Server struct (#9806)

* Moving goroutine pool

* Auto refactor

* Moving plugins.

* Auto refactor

* Moving fields to server

* Auto refactor

* Removing siteurl duplication.

* Moving reset of app fields

* Auto refactor

* Formatting

* Moving niling of Server to after last use

* Fixing unit tests.
This commit is contained in:
Christopher Speller
2018-11-07 10:20:07 -08:00
committed by GitHub
parent 0dcbecac87
commit ecade2f1ec
52 changed files with 388 additions and 385 deletions

View File

@@ -845,7 +845,7 @@ func updateUserActive(c *Context, w http.ResponseWriter, r *http.Request) {
c.LogAuditWithUserId(user.Id, fmt.Sprintf("active=%v", active))
if isSelfDeactive {
c.App.Go(func() {
c.App.Srv.Go(func() {
if err = c.App.SendDeactivateAccountEmail(user.Email, user.Locale, c.App.GetSiteURL()); err != nil {
mlog.Error(err.Error())
}

View File

@@ -139,7 +139,7 @@ func (a *App) InvalidateAllCaches() *model.AppError {
func (a *App) InvalidateAllCachesSkipSend() {
mlog.Info("Purging all caches")
a.sessionCache.Purge()
a.Srv.sessionCache.Purge()
ClearStatusCache()
a.Srv.Store.Channel().ClearCaches()
a.Srv.Store.User().ClearCaches()
@@ -212,8 +212,8 @@ func (a *App) RecycleDatabaseConnection() {
oldStore := a.Srv.Store
mlog.Warn("Attempting to recycle the database connection.")
a.Srv.Store = a.newStore()
a.Jobs.Store = a.Srv.Store
a.Srv.Store = a.Srv.newStore()
a.Srv.Jobs.Store = a.Srv.Store
if a.Srv.Store != oldStore {
time.Sleep(20 * time.Second)

View File

@@ -4,19 +4,15 @@
package app
import (
"crypto/ecdsa"
"fmt"
"html/template"
"net/http"
"path"
"reflect"
"strconv"
"sync"
"sync/atomic"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/throttled/throttled"
"github.com/mattermost/mattermost-server/einterfaces"
ejobs "github.com/mattermost/mattermost-server/einterfaces/jobs"
@@ -24,7 +20,6 @@ import (
tjobs "github.com/mattermost/mattermost-server/jobs/interfaces"
"github.com/mattermost/mattermost-server/mlog"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/plugin"
"github.com/mattermost/mattermost-server/services/httpservice"
"github.com/mattermost/mattermost-server/store"
"github.com/mattermost/mattermost-server/store/sqlstore"
@@ -35,26 +30,10 @@ const ADVANCED_PERMISSIONS_MIGRATION_KEY = "AdvancedPermissionsMigrationComplete
const EMOJIS_PERMISSIONS_MIGRATION_KEY = "EmojisPermissionsMigrationComplete"
type App struct {
goroutineCount int32
goroutineExitSignal chan struct{}
Srv *Server
Log *mlog.Logger
Plugins *plugin.Environment
PluginConfigListenerId string
EmailBatching *EmailBatchingJob
EmailRateLimiter *throttled.GCRARateLimiter
Hubs []*Hub
HubsStopCheckingForDeadlock chan bool
PushNotificationsHub PushNotificationsHub
Jobs *jobs.JobServer
AccountMigration einterfaces.AccountMigrationInterface
Cluster einterfaces.ClusterInterface
Compliance einterfaces.ComplianceInterface
@@ -66,42 +45,6 @@ type App struct {
Mfa einterfaces.MfaInterface
Saml einterfaces.SamlInterface
config atomic.Value
envConfig map[string]interface{}
configFile string
configListeners map[string]func(*model.Config, *model.Config)
clusterLeaderListeners sync.Map
licenseValue atomic.Value
clientLicenseValue atomic.Value
licenseListeners map[string]func()
timezones atomic.Value
siteURL string
newStore func() store.Store
htmlTemplateWatcher *utils.HTMLTemplateWatcher
sessionCache *utils.Cache
configListenerId string
licenseListenerId string
logListenerId string
clusterLeaderListenerId string
disableConfigWatch bool
configWatcher *utils.ConfigWatcher
asymmetricSigningKey *ecdsa.PrivateKey
pluginCommands []*PluginCommand
pluginCommandsLock sync.RWMutex
clientConfig map[string]string
clientConfigHash string
limitedClientConfig map[string]string
diagnosticId string
phase2PermissionsMigrationComplete bool
HTTPService httpservice.HTTPService
}
@@ -118,15 +61,15 @@ func New(options ...Option) (outApp *App, outErr error) {
rootRouter := mux.NewRouter()
app := &App{
goroutineExitSignal: make(chan struct{}, 1),
Srv: &Server{
RootRouter: rootRouter,
goroutineExitSignal: make(chan struct{}, 1),
RootRouter: rootRouter,
configFile: "config.json",
configListeners: make(map[string]func(*model.Config, *model.Config)),
licenseListeners: map[string]func(){},
sessionCache: utils.NewLru(model.SESSION_CACHE_SIZE),
clientConfig: make(map[string]string),
},
sessionCache: utils.NewLru(model.SESSION_CACHE_SIZE),
configFile: "config.json",
configListeners: make(map[string]func(*model.Config, *model.Config)),
clientConfig: make(map[string]string),
licenseListeners: map[string]func(){},
}
app.HTTPService = httpservice.MakeHTTPService(app)
@@ -151,7 +94,7 @@ func New(options ...Option) (outApp *App, outErr error) {
}
model.AppErrorInit(utils.T)
if err := app.LoadConfig(app.configFile); err != nil {
if err := app.LoadConfig(app.Srv.configFile); err != nil {
return nil, err
}
@@ -164,7 +107,7 @@ func New(options ...Option) (outApp *App, outErr error) {
// Use this app logger as the global logger (eventually remove all instances of global logging)
mlog.InitGlobalLogger(app.Log)
app.logListenerId = app.AddConfigListener(func(_, after *model.Config) {
app.Srv.logListenerId = app.AddConfigListener(func(_, after *model.Config) {
app.Log.ChangeLevels(utils.MloggerConfigFromLoggerConfig(&after.LogSettings))
})
@@ -176,22 +119,22 @@ func New(options ...Option) (outApp *App, outErr error) {
return nil, errors.Wrapf(err, "unable to load Mattermost translation files")
}
app.configListenerId = app.AddConfigListener(func(_, _ *model.Config) {
app.Srv.configListenerId = app.AddConfigListener(func(_, _ *model.Config) {
app.configOrLicenseListener()
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CONFIG_CHANGED, "", "", "", nil)
message.Add("config", app.ClientConfigWithComputed())
app.Go(func() {
app.Srv.Go(func() {
app.Publish(message)
})
})
app.licenseListenerId = app.AddLicenseListener(func() {
app.Srv.licenseListenerId = app.AddLicenseListener(func() {
app.configOrLicenseListener()
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_LICENSE_CHANGED, "", "", "", nil)
message.Add("license", app.GetSanitizedClientLicense())
app.Go(func() {
app.Srv.Go(func() {
app.Publish(message)
})
@@ -205,8 +148,8 @@ func New(options ...Option) (outApp *App, outErr error) {
app.initEnterprise()
if app.newStore == nil {
app.newStore = func() store.Store {
if app.Srv.newStore == nil {
app.Srv.newStore = func() store.Store {
return store.NewLayeredStore(sqlstore.NewSqlSupplier(app.Config().SqlSettings, app.Metrics), app.Metrics, app.Cluster)
}
}
@@ -214,10 +157,10 @@ func New(options ...Option) (outApp *App, outErr error) {
if htmlTemplateWatcher, err := utils.NewHTMLTemplateWatcher("templates"); err != nil {
mlog.Error(fmt.Sprintf("Failed to parse server templates %v", err))
} else {
app.htmlTemplateWatcher = htmlTemplateWatcher
app.Srv.htmlTemplateWatcher = htmlTemplateWatcher
}
app.Srv.Store = app.newStore()
app.Srv.Store = app.Srv.newStore()
if err := app.ensureAsymmetricSigningKey(); err != nil {
return nil, errors.Wrapf(err, "unable to ensure asymmetric signing key")
@@ -235,9 +178,9 @@ func New(options ...Option) (outApp *App, outErr error) {
app.initJobs()
})
app.clusterLeaderListenerId = app.AddClusterLeaderChangedListener(func() {
app.Srv.clusterLeaderListenerId = app.AddClusterLeaderChangedListener(func() {
mlog.Info("Cluster leader changed. Determining if job schedulers should be running:", mlog.Bool("isLeader", app.IsLeader()))
app.Jobs.Schedulers.HandleClusterLeaderChange(app.IsLeader())
app.Srv.Jobs.Schedulers.HandleClusterLeaderChange(app.IsLeader())
})
subpath, err := utils.GetSubpathFromConfig(app.Config())
@@ -279,26 +222,26 @@ func (a *App) Shutdown() {
a.StopPushNotificationsHubWorkers()
a.ShutDownPlugins()
a.WaitForGoroutines()
a.Srv.WaitForGoroutines()
if a.Srv.Store != nil {
a.Srv.Store.Close()
}
a.Srv = nil
if a.htmlTemplateWatcher != nil {
a.htmlTemplateWatcher.Close()
if a.Srv.htmlTemplateWatcher != nil {
a.Srv.htmlTemplateWatcher.Close()
}
a.RemoveConfigListener(a.configListenerId)
a.RemoveLicenseListener(a.licenseListenerId)
a.RemoveConfigListener(a.logListenerId)
a.RemoveClusterLeaderChangedListener(a.clusterLeaderListenerId)
a.RemoveConfigListener(a.Srv.configListenerId)
a.RemoveLicenseListener(a.Srv.licenseListenerId)
a.RemoveConfigListener(a.Srv.logListenerId)
a.RemoveClusterLeaderChangedListener(a.Srv.clusterLeaderListenerId)
mlog.Info("Server stopped")
a.DisableConfigWatch()
a.HTTPService.Close()
a.Srv = nil
}
var accountMigrationInterface func(*App) einterfaces.AccountMigrationInterface
@@ -439,39 +382,39 @@ func (a *App) initEnterprise() {
}
func (a *App) initJobs() {
a.Jobs = jobs.NewJobServer(a, a.Srv.Store)
a.Srv.Jobs = jobs.NewJobServer(a, a.Srv.Store)
if jobsDataRetentionJobInterface != nil {
a.Jobs.DataRetentionJob = jobsDataRetentionJobInterface(a)
a.Srv.Jobs.DataRetentionJob = jobsDataRetentionJobInterface(a)
}
if jobsMessageExportJobInterface != nil {
a.Jobs.MessageExportJob = jobsMessageExportJobInterface(a)
a.Srv.Jobs.MessageExportJob = jobsMessageExportJobInterface(a)
}
if jobsElasticsearchAggregatorInterface != nil {
a.Jobs.ElasticsearchAggregator = jobsElasticsearchAggregatorInterface(a)
a.Srv.Jobs.ElasticsearchAggregator = jobsElasticsearchAggregatorInterface(a)
}
if jobsElasticsearchIndexerInterface != nil {
a.Jobs.ElasticsearchIndexer = jobsElasticsearchIndexerInterface(a)
a.Srv.Jobs.ElasticsearchIndexer = jobsElasticsearchIndexerInterface(a)
}
if jobsLdapSyncInterface != nil {
a.Jobs.LdapSync = jobsLdapSyncInterface(a)
a.Srv.Jobs.LdapSync = jobsLdapSyncInterface(a)
}
if jobsMigrationsInterface != nil {
a.Jobs.Migrations = jobsMigrationsInterface(a)
a.Srv.Jobs.Migrations = jobsMigrationsInterface(a)
}
a.Jobs.Workers = a.Jobs.InitWorkers()
a.Jobs.Schedulers = a.Jobs.InitSchedulers()
a.Srv.Jobs.Workers = a.Srv.Jobs.InitWorkers()
a.Srv.Jobs.Schedulers = a.Srv.Jobs.InitSchedulers()
}
func (a *App) DiagnosticId() string {
return a.diagnosticId
return a.Srv.diagnosticId
}
func (a *App) SetDiagnosticId(id string) {
a.diagnosticId = id
a.Srv.diagnosticId = id
}
func (a *App) EnsureDiagnosticId() {
if a.diagnosticId != "" {
if a.Srv.diagnosticId != "" {
return
}
if result := <-a.Srv.Store.System().Get(); result.Err == nil {
@@ -484,36 +427,13 @@ func (a *App) EnsureDiagnosticId() {
<-a.Srv.Store.System().Save(systemId)
}
a.diagnosticId = id
}
}
// Go creates a goroutine, but maintains a record of it to ensure that execution completes before
// the app is destroyed.
func (a *App) Go(f func()) {
atomic.AddInt32(&a.goroutineCount, 1)
go func() {
f()
atomic.AddInt32(&a.goroutineCount, -1)
select {
case a.goroutineExitSignal <- struct{}{}:
default:
}
}()
}
// WaitForGoroutines blocks until all goroutines created by App.Go exit.
func (a *App) WaitForGoroutines() {
for atomic.LoadInt32(&a.goroutineCount) != 0 {
<-a.goroutineExitSignal
a.Srv.diagnosticId = id
}
}
func (a *App) HTMLTemplates() *template.Template {
if a.htmlTemplateWatcher != nil {
return a.htmlTemplateWatcher.Templates()
if a.Srv.htmlTemplateWatcher != nil {
return a.Srv.htmlTemplateWatcher.Templates()
}
return nil
@@ -596,7 +516,7 @@ func (a *App) SetPhase2PermissionsMigrationStatus(isComplete bool) error {
return res.Err
}
}
a.phase2PermissionsMigrationComplete = isComplete
a.Srv.phase2PermissionsMigrationComplete = isComplete
return nil
}
@@ -670,7 +590,7 @@ func (a *App) DoEmojisPermissionsMigration() {
}
func (a *App) StartElasticsearch() {
a.Go(func() {
a.Srv.Go(func() {
if err := a.Elasticsearch.Start(); err != nil {
mlog.Error(err.Error())
}
@@ -678,19 +598,19 @@ func (a *App) StartElasticsearch() {
a.AddConfigListener(func(oldConfig *model.Config, newConfig *model.Config) {
if !*oldConfig.ElasticsearchSettings.EnableIndexing && *newConfig.ElasticsearchSettings.EnableIndexing {
a.Go(func() {
a.Srv.Go(func() {
if err := a.Elasticsearch.Start(); err != nil {
mlog.Error(err.Error())
}
})
} else if *oldConfig.ElasticsearchSettings.EnableIndexing && !*newConfig.ElasticsearchSettings.EnableIndexing {
a.Go(func() {
a.Srv.Go(func() {
if err := a.Elasticsearch.Stop(); err != nil {
mlog.Error(err.Error())
}
})
} else if *oldConfig.ElasticsearchSettings.Password != *newConfig.ElasticsearchSettings.Password || *oldConfig.ElasticsearchSettings.Username != *newConfig.ElasticsearchSettings.Username || *oldConfig.ElasticsearchSettings.ConnectionUrl != *newConfig.ElasticsearchSettings.ConnectionUrl || *oldConfig.ElasticsearchSettings.Sniff != *newConfig.ElasticsearchSettings.Sniff {
a.Go(func() {
a.Srv.Go(func() {
if *oldConfig.ElasticsearchSettings.EnableIndexing {
if err := a.Elasticsearch.Stop(); err != nil {
mlog.Error(err.Error())
@@ -705,13 +625,13 @@ func (a *App) StartElasticsearch() {
a.AddLicenseListener(func() {
if a.License() != nil {
a.Go(func() {
a.Srv.Go(func() {
if err := a.Elasticsearch.Start(); err != nil {
mlog.Error(err.Error())
}
})
} else {
a.Go(func() {
a.Srv.Go(func() {
if err := a.Elasticsearch.Stop(); err != nil {
mlog.Error(err.Error())
}

View File

@@ -246,8 +246,12 @@ func TestDoAdvancedPermissionsMigration(t *testing.T) {
restrictPrivateChannel := *th.App.Config().TeamSettings.DEPRECATED_DO_NOT_USE_RestrictPrivateChannelManagement
defer func() {
th.App.UpdateConfig(func(cfg *model.Config) { *cfg.TeamSettings.DEPRECATED_DO_NOT_USE_RestrictPublicChannelManagement = restrictPublicChannel })
th.App.UpdateConfig(func(cfg *model.Config) { *cfg.TeamSettings.DEPRECATED_DO_NOT_USE_RestrictPrivateChannelManagement = restrictPrivateChannel })
th.App.UpdateConfig(func(cfg *model.Config) {
*cfg.TeamSettings.DEPRECATED_DO_NOT_USE_RestrictPublicChannelManagement = restrictPublicChannel
})
th.App.UpdateConfig(func(cfg *model.Config) {
*cfg.TeamSettings.DEPRECATED_DO_NOT_USE_RestrictPrivateChannelManagement = restrictPrivateChannel
})
}()
th.App.UpdateConfig(func(cfg *model.Config) {
@@ -433,8 +437,8 @@ func TestDoAdvancedPermissionsMigration(t *testing.T) {
postEditTimeLimit := *th.App.Config().ServiceSettings.PostEditTimeLimit
defer func() {
th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.DEPRECATED_DO_NOT_USE_AllowEditPost = allowEditPost})
th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.PostEditTimeLimit = postEditTimeLimit})
th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.DEPRECATED_DO_NOT_USE_AllowEditPost = allowEditPost })
th.App.UpdateConfig(func(cfg *model.Config) { *cfg.ServiceSettings.PostEditTimeLimit = postEditTimeLimit })
}()
th.App.UpdateConfig(func(cfg *model.Config) {

View File

@@ -212,9 +212,9 @@ func (a *App) CreateChannel(channel *model.Channel, addMember bool) (*model.Chan
}
if a.PluginsReady() {
a.Go(func() {
a.Srv.Go(func() {
pluginContext := &plugin.Context{}
a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
hooks.ChannelHasBeenCreated(pluginContext, sc)
return true
}, plugin.ChannelHasBeenCreatedId)
@@ -239,9 +239,9 @@ func (a *App) CreateDirectChannel(userId string, otherUserId string) (*model.Cha
a.InvalidateCacheForUser(otherUserId)
if a.PluginsReady() {
a.Go(func() {
a.Srv.Go(func() {
pluginContext := &plugin.Context{}
a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
hooks.ChannelHasBeenCreated(pluginContext, channel)
return true
}, plugin.ChannelHasBeenCreatedId)
@@ -854,9 +854,9 @@ func (a *App) AddChannelMember(userId string, channel *model.Channel, userReques
}
if a.PluginsReady() {
a.Go(func() {
a.Srv.Go(func() {
pluginContext := &plugin.Context{}
a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
hooks.UserHasJoinedChannel(pluginContext, cm, userRequestor)
return true
}, plugin.UserHasJoinedChannelId)
@@ -866,7 +866,7 @@ func (a *App) AddChannelMember(userId string, channel *model.Channel, userReques
if userRequestorId == "" || userId == userRequestorId {
a.postJoinChannelMessage(user, channel)
} else {
a.Go(func() {
a.Srv.Go(func() {
a.PostAddToChannelMessage(userRequestor, user, channel, postRootId)
})
}
@@ -1244,9 +1244,9 @@ func (a *App) JoinChannel(channel *model.Channel, userId string) *model.AppError
}
if a.PluginsReady() {
a.Go(func() {
a.Srv.Go(func() {
pluginContext := &plugin.Context{}
a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
hooks.UserHasJoinedChannel(pluginContext, cm, nil)
return true
}, plugin.UserHasJoinedChannelId)
@@ -1336,7 +1336,7 @@ func (a *App) LeaveChannel(channelId string, userId string) *model.AppError {
return nil
}
a.Go(func() {
a.Srv.Go(func() {
a.postLeaveChannelMessage(user, channel)
})
@@ -1451,9 +1451,9 @@ func (a *App) removeUserFromChannel(userIdToRemove string, removerUserId string,
actorUser, _ = a.GetUser(removerUserId)
}
a.Go(func() {
a.Srv.Go(func() {
pluginContext := &plugin.Context{}
a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
hooks.UserHasLeftChannel(pluginContext, cm, actorUser)
return true
}, plugin.UserHasLeftChannelId)
@@ -1489,7 +1489,7 @@ func (a *App) RemoveUserFromChannel(userIdToRemove string, removerUserId string,
if userIdToRemove == removerUserId {
a.postLeaveChannelMessage(user, channel)
} else {
a.Go(func() {
a.Srv.Go(func() {
a.postRemoveFromChannelMessage(removerUserId, user, channel)
})
}

View File

@@ -13,19 +13,19 @@ import (
// be called.
func (a *App) AddClusterLeaderChangedListener(listener func()) string {
id := model.NewId()
a.clusterLeaderListeners.Store(id, listener)
a.Srv.clusterLeaderListeners.Store(id, listener)
return id
}
// Removes a listener function by the unique ID returned when AddConfigListener was called
func (a *App) RemoveClusterLeaderChangedListener(id string) {
a.clusterLeaderListeners.Delete(id)
a.Srv.clusterLeaderListeners.Delete(id)
}
func (a *App) InvokeClusterLeaderChangedListeners() {
mlog.Info("Cluster leader changed. Invoking ClusterLeaderChanged listeners.")
a.Go(func() {
a.clusterLeaderListeners.Range(func(_, listener interface{}) bool {
a.Srv.Go(func() {
a.Srv.clusterLeaderListeners.Range(func(_, listener interface{}) bool {
listener.(func())()
return true
})

View File

@@ -78,7 +78,7 @@ func (me *EchoProvider) DoCommand(a *App, args *model.CommandArgs, message strin
}
echoSem <- true
a.Go(func() {
a.Srv.Go(func() {
defer func() { <-echoSem }()
post := &model.Post{}
post.ChannelId = args.ChannelId

View File

@@ -36,7 +36,7 @@ func (a *App) SaveComplianceReport(job *model.Compliance) (*model.Compliance, *m
}
job = result.Data.(*model.Compliance)
a.Go(func() {
a.Srv.Go(func() {
a.Compliance.RunComplianceJob(job)
})

View File

@@ -28,15 +28,15 @@ const (
)
func (a *App) Config() *model.Config {
if cfg := a.config.Load(); cfg != nil {
if cfg := a.Srv.config.Load(); cfg != nil {
return cfg.(*model.Config)
}
return &model.Config{}
}
func (a *App) EnvironmentConfig() map[string]interface{} {
if a.envConfig != nil {
return a.envConfig
if a.Srv.envConfig != nil {
return a.Srv.envConfig
}
return map[string]interface{}{}
}
@@ -45,7 +45,7 @@ func (a *App) UpdateConfig(f func(*model.Config)) {
old := a.Config()
updated := old.Clone()
f(updated)
a.config.Store(updated)
a.Srv.config.Store(updated)
a.InvokeConfigListeners(old, updated)
}
@@ -62,11 +62,10 @@ func (a *App) LoadConfig(configFile string) *model.AppError {
return err
}
*cfg.ServiceSettings.SiteURL = strings.TrimRight(*cfg.ServiceSettings.SiteURL, "/")
a.config.Store(cfg)
a.Srv.config.Store(cfg)
a.configFile = configPath
a.envConfig = envConfig
a.siteURL = *cfg.ServiceSettings.SiteURL
a.Srv.configFile = configPath
a.Srv.envConfig = envConfig
a.InvokeConfigListeners(old, cfg)
return nil
@@ -74,7 +73,7 @@ func (a *App) LoadConfig(configFile string) *model.AppError {
func (a *App) ReloadConfig() *model.AppError {
debug.FreeOSMemory()
if err := a.LoadConfig(a.configFile); err != nil {
if err := a.LoadConfig(a.Srv.configFile); err != nil {
return err
}
@@ -84,37 +83,37 @@ func (a *App) ReloadConfig() *model.AppError {
}
func (a *App) ConfigFileName() string {
return a.configFile
return a.Srv.configFile
}
func (a *App) ClientConfig() map[string]string {
return a.clientConfig
return a.Srv.clientConfig
}
func (a *App) ClientConfigHash() string {
return a.clientConfigHash
return a.Srv.clientConfigHash
}
func (a *App) LimitedClientConfig() map[string]string {
return a.limitedClientConfig
return a.Srv.limitedClientConfig
}
func (a *App) EnableConfigWatch() {
if a.configWatcher == nil && !a.disableConfigWatch {
if a.Srv.configWatcher == nil && !a.Srv.disableConfigWatch {
configWatcher, err := utils.NewConfigWatcher(a.ConfigFileName(), func() {
a.ReloadConfig()
})
if err != nil {
mlog.Error(fmt.Sprint(err))
}
a.configWatcher = configWatcher
a.Srv.configWatcher = configWatcher
}
}
func (a *App) DisableConfigWatch() {
if a.configWatcher != nil {
a.configWatcher.Close()
a.configWatcher = nil
if a.Srv.configWatcher != nil {
a.Srv.configWatcher.Close()
a.Srv.configWatcher = nil
}
}
@@ -123,17 +122,17 @@ func (a *App) DisableConfigWatch() {
// for the listener that can later be used to remove it.
func (a *App) AddConfigListener(listener func(*model.Config, *model.Config)) string {
id := model.NewId()
a.configListeners[id] = listener
a.Srv.configListeners[id] = listener
return id
}
// Removes a listener function by the unique ID returned when AddConfigListener was called
func (a *App) RemoveConfigListener(id string) {
delete(a.configListeners, id)
delete(a.Srv.configListeners, id)
}
func (a *App) InvokeConfigListeners(old, current *model.Config) {
for _, listener := range a.configListeners {
for _, listener := range a.Srv.configListeners {
listener(old, current)
}
}
@@ -141,7 +140,7 @@ func (a *App) InvokeConfigListeners(old, current *model.Config) {
// EnsureAsymmetricSigningKey ensures that an asymmetric signing key exists and future calls to
// AsymmetricSigningKey will always return a valid signing key.
func (a *App) ensureAsymmetricSigningKey() error {
if a.asymmetricSigningKey != nil {
if a.Srv.asymmetricSigningKey != nil {
return nil
}
@@ -202,7 +201,7 @@ func (a *App) ensureAsymmetricSigningKey() error {
default:
return fmt.Errorf("unknown curve: " + key.ECDSAKey.Curve)
}
a.asymmetricSigningKey = &ecdsa.PrivateKey{
a.Srv.asymmetricSigningKey = &ecdsa.PrivateKey{
PublicKey: ecdsa.PublicKey{
Curve: curve,
X: key.ECDSAKey.X,
@@ -240,31 +239,31 @@ func (a *App) ensureInstallationDate() error {
// AsymmetricSigningKey will return a private key that can be used for asymmetric signing.
func (a *App) AsymmetricSigningKey() *ecdsa.PrivateKey {
return a.asymmetricSigningKey
return a.Srv.asymmetricSigningKey
}
func (a *App) regenerateClientConfig() {
a.clientConfig = utils.GenerateClientConfig(a.Config(), a.DiagnosticId(), a.License())
a.Srv.clientConfig = utils.GenerateClientConfig(a.Config(), a.DiagnosticId(), a.License())
if a.clientConfig["EnableCustomTermsOfService"] == "true" {
if a.Srv.clientConfig["EnableCustomTermsOfService"] == "true" {
termsOfService, err := a.GetLatestTermsOfService()
if err != nil {
mlog.Err(err)
} else {
a.clientConfig["CustomTermsOfServiceId"] = termsOfService.Id
a.Srv.clientConfig["CustomTermsOfServiceId"] = termsOfService.Id
}
}
a.limitedClientConfig = utils.GenerateLimitedClientConfig(a.Config(), a.DiagnosticId(), a.License())
a.Srv.limitedClientConfig = utils.GenerateLimitedClientConfig(a.Config(), a.DiagnosticId(), a.License())
if key := a.AsymmetricSigningKey(); key != nil {
der, _ := x509.MarshalPKIXPublicKey(&key.PublicKey)
a.clientConfig["AsymmetricSigningPublicKey"] = base64.StdEncoding.EncodeToString(der)
a.limitedClientConfig["AsymmetricSigningPublicKey"] = base64.StdEncoding.EncodeToString(der)
a.Srv.clientConfig["AsymmetricSigningPublicKey"] = base64.StdEncoding.EncodeToString(der)
a.Srv.limitedClientConfig["AsymmetricSigningPublicKey"] = base64.StdEncoding.EncodeToString(der)
}
clientConfigJSON, _ := json.Marshal(a.clientConfig)
a.clientConfigHash = fmt.Sprintf("%x", md5.Sum(clientConfigJSON))
clientConfigJSON, _ := json.Marshal(a.Srv.clientConfig)
a.Srv.clientConfigHash = fmt.Sprintf("%x", md5.Sum(clientConfigJSON))
}
func (a *App) Desanitize(cfg *model.Config) {
@@ -322,7 +321,7 @@ func (a *App) GetCookieDomain() string {
}
func (a *App) GetSiteURL() string {
return a.siteURL
return *a.Config().ServiceSettings.SiteURL
}
// ClientConfigWithComputed gets the configuration in a format suitable for sending to the client.

View File

@@ -35,11 +35,12 @@ func TestLoadConfig(t *testing.T) {
require.Nil(t, err)
tempConfig.Close()
a := App{}
a := App{
Srv: &Server{},
}
appErr := a.LoadConfig(tempConfig.Name())
require.Nil(t, appErr)
assert.Equal(t, "http://localhost:8065", a.siteURL)
assert.Equal(t, "http://localhost:8065", *a.GetConfig().ServiceSettings.SiteURL)
}

View File

@@ -590,7 +590,7 @@ func (a *App) trackPlugins() {
settingsCount := 0
pluginStates := a.Config().PluginSettings.PluginStates
plugins, _ := a.Plugins.Available()
plugins, _ := a.Srv.Plugins.Available()
if pluginStates != nil && plugins != nil {
for _, plugin := range plugins {

View File

@@ -42,7 +42,7 @@ func (a *App) SetupInviteEmailRateLimiting() error {
return errors.Wrap(err, "Unable to setup email rate limiting GCRA rate limiter.")
}
a.EmailRateLimiter = rateLimiter
a.Srv.EmailRateLimiter = rateLimiter
return nil
}
@@ -286,11 +286,11 @@ func (a *App) SendMfaChangeEmail(email string, activated bool, locale, siteURL s
}
func (a *App) SendInviteEmails(team *model.Team, senderName string, senderUserId string, invites []string, siteURL string) {
if a.EmailRateLimiter == nil {
if a.Srv.EmailRateLimiter == nil {
a.Log.Error("Email invite not sent, rate limiting could not be setup.", mlog.String("user_id", senderUserId), mlog.String("team_id", team.Id))
return
}
rateLimited, result, err := a.EmailRateLimiter.RateLimit(senderUserId, len(invites))
rateLimited, result, err := a.Srv.EmailRateLimiter.RateLimit(senderUserId, len(invites))
if err != nil {
a.Log.Error("Error rate limiting invite email.", mlog.String("user_id", senderUserId), mlog.String("team_id", team.Id), mlog.Err(err))
return

View File

@@ -25,13 +25,13 @@ const (
func (a *App) InitEmailBatching() {
if *a.Config().EmailSettings.EnableEmailBatching {
if a.EmailBatching == nil {
a.EmailBatching = NewEmailBatchingJob(a, *a.Config().EmailSettings.EmailBatchingBufferSize)
if a.Srv.EmailBatching == nil {
a.Srv.EmailBatching = NewEmailBatchingJob(a, *a.Config().EmailSettings.EmailBatchingBufferSize)
}
// note that we don't support changing EmailBatchingBufferSize without restarting the server
a.EmailBatching.Start()
a.Srv.EmailBatching.Start()
}
}
@@ -40,7 +40,7 @@ func (a *App) AddNotificationEmailToBatch(user *model.User, post *model.Post, te
return model.NewAppError("AddNotificationEmailToBatch", "api.email_batching.add_notification_email_to_batch.disabled.app_error", nil, "", http.StatusNotImplemented)
}
if !a.EmailBatching.Add(user, post, team) {
if !a.Srv.EmailBatching.Add(user, post, team) {
mlog.Error("Email batching job's receiving channel was full. Please increase the EmailBatchingBufferSize.")
return model.NewAppError("AddNotificationEmailToBatch", "api.email_batching.add_notification_email_to_batch.channel_full.app_error", nil, "", http.StatusInternalServerError)
}
@@ -188,7 +188,7 @@ func (job *EmailBatchingJob) checkPendingNotifications(now time.Time, handler fu
// send the email notification if it's been long enough
if now.Sub(time.Unix(batchStartTime/1000, 0)) > time.Duration(interval)*time.Second {
job.app.Go(func(userId string, notifications []*batchedNotification) func() {
job.app.Srv.Go(func(userId string, notifications []*batchedNotification) func() {
return func() {
handler(userId, notifications)
}

View File

@@ -444,7 +444,7 @@ func (a *App) DoUploadFileExpectModification(now time.Time, rawTeamId string, ra
if a.PluginsReady() {
var rejectionError *model.AppError
pluginContext := &plugin.Context{}
a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
var newBytes bytes.Buffer
replacementInfo, rejectionReason := hooks.FileWillBeUploaded(pluginContext, info, bytes.NewReader(data), &newBytes)
if rejectionReason != "" {

View File

@@ -40,9 +40,9 @@ func (a *App) GetJobsByType(jobType string, offset int, limit int) ([]*model.Job
}
func (a *App) CreateJob(job *model.Job) (*model.Job, *model.AppError) {
return a.Jobs.CreateJob(job.Type, job.Data)
return a.Srv.Jobs.CreateJob(job.Type, job.Data)
}
func (a *App) CancelJob(jobId string) *model.AppError {
return a.Jobs.RequestCancellation(jobId)
return a.Srv.Jobs.RequestCancellation(jobId)
}

View File

@@ -13,7 +13,7 @@ import (
)
func (a *App) SyncLdap() {
a.Go(func() {
a.Srv.Go(func() {
if license := a.License(); license != nil && *license.Features.LDAP && *a.Config().LdapSettings.EnableSync {
if ldapI := a.Ldap; ldapI != nil {
@@ -67,7 +67,7 @@ func (a *App) SwitchEmailToLdap(email, password, code, ldapLoginId, ldapPassword
return "", err
}
a.Go(func() {
a.Srv.Go(func() {
if err := a.SendSignInChangeEmail(user.Email, "AD/LDAP", user.Locale, a.GetSiteURL()); err != nil {
mlog.Error(err.Error())
}
@@ -113,7 +113,7 @@ func (a *App) SwitchLdapToEmail(ldapPassword, code, email, newPassword string) (
T := utils.GetUserTranslations(user.Locale)
a.Go(func() {
a.Srv.Go(func() {
if err := a.SendSignInChangeEmail(user.Email, T("api.templates.signin_change_email.body.method_email"), user.Locale, a.GetSiteURL()); err != nil {
mlog.Error(err.Error())
}

View File

@@ -93,10 +93,10 @@ func (a *App) SaveLicense(licenseBytes []byte) (*model.License, *model.AppError)
// doesn't start until the server is restarted, which prevents the 'run job now' buttons in system console from
// functioning as expected
if *a.Config().JobSettings.RunJobs {
a.Jobs.StartWorkers()
a.Srv.Jobs.StartWorkers()
}
if *a.Config().JobSettings.RunScheduler {
a.Jobs.StartSchedulers()
a.Srv.Jobs.StartSchedulers()
}
return license, nil
@@ -104,13 +104,13 @@ func (a *App) SaveLicense(licenseBytes []byte) (*model.License, *model.AppError)
// License returns the currently active license or nil if the application is unlicensed.
func (a *App) License() *model.License {
license, _ := a.licenseValue.Load().(*model.License)
license, _ := a.Srv.licenseValue.Load().(*model.License)
return license
}
func (a *App) SetLicense(license *model.License) bool {
defer func() {
for _, listener := range a.licenseListeners {
for _, listener := range a.Srv.licenseListeners {
listener()
}
}()
@@ -119,14 +119,14 @@ func (a *App) SetLicense(license *model.License) bool {
license.Features.SetDefaults()
if !license.IsExpired() {
a.licenseValue.Store(license)
a.clientLicenseValue.Store(utils.GetClientLicense(license))
a.Srv.licenseValue.Store(license)
a.Srv.clientLicenseValue.Store(utils.GetClientLicense(license))
return true
}
}
a.licenseValue.Store((*model.License)(nil))
a.clientLicenseValue.Store(map[string]string(nil))
a.Srv.licenseValue.Store((*model.License)(nil))
a.Srv.clientLicenseValue.Store(map[string]string(nil))
return false
}
@@ -141,18 +141,18 @@ func (a *App) ValidateAndSetLicenseBytes(b []byte) {
}
func (a *App) SetClientLicense(m map[string]string) {
a.clientLicenseValue.Store(m)
a.Srv.clientLicenseValue.Store(m)
}
func (a *App) ClientLicense() map[string]string {
if clientLicense, _ := a.clientLicenseValue.Load().(map[string]string); clientLicense != nil {
if clientLicense, _ := a.Srv.clientLicenseValue.Load().(map[string]string); clientLicense != nil {
return clientLicense
}
return map[string]string{"IsLicensed": "false"}
}
func (a *App) RemoveLicense() *model.AppError {
if license, _ := a.licenseValue.Load().(*model.License); license == nil {
if license, _ := a.Srv.licenseValue.Load().(*model.License); license == nil {
return nil
}
@@ -174,12 +174,12 @@ func (a *App) RemoveLicense() *model.AppError {
func (a *App) AddLicenseListener(listener func()) string {
id := model.NewId()
a.licenseListeners[id] = listener
a.Srv.licenseListeners[id] = listener
return id
}
func (a *App) RemoveLicenseListener(id string) {
delete(a.licenseListeners, id)
delete(a.Srv.licenseListeners, id)
}
func (a *App) GetClientLicenseEtag(useSanitized bool) string {

View File

@@ -69,7 +69,7 @@ func (a *App) AuthenticateUserForLogin(id, loginId, password, mfaToken string, l
if a.PluginsReady() {
var rejectionReason string
pluginContext := &plugin.Context{}
a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
rejectionReason = hooks.UserWillLogIn(pluginContext, user)
return rejectionReason == ""
}, plugin.UserWillLogInId)
@@ -78,9 +78,9 @@ func (a *App) AuthenticateUserForLogin(id, loginId, password, mfaToken string, l
return nil, model.NewAppError("AuthenticateUserForLogin", "Login rejected by plugin: "+rejectionReason, nil, "", http.StatusBadRequest)
}
a.Go(func() {
a.Srv.Go(func() {
pluginContext := &plugin.Context{}
a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
hooks.UserHasLoggedIn(pluginContext, user)
return true
}, plugin.UserHasLoggedInId)

View File

@@ -78,7 +78,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
}
if post.Type != model.POST_AUTO_RESPONDER {
a.Go(func() {
a.Srv.Go(func() {
a.SendAutoResponse(channel, otherUser)
})
}
@@ -127,7 +127,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
if result := <-a.Srv.Store.User().GetProfilesByUsernames(m.OtherPotentialMentions, team.Id); result.Err == nil {
outOfChannelMentions := result.Data.([]*model.User)
if channel.Type != model.CHANNEL_GROUP {
a.Go(func() {
a.Srv.Go(func() {
a.sendOutOfChannelMentions(sender, post, outOfChannelMentions)
})
}

View File

@@ -103,7 +103,7 @@ func (a *App) sendNotificationEmail(notification *postNotification, user *model.
teamURL := a.GetSiteURL() + "/" + team.Name
var bodyText = a.getNotificationEmailBody(user, post, channel, channelName, senderName, team.Name, teamURL, emailNotificationContentsType, useMilitaryTime, translateFunc)
a.Go(func() {
a.Srv.Go(func() {
if err := a.SendMail(user.Email, html.UnescapeString(subjectText), bodyText); err != nil {
mlog.Error(fmt.Sprint("Error to send the email", user.Email, err))
}

View File

@@ -132,7 +132,7 @@ func (a *App) sendPushNotification(notification *postNotification, user *model.U
channelName := notification.GetChannelName(nameFormat, user.Id)
senderName := notification.GetSenderName(nameFormat, cfg.ServiceSettings.EnablePostUsernameOverride)
c := a.PushNotificationsHub.GetGoChannelFromUserId(user.Id)
c := a.Srv.PushNotificationsHub.GetGoChannelFromUserId(user.Id)
c <- PushNotification{
notificationType: NOTIFICATION_TYPE_MESSAGE,
post: post,
@@ -217,7 +217,7 @@ func (a *App) ClearPushNotificationSync(userId string, channelId string) {
}
func (a *App) ClearPushNotification(userId string, channelId string) {
channel := a.PushNotificationsHub.GetGoChannelFromUserId(userId)
channel := a.Srv.PushNotificationsHub.GetGoChannelFromUserId(userId)
channel <- PushNotification{
notificationType: NOTIFICATION_TYPE_CLEAR,
userId: userId,
@@ -232,7 +232,7 @@ func (a *App) CreatePushNotificationsHub() {
for x := 0; x < PUSH_NOTIFICATION_HUB_WORKERS; x++ {
hub.Channels = append(hub.Channels, make(chan PushNotification, PUSH_NOTIFICATIONS_HUB_BUFFER_PER_WORKER))
}
a.PushNotificationsHub = hub
a.Srv.PushNotificationsHub = hub
}
func (a *App) pushNotificationWorker(notifications chan PushNotification) {
@@ -259,13 +259,13 @@ func (a *App) pushNotificationWorker(notifications chan PushNotification) {
func (a *App) StartPushNotificationsHubWorkers() {
for x := 0; x < PUSH_NOTIFICATION_HUB_WORKERS; x++ {
channel := a.PushNotificationsHub.Channels[x]
a.Go(func() { a.pushNotificationWorker(channel) })
channel := a.Srv.PushNotificationsHub.Channels[x]
a.Srv.Go(func() { a.pushNotificationWorker(channel) })
}
}
func (a *App) StopPushNotificationsHubWorkers() {
for _, channel := range a.PushNotificationsHub.Channels {
for _, channel := range a.Srv.PushNotificationsHub.Channels {
close(channel)
}
}

View File

@@ -601,7 +601,7 @@ func (a *App) CompleteSwitchWithOAuth(service string, userData io.ReadCloser, em
return nil, result.Err
}
a.Go(func() {
a.Srv.Go(func() {
if err := a.SendSignInChangeEmail(user.Email, strings.Title(service)+" SSO", user.Locale, a.GetSiteURL()); err != nil {
mlog.Error(err.Error())
}
@@ -859,7 +859,7 @@ func (a *App) SwitchOAuthToEmail(email, password, requesterId string) (string, *
T := utils.GetUserTranslations(user.Locale)
a.Go(func() {
a.Srv.Go(func() {
if err := a.SendSignInChangeEmail(user.Email, T("api.templates.signin_change_email.body.method_email"), user.Locale, a.GetSiteURL()); err != nil {
mlog.Error(err.Error())
}

View File

@@ -17,11 +17,11 @@ func StoreOverride(override interface{}) Option {
return func(a *App) {
switch o := override.(type) {
case store.Store:
a.newStore = func() store.Store {
a.Srv.newStore = func() store.Store {
return o
}
case func(*App) store.Store:
a.newStore = func() store.Store {
a.Srv.newStore = func() store.Store {
return o(a)
}
default:
@@ -32,10 +32,10 @@ func StoreOverride(override interface{}) Option {
func ConfigFile(file string) Option {
return func(a *App) {
a.configFile = file
a.Srv.configFile = file
}
}
func DisableConfigWatch(a *App) {
a.disableConfigWatch = true
a.Srv.disableConfigWatch = true
}

View File

@@ -16,21 +16,21 @@ import (
)
func (a *App) SyncPluginsActiveState() {
if a.Plugins == nil {
if a.Srv.Plugins == nil {
return
}
config := a.Config().PluginSettings
if *config.Enable {
availablePlugins, err := a.Plugins.Available()
availablePlugins, err := a.Srv.Plugins.Available()
if err != nil {
a.Log.Error("Unable to get available plugins", mlog.Err(err))
return
}
// Deactivate any plugins that have been disabled.
for _, plugin := range a.Plugins.Active() {
for _, plugin := range a.Srv.Plugins.Active() {
// Determine if plugin is enabled
pluginId := plugin.Manifest.Id
pluginEnabled := false
@@ -40,7 +40,7 @@ func (a *App) SyncPluginsActiveState() {
// If it's not enabled we need to deactivate it
if !pluginEnabled {
deactivated := a.Plugins.Deactivate(pluginId)
deactivated := a.Srv.Plugins.Deactivate(pluginId)
if deactivated && plugin.Manifest.HasClient() {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PLUGIN_DISABLED, "", "", "", nil)
message.Add("manifest", plugin.Manifest.ClientManifest())
@@ -65,7 +65,7 @@ func (a *App) SyncPluginsActiveState() {
// Activate plugin if enabled
if pluginEnabled {
updatedManifest, activated, err := a.Plugins.Activate(pluginId)
updatedManifest, activated, err := a.Srv.Plugins.Activate(pluginId)
if err != nil {
plugin.WrapLogger(a.Log).Error("Unable to activate plugin", mlog.Err(err))
continue
@@ -79,7 +79,7 @@ func (a *App) SyncPluginsActiveState() {
}
}
} else { // If plugins are disabled, shutdown plugins.
a.Plugins.Shutdown()
a.Srv.Plugins.Shutdown()
}
if err := a.notifyPluginStatusesChanged(); err != nil {
@@ -92,7 +92,7 @@ func (a *App) NewPluginAPI(manifest *model.Manifest) plugin.API {
}
func (a *App) InitPlugins(pluginDir, webappPluginDir string) {
if a.Plugins != nil || !*a.Config().PluginSettings.Enable {
if a.Srv.Plugins != nil || !*a.Config().PluginSettings.Enable {
a.SyncPluginsActiveState()
return
}
@@ -113,7 +113,7 @@ func (a *App) InitPlugins(pluginDir, webappPluginDir string) {
mlog.Error("Failed to start up plugins", mlog.Err(err))
return
} else {
a.Plugins = env
a.Srv.Plugins = env
}
prepackagedPluginsDir, found := utils.FindDir("prepackaged_plugins")
@@ -136,10 +136,10 @@ func (a *App) InitPlugins(pluginDir, webappPluginDir string) {
}
// Sync plugin active state when config changes. Also notify plugins.
a.RemoveConfigListener(a.PluginConfigListenerId)
a.PluginConfigListenerId = a.AddConfigListener(func(*model.Config, *model.Config) {
a.RemoveConfigListener(a.Srv.PluginConfigListenerId)
a.Srv.PluginConfigListenerId = a.AddConfigListener(func(*model.Config, *model.Config) {
a.SyncPluginsActiveState()
a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
hooks.OnConfigurationChange()
return true
}, plugin.OnConfigurationChangeId)
@@ -150,25 +150,25 @@ func (a *App) InitPlugins(pluginDir, webappPluginDir string) {
}
func (a *App) ShutDownPlugins() {
if a.Plugins == nil {
if a.Srv.Plugins == nil {
return
}
mlog.Info("Shutting down plugins")
a.Plugins.Shutdown()
a.Srv.Plugins.Shutdown()
a.RemoveConfigListener(a.PluginConfigListenerId)
a.PluginConfigListenerId = ""
a.Plugins = nil
a.RemoveConfigListener(a.Srv.PluginConfigListenerId)
a.Srv.PluginConfigListenerId = ""
a.Srv.Plugins = nil
}
func (a *App) GetActivePluginManifests() ([]*model.Manifest, *model.AppError) {
if a.Plugins == nil || !*a.Config().PluginSettings.Enable {
if a.Srv.Plugins == nil || !*a.Config().PluginSettings.Enable {
return nil, model.NewAppError("GetActivePluginManifests", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
}
plugins := a.Plugins.Active()
plugins := a.Srv.Plugins.Active()
manifests := make([]*model.Manifest, len(plugins))
for i, plugin := range plugins {
@@ -181,11 +181,11 @@ func (a *App) GetActivePluginManifests() ([]*model.Manifest, *model.AppError) {
// EnablePlugin will set the config for an installed plugin to enabled, triggering asynchronous
// activation if inactive anywhere in the cluster.
func (a *App) EnablePlugin(id string) *model.AppError {
if a.Plugins == nil || !*a.Config().PluginSettings.Enable {
if a.Srv.Plugins == nil || !*a.Config().PluginSettings.Enable {
return model.NewAppError("EnablePlugin", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
}
plugins, err := a.Plugins.Available()
plugins, err := a.Srv.Plugins.Available()
if err != nil {
return model.NewAppError("EnablePlugin", "app.plugin.config.app_error", nil, err.Error(), http.StatusInternalServerError)
}
@@ -221,11 +221,11 @@ func (a *App) EnablePlugin(id string) *model.AppError {
// DisablePlugin will set the config for an installed plugin to disabled, triggering deactivation if active.
func (a *App) DisablePlugin(id string) *model.AppError {
if a.Plugins == nil || !*a.Config().PluginSettings.Enable {
if a.Srv.Plugins == nil || !*a.Config().PluginSettings.Enable {
return model.NewAppError("DisablePlugin", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
}
plugins, err := a.Plugins.Available()
plugins, err := a.Srv.Plugins.Available()
if err != nil {
return model.NewAppError("DisablePlugin", "app.plugin.config.app_error", nil, err.Error(), http.StatusInternalServerError)
}
@@ -256,7 +256,7 @@ func (a *App) DisablePlugin(id string) *model.AppError {
}
func (a *App) PluginsReady() bool {
return a.Plugins != nil && *a.Config().PluginSettings.Enable
return a.Srv.Plugins != nil && *a.Config().PluginSettings.Enable
}
func (a *App) GetPlugins() (*model.PluginsResponse, *model.AppError) {
@@ -264,7 +264,7 @@ func (a *App) GetPlugins() (*model.PluginsResponse, *model.AppError) {
return nil, model.NewAppError("GetPlugins", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
}
availablePlugins, err := a.Plugins.Available()
availablePlugins, err := a.Srv.Plugins.Available()
if err != nil {
return nil, model.NewAppError("GetPlugins", "app.plugin.get_plugins.app_error", nil, err.Error(), http.StatusInternalServerError)
}
@@ -278,7 +278,7 @@ func (a *App) GetPlugins() (*model.PluginsResponse, *model.AppError) {
Manifest: *plugin.Manifest,
}
if a.Plugins.IsActive(plugin.Manifest.Id) {
if a.Srv.Plugins.IsActive(plugin.Manifest.Id) {
resp.Active = append(resp.Active, info)
} else {
resp.Inactive = append(resp.Inactive, info)

View File

@@ -33,7 +33,7 @@ func setupPluginApiTest(t *testing.T, pluginCode string, pluginManifest string,
ioutil.WriteFile(filepath.Join(pluginDir, pluginId, "plugin.json"), []byte(pluginManifest), 0600)
env.Activate(pluginId)
app.Plugins = env
app.Srv.Plugins = env
}
func TestPluginAPIUpdateUserStatus(t *testing.T) {
@@ -120,7 +120,7 @@ func TestPluginAPILoadPluginConfiguration(t *testing.T) {
}
]
}}`, "testloadpluginconfig", th.App)
hooks, err := th.App.Plugins.HooksForPlugin("testloadpluginconfig")
hooks, err := th.App.Srv.Plugins.HooksForPlugin("testloadpluginconfig")
assert.NoError(t, err)
_, ret := hooks.MessageWillBePosted(nil, nil)
assert.Equal(t, "str32true", ret)
@@ -194,7 +194,7 @@ func TestPluginAPILoadPluginConfigurationDefaults(t *testing.T) {
}
]
}}`, "testloadpluginconfig", th.App)
hooks, err := th.App.Plugins.HooksForPlugin("testloadpluginconfig")
hooks, err := th.App.Srv.Plugins.HooksForPlugin("testloadpluginconfig")
assert.NoError(t, err)
_, ret := hooks.MessageWillBePosted(nil, nil)
assert.Equal(t, "override35true", ret)

View File

@@ -31,10 +31,10 @@ func (a *App) RegisterPluginCommand(pluginId string, command *model.Command) err
DisplayName: command.DisplayName,
}
a.pluginCommandsLock.Lock()
defer a.pluginCommandsLock.Unlock()
a.Srv.pluginCommandsLock.Lock()
defer a.Srv.pluginCommandsLock.Unlock()
for _, pc := range a.pluginCommands {
for _, pc := range a.Srv.pluginCommands {
if pc.Command.Trigger == command.Trigger && pc.Command.TeamId == command.TeamId {
if pc.PluginId == pluginId {
pc.Command = command
@@ -43,7 +43,7 @@ func (a *App) RegisterPluginCommand(pluginId string, command *model.Command) err
}
}
a.pluginCommands = append(a.pluginCommands, &PluginCommand{
a.Srv.pluginCommands = append(a.Srv.pluginCommands, &PluginCommand{
Command: command,
PluginId: pluginId,
})
@@ -53,37 +53,37 @@ func (a *App) RegisterPluginCommand(pluginId string, command *model.Command) err
func (a *App) UnregisterPluginCommand(pluginId, teamId, trigger string) {
trigger = strings.ToLower(trigger)
a.pluginCommandsLock.Lock()
defer a.pluginCommandsLock.Unlock()
a.Srv.pluginCommandsLock.Lock()
defer a.Srv.pluginCommandsLock.Unlock()
var remaining []*PluginCommand
for _, pc := range a.pluginCommands {
for _, pc := range a.Srv.pluginCommands {
if pc.Command.TeamId != teamId || pc.Command.Trigger != trigger {
remaining = append(remaining, pc)
}
}
a.pluginCommands = remaining
a.Srv.pluginCommands = remaining
}
func (a *App) UnregisterPluginCommands(pluginId string) {
a.pluginCommandsLock.Lock()
defer a.pluginCommandsLock.Unlock()
a.Srv.pluginCommandsLock.Lock()
defer a.Srv.pluginCommandsLock.Unlock()
var remaining []*PluginCommand
for _, pc := range a.pluginCommands {
for _, pc := range a.Srv.pluginCommands {
if pc.PluginId != pluginId {
remaining = append(remaining, pc)
}
}
a.pluginCommands = remaining
a.Srv.pluginCommands = remaining
}
func (a *App) PluginCommandsForTeam(teamId string) []*model.Command {
a.pluginCommandsLock.RLock()
defer a.pluginCommandsLock.RUnlock()
a.Srv.pluginCommandsLock.RLock()
defer a.Srv.pluginCommandsLock.RUnlock()
var commands []*model.Command
for _, pc := range a.pluginCommands {
for _, pc := range a.Srv.pluginCommands {
if pc.Command.TeamId == "" || pc.Command.TeamId == teamId {
commands = append(commands, pc.Command)
}
@@ -96,12 +96,12 @@ func (a *App) ExecutePluginCommand(args *model.CommandArgs) (*model.Command, *mo
trigger := parts[0][1:]
trigger = strings.ToLower(trigger)
a.pluginCommandsLock.RLock()
defer a.pluginCommandsLock.RUnlock()
a.Srv.pluginCommandsLock.RLock()
defer a.Srv.pluginCommandsLock.RUnlock()
for _, pc := range a.pluginCommands {
for _, pc := range a.Srv.pluginCommands {
if (pc.Command.TeamId == "" || pc.Command.TeamId == args.TeamId) && pc.Command.Trigger == trigger {
pluginHooks, err := a.Plugins.HooksForPlugin(pc.PluginId)
pluginHooks, err := a.Srv.Plugins.HooksForPlugin(pc.PluginId)
if err != nil {
return pc.Command, nil, model.NewAppError("ExecutePluginCommand", "model.plugin_command.error.app_error", nil, "err="+err.Error(), http.StatusInternalServerError)
}

View File

@@ -44,7 +44,7 @@ func SetAppEnvironmentWithPlugins(t *testing.T, pluginCode []string, app *App, a
env, err := plugin.NewEnvironment(apiFunc, pluginDir, webappPluginDir, app.Log)
require.NoError(t, err)
app.Plugins = env
app.Srv.Plugins = env
pluginIds := []string{}
activationErrors := []error{}
for _, code := range pluginCode {

View File

@@ -22,7 +22,7 @@ func (a *App) InstallPlugin(pluginFile io.Reader, replace bool) (*model.Manifest
}
func (a *App) installPlugin(pluginFile io.Reader, replace bool) (*model.Manifest, *model.AppError) {
if a.Plugins == nil || !*a.Config().PluginSettings.Enable {
if a.Srv.Plugins == nil || !*a.Config().PluginSettings.Enable {
return nil, model.NewAppError("installPlugin", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
}
@@ -55,7 +55,7 @@ func (a *App) installPlugin(pluginFile io.Reader, replace bool) (*model.Manifest
return nil, model.NewAppError("installPlugin", "app.plugin.invalid_id.app_error", map[string]interface{}{"Min": plugin.MinIdLength, "Max": plugin.MaxIdLength, "Regex": plugin.ValidIdRegex}, "", http.StatusBadRequest)
}
bundles, err := a.Plugins.Available()
bundles, err := a.Srv.Plugins.Available()
if err != nil {
return nil, model.NewAppError("installPlugin", "app.plugin.install.app_error", nil, err.Error(), http.StatusInternalServerError)
}
@@ -91,11 +91,11 @@ func (a *App) RemovePlugin(id string) *model.AppError {
}
func (a *App) removePlugin(id string) *model.AppError {
if a.Plugins == nil || !*a.Config().PluginSettings.Enable {
if a.Srv.Plugins == nil || !*a.Config().PluginSettings.Enable {
return model.NewAppError("removePlugin", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
}
plugins, err := a.Plugins.Available()
plugins, err := a.Srv.Plugins.Available()
if err != nil {
return model.NewAppError("removePlugin", "app.plugin.deactivate.app_error", nil, err.Error(), http.StatusBadRequest)
}
@@ -114,13 +114,13 @@ func (a *App) removePlugin(id string) *model.AppError {
return model.NewAppError("removePlugin", "app.plugin.not_installed.app_error", nil, "", http.StatusBadRequest)
}
if a.Plugins.IsActive(id) && manifest.HasClient() {
if a.Srv.Plugins.IsActive(id) && manifest.HasClient() {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PLUGIN_DISABLED, "", "", "", nil)
message.Add("manifest", manifest.ClientManifest())
a.Publish(message)
}
a.Plugins.Deactivate(id)
a.Srv.Plugins.Deactivate(id)
a.UnregisterPluginCommands(id)
err = os.RemoveAll(pluginPath)

View File

@@ -19,7 +19,7 @@ import (
)
func (a *App) ServePluginRequest(w http.ResponseWriter, r *http.Request) {
if a.Plugins == nil || !*a.Config().PluginSettings.Enable {
if a.Srv.Plugins == nil || !*a.Config().PluginSettings.Enable {
err := model.NewAppError("ServePluginRequest", "app.plugin.disabled.app_error", nil, "Enable plugins to serve plugin requests", http.StatusNotImplemented)
a.Log.Error(err.Error())
w.WriteHeader(err.StatusCode)
@@ -29,7 +29,7 @@ func (a *App) ServePluginRequest(w http.ResponseWriter, r *http.Request) {
}
params := mux.Vars(r)
hooks, err := a.Plugins.HooksForPlugin(params["plugin_id"])
hooks, err := a.Srv.Plugins.HooksForPlugin(params["plugin_id"])
if err != nil {
a.Log.Error("Access to route for non-existent plugin", mlog.String("missing_plugin_id", params["plugin_id"]), mlog.Err(err))
http.NotFound(w, r)

View File

@@ -11,11 +11,11 @@ import (
// GetPluginStatuses returns the status for plugins installed on this server.
func (a *App) GetPluginStatuses() (model.PluginStatuses, *model.AppError) {
if a.Plugins == nil || !*a.Config().PluginSettings.Enable {
if a.Srv.Plugins == nil || !*a.Config().PluginSettings.Enable {
return nil, model.NewAppError("GetPluginStatuses", "app.plugin.disabled.app_error", nil, "", http.StatusNotImplemented)
}
pluginStatuses, err := a.Plugins.Statuses()
pluginStatuses, err := a.Srv.Plugins.Statuses()
if err != nil {
return nil, model.NewAppError("GetPluginStatuses", "app.plugin.get_statuses.app_error", nil, err.Error(), http.StatusInternalServerError)
}

View File

@@ -150,7 +150,7 @@ func (a *App) CreatePost(post *model.Post, channel *model.Channel, triggerWebhoo
if a.PluginsReady() {
var rejectionError *model.AppError
pluginContext := &plugin.Context{}
a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
replacementPost, rejectionReason := hooks.MessageWillBePosted(pluginContext, post)
if rejectionReason != "" {
rejectionError = model.NewAppError("createPost", "Post rejected by plugin. "+rejectionReason, nil, "", http.StatusBadRequest)
@@ -174,9 +174,9 @@ func (a *App) CreatePost(post *model.Post, channel *model.Channel, triggerWebhoo
rpost := result.Data.(*model.Post)
if a.PluginsReady() {
a.Go(func() {
a.Srv.Go(func() {
pluginContext := &plugin.Context{}
a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
hooks.MessageHasBeenPosted(pluginContext, rpost)
return true
}, plugin.MessageHasBeenPostedId)
@@ -185,7 +185,7 @@ func (a *App) CreatePost(post *model.Post, channel *model.Channel, triggerWebhoo
esInterface := a.Elasticsearch
if esInterface != nil && *a.Config().ElasticsearchSettings.EnableIndexing {
a.Go(func() {
a.Srv.Go(func() {
esInterface.IndexPost(rpost, channel.TeamId)
})
}
@@ -277,7 +277,7 @@ func (a *App) handlePostEvents(post *model.Post, user *model.User, channel *mode
}
if triggerWebhooks {
a.Go(func() {
a.Srv.Go(func() {
if err := a.handleWebhookEvents(post, team, channel, user); err != nil {
mlog.Error(err.Error())
}
@@ -362,7 +362,7 @@ func (a *App) UpdatePost(post *model.Post, safeUpdate bool) (*model.Post, *model
if a.PluginsReady() {
var rejectionReason string
pluginContext := &plugin.Context{}
a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
newPost, rejectionReason = hooks.MessageWillBeUpdated(pluginContext, newPost, oldPost)
return post != nil
}, plugin.MessageWillBeUpdatedId)
@@ -378,9 +378,9 @@ func (a *App) UpdatePost(post *model.Post, safeUpdate bool) (*model.Post, *model
rpost := result.Data.(*model.Post)
if a.PluginsReady() {
a.Go(func() {
a.Srv.Go(func() {
pluginContext := &plugin.Context{}
a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
hooks.MessageHasBeenUpdated(pluginContext, newPost, oldPost)
return true
}, plugin.MessageHasBeenUpdatedId)
@@ -389,7 +389,7 @@ func (a *App) UpdatePost(post *model.Post, safeUpdate bool) (*model.Post, *model
esInterface := a.Elasticsearch
if esInterface != nil && *a.Config().ElasticsearchSettings.EnableIndexing {
a.Go(func() {
a.Srv.Go(func() {
rchannel := <-a.Srv.Store.Channel().GetForPost(rpost.Id)
if rchannel.Err != nil {
mlog.Error(fmt.Sprintf("Couldn't get channel %v for post %v for Elasticsearch indexing.", rpost.ChannelId, rpost.Id))
@@ -567,16 +567,16 @@ func (a *App) DeletePost(postId, deleteByID string) (*model.Post, *model.AppErro
message.Add("post", a.PostWithProxyAddedToImageURLs(post).ToJson())
a.Publish(message)
a.Go(func() {
a.Srv.Go(func() {
a.DeletePostFiles(post)
})
a.Go(func() {
a.Srv.Go(func() {
a.DeleteFlaggedPosts(post.Id)
})
esInterface := a.Elasticsearch
if esInterface != nil && *a.Config().ElasticsearchSettings.EnableIndexing {
a.Go(func() {
a.Srv.Go(func() {
esInterface.DeletePost(post)
})
}

View File

@@ -648,9 +648,9 @@ func TestMaxPostSize(t *testing.T) {
app := App{
Srv: &Server{
Store: mockStore,
Store: mockStore,
config: atomic.Value{},
},
config: atomic.Value{},
}
assert.Equal(t, testCase.ExpectedMaxPostSize, app.MaxPostSize())

View File

@@ -42,7 +42,7 @@ func (a *App) SaveReactionForPost(reaction *model.Reaction) (*model.Reaction, *m
reaction = result.Data.(*model.Reaction)
a.Go(func() {
a.Srv.Go(func() {
a.sendReactionEvent(model.WEBSOCKET_EVENT_REACTION_ADDED, reaction, post, true)
})
@@ -92,7 +92,7 @@ func (a *App) DeleteReactionForPost(reaction *model.Reaction) *model.AppError {
return result.Err
}
a.Go(func() {
a.Srv.Go(func() {
a.sendReactionEvent(model.WEBSOCKET_EVENT_REACTION_REMOVED, reaction, post, hasReactions)
})

View File

@@ -104,7 +104,7 @@ func (a *App) sendUpdatedRoleEvent(role *model.Role) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_ROLE_UPDATED, "", "", "", nil)
message.Add("role", role.ToJson())
a.Go(func() {
a.Srv.Go(func() {
a.Publish(message)
})
}

View File

@@ -152,7 +152,7 @@ func (a *App) GetChannelsForScheme(scheme *model.Scheme, offset int, limit int)
}
func (a *App) IsPhase2MigrationCompleted() *model.AppError {
if a.phase2PermissionsMigrationComplete {
if a.Srv.phase2PermissionsMigrationComplete {
return nil
}
@@ -160,7 +160,7 @@ func (a *App) IsPhase2MigrationCompleted() *model.AppError {
return model.NewAppError("App.IsPhase2MigrationCompleted", "app.schemes.is_phase_2_migration_completed.not_completed.app_error", nil, result.Err.Error(), http.StatusNotImplemented)
}
a.phase2PermissionsMigrationComplete = true
a.Srv.phase2PermissionsMigrationComplete = true
return nil
}

View File

@@ -5,6 +5,7 @@ package app
import (
"context"
"crypto/ecdsa"
"crypto/tls"
"fmt"
"io"
@@ -14,16 +15,21 @@ import (
"net/url"
"os"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/rs/cors"
"github.com/throttled/throttled"
"golang.org/x/crypto/acme/autocert"
"github.com/mattermost/mattermost-server/jobs"
"github.com/mattermost/mattermost-server/mlog"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/plugin"
"github.com/mattermost/mattermost-server/store"
"github.com/mattermost/mattermost-server/utils"
)
@@ -44,6 +50,79 @@ type Server struct {
RateLimiter *RateLimiter
didFinishListen chan struct{}
goroutineCount int32
goroutineExitSignal chan struct{}
Plugins *plugin.Environment
PluginConfigListenerId string
EmailBatching *EmailBatchingJob
EmailRateLimiter *throttled.GCRARateLimiter
Hubs []*Hub
HubsStopCheckingForDeadlock chan bool
PushNotificationsHub PushNotificationsHub
Jobs *jobs.JobServer
config atomic.Value
envConfig map[string]interface{}
configFile string
configListeners map[string]func(*model.Config, *model.Config)
clusterLeaderListeners sync.Map
licenseValue atomic.Value
clientLicenseValue atomic.Value
licenseListeners map[string]func()
timezones atomic.Value
newStore func() store.Store
htmlTemplateWatcher *utils.HTMLTemplateWatcher
sessionCache *utils.Cache
configListenerId string
licenseListenerId string
logListenerId string
clusterLeaderListenerId string
disableConfigWatch bool
configWatcher *utils.ConfigWatcher
asymmetricSigningKey *ecdsa.PrivateKey
pluginCommands []*PluginCommand
pluginCommandsLock sync.RWMutex
clientConfig map[string]string
clientConfigHash string
limitedClientConfig map[string]string
diagnosticId string
phase2PermissionsMigrationComplete bool
}
// Go creates a goroutine, but maintains a record of it to ensure that execution completes before
// the app is destroyed.
func (s *Server) Go(f func()) {
atomic.AddInt32(&s.goroutineCount, 1)
go func() {
f()
atomic.AddInt32(&s.goroutineCount, -1)
select {
case s.goroutineExitSignal <- struct{}{}:
default:
}
}()
}
// WaitForGoroutines blocks until all goroutines created by App.Go exit.
func (s *Server) WaitForGoroutines() {
for atomic.LoadInt32(&s.goroutineCount) != 0 {
<-s.goroutineExitSignal
}
}
var corsAllowedMethods = []string{

View File

@@ -29,7 +29,7 @@ func (a *App) GetSession(token string) (*model.Session, *model.AppError) {
metrics := a.Metrics
var session *model.Session
if ts, ok := a.sessionCache.Get(token); ok {
if ts, ok := a.Srv.sessionCache.Get(token); ok {
session = ts.(*model.Session)
if metrics != nil {
metrics.IncrementMemCacheHitCounterSession()
@@ -137,13 +137,13 @@ func (a *App) ClearSessionCacheForUser(userId string) {
}
func (a *App) ClearSessionCacheForUserSkipClusterSend(userId string) {
keys := a.sessionCache.Keys()
keys := a.Srv.sessionCache.Keys()
for _, key := range keys {
if ts, ok := a.sessionCache.Get(key); ok {
if ts, ok := a.Srv.sessionCache.Get(key); ok {
session := ts.(*model.Session)
if session.UserId == userId {
a.sessionCache.Remove(key)
a.Srv.sessionCache.Remove(key)
if a.Metrics != nil {
a.Metrics.IncrementMemCacheInvalidationCounterSession()
}
@@ -155,11 +155,11 @@ func (a *App) ClearSessionCacheForUserSkipClusterSend(userId string) {
}
func (a *App) AddSessionToCache(session *model.Session) {
a.sessionCache.AddWithExpiresInSecs(session.Token, session, int64(*a.Config().ServiceSettings.SessionCacheInMinutes*60))
a.Srv.sessionCache.AddWithExpiresInSecs(session.Token, session, int64(*a.Config().ServiceSettings.SessionCacheInMinutes*60))
}
func (a *App) SessionCacheLength() int {
return a.sessionCache.Len()
return a.Srv.sessionCache.Len()
}
func (a *App) RevokeSessionsForDeviceId(userId string, deviceId string, currentSessionId string) *model.AppError {

View File

@@ -22,16 +22,16 @@ func TestCache(t *testing.T) {
UserId: model.NewId(),
}
th.App.sessionCache.AddWithExpiresInSecs(session.Token, session, 5*60)
th.App.Srv.sessionCache.AddWithExpiresInSecs(session.Token, session, 5*60)
keys := th.App.sessionCache.Keys()
keys := th.App.Srv.sessionCache.Keys()
if len(keys) <= 0 {
t.Fatal("should have items")
}
th.App.ClearSessionCacheForUser(session.UserId)
rkeys := th.App.sessionCache.Keys()
rkeys := th.App.Srv.sessionCache.Keys()
if len(rkeys) != len(keys)-1 {
t.Fatal("should have one less")
}

View File

@@ -466,9 +466,9 @@ func (a *App) JoinUserToTeam(team *model.Team, user *model.User, userRequestorId
actor, _ = a.GetUser(userRequestorId)
}
a.Go(func() {
a.Srv.Go(func() {
pluginContext := &plugin.Context{}
a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
hooks.UserHasJoinedTeam(pluginContext, tm, actor)
return true
}, plugin.UserHasJoinedTeamId)
@@ -789,9 +789,9 @@ func (a *App) LeaveTeam(team *model.Team, user *model.User, requestorId string)
actor, _ = a.GetUser(requestorId)
}
a.Go(func() {
a.Srv.Go(func() {
pluginContext := &plugin.Context{}
a.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
a.Srv.Plugins.RunMultiPluginHook(func(hooks plugin.Hooks) bool {
hooks.UserHasLeftTeam(pluginContext, teamMember, actor)
return true
}, plugin.UserHasLeftTeamId)

View File

@@ -9,7 +9,7 @@ import (
)
func (a *App) Timezones() model.SupportedTimezones {
if cfg := a.timezones.Load(); cfg != nil {
if cfg := a.Srv.timezones.Load(); cfg != nil {
return cfg.(model.SupportedTimezones)
}
return model.SupportedTimezones{}
@@ -24,5 +24,5 @@ func (a *App) LoadTimezones() {
timezoneCfg := utils.LoadTimezones(timezonePath)
a.timezones.Store(timezoneCfg)
a.Srv.timezones.Store(timezoneCfg)
}

View File

@@ -1034,14 +1034,14 @@ func (a *App) UpdateUser(user *model.User, sendNotifications bool) (*model.User,
if sendNotifications {
if rusers[0].Email != rusers[1].Email {
a.Go(func() {
a.Srv.Go(func() {
if err := a.SendEmailChangeEmail(rusers[1].Email, rusers[0].Email, rusers[0].Locale, a.GetSiteURL()); err != nil {
mlog.Error(err.Error())
}
})
if a.Config().EmailSettings.RequireEmailVerification {
a.Go(func() {
a.Srv.Go(func() {
if err := a.SendEmailVerification(rusers[0]); err != nil {
mlog.Error(err.Error())
}
@@ -1050,7 +1050,7 @@ func (a *App) UpdateUser(user *model.User, sendNotifications bool) (*model.User,
}
if rusers[0].Username != rusers[1].Username {
a.Go(func() {
a.Srv.Go(func() {
if err := a.SendChangeUsernameEmail(rusers[1].Username, rusers[0].Username, rusers[0].Email, rusers[0].Locale, a.GetSiteURL()); err != nil {
mlog.Error(err.Error())
}
@@ -1090,7 +1090,7 @@ func (a *App) UpdateMfa(activate bool, userId, token string) *model.AppError {
}
}
a.Go(func() {
a.Srv.Go(func() {
user, err := a.GetUser(userId)
if err != nil {
mlog.Error(err.Error())
@@ -1133,7 +1133,7 @@ func (a *App) UpdatePasswordSendEmail(user *model.User, newPassword, method stri
return err
}
a.Go(func() {
a.Srv.Go(func() {
if err := a.SendPasswordChangeEmail(user.Email, method, user.Locale, a.GetSiteURL()); err != nil {
mlog.Error(err.Error())
}

View File

@@ -45,7 +45,7 @@ type WebConn struct {
func (a *App) NewWebConn(ws *websocket.Conn, session model.Session, t goi18n.TranslateFunc, locale string) *WebConn {
if len(session.UserId) > 0 {
a.Go(func() {
a.Srv.Go(func() {
a.SetStatusOnline(session.UserId, false)
a.UpdateLastActivityAtIfNeeded(session)
})
@@ -126,7 +126,7 @@ func (c *WebConn) readPump() {
c.WebSocket.SetPongHandler(func(string) error {
c.WebSocket.SetReadDeadline(time.Now().Add(PONG_WAIT))
if c.IsAuthenticated() {
c.App.Go(func() {
c.App.Srv.Go(func() {
c.App.SetStatusAwayIfNeeded(c.UserId, false)
})
}
@@ -212,7 +212,7 @@ func (c *WebConn) writePump() {
}
if c.App.Metrics != nil {
c.App.Go(func() {
c.App.Srv.Go(func() {
c.App.Metrics.IncrementWebSocketBroadcast(msg.EventType())
})
}

View File

@@ -62,7 +62,7 @@ func (a *App) NewWebHub() *Hub {
func (a *App) TotalWebsocketConnections() int {
count := int64(0)
for _, hub := range a.Hubs {
for _, hub := range a.Srv.Hubs {
count = count + atomic.LoadInt64(&hub.connectionCount)
}
@@ -74,13 +74,13 @@ func (a *App) HubStart() {
numberOfHubs := runtime.NumCPU() * 2
mlog.Info(fmt.Sprintf("Starting %v websocket hubs", numberOfHubs))
a.Hubs = make([]*Hub, numberOfHubs)
a.HubsStopCheckingForDeadlock = make(chan bool, 1)
a.Srv.Hubs = make([]*Hub, numberOfHubs)
a.Srv.HubsStopCheckingForDeadlock = make(chan bool, 1)
for i := 0; i < len(a.Hubs); i++ {
a.Hubs[i] = a.NewWebHub()
a.Hubs[i].connectionIndex = i
a.Hubs[i].Start()
for i := 0; i < len(a.Srv.Hubs); i++ {
a.Srv.Hubs[i] = a.NewWebHub()
a.Srv.Hubs[i].connectionIndex = i
a.Srv.Hubs[i].Start()
}
go func() {
@@ -93,7 +93,7 @@ func (a *App) HubStart() {
for {
select {
case <-ticker.C:
for _, hub := range a.Hubs {
for _, hub := range a.Srv.Hubs {
if len(hub.broadcast) >= DEADLOCK_WARN {
mlog.Error(fmt.Sprintf("Hub processing might be deadlock on hub %v goroutine %v with %v events in the buffer", hub.connectionIndex, hub.goroutineId, len(hub.broadcast)))
buf := make([]byte, 1<<16)
@@ -109,7 +109,7 @@ func (a *App) HubStart() {
}
}
case <-a.HubsStopCheckingForDeadlock:
case <-a.Srv.HubsStopCheckingForDeadlock:
return
}
}
@@ -120,27 +120,27 @@ func (a *App) HubStop() {
mlog.Info("stopping websocket hub connections")
select {
case a.HubsStopCheckingForDeadlock <- true:
case a.Srv.HubsStopCheckingForDeadlock <- true:
default:
mlog.Warn("We appear to have already sent the stop checking for deadlocks command")
}
for _, hub := range a.Hubs {
for _, hub := range a.Srv.Hubs {
hub.Stop()
}
a.Hubs = []*Hub{}
a.Srv.Hubs = []*Hub{}
}
func (a *App) GetHubForUserId(userId string) *Hub {
if len(a.Hubs) == 0 {
if len(a.Srv.Hubs) == 0 {
return nil
}
hash := fnv.New32a()
hash.Write([]byte(userId))
index := hash.Sum32() % uint32(len(a.Hubs))
return a.Hubs[index]
index := hash.Sum32() % uint32(len(a.Srv.Hubs))
return a.Srv.Hubs[index]
}
func (a *App) HubRegister(webConn *WebConn) {
@@ -190,7 +190,7 @@ func (a *App) PublishSkipClusterSend(message *model.WebSocketEvent) {
hub.Broadcast(message)
}
} else {
for _, hub := range a.Hubs {
for _, hub := range a.Srv.Hubs {
hub.Broadcast(message)
}
}
@@ -416,7 +416,7 @@ func (h *Hub) Start() {
conns := connections.ForUser(webCon.UserId)
if len(conns) == 0 {
h.app.Go(func() {
h.app.Srv.Go(func() {
h.app.SetStatusOffline(webCon.UserId, false)
})
} else {
@@ -427,7 +427,7 @@ func (h *Hub) Start() {
}
}
if h.app.IsUserAway(latestActivity) {
h.app.Go(func() {
h.app.Srv.Go(func() {
h.app.SetStatusLastActivityAt(webCon.UserId, latestActivity)
})
}

View File

@@ -80,7 +80,7 @@ func (a *App) handleWebhookEvents(post *model.Post, team *model.Team, channel *m
TriggerWord: triggerWord,
FileIds: strings.Join(post.FileIds, ","),
}
a.Go(func(hook *model.OutgoingWebhook) func() {
a.Srv.Go(func(hook *model.OutgoingWebhook) func() {
return func() {
a.TriggerWebhook(payload, hook, post, channel)
}
@@ -102,7 +102,7 @@ func (a *App) TriggerWebhook(payload *model.OutgoingWebhookPayload, hook *model.
}
for _, url := range hook.CallbackURLs {
a.Go(func(url string) func() {
a.Srv.Go(func(url string) func() {
return func() {
req, _ := http.NewRequest("POST", url, body)
req.Header.Set("Content-Type", contentType)

View File

@@ -55,7 +55,7 @@ func (wr *WebSocketRouter) ServeWebSocket(conn *WebConn, r *model.WebSocketReque
return
}
wr.app.Go(func() {
wr.app.Srv.Go(func() {
wr.app.SetStatusOnline(session.UserId, false)
wr.app.UpdateLastActivityAtIfNeeded(*session)
})

View File

@@ -44,12 +44,12 @@ func jobserverCmdF(command *cobra.Command, args []string) {
defer mlog.Info("Stopped Mattermost job server")
if !noJobs {
a.Jobs.StartWorkers()
defer a.Jobs.StopWorkers()
a.Srv.Jobs.StartWorkers()
defer a.Srv.Jobs.StopWorkers()
}
if !noSchedule {
a.Jobs.StartSchedulers()
defer a.Jobs.StopSchedulers()
a.Srv.Jobs.StartSchedulers()
defer a.Srv.Jobs.StopSchedulers()
}
signalChan := make(chan os.Signal, 1)

View File

@@ -147,19 +147,19 @@ func runServer(configFileLocation string, disableConfigWatch bool, usedPlatform
manualtesting.Init(api)
}
a.Go(func() {
a.Srv.Go(func() {
runSecurityJob(a)
})
a.Go(func() {
a.Srv.Go(func() {
runDiagnosticsJob(a)
})
a.Go(func() {
a.Srv.Go(func() {
runSessionCleanupJob(a)
})
a.Go(func() {
a.Srv.Go(func() {
runTokenCleanupJob(a)
})
a.Go(func() {
a.Srv.Go(func() {
runCommandWebhookCleanupJob(a)
})
@@ -181,12 +181,12 @@ func runServer(configFileLocation string, disableConfigWatch bool, usedPlatform
}
if *a.Config().JobSettings.RunJobs {
a.Jobs.StartWorkers()
defer a.Jobs.StopWorkers()
a.Srv.Jobs.StartWorkers()
defer a.Srv.Jobs.StopWorkers()
}
if *a.Config().JobSettings.RunScheduler {
a.Jobs.StartSchedulers()
defer a.Jobs.StopSchedulers()
a.Srv.Jobs.StartSchedulers()
defer a.Srv.Jobs.StopSchedulers()
}
notifyReady()

View File

@@ -61,7 +61,7 @@ func (scheduler *Scheduler) ScheduleJob(cfg *model.Config, pendingJobs bool, las
// Check the migration job isn't wedged.
if job != nil && job.LastActivityAt < model.GetMillis()-MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS && job.CreateAt < model.GetMillis()-MIGRATION_JOB_WEDGED_TIMEOUT_MILLISECONDS {
mlog.Warn("Job appears to be wedged. Rescheduling another instance.", mlog.String("scheduler", scheduler.Name()), mlog.String("wedged_job_id", job.Id), mlog.String("migration_key", key))
if err := scheduler.App.Jobs.SetJobError(job, nil); err != nil {
if err := scheduler.App.Srv.Jobs.SetJobError(job, nil); err != nil {
mlog.Error("Worker: Failed to set job error", mlog.String("scheduler", scheduler.Name()), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
}
return scheduler.createJob(key, job, scheduler.App.Srv.Store)
@@ -102,7 +102,7 @@ func (scheduler *Scheduler) createJob(migrationKey string, lastJob *model.Job, s
JOB_DATA_KEY_MIGRATION_LAST_DONE: lastDone,
}
if job, err := scheduler.App.Jobs.CreateJob(model.JOB_TYPE_MIGRATIONS, data); err != nil {
if job, err := scheduler.App.Srv.Jobs.CreateJob(model.JOB_TYPE_MIGRATIONS, data); err != nil {
return nil, err
} else {
return job, nil

View File

@@ -33,7 +33,7 @@ func (m *MigrationsJobInterfaceImpl) MakeWorker() model.Worker {
stop: make(chan bool, 1),
stopped: make(chan bool, 1),
jobs: make(chan model.Job),
jobServer: m.App.Jobs,
jobServer: m.App.Srv.Jobs,
app: m.App,
}
@@ -83,7 +83,7 @@ func (worker *Worker) DoJob(job *model.Job) {
cancelCtx, cancelCancelWatcher := context.WithCancel(context.Background())
cancelWatcherChan := make(chan interface{}, 1)
go worker.app.Jobs.CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan)
go worker.app.Srv.Jobs.CancellationWatcher(cancelCtx, job.Id, cancelWatcherChan)
defer cancelCancelWatcher()
@@ -111,7 +111,7 @@ func (worker *Worker) DoJob(job *model.Job) {
return
} else {
job.Data[JOB_DATA_KEY_MIGRATION_LAST_DONE] = progress
if err := worker.app.Jobs.UpdateInProgressJobData(job); err != nil {
if err := worker.app.Srv.Jobs.UpdateInProgressJobData(job); err != nil {
mlog.Error("Worker: Failed to update migration status data for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
worker.setJobError(job, err)
return
@@ -122,20 +122,20 @@ func (worker *Worker) DoJob(job *model.Job) {
}
func (worker *Worker) setJobSuccess(job *model.Job) {
if err := worker.app.Jobs.SetJobSuccess(job); err != nil {
if err := worker.app.Srv.Jobs.SetJobSuccess(job); err != nil {
mlog.Error("Worker: Failed to set success for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
worker.setJobError(job, err)
}
}
func (worker *Worker) setJobError(job *model.Job, appError *model.AppError) {
if err := worker.app.Jobs.SetJobError(job, appError); err != nil {
if err := worker.app.Srv.Jobs.SetJobError(job, appError); err != nil {
mlog.Error("Worker: Failed to set job error", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
}
}
func (worker *Worker) setJobCanceled(job *model.Job) {
if err := worker.app.Jobs.SetJobCanceled(job); err != nil {
if err := worker.app.Srv.Jobs.SetJobCanceled(job); err != nil {
mlog.Error("Worker: Failed to mark job as canceled", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
}
}

View File

@@ -39,7 +39,7 @@ func (scheduler *Scheduler) NextScheduleTime(cfg *model.Config, now time.Time, p
func (scheduler *Scheduler) ScheduleJob(cfg *model.Config, pendingJobs bool, lastSuccessfulJob *model.Job) (*model.Job, *model.AppError) {
mlog.Debug("Scheduling Job", mlog.String("scheduler", scheduler.Name()))
if job, err := scheduler.App.Jobs.CreateJob(model.JOB_TYPE_PLUGINS, nil); err != nil {
if job, err := scheduler.App.Srv.Jobs.CreateJob(model.JOB_TYPE_PLUGINS, nil); err != nil {
return nil, err
} else {
return job, nil

View File

@@ -25,7 +25,7 @@ func (m *PluginsJobInterfaceImpl) MakeWorker() model.Worker {
stop: make(chan bool, 1),
stopped: make(chan bool, 1),
jobs: make(chan model.Job),
jobServer: m.App.Jobs,
jobServer: m.App.Srv.Jobs,
app: m.App,
}
@@ -86,14 +86,14 @@ func (worker *Worker) DoJob(job *model.Job) {
}
func (worker *Worker) setJobSuccess(job *model.Job) {
if err := worker.app.Jobs.SetJobSuccess(job); err != nil {
if err := worker.app.Srv.Jobs.SetJobSuccess(job); err != nil {
mlog.Error("Worker: Failed to set success for job", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
worker.setJobError(job, err)
}
}
func (worker *Worker) setJobError(job *model.Job, appError *model.AppError) {
if err := worker.app.Jobs.SetJobError(job, appError); err != nil {
if err := worker.app.Srv.Jobs.SetJobError(job, appError); err != nil {
mlog.Error("Worker: Failed to set job error", mlog.String("worker", worker.name), mlog.String("job_id", job.Id), mlog.String("error", err.Error()))
}
}

View File

@@ -115,7 +115,7 @@ func completeSaml(c *Context, w http.ResponseWriter, r *http.Request) {
case model.OAUTH_ACTION_SIGNUP:
teamId := relayProps["team_id"]
if len(teamId) > 0 {
c.App.Go(func() {
c.App.Srv.Go(func() {
if err := c.App.AddUserToTeamByTeamId(teamId, user); err != nil {
mlog.Error(err.Error())
} else {
@@ -129,7 +129,7 @@ func completeSaml(c *Context, w http.ResponseWriter, r *http.Request) {
return
}
c.LogAuditWithUserId(user.Id, "Revoked all sessions for user")
c.App.Go(func() {
c.App.Srv.Go(func() {
if err := c.App.SendSignInChangeEmail(user.Email, strings.Title(model.USER_AUTH_SERVICE_SAML)+" SSO", user.Locale, c.App.GetSiteURL()); err != nil {
mlog.Error(err.Error())
}