import { groupBy, identity, pick, pickBy, startCase } from 'lodash';
import { EMPTY, from, lastValueFrom, merge, Observable, of, throwError } from 'rxjs';
import { catchError, concatMap, map, mergeMap, toArray } from 'rxjs/operators';
import semver from 'semver';

import {
  CoreApp,
  DataFrame,
  DataFrameDTO,
  DataQueryRequest,
  DataQueryResponse,
  DataQueryResponseData,
  DataSourceApi,
  DataSourceInstanceSettings,
  dateTime,
  FieldType,
  isValidGoDuration,
  LoadingState,
  rangeUtil,
  ScopedVars,
  TestDataSourceResponse,
  urlUtil,
} from '@grafana/data';
import { NodeGraphOptions, SpanBarOptions, TraceToLogsOptions } from '@grafana/o11y-ds-frontend';
import {
  BackendSrvRequest,
  config,
  DataSourceWithBackend,
  getBackendSrv,
  getDataSourceSrv,
  getTemplateSrv,
  reportInteraction,
  TemplateSrv,
} from '@grafana/runtime';
import { BarGaugeDisplayMode, TableCellDisplayMode, VariableFormatID } from '@grafana/schema';

import { generateQueryFromFilters } from './SearchTraceQLEditor/utils';
import { TempoVariableQuery, TempoVariableQueryType } from './VariableQueryEditor';
import { LokiOptions } from './_importedDependencies/datasources/loki/types';
import { PromQuery, PrometheusDatasource } from './_importedDependencies/datasources/prometheus/types';
import { TraceqlFilter, TraceqlSearchScope } from './dataquery.gen';
import {
  defaultTableFilter,
  durationMetric,
  errorRateMetric,
  failedMetric,
  histogramMetric,
  mapPromMetricsToServiceMap,
  rateMetric,
  serviceMapMetrics,
  totalsMetric,
} from './graphTransform';
import TempoLanguageProvider from './language_provider';
import { createTableFrameFromMetricsSummaryQuery, emptyResponse, MetricsSummary } from './metricsSummary';
import {
  createTableFrameFromSearch,
  transformFromOTLP as transformFromOTEL,
  transformTrace,
  transformTraceList,
  formatTraceQLResponse,
} from './resultTransformer';
import { doTempoChannelStream } from './streaming';
import { SearchQueryParams, TempoJsonData, TempoQuery } from './types';
import { getErrorMessage } from './utils';
import { TempoVariableSupport } from './variables';

export const DEFAULT_LIMIT = 20;
export const DEFAULT_SPSS = 3; // spans per span set

enum FeatureName {
  streaming = 'streaming',
}

/* Map, for each feature (e.g., streaming), the minimum Tempo version required to have that
 ** feature available. If the running Tempo instance on the user's backend is older than the
 ** target version, the feature is disabled in Grafana (frontend).
 */
const featuresToTempoVersion = {
  [FeatureName.streaming]: '2.2.0',
};

// The version that we use as default in case we cannot retrieve it from the backend.
// This is the last minor version of Tempo that does not expose the endpoint for build information.
const defaultTempoVersion = '2.1.0';

/** Node and edge frames produced by the service map (node graph) Prometheus query. */
interface ServiceMapQueryResponse {
  nodes: DataFrame;
  edges: DataFrame;
}

/** Service map frames plus the per-span-name rate frames returned by the follow-up rate query. */
interface ServiceMapQueryResponseWithRates {
  rates: Array<DataFrame | DataFrameDTO>;
  nodes: DataFrame;
  edges: DataFrame;
}

/**
 * Frontend data source for Grafana Tempo.
 *
 * Routes each query type (TraceQL, TraceQL search builder, native search, linked Loki search,
 * uploaded JSON, service graph) to the matching backend request and merges the result streams.
 */
