From 0240f4eb4598021992e7e26c082204e0e82dda50 Mon Sep 17 00:00:00 2001 From: Matias Chomicki Date: Thu, 16 Feb 2023 12:55:31 +0100 Subject: [PATCH] Loki Range Splitting: Create a new instance for the initial frame (#63361) * Range splitting: create a new instance of the initial frame * Chore: rename variable --- public/app/plugins/datasource/loki/mocks.ts | 281 ++++++++++-------- .../datasource/loki/querySplitting.test.ts | 3 +- .../plugins/datasource/loki/querySplitting.ts | 8 +- .../datasource/loki/queryUtils.test.ts | 70 ++++- .../app/plugins/datasource/loki/queryUtils.ts | 31 +- 5 files changed, 259 insertions(+), 134 deletions(-) diff --git a/public/app/plugins/datasource/loki/mocks.ts b/public/app/plugins/datasource/loki/mocks.ts index 2db4ae0515f..e84f1f6d257 100644 --- a/public/app/plugins/datasource/loki/mocks.ts +++ b/public/app/plugins/datasource/loki/mocks.ts @@ -108,134 +108,167 @@ export function createMetadataRequest( }; } -export const logFrameA: DataFrame = { - refId: 'A', - fields: [ - { - name: 'Time', - type: FieldType.time, - config: {}, - values: new ArrayVector([3, 4]), - }, - { - name: 'Line', - type: FieldType.string, - config: {}, - values: new ArrayVector(['line1', 'line2']), - }, - { - name: 'labels', - type: FieldType.other, - config: {}, - values: new ArrayVector([ - { - label: 'value', - }, - { - otherLabel: 'other value', - }, - ]), - }, - { - name: 'tsNs', - type: FieldType.string, - config: {}, - values: new ArrayVector(['3000000', '4000000']), - }, - { - name: 'id', - type: FieldType.string, - config: {}, - values: new ArrayVector(['id1', 'id2']), - }, - ], - length: 2, -}; +export function getMockFrames() { + const logFrameA: DataFrame = { + refId: 'A', + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: new ArrayVector([3, 4]), + }, + { + name: 'Line', + type: FieldType.string, + config: {}, + values: new ArrayVector(['line1', 'line2']), + }, + { + name: 'labels', + type: FieldType.other, + config: {}, + values: new ArrayVector([ + { + label: 'value', + }, + { + otherLabel: 'other value', + }, + ]), + }, + { + name: 'tsNs', + type: FieldType.string, + config: {}, + values: new ArrayVector(['3000000', '4000000']), + }, + { + name: 'id', + type: FieldType.string, + config: {}, + values: new ArrayVector(['id1', 'id2']), + }, + ], + length: 2, + }; -export const logFrameB: DataFrame = { - refId: 'A', - fields: [ - { - name: 'Time', - type: FieldType.time, - config: {}, - values: new ArrayVector([1, 2]), + const logFrameB: DataFrame = { + refId: 'A', + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: new ArrayVector([1, 2]), + }, + { + name: 'Line', + type: FieldType.string, + config: {}, + values: new ArrayVector(['line3', 'line4']), + }, + { + name: 'labels', + type: FieldType.other, + config: {}, + values: new ArrayVector([ + { + otherLabel: 'other value', + }, + ]), + }, + { + name: 'tsNs', + type: FieldType.string, + config: {}, + values: new ArrayVector(['1000000', '2000000']), + }, + { + name: 'id', + type: FieldType.string, + config: {}, + values: new ArrayVector(['id3', 'id4']), + }, + ], + meta: { + stats: [{ displayName: 'Ingester: total reached', value: 1 }], }, - { - name: 'Line', - type: FieldType.string, - config: {}, - values: new ArrayVector(['line3', 'line4']), - }, - { - name: 'labels', - type: FieldType.other, - config: {}, - values: new ArrayVector([ - { - otherLabel: 'other value', - }, - ]), - }, - { - name: 'tsNs', - type: FieldType.string, - config: {}, - values: new ArrayVector(['1000000', '2000000']), - }, - { - name: 'id', - type: FieldType.string, - config: {}, - values: new ArrayVector(['id3', 'id4']), - }, - ], - meta: { - stats: [{ displayName: 'Ingester: total reached', value: 1 }], - }, - length: 2, -}; + length: 2, + }; -export const metricFrameA: DataFrame = { - refId: 'A', - fields: [ - { - name: 'Time', - type: FieldType.time, - config: {}, - values: new ArrayVector([3000000, 4000000]), + const metricFrameA: DataFrame = { + refId: 'A', + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: new ArrayVector([3000000, 4000000]), + }, + { + name: 'Value', + type: FieldType.number, + config: {}, + values: new ArrayVector([5, 4]), + }, + ], + meta: { + stats: [{ displayName: 'Ingester: total reached', value: 1 }], }, - { - name: 'Value', - type: FieldType.number, - config: {}, - values: new ArrayVector([5, 4]), - }, - ], - meta: { - stats: [{ displayName: 'Ingester: total reached', value: 1 }], - }, - length: 2, -}; + length: 2, + }; -export const metricFrameB: DataFrame = { - refId: 'A', - fields: [ - { - name: 'Time', - type: FieldType.time, - config: {}, - values: new ArrayVector([1000000, 2000000]), + const metricFrameB: DataFrame = { + refId: 'A', + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: new ArrayVector([1000000, 2000000]), + }, + { + name: 'Value', + type: FieldType.number, + config: {}, + values: new ArrayVector([6, 7]), + }, + ], + meta: { + stats: [{ displayName: 'Ingester: total reached', value: 2 }], }, - { - name: 'Value', - type: FieldType.number, - config: {}, - values: new ArrayVector([6, 7]), + length: 2, + }; + + const metricFrameC: DataFrame = { + refId: 'A', + name: 'some-time-series', + fields: [ + { + name: 'Time', + type: FieldType.time, + config: {}, + values: new ArrayVector([3000000, 4000000]), + }, + { + name: 'Value', + type: FieldType.number, + config: {}, + values: new ArrayVector([6, 7]), + }, + ], + meta: { + stats: [{ displayName: 'Ingester: total reached', value: 2 }], }, - ], - meta: { - stats: [{ displayName: 'Ingester: total reached', value: 2 }], - }, - length: 2, -}; + length: 2, + }; + + return { + logFrameA, + logFrameB, + metricFrameA, + metricFrameB, + metricFrameC, + }; +} diff --git a/public/app/plugins/datasource/loki/querySplitting.test.ts b/public/app/plugins/datasource/loki/querySplitting.test.ts index 03fa4107054..e5dd8782082 100644 --- a/public/app/plugins/datasource/loki/querySplitting.test.ts +++ b/public/app/plugins/datasource/loki/querySplitting.test.ts @@ -6,7 +6,7 @@ import { dateTime } from '@grafana/data'; import { LokiDatasource } from './datasource'; import * as logsTimeSplit from './logsTimeSplit'; import * as metricTimeSplit from './metricTimeSplit'; -import { createLokiDatasource, logFrameA } from './mocks'; +import { createLokiDatasource, getMockFrames } from './mocks'; import { runPartitionedQuery } from './querySplitting'; import { LokiQuery } from './types'; @@ -65,6 +65,7 @@ describe('runPartitionedQuery()', () => { targets: [{ expr: '{a="b"}', refId: 'A', maxLines: 4 }], range, }); + const { logFrameA } = getMockFrames(); beforeEach(() => { jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [logFrameA], refId: 'A' })); }); diff --git a/public/app/plugins/datasource/loki/querySplitting.ts b/public/app/plugins/datasource/loki/querySplitting.ts index 6db4ae20fb0..f0ac8e12d6a 100644 --- a/public/app/plugins/datasource/loki/querySplitting.ts +++ b/public/app/plugins/datasource/loki/querySplitting.ts @@ -100,7 +100,7 @@ export function runPartitionedQuery(datasource: LokiDatasource, request: DataQue const totalRequests = partition.length; let shouldStop = false; - let smallQuerySubsciption: Subscription | null = null; + let subquerySubsciption: Subscription | null = null; const runNextRequest = (subscriber: Subscriber, requestN: number) => { if (shouldStop) { return; @@ -121,7 +121,7 @@ export function runPartitionedQuery(datasource: LokiDatasource, request: DataQue return; } - smallQuerySubsciption = datasource + subquerySubsciption = datasource .runQuery({ ...request, range, requestId, targets }) .pipe( // in case of an empty query, this is somehow run twice. `share()` is no workaround here as the observable is generated from `of()`. @@ -150,8 +150,8 @@ export function runPartitionedQuery(datasource: LokiDatasource, request: DataQue runNextRequest(subscriber, totalRequests); return () => { shouldStop = true; - if (smallQuerySubsciption != null) { - smallQuerySubsciption.unsubscribe(); + if (subquerySubsciption != null) { + subquerySubsciption.unsubscribe(); } }; }); diff --git a/public/app/plugins/datasource/loki/queryUtils.test.ts b/public/app/plugins/datasource/loki/queryUtils.test.ts index 7721f891372..17faea4fa81 100644 --- a/public/app/plugins/datasource/loki/queryUtils.test.ts +++ b/public/app/plugins/datasource/loki/queryUtils.test.ts @@ -1,6 +1,6 @@ import { ArrayVector, DataQueryResponse } from '@grafana/data'; -import { logFrameA, logFrameB, metricFrameA, metricFrameB } from './mocks'; +import { getMockFrames } from './mocks'; import { getHighlighterExpressionsFromQuery, getNormalizedLokiQuery, @@ -12,6 +12,7 @@ import { getParserFromQuery, obfuscate, combineResponses, + cloneQueryResponse, } from './queryUtils'; import { LokiQuery, LokiQueryType } from './types'; @@ -297,8 +298,21 @@ describe('getParserFromQuery', () => { }); }); +describe('cloneQueryResponse', () => { + const { logFrameA } = getMockFrames(); + const responseA: DataQueryResponse = { + data: [logFrameA], + }; + it('clones query responses', () => { + const clonedA = cloneQueryResponse(responseA); + expect(clonedA).not.toBe(responseA); + expect(clonedA).toEqual(clonedA); + }); +}); + describe('combineResponses', () => { it('combines logs frames', () => { + const { logFrameA, logFrameB } = getMockFrames(); const responseA: DataQueryResponse = { data: [logFrameA], }; @@ -366,6 +380,7 @@ describe('combineResponses', () => { }); it('combines metric frames', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); const responseA: DataQueryResponse = { data: [metricFrameA], }; @@ -403,4 +418,57 @@ describe('combineResponses', () => { ], }); }); + + it('combines and identifies new frames in the response', () => { + const { metricFrameA, metricFrameB, metricFrameC } = getMockFrames(); + const responseA: DataQueryResponse = { + data: [metricFrameA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB, metricFrameC], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: new ArrayVector([1000000, 2000000, 3000000, 4000000]), + }, + { + config: {}, + name: 'Value', + type: 'number', + values: new ArrayVector([6, 7, 5, 4]), + }, + ], + length: 4, + meta: { + stats: [ + { + displayName: 'Ingester: total reached', + value: 3, + }, + ], + }, + refId: 'A', + }, + metricFrameC, + ], + }); + }); + + it('combines frames in a new response instance', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + const responseA: DataQueryResponse = { + data: [metricFrameA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + }; + expect(combineResponses(null, responseA)).not.toBe(responseA); + expect(combineResponses(null, responseB)).not.toBe(responseB); + }); }); diff --git a/public/app/plugins/datasource/loki/queryUtils.ts b/public/app/plugins/datasource/loki/queryUtils.ts index 6192455158b..9d4aeeda177 100644 --- a/public/app/plugins/datasource/loki/queryUtils.ts +++ b/public/app/plugins/datasource/loki/queryUtils.ts @@ -1,7 +1,7 @@ import { SyntaxNode } from '@lezer/common'; import { escapeRegExp } from 'lodash'; -import { DataQueryResponse, DataQueryResponseData, QueryResultMetaStat } from '@grafana/data'; +import { ArrayVector, DataQueryResponse, DataQueryResponseData, Field, QueryResultMetaStat } from '@grafana/data'; import { parser, LineFilter, @@ -315,13 +315,13 @@ export function requestSupportsPartitioning(allQueries: LokiQuery[]) { export function combineResponses(currentResult: DataQueryResponse | null, newResult: DataQueryResponse) { if (!currentResult) { - return newResult; + return cloneQueryResponse(newResult); } newResult.data.forEach((newFrame) => { const currentFrame = currentResult.data.find((frame) => frame.name === newFrame.name); if (!currentFrame) { - currentResult.data.push(newFrame); + currentResult.data.push(cloneDataFrame(newFrame)); return; } combineFrames(currentFrame, newFrame); @@ -333,7 +333,9 @@ export function combineResponses(currentResult: DataQueryResponse | null, newRes function combineFrames(dest: DataQueryResponseData, source: DataQueryResponseData) { const totalFields = dest.fields.length; for (let i = 0; i < totalFields; i++) { - dest.fields[i].values.buffer = [].concat.apply(source.fields[i].values.buffer, dest.fields[i].values.buffer); + dest.fields[i].values = new ArrayVector( + [].concat.apply(source.fields[i].values.toArray(), dest.fields[i].values.toArray()) + ); } dest.length += source.length; combineMetadata(dest, source); @@ -359,3 +361,24 @@ function combineMetadata(dest: DataQueryResponseData = {}, source: DataQueryResp } }); } + +/** + * Deep clones a DataQueryResponse + */ +export function cloneQueryResponse(response: DataQueryResponse): DataQueryResponse { + const newResponse = { + ...response, + data: response.data.map(cloneDataFrame), + }; + return newResponse; +} + +function cloneDataFrame(frame: DataQueryResponseData): DataQueryResponseData { + return { + ...frame, + fields: frame.fields.map((field: Field) => ({ + ...field, + values: new ArrayVector(field.values.buffer), + })), + }; +}