mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
CloudWatch: GetMetricData refactoring & fix label handling (#16383)
* fix label handling * SEARCH() results could have multiple namespace, remove from legend * divide GetMetricStatistics related code * divide GetMetricData related code * divide parseGetMetricDataResponse() * divide parseGetMetricDataQuery() * divide test code * add test for GetMetricData * add test for GetMetricData parse response * fix bug of terminating gap * fix gofmt
This commit is contained in:
committed by
Torkel Ödegaard
parent
b37ee65bd3
commit
864a6262fe
@@ -2,29 +2,18 @@ package cloudwatch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/log"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
||||
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
|
||||
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
|
||||
"github.com/grafana/grafana/pkg/components/null"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/infra/metrics"
|
||||
"github.com/grafana/grafana/pkg/log"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type CloudWatchExecutor struct {
|
||||
@@ -197,340 +186,33 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func (e *CloudWatchExecutor) executeQuery(ctx context.Context, query *CloudWatchQuery, queryContext *tsdb.TsdbQuery) (*tsdb.QueryResult, error) {
|
||||
client, err := e.getClient(query.Region)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
startTime, err := queryContext.TimeRange.ParseFrom()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
endTime, err := queryContext.TimeRange.ParseTo()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !startTime.Before(endTime) {
|
||||
return nil, fmt.Errorf("Invalid time range: Start time must be before end time")
|
||||
}
|
||||
|
||||
params := &cloudwatch.GetMetricStatisticsInput{
|
||||
Namespace: aws.String(query.Namespace),
|
||||
MetricName: aws.String(query.MetricName),
|
||||
Dimensions: query.Dimensions,
|
||||
Period: aws.Int64(int64(query.Period)),
|
||||
}
|
||||
if len(query.Statistics) > 0 {
|
||||
params.Statistics = query.Statistics
|
||||
}
|
||||
if len(query.ExtendedStatistics) > 0 {
|
||||
params.ExtendedStatistics = query.ExtendedStatistics
|
||||
}
|
||||
|
||||
// 1 minutes resolution metrics is stored for 15 days, 15 * 24 * 60 = 21600
|
||||
if query.HighResolution && (((endTime.Unix() - startTime.Unix()) / int64(query.Period)) > 21600) {
|
||||
return nil, errors.New("too long query period")
|
||||
}
|
||||
var resp *cloudwatch.GetMetricStatisticsOutput
|
||||
for startTime.Before(endTime) {
|
||||
params.StartTime = aws.Time(startTime)
|
||||
if query.HighResolution {
|
||||
startTime = startTime.Add(time.Duration(1440*query.Period) * time.Second)
|
||||
} else {
|
||||
startTime = endTime
|
||||
}
|
||||
params.EndTime = aws.Time(startTime)
|
||||
|
||||
if setting.Env == setting.DEV {
|
||||
plog.Debug("CloudWatch query", "raw query", params)
|
||||
}
|
||||
|
||||
partResp, err := client.GetMetricStatisticsWithContext(ctx, params, request.WithResponseReadTimeout(10*time.Second))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if resp != nil {
|
||||
resp.Datapoints = append(resp.Datapoints, partResp.Datapoints...)
|
||||
} else {
|
||||
resp = partResp
|
||||
|
||||
}
|
||||
metrics.M_Aws_CloudWatch_GetMetricStatistics.Inc()
|
||||
}
|
||||
|
||||
queryRes, err := parseResponse(resp, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return queryRes, nil
|
||||
}
|
||||
|
||||
func (e *CloudWatchExecutor) executeGetMetricDataQuery(ctx context.Context, region string, queries map[string]*CloudWatchQuery, queryContext *tsdb.TsdbQuery) ([]*tsdb.QueryResult, error) {
|
||||
queryResponses := make([]*tsdb.QueryResult, 0)
|
||||
|
||||
// validate query
|
||||
for _, query := range queries {
|
||||
if !(len(query.Statistics) == 1 && len(query.ExtendedStatistics) == 0) &&
|
||||
!(len(query.Statistics) == 0 && len(query.ExtendedStatistics) == 1) {
|
||||
return queryResponses, errors.New("Statistics count should be 1")
|
||||
}
|
||||
}
|
||||
|
||||
client, err := e.getClient(region)
|
||||
if err != nil {
|
||||
return queryResponses, err
|
||||
}
|
||||
|
||||
startTime, err := queryContext.TimeRange.ParseFrom()
|
||||
if err != nil {
|
||||
return queryResponses, err
|
||||
}
|
||||
|
||||
endTime, err := queryContext.TimeRange.ParseTo()
|
||||
if err != nil {
|
||||
return queryResponses, err
|
||||
}
|
||||
|
||||
params := &cloudwatch.GetMetricDataInput{
|
||||
StartTime: aws.Time(startTime),
|
||||
EndTime: aws.Time(endTime),
|
||||
ScanBy: aws.String("TimestampAscending"),
|
||||
}
|
||||
for _, query := range queries {
|
||||
// 1 minutes resolution metrics is stored for 15 days, 15 * 24 * 60 = 21600
|
||||
if query.HighResolution && (((endTime.Unix() - startTime.Unix()) / int64(query.Period)) > 21600) {
|
||||
return queryResponses, errors.New("too long query period")
|
||||
}
|
||||
|
||||
mdq := &cloudwatch.MetricDataQuery{
|
||||
Id: aws.String(query.Id),
|
||||
ReturnData: aws.Bool(query.ReturnData),
|
||||
}
|
||||
if query.Expression != "" {
|
||||
mdq.Expression = aws.String(query.Expression)
|
||||
} else {
|
||||
mdq.MetricStat = &cloudwatch.MetricStat{
|
||||
Metric: &cloudwatch.Metric{
|
||||
Namespace: aws.String(query.Namespace),
|
||||
MetricName: aws.String(query.MetricName),
|
||||
},
|
||||
Period: aws.Int64(int64(query.Period)),
|
||||
}
|
||||
for _, d := range query.Dimensions {
|
||||
mdq.MetricStat.Metric.Dimensions = append(mdq.MetricStat.Metric.Dimensions,
|
||||
&cloudwatch.Dimension{
|
||||
Name: d.Name,
|
||||
Value: d.Value,
|
||||
})
|
||||
}
|
||||
if len(query.Statistics) == 1 {
|
||||
mdq.MetricStat.Stat = query.Statistics[0]
|
||||
} else {
|
||||
mdq.MetricStat.Stat = query.ExtendedStatistics[0]
|
||||
}
|
||||
}
|
||||
params.MetricDataQueries = append(params.MetricDataQueries, mdq)
|
||||
}
|
||||
|
||||
nextToken := ""
|
||||
mdr := make(map[string]*cloudwatch.MetricDataResult)
|
||||
for {
|
||||
if nextToken != "" {
|
||||
params.NextToken = aws.String(nextToken)
|
||||
}
|
||||
resp, err := client.GetMetricDataWithContext(ctx, params)
|
||||
if err != nil {
|
||||
return queryResponses, err
|
||||
}
|
||||
metrics.M_Aws_CloudWatch_GetMetricData.Add(float64(len(params.MetricDataQueries)))
|
||||
|
||||
for _, r := range resp.MetricDataResults {
|
||||
if _, ok := mdr[*r.Id]; !ok {
|
||||
mdr[*r.Id] = r
|
||||
} else {
|
||||
mdr[*r.Id].Timestamps = append(mdr[*r.Id].Timestamps, r.Timestamps...)
|
||||
mdr[*r.Id].Values = append(mdr[*r.Id].Values, r.Values...)
|
||||
}
|
||||
}
|
||||
|
||||
if resp.NextToken == nil || *resp.NextToken == "" {
|
||||
break
|
||||
}
|
||||
nextToken = *resp.NextToken
|
||||
}
|
||||
|
||||
for i, r := range mdr {
|
||||
if *r.StatusCode != "Complete" {
|
||||
return queryResponses, fmt.Errorf("Part of query is failed: %s", *r.StatusCode)
|
||||
}
|
||||
|
||||
queryRes := tsdb.NewQueryResult()
|
||||
queryRes.RefId = queries[i].RefId
|
||||
query := queries[*r.Id]
|
||||
|
||||
series := tsdb.TimeSeries{
|
||||
Tags: map[string]string{},
|
||||
Points: make([]tsdb.TimePoint, 0),
|
||||
}
|
||||
for _, d := range query.Dimensions {
|
||||
series.Tags[*d.Name] = *d.Value
|
||||
}
|
||||
s := ""
|
||||
if len(query.Statistics) == 1 {
|
||||
s = *query.Statistics[0]
|
||||
} else {
|
||||
s = *query.ExtendedStatistics[0]
|
||||
}
|
||||
series.Name = formatAlias(query, s, series.Tags)
|
||||
|
||||
for j, t := range r.Timestamps {
|
||||
expectedTimestamp := r.Timestamps[j].Add(time.Duration(query.Period) * time.Second)
|
||||
if j > 0 && expectedTimestamp.Before(*t) {
|
||||
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFromPtr(nil), float64(expectedTimestamp.Unix()*1000)))
|
||||
}
|
||||
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(*r.Values[j]), float64((*t).Unix())*1000))
|
||||
}
|
||||
|
||||
queryRes.Series = append(queryRes.Series, &series)
|
||||
queryRes.Meta = simplejson.New()
|
||||
queryResponses = append(queryResponses, queryRes)
|
||||
}
|
||||
|
||||
return queryResponses, nil
|
||||
}
|
||||
|
||||
func parseDimensions(model *simplejson.Json) ([]*cloudwatch.Dimension, error) {
|
||||
var result []*cloudwatch.Dimension
|
||||
|
||||
for k, v := range model.Get("dimensions").MustMap() {
|
||||
kk := k
|
||||
if vv, ok := v.(string); ok {
|
||||
result = append(result, &cloudwatch.Dimension{
|
||||
Name: &kk,
|
||||
Value: &vv,
|
||||
})
|
||||
} else {
|
||||
return nil, errors.New("failed to parse")
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(result, func(i, j int) bool {
|
||||
return *result[i].Name < *result[j].Name
|
||||
})
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func parseStatistics(model *simplejson.Json) ([]string, []string, error) {
|
||||
var statistics []string
|
||||
var extendedStatistics []string
|
||||
|
||||
for _, s := range model.Get("statistics").MustArray() {
|
||||
if ss, ok := s.(string); ok {
|
||||
if _, isStandard := standardStatistics[ss]; isStandard {
|
||||
statistics = append(statistics, ss)
|
||||
} else {
|
||||
extendedStatistics = append(extendedStatistics, ss)
|
||||
}
|
||||
} else {
|
||||
return nil, nil, errors.New("failed to parse")
|
||||
}
|
||||
}
|
||||
|
||||
return statistics, extendedStatistics, nil
|
||||
}
|
||||
|
||||
func parseQuery(model *simplejson.Json) (*CloudWatchQuery, error) {
|
||||
region, err := model.Get("region").String()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
namespace, err := model.Get("namespace").String()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
metricName, err := model.Get("metricName").String()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
id := model.Get("id").MustString("")
|
||||
expression := model.Get("expression").MustString("")
|
||||
|
||||
dimensions, err := parseDimensions(model)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
statistics, extendedStatistics, err := parseStatistics(model)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p := model.Get("period").MustString("")
|
||||
if p == "" {
|
||||
if namespace == "AWS/EC2" {
|
||||
p = "300"
|
||||
} else {
|
||||
p = "60"
|
||||
}
|
||||
}
|
||||
|
||||
var period int
|
||||
if regexp.MustCompile(`^\d+$`).Match([]byte(p)) {
|
||||
period, err = strconv.Atoi(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
d, err := time.ParseDuration(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
period = int(d.Seconds())
|
||||
}
|
||||
|
||||
alias := model.Get("alias").MustString()
|
||||
|
||||
returnData := model.Get("returnData").MustBool(false)
|
||||
highResolution := model.Get("highResolution").MustBool(false)
|
||||
|
||||
return &CloudWatchQuery{
|
||||
Region: region,
|
||||
Namespace: namespace,
|
||||
MetricName: metricName,
|
||||
Dimensions: dimensions,
|
||||
Statistics: aws.StringSlice(statistics),
|
||||
ExtendedStatistics: aws.StringSlice(extendedStatistics),
|
||||
Period: period,
|
||||
Alias: alias,
|
||||
Id: id,
|
||||
Expression: expression,
|
||||
ReturnData: returnData,
|
||||
HighResolution: highResolution,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func formatAlias(query *CloudWatchQuery, stat string, dimensions map[string]string) string {
|
||||
func formatAlias(query *CloudWatchQuery, stat string, dimensions map[string]string, label string) string {
|
||||
region := query.Region
|
||||
namespace := query.Namespace
|
||||
metricName := query.MetricName
|
||||
period := strconv.Itoa(query.Period)
|
||||
if len(query.Id) > 0 && len(query.Expression) > 0 {
|
||||
if len(query.Alias) > 0 {
|
||||
return query.Alias
|
||||
if strings.Index(query.Expression, "SEARCH(") == 0 {
|
||||
pIndex := strings.LastIndex(query.Expression, ",")
|
||||
period = strings.Trim(query.Expression[pIndex+1:], " )")
|
||||
sIndex := strings.LastIndex(query.Expression[:pIndex], ",")
|
||||
stat = strings.Trim(query.Expression[sIndex+1:pIndex], " '")
|
||||
} else if len(query.Alias) > 0 {
|
||||
// expand by Alias
|
||||
} else {
|
||||
return query.Id
|
||||
}
|
||||
}
|
||||
|
||||
data := map[string]string{}
|
||||
data["region"] = query.Region
|
||||
data["namespace"] = query.Namespace
|
||||
data["metric"] = query.MetricName
|
||||
data["region"] = region
|
||||
data["namespace"] = namespace
|
||||
data["metric"] = metricName
|
||||
data["stat"] = stat
|
||||
data["period"] = strconv.Itoa(query.Period)
|
||||
data["period"] = period
|
||||
if len(label) != 0 {
|
||||
data["label"] = label
|
||||
}
|
||||
for k, v := range dimensions {
|
||||
data[k] = v
|
||||
}
|
||||
@@ -548,66 +230,3 @@ func formatAlias(query *CloudWatchQuery, stat string, dimensions map[string]stri
|
||||
|
||||
return string(result)
|
||||
}
|
||||
|
||||
func parseResponse(resp *cloudwatch.GetMetricStatisticsOutput, query *CloudWatchQuery) (*tsdb.QueryResult, error) {
|
||||
queryRes := tsdb.NewQueryResult()
|
||||
|
||||
queryRes.RefId = query.RefId
|
||||
var value float64
|
||||
for _, s := range append(query.Statistics, query.ExtendedStatistics...) {
|
||||
series := tsdb.TimeSeries{
|
||||
Tags: map[string]string{},
|
||||
Points: make([]tsdb.TimePoint, 0),
|
||||
}
|
||||
for _, d := range query.Dimensions {
|
||||
series.Tags[*d.Name] = *d.Value
|
||||
}
|
||||
series.Name = formatAlias(query, *s, series.Tags)
|
||||
|
||||
lastTimestamp := make(map[string]time.Time)
|
||||
sort.Slice(resp.Datapoints, func(i, j int) bool {
|
||||
return (*resp.Datapoints[i].Timestamp).Before(*resp.Datapoints[j].Timestamp)
|
||||
})
|
||||
for _, v := range resp.Datapoints {
|
||||
switch *s {
|
||||
case "Average":
|
||||
value = *v.Average
|
||||
case "Maximum":
|
||||
value = *v.Maximum
|
||||
case "Minimum":
|
||||
value = *v.Minimum
|
||||
case "Sum":
|
||||
value = *v.Sum
|
||||
case "SampleCount":
|
||||
value = *v.SampleCount
|
||||
default:
|
||||
if strings.Index(*s, "p") == 0 && v.ExtendedStatistics[*s] != nil {
|
||||
value = *v.ExtendedStatistics[*s]
|
||||
}
|
||||
}
|
||||
|
||||
// terminate gap of data points
|
||||
timestamp := *v.Timestamp
|
||||
if _, ok := lastTimestamp[*s]; ok {
|
||||
nextTimestampFromLast := lastTimestamp[*s].Add(time.Duration(query.Period) * time.Second)
|
||||
for timestamp.After(nextTimestampFromLast) {
|
||||
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFromPtr(nil), float64(nextTimestampFromLast.Unix()*1000)))
|
||||
nextTimestampFromLast = nextTimestampFromLast.Add(time.Duration(query.Period) * time.Second)
|
||||
}
|
||||
}
|
||||
lastTimestamp[*s] = timestamp
|
||||
|
||||
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(value), float64(timestamp.Unix()*1000)))
|
||||
}
|
||||
|
||||
queryRes.Series = append(queryRes.Series, &series)
|
||||
queryRes.Meta = simplejson.New()
|
||||
if len(resp.Datapoints) > 0 && resp.Datapoints[0].Unit != nil {
|
||||
if unit, ok := cloudwatchUnitMappings[*resp.Datapoints[0].Unit]; ok {
|
||||
queryRes.Meta.Set("unit", unit)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return queryRes, nil
|
||||
}
|
||||
|
||||
@@ -3,14 +3,10 @@ package cloudwatch
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
||||
"github.com/grafana/grafana/pkg/components/null"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
)
|
||||
@@ -35,175 +31,5 @@ func TestCloudWatch(t *testing.T) {
|
||||
So(err.Error(), ShouldEqual, "Invalid time range: Start time must be before end time")
|
||||
})
|
||||
})
|
||||
|
||||
Convey("can parse cloudwatch json model", func() {
|
||||
json := `
|
||||
{
|
||||
"region": "us-east-1",
|
||||
"namespace": "AWS/ApplicationELB",
|
||||
"metricName": "TargetResponseTime",
|
||||
"dimensions": {
|
||||
"LoadBalancer": "lb",
|
||||
"TargetGroup": "tg"
|
||||
},
|
||||
"statistics": [
|
||||
"Average",
|
||||
"Maximum",
|
||||
"p50.00",
|
||||
"p90.00"
|
||||
],
|
||||
"period": "60",
|
||||
"highResolution": false,
|
||||
"alias": "{{metric}}_{{stat}}"
|
||||
}
|
||||
`
|
||||
modelJson, err := simplejson.NewJson([]byte(json))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
res, err := parseQuery(modelJson)
|
||||
So(err, ShouldBeNil)
|
||||
So(res.Region, ShouldEqual, "us-east-1")
|
||||
So(res.Namespace, ShouldEqual, "AWS/ApplicationELB")
|
||||
So(res.MetricName, ShouldEqual, "TargetResponseTime")
|
||||
So(len(res.Dimensions), ShouldEqual, 2)
|
||||
So(*res.Dimensions[0].Name, ShouldEqual, "LoadBalancer")
|
||||
So(*res.Dimensions[0].Value, ShouldEqual, "lb")
|
||||
So(*res.Dimensions[1].Name, ShouldEqual, "TargetGroup")
|
||||
So(*res.Dimensions[1].Value, ShouldEqual, "tg")
|
||||
So(len(res.Statistics), ShouldEqual, 2)
|
||||
So(*res.Statistics[0], ShouldEqual, "Average")
|
||||
So(*res.Statistics[1], ShouldEqual, "Maximum")
|
||||
So(len(res.ExtendedStatistics), ShouldEqual, 2)
|
||||
So(*res.ExtendedStatistics[0], ShouldEqual, "p50.00")
|
||||
So(*res.ExtendedStatistics[1], ShouldEqual, "p90.00")
|
||||
So(res.Period, ShouldEqual, 60)
|
||||
So(res.Alias, ShouldEqual, "{{metric}}_{{stat}}")
|
||||
})
|
||||
|
||||
Convey("can parse cloudwatch response", func() {
|
||||
timestamp := time.Unix(0, 0)
|
||||
resp := &cloudwatch.GetMetricStatisticsOutput{
|
||||
Label: aws.String("TargetResponseTime"),
|
||||
Datapoints: []*cloudwatch.Datapoint{
|
||||
{
|
||||
Timestamp: aws.Time(timestamp),
|
||||
Average: aws.Float64(10.0),
|
||||
Maximum: aws.Float64(20.0),
|
||||
ExtendedStatistics: map[string]*float64{
|
||||
"p50.00": aws.Float64(30.0),
|
||||
"p90.00": aws.Float64(40.0),
|
||||
},
|
||||
Unit: aws.String("Seconds"),
|
||||
},
|
||||
},
|
||||
}
|
||||
query := &CloudWatchQuery{
|
||||
Region: "us-east-1",
|
||||
Namespace: "AWS/ApplicationELB",
|
||||
MetricName: "TargetResponseTime",
|
||||
Dimensions: []*cloudwatch.Dimension{
|
||||
{
|
||||
Name: aws.String("LoadBalancer"),
|
||||
Value: aws.String("lb"),
|
||||
},
|
||||
{
|
||||
Name: aws.String("TargetGroup"),
|
||||
Value: aws.String("tg"),
|
||||
},
|
||||
},
|
||||
Statistics: []*string{aws.String("Average"), aws.String("Maximum")},
|
||||
ExtendedStatistics: []*string{aws.String("p50.00"), aws.String("p90.00")},
|
||||
Period: 60,
|
||||
Alias: "{{namespace}}_{{metric}}_{{stat}}",
|
||||
}
|
||||
|
||||
queryRes, err := parseResponse(resp, query)
|
||||
So(err, ShouldBeNil)
|
||||
So(queryRes.Series[0].Name, ShouldEqual, "AWS/ApplicationELB_TargetResponseTime_Average")
|
||||
So(queryRes.Series[0].Tags["LoadBalancer"], ShouldEqual, "lb")
|
||||
So(queryRes.Series[0].Tags["TargetGroup"], ShouldEqual, "tg")
|
||||
So(queryRes.Series[0].Points[0][0].String(), ShouldEqual, null.FloatFrom(10.0).String())
|
||||
So(queryRes.Series[1].Points[0][0].String(), ShouldEqual, null.FloatFrom(20.0).String())
|
||||
So(queryRes.Series[2].Points[0][0].String(), ShouldEqual, null.FloatFrom(30.0).String())
|
||||
So(queryRes.Series[3].Points[0][0].String(), ShouldEqual, null.FloatFrom(40.0).String())
|
||||
So(queryRes.Meta.Get("unit").MustString(), ShouldEqual, "s")
|
||||
})
|
||||
|
||||
Convey("terminate gap of data points", func() {
|
||||
timestamp := time.Unix(0, 0)
|
||||
resp := &cloudwatch.GetMetricStatisticsOutput{
|
||||
Label: aws.String("TargetResponseTime"),
|
||||
Datapoints: []*cloudwatch.Datapoint{
|
||||
{
|
||||
Timestamp: aws.Time(timestamp),
|
||||
Average: aws.Float64(10.0),
|
||||
Maximum: aws.Float64(20.0),
|
||||
ExtendedStatistics: map[string]*float64{
|
||||
"p50.00": aws.Float64(30.0),
|
||||
"p90.00": aws.Float64(40.0),
|
||||
},
|
||||
Unit: aws.String("Seconds"),
|
||||
},
|
||||
{
|
||||
Timestamp: aws.Time(timestamp.Add(60 * time.Second)),
|
||||
Average: aws.Float64(20.0),
|
||||
Maximum: aws.Float64(30.0),
|
||||
ExtendedStatistics: map[string]*float64{
|
||||
"p50.00": aws.Float64(40.0),
|
||||
"p90.00": aws.Float64(50.0),
|
||||
},
|
||||
Unit: aws.String("Seconds"),
|
||||
},
|
||||
{
|
||||
Timestamp: aws.Time(timestamp.Add(180 * time.Second)),
|
||||
Average: aws.Float64(30.0),
|
||||
Maximum: aws.Float64(40.0),
|
||||
ExtendedStatistics: map[string]*float64{
|
||||
"p50.00": aws.Float64(50.0),
|
||||
"p90.00": aws.Float64(60.0),
|
||||
},
|
||||
Unit: aws.String("Seconds"),
|
||||
},
|
||||
},
|
||||
}
|
||||
query := &CloudWatchQuery{
|
||||
Region: "us-east-1",
|
||||
Namespace: "AWS/ApplicationELB",
|
||||
MetricName: "TargetResponseTime",
|
||||
Dimensions: []*cloudwatch.Dimension{
|
||||
{
|
||||
Name: aws.String("LoadBalancer"),
|
||||
Value: aws.String("lb"),
|
||||
},
|
||||
{
|
||||
Name: aws.String("TargetGroup"),
|
||||
Value: aws.String("tg"),
|
||||
},
|
||||
},
|
||||
Statistics: []*string{aws.String("Average"), aws.String("Maximum")},
|
||||
ExtendedStatistics: []*string{aws.String("p50.00"), aws.String("p90.00")},
|
||||
Period: 60,
|
||||
Alias: "{{namespace}}_{{metric}}_{{stat}}",
|
||||
}
|
||||
|
||||
queryRes, err := parseResponse(resp, query)
|
||||
So(err, ShouldBeNil)
|
||||
So(queryRes.Series[0].Points[0][0].String(), ShouldEqual, null.FloatFrom(10.0).String())
|
||||
So(queryRes.Series[1].Points[0][0].String(), ShouldEqual, null.FloatFrom(20.0).String())
|
||||
So(queryRes.Series[2].Points[0][0].String(), ShouldEqual, null.FloatFrom(30.0).String())
|
||||
So(queryRes.Series[3].Points[0][0].String(), ShouldEqual, null.FloatFrom(40.0).String())
|
||||
So(queryRes.Series[0].Points[1][0].String(), ShouldEqual, null.FloatFrom(20.0).String())
|
||||
So(queryRes.Series[1].Points[1][0].String(), ShouldEqual, null.FloatFrom(30.0).String())
|
||||
So(queryRes.Series[2].Points[1][0].String(), ShouldEqual, null.FloatFrom(40.0).String())
|
||||
So(queryRes.Series[3].Points[1][0].String(), ShouldEqual, null.FloatFrom(50.0).String())
|
||||
So(queryRes.Series[0].Points[2][0].String(), ShouldEqual, null.FloatFromPtr(nil).String())
|
||||
So(queryRes.Series[1].Points[2][0].String(), ShouldEqual, null.FloatFromPtr(nil).String())
|
||||
So(queryRes.Series[2].Points[2][0].String(), ShouldEqual, null.FloatFromPtr(nil).String())
|
||||
So(queryRes.Series[3].Points[2][0].String(), ShouldEqual, null.FloatFromPtr(nil).String())
|
||||
So(queryRes.Series[0].Points[3][0].String(), ShouldEqual, null.FloatFrom(30.0).String())
|
||||
So(queryRes.Series[1].Points[3][0].String(), ShouldEqual, null.FloatFrom(40.0).String())
|
||||
So(queryRes.Series[2].Points[3][0].String(), ShouldEqual, null.FloatFrom(50.0).String())
|
||||
So(queryRes.Series[3].Points[3][0].String(), ShouldEqual, null.FloatFrom(60.0).String())
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
171
pkg/tsdb/cloudwatch/get_metric_data.go
Normal file
171
pkg/tsdb/cloudwatch/get_metric_data.go
Normal file
@@ -0,0 +1,171 @@
|
||||
package cloudwatch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
||||
"github.com/grafana/grafana/pkg/components/null"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/infra/metrics"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
)
|
||||
|
||||
func (e *CloudWatchExecutor) executeGetMetricDataQuery(ctx context.Context, region string, queries map[string]*CloudWatchQuery, queryContext *tsdb.TsdbQuery) ([]*tsdb.QueryResult, error) {
|
||||
queryResponses := make([]*tsdb.QueryResult, 0)
|
||||
|
||||
client, err := e.getClient(region)
|
||||
if err != nil {
|
||||
return queryResponses, err
|
||||
}
|
||||
|
||||
params, err := parseGetMetricDataQuery(queries, queryContext)
|
||||
if err != nil {
|
||||
return queryResponses, err
|
||||
}
|
||||
|
||||
nextToken := ""
|
||||
mdr := make(map[string]map[string]*cloudwatch.MetricDataResult)
|
||||
for {
|
||||
if nextToken != "" {
|
||||
params.NextToken = aws.String(nextToken)
|
||||
}
|
||||
resp, err := client.GetMetricDataWithContext(ctx, params)
|
||||
if err != nil {
|
||||
return queryResponses, err
|
||||
}
|
||||
metrics.M_Aws_CloudWatch_GetMetricData.Add(float64(len(params.MetricDataQueries)))
|
||||
|
||||
for _, r := range resp.MetricDataResults {
|
||||
if _, ok := mdr[*r.Id]; !ok {
|
||||
mdr[*r.Id] = make(map[string]*cloudwatch.MetricDataResult)
|
||||
mdr[*r.Id][*r.Label] = r
|
||||
} else if _, ok := mdr[*r.Id][*r.Label]; !ok {
|
||||
mdr[*r.Id][*r.Label] = r
|
||||
} else {
|
||||
mdr[*r.Id][*r.Label].Timestamps = append(mdr[*r.Id][*r.Label].Timestamps, r.Timestamps...)
|
||||
mdr[*r.Id][*r.Label].Values = append(mdr[*r.Id][*r.Label].Values, r.Values...)
|
||||
}
|
||||
}
|
||||
|
||||
if resp.NextToken == nil || *resp.NextToken == "" {
|
||||
break
|
||||
}
|
||||
nextToken = *resp.NextToken
|
||||
}
|
||||
|
||||
for id, lr := range mdr {
|
||||
queryRes, err := parseGetMetricDataResponse(lr, queries[id])
|
||||
if err != nil {
|
||||
return queryResponses, err
|
||||
}
|
||||
queryResponses = append(queryResponses, queryRes)
|
||||
}
|
||||
|
||||
return queryResponses, nil
|
||||
}
|
||||
|
||||
func parseGetMetricDataQuery(queries map[string]*CloudWatchQuery, queryContext *tsdb.TsdbQuery) (*cloudwatch.GetMetricDataInput, error) {
|
||||
// validate query
|
||||
for _, query := range queries {
|
||||
if !(len(query.Statistics) == 1 && len(query.ExtendedStatistics) == 0) &&
|
||||
!(len(query.Statistics) == 0 && len(query.ExtendedStatistics) == 1) {
|
||||
return nil, errors.New("Statistics count should be 1")
|
||||
}
|
||||
}
|
||||
|
||||
startTime, err := queryContext.TimeRange.ParseFrom()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
endTime, err := queryContext.TimeRange.ParseTo()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
params := &cloudwatch.GetMetricDataInput{
|
||||
StartTime: aws.Time(startTime),
|
||||
EndTime: aws.Time(endTime),
|
||||
ScanBy: aws.String("TimestampAscending"),
|
||||
}
|
||||
for _, query := range queries {
|
||||
// 1 minutes resolution metrics is stored for 15 days, 15 * 24 * 60 = 21600
|
||||
if query.HighResolution && (((endTime.Unix() - startTime.Unix()) / int64(query.Period)) > 21600) {
|
||||
return nil, errors.New("too long query period")
|
||||
}
|
||||
|
||||
mdq := &cloudwatch.MetricDataQuery{
|
||||
Id: aws.String(query.Id),
|
||||
ReturnData: aws.Bool(query.ReturnData),
|
||||
}
|
||||
if query.Expression != "" {
|
||||
mdq.Expression = aws.String(query.Expression)
|
||||
} else {
|
||||
mdq.MetricStat = &cloudwatch.MetricStat{
|
||||
Metric: &cloudwatch.Metric{
|
||||
Namespace: aws.String(query.Namespace),
|
||||
MetricName: aws.String(query.MetricName),
|
||||
},
|
||||
Period: aws.Int64(int64(query.Period)),
|
||||
}
|
||||
for _, d := range query.Dimensions {
|
||||
mdq.MetricStat.Metric.Dimensions = append(mdq.MetricStat.Metric.Dimensions,
|
||||
&cloudwatch.Dimension{
|
||||
Name: d.Name,
|
||||
Value: d.Value,
|
||||
})
|
||||
}
|
||||
if len(query.Statistics) == 1 {
|
||||
mdq.MetricStat.Stat = query.Statistics[0]
|
||||
} else {
|
||||
mdq.MetricStat.Stat = query.ExtendedStatistics[0]
|
||||
}
|
||||
}
|
||||
params.MetricDataQueries = append(params.MetricDataQueries, mdq)
|
||||
}
|
||||
return params, nil
|
||||
}
|
||||
|
||||
func parseGetMetricDataResponse(lr map[string]*cloudwatch.MetricDataResult, query *CloudWatchQuery) (*tsdb.QueryResult, error) {
|
||||
queryRes := tsdb.NewQueryResult()
|
||||
queryRes.RefId = query.RefId
|
||||
|
||||
for label, r := range lr {
|
||||
if *r.StatusCode != "Complete" {
|
||||
return queryRes, fmt.Errorf("Part of query is failed: %s", *r.StatusCode)
|
||||
}
|
||||
|
||||
series := tsdb.TimeSeries{
|
||||
Tags: map[string]string{},
|
||||
Points: make([]tsdb.TimePoint, 0),
|
||||
}
|
||||
for _, d := range query.Dimensions {
|
||||
series.Tags[*d.Name] = *d.Value
|
||||
}
|
||||
s := ""
|
||||
if len(query.Statistics) == 1 {
|
||||
s = *query.Statistics[0]
|
||||
} else {
|
||||
s = *query.ExtendedStatistics[0]
|
||||
}
|
||||
series.Name = formatAlias(query, s, series.Tags, label)
|
||||
|
||||
for j, t := range r.Timestamps {
|
||||
if j > 0 {
|
||||
expectedTimestamp := r.Timestamps[j-1].Add(time.Duration(query.Period) * time.Second)
|
||||
if expectedTimestamp.Before(*t) {
|
||||
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFromPtr(nil), float64(expectedTimestamp.Unix()*1000)))
|
||||
}
|
||||
}
|
||||
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(*r.Values[j]), float64((*t).Unix())*1000))
|
||||
}
|
||||
|
||||
queryRes.Series = append(queryRes.Series, &series)
|
||||
queryRes.Meta = simplejson.New()
|
||||
}
|
||||
return queryRes, nil
|
||||
}
|
||||
113
pkg/tsdb/cloudwatch/get_metric_data_test.go
Normal file
113
pkg/tsdb/cloudwatch/get_metric_data_test.go
Normal file
@@ -0,0 +1,113 @@
|
||||
package cloudwatch
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
||||
"github.com/grafana/grafana/pkg/components/null"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
)
|
||||
|
||||
// TestCloudWatchGetMetricData covers the GetMetricData request builder
// (parseGetMetricDataQuery) and the response parser
// (parseGetMetricDataResponse).
func TestCloudWatchGetMetricData(t *testing.T) {
	Convey("CloudWatchGetMetricData", t, func() {

		Convey("can parse cloudwatch GetMetricData query", func() {
			// Two queries: a plain metric query ("id1") and a metric-math
			// expression ("id2") that references the first by its id.
			queries := map[string]*CloudWatchQuery{
				"id1": {
					RefId:      "A",
					Region:     "us-east-1",
					Namespace:  "AWS/EC2",
					MetricName: "CPUUtilization",
					Dimensions: []*cloudwatch.Dimension{
						{
							Name:  aws.String("InstanceId"),
							Value: aws.String("i-12345678"),
						},
					},
					Statistics: []*string{aws.String("Average")},
					Period:     300,
					Id:         "id1",
					Expression: "",
					ReturnData: true,
				},
				"id2": {
					RefId:      "B",
					Region:     "us-east-1",
					Statistics: []*string{aws.String("Average")},
					Id:         "id2",
					Expression: "id1 * 2",
					ReturnData: true,
				},
			}
			queryContext := &tsdb.TsdbQuery{
				TimeRange: tsdb.NewFakeTimeRange("5m", "now", time.Now()),
			}
			res, err := parseGetMetricDataQuery(queries, queryContext)
			So(err, ShouldBeNil)
			// The metric query must be translated into a MetricStat entry
			// carrying namespace/metric/dimensions/period/stat verbatim.
			So(*res.MetricDataQueries[0].MetricStat.Metric.Namespace, ShouldEqual, "AWS/EC2")
			So(*res.MetricDataQueries[0].MetricStat.Metric.MetricName, ShouldEqual, "CPUUtilization")
			So(*res.MetricDataQueries[0].MetricStat.Metric.Dimensions[0].Name, ShouldEqual, "InstanceId")
			So(*res.MetricDataQueries[0].MetricStat.Metric.Dimensions[0].Value, ShouldEqual, "i-12345678")
			So(*res.MetricDataQueries[0].MetricStat.Period, ShouldEqual, 300)
			So(*res.MetricDataQueries[0].MetricStat.Stat, ShouldEqual, "Average")
			So(*res.MetricDataQueries[0].Id, ShouldEqual, "id1")
			So(*res.MetricDataQueries[0].ReturnData, ShouldEqual, true)
			// The expression query carries only id + expression.
			So(*res.MetricDataQueries[1].Id, ShouldEqual, "id2")
			So(*res.MetricDataQueries[1].Expression, ShouldEqual, "id1 * 2")
			So(*res.MetricDataQueries[1].ReturnData, ShouldEqual, true)
		})

		Convey("can parse cloudwatch response", func() {
			// Datapoints at t0, t0+60s and t0+180s with a 60s period: the
			// point at t0+120s is missing and must be padded with a null.
			timestamp := time.Unix(0, 0)
			resp := map[string]*cloudwatch.MetricDataResult{
				"label": {
					Id:    aws.String("id1"),
					Label: aws.String("label"),
					Timestamps: []*time.Time{
						aws.Time(timestamp),
						aws.Time(timestamp.Add(60 * time.Second)),
						aws.Time(timestamp.Add(180 * time.Second)),
					},
					Values: []*float64{
						aws.Float64(10),
						aws.Float64(20),
						aws.Float64(30),
					},
					StatusCode: aws.String("Complete"),
				},
			}
			query := &CloudWatchQuery{
				RefId:      "refId1",
				Region:     "us-east-1",
				Namespace:  "AWS/ApplicationELB",
				MetricName: "TargetResponseTime",
				Dimensions: []*cloudwatch.Dimension{
					{
						Name:  aws.String("LoadBalancer"),
						Value: aws.String("lb"),
					},
					{
						Name:  aws.String("TargetGroup"),
						Value: aws.String("tg"),
					},
				},
				Statistics: []*string{aws.String("Average")},
				Period:     60,
				Alias:      "{{namespace}}_{{metric}}_{{stat}}",
			}
			queryRes, err := parseGetMetricDataResponse(resp, query)
			So(err, ShouldBeNil)
			So(queryRes.RefId, ShouldEqual, "refId1")
			// Alias expansion and dimension tags.
			So(queryRes.Series[0].Name, ShouldEqual, "AWS/ApplicationELB_TargetResponseTime_Average")
			So(queryRes.Series[0].Tags["LoadBalancer"], ShouldEqual, "lb")
			So(queryRes.Series[0].Tags["TargetGroup"], ShouldEqual, "tg")
			// Points 0 and 1 are real values; point 2 is the null padding
			// for the missing t0+120s datapoint; point 3 is the last value.
			So(queryRes.Series[0].Points[0][0].String(), ShouldEqual, null.FloatFrom(10.0).String())
			So(queryRes.Series[0].Points[1][0].String(), ShouldEqual, null.FloatFrom(20.0).String())
			So(queryRes.Series[0].Points[2][0].String(), ShouldEqual, null.FloatFromPtr(nil).String())
			So(queryRes.Series[0].Points[3][0].String(), ShouldEqual, null.FloatFrom(30.0).String())
		})
	})
}
|
||||
269
pkg/tsdb/cloudwatch/get_metric_statistics.go
Normal file
269
pkg/tsdb/cloudwatch/get_metric_statistics.go
Normal file
@@ -0,0 +1,269 @@
|
||||
package cloudwatch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
||||
"github.com/grafana/grafana/pkg/components/null"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/infra/metrics"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
)
|
||||
|
||||
// executeQuery runs a single GetMetricStatistics request (or a sequence of
// chunked requests for high-resolution metrics) and converts the response
// into a tsdb.QueryResult.
//
// For high-resolution queries, the time range is walked in windows of
// 1440 periods per request and the partial responses' datapoints are
// appended together before parsing. Returns an error for an invalid time
// range, a range too long for high resolution, an AWS API failure, or a
// response-parsing failure.
func (e *CloudWatchExecutor) executeQuery(ctx context.Context, query *CloudWatchQuery, queryContext *tsdb.TsdbQuery) (*tsdb.QueryResult, error) {
	client, err := e.getClient(query.Region)
	if err != nil {
		return nil, err
	}

	startTime, err := queryContext.TimeRange.ParseFrom()
	if err != nil {
		return nil, err
	}

	endTime, err := queryContext.TimeRange.ParseTo()
	if err != nil {
		return nil, err
	}

	if !startTime.Before(endTime) {
		return nil, fmt.Errorf("Invalid time range: Start time must be before end time")
	}

	params := &cloudwatch.GetMetricStatisticsInput{
		Namespace:  aws.String(query.Namespace),
		MetricName: aws.String(query.MetricName),
		Dimensions: query.Dimensions,
		Period:     aws.Int64(int64(query.Period)),
	}
	// Statistics and ExtendedStatistics are both optional; only set the
	// ones actually requested.
	if len(query.Statistics) > 0 {
		params.Statistics = query.Statistics
	}
	if len(query.ExtendedStatistics) > 0 {
		params.ExtendedStatistics = query.ExtendedStatistics
	}

	// 1 minutes resolution metrics is stored for 15 days, 15 * 24 * 60 = 21600
	if query.HighResolution && (((endTime.Unix() - startTime.Unix()) / int64(query.Period)) > 21600) {
		return nil, errors.New("too long query period")
	}
	var resp *cloudwatch.GetMetricStatisticsOutput
	// Walk the time range. Note that startTime is advanced in place: for a
	// normal query the loop runs exactly once (window = whole range); for a
	// high-resolution query each iteration covers 1440 periods.
	for startTime.Before(endTime) {
		params.StartTime = aws.Time(startTime)
		if query.HighResolution {
			startTime = startTime.Add(time.Duration(1440*query.Period) * time.Second)
		} else {
			startTime = endTime
		}
		params.EndTime = aws.Time(startTime)

		if setting.Env == setting.DEV {
			plog.Debug("CloudWatch query", "raw query", params)
		}

		partResp, err := client.GetMetricStatisticsWithContext(ctx, params, request.WithResponseReadTimeout(10*time.Second))
		if err != nil {
			return nil, err
		}
		// Accumulate datapoints from each chunk into the first response.
		if resp != nil {
			resp.Datapoints = append(resp.Datapoints, partResp.Datapoints...)
		} else {
			resp = partResp
		}
		metrics.M_Aws_CloudWatch_GetMetricStatistics.Inc()
	}

	queryRes, err := parseResponse(resp, query)
	if err != nil {
		return nil, err
	}

	return queryRes, nil
}
|
||||
|
||||
func parseQuery(model *simplejson.Json) (*CloudWatchQuery, error) {
|
||||
region, err := model.Get("region").String()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
namespace, err := model.Get("namespace").String()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
metricName, err := model.Get("metricName").String()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
id := model.Get("id").MustString("")
|
||||
expression := model.Get("expression").MustString("")
|
||||
|
||||
dimensions, err := parseDimensions(model)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
statistics, extendedStatistics, err := parseStatistics(model)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p := model.Get("period").MustString("")
|
||||
if p == "" {
|
||||
if namespace == "AWS/EC2" {
|
||||
p = "300"
|
||||
} else {
|
||||
p = "60"
|
||||
}
|
||||
}
|
||||
|
||||
var period int
|
||||
if regexp.MustCompile(`^\d+$`).Match([]byte(p)) {
|
||||
period, err = strconv.Atoi(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
d, err := time.ParseDuration(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
period = int(d.Seconds())
|
||||
}
|
||||
|
||||
alias := model.Get("alias").MustString()
|
||||
|
||||
returnData := model.Get("returnData").MustBool(false)
|
||||
highResolution := model.Get("highResolution").MustBool(false)
|
||||
|
||||
return &CloudWatchQuery{
|
||||
Region: region,
|
||||
Namespace: namespace,
|
||||
MetricName: metricName,
|
||||
Dimensions: dimensions,
|
||||
Statistics: aws.StringSlice(statistics),
|
||||
ExtendedStatistics: aws.StringSlice(extendedStatistics),
|
||||
Period: period,
|
||||
Alias: alias,
|
||||
Id: id,
|
||||
Expression: expression,
|
||||
ReturnData: returnData,
|
||||
HighResolution: highResolution,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func parseDimensions(model *simplejson.Json) ([]*cloudwatch.Dimension, error) {
|
||||
var result []*cloudwatch.Dimension
|
||||
|
||||
for k, v := range model.Get("dimensions").MustMap() {
|
||||
kk := k
|
||||
if vv, ok := v.(string); ok {
|
||||
result = append(result, &cloudwatch.Dimension{
|
||||
Name: &kk,
|
||||
Value: &vv,
|
||||
})
|
||||
} else {
|
||||
return nil, errors.New("failed to parse")
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(result, func(i, j int) bool {
|
||||
return *result[i].Name < *result[j].Name
|
||||
})
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func parseStatistics(model *simplejson.Json) ([]string, []string, error) {
|
||||
var statistics []string
|
||||
var extendedStatistics []string
|
||||
|
||||
for _, s := range model.Get("statistics").MustArray() {
|
||||
if ss, ok := s.(string); ok {
|
||||
if _, isStandard := standardStatistics[ss]; isStandard {
|
||||
statistics = append(statistics, ss)
|
||||
} else {
|
||||
extendedStatistics = append(extendedStatistics, ss)
|
||||
}
|
||||
} else {
|
||||
return nil, nil, errors.New("failed to parse")
|
||||
}
|
||||
}
|
||||
|
||||
return statistics, extendedStatistics, nil
|
||||
}
|
||||
|
||||
func parseResponse(resp *cloudwatch.GetMetricStatisticsOutput, query *CloudWatchQuery) (*tsdb.QueryResult, error) {
|
||||
queryRes := tsdb.NewQueryResult()
|
||||
|
||||
queryRes.RefId = query.RefId
|
||||
var value float64
|
||||
for _, s := range append(query.Statistics, query.ExtendedStatistics...) {
|
||||
series := tsdb.TimeSeries{
|
||||
Tags: map[string]string{},
|
||||
Points: make([]tsdb.TimePoint, 0),
|
||||
}
|
||||
for _, d := range query.Dimensions {
|
||||
series.Tags[*d.Name] = *d.Value
|
||||
}
|
||||
series.Name = formatAlias(query, *s, series.Tags, "")
|
||||
|
||||
lastTimestamp := make(map[string]time.Time)
|
||||
sort.Slice(resp.Datapoints, func(i, j int) bool {
|
||||
return (*resp.Datapoints[i].Timestamp).Before(*resp.Datapoints[j].Timestamp)
|
||||
})
|
||||
for _, v := range resp.Datapoints {
|
||||
switch *s {
|
||||
case "Average":
|
||||
value = *v.Average
|
||||
case "Maximum":
|
||||
value = *v.Maximum
|
||||
case "Minimum":
|
||||
value = *v.Minimum
|
||||
case "Sum":
|
||||
value = *v.Sum
|
||||
case "SampleCount":
|
||||
value = *v.SampleCount
|
||||
default:
|
||||
if strings.Index(*s, "p") == 0 && v.ExtendedStatistics[*s] != nil {
|
||||
value = *v.ExtendedStatistics[*s]
|
||||
}
|
||||
}
|
||||
|
||||
// terminate gap of data points
|
||||
timestamp := *v.Timestamp
|
||||
if _, ok := lastTimestamp[*s]; ok {
|
||||
nextTimestampFromLast := lastTimestamp[*s].Add(time.Duration(query.Period) * time.Second)
|
||||
for timestamp.After(nextTimestampFromLast) {
|
||||
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFromPtr(nil), float64(nextTimestampFromLast.Unix()*1000)))
|
||||
nextTimestampFromLast = nextTimestampFromLast.Add(time.Duration(query.Period) * time.Second)
|
||||
}
|
||||
}
|
||||
lastTimestamp[*s] = timestamp
|
||||
|
||||
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(value), float64(timestamp.Unix()*1000)))
|
||||
}
|
||||
|
||||
queryRes.Series = append(queryRes.Series, &series)
|
||||
queryRes.Meta = simplejson.New()
|
||||
if len(resp.Datapoints) > 0 && resp.Datapoints[0].Unit != nil {
|
||||
if unit, ok := cloudwatchUnitMappings[*resp.Datapoints[0].Unit]; ok {
|
||||
queryRes.Meta.Set("unit", unit)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return queryRes, nil
|
||||
}
|
||||
187
pkg/tsdb/cloudwatch/get_metric_statistics_test.go
Normal file
187
pkg/tsdb/cloudwatch/get_metric_statistics_test.go
Normal file
@@ -0,0 +1,187 @@
|
||||
package cloudwatch
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
||||
"github.com/grafana/grafana/pkg/components/null"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
)
|
||||
|
||||
// TestCloudWatchGetMetricStatistics covers the query-model parser
// (parseQuery) and the response parser (parseResponse), including null
// padding for gaps in the datapoint stream.
func TestCloudWatchGetMetricStatistics(t *testing.T) {
	Convey("CloudWatchGetMetricStatistics", t, func() {

		Convey("can parse cloudwatch json model", func() {
			// Model mixing standard statistics with percentile (extended)
			// statistics; parseQuery must split them into separate slices.
			json := `
				{
					"region": "us-east-1",
					"namespace": "AWS/ApplicationELB",
					"metricName": "TargetResponseTime",
					"dimensions": {
						"LoadBalancer": "lb",
						"TargetGroup": "tg"
					},
					"statistics": [
						"Average",
						"Maximum",
						"p50.00",
						"p90.00"
					],
					"period": "60",
					"highResolution": false,
					"alias": "{{metric}}_{{stat}}"
				}
			`
			modelJson, err := simplejson.NewJson([]byte(json))
			So(err, ShouldBeNil)

			res, err := parseQuery(modelJson)
			So(err, ShouldBeNil)
			So(res.Region, ShouldEqual, "us-east-1")
			So(res.Namespace, ShouldEqual, "AWS/ApplicationELB")
			So(res.MetricName, ShouldEqual, "TargetResponseTime")
			// Dimensions come back sorted by name.
			So(len(res.Dimensions), ShouldEqual, 2)
			So(*res.Dimensions[0].Name, ShouldEqual, "LoadBalancer")
			So(*res.Dimensions[0].Value, ShouldEqual, "lb")
			So(*res.Dimensions[1].Name, ShouldEqual, "TargetGroup")
			So(*res.Dimensions[1].Value, ShouldEqual, "tg")
			// Standard vs extended statistics are separated.
			So(len(res.Statistics), ShouldEqual, 2)
			So(*res.Statistics[0], ShouldEqual, "Average")
			So(*res.Statistics[1], ShouldEqual, "Maximum")
			So(len(res.ExtendedStatistics), ShouldEqual, 2)
			So(*res.ExtendedStatistics[0], ShouldEqual, "p50.00")
			So(*res.ExtendedStatistics[1], ShouldEqual, "p90.00")
			So(res.Period, ShouldEqual, 60)
			So(res.Alias, ShouldEqual, "{{metric}}_{{stat}}")
		})

		Convey("can parse cloudwatch response", func() {
			// A single datapoint carrying values for all four requested
			// statistics; one series per statistic is expected.
			timestamp := time.Unix(0, 0)
			resp := &cloudwatch.GetMetricStatisticsOutput{
				Label: aws.String("TargetResponseTime"),
				Datapoints: []*cloudwatch.Datapoint{
					{
						Timestamp: aws.Time(timestamp),
						Average:   aws.Float64(10.0),
						Maximum:   aws.Float64(20.0),
						ExtendedStatistics: map[string]*float64{
							"p50.00": aws.Float64(30.0),
							"p90.00": aws.Float64(40.0),
						},
						Unit: aws.String("Seconds"),
					},
				},
			}
			query := &CloudWatchQuery{
				Region:     "us-east-1",
				Namespace:  "AWS/ApplicationELB",
				MetricName: "TargetResponseTime",
				Dimensions: []*cloudwatch.Dimension{
					{
						Name:  aws.String("LoadBalancer"),
						Value: aws.String("lb"),
					},
					{
						Name:  aws.String("TargetGroup"),
						Value: aws.String("tg"),
					},
				},
				Statistics:         []*string{aws.String("Average"), aws.String("Maximum")},
				ExtendedStatistics: []*string{aws.String("p50.00"), aws.String("p90.00")},
				Period:             60,
				Alias:              "{{namespace}}_{{metric}}_{{stat}}",
			}

			queryRes, err := parseResponse(resp, query)
			So(err, ShouldBeNil)
			So(queryRes.Series[0].Name, ShouldEqual, "AWS/ApplicationELB_TargetResponseTime_Average")
			So(queryRes.Series[0].Tags["LoadBalancer"], ShouldEqual, "lb")
			So(queryRes.Series[0].Tags["TargetGroup"], ShouldEqual, "tg")
			// Series order follows Statistics then ExtendedStatistics.
			So(queryRes.Series[0].Points[0][0].String(), ShouldEqual, null.FloatFrom(10.0).String())
			So(queryRes.Series[1].Points[0][0].String(), ShouldEqual, null.FloatFrom(20.0).String())
			So(queryRes.Series[2].Points[0][0].String(), ShouldEqual, null.FloatFrom(30.0).String())
			So(queryRes.Series[3].Points[0][0].String(), ShouldEqual, null.FloatFrom(40.0).String())
			// "Seconds" maps to the short unit "s" in the result meta.
			So(queryRes.Meta.Get("unit").MustString(), ShouldEqual, "s")
		})

		Convey("terminate gap of data points", func() {
			// Datapoints at t0, t0+60s and t0+180s with a 60s period: the
			// point at t0+120s is missing and every series must get a null
			// point padded in at index 2.
			timestamp := time.Unix(0, 0)
			resp := &cloudwatch.GetMetricStatisticsOutput{
				Label: aws.String("TargetResponseTime"),
				Datapoints: []*cloudwatch.Datapoint{
					{
						Timestamp: aws.Time(timestamp),
						Average:   aws.Float64(10.0),
						Maximum:   aws.Float64(20.0),
						ExtendedStatistics: map[string]*float64{
							"p50.00": aws.Float64(30.0),
							"p90.00": aws.Float64(40.0),
						},
						Unit: aws.String("Seconds"),
					},
					{
						Timestamp: aws.Time(timestamp.Add(60 * time.Second)),
						Average:   aws.Float64(20.0),
						Maximum:   aws.Float64(30.0),
						ExtendedStatistics: map[string]*float64{
							"p50.00": aws.Float64(40.0),
							"p90.00": aws.Float64(50.0),
						},
						Unit: aws.String("Seconds"),
					},
					{
						Timestamp: aws.Time(timestamp.Add(180 * time.Second)),
						Average:   aws.Float64(30.0),
						Maximum:   aws.Float64(40.0),
						ExtendedStatistics: map[string]*float64{
							"p50.00": aws.Float64(50.0),
							"p90.00": aws.Float64(60.0),
						},
						Unit: aws.String("Seconds"),
					},
				},
			}
			query := &CloudWatchQuery{
				Region:     "us-east-1",
				Namespace:  "AWS/ApplicationELB",
				MetricName: "TargetResponseTime",
				Dimensions: []*cloudwatch.Dimension{
					{
						Name:  aws.String("LoadBalancer"),
						Value: aws.String("lb"),
					},
					{
						Name:  aws.String("TargetGroup"),
						Value: aws.String("tg"),
					},
				},
				Statistics:         []*string{aws.String("Average"), aws.String("Maximum")},
				ExtendedStatistics: []*string{aws.String("p50.00"), aws.String("p90.00")},
				Period:             60,
				Alias:              "{{namespace}}_{{metric}}_{{stat}}",
			}

			queryRes, err := parseResponse(resp, query)
			So(err, ShouldBeNil)
			// Index 0: first datapoint's values per statistic.
			So(queryRes.Series[0].Points[0][0].String(), ShouldEqual, null.FloatFrom(10.0).String())
			So(queryRes.Series[1].Points[0][0].String(), ShouldEqual, null.FloatFrom(20.0).String())
			So(queryRes.Series[2].Points[0][0].String(), ShouldEqual, null.FloatFrom(30.0).String())
			So(queryRes.Series[3].Points[0][0].String(), ShouldEqual, null.FloatFrom(40.0).String())
			// Index 1: second datapoint.
			So(queryRes.Series[0].Points[1][0].String(), ShouldEqual, null.FloatFrom(20.0).String())
			So(queryRes.Series[1].Points[1][0].String(), ShouldEqual, null.FloatFrom(30.0).String())
			So(queryRes.Series[2].Points[1][0].String(), ShouldEqual, null.FloatFrom(40.0).String())
			So(queryRes.Series[3].Points[1][0].String(), ShouldEqual, null.FloatFrom(50.0).String())
			// Index 2: the padded null for the missing t0+120s datapoint.
			So(queryRes.Series[0].Points[2][0].String(), ShouldEqual, null.FloatFromPtr(nil).String())
			So(queryRes.Series[1].Points[2][0].String(), ShouldEqual, null.FloatFromPtr(nil).String())
			So(queryRes.Series[2].Points[2][0].String(), ShouldEqual, null.FloatFromPtr(nil).String())
			So(queryRes.Series[3].Points[2][0].String(), ShouldEqual, null.FloatFromPtr(nil).String())
			// Index 3: third datapoint, following the gap.
			So(queryRes.Series[0].Points[3][0].String(), ShouldEqual, null.FloatFrom(30.0).String())
			So(queryRes.Series[1].Points[3][0].String(), ShouldEqual, null.FloatFrom(40.0).String())
			So(queryRes.Series[2].Points[3][0].String(), ShouldEqual, null.FloatFrom(50.0).String())
			So(queryRes.Series[3].Points[3][0].String(), ShouldEqual, null.FloatFrom(60.0).String())
		})
	})
}
|
||||
@@ -66,6 +66,7 @@
|
||||
<li>{{namespace}}</li>
|
||||
<li>{{region}}</li>
|
||||
<li>{{period}}</li>
|
||||
<li>{{label}}</li>
|
||||
<li>{{YOUR_DIMENSION_NAME}}</li>
|
||||
</ul>
|
||||
</info-popover>
|
||||
|
||||
Reference in New Issue
Block a user