mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
This reverts commit 934456dc1c
.
This commit is contained in:
parent
c7442c0fd2
commit
8cd8eb7882
@ -13,13 +13,12 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
jsoniter "github.com/json-iterator/go"
|
jsoniter "github.com/json-iterator/go"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/codes"
|
"go.opentelemetry.io/otel/codes"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||||
"github.com/grafana/grafana/pkg/tsdb/loki/instrumentation"
|
"github.com/grafana/grafana/pkg/tsdb/loki/instrumentation"
|
||||||
@ -156,10 +155,10 @@ func readLokiError(body io.ReadCloser) error {
|
|||||||
return makeLokiError(bytes)
|
return makeLokiError(bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts ResponseOpts) backend.DataResponse {
|
func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts ResponseOpts) (data.Frames, error) {
|
||||||
req, err := makeDataRequest(ctx, api.url, query)
|
req, err := makeDataRequest(ctx, api.url, query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errorsource.Response(errorsource.PluginError(err, false))
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
queryAttrs := []any{"start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr, "queryType", query.QueryType, "direction", query.Direction, "maxLines", query.MaxLines, "supportingQueryType", query.SupportingQueryType, "lokiHost", req.URL.Host, "lokiPath", req.URL.Path}
|
queryAttrs := []any{"start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr, "queryType", query.QueryType, "direction", query.Direction, "maxLines", query.MaxLines, "supportingQueryType", query.SupportingQueryType, "lokiHost", req.URL.Host, "lokiPath", req.URL.Path}
|
||||||
@ -177,8 +176,7 @@ func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts
|
|||||||
lp = append(lp, "statusCode", resp.StatusCode)
|
lp = append(lp, "statusCode", resp.StatusCode)
|
||||||
}
|
}
|
||||||
api.log.Error("Error received from Loki", lp...)
|
api.log.Error("Error received from Loki", lp...)
|
||||||
// Here, errors source is provided by errorsource middleware
|
return nil, err
|
||||||
return errorsource.Response(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -193,11 +191,7 @@ func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts
|
|||||||
err := readLokiError(resp.Body)
|
err := readLokiError(resp.Body)
|
||||||
lp = append(lp, "status", "error", "error", err)
|
lp = append(lp, "status", "error", "error", err)
|
||||||
api.log.Error("Error received from Loki", lp...)
|
api.log.Error("Error received from Loki", lp...)
|
||||||
// errors should be processed by errorsource middleware
|
return nil, err
|
||||||
// here we do here something extra - turning non-200 to error
|
|
||||||
// we will consider this Plugin error, but let's re-evaluate if we need this
|
|
||||||
// @todo Re-evaluate if we need to turn non-200 to error
|
|
||||||
return errorsource.Response(errorsource.PluginError(err, false))
|
|
||||||
} else {
|
} else {
|
||||||
lp = append(lp, "status", "ok")
|
lp = append(lp, "status", "ok")
|
||||||
api.log.Info("Response received from loki", lp...)
|
api.log.Info("Response received from loki", lp...)
|
||||||
@ -217,14 +211,12 @@ func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts
|
|||||||
span.SetStatus(codes.Error, err.Error())
|
span.SetStatus(codes.Error, err.Error())
|
||||||
instrumentation.UpdatePluginParsingResponseDurationSeconds(ctx, time.Since(start), "error")
|
instrumentation.UpdatePluginParsingResponseDurationSeconds(ctx, time.Since(start), "error")
|
||||||
api.log.Error("Error parsing response from loki", "error", res.Error, "metricDataplane", responseOpts.metricDataplane, "duration", time.Since(start), "stage", stageParseResponse)
|
api.log.Error("Error parsing response from loki", "error", res.Error, "metricDataplane", responseOpts.metricDataplane, "duration", time.Since(start), "stage", stageParseResponse)
|
||||||
// Here the response.Error is set by converter.ReadPrometheusStyleResult without ErrorSource, which means it will always be PluginError.
|
return nil, res.Error
|
||||||
// @todo: We should look into when successful response is returned with error field and what type of ErrorSource we should set for that
|
|
||||||
return res
|
|
||||||
}
|
}
|
||||||
instrumentation.UpdatePluginParsingResponseDurationSeconds(ctx, time.Since(start), "ok")
|
instrumentation.UpdatePluginParsingResponseDurationSeconds(ctx, time.Since(start), "ok")
|
||||||
api.log.Info("Response parsed from loki", "duration", time.Since(start), "metricDataplane", responseOpts.metricDataplane, "framesLength", len(res.Frames), "stage", stageParseResponse)
|
api.log.Info("Response parsed from loki", "duration", time.Since(start), "metricDataplane", responseOpts.metricDataplane, "framesLength", len(res.Frames), "stage", stageParseResponse)
|
||||||
|
|
||||||
return res
|
return res.Frames, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeRawRequest(ctx context.Context, lokiDsUrl string, resourcePath string) (*http.Request, error) {
|
func makeRawRequest(ctx context.Context, lokiDsUrl string, resourcePath string) (*http.Request, error) {
|
||||||
|
@ -28,8 +28,8 @@ func TestApiLogVolume(t *testing.T) {
|
|||||||
require.Equal(t, "Source=logvolhist", req.Header.Get("X-Query-Tags"))
|
require.Equal(t, "Source=logvolhist", req.Header.Get("X-Query-Tags"))
|
||||||
})
|
})
|
||||||
|
|
||||||
res := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsVolume, QueryType: QueryTypeRange}, ResponseOpts{})
|
_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsVolume, QueryType: QueryTypeRange}, ResponseOpts{})
|
||||||
require.Equal(t, nil, res.Error)
|
require.NoError(t, err)
|
||||||
require.True(t, called)
|
require.True(t, called)
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -40,8 +40,8 @@ func TestApiLogVolume(t *testing.T) {
|
|||||||
require.Equal(t, "Source=logsample", req.Header.Get("X-Query-Tags"))
|
require.Equal(t, "Source=logsample", req.Header.Get("X-Query-Tags"))
|
||||||
})
|
})
|
||||||
|
|
||||||
res := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange}, ResponseOpts{})
|
_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryLogsSample, QueryType: QueryTypeRange}, ResponseOpts{})
|
||||||
require.Equal(t, nil, res.Error)
|
require.NoError(t, err)
|
||||||
require.True(t, called)
|
require.True(t, called)
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -52,8 +52,8 @@ func TestApiLogVolume(t *testing.T) {
|
|||||||
require.Equal(t, "Source=datasample", req.Header.Get("X-Query-Tags"))
|
require.Equal(t, "Source=datasample", req.Header.Get("X-Query-Tags"))
|
||||||
})
|
})
|
||||||
|
|
||||||
res := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryDataSample, QueryType: QueryTypeRange}, ResponseOpts{})
|
_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryDataSample, QueryType: QueryTypeRange}, ResponseOpts{})
|
||||||
require.Equal(t, nil, res.Error)
|
require.NoError(t, err)
|
||||||
require.True(t, called)
|
require.True(t, called)
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -64,8 +64,8 @@ func TestApiLogVolume(t *testing.T) {
|
|||||||
require.Equal(t, "", req.Header.Get("X-Query-Tags"))
|
require.Equal(t, "", req.Header.Get("X-Query-Tags"))
|
||||||
})
|
})
|
||||||
|
|
||||||
res := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryNone, QueryType: QueryTypeRange}, ResponseOpts{})
|
_, err := api.DataQuery(context.Background(), lokiQuery{Expr: "", SupportingQueryType: SupportingQueryNone, QueryType: QueryTypeRange}, ResponseOpts{})
|
||||||
require.Equal(t, nil, res.Error)
|
require.NoError(t, err)
|
||||||
require.True(t, called)
|
require.True(t, called)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -133,8 +133,8 @@ func TestApiUrlHandling(t *testing.T) {
|
|||||||
QueryType: QueryTypeRange,
|
QueryType: QueryTypeRange,
|
||||||
}
|
}
|
||||||
|
|
||||||
res := api.DataQuery(context.Background(), query, ResponseOpts{})
|
_, err := api.DataQuery(context.Background(), query, ResponseOpts{})
|
||||||
require.Equal(t, nil, res.Error)
|
require.NoError(t, err)
|
||||||
require.True(t, called)
|
require.True(t, called)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -154,8 +154,8 @@ func TestApiUrlHandling(t *testing.T) {
|
|||||||
QueryType: QueryTypeInstant,
|
QueryType: QueryTypeInstant,
|
||||||
}
|
}
|
||||||
|
|
||||||
res := api.DataQuery(context.Background(), query, ResponseOpts{})
|
_, err := api.DataQuery(context.Background(), query, ResponseOpts{})
|
||||||
require.Equal(t, nil, res.Error)
|
require.NoError(t, err)
|
||||||
require.True(t, called)
|
require.True(t, called)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/experimental"
|
"github.com/grafana/grafana-plugin-sdk-go/experimental"
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@ -60,10 +61,14 @@ func TestSuccessResponse(t *testing.T) {
|
|||||||
bytes, err := os.ReadFile(responseFileName)
|
bytes, err := os.ReadFile(responseFileName)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
resp := runQuery(context.Background(), makeMockedAPI(http.StatusOK, "application/json", bytes, nil), &query, responseOpts, log.New("test"))
|
frames, err := runQuery(context.Background(), makeMockedAPI(http.StatusOK, "application/json", bytes, nil), &query, responseOpts, log.New("test"))
|
||||||
require.Equal(t, nil, resp.Error)
|
require.NoError(t, err)
|
||||||
|
|
||||||
experimental.CheckGoldenJSONResponse(t, folder, goldenFileName, &resp, true)
|
dr := &backend.DataResponse{
|
||||||
|
Frames: frames,
|
||||||
|
Error: err,
|
||||||
|
}
|
||||||
|
experimental.CheckGoldenJSONResponse(t, folder, goldenFileName, dr, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tt {
|
for _, test := range tt {
|
||||||
@ -121,11 +126,11 @@ func TestErrorResponse(t *testing.T) {
|
|||||||
|
|
||||||
for _, test := range tt {
|
for _, test := range tt {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
resp := runQuery(context.Background(), makeMockedAPI(400, test.contentType, test.body, nil), &lokiQuery{QueryType: QueryTypeRange, Direction: DirectionBackward}, ResponseOpts{}, log.New("test"))
|
frames, err := runQuery(context.Background(), makeMockedAPI(400, test.contentType, test.body, nil), &lokiQuery{QueryType: QueryTypeRange, Direction: DirectionBackward}, ResponseOpts{}, log.New("test"))
|
||||||
|
|
||||||
require.Len(t, resp.Frames, 0)
|
require.Len(t, frames, 0)
|
||||||
require.Error(t, resp.Error)
|
require.Error(t, err)
|
||||||
require.EqualError(t, resp.Error, test.errorMessage)
|
require.EqualError(t, err, test.errorMessage)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -13,10 +13,8 @@ import (
|
|||||||
"github.com/grafana/dskit/concurrency"
|
"github.com/grafana/dskit/concurrency"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
|
||||||
sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/experimental/errorsource"
|
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/codes"
|
"go.opentelemetry.io/otel/codes"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
@ -97,7 +95,6 @@ func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.Inst
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
opts.Middlewares = append(sdkhttpclient.DefaultMiddlewares(), errorsource.Middleware("errorsource"))
|
|
||||||
client, err := httpClientProvider.New(opts)
|
client, err := httpClientProvider.New(opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -242,28 +239,37 @@ func executeQuery(ctx context.Context, query *lokiQuery, req *backend.QueryDataR
|
|||||||
|
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
res := runQuery(ctx, api, query, responseOpts, plog)
|
frames, err := runQuery(ctx, api, query, responseOpts, plog)
|
||||||
return res
|
queryRes := backend.DataResponse{}
|
||||||
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
queryRes.Error = err
|
||||||
|
} else {
|
||||||
|
queryRes.Frames = frames
|
||||||
|
}
|
||||||
|
|
||||||
|
return queryRes
|
||||||
}
|
}
|
||||||
|
|
||||||
// we extracted this part of the functionality to make it easy to unit-test it
|
// we extracted this part of the functionality to make it easy to unit-test it
|
||||||
func runQuery(ctx context.Context, api *LokiAPI, query *lokiQuery, responseOpts ResponseOpts, plog log.Logger) backend.DataResponse {
|
func runQuery(ctx context.Context, api *LokiAPI, query *lokiQuery, responseOpts ResponseOpts, plog log.Logger) (data.Frames, error) {
|
||||||
dataResponse := api.DataQuery(ctx, *query, responseOpts)
|
frames, err := api.DataQuery(ctx, *query, responseOpts)
|
||||||
if dataResponse.Error != nil {
|
if err != nil {
|
||||||
plog.Error("Error querying loki", "error", dataResponse.Error)
|
plog.Error("Error querying loki", "error", err)
|
||||||
return dataResponse
|
return data.Frames{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, frame := range dataResponse.Frames {
|
for _, frame := range frames {
|
||||||
err := adjustFrame(frame, query, !responseOpts.metricDataplane, responseOpts.logsDataplane)
|
err = adjustFrame(frame, query, !responseOpts.metricDataplane, responseOpts.logsDataplane)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plog.Error("Error adjusting frame", "error", err)
|
plog.Error("Error adjusting frame", "error", err)
|
||||||
return errorsource.Response(errorsource.PluginError(err, false))
|
return data.Frames{}, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return dataResponse
|
return frames, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) getDSInfo(ctx context.Context, pluginCtx backend.PluginContext) (*datasourceInfo, error) {
|
func (s *Service) getDSInfo(ctx context.Context, pluginCtx backend.PluginContext) (*datasourceInfo, error) {
|
||||||
|
@ -19,7 +19,7 @@ func BenchmarkMatrixJson(b *testing.B) {
|
|||||||
|
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
_ = runQuery(context.Background(), makeMockedAPI(http.StatusOK, "application/json", bytes, nil), &lokiQuery{}, ResponseOpts{}, log.New("test"))
|
_, _ = runQuery(context.Background(), makeMockedAPI(http.StatusOK, "application/json", bytes, nil), &lokiQuery{}, ResponseOpts{}, log.New("test"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user