InfluxDB: annotations migration to backend (#45635)

* Annotations backend migration

* Response parser checks

* Moved parse to response parser

* Removed unused imports

* Update type

* response_parser test

* response_parser test for text

* Removed from ctx

* Use vars in annotationQuery

* Tags and tests

* Removed import
Joey Tawadrous 2022-03-23 16:02:29 +00:00 committed by GitHub
parent 7b23c7fe1e
commit aabafee90a
3 changed files with 288 additions and 7 deletions
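In short: when the influxdbBackendMigration feature toggle is enabled and the datasource access mode is 'proxy', annotationQuery no longer issues the InfluxQL query from the frontend; it posts the annotation query as a raw query to the Grafana backend (/api/ds/query), and the response parser maps the returned data frames to annotation events. Below is a condensed sketch of that flow assembled from the diff that follows; the standalone function and the parser/datasourceRef/interpolate parameters are illustrative stand-ins for the datasource's own members, not part of the commit.

import { lastValueFrom } from 'rxjs';
import { map } from 'rxjs/operators';
import { BackendDataSourceResponse, FetchResponse, getBackendSrv } from '@grafana/runtime';

// Condensed sketch of the new annotation path (illustrative stand-ins, see note above).
async function annotationsViaBackend(
  options: any,
  parser: any,
  datasourceRef: any,
  interpolate: (q: string) => string
) {
  // The annotation query is forwarded verbatim; the backend executes it.
  const target = {
    refId: 'metricFindQuery', // reused by the response parser to pick out annotation frames
    datasource: datasourceRef,
    query: interpolate(options.annotation.query ?? ''),
    rawQuery: true,
  };

  return lastValueFrom(
    getBackendSrv()
      .fetch<BackendDataSourceResponse>({
        url: '/api/ds/query',
        method: 'POST',
        data: {
          from: options.range.from.valueOf().toString(),
          to: options.range.to.valueOf().toString(),
          queries: [target],
        },
        requestId: options.annotation.name,
      })
      // transformAnnotationResponse() maps the returned frames to
      // annotation events (time, timeEnd, title, text, tags).
      .pipe(
        map((res: FetchResponse<BackendDataSourceResponse>) =>
          parser.transformAnnotationResponse(options, res, target)
        )
      )
  );
}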

datasource.ts

@@ -2,10 +2,15 @@ import { cloneDeep, extend, get, groupBy, has, isString, map as _map, omit, pick
import { lastValueFrom, Observable, of, throwError } from 'rxjs';
import { catchError, map } from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid';
import {
BackendDataSourceResponse,
DataSourceWithBackend,
FetchResponse,
frameToMetricFindValue,
getBackendSrv,
} from '@grafana/runtime';
import {
AnnotationEvent,
ArrayVector,
DataFrame,
DataQueryError,
@@ -22,6 +27,7 @@ import {
TIME_SERIES_TIME_FIELD_NAME,
TIME_SERIES_VALUE_FIELD_NAME,
TimeSeries,
AnnotationQueryRequest,
} from '@grafana/data';
import InfluxSeries from './influx_series';
import InfluxQueryModel from './influx_query_model';
@@ -355,6 +361,36 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
});
}
if (config.featureToggles.influxdbBackendMigration && this.access === 'proxy') {
// We want to send our query to the backend as a raw query
const target: InfluxQuery = {
refId: 'metricFindQuery',
datasource: this.getRef(),
query: this.templateSrv.replace(options.annotation.query ?? ''),
rawQuery: true,
};
return lastValueFrom(
getBackendSrv()
.fetch<BackendDataSourceResponse>({
url: '/api/ds/query',
method: 'POST',
data: {
from: options.range.from.valueOf().toString(),
to: options.range.to.valueOf().toString(),
queries: [target],
},
requestId: options.annotation.name,
})
.pipe(
map(
async (res: FetchResponse<BackendDataSourceResponse>) =>
await this.responseParser.transformAnnotationResponse(options, res, target)
)
)
);
}
const timeFilter = this.getTimeFilter({ rangeRaw: options.rangeRaw, timezone: options.dashboard.timezone });
let query = options.annotation.query.replace('$timeFilter', timeFilter);
query = this.templateSrv.replace(query, undefined, 'regex');

response_parser.ts

@@ -1,6 +1,7 @@
import { AnnotationEvent, DataFrame, DataQuery, FieldType, QueryResultMeta } from '@grafana/data';
import { toDataQueryResponse } from '@grafana/runtime';
import TableModel from 'app/core/table_model';
import { each, flatten, groupBy, isArray } from 'lodash';
import { InfluxQuery } from './types';
export default class ResponseParser {
@@ -67,7 +68,7 @@ export default class ResponseParser {
table = getTableCols(dfs, table, target);
// if group by tag(s) added
if (dfs[0].fields[1] && dfs[0].fields[1].labels) {
let dfsByLabels: any = groupBy(dfs, (df: DataFrame) =>
df.fields[1].labels ? Object.values(df.fields[1].labels!) : null
);
@@ -84,6 +85,82 @@ export default class ResponseParser {
return table;
}
async transformAnnotationResponse(options: any, data: any, target: InfluxQuery): Promise<AnnotationEvent[]> {
const rsp = toDataQueryResponse(data, [target] as DataQuery[]);
if (rsp) {
const table = this.getTable(rsp.data, target, {});
const list: any[] = [];
let titleCol: any = null;
let timeCol: any = null;
let timeEndCol: any = null;
const tagsCol: any = [];
let textCol: any = null;
each(table.columns, (column, index) => {
if (column.text.toLowerCase() === 'time') {
timeCol = index;
return;
}
if (column.text === options.annotation.titleColumn) {
titleCol = index;
return;
}
if (colContainsTag(column.text, options.annotation.tagsColumn)) {
tagsCol.push(index);
return;
}
if (column.text.includes(options.annotation.textColumn)) {
textCol = index;
return;
}
if (column.text === options.annotation.timeEndColumn) {
timeEndCol = index;
return;
}
// legacy case
if (!titleCol && textCol !== index) {
titleCol = index;
}
});
each(table.rows, (value) => {
const data = {
annotation: options.annotation,
time: +new Date(value[timeCol]),
title: value[titleCol],
timeEnd: value[timeEndCol],
// Remove empty values, then split in different tags for comma separated values
tags: flatten(
tagsCol
.filter((t: any) => {
return value[t];
})
.map((t: any) => {
return value[t].split(',');
})
),
text: value[textCol],
};
list.push(data);
});
return list;
}
return [];
}
}
function colContainsTag(colText: string, tagsColumn: string): boolean {
const tags = (tagsColumn || '').replace(' ', '').split(',');
for (var tag of tags) {
if (colText.includes(tag)) {
return true;
}
}
return false;
}
function getTableCols(dfs: DataFrame[], table: TableModel, target: InfluxQuery): TableModel {
@@ -105,6 +182,15 @@ function getTableCols(dfs: DataFrame[], table: TableModel, target: InfluxQuery):
}
});
// Get cols for annotationQuery
if (dfs[0].refId === 'metricFindQuery') {
dfs.forEach((field) => {
if (field.name) {
table.columns.push({ text: field.name });
}
});
}
// Select (metric) column(s)
for (let i = 0; i < selectedParams.length; i++) {
table.columns.push({ text: selectedParams[i] });
@@ -119,10 +205,12 @@ function getTableRows(dfs: DataFrame[], table: TableModel, labels: string[]): Ta
for (let i = 0; i < values.length; i++) {
const time = values[i];
const metrics = dfs.map((df: DataFrame) => {
return df.fields[1] ? df.fields[1].values.toArray()[i] : null;
});
if (metrics.indexOf(null) < 0) {
table.rows.push([time, ...labels, ...metrics]);
}
}
return table;
}

response_parser.test.ts

@@ -2,6 +2,20 @@ import { size } from 'lodash';
import ResponseParser, { getSelectedParams } from '../response_parser';
import InfluxQueryModel from '../influx_query_model';
import { FieldType, MutableDataFrame } from '@grafana/data';
import { backendSrv } from 'app/core/services/backend_srv'; // will use the version in __mocks__
import InfluxDatasource from '../datasource';
import { of } from 'rxjs';
import { FetchResponse } from '@grafana/runtime';
import { TemplateSrvStub } from 'test/specs/helpers';
import config from 'app/core/config';
//@ts-ignore
const templateSrv = new TemplateSrvStub();
jest.mock('@grafana/runtime', () => ({
...(jest.requireActual('@grafana/runtime') as unknown as object),
getBackendSrv: () => backendSrv,
}));
describe('influxdb response parser', () => {
const parser = new ResponseParser();
@@ -282,4 +296,147 @@ describe('influxdb response parser', () => {
expect(table.meta?.executedQueryString).toBe('SELECT everything!');
});
});
describe('When issuing annotationQuery', () => {
const ctx: any = {
instanceSettings: { url: 'url', name: 'influxDb' },
};
const fetchMock = jest.spyOn(backendSrv, 'fetch');
const queryOptions: any = {
annotation: {
name: 'Anno',
query: 'select * from logs where time >= now() - 15m and time <= now()',
textColumn: 'textColumn',
tagsColumn: 'host,path',
},
range: {
from: '2018-01-01T00:00:00Z',
to: '2018-01-02T00:00:00Z',
},
};
let response: any;
beforeEach(async () => {
fetchMock.mockImplementation(() => {
return of({
data: {
results: {
metricFindQuery: {
frames: [
{
schema: {
name: 'logs.host',
fields: [
{
name: 'time',
type: 'time',
},
{
name: 'value',
type: 'string',
},
],
},
data: {
values: [
[1645208701000, 1645208702000],
['cbfa07e0e3bb 1', 'cbfa07e0e3bb 2'],
],
},
},
{
schema: {
name: 'logs.message',
fields: [
{
name: 'time',
type: 'time',
},
{
name: 'value',
type: 'string',
},
],
},
data: {
values: [
[1645208701000, 1645208702000],
[
'Station softwareupdated[447]: Adding client 1',
'Station softwareupdated[447]: Adding client 2',
],
],
},
},
{
schema: {
name: 'logs.path',
fields: [
{
name: 'time',
type: 'time',
},
{
name: 'value',
type: 'string',
},
],
},
data: {
values: [
[1645208701000, 1645208702000],
['/var/log/host/install.log 1', '/var/log/host/install.log 2'],
],
},
},
{
schema: {
name: 'textColumn',
fields: [
{
name: 'time',
type: 'time',
},
{
name: 'value',
type: 'string',
},
],
},
data: {
values: [
[1645208701000, 1645208702000],
['text 1', 'text 2'],
],
},
},
],
},
},
},
} as FetchResponse);
});
ctx.ds = new InfluxDatasource(ctx.instanceSettings, templateSrv);
ctx.ds.access = 'proxy';
config.featureToggles.influxdbBackendMigration = true;
response = await ctx.ds.annotationQuery(queryOptions);
});
it('should return annotation list', () => {
expect(response.length).toBe(2);
expect(response[0].time).toBe(1645208701000);
expect(response[0].title).toBe('Station softwareupdated[447]: Adding client 1');
expect(response[0].text).toBe('text 1');
expect(response[0].tags[0]).toBe('cbfa07e0e3bb 1');
expect(response[0].tags[1]).toBe('/var/log/host/install.log 1');
expect(response[1].time).toBe(1645208702000);
expect(response[1].title).toBe('Station softwareupdated[447]: Adding client 2');
expect(response[1].text).toBe('text 2');
expect(response[1].tags[0]).toBe('cbfa07e0e3bb 2');
expect(response[1].tags[1]).toBe('/var/log/host/install.log 2');
});
});
});