grafana/pkg/services/live/managedstream/runner.go

package managedstream

import (
	"context"
	"encoding/json"
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/grafana/grafana-plugin-sdk-go/backend"
	"github.com/grafana/grafana-plugin-sdk-go/data"
	"github.com/grafana/grafana-plugin-sdk-go/live"

	"github.com/grafana/grafana/pkg/infra/log"
	"github.com/grafana/grafana/pkg/services/auth/identity"
	"github.com/grafana/grafana/pkg/services/live/model"
	"github.com/grafana/grafana/pkg/services/live/orgchannel"
)

var (
	logger = log.New("live.managed_stream")
)

// If a message comes from a plugin:
// * it is simply sent to local subscribers without any additional steps
// * if a rule exists for the channel, the message may be processed further
// * it is important to keep the message in its original channel
// * clients subscribe to ds/<UID>/xxx
//
// What we want to build:
// * the stream scope is not hardcoded, it is determined by the caller
// * this makes it possible to use managed streams from plugins
// * the open problem is HA: at the moment several plugins on different nodes may publish the same messages
// * an in-memory managed stream can be used for plugin publishing to local subscribers, while the HA-managed stream is used for HTTP/WS
// * eventually we could maintain a single connection to a plugin via channel leader selection.
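//
// A minimal usage sketch (illustrative only: orgID, ctx, "telemetry" and "cpu"
// are assumed to exist in the caller, live.ScopeStream comes from the plugin
// SDK, and error handling is elided):
//
//	stream, _ := runner.GetOrCreateStream(orgID, live.ScopeStream, "telemetry")
//	frame := data.NewFrame("cpu",
//		data.NewField("time", nil, []time.Time{time.Now()}),
//		data.NewField("value", nil, []float64{3.14}))
//	_ = stream.Push(ctx, "cpu", frame)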

// Runner keeps a NamespaceStream per namespace for each org.
type Runner struct {
	mu             sync.RWMutex
	streams        map[int64]map[string]*NamespaceStream
	publisher      model.ChannelPublisher
	localPublisher LocalPublisher
	frameCache     FrameCache
}
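
// LocalPublisher publishes a payload to subscribers connected to this Grafana
// instance only, without HA fan-out to other nodes.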
type LocalPublisher interface {
	PublishLocal(channel string, data []byte) error
}

// NewRunner creates a new Runner.
func NewRunner(publisher model.ChannelPublisher, localPublisher LocalPublisher, frameCache FrameCache) *Runner {
	return &Runner{
		publisher:      publisher,
		localPublisher: localPublisher,
		streams:        map[int64]map[string]*NamespaceStream{},
		frameCache:     frameCache,
	}
}
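
// GetManagedChannels returns the currently active managed stream channels for
// the given org, enriched with their per-minute publish rate, plus a few
// hardcoded testdata sample streams.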
func (r *Runner) GetManagedChannels(orgID int64) ([]*ManagedChannel, error) {
	activeChannels, err := r.frameCache.GetActiveChannels(orgID)
	if err != nil {
		return []*ManagedChannel{}, fmt.Errorf("error getting active managed stream paths: %v", err)
	}
	channels := make([]*ManagedChannel, 0, len(activeChannels))
	for ch, schema := range activeChannels {
		managedChannel := &ManagedChannel{
			Channel: ch,
			Data:    schema,
		}
		// Enrich with minute rate.
		channel, _ := live.ParseChannel(managedChannel.Channel)
		prefix := channel.Scope + "/" + channel.Namespace
		// Guard the streams map against concurrent writes in GetOrCreateStream.
		r.mu.RLock()
		namespaceStream, ok := r.streams[orgID][prefix]
		r.mu.RUnlock()
		if ok {
			managedChannel.MinuteRate = namespaceStream.minuteRate(channel.Path)
		}
		channels = append(channels, managedChannel)
	}

	// Hardcode sample streams exposed by the testdata plugin.
	frameJSON, err := data.FrameToJSON(data.NewFrame("testdata",
		data.NewField("Time", nil, make([]time.Time, 0)),
		data.NewField("Value", nil, make([]float64, 0)),
		data.NewField("Min", nil, make([]float64, 0)),
		data.NewField("Max", nil, make([]float64, 0)),
	), data.IncludeSchemaOnly)
	if err == nil {
		channels = append(channels, &ManagedChannel{
			Channel:    "plugin/testdata/random-2s-stream",
			Data:       frameJSON,
			MinuteRate: 30,
		}, &ManagedChannel{
			Channel:    "plugin/testdata/random-flakey-stream",
			Data:       frameJSON,
			MinuteRate: 150,
		}, &ManagedChannel{
			Channel:    "plugin/testdata/random-labeled-stream",
			Data:       frameJSON,
			MinuteRate: 250,
		}, &ManagedChannel{
			Channel:    "plugin/testdata/random-20Hz-stream",
			Data:       frameJSON,
			MinuteRate: 1200,
		})
	}

	sort.Slice(channels, func(i, j int) bool {
		return channels[i].Channel < channels[j].Channel
	})
	return channels, nil
}

// GetOrCreateStream -- for now this will create a new manager for each key.
// Eventually, the stream behavior will need to be configured explicitly.
func (r *Runner) GetOrCreateStream(orgID int64, scope string, namespace string) (*NamespaceStream, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	_, ok := r.streams[orgID]
	if !ok {
		r.streams[orgID] = map[string]*NamespaceStream{}
	}
	prefix := scope + "/" + namespace
	s, ok := r.streams[orgID][prefix]
	if !ok {
		s = NewNamespaceStream(orgID, scope, namespace, r.publisher, r.localPublisher, r.frameCache)
		r.streams[orgID][prefix] = s
	}
	return s, nil
}

// NamespaceStream holds the state of a managed stream.
type NamespaceStream struct {
	orgID          int64
	scope          string
	namespace      string
	publisher      model.ChannelPublisher
	localPublisher LocalPublisher
	frameCache     FrameCache
	rateMu         sync.RWMutex
	rates          map[string][60]rateEntry
}
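
// rateEntry is one one-second bucket in the per-path ring buffer used to
// compute a rolling per-minute publish rate.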
type rateEntry struct {
	time  uint32
	count int32
}

// ManagedChannel represents one channel of a managed stream.
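// Serialized, it looks roughly like this (values are illustrative):
//
//	{"channel": "plugin/testdata/random-2s-stream", "minute_rate": 30, "data": {"schema": {...}}}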
type ManagedChannel struct {
	Channel    string          `json:"channel"`
	MinuteRate int64           `json:"minute_rate"`
	Data       json.RawMessage `json:"data"`
}

// NewNamespaceStream creates a new NamespaceStream.
func NewNamespaceStream(orgID int64, scope string, namespace string, publisher model.ChannelPublisher, localPublisher LocalPublisher, schemaUpdater FrameCache) *NamespaceStream {
	return &NamespaceStream{
		orgID:          orgID,
		scope:          scope,
		namespace:      namespace,
		publisher:      publisher,
		localPublisher: localPublisher,
		frameCache:     schemaUpdater,
		rates:          map[string][60]rateEntry{},
	}
}

// Push sends a frame to the stream and saves it for later retrieval by subscribers.
// * Saves the entire frame to the cache.
// * If the schema has changed, sends the entire frame to the channel; otherwise sends only the data.
func (s *NamespaceStream) Push(ctx context.Context, path string, frame *data.Frame) error {
	jsonFrameCache, err := data.FrameToJSONCache(frame)
	if err != nil {
		return err
	}

	// The channel this will be posted into.
	channel := live.Channel{Scope: s.scope, Namespace: s.namespace, Path: path}.String()

	isUpdated, err := s.frameCache.Update(ctx, s.orgID, channel, jsonFrameCache)
	if err != nil {
		logger.Error("Error updating managed stream schema", "error", err)
		return err
	}

	// When the schema has not changed, just send the data.
	include := data.IncludeDataOnly
	if isUpdated {
		// When the schema has changed, send everything.
		include = data.IncludeAll
	}
	frameJSON := jsonFrameCache.Bytes(include)

	logger.Debug("Publish data to channel", "channel", channel, "dataLength", len(frameJSON))
	s.incRate(path, time.Now().Unix())

	if s.scope == live.ScopeDatasource || s.scope == live.ScopePlugin {
		return s.localPublisher.PublishLocal(orgchannel.PrependOrgID(s.orgID, channel), frameJSON)
	}
	return s.publisher(s.orgID, channel, frameJSON)
}
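
// incRate records one publish for the given path in the one-second slot that
// corresponds to nowUnix, resetting the slot if it belonged to an older second.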
func (s *NamespaceStream) incRate(path string, nowUnix int64) {
	s.rateMu.Lock()
	pathRate, ok := s.rates[path]
	if !ok {
		pathRate = [60]rateEntry{}
	}
	now := time.Unix(nowUnix, 0)
	slot := now.Second() % 60
	if pathRate[slot].time != uint32(nowUnix) {
		pathRate[slot].count = 0
	}
	pathRate[slot].time = uint32(nowUnix)
	pathRate[slot].count += 1
	s.rates[path] = pathRate
	s.rateMu.Unlock()
}
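
// minuteRate returns the number of frames pushed to path during the last 60 seconds.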
func (s *NamespaceStream) minuteRate(path string) int64 {
	var total int64
	s.rateMu.RLock()
	defer s.rateMu.RUnlock()
	pathRate, ok := s.rates[path]
	if !ok {
		return 0
	}
	for _, val := range pathRate {
		if val.time > uint32(time.Now().Unix()-60) {
			total += int64(val.count)
		}
	}
	return total
}
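
// GetHandlerForPath returns this NamespaceStream as the channel handler for any
// path inside the namespace.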
func (s *NamespaceStream) GetHandlerForPath(_ string) (model.ChannelHandler, error) {
	return s, nil
}
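
// OnSubscribe returns the last cached frame for the channel (when available) so
// that new subscribers immediately receive the current data.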
func (s *NamespaceStream) OnSubscribe(ctx context.Context, u identity.Requester, e model.SubscribeEvent) (model.SubscribeReply, backend.SubscribeStreamStatus, error) {
	reply := model.SubscribeReply{}
	frameJSON, ok, err := s.frameCache.GetFrame(ctx, u.GetOrgID(), e.Channel)
	if err != nil {
		return reply, 0, err
	}
	if ok {
		reply.Data = frameJSON
	}
	return reply, backend.SubscribeStreamStatusOK, nil
}
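
// OnPublish denies direct client publishing into a managed stream; data must be
// pushed through Push instead.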
func (s *NamespaceStream) OnPublish(_ context.Context, _ identity.Requester, _ model.PublishEvent) (model.PublishReply, backend.PublishStreamStatus, error) {
	return model.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil
}