Merge pull request #115 from vatesfr/abhamonr-logger-module

Logger is implemented.
Fabrice Marsaud 2015-11-12 10:16:04 +01:00
commit aa81e72e45
15 changed files with 364 additions and 45 deletions

View File

@@ -61,6 +61,8 @@
"julien-f-source-map-support": "0.0.0",
"julien-f-unzip": "^0.2.1",
"kindof": "^2.0.0",
"level": "^1.3.0",
"level-sublevel": "^6.5.2",
"lodash.assign": "^3.0.0",
"lodash.bind": "^3.0.0",
"lodash.difference": "^3.2.0",

View File

@@ -109,3 +109,9 @@ redis:
#
# Default: tcp://localhost:6379
#uri: ''
# Directory containing the database of XO.
# Currently used for logs.
#
# Default: '/var/lib/xo-server/data'
#datadir: '/var/lib/xo-server/data'

View File

@@ -10,6 +10,7 @@ export * as docker from './docker'
export * as group from './group'
export * as host from './host'
export * as job from './job'
export * as log from './log'
export * as message from './message'
export * as pbd from './pbd'
export * as pif from './pif'

18
src/api/log.js Normal file
View File

@@ -0,0 +1,18 @@
export async function get ({namespace}) {
const logger = this.getLogger(namespace)
return new Promise((resolve, reject) => {
const logs = {}
logger.createReadStream()
.on('data', (data) => {
logs[data.key] = data.value
})
.on('end', () => {
resolve(logs)
})
.on('error', reject)
})
}
get.description = 'returns the list of logs for one namespace'
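
For reference, the promise resolves to a plain object keyed by the LevelDB keys the logger generates. A minimal, hypothetical consumer sketch (the call parameter stands for any JSON-RPC client able to reach xo-server; it is not part of this diff):

    // Hypothetical usage; call(method, params) is assumed to perform the API request.
    async function printJobLogs (call) {
      const logs = await call('log.get', { namespace: 'jobs' })
      // logs ~ { '1447319764004:0': { level: 'notice', message: '...', data: { ... }, time: 1447319764004 } }
      console.log(`${Object.keys(logs).length} log entries`)
    }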

View File

@@ -65,7 +65,8 @@ const DEFAULTS = {
{ port: 80 }
],
mounts: {}
}
},
datadir: '/var/lib/xo-server/data'
}
const DEPRECATED_ENTRIES = [
@@ -573,11 +574,7 @@ export default async function main (args) {
// Create the main object which will connects to Xen servers and
// manages all the models.
const xo = new Xo()
await xo.start({
redis: {
uri: config.redis && config.redis.uri
}
})
await xo.start(config)
// Loads default authentication providers.
registerPasswordAuthenticationProvider(xo)
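
With this change xo.start() receives the whole parsed configuration rather than only the Redis settings. Roughly (a sketch based on the defaults above; other keys are elided):

    // Illustrative shape of what start() now receives.
    await xo.start({
      // ...other parsed settings (HTTP listen/mounts, etc.)...
      redis: { uri: 'tcp://localhost:6379' },    // documented default
      datadir: '/var/lib/xo-server/data'         // new: where Xo#start() opens the LevelDB log store
    })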

View File

@@ -24,6 +24,24 @@ export const productParams = (...args) => {
return product
}
export function _computeCrossProduct (items, productCb, extractValueMap = {}) {
const upstreamValues = []
const itemsCopy = items.slice()
const item = itemsCopy.pop()
const values = extractValueMap[item.type] && extractValueMap[item.type](item) || item
forEach(values, value => {
if (itemsCopy.length) {
let downstreamValues = _computeCrossProduct(itemsCopy, productCb, extractValueMap)
forEach(downstreamValues, downstreamValue => {
upstreamValues.push(productCb(value, downstreamValue))
})
} else {
upstreamValues.push(value)
}
})
return upstreamValues
}
export default class JobExecutor {
constructor (xo, api) {
this.xo = xo
@@ -31,46 +49,83 @@
this._extractValueCb = {
'set': items => items.values
}
this._logger = this.xo.getLogger('jobs')
}
exec (job) {
if (job.type === 'call') {
this._execCall(job.userId, job.method, job.paramsVector)
} else {
throw new UnsupportedJobType(job)
}
}
_execCall (userId, method, paramsVector) {
let paramsFlatVector
if (paramsVector.type === 'crossProduct') {
paramsFlatVector = this._computeCrossProduct(paramsVector.items, productParams, this._extractValueCb)
} else {
throw new UnsupportedVectorType(paramsVector)
}
const connection = this.xo.createUserConnection()
connection.set('user_id', userId)
forEach(paramsFlatVector, params => {
this.api.call(connection, method, { ...params })
async exec (job) {
const runJobId = this._logger.notice(`Starting execution of ${job.id}.`, {
event: 'job.start',
userId: job.userId,
jobId: job.id,
key: job.key
})
connection.close()
}
_computeCrossProduct (items, productCb, extractValueMap = {}) {
const upstreamValues = []
const itemsCopy = items.slice()
const item = itemsCopy.pop()
const values = extractValueMap[item.type] && extractValueMap[item.type](item) || item
forEach(values, value => {
if (itemsCopy.length) {
let downstreamValues = this._computeCrossProduct(itemsCopy, productCb, extractValueMap)
forEach(downstreamValues, downstreamValue => {
upstreamValues.push(productCb(value, downstreamValue))
})
try {
if (job.type === 'call') {
await this._execCall(job, runJobId)
} else {
upstreamValues.push(value)
throw new UnsupportedJobType(job)
}
this._logger.notice(`Execution terminated for ${job.id}.`, {
event: 'job.end',
runJobId
})
} catch (e) {
this._logger.error(`The execution of ${job.id} has failed.`, {
event: 'job.end',
runJobId,
error: e
})
}
}
async _execCall (job, runJobId) {
let paramsFlatVector
if (job.paramsVector.type === 'crossProduct') {
paramsFlatVector = _computeCrossProduct(job.paramsVector.items, productParams, this._extractValueCb)
} else {
throw new UnsupportedVectorType(job.paramsVector)
}
const connection = this.xo.createUserConnection()
const promises = []
connection.set('user_id', job.userId)
forEach(paramsFlatVector, params => {
const runCallId = this._logger.notice(`Starting ${job.method} call. (${job.id})`, {
event: 'jobCall.start',
runJobId,
method: job.method,
params
})
promises.push(
this.api.call(connection, job.method, assign({}, params)).then(
value => {
this._logger.notice(`Call ${job.method} (${runCallId}) is a success. (${job.id})`, {
event: 'jobCall.end',
runJobId,
runCallId,
returnedValue: value
})
},
reason => {
this._logger.notice(`Call ${job.method} (${runCallId}) has failed. (${job.id})`, {
event: 'jobCall.end',
runJobId,
runCallId,
error: reason
})
}
)
)
})
return upstreamValues
connection.close()
await Promise.all(promises)
}
}
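
For orientation, one successful run of a 'call' job that invokes a single method writes four linked records to the 'jobs' namespace. The data payloads look roughly like this (every id, key and value below is invented for illustration; each record also carries level, message and time via LevelDbLogger):

    const exampleRunData = [
      { event: 'job.start', userId: 'b7', jobId: 'myJob', key: 'nightly' },   // the key of this entry becomes runJobId
      { event: 'jobCall.start', runJobId: '1447319764004:0', method: 'vm.snapshot', params: { id: 'vm1' } },
      { event: 'jobCall.end', runJobId: '1447319764004:0', runCallId: '1447319764004:1', returnedValue: 'new-snapshot-id' },
      { event: 'job.end', runJobId: '1447319764004:0' }
    ]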

View File

@@ -4,7 +4,7 @@ import {expect} from 'chai'
import leche from 'leche'
import {productParams} from './job-executor'
import JobExecutor from './job-executor'
import {_computeCrossProduct} from './job-executor'
describe('productParams', function () {
leche.withData({
@@ -36,8 +36,7 @@ })
})
})
describe('JobExecutor._computeCrossProduct', function () {
const jobExecutor = new JobExecutor({})
describe('_computeCrossProduct', function () {
// Gives the sum of all args
const addTest = (...args) => args.reduce((prev, curr) => prev + curr, 0)
// Gives the product of all args
@@ -64,7 +63,7 @@ describe('JobExecutor._computeCrossProduct', function () {
]
}, function (product, items, cb) {
it('Crosses sets of values with a crossProduct callback', function () {
expect(jobExecutor._computeCrossProduct(items, cb)).to.have.members(product)
expect(_computeCrossProduct(items, cb)).to.have.members(product)
})
})
})
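
A worked example of the now standalone helper, in the same style as the spec's data (a sketch, not part of the committed tests): each value of the last item is combined with every value produced by the remaining items, so two two-element arrays yield four results.

    expect(_computeCrossProduct([[1, 2], [3, 4]], (a, b) => a + b))
      .to.have.members([4, 5, 5, 6])   // 3+1, 3+2, 4+1, 4+2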

53
src/loggers/leveldb.js Normal file
View File

@@ -0,0 +1,53 @@
// See: https://en.wikipedia.org/wiki/Syslog#Severity_level
const LEVELS = [
'emergency',
'alert',
'critical',
'error',
'warning',
'notice',
'informational',
'debug'
]
let lastDate = 0
let lastId = 0
function generateUniqueKey (date) {
lastId = (date === lastDate) ? (lastId + 1) : 0
lastDate = date
return `${lastDate}:${lastId}`
}
export default class LevelDbLogger {
constructor (db) {
this._db = db
}
_add (level, message, data) {
const log = {
level,
message,
data,
time: Date.now()
}
const key = generateUniqueKey(log.time)
this._db.put(key, log)
return key
}
createReadStream () {
return this._db.createReadStream()
}
}
// Create high level log methods.
for (const level of LEVELS) {
Object.defineProperty(LevelDbLogger.prototype, level, {
value (message, data) {
return this._add(level, message, data)
}
})
}
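
A minimal standalone usage sketch, wired the same way xo.js does later in this PR (the database path and namespace are examples):

    import levelup from 'level'
    import sublevel from 'level-sublevel'
    import LevelDbLogger from './loggers/leveldb'

    // JSON-encoded LevelDB scoped to a 'logs'/'jobs' sublevel, as Xo#getLogger() does.
    const db = sublevel(levelup('/tmp/xo-logs-example', { valueEncoding: 'json' }))
    const logger = new LevelDbLogger(db.sublevel('logs').sublevel('jobs'))

    // Every generated level method (emergency ... debug) returns the key of the stored record.
    const key = logger.notice('Starting execution of myJob.', { event: 'job.start' })
    console.log('stored under', key)

    // Entries are streamed back as { key, value } objects.
    logger.createReadStream().on('data', entry => console.log(entry.key, entry.value))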

31
src/schemas/log.js Normal file
View File

@@ -0,0 +1,31 @@
export default {
$schema: 'http://json-schema.org/draft-04/schema#',
type: 'object',
properties: {
id: {
type: 'string',
description: 'unique identifier for this log'
},
time: {
type: 'string',
description: 'timestamp (in milliseconds) of this log'
},
message: {
type: 'string',
description: 'human readable (short) description of this log'
},
data: {
oneOf: [
{ '$ref': 'log/jobStart.js' },
{ '$ref': 'log/jobEnd.js' },
{ '$ref': 'log/jobCallStart.js' },
{ '$ref': 'log/jobCallEnd.js' }
]
}
},
required: [
'id',
'time',
'message'
]
}
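
An example record that validates against this schema (all values invented); the data member here follows the log/jobStart sub-schema referenced above:

    const exampleLog = {
      id: '1447319764004:0',                   // the LevelDB key
      time: '1447319764004',                   // timestamp, as a string per this schema
      message: 'Starting execution of myJob.',
      data: { event: 'job.start', userId: 'b7', jobId: 'myJob', key: 'nightly' }
    }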

View File

@@ -0,0 +1,33 @@
export default {
$schema: 'http://json-schema.org/draft-04/schema#',
type: 'object',
properties: {
event: {
enum: ['jobCall.end']
},
runJobId: {
type: 'string',
description: 'instance id of this job'
},
runCallId: {
type: 'string',
description: 'instance id of this call'
},
error: {
type: 'object',
description: 'describes the failure; exists if the call has failed'
},
returnedValue: {
description: 'the call\'s result; exists if the call succeeded'
}
},
required: [
'event',
'runJobId',
'runCallId'
],
oneOf: [
{ required: ['error'] },
{ required: ['returnedValue'] }
]
}
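
Because of the trailing oneOf, exactly one of error / returnedValue is present. Two illustrative data payloads (values invented):

    const success = { event: 'jobCall.end', runJobId: '1447319764004:0', runCallId: '1447319764004:1', returnedValue: 'new-snapshot-id' }
    const failure = { event: 'jobCall.end', runJobId: '1447319764004:0', runCallId: '1447319764004:2', error: { message: 'no such object' } }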

View File

@@ -0,0 +1,27 @@
export default {
$schema: 'http://json-schema.org/draft-04/schema#',
type: 'object',
properties: {
event: {
enum: ['jobCall.start']
},
runJobId: {
type: 'string',
description: 'instance id of this job'
},
method: {
type: 'string',
description: 'method linked to this call'
},
params: {
type: 'object',
description: 'params of the called method'
}
},
required: [
'event',
'runJobId',
'method',
'params'
]
}

21
src/schemas/log/jobEnd.js Normal file
View File

@@ -0,0 +1,21 @@
export default {
$schema: 'http://json-schema.org/draft-04/schema#',
type: 'object',
properties: {
event: {
enum: ['job.end']
},
runJobId: {
type: 'string',
description: 'instance id of this job'
},
error: {
type: 'object',
description: 'describes the failure; exists if no call has been made'
}
},
required: [
'event',
'runJobId'
]
}

View File

@@ -0,0 +1,26 @@
export default {
$schema: 'http://json-schema.org/draft-04/schema#',
type: 'object',
properties: {
event: {
enum: ['job.start']
},
userId: {
type: 'string',
description: 'user who executes this job'
},
jobId: {
type: 'string',
description: 'identifier of this job'
},
key: {
type: 'string'
}
},
required: [
'event',
'userId',
'jobId',
'key'
]
}

View File

@@ -122,6 +122,32 @@ describe('generateToken()', () => {
// -------------------------------------------------------------------
describe('pSettle()', () => {
it('makes an array of PromiseInspection', async () => {
const [
status1,
status2
] = await pSettle([
Promise.resolve(42),
Promise.reject('fatality')
])
expect(status1.isRejected()).to.equal(false)
expect(status2.isRejected()).to.equal(true)
expect(status1.isFulfilled()).to.equal(true)
expect(status2.isFulfilled()).to.equal(false)
expect(status1.value()).to.equal(42)
expect(::status2.value).to.throw()
expect(::status1.reason).to.throw()
expect(status2.reason()).to.equal('fatality')
})
})
// -------------------------------------------------------------------
describe('parseSize()', function () {
it('parses a human size', function () {
expect(parseSize('1G')).to.equal(1e9)

View File

@@ -8,8 +8,10 @@ import fs from 'fs-promise'
import includes from 'lodash.includes'
import isFunction from 'lodash.isfunction'
import isString from 'lodash.isstring'
import levelup from 'level'
import sortBy from 'lodash.sortby'
import startsWith from 'lodash.startswith'
import sublevel from 'level-sublevel'
import XoCollection from 'xo-collection'
import XoUniqueIndex from 'xo-collection/unique-index'
import {createClient as createRedisClient} from 'redis'
@@ -22,6 +24,7 @@ import {
import * as xapiObjectsToXo from './xapi-objects-to-xo'
import checkAuthorization from './acl'
import Connection from './connection'
import LevelDbLogger from './loggers/leveldb'
import Xapi from './xapi'
import XapiStats from './xapi-stats'
import {Acls} from './models/acl'
@@ -130,9 +133,10 @@ export default class Xo extends EventEmitter {
this._nextConId = 0
this._connections = createRawObject()
this._authenticationProviders = new Set()
this._authenticationFailures = createRawObject()
this._authenticationProviders = new Set()
this._httpRequestWatchers = createRawObject()
this._leveldb = null // Initialized in start().
this._plugins = createRawObject()
this._watchObjects()
@@ -141,6 +145,14 @@
// -----------------------------------------------------------------
async start (config) {
await fs.mkdirp(config.datadir)
this._leveldb = sublevel(levelup(`${config.datadir}/leveldb`, {
valueEncoding: 'json'
}))
// ---------------------------------------------------------------
// Connects to Redis.
const redis = createRedisClient(config.redis && config.redis.uri)
@@ -189,6 +201,8 @@
indexes: ['enabled']
})
// ---------------------------------------------------------------
// Proxies tokens/users related events to XO and removes tokens
// when their related user is removed.
this._tokens.on('remove', ids => {
@@ -206,6 +220,8 @@
}
}.bind(this))
// ---------------------------------------------------------------
// Connects to existing servers.
const servers = await this._servers.get()
for (let server of servers) {
@@ -220,6 +236,14 @@
// -----------------------------------------------------------------
getLogger (identifier) {
return new LevelDbLogger(
this._leveldb.sublevel('logs').sublevel(identifier)
)
}
// -----------------------------------------------------------------
async _getAclsForUser (userId) {
const subjects = (await this.getUser(userId)).groups.concat(userId)
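
Any component holding the Xo instance can obtain a namespaced logger through getLogger(). A short sketch (the 'backups' namespace and all values are examples, not part of this diff):

    // xo is the Xo instance created in index.js.
    function recordBackupRun (xo) {
      const logger = xo.getLogger('backups')   // stored under the 'logs'/'backups' sublevel
      const runId = logger.notice('Backup started.', { event: 'job.start', userId: 'b7', jobId: 'nightly', key: 'vms' })
      logger.notice('Backup finished.', { event: 'job.end', runJobId: runId })
    }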