Loki Range Splitting: Calculate dynamic maxLines per target based on the current response state (#63248)

* Range splitting: update maxLines for logs queries

* Range splitting: add unit tests for dynamic limit requesting
This commit is contained in:
Matias Chomicki 2023-02-13 07:59:20 +01:00 committed by GitHub
parent 985c61d700
commit 84fdb7f908
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 127 additions and 99 deletions

View File

@ -34,7 +34,7 @@ import { CustomVariableModel } from '../../../features/variables/types';
import { LokiDatasource, REF_ID_DATA_SAMPLES } from './datasource';
import { createLokiDatasource, createMetadataRequest } from './mocks';
import { runPartitionedQuery } from './querySplitting';
import { parseToNodeNamesArray, requestSupportsPartitioning } from './queryUtils';
import { parseToNodeNamesArray } from './queryUtils';
import { LokiOptions, LokiQuery, LokiQueryType, LokiVariableQueryType, SupportingQueryType } from './types';
import { LokiVariableSupport } from './variables';
@ -45,13 +45,6 @@ jest.mock('@grafana/runtime', () => {
};
});
jest.mock('./queryUtils', () => {
return {
...jest.requireActual('./queryUtils'),
requestSupportsPartitioning: jest.fn(),
};
});
jest.mock('./querySplitting');
const templateSrvStub = {
@ -1111,7 +1104,6 @@ describe('LokiDatasource', () => {
describe('Query splitting', () => {
beforeAll(() => {
jest.mocked(requestSupportsPartitioning).mockReturnValue(true);
config.featureToggles.lokiQuerySplitting = true;
jest.mocked(runPartitionedQuery).mockReturnValue(
of({
@ -1121,12 +1113,20 @@ describe('LokiDatasource', () => {
});
afterAll(() => {
config.featureToggles.lokiQuerySplitting = false;
jest.mocked(requestSupportsPartitioning).mockReturnValue(false);
});
it('supports query splitting when the requirements are met', async () => {
it.each([
[[{ expr: 'count_over_time({a="b"}[1m])', refId: 'A' }]],
[[{ expr: '{a="b"}', refId: 'A' }]],
[
[
{ expr: 'count_over_time({a="b"}[1m])', refId: 'A', hide: true },
{ expr: '{a="b"}', refId: 'B' },
],
],
])('supports query splitting when the requirements are met', async (targets: LokiQuery[]) => {
const ds = createLokiDatasource(templateSrvStub);
const query = getQueryOptions<LokiQuery>({
targets: [{ expr: 'count_over_time({a="b"}[1m])', refId: 'A' }],
targets,
app: CoreApp.Dashboard,
});

View File

@ -4,22 +4,25 @@ import { getQueryOptions } from 'test/helpers/getQueryOptions';
import { dateTime } from '@grafana/data';
import { LokiDatasource } from './datasource';
import { createLokiDatasource } from './mocks';
import * as logsTimeSplit from './logsTimeSplit';
import * as metricTimeSplit from './metricTimeSplit';
import { createLokiDatasource, logFrameA } from './mocks';
import { runPartitionedQuery } from './querySplitting';
import { LokiQuery } from './types';
describe('runPartitionedQuery()', () => {
let datasource: LokiDatasource;
const request = getQueryOptions<LokiQuery>({
targets: [{ expr: 'count_over_time({a="b"}[1m])', refId: 'A' }],
range: {
const range = {
from: dateTime('2023-02-08T05:00:00.000Z'),
to: dateTime('2023-02-10T06:00:00.000Z'),
raw: {
from: dateTime('2023-02-08T05:00:00.000Z'),
to: dateTime('2023-02-10T06:00:00.000Z'),
raw: {
from: dateTime('2023-02-08T05:00:00.000Z'),
to: dateTime('2023-02-10T06:00:00.000Z'),
},
},
};
const request = getQueryOptions<LokiQuery>({
targets: [{ expr: 'count_over_time({a="b"}[1m])', refId: 'A' }],
range,
});
beforeEach(() => {
datasource = createLokiDatasource();
@ -32,4 +35,59 @@ describe('runPartitionedQuery()', () => {
expect(datasource.runQuery).toHaveBeenCalledTimes(3);
});
});
describe('Hidden queries', () => {
const request = getQueryOptions<LokiQuery>({
targets: [
{ expr: 'count_over_time({a="b"}[1m])', refId: 'A', hide: true },
{ expr: '{a="b"}', refId: 'B' },
],
range,
});
beforeAll(() => {
jest.spyOn(logsTimeSplit, 'getRangeChunks').mockReturnValue([]);
jest.spyOn(metricTimeSplit, 'getRangeChunks').mockReturnValue([]);
});
afterAll(() => {
jest.mocked(logsTimeSplit.getRangeChunks).mockRestore();
jest.mocked(metricTimeSplit.getRangeChunks).mockRestore();
});
test('Ignores hidden queries', async () => {
await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith(() => {
expect(logsTimeSplit.getRangeChunks).toHaveBeenCalled();
expect(metricTimeSplit.getRangeChunks).not.toHaveBeenCalled();
});
});
});
describe('Dynamic maxLines for logs requests', () => {
const request = getQueryOptions<LokiQuery>({
targets: [{ expr: '{a="b"}', refId: 'A', maxLines: 4 }],
range,
});
beforeEach(() => {
jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [logFrameA], refId: 'A' }));
});
test('Stops requesting once maxLines of logs have been received', async () => {
await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith(() => {
// 3 days, 3 chunks, 2 responses of 2 logs, 2 requests
expect(datasource.runQuery).toHaveBeenCalledTimes(2);
});
});
test('Performs all the requests if maxLines has not been reached', async () => {
request.targets[0].maxLines = 9999;
await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith(() => {
// 3 days, 3 chunks, 3 responses of 2 logs, 3 requests
expect(datasource.runQuery).toHaveBeenCalledTimes(3);
});
});
test('Performs all the requests if not a log query', async () => {
request.targets[0].maxLines = 1;
request.targets[0].expr = 'count_over_time({a="b"}[1m])';
await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith(() => {
// 3 days, 3 chunks, 3 responses of 2 logs, 3 requests
expect(datasource.runQuery).toHaveBeenCalledTimes(3);
});
});
});
});

View File

@ -6,7 +6,7 @@ import { LoadingState } from '@grafana/schema';
import { LokiDatasource } from './datasource';
import { getRangeChunks as getLogsRangeChunks } from './logsTimeSplit';
import { getRangeChunks as getMetricRangeChunks } from './metricTimeSplit';
import { combineResponses, isLogsQuery, resultLimitReached } from './queryUtils';
import { combineResponses, isLogsQuery } from './queryUtils';
import { LokiQuery } from './types';
/**
@ -56,6 +56,36 @@ export function partitionTimeRange(
});
}
/**
* Based in the state of the current response, if any, adjust target parameters such as `maxLines`.
* For `maxLines`, we will update it as `maxLines - current amount of lines`.
* At the end, we will filter the targets that don't need to be executed in the next request batch,
* becasue, for example, the `maxLines` have been reached.
*/
function adjustTargetsFromResponseState(targets: LokiQuery[], response: DataQueryResponse | null): LokiQuery[] {
if (!response) {
return targets;
}
return targets
.map((target) => {
if (!target.maxLines || !isLogsQuery(target.expr)) {
return target;
}
const targetFrame = response.data.find((frame) => frame.refId === target.refId);
if (!targetFrame) {
return target;
}
const updatedMaxLines = target.maxLines - targetFrame.length;
return {
...target,
maxLines: updatedMaxLines < 0 ? 0 : updatedMaxLines,
};
})
.filter((target) => target.maxLines === undefined || target.maxLines > 0);
}
export function runPartitionedQuery(datasource: LokiDatasource, request: DataQueryRequest<LokiQuery>) {
let mergedResponse: DataQueryResponse | null;
const queries = request.targets.filter((query) => !query.hide);
@ -72,8 +102,21 @@ export function runPartitionedQuery(datasource: LokiDatasource, request: DataQue
const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, requestN: number) => {
const requestId = `${request.requestId}_${requestN}`;
const range = partition[requestN - 1];
const targets = adjustTargetsFromResponseState(request.targets, mergedResponse);
const done = (response: DataQueryResponse) => {
response.state = LoadingState.Done;
subscriber.next(response);
subscriber.complete();
};
if (!targets.length && mergedResponse) {
done(mergedResponse);
return;
}
datasource
.runQuery({ ...request, range, requestId })
.runQuery({ ...request, range, requestId, targets })
.pipe(
// in case of an empty query, this is somehow run twice. `share()` is no workaround here as the observable is generated from `of()`.
map((partialResponse) => {
@ -83,15 +126,13 @@ export function runPartitionedQuery(datasource: LokiDatasource, request: DataQue
)
.subscribe({
next: (response) => {
if (requestN > 1 && resultLimitReached(request, response) === false) {
if (requestN > 1) {
response.state = LoadingState.Streaming;
subscriber.next(response);
runNextRequest(subscriber, requestN - 1);
return;
}
response.state = LoadingState.Done;
subscriber.next(response);
subscriber.complete();
done(response);
},
error: (error) => {
subscriber.error(error);

View File

@ -1,6 +1,4 @@
import { getQueryOptions } from 'test/helpers/getQueryOptions';
import { ArrayVector, DataQueryResponse, FieldType } from '@grafana/data';
import { ArrayVector, DataQueryResponse } from '@grafana/data';
import { logFrameA, logFrameB, metricFrameA, metricFrameB } from './mocks';
import {
@ -13,7 +11,6 @@ import {
parseToNodeNamesArray,
getParserFromQuery,
obfuscate,
resultLimitReached,
combineResponses,
} from './queryUtils';
import { LokiQuery, LokiQueryType } from './types';
@ -300,52 +297,6 @@ describe('getParserFromQuery', () => {
});
});
describe('resultLimitReached', () => {
const result = {
data: [
{
name: 'test',
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: new ArrayVector([1, 2]),
},
{
name: 'Line',
type: FieldType.string,
config: {},
values: new ArrayVector(['line1', 'line2']),
},
],
length: 2,
},
],
};
it('returns false for non-logs queries', () => {
const request = getQueryOptions<LokiQuery>({
targets: [{ expr: 'count_over_time({a="b"}[1m])', refId: 'A', maxLines: 0 }],
});
expect(resultLimitReached(request, result)).toBe(false);
});
it('returns false when the limit is not reached', () => {
const request = getQueryOptions<LokiQuery>({
targets: [{ expr: '{a="b"}', refId: 'A', maxLines: 3 }],
});
expect(resultLimitReached(request, result)).toBe(false);
});
it('returns true when the limit is reached', () => {
const request = getQueryOptions<LokiQuery>({
targets: [{ expr: '{a="b"}', refId: 'A', maxLines: 2 }],
});
expect(resultLimitReached(request, result)).toBe(true);
});
});
describe('combineResponses', () => {
it('combines logs frames', () => {
const responseA: DataQueryResponse = {

View File

@ -1,7 +1,7 @@
import { SyntaxNode } from '@lezer/common';
import { escapeRegExp } from 'lodash';
import { DataQueryRequest, DataQueryResponse, DataQueryResponseData, QueryResultMetaStat } from '@grafana/data';
import { DataQueryResponse, DataQueryResponseData, QueryResultMetaStat } from '@grafana/data';
import {
parser,
LineFilter,
@ -359,25 +359,3 @@ function combineMetadata(dest: DataQueryResponseData = {}, source: DataQueryResp
}
});
}
/**
* Checks if the current response has reached the requested amount of results or not.
* For log queries, we will ensure that the current amount of results doesn't go beyond `maxLines`.
*/
export function resultLimitReached(request: DataQueryRequest<LokiQuery>, result: DataQueryResponse) {
const logRequests = request.targets.filter((target) => isLogsQuery(target.expr));
if (logRequests.length === 0) {
return false;
}
for (const request of logRequests) {
for (const frame of result.data) {
if (request.maxLines && frame?.fields[0].values.length >= request.maxLines) {
return true;
}
}
}
return false;
}