2020-11-18 08:10:32 -06:00
import { merge , Observable , of , Subject , throwError , Unsubscribable } from 'rxjs' ;
2022-01-14 04:19:42 -06:00
import { catchError , filter , finalize , mergeMap , take , takeUntil } from 'rxjs/operators' ;
2022-04-22 08:33:13 -05:00
import { v4 as uuidv4 } from 'uuid' ;
2020-11-18 08:10:32 -06:00
import {
CoreApp ,
DataQuery ,
DataQueryRequest ,
DataSourceApi ,
2021-01-07 04:26:56 -06:00
getDefaultTimeRange ,
2020-11-18 08:10:32 -06:00
LoadingState ,
2022-01-14 04:19:42 -06:00
PanelData ,
2020-11-18 08:10:32 -06:00
ScopedVars ,
} from '@grafana/data' ;
import { dispatch , getState } from '../../../store/store' ;
2022-04-22 08:33:13 -05:00
import { StoreState , ThunkDispatch } from '../../../types' ;
2020-11-18 08:10:32 -06:00
import { getTimeSrv } from '../../dashboard/services/TimeSrv' ;
2020-11-26 11:12:02 -06:00
import { runRequest } from '../../query/state/runRequest' ;
2022-04-22 08:33:13 -05:00
import { getLastKey , getVariable } from '../state/selectors' ;
import { KeyedVariableIdentifier } from '../state/types' ;
import { QueryVariableModel , VariableRefresh } from '../types' ;
import { getTemplatedRegex } from '../utils' ;
2021-04-26 22:57:25 -05:00
import { toMetricFindValues , updateOptionsState , validateVariableSelection } from './operators' ;
2022-04-22 08:33:13 -05:00
import { QueryRunners } from './queryRunners' ;
2020-11-18 08:10:32 -06:00
/**
 * Arguments describing one request to refresh the options of a query variable.
 */
interface UpdateOptionsArgs {
  /** Identifier of the variable whose options should be refreshed. */
  identifier: KeyedVariableIdentifier;
  /** Data source the query is run against; also used to select the query runner. */
  datasource: DataSourceApi;
  /** Optional search text; when truthy it is exposed to the query as the `searchFilter` scoped variable. */
  searchFilter?: string;
}
/**
 * Progress notification published by the variable query runner for a single variable.
 */
export interface UpdateOptionsResults {
  /** Current state of the request (the runner publishes Loading, Error and Done). */
  state: LoadingState;
  /** Identifier of the variable this notification belongs to. */
  identifier: KeyedVariableIdentifier;
  /** The error that was caught; only set on notifications with an Error state. */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- kept as `any` so existing consumers keep compiling
  error?: any;
  /** Set to true when the request was stopped by a cancel or by a newer request for the same variable id. */
  cancelled?: boolean;
}
/**
 * Collaborators used by the variable query runner. They are passed in as one object so that
 * each of them can be replaced (the runner's constructor supplies the real implementations by default).
 */
interface VariableQueryRunnerArgs {
  /** Store dispatch handed to the operators that update variable state. */
  dispatch: ThunkDispatch;
  /** Returns the current store state. */
  getState: () => StoreState;
  /** Selector used to look up the variable model for an identifier. */
  getVariable: typeof getVariable;
  /** Helper forwarded to the options-update operator. */
  getTemplatedRegex: typeof getTemplatedRegex;
  /** Accessor for the time service. */
  getTimeSrv: typeof getTimeSrv;
  /** Registry that resolves the query runner for a given data source. */
  queryRunners: QueryRunners;
  /** Request function handed to the selected query runner. */
  runRequest: typeof runRequest;
}
/**
 * Runs the queries that populate query variables and publishes the progress of each
 * request (Loading, Error, Done) on a shared result stream.
 *
 * Requests are queued with {@link VariableQueryRunner.queueRequest}; callers observe the outcome
 * through {@link VariableQueryRunner.getResponse}. A running request is stopped when it is
 * cancelled or when a newer request arrives for a variable with the same `identifier.id`.
 */
export class VariableQueryRunner {
  private readonly updateOptionsRequests: Subject<UpdateOptionsArgs>;
  private readonly updateOptionsResults: Subject<UpdateOptionsResults>;
  private readonly cancelRequests: Subject<{ identifier: KeyedVariableIdentifier }>;
  private readonly subscription: Unsubscribable;

  /**
   * @param dependencies Collaborators used while running a request. Defaults to the real
   * store, selectors, time service, runner registry and request function.
   */
  constructor(
    private dependencies: VariableQueryRunnerArgs = {
      dispatch,
      getState,
      getVariable,
      getTemplatedRegex,
      getTimeSrv,
      queryRunners: new QueryRunners(),
      runRequest,
    }
  ) {
    this.updateOptionsRequests = new Subject<UpdateOptionsArgs>();
    this.updateOptionsResults = new Subject<UpdateOptionsResults>();
    this.cancelRequests = new Subject<{ identifier: KeyedVariableIdentifier }>();
    this.onNewRequest = this.onNewRequest.bind(this);
    this.subscription = this.updateOptionsRequests.subscribe(this.onNewRequest);
  }

  /**
   * Queues a request to refresh the options of a variable.
   *
   * @param args The variable identifier, data source and optional search filter.
   */
  queueRequest(args: UpdateOptionsArgs): void {
    this.updateOptionsRequests.next(args);
  }

  /**
   * Returns the stream of progress notifications for one variable.
   *
   * Notifications are matched with `===`, i.e. by object identity: pass the same identifier
   * object that is used when queueing the request.
   *
   * @param identifier Identifier object of the variable to observe.
   * @returns Observable of the notifications whose identifier is that same object.
   */
  getResponse(identifier: KeyedVariableIdentifier): Observable<UpdateOptionsResults> {
    return this.updateOptionsResults.asObservable().pipe(filter((result) => result.identifier === identifier));
  }

