package cloudwatch

import (
	"context"
	"errors"
	"fmt"
	"math"
	"sort"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/data"
	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/util/errutil"
	"golang.org/x/sync/errgroup"
)

var LimitExceededException = "LimitExceededException"

type AWSError struct {
	Code    string
	Message string
	Payload map[string]string
}

func (e *AWSError) Error() string {
	return fmt.Sprintf("%s: %s", e.Code, e.Message)
}

func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	resp := backend.NewQueryDataResponse()

	resultChan := make(chan backend.Responses, len(req.Queries))
	eg, ectx := errgroup.WithContext(ctx)

	for _, query := range req.Queries {
		model, err := simplejson.NewJson(query.JSON)
		if err != nil {
			return nil, err
		}

		query := query
		eg.Go(func() error {
			dataframe, err := e.executeLogAction(ectx, model, query, req.PluginContext)
			if err != nil {
				var awsError *AWSError
				if errors.As(err, &awsError) {
					resultChan <- backend.Responses{
						query.RefID: backend.DataResponse{Frames: data.Frames{}, Error: awsError},
					}
					return nil
				}
				return err
			}

			groupedFrames, err := groupResponseFrame(dataframe, model.Get("statsGroups").MustStringArray())
			if err != nil {
				return err
			}
			resultChan <- backend.Responses{
				query.RefID: backend.DataResponse{Frames: groupedFrames},
			}
			return nil
		})
	}

	if err := eg.Wait(); err != nil {
		return nil, err
	}
	close(resultChan)

	for result := range resultChan {
		for refID, response := range result {
			respD := resp.Responses[refID]
			respD.Frames = response.Frames
			respD.Error = response.Error
			resp.Responses[refID] = respD
		}
	}

	return resp, nil
}

func (e *cloudWatchExecutor) executeLogAction(ctx context.Context, model *simplejson.Json, query backend.DataQuery, pluginCtx backend.PluginContext) (*data.Frame, error) {
	subType := model.Get("subtype").MustString()

	dsInfo, err := e.getDSInfo(pluginCtx)
	if err != nil {
		return nil, err
	}

	defaultRegion := dsInfo.region
	region := model.Get("region").MustString(defaultRegion)
	logsClient, err := e.getCWLogsClient(region, pluginCtx)
	if err != nil {
		return nil, err
	}

	var frame *data.Frame
	switch subType {
	case "DescribeLogGroups":
		frame, err = e.handleDescribeLogGroups(ctx, logsClient, model)
	case "GetLogGroupFields":
		frame, err = e.handleGetLogGroupFields(ctx, logsClient, model, query.RefID)
	case "StartQuery":
		frame, err = e.handleStartQuery(ctx, logsClient, model, query.TimeRange, query.RefID)
	case "StopQuery":
		frame, err = e.handleStopQuery(ctx, logsClient, model)
	case "GetQueryResults":
		frame, err = e.handleGetQueryResults(ctx, logsClient, model, query.RefID)
	case "GetLogEvents":
		frame, err = e.handleGetLogEvents(ctx, logsClient, model)
	}
	if err != nil {
		return nil, errutil.Wrapf(err, "failed to execute log action with subtype: %s", subType)
	}

	return frame, nil
}
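// handleGetLogEvents retrieves raw log events from a single log stream and
// returns them as a data frame, sorted by timestamp in descending order.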
func (e *cloudWatchExecutor) handleGetLogEvents(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*data.Frame, error) {
	queryRequest := &cloudwatchlogs.GetLogEventsInput{
		Limit:         aws.Int64(parameters.Get("limit").MustInt64(10)),
		StartFromHead: aws.Bool(parameters.Get("startFromHead").MustBool(false)),
	}

	logGroupName, err := parameters.Get("logGroupName").String()
	if err != nil {
		return nil, errors.New("parameter 'logGroupName' is required")
	}
	queryRequest.SetLogGroupName(logGroupName)

	logStreamName, err := parameters.Get("logStreamName").String()
	if err != nil {
		return nil, errors.New("parameter 'logStreamName' is required")
	}
	queryRequest.SetLogStreamName(logStreamName)

	if startTime, err := parameters.Get("startTime").Int64(); err == nil {
		queryRequest.SetStartTime(startTime)
	}

	if endTime, err := parameters.Get("endTime").Int64(); err == nil {
		queryRequest.SetEndTime(endTime)
	}

	logEvents, err := logsClient.GetLogEventsWithContext(ctx, queryRequest)
	if err != nil {
		return nil, err
	}

	messages := make([]*string, 0)
	timestamps := make([]*int64, 0)

	sort.Slice(logEvents.Events, func(i, j int) bool {
		return *(logEvents.Events[i].Timestamp) > *(logEvents.Events[j].Timestamp)
	})

	for _, event := range logEvents.Events {
		messages = append(messages, event.Message)
		timestamps = append(timestamps, event.Timestamp)
	}

	timestampField := data.NewField("ts", nil, timestamps)
	timestampField.SetConfig(&data.FieldConfig{DisplayName: "Time"})

	messageField := data.NewField("line", nil, messages)

	return data.NewFrame("logEvents", timestampField, messageField), nil
}

func (e *cloudWatchExecutor) handleDescribeLogGroups(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*data.Frame, error) {
	logGroupNamePrefix := parameters.Get("logGroupNamePrefix").MustString("")

	var response *cloudwatchlogs.DescribeLogGroupsOutput
	var err error
	if len(logGroupNamePrefix) == 0 {
		response, err = logsClient.DescribeLogGroupsWithContext(ctx, &cloudwatchlogs.DescribeLogGroupsInput{
			Limit: aws.Int64(parameters.Get("limit").MustInt64(50)),
		})
	} else {
		response, err = logsClient.DescribeLogGroupsWithContext(ctx, &cloudwatchlogs.DescribeLogGroupsInput{
			Limit:              aws.Int64(parameters.Get("limit").MustInt64(50)),
			LogGroupNamePrefix: aws.String(logGroupNamePrefix),
		})
	}
	if err != nil || response == nil {
		return nil, err
	}

	logGroupNames := make([]*string, 0)
	for _, logGroup := range response.LogGroups {
		logGroupNames = append(logGroupNames, logGroup.LogGroupName)
	}

	groupNamesField := data.NewField("logGroupName", nil, logGroupNames)
	frame := data.NewFrame("logGroups", groupNamesField)

	return frame, nil
}
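// executeStartQuery starts a CloudWatch Logs Insights query for the given time
// range, injecting the @log and @logStream fields into the query string so that
// a row's log context can be retrieved later on.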
func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json, timeRange backend.TimeRange) (*cloudwatchlogs.StartQueryOutput, error) {
	startTime := timeRange.From
	endTime := timeRange.To

	if !startTime.Before(endTime) {
		return nil, fmt.Errorf("invalid time range: start time must be before end time")
	}

	// The fields @log and @logStream are always included in the results of a user's query
	// so that a row's context can be retrieved later if necessary.
	// The usage of ltrim around the @log/@logStream fields is a necessary workaround, as without it,
	// CloudWatch wouldn't consider a query using a non-aliased @log/@logStream valid.
	modifiedQueryString := "fields @timestamp,ltrim(@log) as " + logIdentifierInternal + ",ltrim(@logStream) as " +
		logStreamIdentifierInternal + "|" + parameters.Get("queryString").MustString("")

	startQueryInput := &cloudwatchlogs.StartQueryInput{
		StartTime: aws.Int64(startTime.Unix()),
		// Grafana time ranges usually have only second precision, but sub-second ranges are possible,
		// for example when jumping from a trace to the logs for that trace and the trace is shorter
		// than a second. In that case StartTime is effectively floored while EndTime is ceiled here,
		// so we get the logs the user wants plus possibly a little more; since CloudWatch Logs only
		// accepts integer seconds, there is not much more we can do about that.
		EndTime:       aws.Int64(int64(math.Ceil(float64(endTime.UnixNano()) / 1e9))),
		LogGroupNames: aws.StringSlice(parameters.Get("logGroupNames").MustStringArray()),
		QueryString:   aws.String(modifiedQueryString),
	}

	if resultsLimit, err := parameters.Get("limit").Int64(); err == nil {
		startQueryInput.Limit = aws.Int64(resultsLimit)
	}

	return logsClient.StartQueryWithContext(ctx, startQueryInput)
}

func (e *cloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, model *simplejson.Json, timeRange backend.TimeRange, refID string) (*data.Frame, error) {
	startQueryResponse, err := e.executeStartQuery(ctx, logsClient, model, timeRange)
	if err != nil {
		var awsErr awserr.Error
		if errors.As(err, &awsErr) && awsErr.Code() == LimitExceededException {
			plog.Debug("executeStartQuery limit exceeded", "err", awsErr)
			return nil, &AWSError{Code: LimitExceededException, Message: err.Error()}
		}
		return nil, err
	}

	dataFrame := data.NewFrame(refID, data.NewField("queryId", nil, []string{*startQueryResponse.QueryId}))
	dataFrame.RefID = refID

	clientRegion := model.Get("region").MustString("default")

	dataFrame.Meta = &data.FrameMeta{
		Custom: map[string]interface{}{
			"Region": clientRegion,
		},
	}

	return dataFrame, nil
}
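// executeStopQuery stops a running Logs Insights query. If the query has
// already stopped, the resulting InvalidParameterException is treated as a
// non-error and an unsuccessful StopQueryOutput is returned instead.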
func (e *cloudWatchExecutor) executeStopQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*cloudwatchlogs.StopQueryOutput, error) {
	queryInput := &cloudwatchlogs.StopQueryInput{
		QueryId: aws.String(parameters.Get("queryId").MustString()),
	}

	response, err := logsClient.StopQueryWithContext(ctx, queryInput)
	if err != nil {
		// If the query has already stopped by the time CloudWatch receives the stop query request,
		// an "InvalidParameterException" error is returned. For our purposes though the query has been
		// stopped, so we ignore the error.
		var awsErr awserr.Error
		if errors.As(err, &awsErr) && awsErr.Code() == "InvalidParameterException" {
			response = &cloudwatchlogs.StopQueryOutput{Success: aws.Bool(false)}
			err = nil
		}
	}

	return response, err
}

func (e *cloudWatchExecutor) handleStopQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*data.Frame, error) {
	response, err := e.executeStopQuery(ctx, logsClient, parameters)
	if err != nil {
		return nil, err
	}

	dataFrame := data.NewFrame("StopQueryResponse", data.NewField("success", nil, []bool{*response.Success}))
	return dataFrame, nil
}

func (e *cloudWatchExecutor) executeGetQueryResults(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json) (*cloudwatchlogs.GetQueryResultsOutput, error) {
	queryInput := &cloudwatchlogs.GetQueryResultsInput{
		QueryId: aws.String(parameters.Get("queryId").MustString()),
	}

	return logsClient.GetQueryResultsWithContext(ctx, queryInput)
}

func (e *cloudWatchExecutor) handleGetQueryResults(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json, refID string) (*data.Frame, error) {
	getQueryResultsOutput, err := e.executeGetQueryResults(ctx, logsClient, parameters)
	if err != nil {
		return nil, err
	}

	dataFrame, err := logsResultsToDataframes(getQueryResultsOutput)
	if err != nil {
		return nil, err
	}

	dataFrame.Name = refID
	dataFrame.RefID = refID

	return dataFrame, nil
}

func (e *cloudWatchExecutor) handleGetLogGroupFields(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI, parameters *simplejson.Json, refID string) (*data.Frame, error) {
	queryInput := &cloudwatchlogs.GetLogGroupFieldsInput{
		LogGroupName: aws.String(parameters.Get("logGroupName").MustString()),
		Time:         aws.Int64(parameters.Get("time").MustInt64()),
	}

	getLogGroupFieldsOutput, err := logsClient.GetLogGroupFieldsWithContext(ctx, queryInput)
	if err != nil {
		return nil, err
	}

	fieldNames := make([]*string, 0)
	fieldPercentages := make([]*int64, 0)

	for _, logGroupField := range getLogGroupFieldsOutput.LogGroupFields {
		fieldNames = append(fieldNames, logGroupField.Name)
		fieldPercentages = append(fieldPercentages, logGroupField.Percent)
	}

	dataFrame := data.NewFrame(
		refID,
		data.NewField("name", nil, fieldNames),
		data.NewField("percent", nil, fieldPercentages),
	)

	dataFrame.RefID = refID

	return dataFrame, nil
}