mirror of
				https://github.com/grafana/grafana.git
				synced 2025-02-25 18:55:37 -06:00 
			
		
		
		
	support GetMetricData
This commit is contained in:
		@@ -14,6 +14,7 @@ import (
 | 
			
		||||
	"github.com/grafana/grafana/pkg/models"
 | 
			
		||||
	"github.com/grafana/grafana/pkg/setting"
 | 
			
		||||
	"github.com/grafana/grafana/pkg/tsdb"
 | 
			
		||||
	"golang.org/x/sync/errgroup"
 | 
			
		||||
 | 
			
		||||
	"github.com/aws/aws-sdk-go/aws"
 | 
			
		||||
	"github.com/aws/aws-sdk-go/aws/request"
 | 
			
		||||
@@ -88,48 +89,63 @@ func (e *CloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryCo
 | 
			
		||||
		Results: make(map[string]*tsdb.QueryResult),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	errCh := make(chan error, 1)
 | 
			
		||||
	resCh := make(chan *tsdb.QueryResult, 1)
 | 
			
		||||
	eg, ectx := errgroup.WithContext(ctx)
 | 
			
		||||
 | 
			
		||||
	currentlyExecuting := 0
 | 
			
		||||
	getMetricDataQueries := make(map[string]map[string]*CloudWatchQuery)
 | 
			
		||||
	for i, model := range queryContext.Queries {
 | 
			
		||||
		queryType := model.Model.Get("type").MustString()
 | 
			
		||||
		if queryType != "timeSeriesQuery" && queryType != "" {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		currentlyExecuting++
 | 
			
		||||
		go func(refId string, index int) {
 | 
			
		||||
			queryRes, err := e.executeQuery(ctx, queryContext.Queries[index].Model, queryContext)
 | 
			
		||||
			currentlyExecuting--
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				errCh <- err
 | 
			
		||||
			} else {
 | 
			
		||||
				queryRes.RefId = refId
 | 
			
		||||
				resCh <- queryRes
 | 
			
		||||
 | 
			
		||||
		query, err := parseQuery(queryContext.Queries[i].Model)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		query.RefId = queryContext.Queries[i].RefId
 | 
			
		||||
 | 
			
		||||
		if query.Id != "" {
 | 
			
		||||
			if _, ok := getMetricDataQueries[query.Region]; !ok {
 | 
			
		||||
				getMetricDataQueries[query.Region] = make(map[string]*CloudWatchQuery)
 | 
			
		||||
			}
 | 
			
		||||
		}(model.RefId, i)
 | 
			
		||||
			getMetricDataQueries[query.Region][query.Id] = query
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		eg.Go(func() error {
 | 
			
		||||
			queryRes, err := e.executeQuery(ectx, query, queryContext)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
			result.Results[queryRes.RefId] = queryRes
 | 
			
		||||
			return nil
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for currentlyExecuting != 0 {
 | 
			
		||||
		select {
 | 
			
		||||
		case res := <-resCh:
 | 
			
		||||
			result.Results[res.RefId] = res
 | 
			
		||||
		case err := <-errCh:
 | 
			
		||||
			return result, err
 | 
			
		||||
		case <-ctx.Done():
 | 
			
		||||
			return result, ctx.Err()
 | 
			
		||||
	if len(getMetricDataQueries) > 0 {
 | 
			
		||||
		for region, getMetricDataQuery := range getMetricDataQueries {
 | 
			
		||||
			q := getMetricDataQuery
 | 
			
		||||
			eg.Go(func() error {
 | 
			
		||||
				queryResponses, err := e.executeGetMetricDataQuery(ectx, region, q, queryContext)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					return err
 | 
			
		||||
				}
 | 
			
		||||
				for _, queryRes := range queryResponses {
 | 
			
		||||
					result.Results[queryRes.RefId] = queryRes
 | 
			
		||||
				}
 | 
			
		||||
				return nil
 | 
			
		||||
			})
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := eg.Wait(); err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return result, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (e *CloudWatchExecutor) executeQuery(ctx context.Context, parameters *simplejson.Json, queryContext *tsdb.TsdbQuery) (*tsdb.QueryResult, error) {
 | 
			
		||||
	query, err := parseQuery(parameters)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
func (e *CloudWatchExecutor) executeQuery(ctx context.Context, query *CloudWatchQuery, queryContext *tsdb.TsdbQuery) (*tsdb.QueryResult, error) {
 | 
			
		||||
	client, err := e.getClient(query.Region)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
@@ -201,6 +217,139 @@ func (e *CloudWatchExecutor) executeQuery(ctx context.Context, parameters *simpl
 | 
			
		||||
	return queryRes, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (e *CloudWatchExecutor) executeGetMetricDataQuery(ctx context.Context, region string, queries map[string]*CloudWatchQuery, queryContext *tsdb.TsdbQuery) ([]*tsdb.QueryResult, error) {
 | 
			
		||||
	queryResponses := make([]*tsdb.QueryResult, 0)
 | 
			
		||||
 | 
			
		||||
	// validate query
 | 
			
		||||
	for _, query := range queries {
 | 
			
		||||
		if !(len(query.Statistics) == 1 && len(query.ExtendedStatistics) == 0) &&
 | 
			
		||||
			!(len(query.Statistics) == 0 && len(query.ExtendedStatistics) == 1) {
 | 
			
		||||
			return queryResponses, errors.New("Statistics count should be 1")
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	client, err := e.getClient(region)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return queryResponses, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	startTime, err := queryContext.TimeRange.ParseFrom()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return queryResponses, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	endTime, err := queryContext.TimeRange.ParseTo()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return queryResponses, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	params := &cloudwatch.GetMetricDataInput{
 | 
			
		||||
		StartTime: aws.Time(startTime),
 | 
			
		||||
		EndTime:   aws.Time(endTime),
 | 
			
		||||
		ScanBy:    aws.String("TimestampAscending"),
 | 
			
		||||
	}
 | 
			
		||||
	for _, query := range queries {
 | 
			
		||||
		// 1 minutes resolutin metrics is stored for 15 days, 15 * 24 * 60 = 21600
 | 
			
		||||
		if query.HighResolution && (((endTime.Unix() - startTime.Unix()) / int64(query.Period)) > 21600) {
 | 
			
		||||
			return nil, errors.New("too long query period")
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		mdq := &cloudwatch.MetricDataQuery{
 | 
			
		||||
			Id:         aws.String(query.Id),
 | 
			
		||||
			ReturnData: aws.Bool(query.ReturnData),
 | 
			
		||||
		}
 | 
			
		||||
		if query.Expression != "" {
 | 
			
		||||
			mdq.Expression = aws.String(query.Expression)
 | 
			
		||||
		} else {
 | 
			
		||||
			mdq.MetricStat = &cloudwatch.MetricStat{
 | 
			
		||||
				Metric: &cloudwatch.Metric{
 | 
			
		||||
					Namespace:  aws.String(query.Namespace),
 | 
			
		||||
					MetricName: aws.String(query.MetricName),
 | 
			
		||||
				},
 | 
			
		||||
				Period: aws.Int64(int64(query.Period)),
 | 
			
		||||
			}
 | 
			
		||||
			for _, d := range query.Dimensions {
 | 
			
		||||
				mdq.MetricStat.Metric.Dimensions = append(mdq.MetricStat.Metric.Dimensions,
 | 
			
		||||
					&cloudwatch.Dimension{
 | 
			
		||||
						Name:  d.Name,
 | 
			
		||||
						Value: d.Value,
 | 
			
		||||
					})
 | 
			
		||||
			}
 | 
			
		||||
			if len(query.Statistics) == 1 {
 | 
			
		||||
				mdq.MetricStat.Stat = query.Statistics[0]
 | 
			
		||||
			} else {
 | 
			
		||||
				mdq.MetricStat.Stat = query.ExtendedStatistics[0]
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		params.MetricDataQueries = append(params.MetricDataQueries, mdq)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	nextToken := ""
 | 
			
		||||
	mdr := make(map[string]*cloudwatch.MetricDataResult)
 | 
			
		||||
	for {
 | 
			
		||||
		if nextToken != "" {
 | 
			
		||||
			params.NextToken = aws.String(nextToken)
 | 
			
		||||
		}
 | 
			
		||||
		resp, err := client.GetMetricDataWithContext(ctx, params)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return queryResponses, err
 | 
			
		||||
		}
 | 
			
		||||
		metrics.M_Aws_CloudWatch_GetMetricData.Add(float64(len(params.MetricDataQueries)))
 | 
			
		||||
 | 
			
		||||
		for _, r := range resp.MetricDataResults {
 | 
			
		||||
			if _, ok := mdr[*r.Id]; !ok {
 | 
			
		||||
				mdr[*r.Id] = r
 | 
			
		||||
			} else {
 | 
			
		||||
				mdr[*r.Id].Timestamps = append(mdr[*r.Id].Timestamps, r.Timestamps...)
 | 
			
		||||
				mdr[*r.Id].Values = append(mdr[*r.Id].Values, r.Values...)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if resp.NextToken == nil || *resp.NextToken == "" {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		nextToken = *resp.NextToken
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for i, r := range mdr {
 | 
			
		||||
		if *r.StatusCode != "Complete" {
 | 
			
		||||
			return queryResponses, fmt.Errorf("Part of query is failed: %s", *r.StatusCode)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		queryRes := tsdb.NewQueryResult()
 | 
			
		||||
		queryRes.RefId = queries[i].RefId
 | 
			
		||||
		query := queries[*r.Id]
 | 
			
		||||
 | 
			
		||||
		series := tsdb.TimeSeries{
 | 
			
		||||
			Tags:   map[string]string{},
 | 
			
		||||
			Points: make([]tsdb.TimePoint, 0),
 | 
			
		||||
		}
 | 
			
		||||
		for _, d := range query.Dimensions {
 | 
			
		||||
			series.Tags[*d.Name] = *d.Value
 | 
			
		||||
		}
 | 
			
		||||
		s := ""
 | 
			
		||||
		if len(query.Statistics) == 1 {
 | 
			
		||||
			s = *query.Statistics[0]
 | 
			
		||||
		} else {
 | 
			
		||||
			s = *query.ExtendedStatistics[0]
 | 
			
		||||
		}
 | 
			
		||||
		series.Name = formatAlias(query, s, series.Tags)
 | 
			
		||||
 | 
			
		||||
		for j, t := range r.Timestamps {
 | 
			
		||||
			expectedTimestamp := r.Timestamps[j].Add(time.Duration(query.Period) * time.Second)
 | 
			
		||||
			if j > 0 && expectedTimestamp.Before(*t) {
 | 
			
		||||
				series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFromPtr(nil), float64(expectedTimestamp.Unix()*1000)))
 | 
			
		||||
			}
 | 
			
		||||
			series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(*r.Values[j]), float64((*t).Unix())*1000))
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		queryRes.Series = append(queryRes.Series, &series)
 | 
			
		||||
		queryResponses = append(queryResponses, queryRes)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return queryResponses, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func parseDimensions(model *simplejson.Json) ([]*cloudwatch.Dimension, error) {
 | 
			
		||||
	var result []*cloudwatch.Dimension
 | 
			
		||||
 | 
			
		||||
@@ -257,6 +406,9 @@ func parseQuery(model *simplejson.Json) (*CloudWatchQuery, error) {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	id := model.Get("id").MustString("")
 | 
			
		||||
	expression := model.Get("expression").MustString("")
 | 
			
		||||
 | 
			
		||||
	dimensions, err := parseDimensions(model)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
@@ -295,6 +447,7 @@ func parseQuery(model *simplejson.Json) (*CloudWatchQuery, error) {
 | 
			
		||||
		alias = "{{metric}}_{{stat}}"
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	returnData := model.Get("returnData").MustBool(false)
 | 
			
		||||
	highResolution := model.Get("highResolution").MustBool(false)
 | 
			
		||||
 | 
			
		||||
	return &CloudWatchQuery{
 | 
			
		||||
@@ -306,11 +459,18 @@ func parseQuery(model *simplejson.Json) (*CloudWatchQuery, error) {
 | 
			
		||||
		ExtendedStatistics: aws.StringSlice(extendedStatistics),
 | 
			
		||||
		Period:             period,
 | 
			
		||||
		Alias:              alias,
 | 
			
		||||
		Id:                 id,
 | 
			
		||||
		Expression:         expression,
 | 
			
		||||
		ReturnData:         returnData,
 | 
			
		||||
		HighResolution:     highResolution,
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func formatAlias(query *CloudWatchQuery, stat string, dimensions map[string]string) string {
 | 
			
		||||
	if len(query.Id) > 0 && len(query.Expression) > 0 {
 | 
			
		||||
		return query.Id
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	data := map[string]string{}
 | 
			
		||||
	data["region"] = query.Region
 | 
			
		||||
	data["namespace"] = query.Namespace
 | 
			
		||||
@@ -338,6 +498,7 @@ func formatAlias(query *CloudWatchQuery, stat string, dimensions map[string]stri
 | 
			
		||||
func parseResponse(resp *cloudwatch.GetMetricStatisticsOutput, query *CloudWatchQuery) (*tsdb.QueryResult, error) {
 | 
			
		||||
	queryRes := tsdb.NewQueryResult()
 | 
			
		||||
 | 
			
		||||
	queryRes.RefId = query.RefId
 | 
			
		||||
	var value float64
 | 
			
		||||
	for _, s := range append(query.Statistics, query.ExtendedStatistics...) {
 | 
			
		||||
		series := tsdb.TimeSeries{
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user