Range splitting: Call subscriber.next only when there are new results to report (#64171)

This commit is contained in:
Matias Chomicki 2023-03-07 13:05:40 +01:00 committed by GitHub
parent dd12cdec4d
commit accef84ca5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -90,7 +90,7 @@ function adjustTargetsFromResponseState(targets: LokiQuery[], response: DataQuer
type LokiGroupedRequest = Array<{ request: DataQueryRequest<LokiQuery>; partition: TimeRange[] }>;
export function runGroupedQueries(datasource: LokiDatasource, requests: LokiGroupedRequest) {
let mergedResponse: DataQueryResponse | null;
let mergedResponse: DataQueryResponse = { data: [], state: LoadingState.Streaming };
const totalRequests = Math.max(...requests.map(({ partition }) => partition.length));
let shouldStop = false;
@ -101,23 +101,19 @@ export function runGroupedQueries(datasource: LokiDatasource, requests: LokiGrou
return;
}
const done = (response: DataQueryResponse) => {
response.state = LoadingState.Done;
subscriber.next(response);
const done = () => {
mergedResponse.state = LoadingState.Done;
subscriber.next(mergedResponse);
subscriber.complete();
};
const nextRequest = () => {
mergedResponse = mergedResponse || { data: [] };
const { nextRequestN, nextRequestGroup } = getNextRequestPointers(requests, requestGroup, requestN);
if (nextRequestN > 0) {
mergedResponse.state = LoadingState.Streaming;
subscriber.next(mergedResponse);
runNextRequest(subscriber, nextRequestN, nextRequestGroup);
return;
}
done(mergedResponse);
done();
};
const group = requests[requestGroup];
@ -125,7 +121,7 @@ export function runGroupedQueries(datasource: LokiDatasource, requests: LokiGrou
const range = group.partition[requestN - 1];
const targets = adjustTargetsFromResponseState(group.request.targets, mergedResponse);
if (!targets.length && mergedResponse) {
if (!targets.length) {
nextRequest();
return;
}
@ -140,6 +136,7 @@ export function runGroupedQueries(datasource: LokiDatasource, requests: LokiGrou
mergedResponse = combineResponses(mergedResponse, partialResponse);
},
complete: () => {
subscriber.next(mergedResponse);
nextRequest();
},
error: (error) => {