From 2adc4d12be3f87d27400291a972185cb05797f99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torkel=20=C3=96degaard?= Date: Mon, 14 Mar 2016 13:20:55 +0100 Subject: [PATCH] feat(live): work on websocket data source, #3455 --- pkg/api/live/conn.go | 42 +++++++++++++-- pkg/api/live/hub.go | 1 + .../app/core/directives/plugin_component.ts | 6 ++- public/app/core/live/live_srv.ts | 53 ++++++++++++++----- .../{stream => grafana-live}/datasource.ts | 4 +- .../{stream => grafana-live}/module.ts | 0 .../partials/query.editor.html | 4 +- .../datasource/grafana-live/plugin.json | 7 +++ .../app/plugins/datasource/stream/plugin.json | 8 --- 9 files changed, 96 insertions(+), 29 deletions(-) rename public/app/plugins/datasource/{stream => grafana-live}/datasource.ts (80%) rename public/app/plugins/datasource/{stream => grafana-live}/module.ts (100%) rename public/app/plugins/datasource/{stream => grafana-live}/partials/query.editor.html (78%) create mode 100644 public/app/plugins/datasource/grafana-live/plugin.json delete mode 100644 public/app/plugins/datasource/stream/plugin.json diff --git a/pkg/api/live/conn.go b/pkg/api/live/conn.go index 90ce0af9cae..4f5df016dcd 100644 --- a/pkg/api/live/conn.go +++ b/pkg/api/live/conn.go @@ -5,6 +5,7 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/log" ) @@ -25,11 +26,27 @@ const ( var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +type subscription struct { + name string } type connection struct { - ws *websocket.Conn - send chan []byte + ws *websocket.Conn + streams []*subscription + send chan []byte +} + +func newConnection(ws *websocket.Conn) *connection { + return &connection{ + send: make(chan []byte, 256), + streams: make([]*subscription, 0), + ws: ws, + } } func (c *connection) readPump() { @@ -48,7 +65,24 @@ func (c *connection) readPump() { } break } - h.broadcast <- message + + c.handleMessage(message) + } +} + +func (c *connection) handleMessage(message []byte) { + json, err := simplejson.NewJson(message) + if err != nil { + log.Error(3, "Unreadable message on websocket channel:", err) + } + + msgType := json.Get("action").MustString() + streamName := json.Get("stream").MustString() + + switch msgType { + case "subscribe": + c.streams = append(c.streams, &subscription{name: streamName}) + log.Info("Live: subscribing to stream %v", streamName) } } @@ -98,7 +132,7 @@ func (lc *LiveConn) Serve(w http.ResponseWriter, r *http.Request) { log.Error(3, "Live: Failed to upgrade connection to WebSocket", err) return } - c := &connection{send: make(chan []byte, 256), ws: ws} + c := newConnection(ws) h.register <- c go c.writePump() c.readPump() diff --git a/pkg/api/live/hub.go b/pkg/api/live/hub.go index 9e276eb46d1..c776cea7f34 100644 --- a/pkg/api/live/hub.go +++ b/pkg/api/live/hub.go @@ -28,6 +28,7 @@ func (h *hub) run() { select { case c := <-h.register: h.connections[c] = true + log.Info("Live: New connection (Total count: %v)", len(h.connections)) case c := <-h.unregister: if _, ok := h.connections[c]; ok { delete(h.connections, c) diff --git a/public/app/core/directives/plugin_component.ts b/public/app/core/directives/plugin_component.ts index 69535327cc8..9dc0f86897f 100644 --- a/public/app/core/directives/plugin_component.ts +++ b/public/app/core/directives/plugin_component.ts @@ -149,7 +149,11 @@ function pluginDirectiveLoader($compile, datasourceSrv, $rootScope, $q, $http, $ // ConfigCtrl case 'datasource-config-ctrl': { var dsMeta = scope.ctrl.datasourceMeta; - return System.import(dsMeta.module).then(function(dsModule) { + return System.import(dsMeta.module).then(function(dsModule): any { + if (!dsMeta.ConfigCtrl) { + return {notFound: true}; + } + return { baseUrl: dsMeta.baseUrl, name: 'ds-config-' + dsMeta.id, diff --git a/public/app/core/live/live_srv.ts b/public/app/core/live/live_srv.ts index d417597e1ea..df4fb220932 100644 --- a/public/app/core/live/live_srv.ts +++ b/public/app/core/live/live_srv.ts @@ -5,24 +5,51 @@ import coreModule from 'app/core/core_module'; export class LiveSrv { conn: any; + initPromise: any; + + getWebSocketUrl() { + var l = window.location; + return ((l.protocol === "https:") ? "wss://" : "ws://") + l.host + config.appSubUrl + '/ws'; + } init() { - this.conn = new WebSocket("ws://localhost:3000/ws"); - this.conn.onclose = function(evt) { - console.log("WebSocket closed"); - }; - this.conn.onmessage = function(evt) { - console.log("WebSocket message", evt.data); - }; - this.conn.onopen = function(evt) { - console.log("Connection opened"); - }; + if (this.initPromise) { + return this.initPromise; + } + + if (this.conn && this.conn.readyState === 1) { + return Promise.resolve(); + } + + this.initPromise = new Promise((resolve, reject) => { + console.log('Live: connecting...'); + this.conn = new WebSocket(this.getWebSocketUrl()); + + this.conn.onclose = function(evt) { + reject({message: 'Connection closed'}); + }; + + this.conn.onmessage = function(evt) { + console.log("Live: message received:", evt.data); + }; + + this.conn.onopen = function(evt) { + console.log('Live: connection open'); + resolve(); + }; + }); + + return this.initPromise; + } + + send(data) { + this.conn.send(JSON.stringify(data)); } subscribe(name) { - if (!this.conn) { - this.init(); - } + return this.init().then(() => { + this.send({action: 'subscribe', stream: name}); + }); } } diff --git a/public/app/plugins/datasource/stream/datasource.ts b/public/app/plugins/datasource/grafana-live/datasource.ts similarity index 80% rename from public/app/plugins/datasource/stream/datasource.ts rename to public/app/plugins/datasource/grafana-live/datasource.ts index ae2bb87dbef..b62e64b6793 100644 --- a/public/app/plugins/datasource/stream/datasource.ts +++ b/public/app/plugins/datasource/grafana-live/datasource.ts @@ -15,7 +15,9 @@ export class GrafanaStreamDS { } var target = options.targets[0]; - liveSrv.subscribe(target); + liveSrv.subscribe(target.stream); + + return Promise.resolve({data: []}); } } diff --git a/public/app/plugins/datasource/stream/module.ts b/public/app/plugins/datasource/grafana-live/module.ts similarity index 100% rename from public/app/plugins/datasource/stream/module.ts rename to public/app/plugins/datasource/grafana-live/module.ts diff --git a/public/app/plugins/datasource/stream/partials/query.editor.html b/public/app/plugins/datasource/grafana-live/partials/query.editor.html similarity index 78% rename from public/app/plugins/datasource/stream/partials/query.editor.html rename to public/app/plugins/datasource/grafana-live/partials/query.editor.html index 6a8554d017d..912b28a6247 100644 --- a/public/app/plugins/datasource/stream/partials/query.editor.html +++ b/public/app/plugins/datasource/grafana-live/partials/query.editor.html @@ -1,8 +1,8 @@
  • - Stream Expression + Stream
  • - +
  • diff --git a/public/app/plugins/datasource/grafana-live/plugin.json b/public/app/plugins/datasource/grafana-live/plugin.json new file mode 100644 index 00000000000..1f2ec204949 --- /dev/null +++ b/public/app/plugins/datasource/grafana-live/plugin.json @@ -0,0 +1,7 @@ +{ + "type": "datasource", + "name": "Grafana Live", + "id": "grafana-live", + + "metrics": true +} diff --git a/public/app/plugins/datasource/stream/plugin.json b/public/app/plugins/datasource/stream/plugin.json deleted file mode 100644 index df5fee586ed..00000000000 --- a/public/app/plugins/datasource/stream/plugin.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "type": "datasource", - "name": "Grafana Stream DS", - "id": "grafana-stream-ds", - - "builtIn": true, - "metrics": true -}