Merge pull request #241 from vatesfr/a-schedule-links-a-function
Scheduler is now in scheduling.js and use ScheduleFn function.
This commit is contained in:
commit
5ee83a1af9
@ -23,7 +23,7 @@ disable.params = {
|
||||
}
|
||||
|
||||
export function getScheduleTable () {
|
||||
return this.scheduler.scheduleTable
|
||||
return this.scheduleTable
|
||||
}
|
||||
|
||||
disable.permission = 'admin'
|
||||
|
167
src/scheduler.js
167
src/scheduler.js
@ -1,167 +0,0 @@
|
||||
import {BaseError} from 'make-error'
|
||||
import {CronJob} from 'cron'
|
||||
|
||||
import { forEach } from './utils'
|
||||
|
||||
const _resolveId = scheduleOrId => scheduleOrId.id || scheduleOrId
|
||||
|
||||
export class SchedulerError extends BaseError {}
|
||||
export class ScheduleOverride extends SchedulerError {
|
||||
constructor (scheduleOrId) {
|
||||
super('Schedule ID ' + _resolveId(scheduleOrId) + ' is already added')
|
||||
}
|
||||
}
|
||||
export class NoSuchSchedule extends SchedulerError {
|
||||
constructor (scheduleOrId) {
|
||||
super('No schedule found for ID ' + _resolveId(scheduleOrId))
|
||||
}
|
||||
}
|
||||
export class ScheduleNotEnabled extends SchedulerError {
|
||||
constructor (scheduleOrId) {
|
||||
super('Schedule ' + _resolveId(scheduleOrId)) + ' is not enabled'
|
||||
}
|
||||
}
|
||||
export class ScheduleAlreadyEnabled extends SchedulerError {
|
||||
constructor (scheduleOrId) {
|
||||
super('Schedule ' + _resolveId(scheduleOrId) + ' is already enabled')
|
||||
}
|
||||
}
|
||||
export class ScheduleJobNotFound extends SchedulerError {
|
||||
constructor (jobId, scheduleId) {
|
||||
super('Job ' + jobId + ' not found for Schedule ' + scheduleId)
|
||||
}
|
||||
}
|
||||
|
||||
export default class Scheduler {
|
||||
constructor (xo) {
|
||||
this.xo = xo
|
||||
this._scheduleTable = undefined
|
||||
|
||||
this._runningSchedules = {}
|
||||
}
|
||||
|
||||
async _loadSchedules () {
|
||||
this._schedules = {}
|
||||
const schedules = await this.xo.getAllSchedules()
|
||||
this._scheduleTable = {}
|
||||
this._cronJobs = {}
|
||||
forEach(schedules, schedule => {
|
||||
this._add(schedule)
|
||||
})
|
||||
}
|
||||
|
||||
add (schedule) {
|
||||
if (this.exists(schedule)) {
|
||||
throw new ScheduleOverride(schedule)
|
||||
}
|
||||
this._add(schedule)
|
||||
}
|
||||
|
||||
_add (schedule) {
|
||||
const id = _resolveId(schedule)
|
||||
this._schedules[id] = schedule
|
||||
this._scheduleTable[id] = false
|
||||
if (schedule.enabled) {
|
||||
this._enable(schedule)
|
||||
}
|
||||
}
|
||||
|
||||
remove (id) {
|
||||
try {
|
||||
this._disable(id)
|
||||
} catch (exc) {
|
||||
if (!exc instanceof SchedulerError) {
|
||||
throw exc
|
||||
}
|
||||
} finally {
|
||||
delete this._schedules[id]
|
||||
delete this._scheduleTable[id]
|
||||
}
|
||||
}
|
||||
|
||||
exists (scheduleOrId) {
|
||||
const id_ = _resolveId(scheduleOrId)
|
||||
return id_ in this._schedules
|
||||
}
|
||||
|
||||
async get (id) {
|
||||
if (!this.exists(id)) {
|
||||
throw new NoSuchSchedule(id)
|
||||
}
|
||||
return this._schedules[id]
|
||||
}
|
||||
|
||||
async _get (id) {
|
||||
const schedule = await this.xo.getSchedule(id)
|
||||
if (!schedule) {
|
||||
throw new NoSuchSchedule(id)
|
||||
}
|
||||
return schedule
|
||||
}
|
||||
|
||||
async update (schedule) {
|
||||
if (!this.exists(schedule)) {
|
||||
throw new NoSuchSchedule(schedule)
|
||||
}
|
||||
const enabled = this.isEnabled(schedule)
|
||||
if (enabled) {
|
||||
await this._disable(schedule)
|
||||
}
|
||||
this._add(schedule)
|
||||
}
|
||||
|
||||
isEnabled (scheduleOrId) {
|
||||
return this._scheduleTable[_resolveId(scheduleOrId)]
|
||||
}
|
||||
|
||||
_enable (schedule) {
|
||||
const running = this._runningSchedules
|
||||
|
||||
const { id } = schedule
|
||||
const jobId = schedule.job
|
||||
|
||||
const cronJob = new CronJob(schedule.cron, async () => {
|
||||
if (running[id]) {
|
||||
return // Simply ignore.
|
||||
}
|
||||
|
||||
try {
|
||||
running[id] = true
|
||||
await this.xo.runJobSequence([ jobId ])
|
||||
} catch (error) {
|
||||
// FIXME: better error handling
|
||||
console.error(error && error.stack || error)
|
||||
} finally {
|
||||
delete running[id]
|
||||
}
|
||||
})
|
||||
this._cronJobs[id] = cronJob
|
||||
cronJob.start()
|
||||
this._scheduleTable[id] = true
|
||||
}
|
||||
|
||||
_disable (scheduleOrId) {
|
||||
if (!this.exists(scheduleOrId)) {
|
||||
throw new NoSuchSchedule(scheduleOrId)
|
||||
}
|
||||
if (!this.isEnabled(scheduleOrId)) {
|
||||
throw new ScheduleNotEnabled(scheduleOrId)
|
||||
}
|
||||
const id = _resolveId(scheduleOrId)
|
||||
this._cronJobs[id].stop()
|
||||
delete this._cronJobs[id]
|
||||
this._scheduleTable[id] = false
|
||||
}
|
||||
|
||||
disableAll () {
|
||||
forEach(this.scheduleTable, (enabled, id) => {
|
||||
if (enabled) {
|
||||
this._disable(id)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
get scheduleTable () {
|
||||
return this._scheduleTable
|
||||
}
|
||||
}
|
29
src/utils.js
29
src/utils.js
@ -10,6 +10,7 @@ import isString from 'lodash.isstring'
|
||||
import kindOf from 'kindof'
|
||||
import multiKeyHashInt from 'multikey-hash'
|
||||
import xml2js from 'xml2js'
|
||||
import { CronJob } from 'cron'
|
||||
import { defer } from 'promise-toolbox'
|
||||
import {promisify} from 'bluebird'
|
||||
import {
|
||||
@ -479,5 +480,33 @@ export const streamToArray = (stream, filter = undefined) => new Promise((resolv
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
|
||||
export const scheduleFn = (cronPattern, fn) => {
|
||||
let running = false
|
||||
|
||||
const job = new CronJob(cronPattern, async () => {
|
||||
if (running) {
|
||||
return
|
||||
}
|
||||
|
||||
running = true
|
||||
|
||||
try {
|
||||
await fn()
|
||||
} catch (error) {
|
||||
console.error('[WARN] scheduled function:', error && error.stack || error)
|
||||
} finally {
|
||||
running = false
|
||||
}
|
||||
})
|
||||
|
||||
job.start()
|
||||
|
||||
return () => {
|
||||
job.stop()
|
||||
}
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
|
||||
// Wrap a value in a function.
|
||||
export const wrap = value => () => value
|
||||
|
@ -1,14 +1,39 @@
|
||||
import Scheduler from '../scheduler'
|
||||
import { BaseError } from 'make-error'
|
||||
import { NoSuchObject } from '../api-errors.js'
|
||||
import { Schedules } from '../models/schedule'
|
||||
|
||||
import {
|
||||
NoSuchObject
|
||||
} from '../api-errors'
|
||||
forEach,
|
||||
scheduleFn
|
||||
} from '../utils'
|
||||
|
||||
// ===================================================================
|
||||
|
||||
class NoSuchSchedule extends NoSuchObject {
|
||||
constructor (id) {
|
||||
super(id, 'schedule')
|
||||
const _resolveId = scheduleOrId => scheduleOrId.id || scheduleOrId
|
||||
|
||||
export class SchedulerError extends BaseError {}
|
||||
|
||||
export class ScheduleOverride extends SchedulerError {
|
||||
constructor (scheduleOrId) {
|
||||
super('Schedule ID ' + _resolveId(scheduleOrId) + ' is already added')
|
||||
}
|
||||
}
|
||||
|
||||
export class NoSuchSchedule extends NoSuchObject {
|
||||
constructor (scheduleOrId) {
|
||||
super(scheduleOrId, 'schedule')
|
||||
}
|
||||
}
|
||||
|
||||
export class ScheduleNotEnabled extends SchedulerError {
|
||||
constructor (scheduleOrId) {
|
||||
super('Schedule ' + _resolveId(scheduleOrId)) + ' is not enabled'
|
||||
}
|
||||
}
|
||||
|
||||
export class ScheduleAlreadyEnabled extends SchedulerError {
|
||||
constructor (scheduleOrId) {
|
||||
super('Schedule ' + _resolveId(scheduleOrId) + ' is already enabled')
|
||||
}
|
||||
}
|
||||
|
||||
@ -16,23 +41,87 @@ class NoSuchSchedule extends NoSuchObject {
|
||||
|
||||
export default class {
|
||||
constructor (xo) {
|
||||
this._schedules = new Schedules({
|
||||
this.xo = xo
|
||||
this._redisSchedules = new Schedules({
|
||||
connection: xo._redis,
|
||||
prefix: 'xo:schedule',
|
||||
indexes: ['user_id', 'job']
|
||||
})
|
||||
const scheduler = this._scheduler = new Scheduler(xo)
|
||||
this._scheduleTable = undefined
|
||||
|
||||
xo.on('start', () => scheduler._loadSchedules())
|
||||
xo.on('stop', () => scheduler.disableAll())
|
||||
xo.on('start', () => this._loadSchedules())
|
||||
xo.on('stop', () => this._disableAll())
|
||||
}
|
||||
|
||||
get scheduler () {
|
||||
return this._scheduler
|
||||
_add (schedule) {
|
||||
const id = _resolveId(schedule)
|
||||
this._schedules[id] = schedule
|
||||
this._scheduleTable[id] = false
|
||||
if (schedule.enabled) {
|
||||
this._enable(schedule)
|
||||
}
|
||||
}
|
||||
|
||||
_exists (scheduleOrId) {
|
||||
const id_ = _resolveId(scheduleOrId)
|
||||
return id_ in this._schedules
|
||||
}
|
||||
|
||||
_isEnabled (scheduleOrId) {
|
||||
return this._scheduleTable[_resolveId(scheduleOrId)]
|
||||
}
|
||||
|
||||
_enable (schedule) {
|
||||
const { id } = schedule
|
||||
|
||||
const stopSchedule = scheduleFn(schedule.cron, () =>
|
||||
this.xo.runJobSequence([ schedule.job ])
|
||||
)
|
||||
|
||||
this._cronJobs[id] = stopSchedule
|
||||
this._scheduleTable[id] = true
|
||||
}
|
||||
|
||||
_disable (scheduleOrId) {
|
||||
if (!this._exists(scheduleOrId)) {
|
||||
throw new NoSuchSchedule(scheduleOrId)
|
||||
}
|
||||
if (!this._isEnabled(scheduleOrId)) {
|
||||
throw new ScheduleNotEnabled(scheduleOrId)
|
||||
}
|
||||
const id = _resolveId(scheduleOrId)
|
||||
this._cronJobs[id]() // Stop cron job.
|
||||
delete this._cronJobs[id]
|
||||
this._scheduleTable[id] = false
|
||||
}
|
||||
|
||||
_disableAll () {
|
||||
forEach(this._scheduleTable, (enabled, id) => {
|
||||
if (enabled) {
|
||||
this._disable(id)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
get scheduleTable () {
|
||||
return this._scheduleTable
|
||||
}
|
||||
|
||||
async _loadSchedules () {
|
||||
this._schedules = {}
|
||||
this._scheduleTable = {}
|
||||
this._cronJobs = {}
|
||||
|
||||
const schedules = await this.xo.getAllSchedules()
|
||||
|
||||
forEach(schedules, schedule => {
|
||||
this._add(schedule)
|
||||
})
|
||||
}
|
||||
|
||||
async _getSchedule (id) {
|
||||
const schedule = await this._schedules.first(id)
|
||||
const schedule = await this._redisSchedules.first(id)
|
||||
|
||||
if (!schedule) {
|
||||
throw new NoSuchSchedule(id)
|
||||
}
|
||||
@ -45,15 +134,15 @@ export default class {
|
||||
}
|
||||
|
||||
async getAllSchedules () {
|
||||
return await this._schedules.get()
|
||||
return await this._redisSchedules.get()
|
||||
}
|
||||
|
||||
async createSchedule (userId, {job, cron, enabled, name}) {
|
||||
const schedule_ = await this._schedules.create(userId, job, cron, enabled, name)
|
||||
const schedule_ = await this._redisSchedules.create(userId, job, cron, enabled, name)
|
||||
const schedule = schedule_.properties
|
||||
if (this.scheduler) {
|
||||
this.scheduler.add(schedule)
|
||||
}
|
||||
|
||||
this._add(schedule)
|
||||
|
||||
return schedule
|
||||
}
|
||||
|
||||
@ -65,16 +154,33 @@ export default class {
|
||||
if (enabled !== undefined) schedule.set('enabled', enabled)
|
||||
if (name !== undefined) schedule.set('name', name)
|
||||
|
||||
await this._schedules.save(schedule)
|
||||
if (this.scheduler) {
|
||||
this.scheduler.update(schedule.properties)
|
||||
await this._redisSchedules.save(schedule)
|
||||
|
||||
const { properties } = schedule
|
||||
|
||||
if (!this._exists(properties)) {
|
||||
throw new NoSuchSchedule(properties)
|
||||
}
|
||||
|
||||
if (this._isEnabled(properties)) {
|
||||
await this._disable(properties)
|
||||
}
|
||||
|
||||
this._add(properties)
|
||||
}
|
||||
|
||||
async removeSchedule (id) {
|
||||
await this._schedules.remove(id)
|
||||
if (this.scheduler) {
|
||||
this.scheduler.remove(id)
|
||||
await this._redisSchedules.remove(id)
|
||||
|
||||
try {
|
||||
this._disable(id)
|
||||
} catch (exc) {
|
||||
if (!exc instanceof SchedulerError) {
|
||||
throw exc
|
||||
}
|
||||
} finally {
|
||||
delete this._schedules[id]
|
||||
delete this._scheduleTable[id]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user