Loki shard splitting: QOL improvements (#95602)

* Scan direction: add experimental badge

* Logs model: keep data when the response contains errors

* SupplementaryResultError: extend behaviors

* LogsVolumePanel: add custom message for partial shard data

* SupplementaryResultError: add size prop

* SupplementaryResultError: remove size prop and adjust sizes

* Infinite scroll: disable when direction is scan

* Fix lint issues

* logsModel: add logs volume test

* chore: unfocus test

* modifyQuery: add function to add drop statement to queries

* Shard query splitting: split metric queries

* Shard query splitting: drop stream shard from metric queries

* Fix tests

* logsModel: skip coverage data when the direction is scan
This commit is contained in:
Matias Chomicki 2024-10-31 17:58:02 +01:00 committed by GitHub
parent 9f5258717e
commit f539a70d6d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 515 additions and 111 deletions

View File

@ -58,6 +58,7 @@ import { LogRows } from 'app/features/logs/components/LogRows';
import { LogRowContextModal } from 'app/features/logs/components/log-context/LogRowContextModal';
import { LogLevelColor, dedupLogRows, filterLogLevels } from 'app/features/logs/logsModel';
import { getLogLevel, getLogLevelFromKey, getLogLevelInfo } from 'app/features/logs/utils';
import { LokiQueryDirection } from 'app/plugins/datasource/loki/dataquery.gen';
import { getState } from 'app/store/store';
import { ExploreItemState, useDispatch } from 'app/types';
@ -733,6 +734,9 @@ const UnthemedLogs: React.FunctionComponent<Props> = (props: Props) => {
const filteredLogs = filterRows(logRows, hiddenLogLevels);
const { dedupedRows, dedupCount } = dedupRows(filteredLogs, dedupStrategy);
const navigationRange = createNavigationRange(logRows);
const infiniteScrollAvailable = !logsQueries?.some(
(query) => 'direction' in query && query.direction === LokiQueryDirection.Scan
);
return (
<>
@ -932,7 +936,7 @@ const UnthemedLogs: React.FunctionComponent<Props> = (props: Props) => {
>
<InfiniteScroll
loading={loading}
loadMoreLogs={loadMoreLogs}
loadMoreLogs={infiniteScrollAvailable ? loadMoreLogs : undefined}
range={props.range}
timeZone={timeZone}
rows={logRows}

View File

@ -16,6 +16,7 @@ import {
TimeRange,
TimeZone,
} from '@grafana/data';
import { config } from '@grafana/runtime';
import { Button, InlineField, Alert, useStyles2, SeriesVisibilityChangeMode } from '@grafana/ui';
import { Trans } from 'app/core/internationalization';
@ -87,6 +88,8 @@ export const LogsVolumePanelList = ({
return !isLogsVolumeLimited(data) && zoomRatio && zoomRatio < 1;
});
const canShowPartialData =
config.featureToggles.lokiShardSplitting && logsVolumeData && logsVolumeData.data.length > 0;
const timeoutError = isTimeoutErrorResponse(logsVolumeData);
const from = dateTime(Math.max(absoluteRange.from, allLogsVolumeMaximumRange.from));
@ -95,7 +98,7 @@ export const LogsVolumePanelList = ({
if (logsVolumeData?.state === LoadingState.Loading) {
return <span>Loading...</span>;
} else if (timeoutError) {
} else if (timeoutError && !canShowPartialData) {
return (
<SupplementaryResultError
title="Unable to show log volume"
@ -127,7 +130,7 @@ export const LogsVolumePanelList = ({
onRemove={onClose}
/>
);
} else if (logsVolumeData?.error !== undefined) {
} else if (logsVolumeData?.error !== undefined && !canShowPartialData) {
return <SupplementaryResultError error={logsVolumeData.error} title="Failed to load log volume for this query" />;
}
@ -143,6 +146,14 @@ export const LogsVolumePanelList = ({
return (
<div className={styles.listContainer}>
{timeoutError && canShowPartialData && (
<SupplementaryResultError
title="Showing partial data"
message="The query is trying to access too much data and some sharded requests could not be completed. Try decreasing the time range or adding more labels to your query."
severity="info"
dismissable
/>
)}
{Object.keys(logVolumes).map((name, index) => {
return (
<LogsVolumePanel

View File

@ -1,5 +1,5 @@
import { css } from '@emotion/css';
import { ReactNode, useState } from 'react';
import { ReactNode, useCallback, useState } from 'react';
import { DataQueryError, GrafanaTheme2 } from '@grafana/data';
import { Alert, AlertVariant, Button, useTheme2 } from '@grafana/ui';
@ -12,12 +12,14 @@ type Props = {
suggestedAction?: string;
onSuggestedAction?(): void;
onRemove?(): void;
dismissable?: boolean;
};
const SHORT_ERROR_MESSAGE_LIMIT = 100;
export function SupplementaryResultError(props: Props) {
const [isOpen, setIsOpen] = useState(false);
const [dismissed, setDismissed] = useState(false);
const { error, title, suggestedAction, onSuggestedAction, onRemove, severity = 'warning' } = props;
const { dismissable, error, title, suggestedAction, onSuggestedAction, onRemove, severity = 'warning' } = props;
// generic get-error-message-logic, taken from
// /public/app/features/explore/ErrorContainer.tsx
const message = props.message ?? error?.message ?? error?.data?.message ?? '';
@ -25,11 +27,21 @@ export function SupplementaryResultError(props: Props) {
const theme = useTheme2();
const styles = getStyles(theme);
const dismiss = useCallback(() => {
setDismissed(true);
}, []);
const handleRemove = dismissable ? dismiss : onRemove;
if (dismissed) {
return null;
}
return (
<div className={styles.supplementaryErrorContainer}>
<Alert title={title} severity={severity} onRemove={onRemove}>
<Alert title={title} severity={severity} onRemove={handleRemove}>
{showButton ? (
<div className={styles.suggestedActionWrapper}>
<div className={styles.messageWrapper}>
{!isOpen ? (
<Button
variant="secondary"
@ -45,7 +57,7 @@ export function SupplementaryResultError(props: Props) {
)}
</div>
) : (
<div className={styles.suggestedActionWrapper}>
<div className={`${styles.messageWrapper} ${styles.suggestedActionWrapper}`}>
{message}
{suggestedAction && onSuggestedAction && (
<Button variant="primary" size="xs" onClick={onSuggestedAction}>
@ -62,26 +74,24 @@ export function SupplementaryResultError(props: Props) {
const getStyles = (theme: GrafanaTheme2) => {
return {
supplementaryErrorContainer: css({
width: '50%',
width: '60%',
minWidth: `${theme.breakpoints.values.sm}px`,
maxWidth: `${theme.breakpoints.values.md}px`,
margin: '0 auto',
[theme.breakpoints.down('lg')]: {
width: '70%',
},
[theme.breakpoints.down('md')]: {
width: '100%',
},
}),
suggestedActionWrapper: css({
messageWrapper: css({
minHeight: theme.spacing(3),
['button']: {
position: 'absolute',
right: theme.spacing(2),
bottom: theme.spacing(2),
},
['ul']: {
paddingLeft: theme.spacing(2),
},
['button']: {
position: 'absolute',
bottom: theme.spacing(2),
right: theme.spacing(2),
},
}),
suggestedActionWrapper: css({
paddingBottom: theme.spacing(5),
}),
};
};

View File

@ -162,7 +162,7 @@ export const InfiniteScroll = ({
<>
{upperLoading && <LoadingIndicator adjective={sortOrder === LogsSortOrder.Descending ? 'newer' : 'older'} />}
{!hideTopMessage && upperOutOfRange && outOfRangeMessage}
{sortOrder === LogsSortOrder.Ascending && app === CoreApp.Explore && (
{sortOrder === LogsSortOrder.Ascending && app === CoreApp.Explore && loadMoreLogs && (
<Button className={styles.navButton} variant="secondary" onClick={loadOlderLogs} disabled={loading}>
<div className={styles.navButtonContent}>
<Icon name="angle-up" size="lg" />

View File

@ -10,6 +10,7 @@ import {
DataTopic,
dateTimeParse,
FieldType,
getDefaultTimeRange,
LoadingState,
LogLevel,
LogRowModel,
@ -22,6 +23,7 @@ import {
} from '@grafana/data';
import { config } from '@grafana/runtime';
import { getMockFrames } from 'app/plugins/datasource/loki/__mocks__/frames';
import { LokiQueryDirection } from 'app/plugins/datasource/loki/dataquery.gen';
import { MockObservableDataSourceApi } from '../../../test/mocks/datasource_srv';
@ -481,39 +483,7 @@ describe('dataFrameToLogsModel', () => {
});
it('given one series with limit as custom meta property should return correct limit', () => {
const series: DataFrame[] = [
createDataFrame({
fields: [
{
name: 'time',
type: FieldType.time,
values: ['2019-04-26T09:28:11.352440161Z', '2019-04-26T14:42:50.991981292Z'],
},
{
name: 'message',
type: FieldType.string,
values: [
't=2019-04-26T11:05:28+0200 lvl=info msg="Initializing DatasourceCacheService" logger=server',
't=2019-04-26T16:42:50+0200 lvl=eror msg="new token…t unhashed token=56d9fdc5c8b7400bd51b060eea8ca9d7',
],
labels: {
filename: '/var/log/grafana/grafana.log',
job: 'grafana',
},
},
{
name: 'id',
type: FieldType.string,
values: ['foo', 'bar'],
},
],
meta: {
custom: {
limit: 1000,
},
},
}),
];
const series: DataFrame[] = getTestDataFrame();
const logsModel = dataFrameToLogsModel(series, 1);
expect(logsModel.meta![1]).toMatchObject({
label: LIMIT_LABEL,
@ -522,6 +492,81 @@ describe('dataFrameToLogsModel', () => {
});
});
it('should return the expected meta when the line limit is reached', () => {
const series: DataFrame[] = getTestDataFrame();
series[0].meta = {
custom: {
limit: 2,
},
};
const timeRange = {
from: 1556270800000,
to: 1556270899999,
};
const queries = [
{
expr: 'test',
refId: 'A',
},
];
const logsModel = dataFrameToLogsModel(
series,
1,
{ from: timeRange.from.valueOf(), to: timeRange.to.valueOf() },
queries
);
expect(logsModel.meta).toEqual([
{
label: 'Common labels',
value: { filename: '/var/log/grafana/grafana.log', job: 'grafana' },
kind: 2,
},
{
label: 'Line limit',
value: '2 reached, received logs cover 8.65% (9sec) of your selected time range (1min 40sec)',
kind: 1,
},
]);
});
it('should skip the time coverage when the query direction is Scan', () => {
const series: DataFrame[] = getTestDataFrame();
series[0].meta = {
custom: {
limit: 2,
},
};
const timeRange = {
from: 1556270800000,
to: 1556270899999,
};
const queries = [
{
expr: 'test',
refId: 'A',
direction: LokiQueryDirection.Scan,
},
];
const logsModel = dataFrameToLogsModel(
series,
1,
{ from: timeRange.from.valueOf(), to: timeRange.to.valueOf() },
queries
);
expect(logsModel.meta).toEqual([
{
label: 'Common labels',
value: { filename: '/var/log/grafana/grafana.log', job: 'grafana' },
kind: 2,
},
{
label: 'Line limit',
value: '2 reached',
kind: 1,
},
]);
});
it('given one series with labels-field should return expected logs model', () => {
const series: DataFrame[] = [
createDataFrame({
@ -1714,6 +1759,117 @@ describe('logs sample', () => {
});
});
describe('logs volume', () => {
class TestDataQuery implements DataQuery {
refId = 'A';
target = '';
}
let logsVolumeProvider: Observable<DataQueryResponse>,
datasource: MockObservableDataSourceApi,
request: DataQueryRequest<TestDataQuery>;
function createFrame() {
return toDataFrame({
fields: [
{
name: 'Time',
type: FieldType.time,
config: {},
values: [3000000, 4000000],
},
{
name: 'Value',
type: FieldType.number,
config: {},
values: [5, 4],
labels: {
level: 'debug',
},
},
],
});
}
function setup(datasourceSetup: () => void) {
datasourceSetup();
request = {
targets: [{ target: 'logs sample query 1' }, { target: 'logs sample query 2' }],
range: getDefaultTimeRange(),
scopedVars: {},
} as unknown as DataQueryRequest<TestDataQuery>;
logsVolumeProvider = queryLogsVolume(datasource, request, { targets: request.targets });
}
const dataFrame = createFrame();
function setupResult() {
datasource = new MockObservableDataSourceApi('loki', [
{
data: [dataFrame],
},
]);
}
function setupError() {
datasource = new MockObservableDataSourceApi('loki', [], undefined, 'Error message');
}
function setupErrorWithData() {
datasource = new MockObservableDataSourceApi(
'loki',
[
{
data: [dataFrame],
},
],
undefined,
'Error message'
);
}
it('returns logs volume data', async () => {
setup(setupResult);
await expect(logsVolumeProvider).toEmitValuesWith((received) => {
expect(received).toContainEqual({ state: LoadingState.Loading, error: undefined, data: [] });
expect(received).toContainEqual(
expect.objectContaining({
data: expect.arrayContaining([dataFrame]),
})
);
});
});
it('returns errors', async () => {
setup(setupError);
await expect(logsVolumeProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([
{ state: LoadingState.Loading, error: undefined, data: [] },
{
state: LoadingState.Error,
error: 'Error message',
data: [],
},
'Error message',
]);
});
});
it('returns errors and data', async () => {
setup(setupErrorWithData);
await expect(logsVolumeProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([
{ state: LoadingState.Loading, error: undefined, data: [] },
{
state: LoadingState.Error,
error: 'Error message',
data: [],
},
'Error message',
]);
});
});
});
const mockLogRow = {
dataFrame: toDataFrame({
fields: [
@ -1770,3 +1926,39 @@ describe('logRowToDataFrame', () => {
expect(result?.refId).toBe(mockLogRow.dataFrame.refId);
});
});
function getTestDataFrame() {
return [
createDataFrame({
fields: [
{
name: 'time',
type: FieldType.time,
values: ['2019-04-26T09:28:11.352440161Z', '2019-04-26T14:42:50.991981292Z'],
},
{
name: 'message',
type: FieldType.string,
values: [
't=2019-04-26T11:05:28+0200 lvl=info msg="Initializing DatasourceCacheService" logger=server',
't=2019-04-26T16:42:50+0200 lvl=eror msg="new token…t unhashed token=56d9fdc5c8b7400bd51b060eea8ca9d7',
],
labels: {
filename: '/var/log/grafana/grafana.log',
job: 'grafana',
},
},
{
name: 'id',
type: FieldType.string,
values: ['foo', 'bar'],
},
],
meta: {
custom: {
limit: 1000,
},
},
}),
];
}

View File

@ -44,6 +44,7 @@ import { config } from '@grafana/runtime';
import { BarAlignment, GraphDrawStyle, StackingMode } from '@grafana/schema';
import { colors } from '@grafana/ui';
import { getThemeColor } from 'app/core/utils/colors';
import { LokiQueryDirection } from 'app/plugins/datasource/loki/types';
import { LogsFrame, parseLogsFrame } from './logsFrame';
import { createLogRowsMap, getLogLevel, getLogLevelFromKey, sortInAscendingOrder } from './utils';
@ -222,6 +223,7 @@ export function dataFrameToLogsModel(
const logsModel = logSeriesToLogsModel(logSeries, queries, Boolean(deduplicateResults));
if (logsModel) {
logsModel.queries = queries;
// Create histogram metrics from logs using the interval as bucket size for the line count
if (intervalMs && logsModel.rows.length > 0) {
const sortedRows = logsModel.rows.sort(sortInAscendingOrder);
@ -240,7 +242,6 @@ export function dataFrameToLogsModel(
} else {
logsModel.series = [];
}
logsModel.queries = queries;
return logsModel;
}
@ -556,11 +557,19 @@ function adjustMetaInfo(logsModel: LogsModel, visibleRangeMs?: number, requested
let metaLimitValue;
if (limit === logsModel.rows.length && visibleRangeMs && requestedRangeMs) {
const coverage = ((visibleRangeMs / requestedRangeMs) * 100).toFixed(2);
metaLimitValue = `${limit} reached`;
metaLimitValue = `${limit} reached, received logs cover ${coverage}% (${rangeUtil.msRangeToTimeString(
visibleRangeMs
)}) of your selected time range (${rangeUtil.msRangeToTimeString(requestedRangeMs)})`;
// Scan is a special Loki query direction which potentially returns fewer logs than expected.
const canShowCoverage = !logsModel.queries?.some(
(query) => 'direction' in query && query.direction === LokiQueryDirection.Scan
);
if (canShowCoverage) {
const coverage = ((visibleRangeMs / requestedRangeMs) * 100).toFixed(2);
metaLimitValue += `, received logs cover ${coverage}% (${rangeUtil.msRangeToTimeString(
visibleRangeMs
)}) of your selected time range (${rangeUtil.msRangeToTimeString(requestedRangeMs)})`;
}
} else {
const description = config.featureToggles.logsInfiniteScrolling ? 'displayed' : 'returned';
metaLimitValue = `${limit} (${logsModel.rows.length} ${description})`;
@ -683,7 +692,7 @@ export function queryLogsVolume<TQuery extends DataQuery, TOptions extends DataS
observer.next({
state: LoadingState.Error,
error,
data: [],
data: dataQueryResponse.data,
});
observer.error(error);
} else {

View File

@ -36,12 +36,16 @@ export const queryDirections: Array<SelectableValue<LokiQueryDirection>> = [
label: 'Forward',
description: 'Search in forward direction.',
},
{
];
if (config.featureToggles.lokiShardSplitting) {
queryDirections.push({
value: LokiQueryDirection.Scan,
label: 'Scan',
description: 'Split the query into smaller units and stop at the requested log line limit.',
},
];
description: 'Experimental. Split the query into smaller units and stop at the requested log line limit.',
icon: 'exclamation-triangle',
});
}
export function getQueryDirectionLabel(direction: LokiQueryDirection) {
return queryDirections.find((queryDirection) => queryDirection.value === direction)?.label ?? 'Unknown';

View File

@ -1,6 +1,7 @@
import { SyntaxNode } from '@lezer/common';
import {
addDropToQuery,
addLabelFormatToQuery,
addLabelToQuery,
addLineFilter,
@ -205,6 +206,44 @@ describe('addLabelFormatToQuery', () => {
});
});
describe('addDropToQuery', () => {
describe('when query has a line filter', () => {
it('should add drop after the line filter', () => {
expect(addDropToQuery('{job="grafana"} |= "error"', ['__stream_shard__'])).toBe(
'{job="grafana"} |= "error" | drop __stream_shard__'
);
});
it('should add the parser after multiple line filters', () => {
expect(addDropToQuery('{job="grafana"} |= "error" |= "info" |= "debug"', ['label1', 'label2'])).toBe(
'{job="grafana"} |= "error" |= "info" |= "debug" | drop label1, label2'
);
});
});
describe('when the query has no line filters', () => {
it('should add the parser after the log stream selector in logs query', () => {
expect(addDropToQuery('{job="grafana"}', ['label1', 'label2'])).toBe('{job="grafana"} | drop label1, label2');
});
it('should add the parser after the log stream selector in a metric query', () => {
expect(addDropToQuery('rate({job="grafana"} [5m])', ['__stream_shard__'])).toBe(
'rate({job="grafana"} | drop __stream_shard__ [5m])'
);
});
it('should modify all metric queries', () => {
expect(
addDropToQuery('sum(count_over_time({job="grafana"} [5m])) + sum(count_over_time({job="grafana"} [5m]))', [
'__stream_shard__',
])
).toBe(
'sum(count_over_time({job="grafana"} | drop __stream_shard__ [5m])) + sum(count_over_time({job="grafana"} | drop __stream_shard__ [5m]))'
);
});
});
});
describe('removeCommentsFromQuery', () => {
it.each`
query | expectedResult

View File

@ -239,16 +239,62 @@ export function addParserToQuery(query: string, parser: string): string {
const lineFilterPositions = getLineFiltersPositions(query);
if (lineFilterPositions.length) {
return addParser(query, lineFilterPositions, parser);
return appendToLogsQuery(query, lineFilterPositions, parser);
} else {
const streamSelectorPositions = getStreamSelectorPositions(query);
if (!streamSelectorPositions.length) {
return query;
}
return addParser(query, streamSelectorPositions, parser);
return appendToLogsQuery(query, streamSelectorPositions, parser);
}
}
/**
* Adds a drop statement to the query.
* It uses LogQL parser to find instances of stream selectors or line filters and adds parser after them.
*
* @param query
* @param parser
*/
export function addDropToQuery(query: string, labelsToDrop: string[]): string {
const lineFilterPositions = getLineFiltersPositions(query);
if (lineFilterPositions.length) {
return appendToLogsQuery(query, lineFilterPositions, `drop ${labelsToDrop.join(', ')}`);
} else {
const streamSelectorPositions = getStreamSelectorPositions(query);
if (!streamSelectorPositions.length) {
return query;
}
return appendToLogsQuery(query, streamSelectorPositions, `drop ${labelsToDrop.join(', ')}`);
}
}
/**
* Adds a statement after line filters or stream selectors
* @param query
* @param queryPartPositions
* @param parser
*/
function appendToLogsQuery(query: string, queryPartPositions: NodePosition[], statement: string): string {
let newQuery = '';
let prev = 0;
for (let i = 0; i < queryPartPositions.length; i++) {
// Splice on a string for each matched vector selector
const match = queryPartPositions[i];
const isLast = i === queryPartPositions.length - 1;
const start = query.substring(prev, match.to);
const end = isLast ? query.substring(match.to) : '';
// Add parser
newQuery += start + ` | ${statement}` + end;
prev = match.to;
}
return newQuery;
}
/**
* Adds filtering for pipeline errors to existing query. Useful for query modification for hints.
* It uses LogQL parser to find parsers and adds pipeline errors filtering after them.
@ -494,31 +540,6 @@ export function addFilterAsLabelFilter(
return newQuery;
}
/**
* Add parser after line filter or stream selector
* @param query
* @param queryPartPositions
* @param parser
*/
function addParser(query: string, queryPartPositions: NodePosition[], parser: string): string {
let newQuery = '';
let prev = 0;
for (let i = 0; i < queryPartPositions.length; i++) {
// Splice on a string for each matched vector selector
const match = queryPartPositions[i];
const isLast = i === queryPartPositions.length - 1;
const start = query.substring(prev, match.to);
const end = isLast ? query.substring(match.to) : '';
// Add parser
newQuery += start + ` | ${parser}` + end;
prev = match.to;
}
return newQuery;
}
/**
* Add filter as label filter after the parsers
* @param query

View File

@ -17,6 +17,7 @@ import {
getNormalizedLokiQuery,
getNodePositionsFromQuery,
getLogQueryFromMetricsQueryAtPosition,
interpolateShardingSelector,
} from './queryUtils';
import { LokiQuery, LokiQueryType } from './types';
@ -489,3 +490,103 @@ describe('getNodePositionsFromQuery', () => {
expect(nodePositions.length).toBe(0);
});
});
describe('interpolateShardingSelector', () => {
let queries: LokiQuery[] = [];
beforeEach(() => {
queries = [
{
refId: 'A',
expr: '{job="grafana"}',
},
];
});
it('returns the original query when there are no shards to interpolate', () => {
expect(interpolateShardingSelector(queries, [])).toEqual(queries);
});
it('adds the empty shard to the query when -1 is passed', () => {
expect(interpolateShardingSelector(queries, [-1])).toEqual([
{
refId: 'A',
expr: '{job="grafana", __stream_shard__=""}',
},
]);
});
it('uses an equality filter when a single shard is passed', () => {
expect(interpolateShardingSelector(queries, [13])).toEqual([
{
refId: 'A',
expr: '{job="grafana", __stream_shard__="13"}',
},
]);
});
it('supports multiple shard values', () => {
expect(interpolateShardingSelector(queries, [1, 13, 667])).toEqual([
{
refId: 'A',
expr: '{job="grafana", __stream_shard__=~"1|13|667"}',
},
]);
});
describe('For metric queries', () => {
let queries: LokiQuery[] = [];
beforeEach(() => {
queries = [
{
refId: 'A',
expr: 'count_over_time({job="grafana"} [5m])',
},
];
});
it('returns the original query when there are no shards to interpolate', () => {
expect(interpolateShardingSelector(queries, [])).toEqual(queries);
});
it('adds the empty shard to the query when -1 is passed', () => {
expect(interpolateShardingSelector(queries, [-1])).toEqual([
{
refId: 'A',
expr: 'count_over_time({job="grafana", __stream_shard__=""} | drop __stream_shard__ [5m])',
},
]);
});
it('uses an equality filter when a single shard is passed', () => {
expect(interpolateShardingSelector(queries, [13])).toEqual([
{
refId: 'A',
expr: 'count_over_time({job="grafana", __stream_shard__="13"} | drop __stream_shard__ [5m])',
},
]);
});
it('supports multiple shard values', () => {
expect(interpolateShardingSelector(queries, [1, 13, 667])).toEqual([
{
refId: 'A',
expr: 'count_over_time({job="grafana", __stream_shard__=~"1|13|667"} | drop __stream_shard__ [5m])',
},
]);
});
it('supports multiple metric queries values', () => {
const queries = [
{
refId: 'A',
expr: 'count_over_time({job="grafana"} [5m]) + count_over_time({job="grafana"} [5m])',
},
];
expect(interpolateShardingSelector(queries, [1, 13, 667])).toEqual([
{
refId: 'A',
expr: 'count_over_time({job="grafana", __stream_shard__=~"1|13|667"} | drop __stream_shard__ [5m]) + count_over_time({job="grafana", __stream_shard__=~"1|13|667"} | drop __stream_shard__ [5m])',
},
]);
});
});
});

View File

@ -25,8 +25,7 @@ import {
} from '@grafana/lezer-logql';
import { DataQuery } from '@grafana/schema';
import { REF_ID_STARTER_LOG_VOLUME } from './datasource';
import { addLabelToQuery, getStreamSelectorPositions, NodePosition } from './modifyQuery';
import { addDropToQuery, addLabelToQuery, getStreamSelectorPositions, NodePosition } from './modifyQuery';
import { ErrorId } from './querybuilder/parsingUtils';
import { LabelType, LokiQuery, LokiQueryDirection, LokiQueryType } from './types';
@ -319,9 +318,7 @@ export function requestSupportsSharding(allQueries: LokiQuery[]) {
.filter((query) => !query.hide)
.filter((query) => !query.refId.includes('do-not-shard'))
.filter((query) => query.expr)
.filter(
(query) => query.direction === LokiQueryDirection.Scan || query.refId?.startsWith(REF_ID_STARTER_LOG_VOLUME)
);
.filter((query) => query.direction === LokiQueryDirection.Scan || !isLogsQuery(query.expr));
return queries.length > 0;
}
@ -354,16 +351,24 @@ export const interpolateShardingSelector = (queries: LokiQuery[], shards: number
shardValue = shardValue === '-1' ? '' : shardValue;
return queries.map((query) => ({
...query,
expr: addLabelToQuery(query.expr, '__stream_shard__', '=', shardValue, LabelType.Indexed),
expr: addStreamShardLabelsToQuery(query.expr, '=', shardValue),
}));
}
return queries.map((query) => ({
...query,
expr: addLabelToQuery(query.expr, '__stream_shard__', '=~', shardValue, LabelType.Indexed),
expr: addStreamShardLabelsToQuery(query.expr, '=~', shardValue),
}));
};
function addStreamShardLabelsToQuery(query: string, operator: string, shardValue: string) {
const shardedQuery = addLabelToQuery(query, '__stream_shard__', operator, shardValue, LabelType.Indexed);
if (!isLogsQuery(query)) {
return addDropToQuery(shardedQuery, ['__stream_shard__']);
}
return shardedQuery;
}
export const getSelectorForShardValues = (query: string) => {
const selector = getNodesFromQuery(query, [Selector]);
if (selector.length > 0) {

View File

@ -81,7 +81,7 @@ describe('runShardSplitQuery()', () => {
requestId: 'TEST_shard_0_0_2',
targets: [
{
expr: 'count_over_time({a="b", __stream_shard__=~"20|10"}[1m])',
expr: 'count_over_time({a="b", __stream_shard__=~"20|10"} | drop __stream_shard__[1m])',
refId: 'A',
direction: LokiQueryDirection.Scan,
},
@ -94,7 +94,7 @@ describe('runShardSplitQuery()', () => {
requestId: 'TEST_shard_0_2_2',
targets: [
{
expr: 'count_over_time({a="b", __stream_shard__=~"3|2"}[1m])',
expr: 'count_over_time({a="b", __stream_shard__=~"3|2"} | drop __stream_shard__[1m])',
refId: 'A',
direction: LokiQueryDirection.Scan,
},
@ -107,7 +107,7 @@ describe('runShardSplitQuery()', () => {
requestId: 'TEST_shard_0_4_1',
targets: [
{
expr: 'count_over_time({a="b", __stream_shard__="1"}[1m])',
expr: 'count_over_time({a="b", __stream_shard__="1"} | drop __stream_shard__[1m])',
refId: 'A',
direction: LokiQueryDirection.Scan,
},
@ -119,7 +119,11 @@ describe('runShardSplitQuery()', () => {
range: expect.any(Object),
requestId: 'TEST_shard_0_5_1',
targets: [
{ expr: 'count_over_time({a="b", __stream_shard__=""}[1m])', refId: 'A', direction: LokiQueryDirection.Scan },
{
expr: 'count_over_time({a="b", __stream_shard__=""} | drop __stream_shard__[1m])',
refId: 'A',
direction: LokiQueryDirection.Scan,
},
],
});
});
@ -145,7 +149,7 @@ describe('runShardSplitQuery()', () => {
requestId: 'TEST_shard_0_0_2',
targets: [
{
expr: 'count_over_time({service_name="test", filter="true", __stream_shard__=~"20|10"}[1m])',
expr: 'count_over_time({service_name="test", filter="true", __stream_shard__=~"20|10"} | drop __stream_shard__[1m])',
refId: 'A',
direction: LokiQueryDirection.Scan,
},
@ -364,7 +368,7 @@ describe('runShardSplitQuery()', () => {
requestId: 'TEST_shard_0_0_3',
targets: [
{
expr: 'count_over_time({a="b", __stream_shard__=~"20|10|9"}[1m])',
expr: 'count_over_time({a="b", __stream_shard__=~"20|10|9"} | drop __stream_shard__[1m])',
refId: 'A',
direction: LokiQueryDirection.Scan,
},
@ -378,7 +382,7 @@ describe('runShardSplitQuery()', () => {
requestId: 'TEST_shard_0_3_4',
targets: [
{
expr: 'count_over_time({a="b", __stream_shard__=~"8|7|6|5"}[1m])',
expr: 'count_over_time({a="b", __stream_shard__=~"8|7|6|5"} | drop __stream_shard__[1m])',
refId: 'A',
direction: LokiQueryDirection.Scan,
},
@ -392,7 +396,7 @@ describe('runShardSplitQuery()', () => {
requestId: 'TEST_shard_0_3_2',
targets: [
{
expr: 'count_over_time({a="b", __stream_shard__=~"8|7"}[1m])',
expr: 'count_over_time({a="b", __stream_shard__=~"8|7"} | drop __stream_shard__[1m])',
refId: 'A',
direction: LokiQueryDirection.Scan,
},
@ -406,7 +410,7 @@ describe('runShardSplitQuery()', () => {
requestId: 'TEST_shard_0_5_3',
targets: [
{
expr: 'count_over_time({a="b", __stream_shard__=~"6|5|4"}[1m])',
expr: 'count_over_time({a="b", __stream_shard__=~"6|5|4"} | drop __stream_shard__[1m])',
refId: 'A',
direction: LokiQueryDirection.Scan,
},
@ -420,7 +424,7 @@ describe('runShardSplitQuery()', () => {
requestId: 'TEST_shard_0_8_2',
targets: [
{
expr: 'count_over_time({a="b", __stream_shard__=~"3|2"}[1m])',
expr: 'count_over_time({a="b", __stream_shard__=~"3|2"} | drop __stream_shard__[1m])',
refId: 'A',
direction: LokiQueryDirection.Scan,
},
@ -434,7 +438,7 @@ describe('runShardSplitQuery()', () => {
requestId: 'TEST_shard_0_10_1',
targets: [
{
expr: 'count_over_time({a="b", __stream_shard__="1"}[1m])',
expr: 'count_over_time({a="b", __stream_shard__="1"} | drop __stream_shard__[1m])',
refId: 'A',
direction: LokiQueryDirection.Scan,
},
@ -447,7 +451,11 @@ describe('runShardSplitQuery()', () => {
range: expect.any(Object),
requestId: 'TEST_shard_0_11_1',
targets: [
{ expr: 'count_over_time({a="b", __stream_shard__=""}[1m])', refId: 'A', direction: LokiQueryDirection.Scan },
{
expr: 'count_over_time({a="b", __stream_shard__=""} | drop __stream_shard__[1m])',
refId: 'A',
direction: LokiQueryDirection.Scan,
},
],
});
});