Loki Range Splitting: Create a new instance for the initial frame (#63361)

* Range splitting: create a new instance of the initial frame

* Chore: rename variable
This commit is contained in:
Matias Chomicki 2023-02-16 12:55:31 +01:00 committed by GitHub
parent 99316f1fb6
commit 0240f4eb45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 259 additions and 134 deletions

View File

@ -108,134 +108,167 @@ export function createMetadataRequest(
}; };
} }
export const logFrameA: DataFrame = { export function getMockFrames() {
refId: 'A', const logFrameA: DataFrame = {
fields: [ refId: 'A',
{ fields: [
name: 'Time', {
type: FieldType.time, name: 'Time',
config: {}, type: FieldType.time,
values: new ArrayVector([3, 4]), config: {},
}, values: new ArrayVector([3, 4]),
{ },
name: 'Line', {
type: FieldType.string, name: 'Line',
config: {}, type: FieldType.string,
values: new ArrayVector(['line1', 'line2']), config: {},
}, values: new ArrayVector(['line1', 'line2']),
{ },
name: 'labels', {
type: FieldType.other, name: 'labels',
config: {}, type: FieldType.other,
values: new ArrayVector([ config: {},
{ values: new ArrayVector([
label: 'value', {
}, label: 'value',
{ },
otherLabel: 'other value', {
}, otherLabel: 'other value',
]), },
}, ]),
{ },
name: 'tsNs', {
type: FieldType.string, name: 'tsNs',
config: {}, type: FieldType.string,
values: new ArrayVector(['3000000', '4000000']), config: {},
}, values: new ArrayVector(['3000000', '4000000']),
{ },
name: 'id', {
type: FieldType.string, name: 'id',
config: {}, type: FieldType.string,
values: new ArrayVector(['id1', 'id2']), config: {},
}, values: new ArrayVector(['id1', 'id2']),
], },
length: 2, ],
}; length: 2,
};
export const logFrameB: DataFrame = { const logFrameB: DataFrame = {
refId: 'A', refId: 'A',
fields: [ fields: [
{ {
name: 'Time', name: 'Time',
type: FieldType.time, type: FieldType.time,
config: {}, config: {},
values: new ArrayVector([1, 2]), values: new ArrayVector([1, 2]),
},
{
name: 'Line',
type: FieldType.string,
config: {},
values: new ArrayVector(['line3', 'line4']),
},
{
name: 'labels',
type: FieldType.other,
config: {},
values: new ArrayVector([
{
otherLabel: 'other value',
},
]),
},
{
name: 'tsNs',
type: FieldType.string,
config: {},
values: new ArrayVector(['1000000', '2000000']),
},
{
name: 'id',
type: FieldType.string,
config: {},
values: new ArrayVector(['id3', 'id4']),
},
],
meta: {
stats: [{ displayName: 'Ingester: total reached', value: 1 }],
}, },
{ length: 2,
name: 'Line', };
type: FieldType.string,
config: {},
values: new ArrayVector(['line3', 'line4']),
},
{
name: 'labels',
type: FieldType.other,
config: {},
values: new ArrayVector([
{
otherLabel: 'other value',
},
]),
},
{
name: 'tsNs',
type: FieldType.string,
config: {},
values: new ArrayVector(['1000000', '2000000']),
},
{
name: 'id',
type: FieldType.string,
config: {},
values: new ArrayVector(['id3', 'id4']),
},
],
meta: {
stats: [{ displayName: 'Ingester: total reached', value: 1 }],
},
length: 2,
};
export const metricFrameA: DataFrame = { const metricFrameA: DataFrame = {
refId: 'A', refId: 'A',
fields: [ fields: [
{ {
name: 'Time', name: 'Time',
type: FieldType.time, type: FieldType.time,
config: {}, config: {},
values: new ArrayVector([3000000, 4000000]), values: new ArrayVector([3000000, 4000000]),
},
{
name: 'Value',
type: FieldType.number,
config: {},
values: new ArrayVector([5, 4]),
},
],
meta: {
stats: [{ displayName: 'Ingester: total reached', value: 1 }],
}, },
{ length: 2,
name: 'Value', };
type: FieldType.number,
config: {},
values: new ArrayVector([5, 4]),
},
],
meta: {
stats: [{ displayName: 'Ingester: total reached', value: 1 }],
},
length: 2,
};
export const metricFrameB: DataFrame = { const metricFrameB: DataFrame = {
refId: 'A', refId: 'A',
fields: [ fields: [
{ {
name: 'Time', name: 'Time',
type: FieldType.time, type: FieldType.time,
config: {}, config: {},
values: new ArrayVector([1000000, 2000000]), values: new ArrayVector([1000000, 2000000]),
},
{
name: 'Value',
type: FieldType.number,
config: {},
values: new ArrayVector([6, 7]),
},
],
meta: {
stats: [{ displayName: 'Ingester: total reached', value: 2 }],
}, },
{ length: 2,
name: 'Value', };
type: FieldType.number,
config: {}, const metricFrameC: DataFrame = {
values: new ArrayVector([6, 7]), refId: 'A',
name: 'some-time-series',
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: new ArrayVector([3000000, 4000000]),
},
{
name: 'Value',
type: FieldType.number,
config: {},
values: new ArrayVector([6, 7]),
},
],
meta: {
stats: [{ displayName: 'Ingester: total reached', value: 2 }],
}, },
], length: 2,
meta: { };
stats: [{ displayName: 'Ingester: total reached', value: 2 }],
}, return {
length: 2, logFrameA,
}; logFrameB,
metricFrameA,
metricFrameB,
metricFrameC,
};
}

