diff --git a/public/app/plugins/datasource/influxdb/datasource.ts b/public/app/plugins/datasource/influxdb/datasource.ts index a1956b3d4db..2a26e4ccd5f 100644 --- a/public/app/plugins/datasource/influxdb/datasource.ts +++ b/public/app/plugins/datasource/influxdb/datasource.ts @@ -2,10 +2,15 @@ import { cloneDeep, extend, get, groupBy, has, isString, map as _map, omit, pick import { lastValueFrom, Observable, of, throwError } from 'rxjs'; import { catchError, map } from 'rxjs/operators'; import { v4 as uuidv4 } from 'uuid'; -import { DataSourceWithBackend, frameToMetricFindValue, getBackendSrv } from '@grafana/runtime'; +import { + BackendDataSourceResponse, + DataSourceWithBackend, + FetchResponse, + frameToMetricFindValue, + getBackendSrv, +} from '@grafana/runtime'; import { AnnotationEvent, - AnnotationQueryRequest, ArrayVector, DataFrame, DataQueryError, @@ -22,6 +27,7 @@ import { TIME_SERIES_TIME_FIELD_NAME, TIME_SERIES_VALUE_FIELD_NAME, TimeSeries, + AnnotationQueryRequest, } from '@grafana/data'; import InfluxSeries from './influx_series'; import InfluxQueryModel from './influx_query_model'; @@ -355,6 +361,36 @@ export default class InfluxDatasource extends DataSourceWithBackend({ + url: '/api/ds/query', + method: 'POST', + data: { + from: options.range.from.valueOf().toString(), + to: options.range.to.valueOf().toString(), + queries: [target], + }, + requestId: options.annotation.name, + }) + .pipe( + map( + async (res: FetchResponse) => + await this.responseParser.transformAnnotationResponse(options, res, target) + ) + ) + ); + } + const timeFilter = this.getTimeFilter({ rangeRaw: options.rangeRaw, timezone: options.dashboard.timezone }); let query = options.annotation.query.replace('$timeFilter', timeFilter); query = this.templateSrv.replace(query, undefined, 'regex'); diff --git a/public/app/plugins/datasource/influxdb/response_parser.ts b/public/app/plugins/datasource/influxdb/response_parser.ts index 871d11c7dd7..93544aded31 100644 --- a/public/app/plugins/datasource/influxdb/response_parser.ts +++ b/public/app/plugins/datasource/influxdb/response_parser.ts @@ -1,6 +1,7 @@ -import { DataFrame, FieldType, QueryResultMeta } from '@grafana/data'; +import { AnnotationEvent, DataFrame, DataQuery, FieldType, QueryResultMeta } from '@grafana/data'; +import { toDataQueryResponse } from '@grafana/runtime'; import TableModel from 'app/core/table_model'; -import { each, groupBy, isArray } from 'lodash'; +import { each, flatten, groupBy, isArray } from 'lodash'; import { InfluxQuery } from './types'; export default class ResponseParser { @@ -67,7 +68,7 @@ export default class ResponseParser { table = getTableCols(dfs, table, target); // if group by tag(s) added - if (dfs[0].fields[1].labels) { + if (dfs[0].fields[1] && dfs[0].fields[1].labels) { let dfsByLabels: any = groupBy(dfs, (df: DataFrame) => df.fields[1].labels ? Object.values(df.fields[1].labels!) : null ); @@ -84,6 +85,82 @@ export default class ResponseParser { return table; } + + async transformAnnotationResponse(options: any, data: any, target: InfluxQuery): Promise { + const rsp = toDataQueryResponse(data, [target] as DataQuery[]); + + if (rsp) { + const table = this.getTable(rsp.data, target, {}); + const list: any[] = []; + let titleCol: any = null; + let timeCol: any = null; + let timeEndCol: any = null; + const tagsCol: any = []; + let textCol: any = null; + + each(table.columns, (column, index) => { + if (column.text.toLowerCase() === 'time') { + timeCol = index; + return; + } + if (column.text === options.annotation.titleColumn) { + titleCol = index; + return; + } + if (colContainsTag(column.text, options.annotation.tagsColumn)) { + tagsCol.push(index); + return; + } + if (column.text.includes(options.annotation.textColumn)) { + textCol = index; + return; + } + if (column.text === options.annotation.timeEndColumn) { + timeEndCol = index; + return; + } + // legacy case + if (!titleCol && textCol !== index) { + titleCol = index; + } + }); + + each(table.rows, (value) => { + const data = { + annotation: options.annotation, + time: +new Date(value[timeCol]), + title: value[titleCol], + timeEnd: value[timeEndCol], + // Remove empty values, then split in different tags for comma separated values + tags: flatten( + tagsCol + .filter((t: any) => { + return value[t]; + }) + .map((t: any) => { + return value[t].split(','); + }) + ), + text: value[textCol], + }; + + list.push(data); + }); + + return list; + } + return []; + } +} + +function colContainsTag(colText: string, tagsColumn: string): boolean { + const tags = (tagsColumn || '').replace(' ', '').split(','); + for (var tag of tags) { + if (colText.includes(tag)) { + return true; + } + } + return false; } function getTableCols(dfs: DataFrame[], table: TableModel, target: InfluxQuery): TableModel { @@ -105,6 +182,15 @@ function getTableCols(dfs: DataFrame[], table: TableModel, target: InfluxQuery): } }); + // Get cols for annotationQuery + if (dfs[0].refId === 'metricFindQuery') { + dfs.forEach((field) => { + if (field.name) { + table.columns.push({ text: field.name }); + } + }); + } + // Select (metric) column(s) for (let i = 0; i < selectedParams.length; i++) { table.columns.push({ text: selectedParams[i] }); @@ -119,9 +205,11 @@ function getTableRows(dfs: DataFrame[], table: TableModel, labels: string[]): Ta for (let i = 0; i < values.length; i++) { const time = values[i]; const metrics = dfs.map((df: DataFrame) => { - return df.fields[1].values.toArray()[i]; + return df.fields[1] ? df.fields[1].values.toArray()[i] : null; }); - table.rows.push([time, ...labels, ...metrics]); + if (metrics.indexOf(null) < 0) { + table.rows.push([time, ...labels, ...metrics]); + } } return table; } diff --git a/public/app/plugins/datasource/influxdb/specs/response_parser.test.ts b/public/app/plugins/datasource/influxdb/specs/response_parser.test.ts index 9922301e048..b9f988c497c 100644 --- a/public/app/plugins/datasource/influxdb/specs/response_parser.test.ts +++ b/public/app/plugins/datasource/influxdb/specs/response_parser.test.ts @@ -2,6 +2,20 @@ import { size } from 'lodash'; import ResponseParser, { getSelectedParams } from '../response_parser'; import InfluxQueryModel from '../influx_query_model'; import { FieldType, MutableDataFrame } from '@grafana/data'; +import { backendSrv } from 'app/core/services/backend_srv'; // will use the version in __mocks__ +import InfluxDatasource from '../datasource'; +import { of } from 'rxjs'; +import { FetchResponse } from '@grafana/runtime'; +import { TemplateSrvStub } from 'test/specs/helpers'; +import config from 'app/core/config'; + +//@ts-ignore +const templateSrv = new TemplateSrvStub(); + +jest.mock('@grafana/runtime', () => ({ + ...(jest.requireActual('@grafana/runtime') as unknown as object), + getBackendSrv: () => backendSrv, +})); describe('influxdb response parser', () => { const parser = new ResponseParser(); @@ -282,4 +296,147 @@ describe('influxdb response parser', () => { expect(table.meta?.executedQueryString).toBe('SELECT everything!'); }); }); + + describe('When issuing annotationQuery', () => { + const ctx: any = { + instanceSettings: { url: 'url', name: 'influxDb' }, + }; + + const fetchMock = jest.spyOn(backendSrv, 'fetch'); + + const queryOptions: any = { + annotation: { + name: 'Anno', + query: 'select * from logs where time >= now() - 15m and time <= now()', + textColumn: 'textColumn', + tagsColumn: 'host,path', + }, + range: { + from: '2018-01-01T00:00:00Z', + to: '2018-01-02T00:00:00Z', + }, + }; + let response: any; + + beforeEach(async () => { + fetchMock.mockImplementation(() => { + return of({ + data: { + results: { + metricFindQuery: { + frames: [ + { + schema: { + name: 'logs.host', + fields: [ + { + name: 'time', + type: 'time', + }, + { + name: 'value', + type: 'string', + }, + ], + }, + data: { + values: [ + [1645208701000, 1645208702000], + ['cbfa07e0e3bb 1', 'cbfa07e0e3bb 2'], + ], + }, + }, + { + schema: { + name: 'logs.message', + fields: [ + { + name: 'time', + type: 'time', + }, + { + name: 'value', + type: 'string', + }, + ], + }, + data: { + values: [ + [1645208701000, 1645208702000], + [ + 'Station softwareupdated[447]: Adding client 1', + 'Station softwareupdated[447]: Adding client 2', + ], + ], + }, + }, + { + schema: { + name: 'logs.path', + fields: [ + { + name: 'time', + type: 'time', + }, + { + name: 'value', + type: 'string', + }, + ], + }, + data: { + values: [ + [1645208701000, 1645208702000], + ['/var/log/host/install.log 1', '/var/log/host/install.log 2'], + ], + }, + }, + { + schema: { + name: 'textColumn', + fields: [ + { + name: 'time', + type: 'time', + }, + { + name: 'value', + type: 'string', + }, + ], + }, + data: { + values: [ + [1645208701000, 1645208702000], + ['text 1', 'text 2'], + ], + }, + }, + ], + }, + }, + }, + } as FetchResponse); + }); + + ctx.ds = new InfluxDatasource(ctx.instanceSettings, templateSrv); + ctx.ds.access = 'proxy'; + config.featureToggles.influxdbBackendMigration = true; + response = await ctx.ds.annotationQuery(queryOptions); + }); + + it('should return annotation list', () => { + expect(response.length).toBe(2); + expect(response[0].time).toBe(1645208701000); + expect(response[0].title).toBe('Station softwareupdated[447]: Adding client 1'); + expect(response[0].text).toBe('text 1'); + expect(response[0].tags[0]).toBe('cbfa07e0e3bb 1'); + expect(response[0].tags[1]).toBe('/var/log/host/install.log 1'); + expect(response[1].time).toBe(1645208702000); + expect(response[1].title).toBe('Station softwareupdated[447]: Adding client 2'); + expect(response[1].text).toBe('text 2'); + expect(response[1].tags[0]).toBe('cbfa07e0e3bb 2'); + expect(response[1].tags[1]).toBe('/var/log/host/install.log 2'); + }); + }); });