From 4a5f6d83935972e5a7230da9f24e97e6becd7154 Mon Sep 17 00:00:00 2001 From: Fabrice Marsaud Date: Wed, 10 Jun 2015 17:25:37 +0200 Subject: [PATCH] Job scheduler feature --- package.json | 5 ++ src/api/index.js | 3 + src/api/job.js | 103 +++++++++++++++++++++++++ src/api/schedule.js | 55 ++++++++++++++ src/api/scheduler.js | 30 ++++++++ src/api/vm.coffee | 20 +++++ src/index.js | 19 ++++- src/job-executor.js | 72 ++++++++++++++++++ src/job-executor.spec.js | 70 +++++++++++++++++ src/models/job.js | 48 ++++++++++++ src/models/schedule.js | 36 +++++++++ src/scheduler.js | 159 +++++++++++++++++++++++++++++++++++++++ src/xapi.js | 21 ++++++ src/xo.js | 101 +++++++++++++++++++++++++ 14 files changed, 741 insertions(+), 1 deletion(-) create mode 100644 src/api/job.js create mode 100644 src/api/schedule.js create mode 100644 src/api/scheduler.js create mode 100644 src/job-executor.js create mode 100644 src/job-executor.spec.js create mode 100644 src/models/job.js create mode 100644 src/models/schedule.js create mode 100644 src/scheduler.js diff --git a/package.json b/package.json index 0badd923d..ab99fd2b7 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,9 @@ "bluebird": "^2.9.14", "clarify": "^1.0.5", "connect": "^3.3.5", + "cron": "^1.0.9", "debug": "^2.1.3", + "escape-string-regexp": "^1.0.3", "event-to-promise": "^0.3.2", "exec-promise": "^0.5.1", "fs-promise": "^0.3.1", @@ -68,6 +70,7 @@ "lodash.pick": "^3.0.0", "lodash.result": "^3.0.0", "lodash.snakecase": "^3.0.1", + "lodash.sortby": "^3.1.4", "lodash.startswith": "^3.0.1", "make-error": "^1", "multikey-hash": "^1.0.1", @@ -86,6 +89,7 @@ }, "devDependencies": { "babel-eslint": "^3.1.9", + "chai": "^3.0.0", "dependency-check": "^2.4.0", "gulp": "git://github.com/gulpjs/gulp#4.0", "gulp-babel": "^5", @@ -93,6 +97,7 @@ "gulp-plumber": "^1.0.0", "gulp-sourcemaps": "^1.5.1", "gulp-watch": "^4.2.2", + "leche": "^2.1.1", "mocha": "^2.2.1", "must": "^0.12.0", "node-inspector": "^0.10.1", diff --git a/src/api/index.js b/src/api/index.js index e09b3d971..11ba8ebfb 100644 --- a/src/api/index.js +++ b/src/api/index.js @@ -9,11 +9,14 @@ export * as disk from './disk' export * as docker from './docker' export * as group from './group' export * as host from './host' +export * as job from './job' export * as message from './message' export * as pbd from './pbd' export * as pif from './pif' export * as pool from './pool' export * as role from './role' +export * as schedule from './schedule' +export * as scheduler from './scheduler' export * as server from './server' export * as session from './session' export * as sr from './sr' diff --git a/src/api/job.js b/src/api/job.js new file mode 100644 index 000000000..8077a26b3 --- /dev/null +++ b/src/api/job.js @@ -0,0 +1,103 @@ +// FIXME so far, no acls for jobs + +export async function getAll () { + return await this.getAllJobs() +} + +getAll.permission = 'admin' +getAll.description = 'Gets all available jobs' + +export async function get (id) { + return await this.getJob(id) +} + +get.permission = 'admin' +get.description = 'Gets an existing job' +get.params = { + id: {type: 'string'} +} + +export async function create ({job}) { + return (await this.createJob(this.session.get('user_id'), job)).id +} + +create.permission = 'admin' +create.description = 'Creates a new job from description object' +create.params = { + job: { + type: 'object', + properties: { + type: {type: 'string'}, + key: {type: 'string'}, + method: {type: 'string'}, + paramsVector: { + type: 'object', + properties: { + type: {type: 'string'}, + items: { + type: 'array', + items: { + type: 'object', + properties: { + type: {type: 'string'}, + values: { + type: 'array', + items: {type: 'object'} + } + } + } + } + } + } + } + } +} + +export async function set ({job}) { + await this.updateJob(job) +} + +set.permission = 'admin' +set.description = 'Modifies an existing job from a description object' +set.params = { + job: { + type: 'object', + properties: { + id: {type: 'string'}, + type: {type: 'string'}, + key: {type: 'string'}, + method: {type: 'string'}, + paramsVector: { + type: 'object', + properties: { + type: {type: 'string'}, + items: { + type: 'array', + items: { + type: 'object', + properties: { + type: {type: 'string'}, + values: { + type: 'array', + items: {type: 'object'} + } + } + } + } + } + } + } + } +} + +async function delete_ ({id}) { + await this.removeJob(id) +} + +delete_.permission = 'admin' +delete_.description = 'Deletes an existing job' +delete_.params = { + id: {type: 'string'} +} + +export {delete_ as delete} diff --git a/src/api/schedule.js b/src/api/schedule.js new file mode 100644 index 000000000..c884df404 --- /dev/null +++ b/src/api/schedule.js @@ -0,0 +1,55 @@ +// FIXME so far, no acls for schedules + +export async function getAll () { + return await this.getAllSchedules() +} + +getAll.permission = 'admin' +getAll.description = 'Gets all existing schedules' + +export async function get (id) { + return await this.getSchedule(id) +} + +get.permission = 'admin' +get.description = 'Gets an existing schedule' +get.params = { + id: {type: 'string'} +} + +export async function create ({jobId, cron, enabled}) { + return await this.createSchedule(this.session.get('user_id'), {job: jobId, cron, enabled}) +} + +create.permission = 'admin' +create.description = 'Creates a new schedule' +create.params = { + jobId: {type: 'string'}, + cron: {type: 'string'}, + enabled: {type: 'boolean', optional: true} +} + +export async function set ({id, jobId, cron, enabled}) { + await this.updateSchedule(id, {job: jobId, cron, enabled}) +} + +set.permission = 'admin' +set.description = 'Modifies an existing schedule' +set.params = { + id: {type: 'string'}, + jobId: {type: 'string', optional: true}, + cron: {type: 'string', optional: true}, + enabled: {type: 'boolean', optional: true} +} + +async function delete_ ({id}) { + await this.removeSchedule(id) +} + +delete_.permission = 'admin' +delete_.description = 'Deletes an existing schedule' +delete_.params = { + id: {type: 'string'} +} + +export {delete_ as delete} diff --git a/src/api/scheduler.js b/src/api/scheduler.js new file mode 100644 index 000000000..1e064187e --- /dev/null +++ b/src/api/scheduler.js @@ -0,0 +1,30 @@ +export async function enable ({id}) { + const schedule = await this.getSchedule(id) + schedule.enabled = true + await this.updateSchedule(id, schedule) +} + +enable.permission = 'admin' +enable.description = 'Enables a schedule to run it\'s job as scheduled' +enable.params = { + id: {type: 'string'} +} + +export async function disable ({id}) { + const schedule = await this.getSchedule(id) + schedule.enabled = false + await this.updateSchedule(id, schedule) +} + +disable.permission = 'admin' +disable.description = 'Disables a schedule' +disable.params = { + id: {type: 'string'} +} + +export function getScheduleTable () { + return this.scheduler.scheduleTable +} + +disable.permission = 'admin' +disable.description = 'Get a map of existing schedules enabled/disabled state' diff --git a/src/api/vm.coffee b/src/api/vm.coffee index f14357cb2..d6c7014ea 100644 --- a/src/api/vm.coffee +++ b/src/api/vm.coffee @@ -471,6 +471,26 @@ exports.snapshot = snapshot #--------------------------------------------------------------------- +rollingSnapshot = $coroutine ({vm, tag, depth}) -> + snapshot = yield @getXAPI(vm).rollingSnapshotVm(vm.ref, tag, depth) + return snapshot.$id + +rollingSnapshot.params = { + id: { type: 'string'} + tag: {type: 'string'} + depth: {type: 'number'} +} + +rollingSnapshot.resolve = { + vm: ['id', 'VM', 'administrate'] +} + +rollingSnapshot.description = 'Snaphots a VM with a tagged name, and removes the oldest snapshot with the same tag according to depth' + +exports.rollingSnapshot = rollingSnapshot + +#--------------------------------------------------------------------- + start = $coroutine ({vm}) -> yield @getXAPI(vm).call( 'VM.start', vm.ref diff --git a/src/index.js b/src/index.js index 0257c431e..a882d53b6 100644 --- a/src/index.js +++ b/src/index.js @@ -29,6 +29,8 @@ import {readFile} from 'fs-promise' import * as apiMethods from './api/index' import Api from './api' +import JobExecutor from './job-executor' +import Scheduler from './scheduler' import WebServer from 'http-server-plus' import wsProxy from './ws-proxy' import Xo from './xo' @@ -285,7 +287,18 @@ const setUpApi = (webServer, xo) => { socket.send(data, onSend) } }) + }) + + return api +} + +const setUpScheduler = (api, xo) => { + const jobExecutor = new JobExecutor(xo, api) + const scheduler = new Scheduler(xo, {executor: jobExecutor}) + xo.scheduler = scheduler + + return scheduler } // =================================================================== @@ -435,7 +448,9 @@ export default async function main (args) { connect.use(bind(xo._handleProxyRequest, xo)) // Must be set up before the static files. - setUpApi(webServer, xo) + const api = setUpApi(webServer, xo) + + const scheduler = setUpScheduler(api, xo) setUpProxies(connect, config.http.proxies) @@ -455,10 +470,12 @@ export default async function main (args) { // responsability?) process.on('SIGINT', () => { debug('SIGINT caught, closing web server…') + scheduler.disableAll() webServer.close() }) process.on('SIGTERM', () => { debug('SIGTERM caught, closing web server…') + scheduler.disableAll() webServer.close() }) diff --git a/src/job-executor.js b/src/job-executor.js new file mode 100644 index 000000000..f00d36633 --- /dev/null +++ b/src/job-executor.js @@ -0,0 +1,72 @@ +import assign from 'lodash.assign' +import forEach from 'lodash.foreach' +import {BaseError} from 'make-error' + +export class JobExecutorError extends BaseError {} +export class UnsupportedJobType extends JobExecutorError { + constructor (job) { + super('Unknown job type: ' + job.type) + } +} +export class UnsupportedVectorType extends JobExecutorError { + constructor (vector) { + super('Unknown vector type: ' + vector.type) + } +} + +export const productParams = (...args) => { + let product = Object.create(null) + assign(product, ...args) + return product +} + +export default class JobExecutor { + constructor (xo, api) { + this.xo = xo + this.api = api + this._extractValueCb = { + 'set': items => items.values + } + } + + exec (job) { + if (job.type === 'call') { + this._execCall(job.userId, job.method, job.paramsVector) + } else { + throw new UnsupportedJobType(job) + } + } + + _execCall (userId, method, paramsVector) { + let paramsFlatVector + if (paramsVector.type === 'crossProduct') { + paramsFlatVector = this._computeCrossProduct(paramsVector.items, productParams, this._extractValueCb) + } else { + throw new UnsupportedVectorType(paramsVector) + } + const connection = this.xo.createUserConnection() + connection.set('user_id', userId) + forEach(paramsFlatVector, params => { + this.api.call(connection, method, assign({}, params)) + }) + connection.close() + } + + _computeCrossProduct (items, productCb, extractValueMap = {}) { + const upstreamValues = [] + const itemsCopy = items.slice() + const item = itemsCopy.pop() + const values = extractValueMap[item.type] && extractValueMap[item.type](item) || item + forEach(values, value => { + if (itemsCopy.length) { + let downstreamValues = this._computeCrossProduct(itemsCopy, productCb, extractValueMap) + forEach(downstreamValues, downstreamValue => { + upstreamValues.push(productCb(value, downstreamValue)) + }) + } else { + upstreamValues.push(value) + } + }) + return upstreamValues + } +} diff --git a/src/job-executor.spec.js b/src/job-executor.spec.js new file mode 100644 index 000000000..117bc934e --- /dev/null +++ b/src/job-executor.spec.js @@ -0,0 +1,70 @@ +/* eslint-env mocha */ + +import {expect} from 'chai' +import leche from 'leche' + +import {productParams} from './job-executor' +import JobExecutor from './job-executor' + +describe('productParams', function () { + leche.withData({ + 'Two sets of one': [ + {a: 1, b: 2}, {a: 1}, {b: 2} + ], + 'Two sets of two': [ + {a: 1, b: 2, c: 3, d: 4}, {a: 1, b: 2}, {c: 3, d: 4} + ], + 'Three sets': [ + {a: 1, b: 2, c: 3, d: 4, e: 5, f: 6}, {a: 1}, {b: 2, c: 3}, {d: 4, e: 5, f: 6} + ], + 'One set': [ + {a: 1, b: 2}, {a: 1, b: 2} + ], + 'Empty set': [ + {a: 1}, {a: 1}, {} + ], + 'All empty': [ + {}, {}, {} + ], + 'No set': [ + {} + ] + }, function (resultSet, ...sets) { + it('Assembles all given param sets in on set', function () { + expect(productParams(...sets)).to.eql(resultSet) + }) + }) +}) + +describe('JobExecutor._computeCrossProduct', function () { + const jobExecutor = new JobExecutor({}) + // Gives the sum of all args + const addTest = (...args) => args.reduce((prev, curr) => prev + curr, 0) + // Gives the product of all args + const multiplyTest = (...args) => args.reduce((prev, curr) => prev * curr, 1) + + leche.withData({ + '2 sets of 2 items to multiply': [ + [10, 14, 15, 21], [[2, 3], [5, 7]], multiplyTest + ], + '3 sets of 2 items to multiply': [ + [110, 130, 154, 182, 165, 195, 231, 273], [[2, 3], [5, 7], [11, 13]], multiplyTest + ], + '2 sets of 3 items to multiply': [ + [14, 22, 26, 21, 33, 39, 35, 55, 65], [[2, 3, 5], [7, 11, 13]], multiplyTest + ], + '2 sets of 2 items to add': [ + [7, 9, 8, 10], [[2, 3], [5, 7]], addTest + ], + '3 sets of 2 items to add': [ + [18, 20, 20, 22, 19, 21, 21, 23], [[2, 3], [5, 7], [11, 13]], addTest + ], + '2 sets of 3 items to add': [ + [9, 13, 15, 10, 14, 16, 12, 16, 18], [[2, 3, 5], [7, 11, 13]], addTest + ] + }, function (product, items, cb) { + it('Crosses sets of values with a crossProduct callback', function () { + expect(jobExecutor._computeCrossProduct(items, cb)).to.have.members(product) + }) + }) +}) diff --git a/src/models/job.js b/src/models/job.js new file mode 100644 index 000000000..3e622c5e7 --- /dev/null +++ b/src/models/job.js @@ -0,0 +1,48 @@ +import forEach from 'lodash.foreach' + +import Collection from '../collection/redis' +import Model from '../model' + +// =================================================================== + +export default class Job extends Model {} + +export class Jobs extends Collection { + get Model () { + return Job + } + + get idPrefix () { + return 'job:' + } + + async create (userId, job) { + job.userId = userId + // Serializes. + job.paramsVector = JSON.stringify(job.paramsVector) + return await this.add(new Job(job)) + } + + async save (job) { + // Serializes. + job.paramsVector = JSON.stringify(job.paramsVector) + return await this.update(job) + } + + async get (properties) { + const jobs = await super.get(properties) + + // Deserializes. + forEach(jobs, job => { + const {paramsVector} = job + try { + job.paramsVector = JSON.parse(paramsVector) + } catch (error) { + console.warn('cannot parse job.paramsVector:', paramsVector) // FIXME this is a warning as I copy/paste acl.js, but... + job.paramsVector = {} + } + }) + + return jobs + } +} diff --git a/src/models/schedule.js b/src/models/schedule.js new file mode 100644 index 000000000..e252cbb58 --- /dev/null +++ b/src/models/schedule.js @@ -0,0 +1,36 @@ +import Collection from '../collection/redis' +import forEach from 'lodash.foreach' +import Model from '../model' + +// =================================================================== + +export default class Schedule extends Model {} + +export class Schedules extends Collection { + get Model () { + return Schedule + } + + get idPrefix () { + return 'schedule:' + } + + create (userId, job, cron, enabled) { + return this.add(new Schedule({ + userId, + job, + cron, + enabled + })) + } + + async save (schedule) { + return await this.update(schedule) + } + + async get (properties) { + const schedules = await super.get(properties) + forEach(schedules, schedule => schedule.enabled = (schedule.enabled === 'true')) + return schedules + } +} diff --git a/src/scheduler.js b/src/scheduler.js new file mode 100644 index 000000000..3915f8776 --- /dev/null +++ b/src/scheduler.js @@ -0,0 +1,159 @@ +import forEach from 'lodash.foreach' +import {BaseError} from 'make-error' +import {CronJob} from 'cron' + +const _resolveId = scheduleOrId => scheduleOrId.id || scheduleOrId + +export class SchedulerError extends BaseError {} +export class ScheduleOverride extends SchedulerError { + constructor (scheduleOrId) { + super('Schedule ID ' + _resolveId(scheduleOrId) + ' is already added') + } +} +export class NoSuchSchedule extends SchedulerError { + constructor (scheduleOrId) { + super('No schedule found for ID ' + _resolveId(scheduleOrId)) + } +} +export class ScheduleNotEnabled extends SchedulerError { + constructor (scheduleOrId) { + super('Schedule ' + _resolveId(scheduleOrId)) + ' is not enabled' + } +} +export class ScheduleAlreadyEnabled extends SchedulerError { + constructor (scheduleOrId) { + super('Schedule ' + _resolveId(scheduleOrId) + ' is already enabled') + } +} +export class ScheduleJobNotFound extends SchedulerError { + constructor (jobId, scheduleId) { + super('Job ' + jobId + ' not found for Schedule ' + scheduleId) + } +} + +export default class Scheduler { + constructor (xo, {executor}) { + this.executor = executor + this.xo = xo + this._scheduleTable = undefined + this._loadSchedules() + } + + async _loadSchedules () { + this._schedules = {} + const schedules = await this.xo.getAllSchedules() + this._scheduleTable = {} + this._cronJobs = {} + forEach(schedules, schedule => { + this._add(schedule) + }) + } + + add (schedule) { + if (this.exists(schedule)) { + throw new ScheduleOverride(schedule) + } + this._add(schedule) + } + + _add (schedule) { + const id = _resolveId(schedule) + this._schedules[id] = schedule + this._scheduleTable[id] = false + if (schedule.enabled) { + this._enable(schedule) + } + } + + remove (id) { + try { + this._disable(id) + } catch (exc) { + if (!exc instanceof SchedulerError) { + throw exc + } + } finally { + delete this._schedules[id] + delete this._scheduleTable[id] + } + } + + exists (scheduleOrId) { + const id_ = _resolveId(scheduleOrId) + return id_ in this._schedules + } + + async get (id) { + if (!this.exists(id)) { + throw new NoSuchSchedule(id) + } + return this._schedules[id] + } + + async _get (id) { + const schedule = await this.xo.getSchedule(id) + if (!schedule) { + throw new NoSuchSchedule(id) + } + return schedule + } + + async update (schedule) { + if (!this.exists(schedule)) { + throw new NoSuchSchedule(schedule) + } + const enabled = this.isEnabled(schedule) + if (enabled) { + await this._disable(schedule) + } + this._add(schedule) + } + + isEnabled (scheduleOrId) { + return this._scheduleTable[_resolveId(scheduleOrId)] + } + + _enable (schedule) { + const jobId = schedule.job + const cronJob = new CronJob(schedule.cron, async () => { + const job = await this._getJob(jobId, schedule.id) + this.executor.exec(job) + }) + this._cronJobs[schedule.id] = cronJob + cronJob.start() + this._scheduleTable[schedule.id] = true + } + + async _getJob (id, scheduleId) { + const job = await this.xo.getJob(id) + if (!job) { + throw new ScheduleJobNotFound(id, scheduleId) + } + return job + } + + _disable (scheduleOrId) { + if (!this.exists(scheduleOrId)) { + throw new NoSuchSchedule(scheduleOrId) + } + if (!this.isEnabled(scheduleOrId)) { + throw new ScheduleNotEnabled(scheduleOrId) + } + const id = _resolveId(scheduleOrId) + this._cronJobs[id].stop() + delete this._cronJobs[id] + this._scheduleTable[id] = false + } + + disableAll () { + forEach(this.scheduleTable, (enabled, id) => { + if (enabled) { + this._disable(id) + } + }) + } + + get scheduleTable () { + return this._scheduleTable + } +} diff --git a/src/xapi.js b/src/xapi.js index 8cf685f4b..cbd9fe09e 100644 --- a/src/xapi.js +++ b/src/xapi.js @@ -1,11 +1,14 @@ import createDebug from 'debug' +import escapeStringRegexp from 'escape-string-regexp' import eventToPromise from 'event-to-promise' +import filter from 'lodash.filter' import find from 'lodash.find' import forEach from 'lodash.foreach' import got from 'got' import includes from 'lodash.includes' import map from 'lodash.map' import snakeCase from 'lodash.snakecase' +import sortBy from 'lodash.sortby' import unzip from 'julien-f-unzip' import {PassThrough} from 'stream' import {promisify} from 'bluebird' @@ -645,6 +648,24 @@ export default class Xapi extends XapiBase { // ================================================================= + async rollingSnapshotVm (vmId, tag, depth) { + const vm = this.getObject(vmId) + const reg = new RegExp('^rollingSnapshot_[^_]+_' + escapeStringRegexp(tag)) + const snapshots = sortBy(filter(vm.$snapshots, snapshot => reg.test(snapshot.name_label)), 'name_label') + const date = new Date().toISOString() + + const ref = await this._snapshotVm(vm, `rollingSnapshot_${date}_${tag}_${vm.name_label}`) + + const promises = [] + for (let surplus = snapshots.length - (depth - 1); surplus > 0; surplus--) { + const oldSnap = snapshots.shift() + promises.push(this.deleteVm(oldSnap.uuid, true)) + } + await Promise.all(promises) + + return await this._getOrWaitObject(ref) + } + async _createVbd (vm, vdi, { bootable = false, position = undefined, diff --git a/src/xo.js b/src/xo.js index f38c76b07..22ec6ff0c 100644 --- a/src/xo.js +++ b/src/xo.js @@ -21,10 +21,12 @@ import {Acls} from './models/acl' import {autobind} from './decorators' import {generateToken} from './utils' import {Groups} from './models/group' +import {Jobs} from './models/job' import {JsonRpcError, NoSuchObject} from './api-errors' import {ModelAlreadyExists} from './collection' import {Servers} from './models/server' import {Tokens} from './models/token' +import {Schedules} from './models/schedule' // =================================================================== @@ -52,6 +54,18 @@ class NoSuchXenServer extends NoSuchObject { } } +class NoSuchSchedule extends NoSuchObject { + constructor (id) { + super(id, 'schedule') + } +} + +class NoSuchJob extends NoSuchObject { + constructor (id) { + super(id, 'job') + } +} + // =================================================================== export default class Xo extends EventEmitter { @@ -119,6 +133,16 @@ export default class Xo extends EventEmitter { prefix: 'xo:user', indexes: ['email'] }) + this._jobs = new Jobs({ + connection: redis, + prefix: 'xo:job', + indexes: ['user_id', 'key'] + }) + this._schedules = new Schedules({ + connection: redis, + prefix: 'xo:schedule', + indexes: ['user_id', 'job'] + }) // Proxies tokens/users related events to XO and removes tokens // when their related user is removed. @@ -431,6 +455,83 @@ export default class Xo extends EventEmitter { // ----------------------------------------------------------------- + async getAllJobs () { + return await this._jobs.get() + } + + async getJob (id) { + const job = await this._jobs.first(id) + if (!job) { + throw new NoSuchJob(id) + } + + return job.properties + } + + async createJob (userId, job) { + // TODO: use plain objects + const job_ = await this._jobs.create(userId, job) + return job_.properties + } + + async updateJob (job) { + return await this._jobs.save(job) + } + + async removeJob (id) { + return await this._jobs.remove(id) + } + + // ----------------------------------------------------------------- + + async _getSchedule (id) { + const schedule = await this._schedules.first(id) + if (!schedule) { + throw new NoSuchSchedule(id) + } + + return schedule + } + + async getSchedule (id) { + return (await this._getSchedule(id)).properties + } + + async getAllSchedules () { + return await this._schedules.get() + } + + async createSchedule (userId, {job, cron, enabled}) { + const schedule_ = await this._schedules.create(userId, job, cron, enabled) + const schedule = schedule_.properties + if (this.scheduler) { + this.scheduler.add(schedule) + } + return schedule + } + + async updateSchedule (id, {job, cron, enabled}) { + const schedule = await this._getSchedule(id) + + if (job) schedule.set('job', job) + if (cron) schedule.set('cron', cron) + if (enabled !== undefined) schedule.set('enabled', enabled) + + await this._schedules.save(schedule) + if (this.scheduler) { + this.scheduler.update(schedule.properties) + } + } + + async removeSchedule (id) { + await this._schedules.remove(id) + if (this.scheduler) { + this.scheduler.remove(id) + } + } + + // ----------------------------------------------------------------- + async createAuthenticationToken ({userId}) { // TODO: use plain objects const token = await this._tokens.generate(userId)