diff --git a/app/notification_push.go b/app/notification_push.go index ab986f2932..3dd056ce1a 100644 --- a/app/notification_push.go +++ b/app/notification_push.go @@ -16,11 +16,13 @@ import ( "github.com/mattermost/mattermost-server/v5/utils" ) -type NotificationType string +type notificationType string -const NOTIFICATION_TYPE_CLEAR NotificationType = "clear" -const NOTIFICATION_TYPE_MESSAGE NotificationType = "message" -const NOTIFICATION_TYPE_UPDATE_BADGE NotificationType = "update_badge" +const ( + notificationTypeClear notificationType = "clear" + notificationTypeMessage notificationType = "message" + notificationTypeUpdateBadge notificationType = "update_badge" +) const PUSH_NOTIFICATION_HUB_WORKERS = 1000 const PUSH_NOTIFICATIONS_HUB_BUFFER_PER_WORKER = 50 @@ -30,7 +32,7 @@ type PushNotificationsHub struct { } type PushNotification struct { - notificationType NotificationType + notificationType notificationType currentSessionId string userId string channelId string @@ -155,7 +157,7 @@ func (a *App) sendPushNotification(notification *PostNotification, user *model.U c := a.Srv().PushNotificationsHub.GetGoChannelFromUserId(user.Id) c <- PushNotification{ - notificationType: NOTIFICATION_TYPE_MESSAGE, + notificationType: notificationTypeMessage, post: post, user: user, channel: channel, @@ -229,7 +231,7 @@ func (a *App) clearPushNotificationSync(currentSessionId, userId, channelId stri func (a *App) clearPushNotification(currentSessionId, userId, channelId string) { channel := a.Srv().PushNotificationsHub.GetGoChannelFromUserId(userId) channel <- PushNotification{ - notificationType: NOTIFICATION_TYPE_CLEAR, + notificationType: notificationTypeClear, currentSessionId: currentSessionId, userId: userId, channelId: channelId, @@ -257,7 +259,7 @@ func (a *App) updateMobileAppBadgeSync(userId string) *model.AppError { func (a *App) UpdateMobileAppBadge(userId string) { channel := a.Srv().PushNotificationsHub.GetGoChannelFromUserId(userId) channel <- PushNotification{ - notificationType: NOTIFICATION_TYPE_UPDATE_BADGE, + notificationType: notificationTypeUpdateBadge, userId: userId, } } @@ -275,11 +277,10 @@ func (a *App) createPushNotificationsHub() { func (a *App) pushNotificationWorker(notifications chan PushNotification) { for notification := range notifications { var err *model.AppError - switch notification.notificationType { - case NOTIFICATION_TYPE_CLEAR: + case notificationTypeClear: err = a.clearPushNotificationSync(notification.currentSessionId, notification.userId, notification.channelId) - case NOTIFICATION_TYPE_MESSAGE: + case notificationTypeMessage: err = a.sendPushNotificationSync( notification.post, notification.user, @@ -290,7 +291,7 @@ func (a *App) pushNotificationWorker(notifications chan PushNotification) { notification.channelWideMention, notification.replyToThreadType, ) - case NOTIFICATION_TYPE_UPDATE_BADGE: + case notificationTypeUpdateBadge: err = a.updateMobileAppBadgeSync(notification.userId) default: mlog.Error("Invalid notification type", mlog.String("notification_type", string(notification.notificationType))) @@ -326,7 +327,8 @@ func (a *App) sendToPushProxy(msg model.PushNotification, session *model.Session mlog.String("status", model.PUSH_SEND_PREPARE), ) - request, err := http.NewRequest("POST", strings.TrimRight(*a.Config().EmailSettings.PushNotificationServer, "/")+model.API_URL_SUFFIX_V1+"/send_push", strings.NewReader(msg.ToJson())) + url := strings.TrimRight(*a.Config().EmailSettings.PushNotificationServer, "/") + model.API_URL_SUFFIX_V1 + "/send_push" + request, err := http.NewRequest("POST", url, strings.NewReader(msg.ToJson())) if err != nil { return err } @@ -335,21 +337,18 @@ func (a *App) sendToPushProxy(msg model.PushNotification, session *model.Session if err != nil { return err } - defer resp.Body.Close() pushResponse := model.PushResponseFromJson(resp.Body) - if pushResponse[model.PUSH_STATUS] == model.PUSH_STATUS_REMOVE { + switch pushResponse[model.PUSH_STATUS] { + case model.PUSH_STATUS_REMOVE: a.AttachDeviceId(session.Id, "", session.ExpiresAt) a.ClearSessionCacheForUser(session.UserId) return errors.New("Device was reported as removed") - } - - if pushResponse[model.PUSH_STATUS] == model.PUSH_STATUS_FAIL { + case model.PUSH_STATUS_FAIL: return errors.New(pushResponse[model.PUSH_STATUS_ERROR_MSG]) } - return nil } diff --git a/app/notification_push_test.go b/app/notification_push_test.go index 862797d001..83132cb118 100644 --- a/app/notification_push_test.go +++ b/app/notification_push_test.go @@ -5,7 +5,11 @@ package app import ( "fmt" + "net/http" + "net/http/httptest" + "sync" "testing" + "time" "github.com/mattermost/mattermost-server/v5/model" "github.com/mattermost/mattermost-server/v5/store/storetest/mocks" @@ -971,19 +975,505 @@ func TestBuildPushNotificationMessageMentions(t *testing.T) { func TestSendPushNotifications(t *testing.T) { th := Setup(t).InitBasic() - th.App.CreateSession(&model.Session{ + defer th.TearDown() + _, err := th.App.CreateSession(&model.Session{ UserId: th.BasicUser.Id, DeviceId: "test", ExpiresAt: model.GetMillis() + 100000, }) - defer th.TearDown() + require.Nil(t, err) t.Run("should return error if data is not valid or nil", func(t *testing.T) { err := th.App.sendPushNotificationToAllSessions(nil, th.BasicUser.Id, "") - assert.NotNil(t, err) - assert.Equal(t, "pushNotification: An error occurred building the push notification message., ", err.Error()) + require.NotNil(t, err) + assert.Equal(t, "api.push_notifications.message.parse.app_error", err.Id) // Errors derived of using an empty object are handled internally through the notifications log err = th.App.sendPushNotificationToAllSessions(&model.PushNotification{}, th.BasicUser.Id, "") - assert.Nil(t, err) + require.Nil(t, err) }) } + +// testPushNotificationHandler is an HTTP handler to record push notifications +// being sent from the client. +// It records the number of requests sent to it, and stores all the requests +// to be verified later. +type testPushNotificationHandler struct { + t testing.TB + serialUserMap sync.Map + mut sync.RWMutex + behavior string + _numReqs int + _notifications []*model.PushNotification + _notificationAcks []*model.PushNotificationAck +} + +// handleReq parses a push notification from the body, and stores it. +// It also sends an appropriate response depending on the behavior set. +// If the behavior is simple, it always sends an OK response. Otherwise, +// it alternates between an OK and a REMOVE response. +func (h *testPushNotificationHandler) handleReq(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/v1/send_push", "/api/v1/ack": + h.t.Helper() + + // Don't do any checking if it's a benchmark + if _, ok := h.t.(*testing.B); ok { + resp := model.NewOkPushResponse() + fmt.Fprintln(w, (&resp).ToJson()) + return + } + + var notification *model.PushNotification + var notificationAck *model.PushNotificationAck + var err error + if r.URL.Path == "/api/v1/send_push" { + notification, err = model.PushNotificationFromJson(r.Body) + if err != nil { + resp := model.NewErrorPushResponse("fail") + fmt.Fprintln(w, (&resp).ToJson()) + return + } + // We verify that messages are being sent in order per-device. + if notification.DeviceId != "" { + if _, ok := h.serialUserMap.Load(notification.DeviceId); ok { + h.t.Fatalf("device id: %s being sent concurrently", notification.DeviceId) + } + h.serialUserMap.LoadOrStore(notification.DeviceId, true) + defer h.serialUserMap.Delete(notification.DeviceId) + } + } else { + notificationAck, err = model.PushNotificationAckFromJson(r.Body) + if err != nil { + resp := model.NewErrorPushResponse("fail") + fmt.Fprintln(w, (&resp).ToJson()) + return + } + } + // Updating internal state. + h.mut.Lock() + defer h.mut.Unlock() + h._numReqs++ + // Little bit of duplicate condition check so that we can check the in-order property + // first. + if r.URL.Path == "/api/v1/send_push" { + h._notifications = append(h._notifications, notification) + } else { + h._notificationAcks = append(h._notificationAcks, notificationAck) + } + + var resp model.PushResponse + if h.behavior == "simple" { + resp = model.NewOkPushResponse() + } else { + // alternating between ok and remove response to test both code paths. + if h._numReqs%2 == 0 { + resp = model.NewOkPushResponse() + } else { + resp = model.NewRemovePushResponse() + } + } + fmt.Fprintln(w, (&resp).ToJson()) + } +} + +func (h *testPushNotificationHandler) numReqs() int { + h.mut.RLock() + defer h.mut.RUnlock() + return h._numReqs +} + +func (h *testPushNotificationHandler) notifications() []*model.PushNotification { + h.mut.RLock() + defer h.mut.RUnlock() + return h._notifications +} + +func (h *testPushNotificationHandler) notificationAcks() []*model.PushNotificationAck { + h.mut.RLock() + defer h.mut.RUnlock() + return h._notificationAcks +} + +func TestClearPushNotificationSync(t *testing.T) { + th := SetupWithStoreMock(t) + defer th.TearDown() + + handler := &testPushNotificationHandler{t: t} + pushServer := httptest.NewServer( + http.HandlerFunc(handler.handleReq), + ) + defer pushServer.Close() + + sess1 := &model.Session{ + Id: "id1", + UserId: "user1", + DeviceId: "test1", + ExpiresAt: model.GetMillis() + 100000, + } + sess2 := &model.Session{ + Id: "id2", + UserId: "user1", + DeviceId: "test2", + ExpiresAt: model.GetMillis() + 100000, + } + + mockStore := th.App.Srv().Store.(*mocks.Store) + mockUserStore := mocks.UserStore{} + mockUserStore.On("Count", mock.Anything).Return(int64(10), nil) + mockUserStore.On("GetUnreadCount", mock.AnythingOfType("string")).Return(int64(1), nil) + mockPostStore := mocks.PostStore{} + mockPostStore.On("GetMaxPostSize").Return(65535, nil) + mockSystemStore := mocks.SystemStore{} + mockSystemStore.On("GetByName", "InstallationDate").Return(&model.System{Name: "InstallationDate", Value: "10"}, nil) + mockSessionStore := mocks.SessionStore{} + mockSessionStore.On("GetSessionsWithActiveDeviceIds", mock.AnythingOfType("string")).Return([]*model.Session{sess1, sess2}, nil) + mockSessionStore.On("UpdateDeviceId", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64")).Return("testdeviceID", nil) + mockStore.On("User").Return(&mockUserStore) + mockStore.On("Post").Return(&mockPostStore) + mockStore.On("System").Return(&mockSystemStore) + mockStore.On("Session").Return(&mockSessionStore) + + th.App.UpdateConfig(func(cfg *model.Config) { + *cfg.EmailSettings.PushNotificationServer = pushServer.URL + }) + + err := th.App.clearPushNotificationSync(sess1.Id, "user1", "channel1") + require.Nil(t, err) + // Server side verification. + // We verify that 1 request has been sent, and also check the message contents. + require.Equal(t, 1, handler.numReqs()) + assert.Equal(t, "channel1", handler.notifications()[0].ChannelId) + assert.Equal(t, model.PUSH_TYPE_CLEAR, handler.notifications()[0].Type) +} + +func TestUpdateMobileAppBadgeSync(t *testing.T) { + th := SetupWithStoreMock(t) + defer th.TearDown() + + handler := &testPushNotificationHandler{t: t} + pushServer := httptest.NewServer( + http.HandlerFunc(handler.handleReq), + ) + defer pushServer.Close() + + sess1 := &model.Session{ + Id: "id1", + UserId: "user1", + DeviceId: "test1", + ExpiresAt: model.GetMillis() + 100000, + } + sess2 := &model.Session{ + Id: "id2", + UserId: "user1", + DeviceId: "test2", + ExpiresAt: model.GetMillis() + 100000, + } + + mockStore := th.App.Srv().Store.(*mocks.Store) + mockUserStore := mocks.UserStore{} + mockUserStore.On("Count", mock.Anything).Return(int64(10), nil) + mockUserStore.On("GetUnreadCount", mock.AnythingOfType("string")).Return(int64(1), nil) + mockPostStore := mocks.PostStore{} + mockPostStore.On("GetMaxPostSize").Return(65535, nil) + mockSystemStore := mocks.SystemStore{} + mockSystemStore.On("GetByName", "InstallationDate").Return(&model.System{Name: "InstallationDate", Value: "10"}, nil) + mockSessionStore := mocks.SessionStore{} + mockSessionStore.On("GetSessionsWithActiveDeviceIds", mock.AnythingOfType("string")).Return([]*model.Session{sess1, sess2}, nil) + mockSessionStore.On("UpdateDeviceId", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64")).Return("testdeviceID", nil) + mockStore.On("User").Return(&mockUserStore) + mockStore.On("Post").Return(&mockPostStore) + mockStore.On("System").Return(&mockSystemStore) + mockStore.On("Session").Return(&mockSessionStore) + + th.App.UpdateConfig(func(cfg *model.Config) { + *cfg.EmailSettings.PushNotificationServer = pushServer.URL + }) + + err := th.App.updateMobileAppBadgeSync("user1") + require.Nil(t, err) + // Server side verification. + // We verify that 2 requests have been sent, and also check the message contents. + require.Equal(t, 2, handler.numReqs()) + assert.Equal(t, 1, handler.notifications()[0].ContentAvailable) + assert.Equal(t, model.PUSH_TYPE_UPDATE_BADGE, handler.notifications()[0].Type) + assert.Equal(t, 1, handler.notifications()[1].ContentAvailable) + assert.Equal(t, model.PUSH_TYPE_UPDATE_BADGE, handler.notifications()[1].Type) +} + +func TestSendAckToPushProxy(t *testing.T) { + th := SetupWithStoreMock(t) + defer th.TearDown() + + handler := &testPushNotificationHandler{t: t} + pushServer := httptest.NewServer( + http.HandlerFunc(handler.handleReq), + ) + defer pushServer.Close() + + mockStore := th.App.Srv().Store.(*mocks.Store) + mockUserStore := mocks.UserStore{} + mockUserStore.On("Count", mock.Anything).Return(int64(10), nil) + mockPostStore := mocks.PostStore{} + mockPostStore.On("GetMaxPostSize").Return(65535, nil) + mockSystemStore := mocks.SystemStore{} + mockSystemStore.On("GetByName", "InstallationDate").Return(&model.System{Name: "InstallationDate", Value: "10"}, nil) + mockStore.On("User").Return(&mockUserStore) + mockStore.On("Post").Return(&mockPostStore) + mockStore.On("System").Return(&mockSystemStore) + + th.App.UpdateConfig(func(cfg *model.Config) { + *cfg.EmailSettings.PushNotificationServer = pushServer.URL + }) + + ack := &model.PushNotificationAck{ + Id: "testid", + NotificationType: model.PUSH_TYPE_MESSAGE, + } + err := th.App.SendAckToPushProxy(ack) + require.Nil(t, err) + // Server side verification. + // We verify that 1 request has been sent, and also check the message contents. + require.Equal(t, 1, handler.numReqs()) + assert.Equal(t, ack.Id, handler.notificationAcks()[0].Id) + assert.Equal(t, ack.NotificationType, handler.notificationAcks()[0].NotificationType) +} + +// TestAllPushNotifications is a master test which sends all verious types +// of notifications and verifies they have been properly sent. +func TestAllPushNotifications(t *testing.T) { + if testing.Short() { + t.Skip("skipping all push notifications test in short mode") + } + + th := Setup(t).InitBasic() + defer th.TearDown() + + // Create 10 users, each having 2 sessions. + type userSession struct { + user *model.User + session *model.Session + } + var testData []userSession + for i := 0; i < 10; i++ { + u := th.CreateUser() + sess, err := th.App.CreateSession(&model.Session{ + UserId: u.Id, + DeviceId: "deviceID" + u.Id, + ExpiresAt: model.GetMillis() + 100000, + }) + require.Nil(t, err) + // We don't need to track the 2nd session. + _, err = th.App.CreateSession(&model.Session{ + UserId: u.Id, + DeviceId: "deviceID" + u.Id, + ExpiresAt: model.GetMillis() + 100000, + }) + require.Nil(t, err) + _, err = th.App.AddTeamMember(th.BasicTeam.Id, u.Id) + require.Nil(t, err) + th.AddUserToChannel(u, th.BasicChannel) + testData = append(testData, userSession{ + user: u, + session: sess, + }) + } + + handler := &testPushNotificationHandler{ + t: t, + behavior: "simple", + } + pushServer := httptest.NewServer( + http.HandlerFunc(handler.handleReq), + ) + defer pushServer.Close() + + th.App.UpdateConfig(func(cfg *model.Config) { + *cfg.EmailSettings.PushNotificationServer = pushServer.URL + }) + + var wg sync.WaitGroup + for i, data := range testData { + wg.Add(1) + // Ranging between 3 types of notifications. + switch i % 3 { + case 0: + go func(user model.User) { + defer wg.Done() + notification := &PostNotification{ + Post: th.CreatePost(th.BasicChannel), + Channel: th.BasicChannel, + ProfileMap: map[string]*model.User{ + user.Id: &user, + }, + Sender: &user, + } + // testing all 3 notification types. + th.App.sendPushNotification(notification, &user, true, false, model.COMMENTS_NOTIFY_ANY) + }(*data.user) + case 1: + go func(id string) { + defer wg.Done() + th.App.UpdateMobileAppBadge(id) + }(data.user.Id) + case 2: + go func(sessID, userID string) { + defer wg.Done() + th.App.clearPushNotification(sessID, userID, th.BasicChannel.Id) + }(data.session.Id, data.user.Id) + } + } + wg.Wait() + + // Hack to let the worker goroutines complete. + time.Sleep(1 * time.Second) + // Server side verification. + assert.Equal(t, 17, handler.numReqs()) + var numClears, numMessages, numUpdateBadges int + for _, n := range handler.notifications() { + switch n.Type { + case model.PUSH_TYPE_CLEAR: + numClears++ + assert.Equal(t, th.BasicChannel.Id, n.ChannelId) + case model.PUSH_TYPE_MESSAGE: + numMessages++ + assert.Equal(t, th.BasicChannel.Id, n.ChannelId) + assert.Contains(t, n.Message, "mentioned you") + case model.PUSH_TYPE_UPDATE_BADGE: + numUpdateBadges++ + assert.Equal(t, "none", n.Sound) + assert.Equal(t, 1, n.ContentAvailable) + } + } + assert.Equal(t, 8, numMessages) + assert.Equal(t, 3, numClears) + assert.Equal(t, 6, numUpdateBadges) +} + +func BenchmarkPushNotification(b *testing.B) { + th := SetupWithStoreMock(b) + defer th.TearDown() + + handler := &testPushNotificationHandler{ + t: b, + behavior: "simple", + } + pushServer := httptest.NewServer( + http.HandlerFunc(handler.handleReq), + ) + defer pushServer.Close() + + mockStore := th.App.Srv().Store.(*mocks.Store) + mockUserStore := mocks.UserStore{} + mockUserStore.On("Count", mock.Anything).Return(int64(10), nil) + mockUserStore.On("GetUnreadCount", mock.AnythingOfType("string")).Return(int64(1), nil) + mockPostStore := mocks.PostStore{} + mockPostStore.On("GetMaxPostSize").Return(65535, nil) + mockSystemStore := mocks.SystemStore{} + mockSystemStore.On("GetByName", "InstallationDate").Return(&model.System{Name: "InstallationDate", Value: "10"}, nil) + mockSessionStore := mocks.SessionStore{} + mockPreferenceStore := mocks.PreferenceStore{} + mockPreferenceStore.On("Get", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(&model.Preference{Value: "test"}, nil) + mockStore.On("User").Return(&mockUserStore) + mockStore.On("Post").Return(&mockPostStore) + mockStore.On("System").Return(&mockSystemStore) + mockStore.On("Session").Return(&mockSessionStore) + mockStore.On("Preference").Return(&mockPreferenceStore) + + // create 50 users, each having 2 sessions. + type userSession struct { + user *model.User + session *model.Session + } + var testData []userSession + for i := 0; i < 50; i++ { + id := model.NewId() + u := &model.User{ + Id: id, + Email: "success+" + id + "@simulator.amazonses.com", + Username: "un_" + id, + Nickname: "nn_" + id, + Password: "Password1", + EmailVerified: true, + } + sess1 := &model.Session{ + Id: "id1", + UserId: u.Id, + DeviceId: "deviceID" + u.Id, + ExpiresAt: model.GetMillis() + 100000, + } + sess2 := &model.Session{ + Id: "id2", + UserId: u.Id, + DeviceId: "deviceID" + u.Id, + ExpiresAt: model.GetMillis() + 100000, + } + mockSessionStore.On("GetSessionsWithActiveDeviceIds", u.Id).Return([]*model.Session{sess1, sess2}, nil) + mockSessionStore.On("UpdateDeviceId", sess1.Id, "deviceID"+u.Id, mock.AnythingOfType("int64")).Return("deviceID"+u.Id, nil) + + testData = append(testData, userSession{ + user: u, + session: sess1, + }) + } + + th.App.UpdateConfig(func(cfg *model.Config) { + *cfg.EmailSettings.PushNotificationServer = pushServer.URL + *cfg.LogSettings.EnableConsole = false + *cfg.NotificationLogSettings.EnableConsole = false + }) + + ch := &model.Channel{ + Id: model.NewId(), + CreateAt: model.GetMillis(), + Type: model.CHANNEL_OPEN, + Name: "testch", + } + + b.ResetTimer() + // We have an inner loop which ranges the testdata slice + // and we just repeat that. + // TODO: replace 10 by b.N + then := time.Now() + for i := 0; i < 10; i++ { + var wg sync.WaitGroup + for j, data := range testData { + wg.Add(1) + // Ranging between 3 types of notifications. + switch j % 3 { + case 0: + go func(user model.User) { + defer wg.Done() + post := &model.Post{ + UserId: user.Id, + ChannelId: ch.Id, + Message: "test message", + CreateAt: model.GetMillis(), + } + notification := &PostNotification{ + Post: post, + Channel: ch, + ProfileMap: map[string]*model.User{ + user.Id: &user, + }, + Sender: &user, + } + th.App.sendPushNotification(notification, &user, true, false, model.COMMENTS_NOTIFY_ANY) + }(*data.user) + case 1: + go func(id string) { + defer wg.Done() + th.App.UpdateMobileAppBadge(id) + }(data.user.Id) + case 2: + go func(sessID, userID string) { + defer wg.Done() + th.App.clearPushNotification(sessID, userID, ch.Id) + }(data.session.Id, data.user.Id) + } + } + wg.Wait() + } + b.Logf("time taken: %v", time.Since(then)) + b.StopTimer() + time.Sleep(2 * time.Second) +}