diff --git a/pkg/api/api.go b/pkg/api/api.go index 32c929a42e2..d85ce6b569a 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -244,6 +244,9 @@ func Register(r *macaron.Macaron) { liveConn := live.New() r.Any("/ws", liveConn.Serve) + // streams + r.Post("/streams/push", reqSignedIn, bind(dtos.StreamMessage{}), liveConn.PushToStream) + InitAppPluginRoutes(r) } diff --git a/pkg/api/dtos/stream.go b/pkg/api/dtos/stream.go new file mode 100644 index 00000000000..032588247b0 --- /dev/null +++ b/pkg/api/dtos/stream.go @@ -0,0 +1,9 @@ +package dtos + +import "encoding/json" + +type StreamMessage struct { + Stream string `json:"stream"` + Metric string `json:"name"` + Datapoints [][]json.Number `json:"username"` +} diff --git a/pkg/api/live/conn.go b/pkg/api/live/conn.go index 4f5df016dcd..b9a39f92fc6 100644 --- a/pkg/api/live/conn.go +++ b/pkg/api/live/conn.go @@ -115,25 +115,3 @@ func (c *connection) writePump() { } } } - -type LiveConn struct { -} - -func New() *LiveConn { - go h.run() - return &LiveConn{} -} - -func (lc *LiveConn) Serve(w http.ResponseWriter, r *http.Request) { - log.Info("Live: Upgrading to WebSocket") - - ws, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Error(3, "Live: Failed to upgrade connection to WebSocket", err) - return - } - c := newConnection(ws) - h.register <- c - go c.writePump() - c.readPump() -} diff --git a/pkg/api/live/hub.go b/pkg/api/live/hub.go index c776cea7f34..cd90eb90dc7 100644 --- a/pkg/api/live/hub.go +++ b/pkg/api/live/hub.go @@ -1,6 +1,9 @@ package live -import "github.com/grafana/grafana/pkg/log" +import ( + "github.com/grafana/grafana/pkg/api/dtos" + "github.com/grafana/grafana/pkg/log" +) type hub struct { // Registered connections. @@ -14,6 +17,8 @@ type hub struct { // Unregister requests from connections. unregister chan *connection + + streamPipe chan *dtos.StreamMessage } var h = hub{ diff --git a/pkg/api/live/live.go b/pkg/api/live/live.go new file mode 100644 index 00000000000..e38b4df38ad --- /dev/null +++ b/pkg/api/live/live.go @@ -0,0 +1,35 @@ +package live + +import ( + "net/http" + + "github.com/grafana/grafana/pkg/api/dtos" + "github.com/grafana/grafana/pkg/log" + "github.com/grafana/grafana/pkg/middleware" +) + +type LiveConn struct { +} + +func New() *LiveConn { + go h.run() + return &LiveConn{} +} + +func (lc *LiveConn) Serve(w http.ResponseWriter, r *http.Request) { + log.Info("Live: Upgrading to WebSocket") + + ws, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Error(3, "Live: Failed to upgrade connection to WebSocket", err) + return + } + c := newConnection(ws) + h.register <- c + go c.writePump() + c.readPump() +} + +func (lc *LiveConn) PushToStream(c *middleware.Context, message dtos.StreamMessage) { + +}