mirror of
https://github.com/grafana/grafana.git
synced 2025-02-16 02:23:31 -06:00
237 lines
6.0 KiB
Go
237 lines
6.0 KiB
Go
package cloudwatch
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
|
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
|
|
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
"github.com/grafana/grafana/pkg/models"
|
|
"github.com/grafana/grafana/pkg/tsdb"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
type CloudWatchExecutor struct {
|
|
*models.DataSource
|
|
ec2Svc ec2iface.EC2API
|
|
rgtaSvc resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI
|
|
}
|
|
|
|
type DatasourceInfo struct {
|
|
Profile string
|
|
Region string
|
|
AuthType string
|
|
AssumeRoleArn string
|
|
Namespace string
|
|
|
|
AccessKey string
|
|
SecretKey string
|
|
}
|
|
|
|
func NewCloudWatchExecutor(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
|
|
return &CloudWatchExecutor{}, nil
|
|
}
|
|
|
|
var (
|
|
plog log.Logger
|
|
standardStatistics map[string]bool
|
|
aliasFormat *regexp.Regexp
|
|
)
|
|
|
|
func init() {
|
|
plog = log.New("tsdb.cloudwatch")
|
|
tsdb.RegisterTsdbQueryEndpoint("cloudwatch", NewCloudWatchExecutor)
|
|
standardStatistics = map[string]bool{
|
|
"Average": true,
|
|
"Maximum": true,
|
|
"Minimum": true,
|
|
"Sum": true,
|
|
"SampleCount": true,
|
|
}
|
|
aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
|
|
}
|
|
|
|
func (e *CloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSource, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
|
|
var result *tsdb.Response
|
|
e.DataSource = dsInfo
|
|
queryType := queryContext.Queries[0].Model.Get("type").MustString("")
|
|
var err error
|
|
|
|
switch queryType {
|
|
case "metricFindQuery":
|
|
result, err = e.executeMetricFindQuery(ctx, queryContext)
|
|
case "annotationQuery":
|
|
result, err = e.executeAnnotationQuery(ctx, queryContext)
|
|
case "timeSeriesQuery":
|
|
fallthrough
|
|
default:
|
|
result, err = e.executeTimeSeriesQuery(ctx, queryContext)
|
|
}
|
|
|
|
return result, err
|
|
}
|
|
|
|
func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
|
|
results := &tsdb.Response{
|
|
Results: make(map[string]*tsdb.QueryResult),
|
|
}
|
|
resultChan := make(chan *tsdb.QueryResult, len(queryContext.Queries))
|
|
|
|
eg, ectx := errgroup.WithContext(ctx)
|
|
|
|
getMetricDataQueries := make(map[string]map[string]*CloudWatchQuery)
|
|
for i, model := range queryContext.Queries {
|
|
queryType := model.Model.Get("type").MustString()
|
|
if queryType != "timeSeriesQuery" && queryType != "" {
|
|
continue
|
|
}
|
|
|
|
RefId := queryContext.Queries[i].RefId
|
|
query, err := parseQuery(queryContext.Queries[i].Model)
|
|
if err != nil {
|
|
results.Results[RefId] = &tsdb.QueryResult{
|
|
Error: err,
|
|
}
|
|
return results, nil
|
|
}
|
|
query.RefId = RefId
|
|
|
|
if query.Id != "" {
|
|
if _, ok := getMetricDataQueries[query.Region]; !ok {
|
|
getMetricDataQueries[query.Region] = make(map[string]*CloudWatchQuery)
|
|
}
|
|
getMetricDataQueries[query.Region][query.Id] = query
|
|
continue
|
|
}
|
|
|
|
if query.Id == "" && query.Expression != "" {
|
|
results.Results[query.RefId] = &tsdb.QueryResult{
|
|
Error: fmt.Errorf("Invalid query: id should be set if using expression"),
|
|
}
|
|
return results, nil
|
|
}
|
|
|
|
eg.Go(func() error {
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
plog.Error("Execute Query Panic", "error", err, "stack", log.Stack(1))
|
|
if theErr, ok := err.(error); ok {
|
|
resultChan <- &tsdb.QueryResult{
|
|
RefId: query.RefId,
|
|
Error: theErr,
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
queryRes, err := e.executeQuery(ectx, query, queryContext)
|
|
if ae, ok := err.(awserr.Error); ok && ae.Code() == "500" {
|
|
return err
|
|
}
|
|
if err != nil {
|
|
resultChan <- &tsdb.QueryResult{
|
|
RefId: query.RefId,
|
|
Error: err,
|
|
}
|
|
return nil
|
|
}
|
|
resultChan <- queryRes
|
|
return nil
|
|
})
|
|
}
|
|
|
|
if len(getMetricDataQueries) > 0 {
|
|
for region, getMetricDataQuery := range getMetricDataQueries {
|
|
q := getMetricDataQuery
|
|
eg.Go(func() error {
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
plog.Error("Execute Get Metric Data Query Panic", "error", err, "stack", log.Stack(1))
|
|
if theErr, ok := err.(error); ok {
|
|
resultChan <- &tsdb.QueryResult{
|
|
Error: theErr,
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
queryResponses, err := e.executeGetMetricDataQuery(ectx, region, q, queryContext)
|
|
if ae, ok := err.(awserr.Error); ok && ae.Code() == "500" {
|
|
return err
|
|
}
|
|
for _, queryRes := range queryResponses {
|
|
if err != nil {
|
|
queryRes.Error = err
|
|
}
|
|
resultChan <- queryRes
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
}
|
|
|
|
if err := eg.Wait(); err != nil {
|
|
return nil, err
|
|
}
|
|
close(resultChan)
|
|
for result := range resultChan {
|
|
results.Results[result.RefId] = result
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
func formatAlias(query *CloudWatchQuery, stat string, dimensions map[string]string, label string) string {
|
|
region := query.Region
|
|
namespace := query.Namespace
|
|
metricName := query.MetricName
|
|
period := strconv.Itoa(query.Period)
|
|
if len(query.Id) > 0 && len(query.Expression) > 0 {
|
|
if strings.Index(query.Expression, "SEARCH(") == 0 {
|
|
pIndex := strings.LastIndex(query.Expression, ",")
|
|
period = strings.Trim(query.Expression[pIndex+1:], " )")
|
|
sIndex := strings.LastIndex(query.Expression[:pIndex], ",")
|
|
stat = strings.Trim(query.Expression[sIndex+1:pIndex], " '")
|
|
} else if len(query.Alias) > 0 {
|
|
// expand by Alias
|
|
} else {
|
|
return query.Id
|
|
}
|
|
}
|
|
|
|
data := map[string]string{}
|
|
data["region"] = region
|
|
data["namespace"] = namespace
|
|
data["metric"] = metricName
|
|
data["stat"] = stat
|
|
data["period"] = period
|
|
if len(label) != 0 {
|
|
data["label"] = label
|
|
}
|
|
for k, v := range dimensions {
|
|
data[k] = v
|
|
}
|
|
|
|
result := aliasFormat.ReplaceAllFunc([]byte(query.Alias), func(in []byte) []byte {
|
|
labelName := strings.Replace(string(in), "{{", "", 1)
|
|
labelName = strings.Replace(labelName, "}}", "", 1)
|
|
labelName = strings.TrimSpace(labelName)
|
|
if val, exists := data[labelName]; exists {
|
|
return []byte(val)
|
|
}
|
|
|
|
return in
|
|
})
|
|
|
|
if string(result) == "" {
|
|
return metricName + "_" + stat
|
|
}
|
|
|
|
return string(result)
|
|
}
|