Prometheus: Remove buffered client and feature toggle related to it (#59898)
* Remove the Prometheus buffered client and the feature toggle related to it
* Remove redundant pieces
* Clean up the integration test
Parent: 8356df081d
Commit: 5424ec4157

@@ -19,18 +19,17 @@ This page contains a list of available feature toggles. To learn how to turn on
 Some stable features are enabled by default. You can disable a stable feature by setting the feature flag to "false" in the configuration.
 
 | Feature toggle name | Description | Enabled by default |
-| ---------------------------- | --------------------------------------------------------------------------------------------------------------- | ------------------ |
+| ---------------------------- | ------------------------------------------------------------------------------------ | ------------------ |
 | `promQueryBuilder` | Show Prometheus query builder | Yes |
 | `disableEnvelopeEncryption` | Disable envelope encryption (emergency only) | |
 | `database_metrics` | Add Prometheus metrics for database tables | |
 | `lokiMonacoEditor` | Access to Monaco query editor for Loki | Yes |
 | `featureHighlights` | Highlight Grafana Enterprise features | |
 | `commandPalette` | Enable command palette | Yes |
 | `cloudWatchDynamicLabels` | Use dynamic labels instead of alias patterns in CloudWatch datasource | Yes |
-| `prometheusBufferedClient` | Enable buffered (old) client for Prometheus datasource as default instead of streaming JSON parser client (new) | |
 | `internationalization` | Enables internationalization | Yes |
 | `accessTokenExpirationCheck` | Enable OAuth access_token expiration check and token refresh using the refresh_token | |
 
 ## Beta feature toggles
 

@@ -53,7 +53,6 @@ export interface FeatureToggles {
   cloudWatchDynamicLabels?: boolean;
   datasourceQueryMultiStatus?: boolean;
   traceToMetrics?: boolean;
-  prometheusBufferedClient?: boolean;
   newDBLibrary?: boolean;
   validateDashboardsOnSave?: boolean;
   autoMigrateGraphPanels?: boolean;

@@ -214,11 +214,6 @@ var (
         State:        FeatureStateAlpha,
         FrontendOnly: true,
     },
-    {
-        Name:        "prometheusBufferedClient",
-        Description: "Enable buffered (old) client for Prometheus datasource as default instead of streaming JSON parser client (new)",
-        State:       FeatureStateStable,
-    },
     {
         Name:        "newDBLibrary",
         Description: "Use jmoiron/sqlx rather than xorm for a few backend services",

@@ -155,10 +155,6 @@ const (
     // Enable trace to metrics links
     FlagTraceToMetrics = "traceToMetrics"
 
-    // FlagPrometheusBufferedClient
-    // Enable buffered (old) client for Prometheus datasource as default instead of streaming JSON parser client (new)
-    FlagPrometheusBufferedClient = "prometheusBufferedClient"
-
     // FlagNewDBLibrary
     // Use jmoiron/sqlx rather than xorm for a few backend services
    FlagNewDBLibrary = "newDBLibrary"
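
Note: flags generated in toggles_gen.go are consulted at runtime through the feature-management service, which is why removing the toggle touches both the registry and the generated constants. A minimal, hypothetical sketch of how such a flag gated client selection (the surrounding function is not part of this diff):

    // Hypothetical sketch -- before this commit, datasource setup could branch on the flag:
    if features.IsEnabled(featuremgmt.FlagPrometheusBufferedClient) {
        // fall back to the old buffered client
    } else {
        // use the streaming JSON parser client (querydata)
    }

With the flag gone, the streaming client is the only remaining code path.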

@@ -19,7 +19,7 @@ import (
     "github.com/stretchr/testify/require"
 )
 
-func TestIntegrationPrometheusBuffered(t *testing.T) {
+func TestIntegrationPrometheus(t *testing.T) {
     if testing.Short() {
         t.Skip("skipping integration test")
     }

@@ -104,54 +104,6 @@ func TestIntegrationPrometheusBuffered(t *testing.T) {
         require.Equal(t, "basicAuthUser", username)
         require.Equal(t, "basicAuthPassword", pwd)
     })
-}
-
-func TestIntegrationPrometheusClient(t *testing.T) {
-    if testing.Short() {
-        t.Skip("skipping integration test")
-    }
-    dir, path := testinfra.CreateGrafDir(t, testinfra.GrafanaOpts{})
-
-    grafanaListeningAddr, testEnv := testinfra.StartGrafanaEnv(t, dir, path)
-    ctx := context.Background()
-
-    testinfra.CreateUser(t, testEnv.SQLStore, user.CreateUserCommand{
-        DefaultOrgRole: string(org.RoleAdmin),
-        Password:       "admin",
-        Login:          "admin",
-    })
-
-    var outgoingRequest *http.Request
-    outgoingServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-        outgoingRequest = r
-        w.WriteHeader(http.StatusUnauthorized)
-    }))
-    t.Cleanup(outgoingServer.Close)
-
-    jsonData := simplejson.NewFromAny(map[string]interface{}{
-        "httpMethod":            "post",
-        "httpHeaderName1":       "X-CUSTOM-HEADER",
-        "customQueryParameters": "q1=1&q2=2",
-    })
-    secureJSONData := map[string]string{
-        "basicAuthPassword": "basicAuthPassword",
-        "httpHeaderValue1":  "custom-header-value",
-    }
-
-    uid := "prometheus"
-    err := testEnv.Server.HTTPServer.DataSourcesService.AddDataSource(ctx, &datasources.AddDataSourceCommand{
-        OrgId:          1,
-        Access:         datasources.DS_ACCESS_PROXY,
-        Name:           "Prometheus",
-        Type:           datasources.DS_PROMETHEUS,
-        Uid:            uid,
-        Url:            outgoingServer.URL,
-        BasicAuth:      true,
-        BasicAuthUser:  "basicAuthUser",
-        JsonData:       jsonData,
-        SecureJsonData: secureJSONData,
-    })
-    require.NoError(t, err)
-
     t.Run("When calling /api/ds/query should set expected headers on outgoing HTTP request", func(t *testing.T) {
         query := simplejson.NewFromAny(map[string]interface{}{

@@ -189,26 +141,4 @@ func TestIntegrationPrometheusClient(t *testing.T) {
         require.Equal(t, "basicAuthUser", username)
         require.Equal(t, "basicAuthPassword", pwd)
     })
-
-    t.Run("When calling /api/datasources/uid/{uid}/resources/api/v1/labels should set expected headers on outgoing HTTP request", func(t *testing.T) {
-        u := fmt.Sprintf("http://%s/api/datasources/uid/%s/resources/api/v1/labels", grafanaListeningAddr, uid)
-        // nolint:gosec
-        resp, err := http.Post(u, "application/json", nil)
-        require.NoError(t, err)
-        require.Equal(t, http.StatusUnauthorized, resp.StatusCode)
-        t.Cleanup(func() {
-            err := resp.Body.Close()
-            require.NoError(t, err)
-        })
-        _, err = io.ReadAll(resp.Body)
-        require.NoError(t, err)
-
-        require.NotNil(t, outgoingRequest)
-        require.Equal(t, "/api/v1/labels?q1=1&q2=2", outgoingRequest.URL.String())
-        require.Equal(t, "custom-header-value", outgoingRequest.Header.Get("X-CUSTOM-HEADER"))
-        username, pwd, ok := outgoingRequest.BasicAuth()
-        require.True(t, ok)
-        require.Equal(t, "basicAuthUser", username)
-        require.Equal(t, "basicAuthPassword", pwd)
-    })
 }

@@ -1,42 +0,0 @@
-GO = go
-SHELL = /bin/zsh
-
-ITERATIONS=10
-BENCH=repeat $(ITERATIONS) $(LEFT_BRACKET) $(GO) test -benchmem -run=^$$ -bench
-PROFILE=$(GO) test -benchmem -run=^$$ -benchtime 1x -memprofile memprofile.out -memprofilerate 1 -cpuprofile cpuprofile.out -bench
-
-LEFT_BRACKET = {
-RIGHT_BRACKET = }
-
-memprofile-exemplar memprofile-range: %: --%
-	$(GO) tool pprof -http=localhost:6061 memprofile.out
-
-cpuprofile-exemplar cpuprofile-range: %: --%
-	$(GO) tool pprof -http=localhost:6061 cpuprofile.out
-
-benchmark-exemplar benchmark-range: %: --%
-	sed -i 's/buffered/querydata/g' old.txt
-	benchstat old.txt new.txt
-	rm old.txt new.txt
-
---benchmark-range:
-	$(BENCH) ^BenchmarkRangeJson ./buffered >> old.txt $(RIGHT_BRACKET)
-	$(BENCH) ^BenchmarkRangeJson ./querydata >> new.txt $(RIGHT_BRACKET)
-
---memprofile-range:
-	$(PROFILE) ^BenchmarkRangeJson ./querydata
-
---cpuprofile-range:
-	$(PROFILE) ^BenchmarkRangeJson ./querydata
-
---benchmark-exemplar:
-	$(BENCH) ^BenchmarkExemplarJson ./buffered >> old.txt $(RIGHT_BRACKET)
-	$(BENCH) ^BenchmarkExemplarJson ./querydata >> new.txt $(RIGHT_BRACKET)
-
---memprofile-exemplar:
-	$(PROFILE) ^BenchmarkExemplarJson ./querydata
-
---cpuprofile-exemplar:
-	$(PROFILE) ^BenchmarkExemplarJson ./querydata
-
-.PHONY: benchmark-range benchmark-exemplar memprofile-range memprofile-exemplar cpuprofile-range cpuprofile-exemplar
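
The deleted Makefile drove a before/after benchmark comparison between the two clients. Expanding the BENCH variable above, the benchmark-range target ran roughly the following (the zsh repeat loop is unrolled here; sed renames the package in old.txt so benchstat can pair the results):

    go test -benchmem -run=^$ -bench ^BenchmarkRangeJson ./buffered  >> old.txt   # repeated 10 times
    go test -benchmem -run=^$ -bench ^BenchmarkRangeJson ./querydata >> new.txt   # repeated 10 times
    sed -i 's/buffered/querydata/g' old.txt
    benchstat old.txt new.txt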

@@ -1,180 +0,0 @@
-package buffered
-
-import (
-    "bytes"
-    "context"
-    "encoding/json"
-    "fmt"
-    "io"
-    "net/http"
-    "os"
-    "path/filepath"
-    "testing"
-    "time"
-
-    "github.com/stretchr/testify/require"
-
-    "github.com/grafana/grafana-plugin-sdk-go/backend"
-    "github.com/grafana/grafana-plugin-sdk-go/experimental"
-    "github.com/prometheus/client_golang/api"
-    apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
-
-    "github.com/grafana/grafana/pkg/infra/log/logtest"
-    "github.com/grafana/grafana/pkg/infra/tracing"
-    "github.com/grafana/grafana/pkg/tsdb/intervalv2"
-)
-
-var update = true
-
-func TestResponses(t *testing.T) {
-    tt := []struct {
-        name     string
-        filepath string
-    }{
-        {name: "parse a simple matrix response", filepath: "range_simple"},
-        {name: "parse a simple matrix response with value missing steps", filepath: "range_missing"},
-        {name: "parse a matrix response with Infinity", filepath: "range_infinity"},
-        {name: "parse a matrix response with NaN", filepath: "range_nan"},
-        {name: "parse a response with legendFormat __auto", filepath: "range_auto"},
-        {name: "parse an exemplar response", filepath: "exemplar"},
-    }
-
-    for _, test := range tt {
-        t.Run(test.name, func(t *testing.T) {
-            queryFileName := filepath.Join("../testdata", test.filepath+".query.json")
-            responseFileName := filepath.Join("../testdata", test.filepath+".result.json")
-            goldenFileName := test.filepath + ".result.golden"
-
-            query, err := loadStoredPrometheusQuery(queryFileName)
-            require.NoError(t, err)
-
-            //nolint:gosec
-            responseBytes, err := os.ReadFile(responseFileName)
-            require.NoError(t, err)
-
-            result, err := runQuery(responseBytes, query)
-            require.NoError(t, err)
-            require.Len(t, result.Responses, 1)
-
-            dr, found := result.Responses["A"]
-            require.True(t, found)
-            experimental.CheckGoldenJSONResponse(t, "../testdata", goldenFileName, &dr, update)
-        })
-    }
-}
-
-type mockedRoundTripper struct {
-    responseBytes []byte
-}
-
-func (mockedRT *mockedRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
-    return &http.Response{
-        StatusCode: http.StatusOK,
-        Body:       io.NopCloser(bytes.NewReader(mockedRT.responseBytes)),
-    }, nil
-}
-
-func makeMockedApi(responseBytes []byte) (apiv1.API, error) {
-    roundTripper := mockedRoundTripper{responseBytes: responseBytes}
-
-    cfg := api.Config{
-        Address:      "http://localhost:9999",
-        RoundTripper: &roundTripper,
-    }
-
-    client, err := api.NewClient(cfg)
-    if err != nil {
-        return nil, err
-    }
-
-    api := apiv1.NewAPI(client)
-
-    return api, nil
-}
-
-// we store the prometheus query data in a json file, here is some minimal code
-// to be able to read it back. unfortunately we cannot use the PrometheusQuery
-// struct here, because it has `time.Time` and `time.Duration` fields that
-// cannot be unmarshalled from JSON automatically.
-type storedPrometheusQuery struct {
-    RefId         string
-    RangeQuery    bool
-    ExemplarQuery bool
-    Start         int64
-    End           int64
-    Step          int64
-    Expr          string
-    LegendFormat  string
-}
-
-func loadStoredPrometheusQuery(fileName string) (storedPrometheusQuery, error) {
-    //nolint:gosec
-    bytes, err := os.ReadFile(fileName)
-    if err != nil {
-        return storedPrometheusQuery{}, err
-    }
-
-    var sq storedPrometheusQuery
-    err = json.Unmarshal(bytes, &sq)
-    return sq, err
-}
-
-func runQuery(response []byte, sq storedPrometheusQuery) (*backend.QueryDataResponse, error) {
-    api, err := makeMockedApi(response)
-    if err != nil {
-        return nil, err
-    }
-
-    tracer := tracing.InitializeTracerForTest()
-
-    qm := QueryModel{
-        RangeQuery:    sq.RangeQuery,
-        ExemplarQuery: sq.ExemplarQuery,
-        Expr:          sq.Expr,
-        Interval:      fmt.Sprintf("%ds", sq.Step),
-        IntervalMS:    sq.Step * 1000,
-        LegendFormat:  sq.LegendFormat,
-    }
-
-    b := Buffered{
-        intervalCalculator: intervalv2.NewCalculator(),
-        tracer:             tracer,
-        TimeInterval:       "15s",
-        log:                &logtest.Fake{},
-        client:             api,
-    }
-
-    data, err := json.Marshal(&qm)
-    if err != nil {
-        return nil, err
-    }
-
-    req := &backend.QueryDataRequest{
-        Queries: []backend.DataQuery{
-            {
-                TimeRange: backend.TimeRange{
-                    From: time.Unix(sq.Start, 0),
-                    To:   time.Unix(sq.End, 0),
-                },
-                RefID:    sq.RefId,
-                Interval: time.Second * time.Duration(sq.Step),
-                JSON:     json.RawMessage(data),
-            },
-        },
-    }
-
-    queries, err := b.parseTimeSeriesQuery(req)
-    if err != nil {
-        return nil, err
-    }
-
-    // parseTimeSeriesQuery forces range queries if the only query is an exemplar query,
-    // so we need to set it back to false
-    if qm.ExemplarQuery {
-        for i := range queries {
-            queries[i].RangeQuery = false
-        }
-    }
-
-    return b.runQueries(context.Background(), queries)
-}
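
Worth noting: the final argument of experimental.CheckGoldenJSONResponse controls whether the golden file is rewritten on mismatch, so `var update = true` above meant each run regenerated the .golden files rather than failing on drift; typically that switch is left false outside of intentional regeneration.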

@@ -1,142 +0,0 @@
-package buffered
-
-import (
-    "context"
-    "encoding/json"
-    "fmt"
-    "math/rand"
-    "os"
-    "path/filepath"
-    "strings"
-    "testing"
-    "time"
-
-    "github.com/stretchr/testify/require"
-
-    "github.com/grafana/grafana/pkg/infra/log/logtest"
-    "github.com/grafana/grafana/pkg/infra/tracing"
-)
-
-// when memory-profiling this benchmark, these commands are recommended:
-// - go test -benchmem -run=^$ -benchtime 1x -memprofile memprofile.out -memprofilerate 1 -bench ^BenchmarkExemplarJson$ github.com/grafana/grafana/pkg/tsdb/prometheus/buffered
-// - go tool pprof -http=localhost:6061 memprofile.out
-func BenchmarkExemplarJson(b *testing.B) {
-    queryFileName := filepath.Join("../testdata", "exemplar.query.json")
-    query, err := loadStoredQuery(queryFileName)
-    require.NoError(b, err)
-
-    responseFileName := filepath.Join("../testdata", "exemplar.result.json")
-    // This is a test, so it's safe to ignore gosec warning G304.
-    // nolint:gosec
-    responseBytes, err := os.ReadFile(responseFileName)
-    require.NoError(b, err)
-
-    api, err := makeMockedApi(responseBytes)
-    require.NoError(b, err)
-
-    tracer := tracing.InitializeTracerForTest()
-
-    s := Buffered{tracer: tracer, log: &logtest.Fake{}, client: api}
-
-    b.ResetTimer()
-    for n := 0; n < b.N; n++ {
-        _, err := s.runQueries(context.Background(), []*PrometheusQuery{query})
-        require.NoError(b, err)
-    }
-}
-
-// when memory-profiling this benchmark, these commands are recommended:
-// - go test -benchmem -run=^$ -benchtime 1x -memprofile memprofile.out -memprofilerate 1 -bench ^BenchmarkRangeJson$ github.com/grafana/grafana/pkg/tsdb/prometheus/buffered
-// - go tool pprof -http=localhost:6061 memprofile.out
-func BenchmarkRangeJson(b *testing.B) {
-    resp, query := createJsonTestData(1642000000, 1, 300, 400)
-
-    api, err := makeMockedApi(resp)
-    require.NoError(b, err)
-
-    s := Buffered{tracer: tracing.InitializeTracerForTest(), log: &logtest.Fake{}, client: api}
-
-    b.ResetTimer()
-    for n := 0; n < b.N; n++ {
-        _, err := s.runQueries(context.Background(), []*PrometheusQuery{&query})
-        require.NoError(b, err)
-    }
-}
-
-const nanRate = 0.002
-
-// we build the JSON file from strings,
-// it was easier to write it this way.
-func makeJsonTestMetric(index int) string {
-    return fmt.Sprintf(`{"server":"main","category":"maintenance","case":"%v"}`, index)
-}
-
-// return a value between -100 and +100, sometimes NaN, as a string
-func makeJsonTestValue(r *rand.Rand) string {
-    if r.Float64() < nanRate {
-        return "NaN"
-    } else {
-        return fmt.Sprintf("%f", (r.Float64()*200)-100)
-    }
-}
-
-// create one time-series
-func makeJsonTestSeries(start int64, step int64, timestampCount int, r *rand.Rand, seriesIndex int) string {
-    var values []string
-    for i := 0; i < timestampCount; i++ {
-        // create out-of-order timestamps to test sorting
-        if seriesIndex == 0 && i%2 == 0 {
-            continue
-        }
-        value := fmt.Sprintf(`[%d,"%v"]`, start+(int64(i)*step), makeJsonTestValue(r))
-        values = append(values, value)
-    }
-    return fmt.Sprintf(`{"metric":%v,"values":[%v]}`, makeJsonTestMetric(seriesIndex), strings.Join(values, ","))
-}
-
-func createJsonTestData(start int64, step int64, timestampCount int, seriesCount int) ([]byte, PrometheusQuery) {
-    // we use random numbers as values, but they have to be the same numbers
-    // every time we call this, so we create a random source.
-    r := rand.New(rand.NewSource(42))
-    var allSeries []string
-    for i := 0; i < seriesCount; i++ {
-        allSeries = append(allSeries, makeJsonTestSeries(start, step, timestampCount, r, i))
-    }
-    bytes := []byte(fmt.Sprintf(`{"data":{"resultType":"matrix","result":[%v]},"status":"success"}`, strings.Join(allSeries, ",")))
-
-    query := PrometheusQuery{
-        RefId:      "A",
-        RangeQuery: true,
-        Start:      time.Unix(start, 0),
-        End:        time.Unix(start+((int64(timestampCount)-1)*step), 0),
-        Step:       time.Second * time.Duration(step),
-        Expr:       "test",
-    }
-
-    return bytes, query
-}
-
-func loadStoredQuery(fileName string) (*PrometheusQuery, error) {
-    // This is a test, so it's safe to ignore gosec warning G304.
-    // nolint:gosec
-    bytes, err := os.ReadFile(fileName)
-    if err != nil {
-        return nil, err
-    }
-
-    var sq storedPrometheusQuery
-    err = json.Unmarshal(bytes, &sq)
-    if err != nil {
-        return nil, err
-    }
-
-    return &PrometheusQuery{
-        RefId:         "A",
-        ExemplarQuery: sq.ExemplarQuery,
-        Start:         time.Unix(sq.Start, 0),
-        End:           time.Unix(sq.End, 0),
-        Step:          time.Second * time.Duration(sq.Step),
-        Expr:          sq.Expr,
-    }, nil
-}
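
For reference, each string produced by makeJsonTestSeries above is one entry in the matrix result; schematically (the numeric values here are illustrative, not the actual seeded output):

    {"metric":{"server":"main","category":"maintenance","case":"1"},"values":[[1642000000,"42.3"],[1642000001,"-17.8"], ...]}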

@@ -1,715 +0,0 @@
-package buffered
-
-import (
-    "context"
-    "encoding/json"
-    "errors"
-    "fmt"
-    "math"
-    "net/http"
-    "regexp"
-    "sort"
-    "strconv"
-    "strings"
-    "time"
-
-    "github.com/grafana/grafana-plugin-sdk-go/backend"
-    sdkHTTPClient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
-    "github.com/grafana/grafana-plugin-sdk-go/data"
-    "github.com/grafana/grafana/pkg/tsdb/prometheus/client"
-    apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
-    "github.com/prometheus/common/model"
-    "go.opentelemetry.io/otel/attribute"
-
-    "github.com/grafana/grafana/pkg/infra/log"
-    "github.com/grafana/grafana/pkg/infra/tracing"
-    "github.com/grafana/grafana/pkg/tsdb/intervalv2"
-    "github.com/grafana/grafana/pkg/tsdb/prometheus/middleware"
-    "github.com/grafana/grafana/pkg/tsdb/prometheus/utils"
-    "github.com/grafana/grafana/pkg/util/maputil"
-)
-
-// Internal interval and range variables
-const (
-    varInterval     = "$__interval"
-    varIntervalMs   = "$__interval_ms"
-    varRange        = "$__range"
-    varRangeS       = "$__range_s"
-    varRangeMs      = "$__range_ms"
-    varRateInterval = "$__rate_interval"
-)
-
-// Internal interval and range variables with {} syntax
-// Repetitive code, we should have functionality to unify these
-const (
-    varIntervalAlt     = "${__interval}"
-    varIntervalMsAlt   = "${__interval_ms}"
-    varRangeAlt        = "${__range}"
-    varRangeSAlt       = "${__range_s}"
-    varRangeMsAlt      = "${__range_ms}"
-    varRateIntervalAlt = "${__rate_interval}"
-)
-
-const legendFormatAuto = "__auto"
-
-var (
-    legendFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
-    safeRes      = 11000
-)
-
-type Buffered struct {
-    intervalCalculator intervalv2.Calculator
-    tracer             tracing.Tracer
-    client             apiv1.API
-    log                log.Logger
-    ID                 int64
-    URL                string
-    TimeInterval       string
-}
-
-type bufferedResponse struct {
-    Response interface{}
-    Warnings apiv1.Warnings
-}
-
-// New creates an object capable of executing and parsing Prometheus queries. It's "buffered" because there is
-// another implementation that parses the response in a streaming fashion.
-func New(roundTripper http.RoundTripper, tracer tracing.Tracer, settings backend.DataSourceInstanceSettings, plog log.Logger) (*Buffered, error) {
-    promClient, err := client.CreateAPIClient(roundTripper, settings.URL)
-    if err != nil {
-        return nil, fmt.Errorf("error creating prom client: %v", err)
-    }
-
-    jsonData, err := utils.GetJsonData(settings)
-    if err != nil {
-        return nil, fmt.Errorf("error getting jsonData: %w", err)
-    }
-
-    timeInterval, err := maputil.GetStringOptional(jsonData, "timeInterval")
-    if err != nil {
-        return nil, err
-    }
-
-    return &Buffered{
-        intervalCalculator: intervalv2.NewCalculator(),
-        tracer:             tracer,
-        log:                plog,
-        client:             promClient,
-        TimeInterval:       timeInterval,
-        ID:                 settings.ID,
-        URL:                settings.URL,
-    }, nil
-}
-
-func (b *Buffered) ExecuteTimeSeriesQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
-    // Add headers from the request to context so they are added later on by a context middleware. This is because
-    // the prom client does not allow us to do this directly.
-    addHeaders := make(map[string]string)
-
-    if req.Headers["FromAlert"] == "true" {
-        addHeaders["FromAlert"] = "true"
-    }
-
-    ctxWithHeaders := sdkHTTPClient.WithContextualMiddleware(ctx, middleware.ReqHeadersMiddleware(addHeaders))
-
-    queries, err := b.parseTimeSeriesQuery(req)
-    if err != nil {
-        result := backend.QueryDataResponse{
-            Responses: backend.Responses{},
-        }
-        return &result, fmt.Errorf("error parsing time series query: %v", err)
-    }
-
-    return b.runQueries(ctxWithHeaders, queries)
-}
-
-func (b *Buffered) runQueries(ctx context.Context, queries []*PrometheusQuery) (*backend.QueryDataResponse, error) {
-    result := backend.QueryDataResponse{
-        Responses: backend.Responses{},
-    }
-    for _, query := range queries {
-        response, err := b.runQuery(ctx, query)
-        if err != nil {
-            return &result, err
-        }
-        result.Responses[query.RefId] = response
-    }
-    return &result, nil
-}
-
-func (b *Buffered) runQuery(ctx context.Context, query *PrometheusQuery) (backend.DataResponse, error) {
-    ctx, endSpan := utils.StartTrace(ctx, b.tracer, "datasource.prometheus", []utils.Attribute{
-        {Key: "expr", Value: query.Expr, Kv: attribute.Key("expr").String(query.Expr)},
-        {Key: "start_unixnano", Value: query.Start, Kv: attribute.Key("start_unixnano").Int64(query.Start.UnixNano())},
-        {Key: "stop_unixnano", Value: query.End, Kv: attribute.Key("stop_unixnano").Int64(query.End.UnixNano())},
-    })
-    defer endSpan()
-
-    logger := b.log.FromContext(ctx) // read trace-id and other info from the context
-    logger.Debug("Sending query", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr)
-
-    response := make(map[TimeSeriesQueryType]bufferedResponse)
-
-    timeRange := apiv1.Range{
-        Step: query.Step,
-        // Align query range to step. It rounds start and end down to a multiple of step.
-        Start: alignTimeRange(query.Start, query.Step, query.UtcOffsetSec),
-        End:   alignTimeRange(query.End, query.Step, query.UtcOffsetSec),
-    }
-
-    if query.RangeQuery {
-        rangeResponse, warnings, err := b.client.QueryRange(ctx, query.Expr, timeRange)
-        if err != nil {
-            var promErr *apiv1.Error
-            if errors.As(err, &promErr) {
-                logger.Error("Range query failed", "query", query.Expr, "error", err, "detail", promErr.Detail)
-                return backend.DataResponse{Error: fmt.Errorf("%w: details: %s", err, promErr.Detail)}, nil
-            }
-
-            logger.Error("Range query failed", "query", query.Expr, "err", err)
-            return backend.DataResponse{Error: err}, nil
-        }
-        response[RangeQueryType] = bufferedResponse{
-            Response: rangeResponse,
-            Warnings: warnings,
-        }
-    }
-
-    if query.InstantQuery {
-        instantResponse, warnings, err := b.client.Query(ctx, query.Expr, query.End)
-        if err != nil {
-            var promErr *apiv1.Error
-            if errors.As(err, &promErr) {
-                logger.Error("Instant query failed", "query", query.Expr, "error", err, "detail", promErr.Detail)
-                return backend.DataResponse{Error: fmt.Errorf("%w: details: %s", err, promErr.Detail)}, nil
-            }
-
-            logger.Error("Instant query failed", "query", query.Expr, "err", err)
-            return backend.DataResponse{Error: err}, nil
-        }
-        response[InstantQueryType] = bufferedResponse{
-            Response: instantResponse,
-            Warnings: warnings,
-        }
-    }
-
-    // This is a special case:
-    // if an exemplar query returns an error, we want to only log it and continue processing the other results.
-    if query.ExemplarQuery {
-        exemplarResponse, err := b.client.QueryExemplars(ctx, query.Expr, timeRange.Start, timeRange.End)
-        if err != nil {
-            logger.Error("Exemplar query failed", "query", query.Expr, "err", err)
-        } else {
-            response[ExemplarQueryType] = bufferedResponse{
-                Response: exemplarResponse,
-                Warnings: nil,
-            }
-        }
-    }
-
-    frames, err := parseTimeSeriesResponse(response, query)
-    if err != nil {
-        return backend.DataResponse{}, err
-    }
-
-    // The ExecutedQueryString can be viewed in the Query Inspector in the UI
-    for _, frame := range frames {
-        frame.Meta.ExecutedQueryString = "Expr: " + query.Expr + "\n" + "Step: " + query.Step.String()
-    }
-
-    return backend.DataResponse{
-        Frames: frames,
-    }, nil
-}
-
-func formatLegend(metric model.Metric, query *PrometheusQuery) string {
-    var legend = metric.String()
-
-    if query.LegendFormat == legendFormatAuto {
-        // If we have labels, set legend to the empty string to utilize the auto naming system
-        if len(metric) > 0 {
-            legend = ""
-        }
-    } else if query.LegendFormat != "" {
-        result := legendFormat.ReplaceAllFunc([]byte(query.LegendFormat), func(in []byte) []byte {
-            labelName := strings.Replace(string(in), "{{", "", 1)
-            labelName = strings.Replace(labelName, "}}", "", 1)
-            labelName = strings.TrimSpace(labelName)
-            if val, exists := metric[model.LabelName(labelName)]; exists {
-                return []byte(val)
-            }
-            return []byte{}
-        })
-        legend = string(result)
-    }
-
-    // If legend is empty brackets, use the query expression
-    if legend == "{}" {
-        legend = query.Expr
-    }
-
-    return legend
-}
-
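The unit tests later in this diff pin the template behaviour down; a minimal example mirroring them:

    // Mirrors TestPrometheus_timeSeriesQuery_formatLegend below:
    metric := model.Metric{"app": "backend", "device": "mobile"}
    query := &PrometheusQuery{LegendFormat: "legend {{app}} {{ device }} {{broken}}"}
    formatLegend(metric, query) // => "legend backend mobile " -- unknown labels expand to ""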
-func (b *Buffered) parseTimeSeriesQuery(req *backend.QueryDataRequest) ([]*PrometheusQuery, error) {
-    qs := []*PrometheusQuery{}
-    for _, query := range req.Queries {
-        model := &QueryModel{}
-        err := json.Unmarshal(query.JSON, model)
-        if err != nil {
-            return nil, fmt.Errorf("error unmarshaling query model: %v", err)
-        }
-        // Final interval value
-        interval, err := calculatePrometheusInterval(model, b.TimeInterval, query, b.intervalCalculator)
-        if err != nil {
-            return nil, fmt.Errorf("error calculating interval: %v", err)
-        }
-
-        // Interpolate variables in expr
-        timeRange := query.TimeRange.To.Sub(query.TimeRange.From)
-        expr := interpolateVariables(model, interval, timeRange, b.intervalCalculator, b.TimeInterval)
-        rangeQuery := model.RangeQuery
-        if !model.InstantQuery && !model.RangeQuery {
-            // In older dashboards we were not setting the range query param, and !range && !instant was run as a range query
-            rangeQuery = true
-        }
-
-        // We never want to run an exemplar query for alerting
-        exemplarQuery := model.ExemplarQuery
-        if req.Headers["FromAlert"] == "true" {
-            exemplarQuery = false
-        }
-
-        qs = append(qs, &PrometheusQuery{
-            Expr:          expr,
-            Step:          interval,
-            LegendFormat:  model.LegendFormat,
-            Start:         query.TimeRange.From,
-            End:           query.TimeRange.To,
-            RefId:         query.RefID,
-            InstantQuery:  model.InstantQuery,
-            RangeQuery:    rangeQuery,
-            ExemplarQuery: exemplarQuery,
-            UtcOffsetSec:  model.UtcOffsetSec,
-        })
-    }
-    return qs, nil
-}
-
-func parseTimeSeriesResponse(value map[TimeSeriesQueryType]bufferedResponse, query *PrometheusQuery) (data.Frames, error) {
-    var (
-        frames     = data.Frames{}
-        nextFrames = data.Frames{}
-    )
-
-    for _, val := range value {
-        // Zero out the slice to prevent data corruption.
-        nextFrames = nextFrames[:0]
-
-        switch v := val.Response.(type) {
-        case model.Matrix:
-            nextFrames = matrixToDataFrames(v, query, nextFrames)
-        case model.Vector:
-            nextFrames = vectorToDataFrames(v, query, nextFrames)
-        case *model.Scalar:
-            nextFrames = scalarToDataFrames(v, query, nextFrames)
-        case []apiv1.ExemplarQueryResult:
-            nextFrames = exemplarToDataFrames(v, query, nextFrames)
-        default:
-            return nil, fmt.Errorf("unexpected result type: %s query: %s", v, query.Expr)
-        }
-
-        if len(val.Warnings) > 0 {
-            for _, frame := range nextFrames {
-                if frame.Meta == nil {
-                    frame.Meta = &data.FrameMeta{}
-                }
-                frame.Meta.Notices = readWarnings(val.Warnings)
-            }
-        }
-
-        frames = append(frames, nextFrames...)
-    }
-
-    return frames, nil
-}
-
-func readWarnings(warnings apiv1.Warnings) []data.Notice {
-    notices := []data.Notice{}
-
-    for _, w := range warnings {
-        notice := data.Notice{
-            Severity: data.NoticeSeverityWarning,
-            Text:     w,
-        }
-        notices = append(notices, notice)
-    }
-
-    return notices
-}
-
-func calculatePrometheusInterval(model *QueryModel, timeInterval string, query backend.DataQuery, intervalCalculator intervalv2.Calculator) (time.Duration, error) {
-    queryInterval := model.Interval
-
-    // If we are using a variable for interval/step, we will replace it with the calculated interval
-    if isVariableInterval(queryInterval) {
-        queryInterval = ""
-    }
-
-    minInterval, err := intervalv2.GetIntervalFrom(timeInterval, queryInterval, model.IntervalMS, 15*time.Second)
-    if err != nil {
-        return time.Duration(0), err
-    }
-    calculatedInterval := intervalCalculator.Calculate(query.TimeRange, minInterval, query.MaxDataPoints)
-    safeInterval := intervalCalculator.CalculateSafeInterval(query.TimeRange, int64(safeRes))
-
-    adjustedInterval := safeInterval.Value
-    if calculatedInterval.Value > safeInterval.Value {
-        adjustedInterval = calculatedInterval.Value
-    }
-
-    if model.Interval == varRateInterval || model.Interval == varRateIntervalAlt {
-        // Rate interval is final and is not affected by resolution
-        return calculateRateInterval(adjustedInterval, timeInterval, intervalCalculator), nil
-    } else {
-        intervalFactor := model.IntervalFactor
-        if intervalFactor == 0 {
-            intervalFactor = 1
-        }
-        return time.Duration(int64(adjustedInterval) * intervalFactor), nil
-    }
-}
-
-func calculateRateInterval(interval time.Duration, scrapeInterval string, intervalCalculator intervalv2.Calculator) time.Duration {
-    scrape := scrapeInterval
-    if scrape == "" {
-        scrape = "15s"
-    }
-
-    scrapeIntervalDuration, err := intervalv2.ParseIntervalStringToTimeDuration(scrape)
-    if err != nil {
-        return time.Duration(0)
-    }
-
-    rateInterval := time.Duration(int64(math.Max(float64(interval+scrapeIntervalDuration), float64(4)*float64(scrapeIntervalDuration))))
-    return rateInterval
-}
-
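In effect, calculateRateInterval computes max(interval + scrape_interval, 4 * scrape_interval). Two worked examples with the default 15s scrape interval:

    interval = 60s: max(60s + 15s, 4 * 15s) = max(75s, 60s) = 75s
    interval = 5s:  max(5s + 15s, 4 * 15s)  = max(20s, 60s) = 60s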
-func interpolateVariables(model *QueryModel, interval time.Duration, timeRange time.Duration, intervalCalculator intervalv2.Calculator, timeInterval string) string {
-    expr := model.Expr
-    rangeMs := timeRange.Milliseconds()
-    rangeSRounded := int64(math.Round(float64(rangeMs) / 1000.0))
-
-    var rateInterval time.Duration
-    if model.Interval == varRateInterval || model.Interval == varRateIntervalAlt {
-        rateInterval = interval
-    } else {
-        rateInterval = calculateRateInterval(interval, timeInterval, intervalCalculator)
-    }
-
-    expr = strings.ReplaceAll(expr, varIntervalMs, strconv.FormatInt(int64(interval/time.Millisecond), 10))
-    expr = strings.ReplaceAll(expr, varInterval, intervalv2.FormatDuration(interval))
-    expr = strings.ReplaceAll(expr, varRangeMs, strconv.FormatInt(rangeMs, 10))
-    expr = strings.ReplaceAll(expr, varRangeS, strconv.FormatInt(rangeSRounded, 10))
-    expr = strings.ReplaceAll(expr, varRange, strconv.FormatInt(rangeSRounded, 10)+"s")
-    expr = strings.ReplaceAll(expr, varRateInterval, rateInterval.String())
-
-    // Repetitive code, we should have functionality to unify these
-    expr = strings.ReplaceAll(expr, varIntervalMsAlt, strconv.FormatInt(int64(interval/time.Millisecond), 10))
-    expr = strings.ReplaceAll(expr, varIntervalAlt, intervalv2.FormatDuration(interval))
-    expr = strings.ReplaceAll(expr, varRangeMsAlt, strconv.FormatInt(rangeMs, 10))
-    expr = strings.ReplaceAll(expr, varRangeSAlt, strconv.FormatInt(rangeSRounded, 10))
-    expr = strings.ReplaceAll(expr, varRangeAlt, strconv.FormatInt(rangeSRounded, 10)+"s")
-    expr = strings.ReplaceAll(expr, varRateIntervalAlt, rateInterval.String())
-    return expr
-}
-
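A concrete substitution, assuming a calculated interval of 30s, the default 15s scrape interval, and a 1h query range (values chosen purely for illustration):

    rate(http_requests_total[$__rate_interval]) / $__range_s
      => rate(http_requests_total[1m0s]) / 3600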
-func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery, frames data.Frames) data.Frames {
-    for _, v := range matrix {
-        tags := make(map[string]string, len(v.Metric))
-        for k, v := range v.Metric {
-            tags[string(k)] = string(v)
-        }
-        timeField := data.NewFieldFromFieldType(data.FieldTypeTime, len(v.Values))
-        valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, len(v.Values))
-
-        for i, k := range v.Values {
-            timeField.Set(i, k.Timestamp.Time().UTC())
-            valueField.Set(i, float64(k.Value))
-        }
-
-        name := formatLegend(v.Metric, query)
-        timeField.Name = data.TimeSeriesTimeFieldName
-        timeField.Config = &data.FieldConfig{Interval: float64(query.Step.Milliseconds())}
-        valueField.Name = data.TimeSeriesValueFieldName
-        valueField.Labels = tags
-
-        if name != "" {
-            valueField.Config = &data.FieldConfig{DisplayNameFromDS: name}
-        }
-
-        frames = append(frames, newDataFrame(name, "matrix", timeField, valueField))
-    }
-
-    return frames
-}
-
-func scalarToDataFrames(scalar *model.Scalar, query *PrometheusQuery, frames data.Frames) data.Frames {
-    timeVector := []time.Time{scalar.Timestamp.Time().UTC()}
-    values := []float64{float64(scalar.Value)}
-    name := fmt.Sprintf("%g", values[0])
-
-    return append(
-        frames,
-        newDataFrame(
-            name,
-            "scalar",
-            data.NewField("Time", nil, timeVector),
-            data.NewField("Value", nil, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}),
-        ),
-    )
-}
-
-func vectorToDataFrames(vector model.Vector, query *PrometheusQuery, frames data.Frames) data.Frames {
-    for _, v := range vector {
-        name := formatLegend(v.Metric, query)
-        tags := make(map[string]string, len(v.Metric))
-        timeVector := []time.Time{v.Timestamp.Time().UTC()}
-        values := []float64{float64(v.Value)}
-
-        for k, v := range v.Metric {
-            tags[string(k)] = string(v)
-        }
-
-        frames = append(
-            frames,
-            newDataFrame(
-                name,
-                "vector",
-                data.NewField("Time", nil, timeVector),
-                data.NewField("Value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}),
-            ),
-        )
-    }
-
-    return frames
-}
-
-// normalizeExemplars transforms the exemplar results into a single list of events. At the same time we make sure
-// that all exemplar events have the same labels, which is important when converting to data frames so that each
-// field has the same length (each label will be a separate field). Exemplars can have different labels either
-// because the exemplar events have different labels or because they are from different series.
-// The reason we merge exemplars into a single list even if they are from different series is that, for example, in
-// the case of a histogram query like histogram_quantile(0.99, sum(rate(traces_spanmetrics_duration_seconds_bucket[15s])) by (le)),
-// Prometheus still returns all the exemplars for all the series of the metric traces_spanmetrics_duration_seconds_bucket.
-// This makes sense because each histogram bucket is a separate series, but we still want to show all the exemplars for
-// the metric, and we don't specifically care which buckets they are from.
-// For non-histogram queries, or if you split by some label, it would probably be nicer to also split the exemplars into
-// multiple frames (so they get different symbols in the UI), but that would require understanding the query, so it
-// is not implemented now.
-func normalizeExemplars(response []apiv1.ExemplarQueryResult) []ExemplarEvent {
-    // TODO: this preallocation is very naive.
-    // We should figure out a better approximation here.
-    events := make([]ExemplarEvent, 0, len(response)*2)
-
-    // Get all the labels across all exemplars, both from the exemplars and their series labels. We will use this to make
-    // sure the resulting data frame has a consistent number of values in each column.
-    eventLabels := make(map[string]struct{})
-    for _, exemplarData := range response {
-        // Check each exemplar's labels, as there isn't a guarantee they are consistent
-        for _, exemplar := range exemplarData.Exemplars {
-            for label := range exemplar.Labels {
-                eventLabels[string(label)] = struct{}{}
-            }
-        }
-
-        for label := range exemplarData.SeriesLabels {
-            eventLabels[string(label)] = struct{}{}
-        }
-    }
-
-    for _, exemplarData := range response {
-        for _, exemplar := range exemplarData.Exemplars {
-            event := ExemplarEvent{}
-            exemplarTime := exemplar.Timestamp.Time().UTC()
-            event.Time = exemplarTime
-            event.Value = float64(exemplar.Value)
-            event.Labels = make(map[string]string)
-
-            // Fill in all the labels from eventLabels with values from the exemplar labels or series labels, or fill
-            // with the empty string
-            for label := range eventLabels {
-                if _, ok := exemplar.Labels[model.LabelName(label)]; ok {
-                    event.Labels[label] = string(exemplar.Labels[model.LabelName(label)])
-                } else if _, ok := exemplarData.SeriesLabels[model.LabelName(label)]; ok {
-                    event.Labels[label] = string(exemplarData.SeriesLabels[model.LabelName(label)])
-                } else {
-                    event.Labels[label] = ""
-                }
-            }
-
-            events = append(events, event)
-        }
-    }
-    return events
-}
-
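To illustrate the label union (hypothetical data): if one exemplar carries only {traceID: "abc"} while its series labels are {le: "0.5"}, every resulting ExemplarEvent ends up with both keys -- here {traceID: "abc", le: "0.5"} -- and an exemplar missing traceID would get traceID set to the empty string, keeping every column the same length.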
-func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *PrometheusQuery, frames data.Frames) data.Frames {
-    events := normalizeExemplars(response)
-
-    // Sampling of exemplars
-    bucketedExemplars := make(map[string][]ExemplarEvent)
-    values := make([]float64, 0, len(events))
-
-    // Create bucketed exemplars based on the aligned timestamp
-    for _, event := range events {
-        alignedTs := fmt.Sprintf("%.0f", math.Floor(float64(event.Time.Unix())/query.Step.Seconds())*query.Step.Seconds())
-        _, ok := bucketedExemplars[alignedTs]
-        if !ok {
-            bucketedExemplars[alignedTs] = make([]ExemplarEvent, 0)
-        }
-
-        bucketedExemplars[alignedTs] = append(bucketedExemplars[alignedTs], event)
-        values = append(values, event.Value)
-    }
-
-    // Calculate the standard deviation
-    standardDeviation := deviation(values)
-
-    // Create a slice with all of the bucketed exemplars
-    sampledBuckets := make([]string, len(bucketedExemplars))
-    for bucketTimes := range bucketedExemplars {
-        sampledBuckets = append(sampledBuckets, bucketTimes)
-    }
-    sort.Strings(sampledBuckets)
-
-    // Sample exemplars based on value, so we are not showing too many of them
-    sampleExemplars := make([]ExemplarEvent, 0, len(sampledBuckets))
-    for _, bucket := range sampledBuckets {
-        exemplarsInBucket := bucketedExemplars[bucket]
-        if len(exemplarsInBucket) == 1 {
-            sampleExemplars = append(sampleExemplars, exemplarsInBucket[0])
-        } else {
-            bucketValues := make([]float64, len(exemplarsInBucket))
-            for _, exemplar := range exemplarsInBucket {
-                bucketValues = append(bucketValues, exemplar.Value)
-            }
-            sort.Slice(bucketValues, func(i, j int) bool {
-                return bucketValues[i] > bucketValues[j]
-            })
-
-            sampledBucketValues := make([]float64, 0)
-            for _, value := range bucketValues {
-                if len(sampledBucketValues) == 0 {
-                    sampledBucketValues = append(sampledBucketValues, value)
-                } else {
-                    // Then take values only when they are at least 2 standard deviations away from the previously taken value
-                    prev := sampledBucketValues[len(sampledBucketValues)-1]
-                    if standardDeviation != 0 && prev-value >= float64(2)*standardDeviation {
-                        sampledBucketValues = append(sampledBucketValues, value)
-                    }
-                }
-            }
-            for _, valueBucket := range sampledBucketValues {
-                for _, exemplar := range exemplarsInBucket {
-                    if exemplar.Value == valueBucket {
-                        sampleExemplars = append(sampleExemplars, exemplar)
-                    }
-                }
-            }
-        }
-    }
-
-    sort.SliceStable(sampleExemplars, func(i, j int) bool {
-        return sampleExemplars[i].Time.Before(sampleExemplars[j].Time)
-    })
-
-    // Create a data frame from the sampled exemplars
-    timeField := data.NewFieldFromFieldType(data.FieldTypeTime, len(sampleExemplars))
-    timeField.Name = "Time"
-    valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, len(sampleExemplars))
-    valueField.Name = "Value"
-    labelsVector := make(map[string][]string, len(sampleExemplars))
-
-    for i, exemplar := range sampleExemplars {
-        timeField.Set(i, exemplar.Time)
-        valueField.Set(i, exemplar.Value)
-
-        for label, value := range exemplar.Labels {
-            if labelsVector[label] == nil {
-                labelsVector[label] = make([]string, 0)
-            }
-
-            labelsVector[label] = append(labelsVector[label], value)
-        }
-    }
-
-    dataFields := make([]*data.Field, 0, len(labelsVector)+2)
-    dataFields = append(dataFields, timeField, valueField)
-
-    // Sort the labels/fields so that the order is consistent (mainly for easier testing)
-    allLabels := sortedLabels(labelsVector)
-    for _, label := range allLabels {
-        dataFields = append(dataFields, data.NewField(label, nil, labelsVector[label]))
-    }
-
-    newFrame := newDataFrame("exemplar", "exemplar", dataFields...)
-    // unset on exemplars (ugly, but this client will be deprecated soon)
-    newFrame.Meta.Type = ""
-
-    return append(frames, newFrame)
-}
-
-func sortedLabels(labelsVector map[string][]string) []string {
-    allLabels := make([]string, len(labelsVector))
-    i := 0
-    for key := range labelsVector {
-        allLabels[i] = key
-        i++
-    }
-    sort.Strings(allLabels)
-    return allLabels
-}
-
-func deviation(values []float64) float64 {
-    var sum, mean, sd float64
-    valuesLen := float64(len(values))
-    for _, value := range values {
-        sum += value
-    }
-    mean = sum / valuesLen
-    for j := 0; j < len(values); j++ {
-        sd += math.Pow(values[j]-mean, 2)
-    }
-    return math.Sqrt(sd / (valuesLen - 1))
-}
-
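deviation implements the sample standard deviation, s = sqrt( sum_i (x_i - mean)^2 / (n - 1) ); note that for a single value the divisor is zero, so it returns NaN.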
-func newDataFrame(name string, typ string, fields ...*data.Field) *data.Frame {
-    frame := data.NewFrame(name, fields...)
-    frame.Meta = &data.FrameMeta{
-        Type: data.FrameTypeTimeSeriesMulti,
-        Custom: map[string]string{
-            "resultType": typ, // Note: SSE depends on this property and map type
-        },
-    }
-
-    return frame
-}
-
-func alignTimeRange(t time.Time, step time.Duration, offset int64) time.Time {
-    offsetNano := float64(offset * 1e9)
-    stepNano := float64(step.Nanoseconds())
-    return time.Unix(0, int64(math.Floor((float64(t.UnixNano())+offsetNano)/stepNano)*stepNano-offsetNano))
-}
-
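alignTimeRange floors a timestamp to a multiple of the step after shifting by the UTC offset: with step = 30s and offset = 0, t = 100s since the epoch aligns to floor(100 / 30) * 30 = 90s.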
-func isVariableInterval(interval string) bool {
-    if interval == varInterval || interval == varIntervalMs || interval == varRateInterval {
-        return true
-    }
-    // Repetitive code, we should have functionality to unify these
-    if interval == varIntervalAlt || interval == varIntervalMsAlt || interval == varRateIntervalAlt {
-        return true
-    }
-    return false
-}
@ -1,965 +0,0 @@
|
|||||||
package buffered
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"math"
|
|
||||||
"net/http"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
||||||
sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
|
||||||
"github.com/grafana/grafana/pkg/infra/log/logtest"
|
|
||||||
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
|
|
||||||
apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
|
|
||||||
p "github.com/prometheus/common/model"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
var now = time.Now()
|
|
||||||
|
|
||||||
type FakeRoundTripper struct {
|
|
||||||
Req *http.Request
|
|
||||||
}
|
|
||||||
|
|
||||||
func (frt *FakeRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
||||||
frt.Req = req
|
|
||||||
return &http.Response{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func FakeMiddleware(rt *FakeRoundTripper) sdkhttpclient.Middleware {
|
|
||||||
return sdkhttpclient.NamedMiddlewareFunc("fake", func(opts sdkhttpclient.Options, next http.RoundTripper) http.RoundTripper {
|
|
||||||
return rt
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPrometheus_ExecuteTimeSeriesQuery(t *testing.T) {
    t.Run("adding req headers", func(t *testing.T) {
        // This makes sure we add req headers from the front end request to the request to prometheus. We do that
        // through contextual middleware so this setup is a bit complex and the test itself goes a bit too much into
        // internals.

        // This ends the trip and saves the request on the instance so we can inspect it.
        rt := &FakeRoundTripper{}
        // DefaultMiddlewares also contain contextual middleware which is the one we need to use.
        middlewares := sdkhttpclient.DefaultMiddlewares()
        middlewares = append(middlewares, FakeMiddleware(rt))

        // Setup http client in at least similar way to how grafana provides it to the service
        provider := sdkhttpclient.NewProvider(sdkhttpclient.ProviderOptions{Middlewares: sdkhttpclient.DefaultMiddlewares()})
        roundTripper, err := provider.GetTransport(sdkhttpclient.Options{
            Middlewares: middlewares,
        })
        require.NoError(t, err)

        buffered, err := New(roundTripper, nil, backend.DataSourceInstanceSettings{JSONData: []byte("{}")}, &logtest.Fake{})
        require.NoError(t, err)

        _, err = buffered.ExecuteTimeSeriesQuery(context.Background(), &backend.QueryDataRequest{
            PluginContext: backend.PluginContext{},
            // This header is dropped, as only FromAlert header will be added to outgoing requests
            Headers: map[string]string{"foo": "bar"},
            Queries: []backend.DataQuery{{
                JSON: []byte(`{"expr": "metric{label=\"test\"}", "rangeQuery": true}`),
            }},
        })
        require.NoError(t, err)
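        // The custom "foo" header from the frontend request must not reach Prometheus;
        // only headers set by the client itself remain on the outgoing request.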
        require.NotNil(t, rt.Req)
        require.Equal(t, http.Header{"Content-Type": []string{"application/x-www-form-urlencoded"}, "Idempotency-Key": []string(nil)}, rt.Req.Header)
    })
}

func TestPrometheus_timeSeriesQuery_formatLegend(t *testing.T) {
    t.Run("converting metric name", func(t *testing.T) {
        metric := map[p.LabelName]p.LabelValue{
            p.LabelName("app"):    p.LabelValue("backend"),
            p.LabelName("device"): p.LabelValue("mobile"),
        }

        query := &PrometheusQuery{
            LegendFormat: "legend {{app}} {{ device }} {{broken}}",
        }

        require.Equal(t, "legend backend mobile ", formatLegend(metric, query))
    })

    t.Run("build full series name", func(t *testing.T) {
        metric := map[p.LabelName]p.LabelValue{
            p.LabelName(p.MetricNameLabel): p.LabelValue("http_request_total"),
            p.LabelName("app"):             p.LabelValue("backend"),
            p.LabelName("device"):          p.LabelValue("mobile"),
        }

        query := &PrometheusQuery{
            LegendFormat: "",
        }

        require.Equal(t, `http_request_total{app="backend", device="mobile"}`, formatLegend(metric, query))
    })

    t.Run("use query expr when no labels", func(t *testing.T) {
        metric := map[p.LabelName]p.LabelValue{}

        query := &PrometheusQuery{
            LegendFormat: "",
            Expr:         `{job="grafana"}`,
        }

        require.Equal(t, `{job="grafana"}`, formatLegend(metric, query))
    })

    t.Run("When legendFormat = __auto and no labels", func(t *testing.T) {
        metric := map[p.LabelName]p.LabelValue{}

        query := &PrometheusQuery{
            LegendFormat: legendFormatAuto,
            Expr:         `{job="grafana"}`,
        }

        require.Equal(t, `{job="grafana"}`, formatLegend(metric, query))
    })

    t.Run("When legendFormat = __auto with labels", func(t *testing.T) {
        metric := map[p.LabelName]p.LabelValue{
            p.LabelName("app"): p.LabelValue("backend"),
        }

        query := &PrometheusQuery{
            LegendFormat: legendFormatAuto,
            Expr:         `{job="grafana"}`,
        }

        require.Equal(t, "", formatLegend(metric, query))
    })
}

func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
    service := Buffered{
        intervalCalculator: intervalv2.NewCalculator(),
    }

    t.Run("parsing query from unified alerting", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(12 * time.Hour),
        }

        queryJson := `{
            "expr": "go_goroutines",
            "refId": "A",
            "exemplar": true
        }`

        query := &backend.QueryDataRequest{
            Queries: []backend.DataQuery{
                {
                    JSON:      []byte(queryJson),
                    TimeRange: timeRange,
                    RefID:     "A",
                },
            },
            Headers: map[string]string{
                "FromAlert": "true",
            },
        }

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
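        // Exemplar queries are disabled when the request comes from the alerting engine.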
        require.Equal(t, false, models[0].ExemplarQuery)
    })

    t.Run("parsing query model with step", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(12 * time.Hour),
        }

        query := queryContext(`{
            "expr": "go_goroutines",
            "format": "time_series",
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, time.Second*30, models[0].Step)
    })

    t.Run("parsing query model without step parameter", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(1 * time.Hour),
        }

        query := queryContext(`{
            "expr": "go_goroutines",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, time.Second*15, models[0].Step)
    })

    t.Run("parsing query model with high intervalFactor", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(48 * time.Hour),
        }

        query := queryContext(`{
            "expr": "go_goroutines",
            "format": "time_series",
            "intervalFactor": 10,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, time.Minute*20, models[0].Step)
    })

    t.Run("parsing query model with low intervalFactor", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(48 * time.Hour),
        }

        query := queryContext(`{
            "expr": "go_goroutines",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, time.Minute*2, models[0].Step)
    })

    t.Run("parsing query model specified scrape-interval in the data source", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(48 * time.Hour),
        }

        query := queryContext(`{
            "expr": "go_goroutines",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "240s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, time.Minute*4, models[0].Step)
    })

    t.Run("parsing query model with $__interval variable", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(48 * time.Hour),
        }

        query := queryContext(`{
            "expr": "rate(ALERTS{job=\"test\" [$__interval]})",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, "rate(ALERTS{job=\"test\" [2m]})", models[0].Expr)
    })

    t.Run("parsing query model with ${__interval} variable", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(48 * time.Hour),
        }

        query := queryContext(`{
            "expr": "rate(ALERTS{job=\"test\" [${__interval}]})",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, "rate(ALERTS{job=\"test\" [2m]})", models[0].Expr)
    })

    t.Run("parsing query model with $__interval_ms variable", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(48 * time.Hour),
        }

        query := queryContext(`{
            "expr": "rate(ALERTS{job=\"test\" [$__interval_ms]})",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, "rate(ALERTS{job=\"test\" [120000]})", models[0].Expr)
    })

    t.Run("parsing query model with $__interval_ms and $__interval variable", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(48 * time.Hour),
        }

        query := queryContext(`{
            "expr": "rate(ALERTS{job=\"test\" [$__interval_ms]}) + rate(ALERTS{job=\"test\" [$__interval]})",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, "rate(ALERTS{job=\"test\" [120000]}) + rate(ALERTS{job=\"test\" [2m]})", models[0].Expr)
    })

    t.Run("parsing query model with ${__interval_ms} and ${__interval} variable", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(48 * time.Hour),
        }

        query := queryContext(`{
            "expr": "rate(ALERTS{job=\"test\" [${__interval_ms}]}) + rate(ALERTS{job=\"test\" [${__interval}]})",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, "rate(ALERTS{job=\"test\" [120000]}) + rate(ALERTS{job=\"test\" [2m]})", models[0].Expr)
    })

    t.Run("parsing query model with $__range variable", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(48 * time.Hour),
        }

        query := queryContext(`{
            "expr": "rate(ALERTS{job=\"test\" [$__range]})",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, "rate(ALERTS{job=\"test\" [172800s]})", models[0].Expr)
    })

    t.Run("parsing query model with $__range_s variable", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(48 * time.Hour),
        }

        query := queryContext(`{
            "expr": "rate(ALERTS{job=\"test\" [$__range_s]})",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, "rate(ALERTS{job=\"test\" [172800]})", models[0].Expr)
    })

    t.Run("parsing query model with ${__range_s} variable", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(48 * time.Hour),
        }

        query := queryContext(`{
            "expr": "rate(ALERTS{job=\"test\" [${__range_s}s]})",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, "rate(ALERTS{job=\"test\" [172800s]})", models[0].Expr)
    })

    t.Run("parsing query model with $__range_s variable below 0.5s", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(40 * time.Millisecond),
        }

        query := queryContext(`{
            "expr": "rate(ALERTS{job=\"test\" [$__range_s]})",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, "rate(ALERTS{job=\"test\" [0]})", models[0].Expr)
    })

    t.Run("parsing query model with $__range_s variable between 1-0.5s", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(800 * time.Millisecond),
        }

        query := queryContext(`{
            "expr": "rate(ALERTS{job=\"test\" [$__range_s]})",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, "rate(ALERTS{job=\"test\" [1]})", models[0].Expr)
    })

    t.Run("parsing query model with $__range_ms variable", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(48 * time.Hour),
        }

        query := queryContext(`{
            "expr": "rate(ALERTS{job=\"test\" [$__range_ms]})",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, "rate(ALERTS{job=\"test\" [172800000]})", models[0].Expr)
    })

    t.Run("parsing query model with $__range_ms variable below 1s", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(20 * time.Millisecond),
        }

        query := queryContext(`{
            "expr": "rate(ALERTS{job=\"test\" [$__range_ms]})",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, "rate(ALERTS{job=\"test\" [20]})", models[0].Expr)
    })

    t.Run("parsing query model with $__rate_interval variable", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(5 * time.Minute),
        }

        query := queryContext(`{
            "expr": "rate(ALERTS{job=\"test\" [$__rate_interval]})",
            "format": "time_series",
            "intervalFactor": 1,
            "interval": "5m",
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, "rate(ALERTS{job=\"test\" [5m15s]})", models[0].Expr)
    })

    t.Run("parsing query model with $__rate_interval variable in expr and interval", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(5 * time.Minute),
        }

        query := queryContext(`{
            "expr": "rate(ALERTS{job=\"test\" [$__rate_interval]})",
            "format": "time_series",
            "intervalFactor": 1,
            "interval": "$__rate_interval",
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, "rate(ALERTS{job=\"test\" [1m0s]})", models[0].Expr)
        require.Equal(t, 1*time.Minute, models[0].Step)
    })

    t.Run("parsing query model of range query", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(48 * time.Hour),
        }

        query := queryContext(`{
            "expr": "go_goroutines",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A",
            "range": true
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, true, models[0].RangeQuery)
    })

    t.Run("parsing query model of range and instant query", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(48 * time.Hour),
        }

        query := queryContext(`{
            "expr": "go_goroutines",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A",
            "range": true,
            "instant": true
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, true, models[0].RangeQuery)
        require.Equal(t, true, models[0].InstantQuery)
    })

    t.Run("parsing query model of with no query type", func(t *testing.T) {
        timeRange := backend.TimeRange{
            From: now,
            To:   now.Add(48 * time.Hour),
        }

        query := queryContext(`{
            "expr": "go_goroutines",
            "format": "time_series",
            "intervalFactor": 1,
            "refId": "A"
        }`, timeRange)

        service.TimeInterval = "15s"
        models, err := service.parseTimeSeriesQuery(query)
        require.NoError(t, err)
        require.Equal(t, true, models[0].RangeQuery)
    })
}

func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
    t.Run("exemplars response should be sampled and parsed normally", func(t *testing.T) {
        value := make(map[TimeSeriesQueryType]bufferedResponse)
        exemplars := []apiv1.ExemplarQueryResult{
            {
                SeriesLabels: p.LabelSet{
                    "__name__": "tns_request_duration_seconds_bucket",
                    "instance": "app:80",
                    "job":      "tns/app",
                },
                Exemplars: []apiv1.Exemplar{
                    {
                        Labels:    p.LabelSet{"traceID": "test1"},
                        Value:     0.003535405,
                        Timestamp: p.TimeFromUnixNano(time.Now().Add(-2 * time.Minute).UnixNano()),
                    },
                    {
                        Labels:    p.LabelSet{"traceID": "test2"},
                        Value:     0.005555605,
                        Timestamp: p.TimeFromUnixNano(time.Now().Add(-4 * time.Minute).UnixNano()),
                    },
                    {
                        Labels:    p.LabelSet{"traceID": "test3"},
                        Value:     0.007545445,
                        Timestamp: p.TimeFromUnixNano(time.Now().Add(-6 * time.Minute).UnixNano()),
                    },
                    {
                        Labels:    p.LabelSet{"traceID": "test4"},
                        Value:     0.009545445,
                        Timestamp: p.TimeFromUnixNano(time.Now().Add(-7 * time.Minute).UnixNano()),
                    },
                },
            },
        }

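        // Responses from the buffered client are keyed by query type, so wrap
        // the exemplars in a bufferedResponse under ExemplarQueryType.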
        value[ExemplarQueryType] = bufferedResponse{
            Response: exemplars,
            Warnings: nil,
        }
        query := &PrometheusQuery{
            LegendFormat: "legend {{app}}",
        }
        res, err := parseTimeSeriesResponse(value, query)
        require.NoError(t, err)

        // Test fields
        require.Len(t, res, 1)
        require.Equal(t, res[0].Name, "exemplar")
        require.Equal(t, res[0].Fields[0].Name, "Time")
        require.Equal(t, res[0].Fields[1].Name, "Value")
        require.Len(t, res[0].Fields, 6)

        // Test correct values (sampled to 2)
        require.Equal(t, res[0].Fields[1].Len(), 2)
        require.Equal(t, res[0].Fields[1].At(0), 0.009545445)
        require.Equal(t, res[0].Fields[1].At(1), 0.003535405)
    })

    t.Run("exemplars response with inconsistent labels should marshal json ok", func(t *testing.T) {
        value := make(map[TimeSeriesQueryType]bufferedResponse)
        exemplars := []apiv1.ExemplarQueryResult{
            {
                SeriesLabels: p.LabelSet{
                    "__name__": "tns_request_duration_seconds_bucket",
                    "instance": "app:80",
                    "service":  "example",
                },
                Exemplars: []apiv1.Exemplar{
                    {
                        Labels:    p.LabelSet{"traceID": "test1"},
                        Value:     0.003535405,
                        Timestamp: 1,
                    },
                },
            },
            {
                SeriesLabels: p.LabelSet{
                    "__name__":         "tns_request_duration_seconds_bucket",
                    "instance":         "app:80",
                    "service":          "example2",
                    "additional_label": "foo",
                },
                Exemplars: []apiv1.Exemplar{
                    {
                        Labels:    p.LabelSet{"traceID": "test2", "userID": "test3"},
                        Value:     0.003535405,
                        Timestamp: 10,
                    },
                },
            },
        }

        value[ExemplarQueryType] = bufferedResponse{
            Response: exemplars,
            Warnings: nil,
        }
        query := &PrometheusQuery{
            LegendFormat: "legend {{app}}",
        }
        res, err := parseTimeSeriesResponse(value, query)
        require.NoError(t, err)

        // Test frame marshal json no error.
        _, err = res[0].MarshalJSON()
        require.NoError(t, err)

        fields := []*data.Field{
            data.NewField("Time", map[string]string{}, []time.Time{time.UnixMilli(1), time.UnixMilli(10)}),
            data.NewField("Value", map[string]string{}, []float64{0.003535405, 0.003535405}),
            data.NewField("__name__", map[string]string{}, []string{"tns_request_duration_seconds_bucket", "tns_request_duration_seconds_bucket"}),
            data.NewField("additional_label", map[string]string{}, []string{"", "foo"}),
            data.NewField("instance", map[string]string{}, []string{"app:80", "app:80"}),
            data.NewField("service", map[string]string{}, []string{"example", "example2"}),
            data.NewField("traceID", map[string]string{}, []string{"test1", "test2"}),
            data.NewField("userID", map[string]string{}, []string{"", "test3"}),
        }

        newFrame := newDataFrame("exemplar", "exemplar", fields...)
        newFrame.Meta.Type = ""

        if diff := cmp.Diff(newFrame, res[0], data.FrameTestCompareOptions()...); diff != "" {
            t.Errorf("Result mismatch (-want +got):\n%s", diff)
        }
    })

    t.Run("matrix response should be parsed normally", func(t *testing.T) {
        values := []p.SamplePair{
            {Value: 1, Timestamp: 1000},
            {Value: 2, Timestamp: 2000},
            {Value: 3, Timestamp: 3000},
            {Value: 4, Timestamp: 4000},
            {Value: 5, Timestamp: 5000},
        }
        value := make(map[TimeSeriesQueryType]bufferedResponse)
        value[RangeQueryType] = bufferedResponse{
            Response: p.Matrix{
                &p.SampleStream{
                    Metric: p.Metric{"app": "Application", "tag2": "tag2"},
                    Values: values,
                },
            },
            Warnings: nil,
        }
        query := &PrometheusQuery{
            LegendFormat: "legend {{app}}",
            Step:         1 * time.Second,
            Start:        time.Unix(1, 0).UTC(),
            End:          time.Unix(5, 0).UTC(),
            UtcOffsetSec: 0,
        }
        res, err := parseTimeSeriesResponse(value, query)
        require.NoError(t, err)

        require.Len(t, res, 1)
        require.Equal(t, res[0].Name, "legend Application")
        require.Len(t, res[0].Fields, 2)
        require.Len(t, res[0].Fields[0].Labels, 0)
        require.Equal(t, res[0].Fields[0].Name, "Time")
        require.Len(t, res[0].Fields[1].Labels, 2)
        require.Equal(t, res[0].Fields[1].Labels.String(), "app=Application, tag2=tag2")
        require.Equal(t, res[0].Fields[1].Name, "Value")
        require.Equal(t, res[0].Fields[1].Config.DisplayNameFromDS, "legend Application")

        // Ensure the timestamps are UTC zoned
        testValue := res[0].Fields[0].At(0)
        require.Equal(t, "UTC", testValue.(time.Time).Location().String())
    })

    t.Run("matrix response with missed data points should be parsed correctly", func(t *testing.T) {
        values := []p.SamplePair{
            {Value: 1, Timestamp: 1000},
            {Value: 4, Timestamp: 4000},
        }
        value := make(map[TimeSeriesQueryType]bufferedResponse)
        value[RangeQueryType] = bufferedResponse{
            Response: p.Matrix{
                &p.SampleStream{
                    Metric: p.Metric{"app": "Application", "tag2": "tag2"},
                    Values: values,
                },
            },
            Warnings: nil,
        }
        query := &PrometheusQuery{
            LegendFormat: "",
            Step:         1 * time.Second,
            Start:        time.Unix(1, 0).UTC(),
            End:          time.Unix(4, 0).UTC(),
            UtcOffsetSec: 0,
        }
        res, err := parseTimeSeriesResponse(value, query)

        require.NoError(t, err)
        require.Len(t, res, 1)
        require.Equal(t, res[0].Fields[0].Len(), 2)
        require.Equal(t, time.Unix(1, 0).UTC(), res[0].Fields[0].At(0))
        require.Equal(t, time.Unix(4, 0).UTC(), res[0].Fields[0].At(1))
        require.Equal(t, res[0].Fields[1].Len(), 2)
        require.Equal(t, float64(1), res[0].Fields[1].At(0).(float64))
        require.Equal(t, float64(4), res[0].Fields[1].At(1).(float64))
    })

    t.Run("matrix response with from alerting missed data points should be parsed correctly", func(t *testing.T) {
        values := []p.SamplePair{
            {Value: 1, Timestamp: 1000},
            {Value: 4, Timestamp: 4000},
        }
        value := make(map[TimeSeriesQueryType]bufferedResponse)
        value[RangeQueryType] = bufferedResponse{
            Response: p.Matrix{
                &p.SampleStream{
                    Metric: p.Metric{"app": "Application", "tag2": "tag2"},
                    Values: values,
                },
            },
            Warnings: nil,
        }
        query := &PrometheusQuery{
            LegendFormat: "",
            Step:         1 * time.Second,
            Start:        time.Unix(1, 0).UTC(),
            End:          time.Unix(4, 0).UTC(),
            UtcOffsetSec: 0,
        }
        res, err := parseTimeSeriesResponse(value, query)

        require.NoError(t, err)
        require.Len(t, res, 1)
        require.Equal(t, res[0].Name, "{app=\"Application\", tag2=\"tag2\"}")
        require.Len(t, res[0].Fields, 2)
        require.Len(t, res[0].Fields[0].Labels, 0)
        require.Equal(t, res[0].Fields[0].Name, "Time")
        require.Len(t, res[0].Fields[1].Labels, 2)
        require.Equal(t, res[0].Fields[1].Labels.String(), "app=Application, tag2=tag2")
        require.Equal(t, res[0].Fields[1].Name, "Value")
        require.Equal(t, res[0].Fields[1].Config.DisplayNameFromDS, "{app=\"Application\", tag2=\"tag2\"}")
    })

    t.Run("matrix response with NaN value should be changed to null", func(t *testing.T) {
        value := make(map[TimeSeriesQueryType]bufferedResponse)
        value[RangeQueryType] = bufferedResponse{
            Response: p.Matrix{
                &p.SampleStream{
                    Metric: p.Metric{"app": "Application"},
                    Values: []p.SamplePair{
                        {Value: p.SampleValue(math.NaN()), Timestamp: 1000},
                    },
                },
            },
            Warnings: nil,
        }
        query := &PrometheusQuery{
            LegendFormat: "",
            Step:         1 * time.Second,
            Start:        time.Unix(1, 0).UTC(),
            End:          time.Unix(4, 0).UTC(),
            UtcOffsetSec: 0,
        }
        res, err := parseTimeSeriesResponse(value, query)
        require.NoError(t, err)

        require.Equal(t, "Value", res[0].Fields[1].Name)
        require.True(t, math.IsNaN(res[0].Fields[1].At(0).(float64)))
    })

    t.Run("vector response should be parsed normally", func(t *testing.T) {
        value := make(map[TimeSeriesQueryType]bufferedResponse)
        value[RangeQueryType] = bufferedResponse{
            Response: p.Vector{
                &p.Sample{
                    Metric:    p.Metric{"app": "Application", "tag2": "tag2"},
                    Value:     1,
                    Timestamp: 123,
                },
            },
            Warnings: nil,
        }
        query := &PrometheusQuery{
            LegendFormat: "legend {{app}}",
        }
        res, err := parseTimeSeriesResponse(value, query)
        require.NoError(t, err)

        require.Len(t, res, 1)
        require.Equal(t, res[0].Name, "legend Application")
        require.Len(t, res[0].Fields, 2)
        require.Len(t, res[0].Fields[0].Labels, 0)
        require.Equal(t, res[0].Fields[0].Name, "Time")
        require.Len(t, res[0].Fields[1].Labels, 2)
        require.Equal(t, res[0].Fields[1].Labels.String(), "app=Application, tag2=tag2")
        require.Equal(t, res[0].Fields[1].Name, "Value")
        require.Equal(t, res[0].Fields[1].Config.DisplayNameFromDS, "legend Application")

        // Ensure the timestamps are UTC zoned
        testValue := res[0].Fields[0].At(0)
        require.Equal(t, "UTC", testValue.(time.Time).Location().String())
        require.Equal(t, int64(123), testValue.(time.Time).UnixMilli())
    })

    t.Run("scalar response should be parsed normally", func(t *testing.T) {
        value := make(map[TimeSeriesQueryType]bufferedResponse)
        value[RangeQueryType] = bufferedResponse{
            Response: &p.Scalar{
                Value:     1,
                Timestamp: 123,
            },
            Warnings: nil,
        }

        query := &PrometheusQuery{}
        res, err := parseTimeSeriesResponse(value, query)
        require.NoError(t, err)

        require.Len(t, res, 1)
        require.Equal(t, res[0].Name, "1")
        require.Len(t, res[0].Fields, 2)
        require.Len(t, res[0].Fields[0].Labels, 0)
        require.Equal(t, res[0].Fields[0].Name, "Time")
        require.Equal(t, res[0].Fields[1].Name, "Value")
        require.Equal(t, res[0].Fields[1].Config.DisplayNameFromDS, "1")

        // Ensure the timestamps are UTC zoned
        testValue := res[0].Fields[0].At(0)
        require.Equal(t, "UTC", testValue.(time.Time).Location().String())
        require.Equal(t, int64(123), testValue.(time.Time).UnixMilli())
    })

    t.Run("warnings, if there is any, should be added to each frame",
        func(t *testing.T) {
            value := make(map[TimeSeriesQueryType]bufferedResponse)
            value[RangeQueryType] = bufferedResponse{
                Response: &p.Scalar{
                    Value:     1,
                    Timestamp: 123,
                },
                Warnings: []string{"warning1", "warning2"},
            }

            query := &PrometheusQuery{}
            res, err := parseTimeSeriesResponse(value, query)
            require.NoError(t, err)

            require.Len(t, res, 1)
            require.Equal(t, res[0].Name, "1")
            require.Len(t, res[0].Fields, 2)
            require.Len(t, res[0].Fields[0].Labels, 0)
            require.Equal(t, res[0].Fields[0].Name, "Time")
            require.Equal(t, res[0].Fields[1].Name, "Value")
            require.Equal(t, res[0].Fields[1].Config.DisplayNameFromDS, "1")

            require.Equal(t, res[0].Meta.Notices[0].Text, "warning1")
            require.Equal(t, res[0].Meta.Notices[1].Text, "warning2")
        })
}

func queryContext(json string, timeRange backend.TimeRange) *backend.QueryDataRequest {
    return &backend.QueryDataRequest{
        Queries: []backend.DataQuery{
            {
                JSON:      []byte(json),
                TimeRange: timeRange,
                RefID:     "A",
            },
        },
    }
}
@ -1,45 +0,0 @@
package buffered

import (
    "time"
)

type PrometheusQuery struct {
    Expr          string
    Step          time.Duration
    LegendFormat  string
    Start         time.Time
    End           time.Time
    RefId         string
    InstantQuery  bool
    RangeQuery    bool
    ExemplarQuery bool
    UtcOffsetSec  int64
}

type ExemplarEvent struct {
    Time   time.Time
    Value  float64
    Labels map[string]string
}

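// QueryModel mirrors the JSON payload the frontend sends for each query.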
type QueryModel struct {
    Expr           string `json:"expr"`
    LegendFormat   string `json:"legendFormat"`
    Interval       string `json:"interval"`
    IntervalMS     int64  `json:"intervalMS"`
    StepMode       string `json:"stepMode"`
    RangeQuery     bool   `json:"range"`
    InstantQuery   bool   `json:"instant"`
    ExemplarQuery  bool   `json:"exemplar"`
    IntervalFactor int64  `json:"intervalFactor"`
    UtcOffsetSec   int64  `json:"utcOffsetSec"`
}

type TimeSeriesQueryType string

const (
    RangeQueryType    TimeSeriesQueryType = "range"
    InstantQueryType  TimeSeriesQueryType = "instant"
    ExemplarQueryType TimeSeriesQueryType = "exemplar"
)
@ -2,7 +2,6 @@ package client

import (
    "fmt"
    "net/http"
    "strings"

    "github.com/grafana/grafana-plugin-sdk-go/backend"
@ -13,8 +12,6 @@ import (
    "github.com/grafana/grafana/pkg/tsdb/prometheus/middleware"
    "github.com/grafana/grafana/pkg/tsdb/prometheus/utils"
    "github.com/grafana/grafana/pkg/util/maputil"
    "github.com/prometheus/client_golang/api"
    apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
)

// CreateTransportOptions creates options for the http client. Probably should be shared and should not live in the
@ -49,20 +46,6 @@ func CreateTransportOptions(settings backend.DataSourceInstanceSettings, cfg *se
    return &opts, nil
}

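// CreateAPIClient builds the upstream prometheus/client_golang API client on
// top of the shared round tripper.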
func CreateAPIClient(roundTripper http.RoundTripper, url string) (apiv1.API, error) {
    cfg := api.Config{
        Address:      url,
        RoundTripper: roundTripper,
    }

    client, err := api.NewClient(cfg)
    if err != nil {
        return nil, err
    }

    return apiv1.NewAPI(client), nil
}

func middlewares(logger log.Logger, httpMethod string) []sdkhttpclient.Middleware {
    middlewares := []sdkhttpclient.Middleware{
        // TODO: probably isn't needed anymore and should be done by http infra code
@ -1,24 +0,0 @@
package middleware

import (
    "net/http"

    sdkHTTPClient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
)

// ReqHeadersMiddleware is used so that we can pass req headers through the prometheus go client as it does not allow
// access to the request directly. Should be used together with WithContextualMiddleware so that it is attached to
// the context of each request with its unique headers.
func ReqHeadersMiddleware(headers map[string]string) sdkHTTPClient.Middleware {
    return sdkHTTPClient.NamedMiddlewareFunc("prometheus-req-headers-middleware", func(opts sdkHTTPClient.Options, next http.RoundTripper) http.RoundTripper {
        return sdkHTTPClient.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
            for k, v := range headers {
                // As custom headers middleware is before contextual we may overwrite custom headers here with those
                // that came with the request which probably makes sense.
                req.Header[k] = []string{v}
            }

            return next.RoundTrip(req)
        })
    })
}
@ -15,7 +15,6 @@ import (
    "github.com/grafana/grafana/pkg/infra/tracing"
    "github.com/grafana/grafana/pkg/services/featuremgmt"
    "github.com/grafana/grafana/pkg/setting"
    "github.com/grafana/grafana/pkg/tsdb/prometheus/buffered"
    "github.com/grafana/grafana/pkg/tsdb/prometheus/client"
    "github.com/grafana/grafana/pkg/tsdb/prometheus/querydata"
    "github.com/grafana/grafana/pkg/tsdb/prometheus/resource"
@ -31,7 +30,6 @@ type Service struct {
}

type instance struct {
    buffered     *buffered.Buffered
    queryData    *querydata.QueryData
    resource     *resource.Resource
    versionCache *cache.Cache
@ -47,7 +45,7 @@ func ProvideService(httpClientProvider httpclient.Provider, cfg *setting.Cfg, fe

func newInstanceSettings(httpClientProvider httpclient.Provider, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) datasource.InstanceFactoryFunc {
    return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
        // Creates a http roundTripper. Probably should be used for both buffered and streaming/querydata instances.
        // Creates a http roundTripper.
        opts, err := client.CreateTransportOptions(settings, cfg, plog)
        if err != nil {
            return nil, fmt.Errorf("error creating transport options: %v", err)
@ -56,11 +54,6 @@ func newInstanceSettings(httpClientProvider httpclient.Provider, cfg *setting.Cf
        if err != nil {
            return nil, fmt.Errorf("error creating http client: %v", err)
        }
        // Older version using standard Go Prometheus client
        b, err := buffered.New(httpClient.Transport, tracer, settings, plog)
        if err != nil {
            return nil, err
        }

        // New version using custom client and better response parsing
        qd, err := querydata.New(httpClient, features, tracer, settings, plog)
@ -75,7 +68,6 @@ func newInstanceSettings(httpClientProvider httpclient.Provider, cfg *setting.Cf
        }

        return instance{
            buffered:     b,
            queryData:    qd,
            resource:     r,
            versionCache: cache.New(time.Minute*1, time.Minute*5),
@ -93,10 +85,6 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
        return nil, err
    }

    if s.features.IsEnabled(featuremgmt.FlagPrometheusBufferedClient) {
        return i.buffered.ExecuteTimeSeriesQuery(ctx, req)
    }

    return i.queryData.Execute(ctx, req)
}