Cloudwatch: Backend cleanup (#59663)
* cleanup cloudwatch.go
* streamline interface naming
* use utility func
* rename test utils file
* move util function to where they are used
* move dtos to models
* split integration tests from the rest
* Update pkg/tsdb/cloudwatch/cloudwatch.go
  Co-authored-by: Isabella Siu <Isabella.siu@grafana.com>
* refactor error codes aggregation
* move error messages to models

Co-authored-by: Isabella Siu <Isabella.siu@grafana.com>
This commit is contained in: parent ada0c771ef, commit fde9a5d112
pkg/tsdb/cloudwatch/client_factory.go (new file, 65 lines)
@@ -0,0 +1,65 @@
package cloudwatch

import (
    "github.com/aws/aws-sdk-go/aws/client"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/cloudwatch"
    "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
    "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
    "github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
    "github.com/aws/aws-sdk-go/service/ec2"
    "github.com/aws/aws-sdk-go/service/ec2/ec2iface"
    "github.com/aws/aws-sdk-go/service/oam"
    "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi"
    "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
    "github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
)

// NewMetricsAPI is a CloudWatch metrics api factory.
//
// Stubbable by tests.
var NewMetricsAPI = func(sess *session.Session) models.CloudWatchMetricsAPIProvider {
    return cloudwatch.New(sess)
}

// NewLogsAPI is a CloudWatch logs api factory.
//
// Stubbable by tests.
var NewLogsAPI = func(sess *session.Session) models.CloudWatchLogsAPIProvider {
    return cloudwatchlogs.New(sess)
}

// NewOAMAPI is a CloudWatch OAM api factory.
//
// Stubbable by tests.
var NewOAMAPI = func(sess *session.Session) models.OAMAPIProvider {
    return oam.New(sess)
}

// NewCWClient is a CloudWatch client factory.
//
// Stubbable by tests.
var NewCWClient = func(sess *session.Session) cloudwatchiface.CloudWatchAPI {
    return cloudwatch.New(sess)
}

// NewCWLogsClient is a CloudWatch logs client factory.
//
// Stubbable by tests.
var NewCWLogsClient = func(sess *session.Session) cloudwatchlogsiface.CloudWatchLogsAPI {
    return cloudwatchlogs.New(sess)
}

// EC2 client factory.
//
// Stubbable by tests.
var newEC2Client = func(provider client.ConfigProvider) ec2iface.EC2API {
    return ec2.New(provider)
}

// RGTA client factory.
//
// Stubbable by tests.
var newRGTAClient = func(provider client.ConfigProvider) resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI {
    return resourcegroupstaggingapi.New(provider)
}
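The factory functions above are deliberately declared as package-level variables rather than plain functions so that tests can swap in fakes and restore the original afterwards. A minimal sketch of that pattern, assuming a hypothetical stub type (the real tests in this PR use mocks.FakeMetricsAPI for the same purpose):

    // stubMetricsAPI is an illustrative fake that satisfies models.CloudWatchMetricsAPIProvider.
    type stubMetricsAPI struct{}

    func (s *stubMetricsAPI) ListMetricsPages(in *cloudwatch.ListMetricsInput, fn func(*cloudwatch.ListMetricsOutput, bool) bool) error {
        fn(&cloudwatch.ListMetricsOutput{}, true) // hand back a single empty page
        return nil
    }

    func TestWithStubbedMetricsAPI(t *testing.T) {
        origNewMetricsAPI := NewMetricsAPI
        NewMetricsAPI = func(sess *session.Session) models.CloudWatchMetricsAPIProvider {
            return &stubMetricsAPI{}
        }
        t.Cleanup(func() { NewMetricsAPI = origNewMetricsAPI })
        // ... exercise code paths that call NewMetricsAPI(sess) ...
    }

The new cloudwatch_integration_test.go further down in this diff overrides NewMetricsAPI, NewLogsAPI, and NewOAMAPI in exactly this way.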
@@ -9,6 +9,7 @@ import (
    "github.com/grafana/grafana/pkg/tsdb/cloudwatch/models/resources"
)

// this client wraps the CloudWatch API and handles pagination and the composition of the MetricResponse DTO
type metricsClient struct {
    models.CloudWatchMetricsAPIProvider
    config *setting.Cfg
@@ -6,26 +6,20 @@ import (
    "fmt"
    "net/http"
    "regexp"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/client"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/cloudwatch"
    "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
    "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
    "github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
    "github.com/aws/aws-sdk-go/service/ec2"
    "github.com/aws/aws-sdk-go/service/ec2/ec2iface"
    "github.com/aws/aws-sdk-go/service/oam"
    "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi"
    "github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
    "github.com/grafana/grafana-aws-sdk/pkg/awsds"
    "github.com/grafana/grafana-plugin-sdk-go/backend"
    "github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
    "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
    "github.com/grafana/grafana-plugin-sdk-go/backend/resource/httpadapter"
    "github.com/grafana/grafana-plugin-sdk-go/data"
    "github.com/grafana/grafana/pkg/infra/httpclient"
    "github.com/grafana/grafana/pkg/infra/log"
    "github.com/grafana/grafana/pkg/services/featuremgmt"
@@ -54,17 +48,8 @@ type DataSource struct {
}

const (
    cloudWatchTSFormat = "2006-01-02 15:04:05.000"
    defaultRegion = "default"

    // Constants also defined in datasource/cloudwatch/datasource.ts
    logIdentifierInternal = "__log__grafana_internal__"
    logStreamIdentifierInternal = "__logstream__grafana_internal__"

    alertMaxAttempts = 8
    alertPollPeriod = time.Second
    logsQueryMode = "Logs"

    defaultRegion = "default"
    logsQueryMode = "Logs"
    // QueryTypes
    annotationQuery = "annotationQuery"
    logAction = "logAction"
@@ -106,29 +91,6 @@ func newExecutor(im instancemgmt.InstanceManager, cfg *setting.Cfg, sessions Ses
    return e
}

func (e *cloudWatchExecutor) getRequestContext(pluginCtx backend.PluginContext, region string) (models.RequestContext, error) {
    r := region
    instance, err := e.getInstance(pluginCtx)
    if region == defaultRegion {
        if err != nil {
            return models.RequestContext{}, err
        }
        r = instance.Settings.Region
    }

    sess, err := e.newSession(pluginCtx, r)
    if err != nil {
        return models.RequestContext{}, err
    }
    return models.RequestContext{
        OAMClientProvider:     NewOAMAPI(sess),
        MetricsClientProvider: clients.NewMetricsClient(NewMetricsAPI(sess), e.cfg),
        LogsAPIProvider:       NewLogsAPI(sess),
        Settings:              instance.Settings,
        Features:              e.features,
    }, nil
}

func NewInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc {
    return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
        instanceSettings, err := models.LoadCloudWatchSettings(settings)
@@ -158,10 +120,93 @@ type cloudWatchExecutor struct {
    resourceHandler backend.CallResourceHandler
}

func (e *cloudWatchExecutor) getRequestContext(pluginCtx backend.PluginContext, region string) (models.RequestContext, error) {
    r := region
    instance, err := e.getInstance(pluginCtx)
    if region == defaultRegion {
        if err != nil {
            return models.RequestContext{}, err
        }
        r = instance.Settings.Region
    }

    sess, err := e.newSession(pluginCtx, r)
    if err != nil {
        return models.RequestContext{}, err
    }
    return models.RequestContext{
        OAMAPIProvider:        NewOAMAPI(sess),
        MetricsClientProvider: clients.NewMetricsClient(NewMetricsAPI(sess), e.cfg),
        LogsAPIProvider:       NewLogsAPI(sess),
        Settings:              instance.Settings,
        Features:              e.features,
    }, nil
}

func (e *cloudWatchExecutor) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
    return e.resourceHandler.CallResource(ctx, req, sender)
}

func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
    logger := logger.FromContext(ctx)
    /*
        Unlike many other data sources, with Cloudwatch Logs query requests don't receive the results as the response
        to the query, but rather an ID is first returned. Following this, a client is expected to send requests along
        with the ID until the status of the query is complete, receiving (possibly partial) results each time. For
        queries made via dashboards and Explore, the logic of making these repeated queries is handled on the
        frontend, but because alerts are executed on the backend the logic needs to be reimplemented here.
    */
    q := req.Queries[0]
    var model DataQueryJson
    err := json.Unmarshal(q.JSON, &model)
    if err != nil {
        return nil, err
    }
    _, fromAlert := req.Headers["FromAlert"]
    isLogAlertQuery := fromAlert && model.QueryMode == logsQueryMode

    if isLogAlertQuery {
        return e.executeLogAlertQuery(ctx, req)
    }

    var result *backend.QueryDataResponse
    switch model.QueryType {
    case annotationQuery:
        result, err = e.executeAnnotationQuery(req.PluginContext, model, q)
    case logAction:
        result, err = e.executeLogActions(ctx, logger, req)
    case timeSeriesQuery:
        fallthrough
    default:
        result, err = e.executeTimeSeriesQuery(ctx, logger, req)
    }

    return result, err
}

func (e *cloudWatchExecutor) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
    status := backend.HealthStatusOk
    metricsTest := "Successfully queried the CloudWatch metrics API."
    logsTest := "Successfully queried the CloudWatch logs API."

    err := e.checkHealthMetrics(req.PluginContext)
    if err != nil {
        status = backend.HealthStatusError
        metricsTest = fmt.Sprintf("CloudWatch metrics query failed: %s", err.Error())
    }

    err = e.checkHealthLogs(req.PluginContext)
    if err != nil {
        status = backend.HealthStatusError
        logsTest = fmt.Sprintf("CloudWatch logs query failed: %s", err.Error())
    }

    return &backend.CheckHealthResult{
        Status:  status,
        Message: fmt.Sprintf("1. %s\n2. %s", metricsTest, logsTest),
    }, nil
}

func (e *cloudWatchExecutor) checkHealthMetrics(pluginCtx backend.PluginContext) error {
    namespace := "AWS/Billing"
    metric := "EstimatedCharges"
@@ -189,29 +234,6 @@ func (e *cloudWatchExecutor) checkHealthLogs(pluginCtx backend.PluginContext) er
    return err
}

func (e *cloudWatchExecutor) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
    status := backend.HealthStatusOk
    metricsTest := "Successfully queried the CloudWatch metrics API."
    logsTest := "Successfully queried the CloudWatch logs API."

    err := e.checkHealthMetrics(req.PluginContext)
    if err != nil {
        status = backend.HealthStatusError
        metricsTest = fmt.Sprintf("CloudWatch metrics query failed: %s", err.Error())
    }

    err = e.checkHealthLogs(req.PluginContext)
    if err != nil {
        status = backend.HealthStatusError
        logsTest = fmt.Sprintf("CloudWatch logs query failed: %s", err.Error())
    }

    return &backend.CheckHealthResult{
        Status:  status,
        Message: fmt.Sprintf("1. %s\n2. %s", metricsTest, logsTest),
    }, nil
}

func (e *cloudWatchExecutor) newSession(pluginCtx backend.PluginContext, region string) (*session.Session, error) {
    instance, err := e.getInstance(pluginCtx)
    if err != nil {
@@ -240,6 +262,16 @@ func (e *cloudWatchExecutor) newSession(pluginCtx backend.PluginContext, region
    })
}

func (e *cloudWatchExecutor) getInstance(pluginCtx backend.PluginContext) (*DataSource, error) {
    i, err := e.im.Get(pluginCtx)
    if err != nil {
        return nil, err
    }

    instance := i.(DataSource)
    return &instance, nil
}

func (e *cloudWatchExecutor) getCWClient(pluginCtx backend.PluginContext, region string) (cloudwatchiface.CloudWatchAPI, error) {
    sess, err := e.newSession(pluginCtx, region)
    if err != nil {
@@ -278,192 +310,6 @@ func (e *cloudWatchExecutor) getRGTAClient(pluginCtx backend.PluginContext, regi
    return newRGTAClient(sess), nil
}

func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
    queryContext backend.DataQuery, model LogQueryJson) (*cloudwatchlogs.GetQueryResultsOutput, error) {
    startQueryOutput, err := e.executeStartQuery(ctx, logsClient, model, queryContext.TimeRange)
    if err != nil {
        return nil, err
    }

    requestParams := LogQueryJson{
        Region:  model.Region,
        QueryId: *startQueryOutput.QueryId,
    }

    ticker := time.NewTicker(alertPollPeriod)
    defer ticker.Stop()

    attemptCount := 1
    for range ticker.C {
        res, err := e.executeGetQueryResults(ctx, logsClient, requestParams)
        if err != nil {
            return nil, err
        }
        if isTerminated(*res.Status) {
            return res, err
        }
        if attemptCount >= alertMaxAttempts {
            return res, fmt.Errorf("fetching of query results exceeded max number of attempts")
        }

        attemptCount++
    }

    return nil, nil
}

func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
    logger := logger.FromContext(ctx)
    /*
        Unlike many other data sources, with Cloudwatch Logs query requests don't receive the results as the response
        to the query, but rather an ID is first returned. Following this, a client is expected to send requests along
        with the ID until the status of the query is complete, receiving (possibly partial) results each time. For
        queries made via dashboards and Explore, the logic of making these repeated queries is handled on the
        frontend, but because alerts are executed on the backend the logic needs to be reimplemented here.
    */
    q := req.Queries[0]
    var model DataQueryJson
    err := json.Unmarshal(q.JSON, &model)
    if err != nil {
        return nil, err
    }
    _, fromAlert := req.Headers["FromAlert"]
    isLogAlertQuery := fromAlert && model.QueryMode == logsQueryMode

    if isLogAlertQuery {
        return e.executeLogAlertQuery(ctx, req)
    }

    var result *backend.QueryDataResponse
    switch model.QueryType {
    case annotationQuery:
        result, err = e.executeAnnotationQuery(req.PluginContext, model, q)
    case logAction:
        result, err = e.executeLogActions(ctx, logger, req)
    case timeSeriesQuery:
        fallthrough
    default:
        result, err = e.executeTimeSeriesQuery(ctx, logger, req)
    }

    return result, err
}

func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
    resp := backend.NewQueryDataResponse()

    for _, q := range req.Queries {
        var model LogQueryJson
        err := json.Unmarshal(q.JSON, &model)
        if err != nil {
            continue
        }

        model.Subtype = "StartQuery"
        model.QueryString = model.Expression

        region := model.Region
        if model.Region == "" || region == defaultRegion {
            instance, err := e.getInstance(req.PluginContext)
            if err != nil {
                return nil, err
            }
            model.Region = instance.Settings.Region
        }

        logsClient, err := e.getCWLogsClient(req.PluginContext, region)
        if err != nil {
            return nil, err
        }

        getQueryResultsOutput, err := e.alertQuery(ctx, logsClient, q, model)
        if err != nil {
            return nil, err
        }

        dataframe, err := logsResultsToDataframes(getQueryResultsOutput)
        if err != nil {
            return nil, err
        }

        var frames []*data.Frame
        if len(model.StatsGroups) > 0 && len(dataframe.Fields) > 0 {
            frames, err = groupResults(dataframe, model.StatsGroups)
            if err != nil {
                return nil, err
            }
        } else {
            frames = data.Frames{dataframe}
        }

        respD := resp.Responses["A"]
        respD.Frames = frames
        resp.Responses["A"] = respD
    }

    return resp, nil
}

func (e *cloudWatchExecutor) getInstance(pluginCtx backend.PluginContext) (*DataSource, error) {
    i, err := e.im.Get(pluginCtx)
    if err != nil {
        return nil, err
    }

    instance := i.(DataSource)

    return &instance, nil
}

func isTerminated(queryStatus string) bool {
    return queryStatus == "Complete" || queryStatus == "Cancelled" || queryStatus == "Failed" || queryStatus == "Timeout"
}

// NewMetricsAPI is a CloudWatch metrics api factory.
//
// Stubbable by tests.
var NewMetricsAPI = func(sess *session.Session) models.CloudWatchMetricsAPIProvider {
    return cloudwatch.New(sess)
}

// NewLogsAPI is a CloudWatch logs api factory.
//
// Stubbable by tests.
var NewLogsAPI = func(sess *session.Session) models.CloudWatchLogsAPIProvider {
    return cloudwatchlogs.New(sess)
}

// NewOAMAPI is a CloudWatch OAM api factory.
//
// Stubbable by tests.
var NewOAMAPI = func(sess *session.Session) models.OAMClientProvider {
    return oam.New(sess)
}

// NewCWClient is a CloudWatch client factory.
//
// Stubbable by tests.
var NewCWClient = func(sess *session.Session) cloudwatchiface.CloudWatchAPI {
    return cloudwatch.New(sess)
}

// NewCWLogsClient is a CloudWatch logs client factory.
//
// Stubbable by tests.
var NewCWLogsClient = func(sess *session.Session) cloudwatchlogsiface.CloudWatchLogsAPI {
    return cloudwatchlogs.New(sess)
}

// EC2 client factory.
//
// Stubbable by tests.
var newEC2Client = func(provider client.ConfigProvider) ec2iface.EC2API {
    return ec2.New(provider)
}

// RGTA client factory.
//
// Stubbable by tests.
var newRGTAClient = func(provider client.ConfigProvider) resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI {
    return resourcegroupstaggingapi.New(provider)
}
pkg/tsdb/cloudwatch/cloudwatch_integration_test.go (new file, 201 lines)
@@ -0,0 +1,201 @@
package cloudwatch

import (
    "context"
    "encoding/json"
    "net/http"
    "testing"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/cloudwatch"
    "github.com/grafana/grafana-plugin-sdk-go/backend"
    "github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
    "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
    "github.com/grafana/grafana/pkg/services/featuremgmt"
    "github.com/grafana/grafana/pkg/setting"
    "github.com/grafana/grafana/pkg/tsdb/cloudwatch/mocks"
    "github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
    "github.com/grafana/grafana/pkg/tsdb/cloudwatch/models/resources"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func Test_CloudWatch_CallResource_Integration_Test(t *testing.T) {
    sender := &mockedCallResourceResponseSenderForOauth{}
    origNewMetricsAPI := NewMetricsAPI
    origNewOAMAPI := NewOAMAPI
    origNewLogsAPI := NewLogsAPI
    NewOAMAPI = func(sess *session.Session) models.OAMAPIProvider { return nil }
    NewLogsAPI = func(sess *session.Session) models.CloudWatchLogsAPIProvider { return nil }
    t.Cleanup(func() {
        NewOAMAPI = origNewOAMAPI
        NewMetricsAPI = origNewMetricsAPI
        NewLogsAPI = origNewLogsAPI
    })

    var api mocks.FakeMetricsAPI
    NewMetricsAPI = func(sess *session.Session) models.CloudWatchMetricsAPIProvider {
        return &api
    }

    im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
        return DataSource{Settings: models.CloudWatchSettings{}}, nil
    })

    t.Run("Should handle dimension value request and return values from the api", func(t *testing.T) {
        pageLimit := 100
        api = mocks.FakeMetricsAPI{Metrics: []*cloudwatch.Metric{
            {MetricName: aws.String("Test_MetricName1"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1"), Value: aws.String("Value1")}, {Name: aws.String("Test_DimensionName2"), Value: aws.String("Value2")}}},
            {MetricName: aws.String("Test_MetricName2"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1"), Value: aws.String("Value3")}}},
            {MetricName: aws.String("Test_MetricName3"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName2"), Value: aws.String("Value1")}}},
            {MetricName: aws.String("Test_MetricName10"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4"), Value: aws.String("Value2")}, {Name: aws.String("Test_DimensionName5")}}},
            {MetricName: aws.String("Test_MetricName4"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName2"), Value: aws.String("Value3")}}},
            {MetricName: aws.String("Test_MetricName5"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1"), Value: aws.String("Value4")}}},
            {MetricName: aws.String("Test_MetricName6"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1"), Value: aws.String("Value6")}}},
            {MetricName: aws.String("Test_MetricName7"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4"), Value: aws.String("Value7")}}},
            {MetricName: aws.String("Test_MetricName8"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4"), Value: aws.String("Value1")}}},
            {MetricName: aws.String("Test_MetricName9"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1"), Value: aws.String("Value2")}}},
        }, MetricsPerPage: 100}
        executor := newExecutor(im, &setting.Cfg{AWSListMetricsPageLimit: pageLimit}, &fakeSessionCache{}, featuremgmt.WithFeatures())

        req := &backend.CallResourceRequest{
            Method: "GET",
            Path:   `/dimension-values?region=us-east-2&dimensionKey=Test_DimensionName4&namespace=AWS/EC2&metricName=CPUUtilization`,
            PluginContext: backend.PluginContext{
                DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ID: 0},
                PluginID:                   "cloudwatch",
            },
        }
        err := executor.CallResource(context.Background(), req, sender)

        require.NoError(t, err)
        sent := sender.Response
        require.NotNil(t, sent)
        require.Equal(t, http.StatusOK, sent.Status)
        res := []resources.ResourceResponse[string]{}
        err = json.Unmarshal(sent.Body, &res)
        require.Nil(t, err)
        assert.Equal(t, []resources.ResourceResponse[string]{{Value: "Value1"}, {Value: "Value2"}, {Value: "Value7"}}, res)
    })

    t.Run("Should handle dimension key filter query and return keys from the api", func(t *testing.T) {
        pageLimit := 3
        api = mocks.FakeMetricsAPI{Metrics: []*cloudwatch.Metric{
            {MetricName: aws.String("Test_MetricName1"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}, {Name: aws.String("Test_DimensionName2")}}},
            {MetricName: aws.String("Test_MetricName2"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}}},
            {MetricName: aws.String("Test_MetricName3"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName2")}}},
            {MetricName: aws.String("Test_MetricName10"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4")}, {Name: aws.String("Test_DimensionName5")}}},
            {MetricName: aws.String("Test_MetricName4"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName2")}}},
            {MetricName: aws.String("Test_MetricName5"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}}},
            {MetricName: aws.String("Test_MetricName6"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}}},
            {MetricName: aws.String("Test_MetricName7"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4")}}},
            {MetricName: aws.String("Test_MetricName8"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4")}}},
            {MetricName: aws.String("Test_MetricName9"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}}},
        }, MetricsPerPage: 2}
        executor := newExecutor(im, &setting.Cfg{AWSListMetricsPageLimit: pageLimit}, &fakeSessionCache{}, featuremgmt.WithFeatures())

        req := &backend.CallResourceRequest{
            Method: "GET",
            Path:   `/dimension-keys?region=us-east-2&namespace=AWS/EC2&metricName=CPUUtilization&dimensionFilters={"NodeID":["Shared"],"stage":["QueryCommit"]}`,
            PluginContext: backend.PluginContext{
                DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ID: 0},
                PluginID:                   "cloudwatch",
            },
        }
        err := executor.CallResource(context.Background(), req, sender)

        require.NoError(t, err)
        sent := sender.Response
        require.NotNil(t, sent)
        require.Equal(t, http.StatusOK, sent.Status)
        res := []resources.ResourceResponse[string]{}
        err = json.Unmarshal(sent.Body, &res)
        require.Nil(t, err)
        assert.Equal(t, []resources.ResourceResponse[string]{{Value: "Test_DimensionName1"}, {Value: "Test_DimensionName2"}, {Value: "Test_DimensionName4"}, {Value: "Test_DimensionName5"}}, res)
    })

    t.Run("Should handle standard dimension key query and return hard coded keys", func(t *testing.T) {
        api = mocks.FakeMetricsAPI{}
        executor := newExecutor(im, newTestConfig(), &fakeSessionCache{}, featuremgmt.WithFeatures())

        req := &backend.CallResourceRequest{
            Method: "GET",
            Path:   `/dimension-keys?region=us-east-2&namespace=AWS/CloudSearch&metricName=CPUUtilization`,
            PluginContext: backend.PluginContext{
                DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ID: 0},
                PluginID:                   "cloudwatch",
            },
        }
        err := executor.CallResource(context.Background(), req, sender)

        require.NoError(t, err)
        sent := sender.Response
        require.NotNil(t, sent)
        require.Equal(t, http.StatusOK, sent.Status)
        res := []resources.ResourceResponse[string]{}
        err = json.Unmarshal(sent.Body, &res)
        require.Nil(t, err)
        assert.Equal(t, []resources.ResourceResponse[string]{{Value: "ClientId"}, {Value: "DomainName"}}, res)
    })

    t.Run("Should handle custom namespace dimension key query and return hard coded keys", func(t *testing.T) {
        api = mocks.FakeMetricsAPI{}
        executor := newExecutor(im, newTestConfig(), &fakeSessionCache{}, featuremgmt.WithFeatures())

        req := &backend.CallResourceRequest{
            Method: "GET",
            Path:   `/dimension-keys?region=us-east-2&namespace=AWS/CloudSearch&metricName=CPUUtilization`,
            PluginContext: backend.PluginContext{
                DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ID: 0},
                PluginID:                   "cloudwatch",
            },
        }
        err := executor.CallResource(context.Background(), req, sender)

        require.NoError(t, err)
        sent := sender.Response
        require.NotNil(t, sent)
        require.Equal(t, http.StatusOK, sent.Status)
        res := []resources.ResourceResponse[string]{}
        err = json.Unmarshal(sent.Body, &res)
        require.Nil(t, err)
        assert.Equal(t, []resources.ResourceResponse[string]{{Value: "ClientId"}, {Value: "DomainName"}}, res)
    })

    t.Run("Should handle custom namespace metrics query and return metrics from api", func(t *testing.T) {
        pageLimit := 3
        api = mocks.FakeMetricsAPI{Metrics: []*cloudwatch.Metric{
            {MetricName: aws.String("Test_MetricName1"), Namespace: aws.String("AWS/EC2"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}, {Name: aws.String("Test_DimensionName2")}}},
            {MetricName: aws.String("Test_MetricName2"), Namespace: aws.String("AWS/EC2"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}}},
            {MetricName: aws.String("Test_MetricName3"), Namespace: aws.String("AWS/ECS"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName2")}}},
            {MetricName: aws.String("Test_MetricName10"), Namespace: aws.String("AWS/ECS"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4")}, {Name: aws.String("Test_DimensionName5")}}},
            {MetricName: aws.String("Test_MetricName4"), Namespace: aws.String("AWS/ECS"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName2")}}},
            {MetricName: aws.String("Test_MetricName5"), Namespace: aws.String("AWS/Redshift"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}}},
            {MetricName: aws.String("Test_MetricName6"), Namespace: aws.String("AWS/Redshift"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}}},
            {MetricName: aws.String("Test_MetricName7"), Namespace: aws.String("AWS/EC2"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4")}}},
            {MetricName: aws.String("Test_MetricName8"), Namespace: aws.String("AWS/EC2"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4")}}},
            {MetricName: aws.String("Test_MetricName9"), Namespace: aws.String("AWS/EC2"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}}},
        }, MetricsPerPage: 2}
        executor := newExecutor(im, &setting.Cfg{AWSListMetricsPageLimit: pageLimit}, &fakeSessionCache{}, featuremgmt.WithFeatures())

        req := &backend.CallResourceRequest{
            Method: "GET",
            Path:   `/metrics?region=us-east-2&namespace=custom-namespace`,
            PluginContext: backend.PluginContext{
                DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ID: 0},
                PluginID:                   "cloudwatch",
            },
        }
        err := executor.CallResource(context.Background(), req, sender)

        require.NoError(t, err)
        sent := sender.Response
        require.NotNil(t, sent)
        require.Equal(t, http.StatusOK, sent.Status)
        res := []resources.ResourceResponse[resources.Metric]{}
        err = json.Unmarshal(sent.Body, &res)
        require.Nil(t, err)
        assert.Equal(t, []resources.ResourceResponse[resources.Metric]{{Value: resources.Metric{Name: "Test_MetricName1", Namespace: "AWS/EC2"}}, {Value: resources.Metric{Name: "Test_MetricName2", Namespace: "AWS/EC2"}}, {Value: resources.Metric{Name: "Test_MetricName3", Namespace: "AWS/ECS"}}, {Value: resources.Metric{Name: "Test_MetricName10", Namespace: "AWS/ECS"}}, {Value: resources.Metric{Name: "Test_MetricName4", Namespace: "AWS/ECS"}}, {Value: resources.Metric{Name: "Test_MetricName5", Namespace: "AWS/Redshift"}}}, res)
    })
}
@@ -20,10 +20,8 @@ import (
    "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
    "github.com/grafana/grafana/pkg/infra/httpclient"
    "github.com/grafana/grafana/pkg/services/featuremgmt"
    "github.com/grafana/grafana/pkg/setting"
    "github.com/grafana/grafana/pkg/tsdb/cloudwatch/mocks"
    "github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
    "github.com/grafana/grafana/pkg/tsdb/cloudwatch/models/resources"
    "github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/mock"
@@ -546,7 +544,7 @@ func TestQuery_ResourceRequest_DescribeLogGroups_with_CrossAccountQuerying(t *te
    origNewOAMAPI := NewOAMAPI
    origNewLogsAPI := NewLogsAPI
    NewMetricsAPI = func(sess *session.Session) models.CloudWatchMetricsAPIProvider { return nil }
    NewOAMAPI = func(sess *session.Session) models.OAMClientProvider { return nil }
    NewOAMAPI = func(sess *session.Session) models.OAMAPIProvider { return nil }
    t.Cleanup(func() {
        NewOAMAPI = origNewOAMAPI
        NewMetricsAPI = origNewMetricsAPI
@@ -602,185 +600,6 @@ func TestQuery_ResourceRequest_DescribeLogGroups_with_CrossAccountQuerying(t *te
    })
}

func Test_CloudWatch_CallResource_Integration_Test(t *testing.T) {
    sender := &mockedCallResourceResponseSenderForOauth{}
    origNewMetricsAPI := NewMetricsAPI
    origNewOAMAPI := NewOAMAPI
    origNewLogsAPI := NewLogsAPI
    NewOAMAPI = func(sess *session.Session) models.OAMClientProvider { return nil }
    NewLogsAPI = func(sess *session.Session) models.CloudWatchLogsAPIProvider { return nil }
    t.Cleanup(func() {
        NewOAMAPI = origNewOAMAPI
        NewMetricsAPI = origNewMetricsAPI
        NewLogsAPI = origNewLogsAPI
    })

    var api mocks.FakeMetricsAPI
    NewMetricsAPI = func(sess *session.Session) models.CloudWatchMetricsAPIProvider {
        return &api
    }

    im := datasource.NewInstanceManager(func(s backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
        return DataSource{Settings: models.CloudWatchSettings{}}, nil
    })

    t.Run("Should handle dimension value request and return values from the api", func(t *testing.T) {
        pageLimit := 100
        api = mocks.FakeMetricsAPI{Metrics: []*cloudwatch.Metric{
            {MetricName: aws.String("Test_MetricName1"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1"), Value: aws.String("Value1")}, {Name: aws.String("Test_DimensionName2"), Value: aws.String("Value2")}}},
            {MetricName: aws.String("Test_MetricName2"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1"), Value: aws.String("Value3")}}},
            {MetricName: aws.String("Test_MetricName3"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName2"), Value: aws.String("Value1")}}},
            {MetricName: aws.String("Test_MetricName10"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4"), Value: aws.String("Value2")}, {Name: aws.String("Test_DimensionName5")}}},
            {MetricName: aws.String("Test_MetricName4"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName2"), Value: aws.String("Value3")}}},
            {MetricName: aws.String("Test_MetricName5"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1"), Value: aws.String("Value4")}}},
            {MetricName: aws.String("Test_MetricName6"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1"), Value: aws.String("Value6")}}},
            {MetricName: aws.String("Test_MetricName7"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4"), Value: aws.String("Value7")}}},
            {MetricName: aws.String("Test_MetricName8"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4"), Value: aws.String("Value1")}}},
            {MetricName: aws.String("Test_MetricName9"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1"), Value: aws.String("Value2")}}},
        }, MetricsPerPage: 100}
        executor := newExecutor(im, &setting.Cfg{AWSListMetricsPageLimit: pageLimit}, &fakeSessionCache{}, featuremgmt.WithFeatures())

        req := &backend.CallResourceRequest{
            Method: "GET",
            Path:   `/dimension-values?region=us-east-2&dimensionKey=Test_DimensionName4&namespace=AWS/EC2&metricName=CPUUtilization`,
            PluginContext: backend.PluginContext{
                DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ID: 0},
                PluginID:                   "cloudwatch",
            },
        }
        err := executor.CallResource(context.Background(), req, sender)

        require.NoError(t, err)
        sent := sender.Response
        require.NotNil(t, sent)
        require.Equal(t, http.StatusOK, sent.Status)
        res := []resources.ResourceResponse[string]{}
        err = json.Unmarshal(sent.Body, &res)
        require.Nil(t, err)
        assert.Equal(t, []resources.ResourceResponse[string]{{Value: "Value1"}, {Value: "Value2"}, {Value: "Value7"}}, res)
    })

    t.Run("Should handle dimension key filter query and return keys from the api", func(t *testing.T) {
        pageLimit := 3
        api = mocks.FakeMetricsAPI{Metrics: []*cloudwatch.Metric{
            {MetricName: aws.String("Test_MetricName1"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}, {Name: aws.String("Test_DimensionName2")}}},
            {MetricName: aws.String("Test_MetricName2"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}}},
            {MetricName: aws.String("Test_MetricName3"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName2")}}},
            {MetricName: aws.String("Test_MetricName10"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4")}, {Name: aws.String("Test_DimensionName5")}}},
            {MetricName: aws.String("Test_MetricName4"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName2")}}},
            {MetricName: aws.String("Test_MetricName5"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}}},
            {MetricName: aws.String("Test_MetricName6"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}}},
            {MetricName: aws.String("Test_MetricName7"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4")}}},
            {MetricName: aws.String("Test_MetricName8"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4")}}},
            {MetricName: aws.String("Test_MetricName9"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}}},
        }, MetricsPerPage: 2}
        executor := newExecutor(im, &setting.Cfg{AWSListMetricsPageLimit: pageLimit}, &fakeSessionCache{}, featuremgmt.WithFeatures())

        req := &backend.CallResourceRequest{
            Method: "GET",
            Path:   `/dimension-keys?region=us-east-2&namespace=AWS/EC2&metricName=CPUUtilization&dimensionFilters={"NodeID":["Shared"],"stage":["QueryCommit"]}`,
            PluginContext: backend.PluginContext{
                DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ID: 0},
                PluginID:                   "cloudwatch",
            },
        }
        err := executor.CallResource(context.Background(), req, sender)

        require.NoError(t, err)
        sent := sender.Response
        require.NotNil(t, sent)
        require.Equal(t, http.StatusOK, sent.Status)
        res := []resources.ResourceResponse[string]{}
        err = json.Unmarshal(sent.Body, &res)
        require.Nil(t, err)
        assert.Equal(t, []resources.ResourceResponse[string]{{Value: "Test_DimensionName1"}, {Value: "Test_DimensionName2"}, {Value: "Test_DimensionName4"}, {Value: "Test_DimensionName5"}}, res)
    })

    t.Run("Should handle standard dimension key query and return hard coded keys", func(t *testing.T) {
        api = mocks.FakeMetricsAPI{}
        executor := newExecutor(im, newTestConfig(), &fakeSessionCache{}, featuremgmt.WithFeatures())

        req := &backend.CallResourceRequest{
            Method: "GET",
            Path:   `/dimension-keys?region=us-east-2&namespace=AWS/CloudSearch&metricName=CPUUtilization`,
            PluginContext: backend.PluginContext{
                DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ID: 0},
                PluginID:                   "cloudwatch",
            },
        }
        err := executor.CallResource(context.Background(), req, sender)

        require.NoError(t, err)
        sent := sender.Response
        require.NotNil(t, sent)
        require.Equal(t, http.StatusOK, sent.Status)
        res := []resources.ResourceResponse[string]{}
        err = json.Unmarshal(sent.Body, &res)
        require.Nil(t, err)
        assert.Equal(t, []resources.ResourceResponse[string]{{Value: "ClientId"}, {Value: "DomainName"}}, res)
    })

    t.Run("Should handle custom namespace dimension key query and return hard coded keys", func(t *testing.T) {
        api = mocks.FakeMetricsAPI{}
        executor := newExecutor(im, newTestConfig(), &fakeSessionCache{}, featuremgmt.WithFeatures())

        req := &backend.CallResourceRequest{
            Method: "GET",
            Path:   `/dimension-keys?region=us-east-2&namespace=AWS/CloudSearch&metricName=CPUUtilization`,
            PluginContext: backend.PluginContext{
                DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ID: 0},
                PluginID:                   "cloudwatch",
            },
        }
        err := executor.CallResource(context.Background(), req, sender)

        require.NoError(t, err)
        sent := sender.Response
        require.NotNil(t, sent)
        require.Equal(t, http.StatusOK, sent.Status)
        res := []resources.ResourceResponse[string]{}
        err = json.Unmarshal(sent.Body, &res)
        require.Nil(t, err)
        assert.Equal(t, []resources.ResourceResponse[string]{{Value: "ClientId"}, {Value: "DomainName"}}, res)
    })

    t.Run("Should handle custom namespace metrics query and return metrics from api", func(t *testing.T) {
        pageLimit := 3
        api = mocks.FakeMetricsAPI{Metrics: []*cloudwatch.Metric{
            {MetricName: aws.String("Test_MetricName1"), Namespace: aws.String("AWS/EC2"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}, {Name: aws.String("Test_DimensionName2")}}},
            {MetricName: aws.String("Test_MetricName2"), Namespace: aws.String("AWS/EC2"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}}},
            {MetricName: aws.String("Test_MetricName3"), Namespace: aws.String("AWS/ECS"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName2")}}},
            {MetricName: aws.String("Test_MetricName10"), Namespace: aws.String("AWS/ECS"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4")}, {Name: aws.String("Test_DimensionName5")}}},
            {MetricName: aws.String("Test_MetricName4"), Namespace: aws.String("AWS/ECS"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName2")}}},
            {MetricName: aws.String("Test_MetricName5"), Namespace: aws.String("AWS/Redshift"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}}},
            {MetricName: aws.String("Test_MetricName6"), Namespace: aws.String("AWS/Redshift"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}}},
            {MetricName: aws.String("Test_MetricName7"), Namespace: aws.String("AWS/EC2"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4")}}},
            {MetricName: aws.String("Test_MetricName8"), Namespace: aws.String("AWS/EC2"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName4")}}},
            {MetricName: aws.String("Test_MetricName9"), Namespace: aws.String("AWS/EC2"), Dimensions: []*cloudwatch.Dimension{{Name: aws.String("Test_DimensionName1")}}},
        }, MetricsPerPage: 2}
        executor := newExecutor(im, &setting.Cfg{AWSListMetricsPageLimit: pageLimit}, &fakeSessionCache{}, featuremgmt.WithFeatures())

        req := &backend.CallResourceRequest{
            Method: "GET",
            Path:   `/metrics?region=us-east-2&namespace=custom-namespace`,
            PluginContext: backend.PluginContext{
                DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ID: 0},
                PluginID:                   "cloudwatch",
            },
        }
        err := executor.CallResource(context.Background(), req, sender)

        require.NoError(t, err)
        sent := sender.Response
        require.NotNil(t, sent)
        require.Equal(t, http.StatusOK, sent.Status)
        res := []resources.ResourceResponse[resources.Metric]{}
        err = json.Unmarshal(sent.Body, &res)
        require.Nil(t, err)
        assert.Equal(t, []resources.ResourceResponse[resources.Metric]{{Value: resources.Metric{Name: "Test_MetricName1", Namespace: "AWS/EC2"}}, {Value: resources.Metric{Name: "Test_MetricName2", Namespace: "AWS/EC2"}}, {Value: resources.Metric{Name: "Test_MetricName3", Namespace: "AWS/ECS"}}, {Value: resources.Metric{Name: "Test_MetricName10", Namespace: "AWS/ECS"}}, {Value: resources.Metric{Name: "Test_MetricName4", Namespace: "AWS/ECS"}}, {Value: resources.Metric{Name: "Test_MetricName5", Namespace: "AWS/Redshift"}}}, res)
    })
}

func stringsToSuggestData(values []string) []suggestData {
    suggestDataArray := make([]suggestData, 0)
    for _, v := range values {
@@ -1,8 +0,0 @@
package cloudwatch

const (
    maxMetricsExceeded         = "MaxMetricsExceeded"
    maxQueryTimeRangeExceeded  = "MaxQueryTimeRangeExceeded"
    maxQueryResultsExceeded    = "MaxQueryResultsExceeded"
    maxMatchingResultsExceeded = "MaxMatchingResultsExceeded"
)
@@ -1,53 +0,0 @@
package cloudwatch

import (
    "github.com/grafana/grafana-plugin-sdk-go/data"
)

func groupResponseFrame(frame *data.Frame, statsGroups []string) (data.Frames, error) {
    var dataFrames data.Frames

    // When a query of the form "stats ... by ..." is made, we want to return
    // one series per group defined in the query, but due to the format
    // the query response is in, there does not seem to be a way to tell
    // by the response alone if/how the results should be grouped.
    // Because of this, if the frontend sees that a "stats ... by ..." query is being made
    // the "statsGroups" parameter is sent along with the query to the backend so that we
    // can correctly group the CloudWatch logs response.
    // Check if we have time field though as it makes sense to split only for time series.
    if hasTimeField(frame) {
        if len(statsGroups) > 0 && len(frame.Fields) > 0 {
            groupedFrames, err := groupResults(frame, statsGroups)
            if err != nil {
                return nil, err
            }

            dataFrames = groupedFrames
        } else {
            setPreferredVisType(frame, "logs")
            dataFrames = data.Frames{frame}
        }
    } else {
        dataFrames = data.Frames{frame}
    }
    return dataFrames, nil
}

func hasTimeField(frame *data.Frame) bool {
    for _, field := range frame.Fields {
        if field.Type() == data.FieldTypeNullableTime {
            return true
        }
    }
    return false
}

func setPreferredVisType(frame *data.Frame, visType data.VisType) {
    if frame.Meta != nil {
        frame.Meta.PreferredVisualization = visType
    } else {
        frame.Meta = &data.FrameMeta{
            PreferredVisualization: visType,
        }
    }
}
@@ -1,31 +0,0 @@
package cloudwatch

import (
    "testing"

    "github.com/grafana/grafana-plugin-sdk-go/data"
    "github.com/stretchr/testify/require"
)

func TestGroupResponseFrame(t *testing.T) {
    t.Run("Doesn't group results without time field", func(t *testing.T) {
        frame := data.NewFrameOfFieldTypes("test", 0, data.FieldTypeString, data.FieldTypeInt32)
        frame.AppendRow("val1", int32(10))
        frame.AppendRow("val2", int32(20))
        frame.AppendRow("val3", int32(30))

        groupedFrame, err := groupResponseFrame(frame, []string{"something"})
        require.NoError(t, err)
        require.Equal(t, 3, groupedFrame[0].Rows())
        require.Equal(t, []interface{}{"val1", "val2", "val3"}, asArray(groupedFrame[0].Fields[0]))
        require.Equal(t, []interface{}{int32(10), int32(20), int32(30)}, asArray(groupedFrame[0].Fields[1]))
    })
}

func asArray(field *data.Field) []interface{} {
    var vals []interface{}
    for i := 0; i < field.Len(); i++ {
        vals = append(vals, field.At(i))
    }
    return vals
}
@@ -22,9 +22,11 @@ import (
)

const (
    limitExceededException = "LimitExceededException"
    defaultEventLimit      = int64(10)
    defaultLogGroupLimit   = int64(50)
    limitExceededException      = "LimitExceededException"
    defaultEventLimit           = int64(10)
    defaultLogGroupLimit        = int64(50)
    logIdentifierInternal       = "__log__grafana_internal__"
    logStreamIdentifierInternal = "__logstream__grafana_internal__"
)

type AWSError struct {
@@ -373,3 +375,51 @@ func (e *cloudWatchExecutor) handleGetLogGroupFields(ctx context.Context, logsCl

    return dataFrame, nil
}

func groupResponseFrame(frame *data.Frame, statsGroups []string) (data.Frames, error) {
    var dataFrames data.Frames

    // When a query of the form "stats ... by ..." is made, we want to return
    // one series per group defined in the query, but due to the format
    // the query response is in, there does not seem to be a way to tell
    // by the response alone if/how the results should be grouped.
    // Because of this, if the frontend sees that a "stats ... by ..." query is being made
    // the "statsGroups" parameter is sent along with the query to the backend so that we
    // can correctly group the CloudWatch logs response.
    // Check if we have time field though as it makes sense to split only for time series.
    if hasTimeField(frame) {
        if len(statsGroups) > 0 && len(frame.Fields) > 0 {
            groupedFrames, err := groupResults(frame, statsGroups)
            if err != nil {
                return nil, err
            }

            dataFrames = groupedFrames
        } else {
            setPreferredVisType(frame, "logs")
            dataFrames = data.Frames{frame}
        }
    } else {
        dataFrames = data.Frames{frame}
    }
    return dataFrames, nil
}

func setPreferredVisType(frame *data.Frame, visType data.VisType) {
    if frame.Meta != nil {
        frame.Meta.PreferredVisualization = visType
    } else {
        frame.Meta = &data.FrameMeta{
            PreferredVisualization: visType,
        }
    }
}

func hasTimeField(frame *data.Frame) bool {
    for _, field := range frame.Fields {
        if field.Type() == data.FieldTypeNullableTime {
            return true
        }
    }
    return false
}
@@ -654,3 +654,26 @@ func TestQuery_GetQueryResults(t *testing.T) {
        },
    }, resp)
}

func TestGroupResponseFrame(t *testing.T) {
    t.Run("Doesn't group results without time field", func(t *testing.T) {
        frame := data.NewFrameOfFieldTypes("test", 0, data.FieldTypeString, data.FieldTypeInt32)
        frame.AppendRow("val1", int32(10))
        frame.AppendRow("val2", int32(20))
        frame.AppendRow("val3", int32(30))

        groupedFrame, err := groupResponseFrame(frame, []string{"something"})
        require.NoError(t, err)
        require.Equal(t, 3, groupedFrame[0].Rows())
        require.Equal(t, []interface{}{"val1", "val2", "val3"}, asArray(groupedFrame[0].Fields[0]))
        require.Equal(t, []interface{}{int32(10), int32(20), int32(30)}, asArray(groupedFrame[0].Fields[1]))
    })
}

func asArray(field *data.Field) []interface{} {
    var vals []interface{}
    for i := 0; i < field.Len(); i++ {
        vals = append(vals, field.At(i))
    }
    return vals
}
pkg/tsdb/cloudwatch/log_alert.go (new file, 107 lines)
@@ -0,0 +1,107 @@
package cloudwatch

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
    "github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
    "github.com/grafana/grafana-plugin-sdk-go/backend"
    "github.com/grafana/grafana-plugin-sdk-go/data"
)

const (
    alertMaxAttempts = 8
    alertPollPeriod  = time.Second
)

func (e *cloudWatchExecutor) executeLogAlertQuery(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
    resp := backend.NewQueryDataResponse()

    for _, q := range req.Queries {
        var model LogQueryJson
        err := json.Unmarshal(q.JSON, &model)
        if err != nil {
            continue
        }

        model.Subtype = "StartQuery"
        model.QueryString = model.Expression

        region := model.Region
        if model.Region == "" || region == defaultRegion {
            instance, err := e.getInstance(req.PluginContext)
            if err != nil {
                return nil, err
            }
            model.Region = instance.Settings.Region
        }

        logsClient, err := e.getCWLogsClient(req.PluginContext, region)
        if err != nil {
            return nil, err
        }

        getQueryResultsOutput, err := e.alertQuery(ctx, logsClient, q, model)
        if err != nil {
            return nil, err
        }

        dataframe, err := logsResultsToDataframes(getQueryResultsOutput)
        if err != nil {
            return nil, err
        }

        var frames []*data.Frame
        if len(model.StatsGroups) > 0 && len(dataframe.Fields) > 0 {
            frames, err = groupResults(dataframe, model.StatsGroups)
            if err != nil {
                return nil, err
            }
        } else {
            frames = data.Frames{dataframe}
        }

        respD := resp.Responses["A"]
        respD.Frames = frames
        resp.Responses["A"] = respD
    }

    return resp, nil
}

func (e *cloudWatchExecutor) alertQuery(ctx context.Context, logsClient cloudwatchlogsiface.CloudWatchLogsAPI,
    queryContext backend.DataQuery, model LogQueryJson) (*cloudwatchlogs.GetQueryResultsOutput, error) {
    startQueryOutput, err := e.executeStartQuery(ctx, logsClient, model, queryContext.TimeRange)
    if err != nil {
        return nil, err
    }

    requestParams := LogQueryJson{
        Region:  model.Region,
        QueryId: *startQueryOutput.QueryId,
    }

    ticker := time.NewTicker(alertPollPeriod)
    defer ticker.Stop()

    attemptCount := 1
    for range ticker.C {
        res, err := e.executeGetQueryResults(ctx, logsClient, requestParams)
        if err != nil {
            return nil, err
        }
        if isTerminated(*res.Status) {
            return res, err
        }
        if attemptCount >= alertMaxAttempts {
            return res, fmt.Errorf("fetching of query results exceeded max number of attempts")
        }

        attemptCount++
    }

    return nil, nil
}
@@ -11,6 +11,8 @@ import (
    "github.com/grafana/grafana-plugin-sdk-go/data"
)

const cloudWatchTSFormat = "2006-01-02 15:04:05.000"

func logsResultsToDataframes(response *cloudwatchlogs.GetQueryResultsOutput) (*data.Frame, error) {
    if response == nil {
        return nil, fmt.Errorf("response is nil, cannot convert log results to data frames")
@ -18,34 +18,18 @@ type RouteHandlerFunc func(pluginCtx backend.PluginContext, reqContextFactory Re
type RequestContext struct {
	MetricsClientProvider MetricsClientProvider
	LogsAPIProvider       CloudWatchLogsAPIProvider
	OAMClientProvider     OAMClientProvider
	OAMAPIProvider        OAMAPIProvider
	Settings              CloudWatchSettings
	Features              featuremgmt.FeatureToggles
}

// Services
type ListMetricsProvider interface {
	GetDimensionKeysByDimensionFilter(resources.DimensionKeysRequest) ([]resources.ResourceResponse[string], error)
	GetDimensionValuesByDimensionFilter(resources.DimensionValuesRequest) ([]resources.ResourceResponse[string], error)
	GetMetricsByNamespace(r resources.MetricsRequest) ([]resources.ResourceResponse[resources.Metric], error)
}

type MetricsClientProvider interface {
	ListMetricsWithPageLimit(params *cloudwatch.ListMetricsInput) ([]resources.MetricResponse, error)
}

type CloudWatchMetricsAPIProvider interface {
	ListMetricsPages(*cloudwatch.ListMetricsInput, func(*cloudwatch.ListMetricsOutput, bool) bool) error
}

type CloudWatchLogsAPIProvider interface {
	DescribeLogGroups(*cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error)
}

type OAMClientProvider interface {
	ListSinks(*oam.ListSinksInput) (*oam.ListSinksOutput, error)
	ListAttachedLinks(*oam.ListAttachedLinksInput) (*oam.ListAttachedLinksOutput, error)
}

type LogGroupsProvider interface {
	GetLogGroups(request resources.LogGroupsRequest) ([]resources.ResourceResponse[resources.LogGroup], error)
}
@ -53,3 +37,22 @@ type LogGroupsProvider interface {
type AccountsProvider interface {
	GetAccountsForCurrentUserOrRole() ([]resources.ResourceResponse[resources.Account], error)
}

// Clients
type MetricsClientProvider interface {
	ListMetricsWithPageLimit(params *cloudwatch.ListMetricsInput) ([]resources.MetricResponse, error)
}

// APIs - instead of using the full service APIs defined in aws-sdk-go directly, each service or client depends on a narrowed interface that declares only the methods it actually uses
type CloudWatchMetricsAPIProvider interface {
	ListMetricsPages(*cloudwatch.ListMetricsInput, func(*cloudwatch.ListMetricsOutput, bool) bool) error
}

type CloudWatchLogsAPIProvider interface {
	DescribeLogGroups(*cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error)
}

type OAMAPIProvider interface {
	ListSinks(*oam.ListSinksInput) (*oam.ListSinksOutput, error)
	ListAttachedLinks(*oam.ListAttachedLinksInput) (*oam.ListAttachedLinksOutput, error)
}
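One practical consequence of these narrowed API interfaces is that a test can stub a single method instead of the whole aws-sdk-go service interface. A minimal sketch, assuming a hypothetical test package and stub type (fakeLogsAPI is not part of the commit):

package cloudwatch_test // hypothetical test package

import "github.com/aws/aws-sdk-go/service/cloudwatchlogs"

// fakeLogsAPI satisfies the narrowed CloudWatchLogsAPIProvider interface by
// implementing only DescribeLogGroups, rather than the full
// cloudwatchlogsiface.CloudWatchLogsAPI surface.
type fakeLogsAPI struct {
	output *cloudwatchlogs.DescribeLogGroupsOutput
	err    error
}

func (f fakeLogsAPI) DescribeLogGroups(*cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) {
	return f.output, f.err
}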
@ -12,6 +12,7 @@ import (
	"github.com/stretchr/testify/require"

	"github.com/grafana/grafana/pkg/infra/log/logtest"
	"github.com/grafana/grafana/pkg/tsdb/cloudwatch/utils"
)

func TestCloudWatchQuery(t *testing.T) {
@ -119,7 +120,7 @@ func TestCloudWatchQuery(t *testing.T) {
				Period:     300,
				Id:         "id1",
				MatchExact: true,
				AccountId:  pointer("123456789"),
				AccountId:  utils.Pointer("123456789"),
				Label:      "${PROP('Namespace')}",
				Dimensions: map[string][]string{
					"InstanceId": {"i-12345678"},
@ -141,7 +142,7 @@ func TestCloudWatchQuery(t *testing.T) {
				Region:     "us-east-1",
				Statistic:  "Average",
				Expression: "SEARCH(someexpression)",
				AccountId:  pointer("123456789"),
				AccountId:  utils.Pointer("123456789"),
				Period:     300,
				Id:         "id1",
				MatchExact: true,
@ -1059,7 +1060,6 @@ func Test_ParseMetricDataQueries_migrate_alias_to_label(t *testing.T) {
			dynamicLabelsFeatureToggleEnabled: false,
			expectedLabel:                     "some label"},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			query := []backend.DataQuery{
@ -1195,5 +1195,3 @@ func Test_ParseMetricDataQueries_account_Id(t *testing.T) {
		assert.Nil(t, actual[0].AccountId)
	})
}

func pointer[T any](arg T) *T { return &arg }
15
pkg/tsdb/cloudwatch/models/gmd_errors.go
Normal file
@ -0,0 +1,15 @@
package models

const (
	MaxMetricsExceeded         = "MaxMetricsExceeded"
	MaxQueryTimeRangeExceeded  = "MaxQueryTimeRangeExceeded"
	MaxQueryResultsExceeded    = "MaxQueryResultsExceeded"
	MaxMatchingResultsExceeded = "MaxMatchingResultsExceeded"
)

var ErrorMessages = map[string]string{
	MaxMetricsExceeded:         "Maximum number of allowed metrics exceeded. Your search may have been limited",
	MaxQueryTimeRangeExceeded:  "Max time window exceeded for query",
	MaxQueryResultsExceeded:    "Only the first 500 time series can be returned by a query.",
	MaxMatchingResultsExceeded: "The query matched more than 10.000 metrics, results might not be accurate.",
}
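For orientation, the sketch below shows how one of these codes might be translated into its user-facing warning text; the helper warningFor and its package are hypothetical, and the real lookup happens in the response parser further down in this diff.

package example // hypothetical

import "github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"

// warningFor maps a GetMetricData error code to its user-facing message.
func warningFor(code string) (string, bool) {
	msg, ok := models.ErrorMessages[code]
	if !ok {
		return "", false
	}
	return "cloudwatch GetMetricData error: " + msg, true
}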
@ -1,9 +1,9 @@
package cloudwatch
package models

import "github.com/aws/aws-sdk-go/service/cloudwatch"

// queryRowResponse represents the GetMetricData response for a query row in the query editor.
type queryRowResponse struct {
type QueryRowResponse struct {
	partialDataSet     map[string]*cloudwatch.MetricDataResult
	ErrorCodes         map[string]bool
	HasArithmeticError bool
@ -12,21 +12,17 @@ type queryRowResponse struct {
	StatusCode string
}

func newQueryRowResponse() queryRowResponse {
	return queryRowResponse{
		partialDataSet: make(map[string]*cloudwatch.MetricDataResult),
		ErrorCodes: map[string]bool{
			maxMetricsExceeded:         false,
			maxQueryTimeRangeExceeded:  false,
			maxQueryResultsExceeded:    false,
			maxMatchingResultsExceeded: false},
func NewQueryRowResponse(errors map[string]bool) QueryRowResponse {
	return QueryRowResponse{
		partialDataSet:         make(map[string]*cloudwatch.MetricDataResult),
		ErrorCodes:             errors,
		HasArithmeticError:     false,
		ArithmeticErrorMessage: "",
		Metrics:                []*cloudwatch.MetricDataResult{},
	}
}

func (q *queryRowResponse) addMetricDataResult(mdr *cloudwatch.MetricDataResult) {
func (q *QueryRowResponse) AddMetricDataResult(mdr *cloudwatch.MetricDataResult) {
	if partialData, ok := q.partialDataSet[*mdr.Label]; ok {
		partialData.Timestamps = append(partialData.Timestamps, mdr.Timestamps...)
		partialData.Values = append(partialData.Values, mdr.Values...)
@ -44,7 +40,7 @@ func (q *queryRowResponse) addMetricDataResult(mdr *cloudwatch.MetricDataResult)
	}
}

func (q *queryRowResponse) addArithmeticError(message *string) {
func (q *QueryRowResponse) AddArithmeticError(message *string) {
	q.HasArithmeticError = true
	q.ArithmeticErrorMessage = *message
}
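A small usage sketch of the now-exported constructor and methods, assuming a hypothetical caller outside the models package; the concrete field values are illustrative, and the real call sites are in the response parser shown below.

package example // hypothetical

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/cloudwatch"

	"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
)

func buildRow() models.QueryRowResponse {
	// The caller now owns the error-code map and passes it to the constructor.
	errs := map[string]bool{models.MaxMetricsExceeded: false}
	row := models.NewQueryRowResponse(errs)
	row.AddMetricDataResult(&cloudwatch.MetricDataResult{
		Id:         aws.String("a"),
		Label:      aws.String("label1"),
		StatusCode: aws.String("Complete"),
	})
	return row
}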
@ -46,41 +46,38 @@ func (e *cloudWatchExecutor) parseResponse(startTime time.Time, endTime time.Tim
	return results, nil
}

func aggregateResponse(getMetricDataOutputs []*cloudwatch.GetMetricDataOutput) map[string]queryRowResponse {
	responseByID := make(map[string]queryRowResponse)
	errorCodes := map[string]bool{
		maxMetricsExceeded:         false,
		maxQueryTimeRangeExceeded:  false,
		maxQueryResultsExceeded:    false,
		maxMatchingResultsExceeded: false,
func aggregateResponse(getMetricDataOutputs []*cloudwatch.GetMetricDataOutput) map[string]models.QueryRowResponse {
	responseByID := make(map[string]models.QueryRowResponse)
	errors := map[string]bool{
		models.MaxMetricsExceeded:         false,
		models.MaxQueryTimeRangeExceeded:  false,
		models.MaxQueryResultsExceeded:    false,
		models.MaxMatchingResultsExceeded: false,
	}
	// First check whether any of the getMetricDataOutputs contains request-level errors; if so, store them so they can be added to each query response.
	for _, gmdo := range getMetricDataOutputs {
		for _, message := range gmdo.Messages {
			if _, exists := errorCodes[*message.Code]; exists {
				errorCodes[*message.Code] = true
			if _, exists := errors[*message.Code]; exists {
				errors[*message.Code] = true
			}
		}
	}
	for _, gmdo := range getMetricDataOutputs {
		for _, r := range gmdo.MetricDataResults {
			id := *r.Id

			response := newQueryRowResponse()
			response := models.NewQueryRowResponse(errors)
			if _, exists := responseByID[id]; exists {
				response = responseByID[id]
			}

			for _, message := range r.Messages {
				if *message.Code == "ArithmeticError" {
					response.addArithmeticError(message.Value)
					response.AddArithmeticError(message.Value)
				}
			}

			response.addMetricDataResult(r)

			for code := range errorCodes {
				if _, exists := response.ErrorCodes[code]; exists {
					response.ErrorCodes[code] = errorCodes[code]
				}
			}
			response.AddMetricDataResult(r)
			responseByID[id] = response
		}
	}
@ -112,7 +109,7 @@ func getLabels(cloudwatchLabel string, query *models.CloudWatchQuery) data.Label
	return labels
}

func buildDataFrames(startTime time.Time, endTime time.Time, aggregatedResponse queryRowResponse,
func buildDataFrames(startTime time.Time, endTime time.Time, aggregatedResponse models.QueryRowResponse,
	query *models.CloudWatchQuery, dynamicLabelEnabled bool) (data.Frames, error) {
	frames := data.Frames{}
	for _, metric := range aggregatedResponse.Metrics {
@ -194,17 +191,11 @@ func buildDataFrames(startTime time.Time, endTime time.Time, aggregatedResponse
		Meta: createMeta(query),
	}

	warningTextMap := map[string]string{
		"MaxMetricsExceeded":         "Maximum number of allowed metrics exceeded. Your search may have been limited",
		"MaxQueryTimeRangeExceeded":  "Max time window exceeded for query",
		"MaxQueryResultsExceeded":    "Only the first 500 time series can be returned by a query.",
		"MaxMatchingResultsExceeded": "The query matched more than 10.000 metrics, results might not be accurate.",
	}
	for code := range aggregatedResponse.ErrorCodes {
		if aggregatedResponse.ErrorCodes[code] {
			frame.AppendNotices(data.Notice{
				Severity: data.NoticeSeverityWarning,
				Text:     "cloudwatch GetMetricData error: " + warningTextMap[code],
				Text:     "cloudwatch GetMetricData error: " + models.ErrorMessages[code],
			})
		}
	}
@ -112,9 +112,35 @@ func TestCloudWatchResponseParser(t *testing.T) {
		})
	})

	t.Run("when aggregating response and error codes are in second GetMetricDataOutput", func(t *testing.T) {
		getMetricDataOutputs, err := loadGetMetricDataOutputsFromFile("./testdata/multiple-outputs3.json")
		require.NoError(t, err)
		aggregatedResponse := aggregateResponse(getMetricDataOutputs)
		t.Run("response for id a", func(t *testing.T) {
			idA := "a"
			idB := "b"
			t.Run("should have exceeded request limit", func(t *testing.T) {
				assert.True(t, aggregatedResponse[idA].ErrorCodes["MaxMetricsExceeded"])
				assert.True(t, aggregatedResponse[idB].ErrorCodes["MaxMetricsExceeded"])
			})
			t.Run("should have exceeded query time range", func(t *testing.T) {
				assert.True(t, aggregatedResponse[idA].ErrorCodes["MaxQueryTimeRangeExceeded"])
				assert.True(t, aggregatedResponse[idB].ErrorCodes["MaxQueryTimeRangeExceeded"])
			})
			t.Run("should have exceeded max query results", func(t *testing.T) {
				assert.True(t, aggregatedResponse[idA].ErrorCodes["MaxQueryResultsExceeded"])
				assert.True(t, aggregatedResponse[idB].ErrorCodes["MaxQueryResultsExceeded"])
			})
			t.Run("should have exceeded max matching results", func(t *testing.T) {
				assert.True(t, aggregatedResponse[idA].ErrorCodes["MaxMatchingResultsExceeded"])
				assert.True(t, aggregatedResponse[idB].ErrorCodes["MaxMatchingResultsExceeded"])
			})
		})
	})

	t.Run("Expand dimension value using exact match", func(t *testing.T) {
		timestamp := time.Unix(0, 0)
		response := &queryRowResponse{
		response := &models.QueryRowResponse{
			Metrics: []*cloudwatch.MetricDataResult{
				{
					Id: aws.String("id1"),
@ -178,7 +204,7 @@ func TestCloudWatchResponseParser(t *testing.T) {

	t.Run("Expand dimension value using substring", func(t *testing.T) {
		timestamp := time.Unix(0, 0)
		response := &queryRowResponse{
		response := &models.QueryRowResponse{
			Metrics: []*cloudwatch.MetricDataResult{
				{
					Id: aws.String("id1"),
@ -241,7 +267,7 @@ func TestCloudWatchResponseParser(t *testing.T) {

	t.Run("Expand dimension value using wildcard", func(t *testing.T) {
		timestamp := time.Unix(0, 0)
		response := &queryRowResponse{
		response := &models.QueryRowResponse{
			Metrics: []*cloudwatch.MetricDataResult{
				{
					Id: aws.String("lb3"),
@ -300,7 +326,7 @@ func TestCloudWatchResponseParser(t *testing.T) {

	t.Run("Expand dimension value when no values are returned and a multi-valued template variable is used", func(t *testing.T) {
		timestamp := time.Unix(0, 0)
		response := &queryRowResponse{
		response := &models.QueryRowResponse{
			Metrics: []*cloudwatch.MetricDataResult{
				{
					Id: aws.String("lb3"),
@ -339,7 +365,7 @@ func TestCloudWatchResponseParser(t *testing.T) {

	t.Run("Expand dimension value when no values are returned and a multi-valued template variable and two single-valued dimensions are used", func(t *testing.T) {
		timestamp := time.Unix(0, 0)
		response := &queryRowResponse{
		response := &models.QueryRowResponse{
			Metrics: []*cloudwatch.MetricDataResult{
				{
					Id: aws.String("lb3"),
@ -381,7 +407,7 @@ func TestCloudWatchResponseParser(t *testing.T) {

	t.Run("Should only expand certain fields when using SQL queries", func(t *testing.T) {
		timestamp := time.Unix(0, 0)
		response := &queryRowResponse{
		response := &models.QueryRowResponse{
			Metrics: []*cloudwatch.MetricDataResult{
				{
					Id: aws.String("lb3"),
@ -425,7 +451,7 @@ func TestCloudWatchResponseParser(t *testing.T) {

	t.Run("Parse cloudwatch response", func(t *testing.T) {
		timestamp := time.Unix(0, 0)
		response := &queryRowResponse{
		response := &models.QueryRowResponse{
			Metrics: []*cloudwatch.MetricDataResult{
				{
					Id: aws.String("id1"),
@ -475,7 +501,7 @@ func TestCloudWatchResponseParser(t *testing.T) {
	})

	t.Run("buildDataFrames should use response label as frame name when dynamic label is enabled", func(t *testing.T) {
		response := &queryRowResponse{
		response := &models.QueryRowResponse{
			Metrics: []*cloudwatch.MetricDataResult{
				{
					Label: aws.String("some response label"),
@ -51,5 +51,5 @@ var newAccountsService = func(pluginCtx backend.PluginContext, reqCtxFactory mod
		return nil, err
	}

	return services.NewAccountsService(oamClient.OAMClientProvider), nil
	return services.NewAccountsService(oamClient.OAMAPIProvider), nil
}
@ -13,10 +13,10 @@ import (
var ErrAccessDeniedException = errors.New("access denied. please check your IAM policy")

type AccountsService struct {
	models.OAMClientProvider
	models.OAMAPIProvider
}

func NewAccountsService(oamClient models.OAMClientProvider) models.AccountsProvider {
func NewAccountsService(oamClient models.OAMAPIProvider) models.AccountsProvider {
	return &AccountsService{oamClient}
}
149
pkg/tsdb/cloudwatch/testdata/multiple-outputs3.json
vendored
Normal file
@ -0,0 +1,149 @@
[
  {
    "Messages": [],
    "MetricDataResults": [
      {
        "Id": "a",
        "Label": "label1",
        "Messages": null,
        "StatusCode": "Complete",
        "Timestamps": [
          "2021-01-15T19:44:00Z",
          "2021-01-15T19:59:00Z",
          "2021-01-15T20:14:00Z",
          "2021-01-15T20:29:00Z",
          "2021-01-15T20:44:00Z"
        ],
        "Values": [
          0.1333395078879982,
          0.244268469636633,
          0.15574387947267768,
          0.14447563659125626,
          0.15519743138527173
        ]
      },
      {
        "Id": "a",
        "Label": "label2",
        "Messages": null,
        "StatusCode": "Complete",
        "Timestamps": [
          "2021-01-15T19:44:00Z"
        ],
        "Values": [
          0.1333395078879982
        ]
      },
      {
        "Id": "b",
        "Label": "label2",
        "Messages": null,
        "StatusCode": "Complete",
        "Timestamps": [
          "2021-01-15T19:44:00Z"
        ],
        "Values": [
          0.1333395078879982
        ]
      }
    ],
    "NextToken": null
  },
  {
    "Messages": [
      { "Code": "", "Value": null },
      { "Code": "MaxMetricsExceeded", "Value": null },
      { "Code": "MaxQueryTimeRangeExceeded", "Value": null },
      { "Code": "MaxQueryResultsExceeded", "Value": null },
      { "Code": "MaxMatchingResultsExceeded", "Value": null }
    ],
    "MetricDataResults": [
      {
        "Id": "a",
        "Label": "label1",
        "Messages": null,
        "StatusCode": "Complete",
        "Timestamps": [
          "2021-01-15T19:44:00Z",
          "2021-01-15T19:59:00Z",
          "2021-01-15T20:14:00Z",
          "2021-01-15T20:29:00Z",
          "2021-01-15T20:44:00Z"
        ],
        "Values": [
          0.1333395078879982,
          0.244268469636633,
          0.15574387947267768,
          0.14447563659125626,
          0.15519743138527173
        ]
      },
      {
        "Id": "a",
        "Label": "label2",
        "Messages": null,
        "StatusCode": "Complete",
        "Timestamps": [
          "2021-01-15T19:44:00Z"
        ],
        "Values": [
          0.1333395078879982
        ]
      },
      {
        "Id": "b",
        "Label": "label2",
        "Messages": null,
        "StatusCode": "Complete",
        "Timestamps": [
          "2021-01-15T19:44:00Z"
        ],
        "Values": [
          0.1333395078879982
        ]
      }
    ],
    "NextToken": null
  },
  {
    "Messages": null,
    "MetricDataResults": [
      {
        "Id": "a",
        "Label": "label1",
        "Messages": null,
        "StatusCode": "Complete",
        "Timestamps": [
          "2021-01-15T19:44:00Z",
          "2021-01-15T19:59:00Z",
          "2021-01-15T20:14:00Z",
          "2021-01-15T20:29:00Z",
          "2021-01-15T20:44:00Z"
        ],
        "Values": [
          0.1333395078879982,
          0.244268469636633,
          0.15574387947267768,
          0.14447563659125626,
          0.15519743138527173
        ]
      },
      {
        "Id": "b",
        "Label": "label2",
        "Messages": [{
          "Code": "ArithmeticError",
          "Value": "One or more data-points have been dropped due to non-numeric values (NaN, -Infinite, +Infinite)"
        }],
        "StatusCode": "Partial",
        "Timestamps": [
          "2021-01-15T19:44:00Z"
        ],
        "Values": [
          0.1333395078879982
        ]
      }
    ],
    "NextToken": null
  }
]