Job scheduler feature

This commit is contained in:
Fabrice Marsaud 2015-06-10 17:25:37 +02:00
parent 39e3025077
commit 4a5f6d8393
14 changed files with 741 additions and 1 deletions

View File

@ -35,7 +35,9 @@
"bluebird": "^2.9.14",
"clarify": "^1.0.5",
"connect": "^3.3.5",
"cron": "^1.0.9",
"debug": "^2.1.3",
"escape-string-regexp": "^1.0.3",
"event-to-promise": "^0.3.2",
"exec-promise": "^0.5.1",
"fs-promise": "^0.3.1",
@ -68,6 +70,7 @@
"lodash.pick": "^3.0.0",
"lodash.result": "^3.0.0",
"lodash.snakecase": "^3.0.1",
"lodash.sortby": "^3.1.4",
"lodash.startswith": "^3.0.1",
"make-error": "^1",
"multikey-hash": "^1.0.1",
@ -86,6 +89,7 @@
},
"devDependencies": {
"babel-eslint": "^3.1.9",
"chai": "^3.0.0",
"dependency-check": "^2.4.0",
"gulp": "git://github.com/gulpjs/gulp#4.0",
"gulp-babel": "^5",
@ -93,6 +97,7 @@
"gulp-plumber": "^1.0.0",
"gulp-sourcemaps": "^1.5.1",
"gulp-watch": "^4.2.2",
"leche": "^2.1.1",
"mocha": "^2.2.1",
"must": "^0.12.0",
"node-inspector": "^0.10.1",

View File

@ -9,11 +9,14 @@ export * as disk from './disk'
export * as docker from './docker'
export * as group from './group'
export * as host from './host'
export * as job from './job'
export * as message from './message'
export * as pbd from './pbd'
export * as pif from './pif'
export * as pool from './pool'
export * as role from './role'
export * as schedule from './schedule'
export * as scheduler from './scheduler'
export * as server from './server'
export * as session from './session'
export * as sr from './sr'

103
src/api/job.js Normal file
View File

@ -0,0 +1,103 @@
// FIXME so far, no acls for jobs
export async function getAll () {
return await this.getAllJobs()
}
getAll.permission = 'admin'
getAll.description = 'Gets all available jobs'
export async function get (id) {
return await this.getJob(id)
}
get.permission = 'admin'
get.description = 'Gets an existing job'
get.params = {
id: {type: 'string'}
}
export async function create ({job}) {
return (await this.createJob(this.session.get('user_id'), job)).id
}
create.permission = 'admin'
create.description = 'Creates a new job from description object'
create.params = {
job: {
type: 'object',
properties: {
type: {type: 'string'},
key: {type: 'string'},
method: {type: 'string'},
paramsVector: {
type: 'object',
properties: {
type: {type: 'string'},
items: {
type: 'array',
items: {
type: 'object',
properties: {
type: {type: 'string'},
values: {
type: 'array',
items: {type: 'object'}
}
}
}
}
}
}
}
}
}
export async function set ({job}) {
await this.updateJob(job)
}
set.permission = 'admin'
set.description = 'Modifies an existing job from a description object'
set.params = {
job: {
type: 'object',
properties: {
id: {type: 'string'},
type: {type: 'string'},
key: {type: 'string'},
method: {type: 'string'},
paramsVector: {
type: 'object',
properties: {
type: {type: 'string'},
items: {
type: 'array',
items: {
type: 'object',
properties: {
type: {type: 'string'},
values: {
type: 'array',
items: {type: 'object'}
}
}
}
}
}
}
}
}
}
async function delete_ ({id}) {
await this.removeJob(id)
}
delete_.permission = 'admin'
delete_.description = 'Deletes an existing job'
delete_.params = {
id: {type: 'string'}
}
export {delete_ as delete}

55
src/api/schedule.js Normal file
View File

@ -0,0 +1,55 @@
// FIXME so far, no acls for schedules
export async function getAll () {
return await this.getAllSchedules()
}
getAll.permission = 'admin'
getAll.description = 'Gets all existing schedules'
export async function get (id) {
return await this.getSchedule(id)
}
get.permission = 'admin'
get.description = 'Gets an existing schedule'
get.params = {
id: {type: 'string'}
}
export async function create ({jobId, cron, enabled}) {
return await this.createSchedule(this.session.get('user_id'), {job: jobId, cron, enabled})
}
create.permission = 'admin'
create.description = 'Creates a new schedule'
create.params = {
jobId: {type: 'string'},
cron: {type: 'string'},
enabled: {type: 'boolean', optional: true}
}
export async function set ({id, jobId, cron, enabled}) {
await this.updateSchedule(id, {job: jobId, cron, enabled})
}
set.permission = 'admin'
set.description = 'Modifies an existing schedule'
set.params = {
id: {type: 'string'},
jobId: {type: 'string', optional: true},
cron: {type: 'string', optional: true},
enabled: {type: 'boolean', optional: true}
}
async function delete_ ({id}) {
await this.removeSchedule(id)
}
delete_.permission = 'admin'
delete_.description = 'Deletes an existing schedule'
delete_.params = {
id: {type: 'string'}
}
export {delete_ as delete}

30
src/api/scheduler.js Normal file
View File

@ -0,0 +1,30 @@
export async function enable ({id}) {
const schedule = await this.getSchedule(id)
schedule.enabled = true
await this.updateSchedule(id, schedule)
}
enable.permission = 'admin'
enable.description = 'Enables a schedule to run it\'s job as scheduled'
enable.params = {
id: {type: 'string'}
}
export async function disable ({id}) {
const schedule = await this.getSchedule(id)
schedule.enabled = false
await this.updateSchedule(id, schedule)
}
disable.permission = 'admin'
disable.description = 'Disables a schedule'
disable.params = {
id: {type: 'string'}
}
export function getScheduleTable () {
return this.scheduler.scheduleTable
}
disable.permission = 'admin'
disable.description = 'Get a map of existing schedules enabled/disabled state'

View File

@ -471,6 +471,26 @@ exports.snapshot = snapshot
#---------------------------------------------------------------------
rollingSnapshot = $coroutine ({vm, tag, depth}) ->
snapshot = yield @getXAPI(vm).rollingSnapshotVm(vm.ref, tag, depth)
return snapshot.$id
rollingSnapshot.params = {
id: { type: 'string'}
tag: {type: 'string'}
depth: {type: 'number'}
}
rollingSnapshot.resolve = {
vm: ['id', 'VM', 'administrate']
}
rollingSnapshot.description = 'Snaphots a VM with a tagged name, and removes the oldest snapshot with the same tag according to depth'
exports.rollingSnapshot = rollingSnapshot
#---------------------------------------------------------------------
start = $coroutine ({vm}) ->
yield @getXAPI(vm).call(
'VM.start', vm.ref

View File

@ -29,6 +29,8 @@ import {readFile} from 'fs-promise'
import * as apiMethods from './api/index'
import Api from './api'
import JobExecutor from './job-executor'
import Scheduler from './scheduler'
import WebServer from 'http-server-plus'
import wsProxy from './ws-proxy'
import Xo from './xo'
@ -285,7 +287,18 @@ const setUpApi = (webServer, xo) => {
socket.send(data, onSend)
}
})
})
return api
}
const setUpScheduler = (api, xo) => {
const jobExecutor = new JobExecutor(xo, api)
const scheduler = new Scheduler(xo, {executor: jobExecutor})
xo.scheduler = scheduler
return scheduler
}
// ===================================================================
@ -435,7 +448,9 @@ export default async function main (args) {
connect.use(bind(xo._handleProxyRequest, xo))
// Must be set up before the static files.
setUpApi(webServer, xo)
const api = setUpApi(webServer, xo)
const scheduler = setUpScheduler(api, xo)
setUpProxies(connect, config.http.proxies)
@ -455,10 +470,12 @@ export default async function main (args) {
// responsability?)
process.on('SIGINT', () => {
debug('SIGINT caught, closing web server…')
scheduler.disableAll()
webServer.close()
})
process.on('SIGTERM', () => {
debug('SIGTERM caught, closing web server…')
scheduler.disableAll()
webServer.close()
})

72
src/job-executor.js Normal file
View File

@ -0,0 +1,72 @@
import assign from 'lodash.assign'
import forEach from 'lodash.foreach'
import {BaseError} from 'make-error'
export class JobExecutorError extends BaseError {}
export class UnsupportedJobType extends JobExecutorError {
constructor (job) {
super('Unknown job type: ' + job.type)
}
}
export class UnsupportedVectorType extends JobExecutorError {
constructor (vector) {
super('Unknown vector type: ' + vector.type)
}
}
export const productParams = (...args) => {
let product = Object.create(null)
assign(product, ...args)
return product
}
export default class JobExecutor {
constructor (xo, api) {
this.xo = xo
this.api = api
this._extractValueCb = {
'set': items => items.values
}
}
exec (job) {
if (job.type === 'call') {
this._execCall(job.userId, job.method, job.paramsVector)
} else {
throw new UnsupportedJobType(job)
}
}
_execCall (userId, method, paramsVector) {
let paramsFlatVector
if (paramsVector.type === 'crossProduct') {
paramsFlatVector = this._computeCrossProduct(paramsVector.items, productParams, this._extractValueCb)
} else {
throw new UnsupportedVectorType(paramsVector)
}
const connection = this.xo.createUserConnection()
connection.set('user_id', userId)
forEach(paramsFlatVector, params => {
this.api.call(connection, method, assign({}, params))
})
connection.close()
}
_computeCrossProduct (items, productCb, extractValueMap = {}) {
const upstreamValues = []
const itemsCopy = items.slice()
const item = itemsCopy.pop()
const values = extractValueMap[item.type] && extractValueMap[item.type](item) || item
forEach(values, value => {
if (itemsCopy.length) {
let downstreamValues = this._computeCrossProduct(itemsCopy, productCb, extractValueMap)
forEach(downstreamValues, downstreamValue => {
upstreamValues.push(productCb(value, downstreamValue))
})
} else {
upstreamValues.push(value)
}
})
return upstreamValues
}
}

70
src/job-executor.spec.js Normal file
View File

@ -0,0 +1,70 @@
/* eslint-env mocha */
import {expect} from 'chai'
import leche from 'leche'
import {productParams} from './job-executor'
import JobExecutor from './job-executor'
describe('productParams', function () {
leche.withData({
'Two sets of one': [
{a: 1, b: 2}, {a: 1}, {b: 2}
],
'Two sets of two': [
{a: 1, b: 2, c: 3, d: 4}, {a: 1, b: 2}, {c: 3, d: 4}
],
'Three sets': [
{a: 1, b: 2, c: 3, d: 4, e: 5, f: 6}, {a: 1}, {b: 2, c: 3}, {d: 4, e: 5, f: 6}
],
'One set': [
{a: 1, b: 2}, {a: 1, b: 2}
],
'Empty set': [
{a: 1}, {a: 1}, {}
],
'All empty': [
{}, {}, {}
],
'No set': [
{}
]
}, function (resultSet, ...sets) {
it('Assembles all given param sets in on set', function () {
expect(productParams(...sets)).to.eql(resultSet)
})
})
})
describe('JobExecutor._computeCrossProduct', function () {
const jobExecutor = new JobExecutor({})
// Gives the sum of all args
const addTest = (...args) => args.reduce((prev, curr) => prev + curr, 0)
// Gives the product of all args
const multiplyTest = (...args) => args.reduce((prev, curr) => prev * curr, 1)
leche.withData({
'2 sets of 2 items to multiply': [
[10, 14, 15, 21], [[2, 3], [5, 7]], multiplyTest
],
'3 sets of 2 items to multiply': [
[110, 130, 154, 182, 165, 195, 231, 273], [[2, 3], [5, 7], [11, 13]], multiplyTest
],
'2 sets of 3 items to multiply': [
[14, 22, 26, 21, 33, 39, 35, 55, 65], [[2, 3, 5], [7, 11, 13]], multiplyTest
],
'2 sets of 2 items to add': [
[7, 9, 8, 10], [[2, 3], [5, 7]], addTest
],
'3 sets of 2 items to add': [
[18, 20, 20, 22, 19, 21, 21, 23], [[2, 3], [5, 7], [11, 13]], addTest
],
'2 sets of 3 items to add': [
[9, 13, 15, 10, 14, 16, 12, 16, 18], [[2, 3, 5], [7, 11, 13]], addTest
]
}, function (product, items, cb) {
it('Crosses sets of values with a crossProduct callback', function () {
expect(jobExecutor._computeCrossProduct(items, cb)).to.have.members(product)
})
})
})

48
src/models/job.js Normal file
View File

@ -0,0 +1,48 @@
import forEach from 'lodash.foreach'
import Collection from '../collection/redis'
import Model from '../model'
// ===================================================================
export default class Job extends Model {}
export class Jobs extends Collection {
get Model () {
return Job
}
get idPrefix () {
return 'job:'
}
async create (userId, job) {
job.userId = userId
// Serializes.
job.paramsVector = JSON.stringify(job.paramsVector)
return await this.add(new Job(job))
}
async save (job) {
// Serializes.
job.paramsVector = JSON.stringify(job.paramsVector)
return await this.update(job)
}
async get (properties) {
const jobs = await super.get(properties)
// Deserializes.
forEach(jobs, job => {
const {paramsVector} = job
try {
job.paramsVector = JSON.parse(paramsVector)
} catch (error) {
console.warn('cannot parse job.paramsVector:', paramsVector) // FIXME this is a warning as I copy/paste acl.js, but...
job.paramsVector = {}
}
})
return jobs
}
}

36
src/models/schedule.js Normal file
View File

@ -0,0 +1,36 @@
import Collection from '../collection/redis'
import forEach from 'lodash.foreach'
import Model from '../model'
// ===================================================================
export default class Schedule extends Model {}
export class Schedules extends Collection {
get Model () {
return Schedule
}
get idPrefix () {
return 'schedule:'
}
create (userId, job, cron, enabled) {
return this.add(new Schedule({
userId,
job,
cron,
enabled
}))
}
async save (schedule) {
return await this.update(schedule)
}
async get (properties) {
const schedules = await super.get(properties)
forEach(schedules, schedule => schedule.enabled = (schedule.enabled === 'true'))
return schedules
}
}

159
src/scheduler.js Normal file
View File

@ -0,0 +1,159 @@
import forEach from 'lodash.foreach'
import {BaseError} from 'make-error'
import {CronJob} from 'cron'
const _resolveId = scheduleOrId => scheduleOrId.id || scheduleOrId
export class SchedulerError extends BaseError {}
export class ScheduleOverride extends SchedulerError {
constructor (scheduleOrId) {
super('Schedule ID ' + _resolveId(scheduleOrId) + ' is already added')
}
}
export class NoSuchSchedule extends SchedulerError {
constructor (scheduleOrId) {
super('No schedule found for ID ' + _resolveId(scheduleOrId))
}
}
export class ScheduleNotEnabled extends SchedulerError {
constructor (scheduleOrId) {
super('Schedule ' + _resolveId(scheduleOrId)) + ' is not enabled'
}
}
export class ScheduleAlreadyEnabled extends SchedulerError {
constructor (scheduleOrId) {
super('Schedule ' + _resolveId(scheduleOrId) + ' is already enabled')
}
}
export class ScheduleJobNotFound extends SchedulerError {
constructor (jobId, scheduleId) {
super('Job ' + jobId + ' not found for Schedule ' + scheduleId)
}
}
export default class Scheduler {
constructor (xo, {executor}) {
this.executor = executor
this.xo = xo
this._scheduleTable = undefined
this._loadSchedules()
}
async _loadSchedules () {
this._schedules = {}
const schedules = await this.xo.getAllSchedules()
this._scheduleTable = {}
this._cronJobs = {}
forEach(schedules, schedule => {
this._add(schedule)
})
}
add (schedule) {
if (this.exists(schedule)) {
throw new ScheduleOverride(schedule)
}
this._add(schedule)
}
_add (schedule) {
const id = _resolveId(schedule)
this._schedules[id] = schedule
this._scheduleTable[id] = false
if (schedule.enabled) {
this._enable(schedule)
}
}
remove (id) {
try {
this._disable(id)
} catch (exc) {
if (!exc instanceof SchedulerError) {
throw exc
}
} finally {
delete this._schedules[id]
delete this._scheduleTable[id]
}
}
exists (scheduleOrId) {
const id_ = _resolveId(scheduleOrId)
return id_ in this._schedules
}
async get (id) {
if (!this.exists(id)) {
throw new NoSuchSchedule(id)
}
return this._schedules[id]
}
async _get (id) {
const schedule = await this.xo.getSchedule(id)
if (!schedule) {
throw new NoSuchSchedule(id)
}
return schedule
}
async update (schedule) {
if (!this.exists(schedule)) {
throw new NoSuchSchedule(schedule)
}
const enabled = this.isEnabled(schedule)
if (enabled) {
await this._disable(schedule)
}
this._add(schedule)
}
isEnabled (scheduleOrId) {
return this._scheduleTable[_resolveId(scheduleOrId)]
}
_enable (schedule) {
const jobId = schedule.job
const cronJob = new CronJob(schedule.cron, async () => {
const job = await this._getJob(jobId, schedule.id)
this.executor.exec(job)
})
this._cronJobs[schedule.id] = cronJob
cronJob.start()
this._scheduleTable[schedule.id] = true
}
async _getJob (id, scheduleId) {
const job = await this.xo.getJob(id)
if (!job) {
throw new ScheduleJobNotFound(id, scheduleId)
}
return job
}
_disable (scheduleOrId) {
if (!this.exists(scheduleOrId)) {
throw new NoSuchSchedule(scheduleOrId)
}
if (!this.isEnabled(scheduleOrId)) {
throw new ScheduleNotEnabled(scheduleOrId)
}
const id = _resolveId(scheduleOrId)
this._cronJobs[id].stop()
delete this._cronJobs[id]
this._scheduleTable[id] = false
}
disableAll () {
forEach(this.scheduleTable, (enabled, id) => {
if (enabled) {
this._disable(id)
}
})
}
get scheduleTable () {
return this._scheduleTable
}
}

View File

@ -1,11 +1,14 @@
import createDebug from 'debug'
import escapeStringRegexp from 'escape-string-regexp'
import eventToPromise from 'event-to-promise'
import filter from 'lodash.filter'
import find from 'lodash.find'
import forEach from 'lodash.foreach'
import got from 'got'
import includes from 'lodash.includes'
import map from 'lodash.map'
import snakeCase from 'lodash.snakecase'
import sortBy from 'lodash.sortby'
import unzip from 'julien-f-unzip'
import {PassThrough} from 'stream'
import {promisify} from 'bluebird'
@ -645,6 +648,24 @@ export default class Xapi extends XapiBase {
// =================================================================
async rollingSnapshotVm (vmId, tag, depth) {
const vm = this.getObject(vmId)
const reg = new RegExp('^rollingSnapshot_[^_]+_' + escapeStringRegexp(tag))
const snapshots = sortBy(filter(vm.$snapshots, snapshot => reg.test(snapshot.name_label)), 'name_label')
const date = new Date().toISOString()
const ref = await this._snapshotVm(vm, `rollingSnapshot_${date}_${tag}_${vm.name_label}`)
const promises = []
for (let surplus = snapshots.length - (depth - 1); surplus > 0; surplus--) {
const oldSnap = snapshots.shift()
promises.push(this.deleteVm(oldSnap.uuid, true))
}
await Promise.all(promises)
return await this._getOrWaitObject(ref)
}
async _createVbd (vm, vdi, {
bootable = false,
position = undefined,

101
src/xo.js
View File

@ -21,10 +21,12 @@ import {Acls} from './models/acl'
import {autobind} from './decorators'
import {generateToken} from './utils'
import {Groups} from './models/group'
import {Jobs} from './models/job'
import {JsonRpcError, NoSuchObject} from './api-errors'
import {ModelAlreadyExists} from './collection'
import {Servers} from './models/server'
import {Tokens} from './models/token'
import {Schedules} from './models/schedule'
// ===================================================================
@ -52,6 +54,18 @@ class NoSuchXenServer extends NoSuchObject {
}
}
class NoSuchSchedule extends NoSuchObject {
constructor (id) {
super(id, 'schedule')
}
}
class NoSuchJob extends NoSuchObject {
constructor (id) {
super(id, 'job')
}
}
// ===================================================================
export default class Xo extends EventEmitter {
@ -119,6 +133,16 @@ export default class Xo extends EventEmitter {
prefix: 'xo:user',
indexes: ['email']
})
this._jobs = new Jobs({
connection: redis,
prefix: 'xo:job',
indexes: ['user_id', 'key']
})
this._schedules = new Schedules({
connection: redis,
prefix: 'xo:schedule',
indexes: ['user_id', 'job']
})
// Proxies tokens/users related events to XO and removes tokens
// when their related user is removed.
@ -431,6 +455,83 @@ export default class Xo extends EventEmitter {
// -----------------------------------------------------------------
async getAllJobs () {
return await this._jobs.get()
}
async getJob (id) {
const job = await this._jobs.first(id)
if (!job) {
throw new NoSuchJob(id)
}
return job.properties
}
async createJob (userId, job) {
// TODO: use plain objects
const job_ = await this._jobs.create(userId, job)
return job_.properties
}
async updateJob (job) {
return await this._jobs.save(job)
}
async removeJob (id) {
return await this._jobs.remove(id)
}
// -----------------------------------------------------------------
async _getSchedule (id) {
const schedule = await this._schedules.first(id)
if (!schedule) {
throw new NoSuchSchedule(id)
}
return schedule
}
async getSchedule (id) {
return (await this._getSchedule(id)).properties
}
async getAllSchedules () {
return await this._schedules.get()
}
async createSchedule (userId, {job, cron, enabled}) {
const schedule_ = await this._schedules.create(userId, job, cron, enabled)
const schedule = schedule_.properties
if (this.scheduler) {
this.scheduler.add(schedule)
}
return schedule
}
async updateSchedule (id, {job, cron, enabled}) {
const schedule = await this._getSchedule(id)
if (job) schedule.set('job', job)
if (cron) schedule.set('cron', cron)
if (enabled !== undefined) schedule.set('enabled', enabled)
await this._schedules.save(schedule)
if (this.scheduler) {
this.scheduler.update(schedule.properties)
}
}
async removeSchedule (id) {
await this._schedules.remove(id)
if (this.scheduler) {
this.scheduler.remove(id)
}
}
// -----------------------------------------------------------------
async createAuthenticationToken ({userId}) {
// TODO: use plain objects
const token = await this._tokens.generate(userId)