InfluxDB: Introduce maxDataPoints setting for flux variable query editor (#87935)

* Introduce custom variable support

* Remove comment lines

* introduce maxDataPoints

* fix tests

* update

* fix unit tests

* remove new line
This commit is contained in:
ismail simsek 2024-06-03 11:09:33 +02:00 committed by GitHub
parent 3e57576770
commit c73bbf19a1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 136 additions and 101 deletions

View File

@ -29,8 +29,13 @@ func executeQuery(ctx context.Context, logger log.Logger, query queryModel, runn
logger.Warn("Flux query failed", "err", err, "query", flux)
dr.Error = err
} else {
// we only enforce a larger number than maxDataPoints
maxPointsEnforced := int(float64(query.MaxDataPoints) * maxPointsEnforceFactor)
maxPointsEnforced := int(query.MaxDataPoints)
// The default value of MaxDataPoints is 100 when it is not set
// See https://github.com/grafana/grafana/blob/d69b19e431bfe31ff904a48826593e6fa79b7a5b/pkg/services/query/query.go#L322
// So if the default value is being used we fall back to the old logic.
if query.MaxDataPoints == 100 {
maxPointsEnforced *= int(maxPointsEnforceFactor)
}
dr = readDataFrames(logger, tables, maxPointsEnforced, maxSeries)

View File

@ -250,8 +250,8 @@ func TestMaxDataPointsExceededNoAggregate(t *testing.T) {
dr := executeMockedQuery(t, "max_data_points_exceeded", queryModel{MaxDataPoints: 2})
// it should contain the error-message
require.EqualError(t, dr.Error, "A query returned too many datapoints and the results have been truncated at 21 points to prevent memory issues. At the current graph size, Grafana can only draw 2. Try using the aggregateWindow() function in your query to reduce the number of points returned.")
assertDataResponseDimensions(t, dr, 2, 21)
require.EqualError(t, dr.Error, "A query returned too many datapoints and the results have been truncated at 3 points to prevent memory issues. At the current graph size, Grafana can only draw 2. Try using the aggregateWindow() function in your query to reduce the number of points returned.")
assertDataResponseDimensions(t, dr, 2, 3)
}
func TestMaxDataPointsExceededWithAggregate(t *testing.T) {
@ -261,8 +261,8 @@ func TestMaxDataPointsExceededWithAggregate(t *testing.T) {
dr := executeMockedQuery(t, "max_data_points_exceeded", queryModel{RawQuery: "aggregateWindow()", MaxDataPoints: 2})
// it should contain the error-message
require.EqualError(t, dr.Error, "A query returned too many datapoints and the results have been truncated at 21 points to prevent memory issues. At the current graph size, Grafana can only draw 2.")
assertDataResponseDimensions(t, dr, 2, 21)
require.EqualError(t, dr.Error, "A query returned too many datapoints and the results have been truncated at 3 points to prevent memory issues. At the current graph size, Grafana can only draw 2.")
assertDataResponseDimensions(t, dr, 2, 3)
}
func TestMultivalue(t *testing.T) {

View File

@ -1,72 +1,83 @@
import React from 'react';
import { QueryEditorProps } from '@grafana/data';
import { Field, FieldSet, InlineFieldRow, TextArea } from '@grafana/ui';
import { InlineFieldRow, Input, TextArea } from '@grafana/ui';
import { InlineField } from '@grafana/ui/';
import InfluxDatasource from '../../../datasource';
import { InfluxOptions, InfluxQuery, InfluxVariableQuery, InfluxVersion } from '../../../types';
import { FluxQueryEditor } from '../query/flux/FluxQueryEditor';
export type Props = QueryEditorProps<InfluxDatasource, InfluxQuery, InfluxOptions, InfluxVariableQuery>;
type Props = QueryEditorProps<InfluxDatasource, InfluxQuery, InfluxOptions, InfluxVariableQuery>;
const refId = 'InfluxVariableQueryEditor-VariableQuery';
const useVariableQuery = (query: InfluxVariableQuery | string): InfluxVariableQuery => {
// in legacy variable support query can be only a string
// in new variable support query can be an object and hold more information
// to be able to support old version we check the query here
if (typeof query === 'string') {
return {
refId,
query,
};
} else {
return {
refId,
query: query.query ?? '',
};
}
};
export const InfluxVariableEditor = ({ onChange, datasource, query }: Props) => {
const varQuery = useVariableQuery(query);
const getFluxVariableQuery = (q: InfluxVariableQuery | string) => {
// in legacy variable support query can be only a string
// in new variable support query can be an object and hold more information
// to be able to support old version we check the query here
if (typeof q !== 'string') {
return q;
}
const onChangeHandler = (q: InfluxQuery) => {
onChange({ refId, query: q.query || '' });
};
const onBlurHandler = (e: React.FocusEvent<HTMLTextAreaElement>) => {
onChange({ refId, query: e.currentTarget.value });
return {
refId,
query: q,
maxDataPoints: 1000,
};
};
switch (datasource.version) {
case InfluxVersion.Flux:
return <FluxQueryEditor datasource={datasource} query={varQuery} onChange={onChangeHandler} />;
case InfluxVersion.SQL:
return (
<FieldSet>
<Field htmlFor="influx-sql-variable-query">
<TextArea
id="influx-sql-variable-query"
defaultValue={varQuery.query || ''}
placeholder="metric name or tags query"
rows={1}
onBlur={onBlurHandler}
/>
</Field>
</FieldSet>
<>
<FluxQueryEditor
datasource={datasource}
query={getFluxVariableQuery(query)}
onChange={(q) => {
onChange({ ...query, query: q.query ?? '' });
}}
/>
<InlineFieldRow>
<InlineField
label="Max Data Points"
labelWidth={20}
required
grow
aria-labelledby="flux-maxdatapoints"
tooltip={<div>Upper boundary of data points will return for the variable query.</div>}
>
<Input
id="influx-sql-variable-maxdatapoints"
aria-label="flux-maxdatapoints"
type="number"
defaultValue={query.maxDataPoints ?? 1000}
placeholder="Default is 1000"
onBlur={(e) => {
onChange({
refId,
query: query.query,
maxDataPoints: Number.parseInt(e.currentTarget.value, 10),
});
}}
/>
</InlineField>
</InlineFieldRow>
</>
);
case InfluxVersion.InfluxQL:
default:
return (
<InlineFieldRow>
<InlineField label="Query" labelWidth={20} required grow aria-labelledby="label-select">
<InlineField label="Query" labelWidth={20} required grow aria-labelledby="influx-variable-query">
<TextArea
defaultValue={varQuery.query || ''}
aria-label="influx-variable-query"
defaultValue={query.query}
placeholder="metric name or tags query"
rows={1}
onBlur={onBlurHandler}
onBlur={(e) => {
onChange({ refId, query: query.query ?? '' });
}}
/>
</InlineField>
</InlineFieldRow>

View File

@ -104,10 +104,10 @@ describe('InfluxDataSource Frontend Mode [influxdbBackendMigration=false]', () =
it('should replace $timefilter', async () => {
ds = getMockInfluxDS(getMockDSInstanceSettings({ httpMode: 'GET' }));
await ds.metricFindQuery(query, queryOptions);
await ds.metricFindQuery({ refId: 'test', query }, queryOptions);
expect(fetchMock.mock.lastCall[0].params?.q).toMatch('time >= 1514764800000ms and time <= 1514851200000ms');
ds = getMockInfluxDS(getMockDSInstanceSettings({ httpMode: 'POST' }));
await ds.metricFindQuery(query, queryOptions);
await ds.metricFindQuery({ refId: 'test', query }, queryOptions);
expect(fetchMock.mock.lastCall[0].params?.q).toBeFalsy();
expect(fetchMock.mock.lastCall[0].data).toMatch(
'time%20%3E%3D%201514764800000ms%20and%20time%20%3C%3D%201514851200000ms'
@ -116,23 +116,23 @@ describe('InfluxDataSource Frontend Mode [influxdbBackendMigration=false]', () =
it('should not have any data in request body if http mode is GET', async () => {
ds = getMockInfluxDS(getMockDSInstanceSettings({ httpMode: 'GET' }));
await ds.metricFindQuery(query, queryOptions);
await ds.metricFindQuery({ refId: 'test', query }, queryOptions);
expect(fetchMock.mock.lastCall[0].data).toBeNull();
});
it('should have data in request body if http mode is POST', async () => {
ds = getMockInfluxDS(getMockDSInstanceSettings({ httpMode: 'POST' }));
await ds.metricFindQuery(query, queryOptions);
await ds.metricFindQuery({ refId: 'test', query }, queryOptions);
expect(fetchMock.mock.lastCall[0].data).not.toBeNull();
expect(fetchMock.mock.lastCall[0].data).toMatch('q=SELECT');
});
it('parse response correctly', async () => {
ds = getMockInfluxDS(getMockDSInstanceSettings({ httpMode: 'GET' }));
let responseGet = await ds.metricFindQuery(query, queryOptions);
let responseGet = await ds.metricFindQuery({ refId: 'test', query }, queryOptions);
expect(responseGet).toEqual([{ text: 'cpu' }]);
ds = getMockInfluxDS(getMockDSInstanceSettings({ httpMode: 'POST' }));
let responsePost = await ds.metricFindQuery(query, queryOptions);
let responsePost = await ds.metricFindQuery({ refId: 'test', query }, queryOptions);
expect(responsePost).toEqual([{ text: 'cpu' }]);
});
});

View File

@ -48,7 +48,14 @@ import { buildMetadataQuery } from './influxql_query_builder';
import { prepareAnnotation } from './migrations';
import { buildRawQuery, removeRegexWrapper } from './queryUtils';
import ResponseParser from './response_parser';
import { DEFAULT_POLICY, InfluxOptions, InfluxQuery, InfluxQueryTag, InfluxVersion } from './types';
import {
DEFAULT_POLICY,
InfluxOptions,
InfluxQuery,
InfluxQueryTag,
InfluxVariableQuery,
InfluxVersion,
} from './types';
import { InfluxVariableSupport } from './variables';
export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery, InfluxOptions> {
@ -373,7 +380,7 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
).then(this.toMetricFindValue);
}
async metricFindQuery(query: string, options?: any): Promise<MetricFindValue[]> {
async metricFindQuery(query: InfluxVariableQuery, options?: any): Promise<MetricFindValue[]> {
if (
this.version === InfluxVersion.Flux ||
this.version === InfluxVersion.SQL ||
@ -381,26 +388,28 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
) {
const target: InfluxQuery & SQLQuery = {
refId: 'metricFindQuery',
query,
query: query.query,
rawQuery: true,
...(this.version === InfluxVersion.SQL ? { rawSql: query, format: QueryFormat.Table } : {}),
...(this.version === InfluxVersion.SQL ? { rawSql: query.query, format: QueryFormat.Table } : {}),
};
return lastValueFrom(
super.query({
...(options ?? {}), // includes 'range'
maxDataPoints: query.maxDataPoints,
targets: [target],
})
).then(this.toMetricFindValue);
}
const interpolated = this.templateSrv.replace(
query,
query.query,
options?.scopedVars,
(value: string | string[] = [], variable: QueryVariableModel) => this.interpolateQueryExpr(value, variable, query)
(value: string | string[] = [], variable: QueryVariableModel) =>
this.interpolateQueryExpr(value, variable, query.query)
);
return lastValueFrom(this._seriesQuery(interpolated, options)).then((resp) => {
return this.responseParser.parse(query, resp);
return this.responseParser.parse(query.query, resp);
});
}
@ -421,7 +430,7 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
database: this.database,
});
return this.metricFindQuery(query);
return this.metricFindQuery({ refId: 'get-tag-keys', query });
}
getTagValues(options: DataSourceGetTagValuesOptions<InfluxQuery>) {
@ -432,7 +441,7 @@ export default class InfluxDatasource extends DataSourceWithBackend<InfluxQuery,
withKey: options.key,
});
return this.metricFindQuery(query);
return this.metricFindQuery({ refId: 'get-tag-values', query });
}
/**

View File

@ -2,20 +2,20 @@ import config from 'app/core/config';
import { getMockInfluxDS } from './__mocks__/datasource';
import { getAllMeasurements, getAllPolicies, getFieldKeys, getTagKeys, getTagValues } from './influxql_metadata_query';
import { InfluxQuery } from './types';
import { InfluxQuery, InfluxVariableQuery } from './types';
describe('influx_metadata_query', () => {
let query: string | undefined;
let query: InfluxVariableQuery;
let target: InfluxQuery;
const mockMetricFindQuery = jest.fn();
const mockRunMetadataQuery = jest.fn();
mockMetricFindQuery.mockImplementation((q: string) => {
mockMetricFindQuery.mockImplementation((q: InfluxVariableQuery) => {
query = q;
return Promise.resolve([]);
});
mockRunMetadataQuery.mockImplementation((t: InfluxQuery) => {
mockRunMetadataQuery.mockImplementation((t: InfluxVariableQuery) => {
target = t;
query = t.query;
query = t;
return Promise.resolve([]);
});
@ -42,7 +42,7 @@ describe('influx_metadata_query', () => {
it('should call metricFindQuery with SHOW RETENTION POLICIES', () => {
getAllPolicies(ds);
frontendModeChecks();
expect(query).toMatch('SHOW RETENTION POLICIES');
expect(query.query).toMatch('SHOW RETENTION POLICIES');
});
});
@ -50,19 +50,19 @@ describe('influx_metadata_query', () => {
it('no tags specified', () => {
getAllMeasurements(ds, []);
frontendModeChecks();
expect(query).toBe('SHOW MEASUREMENTS LIMIT 100');
expect(query.query).toBe('SHOW MEASUREMENTS LIMIT 100');
});
it('with tags', () => {
getAllMeasurements(ds, [{ key: 'key', value: 'val' }]);
frontendModeChecks();
expect(query).toMatch('SHOW MEASUREMENTS WHERE "key"');
expect(query.query).toMatch('SHOW MEASUREMENTS WHERE "key"');
});
it('with measurement filter', () => {
getAllMeasurements(ds, [{ key: 'key', value: 'val' }], 'measurementFilter');
frontendModeChecks();
expect(query).toMatch('SHOW MEASUREMENTS WITH MEASUREMENT =~ /(?i)measurementFilter/ WHERE "key"');
expect(query.query).toMatch('SHOW MEASUREMENTS WITH MEASUREMENT =~ /(?i)measurementFilter/ WHERE "key"');
});
});
@ -70,19 +70,19 @@ describe('influx_metadata_query', () => {
it('no tags specified', () => {
getTagKeys(ds);
frontendModeChecks();
expect(query).toBe('SHOW TAG KEYS');
expect(query.query).toBe('SHOW TAG KEYS');
});
it('with measurement', () => {
getTagKeys(ds, 'test_measurement');
frontendModeChecks();
expect(query).toBe('SHOW TAG KEYS FROM "test_measurement"');
expect(query.query).toBe('SHOW TAG KEYS FROM "test_measurement"');
});
it('with retention policy', () => {
getTagKeys(ds, 'test_measurement', 'rp');
frontendModeChecks();
expect(query).toBe('SHOW TAG KEYS FROM "rp"."test_measurement"');
expect(query.query).toBe('SHOW TAG KEYS FROM "rp"."test_measurement"');
});
});
@ -90,13 +90,13 @@ describe('influx_metadata_query', () => {
it('with key', () => {
getTagValues(ds, [], 'test_key');
frontendModeChecks();
expect(query).toBe('SHOW TAG VALUES WITH KEY = "test_key"');
expect(query.query).toBe('SHOW TAG VALUES WITH KEY = "test_key"');
});
it('with key ends with ::tag', () => {
getTagValues(ds, [], 'test_key::tag');
frontendModeChecks();
expect(query).toBe('SHOW TAG VALUES WITH KEY = "test_key"');
expect(query.query).toBe('SHOW TAG VALUES WITH KEY = "test_key"');
});
it('with key ends with ::field', async () => {
@ -107,13 +107,13 @@ describe('influx_metadata_query', () => {
it('with tags', () => {
getTagValues(ds, [{ key: 'tagKey', value: 'tag_val' }], 'test_key');
frontendModeChecks();
expect(query).toBe('SHOW TAG VALUES WITH KEY = "test_key" WHERE "tagKey" = \'tag_val\'');
expect(query.query).toBe('SHOW TAG VALUES WITH KEY = "test_key" WHERE "tagKey" = \'tag_val\'');
});
it('with measurement', () => {
getTagValues(ds, [{ key: 'tagKey', value: 'tag_val' }], 'test_key', 'test_measurement');
frontendModeChecks();
expect(query).toBe(
expect(query.query).toBe(
'SHOW TAG VALUES FROM "test_measurement" WITH KEY = "test_key" WHERE "tagKey" = \'tag_val\''
);
});
@ -121,7 +121,7 @@ describe('influx_metadata_query', () => {
it('with retention policy', () => {
getTagValues(ds, [{ key: 'tagKey', value: 'tag_val' }], 'test_key', 'test_measurement', 'rp');
frontendModeChecks();
expect(query).toBe(
expect(query.query).toBe(
'SHOW TAG VALUES FROM "rp"."test_measurement" WITH KEY = "test_key" WHERE "tagKey" = \'tag_val\''
);
});
@ -131,19 +131,19 @@ describe('influx_metadata_query', () => {
it('with no retention policy', () => {
getFieldKeys(ds, 'test_measurement');
frontendModeChecks();
expect(query).toBe('SHOW FIELD KEYS FROM "test_measurement"');
expect(query.query).toBe('SHOW FIELD KEYS FROM "test_measurement"');
});
it('with empty measurement', () => {
getFieldKeys(ds, '');
frontendModeChecks();
expect(query).toBe('SHOW FIELD KEYS');
expect(query.query).toBe('SHOW FIELD KEYS');
});
it('with retention policy', () => {
getFieldKeys(ds, 'test_measurement', 'rp');
frontendModeChecks();
expect(query).toBe('SHOW FIELD KEYS FROM "rp"."test_measurement"');
expect(query.query).toBe('SHOW FIELD KEYS FROM "rp"."test_measurement"');
});
});
});
@ -165,7 +165,7 @@ describe('influx_metadata_query', () => {
it('should call metricFindQuery with SHOW RETENTION POLICIES', () => {
getAllPolicies(ds);
backendModeChecks();
expect(query).toMatch('SHOW RETENTION POLICIES');
expect(query.query).toMatch('SHOW RETENTION POLICIES');
});
});
@ -173,19 +173,19 @@ describe('influx_metadata_query', () => {
it('no tags specified', () => {
getAllMeasurements(ds, []);
backendModeChecks();
expect(query).toBe('SHOW MEASUREMENTS LIMIT 100');
expect(query.query).toBe('SHOW MEASUREMENTS LIMIT 100');
});
it('with tags', () => {
getAllMeasurements(ds, [{ key: 'key', value: 'val' }]);
backendModeChecks();
expect(query).toMatch('SHOW MEASUREMENTS WHERE "key"');
expect(query.query).toMatch('SHOW MEASUREMENTS WHERE "key"');
});
it('with measurement filter', () => {
getAllMeasurements(ds, [{ key: 'key', value: 'val' }], 'measurementFilter');
backendModeChecks();
expect(query).toMatch('SHOW MEASUREMENTS WITH MEASUREMENT =~ /(?i)measurementFilter/ WHERE "key"');
expect(query.query).toMatch('SHOW MEASUREMENTS WITH MEASUREMENT =~ /(?i)measurementFilter/ WHERE "key"');
});
});
@ -193,19 +193,19 @@ describe('influx_metadata_query', () => {
it('no tags specified', () => {
getTagKeys(ds);
backendModeChecks();
expect(query).toBe('SHOW TAG KEYS');
expect(query.query).toBe('SHOW TAG KEYS');
});
it('with measurement', () => {
getTagKeys(ds, 'test_measurement');
backendModeChecks();
expect(query).toBe('SHOW TAG KEYS FROM "test_measurement"');
expect(query.query).toBe('SHOW TAG KEYS FROM "test_measurement"');
});
it('with retention policy', () => {
getTagKeys(ds, 'test_measurement', 'rp');
backendModeChecks();
expect(query).toBe('SHOW TAG KEYS FROM "rp"."test_measurement"');
expect(query.query).toBe('SHOW TAG KEYS FROM "rp"."test_measurement"');
});
});
@ -213,13 +213,13 @@ describe('influx_metadata_query', () => {
it('with key', () => {
getTagValues(ds, [], 'test_key');
backendModeChecks();
expect(query).toBe('SHOW TAG VALUES WITH KEY = "test_key"');
expect(query.query).toBe('SHOW TAG VALUES WITH KEY = "test_key"');
});
it('with key ends with ::tag', () => {
getTagValues(ds, [], 'test_key::tag');
backendModeChecks();
expect(query).toBe('SHOW TAG VALUES WITH KEY = "test_key"');
expect(query.query).toBe('SHOW TAG VALUES WITH KEY = "test_key"');
});
it('with key ends with ::field', async () => {
@ -230,13 +230,13 @@ describe('influx_metadata_query', () => {
it('with tags', () => {
getTagValues(ds, [{ key: 'tagKey', value: 'tag_val' }], 'test_key');
backendModeChecks();
expect(query).toBe('SHOW TAG VALUES WITH KEY = "test_key" WHERE "tagKey" = \'tag_val\'');
expect(query.query).toBe('SHOW TAG VALUES WITH KEY = "test_key" WHERE "tagKey" = \'tag_val\'');
});
it('with measurement', () => {
getTagValues(ds, [{ key: 'tagKey', value: 'tag_val' }], 'test_key', 'test_measurement');
backendModeChecks();
expect(query).toBe(
expect(query.query).toBe(
'SHOW TAG VALUES FROM "test_measurement" WITH KEY = "test_key" WHERE "tagKey" = \'tag_val\''
);
});
@ -244,7 +244,7 @@ describe('influx_metadata_query', () => {
it('with retention policy', () => {
getTagValues(ds, [{ key: 'tagKey', value: 'tag_val' }], 'test_key', 'test_measurement', 'rp');
backendModeChecks();
expect(query).toBe(
expect(query.query).toBe(
'SHOW TAG VALUES FROM "rp"."test_measurement" WITH KEY = "test_key" WHERE "tagKey" = \'tag_val\''
);
});
@ -254,19 +254,19 @@ describe('influx_metadata_query', () => {
it('with no retention policy', () => {
getFieldKeys(ds, 'test_measurement');
backendModeChecks();
expect(query).toBe('SHOW FIELD KEYS FROM "test_measurement"');
expect(query.query).toBe('SHOW FIELD KEYS FROM "test_measurement"');
});
it('with empty measurement', () => {
getFieldKeys(ds, '');
backendModeChecks();
expect(query).toBe('SHOW FIELD KEYS');
expect(query.query).toBe('SHOW FIELD KEYS');
});
it('with retention policy', () => {
getFieldKeys(ds, 'test_measurement', 'rp');
backendModeChecks();
expect(query).toBe('SHOW FIELD KEYS FROM "rp"."test_measurement"');
expect(query.query).toBe('SHOW FIELD KEYS FROM "rp"."test_measurement"');
});
});
});

View File

@ -40,7 +40,7 @@ const runExploreQuery = async (options: MetadataQueryOptions): Promise<Array<{ t
return datasource.runMetadataQuery(target);
} else {
const options = { policy: target.policy };
return datasource.metricFindQuery(query, options);
return datasource.metricFindQuery({ refId: 'run-explore-query', query }, options);
}
};

View File

@ -62,6 +62,7 @@ export type ResultFormat = 'time_series' | 'table' | 'logs';
export interface InfluxVariableQuery extends DataQuery {
query: string;
maxDataPoints?: number;
}
export interface InfluxQuery extends DataQuery {

View File

@ -31,7 +31,16 @@ export class InfluxVariableSupport extends CustomVariableSupport<InfluxDatasourc
}
const interpolated = this.templateSrv.replace(query, request.scopedVars, this.datasource.interpolateQueryExpr);
const metricFindStream = from(this.datasource.metricFindQuery(interpolated, request.range));
const metricFindStream = from(
this.datasource.metricFindQuery(
{
refId: request.targets[0].refId,
query: interpolated,
maxDataPoints: request.targets[0].maxDataPoints ?? 1000,
},
request.range
)
);
return metricFindStream.pipe(map((results) => ({ data: results })));
}
}