grafana/pkg/tsdb/cloudwatch/cloudwatch.go
Shirley 972e611f76
CloudWatch Logs: Revert "Queries in an expression should run synchronously (#64443)" (#65036)
Revert "CloudWatch Logs: Queries in an expression should run synchronously (#64443)"

This reverts commit 74436d31de.
2023-03-20 15:54:30 +01:00

317 lines
9.6 KiB
Go

package cloudwatch
import (
"context"
"encoding/json"
"fmt"
"net/http"
"regexp"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/resourcegroupstaggingapi/resourcegroupstaggingapiiface"
"github.com/grafana/grafana-aws-sdk/pkg/awsds"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/resource/httpadapter"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/featuremgmt"
ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/clients"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch/models"
)
// DataQueryJson is the query model that QueryData decodes from the JSON
// payload of a backend query. Only QueryType carries an explicit JSON tag;
// the other fields are matched by encoding/json against their field names
// (exact match preferred, case-insensitive match accepted).
type DataQueryJson struct {
// QueryType is read from the "type" JSON key. QueryData switches on it to
// pick the annotation, log action or time series executor.
QueryType string `json:"type,omitempty"`
// QueryMode is compared against logsQueryMode by QueryData to detect Logs
// queries that originate from alerting.
QueryMode string
// The remaining fields are not read in this part of the file. The decoded
// model is handed to executeAnnotationQuery, so they are presumably consumed
// there — verify against that function.
PrefixMatching bool
Region string
Namespace string
MetricName string
Dimensions map[string]interface{}
Statistic *string
Period string
ActionPrefix string
AlarmNamePrefix string
}
// DataSource is the per-datasource instance produced by the factory that
// NewInstanceSettings returns; getInstance retrieves it from the instance
// manager.
type DataSource struct {
// Settings holds the CloudWatch settings loaded by
// models.LoadCloudWatchSettings for this datasource.
Settings models.CloudWatchSettings
// HTTPClient is created by the HTTP client provider. newSession does not
// currently pass it to the AWS session configuration (the corresponding
// field there is commented out).
HTTPClient *http.Client
}
const (
// defaultRegion is the placeholder region name that getRequestContext and
// newSession replace with the region configured on the datasource instance.
defaultRegion = "default"
// logsQueryMode is the QueryMode value that identifies a CloudWatch Logs query.
logsQueryMode = "Logs"
// QueryTypes: the values of DataQueryJson.QueryType that QueryData dispatches on.
annotationQuery = "annotationQuery"
logAction = "logAction"
timeSeriesQuery = "timeSeriesQuery"
)
// logger is the package-level logger, registered under the name "tsdb.cloudwatch".
var logger = log.New("tsdb.cloudwatch")
// aliasFormat matches a double-curly-brace token such as {{ name }}. The single
// capture group is the non-empty text between the braces, matched lazily, with
// any whitespace directly inside the braces left out of the capture.
// NOTE(review): not referenced in this part of the file; presumably used for
// alias templating elsewhere in the package — confirm.
var aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
// ProvideService builds the CloudWatch service: it creates an instance manager
// backed by NewInstanceSettings, a fresh AWS session cache, and an executor
// wired to both, then returns them bundled together with cfg.
func ProvideService(cfg *setting.Cfg, httpClientProvider httpclient.Provider, features featuremgmt.FeatureToggles) *CloudWatchService {
	logger.Debug("Initializing")

	instanceFactory := NewInstanceSettings(httpClientProvider)
	instanceManager := datasource.NewInstanceManager(instanceFactory)
	sessionCache := awsds.NewSessionCache()

	svc := &CloudWatchService{Cfg: cfg}
	svc.Executor = newExecutor(instanceManager, cfg, sessionCache, features)
	return svc
}
// CloudWatchService is the value returned by ProvideService. It pairs the
// Grafana configuration with the executor that serves CloudWatch requests.
type CloudWatchService struct {
// Cfg is the configuration that ProvideService was called with.
Cfg *setting.Cfg
// Executor is the executor that ProvideService builds via newExecutor.
Executor *cloudWatchExecutor
}
// SessionCache is the session source the executor depends on. ProvideService
// supplies awsds.NewSessionCache(); newExecutor accepts any implementation.
type SessionCache interface {
// GetSession returns an AWS session for the given session configuration.
GetSession(c awsds.SessionConfig) (*session.Session, error)
}
// newExecutor assembles a cloudWatchExecutor from its dependencies and attaches
// the resource handler that CallResource delegates to.
func newExecutor(im instancemgmt.InstanceManager, cfg *setting.Cfg, sessions SessionCache, features featuremgmt.FeatureToggles) *cloudWatchExecutor {
	executor := new(cloudWatchExecutor)
	executor.im = im
	executor.cfg = cfg
	executor.sessions = sessions
	executor.features = features

	// The mux comes from a method on the executor, so the handler can only be
	// attached after the executor itself has been populated.
	mux := executor.newResourceMux()
	executor.resourceHandler = httpadapter.New(mux)
	return executor
}
// NewInstanceSettings returns the instance factory used by the instance
// manager. For each datasource the factory loads the CloudWatch settings and
// creates an HTTP client, returning both wrapped in a DataSource value. A
// failure in either step is returned as a wrapped error.
func NewInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc {
	return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
		cwSettings, loadErr := models.LoadCloudWatchSettings(settings)
		if loadErr != nil {
			return nil, fmt.Errorf("error reading settings: %w", loadErr)
		}

		client, clientErr := httpClientProvider.New()
		if clientErr != nil {
			return nil, fmt.Errorf("error creating http client: %w", clientErr)
		}

		ds := DataSource{Settings: cwSettings, HTTPClient: client}
		return ds, nil
	}
}
// cloudWatchExecutor executes CloudWatch requests: data queries (QueryData),
// resource calls (CallResource) and health checks (CheckHealth).
type cloudWatchExecutor struct {
// im resolves the DataSource instance for a plugin context (see getInstance).
im instancemgmt.InstanceManager
// cfg is passed to the metrics client constructor.
cfg *setting.Cfg
// sessions provides the AWS sessions that newSession returns.
sessions SessionCache
// features is copied into every models.RequestContext built by getRequestContext.
features featuremgmt.FeatureToggles
// resourceHandler serves CallResource; newExecutor sets it from the resource mux.
resourceHandler backend.CallResourceHandler
}
// getRequestContext builds the models.RequestContext for a request against the
// given region. The placeholder region defaultRegion is replaced by the region
// configured on the datasource instance.
//
// The returned context carries OAM, metrics and logs API providers created from
// one AWS session for the resolved region, plus the instance settings and the
// executor's feature toggles. An error is returned when the datasource instance
// cannot be resolved or the session cannot be created.
func (e *cloudWatchExecutor) getRequestContext(pluginCtx backend.PluginContext, region string) (models.RequestContext, error) {
	// Fail fast on a missing instance regardless of the region: instance.Settings
	// is read below on every path, so continuing with a nil instance would panic.
	instance, err := e.getInstance(pluginCtx)
	if err != nil {
		return models.RequestContext{}, err
	}

	r := region
	if region == defaultRegion {
		r = instance.Settings.Region
	}

	sess, err := e.newSession(pluginCtx, r)
	if err != nil {
		return models.RequestContext{}, err
	}

	return models.RequestContext{
		OAMAPIProvider:        NewOAMAPI(sess),
		MetricsClientProvider: clients.NewMetricsClient(NewMetricsAPI(sess), e.cfg),
		LogsAPIProvider:       NewLogsAPI(sess),
		Settings:              instance.Settings,
		Features:              e.features,
	}, nil
}
// CallResource forwards a resource call unchanged to the handler that
// newExecutor created from the executor's resource mux.
func (e *cloudWatchExecutor) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
	handler := e.resourceHandler
	return handler.CallResource(ctx, req, sender)
}
// QueryData dispatches a query request to the matching executor.
//
// The model of the FIRST query in the request decides how the whole request is
// handled:
//   - a Logs-mode query carrying the alerting header goes to executeLogAlertQuery;
//   - query type annotationQuery goes to executeAnnotationQuery;
//   - query type logAction goes to executeLogActions;
//   - timeSeriesQuery and any other type go to executeTimeSeriesQuery.
//
// An error is returned when the request contains no queries or when the first
// query's JSON cannot be decoded; otherwise the chosen executor's result and
// error are returned as-is.
func (e *cloudWatchExecutor) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	// Indexing Queries[0] on an empty request would panic, so reject it up front.
	if len(req.Queries) == 0 {
		return nil, fmt.Errorf("request contains no queries")
	}

	// Named differently from the package-level logger to avoid shadowing it.
	reqLogger := logger.FromContext(ctx)

	/*
		Unlike many other data sources, with Cloudwatch Logs query requests don't receive the results as the response
		to the query, but rather an ID is first returned. Following this, a client is expected to send requests along
		with the ID until the status of the query is complete, receiving (possibly partial) results each time. For
		queries made via dashboards and Explore, the logic of making these repeated queries is handled on the
		frontend, but because alerts are executed on the backend the logic needs to be reimplemented here.
	*/
	q := req.Queries[0]
	var model DataQueryJson
	if err := json.Unmarshal(q.JSON, &model); err != nil {
		return nil, err
	}

	// Only the presence of the alerting header matters, not its value.
	_, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName]
	if fromAlert && model.QueryMode == logsQueryMode {
		return e.executeLogAlertQuery(ctx, req)
	}

	switch model.QueryType {
	case annotationQuery:
		return e.executeAnnotationQuery(req.PluginContext, model, q)
	case logAction:
		return e.executeLogActions(ctx, reqLogger, req)
	case timeSeriesQuery:
		fallthrough
	default:
		// Unknown or missing query types are treated as time series queries.
		return e.executeTimeSeriesQuery(ctx, reqLogger, req)
	}
}
// CheckHealth probes the CloudWatch metrics API and then the CloudWatch logs
// API. Both probes always run. The status is an error if either probe fails,
// and the message is a numbered two-line summary with one line per probe. The
// returned error is always nil; probe failures are reported through the result.
func (e *cloudWatchExecutor) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
	result := &backend.CheckHealthResult{Status: backend.HealthStatusOk}

	metricsMsg := "Successfully queried the CloudWatch metrics API."
	if metricsErr := e.checkHealthMetrics(req.PluginContext); metricsErr != nil {
		result.Status = backend.HealthStatusError
		metricsMsg = fmt.Sprintf("CloudWatch metrics query failed: %s", metricsErr.Error())
	}

	logsMsg := "Successfully queried the CloudWatch logs API."
	if logsErr := e.checkHealthLogs(req.PluginContext); logsErr != nil {
		result.Status = backend.HealthStatusError
		logsMsg = fmt.Sprintf("CloudWatch logs query failed: %s", logsErr.Error())
	}

	result.Message = fmt.Sprintf("1. %s\n2. %s", metricsMsg, logsMsg)
	return result, nil
}
// checkHealthMetrics verifies access to the CloudWatch metrics API by listing
// the AWS/Billing EstimatedCharges metric using a session for the datasource's
// default region. Only the error of that call is reported; the listing itself
// is discarded.
func (e *cloudWatchExecutor) checkHealthMetrics(pluginCtx backend.PluginContext) error {
	sess, err := e.newSession(pluginCtx, defaultRegion)
	if err != nil {
		return err
	}

	billingNamespace, chargesMetric := "AWS/Billing", "EstimatedCharges"
	input := &cloudwatch.ListMetricsInput{
		Namespace:  &billingNamespace,
		MetricName: &chargesMetric,
	}

	_, err = clients.NewMetricsClient(NewMetricsAPI(sess), e.cfg).ListMetricsWithPageLimit(input)
	return err
}
// checkHealthLogs verifies access to the CloudWatch logs API by requesting at
// most one log group using a session for the datasource's default region. Only
// the error of that call is reported.
func (e *cloudWatchExecutor) checkHealthLogs(pluginCtx backend.PluginContext) error {
	sess, err := e.newSession(pluginCtx, defaultRegion)
	if err != nil {
		return err
	}

	input := &cloudwatchlogs.DescribeLogGroupsInput{Limit: aws.Int64(1)}
	_, err = NewLogsAPI(sess).DescribeLogGroups(input)
	return err
}
// newSession returns an AWS session for the given region, obtained from the
// executor's session cache. Authentication and endpoint settings come from the
// datasource instance; the placeholder region defaultRegion is replaced by the
// instance's configured region. An error is returned when the instance cannot
// be resolved or the cache fails to provide a session.
func (e *cloudWatchExecutor) newSession(pluginCtx backend.PluginContext, region string) (*session.Session, error) {
	instance, err := e.getInstance(pluginCtx)
	if err != nil {
		return nil, err
	}

	effectiveRegion := region
	if effectiveRegion == defaultRegion {
		effectiveRegion = instance.Settings.Region
	}

	awsSettings := awsds.AWSDatasourceSettings{
		Profile:       instance.Settings.Profile,
		Region:        effectiveRegion,
		AuthType:      instance.Settings.AuthType,
		AssumeRoleARN: instance.Settings.AssumeRoleARN,
		ExternalID:    instance.Settings.ExternalID,
		Endpoint:      instance.Settings.Endpoint,
		DefaultRegion: instance.Settings.Region,
		AccessKey:     instance.Settings.AccessKey,
		SecretKey:     instance.Settings.SecretKey,
	}

	sessionCfg := awsds.SessionConfig{
		// https://github.com/grafana/grafana/issues/46365
		// HTTPClient: dsInfo.HTTPClient,
		Settings:      awsSettings,
		UserAgentName: aws.String("Cloudwatch"),
	}
	return e.sessions.GetSession(sessionCfg)
}
// getInstance resolves the DataSource for the given plugin context through the
// instance manager and returns a pointer to a copy of it.
//
// An error is returned when the instance manager fails, or when the value it
// holds is not a DataSource. The latter would mean the instance factory and
// this accessor have drifted apart; it is reported as an error rather than
// left to panic on an unchecked type assertion.
func (e *cloudWatchExecutor) getInstance(pluginCtx backend.PluginContext) (*DataSource, error) {
	i, err := e.im.Get(pluginCtx)
	if err != nil {
		return nil, err
	}

	instance, ok := i.(DataSource)
	if !ok {
		return nil, fmt.Errorf("unexpected datasource instance type %T", i)
	}
	return &instance, nil
}
// getCWClient returns a CloudWatch API client backed by a session for the
// given region, or the error from creating that session.
func (e *cloudWatchExecutor) getCWClient(pluginCtx backend.PluginContext, region string) (cloudwatchiface.CloudWatchAPI, error) {
	awsSession, sessionErr := e.newSession(pluginCtx, region)
	if sessionErr != nil {
		return nil, sessionErr
	}

	cwClient := NewCWClient(awsSession)
	return cwClient, nil
}
// getCWLogsClient returns a CloudWatch Logs API client backed by a session for
// the given region, or the error from creating that session.
func (e *cloudWatchExecutor) getCWLogsClient(pluginCtx backend.PluginContext, region string) (cloudwatchlogsiface.CloudWatchLogsAPI, error) {
	awsSession, sessionErr := e.newSession(pluginCtx, region)
	if sessionErr != nil {
		return nil, sessionErr
	}

	return NewCWLogsClient(awsSession), nil
}
// getEC2Client returns an EC2 API client backed by a session for the given
// region, or the error from creating that session.
func (e *cloudWatchExecutor) getEC2Client(pluginCtx backend.PluginContext, region string) (ec2iface.EC2API, error) {
	awsSession, sessionErr := e.newSession(pluginCtx, region)
	if sessionErr != nil {
		return nil, sessionErr
	}

	ec2Client := newEC2Client(awsSession)
	return ec2Client, nil
}
// getRGTAClient returns a Resource Groups Tagging API client backed by a
// session for the given region, or the error from creating that session.
func (e *cloudWatchExecutor) getRGTAClient(pluginCtx backend.PluginContext, region string) (resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI, error) {
	awsSession, sessionErr := e.newSession(pluginCtx, region)
	if sessionErr != nil {
		return nil, sessionErr
	}

	taggingClient := newRGTAClient(awsSession)
	return taggingClient, nil
}
// isTerminated reports whether queryStatus is one of the four statuses this
// package treats as final: "Complete", "Cancelled", "Failed" or "Timeout".
// The comparison is exact and case-sensitive; every other value yields false.
func isTerminated(queryStatus string) bool {
	switch queryStatus {
	case "Complete", "Cancelled", "Failed", "Timeout":
		return true
	default:
		return false
	}
}