mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Prometheus: Run Explore both queries trough backend (#39462)
* Prometheus: Run Explore both queries trough backend
* Refactor, simplify
* Set default values for query type selector
* Run multiple queries as one query trough backend
* Remove trailing newlines
* Pass utcOffset
* Remove trailing comma
* Add meta to frames only at 1 place
* Set exemplars to falsee if they are disabled
* Revert "Set exemplars to falsee if they are disabled"
This reverts commit e7b697c9f3.
This commit is contained in:
@@ -34,26 +34,6 @@ var (
|
|||||||
safeRes = 11000
|
safeRes = 11000
|
||||||
)
|
)
|
||||||
|
|
||||||
type DatasourceInfo struct {
|
|
||||||
ID int64
|
|
||||||
HTTPClientOpts sdkhttpclient.Options
|
|
||||||
URL string
|
|
||||||
HTTPMethod string
|
|
||||||
TimeInterval string
|
|
||||||
}
|
|
||||||
|
|
||||||
type QueryModel struct {
|
|
||||||
Expr string `json:"expr"`
|
|
||||||
LegendFormat string `json:"legendFormat"`
|
|
||||||
Interval string `json:"interval"`
|
|
||||||
IntervalMS int64 `json:"intervalMS"`
|
|
||||||
StepMode string `json:"stepMode"`
|
|
||||||
RangeQuery bool `json:"range"`
|
|
||||||
InstantQuery bool `json:"instant"`
|
|
||||||
IntervalFactor int64 `json:"intervalFactor"`
|
|
||||||
UtcOffsetSec int64 `json:"utcOffsetSec"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
httpClientProvider httpclient.Provider
|
httpClientProvider httpclient.Provider
|
||||||
intervalCalculator intervalv2.Calculator
|
intervalCalculator intervalv2.Calculator
|
||||||
@@ -154,13 +134,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, query := range queries {
|
for _, query := range queries {
|
||||||
timeRange := apiv1.Range{
|
plog.Debug("Sending query", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr)
|
||||||
Start: query.Start,
|
|
||||||
End: query.End,
|
|
||||||
Step: query.Step,
|
|
||||||
}
|
|
||||||
|
|
||||||
plog.Debug("Sending query", "start", timeRange.Start, "end", timeRange.End, "step", timeRange.Step, "query", query.Expr)
|
|
||||||
|
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "datasource.prometheus")
|
span, ctx := opentracing.StartSpanFromContext(ctx, "datasource.prometheus")
|
||||||
span.SetTag("expr", query.Expr)
|
span.SetTag("expr", query.Expr)
|
||||||
@@ -168,30 +142,38 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
|||||||
span.SetTag("stop_unixnano", query.End.UnixNano())
|
span.SetTag("stop_unixnano", query.End.UnixNano())
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
var response model.Value
|
response := make(map[PrometheusQueryType]model.Value)
|
||||||
|
|
||||||
switch query.QueryType {
|
if query.RangeQuery {
|
||||||
case Range:
|
timeRange := apiv1.Range{
|
||||||
response, _, err = client.QueryRange(ctx, query.Expr, timeRange)
|
Step: query.Step,
|
||||||
|
// Align query range to step. It rounds start and end down to a multiple of step.
|
||||||
|
Start: time.Unix(int64(math.Floor((float64(query.Start.Unix()+query.UtcOffsetSec)/query.Step.Seconds()))*query.Step.Seconds()-float64(query.UtcOffsetSec)), 0),
|
||||||
|
End: time.Unix(int64(math.Floor((float64(query.End.Unix()+query.UtcOffsetSec)/query.Step.Seconds()))*query.Step.Seconds()-float64(query.UtcOffsetSec)), 0),
|
||||||
|
}
|
||||||
|
|
||||||
|
rangeResponse, _, err := client.QueryRange(ctx, query.Expr, timeRange)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &result, fmt.Errorf("query: %s failed with: %v", query.Expr, err)
|
return &result, fmt.Errorf("query: %s failed with: %v", query.Expr, err)
|
||||||
}
|
}
|
||||||
case Instant:
|
response[Range] = rangeResponse
|
||||||
response, _, err = client.Query(ctx, query.Expr, query.End)
|
}
|
||||||
|
|
||||||
|
if query.InstantQuery {
|
||||||
|
instantResponse, _, err := client.Query(ctx, query.Expr, query.End)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &result, fmt.Errorf("query: %s failed with: %v", query.Expr, err)
|
return &result, fmt.Errorf("query: %s failed with: %v", query.Expr, err)
|
||||||
}
|
}
|
||||||
default:
|
response[Instant] = instantResponse
|
||||||
return &result, fmt.Errorf("unknown Query type detected %#v", query.QueryType)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
frame, err := parseResponse(response, query)
|
frames, err := parseResponse(response, query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &result, err
|
return &result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
result.Responses[query.RefId] = backend.DataResponse{
|
result.Responses[query.RefId] = backend.DataResponse{
|
||||||
Frames: frame,
|
Frames: frames,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -309,61 +291,60 @@ func (s *Service) parseQuery(queryContext *backend.QueryDataRequest, dsInfo *Dat
|
|||||||
expr = strings.ReplaceAll(expr, "$__range", strconv.FormatInt(rangeS, 10)+"s")
|
expr = strings.ReplaceAll(expr, "$__range", strconv.FormatInt(rangeS, 10)+"s")
|
||||||
expr = strings.ReplaceAll(expr, "$__rate_interval", intervalv2.FormatDuration(calculateRateInterval(interval, dsInfo.TimeInterval, s.intervalCalculator)))
|
expr = strings.ReplaceAll(expr, "$__rate_interval", intervalv2.FormatDuration(calculateRateInterval(interval, dsInfo.TimeInterval, s.intervalCalculator)))
|
||||||
|
|
||||||
if model.RangeQuery && model.InstantQuery {
|
rangeQuery := model.RangeQuery
|
||||||
return nil, fmt.Errorf("the provided query is not valid, expected only one of `range` and `instant` to be true")
|
if !model.InstantQuery && !model.RangeQuery {
|
||||||
}
|
// In older dashboards, we were not setting range query param and !range && !instant was run as range query
|
||||||
|
rangeQuery = true
|
||||||
var queryType PrometheusQueryType
|
|
||||||
var start time.Time
|
|
||||||
var end time.Time
|
|
||||||
|
|
||||||
if model.InstantQuery {
|
|
||||||
queryType = Instant
|
|
||||||
start = query.TimeRange.From
|
|
||||||
end = query.TimeRange.To
|
|
||||||
} else {
|
|
||||||
queryType = Range
|
|
||||||
// Align query range to step. It rounds start and end down to a multiple of step.
|
|
||||||
start = time.Unix(int64(math.Floor((float64(query.TimeRange.From.Unix()+model.UtcOffsetSec)/interval.Seconds()))*interval.Seconds()-float64(model.UtcOffsetSec)), 0)
|
|
||||||
end = time.Unix(int64(math.Floor((float64(query.TimeRange.To.Unix()+model.UtcOffsetSec)/interval.Seconds()))*interval.Seconds()-float64(model.UtcOffsetSec)), 0)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
qs = append(qs, &PrometheusQuery{
|
qs = append(qs, &PrometheusQuery{
|
||||||
Expr: expr,
|
Expr: expr,
|
||||||
Step: interval,
|
Step: interval,
|
||||||
LegendFormat: model.LegendFormat,
|
LegendFormat: model.LegendFormat,
|
||||||
Start: start,
|
Start: query.TimeRange.From,
|
||||||
End: end,
|
End: query.TimeRange.To,
|
||||||
RefId: query.RefID,
|
RefId: query.RefID,
|
||||||
QueryType: queryType,
|
InstantQuery: model.InstantQuery,
|
||||||
|
RangeQuery: rangeQuery,
|
||||||
|
UtcOffsetSec: model.UtcOffsetSec,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return qs, nil
|
return qs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseResponse(value model.Value, query *PrometheusQuery) (data.Frames, error) {
|
func parseResponse(value map[PrometheusQueryType]model.Value, query *PrometheusQuery) (data.Frames, error) {
|
||||||
frames := data.Frames{}
|
allFrames := data.Frames{}
|
||||||
|
|
||||||
|
for queryType, value := range value {
|
||||||
|
var frames data.Frames
|
||||||
|
|
||||||
matrix, ok := value.(model.Matrix)
|
matrix, ok := value.(model.Matrix)
|
||||||
if ok {
|
if ok {
|
||||||
matrixFrames := matrixToDataFrames(matrix, query)
|
frames = matrixToDataFrames(matrix, query, queryType)
|
||||||
frames = append(frames, matrixFrames...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
vector, ok := value.(model.Vector)
|
vector, ok := value.(model.Vector)
|
||||||
if ok {
|
if ok {
|
||||||
vectorFrames := vectorToDataFrames(vector, query)
|
frames = vectorToDataFrames(vector, query, queryType)
|
||||||
frames = append(frames, vectorFrames...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
scalar, ok := value.(*model.Scalar)
|
scalar, ok := value.(*model.Scalar)
|
||||||
if ok {
|
if ok {
|
||||||
scalarFrames := scalarToDataFrames(scalar)
|
frames = scalarToDataFrames(scalar, query, queryType)
|
||||||
frames = append(frames, scalarFrames...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return frames, nil
|
for _, frame := range frames {
|
||||||
|
frame.Meta = &data.FrameMeta{
|
||||||
|
Custom: map[string]PrometheusQueryType{
|
||||||
|
"queryType": queryType,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
allFrames = append(allFrames, frames...)
|
||||||
|
}
|
||||||
|
|
||||||
|
return allFrames, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsAPIError returns whether err is or wraps a Prometheus error.
|
// IsAPIError returns whether err is or wraps a Prometheus error.
|
||||||
@@ -396,7 +377,7 @@ func calculateRateInterval(interval time.Duration, scrapeInterval string, interv
|
|||||||
return rateInterval
|
return rateInterval
|
||||||
}
|
}
|
||||||
|
|
||||||
func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery) data.Frames {
|
func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery, queryType PrometheusQueryType) data.Frames {
|
||||||
frames := data.Frames{}
|
frames := data.Frames{}
|
||||||
|
|
||||||
for _, v := range matrix {
|
for _, v := range matrix {
|
||||||
@@ -411,26 +392,26 @@ func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery) data.Frames
|
|||||||
values = append(values, float64(k.Value))
|
values = append(values, float64(k.Value))
|
||||||
}
|
}
|
||||||
name := formatLegend(v.Metric, query)
|
name := formatLegend(v.Metric, query)
|
||||||
frames = append(frames, data.NewFrame(name,
|
frame := data.NewFrame(name,
|
||||||
data.NewField("Time", nil, timeVector),
|
data.NewField("Time", nil, timeVector),
|
||||||
data.NewField("Value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name})))
|
data.NewField("Value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}))
|
||||||
|
frames = append(frames, frame)
|
||||||
}
|
}
|
||||||
|
|
||||||
return frames
|
return frames
|
||||||
}
|
}
|
||||||
|
|
||||||
func scalarToDataFrames(scalar *model.Scalar) data.Frames {
|
func scalarToDataFrames(scalar *model.Scalar, query *PrometheusQuery, queryType PrometheusQueryType) data.Frames {
|
||||||
timeVector := []time.Time{time.Unix(scalar.Timestamp.Unix(), 0).UTC()}
|
timeVector := []time.Time{time.Unix(scalar.Timestamp.Unix(), 0).UTC()}
|
||||||
values := []float64{float64(scalar.Value)}
|
values := []float64{float64(scalar.Value)}
|
||||||
name := fmt.Sprintf("%g", values[0])
|
name := fmt.Sprintf("%g", values[0])
|
||||||
frames := data.Frames{data.NewFrame(name,
|
frame := data.NewFrame(name,
|
||||||
data.NewField("Time", nil, timeVector),
|
data.NewField("Time", nil, timeVector),
|
||||||
data.NewField("Value", nil, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}))}
|
data.NewField("Value", nil, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}))
|
||||||
|
frames := data.Frames{frame}
|
||||||
return frames
|
return frames
|
||||||
}
|
}
|
||||||
|
|
||||||
func vectorToDataFrames(vector model.Vector, query *PrometheusQuery) data.Frames {
|
func vectorToDataFrames(vector model.Vector, query *PrometheusQuery, queryType PrometheusQueryType) data.Frames {
|
||||||
frames := data.Frames{}
|
frames := data.Frames{}
|
||||||
for _, v := range vector {
|
for _, v := range vector {
|
||||||
name := formatLegend(v.Metric, query)
|
name := formatLegend(v.Metric, query)
|
||||||
@@ -440,9 +421,10 @@ func vectorToDataFrames(vector model.Vector, query *PrometheusQuery) data.Frames
|
|||||||
for k, v := range v.Metric {
|
for k, v := range v.Metric {
|
||||||
tags[string(k)] = string(v)
|
tags[string(k)] = string(v)
|
||||||
}
|
}
|
||||||
frames = append(frames, data.NewFrame(name,
|
frame := data.NewFrame(name,
|
||||||
data.NewField("Time", nil, timeVector),
|
data.NewField("Time", nil, timeVector),
|
||||||
data.NewField("Value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name})))
|
data.NewField("Value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}))
|
||||||
|
frames = append(frames, frame)
|
||||||
}
|
}
|
||||||
|
|
||||||
return frames
|
return frames
|
||||||
|
|||||||
@@ -303,10 +303,32 @@ func TestPrometheus_parseQuery(t *testing.T) {
|
|||||||
dsInfo := &DatasourceInfo{}
|
dsInfo := &DatasourceInfo{}
|
||||||
models, err := service.parseQuery(query, dsInfo)
|
models, err := service.parseQuery(query, dsInfo)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, Range, models[0].QueryType)
|
require.Equal(t, true, models[0].RangeQuery)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("parsing query model of with default query type", func(t *testing.T) {
|
t.Run("parsing query model of range and instant query", func(t *testing.T) {
|
||||||
|
timeRange := backend.TimeRange{
|
||||||
|
From: now,
|
||||||
|
To: now.Add(48 * time.Hour),
|
||||||
|
}
|
||||||
|
|
||||||
|
query := queryContext(`{
|
||||||
|
"expr": "go_goroutines",
|
||||||
|
"format": "time_series",
|
||||||
|
"intervalFactor": 1,
|
||||||
|
"refId": "A",
|
||||||
|
"range": true,
|
||||||
|
"instant": true
|
||||||
|
}`, timeRange)
|
||||||
|
|
||||||
|
dsInfo := &DatasourceInfo{}
|
||||||
|
models, err := service.parseQuery(query, dsInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, true, models[0].RangeQuery)
|
||||||
|
require.Equal(t, true, models[0].InstantQuery)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("parsing query model of with no query type", func(t *testing.T) {
|
||||||
timeRange := backend.TimeRange{
|
timeRange := backend.TimeRange{
|
||||||
From: now,
|
From: now,
|
||||||
To: now.Add(48 * time.Hour),
|
To: now.Add(48 * time.Hour),
|
||||||
@@ -322,7 +344,7 @@ func TestPrometheus_parseQuery(t *testing.T) {
|
|||||||
dsInfo := &DatasourceInfo{}
|
dsInfo := &DatasourceInfo{}
|
||||||
models, err := service.parseQuery(query, dsInfo)
|
models, err := service.parseQuery(query, dsInfo)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, Range, models[0].QueryType)
|
require.Equal(t, true, models[0].RangeQuery)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -335,7 +357,8 @@ func TestPrometheus_parseResponse(t *testing.T) {
|
|||||||
{Value: 4, Timestamp: 4000},
|
{Value: 4, Timestamp: 4000},
|
||||||
{Value: 5, Timestamp: 5000},
|
{Value: 5, Timestamp: 5000},
|
||||||
}
|
}
|
||||||
value := p.Matrix{
|
value := make(map[PrometheusQueryType]p.Value)
|
||||||
|
value[Range] = p.Matrix{
|
||||||
&p.SampleStream{
|
&p.SampleStream{
|
||||||
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
|
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
|
||||||
Values: values,
|
Values: values,
|
||||||
@@ -363,7 +386,8 @@ func TestPrometheus_parseResponse(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("vector response should be parsed normally", func(t *testing.T) {
|
t.Run("vector response should be parsed normally", func(t *testing.T) {
|
||||||
value := p.Vector{
|
value := make(map[PrometheusQueryType]p.Value)
|
||||||
|
value[Range] = p.Vector{
|
||||||
&p.Sample{
|
&p.Sample{
|
||||||
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
|
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
|
||||||
Value: 1,
|
Value: 1,
|
||||||
@@ -392,10 +416,12 @@ func TestPrometheus_parseResponse(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("scalar response should be parsed normally", func(t *testing.T) {
|
t.Run("scalar response should be parsed normally", func(t *testing.T) {
|
||||||
value := &p.Scalar{
|
value := make(map[PrometheusQueryType]p.Value)
|
||||||
|
value[Range] = &p.Scalar{
|
||||||
Value: 1,
|
Value: 1,
|
||||||
Timestamp: 1000,
|
Timestamp: 1000,
|
||||||
}
|
}
|
||||||
|
|
||||||
query := &PrometheusQuery{}
|
query := &PrometheusQuery{}
|
||||||
res, err := parseResponse(value, query)
|
res, err := parseResponse(value, query)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|||||||
@@ -1,6 +1,18 @@
|
|||||||
package prometheus
|
package prometheus
|
||||||
|
|
||||||
import "time"
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DatasourceInfo struct {
|
||||||
|
ID int64
|
||||||
|
HTTPClientOpts sdkhttpclient.Options
|
||||||
|
URL string
|
||||||
|
HTTPMethod string
|
||||||
|
TimeInterval string
|
||||||
|
}
|
||||||
|
|
||||||
type PrometheusQuery struct {
|
type PrometheusQuery struct {
|
||||||
Expr string
|
Expr string
|
||||||
@@ -9,13 +21,26 @@ type PrometheusQuery struct {
|
|||||||
Start time.Time
|
Start time.Time
|
||||||
End time.Time
|
End time.Time
|
||||||
RefId string
|
RefId string
|
||||||
QueryType PrometheusQueryType
|
InstantQuery bool
|
||||||
|
RangeQuery bool
|
||||||
|
UtcOffsetSec int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type QueryModel struct {
|
||||||
|
Expr string `json:"expr"`
|
||||||
|
LegendFormat string `json:"legendFormat"`
|
||||||
|
Interval string `json:"interval"`
|
||||||
|
IntervalMS int64 `json:"intervalMS"`
|
||||||
|
StepMode string `json:"stepMode"`
|
||||||
|
RangeQuery bool `json:"range"`
|
||||||
|
InstantQuery bool `json:"instant"`
|
||||||
|
IntervalFactor int64 `json:"intervalFactor"`
|
||||||
|
UtcOffsetSec int64 `json:"utcOffsetSec"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type PrometheusQueryType string
|
type PrometheusQueryType string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
Range PrometheusQueryType = "range"
|
Range PrometheusQueryType = "range"
|
||||||
//This is currently not used, but we will use it in next iteration
|
|
||||||
Instant PrometheusQueryType = "instant"
|
Instant PrometheusQueryType = "instant"
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -18,6 +18,10 @@ export const PromExploreQueryEditor: FC<Props> = (props: Props) => {
|
|||||||
if (query.exemplar === undefined) {
|
if (query.exemplar === undefined) {
|
||||||
onChange({ ...query, exemplar: true });
|
onChange({ ...query, exemplar: true });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!query.instant && !query.range) {
|
||||||
|
onChange({ ...query, instant: true, range: true });
|
||||||
|
}
|
||||||
}, [onChange, query]);
|
}, [onChange, query]);
|
||||||
|
|
||||||
function onChangeQueryStep(value: string) {
|
function onChangeQueryStep(value: string) {
|
||||||
|
|||||||
@@ -289,36 +289,18 @@ export class PrometheusDatasource extends DataSourceWithBackend<PromQuery, PromO
|
|||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
prepareOptionsV2 = (options: DataQueryRequest<PromQuery>) => {
|
|
||||||
const targets = options.targets.map((target) => {
|
|
||||||
//This is currently only preparing options for Explore queries where we know the format of data we want to receive
|
|
||||||
if (target.instant) {
|
|
||||||
return { ...target, instant: true, range: false, format: 'table' };
|
|
||||||
}
|
|
||||||
return {
|
|
||||||
...target,
|
|
||||||
instant: false,
|
|
||||||
range: true,
|
|
||||||
format: 'time_series',
|
|
||||||
utcOffsetSec: this.timeSrv.timeRange().to.utcOffset() * 60,
|
|
||||||
};
|
|
||||||
});
|
|
||||||
|
|
||||||
return { ...options, targets };
|
|
||||||
};
|
|
||||||
|
|
||||||
query(options: DataQueryRequest<PromQuery>): Observable<DataQueryResponse> {
|
query(options: DataQueryRequest<PromQuery>): Observable<DataQueryResponse> {
|
||||||
// WIP - currently we want to run trough backend only if all queries are explore + range/instant queries
|
// WIP - currently we want to run trough backend only if all queries are explore + range/instant queries
|
||||||
const shouldRunBackendQuery =
|
const shouldRunBackendQuery =
|
||||||
this.access === 'proxy' &&
|
this.access === 'proxy' && options.app === CoreApp.Explore && !options.targets.some((query) => query.exemplar);
|
||||||
options.app === CoreApp.Explore &&
|
|
||||||
!options.targets.some((query) => query.exemplar) &&
|
|
||||||
// When running both queries, run through proxy
|
|
||||||
!options.targets.some((query) => query.instant && query.range);
|
|
||||||
|
|
||||||
if (shouldRunBackendQuery) {
|
if (shouldRunBackendQuery) {
|
||||||
const newOptions = this.prepareOptionsV2(options);
|
const targets = options.targets.map((target) => ({
|
||||||
return super.query(newOptions).pipe(map((response) => transformV2(response, newOptions)));
|
...target,
|
||||||
|
// We need to pass utcOffsetSec to backend to calculate aligned range
|
||||||
|
utcOffsetSec: this.timeSrv.timeRange().to.utcOffset() * 60,
|
||||||
|
}));
|
||||||
|
return super.query({ ...options, targets }).pipe(map((response) => transformV2(response, options)));
|
||||||
// Run queries trough browser/proxy
|
// Run queries trough browser/proxy
|
||||||
} else {
|
} else {
|
||||||
const start = this.getPrometheusTime(options.range.from, false);
|
const start = this.getPrometheusTime(options.range.from, false);
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import {
|
|||||||
DataQueryResponse,
|
DataQueryResponse,
|
||||||
DataQueryRequest,
|
DataQueryRequest,
|
||||||
PreferredVisualisationType,
|
PreferredVisualisationType,
|
||||||
|
CoreApp,
|
||||||
} from '@grafana/data';
|
} from '@grafana/data';
|
||||||
import { FetchResponse, getDataSourceSrv, getTemplateSrv } from '@grafana/runtime';
|
import { FetchResponse, getDataSourceSrv, getTemplateSrv } from '@grafana/runtime';
|
||||||
import { partition } from 'lodash';
|
import { partition } from 'lodash';
|
||||||
@@ -41,12 +42,25 @@ interface TimeAndValue {
|
|||||||
[TIME_SERIES_VALUE_FIELD_NAME]: number;
|
[TIME_SERIES_VALUE_FIELD_NAME]: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const isTableResult = (dataFrame: DataFrame, options: DataQueryRequest<PromQuery>): boolean => {
|
||||||
|
// We want to process instant results in Explore as table
|
||||||
|
if ((options.app === CoreApp.Explore && dataFrame.meta?.custom?.queryType) === 'instant') {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We want to process all dataFrames with target.format === 'table' as table
|
||||||
|
const target = options.targets.find((target) => target.refId === dataFrame.refId);
|
||||||
|
if (target?.format === 'table') {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
// V2 result trasnformer used to transform query results from queries that were run trough prometheus backend
|
// V2 result trasnformer used to transform query results from queries that were run trough prometheus backend
|
||||||
export function transformV2(response: DataQueryResponse, options: DataQueryRequest<PromQuery>) {
|
export function transformV2(response: DataQueryResponse, options: DataQueryRequest<PromQuery>) {
|
||||||
// Get refIds that have table format as we need to process those to table reuslts
|
|
||||||
const tableRefIds = options.targets.filter((target) => target.format === 'table').map((target) => target.refId);
|
|
||||||
const [tableResults, otherResults]: [DataFrame[], DataFrame[]] = partition(response.data, (dataFrame) =>
|
const [tableResults, otherResults]: [DataFrame[], DataFrame[]] = partition(response.data, (dataFrame) =>
|
||||||
dataFrame.refId ? tableRefIds.includes(dataFrame.refId) : false
|
isTableResult(dataFrame, options)
|
||||||
);
|
);
|
||||||
|
|
||||||
// For table results, we need to transform data frames to table data frames
|
// For table results, we need to transform data frames to table data frames
|
||||||
|
|||||||
Reference in New Issue
Block a user