diff --git a/pkg/storage/unified/resource/metrics.go b/pkg/storage/unified/resource/metrics.go index be187eba7b3..8747fa52a8a 100644 --- a/pkg/storage/unified/resource/metrics.go +++ b/pkg/storage/unified/resource/metrics.go @@ -2,7 +2,9 @@ package resource import ( "sync" + "time" + "github.com/grafana/dskit/instrument" "github.com/prometheus/client_golang/prometheus" ) @@ -12,20 +14,21 @@ var ( ) type StorageApiMetrics struct { - OptimisticLockFailed *prometheus.CounterVec + WatchEventLatency *prometheus.HistogramVec } func NewStorageMetrics() *StorageApiMetrics { once.Do(func() { StorageServerMetrics = &StorageApiMetrics{ - OptimisticLockFailed: prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "resource_storage", - Name: "optimistic_lock_failed", - Help: "count of optimistic locks failed", - }, - []string{"action"}, - ), + WatchEventLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "storage_server", + Name: "watch_latency_seconds", + Help: "Time (in seconds) spent waiting for watch events to be sent", + Buckets: instrument.DefBuckets, + NativeHistogramBucketFactor: 1.1, // enable native histograms + NativeHistogramMaxBucketNumber: 160, + NativeHistogramMinResetDuration: time.Hour, + }, []string{"resource"}), } }) @@ -33,9 +36,9 @@ func NewStorageMetrics() *StorageApiMetrics { } func (s *StorageApiMetrics) Collect(ch chan<- prometheus.Metric) { - s.OptimisticLockFailed.Collect(ch) + s.WatchEventLatency.Collect(ch) } func (s *StorageApiMetrics) Describe(ch chan<- *prometheus.Desc) { - s.OptimisticLockFailed.Describe(ch) + s.WatchEventLatency.Describe(ch) } diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index 801c3198a36..ae976f7640a 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -168,6 +168,12 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { } } + logger := slog.Default().With("logger", "resource-server") + // register metrics + if err := prometheus.Register(NewStorageMetrics()); err != nil { + logger.Warn("failed to register storage metrics", "error", err) + } + // Make this cancelable ctx, cancel := context.WithCancel(claims.WithClaims(context.Background(), &identity.StaticRequester{ @@ -178,7 +184,7 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { })) return &server{ tracer: opts.Tracer, - log: slog.Default().With("logger", "resource-server"), + log: logger, backend: opts.Backend, index: opts.Index, blob: blobstore, @@ -712,6 +718,12 @@ func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error { if err := srv.Send(resp); err != nil { return err } + + // record latency - resource version is a unix timestamp in microseconds so we convert to seconds + latencySeconds := float64(time.Now().UnixMicro()-event.ResourceVersion) / 1e6 + if latencySeconds > 0 { + StorageServerMetrics.WatchEventLatency.WithLabelValues(event.Key.Resource).Observe(latencySeconds) + } } } }