package cloudwatch import ( "context" "fmt" "sort" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/tsdb" "golang.org/x/sync/errgroup" ) func (e *CloudWatchExecutor) executeLogActions(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) { resultChan := make(chan *tsdb.QueryResult, len(queryContext.Queries)) eg, ectx := errgroup.WithContext(ctx) for _, query := range queryContext.Queries { query := query eg.Go(func() error { dataframe, err := e.executeLogAction(ectx, queryContext, query) if err != nil { return err } // When a query of the form "stats ... by ..." is made, we want to return // one series per group defined in the query, but due to the format // the query response is in, there does not seem to be a way to tell // by the response alone if/how the results should be grouped. // Because of this, if the frontend sees that a "stats ... by ..." query is being made // the "statsGroups" parameter is sent along with the query to the backend so that we // can correctly group the CloudWatch logs response. statsGroups := query.Model.Get("statsGroups").MustStringArray() if len(statsGroups) > 0 && len(dataframe.Fields) > 0 { groupedFrames, err := groupResults(dataframe, statsGroups) if err != nil { return err } resultChan <- &tsdb.QueryResult{RefId: query.RefId, Dataframes: tsdb.NewDecodedDataFrames(groupedFrames)} return nil } resultChan <- &tsdb.QueryResult{RefId: query.RefId, Dataframes: tsdb.NewDecodedDataFrames(data.Frames{dataframe})} return nil }) } if err := eg.Wait(); err != nil { return nil, err } close(resultChan) response := &tsdb.Response{ Results: make(map[string]*tsdb.QueryResult), } for result := range resultChan { response.Results[result.RefId] = result } return response, nil } func (e *CloudWatchExecutor) executeLogAction(ctx context.Context, queryContext *tsdb.TsdbQuery, query *tsdb.Query) (*data.Frame, error) { parameters := query.Model subType := query.Model.Get("subtype").MustString() defaultRegion := e.DataSource.JsonData.Get("defaultRegion").MustString() region := parameters.Get("region").MustString(defaultRegion) logsClient, err := e.getLogsClient(region) if err != nil { return nil, err } var data *data.Frame = nil switch subType { case "DescribeLogGroups": data, err = e.handleDescribeLogGroups(ctx, logsClient, parameters) case "GetLogGroupFields": data, err = e.handleGetLogGroupFields(ctx, logsClient, parameters, query.RefId) case "StartQuery": data, err = e.handleStartQuery(ctx, logsClient, parameters, queryContext.TimeRange, query.RefId) case "StopQuery": data, err = e.handleStopQuery(ctx, logsClient, parameters) case "GetQueryResults": data, err = e.handleGetQueryResults(ctx, logsClient, parameters, query.RefId) case "GetLogEvents": data, err = e.handleGetLogEvents(ctx, logsClient, parameters) } if err != nil { return nil, err } return data, nil } func (e *CloudWatchExecutor) handleGetLogEvents(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*data.Frame, error) { queryRequest := &cloudwatchlogs.GetLogEventsInput{ Limit: aws.Int64(parameters.Get("limit").MustInt64(10)), StartFromHead: aws.Bool(parameters.Get("startFromHead").MustBool(false)), } logGroupName, err := parameters.Get("logGroupName").String() if err != nil { return nil, fmt.Errorf("Error: Parameter 'logGroupName' is required") } queryRequest.SetLogGroupName(logGroupName) logStreamName, err := parameters.Get("logStreamName").String() if err != nil { return nil, fmt.Errorf("Error: Parameter 'logStream' is required") } queryRequest.SetLogStreamName(logStreamName) if startTime, err := parameters.Get("startTime").Int64(); err == nil { queryRequest.SetStartTime(startTime) } if endTime, err := parameters.Get("endTime").Int64(); err == nil { queryRequest.SetEndTime(endTime) } logEvents, err := logsClient.GetLogEventsWithContext(ctx, queryRequest) if err != nil { return nil, err } messages := make([]*string, 0) timestamps := make([]*int64, 0) sort.Slice(logEvents.Events, func(i, j int) bool { return *(logEvents.Events[i].Timestamp) > *(logEvents.Events[j].Timestamp) }) for _, event := range logEvents.Events { messages = append(messages, event.Message) timestamps = append(timestamps, event.Timestamp) } timestampField := data.NewField("ts", nil, timestamps) timestampField.SetConfig(&data.FieldConfig{DisplayName: "Time"}) messageField := data.NewField("line", nil, messages) return data.NewFrame("logEvents", timestampField, messageField), nil } func (e *CloudWatchExecutor) handleDescribeLogGroups(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*data.Frame, error) { logGroupNamePrefix := parameters.Get("logGroupNamePrefix").MustString("") var response *cloudwatchlogs.DescribeLogGroupsOutput = nil var err error if len(logGroupNamePrefix) < 1 { response, err = logsClient.DescribeLogGroupsWithContext(ctx, &cloudwatchlogs.DescribeLogGroupsInput{ Limit: aws.Int64(parameters.Get("limit").MustInt64(50)), }) } else { response, err = logsClient.DescribeLogGroupsWithContext(ctx, &cloudwatchlogs.DescribeLogGroupsInput{ Limit: aws.Int64(parameters.Get("limit").MustInt64(50)), LogGroupNamePrefix: aws.String(logGroupNamePrefix), }) } if err != nil || response == nil { return nil, err } logGroupNames := make([]*string, 0) for _, logGroup := range response.LogGroups { logGroupNames = append(logGroupNames, logGroup.LogGroupName) } groupNamesField := data.NewField("logGroupName", nil, logGroupNames) frame := data.NewFrame("logGroups", groupNamesField) return frame, nil } func (e *CloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json, timeRange *tsdb.TimeRange) (*cloudwatchlogs.StartQueryOutput, error) { startTime, err := timeRange.ParseFrom() if err != nil { return nil, err } endTime, err := timeRange.ParseTo() if err != nil { return nil, err } if !startTime.Before(endTime) { return nil, fmt.Errorf("invalid time range: start time must be before end time") } // The fields @log and @logStream are always included in the results of a user's query // so that a row's context can be retrieved later if necessary. // The usage of ltrim around the @log/@logStream fields is a necessary workaround, as without it, // CloudWatch wouldn't consider a query using a non-alised @log/@logStream valid. modifiedQueryString := "fields @timestamp,ltrim(@log) as " + logIdentifierInternal + ",ltrim(@logStream) as " + logStreamIdentifierInternal + "|" + parameters.Get("queryString").MustString("") startQueryInput := &cloudwatchlogs.StartQueryInput{ StartTime: aws.Int64(startTime.Unix()), EndTime: aws.Int64(endTime.Unix()), LogGroupNames: aws.StringSlice(parameters.Get("logGroupNames").MustStringArray()), QueryString: aws.String(modifiedQueryString), } if resultsLimit, err := parameters.Get("limit").Int64(); err == nil { startQueryInput.Limit = aws.Int64(resultsLimit) } return logsClient.StartQueryWithContext(ctx, startQueryInput) } func (e *CloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json, timeRange *tsdb.TimeRange, refID string) (*data.Frame, error) { startQueryResponse, err := e.executeStartQuery(ctx, logsClient, parameters, timeRange) if err != nil { return nil, err } dataFrame := data.NewFrame(refID, data.NewField("queryId", nil, []string{*startQueryResponse.QueryId})) dataFrame.RefID = refID clientRegion := parameters.Get("region").MustString("default") dataFrame.Meta = &data.FrameMeta{ Custom: map[string]interface{}{ "Region": clientRegion, }, } return dataFrame, nil } func (e *CloudWatchExecutor) executeStopQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*cloudwatchlogs.StopQueryOutput, error) { queryInput := &cloudwatchlogs.StopQueryInput{ QueryId: aws.String(parameters.Get("queryId").MustString()), } response, err := logsClient.StopQueryWithContext(ctx, queryInput) if err != nil { awsErr, ok := err.(awserr.Error) // If the query has already stopped by the time CloudWatch receives the stop query request, // an "InvalidParameterException" error is returned. For our purposes though the query has been // stopped, so we ignore the error. if ok && awsErr.Code() == "InvalidParameterException" { response = &cloudwatchlogs.StopQueryOutput{Success: aws.Bool(false)} err = nil } } return response, err } func (e *CloudWatchExecutor) handleStopQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*data.Frame, error) { response, err := e.executeStopQuery(ctx, logsClient, parameters) if err != nil { return nil, err } dataFrame := data.NewFrame("StopQueryResponse", data.NewField("success", nil, []bool{*response.Success})) return dataFrame, nil } func (e *CloudWatchExecutor) executeGetQueryResults(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*cloudwatchlogs.GetQueryResultsOutput, error) { queryInput := &cloudwatchlogs.GetQueryResultsInput{ QueryId: aws.String(parameters.Get("queryId").MustString()), } return logsClient.GetQueryResultsWithContext(ctx, queryInput) } func (e *CloudWatchExecutor) handleGetQueryResults(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json, refID string) (*data.Frame, error) { getQueryResultsOutput, err := e.executeGetQueryResults(ctx, logsClient, parameters) if err != nil { return nil, err } dataFrame, err := logsResultsToDataframes(getQueryResultsOutput) if err != nil { return nil, err } dataFrame.Name = refID dataFrame.RefID = refID return dataFrame, nil } func (e *CloudWatchExecutor) handleGetLogGroupFields(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json, refID string) (*data.Frame, error) { queryInput := &cloudwatchlogs.GetLogGroupFieldsInput{ LogGroupName: aws.String(parameters.Get("logGroupName").MustString()), Time: aws.Int64(parameters.Get("time").MustInt64()), } getLogGroupFieldsOutput, err := logsClient.GetLogGroupFieldsWithContext(ctx, queryInput) if err != nil { return nil, err } fieldNames := make([]*string, 0) fieldPercentages := make([]*int64, 0) for _, logGroupField := range getLogGroupFieldsOutput.LogGroupFields { fieldNames = append(fieldNames, logGroupField.Name) fieldPercentages = append(fieldPercentages, logGroupField.Percent) } dataFrame := data.NewFrame( refID, data.NewField("name", nil, fieldNames), data.NewField("percent", nil, fieldPercentages), ) dataFrame.RefID = refID return dataFrame, nil }