CloudWatch Logs: Disable query path using websockets (Live) feature (#39231)

* Use only non live branch for querying logs.

* Update tests

* fix lint
This commit is contained in:
Andrej Ocenas 2021-09-15 16:28:34 +02:00 committed by GitHub
parent 264946f37f
commit 515d6afec3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 48 additions and 159 deletions

View File

@ -1,10 +1,11 @@
import { from, lastValueFrom, of } from 'rxjs';
import { setBackendSrv, setDataSourceSrv, setGrafanaLiveSrv } from '@grafana/runtime';
import { ArrayVector, dataFrameToJSON, dateTime, Field, MutableDataFrame } from '@grafana/data';
import { lastValueFrom, of } from 'rxjs';
import { setBackendSrv, setDataSourceSrv } from '@grafana/runtime';
import { ArrayVector, DataFrame, dataFrameToJSON, dateTime, Field, MutableDataFrame } from '@grafana/data';
import { TemplateSrv } from '../../../features/templating/template_srv';
import { CloudWatchDatasource } from './datasource';
import { toArray } from 'rxjs/operators';
import { CloudWatchLogsQueryStatus } from './types';
describe('datasource', () => {
describe('query', () => {
@ -143,16 +144,15 @@ function setup({ data = [] }: { data?: any } = {}) {
}
function setupForLogs() {
const { datasource, fetchMock } = setup({
data: {
results: {
a: {
refId: 'a',
frames: [dataFrameToJSON(new MutableDataFrame({ fields: [], meta: { custom: { channelName: 'test' } } }))],
},
},
},
});
function envelope(frame: DataFrame) {
return { data: { results: { a: { refId: 'a', frames: [dataFrameToJSON(frame)] } } } };
}
const { datasource, fetchMock } = setup();
const startQueryFrame = new MutableDataFrame({ fields: [{ name: 'queryId', values: ['queryid'] }] });
fetchMock.mockReturnValueOnce(of(envelope(startQueryFrame)));
const logsFrame = new MutableDataFrame({
fields: [
{
@ -168,23 +168,10 @@ function setupForLogs() {
values: new ArrayVector(['1-613f0d6b-3e7cb34375b60662359611bd']),
},
],
meta: { custom: { Status: CloudWatchLogsQueryStatus.Complete } },
});
setGrafanaLiveSrv({
getStream() {
return from([
{
type: 'message',
message: {
results: {
a: {
frames: [dataFrameToJSON(logsFrame)],
},
},
},
},
]);
},
} as any);
fetchMock.mockReturnValueOnce(of(envelope(logsFrame)));
setDataSourceSrv({
async get() {

View File

@ -2,20 +2,8 @@ import React from 'react';
import angular from 'angular';
import { find, isEmpty, isString, set } from 'lodash';
import { from, lastValueFrom, merge, Observable, of, throwError, zip } from 'rxjs';
import {
catchError,
concatMap,
filter,
finalize,
map,
mergeMap,
repeat,
scan,
share,
takeWhile,
tap,
} from 'rxjs/operators';
import { DataSourceWithBackend, getBackendSrv, getGrafanaLiveSrv, toDataQueryResponse } from '@grafana/runtime';
import { catchError, concatMap, finalize, map, mergeMap, repeat, scan, share, takeWhile, tap } from 'rxjs/operators';
import { DataSourceWithBackend, getBackendSrv, toDataQueryResponse } from '@grafana/runtime';
import { RowContextOptions } from '@grafana/ui/src/components/Logs/LogRowContextProvider';
import {
DataFrame,
@ -24,9 +12,6 @@ import {
DataQueryResponse,
DataSourceInstanceSettings,
dateMath,
LiveChannelEvent,
LiveChannelMessageEvent,
LiveChannelScope,
LoadingState,
LogRowModel,
rangeUtil,
@ -64,7 +49,6 @@ import { CloudWatchLanguageProvider } from './language_provider';
import { VariableWithMultiSupport } from 'app/features/variables/types';
import { increasingInterval } from './utils/rxjs/increasingInterval';
import { toTestingStatus } from '@grafana/runtime/src/utils/queryResponse';
import config from 'app/core/config';
import { addDataLinksToLogsResponse } from './utils/datalinks';
const DS_QUERY_ENDPOINT = '/api/ds/query';
@ -147,6 +131,12 @@ export class CloudWatchDatasource extends DataSourceWithBackend<CloudWatchQuery,
return merge(...dataQueryResponses);
}
/**
* Handle log query. The log query works by starting the query on the CloudWatch and then periodically polling for
* results.
* @param logQueries
* @param options
*/
handleLogQueries = (
logQueries: CloudWatchLogsQuery[],
options: DataQueryRequest<CloudWatchQuery>
@ -161,11 +151,27 @@ export class CloudWatchDatasource extends DataSourceWithBackend<CloudWatchQuery,
return of({ data: [], state: LoadingState.Done });
}
const response = config.liveEnabled
? this.handleLiveLogQueries(validLogQueries, options)
: this.handleLegacyLogQueries(validLogQueries, options);
const queryParams = logQueries.map((target: CloudWatchLogsQuery) => ({
queryString: target.expression,
refId: target.refId,
logGroupNames: target.logGroupNames,
region: this.replace(this.getActualRegion(target.region), options.scopedVars, true, 'region'),
}));
return response.pipe(
// This first starts the query which returns queryId which can be used to retrieve results.
return this.makeLogActionRequest('StartQuery', queryParams, options.scopedVars).pipe(
mergeMap((dataFrames) =>
// This queries for the results
this.logsQuery(
dataFrames.map((dataFrame) => ({
queryId: dataFrame.fields[0].values.get(0),
region: dataFrame.meta?.custom?.['Region'] ?? 'default',
refId: dataFrame.refId!,
statsGroups: (logQueries.find((target) => target.refId === dataFrame.refId)! as CloudWatchLogsQuery)
.statsGroups,
}))
)
),
mergeMap((dataQueryResponse) => {
return from(
(async () => {
@ -185,105 +191,6 @@ export class CloudWatchDatasource extends DataSourceWithBackend<CloudWatchQuery,
);
};
/**
* Handle log query using grafana live feature. This means the backend will return a websocket channel name and it
* will listen on it for partial responses until it's terminated. This should give quicker partial data to the user
* as the log query can be long running. This requires that config.liveEnabled === true as that controls whether
* websocket connections can be made.
* @param logQueries
* @param options
*/
private handleLiveLogQueries = (
logQueries: CloudWatchLogsQuery[],
options: DataQueryRequest<CloudWatchQuery>
): Observable<DataQueryResponse> => {
const queryParams = logQueries.map((target: CloudWatchLogsQuery) => ({
intervalMs: 1, // dummy
maxDataPoints: 1, // dummy
datasourceId: this.id,
queryString: this.replace(target.expression, options.scopedVars, true),
refId: target.refId,
logGroupNames: target.logGroupNames?.map((logGroup) =>
this.replace(logGroup, options.scopedVars, true, 'log groups')
),
statsGroups: target.statsGroups,
region: this.getActualRegion(this.replace(target.region, options.scopedVars, true, 'region')),
type: 'liveLogAction',
}));
const range = this.timeSrv.timeRange();
const requestParams = {
from: range.from.valueOf().toString(),
to: range.to.valueOf().toString(),
queries: queryParams,
};
return this.awsRequest(DS_QUERY_ENDPOINT, requestParams).pipe(
mergeMap((response: TSDBResponse) => {
const dataQueryResponse = toDataQueryResponse({ data: response }, options.targets);
const channelName: string = dataQueryResponse.data[0].meta.custom.channelName;
return getGrafanaLiveSrv().getStream({
scope: LiveChannelScope.Plugin,
namespace: 'cloudwatch',
path: channelName,
});
}),
filter((e: LiveChannelEvent<any>) => e.type === 'message'),
map(({ message }: LiveChannelMessageEvent<TSDBResponse>) => {
const dataQueryResponse = toDataQueryResponse({
data: message,
});
dataQueryResponse.state = dataQueryResponse.data.every((dataFrame) =>
statusIsTerminated(dataFrame.meta?.custom?.['Status'])
)
? LoadingState.Done
: LoadingState.Loading;
dataQueryResponse.key = message.results[Object.keys(message.results)[0]].refId;
return dataQueryResponse;
}),
catchError((err) => {
if (err.data?.error) {
throw err.data.error;
}
throw err;
})
);
};
/**
* Handle query the old way (see handleLiveLogQueries) when websockets are not enabled. As enabling websockets is
* configurable we will have to be able to degrade gracefully for the time being.
* @param logQueries
* @param options
*/
private handleLegacyLogQueries = (
logQueries: CloudWatchLogsQuery[],
options: DataQueryRequest<CloudWatchQuery>
): Observable<DataQueryResponse> => {
const queryParams = logQueries.map((target: CloudWatchLogsQuery) => ({
queryString: target.expression,
refId: target.refId,
logGroupNames: target.logGroupNames,
region: this.replace(this.getActualRegion(target.region), options.scopedVars, true, 'region'),
}));
return this.makeLogActionRequest('StartQuery', queryParams, options.scopedVars).pipe(
mergeMap((dataFrames) =>
this.logsQuery(
dataFrames.map((dataFrame) => ({
queryId: dataFrame.fields[0].values.get(0),
region: dataFrame.meta?.custom?.['Region'] ?? 'default',
refId: dataFrame.refId!,
statsGroups: (logQueries.find((target) => target.refId === dataFrame.refId)! as CloudWatchLogsQuery)
.statsGroups,
}))
)
)
);
};
handleMetricQueries = (
metricQueries: CloudWatchMetricsQuery[],
options: DataQueryRequest<CloudWatchQuery>
@ -328,6 +235,10 @@ export class CloudWatchDatasource extends DataSourceWithBackend<CloudWatchQuery,
return this.performTimeSeriesQuery(request, options.range);
};
/**
* Checks progress and polls data of a started logs query with some retry logic.
* @param queryParams
*/
logsQuery(
queryParams: Array<{
queryId: string;
@ -1041,12 +952,3 @@ function parseLogGroupName(logIdentifier: string): string {
const colonIndex = logIdentifier.lastIndexOf(':');
return logIdentifier.substr(colonIndex + 1);
}
function statusIsTerminated(status: string | CloudWatchLogsQueryStatus) {
return [
CloudWatchLogsQueryStatus.Complete,
CloudWatchLogsQueryStatus.Cancelled,
CloudWatchLogsQueryStatus.Failed,
CloudWatchLogsQueryStatus.Timeout,
].includes(status as CloudWatchLogsQueryStatus);
}