Tempo: Fix streaming query restart after Grafana server reboot (#77614)

* Fix streaming query restart after Grafana server reboot

* TraceQL Search filter name improvements

* Add flag to enable streaming in tempo docker block

---------

Co-authored-by: Andrej Ocenas <mr.ocenas@gmail.com>
This commit is contained in:
Andre Pereira 2023-11-06 16:29:59 +00:00 committed by GitHub
parent 25779bb6e5
commit 8aa5d470ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 69 additions and 56 deletions

View File

@ -52,3 +52,5 @@ storage:
overrides:
metrics_generator_processors: [local-blocks, service-graphs, span-metrics]
stream_over_http_enabled: true

View File

@ -342,9 +342,9 @@ export const SpanFilters = memo((props: SpanFilterProps) => {
</InlineFieldRow>
<InlineFieldRow>
<InlineField
label="Span Duration"
label="Duration"
labelWidth={16}
tooltip="Filter by span duration. Accepted units are ns, us, ms, s, m, h"
tooltip="Filter by duration. Accepted units are ns, us, ms, s, m, h"
>
<HorizontalGroup spacing="xs" align="flex-start">
<Select

View File

@ -150,7 +150,7 @@ const TraceQLSearch = ({ datasource, query, onChange }: Props) => {
/>
</InlineSearchField>
<InlineSearchField
label={'Duration'}
label={'Span Duration'}
tooltip="The span duration, i.e. end - start time of the span. Accepted units are ns, ms, s, m, h"
>
<HorizontalGroup spacing={'sm'}>

View File

@ -41,6 +41,10 @@ export const filterTitle = (f: TraceqlFilter) => {
if (f.tag === 'name') {
return 'Span Name';
}
// Special case for the resource service name
if (f.tag === 'service.name' && f.scope === TraceqlSearchScope.Resource) {
return 'Service Name';
}
return startCase(filterScopedTag(f));
};

View File

@ -1,5 +1,5 @@
import { capitalize } from 'lodash';
import { map, Observable, defer, mergeMap } from 'rxjs';
import { map, Observable, takeWhile } from 'rxjs';
import { v4 as uuidv4 } from 'uuid';
import {
@ -20,7 +20,7 @@ import { SearchStreamingState } from './dataquery.gen';
import { DEFAULT_SPSS, TempoDatasource } from './datasource';
import { formatTraceQLResponse } from './resultTransformer';
import { SearchMetrics, TempoJsonData, TempoQuery } from './types';
export async function getLiveStreamKey(): Promise<string> {
function getLiveStreamKey(): string {
return uuidv4();
}
@ -34,59 +34,66 @@ export function doTempoChannelStream(
let frames: DataFrame[] | undefined = undefined;
let state: LoadingState = LoadingState.NotStarted;
const requestTime = performance.now();
return defer(() => getLiveStreamKey()).pipe(
mergeMap((key) => {
const requestTime = performance.now();
return getGrafanaLiveSrv()
.getStream<MutableDataFrame>({
scope: LiveChannelScope.DataSource,
namespace: ds.uid,
path: `search/${key}`,
data: {
...query,
SpansPerSpanSet: query.spss ?? DEFAULT_SPSS,
timeRange: {
from: range.from.toISOString(),
to: range.to.toISOString(),
},
},
})
.pipe(
map((evt) => {
if ('message' in evt && evt?.message) {
const currentTime = performance.now();
const elapsedTime = currentTime - requestTime;
// Schema should be [traces, metrics, state, error]
const traces = evt.message.data.values[0][0];
const metrics = evt.message.data.values[1][0];
const frameState: SearchStreamingState = evt.message.data.values[2][0];
const error = evt.message.data.values[3][0];
switch (frameState) {
case SearchStreamingState.Done:
state = LoadingState.Done;
break;
case SearchStreamingState.Streaming:
state = LoadingState.Streaming;
break;
case SearchStreamingState.Error:
throw new Error(error);
}
frames = [
metricsDataFrame(metrics, frameState, elapsedTime),
...formatTraceQLResponse(traces, instanceSettings, query.tableType),
];
}
return {
data: frames || [],
state,
};
})
);
return getGrafanaLiveSrv()
.getStream<MutableDataFrame>({
scope: LiveChannelScope.DataSource,
namespace: ds.uid,
path: `search/${getLiveStreamKey()}`,
data: {
...query,
SpansPerSpanSet: query.spss ?? DEFAULT_SPSS,
timeRange: {
from: range.from.toISOString(),
to: range.to.toISOString(),
},
},
})
);
.pipe(
takeWhile((evt) => {
if ('message' in evt && evt?.message) {
const frameState: SearchStreamingState = evt.message.data.values[2][0];
if (frameState === SearchStreamingState.Done || frameState === SearchStreamingState.Error) {
return false;
}
}
return true;
}, true)
)
.pipe(
map((evt) => {
if ('message' in evt && evt?.message) {
const currentTime = performance.now();
const elapsedTime = currentTime - requestTime;
// Schema should be [traces, metrics, state, error]
const traces = evt.message.data.values[0][0];
const metrics = evt.message.data.values[1][0];
const frameState: SearchStreamingState = evt.message.data.values[2][0];
const error = evt.message.data.values[3][0];
switch (frameState) {
case SearchStreamingState.Done:
state = LoadingState.Done;
break;
case SearchStreamingState.Streaming:
state = LoadingState.Streaming;
break;
case SearchStreamingState.Error:
throw new Error(error);
}
frames = [
metricsDataFrame(metrics, frameState, elapsedTime),
...formatTraceQLResponse(traces, instanceSettings, query.tableType),
];
}
return {
data: frames || [],
state,
};
})
);
}
function metricsDataFrame(metrics: SearchMetrics, state: SearchStreamingState, elapsedTime: number) {