diff --git a/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md b/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md
index 972439653ac..5111856e609 100644
--- a/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md
+++ b/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md
@@ -135,6 +135,7 @@ Experimental features might be changed or removed without prior notice.
| `mysqlParseTime` | Ensure the parseTime flag is set for MySQL driver |
| `alertingBacktesting` | Rule backtesting API for alerting |
| `editPanelCSVDragAndDrop` | Enables drag and drop for CSV and Excel files |
+| `lokiShardSplitting` | Use stream shards to split queries into smaller subqueries |
| `lokiQuerySplittingConfig` | Give users the option to configure split durations for Loki queries |
| `individualCookiePreferences` | Support overriding cookie preferences per user |
| `influxqlStreamingParser` | Enable streaming JSON parser for InfluxDB datasource InfluxQL query language |
diff --git a/packages/grafana-data/src/types/featureToggles.gen.ts b/packages/grafana-data/src/types/featureToggles.gen.ts
index ac423666cd9..644e5531261 100644
--- a/packages/grafana-data/src/types/featureToggles.gen.ts
+++ b/packages/grafana-data/src/types/featureToggles.gen.ts
@@ -52,6 +52,7 @@ export interface FeatureToggles {
editPanelCSVDragAndDrop?: boolean;
alertingNoNormalState?: boolean;
logsContextDatasourceUi?: boolean;
+ lokiShardSplitting?: boolean;
lokiQuerySplitting?: boolean;
lokiQuerySplittingConfig?: boolean;
individualCookiePreferences?: boolean;
diff --git a/packages/grafana-schema/src/raw/composable/loki/dataquery/x/LokiDataQuery_types.gen.ts b/packages/grafana-schema/src/raw/composable/loki/dataquery/x/LokiDataQuery_types.gen.ts
index b8d2b5ffc49..a9b2c7ecaee 100644
--- a/packages/grafana-schema/src/raw/composable/loki/dataquery/x/LokiDataQuery_types.gen.ts
+++ b/packages/grafana-schema/src/raw/composable/loki/dataquery/x/LokiDataQuery_types.gen.ts
@@ -33,6 +33,7 @@ export enum SupportingQueryType {
export enum LokiQueryDirection {
Backward = 'backward',
Forward = 'forward',
+ Scan = 'scan',
}
export interface LokiDataQuery extends common.DataQuery {
diff --git a/pkg/services/featuremgmt/registry.go b/pkg/services/featuremgmt/registry.go
index d3ff4aa7755..2141ce0cd6d 100644
--- a/pkg/services/featuremgmt/registry.go
+++ b/pkg/services/featuremgmt/registry.go
@@ -265,6 +265,13 @@ var (
Expression: "true", // turned on by default
AllowSelfServe: true,
},
+ {
+ Name: "lokiShardSplitting",
+ Description: "Use stream shards to split queries into smaller subqueries",
+ Stage: FeatureStageExperimental,
+ FrontendOnly: true,
+ Owner: grafanaObservabilityLogsSquad,
+ },
{
Name: "lokiQuerySplitting",
Description: "Split large interval queries into subqueries with smaller time intervals",
diff --git a/pkg/services/featuremgmt/toggles_gen.csv b/pkg/services/featuremgmt/toggles_gen.csv
index 4718a0d7ffa..a452651a933 100644
--- a/pkg/services/featuremgmt/toggles_gen.csv
+++ b/pkg/services/featuremgmt/toggles_gen.csv
@@ -33,6 +33,7 @@ alertingBacktesting,experimental,@grafana/alerting-squad,false,false,false
editPanelCSVDragAndDrop,experimental,@grafana/dataviz-squad,false,false,true
alertingNoNormalState,preview,@grafana/alerting-squad,false,false,false
logsContextDatasourceUi,GA,@grafana/observability-logs,false,false,true
+lokiShardSplitting,experimental,@grafana/observability-logs,false,false,true
lokiQuerySplitting,GA,@grafana/observability-logs,false,false,true
lokiQuerySplittingConfig,experimental,@grafana/observability-logs,false,false,true
individualCookiePreferences,experimental,@grafana/grafana-backend-group,false,false,false
diff --git a/pkg/services/featuremgmt/toggles_gen.go b/pkg/services/featuremgmt/toggles_gen.go
index 921f061f555..5e91ba9dd42 100644
--- a/pkg/services/featuremgmt/toggles_gen.go
+++ b/pkg/services/featuremgmt/toggles_gen.go
@@ -143,6 +143,10 @@ const (
// Allow datasource to provide custom UI for context view
FlagLogsContextDatasourceUi = "logsContextDatasourceUi"
+ // FlagLokiShardSplitting
+ // Use stream shards to split queries into smaller subqueries
+ FlagLokiShardSplitting = "lokiShardSplitting"
+
// FlagLokiQuerySplitting
// Split large interval queries into subqueries with smaller time intervals
FlagLokiQuerySplitting = "lokiQuerySplitting"
diff --git a/pkg/services/featuremgmt/toggles_gen.json b/pkg/services/featuremgmt/toggles_gen.json
index 2e08fb55bf1..bb89eb8bcfc 100644
--- a/pkg/services/featuremgmt/toggles_gen.json
+++ b/pkg/services/featuremgmt/toggles_gen.json
@@ -1986,6 +1986,22 @@
"codeowner": "@grafana/observability-logs"
}
},
+ {
+ "metadata": {
+ "name": "lokiShardSplitting",
+ "resourceVersion": "1729678036788",
+ "creationTimestamp": "2024-10-23T10:06:42Z",
+ "annotations": {
+ "grafana.app/updatedTimestamp": "2024-10-23 10:07:16.788828 +0000 UTC"
+ }
+ },
+ "spec": {
+ "description": "Use stream shards to split queries into smaller subqueries",
+ "stage": "experimental",
+ "codeowner": "@grafana/observability-logs",
+ "frontend": true
+ }
+ },
{
"metadata": {
"name": "lokiStructuredMetadata",
diff --git a/pkg/tsdb/loki/kinds/dataquery/types_dataquery_gen.go b/pkg/tsdb/loki/kinds/dataquery/types_dataquery_gen.go
index f9dcd723805..c386bd5c3b4 100644
--- a/pkg/tsdb/loki/kinds/dataquery/types_dataquery_gen.go
+++ b/pkg/tsdb/loki/kinds/dataquery/types_dataquery_gen.go
@@ -13,6 +13,7 @@ package dataquery
const (
LokiQueryDirectionBackward LokiQueryDirection = "backward"
LokiQueryDirectionForward LokiQueryDirection = "forward"
+ LokiQueryDirectionScan LokiQueryDirection = "scan"
)
// Defines values for LokiQueryType.
diff --git a/pkg/tsdb/loki/parse_query.go b/pkg/tsdb/loki/parse_query.go
index a4f39679d6c..d824bf596dc 100644
--- a/pkg/tsdb/loki/parse_query.go
+++ b/pkg/tsdb/loki/parse_query.go
@@ -94,6 +94,8 @@ func parseDirection(jsonPointerValue *string) (Direction, error) {
return DirectionBackward, nil
case "forward":
return DirectionForward, nil
+ case "scan":
+ return DirectionBackward, nil
default:
return DirectionBackward, fmt.Errorf("invalid queryDirection: %s", jsonValue)
}
diff --git a/public/app/features/explore/Logs/LogsVolumePanelList.tsx b/public/app/features/explore/Logs/LogsVolumePanelList.tsx
index a25a02b29dc..f2bbc0b3905 100644
--- a/public/app/features/explore/Logs/LogsVolumePanelList.tsx
+++ b/public/app/features/explore/Logs/LogsVolumePanelList.tsx
@@ -109,7 +109,7 @@ export const LogsVolumePanelList = ({
return ;
}
- if (numberOfLogVolumes === 0) {
+ if (numberOfLogVolumes === 0 && logsVolumeData?.state !== LoadingState.Streaming) {
return (
@@ -122,7 +122,6 @@ export const LogsVolumePanelList = ({
return (
{Object.keys(logVolumes).map((name, index) => {
- const logsVolumeData = { data: logVolumes[name] };
return (
> = [
label: 'Forward',
description: 'Search in forward direction.',
},
+ {
+ value: LokiQueryDirection.Scan,
+ label: 'Scan',
+ description: 'Split the query into smaller units and stop at the requested log line limit.',
+ },
];
+export function getQueryDirectionLabel(direction: LokiQueryDirection) {
+ return queryDirections.find((queryDirection) => queryDirection.value === direction)?.label ?? 'Unknown';
+}
+
if (config.featureToggles.lokiExperimentalStreaming) {
queryTypeOptions.push({
value: LokiQueryType.Stream,
diff --git a/public/app/plugins/datasource/loki/dataquery.cue b/public/app/plugins/datasource/loki/dataquery.cue
index 97bc3103ef8..63dd1c0265a 100644
--- a/public/app/plugins/datasource/loki/dataquery.cue
+++ b/public/app/plugins/datasource/loki/dataquery.cue
@@ -49,7 +49,7 @@ composableKinds: DataQuery: {
#SupportingQueryType: "logsVolume" | "logsSample" | "dataSample" | "infiniteScroll" @cuetsy(kind="enum")
- #LokiQueryDirection: "forward" | "backward" @cuetsy(kind="enum")
+ #LokiQueryDirection: "forward" | "backward" | "scan" @cuetsy(kind="enum")
}
}]
lenses: []
diff --git a/public/app/plugins/datasource/loki/dataquery.gen.ts b/public/app/plugins/datasource/loki/dataquery.gen.ts
index 063deb3ee2f..99047e02221 100644
--- a/public/app/plugins/datasource/loki/dataquery.gen.ts
+++ b/public/app/plugins/datasource/loki/dataquery.gen.ts
@@ -31,6 +31,7 @@ export enum SupportingQueryType {
export enum LokiQueryDirection {
Backward = 'backward',
Forward = 'forward',
+ Scan = 'scan',
}
export interface LokiDataQuery extends common.DataQuery {
diff --git a/public/app/plugins/datasource/loki/datasource.ts b/public/app/plugins/datasource/loki/datasource.ts
index 859b718c38c..d7f5862159f 100644
--- a/public/app/plugins/datasource/loki/datasource.ts
+++ b/public/app/plugins/datasource/loki/datasource.ts
@@ -82,9 +82,11 @@ import {
getStreamSelectorsFromQuery,
isLogsQuery,
isQueryWithError,
+ requestSupportsSharding,
requestSupportsSplitting,
} from './queryUtils';
import { replaceVariables, returnVariables } from './querybuilder/parsingUtils';
+import { runShardSplitQuery } from './shardQuerySplitting';
import { convertToWebSocketUrl, doLokiChannelStream } from './streaming';
import { trackQuery } from './tracking';
import {
@@ -359,7 +361,13 @@ export class LokiDatasource
return this.runLiveQueryThroughBackend(fixedRequest);
}
- if (config.featureToggles.lokiQuerySplitting && requestSupportsSplitting(fixedRequest.targets)) {
+ if (
+ config.featureToggles.lokiShardSplitting &&
+ requestSupportsSharding(fixedRequest.targets) &&
+ fixedRequest.app === CoreApp.Explore
+ ) {
+ return runShardSplitQuery(this, fixedRequest);
+ } else if (config.featureToggles.lokiQuerySplitting && requestSupportsSplitting(fixedRequest.targets)) {
return runSplitQuery(this, fixedRequest);
}
diff --git a/public/app/plugins/datasource/loki/mergeResponses.test.ts b/public/app/plugins/datasource/loki/mergeResponses.test.ts
new file mode 100644
index 00000000000..38f3833d6b0
--- /dev/null
+++ b/public/app/plugins/datasource/loki/mergeResponses.test.ts
@@ -0,0 +1,1282 @@
+import { cloneDeep } from 'lodash';
+
+import { DataQueryResponse, Field, FieldType, QueryResultMetaStat } from '@grafana/data';
+
+import { getMockFrames } from './__mocks__/frames';
+import { cloneQueryResponse, combineResponses } from './mergeResponses';
+
+describe('cloneQueryResponse', () => {
+ const { logFrameA } = getMockFrames();
+ const responseA: DataQueryResponse = {
+ data: [logFrameA],
+ };
+ it('clones query responses', () => {
+ const clonedA = cloneQueryResponse(responseA);
+ expect(clonedA).not.toBe(responseA);
+ expect(clonedA).toEqual(clonedA);
+ });
+});
+
+describe('combineResponses', () => {
+ it('combines logs frames', () => {
+ const { logFrameA, logFrameB } = getMockFrames();
+ const responseA: DataQueryResponse = {
+ data: [logFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [logFrameB],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ fields: [
+ {
+ config: {},
+ name: 'Time',
+ type: 'time',
+ values: [1, 2, 3, 4],
+ },
+ {
+ config: {},
+ name: 'Line',
+ type: 'string',
+ values: ['line3', 'line4', 'line1', 'line2'],
+ },
+ {
+ config: {},
+ name: 'labels',
+ type: 'other',
+ values: [
+ {
+ otherLabel: 'other value',
+ },
+ {
+ label: 'value',
+ },
+ {
+ otherLabel: 'other value',
+ },
+ ],
+ },
+ {
+ config: {},
+ name: 'tsNs',
+ type: 'string',
+ values: ['1000000', '2000000', '3000000', '4000000'],
+ },
+ {
+ config: {},
+ name: 'id',
+ type: 'string',
+ values: ['id3', 'id4', 'id1', 'id2'],
+ },
+ ],
+ length: 4,
+ meta: {
+ custom: {
+ frameType: 'LabeledTimeValues',
+ },
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 33,
+ },
+ ],
+ },
+ refId: 'A',
+ },
+ ],
+ });
+ });
+
+ it('combines logs frames with transformed fields', () => {
+ const { logFrameA, logFrameB } = getMockFrames();
+ const { logFrameB: originalLogFrameB } = getMockFrames();
+
+ // Pseudo shuffle fields
+ logFrameB.fields.sort((a: Field, b: Field) => (a.name < b.name ? -1 : 1));
+ expect(logFrameB.fields).not.toEqual(originalLogFrameB.fields);
+
+ const responseA: DataQueryResponse = {
+ data: [logFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [logFrameB],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ fields: [
+ {
+ config: {},
+ name: 'Time',
+ type: 'time',
+ values: [1, 2, 3, 4],
+ },
+ {
+ config: {},
+ name: 'Line',
+ type: 'string',
+ values: ['line3', 'line4', 'line1', 'line2'],
+ },
+ {
+ config: {},
+ name: 'labels',
+ type: 'other',
+ values: [
+ {
+ otherLabel: 'other value',
+ },
+ {
+ label: 'value',
+ },
+ {
+ otherLabel: 'other value',
+ },
+ ],
+ },
+ {
+ config: {},
+ name: 'tsNs',
+ type: 'string',
+ values: ['1000000', '2000000', '3000000', '4000000'],
+ },
+ {
+ config: {},
+ name: 'id',
+ type: 'string',
+ values: ['id3', 'id4', 'id1', 'id2'],
+ },
+ ],
+ length: 4,
+ meta: {
+ custom: {
+ frameType: 'LabeledTimeValues',
+ },
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 33,
+ },
+ ],
+ },
+ refId: 'A',
+ },
+ ],
+ });
+ });
+
+ it('combines metric frames', () => {
+ const { metricFrameA, metricFrameB } = getMockFrames();
+ const responseA: DataQueryResponse = {
+ data: [metricFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [metricFrameB],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ fields: [
+ {
+ config: {},
+ name: 'Time',
+ type: 'time',
+ values: [1000000, 2000000, 3000000, 4000000],
+ },
+ {
+ config: {},
+ name: 'Value',
+ type: 'number',
+ values: [6, 7, 5, 4],
+ labels: {
+ level: 'debug',
+ },
+ },
+ ],
+ length: 4,
+ meta: {
+ type: 'timeseries-multi',
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 33,
+ },
+ ],
+ },
+ refId: 'A',
+ },
+ ],
+ });
+ });
+
+ it('combines and identifies new frames in the response', () => {
+ const { metricFrameA, metricFrameB, metricFrameC } = getMockFrames();
+ const responseA: DataQueryResponse = {
+ data: [metricFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [metricFrameB, metricFrameC],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ fields: [
+ {
+ config: {},
+ name: 'Time',
+ type: 'time',
+ values: [1000000, 2000000, 3000000, 4000000],
+ },
+ {
+ config: {},
+ name: 'Value',
+ type: 'number',
+ values: [6, 7, 5, 4],
+ labels: {
+ level: 'debug',
+ },
+ },
+ ],
+ length: 4,
+ meta: {
+ type: 'timeseries-multi',
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 33,
+ },
+ ],
+ },
+ refId: 'A',
+ },
+ metricFrameC,
+ ],
+ });
+ });
+
+ it('combines frames prioritizing refIds over names', () => {
+ const { metricFrameA, metricFrameB } = getMockFrames();
+ const dataFrameA = {
+ ...metricFrameA,
+ refId: 'A',
+ name: 'A',
+ };
+ const dataFrameB = {
+ ...metricFrameB,
+ refId: 'B',
+ name: 'A',
+ };
+ const responseA: DataQueryResponse = {
+ data: [dataFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [dataFrameB],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [dataFrameA, dataFrameB],
+ });
+ });
+
+ it('combines frames in a new response instance', () => {
+ const { metricFrameA, metricFrameB } = getMockFrames();
+ const responseA: DataQueryResponse = {
+ data: [metricFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [metricFrameB],
+ };
+ expect(combineResponses(null, responseA)).not.toBe(responseA);
+ expect(combineResponses(null, responseB)).not.toBe(responseB);
+ });
+
+ it('combine when first param has errors', () => {
+ const { metricFrameA, metricFrameB } = getMockFrames();
+ const errorA = {
+ message: 'errorA',
+ };
+ const responseA: DataQueryResponse = {
+ data: [metricFrameA],
+ error: errorA,
+ errors: [errorA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [metricFrameB],
+ };
+
+ const combined = combineResponses(responseA, responseB);
+ expect(combined.data[0].length).toBe(4);
+ expect(combined.error?.message).toBe('errorA');
+ expect(combined.errors).toHaveLength(1);
+ expect(combined.errors?.[0]?.message).toBe('errorA');
+ });
+
+ it('combine when second param has errors', () => {
+ const { metricFrameA, metricFrameB } = getMockFrames();
+ const responseA: DataQueryResponse = {
+ data: [metricFrameA],
+ };
+ const errorB = {
+ message: 'errorB',
+ };
+ const responseB: DataQueryResponse = {
+ data: [metricFrameB],
+ error: errorB,
+ errors: [errorB],
+ };
+
+ const combined = combineResponses(responseA, responseB);
+ expect(combined.data[0].length).toBe(4);
+ expect(combined.error?.message).toBe('errorB');
+ expect(combined.errors).toHaveLength(1);
+ expect(combined.errors?.[0]?.message).toBe('errorB');
+ });
+
+ it('combine when both frames have errors', () => {
+ const { metricFrameA, metricFrameB } = getMockFrames();
+ const errorA = {
+ message: 'errorA',
+ };
+ const errorB = {
+ message: 'errorB',
+ };
+ const responseA: DataQueryResponse = {
+ data: [metricFrameA],
+ error: errorA,
+ errors: [errorA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [metricFrameB],
+ error: errorB,
+ errors: [errorB],
+ };
+
+ const combined = combineResponses(responseA, responseB);
+ expect(combined.data[0].length).toBe(4);
+ expect(combined.error?.message).toBe('errorA');
+ expect(combined.errors).toHaveLength(2);
+ expect(combined.errors?.[0]?.message).toBe('errorA');
+ expect(combined.errors?.[1]?.message).toBe('errorB');
+ });
+
+ it('combines frames with nanoseconds', () => {
+ const { logFrameA, logFrameB } = getMockFrames();
+ logFrameA.fields[0].nanos = [333333, 444444];
+ logFrameB.fields[0].nanos = [111111, 222222];
+ const responseA: DataQueryResponse = {
+ data: [logFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [logFrameB],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ fields: [
+ {
+ config: {},
+ name: 'Time',
+ type: 'time',
+ values: [1, 2, 3, 4],
+ nanos: [111111, 222222, 333333, 444444],
+ },
+ {
+ config: {},
+ name: 'Line',
+ type: 'string',
+ values: ['line3', 'line4', 'line1', 'line2'],
+ },
+ {
+ config: {},
+ name: 'labels',
+ type: 'other',
+ values: [
+ {
+ otherLabel: 'other value',
+ },
+ {
+ label: 'value',
+ },
+ {
+ otherLabel: 'other value',
+ },
+ ],
+ },
+ {
+ config: {},
+ name: 'tsNs',
+ type: 'string',
+ values: ['1000000', '2000000', '3000000', '4000000'],
+ },
+ {
+ config: {},
+ name: 'id',
+ type: 'string',
+ values: ['id3', 'id4', 'id1', 'id2'],
+ },
+ ],
+ length: 4,
+ meta: {
+ custom: {
+ frameType: 'LabeledTimeValues',
+ },
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 33,
+ },
+ ],
+ },
+ refId: 'A',
+ },
+ ],
+ });
+ });
+
+ it('combines frames without nanoseconds with frames with nanoseconds', () => {
+ const { logFrameA, logFrameB } = getMockFrames();
+ logFrameA.fields[0].nanos = undefined;
+ logFrameB.fields[0].nanos = [111111, 222222];
+ const responseA: DataQueryResponse = {
+ data: [logFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [logFrameB],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ fields: [
+ {
+ config: {},
+ name: 'Time',
+ type: 'time',
+ values: [1, 2, 3, 4],
+ nanos: [111111, 222222, 0, 0],
+ },
+ {
+ config: {},
+ name: 'Line',
+ type: 'string',
+ values: ['line3', 'line4', 'line1', 'line2'],
+ },
+ {
+ config: {},
+ name: 'labels',
+ type: 'other',
+ values: [
+ {
+ otherLabel: 'other value',
+ },
+ {
+ label: 'value',
+ },
+ {
+ otherLabel: 'other value',
+ },
+ ],
+ },
+ {
+ config: {},
+ name: 'tsNs',
+ type: 'string',
+ values: ['1000000', '2000000', '3000000', '4000000'],
+ },
+ {
+ config: {},
+ name: 'id',
+ type: 'string',
+ values: ['id3', 'id4', 'id1', 'id2'],
+ },
+ ],
+ length: 4,
+ meta: {
+ custom: {
+ frameType: 'LabeledTimeValues',
+ },
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 33,
+ },
+ ],
+ },
+ refId: 'A',
+ },
+ ],
+ });
+ });
+
+ it('combines frames with nanoseconds with frames without nanoseconds', () => {
+ const { logFrameA, logFrameB } = getMockFrames();
+ logFrameA.fields[0].nanos = [111111, 222222];
+ logFrameB.fields[0].nanos = undefined;
+ const responseA: DataQueryResponse = {
+ data: [logFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [logFrameB],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ fields: [
+ {
+ config: {},
+ name: 'Time',
+ type: 'time',
+ values: [1, 2, 3, 4],
+ nanos: [0, 0, 111111, 222222],
+ },
+ {
+ config: {},
+ name: 'Line',
+ type: 'string',
+ values: ['line3', 'line4', 'line1', 'line2'],
+ },
+ {
+ config: {},
+ name: 'labels',
+ type: 'other',
+ values: [
+ {
+ otherLabel: 'other value',
+ },
+ {
+ label: 'value',
+ },
+ {
+ otherLabel: 'other value',
+ },
+ ],
+ },
+ {
+ config: {},
+ name: 'tsNs',
+ type: 'string',
+ values: ['1000000', '2000000', '3000000', '4000000'],
+ },
+ {
+ config: {},
+ name: 'id',
+ type: 'string',
+ values: ['id3', 'id4', 'id1', 'id2'],
+ },
+ ],
+ length: 4,
+ meta: {
+ custom: {
+ frameType: 'LabeledTimeValues',
+ },
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 33,
+ },
+ ],
+ },
+ refId: 'A',
+ },
+ ],
+ });
+ });
+
+ describe('combine stats', () => {
+ const { metricFrameA } = getMockFrames();
+ const makeResponse = (stats?: QueryResultMetaStat[]): DataQueryResponse => ({
+ data: [
+ {
+ ...metricFrameA,
+ meta: {
+ ...metricFrameA.meta,
+ stats,
+ },
+ },
+ ],
+ });
+ it('two values', () => {
+ const responseA = makeResponse([
+ { displayName: 'Ingester: total reached', value: 1 },
+ { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 },
+ ]);
+ const responseB = makeResponse([
+ { displayName: 'Ingester: total reached', value: 2 },
+ { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 22 },
+ ]);
+
+ expect(combineResponses(responseA, responseB).data[0].meta.stats).toStrictEqual([
+ { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 33 },
+ ]);
+ });
+
+ it('one value', () => {
+ const responseA = makeResponse([
+ { displayName: 'Ingester: total reached', value: 1 },
+ { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 },
+ ]);
+ const responseB = makeResponse();
+
+ expect(combineResponses(responseA, responseB).data[0].meta.stats).toStrictEqual([
+ { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 },
+ ]);
+
+ expect(combineResponses(responseB, responseA).data[0].meta.stats).toStrictEqual([
+ { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 },
+ ]);
+ });
+
+ it('no value', () => {
+ const responseA = makeResponse();
+ const responseB = makeResponse();
+ expect(combineResponses(responseA, responseB).data[0].meta.stats).toHaveLength(0);
+ });
+ });
+
+ it('does not combine frames with different refId', () => {
+ const { metricFrameA, metricFrameB } = getMockFrames();
+ metricFrameA.refId = 'A';
+ metricFrameB.refId = 'B';
+ const responseA: DataQueryResponse = {
+ data: [metricFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [metricFrameB],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [metricFrameA, metricFrameB],
+ });
+ });
+
+ it('does not combine frames with different refId', () => {
+ const { metricFrameA, metricFrameB } = getMockFrames();
+ metricFrameA.name = 'A';
+ metricFrameB.name = 'B';
+ const responseA: DataQueryResponse = {
+ data: [metricFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [metricFrameB],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [metricFrameA, metricFrameB],
+ });
+ });
+
+ it('when fields with the same name are present, uses labels to find the right field to combine', () => {
+ const { metricFrameA, metricFrameB } = getMockFrames();
+
+ metricFrameA.fields.push({
+ name: 'Value',
+ type: FieldType.number,
+ config: {},
+ values: [9, 8],
+ labels: {
+ test: 'true',
+ },
+ });
+ metricFrameB.fields.push({
+ name: 'Value',
+ type: FieldType.number,
+ config: {},
+ values: [11, 10],
+ labels: {
+ test: 'true',
+ },
+ });
+
+ const responseA: DataQueryResponse = {
+ data: [metricFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [metricFrameB],
+ };
+
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ fields: [
+ {
+ config: {},
+ name: 'Time',
+ type: 'time',
+ values: [1000000, 2000000, 3000000, 4000000],
+ },
+ {
+ config: {},
+ name: 'Value',
+ type: 'number',
+ values: [6, 7, 5, 4],
+ labels: {
+ level: 'debug',
+ },
+ },
+ {
+ config: {},
+ name: 'Value',
+ type: 'number',
+ values: [11, 10, 9, 8],
+ labels: {
+ test: 'true',
+ },
+ },
+ ],
+ length: 4,
+ meta: {
+ type: 'timeseries-multi',
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 33,
+ },
+ ],
+ },
+ refId: 'A',
+ },
+ ],
+ });
+ });
+
+ it('when fields with the same name are present and labels are not present, falls back to indexes', () => {
+ const { metricFrameA, metricFrameB } = getMockFrames();
+
+ delete metricFrameA.fields[1].labels;
+ delete metricFrameB.fields[1].labels;
+
+ metricFrameA.fields.push({
+ name: 'Value',
+ type: FieldType.number,
+ config: {},
+ values: [9, 8],
+ });
+ metricFrameB.fields.push({
+ name: 'Value',
+ type: FieldType.number,
+ config: {},
+ values: [11, 10],
+ });
+
+ const responseA: DataQueryResponse = {
+ data: [metricFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [metricFrameB],
+ };
+
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ fields: [
+ {
+ config: {},
+ name: 'Time',
+ type: 'time',
+ values: [1000000, 2000000, 3000000, 4000000],
+ },
+ {
+ config: {},
+ name: 'Value',
+ type: 'number',
+ values: [6, 7, 5, 4],
+ },
+ {
+ config: {},
+ name: 'Value',
+ type: 'number',
+ values: [11, 10, 9, 8],
+ },
+ ],
+ length: 4,
+ meta: {
+ type: 'timeseries-multi',
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 33,
+ },
+ ],
+ },
+ refId: 'A',
+ },
+ ],
+ });
+ });
+});
+
+describe('mergeFrames', () => {
+ it('combines metric frames', () => {
+ const { metricFrameA, metricFrameB } = getMockFrames();
+ const responseA: DataQueryResponse = {
+ data: [metricFrameB],
+ };
+ const responseB: DataQueryResponse = {
+ data: [metricFrameA],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ fields: [
+ {
+ config: {},
+ name: 'Time',
+ type: 'time',
+ values: [1000000, 2000000, 3000000, 4000000],
+ },
+ {
+ config: {},
+ name: 'Value',
+ type: 'number',
+ values: [6, 7, 5, 4],
+ labels: {
+ level: 'debug',
+ },
+ },
+ ],
+ length: 4,
+ meta: {
+ type: 'timeseries-multi',
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 33,
+ },
+ ],
+ },
+ refId: 'A',
+ },
+ ],
+ });
+ });
+
+ it('adds old to new values when combining', () => {
+ const { metricFrameA, metricFrameB } = getMockFrames();
+
+ metricFrameB.fields[0].values = [3000000, 3500000, 4000000];
+ metricFrameB.fields[1].values = [5, 10, 6];
+
+ const responseA: DataQueryResponse = {
+ data: [metricFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [metricFrameB],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ fields: [
+ {
+ config: {},
+ name: 'Time',
+ type: 'time',
+ values: [3000000, 3500000, 4000000],
+ },
+ {
+ config: {},
+ name: 'Value',
+ type: 'number',
+ values: [10, 10, 10],
+ labels: {
+ level: 'debug',
+ },
+ },
+ ],
+ length: 3,
+ meta: {
+ type: 'timeseries-multi',
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 33,
+ },
+ ],
+ },
+ refId: 'A',
+ },
+ ],
+ });
+ });
+
+ it('combines and identifies new frames in the response', () => {
+ const { metricFrameA, metricFrameB, metricFrameC } = getMockFrames();
+ const responseA: DataQueryResponse = {
+ data: [metricFrameB],
+ };
+ const responseB: DataQueryResponse = {
+ data: [metricFrameA, metricFrameC],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ fields: [
+ {
+ config: {},
+ name: 'Time',
+ type: 'time',
+ values: [1000000, 2000000, 3000000, 4000000],
+ },
+ {
+ config: {},
+ name: 'Value',
+ type: 'number',
+ values: [6, 7, 5, 4],
+ labels: {
+ level: 'debug',
+ },
+ },
+ ],
+ length: 4,
+ meta: {
+ type: 'timeseries-multi',
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 33,
+ },
+ ],
+ },
+ refId: 'A',
+ },
+ metricFrameC,
+ ],
+ });
+ });
+
+ it('merges logs frames', () => {
+ const { logFrameA, logFrameB } = getMockFrames();
+
+ // 3 overlaps with logFrameA
+ logFrameB.fields[0].values = [2, 3];
+ logFrameB.fields[1].values = ['line4', 'line1'];
+ logFrameB.fields[4].values = ['id4', 'id1'];
+
+ const responseA: DataQueryResponse = {
+ data: [logFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [logFrameB],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ fields: [
+ {
+ config: {},
+ name: 'Time',
+ type: 'time',
+ values: [2, 3, 4],
+ },
+ {
+ config: {},
+ name: 'Line',
+ type: 'string',
+ values: ['line4', 'line1', 'line2'],
+ },
+ {
+ config: {},
+ name: 'labels',
+ type: 'other',
+ values: [
+ {
+ otherLabel: 'other value',
+ },
+ {
+ label: 'value',
+ },
+ {
+ otherLabel: 'other value',
+ },
+ ],
+ },
+ {
+ config: {},
+ name: 'tsNs',
+ type: 'string',
+ values: ['1000000', '2000000', '4000000'],
+ },
+ {
+ config: {},
+ name: 'id',
+ type: 'string',
+ values: ['id4', 'id1', 'id2'],
+ },
+ ],
+ length: 3,
+ meta: {
+ custom: {
+ frameType: 'LabeledTimeValues',
+ },
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 33,
+ },
+ ],
+ },
+ refId: 'A',
+ },
+ ],
+ });
+ });
+
+ it('merges frames with nanoseconds', () => {
+ const { logFrameA, logFrameB } = getMockFrames();
+
+ logFrameA.fields[0].values = [3, 4];
+ logFrameA.fields[0].nanos = [333333, 444444];
+
+ // 3 overlaps with logFrameA
+ logFrameB.fields[0].values = [2, 3];
+ logFrameB.fields[0].nanos = [222222, 333333];
+ logFrameB.fields[4].values = ['id4', 'id1'];
+
+ const responseA: DataQueryResponse = {
+ data: [logFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [logFrameB],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ fields: [
+ {
+ config: {},
+ name: 'Time',
+ type: 'time',
+ values: [2, 3, 4],
+ nanos: [222222, 333333, 444444],
+ },
+ {
+ config: {},
+ name: 'Line',
+ type: 'string',
+ values: ['line3', 'line4', 'line2'],
+ },
+ {
+ config: {},
+ name: 'labels',
+ type: 'other',
+ values: [
+ {
+ otherLabel: 'other value',
+ },
+ {
+ label: 'value',
+ },
+ {
+ otherLabel: 'other value',
+ },
+ ],
+ },
+ {
+ config: {},
+ name: 'tsNs',
+ type: 'string',
+ values: ['1000000', '2000000', '4000000'],
+ },
+ {
+ config: {},
+ name: 'id',
+ type: 'string',
+ values: ['id4', 'id1', 'id2'],
+ },
+ ],
+ length: 3,
+ meta: {
+ custom: {
+ frameType: 'LabeledTimeValues',
+ },
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 33,
+ },
+ ],
+ },
+ refId: 'A',
+ },
+ ],
+ });
+ });
+
+ it('receiving existing values do not introduce inconsistencies', () => {
+ const { logFrameA, logFrameAB } = getMockFrames();
+
+ const responseA: DataQueryResponse = {
+ data: [cloneDeep(logFrameAB)],
+ };
+ const responseB: DataQueryResponse = {
+ data: [cloneDeep(logFrameA)],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ ...logFrameAB,
+ meta: {
+ custom: {
+ frameType: 'LabeledTimeValues',
+ },
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 33,
+ },
+ ],
+ },
+ },
+ ],
+ });
+ });
+
+ it('considers nanoseconds to merge the frames', () => {
+ const { logFrameA, logFrameB } = getMockFrames();
+
+ // Same timestamps but different nanos
+ logFrameA.fields[0].values = [1, 2];
+ logFrameA.fields[0].nanos = [333333, 444444];
+ logFrameB.fields[0].values = [1, 2];
+ logFrameB.fields[0].nanos = [222222, 333333];
+
+ const responseA: DataQueryResponse = {
+ data: [logFrameA],
+ };
+ const responseB: DataQueryResponse = {
+ data: [logFrameB],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ fields: [
+ {
+ config: {},
+ name: 'Time',
+ type: 'time',
+ values: [1, 1, 2, 2],
+ nanos: [222222, 333333, 333333, 444444],
+ },
+ {
+ config: {},
+ name: 'Line',
+ type: 'string',
+ values: ['line3', 'line1', 'line4', 'line2'],
+ },
+ {
+ config: {},
+ name: 'labels',
+ type: 'other',
+ values: [
+ {
+ otherLabel: 'other value',
+ },
+ {
+ label: 'value',
+ },
+ {
+ otherLabel: 'other value',
+ },
+ ],
+ },
+ {
+ config: {},
+ name: 'tsNs',
+ type: 'string',
+ values: ['1000000', '3000000', '2000000', '4000000'],
+ },
+ {
+ config: {},
+ name: 'id',
+ type: 'string',
+ values: ['id3', 'id1', 'id4', 'id2'],
+ },
+ ],
+ length: 4,
+ meta: {
+ custom: {
+ frameType: 'LabeledTimeValues',
+ },
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 33,
+ },
+ ],
+ },
+ refId: 'A',
+ },
+ ],
+ });
+ });
+
+ it('correctly handles empty responses', () => {
+ const { emptyFrame, logFrameB } = getMockFrames();
+
+ logFrameB.fields[0].values = [1, 2];
+ logFrameB.fields[0].nanos = [222222, 333333];
+
+ const responseA: DataQueryResponse = {
+ data: [emptyFrame],
+ };
+ const responseB: DataQueryResponse = {
+ data: [logFrameB],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ ...logFrameB,
+ meta: {
+ custom: {
+ frameType: 'LabeledTimeValues',
+ },
+ stats: [{ displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 22 }],
+ },
+ length: 2,
+ },
+ ],
+ });
+ });
+
+ it('merging exactly the same data produces the same data', () => {
+ const { logFrameA } = getMockFrames();
+
+ const responseA: DataQueryResponse = {
+ data: [cloneDeep(logFrameA)],
+ };
+ const responseB: DataQueryResponse = {
+ data: [cloneDeep(logFrameA)],
+ };
+ expect(combineResponses(responseA, responseB)).toEqual({
+ data: [
+ {
+ ...logFrameA,
+ meta: {
+ custom: {
+ frameType: 'LabeledTimeValues',
+ },
+ stats: [
+ {
+ displayName: 'Summary: total bytes processed',
+ unit: 'decbytes',
+ value: 22,
+ },
+ ],
+ },
+ },
+ ],
+ });
+ });
+});
diff --git a/public/app/plugins/datasource/loki/mergeResponses.ts b/public/app/plugins/datasource/loki/mergeResponses.ts
new file mode 100644
index 00000000000..d37413650fc
--- /dev/null
+++ b/public/app/plugins/datasource/loki/mergeResponses.ts
@@ -0,0 +1,318 @@
+import {
+ closestIdx,
+ DataFrame,
+ DataFrameType,
+ DataQueryResponse,
+ DataQueryResponseData,
+ Field,
+ FieldType,
+ LoadingState,
+ QueryResultMetaStat,
+ shallowCompare,
+} from '@grafana/data';
+
+import { LOADING_FRAME_NAME } from './querySplitting';
+
+export function combineResponses(currentResponse: DataQueryResponse | null, newResponse: DataQueryResponse) {
+ if (!currentResponse) {
+ return cloneQueryResponse(newResponse);
+ }
+
+ newResponse.data.forEach((newFrame) => {
+ const currentFrame = currentResponse.data.find((frame) => shouldCombine(frame, newFrame));
+ if (!currentFrame) {
+ currentResponse.data.push(cloneDataFrame(newFrame));
+ return;
+ }
+ mergeFrames(currentFrame, newFrame);
+ });
+
+ const mergedErrors = [...(currentResponse.errors ?? []), ...(newResponse.errors ?? [])];
+ if (mergedErrors.length > 0) {
+ currentResponse.errors = mergedErrors;
+ }
+
+ // the `.error` attribute is obsolete now,
+ // but we have to maintain it, otherwise
+ // some grafana parts do not behave well.
+ // we just choose the old error, if it exists,
+ // otherwise the new error, if it exists.
+ const mergedError = currentResponse.error ?? newResponse.error;
+ if (mergedError != null) {
+ currentResponse.error = mergedError;
+ }
+
+ const mergedTraceIds = [...(currentResponse.traceIds ?? []), ...(newResponse.traceIds ?? [])];
+ if (mergedTraceIds.length > 0) {
+ currentResponse.traceIds = mergedTraceIds;
+ }
+
+ return currentResponse;
+}
+
+/**
+ * Given an existing DataQueryResponse, replace any data frame present in newResponse with those in newResponse
+ */
+export function replaceResponses(currentResponse: DataQueryResponse | null, newResponse: DataQueryResponse) {
+ if (!currentResponse) {
+ return cloneQueryResponse(newResponse);
+ }
+
+ newResponse.data.forEach((newFrame) => {
+ const currentFrameIndex = currentResponse.data.findIndex((frame) => shouldCombine(frame, newFrame));
+ if (currentFrameIndex < 0) {
+ currentResponse.data.push(cloneDataFrame(newFrame));
+ return;
+ }
+ currentResponse.data[currentFrameIndex] = newFrame;
+ });
+
+ // Clean up loading frame when newResponse contains the final response
+ if (newResponse.state === LoadingState.Done) {
+ currentResponse.data = currentResponse.data.filter((frame) => frame.name !== LOADING_FRAME_NAME);
+ }
+
+ const mergedErrors = [...(currentResponse.errors ?? []), ...(newResponse.errors ?? [])];
+ if (mergedErrors.length > 0) {
+ currentResponse.errors = mergedErrors;
+ }
+
+ const mergedError = currentResponse.error ?? newResponse.error;
+ if (mergedError != null) {
+ currentResponse.error = mergedError;
+ }
+
+ const mergedTraceIds = [...(currentResponse.traceIds ?? []), ...(newResponse.traceIds ?? [])];
+ if (mergedTraceIds.length > 0) {
+ currentResponse.traceIds = mergedTraceIds;
+ }
+
+ return currentResponse;
+}
+
+/**
+ * Given two data frames, merge their values. Overlapping values will be added together.
+ */
+export function mergeFrames(dest: DataFrame, source: DataFrame) {
+ const destTimeField = dest.fields.find((field) => field.type === FieldType.time);
+ const destIdField = dest.fields.find((field) => field.type === FieldType.string && field.name === 'id');
+ const sourceTimeField = source.fields.find((field) => field.type === FieldType.time);
+ const sourceIdField = source.fields.find((field) => field.type === FieldType.string && field.name === 'id');
+
+ if (!destTimeField || !sourceTimeField) {
+ console.error(new Error(`Time fields not found in the data frames`));
+ return;
+ }
+
+ const sourceTimeValues = sourceTimeField?.values.slice(0) ?? [];
+ const totalFields = Math.max(dest.fields.length, source.fields.length);
+
+ for (let i = 0; i < sourceTimeValues.length; i++) {
+ const destIdx = resolveIdx(destTimeField, sourceTimeField, i);
+
+ const entryExistsInDest = compareEntries(destTimeField, destIdField, destIdx, sourceTimeField, sourceIdField, i);
+
+ for (let f = 0; f < totalFields; f++) {
+ // For now, skip undefined fields that exist in the new frame
+ if (!dest.fields[f]) {
+ continue;
+ }
+ // Index is not reliable when frames have disordered fields, or an extra/missing field, so we find them by name.
+ // If the field has no name, we fallback to the old index version.
+ const sourceField = findSourceField(dest.fields[f], source.fields, f);
+ if (!sourceField) {
+ continue;
+ }
+ // Same value, accumulate
+ if (entryExistsInDest) {
+ if (dest.fields[f].type === FieldType.time) {
+ // Time already exists, skip
+ continue;
+ } else if (dest.fields[f].type === FieldType.number) {
+ // Number, add
+ dest.fields[f].values[destIdx] = (dest.fields[f].values[destIdx] ?? 0) + sourceField.values[i];
+ } else if (dest.fields[f].type === FieldType.other) {
+ // Possibly labels, combine
+ if (typeof sourceField.values[i] === 'object') {
+ dest.fields[f].values[destIdx] = {
+ ...dest.fields[f].values[destIdx],
+ ...sourceField.values[i],
+ };
+ } else if (sourceField.values[i]) {
+ dest.fields[f].values[destIdx] = sourceField.values[i];
+ }
+ } else {
+ // Replace value
+ dest.fields[f].values[destIdx] = sourceField.values[i];
+ }
+ } else if (sourceField.values[i] !== undefined) {
+ // Insert in the `destIdx` position
+ dest.fields[f].values.splice(destIdx, 0, sourceField.values[i]);
+ if (sourceField.nanos) {
+ dest.fields[f].nanos = dest.fields[f].nanos ?? new Array(dest.fields[f].values.length - 1).fill(0);
+ dest.fields[f].nanos?.splice(destIdx, 0, sourceField.nanos[i]);
+ } else if (dest.fields[f].nanos) {
+ dest.fields[f].nanos?.splice(destIdx, 0, 0);
+ }
+ }
+ }
+ }
+
+ dest.length = dest.fields[0].values.length;
+
+ dest.meta = {
+ ...dest.meta,
+ stats: getCombinedMetadataStats(dest.meta?.stats ?? [], source.meta?.stats ?? []),
+ };
+}
+
+function resolveIdx(destField: Field, sourceField: Field, index: number) {
+ const idx = closestIdx(sourceField.values[index], destField.values);
+ if (idx < 0) {
+ return 0;
+ }
+ if (sourceField.values[index] === destField.values[idx] && sourceField.nanos && destField.nanos) {
+ return sourceField.nanos[index] > destField.nanos[idx] ? idx + 1 : idx;
+ }
+ if (sourceField.values[index] > destField.values[idx]) {
+ return idx + 1;
+ }
+ return idx;
+}
+
+function compareEntries(
+ destTimeField: Field,
+ destIdField: Field | undefined,
+ destIndex: number,
+ sourceTimeField: Field,
+ sourceIdField: Field | undefined,
+ sourceIndex: number
+) {
+ const sameTimestamp = compareNsTimestamps(destTimeField, destIndex, sourceTimeField, sourceIndex);
+ if (!sameTimestamp) {
+ return false;
+ }
+ if (!destIdField || !sourceIdField) {
+ return true;
+ }
+ // Log frames, check indexes
+ return (
+ destIdField.values[destIndex] !== undefined && destIdField.values[destIndex] === sourceIdField.values[sourceIndex]
+ );
+}
+
+function compareNsTimestamps(destField: Field, destIndex: number, sourceField: Field, sourceIndex: number) {
+ if (destField.nanos && sourceField.nanos) {
+ return (
+ destField.values[destIndex] !== undefined &&
+ destField.values[destIndex] === sourceField.values[sourceIndex] &&
+ destField.nanos[destIndex] !== undefined &&
+ destField.nanos[destIndex] === sourceField.nanos[sourceIndex]
+ );
+ }
+ return destField.values[destIndex] !== undefined && destField.values[destIndex] === sourceField.values[sourceIndex];
+}
+
+function findSourceField(referenceField: Field, sourceFields: Field[], index: number) {
+ const candidates = sourceFields.filter((f) => f.name === referenceField.name);
+
+ if (candidates.length === 1) {
+ return candidates[0];
+ }
+
+ if (referenceField.labels) {
+ return candidates.find((candidate) => shallowCompare(referenceField.labels ?? {}, candidate.labels ?? {}));
+ }
+
+ return sourceFields[index];
+}
+
+const TOTAL_BYTES_STAT = 'Summary: total bytes processed';
+const EXEC_TIME_STAT = 'Summary: exec time';
+// This is specific for Loki
+function getCombinedMetadataStats(
+ destStats: QueryResultMetaStat[],
+ sourceStats: QueryResultMetaStat[]
+): QueryResultMetaStat[] {
+ // in the current approach, we only handle a single stat
+ const stats: QueryResultMetaStat[] = [];
+ for (const stat of [TOTAL_BYTES_STAT, EXEC_TIME_STAT]) {
+ const destStat = destStats.find((s) => s.displayName === stat);
+ const sourceStat = sourceStats.find((s) => s.displayName === stat);
+
+ if (sourceStat != null && destStat != null) {
+ stats.push({ value: sourceStat.value + destStat.value, displayName: stat, unit: destStat.unit });
+ continue;
+ }
+
+ // maybe one of them exist
+ const eitherStat = sourceStat ?? destStat;
+ if (eitherStat != null) {
+ stats.push(eitherStat);
+ }
+ }
+ return stats;
+}
+
+/**
+ * Deep clones a DataQueryResponse
+ */
+export function cloneQueryResponse(response: DataQueryResponse): DataQueryResponse {
+ const newResponse = {
+ ...response,
+ data: response.data.map(cloneDataFrame),
+ };
+ return newResponse;
+}
+
+function cloneDataFrame(frame: DataQueryResponseData): DataQueryResponseData {
+ return {
+ ...frame,
+ fields: frame.fields.map((field: Field) => ({
+ ...field,
+ values: field.values,
+ })),
+ };
+}
+
+function shouldCombine(frame1: DataFrame, frame2: DataFrame): boolean {
+ if (frame1.refId !== frame2.refId || frame1.name !== frame2.name) {
+ return false;
+ }
+
+ const frameType1 = frame1.meta?.type;
+ const frameType2 = frame2.meta?.type;
+
+ if (frameType1 !== frameType2) {
+ // we do not join things that have a different type
+ return false;
+ }
+
+ // metric range query data
+ if (frameType1 === DataFrameType.TimeSeriesMulti) {
+ const field1 = frame1.fields.find((f) => f.type === FieldType.number);
+ const field2 = frame2.fields.find((f) => f.type === FieldType.number);
+ if (field1 === undefined || field2 === undefined) {
+ // should never happen
+ return false;
+ }
+
+ return shallowCompare(field1.labels ?? {}, field2.labels ?? {});
+ }
+
+ // logs query data
+ // logs use a special attribute in the dataframe's "custom" section
+ // because we do not have a good "frametype" value for them yet.
+ const customType1 = frame1.meta?.custom?.frameType;
+ const customType2 = frame2.meta?.custom?.frameType;
+ // Legacy frames have this custom type
+ if (customType1 === 'LabeledTimeValues' && customType2 === 'LabeledTimeValues') {
+ return true;
+ } else if (customType1 === customType2) {
+ // Data plane frames don't
+ return true;
+ }
+
+ // should never reach here
+ return false;
+}
diff --git a/public/app/plugins/datasource/loki/modifyQuery.ts b/public/app/plugins/datasource/loki/modifyQuery.ts
index 293d78fc660..67ca4f0e6bb 100644
--- a/public/app/plugins/datasource/loki/modifyQuery.ts
+++ b/public/app/plugins/datasource/loki/modifyQuery.ts
@@ -151,7 +151,7 @@ export function addLabelToQuery(
value: string,
labelType?: LabelType | null
): string {
- if (!key || !value) {
+ if (!key) {
throw new Error('Need label to add to query.');
}
diff --git a/public/app/plugins/datasource/loki/querySplitting.test.ts b/public/app/plugins/datasource/loki/querySplitting.test.ts
index 47f36658731..9c8326eb767 100644
--- a/public/app/plugins/datasource/loki/querySplitting.test.ts
+++ b/public/app/plugins/datasource/loki/querySplitting.test.ts
@@ -1,6 +1,7 @@
import { of } from 'rxjs';
import { DataQueryRequest, dateTime, LoadingState } from '@grafana/data';
+import { config } from '@grafana/runtime';
import { createLokiDatasource } from './__mocks__/datasource';
import { getMockFrames } from './__mocks__/frames';
@@ -16,6 +17,19 @@ jest.mock('uuid', () => ({
v4: jest.fn().mockReturnValue('uuid'),
}));
+const originalShardingFlagState = config.featureToggles.lokiShardSplitting;
+beforeAll(() => {
+ // @ts-expect-error
+ jest.spyOn(global, 'setTimeout').mockImplementation((callback) => {
+ callback();
+ });
+ config.featureToggles.lokiShardSplitting = false;
+});
+afterAll(() => {
+ jest.mocked(global.setTimeout).mockReset();
+ config.featureToggles.lokiShardSplitting = originalShardingFlagState;
+});
+
describe('runSplitQuery()', () => {
let datasource: LokiDatasource;
const range = {
@@ -51,6 +65,26 @@ describe('runSplitQuery()', () => {
});
});
+ test('Retries retriable failed requests', async () => {
+ jest
+ .mocked(datasource.runQuery)
+ .mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'timeout' }], data: [] }));
+ await expect(runSplitQuery(datasource, request)).toEmitValuesWith(() => {
+ // 3 days, 3 chunks, 1 retry, 4 requests.
+ expect(datasource.runQuery).toHaveBeenCalledTimes(4);
+ });
+ });
+
+ test('Does not retry on other errors', async () => {
+ jest
+ .mocked(datasource.runQuery)
+ .mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'nope nope' }], data: [] }));
+ await expect(runSplitQuery(datasource, request)).toEmitValuesWith(() => {
+ // 3 days, 3 chunks, 3 requests.
+ expect(datasource.runQuery).toHaveBeenCalledTimes(1);
+ });
+ });
+
test('Metric queries with maxLines of 0 will execute', async () => {
const request = createRequest([{ expr: 'count_over_time({a="b"}[1m])', refId: 'A', maxLines: 0 }]);
await expect(runSplitQuery(datasource, request)).toEmitValuesWith(() => {
@@ -285,9 +319,10 @@ describe('runSplitQuery()', () => {
describe('Dynamic maxLines for logs requests', () => {
const request = createRequest([{ expr: '{a="b"}', refId: 'A', maxLines: 4 }]);
- const { logFrameA } = getMockFrames();
+ const { logFrameA, logFrameB } = getMockFrames();
beforeEach(() => {
- jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [logFrameA], refId: 'A' }));
+ jest.spyOn(datasource, 'runQuery').mockReturnValueOnce(of({ data: [logFrameA], refId: 'A' }));
+ jest.spyOn(datasource, 'runQuery').mockReturnValueOnce(of({ data: [logFrameB], refId: 'A' }));
});
test('Stops requesting once maxLines of logs have been received', async () => {
await expect(runSplitQuery(datasource, request)).toEmitValuesWith(() => {
diff --git a/public/app/plugins/datasource/loki/querySplitting.ts b/public/app/plugins/datasource/loki/querySplitting.ts
index 6215f3ff42e..acd0458ca1e 100644
--- a/public/app/plugins/datasource/loki/querySplitting.ts
+++ b/public/app/plugins/datasource/loki/querySplitting.ts
@@ -14,12 +14,13 @@ import {
TimeRange,
LoadingState,
} from '@grafana/data';
-import { combineResponses } from '@grafana/o11y-ds-frontend';
import { LokiDatasource } from './datasource';
import { splitTimeRange as splitLogsTimeRange } from './logsTimeSplitting';
+import { combineResponses } from './mergeResponses';
import { splitTimeRange as splitMetricTimeRange } from './metricTimeSplitting';
import { isLogsQuery, isQueryWithRangeVariable } from './queryUtils';
+import { isRetriableError } from './responseUtils';
import { trackGroupedQueries } from './tracking';
import { LokiGroupedRequest, LokiQuery, LokiQueryDirection, LokiQueryType } from './types';
@@ -53,7 +54,7 @@ export function partitionTimeRange(
* At the end, we will filter the targets that don't need to be executed in the next request batch,
* becasue, for example, the `maxLines` have been reached.
*/
-function adjustTargetsFromResponseState(targets: LokiQuery[], response: DataQueryResponse | null): LokiQuery[] {
+export function adjustTargetsFromResponseState(targets: LokiQuery[], response: DataQueryResponse | null): LokiQuery[] {
if (!response) {
return targets;
}
@@ -82,8 +83,18 @@ export function runSplitGroupedQueries(datasource: LokiDatasource, requests: Lok
const longestPartition = requests.filter(({ partition }) => partition.length === totalRequests)[0].partition;
let shouldStop = false;
- let subquerySubsciption: Subscription | null = null;
+ let subquerySubscription: Subscription | null = null;
+ let retriesMap = new Map();
+ let retryTimer: ReturnType | null = null;
+
const runNextRequest = (subscriber: Subscriber, requestN: number, requestGroup: number) => {
+ let retrying = false;
+
+ if (subquerySubscription != null) {
+ subquerySubscription.unsubscribe();
+ subquerySubscription = null;
+ }
+
if (shouldStop) {
subscriber.complete();
return;
@@ -104,6 +115,37 @@ export function runSplitGroupedQueries(datasource: LokiDatasource, requests: Lok
done();
};
+ const retry = (errorResponse?: DataQueryResponse) => {
+ try {
+ if (errorResponse && !isRetriableError(errorResponse)) {
+ return false;
+ }
+ } catch (e) {
+ console.error(e);
+ shouldStop = true;
+ return false;
+ }
+
+ const key = `${requestN}-${requestGroup}`;
+ const retries = retriesMap.get(key) ?? 0;
+ if (retries > 3) {
+ return false;
+ }
+
+ retriesMap.set(key, retries + 1);
+
+ retryTimer = setTimeout(
+ () => {
+ runNextRequest(subscriber, requestN, requestGroup);
+ },
+ 1500 * Math.pow(2, retries)
+ ); // Exponential backoff
+
+ retrying = true;
+
+ return true;
+ };
+
const group = requests[requestGroup];
const range = group.partition[requestN - 1];
const targets = adjustTargetsFromResponseState(group.request.targets, mergedResponse);
@@ -119,20 +161,29 @@ export function runSplitGroupedQueries(datasource: LokiDatasource, requests: Lok
subRequest.requestId = `${group.request.requestId}_${requestN}`;
}
- subquerySubsciption = datasource.runQuery(subRequest).subscribe({
+ subquerySubscription = datasource.runQuery(subRequest).subscribe({
next: (partialResponse) => {
- mergedResponse = combineResponses(mergedResponse, partialResponse);
- mergedResponse = updateLoadingFrame(mergedResponse, subRequest, longestPartition, requestN);
- if ((mergedResponse.errors ?? []).length > 0 || mergedResponse.error != null) {
+ if ((partialResponse.errors ?? []).length > 0 || partialResponse.error != null) {
+ if (retry(partialResponse)) {
+ return;
+ }
shouldStop = true;
}
+ mergedResponse = combineResponses(mergedResponse, partialResponse);
+ mergedResponse = updateLoadingFrame(mergedResponse, subRequest, longestPartition, requestN);
},
complete: () => {
+ if (retrying) {
+ return;
+ }
subscriber.next(mergedResponse);
nextRequest();
},
error: (error) => {
subscriber.error(error);
+ if (retry()) {
+ return;
+ }
},
});
};
@@ -141,8 +192,13 @@ export function runSplitGroupedQueries(datasource: LokiDatasource, requests: Lok
runNextRequest(subscriber, totalRequests, 0);
return () => {
shouldStop = true;
- if (subquerySubsciption != null) {
- subquerySubsciption.unsubscribe();
+ if (retryTimer) {
+ clearTimeout(retryTimer);
+ retryTimer = null;
+ }
+ if (subquerySubscription != null) {
+ subquerySubscription.unsubscribe();
+ subquerySubscription = null;
}
};
});
diff --git a/public/app/plugins/datasource/loki/queryUtils.ts b/public/app/plugins/datasource/loki/queryUtils.ts
index 368f1819620..c3b3d6ed195 100644
--- a/public/app/plugins/datasource/loki/queryUtils.ts
+++ b/public/app/plugins/datasource/loki/queryUtils.ts
@@ -25,9 +25,10 @@ import {
} from '@grafana/lezer-logql';
import { DataQuery } from '@grafana/schema';
-import { getStreamSelectorPositions, NodePosition } from './modifyQuery';
+import { REF_ID_STARTER_LOG_VOLUME } from './datasource';
+import { addLabelToQuery, getStreamSelectorPositions, NodePosition } from './modifyQuery';
import { ErrorId } from './querybuilder/parsingUtils';
-import { LokiQuery, LokiQueryType } from './types';
+import { LabelType, LokiQuery, LokiQueryDirection, LokiQueryType } from './types';
/**
* Returns search terms from a LogQL query.
@@ -313,6 +314,18 @@ export function requestSupportsSplitting(allQueries: LokiQuery[]) {
return queries.length > 0;
}
+export function requestSupportsSharding(allQueries: LokiQuery[]) {
+ const queries = allQueries
+ .filter((query) => !query.hide)
+ .filter((query) => !query.refId.includes('do-not-shard'))
+ .filter((query) => query.expr)
+ .filter(
+ (query) => query.direction === LokiQueryDirection.Scan || query.refId?.startsWith(REF_ID_STARTER_LOG_VOLUME)
+ );
+
+ return queries.length > 0;
+}
+
export const isLokiQuery = (query: DataQuery): query is LokiQuery => {
if (!query) {
return false;
@@ -328,3 +341,33 @@ export const getLokiQueryFromDataQuery = (query?: DataQuery): LokiQuery | undefi
return query;
};
+
+export const interpolateShardingSelector = (queries: LokiQuery[], shards: number[]) => {
+ if (shards.length === 0) {
+ return queries;
+ }
+
+ let shardValue = shards.join('|');
+
+ // -1 means empty shard value
+ if (shardValue === '-1' || shards.length === 1) {
+ shardValue = shardValue === '-1' ? '' : shardValue;
+ return queries.map((query) => ({
+ ...query,
+ expr: addLabelToQuery(query.expr, '__stream_shard__', '=', shardValue, LabelType.Indexed),
+ }));
+ }
+
+ return queries.map((query) => ({
+ ...query,
+ expr: addLabelToQuery(query.expr, '__stream_shard__', '=~', shardValue, LabelType.Indexed),
+ }));
+};
+
+export const getSelectorForShardValues = (query: string) => {
+ const selector = getNodesFromQuery(query, [Selector]);
+ if (selector.length > 0) {
+ return query.substring(selector[0].from, selector[0].to);
+ }
+ return '';
+};
diff --git a/public/app/plugins/datasource/loki/querybuilder/components/LokiQueryBuilderOptions.test.tsx b/public/app/plugins/datasource/loki/querybuilder/components/LokiQueryBuilderOptions.test.tsx
index e1154b21eef..e41af2534fa 100644
--- a/public/app/plugins/datasource/loki/querybuilder/components/LokiQueryBuilderOptions.test.tsx
+++ b/public/app/plugins/datasource/loki/querybuilder/components/LokiQueryBuilderOptions.test.tsx
@@ -89,6 +89,7 @@ describe('LokiQueryBuilderOptions', () => {
setup({ expr: '{foo="bar"}' });
expect(screen.getByText('Line limit: 20')).toBeInTheDocument();
expect(screen.getByText('Type: Range')).toBeInTheDocument();
+ expect(screen.getByText('Direction: Backward')).toBeInTheDocument();
expect(screen.queryByText(/step/i)).not.toBeInTheDocument();
});
@@ -98,6 +99,7 @@ describe('LokiQueryBuilderOptions', () => {
expect(screen.getByText('Type: Range')).toBeInTheDocument();
expect(screen.getByText('Step: 1m')).toBeInTheDocument();
expect(screen.getByText('Resolution: 1/2')).toBeInTheDocument();
+ expect(screen.queryByText(/Direction/)).not.toBeInTheDocument();
});
it('does not shows resolution field if resolution is not set', async () => {
diff --git a/public/app/plugins/datasource/loki/querybuilder/components/LokiQueryBuilderOptions.tsx b/public/app/plugins/datasource/loki/querybuilder/components/LokiQueryBuilderOptions.tsx
index 5fe4714369c..ec013d185df 100644
--- a/public/app/plugins/datasource/loki/querybuilder/components/LokiQueryBuilderOptions.tsx
+++ b/public/app/plugins/datasource/loki/querybuilder/components/LokiQueryBuilderOptions.tsx
@@ -8,6 +8,7 @@ import { config, reportInteraction } from '@grafana/runtime';
import { Alert, AutoSizeInput, RadioButtonGroup, Select } from '@grafana/ui';
import {
+ getQueryDirectionLabel,
preprocessMaxLines,
queryDirections,
queryTypeOptions,
@@ -102,7 +103,7 @@ export const LokiQueryBuilderOptions = React.memo(
x.value === queryType);
const resolutionLabel = RESOLUTION_OPTIONS.find((x) => x.value === (query.resolution ?? 1));
@@ -218,9 +220,8 @@ function getCollapsedInfo(
if (isLogQuery) {
items.push(`Line limit: ${query.maxLines ?? maxLines}`);
- }
-
- if (!isLogQuery) {
+ items.push(`Direction: ${getQueryDirectionLabel(direction)}`);
+ } else {
if (query.step) {
items.push(`Step: ${isValidStep ? query.step : 'Invalid value'}`);
}
diff --git a/public/app/plugins/datasource/loki/responseUtils.ts b/public/app/plugins/datasource/loki/responseUtils.ts
index 1b1e909fefb..3d9bc01342d 100644
--- a/public/app/plugins/datasource/loki/responseUtils.ts
+++ b/public/app/plugins/datasource/loki/responseUtils.ts
@@ -1,4 +1,4 @@
-import { DataFrame, FieldType, isValidGoDuration, Labels } from '@grafana/data';
+import { DataFrame, DataQueryResponse, FieldType, isValidGoDuration, Labels } from '@grafana/data';
import { isBytesString, processLabels } from './languageUtils';
import { isLogLineJSON, isLogLineLogfmt, isLogLinePacked } from './lineParser';
@@ -131,3 +131,14 @@ export function extractLevelLikeLabelFromDataFrame(frame: DataFrame): string | n
}
return levelLikeLabel;
}
+
+export function isRetriableError(errorResponse: DataQueryResponse) {
+ const message = errorResponse.errors ? (errorResponse.errors[0].message ?? '').toLowerCase() : '';
+ if (message.includes('timeout')) {
+ return true;
+ } else if (message.includes('parse error') || message.includes('max entries')) {
+ // If the error is a parse error, we want to signal to stop querying.
+ throw new Error(message);
+ }
+ return false;
+}
diff --git a/public/app/plugins/datasource/loki/shardQuerySplitting.test.ts b/public/app/plugins/datasource/loki/shardQuerySplitting.test.ts
new file mode 100644
index 00000000000..dc52b82fea0
--- /dev/null
+++ b/public/app/plugins/datasource/loki/shardQuerySplitting.test.ts
@@ -0,0 +1,455 @@
+import { of } from 'rxjs';
+
+import { DataQueryRequest, DataQueryResponse, dateTime, LoadingState } from '@grafana/data';
+
+import { createLokiDatasource } from './__mocks__/datasource';
+import { getMockFrames } from './__mocks__/frames';
+import { LokiDatasource } from './datasource';
+import { runShardSplitQuery } from './shardQuerySplitting';
+import { LokiQuery, LokiQueryDirection } from './types';
+
+jest.mock('uuid', () => ({
+ v4: jest.fn().mockReturnValue('uuid'),
+}));
+
+const originalLog = console.log;
+const originalWarn = console.warn;
+beforeEach(() => {
+ //jest.spyOn(console, 'log').mockImplementation(() => {});
+ jest.spyOn(console, 'warn').mockImplementation(() => {});
+});
+afterAll(() => {
+ console.log = originalLog;
+ console.warn = originalWarn;
+});
+
+describe('runShardSplitQuery()', () => {
+ let datasource: LokiDatasource;
+ const range = {
+ from: dateTime('2023-02-08T04:00:00.000Z'),
+ to: dateTime('2023-02-08T11:00:00.000Z'),
+ raw: {
+ from: dateTime('2023-02-08T04:00:00.000Z'),
+ to: dateTime('2023-02-08T11:00:00.000Z'),
+ },
+ };
+
+ const createRequest = (targets: Array>, overrides?: Partial>) => {
+ let request = {
+ range,
+ targets,
+ intervalMs: 60000,
+ requestId: 'TEST',
+ } as DataQueryRequest;
+
+ Object.assign(request, overrides);
+ return request;
+ };
+ let request: DataQueryRequest;
+ beforeEach(() => {
+ request = createRequest([
+ { expr: 'count_over_time($SELECTOR[1m])', refId: 'A', direction: LokiQueryDirection.Scan },
+ ]);
+ datasource = createLokiDatasource();
+ datasource.languageProvider.fetchLabelValues = jest.fn();
+ datasource.interpolateVariablesInQueries = jest.fn().mockImplementation((queries: LokiQuery[]) => {
+ return queries.map((query) => {
+ query.expr = query.expr.replace('$SELECTOR', '{a="b"}');
+ return query;
+ });
+ });
+ jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '10', '2', '20', '3']);
+ const { metricFrameA } = getMockFrames();
+ jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [metricFrameA] }));
+ jest.spyOn(datasource, 'query').mockReturnValue(of({ data: [metricFrameA] }));
+ });
+
+ test('Splits datasource queries', async () => {
+ await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith(() => {
+ // 5 shards, 3 groups + empty shard group, 4 requests
+ expect(datasource.runQuery).toHaveBeenCalledTimes(4);
+ });
+ });
+
+ test('Interpolates queries before running', async () => {
+ await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith(() => {
+ expect(datasource.interpolateVariablesInQueries).toHaveBeenCalledTimes(1);
+
+ expect(datasource.runQuery).toHaveBeenCalledWith({
+ intervalMs: expect.any(Number),
+ range: expect.any(Object),
+ requestId: 'TEST_shard_0_0_2',
+ targets: [
+ {
+ expr: 'count_over_time({a="b", __stream_shard__=~"20|10"}[1m])',
+ refId: 'A',
+ direction: LokiQueryDirection.Scan,
+ },
+ ],
+ });
+
+ expect(datasource.runQuery).toHaveBeenCalledWith({
+ intervalMs: expect.any(Number),
+ range: expect.any(Object),
+ requestId: 'TEST_shard_0_2_2',
+ targets: [
+ {
+ expr: 'count_over_time({a="b", __stream_shard__=~"3|2"}[1m])',
+ refId: 'A',
+ direction: LokiQueryDirection.Scan,
+ },
+ ],
+ });
+
+ expect(datasource.runQuery).toHaveBeenCalledWith({
+ intervalMs: expect.any(Number),
+ range: expect.any(Object),
+ requestId: 'TEST_shard_0_4_1',
+ targets: [
+ {
+ expr: 'count_over_time({a="b", __stream_shard__="1"}[1m])',
+ refId: 'A',
+ direction: LokiQueryDirection.Scan,
+ },
+ ],
+ });
+
+ expect(datasource.runQuery).toHaveBeenCalledWith({
+ intervalMs: expect.any(Number),
+ range: expect.any(Object),
+ requestId: 'TEST_shard_0_5_1',
+ targets: [
+ { expr: 'count_over_time({a="b", __stream_shard__=""}[1m])', refId: 'A', direction: LokiQueryDirection.Scan },
+ ],
+ });
+ });
+ });
+
+ test('Sends the whole stream selector to fetch values', async () => {
+ datasource.interpolateVariablesInQueries = jest.fn().mockImplementation((queries: LokiQuery[]) => {
+ return queries.map((query) => {
+ query.expr = query.expr.replace('$SELECTOR', '{service_name="test", filter="true"}');
+ return query;
+ });
+ });
+
+ await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith(() => {
+ expect(datasource.languageProvider.fetchLabelValues).toHaveBeenCalledWith('__stream_shard__', {
+ streamSelector: '{service_name="test", filter="true"}',
+ timeRange: expect.anything(),
+ });
+
+ expect(datasource.runQuery).toHaveBeenCalledWith({
+ intervalMs: expect.any(Number),
+ range: expect.any(Object),
+ requestId: 'TEST_shard_0_0_2',
+ targets: [
+ {
+ expr: 'count_over_time({service_name="test", filter="true", __stream_shard__=~"20|10"}[1m])',
+ refId: 'A',
+ direction: LokiQueryDirection.Scan,
+ },
+ ],
+ });
+ });
+ });
+
+ test('Returns a DataQueryResponse with the expected attributes', async () => {
+ await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
+ expect(response[0].data).toBeDefined();
+ expect(response[0].state).toBe(LoadingState.Done);
+ expect(response[0].key).toBeDefined();
+ });
+ });
+
+ test('Retries failed retriable requests', async () => {
+ jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
+ jest
+ .spyOn(datasource, 'runQuery')
+ .mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'timeout' }], data: [] }));
+ // @ts-expect-error
+ jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
+ callback();
+ });
+ await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
+ // 1 shard + empty shard + 1 retry = 3
+ expect(response).toHaveLength(3);
+ expect(datasource.runQuery).toHaveBeenCalledTimes(3);
+ });
+ });
+
+ test('Does not retry on other errors', async () => {
+ jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
+ jest
+ .spyOn(datasource, 'runQuery')
+ .mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'nope nope' }], data: [] }));
+ // @ts-expect-error
+ jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
+ callback();
+ });
+ await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
+ expect(datasource.runQuery).toHaveBeenCalledTimes(2);
+ });
+ });
+
+ test('Adjusts the group size based on errors and execution time', async () => {
+ const request = createRequest(
+ [{ expr: 'count_over_time($SELECTOR[1m])', refId: 'A', direction: LokiQueryDirection.Scan }],
+ {
+ range: {
+ from: dateTime('2024-11-13T05:00:00.000Z'),
+ to: dateTime('2024-11-14T06:00:00.000Z'),
+ raw: {
+ from: dateTime('2024-11-13T05:00:00.000Z'),
+ to: dateTime('2024-11-14T06:00:00.000Z'),
+ },
+ },
+ }
+ );
+
+ jest
+ .mocked(datasource.languageProvider.fetchLabelValues)
+ .mockResolvedValue(['1', '10', '2', '20', '3', '4', '5', '6', '7', '8', '9']);
+
+ // @ts-expect-error
+ jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
+ callback();
+ });
+
+ const { metricFrameA } = getMockFrames();
+
+ jest.mocked(datasource.runQuery).mockReset();
+
+ // + 50%
+ jest.mocked(datasource.runQuery).mockReturnValueOnce(
+ of({
+ data: [
+ {
+ ...metricFrameA,
+ meta: {
+ ...metricFrameA.meta,
+ stats: [
+ ...metricFrameA.meta!.stats!,
+ {
+ displayName: 'Summary: exec time',
+ unit: 's',
+ value: 0.5,
+ },
+ ],
+ },
+ },
+ ],
+ })
+ );
+
+ // sqrt(currentSize)
+ jest
+ .mocked(datasource.runQuery)
+ .mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'timeout' }], data: [] }));
+
+ // +10%
+ jest.mocked(datasource.runQuery).mockReturnValueOnce(
+ of({
+ data: [
+ {
+ ...metricFrameA,
+ meta: {
+ ...metricFrameA.meta,
+ stats: [
+ ...metricFrameA.meta!.stats!,
+ {
+ displayName: 'Summary: exec time',
+ unit: 's',
+ value: 5,
+ },
+ ],
+ },
+ },
+ ],
+ })
+ );
+
+ // -10%
+ jest.mocked(datasource.runQuery).mockReturnValueOnce(
+ of({
+ data: [
+ {
+ ...metricFrameA,
+ meta: {
+ ...metricFrameA.meta,
+ stats: [
+ ...metricFrameA.meta!.stats!,
+ {
+ displayName: 'Summary: exec time',
+ unit: 's',
+ value: 15,
+ },
+ ],
+ },
+ },
+ ],
+ })
+ );
+
+ // -10%
+ jest.mocked(datasource.runQuery).mockReturnValueOnce(
+ of({
+ data: [
+ {
+ ...metricFrameA,
+ meta: {
+ ...metricFrameA.meta,
+ stats: [
+ ...metricFrameA.meta!.stats!,
+ {
+ displayName: 'Summary: exec time',
+ unit: 's',
+ value: 19,
+ },
+ ],
+ },
+ },
+ ],
+ })
+ );
+
+ // -50%
+ jest.mocked(datasource.runQuery).mockReturnValueOnce(
+ of({
+ data: [
+ {
+ ...metricFrameA,
+ meta: {
+ ...metricFrameA.meta,
+ stats: [
+ ...metricFrameA.meta!.stats!,
+ {
+ displayName: 'Summary: exec time',
+ unit: 's',
+ value: 21,
+ },
+ ],
+ },
+ },
+ ],
+ })
+ );
+
+ // No more than 50% of the remaining shards
+ jest.mocked(datasource.runQuery).mockReturnValue(
+ of({
+ data: [
+ {
+ ...metricFrameA,
+ meta: {
+ ...metricFrameA.meta,
+ stats: [
+ ...metricFrameA.meta!.stats!,
+ {
+ displayName: 'Summary: exec time',
+ unit: 's',
+ value: 0.5,
+ },
+ ],
+ },
+ },
+ ],
+ })
+ );
+
+ await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith(() => {
+ expect(datasource.runQuery).toHaveBeenCalledWith({
+ intervalMs: expect.any(Number),
+ range: expect.any(Object),
+ requestId: 'TEST_shard_0_0_3',
+ targets: [
+ {
+ expr: 'count_over_time({a="b", __stream_shard__=~"20|10|9"}[1m])',
+ refId: 'A',
+ direction: LokiQueryDirection.Scan,
+ },
+ ],
+ });
+
+ // +50%
+ expect(datasource.runQuery).toHaveBeenCalledWith({
+ intervalMs: expect.any(Number),
+ range: expect.any(Object),
+ requestId: 'TEST_shard_0_3_4',
+ targets: [
+ {
+ expr: 'count_over_time({a="b", __stream_shard__=~"8|7|6|5"}[1m])',
+ refId: 'A',
+ direction: LokiQueryDirection.Scan,
+ },
+ ],
+ });
+
+ // Error, sqrt(currentSize)
+ expect(datasource.runQuery).toHaveBeenCalledWith({
+ intervalMs: expect.any(Number),
+ range: expect.any(Object),
+ requestId: 'TEST_shard_0_3_2',
+ targets: [
+ {
+ expr: 'count_over_time({a="b", __stream_shard__=~"8|7"}[1m])',
+ refId: 'A',
+ direction: LokiQueryDirection.Scan,
+ },
+ ],
+ });
+
+ // +10%
+ expect(datasource.runQuery).toHaveBeenCalledWith({
+ intervalMs: expect.any(Number),
+ range: expect.any(Object),
+ requestId: 'TEST_shard_0_5_3',
+ targets: [
+ {
+ expr: 'count_over_time({a="b", __stream_shard__=~"6|5|4"}[1m])',
+ refId: 'A',
+ direction: LokiQueryDirection.Scan,
+ },
+ ],
+ });
+
+ // -10%
+ expect(datasource.runQuery).toHaveBeenCalledWith({
+ intervalMs: expect.any(Number),
+ range: expect.any(Object),
+ requestId: 'TEST_shard_0_8_2',
+ targets: [
+ {
+ expr: 'count_over_time({a="b", __stream_shard__=~"3|2"}[1m])',
+ refId: 'A',
+ direction: LokiQueryDirection.Scan,
+ },
+ ],
+ });
+
+ // No more than 50% of the remaining shards
+ expect(datasource.runQuery).toHaveBeenCalledWith({
+ intervalMs: expect.any(Number),
+ range: expect.any(Object),
+ requestId: 'TEST_shard_0_10_1',
+ targets: [
+ {
+ expr: 'count_over_time({a="b", __stream_shard__="1"}[1m])',
+ refId: 'A',
+ direction: LokiQueryDirection.Scan,
+ },
+ ],
+ });
+
+ // No more than 50% of the remaining shards
+ expect(datasource.runQuery).toHaveBeenCalledWith({
+ intervalMs: expect.any(Number),
+ range: expect.any(Object),
+ requestId: 'TEST_shard_0_11_1',
+ targets: [
+ { expr: 'count_over_time({a="b", __stream_shard__=""}[1m])', refId: 'A', direction: LokiQueryDirection.Scan },
+ ],
+ });
+ });
+ });
+});
diff --git a/public/app/plugins/datasource/loki/shardQuerySplitting.ts b/public/app/plugins/datasource/loki/shardQuerySplitting.ts
new file mode 100644
index 00000000000..d1b74e9de67
--- /dev/null
+++ b/public/app/plugins/datasource/loki/shardQuerySplitting.ts
@@ -0,0 +1,353 @@
+import { groupBy, partition } from 'lodash';
+import { Observable, Subscriber, Subscription } from 'rxjs';
+import { v4 as uuidv4 } from 'uuid';
+
+import { DataQueryRequest, LoadingState, DataQueryResponse, QueryResultMetaStat } from '@grafana/data';
+
+import { LokiDatasource } from './datasource';
+import { combineResponses, replaceResponses } from './mergeResponses';
+import { adjustTargetsFromResponseState, runSplitQuery } from './querySplitting';
+import { getSelectorForShardValues, interpolateShardingSelector, requestSupportsSharding } from './queryUtils';
+import { isRetriableError } from './responseUtils';
+import { LokiQuery } from './types';
+
+/**
+ * Query splitting by stream shards.
+ * Query splitting was introduced in Loki to optimize querying for long intervals and high volume of data,
+ * dividing a big request into smaller sub-requests, combining and displaying the results as they arrive.
+ *
+ * This approach, inspired by the time-based query splitting, takes advantage of the __stream_shard__
+ * internal label, representing how data is spread into different sources that can be queried individually.
+ *
+ * The main entry point of this module is runShardSplitQuery(), which prepares the query for execution and
+ * passes it to splitQueriesByStreamShard() to begin the querying loop.
+ *
+ * splitQueriesByStreamShard() has the following structure:
+ * - Creates and returns an Observable to which the UI will subscribe
+ * - Requests the __stream_shard__ values of the selected service:
+ * . If there are no shard values, it falls back to the standard querying approach of the data source in runNonSplitRequest()
+ * . If there are shards:
+ * - It sorts them by value, descending. Higher shard numbers correspond with the least volume.
+ * - It defines an initial group size, roughly Math.sqrt(amountOfShards).
+ * - It begins the querying loop with runNextRequest().
+ * - runNextRequest() will create a group of groupSize shards from the nth shard (cycle), and has the following internal structure:
+ * . groupShardRequests() returns an array of shards from cycle to cycle + groupSize.
+ * . interpolateShardingSelector() will update the stream selector with the shard numbers in the current group.
+ * . After query execution:
+ * - If the response is successful:
+ * . It will add new data to the response with combineResponses()
+ * . Using the data and meta data of the response, updateGroupSizeFromResponse() will increase or decrease the group size.
+ * . nextRequest() will use the current cycle and group size to determine the next request or complete execution with done().
+ * - If the response is unsuccessful:
+ * . If the response is not a query error, and the group size bigger than 1, it will decrease the group size.
+ * . If the group size is already 1, it will retry the request up to 4 times.
+ * . If there are retry attempts, it will retry the current cycle, or else stop querying.
+ * - Once all request groups have been executed, it will be done().
+ */
+
+export function runShardSplitQuery(datasource: LokiDatasource, request: DataQueryRequest) {
+ const queries = datasource
+ .interpolateVariablesInQueries(request.targets, request.scopedVars)
+ .filter((query) => query.expr)
+ .filter((query) => !query.hide);
+
+ return splitQueriesByStreamShard(datasource, request, queries);
+}
+
+function splitQueriesByStreamShard(
+ datasource: LokiDatasource,
+ request: DataQueryRequest,
+ splittingTargets: LokiQuery[]
+) {
+ let shouldStop = false;
+ let mergedResponse: DataQueryResponse = { data: [], state: LoadingState.Streaming, key: uuidv4() };
+ let subquerySubscription: Subscription | null = null;
+ let retriesMap = new Map();
+ let retryTimer: ReturnType | null = null;
+
+ const runNextRequest = (subscriber: Subscriber, group: number, groups: ShardedQueryGroup[]) => {
+ let nextGroupSize = groups[group].groupSize;
+ const { shards, groupSize, cycle } = groups[group];
+ let retrying = false;
+
+ if (subquerySubscription != null) {
+ subquerySubscription.unsubscribe();
+ subquerySubscription = null;
+ }
+
+ if (shouldStop) {
+ subscriber.complete();
+ return;
+ }
+
+ const done = () => {
+ mergedResponse.state = LoadingState.Done;
+ subscriber.next(mergedResponse);
+ subscriber.complete();
+ };
+
+ const nextRequest = () => {
+ const nextGroup =
+ groups[group + 1] && groupHasPendingRequests(groups[group + 1])
+ ? groups[group + 1]
+ : groups.find((shardGroup) => groupHasPendingRequests(shardGroup));
+
+ if (nextGroup === undefined) {
+ done();
+ return;
+ }
+ groups[group].groupSize = nextGroupSize;
+ runNextRequest(subscriber, groups.indexOf(nextGroup), groups);
+ };
+
+ const retry = (errorResponse?: DataQueryResponse) => {
+ try {
+ if (errorResponse && !isRetriableError(errorResponse)) {
+ return false;
+ }
+ } catch (e) {
+ console.error(e);
+ shouldStop = true;
+ return false;
+ }
+
+ if (groupSize !== undefined && groupSize > 1) {
+ groups[group].groupSize = Math.floor(Math.sqrt(groupSize));
+ debug(`Possible time out, new group size ${groups[group].groupSize}`);
+ retrying = true;
+ runNextRequest(subscriber, group, groups);
+ return true;
+ }
+
+ const key = `${group}_${cycle}`;
+ const retries = retriesMap.get(key) ?? 0;
+
+ if (retries > 3) {
+ shouldStop = true;
+ return false;
+ }
+
+ retriesMap.set(key, retries + 1);
+
+ retryTimer = setTimeout(
+ () => {
+ console.warn(`Retrying ${group} ${cycle} (${retries + 1})`);
+ runNextRequest(subscriber, group, groups);
+ retryTimer = null;
+ },
+ 1500 * Math.pow(2, retries)
+ ); // Exponential backoff
+
+ retrying = true;
+
+ return true;
+ };
+
+ const targets = adjustTargetsFromResponseState(groups[group].targets, mergedResponse);
+ if (!targets.length) {
+ nextRequest();
+ return;
+ }
+
+ const shardsToQuery =
+ shards && cycle !== undefined && groupSize ? groupShardRequests(shards, cycle, groupSize) : [];
+ const subRequest = { ...request, targets: interpolateShardingSelector(targets, shardsToQuery) };
+ // Request may not have a request id
+ if (request.requestId) {
+ subRequest.requestId =
+ shardsToQuery.length > 0 ? `${request.requestId}_shard_${group}_${cycle}_${groupSize}` : request.requestId;
+ }
+
+ debug(shardsToQuery.length ? `Querying ${shardsToQuery.join(', ')}` : 'Running regular query');
+
+ const queryRunner =
+ shardsToQuery.length > 0 ? datasource.runQuery.bind(datasource) : runSplitQuery.bind(null, datasource);
+ subquerySubscription = queryRunner(subRequest).subscribe({
+ next: (partialResponse: DataQueryResponse) => {
+ if ((partialResponse.errors ?? []).length > 0 || partialResponse.error != null) {
+ if (retry(partialResponse)) {
+ return;
+ }
+ }
+ if (groupSize && cycle !== undefined && shards !== undefined) {
+ nextGroupSize = constrainGroupSize(
+ cycle + groupSize,
+ updateGroupSizeFromResponse(partialResponse, groups[group]),
+ shards.length
+ );
+ if (nextGroupSize !== groupSize) {
+ debug(`New group size ${nextGroupSize}`);
+ }
+ }
+ mergedResponse =
+ shardsToQuery.length > 0
+ ? combineResponses(mergedResponse, partialResponse)
+ : replaceResponses(mergedResponse, partialResponse);
+
+ // When we delegate query running to runSplitQuery(), we will receive partial updates here, and complete
+ // will be called when all the sub-requests were completed, so we need to show partial progress here.
+ if (shardsToQuery.length === 0) {
+ subscriber.next(mergedResponse);
+ }
+ },
+ complete: () => {
+ if (retrying) {
+ return;
+ }
+ subscriber.next(mergedResponse);
+ nextRequest();
+ },
+ error: (error: unknown) => {
+ console.error(error, { msg: 'failed to shard' });
+ subscriber.next(mergedResponse);
+ if (retry()) {
+ return;
+ }
+ nextRequest();
+ },
+ });
+ };
+
+ const response = new Observable((subscriber) => {
+ groupTargetsByQueryType(splittingTargets, datasource, request).then((groupedRequests) => {
+ runNextRequest(subscriber, 0, groupedRequests);
+ });
+ return () => {
+ shouldStop = true;
+ if (retryTimer) {
+ clearTimeout(retryTimer);
+ }
+ if (subquerySubscription != null) {
+ subquerySubscription.unsubscribe();
+ subquerySubscription = null;
+ }
+ };
+ });
+
+ return response;
+}
+
+interface ShardedQueryGroup {
+ targets: LokiQuery[];
+ shards?: number[];
+ groupSize?: number;
+ cycle?: number;
+}
+
+async function groupTargetsByQueryType(
+ targets: LokiQuery[],
+ datasource: LokiDatasource,
+ request: DataQueryRequest
+) {
+ const [shardedQueries, otherQueries] = partition(targets, (query) => requestSupportsSharding([query]));
+ const groups: ShardedQueryGroup[] = [];
+
+ if (otherQueries.length) {
+ groups.push({
+ targets: otherQueries,
+ });
+ }
+
+ const selectorPartition = groupBy(shardedQueries, (query) => getSelectorForShardValues(query.expr));
+ for (const selector in selectorPartition) {
+ try {
+ const values = await datasource.languageProvider.fetchLabelValues('__stream_shard__', {
+ timeRange: request.range,
+ streamSelector: selector,
+ });
+ const shards = values.map((value) => parseInt(value, 10));
+ if (shards) {
+ shards.sort((a, b) => b - a);
+ debug(`Querying ${selector} with shards ${shards.join(', ')}`);
+ }
+ groups.push({
+ targets: selectorPartition[selector],
+ shards: shards.length ? shards : undefined,
+ groupSize: shards.length ? getInitialGroupSize(shards) : undefined,
+ cycle: 0,
+ });
+ } catch (error) {
+ console.error(error, { msg: 'failed to fetch label values for __stream_shard__' });
+ groups.push({
+ targets: selectorPartition[selector],
+ });
+ }
+ }
+
+ return groups;
+}
+
+function groupHasPendingRequests(group: ShardedQueryGroup) {
+ if (group.cycle === undefined || !group.groupSize || !group.shards) {
+ return false;
+ }
+ const { cycle, groupSize, shards } = group;
+ const nextCycle = Math.min(cycle + groupSize, shards.length);
+ group.cycle = nextCycle;
+ return cycle < shards.length && nextCycle <= shards.length;
+}
+
+function updateGroupSizeFromResponse(response: DataQueryResponse, group: ShardedQueryGroup) {
+ const { groupSize: currentSize } = group;
+ if (!currentSize) {
+ return 1;
+ }
+ if (!response.data.length) {
+ // Empty response, increase group size
+ return currentSize + 1;
+ }
+
+ const metaExecutionTime: QueryResultMetaStat | undefined = response.data[0].meta?.stats?.find(
+ (stat: QueryResultMetaStat) => stat.displayName === 'Summary: exec time'
+ );
+
+ if (metaExecutionTime) {
+ const executionTime = Math.round(metaExecutionTime.value);
+ debug(`${metaExecutionTime.value}`);
+ // Positive scenarios
+ if (executionTime <= 1) {
+ return Math.floor(currentSize * 1.5);
+ } else if (executionTime < 6) {
+ return Math.ceil(currentSize * 1.1);
+ }
+
+ // Negative scenarios
+ if (currentSize === 1) {
+ return currentSize;
+ } else if (executionTime < 20) {
+ return Math.ceil(currentSize * 0.9);
+ } else {
+ return Math.floor(currentSize / 2);
+ }
+ }
+
+ return currentSize;
+}
+
+/**
+ * Prevents the group size for ever being more than maxFactor% of the pending shards.
+ */
+function constrainGroupSize(cycle: number, groupSize: number, shards: number) {
+ const maxFactor = 0.7;
+ return Math.min(groupSize, Math.max(Math.floor((shards - cycle) * maxFactor), 1));
+}
+
+function groupShardRequests(shards: number[], start: number, groupSize: number) {
+ if (start === shards.length) {
+ return [-1];
+ }
+ return shards.slice(start, start + groupSize);
+}
+
+function getInitialGroupSize(shards: number[]) {
+ return Math.floor(Math.sqrt(shards.length));
+}
+
+// Enable to output debugging logs
+const DEBUG_ENABLED = Boolean(localStorage.getItem(`loki.sharding_debug_enabled`));
+function debug(message: string) {
+ if (!DEBUG_ENABLED) {
+ return;
+ }
+ console.log(message);
+}