The main file now also uses CoffeeScript and fibers.

This commit is contained in:
Julien Fontanet 2013-12-13 14:17:12 +01:00
parent a3d04fa810
commit 95dc2d430c
6 changed files with 257 additions and 564 deletions

View File

@ -19,7 +19,7 @@
"fibers": "~1.0.1",
"hashy": "~0.1.0",
"js-yaml": "~2.1.3",
"optimist": "~0.6.0",
"nconf": "~0.6.9",
"q": "~0.9.7",
"sync": "~0.2.2",
"then-redis": "~0.3.9",

37
src/fibers-utils.coffee Normal file
View File

@ -0,0 +1,37 @@
# Async code is easier with fibers (light threads)!
$fiber = require 'fibers'
#=====================================================================
# Makes a function running in its own fiber.
$fiberize = (fn) ->
(args...) ->
$fiber(-> fn args...).run()
# Makes an asynchrouneous function synchrouneous (in a fiber).
$synchronize = (fn) ->
(args...) ->
fiber = $fiber.current
fn args..., (error, result) ->
if error?
fiber.throwInto error
else
fiber.run result
$fiber.yield()
# TODO: remove promises ASAP.
$waitForPromise = (promise) ->
fiber = $fiber.current
promise.then(
(value) -> fiber.run value
(error) -> fiber.throwInto error
)
$fiber.yield()
#=====================================================================
module.exports = {
$fiberize
$synchronize
$waitForPromise
}

200
src/main.coffee Normal file
View File

@ -0,0 +1,200 @@
# File system handling.
$fs = require 'fs'
# HTTP(S) handling.
$http = require 'http'
$https = require 'https'
#---------------------------------------------------------------------
# Low level tools.
$_ = require 'underscore'
# HTTP(s) middleware framework.
$connect = require 'connect'
# Async code is easier with fibers (light threads)!
$fiber = require 'fibers'
# Configuration handling.
$nconf = require 'nconf'
# WebSocket server.
$WSServer = (require 'ws').Server
# YAML formatting and parsing.
$YAML = require 'js-yaml'
#---------------------------------------------------------------------
$API = require './api'
$Session = require './session'
$XO = require './xo'
# Helpers for dealing with fibers.
{$fiberize, $waitForPromise} = require './fibers-utils'
#=====================================================================
$createWebServer = ({host, port, certificate, key}) ->
# Creates the web server.
if certificate? and key?
protocol = 'HTTPS'
server = $https.createServer {
cert: certificate
key: key
}
else
protocol = 'HTTP'
server = $http.createServer()
# Starts listening.
server.listen port, host
# Prints a message when it has started to listen.
server.once 'listening', ->
console.log "#{protocol} server is listening on #{host}:#{port}"
# Prints an error message if if failed to listen.
server.once 'error', ->
console.warn "#{protocol} server could not listen on #{host}:#{port}"
# Returns the server.
server
$handleJsonRpcCall = (api, session, encodedRequest) ->
request = {
id: null
}
formatError = (error) -> JSON.stringify {
jsonrpc: '2.0'
error: error
id: request.id
}
# Parses the JSON.
try
request = JSON.parse encodedRequest.toString()
catch error
return formatError (
if error instanceof SyntaxError
$API.err.INVALID_JSON
else
$API.err.SERVER_ERROR
)
# Checks it is a compliant JSON-RPC 2.0 request.
if not request.method or not request.params or request.id is undefined or request.jsonrpc isnt '2.0'
return formatError $API.err.INVALID_REQUEST
# Executes the requested method on the API.
try
JSON.stringify {
jsonrpc: '2.0'
result: $waitForPromise (api.exec session, request)
id: request.id
}
catch error
# If it is not a valid API error, hides it with a generic server error.
unless $_.isObject error and error not instanceof Error
error = $API.err.SERVER_ERROR
formatError error
#=====================================================================
# Main.
do $fiberize ->
# Loads the environment.
$nconf.env()
# Parses process' arguments.
$nconf.argv()
# Loads the configuration file.
$nconf.use 'file', {
file: "#{__dirname}/../config/local.yaml"
format: {
stringify: (obj) -> $YAML.safeDump obj
parse: (string) -> $YAML.safeLoad string
}
}
# Defines defaults configuration.
$nconf.defaults {
http: {
enabled: true
host: '0.0.0.0'
port: 80
}
https: {
enabled: false
host: '0.0.0.0'
port: 443
certificate: './certificate.pem'
key: './key.pem'
}
redis: {
uri: 'tcp://127.0.0.1:6379'
}
}
# Prints a message if deprecated entries are specified.
for entry in ['users', 'servers']
if $nconf.get entry
console.warn "[Warn] `#{entry}` configuration is deprecated."
# Creates the main object which will connects to Xen servers and
# manages all the models.
xo = new $XO()
# Starts it.
xo.start {
redis: {
uri: $nconf.get 'redis:uri'
}
}
# Creates web servers according to the configuration.
webServers = []
if $nconf.get 'http:enabled'
webServers.push $createWebServer {
secure: false
host: $nconf.get 'http:host'
port: $nconf.get 'http:port'
}
if $nconf.get 'https:enabled'
webServers.push $createWebServer {
secure: true
host: $nconf.get 'https:host'
port: $nconf.get 'https:port'
}
# Static file serving (e.g. for XO-Web).
connect = $connect()
.use $connect.static "#{__dirname}/../public/http"
webServer.on 'request', connect for webServer in webServers
# Creates the API.
api = new $API xo
# JSON-RPC over WebSocket.
for webServer in webServers
new $WSServer({
server: webServer
path: '/api/'
}).on 'connection', (socket) ->
# Binds a session to this connection.
session = new $Session xo
session.once 'close', -> socket.close()
socket.once 'close', -> session.close()
# Handles each request in a separate fiber.
socket.on 'message', $fiberize (request) ->
response = $handleJsonRpcCall api, session, request
# The socket may have closed beetween the request and the
# response.
socket.send response if socket.readyState is socket.OPEN

