Explore: Consolidate logs volume logic (full range and limited) (#61995)

* Consolidate logs volume logic (full range and limited)

* Fix showing limited histogram message

* Test passing meta data to logs volume provider

* Improve readability

* Clean up types

* Move the comment back to the right place

* Improve readability
This commit is contained in:
Piotr Jamróz 2023-02-07 14:32:06 +01:00 committed by GitHub
parent b4d2eae759
commit 999c836753
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 179 additions and 128 deletions

View File

@ -195,6 +195,16 @@ export enum SupplementaryQueryType {
LogsSample = 'LogsSample',
}
/**
* Types of logs volume responses. A data source may return full range histogram (based on selected range)
* or limited (based on returned results). This information is attached to DataFrame.meta.custom object.
* @internal
*/
export enum LogsVolumeType {
FullRange = 'FullRange',
Limited = 'Limited',
}
/**
* Data sources that support supplementary queries in Explore.
* This will enable users to see additional data when running original queries.

View File

@ -13,6 +13,7 @@ import {
LogRowModel,
LogsDedupStrategy,
LogsMetaKind,
LogsVolumeType,
MutableDataFrame,
toDataFrame,
} from '@grafana/data';
@ -31,6 +32,9 @@ import {
queryLogsSample,
} from './logsModel';
const FROM = dateTimeParse('2021-06-17 00:00:00', { timeZone: 'utc' });
const TO = dateTimeParse('2021-06-17 00:00:00', { timeZone: 'utc' });
describe('dedupLogRows()', () => {
test('should return rows as is when dedup is set to none', () => {
const rows = [
@ -1154,8 +1158,8 @@ describe('logs volume', () => {
return dataFrame.fields[1]!.labels!.level === 'error' ? LogLevel.error : LogLevel.unknown;
},
range: {
from: dateTimeParse('2021-06-17 00:00:00', { timeZone: 'utc' }),
to: dateTimeParse('2021-06-17 00:00:00', { timeZone: 'utc' }),
from: FROM,
to: TO,
raw: { from: '0', to: '1' },
},
targets: request.targets,
@ -1186,6 +1190,43 @@ describe('logs volume', () => {
datasource = new MockObservableDataSourceApi('loki', [], undefined, 'Error message');
}
it('applies correct meta data', async () => {
setup(setupMultipleResults);
await expect(volumeProvider).toEmitValuesWith((received) => {
expect(received).toMatchObject([
{ state: LoadingState.Loading, error: undefined, data: [] },
{
state: LoadingState.Done,
error: undefined,
data: [
{
fields: expect.anything(),
meta: {
custom: {
targets: [
{
target: 'volume query 1',
},
{
target: 'volume query 2',
},
],
logsVolumeType: LogsVolumeType.FullRange,
absoluteRange: {
from: FROM.valueOf(),
to: TO.valueOf(),
},
},
},
},
expect.anything(),
],
},
]);
});
});
it('aggregates data frames by level', async () => {
setup(setupMultipleResults);

View File

@ -1,5 +1,5 @@
import { size } from 'lodash';
import { Observable, from, isObservable } from 'rxjs';
import { from, isObservable, Observable } from 'rxjs';
import {
AbsoluteTimeRange,
@ -26,6 +26,7 @@ import {
LogsMetaItem,
LogsMetaKind,
LogsModel,
LogsVolumeType,
MutableDataFrame,
rangeUtil,
ScopedVars,
@ -710,6 +711,7 @@ export function queryLogsVolume<TQuery extends DataQuery, TOptions extends DataS
aggregatedLogsVolume[0].meta = {
custom: {
targets: options.targets,
logsVolumeType: LogsVolumeType.FullRange,
absoluteRange: { from: options.range.from.valueOf(), to: options.range.to.valueOf() },
},
};

View File

@ -9,6 +9,7 @@ import {
SplitOpen,
TimeZone,
EventBus,
LogsVolumeType,
} from '@grafana/data';
import { Button, Collapse, InlineField, TooltipDisplayMode, useStyles2, useTheme2 } from '@grafana/ui';
@ -29,37 +30,6 @@ type Props = {
eventBus: EventBus;
};
function createVisualisationData(
logLinesBased: DataQueryResponse | undefined,
logLinesBasedVisibleRange: AbsoluteTimeRange | undefined,
fullRangeData: DataQueryResponse | undefined,
absoluteRange: AbsoluteTimeRange
):
| {
logsVolumeData: DataQueryResponse;
fullRangeData: boolean;
range: AbsoluteTimeRange;
}
| undefined {
if (fullRangeData !== undefined) {
return {
logsVolumeData: fullRangeData,
fullRangeData: true,
range: absoluteRange,
};
}
if (logLinesBased !== undefined) {
return {
logsVolumeData: logLinesBased,
fullRangeData: false,
range: logLinesBasedVisibleRange || absoluteRange,
};
}
return undefined;
}
export function LogsVolumePanel(props: Props) {
const { width, timeZone, splitOpen, onUpdateTimeRange, onLoadLogsVolume, onHiddenSeriesChanged } = props;
const theme = useTheme2();
@ -67,18 +37,12 @@ export function LogsVolumePanel(props: Props) {
const spacing = parseInt(theme.spacing(2).slice(0, -2), 10);
const height = 150;
const data = createVisualisationData(
props.logLinesBasedData,
props.logLinesBasedDataVisibleRange,
props.logsVolumeData,
props.absoluteRange
);
if (data === undefined) {
if (props.logsVolumeData === undefined) {
return null;
}
const { logsVolumeData, fullRangeData, range } = data;
const logsVolumeData = props.logsVolumeData;
const range = logsVolumeData.data[0]?.meta?.custom?.absoluteRange || props.absoluteRange;
if (logsVolumeData.error !== undefined) {
return <SupplementaryResultError error={logsVolumeData.error} title="Failed to load log volume for this query" />;
@ -113,7 +77,7 @@ export function LogsVolumePanel(props: Props) {
}
let extraInfo;
if (fullRangeData) {
if (logsVolumeData.data[0]?.meta?.custom?.logsVolumeType !== LogsVolumeType.Limited) {
const zoomRatio = logsLevelZoomRatio(logsVolumeData, range);
if (zoomRatio !== undefined && zoomRatio < 1) {

View File

@ -1,7 +1,7 @@
import { AnyAction, createAction, PayloadAction } from '@reduxjs/toolkit';
import deepEqual from 'fast-deep-equal';
import { flatten, groupBy, snakeCase } from 'lodash';
import { identity, Observable, of, SubscriptionLike, Unsubscribable, combineLatest } from 'rxjs';
import { combineLatest, identity, Observable, of, SubscriptionLike, Unsubscribable } from 'rxjs';
import { mergeMap, throttleTime } from 'rxjs/operators';
import {
@ -9,16 +9,15 @@ import {
DataQueryErrorType,
DataQueryResponse,
DataSourceApi,
hasSupplementaryQuerySupport,
SupplementaryQueryType,
hasLogsVolumeSupport,
hasQueryExportSupport,
hasQueryImportSupport,
HistoryItem,
LoadingState,
PanelEvents,
QueryFixAction,
SupplementaryQueryType,
toLegacyResponseData,
hasLogsVolumeSupport,
} from '@grafana/data';
import { config, getDataSourceSrv, reportInteraction } from '@grafana/runtime';
import { DataQuery } from '@grafana/schema';
@ -48,6 +47,7 @@ import { storeSupplementaryQueryEnabled, supplementaryQueryTypes } from '../util
import { addHistoryItem, historyUpdatedAction, loadRichHistory } from './history';
import { stateSave } from './main';
import { getSupplementaryQueryProvider } from './supplementaryQueries';
import { updateTime } from './time';
import { createCacheKey, getResultsFromCache } from './utils';
@ -447,7 +447,8 @@ export const runQueries = (
cache,
supplementaryQueries,
} = exploreItemState;
let newQuerySub;
let newQuerySource: Observable<ExplorePanelData>;
let newQuerySubscription: SubscriptionLike;
const queries = exploreItemState.queries.map((query) => ({
...query,
@ -464,29 +465,19 @@ export const runQueries = (
// If we have results saved in cache, we are going to use those results instead of running queries
if (cachedValue) {
newQuerySub = combineLatest([of(cachedValue), correlations$])
.pipe(
mergeMap(([data, correlations]) =>
decorateData(
data,
queryResponse,
absoluteRange,
refreshInterval,
queries,
correlations,
datasourceInstance != null &&
(hasSupplementaryQuerySupport(datasourceInstance, SupplementaryQueryType.LogsVolume) ||
hasLogsVolumeSupport(datasourceInstance))
)
)
newQuerySource = combineLatest([of(cachedValue), correlations$]).pipe(
mergeMap(([data, correlations]) =>
decorateData(data, queryResponse, absoluteRange, refreshInterval, queries, correlations)
)
.subscribe((data) => {
if (!data.error) {
dispatch(stateSave());
}
);
dispatch(queryStreamUpdatedAction({ exploreId, response: data }));
});
newQuerySubscription = newQuerySource.subscribe((data) => {
if (!data.error) {
dispatch(stateSave());
}
dispatch(queryStreamUpdatedAction({ exploreId, response: data }));
});
// If we don't have results saved in cache, run new queries
} else {
@ -522,64 +513,54 @@ export const runQueries = (
dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Loading }));
newQuerySub = combineLatest([
newQuerySource = combineLatest([
runRequest(datasourceInstance, transaction.request)
// Simple throttle for live tailing, in case of > 1000 rows per interval we spend about 200ms on processing and
// rendering. In case this is optimized this can be tweaked, but also it should be only as fast as user
// actually can see what is happening.
.pipe(live ? throttleTime(500) : identity),
correlations$,
])
.pipe(
mergeMap(([data, correlations]) =>
decorateData(
data,
queryResponse,
absoluteRange,
refreshInterval,
queries,
correlations,
datasourceInstance != null &&
(hasSupplementaryQuerySupport(datasourceInstance, SupplementaryQueryType.LogsVolume) ||
hasLogsVolumeSupport(datasourceInstance))
)
)
]).pipe(
mergeMap(([data, correlations]) =>
decorateData(data, queryResponse, absoluteRange, refreshInterval, queries, correlations)
)
.subscribe({
next(data) {
if (data.logsResult !== null) {
reportInteraction('grafana_explore_logs_result_displayed', {
datasourceType: datasourceInstance.type,
});
}
dispatch(queryStreamUpdatedAction({ exploreId, response: data }));
);
// Keep scanning for results if this was the last scanning transaction
if (getState().explore[exploreId]!.scanning) {
if (data.state === LoadingState.Done && data.series.length === 0) {
const range = getShiftedTimeRange(-1, getState().explore[exploreId]!.range);
dispatch(updateTime({ exploreId, absoluteRange: range }));
dispatch(runQueries(exploreId));
} else {
// We can stop scanning if we have a result
dispatch(scanStopAction({ exploreId }));
}
newQuerySubscription = newQuerySource.subscribe({
next(data) {
if (data.logsResult !== null) {
reportInteraction('grafana_explore_logs_result_displayed', {
datasourceType: datasourceInstance.type,
});
}
dispatch(queryStreamUpdatedAction({ exploreId, response: data }));
// Keep scanning for results if this was the last scanning transaction
if (getState().explore[exploreId]!.scanning) {
if (data.state === LoadingState.Done && data.series.length === 0) {
const range = getShiftedTimeRange(-1, getState().explore[exploreId]!.range);
dispatch(updateTime({ exploreId, absoluteRange: range }));
dispatch(runQueries(exploreId));
} else {
// We can stop scanning if we have a result
dispatch(scanStopAction({ exploreId }));
}
},
error(error) {
dispatch(notifyApp(createErrorNotification('Query processing error', error)));
dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Error }));
console.error(error);
},
complete() {
// In case we don't get any response at all but the observable completed, make sure we stop loading state.
// This is for cases when some queries are noop like running first query after load but we don't have any
// actual query input.
if (getState().explore[exploreId]!.queryResponse.state === LoadingState.Loading) {
dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Done }));
}
},
});
}
},
error(error) {
dispatch(notifyApp(createErrorNotification('Query processing error', error)));
dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Error }));
console.error(error);
},
complete() {
// In case we don't get any response at all but the observable completed, make sure we stop loading state.
// This is for cases when some queries are noop like running first query after load but we don't have any
// actual query input.
if (getState().explore[exploreId]!.queryResponse.state === LoadingState.Loading) {
dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Done }));
}
},
});
if (live) {
for (const type of supplementaryQueryTypes) {
@ -595,11 +576,17 @@ export const runQueries = (
for (const type of supplementaryQueryTypes) {
// We always prepare provider, even is supplementary query is disabled because when the user
// enables the query, we need to load the data, so we need the provider
if (hasSupplementaryQuerySupport(datasourceInstance, type)) {
const dataProvider = datasourceInstance.getDataProvider(type, {
const dataProvider = getSupplementaryQueryProvider(
datasourceInstance,
type,
{
...transaction.request,
requestId: `${transaction.request.requestId}_${snakeCase(type)}`,
});
},
newQuerySource
);
if (dataProvider) {
dispatch(
storeSupplementaryQueryDataProviderAction({
exploreId,
@ -648,7 +635,7 @@ export const runQueries = (
}
}
dispatch(queryStoreSubscriptionAction({ exploreId, querySubscription: newQuerySub }));
dispatch(queryStoreSubscriptionAction({ exploreId, querySubscription: newQuerySubscription }));
};
};

View File

@ -0,0 +1,49 @@
import { Observable } from 'rxjs';
import {
DataSourceApi,
SupplementaryQueryType,
DataQueryResponse,
hasSupplementaryQuerySupport,
DataQueryRequest,
LoadingState,
LogsVolumeType,
} from '@grafana/data';
import { ExplorePanelData } from '../../../types';
export const getSupplementaryQueryProvider = (
datasourceInstance: DataSourceApi,
type: SupplementaryQueryType,
request: DataQueryRequest,
explorePanelData: Observable<ExplorePanelData>
): Observable<DataQueryResponse> | undefined => {
if (hasSupplementaryQuerySupport(datasourceInstance, type)) {
return datasourceInstance.getDataProvider(type, request);
} else if (type === SupplementaryQueryType.LogsVolume) {
// Create a fallback to results based logs volume
return new Observable<DataQueryResponse>((observer) => {
explorePanelData.subscribe((exploreData) => {
if (exploreData.logsResult?.series && exploreData.logsResult?.visibleRange) {
observer.next({
data: exploreData.logsResult.series.map((d) => {
const custom = d.meta?.custom || {};
return {
...d,
meta: {
custom: {
...custom,
logsVolumeType: LogsVolumeType.Limited,
absoluteRange: exploreData.logsResult?.visibleRange,
},
},
};
}),
state: LoadingState.Done,
});
}
});
});
}
return undefined;
};

View File

@ -223,7 +223,6 @@ export const decorateWithLogsResult =
absoluteRange?: AbsoluteTimeRange;
refreshInterval?: string;
queries?: DataQuery[];
fullRangeLogsVolumeAvailable?: boolean;
} = {}
) =>
(data: ExplorePanelData): ExplorePanelData => {
@ -236,7 +235,7 @@ export const decorateWithLogsResult =
const sortOrder = refreshIntervalToSortOrder(options.refreshInterval);
const sortedNewResults = sortLogsResult(newResults, sortOrder);
const rows = sortedNewResults.rows;
const series = options.fullRangeLogsVolumeAvailable ? undefined : sortedNewResults.series;
const series = sortedNewResults.series;
const logsResult = { ...sortedNewResults, rows, series };
return { ...data, logsResult };
@ -249,8 +248,7 @@ export function decorateData(
absoluteRange: AbsoluteTimeRange,
refreshInterval: string | undefined,
queries: DataQuery[] | undefined,
correlations: CorrelationData[] | undefined,
fullRangeLogsVolumeAvailable: boolean
correlations: CorrelationData[] | undefined
): Observable<ExplorePanelData> {
return of(data).pipe(
map((data: PanelData) => preProcessPanelData(data, queryResponse)),
@ -258,7 +256,7 @@ export function decorateData(
map(decorateWithFrameTypeMetadata),
map(decorateWithGraphResult),
map(decorateWithGraphResult),
map(decorateWithLogsResult({ absoluteRange, refreshInterval, queries, fullRangeLogsVolumeAvailable })),
map(decorateWithLogsResult({ absoluteRange, refreshInterval, queries })),
mergeMap(decorateWithRawPrometheusResult),
mergeMap(decorateWithTableResult)
);