From 9954bb9c157e0a5744c75e418cd900f5154644de Mon Sep 17 00:00:00 2001
From: wescoeur
Date: Wed, 28 Oct 2015 17:11:09 +0100
Subject: [PATCH] Logger is implemented.

Logger saves job events: (start, end, call start, call end)
Logs can be obtained via the API.
---
 package.json                    |   2 +
 sample.config.yaml              |   6 ++
 src/api/index.js                |   1 +
 src/api/log.js                  |  18 +++++
 src/index.js                    |   9 +--
 src/job-executor.js             | 123 +++++++++++++++++++++++---------
 src/job-executor.spec.js        |   7 +-
 src/loggers/leveldb.js          |  53 ++++++++++++++
 src/schemas/log.js              |  31 ++++++++
 src/schemas/log/jobCallEnd.js   |  33 +++++++++
 src/schemas/log/jobCallStart.js |  27 +++++++
 src/schemas/log/jobEnd.js       |  21 ++++++
 src/schemas/log/jobStart.js     |  26 +++++++
 src/utils.spec.js               |  26 +++++++
 src/xo.js                       |  26 ++++++-
 15 files changed, 364 insertions(+), 45 deletions(-)
 create mode 100644 src/api/log.js
 create mode 100644 src/loggers/leveldb.js
 create mode 100644 src/schemas/log.js
 create mode 100644 src/schemas/log/jobCallEnd.js
 create mode 100644 src/schemas/log/jobCallStart.js
 create mode 100644 src/schemas/log/jobEnd.js
 create mode 100644 src/schemas/log/jobStart.js

diff --git a/package.json b/package.json
index 89add5d17..3560adcb1 100644
--- a/package.json
+++ b/package.json
@@ -61,6 +61,8 @@
     "julien-f-source-map-support": "0.0.0",
     "julien-f-unzip": "^0.2.1",
     "kindof": "^2.0.0",
+    "level": "^1.3.0",
+    "level-sublevel": "^6.5.2",
     "lodash.assign": "^3.0.0",
     "lodash.bind": "^3.0.0",
     "lodash.difference": "^3.2.0",
diff --git a/sample.config.yaml b/sample.config.yaml
index 60da9478c..6578f47b7 100644
--- a/sample.config.yaml
+++ b/sample.config.yaml
@@ -109,3 +109,9 @@ redis:
 #
 # Default: tcp://localhost:6379
 #uri: ''
+
+# Directory containing the database of XO.
+# Currently used for logs.
+#
+# Default: '/var/lib/xo-server/data'
+#datadir: '/var/lib/xo-server/data'
diff --git a/src/api/index.js b/src/api/index.js
index ca5d3d38a..752bfe954 100644
--- a/src/api/index.js
+++ b/src/api/index.js
@@ -10,6 +10,7 @@ export * as docker from './docker'
 export * as group from './group'
 export * as host from './host'
 export * as job from './job'
+export * as log from './log'
 export * as message from './message'
 export * as pbd from './pbd'
 export * as pif from './pif'
diff --git a/src/api/log.js b/src/api/log.js
new file mode 100644
index 000000000..06799cf3a
--- /dev/null
+++ b/src/api/log.js
@@ -0,0 +1,18 @@
+export async function get ({namespace}) {
+  const logger = this.getLogger(namespace)
+
+  return new Promise((resolve, reject) => {
+    const logs = {}
+
+    logger.createReadStream()
+      .on('data', (data) => {
+        logs[data.key] = data.value
+      })
+      .on('end', () => {
+        resolve(logs)
+      })
+      .on('error', reject)
+  })
+}
+
+get.description = 'returns logs list for one namespace'
diff --git a/src/index.js b/src/index.js
index c8f3bd439..6c63a294a 100644
--- a/src/index.js
+++ b/src/index.js
@@ -65,7 +65,8 @@ const DEFAULTS = {
       { port: 80 }
     ],
     mounts: {}