export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJsonData> {
  tracesToLogs?: TraceToLogsOptions;
  serviceMap?: {
    datasourceUid?: string;
  };
  search?: {
    hide?: boolean;
    filters?: TraceqlFilter[];
  };
  nodeGraph?: NodeGraphOptions;
  lokiSearch?: {
    datasourceUid?: string;
  };
  traceQuery?: {
    timeShiftEnabled?: boolean;
    spanStartTimeShift?: string;
    spanEndTimeShift?: string;
  };
  uploadedJson?: string | ArrayBuffer | null = null;
  spanBar?: SpanBarOptions;
  languageProvider: TempoLanguageProvider;

  // The version of Tempo running on the backend. `null` if we cannot retrieve it for whatever reason
  tempoVersion?: string | null;

  constructor(
    private instanceSettings: DataSourceInstanceSettings<TempoJsonData>,
    private readonly templateSrv: TemplateSrv = getTemplateSrv()
  ) {
    super(instanceSettings);
    this.tracesToLogs = instanceSettings.jsonData.tracesToLogs;
    this.serviceMap = instanceSettings.jsonData.serviceMap;
    this.search = instanceSettings.jsonData.search;
    this.nodeGraph = instanceSettings.jsonData.nodeGraph;
    this.lokiSearch = instanceSettings.jsonData.lokiSearch;
    this.traceQuery = instanceSettings.jsonData.traceQuery;
    this.languageProvider = new TempoLanguageProvider(this);

    // Provide the default search builder filters when none are configured.
    if (!this.search?.filters) {
      this.search = {
        ...this.search,
        filters: [
          {
            id: 'service-name',
            tag: 'service.name',
            operator: '=',
            scope: TraceqlSearchScope.Resource,
          },
          { id: 'span-name', tag: 'name', operator: '=', scope: TraceqlSearchScope.Span },
        ],
      };
    }

    this.variables = new TempoVariableSupport(this);
  }

  /**
   * Executes a template-variable query (label names or label values).
   *
   * @param query - the variable query; `type` selects which list is returned
   * @returns the options as `{ text }` objects; an empty list when no query type has been selected
   * @throws Error when `query.type` is not a known variable query type
   */
  async executeVariableQuery(query: TempoVariableQuery): Promise<Array<{ text: string }>> {
    // Avoid failing if the user did not select the query type (label names, label values, etc.).
    // Resolve with no options: a `new Promise(() => [])` would never settle and leave the variable loading forever.
    if (query.type === undefined) {
      return [];
    }

    switch (query.type) {
      case TempoVariableQueryType.LabelNames: {
        return await this.labelNamesQuery();
      }
      case TempoVariableQueryType.LabelValues: {
        return this.labelValuesQuery(query.label);
      }
      default: {
        throw Error('Invalid query type: ' + query.type);
      }
    }
  }

  /** Fetches the tag names known to the language provider and returns them as `{ text }` options. */
  async labelNamesQuery(): Promise<Array<{ text: string }>> {
    await this.languageProvider.fetchTags();
    const tags = this.languageProvider.getAutocompleteTags();
    return tags.filter((tag) => tag !== undefined).map((tag) => ({ text: tag }));
  }

  /**
   * Fetches the values of a tag as `{ text }` options.
   *
   * Tries the V2 (scoped) tag values API first and falls back to the V1 API when the tag's scope
   * cannot be determined or the V2 request fails.
   *
   * @param labelName - the tag name, e.g. `http.status_code`; an empty/undefined name yields `[]`
   */
  async labelValuesQuery(labelName?: string): Promise<Array<{ text: string }>> {
    if (!labelName) {
      return [];
    }

    let options;
    try {
      // Retrieve the scope of the tag
      // Example: given `http.status_code`, we want scope `span`
      // Note that we ignore possible name clashes, e.g., `http.status_code` in both `span` and `resource`
      const scope: string | undefined = (this.languageProvider.tagsV2 || [])
        // flatten the Scope objects
        .flatMap((tagV2) => tagV2.tags.map((tag) => ({ scope: tagV2.name, name: tag })))
        // find associated scope
        .find((tag) => tag.name === labelName)?.scope;
      if (!scope) {
        throw Error(`Scope for tag ${labelName} not found`);
      }

      // For V2, we need to send scope and tag name, e.g. `span.http.status_code`,
      // unless the tag has intrinsic scope
      const scopeAndTag = scope === 'intrinsic' ? labelName : `${scope}.${labelName}`;
      options = await this.languageProvider.getOptionsV2(scopeAndTag);
    } catch {
      // For V1, the tag name (e.g. `http.status_code`) is enough
      options = await this.languageProvider.getOptionsV1(labelName);
    }

    return options.filter((option) => option.value !== undefined).map((option) => ({ text: option.value })) as Array<{
      text: string;
    }>;
  }

  /**
   * Fetches the Tempo build information and stores its version in `tempoVersion`.
   * When the request fails the error is logged and `tempoVersion` is set to `null`.
   */
  init = async () => {
    const response = await lastValueFrom(
      this._request('/api/status/buildinfo').pipe(
        catchError((error) => {
          // Optional chaining: not every failure carries a `data` payload.
          console.error('Failure in retrieving build information', error?.data?.message);
          return of({ error, data: { version: null } }); // unknown version
        })
      )
    );
    this.tempoVersion = response.data.version;
  };

  /**
   * Check, for the given feature, whether it is available in Grafana.
   *
   * The check is done based on the version of the Tempo instance running on the backend and
   * the minimum version required by the given feature to work.
   *
   * @param featureName - the name of the feature to consider
   * @return true if the feature is available, false otherwise
   */
  private isFeatureAvailable(featureName: FeatureName) {
    // We know for old Tempo instances we don't know their version, so resort to default
    const actualVersion = this.tempoVersion ?? defaultTempoVersion;

    try {
      return semver.gte(actualVersion, featuresToTempoVersion[featureName]);
    } catch {
      // We assume we are on a development and recent branch, thus we enable all features
      return true;
    }
  }

  /**
   * Runs every visible target, grouped by `queryType` (targets without one are treated as `traceql`),
   * and merges the per-type result streams into a single observable.
   *
   * If any visible target has query type `clear`, an empty, finished response is returned instead.
   */
  query(options: DataQueryRequest<TempoQuery>): Observable<DataQueryResponse> {
    const subQueries: Array<Observable<DataQueryResponse>> = [];
    const filteredTargets = options.targets.filter((target) => !target.hide);
    const targets: { [type: string]: TempoQuery[] } = groupBy(filteredTargets, (t) => t.queryType || 'traceql');

    if (targets.clear) {
      return of({ data: [], state: LoadingState.Done });
    }

    const logsDatasourceUid = this.getLokiSearchDS();

    // Run search queries on linked datasource
    if (logsDatasourceUid && targets.search?.length > 0) {
      reportInteraction('grafana_traces_loki_search_queried', {
        datasourceType: 'tempo',
        app: options.app ?? '',
        grafana_version: config.buildInfo.version,
        hasLinkedQueryExpr:
          targets.search[0].linkedQuery?.expr && targets.search[0].linkedQuery?.expr !== '' ? true : false,
      });

      const dsSrv = getDataSourceSrv();
      subQueries.push(
        from(dsSrv.get(logsDatasourceUid)).pipe(
          mergeMap((linkedDatasource: DataSourceApi) => {
            // Wrap linked query into a data request based on original request
            const linkedRequest: DataQueryRequest = { ...options, targets: targets.search.map((t) => t.linkedQuery!) };
            // Find trace matchers in derived fields of the linked datasource that's identical to this datasource.
            // `instanceSettings` is not part of the DataSourceApi type, hence the cast.
            const settings: DataSourceInstanceSettings<LokiOptions> = (linkedDatasource as any).instanceSettings;
            const traceLinkMatcher: string[] =
              settings.jsonData.derivedFields
                ?.filter((field) => field.datasourceUid === this.uid && field.matcherRegex)
                .map((field) => field.matcherRegex) || [];

            if (!traceLinkMatcher || traceLinkMatcher.length === 0) {
              return throwError(
                () =>
                  new Error(
                    'No Loki datasource configured for search. Set up Derived Fields for traces in a Loki datasource settings and link it to this Tempo datasource.'
                  )
              );
            } else {
              return (linkedDatasource.query(linkedRequest) as Observable<DataQueryResponse>).pipe(
                map((response) =>
                  response.error ? response : transformTraceList(response, this.uid, this.name, traceLinkMatcher)
                )
              );
            }
          })
        )
      );
    }

    if (targets.nativeSearch?.length) {
      try {
        reportInteraction('grafana_traces_search_queried', {
          datasourceType: 'tempo',
          app: options.app ?? '',
          grafana_version: config.buildInfo.version,
          hasServiceName: targets.nativeSearch[0].serviceName ? true : false,
          hasSpanName: targets.nativeSearch[0].spanName ? true : false,
          resultLimit: targets.nativeSearch[0].limit ?? '',
          hasSearch: targets.nativeSearch[0].search ? true : false,
          minDuration: targets.nativeSearch[0].minDuration ?? '',
          maxDuration: targets.nativeSearch[0].maxDuration ?? '',
        });

        const timeRange = { startTime: options.range.from.unix(), endTime: options.range.to.unix() };
        const query = this.applyVariables(targets.nativeSearch[0], options.scopedVars);
        const searchQuery = this.buildSearchQuery(query, timeRange);
        subQueries.push(
          this._request('/api/search', searchQuery).pipe(
            map((response) => {
              return {
                data: [createTableFrameFromSearch(response.data.traces, this.instanceSettings)],
              };
            }),
            catchError((err) => {
              return of({ error: { message: getErrorMessage(err?.data?.message) }, data: [] });
            })
          )
        );
      } catch (error) {
        return of({ error: { message: error instanceof Error ? error.message : 'Unknown error occurred' }, data: [] });
      }
    }

    if (targets.traceql?.length) {
      try {
        const appliedQuery = this.applyVariables(targets.traceql[0], options.scopedVars);
        const queryValue = appliedQuery?.query || '';
        const hexOnlyRegex = /^[0-9A-Fa-f]*$/;
        // Check whether this is a trace ID or traceQL query by checking if it only contains hex characters
        if (queryValue.trim().match(hexOnlyRegex)) {
          // There's only hex characters so let's assume that this is a trace ID
          reportInteraction('grafana_traces_traceID_queried', {
            datasourceType: 'tempo',
            app: options.app ?? '',
            grafana_version: config.buildInfo.version,
            hasQuery: queryValue !== '' ? true : false,
          });

          subQueries.push(this.handleTraceIdQuery(options, targets.traceql));
        } else {
          reportInteraction('grafana_traces_traceql_queried', {
            datasourceType: 'tempo',
            app: options.app ?? '',
            grafana_version: config.buildInfo.version,
            query: queryValue ?? '',
            streaming: config.featureToggles.traceQLStreaming,
          });

          if (config.featureToggles.traceQLStreaming && this.isFeatureAvailable(FeatureName.streaming)) {
            subQueries.push(this.handleStreamingSearch(options, targets.traceql, queryValue));
          } else {
            // limit/spss/tableType come from the TraceQL target itself, not from options.targets[0]
            // (which may be hidden or of another query type).
            subQueries.push(this.handleTraceQlSearch(options, targets.traceql[0], queryValue));
          }
        }
      } catch (error) {
        return of({ error: { message: error instanceof Error ? error.message : 'Unknown error occurred' }, data: [] });
      }
    }

    if (targets.traceqlSearch?.length) {
      try {
        if (config.featureToggles.metricsSummary) {
          const groupByTarget = targets.traceqlSearch.find((t) => this.hasGroupBy(t));
          if (groupByTarget) {
            subQueries.push(
              this.handleMetricsSummary(groupByTarget, generateQueryFromFilters(groupByTarget.filters), options)
            );
          }
        }

        const traceqlSearchTargets = config.featureToggles.metricsSummary
          ? targets.traceqlSearch.filter((t) => !this.hasGroupBy(t))
          : targets.traceqlSearch;
        if (traceqlSearchTargets.length > 0) {
          const queryValueFromFilters = generateQueryFromFilters(traceqlSearchTargets[0].filters);

          // We want to support template variables also in Search for consistency with other data sources
          const queryValue = this.templateSrv.replace(queryValueFromFilters, options.scopedVars);

          reportInteraction('grafana_traces_traceql_search_queried', {
            datasourceType: 'tempo',
            app: options.app ?? '',
            grafana_version: config.buildInfo.version,
            query: queryValue ?? '',
            streaming: config.featureToggles.traceQLStreaming,
          });

          if (config.featureToggles.traceQLStreaming && this.isFeatureAvailable(FeatureName.streaming)) {
            subQueries.push(this.handleStreamingSearch(options, traceqlSearchTargets, queryValue));
          } else {
            // Use the target the query was built from; targets.traceqlSearch[0] may be the group-by
            // target that was filtered out above.
            subQueries.push(this.handleTraceQlSearch(options, traceqlSearchTargets[0], queryValue));
          }
        }
      } catch (error) {
        return of({ error: { message: error instanceof Error ? error.message : 'Unknown error occurred' }, data: [] });
      }
    }

    if (targets.upload?.length) {
      if (this.uploadedJson) {
        reportInteraction('grafana_traces_json_file_uploaded', {
          datasourceType: 'tempo',
          app: options.app ?? '',
          grafana_version: config.buildInfo.version,
        });

        // Malformed JSON is reported as an error response below instead of throwing out of query().
        let jsonData: any = undefined;
        try {
          jsonData = JSON.parse(this.uploadedJson as string);
        } catch {
          jsonData = undefined;
        }
        const isTraceData = jsonData?.batches;
        const isServiceGraphData =
          Array.isArray(jsonData) && jsonData.some((df) => df?.meta?.preferredVisualisationType === 'nodeGraph');

        if (isTraceData) {
          subQueries.push(of(transformFromOTEL(jsonData.batches, this.nodeGraph?.enabled)));
        } else if (isServiceGraphData) {
          subQueries.push(of({ data: jsonData, state: LoadingState.Done }));
        } else {
          subQueries.push(of({ error: { message: 'Unable to parse uploaded data.' }, data: [] }));
        }
      } else {
        subQueries.push(of({ data: [], state: LoadingState.Done }));
      }
    }

    if (this.serviceMap?.datasourceUid && targets.serviceMap?.length > 0) {
      reportInteraction('grafana_traces_service_graph_queried', {
        datasourceType: 'tempo',
        app: options.app ?? '',
        grafana_version: config.buildInfo.version,
        hasServiceMapQuery: targets.serviceMap[0].serviceMapQuery ? true : false,
      });

      const dsId = this.serviceMap.datasourceUid;
      const tempoDsUid = this.uid;
      // Sequential pipeline: service map -> rates -> error rates and durations.
      subQueries.push(
        serviceMapQuery(options, dsId, tempoDsUid).pipe(
          concatMap((result) =>
            rateQuery(options, result, dsId).pipe(
              concatMap((result) => errorAndDurationQuery(options, result, dsId, tempoDsUid))
            )
          )
        )
      );
    }

    return merge(...subQueries);
  }

  /**
   * Runs a non-streaming TraceQL search against `/api/search` and formats the result.
   *
   * @param options - the original request (provides the time range)
   * @param target - the query the search string was built from; supplies `limit`, `spss` and `tableType`
   * @param query - the TraceQL query string to send
   */
  private handleTraceQlSearch(
    options: DataQueryRequest<TempoQuery>,
    target: TempoQuery,
    query: string
  ): Observable<DataQueryResponse> {
    return this._request('/api/search', {
      q: query,
      limit: target.limit ?? DEFAULT_LIMIT,
      spss: target.spss ?? DEFAULT_SPSS,
      start: options.range.from.unix(),
      end: options.range.to.unix(),
    }).pipe(
      map((response) => {
        return {
          data: formatTraceQLResponse(response.data.traces, this.instanceSettings, target.tableType),
        };
      }),
      catchError((err) => {
        return of({ error: { message: getErrorMessage(err?.data?.message) }, data: [] });
      })
    );
  }

  /** Interpolates template variables in `query`; delegates to `applyVariables`. */
  applyTemplateVariables(query: TempoQuery, scopedVars: ScopedVars) {
    return this.applyVariables(query, scopedVars);
  }

  /** Returns copies of `queries` with this datasource's ref set and template variables interpolated. */
  interpolateVariablesInQueries(queries: TempoQuery[], scopedVars: ScopedVars): TempoQuery[] {
    if (!queries || queries.length === 0) {
      return [];
    }

    return queries.map((query) => {
      return {
        ...query,
        datasource: this.getRef(),
        ...this.applyVariables(query, scopedVars),
      };
    });
  }

  /**
   * Returns a copy of `query` with template variables replaced in its string fields and in the
   * linked query expression. The `query` field uses the pipe variable format.
   */
  applyVariables(query: TempoQuery, scopedVars: ScopedVars) {
    const expandedQuery = { ...query };

    if (query.linkedQuery) {
      expandedQuery.linkedQuery = {
        ...query.linkedQuery,
        expr: this.templateSrv.replace(query.linkedQuery?.expr ?? '', scopedVars),
      };
    }

    return {
      ...expandedQuery,
      query: this.templateSrv.replace(query.query ?? '', scopedVars, VariableFormatID.Pipe),
      serviceName: this.templateSrv.replace(query.serviceName ?? '', scopedVars),
      spanName: this.templateSrv.replace(query.spanName ?? '', scopedVars),
      search: this.templateSrv.replace(query.search ?? '', scopedVars),
      minDuration: this.templateSrv.replace(query.minDuration ?? '', scopedVars),
      maxDuration: this.templateSrv.replace(query.maxDuration ?? '', scopedVars),
      serviceMapQuery: Array.isArray(query.serviceMapQuery)
        ? query.serviceMapQuery.map((query) => this.templateSrv.replace(query, scopedVars))
        : this.templateSrv.replace(query.serviceMapQuery ?? '', scopedVars),
    };
  }

  /**
   * Queries `/api/metrics/summary` for `query`, grouped by the target's group-by attributes.
   *
   * Returns an error response (with `emptyResponse` data) for an empty `{}` query, when no summaries
   * are returned, when no summary has series data, or when the request fails.
   */
  handleMetricsSummary = (target: TempoQuery, query: string, options: DataQueryRequest<TempoQuery>) => {
    reportInteraction('grafana_traces_metrics_summary_queried', {
      datasourceType: 'tempo',
      app: options.app ?? '',
      grafana_version: config.buildInfo.version,
      filterCount: target.groupBy?.length ?? 0,
    });

    if (query === '{}') {
      return of({
        error: {
          message:
            'Please ensure you do not have an empty query. This is so filters are applied and the metrics summary is not generated from all spans.',
        },
        data: emptyResponse,
      });
    }

    const groupBy = target.groupBy ? this.formatGroupBy(target.groupBy) : '';
    return this._request('/api/metrics/summary', {
      q: query,
      groupBy,
      start: options.range.from.unix(),
      end: options.range.to.unix(),
    }).pipe(
      map((response) => {
        if (!response.data.summaries) {
          return {
            error: {
              message: getErrorMessage(
                `No summary data for '${groupBy}'. Note: the metrics summary API only considers spans of kind = server. You can check if the attributes exist by running a TraceQL query like { attr_key = attr_value && kind = server }`
              ),
            },
            data: emptyResponse,
          };
        }
        // Check if any of the results have series data as older versions of Tempo placed the series data in a different structure
        const hasSeries = response.data.summaries.some((summary: MetricsSummary) => summary.series.length > 0);
        if (!hasSeries) {
          return {
            error: {
              message: getErrorMessage(`No series data. Ensure you are using an up to date version of Tempo`),
            },
            data: emptyResponse,
          };
        }
        return {
          data: createTableFrameFromMetricsSummaryQuery(response.data.summaries, query, this.instanceSettings),
        };
      }),
      catchError((error) => {
        return of({
          error: { message: getErrorMessage(error?.data?.message) },
          data: emptyResponse,
        });
      })
    );
  };

  /**
   * Formats group-by filters as a comma-separated attribute list: unscoped tags as `.tag`,
   * intrinsic tags as `tag`, everything else as `scope.tag`. Filters without a tag are skipped.
   */
  formatGroupBy = (groupBy: TraceqlFilter[]) => {
    return groupBy
      ?.filter((f) => f.tag)
      .map((f) => {
        if (f.scope === TraceqlSearchScope.Unscoped) {
          return `.${f.tag}`;
        }
        return f.scope !== TraceqlSearchScope.Intrinsic ? `${f.scope}.${f.tag}` : f.tag;
      })
      .join(', ');
  };

  /** Returns the first group-by entry with a tag (truthy), or undefined when there is none. */
  hasGroupBy = (query: TempoQuery) => {
    return query.groupBy?.find((gb) => gb.tag);
  };

  /**
   * Handles the simplest of the queries where we have just a trace id and return trace data for it.
   * Targets with an empty query are dropped; if none remain, the observable completes without emitting.
   * @param options
   * @param targets
   * @private
   */
  handleTraceIdQuery(options: DataQueryRequest<TempoQuery>, targets: TempoQuery[]): Observable<DataQueryResponse> {
    const validTargets = targets
      .filter((t) => t.query)
      .map((t): TempoQuery => ({ ...t, query: t.query?.trim(), queryType: 'traceId' }));
    if (!validTargets.length) {
      return EMPTY;
    }

    const traceRequest = this.traceIdQueryRequest(options, validTargets);

    return super.query(traceRequest).pipe(
      map((response) => {
        if (response.error) {
          return response;
        }
        return transformTrace(response, this.instanceSettings, this.nodeGraph?.enabled);
      })
    );
  }

  /**
   * Builds the backend request for trace ID lookups.
   *
   * With time shift enabled the range is widened by the configured start/end shifts (default `30m`);
   * otherwise the range is set to the epoch. The caller's `options.range` is never mutated.
   */
  traceIdQueryRequest(options: DataQueryRequest<TempoQuery>, targets: TempoQuery[]): DataQueryRequest<TempoQuery> {
    const request = {
      ...options,
      targets,
    };

    if (this.traceQuery?.timeShiftEnabled) {
      // `add`/`subtract` mutate the DateTime in place, so clone first to keep the caller's range intact.
      request.range = options.range && {
        ...options.range,
        from: dateTime(options.range.from).subtract(
          rangeUtil.intervalToMs(this.traceQuery?.spanStartTimeShift || '30m'),
          'milliseconds'
        ),
        to: dateTime(options.range.to).add(
          rangeUtil.intervalToMs(this.traceQuery?.spanEndTimeShift || '30m'),
          'milliseconds'
        ),
      };
    } else {
      request.range = { from: dateTime(0), to: dateTime(0), raw: { from: dateTime(0), to: dateTime(0) } };
    }

    return request;
  }

  // This function can probably be simplified by avoiding passing both `targets` and `query`,
  // since `query` is built from `targets`, if you look at how this function is currently called
  /** Opens one streaming search per target for `query`; completes immediately when `query` is empty. */
  handleStreamingSearch(
    options: DataQueryRequest<TempoQuery>,
    targets: TempoQuery[],
    query: string
  ): Observable<DataQueryResponse> {
    if (query === '') {
      return EMPTY;
    }

    return merge(
      ...targets.map((target) =>
        doTempoChannelStream(
          { ...target, query },
          this, // the datasource
          options,
          this.instanceSettings
        )
      )
    );
  }

  /** Performs a GET against `url` (hidden from the query inspector) and resolves with the last response. */
  async metadataRequest(url: string, params = {}) {
    return await lastValueFrom(this._request(url, params, { method: 'GET', hideFromInspector: true }));
  }

  /** Fetches `apiUrl` relative to the datasource URL, serializing `data` as query-string parameters. */
  private _request(
    apiUrl: string,
    data?: unknown,
    options?: Partial<BackendSrvRequest>
  ): Observable<Record<string, any>> {
    const params = data ? urlUtil.serializeParams(data) : '';
    const url = `${this.instanceSettings.url}${apiUrl}${params.length ? `?${params}` : ''}`;
    const req = { ...options, url };

    return getBackendSrv().fetch(req);
  }

  /** Checks connectivity by calling the Tempo `/api/echo` endpoint. */
  async testDatasource(): Promise<TestDataSourceResponse> {
    const options: BackendSrvRequest = {
      headers: {},
      method: 'GET',
      url: `${this.instanceSettings.url}/api/echo`,
    };

    return await lastValueFrom(
      getBackendSrv()
        .fetch(options)
        .pipe(
          mergeMap(() => {
            return of({ status: 'success', message: 'Data source successfully connected.' });
          }),
          catchError((err) => {
            return of({ status: 'error', message: getErrorMessage(err?.data?.message, 'Unable to connect with Tempo') });
          })
        )
    );
  }

  /** Human-readable summary of a query: key/value pairs for native search, otherwise the raw query string. */
  getQueryDisplayText(query: TempoQuery) {
    if (query.queryType === 'nativeSearch') {
      const result: string[] = [];
      for (const key of ['serviceName', 'spanName', 'search', 'minDuration', 'maxDuration', 'limit']) {
        if (query.hasOwnProperty(key) && query[key as keyof TempoQuery]) {
          result.push(`${startCase(key)}: ${query[key as keyof TempoQuery]}`);
        }
      }
      return result.join(', ');
    }

    return query.query ?? '';
  }

  /**
   * Builds `/api/search` parameters for a native search query.
   *
   * @throws Error when min/max duration is not a valid Go duration or the limit is not a positive integer
   */
  buildSearchQuery(query: TempoQuery, timeRange?: { startTime: number; endTime?: number }): SearchQueryParams {
    let tags = query.search ?? '';

    let tempoQuery = pick(query, ['minDuration', 'maxDuration', 'limit']);
    // Remove empty properties
    tempoQuery = pickBy(tempoQuery, identity);

    if (query.serviceName) {
      tags += ` service.name="${query.serviceName}"`;
    }
    if (query.spanName) {
      tags += ` name="${query.spanName}"`;
    }

    // Set default limit
    if (!tempoQuery.limit) {
      tempoQuery.limit = DEFAULT_LIMIT;
    }

    // Validate query inputs and remove spaces if valid
    if (tempoQuery.minDuration) {
      tempoQuery.minDuration = this.templateSrv.replace(tempoQuery.minDuration ?? '');
      if (!isValidGoDuration(tempoQuery.minDuration)) {
        throw new Error('Please enter a valid min duration.');
      }
      tempoQuery.minDuration = tempoQuery.minDuration.replace(/\s/g, '');
    }
    if (tempoQuery.maxDuration) {
      tempoQuery.maxDuration = this.templateSrv.replace(tempoQuery.maxDuration ?? '');
      if (!isValidGoDuration(tempoQuery.maxDuration)) {
        throw new Error('Please enter a valid max duration.');
      }
      tempoQuery.maxDuration = tempoQuery.maxDuration.replace(/\s/g, '');
    }

    if (!Number.isInteger(tempoQuery.limit) || tempoQuery.limit <= 0) {
      throw new Error('Please enter a valid limit.');
    }

    const searchQuery: SearchQueryParams = { tags, ...tempoQuery };

    if (timeRange) {
      searchQuery.start = timeRange.startTime;
      searchQuery.end = timeRange.endTime;
    }

    return searchQuery;
  }

  // Get linked loki search datasource. Fall back to legacy loki search/trace to logs config
  getLokiSearchDS = (): string | undefined => {
    const legacyLogsDatasourceUid =
      this.tracesToLogs?.lokiSearch !== false && this.lokiSearch === undefined
        ? this.tracesToLogs?.datasourceUid
        : undefined;
    return this.lokiSearch?.datasourceUid ?? legacyLogsDatasourceUid;
  };
}

