Merge pull request #192 from vatesfr/mixins

Split Xo with mixins.
This commit is contained in:
Julien Fontanet
2016-01-13 11:47:49 +01:00
13 changed files with 1184 additions and 939 deletions

View File

@@ -18,6 +18,8 @@ cd "$(dirname "$(which "$0")")"
for f in *.js *.coffee
do
[ -f "$f" ] || continue
base=${f%.*}
[ "$base" != index ] || continue

View File

@@ -1,8 +1,14 @@
import bind from 'lodash.bind'
import isArray from 'lodash.isarray'
import isFunction from 'lodash.isfunction'
// ===================================================================
const {defineProperty} = Object
const {
defineProperties,
defineProperty,
getOwnPropertyDescriptor
} = Object
// ===================================================================
@@ -85,3 +91,100 @@ export const debounce = (duration) => (target, name, descriptor) => {
descriptor.value = debounced
return descriptor
}
// -------------------------------------------------------------------
// Returns every own property key of an object — string keys and symbols
// alike. Uses Reflect.ownKeys() when available, otherwise emulates it
// with Object.getOwnPropertyNames() (+ getOwnPropertySymbols() when the
// runtime supports symbols).
const _ownKeys = (() => {
  if (typeof Reflect !== 'undefined' && Reflect.ownKeys) {
    return Reflect.ownKeys
  }

  const { getOwnPropertyNames, getOwnPropertySymbols } = Object
  return getOwnPropertySymbols
    ? obj => getOwnPropertyNames(obj).concat(getOwnPropertySymbols(obj))
    : getOwnPropertyNames
})()
// Returns `descriptor` with its `get`, `set` and callable `value` bound
// to `thisArg`, so a property copied onto another object keeps `this`
// pointing at the mix-in instance.
//
// The descriptor is mutated in place and returned for convenience; it is
// expected to be a fresh object (e.g. from getOwnPropertyDescriptor()).
//
// Uses the native Function#bind and a `typeof` check instead of the
// lodash helpers: they are strictly sufficient here since `get`/`set`/
// callable `value` are always real functions.
const _bindPropertyDescriptor = (descriptor, thisArg) => {
  const { get, set, value } = descriptor
  if (get) {
    descriptor.get = get.bind(thisArg)
  }
  if (set) {
    descriptor.set = set.bind(thisArg)
  }

  // Only data properties holding a function are bound; plain values are
  // left untouched.
  if (typeof value === 'function') {
    descriptor.value = value.bind(thisArg)
  }

  return descriptor
}
// A prototype property is ignored (not copied from a mix-in) when it is
// the constructor itself or private (underscore-prefixed).
//
// `name` may be a symbol, therefore only index access is used — string
// methods such as startsWith() would throw on symbols.
const _isIgnoredProperty = name =>
  name === 'constructor' || name[0] === '_'
// Static properties automatically present on any function/class; they
// must never be copied onto the decorated class.
const _IGNORED_STATIC_PROPERTIES = new Set([ 'length', 'name', 'prototype' ])

const _isIgnoredStaticProperty = name => _IGNORED_STATIC_PROPERTIES.has(name)
// Class decorator which injects the given mix-in class(es) into the
// decorated class.
//
// Each mix-in is instantiated with the decorated instance, then its
// prototype properties (except private/constructor ones) are copied onto
// the instance, bound to the mix-in instance. Mixed-in static properties
// are copied onto the decorator as well.
//
// Throws if a mixed-in property collides with an existing one.
export const mixin = MixIns => Class => {
  if (!isArray(MixIns)) {
    MixIns = [ MixIns ]
  }

  const { name } = Class

  // FIX: must be a real function, not an arrow function — the decorated
  // class is instantiated with `new` by its callers, and arrow functions
  // are not constructible in native ES2015+ (this previously only worked
  // through the Babel ES5 transform). Using a function also allows the
  // copied `prototype` static descriptor below to take effect.
  const Decorator = function (...args) {
    const instance = new Class(...args)

    for (const MixIn of MixIns) {
      const { prototype } = MixIn
      const mixinInstance = new MixIn(instance)
      const descriptors = { __proto__: null }
      for (const prop of _ownKeys(prototype)) {
        if (_isIgnoredProperty(prop)) {
          continue
        }

        if (prop in instance) {
          throw new Error(`${name}#${prop} is already defined`)
        }

        descriptors[prop] = _bindPropertyDescriptor(
          getOwnPropertyDescriptor(prototype, prop),
          mixinInstance
        )
      }
      defineProperties(instance, descriptors)
    }

    return instance
  }

  // Copy original and mixed-in static properties on Decorator class.
  const descriptors = { __proto__: null }
  for (const prop of _ownKeys(Class)) {
    descriptors[prop] = getOwnPropertyDescriptor(Class, prop)
  }
  for (const MixIn of MixIns) {
    for (const prop of _ownKeys(MixIn)) {
      if (_isIgnoredStaticProperty(prop)) {
        continue
      }

      if (prop in descriptors) {
        throw new Error(`${name}.${prop} is already defined`)
      }

      descriptors[prop] = getOwnPropertyDescriptor(MixIn, prop)
    }
  }
  defineProperties(Decorator, descriptors)

  return Decorator
}

View File

