remove remaining Global() calls (outside of tests) (#7521)

This commit is contained in:
Chris
2017-09-27 11:52:34 -05:00
committed by Saturnino Abril
parent 1bd66589a2
commit 8c80cdde38
45 changed files with 207 additions and 222 deletions

View File

@@ -121,7 +121,7 @@ func Init(a *app.App, root *mux.Router) *API {
utils.InitHTML()
app.InitEmailBatching()
a.InitEmailBatching()
if *utils.Cfg.ServiceSettings.EnableAPIv3 {
l4g.Info("API version 3 is scheduled for deprecation. Please see https://api.mattermost.com for details.")

View File

@@ -49,11 +49,11 @@ func setupTestHelper(enterprise bool) *TestHelper {
th.App.NewServer()
th.App.InitStores()
th.App.Srv.Router = NewRouter()
wsapi.InitRouter()
th.App.Srv.WebSocketRouter = th.App.NewWebSocketRouter()
th.App.StartServer()
api4.Init(th.App, th.App.Srv.Router, false)
Init(th.App, th.App.Srv.Router)
wsapi.InitApi()
wsapi.Init(th.App, th.App.Srv.WebSocketRouter)
utils.EnableDebugLogForTest()
th.App.Srv.Store.MarkSystemRanUnitTests()

View File

@@ -8,7 +8,6 @@ import (
"strings"
"testing"
"github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/model"
)
@@ -96,7 +95,7 @@ func TestCliCreateUserWithoutTeam(t *testing.T) {
t.SkipNow()
}
Setup()
th := Setup()
id := model.NewId()
email := "success+" + id + "@simulator.amazonses.com"
username := "name" + id
@@ -108,7 +107,7 @@ func TestCliCreateUserWithoutTeam(t *testing.T) {
t.Fatal(err)
}
if result := <-app.Global().Srv.Store.User().GetByEmail(email); result.Err != nil {
if result := <-th.App.Srv.Store.User().GetByEmail(email); result.Err != nil {
t.Fatal()
} else {
user := result.Data.(*model.User)

View File

@@ -118,7 +118,7 @@ func createEmoji(c *Context, w http.ResponseWriter, r *http.Request) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_EMOJI_ADDED, "", "", "", nil)
message.Add("emoji", result.Data.(*model.Emoji).ToJson())
app.Publish(message)
c.App.Publish(message)
w.Write([]byte(result.Data.(*model.Emoji).ToJson()))
}
}

View File

@@ -141,7 +141,7 @@ func saveIsPinnedPost(c *Context, w http.ResponseWriter, r *http.Request, isPinn
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", rpost.ChannelId, "", nil)
message.Add("post", rpost.ToJson())
go app.Publish(message)
go c.App.Publish(message)
c.App.InvalidateCacheForChannelPosts(rpost.ChannelId)

View File

@@ -8,7 +8,6 @@ import (
l4g "github.com/alecthomas/log4go"
"github.com/gorilla/mux"
"github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
@@ -112,23 +111,6 @@ func deleteReaction(c *Context, w http.ResponseWriter, r *http.Request) {
ReturnStatusOK(w)
}
func sendReactionEvent(event string, channelId string, reaction *model.Reaction, post *model.Post) {
// send out that a reaction has been added/removed
message := model.NewWebSocketEvent(event, "", channelId, "", nil)
message.Add("reaction", reaction.ToJson())
app.Publish(message)
// THe post is always modified since the UpdateAt always changes
app.Global().InvalidateCacheForChannelPosts(post.ChannelId)
post.HasReactions = true
post.UpdateAt = model.GetMillis()
umessage := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", channelId, "", nil)
umessage.Add("post", post.ToJson())
app.Publish(umessage)
}
func listReactions(c *Context, w http.ResponseWriter, r *http.Request) {
params := mux.Vars(r)

View File

@@ -8,7 +8,6 @@ import (
l4g "github.com/alecthomas/log4go"
"github.com/gorilla/websocket"
"github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
@@ -37,7 +36,7 @@ func connect(c *Context, w http.ResponseWriter, r *http.Request) {
wc := c.App.NewWebConn(ws, c.Session, c.T, c.Locale)
if len(c.Session.UserId) > 0 {
app.HubRegister(wc)
c.App.HubRegister(wc)
}
go wc.WritePump()

View File

@@ -11,7 +11,6 @@ import (
"time"
"github.com/gorilla/websocket"
"github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
@@ -195,7 +194,7 @@ func TestWebSocketEvent(t *testing.T) {
omitUser["somerandomid"] = true
evt1 := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_TYPING, "", th.BasicChannel.Id, "", omitUser)
evt1.Add("user_id", "somerandomid")
app.Publish(evt1)
th.App.Publish(evt1)
time.Sleep(300 * time.Millisecond)
@@ -224,7 +223,7 @@ func TestWebSocketEvent(t *testing.T) {
}
evt2 := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_TYPING, "", "somerandomid", "", nil)
go app.Publish(evt2)
go th.App.Publish(evt2)
time.Sleep(300 * time.Millisecond)
eventHit = false

View File

@@ -226,7 +226,7 @@ func Init(a *app.App, root *mux.Router, full bool) *API {
if full {
utils.InitHTML()
app.InitEmailBatching()
a.InitEmailBatching()
}
return api

View File

@@ -61,10 +61,10 @@ func setupTestHelper(enterprise bool) *TestHelper {
th.App.NewServer()
th.App.InitStores()
th.App.Srv.Router = NewRouter()
wsapi.InitRouter()
th.App.Srv.WebSocketRouter = th.App.NewWebSocketRouter()
th.App.StartServer()
Init(th.App, th.App.Srv.Router, true)
wsapi.InitApi()
wsapi.Init(th.App, th.App.Srv.WebSocketRouter)
utils.EnableDebugLogForTest()
th.App.Srv.Store.MarkSystemRanUnitTests()

View File

@@ -8,7 +8,6 @@ import (
l4g "github.com/alecthomas/log4go"
"github.com/gorilla/websocket"
"github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
@@ -38,7 +37,7 @@ func connectWebSocket(c *Context, w http.ResponseWriter, r *http.Request) {
wc := c.App.NewWebConn(ws, c.Session, c.T, "")
if len(c.Session.UserId) > 0 {
app.HubRegister(wc)
c.App.HubRegister(wc)
}
go wc.WritePump()

View File

@@ -138,7 +138,7 @@ func (a *App) ReloadConfig() {
utils.LoadConfig(utils.CfgFileName)
// start/restart email batching job if necessary
InitEmailBatching()
a.InitEmailBatching()
}
func (a *App) SaveConfig(cfg *model.Config, sendConfigChangeClusterMessage bool) *model.AppError {
@@ -179,7 +179,7 @@ func (a *App) SaveConfig(cfg *model.Config, sendConfigChangeClusterMessage bool)
}
// start/restart email batching job if necessary
InitEmailBatching()
a.InitEmailBatching()
return nil
}

View File

@@ -103,7 +103,7 @@ func (a *App) GetAnalytics(name string, teamId string) (model.AnalyticsRows, *mo
return nil, err
}
totalSockets := TotalWebsocketConnections()
totalSockets := a.TotalWebsocketConnections()
totalMasterDb := a.Srv.Store.TotalMasterDbConnections()
totalReadDb := a.Srv.Store.TotalReadDbConnections()
@@ -118,7 +118,7 @@ func (a *App) GetAnalytics(name string, teamId string) (model.AnalyticsRows, *mo
rows[7].Value = float64(totalReadDb)
} else {
rows[5].Value = float64(TotalWebsocketConnections())
rows[5].Value = float64(a.TotalWebsocketConnections())
rows[6].Value = float64(a.Srv.Store.TotalMasterDbConnections())
rows[7].Value = float64(a.Srv.Store.TotalReadDbConnections())
}

View File

@@ -20,6 +20,11 @@ type App struct {
PluginEnv *pluginenv.Environment
PluginConfigListenerId string
EmailBatching *EmailBatchingJob
Hubs []*Hub
HubsStopCheckingForDeadlock chan bool
AccountMigration einterfaces.AccountMigrationInterface
Brand einterfaces.BrandInterface
Cluster einterfaces.ClusterInterface

View File

@@ -16,7 +16,7 @@ type TestEnvironment struct {
Environments []TeamEnvironment
}
func CreateTestEnvironmentWithTeams(client *model.Client, rangeTeams utils.Range, rangeChannels utils.Range, rangeUsers utils.Range, rangePosts utils.Range, fuzzy bool) (TestEnvironment, bool) {
func CreateTestEnvironmentWithTeams(a *App, client *model.Client, rangeTeams utils.Range, rangeChannels utils.Range, rangeUsers utils.Range, rangePosts utils.Range, fuzzy bool) (TestEnvironment, bool) {
rand.Seed(time.Now().UTC().UnixNano())
teamCreator := NewAutoTeamCreator(client)
@@ -29,7 +29,7 @@ func CreateTestEnvironmentWithTeams(client *model.Client, rangeTeams utils.Range
environment := TestEnvironment{teams, make([]TeamEnvironment, len(teams))}
for i, team := range teams {
userCreator := NewAutoUserCreator(client, team)
userCreator := NewAutoUserCreator(a, client, team)
userCreator.Fuzzy = fuzzy
randomUser, err := userCreator.createRandomUser()
if err != true {
@@ -37,7 +37,7 @@ func CreateTestEnvironmentWithTeams(client *model.Client, rangeTeams utils.Range
}
client.LoginById(randomUser.Id, USER_PASSWORD)
client.SetTeamId(team.Id)
teamEnvironment, err := CreateTestEnvironmentInTeam(client, team, rangeChannels, rangeUsers, rangePosts, fuzzy)
teamEnvironment, err := CreateTestEnvironmentInTeam(a, client, team, rangeChannels, rangeUsers, rangePosts, fuzzy)
if err != true {
return TestEnvironment{}, false
}
@@ -47,7 +47,7 @@ func CreateTestEnvironmentWithTeams(client *model.Client, rangeTeams utils.Range
return environment, true
}
func CreateTestEnvironmentInTeam(client *model.Client, team *model.Team, rangeChannels utils.Range, rangeUsers utils.Range, rangePosts utils.Range, fuzzy bool) (TeamEnvironment, bool) {
func CreateTestEnvironmentInTeam(a *App, client *model.Client, team *model.Team, rangeChannels utils.Range, rangeUsers utils.Range, rangePosts utils.Range, fuzzy bool) (TeamEnvironment, bool) {
rand.Seed(time.Now().UTC().UnixNano())
// We need to create at least one user
@@ -55,7 +55,7 @@ func CreateTestEnvironmentInTeam(client *model.Client, team *model.Team, rangeCh
rangeUsers.Begin = 1
}
userCreator := NewAutoUserCreator(client, team)
userCreator := NewAutoUserCreator(a, client, team)
userCreator.Fuzzy = fuzzy
users, err := userCreator.CreateTestUsers(rangeUsers)
if err != true {

View File

@@ -12,6 +12,7 @@ import (
)
type AutoUserCreator struct {
app *App
client *model.Client
team *model.Team
EmailLength utils.Range
@@ -21,8 +22,9 @@ type AutoUserCreator struct {
Fuzzy bool
}
func NewAutoUserCreator(client *model.Client, team *model.Team) *AutoUserCreator {
func NewAutoUserCreator(a *App, client *model.Client, team *model.Team) *AutoUserCreator {
return &AutoUserCreator{
app: a,
client: client,
team: team,
EmailLength: USER_EMAIL_LEN,
@@ -81,14 +83,14 @@ func (cfg *AutoUserCreator) createRandomUser() (*model.User, bool) {
ruser := result.Data.(*model.User)
status := &model.Status{UserId: ruser.Id, Status: model.STATUS_ONLINE, Manual: false, LastActivityAt: model.GetMillis(), ActiveChannel: ""}
if result := <-Global().Srv.Store.Status().SaveOrUpdate(status); result.Err != nil {
if result := <-cfg.app.Srv.Store.Status().SaveOrUpdate(status); result.Err != nil {
result.Err.Translate(utils.T)
l4g.Error(result.Err.Error())
return nil, false
}
// We need to cheat to verify the user's email
store.Must(Global().Srv.Store.User().VerifyEmail(ruser.Id))
store.Must(cfg.app.Srv.Store.User().VerifyEmail(ruser.Id))
return result.Data.(*model.User), true
}

View File

@@ -136,7 +136,7 @@ func (a *App) CreateChannelWithUser(channel *model.Channel, userId string) (*mod
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_CREATED, "", "", userId, nil)
message.Add("channel_id", channel.Id)
message.Add("team_id", channel.TeamId)
Publish(message)
a.Publish(message)
return rchannel, nil
}
@@ -181,7 +181,7 @@ func (a *App) CreateDirectChannel(userId string, otherUserId string) (*model.Cha
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_DIRECT_ADDED, "", channel.Id, "", nil)
message.Add("teammate_id", otherUserId)
Publish(message)
a.Publish(message)
return channel, nil
}
@@ -254,7 +254,7 @@ func (a *App) CreateGroupChannel(userIds []string, creatorId string) (*model.Cha
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_GROUP_ADDED, "", channel.Id, "", nil)
message.Add("teammate_ids", model.ArrayToJson(userIds))
Publish(message)
a.Publish(message)
return channel, nil
}
@@ -316,7 +316,7 @@ func (a *App) UpdateChannel(channel *model.Channel) (*model.Channel, *model.AppE
messageWs := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_UPDATED, "", channel.Id, "", nil)
messageWs.Add("channel", channel.ToJson())
Publish(messageWs)
a.Publish(messageWs)
return channel, nil
}
@@ -484,7 +484,7 @@ func (a *App) DeleteChannel(channel *model.Channel, userId string) *model.AppErr
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_DELETED, channel.TeamId, "", "", nil)
message.Add("channel_id", channel.Id)
Publish(message)
a.Publish(message)
}
return nil
@@ -550,7 +550,7 @@ func (a *App) AddUserToChannel(user *model.User, channel *model.Channel) (*model
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_ADDED, "", channel.Id, "", nil)
message.Add("user_id", user.Id)
message.Add("team_id", channel.TeamId)
Publish(message)
a.Publish(message)
return newMember, nil
}
@@ -1039,13 +1039,13 @@ func (a *App) removeUserFromChannel(userIdToRemove string, removerUserId string,
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_REMOVED, "", channel.Id, "", nil)
message.Add("user_id", userIdToRemove)
message.Add("remover_id", removerUserId)
go Publish(message)
go a.Publish(message)
// because the removed user no longer belongs to the channel we need to send a separate websocket event
userMsg := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_REMOVED, "", "", userIdToRemove, nil)
userMsg.Add("channel_id", channel.Id)
userMsg.Add("remover_id", removerUserId)
go Publish(userMsg)
go a.Publish(userMsg)
return nil
}
@@ -1098,7 +1098,7 @@ func (a *App) SetActiveChannel(userId string, channelId string) *model.AppError
a.AddStatusCache(status)
if status.Status != oldStatus {
BroadcastStatus(status)
a.BroadcastStatus(status)
}
return nil
@@ -1113,7 +1113,7 @@ func (a *App) UpdateChannelLastViewedAt(channelIds []string, userId string) *mod
for _, channelId := range channelIds {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", userId, nil)
message.Add("channel_id", channelId)
go Publish(message)
go a.Publish(message)
}
}
@@ -1179,7 +1179,7 @@ func (a *App) ViewChannel(view *model.ChannelView, userId string, clearPushNotif
if *utils.Cfg.ServiceSettings.EnableChannelViewedMessages && model.IsValidId(view.ChannelId) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", userId, nil)
message.Add("channel_id", view.ChannelId)
go Publish(message)
go a.Publish(message)
}
return nil

View File

@@ -20,12 +20,12 @@ func (a *App) RegisterAllClusterMessageHandlers() {
a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL_BY_NAME, a.ClusterInvalidateCacheForChannelByNameHandler)
a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_CHANNEL, a.ClusterInvalidateCacheForChannelHandler)
a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_INVALIDATE_CACHE_FOR_USER, a.ClusterInvalidateCacheForUserHandler)
a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER, ClusterClearSessionCacheForUserHandler)
a.Cluster.RegisterClusterMessageHandler(model.CLUSTER_EVENT_CLEAR_SESSION_CACHE_FOR_USER, a.ClusterClearSessionCacheForUserHandler)
}
func (a *App) ClusterPublishHandler(msg *model.ClusterMessage) {
event := model.WebSocketEventFromJson(strings.NewReader(msg.Data))
PublishSkipClusterSend(event)
a.PublishSkipClusterSend(event)
}
func (a *App) ClusterUpdateStatusHandler(msg *model.ClusterMessage) {
@@ -65,6 +65,6 @@ func (a *App) ClusterInvalidateCacheForUserHandler(msg *model.ClusterMessage) {
a.InvalidateCacheForUserSkipClusterSend(msg.Data)
}
func ClusterClearSessionCacheForUserHandler(msg *model.ClusterMessage) {
ClearSessionCacheForUserSkipClusterSend(msg.Data)
func (a *App) ClusterClearSessionCacheForUserHandler(msg *model.ClusterMessage) {
a.ClearSessionCacheForUserSkipClusterSend(msg.Data)
}

View File

@@ -49,7 +49,7 @@ func (a *App) CreateCommandPost(post *model.Post, teamId string, response *model
return a.CreatePostMissingChannel(post, true)
} else if response.ResponseType == "" || response.ResponseType == model.COMMAND_RESPONSE_TYPE_EPHEMERAL {
post.ParentId = ""
SendEphemeralPost(post.UserId, post)
a.SendEphemeralPost(post.UserId, post)
}
return post, nil

View File

@@ -74,7 +74,7 @@ func (a *App) setCollapsePreference(args *model.CommandArgs, isCollapse bool) *m
socketMessage := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCE_CHANGED, "", "", args.UserId, nil)
socketMessage.Add("preference", pref.ToJson())
go Publish(socketMessage)
go a.Publish(socketMessage)
var rmsg string

View File

@@ -166,6 +166,7 @@ func (me *LoadTestProvider) SetupCommand(a *App, args *model.CommandArgs, messag
}
client.Login(BTEST_USER_EMAIL, BTEST_USER_PASSWORD)
environment, err := CreateTestEnvironmentWithTeams(
a,
client,
utils.Range{Begin: numTeams, End: numTeams},
utils.Range{Begin: numChannels, End: numChannels},
@@ -193,6 +194,7 @@ func (me *LoadTestProvider) SetupCommand(a *App, args *model.CommandArgs, messag
client.MockSession(args.Session.Token)
client.SetTeamId(args.TeamId)
CreateTestEnvironmentInTeam(
a,
client,
team,
utils.Range{Begin: numChannels, End: numChannels},
@@ -227,7 +229,7 @@ func (me *LoadTestProvider) UsersCommand(a *App, args *model.CommandArgs, messag
client := model.NewClient(args.SiteURL)
client.SetTeamId(team.Id)
userCreator := NewAutoUserCreator(client, team)
userCreator := NewAutoUserCreator(a, client, team)
userCreator.Fuzzy = doFuzz
userCreator.CreateTestUsers(usersr)

View File

@@ -22,26 +22,24 @@ const (
EMAIL_BATCHING_TASK_NAME = "Email Batching"
)
var emailBatchingJob *EmailBatchingJob
func InitEmailBatching() {
func (a *App) InitEmailBatching() {
if *utils.Cfg.EmailSettings.EnableEmailBatching {
if emailBatchingJob == nil {
emailBatchingJob = MakeEmailBatchingJob(*utils.Cfg.EmailSettings.EmailBatchingBufferSize)
if a.EmailBatching == nil {
a.EmailBatching = NewEmailBatchingJob(a, *utils.Cfg.EmailSettings.EmailBatchingBufferSize)
}
// note that we don't support changing EmailBatchingBufferSize without restarting the server
emailBatchingJob.Start()
a.EmailBatching.Start()
}
}
func AddNotificationEmailToBatch(user *model.User, post *model.Post, team *model.Team) *model.AppError {
func (a *App) AddNotificationEmailToBatch(user *model.User, post *model.Post, team *model.Team) *model.AppError {
if !*utils.Cfg.EmailSettings.EnableEmailBatching {
return model.NewAppError("AddNotificationEmailToBatch", "api.email_batching.add_notification_email_to_batch.disabled.app_error", nil, "", http.StatusNotImplemented)
}
if !emailBatchingJob.Add(user, post, team) {
if !a.EmailBatching.Add(user, post, team) {
l4g.Error(utils.T("api.email_batching.add_notification_email_to_batch.channel_full.app_error"))
return model.NewAppError("AddNotificationEmailToBatch", "api.email_batching.add_notification_email_to_batch.channel_full.app_error", nil, "", http.StatusInternalServerError)
}
@@ -56,12 +54,14 @@ type batchedNotification struct {
}
type EmailBatchingJob struct {
app *App
newNotifications chan *batchedNotification
pendingNotifications map[string][]*batchedNotification
}
func MakeEmailBatchingJob(bufferSize int) *EmailBatchingJob {
func NewEmailBatchingJob(a *App, bufferSize int) *EmailBatchingJob {
return &EmailBatchingJob{
app: a,
newNotifications: make(chan *batchedNotification, bufferSize),
pendingNotifications: make(map[string][]*batchedNotification),
}
@@ -97,7 +97,7 @@ func (job *EmailBatchingJob) CheckPendingEmails() {
// it's a bit weird to pass the send email function through here, but it makes it so that we can test
// without actually sending emails
job.checkPendingNotifications(time.Now(), Global().sendBatchedEmailNotification)
job.checkPendingNotifications(time.Now(), job.app.sendBatchedEmailNotification)
l4g.Debug(utils.T("api.email_batching.check_pending_emails.finished_running"), len(job.pendingNotifications))
}
@@ -131,7 +131,7 @@ func (job *EmailBatchingJob) checkPendingNotifications(now time.Time, handler fu
if inspectedTeamNames[notification.teamName] != "" {
continue
}
tchan := Global().Srv.Store.Team().GetByName(notifications[0].teamName)
tchan := job.app.Srv.Store.Team().GetByName(notifications[0].teamName)
if result := <-tchan; result.Err != nil {
l4g.Error("Unable to find Team id for notification", result.Err)
continue
@@ -141,7 +141,7 @@ func (job *EmailBatchingJob) checkPendingNotifications(now time.Time, handler fu
// if the user has viewed any channels in this team since the notification was queued, delete
// all queued notifications
mchan := Global().Srv.Store.Channel().GetMembersForUser(inspectedTeamNames[notification.teamName], userId)
mchan := job.app.Srv.Store.Channel().GetMembersForUser(inspectedTeamNames[notification.teamName], userId)
if result := <-mchan; result.Err != nil {
l4g.Error("Unable to find ChannelMembers for user", result.Err)
continue
@@ -158,7 +158,7 @@ func (job *EmailBatchingJob) checkPendingNotifications(now time.Time, handler fu
// get how long we need to wait to send notifications to the user
var interval int64
pchan := Global().Srv.Store.Preference().Get(userId, model.PREFERENCE_CATEGORY_NOTIFICATIONS, model.PREFERENCE_NAME_EMAIL_INTERVAL)
pchan := job.app.Srv.Store.Preference().Get(userId, model.PREFERENCE_CATEGORY_NOTIFICATIONS, model.PREFERENCE_NAME_EMAIL_INTERVAL)
if result := <-pchan; result.Err != nil {
// use the default batching interval if an error ocurrs while fetching user preferences
interval, _ = strconv.ParseInt(model.PREFERENCE_EMAIL_INTERVAL_BATCHING_SECONDS, 10, 64)

View File

@@ -13,14 +13,14 @@ import (
)
func TestHandleNewNotifications(t *testing.T) {
Setup()
th := Setup()
id1 := model.NewId()
id2 := model.NewId()
id3 := model.NewId()
// test queueing of received posts by user
job := MakeEmailBatchingJob(128)
job := NewEmailBatchingJob(th.App, 128)
job.handleNewNotifications()
@@ -74,7 +74,7 @@ func TestHandleNewNotifications(t *testing.T) {
}
// test ordering of received posts
job = MakeEmailBatchingJob(128)
job = NewEmailBatchingJob(th.App, 128)
job.Add(&model.User{Id: id1}, &model.Post{UserId: id1, Message: "test1"}, &model.Team{Name: "team"})
job.Add(&model.User{Id: id1}, &model.Post{UserId: id1, Message: "test2"}, &model.Team{Name: "team"})
@@ -95,7 +95,7 @@ func TestHandleNewNotifications(t *testing.T) {
func TestCheckPendingNotifications(t *testing.T) {
th := Setup().InitBasic()
job := MakeEmailBatchingJob(128)
job := NewEmailBatchingJob(th.App, 128)
job.pendingNotifications[th.BasicUser.Id] = []*batchedNotification{
{
post: &model.Post{
@@ -201,7 +201,7 @@ func TestCheckPendingNotifications(t *testing.T) {
*/
func TestCheckPendingNotificationsDefaultInterval(t *testing.T) {
th := Setup().InitBasic()
job := MakeEmailBatchingJob(128)
job := NewEmailBatchingJob(th.App, 128)
// bypasses recent user activity check
channelMember := store.Must(th.App.Srv.Store.Channel().GetMember(th.BasicChannel.Id, th.BasicUser.Id)).(*model.ChannelMember)
@@ -237,7 +237,7 @@ func TestCheckPendingNotificationsDefaultInterval(t *testing.T) {
*/
func TestCheckPendingNotificationsCantParseInterval(t *testing.T) {
th := Setup().InitBasic()
job := MakeEmailBatchingJob(128)
job := NewEmailBatchingJob(th.App, 128)
// bypasses recent user activity check
channelMember := store.Must(th.App.Srv.Store.Channel().GetMember(th.BasicChannel.Id, th.BasicUser.Id)).(*model.ChannelMember)

View File

@@ -61,7 +61,7 @@ func (a *App) CreateEmoji(sessionUserId string, emoji *model.Emoji, multiPartIma
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_EMOJI_ADDED, "", "", "", nil)
message.Add("emoji", emoji.ToJson())
Publish(message)
a.Publish(message)
return result.Data.(*model.Emoji), nil
}
}

View File

@@ -94,7 +94,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
if result := <-a.Srv.Store.User().GetProfilesByUsernames(potentialOtherMentions, team.Id); result.Err == nil {
outOfChannelMentions := result.Data.([]*model.User)
if channel.Type != model.CHANNEL_GROUP {
go sendOutOfChannelMentions(sender, post, team.Id, outOfChannelMentions)
go a.sendOutOfChannelMentions(sender, post, team.Id, outOfChannelMentions)
}
}
}
@@ -186,7 +186,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
// If the channel has more than 1K users then @here is disabled
if hereNotification && int64(len(profileMap)) > *utils.Cfg.TeamSettings.MaxNotificationsPerChannel {
hereNotification = false
SendEphemeralPost(
a.SendEphemeralPost(
post.UserId,
&model.Post{
ChannelId: post.ChannelId,
@@ -198,7 +198,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
// If the channel has more than 1K users then @channel is disabled
if channelNotification && int64(len(profileMap)) > *utils.Cfg.TeamSettings.MaxNotificationsPerChannel {
SendEphemeralPost(
a.SendEphemeralPost(
post.UserId,
&model.Post{
ChannelId: post.ChannelId,
@@ -210,7 +210,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
// If the channel has more than 1K users then @all is disabled
if allNotification && int64(len(profileMap)) > *utils.Cfg.TeamSettings.MaxNotificationsPerChannel {
SendEphemeralPost(
a.SendEphemeralPost(
post.UserId,
&model.Post{
ChannelId: post.ChannelId,
@@ -298,7 +298,7 @@ func (a *App) SendNotifications(post *model.Post, team *model.Team, channel *mod
message.Add("mentions", model.ArrayToJson(mentionedUsersList))
}
Publish(message)
a.Publish(message)
return mentionedUsersList, nil
}
@@ -337,7 +337,7 @@ func (a *App) sendNotificationEmail(post *model.Post, user *model.User, channel
}
if sendBatched {
if err := AddNotificationEmailToBatch(user, post, team); err == nil {
if err := a.AddNotificationEmailToBatch(user, post, team); err == nil {
return nil
}
}
@@ -717,7 +717,7 @@ func (a *App) getMobileAppSessions(userId string) ([]*model.Session, *model.AppE
}
}
func sendOutOfChannelMentions(sender *model.User, post *model.Post, teamId string, users []*model.User) *model.AppError {
func (a *App) sendOutOfChannelMentions(sender *model.User, post *model.Post, teamId string, users []*model.User) *model.AppError {
if len(users) == 0 {
return nil
}
@@ -742,7 +742,7 @@ func sendOutOfChannelMentions(sender *model.User, post *model.Post, teamId strin
})
}
SendEphemeralPost(
a.SendEphemeralPost(
post.UserId,
&model.Post{
ChannelId: post.ChannelId,

View File

@@ -335,7 +335,7 @@ func (a *App) UnpackAndActivatePlugin(pluginFile io.Reader) (*model.Manifest, *m
if manifest.HasClient() {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PLUGIN_ACTIVATED, "", "", "", nil)
message.Add("manifest", manifest.ClientManifest())
Publish(message)
a.Publish(message)
}
return manifest, nil
@@ -383,7 +383,7 @@ func (a *App) RemovePlugin(id string) *model.AppError {
if manifest.HasClient() {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PLUGIN_DEACTIVATED, "", "", "", nil)
message.Add("manifest", manifest.ClientManifest())
Publish(message)
a.Publish(message)
}
return nil

View File

@@ -51,7 +51,7 @@ func (a *App) CreatePostAsUser(post *model.Post) (*model.Post, *model.AppError)
}
T := utils.GetUserTranslations(user.Locale)
SendEphemeralPost(
a.SendEphemeralPost(
post.UserId,
&model.Post{
ChannelId: channel.Id,
@@ -75,7 +75,7 @@ func (a *App) CreatePostAsUser(post *model.Post) (*model.Post, *model.AppError)
if *utils.Cfg.ServiceSettings.EnableChannelViewedMessages {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_CHANNEL_VIEWED, "", "", post.UserId, nil)
message.Add("channel_id", post.ChannelId)
go Publish(message)
go a.Publish(message)
}
}
@@ -239,7 +239,7 @@ func parseSlackLinksToMarkdown(text string) string {
return linkWithTextRegex.ReplaceAllString(text, "[${2}](${1})")
}
func SendEphemeralPost(userId string, post *model.Post) *model.Post {
func (a *App) SendEphemeralPost(userId string, post *model.Post) *model.Post {
post.Type = model.POST_EPHEMERAL
// fill in fields which haven't been specified which have sensible defaults
@@ -256,7 +256,7 @@ func SendEphemeralPost(userId string, post *model.Post) *model.Post {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_EPHEMERAL_MESSAGE, "", post.ChannelId, userId, nil)
message.Add("post", post.ToJson())
go Publish(message)
go a.Publish(message)
return post
}
@@ -330,7 +330,7 @@ func (a *App) UpdatePost(post *model.Post, safeUpdate bool) (*model.Post, *model
}()
}
sendUpdatedPostEvent(rpost)
a.sendUpdatedPostEvent(rpost)
a.InvalidateCacheForChannelPosts(rpost.ChannelId)
@@ -351,17 +351,17 @@ func (a *App) PatchPost(postId string, patch *model.PostPatch) (*model.Post, *mo
return nil, err
}
sendUpdatedPostEvent(updatedPost)
a.sendUpdatedPostEvent(updatedPost)
a.InvalidateCacheForChannelPosts(updatedPost.ChannelId)
return updatedPost, nil
}
func sendUpdatedPostEvent(post *model.Post) {
func (a *App) sendUpdatedPostEvent(post *model.Post) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", post.ChannelId, "", nil)
message.Add("post", post.ToJson())
go Publish(message)
go a.Publish(message)
}
func (a *App) GetPostsPage(channelId string, page int, perPage int) (*model.PostList, *model.AppError) {
@@ -502,7 +502,7 @@ func (a *App) DeletePost(postId string) (*model.Post, *model.AppError) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_DELETED, "", post.ChannelId, "", nil)
message.Add("post", post.ToJson())
go Publish(message)
go a.Publish(message)
go a.DeletePostFiles(post)
go a.DeleteFlaggedPosts(post.Id)
@@ -724,7 +724,7 @@ func (a *App) DoPostAction(postId string, actionId string, userId string) *model
}
ephemeralPost.UserId = userId
ephemeralPost.AddProp("from_webhook", "true")
SendEphemeralPost(userId, ephemeralPost)
a.SendEphemeralPost(userId, ephemeralPost)
}
return nil

View File

@@ -55,7 +55,7 @@ func (a *App) UpdatePreferences(userId string, preferences model.Preferences) *m
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCES_CHANGED, "", "", userId, nil)
message.Add("preferences", preferences.ToJson())
go Publish(message)
go a.Publish(message)
return nil
}
@@ -78,7 +78,7 @@ func (a *App) DeletePreferences(userId string, preferences model.Preferences) *m
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_PREFERENCES_DELETED, "", "", userId, nil)
message.Add("preferences", preferences.ToJson())
go Publish(message)
go a.Publish(message)
return nil
}

View File

@@ -51,7 +51,7 @@ func (a *App) sendReactionEvent(event string, reaction *model.Reaction, post *mo
// send out that a reaction has been added/removed
message := model.NewWebSocketEvent(event, "", post.ChannelId, "", nil)
message.Add("reaction", reaction.ToJson())
Publish(message)
a.Publish(message)
// The post is always modified since the UpdateAt always changes
a.InvalidateCacheForChannelPosts(post.ChannelId)
@@ -59,5 +59,5 @@ func (a *App) sendReactionEvent(event string, reaction *model.Reaction, post *mo
post.UpdateAt = model.GetMillis()
umessage := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_POST_EDITED, "", post.ChannelId, "", nil)
umessage.Add("post", post.ToJson())
Publish(umessage)
a.Publish(umessage)
}

View File

@@ -218,7 +218,7 @@ func (a *App) StopServer() {
a.Srv.GracefulServer.Stop(TIME_TO_WAIT_FOR_CONNECTIONS_TO_CLOSE_ON_SERVER_SHUTDOWN)
a.Srv.Store.Close()
HubStop()
a.HubStop()
a.ShutDownPlugins()

View File

@@ -107,8 +107,7 @@ func (a *App) RevokeAllSessions(userId string) *model.AppError {
}
func (a *App) ClearSessionCacheForUser(userId string) {
ClearSessionCacheForUserSkipClusterSend(userId)
a.ClearSessionCacheForUserSkipClusterSend(userId)
if a.Cluster != nil {
msg := &model.ClusterMessage{
@@ -120,7 +119,7 @@ func (a *App) ClearSessionCacheForUser(userId string) {
}
}
func ClearSessionCacheForUserSkipClusterSend(userId string) {
func (a *App) ClearSessionCacheForUserSkipClusterSend(userId string) {
keys := sessionCache.Keys()
for _, key := range keys {
@@ -132,8 +131,7 @@ func ClearSessionCacheForUserSkipClusterSend(userId string) {
}
}
InvalidateWebConnSessionCacheForUser(userId)
a.InvalidateWebConnSessionCacheForUser(userId)
}
func AddSessionToCache(session *model.Session) {

View File

@@ -213,15 +213,15 @@ func (a *App) SetStatusOnline(userId string, sessionId string, manual bool) {
}
if broadcast {
BroadcastStatus(status)
a.BroadcastStatus(status)
}
}
func BroadcastStatus(status *model.Status) {
func (a *App) BroadcastStatus(status *model.Status) {
event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil)
event.Add("status", status.Status)
event.Add("user_id", status.UserId)
go Publish(event)
go a.Publish(event)
}
func (a *App) SetStatusOffline(userId string, manual bool) {
@@ -245,7 +245,7 @@ func (a *App) SetStatusOffline(userId string, manual bool) {
event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil)
event.Add("status", model.STATUS_OFFLINE)
event.Add("user_id", status.UserId)
go Publish(event)
go a.Publish(event)
}
func (a *App) SetStatusAwayIfNeeded(userId string, manual bool) {
@@ -286,7 +286,7 @@ func (a *App) SetStatusAwayIfNeeded(userId string, manual bool) {
event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_STATUS_CHANGE, "", "", status.UserId, nil)
event.Add("status", model.STATUS_AWAY)
event.Add("user_id", status.UserId)
go Publish(event)
go a.Publish(event)
}
func GetStatusFromCache(userId string) *model.Status {

View File

@@ -106,7 +106,7 @@ func (a *App) UpdateTeam(team *model.Team) (*model.Team, *model.AppError) {
oldTeam.Sanitize()
sendUpdatedTeamEvent(oldTeam)
a.sendUpdatedTeamEvent(oldTeam)
return oldTeam, nil
}
@@ -126,15 +126,15 @@ func (a *App) PatchTeam(teamId string, patch *model.TeamPatch) (*model.Team, *mo
updatedTeam.Sanitize()
sendUpdatedTeamEvent(updatedTeam)
a.sendUpdatedTeamEvent(updatedTeam)
return updatedTeam, nil
}
func sendUpdatedTeamEvent(team *model.Team) {
func (a *App) sendUpdatedTeamEvent(team *model.Team) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_UPDATE_TEAM, "", "", "", nil)
message.Add("team", team.ToJson())
go Publish(message)
go a.Publish(message)
}
func (a *App) UpdateTeamMemberRoles(teamId string, userId string, newRoles string) (*model.TeamMember, *model.AppError) {
@@ -163,16 +163,16 @@ func (a *App) UpdateTeamMemberRoles(teamId string, userId string, newRoles strin
a.ClearSessionCacheForUser(userId)
sendUpdatedMemberRoleEvent(userId, member)
a.sendUpdatedMemberRoleEvent(userId, member)
return member, nil
}
func sendUpdatedMemberRoleEvent(userId string, member *model.TeamMember) {
func (a *App) sendUpdatedMemberRoleEvent(userId string, member *model.TeamMember) {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_MEMBERROLE_UPDATED, "", "", userId, nil)
message.Add("member", member.ToJson())
go Publish(message)
go a.Publish(message)
}
func (a *App) AddUserToTeam(teamId string, userId string, userRequestorId string) (*model.Team, *model.AppError) {
@@ -330,7 +330,7 @@ func (a *App) JoinUserToTeam(team *model.Team, user *model.User, userRequestorId
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_ADDED_TO_TEAM, "", "", user.Id, nil)
message.Add("team_id", team.Id)
message.Add("user_id", user.Id)
Publish(message)
a.Publish(message)
return nil
}
@@ -462,7 +462,7 @@ func (a *App) AddTeamMember(teamId, userId string) (*model.TeamMember, *model.Ap
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_ADDED_TO_TEAM, "", "", userId, nil)
message.Add("team_id", teamId)
message.Add("user_id", userId)
Publish(message)
a.Publish(message)
return teamMember, nil
}
@@ -484,7 +484,7 @@ func (a *App) AddTeamMembers(teamId string, userIds []string, userRequestorId st
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_ADDED_TO_TEAM, "", "", userId, nil)
message.Add("team_id", teamId)
message.Add("user_id", userId)
Publish(message)
a.Publish(message)
}
return members, nil
@@ -603,7 +603,7 @@ func (a *App) LeaveTeam(team *model.Team, user *model.User) *model.AppError {
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_LEAVE_TEAM, team.Id, "", "", nil)
message.Add("user_id", user.Id)
message.Add("team_id", team.Id)
Publish(message)
a.Publish(message)
teamMember.Roles = ""
teamMember.DeleteAt = model.GetMillis()

View File

@@ -202,7 +202,7 @@ func (a *App) CreateUser(user *model.User) (*model.User, *model.AppError) {
// This message goes to everyone, so the teamId, channelId and userId are irrelevant
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_NEW_USER, "", "", "", nil)
message.Add("user_id", ruser.Id)
go Publish(message)
go a.Publish(message)
return ruser, nil
}
@@ -829,7 +829,7 @@ func (a *App) SetProfileImage(userId string, imageData *multipart.FileHeader) *m
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_UPDATED, "", "", "", omitUsers)
message.Add("user", user)
Publish(message)
a.Publish(message)
}
return nil
@@ -950,7 +950,7 @@ func (a *App) UpdateUserAsUser(user *model.User, asAdmin bool) (*model.User, *mo
return nil, err
}
sendUpdatedUserEvent(*updatedUser, asAdmin)
a.sendUpdatedUserEvent(*updatedUser, asAdmin)
return updatedUser, nil
}
@@ -968,19 +968,19 @@ func (a *App) PatchUser(userId string, patch *model.UserPatch, asAdmin bool) (*m
return nil, err
}
sendUpdatedUserEvent(*updatedUser, asAdmin)
a.sendUpdatedUserEvent(*updatedUser, asAdmin)
return updatedUser, nil
}
func sendUpdatedUserEvent(user model.User, asAdmin bool) {
func (a *App) sendUpdatedUserEvent(user model.User, asAdmin bool) {
SanitizeProfile(&user, asAdmin)
omitUsers := make(map[string]bool, 1)
omitUsers[user.Id] = true
message := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_USER_UPDATED, "", "", "", omitUsers)
message.Add("user", user)
go Publish(message)
go a.Publish(message)
}
func (a *App) UpdateUser(user *model.User, sendNotifications bool) (*model.User, *model.AppError) {

View File

@@ -96,7 +96,7 @@ func (c *WebConn) SetSession(v *model.Session) {
func (c *WebConn) ReadPump() {
defer func() {
HubUnregister(c)
c.App.HubUnregister(c)
c.WebSocket.Close()
}()
c.WebSocket.SetReadLimit(model.SOCKET_MAX_MESSAGE_SIZE_KB)

View File

@@ -29,6 +29,7 @@ type Hub struct {
// connectionCount should be kept first.
// See https://github.com/mattermost/mattermost-server/pull/7281
connectionCount int64
app *App
connections []*WebConn
connectionIndex int
register chan *WebConn
@@ -40,11 +41,9 @@ type Hub struct {
goroutineId int
}
var hubs []*Hub = make([]*Hub, 0)
var stopCheckingForDeadlock chan bool
func NewWebHub() *Hub {
func (a *App) NewWebHub() *Hub {
return &Hub{
app: a,
register: make(chan *WebConn),
unregister: make(chan *WebConn),
connections: make([]*WebConn, 0, model.SESSION_CACHE_SIZE),
@@ -55,26 +54,27 @@ func NewWebHub() *Hub {
}
}
func TotalWebsocketConnections() int {
func (a *App) TotalWebsocketConnections() int {
count := int64(0)
for _, hub := range hubs {
for _, hub := range a.Hubs {
count = count + atomic.LoadInt64(&hub.connectionCount)
}
return int(count)
}
func HubStart() {
func (a *App) HubStart() {
// Total number of hubs is twice the number of CPUs.
numberOfHubs := runtime.NumCPU() * 2
l4g.Info(utils.T("api.web_hub.start.starting.debug"), numberOfHubs)
hubs = make([]*Hub, numberOfHubs)
a.Hubs = make([]*Hub, numberOfHubs)
a.HubsStopCheckingForDeadlock = make(chan bool, 1)
for i := 0; i < len(hubs); i++ {
hubs[i] = NewWebHub()
hubs[i].connectionIndex = i
hubs[i].Start()
for i := 0; i < len(a.Hubs); i++ {
a.Hubs[i] = a.NewWebHub()
a.Hubs[i].connectionIndex = i
a.Hubs[i].Start()
}
go func() {
@@ -84,12 +84,10 @@ func HubStart() {
ticker.Stop()
}()
stopCheckingForDeadlock = make(chan bool, 1)
for {
select {
case <-ticker.C:
for _, hub := range hubs {
for _, hub := range a.Hubs {
if len(hub.broadcast) >= DEADLOCK_WARN {
l4g.Error("Hub processing might be deadlock on hub %v goroutine %v with %v events in the buffer", hub.connectionIndex, hub.goroutineId, len(hub.broadcast))
buf := make([]byte, 1<<16)
@@ -105,46 +103,42 @@ func HubStart() {
}
}
case <-stopCheckingForDeadlock:
case <-a.HubsStopCheckingForDeadlock:
return
}
}
}()
}
func HubStop() {
func (a *App) HubStop() {
l4g.Info(utils.T("api.web_hub.start.stopping.debug"))
select {
case stopCheckingForDeadlock <- true:
case a.HubsStopCheckingForDeadlock <- true:
default:
l4g.Warn("We appear to have already sent the stop checking for deadlocks command")
}
for _, hub := range hubs {
for _, hub := range a.Hubs {
hub.Stop()
}
hubs = make([]*Hub, 0)
a.Hubs = []*Hub{}
}
func GetHubForUserId(userId string) *Hub {
func (a *App) GetHubForUserId(userId string) *Hub {
hash := fnv.New32a()
hash.Write([]byte(userId))
index := hash.Sum32() % uint32(len(hubs))
return hubs[index]
index := hash.Sum32() % uint32(len(a.Hubs))
return a.Hubs[index]
}
func HubRegister(webConn *WebConn) {
GetHubForUserId(webConn.UserId).Register(webConn)
func (a *App) HubRegister(webConn *WebConn) {
a.GetHubForUserId(webConn.UserId).Register(webConn)
}
func HubUnregister(webConn *WebConn) {
GetHubForUserId(webConn.UserId).Unregister(webConn)
}
func Publish(message *model.WebSocketEvent) {
Global().Publish(message)
func (a *App) HubUnregister(webConn *WebConn) {
a.GetHubForUserId(webConn.UserId).Unregister(webConn)
}
func (a *App) Publish(message *model.WebSocketEvent) {
@@ -152,7 +146,7 @@ func (a *App) Publish(message *model.WebSocketEvent) {
metrics.IncrementWebsocketEvent(message.Event)
}
PublishSkipClusterSend(message)
a.PublishSkipClusterSend(message)
if a.Cluster != nil {
cm := &model.ClusterMessage{
@@ -173,8 +167,8 @@ func (a *App) Publish(message *model.WebSocketEvent) {
}
}
func PublishSkipClusterSend(message *model.WebSocketEvent) {
for _, hub := range hubs {
func (a *App) PublishSkipClusterSend(message *model.WebSocketEvent) {
for _, hub := range a.Hubs {
hub.Broadcast(message)
}
}
@@ -291,8 +285,8 @@ func (a *App) InvalidateCacheForUserSkipClusterSend(userId string) {
a.Srv.Store.User().InvalidateProfilesInChannelCacheByUser(userId)
a.Srv.Store.User().InvalidatProfileCacheForUser(userId)
if len(hubs) != 0 {
GetHubForUserId(userId).InvalidateUser(userId)
if len(a.Hubs) != 0 {
a.GetHubForUserId(userId).InvalidateUser(userId)
}
}
@@ -313,9 +307,9 @@ func (a *App) InvalidateCacheForWebhookSkipClusterSend(webhookId string) {
a.Srv.Store.Webhook().InvalidateWebhookCache(webhookId)
}
func InvalidateWebConnSessionCacheForUser(userId string) {
if len(hubs) != 0 {
GetHubForUserId(userId).InvalidateUser(userId)
func (a *App) InvalidateWebConnSessionCacheForUser(userId string) {
if len(a.Hubs) != 0 {
a.GetHubForUserId(userId).InvalidateUser(userId)
}
}
@@ -401,7 +395,7 @@ func (h *Hub) Start() {
}
if !found {
go Global().SetStatusOffline(userId, false)
go h.app.SetStatusOffline(userId, false)
}
case userId := <-h.invalidateUser:

View File

@@ -17,13 +17,15 @@ type webSocketHandler interface {
}
type WebSocketRouter struct {
app *App
handlers map[string]webSocketHandler
}
func NewWebSocketRouter() *WebSocketRouter {
router := &WebSocketRouter{}
router.handlers = make(map[string]webSocketHandler)
return router
func (a *App) NewWebSocketRouter() *WebSocketRouter {
return &WebSocketRouter{
app: a,
handlers: make(map[string]webSocketHandler),
}
}
func (wr *WebSocketRouter) Handle(action string, handler webSocketHandler) {
@@ -54,21 +56,21 @@ func (wr *WebSocketRouter) ServeWebSocket(conn *WebConn, r *model.WebSocketReque
return
}
session, err := Global().GetSession(token)
session, err := wr.app.GetSession(token)
if err != nil {
conn.WebSocket.Close()
} else {
go func() {
Global().SetStatusOnline(session.UserId, session.Id, false)
Global().UpdateLastActivityAtIfNeeded(*session)
wr.app.SetStatusOnline(session.UserId, session.Id, false)
wr.app.UpdateLastActivityAtIfNeeded(*session)
}()
conn.SetSession(session)
conn.SetSessionToken(session.Token)
conn.UserId = session.UserId
HubRegister(conn)
wr.app.HubRegister(conn)
resp := model.NewWebSocketResponse(model.STATUS_OK, r.Seq, nil)
conn.Send <- resp

View File

@@ -69,6 +69,7 @@ func runServer(configFileLocation string) {
a.NewServer()
a.InitStores()
a.Srv.Router = api.NewRouter()
a.Srv.WebSocketRouter = a.NewWebSocketRouter()
if model.BuildEnterpriseReady == "true" {
a.LoadLicense()
@@ -80,10 +81,9 @@ func runServer(configFileLocation string) {
l4g.Error("Unable to find webapp directory, could not initialize plugins")
}
wsapi.InitRouter()
api4.Init(a, a.Srv.Router, false)
api3 := api.Init(a, a.Srv.Router)
wsapi.InitApi()
wsapi.Init(a, a.Srv.WebSocketRouter)
web.Init(api3)
if !utils.IsLicensed() && len(utils.Cfg.SqlSettings.DataSourceReplicas) > 1 {

View File

@@ -52,10 +52,10 @@ func webClientTestsCmdF(cmd *cobra.Command, args []string) error {
utils.InitTranslations(utils.Cfg.LocalizationSettings)
a.Srv.Router = api.NewRouter()
wsapi.InitRouter()
a.Srv.WebSocketRouter = a.NewWebSocketRouter()
api4.Init(a, a.Srv.Router, false)
api.Init(a, a.Srv.Router)
wsapi.InitApi()
wsapi.Init(a, a.Srv.WebSocketRouter)
setupClientTests()
a.StartServer()
runWebClientTests()
@@ -72,10 +72,10 @@ func serverForWebClientTestsCmdF(cmd *cobra.Command, args []string) error {
utils.InitTranslations(utils.Cfg.LocalizationSettings)
a.Srv.Router = api.NewRouter()
wsapi.InitRouter()
a.Srv.WebSocketRouter = a.NewWebSocketRouter()
api4.Init(a, a.Srv.Router, false)
api.Init(a, a.Srv.Router)
wsapi.InitApi()
wsapi.Init(a, a.Srv.WebSocketRouter)
setupClientTests()
a.StartServer()

View File

@@ -7,15 +7,21 @@ import (
"github.com/mattermost/mattermost-server/app"
)
func InitRouter() {
app.Global().Srv.WebSocketRouter = app.NewWebSocketRouter()
type API struct {
App *app.App
Router *app.WebSocketRouter
}
func InitApi() {
InitUser()
InitSystem()
InitStatus()
InitWebrtc()
func Init(a *app.App, router *app.WebSocketRouter) {
api := &API{
App: a,
Router: router,
}
app.HubStart()
api.InitUser()
api.InitSystem()
api.InitStatus()
api.InitWebrtc()
a.HubStart()
}

View File

@@ -10,11 +10,11 @@ import (
"github.com/mattermost/mattermost-server/utils"
)
func InitStatus() {
func (api *API) InitStatus() {
l4g.Debug(utils.T("wsapi.status.init.debug"))
app.Global().Srv.WebSocketRouter.Handle("get_statuses", ApiWebSocketHandler(getStatuses))
app.Global().Srv.WebSocketRouter.Handle("get_statuses_by_ids", ApiWebSocketHandler(getStatusesByIds))
api.Router.Handle("get_statuses", api.ApiWebSocketHandler(getStatuses))
api.Router.Handle("get_statuses_by_ids", api.ApiWebSocketHandler(api.getStatusesByIds))
}
func getStatuses(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) {
@@ -22,14 +22,14 @@ func getStatuses(req *model.WebSocketRequest) (map[string]interface{}, *model.Ap
return model.StatusMapToInterfaceMap(statusMap), nil
}
func getStatusesByIds(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) {
func (api *API) getStatusesByIds(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) {
var userIds []string
if userIds = model.ArrayFromInterface(req.Data["user_ids"]); len(userIds) == 0 {
l4g.Error(model.StringInterfaceToJson(req.Data))
return nil, NewInvalidWebSocketParamError(req.Action, "user_ids")
}
statusMap, err := app.Global().GetStatusesByIds(userIds)
statusMap, err := api.App.GetStatusesByIds(userIds)
if err != nil {
return nil, err
}

View File

@@ -5,15 +5,14 @@ package wsapi
import (
l4g "github.com/alecthomas/log4go"
"github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
func InitSystem() {
func (api *API) InitSystem() {
l4g.Debug(utils.T("wsapi.system.init.debug"))
app.Global().Srv.WebSocketRouter.Handle("ping", ApiWebSocketHandler(ping))
api.Router.Handle("ping", api.ApiWebSocketHandler(ping))
}
func ping(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) {

View File

@@ -5,18 +5,17 @@ package wsapi
import (
l4g "github.com/alecthomas/log4go"
"github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
func InitUser() {
func (api *API) InitUser() {
l4g.Debug(utils.T("wsapi.user.init.debug"))
app.Global().Srv.WebSocketRouter.Handle("user_typing", ApiWebSocketHandler(userTyping))
api.Router.Handle("user_typing", api.ApiWebSocketHandler(api.userTyping))
}
func userTyping(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) {
func (api *API) userTyping(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) {
var ok bool
var channelId string
if channelId, ok = req.Data["channel_id"].(string); !ok || len(channelId) != 26 {
@@ -34,7 +33,7 @@ func userTyping(req *model.WebSocketRequest) (map[string]interface{}, *model.App
event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_TYPING, "", channelId, "", omitUsers)
event.Add("parent_id", parentId)
event.Add("user_id", req.Session.UserId)
go app.Publish(event)
go api.App.Publish(event)
return nil, nil
}

View File

@@ -5,18 +5,17 @@ package wsapi
import (
l4g "github.com/alecthomas/log4go"
"github.com/mattermost/mattermost-server/app"
"github.com/mattermost/mattermost-server/model"
"github.com/mattermost/mattermost-server/utils"
)
func InitWebrtc() {
func (api *API) InitWebrtc() {
l4g.Debug(utils.T("wsapi.webtrc.init.debug"))
app.Global().Srv.WebSocketRouter.Handle("webrtc", ApiWebSocketHandler(webrtcMessage))
api.Router.Handle("webrtc", api.ApiWebSocketHandler(api.webrtcMessage))
}
func webrtcMessage(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) {
func (api *API) webrtcMessage(req *model.WebSocketRequest) (map[string]interface{}, *model.AppError) {
var ok bool
var toUserId string
if toUserId, ok = req.Data["to_user_id"].(string); !ok || len(toUserId) != 26 {
@@ -25,7 +24,7 @@ func webrtcMessage(req *model.WebSocketRequest) (map[string]interface{}, *model.
event := model.NewWebSocketEvent(model.WEBSOCKET_EVENT_WEBRTC, "", "", toUserId, nil)
event.Data = req.Data
go app.Publish(event)
go api.App.Publish(event)
return nil, nil
}

View File

@@ -13,18 +13,19 @@ import (
"github.com/mattermost/mattermost-server/utils"
)
func ApiWebSocketHandler(wh func(*model.WebSocketRequest) (map[string]interface{}, *model.AppError)) webSocketHandler {
return webSocketHandler{wh}
func (api *API) ApiWebSocketHandler(wh func(*model.WebSocketRequest) (map[string]interface{}, *model.AppError)) webSocketHandler {
return webSocketHandler{api.App, wh}
}
type webSocketHandler struct {
app *app.App
handlerFunc func(*model.WebSocketRequest) (map[string]interface{}, *model.AppError)
}
func (wh webSocketHandler) ServeWebSocket(conn *app.WebConn, r *model.WebSocketRequest) {
l4g.Debug("/api/v3/users/websocket:%s", r.Action)
session, sessionErr := app.Global().GetSession(conn.GetSessionToken())
session, sessionErr := wh.app.GetSession(conn.GetSessionToken())
if sessionErr != nil {
l4g.Error(utils.T("api.web_socket_handler.log.error"), "/api/v3/users/websocket", r.Action, r.Seq, conn.UserId, sessionErr.SystemMessage(utils.T), sessionErr.Error())
sessionErr.DetailedError = ""