CloudWatch Logs queue and websocket support (#28176)

CloudWatch Logs queue and websocket support
This commit is contained in:
kay delaney 2020-10-28 08:36:57 +00:00 committed by GitHub
parent e94b37c656
commit c4c5b2dc61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 701 additions and 96 deletions

5
go.mod
View File

@ -15,7 +15,7 @@ require (
cloud.google.com/go/storage v1.12.0
github.com/BurntSushi/toml v0.3.1
github.com/VividCortex/mysqlerr v0.0.0-20170204212430-6c6b55f8796f
github.com/aws/aws-sdk-go v1.33.12
github.com/aws/aws-sdk-go v1.35.5
github.com/beevik/etree v1.1.0
github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
@ -40,6 +40,7 @@ require (
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.4.3
github.com/google/go-cmp v0.5.2
github.com/google/uuid v1.1.2
github.com/gosimple/slug v1.4.2
github.com/grafana/grafana-plugin-model v0.0.0-20190930120109-1fc953a61fb4
github.com/grafana/grafana-plugin-sdk-go v0.78.0
@ -50,7 +51,7 @@ require (
github.com/hashicorp/go-version v1.2.0
github.com/inconshreveable/log15 v0.0.0-20180818164646-67afb5ed74ec
github.com/influxdata/influxdb-client-go/v2 v2.0.1
github.com/jmespath/go-jmespath v0.3.0
github.com/jmespath/go-jmespath v0.4.0
github.com/jonboulle/clockwork v0.2.1 // indirect
github.com/jung-kurt/gofpdf v1.10.1
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect

7
go.sum
View File

@ -155,6 +155,8 @@ github.com/aws/aws-sdk-go v1.31.9/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU
github.com/aws/aws-sdk-go v1.33.5/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.33.12 h1:eydMoSwfrSTD9PWKUJOiDL7+/UwDW8AjInUGVE5Llh4=
github.com/aws/aws-sdk-go v1.33.12/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.35.5 h1:doSEOxC0UkirPcle20Rc+1kAhJ4Ip+GSEeZ3nKl7Qlk=
github.com/aws/aws-sdk-go v1.35.5/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs=
@ -718,6 +720,10 @@ github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
@ -1067,6 +1073,7 @@ github.com/samuel/go-zookeeper v0.0.0-20200724154423-2164a8ac840e/go.mod h1:gi+0
github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHiuO9LYd+cIxzgEHCQI4=
github.com/satori/go.uuid v0.0.0-20160603004225-b111a074d5ef/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e/go.mod h1:tm/wZFQ8e24NYaBGIlnO2WGCAi67re4HHuOm0sftE/M=

View File

@ -9,6 +9,7 @@ import (
"github.com/grafana/grafana/pkg/models"
)
// registerRoutes registers all API HTTP routes.
func (hs *HTTPServer) registerRoutes() {
reqSignedIn := middleware.ReqSignedIn
reqGrafanaAdmin := middleware.ReqGrafanaAdmin
@ -435,11 +436,6 @@ func (hs *HTTPServer) registerRoutes() {
avatarCacheServer := avatar.NewCacheServer()
r.Get("/avatar/:hash", avatarCacheServer.Handler)
// Live streaming
if hs.Live != nil {
r.Any("/live/*", hs.Live.WebsocketHandler)
}
// Snapshots
r.Post("/api/snapshots/", reqSnapshotPublicModeOrSignedIn, bind(models.CreateDashboardSnapshotCommand{}), CreateDashboardSnapshot)
r.Get("/api/snapshot/shared-options/", reqSignedIn, GetSharingOptions)

View File

@ -262,7 +262,7 @@ func (hs *HTTPServer) PostDashboard(c *models.ReqContext, cmd models.SaveDashboa
}
// Tell everyone listening that the dashboard changed
if hs.Live != nil {
if hs.Live.IsEnabled() {
err := hs.Live.GrafanaScope.Dashboards.DashboardSaved(
dashboard.Uid,
c.UserId,

View File

@ -13,6 +13,7 @@ import (
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/alerting"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/live"
"github.com/grafana/grafana/pkg/services/provisioning"
"github.com/grafana/grafana/pkg/setting"
"github.com/stretchr/testify/require"
@ -1129,6 +1130,7 @@ func postDashboardScenario(desc string, url string, routePattern string, mock *d
Bus: bus.GetBus(),
Cfg: setting.NewCfg(),
ProvisioningService: provisioning.NewProvisioningServiceMock(),
Live: &live.GrafanaLive{Cfg: setting.NewCfg()},
}
sc := setupScenarioContext(url)
@ -1188,6 +1190,7 @@ func restoreDashboardVersionScenario(desc string, url string, routePattern strin
Cfg: setting.NewCfg(),
Bus: bus.GetBus(),
ProvisioningService: provisioning.NewProvisioningServiceMock(),
Live: &live.GrafanaLive{Cfg: setting.NewCfg()},
}
sc := setupScenarioContext(url)

View File

@ -75,22 +75,13 @@ type HTTPServer struct {
SearchService *search.SearchService `inject:""`
AlertNG *eval.AlertNG `inject:""`
ShortURLService *shorturls.ShortURLService `inject:""`
Live *live.GrafanaLive
Live *live.GrafanaLive `inject:""`
Listener net.Listener
}
func (hs *HTTPServer) Init() error {
hs.log = log.New("http.server")
// Set up a websocket broker
if hs.Cfg.IsLiveEnabled() { // feature flag
node, err := live.InitializeBroker()
if err != nil {
return err
}
hs.Live = node
}
hs.macaron = hs.newMacaron()
hs.registerRoutes()

View File

@ -70,7 +70,6 @@ func getServicesWithOverrides() []*Descriptor {
// Service interface is the lowest common shape that services
// are expected to fulfill to be started within Grafana.
type Service interface {
// Init is called by Grafana main process which gives the service
// the possibility to do some initial work before it's started. Things
// like adding routes, bus handlers should be done in the Init function
@ -82,7 +81,6 @@ type Service interface {
// that might not always be started, ex alerting.
// This will be called after `Init()`.
type CanBeDisabled interface {
// IsDisabled should return a bool saying if it can be started or not.
IsDisabled() bool
}
@ -99,13 +97,12 @@ type BackgroundService interface {
// DatabaseMigrator allows the caller to add migrations to
// the migrator passed as argument
type DatabaseMigrator interface {
// AddMigrations allows the service to add migrations to
// the database migrator.
AddMigration(mg *migrator.Migrator)
}
// IsDisabled takes an service and return true if its disabled
// IsDisabled returns whether a service is disabled.
func IsDisabled(srv Service) bool {
canBeDisabled, ok := srv.(CanBeDisabled)
return ok && canBeDisabled.IsDisabled()

View File

@ -293,7 +293,7 @@ func (s *Server) buildServiceGraph(services []*registry.Descriptor) error {
// Resolve services and their dependencies.
if err := serviceGraph.Populate(); err != nil {
return errutil.Wrapf(err, "Failed to populate service dependency")
return errutil.Wrapf(err, "Failed to populate service dependencies")
}
return nil

View File

@ -7,10 +7,14 @@ import (
"sync"
"github.com/centrifugal/centrifuge"
"github.com/grafana/grafana/pkg/api/routing"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/services/live/features"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/cloudwatch"
)
var (
@ -18,6 +22,16 @@ var (
loggerCF = log.New("live.centrifuge")
)
func init() {
registry.RegisterService(&GrafanaLive{
channels: make(map[string]models.ChannelHandler),
channelsMu: sync.RWMutex{},
GrafanaScope: CoreGrafanaScope{
Features: make(map[string]models.ChannelHandlerFactory),
},
})
}
// CoreGrafanaScope list of core features
type CoreGrafanaScope struct {
Features map[string]models.ChannelHandlerFactory
@ -28,7 +42,10 @@ type CoreGrafanaScope struct {
// GrafanaLive pretends to be the server
type GrafanaLive struct {
node *centrifuge.Node
Cfg *setting.Cfg `inject:""`
RouteRegister routing.RouteRegister `inject:""`
LogsService *cloudwatch.LogsService `inject:""`
node *centrifuge.Node
// The websocket handler
WebsocketHandler interface{}
@ -41,14 +58,14 @@ type GrafanaLive struct {
GrafanaScope CoreGrafanaScope
}
// InitializeBroker initializes the broker and starts listening for requests.
func InitializeBroker() (*GrafanaLive, error) {
glive := &GrafanaLive{
channels: make(map[string]models.ChannelHandler),
channelsMu: sync.RWMutex{},
GrafanaScope: CoreGrafanaScope{
Features: make(map[string]models.ChannelHandlerFactory),
},
// Init initializes the instance.
// Required to implement the registry.Service interface.
func (g *GrafanaLive) Init() error {
logger.Debug("GrafanaLive initing")
if !g.IsEnabled() {
logger.Debug("GrafanaLive feature not enabled, skipping initialization")
return nil
}
// We use default config here as starting point. Default config contains
@ -60,7 +77,7 @@ func InitializeBroker() (*GrafanaLive, error) {
// This function is called fast and often -- it must be synchronized
cfg.ChannelOptionsFunc = func(channel string) (centrifuge.ChannelOptions, bool, error) {
handler, err := glive.GetChannelHandler(channel)
handler, err := g.GetChannelHandler(channel)
if err != nil {
logger.Error("ChannelOptionsFunc", "channel", channel, "err", err)
if err.Error() == "404" { // ????
@ -78,22 +95,22 @@ func InitializeBroker() (*GrafanaLive, error) {
// only from client side.
node, err := centrifuge.New(cfg)
if err != nil {
return nil, err
return err
}
glive.node = node
g.node = node
// Initialize the main features
dash := &features.DashboardHandler{
Publisher: glive.Publish,
Publisher: g.Publish,
}
glive.GrafanaScope.Dashboards = dash
glive.GrafanaScope.Features["dashboard"] = dash
glive.GrafanaScope.Features["testdata"] = &features.TestDataSupplier{
Publisher: glive.Publish,
g.GrafanaScope.Dashboards = dash
g.GrafanaScope.Features["dashboard"] = dash
g.GrafanaScope.Features["testdata"] = &features.TestDataSupplier{
Publisher: g.Publish,
}
glive.GrafanaScope.Features["broadcast"] = &features.BroadcastRunner{}
glive.GrafanaScope.Features["measurements"] = &features.MeasurementsRunner{}
g.GrafanaScope.Features["broadcast"] = &features.BroadcastRunner{}
g.GrafanaScope.Features["measurements"] = &features.MeasurementsRunner{}
// Set ConnectHandler called when client successfully connected to Node. Your code
// inside handler must be synchronized since it will be called concurrently from
@ -121,7 +138,7 @@ func InitializeBroker() (*GrafanaLive, error) {
node.OnSubscribe(func(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
reply := centrifuge.SubscribeReply{}
handler, err := glive.GetChannelHandler(e.Channel)
handler, err := g.GetChannelHandler(e.Channel)
if err != nil {
return reply, err
}
@ -141,7 +158,7 @@ func InitializeBroker() (*GrafanaLive, error) {
// Called when something is written to the websocket
node.OnPublish(func(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
reply := centrifuge.PublishReply{}
handler, err := glive.GetChannelHandler(e.Channel)
handler, err := g.GetChannelHandler(e.Channel)
if err != nil {
return reply, err
}
@ -158,7 +175,7 @@ func InitializeBroker() (*GrafanaLive, error) {
// Run node. This method does not block.
if err := node.Run(); err != nil {
return nil, err
return err
}
// SockJS will find the best protocol possible for the browser
@ -175,7 +192,7 @@ func InitializeBroker() (*GrafanaLive, error) {
WriteBufferSize: 1024,
})
glive.WebsocketHandler = func(ctx *models.ReqContext) {
g.WebsocketHandler = func(ctx *models.ReqContext) {
user := ctx.SignedInUser
if user == nil {
ctx.Resp.WriteHeader(401)
@ -223,7 +240,10 @@ func InitializeBroker() (*GrafanaLive, error) {
// Unknown path
ctx.Resp.WriteHeader(404)
}
return glive, nil
g.RouteRegister.Any("/live/*", g.WebsocketHandler)
return nil
}
// GetChannelHandler gives threadsafe access to the channel
@ -280,6 +300,14 @@ func (g *GrafanaLive) GetChannelHandlerFactory(scope string, name string) (model
}
if scope == "plugin" {
// Temporary hack until we have a more generic solution later on
if name == "cloudwatch" {
return &cloudwatch.LogQueryRunnerSupplier{
Publisher: g.Publish,
Service: g.LogsService,
}, nil
}
p, ok := plugins.Plugins[name]
if ok {
h := &PluginHandler{
@ -299,6 +327,11 @@ func (g *GrafanaLive) Publish(channel string, data []byte) error {
return err
}
// IsEnabled returns true if the Grafana Live feature is enabled.
func (g *GrafanaLive) IsEnabled() bool {
return g.Cfg.IsLiveEnabled()
}
// Write to the standard log15 logger
func handleLog(msg centrifuge.LogEntry) {
arr := make([]interface{}, 0)

View File

@ -5,7 +5,6 @@ import (
"fmt"
"regexp"
"strings"
"sync"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
@ -27,6 +26,7 @@ import (
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb"
)
@ -54,14 +54,30 @@ var plog = log.New("tsdb.cloudwatch")
var aliasFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
func init() {
tsdb.RegisterTsdbQueryEndpoint("cloudwatch", func(ds *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return newExecutor(), nil
registry.Register(&registry.Descriptor{
Name: "CloudWatchService",
InitPriority: registry.Low,
Instance: &CloudWatchService{},
})
}
func newExecutor() *cloudWatchExecutor {
type CloudWatchService struct {
LogsService *LogsService `inject:""`
}
func (s *CloudWatchService) Init() error {
plog.Debug("initing")
tsdb.RegisterTsdbQueryEndpoint("cloudwatch", func(ds *models.DataSource) (tsdb.TsdbQueryEndpoint, error) {
return newExecutor(s.LogsService), nil
})
return nil
}
func newExecutor(logsService *LogsService) *cloudWatchExecutor {
return &cloudWatchExecutor{
logsClientsByRegion: map[string]cloudwatchlogsiface.CloudWatchLogsAPI{},
logsService: logsService,
}
}
@ -69,10 +85,10 @@ func newExecutor() *cloudWatchExecutor {
type cloudWatchExecutor struct {
*models.DataSource
ec2Client ec2iface.EC2API
rgtaClient resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI
logsClientsByRegion map[string]cloudwatchlogsiface.CloudWatchLogsAPI
mtx sync.Mutex
ec2Client ec2iface.EC2API
rgtaClient resourcegroupstaggingapiiface.ResourceGroupsTaggingAPIAPI
logsService *LogsService
}
func (e *cloudWatchExecutor) newSession(region string) (*session.Session, error) {
@ -187,20 +203,12 @@ func (e *cloudWatchExecutor) getCWClient(region string) (cloudwatchiface.CloudWa
}
func (e *cloudWatchExecutor) getCWLogsClient(region string) (cloudwatchlogsiface.CloudWatchLogsAPI, error) {
e.mtx.Lock()
defer e.mtx.Unlock()
if logsClient, ok := e.logsClientsByRegion[region]; ok {
return logsClient, nil
}
sess, err := e.newSession(region)
if err != nil {
return nil, err
}
logsClient := newCWLogsClient(sess)
e.logsClientsByRegion[region] = logsClient
return logsClient, nil
}
@ -301,6 +309,8 @@ func (e *cloudWatchExecutor) Query(ctx context.Context, dsInfo *models.DataSourc
result, err = e.executeAnnotationQuery(ctx, queryContext)
case "logAction":
result, err = e.executeLogActions(ctx, queryContext)
case "liveLogAction":
result, err = e.executeLiveLogQuery(ctx, queryContext)
case "timeSeriesQuery":
fallthrough
default:

319
pkg/tsdb/cloudwatch/live.go Normal file
View File

@ -0,0 +1,319 @@
package cloudwatch
import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
	"github.com/aws/aws-sdk-go/service/servicequotas"
	"github.com/aws/aws-sdk-go/service/servicequotas/servicequotasiface"
	"github.com/centrifugal/centrifuge"
	"github.com/google/uuid"
	"github.com/grafana/grafana-plugin-sdk-go/data"
	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/models"
	"github.com/grafana/grafana/pkg/setting"
	"github.com/grafana/grafana/pkg/tsdb"
	"github.com/grafana/grafana/pkg/util/retryer"
	"golang.org/x/sync/errgroup"
)
const defaultConcurrentQueries = 4
type LogQueryRunnerSupplier struct {
Publisher models.ChannelPublisher
Service *LogsService
}
type logQueryRunner struct {
channelName string
publish models.ChannelPublisher
running map[string]bool
runningMu sync.Mutex
service *LogsService
}
const (
maxAttempts = 8
minRetryDelay = 500 * time.Millisecond
maxRetryDelay = 30 * time.Second
)
// GetHandlerForPath gets the channel handler for a certain path.
func (s *LogQueryRunnerSupplier) GetHandlerForPath(path string) (models.ChannelHandler, error) {
return &logQueryRunner{
channelName: path,
publish: s.Publisher,
running: make(map[string]bool),
service: s.Service,
}, nil
}
// GetChannelOptions gets channel options.
// It's called fast and often.
func (r *logQueryRunner) GetChannelOptions(id string) centrifuge.ChannelOptions {
return centrifuge.ChannelOptions{}
}
// OnSubscribe publishes results from the corresponding CloudWatch Logs query to the provided channel
func (r *logQueryRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) error {
r.runningMu.Lock()
defer r.runningMu.Unlock()
if _, ok := r.running[e.Channel]; ok {
return nil
}
r.running[e.Channel] = true
go func() {
if err := r.publishResults(e.Channel); err != nil {
plog.Error(err.Error())
}
}()
return nil
}
// OnPublish is called when an event is received from the websocket.
func (r *logQueryRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) ([]byte, error) {
return nil, fmt.Errorf("can not publish")
}
func (r *logQueryRunner) publishResults(channelName string) error {
defer func() {
r.service.DeleteResponseChannel(channelName)
r.runningMu.Lock()
delete(r.running, channelName)
r.runningMu.Unlock()
}()
responseChannel, err := r.service.GetResponseChannel(channelName)
if err != nil {
return err
}
for response := range responseChannel {
responseBytes, err := json.Marshal(response)
if err != nil {
return err
}
if err := r.publish(channelName, responseBytes); err != nil {
return err
}
}
return nil
}
// executeLiveLogQuery executes a CloudWatch Logs query with live updates over WebSocket.
// A WebSocket channel is created, which goroutines send responses over.
func (e *cloudWatchExecutor) executeLiveLogQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
responseChannelName := uuid.New().String()
responseChannel := make(chan *tsdb.Response)
if err := e.logsService.AddResponseChannel("plugin/cloudwatch/"+responseChannelName, responseChannel); err != nil {
close(responseChannel)
return nil, err
}
go e.sendLiveQueriesToChannel(queryContext, responseChannel)
response := &tsdb.Response{
Results: map[string]*tsdb.QueryResult{
"A": {
RefId: "A",
Meta: simplejson.NewFromAny(map[string]interface{}{
"channelName": responseChannelName,
}),
},
},
}
return response, nil
}
func (e *cloudWatchExecutor) sendLiveQueriesToChannel(queryContext *tsdb.TsdbQuery, responseChannel chan *tsdb.Response) {
defer close(responseChannel)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
defer cancel()
eg, ectx := errgroup.WithContext(ctx)
for _, query := range queryContext.Queries {
query := query
eg.Go(func() error {
return e.startLiveQuery(ectx, responseChannel, query, queryContext.TimeRange)
})
}
if err := eg.Wait(); err != nil {
plog.Error(err.Error())
}
}
func (e *cloudWatchExecutor) getQueue(queueKey string) (chan bool, error) {
e.logsService.queueLock.Lock()
defer e.logsService.queueLock.Unlock()
if queue, ok := e.logsService.queues[queueKey]; ok {
return queue, nil
}
concurrentQueriesQuota := e.fetchConcurrentQueriesQuota(queueKey)
queueChannel := make(chan bool, concurrentQueriesQuota)
e.logsService.queues[queueKey] = queueChannel
return queueChannel, nil
}
func (e *cloudWatchExecutor) fetchConcurrentQueriesQuota(region string) int {
sess, err := e.newSession(region)
if err != nil {
plog.Warn("Could not get service quota client")
return defaultConcurrentQueries
}
client := newQuotasClient(sess)
concurrentQueriesQuota, err := client.GetServiceQuota(&servicequotas.GetServiceQuotaInput{
ServiceCode: aws.String("logs"),
QuotaCode: aws.String("L-32C48FBB"),
})
if err != nil {
plog.Warn("Could not get service quota")
return defaultConcurrentQueries
}
if concurrentQueriesQuota != nil && concurrentQueriesQuota.Quota != nil && concurrentQueriesQuota.Quota.Value != nil {
return int(*concurrentQueriesQuota.Quota.Value)
}
plog.Warn("Could not get service quota")
defaultConcurrentQueriesQuota, err := client.GetAWSDefaultServiceQuota(&servicequotas.GetAWSDefaultServiceQuotaInput{
ServiceCode: aws.String("logs"),
QuotaCode: aws.String("L-32C48FBB"),
})
if err != nil {
plog.Warn("Could not get default service quota")
return defaultConcurrentQueries
}
if defaultConcurrentQueriesQuota != nil && defaultConcurrentQueriesQuota.Quota != nil && defaultConcurrentQueriesQuota.Quota.Value != nil {
return int(*defaultConcurrentQueriesQuota.Quota.Value)
}
plog.Warn("Could not get default service quota")
return defaultConcurrentQueries
}
func (e *cloudWatchExecutor) startLiveQuery(ctx context.Context, responseChannel chan *tsdb.Response, query *tsdb.Query, timeRange *tsdb.TimeRange) error {
defaultRegion := e.DataSource.JsonData.Get("defaultRegion").MustString()
parameters := query.Model
region := parameters.Get("region").MustString(defaultRegion)
logsClient, err := e.getCWLogsClient(region)
if err != nil {
return err
}
queue, err := e.getQueue(fmt.Sprintf("%s-%d", region, e.DataSource.Id))
if err != nil {
return err
}
// Wait until there are no more active workers than the concurrent queries quota
queue <- true
defer func() { <-queue }()
startQueryOutput, err := e.executeStartQuery(ctx, logsClient, parameters, timeRange)
if err != nil {
return err
}
queryResultsInput := &cloudwatchlogs.GetQueryResultsInput{
QueryId: startQueryOutput.QueryId,
}
recordsMatched := 0.0
return retryer.Retry(func() (retryer.RetrySignal, error) {
getQueryResultsOutput, err := logsClient.GetQueryResultsWithContext(ctx, queryResultsInput)
if err != nil {
return retryer.FuncError, err
}
retryNeeded := *getQueryResultsOutput.Statistics.RecordsMatched <= recordsMatched
recordsMatched = *getQueryResultsOutput.Statistics.RecordsMatched
dataFrame, err := logsResultsToDataframes(getQueryResultsOutput)
if err != nil {
return retryer.FuncError, err
}
dataFrame.Name = query.RefId
dataFrame.RefID = query.RefId
var dataFrames data.Frames
// When a query of the form "stats ... by ..." is made, we want to return
// one series per group defined in the query, but due to the format
// the query response is in, there does not seem to be a way to tell
// by the response alone if/how the results should be grouped.
// Because of this, if the frontend sees that a "stats ... by ..." query is being made
// the "statsGroups" parameter is sent along with the query to the backend so that we
// can correctly group the CloudWatch logs response.
statsGroups := parameters.Get("statsGroups").MustStringArray()
if len(statsGroups) > 0 && len(dataFrame.Fields) > 0 {
groupedFrames, err := groupResults(dataFrame, statsGroups)
if err != nil {
return retryer.FuncError, err
}
dataFrames = groupedFrames
} else {
if dataFrame.Meta != nil {
dataFrame.Meta.PreferredVisualization = "logs"
} else {
dataFrame.Meta = &data.FrameMeta{
PreferredVisualization: "logs",
}
}
dataFrames = data.Frames{dataFrame}
}
responseChannel <- &tsdb.Response{
Results: map[string]*tsdb.QueryResult{
query.RefId: {
RefId: query.RefId,
Dataframes: tsdb.NewDecodedDataFrames(dataFrames),
},
},
}
if isTerminated(*getQueryResultsOutput.Status) {
return retryer.FuncComplete, nil
} else if retryNeeded {
return retryer.FuncFailure, nil
}
return retryer.FuncSuccess, nil
}, maxAttempts, minRetryDelay, maxRetryDelay)
}
// Service quotas client factory.
//
// Stubbable by tests.
var newQuotasClient = func(sess *session.Session) servicequotasiface.ServiceQuotasAPI {
client := servicequotas.New(sess)
client.Handlers.Send.PushFront(func(r *request.Request) {
r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion))
})
return client
}

View File

@ -47,7 +47,7 @@ func TestQuery_DescribeLogGroups(t *testing.T) {
},
}
executor := newExecutor()
executor := newExecutor(nil)
resp, err := executor.Query(context.Background(), fakeDataSource(), &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{
@ -100,7 +100,7 @@ func TestQuery_DescribeLogGroups(t *testing.T) {
},
}
executor := newExecutor()
executor := newExecutor(nil)
resp, err := executor.Query(context.Background(), fakeDataSource(), &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{
@ -170,7 +170,7 @@ func TestQuery_GetLogGroupFields(t *testing.T) {
const refID = "A"
executor := newExecutor()
executor := newExecutor(nil)
resp, err := executor.Query(context.Background(), fakeDataSource(), &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{
@ -249,7 +249,7 @@ func TestQuery_StartQuery(t *testing.T) {
To: "1584700643000",
}
executor := newExecutor()
executor := newExecutor(nil)
_, err := executor.Query(context.Background(), fakeDataSource(), &tsdb.TsdbQuery{
TimeRange: timeRange,
Queries: []*tsdb.Query{
@ -295,7 +295,7 @@ func TestQuery_StartQuery(t *testing.T) {
To: "1584873443000",
}
executor := newExecutor()
executor := newExecutor(nil)
resp, err := executor.Query(context.Background(), fakeDataSource(), &tsdb.TsdbQuery{
TimeRange: timeRange,
Queries: []*tsdb.Query{
@ -371,7 +371,7 @@ func TestQuery_StopQuery(t *testing.T) {
To: "1584700643000",
}
executor := newExecutor()
executor := newExecutor(nil)
resp, err := executor.Query(context.Background(), fakeDataSource(), &tsdb.TsdbQuery{
TimeRange: timeRange,
Queries: []*tsdb.Query{
@ -458,7 +458,7 @@ func TestQuery_GetQueryResults(t *testing.T) {
},
}
executor := newExecutor()
executor := newExecutor(nil)
resp, err := executor.Query(context.Background(), fakeDataSource(), &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{

View File

@ -0,0 +1,63 @@
package cloudwatch
import (
"fmt"
"sync"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/tsdb"
)
func init() {
registry.RegisterService(&LogsService{})
}
// LogsService provides methods for querying CloudWatch Logs.
type LogsService struct {
channelMu sync.Mutex
responseChannels map[string]chan *tsdb.Response
queues map[string](chan bool)
queueLock sync.Mutex
}
// Init is called by the DI framework to initialize the instance.
func (s *LogsService) Init() error {
s.responseChannels = make(map[string]chan *tsdb.Response)
s.queues = make(map[string](chan bool))
return nil
}
func (s *LogsService) AddResponseChannel(name string, channel chan *tsdb.Response) error {
s.channelMu.Lock()
defer s.channelMu.Unlock()
if _, ok := s.responseChannels[name]; ok {
return fmt.Errorf("channel with name '%s' already exists", name)
}
s.responseChannels[name] = channel
return nil
}
func (s *LogsService) GetResponseChannel(name string) (chan *tsdb.Response, error) {
s.channelMu.Lock()
defer s.channelMu.Unlock()
if responseChannel, ok := s.responseChannels[name]; ok {
return responseChannel, nil
}
return nil, fmt.Errorf("channel with name '%s' not found", name)
}
func (s *LogsService) DeleteResponseChannel(name string) {
s.channelMu.Lock()
defer s.channelMu.Unlock()
if _, ok := s.responseChannels[name]; ok {
delete(s.responseChannels, name)
return
}
plog.Warn("Channel with name '" + name + "' not found")
}

View File

@ -44,7 +44,7 @@ func TestQuery_Metrics(t *testing.T) {
},
},
}
executor := newExecutor()
executor := newExecutor(nil)
resp, err := executor.Query(context.Background(), fakeDataSource(), &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{
@ -101,7 +101,7 @@ func TestQuery_Metrics(t *testing.T) {
},
},
}
executor := newExecutor()
executor := newExecutor(nil)
resp, err := executor.Query(context.Background(), fakeDataSource(), &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{
@ -163,7 +163,7 @@ func TestQuery_Regions(t *testing.T) {
cli = fakeEC2Client{
regions: []string{regionName},
}
executor := newExecutor()
executor := newExecutor(nil)
resp, err := executor.Query(context.Background(), fakeDataSource(), &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{
@ -245,7 +245,7 @@ func TestQuery_InstanceAttributes(t *testing.T) {
},
},
}
executor := newExecutor()
executor := newExecutor(nil)
resp, err := executor.Query(context.Background(), fakeDataSource(), &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{
@ -348,7 +348,7 @@ func TestQuery_EBSVolumeIDs(t *testing.T) {
},
},
}
executor := newExecutor()
executor := newExecutor(nil)
resp, err := executor.Query(context.Background(), fakeDataSource(), &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{
@ -448,7 +448,7 @@ func TestQuery_ResourceARNs(t *testing.T) {
},
},
}
executor := newExecutor()
executor := newExecutor(nil)
resp, err := executor.Query(context.Background(), fakeDataSource(), &tsdb.TsdbQuery{
Queries: []*tsdb.Query{
{

View File

@ -9,7 +9,7 @@ import (
)
func TestQueryTransformer(t *testing.T) {
executor := newExecutor()
executor := newExecutor(nil)
t.Run("One cloudwatchQuery is generated when its request query has one stat", func(t *testing.T) {
requestQueries := []*requestQuery{
{

View File

@ -57,7 +57,7 @@ func TestNewSession_AssumeRole(t *testing.T) {
const roleARN = "test"
e := newExecutor()
e := newExecutor(nil)
e.DataSource = fakeDataSource(fakeDataSourceCfg{
assumeRoleARN: roleARN,
})
@ -84,7 +84,7 @@ func TestNewSession_AssumeRole(t *testing.T) {
const roleARN = "test"
const externalID = "external"
e := newExecutor()
e := newExecutor(nil)
e.DataSource = fakeDataSource(fakeDataSourceCfg{
assumeRoleARN: roleARN,
externalID: externalID,

View File

@ -9,7 +9,7 @@ import (
)
func TestTimeSeriesQuery(t *testing.T) {
executor := newExecutor()
executor := newExecutor(nil)
t.Run("End time before start time should result in error", func(t *testing.T) {
_, err := executor.executeTimeSeriesQuery(context.TODO(), &tsdb.TsdbQuery{TimeRange: tsdb.NewTimeRange("now-1h", "now-2h")})

View File

@ -0,0 +1,56 @@
package retryer
import (
"time"
)
type RetrySignal = int
const (
FuncSuccess RetrySignal = iota
FuncFailure
FuncComplete
FuncError
)
// Retry retries the provided function using exponential backoff, starting with `minDelay` between attempts, and increasing to
// `maxDelay` after each failure. Stops when the provided function returns `FuncComplete`, or `maxRetries` is reached.
//
// A non-nil error from `body` aborts immediately and is returned to the caller.
// Reaching `maxRetries` returns nil (the caller cannot distinguish exhaustion
// from completion — existing behavior, kept for compatibility).
func Retry(body func() (RetrySignal, error), maxRetries int, minDelay time.Duration, maxDelay time.Duration) error {
	currentDelay := minDelay
	ticker := time.NewTicker(currentDelay)
	defer ticker.Stop()
	retries := 0
	for range ticker.C {
		response, err := body()
		if err != nil {
			return err
		}
		switch response {
		case FuncSuccess:
			// Progress was made: restart the backoff schedule and retry budget.
			currentDelay = minDelay
			ticker.Reset(currentDelay)
			retries = 0
		case FuncComplete:
			return nil
		default:
			// FuncFailure — and also FuncError or any unknown signal returned
			// with a nil error. Previously those fell through the switch without
			// touching `retries`, so the loop could spin forever; treating them
			// as failures keeps every path bounded by maxRetries.
			currentDelay = minDuration(currentDelay*2, maxDelay)
			ticker.Reset(currentDelay)
			retries++
		}
		if retries >= maxRetries {
			return nil
		}
	}
	return nil
}
func minDuration(a time.Duration, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}

View File

@ -0,0 +1,22 @@
package retryer
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestMaxRetries(t *testing.T) {
retryVal := 0
err := Retry(func() (RetrySignal, error) {
retryVal++
return FuncFailure, nil
}, 8, 100*time.Millisecond, 100*time.Millisecond)
if err != nil {
assert.FailNow(t, "Error while retrying function")
}
assert.Equal(t, 8, retryVal)
}

View File

@ -33,7 +33,7 @@ const setup = () => {
templateSrv.init([variable]);
const datasource = new CloudWatchDatasource(instanceSettings, templateSrv as any, {} as any);
datasource.metricFindQuery = async () => [{ value: 'test', label: 'test' }];
datasource.metricFindQuery = async () => [{ value: 'test', label: 'test', text: 'test' }];
const props: Props = {
query: {

View File

@ -18,8 +18,11 @@ import {
TimeRange,
rangeUtil,
DataQueryErrorType,
LiveChannelScope,
LiveChannelEvent,
LiveChannelMessageEvent,
} from '@grafana/data';
import { getBackendSrv, toDataQueryResponse } from '@grafana/runtime';
import { getBackendSrv, getGrafanaLiveSrv, toDataQueryResponse } from '@grafana/runtime';
import { getTemplateSrv, TemplateSrv } from 'app/features/templating/template_srv';
import { getTimeSrv, TimeSrv } from 'app/features/dashboard/services/TimeSrv';
import { ThrottlingErrorMessage } from './components/ThrottlingErrorMessage';
@ -41,13 +44,26 @@ import {
isCloudWatchLogsQuery,
} from './types';
import { from, Observable, of, merge, zip } from 'rxjs';
import { catchError, finalize, map, mergeMap, tap, concatMap, scan, share, repeat, takeWhile } from 'rxjs/operators';
import {
catchError,
finalize,
map,
mergeMap,
tap,
concatMap,
scan,
share,
repeat,
takeWhile,
filter,
} from 'rxjs/operators';
import { CloudWatchLanguageProvider } from './language_provider';
import { VariableWithMultiSupport } from 'app/features/variables/types';
import { RowContextOptions } from '@grafana/ui/src/components/Logs/LogRowContextProvider';
import { AwsUrl, encodeUrl } from './aws_url';
import { increasingInterval } from './utils/rxjs/increasingInterval';
import config from 'app/core/config';
const TSDB_QUERY_ENDPOINT = '/api/tsdb/query';
@ -72,30 +88,32 @@ const displayCustomError = (title: string, message: string) =>
export const MAX_ATTEMPTS = 5;
export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWatchJsonData> {
type: any;
proxyUrl: any;
defaultRegion: any;
standardStatistics: any;
datasourceName: string;
debouncedAlert: (datasourceName: string, region: string) => void;
debouncedCustomAlert: (title: string, message: string) => void;
logQueries: Record<string, { id: string; region: string; statsQuery: boolean }>;
languageProvider: CloudWatchLanguageProvider;
type = 'cloudwatch';
standardStatistics = ['Average', 'Maximum', 'Minimum', 'Sum', 'SampleCount'];
debouncedAlert: (datasourceName: string, region: string) => void = memoizedDebounce(
displayAlert,
AppNotificationTimeout.Error
);
debouncedCustomAlert: (title: string, message: string) => void = memoizedDebounce(
displayCustomError,
AppNotificationTimeout.Error
);
logQueries: Record<string, { id: string; region: string; statsQuery: boolean }> = {};
constructor(
instanceSettings: DataSourceInstanceSettings<CloudWatchJsonData>,
private readonly templateSrv: TemplateSrv = getTemplateSrv(),
private readonly timeSrv: TimeSrv = getTimeSrv()
) {
super(instanceSettings);
this.type = 'cloudwatch';
this.proxyUrl = instanceSettings.url;
this.defaultRegion = instanceSettings.jsonData.defaultRegion;
this.datasourceName = instanceSettings.name;
this.standardStatistics = ['Average', 'Maximum', 'Minimum', 'Sum', 'SampleCount'];
this.debouncedAlert = memoizedDebounce(displayAlert, AppNotificationTimeout.Error);
this.debouncedCustomAlert = memoizedDebounce(displayCustomError, AppNotificationTimeout.Error);
this.logQueries = {};
this.languageProvider = new CloudWatchLanguageProvider(this);
}
@ -108,7 +126,11 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
const dataQueryResponses: Array<Observable<DataQueryResponse>> = [];
if (logQueries.length > 0) {
dataQueryResponses.push(this.handleLogQueries(logQueries, options));
if (config.featureToggles.live) {
dataQueryResponses.push(this.handleLiveLogQueries(logQueries, options));
} else {
dataQueryResponses.push(this.handleLogQueries(logQueries, options));
}
}
if (metricsQueries.length > 0) {
@ -126,6 +148,75 @@ export class CloudWatchDatasource extends DataSourceApi<CloudWatchQuery, CloudWa
return merge(...dataQueryResponses);
}
/**
 * Runs CloudWatch Logs queries over the Grafana Live websocket channel that the
 * backend allocates for the request. The initial HTTP call starts the query and
 * returns the channel name; incremental results then stream in as channel
 * messages until every returned frame reports a terminal status.
 */
handleLiveLogQueries = (
  logQueries: CloudWatchLogsQuery[],
  options: DataQueryRequest<CloudWatchQuery>
): Observable<DataQueryResponse> => {
  // Every log query must name at least one log group; reject the whole request otherwise.
  const validLogQueries = logQueries.filter(item => item.logGroupNames?.length);
  if (logQueries.length > validLogQueries.length) {
    return of({ data: [], error: { message: 'Log group is required' } });
  }

  // No valid targets, return the empty result to save a round trip.
  if (_.isEmpty(validLogQueries)) {
    return of({ data: [], state: LoadingState.Done });
  }

  const queryParams = validLogQueries.map((target: CloudWatchLogsQuery) => ({
    intervalMs: 1, // dummy
    maxDataPoints: 1, // dummy
    datasourceId: this.id,
    queryString: this.replace(target.expression, options.scopedVars, true),
    refId: target.refId,
    logGroupNames: target.logGroupNames?.map(logGroup =>
      this.replace(logGroup, options.scopedVars, true, 'log groups')
    ),
    statsGroups: target.statsGroups,
    region: this.getActualRegion(this.replace(target.region, options.scopedVars, true, 'region')),
    type: 'liveLogAction',
  }));

  const range = this.timeSrv.timeRange();

  const requestParams = {
    from: range.from.valueOf().toString(),
    to: range.to.valueOf().toString(),
    queries: queryParams,
  };

  return from(this.awsRequest(TSDB_QUERY_ENDPOINT, requestParams)).pipe(
    mergeMap((response: TSDBResponse) => {
      // Take the first result entry instead of hard-coding refId 'A' — the
      // first query in the request is not guaranteed to use that refId. This
      // matches how the message handler below looks up results.
      const channelName: string = response.results[Object.keys(response.results)[0]].meta.channelName;
      const channel = getGrafanaLiveSrv().getChannel({
        scope: LiveChannelScope.Plugin,
        namespace: 'cloudwatch',
        path: channelName,
      });
      return channel.getStream();
    }),
    filter((e: LiveChannelEvent<any>) => e.type === 'message'),
    map(({ message }: LiveChannelMessageEvent<TSDBResponse>) => {
      const dataQueryResponse = toDataQueryResponse({
        data: message,
      });
      // The stream is finished only once every frame reports a terminal status.
      dataQueryResponse.state = dataQueryResponse.data.every(dataFrame =>
        statusIsTerminated(dataFrame.meta?.custom?.['Status'])
      )
        ? LoadingState.Done
        : LoadingState.Loading;
      dataQueryResponse.key = message.results[Object.keys(message.results)[0]].refId;
      return this.addDataLinksToLogsResponse(dataQueryResponse, options);
    }),
    catchError(err => {
      // Surface the backend's error payload when present; rethrow otherwise.
      if (err.data?.error) {
        throw err.data.error;
      }
      throw err;
    })
  );
};
handleLogQueries = (
logQueries: CloudWatchLogsQuery[],
options: DataQueryRequest<CloudWatchQuery>
@ -1021,3 +1112,12 @@ function parseLogGroupName(logIdentifier: string): string {
const colonIndex = logIdentifier.lastIndexOf(':');
return logIdentifier.substr(colonIndex + 1);
}
/** Returns true when the given query status represents a finished (non-running) state. */
function statusIsTerminated(status: string | CloudWatchLogsQueryStatus) {
  const terminalStates = [
    CloudWatchLogsQueryStatus.Complete,
    CloudWatchLogsQueryStatus.Cancelled,
    CloudWatchLogsQueryStatus.Failed,
    CloudWatchLogsQueryStatus.Timeout,
  ];
  return terminalStates.includes(status as CloudWatchLogsQueryStatus);
}

View File

@ -16,4 +16,10 @@ export const plugin = new DataSourcePlugin<CloudWatchDatasource, CloudWatchQuery
.setQueryEditor(PanelQueryEditor)
.setExploreMetricsQueryField(PanelQueryEditor)
.setExploreLogsQueryField(CloudWatchLogsQueryEditor)
.setAnnotationQueryCtrl(CloudWatchAnnotationsQueryCtrl);
.setAnnotationQueryCtrl(CloudWatchAnnotationsQueryCtrl)
.setChannelSupport({
getChannelConfig: (path: string) => ({
path,
}),
getSupportedPaths: () => [],
});

View File

@ -30,6 +30,7 @@ export enum CloudWatchLogsQueryStatus {
Complete = 'Complete',
Failed = 'Failed',
Cancelled = 'Cancelled',
Timeout = 'Timeout',
}
export interface CloudWatchLogsQuery extends DataQuery {