-  }
+  },
+  datadir: '/var/lib/xo-server/data'
 }
 
 const DEPRECATED_ENTRIES = [
@@ -573,11 +574,7 @@ export default async function main (args) {
   // Create the main object which will connects to Xen servers and
   // manages all the models.
   const xo = new Xo()
-  await xo.start({
-    redis: {
-      uri: config.redis && config.redis.uri
-    }
-  })
+  await xo.start(config)
 
   // Loads default authentication providers.
   registerPasswordAuthenticationProvider(xo)
diff --git a/src/job-executor.js b/src/job-executor.js
index a1b75b088..1543c8e16 100644
--- a/src/job-executor.js
+++ b/src/job-executor.js
@@ -24,6 +24,24 @@ export const productParams = (...args) => {
   return product
 }
 
+export function _computeCrossProduct (items, productCb, extractValueMap = {}) {
+  const upstreamValues = []
+  const itemsCopy = items.slice()
+  const item = itemsCopy.pop()
+  const values = extractValueMap[item.type] && extractValueMap[item.type](item) || item
+  forEach(values, value => {
+    if (itemsCopy.length) {
+      let downstreamValues = _computeCrossProduct(itemsCopy, productCb, extractValueMap)
+      forEach(downstreamValues, downstreamValue => {
+        upstreamValues.push(productCb(value, downstreamValue))
+      })
+    } else {
+      upstreamValues.push(value)
+    }
+  })
+  return upstreamValues
+}
+
 export default class JobExecutor {
   constructor (xo, api) {
     this.xo = xo
@@ -31,46 +49,83 @@ export default class JobExecutor {
     this._extractValueCb = {
       'set': items => items.values
     }
+    this._logger = this.xo.getLogger('jobs')
   }
 
-  exec (job) {
-    if (job.type === 'call') {
-      this._execCall(job.userId, job.method, job.paramsVector)
-    } else {
-      throw new UnsupportedJobType(job)
-    }
-  }
-
-  _execCall (userId, method, paramsVector) {
-    let paramsFlatVector
-    if (paramsVector.type === 'crossProduct') {
-      paramsFlatVector = this._computeCrossProduct(paramsVector.items, productParams, this._extractValueCb)
-    } else {
-      throw new UnsupportedVectorType(paramsVector)
-    }
-    const connection = this.xo.createUserConnection()
-    connection.set('user_id', userId)
-    forEach(paramsFlatVector, params => {
-      this.api.call(connection, method, { ...params })
+  async exec (job) {
+    const runJobId = this._logger.notice(`Starting execution of ${job.id}.`, {
+      event: 'job.start',
+      userId: job.userId,
+      jobId: job.id,
+      key: job.key
     })
-    connection.close()
-  }
-
-  _computeCrossProduct (items, productCb, extractValueMap = {}) {
-    const upstreamValues = []
-    const itemsCopy = items.slice()
-    const item = itemsCopy.pop()
-    const values = extractValueMap[item.type] && extractValueMap[item.type](item) || item
-    forEach(values, value => {
-      if (itemsCopy.length) {
-        let downstreamValues = this._computeCrossProduct(itemsCopy, productCb, extractValueMap)
-        forEach(downstreamValues, downstreamValue => {
-          upstreamValues.push(productCb(value, downstreamValue))
-        })
+    try {
+      if (job.type === 'call') {
+        await this._execCall(job, runJobId)
       } else {
-        upstreamValues.push(value)
+        throw new UnsupportedJobType(job)
       }
+
+      this._logger.notice(`Execution terminated for ${job.id}.`, {
+        event: 'job.end',
+        runJobId
+      })
+    } catch (e) {
+      this._logger.error(`The execution of ${job.id} has failed.`, {
+        event: 'job.end',
+        runJobId,
+        error: e
+      })
+    }
+  }
+
+  async _execCall (job, runJobId) {
+    let paramsFlatVector
+
+    if (job.paramsVector.type === 'crossProduct') {
+      paramsFlatVector = _computeCrossProduct(job.paramsVector.items, productParams, this._extractValueCb)
+    } else {
+      throw new UnsupportedVectorType(job.paramsVector)
+    }
+
+    const connection = this.xo.createUserConnection()
+    const promises = []
+
+    connection.set('user_id', job.userId)
+
+    forEach(paramsFlatVector, params => {
+      const runCallId = this._logger.notice(`Starting ${job.method} call. (${job.id})`, {
+        event: 'jobCall.start',
+        runJobId,
+        method: job.method,
+        params
+      })
+
+      promises.push(
+        this.api.call(connection, job.method, assign({}, params)).then(
+          value => {
+            this._logger.notice(`Call ${job.method} (${runCallId}) is a success. (${job.id})`, {
+              event: 'jobCall.end',
+              runJobId,
+              runCallId,
+              returnedValue: value
+            })
+          },
+          reason => {
+            this._logger.notice(`Call ${job.method} (${runCallId}) has failed. (${job.id})`, {
+              event: 'jobCall.end',
+              runJobId,
+              runCallId,
+              error: reason
+            })
+          }
+        )
+      )
     })
-    return upstreamValues
+
+    connection.close()
+
+    await Promise.all(promises)
   }
 }
diff --git a/src/job-executor.spec.js b/src/job-executor.spec.js
index 117bc934e..b88e1558f 100644
--- a/src/job-executor.spec.js
+++ b/src/job-executor.spec.js
@@ -4,7 +4,7 @@ import {expect} from 'chai'
 import leche from 'leche'
 
 import {productParams} from './job-executor'
-import JobExecutor from './job-executor'
+import {_computeCrossProduct} from './job-executor'
 
 describe('productParams', function () {
   leche.withData({
@@ -36,8 +36,7 @@ describe('productParams', function () {
   })
 })
 
-describe('JobExecutor._computeCrossProduct', function () {
-  const jobExecutor = new JobExecutor({})
+describe('_computeCrossProduct', function () {
   // Gives the sum of all args
   const addTest = (...args) => args.reduce((prev, curr) => prev + curr, 0)
   // Gives the product of all args
@@ -64,7 +63,7 @@ describe('_computeCrossProduct', function () {
     ]
   }, function (product, items, cb) {
     it('Crosses sets of values with a crossProduct callback', function () {
-      expect(jobExecutor._computeCrossProduct(items, cb)).to.have.members(product)
+      expect(_computeCrossProduct(items, cb)).to.have.members(product)
     })
   })
 })
diff --git a/src/loggers/leveldb.js b/src/loggers/leveldb.js
new file mode 100644
index 000000000..0efe4a2bb
--- /dev/null
+++ b/src/loggers/leveldb.js
@@ -0,0 +1,53 @@
+// See: https://en.wikipedia.org/wiki/Syslog#Severity_level
+const LEVELS = [
+  'emergency',
+  'alert',
+  'critical',
+  'error',
+  'warning',
+  'notice',
+  'informational',
+  'debug'
+]
+
+let lastDate = 0
+let lastId = 0
+
+function generateUniqueKey (date) {
+  lastId = (date === lastDate) ? (lastId + 1) : 0
+  lastDate = date
+
+  return `${lastDate}:${lastId}`
+}
+
+export default class LevelDbLogger {
+  constructor (db) {
+    this._db = db
+  }
+
+  _add (level, message, data) {
+    const log = {
+      level,
+      message,
+      data,
+      time: Date.now()
+    }
+
+    const key = generateUniqueKey(log.time)
+    this._db.put(key, log)
+    return key
+  }
+
+  createReadStream () {
+    return this._db.createReadStream()
+  }
+}
+
+// Create high level log methods.
+for (const level of LEVELS) {
+  Object.defineProperty(LevelDbLogger.prototype, level, {
+    value (message, data) {
+      return this._add(level, message, data)
+    }
+  })
+}
diff --git a/src/schemas/log.js b/src/schemas/log.js
new file mode 100644
index 000000000..10f48b43d
--- /dev/null
+++ b/src/schemas/log.js
@@ -0,0 +1,31 @@
+export default {
+  $schema: 'http://json-schema.org/draft-04/schema#',
+  type: 'object',
+  properties: {
+    id: {
+      type: 'string',
+      description: 'unique identifier for this log'
+    },
+    time: {
+      type: 'string',
+      description: 'timestamp (in milliseconds) of this log'
+    },
+    message: {
+      type: 'string',
+      description: 'human readable (short) description of this log'
+    },
+    data: {
+      oneOf: [
+        { '$ref': 'log/jobStart.js' },
+        { '$ref': 'log/jobEnd.js' },
+        { '$ref': 'log/jobCallStart.js' },
+        { '$ref': 'log/jobCallEnd.js' }
+      ]
+    }
+  },
+  required: [
+    'id',
+    'time',
+    'message'
+  ]
+}
diff --git a/src/schemas/log/jobCallEnd.js b/src/schemas/log/jobCallEnd.js
new file mode 100644
index 000000000..1ed5f78d4
--- /dev/null
+++ b/src/schemas/log/jobCallEnd.js
@@ -0,0 +1,33 @@
+export default {
+  $schema: 'http://json-schema.org/draft-04/schema#',
+  type: 'object',
+  properties: {
+    event: {
+      enum: ['jobCall.end']
+    },
+    runJobId: {
+      type: 'string',
+      description: 'instance id of this job'
+    },
+    runCallId: {
+      type: 'string',
+      description: 'instance id of this call'
+    },
+    error: {
+      type: 'object',
+      description: 'describe one failure, exists if the call has failed'
+    },
+    returnedValue: {
+      description: 'call\'s result, exists if the call is a success'
+    }
+  },
+  required: [
+    'event',
+    'runJobId',
+    'runCallId'
+  ],
+  oneOf: [
+    { required: ['error'] },
+    { required: ['returnedValue'] }
+  ]
+}
diff --git a/src/schemas/log/jobCallStart.js b/src/schemas/log/jobCallStart.js
new file mode 100644
index 000000000..c2477253f
--- /dev/null
+++ b/src/schemas/log/jobCallStart.js
@@ -0,0 +1,27 @@
+export default {
+  $schema: 'http://json-schema.org/draft-04/schema#',
+  type: 'object',
+  properties: {
+    event: {
+      enum: ['jobCall.start']
+    },
+    runJobId: {
+      type: 'string',
+      description: 'instance id of this job'
+    },
+    method: {
+      type: 'string',
+      description: 'method linked to this call'
+    },
+    params: {
+      type: 'object',
+      description: 'params of the called method'
+    }
+  },
+  required: [
+    'event',
+    'runJobId',
+    'method',
+    'params'
+  ]
+}
diff --git a/src/schemas/log/jobEnd.js b/src/schemas/log/jobEnd.js
new file mode 100644
index 000000000..d46e5889f
--- /dev/null
+++ b/src/schemas/log/jobEnd.js
@@ -0,0 +1,21 @@
+export default {
+  $schema: 'http://json-schema.org/draft-04/schema#',
+  type: 'object',
+  properties: {
+    event: {
+      enum: ['job.end']
+    },
+    runJobId: {
+      type: 'string',
+      description: 'instance id of this job'
+    },
+    error: {
+      type: 'object',
+      description: 'describe one failure, exists if no call has been made'
+    }
+  },
+  required: [
+    'event',
+    'runJobId'
+  ]
+}
diff --git a/src/schemas/log/jobStart.js b/src/schemas/log/jobStart.js
new file mode 100644
index 000000000..fdeddeb1e
--- /dev/null
+++ b/src/schemas/log/jobStart.js
@@ -0,0 +1,26 @@
+export default {
+  $schema: 'http://json-schema.org/draft-04/schema#',
+  type: 'object',
+  properties: {
+    event: {
+      enum: ['job.start']
+    },
+    userId: {
+      type: 'string',
+      description: 'user who executes this job'
+    },
+    jobId: {
+      type: 'string',
+      description: 'identifier of this job'
+    },
+    key: {
+      type: 'string'
+    }
+  },
+  required: [
+    'event',
+    'userId',
+    'jobId',
+    'key'
+  ]
+}
diff --git a/src/utils.spec.js b/src/utils.spec.js
index 369cd8ee2..aebb03167 100644
--- a/src/utils.spec.js
+++ b/src/utils.spec.js
@@ -122,6 +122,32 @@ describe('generateToken()', () => {
 
 // -------------------------------------------------------------------
 
+describe('pSettle()', () => {
+  it('makes an array of PromiseInspection', async () => {
+    const [
+      status1,
+      status2
+    ] = await pSettle([
+      Promise.resolve(42),
+      Promise.reject('fatality')
+    ])
+
+    expect(status1.isRejected()).to.equal(false)
+    expect(status2.isRejected()).to.equal(true)
+
+    expect(status1.isFulfilled()).to.equal(true)
+    expect(status2.isFulfilled()).to.equal(false)
+
+    expect(status1.value()).to.equal(42)
+    expect(::status2.value).to.throw()
+
+    expect(::status1.reason).to.throw()
+    expect(status2.reason()).to.equal('fatality')
+  })
+})
+
+// -------------------------------------------------------------------
+
 describe('parseSize()', function () {
   it('parses a human size', function () {
     expect(parseSize('1G')).to.equal(1e9)
diff --git a/src/xo.js b/src/xo.js
index ce078b2db..8fc691877 100644
--- a/src/xo.js
+++ b/src/xo.js
@@ -8,8 +8,10 @@ import fs from 'fs-promise'
 import includes from 'lodash.includes'
 import isFunction from 'lodash.isfunction'
 import isString from 'lodash.isstring'
+import levelup from 'level'
 import sortBy from 'lodash.sortby'
 import startsWith from 'lodash.startswith'
+import sublevel from 'level-sublevel'
 import XoCollection from 'xo-collection'
 import XoUniqueIndex from 'xo-collection/unique-index'
 import {createClient as createRedisClient} from 'redis'
@@ -22,6 +24,7 @@ import {
 import * as xapiObjectsToXo from './xapi-objects-to-xo'
 import checkAuthorization from './acl'
 import Connection from './connection'
+import LevelDbLogger from './loggers/leveldb'
 import Xapi from './xapi'
 import XapiStats from './xapi-stats'
 import {Acls} from './models/acl'
@@ -130,9 +133,10 @@ export default class Xo extends EventEmitter {
     this._nextConId = 0
     this._connections = createRawObject()
 
-    this._authenticationProviders = new Set()
     this._authenticationFailures = createRawObject()
+    this._authenticationProviders = new Set()
     this._httpRequestWatchers = createRawObject()
+    this._leveldb = null // Initialized in start().
     this._plugins = createRawObject()
 
     this._watchObjects()
@@ -141,6 +145,14 @@ export default class Xo extends EventEmitter {
   // -----------------------------------------------------------------
 
   async start (config) {
+    await fs.mkdirp(config.datadir)
+
+    this._leveldb = sublevel(levelup(`${config.datadir}/leveldb`, {
+      valueEncoding: 'json'
+    }))
+
+    // ---------------------------------------------------------------
+
     // Connects to Redis.
     const redis = createRedisClient(config.redis && config.redis.uri)
 
@@ -189,6 +201,8 @@ export default class Xo extends EventEmitter {
       indexes: ['enabled']
     })
 
+    // ---------------------------------------------------------------
+
     // Proxies tokens/users related events to XO and removes tokens
     // when their related user is removed.
     this._tokens.on('remove', ids => {
@@ -206,6 +220,8 @@ export default class Xo extends EventEmitter {
       }
     }.bind(this))
 
+    // ---------------------------------------------------------------
+
     // Connects to existing servers.
     const servers = await this._servers.get()
     for (let server of servers) {
@@ -220,6 +236,14 @@ export default class Xo extends EventEmitter {
 
   // -----------------------------------------------------------------
 
+  getLogger (identifier) {
+    return new LevelDbLogger(
+      this._leveldb.sublevel('logs').sublevel(identifier)
+    )
+  }
+
+  // -----------------------------------------------------------------
+
   async _getAclsForUser (userId) {
     const subjects = (await this.getUser(userId)).groups.concat(userId)