mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Tempo: Support TraceQL instant metrics queries (#99732)
Support TraceQL instant metrics
This commit is contained in:
parent
90c18099a5
commit
7883215c68
@ -30,6 +30,10 @@ export interface TempoQuery extends common.DataQuery {
|
||||
* @deprecated Define the maximum duration to select traces. Use duration format, for example: 1.2s, 100ms
|
||||
*/
|
||||
maxDuration?: string;
|
||||
/**
|
||||
* For metric queries, whether to run instant or range queries
|
||||
*/
|
||||
metricsQueryType?: MetricsQueryType;
|
||||
/**
|
||||
* @deprecated Define the minimum duration to select traces. Use duration format, for example: 1.2s, 100ms
|
||||
*/
|
||||
@ -79,6 +83,11 @@ export const defaultTempoQuery: Partial<TempoQuery> = {
|
||||
|
||||
export type TempoQueryType = ('traceql' | 'traceqlSearch' | 'serviceMap' | 'upload' | 'nativeSearch' | 'traceId' | 'clear');
|
||||
|
||||
export enum MetricsQueryType {
|
||||
Instant = 'instant',
|
||||
Range = 'range',
|
||||
}
|
||||
|
||||
/**
|
||||
* The state of the TraceQL streaming search query
|
||||
*/
|
||||
|
@ -54,13 +54,15 @@ type TempoQuery struct {
|
||||
TableType *SearchTableType `json:"tableType,omitempty"`
|
||||
// For metric queries, the step size to use
|
||||
Step *string `json:"step,omitempty"`
|
||||
// For metric queries, how many exemplars to request, 0 means no exemplars
|
||||
Exemplars *int64 `json:"exemplars,omitempty"`
|
||||
// For mixed data sources the selected datasource is on the query level.
|
||||
// For non mixed scenarios this is undefined.
|
||||
// TODO find a better way to do this ^ that's friendly to schema
|
||||
// TODO this shouldn't be unknown but DataSourceRef | null
|
||||
Datasource any `json:"datasource,omitempty"`
|
||||
// For metric queries, how many exemplars to request, 0 means no exemplars
|
||||
Exemplars *int64 `json:"exemplars,omitempty"`
|
||||
// For metric queries, whether to run instant or range queries
|
||||
MetricsQueryType *MetricsQueryType `json:"metricsQueryType,omitempty"`
|
||||
}
|
||||
|
||||
// NewTempoQuery creates a new TempoQuery object.
|
||||
@ -80,6 +82,13 @@ const (
|
||||
TempoQueryTypeClear TempoQueryType = "clear"
|
||||
)
|
||||
|
||||
type MetricsQueryType string
|
||||
|
||||
const (
|
||||
MetricsQueryTypeRange MetricsQueryType = "range"
|
||||
MetricsQueryTypeInstant MetricsQueryType = "instant"
|
||||
)
|
||||
|
||||
// The state of the TraceQL streaming search query
|
||||
type SearchStreamingState string
|
||||
|
||||
|
@ -59,6 +59,46 @@ func TransformMetricsResponse(query *dataquery.TempoQuery, resp tempopb.QueryRan
|
||||
return append(frames, exemplarFrames...)
|
||||
}
|
||||
|
||||
func TransformInstantMetricsResponse(query *dataquery.TempoQuery, resp tempopb.QueryInstantResponse) []*data.Frame {
|
||||
frames := make([]*data.Frame, len(resp.Series))
|
||||
|
||||
for i, series := range resp.Series {
|
||||
name, labels := transformLabelsAndGetName(series.Labels)
|
||||
|
||||
labelKeys := make([]string, 0, len(labels))
|
||||
labelFields := make([]*data.Field, 0, len(labels))
|
||||
for key := range labels {
|
||||
labelKeys = append(labelKeys, key)
|
||||
labelFields = append(labelFields, data.NewField(key, nil, []string{}))
|
||||
}
|
||||
|
||||
timeField := data.NewField("time", nil, []time.Time{})
|
||||
valueField := data.NewField("value", labels, []float64{})
|
||||
valueField.Config = &data.FieldConfig{
|
||||
DisplayName: name,
|
||||
}
|
||||
|
||||
frame := &data.Frame{
|
||||
RefID: name,
|
||||
Name: name,
|
||||
Fields: append([]*data.Field{timeField}, append(labelFields, valueField)...),
|
||||
Meta: &data.FrameMeta{
|
||||
PreferredVisualization: data.VisTypeTable,
|
||||
},
|
||||
}
|
||||
|
||||
labelValues := make([]interface{}, len(labels))
|
||||
for idx, key := range labelKeys {
|
||||
labelValues[idx] = strings.Trim(labels[key], "\"")
|
||||
}
|
||||
row := append([]interface{}{time.Now()}, append(labelValues, series.GetValue())...)
|
||||
frame.AppendRow(row...)
|
||||
|
||||
frames[i] = frame
|
||||
}
|
||||
return frames
|
||||
}
|
||||
|
||||
func metricsValueToString(value *v1.AnyValue) (string, string) {
|
||||
switch value.GetValue().(type) {
|
||||
case *v1.AnyValue_DoubleValue:
|
||||
|
@ -122,3 +122,47 @@ func TestTransformMetricsResponse_MultipleSeries(t *testing.T) {
|
||||
assert.Equal(t, time.UnixMilli(1638316800000), frames[1].Fields[0].At(0))
|
||||
assert.Equal(t, 4.56, frames[1].Fields[1].At(0))
|
||||
}
|
||||
|
||||
func TestTransformInstantMetricsResponse(t *testing.T) {
|
||||
query := &dataquery.TempoQuery{}
|
||||
resp := tempopb.QueryInstantResponse{
|
||||
Series: []*tempopb.InstantSeries{
|
||||
{
|
||||
Labels: []v1.KeyValue{
|
||||
{
|
||||
Key: "label",
|
||||
Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "value"}},
|
||||
},
|
||||
},
|
||||
Value: 123.45,
|
||||
PromLabels: "label=\"value\"",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
frames := TransformInstantMetricsResponse(query, resp)
|
||||
|
||||
assert.Len(t, frames, 1)
|
||||
frame := frames[0]
|
||||
|
||||
assert.Equal(t, "value", frame.RefID)
|
||||
assert.Equal(t, "value", frame.Name)
|
||||
assert.Len(t, frame.Fields, 3)
|
||||
|
||||
timeField := frame.Fields[0]
|
||||
assert.Equal(t, "time", timeField.Name)
|
||||
assert.Equal(t, 1, timeField.Len())
|
||||
assert.IsType(t, time.Time{}, timeField.At(0))
|
||||
|
||||
labelField := frame.Fields[1]
|
||||
assert.Equal(t, "label", labelField.Name)
|
||||
assert.Equal(t, 1, labelField.Len())
|
||||
assert.IsType(t, "", labelField.At(0))
|
||||
assert.Equal(t, "value", labelField.At(0))
|
||||
|
||||
valueField := frame.Fields[2]
|
||||
assert.Equal(t, "value", valueField.Name)
|
||||
assert.Equal(t, 1, valueField.Len())
|
||||
assert.IsType(t, 0.0, valueField.At(0))
|
||||
assert.Equal(t, 123.45, valueField.At(0).(float64))
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
//nolint:all
|
||||
"github.com/golang/protobuf/jsonpb"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
||||
"github.com/grafana/grafana/pkg/tsdb/tempo/kinds/dataquery"
|
||||
"github.com/grafana/grafana/pkg/tsdb/tempo/traceql"
|
||||
@ -86,21 +87,40 @@ func (s *Service) runTraceQlQueryMetrics(ctx context.Context, pCtx backend.Plugi
|
||||
return result, nil
|
||||
}
|
||||
|
||||
var queryResponse tempopb.QueryRangeResponse
|
||||
err = jsonpb.Unmarshal(bytes.NewReader(responseBody), &queryResponse)
|
||||
if isInstantQuery(tempoQuery.MetricsQueryType) {
|
||||
var queryResponse tempopb.QueryInstantResponse
|
||||
err = jsonpb.Unmarshal(bytes.NewReader(responseBody), &queryResponse)
|
||||
|
||||
if res, err := handleConversionError(ctxLogger, span, err); err != nil {
|
||||
return res, err
|
||||
}
|
||||
|
||||
frames := traceql.TransformInstantMetricsResponse(tempoQuery, queryResponse)
|
||||
result.Frames = frames
|
||||
} else {
|
||||
var queryResponse tempopb.QueryRangeResponse
|
||||
err = jsonpb.Unmarshal(bytes.NewReader(responseBody), &queryResponse)
|
||||
|
||||
if res, err := handleConversionError(ctxLogger, span, err); err != nil {
|
||||
return res, err
|
||||
}
|
||||
|
||||
frames := traceql.TransformMetricsResponse(tempoQuery, queryResponse)
|
||||
result.Frames = frames
|
||||
}
|
||||
|
||||
ctxLogger.Debug("Successfully performed TraceQL query", "function", logEntrypoint())
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func handleConversionError(ctxLogger log.Logger, span trace.Span, err error) (*backend.DataResponse, error) {
|
||||
if err != nil {
|
||||
ctxLogger.Error("Failed to convert response to type", "error", err, "function", logEntrypoint())
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
return &backend.DataResponse{}, fmt.Errorf("failed to convert response to type: %w", err)
|
||||
}
|
||||
|
||||
frames := traceql.TransformMetricsResponse(tempoQuery, queryResponse)
|
||||
|
||||
result.Frames = frames
|
||||
ctxLogger.Debug("Successfully performed TraceQL query", "function", logEntrypoint())
|
||||
return result, nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *Service) performMetricsQuery(ctx context.Context, dsInfo *Datasource, model *dataquery.TempoQuery, query backend.DataQuery, span trace.Span) (*http.Response, []byte, error) {
|
||||
@ -133,7 +153,12 @@ func (s *Service) performMetricsQuery(ctx context.Context, dsInfo *Datasource, m
|
||||
func (s *Service) createMetricsQuery(ctx context.Context, dsInfo *Datasource, query *dataquery.TempoQuery, start int64, end int64) (*http.Request, error) {
|
||||
ctxLogger := s.logger.FromContext(ctx)
|
||||
|
||||
rawUrl := fmt.Sprintf("%s/api/metrics/query_range", dsInfo.URL)
|
||||
queryType := "query_range"
|
||||
if isInstantQuery(query.MetricsQueryType) {
|
||||
queryType = "query"
|
||||
}
|
||||
|
||||
rawUrl := fmt.Sprintf("%s/api/metrics/%s", dsInfo.URL, queryType)
|
||||
searchUrl, err := url.Parse(rawUrl)
|
||||
if err != nil {
|
||||
ctxLogger.Error("Failed to parse URL", "url", rawUrl, "error", err, "function", logEntrypoint())
|
||||
@ -167,6 +192,13 @@ func (s *Service) createMetricsQuery(ctx context.Context, dsInfo *Datasource, qu
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func isInstantQuery(metricQueryType *dataquery.MetricsQueryType) bool {
|
||||
if metricQueryType == nil {
|
||||
return false
|
||||
}
|
||||
return *metricQueryType == dataquery.MetricsQueryTypeInstant
|
||||
}
|
||||
|
||||
func isMetricsQuery(query string) bool {
|
||||
match, _ := regexp.MatchString("\\|\\s*(rate|count_over_time|avg_over_time|max_over_time|min_over_time|quantile_over_time|histogram_over_time|compare)\\s*\\(", query)
|
||||
return match
|
||||
|
@ -55,10 +55,14 @@ composableKinds: DataQuery: {
|
||||
step?: string
|
||||
// For metric queries, how many exemplars to request, 0 means no exemplars
|
||||
exemplars?: int64
|
||||
// For metric queries, whether to run instant or range queries
|
||||
metricsQueryType?: #MetricsQueryType
|
||||
} @cuetsy(kind="interface") @grafana(TSVeneer="type")
|
||||
|
||||
#TempoQueryType: "traceql" | "traceqlSearch" | "serviceMap" | "upload" | "nativeSearch" | "traceId" | "clear" @cuetsy(kind="type")
|
||||
|
||||
#MetricsQueryType: "range" | "instant" @cuetsy(kind="enum")
|
||||
|
||||
// The state of the TraceQL streaming search query
|
||||
#SearchStreamingState: "pending" | "streaming" | "done" | "error" @cuetsy(kind="enum")
|
||||
|
||||
|
@ -28,6 +28,10 @@ export interface TempoQuery extends common.DataQuery {
|
||||
* @deprecated Define the maximum duration to select traces. Use duration format, for example: 1.2s, 100ms
|
||||
*/
|
||||
maxDuration?: string;
|
||||
/**
|
||||
* For metric queries, whether to run instant or range queries
|
||||
*/
|
||||
metricsQueryType?: MetricsQueryType;
|
||||
/**
|
||||
* @deprecated Define the minimum duration to select traces. Use duration format, for example: 1.2s, 100ms
|
||||
*/
|
||||
@ -77,6 +81,11 @@ export const defaultTempoQuery: Partial<TempoQuery> = {
|
||||
|
||||
export type TempoQueryType = ('traceql' | 'traceqlSearch' | 'serviceMap' | 'upload' | 'nativeSearch' | 'traceId' | 'clear');
|
||||
|
||||
export enum MetricsQueryType {
|
||||
Instant = 'instant',
|
||||
Range = 'range',
|
||||
}
|
||||
|
||||
/**
|
||||
* The state of the TraceQL streaming search query
|
||||
*/
|
||||
|
@ -6,7 +6,7 @@ import { EditorField, EditorRow } from '@grafana/experimental';
|
||||
import { AutoSizeInput, RadioButtonGroup, useStyles2 } from '@grafana/ui';
|
||||
|
||||
import { QueryOptionGroup } from '../_importedDependencies/datasources/prometheus/QueryOptionGroup';
|
||||
import { SearchTableType } from '../dataquery.gen';
|
||||
import { SearchTableType, MetricsQueryType } from '../dataquery.gen';
|
||||
import { DEFAULT_LIMIT, DEFAULT_SPSS } from '../datasource';
|
||||
import { TempoQuery } from '../types';
|
||||
|
||||
@ -40,6 +40,10 @@ export const TempoQueryBuilderOptions = React.memo<Props>(({ onChange, query, is
|
||||
query.tableType = SearchTableType.Traces;
|
||||
}
|
||||
|
||||
if (!query.hasOwnProperty('metricsQueryType')) {
|
||||
query.metricsQueryType = MetricsQueryType.Range;
|
||||
}
|
||||
|
||||
const onLimitChange = (e: React.FormEvent<HTMLInputElement>) => {
|
||||
onChange({ ...query, limit: parseIntWithFallback(e.currentTarget.value, DEFAULT_LIMIT) });
|
||||
};
|
||||
@ -49,6 +53,9 @@ export const TempoQueryBuilderOptions = React.memo<Props>(({ onChange, query, is
|
||||
const onTableTypeChange = (val: SearchTableType) => {
|
||||
onChange({ ...query, tableType: val });
|
||||
};
|
||||
const onMetricsQueryTypeChange = (val: MetricsQueryType) => {
|
||||
onChange({ ...query, metricsQueryType: val });
|
||||
};
|
||||
const onStepChange = (e: React.FormEvent<HTMLInputElement>) => {
|
||||
onChange({ ...query, step: e.currentTarget.value });
|
||||
};
|
||||
@ -74,6 +81,7 @@ export const TempoQueryBuilderOptions = React.memo<Props>(({ onChange, query, is
|
||||
|
||||
const collapsedMetricsOptions = [
|
||||
`Step: ${query.step || 'auto'}`,
|
||||
`Type: ${query.metricsQueryType === MetricsQueryType.Range ? 'Range' : 'Instant'}`,
|
||||
// `Exemplars: ${query.exemplars !== undefined ? query.exemplars : 'auto'}`,
|
||||
];
|
||||
|
||||
@ -132,6 +140,16 @@ export const TempoQueryBuilderOptions = React.memo<Props>(({ onChange, query, is
|
||||
value={query.step}
|
||||
/>
|
||||
</EditorField>
|
||||
<EditorField label="Type" tooltip="Type of metrics query to run">
|
||||
<RadioButtonGroup
|
||||
options={[
|
||||
{ label: 'Range', value: MetricsQueryType.Range },
|
||||
{ label: 'Instant', value: MetricsQueryType.Instant },
|
||||
]}
|
||||
value={query.metricsQueryType}
|
||||
onChange={onMetricsQueryTypeChange}
|
||||
/>
|
||||
</EditorField>
|
||||
{/*<EditorField*/}
|
||||
{/* label="Exemplars"*/}
|
||||
{/* tooltip="Defines the amount of exemplars to request for metric queries. A value of 0 means no exemplars."*/}
|
||||
|
Loading…
Reference in New Issue
Block a user