2021-04-26 13:17:49 +03:00
|
|
|
package managedstream
|
2021-04-05 19:04:46 +03:00
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/json"
|
2021-06-24 11:07:09 +03:00
|
|
|
"fmt"
|
2021-07-30 21:05:39 +03:00
|
|
|
"sort"
|
2021-04-05 19:04:46 +03:00
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
2021-04-09 12:17:22 -07:00
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/live"
|
2021-04-26 13:17:49 +03:00
|
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
2021-04-05 19:04:46 +03:00
|
|
|
"github.com/grafana/grafana/pkg/models"
|
|
|
|
|
)
|
|
|
|
|
|
2021-04-26 13:17:49 +03:00
|
|
|
var (
|
|
|
|
|
logger = log.New("live.managed_stream")
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Runner keeps ManagedStream per streamID.
|
|
|
|
|
type Runner struct {
|
2021-06-24 11:07:09 +03:00
|
|
|
mu sync.RWMutex
|
|
|
|
|
streams map[int64]map[string]*ManagedStream
|
|
|
|
|
publisher models.ChannelPublisher
|
|
|
|
|
frameCache FrameCache
|
2021-04-05 19:04:46 +03:00
|
|
|
}
|
|
|
|
|
|
2021-04-26 13:17:49 +03:00
|
|
|
// NewRunner creates new Runner.
|
2021-06-24 11:07:09 +03:00
|
|
|
func NewRunner(publisher models.ChannelPublisher, frameCache FrameCache) *Runner {
|
2021-04-26 13:17:49 +03:00
|
|
|
return &Runner{
|
2021-06-24 11:07:09 +03:00
|
|
|
publisher: publisher,
|
|
|
|
|
streams: map[int64]map[string]*ManagedStream{},
|
|
|
|
|
frameCache: frameCache,
|
2021-04-05 19:04:46 +03:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2021-06-24 11:07:09 +03:00
|
|
|
func (r *Runner) GetManagedChannels(orgID int64) ([]*ManagedChannel, error) {
|
2021-07-30 21:05:39 +03:00
|
|
|
activeChannels, err := r.frameCache.GetActiveChannels(orgID)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return []*ManagedChannel{}, fmt.Errorf("error getting active managed stream paths: %v", err)
|
|
|
|
|
}
|
|
|
|
|
channels := make([]*ManagedChannel, 0, len(activeChannels))
|
|
|
|
|
for ch, schema := range activeChannels {
|
|
|
|
|
managedChannel := &ManagedChannel{
|
|
|
|
|
Channel: ch,
|
|
|
|
|
Data: schema,
|
2021-06-24 11:07:09 +03:00
|
|
|
}
|
2021-07-30 21:05:39 +03:00
|
|
|
// Enrich with minute rate.
|
|
|
|
|
channel, _ := live.ParseChannel(managedChannel.Channel)
|
|
|
|
|
namespaceStream, ok := r.streams[orgID][channel.Namespace]
|
|
|
|
|
if ok {
|
|
|
|
|
managedChannel.MinuteRate = namespaceStream.minuteRate(channel.Path)
|
|
|
|
|
}
|
|
|
|
|
channels = append(channels, managedChannel)
|
2021-06-24 11:07:09 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Hardcode sample streams
|
|
|
|
|
frameJSON, err := data.FrameToJSON(data.NewFrame("testdata",
|
|
|
|
|
data.NewField("Time", nil, make([]time.Time, 0)),
|
|
|
|
|
data.NewField("Value", nil, make([]float64, 0)),
|
|
|
|
|
data.NewField("Min", nil, make([]float64, 0)),
|
|
|
|
|
data.NewField("Max", nil, make([]float64, 0)),
|
|
|
|
|
), data.IncludeSchemaOnly)
|
|
|
|
|
if err == nil {
|
|
|
|
|
channels = append(channels, &ManagedChannel{
|
2021-07-30 21:05:39 +03:00
|
|
|
Channel: "plugin/testdata/random-2s-stream",
|
|
|
|
|
Data: frameJSON,
|
|
|
|
|
MinuteRate: 30,
|
2021-06-24 11:07:09 +03:00
|
|
|
}, &ManagedChannel{
|
2021-07-30 21:05:39 +03:00
|
|
|
Channel: "plugin/testdata/random-flakey-stream",
|
|
|
|
|
Data: frameJSON,
|
|
|
|
|
MinuteRate: 150,
|
2021-06-24 11:07:09 +03:00
|
|
|
}, &ManagedChannel{
|
2021-07-30 21:05:39 +03:00
|
|
|
Channel: "plugin/testdata/random-20Hz-stream",
|
|
|
|
|
Data: frameJSON,
|
|
|
|
|
MinuteRate: 1200,
|
2021-06-24 11:07:09 +03:00
|
|
|
})
|
|
|
|
|
}
|
2021-07-30 21:05:39 +03:00
|
|
|
|
|
|
|
|
sort.Slice(channels, func(i, j int) bool {
|
|
|
|
|
return channels[i].Channel < channels[j].Channel
|
|
|
|
|
})
|
|
|
|
|
|
2021-06-24 11:07:09 +03:00
|
|
|
return channels, nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-04-09 21:06:25 +03:00
|
|
|
// Streams returns a map of active managed streams (per streamID).
|
2021-05-11 22:03:04 +03:00
|
|
|
func (r *Runner) Streams(orgID int64) map[string]*ManagedStream {
|
2021-04-05 19:04:46 +03:00
|
|
|
r.mu.RLock()
|
|
|
|
|
defer r.mu.RUnlock()
|
2021-05-11 22:03:04 +03:00
|
|
|
if _, ok := r.streams[orgID]; !ok {
|
|
|
|
|
return map[string]*ManagedStream{}
|
|
|
|
|
}
|
|
|
|
|
streams := make(map[string]*ManagedStream, len(r.streams[orgID]))
|
|
|
|
|
for k, v := range r.streams[orgID] {
|
2021-04-05 19:04:46 +03:00
|
|
|
streams[k] = v
|
|
|
|
|
}
|
|
|
|
|
return streams
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// GetOrCreateStream -- for now this will create new manager for each key.
|
|
|
|
|
// Eventually, the stream behavior will need to be configured explicitly
|
2021-05-11 22:03:04 +03:00
|
|
|
func (r *Runner) GetOrCreateStream(orgID int64, streamID string) (*ManagedStream, error) {
|
2021-04-05 19:04:46 +03:00
|
|
|
r.mu.Lock()
|
|
|
|
|
defer r.mu.Unlock()
|
2021-05-11 22:03:04 +03:00
|
|
|
_, ok := r.streams[orgID]
|
|
|
|
|
if !ok {
|
|
|
|
|
r.streams[orgID] = map[string]*ManagedStream{}
|
|
|
|
|
}
|
|
|
|
|
s, ok := r.streams[orgID][streamID]
|
2021-04-05 19:04:46 +03:00
|
|
|
if !ok {
|
2021-07-30 21:05:39 +03:00
|
|
|
s = NewManagedStream(streamID, orgID, r.publisher, r.frameCache)
|
2021-05-11 22:03:04 +03:00
|
|
|
r.streams[orgID][streamID] = s
|
2021-04-05 19:04:46 +03:00
|
|
|
}
|
|
|
|
|
return s, nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-04-09 21:06:25 +03:00
|
|
|
// ManagedStream holds the state of a managed stream.
|
2021-04-05 19:04:46 +03:00
|
|
|
type ManagedStream struct {
|
2021-06-24 11:07:09 +03:00
|
|
|
id string
|
2021-07-30 21:05:39 +03:00
|
|
|
orgID int64
|
2021-06-24 11:07:09 +03:00
|
|
|
start time.Time
|
|
|
|
|
publisher models.ChannelPublisher
|
|
|
|
|
frameCache FrameCache
|
2021-07-30 21:05:39 +03:00
|
|
|
rateMu sync.RWMutex
|
|
|
|
|
rates map[string][60]rateEntry
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type rateEntry struct {
|
|
|
|
|
time uint32
|
|
|
|
|
count int32
|
2021-04-05 19:04:46 +03:00
|
|
|
}
|
|
|
|
|
|
2021-04-09 21:06:25 +03:00
|
|
|
// NewManagedStream creates new ManagedStream.
|
2021-07-30 21:05:39 +03:00
|
|
|
func NewManagedStream(id string, orgID int64, publisher models.ChannelPublisher, schemaUpdater FrameCache) *ManagedStream {
|
2021-04-05 19:04:46 +03:00
|
|
|
return &ManagedStream{
|
2021-06-24 11:07:09 +03:00
|
|
|
id: id,
|
2021-07-30 21:05:39 +03:00
|
|
|
orgID: orgID,
|
2021-06-24 11:07:09 +03:00
|
|
|
start: time.Now(),
|
|
|
|
|
publisher: publisher,
|
|
|
|
|
frameCache: schemaUpdater,
|
2021-07-30 21:05:39 +03:00
|
|
|
rates: map[string][60]rateEntry{},
|
2021-04-05 19:04:46 +03:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2021-06-24 11:07:09 +03:00
|
|
|
// ManagedChannel represents a managed stream.
|
|
|
|
|
type ManagedChannel struct {
|
2021-07-30 21:05:39 +03:00
|
|
|
Channel string `json:"channel"`
|
|
|
|
|
MinuteRate int64 `json:"minute_rate"`
|
|
|
|
|
Data json.RawMessage `json:"data"`
|
2021-04-05 19:04:46 +03:00
|
|
|
}
|
|
|
|
|
|
2021-04-09 21:06:25 +03:00
|
|
|
// Push sends frame to the stream and saves it for later retrieval by subscribers.
|
2021-04-26 20:46:26 +03:00
|
|
|
// unstableSchema flag can be set to disable schema caching for a path.
|
2021-07-30 21:05:39 +03:00
|
|
|
func (s *ManagedStream) Push(path string, frame *data.Frame) error {
|
2021-06-24 11:07:09 +03:00
|
|
|
jsonFrameCache, err := data.FrameToJSONCache(frame)
|
2021-04-05 19:04:46 +03:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
2021-05-27 02:55:42 -07:00
|
|
|
|
2021-04-05 19:04:46 +03:00
|
|
|
// The channel this will be posted into.
|
2021-04-09 12:17:22 -07:00
|
|
|
channel := live.Channel{Scope: live.ScopeStream, Namespace: s.id, Path: path}.String()
|
2021-04-05 19:04:46 +03:00
|
|
|
|
2021-07-30 21:05:39 +03:00
|
|
|
isUpdated, err := s.frameCache.Update(s.orgID, channel, jsonFrameCache)
|
2021-06-24 11:07:09 +03:00
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("Error updating managed stream schema", "error", err)
|
|
|
|
|
return err
|
2021-05-11 22:03:04 +03:00
|
|
|
}
|
2021-06-24 11:07:09 +03:00
|
|
|
|
|
|
|
|
// When the schema has not changed, just send the data.
|
|
|
|
|
include := data.IncludeDataOnly
|
|
|
|
|
if isUpdated {
|
|
|
|
|
// When the schema has been changed, send all.
|
|
|
|
|
include = data.IncludeAll
|
2021-05-27 02:55:42 -07:00
|
|
|
}
|
2021-06-24 11:07:09 +03:00
|
|
|
frameJSON := jsonFrameCache.Bytes(include)
|
|
|
|
|
|
|
|
|
|
logger.Debug("Publish data to channel", "channel", channel, "dataLength", len(frameJSON))
|
2021-07-30 21:05:39 +03:00
|
|
|
s.incRate(path, time.Now().Unix())
|
|
|
|
|
return s.publisher(s.orgID, channel, frameJSON)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ManagedStream) incRate(path string, nowUnix int64) {
|
|
|
|
|
s.rateMu.Lock()
|
|
|
|
|
pathRate, ok := s.rates[path]
|
|
|
|
|
if !ok {
|
|
|
|
|
pathRate = [60]rateEntry{}
|
|
|
|
|
}
|
|
|
|
|
now := time.Unix(nowUnix, 0)
|
|
|
|
|
slot := now.Second() % 60
|
|
|
|
|
if pathRate[slot].time != uint32(nowUnix) {
|
|
|
|
|
pathRate[slot].count = 0
|
|
|
|
|
}
|
|
|
|
|
pathRate[slot].time = uint32(nowUnix)
|
|
|
|
|
pathRate[slot].count += 1
|
|
|
|
|
s.rates[path] = pathRate
|
|
|
|
|
s.rateMu.Unlock()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ManagedStream) minuteRate(path string) int64 {
|
|
|
|
|
var total int64
|
|
|
|
|
s.rateMu.RLock()
|
|
|
|
|
defer s.rateMu.RUnlock()
|
|
|
|
|
pathRate, ok := s.rates[path]
|
|
|
|
|
if !ok {
|
|
|
|
|
return 0
|
|
|
|
|
}
|
|
|
|
|
for _, val := range pathRate {
|
|
|
|
|
if val.time > uint32(time.Now().Unix()-60) {
|
|
|
|
|
total += int64(val.count)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return total
|
2021-04-05 19:04:46 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *ManagedStream) GetHandlerForPath(_ string) (models.ChannelHandler, error) {
|
|
|
|
|
return s, nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-05-11 22:03:04 +03:00
|
|
|
func (s *ManagedStream) OnSubscribe(_ context.Context, u *models.SignedInUser, e models.SubscribeEvent) (models.SubscribeReply, backend.SubscribeStreamStatus, error) {
|
2021-04-05 19:04:46 +03:00
|
|
|
reply := models.SubscribeReply{}
|
2021-06-24 11:07:09 +03:00
|
|
|
frameJSON, ok, err := s.frameCache.GetFrame(u.OrgId, e.Channel)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return reply, 0, err
|
|
|
|
|
}
|
2021-04-05 19:04:46 +03:00
|
|
|
if ok {
|
2021-06-24 11:07:09 +03:00
|
|
|
reply.Data = frameJSON
|
2021-04-05 19:04:46 +03:00
|
|
|
}
|
|
|
|
|
return reply, backend.SubscribeStreamStatusOK, nil
|
|
|
|
|
}
|
|
|
|
|
|
2021-06-24 11:07:09 +03:00
|
|
|
func (s *ManagedStream) OnPublish(_ context.Context, _ *models.SignedInUser, _ models.PublishEvent) (models.PublishReply, backend.PublishStreamStatus, error) {
|
|
|
|
|
return models.PublishReply{}, backend.PublishStreamStatusPermissionDenied, nil
|
2021-04-05 19:04:46 +03:00
|
|
|
}
|