From 2573cbec083e8f5160888a455efd435b8d03549f Mon Sep 17 00:00:00 2001 From: Matias Chomicki Date: Wed, 23 Oct 2024 13:21:03 +0200 Subject: [PATCH] Loki: Added support to split queries by stream shard (#94245) * Add shard query splitting implementation * Shard query splitting: reuse function from query splitting * Shard query splitting: remove max line limit * Shard query splitting: update test * Shard query splitting: fix types and non-sharded queries * Merge responses: fix log merging * Merge responses: remove legacy code * Query splitting: add support to retry failed requests * Query splitting: unit test request retrying * Query splitting: add unsubscriptions * Shard query splitting: fix retrying * Shard query splitting: switch to dynamic grouping * Shard query splitting: update group size thresholds and fix -1 query * Shard query splitting: update initial group size + don't retry parse errors * Shard query splitting: update unit test * chore: update mock value * Shard query splitting: add support for multiple targets * chore: update description * Shard query splitting: use group targets * chore: filter hidden queries * Shard query splitting: issue initial log query without sharding * Splitting: fix retrying in both methods * Merge responses: keep execution time * Shard query splitting: remove no-shard attempt * Shard query splitting: adjust groups based on rate of change * chore: clean up experiments * Shard query splittng: remove log query restrictions * Shard query splitting: remove fallback to time splitting * Loki: add new query direction * Missing generated file * LokiOptionField: integrate new query direction * Shard query splitting: delegate non-scan queries to time splitting * Query splitting: do not retry queries with parse errors * Loki datasource: add placeholder for feature flag * Shard query splitting: add function with support criteria * Shard query splitting: refactor query modification and shard logs volume * Shard query splitting: update unit 
tests * chore: Update scan direction tooltip * chore: formatting * LogsVolumePanel: fix missing state in logs volume panel data * Merge responses: better handle missing nanoseconds * LokiQueryOptionFields: display query direction for log queries * loki: process scan direction as backward * Loki datasource: restrict sharding to Explore * Retrying: invert criteria and move to response utils * Formatting * Use log volume refId constant * Fix import order * Create feature flag * Use feature toggle * LogsVolumePanel: prevent flashing no data while streaming --- .../feature-toggles/index.md | 1 + .../src/types/featureToggles.gen.ts | 1 + .../dataquery/x/LokiDataQuery_types.gen.ts | 1 + pkg/services/featuremgmt/registry.go | 7 + pkg/services/featuremgmt/toggles_gen.csv | 1 + pkg/services/featuremgmt/toggles_gen.go | 4 + pkg/services/featuremgmt/toggles_gen.json | 16 + .../kinds/dataquery/types_dataquery_gen.go | 1 + pkg/tsdb/loki/parse_query.go | 2 + .../explore/Logs/LogsVolumePanelList.tsx | 5 +- .../datasource/loki/__mocks__/frames.ts | 108 ++ .../loki/components/LokiOptionFields.tsx | 9 + .../app/plugins/datasource/loki/dataquery.cue | 2 +- .../plugins/datasource/loki/dataquery.gen.ts | 1 + .../app/plugins/datasource/loki/datasource.ts | 10 +- .../datasource/loki/mergeResponses.test.ts | 1282 +++++++++++++++++ .../plugins/datasource/loki/mergeResponses.ts | 318 ++++ .../plugins/datasource/loki/modifyQuery.ts | 2 +- .../datasource/loki/querySplitting.test.ts | 39 +- .../plugins/datasource/loki/querySplitting.ts | 74 +- .../app/plugins/datasource/loki/queryUtils.ts | 47 +- .../LokiQueryBuilderOptions.test.tsx | 2 + .../components/LokiQueryBuilderOptions.tsx | 11 +- .../plugins/datasource/loki/responseUtils.ts | 13 +- .../loki/shardQuerySplitting.test.ts | 455 ++++++ .../datasource/loki/shardQuerySplitting.ts | 353 +++++ 26 files changed, 2740 insertions(+), 25 deletions(-) create mode 100644 public/app/plugins/datasource/loki/mergeResponses.test.ts create mode 100644 
public/app/plugins/datasource/loki/mergeResponses.ts create mode 100644 public/app/plugins/datasource/loki/shardQuerySplitting.test.ts create mode 100644 public/app/plugins/datasource/loki/shardQuerySplitting.ts diff --git a/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md b/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md index 972439653ac..5111856e609 100644 --- a/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md +++ b/docs/sources/setup-grafana/configure-grafana/feature-toggles/index.md @@ -135,6 +135,7 @@ Experimental features might be changed or removed without prior notice. | `mysqlParseTime` | Ensure the parseTime flag is set for MySQL driver | | `alertingBacktesting` | Rule backtesting API for alerting | | `editPanelCSVDragAndDrop` | Enables drag and drop for CSV and Excel files | +| `lokiShardSplitting` | Use stream shards to split queries into smaller subqueries | | `lokiQuerySplittingConfig` | Give users the option to configure split durations for Loki queries | | `individualCookiePreferences` | Support overriding cookie preferences per user | | `influxqlStreamingParser` | Enable streaming JSON parser for InfluxDB datasource InfluxQL query language | diff --git a/packages/grafana-data/src/types/featureToggles.gen.ts b/packages/grafana-data/src/types/featureToggles.gen.ts index ac423666cd9..644e5531261 100644 --- a/packages/grafana-data/src/types/featureToggles.gen.ts +++ b/packages/grafana-data/src/types/featureToggles.gen.ts @@ -52,6 +52,7 @@ export interface FeatureToggles { editPanelCSVDragAndDrop?: boolean; alertingNoNormalState?: boolean; logsContextDatasourceUi?: boolean; + lokiShardSplitting?: boolean; lokiQuerySplitting?: boolean; lokiQuerySplittingConfig?: boolean; individualCookiePreferences?: boolean; diff --git a/packages/grafana-schema/src/raw/composable/loki/dataquery/x/LokiDataQuery_types.gen.ts 
b/packages/grafana-schema/src/raw/composable/loki/dataquery/x/LokiDataQuery_types.gen.ts index b8d2b5ffc49..a9b2c7ecaee 100644 --- a/packages/grafana-schema/src/raw/composable/loki/dataquery/x/LokiDataQuery_types.gen.ts +++ b/packages/grafana-schema/src/raw/composable/loki/dataquery/x/LokiDataQuery_types.gen.ts @@ -33,6 +33,7 @@ export enum SupportingQueryType { export enum LokiQueryDirection { Backward = 'backward', Forward = 'forward', + Scan = 'scan', } export interface LokiDataQuery extends common.DataQuery { diff --git a/pkg/services/featuremgmt/registry.go b/pkg/services/featuremgmt/registry.go index d3ff4aa7755..2141ce0cd6d 100644 --- a/pkg/services/featuremgmt/registry.go +++ b/pkg/services/featuremgmt/registry.go @@ -265,6 +265,13 @@ var ( Expression: "true", // turned on by default AllowSelfServe: true, }, + { + Name: "lokiShardSplitting", + Description: "Use stream shards to split queries into smaller subqueries", + Stage: FeatureStageExperimental, + FrontendOnly: true, + Owner: grafanaObservabilityLogsSquad, + }, { Name: "lokiQuerySplitting", Description: "Split large interval queries into subqueries with smaller time intervals", diff --git a/pkg/services/featuremgmt/toggles_gen.csv b/pkg/services/featuremgmt/toggles_gen.csv index 4718a0d7ffa..a452651a933 100644 --- a/pkg/services/featuremgmt/toggles_gen.csv +++ b/pkg/services/featuremgmt/toggles_gen.csv @@ -33,6 +33,7 @@ alertingBacktesting,experimental,@grafana/alerting-squad,false,false,false editPanelCSVDragAndDrop,experimental,@grafana/dataviz-squad,false,false,true alertingNoNormalState,preview,@grafana/alerting-squad,false,false,false logsContextDatasourceUi,GA,@grafana/observability-logs,false,false,true +lokiShardSplitting,experimental,@grafana/observability-logs,false,false,true lokiQuerySplitting,GA,@grafana/observability-logs,false,false,true lokiQuerySplittingConfig,experimental,@grafana/observability-logs,false,false,true 
individualCookiePreferences,experimental,@grafana/grafana-backend-group,false,false,false diff --git a/pkg/services/featuremgmt/toggles_gen.go b/pkg/services/featuremgmt/toggles_gen.go index 921f061f555..5e91ba9dd42 100644 --- a/pkg/services/featuremgmt/toggles_gen.go +++ b/pkg/services/featuremgmt/toggles_gen.go @@ -143,6 +143,10 @@ const ( // Allow datasource to provide custom UI for context view FlagLogsContextDatasourceUi = "logsContextDatasourceUi" + // FlagLokiShardSplitting + // Use stream shards to split queries into smaller subqueries + FlagLokiShardSplitting = "lokiShardSplitting" + // FlagLokiQuerySplitting // Split large interval queries into subqueries with smaller time intervals FlagLokiQuerySplitting = "lokiQuerySplitting" diff --git a/pkg/services/featuremgmt/toggles_gen.json b/pkg/services/featuremgmt/toggles_gen.json index 2e08fb55bf1..bb89eb8bcfc 100644 --- a/pkg/services/featuremgmt/toggles_gen.json +++ b/pkg/services/featuremgmt/toggles_gen.json @@ -1986,6 +1986,22 @@ "codeowner": "@grafana/observability-logs" } }, + { + "metadata": { + "name": "lokiShardSplitting", + "resourceVersion": "1729678036788", + "creationTimestamp": "2024-10-23T10:06:42Z", + "annotations": { + "grafana.app/updatedTimestamp": "2024-10-23 10:07:16.788828 +0000 UTC" + } + }, + "spec": { + "description": "Use stream shards to split queries into smaller subqueries", + "stage": "experimental", + "codeowner": "@grafana/observability-logs", + "frontend": true + } + }, { "metadata": { "name": "lokiStructuredMetadata", diff --git a/pkg/tsdb/loki/kinds/dataquery/types_dataquery_gen.go b/pkg/tsdb/loki/kinds/dataquery/types_dataquery_gen.go index f9dcd723805..c386bd5c3b4 100644 --- a/pkg/tsdb/loki/kinds/dataquery/types_dataquery_gen.go +++ b/pkg/tsdb/loki/kinds/dataquery/types_dataquery_gen.go @@ -13,6 +13,7 @@ package dataquery const ( LokiQueryDirectionBackward LokiQueryDirection = "backward" LokiQueryDirectionForward LokiQueryDirection = "forward" + LokiQueryDirectionScan 
LokiQueryDirection = "scan" ) // Defines values for LokiQueryType. diff --git a/pkg/tsdb/loki/parse_query.go b/pkg/tsdb/loki/parse_query.go index a4f39679d6c..d824bf596dc 100644 --- a/pkg/tsdb/loki/parse_query.go +++ b/pkg/tsdb/loki/parse_query.go @@ -94,6 +94,8 @@ func parseDirection(jsonPointerValue *string) (Direction, error) { return DirectionBackward, nil case "forward": return DirectionForward, nil + case "scan": + return DirectionBackward, nil default: return DirectionBackward, fmt.Errorf("invalid queryDirection: %s", jsonValue) } diff --git a/public/app/features/explore/Logs/LogsVolumePanelList.tsx b/public/app/features/explore/Logs/LogsVolumePanelList.tsx index a25a02b29dc..f2bbc0b3905 100644 --- a/public/app/features/explore/Logs/LogsVolumePanelList.tsx +++ b/public/app/features/explore/Logs/LogsVolumePanelList.tsx @@ -109,7 +109,7 @@ export const LogsVolumePanelList = ({ return ; } - if (numberOfLogVolumes === 0) { + if (numberOfLogVolumes === 0 && logsVolumeData?.state !== LoadingState.Streaming) { return (
@@ -122,7 +122,6 @@ export const LogsVolumePanelList = ({ return (
{Object.keys(logVolumes).map((name, index) => { - const logsVolumeData = { data: logVolumes[name] }; return ( > = [ label: 'Forward', description: 'Search in forward direction.', }, + { + value: LokiQueryDirection.Scan, + label: 'Scan', + description: 'Split the query into smaller units and stop at the requested log line limit.', + }, ]; +export function getQueryDirectionLabel(direction: LokiQueryDirection) { + return queryDirections.find((queryDirection) => queryDirection.value === direction)?.label ?? 'Unknown'; +} + if (config.featureToggles.lokiExperimentalStreaming) { queryTypeOptions.push({ value: LokiQueryType.Stream, diff --git a/public/app/plugins/datasource/loki/dataquery.cue b/public/app/plugins/datasource/loki/dataquery.cue index 97bc3103ef8..63dd1c0265a 100644 --- a/public/app/plugins/datasource/loki/dataquery.cue +++ b/public/app/plugins/datasource/loki/dataquery.cue @@ -49,7 +49,7 @@ composableKinds: DataQuery: { #SupportingQueryType: "logsVolume" | "logsSample" | "dataSample" | "infiniteScroll" @cuetsy(kind="enum") - #LokiQueryDirection: "forward" | "backward" @cuetsy(kind="enum") + #LokiQueryDirection: "forward" | "backward" | "scan" @cuetsy(kind="enum") } }] lenses: [] diff --git a/public/app/plugins/datasource/loki/dataquery.gen.ts b/public/app/plugins/datasource/loki/dataquery.gen.ts index 063deb3ee2f..99047e02221 100644 --- a/public/app/plugins/datasource/loki/dataquery.gen.ts +++ b/public/app/plugins/datasource/loki/dataquery.gen.ts @@ -31,6 +31,7 @@ export enum SupportingQueryType { export enum LokiQueryDirection { Backward = 'backward', Forward = 'forward', + Scan = 'scan', } export interface LokiDataQuery extends common.DataQuery { diff --git a/public/app/plugins/datasource/loki/datasource.ts b/public/app/plugins/datasource/loki/datasource.ts index 859b718c38c..d7f5862159f 100644 --- a/public/app/plugins/datasource/loki/datasource.ts +++ b/public/app/plugins/datasource/loki/datasource.ts @@ -82,9 +82,11 @@ import { 
getStreamSelectorsFromQuery, isLogsQuery, isQueryWithError, + requestSupportsSharding, requestSupportsSplitting, } from './queryUtils'; import { replaceVariables, returnVariables } from './querybuilder/parsingUtils'; +import { runShardSplitQuery } from './shardQuerySplitting'; import { convertToWebSocketUrl, doLokiChannelStream } from './streaming'; import { trackQuery } from './tracking'; import { @@ -359,7 +361,13 @@ export class LokiDatasource return this.runLiveQueryThroughBackend(fixedRequest); } - if (config.featureToggles.lokiQuerySplitting && requestSupportsSplitting(fixedRequest.targets)) { + if ( + config.featureToggles.lokiShardSplitting && + requestSupportsSharding(fixedRequest.targets) && + fixedRequest.app === CoreApp.Explore + ) { + return runShardSplitQuery(this, fixedRequest); + } else if (config.featureToggles.lokiQuerySplitting && requestSupportsSplitting(fixedRequest.targets)) { return runSplitQuery(this, fixedRequest); } diff --git a/public/app/plugins/datasource/loki/mergeResponses.test.ts b/public/app/plugins/datasource/loki/mergeResponses.test.ts new file mode 100644 index 00000000000..38f3833d6b0 --- /dev/null +++ b/public/app/plugins/datasource/loki/mergeResponses.test.ts @@ -0,0 +1,1282 @@ +import { cloneDeep } from 'lodash'; + +import { DataQueryResponse, Field, FieldType, QueryResultMetaStat } from '@grafana/data'; + +import { getMockFrames } from './__mocks__/frames'; +import { cloneQueryResponse, combineResponses } from './mergeResponses'; + +describe('cloneQueryResponse', () => { + const { logFrameA } = getMockFrames(); + const responseA: DataQueryResponse = { + data: [logFrameA], + }; + it('clones query responses', () => { + const clonedA = cloneQueryResponse(responseA); + expect(clonedA).not.toBe(responseA); + expect(clonedA).toEqual(clonedA); + }); +}); + +describe('combineResponses', () => { + it('combines logs frames', () => { + const { logFrameA, logFrameB } = getMockFrames(); + const responseA: DataQueryResponse = { + data: 
[logFrameA], + }; + const responseB: DataQueryResponse = { + data: [logFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: [1, 2, 3, 4], + }, + { + config: {}, + name: 'Line', + type: 'string', + values: ['line3', 'line4', 'line1', 'line2'], + }, + { + config: {}, + name: 'labels', + type: 'other', + values: [ + { + otherLabel: 'other value', + }, + { + label: 'value', + }, + { + otherLabel: 'other value', + }, + ], + }, + { + config: {}, + name: 'tsNs', + type: 'string', + values: ['1000000', '2000000', '3000000', '4000000'], + }, + { + config: {}, + name: 'id', + type: 'string', + values: ['id3', 'id4', 'id1', 'id2'], + }, + ], + length: 4, + meta: { + custom: { + frameType: 'LabeledTimeValues', + }, + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); + + it('combines logs frames with transformed fields', () => { + const { logFrameA, logFrameB } = getMockFrames(); + const { logFrameB: originalLogFrameB } = getMockFrames(); + + // Pseudo shuffle fields + logFrameB.fields.sort((a: Field, b: Field) => (a.name < b.name ? 
-1 : 1)); + expect(logFrameB.fields).not.toEqual(originalLogFrameB.fields); + + const responseA: DataQueryResponse = { + data: [logFrameA], + }; + const responseB: DataQueryResponse = { + data: [logFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: [1, 2, 3, 4], + }, + { + config: {}, + name: 'Line', + type: 'string', + values: ['line3', 'line4', 'line1', 'line2'], + }, + { + config: {}, + name: 'labels', + type: 'other', + values: [ + { + otherLabel: 'other value', + }, + { + label: 'value', + }, + { + otherLabel: 'other value', + }, + ], + }, + { + config: {}, + name: 'tsNs', + type: 'string', + values: ['1000000', '2000000', '3000000', '4000000'], + }, + { + config: {}, + name: 'id', + type: 'string', + values: ['id3', 'id4', 'id1', 'id2'], + }, + ], + length: 4, + meta: { + custom: { + frameType: 'LabeledTimeValues', + }, + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); + + it('combines metric frames', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + const responseA: DataQueryResponse = { + data: [metricFrameA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: [1000000, 2000000, 3000000, 4000000], + }, + { + config: {}, + name: 'Value', + type: 'number', + values: [6, 7, 5, 4], + labels: { + level: 'debug', + }, + }, + ], + length: 4, + meta: { + type: 'timeseries-multi', + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); + + it('combines and identifies new frames in the response', () => { + const { metricFrameA, metricFrameB, metricFrameC } = getMockFrames(); + const 
responseA: DataQueryResponse = { + data: [metricFrameA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB, metricFrameC], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: [1000000, 2000000, 3000000, 4000000], + }, + { + config: {}, + name: 'Value', + type: 'number', + values: [6, 7, 5, 4], + labels: { + level: 'debug', + }, + }, + ], + length: 4, + meta: { + type: 'timeseries-multi', + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + metricFrameC, + ], + }); + }); + + it('combines frames prioritizing refIds over names', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + const dataFrameA = { + ...metricFrameA, + refId: 'A', + name: 'A', + }; + const dataFrameB = { + ...metricFrameB, + refId: 'B', + name: 'A', + }; + const responseA: DataQueryResponse = { + data: [dataFrameA], + }; + const responseB: DataQueryResponse = { + data: [dataFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [dataFrameA, dataFrameB], + }); + }); + + it('combines frames in a new response instance', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + const responseA: DataQueryResponse = { + data: [metricFrameA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + }; + expect(combineResponses(null, responseA)).not.toBe(responseA); + expect(combineResponses(null, responseB)).not.toBe(responseB); + }); + + it('combine when first param has errors', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + const errorA = { + message: 'errorA', + }; + const responseA: DataQueryResponse = { + data: [metricFrameA], + error: errorA, + errors: [errorA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + }; + + const combined = combineResponses(responseA, responseB); + 
expect(combined.data[0].length).toBe(4); + expect(combined.error?.message).toBe('errorA'); + expect(combined.errors).toHaveLength(1); + expect(combined.errors?.[0]?.message).toBe('errorA'); + }); + + it('combine when second param has errors', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + const responseA: DataQueryResponse = { + data: [metricFrameA], + }; + const errorB = { + message: 'errorB', + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + error: errorB, + errors: [errorB], + }; + + const combined = combineResponses(responseA, responseB); + expect(combined.data[0].length).toBe(4); + expect(combined.error?.message).toBe('errorB'); + expect(combined.errors).toHaveLength(1); + expect(combined.errors?.[0]?.message).toBe('errorB'); + }); + + it('combine when both frames have errors', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + const errorA = { + message: 'errorA', + }; + const errorB = { + message: 'errorB', + }; + const responseA: DataQueryResponse = { + data: [metricFrameA], + error: errorA, + errors: [errorA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + error: errorB, + errors: [errorB], + }; + + const combined = combineResponses(responseA, responseB); + expect(combined.data[0].length).toBe(4); + expect(combined.error?.message).toBe('errorA'); + expect(combined.errors).toHaveLength(2); + expect(combined.errors?.[0]?.message).toBe('errorA'); + expect(combined.errors?.[1]?.message).toBe('errorB'); + }); + + it('combines frames with nanoseconds', () => { + const { logFrameA, logFrameB } = getMockFrames(); + logFrameA.fields[0].nanos = [333333, 444444]; + logFrameB.fields[0].nanos = [111111, 222222]; + const responseA: DataQueryResponse = { + data: [logFrameA], + }; + const responseB: DataQueryResponse = { + data: [logFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: 
[1, 2, 3, 4], + nanos: [111111, 222222, 333333, 444444], + }, + { + config: {}, + name: 'Line', + type: 'string', + values: ['line3', 'line4', 'line1', 'line2'], + }, + { + config: {}, + name: 'labels', + type: 'other', + values: [ + { + otherLabel: 'other value', + }, + { + label: 'value', + }, + { + otherLabel: 'other value', + }, + ], + }, + { + config: {}, + name: 'tsNs', + type: 'string', + values: ['1000000', '2000000', '3000000', '4000000'], + }, + { + config: {}, + name: 'id', + type: 'string', + values: ['id3', 'id4', 'id1', 'id2'], + }, + ], + length: 4, + meta: { + custom: { + frameType: 'LabeledTimeValues', + }, + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); + + it('combines frames without nanoseconds with frames with nanoseconds', () => { + const { logFrameA, logFrameB } = getMockFrames(); + logFrameA.fields[0].nanos = undefined; + logFrameB.fields[0].nanos = [111111, 222222]; + const responseA: DataQueryResponse = { + data: [logFrameA], + }; + const responseB: DataQueryResponse = { + data: [logFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: [1, 2, 3, 4], + nanos: [111111, 222222, 0, 0], + }, + { + config: {}, + name: 'Line', + type: 'string', + values: ['line3', 'line4', 'line1', 'line2'], + }, + { + config: {}, + name: 'labels', + type: 'other', + values: [ + { + otherLabel: 'other value', + }, + { + label: 'value', + }, + { + otherLabel: 'other value', + }, + ], + }, + { + config: {}, + name: 'tsNs', + type: 'string', + values: ['1000000', '2000000', '3000000', '4000000'], + }, + { + config: {}, + name: 'id', + type: 'string', + values: ['id3', 'id4', 'id1', 'id2'], + }, + ], + length: 4, + meta: { + custom: { + frameType: 'LabeledTimeValues', + }, + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 
33, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); + + it('combines frames with nanoseconds with frames without nanoseconds', () => { + const { logFrameA, logFrameB } = getMockFrames(); + logFrameA.fields[0].nanos = [111111, 222222]; + logFrameB.fields[0].nanos = undefined; + const responseA: DataQueryResponse = { + data: [logFrameA], + }; + const responseB: DataQueryResponse = { + data: [logFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: [1, 2, 3, 4], + nanos: [0, 0, 111111, 222222], + }, + { + config: {}, + name: 'Line', + type: 'string', + values: ['line3', 'line4', 'line1', 'line2'], + }, + { + config: {}, + name: 'labels', + type: 'other', + values: [ + { + otherLabel: 'other value', + }, + { + label: 'value', + }, + { + otherLabel: 'other value', + }, + ], + }, + { + config: {}, + name: 'tsNs', + type: 'string', + values: ['1000000', '2000000', '3000000', '4000000'], + }, + { + config: {}, + name: 'id', + type: 'string', + values: ['id3', 'id4', 'id1', 'id2'], + }, + ], + length: 4, + meta: { + custom: { + frameType: 'LabeledTimeValues', + }, + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); + + describe('combine stats', () => { + const { metricFrameA } = getMockFrames(); + const makeResponse = (stats?: QueryResultMetaStat[]): DataQueryResponse => ({ + data: [ + { + ...metricFrameA, + meta: { + ...metricFrameA.meta, + stats, + }, + }, + ], + }); + it('two values', () => { + const responseA = makeResponse([ + { displayName: 'Ingester: total reached', value: 1 }, + { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 }, + ]); + const responseB = makeResponse([ + { displayName: 'Ingester: total reached', value: 2 }, + { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 22 }, + ]); + + 
expect(combineResponses(responseA, responseB).data[0].meta.stats).toStrictEqual([ + { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 33 }, + ]); + }); + + it('one value', () => { + const responseA = makeResponse([ + { displayName: 'Ingester: total reached', value: 1 }, + { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 }, + ]); + const responseB = makeResponse(); + + expect(combineResponses(responseA, responseB).data[0].meta.stats).toStrictEqual([ + { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 }, + ]); + + expect(combineResponses(responseB, responseA).data[0].meta.stats).toStrictEqual([ + { displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 11 }, + ]); + }); + + it('no value', () => { + const responseA = makeResponse(); + const responseB = makeResponse(); + expect(combineResponses(responseA, responseB).data[0].meta.stats).toHaveLength(0); + }); + }); + + it('does not combine frames with different refId', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + metricFrameA.refId = 'A'; + metricFrameB.refId = 'B'; + const responseA: DataQueryResponse = { + data: [metricFrameA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [metricFrameA, metricFrameB], + }); + }); + + it('does not combine frames with different refId', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + metricFrameA.name = 'A'; + metricFrameB.name = 'B'; + const responseA: DataQueryResponse = { + data: [metricFrameA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [metricFrameA, metricFrameB], + }); + }); + + it('when fields with the same name are present, uses labels to find the right field to combine', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + + 
metricFrameA.fields.push({ + name: 'Value', + type: FieldType.number, + config: {}, + values: [9, 8], + labels: { + test: 'true', + }, + }); + metricFrameB.fields.push({ + name: 'Value', + type: FieldType.number, + config: {}, + values: [11, 10], + labels: { + test: 'true', + }, + }); + + const responseA: DataQueryResponse = { + data: [metricFrameA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + }; + + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: [1000000, 2000000, 3000000, 4000000], + }, + { + config: {}, + name: 'Value', + type: 'number', + values: [6, 7, 5, 4], + labels: { + level: 'debug', + }, + }, + { + config: {}, + name: 'Value', + type: 'number', + values: [11, 10, 9, 8], + labels: { + test: 'true', + }, + }, + ], + length: 4, + meta: { + type: 'timeseries-multi', + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); + + it('when fields with the same name are present and labels are not present, falls back to indexes', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + + delete metricFrameA.fields[1].labels; + delete metricFrameB.fields[1].labels; + + metricFrameA.fields.push({ + name: 'Value', + type: FieldType.number, + config: {}, + values: [9, 8], + }); + metricFrameB.fields.push({ + name: 'Value', + type: FieldType.number, + config: {}, + values: [11, 10], + }); + + const responseA: DataQueryResponse = { + data: [metricFrameA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + }; + + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: [1000000, 2000000, 3000000, 4000000], + }, + { + config: {}, + name: 'Value', + type: 'number', + values: [6, 7, 5, 4], + }, + { + config: {}, + name: 'Value', + type: 'number', + 
values: [11, 10, 9, 8], + }, + ], + length: 4, + meta: { + type: 'timeseries-multi', + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); +}); + +describe('mergeFrames', () => { + it('combines metric frames', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + const responseA: DataQueryResponse = { + data: [metricFrameB], + }; + const responseB: DataQueryResponse = { + data: [metricFrameA], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: [1000000, 2000000, 3000000, 4000000], + }, + { + config: {}, + name: 'Value', + type: 'number', + values: [6, 7, 5, 4], + labels: { + level: 'debug', + }, + }, + ], + length: 4, + meta: { + type: 'timeseries-multi', + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); + + it('adds old to new values when combining', () => { + const { metricFrameA, metricFrameB } = getMockFrames(); + + metricFrameB.fields[0].values = [3000000, 3500000, 4000000]; + metricFrameB.fields[1].values = [5, 10, 6]; + + const responseA: DataQueryResponse = { + data: [metricFrameA], + }; + const responseB: DataQueryResponse = { + data: [metricFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: [3000000, 3500000, 4000000], + }, + { + config: {}, + name: 'Value', + type: 'number', + values: [10, 10, 10], + labels: { + level: 'debug', + }, + }, + ], + length: 3, + meta: { + type: 'timeseries-multi', + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); + + it('combines and identifies new frames in the response', () => { + const { metricFrameA, metricFrameB, 
metricFrameC } = getMockFrames(); + const responseA: DataQueryResponse = { + data: [metricFrameB], + }; + const responseB: DataQueryResponse = { + data: [metricFrameA, metricFrameC], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: [1000000, 2000000, 3000000, 4000000], + }, + { + config: {}, + name: 'Value', + type: 'number', + values: [6, 7, 5, 4], + labels: { + level: 'debug', + }, + }, + ], + length: 4, + meta: { + type: 'timeseries-multi', + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + metricFrameC, + ], + }); + }); + + it('merges logs frames', () => { + const { logFrameA, logFrameB } = getMockFrames(); + + // 3 overlaps with logFrameA + logFrameB.fields[0].values = [2, 3]; + logFrameB.fields[1].values = ['line4', 'line1']; + logFrameB.fields[4].values = ['id4', 'id1']; + + const responseA: DataQueryResponse = { + data: [logFrameA], + }; + const responseB: DataQueryResponse = { + data: [logFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: [2, 3, 4], + }, + { + config: {}, + name: 'Line', + type: 'string', + values: ['line4', 'line1', 'line2'], + }, + { + config: {}, + name: 'labels', + type: 'other', + values: [ + { + otherLabel: 'other value', + }, + { + label: 'value', + }, + { + otherLabel: 'other value', + }, + ], + }, + { + config: {}, + name: 'tsNs', + type: 'string', + values: ['1000000', '2000000', '4000000'], + }, + { + config: {}, + name: 'id', + type: 'string', + values: ['id4', 'id1', 'id2'], + }, + ], + length: 3, + meta: { + custom: { + frameType: 'LabeledTimeValues', + }, + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); + + it('merges frames with 
nanoseconds', () => { + const { logFrameA, logFrameB } = getMockFrames(); + + logFrameA.fields[0].values = [3, 4]; + logFrameA.fields[0].nanos = [333333, 444444]; + + // 3 overlaps with logFrameA + logFrameB.fields[0].values = [2, 3]; + logFrameB.fields[0].nanos = [222222, 333333]; + logFrameB.fields[4].values = ['id4', 'id1']; + + const responseA: DataQueryResponse = { + data: [logFrameA], + }; + const responseB: DataQueryResponse = { + data: [logFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: [2, 3, 4], + nanos: [222222, 333333, 444444], + }, + { + config: {}, + name: 'Line', + type: 'string', + values: ['line3', 'line4', 'line2'], + }, + { + config: {}, + name: 'labels', + type: 'other', + values: [ + { + otherLabel: 'other value', + }, + { + label: 'value', + }, + { + otherLabel: 'other value', + }, + ], + }, + { + config: {}, + name: 'tsNs', + type: 'string', + values: ['1000000', '2000000', '4000000'], + }, + { + config: {}, + name: 'id', + type: 'string', + values: ['id4', 'id1', 'id2'], + }, + ], + length: 3, + meta: { + custom: { + frameType: 'LabeledTimeValues', + }, + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); + + it('receiving existing values do not introduce inconsistencies', () => { + const { logFrameA, logFrameAB } = getMockFrames(); + + const responseA: DataQueryResponse = { + data: [cloneDeep(logFrameAB)], + }; + const responseB: DataQueryResponse = { + data: [cloneDeep(logFrameA)], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + ...logFrameAB, + meta: { + custom: { + frameType: 'LabeledTimeValues', + }, + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + }, + ], + }); + }); + + it('considers nanoseconds to merge the frames', () => { + const { 
logFrameA, logFrameB } = getMockFrames(); + + // Same timestamps but different nanos + logFrameA.fields[0].values = [1, 2]; + logFrameA.fields[0].nanos = [333333, 444444]; + logFrameB.fields[0].values = [1, 2]; + logFrameB.fields[0].nanos = [222222, 333333]; + + const responseA: DataQueryResponse = { + data: [logFrameA], + }; + const responseB: DataQueryResponse = { + data: [logFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + fields: [ + { + config: {}, + name: 'Time', + type: 'time', + values: [1, 1, 2, 2], + nanos: [222222, 333333, 333333, 444444], + }, + { + config: {}, + name: 'Line', + type: 'string', + values: ['line3', 'line1', 'line4', 'line2'], + }, + { + config: {}, + name: 'labels', + type: 'other', + values: [ + { + otherLabel: 'other value', + }, + { + label: 'value', + }, + { + otherLabel: 'other value', + }, + ], + }, + { + config: {}, + name: 'tsNs', + type: 'string', + values: ['1000000', '3000000', '2000000', '4000000'], + }, + { + config: {}, + name: 'id', + type: 'string', + values: ['id3', 'id1', 'id4', 'id2'], + }, + ], + length: 4, + meta: { + custom: { + frameType: 'LabeledTimeValues', + }, + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 33, + }, + ], + }, + refId: 'A', + }, + ], + }); + }); + + it('correctly handles empty responses', () => { + const { emptyFrame, logFrameB } = getMockFrames(); + + logFrameB.fields[0].values = [1, 2]; + logFrameB.fields[0].nanos = [222222, 333333]; + + const responseA: DataQueryResponse = { + data: [emptyFrame], + }; + const responseB: DataQueryResponse = { + data: [logFrameB], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + ...logFrameB, + meta: { + custom: { + frameType: 'LabeledTimeValues', + }, + stats: [{ displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 22 }], + }, + length: 2, + }, + ], + }); + }); + + it('merging exactly the same data produces the same data', () 
=> { + const { logFrameA } = getMockFrames(); + + const responseA: DataQueryResponse = { + data: [cloneDeep(logFrameA)], + }; + const responseB: DataQueryResponse = { + data: [cloneDeep(logFrameA)], + }; + expect(combineResponses(responseA, responseB)).toEqual({ + data: [ + { + ...logFrameA, + meta: { + custom: { + frameType: 'LabeledTimeValues', + }, + stats: [ + { + displayName: 'Summary: total bytes processed', + unit: 'decbytes', + value: 22, + }, + ], + }, + }, + ], + }); + }); +}); diff --git a/public/app/plugins/datasource/loki/mergeResponses.ts b/public/app/plugins/datasource/loki/mergeResponses.ts new file mode 100644 index 00000000000..d37413650fc --- /dev/null +++ b/public/app/plugins/datasource/loki/mergeResponses.ts @@ -0,0 +1,318 @@ +import { + closestIdx, + DataFrame, + DataFrameType, + DataQueryResponse, + DataQueryResponseData, + Field, + FieldType, + LoadingState, + QueryResultMetaStat, + shallowCompare, +} from '@grafana/data'; + +import { LOADING_FRAME_NAME } from './querySplitting'; + +export function combineResponses(currentResponse: DataQueryResponse | null, newResponse: DataQueryResponse) { + if (!currentResponse) { + return cloneQueryResponse(newResponse); + } + + newResponse.data.forEach((newFrame) => { + const currentFrame = currentResponse.data.find((frame) => shouldCombine(frame, newFrame)); + if (!currentFrame) { + currentResponse.data.push(cloneDataFrame(newFrame)); + return; + } + mergeFrames(currentFrame, newFrame); + }); + + const mergedErrors = [...(currentResponse.errors ?? []), ...(newResponse.errors ?? [])]; + if (mergedErrors.length > 0) { + currentResponse.errors = mergedErrors; + } + + // the `.error` attribute is obsolete now, + // but we have to maintain it, otherwise + // some grafana parts do not behave well. + // we just choose the old error, if it exists, + // otherwise the new error, if it exists. + const mergedError = currentResponse.error ?? 
newResponse.error; + if (mergedError != null) { + currentResponse.error = mergedError; + } + + const mergedTraceIds = [...(currentResponse.traceIds ?? []), ...(newResponse.traceIds ?? [])]; + if (mergedTraceIds.length > 0) { + currentResponse.traceIds = mergedTraceIds; + } + + return currentResponse; +} + +/** + * Given an existing DataQueryResponse, replace any data frame present in newResponse with those in newResponse + */ +export function replaceResponses(currentResponse: DataQueryResponse | null, newResponse: DataQueryResponse) { + if (!currentResponse) { + return cloneQueryResponse(newResponse); + } + + newResponse.data.forEach((newFrame) => { + const currentFrameIndex = currentResponse.data.findIndex((frame) => shouldCombine(frame, newFrame)); + if (currentFrameIndex < 0) { + currentResponse.data.push(cloneDataFrame(newFrame)); + return; + } + currentResponse.data[currentFrameIndex] = newFrame; + }); + + // Clean up loading frame when newResponse contains the final response + if (newResponse.state === LoadingState.Done) { + currentResponse.data = currentResponse.data.filter((frame) => frame.name !== LOADING_FRAME_NAME); + } + + const mergedErrors = [...(currentResponse.errors ?? []), ...(newResponse.errors ?? [])]; + if (mergedErrors.length > 0) { + currentResponse.errors = mergedErrors; + } + + const mergedError = currentResponse.error ?? newResponse.error; + if (mergedError != null) { + currentResponse.error = mergedError; + } + + const mergedTraceIds = [...(currentResponse.traceIds ?? []), ...(newResponse.traceIds ?? [])]; + if (mergedTraceIds.length > 0) { + currentResponse.traceIds = mergedTraceIds; + } + + return currentResponse; +} + +/** + * Given two data frames, merge their values. Overlapping values will be added together. 
+ */ +export function mergeFrames(dest: DataFrame, source: DataFrame) { + const destTimeField = dest.fields.find((field) => field.type === FieldType.time); + const destIdField = dest.fields.find((field) => field.type === FieldType.string && field.name === 'id'); + const sourceTimeField = source.fields.find((field) => field.type === FieldType.time); + const sourceIdField = source.fields.find((field) => field.type === FieldType.string && field.name === 'id'); + + if (!destTimeField || !sourceTimeField) { + console.error(new Error(`Time fields not found in the data frames`)); + return; + } + + const sourceTimeValues = sourceTimeField?.values.slice(0) ?? []; + const totalFields = Math.max(dest.fields.length, source.fields.length); + + for (let i = 0; i < sourceTimeValues.length; i++) { + const destIdx = resolveIdx(destTimeField, sourceTimeField, i); + + const entryExistsInDest = compareEntries(destTimeField, destIdField, destIdx, sourceTimeField, sourceIdField, i); + + for (let f = 0; f < totalFields; f++) { + // For now, skip undefined fields that exist in the new frame + if (!dest.fields[f]) { + continue; + } + // Index is not reliable when frames have disordered fields, or an extra/missing field, so we find them by name. + // If the field has no name, we fallback to the old index version. + const sourceField = findSourceField(dest.fields[f], source.fields, f); + if (!sourceField) { + continue; + } + // Same value, accumulate + if (entryExistsInDest) { + if (dest.fields[f].type === FieldType.time) { + // Time already exists, skip + continue; + } else if (dest.fields[f].type === FieldType.number) { + // Number, add + dest.fields[f].values[destIdx] = (dest.fields[f].values[destIdx] ?? 
0) + sourceField.values[i]; + } else if (dest.fields[f].type === FieldType.other) { + // Possibly labels, combine + if (typeof sourceField.values[i] === 'object') { + dest.fields[f].values[destIdx] = { + ...dest.fields[f].values[destIdx], + ...sourceField.values[i], + }; + } else if (sourceField.values[i]) { + dest.fields[f].values[destIdx] = sourceField.values[i]; + } + } else { + // Replace value + dest.fields[f].values[destIdx] = sourceField.values[i]; + } + } else if (sourceField.values[i] !== undefined) { + // Insert in the `destIdx` position + dest.fields[f].values.splice(destIdx, 0, sourceField.values[i]); + if (sourceField.nanos) { + dest.fields[f].nanos = dest.fields[f].nanos ?? new Array(dest.fields[f].values.length - 1).fill(0); + dest.fields[f].nanos?.splice(destIdx, 0, sourceField.nanos[i]); + } else if (dest.fields[f].nanos) { + dest.fields[f].nanos?.splice(destIdx, 0, 0); + } + } + } + } + + dest.length = dest.fields[0].values.length; + + dest.meta = { + ...dest.meta, + stats: getCombinedMetadataStats(dest.meta?.stats ?? [], source.meta?.stats ?? []), + }; +} + +function resolveIdx(destField: Field, sourceField: Field, index: number) { + const idx = closestIdx(sourceField.values[index], destField.values); + if (idx < 0) { + return 0; + } + if (sourceField.values[index] === destField.values[idx] && sourceField.nanos && destField.nanos) { + return sourceField.nanos[index] > destField.nanos[idx] ? 
idx + 1 : idx; + } + if (sourceField.values[index] > destField.values[idx]) { + return idx + 1; + } + return idx; +} + +function compareEntries( + destTimeField: Field, + destIdField: Field | undefined, + destIndex: number, + sourceTimeField: Field, + sourceIdField: Field | undefined, + sourceIndex: number +) { + const sameTimestamp = compareNsTimestamps(destTimeField, destIndex, sourceTimeField, sourceIndex); + if (!sameTimestamp) { + return false; + } + if (!destIdField || !sourceIdField) { + return true; + } + // Log frames, check indexes + return ( + destIdField.values[destIndex] !== undefined && destIdField.values[destIndex] === sourceIdField.values[sourceIndex] + ); +} + +function compareNsTimestamps(destField: Field, destIndex: number, sourceField: Field, sourceIndex: number) { + if (destField.nanos && sourceField.nanos) { + return ( + destField.values[destIndex] !== undefined && + destField.values[destIndex] === sourceField.values[sourceIndex] && + destField.nanos[destIndex] !== undefined && + destField.nanos[destIndex] === sourceField.nanos[sourceIndex] + ); + } + return destField.values[destIndex] !== undefined && destField.values[destIndex] === sourceField.values[sourceIndex]; +} + +function findSourceField(referenceField: Field, sourceFields: Field[], index: number) { + const candidates = sourceFields.filter((f) => f.name === referenceField.name); + + if (candidates.length === 1) { + return candidates[0]; + } + + if (referenceField.labels) { + return candidates.find((candidate) => shallowCompare(referenceField.labels ?? {}, candidate.labels ?? 
{})); + } + + return sourceFields[index]; +} + +const TOTAL_BYTES_STAT = 'Summary: total bytes processed'; +const EXEC_TIME_STAT = 'Summary: exec time'; +// This is specific for Loki +function getCombinedMetadataStats( + destStats: QueryResultMetaStat[], + sourceStats: QueryResultMetaStat[] +): QueryResultMetaStat[] { + // in the current approach, we only handle a single stat + const stats: QueryResultMetaStat[] = []; + for (const stat of [TOTAL_BYTES_STAT, EXEC_TIME_STAT]) { + const destStat = destStats.find((s) => s.displayName === stat); + const sourceStat = sourceStats.find((s) => s.displayName === stat); + + if (sourceStat != null && destStat != null) { + stats.push({ value: sourceStat.value + destStat.value, displayName: stat, unit: destStat.unit }); + continue; + } + + // maybe one of them exist + const eitherStat = sourceStat ?? destStat; + if (eitherStat != null) { + stats.push(eitherStat); + } + } + return stats; +} + +/** + * Deep clones a DataQueryResponse + */ +export function cloneQueryResponse(response: DataQueryResponse): DataQueryResponse { + const newResponse = { + ...response, + data: response.data.map(cloneDataFrame), + }; + return newResponse; +} + +function cloneDataFrame(frame: DataQueryResponseData): DataQueryResponseData { + return { + ...frame, + fields: frame.fields.map((field: Field) => ({ + ...field, + values: field.values, + })), + }; +} + +function shouldCombine(frame1: DataFrame, frame2: DataFrame): boolean { + if (frame1.refId !== frame2.refId || frame1.name !== frame2.name) { + return false; + } + + const frameType1 = frame1.meta?.type; + const frameType2 = frame2.meta?.type; + + if (frameType1 !== frameType2) { + // we do not join things that have a different type + return false; + } + + // metric range query data + if (frameType1 === DataFrameType.TimeSeriesMulti) { + const field1 = frame1.fields.find((f) => f.type === FieldType.number); + const field2 = frame2.fields.find((f) => f.type === FieldType.number); + if (field1 === 
undefined || field2 === undefined) { + // should never happen + return false; + } + + return shallowCompare(field1.labels ?? {}, field2.labels ?? {}); + } + + // logs query data + // logs use a special attribute in the dataframe's "custom" section + // because we do not have a good "frametype" value for them yet. + const customType1 = frame1.meta?.custom?.frameType; + const customType2 = frame2.meta?.custom?.frameType; + // Legacy frames have this custom type + if (customType1 === 'LabeledTimeValues' && customType2 === 'LabeledTimeValues') { + return true; + } else if (customType1 === customType2) { + // Data plane frames don't + return true; + } + + // should never reach here + return false; +} diff --git a/public/app/plugins/datasource/loki/modifyQuery.ts b/public/app/plugins/datasource/loki/modifyQuery.ts index 293d78fc660..67ca4f0e6bb 100644 --- a/public/app/plugins/datasource/loki/modifyQuery.ts +++ b/public/app/plugins/datasource/loki/modifyQuery.ts @@ -151,7 +151,7 @@ export function addLabelToQuery( value: string, labelType?: LabelType | null ): string { - if (!key || !value) { + if (!key) { throw new Error('Need label to add to query.'); } diff --git a/public/app/plugins/datasource/loki/querySplitting.test.ts b/public/app/plugins/datasource/loki/querySplitting.test.ts index 47f36658731..9c8326eb767 100644 --- a/public/app/plugins/datasource/loki/querySplitting.test.ts +++ b/public/app/plugins/datasource/loki/querySplitting.test.ts @@ -1,6 +1,7 @@ import { of } from 'rxjs'; import { DataQueryRequest, dateTime, LoadingState } from '@grafana/data'; +import { config } from '@grafana/runtime'; import { createLokiDatasource } from './__mocks__/datasource'; import { getMockFrames } from './__mocks__/frames'; @@ -16,6 +17,19 @@ jest.mock('uuid', () => ({ v4: jest.fn().mockReturnValue('uuid'), })); +const originalShardingFlagState = config.featureToggles.lokiShardSplitting; +beforeAll(() => { + // @ts-expect-error + jest.spyOn(global, 
'setTimeout').mockImplementation((callback) => { + callback(); + }); + config.featureToggles.lokiShardSplitting = false; +}); +afterAll(() => { + jest.mocked(global.setTimeout).mockReset(); + config.featureToggles.lokiShardSplitting = originalShardingFlagState; +}); + describe('runSplitQuery()', () => { let datasource: LokiDatasource; const range = { @@ -51,6 +65,26 @@ describe('runSplitQuery()', () => { }); }); + test('Retries retriable failed requests', async () => { + jest + .mocked(datasource.runQuery) + .mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'timeout' }], data: [] })); + await expect(runSplitQuery(datasource, request)).toEmitValuesWith(() => { + // 3 days, 3 chunks, 1 retry, 4 requests. + expect(datasource.runQuery).toHaveBeenCalledTimes(4); + }); + }); + + test('Does not retry on other errors', async () => { + jest + .mocked(datasource.runQuery) + .mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'nope nope' }], data: [] })); + await expect(runSplitQuery(datasource, request)).toEmitValuesWith(() => { + // 3 days, 3 chunks, 3 requests. 
+ expect(datasource.runQuery).toHaveBeenCalledTimes(1); + }); + }); + test('Metric queries with maxLines of 0 will execute', async () => { const request = createRequest([{ expr: 'count_over_time({a="b"}[1m])', refId: 'A', maxLines: 0 }]); await expect(runSplitQuery(datasource, request)).toEmitValuesWith(() => { @@ -285,9 +319,10 @@ describe('runSplitQuery()', () => { describe('Dynamic maxLines for logs requests', () => { const request = createRequest([{ expr: '{a="b"}', refId: 'A', maxLines: 4 }]); - const { logFrameA } = getMockFrames(); + const { logFrameA, logFrameB } = getMockFrames(); beforeEach(() => { - jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [logFrameA], refId: 'A' })); + jest.spyOn(datasource, 'runQuery').mockReturnValueOnce(of({ data: [logFrameA], refId: 'A' })); + jest.spyOn(datasource, 'runQuery').mockReturnValueOnce(of({ data: [logFrameB], refId: 'A' })); }); test('Stops requesting once maxLines of logs have been received', async () => { await expect(runSplitQuery(datasource, request)).toEmitValuesWith(() => { diff --git a/public/app/plugins/datasource/loki/querySplitting.ts b/public/app/plugins/datasource/loki/querySplitting.ts index 6215f3ff42e..acd0458ca1e 100644 --- a/public/app/plugins/datasource/loki/querySplitting.ts +++ b/public/app/plugins/datasource/loki/querySplitting.ts @@ -14,12 +14,13 @@ import { TimeRange, LoadingState, } from '@grafana/data'; -import { combineResponses } from '@grafana/o11y-ds-frontend'; import { LokiDatasource } from './datasource'; import { splitTimeRange as splitLogsTimeRange } from './logsTimeSplitting'; +import { combineResponses } from './mergeResponses'; import { splitTimeRange as splitMetricTimeRange } from './metricTimeSplitting'; import { isLogsQuery, isQueryWithRangeVariable } from './queryUtils'; +import { isRetriableError } from './responseUtils'; import { trackGroupedQueries } from './tracking'; import { LokiGroupedRequest, LokiQuery, LokiQueryDirection, LokiQueryType } from 
'./types'; @@ -53,7 +54,7 @@ export function partitionTimeRange( * At the end, we will filter the targets that don't need to be executed in the next request batch, * becasue, for example, the `maxLines` have been reached. */ -function adjustTargetsFromResponseState(targets: LokiQuery[], response: DataQueryResponse | null): LokiQuery[] { +export function adjustTargetsFromResponseState(targets: LokiQuery[], response: DataQueryResponse | null): LokiQuery[] { if (!response) { return targets; } @@ -82,8 +83,18 @@ export function runSplitGroupedQueries(datasource: LokiDatasource, requests: Lok const longestPartition = requests.filter(({ partition }) => partition.length === totalRequests)[0].partition; let shouldStop = false; - let subquerySubsciption: Subscription | null = null; + let subquerySubscription: Subscription | null = null; + let retriesMap = new Map(); + let retryTimer: ReturnType | null = null; + const runNextRequest = (subscriber: Subscriber, requestN: number, requestGroup: number) => { + let retrying = false; + + if (subquerySubscription != null) { + subquerySubscription.unsubscribe(); + subquerySubscription = null; + } + if (shouldStop) { subscriber.complete(); return; @@ -104,6 +115,37 @@ export function runSplitGroupedQueries(datasource: LokiDatasource, requests: Lok done(); }; + const retry = (errorResponse?: DataQueryResponse) => { + try { + if (errorResponse && !isRetriableError(errorResponse)) { + return false; + } + } catch (e) { + console.error(e); + shouldStop = true; + return false; + } + + const key = `${requestN}-${requestGroup}`; + const retries = retriesMap.get(key) ?? 
0; + if (retries > 3) { + return false; + } + + retriesMap.set(key, retries + 1); + + retryTimer = setTimeout( + () => { + runNextRequest(subscriber, requestN, requestGroup); + }, + 1500 * Math.pow(2, retries) + ); // Exponential backoff + + retrying = true; + + return true; + }; + const group = requests[requestGroup]; const range = group.partition[requestN - 1]; const targets = adjustTargetsFromResponseState(group.request.targets, mergedResponse); @@ -119,20 +161,29 @@ export function runSplitGroupedQueries(datasource: LokiDatasource, requests: Lok subRequest.requestId = `${group.request.requestId}_${requestN}`; } - subquerySubsciption = datasource.runQuery(subRequest).subscribe({ + subquerySubscription = datasource.runQuery(subRequest).subscribe({ next: (partialResponse) => { - mergedResponse = combineResponses(mergedResponse, partialResponse); - mergedResponse = updateLoadingFrame(mergedResponse, subRequest, longestPartition, requestN); - if ((mergedResponse.errors ?? []).length > 0 || mergedResponse.error != null) { + if ((partialResponse.errors ?? 
[]).length > 0 || partialResponse.error != null) { + if (retry(partialResponse)) { + return; + } shouldStop = true; } + mergedResponse = combineResponses(mergedResponse, partialResponse); + mergedResponse = updateLoadingFrame(mergedResponse, subRequest, longestPartition, requestN); }, complete: () => { + if (retrying) { + return; + } subscriber.next(mergedResponse); nextRequest(); }, error: (error) => { subscriber.error(error); + if (retry()) { + return; + } }, }); }; @@ -141,8 +192,13 @@ export function runSplitGroupedQueries(datasource: LokiDatasource, requests: Lok runNextRequest(subscriber, totalRequests, 0); return () => { shouldStop = true; - if (subquerySubsciption != null) { - subquerySubsciption.unsubscribe(); + if (retryTimer) { + clearTimeout(retryTimer); + retryTimer = null; + } + if (subquerySubscription != null) { + subquerySubscription.unsubscribe(); + subquerySubscription = null; } }; }); diff --git a/public/app/plugins/datasource/loki/queryUtils.ts b/public/app/plugins/datasource/loki/queryUtils.ts index 368f1819620..c3b3d6ed195 100644 --- a/public/app/plugins/datasource/loki/queryUtils.ts +++ b/public/app/plugins/datasource/loki/queryUtils.ts @@ -25,9 +25,10 @@ import { } from '@grafana/lezer-logql'; import { DataQuery } from '@grafana/schema'; -import { getStreamSelectorPositions, NodePosition } from './modifyQuery'; +import { REF_ID_STARTER_LOG_VOLUME } from './datasource'; +import { addLabelToQuery, getStreamSelectorPositions, NodePosition } from './modifyQuery'; import { ErrorId } from './querybuilder/parsingUtils'; -import { LokiQuery, LokiQueryType } from './types'; +import { LabelType, LokiQuery, LokiQueryDirection, LokiQueryType } from './types'; /** * Returns search terms from a LogQL query. 
@@ -313,6 +314,18 @@ export function requestSupportsSplitting(allQueries: LokiQuery[]) { return queries.length > 0; } +export function requestSupportsSharding(allQueries: LokiQuery[]) { + const queries = allQueries + .filter((query) => !query.hide) + .filter((query) => !query.refId.includes('do-not-shard')) + .filter((query) => query.expr) + .filter( + (query) => query.direction === LokiQueryDirection.Scan || query.refId?.startsWith(REF_ID_STARTER_LOG_VOLUME) + ); + + return queries.length > 0; +} + export const isLokiQuery = (query: DataQuery): query is LokiQuery => { if (!query) { return false; @@ -328,3 +341,33 @@ export const getLokiQueryFromDataQuery = (query?: DataQuery): LokiQuery | undefi return query; }; + +export const interpolateShardingSelector = (queries: LokiQuery[], shards: number[]) => { + if (shards.length === 0) { + return queries; + } + + let shardValue = shards.join('|'); + + // -1 means empty shard value + if (shardValue === '-1' || shards.length === 1) { + shardValue = shardValue === '-1' ? 
'' : shardValue; + return queries.map((query) => ({ + ...query, + expr: addLabelToQuery(query.expr, '__stream_shard__', '=', shardValue, LabelType.Indexed), + })); + } + + return queries.map((query) => ({ + ...query, + expr: addLabelToQuery(query.expr, '__stream_shard__', '=~', shardValue, LabelType.Indexed), + })); +}; + +export const getSelectorForShardValues = (query: string) => { + const selector = getNodesFromQuery(query, [Selector]); + if (selector.length > 0) { + return query.substring(selector[0].from, selector[0].to); + } + return ''; +}; diff --git a/public/app/plugins/datasource/loki/querybuilder/components/LokiQueryBuilderOptions.test.tsx b/public/app/plugins/datasource/loki/querybuilder/components/LokiQueryBuilderOptions.test.tsx index e1154b21eef..e41af2534fa 100644 --- a/public/app/plugins/datasource/loki/querybuilder/components/LokiQueryBuilderOptions.test.tsx +++ b/public/app/plugins/datasource/loki/querybuilder/components/LokiQueryBuilderOptions.test.tsx @@ -89,6 +89,7 @@ describe('LokiQueryBuilderOptions', () => { setup({ expr: '{foo="bar"}' }); expect(screen.getByText('Line limit: 20')).toBeInTheDocument(); expect(screen.getByText('Type: Range')).toBeInTheDocument(); + expect(screen.getByText('Direction: Backward')).toBeInTheDocument(); expect(screen.queryByText(/step/i)).not.toBeInTheDocument(); }); @@ -98,6 +99,7 @@ describe('LokiQueryBuilderOptions', () => { expect(screen.getByText('Type: Range')).toBeInTheDocument(); expect(screen.getByText('Step: 1m')).toBeInTheDocument(); expect(screen.getByText('Resolution: 1/2')).toBeInTheDocument(); + expect(screen.queryByText(/Direction/)).not.toBeInTheDocument(); }); it('does not shows resolution field if resolution is not set', async () => { diff --git a/public/app/plugins/datasource/loki/querybuilder/components/LokiQueryBuilderOptions.tsx b/public/app/plugins/datasource/loki/querybuilder/components/LokiQueryBuilderOptions.tsx index 5fe4714369c..ec013d185df 100644 --- 
a/public/app/plugins/datasource/loki/querybuilder/components/LokiQueryBuilderOptions.tsx +++ b/public/app/plugins/datasource/loki/querybuilder/components/LokiQueryBuilderOptions.tsx @@ -8,6 +8,7 @@ import { config, reportInteraction } from '@grafana/runtime'; import { Alert, AutoSizeInput, RadioButtonGroup, Select } from '@grafana/ui'; import { + getQueryDirectionLabel, preprocessMaxLines, queryDirections, queryTypeOptions, @@ -102,7 +103,7 @@ export const LokiQueryBuilderOptions = React.memo( x.value === queryType); const resolutionLabel = RESOLUTION_OPTIONS.find((x) => x.value === (query.resolution ?? 1)); @@ -218,9 +220,8 @@ function getCollapsedInfo( if (isLogQuery) { items.push(`Line limit: ${query.maxLines ?? maxLines}`); - } - - if (!isLogQuery) { + items.push(`Direction: ${getQueryDirectionLabel(direction)}`); + } else { if (query.step) { items.push(`Step: ${isValidStep ? query.step : 'Invalid value'}`); } diff --git a/public/app/plugins/datasource/loki/responseUtils.ts b/public/app/plugins/datasource/loki/responseUtils.ts index 1b1e909fefb..3d9bc01342d 100644 --- a/public/app/plugins/datasource/loki/responseUtils.ts +++ b/public/app/plugins/datasource/loki/responseUtils.ts @@ -1,4 +1,4 @@ -import { DataFrame, FieldType, isValidGoDuration, Labels } from '@grafana/data'; +import { DataFrame, DataQueryResponse, FieldType, isValidGoDuration, Labels } from '@grafana/data'; import { isBytesString, processLabels } from './languageUtils'; import { isLogLineJSON, isLogLineLogfmt, isLogLinePacked } from './lineParser'; @@ -131,3 +131,14 @@ export function extractLevelLikeLabelFromDataFrame(frame: DataFrame): string | n } return levelLikeLabel; } + +export function isRetriableError(errorResponse: DataQueryResponse) { + const message = errorResponse.errors ? (errorResponse.errors[0].message ?? 
'').toLowerCase() : ''; + if (message.includes('timeout')) { + return true; + } else if (message.includes('parse error') || message.includes('max entries')) { + // If the error is a parse error, we want to signal to stop querying. + throw new Error(message); + } + return false; +} diff --git a/public/app/plugins/datasource/loki/shardQuerySplitting.test.ts b/public/app/plugins/datasource/loki/shardQuerySplitting.test.ts new file mode 100644 index 00000000000..dc52b82fea0 --- /dev/null +++ b/public/app/plugins/datasource/loki/shardQuerySplitting.test.ts @@ -0,0 +1,455 @@ +import { of } from 'rxjs'; + +import { DataQueryRequest, DataQueryResponse, dateTime, LoadingState } from '@grafana/data'; + +import { createLokiDatasource } from './__mocks__/datasource'; +import { getMockFrames } from './__mocks__/frames'; +import { LokiDatasource } from './datasource'; +import { runShardSplitQuery } from './shardQuerySplitting'; +import { LokiQuery, LokiQueryDirection } from './types'; + +jest.mock('uuid', () => ({ + v4: jest.fn().mockReturnValue('uuid'), +})); + +const originalLog = console.log; +const originalWarn = console.warn; +beforeEach(() => { + //jest.spyOn(console, 'log').mockImplementation(() => {}); + jest.spyOn(console, 'warn').mockImplementation(() => {}); +}); +afterAll(() => { + console.log = originalLog; + console.warn = originalWarn; +}); + +describe('runShardSplitQuery()', () => { + let datasource: LokiDatasource; + const range = { + from: dateTime('2023-02-08T04:00:00.000Z'), + to: dateTime('2023-02-08T11:00:00.000Z'), + raw: { + from: dateTime('2023-02-08T04:00:00.000Z'), + to: dateTime('2023-02-08T11:00:00.000Z'), + }, + }; + + const createRequest = (targets: Array>, overrides?: Partial>) => { + let request = { + range, + targets, + intervalMs: 60000, + requestId: 'TEST', + } as DataQueryRequest; + + Object.assign(request, overrides); + return request; + }; + let request: DataQueryRequest; + beforeEach(() => { + request = createRequest([ + { expr: 
'count_over_time($SELECTOR[1m])', refId: 'A', direction: LokiQueryDirection.Scan }, + ]); + datasource = createLokiDatasource(); + datasource.languageProvider.fetchLabelValues = jest.fn(); + datasource.interpolateVariablesInQueries = jest.fn().mockImplementation((queries: LokiQuery[]) => { + return queries.map((query) => { + query.expr = query.expr.replace('$SELECTOR', '{a="b"}'); + return query; + }); + }); + jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '10', '2', '20', '3']); + const { metricFrameA } = getMockFrames(); + jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [metricFrameA] })); + jest.spyOn(datasource, 'query').mockReturnValue(of({ data: [metricFrameA] })); + }); + + test('Splits datasource queries', async () => { + await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith(() => { + // 5 shards, 3 groups + empty shard group, 4 requests + expect(datasource.runQuery).toHaveBeenCalledTimes(4); + }); + }); + + test('Interpolates queries before running', async () => { + await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith(() => { + expect(datasource.interpolateVariablesInQueries).toHaveBeenCalledTimes(1); + + expect(datasource.runQuery).toHaveBeenCalledWith({ + intervalMs: expect.any(Number), + range: expect.any(Object), + requestId: 'TEST_shard_0_0_2', + targets: [ + { + expr: 'count_over_time({a="b", __stream_shard__=~"20|10"}[1m])', + refId: 'A', + direction: LokiQueryDirection.Scan, + }, + ], + }); + + expect(datasource.runQuery).toHaveBeenCalledWith({ + intervalMs: expect.any(Number), + range: expect.any(Object), + requestId: 'TEST_shard_0_2_2', + targets: [ + { + expr: 'count_over_time({a="b", __stream_shard__=~"3|2"}[1m])', + refId: 'A', + direction: LokiQueryDirection.Scan, + }, + ], + }); + + expect(datasource.runQuery).toHaveBeenCalledWith({ + intervalMs: expect.any(Number), + range: expect.any(Object), + requestId: 'TEST_shard_0_4_1', + targets: [ + { + expr: 
'count_over_time({a="b", __stream_shard__="1"}[1m])', + refId: 'A', + direction: LokiQueryDirection.Scan, + }, + ], + }); + + expect(datasource.runQuery).toHaveBeenCalledWith({ + intervalMs: expect.any(Number), + range: expect.any(Object), + requestId: 'TEST_shard_0_5_1', + targets: [ + { expr: 'count_over_time({a="b", __stream_shard__=""}[1m])', refId: 'A', direction: LokiQueryDirection.Scan }, + ], + }); + }); + }); + + test('Sends the whole stream selector to fetch values', async () => { + datasource.interpolateVariablesInQueries = jest.fn().mockImplementation((queries: LokiQuery[]) => { + return queries.map((query) => { + query.expr = query.expr.replace('$SELECTOR', '{service_name="test", filter="true"}'); + return query; + }); + }); + + await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith(() => { + expect(datasource.languageProvider.fetchLabelValues).toHaveBeenCalledWith('__stream_shard__', { + streamSelector: '{service_name="test", filter="true"}', + timeRange: expect.anything(), + }); + + expect(datasource.runQuery).toHaveBeenCalledWith({ + intervalMs: expect.any(Number), + range: expect.any(Object), + requestId: 'TEST_shard_0_0_2', + targets: [ + { + expr: 'count_over_time({service_name="test", filter="true", __stream_shard__=~"20|10"}[1m])', + refId: 'A', + direction: LokiQueryDirection.Scan, + }, + ], + }); + }); + }); + + test('Returns a DataQueryResponse with the expected attributes', async () => { + await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => { + expect(response[0].data).toBeDefined(); + expect(response[0].state).toBe(LoadingState.Done); + expect(response[0].key).toBeDefined(); + }); + }); + + test('Retries failed retriable requests', async () => { + jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']); + jest + .spyOn(datasource, 'runQuery') + .mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'timeout' }], 
data: [] })); + // @ts-expect-error + jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => { + callback(); + }); + await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => { + // 1 shard + empty shard + 1 retry = 3 + expect(response).toHaveLength(3); + expect(datasource.runQuery).toHaveBeenCalledTimes(3); + }); + }); + + test('Does not retry on other errors', async () => { + jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']); + jest + .spyOn(datasource, 'runQuery') + .mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'nope nope' }], data: [] })); + // @ts-expect-error + jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => { + callback(); + }); + await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => { + expect(datasource.runQuery).toHaveBeenCalledTimes(2); + }); + }); + + test('Adjusts the group size based on errors and execution time', async () => { + const request = createRequest( + [{ expr: 'count_over_time($SELECTOR[1m])', refId: 'A', direction: LokiQueryDirection.Scan }], + { + range: { + from: dateTime('2024-11-13T05:00:00.000Z'), + to: dateTime('2024-11-14T06:00:00.000Z'), + raw: { + from: dateTime('2024-11-13T05:00:00.000Z'), + to: dateTime('2024-11-14T06:00:00.000Z'), + }, + }, + } + ); + + jest + .mocked(datasource.languageProvider.fetchLabelValues) + .mockResolvedValue(['1', '10', '2', '20', '3', '4', '5', '6', '7', '8', '9']); + + // @ts-expect-error + jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => { + callback(); + }); + + const { metricFrameA } = getMockFrames(); + + jest.mocked(datasource.runQuery).mockReset(); + + // + 50% + jest.mocked(datasource.runQuery).mockReturnValueOnce( + of({ + data: [ + { + ...metricFrameA, + meta: { + ...metricFrameA.meta, + stats: [ + ...metricFrameA.meta!.stats!, + { + displayName: 'Summary: exec 
time', + unit: 's', + value: 0.5, + }, + ], + }, + }, + ], + }) + ); + + // sqrt(currentSize) + jest + .mocked(datasource.runQuery) + .mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'timeout' }], data: [] })); + + // +10% + jest.mocked(datasource.runQuery).mockReturnValueOnce( + of({ + data: [ + { + ...metricFrameA, + meta: { + ...metricFrameA.meta, + stats: [ + ...metricFrameA.meta!.stats!, + { + displayName: 'Summary: exec time', + unit: 's', + value: 5, + }, + ], + }, + }, + ], + }) + ); + + // -10% + jest.mocked(datasource.runQuery).mockReturnValueOnce( + of({ + data: [ + { + ...metricFrameA, + meta: { + ...metricFrameA.meta, + stats: [ + ...metricFrameA.meta!.stats!, + { + displayName: 'Summary: exec time', + unit: 's', + value: 15, + }, + ], + }, + }, + ], + }) + ); + + // -10% + jest.mocked(datasource.runQuery).mockReturnValueOnce( + of({ + data: [ + { + ...metricFrameA, + meta: { + ...metricFrameA.meta, + stats: [ + ...metricFrameA.meta!.stats!, + { + displayName: 'Summary: exec time', + unit: 's', + value: 19, + }, + ], + }, + }, + ], + }) + ); + + // -50% + jest.mocked(datasource.runQuery).mockReturnValueOnce( + of({ + data: [ + { + ...metricFrameA, + meta: { + ...metricFrameA.meta, + stats: [ + ...metricFrameA.meta!.stats!, + { + displayName: 'Summary: exec time', + unit: 's', + value: 21, + }, + ], + }, + }, + ], + }) + ); + + // No more than 50% of the remaining shards + jest.mocked(datasource.runQuery).mockReturnValue( + of({ + data: [ + { + ...metricFrameA, + meta: { + ...metricFrameA.meta, + stats: [ + ...metricFrameA.meta!.stats!, + { + displayName: 'Summary: exec time', + unit: 's', + value: 0.5, + }, + ], + }, + }, + ], + }) + ); + + await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith(() => { + expect(datasource.runQuery).toHaveBeenCalledWith({ + intervalMs: expect.any(Number), + range: expect.any(Object), + requestId: 'TEST_shard_0_0_3', + targets: [ + { + expr: 'count_over_time({a="b", 
__stream_shard__=~"20|10|9"}[1m])', + refId: 'A', + direction: LokiQueryDirection.Scan, + }, + ], + }); + + // +50% + expect(datasource.runQuery).toHaveBeenCalledWith({ + intervalMs: expect.any(Number), + range: expect.any(Object), + requestId: 'TEST_shard_0_3_4', + targets: [ + { + expr: 'count_over_time({a="b", __stream_shard__=~"8|7|6|5"}[1m])', + refId: 'A', + direction: LokiQueryDirection.Scan, + }, + ], + }); + + // Error, sqrt(currentSize) + expect(datasource.runQuery).toHaveBeenCalledWith({ + intervalMs: expect.any(Number), + range: expect.any(Object), + requestId: 'TEST_shard_0_3_2', + targets: [ + { + expr: 'count_over_time({a="b", __stream_shard__=~"8|7"}[1m])', + refId: 'A', + direction: LokiQueryDirection.Scan, + }, + ], + }); + + // +10% + expect(datasource.runQuery).toHaveBeenCalledWith({ + intervalMs: expect.any(Number), + range: expect.any(Object), + requestId: 'TEST_shard_0_5_3', + targets: [ + { + expr: 'count_over_time({a="b", __stream_shard__=~"6|5|4"}[1m])', + refId: 'A', + direction: LokiQueryDirection.Scan, + }, + ], + }); + + // -10% + expect(datasource.runQuery).toHaveBeenCalledWith({ + intervalMs: expect.any(Number), + range: expect.any(Object), + requestId: 'TEST_shard_0_8_2', + targets: [ + { + expr: 'count_over_time({a="b", __stream_shard__=~"3|2"}[1m])', + refId: 'A', + direction: LokiQueryDirection.Scan, + }, + ], + }); + + // No more than 50% of the remaining shards + expect(datasource.runQuery).toHaveBeenCalledWith({ + intervalMs: expect.any(Number), + range: expect.any(Object), + requestId: 'TEST_shard_0_10_1', + targets: [ + { + expr: 'count_over_time({a="b", __stream_shard__="1"}[1m])', + refId: 'A', + direction: LokiQueryDirection.Scan, + }, + ], + }); + + // No more than 50% of the remaining shards + expect(datasource.runQuery).toHaveBeenCalledWith({ + intervalMs: expect.any(Number), + range: expect.any(Object), + requestId: 'TEST_shard_0_11_1', + targets: [ + { expr: 'count_over_time({a="b", __stream_shard__=""}[1m])', refId: 
'A', direction: LokiQueryDirection.Scan }, + ], + }); + }); + }); +}); diff --git a/public/app/plugins/datasource/loki/shardQuerySplitting.ts b/public/app/plugins/datasource/loki/shardQuerySplitting.ts new file mode 100644 index 00000000000..d1b74e9de67 --- /dev/null +++ b/public/app/plugins/datasource/loki/shardQuerySplitting.ts @@ -0,0 +1,353 @@ +import { groupBy, partition } from 'lodash'; +import { Observable, Subscriber, Subscription } from 'rxjs'; +import { v4 as uuidv4 } from 'uuid'; + +import { DataQueryRequest, LoadingState, DataQueryResponse, QueryResultMetaStat } from '@grafana/data'; + +import { LokiDatasource } from './datasource'; +import { combineResponses, replaceResponses } from './mergeResponses'; +import { adjustTargetsFromResponseState, runSplitQuery } from './querySplitting'; +import { getSelectorForShardValues, interpolateShardingSelector, requestSupportsSharding } from './queryUtils'; +import { isRetriableError } from './responseUtils'; +import { LokiQuery } from './types'; + +/** + * Query splitting by stream shards. + * Query splitting was introduced in Loki to optimize querying for long intervals and high volume of data, + * dividing a big request into smaller sub-requests, combining and displaying the results as they arrive. + * + * This approach, inspired by the time-based query splitting, takes advantage of the __stream_shard__ + * internal label, representing how data is spread into different sources that can be queried individually. + * + * The main entry point of this module is runShardSplitQuery(), which prepares the query for execution and + * passes it to splitQueriesByStreamShard() to begin the querying loop. + * + * splitQueriesByStreamShard() has the following structure: + * - Creates and returns an Observable to which the UI will subscribe + * - Requests the __stream_shard__ values of the selected service: + * . 
If there are no shard values, it falls back to the standard time splitting of the data source in runSplitQuery() + . If there are shards: + - It sorts them by value, descending. Higher shard numbers correspond to the lowest volume. + - It defines an initial group size, roughly Math.sqrt(amountOfShards). + - It begins the querying loop with runNextRequest(). + - runNextRequest() will create a group of groupSize shards from the nth shard (cycle), and has the following internal structure: + . groupShardRequests() returns an array of shards from cycle to cycle + groupSize. + . interpolateShardingSelector() will update the stream selector with the shard numbers in the current group. + . After query execution: + - If the response is successful: + . It will add new data to the response with combineResponses() + . Using the data and metadata of the response, updateGroupSizeFromResponse() will increase or decrease the group size. + . nextRequest() will use the current cycle and group size to determine the next request or complete execution with done(). + - If the response is unsuccessful: + . If the response is not a query error, and the group size is bigger than 1, it will decrease the group size. + . If the group size is already 1, it will retry the request up to 4 times. + . If there are retry attempts, it will retry the current cycle, or else stop querying. + - Once all request groups have been executed, it will be done().
+ */ + +export function runShardSplitQuery(datasource: LokiDatasource, request: DataQueryRequest) { + const queries = datasource + .interpolateVariablesInQueries(request.targets, request.scopedVars) + .filter((query) => query.expr) + .filter((query) => !query.hide); + + return splitQueriesByStreamShard(datasource, request, queries); +} + +function splitQueriesByStreamShard( + datasource: LokiDatasource, + request: DataQueryRequest, + splittingTargets: LokiQuery[] +) { + let shouldStop = false; + let mergedResponse: DataQueryResponse = { data: [], state: LoadingState.Streaming, key: uuidv4() }; + let subquerySubscription: Subscription | null = null; + let retriesMap = new Map(); + let retryTimer: ReturnType | null = null; + + const runNextRequest = (subscriber: Subscriber, group: number, groups: ShardedQueryGroup[]) => { + let nextGroupSize = groups[group].groupSize; + const { shards, groupSize, cycle } = groups[group]; + let retrying = false; + + if (subquerySubscription != null) { + subquerySubscription.unsubscribe(); + subquerySubscription = null; + } + + if (shouldStop) { + subscriber.complete(); + return; + } + + const done = () => { + mergedResponse.state = LoadingState.Done; + subscriber.next(mergedResponse); + subscriber.complete(); + }; + + const nextRequest = () => { + const nextGroup = + groups[group + 1] && groupHasPendingRequests(groups[group + 1]) + ? 
groups[group + 1] + : groups.find((shardGroup) => groupHasPendingRequests(shardGroup)); + + if (nextGroup === undefined) { + done(); + return; + } + groups[group].groupSize = nextGroupSize; + runNextRequest(subscriber, groups.indexOf(nextGroup), groups); + }; + + const retry = (errorResponse?: DataQueryResponse) => { + try { + if (errorResponse && !isRetriableError(errorResponse)) { + return false; + } + } catch (e) { + console.error(e); + shouldStop = true; + return false; + } + + if (groupSize !== undefined && groupSize > 1) { + groups[group].groupSize = Math.floor(Math.sqrt(groupSize)); + debug(`Possible time out, new group size ${groups[group].groupSize}`); + retrying = true; + runNextRequest(subscriber, group, groups); + return true; + } + + const key = `${group}_${cycle}`; + const retries = retriesMap.get(key) ?? 0; + + if (retries > 3) { + shouldStop = true; + return false; + } + + retriesMap.set(key, retries + 1); + + retryTimer = setTimeout( + () => { + console.warn(`Retrying ${group} ${cycle} (${retries + 1})`); + runNextRequest(subscriber, group, groups); + retryTimer = null; + }, + 1500 * Math.pow(2, retries) + ); // Exponential backoff + + retrying = true; + + return true; + }; + + const targets = adjustTargetsFromResponseState(groups[group].targets, mergedResponse); + if (!targets.length) { + nextRequest(); + return; + } + + const shardsToQuery = + shards && cycle !== undefined && groupSize ? groupShardRequests(shards, cycle, groupSize) : []; + const subRequest = { ...request, targets: interpolateShardingSelector(targets, shardsToQuery) }; + // Request may not have a request id + if (request.requestId) { + subRequest.requestId = + shardsToQuery.length > 0 ? `${request.requestId}_shard_${group}_${cycle}_${groupSize}` : request.requestId; + } + + debug(shardsToQuery.length ? `Querying ${shardsToQuery.join(', ')}` : 'Running regular query'); + + const queryRunner = + shardsToQuery.length > 0 ? 
datasource.runQuery.bind(datasource) : runSplitQuery.bind(null, datasource); + subquerySubscription = queryRunner(subRequest).subscribe({ + next: (partialResponse: DataQueryResponse) => { + if ((partialResponse.errors ?? []).length > 0 || partialResponse.error != null) { + if (retry(partialResponse)) { + return; + } + } + if (groupSize && cycle !== undefined && shards !== undefined) { + nextGroupSize = constrainGroupSize( + cycle + groupSize, + updateGroupSizeFromResponse(partialResponse, groups[group]), + shards.length + ); + if (nextGroupSize !== groupSize) { + debug(`New group size ${nextGroupSize}`); + } + } + mergedResponse = + shardsToQuery.length > 0 + ? combineResponses(mergedResponse, partialResponse) + : replaceResponses(mergedResponse, partialResponse); + + // When we delegate query running to runSplitQuery(), we will receive partial updates here, and complete + // will be called when all the sub-requests were completed, so we need to show partial progress here. + if (shardsToQuery.length === 0) { + subscriber.next(mergedResponse); + } + }, + complete: () => { + if (retrying) { + return; + } + subscriber.next(mergedResponse); + nextRequest(); + }, + error: (error: unknown) => { + console.error(error, { msg: 'failed to shard' }); + subscriber.next(mergedResponse); + if (retry()) { + return; + } + nextRequest(); + }, + }); + }; + + const response = new Observable((subscriber) => { + groupTargetsByQueryType(splittingTargets, datasource, request).then((groupedRequests) => { + runNextRequest(subscriber, 0, groupedRequests); + }); + return () => { + shouldStop = true; + if (retryTimer) { + clearTimeout(retryTimer); + } + if (subquerySubscription != null) { + subquerySubscription.unsubscribe(); + subquerySubscription = null; + } + }; + }); + + return response; +} + +interface ShardedQueryGroup { + targets: LokiQuery[]; + shards?: number[]; + groupSize?: number; + cycle?: number; +} + +async function groupTargetsByQueryType( + targets: LokiQuery[], + datasource: 
LokiDatasource, + request: DataQueryRequest +) { + const [shardedQueries, otherQueries] = partition(targets, (query) => requestSupportsSharding([query])); + const groups: ShardedQueryGroup[] = []; + + if (otherQueries.length) { + groups.push({ + targets: otherQueries, + }); + } + + const selectorPartition = groupBy(shardedQueries, (query) => getSelectorForShardValues(query.expr)); + for (const selector in selectorPartition) { + try { + const values = await datasource.languageProvider.fetchLabelValues('__stream_shard__', { + timeRange: request.range, + streamSelector: selector, + }); + const shards = values.map((value) => parseInt(value, 10)); + if (shards) { + shards.sort((a, b) => b - a); + debug(`Querying ${selector} with shards ${shards.join(', ')}`); + } + groups.push({ + targets: selectorPartition[selector], + shards: shards.length ? shards : undefined, + groupSize: shards.length ? getInitialGroupSize(shards) : undefined, + cycle: 0, + }); + } catch (error) { + console.error(error, { msg: 'failed to fetch label values for __stream_shard__' }); + groups.push({ + targets: selectorPartition[selector], + }); + } + } + + return groups; +} + +function groupHasPendingRequests(group: ShardedQueryGroup) { + if (group.cycle === undefined || !group.groupSize || !group.shards) { + return false; + } + const { cycle, groupSize, shards } = group; + const nextCycle = Math.min(cycle + groupSize, shards.length); + group.cycle = nextCycle; + return cycle < shards.length && nextCycle <= shards.length; +} + +function updateGroupSizeFromResponse(response: DataQueryResponse, group: ShardedQueryGroup) { + const { groupSize: currentSize } = group; + if (!currentSize) { + return 1; + } + if (!response.data.length) { + // Empty response, increase group size + return currentSize + 1; + } + + const metaExecutionTime: QueryResultMetaStat | undefined = response.data[0].meta?.stats?.find( + (stat: QueryResultMetaStat) => stat.displayName === 'Summary: exec time' + ); + + if 
(metaExecutionTime) { + const executionTime = Math.round(metaExecutionTime.value); + debug(`${metaExecutionTime.value}`); + // Positive scenarios + if (executionTime <= 1) { + return Math.floor(currentSize * 1.5); + } else if (executionTime < 6) { + return Math.ceil(currentSize * 1.1); + } + + // Negative scenarios + if (currentSize === 1) { + return currentSize; + } else if (executionTime < 20) { + return Math.ceil(currentSize * 0.9); + } else { + return Math.floor(currentSize / 2); + } + } + + return currentSize; +} + +/** + * Prevents the group size from ever being more than maxFactor% of the pending shards. + */ +function constrainGroupSize(cycle: number, groupSize: number, shards: number) { + const maxFactor = 0.7; + return Math.min(groupSize, Math.max(Math.floor((shards - cycle) * maxFactor), 1)); +} + +function groupShardRequests(shards: number[], start: number, groupSize: number) { + if (start === shards.length) { + return [-1]; + } + return shards.slice(start, start + groupSize); +} + +function getInitialGroupSize(shards: number[]) { + return Math.floor(Math.sqrt(shards.length)); +} + +// Enable to output debugging logs +const DEBUG_ENABLED = Boolean(localStorage.getItem(`loki.sharding_debug_enabled`)); +function debug(message: string) { + if (!DEBUG_ENABLED) { + return; + } + console.log(message); +}