/** Runs `request` against the Prometheus datasource identified by `datasourceUid`. */
function queryPrometheus(request: DataQueryRequest<PromQuery>, datasourceUid: string) {
  return from(getDataSourceSrv().get(datasourceUid)).pipe(
    mergeMap((ds) => {
      return (ds as PrometheusDatasource).query(request);
    })
  );
}

/**
 * Queries the service graph metrics and converts them into node/edge frames with data links.
 *
 * @throws Error (inside the observable) when any Prometheus response carries an error
 */
function serviceMapQuery(
  request: DataQueryRequest<TempoQuery>,
  datasourceUid: string,
  tempoDatasourceUid: string
): Observable<ServiceMapQueryResponse> {
  const serviceMapRequest = makePromServiceMapRequest(request);

  return queryPrometheus(serviceMapRequest, datasourceUid).pipe(
    // Just collect all the responses first before processing into node graph data
    toArray(),
    map((responses: DataQueryResponse[]) => {
      const errorRes = responses.find((res) => !!res.error);
      if (errorRes) {
        throw new Error(getErrorMessage(errorRes.error?.message));
      }

      const { nodes, edges } = mapPromMetricsToServiceMap(responses, request.range);
      if (nodes.fields.length > 0 && edges.fields.length > 0) {
        const nodeLength = nodes.fields[0].values.length;
        const edgeLength = edges.fields[0].values.length;

        reportInteraction('grafana_traces_service_graph_size', {
          datasourceType: 'tempo',
          grafana_version: config.buildInfo.version,
          nodeLength,
          edgeLength,
        });
      }

      // No handling of multiple targets assume just one. NodeGraph does not support it anyway, but still should be
      // fixed at some point.
      const { serviceMapIncludeNamespace, refId } = request.targets[0];
      nodes.refId = refId;
      edges.refId = refId;

      if (serviceMapIncludeNamespace) {
        nodes.fields[0].config = getFieldConfig(
          datasourceUid, // datasourceUid
          tempoDatasourceUid, // tempoDatasourceUid
          '__data.fields.title', // targetField
          '__data.fields[0]', // tempoField
          undefined, // sourceField
          { targetNamespace: '__data.fields.subtitle' }
        );

        edges.fields[0].config = getFieldConfig(
          datasourceUid, // datasourceUid
          tempoDatasourceUid, // tempoDatasourceUid
          '__data.fields.targetName', // targetField
          '__data.fields.target', // tempoField
          '__data.fields.sourceName', // sourceField
          { targetNamespace: '__data.fields.targetNamespace', sourceNamespace: '__data.fields.sourceNamespace' }
        );
      } else {
        nodes.fields[0].config = getFieldConfig(
          datasourceUid,
          tempoDatasourceUid,
          '__data.fields.id',
          '__data.fields[0]'
        );
        edges.fields[0].config = getFieldConfig(
          datasourceUid,
          tempoDatasourceUid,
          '__data.fields.target',
          '__data.fields.target',
          '__data.fields.source'
        );
      }

      return {
        nodes,
        edges,
        state: LoadingState.Done,
      };
    })
  );
}

