mirror of
https://github.com/grafana/grafana.git
synced 2025-02-09 23:16:16 -06:00
Chore: Convert CloudWatch metrics to data frames (#27449)
* CloudWatch: Convert metrics results to data frames Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com> Co-authored-by: kay delaney <kay@grafana.com>
This commit is contained in:
parent
810c327e31
commit
0a2ef086b3
@ -5,10 +5,12 @@ import (
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
||||
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
|
||||
"github.com/grafana/grafana/pkg/infra/metrics"
|
||||
)
|
||||
|
||||
func (e *cloudWatchExecutor) executeRequest(ctx context.Context, client cloudWatchClient, metricDataInput *cloudwatch.GetMetricDataInput) ([]*cloudwatch.GetMetricDataOutput, error) {
|
||||
func (e *cloudWatchExecutor) executeRequest(ctx context.Context, client cloudwatchiface.CloudWatchAPI,
|
||||
metricDataInput *cloudwatch.GetMetricDataInput) ([]*cloudwatch.GetMetricDataOutput, error) {
|
||||
mdo := make([]*cloudwatch.GetMetricDataOutput, 0)
|
||||
|
||||
nextToken := ""
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
||||
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -14,6 +15,7 @@ import (
|
||||
var counter = 1
|
||||
|
||||
type cloudWatchFakeClient struct {
|
||||
cloudwatchiface.CloudWatchAPI
|
||||
}
|
||||
|
||||
func (client *cloudWatchFakeClient) GetMetricDataWithContext(ctx aws.Context, input *cloudwatch.GetMetricDataInput, opts ...request.Option) (*cloudwatch.GetMetricDataOutput, error) {
|
||||
|
@ -17,8 +17,8 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d
|
||||
}
|
||||
|
||||
nonEmptyRows := make([][]*cloudwatchlogs.ResultField, 0)
|
||||
// Sometimes CloudWatch can send empty rows
|
||||
for _, row := range response.Results {
|
||||
// Sometimes CloudWatch can send empty rows
|
||||
if len(row) == 0 {
|
||||
continue
|
||||
}
|
||||
@ -26,7 +26,7 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d
|
||||
if row[0].Value == nil {
|
||||
continue
|
||||
}
|
||||
// Sometimes it sends row with only timestamp
|
||||
// Sometimes it sends rows with only timestamp
|
||||
if _, err := time.Parse(cloudWatchTSFormat, *row[0].Value); err == nil {
|
||||
continue
|
||||
}
|
||||
@ -52,7 +52,7 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d
|
||||
if _, exists := fieldValues[*resultField.Field]; !exists {
|
||||
fieldNames = append(fieldNames, *resultField.Field)
|
||||
|
||||
// Check if field is time field
|
||||
// Check if it's a time field
|
||||
if _, err := time.Parse(cloudWatchTSFormat, *resultField.Value); err == nil {
|
||||
fieldValues[*resultField.Field] = make([]*time.Time, rowCount)
|
||||
} else if _, err := strconv.ParseFloat(*resultField.Value, 64); err == nil {
|
||||
@ -81,7 +81,7 @@ func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*d
|
||||
}
|
||||
}
|
||||
|
||||
newFields := make([]*data.Field, 0)
|
||||
newFields := make([]*data.Field, 0, len(fieldNames))
|
||||
for _, fieldName := range fieldNames {
|
||||
newFields = append(newFields, data.NewField(fieldName, nil, fieldValues[fieldName]))
|
||||
|
||||
|
@ -72,7 +72,7 @@ func TestMetricDataQueryBuilder_buildSearchExpression(t *testing.T) {
|
||||
}
|
||||
|
||||
res := buildSearchExpression(query, "Average")
|
||||
assert.Equal(t, res, `REMOVE_EMPTY(SEARCH('{AWS/EC2,"LoadBalancer"} MetricName="CPUUtilization"', 'Average', 300))`)
|
||||
assert.Equal(t, `REMOVE_EMPTY(SEARCH('{AWS/EC2,"LoadBalancer"} MetricName="CPUUtilization"', 'Average', 300))`, res)
|
||||
})
|
||||
|
||||
t.Run("Query has three dimension values for two given dimension keys, and one value is a star", func(t *testing.T) {
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
)
|
||||
@ -13,7 +14,9 @@ import (
|
||||
// has more than one statistic defined, one cloudwatchQuery will be created for each statistic.
|
||||
// If the query doesn't have an Id defined by the user, we'll give it an with format `query[RefId]`. In the case
|
||||
// the incoming query had more than one stat, it will ge an id like `query[RefId]_[StatName]`, eg queryC_Average
|
||||
func (e *cloudWatchExecutor) transformRequestQueriesToCloudWatchQueries(requestQueries []*requestQuery) (map[string]*cloudWatchQuery, error) {
|
||||
func (e *cloudWatchExecutor) transformRequestQueriesToCloudWatchQueries(requestQueries []*requestQuery) (
|
||||
map[string]*cloudWatchQuery, error) {
|
||||
plog.Debug("Transforming CloudWatch request queries")
|
||||
cloudwatchQueries := make(map[string]*cloudWatchQuery)
|
||||
for _, requestQuery := range requestQueries {
|
||||
for _, stat := range requestQuery.Statistics {
|
||||
@ -52,17 +55,22 @@ func (e *cloudWatchExecutor) transformRequestQueriesToCloudWatchQueries(requestQ
|
||||
|
||||
func (e *cloudWatchExecutor) transformQueryResponseToQueryResult(cloudwatchResponses []*cloudwatchResponse) map[string]*tsdb.QueryResult {
|
||||
responsesByRefID := make(map[string][]*cloudwatchResponse)
|
||||
refIDs := sort.StringSlice{}
|
||||
for _, res := range cloudwatchResponses {
|
||||
refIDs = append(refIDs, res.RefId)
|
||||
responsesByRefID[res.RefId] = append(responsesByRefID[res.RefId], res)
|
||||
}
|
||||
// Ensure stable results
|
||||
refIDs.Sort()
|
||||
|
||||
results := make(map[string]*tsdb.QueryResult)
|
||||
for refID, responses := range responsesByRefID {
|
||||
for _, refID := range refIDs {
|
||||
responses := responsesByRefID[refID]
|
||||
queryResult := tsdb.NewQueryResult()
|
||||
queryResult.RefId = refID
|
||||
queryResult.Meta = simplejson.New()
|
||||
queryResult.Series = tsdb.TimeSeriesSlice{}
|
||||
timeSeries := make(tsdb.TimeSeriesSlice, 0)
|
||||
frames := make(data.Frames, 0, len(responses))
|
||||
|
||||
requestExceededMaxLimit := false
|
||||
partialData := false
|
||||
@ -72,7 +80,7 @@ func (e *cloudWatchExecutor) transformQueryResponseToQueryResult(cloudwatchRespo
|
||||
}{}
|
||||
|
||||
for _, response := range responses {
|
||||
timeSeries = append(timeSeries, *response.series...)
|
||||
frames = append(frames, response.DataFrames...)
|
||||
requestExceededMaxLimit = requestExceededMaxLimit || response.RequestExceededMaxLimit
|
||||
partialData = partialData || response.PartialData
|
||||
queryMeta = append(queryMeta, struct {
|
||||
@ -85,8 +93,8 @@ func (e *cloudWatchExecutor) transformQueryResponseToQueryResult(cloudwatchRespo
|
||||
})
|
||||
}
|
||||
|
||||
sort.Slice(timeSeries, func(i, j int) bool {
|
||||
return timeSeries[i].Name < timeSeries[j].Name
|
||||
sort.Slice(frames, func(i, j int) bool {
|
||||
return frames[i].Name < frames[j].Name
|
||||
})
|
||||
|
||||
if requestExceededMaxLimit {
|
||||
@ -96,7 +104,7 @@ func (e *cloudWatchExecutor) transformQueryResponseToQueryResult(cloudwatchRespo
|
||||
queryResult.ErrorString = "Cloudwatch GetMetricData error: Too many datapoints requested - your search has been limited. Please try to reduce the time range"
|
||||
}
|
||||
|
||||
queryResult.Series = append(queryResult.Series, timeSeries...)
|
||||
queryResult.Dataframes = tsdb.NewDecodedDataFrames(frames)
|
||||
queryResult.Meta.Set("gmdMeta", queryMeta)
|
||||
results[refID] = queryResult
|
||||
}
|
||||
|
@ -17,13 +17,13 @@ import (
|
||||
// Parses the json queries and returns a requestQuery. The requestQuery has a 1 to 1 mapping to a query editor row
|
||||
func (e *cloudWatchExecutor) parseQueries(queryContext *tsdb.TsdbQuery, startTime time.Time, endTime time.Time) (map[string][]*requestQuery, error) {
|
||||
requestQueries := make(map[string][]*requestQuery)
|
||||
for i, model := range queryContext.Queries {
|
||||
queryType := model.Model.Get("type").MustString()
|
||||
for i, query := range queryContext.Queries {
|
||||
queryType := query.Model.Get("type").MustString()
|
||||
if queryType != "timeSeriesQuery" && queryType != "" {
|
||||
continue
|
||||
}
|
||||
|
||||
refID := queryContext.Queries[i].RefId
|
||||
refID := query.RefId
|
||||
query, err := parseRequestQuery(queryContext.Queries[i].Model, refID, startTime, endTime)
|
||||
if err != nil {
|
||||
return nil, &queryError{err: err, RefID: refID}
|
||||
@ -39,6 +39,7 @@ func (e *cloudWatchExecutor) parseQueries(queryContext *tsdb.TsdbQuery, startTim
|
||||
}
|
||||
|
||||
func parseRequestQuery(model *simplejson.Json, refId string, startTime time.Time, endTime time.Time) (*requestQuery, error) {
|
||||
plog.Debug("Parsing request query", "query", model)
|
||||
reNumber := regexp.MustCompile(`^\d+$`)
|
||||
region, err := model.Get("region").String()
|
||||
if err != nil {
|
||||
|
@ -196,7 +196,7 @@ func TestRequestParser(t *testing.T) {
|
||||
|
||||
res, err := parseRequestQuery(query, "ref1", from, to)
|
||||
require.Nil(t, err)
|
||||
assert.Equal(t, res.Period, 21600)
|
||||
assert.Equal(t, 21600, res.Period)
|
||||
})
|
||||
|
||||
t.Run("Time range is 2 years", func(t *testing.T) {
|
||||
|
@ -8,11 +8,11 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
||||
"github.com/grafana/grafana/pkg/components/null"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
)
|
||||
|
||||
func (e *cloudWatchExecutor) parseResponse(metricDataOutputs []*cloudwatch.GetMetricDataOutput, queries map[string]*cloudWatchQuery) ([]*cloudwatchResponse, error) {
|
||||
func (e *cloudWatchExecutor) parseResponse(metricDataOutputs []*cloudwatch.GetMetricDataOutput,
|
||||
queries map[string]*cloudWatchQuery) ([]*cloudwatchResponse, error) {
|
||||
// Map from result ID -> label -> result
|
||||
mdrs := make(map[string]map[string]*cloudwatch.MetricDataResult)
|
||||
labels := map[string][]string{}
|
||||
@ -49,13 +49,13 @@ func (e *cloudWatchExecutor) parseResponse(metricDataOutputs []*cloudwatch.GetMe
|
||||
cloudWatchResponses := make([]*cloudwatchResponse, 0)
|
||||
for id, lr := range mdrs {
|
||||
query := queries[id]
|
||||
series, partialData, err := parseGetMetricDataTimeSeries(lr, labels[id], query)
|
||||
frames, partialData, err := parseMetricResults(lr, labels[id], query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response := &cloudwatchResponse{
|
||||
series: series,
|
||||
DataFrames: frames,
|
||||
Period: query.Period,
|
||||
Expression: query.UsedExpression,
|
||||
RefId: query.RefId,
|
||||
@ -69,25 +69,25 @@ func (e *cloudWatchExecutor) parseResponse(metricDataOutputs []*cloudwatch.GetMe
|
||||
return cloudWatchResponses, nil
|
||||
}
|
||||
|
||||
func parseGetMetricDataTimeSeries(metricDataResults map[string]*cloudwatch.MetricDataResult, labels []string,
|
||||
query *cloudWatchQuery) (*tsdb.TimeSeriesSlice, bool, error) {
|
||||
func parseMetricResults(results map[string]*cloudwatch.MetricDataResult, labels []string,
|
||||
query *cloudWatchQuery) (data.Frames, bool, error) {
|
||||
partialData := false
|
||||
result := tsdb.TimeSeriesSlice{}
|
||||
frames := data.Frames{}
|
||||
for _, label := range labels {
|
||||
metricDataResult := metricDataResults[label]
|
||||
if *metricDataResult.StatusCode != "Complete" {
|
||||
result := results[label]
|
||||
if *result.StatusCode != "Complete" {
|
||||
partialData = true
|
||||
}
|
||||
|
||||
for _, message := range metricDataResult.Messages {
|
||||
for _, message := range result.Messages {
|
||||
if *message.Code == "ArithmeticError" {
|
||||
return nil, false, fmt.Errorf("ArithmeticError in query %q: %s", query.RefId, *message.Value)
|
||||
}
|
||||
}
|
||||
|
||||
// In case a multi-valued dimension is used and the cloudwatch query yields no values, create one empty time series for each dimension value.
|
||||
// Use that dimension value to expand the alias field
|
||||
if len(metricDataResult.Values) == 0 && query.isMultiValuedDimensionExpression() {
|
||||
// In case a multi-valued dimension is used and the cloudwatch query yields no values, create one empty time
|
||||
// series for each dimension value. Use that dimension value to expand the alias field
|
||||
if len(result.Values) == 0 && query.isMultiValuedDimensionExpression() {
|
||||
series := 0
|
||||
multiValuedDimension := ""
|
||||
for key, values := range query.Dimensions {
|
||||
@ -98,62 +98,88 @@ func parseGetMetricDataTimeSeries(metricDataResults map[string]*cloudwatch.Metri
|
||||
}
|
||||
|
||||
for _, value := range query.Dimensions[multiValuedDimension] {
|
||||
emptySeries := tsdb.TimeSeries{
|
||||
Tags: map[string]string{multiValuedDimension: value},
|
||||
Points: make([]tsdb.TimePoint, 0),
|
||||
}
|
||||
tags := map[string]string{multiValuedDimension: value}
|
||||
for key, values := range query.Dimensions {
|
||||
if key != multiValuedDimension && len(values) > 0 {
|
||||
emptySeries.Tags[key] = values[0]
|
||||
tags[key] = values[0]
|
||||
}
|
||||
}
|
||||
|
||||
emptySeries.Name = formatAlias(query, query.Stats, emptySeries.Tags, label)
|
||||
result = append(result, &emptySeries)
|
||||
timeField := data.NewField("timestamp", nil, []*time.Time{})
|
||||
timeField.SetConfig(&data.FieldConfig{DisplayName: "Time"})
|
||||
|
||||
frameName := formatAlias(query, query.Stats, tags, label)
|
||||
valueField := data.NewField("value", tags, []*float64{})
|
||||
valueField.SetConfig(&data.FieldConfig{DisplayName: frameName})
|
||||
|
||||
emptyFrame := data.Frame{
|
||||
Name: frameName,
|
||||
Fields: []*data.Field{
|
||||
timeField,
|
||||
valueField,
|
||||
},
|
||||
RefID: query.RefId,
|
||||
}
|
||||
frames = append(frames, &emptyFrame)
|
||||
}
|
||||
} else {
|
||||
keys := make([]string, 0)
|
||||
dims := make([]string, 0, len(query.Dimensions))
|
||||
for k := range query.Dimensions {
|
||||
keys = append(keys, k)
|
||||
dims = append(dims, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
sort.Strings(dims)
|
||||
|
||||
series := tsdb.TimeSeries{
|
||||
Tags: make(map[string]string),
|
||||
Points: make([]tsdb.TimePoint, 0),
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
values := query.Dimensions[key]
|
||||
tags := data.Labels{}
|
||||
for _, dim := range dims {
|
||||
values := query.Dimensions[dim]
|
||||
if len(values) == 1 && values[0] != "*" {
|
||||
series.Tags[key] = values[0]
|
||||
tags[dim] = values[0]
|
||||
} else {
|
||||
for _, value := range values {
|
||||
if value == label || value == "*" {
|
||||
series.Tags[key] = label
|
||||
tags[dim] = label
|
||||
} else if strings.Contains(label, value) {
|
||||
series.Tags[key] = value
|
||||
tags[dim] = value
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
series.Name = formatAlias(query, query.Stats, series.Tags, label)
|
||||
|
||||
for j, t := range metricDataResult.Timestamps {
|
||||
timestamps := []*time.Time{}
|
||||
points := []*float64{}
|
||||
for j, t := range result.Timestamps {
|
||||
if j > 0 {
|
||||
expectedTimestamp := metricDataResult.Timestamps[j-1].Add(time.Duration(query.Period) * time.Second)
|
||||
expectedTimestamp := result.Timestamps[j-1].Add(time.Duration(query.Period) * time.Second)
|
||||
if expectedTimestamp.Before(*t) {
|
||||
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFromPtr(nil), float64(expectedTimestamp.Unix()*1000)))
|
||||
timestamps = append(timestamps, &expectedTimestamp)
|
||||
points = append(points, nil)
|
||||
}
|
||||
}
|
||||
series.Points = append(series.Points, tsdb.NewTimePoint(null.FloatFrom(*metricDataResult.Values[j]),
|
||||
float64(t.Unix())*1000))
|
||||
val := result.Values[j]
|
||||
timestamps = append(timestamps, t)
|
||||
points = append(points, val)
|
||||
}
|
||||
result = append(result, &series)
|
||||
|
||||
timeField := data.NewField("timestamp", nil, timestamps)
|
||||
timeField.SetConfig(&data.FieldConfig{DisplayName: "Time"})
|
||||
|
||||
frameName := formatAlias(query, query.Stats, tags, label)
|
||||
valueField := data.NewField("value", tags, points)
|
||||
valueField.SetConfig(&data.FieldConfig{DisplayName: frameName})
|
||||
|
||||
frame := data.Frame{
|
||||
Name: frameName,
|
||||
Fields: []*data.Field{
|
||||
timeField,
|
||||
valueField,
|
||||
},
|
||||
RefID: query.RefId,
|
||||
}
|
||||
frames = append(frames, &frame)
|
||||
}
|
||||
}
|
||||
return &result, partialData, nil
|
||||
|
||||
return frames, partialData, nil
|
||||
}
|
||||
|
||||
func formatAlias(query *cloudWatchQuery, stat string, dimensions map[string]string, label string) string {
|
||||
|
@ -6,7 +6,6 @@ import (
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
||||
"github.com/grafana/grafana/pkg/components/null"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -61,17 +60,17 @@ func TestCloudWatchResponseParser(t *testing.T) {
|
||||
Period: 60,
|
||||
Alias: "{{LoadBalancer}} Expanded",
|
||||
}
|
||||
series, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query)
|
||||
frames, partialData, err := parseMetricResults(mdrs, labels, query)
|
||||
require.NoError(t, err)
|
||||
|
||||
timeSeries := (*series)[0]
|
||||
frame1 := frames[0]
|
||||
assert.False(t, partialData)
|
||||
assert.Equal(t, "lb1 Expanded", timeSeries.Name)
|
||||
assert.Equal(t, "lb1", timeSeries.Tags["LoadBalancer"])
|
||||
assert.Equal(t, "lb1 Expanded", frame1.Name)
|
||||
assert.Equal(t, "lb1", frame1.Fields[1].Labels["LoadBalancer"])
|
||||
|
||||
timeSeries2 := (*series)[1]
|
||||
assert.Equal(t, "lb2 Expanded", timeSeries2.Name)
|
||||
assert.Equal(t, "lb2", timeSeries2.Tags["LoadBalancer"])
|
||||
frame2 := frames[1]
|
||||
assert.Equal(t, "lb2 Expanded", frame2.Name)
|
||||
assert.Equal(t, "lb2", frame2.Fields[1].Labels["LoadBalancer"])
|
||||
})
|
||||
|
||||
t.Run("Expand dimension value using substring", func(t *testing.T) {
|
||||
@ -123,17 +122,17 @@ func TestCloudWatchResponseParser(t *testing.T) {
|
||||
Period: 60,
|
||||
Alias: "{{LoadBalancer}} Expanded",
|
||||
}
|
||||
series, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query)
|
||||
frames, partialData, err := parseMetricResults(mdrs, labels, query)
|
||||
require.NoError(t, err)
|
||||
|
||||
timeSeries := (*series)[0]
|
||||
frame1 := frames[0]
|
||||
assert.False(t, partialData)
|
||||
assert.Equal(t, "lb1 Expanded", timeSeries.Name)
|
||||
assert.Equal(t, "lb1", timeSeries.Tags["LoadBalancer"])
|
||||
assert.Equal(t, "lb1 Expanded", frame1.Name)
|
||||
assert.Equal(t, "lb1", frame1.Fields[1].Labels["LoadBalancer"])
|
||||
|
||||
timeSeries2 := (*series)[1]
|
||||
assert.Equal(t, "lb2 Expanded", timeSeries2.Name)
|
||||
assert.Equal(t, "lb2", timeSeries2.Tags["LoadBalancer"])
|
||||
frame2 := frames[1]
|
||||
assert.Equal(t, "lb2 Expanded", frame2.Name)
|
||||
assert.Equal(t, "lb2", frame2.Fields[1].Labels["LoadBalancer"])
|
||||
})
|
||||
|
||||
t.Run("Expand dimension value using wildcard", func(t *testing.T) {
|
||||
@ -185,11 +184,12 @@ func TestCloudWatchResponseParser(t *testing.T) {
|
||||
Period: 60,
|
||||
Alias: "{{LoadBalancer}} Expanded",
|
||||
}
|
||||
series, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query)
|
||||
frames, partialData, err := parseMetricResults(mdrs, labels, query)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.False(t, partialData)
|
||||
assert.Equal(t, "lb3 Expanded", (*series)[0].Name)
|
||||
assert.Equal(t, "lb4 Expanded", (*series)[1].Name)
|
||||
assert.Equal(t, "lb3 Expanded", frames[0].Name)
|
||||
assert.Equal(t, "lb4 Expanded", frames[1].Name)
|
||||
})
|
||||
|
||||
t.Run("Expand dimension value when no values are returned and a multi-valued template variable is used", func(t *testing.T) {
|
||||
@ -221,12 +221,13 @@ func TestCloudWatchResponseParser(t *testing.T) {
|
||||
Period: 60,
|
||||
Alias: "{{LoadBalancer}} Expanded",
|
||||
}
|
||||
series, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query)
|
||||
frames, partialData, err := parseMetricResults(mdrs, labels, query)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.False(t, partialData)
|
||||
assert.Len(t, *series, 2)
|
||||
assert.Equal(t, "lb1 Expanded", (*series)[0].Name)
|
||||
assert.Equal(t, "lb2 Expanded", (*series)[1].Name)
|
||||
assert.Len(t, frames, 2)
|
||||
assert.Equal(t, "lb1 Expanded", frames[0].Name)
|
||||
assert.Equal(t, "lb2 Expanded", frames[1].Name)
|
||||
})
|
||||
|
||||
t.Run("Expand dimension value when no values are returned and a multi-valued template variable and two single-valued dimensions are used", func(t *testing.T) {
|
||||
@ -260,12 +261,13 @@ func TestCloudWatchResponseParser(t *testing.T) {
|
||||
Period: 60,
|
||||
Alias: "{{LoadBalancer}} Expanded {{InstanceType}} - {{Resource}}",
|
||||
}
|
||||
series, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query)
|
||||
frames, partialData, err := parseMetricResults(mdrs, labels, query)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.False(t, partialData)
|
||||
assert.Len(t, *series, 2)
|
||||
assert.Equal(t, "lb1 Expanded micro - res", (*series)[0].Name)
|
||||
assert.Equal(t, "lb2 Expanded micro - res", (*series)[1].Name)
|
||||
assert.Len(t, frames, 2)
|
||||
assert.Equal(t, "lb1 Expanded micro - res", frames[0].Name)
|
||||
assert.Equal(t, "lb2 Expanded micro - res", frames[1].Name)
|
||||
})
|
||||
|
||||
t.Run("Parse cloudwatch response", func(t *testing.T) {
|
||||
@ -302,15 +304,16 @@ func TestCloudWatchResponseParser(t *testing.T) {
|
||||
Period: 60,
|
||||
Alias: "{{namespace}}_{{metric}}_{{stat}}",
|
||||
}
|
||||
series, partialData, err := parseGetMetricDataTimeSeries(mdrs, labels, query)
|
||||
timeSeries := (*series)[0]
|
||||
frames, partialData, err := parseMetricResults(mdrs, labels, query)
|
||||
require.NoError(t, err)
|
||||
|
||||
frame := frames[0]
|
||||
assert.False(t, partialData)
|
||||
assert.Equal(t, "AWS/ApplicationELB_TargetResponseTime_Average", timeSeries.Name)
|
||||
assert.Equal(t, "lb", timeSeries.Tags["LoadBalancer"])
|
||||
assert.Equal(t, null.FloatFrom(10.0).String(), timeSeries.Points[0][0].String())
|
||||
assert.Equal(t, null.FloatFrom(20.0).String(), timeSeries.Points[1][0].String())
|
||||
assert.Equal(t, null.FloatFromPtr(nil).String(), timeSeries.Points[2][0].String())
|
||||
assert.Equal(t, null.FloatFrom(30.0).String(), timeSeries.Points[3][0].String())
|
||||
assert.Equal(t, "AWS/ApplicationELB_TargetResponseTime_Average", frame.Name)
|
||||
assert.Equal(t, "lb", frame.Fields[1].Labels["LoadBalancer"])
|
||||
assert.Equal(t, 10.0, *frame.Fields[1].At(0).(*float64))
|
||||
assert.Equal(t, 20.0, *frame.Fields[1].At(1).(*float64))
|
||||
assert.Nil(t, frame.Fields[1].At(2))
|
||||
assert.Equal(t, 30.0, *frame.Fields[1].At(3).(*float64))
|
||||
})
|
||||
}
|
||||
|
@ -6,17 +6,19 @@ import (
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
"github.com/grafana/grafana/pkg/util/errutil"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func (e *cloudWatchExecutor) executeTimeSeriesQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
|
||||
plog.Debug("Executing time series query")
|
||||
startTime, err := queryContext.TimeRange.ParseFrom()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errutil.Wrap("failed to parse start time", err)
|
||||
}
|
||||
endTime, err := queryContext.TimeRange.ParseTo()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errutil.Wrap("failed to parse end time", err)
|
||||
}
|
||||
if !startTime.Before(endTime) {
|
||||
return nil, fmt.Errorf("invalid time range: start time must be before end time")
|
||||
|
@ -3,16 +3,9 @@ package cloudwatch
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/service/cloudwatch"
|
||||
"github.com/grafana/grafana/pkg/tsdb"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
)
|
||||
|
||||
type cloudWatchClient interface {
|
||||
GetMetricDataWithContext(ctx aws.Context, input *cloudwatch.GetMetricDataInput, opts ...request.Option) (*cloudwatch.GetMetricDataOutput, error)
|
||||
}
|
||||
|
||||
type requestQuery struct {
|
||||
RefId string
|
||||
Region string
|
||||
@ -31,7 +24,7 @@ type requestQuery struct {
|
||||
}
|
||||
|
||||
type cloudwatchResponse struct {
|
||||
series *tsdb.TimeSeriesSlice
|
||||
DataFrames data.Frames
|
||||
Id string
|
||||
RefId string
|
||||
Expression string
|
||||
|
@ -89,7 +89,7 @@ func NewTimeSeries(name string, points TimeSeriesPoints) *TimeSeries {
|
||||
}
|
||||
}
|
||||
|
||||
// DataFrames interface for retrieving encoded and decoded data frames.
|
||||
// DataFrames is an interface for retrieving encoded and decoded data frames.
|
||||
//
|
||||
// See NewDecodedDataFrames and NewEncodedDataFrames for more information.
|
||||
type DataFrames interface {
|
||||
|
@ -16,7 +16,6 @@ import {
|
||||
LogRowModel,
|
||||
ScopedVars,
|
||||
TimeRange,
|
||||
toDataFrame,
|
||||
rangeUtil,
|
||||
DataQueryErrorType,
|
||||
} from '@grafana/data';
|
||||
@ -497,75 +496,70 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
|
||||
)}`;
|
||||
}
|
||||
|
||||
performTimeSeriesQuery(request: MetricRequest, { from, to }: TimeRange): Promise<any> {
|
||||
return this.awsRequest(TSDB_QUERY_ENDPOINT, request)
|
||||
.then((res: TSDBResponse) => {
|
||||
if (!res.results) {
|
||||
return { data: [] };
|
||||
}
|
||||
return Object.values(request.queries).reduce(
|
||||
({ data, error }: any, queryRequest: any) => {
|
||||
const queryResult = res.results[queryRequest.refId];
|
||||
if (!queryResult) {
|
||||
return { data, error };
|
||||
}
|
||||
async performTimeSeriesQuery(request: MetricRequest, { from, to }: TimeRange): Promise<any> {
|
||||
try {
|
||||
const res: TSDBResponse = await this.awsRequest(TSDB_QUERY_ENDPOINT, request);
|
||||
const dataframes: DataFrame[] = toDataQueryResponse({ data: res }).data;
|
||||
if (!dataframes || dataframes.length <= 0) {
|
||||
return { data: [] };
|
||||
}
|
||||
|
||||
const link = this.buildCloudwatchConsoleUrl(
|
||||
queryRequest,
|
||||
from.toISOString(),
|
||||
to.toISOString(),
|
||||
queryRequest.refId,
|
||||
queryResult.meta.gmdMeta
|
||||
);
|
||||
return Object.values(request.queries).reduce(
|
||||
({ data, error }: any, queryRequest: any) => {
|
||||
const queryResult = res.results[queryRequest.refId];
|
||||
if (!queryResult) {
|
||||
return { data, error };
|
||||
}
|
||||
|
||||
return {
|
||||
error: error || queryResult.error ? { message: queryResult.error } : null,
|
||||
data: [
|
||||
...data,
|
||||
...queryResult.series.map(({ name, points }: any) => {
|
||||
const dataFrame = toDataFrame({
|
||||
target: name,
|
||||
datapoints: points,
|
||||
refId: queryRequest.refId,
|
||||
meta: queryResult.meta,
|
||||
});
|
||||
if (link) {
|
||||
for (const field of dataFrame.fields) {
|
||||
field.config.links = [
|
||||
{
|
||||
url: link,
|
||||
title: 'View in CloudWatch console',
|
||||
targetBlank: true,
|
||||
},
|
||||
];
|
||||
}
|
||||
const link = this.buildCloudwatchConsoleUrl(
|
||||
queryRequest,
|
||||
from.toISOString(),
|
||||
to.toISOString(),
|
||||
queryRequest.refId,
|
||||
queryResult.meta.gmdMeta
|
||||
);
|
||||
|
||||
return {
|
||||
error: error || queryResult.error ? { message: queryResult.error } : null,
|
||||
data: [
|
||||
...data,
|
||||
...dataframes.map(frame => {
|
||||
if (link) {
|
||||
for (const field of frame.fields) {
|
||||
field.config.links = [
|
||||
{
|
||||
url: link,
|
||||
title: 'View in CloudWatch console',
|
||||
targetBlank: true,
|
||||
},
|
||||
];
|
||||
}
|
||||
return dataFrame;
|
||||
}),
|
||||
],
|
||||
};
|
||||
},
|
||||
{ data: [], error: null }
|
||||
);
|
||||
})
|
||||
.catch((err: any = { data: { error: '' } }) => {
|
||||
if (/^Throttling:.*/.test(err.data.message)) {
|
||||
const failedRedIds = Object.keys(err.data.results);
|
||||
const regionsAffected = Object.values(request.queries).reduce(
|
||||
(res: string[], { refId, region }) =>
|
||||
(refId && !failedRedIds.includes(refId)) || res.includes(region) ? res : [...res, region],
|
||||
[]
|
||||
) as string[];
|
||||
}
|
||||
return frame;
|
||||
}),
|
||||
],
|
||||
};
|
||||
},
|
||||
{ data: [], error: null }
|
||||
);
|
||||
} catch (err) {
|
||||
if (/^Throttling:.*/.test(err.data.message)) {
|
||||
const failedRedIds = Object.keys(err.data.results);
|
||||
const regionsAffected = Object.values(request.queries).reduce(
|
||||
(res: string[], { refId, region }) =>
|
||||
(refId && !failedRedIds.includes(refId)) || res.includes(region) ? res : [...res, region],
|
||||
[]
|
||||
) as string[];
|
||||
|
||||
regionsAffected.forEach(region => this.debouncedAlert(this.datasourceName, this.getActualRegion(region)));
|
||||
}
|
||||
regionsAffected.forEach(region => this.debouncedAlert(this.datasourceName, this.getActualRegion(region)));
|
||||
}
|
||||
|
||||
if (err.data && err.data.message === 'Metric request error' && err.data.error) {
|
||||
err.data.message = err.data.error;
|
||||
}
|
||||
if (err.data && err.data.message === 'Metric request error' && err.data.error) {
|
||||
err.data.message = err.data.error;
|
||||
}
|
||||
|
||||
throw err;
|
||||
});
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
transformSuggestDataFromTable(suggestData: TSDBResponse) {
|
||||
|
Loading…
Reference in New Issue
Block a user