mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Tempo: Add toggle for streaming (#88685)
* first implementation * Linting * Fix tests * Trigger CI * Renaming * Linting * Trigger CI * Fix tests * Add live test for streaming * Trigger CI * Chores * Fix broken test * Chores * Improve tests * Linting * Address PR comments * Renaming * Add info alert * Linting * Linting * Check also feature toggle * Fix tests * Added link to streaming config section and updated copy * Require streaming to be specifically disabled --------- Co-authored-by: André Pereira <adrapereira@gmail.com>
This commit is contained in:
parent
fc8a5cf468
commit
13be47f903
@ -23,6 +23,7 @@ import { SecureSocksProxySettings, useStyles2, Divider, Stack } from '@grafana/u
|
||||
|
||||
import { QuerySettings } from './QuerySettings';
|
||||
import { ServiceGraphSettings } from './ServiceGraphSettings';
|
||||
import { StreamingSection } from './StreamingSection';
|
||||
import { TraceQLSearchSettings } from './TraceQLSearchSettings';
|
||||
|
||||
export type Props = DataSourcePluginOptionsEditorProps;
|
||||
@ -48,8 +49,11 @@ export const ConfigEditor = ({ options, onOptionsChange }: Props) => {
|
||||
onChange: onOptionsChange,
|
||||
})}
|
||||
/>
|
||||
|
||||
<Divider spacing={4} />
|
||||
|
||||
<StreamingSection options={options} onOptionsChange={onOptionsChange} />
|
||||
<Divider spacing={4} />
|
||||
|
||||
<TraceToLogsSection options={options} onOptionsChange={onOptionsChange} />
|
||||
<Divider spacing={4} />
|
||||
|
||||
|
@ -0,0 +1,81 @@
|
||||
import { css } from '@emotion/css';
|
||||
import React from 'react';
|
||||
|
||||
import {
|
||||
DataSourceJsonData,
|
||||
DataSourcePluginOptionsEditorProps,
|
||||
GrafanaTheme2,
|
||||
updateDatasourcePluginJsonDataOption,
|
||||
} from '@grafana/data';
|
||||
import { ConfigSection } from '@grafana/experimental';
|
||||
import { InlineFieldRow, InlineField, InlineSwitch, Alert, Stack, useStyles2 } from '@grafana/ui';
|
||||
|
||||
import { FeatureName, featuresToTempoVersion } from '../datasource';
|
||||
|
||||
interface StreamingOptions extends DataSourceJsonData {
|
||||
streamingEnabled?: {
|
||||
search?: boolean;
|
||||
};
|
||||
}
|
||||
interface Props extends DataSourcePluginOptionsEditorProps<StreamingOptions> {}
|
||||
|
||||
export const StreamingSection = ({ options, onOptionsChange }: Props) => {
|
||||
const styles = useStyles2(getStyles);
|
||||
return (
|
||||
<ConfigSection
|
||||
title="Streaming"
|
||||
isCollapsible={false}
|
||||
description={
|
||||
<Stack gap={0.5}>
|
||||
<div>{`Enable streaming for different Tempo features.
|
||||
Currently supported only for search queries and from Tempo version ${featuresToTempoVersion[FeatureName.streaming]} onwards.`}</div>
|
||||
<a
|
||||
href={'https://grafana.com/docs/tempo/latest/traceql/#stream-query-results'}
|
||||
target={'_blank'}
|
||||
rel="noreferrer"
|
||||
className={styles.a}
|
||||
>
|
||||
Learn more
|
||||
</a>
|
||||
</Stack>
|
||||
}
|
||||
>
|
||||
<Alert severity="info" title="Streaming and self-managed Tempo instances">
|
||||
If your Tempo instance is behind a load balancer or proxy that does not supporting gRPC or HTTP2, streaming will
|
||||
probably not work and should be disabled.
|
||||
</Alert>
|
||||
<InlineFieldRow>
|
||||
<InlineField
|
||||
tooltip={`Enable streaming for search queries. Minimum required version for Tempo: ${featuresToTempoVersion[FeatureName.streaming]}.`}
|
||||
label="Queries"
|
||||
labelWidth={26}
|
||||
>
|
||||
<InlineSwitch
|
||||
id={'streamingEnabled.search'}
|
||||
// TECHDEBT: We should check whether the feature is supported by the Tempo version,
|
||||
// but here we don't have easily access to such information
|
||||
value={options.jsonData.streamingEnabled?.search || false}
|
||||
onChange={(event: React.SyntheticEvent<HTMLInputElement>) => {
|
||||
updateDatasourcePluginJsonDataOption({ onOptionsChange, options }, 'streamingEnabled', {
|
||||
...options.jsonData.streamingEnabled,
|
||||
search: event.currentTarget.checked,
|
||||
});
|
||||
}}
|
||||
/>
|
||||
</InlineField>
|
||||
</InlineFieldRow>
|
||||
</ConfigSection>
|
||||
);
|
||||
};
|
||||
const getStyles = (theme: GrafanaTheme2) => {
|
||||
return {
|
||||
a: css({
|
||||
color: theme.colors.text.link,
|
||||
textDecoration: 'underline',
|
||||
marginLeft: '5px',
|
||||
'&:hover': {
|
||||
textDecoration: 'none',
|
||||
},
|
||||
}),
|
||||
};
|
||||
};
|
@ -75,7 +75,7 @@ describe('Tempo data source', () => {
|
||||
const range = {
|
||||
from: dateTime(new Date(2022, 8, 13, 16, 0, 0, 0)),
|
||||
to: dateTime(new Date(2022, 8, 13, 16, 15, 0, 0)),
|
||||
raw: { from: '15m', to: 'now' },
|
||||
raw: { from: 'now-15m', to: 'now' },
|
||||
};
|
||||
const traceqlQuery = {
|
||||
targets: [{ refId: 'refid1', queryType: 'traceql', query: '{}' }],
|
||||
@ -365,9 +365,14 @@ describe('Tempo data source', () => {
|
||||
describe('test the testDatasource function', () => {
|
||||
it('should return a success msg if response.ok is true', async () => {
|
||||
mockObservable = () => of({ ok: true });
|
||||
const handleStreamingSearch = jest
|
||||
.spyOn(TempoDatasource.prototype, 'handleStreamingSearch')
|
||||
.mockImplementation(() => of({ data: [] }));
|
||||
|
||||
const ds = new TempoDatasource(defaultSettings);
|
||||
const response = await ds.testDatasource();
|
||||
expect(response.status).toBe('success');
|
||||
expect(handleStreamingSearch).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@ -389,7 +394,7 @@ describe('Tempo data source', () => {
|
||||
const range = {
|
||||
from: dateTime(new Date(2022, 8, 13, 16, 0, 0, 0)),
|
||||
to: dateTime(new Date(2022, 8, 13, 16, 15, 0, 0)),
|
||||
raw: { from: '15m', to: 'now' },
|
||||
raw: { from: 'now-15m', to: 'now' },
|
||||
};
|
||||
|
||||
const request = ds.traceIdQueryRequest(
|
||||
@ -434,7 +439,7 @@ describe('Tempo data source', () => {
|
||||
range: {
|
||||
from: dateTime(new Date(2022, 8, 13, 16, 0, 0, 0)),
|
||||
to: dateTime(new Date(2022, 8, 13, 16, 15, 0, 0)),
|
||||
raw: { from: '15m', to: 'now' },
|
||||
raw: { from: 'now-15m', to: 'now' },
|
||||
},
|
||||
},
|
||||
[{ refId: 'refid1', queryType: 'traceql', query: '' } as TempoQuery]
|
||||
@ -1264,6 +1269,9 @@ export const defaultSettings: DataSourceInstanceSettings<TempoJsonData> = {
|
||||
nodeGraph: {
|
||||
enabled: true,
|
||||
},
|
||||
streamingEnabled: {
|
||||
search: true,
|
||||
},
|
||||
},
|
||||
readOnly: false,
|
||||
};
|
||||
|
@ -1,5 +1,5 @@
|
||||
import { groupBy } from 'lodash';
|
||||
import { EMPTY, from, lastValueFrom, merge, Observable, of } from 'rxjs';
|
||||
import { EMPTY, forkJoin, from, lastValueFrom, merge, Observable, of } from 'rxjs';
|
||||
import { catchError, concatMap, map, mergeMap, toArray } from 'rxjs/operators';
|
||||
import semver from 'semver';
|
||||
|
||||
@ -41,7 +41,7 @@ import {
|
||||
} from './SearchTraceQLEditor/utils';
|
||||
import { TempoVariableQuery, TempoVariableQueryType } from './VariableQueryEditor';
|
||||
import { PrometheusDatasource, PromQuery } from './_importedDependencies/datasources/prometheus/types';
|
||||
import { TraceqlFilter, TraceqlSearchScope } from './dataquery.gen';
|
||||
import { SearchTableType, TraceqlFilter, TraceqlSearchScope } from './dataquery.gen';
|
||||
import {
|
||||
defaultTableFilter,
|
||||
durationMetric,
|
||||
@ -69,7 +69,7 @@ import { TempoVariableSupport } from './variables';
|
||||
export const DEFAULT_LIMIT = 20;
|
||||
export const DEFAULT_SPSS = 3; // spans per span set
|
||||
|
||||
enum FeatureName {
|
||||
export enum FeatureName {
|
||||
streaming = 'streaming',
|
||||
}
|
||||
|
||||
@ -77,7 +77,7 @@ enum FeatureName {
|
||||
** feature available. If the running Tempo instance on the user's backend is older than the
|
||||
** target version, the feature is disabled in Grafana (frontend).
|
||||
*/
|
||||
const featuresToTempoVersion = {
|
||||
export const featuresToTempoVersion = {
|
||||
[FeatureName.streaming]: '2.2.0',
|
||||
};
|
||||
|
||||
@ -115,6 +115,10 @@ export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJson
|
||||
spanBar?: SpanBarOptions;
|
||||
languageProvider: TempoLanguageProvider;
|
||||
|
||||
streamingEnabled?: {
|
||||
search?: boolean;
|
||||
};
|
||||
|
||||
// The version of Tempo running on the backend. `null` if we cannot retrieve it for whatever reason
|
||||
tempoVersion?: string | null;
|
||||
|
||||
@ -129,6 +133,8 @@ export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJson
|
||||
this.search = instanceSettings.jsonData.search;
|
||||
this.nodeGraph = instanceSettings.jsonData.nodeGraph;
|
||||
this.traceQuery = instanceSettings.jsonData.traceQuery;
|
||||
this.streamingEnabled = instanceSettings.jsonData.streamingEnabled;
|
||||
|
||||
this.languageProvider = new TempoLanguageProvider(this);
|
||||
|
||||
if (!this.search?.filters) {
|
||||
@ -259,6 +265,25 @@ export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJson
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if streaming for search queries is enabled (and available).
|
||||
*
|
||||
* We need to check:
|
||||
* - the `traceQLStreaming` feature toggle, to disable streaming if customer support turned off the toggle in the past, which usually means that streaming does not work properly for the customer
|
||||
* - the recently created Tempo data source plugin toggle, to disable streaming if the user disabled it in the data source configuration
|
||||
* - whether streaming is actually available based on the Tempo version, just as a sanity check
|
||||
*
|
||||
* @return true if streaming for search queries is enabled, false otherwise
|
||||
*/
|
||||
isStreamingSearchEnabled() {
|
||||
return (
|
||||
config.featureToggles.traceQLStreaming &&
|
||||
this.isFeatureAvailable(FeatureName.streaming) &&
|
||||
config.liveEnabled &&
|
||||
this.streamingEnabled?.search !== false
|
||||
);
|
||||
}
|
||||
|
||||
query(options: DataQueryRequest<TempoQuery>): Observable<DataQueryResponse> {
|
||||
const subQueries: Array<Observable<DataQueryResponse>> = [];
|
||||
const filteredTargets = options.targets.filter((target) => !target.hide);
|
||||
@ -317,7 +342,7 @@ export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJson
|
||||
app: options.app ?? '',
|
||||
grafana_version: config.buildInfo.version,
|
||||
query: queryValue ?? '',
|
||||
streaming: config.featureToggles.traceQLStreaming,
|
||||
streaming: this.streamingEnabled,
|
||||
});
|
||||
subQueries.push(this.handleTraceQlQuery(options, targets, queryValue));
|
||||
}
|
||||
@ -351,14 +376,10 @@ export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJson
|
||||
app: options.app ?? '',
|
||||
grafana_version: config.buildInfo.version,
|
||||
query: queryValueFromFilters ?? '',
|
||||
streaming: config.featureToggles.traceQLStreaming,
|
||||
streaming: this.streamingEnabled,
|
||||
});
|
||||
|
||||
if (
|
||||
config.featureToggles.traceQLStreaming &&
|
||||
this.isFeatureAvailable(FeatureName.streaming) &&
|
||||
config.liveEnabled
|
||||
) {
|
||||
if (this.isStreamingSearchEnabled()) {
|
||||
subQueries.push(this.handleStreamingSearch(options, traceqlSearchTargets, queryValueFromFilters));
|
||||
} else {
|
||||
subQueries.push(
|
||||
@ -603,11 +624,7 @@ export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJson
|
||||
},
|
||||
queryValue: string
|
||||
): Observable<DataQueryResponse> => {
|
||||
if (
|
||||
config.featureToggles.traceQLStreaming &&
|
||||
this.isFeatureAvailable(FeatureName.streaming) &&
|
||||
config.liveEnabled
|
||||
) {
|
||||
if (this.isStreamingSearchEnabled()) {
|
||||
return this.handleStreamingSearch(options, targets.traceql, queryValue);
|
||||
} else {
|
||||
return this._request('/api/search', {
|
||||
@ -717,24 +734,86 @@ export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJson
|
||||
}
|
||||
|
||||
async testDatasource(): Promise<TestDataSourceResponse> {
|
||||
const observables = [];
|
||||
|
||||
const options: BackendSrvRequest = {
|
||||
headers: {},
|
||||
method: 'GET',
|
||||
url: `${this.instanceSettings.url}/api/echo`,
|
||||
};
|
||||
|
||||
return await lastValueFrom(
|
||||
observables.push(
|
||||
getBackendSrv()
|
||||
.fetch(options)
|
||||
.pipe(
|
||||
mergeMap(() => {
|
||||
return of({ status: 'success', message: 'Data source successfully connected.' });
|
||||
return of({ status: 'success', message: 'Health check succeeded' });
|
||||
}),
|
||||
catchError((err) => {
|
||||
return of({ status: 'error', message: getErrorMessage(err.data.message, 'Unable to connect with Tempo') });
|
||||
return of({
|
||||
status: 'error',
|
||||
message: getErrorMessage(err.data.message, 'Unable to connect with Tempo'),
|
||||
});
|
||||
})
|
||||
)
|
||||
);
|
||||
|
||||
if (this.streamingEnabled?.search) {
|
||||
const now = new Date();
|
||||
const from = new Date(now);
|
||||
from.setMinutes(from.getMinutes() - 15);
|
||||
observables.push(
|
||||
this.handleStreamingSearch(
|
||||
{
|
||||
range: {
|
||||
from: dateTime(from),
|
||||
to: dateTime(now),
|
||||
raw: { from: 'now-15m', to: 'now' },
|
||||
},
|
||||
requestId: '',
|
||||
interval: '',
|
||||
intervalMs: 0,
|
||||
scopedVars: {},
|
||||
targets: [],
|
||||
timezone: '',
|
||||
app: '',
|
||||
startTime: 0,
|
||||
},
|
||||
[
|
||||
{
|
||||
datasource: this.instanceSettings,
|
||||
limit: 1,
|
||||
query: '{}',
|
||||
queryType: 'traceql',
|
||||
refId: 'A',
|
||||
tableType: SearchTableType.Traces,
|
||||
filters: [],
|
||||
},
|
||||
],
|
||||
'{}'
|
||||
).pipe(
|
||||
mergeMap(() => {
|
||||
return of({ status: 'success', message: 'Streaming test succeeded.' });
|
||||
}),
|
||||
catchError((err) => {
|
||||
return of({
|
||||
status: 'error',
|
||||
message: getErrorMessage(err.data.message, 'Test for streaming failed, consider disabling streaming'),
|
||||
});
|
||||
})
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
return await lastValueFrom(
|
||||
forkJoin(observables).pipe(
|
||||
mergeMap((observableResults) => {
|
||||
const erroredResult = observableResults.find((result) => result.status !== 'success');
|
||||
return erroredResult
|
||||
? of(erroredResult)
|
||||
: of({ status: 'success', message: 'Successfully connected to Tempo data source.' });
|
||||
})
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
getQueryDisplayText(query: TempoQuery) {
|
||||
|
@ -20,6 +20,7 @@ import { SearchStreamingState } from './dataquery.gen';
|
||||
import { DEFAULT_SPSS, TempoDatasource } from './datasource';
|
||||
import { formatTraceQLResponse } from './resultTransformer';
|
||||
import { SearchMetrics, TempoJsonData, TempoQuery } from './types';
|
||||
|
||||
function getLiveStreamKey(): string {
|
||||
return uuidv4();
|
||||
}
|
||||
|
@ -21,6 +21,9 @@ export interface TempoJsonData extends DataSourceJsonData {
|
||||
spanStartTimeShift?: string;
|
||||
spanEndTimeShift?: string;
|
||||
};
|
||||
streamingEnabled?: {
|
||||
search?: boolean;
|
||||
};
|
||||
}
|
||||
|
||||
export interface TempoQuery extends TempoBase {
|
||||
|
Loading…
Reference in New Issue
Block a user