From fb39831df243386377d9188eedeb951c3e1c3698 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hugo=20H=C3=A4ggmark?= Date: Mon, 3 Jun 2019 14:54:32 +0200 Subject: [PATCH] Explore: Queries the datasource once per run query and uses DataStreamObserver (#17263) * Refactor: Removes replaceUrl from actions * Refactor: Moves saveState thunk to epic * Refactor: Moves thunks to epics * Wip: removes resulttype and queries once * Refactor: LiveTailing uses observer in query * Refactor: Creates epics folder for epics and move back actioncreators * Tests: Adds tests for epics and reducer * Fix: Checks for undefined as well * Refactor: Cleans up previous live tailing implementation * Chore: merge with master * Fix: Fixes url issuses and prom graph in Panels * Refactor: Removes supportsStreaming and adds sockets to DataSourcePluginMeta instead * Refactor: Changes the way we create TimeSeries * Refactor: Renames sockets to streaming * Refactor: Changes the way Explore does incremental updates * Refactor: Removes unused method * Refactor: Adds back Loading indication --- .../components/SetInterval/SetInterval.tsx | 2 +- packages/grafana-ui/src/types/datasource.ts | 6 +- .../grafana-ui/src/utils/processSeriesData.ts | 1 + pkg/plugins/datasource_plugin.go | 1 + public/app/core/time_series2.ts | 2 +- public/app/core/utils/explore.ts | 99 +--- public/app/features/explore/Explore.tsx | 2 +- .../app/features/explore/ExploreToolbar.tsx | 10 +- .../app/features/explore/GraphContainer.tsx | 6 +- public/app/features/explore/LogsContainer.tsx | 5 +- public/app/features/explore/QueryRow.tsx | 13 +- .../app/features/explore/TableContainer.tsx | 8 +- .../app/features/explore/state/actionTypes.ts | 80 ++- public/app/features/explore/state/actions.ts | 302 +--------- .../app/features/explore/state/epics.test.ts | 550 ------------------ public/app/features/explore/state/epics.ts | 159 ----- .../state/epics/limitMessageRateEpic.ts | 25 + .../epics/processQueryErrorsEpic.test.ts | 67 +++ .../state/epics/processQueryErrorsEpic.ts | 40 ++ .../epics/processQueryResultsEpic.test.ts | 119 ++++ .../state/epics/processQueryResultsEpic.ts | 76 +++ .../state/epics/runQueriesBatchEpic.test.ts | 421 ++++++++++++++ .../state/epics/runQueriesBatchEpic.ts | 220 +++++++ .../state/epics/runQueriesEpic.test.ts | 71 +++ .../explore/state/epics/runQueriesEpic.ts | 39 ++ .../explore/state/epics/stateSaveEpic.test.ts | 61 ++ .../explore/state/epics/stateSaveEpic.ts | 72 +++ .../features/explore/state/reducers.test.ts | 8 +- public/app/features/explore/state/reducers.ts | 144 ++--- .../explore/utils/ResultProcessor.test.ts | 453 +++++++++++++++ .../features/explore/utils/ResultProcessor.ts | 176 ++++++ .../app/plugins/datasource/loki/datasource.ts | 200 ++++--- .../datasource/loki/language_provider.ts | 4 +- .../app/plugins/datasource/loki/plugin.json | 1 + public/app/plugins/datasource/loki/types.ts | 3 + .../prometheus/components/PromQueryField.tsx | 2 +- .../datasource/prometheus/datasource.ts | 157 ++++- .../plugins/datasource/prometheus/types.ts | 15 + public/app/store/configureStore.ts | 35 +- public/app/types/explore.ts | 23 +- public/test/core/redux/epicTester.ts | 59 +- public/test/mocks/mockExploreState.ts | 86 +++ 42 files changed, 2470 insertions(+), 1353 deletions(-) delete mode 100644 public/app/features/explore/state/epics.test.ts delete mode 100644 public/app/features/explore/state/epics.ts create mode 100644 public/app/features/explore/state/epics/limitMessageRateEpic.ts create mode 100644 public/app/features/explore/state/epics/processQueryErrorsEpic.test.ts create mode 100644 public/app/features/explore/state/epics/processQueryErrorsEpic.ts create mode 100644 public/app/features/explore/state/epics/processQueryResultsEpic.test.ts create mode 100644 public/app/features/explore/state/epics/processQueryResultsEpic.ts create mode 100644 public/app/features/explore/state/epics/runQueriesBatchEpic.test.ts create mode 100644 public/app/features/explore/state/epics/runQueriesBatchEpic.ts create mode 100644 public/app/features/explore/state/epics/runQueriesEpic.test.ts create mode 100644 public/app/features/explore/state/epics/runQueriesEpic.ts create mode 100644 public/app/features/explore/state/epics/stateSaveEpic.test.ts create mode 100644 public/app/features/explore/state/epics/stateSaveEpic.ts create mode 100644 public/app/features/explore/utils/ResultProcessor.test.ts create mode 100644 public/app/features/explore/utils/ResultProcessor.ts create mode 100644 public/test/mocks/mockExploreState.ts diff --git a/packages/grafana-ui/src/components/SetInterval/SetInterval.tsx b/packages/grafana-ui/src/components/SetInterval/SetInterval.tsx index cdcc1f406bb..026aa5600a1 100644 --- a/packages/grafana-ui/src/components/SetInterval/SetInterval.tsx +++ b/packages/grafana-ui/src/components/SetInterval/SetInterval.tsx @@ -38,7 +38,7 @@ export class SetInterval extends PureComponent { } componentDidUpdate(prevProps: Props) { - if (_.isEqual(prevProps, this.props)) { + if ((isLive(prevProps.interval) && isLive(this.props.interval)) || _.isEqual(prevProps, this.props)) { return; } diff --git a/packages/grafana-ui/src/types/datasource.ts b/packages/grafana-ui/src/types/datasource.ts index dbb80e2fdf5..a2629ec6f6d 100644 --- a/packages/grafana-ui/src/types/datasource.ts +++ b/packages/grafana-ui/src/types/datasource.ts @@ -83,7 +83,7 @@ export interface DataSourcePluginMeta extends PluginMeta { category?: string; queryOptions?: PluginMetaQueryOptions; sort?: number; - supportsStreaming?: boolean; + streaming?: boolean; /** * By default, hidden queries are not passed to the datasource @@ -164,10 +164,6 @@ export abstract class DataSourceApi< */ abstract query(options: DataQueryRequest, observer?: DataStreamObserver): Promise; - convertToStreamTargets?(options: DataQueryRequest): Array<{ url: string; refId: string }>; - - resultToSeriesData?(data: any, refId: string): SeriesData[]; - /** * Test & verify datasource settings & connection details */ diff --git a/packages/grafana-ui/src/utils/processSeriesData.ts b/packages/grafana-ui/src/utils/processSeriesData.ts index 84aadcc9f65..38e9abf9135 100644 --- a/packages/grafana-ui/src/utils/processSeriesData.ts +++ b/packages/grafana-ui/src/utils/processSeriesData.ts @@ -160,6 +160,7 @@ export const toLegacyResponseData = (series: SeriesData): TimeSeries | TableData const type = guessFieldTypeFromSeries(series, 1); if (type === FieldType.time) { return { + alias: fields[0].name || series.name, target: fields[0].name || series.name, datapoints: rows, unit: fields[0].unit, diff --git a/pkg/plugins/datasource_plugin.go b/pkg/plugins/datasource_plugin.go index 8c846839eda..1379daf5a6d 100644 --- a/pkg/plugins/datasource_plugin.go +++ b/pkg/plugins/datasource_plugin.go @@ -29,6 +29,7 @@ type DataSourcePlugin struct { BuiltIn bool `json:"builtIn,omitempty"` Mixed bool `json:"mixed,omitempty"` Routes []*AppPluginRoute `json:"routes"` + Streaming bool `json:"streaming"` Backend bool `json:"backend,omitempty"` Executable string `json:"executable,omitempty"` diff --git a/public/app/core/time_series2.ts b/public/app/core/time_series2.ts index 05815ab7ab3..d7a57b77afc 100644 --- a/public/app/core/time_series2.ts +++ b/public/app/core/time_series2.ts @@ -329,7 +329,7 @@ export default class TimeSeries { isMsResolutionNeeded() { for (let i = 0; i < this.datapoints.length; i++) { - if (this.datapoints[i][1] !== null) { + if (this.datapoints[i][1] !== null && this.datapoints[i][1] !== undefined) { const timestamp = this.datapoints[i][1].toString(); if (timestamp.length === 13 && timestamp % 1000 !== 0) { return true; diff --git a/public/app/core/utils/explore.ts b/public/app/core/utils/explore.ts index 99e168b8590..4a4697d7d0a 100644 --- a/public/app/core/utils/explore.ts +++ b/public/app/core/utils/explore.ts @@ -1,44 +1,35 @@ // Libraries import _ from 'lodash'; +import { from } from 'rxjs'; +import { toUtc } from '@grafana/ui/src/utils/moment_wrapper'; +import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker'; // Services & Utils import * as dateMath from '@grafana/ui/src/utils/datemath'; import { renderUrl } from 'app/core/utils/url'; import kbn from 'app/core/utils/kbn'; import store from 'app/core/store'; -import TableModel, { mergeTablesIntoModel } from 'app/core/table_model'; import { getNextRefIdChar } from './query'; // Types import { - colors, TimeRange, RawTimeRange, TimeZone, IntervalValues, DataQuery, DataSourceApi, - toSeriesData, - guessFieldTypes, TimeFragment, DataQueryError, LogRowModel, LogsModel, LogsDedupStrategy, + DataSourceJsonData, + DataQueryRequest, + DataStreamObserver, } from '@grafana/ui'; -import TimeSeries from 'app/core/time_series2'; -import { - ExploreUrlState, - HistoryItem, - QueryTransaction, - ResultType, - QueryIntervals, - QueryOptions, - ResultGetter, -} from 'app/types/explore'; -import { seriesDataToLogsModel } from 'app/core/logs_model'; -import { toUtc } from '@grafana/ui/src/utils/moment_wrapper'; -import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker'; +import { ExploreUrlState, HistoryItem, QueryTransaction, QueryIntervals, QueryOptions } from 'app/types/explore'; +import { config } from '../config'; export const DEFAULT_RANGE = { from: 'now-6h', @@ -116,7 +107,6 @@ export async function getExploreUrl( export function buildQueryTransaction( queries: DataQuery[], - resultType: ResultType, queryOptions: QueryOptions, range: TimeRange, queryIntervals: QueryIntervals, @@ -137,7 +127,7 @@ export function buildQueryTransaction( // Using `format` here because it relates to the view panel that the request is for. // However, some datasources don't use `panelId + query.refId`, but only `panelId`. // Therefore panel id has to be unique. - const panelId = `${queryOptions.format}-${key}`; + const panelId = `${key}`; const options = { interval, @@ -156,7 +146,6 @@ export function buildQueryTransaction( return { queries, options, - resultType, scanning, id: generateKey(), // reusing for unique ID done: false, @@ -328,28 +317,6 @@ export function hasNonEmptyQuery(queries: TQuery ); } -export function calculateResultsFromQueryTransactions(result: any, resultType: ResultType, graphInterval: number) { - const flattenedResult: any[] = _.flatten(result); - const graphResult = resultType === 'Graph' && result ? result : null; - const tableResult = - resultType === 'Table' && result - ? mergeTablesIntoModel( - new TableModel(), - ...flattenedResult.filter((r: any) => r.columns && r.rows).map((r: any) => r as TableModel) - ) - : mergeTablesIntoModel(new TableModel()); - const logsResult = - resultType === 'Logs' && result - ? seriesDataToLogsModel(flattenedResult.map(r => guessFieldTypes(toSeriesData(r))), graphInterval) - : null; - - return { - graphResult, - tableResult, - logsResult, - }; -} - export function getIntervals(range: TimeRange, lowLimit: string, resolution: number): IntervalValues { if (!resolution) { return { interval: '1s', intervalMs: 1000 }; @@ -358,37 +325,6 @@ export function getIntervals(range: TimeRange, lowLimit: string, resolution: num return kbn.calculateInterval(range, resolution, lowLimit); } -export const makeTimeSeriesList: ResultGetter = (dataList, transaction, allTransactions) => { - // Prevent multiple Graph transactions to have the same colors - let colorIndexOffset = 0; - for (const other of allTransactions) { - // Only need to consider transactions that came before the current one - if (other === transaction) { - break; - } - // Count timeseries of previous query results - if (other.resultType === 'Graph' && other.done) { - colorIndexOffset += other.result.length; - } - } - - return dataList.map((seriesData, index: number) => { - const datapoints = seriesData.datapoints || []; - const alias = seriesData.target; - const colorIndex = (colorIndexOffset + index) % colors.length; - const color = colors[colorIndex]; - - const series = new TimeSeries({ - datapoints, - alias, - color, - unit: seriesData.unit, - }); - - return series; - }); -}; - /** * Update the query history. Side-effect: store history in local storage */ @@ -566,3 +502,20 @@ export const sortLogsResult = (logsResult: LogsModel, refreshInterval: string) = return result; }; + +export const convertToWebSocketUrl = (url: string) => { + const protocol = window.location.protocol === 'https:' ? 'wss://' : 'ws://'; + let backend = `${protocol}${window.location.host}${config.appSubUrl}`; + if (backend.endsWith('/')) { + backend = backend.slice(0, backend.length - 1); + } + return `${backend}${url}`; +}; + +export const getQueryResponse = ( + datasourceInstance: DataSourceApi, + options: DataQueryRequest, + observer?: DataStreamObserver +) => { + return from(datasourceInstance.query(options, observer)); +}; diff --git a/public/app/features/explore/Explore.tsx b/public/app/features/explore/Explore.tsx index eef4b8b21dc..8028e8362d7 100644 --- a/public/app/features/explore/Explore.tsx +++ b/public/app/features/explore/Explore.tsx @@ -51,11 +51,11 @@ import { } from 'app/core/utils/explore'; import { Emitter } from 'app/core/utils/emitter'; import { ExploreToolbar } from './ExploreToolbar'; -import { scanStopAction } from './state/actionTypes'; import { NoDataSourceCallToAction } from './NoDataSourceCallToAction'; import { FadeIn } from 'app/core/components/Animations/FadeIn'; import { getTimeZone } from '../profile/state/selectors'; import { ErrorContainer } from './ErrorContainer'; +import { scanStopAction } from './state/actionTypes'; interface ExploreProps { StartPage?: ComponentClass; diff --git a/public/app/features/explore/ExploreToolbar.tsx b/public/app/features/explore/ExploreToolbar.tsx index 9d6c4a1d3d9..9d3cb984120 100644 --- a/public/app/features/explore/ExploreToolbar.tsx +++ b/public/app/features/explore/ExploreToolbar.tsx @@ -10,6 +10,7 @@ import { TimeZone, TimeRange, SelectOptionItem, + LoadingState, } from '@grafana/ui'; import { DataSourcePicker } from 'app/core/components/Select/DataSourcePicker'; import { StoreState } from 'app/types/store'; @@ -261,9 +262,7 @@ const mapStateToProps = (state: StoreState, { exploreId }: OwnProps): StateProps exploreDatasources, range, refreshInterval, - graphIsLoading, - logIsLoading, - tableIsLoading, + loadingState, supportedModes, mode, isLive, @@ -271,8 +270,9 @@ const mapStateToProps = (state: StoreState, { exploreId }: OwnProps): StateProps const selectedDatasource = datasourceInstance ? exploreDatasources.find(datasource => datasource.name === datasourceInstance.name) : undefined; - const loading = graphIsLoading || logIsLoading || tableIsLoading; - const hasLiveOption = datasourceInstance && datasourceInstance.convertToStreamTargets ? true : false; + const loading = loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming; + const hasLiveOption = + datasourceInstance && datasourceInstance.meta && datasourceInstance.meta.streaming ? true : false; const supportedModeOptions: Array> = []; let selectedModeOption = null; diff --git a/public/app/features/explore/GraphContainer.tsx b/public/app/features/explore/GraphContainer.tsx index 0fba2ae6ded..6d1bb6c4e38 100644 --- a/public/app/features/explore/GraphContainer.tsx +++ b/public/app/features/explore/GraphContainer.tsx @@ -1,7 +1,7 @@ import React, { PureComponent } from 'react'; import { hot } from 'react-hot-loader'; import { connect } from 'react-redux'; -import { TimeRange, TimeZone, AbsoluteTimeRange } from '@grafana/ui'; +import { TimeRange, TimeZone, AbsoluteTimeRange, LoadingState } from '@grafana/ui'; import { ExploreId, ExploreItemState } from 'app/types/explore'; import { StoreState } from 'app/types'; @@ -69,8 +69,8 @@ function mapStateToProps(state: StoreState, { exploreId }) { const explore = state.explore; const { split } = explore; const item: ExploreItemState = explore[exploreId]; - const { graphResult, graphIsLoading, range, showingGraph, showingTable } = item; - const loading = graphIsLoading; + const { graphResult, loadingState, range, showingGraph, showingTable } = item; + const loading = loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming; return { graphResult, loading, range, showingGraph, showingTable, split, timeZone: getTimeZone(state.user) }; } diff --git a/public/app/features/explore/LogsContainer.tsx b/public/app/features/explore/LogsContainer.tsx index d8d85efcc13..79846e1d4bc 100644 --- a/public/app/features/explore/LogsContainer.tsx +++ b/public/app/features/explore/LogsContainer.tsx @@ -13,6 +13,7 @@ import { LogsModel, LogRowModel, LogsDedupStrategy, + LoadingState, } from '@grafana/ui'; import { ExploreId, ExploreItemState } from 'app/types/explore'; @@ -151,14 +152,14 @@ function mapStateToProps(state: StoreState, { exploreId }) { const { logsHighlighterExpressions, logsResult, - logIsLoading, + loadingState, scanning, scanRange, range, datasourceInstance, isLive, } = item; - const loading = logIsLoading; + const loading = loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming; const { dedupStrategy } = exploreItemUIStateSelector(item); const hiddenLogLevels = new Set(item.hiddenLogLevels); const dedupedResult = deduplicatedLogsSelector(item); diff --git a/public/app/features/explore/QueryRow.tsx b/public/app/features/explore/QueryRow.tsx index 2a0429dbd97..49880c11230 100644 --- a/public/app/features/explore/QueryRow.tsx +++ b/public/app/features/explore/QueryRow.tsx @@ -20,7 +20,6 @@ import { QueryFixAction, DataSourceStatus, PanelData, - LoadingState, DataQueryError, } from '@grafana/ui'; import { HistoryItem, ExploreItemState, ExploreId } from 'app/types/explore'; @@ -180,9 +179,7 @@ function mapStateToProps(state: StoreState, { exploreId, index }: QueryRowProps) range, datasourceError, graphResult, - graphIsLoading, - tableIsLoading, - logIsLoading, + loadingState, latency, queryErrors, } = item; @@ -190,15 +187,9 @@ function mapStateToProps(state: StoreState, { exploreId, index }: QueryRowProps) const datasourceStatus = datasourceError ? DataSourceStatus.Disconnected : DataSourceStatus.Connected; const error = queryErrors.filter(queryError => queryError.refId === query.refId)[0]; const series = graphResult ? graphResult : []; // TODO: use SeriesData - const queryResponseState = - graphIsLoading || tableIsLoading || logIsLoading - ? LoadingState.Loading - : error - ? LoadingState.Error - : LoadingState.Done; const queryResponse: PanelData = { series, - state: queryResponseState, + state: loadingState, error, }; diff --git a/public/app/features/explore/TableContainer.tsx b/public/app/features/explore/TableContainer.tsx index 18ee70d8ee2..ea227e78b97 100644 --- a/public/app/features/explore/TableContainer.tsx +++ b/public/app/features/explore/TableContainer.tsx @@ -9,6 +9,7 @@ import { toggleTable } from './state/actions'; import Table from './Table'; import Panel from './Panel'; import TableModel from 'app/core/table_model'; +import { LoadingState } from '@grafana/ui'; interface TableContainerProps { exploreId: ExploreId; @@ -38,8 +39,11 @@ export class TableContainer extends PureComponent { function mapStateToProps(state: StoreState, { exploreId }) { const explore = state.explore; const item: ExploreItemState = explore[exploreId]; - const { tableIsLoading, showingTable, tableResult } = item; - const loading = tableIsLoading; + const { loadingState, showingTable, tableResult } = item; + const loading = + tableResult && tableResult.rows.length > 0 + ? false + : loadingState === LoadingState.Loading || loadingState === LoadingState.Streaming; return { loading, showingTable, tableResult }; } diff --git a/public/app/features/explore/state/actionTypes.ts b/public/app/features/explore/state/actionTypes.ts index b572b6ca041..68b9ac604eb 100644 --- a/public/app/features/explore/state/actionTypes.ts +++ b/public/app/features/explore/state/actionTypes.ts @@ -9,18 +9,23 @@ import { LogLevel, TimeRange, DataQueryError, + SeriesData, + LogsModel, + TimeSeries, + DataQueryResponseData, + LoadingState, } from '@grafana/ui/src/types'; import { ExploreId, ExploreItemState, HistoryItem, RangeScanner, - ResultType, - QueryTransaction, ExploreUIState, ExploreMode, + QueryOptions, } from 'app/types/explore'; import { actionCreatorFactory, noPayloadActionCreatorFactory, ActionOf } from 'app/core/redux/actionCreatorFactory'; +import TableModel from 'app/core/table_model'; /** Higher order actions * @@ -142,21 +147,19 @@ export interface ModifyQueriesPayload { export interface QueryFailurePayload { exploreId: ExploreId; response: DataQueryError; - resultType: ResultType; } export interface QueryStartPayload { exploreId: ExploreId; - resultType: ResultType; - rowIndex: number; - transaction: QueryTransaction; } export interface QuerySuccessPayload { exploreId: ExploreId; - result: any; - resultType: ResultType; latency: number; + loadingState: LoadingState; + graphResult: TimeSeries[]; + tableResult: TableModel; + logsResult: LogsModel; } export interface HistoryUpdatedPayload { @@ -238,6 +241,41 @@ export interface ResetQueryErrorPayload { refIds: string[]; } +export interface SetUrlReplacedPayload { + exploreId: ExploreId; +} + +export interface ProcessQueryErrorsPayload { + exploreId: ExploreId; + response: any; + datasourceId: string; +} + +export interface ProcessQueryResultsPayload { + exploreId: ExploreId; + latency: number; + datasourceId: string; + loadingState: LoadingState; + series?: DataQueryResponseData[]; + delta?: SeriesData[]; +} + +export interface RunQueriesBatchPayload { + exploreId: ExploreId; + queryOptions: QueryOptions; +} + +export interface LimitMessageRatePayload { + series: SeriesData[]; + exploreId: ExploreId; + datasourceId: string; +} + +export interface ChangeRangePayload { + exploreId: ExploreId; + range: TimeRange; +} + /** * Adds a query row after the row with the given index. */ @@ -333,13 +371,6 @@ export const modifyQueriesAction = actionCreatorFactory('e */ export const queryFailureAction = actionCreatorFactory('explore/QUERY_FAILURE').create(); -/** - * Start a query transaction for the given result type. - * @param exploreId Explore area - * @param transaction Query options and `done` status. - * @param resultType Associate the transaction with a result viewer, e.g., Graph - * @param rowIndex Index is used to associate latency for this transaction with a query row - */ export const queryStartAction = actionCreatorFactory('explore/QUERY_START').create(); /** @@ -392,6 +423,7 @@ export const splitCloseAction = actionCreatorFactory('e * The copy keeps all query modifications but wipes the query results. */ export const splitOpenAction = actionCreatorFactory('explore/SPLIT_OPEN').create(); + export const stateSaveAction = noPayloadActionCreatorFactory('explore/STATE_SAVE').create(); /** @@ -440,6 +472,24 @@ export const historyUpdatedAction = actionCreatorFactory( export const resetQueryErrorAction = actionCreatorFactory('explore/RESET_QUERY_ERROR').create(); +export const setUrlReplacedAction = actionCreatorFactory('explore/SET_URL_REPLACED').create(); + +export const processQueryErrorsAction = actionCreatorFactory( + 'explore/PROCESS_QUERY_ERRORS' +).create(); + +export const processQueryResultsAction = actionCreatorFactory( + 'explore/PROCESS_QUERY_RESULTS' +).create(); + +export const runQueriesBatchAction = actionCreatorFactory('explore/RUN_QUERIES_BATCH').create(); + +export const limitMessageRatePayloadAction = actionCreatorFactory( + 'explore/LIMIT_MESSAGE_RATE_PAYLOAD' +).create(); + +export const changeRangeAction = actionCreatorFactory('explore/CHANGE_RANGE').create(); + export type HigherOrderAction = | ActionOf | SplitOpenAction diff --git a/public/app/features/explore/state/actions.ts b/public/app/features/explore/state/actions.ts index bfeb96aef35..4f95744eb47 100644 --- a/public/app/features/explore/state/actions.ts +++ b/public/app/features/explore/state/actions.ts @@ -7,25 +7,14 @@ import { getDatasourceSrv } from 'app/features/plugins/datasource_srv'; import { Emitter } from 'app/core/core'; import { LAST_USED_DATASOURCE_KEY, - clearQueryKeys, ensureQueries, generateEmptyQuery, - hasNonEmptyQuery, - makeTimeSeriesList, - updateHistory, - buildQueryTransaction, - serializeStateToUrlParam, parseUrlState, getTimeRange, getTimeRangeFromUrl, generateNewKeyAndAddRefIdIfMissing, - instanceOfDataQueryError, - getRefIds, } from 'app/core/utils/explore'; -// Actions -import { updateLocation } from 'app/core/actions'; - // Types import { ThunkResult } from 'app/types'; import { @@ -34,19 +23,9 @@ import { DataQuery, DataSourceSelectItem, QueryFixAction, - TimeRange, LogsDedupStrategy, } from '@grafana/ui'; -import { - ExploreId, - ExploreUrlState, - RangeScanner, - ResultType, - QueryOptions, - ExploreUIState, - QueryTransaction, - ExploreMode, -} from 'app/types/explore'; +import { ExploreId, RangeScanner, ExploreUIState, QueryTransaction, ExploreMode } from 'app/types/explore'; import { updateDatasourceInstanceAction, changeQueryAction, @@ -55,7 +34,6 @@ import { changeSizeAction, ChangeSizePayload, changeTimeAction, - scanStopAction, clearQueriesAction, initializeExploreAction, loadDatasourceMissingAction, @@ -64,9 +42,6 @@ import { LoadDatasourceReadyPayload, loadDatasourceReadyAction, modifyQueriesAction, - queryFailureAction, - querySuccessAction, - scanRangeAction, scanStartAction, setQueriesAction, splitCloseAction, @@ -77,21 +52,19 @@ import { ToggleGraphPayload, ToggleTablePayload, updateUIStateAction, - runQueriesAction, testDataSourcePendingAction, testDataSourceSuccessAction, testDataSourceFailureAction, loadExploreDatasources, - queryStartAction, - historyUpdatedAction, - resetQueryErrorAction, changeModeAction, + scanStopAction, + scanRangeAction, + runQueriesAction, + stateSaveAction, } from './actionTypes'; import { ActionOf, ActionCreator } from 'app/core/redux/actionCreatorFactory'; import { getTimeZone } from 'app/features/profile/state/selectors'; -import { isDateTime } from '@grafana/ui/src/utils/moment_wrapper'; -import { toDataQueryError } from 'app/features/dashboard/state/PanelQueryState'; -import { startSubscriptionsAction, subscriptionDataReceivedAction } from 'app/features/explore/state/epics'; +import { offOption } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker'; /** * Updates UI state and save it to the URL @@ -99,7 +72,7 @@ import { startSubscriptionsAction, subscriptionDataReceivedAction } from 'app/fe const updateExploreUIState = (exploreId: ExploreId, uiStateFragment: Partial): ThunkResult => { return dispatch => { dispatch(updateUIStateAction({ exploreId, ...uiStateFragment })); - dispatch(stateSave()); + dispatch(stateSaveAction()); }; }; @@ -118,7 +91,7 @@ export function addQueryRow(exploreId: ExploreId, index: number): ThunkResult { +export function changeDatasource(exploreId: ExploreId, datasource: string): ThunkResult { return async (dispatch, getState) => { let newDataSourceInstance: DataSourceApi = null; @@ -135,8 +108,12 @@ export function changeDatasource(exploreId: ExploreId, datasource: string, repla dispatch(updateDatasourceInstanceAction({ exploreId, datasourceInstance: newDataSourceInstance })); + if (getState().explore[exploreId].isLive) { + dispatch(changeRefreshInterval(exploreId, offOption.value)); + } + await dispatch(loadDatasource(exploreId, newDataSourceInstance)); - dispatch(runQueries(exploreId, false, replaceUrl)); + dispatch(runQueries(exploreId)); }; } @@ -215,7 +192,7 @@ export function clearQueries(exploreId: ExploreId): ThunkResult { return dispatch => { dispatch(scanStopAction({ exploreId })); dispatch(clearQueriesAction({ exploreId })); - dispatch(stateSave()); + dispatch(stateSaveAction()); }; } @@ -242,7 +219,7 @@ export function loadExploreDatasourcesAndSetDatasource( dispatch(loadExploreDatasources({ exploreId, exploreDatasources })); if (exploreDatasources.length >= 1) { - dispatch(changeDatasource(exploreId, datasourceName, true)); + dispatch(changeDatasource(exploreId, datasourceName)); } else { dispatch(loadDatasourceMissingAction({ exploreId })); } @@ -419,201 +396,17 @@ export function modifyQueries( }; } -export function processQueryErrors( - exploreId: ExploreId, - response: any, - resultType: ResultType, - datasourceId: string -): ThunkResult { - return (dispatch, getState) => { - const { datasourceInstance } = getState().explore[exploreId]; - - if (datasourceInstance.meta.id !== datasourceId || response.cancelled) { - // Navigated away, queries did not matter - return; - } - - console.error(response); // To help finding problems with query syntax - - if (!instanceOfDataQueryError(response)) { - response = toDataQueryError(response); - } - - dispatch( - queryFailureAction({ - exploreId, - response, - resultType, - }) - ); - }; -} - -/** - * @param exploreId Explore area - * @param response Response from `datasourceInstance.query()` - * @param latency Duration between request and response - * @param resultType The type of result - * @param datasourceId Origin datasource instance, used to discard results if current datasource is different - */ -export function processQueryResults( - exploreId: ExploreId, - response: any, - latency: number, - resultType: ResultType, - datasourceId: string -): ThunkResult { - return (dispatch, getState) => { - const { datasourceInstance, scanning, scanner } = getState().explore[exploreId]; - - // If datasource already changed, results do not matter - if (datasourceInstance.meta.id !== datasourceId) { - return; - } - - const series: any[] = response.data; - const refIds = getRefIds(series); - - // Clears any previous errors that now have a successful query, important so Angular editors are updated correctly - dispatch( - resetQueryErrorAction({ - exploreId, - refIds, - }) - ); - - const resultGetter = - resultType === 'Graph' ? makeTimeSeriesList : resultType === 'Table' ? (data: any[]) => data : null; - const result = resultGetter ? resultGetter(series, null, []) : series; - - dispatch( - querySuccessAction({ - exploreId, - result, - resultType, - latency, - }) - ); - - // Keep scanning for results if this was the last scanning transaction - if (scanning) { - if (_.size(result) === 0) { - const range = scanner(); - dispatch(scanRangeAction({ exploreId, range })); - } else { - // We can stop scanning if we have a result - dispatch(scanStopAction({ exploreId })); - } - } - }; -} - /** * Main action to run queries and dispatches sub-actions based on which result viewers are active */ -export function runQueries(exploreId: ExploreId, ignoreUIState = false, replaceUrl = false): ThunkResult { +export function runQueries(exploreId: ExploreId): ThunkResult { return (dispatch, getState) => { - const { - datasourceInstance, - queries, - showingGraph, - showingTable, - datasourceError, - containerWidth, - mode, - range, - } = getState().explore[exploreId]; - - if (datasourceError) { - // let's not run any queries if data source is in a faulty state - return; - } - - if (!hasNonEmptyQuery(queries)) { - dispatch(clearQueriesAction({ exploreId })); - dispatch(stateSave(replaceUrl)); // Remember to save to state and update location - return; - } - - // Some datasource's query builders allow per-query interval limits, - // but we're using the datasource interval limit for now - const interval = datasourceInstance.interval; + const { range } = getState().explore[exploreId]; const timeZone = getTimeZone(getState().user); const updatedRange = getTimeRange(timeZone, range.raw); dispatch(runQueriesAction({ exploreId, range: updatedRange })); - // Keep table queries first since they need to return quickly - if ((ignoreUIState || showingTable) && mode === ExploreMode.Metrics) { - dispatch( - runQueriesForType(exploreId, 'Table', { - interval, - format: 'table', - instant: true, - valueWithRefId: true, - }) - ); - } - if ((ignoreUIState || showingGraph) && mode === ExploreMode.Metrics) { - dispatch( - runQueriesForType(exploreId, 'Graph', { - interval, - format: 'time_series', - instant: false, - maxDataPoints: containerWidth, - }) - ); - } - if (mode === ExploreMode.Logs) { - dispatch(runQueriesForType(exploreId, 'Logs', { interval, format: 'logs' })); - } - - dispatch(stateSave(replaceUrl)); - }; -} - -/** - * Helper action to build a query transaction object and handing the query to the datasource. - * @param exploreId Explore area - * @param resultType Result viewer that will be associated with this query result - * @param queryOptions Query options as required by the datasource's `query()` function. - * @param resultGetter Optional result extractor, e.g., if the result is a list and you only need the first element. - */ -function runQueriesForType( - exploreId: ExploreId, - resultType: ResultType, - queryOptions: QueryOptions -): ThunkResult { - return async (dispatch, getState) => { - const { datasourceInstance, eventBridge, queries, queryIntervals, range, scanning, history } = getState().explore[ - exploreId - ]; - - if (resultType === 'Logs' && datasourceInstance.convertToStreamTargets) { - dispatch( - startSubscriptionsAction({ - exploreId, - dataReceivedActionCreator: subscriptionDataReceivedAction, - }) - ); - } - - const datasourceId = datasourceInstance.meta.id; - const transaction = buildQueryTransaction(queries, resultType, queryOptions, range, queryIntervals, scanning); - dispatch(queryStartAction({ exploreId, resultType, rowIndex: 0, transaction })); - try { - const now = Date.now(); - const response = await datasourceInstance.query(transaction.options); - eventBridge.emit('data-received', response.data || []); - const latency = Date.now() - now; - // Side-effect: Saving history in localstorage - const nextHistory = updateHistory(history, datasourceId, queries); - dispatch(historyUpdatedAction({ exploreId, history: nextHistory })); - dispatch(processQueryResults(exploreId, response, latency, resultType, datasourceId)); - } catch (err) { - eventBridge.emit('data-error', err); - dispatch(processQueryErrors(exploreId, err, resultType, datasourceId)); - } }; } @@ -653,7 +446,7 @@ export function setQueries(exploreId: ExploreId, rawQueries: DataQuery[]): Thunk export function splitClose(itemId: ExploreId): ThunkResult { return dispatch => { dispatch(splitCloseAction({ itemId })); - dispatch(stateSave()); + dispatch(stateSaveAction()); }; } @@ -677,64 +470,7 @@ export function splitOpen(): ThunkResult { urlState, }; dispatch(splitOpenAction({ itemState })); - dispatch(stateSave()); - }; -} - -const toRawTimeRange = (range: TimeRange): RawTimeRange => { - let from = range.raw.from; - if (isDateTime(from)) { - from = from.valueOf().toString(10); - } - - let to = range.raw.to; - if (isDateTime(to)) { - to = to.valueOf().toString(10); - } - - return { - from, - to, - }; -}; - -/** - * Saves Explore state to URL using the `left` and `right` parameters. - * If split view is not active, `right` will not be set. - */ -export function stateSave(replaceUrl = false): ThunkResult { - return (dispatch, getState) => { - const { left, right, split } = getState().explore; - const urlStates: { [index: string]: string } = {}; - const leftUrlState: ExploreUrlState = { - datasource: left.datasourceInstance.name, - queries: left.queries.map(clearQueryKeys), - range: toRawTimeRange(left.range), - ui: { - showingGraph: left.showingGraph, - showingLogs: true, - showingTable: left.showingTable, - dedupStrategy: left.dedupStrategy, - }, - }; - urlStates.left = serializeStateToUrlParam(leftUrlState, true); - if (split) { - const rightUrlState: ExploreUrlState = { - datasource: right.datasourceInstance.name, - queries: right.queries.map(clearQueryKeys), - range: toRawTimeRange(right.range), - ui: { - showingGraph: right.showingGraph, - showingLogs: true, - showingTable: right.showingTable, - dedupStrategy: right.dedupStrategy, - }, - }; - - urlStates.right = serializeStateToUrlParam(rightUrlState, true); - } - - dispatch(updateLocation({ query: urlStates, replace: replaceUrl })); + dispatch(stateSaveAction()); }; } diff --git a/public/app/features/explore/state/epics.test.ts b/public/app/features/explore/state/epics.test.ts deleted file mode 100644 index fbfb934a43a..00000000000 --- a/public/app/features/explore/state/epics.test.ts +++ /dev/null @@ -1,550 +0,0 @@ -import { liveOption } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker'; -import { DataSourceApi, DataQuery } from '@grafana/ui/src/types/datasource'; - -import { ExploreId, ExploreState } from 'app/types'; -import { actionCreatorFactory } from 'app/core/redux/actionCreatorFactory'; -import { - startSubscriptionsEpic, - startSubscriptionsAction, - SubscriptionDataReceivedPayload, - startSubscriptionAction, - startSubscriptionEpic, - limitMessageRatePayloadAction, -} from './epics'; -import { makeExploreItemState } from './reducers'; -import { epicTester } from 'test/core/redux/epicTester'; -import { - resetExploreAction, - updateDatasourceInstanceAction, - changeRefreshIntervalAction, - clearQueriesAction, -} from './actionTypes'; - -const setup = (options: any = {}) => { - const url = '/api/datasources/proxy/20/api/prom/tail?query=%7Bfilename%3D%22%2Fvar%2Flog%2Fdocker.log%22%7D'; - const webSocketUrl = 'ws://localhost' + url; - const refId = options.refId || 'A'; - const exploreId = ExploreId.left; - const datasourceInstance: DataSourceApi = options.datasourceInstance || { - id: 1337, - query: jest.fn(), - name: 'test', - testDatasource: jest.fn(), - convertToStreamTargets: () => [ - { - url, - refId, - }, - ], - resultToSeriesData: data => [data], - }; - const itemState = makeExploreItemState(); - const explore: Partial = { - [exploreId]: { - ...itemState, - datasourceInstance, - refreshInterval: options.refreshInterval || liveOption.value, - queries: [{} as DataQuery], - }, - }; - const state: any = { - explore, - }; - - return { url, state, refId, webSocketUrl, exploreId }; -}; - -const dataReceivedActionCreator = actionCreatorFactory('test').create(); - -describe('startSubscriptionsEpic', () => { - describe('when startSubscriptionsAction is dispatched', () => { - describe('and datasource supports convertToStreamTargets', () => { - describe('and explore is Live', () => { - it('then correct actions should be dispatched', () => { - const { state, refId, webSocketUrl, exploreId } = setup(); - - epicTester(startSubscriptionsEpic, state) - .whenActionIsDispatched(startSubscriptionsAction({ exploreId, dataReceivedActionCreator })) - .thenResultingActionsEqual( - startSubscriptionAction({ - exploreId, - refId, - url: webSocketUrl, - dataReceivedActionCreator, - }) - ); - }); - }); - - describe('and explore is not Live', () => { - it('then no actions should be dispatched', () => { - const { state, exploreId } = setup({ refreshInterval: '10s' }); - - epicTester(startSubscriptionsEpic, state) - .whenActionIsDispatched(startSubscriptionsAction({ exploreId, dataReceivedActionCreator })) - .thenNoActionsWhereDispatched(); - }); - }); - }); - - describe('and datasource does not support streaming', () => { - it('then no actions should be dispatched', () => { - const { state, exploreId } = setup({ datasourceInstance: {} }); - - epicTester(startSubscriptionsEpic, state) - .whenActionIsDispatched(startSubscriptionsAction({ exploreId, dataReceivedActionCreator })) - .thenNoActionsWhereDispatched(); - }); - }); - }); -}); - -describe('startSubscriptionEpic', () => { - describe('when startSubscriptionAction is dispatched', () => { - describe('and datasource supports resultToSeriesData', () => { - it('then correct actions should be dispatched', () => { - const { state, webSocketUrl, refId, exploreId } = setup(); - - epicTester(startSubscriptionEpic, state) - .whenActionIsDispatched( - startSubscriptionAction({ url: webSocketUrl, refId, exploreId, dataReceivedActionCreator }) - ) - .thenNoActionsWhereDispatched() - .whenWebSocketReceivesData({ data: [1, 2, 3] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }) - ) - .whenWebSocketReceivesData({ data: [4, 5, 6] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }), - limitMessageRatePayloadAction({ - exploreId, - data: { data: [4, 5, 6] } as any, - dataReceivedActionCreator, - }) - ); - }); - }); - - describe('and datasource does not support resultToSeriesData', () => { - it('then no actions should be dispatched', () => { - const { state, webSocketUrl, refId, exploreId } = setup({ datasourceInstance: {} }); - - epicTester(startSubscriptionEpic, state) - .whenActionIsDispatched( - startSubscriptionAction({ url: webSocketUrl, refId, exploreId, dataReceivedActionCreator }) - ) - .thenNoActionsWhereDispatched() - .whenWebSocketReceivesData({ data: [1, 2, 3] }) - .thenNoActionsWhereDispatched(); - }); - }); - }); - - describe('when an subscription is active', () => { - describe('and resetExploreAction is dispatched', () => { - it('then subscription should be unsubscribed', () => { - const { state, webSocketUrl, refId, exploreId } = setup(); - - epicTester(startSubscriptionEpic, state) - .whenActionIsDispatched( - startSubscriptionAction({ url: webSocketUrl, refId, exploreId, dataReceivedActionCreator }) - ) - .thenNoActionsWhereDispatched() - .whenWebSocketReceivesData({ data: [1, 2, 3] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }) - ) - .whenActionIsDispatched(resetExploreAction()) - .whenWebSocketReceivesData({ data: [4, 5, 6] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }) - ); - }); - }); - - describe('and updateDatasourceInstanceAction is dispatched', () => { - describe('and exploreId matches the websockets', () => { - it('then subscription should be unsubscribed', () => { - const { state, webSocketUrl, refId, exploreId } = setup(); - - epicTester(startSubscriptionEpic, state) - .whenActionIsDispatched( - startSubscriptionAction({ - url: webSocketUrl, - refId, - exploreId, - dataReceivedActionCreator, - }) - ) - .thenNoActionsWhereDispatched() - .whenWebSocketReceivesData({ data: [1, 2, 3] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }) - ) - .whenActionIsDispatched(updateDatasourceInstanceAction({ exploreId, datasourceInstance: null })) - .whenWebSocketReceivesData({ data: [4, 5, 6] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }) - ); - }); - }); - - describe('and exploreId does not match the websockets', () => { - it('then subscription should not be unsubscribed', () => { - const { state, webSocketUrl, refId, exploreId } = setup(); - - epicTester(startSubscriptionEpic, state) - .whenActionIsDispatched( - startSubscriptionAction({ - url: webSocketUrl, - refId, - exploreId, - dataReceivedActionCreator, - }) - ) - .thenNoActionsWhereDispatched() - .whenWebSocketReceivesData({ data: [1, 2, 3] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }) - ) - .whenActionIsDispatched( - updateDatasourceInstanceAction({ exploreId: ExploreId.right, datasourceInstance: null }) - ) - .whenWebSocketReceivesData({ data: [4, 5, 6] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }), - limitMessageRatePayloadAction({ - exploreId, - data: { data: [4, 5, 6] } as any, - dataReceivedActionCreator, - }) - ); - }); - }); - }); - - describe('and changeRefreshIntervalAction is dispatched', () => { - describe('and exploreId matches the websockets', () => { - describe('and refreshinterval is not "Live"', () => { - it('then subscription should be unsubscribed', () => { - const { state, webSocketUrl, refId, exploreId } = setup(); - - epicTester(startSubscriptionEpic, state) - .whenActionIsDispatched( - startSubscriptionAction({ - url: webSocketUrl, - refId, - exploreId, - dataReceivedActionCreator, - }) - ) - .thenNoActionsWhereDispatched() - .whenWebSocketReceivesData({ data: [1, 2, 3] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }) - ) - .whenActionIsDispatched(changeRefreshIntervalAction({ exploreId, refreshInterval: '10s' })) - .whenWebSocketReceivesData({ data: [4, 5, 6] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }) - ); - }); - }); - - describe('and refreshinterval is "Live"', () => { - it('then subscription should not be unsubscribed', () => { - const { state, webSocketUrl, refId, exploreId } = setup(); - - epicTester(startSubscriptionEpic, state) - .whenActionIsDispatched( - startSubscriptionAction({ - url: webSocketUrl, - refId, - exploreId, - dataReceivedActionCreator, - }) - ) - .thenNoActionsWhereDispatched() - .whenWebSocketReceivesData({ data: [1, 2, 3] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }) - ) - .whenActionIsDispatched(changeRefreshIntervalAction({ exploreId, refreshInterval: liveOption.value })) - .whenWebSocketReceivesData({ data: [4, 5, 6] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }), - limitMessageRatePayloadAction({ - exploreId, - data: { data: [4, 5, 6] } as any, - dataReceivedActionCreator, - }) - ); - }); - }); - }); - - describe('and exploreId does not match the websockets', () => { - it('then subscription should not be unsubscribed', () => { - const { state, webSocketUrl, refId, exploreId } = setup(); - - epicTester(startSubscriptionEpic, state) - .whenActionIsDispatched( - startSubscriptionAction({ - url: webSocketUrl, - refId, - exploreId, - dataReceivedActionCreator, - }) - ) - .thenNoActionsWhereDispatched() - .whenWebSocketReceivesData({ data: [1, 2, 3] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }) - ) - .whenActionIsDispatched(changeRefreshIntervalAction({ exploreId: ExploreId.right, refreshInterval: '10s' })) - .whenWebSocketReceivesData({ data: [4, 5, 6] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }), - limitMessageRatePayloadAction({ - exploreId, - data: { data: [4, 5, 6] } as any, - dataReceivedActionCreator, - }) - ); - }); - }); - }); - - describe('and clearQueriesAction is dispatched', () => { - describe('and exploreId matches the websockets', () => { - it('then subscription should be unsubscribed', () => { - const { state, webSocketUrl, refId, exploreId } = setup(); - - epicTester(startSubscriptionEpic, state) - .whenActionIsDispatched( - startSubscriptionAction({ - url: webSocketUrl, - refId, - exploreId, - dataReceivedActionCreator, - }) - ) - .thenNoActionsWhereDispatched() - .whenWebSocketReceivesData({ data: [1, 2, 3] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }) - ) - .whenActionIsDispatched(clearQueriesAction({ exploreId })) - .whenWebSocketReceivesData({ data: [4, 5, 6] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }) - ); - }); - }); - - describe('and exploreId does not match the websockets', () => { - it('then subscription should not be unsubscribed', () => { - const { state, webSocketUrl, refId, exploreId } = setup(); - - epicTester(startSubscriptionEpic, state) - .whenActionIsDispatched( - startSubscriptionAction({ - url: webSocketUrl, - refId, - exploreId, - dataReceivedActionCreator, - }) - ) - .thenNoActionsWhereDispatched() - .whenWebSocketReceivesData({ data: [1, 2, 3] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }) - ) - .whenActionIsDispatched(clearQueriesAction({ exploreId: ExploreId.right })) - .whenWebSocketReceivesData({ data: [4, 5, 6] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }), - limitMessageRatePayloadAction({ - exploreId, - data: { data: [4, 5, 6] } as any, - dataReceivedActionCreator, - }) - ); - }); - }); - }); - - describe('and startSubscriptionAction is dispatched', () => { - describe('and exploreId and refId matches the websockets', () => { - it('then subscription should be unsubscribed', () => { - const { state, webSocketUrl, refId, exploreId } = setup(); - - epicTester(startSubscriptionEpic, state) - .whenActionIsDispatched( - startSubscriptionAction({ - url: webSocketUrl, - refId, - exploreId, - dataReceivedActionCreator, - }) - ) - .thenNoActionsWhereDispatched() - .whenWebSocketReceivesData({ data: [1, 2, 3] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }) - ) - .whenActionIsDispatched( - startSubscriptionAction({ - url: webSocketUrl, - refId, - exploreId, - dataReceivedActionCreator, - }) - ) - .whenWebSocketReceivesData({ data: [4, 5, 6] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }), - limitMessageRatePayloadAction({ - exploreId, - data: { data: [4, 5, 6] } as any, - dataReceivedActionCreator, - }) - // This looks like we haven't stopped the subscription but we actually started the same again - ); - }); - - describe('and exploreId or refId does not match the websockets', () => { - it('then subscription should not be unsubscribed and another websocket is started', () => { - const { state, webSocketUrl, refId, exploreId } = setup(); - - epicTester(startSubscriptionEpic, state) - .whenActionIsDispatched( - startSubscriptionAction({ - url: webSocketUrl, - refId, - exploreId, - dataReceivedActionCreator, - }) - ) - .thenNoActionsWhereDispatched() - .whenWebSocketReceivesData({ data: [1, 2, 3] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }) - ) - .whenActionIsDispatched( - startSubscriptionAction({ - url: webSocketUrl, - refId: 'B', - exploreId, - dataReceivedActionCreator, - }) - ) - .whenWebSocketReceivesData({ data: [4, 5, 6] }) - .thenResultingActionsEqual( - limitMessageRatePayloadAction({ - exploreId, - data: { data: [1, 2, 3] } as any, - dataReceivedActionCreator, - }), - limitMessageRatePayloadAction({ - exploreId, - data: { data: [4, 5, 6] } as any, - dataReceivedActionCreator, - }), - limitMessageRatePayloadAction({ - exploreId, - data: { data: [4, 5, 6] } as any, - dataReceivedActionCreator, - }) - ); - }); - }); - }); - }); - }); -}); diff --git a/public/app/features/explore/state/epics.ts b/public/app/features/explore/state/epics.ts deleted file mode 100644 index a31474f81cc..00000000000 --- a/public/app/features/explore/state/epics.ts +++ /dev/null @@ -1,159 +0,0 @@ -import { Epic } from 'redux-observable'; -import { NEVER } from 'rxjs'; -import { takeUntil, mergeMap, tap, filter, map, throttleTime } from 'rxjs/operators'; - -import { StoreState, ExploreId } from 'app/types'; -import { ActionOf, ActionCreator, actionCreatorFactory } from '../../../core/redux/actionCreatorFactory'; -import { config } from '../../../core/config'; -import { - updateDatasourceInstanceAction, - resetExploreAction, - changeRefreshIntervalAction, - clearQueriesAction, -} from './actionTypes'; -import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker'; -import { SeriesData } from '@grafana/ui/src/types/data'; -import { EpicDependencies } from 'app/store/configureStore'; - -const convertToWebSocketUrl = (url: string) => { - const protocol = window.location.protocol === 'https:' ? 'wss://' : 'ws://'; - let backend = `${protocol}${window.location.host}${config.appSubUrl}`; - if (backend.endsWith('/')) { - backend = backend.slice(0, backend.length - 1); - } - return `${backend}${url}`; -}; - -export interface StartSubscriptionsPayload { - exploreId: ExploreId; - dataReceivedActionCreator: ActionCreator; -} - -export const startSubscriptionsAction = actionCreatorFactory( - 'explore/START_SUBSCRIPTIONS' -).create(); - -export interface StartSubscriptionPayload { - url: string; - refId: string; - exploreId: ExploreId; - dataReceivedActionCreator: ActionCreator; -} - -export const startSubscriptionAction = actionCreatorFactory( - 'explore/START_SUBSCRIPTION' -).create(); - -export interface SubscriptionDataReceivedPayload { - data: SeriesData; - exploreId: ExploreId; -} - -export const subscriptionDataReceivedAction = actionCreatorFactory( - 'explore/SUBSCRIPTION_DATA_RECEIVED' -).create(); - -export interface LimitMessageRatePayload { - data: SeriesData; - exploreId: ExploreId; - dataReceivedActionCreator: ActionCreator; -} - -export const limitMessageRatePayloadAction = actionCreatorFactory( - 'explore/LIMIT_MESSAGE_RATE_PAYLOAD' -).create(); - -export const startSubscriptionsEpic: Epic, ActionOf, StoreState> = (action$, state$) => { - return action$.ofType(startSubscriptionsAction.type).pipe( - mergeMap((action: ActionOf) => { - const { exploreId, dataReceivedActionCreator } = action.payload; - const { datasourceInstance, queries, refreshInterval } = state$.value.explore[exploreId]; - - if (!datasourceInstance || !datasourceInstance.convertToStreamTargets) { - return NEVER; //do nothing if datasource does not support streaming - } - - if (!refreshInterval || !isLive(refreshInterval)) { - return NEVER; //do nothing if refresh interval is not 'LIVE' - } - - const request: any = { targets: queries }; - return datasourceInstance.convertToStreamTargets(request).map(target => - startSubscriptionAction({ - url: convertToWebSocketUrl(target.url), - refId: target.refId, - exploreId, - dataReceivedActionCreator, - }) - ); - }) - ); -}; - -export const startSubscriptionEpic: Epic, ActionOf, StoreState, EpicDependencies> = ( - action$, - state$, - { getWebSocket } -) => { - return action$.ofType(startSubscriptionAction.type).pipe( - mergeMap((action: ActionOf) => { - const { url, exploreId, refId, dataReceivedActionCreator } = action.payload; - return getWebSocket(url).pipe( - takeUntil( - action$ - .ofType( - startSubscriptionAction.type, - resetExploreAction.type, - updateDatasourceInstanceAction.type, - changeRefreshIntervalAction.type, - clearQueriesAction.type - ) - .pipe( - filter(action => { - if (action.type === resetExploreAction.type) { - return true; // stops all subscriptions if user navigates away - } - - if (action.type === updateDatasourceInstanceAction.type && action.payload.exploreId === exploreId) { - return true; // stops subscriptions if user changes data source - } - - if (action.type === changeRefreshIntervalAction.type && action.payload.exploreId === exploreId) { - return !isLive(action.payload.refreshInterval); // stops subscriptions if user changes refresh interval away from 'Live' - } - - if (action.type === clearQueriesAction.type && action.payload.exploreId === exploreId) { - return true; // stops subscriptions if user clears all queries - } - - return action.payload.exploreId === exploreId && action.payload.refId === refId; - }), - tap(value => console.log('Stopping subscription', value)) - ) - ), - mergeMap((result: any) => { - const { datasourceInstance } = state$.value.explore[exploreId]; - - if (!datasourceInstance || !datasourceInstance.resultToSeriesData) { - return [null]; //do nothing if datasource does not support streaming - } - - return datasourceInstance - .resultToSeriesData(result, refId) - .map(data => limitMessageRatePayloadAction({ exploreId, data, dataReceivedActionCreator })); - }), - filter(action => action !== null) - ); - }) - ); -}; - -export const limitMessageRateEpic: Epic, ActionOf, StoreState, EpicDependencies> = action$ => { - return action$.ofType(limitMessageRatePayloadAction.type).pipe( - throttleTime(1), - map((action: ActionOf) => { - const { exploreId, data, dataReceivedActionCreator } = action.payload; - return dataReceivedActionCreator({ exploreId, data }); - }) - ); -}; diff --git a/public/app/features/explore/state/epics/limitMessageRateEpic.ts b/public/app/features/explore/state/epics/limitMessageRateEpic.ts new file mode 100644 index 00000000000..62013706968 --- /dev/null +++ b/public/app/features/explore/state/epics/limitMessageRateEpic.ts @@ -0,0 +1,25 @@ +import { Epic } from 'redux-observable'; +import { map, throttleTime } from 'rxjs/operators'; +import { LoadingState } from '@grafana/ui'; + +import { StoreState } from 'app/types'; +import { ActionOf } from '../../../../core/redux/actionCreatorFactory'; +import { limitMessageRatePayloadAction, LimitMessageRatePayload, processQueryResultsAction } from '../actionTypes'; +import { EpicDependencies } from 'app/store/configureStore'; + +export const limitMessageRateEpic: Epic, ActionOf, StoreState, EpicDependencies> = action$ => { + return action$.ofType(limitMessageRatePayloadAction.type).pipe( + throttleTime(1), + map((action: ActionOf) => { + const { exploreId, series, datasourceId } = action.payload; + return processQueryResultsAction({ + exploreId, + latency: 0, + datasourceId, + loadingState: LoadingState.Streaming, + series: null, + delta: series, + }); + }) + ); +}; diff --git a/public/app/features/explore/state/epics/processQueryErrorsEpic.test.ts b/public/app/features/explore/state/epics/processQueryErrorsEpic.test.ts new file mode 100644 index 00000000000..7cdaca78f7d --- /dev/null +++ b/public/app/features/explore/state/epics/processQueryErrorsEpic.test.ts @@ -0,0 +1,67 @@ +import { mockExploreState } from 'test/mocks/mockExploreState'; +import { epicTester } from 'test/core/redux/epicTester'; +import { processQueryErrorsAction, queryFailureAction } from '../actionTypes'; +import { processQueryErrorsEpic } from './processQueryErrorsEpic'; + +describe('processQueryErrorsEpic', () => { + let originalConsoleError = console.error; + + beforeEach(() => { + originalConsoleError = console.error; + console.error = jest.fn(); + }); + + afterEach(() => { + console.error = originalConsoleError; + }); + + describe('when processQueryErrorsAction is dispatched', () => { + describe('and datasourceInstance is the same', () => { + describe('and the response is not cancelled', () => { + it('then queryFailureAction is dispatched', () => { + const { datasourceId, exploreId, state, eventBridge } = mockExploreState(); + const response = { message: 'Something went terribly wrong!' }; + + epicTester(processQueryErrorsEpic, state) + .whenActionIsDispatched(processQueryErrorsAction({ exploreId, datasourceId, response })) + .thenResultingActionsEqual(queryFailureAction({ exploreId, response })); + + expect(console.error).toBeCalledTimes(1); + expect(console.error).toBeCalledWith(response); + expect(eventBridge.emit).toBeCalledTimes(1); + expect(eventBridge.emit).toBeCalledWith('data-error', response); + }); + }); + + describe('and the response is cancelled', () => { + it('then no actions are dispatched', () => { + const { datasourceId, exploreId, state, eventBridge } = mockExploreState(); + const response = { cancelled: true, message: 'Something went terribly wrong!' }; + + epicTester(processQueryErrorsEpic, state) + .whenActionIsDispatched(processQueryErrorsAction({ exploreId, datasourceId, response })) + .thenNoActionsWhereDispatched(); + + expect(console.error).not.toBeCalled(); + expect(eventBridge.emit).not.toBeCalled(); + }); + }); + }); + + describe('and datasourceInstance is not the same', () => { + describe('and the response is not cancelled', () => { + it('then no actions are dispatched', () => { + const { exploreId, state, eventBridge } = mockExploreState(); + const response = { message: 'Something went terribly wrong!' }; + + epicTester(processQueryErrorsEpic, state) + .whenActionIsDispatched(processQueryErrorsAction({ exploreId, datasourceId: 'other id', response })) + .thenNoActionsWhereDispatched(); + + expect(console.error).not.toBeCalled(); + expect(eventBridge.emit).not.toBeCalled(); + }); + }); + }); + }); +}); diff --git a/public/app/features/explore/state/epics/processQueryErrorsEpic.ts b/public/app/features/explore/state/epics/processQueryErrorsEpic.ts new file mode 100644 index 00000000000..ea029186dc8 --- /dev/null +++ b/public/app/features/explore/state/epics/processQueryErrorsEpic.ts @@ -0,0 +1,40 @@ +import { Epic } from 'redux-observable'; +import { mergeMap } from 'rxjs/operators'; +import { NEVER, of } from 'rxjs'; + +import { ActionOf } from 'app/core/redux/actionCreatorFactory'; +import { StoreState } from 'app/types/store'; +import { instanceOfDataQueryError } from 'app/core/utils/explore'; +import { toDataQueryError } from 'app/features/dashboard/state/PanelQueryState'; +import { processQueryErrorsAction, ProcessQueryErrorsPayload, queryFailureAction } from '../actionTypes'; + +export const processQueryErrorsEpic: Epic, ActionOf, StoreState> = (action$, state$) => { + return action$.ofType(processQueryErrorsAction.type).pipe( + mergeMap((action: ActionOf) => { + const { exploreId, datasourceId } = action.payload; + let { response } = action.payload; + const { datasourceInstance, eventBridge } = state$.value.explore[exploreId]; + + if (datasourceInstance.meta.id !== datasourceId || response.cancelled) { + // Navigated away, queries did not matter + return NEVER; + } + + // For Angular editors + eventBridge.emit('data-error', response); + + console.error(response); // To help finding problems with query syntax + + if (!instanceOfDataQueryError(response)) { + response = toDataQueryError(response); + } + + return of( + queryFailureAction({ + exploreId, + response, + }) + ); + }) + ); +}; diff --git a/public/app/features/explore/state/epics/processQueryResultsEpic.test.ts b/public/app/features/explore/state/epics/processQueryResultsEpic.test.ts new file mode 100644 index 00000000000..c5da93081aa --- /dev/null +++ b/public/app/features/explore/state/epics/processQueryResultsEpic.test.ts @@ -0,0 +1,119 @@ +import { mockExploreState } from 'test/mocks/mockExploreState'; +import { epicTester } from 'test/core/redux/epicTester'; +import { + processQueryResultsAction, + resetQueryErrorAction, + querySuccessAction, + scanStopAction, + scanRangeAction, +} from '../actionTypes'; +import { SeriesData, LoadingState } from '@grafana/ui'; +import { processQueryResultsEpic } from './processQueryResultsEpic'; +import TableModel from 'app/core/table_model'; + +const testContext = () => { + const serieA: SeriesData = { + fields: [], + refId: 'A', + rows: [], + }; + const serieB: SeriesData = { + fields: [], + refId: 'B', + rows: [], + }; + const series = [serieA, serieB]; + const latency = 0; + const loadingState = LoadingState.Done; + + return { + latency, + series, + loadingState, + }; +}; + +describe('processQueryResultsEpic', () => { + describe('when processQueryResultsAction is dispatched', () => { + describe('and datasourceInstance is the same', () => { + describe('and explore is not scanning', () => { + it('then resetQueryErrorAction and querySuccessAction are dispatched and eventBridge emits correct message', () => { + const { datasourceId, exploreId, state, eventBridge } = mockExploreState(); + const { latency, series, loadingState } = testContext(); + const graphResult = []; + const tableResult = new TableModel(); + const logsResult = null; + + epicTester(processQueryResultsEpic, state) + .whenActionIsDispatched( + processQueryResultsAction({ exploreId, datasourceId, loadingState, series, latency }) + ) + .thenResultingActionsEqual( + resetQueryErrorAction({ exploreId, refIds: ['A', 'B'] }), + querySuccessAction({ exploreId, loadingState, graphResult, tableResult, logsResult, latency }) + ); + + expect(eventBridge.emit).toBeCalledTimes(1); + expect(eventBridge.emit).toBeCalledWith('data-received', series); + }); + }); + + describe('and explore is scanning', () => { + describe('and we have a result', () => { + it('then correct actions are dispatched', () => { + const { datasourceId, exploreId, state } = mockExploreState({ scanning: true }); + const { latency, series, loadingState } = testContext(); + const graphResult = []; + const tableResult = new TableModel(); + const logsResult = null; + + epicTester(processQueryResultsEpic, state) + .whenActionIsDispatched( + processQueryResultsAction({ exploreId, datasourceId, loadingState, series, latency }) + ) + .thenResultingActionsEqual( + resetQueryErrorAction({ exploreId, refIds: ['A', 'B'] }), + querySuccessAction({ exploreId, loadingState, graphResult, tableResult, logsResult, latency }), + scanStopAction({ exploreId }) + ); + }); + }); + + describe('and we do not have a result', () => { + it('then correct actions are dispatched', () => { + const { datasourceId, exploreId, state, scanner } = mockExploreState({ scanning: true }); + const { latency, loadingState } = testContext(); + const graphResult = []; + const tableResult = new TableModel(); + const logsResult = null; + + epicTester(processQueryResultsEpic, state) + .whenActionIsDispatched( + processQueryResultsAction({ exploreId, datasourceId, loadingState, series: [], latency }) + ) + .thenResultingActionsEqual( + resetQueryErrorAction({ exploreId, refIds: [] }), + querySuccessAction({ exploreId, loadingState, graphResult, tableResult, logsResult, latency }), + scanRangeAction({ exploreId, range: scanner() }) + ); + }); + }); + }); + }); + + describe('and datasourceInstance is not the same', () => { + it('then no actions are dispatched and eventBridge does not emit message', () => { + const { exploreId, state, eventBridge } = mockExploreState(); + const { series, loadingState } = testContext(); + + epicTester(processQueryResultsEpic, state) + .whenActionIsDispatched( + processQueryResultsAction({ exploreId, datasourceId: 'other id', loadingState, series, latency: 0 }) + ) + .thenNoActionsWhereDispatched(); + + expect(eventBridge.emit).not.toBeCalled(); + }); + }); + }); +}); diff --git a/public/app/features/explore/state/epics/processQueryResultsEpic.ts b/public/app/features/explore/state/epics/processQueryResultsEpic.ts new file mode 100644 index 00000000000..76e767c36a0 --- /dev/null +++ b/public/app/features/explore/state/epics/processQueryResultsEpic.ts @@ -0,0 +1,76 @@ +import _ from 'lodash'; +import { Epic } from 'redux-observable'; +import { mergeMap } from 'rxjs/operators'; +import { NEVER } from 'rxjs'; +import { LoadingState } from '@grafana/ui'; + +import { ActionOf } from 'app/core/redux/actionCreatorFactory'; +import { StoreState } from 'app/types/store'; +import { getRefIds } from 'app/core/utils/explore'; +import { + processQueryResultsAction, + ProcessQueryResultsPayload, + querySuccessAction, + scanRangeAction, + resetQueryErrorAction, + scanStopAction, +} from '../actionTypes'; +import { ResultProcessor } from '../../utils/ResultProcessor'; + +export const processQueryResultsEpic: Epic, ActionOf, StoreState> = (action$, state$) => { + return action$.ofType(processQueryResultsAction.type).pipe( + mergeMap((action: ActionOf) => { + const { exploreId, datasourceId, latency, loadingState, series, delta } = action.payload; + const { datasourceInstance, scanning, scanner, eventBridge } = state$.value.explore[exploreId]; + + // If datasource already changed, results do not matter + if (datasourceInstance.meta.id !== datasourceId) { + return NEVER; + } + + const result = series || delta || []; + const replacePreviousResults = loadingState === LoadingState.Done && series && !delta ? true : false; + const resultProcessor = new ResultProcessor(state$.value.explore[exploreId], replacePreviousResults, result); + const graphResult = resultProcessor.getGraphResult(); + const tableResult = resultProcessor.getTableResult(); + const logsResult = resultProcessor.getLogsResult(); + const refIds = getRefIds(result); + const actions: Array> = []; + + // For Angular editors + eventBridge.emit('data-received', resultProcessor.getRawData()); + + // Clears any previous errors that now have a successful query, important so Angular editors are updated correctly + actions.push( + resetQueryErrorAction({ + exploreId, + refIds, + }) + ); + + actions.push( + querySuccessAction({ + exploreId, + latency, + loadingState, + graphResult, + tableResult, + logsResult, + }) + ); + + // Keep scanning for results if this was the last scanning transaction + if (scanning) { + if (_.size(result) === 0) { + const range = scanner(); + actions.push(scanRangeAction({ exploreId, range })); + } else { + // We can stop scanning if we have a result + actions.push(scanStopAction({ exploreId })); + } + } + + return actions; + }) + ); +}; diff --git a/public/app/features/explore/state/epics/runQueriesBatchEpic.test.ts b/public/app/features/explore/state/epics/runQueriesBatchEpic.test.ts new file mode 100644 index 00000000000..6ddada2bc32 --- /dev/null +++ b/public/app/features/explore/state/epics/runQueriesBatchEpic.test.ts @@ -0,0 +1,421 @@ +import { mockExploreState } from 'test/mocks/mockExploreState'; +import { epicTester } from 'test/core/redux/epicTester'; +import { runQueriesBatchEpic } from './runQueriesBatchEpic'; +import { + runQueriesBatchAction, + queryStartAction, + historyUpdatedAction, + processQueryResultsAction, + processQueryErrorsAction, + limitMessageRatePayloadAction, + resetExploreAction, + updateDatasourceInstanceAction, + changeRefreshIntervalAction, + clearQueriesAction, + stateSaveAction, +} from '../actionTypes'; +import { LoadingState, DataQueryRequest, SeriesData, FieldType } from '@grafana/ui'; + +const testContext = () => { + const series: SeriesData[] = [ + { + fields: [ + { + name: 'Value', + }, + { + name: 'Time', + type: FieldType.time, + unit: 'dateTimeAsIso', + }, + ], + rows: [], + refId: 'A', + }, + ]; + const response = { data: series }; + + return { + response, + series, + }; +}; + +describe('runQueriesBatchEpic', () => { + let originalDateNow = Date.now; + beforeEach(() => { + originalDateNow = Date.now; + Date.now = () => 1337; + }); + + afterEach(() => { + Date.now = originalDateNow; + }); + + describe('when runQueriesBatchAction is dispatched', () => { + describe('and query targets are not live', () => { + describe('and query is successful', () => { + it('then correct actions are dispatched', () => { + const { response, series } = testContext(); + const { exploreId, state, history, datasourceId } = mockExploreState(); + + epicTester(runQueriesBatchEpic, state) + .whenActionIsDispatched( + runQueriesBatchAction({ exploreId, queryOptions: { live: false, interval: '', maxDataPoints: 1980 } }) + ) + .whenQueryReceivesResponse(response) + .thenResultingActionsEqual( + queryStartAction({ exploreId }), + historyUpdatedAction({ exploreId, history }), + processQueryResultsAction({ + exploreId, + delta: null, + series, + latency: 0, + datasourceId, + loadingState: LoadingState.Done, + }), + stateSaveAction() + ); + }); + }); + + describe('and query is not successful', () => { + it('then correct actions are dispatched', () => { + const error = { + message: 'Error parsing line x', + }; + const { exploreId, state, datasourceId } = mockExploreState(); + + epicTester(runQueriesBatchEpic, state) + .whenActionIsDispatched( + runQueriesBatchAction({ exploreId, queryOptions: { live: false, interval: '', maxDataPoints: 1980 } }) + ) + .whenQueryThrowsError(error) + .thenResultingActionsEqual( + queryStartAction({ exploreId }), + processQueryErrorsAction({ exploreId, response: error, datasourceId }) + ); + }); + }); + }); + + describe('and query targets are live', () => { + describe('and state equals Streaming', () => { + it('then correct actions are dispatched', () => { + const { exploreId, state, datasourceId } = mockExploreState(); + const unsubscribe = jest.fn(); + const serieA = { + fields: [], + rows: [], + refId: 'A', + }; + const serieB = { + fields: [], + rows: [], + refId: 'B', + }; + + epicTester(runQueriesBatchEpic, state) + .whenActionIsDispatched( + runQueriesBatchAction({ exploreId, queryOptions: { live: true, interval: '', maxDataPoints: 1980 } }) + ) + .whenQueryObserverReceivesEvent({ + state: LoadingState.Streaming, + delta: [serieA], + key: 'some key', + request: {} as DataQueryRequest, + unsubscribe, + }) + .whenQueryObserverReceivesEvent({ + state: LoadingState.Streaming, + delta: [serieB], + key: 'some key', + request: {} as DataQueryRequest, + unsubscribe, + }) + .thenResultingActionsEqual( + queryStartAction({ exploreId }), + limitMessageRatePayloadAction({ exploreId, series: [serieA], datasourceId }), + limitMessageRatePayloadAction({ exploreId, series: [serieB], datasourceId }) + ); + }); + }); + + describe('and state equals Error', () => { + it('then correct actions are dispatched', () => { + const { exploreId, state, datasourceId } = mockExploreState(); + const unsubscribe = jest.fn(); + const error = { message: 'Something went really wrong!' }; + + epicTester(runQueriesBatchEpic, state) + .whenActionIsDispatched( + runQueriesBatchAction({ exploreId, queryOptions: { live: true, interval: '', maxDataPoints: 1980 } }) + ) + .whenQueryObserverReceivesEvent({ + state: LoadingState.Error, + error, + key: 'some key', + request: {} as DataQueryRequest, + unsubscribe, + }) + .thenResultingActionsEqual( + queryStartAction({ exploreId }), + processQueryErrorsAction({ exploreId, response: error, datasourceId }) + ); + }); + }); + + describe('and state equals Done', () => { + it('then correct actions are dispatched', () => { + const { exploreId, state, datasourceId, history } = mockExploreState(); + const unsubscribe = jest.fn(); + const serieA = { + fields: [], + rows: [], + refId: 'A', + }; + const serieB = { + fields: [], + rows: [], + refId: 'B', + }; + const delta = [serieA, serieB]; + + epicTester(runQueriesBatchEpic, state) + .whenActionIsDispatched( + runQueriesBatchAction({ exploreId, queryOptions: { live: true, interval: '', maxDataPoints: 1980 } }) + ) + .whenQueryObserverReceivesEvent({ + state: LoadingState.Done, + series: null, + delta, + key: 'some key', + request: {} as DataQueryRequest, + unsubscribe, + }) + .thenResultingActionsEqual( + queryStartAction({ exploreId }), + historyUpdatedAction({ exploreId, history }), + processQueryResultsAction({ + exploreId, + delta, + series: null, + latency: 0, + datasourceId, + loadingState: LoadingState.Done, + }), + stateSaveAction() + ); + }); + }); + }); + + describe('and another runQueriesBatchAction is dispatched', () => { + it('then the observable should be unsubscribed', () => { + const { response, series } = testContext(); + const { exploreId, state, history, datasourceId } = mockExploreState(); + const unsubscribe = jest.fn(); + + epicTester(runQueriesBatchEpic, state) + .whenActionIsDispatched( + runQueriesBatchAction({ exploreId, queryOptions: { live: false, interval: '', maxDataPoints: 1980 } }) // first observable + ) + .whenQueryReceivesResponse(response) + .whenQueryObserverReceivesEvent({ + key: 'some key', + request: {} as DataQueryRequest, + state: LoadingState.Loading, // fake just to setup and test unsubscribe + unsubscribe, + }) + .whenActionIsDispatched( + // second observable and unsubscribes the first observable + runQueriesBatchAction({ exploreId, queryOptions: { live: true, interval: '', maxDataPoints: 800 } }) + ) + .whenQueryReceivesResponse(response) + .whenQueryObserverReceivesEvent({ + key: 'some key', + request: {} as DataQueryRequest, + state: LoadingState.Loading, // fake just to setup and test unsubscribe + unsubscribe, + }) + .thenResultingActionsEqual( + queryStartAction({ exploreId }), // output from first observable + historyUpdatedAction({ exploreId, history }), // output from first observable + processQueryResultsAction({ + exploreId, + delta: null, + series, + latency: 0, + datasourceId, + loadingState: LoadingState.Done, + }), + stateSaveAction(), + // output from first observable + queryStartAction({ exploreId }), // output from second observable + historyUpdatedAction({ exploreId, history }), // output from second observable + processQueryResultsAction({ + exploreId, + delta: null, + series, + latency: 0, + datasourceId, + loadingState: LoadingState.Done, + }), + stateSaveAction() + // output from second observable + ); + + expect(unsubscribe).toBeCalledTimes(1); // first unsubscribe should be called but not second as that isn't unsubscribed + }); + }); + + describe('and resetExploreAction is dispatched', () => { + it('then the observable should be unsubscribed', () => { + const { response, series } = testContext(); + const { exploreId, state, history, datasourceId } = mockExploreState(); + const unsubscribe = jest.fn(); + + epicTester(runQueriesBatchEpic, state) + .whenActionIsDispatched( + runQueriesBatchAction({ exploreId, queryOptions: { live: false, interval: '', maxDataPoints: 1980 } }) + ) + .whenQueryReceivesResponse(response) + .whenQueryObserverReceivesEvent({ + key: 'some key', + request: {} as DataQueryRequest, + state: LoadingState.Loading, // fake just to setup and test unsubscribe + unsubscribe, + }) + .whenActionIsDispatched(resetExploreAction()) // unsubscribes the observable + .whenQueryReceivesResponse(response) // new updates will not reach anywhere + .thenResultingActionsEqual( + queryStartAction({ exploreId }), + historyUpdatedAction({ exploreId, history }), + processQueryResultsAction({ + exploreId, + delta: null, + series, + latency: 0, + datasourceId, + loadingState: LoadingState.Done, + }), + stateSaveAction() + ); + + expect(unsubscribe).toBeCalledTimes(1); + }); + }); + + describe('and updateDatasourceInstanceAction is dispatched', () => { + it('then the observable should be unsubscribed', () => { + const { response, series } = testContext(); + const { exploreId, state, history, datasourceId, datasourceInstance } = mockExploreState(); + const unsubscribe = jest.fn(); + + epicTester(runQueriesBatchEpic, state) + .whenActionIsDispatched( + runQueriesBatchAction({ exploreId, queryOptions: { live: false, interval: '', maxDataPoints: 1980 } }) + ) + .whenQueryReceivesResponse(response) + .whenQueryObserverReceivesEvent({ + key: 'some key', + request: {} as DataQueryRequest, + state: LoadingState.Loading, // fake just to setup and test unsubscribe + unsubscribe, + }) + .whenActionIsDispatched(updateDatasourceInstanceAction({ exploreId, datasourceInstance })) // unsubscribes the observable + .whenQueryReceivesResponse(response) // new updates will not reach anywhere + .thenResultingActionsEqual( + queryStartAction({ exploreId }), + historyUpdatedAction({ exploreId, history }), + processQueryResultsAction({ + exploreId, + delta: null, + series, + latency: 0, + datasourceId, + loadingState: LoadingState.Done, + }), + stateSaveAction() + ); + + expect(unsubscribe).toBeCalledTimes(1); + }); + }); + + describe('and changeRefreshIntervalAction is dispatched', () => { + it('then the observable should be unsubscribed', () => { + const { response, series } = testContext(); + const { exploreId, state, history, datasourceId } = mockExploreState(); + const unsubscribe = jest.fn(); + + epicTester(runQueriesBatchEpic, state) + .whenActionIsDispatched( + runQueriesBatchAction({ exploreId, queryOptions: { live: false, interval: '', maxDataPoints: 1980 } }) + ) + .whenQueryReceivesResponse(response) + .whenQueryObserverReceivesEvent({ + key: 'some key', + request: {} as DataQueryRequest, + state: LoadingState.Loading, // fake just to setup and test unsubscribe + unsubscribe, + }) + .whenActionIsDispatched(changeRefreshIntervalAction({ exploreId, refreshInterval: '' })) // unsubscribes the observable + .whenQueryReceivesResponse(response) // new updates will not reach anywhere + .thenResultingActionsEqual( + queryStartAction({ exploreId }), + historyUpdatedAction({ exploreId, history }), + processQueryResultsAction({ + exploreId, + delta: null, + series, + latency: 0, + datasourceId, + loadingState: LoadingState.Done, + }), + stateSaveAction() + ); + + expect(unsubscribe).toBeCalledTimes(1); + }); + }); + + describe('and clearQueriesAction is dispatched', () => { + it('then the observable should be unsubscribed', () => { + const { response, series } = testContext(); + const { exploreId, state, history, datasourceId } = mockExploreState(); + const unsubscribe = jest.fn(); + + epicTester(runQueriesBatchEpic, state) + .whenActionIsDispatched( + runQueriesBatchAction({ exploreId, queryOptions: { live: false, interval: '', maxDataPoints: 1980 } }) + ) + .whenQueryReceivesResponse(response) + .whenQueryObserverReceivesEvent({ + key: 'some key', + request: {} as DataQueryRequest, + state: LoadingState.Loading, // fake just to setup and test unsubscribe + unsubscribe, + }) + .whenActionIsDispatched(clearQueriesAction({ exploreId })) // unsubscribes the observable + .whenQueryReceivesResponse(response) // new updates will not reach anywhere + .thenResultingActionsEqual( + queryStartAction({ exploreId }), + historyUpdatedAction({ exploreId, history }), + processQueryResultsAction({ + exploreId, + delta: null, + series, + latency: 0, + datasourceId, + loadingState: LoadingState.Done, + }), + stateSaveAction() + ); + + expect(unsubscribe).toBeCalledTimes(1); + }); + }); + }); +}); diff --git a/public/app/features/explore/state/epics/runQueriesBatchEpic.ts b/public/app/features/explore/state/epics/runQueriesBatchEpic.ts new file mode 100644 index 00000000000..8e2642f193f --- /dev/null +++ b/public/app/features/explore/state/epics/runQueriesBatchEpic.ts @@ -0,0 +1,220 @@ +import { Epic } from 'redux-observable'; +import { Observable, Subject } from 'rxjs'; +import { mergeMap, catchError, takeUntil, filter } from 'rxjs/operators'; +import _, { isString } from 'lodash'; +import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker'; +import { DataStreamState, LoadingState, DataQueryResponse, SeriesData, DataQueryResponseData } from '@grafana/ui'; +import * as dateMath from '@grafana/ui/src/utils/datemath'; + +import { ActionOf } from 'app/core/redux/actionCreatorFactory'; +import { StoreState } from 'app/types/store'; +import { buildQueryTransaction, updateHistory } from 'app/core/utils/explore'; +import { + clearQueriesAction, + historyUpdatedAction, + resetExploreAction, + updateDatasourceInstanceAction, + changeRefreshIntervalAction, + processQueryErrorsAction, + processQueryResultsAction, + runQueriesBatchAction, + RunQueriesBatchPayload, + queryStartAction, + limitMessageRatePayloadAction, + stateSaveAction, + changeRangeAction, +} from '../actionTypes'; +import { ExploreId, ExploreItemState } from 'app/types'; + +const publishActions = (outerObservable: Subject, actions: Array>) => { + for (const action of actions) { + outerObservable.next(action); + } +}; + +interface ProcessResponseConfig { + exploreId: ExploreId; + exploreItemState: ExploreItemState; + datasourceId: string; + now: number; + loadingState: LoadingState; + series?: DataQueryResponseData[]; + delta?: SeriesData[]; +} + +const processResponse = (config: ProcessResponseConfig) => { + const { exploreId, exploreItemState, datasourceId, now, loadingState, series, delta } = config; + const { queries, history } = exploreItemState; + const latency = Date.now() - now; + + // Side-effect: Saving history in localstorage + const nextHistory = updateHistory(history, datasourceId, queries); + return [ + historyUpdatedAction({ exploreId, history: nextHistory }), + processQueryResultsAction({ exploreId, latency, datasourceId, loadingState, series, delta }), + stateSaveAction(), + ]; +}; + +interface ProcessErrorConfig { + exploreId: ExploreId; + datasourceId: string; + error: any; +} + +const processError = (config: ProcessErrorConfig) => { + const { exploreId, datasourceId, error } = config; + + return [processQueryErrorsAction({ exploreId, response: error, datasourceId })]; +}; + +export const runQueriesBatchEpic: Epic, ActionOf, StoreState> = ( + action$, + state$, + { getQueryResponse } +) => { + return action$.ofType(runQueriesBatchAction.type).pipe( + mergeMap((action: ActionOf) => { + const { exploreId, queryOptions } = action.payload; + const exploreItemState = state$.value.explore[exploreId]; + const { datasourceInstance, queries, queryIntervals, range, scanning } = exploreItemState; + + // Create an observable per run queries action + // Within the observable create two subscriptions + // First subscription: 'querySubscription' subscribes to the call to query method on datasourceinstance + // Second subscription: 'streamSubscription' subscribes to events from the query methods observer callback + const observable: Observable> = Observable.create((outerObservable: Subject) => { + const datasourceId = datasourceInstance.meta.id; + const transaction = buildQueryTransaction(queries, queryOptions, range, queryIntervals, scanning); + outerObservable.next(queryStartAction({ exploreId })); + + const now = Date.now(); + let datasourceUnsubscribe: Function = null; + const streamHandler = new Subject(); + const observer = (event: DataStreamState) => { + datasourceUnsubscribe = event.unsubscribe; + if (!streamHandler.closed) { + // their might be a race condition when unsubscribing + streamHandler.next(event); + } + }; + + // observer subscription, handles datasourceInstance.query observer events and pushes that forward + const streamSubscription = streamHandler.subscribe({ + next: event => { + const { state, error, series, delta } = event; + if (!series && !delta && !error) { + return; + } + + if (state === LoadingState.Error) { + const actions = processError({ exploreId, datasourceId, error }); + publishActions(outerObservable, actions); + } + + if (state === LoadingState.Streaming) { + if (event.request && event.request.range) { + let newRange = event.request.range; + if (isString(newRange.raw.from)) { + newRange = { + from: dateMath.parse(newRange.raw.from, false), + to: dateMath.parse(newRange.raw.to, true), + raw: newRange.raw, + }; + } + outerObservable.next(changeRangeAction({ exploreId, range: newRange })); + } + outerObservable.next( + limitMessageRatePayloadAction({ + exploreId, + series: delta, + datasourceId, + }) + ); + } + + if (state === LoadingState.Done || state === LoadingState.Loading) { + const actions = processResponse({ + exploreId, + exploreItemState, + datasourceId, + now, + loadingState: state, + series: null, + delta, + }); + publishActions(outerObservable, actions); + } + }, + }); + + // query subscription, handles datasourceInstance.query response and pushes that forward + const querySubscription = getQueryResponse(datasourceInstance, transaction.options, observer) + .pipe( + mergeMap((response: DataQueryResponse) => { + return processResponse({ + exploreId, + exploreItemState, + datasourceId, + now, + loadingState: LoadingState.Done, + series: response && response.data ? response.data : [], + delta: null, + }); + }), + catchError(error => { + return processError({ exploreId, datasourceId, error }); + }) + ) + .subscribe({ next: (action: ActionOf) => outerObservable.next(action) }); + + // this unsubscribe method will be called when any of the takeUntil actions below happen + const unsubscribe = () => { + if (datasourceUnsubscribe) { + datasourceUnsubscribe(); + } + querySubscription.unsubscribe(); + streamSubscription.unsubscribe(); + streamHandler.unsubscribe(); + outerObservable.unsubscribe(); + }; + + return unsubscribe; + }); + + return observable.pipe( + takeUntil( + action$ + .ofType( + runQueriesBatchAction.type, + resetExploreAction.type, + updateDatasourceInstanceAction.type, + changeRefreshIntervalAction.type, + clearQueriesAction.type + ) + .pipe( + filter(action => { + if (action.type === resetExploreAction.type) { + return true; // stops all subscriptions if user navigates away + } + + if (action.type === updateDatasourceInstanceAction.type && action.payload.exploreId === exploreId) { + return true; // stops subscriptions if user changes data source + } + + if (action.type === changeRefreshIntervalAction.type && action.payload.exploreId === exploreId) { + return !isLive(action.payload.refreshInterval); // stops subscriptions if user changes refresh interval away from 'Live' + } + + if (action.type === clearQueriesAction.type && action.payload.exploreId === exploreId) { + return true; // stops subscriptions if user clears all queries + } + + return action.payload.exploreId === exploreId; + }) + ) + ) + ); + }) + ); +}; diff --git a/public/app/features/explore/state/epics/runQueriesEpic.test.ts b/public/app/features/explore/state/epics/runQueriesEpic.test.ts new file mode 100644 index 00000000000..87b1f86513f --- /dev/null +++ b/public/app/features/explore/state/epics/runQueriesEpic.test.ts @@ -0,0 +1,71 @@ +import { mockExploreState } from 'test/mocks/mockExploreState'; +import { epicTester } from 'test/core/redux/epicTester'; +import { runQueriesAction, stateSaveAction, runQueriesBatchAction, clearQueriesAction } from '../actionTypes'; +import { runQueriesEpic } from './runQueriesEpic'; + +describe('runQueriesEpic', () => { + describe('when runQueriesAction is dispatched', () => { + describe('and there is no datasourceError', () => { + describe('and we have non empty queries', () => { + describe('and explore is not live', () => { + it('then runQueriesBatchAction and stateSaveAction are dispatched', () => { + const queries = [{ refId: 'A', key: '123456', expr: '{__filename__="some.log"}' }]; + const { exploreId, state, datasourceInterval, containerWidth } = mockExploreState({ queries }); + + epicTester(runQueriesEpic, state) + .whenActionIsDispatched(runQueriesAction({ exploreId, range: null })) + .thenResultingActionsEqual( + runQueriesBatchAction({ + exploreId, + queryOptions: { interval: datasourceInterval, maxDataPoints: containerWidth, live: false }, + }) + ); + }); + }); + + describe('and explore is live', () => { + it('then runQueriesBatchAction and stateSaveAction are dispatched', () => { + const queries = [{ refId: 'A', key: '123456', expr: '{__filename__="some.log"}' }]; + const { exploreId, state, datasourceInterval, containerWidth } = mockExploreState({ + queries, + isLive: true, + streaming: true, + }); + + epicTester(runQueriesEpic, state) + .whenActionIsDispatched(runQueriesAction({ exploreId, range: null })) + .thenResultingActionsEqual( + runQueriesBatchAction({ + exploreId, + queryOptions: { interval: datasourceInterval, maxDataPoints: containerWidth, live: true }, + }) + ); + }); + }); + }); + + describe('and we have no queries', () => { + it('then clearQueriesAction and stateSaveAction are dispatched', () => { + const queries = []; + const { exploreId, state } = mockExploreState({ queries }); + + epicTester(runQueriesEpic, state) + .whenActionIsDispatched(runQueriesAction({ exploreId, range: null })) + .thenResultingActionsEqual(clearQueriesAction({ exploreId }), stateSaveAction()); + }); + }); + }); + + describe('and there is a datasourceError', () => { + it('then no actions are dispatched', () => { + const { exploreId, state } = mockExploreState({ + datasourceError: { message: 'Some error' }, + }); + + epicTester(runQueriesEpic, state) + .whenActionIsDispatched(runQueriesAction({ exploreId, range: null })) + .thenNoActionsWhereDispatched(); + }); + }); + }); +}); diff --git a/public/app/features/explore/state/epics/runQueriesEpic.ts b/public/app/features/explore/state/epics/runQueriesEpic.ts new file mode 100644 index 00000000000..2102c11b103 --- /dev/null +++ b/public/app/features/explore/state/epics/runQueriesEpic.ts @@ -0,0 +1,39 @@ +import { Epic } from 'redux-observable'; +import { NEVER } from 'rxjs'; +import { mergeMap } from 'rxjs/operators'; + +import { ActionOf } from 'app/core/redux/actionCreatorFactory'; +import { StoreState } from 'app/types/store'; +import { hasNonEmptyQuery } from 'app/core/utils/explore'; +import { + clearQueriesAction, + runQueriesAction, + RunQueriesPayload, + runQueriesBatchAction, + stateSaveAction, +} from '../actionTypes'; + +export const runQueriesEpic: Epic, ActionOf, StoreState> = (action$, state$) => { + return action$.ofType(runQueriesAction.type).pipe( + mergeMap((action: ActionOf) => { + const { exploreId } = action.payload; + const { datasourceInstance, queries, datasourceError, containerWidth, isLive } = state$.value.explore[exploreId]; + + if (datasourceError) { + // let's not run any queries if data source is in a faulty state + return NEVER; + } + + if (!hasNonEmptyQuery(queries)) { + return [clearQueriesAction({ exploreId }), stateSaveAction()]; // Remember to save to state and update location + } + + // Some datasource's query builders allow per-query interval limits, + // but we're using the datasource interval limit for now + const interval = datasourceInstance.interval; + const live = isLive; + + return [runQueriesBatchAction({ exploreId, queryOptions: { interval, maxDataPoints: containerWidth, live } })]; + }) + ); +}; diff --git a/public/app/features/explore/state/epics/stateSaveEpic.test.ts b/public/app/features/explore/state/epics/stateSaveEpic.test.ts new file mode 100644 index 00000000000..bee12ad92a9 --- /dev/null +++ b/public/app/features/explore/state/epics/stateSaveEpic.test.ts @@ -0,0 +1,61 @@ +import { epicTester } from 'test/core/redux/epicTester'; +import { stateSaveEpic } from './stateSaveEpic'; +import { stateSaveAction, setUrlReplacedAction } from '../actionTypes'; +import { updateLocation } from 'app/core/actions/location'; +import { mockExploreState } from 'test/mocks/mockExploreState'; + +describe('stateSaveEpic', () => { + describe('when stateSaveAction is dispatched', () => { + describe('and there is a left state', () => { + describe('and no split', () => { + it('then the correct actions are dispatched', () => { + const { exploreId, state } = mockExploreState(); + + epicTester(stateSaveEpic, state) + .whenActionIsDispatched(stateSaveAction()) + .thenResultingActionsEqual( + updateLocation({ + query: { left: '["now-6h","now","test",{"ui":[true,true,true,null]}]' }, + replace: true, + }), + setUrlReplacedAction({ exploreId }) + ); + }); + }); + + describe('and explore is splitted', () => { + it('then the correct actions are dispatched', () => { + const { exploreId, state } = mockExploreState({ split: true }); + + epicTester(stateSaveEpic, state) + .whenActionIsDispatched(stateSaveAction()) + .thenResultingActionsEqual( + updateLocation({ + query: { + left: '["now-6h","now","test",{"ui":[true,true,true,null]}]', + right: '["now-6h","now","test",{"ui":[true,true,true,null]}]', + }, + replace: true, + }), + setUrlReplacedAction({ exploreId }) + ); + }); + }); + }); + + describe('and urlReplaced is true', () => { + it('then setUrlReplacedAction should not be dispatched', () => { + const { state } = mockExploreState({ urlReplaced: true }); + + epicTester(stateSaveEpic, state) + .whenActionIsDispatched(stateSaveAction()) + .thenResultingActionsEqual( + updateLocation({ + query: { left: '["now-6h","now","test",{"ui":[true,true,true,null]}]' }, + replace: false, + }) + ); + }); + }); + }); +}); diff --git a/public/app/features/explore/state/epics/stateSaveEpic.ts b/public/app/features/explore/state/epics/stateSaveEpic.ts new file mode 100644 index 00000000000..107f1de547b --- /dev/null +++ b/public/app/features/explore/state/epics/stateSaveEpic.ts @@ -0,0 +1,72 @@ +import { Epic } from 'redux-observable'; +import { mergeMap } from 'rxjs/operators'; +import { RawTimeRange, TimeRange } from '@grafana/ui/src/types/time'; +import { isDateTime } from '@grafana/ui/src/utils/moment_wrapper'; + +import { ActionOf } from 'app/core/redux/actionCreatorFactory'; +import { StoreState } from 'app/types/store'; +import { ExploreUrlState, ExploreId } from 'app/types/explore'; +import { clearQueryKeys, serializeStateToUrlParam } from 'app/core/utils/explore'; +import { updateLocation } from 'app/core/actions/location'; +import { setUrlReplacedAction, stateSaveAction } from '../actionTypes'; + +const toRawTimeRange = (range: TimeRange): RawTimeRange => { + let from = range.raw.from; + if (isDateTime(from)) { + from = from.valueOf().toString(10); + } + + let to = range.raw.to; + if (isDateTime(to)) { + to = to.valueOf().toString(10); + } + + return { + from, + to, + }; +}; + +export const stateSaveEpic: Epic, ActionOf, StoreState> = (action$, state$) => { + return action$.ofType(stateSaveAction.type).pipe( + mergeMap(() => { + const { left, right, split } = state$.value.explore; + const replace = left && left.urlReplaced === false; + const urlStates: { [index: string]: string } = {}; + const leftUrlState: ExploreUrlState = { + datasource: left.datasourceInstance.name, + queries: left.queries.map(clearQueryKeys), + range: toRawTimeRange(left.range), + ui: { + showingGraph: left.showingGraph, + showingLogs: true, + showingTable: left.showingTable, + dedupStrategy: left.dedupStrategy, + }, + }; + urlStates.left = serializeStateToUrlParam(leftUrlState, true); + if (split) { + const rightUrlState: ExploreUrlState = { + datasource: right.datasourceInstance.name, + queries: right.queries.map(clearQueryKeys), + range: toRawTimeRange(right.range), + ui: { + showingGraph: right.showingGraph, + showingLogs: true, + showingTable: right.showingTable, + dedupStrategy: right.dedupStrategy, + }, + }; + + urlStates.right = serializeStateToUrlParam(rightUrlState, true); + } + + const actions: Array> = [updateLocation({ query: urlStates, replace })]; + if (replace) { + actions.push(setUrlReplacedAction({ exploreId: ExploreId.left })); + } + + return actions; + }) + ); +}; diff --git a/public/app/features/explore/state/reducers.test.ts b/public/app/features/explore/state/reducers.test.ts index 0c37a4b388e..1f553313f80 100644 --- a/public/app/features/explore/state/reducers.test.ts +++ b/public/app/features/explore/state/reducers.test.ts @@ -17,7 +17,6 @@ import { import { reducerTester } from 'test/core/redux/reducerTester'; import { scanStartAction, - scanStopAction, testDataSourcePendingAction, testDataSourceSuccessAction, testDataSourceFailureAction, @@ -25,6 +24,7 @@ import { splitOpenAction, splitCloseAction, changeModeAction, + scanStopAction, runQueriesAction, } from './actionTypes'; import { Reducer } from 'redux'; @@ -32,7 +32,7 @@ import { ActionOf } from 'app/core/redux/actionCreatorFactory'; import { updateLocation } from 'app/core/actions/location'; import { serializeStateToUrlParam } from 'app/core/utils/explore'; import TableModel from 'app/core/table_model'; -import { DataSourceApi, DataQuery, LogsModel, LogsDedupStrategy, dateTime } from '@grafana/ui'; +import { DataSourceApi, DataQuery, LogsModel, LogsDedupStrategy, LoadingState, dateTime } from '@grafana/ui'; describe('Explore item reducer', () => { describe('scanning', () => { @@ -166,9 +166,7 @@ describe('Explore item reducer', () => { queryKeys, supportedModes: [ExploreMode.Metrics, ExploreMode.Logs], mode: ExploreMode.Metrics, - graphIsLoading: false, - tableIsLoading: false, - logIsLoading: false, + loadingState: LoadingState.NotStarted, latency: 0, queryErrors: [], }; diff --git a/public/app/features/explore/state/reducers.ts b/public/app/features/explore/state/reducers.ts index 969ecd02066..67775b9626b 100644 --- a/public/app/features/explore/state/reducers.ts +++ b/public/app/features/explore/state/reducers.ts @@ -1,6 +1,5 @@ import _ from 'lodash'; import { - calculateResultsFromQueryTransactions, getIntervals, ensureQueries, getQueryKeys, @@ -10,7 +9,7 @@ import { sortLogsResult, } from 'app/core/utils/explore'; import { ExploreItemState, ExploreState, ExploreId, ExploreUpdateState, ExploreMode } from 'app/types/explore'; -import { DataQuery, LogsModel } from '@grafana/ui'; +import { DataQuery, LoadingState } from '@grafana/ui'; import { HigherOrderAction, ActionTypes, @@ -20,10 +19,17 @@ import { splitCloseAction, SplitCloseActionPayload, loadExploreDatasources, - runQueriesAction, historyUpdatedAction, - resetQueryErrorAction, changeModeAction, + queryFailureAction, + setUrlReplacedAction, + querySuccessAction, + scanRangeAction, + scanStopAction, + resetQueryErrorAction, + queryStartAction, + runQueriesAction, + changeRangeAction, } from './actionTypes'; import { reducerFactory } from 'app/core/redux'; import { @@ -40,13 +46,8 @@ import { loadDatasourcePendingAction, loadDatasourceReadyAction, modifyQueriesAction, - queryFailureAction, - queryStartAction, - querySuccessAction, removeQueryRowAction, - scanRangeAction, scanStartAction, - scanStopAction, setQueriesAction, toggleTableAction, queriesImportedAction, @@ -57,8 +58,6 @@ import { updateLocation } from 'app/core/actions/location'; import { LocationUpdate } from 'app/types'; import TableModel from 'app/core/table_model'; import { isLive } from '@grafana/ui/src/components/RefreshPicker/RefreshPicker'; -import { subscriptionDataReceivedAction, startSubscriptionAction } from './epics'; -import { seriesDataToLogsModel } from 'app/core/logs_model'; export const DEFAULT_RANGE = { from: 'now-6h', @@ -100,9 +99,7 @@ export const makeExploreItemState = (): ExploreItemState => ({ scanRange: null, showingGraph: true, showingTable: true, - graphIsLoading: false, - logIsLoading: false, - tableIsLoading: false, + loadingState: LoadingState.NotStarted, queryKeys: [], urlState: null, update: makeInitialUpdateState(), @@ -111,6 +108,7 @@ export const makeExploreItemState = (): ExploreItemState => ({ supportedModes: [], mode: null, isLive: false, + urlReplaced: false, }); /** @@ -191,10 +189,8 @@ export const itemReducer = reducerFactory({} as ExploreItemSta return { ...state, - refreshInterval: refreshInterval, - graphIsLoading: live ? true : false, - tableIsLoading: live ? true : false, - logIsLoading: live ? true : false, + refreshInterval, + loadingState: live ? LoadingState.Streaming : LoadingState.NotStarted, isLive: live, logsResult, }; @@ -267,9 +263,7 @@ export const itemReducer = reducerFactory({} as ExploreItemSta datasourceInstance, queryErrors: [], latency: 0, - graphIsLoading: false, - logIsLoading: false, - tableIsLoading: false, + loadingState: LoadingState.NotStarted, StartPage, showingStartPage: Boolean(StartPage), queryKeys: getQueryKeys(state.queries, datasourceInstance), @@ -346,35 +340,29 @@ export const itemReducer = reducerFactory({} as ExploreItemSta .addMapper({ filter: queryFailureAction, mapper: (state, action): ExploreItemState => { - const { resultType, response } = action.payload; + const { response } = action.payload; const queryErrors = state.queryErrors.concat(response); return { ...state, - graphResult: resultType === 'Graph' ? null : state.graphResult, - tableResult: resultType === 'Table' ? null : state.tableResult, - logsResult: resultType === 'Logs' ? null : state.logsResult, + graphResult: null, + tableResult: null, + logsResult: null, latency: 0, queryErrors, - graphIsLoading: resultType === 'Graph' ? false : state.graphIsLoading, - logIsLoading: resultType === 'Logs' ? false : state.logIsLoading, - tableIsLoading: resultType === 'Table' ? false : state.tableIsLoading, + loadingState: LoadingState.Error, update: makeInitialUpdateState(), }; }, }) .addMapper({ filter: queryStartAction, - mapper: (state, action): ExploreItemState => { - const { resultType } = action.payload; - + mapper: (state): ExploreItemState => { return { ...state, queryErrors: [], latency: 0, - graphIsLoading: resultType === 'Graph' ? true : state.graphIsLoading, - logIsLoading: resultType === 'Logs' ? true : state.logIsLoading, - tableIsLoading: resultType === 'Table' ? true : state.tableIsLoading, + loadingState: LoadingState.Loading, update: makeInitialUpdateState(), }; }, @@ -382,80 +370,20 @@ export const itemReducer = reducerFactory({} as ExploreItemSta .addMapper({ filter: querySuccessAction, mapper: (state, action): ExploreItemState => { - const { queryIntervals, refreshInterval } = state; - const { result, resultType, latency } = action.payload; - const results = calculateResultsFromQueryTransactions(result, resultType, queryIntervals.intervalMs); - const live = isLive(refreshInterval); - - if (live) { - return state; - } + const { latency, loadingState, graphResult, tableResult, logsResult } = action.payload; return { ...state, - graphResult: resultType === 'Graph' ? results.graphResult : state.graphResult, - tableResult: resultType === 'Table' ? results.tableResult : state.tableResult, - logsResult: - resultType === 'Logs' - ? sortLogsResult(results.logsResult, refreshInterval) - : sortLogsResult(state.logsResult, refreshInterval), + loadingState, + graphResult, + tableResult, + logsResult, latency, - graphIsLoading: live ? true : false, - logIsLoading: live ? true : false, - tableIsLoading: live ? true : false, showingStartPage: false, update: makeInitialUpdateState(), }; }, }) - .addMapper({ - filter: startSubscriptionAction, - mapper: (state): ExploreItemState => { - const logsResult = sortLogsResult(state.logsResult, state.refreshInterval); - - return { - ...state, - logsResult, - graphIsLoading: true, - logIsLoading: true, - tableIsLoading: true, - showingStartPage: false, - update: makeInitialUpdateState(), - }; - }, - }) - .addMapper({ - filter: subscriptionDataReceivedAction, - mapper: (state, action): ExploreItemState => { - const { queryIntervals, refreshInterval } = state; - const { data } = action.payload; - const live = isLive(refreshInterval); - - if (!live) { - return state; - } - - const newResults = seriesDataToLogsModel([data], queryIntervals.intervalMs); - const rowsInState = sortLogsResult(state.logsResult, state.refreshInterval).rows; - - const processedRows = []; - for (const row of rowsInState) { - processedRows.push({ ...row, fresh: false }); - } - for (const row of newResults.rows) { - processedRows.push({ ...row, fresh: true }); - } - - const rows = processedRows.slice(processedRows.length - 1000, 1000); - - const logsResult: LogsModel = state.logsResult ? { ...state.logsResult, rows } : { hasUniqueLabels: false, rows }; - - return { - ...state, - logsResult, - }; - }, - }) .addMapper({ filter: removeQueryRowAction, mapper: (state, action): ExploreItemState => { @@ -635,6 +563,24 @@ export const itemReducer = reducerFactory({} as ExploreItemSta }; }, }) + .addMapper({ + filter: setUrlReplacedAction, + mapper: (state): ExploreItemState => { + return { + ...state, + urlReplaced: true, + }; + }, + }) + .addMapper({ + filter: changeRangeAction, + mapper: (state, action): ExploreItemState => { + return { + ...state, + range: action.payload.range, + }; + }, + }) .create(); export const updateChildRefreshState = ( diff --git a/public/app/features/explore/utils/ResultProcessor.test.ts b/public/app/features/explore/utils/ResultProcessor.test.ts new file mode 100644 index 00000000000..4979afa538c --- /dev/null +++ b/public/app/features/explore/utils/ResultProcessor.test.ts @@ -0,0 +1,453 @@ +jest.mock('@grafana/ui/src/utils/moment_wrapper', () => ({ + dateTime: (ts: any) => { + return { + valueOf: () => ts, + fromNow: () => 'fromNow() jest mocked', + format: (fmt: string) => 'format() jest mocked', + }; + }, +})); + +import { ResultProcessor } from './ResultProcessor'; +import { ExploreItemState, ExploreMode } from 'app/types/explore'; +import TableModel from 'app/core/table_model'; +import { toFixed } from '@grafana/ui'; + +const testContext = (options: any = {}) => { + const response = [ + { + target: 'A-series', + alias: 'A-series', + datapoints: [[39.91264531864214, 1559038518831], [40.35179822906545, 1559038519831]], + refId: 'A', + }, + { + columns: [ + { + text: 'Time', + }, + { + text: 'Message', + }, + { + text: 'Description', + }, + { + text: 'Value', + }, + ], + rows: [ + [1559038518831, 'This is a message', 'Description', 23.1], + [1559038519831, 'This is a message', 'Description', 23.1], + ], + refId: 'B', + }, + ]; + const defaultOptions = { + mode: ExploreMode.Metrics, + replacePreviousResults: true, + result: { data: response }, + graphResult: [], + tableResult: new TableModel(), + logsResult: { hasUniqueLabels: false, rows: [] }, + }; + const combinedOptions = { ...defaultOptions, ...options }; + const state = ({ + mode: combinedOptions.mode, + graphResult: combinedOptions.graphResult, + tableResult: combinedOptions.tableResult, + logsResult: combinedOptions.logsResult, + queryIntervals: { intervalMs: 10 }, + } as any) as ExploreItemState; + const resultProcessor = new ResultProcessor(state, combinedOptions.replacePreviousResults, combinedOptions.result); + + return { + result: combinedOptions.result, + resultProcessor, + }; +}; + +describe('ResultProcessor', () => { + describe('constructed without result', () => { + describe('when calling getRawData', () => { + it('then it should return an empty array', () => { + const { resultProcessor } = testContext({ result: null }); + const theResult = resultProcessor.getRawData(); + + expect(theResult).toEqual([]); + }); + }); + + describe('when calling getGraphResult', () => { + it('then it should return an empty array', () => { + const { resultProcessor } = testContext({ result: null }); + const theResult = resultProcessor.getGraphResult(); + + expect(theResult).toEqual([]); + }); + }); + + describe('when calling getTableResult', () => { + it('then it should return an empty TableModel', () => { + const { resultProcessor } = testContext({ result: null }); + const theResult = resultProcessor.getTableResult(); + + expect(theResult).toEqual(new TableModel()); + }); + }); + + describe('when calling getLogsResult', () => { + it('then it should return null', () => { + const { resultProcessor } = testContext({ result: null }); + const theResult = resultProcessor.getLogsResult(); + + expect(theResult).toBeNull(); + }); + }); + }); + + describe('constructed with a result that is a DataQueryResponse', () => { + describe('when calling getRawData', () => { + it('then it should return result.data', () => { + const { result, resultProcessor } = testContext(); + const theResult = resultProcessor.getRawData(); + + expect(theResult).toEqual(result.data); + }); + }); + + describe('when calling getGraphResult', () => { + it('then it should return correct graph result', () => { + const { resultProcessor } = testContext(); + const theResult = resultProcessor.getGraphResult(); + + expect(theResult).toEqual([ + { + alias: 'A-series', + aliasEscaped: 'A-series', + bars: { + fillColor: '#7EB26D', + }, + hasMsResolution: true, + id: 'A-series', + label: 'A-series', + legend: true, + stats: {}, + color: '#7EB26D', + datapoints: [[39.91264531864214, 1559038518831], [40.35179822906545, 1559038519831]], + unit: undefined, + valueFormater: toFixed, + }, + ]); + }); + }); + + describe('when calling getTableResult', () => { + it('then it should return correct table result', () => { + const { resultProcessor } = testContext(); + const theResult = resultProcessor.getTableResult(); + + expect(theResult).toEqual({ + columnMap: {}, + columns: [{ text: 'Time' }, { text: 'Message' }, { text: 'Description' }, { text: 'Value' }], + rows: [ + [1559038518831, 'This is a message', 'Description', 23.1], + [1559038519831, 'This is a message', 'Description', 23.1], + ], + type: 'table', + }); + }); + }); + + describe('when calling getLogsResult', () => { + it('then it should return correct logs result', () => { + const { resultProcessor } = testContext({ mode: ExploreMode.Logs, observerResponse: null }); + const theResult = resultProcessor.getLogsResult(); + + expect(theResult).toEqual({ + hasUniqueLabels: false, + meta: [], + rows: [ + { + entry: 'This is a message', + hasAnsi: false, + labels: undefined, + logLevel: 'unknown', + raw: 'This is a message', + searchWords: [], + timeEpochMs: 1559038519831, + timeFromNow: 'fromNow() jest mocked', + timeLocal: 'format() jest mocked', + timestamp: 1559038519831, + uniqueLabels: {}, + }, + { + entry: 'This is a message', + hasAnsi: false, + labels: undefined, + logLevel: 'unknown', + raw: 'This is a message', + searchWords: [], + timeEpochMs: 1559038518831, + timeFromNow: 'fromNow() jest mocked', + timeLocal: 'format() jest mocked', + timestamp: 1559038518831, + uniqueLabels: {}, + }, + ], + series: [ + { + alias: 'A-series', + datapoints: [[39.91264531864214, 1559038518831], [40.35179822906545, 1559038519831]], + meta: undefined, + refId: 'A', + target: 'A-series', + unit: undefined, + }, + ], + }); + }); + }); + }); + + describe('constructed with result that is a DataQueryResponse and merging with previous results', () => { + describe('when calling getRawData', () => { + it('then it should return result.data', () => { + const { result, resultProcessor } = testContext(); + const theResult = resultProcessor.getRawData(); + + expect(theResult).toEqual(result.data); + }); + }); + + describe('when calling getGraphResult', () => { + it('then it should return correct graph result', () => { + const { resultProcessor } = testContext({ + replacePreviousResults: false, + graphResult: [ + { + alias: 'A-series', + aliasEscaped: 'A-series', + bars: { + fillColor: '#7EB26D', + }, + hasMsResolution: true, + id: 'A-series', + label: 'A-series', + legend: true, + stats: {}, + color: '#7EB26D', + datapoints: [[19.91264531864214, 1558038518831], [20.35179822906545, 1558038519831]], + unit: undefined, + valueFormater: toFixed, + }, + ], + }); + const theResult = resultProcessor.getGraphResult(); + + expect(theResult).toEqual([ + { + alias: 'A-series', + aliasEscaped: 'A-series', + bars: { + fillColor: '#7EB26D', + }, + hasMsResolution: true, + id: 'A-series', + label: 'A-series', + legend: true, + stats: {}, + color: '#7EB26D', + datapoints: [ + [19.91264531864214, 1558038518831], + [20.35179822906545, 1558038519831], + [39.91264531864214, 1559038518831], + [40.35179822906545, 1559038519831], + ], + unit: undefined, + valueFormater: toFixed, + }, + ]); + }); + }); + + describe('when calling getTableResult', () => { + it('then it should return correct table result', () => { + const { resultProcessor } = testContext({ + replacePreviousResults: false, + tableResult: { + columnMap: {}, + columns: [{ text: 'Time' }, { text: 'Message' }, { text: 'Description' }, { text: 'Value' }], + rows: [ + [1558038518831, 'This is a previous message 1', 'Previous Description 1', 21.1], + [1558038519831, 'This is a previous message 2', 'Previous Description 2', 22.1], + ], + type: 'table', + }, + }); + const theResult = resultProcessor.getTableResult(); + + expect(theResult).toEqual({ + columnMap: {}, + columns: [{ text: 'Time' }, { text: 'Message' }, { text: 'Description' }, { text: 'Value' }], + rows: [ + [1558038518831, 'This is a previous message 1', 'Previous Description 1', 21.1], + [1558038519831, 'This is a previous message 2', 'Previous Description 2', 22.1], + [1559038518831, 'This is a message', 'Description', 23.1], + [1559038519831, 'This is a message', 'Description', 23.1], + ], + type: 'table', + }); + }); + }); + + describe('when calling getLogsResult', () => { + it('then it should return correct logs result', () => { + const { resultProcessor } = testContext({ + mode: ExploreMode.Logs, + replacePreviousResults: false, + logsResult: { + hasUniqueLabels: false, + meta: [], + rows: [ + { + entry: 'This is a previous message 1', + fresh: true, + hasAnsi: false, + labels: { cluster: 'some-cluster' }, + logLevel: 'unknown', + raw: 'This is a previous message 1', + searchWords: [], + timeEpochMs: 1558038519831, + timeFromNow: 'fromNow() jest mocked', + timeLocal: 'format() jest mocked', + timestamp: 1558038519831, + uniqueLabels: {}, + }, + { + entry: 'This is a previous message 2', + fresh: true, + hasAnsi: false, + labels: { cluster: 'some-cluster' }, + logLevel: 'unknown', + raw: 'This is a previous message 2', + searchWords: [], + timeEpochMs: 1558038518831, + timeFromNow: 'fromNow() jest mocked', + timeLocal: 'format() jest mocked', + timestamp: 1558038518831, + uniqueLabels: {}, + }, + ], + series: [ + { + alias: 'A-series', + aliasEscaped: 'A-series', + bars: { + fillColor: '#7EB26D', + }, + hasMsResolution: true, + id: 'A-series', + label: 'A-series', + legend: true, + stats: {}, + color: '#7EB26D', + datapoints: [[37.91264531864214, 1558038518831], [38.35179822906545, 1558038519831]], + unit: undefined, + valueFormater: toFixed, + }, + ], + }, + }); + const theResult = resultProcessor.getLogsResult(); + const expected = { + hasUniqueLabels: false, + meta: [], + rows: [ + { + entry: 'This is a previous message 1', + fresh: false, + hasAnsi: false, + labels: { cluster: 'some-cluster' }, + logLevel: 'unknown', + raw: 'This is a previous message 1', + searchWords: [], + timeEpochMs: 1558038519831, + timeFromNow: 'fromNow() jest mocked', + timeLocal: 'format() jest mocked', + timestamp: 1558038519831, + uniqueLabels: {}, + }, + { + entry: 'This is a previous message 2', + fresh: false, + hasAnsi: false, + labels: { cluster: 'some-cluster' }, + logLevel: 'unknown', + raw: 'This is a previous message 2', + searchWords: [], + timeEpochMs: 1558038518831, + timeFromNow: 'fromNow() jest mocked', + timeLocal: 'format() jest mocked', + timestamp: 1558038518831, + uniqueLabels: {}, + }, + { + entry: 'This is a message', + fresh: true, + hasAnsi: false, + labels: undefined, + logLevel: 'unknown', + raw: 'This is a message', + searchWords: [], + timeEpochMs: 1559038519831, + timeFromNow: 'fromNow() jest mocked', + timeLocal: 'format() jest mocked', + timestamp: 1559038519831, + uniqueLabels: {}, + }, + { + entry: 'This is a message', + fresh: true, + hasAnsi: false, + labels: undefined, + logLevel: 'unknown', + raw: 'This is a message', + searchWords: [], + timeEpochMs: 1559038518831, + timeFromNow: 'fromNow() jest mocked', + timeLocal: 'format() jest mocked', + timestamp: 1559038518831, + uniqueLabels: {}, + }, + ], + series: [ + { + alias: 'A-series', + aliasEscaped: 'A-series', + bars: { + fillColor: '#7EB26D', + }, + hasMsResolution: true, + id: 'A-series', + label: 'A-series', + legend: true, + stats: {}, + color: '#7EB26D', + datapoints: [ + [37.91264531864214, 1558038518831], + [38.35179822906545, 1558038519831], + [39.91264531864214, 1559038518831], + [40.35179822906545, 1559038519831], + ], + unit: undefined, + valueFormater: toFixed, + }, + ], + }; + + expect(theResult).toEqual(expected); + }); + }); + }); +}); diff --git a/public/app/features/explore/utils/ResultProcessor.ts b/public/app/features/explore/utils/ResultProcessor.ts new file mode 100644 index 00000000000..2521c4914f8 --- /dev/null +++ b/public/app/features/explore/utils/ResultProcessor.ts @@ -0,0 +1,176 @@ +import { + DataQueryResponse, + TableData, + isTableData, + LogsModel, + toSeriesData, + guessFieldTypes, + DataQueryResponseData, + TimeSeries, +} from '@grafana/ui'; + +import { ExploreItemState, ExploreMode } from 'app/types/explore'; +import { getProcessedSeriesData } from 'app/features/dashboard/state/PanelQueryState'; +import TableModel, { mergeTablesIntoModel } from 'app/core/table_model'; +import { sortLogsResult } from 'app/core/utils/explore'; +import { seriesDataToLogsModel } from 'app/core/logs_model'; +import { default as TimeSeries2 } from 'app/core/time_series2'; +import { DataProcessor } from 'app/plugins/panel/graph/data_processor'; + +export class ResultProcessor { + private rawData: DataQueryResponseData[] = []; + private metrics: TimeSeries[] = []; + private tables: TableData[] = []; + + constructor( + private state: ExploreItemState, + private replacePreviousResults: boolean, + result?: DataQueryResponse | DataQueryResponseData[] + ) { + if (result && result.hasOwnProperty('data')) { + this.rawData = (result as DataQueryResponse).data; + } else { + this.rawData = (result as DataQueryResponseData[]) || []; + } + + if (this.state.mode !== ExploreMode.Metrics) { + return; + } + + for (let index = 0; index < this.rawData.length; index++) { + const res: any = this.rawData[index]; + const isTable = isTableData(res); + if (isTable) { + this.tables.push(res); + } else { + this.metrics.push(res); + } + } + } + + getRawData = (): any[] => { + return this.rawData; + }; + + getGraphResult = (): TimeSeries[] => { + if (this.state.mode !== ExploreMode.Metrics) { + return []; + } + + const newResults = this.makeTimeSeriesList(this.metrics); + return this.mergeGraphResults(newResults, this.state.graphResult); + }; + + getTableResult = (): TableModel => { + if (this.state.mode !== ExploreMode.Metrics) { + return new TableModel(); + } + + const prevTableResults = this.state.tableResult || []; + const tablesToMerge = this.replacePreviousResults ? this.tables : [].concat(prevTableResults, this.tables); + + return mergeTablesIntoModel(new TableModel(), ...tablesToMerge); + }; + + getLogsResult = (): LogsModel => { + if (this.state.mode !== ExploreMode.Logs) { + return null; + } + const graphInterval = this.state.queryIntervals.intervalMs; + const seriesData = this.rawData.map(result => guessFieldTypes(toSeriesData(result))); + const newResults = this.rawData ? seriesDataToLogsModel(seriesData, graphInterval) : null; + + if (this.replacePreviousResults) { + return newResults; + } + + const prevLogsResult: LogsModel = this.state.logsResult || { hasUniqueLabels: false, rows: [] }; + const sortedLogResult = sortLogsResult(prevLogsResult, this.state.refreshInterval); + const rowsInState = sortedLogResult.rows; + const seriesInState = sortedLogResult.series || []; + + const processedRows = []; + for (const row of rowsInState) { + processedRows.push({ ...row, fresh: false }); + } + for (const row of newResults.rows) { + processedRows.push({ ...row, fresh: true }); + } + + const processedSeries = this.mergeGraphResults(newResults.series, seriesInState); + + const slice = -1000; + const rows = processedRows.slice(slice); + const series = processedSeries.slice(slice); + + return { ...newResults, rows, series }; + }; + + private makeTimeSeriesList = (rawData: any[]) => { + const dataList = getProcessedSeriesData(rawData); + const dataProcessor = new DataProcessor({ xaxis: {}, aliasColors: [] }); // Hack before we use GraphSeriesXY instead + const timeSeries = dataProcessor.getSeriesList({ dataList }); + + return (timeSeries as any) as TimeSeries[]; // Hack before we use GraphSeriesXY instead + }; + + private isSameTimeSeries = (a: TimeSeries | TimeSeries2, b: TimeSeries | TimeSeries2) => { + if (a.hasOwnProperty('id') && b.hasOwnProperty('id')) { + if (a['id'] !== undefined && b['id'] !== undefined && a['id'] === b['id']) { + return true; + } + } + + if (a.hasOwnProperty('alias') && b.hasOwnProperty('alias')) { + if (a['alias'] !== undefined && b['alias'] !== undefined && a['alias'] === b['alias']) { + return true; + } + } + + return false; + }; + + private mergeGraphResults = ( + newResults: TimeSeries[] | TimeSeries2[], + prevResults: TimeSeries[] | TimeSeries2[] + ): TimeSeries[] => { + if (!prevResults || prevResults.length === 0 || this.replacePreviousResults) { + return (newResults as any) as TimeSeries[]; // Hack before we use GraphSeriesXY instead + } + + const results: TimeSeries[] = prevResults.slice() as TimeSeries[]; + + // update existing results + for (let index = 0; index < results.length; index++) { + const prevResult = results[index]; + for (const newResult of newResults) { + const isSame = this.isSameTimeSeries(prevResult, newResult); + + if (isSame) { + prevResult.datapoints = prevResult.datapoints.concat(newResult.datapoints); + break; + } + } + } + + // add new results + for (const newResult of newResults) { + let isNew = true; + for (const prevResult of results) { + const isSame = this.isSameTimeSeries(prevResult, newResult); + if (isSame) { + isNew = false; + break; + } + } + + if (isNew) { + const timeSeries2Result = new TimeSeries2({ ...newResult }); + + const result = (timeSeries2Result as any) as TimeSeries; // Hack before we use GraphSeriesXY instead + results.push(result); + } + } + return results; + }; +} diff --git a/public/app/plugins/datasource/loki/datasource.ts b/public/app/plugins/datasource/loki/datasource.ts index d86e5fe1922..b689d02ba13 100644 --- a/public/app/plugins/datasource/loki/datasource.ts +++ b/public/app/plugins/datasource/loki/datasource.ts @@ -1,5 +1,8 @@ // Libraries import _ from 'lodash'; +import { Subscription, of } from 'rxjs'; +import { webSocket } from 'rxjs/webSocket'; +import { catchError, map } from 'rxjs/operators'; // Services & Utils import * as dateMath from '@grafana/ui/src/utils/datemath'; @@ -17,11 +20,14 @@ import { DataSourceInstanceSettings, DataQueryError, LogRowModel, + DataStreamObserver, + LoadingState, + DataStreamState, } from '@grafana/ui'; import { LokiQuery, LokiOptions } from './types'; import { BackendSrv } from 'app/core/services/backend_srv'; import { TemplateSrv } from 'app/features/templating/template_srv'; -import { safeStringifyValue } from 'app/core/utils/explore'; +import { safeStringifyValue, convertToWebSocketUrl } from 'app/core/utils/explore'; export const DEFAULT_MAX_LINES = 1000; @@ -47,6 +53,7 @@ interface LokiContextQueryOptions { } export class LokiDatasource extends DataSourceApi { + private subscriptions: { [key: string]: Subscription } = null; languageProvider: LanguageProvider; maxLines: number; @@ -60,6 +67,7 @@ export class LokiDatasource extends DataSourceApi { this.languageProvider = new LanguageProvider(this); const settingsData = instanceSettings.jsonData || {}; this.maxLines = parseInt(settingsData.maxLines, 10) || DEFAULT_MAX_LINES; + this.subscriptions = {}; } _request(apiUrl: string, data?, options?: any) { @@ -73,41 +81,20 @@ export class LokiDatasource extends DataSourceApi { return this.backendSrv.datasourceRequest(req); } - convertToStreamTargets = (options: DataQueryRequest): Array<{ url: string; refId: string }> => { - return options.targets - .filter(target => target.expr && !target.hide) - .map(target => { - const interpolated = this.templateSrv.replace(target.expr); - const { query, regexp } = parseQuery(interpolated); - const refId = target.refId; - const baseUrl = this.instanceSettings.url; - const params = serializeParams({ query, regexp }); - const url = `${baseUrl}/api/prom/tail?${params}`; - - return { - url, - refId, - }; - }); - }; - - resultToSeriesData = (data: any, refId: string): SeriesData[] => { - const toSeriesData = (stream: any, refId: string) => ({ - ...logStreamToSeriesData(stream), + prepareLiveTarget(target: LokiQuery, options: DataQueryRequest) { + const interpolated = this.templateSrv.replace(target.expr); + const { query, regexp } = parseQuery(interpolated); + const refId = target.refId; + const baseUrl = this.instanceSettings.url; + const params = serializeParams({ query, regexp }); + const url = convertToWebSocketUrl(`${baseUrl}/api/prom/tail?${params}`); + return { + query, + regexp, + url, refId, - }); - - if (data.streams) { - // new Loki API purposed in https://github.com/grafana/loki/pull/590 - const series: SeriesData[] = []; - for (const stream of data.streams || []) { - series.push(toSeriesData(stream, refId)); - } - return series; - } - - return [toSeriesData(data, refId)]; - }; + }; + } prepareQueryTarget(target: LokiQuery, options: DataQueryRequest) { const interpolated = this.templateSrv.replace(target.expr); @@ -126,9 +113,106 @@ export class LokiDatasource extends DataSourceApi { }; } - async query(options: DataQueryRequest) { + unsubscribe = (refId: string) => { + const subscription = this.subscriptions[refId]; + if (subscription && !subscription.closed) { + subscription.unsubscribe(); + delete this.subscriptions[refId]; + } + }; + + processError = (err: any, target: any): DataQueryError => { + const error: DataQueryError = { + message: 'Unknown error during query transaction. Please check JS console logs.', + refId: target.refId, + }; + + if (err.data) { + if (typeof err.data === 'string') { + error.message = err.data; + } else if (err.data.error) { + error.message = safeStringifyValue(err.data.error); + } + } else if (err.message) { + error.message = err.message; + } else if (typeof err === 'string') { + error.message = err; + } + + error.status = err.status; + error.statusText = err.statusText; + + return error; + }; + + processResult = (data: any, target: any): SeriesData[] => { + const series: SeriesData[] = []; + + if (Object.keys(data).length === 0) { + return series; + } + + if (!data.streams) { + return [{ ...logStreamToSeriesData(data), refId: target.refId }]; + } + + for (const stream of data.streams || []) { + const seriesData = logStreamToSeriesData(stream); + seriesData.refId = target.refId; + seriesData.meta = { + searchWords: getHighlighterExpressionsFromQuery(formatQuery(target.query, target.regexp)), + limit: this.maxLines, + }; + series.push(seriesData); + } + + return series; + }; + + runLiveQueries = (options: DataQueryRequest, observer?: DataStreamObserver) => { + const liveTargets = options.targets + .filter(target => target.expr && !target.hide && target.live) + .map(target => this.prepareLiveTarget(target, options)); + + for (const liveTarget of liveTargets) { + const subscription = webSocket(liveTarget.url) + .pipe( + map((results: any[]) => { + const delta = this.processResult(results, liveTarget); + const state: DataStreamState = { + key: `loki-${liveTarget.refId}`, + request: options, + state: LoadingState.Streaming, + delta, + unsubscribe: () => this.unsubscribe(liveTarget.refId), + }; + + return state; + }), + catchError(err => { + const error = this.processError(err, liveTarget); + const state: DataStreamState = { + key: `loki-${liveTarget.refId}`, + request: options, + state: LoadingState.Error, + error, + unsubscribe: () => this.unsubscribe(liveTarget.refId), + }; + + return of(state); + }) + ) + .subscribe({ + next: state => observer(state), + }); + + this.subscriptions[liveTarget.refId] = subscription; + } + }; + + runQueries = async (options: DataQueryRequest) => { const queryTargets = options.targets - .filter(target => target.expr && !target.hide) + .filter(target => target.expr && !target.hide && !target.live) .map(target => this.prepareQueryTarget(target, options)); if (queryTargets.length === 0) { @@ -141,53 +225,29 @@ export class LokiDatasource extends DataSourceApi { return err; } - const error: DataQueryError = { - message: 'Unknown error during query transaction. Please check JS console logs.', - refId: target.refId, - }; - - if (err.data) { - if (typeof err.data === 'string') { - error.message = err.data; - } else if (err.data.error) { - error.message = safeStringifyValue(err.data.error); - } - } else if (err.message) { - error.message = err.message; - } else if (typeof err === 'string') { - error.message = err; - } - - error.status = err.status; - error.statusText = err.statusText; - + const error: DataQueryError = this.processError(err, target); throw error; }) ); return Promise.all(queries).then((results: any[]) => { - const series: Array = []; + let series: SeriesData[] = []; for (let i = 0; i < results.length; i++) { const result = results[i]; if (result.data) { - const refId = queryTargets[i].refId; - for (const stream of result.data.streams || []) { - const seriesData = logStreamToSeriesData(stream); - seriesData.refId = refId; - seriesData.meta = { - searchWords: getHighlighterExpressionsFromQuery( - formatQuery(queryTargets[i].query, queryTargets[i].regexp) - ), - limit: this.maxLines, - }; - series.push(seriesData); - } + series = series.concat(this.processResult(result.data, queryTargets[i])); } } return { data: series }; }); + }; + + async query(options: DataQueryRequest, observer?: DataStreamObserver) { + this.runLiveQueries(options, observer); + + return this.runQueries(options); } async importQueries(queries: LokiQuery[], originMeta: PluginMeta): Promise { diff --git a/public/app/plugins/datasource/loki/language_provider.ts b/public/app/plugins/datasource/loki/language_provider.ts index 64bf876f2c7..ff187bd8842 100644 --- a/public/app/plugins/datasource/loki/language_provider.ts +++ b/public/app/plugins/datasource/loki/language_provider.ts @@ -16,6 +16,7 @@ import { } from 'app/types/explore'; import { LokiQuery } from './types'; import { dateTime } from '@grafana/ui/src/utils/moment_wrapper'; +import { PromQuery } from '../prometheus/types'; const DEFAULT_KEYS = ['job', 'namespace']; const EMPTY_SELECTOR = '{}'; @@ -168,8 +169,9 @@ export default class LokiLanguageProvider extends LanguageProvider { return Promise.all( queries.map(async query => { const expr = await this.importPrometheusQuery(query.expr); + const { context, ...rest } = query as PromQuery; return { - ...query, + ...rest, expr, }; }) diff --git a/public/app/plugins/datasource/loki/plugin.json b/public/app/plugins/datasource/loki/plugin.json index 1c880bce811..ca630b56bc7 100644 --- a/public/app/plugins/datasource/loki/plugin.json +++ b/public/app/plugins/datasource/loki/plugin.json @@ -8,6 +8,7 @@ "alerting": false, "annotations": false, "logs": true, + "streaming": true, "info": { "description": "Like Prometheus but for logs. OSS logging solution from Grafana Labs", diff --git a/public/app/plugins/datasource/loki/types.ts b/public/app/plugins/datasource/loki/types.ts index 4c973f8a79e..e733c3b47cb 100644 --- a/public/app/plugins/datasource/loki/types.ts +++ b/public/app/plugins/datasource/loki/types.ts @@ -2,6 +2,9 @@ import { DataQuery, Labels, DataSourceJsonData } from '@grafana/ui/src/types'; export interface LokiQuery extends DataQuery { expr: string; + live?: boolean; + query?: string; + regexp?: string; } export interface LokiOptions extends DataSourceJsonData { diff --git a/public/app/plugins/datasource/prometheus/components/PromQueryField.tsx b/public/app/plugins/datasource/prometheus/components/PromQueryField.tsx index 14d03df6d38..c432e9d58b4 100644 --- a/public/app/plugins/datasource/prometheus/components/PromQueryField.tsx +++ b/public/app/plugins/datasource/prometheus/components/PromQueryField.tsx @@ -223,7 +223,7 @@ class PromQueryField extends React.PureComponent { type: string; @@ -83,7 +87,7 @@ export class PrometheusDatasource extends DataSourceApi } } - _request(url, data?, options?: any) { + _request(url: string, data?: any, options?: any) { options = _.defaults(options || {}, { url: this.url + url, method: this.httpMethod, @@ -119,11 +123,11 @@ export class PrometheusDatasource extends DataSourceApi } // Use this for tab completion features, wont publish response to other components - metadataRequest(url) { + metadataRequest(url: string) { return this._request(url, null, { method: 'GET', silent: true }); } - interpolateQueryExpr(value, variable, defaultFormatFn) { + interpolateQueryExpr(value: any, variable: any, defaultFormatFn: any) { // if no multi or include all do not regexEscape if (!variable.multi && !variable.includeAll) { return prometheusRegularEscape(value); @@ -141,34 +145,132 @@ export class PrometheusDatasource extends DataSourceApi return this.templateSrv.variableExists(target.expr); } - query(options: DataQueryRequest): Promise<{ data: any }> { - const start = this.getPrometheusTime(options.range.from, false); - const end = this.getPrometheusTime(options.range.to, true); + processResult = (response: any, query: PromQueryRequest, target: PromQuery, responseListLength: number) => { + // Keeping original start/end for transformers + const transformerOptions = { + format: target.format, + step: query.step, + legendFormat: target.legendFormat, + start: query.start, + end: query.end, + query: query.expr, + responseListLength, + refId: target.refId, + valueWithRefId: target.valueWithRefId, + }; + const series = this.resultTransformer.transform(response, transformerOptions); - const queries = []; - const activeTargets = []; + return series; + }; - options = _.clone(options); + runObserverQueries = ( + options: DataQueryRequest, + observer: DataStreamObserver, + queries: PromQueryRequest[], + activeTargets: PromQuery[], + end: number + ) => { + for (let index = 0; index < queries.length; index++) { + const query = queries[index]; + const target = activeTargets[index]; + let observable: Observable = null; + + if (query.instant) { + observable = from(this.performInstantQuery(query, end)); + } else { + observable = from(this.performTimeSeriesQuery(query, query.start, query.end)); + } + + observable + .pipe( + single(), // unsubscribes automatically after first result + filter((response: any) => (response.cancelled ? false : true)), + map((response: any) => { + return this.processResult(response, query, target, queries.length); + }) + ) + .subscribe({ + next: series => { + if (query.instant) { + observer({ + key: `prometheus-${target.refId}`, + state: LoadingState.Loading, + request: options, + series: null, + delta: series, + unsubscribe: () => undefined, + }); + } else { + observer({ + key: `prometheus-${target.refId}`, + state: LoadingState.Done, + request: options, + series: null, + delta: series, + unsubscribe: () => undefined, + }); + } + }, + }); + } + }; + + prepareTargets = (options: DataQueryRequest, start: number, end: number) => { + const queries: PromQueryRequest[] = []; + const activeTargets: PromQuery[] = []; for (const target of options.targets) { if (!target.expr || target.hide) { continue; } + if (target.context === 'explore') { + target.format = 'time_series'; + target.instant = false; + const instantTarget: any = _.cloneDeep(target); + instantTarget.format = 'table'; + instantTarget.instant = true; + instantTarget.valueWithRefId = true; + delete instantTarget.maxDataPoints; + instantTarget.requestId += '_instant'; + instantTarget.refId += '_instant'; + activeTargets.push(instantTarget); + queries.push(this.createQuery(instantTarget, options, start, end)); + } + activeTargets.push(target); queries.push(this.createQuery(target, options, start, end)); } + return { + queries, + activeTargets, + }; + }; + + query(options: DataQueryRequest, observer?: DataStreamObserver): Promise<{ data: any }> { + const start = this.getPrometheusTime(options.range.from, false); + const end = this.getPrometheusTime(options.range.to, true); + + options = _.clone(options); + const { queries, activeTargets } = this.prepareTargets(options, start, end); + // No valid targets, return the empty result to save a round trip. if (_.isEmpty(queries)) { return this.$q.when({ data: [] }) as Promise<{ data: any }>; } + if (observer && options.targets.filter(target => target.context === 'explore').length === options.targets.length) { + // using observer to make the instant query return immediately + this.runObserverQueries(options, observer, queries, activeTargets, end); + return this.$q.when({ data: [] }) as Promise<{ data: any }>; + } + const allQueryPromise = _.map(queries, query => { - if (!query.instant) { - return this.performTimeSeriesQuery(query, query.start, query.end); - } else { + if (query.instant) { return this.performInstantQuery(query, end); + } else { + return this.performTimeSeriesQuery(query, query.start, query.end); } }); @@ -180,19 +282,10 @@ export class PrometheusDatasource extends DataSourceApi return; } - // Keeping original start/end for transformers - const transformerOptions = { - format: activeTargets[index].format, - step: queries[index].step, - legendFormat: activeTargets[index].legendFormat, - start: queries[index].start, - end: queries[index].end, - query: queries[index].expr, - responseListLength: responseList.length, - refId: activeTargets[index].refId, - valueWithRefId: activeTargets[index].valueWithRefId, - }; - const series = this.resultTransformer.transform(response, transformerOptions); + const target = activeTargets[index]; + const query = queries[index]; + const series = this.processResult(response, query, target, queries.length); + result = [...result, ...series]; }); @@ -202,10 +295,16 @@ export class PrometheusDatasource extends DataSourceApi return allPromise as Promise<{ data: any }>; } - createQuery(target, options, start, end) { - const query: any = { + createQuery(target: PromQuery, options: DataQueryRequest, start: number, end: number) { + const query: PromQueryRequest = { hinting: target.hinting, instant: target.instant, + step: 0, + expr: '', + requestId: '', + refId: '', + start: 0, + end: 0, }; const range = Math.ceil(end - start); @@ -398,7 +497,7 @@ export class PrometheusDatasource extends DataSourceApi }; // Unsetting min interval for accurate event resolution const minStep = '1s'; - const query = this.createQuery({ expr, interval: minStep }, queryOptions, start, end); + const query = this.createQuery({ expr, interval: minStep, refId: 'X' }, queryOptions, start, end); const self = this; return this.performTimeSeriesQuery(query, query.start, query.end).then(results => { diff --git a/public/app/plugins/datasource/prometheus/types.ts b/public/app/plugins/datasource/prometheus/types.ts index e83029df835..a256f289cfe 100644 --- a/public/app/plugins/datasource/prometheus/types.ts +++ b/public/app/plugins/datasource/prometheus/types.ts @@ -2,6 +2,14 @@ import { DataQuery, DataSourceJsonData } from '@grafana/ui/src/types'; export interface PromQuery extends DataQuery { expr: string; + context?: 'explore' | 'panel'; + format?: string; + instant?: boolean; + hinting?: boolean; + interval?: string; + intervalFactor?: number; + legendFormat?: string; + valueWithRefId?: boolean; } export interface PromOptions extends DataSourceJsonData { @@ -10,3 +18,10 @@ export interface PromOptions extends DataSourceJsonData { httpMethod: string; directUrl: string; } + +export interface PromQueryRequest extends PromQuery { + step?: number; + requestId?: string; + start: number; + end: number; +} diff --git a/public/app/store/configureStore.ts b/public/app/store/configureStore.ts index 2d7d3288d3b..63d8eaaf718 100644 --- a/public/app/store/configureStore.ts +++ b/public/app/store/configureStore.ts @@ -15,8 +15,22 @@ import usersReducers from 'app/features/users/state/reducers'; import userReducers from 'app/features/profile/state/reducers'; import organizationReducers from 'app/features/org/state/reducers'; import { setStore } from './store'; -import { startSubscriptionsEpic, startSubscriptionEpic, limitMessageRateEpic } from 'app/features/explore/state/epics'; -import { WebSocketSubject, webSocket } from 'rxjs/webSocket'; +import { limitMessageRateEpic } from 'app/features/explore/state/epics/limitMessageRateEpic'; +import { stateSaveEpic } from 'app/features/explore/state/epics/stateSaveEpic'; +import { processQueryResultsEpic } from 'app/features/explore/state/epics/processQueryResultsEpic'; +import { processQueryErrorsEpic } from 'app/features/explore/state/epics/processQueryErrorsEpic'; +import { runQueriesEpic } from 'app/features/explore/state/epics/runQueriesEpic'; +import { runQueriesBatchEpic } from 'app/features/explore/state/epics/runQueriesBatchEpic'; +import { + DataSourceApi, + DataQueryResponse, + DataQuery, + DataSourceJsonData, + DataQueryRequest, + DataStreamObserver, +} from '@grafana/ui'; +import { Observable } from 'rxjs'; +import { getQueryResponse } from 'app/core/utils/explore'; import { StoreState } from 'app/types/store'; import { toggleLogActionsMiddleware } from 'app/core/middlewares/application'; @@ -39,14 +53,25 @@ export function addRootReducer(reducers) { Object.assign(rootReducers, ...reducers); } -export const rootEpic: any = combineEpics(startSubscriptionsEpic, startSubscriptionEpic, limitMessageRateEpic); +export const rootEpic: any = combineEpics( + limitMessageRateEpic, + stateSaveEpic, + runQueriesEpic, + runQueriesBatchEpic, + processQueryResultsEpic, + processQueryErrorsEpic +); export interface EpicDependencies { - getWebSocket: (urlConfigOrSource: string) => WebSocketSubject; + getQueryResponse: ( + datasourceInstance: DataSourceApi, + options: DataQueryRequest, + observer?: DataStreamObserver + ) => Observable; } const dependencies: EpicDependencies = { - getWebSocket: webSocket, + getQueryResponse, }; const epicMiddleware = createEpicMiddleware({ dependencies }); diff --git a/public/app/types/explore.ts b/public/app/types/explore.ts index 289ae02b0d7..98d137f1e7a 100644 --- a/public/app/types/explore.ts +++ b/public/app/types/explore.ts @@ -3,7 +3,6 @@ import { Value } from 'slate'; import { RawTimeRange, DataQuery, - DataQueryResponseData, DataSourceSelectItem, DataSourceApi, QueryHint, @@ -13,9 +12,10 @@ import { DataQueryError, LogsModel, LogsDedupStrategy, + LoadingState, } from '@grafana/ui'; -import { Emitter, TimeSeries } from 'app/core/core'; +import { Emitter } from 'app/core/core'; import TableModel from 'app/core/table_model'; export enum ExploreMode { @@ -215,9 +215,7 @@ export interface ExploreItemState { */ showingTable: boolean; - graphIsLoading: boolean; - logIsLoading: boolean; - tableIsLoading: boolean; + loadingState: LoadingState; /** * Table model that combines all query table results into a single table. */ @@ -254,6 +252,7 @@ export interface ExploreItemState { mode: ExploreMode; isLive: boolean; + urlReplaced: boolean; } export interface ExploreUpdateState { @@ -314,11 +313,8 @@ export interface QueryIntervals { export interface QueryOptions { interval: string; - format: string; - hinting?: boolean; - instant?: boolean; - valueWithRefId?: boolean; maxDataPoints?: number; + live?: boolean; } export interface QueryTransaction { @@ -330,23 +326,14 @@ export interface QueryTransaction { options: any; queries: DataQuery[]; result?: any; // Table model / Timeseries[] / Logs - resultType: ResultType; scanning?: boolean; } export type RangeScanner = () => RawTimeRange; -export type ResultGetter = ( - result: DataQueryResponseData, - transaction: QueryTransaction, - allTransactions: QueryTransaction[] -) => TimeSeries; - export interface TextMatch { text: string; start: number; length: number; end: number; } - -export type ResultType = 'Graph' | 'Logs' | 'Table'; diff --git a/public/test/core/redux/epicTester.ts b/public/test/core/redux/epicTester.ts index 5c2a4246943..88638f556c6 100644 --- a/public/test/core/redux/epicTester.ts +++ b/public/test/core/redux/epicTester.ts @@ -1,6 +1,14 @@ import { Epic, ActionsObservable, StateObservable } from 'redux-observable'; import { Subject } from 'rxjs'; -import { WebSocketSubject } from 'rxjs/webSocket'; +import { + DataSourceApi, + DataQuery, + DataSourceJsonData, + DataQueryRequest, + DataStreamObserver, + DataQueryResponse, + DataStreamState, +} from '@grafana/ui'; import { ActionOf } from 'app/core/redux/actionCreatorFactory'; import { StoreState } from 'app/types/store'; @@ -8,21 +16,30 @@ import { EpicDependencies } from 'app/store/configureStore'; export const epicTester = ( epic: Epic, ActionOf, StoreState, EpicDependencies>, - state?: StoreState + state?: Partial ) => { const resultingActions: Array> = []; const action$ = new Subject>(); const state$ = new Subject(); const actionObservable$ = new ActionsObservable(action$); - const stateObservable$ = new StateObservable(state$, state || ({} as StoreState)); - const websockets$: Array> = []; - const dependencies: EpicDependencies = { - getWebSocket: () => { - const webSocket$ = new Subject(); - websockets$.push(webSocket$); - return webSocket$ as WebSocketSubject; - }, + const stateObservable$ = new StateObservable(state$, (state as StoreState) || ({} as StoreState)); + const queryResponse$ = new Subject(); + const observer$ = new Subject(); + const getQueryResponse = ( + datasourceInstance: DataSourceApi, + options: DataQueryRequest, + observer?: DataStreamObserver + ) => { + if (observer) { + observer$.subscribe({ next: event => observer(event) }); + } + return queryResponse$; }; + + const dependencies: EpicDependencies = { + getQueryResponse, + }; + epic(actionObservable$, stateObservable$, dependencies).subscribe({ next: action => resultingActions.push(action) }); const whenActionIsDispatched = (action: ActionOf) => { @@ -31,14 +48,26 @@ export const epicTester = ( return instance; }; - const whenWebSocketReceivesData = (data: any) => { - websockets$.forEach(websocket$ => websocket$.next(data)); + const whenQueryReceivesResponse = (response: DataQueryResponse) => { + queryResponse$.next(response); + + return instance; + }; + + const whenQueryThrowsError = (error: any) => { + queryResponse$.error(error); + + return instance; + }; + + const whenQueryObserverReceivesEvent = (event: DataStreamState) => { + observer$.next(event); return instance; }; const thenResultingActionsEqual = (...actions: Array>) => { - expect(resultingActions).toEqual(actions); + expect(actions).toEqual(resultingActions); return instance; }; @@ -51,7 +80,9 @@ export const epicTester = ( const instance = { whenActionIsDispatched, - whenWebSocketReceivesData, + whenQueryReceivesResponse, + whenQueryThrowsError, + whenQueryObserverReceivesEvent, thenResultingActionsEqual, thenNoActionsWhereDispatched, }; diff --git a/public/test/mocks/mockExploreState.ts b/public/test/mocks/mockExploreState.ts new file mode 100644 index 00000000000..981f1fb2dbe --- /dev/null +++ b/public/test/mocks/mockExploreState.ts @@ -0,0 +1,86 @@ +import { DataSourceApi } from '@grafana/ui/src/types/datasource'; + +import { ExploreId, ExploreItemState, ExploreState } from 'app/types/explore'; +import { makeExploreItemState } from 'app/features/explore/state/reducers'; +import { StoreState } from 'app/types'; + +export const mockExploreState = (options: any = {}) => { + const isLive = options.isLive || false; + const history = []; + const eventBridge = { + emit: jest.fn(), + }; + const streaming = options.streaming || undefined; + const datasourceInterval = options.datasourceInterval || ''; + const refreshInterval = options.refreshInterval || ''; + const containerWidth = options.containerWidth || 1980; + const queries = options.queries || []; + const datasourceError = options.datasourceError || null; + const scanner = options.scanner || jest.fn(); + const scanning = options.scanning || false; + const datasourceId = options.datasourceId || '1337'; + const exploreId = ExploreId.left; + const datasourceInstance: DataSourceApi = options.datasourceInstance || { + id: 1337, + query: jest.fn(), + name: 'test', + testDatasource: jest.fn(), + meta: { + id: datasourceId, + streaming, + }, + interval: datasourceInterval, + }; + const urlReplaced = options.urlReplaced || false; + const left: ExploreItemState = options.left || { + ...makeExploreItemState(), + containerWidth, + datasourceError, + datasourceInstance, + eventBridge, + history, + isLive, + queries, + refreshInterval, + scanner, + scanning, + urlReplaced, + }; + const right: ExploreItemState = options.right || { + ...makeExploreItemState(), + containerWidth, + datasourceError, + datasourceInstance, + eventBridge, + history, + isLive, + queries, + refreshInterval, + scanner, + scanning, + urlReplaced, + }; + const split: boolean = options.split || false; + const explore: ExploreState = { + left, + right, + split, + }; + const state: Partial = { + explore, + }; + + return { + containerWidth, + datasourceId, + datasourceInstance, + datasourceInterval, + eventBridge, + exploreId, + history, + queries, + refreshInterval, + state, + scanner, + }; +};