From 901c7704f454e4370be70c1faedacb8009aa4923 Mon Sep 17 00:00:00 2001 From: wescoeur Date: Tue, 23 Feb 2016 11:03:51 +0100 Subject: [PATCH] Scheduler is now in scheduling.js and use ScheduleFn function. --- src/api/scheduler.js | 2 +- src/scheduler.js | 167 ------------------------------------ src/utils.js | 29 +++++++ src/xo-mixins/scheduling.js | 154 +++++++++++++++++++++++++++------ 4 files changed, 160 insertions(+), 192 deletions(-) delete mode 100644 src/scheduler.js diff --git a/src/api/scheduler.js b/src/api/scheduler.js index 3aa5efff3..eb636daaa 100644 --- a/src/api/scheduler.js +++ b/src/api/scheduler.js @@ -23,7 +23,7 @@ disable.params = { } export function getScheduleTable () { - return this.scheduler.scheduleTable + return this.scheduleTable } disable.permission = 'admin' diff --git a/src/scheduler.js b/src/scheduler.js deleted file mode 100644 index 1de43fe32..000000000 --- a/src/scheduler.js +++ /dev/null @@ -1,167 +0,0 @@ -import {BaseError} from 'make-error' -import {CronJob} from 'cron' - -import { forEach } from './utils' - -const _resolveId = scheduleOrId => scheduleOrId.id || scheduleOrId - -export class SchedulerError extends BaseError {} -export class ScheduleOverride extends SchedulerError { - constructor (scheduleOrId) { - super('Schedule ID ' + _resolveId(scheduleOrId) + ' is already added') - } -} -export class NoSuchSchedule extends SchedulerError { - constructor (scheduleOrId) { - super('No schedule found for ID ' + _resolveId(scheduleOrId)) - } -} -export class ScheduleNotEnabled extends SchedulerError { - constructor (scheduleOrId) { - super('Schedule ' + _resolveId(scheduleOrId)) + ' is not enabled' - } -} -export class ScheduleAlreadyEnabled extends SchedulerError { - constructor (scheduleOrId) { - super('Schedule ' + _resolveId(scheduleOrId) + ' is already enabled') - } -} -export class ScheduleJobNotFound extends SchedulerError { - constructor (jobId, scheduleId) { - super('Job ' + jobId + ' not found for Schedule ' + scheduleId) - } -} - -export default class Scheduler { - constructor (xo) { - this.xo = xo - this._scheduleTable = undefined - - this._runningSchedules = {} - } - - async _loadSchedules () { - this._schedules = {} - const schedules = await this.xo.getAllSchedules() - this._scheduleTable = {} - this._cronJobs = {} - forEach(schedules, schedule => { - this._add(schedule) - }) - } - - add (schedule) { - if (this.exists(schedule)) { - throw new ScheduleOverride(schedule) - } - this._add(schedule) - } - - _add (schedule) { - const id = _resolveId(schedule) - this._schedules[id] = schedule - this._scheduleTable[id] = false - if (schedule.enabled) { - this._enable(schedule) - } - } - - remove (id) { - try { - this._disable(id) - } catch (exc) { - if (!exc instanceof SchedulerError) { - throw exc - } - } finally { - delete this._schedules[id] - delete this._scheduleTable[id] - } - } - - exists (scheduleOrId) { - const id_ = _resolveId(scheduleOrId) - return id_ in this._schedules - } - - async get (id) { - if (!this.exists(id)) { - throw new NoSuchSchedule(id) - } - return this._schedules[id] - } - - async _get (id) { - const schedule = await this.xo.getSchedule(id) - if (!schedule) { - throw new NoSuchSchedule(id) - } - return schedule - } - - async update (schedule) { - if (!this.exists(schedule)) { - throw new NoSuchSchedule(schedule) - } - const enabled = this.isEnabled(schedule) - if (enabled) { - await this._disable(schedule) - } - this._add(schedule) - } - - isEnabled (scheduleOrId) { - return this._scheduleTable[_resolveId(scheduleOrId)] - } - - _enable (schedule) { - const running = this._runningSchedules - - const { id } = schedule - const jobId = schedule.job - - const cronJob = new CronJob(schedule.cron, async () => { - if (running[id]) { - return // Simply ignore. - } - - try { - running[id] = true - await this.xo.runJobSequence([ jobId ]) - } catch (error) { - // FIXME: better error handling - console.error(error && error.stack || error) - } finally { - delete running[id] - } - }) - this._cronJobs[id] = cronJob - cronJob.start() - this._scheduleTable[id] = true - } - - _disable (scheduleOrId) { - if (!this.exists(scheduleOrId)) { - throw new NoSuchSchedule(scheduleOrId) - } - if (!this.isEnabled(scheduleOrId)) { - throw new ScheduleNotEnabled(scheduleOrId) - } - const id = _resolveId(scheduleOrId) - this._cronJobs[id].stop() - delete this._cronJobs[id] - this._scheduleTable[id] = false - } - - disableAll () { - forEach(this.scheduleTable, (enabled, id) => { - if (enabled) { - this._disable(id) - } - }) - } - - get scheduleTable () { - return this._scheduleTable - } -} diff --git a/src/utils.js b/src/utils.js index 671a6762e..20212ae0a 100644 --- a/src/utils.js +++ b/src/utils.js @@ -10,6 +10,7 @@ import isString from 'lodash.isstring' import kindOf from 'kindof' import multiKeyHashInt from 'multikey-hash' import xml2js from 'xml2js' +import { CronJob } from 'cron' import { defer } from 'promise-toolbox' import {promisify} from 'bluebird' import { @@ -479,5 +480,33 @@ export const streamToArray = (stream, filter = undefined) => new Promise((resolv // ------------------------------------------------------------------- +export const scheduleFn = (cronPattern, fn) => { + let running = false + + const job = new CronJob(cronPattern, async () => { + if (running) { + return + } + + running = true + + try { + await fn() + } catch (error) { + console.error('[WARN] scheduled function:', error && error.stack || error) + } finally { + running = false + } + }) + + job.start() + + return () => { + job.stop() + } +} + +// ------------------------------------------------------------------- + // Wrap a value in a function. export const wrap = value => () => value diff --git a/src/xo-mixins/scheduling.js b/src/xo-mixins/scheduling.js index 22d723eb6..6e2805547 100644 --- a/src/xo-mixins/scheduling.js +++ b/src/xo-mixins/scheduling.js @@ -1,14 +1,39 @@ -import Scheduler from '../scheduler' +import { BaseError } from 'make-error' +import { NoSuchObject } from '../api-errors.js' import { Schedules } from '../models/schedule' + import { - NoSuchObject -} from '../api-errors' + forEach, + scheduleFn +} from '../utils' // =================================================================== -class NoSuchSchedule extends NoSuchObject { - constructor (id) { - super(id, 'schedule') +const _resolveId = scheduleOrId => scheduleOrId.id || scheduleOrId + +export class SchedulerError extends BaseError {} + +export class ScheduleOverride extends SchedulerError { + constructor (scheduleOrId) { + super('Schedule ID ' + _resolveId(scheduleOrId) + ' is already added') + } +} + +export class NoSuchSchedule extends NoSuchObject { + constructor (scheduleOrId) { + super(scheduleOrId, 'schedule') + } +} + +export class ScheduleNotEnabled extends SchedulerError { + constructor (scheduleOrId) { + super('Schedule ' + _resolveId(scheduleOrId)) + ' is not enabled' + } +} + +export class ScheduleAlreadyEnabled extends SchedulerError { + constructor (scheduleOrId) { + super('Schedule ' + _resolveId(scheduleOrId) + ' is already enabled') } } @@ -16,23 +41,87 @@ class NoSuchSchedule extends NoSuchObject { export default class { constructor (xo) { - this._schedules = new Schedules({ + this.xo = xo + this._redisSchedules = new Schedules({ connection: xo._redis, prefix: 'xo:schedule', indexes: ['user_id', 'job'] }) - const scheduler = this._scheduler = new Scheduler(xo) + this._scheduleTable = undefined - xo.on('start', () => scheduler._loadSchedules()) - xo.on('stop', () => scheduler.disableAll()) + xo.on('start', () => this._loadSchedules()) + xo.on('stop', () => this._disableAll()) } - get scheduler () { - return this._scheduler + _add (schedule) { + const id = _resolveId(schedule) + this._schedules[id] = schedule + this._scheduleTable[id] = false + if (schedule.enabled) { + this._enable(schedule) + } + } + + _exists (scheduleOrId) { + const id_ = _resolveId(scheduleOrId) + return id_ in this._schedules + } + + _isEnabled (scheduleOrId) { + return this._scheduleTable[_resolveId(scheduleOrId)] + } + + _enable (schedule) { + const { id } = schedule + + const stopSchedule = scheduleFn(schedule.cron, () => + this.xo.runJobSequence([ schedule.job ]) + ) + + this._cronJobs[id] = stopSchedule + this._scheduleTable[id] = true + } + + _disable (scheduleOrId) { + if (!this._exists(scheduleOrId)) { + throw new NoSuchSchedule(scheduleOrId) + } + if (!this._isEnabled(scheduleOrId)) { + throw new ScheduleNotEnabled(scheduleOrId) + } + const id = _resolveId(scheduleOrId) + this._cronJobs[id]() // Stop cron job. + delete this._cronJobs[id] + this._scheduleTable[id] = false + } + + _disableAll () { + forEach(this._scheduleTable, (enabled, id) => { + if (enabled) { + this._disable(id) + } + }) + } + + get scheduleTable () { + return this._scheduleTable + } + + async _loadSchedules () { + this._schedules = {} + this._scheduleTable = {} + this._cronJobs = {} + + const schedules = await this.xo.getAllSchedules() + + forEach(schedules, schedule => { + this._add(schedule) + }) } async _getSchedule (id) { - const schedule = await this._schedules.first(id) + const schedule = await this._redisSchedules.first(id) + if (!schedule) { throw new NoSuchSchedule(id) } @@ -45,15 +134,15 @@ export default class { } async getAllSchedules () { - return await this._schedules.get() + return await this._redisSchedules.get() } async createSchedule (userId, {job, cron, enabled, name}) { - const schedule_ = await this._schedules.create(userId, job, cron, enabled, name) + const schedule_ = await this._redisSchedules.create(userId, job, cron, enabled, name) const schedule = schedule_.properties - if (this.scheduler) { - this.scheduler.add(schedule) - } + + this._add(schedule) + return schedule } @@ -65,16 +154,33 @@ export default class { if (enabled !== undefined) schedule.set('enabled', enabled) if (name !== undefined) schedule.set('name', name) - await this._schedules.save(schedule) - if (this.scheduler) { - this.scheduler.update(schedule.properties) + await this._redisSchedules.save(schedule) + + const { properties } = schedule + + if (!this._exists(properties)) { + throw new NoSuchSchedule(properties) } + + if (this._isEnabled(properties)) { + await this._disable(properties) + } + + this._add(properties) } async removeSchedule (id) { - await this._schedules.remove(id) - if (this.scheduler) { - this.scheduler.remove(id) + await this._redisSchedules.remove(id) + + try { + this._disable(id) + } catch (exc) { + if (!exc instanceof SchedulerError) { + throw exc + } + } finally { + delete this._schedules[id] + delete this._scheduleTable[id] } } }