Metrics for shared channels (#26199)

* add metrics definitions for shared channels
This commit is contained in:
Doug Lauder 2024-02-21 17:21:35 -05:00 committed by GitHub
parent f90b3d4141
commit 38bbf04e48
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 318 additions and 8 deletions

View File

@ -79,6 +79,14 @@ type MetricsInterface interface {
ObserveRemoteClusterClockSkew(remoteID string, skew float64)
IncrementRemoteClusterConnStateChangeCounter(remoteID string, online bool)
IncrementSharedChannelsSyncCounter(remoteID string)
ObserveSharedChannelsTaskInQueueDuration(elapsed float64)
ObserveSharedChannelsQueueSize(size int64)
ObserveSharedChannelsSyncCollectionDuration(remoteID string, elapsed float64)
ObserveSharedChannelsSyncSendDuration(remoteID string, elapsed float64)
ObserveSharedChannelsSyncCollectionStepDuration(remoteID string, step string, elapsed float64)
ObserveSharedChannelsSyncSendStepDuration(remoteID string, step string, elapsed float64)
IncrementJobActive(jobType string)
DecrementJobActive(jobType string)

View File

@ -204,6 +204,11 @@ func (_m *MetricsInterface) IncrementRemoteClusterMsgSentCounter(remoteID string
_m.Called(remoteID)
}
// IncrementSharedChannelsSyncCounter provides a mock function with given fields: remoteID
func (_m *MetricsInterface) IncrementSharedChannelsSyncCounter(remoteID string) {
_m.Called(remoteID)
}
// IncrementUserIndexCounter provides a mock function with given fields:
func (_m *MetricsInterface) IncrementUserIndexCounter() {
_m.Called()
@ -294,6 +299,36 @@ func (_m *MetricsInterface) ObserveRemoteClusterPingDuration(remoteID string, el
_m.Called(remoteID, elapsed)
}
// ObserveSharedChannelsQueueSize provides a mock function with given fields: size
func (_m *MetricsInterface) ObserveSharedChannelsQueueSize(size int64) {
_m.Called(size)
}
// ObserveSharedChannelsSyncCollectionDuration provides a mock function with given fields: remoteID, elapsed
func (_m *MetricsInterface) ObserveSharedChannelsSyncCollectionDuration(remoteID string, elapsed float64) {
_m.Called(remoteID, elapsed)
}
// ObserveSharedChannelsSyncCollectionStepDuration provides a mock function with given fields: remoteID, step, elapsed
func (_m *MetricsInterface) ObserveSharedChannelsSyncCollectionStepDuration(remoteID string, step string, elapsed float64) {
_m.Called(remoteID, step, elapsed)
}
// ObserveSharedChannelsSyncSendDuration provides a mock function with given fields: remoteID, elapsed
func (_m *MetricsInterface) ObserveSharedChannelsSyncSendDuration(remoteID string, elapsed float64) {
_m.Called(remoteID, elapsed)
}
// ObserveSharedChannelsSyncSendStepDuration provides a mock function with given fields: remoteID, step, elapsed
func (_m *MetricsInterface) ObserveSharedChannelsSyncSendStepDuration(remoteID string, step string, elapsed float64) {
_m.Called(remoteID, step, elapsed)
}
// ObserveSharedChannelsTaskInQueueDuration provides a mock function with given fields: elapsed
func (_m *MetricsInterface) ObserveSharedChannelsTaskInQueueDuration(elapsed float64) {
_m.Called(elapsed)
}
// ObserveStoreMethodDuration provides a mock function with given fields: method, success, elapsed
func (_m *MetricsInterface) ObserveStoreMethodDuration(method string, success string, elapsed float64) {
_m.Called(method, success, elapsed)

View File

@ -38,6 +38,7 @@ const (
MetricsSubsystemSearch = "search"
MetricsSubsystemLogging = "logging"
MetricsSubsystemRemoteCluster = "remote_cluster"
MetricsSubsystemSharedChannels = "shared_channels"
MetricsSubsystemSystem = "system"
MetricsSubsystemJobs = "jobs"
MetricsCloudInstallationLabel = "installationId"
@ -189,6 +190,14 @@ type MetricsInterfaceImpl struct {
RemoteClusterClockSkewHistograms *prometheus.HistogramVec
RemoteClusterConnStateChangeCounter *prometheus.CounterVec
SharedChannelsSyncCount *prometheus.CounterVec
SharedChannelsTaskInQueueHistogram prometheus.Histogram
SharedChannelsQueueSize prometheus.Gauge
SharedChannelsSyncCollectionHistogram *prometheus.HistogramVec
SharedChannelsSyncSendHistogram *prometheus.HistogramVec
SharedChannelsSyncCollectionStepHistogram *prometheus.HistogramVec
SharedChannelsSyncSendStepHistogram *prometheus.HistogramVec
ServerStartTime prometheus.Gauge
JobsActive *prometheus.GaugeVec
@ -853,7 +862,7 @@ func New(ps *platform.PlatformService, driver, dataSource string) *MetricsInterf
)
m.Registry.MustRegister(m.LoggerBlockedCounters.counter)
// Remote Cluster subsystem
// Remote Cluster service
m.RemoteClusterMsgSentCounters = prometheus.NewCounterVec(
prometheus.CounterOpts{
@ -927,6 +936,90 @@ func New(ps *platform.PlatformService, driver, dataSource string) *MetricsInterf
)
m.Registry.MustRegister(m.RemoteClusterConnStateChangeCounter)
// Shared Channel service
m.SharedChannelsSyncCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystemSharedChannels,
Name: "sync_count",
Help: "Count of sync events processed for each remote",
ConstLabels: additionalLabels,
},
[]string{"remote_id"},
)
m.Registry.MustRegister(m.SharedChannelsSyncCount)
m.SharedChannelsTaskInQueueHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystemSharedChannels,
Name: "task_in_queue_duration_seconds",
Help: "Duration tasks spend in queue (seconds)",
ConstLabels: additionalLabels,
},
)
m.Registry.MustRegister(m.SharedChannelsTaskInQueueHistogram)
m.SharedChannelsQueueSize = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystemSharedChannels,
Name: "task_queue_size",
Help: "Current number of tasks in queue",
ConstLabels: additionalLabels,
},
)
m.Registry.MustRegister(m.SharedChannelsQueueSize)
m.SharedChannelsSyncCollectionHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystemSharedChannels,
Name: "sync_collection_duration_seconds",
Help: "Duration tasks spend collecting sync data (seconds)",
ConstLabels: additionalLabels,
},
[]string{"remote_id"},
)
m.Registry.MustRegister(m.SharedChannelsSyncCollectionHistogram)
m.SharedChannelsSyncSendHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystemSharedChannels,
Name: "sync_send_duration_seconds",
Help: "Duration tasks spend sending sync data (seconds)",
ConstLabels: additionalLabels,
},
[]string{"remote_id"},
)
m.Registry.MustRegister(m.SharedChannelsSyncSendHistogram)
m.SharedChannelsSyncCollectionStepHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystemSharedChannels,
Name: "sync_collection_step_duration_seconds",
Help: "Duration tasks spend in each step collecting data (seconds)",
ConstLabels: additionalLabels,
},
[]string{"remote_id", "step"},
)
m.Registry.MustRegister(m.SharedChannelsSyncCollectionStepHistogram)
m.SharedChannelsSyncSendStepHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystemSharedChannels,
Name: "sync_send_step_duration_seconds",
Help: "Duration tasks spend in each step sending data (seconds)",
ConstLabels: additionalLabels,
},
[]string{"remote_id", "step"},
)
m.Registry.MustRegister(m.SharedChannelsSyncSendStepHistogram)
m.ServerStartTime = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystemSystem,
@ -1324,6 +1417,46 @@ func (mi *MetricsInterfaceImpl) IncrementRemoteClusterConnStateChangeCounter(rem
}).Inc()
}
func (mi *MetricsInterfaceImpl) IncrementSharedChannelsSyncCounter(remoteID string) {
mi.SharedChannelsSyncCount.With(prometheus.Labels{
"remote_id": remoteID,
}).Inc()
}
func (mi *MetricsInterfaceImpl) ObserveSharedChannelsTaskInQueueDuration(elapsed float64) {
mi.SharedChannelsTaskInQueueHistogram.Observe(elapsed)
}
func (mi *MetricsInterfaceImpl) ObserveSharedChannelsQueueSize(size int64) {
mi.SharedChannelsQueueSize.Set(float64(size))
}
func (mi *MetricsInterfaceImpl) ObserveSharedChannelsSyncCollectionDuration(remoteID string, elapsed float64) {
mi.SharedChannelsSyncCollectionHistogram.With(prometheus.Labels{
"remote_id": remoteID,
}).Observe(elapsed)
}
func (mi *MetricsInterfaceImpl) ObserveSharedChannelsSyncSendDuration(remoteID string, elapsed float64) {
mi.SharedChannelsSyncSendHistogram.With(prometheus.Labels{
"remote_id": remoteID,
}).Observe(elapsed)
}
func (mi *MetricsInterfaceImpl) ObserveSharedChannelsSyncCollectionStepDuration(remoteID string, step string, elapsed float64) {
mi.SharedChannelsSyncCollectionStepHistogram.With(prometheus.Labels{
"remote_id": remoteID,
"step": step,
}).Observe(elapsed)
}
func (mi *MetricsInterfaceImpl) ObserveSharedChannelsSyncSendStepDuration(remoteID string, step string, elapsed float64) {
mi.SharedChannelsSyncSendStepHistogram.With(prometheus.Labels{
"remote_id": remoteID,
"step": step,
}).Observe(elapsed)
}
// SetReplicaLagAbsolute sets the absolute replica lag for a given node.
func (mi *MetricsInterfaceImpl) SetReplicaLagAbsolute(node string, value float64) {
mi.DbReplicaLagGaugeAbs.With(prometheus.Labels{"node": node}).Set(value)

View File

@ -82,6 +82,13 @@ func (rcs *Service) sendFile(task sendFileTask) {
}
func (rcs *Service) sendFileToRemote(timeout time.Duration, task sendFileTask) (*model.FileInfo, error) {
start := time.Now()
defer func() {
if metrics := rcs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncSendStepDuration(task.rc.RemoteId, "Attachments", time.Since(start).Seconds())
}
}()
rcs.server.Log().Log(mlog.LvlRemoteClusterServiceDebug, "sending file to remote...",
mlog.String("remote", task.rc.DisplayName),
mlog.String("uploadId", task.us.Id),

View File

@ -79,6 +79,13 @@ func (rcs *Service) sendProfileImage(task sendProfileImageTask) {
}
func (rcs *Service) sendProfileImageToRemote(timeout time.Duration, task sendProfileImageTask) error {
start := time.Now()
defer func() {
if metrics := rcs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncSendStepDuration(task.rc.RemoteId, "ProfileImages", time.Since(start).Seconds())
}
}()
rcs.server.Log().Log(mlog.LvlRemoteClusterServiceDebug, "sending profile image to remote...",
mlog.String("remote", task.rc.DisplayName),
mlog.String("UserId", task.userID),

View File

@ -6,6 +6,8 @@ package sharedchannel
import (
mlog "github.com/mattermost/mattermost/server/public/shared/mlog"
einterfaces "github.com/mattermost/mattermost/server/v8/einterfaces"
mock "github.com/stretchr/testify/mock"
model "github.com/mattermost/mattermost/server/public/model"
@ -50,6 +52,22 @@ func (_m *MockServerIface) Config() *model.Config {
return r0
}
// GetMetrics provides a mock function with given fields:
func (_m *MockServerIface) GetMetrics() einterfaces.MetricsInterface {
ret := _m.Called()
var r0 einterfaces.MetricsInterface
if rf, ok := ret.Get(0).(func() einterfaces.MetricsInterface); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(einterfaces.MetricsInterface)
}
}
return r0
}
// GetRemoteClusterService provides a mock function with given fields:
func (_m *MockServerIface) GetRemoteClusterService() remotecluster.RemoteClusterServiceIFace {
ret := _m.Called()

View File

@ -14,6 +14,7 @@ import (
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/public/shared/request"
"github.com/mattermost/mattermost/server/v8/channels/store"
"github.com/mattermost/mattermost/server/v8/einterfaces"
"github.com/mattermost/mattermost/server/v8/platform/services/remotecluster"
"github.com/mattermost/mattermost/server/v8/platform/shared/filestore"
)
@ -42,6 +43,7 @@ type ServerIface interface {
GetStore() store.Store
Log() *mlog.Logger
GetRemoteClusterService() remotecluster.RemoteClusterServiceIFace
GetMetrics() einterfaces.MetricsInterface
}
type PlatformIface interface {

View File

@ -178,12 +178,25 @@ func (scs *Service) doSync() time.Duration {
var task syncTask
var ok bool
var shortestWait time.Duration
metrics := scs.server.GetMetrics()
if metrics != nil {
scs.mux.Lock()
size := len(scs.tasks)
scs.mux.Unlock()
metrics.ObserveSharedChannelsQueueSize(int64(size))
}
for {
task, ok, shortestWait = scs.removeOldestTask()
if !ok {
break
}
if metrics != nil {
metrics.ObserveSharedChannelsTaskInQueueDuration(time.Since(task.AddedAt).Seconds())
}
if err := scs.processTask(task); err != nil {
// put task back into map so it will update again
if task.incRetry() {
@ -236,18 +249,22 @@ func (scs *Service) removeOldestTask() (syncTask, bool, time.Duration) {
// processTask updates one or more remote clusters with any new channel content.
func (scs *Service) processTask(task syncTask) error {
var err error
var remotes []*model.RemoteCluster
// map is used to ensure remotes don't get sync'd twice, such as when
// they have the autoinvited flag and have explicitly subscribed to a channel.
remotesMap := make(map[string]*model.RemoteCluster)
if task.remoteID == "" {
filter := model.RemoteClusterQueryFilter{
InChannel: task.channelID,
OnlyConfirmed: true,
}
remotes, err = scs.server.GetStore().RemoteCluster().GetAll(filter)
remotes, err := scs.server.GetStore().RemoteCluster().GetAll(filter)
if err != nil {
return err
}
for _, r := range remotes {
remotesMap[r.RemoteId] = r
}
// add all remotes that have the autoinvited option.
filter = model.RemoteClusterQueryFilter{
@ -257,7 +274,9 @@ func (scs *Service) processTask(task syncTask) error {
if err != nil {
return err
}
remotes = append(remotes, remotesAutoInvited...)
for _, r := range remotesAutoInvited {
remotesMap[r.RemoteId] = r
}
} else {
rc, err := scs.server.GetStore().RemoteCluster().Get(task.remoteID)
if err != nil {
@ -266,10 +285,10 @@ func (scs *Service) processTask(task syncTask) error {
if !rc.IsOnline() {
return fmt.Errorf("Failed updating shared channel '%s' for offline remote cluster '%s'", task.channelID, rc.DisplayName)
}
remotes = []*model.RemoteCluster{rc}
remotesMap[rc.RemoteId] = rc
}
for _, rc := range remotes {
for _, rc := range remotesMap {
rtask := task
rtask.remoteID = rc.RemoteId
if err := scs.syncForRemote(rtask, rc); err != nil {

View File

@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"sync"
"time"
"github.com/wiggin77/merror"
@ -78,6 +79,18 @@ func (scs *Service) syncForRemote(task syncTask, rc *model.RemoteCluster) error
return fmt.Errorf("cannot update remote cluster %s for channel id %s; Remote Cluster Service not enabled", rc.Name, task.channelID)
}
metrics := scs.server.GetMetrics()
start := time.Now()
var metricsRecorded bool
defer func() {
if !metricsRecorded && metrics != nil {
metrics.IncrementSharedChannelsSyncCounter(rc.RemoteId)
metrics.ObserveSharedChannelsSyncCollectionDuration(rc.RemoteId, time.Since(start).Seconds())
metricsRecorded = true
}
}()
scr, err := scs.server.GetStore().SharedChannel().GetRemoteByIds(task.channelID, rc.RemoteId)
if isNotFoundError(err) && rc.IsOptionFlagSet(model.BitflagOptionAutoInvited) {
// if SharedChannelRemote not found and remote has autoinvite flag, create a scr for it, thus inviting the remote.
@ -175,12 +188,25 @@ func (scs *Service) syncForRemote(task syncTask, rc *model.RemoteCluster) error
mlog.Int("attachments", len(sd.attachments)),
)
if !metricsRecorded && metrics != nil {
metrics.IncrementSharedChannelsSyncCounter(rc.RemoteId)
metrics.ObserveSharedChannelsSyncCollectionDuration(rc.RemoteId, time.Since(start).Seconds())
metricsRecorded = true
}
return scs.sendSyncData(sd)
}
// fetchUsersForSync populates the sync data with any channel users who updated their user profile
// since the last sync.
func (scs *Service) fetchUsersForSync(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncCollectionStepDuration(sd.rc.RemoteId, "Users", time.Since(start).Seconds())
}
}()
filter := model.GetUsersForSyncFilter{
ChannelID: sd.task.channelID,
Limit: MaxUsersPerSync,
@ -212,6 +238,13 @@ func (scs *Service) fetchUsersForSync(sd *syncData) error {
// fetchPostsForSync populates the sync data with any new or edited posts since the last sync.
func (scs *Service) fetchPostsForSync(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncCollectionStepDuration(sd.rc.RemoteId, "Posts", time.Since(start).Seconds())
}
}()
options := model.GetPostsSinceForSyncOptions{
ChannelId: sd.task.channelID,
IncludeDeleted: true,
@ -288,6 +321,13 @@ func containsPost(posts []*model.Post, post *model.Post) bool {
// fetchReactionsForSync populates the sync data with any new reactions since the last sync.
func (scs *Service) fetchReactionsForSync(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncCollectionStepDuration(sd.rc.RemoteId, "Reactions", time.Since(start).Seconds())
}
}()
merr := merror.New()
for _, post := range sd.posts {
// any reactions originating from the remote cluster are filtered out
@ -303,6 +343,13 @@ func (scs *Service) fetchReactionsForSync(sd *syncData) error {
// fetchPostUsersForSync populates the sync data with all users associated with posts.
func (scs *Service) fetchPostUsersForSync(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncCollectionStepDuration(sd.rc.RemoteId, "PostUsers", time.Since(start).Seconds())
}
}()
sc, err := scs.server.GetStore().SharedChannel().Get(sd.task.channelID)
if err != nil {
return fmt.Errorf("cannot determine teamID: %w", err)
@ -367,6 +414,13 @@ func (scs *Service) fetchPostUsersForSync(sd *syncData) error {
// fetchPostAttachmentsForSync populates the sync data with any file attachments for new posts.
func (scs *Service) fetchPostAttachmentsForSync(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncCollectionStepDuration(sd.rc.RemoteId, "Attachments", time.Since(start).Seconds())
}
}()
merr := merror.New()
for _, post := range sd.posts {
fis, err := scs.server.GetStore().FileInfo().GetForPost(post.Id, false, true, true)
@ -420,8 +474,14 @@ func (scs *Service) filterPostsForSync(sd *syncData) {
// remote cluster.
// The order of items sent is important: users -> attachments -> posts -> reactions -> profile images
func (scs *Service) sendSyncData(sd *syncData) error {
merr := merror.New()
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncSendDuration(sd.rc.RemoteId, time.Since(start).Seconds())
}
}()
merr := merror.New()
sanitizeSyncData(sd)
// send users
@ -462,6 +522,13 @@ func (scs *Service) sendSyncData(sd *syncData) error {
// sendUserSyncData sends the collected user updates to the remote cluster.
func (scs *Service) sendUserSyncData(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncSendStepDuration(sd.rc.RemoteId, "Users", time.Since(start).Seconds())
}
}()
msg := model.NewSyncMsg(sd.task.channelID)
msg.Users = sd.users
@ -504,6 +571,13 @@ func (scs *Service) sendAttachmentSyncData(sd *syncData) {
// sendPostSyncData sends the collected post updates to the remote cluster.
func (scs *Service) sendPostSyncData(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncSendStepDuration(sd.rc.RemoteId, "Posts", time.Since(start).Seconds())
}
}()
msg := model.NewSyncMsg(sd.task.channelID)
msg.Posts = sd.posts
@ -525,6 +599,13 @@ func (scs *Service) sendPostSyncData(sd *syncData) error {
// sendReactionSyncData sends the collected reaction updates to the remote cluster.
func (scs *Service) sendReactionSyncData(sd *syncData) error {
start := time.Now()
defer func() {
if metrics := scs.server.GetMetrics(); metrics != nil {
metrics.ObserveSharedChannelsSyncSendStepDuration(sd.rc.RemoteId, "Reactions", time.Since(start).Seconds())
}
}()
msg := model.NewSyncMsg(sd.task.channelID)
msg.Reactions = sd.reactions