diff --git a/Makefile b/Makefile index 60bc5d99b8..dc079853ab 100644 --- a/Makefile +++ b/Makefile @@ -374,7 +374,7 @@ test-server-race: test-server-pre endif test-server: test-server-pre - ./scripts/test.sh "$(GO)" "$(GOFLAGS)" "$(ALL_PACKAGES)" "$(TESTS)" "$(TESTFLAGS)" "$(GOBIN)" "20m" "count" + ./scripts/test.sh "$(GO)" "$(GOFLAGS)" "$(ALL_PACKAGES)" "$(TESTS)" "$(TESTFLAGS)" "$(GOBIN)" "45m" "count" ifneq ($(IS_CI),true) ifneq ($(MM_NO_DOCKER),true) ifneq ($(TEMP_DOCKER_SERVICES),) diff --git a/api4/remote_cluster.go b/api4/remote_cluster.go index 6add7d8431..0b667e60b4 100644 --- a/api4/remote_cluster.go +++ b/api4/remote_cluster.go @@ -5,6 +5,8 @@ package api4 import ( "encoding/json" + "io" + "io/ioutil" "net/http" "time" @@ -18,6 +20,7 @@ func (api *API) InitRemoteCluster() { api.BaseRoutes.RemoteCluster.Handle("/msg", api.RemoteClusterTokenRequired(remoteClusterAcceptMessage)).Methods("POST") api.BaseRoutes.RemoteCluster.Handle("/confirm_invite", api.RemoteClusterTokenRequired(remoteClusterConfirmInvite)).Methods("POST") api.BaseRoutes.RemoteCluster.Handle("/upload/{upload_id:[A-Za-z0-9]+}", api.RemoteClusterTokenRequired(uploadRemoteData)).Methods("POST") + api.BaseRoutes.RemoteCluster.Handle("/{user_id:[A-Za-z0-9]+}/image", api.RemoteClusterTokenRequired(remoteSetProfileImage)).Methods("POST") } func remoteClusterPing(c *Context, w http.ResponseWriter, r *http.Request) { @@ -212,3 +215,63 @@ func uploadRemoteData(c *Context, w http.ResponseWriter, r *http.Request) { w.Write([]byte(info.ToJson())) } + +func remoteSetProfileImage(c *Context, w http.ResponseWriter, r *http.Request) { + defer io.Copy(ioutil.Discard, r.Body) + + c.RequireUserId() + if c.Err != nil { + return + } + + if *c.App.Config().FileSettings.DriverName == "" { + c.Err = model.NewAppError("remoteUploadProfileImage", "api.user.upload_profile_user.storage.app_error", nil, "", http.StatusNotImplemented) + return + } + + if r.ContentLength > *c.App.Config().FileSettings.MaxFileSize { + c.Err = model.NewAppError("remoteUploadProfileImage", "api.user.upload_profile_user.too_large.app_error", nil, "", http.StatusRequestEntityTooLarge) + return + } + + if err := r.ParseMultipartForm(*c.App.Config().FileSettings.MaxFileSize); err != nil { + c.Err = model.NewAppError("remoteUploadProfileImage", "api.user.upload_profile_user.parse.app_error", nil, err.Error(), http.StatusInternalServerError) + return + } + + m := r.MultipartForm + imageArray, ok := m.File["image"] + if !ok { + c.Err = model.NewAppError("remoteUploadProfileImage", "api.user.upload_profile_user.no_file.app_error", nil, "", http.StatusBadRequest) + return + } + + if len(imageArray) == 0 { + c.Err = model.NewAppError("remoteUploadProfileImage", "api.user.upload_profile_user.array.app_error", nil, "", http.StatusBadRequest) + return + } + + auditRec := c.MakeAuditRecord("remoteUploadProfileImage", audit.Fail) + defer c.LogAuditRec(auditRec) + if imageArray[0] != nil { + auditRec.AddMeta("filename", imageArray[0].Filename) + } + + user, err := c.App.GetUser(c.Params.UserId) + if err != nil || !user.IsRemote() { + c.SetInvalidUrlParam("user_id") + return + } + auditRec.AddMeta("user", user) + + imageData := imageArray[0] + if err := c.App.SetProfileImage(c.Params.UserId, imageData); err != nil { + c.Err = err + return + } + + auditRec.Success() + c.LogAudit("") + + ReturnStatusOK(w) +} diff --git a/model/user.go b/model/user.go index 73d506a24f..2ae62bae8a 100644 --- a/model/user.go +++ b/model/user.go @@ -752,6 +752,14 @@ func (u *User) IsRemote() bool { return u.RemoteId != nil && *u.RemoteId != "" } +// GetRemoteID returns the remote id for this user or "" if not a remote user. +func (u *User) GetRemoteID() string { + if u.RemoteId != nil { + return *u.RemoteId + } + return "" +} + // GetProp fetches a prop value by name. func (u *User) GetProp(name string) (string, bool) { val, ok := u.Props[name] diff --git a/services/remotecluster/mocks_test.go b/services/remotecluster/mocks_test.go index 8f02fc1ab6..5f722ae355 100644 --- a/services/remotecluster/mocks_test.go +++ b/services/remotecluster/mocks_test.go @@ -4,6 +4,7 @@ package remotecluster import ( + "context" "fmt" "strings" "sync" @@ -22,6 +23,7 @@ import ( type mockServer struct { remotes []*model.RemoteCluster logger *mockLogger + user *model.User } func newMockServer(t *testing.T, remotes []*model.RemoteCluster) *mockServer { @@ -31,6 +33,10 @@ func newMockServer(t *testing.T, remotes []*model.RemoteCluster) *mockServer { } } +func (ms *mockServer) SetUser(user *model.User) { + ms.user = user +} + func (ms *mockServer) Config() *model.Config { return nil } func (ms *mockServer) GetMetrics() einterfaces.MetricsInterface { return nil } func (ms *mockServer) IsLeader() bool { return true } @@ -40,16 +46,21 @@ func (ms *mockServer) GetLogger() mlog.LoggerIFace { return ms.logger } func (ms *mockServer) GetStore() store.Store { - anyFilter := mock.MatchedBy(func(filter model.RemoteClusterQueryFilter) bool { + anyQueryFilter := mock.MatchedBy(func(filter model.RemoteClusterQueryFilter) bool { return true }) + anyUserId := mock.AnythingOfType("string") remoteClusterStoreMock := &mocks.RemoteClusterStore{} remoteClusterStoreMock.On("GetByTopic", "share").Return(ms.remotes, nil) - remoteClusterStoreMock.On("GetAll", anyFilter).Return(ms.remotes, nil) + remoteClusterStoreMock.On("GetAll", anyQueryFilter).Return(ms.remotes, nil) + + userStoreMock := &mocks.UserStore{} + userStoreMock.On("Get", context.Background(), anyUserId).Return(ms.user, nil) storeMock := &mocks.Store{} storeMock.On("RemoteCluster").Return(remoteClusterStoreMock) + storeMock.On("User").Return(userStoreMock) return storeMock } func (ms *mockServer) Shutdown() { ms.logger.Shutdown() } diff --git a/services/remotecluster/send.go b/services/remotecluster/send.go index df68f97b00..2e2abae00e 100644 --- a/services/remotecluster/send.go +++ b/services/remotecluster/send.go @@ -48,6 +48,8 @@ func (rcs *Service) sendLoop(idx int, done chan struct{}) { rcs.sendMsg(task) case sendFileTask: rcs.sendFile(task) + case sendProfileImageTask: + rcs.sendProfileImage(task) } case <-done: return diff --git a/services/remotecluster/sendfile.go b/services/remotecluster/sendfile.go index 7d13e2ed7b..443835f87d 100644 --- a/services/remotecluster/sendfile.go +++ b/services/remotecluster/sendfile.go @@ -35,9 +35,9 @@ type ReaderProvider interface { // SendFile asynchronously sends a file to a remote cluster. // // `ctx` determines behaviour when the outbound queue is full. A timeout or deadline context will return a -// BufferFullError if the file cannot be enqueued before the timeout. A background context will block indefinitely. +// BufferFullError if the task cannot be enqueued before the timeout. A background context will block indefinitely. // -// Nil or error return indicates success or failure of file enqueue only. +// Nil or error return indicates success or failure of task enqueue only. // // An optional callback can be provided that receives the response from the remote cluster. The `err` provided to the // callback is regarding file delivery only. The `resp` contains the decoded bytes returned from the remote. @@ -55,17 +55,6 @@ func (rcs *Service) SendFile(ctx context.Context, us *model.UploadSession, fi *m // sendFile is called when a sendFileTask is popped from the send channel. func (rcs *Service) sendFile(task sendFileTask) { - // Ensure a panic from the callback does not exit the goroutine. - defer func() { - if r := recover(); r != nil { - rcs.server.GetLogger().Log(mlog.LvlRemoteClusterServiceError, "Remote Cluster sendFile panic", - mlog.String("remote", task.rc.DisplayName), - mlog.String("uploadId", task.us.Id), - mlog.Any("panic", r), - ) - } - }() - fi, err := rcs.sendFileToRemote(SendTimeout, task) var response Response diff --git a/services/remotecluster/sendmsg.go b/services/remotecluster/sendmsg.go index ea7dda9137..81108d2d5b 100644 --- a/services/remotecluster/sendmsg.go +++ b/services/remotecluster/sendmsg.go @@ -82,13 +82,6 @@ func (rcs *Service) sendMsg(task sendMsgTask) { // Ensure a panic from the callback does not exit the pool goroutine. defer func() { - if r := recover(); r != nil { - rcs.server.GetLogger().Log(mlog.LvlRemoteClusterServiceError, "Remote Cluster sendMsg panic", - mlog.String("remote", task.rc.DisplayName), - mlog.String("msgId", task.msg.Id), mlog.Any("panic", r), - ) - } - if errResp != nil { response.Err = errResp.Error() } diff --git a/services/remotecluster/sendprofileImage.go b/services/remotecluster/sendprofileImage.go new file mode 100644 index 0000000000..d4558f47ed --- /dev/null +++ b/services/remotecluster/sendprofileImage.go @@ -0,0 +1,146 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package remotecluster + +import ( + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "mime/multipart" + "net/http" + "net/url" + "path" + "time" + + "github.com/mattermost/mattermost-server/v5/model" + "github.com/mattermost/mattermost-server/v5/shared/mlog" +) + +type SendProfileImageResultFunc func(userId string, rc *model.RemoteCluster, resp *Response, err error) + +type sendProfileImageTask struct { + rc *model.RemoteCluster + userID string + provider ProfileImageProvider + f SendProfileImageResultFunc +} + +type ProfileImageProvider interface { + GetProfileImage(user *model.User) ([]byte, bool, *model.AppError) +} + +// SendProfileImage asynchronously sends a user's profile image to a remote cluster. +// +// `ctx` determines behaviour when the outbound queue is full. A timeout or deadline context will return a +// BufferFullError if the task cannot be enqueued before the timeout. A background context will block indefinitely. +// +// Nil or error return indicates success or failure of task enqueue only. +// +// An optional callback can be provided that receives the response from the remote cluster. The `err` provided to the +// callback is regarding image delivery only. The `resp` contains the decoded bytes returned from the remote. +// If a callback is provided it should return quickly. +func (rcs *Service) SendProfileImage(ctx context.Context, userID string, rc *model.RemoteCluster, provider ProfileImageProvider, f SendProfileImageResultFunc) error { + task := sendProfileImageTask{ + rc: rc, + userID: userID, + provider: provider, + f: f, + } + return rcs.enqueueTask(ctx, rc.RemoteId, task) +} + +// sendProfileImage is called when a sendProfileImageTask is popped from the send channel. +func (rcs *Service) sendProfileImage(task sendProfileImageTask) { + err := rcs.sendProfileImageToRemote(SendTimeout, task) + var response Response + + if err != nil { + rcs.server.GetLogger().Log(mlog.LvlRemoteClusterServiceError, "Remote Cluster send profile image failed", + mlog.String("remote", task.rc.DisplayName), + mlog.String("UserId", task.userID), + mlog.Err(err), + ) + response.Status = ResponseStatusFail + response.Err = err.Error() + } else { + rcs.server.GetLogger().Log(mlog.LvlRemoteClusterServiceDebug, "Remote Cluster profile image sent successfully", + mlog.String("remote", task.rc.DisplayName), + mlog.String("UserId", task.userID), + ) + response.Status = ResponseStatusOK + } + + // If callback provided then call it with the results. + if task.f != nil { + task.f(task.userID, task.rc, &response, err) + } +} + +func (rcs *Service) sendProfileImageToRemote(timeout time.Duration, task sendProfileImageTask) error { + rcs.server.GetLogger().Log(mlog.LvlRemoteClusterServiceDebug, "sending profile image to remote...", + mlog.String("remote", task.rc.DisplayName), + mlog.String("UserId", task.userID), + ) + + user, err := rcs.server.GetStore().User().Get(context.Background(), task.userID) + if err != nil { + return fmt.Errorf("error fetching user while sending profile image to remote %s: %w", task.rc.RemoteId, err) + } + + img, _, appErr := task.provider.GetProfileImage(user) // get Reader for the file + if appErr != nil { + return fmt.Errorf("error fetching profile image for user (%s) while sending to remote %s: %w", task.userID, task.rc.RemoteId, appErr) + } + + u, err := url.Parse(task.rc.SiteURL) + if err != nil { + return fmt.Errorf("invalid siteURL while sending file to remote %s: %w", task.rc.RemoteId, err) + } + u.Path = path.Join(u.Path, model.API_URL_SUFFIX, "remotecluster", task.userID, "image") + + body := &bytes.Buffer{} + writer := multipart.NewWriter(body) + + part, err := writer.CreateFormFile("image", "profile.png") + if err != nil { + return err + } + + if _, err = io.Copy(part, bytes.NewBuffer(img)); err != nil { + return err + } + + if err = writer.Close(); err != nil { + return err + } + + req, err := http.NewRequest("POST", u.String(), body) + if err != nil { + return err + } + req.Header.Set("Content-Type", writer.FormDataContentType()) + req.Header.Set(model.HEADER_REMOTECLUSTER_ID, task.rc.RemoteId) + req.Header.Set(model.HEADER_REMOTECLUSTER_TOKEN, task.rc.RemoteToken) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + resp, err := rcs.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() + + _, err = ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected response: %d - %s", resp.StatusCode, resp.Status) + } + return nil +} diff --git a/services/remotecluster/sendprofileImage_test.go b/services/remotecluster/sendprofileImage_test.go new file mode 100644 index 0000000000..d5680062cd --- /dev/null +++ b/services/remotecluster/sendprofileImage_test.go @@ -0,0 +1,187 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package remotecluster + +import ( + "bytes" + "image" + "image/color" + "image/png" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/mattermost/mattermost-server/v5/model" +) + +const ( + imageWidth = 128 + imageHeight = 128 +) + +func TestService_sendProfileImageToRemote(t *testing.T) { + hadPing := disablePing + disablePing = true + defer func() { disablePing = hadPing }() + + shouldError := &flag{} + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer io.Copy(ioutil.Discard, r.Body) + + if shouldError.get() { + w.WriteHeader(http.StatusInternalServerError) + resp := make(map[string]string) + resp[model.STATUS] = model.STATUS_FAIL + w.Write([]byte(model.MapToJson(resp))) + return + } + + status := model.STATUS_OK + defer func(s *string) { + if *s != model.STATUS_OK { + w.WriteHeader(http.StatusInternalServerError) + } + resp := make(map[string]string) + resp[model.STATUS] = *s + w.Write([]byte(model.MapToJson(resp))) + }(&status) + + if err := r.ParseMultipartForm(1024 * 1024); err != nil { + status = model.STATUS_FAIL + assert.Fail(t, "connect parse multipart form", err) + return + } + m := r.MultipartForm + if m == nil { + status = model.STATUS_FAIL + assert.Fail(t, "multipart form missing") + return + } + + imageArray, ok := m.File["image"] + if !ok || len(imageArray) != 1 { + status = model.STATUS_FAIL + assert.Fail(t, "image missing") + return + } + + imageData := imageArray[0] + file, err := imageData.Open() + if err != nil { + status = model.STATUS_FAIL + assert.Fail(t, "cannot open multipart form file") + return + } + defer file.Close() + + img, err := png.Decode(file) + if err != nil || imageWidth != img.Bounds().Max.X || imageHeight != img.Bounds().Max.Y { + status = model.STATUS_FAIL + assert.Fail(t, "cannot decode png", err) + return + } + })) + defer ts.Close() + + rc := makeRemoteCluster("remote_test_profile_image", ts.URL, TestTopics) + + user := &model.User{ + Id: model.NewId(), + RemoteId: model.NewString(rc.RemoteId), + } + + provider := testImageProvider{} + + mockServer := newMockServer(t, makeRemoteClusters(NumRemotes, ts.URL)) + mockServer.SetUser(user) + service, err := NewRemoteClusterService(mockServer) + require.NoError(t, err) + + err = service.Start() + require.NoError(t, err) + defer service.Shutdown() + + t.Run("Server response 200", func(t *testing.T) { + shouldError.set(false) + + resultFunc := func(userId string, rc *model.RemoteCluster, resp *Response, err error) { + assert.Equal(t, user.Id, userId, "user ids should match") + assert.NoError(t, err) + assert.True(t, resp.IsSuccess()) + } + + task := sendProfileImageTask{ + rc: rc, + userID: user.Id, + provider: provider, + f: resultFunc, + } + + err := service.sendProfileImageToRemote(time.Second*15, task) + assert.NoError(t, err, "request should not error") + }) + + t.Run("Server response 500", func(t *testing.T) { + shouldError.set(true) + + resultFunc := func(userId string, rc *model.RemoteCluster, resp *Response, err error) { + assert.Equal(t, user.Id, userId, "user ids should match") + assert.False(t, resp.IsSuccess()) + } + + task := sendProfileImageTask{ + rc: rc, + userID: user.Id, + provider: provider, + f: resultFunc, + } + + err := service.sendProfileImageToRemote(time.Second*15, task) + assert.Error(t, err, "request should error") + }) +} + +type testImageProvider struct { +} + +func (tip testImageProvider) GetProfileImage(user *model.User) ([]byte, bool, *model.AppError) { + img := image.NewRGBA(image.Rectangle{image.Point{0, 0}, image.Point{imageWidth, imageHeight}}) + red := color.RGBA{255, 50, 50, 0xff} + + for x := 0; x < imageWidth; x++ { + for y := 0; y < imageHeight; y++ { + img.Set(x, y, red) + } + } + + buf := &bytes.Buffer{} + png.Encode(buf, img) + + return buf.Bytes(), true, nil +} + +type flag struct { + mux sync.RWMutex + b bool +} + +func (f *flag) get() bool { + f.mux.RLock() + defer f.mux.RUnlock() + return f.b +} + +func (f *flag) set(b bool) { + f.mux.Lock() + defer f.mux.Unlock() + f.b = b +} diff --git a/services/remotecluster/service.go b/services/remotecluster/service.go index 981886bd69..d2a6e15d7a 100644 --- a/services/remotecluster/service.go +++ b/services/remotecluster/service.go @@ -63,6 +63,7 @@ type RemoteClusterServiceIFace interface { RemoveConnectionStateListener(listenerId string) SendMsg(ctx context.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, f SendMsgResultFunc) error SendFile(ctx context.Context, us *model.UploadSession, fi *model.FileInfo, rc *model.RemoteCluster, rp ReaderProvider, f SendFileResultFunc) error + SendProfileImage(ctx context.Context, userID string, rc *model.RemoteCluster, provider ProfileImageProvider, f SendProfileImageResultFunc) error AcceptInvitation(invite *model.RemoteClusterInvite, name string, displayName string, creatorId string, teamId string, siteURL string) (*model.RemoteCluster, error) ReceiveIncomingMsg(rc *model.RemoteCluster, msg model.RemoteClusterMsg) Response } diff --git a/services/sharedchannel/mock_AppIface_test.go b/services/sharedchannel/mock_AppIface_test.go index 565a9c6833..1420f6c9a6 100644 --- a/services/sharedchannel/mock_AppIface_test.go +++ b/services/sharedchannel/mock_AppIface_test.go @@ -230,6 +230,38 @@ func (_m *MockAppIface) GetOrCreateDirectChannel(userId string, otherUserId stri return r0, r1 } +// GetProfileImage provides a mock function with given fields: user +func (_m *MockAppIface) GetProfileImage(user *model.User) ([]byte, bool, *model.AppError) { + ret := _m.Called(user) + + var r0 []byte + if rf, ok := ret.Get(0).(func(*model.User) []byte); ok { + r0 = rf(user) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + var r1 bool + if rf, ok := ret.Get(1).(func(*model.User) bool); ok { + r1 = rf(user) + } else { + r1 = ret.Get(1).(bool) + } + + var r2 *model.AppError + if rf, ok := ret.Get(2).(func(*model.User) *model.AppError); ok { + r2 = rf(user) + } else { + if ret.Get(2) != nil { + r2 = ret.Get(2).(*model.AppError) + } + } + + return r0, r1, r2 +} + // InvalidateCacheForUser provides a mock function with given fields: userID func (_m *MockAppIface) InvalidateCacheForUser(userID string) { _m.Called(userID) diff --git a/services/sharedchannel/msg.go b/services/sharedchannel/msg.go index ce7f8319f6..b7eb92b05f 100644 --- a/services/sharedchannel/msg.go +++ b/services/sharedchannel/msg.go @@ -7,8 +7,10 @@ import ( "context" "encoding/json" "strings" + "time" "github.com/mattermost/mattermost-server/v5/model" + "github.com/mattermost/mattermost-server/v5/services/remotecluster" "github.com/mattermost/mattermost-server/v5/shared/mlog" ) @@ -177,26 +179,35 @@ func (scs *Service) usersForPost(post *model.Post, reactions []*model.Reaction, for _, id := range userIds { user, err := scs.server.GetStore().User().Get(context.Background(), id) - if err == nil { - if sync, err2 := scs.shouldUserSync(user, channelID, rc); err2 != nil { - scs.server.GetLogger().Log(mlog.LvlSharedChannelServiceError, "Could not find user for post", - mlog.String("user_id", id), - mlog.Err(err2), - ) - continue - } else if sync { - users = append(users, sanitizeUserForSync(user)) - } - // if this was a mention then put the real username in place of the username+remotename, but only - // when sending to the remote that the user belongs to. - if user.RemoteId != nil && *user.RemoteId == rc.RemoteId { - fixMention(post, mentionMap, user) - } - } else { + if err != nil { scs.server.GetLogger().Log(mlog.LvlSharedChannelServiceError, "Error checking if user should sync", mlog.String("user_id", id), mlog.Err(err), ) + continue + } + + sync, syncImage, err2 := scs.shouldUserSync(user, channelID, rc) + if err2 != nil { + scs.server.GetLogger().Log(mlog.LvlSharedChannelServiceError, "Could not find user for post", + mlog.String("user_id", id), + mlog.Err(err2), + ) + continue + } + + if sync { + users = append(users, sanitizeUserForSync(user)) + } + + if syncImage { + scs.syncProfileImage(user, channelID, rc) + } + + // if this was a mention then put the real username in place of the username+remotename, but only + // when sending to the remote that the user belongs to. + if user.RemoteId != nil && *user.RemoteId == rc.RemoteId { + fixMention(post, mentionMap, user) } } return users @@ -240,16 +251,16 @@ func sanitizeUserForSync(user *model.User) *model.User { // shouldUserSync determines if a user needs to be synchronized. // User should be synchronized if it has no entry in the SharedChannelUsers table for the specified channel, // or there is an entry but the LastSyncAt is less than user.UpdateAt -func (scs *Service) shouldUserSync(user *model.User, channelID string, rc *model.RemoteCluster) (bool, error) { +func (scs *Service) shouldUserSync(user *model.User, channelID string, rc *model.RemoteCluster) (sync bool, syncImage bool, err error) { // don't sync users with the remote they originated from. if user.RemoteId != nil && *user.RemoteId == rc.RemoteId { - return false, nil + return false, false, nil } scu, err := scs.server.GetStore().SharedChannel().GetUser(user.Id, channelID, rc.RemoteId) if err != nil { if _, ok := err.(errNotFound); !ok { - return false, err + return false, false, err } // user not in the SharedChannelUsers table, so we must add them. @@ -266,8 +277,52 @@ func (scs *Service) shouldUserSync(user *model.User, channelID string, rc *model mlog.Err(err), ) } - } else if scu.LastSyncAt >= user.UpdateAt { - return false, nil + return true, true, nil } - return true, nil + + return user.UpdateAt > scu.LastSyncAt, user.LastPictureUpdate > scu.LastSyncAt, nil +} + +func (scs *Service) syncProfileImage(user *model.User, channelID string, rc *model.RemoteCluster) { + rcs := scs.server.GetRemoteClusterService() + if rcs == nil { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + rcs.SendProfileImage(ctx, user.Id, rc, scs.app, func(userId string, rc *model.RemoteCluster, resp *remotecluster.Response, err error) { + if resp.IsSuccess() { + scs.server.GetLogger().Log(mlog.LvlSharedChannelServiceDebug, "Users profile image synchronized", + mlog.String("remote_id", rc.RemoteId), + mlog.String("user_id", user.Id), + ) + + scu, err := scs.server.GetStore().SharedChannel().GetUser(user.Id, channelID, rc.RemoteId) + if err != nil { + scs.server.GetLogger().Log(mlog.LvlSharedChannelServiceError, "Error fetching shared channel user while updating users LastSyncTime after profile image update", + mlog.String("remote_id", rc.RemoteId), + mlog.String("user_id", user.Id), + mlog.String("channel_id", channelID), + mlog.Err(err), + ) + } + + if err = scs.server.GetStore().SharedChannel().UpdateUserLastSyncAt(scu.Id, model.GetMillis()); err != nil { + scs.server.GetLogger().Log(mlog.LvlSharedChannelServiceError, "Error updating users LastSyncTime after profile image update", + mlog.String("remote_id", rc.RemoteId), + mlog.String("user_id", user.Id), + mlog.Err(err), + ) + } + return + } + + scs.server.GetLogger().Log(mlog.LvlSharedChannelServiceError, "Error synchronizing users profile image", + mlog.String("remote_id", rc.RemoteId), + mlog.String("user_id", user.Id), + mlog.String("Err", resp.Err), + ) + }) } diff --git a/services/sharedchannel/service.go b/services/sharedchannel/service.go index f5314bf519..276581aa31 100644 --- a/services/sharedchannel/service.go +++ b/services/sharedchannel/service.go @@ -57,6 +57,7 @@ type AppIface interface { CreateUploadSession(us *model.UploadSession) (*model.UploadSession, *model.AppError) FileReader(path string) (filestore.ReadCloseSeeker, *model.AppError) MentionsToTeamMembers(message, teamID string) model.UserMentionMap + GetProfileImage(user *model.User) ([]byte, bool, *model.AppError) InvalidateCacheForUser(userID string) NotifySharedChannelUserUpdate(user *model.User) }