[MM-8497] Ability to set Do Not Disturb for a specified period of time (#17680)

* Revert "Revert "[MM-8497] Ability to set Do Not Disturb for a specified period of time (#16067)" (#17657)"

This reverts commit ff383990f8.

* add debug log for recurring function

* add feature flag for dnd timed status

* refactoring changes

Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
This commit is contained in:
darkLord19
2021-06-17 00:08:26 +05:30
committed by GitHub
parent 6483abd263
commit b0bdd23d2c
27 changed files with 615 additions and 21 deletions

View File

@@ -25,6 +25,7 @@ import (
"github.com/mattermost/mattermost-server/v5/app/request"
"github.com/mattermost/mattermost-server/v5/config"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/plugin/plugintest/mock"
"github.com/mattermost/mattermost-server/v5/services/searchengine"
"github.com/mattermost/mattermost-server/v5/shared/mlog"
"github.com/mattermost/mattermost-server/v5/store"
@@ -258,24 +259,42 @@ func SetupConfig(tb testing.TB, updateConfig func(cfg *model.Config)) *TestHelpe
func SetupConfigWithStoreMock(tb testing.TB, updateConfig func(cfg *model.Config)) *TestHelper {
th := setupTestHelper(testlib.GetMockStoreForSetupFunctions(), nil, false, false, updateConfig, nil)
statusMock := mocks.StatusStore{}
statusMock.On("UpdateExpiredDNDStatuses").Return([]*model.Status{}, nil)
statusMock.On("Get", "user1").Return(&model.Status{UserId: "user1", Status: model.STATUS_ONLINE}, nil)
statusMock.On("UpdateLastActivityAt", "user1", mock.Anything).Return(nil)
statusMock.On("SaveOrUpdate", mock.AnythingOfType("*model.Status")).Return(nil)
emptyMockStore := mocks.Store{}
emptyMockStore.On("Close").Return(nil)
emptyMockStore.On("Status").Return(&statusMock)
th.App.Srv().Store = &emptyMockStore
return th
}
func SetupWithStoreMock(tb testing.TB) *TestHelper {
th := setupTestHelper(testlib.GetMockStoreForSetupFunctions(), nil, false, false, nil, nil)
statusMock := mocks.StatusStore{}
statusMock.On("UpdateExpiredDNDStatuses").Return([]*model.Status{}, nil)
statusMock.On("Get", "user1").Return(&model.Status{UserId: "user1", Status: model.STATUS_ONLINE}, nil)
statusMock.On("UpdateLastActivityAt", "user1", mock.Anything).Return(nil)
statusMock.On("SaveOrUpdate", mock.AnythingOfType("*model.Status")).Return(nil)
emptyMockStore := mocks.Store{}
emptyMockStore.On("Close").Return(nil)
emptyMockStore.On("Status").Return(&statusMock)
th.App.Srv().Store = &emptyMockStore
return th
}
func SetupEnterpriseWithStoreMock(tb testing.TB) *TestHelper {
th := setupTestHelper(testlib.GetMockStoreForSetupFunctions(), nil, true, false, nil, nil)
statusMock := mocks.StatusStore{}
statusMock.On("UpdateExpiredDNDStatuses").Return([]*model.Status{}, nil)
statusMock.On("Get", "user1").Return(&model.Status{UserId: "user1", Status: model.STATUS_ONLINE}, nil)
statusMock.On("UpdateLastActivityAt", "user1", mock.Anything).Return(nil)
statusMock.On("SaveOrUpdate", mock.AnythingOfType("*model.Status")).Return(nil)
emptyMockStore := mocks.Store{}
emptyMockStore.On("Close").Return(nil)
emptyMockStore.On("Status").Return(&statusMock)
th.App.Srv().Store = &emptyMockStore
return th
}

View File

@@ -106,7 +106,11 @@ func updateUserStatus(c *Context, w http.ResponseWriter, r *http.Request) {
case "away":
c.App.SetStatusAwayIfNeeded(c.Params.UserId, true)
case "dnd":
c.App.SetStatusDoNotDisturb(c.Params.UserId)
if c.App.Config().FeatureFlags.TimedDND {
c.App.SetStatusDoNotDisturbTimed(c.Params.UserId, status.DNDEndTime)
} else {
c.App.SetStatusDoNotDisturb(c.Params.UserId)
}
default:
c.SetInvalidParam("status")
return

View File

@@ -5,6 +5,7 @@ package api4
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
@@ -43,6 +44,30 @@ func TestGetUserStatus(t *testing.T) {
assert.Equal(t, "dnd", userStatus.Status)
})
t.Run("dnd status timed", func(t *testing.T) {
th.App.SetStatusDoNotDisturbTimed(th.BasicUser.Id, time.Now().Add(10*time.Minute).Unix())
userStatus, resp := Client.GetUserStatus(th.BasicUser.Id, "")
CheckNoError(t, resp)
assert.Equal(t, "dnd", userStatus.Status)
})
t.Run("dnd status timed restore after time interval", func(t *testing.T) {
task := model.CreateRecurringTaskFromNextIntervalTime("Unset DND Statuses From Test", th.App.UpdateDNDStatusOfUsers, 1*time.Second)
defer task.Cancel()
th.App.SetStatusOnline(th.BasicUser.Id, true)
userStatus, resp := Client.GetUserStatus(th.BasicUser.Id, "")
CheckNoError(t, resp)
assert.Equal(t, "online", userStatus.Status)
th.App.SetStatusDoNotDisturbTimed(th.BasicUser.Id, time.Now().Add(2*time.Second).Unix())
userStatus, resp = Client.GetUserStatus(th.BasicUser.Id, "")
CheckNoError(t, resp)
assert.Equal(t, "dnd", userStatus.Status)
time.Sleep(3 * time.Second)
userStatus, resp = Client.GetUserStatus(th.BasicUser.Id, "")
CheckNoError(t, resp)
assert.Equal(t, "online", userStatus.Status)
})
t.Run("back to offline status", func(t *testing.T) {
th.App.SetStatusOffline(th.BasicUser.Id, true)
userStatus, resp := Client.GetUserStatus(th.BasicUser.Id, "")
@@ -131,6 +156,16 @@ func TestGetUsersStatusesByIds(t *testing.T) {
}
})
t.Run("dnd status", func(t *testing.T) {
th.App.SetStatusDoNotDisturbTimed(th.BasicUser.Id, time.Now().Add(10*time.Minute).Unix())
th.App.SetStatusDoNotDisturbTimed(th.BasicUser2.Id, time.Now().Add(15*time.Minute).Unix())
usersStatuses, resp := Client.GetUsersStatusesByIds(usersIds)
CheckNoError(t, resp)
for _, userStatus := range usersStatuses {
assert.Equal(t, "dnd", userStatus.Status)
}
})
t.Run("get statuses from logged out user", func(t *testing.T) {
Client.Logout()
@@ -158,8 +193,8 @@ func TestUpdateUserStatus(t *testing.T) {
assert.Equal(t, "away", updateUserStatus.Status)
})
t.Run("set dnd status", func(t *testing.T) {
toUpdateUserStatus := &model.Status{Status: "dnd", UserId: th.BasicUser.Id}
t.Run("set dnd status timed", func(t *testing.T) {
toUpdateUserStatus := &model.Status{Status: "dnd", UserId: th.BasicUser.Id, DNDEndTime: time.Now().Add(10 * time.Minute).Unix()}
updateUserStatus, resp := Client.UpdateUserStatus(th.BasicUser.Id, toUpdateUserStatus)
CheckNoError(t, resp)
assert.Equal(t, "dnd", updateUserStatus.Status)

View File

@@ -575,3 +575,7 @@ func (a *App) CheckIntegrity() <-chan model.IntegrityCheckResult {
func (a *App) SetServer(srv *Server) {
a.srv = srv
}
func (a *App) UpdateExpiredDNDStatuses() ([]*model.Status, error) {
return a.Srv().Store.Status().UpdateExpiredDNDStatuses()
}

View File

@@ -307,6 +307,9 @@ type AppIface interface {
// relative to either the session creation date or the current time, depending
// on the `ExtendSessionOnActivity` config setting.
SetSessionExpireInDays(session *model.Session, days int)
// SetStatusDoNotDisturbTimed takes endtime in unix epoch format in UTC
// and sets status of given userId to dnd which will be restored back after endtime
SetStatusDoNotDisturbTimed(userId string, endtime int64)
// SetStatusLastActivityAt sets the last activity at for a user on the local app server and updates
// status to away if needed. Used by the WS to set status to away if an 'online' device disconnects
// while an 'away' device is still connected
@@ -355,6 +358,9 @@ type AppIface interface {
UpdateChannel(channel *model.Channel) (*model.Channel, *model.AppError)
// UpdateChannelScheme saves the new SchemeId of the channel passed.
UpdateChannelScheme(channel *model.Channel) (*model.Channel, *model.AppError)
// UpdateDNDStatusOfUsers is a recurring task which is started when server starts
// which unsets dnd status of users if needed and saves and broadcasts it
UpdateDNDStatusOfUsers()
// UpdateProductNotices is called periodically from a scheduled worker to fetch new notices and update the cache
UpdateProductNotices() *model.AppError
// UpdateViewedProductNotices is called from the frontend to mark a set of notices as 'viewed' by user
@@ -1044,6 +1050,7 @@ type AppIface interface {
UpdateCommand(oldCmd, updatedCmd *model.Command) (*model.Command, *model.AppError)
UpdateConfig(f func(*model.Config))
UpdateEphemeralPost(userID string, post *model.Post) *model.Post
UpdateExpiredDNDStatuses() ([]*model.Status, error)
UpdateGroup(group *model.Group) (*model.Group, *model.AppError)
UpdateGroupSyncable(groupSyncable *model.GroupSyncable) (*model.GroupSyncable, *model.AppError)
UpdateHashedPassword(user *model.User, newHashedPassword string) *model.AppError

View File

@@ -14,6 +14,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/mattermost/mattermost-server/v5/app/request"
@@ -161,8 +162,14 @@ func SetupWithoutPreloadMigrations(tb testing.TB) *TestHelper {
func SetupWithStoreMock(tb testing.TB) *TestHelper {
mockStore := testlib.GetMockStoreForSetupFunctions()
th := setupTestHelper(mockStore, false, false, tb)
statusMock := mocks.StatusStore{}
statusMock.On("UpdateExpiredDNDStatuses").Return([]*model.Status{}, nil)
statusMock.On("Get", "user1").Return(&model.Status{UserId: "user1", Status: model.STATUS_ONLINE}, nil)
statusMock.On("UpdateLastActivityAt", "user1", mock.Anything).Return(nil)
statusMock.On("SaveOrUpdate", mock.AnythingOfType("*model.Status")).Return(nil)
emptyMockStore := mocks.Store{}
emptyMockStore.On("Close").Return(nil)
emptyMockStore.On("Status").Return(&statusMock)
th.App.Srv().Store = &emptyMockStore
return th
}
@@ -170,8 +177,14 @@ func SetupWithStoreMock(tb testing.TB) *TestHelper {
func SetupEnterpriseWithStoreMock(tb testing.TB) *TestHelper {
mockStore := testlib.GetMockStoreForSetupFunctions()
th := setupTestHelper(mockStore, true, false, tb)
statusMock := mocks.StatusStore{}
statusMock.On("UpdateExpiredDNDStatuses").Return([]*model.Status{}, nil)
statusMock.On("Get", "user1").Return(&model.Status{UserId: "user1", Status: model.STATUS_ONLINE}, nil)
statusMock.On("UpdateLastActivityAt", "user1", mock.Anything).Return(nil)
statusMock.On("SaveOrUpdate", mock.AnythingOfType("*model.Status")).Return(nil)
emptyMockStore := mocks.Store{}
emptyMockStore.On("Close").Return(nil)
emptyMockStore.On("Status").Return(&statusMock)
th.App.Srv().Store = &emptyMockStore
return th
}

View File

@@ -15005,6 +15005,21 @@ func (a *OpenTracingAppLayer) SetStatusDoNotDisturb(userID string) {
a.app.SetStatusDoNotDisturb(userID)
}
func (a *OpenTracingAppLayer) SetStatusDoNotDisturbTimed(userId string, endtime int64) {
origCtx := a.ctx
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.SetStatusDoNotDisturbTimed")
a.ctx = newCtx
a.app.Srv().Store.SetContext(newCtx)
defer func() {
a.app.Srv().Store.SetContext(origCtx)
a.ctx = origCtx
}()
defer span.Finish()
a.app.SetStatusDoNotDisturbTimed(userId, endtime)
}
func (a *OpenTracingAppLayer) SetStatusLastActivityAt(userID string, activityAt int64) {
origCtx := a.ctx
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.SetStatusLastActivityAt")
@@ -15930,6 +15945,21 @@ func (a *OpenTracingAppLayer) UpdateConfig(f func(*model.Config)) {
a.app.UpdateConfig(f)
}
func (a *OpenTracingAppLayer) UpdateDNDStatusOfUsers() {
origCtx := a.ctx
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.UpdateDNDStatusOfUsers")
a.ctx = newCtx
a.app.Srv().Store.SetContext(newCtx)
defer func() {
a.app.Srv().Store.SetContext(origCtx)
a.ctx = origCtx
}()
defer span.Finish()
a.app.UpdateDNDStatusOfUsers()
}
func (a *OpenTracingAppLayer) UpdateEphemeralPost(userID string, post *model.Post) *model.Post {
origCtx := a.ctx
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.UpdateEphemeralPost")
@@ -15947,6 +15977,28 @@ func (a *OpenTracingAppLayer) UpdateEphemeralPost(userID string, post *model.Pos
return resultVar0
}
func (a *OpenTracingAppLayer) UpdateExpiredDNDStatuses() ([]*model.Status, error) {
origCtx := a.ctx
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.UpdateExpiredDNDStatuses")
a.ctx = newCtx
a.app.Srv().Store.SetContext(newCtx)
defer func() {
a.app.Srv().Store.SetContext(origCtx)
a.ctx = origCtx
}()
defer span.Finish()
resultVar0, resultVar1 := a.app.UpdateExpiredDNDStatuses()
if resultVar1 != nil {
span.LogFields(spanlog.Error(resultVar1))
ext.Error.Set(span, true)
}
return resultVar0, resultVar1
}
func (a *OpenTracingAppLayer) UpdateGroup(group *model.Group) (*model.Group, *model.AppError) {
origCtx := a.ctx
span, newCtx := tracing.StartSpanWithParentByContext(a.ctx, "app.UpdateGroup")

View File

@@ -317,6 +317,14 @@ func (api *PluginAPI) UpdateUserStatus(userID, status string) (*model.Status, *m
return api.app.GetStatus(userID)
}
func (api *PluginAPI) SetUserStatusTimedDND(userID string, endTime int64) (*model.Status, *model.AppError) {
// read-after-write bug which will fail if there are replicas.
// it works for now because we have a cache in between.
// FIXME: make SetStatusDoNotDisturbTimed return updated status
api.app.SetStatusDoNotDisturbTimed(userID, endTime)
return api.app.GetStatus(userID)
}
func (api *PluginAPI) GetUsersInChannel(channelID, sortBy string, page, perPage int) ([]*model.User, *model.AppError) {
switch sortBy {
case model.CHANNEL_SORT_BY_USERNAME:

View File

@@ -209,6 +209,9 @@ type Server struct {
imgDecoder *imaging.Decoder
imgEncoder *imaging.Encoder
dndTaskMut sync.Mutex
dndTask *model.ScheduledTask
}
func NewServer(options ...Option) (*Server, error) {
@@ -673,6 +676,7 @@ func NewServer(options ...Option) (*Server, error) {
s.runLicenseExpirationCheckJob()
runCheckAdminSupportStatusJob(app, c)
runCheckWarnMetricStatusJob(app, c)
runDNDStatusExpireJob(app)
})
s.runJobs()
}
@@ -689,6 +693,14 @@ func NewServer(options ...Option) (*Server, error) {
s.ShutDownPlugins()
}
})
s.AddConfigListener(func(oldCfg, newCfg *model.Config) {
if !oldCfg.FeatureFlags.TimedDND && newCfg.FeatureFlags.TimedDND {
runDNDStatusExpireJob(app)
}
if oldCfg.FeatureFlags.TimedDND && !newCfg.FeatureFlags.TimedDND {
stopDNDStatusExpireJob(app)
}
})
return s, nil
}
@@ -1051,6 +1063,12 @@ func (s *Server) Shutdown() {
mlog.Warn("Error flushing logs", mlog.Err(err))
}
s.dndTaskMut.Lock()
if s.dndTask != nil {
s.dndTask.Cancel()
}
s.dndTaskMut.Unlock()
mlog.Info("Server stopped")
// this should just write the "server stopped" record, the rest are already flushed.
@@ -2284,3 +2302,41 @@ func (s *Server) ReadFile(path string) ([]byte, *model.AppError) {
// }
// return result, nil
// }
func createDNDStatusExpirationRecurringTask(a *App) {
a.srv.dndTaskMut.Lock()
a.srv.dndTask = model.CreateRecurringTaskFromNextIntervalTime("Unset DND Statuses", a.UpdateDNDStatusOfUsers, 5*time.Minute)
a.srv.dndTaskMut.Unlock()
}
func cancelDNDStatusExpirationRecurringTask(a *App) {
a.srv.dndTaskMut.Lock()
if a.srv.dndTask != nil {
a.srv.dndTask.Cancel()
a.srv.dndTask = nil
}
a.srv.dndTaskMut.Unlock()
}
func runDNDStatusExpireJob(a *App) {
if !a.Config().FeatureFlags.TimedDND {
return
}
if a.IsLeader() {
createDNDStatusExpirationRecurringTask(a)
}
a.srv.AddClusterLeaderChangedListener(func() {
mlog.Info("Cluster leader changed. Determining if unset DNS status task should be running", mlog.Bool("isLeader", a.IsLeader()))
if a.IsLeader() {
createDNDStatusExpirationRecurringTask(a)
} else {
cancelDNDStatusExpirationRecurringTask(a)
}
})
}
func stopDNDStatusExpireJob(a *App) {
if a.IsLeader() {
cancelDNDStatusExpirationRecurringTask(a)
}
}

View File

@@ -282,6 +282,28 @@ func (a *App) SetStatusAwayIfNeeded(userID string, manual bool) {
a.SaveAndBroadcastStatus(status)
}
// SetStatusDoNotDisturbTimed takes endtime in unix epoch format in UTC
// and sets status of given userId to dnd which will be restored back after endtime
func (a *App) SetStatusDoNotDisturbTimed(userId string, endtime int64) {
if !*a.Config().ServiceSettings.EnableUserStatuses {
return
}
status, err := a.GetStatus(userId)
if err != nil {
status = &model.Status{UserId: userId, Status: model.STATUS_OFFLINE, Manual: false, LastActivityAt: 0, ActiveChannel: ""}
}
status.PrevStatus = status.Status
status.Status = model.STATUS_DND
status.Manual = true
status.DNDEndTime = endtime
a.SaveAndBroadcastStatus(status)
}
func (a *App) SetStatusDoNotDisturb(userID string) {
if !*a.Config().ServiceSettings.EnableUserStatuses {
return
@@ -365,6 +387,21 @@ func (a *App) IsUserAway(lastActivityAt int64) bool {
return model.GetMillis()-lastActivityAt >= *a.Config().TeamSettings.UserStatusAwayTimeout*1000
}
// UpdateDNDStatusOfUsers is a recurring task which is started when server starts
// which unsets dnd status of users if needed and saves and broadcasts it
func (a *App) UpdateDNDStatusOfUsers() {
mlog.Debug("UpdateDNDStatusOfUsers: scheduled run started")
statuses, err := a.UpdateExpiredDNDStatuses()
if err != nil {
mlog.Warn("Failed to fetch dnd statues from store", mlog.String("err", err.Error()))
return
}
for i := range statuses {
a.AddStatusCache(statuses[i])
a.BroadcastStatus(statuses[i])
}
}
func (a *App) SetCustomStatus(userID string, cs *model.CustomStatus) *model.AppError {
user, err := a.GetUser(userID)
if err != nil {

View File

@@ -35,6 +35,9 @@ type FeatureFlags struct {
// Control support for custom data retention policies
CustomDataRetentionEnabled bool
// Enable timed dnd support for user status
TimedDND bool
}
func (f *FeatureFlags) SetDefaults() {
@@ -49,6 +52,7 @@ func (f *FeatureFlags) SetDefaults() {
f.PluginApps = ""
f.PluginFocalboard = ""
f.CustomDataRetentionEnabled = false
f.TimedDND = false
}
func (f *FeatureFlags) Plugins() map[string]string {

View File

@@ -11,42 +11,65 @@ import (
type TaskFunc func()
type ScheduledTask struct {
Name string `json:"name"`
Interval time.Duration `json:"interval"`
Recurring bool `json:"recurring"`
function func()
cancel chan struct{}
cancelled chan struct{}
Name string `json:"name"`
Interval time.Duration `json:"interval"`
Recurring bool `json:"recurring"`
function func()
cancel chan struct{}
cancelled chan struct{}
fromNextIntervalTime bool
}
func CreateTask(name string, function TaskFunc, timeToExecution time.Duration) *ScheduledTask {
return createTask(name, function, timeToExecution, false)
return createTask(name, function, timeToExecution, false, false)
}
func CreateRecurringTask(name string, function TaskFunc, interval time.Duration) *ScheduledTask {
return createTask(name, function, interval, true)
return createTask(name, function, interval, true, false)
}
func createTask(name string, function TaskFunc, interval time.Duration, recurring bool) *ScheduledTask {
func CreateRecurringTaskFromNextIntervalTime(name string, function TaskFunc, interval time.Duration) *ScheduledTask {
return createTask(name, function, interval, true, true)
}
func createTask(name string, function TaskFunc, interval time.Duration, recurring bool, fromNextIntervalTime bool) *ScheduledTask {
task := &ScheduledTask{
Name: name,
Interval: interval,
Recurring: recurring,
function: function,
cancel: make(chan struct{}),
cancelled: make(chan struct{}),
Name: name,
Interval: interval,
Recurring: recurring,
function: function,
cancel: make(chan struct{}),
cancelled: make(chan struct{}),
fromNextIntervalTime: fromNextIntervalTime,
}
go func() {
defer close(task.cancelled)
ticker := time.NewTicker(interval)
var firstTick <-chan time.Time
var ticker *time.Ticker
if task.fromNextIntervalTime {
currTime := time.Now()
first := currTime.Truncate(interval)
if first.Before(currTime) {
first = first.Add(interval)
}
firstTick = time.After(time.Until(first))
ticker = &time.Ticker{C: nil}
} else {
firstTick = nil
ticker = time.NewTicker(interval)
}
defer func() {
ticker.Stop()
}()
for {
select {
case <-firstTick:
ticker = time.NewTicker(interval)
function()
case <-ticker.C:
function()
case <-task.cancel:

View File

@@ -4,6 +4,7 @@
package model
import (
"sync"
"sync/atomic"
"testing"
"time"
@@ -73,3 +74,35 @@ func TestCancelTask(t *testing.T) {
time.Sleep(TASK_TIME + time.Second)
assert.EqualValues(t, 0, atomic.LoadInt32(executionCount))
}
func TestCreateRecurringTaskFromNextIntervalTime(t *testing.T) {
TASK_NAME := "Test Recurring Task starting from next interval time"
TASK_TIME := time.Second * 2
var executionTime time.Time
var mu sync.Mutex
testFunc := func() {
mu.Lock()
executionTime = time.Now()
mu.Unlock()
}
task := CreateRecurringTaskFromNextIntervalTime(TASK_NAME, testFunc, TASK_TIME)
defer task.Cancel()
time.Sleep(TASK_TIME)
mu.Lock()
expectedSeconds := executionTime.Second()
mu.Unlock()
assert.EqualValues(t, 0, expectedSeconds%2)
time.Sleep(TASK_TIME)
mu.Lock()
expectedSeconds = executionTime.Second()
mu.Unlock()
assert.EqualValues(t, 0, expectedSeconds%2)
assert.Equal(t, TASK_NAME, task.Name)
assert.Equal(t, TASK_TIME, task.Interval)
assert.True(t, task.Recurring)
}

View File

@@ -25,6 +25,8 @@ type Status struct {
Manual bool `json:"manual"`
LastActivityAt int64 `json:"last_activity_at"`
ActiveChannel string `json:"active_channel,omitempty" db:"-"`
DNDEndTime int64 `json:"dnd_end_time"`
PrevStatus string `json:"-"`
}
func (o *Status) ToJson() string {

View File

@@ -12,7 +12,7 @@ import (
)
func TestStatus(t *testing.T) {
status := Status{NewId(), STATUS_ONLINE, true, 0, "123"}
status := Status{NewId(), STATUS_ONLINE, true, 0, "123", 0, ""}
json := status.ToJson()
status2 := StatusFromJson(strings.NewReader(json))
@@ -29,7 +29,7 @@ func TestStatus(t *testing.T) {
}
func TestStatusListToJson(t *testing.T) {
statuses := []*Status{{NewId(), STATUS_ONLINE, true, 0, "123"}, {NewId(), STATUS_OFFLINE, true, 0, ""}}
statuses := []*Status{{NewId(), STATUS_ONLINE, true, 0, "123", 0, ""}, {NewId(), STATUS_OFFLINE, true, 0, "", 0, ""}}
jsonStatuses := StatusListToJson(statuses)
var dat []map[string]interface{}

View File

@@ -229,6 +229,12 @@ type API interface {
// Minimum server version: 5.2
UpdateUserStatus(userID, status string) (*model.Status, *model.AppError)
// SetUserStatusTimedDND will set a user's status to dnd for given time until the user,
// or another integration/plugin, sets it back to online.
// @tag User
// Minimum server version: 5.35
SetUserStatusTimedDND(userId string, endtime int64) (*model.Status, *model.AppError)
// UpdateUserActive deactivates or reactivates an user.
//
// @tag User

View File

@@ -266,6 +266,13 @@ func (api *apiTimerLayer) UpdateUserStatus(userID, status string) (*model.Status
return _returnsA, _returnsB
}
func (api *apiTimerLayer) SetUserStatusTimedDND(userId string, endtime int64) (*model.Status, *model.AppError) {
startTime := timePkg.Now()
_returnsA, _returnsB := api.apiImpl.SetUserStatusTimedDND(userId, endtime)
api.recordTime(startTime, "SetUserStatusTimedDND", _returnsB == nil)
return _returnsA, _returnsB
}
func (api *apiTimerLayer) UpdateUserActive(userID string, active bool) *model.AppError {
startTime := timePkg.Now()
_returnsA := api.apiImpl.UpdateUserActive(userID, active)

View File

@@ -1509,6 +1509,36 @@ func (s *apiRPCServer) UpdateUserStatus(args *Z_UpdateUserStatusArgs, returns *Z
return nil
}
type Z_SetUserStatusTimedDNDArgs struct {
A string
B int64
}
type Z_SetUserStatusTimedDNDReturns struct {
A *model.Status
B *model.AppError
}
func (g *apiRPCClient) SetUserStatusTimedDND(userId string, endtime int64) (*model.Status, *model.AppError) {
_args := &Z_SetUserStatusTimedDNDArgs{userId, endtime}
_returns := &Z_SetUserStatusTimedDNDReturns{}
if err := g.client.Call("Plugin.SetUserStatusTimedDND", _args, _returns); err != nil {
log.Printf("RPC call to SetUserStatusTimedDND API failed: %s", err.Error())
}
return _returns.A, _returns.B
}
func (s *apiRPCServer) SetUserStatusTimedDND(args *Z_SetUserStatusTimedDNDArgs, returns *Z_SetUserStatusTimedDNDReturns) error {
if hook, ok := s.impl.(interface {
SetUserStatusTimedDND(userId string, endtime int64) (*model.Status, *model.AppError)
}); ok {
returns.A, returns.B = hook.SetUserStatusTimedDND(args.A, args.B)
} else {
return encodableError(fmt.Errorf("API SetUserStatusTimedDND called but not implemented."))
}
return nil
}
type Z_UpdateUserActiveArgs struct {
A string
B bool

View File

@@ -3057,6 +3057,31 @@ func (_m *API) SetTeamIcon(teamID string, data []byte) *model.AppError {
return r0
}
// SetUserStatusTimedDND provides a mock function with given fields: userId, endtime
func (_m *API) SetUserStatusTimedDND(userId string, endtime int64) (*model.Status, *model.AppError) {
ret := _m.Called(userId, endtime)
var r0 *model.Status
if rf, ok := ret.Get(0).(func(string, int64) *model.Status); ok {
r0 = rf(userId, endtime)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*model.Status)
}
}
var r1 *model.AppError
if rf, ok := ret.Get(1).(func(string, int64) *model.AppError); ok {
r1 = rf(userId, endtime)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*model.AppError)
}
}
return r0, r1
}
// UnregisterCommand provides a mock function with given fields: teamID, trigger
func (_m *API) UnregisterCommand(teamID string, trigger string) error {
ret := _m.Called(teamID, trigger)

View File

@@ -7634,6 +7634,24 @@ func (s *OpenTracingLayerStatusStore) SaveOrUpdate(status *model.Status) error {
return err
}
func (s *OpenTracingLayerStatusStore) UpdateExpiredDNDStatuses() ([]*model.Status, error) {
origCtx := s.Root.Store.Context()
span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "StatusStore.UpdateExpiredDNDStatuses")
s.Root.Store.SetContext(newCtx)
defer func() {
s.Root.Store.SetContext(origCtx)
}()
defer span.Finish()
result, err := s.StatusStore.UpdateExpiredDNDStatuses()
if err != nil {
span.LogFields(spanlog.Error(err))
ext.Error.Set(span, true)
}
return result, err
}
func (s *OpenTracingLayerStatusStore) UpdateLastActivityAt(userID string, lastActivityAt int64) error {
origCtx := s.Root.Store.Context()
span, newCtx := tracing.StartSpanWithParentByContext(s.Root.Store.Context(), "StatusStore.UpdateLastActivityAt")

View File

@@ -8296,6 +8296,26 @@ func (s *RetryLayerStatusStore) SaveOrUpdate(status *model.Status) error {
}
func (s *RetryLayerStatusStore) UpdateExpiredDNDStatuses() ([]*model.Status, error) {
tries := 0
for {
result, err := s.StatusStore.UpdateExpiredDNDStatuses()
if err == nil {
return result, nil
}
if !isRepeatableError(err) {
return result, err
}
tries++
if tries >= 3 {
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
return result, err
}
}
}
func (s *RetryLayerStatusStore) UpdateLastActivityAt(userID string, lastActivityAt int64) error {
tries := 0

View File

@@ -7,10 +7,12 @@ import (
"database/sql"
"fmt"
"strings"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/pkg/errors"
"github.com/mattermost/gorp"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/store"
)
@@ -27,6 +29,7 @@ func newSqlStatusStore(sqlStore *SqlStore) store.StatusStore {
table.ColMap("UserId").SetMaxSize(26)
table.ColMap("Status").SetMaxSize(32)
table.ColMap("ActiveChannel").SetMaxSize(26)
table.ColMap("PrevStatus").SetMaxSize(32)
}
return s
@@ -98,6 +101,119 @@ func (s SqlStatusStore) GetByIds(userIds []string) ([]*model.Status, error) {
return statuses, nil
}
// MySQL doesn't have support for RETURNING clause, so we use a transaction to get the updated rows.
func (s SqlStatusStore) updateExpiredStatuses(t *gorp.Transaction) ([]*model.Status, error) {
var statuses []*model.Status
currUnixTime := time.Now().UTC().Unix()
selectQuery, selectParams, err := s.getQueryBuilder().
Select("*").
From("Status").
Where(
sq.And{
sq.Eq{"Status": model.STATUS_DND},
sq.Gt{"DNDEndTime": 0},
sq.LtOrEq{"DNDEndTime": currUnixTime},
},
).ToSql()
if err != nil {
return nil, errors.Wrap(err, "status_tosql")
}
_, err = t.Select(&statuses, selectQuery, selectParams...)
if err != nil {
return nil, errors.Wrap(err, "updateExpiredStatusesT: failed to get expired dnd statuses")
}
updateQuery, args, err := s.getQueryBuilder().
Update("Status").
Where(
sq.And{
sq.Eq{"Status": model.STATUS_DND},
sq.Gt{"DNDEndTime": 0},
sq.LtOrEq{"DNDEndTime": currUnixTime},
},
).
Set("Status", sq.Expr("PrevStatus")).
Set("PrevStatus", model.STATUS_DND).
Set("DNDEndTime", 0).
Set("Manual", false).
ToSql()
if err != nil {
return nil, errors.Wrap(err, "status_tosql")
}
if _, err := t.Exec(updateQuery, args...); err != nil {
return nil, errors.Wrapf(err, "updateExpiredStatusesT: failed to update statuses")
}
return statuses, nil
}
func (s SqlStatusStore) UpdateExpiredDNDStatuses() ([]*model.Status, error) {
if s.DriverName() == model.DATABASE_DRIVER_MYSQL {
transaction, err := s.GetMaster().Begin()
if err != nil {
return nil, errors.Wrap(err, "UpdateExpiredDNDStatuses: begin_transaction")
}
defer finalizeTransaction(transaction)
statuses, err := s.updateExpiredStatuses(transaction)
if err != nil {
return nil, errors.Wrap(err, "UpdateExpiredDNDStatuses: updateExpiredDNDStatusesT")
}
if err := transaction.Commit(); err != nil {
return nil, errors.Wrap(err, "UpdateExpiredDNDStatuses: commit_transaction")
}
for _, status := range statuses {
status.Status = status.PrevStatus
status.PrevStatus = model.STATUS_DND
status.DNDEndTime = 0
status.Manual = false
}
return statuses, nil
}
queryString, args, err := s.getQueryBuilder().
Update("Status").
Where(
sq.And{
sq.Eq{"Status": model.STATUS_DND},
sq.Gt{"DNDEndTime": 0},
sq.LtOrEq{"DNDEndTime": time.Now().UTC().Unix()},
},
).
Set("Status", sq.Expr("PrevStatus")).
Set("PrevStatus", model.STATUS_DND).
Set("DNDEndTime", 0).
Set("Manual", false).
Suffix("RETURNING *").
ToSql()
if err != nil {
return nil, errors.Wrap(err, "status_tosql")
}
rows, err := s.GetMaster().Query(queryString, args...)
if err != nil {
return nil, errors.Wrap(err, "failed to find Statuses")
}
defer rows.Close()
var statuses []*model.Status
for rows.Next() {
var status model.Status
if err = rows.Scan(&status.UserId, &status.Status, &status.Manual, &status.LastActivityAt,
&status.DNDEndTime, &status.PrevStatus); err != nil {
return nil, errors.Wrap(err, "unable to scan from rows")
}
statuses = append(statuses, &status)
}
if err = rows.Err(); err != nil {
return nil, errors.Wrap(err, "failed while iterating over rows")
}
return statuses, nil
}
func (s SqlStatusStore) ResetAll() error {
if _, err := s.GetMaster().Exec("UPDATE Status SET Status = :Status WHERE Manual = false", map[string]interface{}{"Status": model.STATUS_OFFLINE}); err != nil {
return errors.Wrap(err, "failed to update Statuses")

View File

@@ -972,6 +972,7 @@ func upgradeDatabaseToVersion530(sqlStore *SqlStore) {
sqlStore.CreateColumnIfNotExistsNoDefault("FileInfo", "Content", "longtext", "text")
sqlStore.CreateColumnIfNotExists("SidebarCategories", "Muted", "tinyint(1)", "boolean", "0")
saveSchemaVersion(sqlStore, Version5300)
}
}
@@ -1087,6 +1088,9 @@ func upgradeDatabaseToVersion536(sqlStore *SqlStore) {
sqlStore.CreateColumnIfNotExists("SharedChannelRemotes", "LastPostUpdateAt", "bigint", "bigint", "0")
sqlStore.CreateColumnIfNotExists("SharedChannelRemotes", "LastPostId", "VARCHAR(26)", "VARCHAR(26)", "")
sqlStore.CreateColumnIfNotExistsNoDefault("Status", "DNDEndTime", "bigint", "bigint")
sqlStore.CreateColumnIfNotExistsNoDefault("Status", "PrevStatus", "VARCHAR(32)", "VARCHAR(32)")
saveSchemaVersion(sqlStore, Version5360)
}
}

View File

@@ -605,6 +605,7 @@ type StatusStore interface {
ResetAll() error
GetTotalActiveUsersCount() (int64, error)
UpdateLastActivityAt(userID string, lastActivityAt int64) error
UpdateExpiredDNDStatuses() ([]*model.Status, error)
}
type FileInfoStore interface {

View File

@@ -109,6 +109,29 @@ func (_m *StatusStore) SaveOrUpdate(status *model.Status) error {
return r0
}
// UpdateExpiredDNDStatuses provides a mock function with given fields:
func (_m *StatusStore) UpdateExpiredDNDStatuses() ([]*model.Status, error) {
ret := _m.Called()
var r0 []*model.Status
if rf, ok := ret.Get(0).(func() []*model.Status); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*model.Status)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// UpdateLastActivityAt provides a mock function with given fields: userID, lastActivityAt
func (_m *StatusStore) UpdateLastActivityAt(userID string, lastActivityAt int64) error {
ret := _m.Called(userID, lastActivityAt)

View File

@@ -5,6 +5,7 @@ package storetest
import (
"testing"
"time"
"github.com/stretchr/testify/require"
@@ -15,6 +16,7 @@ import (
func TestStatusStore(t *testing.T, ss store.Store) {
t.Run("", func(t *testing.T) { testStatusStore(t, ss) })
t.Run("ActiveUserCount", func(t *testing.T) { testActiveUserCount(t, ss) })
t.Run("UpdateExpiredDNDStatuses", func(t *testing.T) { testUpdateExpiredDNDStatuses(t, ss) })
}
func testStatusStore(t *testing.T, ss store.Store) {
@@ -61,3 +63,32 @@ type ByUserId []*model.Status
func (s ByUserId) Len() int { return len(s) }
func (s ByUserId) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s ByUserId) Less(i, j int) bool { return s[i].UserId < s[j].UserId }
func testUpdateExpiredDNDStatuses(t *testing.T, ss store.Store) {
userID := NewTestId()
status := &model.Status{UserId: userID, Status: model.STATUS_DND, Manual: true,
DNDEndTime: time.Now().Add(5 * time.Second).Unix(), PrevStatus: model.STATUS_ONLINE}
require.NoError(t, ss.Status().SaveOrUpdate(status))
time.Sleep(2 * time.Second)
// after 2 seconds no statuses should be expired
statuses, err := ss.Status().UpdateExpiredDNDStatuses()
require.NoError(t, err)
require.Len(t, statuses, 0)
time.Sleep(3 * time.Second)
// after 3 more seconds test status should be updated
statuses, err = ss.Status().UpdateExpiredDNDStatuses()
require.NoError(t, err)
require.Len(t, statuses, 1)
updatedStatus := *statuses[0]
require.Equal(t, updatedStatus.UserId, userID)
require.Equal(t, updatedStatus.Status, model.STATUS_ONLINE)
require.Equal(t, updatedStatus.DNDEndTime, int64(0))
require.Equal(t, updatedStatus.PrevStatus, model.STATUS_DND)
require.Equal(t, updatedStatus.Manual, false)
}

View File

@@ -6888,6 +6888,22 @@ func (s *TimerLayerStatusStore) SaveOrUpdate(status *model.Status) error {
return err
}
func (s *TimerLayerStatusStore) UpdateExpiredDNDStatuses() ([]*model.Status, error) {
start := timemodule.Now()
result, err := s.StatusStore.UpdateExpiredDNDStatuses()
elapsed := float64(timemodule.Since(start)) / float64(timemodule.Second)
if s.Root.Metrics != nil {
success := "false"
if err == nil {
success = "true"
}
s.Root.Metrics.ObserveStoreMethodDuration("StatusStore.UpdateExpiredDNDStatuses", success, elapsed)
}
return result, err
}
func (s *TimerLayerStatusStore) UpdateLastActivityAt(userID string, lastActivityAt int64) error {
start := timemodule.Now()