From 92f20b9b7d374b9bd0dd3fc2a5aabace5ef4f40b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torkel=20=C3=96degaard?= Date: Mon, 14 Mar 2016 19:21:32 +0100 Subject: [PATCH] feat(websocket): reconnection and resubscription handling, #4355 --- package.json | 2 +- public/app/core/live/live_srv.ts | 80 ++++++++++++++++--- .../datasource/grafana-live/datasource.ts | 6 +- 3 files changed, 76 insertions(+), 12 deletions(-) diff --git a/package.json b/package.json index 1c8af93eb5f..1dea1e79445 100644 --- a/package.json +++ b/package.json @@ -53,7 +53,7 @@ "mocha": "2.3.4", "phantomjs": "^1.9.19", "reflect-metadata": "0.1.2", - "rxjs": "5.0.0-beta.0", + "rxjs": "5.0.0-beta.2", "sass-lint": "^1.5.0", "systemjs": "0.19.20", "zone.js": "0.5.10" diff --git a/public/app/core/live/live_srv.ts b/public/app/core/live/live_srv.ts index df4fb220932..b6954bdcffc 100644 --- a/public/app/core/live/live_srv.ts +++ b/public/app/core/live/live_srv.ts @@ -1,57 +1,117 @@ /// +import _ from 'lodash'; import config from 'app/core/config'; import coreModule from 'app/core/core_module'; +import {Observable} from 'vendor/npm/rxjs/Observable'; + export class LiveSrv { conn: any; + observers: any; initPromise: any; + constructor() { + this.observers = {}; + } + getWebSocketUrl() { var l = window.location; return ((l.protocol === "https:") ? "wss://" : "ws://") + l.host + config.appSubUrl + '/ws'; } - init() { + getConnection() { if (this.initPromise) { return this.initPromise; } if (this.conn && this.conn.readyState === 1) { - return Promise.resolve(); + return Promise.resolve(this.conn); } this.initPromise = new Promise((resolve, reject) => { console.log('Live: connecting...'); this.conn = new WebSocket(this.getWebSocketUrl()); - this.conn.onclose = function(evt) { + this.conn.onclose = (evt) => { + console.log("Live: websocket onclose", evt); reject({message: 'Connection closed'}); + + this.initPromise = null; + setTimeout(this.reconnect.bind(this), 2000); }; - this.conn.onmessage = function(evt) { + this.conn.onmessage = (evt) => { console.log("Live: message received:", evt.data); }; - this.conn.onopen = function(evt) { - console.log('Live: connection open'); - resolve(); + this.conn.onerror = (evt) => { + this.initPromise = null; + reject({message: 'Connection error'}); + console.log("Live: websocket error", evt); + }; + + this.conn.onopen = (evt) => { + console.log('opened'); + this.initPromise = null; + resolve(this.conn); }; }); return this.initPromise; } + reconnect() { + // no need to reconnect if no one cares + if (_.keys(this.observers).length === 0) { + return; + } + + console.log('LiveSrv: Reconnecting'); + + this.getConnection().then(conn => { + _.each(this.observers, (value, key) => { + this.send({action: 'subscribe', stream: key}); + }); + }); + } + send(data) { this.conn.send(JSON.stringify(data)); } - subscribe(name) { - return this.init().then(() => { - this.send({action: 'subscribe', stream: name}); + addObserver(stream, observer) { + this.observers[stream] = observer; + + this.getConnection().then(conn => { + this.send({action: 'subscribe', stream: stream}); }); } + removeObserver(stream, observer) { + delete this.observers[stream]; + + this.getConnection().then(conn => { + this.send({action: 'unsubscribe', stream: stream}); + }); + } + + subscribe(streamName) { + console.log('LiveSrv.subscribe: ' + streamName); + + return Observable.create(observer => { + this.addObserver(streamName, observer); + + return () => { + this.removeObserver(streamName, observer); + }; + }); + + // return this.init().then(() => { + // this.send({action: 'subscribe', stream: name}); + // }); + } + } var instance = new LiveSrv(); diff --git a/public/app/plugins/datasource/grafana-live/datasource.ts b/public/app/plugins/datasource/grafana-live/datasource.ts index b62e64b6793..13ee30d7ddb 100644 --- a/public/app/plugins/datasource/grafana-live/datasource.ts +++ b/public/app/plugins/datasource/grafana-live/datasource.ts @@ -3,6 +3,7 @@ import {liveSrv} from 'app/core/core'; export class GrafanaStreamDS { + subscription: any; /** @ngInject */ constructor(private $q) { @@ -15,7 +16,10 @@ export class GrafanaStreamDS { } var target = options.targets[0]; - liveSrv.subscribe(target.stream); + var observable = liveSrv.subscribe(target.stream); + this.subscription = observable.subscribe(data => { + console.log("grafana stream ds data!", data); + }); return Promise.resolve({data: []}); }