grafana/pkg/tsdb/cloudwatch/cloudwatch.go

614 lines
16 KiB
Go
Raw Normal View History

2017-04-03 07:50:40 -05:00
package cloudwatch
import (
"context"
"errors"
2018-06-15 08:48:25 -05:00
"fmt"
2017-04-03 07:50:40 -05:00
"regexp"
"sort"
"strconv"
"strings"
"time"
"github.com/grafana/grafana/pkg/log"
"github.com/grafana/grafana/pkg/models"
2017-09-27 12:31:09 -05:00
"github.com/grafana/grafana/pkg/setting"
2017-04-03 07:50:40 -05:00
"github.com/grafana/grafana/pkg/tsdb"
2018-04-16 02:50:13 -05:00
"golang.org/x/sync/errgroup"
2017-04-03 07:50:40 -05:00
"github.com/aws/aws-sdk-go/aws"
2018-07-24 02:58:48 -05:00
"github.com/aws/aws-sdk-go/aws/awserr"
2017-04-03 07:50:40 -05:00
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
2017-04-03 07:50:40 -05:00
"github.com/grafana/grafana/pkg/components/null"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/metrics"
2017-04-03 07:50:40 -05:00
)
type CloudWatchExecutor struct {
*models.DataSource
ec2Svc ec2iface.EC2API
rgtaSvc resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI
2017-04-03 07:50:40 -05:00
}
2017-09-26 04:30:40 -05:00
// DatasourceInfo groups the AWS connection settings of a CloudWatch data
// source.
// NOTE(review): nothing in this part of the file reads or writes these
// fields; presumably they are filled from the data source configuration and
// consumed by the client/credential helpers elsewhere in the package —
// confirm the exact meaning of each field there.
type DatasourceInfo struct {
	Profile       string
	Region        string
	AuthType      string
	AssumeRoleArn string
	Namespace     string
	AccessKey     string
	SecretKey     string
}
func NewCloudWatchExecutor(dsInfo *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return &CloudWatchExecutor{}, nil
2017-04-03 07:50:40 -05:00
}
var (
	// plog is the package logger; init creates it under the name
	// "tsdb.cloudwatch".
	plog log.Logger
	// standardStatistics is the set of statistic names that parseStatistics
	// returns as plain statistics; any other name is returned as an extended
	// statistic. Populated in init.
	standardStatistics map[string]bool
	// aliasFormat matches "{{ name }}" placeholders; formatAlias uses it to
	// expand alias templates. Compiled in init.
	aliasFormat *regexp.Regexp
)
func init() {
plog = log.New("tsdb.cloudwatch")
tsdb.RegisterTsdbQueryEndpoint("cloudwatch", NewCloudWatchExecutor)
2017-04-03 07:50:40 -05:00
standardStatistics = map[string]bool{
"Average": true,
"Maximum": true,
"Minimum": true,
"Sum": true,
"SampleCount": true,
}
aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
}
func (e *CloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSource, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
var result *tsdb.Response
e.DataSource = dsInfo
queryType := queryContext.Queries[0].Model.Get("type").MustString("")
var err error
2017-09-09 14:24:39 -05:00
switch queryType {
case "metricFindQuery":
result, err = e.executeMetricFindQuery(ctx, queryContext)
2017-09-25 04:16:40 -05:00
case "annotationQuery":
result, err = e.executeAnnotationQuery(ctx, queryContext)
2017-09-23 22:30:34 -05:00
case "timeSeriesQuery":
fallthrough
default:
2017-09-23 22:30:34 -05:00
result, err = e.executeTimeSeriesQuery(ctx, queryContext)
2017-09-09 14:24:39 -05:00
}
return result, err
2017-09-09 14:24:39 -05:00
}
func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
2018-10-15 21:39:10 -05:00
results := &tsdb.Response{
Results: make(map[string]*tsdb.QueryResult),
2017-04-03 07:50:40 -05:00
}
2018-10-15 21:39:10 -05:00
resultChan := make(chan *tsdb.QueryResult, len(queryContext.Queries))
2017-04-03 07:50:40 -05:00
2018-04-16 02:50:13 -05:00
eg, ectx := errgroup.WithContext(ctx)
2017-04-03 07:50:40 -05:00
2018-04-16 02:50:13 -05:00
getMetricDataQueries := make(map[string]map[string]*CloudWatchQuery)
for i, model := range queryContext.Queries {
queryType := model.Model.Get("type").MustString()
2017-09-27 13:00:17 -05:00
if queryType != "timeSeriesQuery" && queryType != "" {
continue
}
2018-04-16 02:50:13 -05:00
2018-07-24 21:27:43 -05:00
RefId := queryContext.Queries[i].RefId
2018-04-16 02:50:13 -05:00
query, err := parseQuery(queryContext.Queries[i].Model)
if err != nil {
2018-10-15 21:39:10 -05:00
results.Results[RefId] = &tsdb.QueryResult{
2018-07-24 02:58:48 -05:00
Error: err,
}
2018-10-15 21:39:10 -05:00
return results, nil
2018-04-16 02:50:13 -05:00
}
2018-07-24 21:27:43 -05:00
query.RefId = RefId
2018-04-16 02:50:13 -05:00
if query.Id != "" {
if _, ok := getMetricDataQueries[query.Region]; !ok {
getMetricDataQueries[query.Region] = make(map[string]*CloudWatchQuery)
}
getMetricDataQueries[query.Region][query.Id] = query
continue
}
2018-06-22 02:35:17 -05:00
if query.Id == "" && query.Expression != "" {
2018-10-15 21:39:10 -05:00
results.Results[query.RefId] = &tsdb.QueryResult{
2018-07-24 02:58:48 -05:00
Error: fmt.Errorf("Invalid query: id should be set if using expression"),
}
2018-10-15 21:39:10 -05:00
return results, nil
2018-06-22 02:35:17 -05:00
}
2018-04-16 02:50:13 -05:00
eg.Go(func() error {
defer func() {
if err := recover(); err != nil {
plog.Error("Execute Query Panic", "error", err, "stack", log.Stack(1))
if theErr, ok := err.(error); ok {
resultChan <- &tsdb.QueryResult{
RefId: query.RefId,
Error: theErr,
}
}
}
}()
2018-04-16 02:50:13 -05:00
queryRes, err := e.executeQuery(ectx, query, queryContext)
2018-07-24 02:58:48 -05:00
if ae, ok := err.(awserr.Error); ok && ae.Code() == "500" {
2018-04-16 02:50:13 -05:00
return err
2017-04-03 07:50:40 -05:00
}
2018-07-24 02:58:48 -05:00
if err != nil {
2018-10-15 21:39:10 -05:00
resultChan <- &tsdb.QueryResult{
RefId: query.RefId,
2018-10-09 23:56:58 -05:00
Error: err,
}
return nil
2018-07-24 02:58:48 -05:00
}
2018-10-15 21:39:10 -05:00
resultChan <- queryRes
2018-04-16 02:50:13 -05:00
return nil
})
2017-04-03 07:50:40 -05:00
}
2018-04-16 02:50:13 -05:00
if len(getMetricDataQueries) > 0 {
for region, getMetricDataQuery := range getMetricDataQueries {
q := getMetricDataQuery
eg.Go(func() error {
defer func() {
if err := recover(); err != nil {
plog.Error("Execute Get Metric Data Query Panic", "error", err, "stack", log.Stack(1))
if theErr, ok := err.(error); ok {
resultChan <- &tsdb.QueryResult{
Error: theErr,
}
}
}
}()
2018-04-16 02:50:13 -05:00
queryResponses, err := e.executeGetMetricDataQuery(ectx, region, q, queryContext)
2018-07-24 02:58:48 -05:00
if ae, ok := err.(awserr.Error); ok && ae.Code() == "500" {
2018-04-16 02:50:13 -05:00
return err
}
for _, queryRes := range queryResponses {
2018-07-24 02:58:48 -05:00
if err != nil {
2018-10-15 21:39:10 -05:00
queryRes.Error = err
2018-07-24 02:58:48 -05:00
}
2018-10-15 21:39:10 -05:00
resultChan <- queryRes
2018-04-16 02:50:13 -05:00
}
return nil
})
2017-04-03 07:50:40 -05:00
}
}
2018-04-16 02:50:13 -05:00
if err := eg.Wait(); err != nil {
2017-04-03 07:50:40 -05:00
return nil, err
}
2018-10-15 21:39:10 -05:00
close(resultChan)
for result := range resultChan {
results.Results[result.RefId] = result
}
2017-04-03 07:50:40 -05:00
2018-10-15 21:39:10 -05:00
return results, nil
2018-04-16 02:50:13 -05:00
}
func (e *CloudWatchExecutor) executeQuery(ctx context.Context, query *CloudWatchQuery, queryContext *tsdb.TsdbQuery) (*tsdb.QueryResult, error) {
2017-04-03 07:50:40 -05:00
client, err := e.getClient(query.Region)
if err != nil {
return nil, err
}
startTime, err := queryContext.TimeRange.ParseFrom()
if err != nil {
return nil, err
}
endTime, err := queryContext.TimeRange.ParseTo()
if err != nil {
return nil, err
}
2018-11-21 10:51:02 -06:00
if !startTime.Before(endTime) {
return nil, fmt.Errorf("Invalid time range: Start time must be before end time")
2018-06-15 08:48:25 -05:00
}
2017-04-03 07:50:40 -05:00
params := &cloudwatch.GetMetricStatisticsInput{
Namespace: aws.String(query.Namespace),
MetricName: aws.String(query.MetricName),
Dimensions: query.Dimensions,
Period: aws.Int64(int64(query.Period)),
}
if len(query.Statistics) > 0 {
params.Statistics = query.Statistics
}
if len(query.ExtendedStatistics) > 0 {
params.ExtendedStatistics = query.ExtendedStatistics
}
Fix misspell issues See, $ gometalinter --disable-all --enable misspell --deadline 10m --vendor ./... pkg/api/dtos/alerting_test.go:32:13:warning: "expectes" is a misspelling of "expects" (misspell) pkg/api/static/static.go:2:18:warning: "Unknwon" is a misspelling of "Unknown" (misspell) pkg/components/imguploader/azureblobuploader.go:55:48:warning: "conatiner" is a misspelling of "container" (misspell) pkg/login/ldap_settings.go:51:115:warning: "compatability" is a misspelling of "compatibility" (misspell) pkg/middleware/auth_proxy_test.go:122:22:warning: "Destory" is a misspelling of "Destroy" (misspell) pkg/middleware/logger.go:2:18:warning: "Unknwon" is a misspelling of "Unknown" (misspell) pkg/services/notifications/codes.go:9:13:warning: "Unknwon" is a misspelling of "Unknown" (misspell) pkg/services/session/mysql.go:170:3:warning: "Destory" is a misspelling of "Destroy" (misspell) pkg/services/session/mysql.go:171:24:warning: "Destory" is a misspelling of "Destroy" (misspell) pkg/services/session/session.go:95:4:warning: "Destory" is a misspelling of "Destroy" (misspell) pkg/services/session/session.go:96:1:warning: "Destory" is a misspelling of "Destroy" (misspell) pkg/services/session/session.go:167:25:warning: "Destory" is a misspelling of "Destroy" (misspell) pkg/setting/setting.go:1:18:warning: "Unknwon" is a misspelling of "Unknown" (misspell) pkg/tsdb/cloudwatch/cloudwatch.go:199:14:warning: "resolutin" is a misspelling of "resolutions" (misspell) pkg/tsdb/cloudwatch/cloudwatch.go:270:15:warning: "resolutin" is a misspelling of "resolutions" (misspell) pkg/tsdb/elasticsearch/response_parser.go:531:24:warning: "Unkown" is a misspelling of "Unknown" (misspell) pkg/tsdb/elasticsearch/client/search_request.go:113:7:warning: "initaite" is a misspelling of "initiate" (misspell) Note: Unknwon is a library name, and Destory a mysql typo.
2018-09-21 03:44:53 -05:00
// 1 minutes resolution metrics is stored for 15 days, 15 * 24 * 60 = 21600
if query.HighResolution && (((endTime.Unix() - startTime.Unix()) / int64(query.Period)) > 21600) {
return nil, errors.New("too long query period")
2017-09-27 12:31:09 -05:00
}
var resp *cloudwatch.GetMetricStatisticsOutput
for startTime.Before(endTime) {
params.StartTime = aws.Time(startTime)
if query.HighResolution {
startTime = startTime.Add(time.Duration(1440*query.Period) * time.Second)
} else {
startTime = endTime
}
params.EndTime = aws.Time(startTime)
2017-09-27 12:31:09 -05:00
if setting.Env == setting.DEV {
plog.Debug("CloudWatch query", "raw query", params)
}
partResp, err := client.GetMetricStatisticsWithContext(ctx, params, request.WithResponseReadTimeout(10*time.Second))
if err != nil {
return nil, err
}
if resp != nil {
resp.Datapoints = append(resp.Datapoints, partResp.Datapoints...)
} else {
resp = partResp
}
metrics.M_Aws_CloudWatch_GetMetricStatistics.Inc()
2017-04-03 07:50:40 -05:00
}
queryRes, err := parseResponse(resp, query)
if err != nil {
return nil, err
}
return queryRes, nil
}
2018-04-16 02:50:13 -05:00
func (e *CloudWatchExecutor) executeGetMetricDataQuery(ctx context.Context, region string, queries map[string]*CloudWatchQuery, queryContext *tsdb.TsdbQuery) ([]*tsdb.QueryResult, error) {
queryResponses := make([]*tsdb.QueryResult, 0)
// validate query
for _, query := range queries {
if !(len(query.Statistics) == 1 && len(query.ExtendedStatistics) == 0) &&
!(len(query.Statistics) == 0 && len(query.ExtendedStatistics) == 1) {
return queryResponses, errors.New("Statistics count should be 1")
}
}
client, err := e.getClient(region)
if err != nil {
return queryResponses, err
}
startTime, err := queryContext.TimeRange.ParseFrom()
if err != nil {
return queryResponses, err
}
endTime, err := queryContext.TimeRange.ParseTo()
if err != nil {
return queryResponses, err
}
params := &cloudwatch.GetMetricDataInput{
StartTime: aws.Time(startTime),
EndTime: aws.Time(endTime),
ScanBy: aws.String("TimestampAscending"),
}
for _, query := range queries {
Fix misspell issues See, $ gometalinter --disable-all --enable misspell --deadline 10m --vendor ./... pkg/api/dtos/alerting_test.go:32:13:warning: "expectes" is a misspelling of "expects" (misspell) pkg/api/static/static.go:2:18:warning: "Unknwon" is a misspelling of "Unknown" (misspell) pkg/components/imguploader/azureblobuploader.go:55:48:warning: "conatiner" is a misspelling of "container" (misspell) pkg/login/ldap_settings.go:51:115:warning: "compatability" is a misspelling of "compatibility" (misspell) pkg/middleware/auth_proxy_test.go:122:22:warning: "Destory" is a misspelling of "Destroy" (misspell) pkg/middleware/logger.go:2:18:warning: "Unknwon" is a misspelling of "Unknown" (misspell) pkg/services/notifications/codes.go:9:13:warning: "Unknwon" is a misspelling of "Unknown" (misspell) pkg/services/session/mysql.go:170:3:warning: "Destory" is a misspelling of "Destroy" (misspell) pkg/services/session/mysql.go:171:24:warning: "Destory" is a misspelling of "Destroy" (misspell) pkg/services/session/session.go:95:4:warning: "Destory" is a misspelling of "Destroy" (misspell) pkg/services/session/session.go:96:1:warning: "Destory" is a misspelling of "Destroy" (misspell) pkg/services/session/session.go:167:25:warning: "Destory" is a misspelling of "Destroy" (misspell) pkg/setting/setting.go:1:18:warning: "Unknwon" is a misspelling of "Unknown" (misspell) pkg/tsdb/cloudwatch/cloudwatch.go:199:14:warning: "resolutin" is a misspelling of "resolutions" (misspell) pkg/tsdb/cloudwatch/cloudwatch.go:270:15:warning: "resolutin" is a misspelling of "resolutions" (misspell) pkg/tsdb/elasticsearch/response_parser.go:531:24:warning: "Unkown" is a misspelling of "Unknown" (misspell) pkg/tsdb/elasticsearch/client/search_request.go:113:7:warning: "initaite" is a misspelling of "initiate" (misspell) Note: Unknwon is a library name, and Destory a mysql typo.
2018-09-21 03:44:53 -05:00
// 1 minutes resolution metrics is stored for 15 days, 15 * 24 * 60 = 21600
2018-04-16 02:50:13 -05:00
if query.HighResolution && (((endTime.Unix() - startTime.Unix()) / int64(query.Period)) > 21600) {
2018-10-09 23:56:58 -05:00
return queryResponses, errors.New("too long query period")
2018-04-16 02:50:13 -05:00
}
mdq := &cloudwatch.MetricDataQuery{
Id: aws.String(query.Id),
ReturnData: aws.Bool(query.ReturnData),
}
if query.Expression != "" {
mdq.Expression = aws.String(query.Expression)
} else {
mdq.MetricStat = &cloudwatch.MetricStat{
Metric: &cloudwatch.Metric{
Namespace: aws.String(query.Namespace),
MetricName: aws.String(query.MetricName),
},
Period: aws.Int64(int64(query.Period)),
}
for _, d := range query.Dimensions {
mdq.MetricStat.Metric.Dimensions = append(mdq.MetricStat.Metric.Dimensions,
&cloudwatch.Dimension{
Name: d.Name,
Value: d.Value,
})
}
if len(query.Statistics) == 1 {
mdq.MetricStat.Stat = query.Statistics[0]
} else {
mdq.MetricStat.Stat = query.ExtendedStatistics[0]
}
}
params.MetricDataQueries = append(params.MetricDataQueries, mdq)
}
nextToken := ""
mdr := make(map[string]*cloudwatch.MetricDataResult)
for {
if nextToken != "" {
params.NextToken = aws.String(nextToken)
}
resp, err := client.GetMetricDataWithContext(ctx, params)
if err != nil {
return queryResponses, err
}
metrics.M_Aws_CloudWatch_GetMetricData.Add(float64(len(params.MetricDataQueries)))
for _, r := range resp.MetricDataResults {
if _, ok := mdr[*r.Id]; !ok {
mdr[*r.Id] = r
} else {
mdr[*r.Id].Timestamps = append(mdr[*r.Id].Timestamps, r.Timestamps...)
mdr[*r.Id].Values = append(mdr[*r.Id].Values, r.Values...)
}
}
if resp.NextToken == nil || *resp.NextToken == "" {
break
}
nextToken = *resp.NextToken
}
for i, r := range mdr {
if *r.StatusCode != "Complete" {
return queryResponses, fmt.Errorf("Part of query is failed: %s", *r.StatusCode)
}
queryRes := tsdb.NewQueryResult()
queryRes.RefId = queries[i].RefId
query := queries[*r.Id]
series := tsdb.TimeSeries{
Tags: map[string]string{},
Points: make([]tsdb.TimePoint, 0),
}
for _, d := range query.Dimensions {
series.Tags[*d.Name] = *d.Value
}
s := ""
if len(query.Statistics) == 1 {
s = *query.Statistics[0]
} else {
s = *query.ExtendedStatistics[0]
}
series.Name = formatAlias(query, s, series.Tags)
for j, t := range r.Timestamps {
expectedTimestamp := r.Timestamps[j].Add(time.Duration(query.Period) * time.Second)
if j > 0 && expectedTimestamp.Before(*t) {
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFromPtr(nil), float64(expectedTimestamp.Unix()*1000)))
}
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(*r.Values[j]), float64((*t).Unix())*1000))
}
queryRes.Series = append(queryRes.Series, &series)
queryRes.Meta = simplejson.New()
2018-04-16 02:50:13 -05:00
queryResponses = append(queryResponses, queryRes)
}
return queryResponses, nil
}
2017-04-03 07:50:40 -05:00
// parseDimensions reads the "dimensions" object of a query model and returns
// its entries as CloudWatch dimensions ordered by name. Every value must be a
// string; any other value type makes the whole call fail with
// "failed to parse".
func parseDimensions(model *simplejson.Json) ([]*cloudwatch.Dimension, error) {
	var dims []*cloudwatch.Dimension

	for key, raw := range model.Get("dimensions").MustMap() {
		value, isString := raw.(string)
		if !isString {
			return nil, errors.New("failed to parse")
		}
		// Take a per-iteration copy of the key so each dimension points at
		// its own string rather than at the shared range variable.
		name := key
		dims = append(dims, &cloudwatch.Dimension{Name: &name, Value: &value})
	}

	// Map iteration order is random; sort for a stable result.
	byName := func(a, b int) bool { return *dims[a].Name < *dims[b].Name }
	sort.Slice(dims, byName)

	return dims, nil
}
func parseStatistics(model *simplejson.Json) ([]string, []string, error) {
var statistics []string
var extendedStatistics []string
2017-04-03 07:50:40 -05:00
for _, s := range model.Get("statistics").MustArray() {
if ss, ok := s.(string); ok {
if _, isStandard := standardStatistics[ss]; isStandard {
statistics = append(statistics, ss)
2017-04-03 07:50:40 -05:00
} else {
extendedStatistics = append(extendedStatistics, ss)
2017-04-03 07:50:40 -05:00
}
} else {
return nil, nil, errors.New("failed to parse")
}
}
return statistics, extendedStatistics, nil
}
func parseQuery(model *simplejson.Json) (*CloudWatchQuery, error) {
region, err := model.Get("region").String()
if err != nil {
return nil, err
}
namespace, err := model.Get("namespace").String()
if err != nil {
return nil, err
}
metricName, err := model.Get("metricName").String()
if err != nil {
return nil, err
}
2018-04-16 02:50:13 -05:00
id := model.Get("id").MustString("")
expression := model.Get("expression").MustString("")
2017-04-03 07:50:40 -05:00
dimensions, err := parseDimensions(model)
if err != nil {
return nil, err
}
statistics, extendedStatistics, err := parseStatistics(model)
if err != nil {
return nil, err
}
p := model.Get("period").MustString("")
if p == "" {
if namespace == "AWS/EC2" {
p = "300"
} else {
p = "60"
}
}
2017-09-07 01:56:16 -05:00
2018-04-22 13:51:58 -05:00
var period int
2017-09-07 01:56:16 -05:00
if regexp.MustCompile(`^\d+$`).Match([]byte(p)) {
period, err = strconv.Atoi(p)
if err != nil {
return nil, err
}
} else {
d, err := time.ParseDuration(p)
if err != nil {
return nil, err
}
period = int(d.Seconds())
2017-04-03 07:50:40 -05:00
}
2017-11-01 04:48:07 -05:00
alias := model.Get("alias").MustString()
2017-04-03 07:50:40 -05:00
2018-04-16 02:50:13 -05:00
returnData := model.Get("returnData").MustBool(false)
highResolution := model.Get("highResolution").MustBool(false)
2017-04-03 07:50:40 -05:00
return &CloudWatchQuery{
Region: region,
Namespace: namespace,
MetricName: metricName,
Dimensions: dimensions,
Statistics: aws.StringSlice(statistics),
ExtendedStatistics: aws.StringSlice(extendedStatistics),
2017-04-03 07:50:40 -05:00
Period: period,
Alias: alias,
2018-04-16 02:50:13 -05:00
Id: id,
Expression: expression,
ReturnData: returnData,
HighResolution: highResolution,
2017-04-03 07:50:40 -05:00
}, nil
}
func formatAlias(query *CloudWatchQuery, stat string, dimensions map[string]string) string {
2018-04-16 02:50:13 -05:00
if len(query.Id) > 0 && len(query.Expression) > 0 {
2019-02-27 02:20:29 -06:00
if len(query.Alias) > 0 {
return query.Alias
} else {
return query.Id
}
2018-04-16 02:50:13 -05:00
}
2017-04-03 07:50:40 -05:00
data := map[string]string{}
data["region"] = query.Region
data["namespace"] = query.Namespace
data["metric"] = query.MetricName
data["stat"] = stat
2017-11-01 04:47:21 -05:00
data["period"] = strconv.Itoa(query.Period)
2017-04-03 07:50:40 -05:00
for k, v := range dimensions {
data[k] = v
}
result := aliasFormat.ReplaceAllFunc([]byte(query.Alias), func(in []byte) []byte {
labelName := strings.Replace(string(in), "{{", "", 1)
labelName = strings.Replace(labelName, "}}", "", 1)
labelName = strings.TrimSpace(labelName)
if val, exists := data[labelName]; exists {
return []byte(val)
}
return in
})
return string(result)
}
func parseResponse(resp *cloudwatch.GetMetricStatisticsOutput, query *CloudWatchQuery) (*tsdb.QueryResult, error) {
queryRes := tsdb.NewQueryResult()
2018-04-16 02:50:13 -05:00
queryRes.RefId = query.RefId
2017-04-03 07:50:40 -05:00
var value float64
for _, s := range append(query.Statistics, query.ExtendedStatistics...) {
series := tsdb.TimeSeries{
2017-11-11 09:48:54 -06:00
Tags: map[string]string{},
Points: make([]tsdb.TimePoint, 0),
2017-04-03 07:50:40 -05:00
}
for _, d := range query.Dimensions {
series.Tags[*d.Name] = *d.Value
}
series.Name = formatAlias(query, *s, series.Tags)
lastTimestamp := make(map[string]time.Time)
sort.Slice(resp.Datapoints, func(i, j int) bool {
return (*resp.Datapoints[i].Timestamp).Before(*resp.Datapoints[j].Timestamp)
})
for _, v := range resp.Datapoints {
switch *s {
case "Average":
value = *v.Average
case "Maximum":
value = *v.Maximum
case "Minimum":
value = *v.Minimum
case "Sum":
value = *v.Sum
case "SampleCount":
value = *v.SampleCount
default:
if strings.Index(*s, "p") == 0 && v.ExtendedStatistics[*s] != nil {
value = *v.ExtendedStatistics[*s]
}
}
// terminate gap of data points
timestamp := *v.Timestamp
if _, ok := lastTimestamp[*s]; ok {
nextTimestampFromLast := lastTimestamp[*s].Add(time.Duration(query.Period) * time.Second)
for timestamp.After(nextTimestampFromLast) {
2017-04-03 07:50:40 -05:00
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFromPtr(nil), float64(nextTimestampFromLast.Unix()*1000)))
nextTimestampFromLast = nextTimestampFromLast.Add(time.Duration(query.Period) * time.Second)
2017-04-03 07:50:40 -05:00
}
}
lastTimestamp[*s] = timestamp
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(value), float64(timestamp.Unix()*1000)))
}
queryRes.Series = append(queryRes.Series, &series)
queryRes.Meta = simplejson.New()
if len(resp.Datapoints) > 0 && resp.Datapoints[0].Unit != nil {
if unit, ok := cloudwatchUnitMappings[*resp.Datapoints[0].Unit]; ok {
queryRes.Meta.Set("unit", unit)
}
}
2017-04-03 07:50:40 -05:00
}
return queryRes, nil
}