mirror of
https://github.com/grafana/grafana.git
synced 2024-11-29 20:24:18 -06:00
3ef06a0c88
Co-authored by: Arve Knudsen
233 lines
6.5 KiB
Go
233 lines
6.5 KiB
Go
package cloudwatch
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"regexp"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
|
|
|
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
|
|
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
|
|
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
|
|
"github.com/grafana/grafana/pkg/components/simplejson"
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
"github.com/grafana/grafana/pkg/models"
|
|
"github.com/grafana/grafana/pkg/tsdb"
|
|
)
|
|
|
|
type CloudWatchExecutor struct {
|
|
*models.DataSource
|
|
ec2Svc ec2iface.EC2API
|
|
rgtaSvc resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI
|
|
|
|
logsClientsByRegion map[string](*cloudwatchlogs.CloudWatchLogs)
|
|
mux sync.Mutex
|
|
}
|
|
|
|
type DatasourceInfo struct {
|
|
Profile string
|
|
Region string
|
|
AuthType string
|
|
AssumeRoleArn string
|
|
ExternalID string
|
|
Namespace string
|
|
|
|
AccessKey string
|
|
SecretKey string
|
|
}
|
|
|
|
const cloudWatchTSFormat = "2006-01-02 15:04:05.000"
|
|
|
|
// Constants also defined in datasource/cloudwatch/datasource.ts
|
|
const logIdentifierInternal = "__log__grafana_internal__"
|
|
const logStreamIdentifierInternal = "__logstream__grafana_internal__"
|
|
|
|
func (e *CloudWatchExecutor) getLogsClient(region string) (*cloudwatchlogs.CloudWatchLogs, error) {
|
|
e.mux.Lock()
|
|
defer e.mux.Unlock()
|
|
|
|
if logsClient, ok := e.logsClientsByRegion[region]; ok {
|
|
return logsClient, nil
|
|
}
|
|
|
|
dsInfo := retrieveDsInfo(e.DataSource, region)
|
|
newLogsClient, err := retrieveLogsClient(dsInfo)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
e.logsClientsByRegion[region] = newLogsClient
|
|
|
|
return newLogsClient, nil
|
|
}
|
|
|
|
func NewCloudWatchExecutor(datasource *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
|
|
dsInfo := retrieveDsInfo(datasource, "default")
|
|
defaultLogsClient, err := retrieveLogsClient(dsInfo)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
logsClientsByRegion := make(map[string](*cloudwatchlogs.CloudWatchLogs))
|
|
logsClientsByRegion[dsInfo.Region] = defaultLogsClient
|
|
logsClientsByRegion["default"] = defaultLogsClient
|
|
|
|
return &CloudWatchExecutor{
|
|
logsClientsByRegion: logsClientsByRegion,
|
|
}, nil
|
|
}
|
|
|
|
var plog = log.New("tsdb.cloudwatch")
|
|
var aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
|
|
|
|
func init() {
|
|
tsdb.RegisterTsdbQueryEndpoint("cloudwatch", NewCloudWatchExecutor)
|
|
}
|
|
|
|
func (e *CloudWatchExecutor) alertQuery(ctx context.Context, logsClient *cloudwatchlogs.CloudWatchLogs, queryContext *tsdb.TsdbQuery) (*cloudwatchlogs.GetQueryResultsOutput, error) {
|
|
const maxAttempts = 8
|
|
const pollPeriod = 1000 * time.Millisecond
|
|
|
|
queryParams := queryContext.Queries[0].Model
|
|
startQueryOutput, err := e.executeStartQuery(ctx, logsClient, queryParams, queryContext.TimeRange)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
requestParams := simplejson.NewFromAny(map[string]interface{}{
|
|
"region": queryParams.Get("region").MustString(""),
|
|
"queryId": *startQueryOutput.QueryId,
|
|
})
|
|
|
|
ticker := time.NewTicker(pollPeriod)
|
|
defer ticker.Stop()
|
|
|
|
attemptCount := 1
|
|
for range ticker.C {
|
|
res, err := e.executeGetQueryResults(ctx, logsClient, requestParams)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if isTerminated(*res.Status) {
|
|
return res, err
|
|
}
|
|
if attemptCount >= maxAttempts {
|
|
return res, fmt.Errorf("fetching of query results exceeded max number of attempts")
|
|
}
|
|
|
|
attemptCount++
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
func (e *CloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSource, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
|
|
e.DataSource = dsInfo
|
|
|
|
/*
|
|
Unlike many other data sources, with Cloudwatch Logs query requests don't receive the results as the response to the query, but rather
|
|
an ID is first returned. Following this, a client is expected to send requests along with the ID until the status of the query is complete,
|
|
receiving (possibly partial) results each time. For queries made via dashboards and Explore, the logic of making these repeated queries is handled on
|
|
the frontend, but because alerts are executed on the backend the logic needs to be reimplemented here.
|
|
*/
|
|
queryParams := queryContext.Queries[0].Model
|
|
_, fromAlert := queryContext.Headers["FromAlert"]
|
|
isLogAlertQuery := fromAlert && queryParams.Get("queryMode").MustString("") == "Logs"
|
|
|
|
if isLogAlertQuery {
|
|
return e.executeLogAlertQuery(ctx, queryContext)
|
|
}
|
|
|
|
queryType := queryParams.Get("type").MustString("")
|
|
|
|
var err error
|
|
var result *tsdb.Response
|
|
switch queryType {
|
|
case "metricFindQuery":
|
|
result, err = e.executeMetricFindQuery(ctx, queryContext)
|
|
case "annotationQuery":
|
|
result, err = e.executeAnnotationQuery(ctx, queryContext)
|
|
case "logAction":
|
|
result, err = e.executeLogActions(ctx, queryContext)
|
|
case "timeSeriesQuery":
|
|
fallthrough
|
|
default:
|
|
result, err = e.executeTimeSeriesQuery(ctx, queryContext)
|
|
}
|
|
|
|
return result, err
|
|
}
|
|
|
|
func (e *CloudWatchExecutor) executeLogAlertQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
|
|
queryParams := queryContext.Queries[0].Model
|
|
queryParams.Set("subtype", "StartQuery")
|
|
queryParams.Set("queryString", queryParams.Get("expression").MustString(""))
|
|
|
|
region := queryParams.Get("region").MustString("default")
|
|
if region == "default" {
|
|
region = e.DataSource.JsonData.Get("defaultRegion").MustString()
|
|
queryParams.Set("region", region)
|
|
}
|
|
|
|
logsClient, err := e.getLogsClient(region)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result, err := e.executeStartQuery(ctx, logsClient, queryParams, queryContext.TimeRange)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
queryParams.Set("queryId", *result.QueryId)
|
|
|
|
// Get query results
|
|
getQueryResultsOutput, err := e.alertQuery(ctx, logsClient, queryContext)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dataframe, err := logsResultsToDataframes(getQueryResultsOutput)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
statsGroups := queryParams.Get("statsGroups").MustStringArray()
|
|
if len(statsGroups) > 0 && len(dataframe.Fields) > 0 {
|
|
groupedFrames, err := groupResults(dataframe, statsGroups)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response := &tsdb.Response{
|
|
Results: make(map[string]*tsdb.QueryResult),
|
|
}
|
|
|
|
response.Results["A"] = &tsdb.QueryResult{
|
|
RefId: "A",
|
|
Dataframes: tsdb.NewDecodedDataFrames(groupedFrames),
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
response := &tsdb.Response{
|
|
Results: map[string]*tsdb.QueryResult{
|
|
"A": {
|
|
RefId: "A",
|
|
Dataframes: tsdb.NewDecodedDataFrames(data.Frames{dataframe}),
|
|
},
|
|
},
|
|
}
|
|
return response, nil
|
|
}
|
|
|
|
func isTerminated(queryStatus string) bool {
|
|
return queryStatus == "Complete" || queryStatus == "Cancelled" || queryStatus == "Failed" || queryStatus == "Timeout"
|
|
}
|