@@ -32,9 +32,6 @@ import {
import * as apiMethods from './api/index'
import Api from './api'
import JobExecutor from './job-executor'
import RemoteHandler from './remote-handler'
import Scheduler from './scheduler'
import WebServer from 'http-server-plus'
import wsProxy from './ws-proxy'
import Xo from './xo'
@@ -224,7 +221,7 @@ async function registerPlugin (pluginPath, pluginName) {
? factory({ xo: this })
: factory
await this._registerPlugin(
await this.registerPlugin(
pluginName,
instance,
configurationSchema
@@ -327,7 +324,7 @@ const setUpProxies = (express, opts, xo) => {
const webSocketServer = new WebSocket.Server({
noServer: true
})
xo.on('stopping', () => pFromCallback(cb => webSocketServer.close(cb)))
xo.on('stop', () => pFromCallback(cb => webSocketServer.close(cb)))
express.on('upgrade', (req, socket, head) => {
const {url} = req
@@ -401,7 +398,7 @@ const setUpApi = (webServer, xo, verboseLogsOnErrors) => {
server: webServer,
path: '/api/'
})
xo.on('stopping', () => pFromCallback(cb => webSocketServer.close(cb)))
xo.on('stop', () => pFromCallback(cb => webSocketServer.close(cb)))
// FIXME: it can cause issues if there any property assignments in
// XO methods called from the API.
@@ -461,31 +458,6 @@ const setUpApi = (webServer, xo, verboseLogsOnErrors) => {
})
}
const setUpJobExecutor = xo => {
const executor = new JobExecutor(xo)
xo.defineProperty('jobExecutor', executor)
}
const setUpScheduler = xo => {
if (!xo.jobExecutor) {
setUpJobExecutor(xo)
}
const scheduler = new Scheduler(xo, {executor: xo.jobExecutor})
xo.on('stopping', () => scheduler.disableAll())
xo.defineProperty('scheduler', scheduler)
}
const setUpRemoteHandler = async xo => {
const remoteHandler = new RemoteHandler()
xo.defineProperty('remoteHandler', remoteHandler)
await xo.initRemotes()
await xo.syncAllRemotes()
xo.on('stopping', () => xo.disableAllRemotes())
}
// ===================================================================
const CONSOLE_PROXY_PATH_RE = /^\/api\/consoles\/(.*)$/
@@ -494,7 +466,7 @@ const setUpConsoleProxy = (webServer, xo) => {
const webSocketServer = new WebSocket.Server({
noServer: true
})
xo.on('stopping', () => pFromCallback(cb => webSocketServer.close(cb)))
xo.on('stop', () => pFromCallback(cb => webSocketServer.close(cb)))
webServer.on('upgrade', async (req, socket, head) => {
const matches = CONSOLE_PROXY_PATH_RE.exec(req.url)
@@ -614,7 +586,7 @@ export default async function main (args) {
const xo = new Xo(config)
// Register web server close on XO stop.
xo.on('stopping', () => pFromCallback(cb => webServer.close(cb)))
xo.on('stop', () => pFromCallback(cb => webServer.close(cb)))
// Connects to all registered servers.
await xo.start()
@@ -645,10 +617,6 @@ export default async function main (args) {
// Must be set up before the static files.
setUpApi(webServer, xo, config.verboseApiLogsOnErrors)
setUpJobExecutor(xo)
setUpScheduler(xo)
setUpRemoteHandler(xo)
setUpProxies(express, config.http.proxies, xo)
setUpStaticFiles(express, config.http.mounts)
@@ -674,7 +642,7 @@ export default async function main (args) {
process.on('SIGINT', () => shutdown('SIGINT'))
process.on('SIGTERM', () => shutdown('SIGTERM'))
await eventToPromise(xo, 'stop')
await eventToPromise(xo, 'stopped')
debug('bye :-)')
}

View File

@@ -48,7 +48,11 @@ export default class JobExecutor {
this._extractValueCb = {
'set': items => items.values
}
this._logger = this.xo.getLogger('jobs')
// The logger is not available until Xo has started.
xo.on('started', () => {
this._logger = this.xo.getLogger('jobs')
})
}
async exec (job) {

View File

@@ -33,11 +33,9 @@ export class ScheduleJobNotFound extends SchedulerError {
}
export default class Scheduler {
constructor (xo, {executor}) {
this.executor = executor
constructor (xo) {
this.xo = xo
this._scheduleTable = undefined
this._loadSchedules()
this._runningSchedules = {}
}
@@ -129,8 +127,7 @@ export default class Scheduler {
try {
running[id] = true
const job = await this._getJob(jobId, schedule.id)
await this.executor.exec(job)
await this.runJobSequence([ jobId ])
} catch (_) {
// FIXME What do we do ?
} finally {
@@ -142,14 +139,6 @@ export default class Scheduler {
this._scheduleTable[id] = true
}
async _getJob (id, scheduleId) {
const job = await this.xo.getJob(id)
if (!job) {
throw new ScheduleJobNotFound(id, scheduleId)
}
return job
}
_disable (scheduleOrId) {
if (!this.exists(scheduleOrId)) {
throw new NoSuchSchedule(scheduleOrId)

View File

@@ -0,0 +1,28 @@
#!/usr/bin/env sh

# Generates ./index.js, re-exporting the default export of every sibling
# *.js / *.coffee module (except index itself).
#
# TODO: this generation should probably be automated and integrated
# into the build system.

# Abort on the first error and on use of unset variables.
set -e -u

# Work from the directory containing this script.
cd "$(dirname "$(which "$0")")"

{
  # Header warning that the file is generated (do not edit the string
  # below: it is emitted verbatim into index.js).
  printf %s '//
// This file has been generated by ./.generate-index.sh
//
// It MUST be re-generated each time an API namespace (read file) is
// added or removed.
//
'
  for f in *.js *.coffee
  do
    # Skip unmatched globs (the pattern is kept as-is when nothing matches).
    [ -f "$f" ] || continue

    # Module name: file name without its extension.
    base=${f%.*}

    # Do not re-export the generated index itself.
    [ "$base" != index ] || continue

    printf '%s\n' "export { default as $base } from './$base'"
  done | sort
} > index.js

482
src/xo-mixins/backups.js Normal file
View File

@@ -0,0 +1,482 @@
import endsWith from 'lodash.endswith'
import escapeStringRegexp from 'escape-string-regexp'
import eventToPromise from 'event-to-promise'
import filter from 'lodash.filter'
import find from 'lodash.find'
import findIndex from 'lodash.findindex'
import sortBy from 'lodash.sortby'
import startsWith from 'lodash.startswith'
import {
createReadStream,
createWriteStream,
ensureDir,
readdir,
readFile,
stat,
unlink,
writeFile
} from 'fs-promise'
import {
basename,
dirname
} from 'path'
import xapiObjectToXo from '../xapi-object-to-xo'
import {
forEach,
mapToArray,
noop,
safeDateFormat
} from '../utils'
import {
VDI_FORMAT_VHD
} from '../xapi'
// ===================================================================
// VDI backup file names: <timestamp>_full.vhd or <timestamp>_delta.vhd
// (timestamp shaped like 20160113T114749Z).
const VDI_BACKUP_RE = /^\d+T\d+Z_(?:full|delta)\.vhd$/
const DELTA_VDI_BACKUP_RE = /^\d+T\d+Z_delta\.vhd$/

const isVdiBackup = name => VDI_BACKUP_RE.test(name)
const isDeltaVdiBackup = name => DELTA_VDI_BACKUP_RE.test(name)
// ===================================================================
// Backups mixin: VM/VDI backup and restore, rolling snapshots and
// disaster-recovery copies, stored on XO "remotes".
export default class {
  constructor (xo) {
    this._xo = xo
  }

  // Lists the backup files (*.xva) available on a remote, including the
  // ones stored in vm_delta_* sub-directories (returned as relative
  // paths), but not the per-VDI delta .vhd files.
  async _listRemoteBackups (remote) {
    const path = remote.path

    // List backups. (Except delta backups)
    const xvaFilter = file => endsWith(file, '.xva')

    const files = await readdir(path)
    const backups = filter(files, xvaFilter)

    // List delta backups.
    const deltaDirs = filter(files, file => startsWith(file, 'vm_delta_'))

    for (const deltaDir of deltaDirs) {
      const files = await readdir(`${path}/${deltaDir}`)
      const deltaBackups = filter(files, xvaFilter)

      backups.push(...mapToArray(
        deltaBackups,
        deltaBackup => `${deltaDir}/${deltaBackup}`
      ))
    }

    return backups
  }

  // Opens `path` for reading and waits for the stream to become
  // readable. Returns [stream, size]; a missing file (ENOENT) is
  // reported with `errorMessage` instead of the raw system error.
  async _openAndwaitReadableFile (path, errorMessage) {
    const stream = createReadStream(path)

    try {
      await eventToPromise(stream, 'readable')
    } catch (error) {
      if (error.code === 'ENOENT') {
        throw new Error(errorMessage)
      }
      throw error
    }

    const stats = await stat(path)

    return [ stream, stats.size ]
  }

  // Imports a full VM backup (XVA file) from a remote into the SR `sr`.
  async importVmBackup (remoteId, file, sr) {
    const remote = await this._xo.getRemote(remoteId)
    const path = `${remote.path}/${file}`
    const [ stream, length ] = await this._openAndwaitReadableFile(
      path, 'VM to import not found in this remote')

    const xapi = this._xo.getXAPI(sr)

    await xapi.importVm(stream, length, { srId: sr._xapiId })
  }

  // -----------------------------------------------------------------

  // Removes the `n` oldest backups (the first `n` entries).
  // TODO: The other backup methods must use this function !
  // Prerequisite: The backups array must be ordered. (old to new backups)
  async _removeOldBackups (backups, path, n) {
    if (n <= 0) {
      return
    }

    await Promise.all(
      mapToArray(backups.slice(0, n), backup => unlink(`${path}/${backup}`))
    )
  }

  // -----------------------------------------------------------------

  // Lists the VDI backups in `path`, sorted old to new, after pruning
  // any leading delta files which lack a full backup as their base.
  async _listVdiBackups (path) {
    const files = await readdir(path)
    const backups = sortBy(filter(files, fileName => isVdiBackup(fileName)))
    let i

    // Avoid unstable state: No full vdi found to the beginning of array. (base)
    for (i = 0; i < backups.length && isDeltaVdiBackup(backups[i]); i++);
    await this._removeOldBackups(backups, path, i)

    return backups.slice(i)
  }

  // Counts the trailing run of delta backups, i.e. the deltas made since
  // the most recent full backup.
  _countDeltaVdiBackups (backups) {
    let nDelta = 0

    for (let i = backups.length - 1; i >= 0 && isDeltaVdiBackup(backups[i]); nDelta++, i--);

    return nDelta
  }

  // Performs one rolling (full or delta) backup of a VDI into
  // `path`/vdi_<uuid>/ and returns its relative path components.
  async _rollingDeltaVdiBackup ({vdi, path, depth}) {
    const xapi = this._xo.getXAPI(vdi)
    const backupDirectory = `vdi_${vdi.uuid}`

    vdi = xapi.getObject(vdi._xapiId)
    path = `${path}/${backupDirectory}`
    await ensureDir(path)

    const backups = await this._listVdiBackups(path)

    // Count delta backups.
    const nDelta = this._countDeltaVdiBackups(backups)

    // Make snapshot.
    const date = safeDateFormat(new Date())
    const base = find(vdi.$snapshots, { name_label: 'XO_DELTA_BASE_VDI_SNAPSHOT' })
    const currentSnapshot = await xapi.snapshotVdi(vdi.$id, 'XO_DELTA_BASE_VDI_SNAPSHOT')

    // It is strange to have no base but a full backup !
    // A full is necessary if it not exists backups or
    // the number of delta backups is sufficient.
    const isFull = (nDelta + 1 >= depth || !backups.length || !base)

    // Export full or delta backup.
    const vdiFilename = `${date}_${isFull ? 'full' : 'delta'}.vhd`
    const backupFullPath = `${path}/${vdiFilename}`

    try {
      const sourceStream = await xapi.exportVdi(currentSnapshot.$id, {
        baseId: isFull ? undefined : base.$id,
        format: VDI_FORMAT_VHD
      })

      const targetStream = createWriteStream(backupFullPath, { flags: 'wx' })
      // Forward export errors to the write side so the await below rejects.
      sourceStream.on('error', error => targetStream.emit('error', error))
      await eventToPromise(sourceStream.pipe(targetStream), 'finish')
    } catch (e) {
      // Remove backup. (corrupt)
      await xapi.deleteVdi(currentSnapshot.$id)
      // Best-effort removal of the partial file.
      unlink(backupFullPath).catch(noop)
      throw e
    }

    // The freshly made snapshot becomes the base of the next delta; the
    // previous base is no longer needed.
    if (base) {
      await xapi.deleteVdi(base.$id)
    }

    // Returns relative path. (subdir and vdi filename)
    return [ backupDirectory, vdiFilename ]
  }

  // Removes VDI backups older than the retention `depth`, never leaving
  // a delta orphaned from its full base.
  async _removeOldDeltaVdiBackups ({path, depth}) {
    const backups = await this._listVdiBackups(path)
    let i

    for (i = backups.length - depth; i >= 0 && isDeltaVdiBackup(backups[i]); i--);
    await this._removeOldBackups(backups, path, i)
  }

  // Imports the content of one VDI backup file into the VDI `vdiId`.
  async _importVdiBackupContent (xapi, file, vdiId) {
    const [ stream, length ] = await this._openAndwaitReadableFile(
      file, 'VDI to import not found in this remote'
    )

    await xapi.importVdiContent(vdiId, stream, {
      length,
      format: VDI_FORMAT_VHD
    })
  }

  // Restores a VDI from a (delta) backup chain: finds the full backup
  // the requested file is based on, then imports the chain in order.
  async importDeltaVdiBackup ({vdi, remoteId, filePath}) {
    const remote = await this._xo.getRemote(remoteId)
    const path = dirname(`${remote.path}/${filePath}`)
    const filename = basename(filePath)
    const backups = await this._listVdiBackups(path)

    // Search file. (delta or full backup)
    const i = findIndex(backups, backup => backup === filename)

    if (i === -1) {
      throw new Error('VDI to import not found in this remote')
    }

    // Search full backup.
    let j

    for (j = i; j >= 0 && isDeltaVdiBackup(backups[j]); j--);

    if (j === -1) {
      throw new Error(`unable to found full vdi backup of: ${filePath}`)
    }

    // Restore...
    const xapi = this._xo.getXAPI(vdi)

    for (; j <= i; j++) {
      await this._importVdiBackupContent(xapi, `${path}/${backups[j]}`, vdi._xapiId)
    }
  }

  // -----------------------------------------------------------------

  // Lists the delta VM backup files (.xva metadata and .json
  // description) in `path`, sorted old to new.
  async _listDeltaVmBackups (path) {
    const files = await readdir(path)
    return await sortBy(filter(files, (fileName) => /^\d+T\d+Z_.*\.(?:xva|json)$/.test(fileName)))
  }

  // Performs one rolling delta backup of a VM: a metadata-only XVA
  // export, a JSON description, and one rolling VDI backup per disk.
  // Returns the backup's relative path (without extension).
  async rollingDeltaVmBackup ({vm, remoteId, tag, depth}) {
    const remote = await this._xo.getRemote(remoteId)
    const directory = `vm_delta_${tag}_${vm.uuid}`
    const path = `${remote.path}/${directory}`

    await ensureDir(path)

    const info = {
      vbds: [],
      vdis: {}
    }

    const promises = []
    const xapi = this._xo.getXAPI(vm)

    for (const vbdId of vm.$VBDs) {
      const vbd = this._xo.getObject(vbdId)

      // Skip VBDs without a disk and CD drives.
      if (!vbd.VDI) {
        continue
      }

      if (vbd.is_cd_drive) {
        continue
      }

      const vdiXo = this._xo.getObject(vbd.VDI)
      const vdi = xapi.getObject(vdiXo._xapiId)
      const vdiUUID = vdi.uuid

      info.vbds.push({
        ...xapi.getObject(vbd._xapiId),
        xoVdi: vdiUUID
      })

      // Warning: There may be the same VDI id for a VBD set.
      if (!info.vdis[vdiUUID]) {
        info.vdis[vdiUUID] = { ...vdi }
        promises.push(
          this._rollingDeltaVdiBackup({vdi: vdiXo, path, depth}).then(
            ([ backupPath, backupFile ]) => {
              info.vdis[vdiUUID].xoPath = `${backupPath}/${backupFile}`
              return backupPath // Used by _removeOldDeltaVdiBackups
            }
          )
        )
      }
    }

    const vdiDirPaths = await Promise.all(promises)
    const backups = await this._listDeltaVmBackups(path)

    const date = safeDateFormat(new Date())
    const backupFormat = `${date}_${vm.name_label}`

    const xvaPath = `${path}/${backupFormat}.xva`
    const infoPath = `${path}/${backupFormat}.json`

    try {
      await Promise.all([
        this.backupVm({vm, pathToFile: xvaPath, onlyMetadata: true}),
        writeFile(infoPath, JSON.stringify(info), {flag: 'wx'})
      ])
    } catch (e) {
      // Best-effort removal of the incomplete pair of files.
      await Promise.all([unlink(xvaPath).catch(noop), unlink(infoPath).catch(noop)])
      throw e
    }

    // Here we have a completed backup. We can remove old vdis.
    await Promise.all(
      mapToArray(vdiDirPaths, vdiDirPath => this._removeOldDeltaVdiBackups({path: `${path}/${vdiDirPath}`, depth}))
    )

    // Remove x2 files : json AND xva files.
    await this._removeOldBackups(backups, path, backups.length - (depth - 1) * 2)

    // Returns relative path.
    return `${directory}/${backupFormat}`
  }

  // Imports the metadata-only XVA of a delta VM backup and returns the
  // new (disk-less) VM.
  async _importVmMetadata (xapi, file) {
    const [ stream, length ] = await this._openAndwaitReadableFile(
      file, 'VM metadata to import not found in this remote'
    )
    return await xapi.importVm(stream, length, { onlyMetadata: true })
  }

  // Creates a new VDI on the target and restores the backup chain of
  // `vdiInfo` into it. Returns the new VDI id.
  async _importDeltaVdiBackupFromVm (xapi, vmId, remoteId, directory, vdiInfo) {
    const vdi = await xapi.createVdi(vdiInfo.virtual_size, vdiInfo)
    const vdiId = vdi.$id

    await this.importDeltaVdiBackup({
      vdi: this._xo.getObject(vdiId),
      remoteId,
      filePath: `${directory}/${vdiInfo.xoPath}`
    })

    return vdiId
  }

  // Restores a whole delta VM backup (metadata + VDIs + VBDs) into `sr`
  // and returns the XO id of the new VM.
  async importDeltaVmBackup ({sr, remoteId, filePath}) {
    const remote = await this._xo.getRemote(remoteId)
    const fullBackupPath = `${remote.path}/${filePath}`
    const xapi = this._xo.getXAPI(sr)

    // Import vm metadata.
    const vm = await this._importVmMetadata(xapi, `${fullBackupPath}.xva`)
    const vmName = vm.name_label

    // Disable start and change the VM name label during import.
    await Promise.all([
      xapi.addForbiddenOperationToVm(vm.$id, 'start', 'Delta backup import...'),
      xapi._setObjectProperties(vm, { name_label: `[Importing...] ${vmName}` })
    ])

    // Destroy vbds if necessary. Why ?
    // Because XenServer creates Vbds linked to the vdis of the backup vm if it exists.
    await xapi.destroyVbdsFromVm(vm.uuid)

    const info = JSON.parse(await readFile(`${fullBackupPath}.json`))

    // Import VDIs.
    const vdiIds = {}
    await Promise.all(
      mapToArray(
        info.vdis,
        async vdiInfo => {
          vdiInfo.sr = sr._xapiId

          const vdiId = await this._importDeltaVdiBackupFromVm(xapi, vm.$id, remoteId, dirname(filePath), vdiInfo)
          vdiIds[vdiInfo.uuid] = vdiId
        }
      )
    )

    // Re-attach the restored VDIs with the saved VBD settings.
    await Promise.all(
      mapToArray(
        info.vbds,
        vbdInfo => {
          xapi.attachVdiToVm(vdiIds[vbdInfo.xoVdi], vm.$id, vbdInfo)
        }
      )
    )

    // Import done, reenable start and set real vm name.
    await Promise.all([
      xapi.removeForbiddenOperationFromVm(vm.$id, 'start'),
      xapi._setObjectProperties(vm, { name_label: vmName })
    ])

    return xapiObjectToXo(vm).id
  }

  // -----------------------------------------------------------------

  // Exports a VM (full XVA, or metadata only) to `pathToFile`.
  // Fails if the target file already exists (`wx` flag).
  async backupVm ({vm, pathToFile, compress, onlyMetadata}) {
    const targetStream = createWriteStream(pathToFile, { flags: 'wx' })
    const promise = eventToPromise(targetStream, 'finish')

    const sourceStream = await this._xo.getXAPI(vm).exportVm(vm._xapiId, {
      compress,
      onlyMetadata: onlyMetadata || false
    })
    sourceStream.pipe(targetStream)

    await promise
  }

  // Performs one rolling full backup of a VM in `path`, keeping at most
  // `depth` backups for this tag.
  async rollingBackupVm ({vm, path, tag, depth, compress, onlyMetadata}) {
    await ensureDir(path)
    const files = await readdir(path)

    const reg = new RegExp('^[^_]+_' + escapeStringRegexp(`${tag}_${vm.name_label}.xva`))
    const backups = sortBy(filter(files, (fileName) => reg.test(fileName)))

    const date = safeDateFormat(new Date())
    const backupFullPath = `${path}/${date}_${tag}_${vm.name_label}.xva`

    await this.backupVm({vm, pathToFile: backupFullPath, compress, onlyMetadata})

    // Remove the surplus oldest backups.
    const promises = []
    for (let surplus = backups.length - (depth - 1); surplus > 0; surplus--) {
      const oldBackup = backups.shift()
      promises.push(unlink(`${path}/${oldBackup}`))
    }
    await Promise.all(promises)

    return backupFullPath
  }

  // Takes a new rolling snapshot of a VM, keeping at most `depth`
  // snapshots for this tag.
  async rollingSnapshotVm (vm, tag, depth) {
    const xapi = this._xo.getXAPI(vm)
    vm = xapi.getObject(vm._xapiId)

    const reg = new RegExp('^rollingSnapshot_[^_]+_' + escapeStringRegexp(tag) + '_')
    const snapshots = sortBy(filter(vm.$snapshots, snapshot => reg.test(snapshot.name_label)), 'name_label')
    const date = safeDateFormat(new Date())

    await xapi.snapshotVm(vm.$id, `rollingSnapshot_${date}_${tag}_${vm.name_label}`)

    // Remove the surplus oldest snapshots.
    const promises = []
    for (let surplus = snapshots.length - (depth - 1); surplus > 0; surplus--) {
      const oldSnap = snapshots.shift()
      promises.push(xapi.deleteVm(oldSnap.uuid, true))
    }
    await Promise.all(promises)
  }

  // Copies a VM to another pool's SR for disaster recovery, keeping at
  // most `depth` copies for this tag.
  async rollingDrCopyVm ({vm, sr, tag, depth}) {
    tag = 'DR_' + tag
    const reg = new RegExp('^' + escapeStringRegexp(`${vm.name_label}_${tag}_`) + '[0-9]{8}T[0-9]{6}Z$')

    const targetXapi = this._xo.getXAPI(sr)
    sr = targetXapi.getObject(sr._xapiId)
    const sourceXapi = this._xo.getXAPI(vm)
    vm = sourceXapi.getObject(vm._xapiId)

    // Find the previous DR copies of this VM on the target SR.
    const vms = []
    forEach(sr.$VDIs, vdi => {
      const vbds = vdi.$VBDs
      const vm = vbds && vbds[0] && vbds[0].$VM
      if (vm && reg.test(vm.name_label)) {
        vms.push(vm)
      }
    })
    const olderCopies = sortBy(vms, 'name_label')

    const copyName = `${vm.name_label}_${tag}_${safeDateFormat(new Date())}`
    const drCopy = await sourceXapi.remoteCopyVm(vm.$id, targetXapi, sr.$id, {
      nameLabel: copyName
    })
    await targetXapi.addTag(drCopy.$id, 'Disaster Recovery')

    // Remove the surplus oldest copies.
    const promises = []
    for (let surplus = olderCopies.length - (depth - 1); surplus > 0; surplus--) {
      const oldDRVm = olderCopies.shift()
      promises.push(targetXapi.deleteVm(oldDRVm.$id, true))
    }
    await Promise.all(promises)
  }
}

11
src/xo-mixins/index.js Normal file
View File

@@ -0,0 +1,11 @@
//
// This file has been generated by ./.generate-index.sh
//
// It MUST be re-generated each time an API namespace (read file) is
// added or removed.
//

// Entries are kept in alphabetical order (the generator sorts them).
export { default as backups } from './backups'
export { default as jobs } from './jobs'
export { default as plugins } from './plugins'
export { default as remotes } from './remotes'
export { default as scheduling } from './scheduling'

76
src/xo-mixins/jobs.js Normal file
View File

@@ -0,0 +1,76 @@
import JobExecutor from '../job-executor'
import { Jobs } from '../models/job'
import {
JsonRpcError,
NoSuchObject
} from '../api-errors'
// ===================================================================
// Thrown when no job record matches the requested id.
class NoSuchJob extends NoSuchObject {
  constructor (id) {
    super(id, 'job')
  }
}
// ===================================================================
// Jobs mixin: CRUD access to the persisted job records and sequential
// execution of jobs through the job executor.
export default class {
  constructor (xo) {
    this._executor = new JobExecutor(xo)
    this._jobs = new Jobs({
      connection: xo._redis,
      prefix: 'xo:job',
      indexes: ['user_id', 'key']
    })
  }

  // Returns every stored job record.
  async getAllJobs () {
    const jobs = await this._jobs.get()
    return jobs
  }

  // Returns the properties of the job `id`; throws NoSuchJob otherwise.
  async getJob (id) {
    const record = await this._jobs.first(id)
    if (!record) {
      throw new NoSuchJob(id)
    }

    return record.properties
  }

  // Creates a job owned by `userId` and returns its properties.
  async createJob (userId, job) {
    // TODO: use plain objects
    const record = await this._jobs.create(userId, job)
    return record.properties
  }

  // Persists an updated job record.
  async updateJob (job) {
    const result = await this._jobs.save(job)
    return result
  }

  // Deletes the job `id`.
  async removeJob (id) {
    const result = await this._jobs.remove(id)
    return result
  }

  // Runs the jobs of `idSequence` one after the other. Ids which do not
  // resolve to a job are collected and reported in a single error after
  // the existing jobs have been executed.
  async runJobSequence (idSequence) {
    const missingIds = []

    for (const id of idSequence) {
      let job
      try {
        job = await this.getJob(id)
      } catch (error) {
        // Unknown jobs are only remembered, anything else is fatal.
        if (!(error instanceof NoSuchJob)) {
          throw error
        }
        missingIds.push(id)
      }

      if (job) {
        await this._executor.exec(job)
      }
    }

    if (missingIds.length > 0) {
      throw new JsonRpcError(`The following jobs were not found: ${missingIds.join()}`)
    }
  }
}

203
src/xo-mixins/plugins.js Normal file
View File

@@ -0,0 +1,203 @@
import createJsonSchemaValidator from 'is-my-json-valid'
import isFunction from 'lodash.isfunction'
import { PluginsMetadata } from '../models/plugin-metadata'
import {
InvalidParameters,
NoSuchObject
} from '../api-errors'
import {
noop,
mapToArray
} from '../utils'
// ===================================================================
// Thrown when no registered plugin matches the requested id.
class NoSuchPlugin extends NoSuchObject {
  constructor (id) {
    super(id, 'plugin')
  }
}
// ===================================================================
// Plugins mixin: registration, configuration and (un)loading of
// xo-server plugins, with per-plugin metadata persisted in Redis.
export default class {
  constructor (xo) {
    // Plugin id (= plugin name) → in-memory plugin object.
    // NOTE(review): an array indexed by string keys; mapToArray() below
    // must iterate own keys rather than numeric indexes — confirm.
    this._plugins = []
    this._pluginsMetadata = new PluginsMetadata({
      connection: xo._redis,
      prefix: 'xo:plugin-metadata'
    })
  }

  // Returns the in-memory plugin object or throws NoSuchPlugin.
  _getRawPlugin (id) {
    const plugin = this._plugins[id]
    if (!plugin) {
      throw new NoSuchPlugin(id)
    }
    return plugin
  }

  // Returns the persisted metadata (autoload, configuration) of a
  // plugin, or null if none has been saved yet.
  async _getPluginMetadata (id) {
    const metadata = await this._pluginsMetadata.first(id)
    return metadata
      ? metadata.properties
      : null
  }

  // Registers a plugin instance under `name`, saving default metadata on
  // first sight, then configures and loads it asynchronously.
  async registerPlugin (
    name,
    instance,
    configurationSchema
  ) {
    const id = name

    const plugin = this._plugins[id] = {
      // A plugin without a configuration schema needs no configuration.
      configured: !configurationSchema,
      configurationSchema,
      id,
      instance,
      name,
      // Only plugins exposing unload() can be unloaded later.
      unloadable: isFunction(instance.unload)
    }

    const metadata = await this._getPluginMetadata(id)
    let autoload = true
    let configuration
    if (metadata) {
      ({
        autoload,
        configuration
      } = metadata)
    } else {
      console.log(`[NOTICE] register plugin ${name} for the first time`)
      await this._pluginsMetadata.save({
        id,
        autoload
      })
    }

    // Configure plugin if necessary. (i.e. configurationSchema)
    // Load plugin.
    // Ignore configuration and loading errors.
    // (Deliberately not awaited: registration must not fail because the
    // plugin cannot be configured/loaded yet.)
    Promise.resolve()
      .then(() => {
        if (!plugin.configured) {
          return this._configurePlugin(plugin, configuration)
        }
      })
      .then(() => {
        if (autoload) {
          return this.loadPlugin(id)
        }
      })
      .catch(noop)
  }

  // Returns the public description of a plugin (merging in-memory state
  // and persisted metadata).
  async _getPlugin (id) {
    const {
      configurationSchema,
      loaded,
      name,
      unloadable
    } = this._getRawPlugin(id)
    const {
      autoload,
      configuration
    } = (await this._getPluginMetadata(id)) || {}

    return {
      id,
      name,
      autoload,
      loaded,
      unloadable,
      configuration,
      configurationSchema
    }
  }

  // Returns the public description of every registered plugin.
  async getPlugins () {
    return await Promise.all(
      mapToArray(this._plugins, ({ id }) => this._getPlugin(id))
    )
  }

  // Validate the configuration and configure the plugin instance.
  async _configurePlugin (plugin, configuration) {
    if (!plugin.configurationSchema) {
      throw new InvalidParameters('plugin not configurable')
    }

    const validate = createJsonSchemaValidator(plugin.configurationSchema)
    if (!validate(configuration)) {
      throw new InvalidParameters(validate.errors)
    }

    // Sets the plugin configuration.
    await plugin.instance.configure({
      // Shallow copy of the configuration object to avoid most of the
      // errors when the plugin is altering the configuration object
      // which is handed over to it.
      ...configuration
    })
    plugin.configured = true
  }

  // Validate the configuration, configure the plugin instance and
  // save the new configuration.
  async configurePlugin (id, configuration) {
    const plugin = this._getRawPlugin(id)

    await this._configurePlugin(plugin, configuration)

    // Saves the configuration.
    await this._pluginsMetadata.merge(id, { configuration })
  }

  // Persistently disables loading this plugin at startup.
  async disablePluginAutoload (id) {
    // TODO: handle case where autoload is already disabled.
    await this._pluginsMetadata.merge(id, { autoload: false })
  }

  // Persistently enables loading this plugin at startup.
  async enablePluginAutoload (id) {
    // TODO: handle case where autoload is already enabled.
    await this._pluginsMetadata.merge(id, { autoload: true })
  }

  // Loads a plugin; it must be configured and not already loaded.
  async loadPlugin (id) {
    const plugin = this._getRawPlugin(id)
    if (plugin.loaded) {
      throw new InvalidParameters('plugin already loaded')
    }

    if (!plugin.configured) {
      throw new InvalidParameters('plugin not configured')
    }

    await plugin.instance.load()
    plugin.loaded = true
  }

  // Unloads a plugin; it must be loaded and expose unload().
  async unloadPlugin (id) {
    const plugin = this._getRawPlugin(id)
    if (!plugin.loaded) {
      throw new InvalidParameters('plugin already unloaded')
    }

    if (plugin.unloadable === false) {
      throw new InvalidParameters('plugin cannot be unloaded')
    }

    await plugin.instance.unload()
    plugin.loaded = false
  }

  // Removes the saved configuration of a plugin.
  async purgePluginConfiguration (id) {
    await this._pluginsMetadata.merge(id, { configuration: undefined })
  }
}

141
src/xo-mixins/remotes.js Normal file
View File

@@ -0,0 +1,141 @@
import startsWith from 'lodash.startswith'
import RemoteHandler from '../remote-handler'
import { Remotes } from '../models/remote'
import {
NoSuchObject
} from '../api-errors'
import {
forEach,
mapToArray
} from '../utils'
// ===================================================================
// Thrown when no remote record matches the requested id.
class NoSuchRemote extends NoSuchObject {
  constructor (id) {
    super(id, 'remote')
  }
}
// ===================================================================
// Remotes mixin: CRUD and synchronization of backup remotes (local
// filesystem paths or NFS mount points), persisted in Redis.
export default class {
  constructor (xo) {
    this._remotes = new Remotes({
      connection: xo._redis,
      prefix: 'xo:remote',
      indexes: ['enabled']
    })

    xo.on('start', async () => {
      // TODO: Should it be private?
      this.remoteHandler = new RemoteHandler()

      await this.initRemotes()
      await this.syncAllRemotes()
    })
    xo.on('stop', () => this.disableAllRemotes())
  }

  // Derives type, path (and host/share for NFS) from the remote's URL.
  //
  // Note: does not use `this` and may safely be passed around unbound.
  _developRemote (remote) {
    const _remote = { ...remote }
    if (startsWith(_remote.url, 'file://')) {
      _remote.type = 'local'
      // slice(6) keeps the slash following `file://`, yielding an
      // absolute path (e.g. file://var/lib/x → /var/lib/x).
      _remote.path = _remote.url.slice(6)
    } else if (startsWith(_remote.url, 'nfs://')) {
      _remote.type = 'nfs'
      const url = _remote.url.slice(6)
      const [host, share] = url.split(':')
      _remote.path = '/tmp/xo-server/mounts/' + _remote.id
      _remote.host = host
      _remote.share = share
    }
    return _remote
  }

  // Returns the developed form of every stored remote.
  async getAllRemotes () {
    return mapToArray(await this._remotes.get(), this._developRemote)
  }

  // Returns the remote model for `id`; throws NoSuchRemote otherwise.
  async _getRemote (id) {
    const remote = await this._remotes.first(id)
    if (!remote) {
      throw new NoSuchRemote(id)
    }

    return remote
  }

  // Returns the developed properties of the remote `id`.
  async getRemote (id) {
    return this._developRemote((await this._getRemote(id)).properties)
  }

  // Lists the backups available on the remote `id`.
  async listRemote (id) {
    const remote = await this.getRemote(id)
    return this._listRemote(remote)
  }

  async _listRemote (remote) {
    const fsRemotes = {
      nfs: true,
      local: true
    }
    if (remote.type in fsRemotes) {
      return this._listRemoteBackups(remote)
    }
    throw new Error('Unhandled remote type')
  }

  // Creates a remote and immediately enables/syncs it.
  async createRemote ({name, url}) {
    let remote = await this._remotes.create(name, url)
    return await this.updateRemote(remote.get('id'), {enabled: true})
  }

  // Updates a remote, re-syncs it (mount/unmount) and returns its
  // developed properties.
  async updateRemote (id, {name, url, enabled, error}) {
    const remote = await this._getRemote(id)
    this._updateRemote(remote, {name, url, enabled, error})
    const props = await this.remoteHandler.sync(this._developRemote(remote.properties))
    this._updateRemote(remote, props)

    // FIX: Remotes#save() is asynchronous (it is awaited everywhere
    // else); previously `.properties` was read on the returned promise
    // itself, which is always undefined.
    return this._developRemote((await this._remotes.save(remote)).properties)
  }

  // Applies partial updates to a remote model; an absent `error` resets
  // the stored error message.
  _updateRemote (remote, {name, url, enabled, error}) {
    if (name) remote.set('name', name)
    if (url) remote.set('url', url)
    if (enabled !== undefined) remote.set('enabled', enabled)
    if (error) {
      remote.set('error', error)
    } else {
      remote.set('error', '')
    }
  }

  // Forgets (unmounts) then deletes the remote `id`.
  async removeRemote (id) {
    const remote = await this.getRemote(id)
    await this.remoteHandler.forget(remote)
    await this._remotes.remove(id)
  }

  // TODO: Should it be private?
  async syncAllRemotes () {
    const remotes = await this.getAllRemotes()
    forEach(remotes, remote => {
      this.updateRemote(remote.id, {})
    })
  }

  // TODO: Should it be private?
  async disableAllRemotes () {
    const remotes = await this.getAllRemotes()
    this.remoteHandler.disableAll(remotes)
  }

  // TODO: Should it be private?
  // Ensures at least one (default, local) remote exists.
  async initRemotes () {
    const remotes = await this.getAllRemotes()
    if (!remotes || !remotes.length) {
      await this.createRemote({name: 'default', url: 'file://var/lib/xoa-backups'})
    }
  }
}

View File

@@ -0,0 +1,80 @@
import Scheduler from '../scheduler'
import { Schedules } from '../models/schedule'
import {
NoSuchObject
} from '../api-errors'
// ===================================================================
// Thrown when no schedule record matches the requested id.
class NoSuchSchedule extends NoSuchObject {
  constructor (id) {
    super(id, 'schedule')
  }
}
// ===================================================================
// Scheduling mixin: persistence of schedules and synchronization with
// the cron scheduler.
export default class {
  constructor (xo) {
    this._schedules = new Schedules({
      connection: xo._redis,
      prefix: 'xo:schedule',
      indexes: ['user_id', 'job']
    })

    const scheduler = this._scheduler = new Scheduler(xo)

    // The scheduler starts/stops together with Xo.
    xo.on('start', () => scheduler._loadSchedules())
    xo.on('stop', () => scheduler.disableAll())
  }

  // Read-only access to the underlying scheduler.
  get scheduler () {
    return this._scheduler
  }

  // Returns the schedule model for `id`; throws NoSuchSchedule otherwise.
  async _getSchedule (id) {
    const model = await this._schedules.first(id)
    if (!model) {
      throw new NoSuchSchedule(id)
    }

    return model
  }

  // Returns the properties of the schedule `id`.
  async getSchedule (id) {
    const model = await this._getSchedule(id)
    return model.properties
  }

  // Returns every stored schedule record.
  async getAllSchedules () {
    return await this._schedules.get()
  }

  // Creates a schedule and registers it with the scheduler.
  async createSchedule (userId, {job, cron, enabled, name}) {
    const model = await this._schedules.create(userId, job, cron, enabled, name)
    const schedule = model.properties

    const { scheduler } = this
    if (scheduler) {
      scheduler.add(schedule)
    }

    return schedule
  }

  // Partially updates a schedule and propagates it to the scheduler.
  async updateSchedule (id, {job, cron, enabled, name}) {
    const model = await this._getSchedule(id)

    if (job) model.set('job', job)
    if (cron) model.set('cron', cron)
    if (enabled !== undefined) model.set('enabled', enabled)
    if (name !== undefined) model.set('name', name)

    await this._schedules.save(model)

    const { scheduler } = this
    if (scheduler) {
      scheduler.update(model.properties)
    }
  }

  // Deletes a schedule and unregisters it from the scheduler.
  async removeSchedule (id) {
    await this._schedules.remove(id)

    const { scheduler } = this
    if (scheduler) {
      scheduler.remove(id)
    }
  }
}

930
src/xo.js

File diff suppressed because it is too large Load Diff