View File

@ -1,492 +0,0 @@
// Enables strict mode for this whole file.
'use strict';
//--------------------------------------
// Node.js modules.
//--------------------------------------
// Filesystem operations.
var fs = require('fs');
//--------------------------------------
// External modules.
//--------------------------------------
// Provides Node.js the ability to load modules written in
// CoffeeScript.
require('coffee-script');
// Easier async with fibers (light threads).
var $fiber = require('fibers');
// Utility-belt.
var _ = require('underscore');
// HTTP(s) middleware framework.
var connect = require('connect');
// Promises library.
var Q = require('q');
// WebSocket server.
var WSServer = require('ws').Server;
//--------------------------------------
// Internal modules.
//--------------------------------------
var Session = require('./session');
var xo = new (require('./xo'))();
var Api = require('./api');
var api = new Api(xo);
//////////////////////////////////////////////////////////////////////
function json_api_call(session, message)
{
/* jshint newcap:false */
var req = {
'id': null,
};
function format_error(error)
{
return JSON.stringify({
'jsonrpc': '2.0',
'error': error,
'id': req.id,
});
}
try
{
req = JSON.parse(message.toString());
}
catch (e)
{
if (e instanceof SyntaxError)
{
return Q(format_error(Api.err.INVALID_JSON));
}
return Q(format_error(Api.err.SERVER_ERROR));
}
/* jshint laxbreak: true */
if (!req.method || !req.params
|| (undefined === req.id)
|| ('2.0' !== req.jsonrpc))
{
return Q(format_error(Api.err.INVALID_REQUEST));
}
return api.exec(
session,
{
'method': req.method,
'params': req.params,
}
).then(
function (result) {
return JSON.stringify({
'jsonrpc': '2.0',
'result': result,
'id': req.id,
});
},
function (error) {
if (!_.isObject(error) || (error instanceof Error))
{
console.error(error.stack || error);
return format_error(Api.err.SERVER_ERROR);
}
return format_error(error);
}
);
}
//////////////////////////////////////////////////////////////////////
// Static file serving (for XO-Web for instance).
//////////////////////////////////////////////////////////////////////
xo.on('started', function (data) {
var webServers = data.webServers;
// If there is no web servers, no need to create the `connect`
// application.
if (!webServers || !webServers.length)
{
return;
}
var app = connect()
// Compresses reponses using GZip.
.use(connect.compress())
// Serve static files.
.use(connect.static(__dirname +'/../public/http'))
;
webServers.forEach(function (server) {
server.on('request', app);
});
});
//////////////////////////////////////////////////////////////////////
// WebSocket to TCP proxy (used for consoles).
//////////////////////////////////////////////////////////////////////
// Protocol:
//
// 1. The web browser connects to the server via WebSocket.
//
// 2. It sends a first message containing the “host” and “port” to
// connect to in a JSON object.
//
// 3. All messages to send to the TCP server and received from it will
// be encoded using Base64.
// @todo Avoid Base64 encoding and directly use binary streams.
// xo.on('started', function (data) {
// var webServers = data.webServers;
//
// if (!webServers || !webServers.length)
// {
// return;
// }
//
// var on_connection = function (socket) {
// // Parses the first message which SHOULD contains the host and
// // port of the host to connect to.
// socket.once('message', function (message) {
// try
// {
// message = JSON.parse(message);
// }
// catch (e)
// {
// socket.close();
// return;
// }
// if (!message.host && !message.port)
// {
// socket.close();
// return;
// }
// var target = tcp.createConnection(message.port, message.host);
// target.on('data', function (data) {
// socket.send(data.toString('base64'));
// });
// target.on('end', function () {
// socket.close();
// });
// target.on('error', function () {
// target.end();
// });
// socket.on('message', function (message) {
// target.write(new Buffer(message, 'base64'));
// });
// socket.on('close', function () {
// target.end();
// });
// });
// socket.on('error', function () {
// socket.close();
// });
// };
// webServer.forEach(function (server) {
// new WSServer({
// 'server': server,
// 'path': '/websockify',
// }).on('connection', on_connection);
// });
// });
//////////////////////////////////////////////////////////////////////
// JSON-RPC over WebSocket.
//////////////////////////////////////////////////////////////////////
xo.on('started', function (data) {
var webServers = data.webServers;
if (!webServers || !webServers.length)
{
return;
}
var on_connection = function (socket) {
var session = new Session(xo);
session.once('close', function () {
socket.close();
});
socket.on('message', function (request) {
json_api_call(session, request).then(function (response) {
// Send response if session still open.
if (socket.readyState === socket.OPEN)
{
socket.send(response);
}
}).done();
});
// @todo Ugly inter dependency.
socket.once('close', function () {
session.close();
});
};
webServers.forEach(function (server) {
new WSServer({
'server': server,
'path': '/api/',
}).on('connection', on_connection);
});
});
//////////////////////////////////////////////////////////////////////
// JSON-RPC over TCP.
//////////////////////////////////////////////////////////////////////
// xo.on('started', function () {
// require('net').createServer(function (socket) {
// var session = new Session(xo);
// session.on('close', function () {
// socket.end(); // @todo Check it is enough.
// });
// var length = null; // Expected message length.
// var buffer = new Buffer(1024); // @todo I hate hardcoded values!
// socket.on('data', function (data) {
// data.copy(buffer);
// // Read the message length.
// if (!length)
// {
// var i = _.indexOf(buffer, 10);
// if (-1 === i)
// {
// return;
// }
// length = +(buffer.toString('ascii', 0, i));
// // If the length is NaN, we cannot do anything except
// // closing the connection.
// if (length !== length)
// {
// session.close();
// return;
// }
// buffer = buffer.slice(i + 1);
// }
// // We do not have received everything.
// if (buffer.length < length)
// {
// return;
// }
// json_api_call(
// session,
// buffer.slice(0, length).toString()
// ).then(function (response) {
// // @todo Handle long messages.
// socket.write(response.length +'\n'+ response);
// }).done();
// // @todo Check it frees the memory.
// buffer = buffer.slice(length);
// length = null;
// });
// // @todo Ugly inter dependency.
// socket.once('close', function () {
// session.close();
// });
// }).listen(1024); // @todo Should be configurable.
// });
//////////////////////////////////////////////////////////////////////
var cfg = {
'data': {},
'get': function (path) {
/* jshint noempty: false */
if (!_.isArray(path))
{
path = Array.prototype.slice.call(arguments);
}
var current = this.data;
for (
var i = 0, n = path.length;
(i < n) && (undefined !== (current = current[path[i]]));
++i
)
{}
if (i < n)
{
return undefined;
}
return current;
},
'merge': function (data) {
var helper = function (target, source) {
if (null === source) // Special case.
{
return target;
}
if (!_.isObject(target) || !_.isObject(source))
{
return source;
}
if (_.isArray(target) && _.isArray(source))
{
target.push.apply(target, source);
return target;
}
for (var prop in source)
{
target[prop] = helper(target[prop], source[prop]);
}
return target;
};
helper(this.data, data);
return this;
},
};
// Defaults values.
cfg.merge({
'http': {
'enabled': true,
'host': '0.0.0.0',
'port': 80,
},
'https': {
'enabled': false,
'host': '0.0.0.0',
'port': 443,
'certificate': './certificate.pem',
'key': './key.pem',
},
'redis': {
'uri': 'tcp://127.0.0.1:6379',
},
});
function read_file(file)
{
return Q.ninvoke(fs, 'readFile', file, {'encoding': 'utf-8'});
}
var webServers = [];
read_file(__dirname +'/../config/local.yaml').then(
function (data) {
data = require('js-yaml').safeLoad(data);
cfg.merge(data);
},
function (e) {
console.error('[Warning] Reading config file: '+ e);
}
).then(function () {
if (cfg.get('users'))
{
console.warn('[Warn] Users in config file are no longer supported.');
}
if (cfg.get('servers'))
{
console.warn('[Warn] Servers in config file are no longer supported.');
}
if (cfg.get('http', 'enabled'))
{
var host = cfg.get('http', 'host');
var port = cfg.get('http', 'port');
webServers.push(
require('http').createServer().listen(port, host)
.on('listening', function () {
console.info(
'HTTP server is listening on %s:%s',
host, port
);
})
.on('error', function () {
console.warn(
'[Warn] HTTP server could not listen on %s:%s',
host, port
);
})
);
}
if (cfg.get('https', 'enabled'))
{
return Q.all([
read_file(cfg.get('https', 'certificate')),
read_file(cfg.get('https', 'key')),
]).spread(function (certificate, key) {
var host = cfg.get('https', 'host');
var port = cfg.get('https', 'port');
webServers.push(
require('https').createServer({
'cert': certificate,
'key': key,
}).listen(port, host).on('listening', function () {
console.info(
'HTTPS server is listening on %s:%s',
host, port
);
}).on('error', function () {
console.warn(
'[Warn] HTTPS server could not listen on %s:%s',
host, port
);
})
);
});
}
}).then(function () {
$fiber(function () {
xo.start({
'config': cfg,
'webServers': webServers,
});
}).run();
}).done();
// Create an initial user if there are none.
xo.on('started', function () {
xo.users.exists().then(function (success) {
if (success)
{
return;
}
console.warn('[Warning] No users, creating “admin@admin.net” with password “admin”');
return xo.users.create('admin@admin.net', 'admin', 'admin');
}).done();
});

