grafana/public/app/features/variables/query/VariableQueryRunner.ts
Hugo Häggmark 7ffefc069f
Variables: Handle variable cancellations better (#43987)
Co-authored-by: Ashley Harrison <ashley.harrison@grafana.com>
Co-authored-by: joshhunt <josh@trtr.co>
Co-authored-by: kay delaney <kay@grafana.com>
Co-authored-by: Alexandra Vargas <alexa1866@gmail.com>
2022-01-14 11:19:42 +01:00

202 lines
6.8 KiB
TypeScript

import { merge, Observable, of, Subject, throwError, Unsubscribable } from 'rxjs';
import { catchError, filter, finalize, mergeMap, take, takeUntil } from 'rxjs/operators';
import {
CoreApp,
DataQuery,
DataQueryRequest,
DataSourceApi,
getDefaultTimeRange,
LoadingState,
PanelData,
ScopedVars,
} from '@grafana/data';
import { VariableIdentifier } from '../state/types';
import { getVariable } from '../state/selectors';
import { QueryVariableModel, VariableRefresh } from '../types';
import { StoreState, ThunkDispatch } from '../../../types';
import { dispatch, getState } from '../../../store/store';
import { getTemplatedRegex } from '../utils';
import { v4 as uuidv4 } from 'uuid';
import { getTimeSrv } from '../../dashboard/services/TimeSrv';
import { QueryRunners } from './queryRunners';
import { runRequest } from '../../query/state/runRequest';
import { toMetricFindValues, updateOptionsState, validateVariableSelection } from './operators';
/**
 * Payload of a single "update the options of this query variable" request,
 * as queued through `VariableQueryRunner.queueRequest`.
 */
interface UpdateOptionsArgs {
// Identifies the variable to update; its `id` is used to look the variable up in state
// and to match later requests/cancellations against an in-flight request.
identifier: VariableIdentifier;
// Data source the variable query is run against; also used to pick the query runner.
datasource: DataSourceApi;
// Optional search text; when truthy it is exposed to the query as the `searchFilter` scoped var.
searchFilter?: string;
}
/**
 * Progress notification published by `VariableQueryRunner` for one variable.
 */
export interface UpdateOptionsResults {
// Loading when a request starts (or is cancelled), Error when it fails,
// Done when the request pipeline has finished.
state: LoadingState;
// The identifier object of the request this notification belongs to.
identifier: VariableIdentifier;
// The failure that was caught; set on notifications with state Error.
error?: any;
// Set to true on the Loading notification emitted when an in-flight request is cancelled.
cancelled?: boolean;
}
/**
 * Collaborators used by `VariableQueryRunner`. The constructor defaults each of them to the
 * module-level implementation imported by this file, and accepts a replacement object instead.
 */
interface VariableQueryRunnerArgs {
// Passed on to the operators that update option state and validate the selection.
dispatch: ThunkDispatch;
// Reads the current store state (variable lookup and templating transaction uid).
getState: () => StoreState;
// Looks up a variable model by id in the given state.
getVariable: typeof getVariable;
// Passed on to the option-updating operator as `getTemplatedRegexFunc`.
getTemplatedRegex: typeof getTemplatedRegex;
// Provides the time service used for the runner arguments and the request time range.
getTimeSrv: typeof getTimeSrv;
// Registry used to pick the query runner for a data source.
queryRunners: QueryRunners;
// Request-running function handed to the selected query runner.
runRequest: typeof runRequest;
}
/**
 * Runs the queries that populate the options of query variables and publishes the
 * progress of every request on a shared results stream.
 *
 * Usage: queue work with `queueRequest`, observe its progress with `getResponse`, and
 * abort it with `cancelRequest`. Queueing a new request for a variable id that already
 * has a request in flight also cancels the older request.
 */
export class VariableQueryRunner {
  /** Incoming update requests; every emission is handled by `onNewRequest`. */
  private readonly updateOptionsRequests: Subject<UpdateOptionsArgs>;
  /** Progress notifications for all variables; consumers filter it through `getResponse`. */
  private readonly updateOptionsResults: Subject<UpdateOptionsResults>;
  /** Explicit cancellation signals emitted by `cancelRequest`. */
  private readonly cancelRequests: Subject<{ identifier: VariableIdentifier }>;
  /** Subscription that connects `updateOptionsRequests` to `onNewRequest`. */
  private readonly subscription: Unsubscribable;

  /**
   * @param dependencies Collaborators used while running requests. Defaults to the
   * module-level implementations imported by this file.
   */
  constructor(
    private dependencies: VariableQueryRunnerArgs = {
      dispatch,
      getState,
      getVariable,
      getTemplatedRegex,
      getTimeSrv,
      queryRunners: new QueryRunners(),
      runRequest,
    }
  ) {
    this.updateOptionsRequests = new Subject<UpdateOptionsArgs>();
    this.updateOptionsResults = new Subject<UpdateOptionsResults>();
    this.cancelRequests = new Subject<{ identifier: VariableIdentifier }>();
    // Bind once so the method can be handed to `subscribe` without losing `this`.
    this.onNewRequest = this.onNewRequest.bind(this);
    this.subscription = this.updateOptionsRequests.subscribe(this.onNewRequest);
  }

  /**
   * Queues a request to update the options of a variable.
   *
   * @param args The variable identifier, data source and optional search filter.
   */
  queueRequest(args: UpdateOptionsArgs): void {
    this.updateOptionsRequests.next(args);
  }

  /**
   * Returns the progress notifications that belong to the given identifier.
   *
   * Notifications are matched with `===`, i.e. only results published for the very same
   * identifier object are emitted.
   *
   * @param identifier The identifier object that was (or will be) used to queue the request.
   * @returns A stream of `UpdateOptionsResults` for that identifier.
   */
  getResponse(identifier: VariableIdentifier): Observable<UpdateOptionsResults> {
    return this.updateOptionsResults.asObservable().pipe(filter((result) => result.identifier === identifier));
  }

  /**
   * Signals that in-flight requests for the variable with the same `id` should be aborted.
   *
   * @param identifier Identifier of the variable whose request should be cancelled.
   */
  cancelRequest(identifier: VariableIdentifier): void {
    this.cancelRequests.next({ identifier });
  }

  /**
   * Stops processing queued requests by unsubscribing from the request stream.
   * Request pipelines that were already started are not unsubscribed here.
   */
  destroy(): void {
    this.subscription.unsubscribe();
  }

  /**
   * Handles one queued request: builds the data query, runs it through the runner selected
   * for the data source, updates the variable state from the result and publishes progress.
   *
   * Published notifications for `identifier`:
   * - `Loading` when the request starts,
   * - `Loading` with `cancelled: true` when a newer request or a cancellation for the same id arrives,
   * - `Error` when the query or a later step fails, or when setting up the request throws,
   * - `Done` when the pipeline ends (see the `finalize` step).
   */
  private onNewRequest(args: UpdateOptionsArgs): void {
    const { datasource, identifier, searchFilter } = args;
    try {
      const {
        dispatch,
        runRequest,
        getTemplatedRegex: getTemplatedRegexFunc,
        getVariable,
        queryRunners,
        getTimeSrv,
        getState,
      } = this.dependencies;

      // Remember which templating transaction was active when the request started.
      const beforeUid = getState().templating.transaction.uid;

      this.updateOptionsResults.next({ identifier, state: LoadingState.Loading });

      const variable = getVariable<QueryVariableModel>(identifier.id, getState());
      const timeSrv = getTimeSrv();
      const runnerArgs = { variable, datasource, searchFilter, timeSrv, runRequest };
      const runner = queryRunners.getRunnerForDatasource(datasource);
      const target = runner.getTarget({ datasource, variable });
      const request = this.getRequest(variable, args, target);

      runner
        .runRequest(runnerArgs, request)
        .pipe(
          filter(() => {
            // Lets check if we started another batch during the execution of the observable. If so we just want to abort the rest.
            const afterUid = getState().templating.transaction.uid;
            return beforeUid === afterUid;
          }),
          filter((data) => data.state === LoadingState.Done || data.state === LoadingState.Error), // we only care about done or error for now
          take(1), // take the first result, using first caused a bug where it in some situations throw an uncaught error because of no results had been received yet
          mergeMap((data: PanelData) => {
            // Turn an Error-state result into an error notification so it reaches catchError below.
            if (data.state === LoadingState.Error) {
              return throwError(() => data.error);
            }
            return of(data);
          }),
          toMetricFindValues(),
          updateOptionsState({ variable, dispatch, getTemplatedRegexFunc }),
          validateVariableSelection({ variable, dispatch, searchFilter }),
          takeUntil(
            // Abort when a newer request or an explicit cancellation for the same variable id arrives.
            merge(this.updateOptionsRequests, this.cancelRequests).pipe(
              filter((incoming) => {
                if (incoming.identifier.id !== identifier.id) {
                  return false;
                }
                this.updateOptionsResults.next({ identifier, state: LoadingState.Loading, cancelled: true });
                return true;
              })
            )
          ),
          catchError((error) => {
            // `error` can be undefined or null (for example an Error-state result without an
            // `error` value). Guard the property read: an exception thrown here would skip
            // publishing the Error notification below.
            if (error?.cancelled) {
              return of({});
            }
            this.updateOptionsResults.next({ identifier, state: LoadingState.Error, error });
            return throwError(() => error);
          }),
          finalize(() => {
            // Runs when the pipeline ends, so listeners always get a final Done notification.
            this.updateOptionsResults.next({ identifier, state: LoadingState.Done });
          })
        )
        .subscribe();
    } catch (error) {
      // Anything thrown synchronously while preparing or starting the request is reported as an Error result.
      this.updateOptionsResults.next({ identifier, state: LoadingState.Error, error });
    }
  }

  /**
   * Builds the data query request for a variable query.
   *
   * @param variable The variable being updated; its current text/value become the `variable` scoped var.
   * @param args The queued request; a truthy `searchFilter` becomes the `searchFilter` scoped var.
   * @param target The query target produced by the query runner.
   * @returns A dashboard-scoped request containing the single target.
   */
  private getRequest(variable: QueryVariableModel, args: UpdateOptionsArgs, target: DataQuery): DataQueryRequest {
    const { searchFilter } = args;
    const variableAsVars = { variable: { text: variable.current.text, value: variable.current.value } };
    const searchFilterScope = { searchFilter: { text: searchFilter, value: searchFilter } };
    const searchFilterAsVars = searchFilter ? searchFilterScope : {};
    const scopedVars = { ...searchFilterAsVars, ...variableAsVars } as ScopedVars;

    // Only variables refreshed on time range change use the current dashboard time range.
    const range =
      variable.refresh === VariableRefresh.onTimeRangeChanged
        ? this.dependencies.getTimeSrv().timeRange()
        : getDefaultTimeRange();

    const request: DataQueryRequest = {
      app: CoreApp.Dashboard,
      requestId: uuidv4(),
      timezone: '',
      range,
      interval: '',
      intervalMs: 0,
      targets: [target],
      scopedVars,
      startTime: Date.now(),
    };

    return request;
  }
}
/** Module-wide runner instance; stays unassigned until `setVariableQueryRunner` is called. */
let runnerInstance: VariableQueryRunner;

/**
 * Registers the runner that `getVariableQueryRunner` hands out from now on,
 * replacing any previously registered one.
 *
 * @param runner The runner to share across the module's consumers.
 */
export function setVariableQueryRunner(runner: VariableQueryRunner): void {
  runnerInstance = runner;
}

/**
 * @returns The runner most recently passed to `setVariableQueryRunner`
 * (`undefined` at runtime if none has been registered yet).
 */
export function getVariableQueryRunner(): VariableQueryRunner {
  return runnerInstance;
}