From 94241f6676cf36824a180c11c2824ad41c81a470 Mon Sep 17 00:00:00 2001 From: Matias Chomicki Date: Thu, 9 Feb 2023 18:27:02 +0100 Subject: [PATCH] Loki Query Splitting: Split queries into sub-queries with smaller time interval (#62767) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Range splitting: range splitting function * Range splitting: experiment with 1 hour splits * Range splitting: reorganize code * Range splitting: improve code readability and meaning * Range splitting: add partition limit to prevent infinite loops * Range splitting: add error handling * Range splitting: disable for logs queries * Range splitting: support any arbitrary time splitting + respect original from/to in the partition * Chore: remove console logs * Chore: delete unused import * Range splitting: actually send requests in sequence * Range splitting: do not split when > 1 query * Range splitting: combine frames * Chore: rename function * split in reverse * polished reversing * keep reference to the right frame in the result * Range splitting: change request state to Streaming * Range splitting: fix moving only 1 unit of time instead of the provided one * Chore: change default parameter to timeShift = 1 * Range splitting: do not split for range queqries * Range splitting: add initial support for log queries * Range splitting: do not use MutableDataFrame It has bad performance and it's not required * Chore: remove unused export * Query Splitting: move to module * loki: split: fix off-by-one error (#62966) loki: split: fix off-by-one loop * Range splitting: disable for logs volume queries * Range splitting: combine any number of fields, not just hardcoded 2 * Range splitting: optimize frame-combining function * Range splitting: further optimize * Range splitting: combine frame length * Range splitting: combine stats * Range splitting: combine stats without assuming the same order * Query splitting: catch and raise errors * Range splitting: create feature flag * Range splitting: implement feature flag * Range splitting: add unit test for datasource query * Range splitting: add basic test for runPartitionedQuery * Range splitting: add unit test for resultLimitReached * Range splitting: test frame merging * Chore: fix unit test --------- Co-authored-by: Sven Grossmann Co-authored-by: Gábor Farkas --- .betterer.results | 6 + .../feature-toggles/index.md | 1 + .../src/types/featureToggles.gen.ts | 1 + pkg/services/featuremgmt/registry.go | 6 + pkg/services/featuremgmt/toggles_gen.go | 4 + .../datasource/loki/datasource.test.ts | 40 ++++- .../app/plugins/datasource/loki/datasource.ts | 28 ++- .../datasource/loki/metricTimeSplit.test.ts | 29 ++++ .../datasource/loki/metricTimeSplit.ts | 58 +++++++ public/app/plugins/datasource/loki/mocks.ts | 142 ++++++++++++++- .../datasource/loki/querySplitting.test.ts | 35 ++++ .../plugins/datasource/loki/querySplitting.ts | 93 ++++++++++ .../datasource/loki/queryUtils.test.ts | 161 ++++++++++++++++++ .../app/plugins/datasource/loki/queryUtils.ts | 92 ++++++++++ 14 files changed, 685 insertions(+), 11 deletions(-) create mode 100644 public/app/plugins/datasource/loki/metricTimeSplit.test.ts create mode 100644 public/app/plugins/datasource/loki/metricTimeSplit.ts create mode 100644 public/app/plugins/datasource/loki/querySplitting.test.ts create mode 100644 public/app/plugins/datasource/loki/querySplitting.ts diff --git a/.betterer.results b/.betterer.results index 21e2d29019f..cbfe4b579d2 100644 --- a/.betterer.results +++ b/.betterer.results @@ -6032,6 +6032,12 @@ exports[`better eslint`] = { "public/app/plugins/datasource/loki/getDerivedFields.ts:5381": [ [0, 0, 0, "Unexpected any. Specify a different type.", "0"] ], + "public/app/plugins/datasource/loki/querySplitting.ts:5381": [ + [0, 0, 0, "Do not use any type assertions.", "0"], + [0, 0, 0, "Unexpected any. Specify a different type.", "1"], + [0, 0, 0, "Do not use any type assertions.", "2"], + [0, 0, 0, "Unexpected any. Specify a different type.", "3"] + ], "public/app/plugins/datasource/loki/querybuilder/binaryScalarOperations.ts:5381": [ [0, 0, 0, "Unexpected any. Specify a different type.", "0"] ], diff --git a/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md b/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md index 85e04057306..f33947e010d 100644 --- a/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md +++ b/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md @@ -91,6 +91,7 @@ Alpha features might be changed or removed without prior notice. | `alertingBacktesting` | Rule backtesting API for alerting | | `editPanelCSVDragAndDrop` | Enables drag and drop for CSV and Excel files | | `logsContextDatasourceUi` | Allow datasource to provide custom UI for context view | +| `lokiQuerySplitting` | Split large interval queries into subqueries with smaller time intervals | ## Development feature toggles diff --git a/packages/grafana-data/src/types/featureToggles.gen.ts b/packages/grafana-data/src/types/featureToggles.gen.ts index 255e31de55d..4c3281c3657 100644 --- a/packages/grafana-data/src/types/featureToggles.gen.ts +++ b/packages/grafana-data/src/types/featureToggles.gen.ts @@ -83,4 +83,5 @@ export interface FeatureToggles { topNavCommandPalette?: boolean; logsSampleInExplore?: boolean; logsContextDatasourceUi?: boolean; + lokiQuerySplitting?: boolean; } diff --git a/pkg/services/featuremgmt/registry.go b/pkg/services/featuremgmt/registry.go index f14fde64bd7..323a46327ad 100644 --- a/pkg/services/featuremgmt/registry.go +++ b/pkg/services/featuremgmt/registry.go @@ -378,5 +378,11 @@ var ( State: FeatureStateAlpha, FrontendOnly: true, }, + { + Name: "lokiQuerySplitting", + Description: "Split large interval queries into subqueries with smaller time intervals", + State: FeatureStateAlpha, + FrontendOnly: true, + }, } ) diff --git a/pkg/services/featuremgmt/toggles_gen.go b/pkg/services/featuremgmt/toggles_gen.go index 35cbddb10ee..931e2f2a063 100644 --- a/pkg/services/featuremgmt/toggles_gen.go +++ b/pkg/services/featuremgmt/toggles_gen.go @@ -274,4 +274,8 @@ const ( // FlagLogsContextDatasourceUi // Allow datasource to provide custom UI for context view FlagLogsContextDatasourceUi = "logsContextDatasourceUi" + + // FlagLokiQuerySplitting + // Split large interval queries into subqueries with smaller time intervals + FlagLokiQuerySplitting = "lokiQuerySplitting" ) diff --git a/public/app/plugins/datasource/loki/datasource.test.ts b/public/app/plugins/datasource/loki/datasource.test.ts index 1b19a7acb6d..f77c804065a 100644 --- a/public/app/plugins/datasource/loki/datasource.test.ts +++ b/public/app/plugins/datasource/loki/datasource.test.ts @@ -20,6 +20,7 @@ import { import { BackendSrv, BackendSrvRequest, + config, FetchResponse, getBackendSrv, reportInteraction, @@ -32,7 +33,8 @@ import { CustomVariableModel } from '../../../features/variables/types'; import { LokiDatasource, REF_ID_DATA_SAMPLES } from './datasource'; import { createLokiDatasource, createMetadataRequest } from './mocks'; -import { parseToNodeNamesArray } from './queryUtils'; +import { runPartitionedQuery } from './querySplitting'; +import { parseToNodeNamesArray, requestSupportsPartitioning } from './queryUtils'; import { LokiOptions, LokiQuery, LokiQueryType, LokiVariableQueryType, SupportingQueryType } from './types'; import { LokiVariableSupport } from './variables'; @@ -43,6 +45,15 @@ jest.mock('@grafana/runtime', () => { }; }); +jest.mock('./queryUtils', () => { + return { + ...jest.requireActual('./queryUtils'), + requestSupportsPartitioning: jest.fn(), + }; +}); + +jest.mock('./querySplitting'); + const templateSrvStub = { getAdhocFilters: jest.fn(() => [] as unknown[]), replace: jest.fn((a: string, ...rest: unknown[]) => a), @@ -1097,6 +1108,33 @@ describe('LokiDatasource', () => { ); }); }); + + describe('Query splitting', () => { + beforeAll(() => { + jest.mocked(requestSupportsPartitioning).mockReturnValue(true); + config.featureToggles.lokiQuerySplitting = true; + jest.mocked(runPartitionedQuery).mockReturnValue( + of({ + data: [], + }) + ); + }); + afterAll(() => { + config.featureToggles.lokiQuerySplitting = false; + jest.mocked(requestSupportsPartitioning).mockReturnValue(false); + }); + it('supports query splitting when the requirements are met', async () => { + const ds = createLokiDatasource(templateSrvStub); + const query = getQueryOptions({ + targets: [{ expr: 'count_over_time({a="b"}[1m])', refId: 'A' }], + app: CoreApp.Dashboard, + }); + + await expect(ds.query(query)).toEmitValuesWith(() => { + expect(runPartitionedQuery).toHaveBeenCalled(); + }); + }); + }); }); describe('applyTemplateVariables', () => { diff --git a/public/app/plugins/datasource/loki/datasource.ts b/public/app/plugins/datasource/loki/datasource.ts index b1da90ab4d4..86e933f1a55 100644 --- a/public/app/plugins/datasource/loki/datasource.ts +++ b/public/app/plugins/datasource/loki/datasource.ts @@ -68,6 +68,7 @@ import { getLabelFilterPositions, } from './modifyQuery'; import { getQueryHints } from './queryHints'; +import { runPartitionedQuery } from './querySplitting'; import { getLogQueryFromMetricsQuery, getNormalizedLokiQuery, @@ -75,6 +76,7 @@ import { getParserFromQuery, isLogsQuery, isValidQuery, + requestSupportsPartitioning, } from './queryUtils'; import { sortDataFrameByTime } from './sortDataFrame'; import { doLokiChannelStream } from './streaming'; @@ -280,16 +282,24 @@ export class LokiDatasource if (fixedRequest.liveStreaming) { return this.runLiveQueryThroughBackend(fixedRequest); - } else { - const startTime = new Date(); - return super.query(fixedRequest).pipe( - // in case of an empty query, this is somehow run twice. `share()` is no workaround here as the observable is generated from `of()`. - map((response) => - transformBackendResult(response, fixedRequest.targets, this.instanceSettings.jsonData.derivedFields ?? []) - ), - tap((response) => trackQuery(response, fixedRequest, startTime)) - ); } + + if (config.featureToggles.lokiQuerySplitting && requestSupportsPartitioning(fixedRequest.targets)) { + return runPartitionedQuery(this, fixedRequest); + } + + return this.runQuery(fixedRequest); + } + + runQuery(fixedRequest: DataQueryRequest & { targets: LokiQuery[] }) { + const startTime = new Date(); + return super.query(fixedRequest).pipe( + // in case of an empty query, this is somehow run twice. `share()` is no workaround here as the observable is generated from `of()`. + map((response) => + transformBackendResult(response, fixedRequest.targets, this.instanceSettings.jsonData.derivedFields ?? []) + ), + tap((response) => trackQuery(response, fixedRequest, startTime)) + ); } runLiveQueryThroughBackend(request: DataQueryRequest): Observable { diff --git a/public/app/plugins/datasource/loki/metricTimeSplit.test.ts b/public/app/plugins/datasource/loki/metricTimeSplit.test.ts new file mode 100644 index 00000000000..1247457bbc2 --- /dev/null +++ b/public/app/plugins/datasource/loki/metricTimeSplit.test.ts @@ -0,0 +1,29 @@ +import { getRanges } from './metricTimeSplit'; + +describe('querySplit', () => { + it('should split time range into chunks', () => { + const start = Date.parse('2022-02-06T14:10:03'); + const end = Date.parse('2022-02-06T14:11:03'); + const step = 10 * 1000; + + expect(getRanges(start, end, step, 25000)).toStrictEqual([ + [Date.parse('2022-02-06T14:10:00'), Date.parse('2022-02-06T14:10:10')], + [Date.parse('2022-02-06T14:10:20'), Date.parse('2022-02-06T14:10:40')], + [Date.parse('2022-02-06T14:10:50'), Date.parse('2022-02-06T14:11:10')], + ]); + }); + + it('should return null if too many chunks would be generated', () => { + const start = Date.parse('2022-02-06T14:10:03'); + const end = Date.parse('2022-02-06T14:35:01'); + const step = 10 * 1000; + expect(getRanges(start, end, step, 20000)).toBeNull(); + }); + + it('should return null if requested duration is smaller than step', () => { + const start = Date.parse('2022-02-06T14:10:03'); + const end = Date.parse('2022-02-06T14:10:33'); + const step = 10 * 1000; + expect(getRanges(start, end, step, 1000)).toBeNull(); + }); +}); diff --git a/public/app/plugins/datasource/loki/metricTimeSplit.ts b/public/app/plugins/datasource/loki/metricTimeSplit.ts new file mode 100644 index 00000000000..56aac1cc855 --- /dev/null +++ b/public/app/plugins/datasource/loki/metricTimeSplit.ts @@ -0,0 +1,58 @@ +// every timestamp in this file is a number which contains an unix-timestamp-in-millisecond format, +// like returned by `new Date().getTime()`. this is needed because the "math" +// has to be done on integer numbers. + +// we are trying to be compatible with +// https://github.com/grafana/loki/blob/089ec1b05f5ec15a8851d0e8230153e0eeb4dcec/pkg/querier/queryrange/split_by_interval.go#L327-L336 + +function expandTimeRange(startTime: number, endTime: number, step: number): [number, number] { + // startTime is decreased to the closes multiple-of-step, if necessary + const newStartTime = startTime - (startTime % step); + + // endTime is increased to the closed multiple-of-step, if necessary + let newEndTime = endTime; + const endStepMod = endTime % step; + if (endStepMod !== 0) { + newEndTime += step - endStepMod; + } + + return [newStartTime, newEndTime]; +} + +const MAX_CHUNK_COUNT = 50; + +export function getRanges( + startTime: number, + endTime: number, + step: number, + idealRangeDuration: number +): Array<[number, number]> | null { + if (idealRangeDuration < step) { + // we cannot create chunks smaller than `step` + return null; + } + + // we make the duration a multiple of `step`, lowering it if necessary + const alignedDuration = Math.trunc(idealRangeDuration / step) * step; + + const [alignedStartTime, alignedEndTime] = expandTimeRange(startTime, endTime, step); + + const result: Array<[number, number]> = []; + + // we iterate it from the end, because we want to have the potentially smaller chunk at the end, not at the beginning + for (let chunkEndTime = alignedEndTime; chunkEndTime > alignedStartTime; chunkEndTime -= alignedDuration + step) { + // when we get close to the start of the time range, we need to be sure not + // to cross over the startTime + const chunkStartTime = Math.max(chunkEndTime - alignedDuration, alignedStartTime); + result.push([chunkStartTime, chunkEndTime]); + + if (result.length > MAX_CHUNK_COUNT) { + return null; + } + } + + // because we walked backwards, we need to reverse the array + result.reverse(); + + return result; +} diff --git a/public/app/plugins/datasource/loki/mocks.ts b/public/app/plugins/datasource/loki/mocks.ts index 71e64be3894..2db4ae0515f 100644 --- a/public/app/plugins/datasource/loki/mocks.ts +++ b/public/app/plugins/datasource/loki/mocks.ts @@ -1,4 +1,12 @@ -import { DataSourceInstanceSettings, DataSourceSettings, PluginType, toUtc } from '@grafana/data'; +import { + ArrayVector, + DataFrame, + DataSourceInstanceSettings, + DataSourceSettings, + FieldType, + PluginType, + toUtc, +} from '@grafana/data'; import { TemplateSrv } from '@grafana/runtime'; import { getMockDataSource } from '../../../features/datasources/__mocks__'; @@ -99,3 +107,135 @@ export function createMetadataRequest( } }; } + +export const logFrameA: DataFrame = { + refId: 'A', + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: new ArrayVector([3, 4]), + }, + { + name: 'Line', + type: FieldType.string, + config: {}, + values: new ArrayVector(['line1', 'line2']), + }, + { + name: 'labels', + type: FieldType.other, + config: {}, + values: new ArrayVector([ + { + label: 'value', + }, + { + otherLabel: 'other value', + }, + ]), + }, + { + name: 'tsNs', + type: FieldType.string, + config: {}, + values: new ArrayVector(['3000000', '4000000']), + }, + { + name: 'id', + type: FieldType.string, + config: {}, + values: new ArrayVector(['id1', 'id2']), + }, + ], + length: 2, +}; + +export const logFrameB: DataFrame = { + refId: 'A', + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: new ArrayVector([1, 2]), + }, + { + name: 'Line', + type: FieldType.string, + config: {}, + values: new ArrayVector(['line3', 'line4']), + }, + { + name: 'labels', + type: FieldType.other, + config: {}, + values: new ArrayVector([ + { + otherLabel: 'other value', + }, + ]), + }, + { + name: 'tsNs', + type: FieldType.string, + config: {}, + values: new ArrayVector(['1000000', '2000000']), + }, + { + name: 'id', + type: FieldType.string, + config: {}, + values: new ArrayVector(['id3', 'id4']), + }, + ], + meta: { + stats: [{ displayName: 'Ingester: total reached', value: 1 }], + }, + length: 2, +}; + +export const metricFrameA: DataFrame = { + refId: 'A', + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: new ArrayVector([3000000, 4000000]), + }, + { + name: 'Value', + type: FieldType.number, + config: {}, + values: new ArrayVector([5, 4]), + }, + ], + meta: { + stats: [{ displayName: 'Ingester: total reached', value: 1 }], + }, + length: 2, +}; + +export const metricFrameB: DataFrame = { + refId: 'A', + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: new ArrayVector([1000000, 2000000]), + }, + { + name: 'Value', + type: FieldType.number, + config: {}, + values: new ArrayVector([6, 7]), + }, + ], + meta: { + stats: [{ displayName: 'Ingester: total reached', value: 2 }], + }, + length: 2, +}; diff --git a/public/app/plugins/datasource/loki/querySplitting.test.ts b/public/app/plugins/datasource/loki/querySplitting.test.ts new file mode 100644 index 00000000000..e76b75fbdca --- /dev/null +++ b/public/app/plugins/datasource/loki/querySplitting.test.ts @@ -0,0 +1,35 @@ +import { of } from 'rxjs'; +import { getQueryOptions } from 'test/helpers/getQueryOptions'; + +import { dateTime } from '@grafana/data'; + +import { LokiDatasource } from './datasource'; +import { createLokiDatasource } from './mocks'; +import { runPartitionedQuery } from './querySplitting'; +import { LokiQuery } from './types'; + +describe('runPartitionedQuery()', () => { + let datasource: LokiDatasource; + const request = getQueryOptions({ + targets: [{ expr: 'count_over_time({a="b"}[1m])', refId: 'A' }], + range: { + from: dateTime('2023-02-08T05:00:00.000Z'), + to: dateTime('2023-02-10T06:00:00.000Z'), + raw: { + from: dateTime('2023-02-08T05:00:00.000Z'), + to: dateTime('2023-02-10T06:00:00.000Z'), + }, + }, + }); + beforeEach(() => { + datasource = createLokiDatasource(); + jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [] })); + }); + + test('Splits datasource queries', async () => { + await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith(() => { + // 3 days, 3 chunks, 3 requests. + expect(datasource.runQuery).toHaveBeenCalledTimes(3); + }); + }); +}); diff --git a/public/app/plugins/datasource/loki/querySplitting.ts b/public/app/plugins/datasource/loki/querySplitting.ts new file mode 100644 index 00000000000..08ad8e4f96f --- /dev/null +++ b/public/app/plugins/datasource/loki/querySplitting.ts @@ -0,0 +1,93 @@ +import { Subscriber, map, Observable } from 'rxjs'; + +import { DataQueryRequest, DataQueryResponse, dateTime, TimeRange } from '@grafana/data'; +import { LoadingState } from '@grafana/schema'; + +import { LokiDatasource } from './datasource'; +import { getRanges } from './metricTimeSplit'; +import { combineResponses, resultLimitReached } from './queryUtils'; +import { LokiQuery } from './types'; + +/** + * Purposely exposing it to support doing tests without needing to update the repo. + * TODO: remove. + * Hardcoded to 1 day. + */ +(window as any).lokiChunkDuration = 24 * 60 * 60 * 1000; + +export function partitionTimeRange(originalTimeRange: TimeRange, intervalMs: number, resolution: number): TimeRange[] { + // we currently assume we are only running metric queries here. + // for logs-queries we will have to use a different time-range-split algorithm. + + // the `step` value that will be finally sent to Loki is rougly the same as `intervalMs`, + // but there are some complications. + // we need to replicate this algo: + // + // https://github.com/grafana/grafana/blob/main/pkg/tsdb/loki/step.go#L23 + + const start = originalTimeRange.from.toDate().getTime(); + const end = originalTimeRange.to.toDate().getTime(); + + const safeStep = Math.ceil((end - start) / 11000); + const step = Math.max(intervalMs * resolution, safeStep); + + const ranges = getRanges(start, end, step, (window as any).lokiChunkDuration); + + // if the split was not possible, go with the original range + if (ranges == null) { + return [originalTimeRange]; + } + + return ranges.map(([start, end]) => { + const from = dateTime(start); + const to = dateTime(end); + return { + from, + to, + raw: { from, to }, + }; + }); +} + +export function runPartitionedQuery(datasource: LokiDatasource, request: DataQueryRequest) { + let mergedResponse: DataQueryResponse | null; + // FIXME: the following line assumes every query has the same resolution + const partition = partitionTimeRange(request.range, request.intervalMs, request.targets[0].resolution ?? 1); + const totalRequests = partition.length; + + const runNextRequest = (subscriber: Subscriber, requestN: number) => { + const requestId = `${request.requestId}_${requestN}`; + const range = partition[requestN - 1]; + datasource + .runQuery({ ...request, range, requestId }) + .pipe( + // in case of an empty query, this is somehow run twice. `share()` is no workaround here as the observable is generated from `of()`. + map((partialResponse) => { + mergedResponse = combineResponses(mergedResponse, partialResponse); + return mergedResponse; + }) + ) + .subscribe({ + next: (response) => { + if (requestN > 1 && resultLimitReached(request, response) === false) { + response.state = LoadingState.Streaming; + subscriber.next(response); + runNextRequest(subscriber, requestN - 1); + return; + } + response.state = LoadingState.Done; + subscriber.next(response); + subscriber.complete(); + }, + error: (error) => { + subscriber.error(error); + }, + }); + }; + + const response = new Observable((subscriber) => { + runNextRequest(subscriber, totalRequests); + }); + + return response; +} diff --git a/public/app/plugins/datasource/loki/queryUtils.test.ts b/public/app/plugins/datasource/loki/queryUtils.test.ts index c2ac42d41ae..6c1bfa6e493 100644 --- a/public/app/plugins/datasource/loki/queryUtils.test.ts +++ b/public/app/plugins/datasource/loki/queryUtils.test.ts @@ -1,3 +1,8 @@ +import { getQueryOptions } from 'test/helpers/getQueryOptions'; + +import { ArrayVector, DataQueryResponse, FieldType } from '@grafana/data'; + +import { logFrameA, logFrameB, metricFrameA, metricFrameB } from './mocks'; import { getHighlighterExpressionsFromQuery, getNormalizedLokiQuery, @@ -8,6 +13,8 @@ import { parseToNodeNamesArray, getParserFromQuery, obfuscate, + resultLimitReached, + combineResponses, } from './queryUtils'; import { LokiQuery, LokiQueryType } from './types'; @@ -292,3 +299,157 @@ describe('getParserFromQuery', () => { ); }); }); + +describe('resultLimitReached', () => { + const result = { + data: [ + { + name: 'test', + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: new ArrayVector([1, 2]), + }, + { + name: 'Line', + type: FieldType.string, + config: {}, + values: new ArrayVector(['line1', 'line2']), + }, + ], + length: 2, + }, + ], + }; + it('returns false for non-logs queries', () => { + const request = getQueryOptions({ + targets: [{ expr: 'count_over_time({a="b"}[1m])', refId: 'A', maxLines: 0 }], + }); + + expect(resultLimitReached(request, result)).toBe(false); + }); + it('returns false when the limit is not reached', () => { + const request = getQueryOptions({ + targets: [{ expr: '{a="b"}', refId: 'A', maxLines: 3 }], + }); + + expect(resultLimitReached(request, result)).toBe(false); + }); + it('returns true when the limit is reached', () => { + const request = getQueryOptions({ + targets: [{ expr: '{a="b"}', refId: 'A', maxLines: 2 }], + }); + + expect(resultLimitReached(request, result)).toBe(true); + }); +}); + +describe('combineResponses', () => { + it('combines logs frames', () => { + const responseA: DataQueryResponse = { + data: [logFrameA], + }; + const responseB: DataQueryResponse = { + data: [logFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: new ArrayVector([1, 2, 3, 4]), + }, + { + config: {}, + name: 'Line', + type: 'string', + values: new ArrayVector(['line3', 'line4', 'line1', 'line2']), + }, + { + config: {}, + name: 'labels', + type: 'other', + values: new ArrayVector([ + { + otherLabel: 'other value', + }, + { + label: 'value', + }, + { + otherLabel: 'other value', + }, + ]), + }, + { + config: {}, + name: 'tsNs', + type: 'string', + values: new ArrayVector(['1000000', '2000000', '3000000', '4000000']), + }, + { + config: {}, + name: 'id', + type: 'string', + values: new ArrayVector(['id3', 'id4', 'id1', 'id2']), + }, + ], + length: 4, + meta: { + stats: [ + { + displayName: 'Ingester: total reached', + value: 1, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); + + it('combines metric frames', () => { + const responseA: DataQueryResponse = { + data: [metricFrameA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: new ArrayVector([1000000, 2000000, 3000000, 4000000]), + }, + { + config: {}, + name: 'Value', + type: 'number', + values: new ArrayVector([6, 7, 5, 4]), + }, + ], + length: 4, + meta: { + stats: [ + { + displayName: 'Ingester: total reached', + value: 3, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); +}); diff --git a/public/app/plugins/datasource/loki/queryUtils.ts b/public/app/plugins/datasource/loki/queryUtils.ts index 779215d1c19..fd7a4af9c84 100644 --- a/public/app/plugins/datasource/loki/queryUtils.ts +++ b/public/app/plugins/datasource/loki/queryUtils.ts @@ -1,6 +1,7 @@ import { SyntaxNode } from '@lezer/common'; import { escapeRegExp } from 'lodash'; +import { DataQueryRequest, DataQueryResponse, DataQueryResponseData, QueryResultMetaStat } from '@grafana/data'; import { parser, LineFilter, @@ -295,3 +296,94 @@ export function getStreamSelectorsFromQuery(query: string): string[] { return labelMatchers; } + +export function requestSupportsPartitioning(queries: LokiQuery[]) { + /* + * For now, we would not split when more than 1 query is requested. + */ + if (queries.length > 1) { + return false; + } + + /** + * Disable logs volume queries. + */ + if (queries[0].refId.includes('log-volume-')) { + return false; + } + + if (isLogsQuery(queries[0].expr)) { + return false; + } + + return true; +} + +export function combineResponses(currentResult: DataQueryResponse | null, newResult: DataQueryResponse) { + if (!currentResult) { + return newResult; + } + + newResult.data.forEach((newFrame) => { + const currentFrame = currentResult.data.find((frame) => frame.name === newFrame.name); + if (!currentFrame) { + currentResult.data.push(newFrame); + return; + } + combineFrames(currentFrame, newFrame); + }); + + return currentResult; +} + +function combineFrames(dest: DataQueryResponseData, source: DataQueryResponseData) { + const totalFields = dest.fields.length; + for (let i = 0; i < totalFields; i++) { + dest.fields[i].values.buffer = [].concat.apply(source.fields[i].values.buffer, dest.fields[i].values.buffer); + } + dest.length += source.length; + combineMetadata(dest, source); +} + +function combineMetadata(dest: DataQueryResponseData = {}, source: DataQueryResponseData = {}) { + if (!source.meta?.stats) { + return; + } + if (!dest.meta?.stats) { + if (!dest.meta) { + dest.meta = {}; + } + Object.assign(dest.meta, { stats: source.meta.stats }); + return; + } + dest.meta.stats.forEach((destStat: QueryResultMetaStat, i: number) => { + const sourceStat = source.meta.stats?.find( + (sourceStat: QueryResultMetaStat) => destStat.displayName === sourceStat.displayName + ); + if (sourceStat) { + destStat.value += sourceStat.value; + } + }); +} + +/** + * Checks if the current response has reached the requested amount of results or not. + * For log queries, we will ensure that the current amount of results doesn't go beyond `maxLines`. + */ +export function resultLimitReached(request: DataQueryRequest, result: DataQueryResponse) { + const logRequests = request.targets.filter((target) => isLogsQuery(target.expr)); + + if (logRequests.length === 0) { + return false; + } + + for (const request of logRequests) { + for (const frame of result.data) { + if (request.maxLines && frame?.fields[0].values.length >= request.maxLines) { + return true; + } + } + } + + return false; +}