From 77d42568f94cf2f1c141d3ee4775269259b3d480 Mon Sep 17 00:00:00 2001 From: darkLord19 Date: Fri, 7 May 2021 21:50:55 +0530 Subject: [PATCH] [MM-8497] Ability to set Do Not Disturb for a specified period of time (#16067) * Add support for timed DND status - accept a date time value in api query when dnd mode for user needs to be unset - Create a new function to handle SetDNDStatus calls - Create a scheduled task to unset dnd mode to wahtever mode was before setting it to DND * update schema version * Model changes to make fields more intuitive - move dndendtime to status model - add new field prev status in status to keep track of previous status of user - update db migration function - make use of prevstatus and dndendtime from status model * set prev status and dndendtime appropriately after unsetting dnd mode * add json tag for dndendtime * unset dnd status only if not changed manually by user * update dnd statuses after server restart * make app-layers * fix failing tests * don't create sched task when setting status to DND * get only expired statuses from db - convert end time from any timezone to utc - store dnd end time in unix format for usability reasons * run update dnd status only on leader * make mocks * fix tests * run UpdateDNDStatusOfUsers as recurring task * save all statuses at once in db and update UpdateDNDStatusOfUsers logic * add app method to get timezone of user * store dnd end time in context.Params * set max size of prevstatus * update status model to take endtime input as string and store in db as unix time(int64) * Add tests for SetStatusDoNotDisturbTimed * if dnd_end_time is not passed the call old api to set dnd mode * fix tests * new plugin api to use new timed dnd mode * get and update rows in a single db query * dnd end time will be stored in request body and not route param * exclude statuses which has dndendtimeunix < 0 * update and get the updated dnd statuses in single db query * add updated status to cache * DNDEndTimeUnix and PrevStatus need not to be visible to users * update db schema version for migration * Keep Status and PrevStatus varchar size same * add test to verify status is restored after dnd end time expires * expect endtime in utc from client - remove store method GetTimezone as no longer needed - add documentation for SetStatusDoNotDisturbTimed * reduce sleep time for dnd timed restore test * more appropriate name for new api to update user status * update db migration function * parse and validate time before potentially triggering db query to get status of user * add migration changes in to existing upgrade function * not supporting un-timed dnd status via api * don't call Srv.Store directly, call via app layer * rename dndendtime to statuscleartime to make it suitable for custom status usage as well * Revert "rename dndendtime to statuscleartime to make it suitable for custom status usage as well" This reverts commit fa69152d9a3db18f1c59b34c878fb7ce494440b5. * mysql doesn't support RETURNING clause so add tx to get and update statuses * add UpdateDNDStatusOfUsers mock in tests * update store mock import path * add mock in storelib * Add status mocks to empty store * Close the task during server shutdown * Do not cancel a nil task * update squirrel queries * remove untimed dnd test * start recurring task to unset statuses on leadership change * set dndTask to nil after cancelling it upon server shutdown * new recurring task which starts at nearest rounded time of the interval * mock Get() call for status * return updated statuses in case of mysql * remove unneccessary code * add Get() mock to empty store * fix mocking for once and all * address review comments fix mysql updateStatus fn protect dndTask with mutex minor refactors * move runDNDStatusExpireJob to server.go and pass App as arg instead of method receiver * frontend will send endtime in unix epoch format so get rid of double representation * scan for all fields and not just two * add some tests and fix review comments * remove extra sql query and create needed result in go * add storetest for UpdateExpiredDNDStatuses * add migrations to latest version * update min supported version * add comment to fix a bug in future * update test to expect 1 status in return * rename UpdateUserStatusWithDNDTimeout to SetUserStatusTimedDND * rename DNDEndTimeUnix to DNDEndTime * cast int to int64 for equality * fix tests and error handling * move updating values to retrieved statuses fields outside sql transaction * move migrations to 5.36 Co-authored-by: Agniva De Sarker Co-authored-by: Mattermod --- api4/apitestlib.go | 19 ++++ api4/status.go | 2 +- api4/status_test.go | 39 ++++++- app/app.go | 6 ++ app/app_iface.go | 7 ++ app/helper_test.go | 13 +++ app/opentracing/opentracing_layer.go | 52 +++++++++ app/plugin_api.go | 8 ++ app/server.go | 32 ++++++ app/status.go | 36 +++++++ model/scheduled_task.go | 55 +++++++--- model/scheduled_task_test.go | 33 ++++++ model/status.go | 2 + model/status_test.go | 4 +- plugin/api.go | 6 ++ plugin/api_timer_layer_generated.go | 7 ++ plugin/client_rpc_generated.go | 30 ++++++ plugin/plugintest/api.go | 25 +++++ store/opentracinglayer/opentracinglayer.go | 18 ++++ store/retrylayer/retrylayer.go | 20 ++++ store/sqlstore/status_store.go | 116 +++++++++++++++++++++ store/sqlstore/upgrade.go | 5 + store/store.go | 1 + store/storetest/mocks/StatusStore.go | 23 ++++ store/storetest/status_store.go | 31 ++++++ store/timerlayer/timerlayer.go | 16 +++ 26 files changed, 585 insertions(+), 21 deletions(-) diff --git a/api4/apitestlib.go b/api4/apitestlib.go index b0607f7c79..33817725bd 100644 --- a/api4/apitestlib.go +++ b/api4/apitestlib.go @@ -24,6 +24,7 @@ import ( "github.com/mattermost/mattermost-server/v5/app" "github.com/mattermost/mattermost-server/v5/config" "github.com/mattermost/mattermost-server/v5/model" + "github.com/mattermost/mattermost-server/v5/plugin/plugintest/mock" "github.com/mattermost/mattermost-server/v5/services/searchengine" "github.com/mattermost/mattermost-server/v5/shared/mlog" "github.com/mattermost/mattermost-server/v5/store" @@ -251,24 +252,42 @@ func SetupConfig(tb testing.TB, updateConfig func(cfg *model.Config)) *TestHelpe func SetupConfigWithStoreMock(tb testing.TB, updateConfig func(cfg *model.Config)) *TestHelper { th := setupTestHelper(testlib.GetMockStoreForSetupFunctions(), nil, false, false, updateConfig) + statusMock := mocks.StatusStore{} + statusMock.On("UpdateExpiredDNDStatuses").Return([]*model.Status{}, nil) + statusMock.On("Get", "user1").Return(&model.Status{UserId: "user1", Status: model.STATUS_ONLINE}, nil) + statusMock.On("UpdateLastActivityAt", "user1", mock.Anything).Return(nil) + statusMock.On("SaveOrUpdate", mock.AnythingOfType("*model.Status")).Return(nil) emptyMockStore := mocks.Store{} emptyMockStore.On("Close").Return(nil) + emptyMockStore.On("Status").Return(&statusMock) th.App.Srv().Store = &emptyMockStore return th } func SetupWithStoreMock(tb testing.TB) *TestHelper { th := setupTestHelper(testlib.GetMockStoreForSetupFunctions(), nil, false, false, nil) + statusMock := mocks.StatusStore{} + statusMock.On("UpdateExpiredDNDStatuses").Return([]*model.Status{}, nil) + statusMock.On("Get", "user1").Return(&model.Status{UserId: "user1", Status: model.STATUS_ONLINE}, nil) + statusMock.On("UpdateLastActivityAt", "user1", mock.Anything).Return(nil) + statusMock.On("SaveOrUpdate", mock.AnythingOfType("*model.Status")).Return(nil) emptyMockStore := mocks.Store{} emptyMockStore.On("Close").Return(nil) + emptyMockStore.On("Status").Return(&statusMock) th.App.Srv().Store = &emptyMockStore return th } func SetupEnterpriseWithStoreMock(tb testing.TB) *TestHelper { th := setupTestHelper(testlib.GetMockStoreForSetupFunctions(), nil, true, false, nil) + statusMock := mocks.StatusStore{} + statusMock.On("UpdateExpiredDNDStatuses").Return([]*model.Status{}, nil) + statusMock.On("Get", "user1").Return(&model.Status{UserId: "user1", Status: model.STATUS_ONLINE}, nil) + statusMock.On("UpdateLastActivityAt", "user1", mock.Anything).Return(nil) + statusMock.On("SaveOrUpdate", mock.AnythingOfType("*model.Status")).Return(nil) emptyMockStore := mocks.Store{} emptyMockStore.On("Close").Return(nil) + emptyMockStore.On("Status").Return(&statusMock) th.App.Srv().Store = &emptyMockStore return th } diff --git a/api4/status.go b/api4/status.go index 902c7a5ca3..32925d3eb8 100644 --- a/api4/status.go +++ b/api4/status.go @@ -106,7 +106,7 @@ func updateUserStatus(c *Context, w http.ResponseWriter, r *http.Request) { case "away": c.App.SetStatusAwayIfNeeded(c.Params.UserId, true) case "dnd": - c.App.SetStatusDoNotDisturb(c.Params.UserId) + c.App.SetStatusDoNotDisturbTimed(c.Params.UserId, status.DNDEndTime) default: c.SetInvalidParam("status") return diff --git a/api4/status_test.go b/api4/status_test.go index 10f0b65453..ed645f7e84 100644 --- a/api4/status_test.go +++ b/api4/status_test.go @@ -5,6 +5,7 @@ package api4 import ( "testing" + "time" "github.com/stretchr/testify/assert" @@ -43,6 +44,30 @@ func TestGetUserStatus(t *testing.T) { assert.Equal(t, "dnd", userStatus.Status) }) + t.Run("dnd status timed", func(t *testing.T) { + th.App.SetStatusDoNotDisturbTimed(th.BasicUser.Id, time.Now().Add(10*time.Minute).Unix()) + userStatus, resp := Client.GetUserStatus(th.BasicUser.Id, "") + CheckNoError(t, resp) + assert.Equal(t, "dnd", userStatus.Status) + }) + + t.Run("dnd status timed restore after time interval", func(t *testing.T) { + task := model.CreateRecurringTaskFromNextIntervalTime("Unset DND Statuses From Test", th.App.UpdateDNDStatusOfUsers, 1*time.Second) + defer task.Cancel() + th.App.SetStatusOnline(th.BasicUser.Id, true) + userStatus, resp := Client.GetUserStatus(th.BasicUser.Id, "") + CheckNoError(t, resp) + assert.Equal(t, "online", userStatus.Status) + th.App.SetStatusDoNotDisturbTimed(th.BasicUser.Id, time.Now().Add(2*time.Second).Unix()) + userStatus, resp = Client.GetUserStatus(th.BasicUser.Id, "") + CheckNoError(t, resp) + assert.Equal(t, "dnd", userStatus.Status) + time.Sleep(3 * time.Second) + userStatus, resp = Client.GetUserStatus(th.BasicUser.Id, "") + CheckNoError(t, resp) + assert.Equal(t, "online", userStatus.Status) + }) + t.Run("back to offline status", func(t *testing.T) { th.App.SetStatusOffline(th.BasicUser.Id, true) userStatus, resp := Client.GetUserStatus(th.BasicUser.Id, "") @@ -131,6 +156,16 @@ func TestGetUsersStatusesByIds(t *testing.T) { } }) + t.Run("dnd status", func(t *testing.T) { + th.App.SetStatusDoNotDisturbTimed(th.BasicUser.Id, time.Now().Add(10*time.Minute).Unix()) + th.App.SetStatusDoNotDisturbTimed(th.BasicUser2.Id, time.Now().Add(15*time.Minute).Unix()) + usersStatuses, resp := Client.GetUsersStatusesByIds(usersIds) + CheckNoError(t, resp) + for _, userStatus := range usersStatuses { + assert.Equal(t, "dnd", userStatus.Status) + } + }) + t.Run("get statuses from logged out user", func(t *testing.T) { Client.Logout() @@ -158,8 +193,8 @@ func TestUpdateUserStatus(t *testing.T) { assert.Equal(t, "away", updateUserStatus.Status) }) - t.Run("set dnd status", func(t *testing.T) { - toUpdateUserStatus := &model.Status{Status: "dnd", UserId: th.BasicUser.Id} + t.Run("set dnd status timed", func(t *testing.T) { + toUpdateUserStatus := &model.Status{Status: "dnd", UserId: th.BasicUser.Id, DNDEndTime: time.Now().Add(10 * time.Minute).Unix()} updateUserStatus, resp := Client.UpdateUserStatus(th.BasicUser.Id, toUpdateUserStatus) CheckNoError(t, resp) assert.Equal(t, "dnd", updateUserStatus.Status) diff --git a/app/app.go b/app/app.go index 40ba029672..31431b18d6 100644 --- a/app/app.go +++ b/app/app.go @@ -94,6 +94,8 @@ func (a *App) InitServer() { if a.Srv().runEssentialJobs { a.Srv().Go(func() { runLicenseExpirationCheckJob(a) + runCheckWarnMetricStatusJob(a) + runDNDStatusExpireJob(a) runCheckAdminSupportStatusJob(a) }) a.srv.runJobs() @@ -734,3 +736,7 @@ func (a *App) DBHealthCheckDelete() error { func (a *App) dbHealthCheckKey() string { return fmt.Sprintf("health_check_%s", a.GetClusterId()) } + +func (a *App) UpdateExpiredDNDStatuses() ([]*model.Status, error) { + return a.Srv().Store.Status().UpdateExpiredDNDStatuses() +} diff --git a/app/app_iface.go b/app/app_iface.go index b7b82616b3..b559374e14 100644 --- a/app/app_iface.go +++ b/app/app_iface.go @@ -310,6 +310,9 @@ type AppIface interface { // relative to either the session creation date or the current time, depending // on the `ExtendSessionOnActivity` config setting. SetSessionExpireInDays(session *model.Session, days int) + // SetStatusDoNotDisturbTimed takes endtime in unix epoch format in UTC + // and sets status of given userId to dnd which will be restored back after endtime + SetStatusDoNotDisturbTimed(userId string, endtime int64) // SetStatusLastActivityAt sets the last activity at for a user on the local app server and updates // status to away if needed. Used by the WS to set status to away if an 'online' device disconnects // while an 'away' device is still connected @@ -356,6 +359,9 @@ type AppIface interface { UpdateChannel(channel *model.Channel) (*model.Channel, *model.AppError) // UpdateChannelScheme saves the new SchemeId of the channel passed. UpdateChannelScheme(channel *model.Channel) (*model.Channel, *model.AppError) + // UpdateDNDStatusOfUsers is a recurring task which is started when server starts + // which unsets dnd status of users if needed and saves and broadcasts it + UpdateDNDStatusOfUsers() // UpdateProductNotices is called periodically from a scheduled worker to fetch new notices and update the cache UpdateProductNotices() *model.AppError // UpdateViewedProductNotices is called from the frontend to mark a set of notices as 'viewed' by user @@ -1058,6 +1064,7 @@ type AppIface interface { UpdateCommand(oldCmd, updatedCmd *model.Command) (*model.Command, *model.AppError) UpdateConfig(f func(*model.Config)) UpdateEphemeralPost(userID string, post *model.Post) *model.Post + UpdateExpiredDNDStatuses() ([]*model.Status, error) UpdateGroup(group *model.Group) (*model.Group, *model.AppError) UpdateGroupSyncable(groupSyncable *model.GroupSyncable) (*model.GroupSyncable, *model.AppError) UpdateHashedPassword(user *model.User, newHashedPassword string) *model.AppError diff --git a/app/helper_test.go b/app/helper_test.go index 78bfb0bfb5..515fbb2370 100644 --- a/app/helper_test.go +++ b/app/helper_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/mattermost/mattermost-server/v5/config" @@ -157,8 +158,14 @@ func SetupWithoutPreloadMigrations(tb testing.TB) *TestHelper { func SetupWithStoreMock(tb testing.TB) *TestHelper { mockStore := testlib.GetMockStoreForSetupFunctions() th := setupTestHelper(mockStore, false, false, tb) + statusMock := mocks.StatusStore{} + statusMock.On("UpdateExpiredDNDStatuses").Return([]*model.Status{}, nil) + statusMock.On("Get", "user1").Return(&model.Status{UserId: "user1", Status: model.STATUS_ONLINE}, nil) + statusMock.On("UpdateLastActivityAt", "user1", mock.Anything).Return(nil) + statusMock.On("SaveOrUpdate", mock.AnythingOfType("*model.Status")).Return(nil) emptyMockStore := mocks.Store{} emptyMockStore.On("Close").Return(nil) + emptyMockStore.On("Status").Return(&statusMock) th.App.Srv().Store = &emptyMockStore return th } @@ -166,8 +173,14 @@ func SetupWithStoreMock(tb testing.TB) *TestHelper { func SetupEnterpriseWithStoreMock(tb testing.TB) *TestHelper { mockStore := testlib.GetMockStoreForSetupFunctions() th := setupTestHelper(mockStore, true, false, tb) + statusMock := mocks.StatusStore{} + statusMock.On("UpdateExpiredDNDStatuses").Return([]*model.Status{}, nil) + statusMock.On("Get", "user1").Return(&model.Status{UserId: "user1", Status: model.STATUS_ONLINE}, nil) + statusMock.On("UpdateLastActivityAt", "user1", mock.Anything).Return(nil) + statusMock.On("SaveOrUpdate", mock.AnythingOfType("*model.Status")).Return(nil) emptyMockStore := mocks.Store{} emptyMockStore.On("Close").Return(nil) + emptyMockStore.On("Status").Return(&statusMock) th.App.Srv().Store = &emptyMockStore return th } diff --git a/app/opentracing/opentracing_layer.go b/app/opentracing/opentracing_layer.go index bb753821d3..4ed38fc7eb 100644 --- a/app/opentracing/opentracing_layer.go +++ b/app/opentracing/opentracing_layer.go @@ -14959,6 +14959,21 @@ func (a *OpenTracingAppLayer) SetStatusDoNotDisturb(userID string) { a.app.SetStatusDoNotDisturb(userID) } +func (a *OpenTracingAppLayer) SetStatusDoNotDisturbTimed(userId string, endtime int64) { + origCtx := a.ctx + span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.SetStatusDoNotDisturbTimed") + + a.ctx = newCtx + a.app.Srv().Store.SetContext(newCtx) + defer func() { + a.app.Srv().Store.SetContext(origCtx) + a.ctx = origCtx + }() + + defer span.Finish() + a.app.SetStatusDoNotDisturbTimed(userId, endtime) +} + func (a *OpenTracingAppLayer) SetStatusLastActivityAt(userID string, activityAt int64) { origCtx := a.ctx span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.SetStatusLastActivityAt") @@ -15884,6 +15899,21 @@ func (a *OpenTracingAppLayer) UpdateConfig(f func(*model.Config)) { a.app.UpdateConfig(f) } +func (a *OpenTracingAppLayer) UpdateDNDStatusOfUsers() { + origCtx := a.ctx + span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.UpdateDNDStatusOfUsers") + + a.ctx = newCtx + a.app.Srv().Store.SetContext(newCtx) + defer func() { + a.app.Srv().Store.SetContext(origCtx) + a.ctx = origCtx + }() + + defer span.Finish() + a.app.UpdateDNDStatusOfUsers() +} + func (a *OpenTracingAppLayer) UpdateEphemeralPost(userID string, post *model.Post) *model.Post { origCtx := a.ctx span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.UpdateEphemeralPost") @@ -15901,6 +15931,28 @@ func (a *OpenTracingAppLayer) UpdateEphemeralPost(userID string, post *model.Pos return resultVar0 } +func (a *OpenTracingAppLayer) UpdateExpiredDNDStatuses() ([]*model.Status, error) { + origCtx := a.ctx + span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.UpdateExpiredDNDStatuses") + + a.ctx = newCtx + a.app.Srv().Store.SetContext(newCtx) + defer func() { + a.app.Srv().Store.SetContext(origCtx) + a.ctx = origCtx + }() + + defer span.Finish() + resultVar0, resultVar1 := a.app.UpdateExpiredDNDStatuses() + + if resultVar1 != nil { + span.LogFields(spanlog.Error(resultVar1)) + ext.Error.Set(span, true) + } + + return resultVar0, resultVar1 +} + func (a *OpenTracingAppLayer) UpdateGroup(group *model.Group) (*model.Group, *model.AppError) { origCtx := a.ctx span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.UpdateGroup") diff --git a/app/plugin_api.go b/app/plugin_api.go index 3fed6d442f..1dd978bdae 100644 --- a/app/plugin_api.go +++ b/app/plugin_api.go @@ -312,6 +312,14 @@ func (api *PluginAPI) UpdateUserStatus(userID, status string) (*model.Status, *m return api.app.GetStatus(userID) } +func (api *PluginAPI) SetUserStatusTimedDND(userID string, endTime int64) (*model.Status, *model.AppError) { + // read-after-write bug which will fail if there are replicas. + // it works for now because we have a cache in between. + // FIXME: make SetStatusDoNotDisturbTimed return updated status + api.app.SetStatusDoNotDisturbTimed(userID, endTime) + return api.app.GetStatus(userID) +} + func (api *PluginAPI) GetUsersInChannel(channelID, sortBy string, page, perPage int) ([]*model.User, *model.AppError) { switch sortBy { case model.CHANNEL_SORT_BY_USERNAME: diff --git a/app/server.go b/app/server.go index c8dc9adb84..829cbd5153 100644 --- a/app/server.go +++ b/app/server.go @@ -202,6 +202,9 @@ type Server struct { featureFlagStop chan struct{} featureFlagStopped chan struct{} featureFlagSynchronizerMutex sync.Mutex + + dndnTaskMut sync.Mutex + dndTask *model.ScheduledTask } func NewServer(options ...Option) (*Server, error) { @@ -993,6 +996,12 @@ func (s *Server) Shutdown() { mlog.Warn("Error flushing logs", mlog.Err(err)) } + s.dndnTaskMut.Lock() + if s.dndTask != nil { + s.dndTask.Cancel() + } + s.dndnTaskMut.Unlock() + mlog.Info("Server stopped") // this should just write the "server stopped" record, the rest are already flushed. @@ -2104,3 +2113,26 @@ func (a *App) generateSupportPacketYaml() (*model.FileData, string) { warning := fmt.Sprintf("yaml.Marshal(&supportPacket) Error: %s", err.Error()) return nil, warning } + +func runDNDStatusExpireJob(a *App) { + if a.IsLeader() { + a.srv.dndnTaskMut.Lock() + a.srv.dndTask = model.CreateRecurringTaskFromNextIntervalTime("Unset DND Statuses", a.UpdateDNDStatusOfUsers, 5*time.Minute) + a.srv.dndnTaskMut.Unlock() + } + a.srv.AddClusterLeaderChangedListener(func() { + mlog.Info("Cluster leader changed. Determining if unset DNS status task should be running", mlog.Bool("isLeader", a.IsLeader())) + if a.IsLeader() { + a.srv.dndnTaskMut.Lock() + a.srv.dndTask = model.CreateRecurringTaskFromNextIntervalTime("Unset DND Statuses", a.UpdateDNDStatusOfUsers, 5*time.Minute) + a.srv.dndnTaskMut.Unlock() + } else { + a.srv.dndnTaskMut.Lock() + if a.srv.dndTask != nil { + a.srv.dndTask.Cancel() + a.srv.dndTask = nil + } + a.srv.dndnTaskMut.Unlock() + } + }) +} diff --git a/app/status.go b/app/status.go index cfe0b8ac12..c573c1789f 100644 --- a/app/status.go +++ b/app/status.go @@ -282,6 +282,28 @@ func (a *App) SetStatusAwayIfNeeded(userID string, manual bool) { a.SaveAndBroadcastStatus(status) } +// SetStatusDoNotDisturbTimed takes endtime in unix epoch format in UTC +// and sets status of given userId to dnd which will be restored back after endtime +func (a *App) SetStatusDoNotDisturbTimed(userId string, endtime int64) { + if !*a.Config().ServiceSettings.EnableUserStatuses { + return + } + + status, err := a.GetStatus(userId) + + if err != nil { + status = &model.Status{UserId: userId, Status: model.STATUS_OFFLINE, Manual: false, LastActivityAt: 0, ActiveChannel: ""} + } + + status.PrevStatus = status.Status + status.Status = model.STATUS_DND + status.Manual = true + + status.DNDEndTime = endtime + + a.SaveAndBroadcastStatus(status) +} + func (a *App) SetStatusDoNotDisturb(userID string) { if !*a.Config().ServiceSettings.EnableUserStatuses { return @@ -365,6 +387,20 @@ func (a *App) IsUserAway(lastActivityAt int64) bool { return model.GetMillis()-lastActivityAt >= *a.Config().TeamSettings.UserStatusAwayTimeout*1000 } +// UpdateDNDStatusOfUsers is a recurring task which is started when server starts +// which unsets dnd status of users if needed and saves and broadcasts it +func (a *App) UpdateDNDStatusOfUsers() { + statuses, err := a.UpdateExpiredDNDStatuses() + if err != nil { + mlog.Warn("Failed to fetch dnd statues from store", mlog.String("err", err.Error())) + return + } + for i := range statuses { + a.AddStatusCache(statuses[i]) + a.BroadcastStatus(statuses[i]) + } +} + func (a *App) SetCustomStatus(userID string, cs *model.CustomStatus) *model.AppError { user, err := a.GetUser(userID) if err != nil { diff --git a/model/scheduled_task.go b/model/scheduled_task.go index 657cc7493c..cf20db6396 100644 --- a/model/scheduled_task.go +++ b/model/scheduled_task.go @@ -11,42 +11,65 @@ import ( type TaskFunc func() type ScheduledTask struct { - Name string `json:"name"` - Interval time.Duration `json:"interval"` - Recurring bool `json:"recurring"` - function func() - cancel chan struct{} - cancelled chan struct{} + Name string `json:"name"` + Interval time.Duration `json:"interval"` + Recurring bool `json:"recurring"` + function func() + cancel chan struct{} + cancelled chan struct{} + fromNextIntervalTime bool } func CreateTask(name string, function TaskFunc, timeToExecution time.Duration) *ScheduledTask { - return createTask(name, function, timeToExecution, false) + return createTask(name, function, timeToExecution, false, false) } func CreateRecurringTask(name string, function TaskFunc, interval time.Duration) *ScheduledTask { - return createTask(name, function, interval, true) + return createTask(name, function, interval, true, false) } -func createTask(name string, function TaskFunc, interval time.Duration, recurring bool) *ScheduledTask { +func CreateRecurringTaskFromNextIntervalTime(name string, function TaskFunc, interval time.Duration) *ScheduledTask { + return createTask(name, function, interval, true, true) +} + +func createTask(name string, function TaskFunc, interval time.Duration, recurring bool, fromNextIntervalTime bool) *ScheduledTask { task := &ScheduledTask{ - Name: name, - Interval: interval, - Recurring: recurring, - function: function, - cancel: make(chan struct{}), - cancelled: make(chan struct{}), + Name: name, + Interval: interval, + Recurring: recurring, + function: function, + cancel: make(chan struct{}), + cancelled: make(chan struct{}), + fromNextIntervalTime: fromNextIntervalTime, } go func() { defer close(task.cancelled) - ticker := time.NewTicker(interval) + var firstTick <-chan time.Time + var ticker *time.Ticker + + if task.fromNextIntervalTime { + currTime := time.Now() + first := currTime.Truncate(interval) + if first.Before(currTime) { + first = first.Add(interval) + } + firstTick = time.After(time.Until(first)) + ticker = &time.Ticker{C: nil} + } else { + firstTick = nil + ticker = time.NewTicker(interval) + } defer func() { ticker.Stop() }() for { select { + case <-firstTick: + ticker = time.NewTicker(interval) + function() case <-ticker.C: function() case <-task.cancel: diff --git a/model/scheduled_task_test.go b/model/scheduled_task_test.go index 5a4106aff6..90e638ff2e 100644 --- a/model/scheduled_task_test.go +++ b/model/scheduled_task_test.go @@ -4,6 +4,7 @@ package model import ( + "sync" "sync/atomic" "testing" "time" @@ -73,3 +74,35 @@ func TestCancelTask(t *testing.T) { time.Sleep(TASK_TIME + time.Second) assert.EqualValues(t, 0, atomic.LoadInt32(executionCount)) } + +func TestCreateRecurringTaskFromNextIntervalTime(t *testing.T) { + TASK_NAME := "Test Recurring Task starting from next interval time" + TASK_TIME := time.Second * 2 + + var executionTime time.Time + var mu sync.Mutex + testFunc := func() { + mu.Lock() + executionTime = time.Now() + mu.Unlock() + } + + task := CreateRecurringTaskFromNextIntervalTime(TASK_NAME, testFunc, TASK_TIME) + defer task.Cancel() + + time.Sleep(TASK_TIME) + mu.Lock() + expectedSeconds := executionTime.Second() + mu.Unlock() + assert.EqualValues(t, 0, expectedSeconds%2) + + time.Sleep(TASK_TIME) + mu.Lock() + expectedSeconds = executionTime.Second() + mu.Unlock() + assert.EqualValues(t, 0, expectedSeconds%2) + + assert.Equal(t, TASK_NAME, task.Name) + assert.Equal(t, TASK_TIME, task.Interval) + assert.True(t, task.Recurring) +} diff --git a/model/status.go b/model/status.go index 1f32422a62..f6f3a67ae4 100644 --- a/model/status.go +++ b/model/status.go @@ -25,6 +25,8 @@ type Status struct { Manual bool `json:"manual"` LastActivityAt int64 `json:"last_activity_at"` ActiveChannel string `json:"active_channel,omitempty" db:"-"` + DNDEndTime int64 `json:"dnd_end_time"` + PrevStatus string `json:"-"` } func (o *Status) ToJson() string { diff --git a/model/status_test.go b/model/status_test.go index 79dae409de..6825c9e139 100644 --- a/model/status_test.go +++ b/model/status_test.go @@ -12,7 +12,7 @@ import ( ) func TestStatus(t *testing.T) { - status := Status{NewId(), STATUS_ONLINE, true, 0, "123"} + status := Status{NewId(), STATUS_ONLINE, true, 0, "123", 0, ""} json := status.ToJson() status2 := StatusFromJson(strings.NewReader(json)) @@ -29,7 +29,7 @@ func TestStatus(t *testing.T) { } func TestStatusListToJson(t *testing.T) { - statuses := []*Status{{NewId(), STATUS_ONLINE, true, 0, "123"}, {NewId(), STATUS_OFFLINE, true, 0, ""}} + statuses := []*Status{{NewId(), STATUS_ONLINE, true, 0, "123", 0, ""}, {NewId(), STATUS_OFFLINE, true, 0, "", 0, ""}} jsonStatuses := StatusListToJson(statuses) var dat []map[string]interface{} diff --git a/plugin/api.go b/plugin/api.go index f5210f7df4..06218cf428 100644 --- a/plugin/api.go +++ b/plugin/api.go @@ -229,6 +229,12 @@ type API interface { // Minimum server version: 5.2 UpdateUserStatus(userID, status string) (*model.Status, *model.AppError) + // SetUserStatusTimedDND will set a user's status to dnd for given time until the user, + // or another integration/plugin, sets it back to online. + // @tag User + // Minimum server version: 5.35 + SetUserStatusTimedDND(userId string, endtime int64) (*model.Status, *model.AppError) + // UpdateUserActive deactivates or reactivates an user. // // @tag User diff --git a/plugin/api_timer_layer_generated.go b/plugin/api_timer_layer_generated.go index cd14e1b7e0..fcf191c6c7 100644 --- a/plugin/api_timer_layer_generated.go +++ b/plugin/api_timer_layer_generated.go @@ -266,6 +266,13 @@ func (api *apiTimerLayer) UpdateUserStatus(userID, status string) (*model.Status return _returnsA, _returnsB } +func (api *apiTimerLayer) SetUserStatusTimedDND(userId string, endtime int64) (*model.Status, *model.AppError) { + startTime := timePkg.Now() + _returnsA, _returnsB := api.apiImpl.SetUserStatusTimedDND(userId, endtime) + api.recordTime(startTime, "SetUserStatusTimedDND", _returnsB == nil) + return _returnsA, _returnsB +} + func (api *apiTimerLayer) UpdateUserActive(userID string, active bool) *model.AppError { startTime := timePkg.Now() _returnsA := api.apiImpl.UpdateUserActive(userID, active) diff --git a/plugin/client_rpc_generated.go b/plugin/client_rpc_generated.go index e7c744f7b6..af3c42c757 100644 --- a/plugin/client_rpc_generated.go +++ b/plugin/client_rpc_generated.go @@ -1509,6 +1509,36 @@ func (s *apiRPCServer) UpdateUserStatus(args *Z_UpdateUserStatusArgs, returns *Z return nil } +type Z_SetUserStatusTimedDNDArgs struct { + A string + B int64 +} + +type Z_SetUserStatusTimedDNDReturns struct { + A *model.Status + B *model.AppError +} + +func (g *apiRPCClient) SetUserStatusTimedDND(userId string, endtime int64) (*model.Status, *model.AppError) { + _args := &Z_SetUserStatusTimedDNDArgs{userId, endtime} + _returns := &Z_SetUserStatusTimedDNDReturns{} + if err := g.client.Call("Plugin.SetUserStatusTimedDND", _args, _returns); err != nil { + log.Printf("RPC call to SetUserStatusTimedDND API failed: %s", err.Error()) + } + return _returns.A, _returns.B +} + +func (s *apiRPCServer) SetUserStatusTimedDND(args *Z_SetUserStatusTimedDNDArgs, returns *Z_SetUserStatusTimedDNDReturns) error { + if hook, ok := s.impl.(interface { + SetUserStatusTimedDND(userId string, endtime int64) (*model.Status, *model.AppError) + }); ok { + returns.A, returns.B = hook.SetUserStatusTimedDND(args.A, args.B) + } else { + return encodableError(fmt.Errorf("API SetUserStatusTimedDND called but not implemented.")) + } + return nil +} + type Z_UpdateUserActiveArgs struct { A string B bool diff --git a/plugin/plugintest/api.go b/plugin/plugintest/api.go index 32f1a1d9bb..5066cc794d 100644 --- a/plugin/plugintest/api.go +++ b/plugin/plugintest/api.go @@ -3041,6 +3041,31 @@ func (_m *API) SetTeamIcon(teamID string, data []byte) *model.AppError { return r0 } +// SetUserStatusTimedDND provides a mock function with given fields: userId, endtime +func (_m *API) SetUserStatusTimedDND(userId string, endtime int64) (*model.Status, *model.AppError) { + ret := _m.Called(userId, endtime) + + var r0 *model.Status + if rf, ok := ret.Get(0).(func(string, int64) *model.Status); ok { + r0 = rf(userId, endtime) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.Status) + } + } + + var r1 *model.AppError + if rf, ok := ret.Get(1).(func(string, int64) *model.AppError); ok { + r1 = rf(userId, endtime) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*model.AppError) + } + } + + return r0, r1 +} + // UnregisterCommand provides a mock function with given fields: teamID, trigger func (_m *API) UnregisterCommand(teamID string, trigger string) error { ret := _m.Called(teamID, trigger) diff --git a/store/opentracinglayer/opentracinglayer.go b/store/opentracinglayer/opentracinglayer.go index 59e6ed5a45..7359f4c6ee 100644 --- a/store/opentracinglayer/opentracinglayer.go +++ b/store/opentracinglayer/opentracinglayer.go @@ -7598,6 +7598,24 @@ func (s *OpenTracingLayerStatusStore) SaveOrUpdate(status *model.Status) error { return err } +func (s *OpenTracingLayerStatusStore) UpdateExpiredDNDStatuses() ([]*model.Status, error) { + origCtx := s.Root.Store.Context() + span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "StatusStore.UpdateExpiredDNDStatuses") + s.Root.Store.SetContext(newCtx) + defer func() { + s.Root.Store.SetContext(origCtx) + }() + + defer span.Finish() + result, err := s.StatusStore.UpdateExpiredDNDStatuses() + if err != nil { + span.LogFields(spanlog.Error(err)) + ext.Error.Set(span, true) + } + + return result, err +} + func (s *OpenTracingLayerStatusStore) UpdateLastActivityAt(userID string, lastActivityAt int64) error { origCtx := s.Root.Store.Context() span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "StatusStore.UpdateLastActivityAt") diff --git a/store/retrylayer/retrylayer.go b/store/retrylayer/retrylayer.go index c50709d187..9776d28b40 100644 --- a/store/retrylayer/retrylayer.go +++ b/store/retrylayer/retrylayer.go @@ -8256,6 +8256,26 @@ func (s *RetryLayerStatusStore) SaveOrUpdate(status *model.Status) error { } +func (s *RetryLayerStatusStore) UpdateExpiredDNDStatuses() ([]*model.Status, error) { + + tries := 0 + for { + result, err := s.StatusStore.UpdateExpiredDNDStatuses() + if err == nil { + return result, nil + } + if !isRepeatableError(err) { + return result, err + } + tries++ + if tries >= 3 { + err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") + return result, err + } + } + +} + func (s *RetryLayerStatusStore) UpdateLastActivityAt(userID string, lastActivityAt int64) error { tries := 0 diff --git a/store/sqlstore/status_store.go b/store/sqlstore/status_store.go index 162acbf956..970379aaf1 100644 --- a/store/sqlstore/status_store.go +++ b/store/sqlstore/status_store.go @@ -7,10 +7,12 @@ import ( "database/sql" "fmt" "strings" + "time" sq "github.com/Masterminds/squirrel" "github.com/pkg/errors" + "github.com/mattermost/gorp" "github.com/mattermost/mattermost-server/v5/model" "github.com/mattermost/mattermost-server/v5/store" ) @@ -27,6 +29,7 @@ func newSqlStatusStore(sqlStore *SqlStore) store.StatusStore { table.ColMap("UserId").SetMaxSize(26) table.ColMap("Status").SetMaxSize(32) table.ColMap("ActiveChannel").SetMaxSize(26) + table.ColMap("PrevStatus").SetMaxSize(32) } return s @@ -99,6 +102,119 @@ func (s SqlStatusStore) GetByIds(userIds []string) ([]*model.Status, error) { return statuses, nil } +// MySQL doesn't have support for RETURNING clause, so we use a transaction to get the updated rows. +func (s SqlStatusStore) updateExpiredStatuses(t *gorp.Transaction) ([]*model.Status, error) { + var statuses []*model.Status + currUnixTime := time.Now().UTC().Unix() + selectQuery, selectParams, err := s.getQueryBuilder(). + Select("*"). + From("Status"). + Where( + sq.And{ + sq.Eq{"Status": model.STATUS_DND}, + sq.Gt{"DNDEndTime": 0}, + sq.LtOrEq{"DNDEndTime": currUnixTime}, + }, + ).ToSql() + if err != nil { + return nil, errors.Wrap(err, "status_tosql") + } + _, err = t.Select(&statuses, selectQuery, selectParams...) + if err != nil { + return nil, errors.Wrap(err, "updateExpiredStatusesT: failed to get expired dnd statuses") + } + updateQuery, args, err := s.getQueryBuilder(). + Update("Status"). + Where( + sq.And{ + sq.Eq{"Status": model.STATUS_DND}, + sq.Gt{"DNDEndTime": 0}, + sq.LtOrEq{"DNDEndTime": currUnixTime}, + }, + ). + Set("Status", sq.Expr("PrevStatus")). + Set("PrevStatus", model.STATUS_DND). + Set("DNDEndTime", 0). + Set("Manual", false). + ToSql() + + if err != nil { + return nil, errors.Wrap(err, "status_tosql") + } + + if _, err := t.Exec(updateQuery, args...); err != nil { + return nil, errors.Wrapf(err, "updateExpiredStatusesT: failed to update statuses") + } + + return statuses, nil +} + +func (s SqlStatusStore) UpdateExpiredDNDStatuses() ([]*model.Status, error) { + if s.DriverName() == model.DATABASE_DRIVER_MYSQL { + transaction, err := s.GetMaster().Begin() + if err != nil { + return nil, errors.Wrap(err, "UpdateExpiredDNDStatuses: begin_transaction") + } + defer finalizeTransaction(transaction) + statuses, err := s.updateExpiredStatuses(transaction) + if err != nil { + return nil, errors.Wrap(err, "UpdateExpiredDNDStatuses: updateExpiredDNDStatusesT") + } + if err := transaction.Commit(); err != nil { + return nil, errors.Wrap(err, "UpdateExpiredDNDStatuses: commit_transaction") + } + + for _, status := range statuses { + status.Status = status.PrevStatus + status.PrevStatus = model.STATUS_DND + status.DNDEndTime = 0 + status.Manual = false + } + + return statuses, nil + } + + queryString, args, err := s.getQueryBuilder(). + Update("Status"). + Where( + sq.And{ + sq.Eq{"Status": model.STATUS_DND}, + sq.Gt{"DNDEndTime": 0}, + sq.LtOrEq{"DNDEndTime": time.Now().UTC().Unix()}, + }, + ). + Set("Status", sq.Expr("PrevStatus")). + Set("PrevStatus", model.STATUS_DND). + Set("DNDEndTime", 0). + Set("Manual", false). + Suffix("RETURNING *"). + ToSql() + + if err != nil { + return nil, errors.Wrap(err, "status_tosql") + } + + rows, err := s.GetMaster().Query(queryString, args...) + if err != nil { + return nil, errors.Wrap(err, "failed to find Statuses") + } + defer rows.Close() + var statuses []*model.Status + for rows.Next() { + var status model.Status + if err = rows.Scan(&status.UserId, &status.Status, &status.Manual, &status.LastActivityAt, + &status.DNDEndTime, &status.PrevStatus); err != nil { + return nil, errors.Wrap(err, "unable to scan from rows") + } + statuses = append(statuses, &status) + } + if err = rows.Err(); err != nil { + return nil, errors.Wrap(err, "failed while iterating over rows") + } + + return statuses, nil +} + func (s SqlStatusStore) ResetAll() error { if _, err := s.GetMaster().Exec("UPDATE Status SET Status = :Status WHERE Manual = false", map[string]interface{}{"Status": model.STATUS_OFFLINE}); err != nil { return errors.Wrap(err, "failed to update Statuses") diff --git a/store/sqlstore/upgrade.go b/store/sqlstore/upgrade.go index b39b4053ba..0c0eebc774 100644 --- a/store/sqlstore/upgrade.go +++ b/store/sqlstore/upgrade.go @@ -953,6 +953,7 @@ func upgradeDatabaseToVersion530(sqlStore *SqlStore) { sqlStore.CreateColumnIfNotExistsNoDefault("FileInfo", "Content", "longtext", "text") sqlStore.CreateColumnIfNotExists("SidebarCategories", "Muted", "tinyint(1)", "boolean", "0") + saveSchemaVersion(sqlStore, Version5300) } } @@ -1067,6 +1068,10 @@ func upgradeDatabaseToVersion536(sqlStore *SqlStore) { sqlStore.CreateColumnIfNotExists("SharedChannelUsers", "ChannelId", "VARCHAR(26)", "VARCHAR(26)", "") + // timed dnd status support + sqlStore.CreateColumnIfNotExistsNoDefault("Status", "DNDEndTime", "BIGINT", "BIGINT") + sqlStore.CreateColumnIfNotExistsNoDefault("Status", "PrevStatus", "VARCHAR(32)", "VARCHAR(32)") + //saveSchemaVersion(sqlStore, Version5360) //} } diff --git a/store/store.go b/store/store.go index eabbf4679e..d92a1c2d0f 100644 --- a/store/store.go +++ b/store/store.go @@ -600,6 +600,7 @@ type StatusStore interface { ResetAll() error GetTotalActiveUsersCount() (int64, error) UpdateLastActivityAt(userID string, lastActivityAt int64) error + UpdateExpiredDNDStatuses() ([]*model.Status, error) } type FileInfoStore interface { diff --git a/store/storetest/mocks/StatusStore.go b/store/storetest/mocks/StatusStore.go index f80acd4b28..483152ddc1 100644 --- a/store/storetest/mocks/StatusStore.go +++ b/store/storetest/mocks/StatusStore.go @@ -109,6 +109,29 @@ func (_m *StatusStore) SaveOrUpdate(status *model.Status) error { return r0 } +// UpdateExpiredDNDStatuses provides a mock function with given fields: +func (_m *StatusStore) UpdateExpiredDNDStatuses() ([]*model.Status, error) { + ret := _m.Called() + + var r0 []*model.Status + if rf, ok := ret.Get(0).(func() []*model.Status); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*model.Status) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // UpdateLastActivityAt provides a mock function with given fields: userID, lastActivityAt func (_m *StatusStore) UpdateLastActivityAt(userID string, lastActivityAt int64) error { ret := _m.Called(userID, lastActivityAt) diff --git a/store/storetest/status_store.go b/store/storetest/status_store.go index 1c3d13a803..d537932f3f 100644 --- a/store/storetest/status_store.go +++ b/store/storetest/status_store.go @@ -5,6 +5,7 @@ package storetest import ( "testing" + "time" "github.com/stretchr/testify/require" @@ -15,6 +16,7 @@ import ( func TestStatusStore(t *testing.T, ss store.Store) { t.Run("", func(t *testing.T) { testStatusStore(t, ss) }) t.Run("ActiveUserCount", func(t *testing.T) { testActiveUserCount(t, ss) }) + t.Run("UpdateExpiredDNDStatuses", func(t *testing.T) { testUpdateExpiredDNDStatuses(t, ss) }) } func testStatusStore(t *testing.T, ss store.Store) { @@ -61,3 +63,32 @@ type ByUserId []*model.Status func (s ByUserId) Len() int { return len(s) } func (s ByUserId) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s ByUserId) Less(i, j int) bool { return s[i].UserId < s[j].UserId } + +func testUpdateExpiredDNDStatuses(t *testing.T, ss store.Store) { + userID := NewTestId() + + status := &model.Status{UserId: userID, Status: model.STATUS_DND, Manual: true, + DNDEndTime: time.Now().Add(5 * time.Second).Unix(), PrevStatus: model.STATUS_ONLINE} + require.NoError(t, ss.Status().SaveOrUpdate(status)) + + time.Sleep(2 * time.Second) + + // after 2 seconds no statuses should be expired + statuses, err := ss.Status().UpdateExpiredDNDStatuses() + require.NoError(t, err) + require.Len(t, statuses, 0) + + time.Sleep(3 * time.Second) + + // after 3 more seconds test status should be updated + statuses, err = ss.Status().UpdateExpiredDNDStatuses() + require.NoError(t, err) + require.Len(t, statuses, 1) + + updatedStatus := *statuses[0] + require.Equal(t, updatedStatus.UserId, userID) + require.Equal(t, updatedStatus.Status, model.STATUS_ONLINE) + require.Equal(t, updatedStatus.DNDEndTime, int64(0)) + require.Equal(t, updatedStatus.PrevStatus, model.STATUS_DND) + require.Equal(t, updatedStatus.Manual, false) +} diff --git a/store/timerlayer/timerlayer.go b/store/timerlayer/timerlayer.go index f186655f49..42f16ea06b 100644 --- a/store/timerlayer/timerlayer.go +++ b/store/timerlayer/timerlayer.go @@ -6856,6 +6856,22 @@ func (s *TimerLayerStatusStore) SaveOrUpdate(status *model.Status) error { return err } +func (s *TimerLayerStatusStore) UpdateExpiredDNDStatuses() ([]*model.Status, error) { + start := timemodule.Now() + + result, err := s.StatusStore.UpdateExpiredDNDStatuses() + + elapsed := float64(timemodule.Since(start)) / float64(timemodule.Second) + if s.Root.Metrics != nil { + success := "false" + if err == nil { + success = "true" + } + s.Root.Metrics.ObserveStoreMethodDuration("StatusStore.UpdateExpiredDNDStatuses", success, elapsed) + } + return result, err +} + func (s *TimerLayerStatusStore) UpdateLastActivityAt(userID string, lastActivityAt int64) error { start := timemodule.Now()