From 80c432e524754f7997aebc8bed3bce7555fd67bd Mon Sep 17 00:00:00 2001 From: ismail simsek Date: Mon, 3 Jul 2023 15:50:08 +0300 Subject: [PATCH] InfluxDB: Fix backend mode table result with aliases (#69943) Co-authored-by: ludovio --- pkg/tsdb/influxdb/response_parser.go | 199 ++++--- .../influxdb/response_parser.test.ts | 562 ++++++++++++++---- .../datasource/influxdb/response_parser.ts | 7 +- 3 files changed, 563 insertions(+), 205 deletions(-) diff --git a/pkg/tsdb/influxdb/response_parser.go b/pkg/tsdb/influxdb/response_parser.go index 1808131057d..d2bf49725a7 100644 --- a/pkg/tsdb/influxdb/response_parser.go +++ b/pkg/tsdb/influxdb/response_parser.go @@ -70,7 +70,7 @@ func transformRows(rows []Row, query Query) data.Frames { } frames := make([]*data.Frame, 0, len(rows)+cols) - // frameName is pre-allocated so we can reuse it, saving memory. + // frameName is pre-allocated. So we can reuse it, saving memory. // It's sized for a reasonably-large name, but will grow if needed. frameName := make([]byte, 0, 128) @@ -87,106 +87,15 @@ func transformRows(rows []Row, query Query) data.Frames { } if !hasTimeCol { - var values []string - - if retentionPolicyQuery { - values = make([]string, 1, len(row.Values)) - } else { - values = make([]string, 0, len(row.Values)) - } - - for _, valuePair := range row.Values { - if tagValuesQuery { - if len(valuePair) >= 2 { - values = append(values, valuePair[1].(string)) - } - } else if retentionPolicyQuery { - // We want to know whether the given retention policy is the default one or not. - // If it is default policy then we should add it to the beginning. - // The index 4 gives us if that policy is default or not. - // https://docs.influxdata.com/influxdb/v1.8/query_language/explore-schema/#show-retention-policies - // Only difference is v0.9. In that version we don't receive shardGroupDuration value. - // https://archive.docs.influxdata.com/influxdb/v0.9/query_language/schema_exploration/#show-retention-policies - // Since it is always the last value we will check that last value always. - if len(valuePair) >= 1 { - if valuePair[len(row.Columns)-1].(bool) { - values[0] = valuePair[0].(string) - } else { - values = append(values, valuePair[0].(string)) - } - } - } else { - if len(valuePair) >= 1 { - values = append(values, valuePair[0].(string)) - } - } - } - - field := data.NewField("Value", nil, values) - frames = append(frames, data.NewFrame(row.Name, field)) + newFrame := newFrameWithoutTimeField(row, retentionPolicyQuery, tagValuesQuery) + frames = append(frames, newFrame) } else { for colIndex, column := range row.Columns { if column == "time" { continue } - - var timeArray []time.Time - var floatArray []*float64 - var stringArray []*string - var boolArray []*bool - valType := typeof(row.Values, colIndex) - - for _, valuePair := range row.Values { - timestamp, timestampErr := parseTimestamp(valuePair[0]) - // we only add this row if the timestamp is valid - if timestampErr == nil { - timeArray = append(timeArray, timestamp) - switch valType { - case "string": - { - value, chk := valuePair[colIndex].(string) - if chk { - stringArray = append(stringArray, &value) - } else { - stringArray = append(stringArray, nil) - } - } - case "json.Number": - value := parseNumber(valuePair[colIndex]) - floatArray = append(floatArray, value) - case "bool": - value, chk := valuePair[colIndex].(bool) - if chk { - boolArray = append(boolArray, &value) - } else { - boolArray = append(boolArray, nil) - } - case "null": - floatArray = append(floatArray, nil) - } - } - } - - name := string(formatFrameName(row, column, query, frameName[:])) - - timeField := data.NewField("Time", nil, timeArray) - if valType == "string" { - valueField := data.NewField("Value", row.Tags, stringArray) - valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: name}) - frames = append(frames, newDataFrame(name, query.RawQuery, timeField, valueField)) - } else if valType == "json.Number" { - valueField := data.NewField("Value", row.Tags, floatArray) - valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: name}) - frames = append(frames, newDataFrame(name, query.RawQuery, timeField, valueField)) - } else if valType == "bool" { - valueField := data.NewField("Value", row.Tags, boolArray) - valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: name}) - frames = append(frames, newDataFrame(name, query.RawQuery, timeField, valueField)) - } else if valType == "null" { - valueField := data.NewField("Value", row.Tags, floatArray) - valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: name}) - frames = append(frames, newDataFrame(name, query.RawQuery, timeField, valueField)) - } + newFrame := newFrameWithTimeField(row, column, colIndex, query, frameName) + frames = append(frames, newFrame) } } } @@ -194,6 +103,104 @@ func transformRows(rows []Row, query Query) data.Frames { return frames } +func newFrameWithTimeField(row Row, column string, colIndex int, query Query, frameName []byte) *data.Frame { + var timeArray []time.Time + var floatArray []*float64 + var stringArray []*string + var boolArray []*bool + valType := typeof(row.Values, colIndex) + + for _, valuePair := range row.Values { + timestamp, timestampErr := parseTimestamp(valuePair[0]) + // we only add this row if the timestamp is valid + if timestampErr != nil { + continue + } + + timeArray = append(timeArray, timestamp) + switch valType { + case "string": + value, ok := valuePair[colIndex].(string) + if ok { + stringArray = append(stringArray, &value) + } else { + stringArray = append(stringArray, nil) + } + case "json.Number": + value := parseNumber(valuePair[colIndex]) + floatArray = append(floatArray, value) + case "bool": + value, ok := valuePair[colIndex].(bool) + if ok { + boolArray = append(boolArray, &value) + } else { + boolArray = append(boolArray, nil) + } + case "null": + floatArray = append(floatArray, nil) + } + } + + timeField := data.NewField("Time", nil, timeArray) + + var valueField *data.Field + + switch valType { + case "string": + valueField = data.NewField("Value", row.Tags, stringArray) + case "json.Number": + valueField = data.NewField("Value", row.Tags, floatArray) + case "bool": + valueField = data.NewField("Value", row.Tags, boolArray) + case "null": + valueField = data.NewField("Value", row.Tags, floatArray) + } + + name := string(formatFrameName(row, column, query, frameName[:])) + valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: name}) + return newDataFrame(name, query.RawQuery, timeField, valueField) +} + +func newFrameWithoutTimeField(row Row, retentionPolicyQuery bool, tagValuesQuery bool) *data.Frame { + var values []string + + if retentionPolicyQuery { + values = make([]string, 1, len(row.Values)) + } else { + values = make([]string, 0, len(row.Values)) + } + + for _, valuePair := range row.Values { + if tagValuesQuery { + if len(valuePair) >= 2 { + values = append(values, valuePair[1].(string)) + } + } else if retentionPolicyQuery { + // We want to know whether the given retention policy is the default one or not. + // If it is default policy then we should add it to the beginning. + // The index 4 gives us if that policy is default or not. + // https://docs.influxdata.com/influxdb/v1.8/query_language/explore-schema/#show-retention-policies + // Only difference is v0.9. In that version we don't receive shardGroupDuration value. + // https://archive.docs.influxdata.com/influxdb/v0.9/query_language/schema_exploration/#show-retention-policies + // Since it is always the last value we will check that last value always. + if len(valuePair) >= 1 { + if valuePair[len(row.Columns)-1].(bool) { + values[0] = valuePair[0].(string) + } else { + values = append(values, valuePair[0].(string)) + } + } + } else { + if len(valuePair) >= 1 { + values = append(values, valuePair[0].(string)) + } + } + } + + field := data.NewField("Value", nil, values) + return data.NewFrame(row.Name, field) +} + func newDataFrame(name string, queryString string, timeField *data.Field, valueField *data.Field) *data.Frame { frame := data.NewFrame(name, timeField, valueField) frame.Meta = &data.FrameMeta{ diff --git a/public/app/plugins/datasource/influxdb/response_parser.test.ts b/public/app/plugins/datasource/influxdb/response_parser.test.ts index a5d9be396fd..c10712c2b33 100644 --- a/public/app/plugins/datasource/influxdb/response_parser.test.ts +++ b/public/app/plugins/datasource/influxdb/response_parser.test.ts @@ -1,7 +1,7 @@ import { size } from 'lodash'; import { of } from 'rxjs'; -import { AnnotationEvent, DataQueryRequest, dateTime, FieldType, MutableDataFrame } from '@grafana/data'; +import { AnnotationEvent, DataFrame, DataQueryRequest, dateTime, FieldType, MutableDataFrame } from '@grafana/data'; import { FetchResponse } from '@grafana/runtime'; import config from 'app/core/config'; import { backendSrv } from 'app/core/services/backend_srv'; // will use the version in __mocks__ @@ -296,6 +296,27 @@ describe('influxdb response parser', () => { }); }); + describe('table with aliases', () => { + it('should parse the table with alias', () => { + const table = parser.getTable(mockDataFramesWithAlias, mockQuery, { preferredVisualisationType: 'table' }); + expect(table.columns.length).toBe(4); + expect(table.columns[0].text).toBe('Time'); + expect(table.columns[1].text).toBe('geohash'); + expect(table.columns[2].text).toBe('ALIAS1'); + expect(table.columns[3].text).toBe('ALIAS2'); + }); + + it('should parse the table when there is no alias and two field selects', () => { + const table = parser.getTable(mockDataframesWithTwoFieldSelect, mockQueryWithTwoFieldSelect, { + preferredVisualisationType: 'table', + }); + expect(table.columns.length).toBe(3); + expect(table.columns[0].text).toBe('Time'); + expect(table.columns[1].text).toBe('mean'); + expect(table.columns[2].text).toBe('mean_1'); + }); + }); + describe('When issuing annotationQuery', () => { const ctx = { ds: getMockDS(getMockDSInstanceSettings()), @@ -330,114 +351,8 @@ describe('influxdb response parser', () => { let response: AnnotationEvent[]; beforeEach(async () => { - const mockResponse: FetchResponse = { - config: { url: '' }, - headers: new Headers(), - ok: false, - redirected: false, - status: 0, - statusText: '', - type: 'basic', - url: '', - data: { - results: { - metricFindQuery: { - frames: [ - { - schema: { - name: 'logs.host', - fields: [ - { - name: 'time', - type: 'time', - }, - { - name: 'value', - type: 'string', - }, - ], - }, - data: { - values: [ - [1645208701000, 1645208702000], - ['cbfa07e0e3bb 1', 'cbfa07e0e3bb 2'], - ], - }, - }, - { - schema: { - name: 'logs.message', - fields: [ - { - name: 'time', - type: 'time', - }, - { - name: 'value', - type: 'string', - }, - ], - }, - data: { - values: [ - [1645208701000, 1645208702000], - [ - 'Station softwareupdated[447]: Adding client 1', - 'Station softwareupdated[447]: Adding client 2', - ], - ], - }, - }, - { - schema: { - name: 'logs.path', - fields: [ - { - name: 'time', - type: 'time', - }, - { - name: 'value', - type: 'string', - }, - ], - }, - data: { - values: [ - [1645208701000, 1645208702000], - ['/var/log/host/install.log 1', '/var/log/host/install.log 2'], - ], - }, - }, - { - schema: { - name: 'textColumn', - fields: [ - { - name: 'time', - type: 'time', - }, - { - name: 'value', - type: 'string', - }, - ], - }, - data: { - values: [ - [1645208701000, 1645208702000], - ['text 1', 'text 2'], - ], - }, - }, - ], - }, - }, - }, - }; - fetchMock.mockImplementation(() => { - return of(mockResponse); + return of(annotationMockResponse); }); config.featureToggles.influxdbBackendMigration = true; @@ -459,3 +374,434 @@ describe('influxdb response parser', () => { }); }); }); + +const mockQuery: InfluxQuery = { + datasource: { + type: 'influxdb', + uid: '12345', + }, + groupBy: [ + { + params: ['$__interval'], + type: 'time', + }, + { + type: 'tag', + params: ['geohash::tag'], + }, + { + params: ['null'], + type: 'fill', + }, + ], + measurement: 'cpu', + orderByTime: 'ASC', + policy: 'bar', + refId: 'A', + resultFormat: 'table', + select: [ + [ + { + type: 'field', + params: ['value'], + }, + { + type: 'mean', + params: [], + }, + { + type: 'alias', + params: ['ALIAS1'], + }, + ], + [ + { + type: 'field', + params: ['value'], + }, + { + type: 'mean', + params: [], + }, + { + type: 'alias', + params: ['ALIAS2'], + }, + ], + ], + tags: [], +}; + +const mockDataFramesWithAlias: DataFrame[] = [ + { + name: 'cpu.ALIAS1 { geohash: tz6h548nc111 }', + refId: 'A', + meta: { + executedQueryString: + 'SELECT mean("value") AS "ALIAS1", mean("value") AS "ALIAS2" FROM "bar"."cpu" WHERE time >= 1686582333244ms and time <= 1686583233244ms GROUP BY time(500ms), "geohash"::tag fill(null) ORDER BY time ASC', + }, + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: [1686582333000, 1686582333500, 1686582334000], + }, + { + name: 'Value', + type: FieldType.number, + labels: { + geohash: 'tz6h548nc111', + }, + config: { + displayNameFromDS: 'cpu.ALIAS1 { geohash: tz6h548nc111 }', + }, + values: [null, 111.98024577663908, null], + }, + ], + length: 1801, + }, + { + name: 'cpu.ALIAS2 { geohash: tz6h548nc111 }', + refId: 'A', + meta: { + executedQueryString: + 'SELECT mean("value") AS "ALIAS1", mean("value") AS "ALIAS2" FROM "bar"."cpu" WHERE time >= 1686582333244ms and time <= 1686583233244ms GROUP BY time(500ms), "geohash"::tag fill(null) ORDER BY time ASC', + }, + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: [1686582333000, 1686582333500, 1686582334000], + }, + { + name: 'Value', + type: FieldType.number, + labels: { + geohash: 'tz6h548nc111', + }, + config: { + displayNameFromDS: 'cpu.ALIAS2 { geohash: tz6h548nc111 }', + }, + values: [null, 111.98024577663908, null], + }, + ], + length: 1801, + }, + { + name: 'cpu.ALIAS1 { geohash: wj7c61wnv111 }', + refId: 'A', + meta: { + executedQueryString: + 'SELECT mean("value") AS "ALIAS1", mean("value") AS "ALIAS2" FROM "bar"."cpu" WHERE time >= 1686582333244ms and time <= 1686583233244ms GROUP BY time(500ms), "geohash"::tag fill(null) ORDER BY time ASC', + }, + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: [1686582333000, 1686582333500, 1686582334000], + }, + { + name: 'Value', + type: FieldType.number, + labels: { + geohash: 'wj7c61wnv111', + }, + config: { + displayNameFromDS: 'cpu.ALIAS1 { geohash: wj7c61wnv111 }', + }, + values: [null, 112.97136059147347, null], + }, + ], + length: 1801, + }, + { + name: 'cpu.ALIAS2 { geohash: wj7c61wnv111 }', + refId: 'A', + meta: { + executedQueryString: + 'SELECT mean("value") AS "ALIAS1", mean("value") AS "ALIAS2" FROM "bar"."cpu" WHERE time >= 1686582333244ms and time <= 1686583233244ms GROUP BY time(500ms), "geohash"::tag fill(null) ORDER BY time ASC', + }, + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: [1686582333000, 1686582333500, 1686582334000], + }, + { + name: 'Value', + type: FieldType.number, + labels: { + geohash: 'wj7c61wnv111', + }, + config: { + displayNameFromDS: 'cpu.ALIAS2 { geohash: wj7c61wnv111 }', + }, + values: [null, 112.97136059147347, null], + }, + ], + length: 1801, + }, + { + name: 'cpu.ALIAS1 { geohash: wr50zpuhj111 }', + refId: 'A', + meta: { + executedQueryString: + 'SELECT mean("value") AS "ALIAS1", mean("value") AS "ALIAS2" FROM "bar"."cpu" WHERE time >= 1686582333244ms and time <= 1686583233244ms GROUP BY time(500ms), "geohash"::tag fill(null) ORDER BY time ASC', + }, + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: [1686582333000, 1686582333500, 1686582334000], + }, + { + name: 'Value', + type: FieldType.number, + labels: { + geohash: 'wr50zpuhj111', + }, + config: { + displayNameFromDS: 'cpu.ALIAS1 { geohash: wr50zpuhj111 }', + }, + values: [null, 112.27638560052755, null], + }, + ], + length: 1801, + }, + { + name: 'cpu.ALIAS2 { geohash: wr50zpuhj111 }', + refId: 'A', + meta: { + executedQueryString: + 'SELECT mean("value") AS "ALIAS1", mean("value") AS "ALIAS2" FROM "bar"."cpu" WHERE time >= 1686582333244ms and time <= 1686583233244ms GROUP BY time(500ms), "geohash"::tag fill(null) ORDER BY time ASC', + }, + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: [1686582333000, 1686582333500, 1686582334000], + }, + { + name: 'Value', + type: FieldType.number, + labels: { + geohash: 'wr50zpuhj111', + }, + config: { + displayNameFromDS: 'cpu.ALIAS2 { geohash: wr50zpuhj111 }', + }, + values: [null, 112.27638560052755, null], + }, + ], + length: 1801, + }, +]; + +const mockDataframesWithTwoFieldSelect: DataFrame[] = [ + { + name: 'cpu.mean', + refId: 'A', + meta: { + typeVersion: [0, 0], + executedQueryString: + 'SELECT mean("value"), mean("value") FROM "bar"."cpu" WHERE time >= 1686585763070ms and time <= 1686585793070ms GROUP BY time(10ms) fill(null) ORDER BY time ASC', + }, + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: [1686585763070, 1686585763080, 1686585763090], + }, + { + name: 'Value', + type: FieldType.number, + config: { + displayNameFromDS: 'cpu.mean', + }, + values: [null, 87.42703187930438, null], + }, + ], + length: 3, + }, + { + name: 'cpu.mean_1', + refId: 'A', + meta: { + typeVersion: [0, 0], + executedQueryString: + 'SELECT mean("value"), mean("value") FROM "bar"."cpu" WHERE time >= 1686585763070ms and time <= 1686585793070ms GROUP BY time(10ms) fill(null) ORDER BY time ASC', + }, + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: [1686585763070, 1686585763080, 1686585763090], + }, + { + name: 'Value', + type: FieldType.number, + config: { + displayNameFromDS: 'cpu.mean_1', + }, + values: [87.3, 87.4, 87.5], + }, + ], + length: 3, + }, +]; + +const mockQueryWithTwoFieldSelect: InfluxQuery = { + datasource: { + type: 'influxdb', + uid: '1234', + }, + groupBy: [ + { + params: ['$__interval'], + type: 'time', + }, + { + params: ['null'], + type: 'fill', + }, + ], + measurement: 'cpu', + orderByTime: 'ASC', + policy: 'bar', + refId: 'A', + resultFormat: 'table', + select: [ + [ + { + type: 'field', + params: ['value'], + }, + { + type: 'mean', + params: [], + }, + ], + [ + { + type: 'field', + params: ['value'], + }, + { + type: 'mean', + params: [], + }, + ], + ], + tags: [], +}; + +const annotationMockResponse: FetchResponse = { + config: { url: '' }, + headers: new Headers(), + ok: false, + redirected: false, + status: 0, + statusText: '', + type: 'basic', + url: '', + data: { + results: { + metricFindQuery: { + frames: [ + { + schema: { + name: 'logs.host', + fields: [ + { + name: 'time', + type: 'time', + }, + { + name: 'value', + type: 'string', + }, + ], + }, + data: { + values: [ + [1645208701000, 1645208702000], + ['cbfa07e0e3bb 1', 'cbfa07e0e3bb 2'], + ], + }, + }, + { + schema: { + name: 'logs.message', + fields: [ + { + name: 'time', + type: 'time', + }, + { + name: 'value', + type: 'string', + }, + ], + }, + data: { + values: [ + [1645208701000, 1645208702000], + ['Station softwareupdated[447]: Adding client 1', 'Station softwareupdated[447]: Adding client 2'], + ], + }, + }, + { + schema: { + name: 'logs.path', + fields: [ + { + name: 'time', + type: 'time', + }, + { + name: 'value', + type: 'string', + }, + ], + }, + data: { + values: [ + [1645208701000, 1645208702000], + ['/var/log/host/install.log 1', '/var/log/host/install.log 2'], + ], + }, + }, + { + schema: { + name: 'textColumn', + fields: [ + { + name: 'time', + type: 'time', + }, + { + name: 'value', + type: 'string', + }, + ], + }, + data: { + values: [ + [1645208701000, 1645208702000], + ['text 1', 'text 2'], + ], + }, + }, + ], + }, + }, + }, +}; diff --git a/public/app/plugins/datasource/influxdb/response_parser.ts b/public/app/plugins/datasource/influxdb/response_parser.ts index b34e8f91bc6..796ebc5812e 100644 --- a/public/app/plugins/datasource/influxdb/response_parser.ts +++ b/public/app/plugins/datasource/influxdb/response_parser.ts @@ -258,7 +258,12 @@ export function getSelectedParams(target: InfluxQuery): string[] { target.select?.forEach((select) => { const selector = select.filter((x) => x.type !== 'field'); if (selector.length > 0) { - allParams.push(selector[0].type); + const aliasIfExist = selector.find((s) => s.type === 'alias'); + if (aliasIfExist) { + allParams.push(aliasIfExist.params?.[0].toString() ?? ''); + } else { + allParams.push(selector[0].type); + } } else { if (select[0] && select[0].params && select[0].params[0]) { allParams.push(select[0].params[0].toString());