grafana/public/app/features/alerting/unified/state/AlertingQueryRunner.ts
Josh Hunt 3c6e0e8ef8
Chore: ESlint import order (#44959)
* Add and configure eslint-plugin-import

* Fix the lint:ts npm command

* Autofix + prettier all the files

* Manually fix remaining files

* Move jquery code in jest-setup to external file to safely reorder imports

* Resolve issue caused by circular dependencies within Prometheus

* Update .betterer.results

* Fix missing // @ts-ignore

* ignore iconBundle.ts

* Fix missing // @ts-ignore
2022-04-22 14:33:13 +01:00

205 lines
5.9 KiB
TypeScript

import { Observable, of, OperatorFunction, ReplaySubject, Unsubscribable } from 'rxjs';
import { catchError, map, share } from 'rxjs/operators';
import { v4 as uuidv4 } from 'uuid';
import {
dataFrameFromJSON,
DataFrameJSON,
getDefaultTimeRange,
LoadingState,
PanelData,
rangeUtil,
TimeRange,
withLoadingIndicator,
} from '@grafana/data';
import { FetchResponse, getDataSourceSrv, toDataQueryError } from '@grafana/runtime';
import { BackendSrv, getBackendSrv } from 'app/core/services/backend_srv';
import { isExpressionQuery } from 'app/features/expressions/guards';
import { cancelNetworkRequestsOnUnsubscribe } from 'app/features/query/state/processing/canceler';
import { setStructureRevision } from 'app/features/query/state/processing/revision';
import { preProcessPanelData } from 'app/features/query/state/runRequest';
import { AlertQuery } from 'app/types/unified-alerting-dto';
import { getTimeRangeForExpression } from '../utils/timeRange';
export interface AlertingQueryResult {
frames: DataFrameJSON[];
}
export interface AlertingQueryResponse {
results: Record<string, AlertingQueryResult>;
}
export class AlertingQueryRunner {
private subject: ReplaySubject<Record<string, PanelData>>;
private subscription?: Unsubscribable;
private lastResult: Record<string, PanelData>;
constructor(private backendSrv = getBackendSrv(), private dataSourceSrv = getDataSourceSrv()) {
this.subject = new ReplaySubject(1);
this.lastResult = {};
}
get(): Observable<Record<string, PanelData>> {
return this.subject.asObservable();
}
async run(queries: AlertQuery[]) {
if (queries.length === 0) {
const empty = initialState(queries, LoadingState.Done);
return this.subject.next(empty);
}
// do not execute if one more of the queries are not runnable,
// for example not completely configured
for (const query of queries) {
if (!isExpressionQuery(query.model)) {
const ds = await this.dataSourceSrv.get(query.datasourceUid);
if (ds.filterQuery && !ds.filterQuery(query.model)) {
const empty = initialState(queries, LoadingState.Done);
return this.subject.next(empty);
}
}
}
this.subscription = runRequest(this.backendSrv, queries).subscribe({
next: (dataPerQuery) => {
const nextResult = applyChange(dataPerQuery, (refId, data) => {
const previous = this.lastResult[refId];
const preProcessed = preProcessPanelData(data, previous);
return setStructureRevision(preProcessed, previous);
});
this.lastResult = nextResult;
this.subject.next(this.lastResult);
},
error: (error: Error) => {
this.lastResult = mapErrorToPanelData(this.lastResult, error);
this.subject.next(this.lastResult);
},
});
}
cancel() {
if (!this.subscription) {
return;
}
this.subscription.unsubscribe();
let requestIsRunning = false;
const nextResult = applyChange(this.lastResult, (refId, data) => {
if (data.state === LoadingState.Loading) {
requestIsRunning = true;
}
return {
...data,
state: LoadingState.Done,
};
});
if (requestIsRunning) {
this.subject.next(nextResult);
}
}
destroy() {
if (this.subject) {
this.subject.complete();
}
this.cancel();
}
}
const runRequest = (backendSrv: BackendSrv, queries: AlertQuery[]): Observable<Record<string, PanelData>> => {
const initial = initialState(queries, LoadingState.Loading);
const request = {
data: { data: queries },
url: '/api/v1/eval',
method: 'POST',
requestId: uuidv4(),
};
return withLoadingIndicator({
whileLoading: initial,
source: backendSrv.fetch<AlertingQueryResponse>(request).pipe(
mapToPanelData(initial),
catchError((error) => of(mapErrorToPanelData(initial, error))),
cancelNetworkRequestsOnUnsubscribe(backendSrv, request.requestId),
share()
),
});
};
const initialState = (queries: AlertQuery[], state: LoadingState): Record<string, PanelData> => {
return queries.reduce((dataByQuery: Record<string, PanelData>, query) => {
dataByQuery[query.refId] = {
state,
series: [],
timeRange: getTimeRange(query, queries),
};
return dataByQuery;
}, {});
};
const getTimeRange = (query: AlertQuery, queries: AlertQuery[]): TimeRange => {
if (isExpressionQuery(query.model)) {
const relative = getTimeRangeForExpression(query.model, queries);
return rangeUtil.relativeToTimeRange(relative);
}
if (!query.relativeTimeRange) {
console.warn(`Query with refId: ${query.refId} did not have any relative time range, using default.`);
return getDefaultTimeRange();
}
return rangeUtil.relativeToTimeRange(query.relativeTimeRange);
};
const mapToPanelData = (
dataByQuery: Record<string, PanelData>
): OperatorFunction<FetchResponse<AlertingQueryResponse>, Record<string, PanelData>> => {
return map((response) => {
const { data } = response;
const results: Record<string, PanelData> = {};
for (const [refId, result] of Object.entries(data.results)) {
results[refId] = {
timeRange: dataByQuery[refId].timeRange,
state: LoadingState.Done,
series: result.frames.map(dataFrameFromJSON),
};
}
return results;
});
};
const mapErrorToPanelData = (lastResult: Record<string, PanelData>, error: Error): Record<string, PanelData> => {
const queryError = toDataQueryError(error);
return applyChange(lastResult, (refId, data) => {
return {
...data,
state: LoadingState.Error,
error: queryError,
};
});
};
const applyChange = (
initial: Record<string, PanelData>,
change: (refId: string, data: PanelData) => PanelData
): Record<string, PanelData> => {
const nextResult: Record<string, PanelData> = {};
for (const [refId, data] of Object.entries(initial)) {
nextResult[refId] = change(refId, data);
}
return nextResult;
};