mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
loki: query splitting: split logs queries (#63091)
* loki: calculate logs-chunk-boundaries * renamed function
This commit is contained in:
parent
685afdef37
commit
27d70819cc
35
public/app/plugins/datasource/loki/logsTimeSplit.test.ts
Normal file
35
public/app/plugins/datasource/loki/logsTimeSplit.test.ts
Normal file
@ -0,0 +1,35 @@
|
||||
import { getRangeChunks } from './logsTimeSplit';
|
||||
|
||||
describe('querySplit', () => {
|
||||
it('should split time range into chunks', () => {
|
||||
const start = Date.parse('2022-02-06T14:10:03.234');
|
||||
const end = Date.parse('2022-02-06T14:11:03.567');
|
||||
|
||||
expect(getRangeChunks(start, end, 10000)).toStrictEqual([
|
||||
[Date.parse('2022-02-06T14:10:03.234'), Date.parse('2022-02-06T14:10:03.567')],
|
||||
[Date.parse('2022-02-06T14:10:03.567'), Date.parse('2022-02-06T14:10:13.567')],
|
||||
[Date.parse('2022-02-06T14:10:13.567'), Date.parse('2022-02-06T14:10:23.567')],
|
||||
[Date.parse('2022-02-06T14:10:23.567'), Date.parse('2022-02-06T14:10:33.567')],
|
||||
[Date.parse('2022-02-06T14:10:33.567'), Date.parse('2022-02-06T14:10:43.567')],
|
||||
[Date.parse('2022-02-06T14:10:43.567'), Date.parse('2022-02-06T14:10:53.567')],
|
||||
[Date.parse('2022-02-06T14:10:53.567'), Date.parse('2022-02-06T14:11:03.567')],
|
||||
]);
|
||||
});
|
||||
|
||||
it('should split time range into chunks, when nicely aligned', () => {
|
||||
const start = Date.parse('2022-02-06T14:10:03.567');
|
||||
const end = Date.parse('2022-02-06T14:11:03.567');
|
||||
|
||||
expect(getRangeChunks(start, end, 20000)).toStrictEqual([
|
||||
[Date.parse('2022-02-06T14:10:03.567'), Date.parse('2022-02-06T14:10:23.567')],
|
||||
[Date.parse('2022-02-06T14:10:23.567'), Date.parse('2022-02-06T14:10:43.567')],
|
||||
[Date.parse('2022-02-06T14:10:43.567'), Date.parse('2022-02-06T14:11:03.567')],
|
||||
]);
|
||||
});
|
||||
|
||||
it('should return null if too many chunks would be generated', () => {
|
||||
const start = Date.parse('2022-02-06T14:10:03');
|
||||
const end = Date.parse('2022-02-06T14:30:03');
|
||||
expect(getRangeChunks(start, end, 10000)).toBeNull();
|
||||
});
|
||||
});
|
49
public/app/plugins/datasource/loki/logsTimeSplit.ts
Normal file
49
public/app/plugins/datasource/loki/logsTimeSplit.ts
Normal file
@ -0,0 +1,49 @@
|
||||
// every timestamp in this file is a number which contains an unix-timestamp-in-millisecond format,
|
||||
// like returned by `new Date().getTime()`. this is needed because the "math"
|
||||
// has to be done on integer numbers.
|
||||
|
||||
const MAX_CHUNK_COUNT = 50;
|
||||
|
||||
// the way loki handles logs-range-queries is that if you specify start & end,
|
||||
// one of those will be included, but the other will not. this allows us to
|
||||
// make it easy to split ranges.
|
||||
// for example, if the time-range is 100<>150,
|
||||
// we can split it into:
|
||||
// - 100<>120
|
||||
// - 120<>140
|
||||
// - 140<>150
|
||||
// and no log-line will be skipped or duplicated
|
||||
// (NOTE: we do these calculations in milliseconds. at the end, Loki receives
|
||||
// nanoseconds, but it will be OK, because it's simply a matter to adding `000000`,
|
||||
// to the end, so if we do it right in milliseconds, it should be OK in
|
||||
// nanoseconds too
|
||||
|
||||
export function getRangeChunks(
|
||||
startTime: number,
|
||||
endTime: number,
|
||||
idealRangeDuration: number
|
||||
): Array<[number, number]> | null {
|
||||
if (endTime - startTime <= idealRangeDuration) {
|
||||
return [[startTime, endTime]];
|
||||
}
|
||||
|
||||
const result: Array<[number, number]> = [];
|
||||
|
||||
// we walk backward, because we need want the potentially smaller "last" chunk
|
||||
// to be at the oldest timestamp.
|
||||
for (let chunkEndTime = endTime; chunkEndTime > startTime; chunkEndTime -= idealRangeDuration) {
|
||||
// when we get close to the start of the time range, we need to be sure not
|
||||
// to cross over the startTime
|
||||
const chunkStartTime = Math.max(chunkEndTime - idealRangeDuration, startTime);
|
||||
result.push([chunkStartTime, chunkEndTime]);
|
||||
|
||||
if (result.length > MAX_CHUNK_COUNT) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// because we walked backwards, we need to reverse the array
|
||||
result.reverse();
|
||||
|
||||
return result;
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
import { getRanges } from './metricTimeSplit';
|
||||
import { getRangeChunks } from './metricTimeSplit';
|
||||
|
||||
describe('querySplit', () => {
|
||||
it('should split time range into chunks', () => {
|
||||
@ -6,7 +6,7 @@ describe('querySplit', () => {
|
||||
const end = Date.parse('2022-02-06T14:11:03');
|
||||
const step = 10 * 1000;
|
||||
|
||||
expect(getRanges(start, end, step, 25000)).toStrictEqual([
|
||||
expect(getRangeChunks(start, end, step, 25000)).toStrictEqual([
|
||||
[Date.parse('2022-02-06T14:10:00'), Date.parse('2022-02-06T14:10:10')],
|
||||
[Date.parse('2022-02-06T14:10:20'), Date.parse('2022-02-06T14:10:40')],
|
||||
[Date.parse('2022-02-06T14:10:50'), Date.parse('2022-02-06T14:11:10')],
|
||||
@ -17,13 +17,13 @@ describe('querySplit', () => {
|
||||
const start = Date.parse('2022-02-06T14:10:03');
|
||||
const end = Date.parse('2022-02-06T14:35:01');
|
||||
const step = 10 * 1000;
|
||||
expect(getRanges(start, end, step, 20000)).toBeNull();
|
||||
expect(getRangeChunks(start, end, step, 20000)).toBeNull();
|
||||
});
|
||||
|
||||
it('should return null if requested duration is smaller than step', () => {
|
||||
const start = Date.parse('2022-02-06T14:10:03');
|
||||
const end = Date.parse('2022-02-06T14:10:33');
|
||||
const step = 10 * 1000;
|
||||
expect(getRanges(start, end, step, 1000)).toBeNull();
|
||||
expect(getRangeChunks(start, end, step, 1000)).toBeNull();
|
||||
});
|
||||
});
|
||||
|
@ -21,7 +21,7 @@ function expandTimeRange(startTime: number, endTime: number, step: number): [num
|
||||
|
||||
const MAX_CHUNK_COUNT = 50;
|
||||
|
||||
export function getRanges(
|
||||
export function getRangeChunks(
|
||||
startTime: number,
|
||||
endTime: number,
|
||||
step: number,
|
||||
|
@ -4,8 +4,9 @@ import { DataQueryRequest, DataQueryResponse, dateTime, TimeRange } from '@grafa
|
||||
import { LoadingState } from '@grafana/schema';
|
||||
|
||||
import { LokiDatasource } from './datasource';
|
||||
import { getRanges } from './metricTimeSplit';
|
||||
import { combineResponses, resultLimitReached } from './queryUtils';
|
||||
import { getRangeChunks as getLogsRangeChunks } from './logsTimeSplit';
|
||||
import { getRangeChunks as getMetricRangeChunks } from './metricTimeSplit';
|
||||
import { combineResponses, isLogsQuery, resultLimitReached } from './queryUtils';
|
||||
import { LokiQuery } from './types';
|
||||
|
||||
/**
|
||||
@ -15,10 +16,12 @@ import { LokiQuery } from './types';
|
||||
*/
|
||||
(window as any).lokiChunkDuration = 24 * 60 * 60 * 1000;
|
||||
|
||||
export function partitionTimeRange(originalTimeRange: TimeRange, intervalMs: number, resolution: number): TimeRange[] {
|
||||
// we currently assume we are only running metric queries here.
|
||||
// for logs-queries we will have to use a different time-range-split algorithm.
|
||||
|
||||
export function partitionTimeRange(
|
||||
isLogsQuery: boolean,
|
||||
originalTimeRange: TimeRange,
|
||||
intervalMs: number,
|
||||
resolution: number
|
||||
): TimeRange[] {
|
||||
// the `step` value that will be finally sent to Loki is rougly the same as `intervalMs`,
|
||||
// but there are some complications.
|
||||
// we need to replicate this algo:
|
||||
@ -31,7 +34,11 @@ export function partitionTimeRange(originalTimeRange: TimeRange, intervalMs: num
|
||||
const safeStep = Math.ceil((end - start) / 11000);
|
||||
const step = Math.max(intervalMs * resolution, safeStep);
|
||||
|
||||
const ranges = getRanges(start, end, step, (window as any).lokiChunkDuration);
|
||||
const duration: number = (window as any).lokiChunkDuration;
|
||||
|
||||
const ranges = isLogsQuery
|
||||
? getLogsRangeChunks(start, end, duration)
|
||||
: getMetricRangeChunks(start, end, step, duration);
|
||||
|
||||
// if the split was not possible, go with the original range
|
||||
if (ranges == null) {
|
||||
@ -51,8 +58,14 @@ export function partitionTimeRange(originalTimeRange: TimeRange, intervalMs: num
|
||||
|
||||
export function runPartitionedQuery(datasource: LokiDatasource, request: DataQueryRequest<LokiQuery>) {
|
||||
let mergedResponse: DataQueryResponse | null;
|
||||
// FIXME: the following line assumes every query has the same resolution
|
||||
const partition = partitionTimeRange(request.range, request.intervalMs, request.targets[0].resolution ?? 1);
|
||||
// we assume there is just a single query in the request
|
||||
const query = request.targets[0];
|
||||
const partition = partitionTimeRange(
|
||||
isLogsQuery(query.expr),
|
||||
request.range,
|
||||
request.intervalMs,
|
||||
query.resolution ?? 1
|
||||
);
|
||||
const totalRequests = partition.length;
|
||||
|
||||
const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, requestN: number) => {
|
||||
|
@ -312,10 +312,6 @@ export function requestSupportsPartitioning(queries: LokiQuery[]) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (isLogsQuery(queries[0].expr)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user