Update elasticsearch

This commit is contained in:
Ivana Huckova 2024-11-28 12:04:38 +01:00
parent 5bf0872d14
commit 348c8fa927
5 changed files with 26 additions and 22 deletions

View File

@ -17,9 +17,9 @@ import (
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/log" "github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing" "github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
exp "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
) )
// Used in logging to mark a stage // Used in logging to mark a stage
@ -182,9 +182,9 @@ func (c *baseClientImpl) ExecuteMultisearch(r *MultiSearchRequest) (*MultiSearch
status = "cancelled" status = "cancelled"
} }
lp := []any{"error", err, "status", status, "duration", time.Since(start), "stage", StageDatabaseRequest} lp := []any{"error", err, "status", status, "duration", time.Since(start), "stage", StageDatabaseRequest}
sourceErr := exp.Error{} sourceErr := backend.ErrorWithSource{}
if errors.As(err, &sourceErr) { if errors.As(err, &sourceErr) {
lp = append(lp, "statusSource", sourceErr.Source()) lp = append(lp, "statusSource", sourceErr.ErrorSource())
} }
if clientRes != nil { if clientRes != nil {
lp = append(lp, "statusCode", clientRes.StatusCode) lp = append(lp, "statusCode", clientRes.StatusCode)

View File

@ -11,7 +11,6 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/log" "github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
"github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/components/simplejson"
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
@ -52,7 +51,8 @@ func (e *elasticsearchDataQuery) execute() (*backend.QueryDataResponse, error) {
if err != nil { if err != nil {
mq, _ := json.Marshal(e.dataQueries) mq, _ := json.Marshal(e.dataQueries)
e.logger.Error("Failed to parse queries", "error", err, "queries", string(mq), "queriesLength", len(queries), "duration", time.Since(start), "stage", es.StagePrepareRequest) e.logger.Error("Failed to parse queries", "error", err, "queries", string(mq), "queriesLength", len(queries), "duration", time.Since(start), "stage", es.StagePrepareRequest)
return errorsource.AddPluginErrorToResponse(e.dataQueries[0].RefID, response, err), nil response.Responses[e.dataQueries[0].RefID] = backend.ErrorResponseWithErrorSource(err)
return response, nil
} }
ms := e.client.MultiSearch() ms := e.client.MultiSearch()
@ -63,7 +63,8 @@ func (e *elasticsearchDataQuery) execute() (*backend.QueryDataResponse, error) {
if err := e.processQuery(q, ms, from, to); err != nil { if err := e.processQuery(q, ms, from, to); err != nil {
mq, _ := json.Marshal(q) mq, _ := json.Marshal(q)
e.logger.Error("Failed to process query to multisearch request builder", "error", err, "query", string(mq), "queriesLength", len(queries), "duration", time.Since(start), "stage", es.StagePrepareRequest) e.logger.Error("Failed to process query to multisearch request builder", "error", err, "query", string(mq), "queriesLength", len(queries), "duration", time.Since(start), "stage", es.StagePrepareRequest)
return errorsource.AddPluginErrorToResponse(q.RefID, response, err), nil response.Responses[q.RefID] = backend.ErrorResponseWithErrorSource(err)
return response, nil
} }
} }
@ -71,21 +72,28 @@ func (e *elasticsearchDataQuery) execute() (*backend.QueryDataResponse, error) {
if err != nil { if err != nil {
mqs, _ := json.Marshal(e.dataQueries) mqs, _ := json.Marshal(e.dataQueries)
e.logger.Error("Failed to build multisearch request", "error", err, "queriesLength", len(queries), "queries", string(mqs), "duration", time.Since(start), "stage", es.StagePrepareRequest) e.logger.Error("Failed to build multisearch request", "error", err, "queriesLength", len(queries), "queries", string(mqs), "duration", time.Since(start), "stage", es.StagePrepareRequest)
return errorsource.AddPluginErrorToResponse(e.dataQueries[0].RefID, response, err), nil response.Responses[e.dataQueries[0].RefID] = backend.ErrorResponseWithErrorSource(err)
return response, nil
} }
e.logger.Info("Prepared request", "queriesLength", len(queries), "duration", time.Since(start), "stage", es.StagePrepareRequest) e.logger.Info("Prepared request", "queriesLength", len(queries), "duration", time.Since(start), "stage", es.StagePrepareRequest)
res, err := e.client.ExecuteMultisearch(req) res, err := e.client.ExecuteMultisearch(req)
if err != nil { if err != nil {
if backend.IsDownstreamHTTPError(err) { if backend.IsDownstreamHTTPError(err) {
err = errorsource.DownstreamError(err, false) err = backend.DownstreamError(err)
} }
return errorsource.AddErrorToResponse(e.dataQueries[0].RefID, response, err), nil response.Responses[e.dataQueries[0].RefID] = backend.ErrorResponseWithErrorSource(err)
return response, nil
} }
if res.Status >= 400 { if res.Status >= 400 {
errWithSource := errorsource.SourceError(backend.ErrorSourceFromHTTPStatus(res.Status), fmt.Errorf("unexpected status code: %d", res.Status), false) statusErr := fmt.Errorf("unexpected status code: %d", res.Status)
return errorsource.AddErrorToResponse(e.dataQueries[0].RefID, response, errWithSource), nil if backend.ErrorSourceFromHTTPStatus(res.Status) == backend.ErrorSourceDownstream {
response.Responses[e.dataQueries[0].RefID] = backend.ErrorResponseWithErrorSource(backend.DownstreamError(statusErr))
} else {
response.Responses[e.dataQueries[0].RefID] = backend.ErrorResponseWithErrorSource(backend.PluginError(statusErr))
}
return response, nil
} }
return parseResponse(e.ctx, res.Responses, queries, e.client.GetConfiguredFields(), e.keepLabelsInResponse, e.logger) return parseResponse(e.ctx, res.Responses, queries, e.client.GetConfiguredFields(), e.keepLabelsInResponse, e.logger)
@ -94,8 +102,7 @@ func (e *elasticsearchDataQuery) execute() (*backend.QueryDataResponse, error) {
func (e *elasticsearchDataQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to int64) error { func (e *elasticsearchDataQuery) processQuery(q *Query, ms *es.MultiSearchRequestBuilder, from, to int64) error {
err := isQueryWithError(q) err := isQueryWithError(q)
if err != nil { if err != nil {
err = errorsource.DownstreamError(fmt.Errorf("received invalid query. %w", err), false) return backend.DownstreamError(fmt.Errorf("received invalid query. %w", err))
return err
} }
defaultTimeField := e.client.GetConfiguredFields().TimeField defaultTimeField := e.client.GetConfiguredFields().TimeField

View File

@ -19,7 +19,6 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/log" "github.com/grafana/grafana-plugin-sdk-go/backend/log"
exp "github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client" es "github.com/grafana/grafana/pkg/tsdb/elasticsearch/client"
) )
@ -98,11 +97,11 @@ func newInstanceSettings(httpClientProvider *httpclient.Provider) datasource.Ins
timeField, ok := jsonData["timeField"].(string) timeField, ok := jsonData["timeField"].(string)
if !ok { if !ok {
return nil, exp.DownstreamError(errors.New("timeField cannot be cast to string"), false) return nil, backend.DownstreamError(errors.New("timeField cannot be cast to string"))
} }
if timeField == "" { if timeField == "" {
return nil, exp.DownstreamError(errors.New("elasticsearch time field name is required"), false) return nil, backend.DownstreamError(errors.New("elasticsearch time field name is required"))
} }
logLevelField, ok := jsonData["logLevelField"].(string) logLevelField, ok := jsonData["logLevelField"].(string)
@ -221,9 +220,9 @@ func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceReq
status = "cancelled" status = "cancelled"
} }
lp := []any{"error", err, "status", status, "duration", time.Since(start), "stage", es.StageDatabaseRequest, "resourcePath", req.Path} lp := []any{"error", err, "status", status, "duration", time.Since(start), "stage", es.StageDatabaseRequest, "resourcePath", req.Path}
sourceErr := exp.Error{} sourceErr := backend.ErrorWithSource{}
if errors.As(err, &sourceErr) { if errors.As(err, &sourceErr) {
lp = append(lp, "statusSource", sourceErr.Source()) lp = append(lp, "statusSource", sourceErr.ErrorSource())
} }
if response != nil { if response != nil {
lp = append(lp, "statusCode", response.StatusCode) lp = append(lp, "statusCode", response.StatusCode)

View File

@ -5,7 +5,6 @@ import (
"testing" "testing"
"github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -121,7 +120,7 @@ func TestErrorTooManyDateHistogramBuckets(t *testing.T) {
require.True(t, ok) require.True(t, ok)
require.Len(t, dataResponse.Frames, 0) require.Len(t, dataResponse.Frames, 0)
require.ErrorContains(t, dataResponse.Error, "Trying to create too many buckets. Must be less than or equal to: [65536].") require.ErrorContains(t, dataResponse.Error, "Trying to create too many buckets. Must be less than or equal to: [65536].")
var sourceErr errorsource.Error var sourceErr backend.ErrorWithSource
ok = errors.As(dataResponse.Error, &sourceErr) ok = errors.As(dataResponse.Error, &sourceErr)
require.True(t, ok) require.True(t, ok)
require.Equal(t, sourceErr.ErrorSource().String(), "downstream") require.Equal(t, sourceErr.ErrorSource().String(), "downstream")

View File

@ -15,7 +15,6 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend/log" "github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing" "github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
@ -75,7 +74,7 @@ func parseResponse(ctx context.Context, responses []*es.SearchResponse, targets
resSpan.End() resSpan.End()
logger.Error("Processing error response from Elasticsearch", "error", string(me), "query", string(mt)) logger.Error("Processing error response from Elasticsearch", "error", string(me), "query", string(mt))
errResult := getErrorFromElasticResponse(res) errResult := getErrorFromElasticResponse(res)
result.Responses[target.RefID] = errorsource.Response(errorsource.DownstreamError(errors.New(errResult), false)) result.Responses[target.RefID] = backend.ErrorResponseWithErrorSource(backend.DownstreamError(errors.New(errResult)))
continue continue
} }