Scheduler is now in scheduling.js and use ScheduleFn function.

This commit is contained in:
wescoeur 2016-02-23 11:03:51 +01:00
parent 16351ba7f3
commit 901c7704f4
4 changed files with 160 additions and 192 deletions

View File

@ -23,7 +23,7 @@ disable.params = {
}
export function getScheduleTable () {
return this.scheduler.scheduleTable
return this.scheduleTable
}
disable.permission = 'admin'

View File

@ -1,167 +0,0 @@
import {BaseError} from 'make-error'
import {CronJob} from 'cron'
import { forEach } from './utils'
const _resolveId = scheduleOrId => scheduleOrId.id || scheduleOrId
export class SchedulerError extends BaseError {}
export class ScheduleOverride extends SchedulerError {
constructor (scheduleOrId) {
super('Schedule ID ' + _resolveId(scheduleOrId) + ' is already added')
}
}
export class NoSuchSchedule extends SchedulerError {
constructor (scheduleOrId) {
super('No schedule found for ID ' + _resolveId(scheduleOrId))
}
}
export class ScheduleNotEnabled extends SchedulerError {
constructor (scheduleOrId) {
super('Schedule ' + _resolveId(scheduleOrId)) + ' is not enabled'
}
}
export class ScheduleAlreadyEnabled extends SchedulerError {
constructor (scheduleOrId) {
super('Schedule ' + _resolveId(scheduleOrId) + ' is already enabled')
}
}
export class ScheduleJobNotFound extends SchedulerError {
constructor (jobId, scheduleId) {
super('Job ' + jobId + ' not found for Schedule ' + scheduleId)
}
}
export default class Scheduler {
constructor (xo) {
this.xo = xo
this._scheduleTable = undefined
this._runningSchedules = {}
}
async _loadSchedules () {
this._schedules = {}
const schedules = await this.xo.getAllSchedules()
this._scheduleTable = {}
this._cronJobs = {}
forEach(schedules, schedule => {
this._add(schedule)
})
}
add (schedule) {
if (this.exists(schedule)) {
throw new ScheduleOverride(schedule)
}
this._add(schedule)
}
_add (schedule) {
const id = _resolveId(schedule)
this._schedules[id] = schedule
this._scheduleTable[id] = false
if (schedule.enabled) {
this._enable(schedule)
}
}
remove (id) {
try {
this._disable(id)
} catch (exc) {
if (!exc instanceof SchedulerError) {
throw exc
}
} finally {
delete this._schedules[id]
delete this._scheduleTable[id]
}
}
exists (scheduleOrId) {
const id_ = _resolveId(scheduleOrId)
return id_ in this._schedules
}
async get (id) {
if (!this.exists(id)) {
throw new NoSuchSchedule(id)
}
return this._schedules[id]
}
async _get (id) {
const schedule = await this.xo.getSchedule(id)
if (!schedule) {
throw new NoSuchSchedule(id)
}
return schedule
}
async update (schedule) {
if (!this.exists(schedule)) {
throw new NoSuchSchedule(schedule)
}
const enabled = this.isEnabled(schedule)
if (enabled) {
await this._disable(schedule)
}
this._add(schedule)
}
isEnabled (scheduleOrId) {
return this._scheduleTable[_resolveId(scheduleOrId)]
}
_enable (schedule) {
const running = this._runningSchedules
const { id } = schedule
const jobId = schedule.job
const cronJob = new CronJob(schedule.cron, async () => {
if (running[id]) {
return // Simply ignore.
}
try {
running[id] = true
await this.xo.runJobSequence([ jobId ])
} catch (error) {
// FIXME: better error handling
console.error(error && error.stack || error)
} finally {
delete running[id]
}
})
this._cronJobs[id] = cronJob
cronJob.start()
this._scheduleTable[id] = true
}
_disable (scheduleOrId) {
if (!this.exists(scheduleOrId)) {
throw new NoSuchSchedule(scheduleOrId)
}
if (!this.isEnabled(scheduleOrId)) {
throw new ScheduleNotEnabled(scheduleOrId)
}
const id = _resolveId(scheduleOrId)
this._cronJobs[id].stop()
delete this._cronJobs[id]
this._scheduleTable[id] = false
}
disableAll () {
forEach(this.scheduleTable, (enabled, id) => {
if (enabled) {
this._disable(id)
}
})
}
get scheduleTable () {
return this._scheduleTable
}
}

View File

