Files
grafana/public/app/plugins/datasource/influxdb/datasource.ts
Brett Buddin a5a85e0398 InfluxDB: Send retention policy with InfluxQL queries if its been specified. (#62149)
* InfluxDB: Send retention policy with InfluQL queries if it's been specified.

In InfluxDB v2, due to technical limitations of the InfluxDB v1
compatibility layer, retention policies in a query (e.g. "<retention
policy>.<measurement_name>") aren't honored and must be specified in the
URL query parameter `rp` to be applied. Grafana doesn't send this query
parameter which results in all queries resolving to the default
retention policy when querying InfluxDB v2 servers using InfluxQL.

This addresses the issue by sending the `rp` query parameter for queries
that have specified a retention policy in the `target` given to
`runExploreQuery`.

The outcomes are:

1. InfluxQL queries executed against InfluxDB v2 databases will have the
   necessary retention policy information for queries like `SHOW FIELD
   KEYS FROM measurement` to function correctly.
2. InfluxQL queries executed against InfluxDB v1 databases will be
   unaffected, because this `rp` query parameter is unsupported there.

You can read more about the rentention policy mapping behavior of
InfluxDB v2 in our documentation:

- https://docs.influxdata.com/influxdb/v2.6/reference/api/influxdb-1x/dbrp/#when-querying-data
- https://docs.influxdata.com/influxdb/v2.6/reference/api/influxdb-1x/query/#query-a-non-default-retention-policy

* Use the ? operator

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>

Co-authored-by: Ryan McKinley <ryantxu@gmail.com>
2023-01-26 19:14:02 +01:00

524 lines
15 KiB
TypeScript

import { extend, groupBy, has, isString, omit, pick, reduce } from 'lodash';
import { lastValueFrom, merge, Observable, of, throwError } from 'rxjs';
import { catchError, map } from 'rxjs/operators';
import {
AnnotationEvent,
DataQueryError,
DataQueryRequest,
DataQueryResponse,
DataSourceInstanceSettings,
dateMath,
MetricFindValue,
ScopedVars,
toDataFrame,
} from '@grafana/data';
import {
BackendDataSourceResponse,
DataSourceWithBackend,
FetchResponse,
frameToMetricFindValue,
getBackendSrv,
} from '@grafana/runtime';
import { getTemplateSrv, TemplateSrv } from 'app/features/templating/template_srv';
import { AnnotationEditor } from './components/AnnotationEditor';
import { FluxQueryEditor } from './components/FluxQueryEditor';
import { BROWSER_MODE_DISABLED_MESSAGE } from './constants';
import InfluxQueryModel from './influx_query_model';
import { prepareAnnotation } from './migrations';
import { buildRawQuery } from './queryUtils';
import { InfluxQueryBuilder } from './query_builder';
import ResponseParser from './response_parser';
import { InfluxOptions, InfluxQuery, InfluxVersion } from './types';
export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery, InfluxOptions> {
/** Datasource type identifier; the constructor sets it to 'influxdb'. */
type: string;
/** Server URLs parsed from the comma-separated `instanceSettings.url`; rotated on every `_influxRequest` call. */
urls: string[];
/** Username from the instance settings ('' when absent); sent as the `u` query parameter by `_influxRequest`. */
username: string;
/** Password from the instance settings ('' when absent); sent as the `p` query parameter together with `username`. */
password: string;
/** Datasource name from the instance settings; used to look up ad hoc filters. */
name: string;
/** Database from the instance settings; used as the `db` parameter unless a request supplies its own. */
database: any;
/** Basic-auth value from the instance settings; when set it is sent as the `Authorization` header. */
basicAuth: any;
/** When truthy (or when `basicAuth` is set) direct requests are flagged `withCredentials`. */
withCredentials: any;
/** Access mode copied from the instance settings. */
access: 'direct' | 'proxy';
/** `timeInterval` value from the datasource jsonData. */
interval: any;
/** `ResponseParser` instance used to convert raw responses (tables, annotations, metric-find results). */
responseParser: any;
/** HTTP verb for direct InfluxQL requests; taken from jsonData, falling back to 'GET'. */
httpMode: string;
/** True when the configured version is `InfluxVersion.Flux`. */
isFlux: boolean;
/** True when `access` is 'proxy'; `query` rejects requests otherwise. */
isProxyAccess: boolean;
/**
 * Creates the datasource from its persisted instance settings.
 *
 * @param instanceSettings - Saved datasource configuration (URL list, credentials, jsonData).
 * @param templateSrv - Template service used for variable interpolation; defaults to the global one.
 */
constructor(
  instanceSettings: DataSourceInstanceSettings<InfluxOptions>,
  private readonly templateSrv: TemplateSrv = getTemplateSrv()
) {
  super(instanceSettings);

  const jsonData = instanceSettings.jsonData || ({} as InfluxOptions);

  this.type = 'influxdb';
  // The URL setting may hold several comma-separated servers.
  this.urls = (instanceSettings.url ?? '').split(',').map((rawUrl) => rawUrl.trim());

  this.username = instanceSettings.username ?? '';
  this.password = instanceSettings.password ?? '';
  this.name = instanceSettings.name;
  this.database = instanceSettings.database;
  this.basicAuth = instanceSettings.basicAuth;
  this.withCredentials = instanceSettings.withCredentials;
  this.access = instanceSettings.access;

  this.interval = jsonData.timeInterval;
  this.httpMode = jsonData.httpMode || 'GET';
  this.responseParser = new ResponseParser();
  this.isFlux = jsonData.version === InfluxVersion.Flux;
  this.isProxyAccess = instanceSettings.access === 'proxy';

  // Flux relies on an annotation processor instead of the `annotationQuery` lifecycle,
  // so it only needs an editor; InfluxQL additionally migrates saved annotations.
  this.annotations = this.isFlux
    ? { QueryEditor: FluxQueryEditor }
    : { QueryEditor: AnnotationEditor, prepareAnnotation };
}
/**
 * Runs the given request against InfluxDB.
 *
 * - Fails immediately when the datasource is not in proxy access mode.
 * - Hidden targets are removed up front for every query mode.
 * - Flux requests are delegated unchanged to the backend.
 * - If any target originates from an annotation, every target that has a
 *   query string is executed through `annotationEvents` and the resulting
 *   streams are merged.
 * - Otherwise the backend response is regrouped per target: 'logs' and
 *   'table' formats are converted by the response parser, all other formats
 *   pass their frames through untouched.
 *
 * @param request - The data query request.
 * @returns A stream of query responses; errors are delivered through the stream.
 */
query(request: DataQueryRequest<InfluxQuery>): Observable<DataQueryResponse> {
  if (!this.isProxyAccess) {
    const error = new Error(BROWSER_MODE_DISABLED_MESSAGE);
    return throwError(() => error);
  }

  // Hidden targets are dropped here regardless of whether the query is Flux or InfluxQL.
  const filteredRequest = {
    ...request,
    targets: request.targets.filter((t) => t.hide !== true),
  };

  if (this.isFlux) {
    return super.query(filteredRequest);
  }

  if (filteredRequest.targets.some((target: InfluxQuery) => target.fromAnnotations)) {
    const streams: Array<Observable<DataQueryResponse>> = [];

    for (const target of filteredRequest.targets) {
      if (target.query) {
        streams.push(
          new Observable((subscriber) => {
            this.annotationEvents(filteredRequest, target)
              .then((events) => {
                subscriber.next({ data: [toDataFrame(events)] });
                subscriber.complete();
              })
              .catch((ex) => {
                // Rejections are frequently plain `{ message }` objects. Passing such an
                // object to `new Error()` would stringify it to "[object Object]", so
                // keep real Error instances and otherwise reuse the provided message.
                const error =
                  ex instanceof Error
                    ? ex
                    : new Error(typeof ex?.message === 'string' ? ex.message : String(ex));
                subscriber.error(error);
              });
          })
        );
      }
    }

    return merge(...streams);
  }

  return super.query(filteredRequest).pipe(
    map((res) => {
      if (res.error) {
        throw {
          message: 'InfluxDB Error: ' + res.error.message,
          res,
        };
      }

      const seriesList: any[] = [];
      const groupedFrames = groupBy(res.data, (x) => x.refId);

      if (Object.keys(groupedFrames).length > 0) {
        filteredRequest.targets.forEach((target) => {
          const filteredFrames = groupedFrames[target.refId] ?? [];

          switch (target.resultFormat) {
            case 'logs':
            case 'table':
              // All frames of the target are collapsed into a single table.
              seriesList.push(
                this.responseParser.getTable(filteredFrames, target, {
                  preferredVisualisationType: target.resultFormat,
                })
              );
              break;
            default: {
              for (const frame of filteredFrames) {
                seriesList.push(frame);
              }
              break;
            }
          }
        });
      }

      return { data: seriesList };
    })
  );
}
/**
 * Returns the text shown for a query: the raw query string in Flux mode,
 * otherwise the InfluxQL rendered by `InfluxQueryModel` (called with `render(false)`).
 */
getQueryDisplayText(query: InfluxQuery) {
  return this.isFlux ? query.query : new InfluxQueryModel(query).render(false);
}
/**
 * Decides whether a query should be executed.
 *
 * @returns `false` only for Flux queries without query text; `true` in every other case.
 */
filterQuery(query: InfluxQuery): boolean {
  return !this.isFlux || Boolean(query.query);
}
/**
 * Interpolates template variables into a query before it is sent to the backend.
 *
 * `__interval` and `__interval_ms` are stripped from the variables used for the raw
 * query text because those two are meant to be interpolated on the backend.
 *
 * @param query - The query to interpolate.
 * @param scopedVars - Variables scoped to this request.
 * @returns A copy of the query with variables replaced.
 */
applyTemplateVariables(query: InfluxQuery, scopedVars: ScopedVars): Record<string, any> {
  const { __interval, __interval_ms, ...frontendVars } = scopedVars;

  if (!this.isFlux) {
    return this.applyVariables(query, scopedVars, frontendVars);
  }

  // Flux: only the raw query text needs interpolation.
  const rawQueryText = this.templateSrv.replace(query.query ?? '', frontendVars);
  return { ...query, query: rawQueryText };
}
/**
 * Executes an InfluxQL annotation query through `/api/ds/query` and converts the
 * response into annotation events via the response parser.
 *
 * @param options - Request whose `range` supplies the `from`/`to` bounds.
 * @param annotation - Annotation definition; its `query` holds the InfluxQL text.
 * @returns The annotation events. Rejects with a `{ message }` object in Flux mode
 *          or when the annotation has no query.
 */
async annotationEvents(options: DataQueryRequest, annotation: InfluxQuery): Promise<AnnotationEvent[]> {
  if (this.isFlux) {
    throw { message: 'Flux requires the standard annotation query' };
  }

  // InfluxQL annotations carry their query as a string.
  if (!annotation.query) {
    throw { message: 'Query missing in annotation definition' };
  }

  // The query is handed to the backend as a raw query.
  const target: InfluxQuery = {
    refId: 'metricFindQuery',
    datasource: this.getRef(),
    query: this.templateSrv.replace(annotation.query, undefined, 'regex'),
    rawQuery: true,
  };

  const response$ = getBackendSrv().fetch<BackendDataSourceResponse>({
    url: '/api/ds/query',
    method: 'POST',
    headers: this.getRequestHeaders(),
    data: {
      from: options.range.from.valueOf().toString(),
      to: options.range.to.valueOf().toString(),
      queries: [target],
    },
    requestId: annotation.name,
  });

  return lastValueFrom(
    response$.pipe(
      map((res: FetchResponse<BackendDataSourceResponse>) =>
        this.responseParser.transformAnnotationResponse(annotation, res, target)
      )
    )
  );
}
/**
 * Reports whether the target's query text references a template variable.
 * Flux targets are checked by their raw `query`; InfluxQL targets are first
 * rendered to text with `buildRawQuery`.
 */
targetContainsTemplate(target: any) {
  let queryText;
  if (this.isFlux) {
    queryText = target.query;
  } else {
    queryText = buildRawQuery(target);
  }
  return this.templateSrv.containsTemplate(queryText);
}
/**
 * Returns copies of the given queries with template variables replaced and the
 * datasource reference set to this datasource.
 *
 * @param queries - Queries to interpolate; a missing or empty list yields `[]`.
 * @param scopedVars - Variables applied during interpolation.
 */
interpolateVariablesInQueries(queries: InfluxQuery[], scopedVars: ScopedVars): InfluxQuery[] {
  if (!queries?.length) {
    return [];
  }

  return queries.map((query) => {
    const datasource = this.getRef();

    if (this.isFlux) {
      // Flux: interpolate the raw query text only.
      const rawQueryText = this.templateSrv.replace(query.query ?? '', scopedVars, 'regex');
      return { ...query, datasource, query: rawQueryText };
    }

    const interpolated = this.applyVariables(query, scopedVars, scopedVars);
    return { ...query, datasource, ...interpolated };
  });
}
/**
 * Produces a copy of an InfluxQL query with template variables interpolated in
 * its group-by parts, select parts, tags and scalar fields, and attaches the
 * current ad hoc filters for this datasource.
 *
 * Group-by and select parameters are interpolated without scoped variables;
 * tags and most scalar fields use `scopedVars`; the raw query text uses `rest`.
 *
 * @param query - Query to expand.
 * @param scopedVars - Variables used for tags and scalar fields.
 * @param rest - Variables used for the raw query text.
 */
applyVariables(query: InfluxQuery, scopedVars: ScopedVars, rest: ScopedVars) {
  const interpolateParam = (param: any) => this.templateSrv.replace(param.toString(), undefined, 'regex');

  const expandedQuery = { ...query };

  if (query.groupBy) {
    expandedQuery.groupBy = query.groupBy.map((part) => ({
      ...part,
      params: part.params?.map((param) => interpolateParam(param)),
    }));
  }

  if (query.select) {
    expandedQuery.select = query.select.map((parts) =>
      parts.map((part: any) => ({
        ...part,
        params: part.params?.map((param: any) => interpolateParam(param)),
      }))
    );
  }

  if (query.tags) {
    expandedQuery.tags = query.tags.map((tag) => ({
      ...tag,
      value: this.templateSrv.replace(tag.value, scopedVars, 'regex'),
    }));
  }

  const adhocFilters = this.templateSrv.getAdhocFilters(this.name) ?? [];
  const rawQueryText = this.templateSrv.replace(query.query ?? '', rest, 'regex');

  return {
    ...expandedQuery,
    adhocFilters,
    query: rawQueryText, // The raw query text
    alias: this.templateSrv.replace(query.alias ?? '', scopedVars),
    limit: this.templateSrv.replace(query.limit?.toString() ?? '', scopedVars, 'regex'),
    measurement: this.templateSrv.replace(query.measurement ?? '', scopedVars, 'regex'),
    policy: this.templateSrv.replace(query.policy ?? '', scopedVars, 'regex'),
    slimit: this.templateSrv.replace(query.slimit?.toString() ?? '', scopedVars, 'regex'),
    tz: this.templateSrv.replace(query.tz ?? '', scopedVars),
  };
}
/**
 * Runs a metric-find (variable) query.
 *
 * In Flux mode the query goes through the backend and the first returned frame
 * is converted to metric-find values. In InfluxQL mode the query is
 * interpolated, executed via `_seriesQuery`, and parsed by the response parser
 * (which receives the original, un-interpolated query text).
 *
 * @param query - Query text.
 * @param options - Optional request options; spread into the Flux request and
 *                  forwarded to `_seriesQuery` for InfluxQL.
 */
async metricFindQuery(query: string, options?: any): Promise<MetricFindValue[]> {
  if (!this.isFlux) {
    const interpolated = this.templateSrv.replace(query, undefined, 'regex');
    const resp = await lastValueFrom(this._seriesQuery(interpolated, options));
    return this.responseParser.parse(query, resp);
  }

  const target: InfluxQuery = {
    refId: 'metricFindQuery',
    query,
    rawQuery: true,
  };
  const request = {
    ...options, // includes 'range'
    targets: [target],
  } as DataQueryRequest;

  const rsp = await lastValueFrom(super.query(request));
  return rsp.data?.length ? frameToMetricFindValue(rsp.data[0]) : [];
}
/**
 * Fetches tag keys by building a 'TAG_KEYS' explore query for
 * `options.measurement` (empty string when not given) and running it as a
 * metric-find query.
 */
getTagKeys(options: any = {}) {
  const tagKeysQuery = new InfluxQueryBuilder(
    { measurement: options.measurement || '', tags: [] },
    this.database
  ).buildExploreQuery('TAG_KEYS');

  return this.metricFindQuery(tagKeysQuery, options);
}
/**
 * Fetches the values of tag `options.key` by building a 'TAG_VALUES' explore
 * query for `options.measurement` (empty string when not given) and running it
 * as a metric-find query.
 */
getTagValues(options: any = {}) {
  const tagValuesQuery = new InfluxQueryBuilder(
    { measurement: options.measurement || '', tags: [] },
    this.database
  ).buildExploreQuery('TAG_VALUES', options.key);

  return this.metricFindQuery(tagValuesQuery, options);
}
/**
 * Executes an InfluxQL query directly against the `/query` endpoint using the
 * configured HTTP mode, requesting millisecond epochs.
 *
 * When `options.range` is present, every `$timeFilter` occurrence in the query
 * is replaced by the time filter derived from that range.
 *
 * @param query - InfluxQL text; an empty query short-circuits to `{ results: [] }`.
 * @param options - Optional settings; `range` and `timezone` are read here, the
 *                  whole object is forwarded to `_influxRequest`.
 */
_seriesQuery(query: string, options?: any) {
  if (!query) {
    return of({ results: [] });
  }

  let resolvedQuery = query;
  if (options?.range) {
    const timeFilter = this.getTimeFilter({ rangeRaw: options.range, timezone: options.timezone });
    // `String.prototype.replace` with a string pattern only substitutes the first
    // match, which would leave later `$timeFilter` macros (e.g. in subqueries)
    // unresolved. split/join replaces all of them and treats the filter literally.
    resolvedQuery = query.split('$timeFilter').join(timeFilter);
  }

  return this._influxRequest(this.httpMode, '/query', { q: resolvedQuery, epoch: 'ms' }, options);
}
/**
 * Serializes a parameter object into a URL-encoded `key=value&...` string.
 * Entries whose value is `null` or `undefined` are omitted; a falsy `params`
 * yields an empty string.
 */
serializeParams(params: any) {
  if (!params) {
    return '';
  }

  const pairs = reduce(
    params,
    (collected, paramValue, paramName) => {
      if (paramValue !== null && paramValue !== undefined) {
        collected.push(encodeURIComponent(paramName) + '=' + encodeURIComponent(paramValue));
      }
      return collected;
    },
    [] as string[]
  );

  return pairs.join('&');
}
/**
 * Sends a request straight to one of the configured InfluxDB URLs.
 *
 * @param method - HTTP verb. For 'POST' with a `q` field, `q` is sent form-encoded
 *                 in the body and the remaining fields become URL parameters; for
 *                 'GET' (or 'POST' without `q`) all fields become URL parameters.
 * @param url - Path appended to the selected server URL.
 * @param data - Request fields (e.g. `q`, `epoch`).
 * @param options - Optional overrides: `database` replaces the configured database
 *                  for the `db` parameter, `policy` is sent as the `rp` parameter.
 * @returns A stream emitting the response body (with `executedQueryString`
 *          attached). InfluxDB-reported errors and transport errors are emitted
 *          as errors shaped by `handleErrors`; cancelled requests are emitted as
 *          values.
 */
_influxRequest(method: string, url: string, data: any, options?: any) {
  // Rotate through the configured URLs: take the first and move it to the back.
  const currentUrl = this.urls.shift()!;
  this.urls.push(currentUrl);

  const params: any = {};

  if (this.username) {
    params.u = this.username;
    params.p = this.password;
  }

  if (options?.database) {
    params.db = options.database;
  } else if (this.database) {
    params.db = this.database;
  }

  if (options?.policy) {
    params.rp = options.policy;
  }

  // Remember the query text before `data` is rewritten below; null-safe so a
  // request without a payload does not throw here.
  const q = data?.q;

  if (method === 'POST' && has(data, 'q')) {
    // verb is POST and 'q' param is defined
    extend(params, omit(data, ['q']));
    data = this.serializeParams(pick(data, ['q']));
  } else if (method === 'GET' || method === 'POST') {
    // verb is GET, or POST without 'q' param
    extend(params, data);
    data = null;
  }

  const req: any = {
    method: method,
    url: currentUrl + url,
    params: params,
    data: data,
    precision: 'ms',
    inspect: { type: 'influxdb' },
    paramSerializer: this.serializeParams,
    headers: {},
  };

  if (this.basicAuth || this.withCredentials) {
    req.withCredentials = true;
  }
  if (this.basicAuth) {
    req.headers.Authorization = this.basicAuth;
  }
  if (method === 'POST') {
    req.headers['Content-type'] = 'application/x-www-form-urlencoded';
  }

  return getBackendSrv()
    .fetch(req)
    .pipe(
      map((result: any) => {
        const { data } = result;
        if (data) {
          data.executedQueryString = q;
          if (data.results) {
            const errors = result.data.results.filter((elem: any) => elem.error);
            if (errors.length > 0) {
              throw {
                message: 'InfluxDB Error: ' + errors[0].error,
                data,
              };
            }
          }
        }
        return data;
      }),
      catchError((err) => {
        if (err?.cancelled) {
          return of(err);
        }
        // Use the factory form of `throwError` (the value form is deprecated in
        // RxJS 7); the error is built once so every subscriber sees the same object.
        const error = this.handleErrors(err);
        return throwError(() => error);
      })
    );
}
/**
 * Converts a failed request into a `DataQueryError`.
 *
 * The message defaults to the error's `status`, then its `message`, then a
 * generic text. When the status is a non-zero integer (or >= 300) the message
 * becomes either the InfluxDB-reported error (`err.data.error`) or a
 * "Network Error" built from `statusText` and `status`, and the response data
 * and request config are attached.
 *
 * @param err - The raw error; may be null/undefined, in which case the generic
 *              message is returned.
 */
handleErrors(err: any) {
  const error: DataQueryError = {
    message: err?.status || err?.message || 'Unknown error during query transaction. Please check JS console logs.',
  };

  // Read the status null-safely: the error value is not guaranteed to be an object.
  const status = err?.status;

  if ((Number.isInteger(status) && status !== 0) || status >= 300) {
    error.message = err.data?.error
      ? 'InfluxDB Error: ' + err.data.error
      : 'Network Error: ' + err.statusText + '(' + err.status + ')';
    error.data = err.data;
    // @ts-ignore
    error.config = err.config;
  }

  return error;
}
/**
 * Builds the InfluxQL time predicate for a raw time range.
 *
 * @param options - Object with `rangeRaw` (`from`/`to`) and an optional `timezone`.
 * @returns A string of the form `time >= <from> and time <= <to>`.
 */
getTimeFilter(options: any) {
  const { rangeRaw, timezone } = options;
  const lowerBound = this.getInfluxTime(rangeRaw.from, false, timezone);
  const upperBound = this.getInfluxTime(rangeRaw.to, true, timezone);
  return `time >= ${lowerBound} and time <= ${upperBound}`;
}
/**
 * Converts one end of a time range into an InfluxQL time expression.
 *
 * - `'now'` becomes `now()`.
 * - `'now-<n><d|h|m|s>'` becomes `now() - <n><unit>`.
 * - Any other string is parsed with `dateMath.parse` and rendered as epoch
 *   milliseconds with an `ms` suffix.
 * - Non-string values are rendered via their `valueOf()` plus `ms`.
 *
 * @param date - Relative time string or a date-like value.
 * @param roundUp - Forwarded to `dateMath.parse`.
 * @param timezone - Forwarded to `dateMath.parse`.
 */
getInfluxTime(date: any, roundUp: any, timezone: any) {
  if (!isString(date)) {
    return date.valueOf() + 'ms';
  }

  if (date === 'now') {
    return 'now()';
  }

  const relativeMatch = /^now-(\d+)([dhms])$/.exec(date);
  if (relativeMatch) {
    const [, digits, unit] = relativeMatch;
    return 'now() - ' + parseInt(digits, 10) + unit;
  }

  const parsed: any = dateMath.parse(date, roundUp, timezone);
  return parsed.valueOf() + 'ms';
}
}