/**
 * Queries the per-span-name request rate and attaches it to the service map frames.
 *
 * @throws Error (inside the observable) when any Prometheus response carries an error
 */
function rateQuery(
  request: DataQueryRequest<TempoQuery>,
  serviceMapResponse: ServiceMapQueryResponse,
  datasourceUid: string
): Observable<ServiceMapQueryResponseWithRates> {
  const serviceMapRequest = makePromServiceMapRequest(request);
  serviceMapRequest.targets = makeServiceGraphViewRequest([buildExpr(rateMetric, defaultTableFilter, request)]);

  return queryPrometheus(serviceMapRequest, datasourceUid).pipe(
    toArray(),
    map((responses: DataQueryResponse[]) => {
      const errorRes = responses.find((res) => !!res.error);
      if (errorRes) {
        throw new Error(getErrorMessage(errorRes.error?.message));
      }
      return {
        rates: responses[0]?.data ?? [],
        nodes: serviceMapResponse.nodes,
        edges: serviceMapResponse.edges,
      };
    })
  );
}

// we need the response from the rate query to get the rate span_name(s),
// -> which determine the errorRate/duration span_name(s) we need to query
/**
 * Queries error rate and duration for the span names found in the rate response and returns the
 * service graph table (when it has fields) followed by the node and edge frames.
 */
