From 108202f96cfc1ab393f26c1802dd6a64b4d0b408 Mon Sep 17 00:00:00 2001 From: Jo Date: Mon, 11 Mar 2024 15:18:42 +0100 Subject: [PATCH] UsageStats: Separate context and threads for usage stats (#83963) * separate context and threads for usage stats * use constants * ignore original context * fix runMetricsFunc * fix collector registration Co-authored-by: Gabriel MABILLE * change background ctx * fix test randomness * Add traces to support bundle collector * Remove unnecessary span * Add trace to usagestats api * Close spans * Mv trace to bundle * Change span name * use parent context * fix runtime declare of stats * Fix pointer dereference problem on usage stat func Co-authored-by: Karl Persson Co-authored-by: jguer * fix broken support bundle tests by tracer --------- Co-authored-by: Gabriel MABILLE Co-authored-by: gamab Co-authored-by: Karl Persson --- go.mod | 6 +-- pkg/infra/usagestats/service/api.go | 5 ++- pkg/infra/usagestats/service/usage_stats.go | 45 ++++++++++++++----- .../usagestats/statscollector/service.go | 29 +++++------- .../usagestats/statscollector/service_test.go | 13 +++--- .../supportbundlesimpl/service.go | 6 ++- .../supportbundlesimpl/service_bundle.go | 13 +++++- .../supportbundlesimpl/service_bundle_test.go | 4 ++ 8 files changed, 79 insertions(+), 42 deletions(-) diff --git a/go.mod b/go.mod index c99e625f09e..fb5204c52bf 100644 --- a/go.mod +++ b/go.mod @@ -122,7 +122,7 @@ require ( gopkg.in/mail.v2 v2.3.1 // @grafana/backend-platform gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // @grafana/alerting-squad-backend - xorm.io/builder v0.3.6 // indirect; @grafana/backend-platform + xorm.io/builder v0.3.6 // @grafana/backend-platform xorm.io/core v0.7.3 // @grafana/backend-platform xorm.io/xorm v0.8.2 // @grafana/alerting-squad-backend ) @@ -174,7 +174,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.1-0.20191002090509-6af20e3a5340 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect - github.com/hashicorp/go-multierror v1.1.1 // indirect; @grafana/alerting-squad + github.com/hashicorp/go-multierror v1.1.1 // @grafana/alerting-squad github.com/hashicorp/go-sockaddr v1.0.6 // indirect github.com/hashicorp/golang-lru v0.6.0 // indirect github.com/hashicorp/yamux v0.1.1 // indirect @@ -337,7 +337,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/grafana/regexp v0.0.0-20221123153739-15dc172cd2db // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect - github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect; @grafana/alerting-squad-backend + github.com/hashicorp/golang-lru/v2 v2.0.7 // @grafana/alerting-squad-backend github.com/hashicorp/memberlist v0.5.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/invopop/yaml v0.2.0 // indirect diff --git a/pkg/infra/usagestats/service/api.go b/pkg/infra/usagestats/service/api.go index 5bbbb00ce10..1217b639613 100644 --- a/pkg/infra/usagestats/service/api.go +++ b/pkg/infra/usagestats/service/api.go @@ -20,7 +20,10 @@ func (uss *UsageStats) registerAPIEndpoints() { } func (uss *UsageStats) getUsageReportPreview(ctx *contextmodel.ReqContext) response.Response { - usageReport, err := uss.GetUsageReport(ctx.Req.Context()) + ctxTracer, span := uss.tracer.Start(ctx.Req.Context(), "usageStats.getUsageReportPreview") + defer span.End() + + usageReport, err := uss.GetUsageReport(ctxTracer) if err != nil { return response.Error(http.StatusInternalServerError, "failed to get usage report", err) } diff --git a/pkg/infra/usagestats/service/usage_stats.go b/pkg/infra/usagestats/service/usage_stats.go index 5f40165e596..2f19bd22150 100644 --- a/pkg/infra/usagestats/service/usage_stats.go +++ b/pkg/infra/usagestats/service/usage_stats.go @@ -9,6 +9,7 @@ import ( "reflect" "runtime" "strings" + "sync" "time" "github.com/google/uuid" @@ -19,6 +20,11 @@ import ( 
"github.com/grafana/grafana/pkg/infra/usagestats" ) +const ( + maxConcurrentCollectors = 5 + collectorTimeoutDuration = 5 * time.Minute +) + var usageStatsURL = "https://stats.grafana.org/grafana-usage-report" func (uss *UsageStats) GetUsageReport(ctx context.Context) (usagestats.Report, error) { @@ -54,21 +60,44 @@ func (uss *UsageStats) GetUsageReport(ctx context.Context) (usagestats.Report, e } func (uss *UsageStats) gatherMetrics(ctx context.Context, metrics map[string]any) { - ctx, span := uss.tracer.Start(ctx, "UsageStats.GatherLoop") + ctxTracer, span := uss.tracer.Start(ctx, "UsageStats.GatherLoop") defer span.End() totC, errC := 0, 0 - for _, fn := range uss.externalMetrics { - fnMetrics, err := uss.runMetricsFunc(ctx, fn) - totC++ - if err != nil { - errC++ - continue - } - for name, value := range fnMetrics { - metrics[name] = value - } + sem := make(chan struct{}, maxConcurrentCollectors) // semaphore bounding how many collectors run at once + var wg sync.WaitGroup + var mu sync.Mutex // guards metrics, totC and errC, which every collector goroutine writes + + for _, fn := range uss.externalMetrics { + wg.Add(1) + go func(fn func(context.Context) (map[string]any, error)) { + defer wg.Done() + + sem <- struct{}{} // acquire a token + defer func() { <-sem }() // release the token when done + + ctxWithTimeout, cancel := context.WithTimeout(ctxTracer, collectorTimeoutDuration) + defer cancel() + + fnMetrics, err := uss.runMetricsFunc(ctxWithTimeout, fn) + + // Collectors run concurrently: serialize the writes to the shared counters and map. + mu.Lock() + defer mu.Unlock() + + totC++ + if err != nil { + errC++ + return + } + + for name, value := range fnMetrics { + metrics[name] = value + } + }(fn) } + + wg.Wait() metrics["stats.usagestats.debug.collect.total.count"] = totC metrics["stats.usagestats.debug.collect.error.count"] = errC } diff --git a/pkg/infra/usagestats/statscollector/service.go b/pkg/infra/usagestats/statscollector/service.go index 158cc5f6367..20b0d186f98 100644 --- a/pkg/infra/usagestats/statscollector/service.go +++ b/pkg/infra/usagestats/statscollector/service.go @@ -25,8 +25,8 @@ import ( ) const ( - MIN_DELAY = 30 - MAX_DELAY = 120 + minDelay = 30 + maxDelay
= 120 ) type Service struct { @@ -45,7 +45,6 @@ type Service struct { startTime time.Time concurrentUserStatsCache memoConcurrentUserStats promFlavorCache memoPrometheusFlavor - usageStatProviders []registry.ProvidesUsageStats } func ProvideService( @@ -82,8 +81,8 @@ func ProvideService( s.collectDatasourceAccess, s.collectAlertNotifierStats, s.collectPrometheusFlavors, - s.collectAdditionalMetrics, } + for _, c := range collectors { us.RegisterMetricsFunc(c) } @@ -94,12 +93,19 @@ func ProvideService( // RegisterProviders is called only once - during Grafana start up func (s *Service) RegisterProviders(usageStatProviders []registry.ProvidesUsageStats) { s.log.Info("registering usage stat providers", "usageStatsProvidersLen", len(usageStatProviders)) - s.usageStatProviders = usageStatProviders + for _, usageStatProvider := range usageStatProviders { + provider := usageStatProvider.GetUsageStats + collector := func(ctx context.Context) (map[string]interface{}, error) { + return provider(ctx), nil + } + + s.usageStats.RegisterMetricsFunc(collector) + } } func (s *Service) Run(ctx context.Context) error { sendInterval := time.Second * time.Duration(s.cfg.MetricsTotalStatsIntervalSeconds) - nextSendInterval := time.Duration(rand.Intn(MAX_DELAY-MIN_DELAY)+MIN_DELAY) * time.Second + nextSendInterval := time.Duration(rand.Intn(maxDelay-minDelay)+minDelay) * time.Second s.log.Debug("usage stats collector started", "sendInterval", sendInterval, "nextSendInterval", nextSendInterval) updateStatsTicker := time.NewTicker(nextSendInterval) defer updateStatsTicker.Stop() @@ -212,17 +218,6 @@ func (s *Service) collectSystemStats(ctx context.Context) (map[string]any, error return m, nil } -func (s *Service) collectAdditionalMetrics(ctx context.Context) (map[string]any, error) { - m := map[string]any{} - for _, usageStatProvider := range s.usageStatProviders { - stats := usageStatProvider.GetUsageStats(ctx) - for k, v := range stats { - m[k] = v - } - } - return m, nil -} - func (s 
*Service) collectAlertNotifierStats(ctx context.Context) (map[string]any, error) { m := map[string]any{} // get stats about alert notifier usage diff --git a/pkg/infra/usagestats/statscollector/service_test.go b/pkg/infra/usagestats/statscollector/service_test.go index 9261b490cf9..600f39afac2 100644 --- a/pkg/infra/usagestats/statscollector/service_test.go +++ b/pkg/infra/usagestats/statscollector/service_test.go @@ -93,22 +93,19 @@ func (d dummyUsageStatProvider) GetUsageStats(ctx context.Context) map[string]an } func TestUsageStatsProviders(t *testing.T) { - provider1 := &dummyUsageStatProvider{stats: map[string]any{"my_stat_1": "val1", "my_stat_2": "val2"}} - provider2 := &dummyUsageStatProvider{stats: map[string]any{"my_stat_x": "valx", "my_stat_z": "valz"}} + provider := &dummyUsageStatProvider{stats: map[string]any{"my_stat_x": "valx", "my_stat_z": "valz"}} store := dbtest.NewFakeDB() statsService := statstest.NewFakeService() mockSystemStats(statsService) s := createService(t, setting.NewCfg(), store, statsService) - s.RegisterProviders([]registry.ProvidesUsageStats{provider1, provider2}) + s.RegisterProviders([]registry.ProvidesUsageStats{provider}) - m, err := s.collectAdditionalMetrics(context.Background()) + report, err := s.usageStats.GetUsageReport(context.Background()) require.NoError(t, err, "Expected no error") - assert.Equal(t, "val1", m["my_stat_1"]) - assert.Equal(t, "val2", m["my_stat_2"]) - assert.Equal(t, "valx", m["my_stat_x"]) - assert.Equal(t, "valz", m["my_stat_z"]) + assert.Equal(t, "valx", report.Metrics["my_stat_x"]) + assert.Equal(t, "valz", report.Metrics["my_stat_z"]) } func TestFeatureUsageStats(t *testing.T) { diff --git a/pkg/services/supportbundles/supportbundlesimpl/service.go b/pkg/services/supportbundles/supportbundlesimpl/service.go index df569dc408a..50d5f9f317c 100644 --- a/pkg/services/supportbundles/supportbundlesimpl/service.go +++ b/pkg/services/supportbundles/supportbundlesimpl/service.go @@ -10,6 +10,7 @@ import ( 
"github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/infra/kvstore" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/infra/usagestats" ac "github.com/grafana/grafana/pkg/services/accesscontrol" "github.com/grafana/grafana/pkg/services/auth/identity" @@ -34,6 +35,7 @@ type Service struct { pluginSettings pluginsettings.Service pluginStore pluginstore.Store store bundleStore + tracer tracing.Tracer log log.Logger encryptionPublicKeys []string @@ -55,7 +57,8 @@ func ProvideService( routeRegister routing.RouteRegister, settings setting.Provider, sql db.DB, - usageStats usagestats.Service) (*Service, error) { + usageStats usagestats.Service, + tracer tracing.Tracer) (*Service, error) { section := cfg.SectionWithEnvOverrides("support_bundles") s := &Service{ accessControl: accessControl, @@ -69,6 +72,7 @@ func ProvideService( pluginStore: pluginStore, serverAdminOnly: section.Key("server_admin_only").MustBool(true), store: newStore(kvStore), + tracer: tracer, } usageStats.RegisterMetricsFunc(s.getUsageStats) diff --git a/pkg/services/supportbundles/supportbundlesimpl/service_bundle.go b/pkg/services/supportbundles/supportbundlesimpl/service_bundle.go index fddd6488812..f6305e78ec7 100644 --- a/pkg/services/supportbundles/supportbundlesimpl/service_bundle.go +++ b/pkg/services/supportbundles/supportbundlesimpl/service_bundle.go @@ -13,6 +13,7 @@ import ( "time" "filippo.io/age" + "go.opentelemetry.io/otel/attribute" "github.com/grafana/grafana/pkg/services/supportbundles" ) @@ -66,6 +67,10 @@ func (s *Service) startBundleWork(ctx context.Context, collectors []string, uid } func (s *Service) bundle(ctx context.Context, collectors []string, uid string) ([]byte, error) { + ctxTracer, span := s.tracer.Start(ctx, "SupportBundle.bundle") + span.SetAttributes(attribute.String("SupportBundle.bundle.uid", uid)) + defer span.End() + lookup := make(map[string]bool, len(collectors)) for _, 
c := range collectors { lookup[c] = true @@ -83,7 +88,11 @@ func (s *Service) bundle(ctx context.Context, collectors []string, uid string) ( continue } - item, err := collector.Fn(ctx) + // Trace the collector run + ctxBundler, span := s.tracer.Start(ctxTracer, "SupportBundle.bundle.collector") + span.SetAttributes(attribute.String("SupportBundle.bundle.collector.uid", collector.UID)) + + item, err := collector.Fn(ctxBundler) if err != nil { s.log.Warn("Failed to collect support bundle item", "error", err, "collector", collector.UID) } @@ -92,6 +101,8 @@ func (s *Service) bundle(ctx context.Context, collectors []string, uid string) ( if item != nil { files[item.Filename] = item.FileBytes } + + span.End() } // create tar.gz file diff --git a/pkg/services/supportbundles/supportbundlesimpl/service_bundle_test.go b/pkg/services/supportbundles/supportbundlesimpl/service_bundle_test.go index 016e72d5dbb..5e3891466f0 100644 --- a/pkg/services/supportbundles/supportbundlesimpl/service_bundle_test.go +++ b/pkg/services/supportbundles/supportbundlesimpl/service_bundle_test.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/grafana/pkg/infra/kvstore" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/supportbundles" "github.com/grafana/grafana/pkg/services/supportbundles/bundleregistry" "github.com/grafana/grafana/pkg/services/user" @@ -33,6 +34,7 @@ func TestService_bundleCreate(t *testing.T) { log: log.New("test"), bundleRegistry: bundleregistry.ProvideService(), store: newStore(kvstore.NewFakeKVStore()), + tracer: tracing.InitializeTracerForTest(), } cfg := setting.NewCfg() @@ -66,6 +68,7 @@ func TestService_bundleEncryptDecrypt(t *testing.T) { bundleRegistry: bundleregistry.ProvideService(), store: newStore(kvstore.NewFakeKVStore()), encryptionPublicKeys: []string{testAgePublicKey}, + tracer: tracing.InitializeTracerForTest(), } cfg := setting.NewCfg() @@ -98,6 +101,7 @@ func 
TestService_bundleEncryptDecryptMultipleRecipients(t *testing.T) { bundleRegistry: bundleregistry.ProvideService(), store: newStore(kvstore.NewFakeKVStore()), encryptionPublicKeys: []string{testAgePublicKey, testAgePublicKey2}, + tracer: tracing.InitializeTracerForTest(), } cfg := setting.NewCfg()