Prometheus: Move existing query logic to new buffered package (#48668)

This commit is contained in:
Todd Treece 2022-05-12 05:34:09 -04:00 committed by GitHub
parent 44e7602ad2
commit e1a9ce4cc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 166 additions and 140 deletions

View File

@ -1,4 +1,4 @@
package prometheus
package buffered
import (
"bytes"
@ -15,7 +15,9 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/experimental"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
"github.com/prometheus/client_golang/api"
apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
)
@ -33,9 +35,9 @@ func TestMatrixResponses(t *testing.T) {
for _, test := range tt {
t.Run(test.name, func(t *testing.T) {
queryFileName := filepath.Join("testdata", test.filepath+".query.json")
responseFileName := filepath.Join("testdata", test.filepath+".result.json")
goldenFileName := filepath.Join("testdata", test.filepath+".result.golden.txt")
queryFileName := filepath.Join("../testdata", test.filepath+".query.json")
responseFileName := filepath.Join("../testdata", test.filepath+".result.json")
goldenFileName := filepath.Join("../testdata", test.filepath+".result.golden.txt")
query, err := loadStoredPrometheusQuery(queryFileName)
require.NoError(t, err)
@ -131,6 +133,20 @@ func runQuery(response []byte, query PrometheusQuery) (*backend.QueryDataRespons
return nil, err
}
s := Service{tracer: tracer}
s := Buffered{
intervalCalculator: intervalv2.NewCalculator(),
tracer: tracer,
TimeInterval: "15s",
log: &fakeLogger{},
}
return s.runQueries(context.Background(), api, []*PrometheusQuery{&query})
}
type fakeLogger struct {
log.Logger
}
func (fl *fakeLogger) Debug(testMessage string, ctx ...interface{}) {}
func (fl *fakeLogger) Info(testMessage string, ctx ...interface{}) {}
func (fl *fakeLogger) Warn(testMessage string, ctx ...interface{}) {}
func (fl *fakeLogger) Error(testMessage string, ctx ...interface{}) {}

View File

@ -7,7 +7,7 @@ import (
"strings"
"testing"
"github.com/grafana/grafana/pkg/tsdb/prometheus/promclient"
"github.com/grafana/grafana/pkg/tsdb/prometheus/buffered/promclient"
apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"

View File

@ -5,7 +5,7 @@ import (
"net/http"
"testing"
"github.com/grafana/grafana/pkg/tsdb/prometheus/promclient"
"github.com/grafana/grafana/pkg/tsdb/prometheus/buffered/promclient"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"

View File

@ -1,4 +1,4 @@
package prometheus
package buffered
import (
"context"
@ -24,7 +24,7 @@ func BenchmarkJson(b *testing.B) {
tracer, err := tracing.InitializeTracerForTest()
require.NoError(b, err)
s := Service{tracer: tracer}
s := Buffered{tracer: tracer, log: &fakeLogger{}}
b.ResetTimer()
for n := 0; n < b.N; n++ {

View File

@ -1,10 +1,11 @@
package prometheus
package buffered
import (
"context"
"encoding/json"
"fmt"
"math"
"regexp"
"sort"
"strconv"
"strings"
@ -12,7 +13,14 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
"github.com/grafana/grafana/pkg/tsdb/prometheus/buffered/promclient"
"github.com/grafana/grafana/pkg/util/maputil"
apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"go.opentelemetry.io/otel/attribute"
@ -41,23 +49,57 @@ const (
const legendFormatAuto = "__auto"
type TimeSeriesQueryType string
const (
RangeQueryType TimeSeriesQueryType = "range"
InstantQueryType TimeSeriesQueryType = "instant"
ExemplarQueryType TimeSeriesQueryType = "exemplar"
var (
legendFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
safeRes = 11000
)
func (s *Service) runQueries(ctx context.Context, client apiv1.API, queries []*PrometheusQuery) (*backend.QueryDataResponse, error) {
type Buffered struct {
intervalCalculator intervalv2.Calculator
tracer tracing.Tracer
getClient clientGetter
log log.Logger
ID int64
URL string
TimeInterval string
}
func New(httpClientProvider httpclient.Provider, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer, settings backend.DataSourceInstanceSettings, plog log.Logger) (*Buffered, error) {
var jsonData map[string]interface{}
if err := json.Unmarshal(settings.JSONData, &jsonData); err != nil {
return nil, fmt.Errorf("error reading settings: %w", err)
}
timeInterval, err := maputil.GetStringOptional(jsonData, "timeInterval")
if err != nil {
return nil, err
}
p := promclient.NewProvider(settings, jsonData, httpClientProvider, cfg, features, plog)
pc, err := promclient.NewProviderCache(p)
if err != nil {
return nil, err
}
return &Buffered{
intervalCalculator: intervalv2.NewCalculator(),
tracer: tracer,
log: plog,
getClient: pc.GetClient,
TimeInterval: timeInterval,
ID: settings.ID,
URL: settings.URL,
}, nil
}
func (b *Buffered) runQueries(ctx context.Context, client apiv1.API, queries []*PrometheusQuery) (*backend.QueryDataResponse, error) {
result := backend.QueryDataResponse{
Responses: backend.Responses{},
}
for _, query := range queries {
plog.Debug("Sending query", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr)
b.log.Debug("Sending query", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr)
ctx, span := s.tracer.Start(ctx, "datasource.prometheus")
ctx, span := b.tracer.Start(ctx, "datasource.prometheus")
span.SetAttributes("expr", query.Expr, attribute.Key("expr").String(query.Expr))
span.SetAttributes("start_unixnano", query.Start, attribute.Key("start_unixnano").Int64(query.Start.UnixNano()))
span.SetAttributes("stop_unixnano", query.End, attribute.Key("stop_unixnano").Int64(query.End.UnixNano()))
@ -75,7 +117,7 @@ func (s *Service) runQueries(ctx context.Context, client apiv1.API, queries []*P
if query.RangeQuery {
rangeResponse, _, err := client.QueryRange(ctx, query.Expr, timeRange)
if err != nil {
plog.Error("Range query failed", "query", query.Expr, "err", err)
b.log.Error("Range query failed", "query", query.Expr, "err", err)
result.Responses[query.RefId] = backend.DataResponse{Error: err}
continue
}
@ -85,7 +127,7 @@ func (s *Service) runQueries(ctx context.Context, client apiv1.API, queries []*P
if query.InstantQuery {
instantResponse, _, err := client.Query(ctx, query.Expr, query.End)
if err != nil {
plog.Error("Instant query failed", "query", query.Expr, "err", err)
b.log.Error("Instant query failed", "query", query.Expr, "err", err)
result.Responses[query.RefId] = backend.DataResponse{Error: err}
continue
}
@ -97,7 +139,7 @@ func (s *Service) runQueries(ctx context.Context, client apiv1.API, queries []*P
if query.ExemplarQuery {
exemplarResponse, err := client.QueryExemplars(ctx, query.Expr, timeRange.Start, timeRange.End)
if err != nil {
plog.Error("Exemplar query failed", "query", query.Expr, "err", err)
b.log.Error("Exemplar query failed", "query", query.Expr, "err", err)
} else {
response[ExemplarQueryType] = exemplarResponse
}
@ -121,13 +163,13 @@ func (s *Service) runQueries(ctx context.Context, client apiv1.API, queries []*P
return &result, nil
}
func (s *Service) executeTimeSeriesQuery(ctx context.Context, req *backend.QueryDataRequest, dsInfo *DatasourceInfo) (*backend.QueryDataResponse, error) {
client, err := dsInfo.getClient(req.Headers)
func (b *Buffered) ExecuteTimeSeriesQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
client, err := b.getClient(req.Headers)
if err != nil {
return nil, err
}
queries, err := s.parseTimeSeriesQuery(req, dsInfo)
queries, err := b.parseTimeSeriesQuery(req)
if err != nil {
result := backend.QueryDataResponse{
Responses: backend.Responses{},
@ -135,7 +177,7 @@ func (s *Service) executeTimeSeriesQuery(ctx context.Context, req *backend.Query
return &result, err
}
return s.runQueries(ctx, client, queries)
return b.runQueries(ctx, client, queries)
}
func formatLegend(metric model.Metric, query *PrometheusQuery) string {
@ -167,7 +209,7 @@ func formatLegend(metric model.Metric, query *PrometheusQuery) string {
return legend
}
func (s *Service) parseTimeSeriesQuery(queryContext *backend.QueryDataRequest, dsInfo *DatasourceInfo) ([]*PrometheusQuery, error) {
func (b *Buffered) parseTimeSeriesQuery(queryContext *backend.QueryDataRequest) ([]*PrometheusQuery, error) {
qs := []*PrometheusQuery{}
for _, query := range queryContext.Queries {
model := &QueryModel{}
@ -176,14 +218,14 @@ func (s *Service) parseTimeSeriesQuery(queryContext *backend.QueryDataRequest, d
return nil, err
}
//Final interval value
interval, err := calculatePrometheusInterval(model, dsInfo, query, s.intervalCalculator)
interval, err := calculatePrometheusInterval(model, b.TimeInterval, query, b.intervalCalculator)
if err != nil {
return nil, err
}
// Interpolate variables in expr
timeRange := query.TimeRange.To.Sub(query.TimeRange.From)
expr := interpolateVariables(model, interval, timeRange, s.intervalCalculator, dsInfo.TimeInterval)
expr := interpolateVariables(model, interval, timeRange, b.intervalCalculator, b.TimeInterval)
rangeQuery := model.RangeQuery
if !model.InstantQuery && !model.RangeQuery {
// In older dashboards, we were not setting range query param and !range && !instant was run as range query
@ -232,8 +274,7 @@ func parseTimeSeriesResponse(value map[TimeSeriesQueryType]interface{}, query *P
case []apiv1.ExemplarQueryResult:
nextFrames = exemplarToDataFrames(v, query, nextFrames)
default:
plog.Error("Query returned unexpected result type", "type", v, "query", query.Expr)
continue
return nil, fmt.Errorf("unexpected result type: %s query: %s", v, query.Expr)
}
frames = append(frames, nextFrames...)
@ -242,7 +283,7 @@ func parseTimeSeriesResponse(value map[TimeSeriesQueryType]interface{}, query *P
return frames, nil
}
func calculatePrometheusInterval(model *QueryModel, dsInfo *DatasourceInfo, query backend.DataQuery, intervalCalculator intervalv2.Calculator) (time.Duration, error) {
func calculatePrometheusInterval(model *QueryModel, timeInterval string, query backend.DataQuery, intervalCalculator intervalv2.Calculator) (time.Duration, error) {
queryInterval := model.Interval
//If we are using variable for interval/step, we will replace it with calculated interval
@ -250,7 +291,7 @@ func calculatePrometheusInterval(model *QueryModel, dsInfo *DatasourceInfo, quer
queryInterval = ""
}
minInterval, err := intervalv2.GetIntervalFrom(dsInfo.TimeInterval, queryInterval, model.IntervalMS, 15*time.Second)
minInterval, err := intervalv2.GetIntervalFrom(timeInterval, queryInterval, model.IntervalMS, 15*time.Second)
if err != nil {
return time.Duration(0), err
}
@ -264,7 +305,7 @@ func calculatePrometheusInterval(model *QueryModel, dsInfo *DatasourceInfo, quer
if model.Interval == varRateInterval || model.Interval == varRateIntervalAlt {
// Rate interval is final and is not affected by resolution
return calculateRateInterval(adjustedInterval, dsInfo.TimeInterval, intervalCalculator), nil
return calculateRateInterval(adjustedInterval, timeInterval, intervalCalculator), nil
} else {
intervalFactor := model.IntervalFactor
if intervalFactor == 0 {

View File

@ -1,4 +1,4 @@
package prometheus
package buffered
import (
"math"
@ -79,7 +79,7 @@ func TestPrometheus_timeSeriesQuery_formatLeged(t *testing.T) {
}
func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
service := Service{
service := Buffered{
intervalCalculator: intervalv2.NewCalculator(),
}
@ -108,8 +108,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
},
}
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, false, models[0].ExemplarQuery)
})
@ -126,8 +126,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, time.Second*30, models[0].Step)
})
@ -145,8 +145,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, time.Second*15, models[0].Step)
})
@ -164,8 +164,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, time.Minute*20, models[0].Step)
})
@ -183,8 +183,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, time.Minute*2, models[0].Step)
})
@ -202,10 +202,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{
TimeInterval: "240s",
}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "240s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, time.Minute*4, models[0].Step)
})
@ -223,8 +221,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [2m]})", models[0].Expr)
})
@ -242,8 +240,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [2m]})", models[0].Expr)
})
@ -261,8 +259,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [120000]})", models[0].Expr)
})
@ -280,8 +278,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [120000]}) + rate(ALERTS{job=\"test\" [2m]})", models[0].Expr)
})
@ -299,8 +297,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [120000]}) + rate(ALERTS{job=\"test\" [2m]})", models[0].Expr)
})
@ -318,8 +316,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [172800s]})", models[0].Expr)
})
@ -337,8 +335,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [172800]})", models[0].Expr)
})
@ -356,8 +354,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [172800s]})", models[0].Expr)
})
@ -375,8 +373,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [0]})", models[0].Expr)
})
@ -394,8 +392,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [1]})", models[0].Expr)
})
@ -413,8 +411,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [172800000]})", models[0].Expr)
})
@ -432,8 +430,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [20]})", models[0].Expr)
})
@ -452,8 +450,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [5m15s]})", models[0].Expr)
})
@ -472,8 +470,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, "rate(ALERTS{job=\"test\" [1m0s]})", models[0].Expr)
require.Equal(t, 1*time.Minute, models[0].Step)
@ -493,8 +491,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"range": true
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, true, models[0].RangeQuery)
})
@ -514,8 +512,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"instant": true
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, true, models[0].RangeQuery)
require.Equal(t, true, models[0].InstantQuery)
@ -534,8 +532,8 @@ func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
"refId": "A"
}`, timeRange)
dsInfo := &DatasourceInfo{}
models, err := service.parseTimeSeriesQuery(query, dsInfo)
service.TimeInterval = "15s"
models, err := service.parseTimeSeriesQuery(query)
require.NoError(t, err)
require.Equal(t, true, models[0].RangeQuery)
})

View File

@ -1,4 +1,4 @@
package prometheus
package buffered
import (
"time"
@ -6,14 +6,6 @@ import (
apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
)
type DatasourceInfo struct {
ID int64
URL string
TimeInterval string
getClient clientGetter
}
type clientGetter func(map[string]string) (apiv1.API, error)
type PrometheusQuery struct {
@ -47,3 +39,11 @@ type QueryModel struct {
IntervalFactor int64 `json:"intervalFactor"`
UtcOffsetSec int64 `json:"utcOffsetSec"`
}
type TimeSeriesQueryType string
const (
RangeQueryType TimeSeriesQueryType = "range"
InstantQueryType TimeSeriesQueryType = "instant"
ExemplarQueryType TimeSeriesQueryType = "exemplar"
)

View File

@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"regexp"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
@ -15,34 +14,28 @@ import (
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/intervalv2"
"github.com/grafana/grafana/pkg/tsdb/prometheus/promclient"
"github.com/grafana/grafana/pkg/util/maputil"
"github.com/grafana/grafana/pkg/tsdb/prometheus/buffered"
apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
)
var (
plog = log.New("tsdb.prometheus")
legendFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
safeRes = 11000
)
var plog = log.New("tsdb.prometheus")
type Service struct {
intervalCalculator intervalv2.Calculator
im instancemgmt.InstanceManager
tracer tracing.Tracer
im instancemgmt.InstanceManager
}
type instance struct {
Buffered *buffered.Buffered
}
func ProvideService(httpClientProvider httpclient.Provider, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) *Service {
plog.Debug("initializing")
return &Service{
intervalCalculator: intervalv2.NewCalculator(),
im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider, cfg, features)),
tracer: tracer,
im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider, cfg, features, tracer)),
}
}
func newInstanceSettings(httpClientProvider httpclient.Provider, cfg *setting.Cfg, features featuremgmt.FeatureToggles) datasource.InstanceFactoryFunc {
func newInstanceSettings(httpClientProvider httpclient.Provider, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) datasource.InstanceFactoryFunc {
return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
var jsonData map[string]interface{}
err := json.Unmarshal(settings.JSONData, &jsonData)
@ -50,25 +43,14 @@ func newInstanceSettings(httpClientProvider httpclient.Provider, cfg *setting.Cf
return nil, fmt.Errorf("error reading settings: %w", err)
}
p := promclient.NewProvider(settings, jsonData, httpClientProvider, cfg, features, plog)
pc, err := promclient.NewProviderCache(p)
buf, err := buffered.New(httpClientProvider, cfg, features, tracer, settings, plog)
if err != nil {
return nil, err
}
timeInterval, err := maputil.GetStringOptional(jsonData, "timeInterval")
if err != nil {
return nil, err
}
mdl := DatasourceInfo{
ID: settings.ID,
URL: settings.URL,
TimeInterval: timeInterval,
getClient: pc.GetClient,
}
return mdl, nil
return instance{
Buffered: buf,
}, nil
}
}
@ -77,32 +59,21 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
return &backend.QueryDataResponse{}, fmt.Errorf("query contains no queries")
}
q := req.Queries[0]
dsInfo, err := s.getDSInfo(req.PluginContext)
i, err := s.getInstance(req.PluginContext)
if err != nil {
return nil, err
}
var result *backend.QueryDataResponse
switch q.QueryType {
case "timeSeriesQuery":
fallthrough
default:
result, err = s.executeTimeSeriesQuery(ctx, req, dsInfo)
}
return result, err
return i.Buffered.ExecuteTimeSeriesQuery(ctx, req)
}
func (s *Service) getDSInfo(pluginCtx backend.PluginContext) (*DatasourceInfo, error) {
func (s *Service) getInstance(pluginCtx backend.PluginContext) (*instance, error) {
i, err := s.im.Get(pluginCtx)
if err != nil {
return nil, err
}
instance := i.(DatasourceInfo)
return &instance, nil
in := i.(instance)
return &in, nil
}
// IsAPIError returns whether err is or wraps a Prometheus error.