View File

@ -6,7 +6,7 @@ import { dateTime } from '@grafana/data';
import { LokiDatasource } from './datasource'; import { LokiDatasource } from './datasource';
import * as logsTimeSplit from './logsTimeSplit'; import * as logsTimeSplit from './logsTimeSplit';
import * as metricTimeSplit from './metricTimeSplit'; import * as metricTimeSplit from './metricTimeSplit';
import { createLokiDatasource, logFrameA } from './mocks'; import { createLokiDatasource, getMockFrames } from './mocks';
import { runPartitionedQuery } from './querySplitting'; import { runPartitionedQuery } from './querySplitting';
import { LokiQuery } from './types'; import { LokiQuery } from './types';
@ -65,6 +65,7 @@ describe('runPartitionedQuery()', () => {
targets: [{ expr: '{a="b"}', refId: 'A', maxLines: 4 }], targets: [{ expr: '{a="b"}', refId: 'A', maxLines: 4 }],
range, range,
}); });
const { logFrameA } = getMockFrames();
beforeEach(() => { beforeEach(() => {
jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [logFrameA], refId: 'A' })); jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [logFrameA], refId: 'A' }));
}); });

View File

@ -100,7 +100,7 @@ export function runPartitionedQuery(datasource: LokiDatasource, request: DataQue
const totalRequests = partition.length; const totalRequests = partition.length;
let shouldStop = false; let shouldStop = false;
let smallQuerySubsciption: Subscription | null = null; let subquerySubsciption: Subscription | null = null;
const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, requestN: number) => { const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, requestN: number) => {
if (shouldStop) { if (shouldStop) {
return; return;
@ -121,7 +121,7 @@ export function runPartitionedQuery(datasource: LokiDatasource, request: DataQue
return; return;
} }
smallQuerySubsciption = datasource subquerySubsciption = datasource
.runQuery({ ...request, range, requestId, targets }) .runQuery({ ...request, range, requestId, targets })
.pipe( .pipe(
// in case of an empty query, this is somehow run twice. `share()` is no workaround here as the observable is generated from `of()`. // in case of an empty query, this is somehow run twice. `share()` is no workaround here as the observable is generated from `of()`.
@ -150,8 +150,8 @@ export function runPartitionedQuery(datasource: LokiDatasource, request: DataQue
runNextRequest(subscriber, totalRequests); runNextRequest(subscriber, totalRequests);
return () => { return () => {
shouldStop = true; shouldStop = true;
if (smallQuerySubsciption != null) { if (subquerySubsciption != null) {
smallQuerySubsciption.unsubscribe(); subquerySubsciption.unsubscribe();
} }
}; };
}); });

View File

@ -1,6 +1,6 @@
import { ArrayVector, DataQueryResponse } from '@grafana/data'; import { ArrayVector, DataQueryResponse } from '@grafana/data';
import { logFrameA, logFrameB, metricFrameA, metricFrameB } from './mocks'; import { getMockFrames } from './mocks';
import { import {
getHighlighterExpressionsFromQuery, getHighlighterExpressionsFromQuery,
getNormalizedLokiQuery, getNormalizedLokiQuery,
@ -12,6 +12,7 @@ import {
getParserFromQuery, getParserFromQuery,
obfuscate, obfuscate,
combineResponses, combineResponses,
cloneQueryResponse,
} from './queryUtils'; } from './queryUtils';
import { LokiQuery, LokiQueryType } from './types'; import { LokiQuery, LokiQueryType } from './types';
@ -297,8 +298,21 @@ describe('getParserFromQuery', () => {
}); });
}); });
describe('cloneQueryResponse', () => {
const { logFrameA } = getMockFrames();
const responseA: DataQueryResponse = {
data: [logFrameA],
};
it('clones query responses', () => {
const clonedA = cloneQueryResponse(responseA);
expect(clonedA).not.toBe(responseA);
expect(clonedA).toEqual(clonedA);
});
});
describe('combineResponses', () => { describe('combineResponses', () => {
it('combines logs frames', () => { it('combines logs frames', () => {
const { logFrameA, logFrameB } = getMockFrames();
const responseA: DataQueryResponse = { const responseA: DataQueryResponse = {
data: [logFrameA], data: [logFrameA],
}; };
@ -366,6 +380,7 @@ describe('combineResponses', () => {
}); });
it('combines metric frames', () => { it('combines metric frames', () => {
const { metricFrameA, metricFrameB } = getMockFrames();
const responseA: DataQueryResponse = { const responseA: DataQueryResponse = {
data: [metricFrameA], data: [metricFrameA],
}; };
@ -403,4 +418,57 @@ describe('combineResponses', () => {
], ],
}); });
}); });
it('combines and identifies new frames in the response', () => {
const { metricFrameA, metricFrameB, metricFrameC } = getMockFrames();
const responseA: DataQueryResponse = {
data: [metricFrameA],
};
const responseB: DataQueryResponse = {
data: [metricFrameB, metricFrameC],
};
expect(combineResponses(responseA, responseB)).toEqual({
data: [
{
fields: [
{
config: {},
name: 'Time',
type: 'time',
values: new ArrayVector([1000000, 2000000, 3000000, 4000000]),
},
{
config: {},
name: 'Value',
type: 'number',
values: new ArrayVector([6, 7, 5, 4]),
},
],
length: 4,
meta: {
stats: [
{
displayName: 'Ingester: total reached',
value: 3,
},
],
},
refId: 'A',
},
metricFrameC,
],
});
});
it('combines frames in a new response instance', () => {
const { metricFrameA, metricFrameB } = getMockFrames();
const responseA: DataQueryResponse = {
data: [metricFrameA],
};
const responseB: DataQueryResponse = {
data: [metricFrameB],
};
expect(combineResponses(null, responseA)).not.toBe(responseA);
expect(combineResponses(null, responseB)).not.toBe(responseB);
});
}); });

