Tempo: TraceQL metrics streaming (#99037)

* TraceQL metrics streaming POC

* Reduce duplicate frames by using scan() and combineResponses()

* Trying to remove samples outside of time range

* Remove code to clean out of range

* Metrics streaming config toggle

* Sync opening the search and metrics options

* Fix tests

* Fix issues after conflicts

* Fix tests

* Use absolute value when computing minXDelta

* Revert last commit

* Fix frame sorting

* Remove all duplicates

* Use fields from schema to get the frames

* Use FieldCache

* Address PR comments
Andre Pereira 2025-02-11 11:11:01 +00:00 committed by GitHub
parent cbe5741096
commit d48802cdfb
16 changed files with 453 additions and 77 deletions
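The bullets above describe the core of the change: the streamed response frame is now read by field name through FieldCache, and partial results are merged with scan()/combineResponses() and deduplicated by timestamp before re-sorting. The TypeScript sketch below illustrates that consumption pattern in isolation; it is a minimal sketch, not part of the commit. The helper names (readStreamedMessage, dedupeByTime) are illustrative, and the "keep the latest sample" policy follows the docstring of removeDuplicateTimeFieldValues in the diff below.

import { DataFrame, FieldCache, FieldType, sortDataFrame } from '@grafana/data';

// Read one streamed message by field name instead of positional index.
// The backend frame schema is [result, metrics, state, error] (see createResponseDataFrame below).
function readStreamedMessage(messageFrame: DataFrame) {
  const cache = new FieldCache(messageFrame);
  return {
    result: cache.getFieldByName('result')?.values[0],
    metrics: cache.getFieldByName('metrics')?.values[0],
    state: cache.getFieldByName('state')?.values[0],
    error: cache.getFieldByName('error')?.values[0],
  };
}

// Keep one sample per timestamp (the most recent one) and re-sort by time.
// Illustrative only; mergeFrames/removeDuplicateTimeFieldValues in the diff do the real work.
function dedupeByTime(frame: DataFrame): DataFrame {
  const timeIdx = frame.fields.findIndex((f) => f.type === FieldType.time);
  if (timeIdx < 0) {
    return frame;
  }
  const seen = new Set<number>();
  const keep: number[] = [];
  const times = frame.fields[timeIdx].values;
  for (let i = times.length - 1; i >= 0; i--) {
    if (!seen.has(times[i])) {
      seen.add(times[i]);
      keep.unshift(i);
    }
  }
  frame.fields.forEach((f) => (f.values = keep.map((i) => f.values[i])));
  return sortDataFrame(frame, timeIdx);
}

Reading fields by name, rather than by position as the old values[0..3] access did, lets the backend reorder or extend the frame schema without breaking the frontend, which is why the search streaming path was migrated to FieldCache as well.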

View File

@ -0,0 +1,103 @@
package tempo
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"github.com/grafana/grafana/pkg/tsdb/tempo/traceql"
"google.golang.org/grpc/metadata"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/tracing"
"github.com/grafana/grafana/pkg/tsdb/tempo/kinds/dataquery"
"github.com/grafana/tempo/pkg/tempopb"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)
const MetricsPathPrefix = "metrics/"
func (s *Service) runMetricsStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender, datasource *Datasource) error {
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.tempo.runMetricsStream")
defer span.End()
response := &backend.DataResponse{}
var backendQuery *backend.DataQuery
err := json.Unmarshal(req.Data, &backendQuery)
if err != nil {
response.Error = fmt.Errorf("error unmarshaling backend query model: %v", err)
span.RecordError(response.Error)
span.SetStatus(codes.Error, response.Error.Error())
return err
}
var qrr *tempopb.QueryRangeRequest
err = json.Unmarshal(req.Data, &qrr)
if err != nil {
response.Error = fmt.Errorf("error unmarshaling Tempo query model: %v", err)
span.RecordError(response.Error)
span.SetStatus(codes.Error, response.Error.Error())
return err
}
if qrr.GetQuery() == "" {
return fmt.Errorf("query is empty")
}
qrr.Start = uint64(backendQuery.TimeRange.From.UnixNano())
qrr.End = uint64(backendQuery.TimeRange.To.UnixNano())
	// Set the user agent for the gRPC call. When the data source is decoupled we don't recreate the instance when the
	// Grafana config changes, so we have to read the user agent from the context.
	// Ideally this would be pushed higher so it's set once for all RPC calls, but we only have one call for now.
ctx = metadata.AppendToOutgoingContext(ctx, "User-Agent", backend.UserAgentFromContext(ctx).String())
stream, err := datasource.StreamingClient.MetricsQueryRange(ctx, qrr)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
s.logger.Error("Error Search()", "err", err)
return err
}
return s.processMetricsStream(ctx, qrr.Query, stream, sender)
}
func (s *Service) processMetricsStream(ctx context.Context, query string, stream tempopb.StreamingQuerier_MetricsQueryRangeClient, sender StreamSender) error {
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.tempo.processStream")
defer span.End()
messageCount := 0
for {
msg, err := stream.Recv()
messageCount++
span.SetAttributes(attribute.Int("message_count", messageCount))
if errors.Is(err, io.EOF) {
if err := s.sendResponse(ctx, nil, nil, dataquery.SearchStreamingStateDone, sender); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
break
}
if err != nil {
s.logger.Error("Error receiving message", "err", err)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
transformed := traceql.TransformMetricsResponse(query, *msg)
if err := s.sendResponse(ctx, transformed, msg.Metrics, dataquery.SearchStreamingStateStreaming, sender); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
}
return nil
}

View File

@ -89,7 +89,7 @@ func (s *Service) processStream(ctx context.Context, stream tempopb.StreamingQue
messageCount++
span.SetAttributes(attribute.Int("message_count", messageCount))
if errors.Is(err, io.EOF) {
if err := s.sendResponse(ctx, &ExtendedResponse{
if err := s.sendSearchResponse(ctx, &ExtendedResponse{
State: dataquery.SearchStreamingStateDone,
SearchResponse: &tempopb.SearchResponse{
Metrics: metrics,
@ -114,7 +114,7 @@ func (s *Service) processStream(ctx context.Context, stream tempopb.StreamingQue
traceList = removeDuplicates(traceList)
span.SetAttributes(attribute.Int("traces_count", len(traceList)))
if err := s.sendResponse(ctx, &ExtendedResponse{
if err := s.sendSearchResponse(ctx, &ExtendedResponse{
State: dataquery.SearchStreamingStateStreaming,
SearchResponse: &tempopb.SearchResponse{
Metrics: metrics,
@ -130,34 +130,43 @@ func (s *Service) processStream(ctx context.Context, stream tempopb.StreamingQue
return nil
}
func (s *Service) sendResponse(ctx context.Context, response *ExtendedResponse, sender StreamSender) error {
_, span := tracing.DefaultTracer().Start(ctx, "datasource.tempo.sendResponse")
func (s *Service) sendSearchResponse(ctx context.Context, response *ExtendedResponse, sender StreamSender) error {
_, span := tracing.DefaultTracer().Start(ctx, "datasource.tempo.sendSearchResponse")
defer span.End()
frame := createResponseDataFrame()
if response != nil {
span.SetAttributes(attribute.Int("trace_count", len(response.Traces)), attribute.String("state", string(response.State)))
tracesAsJson, err := json.Marshal(response.Traces)
if err != nil {
return err
}
tracesRawMessage := json.RawMessage(tracesAsJson)
frame.Fields[0].Append(tracesRawMessage)
metricsAsJson, err := json.Marshal(response.Metrics)
if err != nil {
return err
}
metricsRawMessage := json.RawMessage(metricsAsJson)
frame.Fields[1].Append(metricsRawMessage)
frame.Fields[2].Append(string(response.State))
frame.Fields[3].Append("")
return s.sendResponse(ctx, response.Traces, response.Metrics, response.State, sender)
}
return sender.SendFrame(frame, data.IncludeAll)
}
func (s *Service) sendResponse(ctx context.Context, result interface{}, metrics *tempopb.SearchMetrics, state dataquery.SearchStreamingState, sender StreamSender) error {
_, span := tracing.DefaultTracer().Start(ctx, "datasource.tempo.sendResponse")
defer span.End()
frame := createResponseDataFrame()
tracesAsJson, err := json.Marshal(result)
if err != nil {
return err
}
tracesRawMessage := json.RawMessage(tracesAsJson)
frame.Fields[0].Append(tracesRawMessage)
metricsAsJson, err := json.Marshal(metrics)
if err != nil {
return err
}
metricsRawMessage := json.RawMessage(metricsAsJson)
frame.Fields[1].Append(metricsRawMessage)
frame.Fields[2].Append(string(state))
frame.Fields[3].Append("")
return sender.SendFrame(frame, data.IncludeAll)
}
func sendError(searchErr error, sender StreamSender) error {
frame := createResponseDataFrame()
@ -173,7 +182,7 @@ func sendError(searchErr error, sender StreamSender) error {
func createResponseDataFrame() *data.Frame {
frame := data.NewFrame("response")
frame.Fields = append(frame.Fields, data.NewField("traces", nil, []json.RawMessage{}))
frame.Fields = append(frame.Fields, data.NewField("result", nil, []json.RawMessage{}))
frame.Fields = append(frame.Fields, data.NewField("metrics", nil, []json.RawMessage{}))
frame.Fields = append(frame.Fields, data.NewField("state", nil, []string{}))
frame.Fields = append(frame.Fields, data.NewField("error", nil, []string{}))

View File

@ -8,19 +8,22 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
)
func (s *Service) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
func (s *Service) SubscribeStream(_ context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
s.logger.Debug("Allowing access to stream", "path", req.Path, "user", req.PluginContext.User)
status := backend.SubscribeStreamStatusPermissionDenied
if strings.HasPrefix(req.Path, SearchPathPrefix) {
status = backend.SubscribeStreamStatusOK
}
if strings.HasPrefix(req.Path, MetricsPathPrefix) {
status = backend.SubscribeStreamStatusOK
}
return &backend.SubscribeStreamResponse{
Status: status,
}, nil
}
func (s *Service) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
func (s *Service) PublishStream(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
s.logger.Debug("PublishStream called")
// Do not allow publishing at all.
@ -31,9 +34,9 @@ func (s *Service) PublishStream(ctx context.Context, req *backend.PublishStreamR
func (s *Service) RunStream(ctx context.Context, request *backend.RunStreamRequest, sender *backend.StreamSender) error {
s.logger.Debug("New stream call", "path", request.Path)
tempoDatasource, err := s.getDSInfo(ctx, request.PluginContext)
if strings.HasPrefix(request.Path, SearchPathPrefix) {
tempoDatasource, err := s.getDSInfo(ctx, request.PluginContext)
if err != nil {
return err
}
@ -43,6 +46,16 @@ func (s *Service) RunStream(ctx context.Context, request *backend.RunStreamReque
return nil
}
}
if strings.HasPrefix(request.Path, MetricsPathPrefix) {
if err != nil {
return err
}
if err = s.runMetricsStream(ctx, request, sender, tempoDatasource); err != nil {
return sendError(err, sender)
} else {
return nil
}
}
return fmt.Errorf("unknown path %s", request.Path)
}

View File

@ -155,8 +155,10 @@ func (s *Service) performTraceRequest(ctx context.Context, dsInfo *Datasource, a
}
defer func() {
if err := resp.Body.Close(); err != nil {
ctxLogger.Error("Failed to close response body", "error", err, "function", logEntrypoint())
if resp != nil && resp.Body != nil {
if err := resp.Body.Close(); err != nil {
ctxLogger.Error("Failed to close response body", "error", err, "function", logEntrypoint())
}
}
}()

View File

@ -3,6 +3,7 @@ package traceql
import (
"fmt"
"regexp"
"sort"
"strconv"
"strings"
"time"
@ -13,7 +14,7 @@ import (
v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
)
func TransformMetricsResponse(query *dataquery.TempoQuery, resp tempopb.QueryRangeResponse) []*data.Frame {
func TransformMetricsResponse(query string, resp tempopb.QueryRangeResponse) []*data.Frame {
// prealloc frames
frames := make([]*data.Frame, len(resp.Series))
var exemplarFrames []*data.Frame
@ -37,10 +38,11 @@ func TransformMetricsResponse(query *dataquery.TempoQuery, resp tempopb.QueryRan
},
Meta: &data.FrameMeta{
PreferredVisualization: data.VisTypeGraph,
Type: data.FrameTypeTimeSeriesMulti,
},
}
isHistogram := isHistogramQuery(*query.Query)
isHistogram := isHistogramQuery(query)
if isHistogram {
frame.Meta.PreferredVisualizationPluginID = "heatmap"
}
@ -128,10 +130,18 @@ func transformLabelsAndGetName(seriesLabels []v1.KeyValue) (string, data.Labels)
if len(seriesLabels) == 1 {
_, name = metricsValueToString(seriesLabels[0].GetValue())
} else {
var labelStrings []string
for key, val := range labels {
labelStrings = append(labelStrings, fmt.Sprintf("%s=%s", key, val))
keys := make([]string, 0, len(labels))
for k := range labels {
keys = append(keys, k)
}
sort.Strings(keys)
var labelStrings []string
for _, key := range keys {
labelStrings = append(labelStrings, fmt.Sprintf("%s=%s", key, labels[key]))
}
name = fmt.Sprintf("{%s}", strings.Join(labelStrings, ", "))
}
}

View File

@ -13,9 +13,7 @@ import (
func TestTransformMetricsResponse_EmptyResponse(t *testing.T) {
resp := tempopb.QueryRangeResponse{}
queryStr := ""
query := &dataquery.TempoQuery{Query: &queryStr}
frames := TransformMetricsResponse(query, resp)
frames := TransformMetricsResponse("", resp)
assert.Empty(t, frames)
}
@ -32,9 +30,7 @@ func TestTransformMetricsResponse_SingleSeriesSingleLabel(t *testing.T) {
},
},
}
queryStr := ""
query := &dataquery.TempoQuery{Query: &queryStr}
frames := TransformMetricsResponse(query, resp)
frames := TransformMetricsResponse("", resp)
assert.Len(t, frames, 1)
assert.Equal(t, "value1", frames[0].RefID)
assert.Equal(t, "value1", frames[0].Name)
@ -47,9 +43,6 @@ func TestTransformMetricsResponse_SingleSeriesSingleLabel(t *testing.T) {
}
func TestTransformMetricsResponse_SingleSeriesMultipleLabels(t *testing.T) {
// Skipping for now because this test is broken.
t.Skip()
resp := tempopb.QueryRangeResponse{
Series: []*tempopb.TimeSeries{
{
@ -65,9 +58,7 @@ func TestTransformMetricsResponse_SingleSeriesMultipleLabels(t *testing.T) {
},
},
}
queryStr := ""
query := &dataquery.TempoQuery{Query: &queryStr}
frames := TransformMetricsResponse(query, resp)
frames := TransformMetricsResponse("", resp)
assert.Len(t, frames, 1)
assert.Equal(t, "{label1=\"value1\", label2=123, label3=123.456, label4=true}", frames[0].RefID)
assert.Equal(t, "{label1=\"value1\", label2=123, label3=123.456, label4=true}", frames[0].Name)
@ -100,9 +91,7 @@ func TestTransformMetricsResponse_MultipleSeries(t *testing.T) {
},
},
}
queryStr := ""
query := &dataquery.TempoQuery{Query: &queryStr}
frames := TransformMetricsResponse(query, resp)
frames := TransformMetricsResponse("", resp)
assert.Len(t, frames, 2)
assert.Equal(t, "value1", frames[0].RefID)
assert.Equal(t, "value1", frames[0].Name)

View File

@ -71,8 +71,10 @@ func (s *Service) runTraceQlQueryMetrics(ctx context.Context, pCtx backend.Plugi
resp, responseBody, err := s.performMetricsQuery(ctx, dsInfo, tempoQuery, backendQuery, span)
defer func() {
if err := resp.Body.Close(); err != nil {
ctxLogger.Error("Failed to close response body", "error", err, "function", logEntrypoint())
if resp != nil && resp.Body != nil {
if err := resp.Body.Close(); err != nil {
ctxLogger.Error("Failed to close response body", "error", err, "function", logEntrypoint())
}
}
}()
if err != nil {
@ -105,7 +107,7 @@ func (s *Service) runTraceQlQueryMetrics(ctx context.Context, pCtx backend.Plugi
return res, err
}
frames := traceql.TransformMetricsResponse(tempoQuery, queryResponse)
frames := traceql.TransformMetricsResponse(*tempoQuery.Query, queryResponse)
result.Frames = frames
}

View File

@ -65,6 +65,7 @@ describe('TraceQLSearch', () => {
},
} as TempoDatasource;
datasource.isStreamingSearchEnabled = () => false;
datasource.isStreamingMetricsEnabled = () => false;
const lp = new TempoLanguageProvider(datasource);
lp.getIntrinsics = () => ['duration'];
lp.generateQueryFromFilters = () => '{}';
@ -221,6 +222,8 @@ describe('TraceQLSearch', () => {
},
} as TempoDatasource;
datasource.isStreamingSearchEnabled = () => false;
datasource.isStreamingMetricsEnabled = () => false;
const lp = new TempoLanguageProvider(datasource);
lp.getIntrinsics = () => ['duration'];
lp.generateQueryFromFilters = () => '{}';

View File

@ -274,7 +274,8 @@ const TraceQLSearch = ({ datasource, query, onChange, onClearResults, app, addVa
<TempoQueryBuilderOptions
onChange={onChange}
query={query}
isStreaming={datasource.isStreamingSearchEnabled() ?? false}
searchStreaming={datasource.isStreamingSearchEnabled() ?? false}
metricsStreaming={datasource.isStreamingMetricsEnabled() ?? false}
/>
</div>
{error ? (

View File

@ -13,9 +13,11 @@ export interface Props {
collapsedInfo: string[];
queryStats?: QueryStats | null;
children: React.ReactNode;
onToggle?: (isOpen: boolean) => void;
isOpen?: boolean;
}
export function QueryOptionGroup({ title, children, collapsedInfo, queryStats }: Props) {
export function QueryOptionGroup({ title, children, collapsedInfo, queryStats, onToggle, isOpen: propsIsOpen }: Props) {
const [isOpen, toggleOpen] = useToggle(false);
const styles = useStyles2(getStyles);
@ -24,8 +26,8 @@ export function QueryOptionGroup({ title, children, collapsedInfo, queryStats }:
<Collapse
className={styles.collapse}
collapsible
isOpen={isOpen}
onToggle={toggleOpen}
isOpen={propsIsOpen ?? isOpen}
onToggle={onToggle ?? toggleOpen}
label={
<Stack gap={0}>
<h6 className={styles.title}>{title}</h6>

View File

@ -15,6 +15,7 @@ import { FeatureName, featuresToTempoVersion } from '../datasource';
interface StreamingOptions extends DataSourceJsonData {
streamingEnabled?: {
search?: boolean;
metrics?: boolean;
};
}
interface Props extends DataSourcePluginOptionsEditorProps<StreamingOptions> {}
@ -27,8 +28,7 @@ export const StreamingSection = ({ options, onOptionsChange }: Props) => {
isCollapsible={false}
description={
<Stack gap={0.5}>
<div>{`Enable streaming for different Tempo features.
Currently supported only for search queries and from Tempo version ${featuresToTempoVersion[FeatureName.streaming]} onwards.`}</div>
<div>Enable streaming for different Tempo features.</div>
<a
href={'https://grafana.com/docs/tempo/latest/traceql/#stream-query-results'}
target={'_blank'}
@ -46,8 +46,8 @@ export const StreamingSection = ({ options, onOptionsChange }: Props) => {
</Alert>
<InlineFieldRow>
<InlineField
tooltip={`Enable streaming for search queries. Minimum required version for Tempo: ${featuresToTempoVersion[FeatureName.streaming]}.`}
label="Queries"
tooltip={`Enable streaming for search queries. Minimum required version for Tempo: ${featuresToTempoVersion[FeatureName.searchStreaming]}.`}
label="Search queries"
labelWidth={26}
>
<InlineSwitch
@ -64,6 +64,26 @@ export const StreamingSection = ({ options, onOptionsChange }: Props) => {
/>
</InlineField>
</InlineFieldRow>
<InlineFieldRow>
<InlineField
tooltip={`Enable streaming for metrics queries. Minimum required version for Tempo: ${featuresToTempoVersion[FeatureName.metricsStreaming]}.`}
label="Metrics queries"
labelWidth={26}
>
<InlineSwitch
id={'streamingEnabled.metrics'}
            // TECHDEBT: We should check whether the feature is supported by the Tempo version,
            // but we don't have easy access to that information here
value={options.jsonData.streamingEnabled?.metrics || false}
onChange={(event: React.SyntheticEvent<HTMLInputElement>) => {
updateDatasourcePluginJsonDataOption({ onOptionsChange, options }, 'streamingEnabled', {
...options.jsonData.streamingEnabled,
metrics: event.currentTarget.checked,
});
}}
/>
</InlineField>
</InlineFieldRow>
</ConfigSection>
);
};

View File

@ -58,7 +58,7 @@ import {
transformFromOTLP as transformFromOTEL,
transformTrace,
} from './resultTransformer';
import { doTempoChannelStream } from './streaming';
import { doTempoMetricsStreaming, doTempoSearchStreaming } from './streaming';
import { TempoJsonData, TempoQuery } from './types';
import { getErrorMessage, migrateFromSearchToTraceQLSearch } from './utils';
import { TempoVariableSupport } from './variables';
@ -67,7 +67,8 @@ export const DEFAULT_LIMIT = 20;
export const DEFAULT_SPSS = 3; // spans per span set
export enum FeatureName {
streaming = 'streaming',
searchStreaming = 'searchStreaming',
metricsStreaming = 'metricsStreaming',
}
/* Map, for each feature (e.g., streaming), the minimum Tempo version required to have that
@ -75,7 +76,8 @@ export enum FeatureName {
** target version, the feature is disabled in Grafana (frontend).
*/
export const featuresToTempoVersion = {
[FeatureName.streaming]: '2.2.0',
[FeatureName.searchStreaming]: '2.2.0',
[FeatureName.metricsStreaming]: '2.7.0',
};
// The version that we use as default in case we cannot retrieve it from the backend.
@ -115,6 +117,7 @@ export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJson
streamingEnabled?: {
search?: boolean;
metrics?: boolean;
};
// The version of Tempo running on the backend. `null` if we cannot retrieve it for whatever reason
@ -291,6 +294,18 @@ export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJson
isStreamingSearchEnabled() {
return this.streamingEnabled?.search && config.liveEnabled;
}
/**
* Check if streaming for metrics queries is enabled (and available).
*
* We need to check:
* - the Tempo data source plugin toggle, to disable streaming if the user disabled it in the data source configuration
* - if Grafana Live is enabled
*
* @return true if streaming for metrics queries is enabled, false otherwise
*/
isStreamingMetricsEnabled() {
return this.streamingEnabled?.metrics && config.liveEnabled;
}
isTraceQlMetricsQuery(query: string): boolean {
// Check whether this is a metrics query by checking if it contains a metrics function
@ -355,8 +370,13 @@ export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJson
app: options.app ?? '',
grafana_version: config.buildInfo.version,
query: queryValue ?? '',
streaming: this.isStreamingMetricsEnabled(),
});
subQueries.push(this.handleTraceQlMetricsQuery(options, targets.traceql));
if (this.isStreamingMetricsEnabled()) {
subQueries.push(this.handleMetricsStreamingQuery(options, targets.traceql, queryValue));
} else {
subQueries.push(this.handleTraceQlMetricsQuery(options, targets.traceql));
}
} else {
reportInteraction('grafana_traces_traceql_queried', {
datasourceType: 'tempo',
@ -689,7 +709,7 @@ export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJson
return merge(
...targets.map((target) =>
doTempoChannelStream(
doTempoSearchStreaming(
{ ...target, query },
this, // the datasource
options,
@ -699,6 +719,28 @@ export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJson
);
}
  // This function could probably be simplified to avoid passing both `targets` and `query`,
  // since `query` is built from `targets` at the current call site
handleMetricsStreamingQuery(
options: DataQueryRequest<TempoQuery>,
targets: TempoQuery[],
query: string
): Observable<DataQueryResponse> {
if (query === '') {
return EMPTY;
}
return merge(
...targets.map((target) =>
doTempoMetricsStreaming(
{ ...target, query },
this, // the datasource
options
)
)
);
}
makeTraceIdRequest(options: DataQueryRequest<TempoQuery>, targets: TempoQuery[]): DataQueryRequest<TempoQuery> {
const request = {
...options,

View File

@ -1,31 +1,36 @@
import { capitalize } from 'lodash';
import { map, Observable, takeWhile } from 'rxjs';
import { map, Observable, scan, takeWhile } from 'rxjs';
import { v4 as uuidv4 } from 'uuid';
import {
DataFrame,
dataFrameFromJSON,
DataQueryRequest,
DataQueryResponse,
DataSourceInstanceSettings,
FieldCache,
FieldType,
LiveChannelScope,
LoadingState,
MutableDataFrame,
sortDataFrame,
ThresholdsConfig,
ThresholdsMode,
} from '@grafana/data';
import { cloneQueryResponse, combineResponses } from '@grafana/o11y-ds-frontend';
import { getGrafanaLiveSrv } from '@grafana/runtime';
import { SearchStreamingState } from './dataquery.gen';
import { DEFAULT_SPSS, TempoDatasource } from './datasource';
import { formatTraceQLResponse } from './resultTransformer';
import { SearchMetrics, TempoJsonData, TempoQuery } from './types';
import { stepToNanos } from './utils';
function getLiveStreamKey(): string {
return uuidv4();
}
export function doTempoChannelStream(
export function doTempoSearchStreaming(
query: TempoQuery,
ds: TempoDatasource,
options: DataQueryRequest<TempoQuery>,
@ -67,11 +72,14 @@ export function doTempoChannelStream(
if ('message' in evt && evt?.message) {
const currentTime = performance.now();
const elapsedTime = currentTime - requestTime;
// Schema should be [traces, metrics, state, error]
const traces = evt.message.data.values[0][0];
const metrics = evt.message.data.values[1][0];
const frameState: SearchStreamingState = evt.message.data.values[2][0];
const error = evt.message.data.values[3][0];
const messageFrame = dataFrameFromJSON(evt.message);
const fieldCache = new FieldCache(messageFrame);
const traces = fieldCache.getFieldByName('result')?.values[0];
const metrics = fieldCache.getFieldByName('metrics')?.values[0];
const frameState = fieldCache.getFieldByName('state')?.values[0];
const error = fieldCache.getFieldByName('error')?.values[0];
switch (frameState) {
case SearchStreamingState.Done:
@ -100,6 +108,127 @@ export function doTempoChannelStream(
);
}
export function doTempoMetricsStreaming(
query: TempoQuery,
ds: TempoDatasource,
options: DataQueryRequest<TempoQuery>
): Observable<DataQueryResponse> {
const range = options.range;
const key = getLiveStreamKey();
let state: LoadingState = LoadingState.NotStarted;
const step = stepToNanos(query.step);
return getGrafanaLiveSrv()
.getStream<MutableDataFrame>({
scope: LiveChannelScope.DataSource,
namespace: ds.uid,
path: `metrics/${key}`,
data: {
...query,
step,
timeRange: {
from: range.from.toISOString(),
to: range.to.toISOString(),
},
},
})
.pipe(
takeWhile((evt) => {
if ('message' in evt && evt?.message) {
const frameState: SearchStreamingState = evt.message.data.values[2][0];
if (frameState === SearchStreamingState.Done || frameState === SearchStreamingState.Error) {
return false;
}
}
return true;
}, true),
map((evt) => {
let newResult: DataQueryResponse = { data: [], state: LoadingState.NotStarted };
if ('message' in evt && evt?.message) {
const messageFrame = dataFrameFromJSON(evt.message);
const fieldCache = new FieldCache(messageFrame);
const data = fieldCache.getFieldByName('result')?.values[0];
const frameState = fieldCache.getFieldByName('state')?.values[0];
const error = fieldCache.getFieldByName('error')?.values[0];
switch (frameState) {
case SearchStreamingState.Done:
state = LoadingState.Done;
break;
case SearchStreamingState.Streaming:
state = LoadingState.Streaming;
break;
case SearchStreamingState.Error:
throw new Error(error);
}
newResult = {
data: data?.map(dataFrameFromJSON) ?? [],
state,
};
}
return newResult;
}),
// Merge results on acc
scan((acc, curr) => {
if (!curr) {
return acc;
}
if (!acc) {
return cloneQueryResponse(curr);
}
return mergeFrames(acc, curr);
})
);
}
function mergeFrames(acc: DataQueryResponse, newResult: DataQueryResponse): DataQueryResponse {
const result = combineResponses(cloneQueryResponse(acc), newResult);
// Remove duplicate time field values for all frames
result.data = result.data.map((frame: DataFrame) => {
let newFrame = frame;
const timeFieldIndex = frame.fields.findIndex((f) => f.type === FieldType.time);
if (timeFieldIndex >= 0) {
removeDuplicateTimeFieldValues(frame, timeFieldIndex);
newFrame = sortDataFrame(frame, timeFieldIndex);
}
return newFrame;
});
result.state = newResult.state;
return result;
}
/**
* Remove duplicate time field values from the DataFrame. This is necessary because Tempo sends partial results to Grafana
 * that we append to an existing DataFrame. This can result in duplicate values for the same timestamp, so this function removes
 * the older values and keeps the latest one.
* @param accFrame
* @param timeFieldIndex
*/
function removeDuplicateTimeFieldValues(accFrame: DataFrame, timeFieldIndex: number) {
const duplicatesMap = accFrame.fields[timeFieldIndex].values.reduce((acc: Record<number, number[]>, value, index) => {
if (acc[value]) {
acc[value].push(index);
} else {
acc[value] = [index];
}
return acc;
}, {});
const indexesToRemove = Object.values(duplicatesMap)
.filter((indexes) => indexes.length > 1)
.map((indexes) => indexes.slice(1))
.flat();
accFrame.fields.forEach((field) => {
field.values = field.values.filter((_, index) => !indexesToRemove.includes(index));
});
}
function metricsDataFrame(metrics: SearchMetrics, state: SearchStreamingState, elapsedTime: number) {
const progressThresholds: ThresholdsConfig = {
steps: [

View File

@ -71,7 +71,8 @@ export function QueryEditor(props: Props) {
<TempoQueryBuilderOptions
query={query}
onChange={props.onChange}
isStreaming={props.datasource.isStreamingSearchEnabled() ?? false}
searchStreaming={props.datasource.isStreamingSearchEnabled() ?? false}
metricsStreaming={props.datasource.isStreamingMetricsEnabled() ?? false}
/>
</div>
</>

View File

@ -1,5 +1,6 @@
import { css } from '@emotion/css';
import * as React from 'react';
import { useToggle } from 'react-use';
import { GrafanaTheme2 } from '@grafana/data';
import { EditorField, EditorRow } from '@grafana/plugin-ui';
@ -13,7 +14,8 @@ import { TempoQuery } from '../types';
interface Props {
onChange: (value: TempoQuery) => void;
query: Partial<TempoQuery> & TempoQuery;
isStreaming: boolean;
searchStreaming: boolean;
metricsStreaming: boolean;
}
/**
@ -29,8 +31,9 @@ const parseIntWithFallback = (val: string, fallback: number) => {
return isNaN(parsed) ? fallback : parsed;
};
export const TempoQueryBuilderOptions = React.memo<Props>(({ onChange, query, isStreaming }) => {
export const TempoQueryBuilderOptions = React.memo<Props>(({ onChange, query, searchStreaming, metricsStreaming }) => {
const styles = useStyles2(getStyles);
const [isOpen, toggleOpen] = useToggle(false);
if (!query.hasOwnProperty('limit')) {
query.limit = DEFAULT_LIMIT;
@ -76,19 +79,26 @@ export const TempoQueryBuilderOptions = React.memo<Props>(({ onChange, query, is
`Spans Limit: ${query.spss || DEFAULT_SPSS}`,
`Table Format: ${query.tableType === SearchTableType.Traces ? 'Traces' : 'Spans'}`,
'|',
`Streaming: ${isStreaming ? 'Enabled' : 'Disabled'}`,
`Streaming: ${searchStreaming ? 'Enabled' : 'Disabled'}`,
];
const collapsedMetricsOptions = [
`Step: ${query.step || 'auto'}`,
`Type: ${query.metricsQueryType === MetricsQueryType.Range ? 'Range' : 'Instant'}`,
'|',
`Streaming: ${metricsStreaming ? 'Enabled' : 'Disabled'}`,
// `Exemplars: ${query.exemplars !== undefined ? query.exemplars : 'auto'}`,
];
return (
<EditorRow>
<div className={styles.options}>
<QueryOptionGroup title="Search Options" collapsedInfo={collapsedSearchOptions}>
<QueryOptionGroup
title="Search Options"
collapsedInfo={collapsedSearchOptions}
isOpen={isOpen}
onToggle={toggleOpen}
>
<EditorField label="Limit" tooltip="Maximum number of traces to return.">
<AutoSizeInput
className="width-4"
@ -122,11 +132,16 @@ export const TempoQueryBuilderOptions = React.memo<Props>(({ onChange, query, is
/>
</EditorField>
<EditorField label="Streaming" tooltip={<StreamingTooltip />} tooltipInteractive>
<div>{isStreaming ? 'Enabled' : 'Disabled'}</div>
<div>{searchStreaming ? 'Enabled' : 'Disabled'}</div>
</EditorField>
</QueryOptionGroup>
<QueryOptionGroup title="Metrics Options" collapsedInfo={collapsedMetricsOptions}>
<QueryOptionGroup
title="Metrics Options"
collapsedInfo={collapsedMetricsOptions}
isOpen={isOpen}
onToggle={toggleOpen}
>
<EditorField
label="Step"
tooltip="Defines the step for metric queries. Use duration notation, for example 30s or 1m"
@ -150,6 +165,10 @@ export const TempoQueryBuilderOptions = React.memo<Props>(({ onChange, query, is
onChange={onMetricsQueryTypeChange}
/>
</EditorField>
<EditorField label="Streaming" tooltip={<StreamingTooltip />} tooltipInteractive>
<div>{metricsStreaming ? 'Enabled' : 'Disabled'}</div>
</EditorField>
{/*<EditorField*/}
{/* label="Exemplars"*/}
{/* tooltip="Defines the amount of exemplars to request for metric queries. A value of 0 means no exemplars."*/}

View File

@ -1,4 +1,4 @@
import { DataSourceApi } from '@grafana/data';
import { DataSourceApi, parseDuration } from '@grafana/data';
import { getDataSourceSrv } from '@grafana/runtime';
import { generateId } from './SearchTraceQLEditor/TagsInput';
@ -99,3 +99,34 @@ export const migrateFromSearchToTraceQLSearch = (query: TempoQuery) => {
};
return migratedQuery;
};
export const stepToNanos = (step?: string) => {
if (!step) {
return 0;
}
const match = step.match(/(\d+)(.+)/);
const rawLength = match?.[1];
const unit = match?.[2];
if (rawLength) {
if (unit === 'ns') {
return parseInt(rawLength, 10);
}
if (unit === 'µs') {
return parseInt(rawLength, 10) * 1000;
}
if (unit === 'ms') {
return parseInt(rawLength, 10) * 1000000;
}
const duration = parseDuration(step);
return (
(duration.seconds || 0) * 1000000000 +
(duration.minutes || 0) * 60000000000 +
(duration.hours || 0) * 3600000000000
);
}
return 0;
};