mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
269 lines
7.7 KiB
Go
269 lines
7.7 KiB
Go
package cloudwatch
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"regexp"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
|
|
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
|
|
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
|
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
|
"github.com/grafana/grafana/pkg/components/simplejson"
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
"github.com/grafana/grafana/pkg/models"
|
|
"github.com/grafana/grafana/pkg/tsdb"
|
|
)
|
|
|
|
// CloudWatchExecutor handles all CloudWatch queries (metrics, annotations,
// logs, and backend alert evaluation) for a single Grafana data source.
type CloudWatchExecutor struct {
	*models.DataSource
	ec2Svc ec2iface.EC2API
	rgtaSvc resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI

	// logsClientsByRegion caches one CloudWatch Logs client per AWS region;
	// NewCloudWatchExecutor seeds it with the "default" region's client.
	logsClientsByRegion map[string](*cloudwatchlogs.CloudWatchLogs)
	// mux guards logsClientsByRegion (see getLogsClient).
	mux sync.Mutex
}
|
|
|
|
// DatasourceInfo holds the AWS connection and authentication settings
// resolved from a CloudWatch data source configuration for one region.
type DatasourceInfo struct {
	Profile string
	Region string
	AuthType string
	AssumeRoleArn string
	Namespace string

	// Static credentials; only used for the access/secret key auth type.
	AccessKey string
	SecretKey string
}
|
|
|
|
// cloudWatchTSFormat is the timestamp layout returned by CloudWatch Logs
// Insights results, expressed as a Go reference-time layout string.
const cloudWatchTSFormat = "2006-01-02 15:04:05.000"

