Shard query splitting: improve error handling (#95824)

* Shard query splitting: Improve error handling

* Formatting

* Shard query splitting: do not shard instant queries

* Shard query splitting: stop querying on parse errors
This commit is contained in:
Matias Chomicki 2024-11-05 16:34:22 +00:00 committed by GitHub
parent 82ac9e2bb6
commit a3d3af35da
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 23 additions and 9 deletions

View File

@ -316,6 +316,7 @@ export function requestSupportsSplitting(allQueries: LokiQuery[]) {
export function requestSupportsSharding(allQueries: LokiQuery[]) {
const queries = allQueries
.filter((query) => !query.hide)
.filter((query) => query.queryType !== LokiQueryType.Instant)
.filter((query) => !query.refId.includes('do-not-shard'))
.filter((query) => query.expr)
.filter((query) => query.direction === LokiQueryDirection.Scan || !isLogsQuery(query.expr));

View File

@ -133,10 +133,12 @@ export function extractLevelLikeLabelFromDataFrame(frame: DataFrame): string | n
}
export function isRetriableError(errorResponse: DataQueryResponse) {
const message = errorResponse.errors ? (errorResponse.errors[0].message ?? '').toLowerCase() : '';
const message = errorResponse.errors
? (errorResponse.errors[0].message ?? '').toLowerCase()
: (errorResponse.error?.message ?? '');
if (message.includes('timeout')) {
return true;
} else if (message.includes('parse error') || message.includes('max entries')) {
} else if (message.includes('parse error')) {
// If the error is a parse error, we want to signal to stop querying.
throw new Error(message);
}

View File

@ -15,8 +15,9 @@ jest.mock('uuid', () => ({
const originalLog = console.log;
const originalWarn = console.warn;
beforeEach(() => {
//jest.spyOn(console, 'log').mockImplementation(() => {});
jest.spyOn(console, 'log').mockImplementation(() => {});
jest.spyOn(console, 'warn').mockImplementation(() => {});
jest.spyOn(console, 'error').mockImplementation(() => {});
});
afterAll(() => {
console.log = originalLog;
@ -182,6 +183,16 @@ describe('runShardSplitQuery()', () => {
});
});
test('Failed requests have loading state Error', async () => {
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
jest
.spyOn(datasource, 'runQuery')
.mockReturnValue(of({ state: LoadingState.Error, error: { refId: 'A', message: 'parse error' }, data: [] }));
await expect(runShardSplitQuery(datasource, request)).toEmitValuesWith((response: DataQueryResponse[]) => {
expect(response[0].state).toBe(LoadingState.Error);
});
});
test('Does not retry on other errors', async () => {
jest.mocked(datasource.languageProvider.fetchLabelValues).mockResolvedValue(['1']);
jest

View File

@ -75,17 +75,17 @@ function splitQueriesByStreamShard(
subquerySubscription = null;
}
if (shouldStop) {
subscriber.complete();
return;
}
const done = () => {
mergedResponse.state = LoadingState.Done;
mergedResponse.state = shouldStop ? LoadingState.Error : LoadingState.Done;
subscriber.next(mergedResponse);
subscriber.complete();
};
if (shouldStop) {
done();
return;
}
const nextRequest = () => {
const nextGroup =
groups[group + 1] && groupHasPendingRequests(groups[group + 1])