Explore: Support mixed data sources for supplementary query (#63036)

* Consolidate logs volume logic (full range and limited)

* Fix showing limited histogram message

* Test passing meta data to logs volume provider

* Improve readability

* Clean up types

* Add basic support for multiple log volumes

* Move the comment back to the right place

* Improve readability

* Clean up the logic to support Logs Samples

* Update docs

* Sort log volumes

* Provide title to logs volume panel

* Move logs volume cache to the provider factory

* Add helper functions

* Reuse only if queries are the same

* Fix alphabetical sorting

* Move caching out of the provider

* Support errors and loading state

* Remove unused code

* Consolidate supplementary query utils

* Add tests for supplementaryQueries

* Update tests

* Simplify logs volume extra info

* Update tests

* Remove comment

* Update tests

* Fix hiding the histogram for hidden queries

* Simplify loading message

* Update tests

* Wait for full fallback histogram to load before showing it

* Fix a typo

* Add feedback comments

* Move feedback comments to github

* Do not filter out hidden queries as they may be used as references in other queries

* Group log volume by refId

* Support showing fallback histograms per query to avoid duplicates

* Improve type-checking

* Fix supplementaryQueries.test.ts

* Fix logsModel.test.ts

* Fix loading fallback results

* Fix unit tests

* WIP

* Update deprecated styles

* Simplify test

* Simplify rendering zoom info

* Update deprecated styles

* Simplify getLogsVolumeDataSourceInfo

* Simplify isLogsVolumeLimited()

* Simplify rendering zoom info
This commit is contained in:
Piotr Jamróz 2023-03-07 15:00:11 +01:00 committed by GitHub
parent b2d7bea78b
commit a7238ba933
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 943 additions and 417 deletions

View File

@ -2279,10 +2279,6 @@ exports[`better eslint`] = {
"public/app/core/history/richHistoryLocalStorageUtils.ts:5381": [
[0, 0, 0, "Unexpected any. Specify a different type.", "0"]
],
"public/app/core/logsModel.ts:5381": [
[0, 0, 0, "Do not use any type assertions.", "0"],
[0, 0, 0, "Do not use any type assertions.", "1"]
],
"public/app/core/navigation/GrafanaRoute.test.tsx:5381": [
[0, 0, 0, "Unexpected any. Specify a different type.", "0"],
[0, 0, 0, "Unexpected any. Specify a different type.", "1"]

View File

@ -87,6 +87,7 @@ export interface LogsModel {
// visibleRange is time range for histogram created from log results
visibleRange?: AbsoluteTimeRange;
queries?: DataQuery[];
bucketSize?: number;
}
export interface LogSearchMatch {
@ -194,6 +195,40 @@ export enum LogsVolumeType {
Limited = 'Limited',
}
/**
* Custom meta information required by Logs Volume responses
*/
export type LogsVolumeCustomMetaData = {
absoluteRange: AbsoluteTimeRange;
logsVolumeType: LogsVolumeType;
datasourceName: string;
sourceQuery: DataQuery;
};
export const getLogsVolumeAbsoluteRange = (
dataFrames: DataFrame[],
defaultRange: AbsoluteTimeRange
): AbsoluteTimeRange => {
return dataFrames[0].meta?.custom?.absoluteRange || defaultRange;
};
export const getLogsVolumeDataSourceInfo = (dataFrames: DataFrame[]): { name: string; refId: string } | null => {
const customMeta = dataFrames[0]?.meta?.custom;
if (customMeta && customMeta.datasourceName && customMeta.sourceQuery?.refId) {
return {
name: customMeta.datasourceName,
refId: customMeta.sourceQuery.refId,
};
}
return null;
};
export const isLogsVolumeLimited = (dataFrames: DataFrame[]) => {
return dataFrames[0]?.meta?.custom?.logsVolumeType === LogsVolumeType.Limited;
};
/**
* Data sources that support supplementary queries in Explore.
* This will enable users to see additional data when running original queries.

View File

@ -29,8 +29,8 @@ import {
getSeriesProperties,
LIMIT_LABEL,
logSeriesToLogsModel,
queryLogsVolume,
queryLogsSample,
queryLogsVolume,
} from './logsModel';
const FROM = dateTimeParse('2021-06-17 00:00:00', { timeZone: 'utc' });
@ -1123,8 +1123,9 @@ describe('logs volume', () => {
datasource: MockObservableDataSourceApi,
request: DataQueryRequest<TestDataQuery>;
function createFrame(labels: object, timestamps: number[], values: number[]) {
function createFrame(labels: object, timestamps: number[], values: number[], refId: string) {
return toDataFrame({
refId,
fields: [
{ name: 'Time', type: FieldType.time, values: timestamps },
{
@ -1137,20 +1138,13 @@ describe('logs volume', () => {
});
}
function createExpectedFields(levelName: string) {
return [
expect.objectContaining({ name: 'Time' }),
expect.objectContaining({
name: 'Value',
config: expect.objectContaining({ displayNameFromDS: levelName }),
}),
];
}
function setup(datasourceSetup: () => void) {
datasourceSetup();
request = {
targets: [{ target: 'volume query 1' }, { target: 'volume query 2' }],
targets: [
{ refId: 'A', target: 'volume query 1' },
{ refId: 'B', target: 'volume query 2' },
],
scopedVars: {},
} as unknown as DataQueryRequest<TestDataQuery>;
volumeProvider = queryLogsVolume(datasource, request, {
@ -1168,19 +1162,21 @@ describe('logs volume', () => {
function setupMultipleResults() {
// level=unknown
const resultAFrame1 = createFrame({ app: 'app01' }, [100, 200, 300], [5, 5, 5]);
const resultAFrame1 = createFrame({ app: 'app01' }, [100, 200, 300], [5, 5, 5], 'A');
// level=error
const resultAFrame2 = createFrame({ app: 'app01', level: 'error' }, [100, 200, 300], [0, 1, 0]);
const resultAFrame2 = createFrame({ app: 'app01', level: 'error' }, [100, 200, 300], [0, 1, 0], 'B');
// level=unknown
const resultBFrame1 = createFrame({ app: 'app02' }, [100, 200, 300], [1, 2, 3]);
const resultBFrame1 = createFrame({ app: 'app02' }, [100, 200, 300], [1, 2, 3], 'A');
// level=error
const resultBFrame2 = createFrame({ app: 'app02', level: 'error' }, [100, 200, 300], [1, 1, 1]);
const resultBFrame2 = createFrame({ app: 'app02', level: 'error' }, [100, 200, 300], [1, 1, 1], 'B');
datasource = new MockObservableDataSourceApi('loki', [
{
state: LoadingState.Loading,
data: [resultAFrame1, resultAFrame2],
},
{
state: LoadingState.Done,
data: [resultBFrame1, resultBFrame2],
},
]);
@ -1188,9 +1184,9 @@ describe('logs volume', () => {
function setupMultipleResultsStreaming() {
// level=unknown
const resultAFrame1 = createFrame({ app: 'app01' }, [100, 200, 300], [5, 5, 5]);
const resultAFrame1 = createFrame({ app: 'app01' }, [100, 200, 300], [5, 5, 5], 'A');
// level=error
const resultAFrame2 = createFrame({ app: 'app01', level: 'error' }, [100, 200, 300], [0, 1, 0]);
const resultAFrame2 = createFrame({ app: 'app01', level: 'error' }, [100, 200, 300], [0, 1, 0], 'B');
datasource = new MockObservableDataSourceApi('loki', [
{
@ -1198,7 +1194,7 @@ describe('logs volume', () => {
data: [resultAFrame1],
},
{
state: LoadingState.Streaming,
state: LoadingState.Done,
data: [resultAFrame1, resultAFrame2],
},
]);
@ -1221,14 +1217,8 @@ describe('logs volume', () => {
fields: expect.anything(),
meta: {
custom: {
targets: [
{
target: 'volume query 1',
},
{
target: 'volume query 2',
},
],
sourceQuery: { refId: 'A', target: 'volume query 1' },
datasourceName: 'loki',
logsVolumeType: LogsVolumeType.FullRange,
absoluteRange: {
from: FROM.valueOf(),
@ -1243,7 +1233,7 @@ describe('logs volume', () => {
});
});
it('applies correct meta datya when streaming', async () => {
it('applies correct meta data when streaming', async () => {
setup(setupMultipleResultsStreaming);
await expect(volumeProvider).toEmitValuesWith((received) => {
@ -1256,14 +1246,8 @@ describe('logs volume', () => {
fields: expect.anything(),
meta: {
custom: {
targets: [
{
target: 'volume query 1',
},
{
target: 'volume query 2',
},
],
sourceQuery: { refId: 'A', target: 'volume query 1' },
datasourceName: 'loki',
logsVolumeType: LogsVolumeType.FullRange,
absoluteRange: {
from: FROM.valueOf(),
@ -1278,22 +1262,6 @@ describe('logs volume', () => {
});
});
it('aggregates data frames by level', async () => {
setup(setupMultipleResults);
await expect(volumeProvider).toEmitValuesWith((received) => {
expect(received).toContainEqual({
state: LoadingState.Done,
error: undefined,
data: expect.arrayContaining([
expect.objectContaining({
fields: expect.arrayContaining(createExpectedFields('error')),
}),
]),
});
});
});
it('returns error', async () => {
setup(setupErrorResponse);

View File

@ -1,4 +1,4 @@
import { size } from 'lodash';
import { groupBy, size } from 'lodash';
import { from, isObservable, Observable } from 'rxjs';
import {
@ -13,7 +13,6 @@ import {
dateTimeFormatTimeAgo,
FieldCache,
FieldColorModeId,
FieldConfig,
FieldType,
FieldWithIndex,
findCommonLabels,
@ -27,8 +26,8 @@ import {
LogsMetaItem,
LogsMetaKind,
LogsModel,
LogsVolumeCustomMetaData,
LogsVolumeType,
MutableDataFrame,
rangeUtil,
ScopedVars,
sortDataFrame,
@ -225,6 +224,7 @@ export function dataFrameToLogsModel(
absoluteRange
);
logsModel.visibleRange = visibleRange;
logsModel.bucketSize = bucketSize;
logsModel.series = makeDataFramesForLogs(sortedRows, bucketSize);
if (logsModel.meta) {
@ -605,69 +605,22 @@ function getLogVolumeFieldConfig(level: LogLevel, oneLevelDetected: boolean) {
};
}
/**
* Take multiple data frames, sum up values and group by level.
* Return a list of data frames, each representing single level.
*/
export function aggregateRawLogsVolume(
rawLogsVolume: DataFrame[],
extractLevel: (dataFrame: DataFrame) => LogLevel
): DataFrame[] {
const logsVolumeByLevelMap: Partial<Record<LogLevel, DataFrame[]>> = {};
rawLogsVolume.forEach((dataFrame) => {
const level = extractLevel(dataFrame);
if (!logsVolumeByLevelMap[level]) {
logsVolumeByLevelMap[level] = [];
const updateLogsVolumeConfig = (
dataFrame: DataFrame,
extractLevel: (dataFrame: DataFrame) => LogLevel,
oneLevelDetected: boolean
): DataFrame => {
dataFrame.fields = dataFrame.fields.map((field) => {
if (field.type === FieldType.number) {
field.config = {
...field.config,
...getLogVolumeFieldConfig(extractLevel(dataFrame), oneLevelDetected),
};
}
logsVolumeByLevelMap[level]!.push(dataFrame);
return field;
});
return Object.keys(logsVolumeByLevelMap).map((level: string) => {
return aggregateFields(
logsVolumeByLevelMap[level as LogLevel]!,
getLogVolumeFieldConfig(level as LogLevel, Object.keys(logsVolumeByLevelMap).length === 1)
);
});
}
/**
* Aggregate multiple data frames into a single data frame by adding values.
* Multiple data frames for the same level are passed here to get a single
* data frame for a given level. Aggregation by level happens in aggregateRawLogsVolume()
*/
function aggregateFields(dataFrames: DataFrame[], config: FieldConfig): DataFrame {
const aggregatedDataFrame = new MutableDataFrame();
if (!dataFrames.length) {
return aggregatedDataFrame;
}
const totalLength = dataFrames[0].length;
const timeField = new FieldCache(dataFrames[0]).getFirstFieldOfType(FieldType.time);
if (!timeField) {
return aggregatedDataFrame;
}
aggregatedDataFrame.addField({ name: 'Time', type: FieldType.time }, totalLength);
aggregatedDataFrame.addField({ name: 'Value', type: FieldType.number, config }, totalLength);
dataFrames.forEach((dataFrame) => {
dataFrame.fields.forEach((field) => {
if (field.type === FieldType.number) {
for (let pointIndex = 0; pointIndex < totalLength; pointIndex++) {
const currentValue = aggregatedDataFrame.get(pointIndex).Value;
const valueToAdd = field.values.get(pointIndex);
const totalValue =
currentValue === null && valueToAdd === null ? null : (currentValue || 0) + (valueToAdd || 0);
aggregatedDataFrame.set(pointIndex, { Value: totalValue, Time: timeField.values.get(pointIndex) });
}
}
});
});
return aggregatedDataFrame;
}
return dataFrame;
};
type LogsVolumeQueryOptions<T extends DataQuery> = {
extractLevel: (dataFrame: DataFrame) => LogLevel;
@ -697,7 +650,7 @@ export function queryLogsVolume<TQuery extends DataQuery, TOptions extends DataS
logsVolumeRequest.hideFromInspector = true;
return new Observable((observer) => {
let rawLogsVolume: DataFrame[] = [];
let logsVolumeData: DataFrame[] = [];
observer.next({
state: LoadingState.Loading,
error: undefined,
@ -709,11 +662,6 @@ export function queryLogsVolume<TQuery extends DataQuery, TOptions extends DataS
const subscription = queryObservable.subscribe({
complete: () => {
observer.next({
state: LoadingState.Done,
error: undefined,
data: rawLogsVolume,
});
observer.complete();
},
next: (dataQueryResponse: DataQueryResponse) => {
@ -726,24 +674,34 @@ export function queryLogsVolume<TQuery extends DataQuery, TOptions extends DataS
});
observer.error(error);
} else {
const aggregatedLogsVolume = aggregateRawLogsVolume(
dataQueryResponse.data.map(toDataFrame),
options.extractLevel
);
if (aggregatedLogsVolume[0]) {
aggregatedLogsVolume[0].meta = {
const framesByRefId = groupBy(dataQueryResponse.data, 'refId');
logsVolumeData = dataQueryResponse.data.map((dataFrame) => {
let sourceRefId = dataFrame.refId || '';
if (sourceRefId.startsWith('log-volume-')) {
sourceRefId = sourceRefId.substr('log-volume-'.length);
}
const logsVolumeCustomMetaData: LogsVolumeCustomMetaData = {
logsVolumeType: LogsVolumeType.FullRange,
absoluteRange: { from: options.range.from.valueOf(), to: options.range.to.valueOf() },
datasourceName: datasource.name,
sourceQuery: options.targets.find((dataQuery) => dataQuery.refId === sourceRefId)!,
};
dataFrame.meta = {
...dataFrame.meta,
custom: {
targets: options.targets,
logsVolumeType: LogsVolumeType.FullRange,
absoluteRange: { from: options.range.from.valueOf(), to: options.range.to.valueOf() },
...dataFrame.meta?.custom,
...logsVolumeCustomMetaData,
},
};
}
rawLogsVolume = aggregatedLogsVolume;
return updateLogsVolumeConfig(dataFrame, options.extractLevel, framesByRefId[dataFrame.refId].length === 1);
});
observer.next({
state: dataQueryResponse.state ?? LoadingState.Streaming,
state: dataQueryResponse.state,
error: undefined,
data: rawLogsVolume,
data: logsVolumeData,
});
}
},

View File

@ -48,7 +48,7 @@ import { LogRows } from '../logs/components/LogRows';
import { LogsMetaRow } from './LogsMetaRow';
import LogsNavigation from './LogsNavigation';
import { LogsVolumePanel } from './LogsVolumePanel';
import { LogsVolumePanelList } from './LogsVolumePanelList';
import { SETTINGS_KEYS } from './utils/logs';
interface Props extends Themeable2 {
@ -324,13 +324,10 @@ class UnthemedLogs extends PureComponent<Props, State> {
splitOpen,
logRows,
logsMeta,
logsSeries,
visibleRange,
logsVolumeEnabled,
logsVolumeData,
loadLogsVolumeData,
loading = false,
loadingState,
onClickFilterLabel,
onClickFilterOutLabel,
timeZone,
@ -377,19 +374,10 @@ class UnthemedLogs extends PureComponent<Props, State> {
<>
<Collapse label="Logs volume" collapsible isOpen={logsVolumeEnabled} onToggle={this.onToggleLogsVolumeCollapse}>
{logsVolumeEnabled && (
<LogsVolumePanel
<LogsVolumePanelList
absoluteRange={absoluteRange}
width={width}
logsVolumeData={logsVolumeData}
logLinesBasedData={
logsSeries
? {
data: logsSeries,
state: loadingState,
}
: undefined
}
logLinesBasedDataVisibleRange={visibleRange}
onUpdateTimeRange={onChangeTime}
timeZone={timeZone}
splitOpen={splitOpen}

View File

@ -12,7 +12,7 @@ import {
SupplementaryQueryType,
} from '@grafana/data';
import { reportInteraction } from '@grafana/runtime';
import { TimeZone, DataQuery } from '@grafana/schema';
import { DataQuery, TimeZone } from '@grafana/schema';
import { Button, Collapse, useStyles2 } from '@grafana/ui';
import { dataFrameToLogsModel } from 'app/core/logsModel';
import store from 'app/core/store';
@ -111,11 +111,11 @@ export function LogsSamplePanel(props: Props) {
);
}
return (
return queryResponse?.state !== LoadingState.NotStarted ? (
<Collapse label="Logs sample" isOpen={enabled} collapsible={true} onToggle={onToggleLogsSampleCollapse}>
{LogsSamplePanelContent}
</Collapse>
);
) : null;
}
const getStyles = (theme: GrafanaTheme2) => ({

View File

@ -21,8 +21,6 @@ function renderPanel(logsVolumeData?: DataQueryResponse) {
width={100}
onUpdateTimeRange={() => {}}
logsVolumeData={logsVolumeData}
logLinesBasedData={undefined}
logLinesBasedDataVisibleRange={undefined}
onLoadLogsVolume={() => {}}
onHiddenSeriesChanged={() => null}
eventBus={new EventBusSrv()}
@ -31,11 +29,6 @@ function renderPanel(logsVolumeData?: DataQueryResponse) {
}
describe('LogsVolumePanel', () => {
it('shows loading message', () => {
renderPanel({ state: LoadingState.Loading, error: undefined, data: [] });
expect(screen.getByText('Log volume is loading...')).toBeInTheDocument();
});
it('shows no volume data', () => {
renderPanel({ state: LoadingState.Done, error: undefined, data: [] });
expect(screen.getByText('No volume data.')).toBeInTheDocument();
@ -46,25 +39,6 @@ describe('LogsVolumePanel', () => {
expect(screen.getByText('ExploreGraph')).toBeInTheDocument();
});
it('shows short warning message', () => {
renderPanel({ state: LoadingState.Error, error: { data: { message: 'Test error message' } }, data: [] });
expect(screen.getByText('Failed to load log volume for this query')).toBeInTheDocument();
expect(screen.getByText('Test error message')).toBeInTheDocument();
});
it('shows long warning message', () => {
// we make a long message
const messagePart = 'One two three four five six seven eight nine ten.';
const message = messagePart + ' ' + messagePart + ' ' + messagePart;
renderPanel({ state: LoadingState.Error, error: { data: { message } }, data: [] });
expect(screen.getByText('Failed to load log volume for this query')).toBeInTheDocument();
expect(screen.queryByText(message)).not.toBeInTheDocument();
const button = screen.getByText('Show details');
button.click();
expect(screen.getByText(message)).toBeInTheDocument();
});
it('does not show the panel when there is no volume data', () => {
renderPanel(undefined);
expect(screen.queryByText('Log volume')).not.toBeInTheDocument();

View File

@ -4,23 +4,22 @@ import React from 'react';
import {
AbsoluteTimeRange,
DataQueryResponse,
GrafanaTheme2,
LoadingState,
SplitOpen,
TimeZone,
EventBus,
LogsVolumeType,
isLogsVolumeLimited,
getLogsVolumeAbsoluteRange,
GrafanaTheme2,
getLogsVolumeDataSourceInfo,
} from '@grafana/data';
import { Button, Collapse, Icon, InlineField, Tooltip, TooltipDisplayMode, useStyles2, useTheme2 } from '@grafana/ui';
import { Icon, Tooltip, TooltipDisplayMode, useStyles2, useTheme2 } from '@grafana/ui';
import { ExploreGraph } from './Graph/ExploreGraph';
import { SupplementaryResultError } from './SupplementaryResultError';
type Props = {
logsVolumeData: DataQueryResponse | undefined;
absoluteRange: AbsoluteTimeRange;
logLinesBasedData: DataQueryResponse | undefined;
logLinesBasedDataVisibleRange: AbsoluteTimeRange | undefined;
timeZone: TimeZone;
splitOpen: SplitOpen;
width: number;
@ -31,7 +30,7 @@ type Props = {
};
export function LogsVolumePanel(props: Props) {
const { width, timeZone, splitOpen, onUpdateTimeRange, onLoadLogsVolume, onHiddenSeriesChanged } = props;
const { width, timeZone, splitOpen, onUpdateTimeRange, onHiddenSeriesChanged } = props;
const theme = useTheme2();
const styles = useStyles2(getStyles);
const spacing = parseInt(theme.spacing(2).slice(0, -2), 10);
@ -42,17 +41,24 @@ export function LogsVolumePanel(props: Props) {
}
const logsVolumeData = props.logsVolumeData;
const range = logsVolumeData.data[0]?.meta?.custom?.absoluteRange || props.absoluteRange;
if (logsVolumeData.error !== undefined) {
return <SupplementaryResultError error={logsVolumeData.error} title="Failed to load log volume for this query" />;
const logsVolumeInfo = getLogsVolumeDataSourceInfo(logsVolumeData?.data);
let extraInfo = logsVolumeInfo ? `${logsVolumeInfo.refId} (${logsVolumeInfo.name})` : '';
if (isLogsVolumeLimited(logsVolumeData.data)) {
extraInfo = [
extraInfo,
'This datasource does not support full-range histograms. The graph below is based on the logs seen in the response.',
].join('. ');
}
const range = isLogsVolumeLimited(logsVolumeData.data)
? getLogsVolumeAbsoluteRange(logsVolumeData.data, props.absoluteRange)
: props.absoluteRange;
let LogsVolumePanelContent;
if (logsVolumeData?.state === LoadingState.Loading) {
LogsVolumePanelContent = <span>Log volume is loading...</span>;
} else if (logsVolumeData?.data) {
if (logsVolumeData?.data) {
if (logsVolumeData.data.length > 0) {
LogsVolumePanelContent = (
<ExploreGraph
@ -76,41 +82,24 @@ export function LogsVolumePanel(props: Props) {
}
}
let extraInfo;
if (logsVolumeData.data[0]?.meta?.custom?.logsVolumeType !== LogsVolumeType.Limited) {
const zoomRatio = logsLevelZoomRatio(logsVolumeData, range);
let extraInfoComponent = <span>{extraInfo}</span>;
if (zoomRatio !== undefined && zoomRatio < 1) {
extraInfo = (
<InlineField label="Reload log volume" transparent>
<Button size="xs" icon="sync" variant="secondary" onClick={onLoadLogsVolume} id="reload-volume" />
</InlineField>
);
}
} else {
extraInfo = (
<div className={styles.oldInfoText}>
This datasource does not support full-range histograms. The graph is based on the logs seen in the response.
</div>
);
}
if (logsVolumeData.state === LoadingState.Streaming) {
extraInfo = (
extraInfoComponent = (
<>
{extraInfo}
{extraInfoComponent}
<Tooltip content="Streaming">
<Icon name="circle-mono" size="md" className={styles.streaming} data-testid="logs-volume-streaming" />
</Tooltip>
</>
);
}
return (
<Collapse label="" isOpen={true}>
<div style={{ height }} className={styles.contentContainer}>
{LogsVolumePanelContent}
</div>
<div className={styles.extraInfoContainer}>{extraInfo}</div>
</Collapse>
<div style={{ height }} className={styles.contentContainer}>
{LogsVolumePanelContent}
{extraInfoComponent && <div className={styles.extraInfoContainer}>{extraInfoComponent}</div>}
</div>
);
}
@ -121,27 +110,18 @@ const getStyles = (theme: GrafanaTheme2) => {
justify-content: end;
position: absolute;
right: 5px;
top: 5px;
top: -10px;
font-size: ${theme.typography.bodySmall.fontSize};
color: ${theme.colors.text.secondary};
`,
contentContainer: css`
display: flex;
align-items: center;
justify-content: center;
`,
oldInfoText: css`
font-size: ${theme.typography.size.sm};
color: ${theme.colors.text.secondary};
position: relative;
`,
streaming: css`
color: ${theme.colors.success.text};
`,
};
};
function logsLevelZoomRatio(
logsVolumeData: DataQueryResponse | undefined,
selectedTimeRange: AbsoluteTimeRange
): number | undefined {
const dataRange = logsVolumeData && logsVolumeData.data[0] && logsVolumeData.data[0].meta?.custom?.absoluteRange;
return dataRange ? (selectedTimeRange.from - selectedTimeRange.to) / (dataRange.from - dataRange.to) : undefined;
}

View File

@ -0,0 +1,54 @@
import { fireEvent, render, screen } from '@testing-library/react';
import React from 'react';
import { DataQueryResponse, LoadingState, EventBusSrv } from '@grafana/data';
import { LogsVolumePanelList } from './LogsVolumePanelList';
jest.mock('./Graph/ExploreGraph', () => {
const ExploreGraph = () => <span>ExploreGraph</span>;
return {
ExploreGraph,
};
});
function renderPanel(logsVolumeData?: DataQueryResponse) {
render(
<LogsVolumePanelList
absoluteRange={{ from: 0, to: 1 }}
timeZone="timeZone"
splitOpen={() => {}}
width={100}
onUpdateTimeRange={() => {}}
logsVolumeData={logsVolumeData}
onLoadLogsVolume={() => {}}
onHiddenSeriesChanged={() => null}
eventBus={new EventBusSrv()}
/>
);
}
describe('LogsVolumePanelList', () => {
it('shows loading message', () => {
renderPanel({ state: LoadingState.Loading, error: undefined, data: [] });
expect(screen.getByText('Loading...')).toBeInTheDocument();
});
it('shows short warning message', () => {
renderPanel({ state: LoadingState.Error, error: { data: { message: 'Test error message' } }, data: [] });
expect(screen.getByText('Failed to load log volume for this query')).toBeInTheDocument();
expect(screen.getByText('Test error message')).toBeInTheDocument();
});
it('shows long warning message', () => {
// we make a long message
const messagePart = 'One two three four five six seven eight nine ten.';
const message = messagePart + ' ' + messagePart + ' ' + messagePart;
renderPanel({ state: LoadingState.Error, error: { data: { message } }, data: [] });
expect(screen.getByText('Failed to load log volume for this query')).toBeInTheDocument();
expect(screen.queryByText(message)).not.toBeInTheDocument();
fireEvent.click(screen.getByRole('button', { name: 'Show details' }));
expect(screen.getByText(message)).toBeInTheDocument();
});
});

View File

@ -0,0 +1,120 @@
import { css } from '@emotion/css';
import { groupBy } from 'lodash';
import React, { useMemo } from 'react';
import {
AbsoluteTimeRange,
DataFrame,
DataQueryResponse,
EventBus,
GrafanaTheme2,
isLogsVolumeLimited,
LoadingState,
SplitOpen,
TimeZone,
} from '@grafana/data';
import { Button, InlineField, useStyles2 } from '@grafana/ui';
import { LogsVolumePanel } from './LogsVolumePanel';
import { SupplementaryResultError } from './SupplementaryResultError';
type Props = {
logsVolumeData: DataQueryResponse | undefined;
absoluteRange: AbsoluteTimeRange;
timeZone: TimeZone;
splitOpen: SplitOpen;
width: number;
onUpdateTimeRange: (timeRange: AbsoluteTimeRange) => void;
onLoadLogsVolume: () => void;
onHiddenSeriesChanged: (hiddenSeries: string[]) => void;
eventBus: EventBus;
};
export const LogsVolumePanelList = ({
logsVolumeData,
absoluteRange,
onUpdateTimeRange,
width,
onLoadLogsVolume,
onHiddenSeriesChanged,
eventBus,
splitOpen,
timeZone,
}: Props) => {
const logVolumes = useMemo(
() => groupBy(logsVolumeData?.data || [], 'meta.custom.sourceQuery.refId'),
[logsVolumeData]
);
const styles = useStyles2(getStyles);
const numberOfLogVolumes = Object.keys(logVolumes).length;
const containsZoomed = Object.values(logVolumes).some((data: DataFrame[]) => {
const zoomRatio = logsLevelZoomRatio(data, absoluteRange);
return !isLogsVolumeLimited(data) && zoomRatio && zoomRatio < 1;
});
if (logsVolumeData?.state === LoadingState.Loading) {
return <span>Loading...</span>;
}
if (logsVolumeData?.error !== undefined) {
return <SupplementaryResultError error={logsVolumeData.error} title="Failed to load log volume for this query" />;
}
return (
<div className={styles.listContainer}>
{Object.keys(logVolumes).map((name, index) => {
const logsVolumeData = { data: logVolumes[name] };
return (
<LogsVolumePanel
key={index}
absoluteRange={absoluteRange}
width={width}
logsVolumeData={logsVolumeData}
onUpdateTimeRange={onUpdateTimeRange}
timeZone={timeZone}
splitOpen={splitOpen}
onLoadLogsVolume={onLoadLogsVolume}
// TODO: Support filtering level from multiple log levels
onHiddenSeriesChanged={numberOfLogVolumes > 1 ? () => {} : onHiddenSeriesChanged}
eventBus={eventBus}
/>
);
})}
{containsZoomed && (
<div className={styles.extraInfoContainer}>
<InlineField label="Reload log volume" transparent>
<Button size="xs" icon="sync" variant="secondary" onClick={onLoadLogsVolume} id="reload-volume" />
</InlineField>
</div>
)}
</div>
);
};
const getStyles = (theme: GrafanaTheme2) => {
return {
listContainer: css`
padding-top: 10px;
`,
extraInfoContainer: css`
display: flex;
justify-content: end;
position: absolute;
right: 5px;
top: 5px;
`,
oldInfoText: css`
font-size: ${theme.typography.bodySmall.fontSize};
color: ${theme.colors.text.secondary};
`,
};
};
function logsLevelZoomRatio(
logsVolumeData: DataFrame[] | undefined,
selectedTimeRange: AbsoluteTimeRange
): number | undefined {
const dataRange = logsVolumeData && logsVolumeData[0] && logsVolumeData[0].meta?.custom?.absoluteRange;
return dataRange ? (selectedTimeRange.from - selectedTimeRange.to) / (dataRange.from - dataRange.to) : undefined;
}

View File

@ -0,0 +1,37 @@
import { Observable, of } from 'rxjs';
import { getDefaultTimeRange, LoadingState, LogsModel } from '@grafana/data';
import { ExplorePanelData } from '../../../types';
type MockProps = {
logsResult?: Partial<LogsModel>;
};
export const mockExplorePanelData = (props?: MockProps): Observable<ExplorePanelData> => {
const data: ExplorePanelData = {
flameGraphFrames: [],
graphFrames: [],
graphResult: [],
logsFrames: [],
logsResult: {
hasUniqueLabels: false,
rows: [],
meta: [],
series: [],
queries: [],
...(props?.logsResult || {}),
},
nodeGraphFrames: [],
rawPrometheusFrames: [],
rawPrometheusResult: null,
series: [],
state: LoadingState.Done,
tableFrames: [],
tableResult: [],
timeRange: getDefaultTimeRange(),
traceFrames: [],
};
return of(data);
};

View File

@ -1,11 +1,12 @@
import { AnyAction, createAction, PayloadAction } from '@reduxjs/toolkit';
import deepEqual from 'fast-deep-equal';
import { flatten, groupBy, snakeCase } from 'lodash';
import { flatten, groupBy, head, map, mapValues, snakeCase, zipObject } from 'lodash';
import { combineLatest, identity, Observable, of, SubscriptionLike, Unsubscribable } from 'rxjs';
import { mergeMap, throttleTime } from 'rxjs/operators';
import {
AbsoluteTimeRange,
DataFrame,
DataQueryErrorType,
DataQueryResponse,
DataSourceApi,
@ -43,11 +44,14 @@ import { notifyApp } from '../../../core/actions';
import { createErrorNotification } from '../../../core/copy/appNotification';
import { runRequest } from '../../query/state/runRequest';
import { decorateData } from '../utils/decorators';
import { storeSupplementaryQueryEnabled, supplementaryQueryTypes } from '../utils/supplementaryQueries';
import {
storeSupplementaryQueryEnabled,
supplementaryQueryTypes,
getSupplementaryQueryProvider,
} from '../utils/supplementaryQueries';
import { addHistoryItem, historyUpdatedAction, loadRichHistory } from './history';
import { stateSave } from './main';
import { getSupplementaryQueryProvider } from './supplementaryQueries';
import { updateTime } from './time';
import { createCacheKey, getResultsFromCache } from './utils';
@ -674,21 +678,36 @@ export const runQueries = (
*/
function canReuseSupplementaryQueryData(
supplementaryQueryData: DataQueryResponse | undefined,
queries: DataQuery[],
newQueries: DataQuery[],
selectedTimeRange: AbsoluteTimeRange
): boolean {
if (supplementaryQueryData && supplementaryQueryData.data[0]) {
// check if queries are the same
if (!deepEqual(supplementaryQueryData.data[0].meta?.custom?.targets, queries)) {
return false;
}
const dataRange = supplementaryQueryData.data[0].meta?.custom?.absoluteRange;
// if selected range is within loaded logs volume
if (dataRange && dataRange.from <= selectedTimeRange.from && selectedTimeRange.to <= dataRange.to) {
if (!supplementaryQueryData) {
return false;
}
const newQueriesByRefId = zipObject(map(newQueries, 'refId'), newQueries);
const existingDataByRefId = mapValues(
groupBy(
supplementaryQueryData.data.map((dataFrame: DataFrame) => dataFrame.meta?.custom?.sourceQuery),
'refId'
),
head
);
const allQueriesAreTheSame = deepEqual(newQueriesByRefId, existingDataByRefId);
const allResultsHaveWiderRange = supplementaryQueryData.data.every((data: DataFrame) => {
const dataRange = data.meta?.custom?.absoluteRange;
// Only first data frame in the response may contain the absolute range
if (!dataRange) {
return true;
}
}
return false;
const hasWiderRange = dataRange && dataRange.from <= selectedTimeRange.from && selectedTimeRange.to <= dataRange.to;
return hasWiderRange;
});
return allQueriesAreTheSame && allResultsHaveWiderRange;
}
/**

View File

@ -1,49 +0,0 @@
import { Observable } from 'rxjs';
import {
DataSourceApi,
SupplementaryQueryType,
DataQueryResponse,
hasSupplementaryQuerySupport,
DataQueryRequest,
LoadingState,
LogsVolumeType,
} from '@grafana/data';
import { ExplorePanelData } from '../../../types';
export const getSupplementaryQueryProvider = (
datasourceInstance: DataSourceApi,
type: SupplementaryQueryType,
request: DataQueryRequest,
explorePanelData: Observable<ExplorePanelData>
): Observable<DataQueryResponse> | undefined => {
if (hasSupplementaryQuerySupport(datasourceInstance, type)) {
return datasourceInstance.getDataProvider(type, request);
} else if (type === SupplementaryQueryType.LogsVolume) {
// Create a fallback to results based logs volume
return new Observable<DataQueryResponse>((observer) => {
explorePanelData.subscribe((exploreData) => {
if (exploreData.logsResult?.series && exploreData.logsResult?.visibleRange) {
observer.next({
data: exploreData.logsResult.series.map((d) => {
const custom = d.meta?.custom || {};
return {
...d,
meta: {
custom: {
...custom,
logsVolumeType: LogsVolumeType.Limited,
absoluteRange: exploreData.logsResult?.visibleRange,
},
},
};
}),
state: LoadingState.Done,
});
}
});
});
}
return undefined;
};

View File

@ -1,17 +1,6 @@
import { lastValueFrom } from 'rxjs';
import {
ArrayVector,
DataFrame,
DataQueryRequest,
FieldColorModeId,
FieldType,
LoadingState,
PanelData,
getDefaultTimeRange,
toDataFrame,
} from '@grafana/data';
import { GraphDrawStyle, StackingMode } from '@grafana/schema';
import { DataFrame, FieldType, LoadingState, PanelData, getDefaultTimeRange, toDataFrame } from '@grafana/data';
import TableModel from 'app/core/TableModel';
import { ExplorePanelData } from 'app/types';
@ -314,112 +303,6 @@ describe('decorateWithTableResult', () => {
});
describe('decorateWithLogsResult', () => {
it('should correctly transform logs dataFrames', () => {
const { logs } = getTestContext();
const request = { timezone: 'utc', intervalMs: 60000 } as unknown as DataQueryRequest;
const panelData = createExplorePanelData({ logsFrames: [logs], request });
expect(decorateWithLogsResult()(panelData).logsResult).toEqual({
hasUniqueLabels: false,
meta: [],
rows: [
{
rowIndex: 0,
dataFrame: logs,
entry: 'this is a message',
entryFieldIndex: 3,
hasAnsi: false,
hasUnescapedContent: false,
labels: {},
logLevel: 'unknown',
raw: 'this is a message',
searchWords: [],
timeEpochMs: 100,
timeEpochNs: '100000002',
timeFromNow: 'fromNow() jest mocked',
timeLocal: 'format() jest mocked',
timeUtc: 'format() jest mocked',
uid: '0',
uniqueLabels: {},
},
{
rowIndex: 2,
dataFrame: logs,
entry: 'third',
entryFieldIndex: 3,
hasAnsi: false,
hasUnescapedContent: false,
labels: {},
logLevel: 'unknown',
raw: 'third',
searchWords: [],
timeEpochMs: 100,
timeEpochNs: '100000001',
timeFromNow: 'fromNow() jest mocked',
timeLocal: 'format() jest mocked',
timeUtc: 'format() jest mocked',
uid: '2',
uniqueLabels: {},
},
{
rowIndex: 1,
dataFrame: logs,
entry: 'second message',
entryFieldIndex: 3,
hasAnsi: false,
hasUnescapedContent: false,
labels: {},
logLevel: 'unknown',
raw: 'second message',
searchWords: [],
timeEpochMs: 100,
timeEpochNs: '100000000',
timeFromNow: 'fromNow() jest mocked',
timeLocal: 'format() jest mocked',
timeUtc: 'format() jest mocked',
uid: '1',
uniqueLabels: {},
},
],
series: [
{
name: 'unknown',
length: 1,
fields: [
{ name: 'Time', type: 'time', values: new ArrayVector([0]), config: {} },
{
name: 'Value',
type: 'number',
labels: undefined,
values: new ArrayVector([3]),
config: {
color: {
fixedColor: '#8e8e8e',
mode: FieldColorModeId.Fixed,
},
min: 0,
decimals: 0,
unit: undefined,
custom: {
drawStyle: GraphDrawStyle.Bars,
barAlignment: 0,
barMaxWidth: 5,
barWidthFactor: 0.9,
lineColor: '#8e8e8e',
fillColor: '#8e8e8e',
pointColor: '#8e8e8e',
lineWidth: 0,
fillOpacity: 100,
stacking: { mode: StackingMode.Normal, group: 'A' },
},
},
},
],
},
],
visibleRange: undefined,
});
});
it('returns null if passed empty array', () => {
const panelData = createExplorePanelData({ logsFrames: [] });
expect(decorateWithLogsResult()(panelData).logsResult).toBeNull();

View File

@ -0,0 +1,357 @@
import { flatten } from 'lodash';
import { from, Observable } from 'rxjs';
import {
DataFrame,
DataQueryRequest,
DataQueryResponse,
DataSourceApi,
DataSourceWithSupplementaryQueriesSupport,
FieldType,
LoadingState,
LogLevel,
LogsVolumeType,
MutableDataFrame,
SupplementaryQueryType,
toDataFrame,
} from '@grafana/data';
import { getDataSourceSrv } from '@grafana/runtime';
import { DataQuery } from '@grafana/schema';
import { MockDataSourceApi } from '../../../../test/mocks/datasource_srv';
import { MockDataQueryRequest, MockQuery } from '../../../../test/mocks/query';
import { ExplorePanelData } from '../../../types';
import { mockExplorePanelData } from '../__mocks__/data';
import { getSupplementaryQueryProvider } from './supplementaryQueries';
class MockDataSourceWithSupplementaryQuerySupport
extends MockDataSourceApi
implements DataSourceWithSupplementaryQueriesSupport<DataQuery>
{
private supplementaryQueriesResults: Record<SupplementaryQueryType, DataFrame[] | undefined> = {
[SupplementaryQueryType.LogsVolume]: undefined,
[SupplementaryQueryType.LogsSample]: undefined,
};
withSupplementaryQuerySupport(type: SupplementaryQueryType, data: DataFrame[]) {
this.supplementaryQueriesResults[type] = data;
return this;
}
getDataProvider(
type: SupplementaryQueryType,
request: DataQueryRequest<DataQuery>
): Observable<DataQueryResponse> | undefined {
const data = this.supplementaryQueriesResults[type];
if (data) {
return from([
{ state: LoadingState.Loading, data: [] },
{ state: LoadingState.Done, data },
]);
}
return undefined;
}
getSupplementaryQuery(type: SupplementaryQueryType, query: DataQuery): DataQuery | undefined {
return query;
}
getSupportedSupplementaryQueryTypes(): SupplementaryQueryType[] {
return Object.values(SupplementaryQueryType).filter((type) => this.supplementaryQueriesResults[type]);
}
}
const createSupplementaryQueryResponse = (type: SupplementaryQueryType, id: string) => {
return [
toDataFrame({
refId: `1-${type}-${id}`,
fields: [{ name: 'value', type: FieldType.string, values: [1] }],
meta: {
custom: {
logsVolumeType: LogsVolumeType.FullRange,
},
},
}),
toDataFrame({
refId: `2-${type}-${id}`,
fields: [{ name: 'value', type: FieldType.string, values: [2] }],
meta: {
custom: {
logsVolumeType: LogsVolumeType.FullRange,
},
},
}),
];
};
const mockRow = (refId: string) => {
return {
rowIndex: 0,
entryFieldIndex: 0,
dataFrame: new MutableDataFrame({ refId, fields: [{ name: 'A', values: [] }] }),
entry: '',
hasAnsi: false,
hasUnescapedContent: false,
labels: {},
logLevel: LogLevel.info,
raw: '',
timeEpochMs: 0,
timeEpochNs: '0',
timeFromNow: '',
timeLocal: '',
timeUtc: '',
uid: '1',
};
};
const mockExploreDataWithLogs = () =>
mockExplorePanelData({
logsResult: {
rows: [mockRow('0'), mockRow('1')],
visibleRange: { from: 0, to: 1 },
bucketSize: 1000,
},
});
const datasources: DataSourceApi[] = [
new MockDataSourceWithSupplementaryQuerySupport('logs-volume-a').withSupplementaryQuerySupport(
SupplementaryQueryType.LogsVolume,
createSupplementaryQueryResponse(SupplementaryQueryType.LogsVolume, 'logs-volume-a')
),
new MockDataSourceWithSupplementaryQuerySupport('logs-volume-b').withSupplementaryQuerySupport(
SupplementaryQueryType.LogsVolume,
createSupplementaryQueryResponse(SupplementaryQueryType.LogsVolume, 'logs-volume-b')
),
new MockDataSourceWithSupplementaryQuerySupport('logs-sample-a').withSupplementaryQuerySupport(
SupplementaryQueryType.LogsSample,
createSupplementaryQueryResponse(SupplementaryQueryType.LogsSample, 'logs-sample-a')
),
new MockDataSourceWithSupplementaryQuerySupport('logs-sample-b').withSupplementaryQuerySupport(
SupplementaryQueryType.LogsSample,
createSupplementaryQueryResponse(SupplementaryQueryType.LogsSample, 'logs-sample-b')
),
new MockDataSourceApi('no-data-providers'),
new MockDataSourceApi('no-data-providers-2'),
new MockDataSourceApi('mixed').setupMixed(true),
];
jest.mock('@grafana/runtime', () => ({
...jest.requireActual('@grafana/runtime'),
getDataSourceSrv: () => {
return {
get: async ({ uid }: { uid: string }) => datasources.find((ds) => ds.name === uid) || undefined,
};
},
}));
const setup = async (rootDataSource: string, type: SupplementaryQueryType, targetSources?: string[]) => {
const rootDataSourceApiMock = await getDataSourceSrv().get({ uid: rootDataSource });
targetSources = targetSources || [rootDataSource];
const requestMock = new MockDataQueryRequest({
targets: targetSources.map((source, i) => new MockQuery(`${i}`, 'a', { uid: source })),
});
const explorePanelDataMock: Observable<ExplorePanelData> = mockExploreDataWithLogs();
return getSupplementaryQueryProvider(rootDataSourceApiMock, type, requestMock, explorePanelDataMock);
};
const assertDataFrom = (type: SupplementaryQueryType, ...datasources: string[]) => {
return flatten(
datasources.map((name: string) => {
return [{ refId: `1-${type}-${name}` }, { refId: `2-${type}-${name}` }];
})
);
};
const assertDataFromLogsResults = () => {
return [{ meta: { custom: { logsVolumeType: LogsVolumeType.Limited } } }];
};
describe('SupplementaryQueries utils', function () {
describe('Non-mixed data source', function () {
it('Returns result from the provider', async () => {
const testProvider = await setup('logs-volume-a', SupplementaryQueryType.LogsVolume);
await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([
{ data: [], state: LoadingState.Loading },
{
data: assertDataFrom(SupplementaryQueryType.LogsVolume, 'logs-volume-a'),
state: LoadingState.Done,
},
]);
});
});
it('Uses fallback for logs volume', async () => {
const testProvider = await setup('no-data-providers', SupplementaryQueryType.LogsVolume);
await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([
{
data: assertDataFromLogsResults(),
state: LoadingState.Done,
},
]);
});
});
it('Does not use a fallback for logs sample', async () => {
const testProvider = await setup('no-data-providers', SupplementaryQueryType.LogsSample);
await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([
{
state: LoadingState.NotStarted,
},
]);
});
});
});
describe('Mixed data source', function () {
describe('Logs volume', function () {
describe('All data sources support full range logs volume', function () {
it('Merges all data frames into a single response', async () => {
const testProvider = await setup('mixed', SupplementaryQueryType.LogsVolume, [
'logs-volume-a',
'logs-volume-b',
]);
await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([
{ data: [], state: LoadingState.Loading },
{
data: assertDataFrom(SupplementaryQueryType.LogsVolume, 'logs-volume-a'),
state: LoadingState.Done,
},
{
data: assertDataFrom(SupplementaryQueryType.LogsVolume, 'logs-volume-a', 'logs-volume-b'),
state: LoadingState.Done,
},
]);
});
});
});
describe('All data sources do not support full range logs volume', function () {
it('Creates single fallback result', async () => {
const testProvider = await setup('mixed', SupplementaryQueryType.LogsVolume, [
'no-data-providers',
'no-data-providers-2',
]);
await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([
{
data: assertDataFromLogsResults(),
state: LoadingState.Done,
},
{
data: [...assertDataFromLogsResults(), ...assertDataFromLogsResults()],
state: LoadingState.Done,
},
]);
});
});
});
describe('Some data sources support full range logs volume, while others do not', function () {
it('Creates merged result containing full range and limited logs volume', async () => {
const testProvider = await setup('mixed', SupplementaryQueryType.LogsVolume, [
'logs-volume-a',
'no-data-providers',
'logs-volume-b',
'no-data-providers-2',
]);
await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([
{
data: [],
state: LoadingState.Loading,
},
{
data: assertDataFrom(SupplementaryQueryType.LogsVolume, 'logs-volume-a'),
state: LoadingState.Done,
},
{
data: [
...assertDataFrom(SupplementaryQueryType.LogsVolume, 'logs-volume-a'),
...assertDataFromLogsResults(),
],
state: LoadingState.Done,
},
{
data: [
...assertDataFrom(SupplementaryQueryType.LogsVolume, 'logs-volume-a'),
...assertDataFromLogsResults(),
...assertDataFrom(SupplementaryQueryType.LogsVolume, 'logs-volume-b'),
],
state: LoadingState.Done,
},
]);
});
});
});
});
describe('Logs sample', function () {
describe('All data sources support logs sample', function () {
it('Merges all responses into single result', async () => {
const testProvider = await setup('mixed', SupplementaryQueryType.LogsSample, [
'logs-sample-a',
'logs-sample-b',
]);
await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([
{ data: [], state: LoadingState.Loading },
{
data: assertDataFrom(SupplementaryQueryType.LogsSample, 'logs-sample-a'),
state: LoadingState.Done,
},
{
data: assertDataFrom(SupplementaryQueryType.LogsSample, 'logs-sample-a', 'logs-sample-b'),
state: LoadingState.Done,
},
]);
});
});
});
describe('All data sources do not support full range logs volume', function () {
it('Does not provide fallback result', async () => {
const testProvider = await setup('mixed', SupplementaryQueryType.LogsSample, [
'no-data-providers',
'no-data-providers-2',
]);
await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([{ state: LoadingState.NotStarted, data: [] }]);
});
});
});
describe('Some data sources support full range logs volume, while others do not', function () {
it('Returns results only for data sources supporting logs sample', async () => {
const testProvider = await setup('mixed', SupplementaryQueryType.LogsSample, [
'logs-sample-a',
'no-data-providers',
'logs-sample-b',
'no-data-providers-2',
]);
await expect(testProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([
{ data: [], state: LoadingState.Loading },
{
data: assertDataFrom(SupplementaryQueryType.LogsSample, 'logs-sample-a'),
state: LoadingState.Done,
},
{
data: assertDataFrom(SupplementaryQueryType.LogsSample, 'logs-sample-a', 'logs-sample-b'),
state: LoadingState.Done,
},
]);
});
});
});
});
});
});

View File

@ -1,6 +1,23 @@
import { SupplementaryQueryType } from '@grafana/data';
import { cloneDeep, groupBy } from 'lodash';
import { distinct, from, mergeMap, Observable, of } from 'rxjs';
import { scan } from 'rxjs/operators';
import {
DataQuery,
DataQueryRequest,
DataQueryResponse,
DataSourceApi,
hasSupplementaryQuerySupport,
LoadingState,
LogsVolumeCustomMetaData,
LogsVolumeType,
SupplementaryQueryType,
} from '@grafana/data';
import { getDataSourceSrv } from '@grafana/runtime';
import { makeDataFramesForLogs } from 'app/core/logsModel';
import store from 'app/core/store';
import { SupplementaryQueries } from 'app/types';
import { MIXED_DATASOURCE_NAME } from 'app/plugins/datasource/mixed/MixedDataSource';
import { ExplorePanelData, SupplementaryQueries } from 'app/types';
export const supplementaryQueryTypes: SupplementaryQueryType[] = [
SupplementaryQueryType.LogsVolume,
@ -49,3 +66,156 @@ export const loadSupplementaryQueries = (): SupplementaryQueries => {
}
return supplementaryQueries;
};
const createFallbackLogVolumeProvider = (
explorePanelData: Observable<ExplorePanelData>,
queryTargets: DataQuery[],
datasourceName: string
): Observable<DataQueryResponse> => {
return new Observable<DataQueryResponse>((observer) => {
explorePanelData.subscribe((exploreData) => {
if (
exploreData.logsResult &&
exploreData.logsResult.rows &&
exploreData.logsResult.visibleRange &&
exploreData.logsResult.bucketSize !== undefined &&
exploreData.state === LoadingState.Done
) {
const bucketSize = exploreData.logsResult.bucketSize;
const targetRefIds = queryTargets.map((query) => query.refId);
const rowsByRefId = groupBy(exploreData.logsResult.rows, 'dataFrame.refId');
targetRefIds.forEach((refId) => {
if (rowsByRefId[refId]?.length) {
const series = makeDataFramesForLogs(rowsByRefId[refId], bucketSize);
const logVolumeCustomMetaData: LogsVolumeCustomMetaData = {
logsVolumeType: LogsVolumeType.Limited,
absoluteRange: exploreData.logsResult?.visibleRange!,
datasourceName,
sourceQuery: queryTargets.find((query) => query.refId === refId)!,
};
observer.next({
data: series.map((d) => {
const custom = d.meta?.custom || {};
return {
...d,
meta: {
custom: {
...custom,
...logVolumeCustomMetaData,
},
},
};
}),
state: exploreData.state,
});
}
});
observer.complete();
}
});
});
};
const getSupplementaryQueryFallback = (
type: SupplementaryQueryType,
explorePanelData: Observable<ExplorePanelData>,
queryTargets: DataQuery[],
datasourceName: string
) => {
if (type === SupplementaryQueryType.LogsVolume) {
return createFallbackLogVolumeProvider(explorePanelData, queryTargets, datasourceName);
} else {
return of({
data: [],
state: LoadingState.NotStarted,
});
}
};
export const getSupplementaryQueryProvider = (
datasourceInstance: DataSourceApi,
type: SupplementaryQueryType,
request: DataQueryRequest,
explorePanelData: Observable<ExplorePanelData>
): Observable<DataQueryResponse> | undefined => {
if (hasSupplementaryQuerySupport(datasourceInstance, type)) {
return datasourceInstance.getDataProvider(type, request);
} else if (datasourceInstance.meta?.mixed === true) {
const queries = request.targets.filter((t) => {
return t.datasource?.uid !== MIXED_DATASOURCE_NAME;
});
// Build groups of queries to run in parallel
const sets: { [key: string]: DataQuery[] } = groupBy(queries, 'datasource.uid');
const mixed: Array<{ datasource: Promise<DataSourceApi>; targets: DataQuery[] }> = [];
for (const key in sets) {
const targets = sets[key];
mixed.push({
datasource: getDataSourceSrv().get(targets[0].datasource, request.scopedVars),
targets,
});
}
return from(mixed).pipe(
mergeMap((query, i) => {
return from(query.datasource).pipe(
mergeMap((ds) => {
const dsRequest = cloneDeep(request);
dsRequest.requestId = `mixed-${type}-${i}-${dsRequest.requestId || ''}`;
dsRequest.targets = query.targets;
if (hasSupplementaryQuerySupport(ds, type)) {
const dsProvider = ds.getDataProvider(type, dsRequest);
if (dsProvider) {
// 1) It provides data for current request - use the provider
return dsProvider;
} else {
// 2) It doesn't provide data for current request -> return nothing
return of({
data: [],
state: LoadingState.NotStarted,
});
}
} else {
// 3) Data source doesn't support the supplementary query -> use fallback
// the fallback cannot determine data availability based on request, it
// works on the results once they are available so it never uses the cache
return getSupplementaryQueryFallback(type, explorePanelData, query.targets, ds.name);
}
})
);
}),
scan<DataQueryResponse, DataQueryResponse>(
(acc, next) => {
if (acc.error || next.state === LoadingState.NotStarted) {
return acc;
}
if (next.state === LoadingState.Loading && acc.state === LoadingState.NotStarted) {
return {
...acc,
state: LoadingState.Loading,
};
}
if (next.state && next.state !== LoadingState.Done) {
return acc;
}
return {
...acc,
data: [...acc.data, ...next.data],
state: LoadingState.Done,
};
},
{ data: [], state: LoadingState.NotStarted }
),
distinct()
);
} else {
// Create a fallback to results based logs volume
return getSupplementaryQueryFallback(type, explorePanelData, request.targets, datasourceInstance.name);
}
return undefined;
};

View File

@ -60,6 +60,12 @@ export class MockDataSourceApi extends DataSourceApi {
testDatasource() {
return Promise.resolve();
}
setupMixed(value: boolean) {
this.meta = this.meta || {};
this.meta.mixed = value;
return this;
}
}
export class MockObservableDataSourceApi extends DataSourceApi {

View File

@ -0,0 +1,30 @@
import { CoreApp, DataQueryRequest, getDefaultTimeRange } from '@grafana/data';
import { DataQuery, DataSourceRef } from '@grafana/schema';
export class MockQuery implements DataQuery {
refId: string;
testQuery: string;
datasource?: DataSourceRef;
constructor(refId = 'A', testQuery = '', datasourceRef?: DataSourceRef) {
this.refId = refId;
this.testQuery = testQuery;
this.datasource = datasourceRef;
}
}
export class MockDataQueryRequest implements DataQueryRequest<MockQuery> {
app = CoreApp.Unknown;
interval = '';
intervalMs = 0;
range = getDefaultTimeRange();
requestId = '1';
scopedVars = {};
startTime = 0;
targets: MockQuery[];
timezone = 'utc';
constructor({ targets }: { targets: MockQuery[] }) {
this.targets = targets || [];
}
}