CloudWatch Logs: Add retry strategy for hitting max concurrent queries (#39290)

* Add error passing and retry strategy

* Change generic error to specific one

* Make the error more generic

* Refactor retry strategy

* Add retry that handles multiple queries

* Rollback some backend changes

* Remove simple retry strategy

* Add comments

* Add tests

* Small test fixes

* Add log timeout config

* Fix tests

* Fix tests

* Add validation

* Remove commented code and add comment

* Fix snapshots

* Remove unnecessary cast
This commit is contained in:
Andrej Ocenas 2021-11-17 21:46:13 +01:00 committed by GitHub
parent 3dd73387fa
commit e237ff20a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 659 additions and 58 deletions

View File

@ -114,7 +114,7 @@ export interface FetchErrorDataProps {
*
* @public
*/
export interface FetchError<T extends FetchErrorDataProps = any> {
export interface FetchError<T = any> {
status: number;
statusText?: string;
data: T;

View File

@ -14,9 +14,22 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/util/errutil"
"golang.org/x/sync/errgroup"
)
// LimitExceededException is the AWS error code returned when the maximum
// number of concurrent CloudWatch Logs Insights queries has been reached.
// Declared const (not var) since it is never reassigned.
const LimitExceededException = "LimitExceededException"

// AWSError carries an AWS API error code and message through the backend
// response so the frontend can match on it (e.g. to trigger a retry).
type AWSError struct {
	// Code is the AWS error code, e.g. LimitExceededException.
	Code string
	// Message is the human-readable error description.
	Message string
	// Payload carries optional extra error context.
	Payload map[string]string
}

// Error implements the error interface, formatted as "Code: Message".
func (e *AWSError) Error() string {
	return fmt.Sprintf("%s: %s", e.Code, e.Message)
}
func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
resp := backend.NewQueryDataResponse()
@ -33,6 +46,13 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend
eg.Go(func() error {
dataframe, err := e.executeLogAction(ectx, model, query, req.PluginContext)
if err != nil {
var AWSError *AWSError
if errors.As(err, &AWSError) {
resultChan <- backend.Responses{
query.RefID: backend.DataResponse{Frames: data.Frames{}, Error: AWSError},
}
return nil
}
return err
}
@ -56,6 +76,7 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend
for refID, response := range result {
respD := resp.Responses[refID]
respD.Frames = response.Frames
respD.Error = response.Error
resp.Responses[refID] = respD
}
}
@ -96,7 +117,7 @@ func (e *cloudWatchExecutor) executeLogAction(ctx context.Context, model *simple
data, err = e.handleGetLogEvents(ctx, logsClient, model)
}
if err != nil {
return nil, err
return nil, errutil.Wrapf(err, "failed to execute log action with subtype: %s", subType)
}
return data, nil
@ -224,6 +245,11 @@ func (e *cloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cl
model *simplejson.Json, timeRange backend.TimeRange, refID string) (*data.Frame, error) {
startQueryResponse, err := e.executeStartQuery(ctx, logsClient, model, timeRange)
if err != nil {
var awsErr awserr.Error
if errors.As(err, &awsErr) && awsErr.Code() == "LimitExceededException" {
plog.Debug("executeStartQuery limit exceeded", "err", awsErr)
return nil, &AWSError{Code: LimitExceededException, Message: err.Error()}
}
return nil, err
}

View File

@ -3,7 +3,6 @@ package cloudwatch
import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
@ -285,7 +284,7 @@ func TestQuery_StartQuery(t *testing.T) {
})
require.Error(t, err)
assert.Equal(t, fmt.Errorf("invalid time range: start time must be before end time"), err)
assert.Contains(t, err.Error(), "invalid time range: start time must be before end time")
})
t.Run("valid time range", func(t *testing.T) {

View File

@ -1,6 +1,8 @@
import React, { FC, useEffect, useState } from 'react';
import { useDebounce } from 'react-use';
import { Input, InlineField } from '@grafana/ui';
import {
rangeUtil,
DataSourcePluginOptionsEditorProps,
onUpdateDatasourceJsonDataOption,
updateDatasourcePluginJsonDataOption,
@ -23,6 +25,7 @@ export const ConfigEditor: FC<Props> = (props: Props) => {
const datasource = useDatasource(options.name);
useAuthenticationWarning(options.jsonData);
const logsTimeoutError = useTimoutValidation(props.options.jsonData.logsTimeout);
return (
<>
@ -43,6 +46,24 @@ export const ConfigEditor: FC<Props> = (props: Props) => {
</InlineField>
</ConnectionConfig>
<h3 className="page-heading">CloudWatch Logs</h3>
<div className="gf-form-group">
<InlineField
label="Timeout"
labelWidth={28}
tooltip='Custom timout for CloudWatch Logs insights queries which have max concurrency limits. Default is 15 minutes. Must be a valid duration string, such as "15m" "30s" "2000ms" etc.'
invalid={Boolean(logsTimeoutError)}
>
<Input
width={60}
placeholder="15m"
value={options.jsonData.logsTimeout || ''}
onChange={onUpdateDatasourceJsonDataOption(props, 'logsTimeout')}
title={'The timeout must be a valid duration string, such as "15m" "30s" "2000ms" etc.'}
/>
</InlineField>
</div>
<XrayLinkConfig
onChange={(uid) => updateDatasourcePluginJsonDataOption(props, 'tracingDatasourceUid', uid)}
datasourceUid={options.jsonData.tracingDatasourceUid}
@ -84,3 +105,24 @@ function useDatasource(datasourceName: string) {
return datasource;
}
/**
 * Validates that `value` is a duration string rangeUtil understands (e.g. "15m", "30s").
 * Validation is debounced by 350ms so the error does not flash while the user types.
 * @returns the validation error message, or undefined when the value is valid or empty.
 */
function useTimoutValidation(value: string | undefined) {
  const [err, setErr] = useState<undefined | string>(undefined);
  useDebounce(
    () => {
      if (value) {
        try {
          // describeInterval throws on an invalid duration string.
          rangeUtil.describeInterval(value);
          setErr(undefined);
        } catch (e) {
          // `e` is `unknown` under strict TS (useUnknownInCatchVariables);
          // stringify defensively instead of assuming it has toString().
          setErr(e instanceof Error ? e.toString() : String(e));
        }
      } else {
        // Empty input is treated as "use the default", not an error.
        setErr(undefined);
      }
    },
    350,
    [value]
  );
  return err;
}

View File

@ -62,6 +62,29 @@ exports[`Render should disable access key id field 1`] = `
/>
</InlineField>
</ConnectionConfig>
<h3
className="page-heading"
>
CloudWatch Logs
</h3>
<div
className="gf-form-group"
>
<InlineField
invalid={false}
label="Timeout"
labelWidth={28}
tooltip="Custom timout for CloudWatch Logs insights queries which have max concurrency limits. Default is 15 minutes. Must be a valid duration string, such as \\"15m\\" \\"30s\\" \\"2000ms\\" etc."
>
<Input
onChange={[Function]}
placeholder="15m"
title="The timeout must be a valid duration string, such as \\"15m\\" \\"30s\\" \\"2000ms\\" etc."
value=""
width={60}
/>
</InlineField>
</div>
<XrayLinkConfig
onChange={[Function]}
/>
@ -125,6 +148,29 @@ exports[`Render should render component 1`] = `
/>
</InlineField>
</ConnectionConfig>
<h3
className="page-heading"
>
CloudWatch Logs
</h3>
<div
className="gf-form-group"
>
<InlineField
invalid={false}
label="Timeout"
labelWidth={28}
tooltip="Custom timout for CloudWatch Logs insights queries which have max concurrency limits. Default is 15 minutes. Must be a valid duration string, such as \\"15m\\" \\"30s\\" \\"2000ms\\" etc."
>
<Input
onChange={[Function]}
placeholder="15m"
title="The timeout must be a valid duration string, such as \\"15m\\" \\"30s\\" \\"2000ms\\" etc."
value=""
width={60}
/>
</InlineField>
</div>
<XrayLinkConfig
onChange={[Function]}
/>
@ -193,6 +239,29 @@ exports[`Render should show access key and secret access key fields 1`] = `
/>
</InlineField>
</ConnectionConfig>
<h3
className="page-heading"
>
CloudWatch Logs
</h3>
<div
className="gf-form-group"
>
<InlineField
invalid={false}
label="Timeout"
labelWidth={28}
tooltip="Custom timout for CloudWatch Logs insights queries which have max concurrency limits. Default is 15 minutes. Must be a valid duration string, such as \\"15m\\" \\"30s\\" \\"2000ms\\" etc."
>
<Input
onChange={[Function]}
placeholder="15m"
title="The timeout must be a valid duration string, such as \\"15m\\" \\"30s\\" \\"2000ms\\" etc."
value=""
width={60}
/>
</InlineField>
</div>
<XrayLinkConfig
onChange={[Function]}
/>
@ -261,6 +330,29 @@ exports[`Render should show arn role field 1`] = `
/>
</InlineField>
</ConnectionConfig>
<h3
className="page-heading"
>
CloudWatch Logs
</h3>
<div
className="gf-form-group"
>
<InlineField
invalid={false}
label="Timeout"
labelWidth={28}
tooltip="Custom timout for CloudWatch Logs insights queries which have max concurrency limits. Default is 15 minutes. Must be a valid duration string, such as \\"15m\\" \\"30s\\" \\"2000ms\\" etc."
>
<Input
onChange={[Function]}
placeholder="15m"
title="The timeout must be a valid duration string, such as \\"15m\\" \\"30s\\" \\"2000ms\\" etc."
value=""
width={60}
/>
</InlineField>
</div>
<XrayLinkConfig
onChange={[Function]}
/>
@ -329,6 +421,29 @@ exports[`Render should show credentials profile name field 1`] = `
/>
</InlineField>
</ConnectionConfig>
<h3
className="page-heading"
>
CloudWatch Logs
</h3>
<div
className="gf-form-group"
>
<InlineField
invalid={false}
label="Timeout"
labelWidth={28}
tooltip="Custom timout for CloudWatch Logs insights queries which have max concurrency limits. Default is 15 minutes. Must be a valid duration string, such as \\"15m\\" \\"30s\\" \\"2000ms\\" etc."
>
<Input
onChange={[Function]}
placeholder="15m"
title="The timeout must be a valid duration string, such as \\"15m\\" \\"30s\\" \\"2000ms\\" etc."
value=""
width={60}
/>
</InlineField>
</div>
<XrayLinkConfig
onChange={[Function]}
/>

View File

@ -30,16 +30,20 @@ describe('datasource', () => {
it('should interpolate variables in the query', async () => {
const { datasource, fetchMock } = setupMockedDataSource();
datasource.query({
targets: [
{
queryMode: 'Logs' as 'Logs',
region: '$region',
expression: 'fields $fields',
logGroupNames: ['/some/$group'],
},
],
} as any);
await lastValueFrom(
datasource
.query({
targets: [
{
queryMode: 'Logs',
region: '$region',
expression: 'fields $fields',
logGroupNames: ['/some/$group'],
},
],
} as any)
.pipe(toArray())
);
expect(fetchMock.mock.calls[0][0].data.queries[0]).toMatchObject({
queryString: 'fields templatedField',
logGroupNames: ['/some/templatedGroup'],

View File

@ -3,10 +3,11 @@ import angular from 'angular';
import { find, findLast, isEmpty, isString, set } from 'lodash';
import { from, lastValueFrom, merge, Observable, of, throwError, zip } from 'rxjs';
import { catchError, concatMap, finalize, map, mergeMap, repeat, scan, share, takeWhile, tap } from 'rxjs/operators';
import { DataSourceWithBackend, getBackendSrv, toDataQueryResponse } from '@grafana/runtime';
import { DataSourceWithBackend, FetchError, getBackendSrv, toDataQueryResponse } from '@grafana/runtime';
import { RowContextOptions } from '@grafana/ui/src/components/Logs/LogRowContextProvider';
import {
DataFrame,
DataQueryError,
DataQueryErrorType,
DataQueryRequest,
DataQueryResponse,
@ -54,6 +55,7 @@ import { VariableWithMultiSupport } from 'app/features/variables/types';
import { increasingInterval } from './utils/rxjs/increasingInterval';
import { toTestingStatus } from '@grafana/runtime/src/utils/queryResponse';
import { addDataLinksToLogsResponse } from './utils/datalinks';
import { runWithRetry } from './utils/logsRetry';
const DS_QUERY_ENDPOINT = '/api/ds/query';
@ -85,6 +87,7 @@ export class CloudWatchDatasource
datasourceName: string;
languageProvider: CloudWatchLanguageProvider;
tracingDataSourceUid?: string;
logsTimeout: string;
type = 'cloudwatch';
standardStatistics = ['Average', 'Maximum', 'Minimum', 'Sum', 'SampleCount'];
@ -109,6 +112,7 @@ export class CloudWatchDatasource
this.datasourceName = instanceSettings.name;
this.languageProvider = new CloudWatchLanguageProvider(this);
this.tracingDataSourceUid = instanceSettings.jsonData.tracingDatasourceUid;
this.logsTimeout = instanceSettings.jsonData.logsTimeout || '15m';
}
query(options: DataQueryRequest<CloudWatchQuery>): Observable<DataQueryResponse> {
@ -158,28 +162,42 @@ export class CloudWatchDatasource
}
const queryParams = logQueries.map((target: CloudWatchLogsQuery) => ({
queryString: target.expression,
queryString: target.expression || '',
refId: target.refId,
logGroupNames: target.logGroupNames,
region: this.replace(this.getActualRegion(target.region), options.scopedVars, true, 'region'),
}));
// This first starts the query which returns queryId which can be used to retrieve results.
return this.makeLogActionRequest('StartQuery', queryParams, {
makeReplacements: true,
scopedVars: options.scopedVars,
skipCache: true,
}).pipe(
mergeMap((dataFrames) =>
return runWithRetry(
(targets: StartQueryRequest[]) => {
return this.makeLogActionRequest('StartQuery', targets, {
makeReplacements: true,
scopedVars: options.scopedVars,
skipCache: true,
});
},
queryParams,
{
timeout: rangeUtil.intervalToMs(this.logsTimeout),
}
).pipe(
mergeMap(({ frames, error }: { frames: DataFrame[]; error?: DataQueryError }) =>
// This queries for the results
this.logsQuery(
dataFrames.map((dataFrame) => ({
frames.map((dataFrame) => ({
queryId: dataFrame.fields[0].values.get(0),
region: dataFrame.meta?.custom?.['Region'] ?? 'default',
refId: dataFrame.refId!,
statsGroups: (logQueries.find((target) => target.refId === dataFrame.refId)! as CloudWatchLogsQuery)
.statsGroups,
}))
).pipe(
map((response: DataQueryResponse) => {
if (!response.error && error) {
response.error = error;
}
return response;
})
)
),
mergeMap((dataQueryResponse) => {
@ -544,37 +562,47 @@ export class CloudWatchDatasource
const requestParams = {
from: range.from.valueOf().toString(),
to: range.to.valueOf().toString(),
queries: queryParams.map((param: any) => ({
refId: 'A',
intervalMs: 1, // dummy
maxDataPoints: 1, // dummy
datasource: this.getRef(),
type: 'logAction',
subtype: subtype,
...param,
})),
queries: queryParams.map(
(param: GetLogEventsRequest | StartQueryRequest | DescribeLogGroupsRequest | GetLogGroupFieldsRequest) => ({
refId: (param as StartQueryRequest).refId || 'A',
intervalMs: 1, // dummy
maxDataPoints: 1, // dummy
datasource: this.getRef(),
type: 'logAction',
subtype: subtype,
...param,
})
),
};
if (options.makeReplacements) {
requestParams.queries.forEach((query) => {
const fieldsToReplace: Array<
keyof (GetLogEventsRequest & StartQueryRequest & DescribeLogGroupsRequest & GetLogGroupFieldsRequest)
> = ['queryString', 'logGroupNames', 'logGroupName', 'logGroupNamePrefix'];
requestParams.queries.forEach(
(query: GetLogEventsRequest | StartQueryRequest | DescribeLogGroupsRequest | GetLogGroupFieldsRequest) => {
const fieldsToReplace: Array<
keyof (GetLogEventsRequest & StartQueryRequest & DescribeLogGroupsRequest & GetLogGroupFieldsRequest)
> = ['queryString', 'logGroupNames', 'logGroupName', 'logGroupNamePrefix'];
for (const fieldName of fieldsToReplace) {
if (query.hasOwnProperty(fieldName)) {
if (Array.isArray(query[fieldName])) {
query[fieldName] = query[fieldName].map((val: string) =>
this.replace(val, options.scopedVars, true, fieldName)
);
} else {
query[fieldName] = this.replace(query[fieldName], options.scopedVars, true, fieldName);
const anyQuery: any = query;
for (const fieldName of fieldsToReplace) {
if (query.hasOwnProperty(fieldName)) {
if (Array.isArray(anyQuery[fieldName])) {
anyQuery[fieldName] = anyQuery[fieldName].map((val: string) =>
this.replace(val, options.scopedVars, true, fieldName)
);
} else {
anyQuery[fieldName] = this.replace(anyQuery[fieldName], options.scopedVars, true, fieldName);
}
}
}
// TODO: seems to be some sort of bug that we don't really send region with all queries. This means
// if you select different than default region in editor you will get results for autocomplete from wrong
// region.
if (anyQuery.region) {
anyQuery.region = this.replace(anyQuery.region, options.scopedVars, true, 'region');
anyQuery.region = this.getActualRegion(anyQuery.region);
}
}
query.region = this.replace(query.region, options.scopedVars, true, 'region');
query.region = this.getActualRegion(query.region);
});
);
}
const resultsToDataFrames = (val: any): DataFrame[] => toDataQueryResponse(val).data || [];
@ -587,7 +615,11 @@ export class CloudWatchDatasource
return this.awsRequest(DS_QUERY_ENDPOINT, requestParams, headers).pipe(
map((response) => resultsToDataFrames({ data: response })),
catchError((err) => {
catchError((err: FetchError) => {
if (err.status === 400) {
throw err;
}
if (err.data?.error) {
throw err.data.error;
} else if (err.data?.message) {

View File

@ -76,7 +76,8 @@ export interface CloudWatchJsonData extends AwsAuthDataSourceJsonData {
database?: string;
customMetricsNamespaces?: string;
endpoint?: string;
// Time string like 15s, 10m etc, see rangeUtils.intervalToMs.
logsTimeout?: string;
// Used to create links if logs contain traceId.
tracingDatasourceUid?: string;
}
@ -284,14 +285,6 @@ export interface StartQueryRequest {
* The list of log groups to be queried. You can include up to 20 log groups. A StartQuery operation must include a logGroupNames or a logGroupName parameter, but not both.
*/
logGroupNames?: string[];
/**
* The beginning of the time range to query. The range is inclusive, so the specified start time is included in the query. Specified as epoch time, the number of seconds since January 1, 1970, 00:00:00 UTC.
*/
startTime: number;
/**
* The end of the time range to query. The range is inclusive, so the specified end time is included in the query. Specified as epoch time, the number of seconds since January 1, 1970, 00:00:00 UTC.
*/
endTime: number;
/**
* The query string to use. For more information, see CloudWatch Logs Insights Query Syntax.
*/
@ -300,6 +293,8 @@ export interface StartQueryRequest {
* The maximum number of log events to return in the query. If the query string uses the fields command, only the specified fields and their values are returned. The default is 1000.
*/
limit?: number;
refId: string;
region: string;
}
export interface StartQueryResponse {
/**

View File

@ -0,0 +1,225 @@
import { runWithRetry } from './logsRetry';
import { toArray } from 'rxjs/operators';
import { lastValueFrom, of, throwError } from 'rxjs';
import { dataFrameToJSON, MutableDataFrame } from '@grafana/data';
import { DataResponse, FetchError } from '@grafana/runtime';
import { StartQueryRequest } from '../types';
// Unit tests for runWithRetry — the CloudWatch Logs StartQuery retry strategy.
// Fake timers are used to fast-forward past the exponential-backoff waits.
describe('runWithRetry', () => {
  it('returns results if no retry is needed', async () => {
    const queryFunc = jest.fn();
    queryFunc.mockReturnValueOnce(of([createResponseFrame('A')]));
    const targets = [targetA];
    const values = await lastValueFrom(runWithRetry(queryFunc, targets).pipe(toArray()));
    expect(queryFunc).toBeCalledTimes(1);
    expect(queryFunc).toBeCalledWith(targets);
    expect(values).toEqual([{ frames: [createResponseFrame('A')] }]);
  });

  it('retries if error', async () => {
    jest.useFakeTimers();
    const targets = [targetA];
    const queryFunc = jest.fn();
    // First attempt fails with a limit error, second succeeds.
    queryFunc.mockReturnValueOnce(throwError(() => createErrorResponse(targets)));
    queryFunc.mockReturnValueOnce(of([createResponseFrame('A')]));
    const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets).pipe(toArray()));
    // Fast-forward past the retry backoff timer.
    jest.runAllTimers();
    const values = await valuesPromise;
    expect(queryFunc).toBeCalledTimes(2);
    expect(queryFunc).nthCalledWith(1, targets);
    expect(queryFunc).nthCalledWith(2, targets);
    expect(values).toEqual([{ frames: [createResponseFrame('A')] }]);
  });

  it('fails if reaching timoeut and no data was retrieved', async () => {
    jest.useFakeTimers();
    const targets = [targetA];
    const queryFunc = jest.fn();
    queryFunc.mockReturnValueOnce(throwError(() => createErrorResponse(targets)));
    queryFunc.mockReturnValueOnce(of([createResponseFrame('A')]));
    // timeout: 0 means the strategy gives up on the first limit error.
    const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, { timeout: 0 }).pipe(toArray()));
    jest.runAllTimers();
    let error;
    try {
      await valuesPromise;
    } catch (e) {
      error = e;
    }
    // Only the first attempt should have run; the error is surfaced as-is.
    expect(queryFunc).toBeCalledTimes(1);
    expect(queryFunc).nthCalledWith(1, targets);
    expect(error).toEqual({ message: 'LimitExceededException', refId: 'A' });
  });

  it('fails if we get unexpected error', async () => {
    jest.useFakeTimers();
    const targets = [targetA];
    const queryFunc = jest.fn();
    // A plain string error (e.g. generic 500) must not be retried.
    queryFunc.mockReturnValueOnce(throwError(() => 'random error'));
    const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets).pipe(toArray()));
    jest.runAllTimers();
    let error;
    try {
      await valuesPromise;
    } catch (e) {
      error = e;
    }
    expect(queryFunc).toBeCalledTimes(1);
    expect(queryFunc).nthCalledWith(1, targets);
    expect(error).toEqual('random error');
  });

  it('works with multiple queries if there is no error', async () => {
    const targets = [targetA, targetB];
    const queryFunc = jest.fn();
    queryFunc.mockReturnValueOnce(of([createResponseFrame('A'), createResponseFrame('B')]));
    const values = await lastValueFrom(runWithRetry(queryFunc, targets).pipe(toArray()));
    expect(queryFunc).toBeCalledTimes(1);
    expect(queryFunc).nthCalledWith(1, targets);
    expect(values).toEqual([{ frames: [createResponseFrame('A'), createResponseFrame('B')] }]);
  });

  it('works with multiple queries only one errors out', async () => {
    jest.useFakeTimers();
    const targets = [targetA, targetB];
    const queryFunc = jest.fn();
    // Attempt 1: A succeeds, B hits the concurrency limit.
    queryFunc.mockReturnValueOnce(
      throwError(() =>
        createErrorResponse(targets, {
          A: { frames: [dataFrameToJSON(createResponseFrame('A'))] },
          B: { error: 'LimitExceededException' },
        })
      )
    );
    // Attempt 2: only B is retried and now succeeds.
    queryFunc.mockReturnValueOnce(of([createResponseFrame('B')]));
    const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets).pipe(toArray()));
    jest.runAllTimers();
    const values = await valuesPromise;
    expect(queryFunc).toBeCalledTimes(2);
    expect(queryFunc).nthCalledWith(1, targets);
    expect(queryFunc).nthCalledWith(2, [targetB]);
    // Bit more involved because dataFrameToJSON and dataFrameFromJSON are not symmetrical and add some attributes to the
    // dataframe fields
    expect(values.length).toBe(1);
    expect(values[0].frames.length).toBe(2);
    expect(values[0].frames[0].fields[0].values.get(0)).toBe('A');
    expect(values[0].frames[1].fields[0].values.get(0)).toBe('B');
  });

  it('sends data and also error if only one query gets limit error', async () => {
    jest.useFakeTimers();
    const targets = [targetA, targetB];
    const queryFunc = jest.fn();
    queryFunc.mockReturnValueOnce(
      throwError(() =>
        createErrorResponse(targets, {
          A: { frames: [dataFrameToJSON(createResponseFrame('A'))] },
          B: { error: 'LimitExceededException' },
        })
      )
    );
    // With timeout: 0 there is no retry: partial success (A) is passed on
    // together with an error describing the timed-out query (B).
    const valuesPromise = lastValueFrom(runWithRetry(queryFunc, targets, { timeout: 0 }).pipe(toArray()));
    jest.runAllTimers();
    const values = await valuesPromise;
    expect(queryFunc).toBeCalledTimes(1);
    expect(queryFunc).nthCalledWith(1, targets);
    expect(values.length).toBe(1);
    expect(values[0].frames.length).toBe(1);
    expect(values[0].frames[0].fields[0].values.get(0)).toBe('A');
    expect(values[0].error).toEqual({ message: 'Some queries timed out: LimitExceededException' });
  });

  it('sends all collected successful responses on timeout', async () => {
    jest.useFakeTimers();
    const targets = [targetA, targetB, targetC];
    const queryFunc = jest.fn();
    // Attempt 1: A ok, B and C limited.
    queryFunc.mockReturnValueOnce(
      throwError(() =>
        createErrorResponse(targets, {
          A: { frames: [dataFrameToJSON(createResponseFrame('A'))] },
          B: { error: 'LimitExceededException' },
          C: { error: 'LimitExceededException' },
        })
      )
    );
    // Attempt 2: B ok, C still limited.
    queryFunc.mockReturnValueOnce(
      throwError(() =>
        createErrorResponse(targets, {
          B: { frames: [dataFrameToJSON(createResponseFrame('B'))] },
          C: { error: 'LimitExceededException' },
        })
      )
    );
    // Attempt 3: C limited again, but the timeoutFunc below stops retrying.
    queryFunc.mockReturnValueOnce(
      throwError(() =>
        createErrorResponse(targets, {
          C: { error: 'LimitExceededException' },
        })
      )
    );
    const valuesPromise = lastValueFrom(
      runWithRetry(queryFunc, targets, { timeoutFunc: (retry) => retry >= 2 }).pipe(toArray())
    );
    jest.runAllTimers();
    const values = await valuesPromise;
    expect(queryFunc).toBeCalledTimes(3);
    expect(queryFunc).nthCalledWith(1, targets);
    expect(queryFunc).nthCalledWith(2, [targetB, targetC]);
    expect(queryFunc).nthCalledWith(3, [targetC]);
    // A and B collected across attempts are both forwarded, plus the timeout error.
    expect(values.length).toBe(1);
    expect(values[0].frames.length).toBe(2);
    expect(values[0].frames[0].fields[0].values.get(0)).toBe('A');
    expect(values[0].frames[1].fields[0].values.get(0)).toBe('B');
    expect(values[0].error).toEqual({ message: 'Some queries timed out: LimitExceededException' });
  });
});
const targetA = makeTarget('A');
const targetB = makeTarget('B');
const targetC = makeTarget('C');

/** Builds a minimal StartQueryRequest-shaped target for the given refId. */
function makeTarget(refId: string) {
  return { queryString: `query ${refId}`, refId, region: 'test' };
}
/** Creates a one-field data frame whose single `queryId` value is the given ref. */
function createResponseFrame(ref: string) {
  const queryIdField = { name: 'queryId', values: [ref] };
  return new MutableDataFrame({ refId: ref, fields: [queryIdField] });
}
/**
 * Builds a FetchError shaped like a 400 response from the logAction endpoint.
 * Defaults to a single LimitExceededException result for refId A when no
 * explicit per-refId results are supplied.
 */
function createErrorResponse(targets: StartQueryRequest[], results?: Record<string, DataResponse>): FetchError {
  const defaultResults: Record<string, DataResponse> = {
    A: { error: 'LimitExceededException' },
  };
  return {
    status: 400,
    data: { results: results ?? defaultResults },
    config: { url: '', data: { queries: targets } },
  };
}

View File

@ -0,0 +1,163 @@
import { Observable, Subscription } from 'rxjs';
import { FetchError, toDataQueryResponse } from '@grafana/runtime';
import { StartQueryRequest } from '../types';
import { DataFrame, DataFrameJSON, DataQueryError } from '@grafana/data';
// Shape of a single per-refId entry in the backend error payload.
type Result = { frames: DataFrameJSON[]; error?: string };

// Default overall retry window in milliseconds (30 seconds).
const defaultTimeout = 30_000;
/**
* A retry strategy specifically for cloud watch logs query. Cloud watch logs queries need first starting the query
* and the polling for the results. The start query can fail because of the concurrent queries rate limit,
* and so we hove to retry the start query call if there is already lot of queries running.
*
* As we send multiple queries in single request some can fail and some can succeed and we have to also handle those
* cases by only retrying the failed queries. We retry the failed queries until we hit the time limit or all queries
* succeed and only then we pass the data forward. This means we wait longer but makes the code a bit simpler as we
* can treat starting the query and polling as steps in a pipeline.
* @param queryFun
* @param targets
* @param options
*/
/**
 * A retry strategy specifically for CloudWatch Logs queries. CloudWatch Logs queries need to first start
 * the query and then poll for the results. The start query call can fail because of the concurrent queries
 * rate limit, and so we have to retry the start query call if there are already a lot of queries running.
 *
 * As we send multiple queries in a single request, some can fail and some can succeed, and we also have to
 * handle those cases by retrying only the failed queries. We retry the failed queries until we hit the time
 * limit or all queries succeed, and only then we pass the data forward. This means we wait longer but it makes
 * the code a bit simpler as we can treat starting the query and polling as steps in a pipeline.
 * @param queryFun - Starts the given queries and emits the resulting data frames.
 * @param targets - The StartQuery requests to run.
 * @param options - Optional timeout (ms), or custom timeout/backoff strategies (mainly for testing).
 */
export function runWithRetry(
  queryFun: (targets: StartQueryRequest[]) => Observable<DataFrame[]>,
  targets: StartQueryRequest[],
  options: {
    timeout?: number;
    timeoutFunc?: (retry: number, startTime: number) => boolean;
    retryWaitFunc?: (retry: number) => number;
  } = {}
): Observable<{ frames: DataFrame[]; error?: DataQueryError }> {
  const startTime = new Date();
  let retries = 0;
  // Handle of the pending retry timer; undefined until a retry is scheduled.
  let timerID: ReturnType<typeof setTimeout> | undefined;
  let subscription: Subscription;
  // Per-refId responses that already succeeded in earlier attempts.
  let collected = {};

  const timeoutFunction = options.timeoutFunc
    ? options.timeoutFunc
    : (retry: number, startTime: number) => {
        return Date.now() >= startTime + (options.timeout === undefined ? defaultTimeout : options.timeout);
      };

  const retryWaitFunction = options.retryWaitFunc
    ? options.retryWaitFunc
    : (retry: number) => {
        // Exponential backoff with a bit of jitter.
        return Math.pow(2, retry) * 1000 + Math.random() * 100;
      };

  return new Observable((observer) => {
    // Run function is where the logic takes place. We have it in a function so we can call it recursively.
    function run(currentQueryParams: StartQueryRequest[]) {
      subscription = queryFun(currentQueryParams).subscribe({
        next(frames) {
          // In case we successfully finished, merge the current response with whatever we already collected.
          const collectedPreviously = toDataQueryResponse({ data: { results: collected } }).data || [];
          observer.next({ frames: [...collectedPreviously, ...frames] });
          observer.complete();
        },
        error(error: FetchError<{ results?: Record<string, Result> }> | string) {
          // In case of error we first try to figure out what kind of error it is.

          // A plain string means it was a generic 500 error probably, so we just pass it on.
          if (typeof error === 'string') {
            observer.error(error);
            return;
          }

          // In case of multiple queries some can error while some may be ok.
          const errorData = splitErrorData(error);

          if (!errorData) {
            // Not sure what happened but the error structure wasn't what we expected.
            observer.error(error);
            return;
          }

          if (!errorData!.errors.length) {
            // So there is no limit error but some other errors, so nothing to retry; we just pass it on as it
            // would be otherwise.
            observer.error(error);
            return;
          }

          if (timeoutFunction(retries, startTime.valueOf())) {
            // We timed out, but we could have started some queries.
            if (Object.keys(collected).length || Object.keys(errorData.good).length) {
              const dataResponse = toDataQueryResponse({
                data: {
                  results: {
                    ...(errorData.good ?? {}),
                    ...(collected ?? {}),
                  },
                },
              });
              dataResponse.error = {
                ...(dataResponse.error ?? {}),
                message: `Some queries timed out: ${errorData.errorMessage}`,
              };
              // So we consider this a partial success and pass the data forward but also with error to be shown to
              // the user.
              observer.next({
                error: dataResponse.error,
                frames: dataResponse.data,
              });
              observer.complete();
            } else {
              // So we timed out and there was no data to pass forward, so we just pass the error.
              const dataResponse = toDataQueryResponse({ data: { results: error.data?.results ?? {} } });
              observer.error(dataResponse.error);
            }
            return;
          }

          // Remember any successful responses from this attempt before retrying only the failed queries.
          collected = {
            ...collected,
            ...errorData!.good,
          };

          timerID = setTimeout(
            () => {
              retries++;
              run(errorData!.errors);
            },
            // We want to know how long to wait for the next retry. First time this will be 0.
            retryWaitFunction(retries + 1)
          );
        },
      });
    }

    run(targets);
    return () => {
      // We clear only the latest timer and subscription but the observable should complete after one response so
      // there should not be more things running at the same time.
      clearTimeout(timerID);
      subscription.unsubscribe();
    };
  });
}
/**
 * Splits a batched StartQuery error response into the queries that failed with
 * LimitExceededException (candidates for retry) plus the per-refId results that
 * are fine, and remembers the last limit-error message for display.
 * Returns undefined when the error payload has no per-refId results.
 */
function splitErrorData(error: FetchError<{ results?: Record<string, Result> }>) {
  const results = error.data?.results;
  if (!results) {
    return undefined;
  }

  const errors: StartQueryRequest[] = [];
  const good: Record<string, Result> = {};
  let errorMessage = '';

  for (const refId of Object.keys(results)) {
    if (results[refId].error?.startsWith('LimitExceededException')) {
      errorMessage = results[refId].error!;
      // Map the failing refId back to its original query so it can be retried.
      errors.push(error.config.data.queries.find((q: any) => q.refId === refId));
    } else {
      good[refId] = results[refId];
    }
  }

  return { errors, good, errorMessage };
}