Loki Query Splitting: Split queries into sub-queries with smaller time interval (#62767)
* Range splitting: range splitting function
* Range splitting: experiment with 1 hour splits
* Range splitting: reorganize code
* Range splitting: improve code readability and meaning
* Range splitting: add partition limit to prevent infinite loops
* Range splitting: add error handling
* Range splitting: disable for logs queries
* Range splitting: support any arbitrary time splitting + respect original from/to in the partition
* Chore: remove console logs
* Chore: delete unused import
* Range splitting: actually send requests in sequence
* Range splitting: do not split when > 1 query
* Range splitting: combine frames
* Chore: rename function
* Split in reverse
* Polished reversing
* Keep reference to the right frame in the result
* Range splitting: change request state to Streaming
* Range splitting: fix moving only 1 unit of time instead of the provided one
* Chore: change default parameter to timeShift = 1
* Range splitting: do not split for range queries
* Range splitting: add initial support for log queries
* Range splitting: do not use MutableDataFrame (it has bad performance and it's not required)
* Chore: remove unused export
* Query splitting: move to module
* loki: split: fix off-by-one loop (#62966)
* Range splitting: disable for logs volume queries
* Range splitting: combine any number of fields, not just hardcoded 2
* Range splitting: optimize frame-combining function
* Range splitting: further optimize
* Range splitting: combine frame length
* Range splitting: combine stats
* Range splitting: combine stats without assuming the same order
* Query splitting: catch and raise errors
* Range splitting: create feature flag
* Range splitting: implement feature flag
* Range splitting: add unit test for datasource query
* Range splitting: add basic test for runPartitionedQuery
* Range splitting: add unit test for resultLimitReached
* Range splitting: test frame merging
* Chore: fix unit test

---------

Co-authored-by: Sven Grossmann <svennergr@gmail.com>
Co-authored-by: Gábor Farkas <gabor.farkas@gmail.com>
Parent: 1bc43f5faa
Commit: 94241f6676
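Before the diff itself, a quick orientation: when the new `lokiQuerySplitting` feature toggle is enabled and a request qualifies (a single metric query that is neither a logs nor a logs-volume query), `LokiDatasource.query()` now hands the request to `runPartitionedQuery`, which splits the time range into day-sized chunks and runs them sequentially. A hedged sketch of that dispatch, using the real export names from the diff below (the body is illustrative, not the committed code):

```typescript
// Illustrative only: the shape of the dispatch this commit adds to
// LokiDatasource.query(). See the datasource.ts hunk below for the real code.
import { Observable } from 'rxjs';

import { DataQueryRequest, DataQueryResponse } from '@grafana/data';
import { config } from '@grafana/runtime';

import { LokiDatasource } from './datasource';
import { runPartitionedQuery } from './querySplitting';
import { requestSupportsPartitioning } from './queryUtils';
import { LokiQuery } from './types';

export function dispatch(ds: LokiDatasource, request: DataQueryRequest<LokiQuery>): Observable<DataQueryResponse> {
  // split only behind the alpha feature toggle, and only for supported requests
  if (config.featureToggles.lokiQuerySplitting && requestSupportsPartitioning(request.targets)) {
    return runPartitionedQuery(ds, request); // sequential day-sized sub-queries
  }
  return ds.runQuery(request); // the previous single-request path
}
```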
@@ -6032,6 +6032,12 @@ exports[`better eslint`] = {
   "public/app/plugins/datasource/loki/getDerivedFields.ts:5381": [
     [0, 0, 0, "Unexpected any. Specify a different type.", "0"]
   ],
+  "public/app/plugins/datasource/loki/querySplitting.ts:5381": [
+    [0, 0, 0, "Do not use any type assertions.", "0"],
+    [0, 0, 0, "Unexpected any. Specify a different type.", "1"],
+    [0, 0, 0, "Do not use any type assertions.", "2"],
+    [0, 0, 0, "Unexpected any. Specify a different type.", "3"]
+  ],
   "public/app/plugins/datasource/loki/querybuilder/binaryScalarOperations.ts:5381": [
     [0, 0, 0, "Unexpected any. Specify a different type.", "0"]
   ],
@@ -91,6 +91,7 @@ Alpha features might be changed or removed without prior notice.
 | `alertingBacktesting` | Rule backtesting API for alerting |
 | `editPanelCSVDragAndDrop` | Enables drag and drop for CSV and Excel files |
 | `logsContextDatasourceUi` | Allow datasource to provide custom UI for context view |
+| `lokiQuerySplitting` | Split large interval queries into subqueries with smaller time intervals |
 
 ## Development feature toggles
@@ -83,4 +83,5 @@ export interface FeatureToggles {
   topNavCommandPalette?: boolean;
   logsSampleInExplore?: boolean;
   logsContextDatasourceUi?: boolean;
+  lokiQuerySplitting?: boolean;
 }
@@ -378,5 +378,11 @@ var (
 			State:        FeatureStateAlpha,
 			FrontendOnly: true,
 		},
+		{
+			Name:         "lokiQuerySplitting",
+			Description:  "Split large interval queries into subqueries with smaller time intervals",
+			State:        FeatureStateAlpha,
+			FrontendOnly: true,
+		},
 	}
 )
@@ -274,4 +274,8 @@ const (
 	// FlagLogsContextDatasourceUi
 	// Allow datasource to provide custom UI for context view
 	FlagLogsContextDatasourceUi = "logsContextDatasourceUi"
+
+	// FlagLokiQuerySplitting
+	// Split large interval queries into subqueries with smaller time intervals
+	FlagLokiQuerySplitting = "lokiQuerySplitting"
 )
@@ -20,6 +20,7 @@ import {
 import {
   BackendSrv,
   BackendSrvRequest,
+  config,
   FetchResponse,
   getBackendSrv,
   reportInteraction,
@@ -32,7 +33,8 @@ import { CustomVariableModel } from '../../../features/variables/types';
 
 import { LokiDatasource, REF_ID_DATA_SAMPLES } from './datasource';
 import { createLokiDatasource, createMetadataRequest } from './mocks';
-import { parseToNodeNamesArray } from './queryUtils';
+import { runPartitionedQuery } from './querySplitting';
+import { parseToNodeNamesArray, requestSupportsPartitioning } from './queryUtils';
 import { LokiOptions, LokiQuery, LokiQueryType, LokiVariableQueryType, SupportingQueryType } from './types';
 import { LokiVariableSupport } from './variables';
@@ -43,6 +45,15 @@ jest.mock('@grafana/runtime', () => {
   };
 });
 
+jest.mock('./queryUtils', () => {
+  return {
+    ...jest.requireActual('./queryUtils'),
+    requestSupportsPartitioning: jest.fn(),
+  };
+});
+
+jest.mock('./querySplitting');
+
 const templateSrvStub = {
   getAdhocFilters: jest.fn(() => [] as unknown[]),
   replace: jest.fn((a: string, ...rest: unknown[]) => a),
@@ -1097,6 +1108,33 @@ describe('LokiDatasource', () => {
       );
     });
   });
+
+  describe('Query splitting', () => {
+    beforeAll(() => {
+      jest.mocked(requestSupportsPartitioning).mockReturnValue(true);
+      config.featureToggles.lokiQuerySplitting = true;
+      jest.mocked(runPartitionedQuery).mockReturnValue(
+        of({
+          data: [],
+        })
+      );
+    });
+    afterAll(() => {
+      config.featureToggles.lokiQuerySplitting = false;
+      jest.mocked(requestSupportsPartitioning).mockReturnValue(false);
+    });
+    it('supports query splitting when the requirements are met', async () => {
+      const ds = createLokiDatasource(templateSrvStub);
+      const query = getQueryOptions<LokiQuery>({
+        targets: [{ expr: 'count_over_time({a="b"}[1m])', refId: 'A' }],
+        app: CoreApp.Dashboard,
+      });
+
+      await expect(ds.query(query)).toEmitValuesWith(() => {
+        expect(runPartitionedQuery).toHaveBeenCalled();
+      });
+    });
+  });
 });
 
 describe('applyTemplateVariables', () => {
@@ -68,6 +68,7 @@ import {
   getLabelFilterPositions,
 } from './modifyQuery';
 import { getQueryHints } from './queryHints';
+import { runPartitionedQuery } from './querySplitting';
 import {
   getLogQueryFromMetricsQuery,
   getNormalizedLokiQuery,
@@ -75,6 +76,7 @@ import {
   getParserFromQuery,
   isLogsQuery,
   isValidQuery,
+  requestSupportsPartitioning,
 } from './queryUtils';
 import { sortDataFrameByTime } from './sortDataFrame';
 import { doLokiChannelStream } from './streaming';
@@ -280,16 +282,24 @@ export class LokiDatasource
 
     if (fixedRequest.liveStreaming) {
       return this.runLiveQueryThroughBackend(fixedRequest);
-    } else {
-      const startTime = new Date();
-      return super.query(fixedRequest).pipe(
-        // in case of an empty query, this is somehow run twice. `share()` is no workaround here as the observable is generated from `of()`.
-        map((response) =>
-          transformBackendResult(response, fixedRequest.targets, this.instanceSettings.jsonData.derivedFields ?? [])
-        ),
-        tap((response) => trackQuery(response, fixedRequest, startTime))
-      );
-    }
+    }
+
+    if (config.featureToggles.lokiQuerySplitting && requestSupportsPartitioning(fixedRequest.targets)) {
+      return runPartitionedQuery(this, fixedRequest);
+    }
+
+    return this.runQuery(fixedRequest);
   }
 
+  runQuery(fixedRequest: DataQueryRequest<LokiQuery> & { targets: LokiQuery[] }) {
+    const startTime = new Date();
+    return super.query(fixedRequest).pipe(
+      // in case of an empty query, this is somehow run twice. `share()` is no workaround here as the observable is generated from `of()`.
+      map((response) =>
+        transformBackendResult(response, fixedRequest.targets, this.instanceSettings.jsonData.derivedFields ?? [])
+      ),
+      tap((response) => trackQuery(response, fixedRequest, startTime))
+    );
+  }
+
   runLiveQueryThroughBackend(request: DataQueryRequest<LokiQuery>): Observable<DataQueryResponse> {
public/app/plugins/datasource/loki/metricTimeSplit.test.ts (new file, 29 lines)
@@ -0,0 +1,29 @@
+import { getRanges } from './metricTimeSplit';
+
+describe('querySplit', () => {
+  it('should split time range into chunks', () => {
+    const start = Date.parse('2022-02-06T14:10:03');
+    const end = Date.parse('2022-02-06T14:11:03');
+    const step = 10 * 1000;
+
+    expect(getRanges(start, end, step, 25000)).toStrictEqual([
+      [Date.parse('2022-02-06T14:10:00'), Date.parse('2022-02-06T14:10:10')],
+      [Date.parse('2022-02-06T14:10:20'), Date.parse('2022-02-06T14:10:40')],
+      [Date.parse('2022-02-06T14:10:50'), Date.parse('2022-02-06T14:11:10')],
+    ]);
+  });
+
+  it('should return null if too many chunks would be generated', () => {
+    const start = Date.parse('2022-02-06T14:10:03');
+    const end = Date.parse('2022-02-06T14:35:01');
+    const step = 10 * 1000;
+    expect(getRanges(start, end, step, 20000)).toBeNull();
+  });
+
+  it('should return null if requested duration is smaller than step', () => {
+    const start = Date.parse('2022-02-06T14:10:03');
+    const end = Date.parse('2022-02-06T14:10:33');
+    const step = 10 * 1000;
+    expect(getRanges(start, end, step, 1000)).toBeNull();
+  });
+});
public/app/plugins/datasource/loki/metricTimeSplit.ts (new file, 58 lines)
@@ -0,0 +1,58 @@
+// every timestamp in this file is a number which contains a unix-timestamp-in-millisecond format,
+// like returned by `new Date().getTime()`. this is needed because the "math"
+// has to be done on integer numbers.
+
+// we are trying to be compatible with
+// https://github.com/grafana/loki/blob/089ec1b05f5ec15a8851d0e8230153e0eeb4dcec/pkg/querier/queryrange/split_by_interval.go#L327-L336
+
+function expandTimeRange(startTime: number, endTime: number, step: number): [number, number] {
+  // startTime is decreased to the closest multiple-of-step, if necessary
+  const newStartTime = startTime - (startTime % step);
+
+  // endTime is increased to the closest multiple-of-step, if necessary
+  let newEndTime = endTime;
+  const endStepMod = endTime % step;
+  if (endStepMod !== 0) {
+    newEndTime += step - endStepMod;
+  }
+
+  return [newStartTime, newEndTime];
+}
+
+const MAX_CHUNK_COUNT = 50;
+
+export function getRanges(
+  startTime: number,
+  endTime: number,
+  step: number,
+  idealRangeDuration: number
+): Array<[number, number]> | null {
+  if (idealRangeDuration < step) {
+    // we cannot create chunks smaller than `step`
+    return null;
+  }
+
+  // we make the duration a multiple of `step`, lowering it if necessary
+  const alignedDuration = Math.trunc(idealRangeDuration / step) * step;
+
+  const [alignedStartTime, alignedEndTime] = expandTimeRange(startTime, endTime, step);
+
+  const result: Array<[number, number]> = [];
+
+  // we iterate it from the end, because we want to have the potentially smaller chunk at the end, not at the beginning
+  for (let chunkEndTime = alignedEndTime; chunkEndTime > alignedStartTime; chunkEndTime -= alignedDuration + step) {
+    // when we get close to the start of the time range, we need to be sure not
+    // to cross over the startTime
+    const chunkStartTime = Math.max(chunkEndTime - alignedDuration, alignedStartTime);
+    result.push([chunkStartTime, chunkEndTime]);
+
+    if (result.length > MAX_CHUNK_COUNT) {
+      return null;
+    }
+  }
+
+  // because we walked backwards, we need to reverse the array
+  result.reverse();
+
+  return result;
+}
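To make the alignment concrete: `getRanges` first snaps the requested range outward to multiples of `step`, then carves it into chunks of at most the aligned ideal duration, stepping backwards by `alignedDuration + step`. Consecutive chunks are therefore separated by one `step`, so a boundary sample is never returned by two chunks (matching the Loki `split_by_interval.go` behavior referenced in the file comment). Tracing the first test case above:

```typescript
// Tracing getRanges(start, end, step = 10000, idealRangeDuration = 25000)
// for the first test above (times shown as hh:mm:ss for readability).
//
// expandTimeRange: 14:10:03 -> 14:10:00 (floored), 14:11:03 -> 14:11:10 (ceiled)
// alignedDuration: trunc(25000 / 10000) * 10000 = 20000 (two steps)
//
// walking backwards from 14:11:10 in (alignedDuration + step) = 30s hops:
//   chunkEnd 14:11:10 -> chunk [14:10:50, 14:11:10]
//   chunkEnd 14:10:40 -> chunk [14:10:20, 14:10:40]
//   chunkEnd 14:10:10 -> chunk [14:10:00, 14:10:10] (clamped to alignedStart)
// reversed => chronological order, matching the test's expected output.
import { getRanges } from './metricTimeSplit';

const step = 10 * 1000;
const ranges = getRanges(Date.parse('2022-02-06T14:10:03'), Date.parse('2022-02-06T14:11:03'), step, 25000);
console.log(ranges?.length); // 3
```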
@@ -1,4 +1,12 @@
-import { DataSourceInstanceSettings, DataSourceSettings, PluginType, toUtc } from '@grafana/data';
+import {
+  ArrayVector,
+  DataFrame,
+  DataSourceInstanceSettings,
+  DataSourceSettings,
+  FieldType,
+  PluginType,
+  toUtc,
+} from '@grafana/data';
 import { TemplateSrv } from '@grafana/runtime';
 
 import { getMockDataSource } from '../../../features/datasources/__mocks__';
@@ -99,3 +107,135 @@ export function createMetadataRequest(
     }
   };
 }
+
+export const logFrameA: DataFrame = {
+  refId: 'A',
+  fields: [
+    {
+      name: 'Time',
+      type: FieldType.time,
+      config: {},
+      values: new ArrayVector([3, 4]),
+    },
+    {
+      name: 'Line',
+      type: FieldType.string,
+      config: {},
+      values: new ArrayVector(['line1', 'line2']),
+    },
+    {
+      name: 'labels',
+      type: FieldType.other,
+      config: {},
+      values: new ArrayVector([
+        {
+          label: 'value',
+        },
+        {
+          otherLabel: 'other value',
+        },
+      ]),
+    },
+    {
+      name: 'tsNs',
+      type: FieldType.string,
+      config: {},
+      values: new ArrayVector(['3000000', '4000000']),
+    },
+    {
+      name: 'id',
+      type: FieldType.string,
+      config: {},
+      values: new ArrayVector(['id1', 'id2']),
+    },
+  ],
+  length: 2,
+};
+
+export const logFrameB: DataFrame = {
+  refId: 'A',
+  fields: [
+    {
+      name: 'Time',
+      type: FieldType.time,
+      config: {},
+      values: new ArrayVector([1, 2]),
+    },
+    {
+      name: 'Line',
+      type: FieldType.string,
+      config: {},
+      values: new ArrayVector(['line3', 'line4']),
+    },
+    {
+      name: 'labels',
+      type: FieldType.other,
+      config: {},
+      values: new ArrayVector([
+        {
+          otherLabel: 'other value',
+        },
+      ]),
+    },
+    {
+      name: 'tsNs',
+      type: FieldType.string,
+      config: {},
+      values: new ArrayVector(['1000000', '2000000']),
+    },
+    {
+      name: 'id',
+      type: FieldType.string,
+      config: {},
+      values: new ArrayVector(['id3', 'id4']),
+    },
+  ],
+  meta: {
+    stats: [{ displayName: 'Ingester: total reached', value: 1 }],
+  },
+  length: 2,
+};
+
+export const metricFrameA: DataFrame = {
+  refId: 'A',
+  fields: [
+    {
+      name: 'Time',
+      type: FieldType.time,
+      config: {},
+      values: new ArrayVector([3000000, 4000000]),
+    },
+    {
+      name: 'Value',
+      type: FieldType.number,
+      config: {},
+      values: new ArrayVector([5, 4]),
+    },
+  ],
+  meta: {
+    stats: [{ displayName: 'Ingester: total reached', value: 1 }],
+  },
+  length: 2,
+};
+
+export const metricFrameB: DataFrame = {
+  refId: 'A',
+  fields: [
+    {
+      name: 'Time',
+      type: FieldType.time,
+      config: {},
+      values: new ArrayVector([1000000, 2000000]),
+    },
+    {
+      name: 'Value',
+      type: FieldType.number,
+      config: {},
+      values: new ArrayVector([6, 7]),
+    },
+  ],
+  meta: {
+    stats: [{ displayName: 'Ingester: total reached', value: 2 }],
+  },
+  length: 2,
+};
public/app/plugins/datasource/loki/querySplitting.test.ts (new file, 35 lines)
@@ -0,0 +1,35 @@
+import { of } from 'rxjs';
+import { getQueryOptions } from 'test/helpers/getQueryOptions';
+
+import { dateTime } from '@grafana/data';
+
+import { LokiDatasource } from './datasource';
+import { createLokiDatasource } from './mocks';
+import { runPartitionedQuery } from './querySplitting';
+import { LokiQuery } from './types';
+
+describe('runPartitionedQuery()', () => {
+  let datasource: LokiDatasource;
+  const request = getQueryOptions<LokiQuery>({
+    targets: [{ expr: 'count_over_time({a="b"}[1m])', refId: 'A' }],
+    range: {
+      from: dateTime('2023-02-08T05:00:00.000Z'),
+      to: dateTime('2023-02-10T06:00:00.000Z'),
+      raw: {
+        from: dateTime('2023-02-08T05:00:00.000Z'),
+        to: dateTime('2023-02-10T06:00:00.000Z'),
+      },
+    },
+  });
+  beforeEach(() => {
+    datasource = createLokiDatasource();
+    jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [] }));
+  });
+
+  test('Splits datasource queries', async () => {
+    await expect(runPartitionedQuery(datasource, request)).toEmitValuesWith(() => {
+      // 3 days, 3 chunks, 3 requests.
+      expect(datasource.runQuery).toHaveBeenCalledTimes(3);
+    });
+  });
+});
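The "3 days, 3 chunks, 3 requests" comment above follows from the hardcoded one-day chunk duration set in querySplitting.ts (next file): the requested range spans 49 hours, and ceil(49 h / 24 h) = 3. A rough sanity check (approximate, since getRanges also aligns chunk edges to the step):

```typescript
// Rough arithmetic behind the assertion above (ignores step alignment).
const from = Date.parse('2023-02-08T05:00:00.000Z');
const to = Date.parse('2023-02-10T06:00:00.000Z');
const chunkMs = 24 * 60 * 60 * 1000; // the window.lokiChunkDuration default

console.log((to - from) / (60 * 60 * 1000)); // 49 hours requested
console.log(Math.ceil((to - from) / chunkMs)); // 3 chunks => 3 runQuery calls
```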
public/app/plugins/datasource/loki/querySplitting.ts (new file, 93 lines)
@@ -0,0 +1,93 @@
+import { Subscriber, map, Observable } from 'rxjs';
+
+import { DataQueryRequest, DataQueryResponse, dateTime, TimeRange } from '@grafana/data';
+import { LoadingState } from '@grafana/schema';
+
+import { LokiDatasource } from './datasource';
+import { getRanges } from './metricTimeSplit';
+import { combineResponses, resultLimitReached } from './queryUtils';
+import { LokiQuery } from './types';
+
+/**
+ * Purposely exposing it to support doing tests without needing to update the repo.
+ * TODO: remove.
+ * Hardcoded to 1 day.
+ */
+(window as any).lokiChunkDuration = 24 * 60 * 60 * 1000;
+
+export function partitionTimeRange(originalTimeRange: TimeRange, intervalMs: number, resolution: number): TimeRange[] {
+  // we currently assume we are only running metric queries here.
+  // for logs-queries we will have to use a different time-range-split algorithm.
+
+  // the `step` value that will be finally sent to Loki is roughly the same as `intervalMs`,
+  // but there are some complications.
+  // we need to replicate this algo:
+  //
+  // https://github.com/grafana/grafana/blob/main/pkg/tsdb/loki/step.go#L23
+
+  const start = originalTimeRange.from.toDate().getTime();
+  const end = originalTimeRange.to.toDate().getTime();
+
+  const safeStep = Math.ceil((end - start) / 11000);
+  const step = Math.max(intervalMs * resolution, safeStep);
+
+  const ranges = getRanges(start, end, step, (window as any).lokiChunkDuration);
+
+  // if the split was not possible, go with the original range
+  if (ranges == null) {
+    return [originalTimeRange];
+  }
+
+  return ranges.map(([start, end]) => {
+    const from = dateTime(start);
+    const to = dateTime(end);
+    return {
+      from,
+      to,
+      raw: { from, to },
+    };
+  });
+}
+
+export function runPartitionedQuery(datasource: LokiDatasource, request: DataQueryRequest<LokiQuery>) {
+  let mergedResponse: DataQueryResponse | null;
+  // FIXME: the following line assumes every query has the same resolution
+  const partition = partitionTimeRange(request.range, request.intervalMs, request.targets[0].resolution ?? 1);
+  const totalRequests = partition.length;
+
+  const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, requestN: number) => {
+    const requestId = `${request.requestId}_${requestN}`;
+    const range = partition[requestN - 1];
+    datasource
+      .runQuery({ ...request, range, requestId })
+      .pipe(
+        // in case of an empty query, this is somehow run twice. `share()` is no workaround here as the observable is generated from `of()`.
+        map((partialResponse) => {
+          mergedResponse = combineResponses(mergedResponse, partialResponse);
+          return mergedResponse;
+        })
+      )
+      .subscribe({
+        next: (response) => {
+          if (requestN > 1 && resultLimitReached(request, response) === false) {
+            response.state = LoadingState.Streaming;
+            subscriber.next(response);
+            runNextRequest(subscriber, requestN - 1);
+            return;
+          }
+          response.state = LoadingState.Done;
+          subscriber.next(response);
+          subscriber.complete();
+        },
+        error: (error) => {
+          subscriber.error(error);
+        },
+      });
+  };
+
+  const response = new Observable<DataQueryResponse>((subscriber) => {
+    runNextRequest(subscriber, totalRequests);
+  });
+
+  return response;
+}
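Two details of `runPartitionedQuery` above are easy to miss: it walks the partition from the end (`requestN` counts down and indexes `partition` from the back), so the newest chunk is fetched first, and every merged emission is marked `LoadingState.Streaming` except the last one (or an early stop when `resultLimitReached` fires), which is marked `Done`. A minimal, self-contained re-creation of that countdown recursion (a hypothetical helper, not the Grafana code):

```typescript
import { Observable, Subscriber, of } from 'rxjs';

// Hypothetical reduction of the recursion in runPartitionedQuery: run chunk N,
// emit the merged result so far, then recurse into chunk N-1 until chunk 1.
function runSequential<T>(runChunk: (n: number) => Observable<T>, total: number): Observable<T[]> {
  const merged: T[] = [];
  const runNext = (subscriber: Subscriber<T[]>, n: number) => {
    runChunk(n).subscribe({
      next: (value) => {
        merged.unshift(value); // older chunks land in front, keeping time order
        subscriber.next([...merged]);
        if (n > 1) {
          runNext(subscriber, n - 1); // one request at a time, never in parallel
        } else {
          subscriber.complete();
        }
      },
      error: (err) => subscriber.error(err),
    });
  };
  return new Observable((subscriber) => runNext(subscriber, total));
}

runSequential((n) => of(n), 3).subscribe((merged) => console.log(merged));
// logs [3], then [2, 3], then [1, 2, 3]
```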
@@ -1,3 +1,8 @@
+import { getQueryOptions } from 'test/helpers/getQueryOptions';
+
+import { ArrayVector, DataQueryResponse, FieldType } from '@grafana/data';
+
+import { logFrameA, logFrameB, metricFrameA, metricFrameB } from './mocks';
 import {
   getHighlighterExpressionsFromQuery,
   getNormalizedLokiQuery,
@@ -8,6 +13,8 @@ import {
   parseToNodeNamesArray,
   getParserFromQuery,
   obfuscate,
+  resultLimitReached,
+  combineResponses,
 } from './queryUtils';
 import { LokiQuery, LokiQueryType } from './types';
 
@@ -292,3 +299,157 @@ describe('getParserFromQuery', () => {
     );
   });
 });
+
+describe('resultLimitReached', () => {
+  const result = {
+    data: [
+      {
+        name: 'test',
+        fields: [
+          {
+            name: 'Time',
+            type: FieldType.time,
+            config: {},
+            values: new ArrayVector([1, 2]),
+          },
+          {
+            name: 'Line',
+            type: FieldType.string,
+            config: {},
+            values: new ArrayVector(['line1', 'line2']),
+          },
+        ],
+        length: 2,
+      },
+    ],
+  };
+  it('returns false for non-logs queries', () => {
+    const request = getQueryOptions<LokiQuery>({
+      targets: [{ expr: 'count_over_time({a="b"}[1m])', refId: 'A', maxLines: 0 }],
+    });
+
+    expect(resultLimitReached(request, result)).toBe(false);
+  });
+  it('returns false when the limit is not reached', () => {
+    const request = getQueryOptions<LokiQuery>({
+      targets: [{ expr: '{a="b"}', refId: 'A', maxLines: 3 }],
+    });
+
+    expect(resultLimitReached(request, result)).toBe(false);
+  });
+  it('returns true when the limit is reached', () => {
+    const request = getQueryOptions<LokiQuery>({
+      targets: [{ expr: '{a="b"}', refId: 'A', maxLines: 2 }],
+    });
+
+    expect(resultLimitReached(request, result)).toBe(true);
+  });
+});
+
+describe('combineResponses', () => {
+  it('combines logs frames', () => {
+    const responseA: DataQueryResponse = {
+      data: [logFrameA],
+    };
+    const responseB: DataQueryResponse = {
+      data: [logFrameB],
+    };
+    expect(combineResponses(responseA, responseB)).toEqual({
+      data: [
+        {
+          fields: [
+            {
+              config: {},
+              name: 'Time',
+              type: 'time',
+              values: new ArrayVector([1, 2, 3, 4]),
+            },
+            {
+              config: {},
+              name: 'Line',
+              type: 'string',
+              values: new ArrayVector(['line3', 'line4', 'line1', 'line2']),
+            },
+            {
+              config: {},
+              name: 'labels',
+              type: 'other',
+              values: new ArrayVector([
+                {
+                  otherLabel: 'other value',
+                },
+                {
+                  label: 'value',
+                },
+                {
+                  otherLabel: 'other value',
+                },
+              ]),
+            },
+            {
+              config: {},
+              name: 'tsNs',
+              type: 'string',
+              values: new ArrayVector(['1000000', '2000000', '3000000', '4000000']),
+            },
+            {
+              config: {},
+              name: 'id',
+              type: 'string',
+              values: new ArrayVector(['id3', 'id4', 'id1', 'id2']),
+            },
+          ],
+          length: 4,
+          meta: {
+            stats: [
+              {
+                displayName: 'Ingester: total reached',
+                value: 1,
+              },
+            ],
+          },
+          refId: 'A',
+        },
+      ],
+    });
+  });
+
+  it('combines metric frames', () => {
+    const responseA: DataQueryResponse = {
+      data: [metricFrameA],
+    };
+    const responseB: DataQueryResponse = {
+      data: [metricFrameB],
+    };
+    expect(combineResponses(responseA, responseB)).toEqual({
+      data: [
+        {
+          fields: [
+            {
+              config: {},
+              name: 'Time',
+              type: 'time',
+              values: new ArrayVector([1000000, 2000000, 3000000, 4000000]),
+            },
+            {
+              config: {},
+              name: 'Value',
+              type: 'number',
+              values: new ArrayVector([6, 7, 5, 4]),
+            },
+          ],
+          length: 4,
+          meta: {
+            stats: [
+              {
+                displayName: 'Ingester: total reached',
+                value: 3,
+              },
+            ],
+          },
+          refId: 'A',
+        },
+      ],
+    });
+  });
+});
@@ -1,6 +1,7 @@
 import { SyntaxNode } from '@lezer/common';
 import { escapeRegExp } from 'lodash';
 
+import { DataQueryRequest, DataQueryResponse, DataQueryResponseData, QueryResultMetaStat } from '@grafana/data';
 import {
   parser,
   LineFilter,
@@ -295,3 +296,94 @@ export function getStreamSelectorsFromQuery(query: string): string[] {
 
   return labelMatchers;
 }
+
+export function requestSupportsPartitioning(queries: LokiQuery[]) {
+  /*
+   * For now, we will not split when more than 1 query is requested.
+   */
+  if (queries.length > 1) {
+    return false;
+  }
+
+  /**
+   * Disable logs volume queries.
+   */
+  if (queries[0].refId.includes('log-volume-')) {
+    return false;
+  }
+
+  if (isLogsQuery(queries[0].expr)) {
+    return false;
+  }
+
+  return true;
+}
+
+export function combineResponses(currentResult: DataQueryResponse | null, newResult: DataQueryResponse) {
+  if (!currentResult) {
+    return newResult;
+  }
+
+  newResult.data.forEach((newFrame) => {
+    const currentFrame = currentResult.data.find((frame) => frame.name === newFrame.name);
+    if (!currentFrame) {
+      currentResult.data.push(newFrame);
+      return;
+    }
+    combineFrames(currentFrame, newFrame);
+  });
+
+  return currentResult;
+}
+
+function combineFrames(dest: DataQueryResponseData, source: DataQueryResponseData) {
+  const totalFields = dest.fields.length;
+  for (let i = 0; i < totalFields; i++) {
+    dest.fields[i].values.buffer = [].concat.apply(source.fields[i].values.buffer, dest.fields[i].values.buffer);
+  }
+  dest.length += source.length;
+  combineMetadata(dest, source);
+}
+
+function combineMetadata(dest: DataQueryResponseData = {}, source: DataQueryResponseData = {}) {
+  if (!source.meta?.stats) {
+    return;
+  }
+  if (!dest.meta?.stats) {
+    if (!dest.meta) {
+      dest.meta = {};
+    }
+    Object.assign(dest.meta, { stats: source.meta.stats });
+    return;
+  }
+  dest.meta.stats.forEach((destStat: QueryResultMetaStat, i: number) => {
+    const sourceStat = source.meta.stats?.find(
+      (sourceStat: QueryResultMetaStat) => destStat.displayName === sourceStat.displayName
+    );
+    if (sourceStat) {
+      destStat.value += sourceStat.value;
+    }
+  });
+}
+
+/**
+ * Checks if the current response has reached the requested amount of results or not.
+ * For log queries, we will ensure that the current amount of results doesn't go beyond `maxLines`.
+ */
+export function resultLimitReached(request: DataQueryRequest<LokiQuery>, result: DataQueryResponse) {
+  const logRequests = request.targets.filter((target) => isLogsQuery(target.expr));
+
+  if (logRequests.length === 0) {
+    return false;
+  }
+
+  for (const request of logRequests) {
+    for (const frame of result.data) {
+      if (request.maxLines && frame?.fields[0].values.length >= request.maxLines) {
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
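A note on `combineFrames` above: it concatenates the `source` values in front of the `dest` values. Because `runPartitionedQuery` fetches the newest chunk first, each older partial response is prepended, which keeps the merged fields in chronological order. It also reaches into `values.buffer` directly, which assumes `ArrayVector`-backed fields (the commit message notes `MutableDataFrame` was dropped for performance reasons). A small sketch of that trick, under the same assumption:

```typescript
import { ArrayVector } from '@grafana/data';

// dest holds the newest chunk (fetched first); source is an older chunk.
const dest = new ArrayVector([3, 4]);
const source = new ArrayVector([1, 2]);

// prepend the older values, as combineFrames does via [].concat.apply(...)
dest.buffer = [...source.buffer, ...dest.buffer];
console.log(dest.toArray()); // [1, 2, 3, 4], chronological order
```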