mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Plugins: Improve instrumentation by adding metrics and tracing (#61035)
* WIP: Plugins tracing * Trace ID middleware * Add prometheus metrics and tracing to plugins updater * Add TODOs * Add instrumented http client * Add tracing to grafana update checker * Goimports * Moved plugins tracing to middleware * goimports, fix tests * Removed X-Trace-Id header * Fix comment in NewTracingHeaderMiddleware * Add metrics to instrumented http client * Add instrumented http client options * Removed unused function * Switch to contextual logger * Refactoring, fix tests * Moved InstrumentedHTTPClient and PrometheusMetrics to their own package * Tracing middleware: handle errors * Report span status codes when recording errors * Add tests for tracing middleware * Moved fakeSpan and fakeTracer to pkg/infra/tracing * Add TestHTTPClientTracing * Lint * Changes after PR review * Tests: Made "ended" in FakeSpan private, allow calling End only once * Testing: panic in FakeSpan if span already ended * Refactoring: Simplify Grafana updater checks * Refactoring: Simplify plugins updater error checks and logs * Fix wrong call to checkForUpdates -> instrumentedCheckForUpdates * Tests: Fix wrong call to checkForUpdates -> instrumentedCheckForUpdates * Log update checks duration, use Info log level for check succeeded logs * Add plugin context span attributes in tracing_middleware * Refactor prometheus metrics as httpclient middleware * Fix call to ProvidePluginsService in plugins_test.go * Propagate context to update checker outgoing http requests * Plugin client tracing middleware: Removed operation name in status * Fix tests * Goimports tracing_middleware.go * Goimports * Fix imports * Changed span name to plugins client middleware * Add span name assertion in TestTracingMiddleware * Removed Prometheus metrics middleware from grafana and plugins updatechecker * Add span attributes for ds name, type, uid, panel and dashboard ids * Fix http header reading in tracing middlewares * Use contexthandler.FromContext, add X-Query-Group-Id * Add test for RunStream * Fix imports * Changes from PR review * TestTracingMiddleware: Changed assert to require for didPanic assertion * Lint * Fix imports
This commit is contained in:
parent
0beb768427
commit
a89202eab2
@ -15,6 +15,7 @@ import (
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/infra/localcache"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin/provider"
|
||||
"github.com/grafana/grafana/pkg/plugins/config"
|
||||
@ -103,7 +104,7 @@ func TestCallResource(t *testing.T) {
|
||||
req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
|
||||
return errors.New("something went wrong")
|
||||
}),
|
||||
}, pluginsintegration.CreateMiddlewares(cfg, &oauthtokentest.Service{})...)
|
||||
}, pluginsintegration.CreateMiddlewares(cfg, &oauthtokentest.Service{}, tracing.InitializeTracerForTest())...)
|
||||
require.NoError(t, err)
|
||||
|
||||
srv = SetupAPITestServer(t, func(hs *HTTPServer) {
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
|
||||
"github.com/grafana/grafana/pkg/api/dtos"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/plugins/config"
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/fakes"
|
||||
@ -693,7 +694,9 @@ func Test_PluginsList_AccessControl(t *testing.T) {
|
||||
hs.Cfg = setting.NewCfg()
|
||||
hs.PluginSettings = &pluginSettings
|
||||
hs.pluginStore = pluginStore
|
||||
hs.pluginsUpdateChecker = updatechecker.ProvidePluginsService(hs.Cfg, pluginStore)
|
||||
var err error
|
||||
hs.pluginsUpdateChecker, err = updatechecker.ProvidePluginsService(hs.Cfg, pluginStore, tracing.InitializeTracerForTest())
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
res, err := server.Send(webtest.RequestWithSignedInUser(server.NewGetRequest("/api/plugins"), userWithPermissions(1, tc.permissions)))
|
||||
|
@ -0,0 +1,88 @@
|
||||
package httpclientprovider
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// PrometheusMetrics groups some metrics for a PrometheusMetricsMiddleware
|
||||
type PrometheusMetrics struct {
|
||||
requestsCounter prometheus.Counter
|
||||
failureCounter prometheus.Counter
|
||||
durationSecondsHistogram prometheus.Histogram
|
||||
inFlightGauge prometheus.Gauge
|
||||
}
|
||||
|
||||
// NewPrometheusMetricsMiddleware returns a new *PrometheusMetrics with pre-filled metrics, with the specified prefix
|
||||
func NewPrometheusMetricsMiddleware(prefix string) *PrometheusMetrics {
|
||||
return &PrometheusMetrics{
|
||||
requestsCounter: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: prefix + "_request_total",
|
||||
}),
|
||||
failureCounter: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: prefix + "_failure_total",
|
||||
}),
|
||||
durationSecondsHistogram: prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
Name: prefix + "_request_duration_seconds",
|
||||
}),
|
||||
inFlightGauge: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: prefix + "_in_flight_request",
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
// Register registers the metrics in the current PrometheusMetrics into the provided registry
|
||||
func (m *PrometheusMetrics) Register(registry prometheus.Registerer) error {
|
||||
for _, collector := range []prometheus.Collector{
|
||||
m.requestsCounter, m.failureCounter, m.durationSecondsHistogram, m.inFlightGauge,
|
||||
} {
|
||||
if err := registry.Register(collector); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// MustRegister is like Register, but, in case of failure, it panics instead of returning an error
|
||||
func (m *PrometheusMetrics) MustRegister(registry prometheus.Registerer) {
|
||||
if err := m.Register(registry); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
// WithMustRegister calls MustRegister and returns itself. This is to allow to chain the method call
|
||||
// upon initialization, useful when declaring metrics in the global scope:
|
||||
//
|
||||
// var svcMetrics = NewPrometheusMetricsMiddleware("my_client").WithMustRegister(prometheus.DefaultRegisterer)
|
||||
func (m *PrometheusMetrics) WithMustRegister(registry prometheus.Registerer) *PrometheusMetrics {
|
||||
m.MustRegister(registry)
|
||||
return m
|
||||
}
|
||||
|
||||
// PrometheusMetricsMiddleware is a middleware that will mutate the in flight, requests, duration and
|
||||
// failure count on the provided *PrometheusMetrics instance. This can be used to count the number of requests,
|
||||
// successful requests and errors that go through the httpclient, as well as to track the response times.
|
||||
// For the metrics to be exposed properly, the provided *PrometheusMetrics should already be registered in a Prometheus
|
||||
// registry.
|
||||
func PrometheusMetricsMiddleware(metrics *PrometheusMetrics) httpclient.Middleware {
|
||||
return httpclient.MiddlewareFunc(func(opts httpclient.Options, next http.RoundTripper) http.RoundTripper {
|
||||
return httpclient.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
|
||||
startTime := time.Now()
|
||||
metrics.inFlightGauge.Inc()
|
||||
|
||||
res, err := next.RoundTrip(req)
|
||||
|
||||
metrics.inFlightGauge.Dec()
|
||||
metrics.requestsCounter.Inc()
|
||||
metrics.durationSecondsHistogram.Observe(time.Since(startTime).Seconds())
|
||||
if err != nil || (res != nil && !(res.StatusCode >= 200 && res.StatusCode <= 299)) {
|
||||
metrics.failureCounter.Inc()
|
||||
}
|
||||
|
||||
return res, err
|
||||
})
|
||||
})
|
||||
}
|
@ -0,0 +1,94 @@
|
||||
package httpclientprovider
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
||||
"github.com/prometheus/client_golang/prometheus/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPrometheusMetricsMiddleware(t *testing.T) {
|
||||
noOpHandlerFunc := func(writer http.ResponseWriter, request *http.Request) {}
|
||||
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
handler http.HandlerFunc
|
||||
assert func(t *testing.T, metrics *PrometheusMetrics)
|
||||
}{
|
||||
{
|
||||
name: "successful",
|
||||
assert: func(t *testing.T, metrics *PrometheusMetrics) {
|
||||
require.Equal(t, float64(0), testutil.ToFloat64(metrics.inFlightGauge))
|
||||
require.Equal(t, float64(1), testutil.ToFloat64(metrics.requestsCounter))
|
||||
require.Equal(t, float64(0), testutil.ToFloat64(metrics.failureCounter))
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "failure",
|
||||
handler: func(writer http.ResponseWriter, request *http.Request) {
|
||||
writer.WriteHeader(http.StatusInternalServerError)
|
||||
},
|
||||
assert: func(t *testing.T, metrics *PrometheusMetrics) {
|
||||
require.Equal(t, float64(0), testutil.ToFloat64(metrics.inFlightGauge))
|
||||
require.Equal(t, float64(1), testutil.ToFloat64(metrics.requestsCounter))
|
||||
require.Equal(t, float64(1), testutil.ToFloat64(metrics.failureCounter))
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
// Create metrics and make sure they are 0
|
||||
metrics := NewPrometheusMetricsMiddleware("test")
|
||||
require.Equal(t, float64(0), testutil.ToFloat64(metrics.inFlightGauge))
|
||||
require.Equal(t, float64(0), testutil.ToFloat64(metrics.requestsCounter))
|
||||
require.Equal(t, float64(0), testutil.ToFloat64(metrics.failureCounter))
|
||||
|
||||
// Set up test server
|
||||
// Default to noOpHandlerFunc if it's not provided in test case
|
||||
h := tc.handler
|
||||
if h == nil {
|
||||
h = noOpHandlerFunc
|
||||
}
|
||||
srv := httptest.NewServer(h)
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
// Make request with the prometheus handling middleware
|
||||
cl, err := httpclient.New(httpclient.Options{
|
||||
Middlewares: []httpclient.Middleware{PrometheusMetricsMiddleware(metrics)},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
resp, err := cl.Get(srv.URL)
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
|
||||
// Run test-case-specific assertions
|
||||
tc.assert(t, metrics)
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("in flight", func(t *testing.T) {
|
||||
metrics := NewPrometheusMetricsMiddleware("test")
|
||||
require.Equal(t, float64(0), testutil.ToFloat64(metrics.inFlightGauge))
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
|
||||
// Assert in-flight requests
|
||||
require.Equal(t, float64(1), testutil.ToFloat64(metrics.inFlightGauge), "in flight should increase during request")
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
cl, err := httpclient.New(httpclient.Options{
|
||||
Middlewares: []httpclient.Middleware{PrometheusMetricsMiddleware(metrics)},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
resp, err := cl.Get(srv.URL)
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
require.Equal(t, float64(0), testutil.ToFloat64(metrics.inFlightGauge), "in flight should decrease after response")
|
||||
})
|
||||
}
|
@ -7,12 +7,13 @@ import (
|
||||
"strconv"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -1,8 +1,14 @@
|
||||
package tracing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/sdk/trace/tracetest"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func InitializeTracerForTest() Tracer {
|
||||
@ -14,3 +20,95 @@ func InitializeTracerForTest() Tracer {
|
||||
_ = ots.initOpentelemetryTracer()
|
||||
return ots
|
||||
}
|
||||
|
||||
type FakeSpan struct {
|
||||
Name string
|
||||
|
||||
ended bool
|
||||
Attributes map[attribute.Key]attribute.Value
|
||||
StatusCode codes.Code
|
||||
Description string
|
||||
Err error
|
||||
Events map[string]EventValue
|
||||
}
|
||||
|
||||
func newFakeSpan(name string) *FakeSpan {
|
||||
return &FakeSpan{
|
||||
Name: name,
|
||||
Attributes: map[attribute.Key]attribute.Value{},
|
||||
Events: map[string]EventValue{},
|
||||
}
|
||||
}
|
||||
|
||||
func (t *FakeSpan) End() {
|
||||
if t.ended {
|
||||
panic("End already called")
|
||||
}
|
||||
t.ended = true
|
||||
}
|
||||
|
||||
func (t *FakeSpan) IsEnded() bool {
|
||||
return t.ended
|
||||
}
|
||||
|
||||
func (t *FakeSpan) SetAttributes(key string, value interface{}, kv attribute.KeyValue) {
|
||||
if t.IsEnded() {
|
||||
panic("span already ended")
|
||||
}
|
||||
t.Attributes[kv.Key] = kv.Value
|
||||
}
|
||||
|
||||
func (t *FakeSpan) SetName(name string) {
|
||||
if t.IsEnded() {
|
||||
panic("span already ended")
|
||||
}
|
||||
t.Name = name
|
||||
}
|
||||
|
||||
func (t *FakeSpan) SetStatus(code codes.Code, description string) {
|
||||
if t.IsEnded() {
|
||||
panic("span already ended")
|
||||
}
|
||||
t.StatusCode = code
|
||||
t.Description = description
|
||||
}
|
||||
|
||||
func (t *FakeSpan) RecordError(err error, options ...trace.EventOption) {
|
||||
if t.IsEnded() {
|
||||
panic("span already ended")
|
||||
}
|
||||
t.Err = err
|
||||
}
|
||||
|
||||
func (t *FakeSpan) AddEvents(keys []string, values []EventValue) {
|
||||
if t.IsEnded() {
|
||||
panic("span already ended")
|
||||
}
|
||||
if len(keys) != len(values) {
|
||||
panic("different number of keys and values")
|
||||
}
|
||||
for i := 0; i < len(keys); i++ {
|
||||
t.Events[keys[i]] = values[i]
|
||||
}
|
||||
}
|
||||
|
||||
type FakeTracer struct {
|
||||
Spans []*FakeSpan
|
||||
}
|
||||
|
||||
func (t *FakeTracer) Run(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *FakeTracer) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, Span) {
|
||||
span := newFakeSpan(spanName)
|
||||
t.Spans = append(t.Spans, span)
|
||||
return ctx, span
|
||||
}
|
||||
|
||||
func (t *FakeTracer) Inject(ctx context.Context, header http.Header, span Span) {
|
||||
}
|
||||
|
||||
func NewFakeTracer() *FakeTracer {
|
||||
return &FakeTracer{Spans: []*FakeSpan{}}
|
||||
}
|
||||
|
@ -18,9 +18,13 @@ import (
|
||||
|
||||
type TestClient struct {
|
||||
plugins.Client
|
||||
QueryDataFunc backend.QueryDataHandlerFunc
|
||||
CallResourceFunc backend.CallResourceHandlerFunc
|
||||
CheckHealthFunc backend.CheckHealthHandlerFunc
|
||||
QueryDataFunc backend.QueryDataHandlerFunc
|
||||
CallResourceFunc backend.CallResourceHandlerFunc
|
||||
CheckHealthFunc backend.CheckHealthHandlerFunc
|
||||
CollectMetricsFunc backend.CollectMetricsHandlerFunc
|
||||
SubscribeStreamFunc func(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error)
|
||||
PublishStreamFunc func(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error)
|
||||
RunStreamFunc func(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error
|
||||
}
|
||||
|
||||
func (c *TestClient) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
@ -47,6 +51,37 @@ func (c *TestClient) CheckHealth(ctx context.Context, req *backend.CheckHealthRe
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *TestClient) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) {
|
||||
if c.CollectMetricsFunc != nil {
|
||||
return c.CollectMetricsFunc(ctx, req)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *TestClient) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
|
||||
if c.PublishStreamFunc != nil {
|
||||
return c.PublishStreamFunc(ctx, req)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *TestClient) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
|
||||
if c.SubscribeStreamFunc != nil {
|
||||
return c.SubscribeStreamFunc(ctx, req)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *TestClient) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
||||
if c.RunStreamFunc != nil {
|
||||
return c.RunStreamFunc(ctx, req, sender)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type MiddlewareScenarioContext struct {
|
||||
QueryDataCallChain []string
|
||||
CallResourceCallChain []string
|
||||
@ -125,18 +160,24 @@ func (m *TestMiddleware) RunStream(ctx context.Context, req *backend.RunStreamRe
|
||||
var _ plugins.Client = &TestClient{}
|
||||
|
||||
type ClientDecoratorTest struct {
|
||||
T *testing.T
|
||||
Context context.Context
|
||||
TestClient *TestClient
|
||||
Middlewares []plugins.ClientMiddleware
|
||||
Decorator *client.Decorator
|
||||
ReqContext *contextmodel.ReqContext
|
||||
QueryDataReq *backend.QueryDataRequest
|
||||
QueryDataCtx context.Context
|
||||
CallResourceReq *backend.CallResourceRequest
|
||||
CallResourceCtx context.Context
|
||||
CheckHealthReq *backend.CheckHealthRequest
|
||||
CheckHealthCtx context.Context
|
||||
T *testing.T
|
||||
Context context.Context
|
||||
TestClient *TestClient
|
||||
Middlewares []plugins.ClientMiddleware
|
||||
Decorator *client.Decorator
|
||||
ReqContext *contextmodel.ReqContext
|
||||
QueryDataReq *backend.QueryDataRequest
|
||||
QueryDataCtx context.Context
|
||||
CallResourceReq *backend.CallResourceRequest
|
||||
CallResourceCtx context.Context
|
||||
CheckHealthReq *backend.CheckHealthRequest
|
||||
CheckHealthCtx context.Context
|
||||
CollectMetricsReq *backend.CollectMetricsRequest
|
||||
CollectMetricsCtx context.Context
|
||||
SubscribeStreamReq *backend.SubscribeStreamRequest
|
||||
SubscribeStreamCtx context.Context
|
||||
PublishStreamReq *backend.PublishStreamRequest
|
||||
PublishStreamCtx context.Context
|
||||
}
|
||||
|
||||
type ClientDecoratorTestOption func(*ClientDecoratorTest)
|
||||
@ -162,6 +203,21 @@ func NewClientDecoratorTest(t *testing.T, opts ...ClientDecoratorTestOption) *Cl
|
||||
cdt.CheckHealthCtx = ctx
|
||||
return nil, nil
|
||||
},
|
||||
CollectMetricsFunc: func(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) {
|
||||
cdt.CollectMetricsReq = req
|
||||
cdt.CollectMetricsCtx = ctx
|
||||
return nil, nil
|
||||
},
|
||||
SubscribeStreamFunc: func(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
|
||||
cdt.SubscribeStreamReq = req
|
||||
cdt.SubscribeStreamCtx = ctx
|
||||
return nil, nil
|
||||
},
|
||||
PublishStreamFunc: func(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
|
||||
cdt.PublishStreamReq = req
|
||||
cdt.PublishStreamCtx = ctx
|
||||
return nil, nil
|
||||
},
|
||||
}
|
||||
require.NotNil(t, cdt)
|
||||
|
||||
|
@ -51,8 +51,8 @@ type MockAMConfigStore_GetLatestAlertmanagerConfiguration_Call struct {
|
||||
}
|
||||
|
||||
// GetLatestAlertmanagerConfiguration is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - query *models.GetLatestAlertmanagerConfigurationQuery
|
||||
// - ctx context.Context
|
||||
// - query *models.GetLatestAlertmanagerConfigurationQuery
|
||||
func (_e *MockAMConfigStore_Expecter) GetLatestAlertmanagerConfiguration(ctx interface{}, query interface{}) *MockAMConfigStore_GetLatestAlertmanagerConfiguration_Call {
|
||||
return &MockAMConfigStore_GetLatestAlertmanagerConfiguration_Call{Call: _e.mock.On("GetLatestAlertmanagerConfiguration", ctx, query)}
|
||||
}
|
||||
@ -89,8 +89,8 @@ type MockAMConfigStore_UpdateAlertmanagerConfiguration_Call struct {
|
||||
}
|
||||
|
||||
// UpdateAlertmanagerConfiguration is a helper method to define mock.On call
|
||||
// - ctx context.Context
|
||||
// - cmd *models.SaveAlertmanagerConfigurationCmd
|
||||
// - ctx context.Context
|
||||
// - cmd *models.SaveAlertmanagerConfigurationCmd
|
||||
func (_e *MockAMConfigStore_Expecter) UpdateAlertmanagerConfiguration(ctx interface{}, cmd interface{}) *MockAMConfigStore_UpdateAlertmanagerConfiguration_Call {
|
||||
return &MockAMConfigStore_UpdateAlertmanagerConfiguration_Call{Call: _e.mock.On("UpdateAlertmanagerConfiguration", ctx, cmd)}
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/services/contexthandler"
|
||||
"github.com/grafana/grafana/pkg/services/query"
|
||||
|
@ -0,0 +1,135 @@
|
||||
package clientmiddleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/services/contexthandler"
|
||||
"github.com/grafana/grafana/pkg/services/query"
|
||||
)
|
||||
|
||||
// NewTracingMiddleware returns a new middleware that creates a new span on every method call.
|
||||
func NewTracingMiddleware(tracer tracing.Tracer) plugins.ClientMiddleware {
|
||||
return plugins.ClientMiddlewareFunc(func(next plugins.Client) plugins.Client {
|
||||
return &TracingMiddleware{
|
||||
tracer: tracer,
|
||||
next: next,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
type TracingMiddleware struct {
|
||||
tracer tracing.Tracer
|
||||
next plugins.Client
|
||||
}
|
||||
|
||||
// setSpanAttributeFromHTTPHeader takes a ReqContext and a span, and adds the specified HTTP header as a span attribute
|
||||
// (string value), if the header is present.
|
||||
func setSpanAttributeFromHTTPHeader(headers http.Header, span tracing.Span, attributeName, headerName string) {
|
||||
// Set the attribute as string
|
||||
if v := headers.Get(headerName); v != "" {
|
||||
span.SetAttributes(attributeName, v, attribute.Key(attributeName).String(v))
|
||||
}
|
||||
}
|
||||
|
||||
// traceWrap returns a new context.Context which wraps a newly created span. The span will also contain attributes for
|
||||
// plugin id, org id, user login, ds, dashboard and panel info. The second function returned is a cleanup function,
|
||||
// which should be called by the caller (deferred) and will set the span status/error and end the span.
|
||||
func (m *TracingMiddleware) traceWrap(
|
||||
ctx context.Context, pluginContext backend.PluginContext, opName string,
|
||||
) (context.Context, func(error)) {
|
||||
// Start span
|
||||
ctx, span := m.tracer.Start(ctx, "PluginClient."+opName)
|
||||
|
||||
// Attach some plugin context information to span
|
||||
span.SetAttributes("plugin_id", pluginContext.PluginID, attribute.String("plugin_id", pluginContext.PluginID))
|
||||
span.SetAttributes("org_id", pluginContext.OrgID, attribute.Int64("org_id", pluginContext.OrgID))
|
||||
if settings := pluginContext.DataSourceInstanceSettings; settings != nil {
|
||||
span.SetAttributes("datasource_name", settings.Name, attribute.Key("datasource_name").String(settings.Name))
|
||||
span.SetAttributes("datasource_uid", settings.UID, attribute.Key("datasource_uid").String(settings.UID))
|
||||
}
|
||||
if u := pluginContext.User; u != nil {
|
||||
span.SetAttributes("user", u.Login, attribute.String("user", u.Login))
|
||||
}
|
||||
|
||||
// Additional attributes from http headers
|
||||
if reqCtx := contexthandler.FromContext(ctx); reqCtx != nil && reqCtx.Req != nil && len(reqCtx.Req.Header) > 0 {
|
||||
if v, err := strconv.Atoi(reqCtx.Req.Header.Get(query.HeaderPanelID)); err == nil {
|
||||
span.SetAttributes("panel_id", v, attribute.Key("panel_id").Int(v))
|
||||
}
|
||||
setSpanAttributeFromHTTPHeader(reqCtx.Req.Header, span, "query_group_id", query.HeaderQueryGroupID)
|
||||
setSpanAttributeFromHTTPHeader(reqCtx.Req.Header, span, "dashboard_uid", query.HeaderDashboardUID)
|
||||
}
|
||||
|
||||
// Return ctx with span + cleanup func
|
||||
return ctx, func(err error) {
|
||||
if err != nil {
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
span.RecordError(err)
|
||||
}
|
||||
span.End()
|
||||
}
|
||||
}
|
||||
|
||||
func (m *TracingMiddleware) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
var err error
|
||||
ctx, end := m.traceWrap(ctx, req.PluginContext, "queryData")
|
||||
defer func() { end(err) }()
|
||||
resp, err := m.next.QueryData(ctx, req)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (m *TracingMiddleware) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
|
||||
var err error
|
||||
ctx, end := m.traceWrap(ctx, req.PluginContext, "callResource")
|
||||
defer func() { end(err) }()
|
||||
err = m.next.CallResource(ctx, req, sender)
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *TracingMiddleware) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
|
||||
var err error
|
||||
ctx, end := m.traceWrap(ctx, req.PluginContext, "checkHealth")
|
||||
defer func() { end(err) }()
|
||||
resp, err := m.next.CheckHealth(ctx, req)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (m *TracingMiddleware) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) {
|
||||
var err error
|
||||
ctx, end := m.traceWrap(ctx, req.PluginContext, "collectMetrics")
|
||||
defer func() { end(err) }()
|
||||
resp, err := m.next.CollectMetrics(ctx, req)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (m *TracingMiddleware) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
|
||||
var err error
|
||||
ctx, end := m.traceWrap(ctx, req.PluginContext, "subscribeStream")
|
||||
defer func() { end(err) }()
|
||||
resp, err := m.next.SubscribeStream(ctx, req)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (m *TracingMiddleware) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
|
||||
var err error
|
||||
ctx, end := m.traceWrap(ctx, req.PluginContext, "publishStream")
|
||||
defer func() { end(err) }()
|
||||
resp, err := m.next.PublishStream(ctx, req)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (m *TracingMiddleware) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
||||
var err error
|
||||
ctx, end := m.traceWrap(ctx, req.PluginContext, "runStream")
|
||||
defer func() { end(err) }()
|
||||
err = m.next.RunStream(ctx, req, sender)
|
||||
return err
|
||||
}
|
@ -0,0 +1,394 @@
|
||||
package clientmiddleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/plugins/manager/client/clienttest"
|
||||
"github.com/grafana/grafana/pkg/services/contexthandler/ctxkey"
|
||||
contextmodel "github.com/grafana/grafana/pkg/services/contexthandler/model"
|
||||
"github.com/grafana/grafana/pkg/web"
|
||||
)
|
||||
|
||||
func TestTracingMiddleware(t *testing.T) {
|
||||
pluginCtx := backend.PluginContext{
|
||||
DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{},
|
||||
}
|
||||
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
run func(pluginCtx backend.PluginContext, cdt *clienttest.ClientDecoratorTest) error
|
||||
expSpanName string
|
||||
}{
|
||||
{
|
||||
name: "QueryData",
|
||||
run: func(pluginCtx backend.PluginContext, cdt *clienttest.ClientDecoratorTest) error {
|
||||
_, err := cdt.Decorator.QueryData(context.Background(), &backend.QueryDataRequest{
|
||||
PluginContext: pluginCtx,
|
||||
})
|
||||
return err
|
||||
},
|
||||
expSpanName: "PluginClient.queryData",
|
||||
},
|
||||
{
|
||||
name: "CallResource",
|
||||
run: func(pluginCtx backend.PluginContext, cdt *clienttest.ClientDecoratorTest) error {
|
||||
return cdt.Decorator.CallResource(context.Background(), &backend.CallResourceRequest{
|
||||
PluginContext: pluginCtx,
|
||||
}, nopCallResourceSender)
|
||||
},
|
||||
expSpanName: "PluginClient.callResource",
|
||||
},
|
||||
{
|
||||
name: "CheckHealth",
|
||||
run: func(pluginCtx backend.PluginContext, cdt *clienttest.ClientDecoratorTest) error {
|
||||
_, err := cdt.Decorator.CheckHealth(context.Background(), &backend.CheckHealthRequest{
|
||||
PluginContext: pluginCtx,
|
||||
})
|
||||
return err
|
||||
},
|
||||
expSpanName: "PluginClient.checkHealth",
|
||||
},
|
||||
{
|
||||
name: "CollectMetrics",
|
||||
run: func(pluginCtx backend.PluginContext, cdt *clienttest.ClientDecoratorTest) error {
|
||||
_, err := cdt.Decorator.CollectMetrics(context.Background(), &backend.CollectMetricsRequest{
|
||||
PluginContext: pluginCtx,
|
||||
})
|
||||
return err
|
||||
},
|
||||
expSpanName: "PluginClient.collectMetrics",
|
||||
},
|
||||
{
|
||||
name: "SubscribeStream",
|
||||
run: func(pluginCtx backend.PluginContext, cdt *clienttest.ClientDecoratorTest) error {
|
||||
_, err := cdt.Decorator.SubscribeStream(context.Background(), &backend.SubscribeStreamRequest{
|
||||
PluginContext: pluginCtx,
|
||||
})
|
||||
return err
|
||||
},
|
||||
expSpanName: "PluginClient.subscribeStream",
|
||||
},
|
||||
{
|
||||
name: "PublishStream",
|
||||
run: func(pluginCtx backend.PluginContext, cdt *clienttest.ClientDecoratorTest) error {
|
||||
_, err := cdt.Decorator.PublishStream(context.Background(), &backend.PublishStreamRequest{
|
||||
PluginContext: pluginCtx,
|
||||
})
|
||||
return err
|
||||
},
|
||||
expSpanName: "PluginClient.publishStream",
|
||||
},
|
||||
{
|
||||
name: "RunStream",
|
||||
run: func(pluginCtx backend.PluginContext, cdt *clienttest.ClientDecoratorTest) error {
|
||||
return cdt.Decorator.RunStream(context.Background(), &backend.RunStreamRequest{
|
||||
PluginContext: pluginCtx,
|
||||
}, &backend.StreamSender{})
|
||||
},
|
||||
expSpanName: "PluginClient.runStream",
|
||||
},
|
||||
} {
|
||||
t.Run("Creates spans on "+tc.name, func(t *testing.T) {
|
||||
t.Run("successful", func(t *testing.T) {
|
||||
tracer := tracing.NewFakeTracer()
|
||||
|
||||
cdt := clienttest.NewClientDecoratorTest(
|
||||
t,
|
||||
clienttest.WithMiddlewares(NewTracingMiddleware(tracer)),
|
||||
)
|
||||
|
||||
err := tc.run(pluginCtx, cdt)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, tracer.Spans, 1, "must have 1 span")
|
||||
span := tracer.Spans[0]
|
||||
assert.True(t, span.IsEnded(), "span should be ended")
|
||||
assert.NoError(t, span.Err, "span should not have an error")
|
||||
assert.Equal(t, codes.Unset, span.StatusCode, "span should not have a status code")
|
||||
assert.Equal(t, tc.expSpanName, span.Name)
|
||||
})
|
||||
|
||||
t.Run("error", func(t *testing.T) {
|
||||
tracer := tracing.NewFakeTracer()
|
||||
|
||||
cdt := clienttest.NewClientDecoratorTest(
|
||||
t,
|
||||
clienttest.WithMiddlewares(
|
||||
NewTracingMiddleware(tracer),
|
||||
newAlwaysErrorMiddleware(errors.New("ops")),
|
||||
),
|
||||
)
|
||||
|
||||
err := tc.run(pluginCtx, cdt)
|
||||
require.Error(t, err)
|
||||
require.Len(t, tracer.Spans, 1, "must have 1 span")
|
||||
span := tracer.Spans[0]
|
||||
assert.True(t, span.IsEnded(), "span should be ended")
|
||||
assert.Error(t, span.Err, "span should contain an error")
|
||||
assert.Equal(t, codes.Error, span.StatusCode, "span code should be error")
|
||||
})
|
||||
|
||||
t.Run("panic", func(t *testing.T) {
|
||||
var didPanic bool
|
||||
|
||||
tracer := tracing.NewFakeTracer()
|
||||
|
||||
cdt := clienttest.NewClientDecoratorTest(
|
||||
t,
|
||||
clienttest.WithMiddlewares(
|
||||
NewTracingMiddleware(tracer),
|
||||
newAlwaysPanicMiddleware("panic!"),
|
||||
),
|
||||
)
|
||||
|
||||
func() {
|
||||
defer func() {
|
||||
// Swallow panic so the test can keep running,
|
||||
// and we can assert that the client panicked
|
||||
if r := recover(); r != nil {
|
||||
didPanic = true
|
||||
}
|
||||
}()
|
||||
_ = tc.run(pluginCtx, cdt)
|
||||
}()
|
||||
|
||||
require.True(t, didPanic, "should have panicked")
|
||||
require.Len(t, tracer.Spans, 1, "must have 1 span")
|
||||
span := tracer.Spans[0]
|
||||
assert.True(t, span.IsEnded(), "span should be ended")
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTracingMiddlewareAttributes(t *testing.T) {
|
||||
defaultPluginContextRequestMut := func(ctx *context.Context, req *backend.QueryDataRequest) {
|
||||
req.PluginContext.PluginID = "my_plugin_id"
|
||||
req.PluginContext.OrgID = 1337
|
||||
}
|
||||
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
requestMut []func(ctx *context.Context, req *backend.QueryDataRequest)
|
||||
assert func(t *testing.T, span *tracing.FakeSpan)
|
||||
}{
|
||||
{
|
||||
name: "default",
|
||||
requestMut: []func(ctx *context.Context, req *backend.QueryDataRequest){
|
||||
defaultPluginContextRequestMut,
|
||||
},
|
||||
assert: func(t *testing.T, span *tracing.FakeSpan) {
|
||||
assert.Len(t, span.Attributes, 2, "should have correct number of span attributes")
|
||||
assert.Equal(t, "my_plugin_id", span.Attributes["plugin_id"].AsString(), "should have correct plugin_id")
|
||||
assert.Equal(t, int64(1337), span.Attributes["org_id"].AsInt64(), "should have correct org_id")
|
||||
_, ok := span.Attributes["user"]
|
||||
assert.False(t, ok, "should not have user attribute")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "with user",
|
||||
requestMut: []func(ctx *context.Context, req *backend.QueryDataRequest){
|
||||
defaultPluginContextRequestMut,
|
||||
func(ctx *context.Context, req *backend.QueryDataRequest) {
|
||||
req.PluginContext.User = &backend.User{Login: "admin"}
|
||||
},
|
||||
},
|
||||
assert: func(t *testing.T, span *tracing.FakeSpan) {
|
||||
assert.Len(t, span.Attributes, 3, "should have correct number of span attributes")
|
||||
assert.Equal(t, "my_plugin_id", span.Attributes["plugin_id"].AsString(), "should have correct plugin_id")
|
||||
assert.Equal(t, int64(1337), span.Attributes["org_id"].AsInt64(), "should have correct org_id")
|
||||
assert.Equal(t, "admin", span.Attributes["user"].AsString(), "should have correct user attribute")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "empty retains zero values",
|
||||
requestMut: []func(ctx *context.Context, req *backend.QueryDataRequest){},
|
||||
assert: func(t *testing.T, span *tracing.FakeSpan) {
|
||||
assert.Len(t, span.Attributes, 2, "should have correct number of span attributes")
|
||||
assert.Zero(t, span.Attributes["plugin_id"].AsString(), "should have correct plugin_id")
|
||||
assert.Zero(t, span.Attributes["org_id"].AsInt64(), "should have correct org_id")
|
||||
_, ok := span.Attributes["user"]
|
||||
assert.False(t, ok, "should not have user attribute")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no http headers",
|
||||
requestMut: []func(ctx *context.Context, req *backend.QueryDataRequest){
|
||||
func(ctx *context.Context, req *backend.QueryDataRequest) {
|
||||
*ctx = ctxkey.Set(*ctx, &contextmodel.ReqContext{Context: &web.Context{Req: &http.Request{Header: nil}}})
|
||||
},
|
||||
},
|
||||
assert: func(t *testing.T, span *tracing.FakeSpan) {
|
||||
assert.Empty(t, span.Attributes["panel_id"])
|
||||
assert.Empty(t, span.Attributes["dashboard_id"])
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "datasource settings",
|
||||
requestMut: []func(ctx *context.Context, req *backend.QueryDataRequest){
|
||||
func(ctx *context.Context, req *backend.QueryDataRequest) {
|
||||
req.PluginContext.DataSourceInstanceSettings = &backend.DataSourceInstanceSettings{
|
||||
UID: "uid",
|
||||
Name: "name",
|
||||
Type: "type",
|
||||
}
|
||||
},
|
||||
},
|
||||
assert: func(t *testing.T, span *tracing.FakeSpan) {
|
||||
require.Len(t, span.Attributes, 4)
|
||||
for _, k := range []string{"plugin_id", "org_id"} {
|
||||
_, ok := span.Attributes[attribute.Key(k)]
|
||||
assert.True(t, ok)
|
||||
}
|
||||
assert.Equal(t, "uid", span.Attributes["datasource_uid"].AsString())
|
||||
assert.Equal(t, "name", span.Attributes["datasource_name"].AsString())
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "http headers",
|
||||
requestMut: []func(ctx *context.Context, req *backend.QueryDataRequest){
|
||||
func(ctx *context.Context, req *backend.QueryDataRequest) {
|
||||
*ctx = ctxkey.Set(*ctx, newReqContextWithRequest(&http.Request{
|
||||
Header: map[string][]string{
|
||||
"X-Panel-Id": {"10"},
|
||||
"X-Dashboard-Uid": {"dashboard uid"},
|
||||
"X-Query-Group-Id": {"query group id"},
|
||||
"X-Other": {"30"},
|
||||
},
|
||||
}))
|
||||
},
|
||||
},
|
||||
assert: func(t *testing.T, span *tracing.FakeSpan) {
|
||||
require.Len(t, span.Attributes, 5)
|
||||
for _, k := range []string{"plugin_id", "org_id"} {
|
||||
_, ok := span.Attributes[attribute.Key(k)]
|
||||
assert.True(t, ok)
|
||||
}
|
||||
assert.Equal(t, int64(10), span.Attributes["panel_id"].AsInt64())
|
||||
assert.Equal(t, "dashboard uid", span.Attributes["dashboard_uid"].AsString())
|
||||
assert.Equal(t, "query group id", span.Attributes["query_group_id"].AsString())
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "single http headers are skipped if not present or empty",
|
||||
requestMut: []func(ctx *context.Context, req *backend.QueryDataRequest){
|
||||
func(ctx *context.Context, req *backend.QueryDataRequest) {
|
||||
*ctx = ctxkey.Set(*ctx, newReqContextWithRequest(&http.Request{
|
||||
Header: map[string][]string{
|
||||
"X-Dashboard-Uid": {""},
|
||||
"X-Other": {"30"},
|
||||
},
|
||||
}))
|
||||
},
|
||||
},
|
||||
assert: func(t *testing.T, span *tracing.FakeSpan) {
|
||||
require.Len(t, span.Attributes, 2)
|
||||
for _, k := range []string{"plugin_id", "org_id"} {
|
||||
_, ok := span.Attributes[attribute.Key(k)]
|
||||
assert.True(t, ok)
|
||||
}
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
req := &backend.QueryDataRequest{
|
||||
PluginContext: backend.PluginContext{},
|
||||
}
|
||||
for _, mut := range tc.requestMut {
|
||||
mut(&ctx, req)
|
||||
}
|
||||
|
||||
tracer := tracing.NewFakeTracer()
|
||||
|
||||
cdt := clienttest.NewClientDecoratorTest(
|
||||
t,
|
||||
clienttest.WithMiddlewares(NewTracingMiddleware(tracer)),
|
||||
)
|
||||
|
||||
_, err := cdt.Decorator.QueryData(ctx, req)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, tracer.Spans, 1, "must have 1 span")
|
||||
span := tracer.Spans[0]
|
||||
assert.True(t, span.IsEnded(), "span should be ended")
|
||||
assert.NoError(t, span.Err, "span should not have an error")
|
||||
assert.Equal(t, codes.Unset, span.StatusCode, "span should not have a status code")
|
||||
|
||||
if tc.assert != nil {
|
||||
tc.assert(t, span)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func newReqContextWithRequest(req *http.Request) *contextmodel.ReqContext {
|
||||
return &contextmodel.ReqContext{
|
||||
Context: &web.Context{
|
||||
Req: req,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// alwaysErrorFuncMiddleware is a middleware that runs the specified f function for each method, and returns the error
|
||||
// returned by f. Any other return values are set to their zero-value.
|
||||
// If recovererFunc is specified, it is run in case of panic in the middleware (f).
|
||||
type alwaysErrorFuncMiddleware struct {
|
||||
f func() error
|
||||
}
|
||||
|
||||
func (m *alwaysErrorFuncMiddleware) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||
return nil, m.f()
|
||||
}
|
||||
|
||||
func (m *alwaysErrorFuncMiddleware) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
|
||||
return m.f()
|
||||
}
|
||||
|
||||
func (m *alwaysErrorFuncMiddleware) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
|
||||
return nil, m.f()
|
||||
}
|
||||
|
||||
func (m *alwaysErrorFuncMiddleware) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) {
|
||||
return nil, m.f()
|
||||
}
|
||||
|
||||
func (m *alwaysErrorFuncMiddleware) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
|
||||
return nil, m.f()
|
||||
}
|
||||
|
||||
func (m *alwaysErrorFuncMiddleware) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
|
||||
return nil, m.f()
|
||||
}
|
||||
|
||||
func (m *alwaysErrorFuncMiddleware) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
|
||||
return m.f()
|
||||
}
|
||||
|
||||
// newAlwaysErrorMiddleware returns a new middleware that always returns the specified error.
|
||||
func newAlwaysErrorMiddleware(err error) plugins.ClientMiddleware {
|
||||
return plugins.ClientMiddlewareFunc(func(next plugins.Client) plugins.Client {
|
||||
return &alwaysErrorFuncMiddleware{func() error {
|
||||
return err
|
||||
}}
|
||||
})
|
||||
}
|
||||
|
||||
// newAlwaysPanicMiddleware returns a new middleware that always panics with the specified message,
|
||||
func newAlwaysPanicMiddleware(message string) plugins.ClientMiddleware {
|
||||
return plugins.ClientMiddlewareFunc(func(next plugins.Client) plugins.Client {
|
||||
return &alwaysErrorFuncMiddleware{func() error {
|
||||
panic(message)
|
||||
return nil // nolint:govet
|
||||
}}
|
||||
})
|
||||
}
|
@ -3,6 +3,7 @@ package pluginsintegration
|
||||
import (
|
||||
"github.com/google/wire"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin"
|
||||
"github.com/grafana/grafana/pkg/plugins/backendplugin/provider"
|
||||
@ -74,22 +75,25 @@ var WireExtensionSet = wire.NewSet(
|
||||
|
||||
func ProvideClientDecorator(cfg *setting.Cfg, pCfg *config.Cfg,
|
||||
pluginRegistry registry.Service,
|
||||
oAuthTokenService oauthtoken.OAuthTokenService) (*client.Decorator, error) {
|
||||
return NewClientDecorator(cfg, pCfg, pluginRegistry, oAuthTokenService)
|
||||
oAuthTokenService oauthtoken.OAuthTokenService,
|
||||
tracer tracing.Tracer) (*client.Decorator, error) {
|
||||
return NewClientDecorator(cfg, pCfg, pluginRegistry, oAuthTokenService, tracer)
|
||||
}
|
||||
|
||||
func NewClientDecorator(cfg *setting.Cfg, pCfg *config.Cfg,
|
||||
pluginRegistry registry.Service,
|
||||
oAuthTokenService oauthtoken.OAuthTokenService) (*client.Decorator, error) {
|
||||
oAuthTokenService oauthtoken.OAuthTokenService,
|
||||
tracer tracing.Tracer) (*client.Decorator, error) {
|
||||
c := client.ProvideService(pluginRegistry, pCfg)
|
||||
middlewares := CreateMiddlewares(cfg, oAuthTokenService)
|
||||
middlewares := CreateMiddlewares(cfg, oAuthTokenService, tracer)
|
||||
|
||||
return client.NewDecorator(c, middlewares...)
|
||||
}
|
||||
|
||||
func CreateMiddlewares(cfg *setting.Cfg, oAuthTokenService oauthtoken.OAuthTokenService) []plugins.ClientMiddleware {
|
||||
func CreateMiddlewares(cfg *setting.Cfg, oAuthTokenService oauthtoken.OAuthTokenService, tracer tracing.Tracer) []plugins.ClientMiddleware {
|
||||
skipCookiesNames := []string{cfg.LoginCookieName}
|
||||
middlewares := []plugins.ClientMiddleware{
|
||||
clientmiddleware.NewTracingMiddleware(tracer),
|
||||
clientmiddleware.NewTracingHeaderMiddleware(),
|
||||
clientmiddleware.NewClearAuthHeadersMiddleware(),
|
||||
clientmiddleware.NewOAuthTokenMiddleware(oAuthTokenService),
|
||||
|
@ -3,36 +3,54 @@ package updatechecker
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
||||
"github.com/hashicorp/go-version"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/httpclient/httpclientprovider"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
const grafanaLatestJSONURL = "https://raw.githubusercontent.com/grafana/grafana/main/latest.json"
|
||||
|
||||
type GrafanaService struct {
|
||||
hasUpdate bool
|
||||
latestVersion string
|
||||
|
||||
enabled bool
|
||||
grafanaVersion string
|
||||
httpClient http.Client
|
||||
httpClient httpClient
|
||||
mutex sync.RWMutex
|
||||
log log.Logger
|
||||
tracer tracing.Tracer
|
||||
}
|
||||
|
||||
func ProvideGrafanaService(cfg *setting.Cfg) *GrafanaService {
|
||||
func ProvideGrafanaService(cfg *setting.Cfg, tracer tracing.Tracer) (*GrafanaService, error) {
|
||||
logger := log.New("grafana.update.checker")
|
||||
cl, err := httpclient.New(httpclient.Options{
|
||||
Middlewares: []httpclient.Middleware{
|
||||
httpclientprovider.TracingMiddleware(logger, tracer),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &GrafanaService{
|
||||
enabled: cfg.CheckForGrafanaUpdates,
|
||||
grafanaVersion: cfg.BuildVersion,
|
||||
httpClient: http.Client{Timeout: 10 * time.Second},
|
||||
log: log.New("grafana.update.checker"),
|
||||
}
|
||||
httpClient: cl,
|
||||
log: logger,
|
||||
tracer: tracer,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *GrafanaService) IsDisabled() bool {
|
||||
@ -40,7 +58,7 @@ func (s *GrafanaService) IsDisabled() bool {
|
||||
}
|
||||
|
||||
func (s *GrafanaService) Run(ctx context.Context) error {
|
||||
s.checkForUpdates()
|
||||
s.instrumentedCheckForUpdates(ctx)
|
||||
|
||||
ticker := time.NewTicker(time.Minute * 10)
|
||||
run := true
|
||||
@ -48,7 +66,7 @@ func (s *GrafanaService) Run(ctx context.Context) error {
|
||||
for run {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
s.checkForUpdates()
|
||||
s.instrumentedCheckForUpdates(ctx)
|
||||
case <-ctx.Done():
|
||||
run = false
|
||||
}
|
||||
@ -57,21 +75,39 @@ func (s *GrafanaService) Run(ctx context.Context) error {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
func (s *GrafanaService) checkForUpdates() {
|
||||
resp, err := s.httpClient.Get("https://raw.githubusercontent.com/grafana/grafana/main/latest.json")
|
||||
if err != nil {
|
||||
s.log.Debug("Failed to get latest.json repo from github.com", "error", err)
|
||||
func (s *GrafanaService) instrumentedCheckForUpdates(ctx context.Context) {
|
||||
start := time.Now()
|
||||
ctx, span := s.tracer.Start(ctx, "updatechecker.GrafanaService.checkForUpdates")
|
||||
defer span.End()
|
||||
ctxLogger := s.log.FromContext(ctx)
|
||||
if err := s.checkForUpdates(ctx); err != nil {
|
||||
span.SetStatus(codes.Error, fmt.Sprintf("update check failed: %s", err))
|
||||
span.RecordError(err)
|
||||
ctxLogger.Error("Update check failed", "error", err, "duration", time.Since(start))
|
||||
return
|
||||
}
|
||||
ctxLogger.Info("Update check succeeded", "duration", time.Since(start))
|
||||
}
|
||||
|
||||
func (s *GrafanaService) checkForUpdates(ctx context.Context) error {
|
||||
ctxLogger := s.log.FromContext(ctx)
|
||||
ctxLogger.Debug("Checking for updates")
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, grafanaLatestJSONURL, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := s.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get latest.json repo from github.com: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := resp.Body.Close(); err != nil {
|
||||
s.log.Warn("Failed to close response body", "err", err)
|
||||
ctxLogger.Warn("Failed to close response body", "err", err)
|
||||
}
|
||||
}()
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
s.log.Debug("Update check failed, reading response from github.com", "error", err)
|
||||
return
|
||||
return fmt.Errorf("update check failed, reading response from github.com: %w", err)
|
||||
}
|
||||
|
||||
type latestJSON struct {
|
||||
@ -81,8 +117,7 @@ func (s *GrafanaService) checkForUpdates() {
|
||||
var latest latestJSON
|
||||
err = json.Unmarshal(body, &latest)
|
||||
if err != nil {
|
||||
s.log.Debug("Failed to unmarshal latest.json", "error", err)
|
||||
return
|
||||
return fmt.Errorf("failed to unmarshal latest.json: %w", err)
|
||||
}
|
||||
|
||||
s.mutex.Lock()
|
||||
@ -100,6 +135,8 @@ func (s *GrafanaService) checkForUpdates() {
|
||||
if err1 == nil && err2 == nil {
|
||||
s.hasUpdate = currVersion.LessThan(latestVersion)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *GrafanaService) UpdateAvailable() bool {
|
||||
|
@ -3,15 +3,21 @@ package updatechecker
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
||||
"github.com/hashicorp/go-version"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/httpclient/httpclientprovider"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
@ -25,21 +31,29 @@ type PluginsService struct {
|
||||
httpClient httpClient
|
||||
mutex sync.RWMutex
|
||||
log log.Logger
|
||||
tracer tracing.Tracer
|
||||
}
|
||||
|
||||
func ProvidePluginsService(cfg *setting.Cfg, pluginStore plugins.Store) *PluginsService {
|
||||
func ProvidePluginsService(cfg *setting.Cfg, pluginStore plugins.Store, tracer tracing.Tracer) (*PluginsService, error) {
|
||||
logger := log.New("plugins.update.checker")
|
||||
cl, err := httpclient.New(httpclient.Options{
|
||||
Middlewares: []httpclient.Middleware{
|
||||
httpclientprovider.TracingMiddleware(logger, tracer),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &PluginsService{
|
||||
enabled: cfg.CheckForPluginUpdates,
|
||||
grafanaVersion: cfg.BuildVersion,
|
||||
httpClient: &http.Client{Timeout: 10 * time.Second},
|
||||
log: log.New("plugins.update.checker"),
|
||||
httpClient: cl,
|
||||
log: logger,
|
||||
tracer: tracer,
|
||||
pluginStore: pluginStore,
|
||||
availableUpdates: make(map[string]string),
|
||||
}
|
||||
}
|
||||
|
||||
type httpClient interface {
|
||||
Get(url string) (resp *http.Response, err error)
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *PluginsService) IsDisabled() bool {
|
||||
@ -47,7 +61,7 @@ func (s *PluginsService) IsDisabled() bool {
|
||||
}
|
||||
|
||||
func (s *PluginsService) Run(ctx context.Context) error {
|
||||
s.checkForUpdates(ctx)
|
||||
s.instrumentedCheckForUpdates(ctx)
|
||||
|
||||
ticker := time.NewTicker(time.Minute * 10)
|
||||
run := true
|
||||
@ -55,7 +69,7 @@ func (s *PluginsService) Run(ctx context.Context) error {
|
||||
for run {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
s.checkForUpdates(ctx)
|
||||
s.instrumentedCheckForUpdates(ctx)
|
||||
case <-ctx.Done():
|
||||
run = false
|
||||
}
|
||||
@ -83,26 +97,46 @@ func (s *PluginsService) HasUpdate(ctx context.Context, pluginID string) (string
|
||||
return "", false
|
||||
}
|
||||
|
||||
func (s *PluginsService) checkForUpdates(ctx context.Context) {
|
||||
s.log.Debug("Checking for updates")
|
||||
|
||||
localPlugins := s.pluginsEligibleForVersionCheck(ctx)
|
||||
resp, err := s.httpClient.Get("https://grafana.com/api/plugins/versioncheck?slugIn=" +
|
||||
s.pluginIDsCSV(localPlugins) + "&grafanaVersion=" + s.grafanaVersion)
|
||||
if err != nil {
|
||||
s.log.Debug("Failed to get plugins repo from grafana.com", "error", err.Error())
|
||||
func (s *PluginsService) instrumentedCheckForUpdates(ctx context.Context) {
|
||||
start := time.Now()
|
||||
ctx, span := s.tracer.Start(ctx, "updatechecker.PluginsService.checkForUpdates")
|
||||
defer span.End()
|
||||
ctxLogger := s.log.FromContext(ctx)
|
||||
if err := s.checkForUpdates(ctx); err != nil {
|
||||
span.SetStatus(codes.Error, fmt.Sprintf("update check failed: %s", err))
|
||||
span.RecordError(err)
|
||||
ctxLogger.Debug("Update check failed", "error", err, "duration", time.Since(start))
|
||||
return
|
||||
}
|
||||
ctxLogger.Info("Update check succeeded", "duration", time.Since(start))
|
||||
}
|
||||
|
||||
func (s *PluginsService) checkForUpdates(ctx context.Context) error {
|
||||
ctxLogger := s.log.FromContext(ctx)
|
||||
ctxLogger.Debug("Checking for updates")
|
||||
localPlugins := s.pluginsEligibleForVersionCheck(ctx)
|
||||
requestURL := "https://grafana.com/api/plugins/versioncheck?" + url.Values{
|
||||
"slugIn": []string{s.pluginIDsCSV(localPlugins)},
|
||||
"grafanaVersion": []string{s.grafanaVersion},
|
||||
}.Encode()
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := s.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get plugins repo from grafana.com: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := resp.Body.Close(); err != nil {
|
||||
s.log.Warn("Failed to close response body", "err", err)
|
||||
err = resp.Body.Close()
|
||||
if err != nil {
|
||||
ctxLogger.Warn("Failed to close response body", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
s.log.Debug("Update check failed, reading response from grafana.com", "error", err.Error())
|
||||
return
|
||||
return fmt.Errorf("failed to read response from grafana.com: %w", err)
|
||||
}
|
||||
|
||||
type gcomPlugin struct {
|
||||
@ -112,8 +146,7 @@ func (s *PluginsService) checkForUpdates(ctx context.Context) {
|
||||
var gcomPlugins []gcomPlugin
|
||||
err = json.Unmarshal(body, &gcomPlugins)
|
||||
if err != nil {
|
||||
s.log.Debug("Failed to unmarshal plugin repo, reading response from grafana.com", "error", err.Error())
|
||||
return
|
||||
return fmt.Errorf("failed to unmarshal plugin repo, reading response from grafana.com: %w", err)
|
||||
}
|
||||
|
||||
availableUpdates := map[string]string{}
|
||||
@ -130,6 +163,8 @@ func (s *PluginsService) checkForUpdates(ctx context.Context) {
|
||||
s.availableUpdates = availableUpdates
|
||||
s.mutex.Unlock()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func canUpdate(v1, v2 string) bool {
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
)
|
||||
|
||||
@ -166,10 +167,11 @@ func TestPluginUpdateChecker_checkForUpdates(t *testing.T) {
|
||||
httpClient: &fakeHTTPClient{
|
||||
fakeResp: jsonResp,
|
||||
},
|
||||
log: log.NewNopLogger(),
|
||||
log: log.NewNopLogger(),
|
||||
tracer: tracing.InitializeTracerForTest(),
|
||||
}
|
||||
|
||||
svc.checkForUpdates(context.Background())
|
||||
svc.instrumentedCheckForUpdates(context.Background())
|
||||
|
||||
require.Equal(t, 1, len(svc.availableUpdates))
|
||||
|
||||
@ -198,8 +200,8 @@ type fakeHTTPClient struct {
|
||||
requestURL string
|
||||
}
|
||||
|
||||
func (c *fakeHTTPClient) Get(url string) (*http.Response, error) {
|
||||
c.requestURL = url
|
||||
func (c *fakeHTTPClient) Do(req *http.Request) (*http.Response, error) {
|
||||
c.requestURL = req.URL.String()
|
||||
|
||||
resp := &http.Response{
|
||||
Body: io.NopCloser(strings.NewReader(c.fakeResp)),
|
||||
|
7
pkg/services/updatechecker/updatechecker.go
Normal file
7
pkg/services/updatechecker/updatechecker.go
Normal file
@ -0,0 +1,7 @@
|
||||
package updatechecker
|
||||
|
||||
import "net/http"
|
||||
|
||||
type httpClient interface {
|
||||
Do(req *http.Request) (resp *http.Response, err error)
|
||||
}
|
Loading…
Reference in New Issue
Block a user