Files
grafana/public/app/plugins/datasource/tempo/streaming.ts
Andre Pereira 8aa5d470ca Tempo: Fix streaming query restart after Grafana server reboot (#77614)
* Fix streaming query restart after Grafana server reboot

* TraceQL Search filter name improvements

* Add flag to enable streaming in tempo docker block

---------

Co-authored-by: Andrej Ocenas <mr.ocenas@gmail.com>
2023-11-06 18:29:59 +02:00

187 lines
4.9 KiB
TypeScript

import { capitalize } from 'lodash';
import { map, Observable, takeWhile } from 'rxjs';
import { v4 as uuidv4 } from 'uuid';
import {
DataFrame,
DataQueryRequest,
DataQueryResponse,
DataSourceInstanceSettings,
FieldType,
LiveChannelScope,
LoadingState,
MutableDataFrame,
ThresholdsConfig,
ThresholdsMode,
} from '@grafana/data';
import { getGrafanaLiveSrv } from '@grafana/runtime';
import { SearchStreamingState } from './dataquery.gen';
import { DEFAULT_SPSS, TempoDatasource } from './datasource';
import { formatTraceQLResponse } from './resultTransformer';
import { SearchMetrics, TempoJsonData, TempoQuery } from './types';
/**
 * Produces the key that is interpolated into the `search/<key>` live channel path.
 *
 * @returns the string produced by a fresh `uuidv4()` call.
 */
function getLiveStreamKey(): string {
  const streamKey = uuidv4();
  return streamKey;
}
/**
 * Runs a Tempo search over a Grafana Live data source channel and exposes the
 * channel events as query responses.
 *
 * The stream is opened on path `search/<key>` in the namespace of the data
 * source uid. Its payload is the query itself plus `SpansPerSpanSet`
 * (`query.spss`, or `DEFAULT_SPSS` when that is null/undefined) and the request
 * range serialized with `toISOString()`.
 *
 * Events that carry a message are read as frames whose columns are, in order,
 * `[traces, metrics, state, error]`; only the first row of each column is used.
 *
 * @param query - query spread into the channel payload; `spss` and `tableType` are read from it.
 * @param ds - data source whose `uid` becomes the channel namespace.
 * @param options - request whose `range` provides the `from` / `to` bounds.
 * @param instanceSettings - forwarded unchanged to `formatTraceQLResponse`.
 * @returns an observable of `{ data, state }`. Events without a message re-emit
 *   the most recently built frames (or an empty list) with the current state.
 *   The first frame reporting `Done` or `Error` is still passed on, after which
 *   no further events are taken.
 * @throws the mapping callback throws `new Error(error)` when a frame reports
 *   `SearchStreamingState.Error`.
 */
export function doTempoChannelStream(
  query: TempoQuery,
  ds: TempoDatasource,
  options: DataQueryRequest<TempoQuery>,
  instanceSettings: DataSourceInstanceSettings<TempoJsonData>
): Observable<DataQueryResponse> {
  const searchWindow = options.range;

  // Mutable state shared by the callbacks below so that events without a
  // message can repeat the last known result.
  let frames: DataFrame[] | undefined = undefined;
  let state: LoadingState = LoadingState.NotStarted;
  const requestTime = performance.now();

  // A search is finished once the backend reports either of these states.
  const isTerminal = (candidate: SearchStreamingState): boolean =>
    candidate === SearchStreamingState.Done || candidate === SearchStreamingState.Error;

  // Snapshot of whatever has been accumulated so far.
  const currentResponse = (): DataQueryResponse => ({
    data: frames || [],
    state,
  });

  const liveSrv = getGrafanaLiveSrv();
  const channelEvents = liveSrv.getStream<MutableDataFrame>({
    scope: LiveChannelScope.DataSource,
    namespace: ds.uid,
    path: `search/${getLiveStreamKey()}`,
    data: {
      ...query,
      SpansPerSpanSet: query.spss ?? DEFAULT_SPSS,
      timeRange: {
        from: searchWindow.from.toISOString(),
        to: searchWindow.to.toISOString(),
      },
    },
  });

  return channelEvents.pipe(
    // Inclusive takeWhile: the terminal frame itself is still forwarded to map().
    takeWhile((evt) => {
      if (!('message' in evt) || !evt.message) {
        return true;
      }
      const reportedState: SearchStreamingState = evt.message.data.values[2][0];
      return !isTerminal(reportedState);
    }, true),
    map((evt) => {
      if (!('message' in evt) || !evt.message) {
        return currentResponse();
      }

      const elapsedTime = performance.now() - requestTime;

      // Schema should be [traces, metrics, state, error]
      const columns = evt.message.data.values;
      const traces = columns[0][0];
      const metrics = columns[1][0];
      const frameState: SearchStreamingState = columns[2][0];
      const error = columns[3][0];

      if (frameState === SearchStreamingState.Error) {
        throw new Error(error);
      }
      // Any other state leaves the previously recorded loading state untouched.
      if (frameState === SearchStreamingState.Done) {
        state = LoadingState.Done;
      } else if (frameState === SearchStreamingState.Streaming) {
        state = LoadingState.Streaming;
      }

      const progressFrame = metricsDataFrame(metrics, frameState, elapsedTime);
      const traceFrames = formatTraceQLResponse(traces, instanceSettings, query.tableType);
      frames = [progressFrame, ...traceFrames];

      return currentResponse();
    })
  );
}
/**
 * Builds the single-row "Streaming Progress" frame that accompanies search results.
 *
 * The frame holds six fields, one value each: `state` (capitalized string form
 * of the streaming state), `elapsedTime` (unit `ms`), `totalBlocks`,
 * `completedJobs`, `totalJobs`, and `progress` (unit `percent`, 0-100, rendered
 * as a gradient gauge cell with blue/green thresholds).
 *
 * @param metrics - search metrics; `totalBlocks`, `completedJobs` and `totalJobs` are read.
 * @param state - streaming state reported with the metrics.
 * @param elapsedTime - value stored as-is in the `elapsedTime` field.
 * @returns a data frame with `refId` `streaming-progress` that prefers the table visualisation.
 */
function metricsDataFrame(metrics: SearchMetrics, state: SearchStreamingState, elapsedTime: number) {
  // Gauge colouring: blue from the bottom of the scale, green from 75 upwards.
  const gaugeThresholds: ThresholdsConfig = {
    steps: [
      { color: 'blue', value: -Infinity },
      { color: 'green', value: 75 },
    ],
    mode: ThresholdsMode.Absolute,
  };

  // The three job counters differ only in the metric they expose and their label;
  // the field name is the metric's property name.
  const counterField = (metric: 'totalBlocks' | 'completedJobs' | 'totalJobs', label: string) => ({
    name: metric,
    type: FieldType.number,
    values: [metrics[metric]],
    config: {
      displayNameFromDS: label,
    },
  });

  // Evaluated in place while the field list is built. A finished search always
  // reports 100; otherwise a falsy completed count is treated as 0 and a falsy
  // total as 1 before taking the ratio.
  const progressPercent = (): number => {
    if (state === SearchStreamingState.Done) {
      return 100;
    }
    const finishedJobs = metrics.completedJobs || 0;
    const expectedJobs = metrics.totalJobs || 1;
    return (finishedJobs / expectedJobs) * 100;
  };

  const progressFrame: DataFrame = {
    refId: 'streaming-progress',
    name: 'Streaming Progress',
    length: 1,
    fields: [
      {
        name: 'state',
        type: FieldType.string,
        values: [capitalize(state.toString())],
        config: {
          displayNameFromDS: 'State',
        },
      },
      {
        name: 'elapsedTime',
        type: FieldType.number,
        values: [elapsedTime],
        config: {
          unit: 'ms',
          displayNameFromDS: 'Elapsed Time',
        },
      },
      counterField('totalBlocks', 'Total Blocks'),
      counterField('completedJobs', 'Completed Jobs'),
      counterField('totalJobs', 'Total Jobs'),
      {
        name: 'progress',
        type: FieldType.number,
        values: [progressPercent()],
        config: {
          displayNameFromDS: 'Progress',
          unit: 'percent',
          min: 0,
          max: 100,
          custom: {
            cellOptions: {
              type: 'gauge',
              mode: 'gradient',
            },
          },
          thresholds: gaugeThresholds,
        },
      },
    ],
    meta: {
      preferredVisualisationType: 'table',
    },
  };

  return progressFrame;
}