View File

@ -1,9 +1,6 @@
# Cryptographic tools.
$crypto = require 'crypto'
# Base class for events handling.
$EventEmitter = (require 'events').EventEmitter
#---------------------------------------------------------------------
# Low level tools.
@ -12,9 +9,8 @@ $_ = require 'underscore'
# Password hasing.
$hashy = require 'hashy'
# Async code is easier with fibers (light threads) and futures!
# Async code is easier with fibers (light threads)!
$fiber = require 'fibers'
$future = require 'fibers/future'
# Redis.
$createRedisClient = (require 'then-redis').createClient
@ -37,23 +33,12 @@ $Model = require './model'
# Connection to XAPI.
$XAPI = require './xapi'
# Helpers for dealing with fibers.
{$fiberize, $synchronize, $waitForPromise} = require './fibers-utils'
#=====================================================================
# Makes a function running in its own fiber.
fiberize = (fn) ->
(args...) ->
($fiber fn).run args...
randomBytes = $future.wrap $crypto.randomBytes
# TODO: remove promises AMAP.
waitForPromise = (promise) ->
fiber = $fiber.current
promise.then(
(value) -> fiber.run value
(error) -> fiber.throwInto error
)
$fiber.yield()
$randomBytes = $synchronize $crypto.randomBytes
#=====================================================================
# Models and collections.
@ -69,7 +54,7 @@ class Servers extends $RedisCollection
class Token extends $Model
@generate: (userId) ->
new Token {
id: (randomBytes 32).toString 'base64'
id: ($randomBytes 32).toString 'base64'
user_id: userId
}
@ -88,16 +73,16 @@ class User extends $Model
validate: -> # TODO
setPassword: (password) ->
@set 'password', waitForPromise ($hashy.hash password)
@set 'password', $waitForPromise ($hashy.hash password)
# Checks the password and updates the hash if necessary.
checkPassword: (password) ->
hash = @get 'hash'
unless waitForPromise ($hashy.verify password, hash)
unless $waitForPromise ($hashy.verify password, hash)
return false
if waitForPromise ($hashy.needsRehash hash)
if $waitForPromise ($hashy.needsRehash hash)
@setPassword password
true
@ -126,13 +111,11 @@ class Users extends $RedisCollection
#=====================================================================
class XO extends $EventEmitter
start: (data) ->
{config} = data
class XO
start: (config) ->
# Connects to Redis.
redis = $createRedisClient (config.get 'redis', 'uri')
redis = $createRedisClient config.redis.uri
# Creates persistent collections.
@servers = new Servers {
@ -174,7 +157,7 @@ class XO extends $EventEmitter
# This function asynchroneously connects to a server, retrieves
# all its objects and monitors events.
connect = fiberize (server) =>
connect = $fiberize (server) =>
# Identifier of the connection.
id = server.id
@ -284,7 +267,7 @@ class XO extends $EventEmitter
throw error unless error[0] is 'SESSION_NOT_REGISTERED'
# Connects to existing servers.
connect server for server in waitForPromise @servers.get()
connect server for server in $waitForPromise @servers.get()
# Automatically connects to new servers.
@servers.on 'add', (servers) ->
@ -292,11 +275,6 @@ class XO extends $EventEmitter
# TODO: Automaticall disconnects from removed servers.
# Emits the started event.
#
# FIXME: This event is no longer needed because fibers are used.
@emit 'started', data
#=====================================================================
module.exports = XO

View File

@ -21,54 +21,24 @@
#
# @package Xen Orchestra Server
MAIN='src/main.js'
MAIN='src/main.coffee'
COFFEE='./node_modules/.bin/coffee'
#######################################
# _fail message
_fail()
{
printf '%s\n' "$1" >&2
exit 1
}
# _have <command>
_have()
{
type "$1" 2> /dev/null >&2
}
########################################
cd -P "$(dirname "$(which "$0")")"
########################################
if [ "${NODE:-}" ]
then
node=$NODE
unset NODE # Unexports it.
elif _have node
then
node=node
elif _have nodejs
then
node=nodejs
else
_fail 'node.js could not be found'
fi
########################################
if [ "${1:-}" = '--debug' ]
then
# Launch XO-Server in debug mode.
"$node" --debug-brk "$MAIN" > /dev/null &
"$COFFEE" --nodejs --debug-brk "$MAIN" > /dev/null &
# Runs Node Inspector (avoids the recommended alternate HTTP port
# for XO-Server).
exec ./node_modules/.bin/node-inspector --web-port 64985
else
exec "$node" "$MAIN"
exec "$COFFEE" "$MAIN"
fi