function errorAndDurationQuery(
  request: DataQueryRequest<TempoQuery>,
  rateResponse: ServiceMapQueryResponseWithRates,
  datasourceUid: string,
  tempoDatasourceUid: string
) {
  const serviceGraphViewMetrics: string[] = [];
  let errorRateBySpanName = '';
  const durationsBySpanName: string[] = [];
  let labels = [];
  if (rateResponse.rates[0] && request.app === CoreApp.Explore) {
    const spanNameField = rateResponse.rates[0].fields.find((field) => field.name === 'span_name');
    if (spanNameField && spanNameField.values) {
      labels = spanNameField.values;
    }
  } else if (rateResponse.rates) {
    rateResponse.rates.forEach((df: DataFrame | DataFrameDTO) => {
      const spanNameLabels = df.fields.find((field) => field.labels?.['span_name']);
      if (spanNameLabels) {
        labels.push(spanNameLabels.labels?.['span_name']);
      }
    });
  }
  const spanNames = getEscapedSpanNames(labels);

  if (spanNames.length > 0) {
    // One error-rate query covering all span names, plus one duration query per span name.
    errorRateBySpanName = buildExpr(errorRateMetric, 'span_name=~"' + spanNames.join('|') + '"', request);
    serviceGraphViewMetrics.push(errorRateBySpanName);
    spanNames.forEach((name: string) => {
      const metric = buildExpr(durationMetric, 'span_name=~"' + name + '"', request);
      durationsBySpanName.push(metric);
      serviceGraphViewMetrics.push(metric);
    });
  }

  const serviceMapRequest = makePromServiceMapRequest(request);
  serviceMapRequest.targets = makeServiceGraphViewRequest(serviceGraphViewMetrics);

  return queryPrometheus(serviceMapRequest, datasourceUid).pipe(
    // Just collect all the responses first before processing into node graph data
    toArray(),
    map((errorAndDurationResponse: DataQueryResponse[]) => {
      const errorRes = errorAndDurationResponse.find((res) => !!res.error);
      if (errorRes) {
        throw new Error(getErrorMessage(errorRes.error?.message));
      }

      const serviceGraphView = getServiceGraphView(
        request,
        rateResponse,
        errorAndDurationResponse[0],
        errorRateBySpanName,
        durationsBySpanName,
        datasourceUid,
        tempoDatasourceUid
      );

      if (serviceGraphView.fields.length === 0) {
        return {
          data: [rateResponse.nodes, rateResponse.edges],
          state: LoadingState.Done,
        };
      }

      return {
        data: [serviceGraphView, rateResponse.nodes, rateResponse.edges],
        state: LoadingState.Done,
      };
    })
  );
}

