Datasource/CloudWatch: Results of CloudWatch Logs stats queries are now grouped (#24396)

* Datasource/CloudWatch: Results of CloudWatch Logs stats queries are now grouped
This commit is contained in:
kay delaney 2020-05-11 18:52:15 +01:00 committed by GitHub
parent 98de101bd8
commit db91961405
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 296 additions and 39 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"strconv"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
@ -29,6 +30,34 @@ func (e *CloudWatchExecutor) executeLogActions(ctx context.Context, queryContext
return err
}
// When a query of the form "stats ... by ..." is made, we want to return
// one series per group defined in the query, but due to the format
// the query response is in, there does not seem to be a way to tell
// by the response alone if/how the results should be grouped.
// Because of this, if the frontend sees that a "stats ... by ..." query is being made
// the "groupResults" parameter is sent along with the query to the backend so that we
// can correctly group the CloudWatch logs response.
if query.Model.Get("groupResults").MustBool() && len(dataframe.Fields) > 0 {
groupingFields := findGroupingFields(dataframe.Fields)
groupedFrames, err := groupResults(dataframe, groupingFields)
if err != nil {
return err
}
encodedFrames := make([][]byte, 0)
for _, frame := range groupedFrames {
dataframeEnc, err := frame.MarshalArrow()
if err != nil {
return err
}
encodedFrames = append(encodedFrames, dataframeEnc)
}
resultChan <- &tsdb.QueryResult{RefId: query.RefId, Dataframes: encodedFrames}
return nil
}
dataframeEnc, err := dataframe.MarshalArrow()
if err != nil {
return err
@ -56,6 +85,23 @@ func (e *CloudWatchExecutor) executeLogActions(ctx context.Context, queryContext
return response, nil
}
func findGroupingFields(fields []*data.Field) []string {
groupingFields := make([]string, 0)
for _, field := range fields {
if field.Type().Numeric() || field.Type() == data.FieldTypeNullableTime || field.Type() == data.FieldTypeTime {
continue
}
if _, err := strconv.ParseFloat(*field.At(0).(*string), 64); err == nil {
continue
}
groupingFields = append(groupingFields, field.Name)
}
return groupingFields
}
func (e *CloudWatchExecutor) executeLogAction(ctx context.Context, queryContext *tsdb.TsdbQuery, query *tsdb.Query) (*data.Frame, error) {
parameters := query.Model
subType := query.Model.Get("subtype").MustString()

View File

@ -73,3 +73,48 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d
return frame, nil
}
func groupResults(results *data.Frame, groupingFieldNames []string) ([]*data.Frame, error) {
groupingFields := make([]*data.Field, 0)
for _, field := range results.Fields {
for _, groupingField := range groupingFieldNames {
if field.Name == groupingField {
groupingFields = append(groupingFields, field)
}
}
}
rowLength, err := results.RowLen()
if err != nil {
return nil, err
}
groupedDataFrames := make(map[string]*data.Frame)
for i := 0; i < rowLength; i++ {
groupKey := generateGroupKey(groupingFields, i)
if _, exists := groupedDataFrames[groupKey]; !exists {
newFrame := results.EmptyCopy()
newFrame.Name = groupKey
groupedDataFrames[groupKey] = newFrame
}
groupedDataFrames[groupKey].AppendRow(results.RowCopy(i)...)
}
newDataFrames := make([]*data.Frame, 0, len(groupedDataFrames))
for _, dataFrame := range groupedDataFrames {
newDataFrames = append(newDataFrames, dataFrame)
}
return newDataFrames, nil
}
func generateGroupKey(fields []*data.Field, row int) string {
groupKey := ""
for _, field := range fields {
groupKey += *field.At(row).(*string)
}
return groupKey
}

View File

@ -159,3 +159,141 @@ func TestLogsResultsToDataframes(t *testing.T) {
assert.Equal(t, expectedDataframe.Meta, dataframes.Meta)
assert.ElementsMatch(t, expectedDataframe.Fields, dataframes.Fields)
}
func TestGroupKeyGeneration(t *testing.T) {
logField := data.NewField("@log", data.Labels{}, []*string{
aws.String("fakelog-a"),
aws.String("fakelog-b"),
aws.String("fakelog-c"),
})
streamField := data.NewField("stream", data.Labels{}, []*string{
aws.String("stream-a"),
aws.String("stream-b"),
aws.String("stream-c"),
})
fakeFields := []*data.Field{logField, streamField}
expectedKeys := []string{"fakelog-astream-a", "fakelog-bstream-b", "fakelog-cstream-c"}
assert.Equal(t, expectedKeys[0], generateGroupKey(fakeFields, 0))
assert.Equal(t, expectedKeys[1], generateGroupKey(fakeFields, 1))
assert.Equal(t, expectedKeys[2], generateGroupKey(fakeFields, 2))
}
func TestGroupingResults(t *testing.T) {
timeA, _ := time.Parse("2006-01-02 15:04:05.000", "2020-03-02 15:04:05.000")
timeB, _ := time.Parse("2006-01-02 15:04:05.000", "2020-03-02 16:04:05.000")
timeC, _ := time.Parse("2006-01-02 15:04:05.000", "2020-03-02 17:04:05.000")
timeVals := []*time.Time{
&timeA, &timeA, &timeA, &timeB, &timeB, &timeB, &timeC, &timeC, &timeC,
}
timeField := data.NewField("@timestamp", data.Labels{}, timeVals)
logField := data.NewField("@log", data.Labels{}, []*string{
aws.String("fakelog-a"),
aws.String("fakelog-b"),
aws.String("fakelog-c"),
aws.String("fakelog-a"),
aws.String("fakelog-b"),
aws.String("fakelog-c"),
aws.String("fakelog-a"),
aws.String("fakelog-b"),
aws.String("fakelog-c"),
})
countField := data.NewField("count", data.Labels{}, []*string{
aws.String("100"),
aws.String("150"),
aws.String("20"),
aws.String("34"),
aws.String("57"),
aws.String("62"),
aws.String("105"),
aws.String("200"),
aws.String("99"),
})
fakeDataFrame := &data.Frame{
Name: "CloudWatchLogsResponse",
Fields: []*data.Field{
timeField,
logField,
countField,
},
RefID: "",
}
groupedTimeVals := []*time.Time{
&timeA, &timeB, &timeC,
}
groupedTimeField := data.NewField("@timestamp", data.Labels{}, groupedTimeVals)
groupedLogFieldA := data.NewField("@log", data.Labels{}, []*string{
aws.String("fakelog-a"),
aws.String("fakelog-a"),
aws.String("fakelog-a"),
})
groupedCountFieldA := data.NewField("count", data.Labels{}, []*string{
aws.String("100"),
aws.String("34"),
aws.String("105"),
})
groupedLogFieldB := data.NewField("@log", data.Labels{}, []*string{
aws.String("fakelog-b"),
aws.String("fakelog-b"),
aws.String("fakelog-b"),
})
groupedCountFieldB := data.NewField("count", data.Labels{}, []*string{
aws.String("150"),
aws.String("57"),
aws.String("200"),
})
groupedLogFieldC := data.NewField("@log", data.Labels{}, []*string{
aws.String("fakelog-c"),
aws.String("fakelog-c"),
aws.String("fakelog-c"),
})
groupedCountFieldC := data.NewField("count", data.Labels{}, []*string{
aws.String("20"),
aws.String("62"),
aws.String("99"),
})
expectedGroupedFrames := []*data.Frame{
{
Name: "fakelog-a",
Fields: []*data.Field{
groupedTimeField,
groupedLogFieldA,
groupedCountFieldA,
},
RefID: "",
},
{
Name: "fakelog-b",
Fields: []*data.Field{
groupedTimeField,
groupedLogFieldB,
groupedCountFieldB,
},
RefID: "",
},
{
Name: "fakelog-c",
Fields: []*data.Field{
groupedTimeField,
groupedLogFieldC,
groupedCountFieldC,
},
RefID: "",
},
}
groupedResults, _ := groupResults(fakeDataFrame, []string{"@log"})
assert.ElementsMatch(t, expectedGroupedFrames, groupedResults)
}

View File

@ -11,7 +11,6 @@ import {
BracesPlugin,
Select,
MultiSelect,
Token,
} from '@grafana/ui';
// Utils & Services
@ -143,15 +142,37 @@ export class CloudWatchLogsQueryField extends React.PureComponent<CloudWatchLogs
onChangeQuery = (value: string, override?: boolean) => {
// Send text change to parent
const { query, onChange, onRunQuery } = this.props;
const { query, onChange, onRunQuery, datasource, exploreMode } = this.props;
const { selectedLogGroups, selectedRegion } = this.state;
// TEMP: Remove when logs/metrics unification is complete
if (datasource.languageProvider && exploreMode === ExploreMode.Logs) {
const cloudwatchLanguageProvider = datasource.languageProvider as CloudWatchLanguageProvider;
const queryUsesStatsCommand = cloudwatchLanguageProvider.isStatsQuery(query.expression);
if (queryUsesStatsCommand) {
this.setState({
hint: {
message: 'You are trying to run a stats query in Logs mode. ',
fix: {
label: 'Switch to Metrics mode.',
action: this.switchToMetrics,
},
},
});
} else {
this.setState({
hint: undefined,
});
}
}
if (onChange) {
const nextQuery = {
...query,
expression: value,
logGroupNames: selectedLogGroups?.map(logGroupName => logGroupName.value) ?? [],
region: selectedRegion.value,
logGroupNames: selectedLogGroups?.map(logGroupName => logGroupName.value!) ?? [],
region: selectedRegion.value ?? 'default',
};
onChange(nextQuery);
@ -171,7 +192,7 @@ export class CloudWatchLogsQueryField extends React.PureComponent<CloudWatchLogs
if (onChange) {
const nextQuery = {
...query,
logGroupNames: v.map(logGroupName => logGroupName.value) ?? [],
logGroupNames: v.map(logGroupName => logGroupName.value!) ?? [],
};
onChange(nextQuery);
@ -193,8 +214,8 @@ export class CloudWatchLogsQueryField extends React.PureComponent<CloudWatchLogs
if (onChange) {
const nextQuery = {
...query,
region: v.value,
logGroupNames: selectedLogGroups.map(group => group.value),
region: v.value ?? 'default',
logGroupNames: selectedLogGroups.map(group => group.value!),
};
onChange(nextQuery);
@ -208,7 +229,7 @@ export class CloudWatchLogsQueryField extends React.PureComponent<CloudWatchLogs
};
onTypeahead = async (typeahead: TypeaheadInput): Promise<TypeaheadOutput> => {
const { datasource, exploreMode } = this.props;
const { datasource } = this.props;
const { selectedLogGroups } = this.state;
if (!datasource.languageProvider) {
@ -224,23 +245,6 @@ export class CloudWatchLogsQueryField extends React.PureComponent<CloudWatchLogs
{ history, absoluteRange, logGroupNames: selectedLogGroups.map(logGroup => logGroup.value!) }
);
const tokens = editor?.value.data.get('tokens');
const queryUsesStatsCommand = tokens.find(
(token: Token) => token.types.includes('query-command') && token.content.toLowerCase() === 'stats'
);
// TEMP: Remove when logs/metrics unification is complete
if (queryUsesStatsCommand && exploreMode === ExploreMode.Logs) {
this.setState({
hint: {
message: 'You are trying to run a stats query in Logs mode. ',
fix: {
label: 'Switch to Metrics mode.',
action: this.switchToMetrics,
},
},
});
}
return result;
};

View File

@ -76,7 +76,7 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
datasourceName: string;
debouncedAlert: (datasourceName: string, region: string) => void;
debouncedCustomAlert: (title: string, message: string) => void;
logQueries: Set<{ id: string; region: string }>;
logQueries: Record<string, { id: string; region: string }>;
languageProvider: CloudWatchLanguageProvider;
/** @ngInject */
@ -93,7 +93,7 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
this.standardStatistics = ['Average', 'Maximum', 'Minimum', 'Sum', 'SampleCount'];
this.debouncedAlert = memoizedDebounce(displayAlert, AppNotificationTimeout.Error);
this.debouncedCustomAlert = memoizedDebounce(displayCustomError, AppNotificationTimeout.Error);
this.logQueries = new Set<{ id: string; region: string }>();
this.logQueries = {};
this.languageProvider = new CloudWatchLanguageProvider(this);
}
@ -131,7 +131,10 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
dataFrames.map(dataFrame => ({
queryId: dataFrame.fields[0].values.get(0),
region: dataFrame.meta?.custom?.['Region'] ?? 'default',
refId: dataFrame.refId,
refId: dataFrame.refId!,
groupResults: this.languageProvider.isStatsQuery(
options.targets.find(target => target.refId === dataFrame.refId)!.expression
),
}))
)
),
@ -196,9 +199,13 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
return this.performTimeSeriesQuery(request, options.range);
}
logsQuery(queryParams: Array<{ queryId: string; limit?: number; region: string }>): Observable<DataQueryResponse> {
this.logQueries.clear();
queryParams.forEach(param => this.logQueries.add({ id: param.queryId, region: param.region }));
logsQuery(
queryParams: Array<{ queryId: string; refId: string; limit?: number; region: string; groupResults?: boolean }>
): Observable<DataQueryResponse> {
this.logQueries = {};
queryParams.forEach(param => {
this.logQueries[param.refId] = { id: param.queryId, region: param.region };
});
let prevRecordsMatched: Record<string, number> = {};
return withTeardown(
@ -238,9 +245,10 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
CloudWatchLogsQueryStatus.Complete,
CloudWatchLogsQueryStatus.Cancelled,
CloudWatchLogsQueryStatus.Failed,
].includes(dataframe.meta?.custom?.['Status'])
].includes(dataframe.meta?.custom?.['Status']) &&
this.logQueries.hasOwnProperty(dataframe.refId!)
) {
this.logQueries.delete({ id: queryParams[i].queryId, region: queryParams[i].region });
delete this.logQueries[dataframe.refId!];
}
});
}),
@ -299,13 +307,17 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
}
stopQueries() {
if (this.logQueries.size > 0) {
if (Object.keys(this.logQueries).length > 0) {
this.makeLogActionRequest(
'StopQuery',
[...this.logQueries.values()].map(logQuery => ({ queryId: logQuery.id, region: logQuery.region })),
Object.values(this.logQueries).map(logQuery => ({ queryId: logQuery.id, region: logQuery.region })),
undefined,
false
).pipe(finalize(() => this.logQueries.clear()));
).pipe(
finalize(() => {
this.logQueries = {};
})
);
}
}

