diff --git a/pkg/tsdb/cloudwatch/log_actions.go b/pkg/tsdb/cloudwatch/log_actions.go index a2fc95fc3d1..c0e14c73f5c 100644 --- a/pkg/tsdb/cloudwatch/log_actions.go +++ b/pkg/tsdb/cloudwatch/log_actions.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sort" + "strconv" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -29,6 +30,34 @@ func (e *CloudWatchExecutor) executeLogActions(ctx context.Context, queryContext return err } + // When a query of the form "stats ... by ..." is made, we want to return + // one series per group defined in the query, but due to the format + // the query response is in, there does not seem to be a way to tell + // by the response alone if/how the results should be grouped. + // Because of this, if the frontend sees that a "stats ... by ..." query is being made + // the "groupResults" parameter is sent along with the query to the backend so that we + // can correctly group the CloudWatch logs response. + if query.Model.Get("groupResults").MustBool() && len(dataframe.Fields) > 0 { + groupingFields := findGroupingFields(dataframe.Fields) + + groupedFrames, err := groupResults(dataframe, groupingFields) + if err != nil { + return err + } + + encodedFrames := make([][]byte, 0) + for _, frame := range groupedFrames { + dataframeEnc, err := frame.MarshalArrow() + if err != nil { + return err + } + encodedFrames = append(encodedFrames, dataframeEnc) + } + + resultChan <- &tsdb.QueryResult{RefId: query.RefId, Dataframes: encodedFrames} + return nil + } + dataframeEnc, err := dataframe.MarshalArrow() if err != nil { return err @@ -56,6 +85,23 @@ func (e *CloudWatchExecutor) executeLogActions(ctx context.Context, queryContext return response, nil } +func findGroupingFields(fields []*data.Field) []string { + groupingFields := make([]string, 0) + for _, field := range fields { + if field.Type().Numeric() || field.Type() == data.FieldTypeNullableTime || field.Type() == data.FieldTypeTime { + continue + } + + if _, err := strconv.ParseFloat(*field.At(0).(*string), 64); err == nil { + continue + } + + groupingFields = append(groupingFields, field.Name) + } + + return groupingFields +} + func (e *CloudWatchExecutor) executeLogAction(ctx context.Context, queryContext *tsdb.TsdbQuery, query *tsdb.Query) (*data.Frame, error) { parameters := query.Model subType := query.Model.Get("subtype").MustString() diff --git a/pkg/tsdb/cloudwatch/log_query.go b/pkg/tsdb/cloudwatch/log_query.go index 4c3ba380c01..f4dc4bcc573 100644 --- a/pkg/tsdb/cloudwatch/log_query.go +++ b/pkg/tsdb/cloudwatch/log_query.go @@ -73,3 +73,48 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d return frame, nil } + +func groupResults(results *data.Frame, groupingFieldNames []string) ([]*data.Frame, error) { + groupingFields := make([]*data.Field, 0) + + for _, field := range results.Fields { + for _, groupingField := range groupingFieldNames { + if field.Name == groupingField { + groupingFields = append(groupingFields, field) + } + } + } + + rowLength, err := results.RowLen() + if err != nil { + return nil, err + } + + groupedDataFrames := make(map[string]*data.Frame) + for i := 0; i < rowLength; i++ { + groupKey := generateGroupKey(groupingFields, i) + if _, exists := groupedDataFrames[groupKey]; !exists { + newFrame := results.EmptyCopy() + newFrame.Name = groupKey + groupedDataFrames[groupKey] = newFrame + } + + groupedDataFrames[groupKey].AppendRow(results.RowCopy(i)...) + } + + newDataFrames := make([]*data.Frame, 0, len(groupedDataFrames)) + for _, dataFrame := range groupedDataFrames { + newDataFrames = append(newDataFrames, dataFrame) + } + + return newDataFrames, nil +} + +func generateGroupKey(fields []*data.Field, row int) string { + groupKey := "" + for _, field := range fields { + groupKey += *field.At(row).(*string) + } + + return groupKey +} diff --git a/pkg/tsdb/cloudwatch/log_query_test.go b/pkg/tsdb/cloudwatch/log_query_test.go index 2f457b9c4c7..b80fcaa5b6a 100644 --- a/pkg/tsdb/cloudwatch/log_query_test.go +++ b/pkg/tsdb/cloudwatch/log_query_test.go @@ -159,3 +159,141 @@ func TestLogsResultsToDataframes(t *testing.T) { assert.Equal(t, expectedDataframe.Meta, dataframes.Meta) assert.ElementsMatch(t, expectedDataframe.Fields, dataframes.Fields) } + +func TestGroupKeyGeneration(t *testing.T) { + logField := data.NewField("@log", data.Labels{}, []*string{ + aws.String("fakelog-a"), + aws.String("fakelog-b"), + aws.String("fakelog-c"), + }) + + streamField := data.NewField("stream", data.Labels{}, []*string{ + aws.String("stream-a"), + aws.String("stream-b"), + aws.String("stream-c"), + }) + + fakeFields := []*data.Field{logField, streamField} + expectedKeys := []string{"fakelog-astream-a", "fakelog-bstream-b", "fakelog-cstream-c"} + + assert.Equal(t, expectedKeys[0], generateGroupKey(fakeFields, 0)) + assert.Equal(t, expectedKeys[1], generateGroupKey(fakeFields, 1)) + assert.Equal(t, expectedKeys[2], generateGroupKey(fakeFields, 2)) +} + +func TestGroupingResults(t *testing.T) { + timeA, _ := time.Parse("2006-01-02 15:04:05.000", "2020-03-02 15:04:05.000") + timeB, _ := time.Parse("2006-01-02 15:04:05.000", "2020-03-02 16:04:05.000") + timeC, _ := time.Parse("2006-01-02 15:04:05.000", "2020-03-02 17:04:05.000") + timeVals := []*time.Time{ + &timeA, &timeA, &timeA, &timeB, &timeB, &timeB, &timeC, &timeC, &timeC, + } + timeField := data.NewField("@timestamp", data.Labels{}, timeVals) + + logField := data.NewField("@log", data.Labels{}, []*string{ + aws.String("fakelog-a"), + aws.String("fakelog-b"), + aws.String("fakelog-c"), + aws.String("fakelog-a"), + aws.String("fakelog-b"), + aws.String("fakelog-c"), + aws.String("fakelog-a"), + aws.String("fakelog-b"), + aws.String("fakelog-c"), + }) + + countField := data.NewField("count", data.Labels{}, []*string{ + aws.String("100"), + aws.String("150"), + aws.String("20"), + aws.String("34"), + aws.String("57"), + aws.String("62"), + aws.String("105"), + aws.String("200"), + aws.String("99"), + }) + + fakeDataFrame := &data.Frame{ + Name: "CloudWatchLogsResponse", + Fields: []*data.Field{ + timeField, + logField, + countField, + }, + RefID: "", + } + + groupedTimeVals := []*time.Time{ + &timeA, &timeB, &timeC, + } + groupedTimeField := data.NewField("@timestamp", data.Labels{}, groupedTimeVals) + groupedLogFieldA := data.NewField("@log", data.Labels{}, []*string{ + aws.String("fakelog-a"), + aws.String("fakelog-a"), + aws.String("fakelog-a"), + }) + + groupedCountFieldA := data.NewField("count", data.Labels{}, []*string{ + aws.String("100"), + aws.String("34"), + aws.String("105"), + }) + + groupedLogFieldB := data.NewField("@log", data.Labels{}, []*string{ + aws.String("fakelog-b"), + aws.String("fakelog-b"), + aws.String("fakelog-b"), + }) + + groupedCountFieldB := data.NewField("count", data.Labels{}, []*string{ + aws.String("150"), + aws.String("57"), + aws.String("200"), + }) + + groupedLogFieldC := data.NewField("@log", data.Labels{}, []*string{ + aws.String("fakelog-c"), + aws.String("fakelog-c"), + aws.String("fakelog-c"), + }) + + groupedCountFieldC := data.NewField("count", data.Labels{}, []*string{ + aws.String("20"), + aws.String("62"), + aws.String("99"), + }) + + expectedGroupedFrames := []*data.Frame{ + { + Name: "fakelog-a", + Fields: []*data.Field{ + groupedTimeField, + groupedLogFieldA, + groupedCountFieldA, + }, + RefID: "", + }, + { + Name: "fakelog-b", + Fields: []*data.Field{ + groupedTimeField, + groupedLogFieldB, + groupedCountFieldB, + }, + RefID: "", + }, + { + Name: "fakelog-c", + Fields: []*data.Field{ + groupedTimeField, + groupedLogFieldC, + groupedCountFieldC, + }, + RefID: "", + }, + } + + groupedResults, _ := groupResults(fakeDataFrame, []string{"@log"}) + assert.ElementsMatch(t, expectedGroupedFrames, groupedResults) +} diff --git a/public/app/plugins/datasource/cloudwatch/components/LogsQueryField.tsx b/public/app/plugins/datasource/cloudwatch/components/LogsQueryField.tsx index a6840f3203e..a2b1f340216 100644 --- a/public/app/plugins/datasource/cloudwatch/components/LogsQueryField.tsx +++ b/public/app/plugins/datasource/cloudwatch/components/LogsQueryField.tsx @@ -11,7 +11,6 @@ import { BracesPlugin, Select, MultiSelect, - Token, } from '@grafana/ui'; // Utils & Services @@ -143,15 +142,37 @@ export class CloudWatchLogsQueryField extends React.PureComponent { // Send text change to parent - const { query, onChange, onRunQuery } = this.props; + const { query, onChange, onRunQuery, datasource, exploreMode } = this.props; const { selectedLogGroups, selectedRegion } = this.state; + // TEMP: Remove when logs/metrics unification is complete + if (datasource.languageProvider && exploreMode === ExploreMode.Logs) { + const cloudwatchLanguageProvider = datasource.languageProvider as CloudWatchLanguageProvider; + const queryUsesStatsCommand = cloudwatchLanguageProvider.isStatsQuery(query.expression); + + if (queryUsesStatsCommand) { + this.setState({ + hint: { + message: 'You are trying to run a stats query in Logs mode. ', + fix: { + label: 'Switch to Metrics mode.', + action: this.switchToMetrics, + }, + }, + }); + } else { + this.setState({ + hint: undefined, + }); + } + } + if (onChange) { const nextQuery = { ...query, expression: value, - logGroupNames: selectedLogGroups?.map(logGroupName => logGroupName.value) ?? [], - region: selectedRegion.value, + logGroupNames: selectedLogGroups?.map(logGroupName => logGroupName.value!) ?? [], + region: selectedRegion.value ?? 'default', }; onChange(nextQuery); @@ -171,7 +192,7 @@ export class CloudWatchLogsQueryField extends React.PureComponent logGroupName.value) ?? [], + logGroupNames: v.map(logGroupName => logGroupName.value!) ?? [], }; onChange(nextQuery); @@ -193,8 +214,8 @@ export class CloudWatchLogsQueryField extends React.PureComponent group.value), + region: v.value ?? 'default', + logGroupNames: selectedLogGroups.map(group => group.value!), }; onChange(nextQuery); @@ -208,7 +229,7 @@ export class CloudWatchLogsQueryField extends React.PureComponent => { - const { datasource, exploreMode } = this.props; + const { datasource } = this.props; const { selectedLogGroups } = this.state; if (!datasource.languageProvider) { @@ -224,23 +245,6 @@ export class CloudWatchLogsQueryField extends React.PureComponent logGroup.value!) } ); - const tokens = editor?.value.data.get('tokens'); - const queryUsesStatsCommand = tokens.find( - (token: Token) => token.types.includes('query-command') && token.content.toLowerCase() === 'stats' - ); - - // TEMP: Remove when logs/metrics unification is complete - if (queryUsesStatsCommand && exploreMode === ExploreMode.Logs) { - this.setState({ - hint: { - message: 'You are trying to run a stats query in Logs mode. ', - fix: { - label: 'Switch to Metrics mode.', - action: this.switchToMetrics, - }, - }, - }); - } return result; }; diff --git a/public/app/plugins/datasource/cloudwatch/datasource.ts b/public/app/plugins/datasource/cloudwatch/datasource.ts index d5fe7e9435f..ab283452ce3 100644 --- a/public/app/plugins/datasource/cloudwatch/datasource.ts +++ b/public/app/plugins/datasource/cloudwatch/datasource.ts @@ -76,7 +76,7 @@ export class CloudWatchDatasource extends DataSourceApi void; debouncedCustomAlert: (title: string, message: string) => void; - logQueries: Set<{ id: string; region: string }>; + logQueries: Record; languageProvider: CloudWatchLanguageProvider; /** @ngInject */ @@ -93,7 +93,7 @@ export class CloudWatchDatasource extends DataSourceApi(); + this.logQueries = {}; this.languageProvider = new CloudWatchLanguageProvider(this); } @@ -131,7 +131,10 @@ export class CloudWatchDatasource extends DataSourceApi ({ queryId: dataFrame.fields[0].values.get(0), region: dataFrame.meta?.custom?.['Region'] ?? 'default', - refId: dataFrame.refId, + refId: dataFrame.refId!, + groupResults: this.languageProvider.isStatsQuery( + options.targets.find(target => target.refId === dataFrame.refId)!.expression + ), })) ) ), @@ -196,9 +199,13 @@ export class CloudWatchDatasource extends DataSourceApi): Observable { - this.logQueries.clear(); - queryParams.forEach(param => this.logQueries.add({ id: param.queryId, region: param.region })); + logsQuery( + queryParams: Array<{ queryId: string; refId: string; limit?: number; region: string; groupResults?: boolean }> + ): Observable { + this.logQueries = {}; + queryParams.forEach(param => { + this.logQueries[param.refId] = { id: param.queryId, region: param.region }; + }); let prevRecordsMatched: Record = {}; return withTeardown( @@ -238,9 +245,10 @@ export class CloudWatchDatasource extends DataSourceApi 0) { + if (Object.keys(this.logQueries).length > 0) { this.makeLogActionRequest( 'StopQuery', - [...this.logQueries.values()].map(logQuery => ({ queryId: logQuery.id, region: logQuery.region })), + Object.values(this.logQueries).map(logQuery => ({ queryId: logQuery.id, region: logQuery.region })), undefined, false - ).pipe(finalize(() => this.logQueries.clear())); + ).pipe( + finalize(() => { + this.logQueries = {}; + }) + ); } } diff --git a/public/app/plugins/datasource/cloudwatch/language_provider.ts b/public/app/plugins/datasource/cloudwatch/language_provider.ts index ae62a9abef9..fff40429669 100644 --- a/public/app/plugins/datasource/cloudwatch/language_provider.ts +++ b/public/app/plugins/datasource/cloudwatch/language_provider.ts @@ -19,7 +19,7 @@ import { AbsoluteTimeRange, LanguageProvider, HistoryItem } from '@grafana/data' import { CloudWatchDatasource } from './datasource'; import { TypeaheadInput, TypeaheadOutput, Token } from '@grafana/ui'; -import { Grammar } from 'prismjs'; +import Prism, { Grammar } from 'prismjs'; export type CloudWatchHistoryItem = HistoryItem; @@ -64,6 +64,18 @@ export class CloudWatchLanguageProvider extends LanguageProvider { return this.startTask; }; + isStatsQuery(query: string): boolean { + const grammar = this.getSyntax(); + const tokens = Prism.tokenize(query, grammar) ?? []; + + return !!tokens.find( + token => + typeof token !== 'string' && + token.content.toString().toLowerCase() === 'stats' && + token.type === 'query-command' + ); + } + /** * Return suggestions based on input that can be then plugged into a typeahead dropdown. * Keep this DOM-free for testing @@ -153,7 +165,7 @@ export class CloudWatchLanguageProvider extends LanguageProvider { return fields; }; - private handleKeyword = async (context?: TypeaheadContext): Promise => { + private handleKeyword = async (context?: TypeaheadContext): Promise => { const suggs = await this.getFieldCompletionItems(context?.logGroupNames ?? []); const functionSuggestions = [ { prefixMatch: true, label: 'Functions', items: STRING_FUNCTIONS.concat(DATETIME_FUNCTIONS, IP_FUNCTIONS) }, @@ -406,5 +418,5 @@ function isInsideFunctionParenthesis(curToken: Token): boolean { function isAfterKeyword(keyword: string, token: Token): boolean { const prevToken = prevNonWhitespaceToken(token); - return prevToken?.types.includes('keyword') && prevToken?.content.toLowerCase() === 'by'; + return !!(prevToken?.types.includes('keyword') && prevToken?.content.toLowerCase() === 'by'); }