@ -10,6 +10,7 @@ import isString from 'lodash.isstring'
import kindOf from 'kindof'
import multiKeyHashInt from 'multikey-hash'
import xml2js from 'xml2js'
import { CronJob } from 'cron'
import { defer } from 'promise-toolbox'
import {promisify} from 'bluebird'
import {
@ -479,5 +480,33 @@ export const streamToArray = (stream, filter = undefined) => new Promise((resolv
// -------------------------------------------------------------------
export const scheduleFn = (cronPattern, fn) => {
let running = false
const job = new CronJob(cronPattern, async () => {
if (running) {
return
}
running = true
try {
await fn()
} catch (error) {
console.error('[WARN] scheduled function:', error && error.stack || error)
} finally {
running = false
}
})
job.start()
return () => {
job.stop()
}
}
// -------------------------------------------------------------------
// Wrap a value in a function.
export const wrap = value => () => value

View File

@ -1,14 +1,39 @@
import Scheduler from '../scheduler'
import { BaseError } from 'make-error'
import { NoSuchObject } from '../api-errors.js'
import { Schedules } from '../models/schedule'
import {
NoSuchObject
} from '../api-errors'
forEach,
scheduleFn
} from '../utils'
// ===================================================================
class NoSuchSchedule extends NoSuchObject {
constructor (id) {
super(id, 'schedule')
const _resolveId = scheduleOrId => scheduleOrId.id || scheduleOrId
export class SchedulerError extends BaseError {}
export class ScheduleOverride extends SchedulerError {
constructor (scheduleOrId) {
super('Schedule ID ' + _resolveId(scheduleOrId) + ' is already added')
}
}
export class NoSuchSchedule extends NoSuchObject {
constructor (scheduleOrId) {
super(scheduleOrId, 'schedule')
}
}
export class ScheduleNotEnabled extends SchedulerError {
constructor (scheduleOrId) {
super('Schedule ' + _resolveId(scheduleOrId)) + ' is not enabled'
}
}
export class ScheduleAlreadyEnabled extends SchedulerError {
constructor (scheduleOrId) {
super('Schedule ' + _resolveId(scheduleOrId) + ' is already enabled')
}
}
@ -16,23 +41,87 @@ class NoSuchSchedule extends NoSuchObject {
export default class {
constructor (xo) {
this._schedules = new Schedules({
this.xo = xo
this._redisSchedules = new Schedules({
connection: xo._redis,
prefix: 'xo:schedule',
indexes: ['user_id', 'job']
})
const scheduler = this._scheduler = new Scheduler(xo)
this._scheduleTable = undefined
xo.on('start', () => scheduler._loadSchedules())
xo.on('stop', () => scheduler.disableAll())
xo.on('start', () => this._loadSchedules())
xo.on('stop', () => this._disableAll())
}
get scheduler () {
return this._scheduler
_add (schedule) {
const id = _resolveId(schedule)
this._schedules[id] = schedule
this._scheduleTable[id] = false
if (schedule.enabled) {
this._enable(schedule)
}
}
_exists (scheduleOrId) {
const id_ = _resolveId(scheduleOrId)
return id_ in this._schedules
}
_isEnabled (scheduleOrId) {
return this._scheduleTable[_resolveId(scheduleOrId)]
}
_enable (schedule) {
const { id } = schedule
const stopSchedule = scheduleFn(schedule.cron, () =>
this.xo.runJobSequence([ schedule.job ])
)
this._cronJobs[id] = stopSchedule
this._scheduleTable[id] = true
}
_disable (scheduleOrId) {
if (!this._exists(scheduleOrId)) {
throw new NoSuchSchedule(scheduleOrId)
}
if (!this._isEnabled(scheduleOrId)) {
throw new ScheduleNotEnabled(scheduleOrId)
}
const id = _resolveId(scheduleOrId)
this._cronJobs[id]() // Stop cron job.
delete this._cronJobs[id]
this._scheduleTable[id] = false
}
_disableAll () {
forEach(this._scheduleTable, (enabled, id) => {
if (enabled) {
this._disable(id)
}
})
}
get scheduleTable () {
return this._scheduleTable
}
async _loadSchedules () {
this._schedules = {}
this._scheduleTable = {}
this._cronJobs = {}
const schedules = await this.xo.getAllSchedules()
forEach(schedules, schedule => {
this._add(schedule)
})
}
async _getSchedule (id) {
const schedule = await this._schedules.first(id)
const schedule = await this._redisSchedules.first(id)
if (!schedule) {
throw new NoSuchSchedule(id)
}
@ -45,15 +134,15 @@ export default class {
}
async getAllSchedules () {
return await this._schedules.get()
return await this._redisSchedules.get()
}
async createSchedule (userId, {job, cron, enabled, name}) {
const schedule_ = await this._schedules.create(userId, job, cron, enabled, name)
const schedule_ = await this._redisSchedules.create(userId, job, cron, enabled, name)
const schedule = schedule_.properties
if (this.scheduler) {
this.scheduler.add(schedule)
}
this._add(schedule)
return schedule
}
@ -65,16 +154,33 @@ export default class {
if (enabled !== undefined) schedule.set('enabled', enabled)
if (name !== undefined) schedule.set('name', name)
await this._schedules.save(schedule)
if (this.scheduler) {
this.scheduler.update(schedule.properties)
await this._redisSchedules.save(schedule)
const { properties } = schedule
if (!this._exists(properties)) {
throw new NoSuchSchedule(properties)
}
if (this._isEnabled(properties)) {
await this._disable(properties)
}
this._add(properties)
}
async removeSchedule (id) {
await this._schedules.remove(id)
if (this.scheduler) {
this.scheduler.remove(id)
await this._redisSchedules.remove(id)
try {
this._disable(id)
} catch (exc) {
if (!exc instanceof SchedulerError) {
throw exc
}
} finally {
delete this._schedules[id]
delete this._scheduleTable[id]
}
}
}