Loki: Added support to split queries by stream shard (#94245)

* Add shard query splitting implementation

* Shard query splitting: reuse function from query splitting

* Shard query splitting: remove max line limit

* Shard query splitting: update test

* Shard query splitting: fix types and non-sharded queries

* Merge responses: fix log merging

* Merge responses: remove legacy code

* Query splitting: add support to retry failed requests

* Query splitting: unit test request retrying

* Query splitting: add unsubscriptions

* Shard query splitting: fix retrying

* Shard query splitting: switch to dynamic grouping

* Shard query splitting: update group size thresholds and fix -1 query

* Shard query splitting: update initial group size + don't retry parse errors

* Shard query splitting: update unit test

* chore: update mock value

* Shard query splitting: add support for multiple targets

* chore: update description

* Shard query splitting: use group targets

* chore: filter hidden queries

* Shard query splitting: issue initial log query without sharding

* Splitting: fix retrying in both methods

* Merge responses: keep execution time

* Shard query splitting: remove no-shard attempt

* Shard query splitting: adjust groups based on rate of change

* chore: clean up experiments

* Shard query splitting: remove log query restrictions

* Shard query splitting: remove fallback to time splitting

* Loki: add new query direction

* Missing generated file

* LokiOptionField: integrate new query direction

* Shard query splitting: delegate non-scan queries to time splitting

* Query splitting: do not retry queries with parse errors

* Loki datasource: add placeholder for feature flag

* Shard query splitting: add function with support criteria

* Shard query splitting: refactor query modification and shard logs volume

* Shard query splitting: update unit tests

* chore: Update scan direction tooltip

* chore: formatting

* LogsVolumePanel: fix missing state in logs volume panel data

* Merge responses: better handle missing nanoseconds

* LokiQueryOptionFields: display query direction for log queries

* loki: process scan direction as backward

* Loki datasource: restrict sharding to Explore

* Retrying: invert criteria and move to response utils

* Formatting

* Use log volume refId constant

* Fix import order

* Create feature flag

* Use feature toggle

* LogsVolumePanel: prevent flashing no data while streaming
This commit is contained in:
Matias Chomicki 2024-10-23 13:21:03 +02:00 committed by GitHub
parent a5ae8959e3
commit 2573cbec08
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 2740 additions and 25 deletions

View File

@ -135,6 +135,7 @@ Experimental features might be changed or removed without prior notice.
| `mysqlParseTime` | Ensure the parseTime flag is set for MySQL driver | | `mysqlParseTime` | Ensure the parseTime flag is set for MySQL driver |
| `alertingBacktesting` | Rule backtesting API for alerting | | `alertingBacktesting` | Rule backtesting API for alerting |
| `editPanelCSVDragAndDrop` | Enables drag and drop for CSV and Excel files | | `editPanelCSVDragAndDrop` | Enables drag and drop for CSV and Excel files |
| `lokiShardSplitting` | Use stream shards to split queries into smaller subqueries |
| `lokiQuerySplittingConfig` | Give users the option to configure split durations for Loki queries | | `lokiQuerySplittingConfig` | Give users the option to configure split durations for Loki queries |
| `individualCookiePreferences` | Support overriding cookie preferences per user | | `individualCookiePreferences` | Support overriding cookie preferences per user |
| `influxqlStreamingParser` | Enable streaming JSON parser for InfluxDB datasource InfluxQL query language | | `influxqlStreamingParser` | Enable streaming JSON parser for InfluxDB datasource InfluxQL query language |

View File

@ -52,6 +52,7 @@ export interface FeatureToggles {
editPanelCSVDragAndDrop?: boolean; editPanelCSVDragAndDrop?: boolean;
alertingNoNormalState?: boolean; alertingNoNormalState?: boolean;
logsContextDatasourceUi?: boolean; logsContextDatasourceUi?: boolean;
lokiShardSplitting?: boolean;
lokiQuerySplitting?: boolean; lokiQuerySplitting?: boolean;
lokiQuerySplittingConfig?: boolean; lokiQuerySplittingConfig?: boolean;
individualCookiePreferences?: boolean; individualCookiePreferences?: boolean;

View File

@ -33,6 +33,7 @@ export enum SupportingQueryType {
export enum LokiQueryDirection { export enum LokiQueryDirection {
Backward = 'backward', Backward = 'backward',
Forward = 'forward', Forward = 'forward',
Scan = 'scan',
} }
export interface LokiDataQuery extends common.DataQuery { export interface LokiDataQuery extends common.DataQuery {

View File

@ -265,6 +265,13 @@ var (
Expression: "true", // turned on by default Expression: "true", // turned on by default
AllowSelfServe: true, AllowSelfServe: true,
}, },
{
Name: "lokiShardSplitting",
Description: "Use stream shards to split queries into smaller subqueries",
Stage: FeatureStageExperimental,
FrontendOnly: true,
Owner: grafanaObservabilityLogsSquad,
},
{ {
Name: "lokiQuerySplitting", Name: "lokiQuerySplitting",
Description: "Split large interval queries into subqueries with smaller time intervals", Description: "Split large interval queries into subqueries with smaller time intervals",

View File

@ -33,6 +33,7 @@ alertingBacktesting,experimental,@grafana/alerting-squad,false,false,false
editPanelCSVDragAndDrop,experimental,@grafana/dataviz-squad,false,false,true editPanelCSVDragAndDrop,experimental,@grafana/dataviz-squad,false,false,true
alertingNoNormalState,preview,@grafana/alerting-squad,false,false,false alertingNoNormalState,preview,@grafana/alerting-squad,false,false,false
logsContextDatasourceUi,GA,@grafana/observability-logs,false,false,true logsContextDatasourceUi,GA,@grafana/observability-logs,false,false,true
lokiShardSplitting,experimental,@grafana/observability-logs,false,false,true
lokiQuerySplitting,GA,@grafana/observability-logs,false,false,true lokiQuerySplitting,GA,@grafana/observability-logs,false,false,true
lokiQuerySplittingConfig,experimental,@grafana/observability-logs,false,false,true lokiQuerySplittingConfig,experimental,@grafana/observability-logs,false,false,true
individualCookiePreferences,experimental,@grafana/grafana-backend-group,false,false,false individualCookiePreferences,experimental,@grafana/grafana-backend-group,false,false,false

1 Name Stage Owner requiresDevMode RequiresRestart FrontendOnly
33 editPanelCSVDragAndDrop experimental @grafana/dataviz-squad false false true
34 alertingNoNormalState preview @grafana/alerting-squad false false false
35 logsContextDatasourceUi GA @grafana/observability-logs false false true
36 lokiShardSplitting experimental @grafana/observability-logs false false true
37 lokiQuerySplitting GA @grafana/observability-logs false false true
38 lokiQuerySplittingConfig experimental @grafana/observability-logs false false true
39 individualCookiePreferences experimental @grafana/grafana-backend-group false false false

View File

@ -143,6 +143,10 @@ const (
// Allow datasource to provide custom UI for context view // Allow datasource to provide custom UI for context view
FlagLogsContextDatasourceUi = "logsContextDatasourceUi" FlagLogsContextDatasourceUi = "logsContextDatasourceUi"
// FlagLokiShardSplitting
// Use stream shards to split queries into smaller subqueries
FlagLokiShardSplitting = "lokiShardSplitting"
// FlagLokiQuerySplitting // FlagLokiQuerySplitting
// Split large interval queries into subqueries with smaller time intervals // Split large interval queries into subqueries with smaller time intervals
FlagLokiQuerySplitting = "lokiQuerySplitting" FlagLokiQuerySplitting = "lokiQuerySplitting"

View File

@ -1986,6 +1986,22 @@
"codeowner": "@grafana/observability-logs" "codeowner": "@grafana/observability-logs"
} }
}, },
{
"metadata": {
"name": "lokiShardSplitting",
"resourceVersion": "1729678036788",
"creationTimestamp": "2024-10-23T10:06:42Z",
"annotations": {
"grafana.app/updatedTimestamp": "2024-10-23 10:07:16.788828 +0000 UTC"
}
},
"spec": {
"description": "Use stream shards to split queries into smaller subqueries",
"stage": "experimental",
"codeowner": "@grafana/observability-logs",
"frontend": true
}
},
{ {
"metadata": { "metadata": {
"name": "lokiStructuredMetadata", "name": "lokiStructuredMetadata",

View File

@ -13,6 +13,7 @@ package dataquery
const ( const (
LokiQueryDirectionBackward LokiQueryDirection = "backward" LokiQueryDirectionBackward LokiQueryDirection = "backward"
LokiQueryDirectionForward LokiQueryDirection = "forward" LokiQueryDirectionForward LokiQueryDirection = "forward"
LokiQueryDirectionScan LokiQueryDirection = "scan"
) )
// Defines values for LokiQueryType. // Defines values for LokiQueryType.

View File

@ -94,6 +94,8 @@ func parseDirection(jsonPointerValue *string) (Direction, error) {
return DirectionBackward, nil return DirectionBackward, nil
case "forward": case "forward":
return DirectionForward, nil return DirectionForward, nil
case "scan":
return DirectionBackward, nil
default: default:
return DirectionBackward, fmt.Errorf("invalid queryDirection: %s", jsonValue) return DirectionBackward, fmt.Errorf("invalid queryDirection: %s", jsonValue)
} }

View File

@ -109,7 +109,7 @@ export const LogsVolumePanelList = ({
return <SupplementaryResultError error={logsVolumeData.error} title="Failed to load log volume for this query" />; return <SupplementaryResultError error={logsVolumeData.error} title="Failed to load log volume for this query" />;
} }
if (numberOfLogVolumes === 0) { if (numberOfLogVolumes === 0 && logsVolumeData?.state !== LoadingState.Streaming) {
return ( return (
<div className={styles.alertContainer}> <div className={styles.alertContainer}>
<Alert severity="info" title="No logs volume available"> <Alert severity="info" title="No logs volume available">
@ -122,7 +122,6 @@ export const LogsVolumePanelList = ({
return ( return (
<div className={styles.listContainer}> <div className={styles.listContainer}>
{Object.keys(logVolumes).map((name, index) => { {Object.keys(logVolumes).map((name, index) => {
const logsVolumeData = { data: logVolumes[name] };
return ( return (
<LogsVolumePanel <LogsVolumePanel
toggleLegendRef={toggleLegendRef} toggleLegendRef={toggleLegendRef}
@ -130,7 +129,7 @@ export const LogsVolumePanelList = ({
timeRange={visibleRange} timeRange={visibleRange}
allLogsVolumeMaximum={allLogsVolumeMaximumValue} allLogsVolumeMaximum={allLogsVolumeMaximumValue}
width={width} width={width}
logsVolumeData={logsVolumeData} logsVolumeData={{ data: logVolumes[name], state: logsVolumeData?.state }}
onUpdateTimeRange={onUpdateTimeRange} onUpdateTimeRange={onUpdateTimeRange}
timeZone={timeZone} timeZone={timeZone}
splitOpen={splitOpen} splitOpen={splitOpen}

View File

@ -104,6 +104,66 @@ export function getMockFrames() {
length: 2, length: 2,
}; };
const logFrameAB: DataFrame = {
refId: 'A',
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: [1, 2, 3, 4],
},
{
name: 'Line',
type: FieldType.string,
config: {},
values: ['line3', 'line4', 'line1', 'line2'],
},
{
name: 'labels',
type: FieldType.other,
config: {},
values: [
{
otherLabel: 'other value',
},
undefined,
{
label: 'value',
},
{
otherLabel: 'other value',
},
],
},
{
name: 'tsNs',
type: FieldType.string,
config: {},
values: ['1000000', '2000000', '3000000', '4000000'],
},
{
name: 'id',
type: FieldType.string,
config: {},
values: ['id3', 'id4', 'id1', 'id2'],
},
],
meta: {
custom: {
frameType: 'LabeledTimeValues',
},
stats: [
{
displayName: 'Summary: total bytes processed',
unit: 'decbytes',
value: 22,
},
],
},
length: 4,
};
const metricFrameA: DataFrame = { const metricFrameA: DataFrame = {
refId: 'A', refId: 'A',
fields: [ fields: [
@ -192,11 +252,59 @@ export function getMockFrames() {
length: 2, length: 2,
}; };
const emptyFrame: DataFrame = {
refId: 'A',
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: [],
},
{
name: 'Line',
type: FieldType.string,
config: {},
values: [],
},
{
name: 'labels',
type: FieldType.other,
config: {},
values: [],
},
{
name: 'tsNs',
type: FieldType.string,
config: {},
values: [],
},
{
name: 'id',
type: FieldType.string,
config: {},
values: [],
},
],
meta: {
custom: {
frameType: 'LabeledTimeValues',
},
stats: [
{ displayName: 'Summary: total bytes processed', unit: 'decbytes', value: 0 },
{ displayName: 'Ingester: total reached', value: 0 },
],
},
length: 2,
};
return { return {
logFrameA, logFrameA,
logFrameB, logFrameB,
logFrameAB,
metricFrameA, metricFrameA,
metricFrameB, metricFrameB,
metricFrameC, metricFrameC,
emptyFrame,
}; };
} }

View File

@ -36,8 +36,17 @@ export const queryDirections: Array<SelectableValue<LokiQueryDirection>> = [
label: 'Forward', label: 'Forward',
description: 'Search in forward direction.', description: 'Search in forward direction.',
}, },
{
value: LokiQueryDirection.Scan,
label: 'Scan',
description: 'Split the query into smaller units and stop at the requested log line limit.',
},
]; ];
export function getQueryDirectionLabel(direction: LokiQueryDirection) {
return queryDirections.find((queryDirection) => queryDirection.value === direction)?.label ?? 'Unknown';
}
if (config.featureToggles.lokiExperimentalStreaming) { if (config.featureToggles.lokiExperimentalStreaming) {
queryTypeOptions.push({ queryTypeOptions.push({
value: LokiQueryType.Stream, value: LokiQueryType.Stream,

View File

@ -49,7 +49,7 @@ composableKinds: DataQuery: {
#SupportingQueryType: "logsVolume" | "logsSample" | "dataSample" | "infiniteScroll" @cuetsy(kind="enum") #SupportingQueryType: "logsVolume" | "logsSample" | "dataSample" | "infiniteScroll" @cuetsy(kind="enum")
#LokiQueryDirection: "forward" | "backward" @cuetsy(kind="enum") #LokiQueryDirection: "forward" | "backward" | "scan" @cuetsy(kind="enum")
} }
}] }]
lenses: [] lenses: []

View File

@ -31,6 +31,7 @@ export enum SupportingQueryType {
export enum LokiQueryDirection { export enum LokiQueryDirection {
Backward = 'backward', Backward = 'backward',
Forward = 'forward', Forward = 'forward',
Scan = 'scan',
} }
export interface LokiDataQuery extends common.DataQuery { export interface LokiDataQuery extends common.DataQuery {

View File

@ -82,9 +82,11 @@ import {
getStreamSelectorsFromQuery, getStreamSelectorsFromQuery,
isLogsQuery, isLogsQuery,
isQueryWithError, isQueryWithError,
requestSupportsSharding,
requestSupportsSplitting, requestSupportsSplitting,
} from './queryUtils'; } from './queryUtils';
import { replaceVariables, returnVariables } from './querybuilder/parsingUtils'; import { replaceVariables, returnVariables } from './querybuilder/parsingUtils';
import { runShardSplitQuery } from './shardQuerySplitting';
import { convertToWebSocketUrl, doLokiChannelStream } from './streaming'; import { convertToWebSocketUrl, doLokiChannelStream } from './streaming';
import { trackQuery } from './tracking'; import { trackQuery } from './tracking';
import { import {
@ -359,7 +361,13 @@ export class LokiDatasource
return this.runLiveQueryThroughBackend(fixedRequest); return this.runLiveQueryThroughBackend(fixedRequest);
} }
if (config.featureToggles.lokiQuerySplitting && requestSupportsSplitting(fixedRequest.targets)) { if (
config.featureToggles.lokiShardSplitting &&
requestSupportsSharding(fixedRequest.targets) &&
fixedRequest.app === CoreApp.Explore
) {
return runShardSplitQuery(this, fixedRequest);
} else if (config.featureToggles.lokiQuerySplitting && requestSupportsSplitting(fixedRequest.targets)) {
return runSplitQuery(this, fixedRequest); return runSplitQuery(this, fixedRequest);
} }

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,318 @@
import {
closestIdx,
DataFrame,
DataFrameType,
DataQueryResponse,
DataQueryResponseData,
Field,
FieldType,
LoadingState,
QueryResultMetaStat,
shallowCompare,
} from '@grafana/data';
import { LOADING_FRAME_NAME } from './querySplitting';
/**
 * Merges the frames of newResponse into currentResponse: frames that represent
 * the same series/log stream (per shouldCombine) are merged value-by-value,
 * new frames are appended. Mutates and returns currentResponse; returns a
 * clone of newResponse when there is no current response yet.
 */
export function combineResponses(currentResponse: DataQueryResponse | null, newResponse: DataQueryResponse) {
  if (!currentResponse) {
    return cloneQueryResponse(newResponse);
  }

  for (const incomingFrame of newResponse.data) {
    const existingFrame = currentResponse.data.find((frame) => shouldCombine(frame, incomingFrame));
    if (existingFrame) {
      mergeFrames(existingFrame, incomingFrame);
    } else {
      currentResponse.data.push(cloneDataFrame(incomingFrame));
    }
  }

  const allErrors = (currentResponse.errors ?? []).concat(newResponse.errors ?? []);
  if (allErrors.length > 0) {
    currentResponse.errors = allErrors;
  }

  // The `.error` attribute is obsolete, but some parts of Grafana still rely
  // on it, so keep it populated: the existing error wins over the new one.
  const firstError = currentResponse.error ?? newResponse.error;
  if (firstError != null) {
    currentResponse.error = firstError;
  }

  const allTraceIds = (currentResponse.traceIds ?? []).concat(newResponse.traceIds ?? []);
  if (allTraceIds.length > 0) {
    currentResponse.traceIds = allTraceIds;
  }

  return currentResponse;
}
/**
 * Given an existing DataQueryResponse, replaces any matching data frame in
 * currentResponse with its counterpart from newResponse, appending frames
 * that have no match. Mutates and returns currentResponse; returns a clone
 * of newResponse when there is no current response yet.
 */
export function replaceResponses(currentResponse: DataQueryResponse | null, newResponse: DataQueryResponse) {
  if (!currentResponse) {
    return cloneQueryResponse(newResponse);
  }

  for (const incomingFrame of newResponse.data) {
    const matchIndex = currentResponse.data.findIndex((frame) => shouldCombine(frame, incomingFrame));
    if (matchIndex >= 0) {
      currentResponse.data[matchIndex] = incomingFrame;
    } else {
      currentResponse.data.push(cloneDataFrame(incomingFrame));
    }
  }

  // Clean up loading frame when newResponse contains the final response
  if (newResponse.state === LoadingState.Done) {
    currentResponse.data = currentResponse.data.filter((frame) => frame.name !== LOADING_FRAME_NAME);
  }

  const allErrors = (currentResponse.errors ?? []).concat(newResponse.errors ?? []);
  if (allErrors.length > 0) {
    currentResponse.errors = allErrors;
  }

  // The obsolete `.error` attribute is kept for compatibility; the existing
  // error wins over the new one.
  const firstError = currentResponse.error ?? newResponse.error;
  if (firstError != null) {
    currentResponse.error = firstError;
  }

  const allTraceIds = (currentResponse.traceIds ?? []).concat(newResponse.traceIds ?? []);
  if (allTraceIds.length > 0) {
    currentResponse.traceIds = allTraceIds;
  }

  return currentResponse;
}
/**
 * Given two data frames, merge their values. Overlapping values will be added together.
 *
 * Entries from `source` are inserted into `dest` in timestamp order; an entry
 * already present in `dest` (same timestamp/ns, and same `id` for log frames)
 * has its numeric values accumulated instead of duplicated. Mutates `dest`.
 */
export function mergeFrames(dest: DataFrame, source: DataFrame) {
  // The time field drives the merge; the "id" field (present on log frames
  // only) detects duplicate entries that must be accumulated, not re-inserted.
  const destTimeField = dest.fields.find((field) => field.type === FieldType.time);
  const destIdField = dest.fields.find((field) => field.type === FieldType.string && field.name === 'id');
  const sourceTimeField = source.fields.find((field) => field.type === FieldType.time);
  const sourceIdField = source.fields.find((field) => field.type === FieldType.string && field.name === 'id');

  if (!destTimeField || !sourceTimeField) {
    console.error(new Error(`Time fields not found in the data frames`));
    return;
  }

  // Copy the source timestamps up front: the loop mutates dest while iterating.
  const sourceTimeValues = sourceTimeField?.values.slice(0) ?? [];
  const totalFields = Math.max(dest.fields.length, source.fields.length);

  for (let i = 0; i < sourceTimeValues.length; i++) {
    // Position in dest where this source entry belongs (merge or insert point).
    const destIdx = resolveIdx(destTimeField, sourceTimeField, i);

    // True when the same entry is already present in dest at destIdx.
    const entryExistsInDest = compareEntries(destTimeField, destIdField, destIdx, sourceTimeField, sourceIdField, i);

    for (let f = 0; f < totalFields; f++) {
      // For now, skip undefined fields that exist in the new frame
      if (!dest.fields[f]) {
        continue;
      }

      // Index is not reliable when frames have disordered fields, or an extra/missing field, so we find them by name.
      // If the field has no name, we fallback to the old index version.
      const sourceField = findSourceField(dest.fields[f], source.fields, f);
      if (!sourceField) {
        continue;
      }
      // Same value, accumulate
      if (entryExistsInDest) {
        if (dest.fields[f].type === FieldType.time) {
          // Time already exists, skip
          continue;
        } else if (dest.fields[f].type === FieldType.number) {
          // Number, add
          dest.fields[f].values[destIdx] = (dest.fields[f].values[destIdx] ?? 0) + sourceField.values[i];
        } else if (dest.fields[f].type === FieldType.other) {
          // Possibly labels, combine
          if (typeof sourceField.values[i] === 'object') {
            dest.fields[f].values[destIdx] = {
              ...dest.fields[f].values[destIdx],
              ...sourceField.values[i],
            };
          } else if (sourceField.values[i]) {
            dest.fields[f].values[destIdx] = sourceField.values[i];
          }
        } else {
          // Replace value
          dest.fields[f].values[destIdx] = sourceField.values[i];
        }
      } else if (sourceField.values[i] !== undefined) {
        // Insert in the `destIdx` position
        dest.fields[f].values.splice(destIdx, 0, sourceField.values[i]);
        if (sourceField.nanos) {
          // Backfill zeros so `nanos` stays aligned with `values` before inserting.
          dest.fields[f].nanos = dest.fields[f].nanos ?? new Array(dest.fields[f].values.length - 1).fill(0);
          dest.fields[f].nanos?.splice(destIdx, 0, sourceField.nanos[i]);
        } else if (dest.fields[f].nanos) {
          // Source has no ns precision: insert a zero to keep alignment.
          dest.fields[f].nanos?.splice(destIdx, 0, 0);
        }
      }
    }
  }

  dest.length = dest.fields[0].values.length;
  // Aggregate Loki metadata stats (bytes processed, exec time) across frames.
  dest.meta = {
    ...dest.meta,
    stats: getCombinedMetadataStats(dest.meta?.stats ?? [], source.meta?.stats ?? []),
  };
}
/**
 * Finds the index in destField where the source value at `index` should be
 * merged or inserted, breaking ties on equal timestamps with the nanosecond
 * component when both fields carry one.
 */
function resolveIdx(destField: Field, sourceField: Field, index: number) {
  const sourceValue = sourceField.values[index];
  const candidate = closestIdx(sourceValue, destField.values);
  if (candidate < 0) {
    return 0;
  }
  const destValue = destField.values[candidate];
  if (sourceValue === destValue && sourceField.nanos && destField.nanos) {
    // Same millisecond on both sides: order by nanosecond precision.
    return sourceField.nanos[index] > destField.nanos[candidate] ? candidate + 1 : candidate;
  }
  return sourceValue > destValue ? candidate + 1 : candidate;
}
/**
 * Decides whether the dest entry at destIndex and the source entry at
 * sourceIndex are the same entry: timestamps (incl. nanoseconds) must match,
 * and for log frames the unique ids must match as well.
 */
function compareEntries(
  destTimeField: Field,
  destIdField: Field | undefined,
  destIndex: number,
  sourceTimeField: Field,
  sourceIdField: Field | undefined,
  sourceIndex: number
) {
  if (!compareNsTimestamps(destTimeField, destIndex, sourceTimeField, sourceIndex)) {
    return false;
  }
  // Metric frames carry no id field: a matching timestamp is enough.
  if (!destIdField || !sourceIdField) {
    return true;
  }
  // Log frames, check indexes
  const destId = destIdField.values[destIndex];
  return destId !== undefined && destId === sourceIdField.values[sourceIndex];
}
/**
 * Compares the timestamps at the given indexes, requiring the nanosecond
 * component to match too when both fields provide one. An undefined dest
 * timestamp never matches.
 */
function compareNsTimestamps(destField: Field, destIndex: number, sourceField: Field, sourceIndex: number) {
  const destValue = destField.values[destIndex];
  if (destValue === undefined || destValue !== sourceField.values[sourceIndex]) {
    return false;
  }
  if (destField.nanos && sourceField.nanos) {
    const destNs = destField.nanos[destIndex];
    return destNs !== undefined && destNs === sourceField.nanos[sourceIndex];
  }
  return true;
}
/**
 * Locates the field in sourceFields corresponding to referenceField: a unique
 * name match wins; ambiguous names are disambiguated by label sets; otherwise
 * falls back to the positional index.
 */
function findSourceField(referenceField: Field, sourceFields: Field[], index: number) {
  const sameName = sourceFields.filter((field) => field.name === referenceField.name);
  if (sameName.length === 1) {
    return sameName[0];
  }
  if (referenceField.labels) {
    // Zero or several name matches: fall back to comparing label sets.
    return sameName.find((field) => shallowCompare(referenceField.labels ?? {}, field.labels ?? {}));
  }
  // No name or labels to go by: assume both frames share field order.
  return sourceFields[index];
}
const TOTAL_BYTES_STAT = 'Summary: total bytes processed';
const EXEC_TIME_STAT = 'Summary: exec time';
// This is specific for Loki: these are the only stats aggregated across
// sub-query responses; all other stats are dropped from the merged frame.
function getCombinedMetadataStats(
  destStats: QueryResultMetaStat[],
  sourceStats: QueryResultMetaStat[]
): QueryResultMetaStat[] {
  const combined: QueryResultMetaStat[] = [];
  for (const displayName of [TOTAL_BYTES_STAT, EXEC_TIME_STAT]) {
    const fromDest = destStats.find((entry) => entry.displayName === displayName);
    const fromSource = sourceStats.find((entry) => entry.displayName === displayName);
    if (fromDest != null && fromSource != null) {
      // Present on both sides: accumulate the values.
      combined.push({ value: fromSource.value + fromDest.value, displayName, unit: fromDest.unit });
      continue;
    }
    // Maybe one of them exists.
    const either = fromSource ?? fromDest;
    if (either != null) {
      combined.push(either);
    }
  }
  return combined;
}
/**
 * Deep clones a DataQueryResponse
 */
export function cloneQueryResponse(response: DataQueryResponse): DataQueryResponse {
  const clonedData = response.data.map((frame) => cloneDataFrame(frame));
  return {
    ...response,
    data: clonedData,
  };
}
/**
 * Copies a data frame and its field objects. Note that the `values` arrays
 * themselves are shared by reference with the source frame.
 */
function cloneDataFrame(frame: DataQueryResponseData): DataQueryResponseData {
  const clonedFields = frame.fields.map((field: Field) => ({
    ...field,
    values: field.values,
  }));
  return {
    ...frame,
    fields: clonedFields,
  };
}
/**
 * Decides whether two data frames represent the same series/log stream and
 * can therefore be merged: same refId and name, same frame type, and for
 * metric frames the same label set.
 */
function shouldCombine(frame1: DataFrame, frame2: DataFrame): boolean {
  // Frames from different query targets are never merged.
  if (frame1.refId !== frame2.refId || frame1.name !== frame2.name) {
    return false;
  }

  const frameType1 = frame1.meta?.type;
  const frameType2 = frame2.meta?.type;
  // we do not join things that have a different type
  if (frameType1 !== frameType2) {
    return false;
  }

  // metric range query data: match series by their label sets
  if (frameType1 === DataFrameType.TimeSeriesMulti) {
    const valueField1 = frame1.fields.find((field) => field.type === FieldType.number);
    const valueField2 = frame2.fields.find((field) => field.type === FieldType.number);
    if (valueField1 === undefined || valueField2 === undefined) {
      // should never happen
      return false;
    }
    return shallowCompare(valueField1.labels ?? {}, valueField2.labels ?? {});
  }

  // logs query data
  // logs use a special attribute in the dataframe's "custom" section
  // because we do not have a good "frametype" value for them yet.
  const customType1 = frame1.meta?.custom?.frameType;
  const customType2 = frame2.meta?.custom?.frameType;
  if (customType1 === 'LabeledTimeValues' && customType2 === 'LabeledTimeValues') {
    // Legacy frames carry this custom type.
    return true;
  }
  // Data plane frames don't; equal (possibly absent) custom types combine.
  return customType1 === customType2;
}

View File

@ -151,7 +151,7 @@ export function addLabelToQuery(
value: string, value: string,
labelType?: LabelType | null labelType?: LabelType | null
): string { ): string {
if (!key || !value) { if (!key) {
throw new Error('Need label to add to query.'); throw new Error('Need label to add to query.');
} }

View File

@ -1,6 +1,7 @@
import { of } from 'rxjs'; import { of } from 'rxjs';
import { DataQueryRequest, dateTime, LoadingState } from '@grafana/data'; import { DataQueryRequest, dateTime, LoadingState } from '@grafana/data';
import { config } from '@grafana/runtime';
import { createLokiDatasource } from './__mocks__/datasource'; import { createLokiDatasource } from './__mocks__/datasource';
import { getMockFrames } from './__mocks__/frames'; import { getMockFrames } from './__mocks__/frames';
@ -16,6 +17,19 @@ jest.mock('uuid', () => ({
v4: jest.fn().mockReturnValue('uuid'), v4: jest.fn().mockReturnValue('uuid'),
})); }));
const originalShardingFlagState = config.featureToggles.lokiShardSplitting;
beforeAll(() => {
// @ts-expect-error
jest.spyOn(global, 'setTimeout').mockImplementation((callback) => {
callback();
});
config.featureToggles.lokiShardSplitting = false;
});
afterAll(() => {
jest.mocked(global.setTimeout).mockReset();
config.featureToggles.lokiShardSplitting = originalShardingFlagState;
});
describe('runSplitQuery()', () => { describe('runSplitQuery()', () => {
let datasource: LokiDatasource; let datasource: LokiDatasource;
const range = { const range = {
@ -51,6 +65,26 @@ describe('runSplitQuery()', () => {
}); });
}); });
test('Retries retriable failed requests', async () => {
jest
.mocked(datasource.runQuery)
.mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'timeout' }], data: [] }));
await expect(runSplitQuery(datasource, request)).toEmitValuesWith(() => {
// 3 days, 3 chunks, 1 retry, 4 requests.
expect(datasource.runQuery).toHaveBeenCalledTimes(4);
});
});
test('Does not retry on other errors', async () => {
jest
.mocked(datasource.runQuery)
.mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'nope nope' }], data: [] }));
await expect(runSplitQuery(datasource, request)).toEmitValuesWith(() => {
// 3 days, 3 chunks, 3 requests.
expect(datasource.runQuery).toHaveBeenCalledTimes(1);
});
});
test('Metric queries with maxLines of 0 will execute', async () => { test('Metric queries with maxLines of 0 will execute', async () => {
const request = createRequest([{ expr: 'count_over_time({a="b"}[1m])', refId: 'A', maxLines: 0 }]); const request = createRequest([{ expr: 'count_over_time({a="b"}[1m])', refId: 'A', maxLines: 0 }]);
await expect(runSplitQuery(datasource, request)).toEmitValuesWith(() => { await expect(runSplitQuery(datasource, request)).toEmitValuesWith(() => {
@ -285,9 +319,10 @@ describe('runSplitQuery()', () => {
describe('Dynamic maxLines for logs requests', () => { describe('Dynamic maxLines for logs requests', () => {
const request = createRequest([{ expr: '{a="b"}', refId: 'A', maxLines: 4 }]); const request = createRequest([{ expr: '{a="b"}', refId: 'A', maxLines: 4 }]);
const { logFrameA } = getMockFrames(); const { logFrameA, logFrameB } = getMockFrames();
beforeEach(() => { beforeEach(() => {
jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [logFrameA], refId: 'A' })); jest.spyOn(datasource, 'runQuery').mockReturnValueOnce(of({ data: [logFrameA], refId: 'A' }));
jest.spyOn(datasource, 'runQuery').mockReturnValueOnce(of({ data: [logFrameB], refId: 'A' }));
}); });
test('Stops requesting once maxLines of logs have been received', async () => { test('Stops requesting once maxLines of logs have been received', async () => {
await expect(runSplitQuery(datasource, request)).toEmitValuesWith(() => { await expect(runSplitQuery(datasource, request)).toEmitValuesWith(() => {

View File

@ -14,12 +14,13 @@ import {
TimeRange, TimeRange,
LoadingState, LoadingState,
} from '@grafana/data'; } from '@grafana/data';
import { combineResponses } from '@grafana/o11y-ds-frontend';
import { LokiDatasource } from './datasource'; import { LokiDatasource } from './datasource';
import { splitTimeRange as splitLogsTimeRange } from './logsTimeSplitting'; import { splitTimeRange as splitLogsTimeRange } from './logsTimeSplitting';
import { combineResponses } from './mergeResponses';
import { splitTimeRange as splitMetricTimeRange } from './metricTimeSplitting'; import { splitTimeRange as splitMetricTimeRange } from './metricTimeSplitting';
import { isLogsQuery, isQueryWithRangeVariable } from './queryUtils'; import { isLogsQuery, isQueryWithRangeVariable } from './queryUtils';
import { isRetriableError } from './responseUtils';
import { trackGroupedQueries } from './tracking'; import { trackGroupedQueries } from './tracking';
import { LokiGroupedRequest, LokiQuery, LokiQueryDirection, LokiQueryType } from './types'; import { LokiGroupedRequest, LokiQuery, LokiQueryDirection, LokiQueryType } from './types';
@ -53,7 +54,7 @@ export function partitionTimeRange(
* At the end, we will filter the targets that don't need to be executed in the next request batch, * At the end, we will filter the targets that don't need to be executed in the next request batch,
* because, for example, the `maxLines` have been reached. * because, for example, the `maxLines` have been reached.
*/ */
function adjustTargetsFromResponseState(targets: LokiQuery[], response: DataQueryResponse | null): LokiQuery[] { export function adjustTargetsFromResponseState(targets: LokiQuery[], response: DataQueryResponse | null): LokiQuery[] {
if (!response) { if (!response) {
return targets; return targets;
} }
@ -82,8 +83,18 @@ export function runSplitGroupedQueries(datasource: LokiDatasource, requests: Lok
const longestPartition = requests.filter(({ partition }) => partition.length === totalRequests)[0].partition; const longestPartition = requests.filter(({ partition }) => partition.length === totalRequests)[0].partition;
let shouldStop = false; let shouldStop = false;
let subquerySubsciption: Subscription | null = null; let subquerySubscription: Subscription | null = null;
let retriesMap = new Map<string, number>();
let retryTimer: ReturnType<typeof setTimeout> | null = null;
const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, requestN: number, requestGroup: number) => { const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, requestN: number, requestGroup: number) => {
let retrying = false;
if (subquerySubscription != null) {
subquerySubscription.unsubscribe();
subquerySubscription = null;
}
if (shouldStop) { if (shouldStop) {
subscriber.complete(); subscriber.complete();
return; return;
@ -104,6 +115,37 @@ export function runSplitGroupedQueries(datasource: LokiDatasource, requests: Lok
done(); done();
}; };
const retry = (errorResponse?: DataQueryResponse) => {
try {
if (errorResponse && !isRetriableError(errorResponse)) {
return false;
}
} catch (e) {
console.error(e);
shouldStop = true;
return false;
}
const key = `${requestN}-${requestGroup}`;
const retries = retriesMap.get(key) ?? 0;
if (retries > 3) {
return false;
}
retriesMap.set(key, retries + 1);
retryTimer = setTimeout(
() => {
runNextRequest(subscriber, requestN, requestGroup);
},
1500 * Math.pow(2, retries)
); // Exponential backoff
retrying = true;
return true;
};
const group = requests[requestGroup]; const group = requests[requestGroup];
const range = group.partition[requestN - 1]; const range = group.partition[requestN - 1];
const targets = adjustTargetsFromResponseState(group.request.targets, mergedResponse); const targets = adjustTargetsFromResponseState(group.request.targets, mergedResponse);
@ -119,20 +161,29 @@ export function runSplitGroupedQueries(datasource: LokiDatasource, requests: Lok
subRequest.requestId = `${group.request.requestId}_${requestN}`; subRequest.requestId = `${group.request.requestId}_${requestN}`;
} }
subquerySubsciption = datasource.runQuery(subRequest).subscribe({ subquerySubscription = datasource.runQuery(subRequest).subscribe({
next: (partialResponse) => { next: (partialResponse) => {
mergedResponse = combineResponses(mergedResponse, partialResponse); if ((partialResponse.errors ?? []).length > 0 || partialResponse.error != null) {
mergedResponse = updateLoadingFrame(mergedResponse, subRequest, longestPartition, requestN); if (retry(partialResponse)) {
if ((mergedResponse.errors ?? []).length > 0 || mergedResponse.error != null) { return;
}
shouldStop = true; shouldStop = true;
} }
mergedResponse = combineResponses(mergedResponse, partialResponse);
mergedResponse = updateLoadingFrame(mergedResponse, subRequest, longestPartition, requestN);
}, },
complete: () => { complete: () => {
if (retrying) {
return;
}
subscriber.next(mergedResponse); subscriber.next(mergedResponse);
nextRequest(); nextRequest();
}, },
error: (error) => { error: (error) => {
subscriber.error(error); subscriber.error(error);
if (retry()) {
return;
}
}, },
}); });
}; };
@ -141,8 +192,13 @@ export function runSplitGroupedQueries(datasource: LokiDatasource, requests: Lok
runNextRequest(subscriber, totalRequests, 0); runNextRequest(subscriber, totalRequests, 0);
return () => { return () => {
shouldStop = true; shouldStop = true;
if (subquerySubsciption != null) { if (retryTimer) {
subquerySubsciption.unsubscribe(); clearTimeout(retryTimer);
retryTimer = null;
}
if (subquerySubscription != null) {
subquerySubscription.unsubscribe();
subquerySubscription = null;
} }
}; };
}); });

View File

@ -25,9 +25,10 @@ import {
} from '@grafana/lezer-logql'; } from '@grafana/lezer-logql';
import { DataQuery } from '@grafana/schema'; import { DataQuery } from '@grafana/schema';
import { getStreamSelectorPositions, NodePosition } from './modifyQuery'; import { REF_ID_STARTER_LOG_VOLUME } from './datasource';
import { addLabelToQuery, getStreamSelectorPositions, NodePosition } from './modifyQuery';
import { ErrorId } from './querybuilder/parsingUtils'; import { ErrorId } from './querybuilder/parsingUtils';
import { LokiQuery, LokiQueryType } from './types'; import { LabelType, LokiQuery, LokiQueryDirection, LokiQueryType } from './types';
/** /**
* Returns search terms from a LogQL query. * Returns search terms from a LogQL query.
@ -313,6 +314,18 @@ export function requestSupportsSplitting(allQueries: LokiQuery[]) {
return queries.length > 0; return queries.length > 0;
} }
/**
 * Returns true if at least one of the given queries should be run with stream-shard
 * splitting: visible, non-empty queries using the Scan direction or log-volume queries,
 * excluding queries explicitly opted out via a "do-not-shard" refId marker.
 */
export function requestSupportsSharding(allQueries: LokiQuery[]) {
  const queries = allQueries
    .filter((query) => !query.hide)
    // refId may be undefined; use optional chaining (mirrors the startsWith check below).
    .filter((query) => !query.refId?.includes('do-not-shard'))
    .filter((query) => query.expr)
    .filter(
      (query) => query.direction === LokiQueryDirection.Scan || query.refId?.startsWith(REF_ID_STARTER_LOG_VOLUME)
    );
  return queries.length > 0;
}
export const isLokiQuery = (query: DataQuery): query is LokiQuery => { export const isLokiQuery = (query: DataQuery): query is LokiQuery => {
if (!query) { if (!query) {
return false; return false;
@ -328,3 +341,33 @@ export const getLokiQueryFromDataQuery = (query?: DataQuery): LokiQuery | undefi
return query; return query;
}; };
/**
 * Adds a `__stream_shard__` matcher for the given shard values to every query's
 * stream selector. A single shard (or the special -1 "empty shard") uses an
 * equality matcher; multiple shards are joined into a regex matcher.
 */
export const interpolateShardingSelector = (queries: LokiQuery[], shards: number[]) => {
  if (shards.length === 0) {
    return queries;
  }
  const joined = shards.join('|');
  if (joined === '-1' || shards.length === 1) {
    // -1 means empty shard value
    const value = joined === '-1' ? '' : joined;
    return queries.map((query) => ({
      ...query,
      expr: addLabelToQuery(query.expr, '__stream_shard__', '=', value, LabelType.Indexed),
    }));
  }
  return queries.map((query) => ({
    ...query,
    expr: addLabelToQuery(query.expr, '__stream_shard__', '=~', joined, LabelType.Indexed),
  }));
};
export const getSelectorForShardValues = (query: string) => {
const selector = getNodesFromQuery(query, [Selector]);
if (selector.length > 0) {
return query.substring(selector[0].from, selector[0].to);
}
return '';
};

View File

@ -89,6 +89,7 @@ describe('LokiQueryBuilderOptions', () => {
setup({ expr: '{foo="bar"}' }); setup({ expr: '{foo="bar"}' });
expect(screen.getByText('Line limit: 20')).toBeInTheDocument(); expect(screen.getByText('Line limit: 20')).toBeInTheDocument();
expect(screen.getByText('Type: Range')).toBeInTheDocument(); expect(screen.getByText('Type: Range')).toBeInTheDocument();
expect(screen.getByText('Direction: Backward')).toBeInTheDocument();
expect(screen.queryByText(/step/i)).not.toBeInTheDocument(); expect(screen.queryByText(/step/i)).not.toBeInTheDocument();
}); });
@ -98,6 +99,7 @@ describe('LokiQueryBuilderOptions', () => {
expect(screen.getByText('Type: Range')).toBeInTheDocument(); expect(screen.getByText('Type: Range')).toBeInTheDocument();
expect(screen.getByText('Step: 1m')).toBeInTheDocument(); expect(screen.getByText('Step: 1m')).toBeInTheDocument();
expect(screen.getByText('Resolution: 1/2')).toBeInTheDocument(); expect(screen.getByText('Resolution: 1/2')).toBeInTheDocument();
expect(screen.queryByText(/Direction/)).not.toBeInTheDocument();
}); });
it('does not shows resolution field if resolution is not set', async () => { it('does not shows resolution field if resolution is not set', async () => {

View File

@ -8,6 +8,7 @@ import { config, reportInteraction } from '@grafana/runtime';
import { Alert, AutoSizeInput, RadioButtonGroup, Select } from '@grafana/ui'; import { Alert, AutoSizeInput, RadioButtonGroup, Select } from '@grafana/ui';
import { import {
getQueryDirectionLabel,
preprocessMaxLines, preprocessMaxLines,
queryDirections, queryDirections,
queryTypeOptions, queryTypeOptions,
@ -102,7 +103,7 @@ export const LokiQueryBuilderOptions = React.memo<Props>(
<EditorRow> <EditorRow>
<QueryOptionGroup <QueryOptionGroup
title="Options" title="Options"
collapsedInfo={getCollapsedInfo(query, queryType, maxLines, isLogQuery, isValidStep)} collapsedInfo={getCollapsedInfo(query, queryType, maxLines, isLogQuery, isValidStep, queryDirection)}
queryStats={queryStats} queryStats={queryStats}
> >
<EditorField <EditorField
@ -203,7 +204,8 @@ function getCollapsedInfo(
queryType: LokiQueryType, queryType: LokiQueryType,
maxLines: number, maxLines: number,
isLogQuery: boolean, isLogQuery: boolean,
isValidStep: boolean isValidStep: boolean,
direction: LokiQueryDirection
): string[] { ): string[] {
const queryTypeLabel = queryTypeOptions.find((x) => x.value === queryType); const queryTypeLabel = queryTypeOptions.find((x) => x.value === queryType);
const resolutionLabel = RESOLUTION_OPTIONS.find((x) => x.value === (query.resolution ?? 1)); const resolutionLabel = RESOLUTION_OPTIONS.find((x) => x.value === (query.resolution ?? 1));
@ -218,9 +220,8 @@ function getCollapsedInfo(
if (isLogQuery) { if (isLogQuery) {
items.push(`Line limit: ${query.maxLines ?? maxLines}`); items.push(`Line limit: ${query.maxLines ?? maxLines}`);
} items.push(`Direction: ${getQueryDirectionLabel(direction)}`);
} else {
if (!isLogQuery) {
if (query.step) { if (query.step) {
items.push(`Step: ${isValidStep ? query.step : 'Invalid value'}`); items.push(`Step: ${isValidStep ? query.step : 'Invalid value'}`);
} }

View File

@ -1,4 +1,4 @@
import { DataFrame, FieldType, isValidGoDuration, Labels } from '@grafana/data'; import { DataFrame, DataQueryResponse, FieldType, isValidGoDuration, Labels } from '@grafana/data';
import { isBytesString, processLabels } from './languageUtils'; import { isBytesString, processLabels } from './languageUtils';
import { isLogLineJSON, isLogLineLogfmt, isLogLinePacked } from './lineParser'; import { isLogLineJSON, isLogLineLogfmt, isLogLinePacked } from './lineParser';
@ -131,3 +131,14 @@ export function extractLevelLikeLabelFromDataFrame(frame: DataFrame): string | n
} }
return levelLikeLabel; return levelLikeLabel;
} }
/**
 * Decides whether a failed query response should be retried.
 * - Returns true for timeouts (transient, worth retrying).
 * - Throws for parse errors and "max entries" errors, signaling callers to stop querying.
 * - Returns false for anything else (including responses with no error message).
 */
export function isRetriableError(errorResponse: DataQueryResponse) {
  // errors may be missing OR an empty array; optional chaining covers both.
  const message = (errorResponse.errors?.[0]?.message ?? '').toLowerCase();
  if (message.includes('timeout')) {
    return true;
  } else if (message.includes('parse error') || message.includes('max entries')) {
    // If the error is a parse error, we want to signal to stop querying.
    throw new Error(message);
  }
  return false;
}

View File

@ -0,0 +1,455 @@
import { of } from 'rxjs';
import { DataQueryRequest, DataQueryResponse, dateTime, LoadingState } from '@grafana/data';
import { createLokiDatasource } from './__mocks__/datasource';
import { getMockFrames } from './__mocks__/frames';
import { LokiDatasource } from './datasource';
import { runShardSplitQuery } from './shardQuerySplitting';
import { LokiQuery, LokiQueryDirection } from './types';
// Pin uuid.v4 so generated response keys are deterministic across test runs.
jest.mock('uuid', () => ({
  v4: jest.fn().mockReturnValue('uuid'),
}));
// Keep references to the real console methods so they can be restored after the suite.
const originalLog = console.log;
const originalWarn = console.warn;
beforeEach(() => {
  // NOTE(review): the console.log spy is left commented out, presumably to allow debug()
  // output while developing these tests — confirm whether it should be re-enabled.
  //jest.spyOn(console, 'log').mockImplementation(() => {});
  // Silence the retry warnings emitted by the splitting code under test.
  jest.spyOn(console, 'warn').mockImplementation(() => {});
});
afterAll(() => {
  console.log = originalLog;
  console.warn = originalWarn;
});
describe('runShardSplitQuery()', () => {
  let datasource: LokiDatasource;
  // Fixed 7-hour range shared by most tests.
  const range = {
    from: dateTime('2023-02-08T04:00:00.000Z'),
    to: dateTime('2023-02-08T11:00:00.000Z'),
    raw: {
      from: dateTime('2023-02-08T04:00:00.000Z'),
      to: dateTime('2023-02-08T11:00:00.000Z'),
    },
  };
  // Builds a minimal DataQueryRequest; `overrides` can replace any field (e.g. range).
  const createRequest = (targets: Array<Partial<LokiQuery>>, overrides?: Partial<DataQueryRequest<LokiQuery>>) => {
    let request = {
      range,
      targets,
      intervalMs: 60000,
      requestId: 'TEST',
    } as DataQueryRequest<LokiQuery>;
    Object.assign(request, overrides);
    return request;
  };
  let request: DataQueryRequest<LokiQuery>;
  beforeEach(() => {
    // One Scan-direction query; $SELECTOR is interpolated to {a="b"} by the mock below.
    request = createRequest([
      { expr: 'count_over_time($SELECTOR[1m])', refId: 'A', direction: LokiQueryDirection.Scan },
    ]);
    datasource = createLokiDatasource();
    datasource.languageProvider.fetchLabelValues = jest.fn();
    datasource.interpolateVariablesInQueries = jest.fn().mockImplementation((queries: LokiQuery[]) => {
      return queries.map((query) => {
        query.expr = query.expr.replace('$SELECTOR', '{a="b"}');
        return query;
      });
    });
    // 5 unsorted shard values; the implementation sorts them descending (20, 10, 3, 2, 1).
    jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1', '10', '2', '20', '3']);
    const { metricFrameA } = getMockFrames();
    jest.spyOn(datasource, 'runQuery').mockReturnValue(of({ data: [metricFrameA] }));
    jest.spyOn(datasource, 'query').mockReturnValue(of({ data: [metricFrameA] }));
  });
  test('Splits datasource queries', async () => {
    await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith(() => {
      // 5 shards, 3 groups + empty shard group, 4 requests
      expect(datasource.runQuery).toHaveBeenCalledTimes(4);
    });
  });
  // Verifies the exact sub-requests: interpolated selector + shard matcher + requestId
  // encoding (TEST_shard_<group>_<cycle>_<groupSize>).
  test('Interpolates queries before running', async () => {
    await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith(() => {
      expect(datasource.interpolateVariablesInQueries).toHaveBeenCalledTimes(1);
      expect(datasource.runQuery).toHaveBeenCalledWith({
        intervalMs: expect.any(Number),
        range: expect.any(Object),
        requestId: 'TEST_shard_0_0_2',
        targets: [
          {
            expr: 'count_over_time({a="b", __stream_shard__=~"20|10"}[1m])',
            refId: 'A',
            direction: LokiQueryDirection.Scan,
          },
        ],
      });
      expect(datasource.runQuery).toHaveBeenCalledWith({
        intervalMs: expect.any(Number),
        range: expect.any(Object),
        requestId: 'TEST_shard_0_2_2',
        targets: [
          {
            expr: 'count_over_time({a="b", __stream_shard__=~"3|2"}[1m])',
            refId: 'A',
            direction: LokiQueryDirection.Scan,
          },
        ],
      });
      expect(datasource.runQuery).toHaveBeenCalledWith({
        intervalMs: expect.any(Number),
        range: expect.any(Object),
        requestId: 'TEST_shard_0_4_1',
        targets: [
          {
            expr: 'count_over_time({a="b", __stream_shard__="1"}[1m])',
            refId: 'A',
            direction: LokiQueryDirection.Scan,
          },
        ],
      });
      // Final request targets the "empty shard" (streams with no __stream_shard__ value).
      expect(datasource.runQuery).toHaveBeenCalledWith({
        intervalMs: expect.any(Number),
        range: expect.any(Object),
        requestId: 'TEST_shard_0_5_1',
        targets: [
          { expr: 'count_over_time({a="b", __stream_shard__=""}[1m])', refId: 'A', direction: LokiQueryDirection.Scan },
        ],
      });
    });
  });
  test('Sends the whole stream selector to fetch values', async () => {
    datasource.interpolateVariablesInQueries = jest.fn().mockImplementation((queries: LokiQuery[]) => {
      return queries.map((query) => {
        query.expr = query.expr.replace('$SELECTOR', '{service_name="test", filter="true"}');
        return query;
      });
    });
    await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith(() => {
      // The full multi-matcher selector must be passed through, not just one label.
      expect(datasource.languageProvider.fetchLabelValues).toHaveBeenCalledWith('__stream_shard__', {
        streamSelector: '{service_name="test", filter="true"}',
        timeRange: expect.anything(),
      });
      expect(datasource.runQuery).toHaveBeenCalledWith({
        intervalMs: expect.any(Number),
        range: expect.any(Object),
        requestId: 'TEST_shard_0_0_2',
        targets: [
          {
            expr: 'count_over_time({service_name="test", filter="true", __stream_shard__=~"20|10"}[1m])',
            refId: 'A',
            direction: LokiQueryDirection.Scan,
          },
        ],
      });
    });
  });
  test('Returns a DataQueryResponse with the expected attributes', async () => {
    await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
      expect(response[0].data).toBeDefined();
      expect(response[0].state).toBe(LoadingState.Done);
      expect(response[0].key).toBeDefined();
    });
  });
  test('Retries failed retriable requests', async () => {
    jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
    // First sub-request fails with a timeout, which isRetriableError() treats as retriable.
    jest
      .spyOn(datasource, 'runQuery')
      .mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'timeout' }], data: [] }));
    // Make the retry back-off fire immediately.
    // @ts-expect-error
    jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
      callback();
    });
    await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
      // 1 shard + empty shard + 1 retry = 3
      expect(response).toHaveLength(3);
      expect(datasource.runQuery).toHaveBeenCalledTimes(3);
    });
  });
  test('Does not retry on other errors', async () => {
    jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
    // A non-retriable error message: the failed request must not be re-issued.
    jest
      .spyOn(datasource, 'runQuery')
      .mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'nope nope' }], data: [] }));
    // @ts-expect-error
    jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
      callback();
    });
    await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
      expect(datasource.runQuery).toHaveBeenCalledTimes(2);
    });
  });
  // Walks the dynamic group-size algorithm: each mocked response carries an exec-time
  // stat that drives updateGroupSizeFromResponse() / constrainGroupSize().
  test('Adjusts the group size based on errors and execution time', async () => {
    const request = createRequest(
      [{ expr: 'count_over_time($SELECTOR[1m])', refId: 'A', direction: LokiQueryDirection.Scan }],
      {
        range: {
          from: dateTime('2024-11-13T05:00:00.000Z'),
          to: dateTime('2024-11-14T06:00:00.000Z'),
          raw: {
            from: dateTime('2024-11-13T05:00:00.000Z'),
            to: dateTime('2024-11-14T06:00:00.000Z'),
          },
        },
      }
    );
    // 11 shards → initial group size floor(sqrt(11)) = 3.
    jest
      .mocked(datasource.languageProvider.fetchLabelValues)
      .mockResolvedValue(['1', '10', '2', '20', '3', '4', '5', '6', '7', '8', '9']);
    // @ts-expect-error
    jest.spyOn(global, 'setTimeout').mockImplementationOnce((callback) => {
      callback();
    });
    const { metricFrameA } = getMockFrames();
    jest.mocked(datasource.runQuery).mockReset();
    // + 50%
    jest.mocked(datasource.runQuery).mockReturnValueOnce(
      of({
        data: [
          {
            ...metricFrameA,
            meta: {
              ...metricFrameA.meta,
              stats: [
                ...metricFrameA.meta!.stats!,
                {
                  displayName: 'Summary: exec time',
                  unit: 's',
                  value: 0.5,
                },
              ],
            },
          },
        ],
      })
    );
    // sqrt(currentSize)
    jest
      .mocked(datasource.runQuery)
      .mockReturnValueOnce(of({ state: LoadingState.Error, errors: [{ refId: 'A', message: 'timeout' }], data: [] }));
    // +10%
    jest.mocked(datasource.runQuery).mockReturnValueOnce(
      of({
        data: [
          {
            ...metricFrameA,
            meta: {
              ...metricFrameA.meta,
              stats: [
                ...metricFrameA.meta!.stats!,
                {
                  displayName: 'Summary: exec time',
                  unit: 's',
                  value: 5,
                },
              ],
            },
          },
        ],
      })
    );
    // -10%
    jest.mocked(datasource.runQuery).mockReturnValueOnce(
      of({
        data: [
          {
            ...metricFrameA,
            meta: {
              ...metricFrameA.meta,
              stats: [
                ...metricFrameA.meta!.stats!,
                {
                  displayName: 'Summary: exec time',
                  unit: 's',
                  value: 15,
                },
              ],
            },
          },
        ],
      })
    );
    // -10%
    jest.mocked(datasource.runQuery).mockReturnValueOnce(
      of({
        data: [
          {
            ...metricFrameA,
            meta: {
              ...metricFrameA.meta,
              stats: [
                ...metricFrameA.meta!.stats!,
                {
                  displayName: 'Summary: exec time',
                  unit: 's',
                  value: 19,
                },
              ],
            },
          },
        ],
      })
    );
    // -50%
    jest.mocked(datasource.runQuery).mockReturnValueOnce(
      of({
        data: [
          {
            ...metricFrameA,
            meta: {
              ...metricFrameA.meta,
              stats: [
                ...metricFrameA.meta!.stats!,
                {
                  displayName: 'Summary: exec time',
                  unit: 's',
                  value: 21,
                },
              ],
            },
          },
        ],
      })
    );
    // No more than 50% of the remaining shards
    jest.mocked(datasource.runQuery).mockReturnValue(
      of({
        data: [
          {
            ...metricFrameA,
            meta: {
              ...metricFrameA.meta,
              stats: [
                ...metricFrameA.meta!.stats!,
                {
                  displayName: 'Summary: exec time',
                  unit: 's',
                  value: 0.5,
                },
              ],
            },
          },
        ],
      })
    );
    await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith(() => {
      expect(datasource.runQuery).toHaveBeenCalledWith({
        intervalMs: expect.any(Number),
        range: expect.any(Object),
        requestId: 'TEST_shard_0_0_3',
        targets: [
          {
            expr: 'count_over_time({a="b", __stream_shard__=~"20|10|9"}[1m])',
            refId: 'A',
            direction: LokiQueryDirection.Scan,
          },
        ],
      });
      // +50%
      expect(datasource.runQuery).toHaveBeenCalledWith({
        intervalMs: expect.any(Number),
        range: expect.any(Object),
        requestId: 'TEST_shard_0_3_4',
        targets: [
          {
            expr: 'count_over_time({a="b", __stream_shard__=~"8|7|6|5"}[1m])',
            refId: 'A',
            direction: LokiQueryDirection.Scan,
          },
        ],
      });
      // Error, sqrt(currentSize)
      expect(datasource.runQuery).toHaveBeenCalledWith({
        intervalMs: expect.any(Number),
        range: expect.any(Object),
        requestId: 'TEST_shard_0_3_2',
        targets: [
          {
            expr: 'count_over_time({a="b", __stream_shard__=~"8|7"}[1m])',
            refId: 'A',
            direction: LokiQueryDirection.Scan,
          },
        ],
      });
      // +10%
      expect(datasource.runQuery).toHaveBeenCalledWith({
        intervalMs: expect.any(Number),
        range: expect.any(Object),
        requestId: 'TEST_shard_0_5_3',
        targets: [
          {
            expr: 'count_over_time({a="b", __stream_shard__=~"6|5|4"}[1m])',
            refId: 'A',
            direction: LokiQueryDirection.Scan,
          },
        ],
      });
      // -10%
      expect(datasource.runQuery).toHaveBeenCalledWith({
        intervalMs: expect.any(Number),
        range: expect.any(Object),
        requestId: 'TEST_shard_0_8_2',
        targets: [
          {
            expr: 'count_over_time({a="b", __stream_shard__=~"3|2"}[1m])',
            refId: 'A',
            direction: LokiQueryDirection.Scan,
          },
        ],
      });
      // No more than 50% of the remaining shards
      expect(datasource.runQuery).toHaveBeenCalledWith({
        intervalMs: expect.any(Number),
        range: expect.any(Object),
        requestId: 'TEST_shard_0_10_1',
        targets: [
          {
            expr: 'count_over_time({a="b", __stream_shard__="1"}[1m])',
            refId: 'A',
            direction: LokiQueryDirection.Scan,
          },
        ],
      });
      // No more than 50% of the remaining shards
      expect(datasource.runQuery).toHaveBeenCalledWith({
        intervalMs: expect.any(Number),
        range: expect.any(Object),
        requestId: 'TEST_shard_0_11_1',
        targets: [
          { expr: 'count_over_time({a="b", __stream_shard__=""}[1m])', refId: 'A', direction: LokiQueryDirection.Scan },
        ],
      });
    });
  });
});

View File

@ -0,0 +1,353 @@
import { groupBy, partition } from 'lodash';
import { Observable, Subscriber, Subscription } from 'rxjs';
import { v4 as uuidv4 } from 'uuid';
import { DataQueryRequest, LoadingState, DataQueryResponse, QueryResultMetaStat } from '@grafana/data';
import { LokiDatasource } from './datasource';
import { combineResponses, replaceResponses } from './mergeResponses';
import { adjustTargetsFromResponseState, runSplitQuery } from './querySplitting';
import { getSelectorForShardValues, interpolateShardingSelector, requestSupportsSharding } from './queryUtils';
import { isRetriableError } from './responseUtils';
import { LokiQuery } from './types';
/**
* Query splitting by stream shards.
* Query splitting was introduced in Loki to optimize querying for long intervals and high volume of data,
* dividing a big request into smaller sub-requests, combining and displaying the results as they arrive.
*
* This approach, inspired by the time-based query splitting, takes advantage of the __stream_shard__
* internal label, representing how data is spread into different sources that can be queried individually.
*
* The main entry point of this module is runShardSplitQuery(), which prepares the query for execution and
* passes it to splitQueriesByStreamShard() to begin the querying loop.
*
* splitQueriesByStreamShard() has the following structure:
* - Creates and returns an Observable to which the UI will subscribe
* - Requests the __stream_shard__ values of the selected service:
* . If there are no shard values, it falls back to the standard querying approach of the data source in runNonSplitRequest()
* . If there are shards:
* - It sorts them by value, descending. Higher shard numbers correspond with the least volume.
* - It defines an initial group size, roughly Math.sqrt(amountOfShards).
* - It begins the querying loop with runNextRequest().
* - runNextRequest() will create a group of groupSize shards from the nth shard (cycle), and has the following internal structure:
* . groupShardRequests() returns an array of shards from cycle to cycle + groupSize.
* . interpolateShardingSelector() will update the stream selector with the shard numbers in the current group.
* . After query execution:
* - If the response is successful:
* . It will add new data to the response with combineResponses()
* . Using the data and meta data of the response, updateGroupSizeFromResponse() will increase or decrease the group size.
* . nextRequest() will use the current cycle and group size to determine the next request or complete execution with done().
* - If the response is unsuccessful:
 * . If the response is not a query error, and the group size is bigger than 1, it will decrease the group size.
* . If the group size is already 1, it will retry the request up to 4 times.
* . If there are retry attempts, it will retry the current cycle, or else stop querying.
* - Once all request groups have been executed, it will be done().
*/
/**
 * Entry point for stream-shard query splitting: interpolates template variables
 * in the request targets, drops hidden and empty queries, and hands the rest to
 * the shard-splitting loop.
 */
export function runShardSplitQuery(datasource: LokiDatasource, request: DataQueryRequest<LokiQuery>) {
  const interpolated = datasource.interpolateVariablesInQueries(request.targets, request.scopedVars);
  const splittingTargets = interpolated.filter((query) => Boolean(query.expr) && !query.hide);
  return splitQueriesByStreamShard(datasource, request, splittingTargets);
}
/**
 * Creates the Observable driving the shard-splitting loop described in the module
 * comment above. Targets are first partitioned into groups (per stream selector,
 * plus a non-sharded group), then runNextRequest() issues one sub-request per shard
 * group slice, merging results into a single streamed DataQueryResponse.
 */
function splitQueriesByStreamShard(
  datasource: LokiDatasource,
  request: DataQueryRequest<LokiQuery>,
  splittingTargets: LokiQuery[]
) {
  // Shared mutable state across the recursive runNextRequest() calls.
  let shouldStop = false;
  let mergedResponse: DataQueryResponse = { data: [], state: LoadingState.Streaming, key: uuidv4() };
  let subquerySubscription: Subscription | null = null;
  // Retry attempts per "<group>_<cycle>" key.
  let retriesMap = new Map<string, number>();
  let retryTimer: ReturnType<typeof setTimeout> | null = null;
  const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, group: number, groups: ShardedQueryGroup[]) => {
    // nextGroupSize may be adjusted by the response handler before moving on.
    let nextGroupSize = groups[group].groupSize;
    const { shards, groupSize, cycle } = groups[group];
    let retrying = false;
    // Only one sub-request is in flight at a time; drop any previous subscription.
    if (subquerySubscription != null) {
      subquerySubscription.unsubscribe();
      subquerySubscription = null;
    }
    if (shouldStop) {
      subscriber.complete();
      return;
    }
    const done = () => {
      mergedResponse.state = LoadingState.Done;
      subscriber.next(mergedResponse);
      subscriber.complete();
    };
    // Advances to the next group with pending requests, or finishes.
    const nextRequest = () => {
      const nextGroup =
        groups[group + 1] && groupHasPendingRequests(groups[group + 1])
          ? groups[group + 1]
          : groups.find((shardGroup) => groupHasPendingRequests(shardGroup));
      if (nextGroup === undefined) {
        done();
        return;
      }
      // Persist the size adjusted from the last response before recursing.
      groups[group].groupSize = nextGroupSize;
      runNextRequest(subscriber, groups.indexOf(nextGroup), groups);
    };
    // Returns true when the current cycle will be re-run (smaller group or backoff retry).
    const retry = (errorResponse?: DataQueryResponse) => {
      try {
        if (errorResponse && !isRetriableError(errorResponse)) {
          return false;
        }
      } catch (e) {
        // isRetriableError() throws for parse/max-entries errors: stop querying.
        console.error(e);
        shouldStop = true;
        return false;
      }
      if (groupSize !== undefined && groupSize > 1) {
        // First shrink the group before retrying the same cycle.
        groups[group].groupSize = Math.floor(Math.sqrt(groupSize));
        debug(`Possible time out, new group size ${groups[group].groupSize}`);
        retrying = true;
        runNextRequest(subscriber, group, groups);
        return true;
      }
      const key = `${group}_${cycle}`;
      const retries = retriesMap.get(key) ?? 0;
      if (retries > 3) {
        shouldStop = true;
        return false;
      }
      retriesMap.set(key, retries + 1);
      retryTimer = setTimeout(
        () => {
          console.warn(`Retrying ${group} ${cycle} (${retries + 1})`);
          runNextRequest(subscriber, group, groups);
          retryTimer = null;
        },
        1500 * Math.pow(2, retries)
      ); // Exponential backoff
      retrying = true;
      return true;
    };
    // Drop targets already satisfied by the merged response (e.g. maxLines reached).
    const targets = adjustTargetsFromResponseState(groups[group].targets, mergedResponse);
    if (!targets.length) {
      nextRequest();
      return;
    }
    // Non-sharded groups produce an empty shard list and are delegated below.
    const shardsToQuery =
      shards && cycle !== undefined && groupSize ? groupShardRequests(shards, cycle, groupSize) : [];
    const subRequest = { ...request, targets: interpolateShardingSelector(targets, shardsToQuery) };
    // Request may not have a request id
    if (request.requestId) {
      subRequest.requestId =
        shardsToQuery.length > 0 ? `${request.requestId}_shard_${group}_${cycle}_${groupSize}` : request.requestId;
    }
    debug(shardsToQuery.length ? `Querying ${shardsToQuery.join(', ')}` : 'Running regular query');
    // Sharded groups query the datasource directly; others go through time splitting.
    const queryRunner =
      shardsToQuery.length > 0 ? datasource.runQuery.bind(datasource) : runSplitQuery.bind(null, datasource);
    subquerySubscription = queryRunner(subRequest).subscribe({
      next: (partialResponse: DataQueryResponse) => {
        if ((partialResponse.errors ?? []).length > 0 || partialResponse.error != null) {
          if (retry(partialResponse)) {
            return;
          }
        }
        if (groupSize && cycle !== undefined && shards !== undefined) {
          // Tune the next group size from exec-time stats, capped by remaining shards.
          nextGroupSize = constrainGroupSize(
            cycle + groupSize,
            updateGroupSizeFromResponse(partialResponse, groups[group]),
            shards.length
          );
          if (nextGroupSize !== groupSize) {
            debug(`New group size ${nextGroupSize}`);
          }
        }
        mergedResponse =
          shardsToQuery.length > 0
            ? combineResponses(mergedResponse, partialResponse)
            : replaceResponses(mergedResponse, partialResponse);
        // When we delegate query running to runSplitQuery(), we will receive partial updates here, and complete
        // will be called when all the sub-requests were completed, so we need to show partial progress here.
        if (shardsToQuery.length === 0) {
          subscriber.next(mergedResponse);
        }
      },
      complete: () => {
        if (retrying) {
          return;
        }
        subscriber.next(mergedResponse);
        nextRequest();
      },
      error: (error: unknown) => {
        console.error(error, { msg: 'failed to shard' });
        subscriber.next(mergedResponse);
        if (retry()) {
          return;
        }
        nextRequest();
      },
    });
  };
  const response = new Observable<DataQueryResponse>((subscriber) => {
    groupTargetsByQueryType(splittingTargets, datasource, request).then((groupedRequests) => {
      runNextRequest(subscriber, 0, groupedRequests);
    });
    // Teardown: stop the loop, cancel pending retries, and drop the in-flight request.
    return () => {
      shouldStop = true;
      if (retryTimer) {
        clearTimeout(retryTimer);
      }
      if (subquerySubscription != null) {
        subquerySubscription.unsubscribe();
        subquerySubscription = null;
      }
    };
  });
  return response;
}
/**
 * State for one group of queries sharing a stream selector (or the non-sharded group).
 */
interface ShardedQueryGroup {
  // Queries to execute for this group.
  targets: LokiQuery[];
  // Shard values sorted descending; undefined when the group is not sharded.
  shards?: number[];
  // How many shards to bundle into the next sub-request; adjusted dynamically.
  groupSize?: number;
  // Index into `shards` where the next sub-request starts; undefined for non-sharded groups.
  cycle?: number;
}
/**
 * Partitions targets into shardable and non-shardable groups and, for each unique
 * stream selector among the shardable ones, fetches the available __stream_shard__
 * values. Groups whose shard fetch fails (or yields no shards) fall back to
 * non-sharded execution (no shards/groupSize).
 */
async function groupTargetsByQueryType(
  targets: LokiQuery[],
  datasource: LokiDatasource,
  request: DataQueryRequest<LokiQuery>
) {
  const [shardedQueries, otherQueries] = partition(targets, (query) => requestSupportsSharding([query]));
  const groups: ShardedQueryGroup[] = [];
  if (otherQueries.length) {
    groups.push({
      targets: otherQueries,
    });
  }
  const selectorPartition = groupBy(shardedQueries, (query) => getSelectorForShardValues(query.expr));
  for (const selector in selectorPartition) {
    try {
      const values = await datasource.languageProvider.fetchLabelValues('__stream_shard__', {
        timeRange: request.range,
        streamSelector: selector,
      });
      // Shard values are numeric; drop anything non-numeric so sorting and grouping stay sane.
      const shards = values.map((value) => parseInt(value, 10)).filter((shard) => !Number.isNaN(shard));
      // Guard on length: an empty array is truthy, so `if (shards)` would always pass.
      if (shards.length) {
        // Sort descending: higher shard numbers correspond with the least volume.
        shards.sort((a, b) => b - a);
        debug(`Querying ${selector} with shards ${shards.join(', ')}`);
      }
      groups.push({
        targets: selectorPartition[selector],
        shards: shards.length ? shards : undefined,
        groupSize: shards.length ? getInitialGroupSize(shards) : undefined,
        cycle: 0,
      });
    } catch (error) {
      console.error(error, { msg: 'failed to fetch label values for __stream_shard__' });
      groups.push({
        targets: selectorPartition[selector],
      });
    }
  }
  return groups;
}
/**
 * Reports whether the group still has shards left to query, and — note the side
 * effect — advances the group's cycle cursor by groupSize (clamped to the shard
 * count) in the same call.
 */
function groupHasPendingRequests(group: ShardedQueryGroup) {
  const { cycle, groupSize, shards } = group;
  if (cycle === undefined || !groupSize || !shards) {
    // Non-sharded groups are single-shot: never pending.
    return false;
  }
  const advancedCycle = Math.min(cycle + groupSize, shards.length);
  group.cycle = advancedCycle;
  return cycle < shards.length && advancedCycle <= shards.length;
}
/**
 * Computes the next group size from the response's "Summary: exec time" stat:
 * fast responses (<=1s) grow the group by 50%, moderately fast (<6s) by 10%;
 * slow responses (<20s) shrink it by 10% and very slow ones (>=20s) halve it.
 * Empty responses grow the group by one; responses without the stat keep it.
 */
function updateGroupSizeFromResponse(response: DataQueryResponse, group: ShardedQueryGroup) {
  const currentSize = group.groupSize;
  if (!currentSize) {
    return 1;
  }
  if (response.data.length === 0) {
    // Empty response, increase group size
    return currentSize + 1;
  }
  const execTimeStat: QueryResultMetaStat | undefined = response.data[0].meta?.stats?.find(
    (stat: QueryResultMetaStat) => stat.displayName === 'Summary: exec time'
  );
  if (!execTimeStat) {
    // No timing information: keep the current size.
    return currentSize;
  }
  const executionTime = Math.round(execTimeStat.value);
  debug(`${execTimeStat.value}`);
  // Positive scenarios
  if (executionTime <= 1) {
    return Math.floor(currentSize * 1.5);
  }
  if (executionTime < 6) {
    return Math.ceil(currentSize * 1.1);
  }
  // Negative scenarios
  if (currentSize === 1) {
    return currentSize;
  }
  if (executionTime < 20) {
    return Math.ceil(currentSize * 0.9);
  }
  return Math.floor(currentSize / 2);
}
/**
 * Prevents the group size from ever exceeding maxFactor (70%) of the shards that
 * remain to be queried, while never dropping below 1.
 */
function constrainGroupSize(cycle: number, groupSize: number, shards: number) {
  const maxFactor = 0.7;
  const cap = Math.max(Math.floor((shards - cycle) * maxFactor), 1);
  return Math.min(groupSize, cap);
}
/**
 * Returns the slice of up to groupSize shards starting at `start`. Once the
 * cursor is past the end of the list, returns [-1] — the sentinel for the
 * "empty shard" (streams without a __stream_shard__ value).
 */
function groupShardRequests(shards: number[], start: number, groupSize: number) {
  return start === shards.length ? [-1] : shards.slice(start, start + groupSize);
}
/**
 * Initial group size: roughly the square root of the shard count, rounded down.
 */
function getInitialGroupSize(shards: number[]) {
  const root = Math.sqrt(shards.length);
  return Math.floor(root);
}
// Debug logging for the sharding loop, toggled at load time via localStorage.
const DEBUG_ENABLED = Boolean(localStorage.getItem(`loki.sharding_debug_enabled`));
// Logs the message only when the debug flag is set.
function debug(message: string) {
  if (DEBUG_ENABLED) {
    console.log(message);
  }
}