View File

@ -1,7 +1,7 @@
import { SyntaxNode } from '@lezer/common'; import { SyntaxNode } from '@lezer/common';
import { escapeRegExp } from 'lodash'; import { escapeRegExp } from 'lodash';
import { DataQueryResponse, DataQueryResponseData, QueryResultMetaStat } from '@grafana/data'; import { ArrayVector, DataQueryResponse, DataQueryResponseData, Field, QueryResultMetaStat } from '@grafana/data';
import { import {
parser, parser,
LineFilter, LineFilter,
@ -315,13 +315,13 @@ export function requestSupportsPartitioning(allQueries: LokiQuery[]) {
export function combineResponses(currentResult: DataQueryResponse | null, newResult: DataQueryResponse) { export function combineResponses(currentResult: DataQueryResponse | null, newResult: DataQueryResponse) {
if (!currentResult) { if (!currentResult) {
return newResult; return cloneQueryResponse(newResult);
} }
newResult.data.forEach((newFrame) => { newResult.data.forEach((newFrame) => {
const currentFrame = currentResult.data.find((frame) => frame.name === newFrame.name); const currentFrame = currentResult.data.find((frame) => frame.name === newFrame.name);
if (!currentFrame) { if (!currentFrame) {
currentResult.data.push(newFrame); currentResult.data.push(cloneDataFrame(newFrame));
return; return;
} }
combineFrames(currentFrame, newFrame); combineFrames(currentFrame, newFrame);
@ -333,7 +333,9 @@ export function combineResponses(currentResult: DataQueryResponse | null, newRes
function combineFrames(dest: DataQueryResponseData, source: DataQueryResponseData) { function combineFrames(dest: DataQueryResponseData, source: DataQueryResponseData) {
const totalFields = dest.fields.length; const totalFields = dest.fields.length;
for (let i = 0; i < totalFields; i++) { for (let i = 0; i < totalFields; i++) {
dest.fields[i].values.buffer = [].concat.apply(source.fields[i].values.buffer, dest.fields[i].values.buffer); dest.fields[i].values = new ArrayVector(
[].concat.apply(source.fields[i].values.toArray(), dest.fields[i].values.toArray())
);
} }
dest.length += source.length; dest.length += source.length;
combineMetadata(dest, source); combineMetadata(dest, source);
@ -359,3 +361,24 @@ function combineMetadata(dest: DataQueryResponseData = {}, source: DataQueryResp
} }
}); });
} }
/**
* Deep clones a DataQueryResponse
*/
export function cloneQueryResponse(response: DataQueryResponse): DataQueryResponse {
const newResponse = {
...response,
data: response.data.map(cloneDataFrame),
};
return newResponse;
}
function cloneDataFrame(frame: DataQueryResponseData): DataQueryResponseData {
return {
...frame,
fields: frame.fields.map((field: Field<unknown, ArrayVector>) => ({
...field,
values: new ArrayVector(field.values.buffer),
})),
};
}