/** Builds an internal data link that opens `expr` in the given Prometheus datasource (instant or range). */
function makePromLink(title: string, expr: string, datasourceUid: string, instant: boolean) {
  return {
    url: '',
    title,
    internal: {
      query: {
        expr: expr,
        range: !instant,
        exemplar: !instant,
        instant: instant,
      },
      datasourceUid,
      datasourceName: getDataSourceSrv().getInstanceSettings(datasourceUid)?.name ?? '',
    },
  };
}

/**
 * Escapes regex metacharacters in span names. The backslash is doubled so the escape survives
 * being embedded in a double-quoted PromQL string.
 */
export function getEscapedSpanNames(values: string[]) {
  return values.map((value: string) => value.replace(/[.*+?^${}()|[\]\\]/g, '\\\\$&'));
}

/**
 * Builds the field config (data links) for service map nodes/edges: request rate, request histogram
 * and failed request rate links to Prometheus, plus a "View traces" link to Tempo.
 *
 * @param targetField - data path interpolated as the `server` label value
 * @param tempoField - data path used as the service name in the Tempo link
 * @param sourceField - optional data path interpolated as the `client` label value
 * @param namespaceFields - optional data paths for the server/client namespace labels
 */
export function getFieldConfig(
  datasourceUid: string,
  tempoDatasourceUid: string,
  targetField: string,
  tempoField: string,
  sourceField?: string,
  namespaceFields?: { targetNamespace: string; sourceNamespace?: string }
) {
  let source = sourceField ? `client="\${${sourceField}}",` : '';
  let target = `server="\${${targetField}}"`;
  let serverSumBy = 'server';

  if (namespaceFields !== undefined) {
    const { targetNamespace } = namespaceFields;
    target += `,server_service_namespace="\${${targetNamespace}}"`;
    serverSumBy += ', server_service_namespace';

    if (source) {
      const { sourceNamespace } = namespaceFields;
      source += `client_service_namespace="\${${sourceNamespace}}",`;
      serverSumBy += ', client_service_namespace';
    }
  }

  return {
    links: [
      makePromLink(
        'Request rate',
        `sum by (client, ${serverSumBy})(rate(${totalsMetric}{${source}${target}}[$__rate_interval]))`,
        datasourceUid,
        false
      ),
      makePromLink(
        'Request histogram',
        `histogram_quantile(0.9, sum(rate(${histogramMetric}{${source}${target}}[$__rate_interval])) by (le, client, ${serverSumBy}))`,
        datasourceUid,
        false
      ),
      makePromLink(
        'Failed request rate',
        `sum by (client, ${serverSumBy})(rate(${failedMetric}{${source}${target}}[$__rate_interval]))`,
        datasourceUid,
        false
      ),
      makeTempoLink('View traces', `\${${tempoField}}`, '', tempoDatasourceUid),
    ],
  };
}

