MM-23276: Refactor push notifications (Part 1) (#14059)

Automatic Merge
This commit is contained in:
Agniva De Sarker
2020-03-18 22:28:59 +05:30
committed by GitHub
parent 16b535314d
commit 3e2175f897
2 changed files with 513 additions and 24 deletions

View File

@@ -16,11 +16,13 @@ import (
"github.com/mattermost/mattermost-server/v5/utils"
)
type NotificationType string
type notificationType string
const NOTIFICATION_TYPE_CLEAR NotificationType = "clear"
const NOTIFICATION_TYPE_MESSAGE NotificationType = "message"
const NOTIFICATION_TYPE_UPDATE_BADGE NotificationType = "update_badge"
const (
notificationTypeClear notificationType = "clear"
notificationTypeMessage notificationType = "message"
notificationTypeUpdateBadge notificationType = "update_badge"
)
const PUSH_NOTIFICATION_HUB_WORKERS = 1000
const PUSH_NOTIFICATIONS_HUB_BUFFER_PER_WORKER = 50
@@ -30,7 +32,7 @@ type PushNotificationsHub struct {
}
type PushNotification struct {
notificationType NotificationType
notificationType notificationType
currentSessionId string
userId string
channelId string
@@ -155,7 +157,7 @@ func (a *App) sendPushNotification(notification *PostNotification, user *model.U
c := a.Srv().PushNotificationsHub.GetGoChannelFromUserId(user.Id)
c <- PushNotification{
notificationType: NOTIFICATION_TYPE_MESSAGE,
notificationType: notificationTypeMessage,
post: post,
user: user,
channel: channel,
@@ -229,7 +231,7 @@ func (a *App) clearPushNotificationSync(currentSessionId, userId, channelId stri
func (a *App) clearPushNotification(currentSessionId, userId, channelId string) {
channel := a.Srv().PushNotificationsHub.GetGoChannelFromUserId(userId)
channel <- PushNotification{
notificationType: NOTIFICATION_TYPE_CLEAR,
notificationType: notificationTypeClear,
currentSessionId: currentSessionId,
userId: userId,
channelId: channelId,
@@ -257,7 +259,7 @@ func (a *App) updateMobileAppBadgeSync(userId string) *model.AppError {
func (a *App) UpdateMobileAppBadge(userId string) {
channel := a.Srv().PushNotificationsHub.GetGoChannelFromUserId(userId)
channel <- PushNotification{
notificationType: NOTIFICATION_TYPE_UPDATE_BADGE,
notificationType: notificationTypeUpdateBadge,
userId: userId,
}
}
@@ -275,11 +277,10 @@ func (a *App) createPushNotificationsHub() {
func (a *App) pushNotificationWorker(notifications chan PushNotification) {
for notification := range notifications {
var err *model.AppError
switch notification.notificationType {
case NOTIFICATION_TYPE_CLEAR:
case notificationTypeClear:
err = a.clearPushNotificationSync(notification.currentSessionId, notification.userId, notification.channelId)
case NOTIFICATION_TYPE_MESSAGE:
case notificationTypeMessage:
err = a.sendPushNotificationSync(
notification.post,
notification.user,
@@ -290,7 +291,7 @@ func (a *App) pushNotificationWorker(notifications chan PushNotification) {
notification.channelWideMention,
notification.replyToThreadType,
)
case NOTIFICATION_TYPE_UPDATE_BADGE:
case notificationTypeUpdateBadge:
err = a.updateMobileAppBadgeSync(notification.userId)
default:
mlog.Error("Invalid notification type", mlog.String("notification_type", string(notification.notificationType)))
@@ -326,7 +327,8 @@ func (a *App) sendToPushProxy(msg model.PushNotification, session *model.Session
mlog.String("status", model.PUSH_SEND_PREPARE),
)
request, err := http.NewRequest("POST", strings.TrimRight(*a.Config().EmailSettings.PushNotificationServer, "/")+model.API_URL_SUFFIX_V1+"/send_push", strings.NewReader(msg.ToJson()))
url := strings.TrimRight(*a.Config().EmailSettings.PushNotificationServer, "/") + model.API_URL_SUFFIX_V1 + "/send_push"
request, err := http.NewRequest("POST", url, strings.NewReader(msg.ToJson()))
if err != nil {
return err
}
@@ -335,21 +337,18 @@ func (a *App) sendToPushProxy(msg model.PushNotification, session *model.Session
if err != nil {
return err
}
defer resp.Body.Close()
pushResponse := model.PushResponseFromJson(resp.Body)
if pushResponse[model.PUSH_STATUS] == model.PUSH_STATUS_REMOVE {
switch pushResponse[model.PUSH_STATUS] {
case model.PUSH_STATUS_REMOVE:
a.AttachDeviceId(session.Id, "", session.ExpiresAt)
a.ClearSessionCacheForUser(session.UserId)
return errors.New("Device was reported as removed")
}
if pushResponse[model.PUSH_STATUS] == model.PUSH_STATUS_FAIL {
case model.PUSH_STATUS_FAIL:
return errors.New(pushResponse[model.PUSH_STATUS_ERROR_MSG])
}
return nil
}

View File

@@ -5,7 +5,11 @@ package app
import (
"fmt"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/store/storetest/mocks"
@@ -971,19 +975,505 @@ func TestBuildPushNotificationMessageMentions(t *testing.T) {
func TestSendPushNotifications(t *testing.T) {
th := Setup(t).InitBasic()
th.App.CreateSession(&model.Session{
defer th.TearDown()
_, err := th.App.CreateSession(&model.Session{
UserId: th.BasicUser.Id,
DeviceId: "test",
ExpiresAt: model.GetMillis() + 100000,
})
defer th.TearDown()
require.Nil(t, err)
t.Run("should return error if data is not valid or nil", func(t *testing.T) {
err := th.App.sendPushNotificationToAllSessions(nil, th.BasicUser.Id, "")
assert.NotNil(t, err)
assert.Equal(t, "pushNotification: An error occurred building the push notification message., ", err.Error())
require.NotNil(t, err)
assert.Equal(t, "api.push_notifications.message.parse.app_error", err.Id)
// Errors derived of using an empty object are handled internally through the notifications log
err = th.App.sendPushNotificationToAllSessions(&model.PushNotification{}, th.BasicUser.Id, "")
assert.Nil(t, err)
require.Nil(t, err)
})
}
// testPushNotificationHandler is an HTTP handler to record push notifications
// being sent from the client.
// It records the number of requests sent to it, and stores all the requests
// to be verified later.
type testPushNotificationHandler struct {
t testing.TB
serialUserMap sync.Map
mut sync.RWMutex
behavior string
_numReqs int
_notifications []*model.PushNotification
_notificationAcks []*model.PushNotificationAck
}
// handleReq parses a push notification from the body, and stores it.
// It also sends an appropriate response depending on the behavior set.
// If the behavior is simple, it always sends an OK response. Otherwise,
// it alternates between an OK and a REMOVE response.
func (h *testPushNotificationHandler) handleReq(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/v1/send_push", "/api/v1/ack":
h.t.Helper()
// Don't do any checking if it's a benchmark
if _, ok := h.t.(*testing.B); ok {
resp := model.NewOkPushResponse()
fmt.Fprintln(w, (&resp).ToJson())
return
}
var notification *model.PushNotification
var notificationAck *model.PushNotificationAck
var err error
if r.URL.Path == "/api/v1/send_push" {
notification, err = model.PushNotificationFromJson(r.Body)
if err != nil {
resp := model.NewErrorPushResponse("fail")
fmt.Fprintln(w, (&resp).ToJson())
return
}
// We verify that messages are being sent in order per-device.
if notification.DeviceId != "" {
if _, ok := h.serialUserMap.Load(notification.DeviceId); ok {
h.t.Fatalf("device id: %s being sent concurrently", notification.DeviceId)
}
h.serialUserMap.LoadOrStore(notification.DeviceId, true)
defer h.serialUserMap.Delete(notification.DeviceId)
}
} else {
notificationAck, err = model.PushNotificationAckFromJson(r.Body)
if err != nil {
resp := model.NewErrorPushResponse("fail")
fmt.Fprintln(w, (&resp).ToJson())
return
}
}
// Updating internal state.
h.mut.Lock()
defer h.mut.Unlock()
h._numReqs++
// Little bit of duplicate condition check so that we can check the in-order property
// first.
if r.URL.Path == "/api/v1/send_push" {
h._notifications = append(h._notifications, notification)
} else {
h._notificationAcks = append(h._notificationAcks, notificationAck)
}
var resp model.PushResponse
if h.behavior == "simple" {
resp = model.NewOkPushResponse()
} else {
// alternating between ok and remove response to test both code paths.
if h._numReqs%2 == 0 {
resp = model.NewOkPushResponse()
} else {
resp = model.NewRemovePushResponse()
}
}
fmt.Fprintln(w, (&resp).ToJson())
}
}
func (h *testPushNotificationHandler) numReqs() int {
h.mut.RLock()
defer h.mut.RUnlock()
return h._numReqs
}
func (h *testPushNotificationHandler) notifications() []*model.PushNotification {
h.mut.RLock()
defer h.mut.RUnlock()
return h._notifications
}
func (h *testPushNotificationHandler) notificationAcks() []*model.PushNotificationAck {
h.mut.RLock()
defer h.mut.RUnlock()
return h._notificationAcks
}
func TestClearPushNotificationSync(t *testing.T) {
th := SetupWithStoreMock(t)
defer th.TearDown()
handler := &testPushNotificationHandler{t: t}
pushServer := httptest.NewServer(
http.HandlerFunc(handler.handleReq),
)
defer pushServer.Close()
sess1 := &model.Session{
Id: "id1",
UserId: "user1",
DeviceId: "test1",
ExpiresAt: model.GetMillis() + 100000,
}
sess2 := &model.Session{
Id: "id2",
UserId: "user1",
DeviceId: "test2",
ExpiresAt: model.GetMillis() + 100000,
}
mockStore := th.App.Srv().Store.(*mocks.Store)
mockUserStore := mocks.UserStore{}
mockUserStore.On("Count", mock.Anything).Return(int64(10), nil)
mockUserStore.On("GetUnreadCount", mock.AnythingOfType("string")).Return(int64(1), nil)
mockPostStore := mocks.PostStore{}
mockPostStore.On("GetMaxPostSize").Return(65535, nil)
mockSystemStore := mocks.SystemStore{}
mockSystemStore.On("GetByName", "InstallationDate").Return(&model.System{Name: "InstallationDate", Value: "10"}, nil)
mockSessionStore := mocks.SessionStore{}
mockSessionStore.On("GetSessionsWithActiveDeviceIds", mock.AnythingOfType("string")).Return([]*model.Session{sess1, sess2}, nil)
mockSessionStore.On("UpdateDeviceId", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64")).Return("testdeviceID", nil)
mockStore.On("User").Return(&mockUserStore)
mockStore.On("Post").Return(&mockPostStore)
mockStore.On("System").Return(&mockSystemStore)
mockStore.On("Session").Return(&mockSessionStore)
th.App.UpdateConfig(func(cfg *model.Config) {
*cfg.EmailSettings.PushNotificationServer = pushServer.URL
})
err := th.App.clearPushNotificationSync(sess1.Id, "user1", "channel1")
require.Nil(t, err)
// Server side verification.
// We verify that 1 request has been sent, and also check the message contents.
require.Equal(t, 1, handler.numReqs())
assert.Equal(t, "channel1", handler.notifications()[0].ChannelId)
assert.Equal(t, model.PUSH_TYPE_CLEAR, handler.notifications()[0].Type)
}
func TestUpdateMobileAppBadgeSync(t *testing.T) {
th := SetupWithStoreMock(t)
defer th.TearDown()
handler := &testPushNotificationHandler{t: t}
pushServer := httptest.NewServer(
http.HandlerFunc(handler.handleReq),
)
defer pushServer.Close()
sess1 := &model.Session{
Id: "id1",
UserId: "user1",
DeviceId: "test1",
ExpiresAt: model.GetMillis() + 100000,
}
sess2 := &model.Session{
Id: "id2",
UserId: "user1",
DeviceId: "test2",
ExpiresAt: model.GetMillis() + 100000,
}
mockStore := th.App.Srv().Store.(*mocks.Store)
mockUserStore := mocks.UserStore{}
mockUserStore.On("Count", mock.Anything).Return(int64(10), nil)
mockUserStore.On("GetUnreadCount", mock.AnythingOfType("string")).Return(int64(1), nil)
mockPostStore := mocks.PostStore{}
mockPostStore.On("GetMaxPostSize").Return(65535, nil)
mockSystemStore := mocks.SystemStore{}
mockSystemStore.On("GetByName", "InstallationDate").Return(&model.System{Name: "InstallationDate", Value: "10"}, nil)
mockSessionStore := mocks.SessionStore{}
mockSessionStore.On("GetSessionsWithActiveDeviceIds", mock.AnythingOfType("string")).Return([]*model.Session{sess1, sess2}, nil)
mockSessionStore.On("UpdateDeviceId", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("int64")).Return("testdeviceID", nil)
mockStore.On("User").Return(&mockUserStore)
mockStore.On("Post").Return(&mockPostStore)
mockStore.On("System").Return(&mockSystemStore)
mockStore.On("Session").Return(&mockSessionStore)
th.App.UpdateConfig(func(cfg *model.Config) {
*cfg.EmailSettings.PushNotificationServer = pushServer.URL
})
err := th.App.updateMobileAppBadgeSync("user1")
require.Nil(t, err)
// Server side verification.
// We verify that 2 requests have been sent, and also check the message contents.
require.Equal(t, 2, handler.numReqs())
assert.Equal(t, 1, handler.notifications()[0].ContentAvailable)
assert.Equal(t, model.PUSH_TYPE_UPDATE_BADGE, handler.notifications()[0].Type)
assert.Equal(t, 1, handler.notifications()[1].ContentAvailable)
assert.Equal(t, model.PUSH_TYPE_UPDATE_BADGE, handler.notifications()[1].Type)
}
func TestSendAckToPushProxy(t *testing.T) {
th := SetupWithStoreMock(t)
defer th.TearDown()
handler := &testPushNotificationHandler{t: t}
pushServer := httptest.NewServer(
http.HandlerFunc(handler.handleReq),
)
defer pushServer.Close()
mockStore := th.App.Srv().Store.(*mocks.Store)
mockUserStore := mocks.UserStore{}
mockUserStore.On("Count", mock.Anything).Return(int64(10), nil)
mockPostStore := mocks.PostStore{}
mockPostStore.On("GetMaxPostSize").Return(65535, nil)
mockSystemStore := mocks.SystemStore{}
mockSystemStore.On("GetByName", "InstallationDate").Return(&model.System{Name: "InstallationDate", Value: "10"}, nil)
mockStore.On("User").Return(&mockUserStore)
mockStore.On("Post").Return(&mockPostStore)
mockStore.On("System").Return(&mockSystemStore)
th.App.UpdateConfig(func(cfg *model.Config) {
*cfg.EmailSettings.PushNotificationServer = pushServer.URL
})
ack := &model.PushNotificationAck{
Id: "testid",
NotificationType: model.PUSH_TYPE_MESSAGE,
}
err := th.App.SendAckToPushProxy(ack)
require.Nil(t, err)
// Server side verification.
// We verify that 1 request has been sent, and also check the message contents.
require.Equal(t, 1, handler.numReqs())
assert.Equal(t, ack.Id, handler.notificationAcks()[0].Id)
assert.Equal(t, ack.NotificationType, handler.notificationAcks()[0].NotificationType)
}
// TestAllPushNotifications is a master test which sends all verious types
// of notifications and verifies they have been properly sent.
func TestAllPushNotifications(t *testing.T) {
if testing.Short() {
t.Skip("skipping all push notifications test in short mode")
}
th := Setup(t).InitBasic()
defer th.TearDown()
// Create 10 users, each having 2 sessions.
type userSession struct {
user *model.User
session *model.Session
}
var testData []userSession
for i := 0; i < 10; i++ {
u := th.CreateUser()
sess, err := th.App.CreateSession(&model.Session{
UserId: u.Id,
DeviceId: "deviceID" + u.Id,
ExpiresAt: model.GetMillis() + 100000,
})
require.Nil(t, err)
// We don't need to track the 2nd session.
_, err = th.App.CreateSession(&model.Session{
UserId: u.Id,
DeviceId: "deviceID" + u.Id,
ExpiresAt: model.GetMillis() + 100000,
})
require.Nil(t, err)
_, err = th.App.AddTeamMember(th.BasicTeam.Id, u.Id)
require.Nil(t, err)
th.AddUserToChannel(u, th.BasicChannel)
testData = append(testData, userSession{
user: u,
session: sess,
})
}
handler := &testPushNotificationHandler{
t: t,
behavior: "simple",
}
pushServer := httptest.NewServer(
http.HandlerFunc(handler.handleReq),
)
defer pushServer.Close()
th.App.UpdateConfig(func(cfg *model.Config) {
*cfg.EmailSettings.PushNotificationServer = pushServer.URL
})
var wg sync.WaitGroup
for i, data := range testData {
wg.Add(1)
// Ranging between 3 types of notifications.
switch i % 3 {
case 0:
go func(user model.User) {
defer wg.Done()
notification := &PostNotification{
Post: th.CreatePost(th.BasicChannel),
Channel: th.BasicChannel,
ProfileMap: map[string]*model.User{
user.Id: &user,
},
Sender: &user,
}
// testing all 3 notification types.
th.App.sendPushNotification(notification, &user, true, false, model.COMMENTS_NOTIFY_ANY)
}(*data.user)
case 1:
go func(id string) {
defer wg.Done()
th.App.UpdateMobileAppBadge(id)
}(data.user.Id)
case 2:
go func(sessID, userID string) {
defer wg.Done()
th.App.clearPushNotification(sessID, userID, th.BasicChannel.Id)
}(data.session.Id, data.user.Id)
}
}
wg.Wait()
// Hack to let the worker goroutines complete.
time.Sleep(1 * time.Second)
// Server side verification.
assert.Equal(t, 17, handler.numReqs())
var numClears, numMessages, numUpdateBadges int
for _, n := range handler.notifications() {
switch n.Type {
case model.PUSH_TYPE_CLEAR:
numClears++
assert.Equal(t, th.BasicChannel.Id, n.ChannelId)
case model.PUSH_TYPE_MESSAGE:
numMessages++
assert.Equal(t, th.BasicChannel.Id, n.ChannelId)
assert.Contains(t, n.Message, "mentioned you")
case model.PUSH_TYPE_UPDATE_BADGE:
numUpdateBadges++
assert.Equal(t, "none", n.Sound)
assert.Equal(t, 1, n.ContentAvailable)
}
}
assert.Equal(t, 8, numMessages)
assert.Equal(t, 3, numClears)
assert.Equal(t, 6, numUpdateBadges)
}
func BenchmarkPushNotification(b *testing.B) {
th := SetupWithStoreMock(b)
defer th.TearDown()
handler := &testPushNotificationHandler{
t: b,
behavior: "simple",
}
pushServer := httptest.NewServer(
http.HandlerFunc(handler.handleReq),
)
defer pushServer.Close()
mockStore := th.App.Srv().Store.(*mocks.Store)
mockUserStore := mocks.UserStore{}
mockUserStore.On("Count", mock.Anything).Return(int64(10), nil)
mockUserStore.On("GetUnreadCount", mock.AnythingOfType("string")).Return(int64(1), nil)
mockPostStore := mocks.PostStore{}
mockPostStore.On("GetMaxPostSize").Return(65535, nil)
mockSystemStore := mocks.SystemStore{}
mockSystemStore.On("GetByName", "InstallationDate").Return(&model.System{Name: "InstallationDate", Value: "10"}, nil)
mockSessionStore := mocks.SessionStore{}
mockPreferenceStore := mocks.PreferenceStore{}
mockPreferenceStore.On("Get", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(&model.Preference{Value: "test"}, nil)
mockStore.On("User").Return(&mockUserStore)
mockStore.On("Post").Return(&mockPostStore)
mockStore.On("System").Return(&mockSystemStore)
mockStore.On("Session").Return(&mockSessionStore)
mockStore.On("Preference").Return(&mockPreferenceStore)
// create 50 users, each having 2 sessions.
type userSession struct {
user *model.User
session *model.Session
}
var testData []userSession
for i := 0; i < 50; i++ {
id := model.NewId()
u := &model.User{
Id: id,
Email: "success+" + id + "@simulator.amazonses.com",
Username: "un_" + id,
Nickname: "nn_" + id,
Password: "Password1",
EmailVerified: true,
}
sess1 := &model.Session{
Id: "id1",
UserId: u.Id,
DeviceId: "deviceID" + u.Id,
ExpiresAt: model.GetMillis() + 100000,
}
sess2 := &model.Session{
Id: "id2",
UserId: u.Id,
DeviceId: "deviceID" + u.Id,
ExpiresAt: model.GetMillis() + 100000,
}
mockSessionStore.On("GetSessionsWithActiveDeviceIds", u.Id).Return([]*model.Session{sess1, sess2}, nil)
mockSessionStore.On("UpdateDeviceId", sess1.Id, "deviceID"+u.Id, mock.AnythingOfType("int64")).Return("deviceID"+u.Id, nil)
testData = append(testData, userSession{
user: u,
session: sess1,
})
}
th.App.UpdateConfig(func(cfg *model.Config) {
*cfg.EmailSettings.PushNotificationServer = pushServer.URL
*cfg.LogSettings.EnableConsole = false
*cfg.NotificationLogSettings.EnableConsole = false
})
ch := &model.Channel{
Id: model.NewId(),
CreateAt: model.GetMillis(),
Type: model.CHANNEL_OPEN,
Name: "testch",
}
b.ResetTimer()
// We have an inner loop which ranges the testdata slice
// and we just repeat that.
// TODO: replace 10 by b.N
then := time.Now()
for i := 0; i < 10; i++ {
var wg sync.WaitGroup
for j, data := range testData {
wg.Add(1)
// Ranging between 3 types of notifications.
switch j % 3 {
case 0:
go func(user model.User) {
defer wg.Done()
post := &model.Post{
UserId: user.Id,
ChannelId: ch.Id,
Message: "test message",
CreateAt: model.GetMillis(),
}
notification := &PostNotification{
Post: post,
Channel: ch,
ProfileMap: map[string]*model.User{
user.Id: &user,
},
Sender: &user,
}
th.App.sendPushNotification(notification, &user, true, false, model.COMMENTS_NOTIFY_ANY)
}(*data.user)
case 1:
go func(id string) {
defer wg.Done()
th.App.UpdateMobileAppBadge(id)
}(data.user.Id)
case 2:
go func(sessID, userID string) {
defer wg.Done()
th.App.clearPushNotification(sessID, userID, ch.Id)
}(data.session.Id, data.user.Id)
}
}
wg.Wait()
}
b.Logf("time taken: %v", time.Since(then))
b.StopTimer()
time.Sleep(2 * time.Second)
}