diff --git a/server/einterfaces/metrics.go b/server/einterfaces/metrics.go index 99b6a98552..1e2cf9aa49 100644 --- a/server/einterfaces/metrics.go +++ b/server/einterfaces/metrics.go @@ -79,6 +79,14 @@ type MetricsInterface interface { ObserveRemoteClusterClockSkew(remoteID string, skew float64) IncrementRemoteClusterConnStateChangeCounter(remoteID string, online bool) + IncrementSharedChannelsSyncCounter(remoteID string) + ObserveSharedChannelsTaskInQueueDuration(elapsed float64) + ObserveSharedChannelsQueueSize(size int64) + ObserveSharedChannelsSyncCollectionDuration(remoteID string, elapsed float64) + ObserveSharedChannelsSyncSendDuration(remoteID string, elapsed float64) + ObserveSharedChannelsSyncCollectionStepDuration(remoteID string, step string, elapsed float64) + ObserveSharedChannelsSyncSendStepDuration(remoteID string, step string, elapsed float64) + IncrementJobActive(jobType string) DecrementJobActive(jobType string) diff --git a/server/einterfaces/mocks/MetricsInterface.go b/server/einterfaces/mocks/MetricsInterface.go index a512115e40..7c50f2f03e 100644 --- a/server/einterfaces/mocks/MetricsInterface.go +++ b/server/einterfaces/mocks/MetricsInterface.go @@ -204,6 +204,11 @@ func (_m *MetricsInterface) IncrementRemoteClusterMsgSentCounter(remoteID string _m.Called(remoteID) } +// IncrementSharedChannelsSyncCounter provides a mock function with given fields: remoteID +func (_m *MetricsInterface) IncrementSharedChannelsSyncCounter(remoteID string) { + _m.Called(remoteID) +} + // IncrementUserIndexCounter provides a mock function with given fields: func (_m *MetricsInterface) IncrementUserIndexCounter() { _m.Called() @@ -294,6 +299,36 @@ func (_m *MetricsInterface) ObserveRemoteClusterPingDuration(remoteID string, el _m.Called(remoteID, elapsed) } +// ObserveSharedChannelsQueueSize provides a mock function with given fields: size +func (_m *MetricsInterface) ObserveSharedChannelsQueueSize(size int64) { + _m.Called(size) +} + +// ObserveSharedChannelsSyncCollectionDuration provides a mock function with given fields: remoteID, elapsed +func (_m *MetricsInterface) ObserveSharedChannelsSyncCollectionDuration(remoteID string, elapsed float64) { + _m.Called(remoteID, elapsed) +} + +// ObserveSharedChannelsSyncCollectionStepDuration provides a mock function with given fields: remoteID, step, elapsed +func (_m *MetricsInterface) ObserveSharedChannelsSyncCollectionStepDuration(remoteID string, step string, elapsed float64) { + _m.Called(remoteID, step, elapsed) +} + +// ObserveSharedChannelsSyncSendDuration provides a mock function with given fields: remoteID, elapsed +func (_m *MetricsInterface) ObserveSharedChannelsSyncSendDuration(remoteID string, elapsed float64) { + _m.Called(remoteID, elapsed) +} + +// ObserveSharedChannelsSyncSendStepDuration provides a mock function with given fields: remoteID, step, elapsed +func (_m *MetricsInterface) ObserveSharedChannelsSyncSendStepDuration(remoteID string, step string, elapsed float64) { + _m.Called(remoteID, step, elapsed) +} + +// ObserveSharedChannelsTaskInQueueDuration provides a mock function with given fields: elapsed +func (_m *MetricsInterface) ObserveSharedChannelsTaskInQueueDuration(elapsed float64) { + _m.Called(elapsed) +} + // ObserveStoreMethodDuration provides a mock function with given fields: method, success, elapsed func (_m *MetricsInterface) ObserveStoreMethodDuration(method string, success string, elapsed float64) { _m.Called(method, success, elapsed) diff --git a/server/enterprise/metrics/metrics.go b/server/enterprise/metrics/metrics.go index 7440b138e4..b1d7331c8a 100644 --- a/server/enterprise/metrics/metrics.go +++ b/server/enterprise/metrics/metrics.go @@ -38,6 +38,7 @@ const ( MetricsSubsystemSearch = "search" MetricsSubsystemLogging = "logging" MetricsSubsystemRemoteCluster = "remote_cluster" + MetricsSubsystemSharedChannels = "shared_channels" MetricsSubsystemSystem = "system" MetricsSubsystemJobs = "jobs" MetricsCloudInstallationLabel = "installationId" @@ -189,6 +190,14 @@ type MetricsInterfaceImpl struct { RemoteClusterClockSkewHistograms *prometheus.HistogramVec RemoteClusterConnStateChangeCounter *prometheus.CounterVec + SharedChannelsSyncCount *prometheus.CounterVec + SharedChannelsTaskInQueueHistogram prometheus.Histogram + SharedChannelsQueueSize prometheus.Gauge + SharedChannelsSyncCollectionHistogram *prometheus.HistogramVec + SharedChannelsSyncSendHistogram *prometheus.HistogramVec + SharedChannelsSyncCollectionStepHistogram *prometheus.HistogramVec + SharedChannelsSyncSendStepHistogram *prometheus.HistogramVec + ServerStartTime prometheus.Gauge JobsActive *prometheus.GaugeVec @@ -853,7 +862,7 @@ func New(ps *platform.PlatformService, driver, dataSource string) *MetricsInterf ) m.Registry.MustRegister(m.LoggerBlockedCounters.counter) - // Remote Cluster subsystem + // Remote Cluster service m.RemoteClusterMsgSentCounters = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -927,6 +936,90 @@ func New(ps *platform.PlatformService, driver, dataSource string) *MetricsInterf ) m.Registry.MustRegister(m.RemoteClusterConnStateChangeCounter) + // Shared Channel service + + m.SharedChannelsSyncCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: MetricsNamespace, + Subsystem: MetricsSubsystemSharedChannels, + Name: "sync_count", + Help: "Count of sync events processed for each remote", + ConstLabels: additionalLabels, + }, + []string{"remote_id"}, + ) + m.Registry.MustRegister(m.SharedChannelsSyncCount) + + m.SharedChannelsTaskInQueueHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: MetricsNamespace, + Subsystem: MetricsSubsystemSharedChannels, + Name: "task_in_queue_duration_seconds", + Help: "Duration tasks spend in queue (seconds)", + ConstLabels: additionalLabels, + }, + ) + m.Registry.MustRegister(m.SharedChannelsTaskInQueueHistogram) + + m.SharedChannelsQueueSize = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Subsystem: MetricsSubsystemSharedChannels, + Name: "task_queue_size", + Help: "Current number of tasks in queue", + ConstLabels: additionalLabels, + }, + ) + m.Registry.MustRegister(m.SharedChannelsQueueSize) + + m.SharedChannelsSyncCollectionHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: MetricsNamespace, + Subsystem: MetricsSubsystemSharedChannels, + Name: "sync_collection_duration_seconds", + Help: "Duration tasks spend collecting sync data (seconds)", + ConstLabels: additionalLabels, + }, + []string{"remote_id"}, + ) + m.Registry.MustRegister(m.SharedChannelsSyncCollectionHistogram) + + m.SharedChannelsSyncSendHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: MetricsNamespace, + Subsystem: MetricsSubsystemSharedChannels, + Name: "sync_send_duration_seconds", + Help: "Duration tasks spend sending sync data (seconds)", + ConstLabels: additionalLabels, + }, + []string{"remote_id"}, + ) + m.Registry.MustRegister(m.SharedChannelsSyncSendHistogram) + + m.SharedChannelsSyncCollectionStepHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: MetricsNamespace, + Subsystem: MetricsSubsystemSharedChannels, + Name: "sync_collection_step_duration_seconds", + Help: "Duration tasks spend in each step collecting data (seconds)", + ConstLabels: additionalLabels, + }, + []string{"remote_id", "step"}, + ) + m.Registry.MustRegister(m.SharedChannelsSyncCollectionStepHistogram) + + m.SharedChannelsSyncSendStepHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: MetricsNamespace, + Subsystem: MetricsSubsystemSharedChannels, + Name: "sync_send_step_duration_seconds", + Help: "Duration tasks spend in each step sending data (seconds)", + ConstLabels: additionalLabels, + }, + []string{"remote_id", "step"}, + ) + m.Registry.MustRegister(m.SharedChannelsSyncSendStepHistogram) + m.ServerStartTime = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: MetricsNamespace, Subsystem: MetricsSubsystemSystem, @@ -1324,6 +1417,46 @@ func (mi *MetricsInterfaceImpl) IncrementRemoteClusterConnStateChangeCounter(rem }).Inc() } +func (mi *MetricsInterfaceImpl) IncrementSharedChannelsSyncCounter(remoteID string) { + mi.SharedChannelsSyncCount.With(prometheus.Labels{ + "remote_id": remoteID, + }).Inc() +} + +func (mi *MetricsInterfaceImpl) ObserveSharedChannelsTaskInQueueDuration(elapsed float64) { + mi.SharedChannelsTaskInQueueHistogram.Observe(elapsed) +} + +func (mi *MetricsInterfaceImpl) ObserveSharedChannelsQueueSize(size int64) { + mi.SharedChannelsQueueSize.Set(float64(size)) +} + +func (mi *MetricsInterfaceImpl) ObserveSharedChannelsSyncCollectionDuration(remoteID string, elapsed float64) { + mi.SharedChannelsSyncCollectionHistogram.With(prometheus.Labels{ + "remote_id": remoteID, + }).Observe(elapsed) +} + +func (mi *MetricsInterfaceImpl) ObserveSharedChannelsSyncSendDuration(remoteID string, elapsed float64) { + mi.SharedChannelsSyncSendHistogram.With(prometheus.Labels{ + "remote_id": remoteID, + }).Observe(elapsed) +} + +func (mi *MetricsInterfaceImpl) ObserveSharedChannelsSyncCollectionStepDuration(remoteID string, step string, elapsed float64) { + mi.SharedChannelsSyncCollectionStepHistogram.With(prometheus.Labels{ + "remote_id": remoteID, + "step": step, + }).Observe(elapsed) +} + +func (mi *MetricsInterfaceImpl) ObserveSharedChannelsSyncSendStepDuration(remoteID string, step string, elapsed float64) { + mi.SharedChannelsSyncSendStepHistogram.With(prometheus.Labels{ + "remote_id": remoteID, + "step": step, + }).Observe(elapsed) +} + // SetReplicaLagAbsolute sets the absolute replica lag for a given node. func (mi *MetricsInterfaceImpl) SetReplicaLagAbsolute(node string, value float64) { mi.DbReplicaLagGaugeAbs.With(prometheus.Labels{"node": node}).Set(value) diff --git a/server/platform/services/remotecluster/sendfile.go b/server/platform/services/remotecluster/sendfile.go index ec89f92a2a..5fdfae7a6f 100644 --- a/server/platform/services/remotecluster/sendfile.go +++ b/server/platform/services/remotecluster/sendfile.go @@ -82,6 +82,13 @@ func (rcs *Service) sendFile(task sendFileTask) { } func (rcs *Service) sendFileToRemote(timeout time.Duration, task sendFileTask) (*model.FileInfo, error) { + start := time.Now() + defer func() { + if metrics := rcs.server.GetMetrics(); metrics != nil { + metrics.ObserveSharedChannelsSyncSendStepDuration(task.rc.RemoteId, "Attachments", time.Since(start).Seconds()) + } + }() + rcs.server.Log().Log(mlog.LvlRemoteClusterServiceDebug, "sending file to remote...", mlog.String("remote", task.rc.DisplayName), mlog.String("uploadId", task.us.Id), diff --git a/server/platform/services/remotecluster/sendprofileImage.go b/server/platform/services/remotecluster/sendprofileImage.go index 3b9b98ebd3..fd495f7a41 100644 --- a/server/platform/services/remotecluster/sendprofileImage.go +++ b/server/platform/services/remotecluster/sendprofileImage.go @@ -79,6 +79,13 @@ func (rcs *Service) sendProfileImage(task sendProfileImageTask) { } func (rcs *Service) sendProfileImageToRemote(timeout time.Duration, task sendProfileImageTask) error { + start := time.Now() + defer func() { + if metrics := rcs.server.GetMetrics(); metrics != nil { + metrics.ObserveSharedChannelsSyncSendStepDuration(task.rc.RemoteId, "ProfileImages", time.Since(start).Seconds()) + } + }() + rcs.server.Log().Log(mlog.LvlRemoteClusterServiceDebug, "sending profile image to remote...", mlog.String("remote", task.rc.DisplayName), mlog.String("UserId", task.userID), diff --git a/server/platform/services/sharedchannel/mock_ServerIface_test.go b/server/platform/services/sharedchannel/mock_ServerIface_test.go index b017fb8a27..5de1ede5a9 100644 --- a/server/platform/services/sharedchannel/mock_ServerIface_test.go +++ b/server/platform/services/sharedchannel/mock_ServerIface_test.go @@ -6,6 +6,8 @@ package sharedchannel import ( mlog "github.com/mattermost/mattermost/server/public/shared/mlog" + einterfaces "github.com/mattermost/mattermost/server/v8/einterfaces" + mock "github.com/stretchr/testify/mock" model "github.com/mattermost/mattermost/server/public/model" @@ -50,6 +52,22 @@ func (_m *MockServerIface) Config() *model.Config { return r0 } +// GetMetrics provides a mock function with given fields: +func (_m *MockServerIface) GetMetrics() einterfaces.MetricsInterface { + ret := _m.Called() + + var r0 einterfaces.MetricsInterface + if rf, ok := ret.Get(0).(func() einterfaces.MetricsInterface); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(einterfaces.MetricsInterface) + } + } + + return r0 +} + // GetRemoteClusterService provides a mock function with given fields: func (_m *MockServerIface) GetRemoteClusterService() remotecluster.RemoteClusterServiceIFace { ret := _m.Called() diff --git a/server/platform/services/sharedchannel/service.go b/server/platform/services/sharedchannel/service.go index 0c5761a822..335f9ba2e3 100644 --- a/server/platform/services/sharedchannel/service.go +++ b/server/platform/services/sharedchannel/service.go @@ -14,6 +14,7 @@ import ( "github.com/mattermost/mattermost/server/public/shared/mlog" "github.com/mattermost/mattermost/server/public/shared/request" "github.com/mattermost/mattermost/server/v8/channels/store" + "github.com/mattermost/mattermost/server/v8/einterfaces" "github.com/mattermost/mattermost/server/v8/platform/services/remotecluster" "github.com/mattermost/mattermost/server/v8/platform/shared/filestore" ) @@ -42,6 +43,7 @@ type ServerIface interface { GetStore() store.Store Log() *mlog.Logger GetRemoteClusterService() remotecluster.RemoteClusterServiceIFace + GetMetrics() einterfaces.MetricsInterface } type PlatformIface interface { diff --git a/server/platform/services/sharedchannel/sync_send.go b/server/platform/services/sharedchannel/sync_send.go index b3f90a2bf0..3217de104f 100644 --- a/server/platform/services/sharedchannel/sync_send.go +++ b/server/platform/services/sharedchannel/sync_send.go @@ -178,12 +178,25 @@ func (scs *Service) doSync() time.Duration { var task syncTask var ok bool var shortestWait time.Duration + metrics := scs.server.GetMetrics() + + if metrics != nil { + scs.mux.Lock() + size := len(scs.tasks) + scs.mux.Unlock() + metrics.ObserveSharedChannelsQueueSize(int64(size)) + } for { task, ok, shortestWait = scs.removeOldestTask() if !ok { break } + + if metrics != nil { + metrics.ObserveSharedChannelsTaskInQueueDuration(time.Since(task.AddedAt).Seconds()) + } + if err := scs.processTask(task); err != nil { // put task back into map so it will update again if task.incRetry() { @@ -236,18 +249,22 @@ func (scs *Service) removeOldestTask() (syncTask, bool, time.Duration) { // processTask updates one or more remote clusters with any new channel content. func (scs *Service) processTask(task syncTask) error { - var err error - var remotes []*model.RemoteCluster + // map is used to ensure remotes don't get sync'd twice, such as when + // they have the autoinvited flag and have explicitly subscribed to a channel. + remotesMap := make(map[string]*model.RemoteCluster) if task.remoteID == "" { filter := model.RemoteClusterQueryFilter{ InChannel: task.channelID, OnlyConfirmed: true, } - remotes, err = scs.server.GetStore().RemoteCluster().GetAll(filter) + remotes, err := scs.server.GetStore().RemoteCluster().GetAll(filter) if err != nil { return err } + for _, r := range remotes { + remotesMap[r.RemoteId] = r + } // add all remotes that have the autoinvited option. filter = model.RemoteClusterQueryFilter{ @@ -257,7 +274,9 @@ func (scs *Service) processTask(task syncTask) error { if err != nil { return err } - remotes = append(remotes, remotesAutoInvited...) + for _, r := range remotesAutoInvited { + remotesMap[r.RemoteId] = r + } } else { rc, err := scs.server.GetStore().RemoteCluster().Get(task.remoteID) if err != nil { @@ -266,10 +285,10 @@ func (scs *Service) processTask(task syncTask) error { if !rc.IsOnline() { return fmt.Errorf("Failed updating shared channel '%s' for offline remote cluster '%s'", task.channelID, rc.DisplayName) } - remotes = []*model.RemoteCluster{rc} + remotesMap[rc.RemoteId] = rc } - for _, rc := range remotes { + for _, rc := range remotesMap { rtask := task rtask.remoteID = rc.RemoteId if err := scs.syncForRemote(rtask, rc); err != nil { diff --git a/server/platform/services/sharedchannel/sync_send_remote.go b/server/platform/services/sharedchannel/sync_send_remote.go index 379724e964..f51b59e469 100644 --- a/server/platform/services/sharedchannel/sync_send_remote.go +++ b/server/platform/services/sharedchannel/sync_send_remote.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "sync" + "time" "github.com/wiggin77/merror" @@ -78,6 +79,18 @@ func (scs *Service) syncForRemote(task syncTask, rc *model.RemoteCluster) error return fmt.Errorf("cannot update remote cluster %s for channel id %s; Remote Cluster Service not enabled", rc.Name, task.channelID) } + metrics := scs.server.GetMetrics() + + start := time.Now() + var metricsRecorded bool + defer func() { + if !metricsRecorded && metrics != nil { + metrics.IncrementSharedChannelsSyncCounter(rc.RemoteId) + metrics.ObserveSharedChannelsSyncCollectionDuration(rc.RemoteId, time.Since(start).Seconds()) + metricsRecorded = true + } + }() + scr, err := scs.server.GetStore().SharedChannel().GetRemoteByIds(task.channelID, rc.RemoteId) if isNotFoundError(err) && rc.IsOptionFlagSet(model.BitflagOptionAutoInvited) { // if SharedChannelRemote not found and remote has autoinvite flag, create a scr for it, thus inviting the remote. @@ -175,12 +188,25 @@ func (scs *Service) syncForRemote(task syncTask, rc *model.RemoteCluster) error mlog.Int("attachments", len(sd.attachments)), ) + if !metricsRecorded && metrics != nil { + metrics.IncrementSharedChannelsSyncCounter(rc.RemoteId) + metrics.ObserveSharedChannelsSyncCollectionDuration(rc.RemoteId, time.Since(start).Seconds()) + metricsRecorded = true + } + return scs.sendSyncData(sd) } // fetchUsersForSync populates the sync data with any channel users who updated their user profile // since the last sync. func (scs *Service) fetchUsersForSync(sd *syncData) error { + start := time.Now() + defer func() { + if metrics := scs.server.GetMetrics(); metrics != nil { + metrics.ObserveSharedChannelsSyncCollectionStepDuration(sd.rc.RemoteId, "Users", time.Since(start).Seconds()) + } + }() + filter := model.GetUsersForSyncFilter{ ChannelID: sd.task.channelID, Limit: MaxUsersPerSync, @@ -212,6 +238,13 @@ func (scs *Service) fetchUsersForSync(sd *syncData) error { // fetchPostsForSync populates the sync data with any new or edited posts since the last sync. func (scs *Service) fetchPostsForSync(sd *syncData) error { + start := time.Now() + defer func() { + if metrics := scs.server.GetMetrics(); metrics != nil { + metrics.ObserveSharedChannelsSyncCollectionStepDuration(sd.rc.RemoteId, "Posts", time.Since(start).Seconds()) + } + }() + options := model.GetPostsSinceForSyncOptions{ ChannelId: sd.task.channelID, IncludeDeleted: true, @@ -288,6 +321,13 @@ func containsPost(posts []*model.Post, post *model.Post) bool { // fetchReactionsForSync populates the sync data with any new reactions since the last sync. func (scs *Service) fetchReactionsForSync(sd *syncData) error { + start := time.Now() + defer func() { + if metrics := scs.server.GetMetrics(); metrics != nil { + metrics.ObserveSharedChannelsSyncCollectionStepDuration(sd.rc.RemoteId, "Reactions", time.Since(start).Seconds()) + } + }() + merr := merror.New() for _, post := range sd.posts { // any reactions originating from the remote cluster are filtered out @@ -303,6 +343,13 @@ func (scs *Service) fetchReactionsForSync(sd *syncData) error { // fetchPostUsersForSync populates the sync data with all users associated with posts. func (scs *Service) fetchPostUsersForSync(sd *syncData) error { + start := time.Now() + defer func() { + if metrics := scs.server.GetMetrics(); metrics != nil { + metrics.ObserveSharedChannelsSyncCollectionStepDuration(sd.rc.RemoteId, "PostUsers", time.Since(start).Seconds()) + } + }() + sc, err := scs.server.GetStore().SharedChannel().Get(sd.task.channelID) if err != nil { return fmt.Errorf("cannot determine teamID: %w", err) @@ -367,6 +414,13 @@ func (scs *Service) fetchPostUsersForSync(sd *syncData) error { // fetchPostAttachmentsForSync populates the sync data with any file attachments for new posts. func (scs *Service) fetchPostAttachmentsForSync(sd *syncData) error { + start := time.Now() + defer func() { + if metrics := scs.server.GetMetrics(); metrics != nil { + metrics.ObserveSharedChannelsSyncCollectionStepDuration(sd.rc.RemoteId, "Attachments", time.Since(start).Seconds()) + } + }() + merr := merror.New() for _, post := range sd.posts { fis, err := scs.server.GetStore().FileInfo().GetForPost(post.Id, false, true, true) @@ -420,8 +474,14 @@ func (scs *Service) filterPostsForSync(sd *syncData) { // remote cluster. // The order of items sent is important: users -> attachments -> posts -> reactions -> profile images func (scs *Service) sendSyncData(sd *syncData) error { - merr := merror.New() + start := time.Now() + defer func() { + if metrics := scs.server.GetMetrics(); metrics != nil { + metrics.ObserveSharedChannelsSyncSendDuration(sd.rc.RemoteId, time.Since(start).Seconds()) + } + }() + merr := merror.New() sanitizeSyncData(sd) // send users @@ -462,6 +522,13 @@ func (scs *Service) sendSyncData(sd *syncData) error { // sendUserSyncData sends the collected user updates to the remote cluster. func (scs *Service) sendUserSyncData(sd *syncData) error { + start := time.Now() + defer func() { + if metrics := scs.server.GetMetrics(); metrics != nil { + metrics.ObserveSharedChannelsSyncSendStepDuration(sd.rc.RemoteId, "Users", time.Since(start).Seconds()) + } + }() + msg := model.NewSyncMsg(sd.task.channelID) msg.Users = sd.users @@ -504,6 +571,13 @@ func (scs *Service) sendAttachmentSyncData(sd *syncData) { // sendPostSyncData sends the collected post updates to the remote cluster. func (scs *Service) sendPostSyncData(sd *syncData) error { + start := time.Now() + defer func() { + if metrics := scs.server.GetMetrics(); metrics != nil { + metrics.ObserveSharedChannelsSyncSendStepDuration(sd.rc.RemoteId, "Posts", time.Since(start).Seconds()) + } + }() + msg := model.NewSyncMsg(sd.task.channelID) msg.Posts = sd.posts @@ -525,6 +599,13 @@ func (scs *Service) sendPostSyncData(sd *syncData) error { // sendReactionSyncData sends the collected reaction updates to the remote cluster. func (scs *Service) sendReactionSyncData(sd *syncData) error { + start := time.Now() + defer func() { + if metrics := scs.server.GetMetrics(); metrics != nil { + metrics.ObserveSharedChannelsSyncSendStepDuration(sd.rc.RemoteId, "Reactions", time.Since(start).Seconds()) + } + }() + msg := model.NewSyncMsg(sd.task.channelID) msg.Reactions = sd.reactions