diff --git a/pkg/api/dashboard_test.go b/pkg/api/dashboard_test.go index ce80c2c137a..abb6f43b702 100644 --- a/pkg/api/dashboard_test.go +++ b/pkg/api/dashboard_test.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/simplejson" dboards "github.com/grafana/grafana/pkg/dashboards" + "github.com/grafana/grafana/pkg/infra/usagestats" "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/services/alerting" "github.com/grafana/grafana/pkg/services/dashboards" @@ -86,9 +87,35 @@ type testState struct { dashQueries []*models.GetDashboardQuery } +type usageStatsMock struct { + t *testing.T + metricsFuncs []usagestats.MetricsFunc +} + +func (usm *usageStatsMock) RegisterMetricsFunc(fn usagestats.MetricsFunc) { + usm.metricsFuncs = append(usm.metricsFuncs, fn) +} + +func (usm *usageStatsMock) GetUsageReport(_ context.Context) (usagestats.Report, error) { + all := make(map[string]interface{}) + for _, fn := range usm.metricsFuncs { + fnMetrics, err := fn() + require.NoError(usm.t, err) + + for name, value := range fnMetrics { + all[name] = value + } + } + return usagestats.Report{Metrics: all}, nil +} + +func (usm *usageStatsMock) ShouldBeReported(_ string) bool { + return true +} + func newTestLive(t *testing.T) *live.GrafanaLive { cfg := &setting.Cfg{AppURL: "http://localhost:3000/"} - gLive, err := live.ProvideService(nil, cfg, routing.NewRouteRegister(), nil, nil, nil, nil, sqlstore.InitTestDB(t)) + gLive, err := live.ProvideService(nil, cfg, routing.NewRouteRegister(), nil, nil, nil, nil, sqlstore.InitTestDB(t), &usageStatsMock{t: t}) require.NoError(t, err) return gLive } diff --git a/pkg/infra/usagestats/service/service.go b/pkg/infra/usagestats/service/service.go index 89247da7983..dd8fd32d7fd 100644 --- a/pkg/infra/usagestats/service/service.go +++ b/pkg/infra/usagestats/service/service.go @@ -12,7 +12,6 @@ import ( "github.com/grafana/grafana/pkg/login/social" "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/services/alerting" - "github.com/grafana/grafana/pkg/services/live" "github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/setting" ) @@ -24,7 +23,6 @@ type UsageStats struct { AlertingUsageStats alerting.UsageStatsQuerier PluginManager plugins.Manager SocialService social.Service - grafanaLive *live.GrafanaLive kvStore *kvstore.NamespacedKVStore log log.Logger @@ -32,23 +30,12 @@ type UsageStats struct { oauthProviders map[string]bool externalMetrics []usagestats.MetricsFunc concurrentUserStatsCache memoConcurrentUserStats - liveStats liveUsageStats startTime time.Time } -type liveUsageStats struct { - numClientsMax int - numClientsMin int - numClientsSum int - numUsersMax int - numUsersMin int - numUsersSum int - sampleCount int -} - func ProvideService(cfg *setting.Cfg, bus bus.Bus, sqlStore *sqlstore.SQLStore, alertingStats alerting.UsageStatsQuerier, pluginManager plugins.Manager, - socialService social.Service, grafanaLive *live.GrafanaLive, + socialService social.Service, kvStore kvstore.KVStore) *UsageStats { s := &UsageStats{ Cfg: cfg, @@ -57,7 +44,6 @@ func ProvideService(cfg *setting.Cfg, bus bus.Bus, sqlStore *sqlstore.SQLStore, AlertingUsageStats: alertingStats, oauthProviders: socialService.GetOAuthProviders(), PluginManager: pluginManager, - grafanaLive: grafanaLive, kvStore: kvstore.WithNamespace(kvStore, 0, "infra.usagestats"), log: log.New("infra.usagestats"), startTime: time.Now(), @@ -110,12 +96,8 @@ func (uss *UsageStats) Run(ctx context.Context) error { nextSendInterval = sendInterval sendReportTicker.Reset(nextSendInterval) } - - // always reset live stats every report tick - uss.resetLiveStats() case <-updateStatsTicker.C: uss.updateTotalStats() - uss.sampleLiveStats() case <-ctx.Done(): return ctx.Err() } diff --git a/pkg/infra/usagestats/service/usage_stats.go b/pkg/infra/usagestats/service/usage_stats.go index 3df4cb226c9..2175b077655 100644 --- a/pkg/infra/usagestats/service/usage_stats.go +++ b/pkg/infra/usagestats/service/usage_stats.go @@ -83,20 +83,6 @@ func (uss *UsageStats) GetUsageReport(ctx context.Context) (usagestats.Report, e metrics["stats.folders_viewers_can_edit.count"] = statsQuery.Result.FoldersViewersCanEdit metrics["stats.folders_viewers_can_admin.count"] = statsQuery.Result.FoldersViewersCanAdmin - liveUsersAvg := 0 - liveClientsAvg := 0 - if uss.liveStats.sampleCount > 0 { - liveUsersAvg = uss.liveStats.numUsersSum / uss.liveStats.sampleCount - liveClientsAvg = uss.liveStats.numClientsSum / uss.liveStats.sampleCount - } - metrics["stats.live_samples.count"] = uss.liveStats.sampleCount - metrics["stats.live_users_max.count"] = uss.liveStats.numUsersMax - metrics["stats.live_users_min.count"] = uss.liveStats.numUsersMin - metrics["stats.live_users_avg.count"] = liveUsersAvg - metrics["stats.live_clients_max.count"] = uss.liveStats.numClientsMax - metrics["stats.live_clients_min.count"] = uss.liveStats.numClientsMin - metrics["stats.live_clients_avg.count"] = liveClientsAvg - ossEditionCount := 1 enterpriseEditionCount := 0 if uss.Cfg.IsEnterprise { @@ -323,34 +309,6 @@ var sendUsageStats = func(uss *UsageStats, data *bytes.Buffer) { }() } -func (uss *UsageStats) sampleLiveStats() { - current := uss.grafanaLive.UsageStats() - - uss.liveStats.sampleCount++ - uss.liveStats.numClientsSum += current.NumClients - uss.liveStats.numUsersSum += current.NumUsers - - if current.NumClients > uss.liveStats.numClientsMax { - uss.liveStats.numClientsMax = current.NumClients - } - - if current.NumClients < uss.liveStats.numClientsMin { - uss.liveStats.numClientsMin = current.NumClients - } - - if current.NumUsers > uss.liveStats.numUsersMax { - uss.liveStats.numUsersMax = current.NumUsers - } - - if current.NumUsers < uss.liveStats.numUsersMin { - uss.liveStats.numUsersMin = current.NumUsers - } -} - -func (uss *UsageStats) resetLiveStats() { - uss.liveStats = liveUsageStats{} -} - func (uss *UsageStats) updateTotalStats() { if !uss.Cfg.MetricsEndpointEnabled || uss.Cfg.MetricsEndpointDisableTotalStats { return diff --git a/pkg/infra/usagestats/service/usage_stats_test.go b/pkg/infra/usagestats/service/usage_stats_test.go index e6114aa836e..22eaf3f1fcd 100644 --- a/pkg/infra/usagestats/service/usage_stats_test.go +++ b/pkg/infra/usagestats/service/usage_stats_test.go @@ -11,7 +11,6 @@ import ( "testing" "time" - "github.com/grafana/grafana/pkg/api/routing" "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/infra/kvstore" @@ -21,7 +20,6 @@ import ( "github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/plugins/manager" "github.com/grafana/grafana/pkg/services/alerting" - "github.com/grafana/grafana/pkg/services/live" "github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/setting" "github.com/stretchr/testify/assert" @@ -648,16 +646,8 @@ func createService(t *testing.T, cfg setting.Cfg) *UsageStats { AlertingUsageStats: &alertingUsageMock{}, externalMetrics: make([]usagestats.MetricsFunc, 0), PluginManager: &fakePluginManager{}, - grafanaLive: newTestLive(t), kvStore: kvstore.WithNamespace(kvstore.ProvideService(sqlStore), 0, "infra.usagestats"), log: log.New("infra.usagestats"), startTime: time.Now().Add(-1 * time.Minute), } } - -func newTestLive(t *testing.T) *live.GrafanaLive { - cfg := &setting.Cfg{AppURL: "http://localhost:3000/"} - gLive, err := live.ProvideService(nil, cfg, routing.NewRouteRegister(), nil, nil, nil, nil, sqlstore.InitTestDB(t)) - require.NoError(t, err) - return gLive -} diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index 5f61a0c8f2e..33dda1da53b 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -11,11 +11,17 @@ import ( "sync" "time" + "github.com/centrifugal/centrifuge" + "github.com/go-redis/redis/v8" + "github.com/gobwas/glob" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/live" "github.com/grafana/grafana/pkg/api/dtos" "github.com/grafana/grafana/pkg/api/response" "github.com/grafana/grafana/pkg/api/routing" "github.com/grafana/grafana/pkg/infra/localcache" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/usagestats" "github.com/grafana/grafana/pkg/middleware" "github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/plugins/manager" @@ -35,12 +41,7 @@ import ( "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/tsdb/cloudwatch" "github.com/grafana/grafana/pkg/util" - - "github.com/centrifugal/centrifuge" - "github.com/go-redis/redis/v8" - "github.com/gobwas/glob" - "github.com/grafana/grafana-plugin-sdk-go/backend" - "github.com/grafana/grafana-plugin-sdk-go/live" + "golang.org/x/sync/errgroup" "gopkg.in/macaron.v1" ) @@ -59,7 +60,8 @@ type CoreGrafanaScope struct { func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, routeRegister routing.RouteRegister, logsService *cloudwatch.LogsService, pluginManager *manager.PluginManager, cacheService *localcache.CacheService, - dataSourceCache datasources.CacheService, sqlStore *sqlstore.SQLStore) (*GrafanaLive, error) { + dataSourceCache datasources.CacheService, sqlStore *sqlstore.SQLStore, + usageStatsService usagestats.Service) (*GrafanaLive, error) { g := &GrafanaLive{ Cfg: cfg, PluginContextProvider: plugCtxProvider, @@ -73,6 +75,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r GrafanaScope: CoreGrafanaScope{ Features: make(map[string]models.ChannelHandlerFactory), }, + usageStatsService: usageStatsService, } logger.Debug("GrafanaLive initialization", "ha", g.IsHA()) @@ -325,6 +328,8 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r group.Get("/push/:streamId", g.pushWebsocketHandler) }, middleware.ReqOrgAdmin) + g.registerUsageMetrics() + return g, nil } @@ -364,11 +369,9 @@ type GrafanaLive struct { contextGetter *liveplugin.ContextGetter runStreamManager *runstream.Manager storage *database.Storage -} -type UsageStats struct { - NumClients int - NumUsers int + usageStatsService usagestats.Service + usageStats usageStats } func (g *GrafanaLive) getStreamPlugin(pluginID string) (backend.StreamHandler, error) { @@ -384,11 +387,30 @@ func (g *GrafanaLive) getStreamPlugin(pluginID string) (backend.StreamHandler, e } func (g *GrafanaLive) Run(ctx context.Context) error { + eGroup, eCtx := errgroup.WithContext(ctx) + + eGroup.Go(func() error { + updateStatsTicker := time.NewTicker(time.Minute * 30) + defer updateStatsTicker.Stop() + + for { + select { + case <-updateStatsTicker.C: + g.sampleLiveStats() + case <-ctx.Done(): + return ctx.Err() + } + } + }) + if g.runStreamManager != nil { // Only run stream manager if GrafanaLive properly initialized. - return g.runStreamManager.Run(ctx) + eGroup.Go(func() error { + return g.runStreamManager.Run(eCtx) + }) } - return nil + + return eGroup.Wait() } func (g *GrafanaLive) ChannelRuleStorage() pipeline.RuleStorage { @@ -781,13 +803,6 @@ func (g *GrafanaLive) ClientCount(orgID int64, channel string) (int, error) { return len(p.Presence), nil } -func (g *GrafanaLive) UsageStats() UsageStats { - clients := g.node.Hub().NumClients() - users := g.node.Hub().NumUsers() - - return UsageStats{NumClients: clients, NumUsers: users} -} - func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePublishCmd) response.Response { addr, err := live.ParseChannel(cmd.Channel) if err != nil { @@ -990,3 +1005,64 @@ func handleLog(msg centrifuge.LogEntry) { loggerCF.Debug(msg.Message, arr...) } } + +func (g *GrafanaLive) sampleLiveStats() { + numClients := g.node.Hub().NumClients() + numUsers := g.node.Hub().NumUsers() + + g.usageStats.sampleCount++ + g.usageStats.numClientsSum += numClients + g.usageStats.numUsersSum += numUsers + + if numClients > g.usageStats.numClientsMax { + g.usageStats.numClientsMax = numClients + } + + if numClients < g.usageStats.numClientsMin { + g.usageStats.numClientsMin = numClients + } + + if numUsers > g.usageStats.numUsersMax { + g.usageStats.numUsersMax = numUsers + } + + if numUsers < g.usageStats.numUsersMin { + g.usageStats.numUsersMin = numUsers + } +} + +func (g *GrafanaLive) registerUsageMetrics() { + g.usageStatsService.RegisterMetricsFunc(func() (map[string]interface{}, error) { + liveUsersAvg := 0 + liveClientsAvg := 0 + + if g.usageStats.sampleCount > 0 { + liveUsersAvg = g.usageStats.numUsersSum / g.usageStats.sampleCount + liveClientsAvg = g.usageStats.numClientsSum / g.usageStats.sampleCount + } + + metrics := map[string]interface{}{ + "stats.live_samples.count": g.usageStats.sampleCount, + "stats.live_users_max.count": g.usageStats.numUsersMax, + "stats.live_users_min.count": g.usageStats.numUsersMin, + "stats.live_users_avg.count": liveUsersAvg, + "stats.live_clients_max.count": g.usageStats.numClientsMax, + "stats.live_clients_min.count": g.usageStats.numClientsMin, + "stats.live_clients_avg.count": liveClientsAvg, + } + + g.usageStats = usageStats{} + + return metrics, nil + }) +} + +type usageStats struct { + numClientsMax int + numClientsMin int + numClientsSum int + numUsersMax int + numUsersMin int + numUsersSum int + sampleCount int +}