/**
 * Builds an internal data link to a Tempo search-builder query filtered by service name and/or
 * span name. An empty string skips the corresponding filter.
 */
export function makeTempoLink(title: string, serviceName: string, spanName: string, datasourceUid: string) {
  const query: TempoQuery = { refId: 'A', queryType: 'traceqlSearch', filters: [] };
  if (serviceName !== '') {
    query.filters.push({
      id: 'service-name',
      scope: TraceqlSearchScope.Resource,
      tag: 'service.name',
      value: serviceName,
      operator: '=',
      valueType: 'string',
    });
  }
  if (spanName !== '') {
    query.filters.push({
      id: 'span-name',
      scope: TraceqlSearchScope.Span,
      tag: 'name',
      value: spanName,
      operator: '=',
      valueType: 'string',
    });
  }

  return {
    url: '',
    title,
    internal: {
      query,
      datasourceUid,
      datasourceName: getDataSourceSrv().getInstanceSettings(datasourceUid)?.name ?? '',
    },
  };
}

/**
 * Builds the instant Prometheus table queries for every service map metric, OR-ing one
 * `sum by (client, server[, namespaces]) (rate(...[$__range]))` expression per service map query.
 */
function makePromServiceMapRequest(options: DataQueryRequest<TempoQuery>): DataQueryRequest<PromQuery> {
  // options.targets[0] is not correct here, but not sure what should happen if you have multiple queries for
  // service map at the same time anyway
  const { serviceMapQuery, serviceMapIncludeNamespace } = options.targets[0];
  const extraSumByFields = serviceMapIncludeNamespace ? ', client_service_namespace, server_service_namespace' : '';
  const queries = Array.isArray(serviceMapQuery) ? serviceMapQuery : [serviceMapQuery];

  return {
    ...options,
    targets: serviceMapMetrics.map((metric) => {
      const subExprs = queries.map(
        (query) => `sum by (client, server${extraSumByFields}) (rate(${metric}${query || ''}[$__range]))`
      );
      return {
        format: 'table',
        refId: metric,
        expr: subExprs.join(' OR '),
        instant: true,
      };
    }),
  };
}

/**
 * Assembles the service graph table frame: Name, Rate (+ gauge), Error Rate (+ gauge),
 * Duration (p90) and a Tempo "Links" column. Columns are only added when the matching data exists.
 */
