Loki Query Chunking: Refactor naming conventions and reorganize code (#65056)
* Rename splitting files to chunking
* Rename time chunking functions
* Move response functions to response utils
* Remove some blank spaces
* Add an extra test case for frame refIds and names
Parent: 1b3e6d65ae
Commit: 6093e45178
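
For orientation, the renames in the diff below are:

* `runPartitionedQueries` → `runQueryInChunks`
* `runGroupedQueries` → `runGroupedQueriesInChunks`
* `requestSupportsPartitioning` → `requestSupporsChunking`
* `logsTimeSplit` → `logsTimeChunking`
* `metricTimeSplit` → `metricTimeChunking`
* `querySplitting` → `queryChunking`

In addition, `combineResponses`, `cloneQueryResponse`, and their helpers move out of `queryUtils` into `responseUtils`.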
@@ -33,7 +33,7 @@ import { CustomVariableModel } from '../../../features/variables/types';
 import { LokiDatasource, REF_ID_DATA_SAMPLES } from './datasource';
 import { createLokiDatasource, createMetadataRequest } from './mocks';
-import { runPartitionedQueries } from './querySplitting';
+import { runQueryInChunks } from './queryChunking';
 import { parseToNodeNamesArray } from './queryUtils';
 import { LokiOptions, LokiQuery, LokiQueryType, LokiVariableQueryType, SupportingQueryType } from './types';
 import { LokiVariableSupport } from './variables';
@@ -45,7 +45,7 @@ jest.mock('@grafana/runtime', () => {
   };
 });

-jest.mock('./querySplitting');
+jest.mock('./queryChunking');

 const templateSrvStub = {
   getAdhocFilters: jest.fn(() => [] as unknown[]),
@@ -1105,7 +1105,7 @@ describe('LokiDatasource', () => {
   describe('Query splitting', () => {
     beforeAll(() => {
       config.featureToggles.lokiQuerySplitting = true;
-      jest.mocked(runPartitionedQueries).mockReturnValue(
+      jest.mocked(runQueryInChunks).mockReturnValue(
         of({
           data: [],
         })
@@ -1131,7 +1131,7 @@ describe('LokiDatasource', () => {
     });

     await expect(ds.query(query)).toEmitValuesWith(() => {
-      expect(runPartitionedQueries).toHaveBeenCalled();
+      expect(runQueryInChunks).toHaveBeenCalled();
     });
   });
 });
@@ -67,8 +67,8 @@ import {
   findLastPosition,
   getLabelFilterPositions,
 } from './modifyQuery';
+import { runQueryInChunks } from './queryChunking';
 import { getQueryHints } from './queryHints';
-import { runPartitionedQueries } from './querySplitting';
 import {
   getLogQueryFromMetricsQuery,
   getNormalizedLokiQuery,
@@ -76,7 +76,7 @@ import {
   getParserFromQuery,
   isLogsQuery,
   isValidQuery,
-  requestSupportsPartitioning,
+  requestSupporsChunking,
 } from './queryUtils';
 import { sortDataFrameByTime, SortDirection } from './sortDataFrame';
 import { doLokiChannelStream } from './streaming';
@@ -285,8 +285,8 @@ export class LokiDatasource
       return this.runLiveQueryThroughBackend(fixedRequest);
     }

-    if (config.featureToggles.lokiQuerySplitting && requestSupportsPartitioning(fixedRequest.targets)) {
-      return runPartitionedQueries(this, fixedRequest);
+    if (config.featureToggles.lokiQuerySplitting && requestSupporsChunking(fixedRequest.targets)) {
+      return runQueryInChunks(this, fixedRequest);
     }

     return this.runQuery(fixedRequest);
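
Whichever branch is taken, the method returns an observable of `DataQueryResponse`, so chunked execution stays transparent to callers. A minimal consumption sketch (an illustration, not code from this commit; `runRequest` is a hypothetical helper):

```ts
import { lastValueFrom } from 'rxjs';
import { DataQueryRequest } from '@grafana/data';
import { LokiDatasource } from './datasource';
import { LokiQuery } from './types';

// Chunked and plain paths both emit DataQueryResponse values,
// so a caller can await the final emission the same way.
async function runRequest(ds: LokiDatasource, request: DataQueryRequest<LokiQuery>) {
  return lastValueFrom(ds.query(request));
}
```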
@@ -1,6 +1,6 @@
-import { getRangeChunks } from './logsTimeSplit';
+import { getRangeChunks } from './logsTimeChunking';

 describe('querySplit', () => {
   describe('logs getRangeChunks', () => {
     it('should split time range into chunks', () => {
       const start = Date.parse('2022-02-06T14:10:03.234');
       const end = Date.parse('2022-02-06T14:11:03.567');
@@ -1,6 +1,6 @@
-import { getRangeChunks } from './metricTimeSplit';
+import { getRangeChunks } from './metricTimeChunking';

 describe('querySplit', () => {
   describe('metric getRangeChunks', () => {
     it('should split time range into chunks', () => {
       const start = Date.parse('2022-02-06T14:10:03');
       const end = Date.parse('2022-02-06T14:11:03');
@@ -5,13 +5,13 @@ import { dateTime } from '@grafana/data';
 import { LoadingState } from '@grafana/schema';

 import { LokiDatasource } from './datasource';
-import * as logsTimeSplit from './logsTimeSplit';
-import * as metricTimeSplit from './metricTimeSplit';
+import * as logsTimeSplit from './logsTimeChunking';
+import * as metricTimeSplit from './metricTimeChunking';
 import { createLokiDatasource, getMockFrames } from './mocks';
-import { runPartitionedQueries } from './querySplitting';
+import { runQueryInChunks } from './queryChunking';
 import { LokiQuery, LokiQueryType } from './types';

-describe('runPartitionedQueries()', () => {
+describe('runQueryInChunks()', () => {
   let datasource: LokiDatasource;
   const range = {
     from: dateTime('2023-02-08T05:00:00.000Z'),
@@ -31,7 +31,7 @@ describe('runPartitionedQueries()', () => {
   });

   test('Splits datasource queries', async () => {
-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => {
       // 3 days, 3 chunks, 3 requests.
       expect(datasource.runQuery).toHaveBeenCalledTimes(3);
     });
@@ -41,7 +41,7 @@ describe('runPartitionedQueries()', () => {
     jest
       .spyOn(datasource, 'runQuery')
       .mockReturnValue(of({ state: LoadingState.Error, error: { refId: 'A', message: 'Error' }, data: [] }));
-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith((values) => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith((values) => {
       expect(values).toEqual([{ error: { refId: 'A', message: 'Error' }, data: [], state: LoadingState.Streaming }]);
     });
   });
@@ -63,7 +63,7 @@ describe('runPartitionedQueries()', () => {
     jest.mocked(metricTimeSplit.getRangeChunks).mockRestore();
   });
   test('Ignores hidden queries', async () => {
-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => {
       expect(logsTimeSplit.getRangeChunks).toHaveBeenCalled();
       expect(metricTimeSplit.getRangeChunks).not.toHaveBeenCalled();
     });
@@ -80,14 +80,14 @@ describe('runPartitionedQueries()', () => {
     jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [logFrameA], refId: 'A' }));
   });
   test('Stops requesting once maxLines of logs have been received', async () => {
-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => {
       // 3 days, 3 chunks, 2 responses of 2 logs, 2 requests
       expect(datasource.runQuery).toHaveBeenCalledTimes(2);
     });
   });
   test('Performs all the requests if maxLines has not been reached', async () => {
     request.targets[0].maxLines = 9999;
-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => {
       // 3 days, 3 chunks, 3 responses of 2 logs, 3 requests
       expect(datasource.runQuery).toHaveBeenCalledTimes(3);
     });
@@ -95,7 +95,7 @@ describe('runPartitionedQueries()', () => {
   test('Performs all the requests if not a log query', async () => {
     request.targets[0].maxLines = 1;
     request.targets[0].expr = 'count_over_time({a="b"}[1m])';
-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => {
       // 3 days, 3 chunks, 3 responses of 2 logs, 3 requests
       expect(datasource.runQuery).toHaveBeenCalledTimes(3);
     });
@@ -114,7 +114,7 @@ describe('runPartitionedQueries()', () => {
       ],
       range,
     });
-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => {
       // 3 days, 3 chunks, 1x Metric + 1x Log, 6 requests.
       expect(datasource.runQuery).toHaveBeenCalledTimes(6);
     });
@@ -127,7 +127,7 @@ describe('runPartitionedQueries()', () => {
       ],
       range,
     });
-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => {
       // 3 days, 3 chunks, 1x2 Metric, 3 requests.
       expect(datasource.runQuery).toHaveBeenCalledTimes(3);
     });
@@ -140,7 +140,7 @@ describe('runPartitionedQueries()', () => {
       ],
       range,
     });
-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => {
       // 3 days, 3 chunks, 1x2 Logs, 3 requests.
       expect(datasource.runQuery).toHaveBeenCalledTimes(3);
     });
@@ -153,7 +153,7 @@ describe('runPartitionedQueries()', () => {
       ],
       range,
     });
-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => {
       // Instant queries are omitted from splitting
       expect(datasource.runQuery).toHaveBeenCalledTimes(1);
     });
@@ -170,7 +170,7 @@ describe('runPartitionedQueries()', () => {
     jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [], refId: 'B' }));
     jest.spyOn(datasource, 'runQuery').mockReturnValueOnce(of({ data: [logFrameA], refId: 'A' }));

-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => {
       // 3 days, 3 chunks, 1x Logs + 3x Metric, 3 requests.
       expect(datasource.runQuery).toHaveBeenCalledTimes(4);
     });
@@ -184,7 +184,7 @@ describe('runPartitionedQueries()', () => {
       ],
       range,
     });
-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => {
       // 3 days, 3 chunks, 3x Logs + 3x Metric + 1x Instant, 7 requests.
       expect(datasource.runQuery).toHaveBeenCalledTimes(7);
     });
@@ -208,7 +208,7 @@ describe('runPartitionedQueries()', () => {
       targets: [{ expr: '{a="b"}', refId: 'A', chunkDuration: '30m' }],
       range: range1h,
     });
-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => {
       expect(datasource.runQuery).toHaveBeenCalledTimes(2);
     });
   });
@@ -217,7 +217,7 @@ describe('runPartitionedQueries()', () => {
       targets: [{ expr: '{a="b"}', refId: 'A', chunkDuration: '1h' }],
       range: range1h,
     });
-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => {
       expect(datasource.runQuery).toHaveBeenCalledTimes(1);
     });
   });
@@ -229,7 +229,7 @@ describe('runPartitionedQueries()', () => {
       ],
       range: range1h,
     });
-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => {
       expect(datasource.runQuery).toHaveBeenCalledTimes(1);
     });
   });
@@ -241,7 +241,7 @@ describe('runPartitionedQueries()', () => {
       ],
       range: range1h,
     });
-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => {
       // 2 x 30m + 1 x 1h
       expect(datasource.runQuery).toHaveBeenCalledTimes(3);
     });
@@ -254,7 +254,7 @@ describe('runPartitionedQueries()', () => {
       ],
       range: range1h,
     });
-    await expect(runPartitionedQueries(datasource, request)).toEmitValuesWith(() => {
+    await expect(runQueryInChunks(datasource, request)).toEmitValuesWith(() => {
       // 2 x 30m + 1 x 1h
       expect(datasource.runQuery).toHaveBeenCalledTimes(3);
     });
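
The expected call counts in the `chunkDuration` tests above follow from dividing the one-hour test range by each target's duration and grouping targets that share a duration. A sketch of the arithmetic, assuming round-up division:

```ts
const chunkCount = (rangeMs: number, chunkMs: number) => Math.ceil(rangeMs / chunkMs);

const HOUR = 60 * 60 * 1000;
chunkCount(HOUR, 30 * 60 * 1000); // 2 -> two runQuery calls for chunkDuration '30m'
chunkCount(HOUR, 60 * 60 * 1000); // 1 -> one runQuery call for chunkDuration '1h'
// Targets with different durations form separate groups: 2 + 1 = 3 calls.
```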
@@ -13,9 +13,10 @@ import {
 import { LoadingState } from '@grafana/schema';

 import { LokiDatasource } from './datasource';
-import { getRangeChunks as getLogsRangeChunks } from './logsTimeSplit';
-import { getRangeChunks as getMetricRangeChunks } from './metricTimeSplit';
-import { combineResponses, isLogsQuery } from './queryUtils';
+import { getRangeChunks as getLogsRangeChunks } from './logsTimeChunking';
+import { getRangeChunks as getMetricRangeChunks } from './metricTimeChunking';
+import { isLogsQuery } from './queryUtils';
+import { combineResponses } from './responseUtils';
 import { LokiQuery, LokiQueryType } from './types';

 export function partitionTimeRange(
@@ -30,7 +31,6 @@ export function partitionTimeRange(
   // we need to replicate this algo:
   //
   // https://github.com/grafana/grafana/blob/main/pkg/tsdb/loki/step.go#L23
-
   const start = originalTimeRange.from.toDate().getTime();
   const end = originalTimeRange.to.toDate().getTime();

@@ -58,7 +58,6 @@ export function partitionTimeRange(
  * At the end, we will filter the targets that don't need to be executed in the next request batch,
  * becasue, for example, the `maxLines` have been reached.
  */
-
 function adjustTargetsFromResponseState(targets: LokiQuery[], response: DataQueryResponse | null): LokiQuery[] {
   if (!response) {
     return targets;
@@ -84,7 +83,7 @@ function adjustTargetsFromResponseState(targets: LokiQuery[], response: DataQuer

 type LokiGroupedRequest = Array<{ request: DataQueryRequest<LokiQuery>; partition: TimeRange[] }>;

-export function runGroupedQueries(datasource: LokiDatasource, requests: LokiGroupedRequest) {
+export function runGroupedQueriesInChunks(datasource: LokiDatasource, requests: LokiGroupedRequest) {
   let mergedResponse: DataQueryResponse = { data: [], state: LoadingState.Streaming };
   const totalRequests = Math.max(...requests.map(({ partition }) => partition.length));

@@ -168,7 +167,7 @@ function getNextRequestPointers(requests: LokiGroupedRequest, requestGroup: numb
   };
 }

-export function runPartitionedQueries(datasource: LokiDatasource, request: DataQueryRequest<LokiQuery>) {
+export function runQueryInChunks(datasource: LokiDatasource, request: DataQueryRequest<LokiQuery>) {
   const queries = request.targets.filter((query) => !query.hide);
   const [instantQueries, normalQueries] = partition(queries, (query) => query.queryType === LokiQueryType.Instant);
   const [logQueries, metricQueries] = partition(normalQueries, (query) => isLogsQuery(query.expr));
@@ -217,5 +216,5 @@ export function runPartitionedQueries(datasource: LokiDatasource, request: DataQ
     });
   }

-  return runGroupedQueries(datasource, requests);
+  return runGroupedQueriesInChunks(datasource, requests);
 }
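
`runQueryInChunks` emits merged partial results as chunk requests complete, with `state: LoadingState.Streaming` on intermediate emissions (see the error-handling test above). A minimal subscription sketch (an illustration, not commit code):

```ts
import { LoadingState } from '@grafana/schema';
import { runQueryInChunks } from './queryChunking';

// datasource: LokiDatasource, request: DataQueryRequest<LokiQuery>
runQueryInChunks(datasource, request).subscribe((response) => {
  if (response.state === LoadingState.Streaming) {
    // Partial, merged-so-far data: further chunk requests are still in flight.
  }
});
```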
@@ -1,6 +1,3 @@
-import { ArrayVector, DataQueryResponse, QueryResultMetaStat } from '@grafana/data';
-
-import { getMockFrames } from './mocks';
 import {
   getHighlighterExpressionsFromQuery,
   getNormalizedLokiQuery,
@@ -11,9 +8,7 @@ import {
   parseToNodeNamesArray,
   getParserFromQuery,
   obfuscate,
-  combineResponses,
-  cloneQueryResponse,
-  requestSupportsPartitioning,
+  requestSupporsChunking,
 } from './queryUtils';
 import { LokiQuery, LokiQueryType } from './types';

@@ -299,305 +294,7 @@ describe('getParserFromQuery', () => {
   });
 });

-describe('cloneQueryResponse', () => {
-  const { logFrameA } = getMockFrames();
-  const responseA: DataQueryResponse = {
-    data: [logFrameA],
-  };
-  it('clones query responses', () => {
-    const clonedA = cloneQueryResponse(responseA);
-    expect(clonedA).not.toBe(responseA);
-    expect(clonedA).toEqual(clonedA);
-  });
-});
-
-describe('combineResponses', () => {
-  it('combines logs frames', () => {
-    const { logFrameA, logFrameB } = getMockFrames();
-    const responseA: DataQueryResponse = {
-      data: [logFrameA],
-    };
-    const responseB: DataQueryResponse = {
-      data: [logFrameB],
-    };
-    expect(combineResponses(responseA, responseB)).toEqual({
-      data: [
-        {
-          fields: [
-            {
-              config: {},
-              name: 'Time',
-              type: 'time',
-              values: new ArrayVector([1, 2, 3, 4]),
-            },
-            {
-              config: {},
-              name: 'Line',
-              type: 'string',
-              values: new ArrayVector(['line3', 'line4', 'line1', 'line2']),
-            },
-            {
-              config: {},
-              name: 'labels',
-              type: 'other',
-              values: new ArrayVector([
-                {
-                  otherLabel: 'other value',
-                },
-                {
-                  label: 'value',
-                },
-                {
-                  otherLabel: 'other value',
-                },
-              ]),
-            },
-            {
-              config: {},
-              name: 'tsNs',
-              type: 'string',
-              values: new ArrayVector(['1000000', '2000000', '3000000', '4000000']),
-            },
-            {
-              config: {},
-              name: 'id',
-              type: 'string',
-              values: new ArrayVector(['id3', 'id4', 'id1', 'id2']),
-            },
-          ],
-          length: 4,
-          meta: {
-            stats: [
-              {
-                displayName: 'Summary: total bytes processed',
-                unit: 'decbytes',
-                value: 33,
-              },
-            ],
-          },
-          refId: 'A',
-        },
-      ],
-    });
-  });
-
-  it('combines metric frames', () => {
-    const { metricFrameA, metricFrameB } = getMockFrames();
-    const responseA: DataQueryResponse = {
-      data: [metricFrameA],
-    };
-    const responseB: DataQueryResponse = {
-      data: [metricFrameB],
-    };
-    expect(combineResponses(responseA, responseB)).toEqual({
-      data: [
-        {
-          fields: [
-            {
-              config: {},
-              name: 'Time',
-              type: 'time',
-              values: new ArrayVector([1000000, 2000000, 3000000, 4000000]),
-            },
-            {
-              config: {},
-              name: 'Value',
-              type: 'number',
-              values: new ArrayVector([6, 7, 5, 4]),
-            },
-          ],
-          length: 4,
-          meta: {
-            stats: [
-              {
-                displayName: 'Summary: total bytes processed',
-                unit: 'decbytes',
-                value: 33,
-              },
-            ],
-          },
-          refId: 'A',
-        },
-      ],
-    });
-  });
-
-  it('combines and identifies new frames in the response', () => {
-    const { metricFrameA, metricFrameB, metricFrameC } = getMockFrames();
-    const responseA: DataQueryResponse = {
-      data: [metricFrameA],
-    };
-    const responseB: DataQueryResponse = {
-      data: [metricFrameB, metricFrameC],
-    };
-    expect(combineResponses(responseA, responseB)).toEqual({
-      data: [
-        {
-          fields: [
-            {
-              config: {},
-              name: 'Time',
-              type: 'time',
-              values: new ArrayVector([1000000, 2000000, 3000000, 4000000]),
-            },
-            {
-              config: {},
-              name: 'Value',
-              type: 'number',
-              values: new ArrayVector([6, 7, 5, 4]),
-            },
-          ],
-          length: 4,
-          meta: {
-            stats: [
-              {
-                displayName: 'Summary: total bytes processed',
-                unit: 'decbytes',
-                value: 33,
-              },
-            ],
-          },
-          refId: 'A',
-        },
-        metricFrameC,
-      ],
-    });
-  });
-
-  it('combines frames in a new response instance', () => {
-    const { metricFrameA, metricFrameB } = getMockFrames();
-    const responseA: DataQueryResponse = {
-      data: [metricFrameA],
-    };
-    const responseB: DataQueryResponse = {
-      data: [metricFrameB],
-    };
-    expect(combineResponses(null, responseA)).not.toBe(responseA);
-    expect(combineResponses(null, responseB)).not.toBe(responseB);
-  });
-
-  it('combine when first param has errors', () => {
-    const { metricFrameA, metricFrameB } = getMockFrames();
-    const errorA = {
-      message: 'errorA',
-    };
-    const responseA: DataQueryResponse = {
-      data: [metricFrameA],
-      error: errorA,
-      errors: [errorA],
-    };
-    const responseB: DataQueryResponse = {
-      data: [metricFrameB],
-    };
-
-    const combined = combineResponses(responseA, responseB);
-    expect(combined.data[0].length).toBe(4);
-    expect(combined.error?.message).toBe('errorA');
-    expect(combined.errors).toHaveLength(1);
-    expect(combined.errors?.[0]?.message).toBe('errorA');
-  });
-
-  it('combine when second param has errors', () => {
-    const { metricFrameA, metricFrameB } = getMockFrames();
-    const responseA: DataQueryResponse = {
-      data: [metricFrameA],
-    };
-    const errorB = {
-      message: 'errorB',
-    };
-    const responseB: DataQueryResponse = {
-      data: [metricFrameB],
-      error: errorB,
-      errors: [errorB],
-    };
-
-    const combined = combineResponses(responseA, responseB);
-    expect(combined.data[0].length).toBe(4);
-    expect(combined.error?.message).toBe('errorB');
-    expect(combined.errors).toHaveLength(1);
-    expect(combined.errors?.[0]?.message).toBe('errorB');
-  });
-
-  it('combine when both params have errors', () => {
-    const { metricFrameA, metricFrameB } = getMockFrames();
-    const errorA = {
-      message: 'errorA',
-    };
-    const errorB = {
-      message: 'errorB',
-    };
-    const responseA: DataQueryResponse = {
-      data: [metricFrameA],
-      error: errorA,
-      errors: [errorA],
-    };
-    const responseB: DataQueryResponse = {
-      data: [metricFrameB],
-      error: errorB,
-      errors: [errorB],
-    };
-
-    const combined = combineResponses(responseA, responseB);
-    expect(combined.data[0].length).toBe(4);
-    expect(combined.error?.message).toBe('errorA');
-    expect(combined.errors).toHaveLength(2);
-    expect(combined.errors?.[0]?.message).toBe('errorA');
-    expect(combined.errors?.[1]?.message).toBe('errorB');
-  });
-
-  describe('combine stats', () => {
-    const { metricFrameA } = getMockFrames();
-    const makeResponse = (stats?: QueryResultMetaStat[]): DataQueryResponse => ({
-      data: [
-        {
-          ...metricFrameA,
-          meta: {
-            ...metricFrameA.meta,
-            stats,
-          },
-        },
-      ],
-    });
-    it('two values', () => {
-      const responseA = makeResponse([
-        { displayName: 'Ingester: total reached', value: 1 },
-        { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 },
-      ]);
-      const responseB = makeResponse([
-        { displayName: 'Ingester: total reached', value: 2 },
-        { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 22 },
-      ]);
-
-      expect(combineResponses(responseA, responseB).data[0].meta.stats).toStrictEqual([
-        { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 33 },
-      ]);
-    });
-
-    it('one value', () => {
-      const responseA = makeResponse([
-        { displayName: 'Ingester: total reached', value: 1 },
-        { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 },
-      ]);
-      const responseB = makeResponse();
-
-      expect(combineResponses(responseA, responseB).data[0].meta.stats).toStrictEqual([
-        { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 },
-      ]);
-
-      expect(combineResponses(responseB, responseA).data[0].meta.stats).toStrictEqual([
-        { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 },
-      ]);
-    });
-
-    it('no value', () => {
-      const responseA = makeResponse();
-      const responseB = makeResponse();
-      expect(combineResponses(responseA, responseB).data[0].meta.stats).toHaveLength(0);
-    });
-  });
-});
-
-describe('requestSupportsPartitioning', () => {
+describe('requestSupporsChunking', () => {
   it('hidden requests are not partitioned', () => {
     const requests: LokiQuery[] = [
       {
@@ -606,7 +303,7 @@ describe('requestSupportsPartitioning', () => {
         hide: true,
       },
     ];
-    expect(requestSupportsPartitioning(requests)).toBe(false);
+    expect(requestSupporsChunking(requests)).toBe(false);
   });
   it('special requests are not partitioned', () => {
     const requests: LokiQuery[] = [
@@ -615,7 +312,7 @@ describe('requestSupportsPartitioning', () => {
         refId: 'do-not-chunk',
       },
     ];
-    expect(requestSupportsPartitioning(requests)).toBe(false);
+    expect(requestSupporsChunking(requests)).toBe(false);
   });
   it('empty requests are not partitioned', () => {
     const requests: LokiQuery[] = [
@@ -624,7 +321,7 @@ describe('requestSupportsPartitioning', () => {
         refId: 'A',
       },
     ];
-    expect(requestSupportsPartitioning(requests)).toBe(false);
+    expect(requestSupporsChunking(requests)).toBe(false);
   });
   it('all other requests are partitioned', () => {
     const requests: LokiQuery[] = [
@@ -637,6 +334,6 @@ describe('requestSupportsPartitioning', () => {
         refId: 'B',
       },
     ];
-    expect(requestSupportsPartitioning(requests)).toBe(true);
+    expect(requestSupporsChunking(requests)).toBe(true);
   });
 });
@@ -1,14 +1,6 @@
 import { SyntaxNode } from '@lezer/common';
 import { escapeRegExp } from 'lodash';

-import {
-  ArrayVector,
-  DataFrame,
-  DataQueryResponse,
-  DataQueryResponseData,
-  Field,
-  QueryResultMetaStat,
-} from '@grafana/data';
 import {
   parser,
   LineFilter,
@@ -304,7 +296,7 @@ export function getStreamSelectorsFromQuery(query: string): string[] {
   return labelMatchers;
 }

-export function requestSupportsPartitioning(allQueries: LokiQuery[]) {
+export function requestSupporsChunking(allQueries: LokiQuery[]) {
   const queries = allQueries
     .filter((query) => !query.hide)
     .filter((query) => !query.refId.includes('do-not-chunk'))
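
Together with the `return queries.length > 0;` shown in the next hunk, the predicate behaves as pinned down by the tests earlier in this diff. Usage examples (targets abbreviated for illustration; real `LokiQuery` objects carry more fields):

```ts
import { requestSupporsChunking } from './queryUtils';

requestSupporsChunking([{ expr: '{a="b"}', refId: 'A', hide: true }]); // false: hidden
requestSupporsChunking([{ expr: '{a="b"}', refId: 'do-not-chunk' }]);  // false: opted out via refId
requestSupporsChunking([{ expr: '', refId: 'A' }]);                    // false per the "empty requests" test
requestSupporsChunking([{ expr: '{a="b"}', refId: 'A' }]);             // true
```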
@@ -312,102 +304,3 @@ export function requestSupportsPartitioning(allQueries: LokiQuery[]) {

   return queries.length > 0;
 }
-
-function shouldCombine(frame1: DataFrame, frame2: DataFrame): boolean {
-  if (frame1.refId !== frame2.refId) {
-    return false;
-  }
-
-  return frame1.name === frame2.name;
-}
-
-export function combineResponses(currentResult: DataQueryResponse | null, newResult: DataQueryResponse) {
-  if (!currentResult) {
-    return cloneQueryResponse(newResult);
-  }
-
-  newResult.data.forEach((newFrame) => {
-    const currentFrame = currentResult.data.find((frame) => shouldCombine(frame, newFrame));
-    if (!currentFrame) {
-      currentResult.data.push(cloneDataFrame(newFrame));
-      return;
-    }
-    combineFrames(currentFrame, newFrame);
-  });
-
-  const mergedErrors = [...(currentResult.errors ?? []), ...(newResult.errors ?? [])];
-
-  // we make sure to have `.errors` as undefined, instead of empty-array
-  // when no errors.
-
-  if (mergedErrors.length > 0) {
-    currentResult.errors = mergedErrors;
-  }
-
-  // the `.error` attribute is obsolete now,
-  // but we have to maintain it, otherwise
-  // some grafana parts do not behave well.
-  // we just choose the old error, if it exists,
-  // otherwise the new error, if it exists.
-  currentResult.error = currentResult.error ?? newResult.error;
-
-  return currentResult;
-}
-
-function combineFrames(dest: DataFrame, source: DataFrame) {
-  const totalFields = dest.fields.length;
-  for (let i = 0; i < totalFields; i++) {
-    dest.fields[i].values = new ArrayVector(
-      [].concat.apply(source.fields[i].values.toArray(), dest.fields[i].values.toArray())
-    );
-  }
-  dest.length += source.length;
-  dest.meta = {
-    ...dest.meta,
-    stats: getCombinedMetadataStats(dest.meta?.stats ?? [], source.meta?.stats ?? []),
-  };
-}
-
-const TOTAL_BYTES_STAT = 'Summary: total bytes processed';
-
-function getCombinedMetadataStats(
-  destStats: QueryResultMetaStat[],
-  sourceStats: QueryResultMetaStat[]
-): QueryResultMetaStat[] {
-  // in the current approach, we only handle a single stat
-  const destStat = destStats.find((s) => s.displayName === TOTAL_BYTES_STAT);
-  const sourceStat = sourceStats.find((s) => s.displayName === TOTAL_BYTES_STAT);
-
-  if (sourceStat != null && destStat != null) {
-    return [{ value: sourceStat.value + destStat.value, displayName: TOTAL_BYTES_STAT, unit: destStat.unit }];
-  }
-
-  // maybe one of them exist
-  const eitherStat = sourceStat ?? destStat;
-  if (eitherStat != null) {
-    return [eitherStat];
-  }
-
-  return [];
-}
-
-/**
- * Deep clones a DataQueryResponse
- */
-export function cloneQueryResponse(response: DataQueryResponse): DataQueryResponse {
-  const newResponse = {
-    ...response,
-    data: response.data.map(cloneDataFrame),
-  };
-  return newResponse;
-}
-
-function cloneDataFrame(frame: DataQueryResponseData): DataQueryResponseData {
-  return {
-    ...frame,
-    fields: frame.fields.map((field: Field<unknown, ArrayVector>) => ({
-      ...field,
-      values: new ArrayVector(field.values.buffer),
-    })),
-  };
-}
@@ -1,7 +1,8 @@
 import { cloneDeep } from 'lodash';

-import { ArrayVector, DataFrame, FieldType } from '@grafana/data';
+import { ArrayVector, DataQueryResponse, QueryResultMetaStat, DataFrame, FieldType } from '@grafana/data';

+import { getMockFrames } from './mocks';
 import {
   dataFrameHasLevelLabel,
   dataFrameHasLokiError,
@@ -9,6 +10,8 @@ import {
   extractLogParserFromDataFrame,
   extractLabelKeysFromDataFrame,
   extractUnwrapLabelKeysFromDataFrame,
+  cloneQueryResponse,
+  combineResponses,
 } from './responseUtils';

 const frame: DataFrame = {
@@ -119,3 +122,324 @@ describe('extractUnwrapLabelKeysFromDataFrame', () => {
     expect(extractUnwrapLabelKeysFromDataFrame(input)).toEqual(['number']);
   });
 });
+
+describe('cloneQueryResponse', () => {
+  const { logFrameA } = getMockFrames();
+  const responseA: DataQueryResponse = {
+    data: [logFrameA],
+  };
+  it('clones query responses', () => {
+    const clonedA = cloneQueryResponse(responseA);
+    expect(clonedA).not.toBe(responseA);
+    expect(clonedA).toEqual(clonedA);
+  });
+});
+
+describe('combineResponses', () => {
+  it('combines logs frames', () => {
+    const { logFrameA, logFrameB } = getMockFrames();
+    const responseA: DataQueryResponse = {
+      data: [logFrameA],
+    };
+    const responseB: DataQueryResponse = {
+      data: [logFrameB],
+    };
+    expect(combineResponses(responseA, responseB)).toEqual({
+      data: [
+        {
+          fields: [
+            {
+              config: {},
+              name: 'Time',
+              type: 'time',
+              values: new ArrayVector([1, 2, 3, 4]),
+            },
+            {
+              config: {},
+              name: 'Line',
+              type: 'string',
+              values: new ArrayVector(['line3', 'line4', 'line1', 'line2']),
+            },
+            {
+              config: {},
+              name: 'labels',
+              type: 'other',
+              values: new ArrayVector([
+                {
+                  otherLabel: 'other value',
+                },
+                {
+                  label: 'value',
+                },
+                {
+                  otherLabel: 'other value',
+                },
+              ]),
+            },
+            {
+              config: {},
+              name: 'tsNs',
+              type: 'string',
+              values: new ArrayVector(['1000000', '2000000', '3000000', '4000000']),
+            },
+            {
+              config: {},
+              name: 'id',
+              type: 'string',
+              values: new ArrayVector(['id3', 'id4', 'id1', 'id2']),
+            },
+          ],
+          length: 4,
+          meta: {
+            stats: [
+              {
+                displayName: 'Summary: total bytes processed',
+                unit: 'decbytes',
+                value: 33,
+              },
+            ],
+          },
+          refId: 'A',
+        },
+      ],
+    });
+  });
+
+  it('combines metric frames', () => {
+    const { metricFrameA, metricFrameB } = getMockFrames();
+    const responseA: DataQueryResponse = {
+      data: [metricFrameA],
+    };
+    const responseB: DataQueryResponse = {
+      data: [metricFrameB],
+    };
+    expect(combineResponses(responseA, responseB)).toEqual({
+      data: [
+        {
+          fields: [
+            {
+              config: {},
+              name: 'Time',
+              type: 'time',
+              values: new ArrayVector([1000000, 2000000, 3000000, 4000000]),
+            },
+            {
+              config: {},
+              name: 'Value',
+              type: 'number',
+              values: new ArrayVector([6, 7, 5, 4]),
+            },
+          ],
+          length: 4,
+          meta: {
+            stats: [
+              {
+                displayName: 'Summary: total bytes processed',
+                unit: 'decbytes',
+                value: 33,
+              },
+            ],
+          },
+          refId: 'A',
+        },
+      ],
+    });
+  });
+
+  it('combines and identifies new frames in the response', () => {
+    const { metricFrameA, metricFrameB, metricFrameC } = getMockFrames();
+    const responseA: DataQueryResponse = {
+      data: [metricFrameA],
+    };
+    const responseB: DataQueryResponse = {
+      data: [metricFrameB, metricFrameC],
+    };
+    expect(combineResponses(responseA, responseB)).toEqual({
+      data: [
+        {
+          fields: [
+            {
+              config: {},
+              name: 'Time',
+              type: 'time',
+              values: new ArrayVector([1000000, 2000000, 3000000, 4000000]),
+            },
+            {
+              config: {},
+              name: 'Value',
+              type: 'number',
+              values: new ArrayVector([6, 7, 5, 4]),
+            },
+          ],
+          length: 4,
+          meta: {
+            stats: [
+              {
+                displayName: 'Summary: total bytes processed',
+                unit: 'decbytes',
+                value: 33,
+              },
+            ],
+          },
+          refId: 'A',
+        },
+        metricFrameC,
+      ],
+    });
+  });
+
+  it('combines frames prioritizing refIds over names', () => {
+    const { metricFrameA, metricFrameB } = getMockFrames();
+    const dataFrameA = {
+      ...metricFrameA,
+      refId: 'A',
+      name: 'A',
+    };
+    const dataFrameB = {
+      ...metricFrameB,
+      refId: 'B',
+      name: 'A',
+    };
+    const responseA: DataQueryResponse = {
+      data: [dataFrameA],
+    };
+    const responseB: DataQueryResponse = {
+      data: [dataFrameB],
+    };
+    expect(combineResponses(responseA, responseB)).toEqual({
+      data: [dataFrameA, dataFrameB],
+    });
+  });
+
+  it('combines frames in a new response instance', () => {
+    const { metricFrameA, metricFrameB } = getMockFrames();
+    const responseA: DataQueryResponse = {
+      data: [metricFrameA],
+    };
+    const responseB: DataQueryResponse = {
+      data: [metricFrameB],
+    };
+    expect(combineResponses(null, responseA)).not.toBe(responseA);
+    expect(combineResponses(null, responseB)).not.toBe(responseB);
+  });
+
+  it('combine when first param has errors', () => {
+    const { metricFrameA, metricFrameB } = getMockFrames();
+    const errorA = {
+      message: 'errorA',
+    };
+    const responseA: DataQueryResponse = {
+      data: [metricFrameA],
+      error: errorA,
+      errors: [errorA],
+    };
+    const responseB: DataQueryResponse = {
+      data: [metricFrameB],
+    };
+
+    const combined = combineResponses(responseA, responseB);
+    expect(combined.data[0].length).toBe(4);
+    expect(combined.error?.message).toBe('errorA');
+    expect(combined.errors).toHaveLength(1);
+    expect(combined.errors?.[0]?.message).toBe('errorA');
+  });
+
+  it('combine when second param has errors', () => {
+    const { metricFrameA, metricFrameB } = getMockFrames();
+    const responseA: DataQueryResponse = {
+      data: [metricFrameA],
+    };
+    const errorB = {
+      message: 'errorB',
+    };
+    const responseB: DataQueryResponse = {
+      data: [metricFrameB],
+      error: errorB,
+      errors: [errorB],
+    };
+
+    const combined = combineResponses(responseA, responseB);
+    expect(combined.data[0].length).toBe(4);
+    expect(combined.error?.message).toBe('errorB');
+    expect(combined.errors).toHaveLength(1);
+    expect(combined.errors?.[0]?.message).toBe('errorB');
+  });
+
+  it('combine when both params have errors', () => {
+    const { metricFrameA, metricFrameB } = getMockFrames();
+    const errorA = {
+      message: 'errorA',
+    };
+    const errorB = {
+      message: 'errorB',
+    };
+    const responseA: DataQueryResponse = {
+      data: [metricFrameA],
+      error: errorA,
+      errors: [errorA],
+    };
+    const responseB: DataQueryResponse = {
+      data: [metricFrameB],
+      error: errorB,
+      errors: [errorB],
+    };
+
+    const combined = combineResponses(responseA, responseB);
+    expect(combined.data[0].length).toBe(4);
+    expect(combined.error?.message).toBe('errorA');
+    expect(combined.errors).toHaveLength(2);
+    expect(combined.errors?.[0]?.message).toBe('errorA');
+    expect(combined.errors?.[1]?.message).toBe('errorB');
+  });
+
+  describe('combine stats', () => {
+    const { metricFrameA } = getMockFrames();
+    const makeResponse = (stats?: QueryResultMetaStat[]): DataQueryResponse => ({
+      data: [
+        {
+          ...metricFrameA,
+          meta: {
+            ...metricFrameA.meta,
+            stats,
+          },
+        },
+      ],
+    });
+    it('two values', () => {
+      const responseA = makeResponse([
+        { displayName: 'Ingester: total reached', value: 1 },
+        { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 },
+      ]);
+      const responseB = makeResponse([
+        { displayName: 'Ingester: total reached', value: 2 },
+        { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 22 },
+      ]);
+
+      expect(combineResponses(responseA, responseB).data[0].meta.stats).toStrictEqual([
+        { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 33 },
+      ]);
+    });
+
+    it('one value', () => {
+      const responseA = makeResponse([
+        { displayName: 'Ingester: total reached', value: 1 },
+        { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 },
+      ]);
+      const responseB = makeResponse();
+
+      expect(combineResponses(responseA, responseB).data[0].meta.stats).toStrictEqual([
+        { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 },
+      ]);
+
+      expect(combineResponses(responseB, responseA).data[0].meta.stats).toStrictEqual([
+        { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 },
+      ]);
+    });
+
+    it('no value', () => {
+      const responseA = makeResponse();
+      const responseB = makeResponse();
+      expect(combineResponses(responseA, responseB).data[0].meta.stats).toHaveLength(0);
+    });
+  });
+});
@@ -1,4 +1,14 @@
-import { DataFrame, FieldType, isValidGoDuration, Labels } from '@grafana/data';
+import {
+  ArrayVector,
+  DataFrame,
+  DataQueryResponse,
+  DataQueryResponseData,
+  Field,
+  FieldType,
+  isValidGoDuration,
+  Labels,
+  QueryResultMetaStat,
+} from '@grafana/data';

 import { isBytesString } from './languageUtils';
 import { isLogLineJSON, isLogLineLogfmt } from './lineParser';
@@ -100,3 +110,102 @@ export function extractLevelLikeLabelFromDataFrame(frame: DataFrame): string | n
   }
   return levelLikeLabel;
 }
+
+function shouldCombine(frame1: DataFrame, frame2: DataFrame): boolean {
+  if (frame1.refId !== frame2.refId) {
+    return false;
+  }
+
+  return frame1.name === frame2.name;
+}
+
+export function combineResponses(currentResult: DataQueryResponse | null, newResult: DataQueryResponse) {
+  if (!currentResult) {
+    return cloneQueryResponse(newResult);
+  }
+
+  newResult.data.forEach((newFrame) => {
+    const currentFrame = currentResult.data.find((frame) => shouldCombine(frame, newFrame));
+    if (!currentFrame) {
+      currentResult.data.push(cloneDataFrame(newFrame));
+      return;
+    }
+    combineFrames(currentFrame, newFrame);
+  });
+
+  const mergedErrors = [...(currentResult.errors ?? []), ...(newResult.errors ?? [])];
+
+  // we make sure to have `.errors` as undefined, instead of empty-array
+  // when no errors.
+
+  if (mergedErrors.length > 0) {
+    currentResult.errors = mergedErrors;
+  }
+
+  // the `.error` attribute is obsolete now,
+  // but we have to maintain it, otherwise
+  // some grafana parts do not behave well.
+  // we just choose the old error, if it exists,
+  // otherwise the new error, if it exists.
+  currentResult.error = currentResult.error ?? newResult.error;
+
+  return currentResult;
+}
+
+function combineFrames(dest: DataFrame, source: DataFrame) {
+  const totalFields = dest.fields.length;
+  for (let i = 0; i < totalFields; i++) {
+    dest.fields[i].values = new ArrayVector(
+      [].concat.apply(source.fields[i].values.toArray(), dest.fields[i].values.toArray())
+    );
+  }
+  dest.length += source.length;
+  dest.meta = {
+    ...dest.meta,
+    stats: getCombinedMetadataStats(dest.meta?.stats ?? [], source.meta?.stats ?? []),
+  };
+}
+
+const TOTAL_BYTES_STAT = 'Summary: total bytes processed';
+
+function getCombinedMetadataStats(
+  destStats: QueryResultMetaStat[],
+  sourceStats: QueryResultMetaStat[]
+): QueryResultMetaStat[] {
+  // in the current approach, we only handle a single stat
+  const destStat = destStats.find((s) => s.displayName === TOTAL_BYTES_STAT);
+  const sourceStat = sourceStats.find((s) => s.displayName === TOTAL_BYTES_STAT);
+
+  if (sourceStat != null && destStat != null) {
+    return [{ value: sourceStat.value + destStat.value, displayName: TOTAL_BYTES_STAT, unit: destStat.unit }];
+  }
+
+  // maybe one of them exist
+  const eitherStat = sourceStat ?? destStat;
+  if (eitherStat != null) {
+    return [eitherStat];
+  }
+
+  return [];
+}
+
+/**
+ * Deep clones a DataQueryResponse
+ */
+export function cloneQueryResponse(response: DataQueryResponse): DataQueryResponse {
+  const newResponse = {
+    ...response,
+    data: response.data.map(cloneDataFrame),
+  };
+  return newResponse;
+}
+
+function cloneDataFrame(frame: DataQueryResponseData): DataQueryResponseData {
+  return {
+    ...frame,
+    fields: frame.fields.map((field: Field<unknown, ArrayVector>) => ({
+      ...field,
+      values: new ArrayVector(field.values.buffer),
+    })),
+  };
+}
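
Given the `combineResponses` contract above (a `null` accumulator clones the incoming response; frames with matching `refId` and `name` are concatenated in place, new frames are appended, and errors and the total-bytes stat are merged), accumulating per-chunk responses is a simple fold. A usage sketch (`mergeChunkResponses` is a hypothetical helper, not part of this commit):

```ts
import { DataQueryResponse } from '@grafana/data';
import { combineResponses } from './responseUtils';

function mergeChunkResponses(chunkResponses: DataQueryResponse[]): DataQueryResponse | null {
  let merged: DataQueryResponse | null = null;
  for (const response of chunkResponses) {
    // Each step folds one chunk's response into the accumulated result.
    merged = combineResponses(merged, response);
  }
  return merged;
}
```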