  /**
   * Asks running requests for the given variable to stop.
   *
   * @param identifier Identifier of the variable; running requests are matched on `identifier.id`.
   */
  cancelRequest(identifier: KeyedVariableIdentifier): void {
    this.cancelRequests.next({ identifier });
  }

  /**
   * Stops listening for newly queued requests.
   */
  destroy(): void {
    this.subscription.unsubscribe();
  }

  /**
   * Handles one queued request: publishes Loading, runs the query through the runner that
   * matches the data source, updates the variable state via the operators, and publishes
   * Error and/or Done when the pipeline ends.
   */
  private onNewRequest(args: UpdateOptionsArgs): void {
    const { datasource, identifier, searchFilter } = args;
    try {
      const {
        dispatch,
        runRequest,
        getTemplatedRegex: getTemplatedRegexFunc,
        getVariable,
        queryRunners,
        getTimeSrv,
        getState,
      } = this.dependencies;

      // Remember the key that was current when this request started (compared again below).
      const beforeKey = getLastKey(getState());

      this.updateOptionsResults.next({ identifier, state: LoadingState.Loading });

      const variable = getVariable<QueryVariableModel>(identifier, getState());
      const timeSrv = getTimeSrv();
      const runnerArgs = { variable, datasource, searchFilter, timeSrv, runRequest };
      const runner = queryRunners.getRunnerForDatasource(datasource);
      const target = runner.getTarget({ datasource, variable });
      const request = this.getRequest(variable, args, target);

      runner
        .runRequest(runnerArgs, request)
        .pipe(
          filter(() => {
            // Check whether another batch was started while the observable was executing.
            // If so, the remaining emissions of this request are dropped.
            const afterKey = getLastKey(getState());

            return beforeKey === afterKey;
          }),
          // Only Done or Error responses are of interest for now.
          filter((data) => data.state === LoadingState.Done || data.state === LoadingState.Error),
          // take(1) rather than first(): first() threw an uncaught error in situations where
          // no result had been received yet.
          take(1),
          mergeMap((data: PanelData) => {
            if (data.state === LoadingState.Error) {
              return throwError(() => data.error);
            }

            return of(data);
          }),
          toMetricFindValues(),
          updateOptionsState({ variable, dispatch, getTemplatedRegexFunc }),
          validateVariableSelection({ variable, dispatch, searchFilter }),
          takeUntil(
            // Stop on an explicit cancel or on a newer request for the same variable id.
            merge(this.updateOptionsRequests, this.cancelRequests).pipe(
              filter((incoming) => {
                let cancelRequest = false;

                if (incoming.identifier.id === identifier.id) {
                  cancelRequest = true;
                  this.updateOptionsResults.next({ identifier, state: LoadingState.Loading, cancelled: cancelRequest });
                }

                return cancelRequest;
              })
            )
          ),
          catchError((error) => {
            // `error` can be undefined here (an Error response without an error object), so guard
            // the property access; otherwise this handler itself throws and the Error notification
            // below is never published.
            if (error?.cancelled) {
              return of({});
            }

            this.updateOptionsResults.next({ identifier, state: LoadingState.Error, error });
            return throwError(() => error);
          }),
          finalize(() => {
            // Published whenever the pipeline terminates, including after an error or a cancel.
            this.updateOptionsResults.next({ identifier, state: LoadingState.Done });
          })
        )
        .subscribe();
    } catch (error) {
      // Synchronous failures while preparing the request are reported on the result stream.
      this.updateOptionsResults.next({ identifier, state: LoadingState.Error, error });
    }
  }

  /**
   * Builds the data query request for a variable query.
   *
   * @param variable The variable being refreshed; its current text/value become the `variable` scoped var.
   * @param args The queued request; only `searchFilter` is read.
   * @param target The query target produced by the selected runner.
   * @returns A request with a fresh `requestId` and a single target.
   */
  private getRequest(variable: QueryVariableModel, args: UpdateOptionsArgs, target: DataQuery): DataQueryRequest {
    const { searchFilter } = args;
    const variableAsVars = { variable: { text: variable.current.text, value: variable.current.value } };
    const searchFilterScope = { searchFilter: { text: searchFilter, value: searchFilter } };
    // The search filter is only exposed as a scoped var when it is truthy.
    const searchFilterAsVars = searchFilter ? searchFilterScope : {};
    const scopedVars = { ...searchFilterAsVars, ...variableAsVars } as ScopedVars;
    // Use the time service's range only for variables that refresh on time range change;
    // every other variable gets the default range.
    const range =
      variable.refresh === VariableRefresh.onTimeRangeChanged
        ? this.dependencies.getTimeSrv().timeRange()
        : getDefaultTimeRange();

    const request: DataQueryRequest = {
      app: CoreApp.Dashboard,
      requestId: uuidv4(),
      timezone: '',
      range,
      interval: '',
      intervalMs: 0,
      targets: [target],
      scopedVars,
      startTime: Date.now(),
    };

    return request;
  }
}
// Module-level holder for the shared runner instance; unset until setVariableQueryRunner is called.
let singleton: VariableQueryRunner;

/**
 * Registers the runner instance that getVariableQueryRunner hands out.
 *
 * @param runner The instance to share; replaces any previously registered one.
 */
export function setVariableQueryRunner(runner: VariableQueryRunner): void {
  singleton = runner;
}

/**
 * Returns the runner instance most recently registered with setVariableQueryRunner.
 *
 * @returns The shared runner (undefined at runtime if none has been registered yet).
 */
export function getVariableQueryRunner(): VariableQueryRunner {
  return singleton;
}