From 8bccbdafd2d30caf1fef6feb05dad040f3bfc7e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torkel=20=C3=96degaard?= Date: Wed, 21 Dec 2016 09:16:29 +0100 Subject: [PATCH] feat(live): minor progress on stream manager --- pkg/api/live/stream_manager.go | 74 ++++++++++++++++++++++++++++++++++ pkg/models/streams.go | 27 +++++++++++++ 2 files changed, 101 insertions(+) create mode 100644 pkg/api/live/stream_manager.go create mode 100644 pkg/models/streams.go diff --git a/pkg/api/live/stream_manager.go b/pkg/api/live/stream_manager.go new file mode 100644 index 00000000000..ddc57c67cdc --- /dev/null +++ b/pkg/api/live/stream_manager.go @@ -0,0 +1,74 @@ +package live + +import ( + "sync" + + "github.com/grafana/grafana/pkg/components/simplejson" + "github.com/grafana/grafana/pkg/log" + m "github.com/grafana/grafana/pkg/models" +) + +type StreamManagerImpl struct { + log log.Logger + streams map[string]*Stream + streamRWMutex *sync.RWMutex +} + +func NewStreamManager() m.StreamManager { + return &StreamManagerImpl{ + log: log.New("live.stream.manager"), + streams: make(map[string]*Stream), + streamRWMutex: &sync.RWMutex{}, + } +} + +func (s *StreamManagerImpl) GetStreamList() m.StreamList { + list := make(m.StreamList, 0) + + for _, stream := range s.streams { + list = append(list, &m.StreamInfo{ + Name: stream.name, + }) + } + + return list +} + +func (s *StreamManagerImpl) Push(packet *m.StreamPacket) { + stream, exist := s.streams[packet.Stream] + + if !exist { + s.log.Info("Creating metric stream", "name", packet.Stream) + stream = NewStream(packet.Stream) + s.streams[stream.name] = stream + } + + stream.Push(packet) +} + +type Stream struct { + subscribers []*connection + name string +} + +func NewStream(name string) *Stream { + return &Stream{ + subscribers: make([]*connection, 0), + name: name, + } +} + +func (s *Stream) Push(packet *m.StreamPacket) { + + messageBytes, _ := simplejson.NewFromAny(packet).Encode() + + for _, sub := range s.subscribers { + // check if channel is open + // if _, ok := h.connections[sub]; !ok { + // delete(s.subscribers, sub) + // continue + // } + + sub.send <- messageBytes + } +} diff --git a/pkg/models/streams.go b/pkg/models/streams.go new file mode 100644 index 00000000000..c2c58e83f7b --- /dev/null +++ b/pkg/models/streams.go @@ -0,0 +1,27 @@ +package models + +import "gopkg.in/guregu/null.v3" + +type TimePoint [2]null.Float +type TimeSeriesPoints []TimePoint + +type StreamPacket struct { + Stream string `json:"stream"` + Series []StreamSeries `json:"series"` +} + +type StreamSeries struct { + Name string `json:"name"` + Points TimeSeriesPoints `json:"points"` +} + +type StreamInfo struct { + Name string +} + +type StreamList []*StreamInfo + +type StreamManager interface { + GetStreamList() StreamList + Push(data *StreamPacket) +}