Query Splitting: Add support for multiple queries (#63663)

* Range splitting: group metric and logs queries

* Range splitting: intercalate logs and metric queries when > 1 queries

* Range splitting: fix possibly undefined error and remove console log

* Range splitting: update test imports

* Range splitting: add unit tests for multiple queries

* Query splitting: use lodash partition function

* Chore: rename variable

* Chore: attempt to improve readability
This commit is contained in:
Matias Chomicki 2023-03-02 10:32:45 +01:00 committed by GitHub
parent f3e08a3560
commit 40ac0fa14b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 144 additions and 61 deletions

View File

@ -33,7 +33,7 @@ import { CustomVariableModel } from '../../../features/variables/types';
import { LokiDatasource, REF_ID_DATA_SAMPLES } from './datasource';
import { createLokiDatasource, createMetadataRequest } from './mocks';
import { runPartitionedQuery } from './querySplitting';
import { runPartitionedQueries } from './querySplitting';
import { parseToNodeNamesArray } from './queryUtils';
import { LokiOptions, LokiQuery, LokiQueryType, LokiVariableQueryType, SupportingQueryType } from './types';
import { LokiVariableSupport } from './variables';
@ -1105,7 +1105,7 @@ describe('LokiDatasource', () => {
describe('Query splitting', () => {
beforeAll(() => {
config.featureToggles.lokiQuerySplitting = true;
jest.mocked(runPartitionedQuery).mockReturnValue(
jest.mocked(runPartitionedQueries).mockReturnValue(
of({
data: [],
})
@ -1131,7 +1131,7 @@ describe('LokiDatasource', () => {
});
await expect(ds.query(query)).toEmitValuesWith(() => {
expect(runPartitionedQuery).toHaveBeenCalled();
expect(runPartitionedQueries).toHaveBeenCalled();
});
});
});

View File

@ -68,7 +68,7 @@ import {
getLabelFilterPositions,
} from './modifyQuery';
import { getQueryHints } from './queryHints';
import { runPartitionedQuery } from './querySplitting';
import { runPartitionedQueries } from './querySplitting';
import {
getLogQueryFromMetricsQuery,
getNormalizedLokiQuery,
@ -285,7 +285,7 @@ export class LokiDatasource
}
if (config.featureToggles.lokiQuerySplitting && requestSupportsPartitioning(fixedRequest.targets)) {
return runPartitionedQuery(this, fixedRequest);
return runPartitionedQueries(this, fixedRequest);
}
return this.runQuery(fixedRequest);

View File

@ -8,10 +8,10 @@ import { LokiDatasource } from './datasource';
import * as logsTimeSplit from './logsTimeSplit';
import * as metricTimeSplit from './metricTimeSplit';
import { createLokiDatasource, getMockFrames } from './mocks';
import { runPartitionedQuery } from './querySplitting';
import { runPartitionedQueries } from './querySplitting';
import { LokiQuery } from './types';
describe('runPartitionedQuery()', () => {
describe('runPartitionedQueries()', () => {
let datasource: LokiDatasource;
const range = {
from: dateTime('2023-02-08T05:00:00.000Z'),
@ -31,7 +31,7 @@ describe('runPartitionedQuery()', () => {
});
test('Splits datasource queries', async () => {
await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith(() => {
await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
// 3 days, 3 chunks, 3 requests.
expect(datasource.runQuery).toHaveBeenCalledTimes(3);
});
@ -41,7 +41,7 @@ describe('runPartitionedQuery()', () => {
jest
.spyOn(datasource, 'runQuery')
.mockReturnValue(of({ state: LoadingState.Error, error: { refId: 'A', message: 'Error' }, data: [] }));
await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith((values) => {
await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith((values) => {
expect(values).toEqual([{ refId: 'A', message: 'Error' }]);
});
});
@ -63,7 +63,7 @@ describe('runPartitionedQuery()', () => {
jest.mocked(metricTimeSplit.getRangeChunks).mockRestore();
});
test('Ignores hidden queries', async () => {
await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith(() => {
await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
expect(logsTimeSplit.getRangeChunks).toHaveBeenCalled();
expect(metricTimeSplit.getRangeChunks).not.toHaveBeenCalled();
});
@ -80,14 +80,14 @@ describe('runPartitionedQuery()', () => {
jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [logFrameA], refId: 'A' }));
});
test('Stops requesting once maxLines of logs have been received', async () => {
await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith(() => {
await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
// 3 days, 3 chunks, 2 responses of 2 logs, 2 requests
expect(datasource.runQuery).toHaveBeenCalledTimes(2);
});
});
test('Performs all the requests if maxLines has not been reached', async () => {
request.targets[0].maxLines = 9999;
await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith(() => {
await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
// 3 days, 3 chunks, 3 responses of 2 logs, 3 requests
expect(datasource.runQuery).toHaveBeenCalledTimes(3);
});
@ -95,10 +95,72 @@ describe('runPartitionedQuery()', () => {
test('Performs all the requests if not a log query', async () => {
request.targets[0].maxLines = 1;
request.targets[0].expr = 'count_over_time({a="b"}[1m])';
await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith(() => {
await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
// 3 days, 3 chunks, 3 responses of 2 logs, 3 requests
expect(datasource.runQuery).toHaveBeenCalledTimes(3);
});
});
});
describe('Splitting multiple targets', () => {
beforeEach(() => {
jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [], refId: 'A' }));
});
test('Sends logs and metric queries individually', async () => {
const request = getQueryOptions<LokiQuery>({
targets: [
{ expr: '{a="b"}', refId: 'A' },
{ expr: 'count_over_time({a="b"}[1m])', refId: 'B' },
],
range,
});
await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
// 3 days, 3 chunks, 1x Metric + 1x Log, 6 requests.
expect(datasource.runQuery).toHaveBeenCalledTimes(6);
});
});
test('Groups metric queries', async () => {
const request = getQueryOptions<LokiQuery>({
targets: [
{ expr: 'count_over_time({a="b"}[1m])', refId: 'A' },
{ expr: 'count_over_time({c="d"}[1m])', refId: 'B' },
],
range,
});
await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
// 3 days, 3 chunks, 1x2 Metric, 3 requests.
expect(datasource.runQuery).toHaveBeenCalledTimes(3);
});
});
test('Groups logs queries', async () => {
const request = getQueryOptions<LokiQuery>({
targets: [
{ expr: '{a="b"}', refId: 'A' },
{ expr: '{c="d"}', refId: 'B' },
],
range,
});
await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
// 3 days, 3 chunks, 1x2 Logs, 3 requests.
expect(datasource.runQuery).toHaveBeenCalledTimes(3);
});
});
test('Respects maxLines of logs queries', async () => {
const { logFrameA } = getMockFrames();
const request = getQueryOptions<LokiQuery>({
targets: [
{ expr: '{a="b"}', refId: 'A', maxLines: logFrameA.fields[0].values.length },
{ expr: 'count_over_time({a="b"}[1m])', refId: 'B' },
],
range,
});
jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [], refId: 'B' }));
jest.spyOn(datasource, 'runQuery').mockReturnValueOnce(of({ data: [logFrameA], refId: 'A' }));
await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
// 3 days, 3 chunks, 1x Logs + 3x Metric, 3 requests.
expect(datasource.runQuery).toHaveBeenCalledTimes(4);
});
});
});
});