View File

@ -19,7 +19,7 @@ import { AbsoluteTimeRange, LanguageProvider, HistoryItem } from '@grafana/data'
import { CloudWatchDatasource } from './datasource';
import { TypeaheadInput, TypeaheadOutput, Token } from '@grafana/ui';
import { Grammar } from 'prismjs';
import Prism, { Grammar } from 'prismjs';
export type CloudWatchHistoryItem = HistoryItem<CloudWatchQuery>;
@ -64,6 +64,18 @@ export class CloudWatchLanguageProvider extends LanguageProvider {
return this.startTask;
};
isStatsQuery(query: string): boolean {
const grammar = this.getSyntax();
const tokens = Prism.tokenize(query, grammar) ?? [];
return !!tokens.find(
token =>
typeof token !== 'string' &&
token.content.toString().toLowerCase() === 'stats' &&
token.type === 'query-command'
);
}
/**
* Return suggestions based on input that can be then plugged into a typeahead dropdown.
* Keep this DOM-free for testing
@ -153,7 +165,7 @@ export class CloudWatchLanguageProvider extends LanguageProvider {
return fields;
};
private handleKeyword = async (context?: TypeaheadContext): Promise<TypeaheadOutput | null> => {
private handleKeyword = async (context?: TypeaheadContext): Promise<TypeaheadOutput> => {
const suggs = await this.getFieldCompletionItems(context?.logGroupNames ?? []);
const functionSuggestions = [
{ prefixMatch: true, label: 'Functions', items: STRING_FUNCTIONS.concat(DATETIME_FUNCTIONS, IP_FUNCTIONS) },
@ -406,5 +418,5 @@ function isInsideFunctionParenthesis(curToken: Token): boolean {
function isAfterKeyword(keyword: string, token: Token): boolean {
const prevToken = prevNonWhitespaceToken(token);
return prevToken?.types.includes('keyword') && prevToken?.content.toLowerCase() === 'by';
return !!(prevToken?.types.includes('keyword') && prevToken?.content.toLowerCase() === 'by');
}