Unified Storage: Add watch latency metric (#93509)

* adds metric for watch latency

* registers storage metrics when creating a new ResourceServer

* defines the latency (in milliseconds) as the diff between now and the RV. Still need to wait until the PR for switching the RV to a millisecond timestamp is rolled out.

* should be microseconds, not milliseconds

* for watch latency, use diff between now and resource version and convert to seconds

* fix typo
This commit is contained in:
owensmallwood 2024-10-21 17:15:11 -06:00 committed by GitHub
parent cab4288b88
commit bda27ec8c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 27 additions and 12 deletions

View File

@ -2,7 +2,9 @@ package resource
import (
"sync"
"time"
"github.com/grafana/dskit/instrument"
"github.com/prometheus/client_golang/prometheus"
)
@ -12,20 +14,21 @@ var (
)
// StorageApiMetrics holds the Prometheus metric vectors exported by the
// unified-storage server. It implements prometheus.Collector via its
// Collect/Describe methods so it can be registered as a single unit.
type StorageApiMetrics struct {
// OptimisticLockFailed counts optimistic-lock failures, labeled by "action".
OptimisticLockFailed *prometheus.CounterVec
// WatchEventLatency records, per "resource" label, the seconds between an
// event's resource version (a unix-microsecond timestamp) and the moment
// the watch event was sent.
WatchEventLatency *prometheus.HistogramVec
}
// NewStorageMetrics lazily builds the process-wide StorageApiMetrics
// singleton. The sync.Once guard makes repeated calls safe: the metric
// vectors are constructed exactly once and the same instance is reused.
func NewStorageMetrics() *StorageApiMetrics {
once.Do(func() {
StorageServerMetrics = &StorageApiMetrics{
OptimisticLockFailed: prometheus.NewCounterVec(
prometheus.CounterOpts{
// NOTE(review): this namespace ("resource_storage") differs from the
// one used for WatchEventLatency ("storage_server") below — confirm
// whether that divergence is intentional.
Namespace: "resource_storage",
Name: "optimistic_lock_failed",
Help: "count of optimistic locks failed",
},
[]string{"action"},
),
WatchEventLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "storage_server",
Name: "watch_latency_seconds",
Help: "Time (in seconds) spent waiting for watch events to be sent",
// Classic buckets from dskit, plus native-histogram settings so
// Prometheus servers with native histograms enabled get finer detail.
Buckets: instrument.DefBuckets,
NativeHistogramBucketFactor: 1.1, // enable native histograms
NativeHistogramMaxBucketNumber: 160,
NativeHistogramMinResetDuration: time.Hour,
}, []string{"resource"}),
}
})
@ -33,9 +36,9 @@ func NewStorageMetrics() *StorageApiMetrics {
}
// Collect implements prometheus.Collector by forwarding the channel to
// every metric vector owned by this struct.
func (s *StorageApiMetrics) Collect(ch chan<- prometheus.Metric) {
	collectors := []prometheus.Collector{s.OptimisticLockFailed, s.WatchEventLatency}
	for _, c := range collectors {
		c.Collect(ch)
	}
}
// Describe implements prometheus.Collector by forwarding the descriptor
// channel to every metric vector owned by this struct.
func (s *StorageApiMetrics) Describe(ch chan<- *prometheus.Desc) {
	collectors := []prometheus.Collector{s.OptimisticLockFailed, s.WatchEventLatency}
	for _, c := range collectors {
		c.Describe(ch)
	}
}

View File

@ -168,6 +168,12 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
}
}
logger := slog.Default().With("logger", "resource-server")
// register metrics
if err := prometheus.Register(NewStorageMetrics()); err != nil {
logger.Warn("failed to register storage metrics", "error", err)
}
// Make this cancelable
ctx, cancel := context.WithCancel(claims.WithClaims(context.Background(),
&identity.StaticRequester{
@ -178,7 +184,7 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
}))
return &server{
tracer: opts.Tracer,
log: slog.Default().With("logger", "resource-server"),
log: logger,
backend: opts.Backend,
index: opts.Index,
blob: blobstore,
@ -712,6 +718,12 @@ func (s *server) Watch(req *WatchRequest, srv ResourceStore_WatchServer) error {
if err := srv.Send(resp); err != nil {
return err
}
// record latency - resource version is a unix timestamp in microseconds so we convert to seconds
latencySeconds := float64(time.Now().UnixMicro()-event.ResourceVersion) / 1e6
if latencySeconds > 0 {
StorageServerMetrics.WatchEventLatency.WithLabelValues(event.Key.Resource).Observe(latencySeconds)
}
}
}
}