grafana/public/app/features/explore/state/query.ts
Explore: Improve local storage error handling when rich history is added (#39943)

import { mergeMap, throttleTime } from 'rxjs/operators';
import { identity, Observable, of, SubscriptionLike, Unsubscribable } from 'rxjs';
import {
DataQuery,
DataQueryErrorType,
DataQueryResponse,
DataSourceApi,
hasLogsVolumeSupport,
LoadingState,
PanelData,
PanelEvents,
QueryFixAction,
toLegacyResponseData,
} from '@grafana/data';
import {
buildQueryTransaction,
ensureQueries,
generateEmptyQuery,
generateNewKeyAndAddRefIdIfMissing,
getQueryKeys,
hasNonEmptyQuery,
stopQueryState,
updateHistory,
} from 'app/core/utils/explore';
import { addToRichHistory } from 'app/core/utils/richHistory';
import { ExploreItemState, ExplorePanelData, ThunkResult } from 'app/types';
import { ExploreId, QueryOptions } from 'app/types/explore';
import { getTimeZone } from 'app/features/profile/state/selectors';
import { getShiftedTimeRange } from 'app/core/utils/timePicker';
import { notifyApp } from '../../../core/actions';
import { runRequest } from '../../query/state/runRequest';
import { decorateData } from '../utils/decorators';
import { createErrorNotification } from '../../../core/copy/appNotification';
import {
localStorageFullAction,
richHistoryLimitExceededAction,
richHistoryUpdatedAction,
stateSave,
storeAutoLoadLogsVolumeAction,
} from './main';
import { AnyAction, createAction, PayloadAction } from '@reduxjs/toolkit';
import { updateTime } from './time';
import { historyUpdatedAction } from './history';
import { createCacheKey, createEmptyQueryResponse, getResultsFromCache } from './utils';
import { config } from '@grafana/runtime';
//
// Actions and Payloads
//
/**
* Adds a query row after the row with the given index.
*/
export interface AddQueryRowPayload {
exploreId: ExploreId;
index: number;
query: DataQuery;
}
export const addQueryRowAction = createAction<AddQueryRowPayload>('explore/addQueryRow');
/**
* Updates the queries of the given Explore pane with the queries from the payload.
*/
export interface ChangeQueriesPayload {
exploreId: ExploreId;
queries: DataQuery[];
}
export const changeQueriesAction = createAction<ChangeQueriesPayload>('explore/changeQueries');
/**
* Clear all queries and results.
*/
export interface ClearQueriesPayload {
exploreId: ExploreId;
}
export const clearQueriesAction = createAction<ClearQueriesPayload>('explore/clearQueries');
/**
* Cancel running queries.
*/
export const cancelQueriesAction = createAction<ClearQueriesPayload>('explore/cancelQueries');
export interface QueriesImportedPayload {
exploreId: ExploreId;
queries: DataQuery[];
}
export const queriesImportedAction = createAction<QueriesImportedPayload>('explore/queriesImported');
/**
* Action to modify a query given a datasource-specific modifier action.
* @param exploreId Explore area
* @param modification Action object with a type, e.g., ADD_FILTER
* @param index Optional query row index. If omitted, the modification is applied to all query rows.
* @param modifier Function that executes the modification, typically `datasourceInstance.modifyQuery`.
*/
export interface ModifyQueriesPayload {
exploreId: ExploreId;
modification: QueryFixAction;
index?: number;
modifier: (query: DataQuery, modification: QueryFixAction) => DataQuery;
}
export const modifyQueriesAction = createAction<ModifyQueriesPayload>('explore/modifyQueries');
export interface QueryStoreSubscriptionPayload {
exploreId: ExploreId;
querySubscription: Unsubscribable;
}
export const queryStoreSubscriptionAction = createAction<QueryStoreSubscriptionPayload>(
'explore/queryStoreSubscription'
);
export interface StoreLogsVolumeDataProvider {
exploreId: ExploreId;
logsVolumeDataProvider?: Observable<DataQueryResponse>;
}
/**
* Stores available logs volume provider after running the query. Used internally by runQueries().
*/
const storeLogsVolumeDataProviderAction = createAction<StoreLogsVolumeDataProvider>(
'explore/storeLogsVolumeDataProviderAction'
);
export interface StoreLogsVolumeDataSubscriptionPayload {
exploreId: ExploreId;
logsVolumeDataSubscription?: SubscriptionLike;
}
/**
* Stores the current logs volume subscription for the given Explore pane.
*/
const storeLogsVolumeDataSubscriptionAction = createAction<StoreLogsVolumeDataSubscriptionPayload>(
'explore/storeLogsVolumeDataSubscriptionAction'
);
/**
* Stores data returned by the provider. Used internally by loadLogsVolumeData().
*/
const updateLogsVolumeDataAction = createAction<{
exploreId: ExploreId;
logsVolumeData: DataQueryResponse;
}>('explore/updateLogsVolumeDataAction');
export interface QueryEndedPayload {
exploreId: ExploreId;
response: ExplorePanelData;
}
export const queryStreamUpdatedAction = createAction<QueryEndedPayload>('explore/queryStreamUpdated');
/**
* Reset queries to the given queries. Any modifications will be discarded.
* Use this action for clicks on query examples. Triggers a query run.
*/
export interface SetQueriesPayload {
exploreId: ExploreId;
queries: DataQuery[];
}
export const setQueriesAction = createAction<SetQueriesPayload>('explore/setQueries');
export interface ChangeLoadingStatePayload {
exploreId: ExploreId;
loadingState: LoadingState;
}
export const changeLoadingStateAction = createAction<ChangeLoadingStatePayload>('changeLoadingState');
export interface SetPausedStatePayload {
exploreId: ExploreId;
isPaused: boolean;
}
export const setPausedStateAction = createAction<SetPausedStatePayload>('explore/setPausedState');
/**
* Start a scan for more results by shifting the time range back and re-running the queries.
* @param exploreId Explore area
*/
export interface ScanStartPayload {
exploreId: ExploreId;
}
export const scanStartAction = createAction<ScanStartPayload>('explore/scanStart');
/**
* Stop any scanning for more results.
*/
export interface ScanStopPayload {
exploreId: ExploreId;
}
export const scanStopAction = createAction<ScanStopPayload>('explore/scanStop');
/**
* Adds query results to cache.
* This is currently used to cache the last 5 query results for log queries run from logs navigation (pagination).
*/
export interface AddResultsToCachePayload {
exploreId: ExploreId;
cacheKey: string;
queryResponse: PanelData;
}
export const addResultsToCacheAction = createAction<AddResultsToCachePayload>('explore/addResultsToCache');
/**
* Clears cache.
*/
export interface ClearCachePayload {
exploreId: ExploreId;
}
export const clearCacheAction = createAction<ClearCachePayload>('explore/clearCache');
//
// Action creators
//
/**
* Adds a query row after the row with the given index.
*/
export function addQueryRow(exploreId: ExploreId, index: number): ThunkResult<void> {
return (dispatch, getState) => {
const queries = getState().explore[exploreId]!.queries;
const query = generateEmptyQuery(queries, index);
dispatch(addQueryRowAction({ exploreId, index, query }));
};
}
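/*
 * Example (illustrative only, not part of this module): adding an empty query row
 * below the first row of the left pane, assuming a connected Redux `dispatch`:
 *
 *   dispatch(addQueryRow(ExploreId.left, 0));
 */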
/**
* Clear all queries and results.
*/
export function clearQueries(exploreId: ExploreId): ThunkResult<void> {
return (dispatch) => {
dispatch(scanStopAction({ exploreId }));
dispatch(clearQueriesAction({ exploreId }));
dispatch(stateSave());
};
}
/**
* Cancel running queries
*/
export function cancelQueries(exploreId: ExploreId): ThunkResult<void> {
return (dispatch) => {
dispatch(scanStopAction({ exploreId }));
dispatch(cancelQueriesAction({ exploreId }));
dispatch(stateSave());
};
}
/**
* Import queries from the previously selected datasource when possible, e.g. Loki and Prometheus have a similar
* query language, so the label matchers can be reused to get similar data.
* @param exploreId
* @param queries
* @param sourceDataSource
* @param targetDataSource
*/
export const importQueries = (
exploreId: ExploreId,
queries: DataQuery[],
sourceDataSource: DataSourceApi | undefined | null,
targetDataSource: DataSourceApi
): ThunkResult<void> => {
return async (dispatch) => {
if (!sourceDataSource) {
// explore not initialized
dispatch(queriesImportedAction({ exploreId, queries }));
return;
}
let importedQueries = queries;
// Check if queries can be imported from previously selected datasource
if (sourceDataSource.meta?.id === targetDataSource.meta?.id) {
// Keep the same queries if the datasource type is unchanged, but drop the per-query datasource property to prevent a mismatch between the new and old datasource instances
importedQueries = queries.map(({ datasource, ...query }) => query);
} else if (targetDataSource.importQueries) {
// Datasource-specific importers
importedQueries = await targetDataSource.importQueries(queries, sourceDataSource);
} else {
// Default is blank queries
importedQueries = ensureQueries();
}
const nextQueries = ensureQueries(importedQueries);
dispatch(queriesImportedAction({ exploreId, queries: nextQueries }));
};
};
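/*
 * Example (illustrative sketch): re-importing the current queries when the user switches
 * a pane from Prometheus to Loki. `queries`, `prometheusDs`, and `lokiDs` are assumed to be
 * available in the calling scope:
 *
 *   dispatch(importQueries(ExploreId.left, queries, prometheusDs, lokiDs));
 */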
/**
* Action to modify a query given a datasource-specific modifier action.
* @param exploreId Explore area
* @param modification Action object with a type, e.g., ADD_FILTER
* @param index Optional query row index. If omitted, the modification is applied to all query rows.
* @param modifier Function that executes the modification, typically `datasourceInstance.modifyQuery`.
*/
export function modifyQueries(
exploreId: ExploreId,
modification: QueryFixAction,
modifier: any,
index?: number
): ThunkResult<void> {
return (dispatch) => {
dispatch(modifyQueriesAction({ exploreId, modification, index, modifier }));
if (!modification.preventSubmit) {
dispatch(runQueries(exploreId));
}
};
}
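/*
 * Example (illustrative sketch): applying a datasource-specific modification to all query rows.
 * The action fields and the `modifyQuery` helper are assumptions about the target datasource,
 * not guarantees of this module:
 *
 *   dispatch(
 *     modifyQueries(ExploreId.left, { type: 'ADD_FILTER', key: 'job', value: 'grafana' }, (query, action) =>
 *       datasourceInstance.modifyQuery(query, action)
 *     )
 *   );
 */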
/**
* Main action creator that runs the queries and dispatches sub-actions based on which result viewers are active
*/
export const runQueries = (
exploreId: ExploreId,
options?: { replaceUrl?: boolean; preserveCache?: boolean }
): ThunkResult<void> => {
return (dispatch, getState) => {
dispatch(updateTime({ exploreId }));
// We always want to clear the cache unless the preserveCache option is explicitly passed
const preserveCache = options?.preserveCache === true;
if (!preserveCache) {
dispatch(clearCache(exploreId));
}
const { richHistory, autoLoadLogsVolume } = getState().explore;
const exploreItemState = getState().explore[exploreId]!;
const {
datasourceInstance,
queries,
containerWidth,
isLive: live,
range,
scanning,
queryResponse,
querySubscription,
history,
refreshInterval,
absoluteRange,
cache,
logsVolumeDataProvider,
} = exploreItemState;
let newQuerySub;
const cachedValue = getResultsFromCache(cache, absoluteRange);
// If results for this range are saved in the cache, use them instead of running the queries
if (cachedValue) {
newQuerySub = of(cachedValue)
.pipe(
mergeMap((data: PanelData) =>
decorateData(data, queryResponse, absoluteRange, refreshInterval, queries, !!logsVolumeDataProvider)
)
)
.subscribe((data) => {
if (!data.error) {
dispatch(stateSave());
}
dispatch(queryStreamUpdatedAction({ exploreId, response: data }));
});
// If we don't have results saved in cache, run new queries
} else {
if (!hasNonEmptyQuery(queries)) {
dispatch(stateSave({ replace: options?.replaceUrl })); // Remember to save to state and update location
return;
}
if (!datasourceInstance) {
return;
}
// Some datasources' query builders allow per-query interval limits,
// but we're using the datasource-level interval limit for now
const minInterval = datasourceInstance?.interval;
stopQueryState(querySubscription);
const datasourceId = datasourceInstance?.meta.id;
const queryOptions: QueryOptions = {
minInterval,
// maxDataPoints is used in:
// Loki - used as the buffer size for logs streaming; when undefined it falls back to the datasource config if supported.
// Elastic - limits the number of data points for the counts query; for logs it has a hardcoded limit.
// Influx - used to correctly display logs in the graph
// TODO:unification
// maxDataPoints: mode === ExploreMode.Logs && datasourceId === 'loki' ? undefined : containerWidth,
maxDataPoints: containerWidth,
liveStreaming: live,
};
const datasourceName = datasourceInstance.name;
const timeZone = getTimeZone(getState().user);
const transaction = buildQueryTransaction(exploreId, queries, queryOptions, range, scanning, timeZone);
let firstResponse = true;
dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Loading }));
newQuerySub = runRequest(datasourceInstance, transaction.request)
.pipe(
// Simple throttle for live tailing: at > 1000 rows per interval we spend about 200ms on processing and
// rendering, so 500ms is a reasonable cadence. If that is ever optimized this can be tweaked, but it should
// only update as fast as the user can actually follow what is happening.
live ? throttleTime(500) : identity,
mergeMap((data: PanelData) =>
decorateData(
data,
queryResponse,
absoluteRange,
refreshInterval,
queries,
!!getState().explore[exploreId]!.logsVolumeDataProvider
)
)
)
.subscribe(
(data) => {
if (!data.error && firstResponse) {
// Side effect: save query history to localStorage
const nextHistory = updateHistory(history, datasourceId, queries);
const { richHistory: nextRichHistory, localStorageFull, limitExceeded } = addToRichHistory(
richHistory || [],
datasourceId,
datasourceName,
queries,
false,
'',
'',
!getState().explore.localStorageFull,
!getState().explore.richHistoryLimitExceededWarningShown
);
dispatch(historyUpdatedAction({ exploreId, history: nextHistory }));
dispatch(richHistoryUpdatedAction({ richHistory: nextRichHistory }));
if (localStorageFull) {
dispatch(localStorageFullAction());
}
if (limitExceeded) {
dispatch(richHistoryLimitExceededAction());
}
// We save queries to the URL here so that only successfully run queries change the URL.
dispatch(stateSave({ replace: options?.replaceUrl }));
}
firstResponse = false;
dispatch(queryStreamUpdatedAction({ exploreId, response: data }));
// If scanning is active for this pane, keep shifting the range and re-running until results arrive
if (getState().explore[exploreId]!.scanning) {
if (data.state === LoadingState.Done && data.series.length === 0) {
const range = getShiftedTimeRange(-1, getState().explore[exploreId]!.range);
dispatch(updateTime({ exploreId, absoluteRange: range }));
dispatch(runQueries(exploreId));
} else {
// We can stop scanning if we have a result
dispatch(scanStopAction({ exploreId }));
}
}
},
(error) => {
dispatch(notifyApp(createErrorNotification('Query processing error', error)));
dispatch(changeLoadingStateAction({ exploreId, loadingState: LoadingState.Error }));
console.error(error);
}
);
if (config.featureToggles.fullRangeLogsVolume && hasLogsVolumeSupport(datasourceInstance)) {
const logsVolumeDataProvider = datasourceInstance.getLogsVolumeDataProvider(transaction.request);
dispatch(
storeLogsVolumeDataProviderAction({
exploreId,
logsVolumeDataProvider,
})
);
if (autoLoadLogsVolume && logsVolumeDataProvider) {
dispatch(loadLogsVolumeData(exploreId));
}
} else {
dispatch(
storeLogsVolumeDataProviderAction({
exploreId,
logsVolumeDataProvider: undefined,
})
);
}
}
dispatch(queryStoreSubscriptionAction({ exploreId, querySubscription: newQuerySub }));
};
};
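/*
 * Example (illustrative sketch): re-running the current queries for the left pane while keeping
 * cached results (e.g. when paginating logs), assuming a connected Redux `dispatch`:
 *
 *   dispatch(runQueries(ExploreId.left, { preserveCache: true }));
 */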
/**
* Reset queries to the given queries. Any modifications will be discarded.
* Use this action for clicks on query examples. Triggers a query run.
*/
export function setQueries(exploreId: ExploreId, rawQueries: DataQuery[]): ThunkResult<void> {
return (dispatch, getState) => {
// Inject react keys into query objects
const queries = getState().explore[exploreId]!.queries;
const nextQueries = rawQueries.map((query, index) => generateNewKeyAndAddRefIdIfMissing(query, queries, index));
dispatch(setQueriesAction({ exploreId, queries: nextQueries }));
dispatch(runQueries(exploreId));
};
}
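/*
 * Example (illustrative sketch): setting the left pane's queries from a query example link and
 * running them immediately. The `expr` field is a datasource-specific assumption, not part of
 * the base DataQuery type:
 *
 *   dispatch(setQueries(ExploreId.left, [{ refId: 'A', expr: 'up' } as DataQuery]));
 */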
/**
* Start a scan for more results by shifting the time range back and re-running the queries.
* @param exploreId Explore area
*/
export function scanStart(exploreId: ExploreId): ThunkResult<void> {
return (dispatch, getState) => {
// Register the scanner
dispatch(scanStartAction({ exploreId }));
// Scanning must trigger a query run and return the new range
const range = getShiftedTimeRange(-1, getState().explore[exploreId]!.range);
// Set the new range to be displayed
dispatch(updateTime({ exploreId, absoluteRange: range }));
dispatch(runQueries(exploreId));
};
}
export function addResultsToCache(exploreId: ExploreId): ThunkResult<void> {
return (dispatch, getState) => {
const queryResponse = getState().explore[exploreId]!.queryResponse;
const absoluteRange = getState().explore[exploreId]!.absoluteRange;
const cacheKey = createCacheKey(absoluteRange);
// Save results to the cache only when all results have been received and loading is done
if (queryResponse.state === LoadingState.Done) {
dispatch(addResultsToCacheAction({ exploreId, cacheKey, queryResponse }));
}
};
}
export function clearCache(exploreId: ExploreId): ThunkResult<void> {
return (dispatch, getState) => {
dispatch(clearCacheAction({ exploreId }));
};
}
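/*
 * Example (illustrative sketch): the cache flow used by logs navigation. Cache the current results,
 * move the time range, then re-run with the cache preserved so previously visited pages can be
 * restored without re-querying. `olderRange` is a placeholder for the shifted absolute range:
 *
 *   dispatch(addResultsToCache(ExploreId.left));
 *   dispatch(updateTime({ exploreId: ExploreId.left, absoluteRange: olderRange }));
 *   dispatch(runQueries(ExploreId.left, { preserveCache: true }));
 */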
/**
* Stores the auto-load logs volume setting and loads logs volume data when auto-load
* is enabled and the logs volume hasn't been loaded yet.
*/
export function changeAutoLogsVolume(exploreId: ExploreId, autoLoadLogsVolume: boolean): ThunkResult<void> {
return (dispatch, getState) => {
dispatch(storeAutoLoadLogsVolumeAction(autoLoadLogsVolume));
const state = getState().explore[exploreId]!;
// load logs volume automatically after switching
const logsVolumeData = state.logsVolumeData;
if (!logsVolumeData?.data && autoLoadLogsVolume) {
dispatch(loadLogsVolumeData(exploreId));
}
};
}
/**
* Starts loading logs volume data and stores the emitted values.
*/
export function loadLogsVolumeData(exploreId: ExploreId): ThunkResult<void> {
return (dispatch, getState) => {
const { logsVolumeDataProvider } = getState().explore[exploreId]!;
if (logsVolumeDataProvider) {
const logsVolumeDataSubscription = logsVolumeDataProvider.subscribe({
next: (logsVolumeData: DataQueryResponse) => {
dispatch(updateLogsVolumeDataAction({ exploreId, logsVolumeData }));
},
});
dispatch(storeLogsVolumeDataSubscriptionAction({ exploreId, logsVolumeDataSubscription }));
}
};
}
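/*
 * Example (illustrative sketch): enabling auto-loading of logs volume for a pane. This stores the
 * setting and, when no logs volume data has been loaded yet, subscribes to the stored provider via
 * loadLogsVolumeData():
 *
 *   dispatch(changeAutoLogsVolume(ExploreId.left, true));
 */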
//
// Reducer
//
// Redux Toolkit uses Immer to ensure that state objects are not mutated.
// Immer's autoFreeze option freezes objects against change, which means this reducer can't be migrated to createSlice:
// the state would become frozen and we would get runtime errors because flot (the graph lib) tries to mutate it.
// https://github.com/reduxjs/redux-toolkit/issues/242
export const queryReducer = (state: ExploreItemState, action: AnyAction): ExploreItemState => {
if (addQueryRowAction.match(action)) {
const { queries } = state;
const { index, query } = action.payload;
// Add to queries, which will cause a new row to be rendered
const nextQueries = [...queries.slice(0, index + 1), { ...query }, ...queries.slice(index + 1)];
return {
...state,
queries: nextQueries,
queryKeys: getQueryKeys(nextQueries, state.datasourceInstance),
};
}
if (changeQueriesAction.match(action)) {
const { queries } = action.payload;
return {
...state,
queries,
};
}
if (clearQueriesAction.match(action)) {
const queries = ensureQueries();
stopQueryState(state.querySubscription);
return {
...state,
queries: queries.slice(),
graphResult: null,
tableResult: null,
logsResult: null,
queryKeys: getQueryKeys(queries, state.datasourceInstance),
queryResponse: createEmptyQueryResponse(),
loading: false,
};
}
if (cancelQueriesAction.match(action)) {
stopQueryState(state.querySubscription);
return {
...state,
loading: false,
};
}
if (modifyQueriesAction.match(action)) {
const { queries } = state;
const { modification, index, modifier } = action.payload;
let nextQueries: DataQuery[];
if (index === undefined) {
// Modify all queries
nextQueries = queries.map((query, i) => {
const nextQuery = modifier({ ...query }, modification);
return generateNewKeyAndAddRefIdIfMissing(nextQuery, queries, i);
});
} else {
// Modify query only at index
nextQueries = queries.map((query, i) => {
if (i === index) {
const nextQuery = modifier({ ...query }, modification);
return generateNewKeyAndAddRefIdIfMissing(nextQuery, queries, i);
}
return query;
});
}
return {
...state,
queries: nextQueries,
queryKeys: getQueryKeys(nextQueries, state.datasourceInstance),
};
}
if (setQueriesAction.match(action)) {
const { queries } = action.payload;
return {
...state,
queries: queries.slice(),
queryKeys: getQueryKeys(queries, state.datasourceInstance),
};
}
if (queriesImportedAction.match(action)) {
const { queries } = action.payload;
return {
...state,
queries,
queryKeys: getQueryKeys(queries, state.datasourceInstance),
};
}
if (queryStoreSubscriptionAction.match(action)) {
const { querySubscription } = action.payload;
return {
...state,
querySubscription,
};
}
if (storeLogsVolumeDataProviderAction.match(action)) {
let { logsVolumeDataProvider } = action.payload;
if (state.logsVolumeDataSubscription) {
state.logsVolumeDataSubscription.unsubscribe();
}
return {
...state,
logsVolumeDataProvider,
logsVolumeDataSubscription: undefined,
// clear previous data, with a new provider the previous data becomes stale
logsVolumeData: undefined,
};
}
if (storeLogsVolumeDataSubscriptionAction.match(action)) {
const { logsVolumeDataSubscription } = action.payload;
return {
...state,
logsVolumeDataSubscription,
};
}
if (updateLogsVolumeDataAction.match(action)) {
let { logsVolumeData } = action.payload;
return {
...state,
logsVolumeData,
};
}
if (queryStreamUpdatedAction.match(action)) {
return processQueryResponse(state, action);
}
if (changeLoadingStateAction.match(action)) {
const { loadingState } = action.payload;
return {
...state,
queryResponse: {
...state.queryResponse,
state: loadingState,
},
loading: loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming,
};
}
if (setPausedStateAction.match(action)) {
const { isPaused } = action.payload;
return {
...state,
isPaused: isPaused,
};
}
if (scanStartAction.match(action)) {
return { ...state, scanning: true };
}
if (scanStopAction.match(action)) {
return {
...state,
scanning: false,
scanRange: undefined,
};
}
if (addResultsToCacheAction.match(action)) {
const CACHE_LIMIT = 5;
const { cache } = state;
const { queryResponse, cacheKey } = action.payload;
let newCache = [...cache];
const isDuplicateKey = newCache.some((c) => c.key === cacheKey);
if (!isDuplicateKey) {
const newCacheItem = { key: cacheKey, value: queryResponse };
newCache = [newCacheItem, ...newCache].slice(0, CACHE_LIMIT);
}
return {
...state,
cache: newCache,
};
}
if (clearCacheAction.match(action)) {
return {
...state,
cache: [],
};
}
return state;
};
export const processQueryResponse = (
state: ExploreItemState,
action: PayloadAction<QueryEndedPayload>
): ExploreItemState => {
const { response } = action.payload;
const {
request,
state: loadingState,
series,
error,
graphResult,
logsResult,
tableResult,
traceFrames,
nodeGraphFrames,
} = response;
if (error) {
if (error.type === DataQueryErrorType.Timeout) {
return {
...state,
queryResponse: response,
loading: loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming,
};
} else if (error.type === DataQueryErrorType.Cancelled) {
return state;
}
// Send error to Angular editors
if (state.datasourceInstance?.components?.QueryCtrl) {
state.eventBridge.emit(PanelEvents.dataError, error);
}
}
if (!request) {
return { ...state };
}
// Send legacy data to Angular editors
if (state.datasourceInstance?.components?.QueryCtrl) {
const legacy = series.map((v) => toLegacyResponseData(v));
state.eventBridge.emit(PanelEvents.dataReceived, legacy);
}
return {
...state,
queryResponse: response,
graphResult,
tableResult,
logsResult,
loading: loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming,
showLogs: !!logsResult,
showMetrics: !!graphResult,
showTable: !!tableResult,
showTrace: !!traceFrames.length,
showNodeGraph: !!nodeGraphFrames.length,
};
};