// grafana/pkg/tsdb/cloudwatch/live.go

package cloudwatch

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
	"github.com/aws/aws-sdk-go/service/servicequotas"
	"github.com/aws/aws-sdk-go/service/servicequotas/servicequotasiface"
	"github.com/centrifugal/centrifuge"
	"github.com/google/uuid"
	"github.com/grafana/grafana-plugin-sdk-go/data"
	"github.com/grafana/grafana/pkg/components/simplejson"
	"github.com/grafana/grafana/pkg/models"
	"github.com/grafana/grafana/pkg/setting"
	"github.com/grafana/grafana/pkg/tsdb"
	"github.com/grafana/grafana/pkg/util/retryer"
	"golang.org/x/sync/errgroup"
)
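
// defaultConcurrentQueries is the fallback concurrency limit used when the
// CloudWatch Logs concurrent queries quota cannot be fetched from Service Quotas.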
const defaultConcurrentQueries = 4

type LogQueryRunnerSupplier struct {
	Publisher models.ChannelPublisher
	Service   *LogsService
}

type logQueryRunner struct {
	channelName string
	publish     models.ChannelPublisher
	running     map[string]bool
	runningMu   sync.Mutex
	service     *LogsService
}
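
// Retry settings used when polling GetQueryResults until a logs query terminates.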
const (
	maxAttempts   = 8
	minRetryDelay = 500 * time.Millisecond
	maxRetryDelay = 30 * time.Second
)

// GetHandlerForPath gets the channel handler for a certain path.
func (s *LogQueryRunnerSupplier) GetHandlerForPath(path string) (models.ChannelHandler, error) {
	return &logQueryRunner{
		channelName: path,
		publish:     s.Publisher,
		running:     make(map[string]bool),
		service:     s.Service,
	}, nil
}

// OnSubscribe handles a subscription to a logs query channel and, if no worker is already
// running for that channel, starts publishing results from the corresponding CloudWatch Logs
// query to it.
func (r *logQueryRunner) OnSubscribe(c *centrifuge.Client, e centrifuge.SubscribeEvent) (centrifuge.SubscribeReply, error) {
	r.runningMu.Lock()
	defer r.runningMu.Unlock()

	if _, ok := r.running[e.Channel]; ok {
		return centrifuge.SubscribeReply{}, nil
	}

	r.running[e.Channel] = true
	go func() {
		if err := r.publishResults(e.Channel); err != nil {
			plog.Error(err.Error())
		}
	}()

	return centrifuge.SubscribeReply{}, nil
}

// OnPublish checks if a message from the websocket can be broadcast on this channel.
func (r *logQueryRunner) OnPublish(c *centrifuge.Client, e centrifuge.PublishEvent) (centrifuge.PublishReply, error) {
	return centrifuge.PublishReply{}, fmt.Errorf("cannot publish")
}
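
// publishResults reads responses for the given channel from the logs service and publishes
// each one to subscribers; when the response channel is closed, it cleans up the channel
// registration and the running flag.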
func (r *logQueryRunner) publishResults(channelName string) error {
	defer func() {
		r.service.DeleteResponseChannel(channelName)
		r.runningMu.Lock()
		delete(r.running, channelName)
		r.runningMu.Unlock()
	}()

	responseChannel, err := r.service.GetResponseChannel(channelName)
	if err != nil {
		return err
	}

	for response := range responseChannel {
		responseBytes, err := json.Marshal(response)
		if err != nil {
			return err
		}

		if err := r.publish(channelName, responseBytes); err != nil {
			return err
		}
	}

	return nil
}

// executeLiveLogQuery executes a CloudWatch Logs query with live updates over WebSocket.
// It registers a response channel that worker goroutines stream partial results over, and
// immediately returns the channel's name so that the frontend can subscribe to it.
func (e *cloudWatchExecutor) executeLiveLogQuery(ctx context.Context, queryContext *tsdb.TsdbQuery) (*tsdb.Response, error) {
	responseChannelName := uuid.New().String()
	responseChannel := make(chan *tsdb.Response)
	if err := e.logsService.AddResponseChannel("plugin/cloudwatch/"+responseChannelName, responseChannel); err != nil {
		close(responseChannel)
		return nil, err
	}

	go e.sendLiveQueriesToChannel(queryContext, responseChannel)

	response := &tsdb.Response{
		Results: map[string]*tsdb.QueryResult{
			"A": {
				RefId: "A",
				Meta: simplejson.NewFromAny(map[string]interface{}{
					"channelName": responseChannelName,
				}),
			},
		},
	}

	return response, nil
}
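
// sendLiveQueriesToChannel runs all queries in the request concurrently, streaming their
// results to responseChannel, and closes the channel once every query has finished or the
// 15-minute deadline has passed.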
func (e *cloudWatchExecutor) sendLiveQueriesToChannel(queryContext *tsdb.TsdbQuery, responseChannel chan *tsdb.Response) {
	defer close(responseChannel)

	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
	defer cancel()

	eg, ectx := errgroup.WithContext(ctx)
	for _, query := range queryContext.Queries {
		query := query
		eg.Go(func() error {
			return e.startLiveQuery(ectx, responseChannel, query, queryContext.TimeRange)
		})
	}

	if err := eg.Wait(); err != nil {
		plog.Error(err.Error())
	}
}
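
// getQueue returns the buffered channel used as a semaphore for the given region/data source
// key, creating it with a capacity equal to the concurrent queries quota on first use.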
func (e *cloudWatchExecutor) getQueue(queueKey string) (chan bool, error) {
	e.logsService.queueLock.Lock()
	defer e.logsService.queueLock.Unlock()

	if queue, ok := e.logsService.queues[queueKey]; ok {
		return queue, nil
	}

	concurrentQueriesQuota := e.fetchConcurrentQueriesQuota(queueKey)
	queueChannel := make(chan bool, concurrentQueriesQuota)
	e.logsService.queues[queueKey] = queueChannel

	return queueChannel, nil
}
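
// fetchConcurrentQueriesQuota looks up the account's CloudWatch Logs concurrent queries
// quota (quota code L-32C48FBB) via Service Quotas, falling back first to the AWS default
// quota and finally to defaultConcurrentQueries if no value can be retrieved.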
func (e *cloudWatchExecutor) fetchConcurrentQueriesQuota(region string) int {
	sess, err := e.newSession(region)
	if err != nil {
		plog.Warn("Could not get service quota client")
		return defaultConcurrentQueries
	}

	client := newQuotasClient(sess)
	concurrentQueriesQuota, err := client.GetServiceQuota(&servicequotas.GetServiceQuotaInput{
		ServiceCode: aws.String("logs"),
		QuotaCode:   aws.String("L-32C48FBB"),
	})
	if err != nil {
		plog.Warn("Could not get service quota")
		return defaultConcurrentQueries
	}

	if concurrentQueriesQuota != nil && concurrentQueriesQuota.Quota != nil && concurrentQueriesQuota.Quota.Value != nil {
		return int(*concurrentQueriesQuota.Quota.Value)
	}

	plog.Warn("Service quota response contained no value, falling back to the AWS default quota")
	defaultConcurrentQueriesQuota, err := client.GetAWSDefaultServiceQuota(&servicequotas.GetAWSDefaultServiceQuotaInput{
		ServiceCode: aws.String("logs"),
		QuotaCode:   aws.String("L-32C48FBB"),
	})
	if err != nil {
		plog.Warn("Could not get default service quota")
		return defaultConcurrentQueries
	}

	if defaultConcurrentQueriesQuota != nil && defaultConcurrentQueriesQuota.Quota != nil && defaultConcurrentQueriesQuota.Quota.Value != nil {
		return int(*defaultConcurrentQueriesQuota.Quota.Value)
	}

	plog.Warn("Default service quota response contained no value")
	return defaultConcurrentQueries
}
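
// startLiveQuery starts a single CloudWatch Logs Insights query and polls for its results,
// streaming each partial result set to responseChannel until the query reaches a terminal
// state or the retries are exhausted.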
func (e *cloudWatchExecutor) startLiveQuery(ctx context.Context, responseChannel chan *tsdb.Response, query *tsdb.Query, timeRange *tsdb.TimeRange) error {
	defaultRegion := e.DataSource.JsonData.Get("defaultRegion").MustString()
	parameters := query.Model
	region := parameters.Get("region").MustString(defaultRegion)
	logsClient, err := e.getCWLogsClient(region)
	if err != nil {
		return err
	}

	queue, err := e.getQueue(fmt.Sprintf("%s-%d", region, e.DataSource.Id))
	if err != nil {
		return err
	}

	// Block until the number of in-flight queries is below the concurrent queries quota.
	queue <- true
	defer func() { <-queue }()

	startQueryOutput, err := e.executeStartQuery(ctx, logsClient, parameters, timeRange)
	if err != nil {
		return err
	}

	queryResultsInput := &cloudwatchlogs.GetQueryResultsInput{
		QueryId: startQueryOutput.QueryId,
	}

	recordsMatched := 0.0
	return retryer.Retry(func() (retryer.RetrySignal, error) {
		getQueryResultsOutput, err := logsClient.GetQueryResultsWithContext(ctx, queryResultsInput)
		if err != nil {
			return retryer.FuncError, err
		}

		retryNeeded := *getQueryResultsOutput.Statistics.RecordsMatched <= recordsMatched
		recordsMatched = *getQueryResultsOutput.Statistics.RecordsMatched

		dataFrame, err := logsResultsToDataframes(getQueryResultsOutput)
		if err != nil {
			return retryer.FuncError, err
		}

		dataFrame.Name = query.RefId
		dataFrame.RefID = query.RefId

		var dataFrames data.Frames

		// When a query of the form "stats ... by ..." is made, we want to return one series per
		// group defined in the query. Due to the format the query response is in, there does not
		// seem to be a way to tell from the response alone if or how the results should be grouped.
		// Because of this, when the frontend detects a "stats ... by ..." query, it sends the
		// "statsGroups" parameter along with the query so that the backend can group the
		// CloudWatch Logs response correctly.
		statsGroups := parameters.Get("statsGroups").MustStringArray()
		if len(statsGroups) > 0 && len(dataFrame.Fields) > 0 {
			groupedFrames, err := groupResults(dataFrame, statsGroups)
			if err != nil {
				return retryer.FuncError, err
			}

			dataFrames = groupedFrames
		} else {
			if dataFrame.Meta != nil {
				dataFrame.Meta.PreferredVisualization = "logs"
			} else {
				dataFrame.Meta = &data.FrameMeta{
					PreferredVisualization: "logs",
				}
			}

			dataFrames = data.Frames{dataFrame}
		}

		responseChannel <- &tsdb.Response{
			Results: map[string]*tsdb.QueryResult{
				query.RefId: {
					RefId:      query.RefId,
					Dataframes: tsdb.NewDecodedDataFrames(dataFrames),
				},
			},
		}

		if isTerminated(*getQueryResultsOutput.Status) {
			return retryer.FuncComplete, nil
		} else if retryNeeded {
			return retryer.FuncFailure, nil
		}

		return retryer.FuncSuccess, nil
	}, maxAttempts, minRetryDelay, maxRetryDelay)
}

// Service quotas client factory.
//
// Stubbable by tests.
var newQuotasClient = func(sess *session.Session) servicequotasiface.ServiceQuotasAPI {
	client := servicequotas.New(sess)
	client.Handlers.Send.PushFront(func(r *request.Request) {
		r.HTTPRequest.Header.Set("User-Agent", fmt.Sprintf("Grafana/%s", setting.BuildVersion))
	})

	return client
}
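
// The sketch below shows one way a test could stub newQuotasClient so that
// fetchConcurrentQueriesQuota returns a fixed value without calling AWS. The stub type and
// its names are hypothetical and only illustrate the servicequotasiface seam; embedding the
// interface keeps the stub small while still satisfying ServiceQuotasAPI.
//
//	type fakeQuotasClient struct {
//		servicequotasiface.ServiceQuotasAPI
//		value float64
//	}
//
//	func (c fakeQuotasClient) GetServiceQuota(_ *servicequotas.GetServiceQuotaInput) (*servicequotas.GetServiceQuotaOutput, error) {
//		return &servicequotas.GetServiceQuotaOutput{
//			Quota: &servicequotas.ServiceQuota{Value: aws.Float64(c.value)},
//		}, nil
//	}
//
// A test would then swap the factory for the duration of the test, e.g.
// newQuotasClient = func(*session.Session) servicequotasiface.ServiceQuotasAPI { return fakeQuotasClient{value: 10} },
// and restore the original factory afterwards.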