mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Elasticsearch: Decouple backend from infra/tracing (#90528)
This commit is contained in:
committed by
GitHub
parent
9caa8151d8
commit
f8c43d0bf3
@@ -18,8 +18,8 @@ import (
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
||||
exp "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
)
|
||||
|
||||
// Used in logging to mark a stage
|
||||
@@ -54,7 +54,7 @@ type Client interface {
|
||||
}
|
||||
|
||||
// NewClient creates a new elasticsearch client
|
||||
var NewClient = func(ctx context.Context, ds *DatasourceInfo, logger log.Logger, tracer tracing.Tracer) (Client, error) {
|
||||
var NewClient = func(ctx context.Context, ds *DatasourceInfo, logger log.Logger) (Client, error) {
|
||||
logger = logger.FromContext(ctx).With("entity", "client")
|
||||
|
||||
ip, err := newIndexPattern(ds.Interval, ds.Database)
|
||||
@@ -71,7 +71,6 @@ var NewClient = func(ctx context.Context, ds *DatasourceInfo, logger log.Logger,
|
||||
ds: ds,
|
||||
configuredFields: ds.ConfiguredFields,
|
||||
indexPattern: ip,
|
||||
tracer: tracer,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -81,7 +80,6 @@ type baseClientImpl struct {
|
||||
configuredFields ConfiguredFields
|
||||
indexPattern IndexPattern
|
||||
logger log.Logger
|
||||
tracer tracing.Tracer
|
||||
}
|
||||
|
||||
func (c *baseClientImpl) GetConfiguredFields() ConfiguredFields {
|
||||
@@ -164,7 +162,7 @@ func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearch
|
||||
var err error
|
||||
multiRequests := c.createMultiSearchRequests(r.Requests)
|
||||
queryParams := c.getMultiSearchQueryParameters()
|
||||
_, span := c.tracer.Start(c.ctx, "datasource.elasticsearch.queryData.executeMultisearch", trace.WithAttributes(
|
||||
_, span := tracing.DefaultTracer().Start(c.ctx, "datasource.elasticsearch.queryData.executeMultisearch", trace.WithAttributes(
|
||||
attribute.String("queryParams", queryParams),
|
||||
attribute.String("url", c.ds.URL),
|
||||
))
|
||||
@@ -206,7 +204,7 @@ func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearch
|
||||
start = time.Now()
|
||||
var msr MultiSearchResponse
|
||||
dec := json.NewDecoder(res.Body)
|
||||
_, resSpan := c.tracer.Start(c.ctx, "datasource.elasticsearch.queryData.executeMultisearch.decodeResponse")
|
||||
_, resSpan := tracing.DefaultTracer().Start(c.ctx, "datasource.elasticsearch.queryData.executeMultisearch.decodeResponse")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
resSpan.RecordError(err)
|
||||
|
||||
@@ -15,7 +15,6 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
)
|
||||
|
||||
func TestClient_ExecuteMultisearch(t *testing.T) {
|
||||
@@ -67,7 +66,7 @@ func TestClient_ExecuteMultisearch(t *testing.T) {
|
||||
To: to,
|
||||
}
|
||||
|
||||
c, err := NewClient(context.Background(), &ds, log.New(), tracing.InitializeTracerForTest())
|
||||
c, err := NewClient(context.Background(), &ds, log.New())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, c)
|
||||
|
||||
@@ -163,7 +162,7 @@ func TestClient_ExecuteMultisearch(t *testing.T) {
|
||||
To: to2,
|
||||
}
|
||||
|
||||
c, err := NewClient(context.Background(), &ds, log.New(), tracing.InitializeTracerForTest())
|
||||
c, err := NewClient(context.Background(), &ds, log.New())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, c)
|
||||
|
||||
@@ -260,7 +259,7 @@ func TestClient_Index(t *testing.T) {
|
||||
To: to,
|
||||
}
|
||||
|
||||
c, err := NewClient(context.Background(), &ds, log.New(), tracing.InitializeTracerForTest())
|
||||
c, err := NewClient(context.Background(), &ds, log.New())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, c)
|
||||
|
||||
|
||||
@@ -14,7 +14,6 @@ import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
||||
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
|
||||
)
|
||||
|
||||
@@ -27,11 +26,10 @@ type elasticsearchDataQuery struct {
|
||||
dataQueries []backend.DataQuery
|
||||
logger log.Logger
|
||||
ctx context.Context
|
||||
tracer tracing.Tracer
|
||||
keepLabelsInResponse bool
|
||||
}
|
||||
|
||||
var newElasticsearchDataQuery = func(ctx context.Context, client es.Client, req *backend.QueryDataRequest, logger log.Logger, tracer tracing.Tracer) *elasticsearchDataQuery {
|
||||
var newElasticsearchDataQuery = func(ctx context.Context, client es.Client, req *backend.QueryDataRequest, logger log.Logger) *elasticsearchDataQuery {
|
||||
_, fromAlert := req.Headers[headerFromAlert]
|
||||
fromExpression := req.GetHTTPHeader(headerFromExpression) != ""
|
||||
|
||||
@@ -40,7 +38,6 @@ var newElasticsearchDataQuery = func(ctx context.Context, client es.Client, req
|
||||
dataQueries: req.Queries,
|
||||
logger: logger,
|
||||
ctx: ctx,
|
||||
tracer: tracer,
|
||||
// To maintain backward compatibility, it is necessary to keep labels in responses for alerting and expressions queries.
|
||||
// Historically, these labels have been used in alerting rules and transformations.
|
||||
keepLabelsInResponse: fromAlert || fromExpression,
|
||||
@@ -84,7 +81,7 @@ func (e *elasticsearchDataQuery) execute() (*backend.QueryDataResponse, error) {
|
||||
return errorsource.AddErrorToResponse(e.dataQueries[0].RefID, response, err), nil
|
||||
}
|
||||
|
||||
return parseResponse(e.ctx, res.Responses, queries, e.client.GetConfiguredFields(), e.keepLabelsInResponse, e.logger, e.tracer)
|
||||
return parseResponse(e.ctx, res.Responses, queries, e.client.GetConfiguredFields(), e.keepLabelsInResponse, e.logger)
|
||||
}
|
||||
|
||||
func (e *elasticsearchDataQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to int64) error {
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
|
||||
)
|
||||
|
||||
@@ -1862,6 +1861,6 @@ func executeElasticsearchDataQuery(c es.Client, body string, from, to time.Time)
|
||||
},
|
||||
},
|
||||
}
|
||||
query := newElasticsearchDataQuery(context.Background(), c, &dataRequest, log.New(), tracing.InitializeTracerForTest())
|
||||
query := newElasticsearchDataQuery(context.Background(), c, &dataRequest, log.New())
|
||||
return query.execute()
|
||||
}
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
exp "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
||||
exphttpclient "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource/httpclient"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
|
||||
)
|
||||
|
||||
@@ -37,14 +36,12 @@ const (
|
||||
|
||||
type Service struct {
|
||||
im instancemgmt.InstanceManager
|
||||
tracer tracing.Tracer
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
func ProvideService(httpClientProvider *httpclient.Provider, tracer tracing.Tracer) *Service {
|
||||
func ProvideService(httpClientProvider *httpclient.Provider) *Service {
|
||||
return &Service{
|
||||
im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)),
|
||||
tracer: tracer,
|
||||
logger: backend.NewLoggerWith("logger", "tsdb.elasticsearch"),
|
||||
}
|
||||
}
|
||||
@@ -59,20 +56,20 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
||||
return &backend.QueryDataResponse{}, err
|
||||
}
|
||||
|
||||
return queryData(ctx, req, dsInfo, logger, s.tracer)
|
||||
return queryData(ctx, req, dsInfo, logger)
|
||||
}
|
||||
|
||||
// separate function to allow testing the whole transformation and query flow
|
||||
func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *es.DatasourceInfo, logger log.Logger, tracer tracing.Tracer) (*backend.QueryDataResponse, error) {
|
||||
func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *es.DatasourceInfo, logger log.Logger) (*backend.QueryDataResponse, error) {
|
||||
if len(req.Queries) == 0 {
|
||||
return &backend.QueryDataResponse{}, fmt.Errorf("query contains no queries")
|
||||
}
|
||||
|
||||
client, err := es.NewClient(ctx, dsInfo, logger, tracer)
|
||||
client, err := es.NewClient(ctx, dsInfo, logger)
|
||||
if err != nil {
|
||||
return &backend.QueryDataResponse{}, err
|
||||
}
|
||||
query := newElasticsearchDataQuery(ctx, client, req, logger, tracer)
|
||||
query := newElasticsearchDataQuery(ctx, client, req, logger)
|
||||
return query.execute()
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
|
||||
@@ -12,7 +12,6 @@ import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
|
||||
)
|
||||
|
||||
@@ -142,7 +141,7 @@ func queryDataTestWithResponseCode(queriesBytes []byte, responseStatusCode int,
|
||||
return nil
|
||||
})
|
||||
|
||||
result, err := queryData(context.Background(), &req, dsInfo, log.New(), tracing.InitializeTracerForTest())
|
||||
result, err := queryData(context.Background(), &req, dsInfo, log.New())
|
||||
if err != nil {
|
||||
return queryDataTestResult{}, err
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
@@ -20,7 +21,6 @@ import (
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
|
||||
"github.com/grafana/grafana/pkg/tsdb/elasticsearch/instrumentation"
|
||||
)
|
||||
@@ -47,20 +47,20 @@ const (
|
||||
|
||||
var searchWordsRegex = regexp.MustCompile(regexp.QuoteMeta(es.HighlightPreTagsString) + `(.*?)` + regexp.QuoteMeta(es.HighlightPostTagsString))
|
||||
|
||||
func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields, keepLabelsInResponse bool, logger log.Logger, tracer tracing.Tracer) (*backend.QueryDataResponse, error) {
|
||||
func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets []*Query, configuredFields es.ConfiguredFields, keepLabelsInResponse bool, logger log.Logger) (*backend.QueryDataResponse, error) {
|
||||
result := backend.QueryDataResponse{
|
||||
Responses: backend.Responses{},
|
||||
}
|
||||
if responses == nil {
|
||||
return &result, nil
|
||||
}
|
||||
ctx, span := tracer.Start(ctx, "datasource.elastic.parseResponse", trace.WithAttributes(
|
||||
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.elastic.parseResponse", trace.WithAttributes(
|
||||
attribute.Int("responseLength", len(responses)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
for i, res := range responses {
|
||||
_, resSpan := tracer.Start(ctx, "datasource.elastic.parseResponse.response", trace.WithAttributes(
|
||||
_, resSpan := tracing.DefaultTracer().Start(ctx, "datasource.elastic.parseResponse.response", trace.WithAttributes(
|
||||
attribute.String("queryMetricType", targets[i].Metrics[0].Type),
|
||||
))
|
||||
start := time.Now()
|
||||
|
||||
@@ -15,7 +15,6 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
|
||||
)
|
||||
|
||||
@@ -3682,7 +3681,7 @@ func parseTestResponse(tsdbQueries map[string]string, responseBody string, keepL
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return parseResponse(context.Background(), response.Responses, queries, configuredFields, keepLabelsInResponse, log.New(), tracing.InitializeTracerForTest())
|
||||
return parseResponse(context.Background(), response.Responses, queries, configuredFields, keepLabelsInResponse, log.New())
|
||||
}
|
||||
|
||||
func requireTimeValue(t *testing.T, expected int64, frame *data.Frame, index int) {
|
||||
|
||||
Reference in New Issue
Block a user