CloudWatch: Use single timeout for log queries (#44848)

This commit is contained in:
Isabella Siu 2022-02-08 15:43:37 -05:00 committed by GitHub
parent 3d0cff5410
commit 16e001e762
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 53 additions and 52 deletions

View File

@ -81,8 +81,6 @@ const displayAlert = (datasourceName: string, region: string) =>
const displayCustomError = (title: string, message: string) => const displayCustomError = (title: string, message: string) =>
store.dispatch(notifyApp(createErrorNotification(title, message))); store.dispatch(notifyApp(createErrorNotification(title, message)));
export const MAX_ATTEMPTS = 5;
export class CloudWatchDatasource export class CloudWatchDatasource
extends DataSourceWithBackend<CloudWatchQuery, CloudWatchJsonData> extends DataSourceWithBackend<CloudWatchQuery, CloudWatchJsonData>
implements DataSourceWithLogsContextSupport implements DataSourceWithLogsContextSupport
@ -180,6 +178,11 @@ export class CloudWatchDatasource
region: this.replace(this.getActualRegion(target.region), options.scopedVars, true, 'region'), region: this.replace(this.getActualRegion(target.region), options.scopedVars, true, 'region'),
})); }));
const startTime = new Date();
const timeoutFunc = () => {
return Date.now() >= startTime.valueOf() + rangeUtil.intervalToMs(this.logsTimeout);
};
return runWithRetry( return runWithRetry(
(targets: StartQueryRequest[]) => { (targets: StartQueryRequest[]) => {
return this.makeLogActionRequest('StartQuery', targets, { return this.makeLogActionRequest('StartQuery', targets, {
@ -189,9 +192,7 @@ export class CloudWatchDatasource
}); });
}, },
queryParams, queryParams,
{ timeoutFunc
timeout: rangeUtil.intervalToMs(this.logsTimeout),
}
).pipe( ).pipe(
mergeMap(({ frames, error }: { frames: DataFrame[]; error?: DataQueryError }) => mergeMap(({ frames, error }: { frames: DataFrame[]; error?: DataQueryError }) =>
// This queries for the results // This queries for the results
@ -202,7 +203,8 @@ export class CloudWatchDatasource
refId: dataFrame.refId!, refId: dataFrame.refId!,
statsGroups: (logQueries.find((target) => target.refId === dataFrame.refId)! as CloudWatchLogsQuery) statsGroups: (logQueries.find((target) => target.refId === dataFrame.refId)! as CloudWatchLogsQuery)
.statsGroups, .statsGroups,
})) })),
timeoutFunc
).pipe( ).pipe(
map((response: DataQueryResponse) => { map((response: DataQueryResponse) => {
if (!response.error && error) { if (!response.error && error) {
@ -310,7 +312,8 @@ export class CloudWatchDatasource
limit?: number; limit?: number;
region: string; region: string;
statsGroups?: string[]; statsGroups?: string[];
}> }>,
timeoutFunc: () => boolean
): Observable<DataQueryResponse> { ): Observable<DataQueryResponse> {
this.logQueries = {}; this.logQueries = {};
queryParams.forEach((param) => { queryParams.forEach((param) => {
@ -363,7 +366,7 @@ export class CloudWatchDatasource
} }
}), }),
map(([dataFrames, failedAttempts]) => { map(([dataFrames, failedAttempts]) => {
if (failedAttempts >= MAX_ATTEMPTS) { if (timeoutFunc()) {
for (const frame of dataFrames) { for (const frame of dataFrames) {
set(frame, 'meta.custom.Status', CloudWatchLogsQueryStatus.Cancelled); set(frame, 'meta.custom.Status', CloudWatchLogsQueryStatus.Cancelled);
} }
@ -381,13 +384,12 @@ export class CloudWatchDatasource
) )
? LoadingState.Done ? LoadingState.Done
: LoadingState.Loading, : LoadingState.Loading,
error: error: timeoutFunc()
failedAttempts >= MAX_ATTEMPTS ? {
? { message: `error: query timed out after ${failedAttempts} attempts`,
message: `error: query timed out after ${MAX_ATTEMPTS} attempts`, type: DataQueryErrorType.Timeout,
type: DataQueryErrorType.Timeout, }
} : undefined,
: undefined,
}; };
}), }),
takeWhile(({ state }) => state !== LoadingState.Error && state !== LoadingState.Done, true) takeWhile(({ state }) => state !== LoadingState.Error && state !== LoadingState.Done, true)

View File

@ -8,7 +8,7 @@ import {
} from '@grafana/data'; } from '@grafana/data';
import * as redux from 'app/store/store'; import * as redux from 'app/store/store';
import { CloudWatchDatasource, MAX_ATTEMPTS } from '../datasource'; import { CloudWatchDatasource } from '../datasource';
import { TemplateSrv } from 'app/features/templating/template_srv'; import { TemplateSrv } from 'app/features/templating/template_srv';
import { import {
MetricEditorMode, MetricEditorMode,
@ -177,7 +177,7 @@ describe('CloudWatchDatasource', () => {
jest.spyOn(rxjsUtils, 'increasingInterval').mockImplementation(() => interval(100)); jest.spyOn(rxjsUtils, 'increasingInterval').mockImplementation(() => interval(100));
}); });
it('should stop querying when no more data received a number of times in a row', async () => { it('should stop querying when timed out', async () => {
const { ds } = getTestContext(); const { ds } = getTestContext();
const fakeFrames = genMockFrames(20); const fakeFrames = genMockFrames(20);
const initialRecordsMatched = fakeFrames[0].meta!.stats!.find((stat) => stat.displayName === 'Records scanned')! const initialRecordsMatched = fakeFrames[0].meta!.stats!.find((stat) => stat.displayName === 'Records scanned')!
@ -213,8 +213,13 @@ describe('CloudWatchDatasource', () => {
} }
}); });
const iterations = 15;
// Times out after 15 passes for consistent testing
const timeoutFunc = () => {
return i >= iterations;
};
const myResponse = await lastValueFrom( const myResponse = await lastValueFrom(
ds.logsQuery([{ queryId: 'fake-query-id', region: 'default', refId: 'A' }]) ds.logsQuery([{ queryId: 'fake-query-id', region: 'default', refId: 'A' }], timeoutFunc)
); );
const expectedData = [ const expectedData = [
@ -235,10 +240,10 @@ describe('CloudWatchDatasource', () => {
state: 'Done', state: 'Done',
error: { error: {
type: DataQueryErrorType.Timeout, type: DataQueryErrorType.Timeout,
message: `error: query timed out after ${MAX_ATTEMPTS} attempts`, message: `error: query timed out after 5 attempts`,
}, },
}); });
expect(i).toBe(15); expect(i).toBe(iterations);
}); });
it('should continue querying as long as new data is being received', async () => { it('should continue querying as long as new data is being received', async () => {
@ -256,8 +261,12 @@ describe('CloudWatchDatasource', () => {
} }
}); });
const startTime = new Date();
const timeoutFunc = () => {
return Date.now() >= startTime.valueOf() + 6000;
};
const myResponse = await lastValueFrom( const myResponse = await lastValueFrom(
ds.logsQuery([{ queryId: 'fake-query-id', region: 'default', refId: 'A' }]) ds.logsQuery([{ queryId: 'fake-query-id', region: 'default', refId: 'A' }], timeoutFunc)
); );
expect(myResponse).toEqual({ expect(myResponse).toEqual({
data: [fakeFrames[fakeFrames.length - 1]], data: [fakeFrames[fakeFrames.length - 1]],
@ -281,8 +290,12 @@ describe('CloudWatchDatasource', () => {
} }
}); });
const startTime = new Date();
const timeoutFunc = () => {
return Date.now() >= startTime.valueOf() + 6000;
};
const myResponse = await lastValueFrom( const myResponse = await lastValueFrom(
ds.logsQuery([{ queryId: 'fake-query-id', region: 'default', refId: 'A' }]) ds.logsQuery([{ queryId: 'fake-query-id', region: 'default', refId: 'A' }], timeoutFunc)
); );
expect(myResponse).toEqual({ expect(myResponse).toEqual({

View File

@ -6,11 +6,13 @@ import { DataResponse, FetchError } from '@grafana/runtime';
import { StartQueryRequest } from '../types'; import { StartQueryRequest } from '../types';
describe('runWithRetry', () => { describe('runWithRetry', () => {
const timeoutPass = () => false;
const timeoutFail = () => true;
it('returns results if no retry is needed', async () => { it('returns results if no retry is needed', async () => {
const queryFunc = jest.fn(); const queryFunc = jest.fn();
queryFunc.mockReturnValueOnce(of([createResponseFrame('A')])); queryFunc.mockReturnValueOnce(of([createResponseFrame('A')]));
const targets = [targetA]; const targets = [targetA];
const values = await lastValueFrom(runWithRetry(queryFunc, targets).pipe(toArray())); const values = await lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
expect(queryFunc).toBeCalledTimes(1); expect(queryFunc).toBeCalledTimes(1);
expect(queryFunc).toBeCalledWith(targets); expect(queryFunc).toBeCalledWith(targets);
expect(values).toEqual([{ frames: [createResponseFrame('A')] }]); expect(values).toEqual([{ frames: [createResponseFrame('A')] }]);
@ -23,7 +25,7 @@ describe('runWithRetry', () => {
queryFunc.mockReturnValueOnce(throwError(() => createErrorResponse(targets))); queryFunc.mockReturnValueOnce(throwError(() => createErrorResponse(targets)));
queryFunc.mockReturnValueOnce(of([createResponseFrame('A')])); queryFunc.mockReturnValueOnce(of([createResponseFrame('A')]));
const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets).pipe(toArray())); const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
jest.runAllTimers(); jest.runAllTimers();
const values = await valuesPromise; const values = await valuesPromise;
@ -33,14 +35,14 @@ describe('runWithRetry', () => {
expect(values).toEqual([{ frames: [createResponseFrame('A')] }]); expect(values).toEqual([{ frames: [createResponseFrame('A')] }]);
}); });
it('fails if reaching timoeut and no data was retrieved', async () => { it('fails if reaching timeout and no data was retrieved', async () => {
jest.useFakeTimers(); jest.useFakeTimers();
const targets = [targetA]; const targets = [targetA];
const queryFunc = jest.fn(); const queryFunc = jest.fn();
queryFunc.mockReturnValueOnce(throwError(() => createErrorResponse(targets))); queryFunc.mockReturnValueOnce(throwError(() => createErrorResponse(targets)));
queryFunc.mockReturnValueOnce(of([createResponseFrame('A')])); queryFunc.mockReturnValueOnce(of([createResponseFrame('A')]));
const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, { timeout: 0 }).pipe(toArray())); const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutFail).pipe(toArray()));
jest.runAllTimers(); jest.runAllTimers();
let error; let error;
try { try {
@ -60,7 +62,7 @@ describe('runWithRetry', () => {
const queryFunc = jest.fn(); const queryFunc = jest.fn();
queryFunc.mockReturnValueOnce(throwError(() => 'random error')); queryFunc.mockReturnValueOnce(throwError(() => 'random error'));
const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets).pipe(toArray())); const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
jest.runAllTimers(); jest.runAllTimers();
let error; let error;
try { try {
@ -79,7 +81,7 @@ describe('runWithRetry', () => {
const queryFunc = jest.fn(); const queryFunc = jest.fn();
queryFunc.mockReturnValueOnce(of([createResponseFrame('A'), createResponseFrame('B')])); queryFunc.mockReturnValueOnce(of([createResponseFrame('A'), createResponseFrame('B')]));
const values = await lastValueFrom(runWithRetry(queryFunc, targets).pipe(toArray())); const values = await lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
expect(queryFunc).toBeCalledTimes(1); expect(queryFunc).toBeCalledTimes(1);
expect(queryFunc).nthCalledWith(1, targets); expect(queryFunc).nthCalledWith(1, targets);
@ -101,7 +103,7 @@ describe('runWithRetry', () => {
queryFunc.mockReturnValueOnce(of([createResponseFrame('B')])); queryFunc.mockReturnValueOnce(of([createResponseFrame('B')]));
const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets).pipe(toArray())); const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutPass).pipe(toArray()));
jest.runAllTimers(); jest.runAllTimers();
const values = await valuesPromise; const values = await valuesPromise;
@ -129,7 +131,7 @@ describe('runWithRetry', () => {
) )
); );
const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, { timeout: 0 }).pipe(toArray())); const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, timeoutFail).pipe(toArray()));
jest.runAllTimers(); jest.runAllTimers();
const values = await valuesPromise; const values = await valuesPromise;
@ -172,9 +174,7 @@ describe('runWithRetry', () => {
) )
); );
const valuesPromise = lastValueFrom( const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, (retry) => retry >= 2).pipe(toArray()));
runWithRetry(queryFunc, targets, { timeoutFunc: (retry) => retry >= 2 }).pipe(toArray())
);
jest.runAllTimers(); jest.runAllTimers();
const values = await valuesPromise; const values = await valuesPromise;

View File

@ -5,8 +5,6 @@ import { DataFrame, DataFrameJSON, DataQueryError } from '@grafana/data';
type Result = { frames: DataFrameJSON[]; error?: string }; type Result = { frames: DataFrameJSON[]; error?: string };
const defaultTimeout = 30_000;
/** /**
* A retry strategy specifically for cloud watch logs query. Cloud watch logs queries need first starting the query * A retry strategy specifically for cloud watch logs query. Cloud watch logs queries need first starting the query
* and the polling for the results. The start query can fail because of the concurrent queries rate limit, * and the polling for the results. The start query can fail because of the concurrent queries rate limit,
@ -23,11 +21,7 @@ const defaultTimeout = 30_000;
export function runWithRetry( export function runWithRetry(
queryFun: (targets: StartQueryRequest[]) => Observable<DataFrame[]>, queryFun: (targets: StartQueryRequest[]) => Observable<DataFrame[]>,
targets: StartQueryRequest[], targets: StartQueryRequest[],
options: { timeoutFunc: (retry: number, startTime: number) => boolean
timeout?: number;
timeoutFunc?: (retry: number, startTime: number) => boolean;
retryWaitFunc?: (retry: number) => number;
} = {}
): Observable<{ frames: DataFrame[]; error?: DataQueryError }> { ): Observable<{ frames: DataFrame[]; error?: DataQueryError }> {
const startTime = new Date(); const startTime = new Date();
let retries = 0; let retries = 0;
@ -35,17 +29,9 @@ export function runWithRetry(
let subscription: Subscription; let subscription: Subscription;
let collected = {}; let collected = {};
const timeoutFunction = options.timeoutFunc const retryWaitFunction = (retry: number) => {
? options.timeoutFunc return Math.pow(2, retry) * 1000 + Math.random() * 100;
: (retry: number, startTime: number) => { };
return Date.now() >= startTime + (options.timeout === undefined ? defaultTimeout : options.timeout);
};
const retryWaitFunction = options.retryWaitFunc
? options.retryWaitFunc
: (retry: number) => {
return Math.pow(2, retry) * 1000 + Math.random() * 100;
};
return new Observable((observer) => { return new Observable((observer) => {
// Run function is where the logic takes place. We have it in a function so we can call it recursively. // Run function is where the logic takes place. We have it in a function so we can call it recursively.
@ -82,7 +68,7 @@ export function runWithRetry(
return; return;
} }
if (timeoutFunction(retries, startTime.valueOf())) { if (timeoutFunc(retries, startTime.valueOf())) {
// We timed out but we could have started some queries // We timed out but we could have started some queries
if (Object.keys(collected).length || Object.keys(errorData.good).length) { if (Object.keys(collected).length || Object.keys(errorData.good).length) {
const dataResponse = toDataQueryResponse({ const dataResponse = toDataQueryResponse({