Use json-rpc.

This commit is contained in:
Julien Fontanet 2015-02-05 17:05:49 +01:00
parent f598e0d0d5
commit fec8dd74af
2 changed files with 42 additions and 72 deletions

View File

@ -4,7 +4,11 @@
var assign = require('lodash.assign'); var assign = require('lodash.assign');
var Bluebird = require('bluebird'); var Bluebird = require('bluebird');
var EventEmitter = require('events').EventEmitter;
var forEach = require('lodash.foreach'); var forEach = require('lodash.foreach');
var inherits = require('util').inherits;
var jsonRpc = require('json-rpc');
var MethodNotFound = require('json-rpc/errors').MethodNotFound;
var parseUrl = require('url').parse; var parseUrl = require('url').parse;
var WebSocket = require('ws'); var WebSocket = require('ws');
@ -24,10 +28,6 @@ function makeDeferred() {
}; };
} }
function notConnected() {
throw new Error('not connected');
}
function startsWith(string, target) { function startsWith(string, target) {
return (string.lastIndexOf(target, 0) === 0); return (string.lastIndexOf(target, 0) === 0);
} }
@ -52,23 +52,35 @@ function fixUrl(url) {
//==================================================================== //====================================================================
function Xo(url) { function Xo(url) {
// Super constructor.
EventEmitter.call(this);
// Fix the URL (ensure correct protocol and /api/ path).
this._url = fixUrl(url); this._url = fixUrl(url);
// Identifier of the next request.
this._nextId = 0;
// Promises linked to the requests.
this._deferreds = {};
// Current WebSocket.
this._socket = null;
// Current status which may be: // Current status which may be:
// - disconnected // - disconnected
// - connecting // - connecting
// - connected // - connected
this.status = 'disconnected'; this.status = 'disconnected';
// Will contains the WebSocket.
this._socket = null;
// The JSON-RPC server.
var this_ = this;
this._jsonRpc = jsonRpc.createServer(function (message) {
if (message.type === 'notification') {
this_.emit('notification', message);
} else {
// This object does not support requests.
throw new MethodNotFound(message.method);
}
}).on('data', function (message) {
this_._socket.send(JSON.stringify(message));
});
} }
inherits(Xo, EventEmitter);
assign(Xo.prototype, { assign(Xo.prototype, {
close: function () { close: function () {
@ -78,7 +90,7 @@ assign(Xo.prototype, {
}, },
connect: Bluebird.method(function () { connect: Bluebird.method(function () {
if (this.status === 'connected') { if (this._socket) {
return; return;
} }
this.status = 'connecting'; this.status = 'connecting';
@ -92,59 +104,27 @@ assign(Xo.prototype, {
} }
var socket = this._socket = new WebSocket(this._url, '', opts); var socket = this._socket = new WebSocket(this._url, '', opts);
// Used to avoid binding listeners to this object.
var this_ = this;
// When the socket opens, send any queued requests. // When the socket opens, send any queued requests.
socket.addEventListener('open', function () { socket.addEventListener('open', function () {
this.status = 'connected'; this.status = 'connected';
// (Re)Opens accesses.
delete this.send;
// Resolves the promise. // Resolves the promise.
deferred.resolve(); deferred.resolve();
}.bind(this)); });
socket.addEventListener('message', function (data) { socket.addEventListener('message', function (message) {
// `ws` API is lightly different from standard API. this_._jsonRpc.write(message.data);
if (data.data) { });
data = data.data;
}
// TODO: Wraps in a promise to prevent releasing the Zalgo.
var response = JSON.parse(data);
var id = response.id;
var deferred = this._deferreds[id];
if (!deferred) {
// Response already handled.
return;
}
delete this._deferreds[id];
if ('error' in response) {
return deferred.reject(response.error);
}
if ('result' in response) {
return deferred.resolve(response.result);
}
deferred.reject({
message: 'invalid response received',
object: response,
});
}.bind(this));
socket.addEventListener('close', function () { socket.addEventListener('close', function () {
// Closes accesses. this_.status = 'disconnected';
this.send = notConnected; this_._socket = null;
// Fails all waiting requests. this_._jsonRpc.failPendingRequests('connection lost');
forEach(this._deferreds, function (deferred) { });
deferred.reject('not connected');
});
this._deferreds = {};
}.bind(this));
socket.addEventListener('error', function (error) { socket.addEventListener('error', function (error) {
// Fails the connect promise if possible. // Fails the connect promise if possible.
@ -155,22 +135,11 @@ assign(Xo.prototype, {
}), }),
call: function (method, params) { call: function (method, params) {
var jsonRpc = this._jsonRpc;
return this.connect().then(function () { return this.connect().then(function () {
var socket = this._socket; return jsonRpc.request(method, params);
});
var id = this._nextId++;
socket.send(JSON.stringify({
jsonrpc: '2.0',
id: id,
method: method,
params: params || [],
}));
var deferred = this._deferreds[id] = makeDeferred();
return deferred.promise;
}.bind(this));
}, },
}); });

View File

@ -26,6 +26,7 @@
], ],
"dependencies": { "dependencies": {
"bluebird": "^2.9.6", "bluebird": "^2.9.6",
"json-rpc": "git://github.com/julien-f/js-json-rpc",
"lodash.assign": "^3.0.0", "lodash.assign": "^3.0.0",
"lodash.foreach": "^3.0.1", "lodash.foreach": "^3.0.1",
"ws": "^0.7.1" "ws": "^0.7.1"