grafana/public/app/plugins/datasource/tempo/datasource.ts
Josh Hunt 3c6e0e8ef8
Chore: ESlint import order (#44959)
* Add and configure eslint-plugin-import

* Fix the lint:ts npm command

* Autofix + prettier all the files

* Manually fix remaining files

* Move jquery code in jest-setup to external file to safely reorder imports

* Resolve issue caused by circular dependencies within Prometheus

* Update .betterer.results

* Fix missing // @ts-ignore

* ignore iconBundle.ts

* Fix missing // @ts-ignore
2022-04-22 14:33:13 +01:00

402 lines
13 KiB
TypeScript

import { identity, pick, pickBy, groupBy, startCase } from 'lodash';
import { EMPTY, from, merge, Observable, of, throwError } from 'rxjs';
import { catchError, map, mergeMap, toArray } from 'rxjs/operators';
import {
DataQuery,
DataQueryRequest,
DataQueryResponse,
DataSourceApi,
DataSourceInstanceSettings,
DataSourceJsonData,
isValidGoDuration,
LoadingState,
} from '@grafana/data';
import { config, BackendSrvRequest, DataSourceWithBackend, getBackendSrv } from '@grafana/runtime';
import { NodeGraphOptions } from 'app/core/components/NodeGraphSettings';
import { TraceToLogsOptions } from 'app/core/components/TraceToLogs/TraceToLogsSettings';
import { serializeParams } from 'app/core/utils/fetch';
import { getDatasourceSrv } from 'app/features/plugins/datasource_srv';
import { LokiOptions, LokiQuery } from '../loki/types';
import { PrometheusDatasource } from '../prometheus/datasource';
import { PromQuery } from '../prometheus/types';
import {
failedMetric,
histogramMetric,
mapPromMetricsToServiceMap,
serviceMapMetrics,
totalsMetric,
} from './graphTransform';
import {
transformTrace,
transformTraceList,
transformFromOTLP as transformFromOTEL,
createTableFrameFromSearch,
} from './resultTransformer';
/**
 * Kinds of query a Tempo target can carry.
 *
 * - 'search' is the Loki-backed search and 'nativeSearch' is Tempo's own search; the names are
 *   kept as they are for backwards compatibility.
 * - 'traceId' is also what TempoDatasource.query() assumes when a target has no queryType.
 * - 'clear' makes TempoDatasource.query() return an empty, finished response.
 */
// search = Loki search, nativeSearch = Tempo search for backwards compatibility
export type TempoQueryType = 'search' | 'traceId' | 'serviceMap' | 'upload' | 'nativeSearch' | 'clear';
/**
 * Datasource-level settings stored in the instance's jsonData.
 * TempoDatasource copies each of these onto a same-named instance field in its constructor.
 */
export interface TempoJsonData extends DataSourceJsonData {
// Trace-to-logs settings; also consulted by getLokiSearchDS() as the legacy Loki search configuration.
tracesToLogs?: TraceToLogsOptions;
serviceMap?: {
// Uid of the datasource that service map queries are sent to.
datasourceUid?: string;
};
search?: {
// NOTE(review): not read anywhere in this file; presumably toggles the search UI — confirm against the editor.
hide?: boolean;
};
// Node graph settings; the `enabled` flag is forwarded to the trace transformers.
nodeGraph?: NodeGraphOptions;
lokiSearch?: {
// Uid of the datasource used for 'search' queries; takes precedence over the legacy tracesToLogs setting.
datasourceUid?: string;
};
}
/**
 * A single Tempo target. Which fields matter depends on queryType.
 */
export interface TempoQuery extends DataQuery {
// Query text; 'traceId' targets with an empty query are dropped before the request is sent.
query: string;
// Query to find list of traces, e.g., via Loki
linkedQuery?: LokiQuery;
// Starting value of the `tags` parameter built for native search.
search: string;
queryType: TempoQueryType;
// Appended to the native search tags as service.name="<value>".
serviceName?: string;
// Appended to the native search tags as name="<value>".
spanName?: string;
// Must be a valid Go duration; whitespace is removed before sending.
minDuration?: string;
// Must be a valid Go duration; whitespace is removed before sending.
maxDuration?: string;
// Must be a positive integer; DEFAULT_LIMIT is used when unset.
limit?: number;
// Text appended directly after each metric name in the service map expressions.
serviceMapQuery?: string;
}
/**
 * Parameters produced by TempoDatasource.buildSearchQuery() and serialized into the
 * query string of the /api/search request.
 */
interface SearchQueryParams {
minDuration?: string;
maxDuration?: string;
limit?: number;
// Free-form search text plus the service.name / name pairs added by buildSearchQuery().
tags: string;
// Only set when buildSearchQuery() is given a time range.
start?: number;
// Only set when buildSearchQuery() is given a time range.
end?: number;
}
// Limit applied by TempoDatasource.buildSearchQuery() when the query does not specify one.
export const DEFAULT_LIMIT = 20;
/**
 * Tempo datasource.
 *
 * Targets are grouped by queryType and every group is served by its own sub-query:
 * Loki-backed search, native Tempo search, uploaded JSON, service map metrics and plain
 * trace-id lookups. The sub-query observables are merged into the returned stream.
 */
export class TempoDatasource extends DataSourceWithBackend<TempoQuery, TempoJsonData> {
  tracesToLogs?: TraceToLogsOptions;
  serviceMap?: {
    datasourceUid?: string;
  };
  search?: {
    hide?: boolean;
  };
  nodeGraph?: NodeGraphOptions;
  lokiSearch?: {
    datasourceUid?: string;
  };
  // Raw content used by 'upload' queries; assigned from outside this class.
  uploadedJson?: string | ArrayBuffer | null = null;

  constructor(private instanceSettings: DataSourceInstanceSettings<TempoJsonData>) {
    super(instanceSettings);
    this.tracesToLogs = instanceSettings.jsonData.tracesToLogs;
    this.serviceMap = instanceSettings.jsonData.serviceMap;
    this.search = instanceSettings.jsonData.search;
    this.nodeGraph = instanceSettings.jsonData.nodeGraph;
    this.lokiSearch = instanceSettings.jsonData.lokiSearch;
  }

  /**
   * Runs every visible target and merges the per-type responses.
   *
   * Hidden targets are ignored. A 'clear' target short-circuits everything and yields an
   * empty, finished response. A native search query that fails validation also
   * short-circuits: only the validation error is returned.
   *
   * @param options the data request holding the Tempo targets
   * @returns an observable emitting one response per sub-query
   */
  query(options: DataQueryRequest<TempoQuery>): Observable<DataQueryResponse> {
    const subQueries: Array<Observable<DataQueryResponse>> = [];
    const filteredTargets = options.targets.filter((target) => !target.hide);
    const targets: { [type: string]: TempoQuery[] } = groupBy(filteredTargets, (t) => t.queryType || 'traceId');

    if (targets.clear) {
      return of({ data: [], state: LoadingState.Done });
    }

    const logsDatasourceUid = this.getLokiSearchDS();

    // Run search queries on linked datasource
    if (logsDatasourceUid && targets.search?.length > 0) {
      const dsSrv = getDatasourceSrv();
      subQueries.push(
        from(dsSrv.get(logsDatasourceUid)).pipe(
          mergeMap((linkedDatasource: DataSourceApi) => {
            // Wrap linked query into a data request based on original request
            const linkedRequest: DataQueryRequest = { ...options, targets: targets.search.map((t) => t.linkedQuery!) };
            // Find trace matchers in derived fields of the linked datasource that's identical to this datasource
            const settings: DataSourceInstanceSettings<LokiOptions> | undefined = (linkedDatasource as any)
              .instanceSettings;
            // Optional chaining: a linked datasource without instance settings should end up in the
            // "not configured" error below instead of crashing with a TypeError.
            const traceLinkMatcher: string[] =
              settings?.jsonData?.derivedFields
                ?.filter((field) => field.datasourceUid === this.uid && field.matcherRegex)
                .map((field) => field.matcherRegex) || [];

            if (traceLinkMatcher.length === 0) {
              return throwError(
                () =>
                  new Error(
                    'No Loki datasource configured for search. Set up Derived Fields for traces in a Loki datasource settings and link it to this Tempo datasource.'
                  )
              );
            }

            return (linkedDatasource.query(linkedRequest) as Observable<DataQueryResponse>).pipe(
              map((response) =>
                response.error ? response : transformTraceList(response, this.uid, this.name, traceLinkMatcher)
              )
            );
          })
        )
      );
    }

    if (targets.nativeSearch?.length) {
      try {
        const timeRange = config.featureToggles.tempoBackendSearch
          ? { startTime: options.range.from.unix(), endTime: options.range.to.unix() }
          : undefined;
        // Only the first native search target is used.
        const searchQuery = this.buildSearchQuery(targets.nativeSearch[0], timeRange);
        subQueries.push(
          this._request('/api/search', searchQuery).pipe(
            map((response) => {
              return {
                data: [createTableFrameFromSearch(response.data.traces, this.instanceSettings)],
              };
            }),
            catchError((error) => {
              // Not every failure carries a `data` payload (e.g. network errors), so read the
              // message defensively instead of throwing from inside the error handler.
              const message = error?.data?.message ?? error?.message ?? 'Unknown error';
              return of({ error: { message }, data: [] });
            })
          )
        );
      } catch (error) {
        // buildSearchQuery throws on invalid user input; surface it as the query result.
        const message = error instanceof Error ? error.message : String(error);
        return of({ error: { message }, data: [] });
      }
    }

    if (targets.upload?.length) {
      if (this.uploadedJson) {
        // The uploaded content is user supplied: malformed JSON must become an error response,
        // not an exception thrown out of query().
        let otelTraceData: any;
        try {
          otelTraceData = JSON.parse(this.uploadedJson as string);
        } catch {
          otelTraceData = undefined;
        }

        if (!otelTraceData?.batches) {
          subQueries.push(of({ error: { message: 'JSON is not valid OpenTelemetry format' }, data: [] }));
        } else {
          subQueries.push(of(transformFromOTEL(otelTraceData.batches, this.nodeGraph?.enabled)));
        }
      } else {
        subQueries.push(of({ data: [], state: LoadingState.Done }));
      }
    }

    if (this.serviceMap?.datasourceUid && targets.serviceMap?.length > 0) {
      subQueries.push(serviceMapQuery(options, this.serviceMap.datasourceUid));
    }

    if (targets.traceId?.length > 0) {
      subQueries.push(this.handleTraceIdQuery(options, targets.traceId));
    }

    return merge(...subQueries);
  }

  /**
   * Handles the simplest of the queries where we have just a trace id and return trace data for it.
   *
   * @param options the original request, re-used with the filtered targets
   * @param targets the 'traceId' targets; those with an empty query are skipped
   * @returns the transformed trace response, or an empty observable when nothing is left to run
   * @private
   */
  private handleTraceIdQuery(
    options: DataQueryRequest<TempoQuery>,
    targets: TempoQuery[]
  ): Observable<DataQueryResponse> {
    const validTargets = targets.filter((t) => t.query);
    if (!validTargets.length) {
      return EMPTY;
    }

    const traceRequest: DataQueryRequest<TempoQuery> = { ...options, targets: validTargets };
    return super.query(traceRequest).pipe(
      map((response) => {
        if (response.error) {
          return response;
        }
        return transformTrace(response, this.nodeGraph?.enabled);
      })
    );
  }

  /**
   * Issues a GET request that is hidden from the query inspector.
   *
   * @param url path appended to the datasource url
   * @param params values serialized into the query string
   */
  async metadataRequest(url: string, params = {}) {
    return await this._request(url, params, { method: 'GET', hideFromInspector: true }).toPromise();
  }

  /**
   * Builds the full url from the datasource url, the api path and the serialized params, then
   * hands the request to the backend service.
   */
  private _request(apiUrl: string, data?: any, options?: Partial<BackendSrvRequest>): Observable<Record<string, any>> {
    const params = data ? serializeParams(data) : '';
    const url = `${this.instanceSettings.url}${apiUrl}${params.length ? `?${params}` : ''}`;
    const req = { ...options, url };

    return getBackendSrv().fetch(req);
  }

  /**
   * Checks connectivity by calling the /api/echo endpoint.
   *
   * @returns a success status when the response is ok, otherwise an error status
   */
  async testDatasource(): Promise<any> {
    const options: BackendSrvRequest = {
      headers: {},
      method: 'GET',
      url: `${this.instanceSettings.url}/api/echo`,
    };
    const response = await getBackendSrv().fetch<any>(options).toPromise();

    if (response?.ok) {
      return { status: 'success', message: 'Data source is working' };
    }

    // Always resolve to a status object so the caller never receives `undefined`.
    return { status: 'error', message: 'Data source is not working' };
  }

  /**
   * Text representation of a query. Native search queries list their populated search fields
   * as "Label: value" pairs; every other query type shows the raw query string.
   */
  getQueryDisplayText(query: TempoQuery) {
    if (query.queryType === 'nativeSearch') {
      const displayedKeys: Array<keyof TempoQuery> = [
        'serviceName',
        'spanName',
        'search',
        'minDuration',
        'maxDuration',
        'limit',
      ];
      const parts: string[] = [];
      for (const key of displayedKeys) {
        if (Object.prototype.hasOwnProperty.call(query, key) && query[key]) {
          parts.push(`${startCase(key)}: ${query[key]}`);
        }
      }
      return parts.join(', ');
    }
    return query.query;
  }

  /**
   * Converts a native search target into the parameters of the /api/search request.
   *
   * @param query the native search target
   * @param timeRange optional range copied into `start` / `end`
   * @returns the validated search parameters
   * @throws Error when a duration is not a valid Go duration or the limit is not a positive integer
   */
  buildSearchQuery(query: TempoQuery, timeRange?: { startTime: number; endTime?: number }): SearchQueryParams {
    let tags = query.search ?? '';

    let tempoQuery = pick(query, ['minDuration', 'maxDuration', 'limit']);
    // Remove empty properties
    tempoQuery = pickBy(tempoQuery, identity);

    if (query.serviceName) {
      tags += ` service.name="${query.serviceName}"`;
    }
    if (query.spanName) {
      tags += ` name="${query.spanName}"`;
    }

    // Set default limit
    if (!tempoQuery.limit) {
      tempoQuery.limit = DEFAULT_LIMIT;
    }

    // Validate query inputs and remove spaces if valid
    if (tempoQuery.minDuration) {
      if (!isValidGoDuration(tempoQuery.minDuration)) {
        throw new Error('Please enter a valid min duration.');
      }
      tempoQuery.minDuration = tempoQuery.minDuration.replace(/\s/g, '');
    }
    if (tempoQuery.maxDuration) {
      if (!isValidGoDuration(tempoQuery.maxDuration)) {
        throw new Error('Please enter a valid max duration.');
      }
      tempoQuery.maxDuration = tempoQuery.maxDuration.replace(/\s/g, '');
    }

    if (!Number.isInteger(tempoQuery.limit) || tempoQuery.limit <= 0) {
      throw new Error('Please enter a valid limit.');
    }

    let searchQuery: SearchQueryParams = { tags, ...tempoQuery };

    if (timeRange) {
      searchQuery.start = timeRange.startTime;
      searchQuery.end = timeRange.endTime;
    }

    return searchQuery;
  }

  /** Tag keys reported by the datasource configured for the service map. */
  async getServiceGraphLabels() {
    const ds = await getDatasourceSrv().get(this.serviceMap!.datasourceUid);
    return ds.getTagKeys!();
  }

  /** Tag values for `key` reported by the datasource configured for the service map. */
  async getServiceGraphLabelValues(key: string) {
    const ds = await getDatasourceSrv().get(this.serviceMap!.datasourceUid);
    return ds.getTagValues!({ key });
  }

  // Get linked loki search datasource. Fall back to legacy loki search/trace to logs config
  getLokiSearchDS = (): string | undefined => {
    // The legacy uid only applies when no lokiSearch section exists at all and the legacy
    // lokiSearch flag has not been switched off explicitly.
    const legacyLogsDatasourceUid =
      this.tracesToLogs?.lokiSearch !== false && this.lokiSearch === undefined
        ? this.tracesToLogs?.datasourceUid
        : undefined;
    return this.lokiSearch?.datasourceUid ?? legacyLogsDatasourceUid;
  };
}
/**
 * Looks up the datasource identified by `datasourceUid` and forwards `request` to its
 * query method, treating the resolved instance as a Prometheus datasource.
 */
function queryServiceMapPrometheus(request: DataQueryRequest<PromQuery>, datasourceUid: string) {
  const resolvedDatasource = from(getDatasourceSrv().get(datasourceUid));
  return resolvedDatasource.pipe(mergeMap((instance) => (instance as PrometheusDatasource).query(request)));
}
/**
 * Runs the service map metric queries against the linked datasource and turns the collected
 * responses into node graph frames (nodes and edges). The first field of the nodes frame gets
 * internal links to request rate, request histogram and failed request rate queries.
 *
 * Throws (inside the stream) when any of the collected responses carries an error.
 */
function serviceMapQuery(request: DataQueryRequest<TempoQuery>, datasourceUid: string) {
  // Selector and range shared by all three link expressions; `${__data.fields.id}` is kept
  // literal here and left for later interpolation.
  const serverSelector = '{server="${__data.fields.id}"}[$__rate_interval]';
  const promRequest = makePromServiceMapRequest(request);

  return queryServiceMapPrometheus(promRequest, datasourceUid).pipe(
    // Just collect all the responses first before processing into node graph data
    toArray(),
    map((responses: DataQueryResponse[]) => {
      const firstError = responses.find((candidate) => Boolean(candidate.error))?.error;
      if (firstError) {
        throw new Error(firstError.message);
      }

      const graph = mapPromMetricsToServiceMap(responses, request.range);
      const links = [
        makePromLink('Request rate', 'rate(' + totalsMetric + serverSelector + ')', datasourceUid),
        makePromLink(
          'Request histogram',
          'histogram_quantile(0.9, sum(rate(' + histogramMetric + serverSelector + ')) by (le, client, server))',
          datasourceUid
        ),
        makePromLink('Failed request rate', 'rate(' + failedMetric + serverSelector + ')', datasourceUid),
      ];
      graph.nodes.fields[0].config = { links };

      return {
        data: [graph.nodes, graph.edges],
        state: LoadingState.Done,
      };
    })
  );
}
/**
 * Builds an internal data link that runs `metric` as a Prometheus expression against the
 * datasource identified by `datasourceUid`.
 *
 * @param title text shown for the link
 * @param metric expression placed into the linked query's `expr`
 * @param datasourceUid uid of the datasource the link targets
 */
function makePromLink(title: string, metric: string, datasourceUid: string) {
  const query = { expr: metric } as PromQuery;
  const internal = {
    query,
    datasourceUid,
    datasourceName: 'Prometheus',
  };
  return { url: '', title, internal };
}
/**
 * Derives the metrics request for the service map from a Tempo request: one instant query per
 * service map metric, each wrapped in `delta(...[$__range])`.
 *
 * The optional label filter is taken from the first visible 'serviceMap' target, so it is no
 * longer lost (or replaced by another target's value) when the service map query is not the
 * first target of the request. If no such target exists the first target is used, as before.
 * Multiple simultaneous service map queries are still not supported; only the first one counts.
 *
 * @param options the original Tempo request; everything except `targets` is carried over
 * @returns the request to run against the metrics datasource
 */
function makePromServiceMapRequest(options: DataQueryRequest<TempoQuery>): DataQueryRequest<PromQuery> {
  const serviceMapTarget =
    options.targets.find((target) => target.queryType === 'serviceMap' && !target.hide) ?? options.targets[0];
  // Resolved once instead of per metric; optional chaining tolerates an empty target list.
  const labelFilter = serviceMapTarget?.serviceMapQuery || '';

  return {
    ...options,
    targets: serviceMapMetrics.map((metric) => {
      return {
        refId: metric,
        expr: `delta(${metric}${labelFilter}[$__range])`,
        instant: true,
      };
    }),
  };
}