remove request.Context from shared channel service methods (#17606)

This commit is contained in:
Ibrahim Serdar Acikgoz
2021-05-11 22:29:30 +03:00
committed by GitHub
parent 4d018c2121
commit 35bfae9052
10 changed files with 46 additions and 58 deletions

View File

@@ -104,7 +104,7 @@ func remoteClusterAcceptMessage(c *Context, w http.ResponseWriter, r *http.Reque
auditRec.AddMeta("remoteCluster", rc)
// pass message to Remote Cluster Service and write response
resp := service.ReceiveIncomingMsg(c.AppContext, rc, frame.Msg)
resp := service.ReceiveIncomingMsg(rc, frame.Msg)
b, errMarshall := json.Marshal(resp)
if errMarshall != nil {

View File

@@ -6,7 +6,6 @@ package app
import (
"context"
"github.com/mattermost/mattermost-server/v5/app/request"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/services/remotecluster"
)
@@ -71,6 +70,6 @@ func (mrcs *mockRemoteClusterService) AcceptInvitation(invite *model.RemoteClust
return nil, nil
}
func (mrcs *mockRemoteClusterService) ReceiveIncomingMsg(_ *request.Context, rc *model.RemoteCluster, msg model.RemoteClusterMsg) remotecluster.Response {
func (mrcs *mockRemoteClusterService) ReceiveIncomingMsg(rc *model.RemoteCluster, msg model.RemoteClusterMsg) remotecluster.Response {
return remotecluster.Response{}
}

View File

@@ -6,7 +6,6 @@ package remotecluster
import (
"fmt"
"github.com/mattermost/mattermost-server/v5/app/request"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/shared/mlog"
)
@@ -14,7 +13,7 @@ import (
// ReceiveIncomingMsg is called by the Rest API layer, or websocket layer (future), when a Remote Cluster
// message is received. Here we route the message to any topic listeners.
// `rc` and `msg` cannot be nil.
func (rcs *Service) ReceiveIncomingMsg(c *request.Context, rc *model.RemoteCluster, msg model.RemoteClusterMsg) Response {
func (rcs *Service) ReceiveIncomingMsg(rc *model.RemoteCluster, msg model.RemoteClusterMsg) Response {
rcs.mux.RLock()
defer rcs.mux.RUnlock()
@@ -32,7 +31,7 @@ func (rcs *Service) ReceiveIncomingMsg(c *request.Context, rc *model.RemoteClust
listeners := rcs.getTopicListeners(msg.Topic)
for _, l := range listeners {
if err := callback(l, c, msg, &rcSanitized, &response); err != nil {
if err := callback(l, msg, &rcSanitized, &response); err != nil {
rcs.server.GetLogger().Log(mlog.LvlRemoteClusterServiceError, "Error from remote cluster message listener",
mlog.String("msgId", msg.Id), mlog.String("topic", msg.Topic), mlog.String("remote", rc.DisplayName), mlog.Err(err))
@@ -43,12 +42,12 @@ func (rcs *Service) ReceiveIncomingMsg(c *request.Context, rc *model.RemoteClust
return response
}
func callback(listener TopicListener, c *request.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response) (err error) {
func callback(listener TopicListener, msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("%v", r)
}
}()
err = listener(c, msg, rc, resp)
err = listener(msg, rc, resp)
return
}

View File

@@ -10,7 +10,6 @@ import (
"sync"
"time"
"github.com/mattermost/mattermost-server/v5/app/request"
"github.com/mattermost/mattermost-server/v5/einterfaces"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/shared/mlog"
@@ -66,12 +65,12 @@ type RemoteClusterServiceIFace interface {
SendFile(ctx context.Context, us *model.UploadSession, fi *model.FileInfo, rc *model.RemoteCluster, rp ReaderProvider, f SendFileResultFunc) error
SendProfileImage(ctx context.Context, userID string, rc *model.RemoteCluster, provider ProfileImageProvider, f SendProfileImageResultFunc) error
AcceptInvitation(invite *model.RemoteClusterInvite, name string, displayName string, creatorId string, teamId string, siteURL string) (*model.RemoteCluster, error)
ReceiveIncomingMsg(c *request.Context, rc *model.RemoteCluster, msg model.RemoteClusterMsg) Response
ReceiveIncomingMsg(rc *model.RemoteCluster, msg model.RemoteClusterMsg) Response
}
// TopicListener is a callback signature used to listen for incoming messages for
// a specific topic.
type TopicListener func(c *request.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response) error
type TopicListener func(msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response) error
// ConnectionStateListener is used to listen to remote cluster connection state changes.
type ConnectionStateListener func(rc *model.RemoteCluster, online bool)

View File

@@ -10,22 +10,21 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/mattermost/mattermost-server/v5/app/request"
"github.com/mattermost/mattermost-server/v5/model"
)
func TestService_AddTopicListener(t *testing.T) {
var count int32
l1 := func(_ *request.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response) error {
l1 := func(msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response) error {
atomic.AddInt32(&count, 1)
return nil
}
l2 := func(_ *request.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response) error {
l2 := func(msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response) error {
atomic.AddInt32(&count, 1)
return nil
}
l3 := func(_ *request.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response) error {
l3 := func(msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response) error {
atomic.AddInt32(&count, 1)
return nil
}
@@ -47,28 +46,26 @@ func TestService_AddTopicListener(t *testing.T) {
msg1 := model.RemoteClusterMsg{Topic: "test"}
msg2 := model.RemoteClusterMsg{Topic: "different"}
c := request.EmptyContext()
service.ReceiveIncomingMsg(c, rc, msg1)
service.ReceiveIncomingMsg(rc, msg1)
assert.Equal(t, int32(2), atomic.LoadInt32(&count))
service.ReceiveIncomingMsg(c, rc, msg2)
service.ReceiveIncomingMsg(rc, msg2)
assert.Equal(t, int32(3), atomic.LoadInt32(&count))
service.RemoveTopicListener(l1id)
service.ReceiveIncomingMsg(c, rc, msg1)
service.ReceiveIncomingMsg(rc, msg1)
assert.Equal(t, int32(4), atomic.LoadInt32(&count))
service.RemoveTopicListener(l2id)
service.ReceiveIncomingMsg(c, rc, msg1)
service.ReceiveIncomingMsg(rc, msg1)
assert.Equal(t, int32(4), atomic.LoadInt32(&count))
service.ReceiveIncomingMsg(c, rc, msg2)
service.ReceiveIncomingMsg(rc, msg2)
assert.Equal(t, int32(5), atomic.LoadInt32(&count))
service.RemoveTopicListener(l3id)
service.ReceiveIncomingMsg(c, rc, msg1)
service.ReceiveIncomingMsg(c, rc, msg2)
service.ReceiveIncomingMsg(rc, msg1)
service.ReceiveIncomingMsg(rc, msg2)
assert.Equal(t, int32(5), atomic.LoadInt32(&count))
listeners = service.getTopicListeners("test")

View File

@@ -10,7 +10,6 @@ import (
"fmt"
"sync"
"github.com/mattermost/mattermost-server/v5/app/request"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/services/remotecluster"
"github.com/mattermost/mattermost-server/v5/shared/mlog"
@@ -159,7 +158,7 @@ func (scs *Service) sendAttachmentForRemote(fi *model.FileInfo, post *model.Post
// onReceiveUploadCreate is called when a message requesting to create an upload session is received. An upload session is
// created and the id returned in the response.
func (scs *Service) onReceiveUploadCreate(_ *request.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, response *remotecluster.Response) error {
func (scs *Service) onReceiveUploadCreate(msg model.RemoteClusterMsg, rc *model.RemoteCluster, response *remotecluster.Response) error {
var us model.UploadSession
if err := json.Unmarshal(msg.Payload, &us); err != nil {

View File

@@ -111,7 +111,7 @@ func combineErrors(err error, serror string) string {
return sb.String()
}
func (scs *Service) onReceiveChannelInvite(c *request.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, _ *remotecluster.Response) error {
func (scs *Service) onReceiveChannelInvite(msg model.RemoteClusterMsg, rc *model.RemoteCluster, _ *remotecluster.Response) error {
if len(msg.Payload) == 0 {
return nil
}
@@ -132,7 +132,7 @@ func (scs *Service) onReceiveChannelInvite(c *request.Context, msg model.RemoteC
// create channel if it doesn't exist; the channel may already exist, such as if it was shared then unshared at some point.
channel, err := scs.server.GetStore().Channel().Get(invite.ChannelId, true)
if err != nil {
if channel, err = scs.handleChannelCreation(c, invite, rc); err != nil {
if channel, err = scs.handleChannelCreation(invite, rc); err != nil {
return err
}
}
@@ -179,9 +179,9 @@ func (scs *Service) onReceiveChannelInvite(c *request.Context, msg model.RemoteC
return nil
}
func (scs *Service) handleChannelCreation(c *request.Context, invite channelInviteMsg, rc *model.RemoteCluster) (*model.Channel, error) {
func (scs *Service) handleChannelCreation(invite channelInviteMsg, rc *model.RemoteCluster) (*model.Channel, error) {
if invite.Type == model.CHANNEL_DIRECT {
return scs.createDirectChannel(c, invite)
return scs.createDirectChannel(invite)
}
channelNew := &model.Channel{
@@ -197,7 +197,7 @@ func (scs *Service) handleChannelCreation(c *request.Context, invite channelInvi
}
// check user perms?
channel, appErr := scs.app.CreateChannelWithUser(c, channelNew, rc.CreatorId)
channel, appErr := scs.app.CreateChannelWithUser(request.EmptyContext(), channelNew, rc.CreatorId)
if appErr != nil {
return nil, fmt.Errorf("cannot create channel `%s`: %w", invite.ChannelId, appErr)
}
@@ -205,12 +205,12 @@ func (scs *Service) handleChannelCreation(c *request.Context, invite channelInvi
return channel, nil
}
func (scs *Service) createDirectChannel(c *request.Context, invite channelInviteMsg) (*model.Channel, error) {
func (scs *Service) createDirectChannel(invite channelInviteMsg) (*model.Channel, error) {
if len(invite.DirectParticipantIDs) != 2 {
return nil, fmt.Errorf("cannot create direct channel `%s` insufficient participant count `%d`", invite.ChannelId, len(invite.DirectParticipantIDs))
}
channel, err := scs.app.GetOrCreateDirectChannel(c, invite.DirectParticipantIDs[0], invite.DirectParticipantIDs[1], model.WithID(invite.ChannelId))
channel, err := scs.app.GetOrCreateDirectChannel(request.EmptyContext(), invite.DirectParticipantIDs[0], invite.DirectParticipantIDs[1], model.WithID(invite.ChannelId))
if err != nil {
return nil, fmt.Errorf("cannot create direct channel `%s`: %w", invite.ChannelId, err)
}

View File

@@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/mattermost/mattermost-server/v5/app/request"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/plugin/plugintest/mock"
"github.com/mattermost/mattermost-server/v5/shared/mlog"
@@ -27,8 +26,6 @@ type mockLogger struct {
func (ml *mockLogger) Log(level mlog.LogLevel, s string, flds ...mlog.Field) {}
func TestOnReceiveChannelInvite(t *testing.T) {
c := request.EmptyContext()
t.Run("when msg payload is empty, it does nothing", func(t *testing.T) {
mockServer := &MockServerIface{}
mockLogger := &mockLogger{}
@@ -46,7 +43,7 @@ func TestOnReceiveChannelInvite(t *testing.T) {
remoteCluster := &model.RemoteCluster{}
msg := model.RemoteClusterMsg{}
err := scs.onReceiveChannelInvite(c, msg, remoteCluster, nil)
err := scs.onReceiveChannelInvite(msg, remoteCluster, nil)
require.NoError(t, err)
mockStore.AssertNotCalled(t, "Channel")
})
@@ -107,7 +104,7 @@ func TestOnReceiveChannelInvite(t *testing.T) {
mockApp.On("PatchChannelModerationsForChannel", channel, readonlyChannelModerations).Return(nil, nil)
defer mockApp.AssertExpectations(t)
err = scs.onReceiveChannelInvite(c, msg, remoteCluster, nil)
err = scs.onReceiveChannelInvite(msg, remoteCluster, nil)
require.NoError(t, err)
})
@@ -148,7 +145,7 @@ func TestOnReceiveChannelInvite(t *testing.T) {
mockApp.On("PatchChannelModerationsForChannel", channel, mock.Anything).Return(nil, appErr)
defer mockApp.AssertExpectations(t)
err = scs.onReceiveChannelInvite(c, msg, remoteCluster, nil)
err = scs.onReceiveChannelInvite(msg, remoteCluster, nil)
require.Error(t, err)
assert.Equal(t, fmt.Sprintf("cannot make channel readonly `%s`: foo: bar, boom", invitation.ChannelId), err.Error())
})
@@ -194,7 +191,7 @@ func TestOnReceiveChannelInvite(t *testing.T) {
mockApp.On("GetOrCreateDirectChannel", mock.AnythingOfType("*request.Context"), invitation.DirectParticipantIDs[0], invitation.DirectParticipantIDs[1], mock.AnythingOfType("model.ChannelOption")).Return(channel, nil)
defer mockApp.AssertExpectations(t)
err = scs.onReceiveChannelInvite(c, msg, remoteCluster, nil)
err = scs.onReceiveChannelInvite(msg, remoteCluster, nil)
require.NoError(t, err)
})
}

View File

@@ -17,7 +17,7 @@ import (
"github.com/mattermost/mattermost-server/v5/shared/mlog"
)
func (scs *Service) onReceiveSyncMessage(c *request.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, response *remotecluster.Response) error {
func (scs *Service) onReceiveSyncMessage(msg model.RemoteClusterMsg, rc *model.RemoteCluster, response *remotecluster.Response) error {
if msg.Topic != TopicSync {
return fmt.Errorf("wrong topic, expected `%s`, got `%s`", TopicSync, msg.Topic)
}
@@ -44,10 +44,10 @@ func (scs *Service) onReceiveSyncMessage(c *request.Context, msg model.RemoteClu
mlog.Int("sync_msg_count", len(syncMessages)),
)
return scs.processSyncMessages(c, syncMessages, rc, response)
return scs.processSyncMessages(syncMessages, rc, response)
}
func (scs *Service) processSyncMessages(c *request.Context, syncMessages []syncMsg, rc *model.RemoteCluster, response *remotecluster.Response) error {
func (scs *Service) processSyncMessages(syncMessages []syncMsg, rc *model.RemoteCluster, response *remotecluster.Response) error {
var channel *model.Channel
var team *model.Team
@@ -74,7 +74,7 @@ func (scs *Service) processSyncMessages(c *request.Context, syncMessages []syncM
// add/update users before posts
for _, user := range sm.Users {
if userSaved, err := scs.upsertSyncUser(c, user, channel, rc); err != nil {
if userSaved, err := scs.upsertSyncUser(user, channel, rc); err != nil {
scs.server.GetLogger().Log(mlog.LvlSharedChannelServiceError, "Error upserting sync user",
mlog.String("post_id", sm.PostId),
mlog.String("channel_id", sm.ChannelId),
@@ -121,7 +121,7 @@ func (scs *Service) processSyncMessages(c *request.Context, syncMessages []syncM
}
// add/update post
rpost, err := scs.upsertSyncPost(c, sm.Post, channel, rc)
rpost, err := scs.upsertSyncPost(sm.Post, channel, rc)
if err != nil {
postErrors = append(postErrors, sm.Post.Id)
scs.server.GetLogger().Log(mlog.LvlSharedChannelServiceError, "Error upserting sync post",
@@ -136,7 +136,7 @@ func (scs *Service) processSyncMessages(c *request.Context, syncMessages []syncM
// add/remove reactions
for _, reaction := range sm.Reactions {
if _, err := scs.upsertSyncReaction(c, reaction, rc); err != nil {
if _, err := scs.upsertSyncReaction(reaction, rc); err != nil {
scs.server.GetLogger().Log(mlog.LvlSharedChannelServiceError, "Error upserting sync reaction",
mlog.String("user_id", reaction.UserId),
mlog.String("post_id", reaction.PostId),
@@ -170,7 +170,7 @@ func (scs *Service) processSyncMessages(c *request.Context, syncMessages []syncM
return nil
}
func (scs *Service) upsertSyncUser(c *request.Context, user *model.User, channel *model.Channel, rc *model.RemoteCluster) (*model.User, error) {
func (scs *Service) upsertSyncUser(user *model.User, channel *model.Channel, rc *model.RemoteCluster) (*model.User, error) {
var err error
if user.RemoteId == nil || *user.RemoteId == "" {
user.RemoteId = model.NewString(rc.RemoteId)
@@ -213,7 +213,7 @@ func (scs *Service) upsertSyncUser(c *request.Context, user *model.User, channel
// Instead of undoing what succeeded on any failure we simply do all steps each
// time. AddUserToChannel & AddUserToTeamByTeamId do not error if user was already
// added and exit quickly.
if err := scs.app.AddUserToTeamByTeamId(c, channel.TeamId, userSaved); err != nil {
if err := scs.app.AddUserToTeamByTeamId(request.EmptyContext(), channel.TeamId, userSaved); err != nil {
return nil, fmt.Errorf("error adding sync user to Team: %w", err)
}
@@ -330,7 +330,7 @@ func (scs *Service) updateSyncUser(patch *model.UserPatch, user *model.User, cha
return nil, fmt.Errorf("error updating sync user %s: %w", user.Id, err)
}
func (scs *Service) upsertSyncPost(c *request.Context, post *model.Post, channel *model.Channel, rc *model.RemoteCluster) (*model.Post, error) {
func (scs *Service) upsertSyncPost(post *model.Post, channel *model.Channel, rc *model.RemoteCluster) (*model.Post, error) {
var appErr *model.AppError
post.RemoteId = model.NewString(rc.RemoteId)
@@ -344,7 +344,7 @@ func (scs *Service) upsertSyncPost(c *request.Context, post *model.Post, channel
if rpost == nil {
// post doesn't exist; create new one
rpost, appErr = scs.app.CreatePost(c, post, channel, true, true)
rpost, appErr = scs.app.CreatePost(request.EmptyContext(), post, channel, true, true)
scs.server.GetLogger().Log(mlog.LvlSharedChannelServiceDebug, "Created sync post",
mlog.String("post_id", post.Id),
mlog.String("channel_id", post.ChannelId),
@@ -358,7 +358,7 @@ func (scs *Service) upsertSyncPost(c *request.Context, post *model.Post, channel
)
} else if post.EditAt > rpost.EditAt || post.Message != rpost.Message {
// update post
rpost, appErr = scs.app.UpdatePost(c, post, false)
rpost, appErr = scs.app.UpdatePost(request.EmptyContext(), post, false)
scs.server.GetLogger().Log(mlog.LvlSharedChannelServiceDebug, "Updated sync post",
mlog.String("post_id", post.Id),
mlog.String("channel_id", post.ChannelId),
@@ -378,16 +378,16 @@ func (scs *Service) upsertSyncPost(c *request.Context, post *model.Post, channel
return rpost, rerr
}
func (scs *Service) upsertSyncReaction(c *request.Context, reaction *model.Reaction, rc *model.RemoteCluster) (*model.Reaction, error) {
func (scs *Service) upsertSyncReaction(reaction *model.Reaction, rc *model.RemoteCluster) (*model.Reaction, error) {
savedReaction := reaction
var appErr *model.AppError
reaction.RemoteId = model.NewString(rc.RemoteId)
if reaction.DeleteAt == 0 {
savedReaction, appErr = scs.app.SaveReactionForPost(c, reaction)
savedReaction, appErr = scs.app.SaveReactionForPost(request.EmptyContext(), reaction)
} else {
appErr = scs.app.DeleteReactionForPost(c, reaction)
appErr = scs.app.DeleteReactionForPost(request.EmptyContext(), reaction)
}
var err error

View File

@@ -10,7 +10,6 @@ import (
"sync"
"time"
"github.com/mattermost/mattermost-server/v5/app/request"
"github.com/mattermost/mattermost-server/v5/model"
"github.com/mattermost/mattermost-server/v5/services/remotecluster"
"github.com/mattermost/mattermost-server/v5/shared/i18n"
@@ -141,7 +140,6 @@ func stopTimer(timer *time.Timer) {
// doSync checks the task queue for any tasks to be processed and processes all that are ready.
// If any delayed tasks remain in queue then the duration until the next scheduled task is returned.
func (scs *Service) doSync() time.Duration {
c := request.EmptyContext() // TODO: check this
var task syncTask
var ok bool
var shortestWait time.Duration
@@ -151,7 +149,7 @@ func (scs *Service) doSync() time.Duration {
if !ok {
break
}
if err := scs.processTask(c, task); err != nil {
if err := scs.processTask(task); err != nil {
// put task back into map so it will update again
if task.incRetry() {
scs.addTask(task)
@@ -202,7 +200,7 @@ func (scs *Service) removeOldestTask() (syncTask, bool, time.Duration) {
}
// processTask updates one or more remote clusters with any new channel content.
func (scs *Service) processTask(c *request.Context, task syncTask) error {
func (scs *Service) processTask(task syncTask) error {
var err error
var remotes []*model.RemoteCluster