mirror of
https://github.com/grafana/grafana.git
synced 2025-02-16 18:34:52 -06:00
Tempo: Run TraceQL metrics queries through backend (#96246)
* Move TraceQL metrics queries to backend * Add tests for TransformMetricsResponse * Add tests for createMetricsQuery * Refactor, detect metric queries and throw error for search queries * Remove unneeded types * Don't add start and end if they are 0. Return on err * lint * Fix pre-alloc of frames * Address PR comments
This commit is contained in:
parent
6d97d170f6
commit
bf74b9c9c0
@ -109,8 +109,11 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
||||
}
|
||||
|
||||
func (s *Service) query(ctx context.Context, pCtx backend.PluginContext, query backend.DataQuery) (*backend.DataResponse, error) {
|
||||
if query.QueryType == string(dataquery.TempoQueryTypeTraceId) {
|
||||
switch query.QueryType {
|
||||
case string(dataquery.TempoQueryTypeTraceId):
|
||||
return s.getTrace(ctx, pCtx, query)
|
||||
case string(dataquery.TempoQueryTypeTraceql):
|
||||
return s.runTraceQlQuery(ctx, pCtx, query)
|
||||
}
|
||||
return nil, fmt.Errorf("unsupported query type: '%s' for query with refID '%s'", query.QueryType, query.RefID)
|
||||
}
|
||||
|
@ -62,8 +62,7 @@ func TraceToFrame(resourceSpans []*tracev11.ResourceSpans) (*data.Frame, error)
|
||||
data.NewField("tags", nil, []json.RawMessage{}),
|
||||
},
|
||||
Meta: &data.FrameMeta{
|
||||
// TODO: use constant once available in the SDK
|
||||
PreferredVisualization: "trace",
|
||||
PreferredVisualization: data.VisTypeTrace,
|
||||
},
|
||||
}
|
||||
|
||||
|
76
pkg/tsdb/tempo/traceql/metrics.go
Normal file
76
pkg/tsdb/tempo/traceql/metrics.go
Normal file
@ -0,0 +1,76 @@
|
||||
package traceql
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/tempo/pkg/tempopb"
|
||||
v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
|
||||
)
|
||||
|
||||
func TransformMetricsResponse(resp tempopb.QueryRangeResponse) []*data.Frame {
|
||||
// prealloc frames
|
||||
frames := make([]*data.Frame, len(resp.Series))
|
||||
for i, series := range resp.Series {
|
||||
labels := make(data.Labels)
|
||||
for _, label := range series.Labels {
|
||||
labels[label.GetKey()] = metricsValueToString(label.GetValue())
|
||||
}
|
||||
|
||||
name := ""
|
||||
if len(series.Labels) > 0 {
|
||||
if len(series.Labels) == 1 {
|
||||
name = metricsValueToString(series.Labels[0].GetValue())
|
||||
} else {
|
||||
var labelStrings []string
|
||||
for key, val := range labels {
|
||||
labelStrings = append(labelStrings, fmt.Sprintf("%s=%s", key, val))
|
||||
}
|
||||
name = fmt.Sprintf("{%s}", strings.Join(labelStrings, ", "))
|
||||
}
|
||||
}
|
||||
|
||||
valueField := data.NewField(name, labels, []float64{})
|
||||
valueField.Config = &data.FieldConfig{
|
||||
DisplayName: name,
|
||||
}
|
||||
|
||||
timeField := data.NewField("time", nil, []time.Time{})
|
||||
|
||||
frame := &data.Frame{
|
||||
RefID: name,
|
||||
Name: "Trace",
|
||||
Fields: []*data.Field{
|
||||
timeField,
|
||||
valueField,
|
||||
},
|
||||
Meta: &data.FrameMeta{
|
||||
PreferredVisualization: data.VisTypeGraph,
|
||||
},
|
||||
}
|
||||
|
||||
for _, sample := range series.Samples {
|
||||
frame.AppendRow(time.UnixMilli(sample.GetTimestampMs()), sample.GetValue())
|
||||
}
|
||||
|
||||
frames[i] = frame
|
||||
}
|
||||
return frames
|
||||
}
|
||||
|
||||
func metricsValueToString(value *v1.AnyValue) string {
|
||||
switch value.GetValue().(type) {
|
||||
case *v1.AnyValue_DoubleValue:
|
||||
return strconv.FormatFloat(value.GetDoubleValue(), 'f', -1, 64)
|
||||
case *v1.AnyValue_IntValue:
|
||||
return strconv.FormatInt(value.GetIntValue(), 10)
|
||||
case *v1.AnyValue_StringValue:
|
||||
return fmt.Sprintf("\"%s\"", value.GetStringValue())
|
||||
case *v1.AnyValue_BoolValue:
|
||||
return strconv.FormatBool(value.GetBoolValue())
|
||||
}
|
||||
return ""
|
||||
}
|
112
pkg/tsdb/tempo/traceql/metrics_test.go
Normal file
112
pkg/tsdb/tempo/traceql/metrics_test.go
Normal file
@ -0,0 +1,112 @@
|
||||
package traceql
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/tempo/pkg/tempopb"
|
||||
v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestTransformMetricsResponse_EmptyResponse(t *testing.T) {
|
||||
resp := tempopb.QueryRangeResponse{}
|
||||
frames := TransformMetricsResponse(resp)
|
||||
assert.Empty(t, frames)
|
||||
}
|
||||
|
||||
func TestTransformMetricsResponse_SingleSeriesSingleLabel(t *testing.T) {
|
||||
resp := tempopb.QueryRangeResponse{
|
||||
Series: []*tempopb.TimeSeries{
|
||||
{
|
||||
Labels: []v1.KeyValue{
|
||||
{Key: "label1", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "value1"}}},
|
||||
},
|
||||
Samples: []tempopb.Sample{
|
||||
{TimestampMs: 1638316800000, Value: 1.23},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
frames := TransformMetricsResponse(resp)
|
||||
assert.Len(t, frames, 1)
|
||||
assert.Equal(t, "\"value1\"", frames[0].RefID)
|
||||
assert.Equal(t, "Trace", frames[0].Name)
|
||||
assert.Len(t, frames[0].Fields, 2)
|
||||
assert.Equal(t, "time", frames[0].Fields[0].Name)
|
||||
assert.Equal(t, "\"value1\"", frames[0].Fields[1].Name)
|
||||
assert.Equal(t, data.VisTypeGraph, frames[0].Meta.PreferredVisualization)
|
||||
assert.Equal(t, time.UnixMilli(1638316800000), frames[0].Fields[0].At(0))
|
||||
assert.Equal(t, 1.23, frames[0].Fields[1].At(0))
|
||||
}
|
||||
|
||||
func TestTransformMetricsResponse_SingleSeriesMultipleLabels(t *testing.T) {
|
||||
resp := tempopb.QueryRangeResponse{
|
||||
Series: []*tempopb.TimeSeries{
|
||||
{
|
||||
Labels: []v1.KeyValue{
|
||||
{Key: "label1", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "value1"}}},
|
||||
{Key: "label2", Value: &v1.AnyValue{Value: &v1.AnyValue_IntValue{IntValue: 123}}},
|
||||
{Key: "label3", Value: &v1.AnyValue{Value: &v1.AnyValue_DoubleValue{DoubleValue: 123.456}}},
|
||||
{Key: "label4", Value: &v1.AnyValue{Value: &v1.AnyValue_BoolValue{BoolValue: true}}},
|
||||
},
|
||||
Samples: []tempopb.Sample{
|
||||
{TimestampMs: 1638316800000, Value: 1.23},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
frames := TransformMetricsResponse(resp)
|
||||
assert.Len(t, frames, 1)
|
||||
assert.Equal(t, "{label1=\"value1\", label2=123, label3=123.456, label4=true}", frames[0].RefID)
|
||||
assert.Equal(t, "Trace", frames[0].Name)
|
||||
assert.Len(t, frames[0].Fields, 2)
|
||||
assert.Equal(t, "time", frames[0].Fields[0].Name)
|
||||
assert.Equal(t, "{label1=\"value1\", label2=123, label3=123.456, label4=true}", frames[0].Fields[1].Name)
|
||||
assert.Equal(t, data.VisTypeGraph, frames[0].Meta.PreferredVisualization)
|
||||
assert.Equal(t, time.UnixMilli(1638316800000), frames[0].Fields[0].At(0))
|
||||
assert.Equal(t, 1.23, frames[0].Fields[1].At(0))
|
||||
}
|
||||
|
||||
func TestTransformMetricsResponse_MultipleSeries(t *testing.T) {
|
||||
resp := tempopb.QueryRangeResponse{
|
||||
Series: []*tempopb.TimeSeries{
|
||||
{
|
||||
Labels: []v1.KeyValue{
|
||||
{Key: "label1", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "value1"}}},
|
||||
},
|
||||
Samples: []tempopb.Sample{
|
||||
{TimestampMs: 1638316800000, Value: 1.23},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []v1.KeyValue{
|
||||
{Key: "label2", Value: &v1.AnyValue{Value: &v1.AnyValue_IntValue{IntValue: 456}}},
|
||||
},
|
||||
Samples: []tempopb.Sample{
|
||||
{TimestampMs: 1638316800000, Value: 4.56},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
frames := TransformMetricsResponse(resp)
|
||||
assert.Len(t, frames, 2)
|
||||
assert.Equal(t, "\"value1\"", frames[0].RefID)
|
||||
assert.Equal(t, "Trace", frames[0].Name)
|
||||
assert.Len(t, frames[0].Fields, 2)
|
||||
assert.Equal(t, "time", frames[0].Fields[0].Name)
|
||||
assert.Equal(t, "\"value1\"", frames[0].Fields[1].Name)
|
||||
assert.Equal(t, data.VisTypeGraph, frames[0].Meta.PreferredVisualization)
|
||||
assert.Equal(t, time.UnixMilli(1638316800000), frames[0].Fields[0].At(0))
|
||||
assert.Equal(t, 1.23, frames[0].Fields[1].At(0))
|
||||
|
||||
assert.Equal(t, "456", frames[1].RefID)
|
||||
assert.Equal(t, "Trace", frames[1].Name)
|
||||
assert.Len(t, frames[1].Fields, 2)
|
||||
assert.Equal(t, "time", frames[1].Fields[0].Name)
|
||||
assert.Equal(t, "456", frames[1].Fields[1].Name)
|
||||
assert.Equal(t, data.VisTypeGraph, frames[1].Meta.PreferredVisualization)
|
||||
assert.Equal(t, time.UnixMilli(1638316800000), frames[1].Fields[0].At(0))
|
||||
assert.Equal(t, 4.56, frames[1].Fields[1].At(0))
|
||||
}
|
170
pkg/tsdb/tempo/traceql_query.go
Normal file
170
pkg/tsdb/tempo/traceql_query.go
Normal file
@ -0,0 +1,170 @@
|
||||
package tempo
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strconv"
|
||||
|
||||
//nolint:all
|
||||
"github.com/golang/protobuf/jsonpb"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
|
||||
"github.com/grafana/grafana/pkg/tsdb/tempo/kinds/dataquery"
|
||||
"github.com/grafana/grafana/pkg/tsdb/tempo/traceql"
|
||||
"github.com/grafana/tempo/pkg/tempopb"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func (s *Service) runTraceQlQuery(ctx context.Context, pCtx backend.PluginContext, backendQuery backend.DataQuery) (*backend.DataResponse, error) {
|
||||
ctxLogger := s.logger.FromContext(ctx)
|
||||
ctxLogger.Debug("Running TraceQL query", "function", logEntrypoint())
|
||||
|
||||
tempoQuery := &dataquery.TempoQuery{}
|
||||
err := json.Unmarshal(backendQuery.JSON, tempoQuery)
|
||||
if err != nil {
|
||||
ctxLogger.Error("Failed to unmarshall Tempo query model", "error", err, "function", logEntrypoint())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if isMetricsQuery(*tempoQuery.Query) {
|
||||
return s.runTraceQlQueryMetrics(ctx, pCtx, backendQuery, tempoQuery)
|
||||
}
|
||||
|
||||
return s.runTraceQlQuerySearch()
|
||||
}
|
||||
|
||||
func (s *Service) runTraceQlQuerySearch() (*backend.DataResponse, error) {
|
||||
return nil, fmt.Errorf("backend TraceQL search queries are not supported")
|
||||
}
|
||||
|
||||
func (s *Service) runTraceQlQueryMetrics(ctx context.Context, pCtx backend.PluginContext, backendQuery backend.DataQuery, tempoQuery *dataquery.TempoQuery) (*backend.DataResponse, error) {
|
||||
ctxLogger := s.logger.FromContext(ctx)
|
||||
ctxLogger.Debug("Running TraceQL Metrics query", "function", logEntrypoint())
|
||||
|
||||
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.tempo.runTraceQLQuery", trace.WithAttributes(
|
||||
attribute.String("queryType", backendQuery.QueryType),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
result := &backend.DataResponse{}
|
||||
|
||||
dsInfo, err := s.getDSInfo(ctx, pCtx)
|
||||
if err != nil {
|
||||
ctxLogger.Error("Failed to get datasource information", "error", err, "function", logEntrypoint())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if tempoQuery.Query == nil || *tempoQuery.Query == "" {
|
||||
err := fmt.Errorf("query is required")
|
||||
ctxLogger.Error("Failed to validate model query", "error", err, "function", logEntrypoint())
|
||||
return result, err
|
||||
}
|
||||
|
||||
resp, responseBody, err := s.performMetricsQuery(ctx, dsInfo, tempoQuery, backendQuery, span)
|
||||
defer func() {
|
||||
if err := resp.Body.Close(); err != nil {
|
||||
ctxLogger.Error("Failed to close response body", "error", err, "function", logEntrypoint())
|
||||
}
|
||||
}()
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
ctxLogger.Error("Failed to execute TraceQL query", "error", err, "function", logEntrypoint())
|
||||
result.Error = fmt.Errorf("failed to execute TraceQL query: %s Status: %s Body: %s", *tempoQuery.Query, resp.Status, string(responseBody))
|
||||
span.RecordError(result.Error)
|
||||
span.SetStatus(codes.Error, result.Error.Error())
|
||||
return result, nil
|
||||
}
|
||||
|
||||
var queryResponse tempopb.QueryRangeResponse
|
||||
err = jsonpb.Unmarshal(bytes.NewReader(responseBody), &queryResponse)
|
||||
|
||||
if err != nil {
|
||||
ctxLogger.Error("Failed to convert response to type", "error", err, "function", logEntrypoint())
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
return &backend.DataResponse{}, fmt.Errorf("failed to convert response to type: %w", err)
|
||||
}
|
||||
|
||||
frames := traceql.TransformMetricsResponse(queryResponse)
|
||||
|
||||
result.Frames = frames
|
||||
ctxLogger.Debug("Successfully performed TraceQL query", "function", logEntrypoint())
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *Service) performMetricsQuery(ctx context.Context, dsInfo *Datasource, model *dataquery.TempoQuery, query backend.DataQuery, span trace.Span) (*http.Response, []byte, error) {
|
||||
ctxLogger := s.logger.FromContext(ctx)
|
||||
request, err := s.createMetricsQuery(ctx, dsInfo, model, query.TimeRange.From.Unix(), query.TimeRange.To.Unix())
|
||||
|
||||
if err != nil {
|
||||
ctxLogger.Error("Failed to create request", "error", err, "function", logEntrypoint())
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
resp, err := dsInfo.HTTPClient.Do(request)
|
||||
if err != nil {
|
||||
ctxLogger.Error("Failed to send request to Tempo", "error", err, "function", logEntrypoint())
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
return nil, nil, fmt.Errorf("failed get to tempo: %w", err)
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
ctxLogger.Error("Failed to read response body", "error", err, "function", logEntrypoint())
|
||||
return nil, nil, err
|
||||
}
|
||||
return resp, body, nil
|
||||
}
|
||||
|
||||
func (s *Service) createMetricsQuery(ctx context.Context, dsInfo *Datasource, query *dataquery.TempoQuery, start int64, end int64) (*http.Request, error) {
|
||||
ctxLogger := s.logger.FromContext(ctx)
|
||||
|
||||
rawUrl := fmt.Sprintf("%s/api/metrics/query_range", dsInfo.URL)
|
||||
searchUrl, err := url.Parse(rawUrl)
|
||||
if err != nil {
|
||||
ctxLogger.Error("Failed to parse URL", "url", rawUrl, "error", err, "function", logEntrypoint())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
q := searchUrl.Query()
|
||||
q.Set("q", *query.Query)
|
||||
if start > 0 {
|
||||
q.Set("start", strconv.FormatInt(start, 10))
|
||||
}
|
||||
if end > 0 {
|
||||
q.Set("end", strconv.FormatInt(end, 10))
|
||||
}
|
||||
if query.Step != nil {
|
||||
q.Set("step", *query.Step)
|
||||
}
|
||||
|
||||
searchUrl.RawQuery = q.Encode()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", searchUrl.String(), nil)
|
||||
if err != nil {
|
||||
ctxLogger.Error("Failed to create request", "error", err, "function", logEntrypoint())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req.Header.Set("Accept", "application/json")
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func isMetricsQuery(query string) bool {
|
||||
match, _ := regexp.MatchString("\\|\\s*(rate|count_over_time|avg_over_time|max_over_time|min_over_time|quantile_over_time|histogram_over_time|compare)\\s*\\(", query)
|
||||
return match
|
||||
}
|
133
pkg/tsdb/tempo/traceql_query_test.go
Normal file
133
pkg/tsdb/tempo/traceql_query_test.go
Normal file
@ -0,0 +1,133 @@
|
||||
package tempo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana/pkg/tsdb/tempo/kinds/dataquery"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestCreateMetricsQuery_Success(t *testing.T) {
|
||||
logger := backend.NewLoggerWith("logger", "tsdb.tempo.test")
|
||||
service := &Service{
|
||||
logger: logger,
|
||||
}
|
||||
dsInfo := &Datasource{
|
||||
URL: "http://tempo:3100",
|
||||
}
|
||||
queryVal := "{attribute=\"value\"}"
|
||||
stepVal := "14"
|
||||
query := &dataquery.TempoQuery{
|
||||
Query: &queryVal,
|
||||
Step: &stepVal,
|
||||
}
|
||||
start := int64(1625097600)
|
||||
end := int64(1625184000)
|
||||
|
||||
req, err := service.createMetricsQuery(context.Background(), dsInfo, query, start, end)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, req)
|
||||
assert.Equal(t, "http://tempo:3100/api/metrics/query_range?end=1625184000&q=%7Battribute%3D%22value%22%7D&start=1625097600&step=14", req.URL.String())
|
||||
assert.Equal(t, "application/json", req.Header.Get("Accept"))
|
||||
}
|
||||
func TestCreateMetricsQuery_OnlyQuery(t *testing.T) {
|
||||
logger := backend.NewLoggerWith("logger", "tsdb.tempo.test")
|
||||
service := &Service{
|
||||
logger: logger,
|
||||
}
|
||||
dsInfo := &Datasource{
|
||||
URL: "http://tempo:3100",
|
||||
}
|
||||
queryVal := "{attribute=\"value\"}"
|
||||
query := &dataquery.TempoQuery{
|
||||
Query: &queryVal,
|
||||
}
|
||||
|
||||
req, err := service.createMetricsQuery(context.Background(), dsInfo, query, 0, 0)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, req)
|
||||
assert.Equal(t, "http://tempo:3100/api/metrics/query_range?q=%7Battribute%3D%22value%22%7D", req.URL.String())
|
||||
assert.Equal(t, "application/json", req.Header.Get("Accept"))
|
||||
}
|
||||
|
||||
func TestCreateMetricsQuery_URLParseError(t *testing.T) {
|
||||
logger := backend.NewLoggerWith("logger", "tsdb.tempo.test")
|
||||
service := &Service{
|
||||
logger: logger,
|
||||
}
|
||||
dsInfo := &Datasource{
|
||||
URL: "http://[::1]:namedport",
|
||||
}
|
||||
queryVal := "{attribute=\"value\"}"
|
||||
query := &dataquery.TempoQuery{
|
||||
Query: &queryVal,
|
||||
}
|
||||
start := int64(1625097600)
|
||||
end := int64(1625184000)
|
||||
|
||||
req, err := service.createMetricsQuery(context.Background(), dsInfo, query, start, end)
|
||||
assert.Error(t, err)
|
||||
assert.Nil(t, req)
|
||||
}
|
||||
|
||||
func TestEmptyQueryString_ReturnsFalse(t *testing.T) {
|
||||
result := isMetricsQuery("")
|
||||
assert.False(t, result)
|
||||
}
|
||||
|
||||
func TestQueryWithoutMetricsFunction_ReturnsFalse(t *testing.T) {
|
||||
result := isMetricsQuery("{.some = \"random query\"} && {} >> {}")
|
||||
assert.False(t, result)
|
||||
}
|
||||
|
||||
func TestQueryWithRateFunction_ReturnsTrue(t *testing.T) {
|
||||
result := isMetricsQuery("{} | rate(foo)")
|
||||
assert.True(t, result)
|
||||
}
|
||||
|
||||
func TestQueryWithAvgOverTimeFunction_ReturnsTrue(t *testing.T) {
|
||||
result := isMetricsQuery("{} | avg_over_time(foo)")
|
||||
assert.True(t, result)
|
||||
}
|
||||
|
||||
func TestQueryWithCountOverTimeFunction_ReturnsTrue(t *testing.T) {
|
||||
result := isMetricsQuery("{} | count_over_time(foo)")
|
||||
assert.True(t, result)
|
||||
}
|
||||
|
||||
func TestQueryWithMaxOverTimeFunction_ReturnsTrue(t *testing.T) {
|
||||
result := isMetricsQuery("{} | max_over_time(foo)")
|
||||
assert.True(t, result)
|
||||
}
|
||||
|
||||
func TestQueryWithMinOverTimeFunction_ReturnsTrue(t *testing.T) {
|
||||
result := isMetricsQuery("{} | min_over_time(foo)")
|
||||
assert.True(t, result)
|
||||
}
|
||||
|
||||
func TestQueryWithQuantileOverTimeFunction_ReturnsTrue(t *testing.T) {
|
||||
result := isMetricsQuery("{} | quantile_over_time(foo)")
|
||||
assert.True(t, result)
|
||||
}
|
||||
|
||||
func TestQueryWithHistogramOverTimeFunction_ReturnsTrue(t *testing.T) {
|
||||
result := isMetricsQuery("{} | histogram_over_time(foo)")
|
||||
assert.True(t, result)
|
||||
}
|
||||
|
||||
func TestQueryWithCompareFunction_ReturnsTrue(t *testing.T) {
|
||||
result := isMetricsQuery("{} | compare(foo)")
|
||||
assert.True(t, result)
|
||||
}
|
||||
|
||||
func TestQueryWithMultipleFunctions_ReturnsTrue(t *testing.T) {
|
||||
result := isMetricsQuery("{} | rate(foo) | avg_over_time(bar)")
|
||||
assert.True(t, result)
|
||||
}
|
||||
|
||||
func TestQueryWithInvalidFunction_ReturnsFalse(t *testing.T) {
|
||||
result := isMetricsQuery("{} | invalid_function(foo)")
|
||||
assert.False(t, result)
|
||||
}
|
@ -51,12 +51,7 @@ import {
|
||||
} from './graphTransform';
|
||||
import TempoLanguageProvider from './language_provider';
|
||||
import { createTableFrameFromMetricsSummaryQuery, emptyResponse, MetricsSummary } from './metricsSummary';
|
||||
import {
|
||||
formatTraceQLMetrics,
|
||||
formatTraceQLResponse,
|
||||
transformFromOTLP as transformFromOTEL,
|
||||
transformTrace,
|
||||
} from './resultTransformer';
|
||||
import { formatTraceQLResponse, transformFromOTLP as transformFromOTEL, transformTrace } from './resultTransformer';
|
||||
import { doTempoChannelStream } from './streaming';
|
||||
import { TempoJsonData, TempoQuery } from './types';
|
||||
import { getErrorMessage, migrateFromSearchToTraceQLSearch } from './utils';
|
||||
@ -354,7 +349,7 @@ export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJson
|
||||
grafana_version: config.buildInfo.version,
|
||||
query: queryValue ?? '',
|
||||
});
|
||||
subQueries.push(this.handleTraceQlMetricsQuery(options, queryValue));
|
||||
subQueries.push(this.handleTraceQlMetricsQuery(options));
|
||||
} else {
|
||||
reportInteraction('grafana_traces_traceql_queried', {
|
||||
datasourceType: 'tempo',
|
||||
@ -593,26 +588,20 @@ export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJson
|
||||
}
|
||||
};
|
||||
|
||||
handleTraceQlMetricsQuery = (
|
||||
options: DataQueryRequest<TempoQuery>,
|
||||
queryValue: string
|
||||
): Observable<DataQueryResponse> => {
|
||||
const requestData = {
|
||||
query: queryValue,
|
||||
start: options.range.from.unix(),
|
||||
end: options.range.to.unix(),
|
||||
step: options.targets[0].step,
|
||||
};
|
||||
|
||||
if (!requestData.step) {
|
||||
delete requestData.step;
|
||||
handleTraceQlMetricsQuery = (options: DataQueryRequest<TempoQuery>): Observable<DataQueryResponse> => {
|
||||
const validTargets = options.targets
|
||||
.filter((t) => t.query)
|
||||
.map(
|
||||
(t): TempoQuery => ({ ...t, query: this.applyVariables(t, options.scopedVars).query, queryType: 'traceql' })
|
||||
);
|
||||
if (!validTargets.length) {
|
||||
return EMPTY;
|
||||
}
|
||||
|
||||
return this._request('/api/metrics/query_range', requestData).pipe(
|
||||
const request = { ...options, targets: validTargets };
|
||||
return super.query(request).pipe(
|
||||
map((response) => {
|
||||
return {
|
||||
data: formatTraceQLMetrics(queryValue, response.data),
|
||||
};
|
||||
return response;
|
||||
}),
|
||||
catchError((err) => {
|
||||
return of({ error: { message: getErrorMessage(err.data.message) }, data: [] });
|
||||
|
@ -15,7 +15,6 @@ import {
|
||||
FieldDTO,
|
||||
FieldType,
|
||||
getDisplayProcessor,
|
||||
Labels,
|
||||
MutableDataFrame,
|
||||
toDataFrame,
|
||||
TraceKeyValuePair,
|
||||
@ -28,15 +27,7 @@ import { getDataSourceSrv } from '@grafana/runtime';
|
||||
|
||||
import { SearchTableType } from './dataquery.gen';
|
||||
import { createGraphFrames } from './graphTransform';
|
||||
import {
|
||||
ProtoValue,
|
||||
Span,
|
||||
SpanAttributes,
|
||||
Spanset,
|
||||
TempoJsonData,
|
||||
TraceqlMetricsResponse,
|
||||
TraceSearchMetadata,
|
||||
} from './types';
|
||||
import { Span, SpanAttributes, Spanset, TempoJsonData, TraceSearchMetadata } from './types';
|
||||
|
||||
function getAttributeValue(value: collectorTypes.opentelemetryProto.common.v1.AnyValue): any {
|
||||
if (value.stringValue) {
|
||||
@ -474,56 +465,6 @@ function transformToTraceData(data: TraceSearchMetadata) {
|
||||
};
|
||||
}
|
||||
|
||||
const metricsValueToString = (value: ProtoValue): string => {
|
||||
if (value.stringValue) {
|
||||
return `"${value.stringValue}"`;
|
||||
}
|
||||
return '' + (value.intValue || value.doubleValue || value.boolValue || '""');
|
||||
};
|
||||
|
||||
export function formatTraceQLMetrics(query: string, data: TraceqlMetricsResponse) {
|
||||
const frames = data.series.map((series, index) => {
|
||||
const labels: Labels = {};
|
||||
series.labels?.forEach((label) => {
|
||||
labels[label.key] = metricsValueToString(label.value);
|
||||
});
|
||||
// If it's a single series, use the query as the displayName fallback
|
||||
let name = data.series.length === 1 ? query : '';
|
||||
if (series.labels) {
|
||||
if (series.labels.length === 1) {
|
||||
// For single label series, use the label value as the displayName to improve readability
|
||||
name = metricsValueToString(series.labels[0].value);
|
||||
} else {
|
||||
// otherwise build a string using the label keys and values
|
||||
name = `{${series.labels.map((label) => `${label.key}=${metricsValueToString(label.value)}`).join(', ')}}`;
|
||||
}
|
||||
}
|
||||
return createDataFrame({
|
||||
refId: name || `A${index}`,
|
||||
fields: [
|
||||
{
|
||||
name: 'time',
|
||||
type: FieldType.time,
|
||||
values: series.samples.map((sample) => parseInt(sample.timestampMs, 10)),
|
||||
},
|
||||
{
|
||||
name: name,
|
||||
labels,
|
||||
type: FieldType.number,
|
||||
values: series.samples.map((sample) => sample.value),
|
||||
config: {
|
||||
displayNameFromDS: name,
|
||||
},
|
||||
},
|
||||
],
|
||||
meta: {
|
||||
preferredVisualisationType: 'graph',
|
||||
},
|
||||
});
|
||||
});
|
||||
return frames;
|
||||
}
|
||||
|
||||
export function formatTraceQLResponse(
|
||||
data: TraceSearchMetadata[],
|
||||
instanceSettings: DataSourceInstanceSettings,
|
||||
|
@ -106,32 +106,3 @@ export type Scope = {
|
||||
name: string;
|
||||
tags: string[];
|
||||
};
|
||||
|
||||
// Maps to QueryRangeResponse of tempopb https://github.com/grafana/tempo/blob/cfda98fc5cb0777963f41e0949b9ad2d24b4b5b8/pkg/tempopb/tempo.proto#L360
|
||||
export type TraceqlMetricsResponse = {
|
||||
series: MetricsSeries[];
|
||||
metrics: SearchMetrics;
|
||||
};
|
||||
|
||||
export type MetricsSeries = {
|
||||
labels: MetricsSeriesLabel[];
|
||||
samples: MetricsSeriesSample[];
|
||||
promLabels: string;
|
||||
};
|
||||
|
||||
export type MetricsSeriesLabel = {
|
||||
key: string;
|
||||
value: ProtoValue;
|
||||
};
|
||||
|
||||
export type ProtoValue = {
|
||||
stringValue?: string;
|
||||
intValue?: string;
|
||||
boolValue?: boolean;
|
||||
doubleValue?: string;
|
||||
};
|
||||
|
||||
export type MetricsSeriesSample = {
|
||||
timestampMs: string;
|
||||
value: number;
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user