loki: query splitting: better canceling (#63315)

loki: query splitting: better cancelling
This commit is contained in:
Gábor Farkas 2023-02-13 17:52:30 +01:00 committed by GitHub
parent 222e02fc18
commit 0ee9d11a91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,4 +1,4 @@
import { Subscriber, map, Observable } from 'rxjs';
import { Subscriber, map, Observable, Subscription } from 'rxjs';
import { DataQueryRequest, DataQueryResponse, dateTime, TimeRange } from '@grafana/data';
import { LoadingState } from '@grafana/schema';
@ -99,7 +99,13 @@ export function runPartitionedQuery(datasource: LokiDatasource, request: DataQue
);
const totalRequests = partition.length;
let shouldStop = false;
let smallQuerySubsciption: Subscription | null = null;
const runNextRequest = (subscriber: Subscriber<DataQueryResponse>, requestN: number) => {
if (shouldStop) {
return;
}
const requestId = `${request.requestId}_${requestN}`;
const range = partition[requestN - 1];
const targets = adjustTargetsFromResponseState(request.targets, mergedResponse);
@ -115,7 +121,7 @@ export function runPartitionedQuery(datasource: LokiDatasource, request: DataQue
return;
}
datasource
smallQuerySubsciption = datasource
.runQuery({ ...request, range, requestId, targets })
.pipe(
// in case of an empty query, this is somehow run twice. `share()` is no workaround here as the observable is generated from `of()`.
@ -142,6 +148,12 @@ export function runPartitionedQuery(datasource: LokiDatasource, request: DataQue
const response = new Observable<DataQueryResponse>((subscriber) => {
runNextRequest(subscriber, totalRequests);
return () => {
shouldStop = true;
if (smallQuerySubsciption != null) {
smallQuerySubsciption.unsubscribe();
}
};
});
return response;