Usage Stats: Decouple from GrafanaLive (#39512)

* Usage Stats: Decouple from GrafanaLive
This commit is contained in:
Joan López de la Franca Beltran 2021-09-22 16:28:40 +02:00 committed by GitHub
parent a4d41d35d0
commit c75737c808
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 125 additions and 92 deletions

View File

@ -16,6 +16,7 @@ import (
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/components/simplejson"
dboards "github.com/grafana/grafana/pkg/dashboards"
"github.com/grafana/grafana/pkg/infra/usagestats"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/alerting"
"github.com/grafana/grafana/pkg/services/dashboards"
@ -86,9 +87,35 @@ type testState struct {
dashQueries []*models.GetDashboardQuery
}
type usageStatsMock struct {
t *testing.T
metricsFuncs []usagestats.MetricsFunc
}
func (usm *usageStatsMock) RegisterMetricsFunc(fn usagestats.MetricsFunc) {
usm.metricsFuncs = append(usm.metricsFuncs, fn)
}
func (usm *usageStatsMock) GetUsageReport(_ context.Context) (usagestats.Report, error) {
all := make(map[string]interface{})
for _, fn := range usm.metricsFuncs {
fnMetrics, err := fn()
require.NoError(usm.t, err)
for name, value := range fnMetrics {
all[name] = value
}
}
return usagestats.Report{Metrics: all}, nil
}
func (usm *usageStatsMock) ShouldBeReported(_ string) bool {
return true
}
func newTestLive(t *testing.T) *live.GrafanaLive {
cfg := &setting.Cfg{AppURL: "http://localhost:3000/"}
gLive, err := live.ProvideService(nil, cfg, routing.NewRouteRegister(), nil, nil, nil, nil, sqlstore.InitTestDB(t))
gLive, err := live.ProvideService(nil, cfg, routing.NewRouteRegister(), nil, nil, nil, nil, sqlstore.InitTestDB(t), &usageStatsMock{t: t})
require.NoError(t, err)
return gLive
}

View File

@ -12,7 +12,6 @@ import (
"github.com/grafana/grafana/pkg/login/social"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/alerting"
"github.com/grafana/grafana/pkg/services/live"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/setting"
)
@ -24,7 +23,6 @@ type UsageStats struct {
AlertingUsageStats alerting.UsageStatsQuerier
PluginManager plugins.Manager
SocialService social.Service
grafanaLive *live.GrafanaLive
kvStore *kvstore.NamespacedKVStore
log log.Logger
@ -32,23 +30,12 @@ type UsageStats struct {
oauthProviders map[string]bool
externalMetrics []usagestats.MetricsFunc
concurrentUserStatsCache memoConcurrentUserStats
liveStats liveUsageStats
startTime time.Time
}
type liveUsageStats struct {
numClientsMax int
numClientsMin int
numClientsSum int
numUsersMax int
numUsersMin int
numUsersSum int
sampleCount int
}
func ProvideService(cfg *setting.Cfg, bus bus.Bus, sqlStore *sqlstore.SQLStore,
alertingStats alerting.UsageStatsQuerier, pluginManager plugins.Manager,
socialService social.Service, grafanaLive *live.GrafanaLive,
socialService social.Service,
kvStore kvstore.KVStore) *UsageStats {
s := &UsageStats{
Cfg: cfg,
@ -57,7 +44,6 @@ func ProvideService(cfg *setting.Cfg, bus bus.Bus, sqlStore *sqlstore.SQLStore,
AlertingUsageStats: alertingStats,
oauthProviders: socialService.GetOAuthProviders(),
PluginManager: pluginManager,
grafanaLive: grafanaLive,
kvStore: kvstore.WithNamespace(kvStore, 0, "infra.usagestats"),
log: log.New("infra.usagestats"),
startTime: time.Now(),
@ -110,12 +96,8 @@ func (uss *UsageStats) Run(ctx context.Context) error {
nextSendInterval = sendInterval
sendReportTicker.Reset(nextSendInterval)
}
// always reset live stats every report tick
uss.resetLiveStats()
case <-updateStatsTicker.C:
uss.updateTotalStats()
uss.sampleLiveStats()
case <-ctx.Done():
return ctx.Err()
}

View File

@ -83,20 +83,6 @@ func (uss *UsageStats) GetUsageReport(ctx context.Context) (usagestats.Report, e
metrics["stats.folders_viewers_can_edit.count"] = statsQuery.Result.FoldersViewersCanEdit
metrics["stats.folders_viewers_can_admin.count"] = statsQuery.Result.FoldersViewersCanAdmin
liveUsersAvg := 0
liveClientsAvg := 0
if uss.liveStats.sampleCount > 0 {
liveUsersAvg = uss.liveStats.numUsersSum / uss.liveStats.sampleCount
liveClientsAvg = uss.liveStats.numClientsSum / uss.liveStats.sampleCount
}
metrics["stats.live_samples.count"] = uss.liveStats.sampleCount
metrics["stats.live_users_max.count"] = uss.liveStats.numUsersMax
metrics["stats.live_users_min.count"] = uss.liveStats.numUsersMin
metrics["stats.live_users_avg.count"] = liveUsersAvg
metrics["stats.live_clients_max.count"] = uss.liveStats.numClientsMax
metrics["stats.live_clients_min.count"] = uss.liveStats.numClientsMin
metrics["stats.live_clients_avg.count"] = liveClientsAvg
ossEditionCount := 1
enterpriseEditionCount := 0
if uss.Cfg.IsEnterprise {
@ -323,34 +309,6 @@ var sendUsageStats = func(uss *UsageStats, data *bytes.Buffer) {
}()
}
func (uss *UsageStats) sampleLiveStats() {
current := uss.grafanaLive.UsageStats()
uss.liveStats.sampleCount++
uss.liveStats.numClientsSum += current.NumClients
uss.liveStats.numUsersSum += current.NumUsers
if current.NumClients > uss.liveStats.numClientsMax {
uss.liveStats.numClientsMax = current.NumClients
}
if current.NumClients < uss.liveStats.numClientsMin {
uss.liveStats.numClientsMin = current.NumClients
}
if current.NumUsers > uss.liveStats.numUsersMax {
uss.liveStats.numUsersMax = current.NumUsers
}
if current.NumUsers < uss.liveStats.numUsersMin {
uss.liveStats.numUsersMin = current.NumUsers
}
}
func (uss *UsageStats) resetLiveStats() {
uss.liveStats = liveUsageStats{}
}
func (uss *UsageStats) updateTotalStats() {
if !uss.Cfg.MetricsEndpointEnabled || uss.Cfg.MetricsEndpointDisableTotalStats {
return

View File

@ -11,7 +11,6 @@ import (
"testing"
"time"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/kvstore"
@ -21,7 +20,6 @@ import (
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/plugins/manager"
"github.com/grafana/grafana/pkg/services/alerting"
"github.com/grafana/grafana/pkg/services/live"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/setting"
"github.com/stretchr/testify/assert"
@ -648,16 +646,8 @@ func createService(t *testing.T, cfg setting.Cfg) *UsageStats {
AlertingUsageStats: &alertingUsageMock{},
externalMetrics: make([]usagestats.MetricsFunc, 0),
PluginManager: &fakePluginManager{},
grafanaLive: newTestLive(t),
kvStore: kvstore.WithNamespace(kvstore.ProvideService(sqlStore), 0, "infra.usagestats"),
log: log.New("infra.usagestats"),
startTime: time.Now().Add(-1 * time.Minute),
}
}
func newTestLive(t *testing.T) *live.GrafanaLive {
cfg := &setting.Cfg{AppURL: "http://localhost:3000/"}
gLive, err := live.ProvideService(nil, cfg, routing.NewRouteRegister(), nil, nil, nil, nil, sqlstore.InitTestDB(t))
require.NoError(t, err)
return gLive
}

View File

@ -11,11 +11,17 @@ import (
"sync"
"time"
"github.com/centrifugal/centrifuge"
"github.com/go-redis/redis/v8"
"github.com/gobwas/glob"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/live"
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/api/response"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/localcache"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/usagestats"
"github.com/grafana/grafana/pkg/middleware"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins/manager"
@ -35,12 +41,7 @@ import (
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch"
"github.com/grafana/grafana/pkg/util"
"github.com/centrifugal/centrifuge"
"github.com/go-redis/redis/v8"
"github.com/gobwas/glob"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/live"
"golang.org/x/sync/errgroup"
"gopkg.in/macaron.v1"
)
@ -59,7 +60,8 @@ type CoreGrafanaScope struct {
func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, routeRegister routing.RouteRegister,
logsService *cloudwatch.LogsService, pluginManager *manager.PluginManager, cacheService *localcache.CacheService,
dataSourceCache datasources.CacheService, sqlStore *sqlstore.SQLStore) (*GrafanaLive, error) {
dataSourceCache datasources.CacheService, sqlStore *sqlstore.SQLStore,
usageStatsService usagestats.Service) (*GrafanaLive, error) {
g := &GrafanaLive{
Cfg: cfg,
PluginContextProvider: plugCtxProvider,
@ -73,6 +75,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r
GrafanaScope: CoreGrafanaScope{
Features: make(map[string]models.ChannelHandlerFactory),
},
usageStatsService: usageStatsService,
}
logger.Debug("GrafanaLive initialization", "ha", g.IsHA())
@ -325,6 +328,8 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r
group.Get("/push/:streamId", g.pushWebsocketHandler)
}, middleware.ReqOrgAdmin)
g.registerUsageMetrics()
return g, nil
}
@ -364,11 +369,9 @@ type GrafanaLive struct {
contextGetter *liveplugin.ContextGetter
runStreamManager *runstream.Manager
storage *database.Storage
}
type UsageStats struct {
NumClients int
NumUsers int
usageStatsService usagestats.Service
usageStats usageStats
}
func (g *GrafanaLive) getStreamPlugin(pluginID string) (backend.StreamHandler, error) {
@ -384,11 +387,30 @@ func (g *GrafanaLive) getStreamPlugin(pluginID string) (backend.StreamHandler, e
}
func (g *GrafanaLive) Run(ctx context.Context) error {
eGroup, eCtx := errgroup.WithContext(ctx)
eGroup.Go(func() error {
updateStatsTicker := time.NewTicker(time.Minute * 30)
defer updateStatsTicker.Stop()
for {
select {
case <-updateStatsTicker.C:
g.sampleLiveStats()
case <-ctx.Done():
return ctx.Err()
}
}
})
if g.runStreamManager != nil {
// Only run stream manager if GrafanaLive properly initialized.
return g.runStreamManager.Run(ctx)
eGroup.Go(func() error {
return g.runStreamManager.Run(eCtx)
})
}
return nil
return eGroup.Wait()
}
func (g *GrafanaLive) ChannelRuleStorage() pipeline.RuleStorage {
@ -781,13 +803,6 @@ func (g *GrafanaLive) ClientCount(orgID int64, channel string) (int, error) {
return len(p.Presence), nil
}
func (g *GrafanaLive) UsageStats() UsageStats {
clients := g.node.Hub().NumClients()
users := g.node.Hub().NumUsers()
return UsageStats{NumClients: clients, NumUsers: users}
}
func (g *GrafanaLive) HandleHTTPPublish(ctx *models.ReqContext, cmd dtos.LivePublishCmd) response.Response {
addr, err := live.ParseChannel(cmd.Channel)
if err != nil {
@ -990,3 +1005,64 @@ func handleLog(msg centrifuge.LogEntry) {
loggerCF.Debug(msg.Message, arr...)
}
}
func (g *GrafanaLive) sampleLiveStats() {
numClients := g.node.Hub().NumClients()
numUsers := g.node.Hub().NumUsers()
g.usageStats.sampleCount++
g.usageStats.numClientsSum += numClients
g.usageStats.numUsersSum += numUsers
if numClients > g.usageStats.numClientsMax {
g.usageStats.numClientsMax = numClients
}
if numClients < g.usageStats.numClientsMin {
g.usageStats.numClientsMin = numClients
}
if numUsers > g.usageStats.numUsersMax {
g.usageStats.numUsersMax = numUsers
}
if numUsers < g.usageStats.numUsersMin {
g.usageStats.numUsersMin = numUsers
}
}
func (g *GrafanaLive) registerUsageMetrics() {
g.usageStatsService.RegisterMetricsFunc(func() (map[string]interface{}, error) {
liveUsersAvg := 0
liveClientsAvg := 0
if g.usageStats.sampleCount > 0 {
liveUsersAvg = g.usageStats.numUsersSum / g.usageStats.sampleCount
liveClientsAvg = g.usageStats.numClientsSum / g.usageStats.sampleCount
}
metrics := map[string]interface{}{
"stats.live_samples.count": g.usageStats.sampleCount,
"stats.live_users_max.count": g.usageStats.numUsersMax,
"stats.live_users_min.count": g.usageStats.numUsersMin,
"stats.live_users_avg.count": liveUsersAvg,
"stats.live_clients_max.count": g.usageStats.numClientsMax,
"stats.live_clients_min.count": g.usageStats.numClientsMin,
"stats.live_clients_avg.count": liveClientsAvg,
}
g.usageStats = usageStats{}
return metrics, nil
})
}
type usageStats struct {
numClientsMax int
numClientsMin int
numClientsSum int
numUsersMax int
numUsersMin int
numUsersSum int
sampleCount int
}