View File

@ -1,3 +1,4 @@
import { partition } from 'lodash';
import { Subscriber, Observable, Subscription } from 'rxjs';
import { DataQueryRequest, DataQueryResponse, dateTime, TimeRange } from '@grafana/data';
@ -86,31 +87,20 @@ function adjustTargetsFromResponseState(targets: LokiQuery[], response: DataQuer
.filter((target) => target.maxLines === undefined || target.maxLines > 0);
}
export function runPartitionedQuery(datasource: LokiDatasource, request: DataQueryRequest<LokiQuery>) {
type LokiGroupedRequest = Array<{ request: DataQueryRequest<LokiQuery>; partition: TimeRange[] }>;
export function runGroupedQueries(datasource: LokiDatasource, requests: LokiGroupedRequest) {
let mergedResponse: DataQueryResponse | null;
const queries = request.targets.filter((query) => !query.hide);
// we assume there is just a single query in the request
const query = queries[0];
const partition = partitionTimeRange(
isLogsQuery(query.expr),
request.range,
request.intervalMs,
query.resolution ?? 1
);
const totalRequests = partition.length;
const totalRequests = Math.max(...requests.map(({ partition }) => partition.length));
let shouldStop = false;
let subquerySubsciption: Subscription | null = null;
const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, requestN: number) => {
const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, requestN: number, requestGroup: number) => {
if (shouldStop) {
subscriber.complete();
return;
}
const requestId = `${request.requestId}_${requestN}`;
const range = partition[requestN - 1];
const targets = adjustTargetsFromResponseState(request.targets, mergedResponse);
const done = (response: DataQueryResponse) => {
response.state = LoadingState.Done;
subscriber.next(response);
@ -119,38 +109,47 @@ export function runPartitionedQuery(datasource: LokiDatasource, request: DataQue
const nextRequest = () => {
mergedResponse = mergedResponse || { data: [] };
if (requestN > 1) {
const { nextRequestN, nextRequestGroup } = getNextRequestPointers(requests, requestGroup, requestN);
if (nextRequestN > 0) {
mergedResponse.state = LoadingState.Streaming;
subscriber.next(mergedResponse);
runNextRequest(subscriber, requestN - 1);
runNextRequest(subscriber, nextRequestN, nextRequestGroup);
return;
}
done(mergedResponse);
};
const group = requests[requestGroup];
const requestId = `${group.request.requestId}_${requestN}`;
const range = group.partition[requestN - 1];
const targets = adjustTargetsFromResponseState(group.request.targets, mergedResponse);
if (!targets.length && mergedResponse) {
done(mergedResponse);
nextRequest();
return;
}
subquerySubsciption = datasource.runQuery({ ...request, range, requestId, targets }).subscribe({
next: (partialResponse) => {
if (partialResponse.error) {
subscriber.error(partialResponse.error);
}
mergedResponse = combineResponses(mergedResponse, partialResponse);
},
complete: () => {
nextRequest();
},
error: (error) => {
subscriber.error(error);
},
});
subquerySubsciption = datasource
.runQuery({ ...requests[requestGroup].request, range, requestId, targets })
.subscribe({
next: (partialResponse) => {
if (partialResponse.error) {
subscriber.error(partialResponse.error);
}
mergedResponse = combineResponses(mergedResponse, partialResponse);
},
complete: () => {
nextRequest();
},
error: (error) => {
subscriber.error(error);
},
});
};
const response = new Observable<DataQueryResponse>((subscriber) => {
runNextRequest(subscriber, totalRequests);
runNextRequest(subscriber, totalRequests, 0);
return () => {
shouldStop = true;
if (subquerySubsciption != null) {
@ -161,3 +160,37 @@ export function runPartitionedQuery(datasource: LokiDatasource, request: DataQue
return response;
}
function getNextRequestPointers(requests: LokiGroupedRequest, requestGroup: number, requestN: number) {
// There's a pending request from the next group:
if (requests[requestGroup + 1]?.partition[requestN - 1]) {
return {
nextRequestGroup: requestGroup + 1,
nextRequestN: requestN,
};
}
return {
nextRequestGroup: 0,
nextRequestN: requestN - 1,
};
}
export function runPartitionedQueries(datasource: LokiDatasource, request: DataQueryRequest<LokiQuery>) {
const queries = request.targets.filter((query) => !query.hide);
const [logQueries, metricQueries] = partition(queries, (query) => isLogsQuery(query.expr));
const requests = [];
if (logQueries.length) {
requests.push({
request: { ...request, targets: logQueries },
partition: partitionTimeRange(true, request.range, request.intervalMs, logQueries[0].resolution ?? 1),
});
}
if (metricQueries.length) {
requests.push({
request: { ...request, targets: metricQueries },
partition: partitionTimeRange(false, request.range, request.intervalMs, metricQueries[0].resolution ?? 1),
});
}
return runGroupedQueries(datasource, requests);
}

View File

@ -305,26 +305,14 @@ export function getStreamSelectorsFromQuery(query: string): string[] {
}
export function requestSupportsPartitioning(allQueries: LokiQuery[]) {
const queries = allQueries.filter((query) => !query.hide);
/*
* For now, we only split if there is a single query.
* - we do not split for zero queries
* - we do not split for multiple queries
*/
if (queries.length !== 1) {
return false;
}
const queries = allQueries.filter((query) => !query.hide).filter((query) => !query.refId.includes('do-not-chunk'));
const instantQueries = queries.some((query) => query.queryType === LokiQueryType.Instant);
if (instantQueries) {
return false;
}
if (queries[0].refId.includes('do-not-chunk')) {
return false;
}
return true;
return queries.length > 0;
}
export function combineResponses(currentResult: DataQueryResponse | null, newResult: DataQueryResponse) {