mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
SSE: (Chore/Instrumentation) Add ds_queries_total metric and move met… (#66695)
* SSE: (Chore/Instrumentation) Add ds_queries_total metric and move metrics to service
This commit is contained in:
parent
45e1bfe421
commit
2f13c851e4
@ -49,6 +49,7 @@ func framesPassThroughService(t *testing.T, frames data.Frames) (data.Frames, er
|
||||
dataService: me,
|
||||
dataSourceService: &datafakes.FakeDataSourceService{},
|
||||
features: &featuremgmt.FeatureManager{},
|
||||
metrics: newMetrics(nil),
|
||||
}
|
||||
queries := []Query{{
|
||||
RefID: "A",
|
||||
|
47
pkg/expr/metrics.go
Normal file
47
pkg/expr/metrics.go
Normal file
@ -0,0 +1,47 @@
|
||||
package expr
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
metricsSubSystem = "sse"
|
||||
metricsNamespace = "grafana"
|
||||
)
|
||||
|
||||
type metrics struct {
|
||||
dsRequests *prometheus.CounterVec
|
||||
|
||||
// older metric
|
||||
expressionsQuerySummary *prometheus.SummaryVec
|
||||
}
|
||||
|
||||
func newMetrics(reg prometheus.Registerer) *metrics {
|
||||
m := &metrics{
|
||||
dsRequests: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: metricsNamespace,
|
||||
Subsystem: metricsSubSystem,
|
||||
Name: "ds_queries_total",
|
||||
Help: "Number of datasource queries made via server side expression requests",
|
||||
}, []string{"error", "dataplane"}),
|
||||
|
||||
// older (No Namespace or Subsystem)
|
||||
expressionsQuerySummary: prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "expressions_queries_duration_milliseconds",
|
||||
Help: "Expressions query summary",
|
||||
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
|
||||
},
|
||||
[]string{"status"},
|
||||
),
|
||||
}
|
||||
|
||||
if reg != nil {
|
||||
reg.MustRegister(
|
||||
m.dsRequests,
|
||||
m.expressionsQuerySummary,
|
||||
)
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
@ -230,11 +230,16 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
|
||||
}
|
||||
|
||||
responseType := "unknown"
|
||||
respStatus := "success"
|
||||
var useDataplane bool
|
||||
defer func() {
|
||||
if e != nil {
|
||||
responseType = "error"
|
||||
respStatus = "failure"
|
||||
}
|
||||
logger.Debug("Data source queried", "responseType", responseType)
|
||||
|
||||
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane)).Inc()
|
||||
}()
|
||||
|
||||
resp, err := s.dataService.QueryData(ctx, req)
|
||||
@ -259,7 +264,9 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
|
||||
return mathexp.Results{}, QueryError{RefID: dn.refID, Err: response.Error}
|
||||
}
|
||||
|
||||
if dt, use, _ := shouldUseDataplane(response.Frames, logger, s.features.IsEnabled(featuremgmt.FlagDisableSSEDataplane)); use {
|
||||
var dt data.FrameType
|
||||
dt, useDataplane, _ = shouldUseDataplane(response.Frames, logger, s.features.IsEnabled(featuremgmt.FlagDisableSSEDataplane))
|
||||
if useDataplane {
|
||||
logger.Debug("Handling SSE data source query through dataplane", "datatype", dt)
|
||||
return handleDataplaneFrames(dt.Kind(), response.Frames)
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
@ -42,14 +43,17 @@ type Service struct {
|
||||
dataService backend.QueryDataHandler
|
||||
dataSourceService datasources.DataSourceService
|
||||
features featuremgmt.FeatureToggles
|
||||
|
||||
metrics *metrics
|
||||
}
|
||||
|
||||
func ProvideService(cfg *setting.Cfg, pluginClient plugins.Client, dataSourceService datasources.DataSourceService, features featuremgmt.FeatureToggles) *Service {
|
||||
func ProvideService(cfg *setting.Cfg, pluginClient plugins.Client, dataSourceService datasources.DataSourceService, features featuremgmt.FeatureToggles, registerer prometheus.Registerer) *Service {
|
||||
return &Service{
|
||||
cfg: cfg,
|
||||
dataService: pluginClient,
|
||||
dataSourceService: dataSourceService,
|
||||
features: features,
|
||||
metrics: newMetrics(registerer),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -34,6 +34,7 @@ func TestService(t *testing.T) {
|
||||
dataService: me,
|
||||
dataSourceService: &datafakes.FakeDataSourceService{},
|
||||
features: &featuremgmt.FeatureManager{},
|
||||
metrics: newMetrics(nil),
|
||||
}
|
||||
|
||||
queries := []Query{
|
||||
|
@ -7,28 +7,10 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
)
|
||||
|
||||
var (
|
||||
expressionsQuerySummary *prometheus.SummaryVec
|
||||
)
|
||||
|
||||
func init() {
|
||||
expressionsQuerySummary = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Name: "expressions_queries_duration_milliseconds",
|
||||
Help: "Expressions query summary",
|
||||
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
|
||||
},
|
||||
[]string{"status"},
|
||||
)
|
||||
|
||||
prometheus.MustRegister(expressionsQuerySummary)
|
||||
}
|
||||
|
||||
// Request is similar to plugins.DataQuery but with the Time Ranges is per Query.
|
||||
type Request struct {
|
||||
Headers map[string]string
|
||||
@ -97,7 +79,7 @@ func (s *Service) TransformData(ctx context.Context, now time.Time, req *Request
|
||||
respStatus = "failure"
|
||||
}
|
||||
duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)
|
||||
expressionsQuerySummary.WithLabelValues(respStatus).Observe(duration)
|
||||
s.metrics.expressionsQuerySummary.WithLabelValues(respStatus).Observe(duration)
|
||||
}()
|
||||
|
||||
// Build the pipeline from the request, checking for ordering issues (e.g. loops)
|
||||
|
@ -533,7 +533,7 @@ func TestValidate(t *testing.T) {
|
||||
pluginsStore: store,
|
||||
})
|
||||
|
||||
evaluator := NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, cacheService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, &featuremgmt.FeatureManager{}), store)
|
||||
evaluator := NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, cacheService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, &featuremgmt.FeatureManager{}, nil), store)
|
||||
evalCtx := NewContext(context.Background(), u)
|
||||
|
||||
err := evaluator.Validate(evalCtx, condition)
|
||||
|
@ -781,7 +781,7 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor
|
||||
|
||||
var evaluator = evalMock
|
||||
if evalMock == nil {
|
||||
evaluator = eval.NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, nil, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, &featuremgmt.FeatureManager{}), &plugins.FakePluginStore{})
|
||||
evaluator = eval.NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, nil, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, &featuremgmt.FeatureManager{}, nil), &plugins.FakePluginStore{})
|
||||
}
|
||||
|
||||
if registry == nil {
|
||||
|
@ -446,7 +446,7 @@ func setup(t *testing.T) *testContext {
|
||||
DataSources: nil,
|
||||
SimulatePluginFailure: false,
|
||||
}
|
||||
exprService := expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, pc, fakeDatasourceService, &featuremgmt.FeatureManager{})
|
||||
exprService := expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, pc, fakeDatasourceService, &featuremgmt.FeatureManager{}, nil)
|
||||
queryService := ProvideService(setting.NewCfg(), dc, exprService, rv, ds, pc) // provider belonging to this package
|
||||
return &testContext{
|
||||
pluginContext: pc,
|
||||
|
Loading…
Reference in New Issue
Block a user