2022-04-22 08:33:13 -05:00
import { AnyAction , createAction , PayloadAction } from '@reduxjs/toolkit' ;
import deepEqual from 'fast-deep-equal' ;
2022-08-31 09:24:20 -05:00
import { flatten , groupBy } from 'lodash' ;
2022-10-11 07:40:55 -05:00
import { identity , Observable , of , SubscriptionLike , Unsubscribable , combineLatest } from 'rxjs' ;
2022-04-22 08:33:13 -05:00
import { mergeMap , throttleTime } from 'rxjs/operators' ;
2020-11-09 07:48:24 -06:00
import {
2021-10-18 05:22:41 -05:00
AbsoluteTimeRange ,
2020-11-09 07:48:24 -06:00
DataQuery ,
DataQueryErrorType ,
2021-09-30 08:46:11 -05:00
DataQueryResponse ,
2020-11-09 07:48:24 -06:00
DataSourceApi ,
2021-09-30 08:46:11 -05:00
hasLogsVolumeSupport ,
2021-12-14 07:36:47 -06:00
hasQueryExportSupport ,
hasQueryImportSupport ,
2021-12-09 07:00:50 -06:00
HistoryItem ,
2020-11-09 07:48:24 -06:00
LoadingState ,
PanelEvents ,
QueryFixAction ,
toLegacyResponseData ,
} from '@grafana/data' ;
2022-08-31 09:24:20 -05:00
import { config , getDataSourceSrv , reportInteraction } from '@grafana/runtime' ;
2020-11-09 07:48:24 -06:00
import {
buildQueryTransaction ,
ensureQueries ,
generateEmptyQuery ,
generateNewKeyAndAddRefIdIfMissing ,
getQueryKeys ,
hasNonEmptyQuery ,
stopQueryState ,
updateHistory ,
} from 'app/core/utils/explore' ;
2022-04-22 08:33:13 -05:00
import { getShiftedTimeRange } from 'app/core/utils/timePicker' ;
2022-10-11 07:40:55 -05:00
import { CorrelationData } from 'app/features/correlations/useCorrelations' ;
2022-04-22 08:33:13 -05:00
import { getTimeZone } from 'app/features/profile/state/selectors' ;
2022-08-31 09:24:20 -05:00
import { MIXED_DATASOURCE_NAME } from 'app/plugins/datasource/mixed/MixedDataSource' ;
2022-10-11 07:40:55 -05:00
import { store } from 'app/store/store' ;
2021-12-09 07:00:50 -06:00
import { ExploreItemState , ExplorePanelData , ThunkDispatch , ThunkResult } from 'app/types' ;
import { ExploreId , ExploreState , QueryOptions } from 'app/types/explore' ;
2022-04-22 08:33:13 -05:00
2020-11-09 07:48:24 -06:00
import { notifyApp } from '../../../core/actions' ;
2022-04-22 08:33:13 -05:00
import { createErrorNotification } from '../../../core/copy/appNotification' ;
2021-05-24 06:56:48 -05:00
import { runRequest } from '../../query/state/runRequest' ;
import { decorateData } from '../utils/decorators' ;
2022-04-22 08:33:13 -05:00
import { addHistoryItem , historyUpdatedAction , loadRichHistory } from './history' ;
2022-04-06 06:49:25 -05:00
import { stateSave } from './main' ;
2020-11-09 07:48:24 -06:00
import { updateTime } from './time' ;
2022-09-26 07:28:12 -05:00
import { createCacheKey , getResultsFromCache , storeLogsVolumeEnabled } from './utils' ;
2020-11-09 07:48:24 -06:00
//
// Actions and Payloads
//
/ * *
* Adds a query row after the row with the given index .
* /
export interface AddQueryRowPayload {
exploreId : ExploreId ;
index : number ;
query : DataQuery ;
}
export const addQueryRowAction = createAction < AddQueryRowPayload > ( 'explore/addQueryRow' ) ;
/ * *
* Query change handler for the query row with the given index .
* If ` override ` is reset the query modifications and run the queries . Use this to set queries via a link .
* /
2021-09-15 10:26:23 -05:00
export interface ChangeQueriesPayload {
2020-11-09 07:48:24 -06:00
exploreId : ExploreId ;
2021-09-15 10:26:23 -05:00
queries : DataQuery [ ] ;
2020-11-09 07:48:24 -06:00
}
2021-09-15 10:26:23 -05:00
export const changeQueriesAction = createAction < ChangeQueriesPayload > ( 'explore/changeQueries' ) ;
2020-11-09 07:48:24 -06:00
/ * *
2021-11-12 04:09:25 -06:00
* Cancel running queries .
2020-11-09 07:48:24 -06:00
* /
2021-11-12 04:09:25 -06:00
export interface CancelQueriesPayload {
2020-11-09 07:48:24 -06:00
exploreId : ExploreId ;
}
2021-11-12 04:09:25 -06:00
export const cancelQueriesAction = createAction < CancelQueriesPayload > ( 'explore/cancelQueries' ) ;
2020-11-09 07:48:24 -06:00
export interface QueriesImportedPayload {
exploreId : ExploreId ;
queries : DataQuery [ ] ;
}
export const queriesImportedAction = createAction < QueriesImportedPayload > ( 'explore/queriesImported' ) ;
/ * *
* Action to modify a query given a datasource - specific modifier action .
* @param exploreId Explore area
* @param modification Action object with a type , e . g . , ADD_FILTER
* @param index Optional query row index . If omitted , the modification is applied to all query rows .
* @param modifier Function that executes the modification , typically ` datasourceInstance.modifyQueries ` .
* /
export interface ModifyQueriesPayload {
exploreId : ExploreId ;
modification : QueryFixAction ;
modifier : ( query : DataQuery , modification : QueryFixAction ) = > DataQuery ;
}
export const modifyQueriesAction = createAction < ModifyQueriesPayload > ( 'explore/modifyQueries' ) ;
export interface QueryStoreSubscriptionPayload {
exploreId : ExploreId ;
querySubscription : Unsubscribable ;
}
2021-09-30 08:46:11 -05:00
2020-11-09 07:48:24 -06:00
export const queryStoreSubscriptionAction = createAction < QueryStoreSubscriptionPayload > (
'explore/queryStoreSubscription'
) ;
2022-09-26 07:28:12 -05:00
const setLogsVolumeEnabledAction = createAction < { exploreId : ExploreId ; enabled : boolean } > (
'explore/setLogsVolumeEnabledAction'
) ;
2021-09-30 08:46:11 -05:00
export interface StoreLogsVolumeDataProvider {
exploreId : ExploreId ;
logsVolumeDataProvider? : Observable < DataQueryResponse > ;
}
/ * *
* Stores available logs volume provider after running the query . Used internally by runQueries ( ) .
* /
2021-11-24 08:34:19 -06:00
export const storeLogsVolumeDataProviderAction = createAction < StoreLogsVolumeDataProvider > (
2021-09-30 08:46:11 -05:00
'explore/storeLogsVolumeDataProviderAction'
) ;
2021-11-24 08:34:19 -06:00
export const cleanLogsVolumeAction = createAction < { exploreId : ExploreId } > ( 'explore/cleanLogsVolumeAction' ) ;
2021-10-18 05:22:41 -05:00
2021-09-30 08:46:11 -05:00
export interface StoreLogsVolumeDataSubscriptionPayload {
exploreId : ExploreId ;
logsVolumeDataSubscription? : SubscriptionLike ;
}
/ * *
* Stores current logs volume subscription for given explore pane .
* /
const storeLogsVolumeDataSubscriptionAction = createAction < StoreLogsVolumeDataSubscriptionPayload > (
'explore/storeLogsVolumeDataSubscriptionAction'
) ;
/ * *
* Stores data returned by the provider . Used internally by loadLogsVolumeData ( ) .
* /
const updateLogsVolumeDataAction = createAction < {
exploreId : ExploreId ;
logsVolumeData : DataQueryResponse ;
} > ( 'explore/updateLogsVolumeDataAction' ) ;
2020-11-09 07:48:24 -06:00
export interface QueryEndedPayload {
exploreId : ExploreId ;
response : ExplorePanelData ;
}
export const queryStreamUpdatedAction = createAction < QueryEndedPayload > ( 'explore/queryStreamUpdated' ) ;
/ * *
* Reset queries to the given queries . Any modifications will be discarded .
* Use this action for clicks on query examples . Triggers a query run .
* /
export interface SetQueriesPayload {
exploreId : ExploreId ;
queries : DataQuery [ ] ;
}
export const setQueriesAction = createAction < SetQueriesPayload > ( 'explore/setQueries' ) ;
export interface ChangeLoadingStatePayload {
exploreId : ExploreId ;
loadingState : LoadingState ;
}
export const changeLoadingStateAction = createAction < ChangeLoadingStatePayload > ( 'changeLoadingState' ) ;
export interface SetPausedStatePayload {
exploreId : ExploreId ;
isPaused : boolean ;
}
export const setPausedStateAction = createAction < SetPausedStatePayload > ( 'explore/setPausedState' ) ;
/ * *
* Start a scan for more results using the given scanner .
* @param exploreId Explore area
* @param scanner Function that a ) returns a new time range and b ) triggers a query run for the new range
* /
export interface ScanStartPayload {
exploreId : ExploreId ;
}
export const scanStartAction = createAction < ScanStartPayload > ( 'explore/scanStart' ) ;
/ * *
* Stop any scanning for more results .
* /
export interface ScanStopPayload {
exploreId : ExploreId ;
}
export const scanStopAction = createAction < ScanStopPayload > ( 'explore/scanStop' ) ;
2021-05-24 06:56:48 -05:00
/ * *
* Adds query results to cache .
* This is currently used to cache last 5 query results for log queries run from logs navigation ( pagination ) .
* /
export interface AddResultsToCachePayload {
exploreId : ExploreId ;
cacheKey : string ;
2022-03-03 02:54:06 -06:00
queryResponse : ExplorePanelData ;
2021-05-24 06:56:48 -05:00
}
export const addResultsToCacheAction = createAction < AddResultsToCachePayload > ( 'explore/addResultsToCache' ) ;
/ * *
* Clears cache .
* /
export interface ClearCachePayload {
exploreId : ExploreId ;
}
export const clearCacheAction = createAction < ClearCachePayload > ( 'explore/clearCache' ) ;
2021-09-30 08:46:11 -05:00
2020-11-09 07:48:24 -06:00
//
// Action creators
//
/ * *
* Adds a query row after the row with the given index .
* /
2022-08-31 09:24:20 -05:00
export function addQueryRow ( exploreId : ExploreId , index : number ) : ThunkResult < void > {
return async ( dispatch , getState ) = > {
2021-02-12 14:33:26 -06:00
const queries = getState ( ) . explore [ exploreId ] ! . queries ;
2022-08-31 09:24:20 -05:00
const query = await generateEmptyQuery ( queries , index ) ;
2020-11-09 07:48:24 -06:00
dispatch ( addQueryRowAction ( { exploreId , index , query } ) ) ;
} ;
}
/ * *
* Cancel running queries
* /
export function cancelQueries ( exploreId : ExploreId ) : ThunkResult < void > {
2021-11-24 08:34:19 -06:00
return ( dispatch , getState ) = > {
2020-11-09 07:48:24 -06:00
dispatch ( scanStopAction ( { exploreId } ) ) ;
dispatch ( cancelQueriesAction ( { exploreId } ) ) ;
2021-11-24 08:34:19 -06:00
dispatch (
storeLogsVolumeDataProviderAction ( {
exploreId ,
logsVolumeDataProvider : undefined ,
} )
) ;
// clear any incomplete data
if ( getState ( ) . explore [ exploreId ] ! . logsVolumeData ? . state !== LoadingState . Done ) {
dispatch ( cleanLogsVolumeAction ( { exploreId } ) ) ;
}
2020-11-09 07:48:24 -06:00
dispatch ( stateSave ( ) ) ;
} ;
}
2022-08-31 09:24:20 -05:00
const addDatasourceToQueries = ( datasource : DataSourceApi , queries : DataQuery [ ] ) = > {
const dataSourceRef = datasource . getRef ( ) ;
return queries . map ( ( query : DataQuery ) = > {
return { . . . query , datasource : dataSourceRef } ;
} ) ;
} ;
const getImportableQueries = async (
targetDataSource : DataSourceApi ,
sourceDataSource : DataSourceApi ,
queries : DataQuery [ ]
) : Promise < DataQuery [ ] > = > {
let queriesOut : DataQuery [ ] = [ ] ;
if ( sourceDataSource . meta ? . id === targetDataSource . meta ? . id ) {
queriesOut = queries ;
} else if ( hasQueryExportSupport ( sourceDataSource ) && hasQueryImportSupport ( targetDataSource ) ) {
const abstractQueries = await sourceDataSource . exportToAbstractQueries ( queries ) ;
queriesOut = await targetDataSource . importFromAbstractQueries ( abstractQueries ) ;
} else if ( targetDataSource . importQueries ) {
// Datasource-specific importers
queriesOut = await targetDataSource . importQueries ( queries , sourceDataSource ) ;
}
// add new datasource to queries before returning
return addDatasourceToQueries ( targetDataSource , queriesOut ) ;
} ;
2020-11-09 07:48:24 -06:00
/ * *
* Import queries from previous datasource if possible eg Loki and Prometheus have similar query language so the
* labels part can be reused to get similar data .
* @param exploreId
* @param queries
* @param sourceDataSource
* @param targetDataSource
* /
export const importQueries = (
exploreId : ExploreId ,
queries : DataQuery [ ] ,
sourceDataSource : DataSourceApi | undefined | null ,
2022-08-31 09:24:20 -05:00
targetDataSource : DataSourceApi ,
singleQueryChangeRef? : string // when changing one query DS to another in a mixed environment, we do not want to change all queries, just the one being changed
2020-11-09 07:48:24 -06:00
) : ThunkResult < void > = > {
2021-01-20 00:59:48 -06:00
return async ( dispatch ) = > {
2020-11-09 07:48:24 -06:00
if ( ! sourceDataSource ) {
// explore not initialized
dispatch ( queriesImportedAction ( { exploreId , queries } ) ) ;
return ;
}
let importedQueries = queries ;
2022-08-31 09:24:20 -05:00
// If going to mixed, keep queries with source datasource
2022-08-31 10:22:13 -05:00
if ( targetDataSource . uid === MIXED_DATASOURCE_NAME ) {
2022-08-31 09:24:20 -05:00
importedQueries = queries . map ( ( query ) = > {
return { . . . query , datasource : sourceDataSource.getRef ( ) } ;
} ) ;
}
// If going from mixed, see what queries you keep by their individual datasources
2022-08-31 10:22:13 -05:00
else if ( sourceDataSource . uid === MIXED_DATASOURCE_NAME ) {
2022-08-31 09:24:20 -05:00
const groupedQueries = groupBy ( queries , ( query ) = > query . datasource ? . uid ) ;
const groupedImportableQueries = await Promise . all (
Object . keys ( groupedQueries ) . map ( async ( key : string ) = > {
const queryDatasource = await getDataSourceSrv ( ) . get ( { uid : key } ) ;
return await getImportableQueries ( targetDataSource , queryDatasource , groupedQueries [ key ] ) ;
} )
) ;
importedQueries = flatten ( groupedImportableQueries . filter ( ( arr ) = > arr . length > 0 ) ) ;
2020-11-09 07:48:24 -06:00
} else {
2022-08-31 09:24:20 -05:00
let queriesStartArr = queries ;
if ( singleQueryChangeRef !== undefined ) {
const changedQuery = queries . find ( ( query ) = > query . refId === singleQueryChangeRef ) ;
if ( changedQuery ) {
queriesStartArr = [ changedQuery ] ;
}
}
importedQueries = await getImportableQueries ( targetDataSource , sourceDataSource , queriesStartArr ) ;
2020-11-09 07:48:24 -06:00
}
2022-08-31 09:24:20 -05:00
// this will be the entire imported set, or the single imported query in an array
let nextQueries = await ensureQueries ( importedQueries , targetDataSource . getRef ( ) ) ;
if ( singleQueryChangeRef !== undefined ) {
2022-08-31 10:22:13 -05:00
// if the query import didn't return a result, there was no ability to import between datasources. Create an empty query for the datasource
if ( importedQueries . length === 0 ) {
const dsQuery = await generateEmptyQuery ( [ ] , undefined , targetDataSource . getRef ( ) ) ;
importedQueries = [ dsQuery ] ;
}
2022-08-31 09:24:20 -05:00
// capture the single imported query, and copy the original set
const updatedQueryIdx = queries . findIndex ( ( query ) = > query . refId === singleQueryChangeRef ) ;
2022-08-31 10:22:13 -05:00
// for single query change, all areas that generate refId do not know about other queries, so just copy the existing refID to the new query
const changedQuery = { . . . nextQueries [ 0 ] , refId : queries [ updatedQueryIdx ] . refId } ;
nextQueries = [ . . . queries ] ;
2022-08-31 09:24:20 -05:00
// replace the changed query
nextQueries [ updatedQueryIdx ] = changedQuery ;
}
2022-07-27 10:17:31 -05:00
2020-11-09 07:48:24 -06:00
dispatch ( queriesImportedAction ( { exploreId , queries : nextQueries } ) ) ;
} ;
} ;
/ * *
* Action to modify a query given a datasource - specific modifier action .
* @param exploreId Explore area
* @param modification Action object with a type , e . g . , ADD_FILTER
* @param modifier Function that executes the modification , typically ` datasourceInstance.modifyQueries ` .
* /
2022-10-13 03:04:51 -05:00
export function modifyQueries ( exploreId : ExploreId , modification : QueryFixAction , modifier : any ) : ThunkResult < void > {
2021-01-20 00:59:48 -06:00
return ( dispatch ) = > {
2022-10-13 03:04:51 -05:00
dispatch ( modifyQueriesAction ( { exploreId , modification , modifier } ) ) ;
2020-11-09 07:48:24 -06:00
if ( ! modification . preventSubmit ) {
dispatch ( runQueries ( exploreId ) ) ;
}
} ;
}
2022-02-04 05:46:27 -06:00
async function handleHistory (
2021-12-09 07:00:50 -06:00
dispatch : ThunkDispatch ,
state : ExploreState ,
history : Array < HistoryItem < DataQuery > > ,
datasource : DataSourceApi ,
queries : DataQuery [ ] ,
exploreId : ExploreId
) {
const datasourceId = datasource . meta . id ;
const nextHistory = updateHistory ( history , datasourceId , queries ) ;
dispatch ( historyUpdatedAction ( { exploreId , history : nextHistory } ) ) ;
2022-04-06 06:49:25 -05:00
dispatch ( addHistoryItem ( datasource . uid , datasource . name , queries ) ) ;
// Because filtering happens in the backend we cannot add a new entry without checking if it matches currently
// used filters. Instead, we refresh the query history list.
// TODO: run only if Query History list is opened (#47252)
2022-04-27 08:07:44 -05:00
await dispatch ( loadRichHistory ( ExploreId . left ) ) ;
await dispatch ( loadRichHistory ( ExploreId . right ) ) ;
2021-12-09 07:00:50 -06:00
}
2020-11-09 07:48:24 -06:00
/ * *
* Main action to run queries and dispatches sub - actions based on which result viewers are active
* /
2021-07-12 10:54:17 -05:00
export const runQueries = (
exploreId : ExploreId ,
options ? : { replaceUrl? : boolean ; preserveCache? : boolean }
) : ThunkResult < void > = > {
2020-11-09 07:48:24 -06:00
return ( dispatch , getState ) = > {
dispatch ( updateTime ( { exploreId } ) ) ;
2022-10-11 07:40:55 -05:00
const correlations $ = getCorrelations ( ) ;
2021-07-12 10:54:17 -05:00
// We always want to clear cache unless we explicitly pass preserveCache parameter
const preserveCache = options ? . preserveCache === true ;
if ( ! preserveCache ) {
dispatch ( clearCache ( exploreId ) ) ;
}
2021-02-12 14:33:26 -06:00
const exploreItemState = getState ( ) . explore [ exploreId ] ! ;
2020-11-09 07:48:24 -06:00
const {
datasourceInstance ,
containerWidth ,
isLive : live ,
range ,
scanning ,
queryResponse ,
querySubscription ,
refreshInterval ,
absoluteRange ,
2021-05-24 06:56:48 -05:00
cache ,
2022-09-26 07:28:12 -05:00
logsVolumeEnabled ,
2020-11-09 07:48:24 -06:00
} = exploreItemState ;
2021-05-24 06:56:48 -05:00
let newQuerySub ;
2020-11-09 07:48:24 -06:00
2021-10-25 08:25:17 -05:00
const queries = exploreItemState . queries . map ( ( query ) = > ( {
. . . query ,
2021-10-29 12:57:24 -05:00
datasource : query.datasource || datasourceInstance ? . getRef ( ) ,
2021-10-25 08:25:17 -05:00
} ) ) ;
2021-12-09 07:00:50 -06:00
if ( datasourceInstance != null ) {
handleHistory ( dispatch , getState ( ) . explore , exploreItemState . history , datasourceInstance , queries , exploreId ) ;
}
dispatch ( stateSave ( { replace : options?.replaceUrl } ) ) ;
2021-05-24 06:56:48 -05:00
const cachedValue = getResultsFromCache ( cache , absoluteRange ) ;
2020-11-09 07:48:24 -06:00
2021-05-24 06:56:48 -05:00
// If we have results saved in cache, we are going to use those results instead of running queries
if ( cachedValue ) {
2022-10-11 07:40:55 -05:00
newQuerySub = combineLatest ( [ of ( cachedValue ) , correlations $ ] )
2021-09-30 08:46:11 -05:00
. pipe (
2022-10-11 07:40:55 -05:00
mergeMap ( ( [ data , correlations ] ) = >
2022-06-13 10:20:57 -05:00
decorateData (
data ,
queryResponse ,
absoluteRange ,
refreshInterval ,
queries ,
2022-10-11 07:40:55 -05:00
correlations ,
2022-06-13 10:20:57 -05:00
datasourceInstance != null && hasLogsVolumeSupport ( datasourceInstance )
)
2021-09-30 08:46:11 -05:00
)
)
2021-05-24 06:56:48 -05:00
. subscribe ( ( data ) = > {
if ( ! data . error ) {
dispatch ( stateSave ( ) ) ;
}
2020-11-09 07:48:24 -06:00
2021-05-24 06:56:48 -05:00
dispatch ( queryStreamUpdatedAction ( { exploreId , response : data } ) ) ;
} ) ;
2020-11-09 07:48:24 -06:00
2021-06-28 09:12:46 -05:00
// If we don't have results saved in cache, run new queries
2021-05-24 06:56:48 -05:00
} else {
if ( ! hasNonEmptyQuery ( queries ) ) {
dispatch ( stateSave ( { replace : options?.replaceUrl } ) ) ; // Remember to save to state and update location
return ;
}
if ( ! datasourceInstance ) {
return ;
}
// Some datasource's query builders allow per-query interval limits,
// but we're using the datasource interval limit for now
const minInterval = datasourceInstance ? . interval ;
stopQueryState ( querySubscription ) ;
const queryOptions : QueryOptions = {
minInterval ,
// maxDataPoints is used in:
// Loki - used for logs streaming for buffer size, with undefined it falls back to datasource config if it supports that.
// Elastic - limits the number of datapoints for the counts query and for logs it has hardcoded limit.
// Influx - used to correctly display logs in graph
// TODO:unification
// maxDataPoints: mode === ExploreMode.Logs && datasourceId === 'loki' ? undefined : containerWidth,
maxDataPoints : containerWidth ,
liveStreaming : live ,
} ;
const timeZone = getTimeZone ( getState ( ) . user ) ;
2021-07-19 10:29:51 -05:00
const transaction = buildQueryTransaction ( exploreId , queries , queryOptions , range , scanning , timeZone ) ;
2021-05-24 06:56:48 -05:00
dispatch ( changeLoadingStateAction ( { exploreId , loadingState : LoadingState.Loading } ) ) ;
2022-10-11 07:40:55 -05:00
newQuerySub = combineLatest ( [
runRequest ( datasourceInstance , transaction . request )
2021-05-24 06:56:48 -05:00
// Simple throttle for live tailing, in case of > 1000 rows per interval we spend about 200ms on processing and
// rendering. In case this is optimized this can be tweaked, but also it should be only as fast as user
// actually can see what is happening.
2022-10-11 07:40:55 -05:00
. pipe ( live ? throttleTime ( 500 ) : identity ) ,
correlations $ ,
] )
. pipe (
mergeMap ( ( [ data , correlations ] ) = >
2021-09-30 08:46:11 -05:00
decorateData (
data ,
queryResponse ,
absoluteRange ,
refreshInterval ,
queries ,
2022-10-11 07:40:55 -05:00
correlations ,
2022-06-13 10:20:57 -05:00
datasourceInstance != null && hasLogsVolumeSupport ( datasourceInstance )
2021-09-30 08:46:11 -05:00
)
)
2021-05-24 06:56:48 -05:00
)
2022-01-05 12:34:09 -06:00
. subscribe ( {
next ( data ) {
2022-06-09 08:53:23 -05:00
if ( data . logsResult !== null ) {
reportInteraction ( 'grafana_explore_logs_result_displayed' , {
datasourceType : datasourceInstance.type ,
} ) ;
}
2021-05-24 06:56:48 -05:00
dispatch ( queryStreamUpdatedAction ( { exploreId , response : data } ) ) ;
2020-11-09 07:48:24 -06:00
2021-05-24 06:56:48 -05:00
// Keep scanning for results if this was the last scanning transaction
if ( getState ( ) . explore [ exploreId ] ! . scanning ) {
if ( data . state === LoadingState . Done && data . series . length === 0 ) {
const range = getShiftedTimeRange ( - 1 , getState ( ) . explore [ exploreId ] ! . range ) ;
dispatch ( updateTime ( { exploreId , absoluteRange : range } ) ) ;
dispatch ( runQueries ( exploreId ) ) ;
} else {
// We can stop scanning if we have a result
dispatch ( scanStopAction ( { exploreId } ) ) ;
}
2020-11-09 07:48:24 -06:00
}
2021-05-24 06:56:48 -05:00
} ,
2022-01-05 12:34:09 -06:00
error ( error ) {
2021-05-24 06:56:48 -05:00
dispatch ( notifyApp ( createErrorNotification ( 'Query processing error' , error ) ) ) ;
dispatch ( changeLoadingStateAction ( { exploreId , loadingState : LoadingState.Error } ) ) ;
console . error ( error ) ;
2022-01-05 12:34:09 -06:00
} ,
complete() {
2022-01-10 06:44:51 -06:00
// In case we don't get any response at all but the observable completed, make sure we stop loading state.
// This is for cases when some queries are noop like running first query after load but we don't have any
// actual query input.
if ( getState ( ) . explore [ exploreId ] ! . queryResponse . state === LoadingState . Loading ) {
dispatch ( changeLoadingStateAction ( { exploreId , loadingState : LoadingState.Done } ) ) ;
}
2022-01-05 12:34:09 -06:00
} ,
} ) ;
2021-09-30 08:46:11 -05:00
2021-10-18 05:22:41 -05:00
if ( live ) {
dispatch (
storeLogsVolumeDataProviderAction ( {
exploreId ,
logsVolumeDataProvider : undefined ,
} )
) ;
dispatch ( cleanLogsVolumeAction ( { exploreId } ) ) ;
2021-11-10 04:20:30 -06:00
} else if ( hasLogsVolumeSupport ( datasourceInstance ) ) {
2022-09-26 07:28:12 -05:00
// we always prepare the logsVolumeProvider,
// but we only load it, if the logs-volume-histogram is enabled.
// (we need to have the logsVolumeProvider always actual,
// even when the visuals are disabled, because when the user
// enables the visuals again, we need to load the histogram,
// so we need the provider)
2022-02-18 05:05:29 -06:00
const sourceRequest = {
. . . transaction . request ,
requestId : transaction.request.requestId + '_log_volume' ,
} ;
const logsVolumeDataProvider = datasourceInstance . getLogsVolumeDataProvider ( sourceRequest ) ;
2021-09-30 08:46:11 -05:00
dispatch (
storeLogsVolumeDataProviderAction ( {
exploreId ,
logsVolumeDataProvider ,
} )
) ;
2021-10-18 05:22:41 -05:00
const { logsVolumeData , absoluteRange } = getState ( ) . explore [ exploreId ] ! ;
if ( ! canReuseLogsVolumeData ( logsVolumeData , queries , absoluteRange ) ) {
dispatch ( cleanLogsVolumeAction ( { exploreId } ) ) ;
2022-09-26 07:28:12 -05:00
if ( logsVolumeEnabled ) {
dispatch ( loadLogsVolumeData ( exploreId ) ) ;
}
2021-09-30 08:46:11 -05:00
}
} else {
dispatch (
storeLogsVolumeDataProviderAction ( {
exploreId ,
logsVolumeDataProvider : undefined ,
} )
) ;
}
2021-05-24 06:56:48 -05:00
}
2020-11-09 07:48:24 -06:00
dispatch ( queryStoreSubscriptionAction ( { exploreId , querySubscription : newQuerySub } ) ) ;
} ;
} ;
2021-10-18 05:22:41 -05:00
/ * *
* Checks if after changing the time range the existing data can be used to show logs volume .
* It can happen if queries are the same and new time range is within existing data time range .
* /
function canReuseLogsVolumeData (
logsVolumeData : DataQueryResponse | undefined ,
queries : DataQuery [ ] ,
selectedTimeRange : AbsoluteTimeRange
) : boolean {
if ( logsVolumeData && logsVolumeData . data [ 0 ] ) {
// check if queries are the same
if ( ! deepEqual ( logsVolumeData . data [ 0 ] . meta ? . custom ? . targets , queries ) ) {
return false ;
}
const dataRange = logsVolumeData && logsVolumeData . data [ 0 ] && logsVolumeData . data [ 0 ] . meta ? . custom ? . absoluteRange ;
// if selected range is within loaded logs volume
if ( dataRange && dataRange . from <= selectedTimeRange . from && selectedTimeRange . to <= dataRange . to ) {
return true ;
}
}
return false ;
}
2020-11-09 07:48:24 -06:00
/ * *
* Reset queries to the given queries . Any modifications will be discarded .
* Use this action for clicks on query examples . Triggers a query run .
* /
export function setQueries ( exploreId : ExploreId , rawQueries : DataQuery [ ] ) : ThunkResult < void > {
return ( dispatch , getState ) = > {
// Inject react keys into query objects
2021-02-12 14:33:26 -06:00
const queries = getState ( ) . explore [ exploreId ] ! . queries ;
2020-11-09 07:48:24 -06:00
const nextQueries = rawQueries . map ( ( query , index ) = > generateNewKeyAndAddRefIdIfMissing ( query , queries , index ) ) ;
dispatch ( setQueriesAction ( { exploreId , queries : nextQueries } ) ) ;
dispatch ( runQueries ( exploreId ) ) ;
} ;
}
/ * *
* Start a scan for more results using the given scanner .
* @param exploreId Explore area
* @param scanner Function that a ) returns a new time range and b ) triggers a query run for the new range
* /
export function scanStart ( exploreId : ExploreId ) : ThunkResult < void > {
return ( dispatch , getState ) = > {
// Register the scanner
dispatch ( scanStartAction ( { exploreId } ) ) ;
// Scanning must trigger query run, and return the new range
2021-02-12 14:33:26 -06:00
const range = getShiftedTimeRange ( - 1 , getState ( ) . explore [ exploreId ] ! . range ) ;
2020-11-09 07:48:24 -06:00
// Set the new range to be displayed
dispatch ( updateTime ( { exploreId , absoluteRange : range } ) ) ;
dispatch ( runQueries ( exploreId ) ) ;
} ;
}
2021-05-24 06:56:48 -05:00
export function addResultsToCache ( exploreId : ExploreId ) : ThunkResult < void > {
return ( dispatch , getState ) = > {
const queryResponse = getState ( ) . explore [ exploreId ] ! . queryResponse ;
const absoluteRange = getState ( ) . explore [ exploreId ] ! . absoluteRange ;
const cacheKey = createCacheKey ( absoluteRange ) ;
// Save results to cache only when all results recived and loading is done
if ( queryResponse . state === LoadingState . Done ) {
dispatch ( addResultsToCacheAction ( { exploreId , cacheKey , queryResponse } ) ) ;
}
} ;
}
export function clearCache ( exploreId : ExploreId ) : ThunkResult < void > {
return ( dispatch , getState ) = > {
dispatch ( clearCacheAction ( { exploreId } ) ) ;
} ;
}
2021-09-30 08:46:11 -05:00
/ * *
* Initializes loading logs volume data and stores emitted value .
* /
export function loadLogsVolumeData ( exploreId : ExploreId ) : ThunkResult < void > {
return ( dispatch , getState ) = > {
const { logsVolumeDataProvider } = getState ( ) . explore [ exploreId ] ! ;
if ( logsVolumeDataProvider ) {
const logsVolumeDataSubscription = logsVolumeDataProvider . subscribe ( {
next : ( logsVolumeData : DataQueryResponse ) = > {
dispatch ( updateLogsVolumeDataAction ( { exploreId , logsVolumeData } ) ) ;
} ,
} ) ;
dispatch ( storeLogsVolumeDataSubscriptionAction ( { exploreId , logsVolumeDataSubscription } ) ) ;
}
} ;
}
2022-09-26 07:28:12 -05:00
export function setLogsVolumeEnabled ( exploreId : ExploreId , enabled : boolean ) : ThunkResult < void > {
return ( dispatch , getState ) = > {
dispatch ( setLogsVolumeEnabledAction ( { exploreId , enabled } ) ) ;
storeLogsVolumeEnabled ( enabled ) ;
if ( enabled ) {
dispatch ( loadLogsVolumeData ( exploreId ) ) ;
}
} ;
}
2020-11-09 07:48:24 -06:00
//
// Reducer
//
// Redux Toolkit uses ImmerJs as part of their solution to ensure that state objects are not mutated.
// ImmerJs has an autoFreeze option that freezes objects from change which means this reducer can't be migrated to createSlice
// because the state would become frozen and during run time we would get errors because flot (Graph lib) would try to mutate
// the frozen state.
// https://github.com/reduxjs/redux-toolkit/issues/242
export const queryReducer = ( state : ExploreItemState , action : AnyAction ) : ExploreItemState = > {
if ( addQueryRowAction . match ( action ) ) {
const { queries } = state ;
const { index , query } = action . payload ;
// Add to queries, which will cause a new row to be rendered
const nextQueries = [ . . . queries . slice ( 0 , index + 1 ) , { . . . query } , . . . queries . slice ( index + 1 ) ] ;
return {
. . . state ,
queries : nextQueries ,
2022-08-31 09:24:20 -05:00
queryKeys : getQueryKeys ( nextQueries ) ,
2020-11-09 07:48:24 -06:00
} ;
}
2021-09-15 10:26:23 -05:00
if ( changeQueriesAction . match ( action ) ) {
const { queries } = action . payload ;
2020-11-09 07:48:24 -06:00
return {
. . . state ,
2021-09-15 10:26:23 -05:00
queries ,
2020-11-09 07:48:24 -06:00
} ;
}
if ( cancelQueriesAction . match ( action ) ) {
stopQueryState ( state . querySubscription ) ;
return {
. . . state ,
loading : false ,
} ;
}
if ( modifyQueriesAction . match ( action ) ) {
const { queries } = state ;
2022-10-13 03:04:51 -05:00
const { modification , modifier } = action . payload ;
2020-11-09 07:48:24 -06:00
let nextQueries : DataQuery [ ] ;
2022-10-13 03:04:51 -05:00
nextQueries = queries . map ( ( query , i ) = > {
const nextQuery = modifier ( { . . . query } , modification ) ;
return generateNewKeyAndAddRefIdIfMissing ( nextQuery , queries , i ) ;
} ) ;
2020-11-09 07:48:24 -06:00
return {
. . . state ,
queries : nextQueries ,
2022-08-31 09:24:20 -05:00
queryKeys : getQueryKeys ( nextQueries ) ,
2020-11-09 07:48:24 -06:00
} ;
}
if ( setQueriesAction . match ( action ) ) {
const { queries } = action . payload ;
return {
. . . state ,
queries : queries.slice ( ) ,
2022-08-31 09:24:20 -05:00
queryKeys : getQueryKeys ( queries ) ,
2020-11-09 07:48:24 -06:00
} ;
}
if ( queryStoreSubscriptionAction . match ( action ) ) {
const { querySubscription } = action . payload ;
return {
. . . state ,
querySubscription ,
} ;
}
2022-09-26 07:28:12 -05:00
if ( setLogsVolumeEnabledAction . match ( action ) ) {
const { enabled } = action . payload ;
if ( ! enabled && state . logsVolumeDataSubscription ) {
state . logsVolumeDataSubscription . unsubscribe ( ) ;
}
return {
. . . state ,
logsVolumeEnabled : enabled ,
// NOTE: the dataProvider is not cleared, we may need it later,
// if the user re-enables the histogram-visualization
logsVolumeData : undefined ,
} ;
}
2021-09-30 08:46:11 -05:00
if ( storeLogsVolumeDataProviderAction . match ( action ) ) {
let { logsVolumeDataProvider } = action . payload ;
if ( state . logsVolumeDataSubscription ) {
state . logsVolumeDataSubscription . unsubscribe ( ) ;
}
return {
. . . state ,
logsVolumeDataProvider ,
logsVolumeDataSubscription : undefined ,
2021-10-18 05:22:41 -05:00
} ;
}
if ( cleanLogsVolumeAction . match ( action ) ) {
return {
. . . state ,
2021-09-30 08:46:11 -05:00
logsVolumeData : undefined ,
} ;
}
if ( storeLogsVolumeDataSubscriptionAction . match ( action ) ) {
const { logsVolumeDataSubscription } = action . payload ;
return {
. . . state ,
logsVolumeDataSubscription ,
} ;
}
if ( updateLogsVolumeDataAction . match ( action ) ) {
let { logsVolumeData } = action . payload ;
return {
. . . state ,
logsVolumeData ,
} ;
}
2020-11-09 07:48:24 -06:00
if ( queryStreamUpdatedAction . match ( action ) ) {
return processQueryResponse ( state , action ) ;
}
if ( queriesImportedAction . match ( action ) ) {
const { queries } = action . payload ;
return {
. . . state ,
queries ,
2022-08-31 09:24:20 -05:00
queryKeys : getQueryKeys ( queries ) ,
2020-11-09 07:48:24 -06:00
} ;
}
if ( changeLoadingStateAction . match ( action ) ) {
const { loadingState } = action . payload ;
return {
. . . state ,
queryResponse : {
. . . state . queryResponse ,
state : loadingState ,
} ,
loading : loadingState === LoadingState . Loading || loadingState === LoadingState . Streaming ,
} ;
}
if ( setPausedStateAction . match ( action ) ) {
const { isPaused } = action . payload ;
return {
. . . state ,
isPaused : isPaused ,
} ;
}
if ( scanStartAction . match ( action ) ) {
return { . . . state , scanning : true } ;
}
if ( scanStopAction . match ( action ) ) {
return {
. . . state ,
scanning : false ,
scanRange : undefined ,
} ;
}
2021-05-24 06:56:48 -05:00
if ( addResultsToCacheAction . match ( action ) ) {
const CACHE_LIMIT = 5 ;
const { cache } = state ;
const { queryResponse , cacheKey } = action . payload ;
let newCache = [ . . . cache ] ;
const isDuplicateKey = newCache . some ( ( c ) = > c . key === cacheKey ) ;
if ( ! isDuplicateKey ) {
const newCacheItem = { key : cacheKey , value : queryResponse } ;
newCache = [ newCacheItem , . . . newCache ] . slice ( 0 , CACHE_LIMIT ) ;
}
return {
. . . state ,
cache : newCache ,
} ;
}
if ( clearCacheAction . match ( action ) ) {
return {
. . . state ,
cache : [ ] ,
} ;
}
2020-11-09 07:48:24 -06:00
return state ;
} ;
2022-10-11 07:40:55 -05:00
/ * *
* Creates an observable that emits correlations once they are loaded
* /
const getCorrelations = ( ) = > {
return new Observable < CorrelationData [ ] > ( ( subscriber ) = > {
const existingCorrelations = store . getState ( ) . explore . correlations ;
if ( existingCorrelations ) {
subscriber . next ( existingCorrelations ) ;
subscriber . complete ( ) ;
} else {
const unsubscribe = store . subscribe ( ( ) = > {
const { correlations } = store . getState ( ) . explore ;
if ( correlations ) {
unsubscribe ( ) ;
subscriber . next ( correlations ) ;
subscriber . complete ( ) ;
}
} ) ;
}
} ) ;
} ;
2020-11-09 07:48:24 -06:00
export const processQueryResponse = (
state : ExploreItemState ,
action : PayloadAction < QueryEndedPayload >
) : ExploreItemState = > {
const { response } = action . payload ;
2021-01-19 09:34:43 -06:00
const {
request ,
state : loadingState ,
series ,
error ,
graphResult ,
logsResult ,
tableResult ,
traceFrames ,
nodeGraphFrames ,
2022-10-07 05:39:14 -05:00
flameGraphFrames ,
2021-01-19 09:34:43 -06:00
} = response ;
2020-11-09 07:48:24 -06:00
if ( error ) {
if ( error . type === DataQueryErrorType . Timeout ) {
return {
. . . state ,
queryResponse : response ,
loading : loadingState === LoadingState . Loading || loadingState === LoadingState . Streaming ,
} ;
} else if ( error . type === DataQueryErrorType . Cancelled ) {
return state ;
}
2021-05-17 10:48:04 -05:00
// Send error to Angular editors
2022-05-23 01:33:05 -05:00
// When angularSupportEnabled is removed we can remove this code and all references to eventBridge
if ( config . angularSupportEnabled && state . datasourceInstance ? . components ? . QueryCtrl ) {
2021-05-17 10:48:04 -05:00
state . eventBridge . emit ( PanelEvents . dataError , error ) ;
}
2020-11-09 07:48:24 -06:00
}
if ( ! request ) {
return { . . . state } ;
}
// Send legacy data to Angular editors
2022-05-23 01:33:05 -05:00
// When angularSupportEnabled is removed we can remove this code and all references to eventBridge
if ( config . angularSupportEnabled && state . datasourceInstance ? . components ? . QueryCtrl ) {
2021-01-20 00:59:48 -06:00
const legacy = series . map ( ( v ) = > toLegacyResponseData ( v ) ) ;
2020-11-09 07:48:24 -06:00
state . eventBridge . emit ( PanelEvents . dataReceived , legacy ) ;
}
return {
. . . state ,
queryResponse : response ,
graphResult ,
tableResult ,
logsResult ,
loading : loadingState === LoadingState . Loading || loadingState === LoadingState . Streaming ,
showLogs : ! ! logsResult ,
showMetrics : ! ! graphResult ,
showTable : ! ! tableResult ,
showTrace : ! ! traceFrames . length ,
2021-01-19 09:34:43 -06:00
showNodeGraph : ! ! nodeGraphFrames . length ,
2022-10-07 05:39:14 -05:00
showFlameGraph : ! ! flameGraphFrames . length ,
2020-11-09 07:48:24 -06:00
} ;
} ;