function getServiceGraphView(
  request: DataQueryRequest<TempoQuery>,
  rateResponse: ServiceMapQueryResponseWithRates,
  secondResponse: DataQueryResponse,
  errorRateBySpanName: string,
  durationsBySpanName: string[],
  datasourceUid: string,
  tempoDatasourceUid: string
) {
  const df: any = { fields: [] };

  // The rate frame's refId is the rate expression itself; compute it once rather than per frame.
  const rateRefId = buildExpr(rateMetric, defaultTableFilter, request);
  const rate = rateResponse.rates.filter((x) => {
    return x.refId === rateRefId;
  });
  const errorRate = secondResponse.data.filter((x) => {
    return x.refId === errorRateBySpanName;
  });
  const duration = secondResponse.data.filter((x) => {
    return durationsBySpanName.includes(x.refId ?? '');
  });

  if (rate.length > 0 && rate[0].fields?.length > 2) {
    df.fields.push({
      ...rate[0].fields[1],
      name: 'Name',
      config: {
        filterable: false,
      },
    });

    df.fields.push({
      ...rate[0].fields[2],
      name: 'Rate',
      config: {
        links: [
          makePromLink(
            'Rate',
            buildLinkExpr(buildExpr(rateMetric, 'span_name="${__data.fields[0]}"', request)),
            datasourceUid,
            false
          ),
        ],
        decimals: 2,
      },
    });

    df.fields.push({
      ...rate[0].fields[2],
      name: ' ',
      labels: null,
      config: {
        color: {
          mode: 'continuous-BlPu',
        },
        custom: {
          cellOptions: {
            mode: BarGaugeDisplayMode.Lcd,
            type: TableCellDisplayMode.Gauge,
          },
        },
        decimals: 3,
      },
    });
  }

  if (errorRate.length > 0 && errorRate[0].fields?.length > 2) {
    const errorRateNames = errorRate[0].fields[1]?.values ?? [];
    const errorRateValues = errorRate[0].fields[2]?.values ?? [];
    const errorRateObj: any = {};
    errorRateNames.forEach((name: string, index: number) => {
      errorRateObj[name] = { value: errorRateValues[index] };
    });

    const values = getRateAlignedValues({ ...rate }, errorRateObj);

    df.fields.push({
      ...errorRate[0].fields[2],
      name: 'Error Rate',
      values: values,
      config: {
        links: [
          makePromLink(
            'Error Rate',
            buildLinkExpr(buildExpr(errorRateMetric, 'span_name="${__data.fields[0]}"', request)),
            datasourceUid,
            false
          ),
        ],
        decimals: 2,
      },
    });

    df.fields.push({
      ...errorRate[0].fields[2],
      name: ' ',
      values: values,
      labels: null,
      config: {
        color: {
          mode: 'continuous-RdYlGr',
        },
        custom: {
          cellOptions: {
            mode: BarGaugeDisplayMode.Lcd,
            type: TableCellDisplayMode.Gauge,
          },
        },
        decimals: 3,
      },
    });
  }

  if (duration.length > 0) {
    const durationObj: any = {};
    duration.forEach((d) => {
      if (d.fields.length > 1) {
        // The span name is recovered from the refId, which is the duration query expression.
        const delimiter = d.refId?.includes('span_name=~"') ? 'span_name=~"' : 'span_name="';
        const name = d.refId?.split(delimiter)[1].split('"}')[0];
        durationObj[name!] = { value: d.fields[1].values[0] };
      }
    });
    if (Object.keys(durationObj).length > 0) {
      df.fields.push({
        ...duration[0].fields[1],
        name: 'Duration (p90)',
        values: getRateAlignedValues({ ...rate }, durationObj),
        config: {
          links: [
            makePromLink(
              'Duration',
              buildLinkExpr(buildExpr(durationMetric, 'span_name="${__data.fields[0]}"', request)),
              datasourceUid,
              false
            ),
          ],
          unit: 's',
        },
      });
    }
  }

  if (df.fields.length > 0 && df.fields[0].values) {
    df.fields.push({
      name: 'Links',
      type: FieldType.string,
      values: df.fields[0].values.map(() => {
        return 'Tempo';
      }),
      config: {
        links: [makeTempoLink('Tempo', '', `\${__data.fields[0]}`, tempoDatasourceUid)],
      },
    });
  }

  return df;
}

export function buildExpr(
  metric: { expr: string; params: string[]; topk?: number },
  extraParams: string,
  request: DataQueryRequest<TempoQuery>
): string {
  let serviceMapQuery = request.targets[0]?.serviceMapQuery ?? '';
  const serviceMapQueries = Array.isArray(serviceMapQuery) ? serviceMapQuery : [serviceMapQuery];
  const metricParamsArray = serviceMapQueries.map((query) => {
    // remove surrounding curly braces from serviceMapQuery
    const serviceMapQueryMatch = query.match(/^{(.*)}$/);
    if (serviceMapQueryMatch?.length) {
      query = serviceMapQueryMatch[1];
    }
    // map serviceGraph metric tags to serviceGraphView metric tags
    query = query.replace('client', 'service').replace('server', 'service');
    return query.includes('span_name') ?
metric.params.concat(query) : metric.params .concat(query) .concat(extraParams) .filter((item: string) => item); }); const exprs = metricParamsArray.map((params) => metric.expr.replace('{}', '{' + params.join(',') + '}')); const expr = exprs.join(' OR '); if (metric.topk) { return `topk(${metric.topk}, ${expr})`; } return expr; } export function buildLinkExpr(expr: string) { // don't want top 5 or by span name in links expr = expr.replace('topk(5, ', '').replace(' by (span_name))', ''); return expr.replace('__range', '__rate_interval'); } // query result frames can come back in any order // here we align the table col values to the same row name (rateName) across the table export function getRateAlignedValues( rateResp: DataQueryResponseData[], objToAlign: { [x: string]: { value: string } } ) { const rateNames = rateResp[0]?.fields[1]?.values ?? []; let values: string[] = []; for (let i = 0; i < rateNames.length; i++) { if (Object.keys(objToAlign).includes(rateNames[i])) { values.push(objToAlign[rateNames[i]].value); } else { values.push('0'); } } return values; } export function makeServiceGraphViewRequest(metrics: string[]): PromQuery[] { return metrics.map((metric) => { return { refId: metric, expr: metric, instant: true, }; }); }