Query Splitting: Display progress of sub-requests using an annotation frame (#69574)

* Loki Query Splitting: Add and update loading frame to merged response

* Update unit test

* Loki Query Splitting: use data frame constructor function
This commit is contained in:
Matias Chomicki 2023-06-07 15:52:06 +02:00 committed by GitHub
parent 14d2f371a4
commit e31ea1da9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 1 deletions

View File

@ -45,7 +45,10 @@ describe('runSplitQuery()', () => {
.spyOn(datasource, 'runQuery')
.mockReturnValue(of({ state: LoadingState.Error, error: { refId: 'A', message: 'Error' }, data: [] }));
await expect(runSplitQuery(datasource, request)).toEmitValuesWith((values) => {
expect(values).toEqual([{ error: { refId: 'A', message: 'Error' }, data: [], state: LoadingState.Streaming }]);
expect(values).toHaveLength(1);
expect(values[0]).toEqual(
expect.objectContaining({ error: { refId: 'A', message: 'Error' }, state: LoadingState.Streaming })
);
});
});

View File

@ -3,8 +3,10 @@ import { Observable, Subscriber, Subscription, tap } from 'rxjs';
import { v4 as uuidv4 } from 'uuid';
import {
arrayToDataFrame,
DataQueryRequest,
DataQueryResponse,
DataTopic,
dateTime,
durationToMilliseconds,
parseDuration,
@ -84,6 +86,7 @@ function adjustTargetsFromResponseState(targets: LokiQuery[], response: DataQuer
export function runSplitGroupedQueries(datasource: LokiDatasource, requests: LokiGroupedRequest[]) {
let mergedResponse: DataQueryResponse = { data: [], state: LoadingState.Streaming };
const totalRequests = Math.max(...requests.map(({ partition }) => partition.length));
const longestPartition = requests.filter(({ partition }) => partition.length === totalRequests)[0].partition;
let shouldStop = false;
let subquerySubsciption: Subscription | null = null;
@ -126,6 +129,7 @@ export function runSplitGroupedQueries(datasource: LokiDatasource, requests: Lok
subquerySubsciption = datasource.runQuery(subRequest).subscribe({
next: (partialResponse) => {
mergedResponse = combineResponses(mergedResponse, partialResponse);
mergedResponse = updateLoadingFrame(mergedResponse, subRequest, longestPartition, requestN);
if ((mergedResponse.errors ?? []).length > 0 || mergedResponse.error != null) {
shouldStop = true;
}
@ -153,6 +157,44 @@ export function runSplitGroupedQueries(datasource: LokiDatasource, requests: Lok
return response;
}
function updateLoadingFrame(
response: DataQueryResponse,
request: DataQueryRequest<LokiQuery>,
partition: TimeRange[],
requestN: number
): DataQueryResponse {
if (isLogsQuery(request.targets[0].expr) || isLogsVolumeRequest(request)) {
return response;
}
const loadingFrameName = 'loki-splitting-progress';
response.data = response.data.filter((frame) => frame.name !== loadingFrameName);
if (requestN <= 1) {
return response;
}
const loadingFrame = arrayToDataFrame([
{
time: partition[0].from.valueOf(),
timeEnd: partition[requestN - 2].to.valueOf(),
isRegion: true,
color: 'rgba(120, 120, 120, 0.1)',
},
]);
loadingFrame.name = loadingFrameName;
loadingFrame.meta = {
dataTopic: DataTopic.Annotations,
};
response.data.push(loadingFrame);
return response;
}
function isLogsVolumeRequest(request: DataQueryRequest<LokiQuery>): boolean {
return request.targets.some((target) => target.refId.startsWith('log-volume'));
}
function getNextRequestPointers(requests: LokiGroupedRequest[], requestGroup: number, requestN: number) {
// There's a pending request from the next group:
for (let i = requestGroup + 1; i < requests.length; i++) {