// Constants also defined in datasource/cloudwatch/datasource.ts
const logIdentifierInternal = "__log__grafana_internal__"
const logStreamIdentifierInternal = "__logstream__grafana_internal__"
|
|
|
|
func (e *CloudWatchExecutor) getLogsClient(region string) (*cloudwatchlogs.CloudWatchLogs, error) {
|
|
e.mux.Lock()
|
|
defer e.mux.Unlock()
|
|
|
|
if logsClient, ok := e.logsClientsByRegion[region]; ok {
|
|
return logsClient, nil
|
|
}
|
|
|
|
dsInfo := retrieveDsInfo(e.DataSource, region)
|
|
newLogsClient, err := retrieveLogsClient(dsInfo)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
e.logsClientsByRegion[region] = newLogsClient
|
|
|
|
return newLogsClient, nil
|
|
}
|
|
|
|
func NewCloudWatchExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
|
|
dsInfo := retrieveDsInfo(datasource, "default")
|
|
defaultLogsClient, err := retrieveLogsClient(dsInfo)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
logsClientsByRegion := make(map[string](*cloudwatchlogs.CloudWatchLogs))
|
|
logsClientsByRegion[dsInfo.Region] = defaultLogsClient
|
|
logsClientsByRegion["default"] = defaultLogsClient
|
|
|
|
return &CloudWatchExecutor{
|
|
logsClientsByRegion: logsClientsByRegion,
|
|
}, nil
|
|
}
|
|
|
|
var (
	// plog is the package logger for the CloudWatch tsdb backend.
	plog log.Logger
	// aliasFormat matches {{ ... }} placeholders in metric alias templates.
	aliasFormat *regexp.Regexp
)
|
|
|
|
// init registers the CloudWatch executor with the tsdb query registry and
// initializes the package-level logger and alias regexp.
func init() {
	plog = log.New("tsdb.cloudwatch")
	tsdb.RegisterTsdbQueryEndpoint("cloudwatch", NewCloudWatchExecutor)
	aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
}
|
|
|
|
func (e *CloudWatchExecutor) alertQuery(ctx context.Context, logsClient *cloudwatchlogs.CloudWatchLogs, queryContext *tsdb.TsdbQuery) (*cloudwatchlogs.GetQueryResultsOutput, error) {
|
|
const maxAttempts = 8
|
|
const pollPeriod = 1000 * time.Millisecond
|
|
|
|
queryParams := queryContext.Queries[0].Model
|
|
startQueryOutput, err := e.executeStartQuery(ctx, logsClient, queryParams, queryContext.TimeRange)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
requestParams := simplejson.NewFromAny(map[string]interface{}{
|
|
"region": queryParams.Get("region").MustString(""),
|
|
"queryId": *startQueryOutput.QueryId,
|
|
})
|
|
|
|
ticker := time.NewTicker(pollPeriod)
|
|
defer ticker.Stop()
|
|
|
|
attemptCount := 1
|
|
for range ticker.C {
|
|
if res, err := e.executeGetQueryResults(ctx, logsClient, requestParams); err != nil {
|
|
return nil, err
|
|
} else if isTerminated(*res.Status) {
|
|
return res, err
|
|
} else if attemptCount >= maxAttempts {
|
|
return res, fmt.Errorf("fetching of query results exceeded max number of attempts")
|
|
}
|
|
|
|
attemptCount++
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
func (e *CloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSource, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
|
|
e.DataSource = dsInfo
|
|
|
|
/*
|
|
Unlike many other data sources, with Cloudwatch Logs query requests don't receive the results as the response to the query, but rather
|
|
an ID is first returned. Following this, a client is expected to send requests along with the ID until the status of the query is complete,
|
|
receiving (possibly partial) results each time. For queries made via dashboards and Explore, the logic of making these repeated queries is handled on
|
|
the frontend, but because alerts are executed on the backend the logic needs to be reimplemented here.
|
|
*/
|
|
queryParams := queryContext.Queries[0].Model
|
|
_, fromAlert := queryContext.Headers["FromAlert"]
|
|
isLogAlertQuery := fromAlert && queryParams.Get("mode").MustString("") == "Logs"
|
|
|
|
if isLogAlertQuery {
|
|
return e.executeLogAlertQuery(ctx, queryContext)
|
|
}
|
|
|
|
queryType := queryParams.Get("type").MustString("")
|
|
|
|
var err error
|
|
var result *tsdb.Response
|
|
switch queryType {
|
|
case "metricFindQuery":
|
|
result, err = e.executeMetricFindQuery(ctx, queryContext)
|
|
case "annotationQuery":
|
|
result, err = e.executeAnnotationQuery(ctx, queryContext)
|
|
case "logAction":
|
|
result, err = e.executeLogActions(ctx, queryContext)
|
|
case "timeSeriesQuery":
|
|
fallthrough
|
|
default:
|
|
result, err = e.executeTimeSeriesQuery(ctx, queryContext)
|
|
}
|
|
|
|
return result, err
|
|
}
|
|
|
|
// executeLogAlertQuery runs a CloudWatch Logs Insights query on behalf of a
// backend alert evaluation and returns its results as a single Arrow-encoded
// dataframe under RefId "A".
func (e *CloudWatchExecutor) executeLogAlertQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
	queryParams := queryContext.Queries[0].Model
	queryParams.Set("subtype", "StartQuery")
	queryParams.Set("queryString", queryParams.Get("expression").MustString(""))

	// Resolve the "default" placeholder to the data source's configured
	// region so the correct regional logs client is selected.
	region := queryParams.Get("region").MustString("default")
	if region == "default" {
		region = e.DataSource.JsonData.Get("defaultRegion").MustString()
		queryParams.Set("region", region)
	}

	logsClient, err := e.getLogsClient(region)
	if err != nil {
		return nil, err
	}

	// NOTE(review): alertQuery below also calls executeStartQuery, so this
	// appears to start the same Insights query twice, orphaning this first
	// one. Confirm whether the queryId written into the model here is
	// actually consumed anywhere before removing either call.
	result, err := e.executeStartQuery(ctx, logsClient, queryParams, queryContext.TimeRange)
	if err != nil {
		return nil, err
	}

	queryParams.Set("queryId", *result.QueryId)

	// Get query results
	getQueryResultsOutput, err := e.alertQuery(ctx, logsClient, queryContext)
	if err != nil {
		return nil, err
	}

	dataframe, err := queryResultsToDataframe(getQueryResultsOutput)
	if err != nil {
		return nil, err
	}

	// Alert evaluation consumes dataframes in the Arrow wire format.
	dataframeEnc, err := dataframe.MarshalArrow()
	if err != nil {
		return nil, err
	}

	response := &tsdb.Response{
		Results: map[string]*tsdb.QueryResult{
			"A": {
				RefId: "A",
				Dataframes: [][]byte{dataframeEnc},
			},
		},
	}
	return response, nil
}
|
|
|
|
func queryResultsToDataframe(results *cloudwatchlogs.GetQueryResultsOutput) (*data.Frame, error) {
|
|
rowCount := len(results.Results)
|
|
fieldValues := make(map[string]interface{})
|
|
for i, row := range results.Results {
|
|
for _, resultField := range row {
|
|
// Strip @ptr field from results as it's not needed
|
|
if *resultField.Field == "@ptr" {
|
|
continue
|
|
}
|
|
|
|
if _, exists := fieldValues[*resultField.Field]; !exists {
|
|
if _, err := time.Parse(cloudWatchTSFormat, *resultField.Value); err == nil {
|
|
fieldValues[*resultField.Field] = make([]*time.Time, rowCount)
|
|
} else if _, err := strconv.ParseFloat(*resultField.Value, 64); err == nil {
|
|
fieldValues[*resultField.Field] = make([]*float64, rowCount)
|
|
} else {
|
|
continue
|
|
}
|
|
}
|
|
|
|
if timeField, ok := fieldValues[*resultField.Field].([]*time.Time); ok {
|
|
parsedTime, err := time.Parse(cloudWatchTSFormat, *resultField.Value)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
timeField[i] = &parsedTime
|
|
} else if numericField, ok := fieldValues[*resultField.Field].([]*float64); ok {
|
|
parsedFloat, err := strconv.ParseFloat(*resultField.Value, 64)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
numericField[i] = &parsedFloat
|
|
}
|
|
}
|
|
}
|
|
|
|
newFields := make([]*data.Field, 0)
|
|
for fieldName, vals := range fieldValues {
|
|
newFields = append(newFields, data.NewField(fieldName, nil, vals))
|
|
|
|
if fieldName == "@timestamp" {
|
|
newFields[len(newFields)-1].SetConfig(&data.FieldConfig{Title: "Time"})
|
|
}
|
|
}
|
|
|
|
frame := data.NewFrame("CloudWatchLogsResponse", newFields...)
|
|
return frame, nil
|
|
}
|
|
|
|
// isTerminated reports whether a CloudWatch Logs Insights query status string
// denotes a terminal state, i.e. one where no further polling is needed.
func isTerminated(queryStatus string) bool {
	switch queryStatus {
	case "Complete", "Cancelled", "Failed", "Timeout":
		return true
	default:
		return false
	}
}
|