Compare commits

..

1 Commits

Author SHA1 Message Date
Julien Fontanet
4bd7a76d47 feat(xen-api/Record#resolve_): resolve props named as types 2021-04-07 16:00:35 +02:00
402 changed files with 8890 additions and 3915 deletions

View File

@@ -13,13 +13,19 @@ module.exports = {
overrides: [
{
files: ['cli.{,c,m}js', '*-cli.{,c,m}js', '**/*cli*/**/*.{,c,m}js'],
files: ['cli.js', '*-cli.js', '**/*cli*/**/*.js'],
rules: {
'no-console': 'off',
},
},
],
parser: 'babel-eslint',
parserOptions: {
ecmaFeatures: {
legacyDecorators: true,
},
},
rules: {
// disabled because XAPI objects are using camel case
camelcase: ['off'],

8
.gitignore vendored
View File

@@ -11,7 +11,7 @@
/packages/*/dist/
/packages/*/node_modules/
/@xen-orchestra/proxy/src/app/mixins/index.mjs
/@xen-orchestra/proxy/src/app/mixins/index.js
/packages/vhd-cli/src/commands/index.js
@@ -19,9 +19,9 @@
/packages/xen-api/plot.dat
/packages/xo-server/.xo-server.*
/packages/xo-server/src/api/index.mjs
/packages/xo-server/src/xapi/mixins/index.mjs
/packages/xo-server/src/xo-mixins/index.mjs
/packages/xo-server/src/api/index.js
/packages/xo-server/src/xapi/mixins/index.js
/packages/xo-server/src/xo-mixins/index.js
/packages/xo-server-auth-ldap/ldap.cache.conf

View File

@@ -20,6 +20,9 @@
"type": "git",
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"files": [
"index.js"
],
"author": {
"name": "Vates SAS",
"url": "https://vates.fr"

View File

@@ -48,7 +48,7 @@ import { createDebounceResource } from '@vates/disposable/debounceResource'
const debounceResource = createDebounceResource()
// it will wait for 10 seconds before calling the disposer
Disposable.use(debounceResource(getConnection(host), 10e3), connection => {})
using(debounceResource(getConnection(host), 10e3), connection => {})
```
### `debounceResource.flushAll()`

View File

@@ -30,7 +30,7 @@ import { createDebounceResource } from '@vates/disposable/debounceResource'
const debounceResource = createDebounceResource()
// it will wait for 10 seconds before calling the disposer
Disposable.use(debounceResource(getConnection(host), 10e3), connection => {})
using(debounceResource(getConnection(host), 10e3), connection => {})
```
### `debounceResource.flushAll()`

View File

@@ -23,7 +23,6 @@
},
"dependencies": {
"@vates/multi-key-map": "^0.1.0",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/log": "^0.2.0",
"ensure-array": "^1.0.0"
}

View File

@@ -44,4 +44,4 @@ You may:
## License
[ISC](https://spdx.org/licenses/ISC) © [Vates SAS](https://vates.fr)
[AGPL-3.0-or-later](https://spdx.org/licenses/AGPL-3.0-or-later) © [Vates SAS](https://vates.fr)

View File

@@ -6,7 +6,7 @@ exports.parseDuration = value => {
}
const duration = ms(value)
if (duration === undefined) {
throw new TypeError(`not a valid duration: ${value}`)
throw new TypeError(`not a valid duration: ${duration}`)
}
return duration
}

View File

@@ -18,8 +18,8 @@
"name": "Vates SAS",
"url": "https://vates.fr"
},
"license": "ISC",
"version": "0.1.1",
"license": "AGPL-3.0-or-later",
"version": "0.1.0",
"engines": {
"node": ">=8.10"
},

View File

@@ -23,6 +23,9 @@
"engines": {
"node": ">=8.10"
},
"files": [
"index.js"
],
"scripts": {
"postversion": "npm publish --access public"
},

View File

@@ -31,6 +31,9 @@
"engines": {
"node": ">=6"
},
"files": [
"index.js"
],
"bin": "./index.js",
"scripts": {
"postversion": "npm publish --access public"

View File

@@ -24,6 +24,10 @@
"url": "https://vates.fr"
},
"preferGlobal": false,
"files": [
"index.js",
"legacy.js"
],
"engines": {
"node": ">=6"
},

View File

@@ -1 +0,0 @@
../../scripts/babel-eslintrc.js

View File

@@ -26,14 +26,14 @@
"@babel/plugin-proposal-decorators": "^7.8.0",
"@babel/plugin-proposal-nullish-coalescing-operator": "^7.8.0",
"@babel/preset-env": "^7.7.4",
"cross-env": "^7.0.2",
"cross": "^1.0.0",
"rimraf": "^3.0.0"
},
"dependencies": {
"@vates/decorate-with": "^0.0.1",
"@xen-orchestra/log": "^0.2.0",
"core-js": "^3.6.4",
"golike-defer": "^0.5.1",
"lodash": "^4.17.15",
"object-hash": "^2.0.1"
},
"private": false,

View File

@@ -2,10 +2,9 @@
import 'core-js/features/symbol/async-iterator'
import assert from 'assert'
import defer from 'golike-defer'
import hash from 'object-hash'
import { createLogger } from '@xen-orchestra/log'
import { decorateWith } from '@vates/decorate-with'
import { defer } from 'golike-defer'
const log = createLogger('xo:audit-core')
@@ -66,7 +65,7 @@ export class AuditCore {
this._storage = storage
}
@decorateWith(defer)
@defer
async add($defer, subject, event, data) {
const time = Date.now()
$defer(await this._storage.acquireLock())
@@ -151,7 +150,7 @@ export class AuditCore {
}
}
@decorateWith(defer)
@defer
async deleteRangeAndRewrite($defer, newest, oldest) {
assert.notStrictEqual(newest, undefined)
assert.notStrictEqual(oldest, undefined)

View File

@@ -14,13 +14,25 @@ const configs = {
'@babel/plugin-proposal-pipeline-operator': {
proposal: 'minimal',
},
'@babel/preset-env': {
debug: !__TEST__,
'@babel/preset-env'(pkg) {
return {
debug: !__TEST__,
// disabled until https://github.com/babel/babel/issues/8323 is resolved
// loose: true,
// disabled until https://github.com/babel/babel/issues/8323 is resolved
// loose: true,
shippedProposals: true,
shippedProposals: true,
targets: (() => {
let node = (pkg.engines || {}).node
if (node !== undefined) {
const trimChars = '^=>~'
while (trimChars.includes(node[0])) {
node = node.slice(1)
}
}
return { browsers: pkg.browserslist, node }
})(),
}
},
}
@@ -32,21 +44,21 @@ const getConfig = (key, ...args) => {
// some plugins must be used in a specific order
const pluginsOrder = ['@babel/plugin-proposal-decorators', '@babel/plugin-proposal-class-properties']
module.exports = function (pkg, configs = {}) {
const plugins = {}
const presets = {}
module.exports = function (pkg, plugins, presets) {
plugins === undefined && (plugins = {})
presets === undefined && (presets = {})
Object.keys(pkg.devDependencies || {}).forEach(name => {
if (!(name in presets) && PLUGINS_RE.test(name)) {
plugins[name] = { ...getConfig(name, pkg), ...configs[name] }
plugins[name] = getConfig(name, pkg)
} else if (!(name in presets) && PRESETS_RE.test(name)) {
presets[name] = { ...getConfig(name, pkg), ...configs[name] }
presets[name] = getConfig(name, pkg)
}
})
return {
comments: !__PROD__,
ignore: __PROD__ ? [/\.spec\.js$/] : undefined,
ignore: __TEST__ ? undefined : [/\.spec\.js$/],
plugins: Object.keys(plugins)
.map(plugin => [plugin, plugins[plugin]])
.sort(([a], [b]) => {
@@ -55,15 +67,5 @@ module.exports = function (pkg, configs = {}) {
return oA !== -1 && oB !== -1 ? oA - oB : a < b ? -1 : 1
}),
presets: Object.keys(presets).map(preset => [preset, presets[preset]]),
targets: (() => {
let node = (pkg.engines || {}).node
if (node !== undefined) {
const trimChars = '^=>~'
while (trimChars.includes(node[0])) {
node = node.slice(1)
}
}
return { browsers: pkg.browserslist, node }
})(),
}
}

View File

@@ -7,16 +7,21 @@
"bugs": "https://github.com/vatesfr/xen-orchestra/issues",
"dependencies": {
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/backups": "^0.11.0",
"@xen-orchestra/fs": "^0.17.0",
"@xen-orchestra/backups": "^0.9.1",
"@xen-orchestra/fs": "^0.14.0",
"filenamify": "^4.1.0",
"getopts": "^2.2.5",
"lodash": "^4.17.15",
"promise-toolbox": "^0.19.2"
"promise-toolbox": "^0.18.0",
"vhd-lib": "^1.0.0"
},
"engines": {
"node": ">=7.10.1"
},
"files": [
"commands",
"*.js"
],
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@xen-orchestra/backups-cli",
"name": "@xen-orchestra/backups-cli",
"repository": {
@@ -27,7 +32,7 @@
"scripts": {
"postversion": "npm publish --access public"
},
"version": "0.6.0",
"version": "0.5.0",
"license": "AGPL-3.0-or-later",
"author": {
"name": "Vates SAS",

View File

@@ -1,14 +1,14 @@
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const Disposable = require('promise-toolbox/Disposable.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors.js')
const Disposable = require('promise-toolbox/Disposable')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const limitConcurrency = require('limit-concurrency-decorator').default
const { compileTemplate } = require('@xen-orchestra/template')
const { limitConcurrency } = require('limit-concurrency-decorator')
const { extractIdsFromSimplePattern } = require('./_extractIdsFromSimplePattern.js')
const { PoolMetadataBackup } = require('./_PoolMetadataBackup.js')
const { Task } = require('./Task.js')
const { VmBackup } = require('./_VmBackup.js')
const { XoMetadataBackup } = require('./_XoMetadataBackup.js')
const { extractIdsFromSimplePattern } = require('./_extractIdsFromSimplePattern')
const { PoolMetadataBackup } = require('./_PoolMetadataBackup')
const { Task } = require('./Task')
const { VmBackup } = require('./_VmBackup')
const { XoMetadataBackup } = require('./_XoMetadataBackup')
const noop = Function.prototype

View File

@@ -1,9 +1,8 @@
const assert = require('assert')
const { formatFilenameDate } = require('./_filenameDate.js')
const { importDeltaVm } = require('./_deltaVm.js')
const { Task } = require('./Task.js')
const { watchStreamSize } = require('./_watchStreamSize.js')
const { formatFilenameDate } = require('./_filenameDate')
const { importDeltaVm } = require('./_deltaVm')
const { Task } = require('./Task')
exports.ImportVmBackup = class ImportVmBackup {
constructor({ adapter, metadata, srUuid, xapi, settings: { newMacAddresses } = {} }) {
@@ -19,17 +18,13 @@ exports.ImportVmBackup = class ImportVmBackup {
const metadata = this._metadata
const isFull = metadata.mode === 'full'
const sizeContainer = { size: 0 }
let backup
if (isFull) {
backup = await adapter.readFullVmBackup(metadata)
watchStreamSize(backup, sizeContainer)
} else {
assert.strictEqual(metadata.mode, 'delta')
backup = await adapter.readDeltaVmBackup(metadata)
Object.values(backup.streams).forEach(stream => watchStreamSize(stream, sizeContainer))
}
return Task.run(
@@ -57,7 +52,7 @@ exports.ImportVmBackup = class ImportVmBackup {
])
return {
size: sizeContainer.size,
size: metadata.size,
id: await xapi.getField('VM', vmRef, 'uuid'),
}
}

View File

@@ -1,24 +1,23 @@
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const Disposable = require('promise-toolbox/Disposable.js')
const fromCallback = require('promise-toolbox/fromCallback.js')
const fromEvent = require('promise-toolbox/fromEvent.js')
const pDefer = require('promise-toolbox/defer.js')
const Disposable = require('promise-toolbox/Disposable')
const fromCallback = require('promise-toolbox/fromCallback')
const fromEvent = require('promise-toolbox/fromEvent')
const pDefer = require('promise-toolbox/defer')
const pump = require('pump')
const { basename, dirname, join, normalize, resolve } = require('path')
const { createLogger } = require('@xen-orchestra/log')
const { createSyntheticStream, mergeVhd, default: Vhd } = require('vhd-lib')
const { deduped } = require('@vates/disposable/deduped.js')
const { deduped } = require('@vates/disposable/deduped')
const { execFile } = require('child_process')
const { readdir, stat } = require('fs-extra')
const { ZipFile } = require('yazl')
const { BACKUP_DIR } = require('./_getVmBackupDir.js')
const { cleanVm } = require('./_cleanVm.js')
const { getTmpDir } = require('./_getTmpDir.js')
const { isMetadataFile, isVhdFile } = require('./_backupType.js')
const { isValidXva } = require('./_isValidXva.js')
const { listPartitions, LVM_PARTITION_TYPE } = require('./_listPartitions.js')
const { lvs, pvs } = require('./_lvm.js')
const { BACKUP_DIR } = require('./_getVmBackupDir')
const { cleanVm } = require('./_cleanVm')
const { getTmpDir } = require('./_getTmpDir')
const { isMetadataFile, isVhdFile } = require('./_backupType')
const { listPartitions, LVM_PARTITION_TYPE } = require('./_listPartitions')
const { lvs, pvs } = require('./_lvm')
const DIR_XO_CONFIG_BACKUPS = 'xo-config-backups'
exports.DIR_XO_CONFIG_BACKUPS = DIR_XO_CONFIG_BACKUPS
@@ -506,14 +505,21 @@ class RemoteAdapter {
}
async outputStream(path, input, { checksum = true, validator = noop } = {}) {
await this._handler.outputStream(path, input, {
const handler = this._handler
input = await input
const tmpPath = `${dirname(path)}/.${basename(path)}`
const output = await handler.createOutputStream(tmpPath, {
checksum,
dirMode: this._dirMode,
async validator() {
await input.task
return validator.apply(this, arguments)
},
})
try {
await Promise.all([fromCallback(pump, input, output), output.checksumWritten, input.task])
await validator(tmpPath)
await handler.rename(tmpPath, path, { checksum })
} catch (error) {
await handler.unlink(tmpPath, { checksum })
throw error
}
}
async readDeltaVmBackup(metadata) {
@@ -522,7 +528,7 @@ class RemoteAdapter {
const dir = dirname(metadata._filename)
const streams = {}
await asyncMapSettled(Object.keys(vdis), async id => {
await asyncMapSettled(Object.entries(vdis), async ([id, vdi]) => {
streams[`${id}.vhd`] = await createSyntheticStream(handler, join(dir, vhds[id]))
})
@@ -545,15 +551,8 @@ class RemoteAdapter {
}
}
Object.assign(RemoteAdapter.prototype, {
cleanVm(vmDir, { lock = true } = {}) {
if (lock) {
return Disposable.use(this._handler.lock(vmDir), () => cleanVm.apply(this, arguments))
} else {
return cleanVm.apply(this, arguments)
}
},
isValidXva,
})
RemoteAdapter.prototype.cleanVm = function (vmDir) {
return Disposable.use(this._handler.lock(vmDir), () => cleanVm.apply(this, arguments))
}
exports.RemoteAdapter = RemoteAdapter

View File

@@ -1,5 +1,5 @@
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js')
const { PATH_DB_DUMP } = require('./_PoolMetadataBackup.js')
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter')
const { PATH_DB_DUMP } = require('./_PoolMetadataBackup')
exports.RestoreMetadataBackup = class RestoreMetadataBackup {
constructor({ backupId, handler, xapi }) {

View File

@@ -1,12 +1,11 @@
const CancelToken = require('promise-toolbox/CancelToken.js')
const Zone = require('node-zone')
const { SyncThenable } = require('./_syncThenable')
const logAfterEnd = () => {
throw new Error('task has already ended')
}
const noop = Function.prototype
// Create a serializable object from an error.
//
// Otherwise some fields might be non-enumerable and missing from logs.
@@ -20,132 +19,163 @@ const serializeError = error =>
stack: error.stack,
}
: error
exports.serializeError = serializeError
const $$task = Symbol('@xen-orchestra/backups/Task')
class Task {
static get cancelToken() {
const task = Zone.current.data[$$task]
return task !== undefined ? task.#cancelToken : CancelToken.none
class TaskLogger {
constructor(logFn, parentId) {
this._log = logFn
this._parentId = parentId
this._taskId = undefined
}
static run(opts, fn) {
return new this(opts).run(fn, true)
get taskId() {
const taskId = this._taskId
if (taskId === undefined) {
throw new Error('start the task first')
}
return taskId
}
static wrapFn(opts, fn) {
// create a subtask
fork() {
return new TaskLogger(this._log, this.taskId)
}
info(message, data) {
return this._log({
data,
event: 'info',
message,
taskId: this.taskId,
timestamp: Date.now(),
})
}
run(message, data, fn) {
if (arguments.length === 2) {
fn = data
data = undefined
}
return SyncThenable.tryUnwrap(
SyncThenable.fromFunction(() => {
if (this._taskId !== undefined) {
throw new Error('task has already started')
}
this._taskId = Math.random().toString(36).slice(2)
return this._log({
data,
event: 'start',
message,
parentId: this._parentId,
taskId: this.taskId,
timestamp: Date.now(),
})
})
.then(fn)
.then(
result => {
const log = this._log
this._log = logAfterEnd
return SyncThenable.resolve(
log({
event: 'end',
result,
status: 'success',
taskId: this.taskId,
timestamp: Date.now(),
})
).then(() => result)
},
error => {
const log = this._log
this._log = logAfterEnd
return SyncThenable.resolve(
log({
event: 'end',
result: serializeError(error),
status: 'failure',
taskId: this.taskId,
timestamp: Date.now(),
})
).then(() => {
throw error
})
}
)
)
}
warning(message, data) {
return this._log({
data,
event: 'warning',
message,
taskId: this.taskId,
timestamp: Date.now(),
})
}
wrapFn(fn, message, data) {
const logger = this
return function () {
const evaluate = v => (typeof v === 'function' ? v.apply(this, arguments) : v)
return logger.run(evaluate(message), evaluate(data), () => fn.apply(this, arguments))
}
}
}
const $$task = Symbol('current task logger')
const getCurrent = () => Zone.current.data[$$task]
const Task = {
info(message, data) {
const task = getCurrent()
if (task !== undefined) {
return task.info(message, data)
}
},
run({ name, data, onLog }, fn) {
let parentId
if (onLog === undefined) {
const parent = getCurrent()
if (parent === undefined) {
return fn()
}
onLog = parent._log
parentId = parent.taskId
}
const task = new TaskLogger(onLog, parentId)
const zone = Zone.current.fork('task')
zone.data[$$task] = task
return task.run(name, data, zone.wrap(fn))
},
warning(message, data) {
const task = getCurrent()
if (task !== undefined) {
return task.warning(message, data)
}
},
wrapFn(opts, fn) {
// compatibility with @decorateWith
if (typeof fn !== 'function') {
;[fn, opts] = [opts, fn]
}
const { name, data, onLog } = opts
return function () {
return Task.run(typeof opts === 'function' ? opts.apply(this, arguments) : opts, () => fn.apply(this, arguments))
const evaluate = v => (typeof v === 'function' ? v.apply(this, arguments) : v)
return Task.run({ name: evaluate(name), data: evaluate(data), onLog }, () => fn.apply(this, arguments))
}
}
#cancelToken
#id = Math.random().toString(36).slice(2)
#onLog
#zone
constructor({ name, data, onLog }) {
let parentCancelToken, parentId
if (onLog === undefined) {
const parent = Zone.current.data[$$task]
if (parent === undefined) {
onLog = noop
} else {
onLog = log => parent.#onLog(log)
parentCancelToken = parent.#cancelToken
parentId = parent.#id
}
}
const zone = Zone.current.fork('@xen-orchestra/backups/Task')
zone.data[$$task] = this
this.#zone = zone
const { cancel, token } = CancelToken.source(parentCancelToken && [parentCancelToken])
this.#cancelToken = token
this.cancel = cancel
this.#onLog = onLog
this.#log('start', {
data,
message: name,
parentId,
})
}
failure(error) {
this.#end('failure', serializeError(error))
}
info(message, data) {
this.#log('info', { data, message })
}
/**
* Run a function in the context of this task
*
* In case of error, the task will be failed.
*
* @typedef Result
* @param {() => Result)} fn
* @param {boolean} last - Whether the task should succeed if there is no error
* @returns Result
*/
run(fn, last = false) {
return this.#zone.run(() => {
try {
const result = fn()
let then
if (result != null && typeof (then = result.then) === 'function') {
then.call(result, last && (value => this.success(value)), error => this.failure(error))
} else if (last) {
this.success(result)
}
return result
} catch (error) {
this.failure(error)
throw error
}
})
}
success(value) {
this.#end('success', value)
}
warning(message, data) {
this.#log('warning', { data, message })
}
wrapFn(fn, last) {
const task = this
return function () {
return task.run(() => fn.apply(this, arguments), last)
}
}
#end(status, result) {
this.#log('end', { result, status })
this.#onLog = logAfterEnd
}
#log(event, props) {
this.#onLog({
...props,
event,
taskId: this.#id,
timestamp: Date.now(),
})
}
},
}
exports.Task = Task
for (const method of ['info', 'warning']) {
Task[method] = (...args) => Zone.current.data[$$task]?.[method](...args)
}

View File

@@ -1,17 +1,32 @@
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const ignoreErrors = require('promise-toolbox/ignoreErrors.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { formatDateTime } = require('@xen-orchestra/xapi')
const { formatFilenameDate } = require('../_filenameDate.js')
const { getOldEntries } = require('../_getOldEntries.js')
const { importDeltaVm, TAG_COPY_SRC } = require('../_deltaVm.js')
const { Task } = require('../Task.js')
const { formatFilenameDate } = require('./_filenameDate')
const { getOldEntries } = require('./_getOldEntries')
const { importDeltaVm, TAG_COPY_SRC } = require('./_deltaVm')
const { listReplicatedVms } = require('./_listReplicatedVms')
const { Task } = require('./Task')
const { AbstractDeltaWriter } = require('./_AbstractDeltaWriter.js')
const { MixinReplicationWriter } = require('./_MixinReplicationWriter.js')
const { listReplicatedVms } = require('./_listReplicatedVms.js')
exports.ContinuousReplicationWriter = class ContinuousReplicationWriter {
constructor(backup, sr, settings) {
this._backup = backup
this._settings = settings
this._sr = sr
this.transfer = Task.wrapFn(
{
name: 'export',
data: ({ deltaExport }) => ({
id: sr.uuid,
isFull: Object.values(deltaExport.vdis).some(vdi => vdi.other_config['xo:base_delta'] === undefined),
type: 'SR',
}),
},
this.transfer
)
}
exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinReplicationWriter(AbstractDeltaWriter) {
async checkBaseVdis(baseUuidToSrcVdi, baseVm) {
const sr = this._sr
const replicatedVm = listReplicatedVms(sr.$xapi, this._backup.job.id, sr.uuid, this._backup.vm.uuid).find(
@@ -36,23 +51,7 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
}
}
prepare({ isFull }) {
// create the task related to this export and ensure all methods are called in this context
const task = new Task({
name: 'export',
data: {
id: this._sr.uuid,
isFull,
type: 'SR',
},
})
this.transfer = task.wrapFn(this.transfer)
this.cleanup = task.wrapFn(this.cleanup, true)
return task.run(() => this._prepare())
}
async _prepare() {
async prepare() {
const settings = this._settings
const { uuid: srUuid, $xapi: xapi } = this._sr
const { scheduleId, vm } = this._backup
@@ -64,12 +63,8 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
if (settings.deleteFirst) {
await this._deleteOldEntries()
}
}
async cleanup() {
if (!this._settings.deleteFirst) {
await this._deleteOldEntries()
} else {
this.cleanup = this._deleteOldEntries
}
}

View File

@@ -1,25 +1,42 @@
const assert = require('assert')
const map = require('lodash/map.js')
const mapValues = require('lodash/mapValues.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors.js')
const map = require('lodash/map')
const mapValues = require('lodash/mapValues')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { asyncMap } = require('@xen-orchestra/async-map')
const { chainVhd, checkVhdChain, default: Vhd } = require('vhd-lib')
const { createLogger } = require('@xen-orchestra/log')
const { dirname } = require('path')
const { formatFilenameDate } = require('../_filenameDate.js')
const { getOldEntries } = require('../_getOldEntries.js')
const { getVmBackupDir } = require('../_getVmBackupDir.js')
const { Task } = require('../Task.js')
const { MixinBackupWriter } = require('./_MixinBackupWriter.js')
const { AbstractDeltaWriter } = require('./_AbstractDeltaWriter.js')
const { checkVhd } = require('./_checkVhd.js')
const { packUuid } = require('./_packUuid.js')
const { checkVhd } = require('./_checkVhd')
const { formatFilenameDate } = require('./_filenameDate')
const { getOldEntries } = require('./_getOldEntries')
const { getVmBackupDir } = require('./_getVmBackupDir')
const { packUuid } = require('./_packUuid')
const { Task } = require('./Task')
const { warn } = createLogger('xo:backups:DeltaBackupWriter')
exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
exports.DeltaBackupWriter = class DeltaBackupWriter {
constructor(backup, remoteId, settings) {
this._adapter = backup.remoteAdapters[remoteId]
this._backup = backup
this._settings = settings
this.transfer = Task.wrapFn(
{
name: 'export',
data: ({ deltaExport }) => ({
id: remoteId,
isFull: Object.values(deltaExport.vdis).some(vdi => vdi.other_config['xo:base_delta'] === undefined),
type: 'remote',
}),
},
this.transfer
)
this[settings.deleteFirst ? 'prepare' : 'cleanup'] = this._deleteOldEntries
}
async checkBaseVdis(baseUuidToSrcVdi) {
const { handler } = this._adapter
const backup = this._backup
@@ -55,28 +72,7 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
})
}
async beforeBackup() {
await super.beforeBackup()
return this._cleanVm({ merge: true })
}
prepare({ isFull }) {
// create the task related to this export and ensure all methods are called in this context
const task = new Task({
name: 'export',
data: {
id: this._remoteId,
isFull,
type: 'remote',
},
})
this.transfer = task.wrapFn(this.transfer)
this.cleanup = task.wrapFn(this.cleanup, true)
return task.run(() => this._prepare())
}
async _prepare() {
async prepare() {
const adapter = this._adapter
const settings = this._settings
const { scheduleId, vm } = this._backup
@@ -103,12 +99,8 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
if (settings.deleteFirst) {
await this._deleteOldEntries()
}
}
async cleanup() {
if (!this._settings.deleteFirst) {
await this._deleteOldEntries()
} else {
this.cleanup = this._deleteOldEntries
}
}

View File

@@ -1,24 +1,23 @@
const ignoreErrors = require('promise-toolbox/ignoreErrors.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { asyncMapSettled } = require('@xen-orchestra/async-map')
const { formatDateTime } = require('@xen-orchestra/xapi')
const { formatFilenameDate } = require('../_filenameDate.js')
const { getOldEntries } = require('../_getOldEntries.js')
const { Task } = require('../Task.js')
const { formatFilenameDate } = require('./_filenameDate')
const { getOldEntries } = require('./_getOldEntries')
const { listReplicatedVms } = require('./_listReplicatedVms')
const { Task } = require('./Task')
const { AbstractFullWriter } = require('./_AbstractFullWriter.js')
const { MixinReplicationWriter } = require('./_MixinReplicationWriter.js')
const { listReplicatedVms } = require('./_listReplicatedVms.js')
exports.FullReplicationWriter = class FullReplicationWriter extends MixinReplicationWriter(AbstractFullWriter) {
constructor(props) {
super(props)
exports.DisasterRecoveryWriter = class DisasterRecoveryWriter {
constructor(backup, sr, settings) {
this._backup = backup
this._settings = settings
this._sr = sr
this.run = Task.wrapFn(
{
name: 'export',
data: {
id: props.sr.uuid,
id: sr.uuid,
type: 'SR',
// necessary?

View File

@@ -1,20 +1,20 @@
const { formatFilenameDate } = require('../_filenameDate.js')
const { getOldEntries } = require('../_getOldEntries.js')
const { getVmBackupDir } = require('../_getVmBackupDir.js')
const { Task } = require('../Task.js')
const { formatFilenameDate } = require('./_filenameDate')
const { getOldEntries } = require('./_getOldEntries')
const { getVmBackupDir } = require('./_getVmBackupDir')
const { isValidXva } = require('./isValidXva')
const { Task } = require('./Task')
const { MixinBackupWriter } = require('./_MixinBackupWriter.js')
const { AbstractFullWriter } = require('./_AbstractFullWriter.js')
exports.FullBackupWriter = class FullBackupWriter extends MixinBackupWriter(AbstractFullWriter) {
constructor(props) {
super(props)
exports.FullBackupWriter = class FullBackupWriter {
constructor(backup, remoteId, settings) {
this._backup = backup
this._remoteId = remoteId
this._settings = settings
this.run = Task.wrapFn(
{
name: 'export',
data: {
id: props.remoteId,
id: remoteId,
type: 'remote',
// necessary?
@@ -27,11 +27,12 @@ exports.FullBackupWriter = class FullBackupWriter extends MixinBackupWriter(Abst
async run({ timestamp, sizeContainer, stream }) {
const backup = this._backup
const remoteId = this._remoteId
const settings = this._settings
const { job, scheduleId, vm } = backup
const adapter = this._adapter
const adapter = backup.remoteAdapters[remoteId]
const handler = adapter.handler
const backupDir = getVmBackupDir(vm.uuid)
@@ -67,7 +68,11 @@ exports.FullBackupWriter = class FullBackupWriter extends MixinBackupWriter(Abst
await Task.run({ name: 'transfer' }, async () => {
await adapter.outputStream(dataFilename, stream, {
validator: tmpPath => adapter.isValidXva(tmpPath),
validator: tmpPath => {
if (handler._getFilePath !== undefined) {
return isValidXva(handler._getFilePath('/' + tmpPath))
}
},
})
return { size: sizeContainer.size }
})

View File

@@ -1,9 +1,9 @@
const { asyncMap } = require('@xen-orchestra/async-map')
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js')
const { forkStreamUnpipe } = require('./_forkStreamUnpipe.js')
const { formatFilenameDate } = require('./_filenameDate.js')
const { Task } = require('./Task.js')
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter')
const { forkStreamUnpipe } = require('./_forkStreamUnpipe')
const { formatFilenameDate } = require('./_filenameDate')
const { Task } = require('./Task')
const PATH_DB_DUMP = '/pool/xmldbdump'
exports.PATH_DB_DUMP = PATH_DB_DUMP

View File

@@ -1,31 +1,23 @@
const assert = require('assert')
const findLast = require('lodash/findLast.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors.js')
const keyBy = require('lodash/keyBy.js')
const mapValues = require('lodash/mapValues.js')
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const findLast = require('lodash/findLast')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const keyBy = require('lodash/keyBy')
const mapValues = require('lodash/mapValues')
const { asyncMap } = require('@xen-orchestra/async-map')
const { createLogger } = require('@xen-orchestra/log')
const { defer } = require('golike-defer')
const { formatDateTime } = require('@xen-orchestra/xapi')
const { DeltaBackupWriter } = require('./writers/DeltaBackupWriter.js')
const { DeltaReplicationWriter } = require('./writers/DeltaReplicationWriter.js')
const { exportDeltaVm } = require('./_deltaVm.js')
const { forkStreamUnpipe } = require('./_forkStreamUnpipe.js')
const { FullBackupWriter } = require('./writers/FullBackupWriter.js')
const { FullReplicationWriter } = require('./writers/FullReplicationWriter.js')
const { getOldEntries } = require('./_getOldEntries.js')
const { Task } = require('./Task.js')
const { watchStreamSize } = require('./_watchStreamSize.js')
const { ContinuousReplicationWriter } = require('./_ContinuousReplicationWriter')
const { DeltaBackupWriter } = require('./_DeltaBackupWriter')
const { DisasterRecoveryWriter } = require('./_DisasterRecoveryWriter')
const { exportDeltaVm } = require('./_deltaVm')
const { forkStreamUnpipe } = require('./_forkStreamUnpipe')
const { FullBackupWriter } = require('./_FullBackupWriter')
const { getOldEntries } = require('./_getOldEntries')
const { Task } = require('./Task')
const { watchStreamSize } = require('./_watchStreamSize')
const { debug, warn } = createLogger('xo:backups:VmBackup')
const asyncEach = async (iterable, fn, thisArg = iterable) => {
for (const item of iterable) {
await fn.call(thisArg, item)
}
}
const forkDeltaExport = deltaExport =>
Object.create(deltaExport, {
streams: {
@@ -70,12 +62,12 @@ exports.VmBackup = class VmBackup {
// Create writers
{
const writers = new Set()
const writers = []
this._writers = writers
const [BackupWriter, ReplicationWriter] = this._isDelta
? [DeltaBackupWriter, DeltaReplicationWriter]
: [FullBackupWriter, FullReplicationWriter]
? [DeltaBackupWriter, ContinuousReplicationWriter]
: [FullBackupWriter, DisasterRecoveryWriter]
const allSettings = job.settings
@@ -85,7 +77,7 @@ exports.VmBackup = class VmBackup {
...allSettings[remoteId],
}
if (targetSettings.exportRetention !== 0) {
writers.add(new BackupWriter({ backup: this, remoteId, settings: targetSettings }))
writers.push(new BackupWriter(this, remoteId, targetSettings))
}
})
srs.forEach(sr => {
@@ -94,31 +86,12 @@ exports.VmBackup = class VmBackup {
...allSettings[sr.uuid],
}
if (targetSettings.copyRetention !== 0) {
writers.add(new ReplicationWriter({ backup: this, sr, settings: targetSettings }))
writers.push(new ReplicationWriter(this, sr, targetSettings))
}
})
}
}
// calls fn for each function, warns of any errors, and throws only if there are no writers left
async _callWriters(fn, warnMessage, parallel = true) {
const writers = this._writers
if (writers.size === 0) {
return
}
await (parallel ? asyncMap : asyncEach)(writers, async function (writer) {
try {
await fn(writer)
} catch (error) {
this.delete(writer)
warn(warnMessage, { error, writer: writer.constructor.name })
}
})
if (writers.size === 0) {
throw new Error('all targets have failed, step: ' + warnMessage)
}
}
// ensure the VM itself does not have any backup metadata which would be
// copied on manual snapshots and interfere with the backup jobs
async _cleanMetadata() {
@@ -141,8 +114,7 @@ exports.VmBackup = class VmBackup {
const settings = this._settings
const doSnapshot =
this._isDelta || (!settings.offlineBackup && vm.power_state === 'Running') || settings.snapshotRetention !== 0
const doSnapshot = this._isDelta || vm.power_state === 'Running' || settings.snapshotRetention !== 0
if (doSnapshot) {
await Task.run({ name: 'snapshot' }, async () => {
if (!settings.bypassVdiChainsCheck) {
@@ -174,28 +146,31 @@ exports.VmBackup = class VmBackup {
async _copyDelta() {
const { exportedVm } = this
const baseVm = this._baseVm
const fullVdisRequired = this._fullVdisRequired
const isFull = fullVdisRequired === undefined || fullVdisRequired.size !== 0
await this._callWriters(writer => writer.prepare({ isFull }), 'writer.prepare()')
await asyncMap(this._writers, writer => writer.prepare && writer.prepare())
const deltaExport = await exportDeltaVm(exportedVm, baseVm, {
fullVdisRequired,
fullVdisRequired: this._fullVdisRequired,
})
const sizeContainers = mapValues(deltaExport.streams, stream => watchStreamSize(stream))
const sizeContainers = mapValues(deltaExport.streams, watchStreamSize)
const timestamp = Date.now()
await this._callWriters(
writer =>
writer.transfer({
await asyncMap(this._writers, async writer => {
try {
await writer.transfer({
deltaExport: forkDeltaExport(deltaExport),
sizeContainers,
timestamp,
}),
'writer.transfer()'
)
})
} catch (error) {
warn('copy failure', {
error,
target: writer.target,
vm: this.vm,
})
}
})
this._baseVm = exportedVm
@@ -220,7 +195,7 @@ exports.VmBackup = class VmBackup {
size,
})
await this._callWriters(writer => writer.cleanup(), 'writer.cleanup()')
await asyncMap(this._writers, writer => writer.cleanup && writer.cleanup())
}
async _copyFull() {
@@ -233,15 +208,21 @@ exports.VmBackup = class VmBackup {
const timestamp = Date.now()
await this._callWriters(
writer =>
writer.run({
await asyncMap(this._writers, async writer => {
try {
await writer.run({
sizeContainer,
stream: forkStreamUnpipe(stream),
timestamp,
}),
'writer.run()'
)
})
} catch (error) {
warn('copy failure', {
error,
target: writer.target,
vm: this.vm,
})
}
})
const { size } = sizeContainer
const end = Date.now()
@@ -315,11 +296,14 @@ exports.VmBackup = class VmBackup {
})
const presentBaseVdis = new Map(baseUuidToSrcVdi)
await this._callWriters(
writer => presentBaseVdis.size !== 0 && writer.checkBaseVdis(presentBaseVdis, baseVm),
'writer.checkBaseVdis()',
false
)
const writers = this._writers
for (let i = 0, n = writers.length; presentBaseVdis.size !== 0 && i < n; ++i) {
await writers[i].checkBaseVdis(presentBaseVdis, baseVm)
}
if (presentBaseVdis.size === 0) {
return
}
const fullVdisRequired = new Set()
baseUuidToSrcVdi.forEach((srcVdi, baseUuid) => {
@@ -332,19 +316,7 @@ exports.VmBackup = class VmBackup {
this._fullVdisRequired = fullVdisRequired
}
run = defer(this.run)
async run($defer) {
const settings = this._settings
assert(
!settings.offlineBackup || settings.snapshotRetention === 0,
'offlineBackup is not compatible with snapshotRetention'
)
await this._callWriters(async writer => {
await writer.beforeBackup()
$defer(() => writer.afterBackup())
}, 'writer.beforeBackup()')
async run() {
await this._fetchJobSnapshots()
if (this._isDelta) {
@@ -354,7 +326,7 @@ exports.VmBackup = class VmBackup {
await this._cleanMetadata()
await this._removeUnusedSnapshots()
const { vm } = this
const { _settings: settings, vm } = this
const isRunning = vm.power_state === 'Running'
const startAfter = isRunning && (settings.offlineBackup ? 'backup' : settings.offlineSnapshot && 'snapshot')
if (startAfter) {
@@ -367,7 +339,7 @@ exports.VmBackup = class VmBackup {
ignoreErrors.call(vm.$callAsync('start', false, false))
}
if (this._writers.size !== 0) {
if (this._writers.length !== 0) {
await (this._isDelta ? this._copyDelta() : this._copyFull())
}
} finally {

View File

@@ -1,8 +1,8 @@
const { asyncMap } = require('@xen-orchestra/async-map')
const { DIR_XO_CONFIG_BACKUPS } = require('./RemoteAdapter.js')
const { formatFilenameDate } = require('./_filenameDate.js')
const { Task } = require('./Task.js')
const { DIR_XO_CONFIG_BACKUPS } = require('./RemoteAdapter')
const { formatFilenameDate } = require('./_filenameDate')
const { Task } = require('./Task')
exports.XoMetadataBackup = class XoMetadataBackup {
constructor({ config, job, remoteAdapters, schedule, settings }) {

View File

@@ -1,19 +1,15 @@
require('@xen-orchestra/log/configure.js').catchGlobalErrors(
require('@xen-orchestra/log').createLogger('xo:backups:worker')
)
const Disposable = require('promise-toolbox/Disposable.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors.js')
const Disposable = require('promise-toolbox/Disposable')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { compose } = require('@vates/compose')
const { createDebounceResource } = require('@vates/disposable/debounceResource.js')
const { deduped } = require('@vates/disposable/deduped.js')
const { createDebounceResource } = require('@vates/disposable/debounceResource')
const { deduped } = require('@vates/disposable/deduped')
const { getHandler } = require('@xen-orchestra/fs')
const { parseDuration } = require('@vates/parse-duration')
const { Xapi } = require('@xen-orchestra/xapi')
const { Backup } = require('./Backup.js')
const { RemoteAdapter } = require('./RemoteAdapter.js')
const { Task } = require('./Task.js')
const { Backup } = require('./Backup')
const { RemoteAdapter } = require('./RemoteAdapter')
const { Task } = require('./Task')
class BackupWorker {
#config

View File

@@ -1,5 +1,5 @@
const cancelable = require('promise-toolbox/cancelable.js')
const CancelToken = require('promise-toolbox/CancelToken.js')
const cancelable = require('promise-toolbox/cancelable')
const CancelToken = require('promise-toolbox/CancelToken')
// Similar to `Promise.all` + `map` but pass a cancel token to the callback
//

View File

@@ -1,10 +1,11 @@
const assert = require('assert')
const limitConcurrency = require('limit-concurrency-decorator').default
const { asyncMap } = require('@xen-orchestra/async-map')
const { default: Vhd, mergeVhd } = require('vhd-lib')
const { dirname, resolve } = require('path')
const { DISK_TYPE_DIFFERENCING } = require('vhd-lib/dist/_constants.js')
const { isMetadataFile, isVhdFile, isXvaFile, isXvaSumFile } = require('./_backupType.js')
const { limitConcurrency } = require('limit-concurrency-decorator')
const { DISK_TYPE_DIFFERENCING } = require('vhd-lib/dist/_constants')
const { isMetadataFile, isVhdFile, isXvaFile, isXvaSumFile } = require('./_backupType')
const { isValidXva } = require('./isValidXva')
// chain is an array of VHDs from child to parent
//
@@ -34,8 +35,6 @@ const mergeVhdChain = limitConcurrency(1)(async function mergeVhdChain(chain, {
child = children[0]
}
onLog(`merging ${child} into ${parent}`)
let done, total
const handle = setInterval(() => {
if (done !== undefined) {
@@ -60,59 +59,19 @@ const mergeVhdChain = limitConcurrency(1)(async function mergeVhdChain(chain, {
)
clearInterval(handle)
await Promise.all([
handler.rename(parent, child),
asyncMap(children.slice(0, -1), child => {
onLog(`the VHD ${child} is unused`)
if (remove) {
onLog(`deleting unused VHD ${child}`)
return handler.unlink(child)
}
}),
])
}
await Promise.all([
remove && handler.rename(parent, child),
asyncMap(children.slice(0, -1), child => {
onLog(`the VHD ${child} is unused`)
return remove && handler.unlink(child)
}),
])
})
const noop = Function.prototype
const INTERRUPTED_VHDS_REG = /^(?:(.+)\/)?\.(.+)\.merge.json$/
const listVhds = async (handler, vmDir) => {
const vhds = []
const interruptedVhds = new Set()
await asyncMap(
await handler.list(`${vmDir}/vdis`, {
ignoreMissing: true,
prependDir: true,
}),
async jobDir =>
asyncMap(
await handler.list(jobDir, {
prependDir: true,
}),
async vdiDir => {
const list = await handler.list(vdiDir, {
filter: file => isVhdFile(file) || INTERRUPTED_VHDS_REG.test(file),
prependDir: true,
})
list.forEach(file => {
const res = INTERRUPTED_VHDS_REG.exec(file)
if (res === null) {
vhds.push(file)
} else {
const [, dir, file] = res
interruptedVhds.add(`${dir}/${file}`)
}
})
}
)
)
return { vhds, interruptedVhds }
}
exports.cleanVm = async function cleanVm(vmDir, { remove, merge, onLog = noop }) {
const handler = this._handler
@@ -120,34 +79,37 @@ exports.cleanVm = async function cleanVm(vmDir, { remove, merge, onLog = noop })
const vhdParents = { __proto__: null }
const vhdChildren = { __proto__: null }
const vhdsList = await listVhds(handler, vmDir)
// remove broken VHDs
await asyncMap(vhdsList.vhds, async path => {
try {
const vhd = new Vhd(handler, path)
await vhd.readHeaderAndFooter(!vhdsList.interruptedVhds.has(path))
vhds.add(path)
if (vhd.footer.diskType === DISK_TYPE_DIFFERENCING) {
const parent = resolve('/', dirname(path), vhd.header.parentUnicodeName)
vhdParents[path] = parent
if (parent in vhdChildren) {
const error = new Error('this script does not support multiple VHD children')
error.parent = parent
error.child1 = vhdChildren[parent]
error.child2 = path
throw error // should we throw?
await asyncMap(
await handler.list(`${vmDir}/vdis`, {
filter: isVhdFile,
prependDir: true,
}),
async path => {
try {
const vhd = new Vhd(handler, path)
await vhd.readHeaderAndFooter()
vhds.add(path)
if (vhd.footer.diskType === DISK_TYPE_DIFFERENCING) {
const parent = resolve(dirname(path), vhd.header.parentUnicodeName)
vhdParents[path] = parent
if (parent in vhdChildren) {
const error = new Error('this script does not support multiple VHD children')
error.parent = parent
error.child1 = vhdChildren[parent]
error.child2 = path
throw error // should we throw?
}
vhdChildren[parent] = path
}
} catch (error) {
onLog(`error while checking the VHD with path ${path}`)
if (error?.code === 'ERR_ASSERTION' && remove) {
await handler.unlink(path)
}
vhdChildren[parent] = path
}
} catch (error) {
onLog(`error while checking the VHD with path ${path}`, { error })
if (error?.code === 'ERR_ASSERTION' && remove) {
onLog(`deleting broken ${path}`)
await handler.unlink(path)
}
}
})
)
// remove VHDs with missing ancestors
{
@@ -170,7 +132,6 @@ exports.cleanVm = async function cleanVm(vmDir, { remove, merge, onLog = noop })
onLog(`the parent ${parent} of the VHD ${vhd} is missing`)
if (remove) {
onLog(`deleting orphan VHD ${vhd}`)
deletions.push(handler.unlink(vhd))
}
}
@@ -206,7 +167,7 @@ exports.cleanVm = async function cleanVm(vmDir, { remove, merge, onLog = noop })
await asyncMap(xvas, async path => {
// check is not good enough to delete the file, the best we can do is report
// it
if (!(await this.isValidXva(path))) {
if (!(await isValidXva(path))) {
onLog(`the XVA with path ${path} is potentially broken`)
}
})
@@ -220,21 +181,20 @@ exports.cleanVm = async function cleanVm(vmDir, { remove, merge, onLog = noop })
const metadata = JSON.parse(await handler.readFile(json))
const { mode } = metadata
if (mode === 'full') {
const linkedXva = resolve('/', vmDir, metadata.xva)
const linkedXva = resolve(vmDir, metadata.xva)
if (xvas.has(linkedXva)) {
unusedXvas.delete(linkedXva)
} else {
onLog(`the XVA linked to the metadata ${json} is missing`)
if (remove) {
onLog(`deleting incomplete backup ${json}`)
await handler.unlink(json)
}
}
} else if (mode === 'delta') {
const linkedVhds = (() => {
const { vhds } = metadata
return Object.keys(vhds).map(key => resolve('/', vmDir, vhds[key]))
return Object.keys(vhds).map(key => resolve(vmDir, vhds[key]))
})()
// FIXME: find better approach by keeping as much of the backup as
@@ -244,7 +204,6 @@ exports.cleanVm = async function cleanVm(vmDir, { remove, merge, onLog = noop })
} else {
onLog(`Some VHDs linked to the metadata ${json} are missing`)
if (remove) {
onLog(`deleting incomplete backup ${json}`)
await handler.unlink(json)
}
}
@@ -285,7 +244,6 @@ exports.cleanVm = async function cleanVm(vmDir, { remove, merge, onLog = noop })
onLog(`the VHD ${vhd} is unused`)
if (remove) {
onLog(`deleting unused VHD ${vhd}`)
unusedVhdsDeletion.push(handler.unlink(vhd))
}
}
@@ -294,38 +252,25 @@ exports.cleanVm = async function cleanVm(vmDir, { remove, merge, onLog = noop })
vhdChainsToMerge[vhd] = getUsedChildChainOrDelete(vhd)
})
// merge interrupted VHDs
if (merge) {
vhdsList.interruptedVhds.forEach(parent => {
vhdChainsToMerge[parent] = [vhdChildren[parent], parent]
})
}
Object.keys(vhdChainsToMerge).forEach(key => {
const chain = vhdChainsToMerge[key]
if (chain !== undefined) {
unusedVhdsDeletion.push(mergeVhdChain(chain, { handler, onLog, remove, merge }))
unusedVhdsDeletion.push(mergeVhdChain(chain, { onLog, remove, merge }))
}
})
}
await Promise.all([
...unusedVhdsDeletion,
unusedVhdsDeletion,
asyncMap(unusedXvas, path => {
onLog(`the XVA ${path} is unused`)
if (remove) {
onLog(`deleting unused XVA ${path}`)
return handler.unlink(path)
}
return remove && handler.unlink(path)
}),
asyncMap(xvaSums, path => {
// no need to handle checksums for XVAs deleted by the script, they will be handled by `unlink()`
if (!xvas.has(path.slice(0, -'.checksum'.length))) {
onLog(`the XVA checksum ${path} is unused`)
if (remove) {
onLog(`deleting unused XVA checksum ${path}`)
return handler.unlink(path)
}
return remove && handler.unlink(path)
}
}),
])

View File

@@ -1,14 +1,14 @@
const compareVersions = require('compare-versions')
const find = require('lodash/find.js')
const groupBy = require('lodash/groupBy.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors.js')
const omit = require('lodash/omit.js')
const defer = require('golike-defer').default
const find = require('lodash/find')
const groupBy = require('lodash/groupBy')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const omit = require('lodash/omit')
const { asyncMap } = require('@xen-orchestra/async-map')
const { CancelToken } = require('promise-toolbox')
const { createVhdStreamWithLength } = require('vhd-lib')
const { defer } = require('golike-defer')
const { cancelableMap } = require('./_cancelableMap.js')
const { cancelableMap } = require('./_cancelableMap')
const TAG_BASE_DELTA = 'xo:base_delta'
exports.TAG_BASE_DELTA = TAG_BASE_DELTA

View File

@@ -1,4 +1,4 @@
const Disposable = require('promise-toolbox/Disposable.js')
const Disposable = require('promise-toolbox/Disposable')
const { join } = require('path')
const { mkdir, rmdir } = require('fs-extra')
const { tmpdir } = require('os')
@@ -10,7 +10,7 @@ exports.getTmpDir = async function getTmpDir() {
const path = join(tmpdir(), Math.random().toString(36).slice(2))
try {
await mkdir(path)
return new Disposable(() => rmdir(path), path)
return new Disposable(path, () => rmdir(path))
} catch (error) {
if (i === MAX_ATTEMPTS) {
throw error

View File

@@ -1,4 +1,4 @@
const fromCallback = require('promise-toolbox/fromCallback.js')
const fromCallback = require('promise-toolbox/fromCallback')
const { createLogger } = require('@xen-orchestra/log')
const { createParser } = require('parse-pairs')
const { execFile } = require('child_process')

View File

@@ -1,4 +1,4 @@
const fromCallback = require('promise-toolbox/fromCallback.js')
const fromCallback = require('promise-toolbox/fromCallback')
const { createParser } = require('parse-pairs')
const { execFile } = require('child_process')

View File

@@ -0,0 +1,46 @@
function fulfilledThen(cb) {
return typeof cb === 'function' ? SyncThenable.fromFunction(cb, this.value) : this
}
function rejectedThen(_, cb) {
return typeof cb === 'function' ? SyncThenable.fromFunction(cb, this.value) : this
}
class SyncThenable {
static resolve(value) {
if (value != null && typeof value.then === 'function') {
return value
}
return new this(false, value)
}
static fromFunction(fn, ...arg) {
try {
return this.resolve(fn(...arg))
} catch (error) {
return this.reject(error)
}
}
static reject(reason) {
return new this(true, reason)
}
// unwrap if it's a SyncThenable
static tryUnwrap(value) {
if (value instanceof this) {
if (value.then === rejectedThen) {
throw value.value
}
return value.value
}
return value
}
constructor(rejected, value) {
this.then = rejected ? rejectedThen : fulfilledThen
this.value = value
}
}
exports.SyncThenable = SyncThenable

View File

@@ -1,4 +1,5 @@
exports.watchStreamSize = function watchStreamSize(stream, container = { size: 0 }) {
exports.watchStreamSize = function watchStreamSize(stream) {
const container = { size: 0 }
stream.on('data', data => {
container.size += data.length
})

View File

@@ -1,4 +1,4 @@
const mapValues = require('lodash/mapValues.js')
const mapValues = require('lodash/mapValues')
const { dirname } = require('path')
function formatVmBackup(backup) {

View File

@@ -1,10 +1,10 @@
const assert = require('assert')
const fs = require('fs-extra')
const isGzipFile = async (handler, fd) => {
const isGzipFile = async fd => {
// https://tools.ietf.org/html/rfc1952.html#page-5
const magicNumber = Buffer.allocUnsafe(2)
assert.strictEqual((await handler.read(fd, magicNumber, 0)).bytesRead, magicNumber.length)
assert.strictEqual((await fs.read(fd, magicNumber, 0, magicNumber.length, 0)).bytesRead, magicNumber.length)
return magicNumber[0] === 31 && magicNumber[1] === 139
}
@@ -21,33 +21,32 @@ const isGzipFile = async (handler, fd) => {
// /^Ref:\d+/\d+\.checksum$/ and then validating the tar structure from it
//
// https://github.com/npm/node-tar/issues/234#issuecomment-538190295
const isValidTar = async (handler, size, fd) => {
const isValidTar = async (size, fd) => {
if (size <= 1024 || size % 512 !== 0) {
return false
}
const buf = Buffer.allocUnsafe(1024)
assert.strictEqual((await handler.read(fd, buf, size - buf.length)).bytesRead, buf.length)
assert.strictEqual((await fs.read(fd, buf, 0, buf.length, size - buf.length)).bytesRead, buf.length)
return buf.every(_ => _ === 0)
}
// TODO: find an heuristic for compressed files
async function isValidXva(path) {
const handler = this._handler
const isValidXva = async path => {
try {
const fd = await handler.openFile(path, 'r')
const fd = await fs.open(path, 'r')
try {
const size = await handler.getSize(fd)
const { size } = await fs.fstat(fd)
if (size < 20) {
// neither a valid gzip not tar
return false
}
return (await isGzipFile(handler, fd))
return (await isGzipFile(fd))
? true // gzip files cannot be validated at this time
: await isValidTar(handler, size, fd)
: await isValidTar(size, fd)
} finally {
handler.closeFile(fd).catch(noop)
fs.close(fd).catch(noop)
}
} catch (error) {
// never throw, log and report as valid to avoid side effects

View File

@@ -8,9 +8,9 @@
"type": "git",
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"version": "0.11.0",
"version": "0.9.1",
"engines": {
"node": ">=14.6"
"node": ">=14.5"
},
"scripts": {
"postversion": "npm publish --access public"
@@ -18,27 +18,28 @@
"dependencies": {
"@vates/compose": "^2.0.0",
"@vates/disposable": "^0.1.1",
"@vates/parse-duration": "^0.1.1",
"@vates/multi-key-map": "^0.1.0",
"@vates/parse-duration": "^0.1.0",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/fs": "^0.17.0",
"@xen-orchestra/fs": "^0.14.0",
"@xen-orchestra/log": "^0.2.0",
"@xen-orchestra/template": "^0.1.0",
"compare-versions": "^3.6.0",
"d3-time-format": "^3.0.0",
"end-of-stream": "^1.4.4",
"ensure-array": "^1.0.0",
"fs-extra": "^9.0.0",
"golike-defer": "^0.5.1",
"limit-concurrency-decorator": "^0.5.0",
"limit-concurrency-decorator": "^0.4.0",
"lodash": "^4.17.20",
"node-zone": "^0.4.0",
"parse-pairs": "^1.1.0",
"pump": "^3.0.0",
"promise-toolbox": "^0.19.2",
"promise-toolbox": "^0.18.0",
"vhd-lib": "^1.0.0",
"yazl": "^2.5.1"
},
"peerDependencies": {
"@xen-orchestra/xapi": "^0.6.2"
"@xen-orchestra/xapi": "^0.6.0"
},
"license": "AGPL-3.0-or-later",
"author": {

View File

@@ -1,4 +1,4 @@
const { DIR_XO_CONFIG_BACKUPS, DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js')
const { DIR_XO_CONFIG_BACKUPS, DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter')
exports.parseMetadataBackupId = function parseMetadataBackupId(backupId) {
const [dir, ...rest] = backupId.split('/')

View File

@@ -1,19 +0,0 @@
const { AbstractWriter } = require('./_AbstractWriter.js')
exports.AbstractDeltaWriter = class AbstractDeltaWriter extends AbstractWriter {
checkBaseVdis(baseUuidToSrcVdi, baseVm) {
throw new Error('Not implemented')
}
cleanup() {
throw new Error('Not implemented')
}
prepare({ isFull }) {
throw new Error('Not implemented')
}
transfer({ timestamp, deltaExport, sizeContainers }) {
throw new Error('Not implemented')
}
}

View File

@@ -1,7 +0,0 @@
const { AbstractWriter } = require('./_AbstractWriter.js')
exports.AbstractFullWriter = class AbstractFullWriter extends AbstractWriter {
run({ timestamp, sizeContainer, stream }) {
throw new Error('Not implemented')
}
}

View File

@@ -1,10 +0,0 @@
exports.AbstractWriter = class AbstractWriter {
constructor({ backup, settings }) {
this._backup = backup
this._settings = settings
}
beforeBackup() {}
afterBackup() {}
}

View File

@@ -1,34 +0,0 @@
const { createLogger } = require('@xen-orchestra/log')
const { getVmBackupDir } = require('../_getVmBackupDir.js')
const { warn } = createLogger('xo:backups:MixinBackupWriter')
exports.MixinBackupWriter = (BaseClass = Object) =>
class MixinBackupWriter extends BaseClass {
constructor({ remoteId, ...rest }) {
super(rest)
this._adapter = rest.backup.remoteAdapters[remoteId]
this._remoteId = remoteId
this._lock = undefined
}
_cleanVm(options) {
return this._adapter
.cleanVm(getVmBackupDir(this._backup.vm.uuid), { ...options, onLog: warn, lock: false })
.catch(warn)
}
async beforeBackup() {
const { handler } = this._adapter
const vmBackupDir = getVmBackupDir(this._backup.vm.uuid)
await handler.mktree(vmBackupDir)
this._lock = await handler.lock(vmBackupDir)
}
async afterBackup() {
await this._cleanVm({ remove: true, merge: true })
await this._lock.dispose()
}
}

View File

@@ -1,8 +0,0 @@
exports.MixinReplicationWriter = (BaseClass = Object) =>
class MixinReplicationWriter extends BaseClass {
constructor({ sr, ...rest }) {
super(rest)
this._sr = sr
}
}

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env node
const defer = require('golike-defer').default
const { Ref, Xapi } = require('xen-api')
const { defer } = require('golike-defer')
const pkg = require('./package.json')

View File

@@ -18,7 +18,7 @@
"preferGlobal": true,
"dependencies": {
"golike-defer": "^0.5.1",
"xen-api": "^0.32.0"
"xen-api": "^0.31.0"
},
"scripts": {
"postversion": "npm publish"

View File

@@ -1 +0,0 @@
../../scripts/babel-eslintrc.js

View File

@@ -28,6 +28,9 @@
},
"preferGlobal": false,
"main": "dist/",
"files": [
"dist/"
],
"browserslist": [
">2%"
],
@@ -42,6 +45,7 @@
"@babel/cli": "^7.0.0",
"@babel/core": "^7.0.0",
"@babel/preset-env": "^7.0.0",
"@babel/preset-flow": "^7.0.0",
"cross-env": "^7.0.2",
"rimraf": "^3.0.0"
},

View File

@@ -16,13 +16,30 @@
"url": "https://vates.fr"
},
"preferGlobal": false,
"main": "dist/",
"files": [
"dist/"
],
"browserslist": [
">2%"
],
"engines": {
"node": ">=6"
},
"devDependencies": {
"@babel/cli": "^7.0.0",
"@babel/core": "^7.0.0",
"@babel/preset-env": "^7.0.0",
"cross-env": "^7.0.2",
"rimraf": "^3.0.0"
},
"scripts": {
"build": "cross-env NODE_ENV=production babel --source-maps --out-dir=dist/ src/",
"clean": "rimraf dist/",
"dev": "cross-env NODE_ENV=development babel --watch --source-maps --out-dir=dist/ src/",
"prebuild": "yarn run clean",
"predev": "yarn run prebuild",
"prepublishOnly": "yarn run build",
"postversion": "npm publish"
}
}

View File

@@ -11,7 +11,7 @@
// process.env.http_proxy
// ])
// ```
function defined() {
export default function defined() {
let args = arguments
let n = args.length
if (n === 1) {
@@ -29,7 +29,6 @@ function defined() {
}
}
}
module.exports = exports = defined
// Usage:
//
@@ -40,7 +39,7 @@ module.exports = exports = defined
// const getFriendName = _ => _.friends[0].name
// const friendName = get(getFriendName, props.user)
// ```
function get(accessor, arg) {
export const get = (accessor, arg) => {
try {
return accessor(arg)
} catch (error) {
@@ -50,7 +49,6 @@ function get(accessor, arg) {
}
}
}
exports.get = get
// Usage:
//
@@ -60,6 +58,4 @@ exports.get = get
// _ => new ProxyAgent(_)
// )
// ```
exports.ifDef = function ifDef(value, thenFn) {
return value !== undefined ? thenFn(value) : value
}
export const ifDef = (value, thenFn) => (value !== undefined ? thenFn(value) : value)

View File

@@ -0,0 +1 @@
module.exports = require('../../@xen-orchestra/babel-config')(require('./package.json'))

View File

@@ -16,13 +16,30 @@
"url": "https://vates.fr"
},
"preferGlobal": false,
"main": "dist/",
"files": [
"dist/"
],
"browserslist": [
">2%"
],
"engines": {
"node": ">=6"
},
"devDependencies": {
"@babel/cli": "^7.0.0",
"@babel/core": "^7.0.0",
"@babel/preset-env": "^7.0.0",
"cross-env": "^7.0.2",
"rimraf": "^3.0.0"
},
"scripts": {
"build": "cross-env NODE_ENV=production babel --source-maps --out-dir=dist/ src/",
"clean": "rimraf dist/",
"dev": "cross-env NODE_ENV=development babel --watch --source-maps --out-dir=dist/ src/",
"prebuild": "yarn run clean",
"predev": "yarn run prebuild",
"prepublishOnly": "yarn run build",
"postversion": "npm publish"
}
}

View File

@@ -1,4 +1,4 @@
module.exports = function emitAsync(event) {
export default function emitAsync(event) {
let opts
let i = 1

View File

@@ -1 +0,0 @@
../../scripts/babel-eslintrc.js

View File

@@ -1,7 +1,7 @@
{
"private": false,
"name": "@xen-orchestra/fs",
"version": "0.17.0",
"version": "0.14.0",
"license": "AGPL-3.0-or-later",
"description": "The File System for Xen Orchestra backups.",
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@xen-orchestra/fs",
@@ -13,28 +13,30 @@
},
"preferGlobal": true,
"main": "dist/",
"files": [
"dist/"
],
"engines": {
"node": ">=14"
"node": ">=8.10"
},
"dependencies": {
"@marsaud/smb2": "^0.17.2",
"@sindresorhus/df": "^3.1.1",
"@sullux/aws-sdk": "^1.0.5",
"@vates/coalesce-calls": "^0.1.0",
"@xen-orchestra/async-map": "^0.1.2",
"aws-sdk": "^2.686.0",
"decorator-synchronized": "^0.5.0",
"execa": "^5.0.0",
"fs-extra": "^9.0.0",
"get-stream": "^6.0.0",
"limit-concurrency-decorator": "^0.5.0",
"limit-concurrency-decorator": "^0.4.0",
"lodash": "^4.17.4",
"minio": "^7.0.18",
"promise-toolbox": "^0.19.2",
"promise-toolbox": "^0.18.0",
"proper-lockfile": "^4.1.2",
"readable-stream": "^3.0.6",
"through2": "^4.0.2",
"xo-remote-parser": "^0.7.0"
"tmp": "^0.2.1",
"xo-remote-parser": "^0.6.0"
},
"devDependencies": {
"@babel/cli": "^7.0.0",
@@ -43,10 +45,12 @@
"@babel/plugin-proposal-function-bind": "^7.0.0",
"@babel/plugin-proposal-nullish-coalescing-operator": "^7.4.4",
"@babel/preset-env": "^7.0.0",
"@babel/preset-flow": "^7.0.0",
"async-iterator-to-stream": "^1.1.0",
"babel-plugin-lodash": "^3.3.2",
"cross-env": "^7.0.2",
"dotenv": "^8.0.0",
"index-modules": "^0.3.0",
"rimraf": "^3.0.0"
},
"scripts": {

View File

@@ -1,21 +1,33 @@
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
// @flow
// $FlowFixMe
import getStream from 'get-stream'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import limit from 'limit-concurrency-decorator'
import path, { basename } from 'path'
import synchronized from 'decorator-synchronized'
import { coalesceCalls } from '@vates/coalesce-calls'
import { fromCallback, fromEvent, ignoreErrors, timeout } from 'promise-toolbox'
import { limitConcurrency } from 'limit-concurrency-decorator'
import { parse } from 'xo-remote-parser'
import { pipeline } from 'stream'
import { randomBytes } from 'crypto'
import { type Readable, type Writable } from 'stream'
import normalizePath from './_normalizePath'
import { createChecksumStream, validChecksumOfReadStream } from './checksum'
const { dirname } = path.posix
type Data = Buffer | Readable | string
type Disposable<T> = {| dispose: () => void | Promise<void>, value?: T |}
type FileDescriptor = {| fd: mixed, path: string |}
type LaxReadable = Readable & Object
type LaxWritable = Writable & Object
type RemoteInfo = { used?: number, size?: number }
type File = FileDescriptor | string
const checksumFile = file => file + '.checksum'
const computeRate = (hrtime, size) => {
const computeRate = (hrtime: number[], size: number) => {
const seconds = hrtime[0] + hrtime[1] / 1e9
return size / seconds
}
@@ -29,8 +41,6 @@ const ignoreEnoent = error => {
}
}
const noop = Function.prototype
class PrefixWrapper {
constructor(handler, prefix) {
this._prefix = prefix
@@ -63,7 +73,11 @@ class PrefixWrapper {
}
export default class RemoteHandlerAbstract {
constructor(remote, options = {}) {
_highWaterMark: number
_remote: Object
_timeout: number
constructor(remote: any, options: Object = {}) {
if (remote.url === 'test://') {
this._remote = remote
} else {
@@ -74,7 +88,7 @@ export default class RemoteHandlerAbstract {
}
;({ highWaterMark: this._highWaterMark, timeout: this._timeout = DEFAULT_TIMEOUT } = options)
const sharedLimit = limitConcurrency(options.maxParallelOperations ?? DEFAULT_MAX_PARALLEL_OPERATIONS)
const sharedLimit = limit(options.maxParallelOperations ?? DEFAULT_MAX_PARALLEL_OPERATIONS)
this.closeFile = sharedLimit(this.closeFile)
this.getInfo = sharedLimit(this.getInfo)
this.getSize = sharedLimit(this.getSize)
@@ -90,28 +104,25 @@ export default class RemoteHandlerAbstract {
this.unlink = sharedLimit(this.unlink)
this.write = sharedLimit(this.write)
this.writeFile = sharedLimit(this.writeFile)
this._forget = coalesceCalls(this._forget)
this._sync = coalesceCalls(this._sync)
}
// Public members
get type() {
get type(): string {
throw new Error('Not implemented')
}
addPrefix(prefix) {
addPrefix(prefix: string) {
prefix = normalizePath(prefix)
return prefix === '/' ? this : new PrefixWrapper(this, prefix)
}
async closeFile(fd) {
async closeFile(fd: FileDescriptor): Promise<void> {
await this.__closeFile(fd)
}
// TODO: remove method
async createOutputStream(file, { checksum = false, dirMode, ...options } = {}) {
async createOutputStream(file: File, { checksum = false, dirMode, ...options }: Object = {}): Promise<LaxWritable> {
if (typeof file === 'string') {
file = normalizePath(file)
}
@@ -138,6 +149,7 @@ export default class RemoteHandlerAbstract {
stream.on('error', forwardError)
checksumStream.pipe(stream)
// $FlowFixMe
checksumStream.checksumWritten = checksumStream.checksum
.then(value => this._outputFile(checksumFile(path), value, { flags: 'wx' }))
.catch(forwardError)
@@ -145,7 +157,10 @@ export default class RemoteHandlerAbstract {
return checksumStream
}
createReadStream(file, { checksum = false, ignoreMissingChecksum = false, ...options } = {}) {
createReadStream(
file: File,
{ checksum = false, ignoreMissingChecksum = false, ...options }: Object = {}
): Promise<LaxReadable> {
if (typeof file === 'string') {
file = normalizePath(file)
}
@@ -182,7 +197,7 @@ export default class RemoteHandlerAbstract {
checksum =>
streamP.then(stream => {
const { length } = stream
stream = validChecksumOfReadStream(stream, String(checksum).trim())
stream = (validChecksumOfReadStream(stream, String(checksum).trim()): LaxReadable)
stream.length = length
return stream
@@ -196,31 +211,16 @@ export default class RemoteHandlerAbstract {
)
}
/**
* write a stream to a file using a temporary file
*
* @param {string} path
* @param {ReadableStream} input
* @param {object} [options]
* @param {boolean} [options.checksum]
* @param {number} [options.dirMode]
* @param {(this: RemoteHandlerAbstract, path: string) => Promise<undefined>} [options.validator] Function that will be called before the data is commited to the remote, if it fails, file should not exist
*/
async outputStream(path, input, { checksum = true, dirMode, validator } = {}) {
path = normalizePath(path)
let checksumStream
if (checksum) {
checksumStream = createChecksumStream()
pipeline(input, checksumStream, noop)
input = checksumStream
}
await this._outputStream(path, input, {
// write a stream to a file using a temporary file
async outputStream(
path: string,
input: Readable | Promise<Readable>,
{ checksum = true, dirMode }: { checksum?: boolean, dirMode?: number } = {}
): Promise<void> {
return this._outputStream(normalizePath(path), await input, {
checksum,
dirMode,
validator,
})
if (checksum) {
await this._outputFile(checksumFile(path), await checksumStream.checksum, { dirMode, flags: 'wx' })
}
}
// Free the resources possibly dedicated to put the remote at work, when it
@@ -230,73 +230,73 @@ export default class RemoteHandlerAbstract {
// as mount), forgetting them might breaking other processes using the same
// remote.
@synchronized()
async forget() {
async forget(): Promise<void> {
await this._forget()
}
async getInfo() {
async getInfo(): Promise<RemoteInfo> {
return timeout.call(this._getInfo(), this._timeout)
}
async getSize(file) {
async getSize(file: File): Promise<number> {
return timeout.call(this._getSize(typeof file === 'string' ? normalizePath(file) : file), this._timeout)
}
async list(dir, { filter, ignoreMissing = false, prependDir = false } = {}) {
try {
const virtualDir = normalizePath(dir)
dir = normalizePath(dir)
async list(
dir: string,
{ filter, prependDir = false }: { filter?: (name: string) => boolean, prependDir?: boolean } = {}
): Promise<string[]> {
const virtualDir = normalizePath(dir)
dir = normalizePath(dir)
let entries = await timeout.call(this._list(dir), this._timeout)
if (filter !== undefined) {
entries = entries.filter(filter)
}
if (prependDir) {
entries.forEach((entry, i) => {
entries[i] = virtualDir + '/' + entry
})
}
return entries
} catch (error) {
if (ignoreMissing && error?.code === 'ENOENT') {
return []
}
throw error
let entries = await timeout.call(this._list(dir), this._timeout)
if (filter !== undefined) {
entries = entries.filter(filter)
}
if (prependDir) {
entries.forEach((entry, i) => {
entries[i] = virtualDir + '/' + entry
})
}
return entries
}
async lock(path) {
async lock(path: string): Promise<Disposable> {
path = normalizePath(path)
return { dispose: await this._lock(path) }
}
async mkdir(dir, { mode } = {}) {
async mkdir(dir: string, { mode }: { mode?: number } = {}): Promise<void> {
await this.__mkdir(normalizePath(dir), { mode })
}
async mktree(dir, { mode } = {}) {
async mktree(dir: string, { mode }: { mode?: number } = {}): Promise<void> {
await this._mktree(normalizePath(dir), { mode })
}
openFile(path, flags) {
openFile(path: string, flags: string): Promise<FileDescriptor> {
return this.__openFile(path, flags)
}
async outputFile(file, data, { dirMode, flags = 'wx' } = {}) {
async outputFile(
file: string,
data: Data,
{ dirMode, flags = 'wx' }: { dirMode?: number, flags?: string } = {}
): Promise<void> {
await this._outputFile(normalizePath(file), data, { dirMode, flags })
}
async read(file, buffer, position) {
async read(file: File, buffer: Buffer, position?: number): Promise<{| bytesRead: number, buffer: Buffer |}> {
return this._read(typeof file === 'string' ? normalizePath(file) : file, buffer, position)
}
async readFile(file, { flags = 'r' } = {}) {
async readFile(file: string, { flags = 'r' }: { flags?: string } = {}): Promise<Buffer> {
return this._readFile(normalizePath(file), { flags })
}
async rename(oldPath, newPath, { checksum = false } = {}) {
async rename(oldPath: string, newPath: string, { checksum = false }: Object = {}) {
oldPath = normalizePath(oldPath)
newPath = normalizePath(newPath)
@@ -307,11 +307,11 @@ export default class RemoteHandlerAbstract {
return p
}
async rmdir(dir) {
async rmdir(dir: string): Promise<void> {
await timeout.call(this._rmdir(normalizePath(dir)).catch(ignoreEnoent), this._timeout)
}
async rmtree(dir) {
async rmtree(dir: string): Promise<void> {
await this._rmtree(normalizePath(dir))
}
@@ -320,11 +320,11 @@ export default class RemoteHandlerAbstract {
//
// This method MUST ALWAYS be called before using the handler.
@synchronized()
async sync() {
async sync(): Promise<void> {
await this._sync()
}
async test() {
async test(): Promise<Object> {
const SIZE = 1024 * 1024 * 10
const testFileName = normalizePath(`${Date.now()}.test`)
const data = await fromCallback(randomBytes, SIZE)
@@ -359,11 +359,11 @@ export default class RemoteHandlerAbstract {
}
}
async truncate(file, len) {
async truncate(file: string, len: number): Promise<void> {
await this._truncate(file, len)
}
async unlink(file, { checksum = true } = {}) {
async unlink(file: string, { checksum = true }: Object = {}): Promise<void> {
file = normalizePath(file)
if (checksum) {
@@ -373,21 +373,21 @@ export default class RemoteHandlerAbstract {
await this._unlink(file).catch(ignoreEnoent)
}
async write(file, buffer, position) {
async write(file: File, buffer: Buffer, position: number): Promise<{| bytesWritten: number, buffer: Buffer |}> {
await this._write(typeof file === 'string' ? normalizePath(file) : file, buffer, position)
}
async writeFile(file, data, { flags = 'wx' } = {}) {
async writeFile(file: string, data: Data, { flags = 'wx' }: { flags?: string } = {}): Promise<void> {
await this._writeFile(normalizePath(file), data, { flags })
}
// Methods that can be called by private methods to avoid parallel limit on public methods
async __closeFile(fd) {
async __closeFile(fd: FileDescriptor): Promise<void> {
await timeout.call(this._closeFile(fd.fd), this._timeout)
}
async __mkdir(dir, { mode } = {}) {
async __mkdir(dir: string, { mode }: { mode?: number } = {}): Promise<void> {
try {
await this._mkdir(dir, { mode })
} catch (error) {
@@ -400,7 +400,7 @@ export default class RemoteHandlerAbstract {
}
}
async __openFile(path, flags) {
async __openFile(path: string, flags: string): Promise<FileDescriptor> {
path = normalizePath(path)
return {
@@ -411,11 +411,11 @@ export default class RemoteHandlerAbstract {
// Methods that can be implemented by inheriting classes
async _closeFile(fd) {
async _closeFile(fd: mixed): Promise<void> {
throw new Error('Not implemented')
}
async _createOutputStream(file, { dirMode, ...options } = {}) {
async _createOutputStream(file: File, { dirMode, ...options }: Object = {}): Promise<LaxWritable> {
try {
return await this._createWriteStream(file, { ...options, highWaterMark: this._highWaterMark })
} catch (error) {
@@ -428,40 +428,40 @@ export default class RemoteHandlerAbstract {
return this._createOutputStream(file, options)
}
async _createReadStream(file, options) {
async _createReadStream(file: File, options?: Object): Promise<LaxReadable> {
throw new Error('Not implemented')
}
// createWriteStream takes highWaterMark as option even if it's not documented.
// Source: https://stackoverflow.com/questions/55026306/how-to-set-writeable-highwatermark
async _createWriteStream(file, options) {
async _createWriteStream(file: File, options: Object): Promise<LaxWritable> {
throw new Error('Not implemented')
}
// called to finalize the remote
async _forget() {}
async _forget(): Promise<void> {}
async _getInfo() {
async _getInfo(): Promise<Object> {
return {}
}
async _lock(path) {
async _lock(path: string): Promise<Function> {
return () => Promise.resolve()
}
async _getSize(file) {
async _getSize(file: File): Promise<number> {
throw new Error('Not implemented')
}
async _list(dir) {
async _list(dir: string): Promise<string[]> {
throw new Error('Not implemented')
}
async _mkdir(dir) {
async _mkdir(dir: string): Promise<void> {
throw new Error('Not implemented')
}
async _mktree(dir, { mode } = {}) {
async _mktree(dir: string, { mode }: { mode?: number } = {}): Promise<void> {
try {
return await this.__mkdir(dir, { mode })
} catch (error) {
@@ -474,11 +474,11 @@ export default class RemoteHandlerAbstract {
return this._mktree(dir, { mode })
}
async _openFile(path, flags) {
async _openFile(path: string, flags: string): Promise<mixed> {
throw new Error('Not implemented')
}
async _outputFile(file, data, { dirMode, flags }) {
async _outputFile(file: string, data: Data, { dirMode, flags }: { dirMode?: number, flags?: string }): Promise<void> {
try {
return await this._writeFile(file, data, { flags })
} catch (error) {
@@ -491,40 +491,42 @@ export default class RemoteHandlerAbstract {
return this._outputFile(file, data, { flags })
}
async _outputStream(path, input, { dirMode, validator }) {
async _outputStream(path: string, input: Readable, { checksum, dirMode }: { checksum?: boolean, dirMode?: number }) {
const tmpPath = `${dirname(path)}/.${basename(path)}`
const output = await this.createOutputStream(tmpPath, {
checksum,
dirMode,
})
try {
await fromCallback(pipeline, input, output)
if (validator !== undefined) {
await validator.call(this, tmpPath)
}
await this.rename(tmpPath, path)
input.pipe(output)
await fromEvent(output, 'finish')
await output.checksumWritten
// $FlowFixMe
await input.task
await this.rename(tmpPath, path, { checksum })
} catch (error) {
await this.unlink(tmpPath)
await this.unlink(tmpPath, { checksum })
throw error
}
}
_read(file, buffer, position) {
_read(file: File, buffer: Buffer, position?: number): Promise<{| bytesRead: number, buffer: Buffer |}> {
throw new Error('Not implemented')
}
_readFile(file, options) {
_readFile(file: string, options?: Object): Promise<Buffer> {
return this._createReadStream(file, { ...options, highWaterMark: this._highWaterMark }).then(getStream.buffer)
}
async _rename(oldPath, newPath) {
async _rename(oldPath: string, newPath: string) {
throw new Error('Not implemented')
}
async _rmdir(dir) {
async _rmdir(dir: string) {
throw new Error('Not implemented')
}
async _rmtree(dir) {
async _rmtree(dir: string) {
try {
return await this._rmdir(dir)
} catch (error) {
@@ -546,13 +548,13 @@ export default class RemoteHandlerAbstract {
}
// called to initialize the remote
async _sync() {}
async _sync(): Promise<void> {}
async _unlink(file) {
async _unlink(file: string): Promise<void> {
throw new Error('Not implemented')
}
async _write(file, buffer, position) {
async _write(file: File, buffer: Buffer, position: number): Promise<void> {
const isPath = typeof file === 'string'
if (isPath) {
file = await this.__openFile(file, 'r+')
@@ -566,11 +568,11 @@ export default class RemoteHandlerAbstract {
}
}
async _writeFd(fd, buffer, position) {
async _writeFd(fd: FileDescriptor, buffer: Buffer, position: number): Promise<void> {
throw new Error('Not implemented')
}
async _writeFile(file, data, options) {
async _writeFile(file: string, data: Data, options: { flags?: string }): Promise<void> {
throw new Error('Not implemented')
}
}

View File

@@ -1,7 +1,10 @@
// @flow
import through2 from 'through2'
import { createHash } from 'crypto'
import { defer, fromEvent } from 'promise-toolbox'
import { invert } from 'lodash'
import { type Readable, type Transform } from 'stream'
// Format: $<algorithm>$<salt>$<encrypted>
//
@@ -24,7 +27,7 @@ const ID_TO_ALGORITHM = invert(ALGORITHM_TO_ID)
// const checksumStream = source.pipe(createChecksumStream())
// checksumStream.resume() // make the data flow without an output
// console.log(await checksumStream.checksum)
export const createChecksumStream = (algorithm = 'md5') => {
export const createChecksumStream = (algorithm: string = 'md5'): Transform & { checksum: Promise<string> } => {
const algorithmId = ALGORITHM_TO_ID[algorithm]
if (!algorithmId) {
@@ -51,7 +54,10 @@ export const createChecksumStream = (algorithm = 'md5') => {
// Check if the checksum of a readable stream is equals to an expected checksum.
// The given stream is wrapped in a stream which emits an error event
// if the computed checksum is not equals to the expected checksum.
export const validChecksumOfReadStream = (stream, expectedChecksum) => {
export const validChecksumOfReadStream = (
stream: Readable,
expectedChecksum: string
): Readable & { checksumVerified: Promise<void> } => {
const algorithmId = expectedChecksum.slice(1, expectedChecksum.indexOf('$', 1))
if (!algorithmId) {
@@ -60,7 +66,7 @@ export const validChecksumOfReadStream = (stream, expectedChecksum) => {
const hash = createHash(ID_TO_ALGORITHM[algorithmId])
const wrapper = stream.pipe(
const wrapper: any = stream.pipe(
through2(
{ highWaterMark: 0 },
(chunk, enc, callback) => {

View File

@@ -133,14 +133,6 @@ handlers.forEach(url => {
await handler.outputFile('dir/file', '')
expect(await handler.list('dir', { prependDir: true })).toEqual(['/dir/file'])
})
it('throws ENOENT if no such directory', async () => {
expect((await rejectionOf(handler.list('dir'))).code).toBe('ENOENT')
})
it('can returns empty for missing directory', async () => {
expect(await handler.list('dir', { ignoreMissing: true })).toEqual([])
})
})
describe('#mkdir()', () => {

View File

@@ -1,12 +1,16 @@
// @flow
import execa from 'execa'
import { parse } from 'xo-remote-parser'
import type RemoteHandler from './abstract'
import RemoteHandlerLocal from './local'
import RemoteHandlerNfs from './nfs'
import RemoteHandlerS3 from './s3'
import RemoteHandlerSmb from './smb'
import RemoteHandlerSmbMount from './smb-mount'
export type { default as RemoteHandler } from './abstract'
export type Remote = { url: string }
const HANDLERS = {
file: RemoteHandlerLocal,
nfs: RemoteHandlerNfs,
@@ -20,8 +24,11 @@ try {
HANDLERS.smb = RemoteHandlerSmb
}
export const getHandler = (remote, ...rest) => {
const Handler = HANDLERS[parse(remote.url).type]
export const getHandler = (remote: Remote, ...rest: any): RemoteHandler => {
// FIXME: should be done in xo-remote-parser.
const type = remote.url.split('://')[0]
const Handler = HANDLERS[type]
if (!Handler) {
throw new Error('Unhandled remote type')
}

View File

@@ -84,7 +84,7 @@ export default class LocalHandler extends RemoteHandlerAbstract {
}
_lock(path) {
return lockfile.lock(this._getFilePath(path))
return lockfile.lock(path)
}
_mkdir(dir, { mode }) {

View File

@@ -8,6 +8,7 @@ export default class NfsHandler extends MountHandler {
super(remote, opts, {
type: 'nfs',
device: `${host}${port !== undefined ? ':' + port : ''}:${path}`,
defaultOptions: 'vers=3',
})
}

View File

@@ -1,21 +1,24 @@
import aws from '@sullux/aws-sdk'
import assert from 'assert'
import https from 'https'
import { Client as Minio } from 'minio'
import { parse } from 'xo-remote-parser'
import RemoteHandlerAbstract from './abstract'
import { createChecksumStream } from './checksum'
// endpoints https://docs.aws.amazon.com/general/latest/gr/s3.html
// limits: https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
const MIN_PART_SIZE = 1024 * 1024 * 5 // 5MB
const MAX_PART_SIZE = 1024 * 1024 * 1024 * 5 // 5GB
const MAX_PARTS_COUNT = 10000
const MAX_OBJECT_SIZE = 1024 * 1024 * 1024 * 1024 * 5 // 5TB
const IDEAL_FRAGMENT_SIZE = Math.ceil(MAX_OBJECT_SIZE / MAX_PARTS_COUNT) // the smallest fragment size that still allows a 5TB upload in 10000 fragments, about 524MB
export default class S3Handler extends RemoteHandlerAbstract {
constructor(remote, _opts) {
super(remote)
const { host, hostname, port, path, username, password, protocol, region } = parse(remote.url)
const params = {
const { host, path, username, password } = parse(remote.url)
// https://www.zenko.io/blog/first-things-first-getting-started-scality-s3-server/
this._s3 = aws({
accessKeyId: username,
apiVersion: '2006-03-01',
endpoint: host,
@@ -25,30 +28,8 @@ export default class S3Handler extends RemoteHandlerAbstract {
httpOptions: {
timeout: 600000,
},
}
if (protocol === 'http') {
throw new Error('HTTP no longer supported, please use HTTPS')
} else {
params.httpOptions.agent = new https.Agent({
// TODO : UI checkbox
rejectUnauthorized: false,
})
}
if (region !== undefined) {
params.region = region
}
}).s3
this._s3 = aws(params).s3
this._minioClient = new Minio({
endPoint: hostname,
port: port !== undefined ? +port : undefined,
useSSL: protocol !== 'http',
accessKey: username,
secretKey: password,
})
// TODO : UI checkbox
this._minioClient.setRequestOptions({ rejectUnauthorized: false })
const splitPath = path.split('/').filter(s => s.length)
this._bucket = splitPath.shift()
this._dir = splitPath.join('/')
@@ -62,63 +43,46 @@ export default class S3Handler extends RemoteHandlerAbstract {
return { Bucket: this._bucket, Key: this._dir + file }
}
async _isNotEmptyDir(path) {
const result = await this._s3.listObjectsV2({
Bucket: this._bucket,
MaxKeys: 1,
Prefix: this._dir + path + '/',
})
return result.Contents.length !== 0
}
async _isFile(path) {
try {
await this._s3.headObject(this._createParams(path))
return true
} catch (error) {
if (error.code === 'NotFound') {
return false
async _outputStream(path, input, { checksum }) {
let inputStream = input
if (checksum) {
const checksumStream = createChecksumStream()
const forwardError = error => {
checksumStream.emit('error', error)
}
throw error
input.pipe(checksumStream)
input.on('error', forwardError)
inputStream = checksumStream
}
}
async _outputStream(path, input, { validator }) {
await this._minioClient.putObject(this._bucket, this._dir + path, input)
if (validator !== undefined) {
try {
await validator.call(this, path)
} catch (error) {
await this.unlink(path)
throw error
await this._s3.upload(
{
...this._createParams(path),
Body: inputStream,
},
{ partSize: IDEAL_FRAGMENT_SIZE, queueSize: 1 }
)
if (checksum) {
const checksum = await inputStream.checksum
const params = {
...this._createParams(path + '.checksum'),
Body: checksum,
}
await this._s3.upload(params)
}
await input.task
}
async _writeFile(file, data, options) {
return this._s3.putObject({ ...this._createParams(file), Body: data })
}
async _createReadStream(path, options) {
if (!(await this._isFile(path))) {
const error = new Error(`ENOENT: no such file '${path}'`)
error.code = 'ENOENT'
error.path = path
throw error
}
async _createReadStream(file, options) {
// https://github.com/Sullux/aws-sdk/issues/11
return this._s3.getObject.raw(this._createParams(path)).createReadStream()
return this._s3.getObject.raw(this._createParams(file)).createReadStream()
}
async _unlink(path) {
await this._s3.deleteObject(this._createParams(path))
if (await this._isNotEmptyDir(path)) {
const error = new Error(`EISDIR: illegal operation on a directory, unlink '${path}'`)
error.code = 'EISDIR'
error.path = path
throw error
}
async _unlink(file) {
return this._s3.deleteObject(this._createParams(file))
}
async _list(dir) {
@@ -142,16 +106,6 @@ export default class S3Handler extends RemoteHandlerAbstract {
return [...uniq]
}
async _mkdir(path) {
if (await this._isFile(path)) {
const error = new Error(`ENOTDIR: file already exists, mkdir '${path}'`)
error.code = 'ENOTDIR'
error.path = path
throw error
}
// nothing to do, directories do not exist, they are part of the files' path
}
async _rename(oldPath, newPath) {
const size = await this._getSize(oldPath)
const multipartParams = await this._s3.createMultipartUpload({ ...this._createParams(newPath) })
@@ -193,17 +147,6 @@ export default class S3Handler extends RemoteHandlerAbstract {
return { bytesRead: result.Body.length, buffer }
}
async _rmdir(path) {
if (await this._isNotEmptyDir(path)) {
const error = new Error(`ENOTEMPTY: directory not empty, rmdir '${path}`)
error.code = 'ENOTEMPTY'
error.path = path
throw error
}
// nothing to do, directories do not exist, they are part of the files' path
}
async _write(file, buffer, position) {
if (typeof file !== 'string') {
file = file.fd

View File

@@ -0,0 +1 @@
module.exports = require('../../@xen-orchestra/babel-config')(require('./package.json'))

View File

@@ -1,107 +1 @@
const createConsoleTransport = require('./transports/console')
const { LEVELS, resolve } = require('./levels')
const { compileGlobPattern } = require('./utils')
// ===================================================================
const createTransport = config => {
if (typeof config === 'function') {
return config
}
if (Array.isArray(config)) {
const transports = config.map(createTransport)
const { length } = transports
return function () {
for (let i = 0; i < length; ++i) {
transports[i].apply(this, arguments)
}
}
}
let { filter } = config
let transport = createTransport(config.transport)
const level = resolve(config.level)
if (filter !== undefined) {
if (typeof filter === 'string') {
const re = compileGlobPattern(filter)
filter = log => re.test(log.namespace)
}
const orig = transport
transport = function (log) {
if ((level !== undefined && log.level >= level) || filter(log)) {
return orig.apply(this, arguments)
}
}
} else if (level !== undefined) {
const orig = transport
transport = function (log) {
if (log.level >= level) {
return orig.apply(this, arguments)
}
}
}
return transport
}
const symbol = typeof Symbol !== 'undefined' ? Symbol.for('@xen-orchestra/log') : '@@@xen-orchestra/log'
const { env } = process
global[symbol] = createTransport({
// display warnings or above, and all that are enabled via DEBUG or
// NODE_DEBUG env
filter: [env.DEBUG, env.NODE_DEBUG].filter(Boolean).join(','),
level: resolve(env.LOG_LEVEL, LEVELS.INFO),
transport: createConsoleTransport(),
})
const configure = config => {
global[symbol] = createTransport(config)
}
exports.configure = configure
// -------------------------------------------------------------------
const catchGlobalErrors = logger => {
// patch process
const onUncaughtException = error => {
logger.error('uncaught exception', { error })
}
const onUnhandledRejection = error => {
logger.warn('possibly unhandled rejection', { error })
}
const onWarning = error => {
logger.warn('Node warning', { error })
}
process.on('uncaughtException', onUncaughtException)
process.on('unhandledRejection', onUnhandledRejection)
process.on('warning', onWarning)
// patch EventEmitter
const EventEmitter = require('events')
const { prototype } = EventEmitter
const { emit } = prototype
function patchedEmit(event, error) {
if (event === 'error' && this.listenerCount(event) === 0) {
logger.error('unhandled error event', { error })
return false
}
return emit.apply(this, arguments)
}
prototype.emit = patchedEmit
return () => {
process.removeListener('uncaughtException', onUncaughtException)
process.removeListener('unhandledRejection', onUnhandledRejection)
process.removeListener('warning', onWarning)
if (prototype.emit === patchedEmit) {
prototype.emit = emit
}
}
}
exports.catchGlobalErrors = catchGlobalErrors
module.exports = require('./dist/configure')

View File

@@ -16,6 +16,12 @@
"url": "https://vates.fr"
},
"preferGlobal": false,
"main": "dist/",
"files": [
"configure.js",
"dist/",
"transports/"
],
"browserslist": [
">2%"
],
@@ -24,9 +30,24 @@
},
"dependencies": {
"lodash": "^4.17.4",
"promise-toolbox": "^0.19.2"
"promise-toolbox": "^0.18.0"
},
"devDependencies": {
"@babel/cli": "^7.0.0",
"@babel/core": "^7.0.0",
"@babel/preset-env": "^7.0.0",
"babel-plugin-lodash": "^3.3.2",
"cross-env": "^7.0.2",
"index-modules": "^0.3.0",
"rimraf": "^3.0.0"
},
"scripts": {
"build": "cross-env NODE_ENV=production babel --source-maps --out-dir=dist/ src/",
"clean": "rimraf dist/",
"dev": "cross-env NODE_ENV=development babel --watch --source-maps --out-dir=dist/ src/",
"prebuild": "yarn run clean",
"predev": "yarn run prebuild",
"prepublishOnly": "yarn run build",
"postversion": "npm publish"
}
}

View File

@@ -0,0 +1,105 @@
import createConsoleTransport from './transports/console'
import LEVELS, { resolve } from './levels'
import { compileGlobPattern } from './utils'
// ===================================================================
const createTransport = config => {
if (typeof config === 'function') {
return config
}
if (Array.isArray(config)) {
const transports = config.map(createTransport)
const { length } = transports
return function () {
for (let i = 0; i < length; ++i) {
transports[i].apply(this, arguments)
}
}
}
let { filter } = config
let transport = createTransport(config.transport)
const level = resolve(config.level)
if (filter !== undefined) {
if (typeof filter === 'string') {
const re = compileGlobPattern(filter)
filter = log => re.test(log.namespace)
}
const orig = transport
transport = function (log) {
if ((level !== undefined && log.level >= level) || filter(log)) {
return orig.apply(this, arguments)
}
}
} else if (level !== undefined) {
const orig = transport
transport = function (log) {
if (log.level >= level) {
return orig.apply(this, arguments)
}
}
}
return transport
}
const symbol = typeof Symbol !== 'undefined' ? Symbol.for('@xen-orchestra/log') : '@@@xen-orchestra/log'
const { env } = process
global[symbol] = createTransport({
// display warnings or above, and all that are enabled via DEBUG or
// NODE_DEBUG env
filter: [env.DEBUG, env.NODE_DEBUG].filter(Boolean).join(','),
level: resolve(env.LOG_LEVEL, LEVELS.INFO),
transport: createConsoleTransport(),
})
export const configure = config => {
global[symbol] = createTransport(config)
}
// -------------------------------------------------------------------
export const catchGlobalErrors = logger => {
// patch process
const onUncaughtException = error => {
logger.error('uncaught exception', { error })
}
const onUnhandledRejection = error => {
logger.warn('possibly unhandled rejection', { error })
}
const onWarning = error => {
logger.warn('Node warning', { error })
}
process.on('uncaughtException', onUncaughtException)
process.on('unhandledRejection', onUnhandledRejection)
process.on('warning', onWarning)
// patch EventEmitter
const EventEmitter = require('events')
const { prototype } = EventEmitter
const { emit } = prototype
function patchedEmit(event, error) {
if (event === 'error' && this.listenerCount(event) === 0) {
logger.error('unhandled error event', { error })
return false
}
return emit.apply(this, arguments)
}
prototype.emit = patchedEmit
return () => {
process.removeListener('uncaughtException', onUncaughtException)
process.removeListener('unhandledRejection', onUnhandledRejection)
process.removeListener('warning', onWarning)
if (prototype.emit === patchedEmit) {
prototype.emit = emit
}
}
}

View File

@@ -1,5 +1,5 @@
const createTransport = require('./transports/console')
const { LEVELS, resolve } = require('./levels')
import createTransport from './transports/console'
import LEVELS, { resolve } from './levels'
const symbol = typeof Symbol !== 'undefined' ? Symbol.for('@xen-orchestra/log') : '@@@xen-orchestra/log'
if (!(symbol in global)) {
@@ -68,7 +68,5 @@ prototype.wrap = function (message, fn) {
}
}
const createLogger = namespace => new Logger(namespace)
module.exports = exports = createLogger
exports.createLogger = createLogger
export const createLogger = namespace => new Logger(namespace)
export { createLogger as default }

View File

@@ -1,5 +1,5 @@
const LEVELS = Object.create(null)
exports.LEVELS = LEVELS
export { LEVELS as default }
// https://github.com/trentm/node-bunyan#levels
LEVELS.FATAL = 60 // service/app is going down
@@ -8,8 +8,7 @@ LEVELS.WARN = 40 // something went wrong but it's not fatal
LEVELS.INFO = 30 // detail on unusual but normal operation
LEVELS.DEBUG = 20
const NAMES = Object.create(null)
exports.NAMES = NAMES
export const NAMES = Object.create(null)
for (const name in LEVELS) {
NAMES[LEVELS[name]] = name
}
@@ -17,7 +16,7 @@ for (const name in LEVELS) {
// resolves to the number representation of a level
//
// returns `defaultLevel` if invalid
const resolve = (level, defaultLevel) => {
export const resolve = (level, defaultLevel) => {
const type = typeof level
if (type === 'number') {
if (level in NAMES) {
@@ -31,7 +30,6 @@ const resolve = (level, defaultLevel) => {
}
return defaultLevel
}
exports.resolve = resolve
Object.freeze(LEVELS)
Object.freeze(NAMES)

View File

@@ -1,8 +1,8 @@
/* eslint-env jest */
const { forEach, isInteger } = require('lodash')
import { forEach, isInteger } from 'lodash'
const { LEVELS, NAMES, resolve } = require('./levels')
import LEVELS, { NAMES, resolve } from './levels'
describe('LEVELS', () => {
it('maps level names to their integer values', () => {

View File

@@ -0,0 +1,79 @@
import LEVELS, { NAMES } from '../levels'
const { DEBUG, ERROR, FATAL, INFO, WARN } = LEVELS
let formatLevel, formatNamespace
if (process.stdout !== undefined && process.stdout.isTTY && process.stderr !== undefined && process.stderr.isTTY) {
const ansi = (style, str) => `\x1b[${style}m${str}\x1b[0m`
const LEVEL_STYLES = {
[DEBUG]: '2',
[ERROR]: '1;31',
[FATAL]: '1;31',
[INFO]: '1',
[WARN]: '1;33',
}
formatLevel = level => {
const style = LEVEL_STYLES[level]
const name = NAMES[level]
return style === undefined ? name : ansi(style, name)
}
const NAMESPACE_COLORS = [
196,
202,
208,
214,
220,
226,
190,
154,
118,
82,
46,
47,
48,
49,
50,
51,
45,
39,
33,
27,
21,
57,
93,
129,
165,
201,
200,
199,
198,
197,
]
formatNamespace = namespace => {
// https://werxltd.com/wp/2010/05/13/javascript-implementation-of-javas-string-hashcode-method/
let hash = 0
for (let i = 0, n = namespace.length; i < n; ++i) {
hash = ((hash << 5) - hash + namespace.charCodeAt(i)) | 0
}
return ansi(`1;38;5;${NAMESPACE_COLORS[Math.abs(hash) % NAMESPACE_COLORS.length]}`, namespace)
}
} else {
formatLevel = str => NAMES[str]
formatNamespace = str => str
}
const consoleTransport = ({ data, level, namespace, message, time }) => {
const fn =
/* eslint-disable no-console */
level < INFO ? console.log : level < WARN ? console.info : level < ERROR ? console.warn : console.error
/* eslint-enable no-console */
const args = [time.toISOString(), formatNamespace(namespace), formatLevel(level), message]
if (data != null) {
args.push(data)
}
fn.apply(console, args)
}
export default () => consoleTransport

View File

@@ -0,0 +1,64 @@
import fromCallback from 'promise-toolbox/fromCallback'
import prettyFormat from 'pretty-format' // eslint-disable-line node/no-extraneous-import
import { createTransport } from 'nodemailer' // eslint-disable-line node/no-extraneous-import
import { evalTemplate, required } from '../utils'
import { NAMES } from '../levels'
export default ({
// transport options (https://nodemailer.com/smtp/)
auth,
authMethod,
host,
ignoreTLS,
port,
proxy,
requireTLS,
secure,
service,
tls,
// message options (https://nodemailer.com/message/)
bcc,
cc,
from = required('from'),
to = required('to'),
subject = '[{{level}} - {{namespace}}] {{time}} {{message}}',
}) => {
const transporter = createTransport(
{
auth,
authMethod,
host,
ignoreTLS,
port,
proxy,
requireTLS,
secure,
service,
tls,
disableFileAccess: true,
disableUrlAccess: true,
},
{
bcc,
cc,
from,
to,
}
)
return log =>
fromCallback(cb =>
transporter.sendMail(
{
subject: evalTemplate(subject, key =>
key === 'level' ? NAMES[log.level] : key === 'time' ? log.time.toISOString() : log[key]
),
text: prettyFormat(log.data),
},
cb
)
)
}

View File

@@ -0,0 +1,7 @@
export default () => {
const memoryLogger = log => {
logs.push(log)
}
const logs = (memoryLogger.logs = [])
return memoryLogger
}

View File

@@ -0,0 +1,41 @@
import fromCallback from 'promise-toolbox/fromCallback'
import splitHost from 'split-host'
import { createClient, Facility, Severity, Transport } from 'syslog-client'
import LEVELS from '../levels'
// https://github.com/paulgrove/node-syslog-client#syslogseverity
const LEVEL_TO_SEVERITY = {
[LEVELS.FATAL]: Severity.Critical,
[LEVELS.ERROR]: Severity.Error,
[LEVELS.WARN]: Severity.Warning,
[LEVELS.INFO]: Severity.Informational,
[LEVELS.DEBUG]: Severity.Debug,
}
const facility = Facility.User
export default target => {
const opts = {}
if (target !== undefined) {
if (target.startsWith('tcp://')) {
target = target.slice(6)
opts.transport = Transport.Tcp
} else if (target.startsWith('udp://')) {
target = target.slice(6)
opts.transport = Transport.Udp
}
;({ host: target, port: opts.port } = splitHost(target))
}
const client = createClient(target, opts)
return log =>
fromCallback(cb =>
client.log(log.message, {
facility,
severity: LEVEL_TO_SEVERITY[log.level],
})
)
}

View File

@@ -1,20 +1,19 @@
const escapeRegExp = require('lodash/escapeRegExp')
import escapeRegExp from 'lodash/escapeRegExp'
// ===================================================================
const TPL_RE = /\{\{(.+?)\}\}/g
const evalTemplate = (tpl, data) => {
export const evalTemplate = (tpl, data) => {
const getData = typeof data === 'function' ? (_, key) => data(key) : (_, key) => data[key]
return tpl.replace(TPL_RE, getData)
}
exports.evalTemplate = evalTemplate
// -------------------------------------------------------------------
const compileGlobPatternFragment = pattern => pattern.split('*').map(escapeRegExp).join('.*')
const compileGlobPattern = pattern => {
export const compileGlobPattern = pattern => {
const no = []
const yes = []
pattern.split(/[\s,]+/).forEach(pattern => {
@@ -41,22 +40,19 @@ const compileGlobPattern = pattern => {
return new RegExp(raw.join(''))
}
exports.compileGlobPattern = compileGlobPattern
// -------------------------------------------------------------------
const required = name => {
export const required = name => {
throw new Error(`missing required arg ${name}`)
}
exports.required = required
// -------------------------------------------------------------------
const serializeError = error => ({
export const serializeError = error => ({
...error, // Copy enumerable properties.
code: error.code,
message: error.message,
name: error.name,
stack: error.stack,
})
exports.serializeError = serializeError

View File

@@ -1,6 +1,6 @@
/* eslint-env jest */
const { compileGlobPattern } = require('./utils')
import { compileGlobPattern } from './utils'
describe('compileGlobPattern()', () => {
it('works', () => {

View File

@@ -1,82 +1 @@
const { LEVELS, NAMES } = require('../levels')
const { DEBUG, ERROR, FATAL, INFO, WARN } = LEVELS
let formatLevel, formatNamespace
if (process.stdout !== undefined && process.stdout.isTTY && process.stderr !== undefined && process.stderr.isTTY) {
const ansi = (style, str) => `\x1b[${style}m${str}\x1b[0m`
const LEVEL_STYLES = {
[DEBUG]: '2',
[ERROR]: '1;31',
[FATAL]: '1;31',
[INFO]: '1',
[WARN]: '1;33',
}
formatLevel = level => {
const style = LEVEL_STYLES[level]
const name = NAMES[level]
return style === undefined ? name : ansi(style, name)
}
const NAMESPACE_COLORS = [
196,
202,
208,
214,
220,
226,
190,
154,
118,
82,
46,
47,
48,
49,
50,
51,
45,
39,
33,
27,
21,
57,
93,
129,
165,
201,
200,
199,
198,
197,
]
formatNamespace = namespace => {
// https://werxltd.com/wp/2010/05/13/javascript-implementation-of-javas-string-hashcode-method/
let hash = 0
for (let i = 0, n = namespace.length; i < n; ++i) {
hash = ((hash << 5) - hash + namespace.charCodeAt(i)) | 0
}
return ansi(`1;38;5;${NAMESPACE_COLORS[Math.abs(hash) % NAMESPACE_COLORS.length]}`, namespace)
}
} else {
formatLevel = str => NAMES[str]
formatNamespace = str => str
}
const consoleTransport = ({ data, level, namespace, message, time }) => {
const fn =
/* eslint-disable no-console */
level < INFO ? console.log : level < WARN ? console.info : level < ERROR ? console.warn : console.error
/* eslint-enable no-console */
const args = [time.toISOString(), formatNamespace(namespace), formatLevel(level), message]
if (data != null) {
args.push(data)
}
fn.apply(console, args)
}
const createTransport = () => consoleTransport
module.exports = exports = createTransport
module.exports = require('../dist/transports/console.js')

View File

@@ -1,66 +1 @@
const fromCallback = require('promise-toolbox/fromCallback')
const nodemailer = require('nodemailer') // eslint-disable-line node/no-extraneous-import
const prettyFormat = require('pretty-format') // eslint-disable-line node/no-extraneous-import
const { evalTemplate, required } = require('../utils')
const { NAMES } = require('../levels')
function createTransport({
// transport options (https://nodemailer.com/smtp/)
auth,
authMethod,
host,
ignoreTLS,
port,
proxy,
requireTLS,
secure,
service,
tls,
// message options (https://nodemailer.com/message/)
bcc,
cc,
from = required('from'),
to = required('to'),
subject = '[{{level}} - {{namespace}}] {{time}} {{message}}',
}) {
const transporter = nodemailer.createTransport(
{
auth,
authMethod,
host,
ignoreTLS,
port,
proxy,
requireTLS,
secure,
service,
tls,
disableFileAccess: true,
disableUrlAccess: true,
},
{
bcc,
cc,
from,
to,
}
)
return log =>
fromCallback(cb =>
transporter.sendMail(
{
subject: evalTemplate(subject, key =>
key === 'level' ? NAMES[log.level] : key === 'time' ? log.time.toISOString() : log[key]
),
text: prettyFormat(log.data),
},
cb
)
)
}
module.exports = exports = createTransport
module.exports = require('../dist/transports/email.js')

View File

@@ -1,9 +1 @@
function createTransport() {
const memoryLogger = log => {
logs.push(log)
}
const logs = (memoryLogger.logs = [])
return memoryLogger
}
module.exports = exports = createTransport
module.exports = require('../dist/transports/memory.js')

View File

@@ -1,43 +1 @@
const fromCallback = require('promise-toolbox/fromCallback')
const splitHost = require('split-host')
const { createClient, Facility, Severity, Transport } = require('syslog-client')
const LEVELS = require('../levels')
// https://github.com/paulgrove/node-syslog-client#syslogseverity
const LEVEL_TO_SEVERITY = {
[LEVELS.FATAL]: Severity.Critical,
[LEVELS.ERROR]: Severity.Error,
[LEVELS.WARN]: Severity.Warning,
[LEVELS.INFO]: Severity.Informational,
[LEVELS.DEBUG]: Severity.Debug,
}
const facility = Facility.User
function createTransport(target) {
const opts = {}
if (target !== undefined) {
if (target.startsWith('tcp://')) {
target = target.slice(6)
opts.transport = Transport.Tcp
} else if (target.startsWith('udp://')) {
target = target.slice(6)
opts.transport = Transport.Udp
}
;({ host: target, port: opts.port } = splitHost(target))
}
const client = createClient(target, opts)
return log =>
fromCallback(cb =>
client.log(log.message, {
facility,
severity: LEVEL_TO_SEVERITY[log.level],
})
)
}
module.exports = exports = createTransport
module.exports = require('../dist/transports/syslog.js')

View File

@@ -0,0 +1 @@
module.exports = require('../../@xen-orchestra/babel-config')(require('./package.json'))

View File

@@ -1,39 +0,0 @@
const camelCase = require('lodash/camelCase')
const { defineProperties, defineProperty, keys } = Object
const noop = Function.prototype
const MIXIN_CYCLIC_DESCRIPTOR = {
configurable: true,
get() {
throw new Error('cyclic dependency')
},
}
module.exports = function mixin(object, mixins, args) {
// add lazy property for each of the mixin, this allows mixins to depend on
// one another without any special ordering
const descriptors = {}
keys(mixins).forEach(name => {
const Mixin = mixins[name]
name = camelCase(name)
descriptors[name] = {
configurable: true,
get: () => {
defineProperty(object, name, MIXIN_CYCLIC_DESCRIPTOR)
const instance = new Mixin(object, ...args)
defineProperty(object, name, {
value: instance,
})
return instance
},
}
})
defineProperties(object, descriptors)
// access all mixin properties to trigger their creation
keys(descriptors).forEach(name => {
noop(object[name])
})
}

View File

@@ -1,7 +1,7 @@
{
"private": false,
"name": "@xen-orchestra/mixin",
"version": "0.1.0",
"version": "0.0.0",
"license": "ISC",
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@xen-orchestra/mixin",
"bugs": "https://github.com/vatesfr/xen-orchestra/issues",
@@ -15,14 +15,34 @@
"url": "https://vates.fr"
},
"preferGlobal": false,
"main": "dist/",
"files": [
"dist/"
],
"browserslist": [
">2%"
],
"engines": {
"node": ">=6"
},
"dependencies": {
"bind-property-descriptor": "^1.0.0",
"lodash": "^4.17.21"
"bind-property-descriptor": "^1.0.0"
},
"devDependencies": {
"@babel/cli": "^7.0.0",
"@babel/core": "^7.0.0",
"@babel/preset-env": "^7.0.0",
"babel-plugin-dev": "^1.0.0",
"cross-env": "^7.0.2",
"rimraf": "^3.0.0"
},
"scripts": {
"build": "cross-env NODE_ENV=production babel --source-maps --out-dir=dist/ src/",
"clean": "rimraf dist/",
"dev": "cross-env NODE_ENV=development babel --watch --source-maps --out-dir=dist/ src/",
"prebuild": "yarn run clean",
"predev": "yarn run prebuild",
"prepublishOnly": "yarn run build",
"postversion": "npm publish"
}
}

View File

@@ -1,4 +1,4 @@
const { getBoundPropertyDescriptor } = require('bind-property-descriptor')
import { getBoundPropertyDescriptor } from 'bind-property-descriptor'
// ===================================================================
@@ -25,7 +25,7 @@ const ownKeys =
// -------------------------------------------------------------------
const mixin = Mixins => Class => {
if (!Array.isArray(Mixins)) {
if (__DEV__ && !Array.isArray(Mixins)) {
throw new TypeError('Mixins should be an array')
}
@@ -44,7 +44,7 @@ const mixin = Mixins => Class => {
}
for (const prop of ownKeys(Mixin)) {
if (prop in prototype) {
if (__DEV__ && prop in prototype) {
throw new Error(`${name}#${prop} is already defined`)
}
@@ -106,7 +106,7 @@ const mixin = Mixins => Class => {
return
}
if (prop in descriptors) {
if (__DEV__ && prop in descriptors) {
throw new Error(`${name}.${prop} is already defined`)
}
@@ -117,4 +117,4 @@ const mixin = Mixins => Class => {
return DecoratedClass
}
module.exports = mixin
export { mixin as default }

View File

@@ -1 +0,0 @@
../../scripts/npmignore

View File

@@ -1,30 +0,0 @@
<!-- DO NOT EDIT MANUALLY, THIS FILE HAS BEEN GENERATED -->
# @xen-orchestra/mixins
[![Package Version](https://badgen.net/npm/v/@xen-orchestra/mixins)](https://npmjs.org/package/@xen-orchestra/mixins) ![License](https://badgen.net/npm/license/@xen-orchestra/mixins) [![PackagePhobia](https://badgen.net/bundlephobia/minzip/@xen-orchestra/mixins)](https://bundlephobia.com/result?p=@xen-orchestra/mixins) [![Node compatibility](https://badgen.net/npm/node/@xen-orchestra/mixins)](https://npmjs.org/package/@xen-orchestra/mixins)
> Mixins shared between xo-proxy and xo-server
## Install
Installation of the [npm package](https://npmjs.org/package/@xen-orchestra/mixins):
```
> npm install --save @xen-orchestra/mixins
```
## Contributions
Contributions are _very_ welcomed, either on the documentation or on
the code.
You may:
- report any [issue](https://github.com/vatesfr/xen-orchestra/issues)
you've encountered;
- fork and create a pull request.
## License
[AGPL-3.0-or-later](https://spdx.org/licenses/AGPL-3.0-or-later) © [Vates SAS](https://vates.fr)

View File

@@ -1,31 +0,0 @@
{
"private": false,
"name": "@xen-orchestra/mixins",
"description": "Mixins shared between xo-proxy and xo-server",
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@xen-orchestra/mixins",
"bugs": "https://github.com/vatesfr/xen-orchestra/issues",
"repository": {
"directory": "@xen-orchestra/mixins",
"type": "git",
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"author": {
"name": "Vates SAS",
"url": "https://vates.fr"
},
"license": "AGPL-3.0-or-later",
"version": "0.1.0",
"engines": {
"node": ">=12"
},
"dependencies": {
"@vates/parse-duration": "^0.1.1",
"@xen-orchestra/emit-async": "^0.0.0",
"@xen-orchestra/log": "^0.2.0",
"app-conf": "^0.9.0",
"lodash": "^4.17.21"
},
"scripts": {
"postversion": "npm publish --access public"
}
}

View File

@@ -1 +0,0 @@
../../scripts/babel-eslintrc.js

View File

@@ -26,7 +26,7 @@
"@babel/cli": "^7.7.4",
"@babel/core": "^7.7.4",
"@babel/preset-env": "^7.7.4",
"cross-env": "^7.0.2",
"cross": "^1.0.0",
"rimraf": "^3.0.0"
},
"dependencies": {

View File

@@ -1 +0,0 @@
../../scripts/babel-eslintrc.js

View File

@@ -22,6 +22,9 @@
"bin": {
"xo-proxy-cli": "dist/index.js"
},
"files": [
"dist/"
],
"engines": {
"node": ">=8.10"
},
@@ -35,7 +38,7 @@
"getopts": "^2.2.3",
"http-request-plus": "^0.9.1",
"json-rpc-protocol": "^0.13.1",
"promise-toolbox": "^0.19.2",
"promise-toolbox": "^0.18.1",
"pump": "^3.0.0",
"pumpify": "^2.0.1",
"split2": "^3.1.1"

View File

@@ -1,5 +0,0 @@
module.exports = require('../../@xen-orchestra/babel-config')(require('./package.json'), {
'@babel/preset-env': {
modules: false,
},
})

View File

@@ -0,0 +1 @@
module.exports = require('../../@xen-orchestra/babel-config')(require('./package.json'))

Some files were not shown because too many files have changed in this diff Show More