diff --git a/public/app/plugins/datasource/loki/datasource.test.ts b/public/app/plugins/datasource/loki/datasource.test.ts index 0356244e4d4..b99aeacf8c1 100644 --- a/public/app/plugins/datasource/loki/datasource.test.ts +++ b/public/app/plugins/datasource/loki/datasource.test.ts @@ -33,7 +33,7 @@ import { CustomVariableModel } from '../../../features/variables/types'; import { LokiDatasource, REF_ID_DATA_SAMPLES } from './datasource'; import { createLokiDatasource, createMetadataRequest } from './mocks'; -import { runPartitionedQuery } from './querySplitting'; +import { runPartitionedQueries } from './querySplitting'; import { parseToNodeNamesArray } from './queryUtils'; import { LokiOptions, LokiQuery, LokiQueryType, LokiVariableQueryType, SupportingQueryType } from './types'; import { LokiVariableSupport } from './variables'; @@ -1105,7 +1105,7 @@ describe('LokiDatasource', () => { describe('Query splitting', () => { beforeAll(() => { config.featureToggles.lokiQuerySplitting = true; - jest.mocked(runPartitionedQuery).mockReturnValue( + jest.mocked(runPartitionedQueries).mockReturnValue( of({ data: [], }) @@ -1131,7 +1131,7 @@ describe('LokiDatasource', () => { }); await expect(ds.query(query)).toEmitValuesWith(() => { - expect(runPartitionedQuery).toHaveBeenCalled(); + expect(runPartitionedQueries).toHaveBeenCalled(); }); }); }); diff --git a/public/app/plugins/datasource/loki/datasource.ts b/public/app/plugins/datasource/loki/datasource.ts index a8d13744399..67972b9f04d 100644 --- a/public/app/plugins/datasource/loki/datasource.ts +++ b/public/app/plugins/datasource/loki/datasource.ts @@ -68,7 +68,7 @@ import { getLabelFilterPositions, } from './modifyQuery'; import { getQueryHints } from './queryHints'; -import { runPartitionedQuery } from './querySplitting'; +import { runPartitionedQueries } from './querySplitting'; import { getLogQueryFromMetricsQuery, getNormalizedLokiQuery, @@ -285,7 +285,7 @@ export class LokiDatasource } if (config.featureToggles.lokiQuerySplitting && requestSupportsPartitioning(fixedRequest.targets)) { - return runPartitionedQuery(this, fixedRequest); + return runPartitionedQueries(this, fixedRequest); } return this.runQuery(fixedRequest); diff --git a/public/app/plugins/datasource/loki/querySplitting.test.ts b/public/app/plugins/datasource/loki/querySplitting.test.ts index abbf0c493ee..b4c41658680 100644 --- a/public/app/plugins/datasource/loki/querySplitting.test.ts +++ b/public/app/plugins/datasource/loki/querySplitting.test.ts @@ -8,10 +8,10 @@ import { LokiDatasource } from './datasource'; import * as logsTimeSplit from './logsTimeSplit'; import * as metricTimeSplit from './metricTimeSplit'; import { createLokiDatasource, getMockFrames } from './mocks'; -import { runPartitionedQuery } from './querySplitting'; +import { runPartitionedQueries } from './querySplitting'; import { LokiQuery } from './types'; -describe('runPartitionedQuery()', () => { +describe('runPartitionedQueries()', () => { let datasource: LokiDatasource; const range = { from: dateTime('2023-02-08T05:00:00.000Z'), @@ -31,7 +31,7 @@ describe('runPartitionedQuery()', () => { }); test('Splits datasource queries', async () => { - await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith(() => { + await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { // 3 days, 3 chunks, 3 requests. expect(datasource.runQuery).toHaveBeenCalledTimes(3); }); @@ -41,7 +41,7 @@ describe('runPartitionedQuery()', () => { jest .spyOn(datasource, 'runQuery') .mockReturnValue(of({ state: LoadingState.Error, error: { refId: 'A', message: 'Error' }, data: [] })); - await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith((values) => { + await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith((values) => { expect(values).toEqual([{ refId: 'A', message: 'Error' }]); }); }); @@ -63,7 +63,7 @@ describe('runPartitionedQuery()', () => { jest.mocked(metricTimeSplit.getRangeChunks).mockRestore(); }); test('Ignores hidden queries', async () => { - await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith(() => { + await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { expect(logsTimeSplit.getRangeChunks).toHaveBeenCalled(); expect(metricTimeSplit.getRangeChunks).not.toHaveBeenCalled(); }); @@ -80,14 +80,14 @@ describe('runPartitionedQuery()', () => { jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [logFrameA], refId: 'A' })); }); test('Stops requesting once maxLines of logs have been received', async () => { - await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith(() => { + await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { // 3 days, 3 chunks, 2 responses of 2 logs, 2 requests expect(datasource.runQuery).toHaveBeenCalledTimes(2); }); }); test('Performs all the requests if maxLines has not been reached', async () => { request.targets[0].maxLines = 9999; - await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith(() => { + await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { // 3 days, 3 chunks, 3 responses of 2 logs, 3 requests expect(datasource.runQuery).toHaveBeenCalledTimes(3); }); @@ -95,10 +95,72 @@ describe('runPartitionedQuery()', () => { test('Performs all the requests if not a log query', async () => { request.targets[0].maxLines = 1; request.targets[0].expr = 'count_over_time({a="b"}[1m])'; - await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith(() => { + await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { // 3 days, 3 chunks, 3 responses of 2 logs, 3 requests expect(datasource.runQuery).toHaveBeenCalledTimes(3); }); }); }); + + describe('Splitting multiple targets', () => { + beforeEach(() => { + jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [], refId: 'A' })); + }); + test('Sends logs and metric queries individually', async () => { + const request = getQueryOptions({ + targets: [ + { expr: '{a="b"}', refId: 'A' }, + { expr: 'count_over_time({a="b"}[1m])', refId: 'B' }, + ], + range, + }); + await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + // 3 days, 3 chunks, 1x Metric + 1x Log, 6 requests. + expect(datasource.runQuery).toHaveBeenCalledTimes(6); + }); + }); + test('Groups metric queries', async () => { + const request = getQueryOptions({ + targets: [ + { expr: 'count_over_time({a="b"}[1m])', refId: 'A' }, + { expr: 'count_over_time({c="d"}[1m])', refId: 'B' }, + ], + range, + }); + await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + // 3 days, 3 chunks, 1x2 Metric, 3 requests. + expect(datasource.runQuery).toHaveBeenCalledTimes(3); + }); + }); + test('Groups logs queries', async () => { + const request = getQueryOptions({ + targets: [ + { expr: '{a="b"}', refId: 'A' }, + { expr: '{c="d"}', refId: 'B' }, + ], + range, + }); + await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + // 3 days, 3 chunks, 1x2 Logs, 3 requests. + expect(datasource.runQuery).toHaveBeenCalledTimes(3); + }); + }); + test('Respects maxLines of logs queries', async () => { + const { logFrameA } = getMockFrames(); + const request = getQueryOptions({ + targets: [ + { expr: '{a="b"}', refId: 'A', maxLines: logFrameA.fields[0].values.length }, + { expr: 'count_over_time({a="b"}[1m])', refId: 'B' }, + ], + range, + }); + jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [], refId: 'B' })); + jest.spyOn(datasource, 'runQuery').mockReturnValueOnce(of({ data: [logFrameA], refId: 'A' })); + + await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => { + // 3 days, 3 chunks, 1x Logs + 3x Metric, 3 requests. + expect(datasource.runQuery).toHaveBeenCalledTimes(4); + }); + }); + }); }); diff --git a/public/app/plugins/datasource/loki/querySplitting.ts b/public/app/plugins/datasource/loki/querySplitting.ts index c4245b6872f..987716eae56 100644 --- a/public/app/plugins/datasource/loki/querySplitting.ts +++ b/public/app/plugins/datasource/loki/querySplitting.ts @@ -1,3 +1,4 @@ +import { partition } from 'lodash'; import { Subscriber, Observable, Subscription } from 'rxjs'; import { DataQueryRequest, DataQueryResponse, dateTime, TimeRange } from '@grafana/data'; @@ -86,31 +87,20 @@ function adjustTargetsFromResponseState(targets: LokiQuery[], response: DataQuer .filter((target) => target.maxLines === undefined || target.maxLines > 0); } -export function runPartitionedQuery(datasource: LokiDatasource, request: DataQueryRequest) { +type LokiGroupedRequest = Array<{ request: DataQueryRequest; partition: TimeRange[] }>; + +export function runGroupedQueries(datasource: LokiDatasource, requests: LokiGroupedRequest) { let mergedResponse: DataQueryResponse | null; - const queries = request.targets.filter((query) => !query.hide); - // we assume there is just a single query in the request - const query = queries[0]; - const partition = partitionTimeRange( - isLogsQuery(query.expr), - request.range, - request.intervalMs, - query.resolution ?? 1 - ); - const totalRequests = partition.length; + const totalRequests = Math.max(...requests.map(({ partition }) => partition.length)); let shouldStop = false; let subquerySubsciption: Subscription | null = null; - const runNextRequest = (subscriber: Subscriber, requestN: number) => { + const runNextRequest = (subscriber: Subscriber, requestN: number, requestGroup: number) => { if (shouldStop) { subscriber.complete(); return; } - const requestId = `${request.requestId}_${requestN}`; - const range = partition[requestN - 1]; - const targets = adjustTargetsFromResponseState(request.targets, mergedResponse); - const done = (response: DataQueryResponse) => { response.state = LoadingState.Done; subscriber.next(response); @@ -119,38 +109,47 @@ export function runPartitionedQuery(datasource: LokiDatasource, request: DataQue const nextRequest = () => { mergedResponse = mergedResponse || { data: [] }; - if (requestN > 1) { + const { nextRequestN, nextRequestGroup } = getNextRequestPointers(requests, requestGroup, requestN); + if (nextRequestN > 0) { mergedResponse.state = LoadingState.Streaming; subscriber.next(mergedResponse); - runNextRequest(subscriber, requestN - 1); + + runNextRequest(subscriber, nextRequestN, nextRequestGroup); return; } done(mergedResponse); }; + const group = requests[requestGroup]; + const requestId = `${group.request.requestId}_${requestN}`; + const range = group.partition[requestN - 1]; + const targets = adjustTargetsFromResponseState(group.request.targets, mergedResponse); + if (!targets.length && mergedResponse) { - done(mergedResponse); + nextRequest(); return; } - subquerySubsciption = datasource.runQuery({ ...request, range, requestId, targets }).subscribe({ - next: (partialResponse) => { - if (partialResponse.error) { - subscriber.error(partialResponse.error); - } - mergedResponse = combineResponses(mergedResponse, partialResponse); - }, - complete: () => { - nextRequest(); - }, - error: (error) => { - subscriber.error(error); - }, - }); + subquerySubsciption = datasource + .runQuery({ ...requests[requestGroup].request, range, requestId, targets }) + .subscribe({ + next: (partialResponse) => { + if (partialResponse.error) { + subscriber.error(partialResponse.error); + } + mergedResponse = combineResponses(mergedResponse, partialResponse); + }, + complete: () => { + nextRequest(); + }, + error: (error) => { + subscriber.error(error); + }, + }); }; const response = new Observable((subscriber) => { - runNextRequest(subscriber, totalRequests); + runNextRequest(subscriber, totalRequests, 0); return () => { shouldStop = true; if (subquerySubsciption != null) { @@ -161,3 +160,37 @@ export function runPartitionedQuery(datasource: LokiDatasource, request: DataQue return response; } + +function getNextRequestPointers(requests: LokiGroupedRequest, requestGroup: number, requestN: number) { + // There's a pending request from the next group: + if (requests[requestGroup + 1]?.partition[requestN - 1]) { + return { + nextRequestGroup: requestGroup + 1, + nextRequestN: requestN, + }; + } + return { + nextRequestGroup: 0, + nextRequestN: requestN - 1, + }; +} + +export function runPartitionedQueries(datasource: LokiDatasource, request: DataQueryRequest) { + const queries = request.targets.filter((query) => !query.hide); + const [logQueries, metricQueries] = partition(queries, (query) => isLogsQuery(query.expr)); + + const requests = []; + if (logQueries.length) { + requests.push({ + request: { ...request, targets: logQueries }, + partition: partitionTimeRange(true, request.range, request.intervalMs, logQueries[0].resolution ?? 1), + }); + } + if (metricQueries.length) { + requests.push({ + request: { ...request, targets: metricQueries }, + partition: partitionTimeRange(false, request.range, request.intervalMs, metricQueries[0].resolution ?? 1), + }); + } + return runGroupedQueries(datasource, requests); +} diff --git a/public/app/plugins/datasource/loki/queryUtils.ts b/public/app/plugins/datasource/loki/queryUtils.ts index 9d941892e75..e2c3a80092c 100644 --- a/public/app/plugins/datasource/loki/queryUtils.ts +++ b/public/app/plugins/datasource/loki/queryUtils.ts @@ -305,26 +305,14 @@ export function getStreamSelectorsFromQuery(query: string): string[] { } export function requestSupportsPartitioning(allQueries: LokiQuery[]) { - const queries = allQueries.filter((query) => !query.hide); - /* - * For now, we only split if there is a single query. - * - we do not split for zero queries - * - we do not split for multiple queries - */ - if (queries.length !== 1) { - return false; - } + const queries = allQueries.filter((query) => !query.hide).filter((query) => !query.refId.includes('do-not-chunk')); const instantQueries = queries.some((query) => query.queryType === LokiQueryType.Instant); if (instantQueries) { return false; } - if (queries[0].refId.includes('do-not-chunk')) { - return false; - } - - return true; + return queries.length > 0; } export function combineResponses(currentResult: DataQueryResponse | null, newResult: DataQueryResponse) {