Compare commits

..

1 Commits

Author SHA1 Message Date
Julien Fontanet
4bd7a76d47 feat(xen-api/Record#resolve_): resolve props named as types 2021-04-07 16:00:35 +02:00
561 changed files with 13711 additions and 11656 deletions

View File

@@ -13,13 +13,19 @@ module.exports = {
overrides: [
{
files: ['cli.{,c,m}js', '*-cli.{,c,m}js', '**/*cli*/**/*.{,c,m}js'],
files: ['cli.js', '*-cli.js', '**/*cli*/**/*.js'],
rules: {
'no-console': 'off',
},
},
],
parser: 'babel-eslint',
parserOptions: {
ecmaFeatures: {
legacyDecorators: true,
},
},
rules: {
// disabled because XAPI objects are using camel case
camelcase: ['off'],

View File

@@ -1,33 +0,0 @@
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: 'status: triaging :triangular_flag_on_post:, type: bug :bug:'
assignees: ''
---
**Describe the bug**
A clear and concise description of what the bug is.
**To Reproduce**
Steps to reproduce the behavior:
1. Go to '...'
2. Click on '....'
3. Scroll down to '....'
4. See error
**Expected behavior**
A clear and concise description of what you expected to happen.
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Desktop (please complete the following information):**
- Node: [e.g. 16.12.1]
- xo-server: [e.g. 5.82.3]
- xo-web: [e.g. 5.87.0]
- hypervisor: [e.g. XCP-ng 8.2.0]
**Additional context**
Add any other context about the problem here.

View File

@@ -1,20 +0,0 @@
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: ''
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.

8
.gitignore vendored
View File

@@ -11,7 +11,7 @@
/packages/*/dist/
/packages/*/node_modules/
/@xen-orchestra/proxy/src/app/mixins/index.mjs
/@xen-orchestra/proxy/src/app/mixins/index.js
/packages/vhd-cli/src/commands/index.js
@@ -19,9 +19,9 @@
/packages/xen-api/plot.dat
/packages/xo-server/.xo-server.*
/packages/xo-server/src/api/index.mjs
/packages/xo-server/src/xapi/mixins/index.mjs
/packages/xo-server/src/xo-mixins/index.mjs
/packages/xo-server/src/api/index.js
/packages/xo-server/src/xapi/mixins/index.js
/packages/xo-server/src/xo-mixins/index.js
/packages/xo-server-auth-ldap/ldap.cache.conf

View File

@@ -20,6 +20,9 @@
"type": "git",
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"files": [
"index.js"
],
"author": {
"name": "Vates SAS",
"url": "https://vates.fr"

View File

@@ -16,9 +16,7 @@ Installation of the [npm package](https://npmjs.org/package/@vates/decorate-with
## Usage
### `decorateWith(fn, ...args)`
Creates a new ([legacy](https://babeljs.io/docs/en/babel-plugin-syntax-decorators#legacy)) method decorator from a function decorator, for instance, allows using Lodash's functions as decorators:
For instance, allows using Lodash's functions as decorators:
```js
import { decorateWith } from '@vates/decorate-with'
@@ -31,34 +29,6 @@ class Foo {
}
```
### `decorateMethodsWith(class, map)`
Decorates a number of methods directly, without using the decorator syntax:
```js
import { decorateMethodsWith } from '@vates/decorate-with'
class Foo {
bar() {
// body
}
baz() {
// body
}
}
decorateMethodsWith(Foo, {
// without arguments
bar: lodash.curry,
// with arguments
baz: [lodash.debounce, 150],
})
```
The decorated class is returned, so you can export it directly.
## Contributions
Contributions are _very_ welcomed, either on the documentation or on

View File

@@ -1,6 +1,4 @@
### `decorateWith(fn, ...args)`
Creates a new ([legacy](https://babeljs.io/docs/en/babel-plugin-syntax-decorators#legacy)) method decorator from a function decorator, for instance, allows using Lodash's functions as decorators:
For instance, allows using Lodash's functions as decorators:
```js
import { decorateWith } from '@vates/decorate-with'
@@ -12,31 +10,3 @@ class Foo {
}
}
```
### `decorateMethodsWith(class, map)`
Decorates a number of methods directly, without using the decorator syntax:
```js
import { decorateMethodsWith } from '@vates/decorate-with'
class Foo {
bar() {
// body
}
baz() {
// body
}
}
decorateMethodsWith(Foo, {
// without arguments
bar: lodash.curry,
// with arguments
baz: [lodash.debounce, 150],
})
```
The decorated class is returned, so you can export it directly.

View File

@@ -1,21 +1,4 @@
exports.decorateWith = function decorateWith(fn, ...args) {
return (target, name, descriptor) => ({
...descriptor,
value: fn(descriptor.value, ...args),
})
}
const { getOwnPropertyDescriptor, defineProperty } = Object
exports.decorateMethodsWith = function decorateMethodsWith(klass, map) {
const { prototype } = klass
for (const name of Object.keys(map)) {
const descriptor = getOwnPropertyDescriptor(prototype, name)
const { value } = descriptor
const decorator = map[name]
descriptor.value = typeof decorator === 'function' ? decorator(value) : decorator[0](value, ...decorator.slice(1))
defineProperty(prototype, name, descriptor)
}
return klass
}
exports.decorateWith = (fn, ...args) => (target, name, descriptor) => ({
...descriptor,
value: fn(descriptor.value, ...args),
})

View File

@@ -20,7 +20,7 @@
"url": "https://vates.fr"
},
"license": "ISC",
"version": "0.1.0",
"version": "0.0.1",
"engines": {
"node": ">=8.10"
},

View File

@@ -48,7 +48,7 @@ import { createDebounceResource } from '@vates/disposable/debounceResource'
const debounceResource = createDebounceResource()
// it will wait for 10 seconds before calling the disposer
Disposable.use(debounceResource(getConnection(host), 10e3), connection => {})
using(debounceResource(getConnection(host), 10e3), connection => {})
```
### `debounceResource.flushAll()`

View File

@@ -30,7 +30,7 @@ import { createDebounceResource } from '@vates/disposable/debounceResource'
const debounceResource = createDebounceResource()
// it will wait for 10 seconds before calling the disposer
Disposable.use(debounceResource(getConnection(host), 10e3), connection => {})
using(debounceResource(getConnection(host), 10e3), connection => {})
```
### `debounceResource.flushAll()`

View File

@@ -23,8 +23,7 @@
},
"dependencies": {
"@vates/multi-key-map": "^0.1.0",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/log": "^0.3.0",
"@xen-orchestra/log": "^0.2.0",
"ensure-array": "^1.0.0"
}
}

View File

@@ -44,4 +44,4 @@ You may:
## License
[ISC](https://spdx.org/licenses/ISC) © [Vates SAS](https://vates.fr)
[AGPL-3.0-or-later](https://spdx.org/licenses/AGPL-3.0-or-later) © [Vates SAS](https://vates.fr)

View File

@@ -6,7 +6,7 @@ exports.parseDuration = value => {
}
const duration = ms(value)
if (duration === undefined) {
throw new TypeError(`not a valid duration: ${value}`)
throw new TypeError(`not a valid duration: ${duration}`)
}
return duration
}

View File

@@ -18,8 +18,8 @@
"name": "Vates SAS",
"url": "https://vates.fr"
},
"license": "ISC",
"version": "0.1.1",
"license": "AGPL-3.0-or-later",
"version": "0.1.0",
"engines": {
"node": ">=8.10"
},

View File

@@ -23,6 +23,9 @@
"engines": {
"node": ">=8.10"
},
"files": [
"index.js"
],
"scripts": {
"postversion": "npm publish --access public"
},

View File

@@ -31,6 +31,9 @@
"engines": {
"node": ">=6"
},
"files": [
"index.js"
],
"bin": "./index.js",
"scripts": {
"postversion": "npm publish --access public"

View File

@@ -24,6 +24,10 @@
"url": "https://vates.fr"
},
"preferGlobal": false,
"files": [
"index.js",
"legacy.js"
],
"engines": {
"node": ">=6"
},

View File

@@ -1 +0,0 @@
../../scripts/babel-eslintrc.js

View File

@@ -9,7 +9,7 @@
},
"version": "0.2.0",
"engines": {
"node": ">=10"
"node": ">=8.10"
},
"main": "dist/",
"scripts": {
@@ -26,13 +26,14 @@
"@babel/plugin-proposal-decorators": "^7.8.0",
"@babel/plugin-proposal-nullish-coalescing-operator": "^7.8.0",
"@babel/preset-env": "^7.7.4",
"cross-env": "^7.0.2",
"cross": "^1.0.0",
"rimraf": "^3.0.0"
},
"dependencies": {
"@vates/decorate-with": "^0.1.0",
"@xen-orchestra/log": "^0.3.0",
"@xen-orchestra/log": "^0.2.0",
"core-js": "^3.6.4",
"golike-defer": "^0.5.1",
"lodash": "^4.17.15",
"object-hash": "^2.0.1"
},
"private": false,

View File

@@ -1,8 +1,10 @@
// see https://github.com/babel/babel/issues/8450
import 'core-js/features/symbol/async-iterator'
import assert from 'assert'
import defer from 'golike-defer'
import hash from 'object-hash'
import { createLogger } from '@xen-orchestra/log'
import { decorateWith } from '@vates/decorate-with'
import { defer } from 'golike-defer'
const log = createLogger('xo:audit-core')
@@ -63,7 +65,7 @@ export class AuditCore {
this._storage = storage
}
@decorateWith(defer)
@defer
async add($defer, subject, event, data) {
const time = Date.now()
$defer(await this._storage.acquireLock())
@@ -148,7 +150,7 @@ export class AuditCore {
}
}
@decorateWith(defer)
@defer
async deleteRangeAndRewrite($defer, newest, oldest) {
assert.notStrictEqual(newest, undefined)
assert.notStrictEqual(oldest, undefined)

View File

@@ -17,10 +17,10 @@ interface Record {
}
export class AuditCore {
constructor(storage: Storage) {}
public add(subject: any, event: string, data: any): Promise<Record> {}
public checkIntegrity(oldest: string, newest: string): Promise<number> {}
public getFrom(newest?: string): AsyncIterator {}
public deleteFrom(newest: string): Promise<void> {}
public deleteRangeAndRewrite(newest: string, oldest: string): Promise<void> {}
constructor(storage: Storage) { }
public add(subject: any, event: string, data: any): Promise<Record> { }
public checkIntegrity(oldest: string, newest: string): Promise<number> { }
public getFrom(newest?: string): AsyncIterator { }
public deleteFrom(newest: string): Promise<void> { }
public deleteRangeAndRewrite(newest: string, oldest: string): Promise<void> { }
}

View File

@@ -14,13 +14,25 @@ const configs = {
'@babel/plugin-proposal-pipeline-operator': {
proposal: 'minimal',
},
'@babel/preset-env': {
debug: !__TEST__,
'@babel/preset-env'(pkg) {
return {
debug: !__TEST__,
// disabled until https://github.com/babel/babel/issues/8323 is resolved
// loose: true,
// disabled until https://github.com/babel/babel/issues/8323 is resolved
// loose: true,
shippedProposals: true,
shippedProposals: true,
targets: (() => {
let node = (pkg.engines || {}).node
if (node !== undefined) {
const trimChars = '^=>~'
while (trimChars.includes(node[0])) {
node = node.slice(1)
}
}
return { browsers: pkg.browserslist, node }
})(),
}
},
}
@@ -32,21 +44,21 @@ const getConfig = (key, ...args) => {
// some plugins must be used in a specific order
const pluginsOrder = ['@babel/plugin-proposal-decorators', '@babel/plugin-proposal-class-properties']
module.exports = function (pkg, configs = {}) {
const plugins = {}
const presets = {}
module.exports = function (pkg, plugins, presets) {
plugins === undefined && (plugins = {})
presets === undefined && (presets = {})
Object.keys(pkg.devDependencies || {}).forEach(name => {
if (!(name in presets) && PLUGINS_RE.test(name)) {
plugins[name] = { ...getConfig(name, pkg), ...configs[name] }
plugins[name] = getConfig(name, pkg)
} else if (!(name in presets) && PRESETS_RE.test(name)) {
presets[name] = { ...getConfig(name, pkg), ...configs[name] }
presets[name] = getConfig(name, pkg)
}
})
return {
comments: !__PROD__,
ignore: __PROD__ ? [/\btests?\//, /\.spec\.js$/] : undefined,
ignore: __TEST__ ? undefined : [/\.spec\.js$/],
plugins: Object.keys(plugins)
.map(plugin => [plugin, plugins[plugin]])
.sort(([a], [b]) => {
@@ -55,15 +67,5 @@ module.exports = function (pkg, configs = {}) {
return oA !== -1 && oB !== -1 ? oA - oB : a < b ? -1 : 1
}),
presets: Object.keys(presets).map(preset => [preset, presets[preset]]),
targets: (() => {
let node = (pkg.engines || {}).node
if (node !== undefined) {
const trimChars = '^=>~'
while (trimChars.includes(node[0])) {
node = node.slice(1)
}
}
return { browsers: pkg.browserslist, node }
})(),
}
}

View File

@@ -10,13 +10,12 @@ const { resolve } = require('path')
const adapter = new RemoteAdapter(require('@xen-orchestra/fs').getHandler({ url: 'file://' }))
module.exports = async function main(args) {
const { _, fix, remove, merge } = getopts(args, {
const { _, remove, merge } = getopts(args, {
alias: {
fix: 'f',
remove: 'r',
merge: 'm',
},
boolean: ['fix', 'merge', 'remove'],
boolean: ['merge', 'remove'],
default: {
merge: false,
remove: false,
@@ -26,7 +25,7 @@ module.exports = async function main(args) {
await asyncMap(_, async vmDir => {
vmDir = resolve(vmDir)
try {
await adapter.cleanVm(vmDir, { fixMetadata: fix, remove, merge, onLog: (...args) => console.warn(...args) })
await adapter.cleanVm(vmDir, { remove, merge, onLog: log => console.warn(log) })
} catch (error) {
console.error('adapter.cleanVm', vmDir, error)
}

View File

@@ -5,12 +5,11 @@ require('./_composeCommands')({
get main() {
return require('./commands/clean-vms')
},
usage: `[--fix] [--merge] [--remove] xo-vm-backups/*
usage: `[--merge] [--remove] xo-vm-backups/*
Detects and repair issues with VM backups.
Options:
-f, --fix Fix metadata issues (like size)
-m, --merge Merge (or continue merging) VHD files that are unused
-r, --remove Remove unused, incomplete, orphan, or corrupted files
`,

View File

@@ -7,16 +7,21 @@
"bugs": "https://github.com/vatesfr/xen-orchestra/issues",
"dependencies": {
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/backups": "^0.15.1",
"@xen-orchestra/fs": "^0.18.0",
"@xen-orchestra/backups": "^0.9.1",
"@xen-orchestra/fs": "^0.14.0",
"filenamify": "^4.1.0",
"getopts": "^2.2.5",
"lodash": "^4.17.15",
"promise-toolbox": "^0.20.0"
"promise-toolbox": "^0.18.0",
"vhd-lib": "^1.0.0"
},
"engines": {
"node": ">=7.10.1"
},
"files": [
"commands",
"*.js"
],
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@xen-orchestra/backups-cli",
"name": "@xen-orchestra/backups-cli",
"repository": {
@@ -27,7 +32,7 @@
"scripts": {
"postversion": "npm publish --access public"
},
"version": "0.6.0",
"version": "0.5.0",
"license": "AGPL-3.0-or-later",
"author": {
"name": "Vates SAS",

View File

@@ -1,14 +1,14 @@
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const Disposable = require('promise-toolbox/Disposable.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors.js')
const Disposable = require('promise-toolbox/Disposable')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const limitConcurrency = require('limit-concurrency-decorator').default
const { compileTemplate } = require('@xen-orchestra/template')
const { limitConcurrency } = require('limit-concurrency-decorator')
const { extractIdsFromSimplePattern } = require('./_extractIdsFromSimplePattern.js')
const { PoolMetadataBackup } = require('./_PoolMetadataBackup.js')
const { Task } = require('./Task.js')
const { VmBackup } = require('./_VmBackup.js')
const { XoMetadataBackup } = require('./_XoMetadataBackup.js')
const { extractIdsFromSimplePattern } = require('./_extractIdsFromSimplePattern')
const { PoolMetadataBackup } = require('./_PoolMetadataBackup')
const { Task } = require('./Task')
const { VmBackup } = require('./_VmBackup')
const { XoMetadataBackup } = require('./_XoMetadataBackup')
const noop = Function.prototype

View File

@@ -1,9 +1,8 @@
const assert = require('assert')
const { formatFilenameDate } = require('./_filenameDate.js')
const { importDeltaVm } = require('./_deltaVm.js')
const { Task } = require('./Task.js')
const { watchStreamSize } = require('./_watchStreamSize.js')
const { formatFilenameDate } = require('./_filenameDate')
const { importDeltaVm } = require('./_deltaVm')
const { Task } = require('./Task')
exports.ImportVmBackup = class ImportVmBackup {
constructor({ adapter, metadata, srUuid, xapi, settings: { newMacAddresses } = {} }) {
@@ -19,17 +18,13 @@ exports.ImportVmBackup = class ImportVmBackup {
const metadata = this._metadata
const isFull = metadata.mode === 'full'
const sizeContainer = { size: 0 }
let backup
if (isFull) {
backup = await adapter.readFullVmBackup(metadata)
watchStreamSize(backup, sizeContainer)
} else {
assert.strictEqual(metadata.mode, 'delta')
backup = await adapter.readDeltaVmBackup(metadata)
Object.values(backup.streams).forEach(stream => watchStreamSize(stream, sizeContainer))
}
return Task.run(
@@ -57,7 +52,7 @@ exports.ImportVmBackup = class ImportVmBackup {
])
return {
size: sizeContainer.size,
size: metadata.size,
id: await xapi.getField('VM', vmRef, 'uuid'),
}
}

View File

@@ -1,24 +1,23 @@
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const Disposable = require('promise-toolbox/Disposable.js')
const fromCallback = require('promise-toolbox/fromCallback.js')
const fromEvent = require('promise-toolbox/fromEvent.js')
const pDefer = require('promise-toolbox/defer.js')
const Disposable = require('promise-toolbox/Disposable')
const fromCallback = require('promise-toolbox/fromCallback')
const fromEvent = require('promise-toolbox/fromEvent')
const pDefer = require('promise-toolbox/defer')
const pump = require('pump')
const { basename, dirname, join, normalize, resolve } = require('path')
const { createLogger } = require('@xen-orchestra/log')
const { createSyntheticStream, mergeVhd, VhdFile } = require('vhd-lib')
const { deduped } = require('@vates/disposable/deduped.js')
const { createSyntheticStream, mergeVhd, default: Vhd } = require('vhd-lib')
const { deduped } = require('@vates/disposable/deduped')
const { execFile } = require('child_process')
const { readdir, stat } = require('fs-extra')
const { ZipFile } = require('yazl')
const { BACKUP_DIR } = require('./_getVmBackupDir.js')
const { cleanVm } = require('./_cleanVm.js')
const { getTmpDir } = require('./_getTmpDir.js')
const { isMetadataFile, isVhdFile } = require('./_backupType.js')
const { isValidXva } = require('./_isValidXva.js')
const { listPartitions, LVM_PARTITION_TYPE } = require('./_listPartitions.js')
const { lvs, pvs } = require('./_lvm.js')
const { BACKUP_DIR } = require('./_getVmBackupDir')
const { cleanVm } = require('./_cleanVm')
const { getTmpDir } = require('./_getTmpDir')
const { isMetadataFile, isVhdFile } = require('./_backupType')
const { listPartitions, LVM_PARTITION_TYPE } = require('./_listPartitions')
const { lvs, pvs } = require('./_lvm')
const DIR_XO_CONFIG_BACKUPS = 'xo-config-backups'
exports.DIR_XO_CONFIG_BACKUPS = DIR_XO_CONFIG_BACKUPS
@@ -86,7 +85,7 @@ class RemoteAdapter {
}),
async path => {
try {
const vhd = new VhdFile(handler, path)
const vhd = new Vhd(handler, path)
await vhd.readHeaderAndFooter()
return {
footer: vhd.footer,
@@ -253,9 +252,16 @@ class RemoteAdapter {
async deleteDeltaVmBackups(backups) {
const handler = this._handler
// unused VHDs will be detected by `cleanVm`
await asyncMapSettled(backups, ({ _filename }) => handler.unlink(_filename))
let mergedDataSize = 0
await asyncMapSettled(backups, ({ _filename, vhds }) =>
Promise.all([
handler.unlink(_filename),
asyncMap(Object.values(vhds), async _ => {
mergedDataSize += await this._deleteVhd(resolveRelativeFromFile(_filename, _))
}),
])
)
return mergedDataSize
}
async deleteMetadataBackup(backupId) {
@@ -499,14 +505,21 @@ class RemoteAdapter {
}
async outputStream(path, input, { checksum = true, validator = noop } = {}) {
await this._handler.outputStream(path, input, {
const handler = this._handler
input = await input
const tmpPath = `${dirname(path)}/.${basename(path)}`
const output = await handler.createOutputStream(tmpPath, {
checksum,
dirMode: this._dirMode,
async validator() {
await input.task
return validator.apply(this, arguments)
},
})
try {
await Promise.all([fromCallback(pump, input, output), output.checksumWritten, input.task])
await validator(tmpPath)
await handler.rename(tmpPath, path, { checksum })
} catch (error) {
await handler.unlink(tmpPath, { checksum })
throw error
}
}
async readDeltaVmBackup(metadata) {
@@ -515,7 +528,7 @@ class RemoteAdapter {
const dir = dirname(metadata._filename)
const streams = {}
await asyncMapSettled(Object.keys(vdis), async id => {
await asyncMapSettled(Object.entries(vdis), async ([id, vdi]) => {
streams[`${id}.vhd`] = await createSyntheticStream(handler, join(dir, vhds[id]))
})
@@ -538,15 +551,8 @@ class RemoteAdapter {
}
}
Object.assign(RemoteAdapter.prototype, {
cleanVm(vmDir, { lock = true } = {}) {
if (lock) {
return Disposable.use(this._handler.lock(vmDir), () => cleanVm.apply(this, arguments))
} else {
return cleanVm.apply(this, arguments)
}
},
isValidXva,
})
RemoteAdapter.prototype.cleanVm = function (vmDir) {
return Disposable.use(this._handler.lock(vmDir), () => cleanVm.apply(this, arguments))
}
exports.RemoteAdapter = RemoteAdapter

View File

@@ -1,5 +1,5 @@
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js')
const { PATH_DB_DUMP } = require('./_PoolMetadataBackup.js')
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter')
const { PATH_DB_DUMP } = require('./_PoolMetadataBackup')
exports.RestoreMetadataBackup = class RestoreMetadataBackup {
constructor({ backupId, handler, xapi }) {

View File

@@ -1,12 +1,11 @@
const CancelToken = require('promise-toolbox/CancelToken.js')
const Zone = require('node-zone')
const { SyncThenable } = require('./_syncThenable')
const logAfterEnd = () => {
throw new Error('task has already ended')
}
const noop = Function.prototype
// Create a serializable object from an error.
//
// Otherwise some fields might be non-enumerable and missing from logs.
@@ -20,132 +19,163 @@ const serializeError = error =>
stack: error.stack,
}
: error
exports.serializeError = serializeError
const $$task = Symbol('@xen-orchestra/backups/Task')
class Task {
static get cancelToken() {
const task = Zone.current.data[$$task]
return task !== undefined ? task.#cancelToken : CancelToken.none
class TaskLogger {
constructor(logFn, parentId) {
this._log = logFn
this._parentId = parentId
this._taskId = undefined
}
static run(opts, fn) {
return new this(opts).run(fn, true)
get taskId() {
const taskId = this._taskId
if (taskId === undefined) {
throw new Error('start the task first')
}
return taskId
}
static wrapFn(opts, fn) {
// create a subtask
fork() {
return new TaskLogger(this._log, this.taskId)
}
info(message, data) {
return this._log({
data,
event: 'info',
message,
taskId: this.taskId,
timestamp: Date.now(),
})
}
run(message, data, fn) {
if (arguments.length === 2) {
fn = data
data = undefined
}
return SyncThenable.tryUnwrap(
SyncThenable.fromFunction(() => {
if (this._taskId !== undefined) {
throw new Error('task has already started')
}
this._taskId = Math.random().toString(36).slice(2)
return this._log({
data,
event: 'start',
message,
parentId: this._parentId,
taskId: this.taskId,
timestamp: Date.now(),
})
})
.then(fn)
.then(
result => {
const log = this._log
this._log = logAfterEnd
return SyncThenable.resolve(
log({
event: 'end',
result,
status: 'success',
taskId: this.taskId,
timestamp: Date.now(),
})
).then(() => result)
},
error => {
const log = this._log
this._log = logAfterEnd
return SyncThenable.resolve(
log({
event: 'end',
result: serializeError(error),
status: 'failure',
taskId: this.taskId,
timestamp: Date.now(),
})
).then(() => {
throw error
})
}
)
)
}
warning(message, data) {
return this._log({
data,
event: 'warning',
message,
taskId: this.taskId,
timestamp: Date.now(),
})
}
wrapFn(fn, message, data) {
const logger = this
return function () {
const evaluate = v => (typeof v === 'function' ? v.apply(this, arguments) : v)
return logger.run(evaluate(message), evaluate(data), () => fn.apply(this, arguments))
}
}
}
const $$task = Symbol('current task logger')
const getCurrent = () => Zone.current.data[$$task]
const Task = {
info(message, data) {
const task = getCurrent()
if (task !== undefined) {
return task.info(message, data)
}
},
run({ name, data, onLog }, fn) {
let parentId
if (onLog === undefined) {
const parent = getCurrent()
if (parent === undefined) {
return fn()
}
onLog = parent._log
parentId = parent.taskId
}
const task = new TaskLogger(onLog, parentId)
const zone = Zone.current.fork('task')
zone.data[$$task] = task
return task.run(name, data, zone.wrap(fn))
},
warning(message, data) {
const task = getCurrent()
if (task !== undefined) {
return task.warning(message, data)
}
},
wrapFn(opts, fn) {
// compatibility with @decorateWith
if (typeof fn !== 'function') {
;[fn, opts] = [opts, fn]
}
const { name, data, onLog } = opts
return function () {
return Task.run(typeof opts === 'function' ? opts.apply(this, arguments) : opts, () => fn.apply(this, arguments))
const evaluate = v => (typeof v === 'function' ? v.apply(this, arguments) : v)
return Task.run({ name: evaluate(name), data: evaluate(data), onLog }, () => fn.apply(this, arguments))
}
}
#cancelToken
#id = Math.random().toString(36).slice(2)
#onLog
#zone
constructor({ name, data, onLog }) {
let parentCancelToken, parentId
if (onLog === undefined) {
const parent = Zone.current.data[$$task]
if (parent === undefined) {
onLog = noop
} else {
onLog = log => parent.#onLog(log)
parentCancelToken = parent.#cancelToken
parentId = parent.#id
}
}
const zone = Zone.current.fork('@xen-orchestra/backups/Task')
zone.data[$$task] = this
this.#zone = zone
const { cancel, token } = CancelToken.source(parentCancelToken && [parentCancelToken])
this.#cancelToken = token
this.cancel = cancel
this.#onLog = onLog
this.#log('start', {
data,
message: name,
parentId,
})
}
failure(error) {
this.#end('failure', serializeError(error))
}
info(message, data) {
this.#log('info', { data, message })
}
/**
* Run a function in the context of this task
*
* In case of error, the task will be failed.
*
* @typedef Result
* @param {() => Result)} fn
* @param {boolean} last - Whether the task should succeed if there is no error
* @returns Result
*/
run(fn, last = false) {
return this.#zone.run(() => {
try {
const result = fn()
let then
if (result != null && typeof (then = result.then) === 'function') {
then.call(result, last && (value => this.success(value)), error => this.failure(error))
} else if (last) {
this.success(result)
}
return result
} catch (error) {
this.failure(error)
throw error
}
})
}
success(value) {
this.#end('success', value)
}
warning(message, data) {
this.#log('warning', { data, message })
}
wrapFn(fn, last) {
const task = this
return function () {
return task.run(() => fn.apply(this, arguments), last)
}
}
#end(status, result) {
this.#log('end', { result, status })
this.#onLog = logAfterEnd
}
#log(event, props) {
this.#onLog({
...props,
event,
taskId: this.#id,
timestamp: Date.now(),
})
}
},
}
exports.Task = Task
for (const method of ['info', 'warning']) {
Task[method] = (...args) => Zone.current.data[$$task]?.[method](...args)
}

View File

@@ -1,17 +1,32 @@
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const ignoreErrors = require('promise-toolbox/ignoreErrors.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { formatDateTime } = require('@xen-orchestra/xapi')
const { formatFilenameDate } = require('../_filenameDate.js')
const { getOldEntries } = require('../_getOldEntries.js')
const { importDeltaVm, TAG_COPY_SRC } = require('../_deltaVm.js')
const { Task } = require('../Task.js')
const { formatFilenameDate } = require('./_filenameDate')
const { getOldEntries } = require('./_getOldEntries')
const { importDeltaVm, TAG_COPY_SRC } = require('./_deltaVm')
const { listReplicatedVms } = require('./_listReplicatedVms')
const { Task } = require('./Task')
const { AbstractDeltaWriter } = require('./_AbstractDeltaWriter.js')
const { MixinReplicationWriter } = require('./_MixinReplicationWriter.js')
const { listReplicatedVms } = require('./_listReplicatedVms.js')
exports.ContinuousReplicationWriter = class ContinuousReplicationWriter {
constructor(backup, sr, settings) {
this._backup = backup
this._settings = settings
this._sr = sr
this.transfer = Task.wrapFn(
{
name: 'export',
data: ({ deltaExport }) => ({
id: sr.uuid,
isFull: Object.values(deltaExport.vdis).some(vdi => vdi.other_config['xo:base_delta'] === undefined),
type: 'SR',
}),
},
this.transfer
)
}
exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinReplicationWriter(AbstractDeltaWriter) {
async checkBaseVdis(baseUuidToSrcVdi, baseVm) {
const sr = this._sr
const replicatedVm = listReplicatedVms(sr.$xapi, this._backup.job.id, sr.uuid, this._backup.vm.uuid).find(
@@ -36,23 +51,7 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
}
}
prepare({ isFull }) {
// create the task related to this export and ensure all methods are called in this context
const task = new Task({
name: 'export',
data: {
id: this._sr.uuid,
isFull,
type: 'SR',
},
})
this.transfer = task.wrapFn(this.transfer)
this.cleanup = task.wrapFn(this.cleanup, true)
return task.run(() => this._prepare())
}
async _prepare() {
async prepare() {
const settings = this._settings
const { uuid: srUuid, $xapi: xapi } = this._sr
const { scheduleId, vm } = this._backup
@@ -64,12 +63,8 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
if (settings.deleteFirst) {
await this._deleteOldEntries()
}
}
async cleanup() {
if (!this._settings.deleteFirst) {
await this._deleteOldEntries()
} else {
this.cleanup = this._deleteOldEntries
}
}
@@ -77,7 +72,7 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
return asyncMapSettled(this._oldEntries, vm => vm.$destroy())
}
async _transfer({ timestamp, deltaExport, sizeContainers }) {
async transfer({ timestamp, deltaExport, sizeContainers }) {
const sr = this._sr
const { job, scheduleId, vm } = this._backup
@@ -106,11 +101,9 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
targetVm.ha_restart_priority !== '' &&
Promise.all([targetVm.set_ha_restart_priority(''), targetVm.add_tags('HA disabled')]),
targetVm.set_name_label(`${vm.name_label} - ${job.name} - (${formatFilenameDate(timestamp)})`),
asyncMap(['start', 'start_on'], op =>
targetVm.update_blocked_operations(
op,
'Start operation for this vm is blocked, clone it if you want to use it.'
)
targetVm.update_blocked_operations(
'start',
'Start operation for this vm is blocked, clone it if you want to use it.'
),
targetVm.update_other_config({
'xo:backup:sr': srUuid,

View File

@@ -1,25 +1,42 @@
const assert = require('assert')
const map = require('lodash/map.js')
const mapValues = require('lodash/mapValues.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors.js')
const map = require('lodash/map')
const mapValues = require('lodash/mapValues')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { asyncMap } = require('@xen-orchestra/async-map')
const { chainVhd, checkVhdChain, VhdFile } = require('vhd-lib')
const { chainVhd, checkVhdChain, default: Vhd } = require('vhd-lib')
const { createLogger } = require('@xen-orchestra/log')
const { dirname } = require('path')
const { formatFilenameDate } = require('../_filenameDate.js')
const { getOldEntries } = require('../_getOldEntries.js')
const { getVmBackupDir } = require('../_getVmBackupDir.js')
const { Task } = require('../Task.js')
const { MixinBackupWriter } = require('./_MixinBackupWriter.js')
const { AbstractDeltaWriter } = require('./_AbstractDeltaWriter.js')
const { checkVhd } = require('./_checkVhd.js')
const { packUuid } = require('./_packUuid.js')
const { checkVhd } = require('./_checkVhd')
const { formatFilenameDate } = require('./_filenameDate')
const { getOldEntries } = require('./_getOldEntries')
const { getVmBackupDir } = require('./_getVmBackupDir')
const { packUuid } = require('./_packUuid')
const { Task } = require('./Task')
const { warn } = createLogger('xo:backups:DeltaBackupWriter')
exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
exports.DeltaBackupWriter = class DeltaBackupWriter {
constructor(backup, remoteId, settings) {
this._adapter = backup.remoteAdapters[remoteId]
this._backup = backup
this._settings = settings
this.transfer = Task.wrapFn(
{
name: 'export',
data: ({ deltaExport }) => ({
id: remoteId,
isFull: Object.values(deltaExport.vdis).some(vdi => vdi.other_config['xo:base_delta'] === undefined),
type: 'remote',
}),
},
this.transfer
)
this[settings.deleteFirst ? 'prepare' : 'cleanup'] = this._deleteOldEntries
}
async checkBaseVdis(baseUuidToSrcVdi) {
const { handler } = this._adapter
const backup = this._backup
@@ -38,7 +55,7 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
try {
await checkVhdChain(handler, path)
const vhd = new VhdFile(handler, path)
const vhd = new Vhd(handler, path)
await vhd.readHeaderAndFooter()
found = found || vhd.footer.uuid.equals(packUuid(baseUuid))
} catch (error) {
@@ -55,28 +72,7 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
})
}
async beforeBackup() {
await super.beforeBackup()
return this._cleanVm({ merge: true })
}
prepare({ isFull }) {
// create the task related to this export and ensure all methods are called in this context
const task = new Task({
name: 'export',
data: {
id: this._remoteId,
isFull,
type: 'remote',
},
})
this.transfer = task.wrapFn(this.transfer)
this.cleanup = task.wrapFn(this.cleanup, true)
return task.run(() => this._prepare())
}
async _prepare() {
async prepare() {
const adapter = this._adapter
const settings = this._settings
const { scheduleId, vm } = this._backup
@@ -103,26 +99,28 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
if (settings.deleteFirst) {
await this._deleteOldEntries()
}
}
async cleanup() {
if (!this._settings.deleteFirst) {
await this._deleteOldEntries()
} else {
this.cleanup = this._deleteOldEntries
}
}
async _deleteOldEntries() {
const adapter = this._adapter
const oldEntries = this._oldEntries
return Task.run({ name: 'merge' }, async () => {
const adapter = this._adapter
const oldEntries = this._oldEntries
// delete sequentially from newest to oldest to avoid unnecessary merges
for (let i = oldEntries.length; i-- > 0; ) {
await adapter.deleteDeltaVmBackups([oldEntries[i]])
}
let size = 0
// delete sequentially from newest to oldest to avoid unnecessary merges
for (let i = oldEntries.length; i-- > 0; ) {
size += await adapter.deleteDeltaVmBackups([oldEntries[i]])
}
return {
size,
}
})
}
async _transfer({ timestamp, deltaExport, sizeContainers }) {
async transfer({ timestamp, deltaExport, sizeContainers }) {
const adapter = this._adapter
const backup = this._backup
@@ -200,7 +198,7 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
}
// set the correct UUID in the VHD
const vhd = new VhdFile(handler, path)
const vhd = new Vhd(handler, path)
await vhd.readHeaderAndFooter()
vhd.footer.uuid = packUuid(vdi.uuid)
await vhd.readBlockAllocationTable() // required by writeFooter()

View File

@@ -1,24 +1,23 @@
const ignoreErrors = require('promise-toolbox/ignoreErrors.js')
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { asyncMapSettled } = require('@xen-orchestra/async-map')
const { formatDateTime } = require('@xen-orchestra/xapi')
const { formatFilenameDate } = require('../_filenameDate.js')
const { getOldEntries } = require('../_getOldEntries.js')
const { Task } = require('../Task.js')
const { formatFilenameDate } = require('./_filenameDate')
const { getOldEntries } = require('./_getOldEntries')
const { listReplicatedVms } = require('./_listReplicatedVms')
const { Task } = require('./Task')
const { AbstractFullWriter } = require('./_AbstractFullWriter.js')
const { MixinReplicationWriter } = require('./_MixinReplicationWriter.js')
const { listReplicatedVms } = require('./_listReplicatedVms.js')
exports.FullReplicationWriter = class FullReplicationWriter extends MixinReplicationWriter(AbstractFullWriter) {
constructor(props) {
super(props)
exports.DisasterRecoveryWriter = class DisasterRecoveryWriter {
constructor(backup, sr, settings) {
this._backup = backup
this._settings = settings
this._sr = sr
this.run = Task.wrapFn(
{
name: 'export',
data: {
id: props.sr.uuid,
id: sr.uuid,
type: 'SR',
// necessary?
@@ -29,7 +28,7 @@ exports.FullReplicationWriter = class FullReplicationWriter extends MixinReplica
)
}
async _run({ timestamp, sizeContainer, stream }) {
async run({ timestamp, sizeContainer, stream }) {
const sr = this._sr
const settings = this._settings
const { job, scheduleId, vm } = this._backup
@@ -64,11 +63,9 @@ exports.FullReplicationWriter = class FullReplicationWriter extends MixinReplica
const targetVm = await xapi.getRecord('VM', targetVmRef)
await Promise.all([
asyncMap(['start', 'start_on'], op =>
targetVm.update_blocked_operations(
op,
'Start operation for this vm is blocked, clone it if you want to use it.'
)
targetVm.update_blocked_operations(
'start',
'Start operation for this vm is blocked, clone it if you want to use it.'
),
targetVm.update_other_config({
'xo:backup:sr': srUuid,

View File

@@ -1,20 +1,20 @@
const { formatFilenameDate } = require('../_filenameDate.js')
const { getOldEntries } = require('../_getOldEntries.js')
const { getVmBackupDir } = require('../_getVmBackupDir.js')
const { Task } = require('../Task.js')
const { formatFilenameDate } = require('./_filenameDate')
const { getOldEntries } = require('./_getOldEntries')
const { getVmBackupDir } = require('./_getVmBackupDir')
const { isValidXva } = require('./isValidXva')
const { Task } = require('./Task')
const { MixinBackupWriter } = require('./_MixinBackupWriter.js')
const { AbstractFullWriter } = require('./_AbstractFullWriter.js')
exports.FullBackupWriter = class FullBackupWriter extends MixinBackupWriter(AbstractFullWriter) {
constructor(props) {
super(props)
exports.FullBackupWriter = class FullBackupWriter {
constructor(backup, remoteId, settings) {
this._backup = backup
this._remoteId = remoteId
this._settings = settings
this.run = Task.wrapFn(
{
name: 'export',
data: {
id: props.remoteId,
id: remoteId,
type: 'remote',
// necessary?
@@ -25,13 +25,14 @@ exports.FullBackupWriter = class FullBackupWriter extends MixinBackupWriter(Abst
)
}
async _run({ timestamp, sizeContainer, stream }) {
async run({ timestamp, sizeContainer, stream }) {
const backup = this._backup
const remoteId = this._remoteId
const settings = this._settings
const { job, scheduleId, vm } = backup
const adapter = this._adapter
const adapter = backup.remoteAdapters[remoteId]
const handler = adapter.handler
const backupDir = getVmBackupDir(vm.uuid)
@@ -67,7 +68,11 @@ exports.FullBackupWriter = class FullBackupWriter extends MixinBackupWriter(Abst
await Task.run({ name: 'transfer' }, async () => {
await adapter.outputStream(dataFilename, stream, {
validator: tmpPath => adapter.isValidXva(tmpPath),
validator: tmpPath => {
if (handler._getFilePath !== undefined) {
return isValidXva(handler._getFilePath('/' + tmpPath))
}
},
})
return { size: sizeContainer.size }
})

View File

@@ -1,9 +1,9 @@
const { asyncMap } = require('@xen-orchestra/async-map')
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js')
const { forkStreamUnpipe } = require('./_forkStreamUnpipe.js')
const { formatFilenameDate } = require('./_filenameDate.js')
const { Task } = require('./Task.js')
const { DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter')
const { forkStreamUnpipe } = require('./_forkStreamUnpipe')
const { formatFilenameDate } = require('./_filenameDate')
const { Task } = require('./Task')
const PATH_DB_DUMP = '/pool/xmldbdump'
exports.PATH_DB_DUMP = PATH_DB_DUMP

View File

@@ -1,32 +1,23 @@
const assert = require('assert')
const findLast = require('lodash/findLast.js')
const groupBy = require('lodash/groupBy.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors.js')
const keyBy = require('lodash/keyBy.js')
const mapValues = require('lodash/mapValues.js')
const findLast = require('lodash/findLast')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const keyBy = require('lodash/keyBy')
const mapValues = require('lodash/mapValues')
const { asyncMap } = require('@xen-orchestra/async-map')
const { createLogger } = require('@xen-orchestra/log')
const { defer } = require('golike-defer')
const { formatDateTime } = require('@xen-orchestra/xapi')
const { DeltaBackupWriter } = require('./writers/DeltaBackupWriter.js')
const { DeltaReplicationWriter } = require('./writers/DeltaReplicationWriter.js')
const { exportDeltaVm } = require('./_deltaVm.js')
const { forkStreamUnpipe } = require('./_forkStreamUnpipe.js')
const { FullBackupWriter } = require('./writers/FullBackupWriter.js')
const { FullReplicationWriter } = require('./writers/FullReplicationWriter.js')
const { getOldEntries } = require('./_getOldEntries.js')
const { Task } = require('./Task.js')
const { watchStreamSize } = require('./_watchStreamSize.js')
const { ContinuousReplicationWriter } = require('./_ContinuousReplicationWriter')
const { DeltaBackupWriter } = require('./_DeltaBackupWriter')
const { DisasterRecoveryWriter } = require('./_DisasterRecoveryWriter')
const { exportDeltaVm } = require('./_deltaVm')
const { forkStreamUnpipe } = require('./_forkStreamUnpipe')
const { FullBackupWriter } = require('./_FullBackupWriter')
const { getOldEntries } = require('./_getOldEntries')
const { Task } = require('./Task')
const { watchStreamSize } = require('./_watchStreamSize')
const { debug, warn } = createLogger('xo:backups:VmBackup')
const asyncEach = async (iterable, fn, thisArg = iterable) => {
for (const item of iterable) {
await fn.call(thisArg, item)
}
}
const forkDeltaExport = deltaExport =>
Object.create(deltaExport, {
streams: {
@@ -71,12 +62,12 @@ exports.VmBackup = class VmBackup {
// Create writers
{
const writers = new Set()
const writers = []
this._writers = writers
const [BackupWriter, ReplicationWriter] = this._isDelta
? [DeltaBackupWriter, DeltaReplicationWriter]
: [FullBackupWriter, FullReplicationWriter]
? [DeltaBackupWriter, ContinuousReplicationWriter]
: [FullBackupWriter, DisasterRecoveryWriter]
const allSettings = job.settings
@@ -86,7 +77,7 @@ exports.VmBackup = class VmBackup {
...allSettings[remoteId],
}
if (targetSettings.exportRetention !== 0) {
writers.add(new BackupWriter({ backup: this, remoteId, settings: targetSettings }))
writers.push(new BackupWriter(this, remoteId, targetSettings))
}
})
srs.forEach(sr => {
@@ -95,43 +86,12 @@ exports.VmBackup = class VmBackup {
...allSettings[sr.uuid],
}
if (targetSettings.copyRetention !== 0) {
writers.add(new ReplicationWriter({ backup: this, sr, settings: targetSettings }))
writers.push(new ReplicationWriter(this, sr, targetSettings))
}
})
}
}
// calls fn for each function, warns of any errors, and throws only if there are no writers left
async _callWriters(fn, warnMessage, parallel = true) {
const writers = this._writers
const n = writers.size
if (n === 0) {
return
}
if (n === 1) {
const [writer] = writers
try {
await fn(writer)
} catch (error) {
writers.delete(writer)
throw error
}
return
}
await (parallel ? asyncMap : asyncEach)(writers, async function (writer) {
try {
await fn(writer)
} catch (error) {
this.delete(writer)
warn(warnMessage, { error, writer: writer.constructor.name })
}
})
if (writers.size === 0) {
throw new Error('all targets have failed, step: ' + warnMessage)
}
}
// ensure the VM itself does not have any backup metadata which would be
// copied on manual snapshots and interfere with the backup jobs
async _cleanMetadata() {
@@ -154,8 +114,7 @@ exports.VmBackup = class VmBackup {
const settings = this._settings
const doSnapshot =
this._isDelta || (!settings.offlineBackup && vm.power_state === 'Running') || settings.snapshotRetention !== 0
const doSnapshot = this._isDelta || vm.power_state === 'Running' || settings.snapshotRetention !== 0
if (doSnapshot) {
await Task.run({ name: 'snapshot' }, async () => {
if (!settings.bypassVdiChainsCheck) {
@@ -187,28 +146,31 @@ exports.VmBackup = class VmBackup {
async _copyDelta() {
const { exportedVm } = this
const baseVm = this._baseVm
const fullVdisRequired = this._fullVdisRequired
const isFull = fullVdisRequired === undefined || fullVdisRequired.size !== 0
await this._callWriters(writer => writer.prepare({ isFull }), 'writer.prepare()')
await asyncMap(this._writers, writer => writer.prepare && writer.prepare())
const deltaExport = await exportDeltaVm(exportedVm, baseVm, {
fullVdisRequired,
fullVdisRequired: this._fullVdisRequired,
})
const sizeContainers = mapValues(deltaExport.streams, stream => watchStreamSize(stream))
const sizeContainers = mapValues(deltaExport.streams, watchStreamSize)
const timestamp = Date.now()
await this._callWriters(
writer =>
writer.transfer({
await asyncMap(this._writers, async writer => {
try {
await writer.transfer({
deltaExport: forkDeltaExport(deltaExport),
sizeContainers,
timestamp,
}),
'writer.transfer()'
)
})
} catch (error) {
warn('copy failure', {
error,
target: writer.target,
vm: this.vm,
})
}
})
this._baseVm = exportedVm
@@ -233,7 +195,7 @@ exports.VmBackup = class VmBackup {
size,
})
await this._callWriters(writer => writer.cleanup(), 'writer.cleanup()')
await asyncMap(this._writers, writer => writer.cleanup && writer.cleanup())
}
async _copyFull() {
@@ -246,15 +208,21 @@ exports.VmBackup = class VmBackup {
const timestamp = Date.now()
await this._callWriters(
writer =>
writer.run({
await asyncMap(this._writers, async writer => {
try {
await writer.run({
sizeContainer,
stream: forkStreamUnpipe(stream),
timestamp,
}),
'writer.run()'
)
})
} catch (error) {
warn('copy failure', {
error,
target: writer.target,
vm: this.vm,
})
}
})
const { size } = sizeContainer
const end = Date.now()
@@ -285,28 +253,17 @@ exports.VmBackup = class VmBackup {
}
async _removeUnusedSnapshots() {
const jobSettings = this.job.settings
const baseVmRef = this._baseVm?.$ref
const { config } = this
const baseSettings = {
...config.defaultSettings,
...config.metadata.defaultSettings,
...jobSettings[''],
}
// TODO: handle all schedules (no longer existing schedules default to 0 retention)
const snapshotsPerSchedule = groupBy(this._jobSnapshots, _ => _.other_config['xo:backup:schedule'])
const { scheduleId } = this
const scheduleSnapshots = this._jobSnapshots.filter(_ => _.other_config['xo:backup:schedule'] === scheduleId)
const baseVmRef = this._baseVm?.$ref
const xapi = this._xapi
await asyncMap(Object.entries(snapshotsPerSchedule), ([scheduleId, snapshots]) => {
const settings = {
...baseSettings,
...jobSettings[scheduleId],
...jobSettings[this.vm.uuid],
await asyncMap(getOldEntries(this._settings.snapshotRetention, scheduleSnapshots), ({ $ref }) => {
if ($ref !== baseVmRef) {
return xapi.VM_destroy($ref)
}
return asyncMap(getOldEntries(settings.snapshotRetention, snapshots), ({ $ref }) => {
if ($ref !== baseVmRef) {
return xapi.VM_destroy($ref)
}
})
})
}
@@ -315,14 +272,12 @@ exports.VmBackup = class VmBackup {
let baseVm = findLast(this._jobSnapshots, _ => 'xo:backup:exported' in _.other_config)
if (baseVm === undefined) {
debug('no base VM found')
return
}
const fullInterval = this._settings.fullInterval
const deltaChainLength = +(baseVm.other_config['xo:backup:deltaChainLength'] ?? 0) + 1
if (!(fullInterval === 0 || fullInterval > deltaChainLength)) {
debug('not using base VM becaust fullInterval reached')
return
}
@@ -337,32 +292,22 @@ exports.VmBackup = class VmBackup {
const srcVdi = srcVdis[snapshotOf]
if (srcVdi !== undefined) {
baseUuidToSrcVdi.set(await xapi.getField('VDI', baseRef, 'uuid'), srcVdi)
} else {
debug('no base VDI found', {
vdi: srcVdi.uuid,
})
}
})
const presentBaseVdis = new Map(baseUuidToSrcVdi)
await this._callWriters(
writer => presentBaseVdis.size !== 0 && writer.checkBaseVdis(presentBaseVdis, baseVm),
'writer.checkBaseVdis()',
false
)
const writers = this._writers
for (let i = 0, n = writers.length; presentBaseVdis.size !== 0 && i < n; ++i) {
await writers[i].checkBaseVdis(presentBaseVdis, baseVm)
}
if (presentBaseVdis.size === 0) {
return
}
const fullVdisRequired = new Set()
baseUuidToSrcVdi.forEach((srcVdi, baseUuid) => {
if (presentBaseVdis.has(baseUuid)) {
debug('found base VDI', {
base: baseUuid,
vdi: srcVdi.uuid,
})
} else {
debug('missing base VDI', {
base: baseUuid,
vdi: srcVdi.uuid,
})
if (!presentBaseVdis.has(baseUuid)) {
fullVdisRequired.add(srcVdi.uuid)
}
})
@@ -371,19 +316,7 @@ exports.VmBackup = class VmBackup {
this._fullVdisRequired = fullVdisRequired
}
run = defer(this.run)
async run($defer) {
const settings = this._settings
assert(
!settings.offlineBackup || settings.snapshotRetention === 0,
'offlineBackup is not compatible with snapshotRetention'
)
await this._callWriters(async writer => {
await writer.beforeBackup()
$defer(() => writer.afterBackup())
}, 'writer.beforeBackup()')
async run() {
await this._fetchJobSnapshots()
if (this._isDelta) {
@@ -393,7 +326,7 @@ exports.VmBackup = class VmBackup {
await this._cleanMetadata()
await this._removeUnusedSnapshots()
const { vm } = this
const { _settings: settings, vm } = this
const isRunning = vm.power_state === 'Running'
const startAfter = isRunning && (settings.offlineBackup ? 'backup' : settings.offlineSnapshot && 'snapshot')
if (startAfter) {
@@ -406,7 +339,7 @@ exports.VmBackup = class VmBackup {
ignoreErrors.call(vm.$callAsync('start', false, false))
}
if (this._writers.size !== 0) {
if (this._writers.length !== 0) {
await (this._isDelta ? this._copyDelta() : this._copyFull())
}
} finally {

View File

@@ -1,8 +1,8 @@
const { asyncMap } = require('@xen-orchestra/async-map')
const { DIR_XO_CONFIG_BACKUPS } = require('./RemoteAdapter.js')
const { formatFilenameDate } = require('./_filenameDate.js')
const { Task } = require('./Task.js')
const { DIR_XO_CONFIG_BACKUPS } = require('./RemoteAdapter')
const { formatFilenameDate } = require('./_filenameDate')
const { Task } = require('./Task')
exports.XoMetadataBackup = class XoMetadataBackup {
constructor({ config, job, remoteAdapters, schedule, settings }) {

View File

@@ -1,19 +1,15 @@
require('@xen-orchestra/log/configure.js').catchGlobalErrors(
require('@xen-orchestra/log').createLogger('xo:backups:worker')
)
const Disposable = require('promise-toolbox/Disposable.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors.js')
const Disposable = require('promise-toolbox/Disposable')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { compose } = require('@vates/compose')
const { createDebounceResource } = require('@vates/disposable/debounceResource.js')
const { deduped } = require('@vates/disposable/deduped.js')
const { createDebounceResource } = require('@vates/disposable/debounceResource')
const { deduped } = require('@vates/disposable/deduped')
const { getHandler } = require('@xen-orchestra/fs')
const { parseDuration } = require('@vates/parse-duration')
const { Xapi } = require('@xen-orchestra/xapi')
const { Backup } = require('./Backup.js')
const { RemoteAdapter } = require('./RemoteAdapter.js')
const { Task } = require('./Task.js')
const { Backup } = require('./Backup')
const { RemoteAdapter } = require('./RemoteAdapter')
const { Task } = require('./Task')
class BackupWorker {
#config

View File

@@ -1,5 +1,5 @@
const cancelable = require('promise-toolbox/cancelable.js')
const CancelToken = require('promise-toolbox/CancelToken.js')
const cancelable = require('promise-toolbox/cancelable')
const CancelToken = require('promise-toolbox/CancelToken')
// Similar to `Promise.all` + `map` but pass a cancel token to the callback
//

View File

@@ -1,4 +1,4 @@
const Vhd = require('vhd-lib').VhdFile
const Vhd = require('vhd-lib').default
exports.checkVhd = async function checkVhd(handler, path) {
await new Vhd(handler, path).readHeaderAndFooter()

View File

@@ -1,19 +1,17 @@
const assert = require('assert')
const sum = require('lodash/sum')
const limitConcurrency = require('limit-concurrency-decorator').default
const { asyncMap } = require('@xen-orchestra/async-map')
const { VhdFile, mergeVhd } = require('vhd-lib')
const { default: Vhd, mergeVhd } = require('vhd-lib')
const { dirname, resolve } = require('path')
const { DISK_TYPE_DIFFERENCING } = require('vhd-lib/dist/_constants.js')
const { isMetadataFile, isVhdFile, isXvaFile, isXvaSumFile } = require('./_backupType.js')
const { limitConcurrency } = require('limit-concurrency-decorator')
const { Task } = require('./Task.js')
const { DISK_TYPE_DIFFERENCING } = require('vhd-lib/dist/_constants')
const { isMetadataFile, isVhdFile, isXvaFile, isXvaSumFile } = require('./_backupType')
const { isValidXva } = require('./isValidXva')
// chain is an array of VHDs from child to parent
//
// the whole chain will be merged into parent, parent will be renamed to child
// and all the others will deleted
async function mergeVhdChain(chain, { handler, onLog, remove, merge }) {
const mergeVhdChain = limitConcurrency(1)(async function mergeVhdChain(chain, { handler, onLog, remove, merge }) {
assert(chain.length >= 2)
let child = chain[0]
@@ -37,8 +35,6 @@ async function mergeVhdChain(chain, { handler, onLog, remove, merge }) {
child = children[0]
}
onLog(`merging ${child} into ${parent}`)
let done, total
const handle = setInterval(() => {
if (done !== undefined) {
@@ -46,7 +42,7 @@ async function mergeVhdChain(chain, { handler, onLog, remove, merge }) {
}
}, 10e3)
const mergedSize = await mergeVhd(
await mergeVhd(
handler,
parent,
handler,
@@ -63,103 +59,57 @@ async function mergeVhdChain(chain, { handler, onLog, remove, merge }) {
)
clearInterval(handle)
await Promise.all([
handler.rename(parent, child),
asyncMap(children.slice(0, -1), child => {
onLog(`the VHD ${child} is unused`)
if (remove) {
onLog(`deleting unused VHD ${child}`)
return handler.unlink(child)
}
}),
])
return mergedSize
}
}
await Promise.all([
remove && handler.rename(parent, child),
asyncMap(children.slice(0, -1), child => {
onLog(`the VHD ${child} is unused`)
return remove && handler.unlink(child)
}),
])
})
const noop = Function.prototype
const INTERRUPTED_VHDS_REG = /^(?:(.+)\/)?\.(.+)\.merge.json$/
const listVhds = async (handler, vmDir) => {
const vhds = []
const interruptedVhds = new Set()
await asyncMap(
await handler.list(`${vmDir}/vdis`, {
ignoreMissing: true,
prependDir: true,
}),
async jobDir =>
asyncMap(
await handler.list(jobDir, {
prependDir: true,
}),
async vdiDir => {
const list = await handler.list(vdiDir, {
filter: file => isVhdFile(file) || INTERRUPTED_VHDS_REG.test(file),
prependDir: true,
})
list.forEach(file => {
const res = INTERRUPTED_VHDS_REG.exec(file)
if (res === null) {
vhds.push(file)
} else {
const [, dir, file] = res
interruptedVhds.add(`${dir}/${file}`)
}
})
}
)
)
return { vhds, interruptedVhds }
}
const defaultMergeLimiter = limitConcurrency(1)
exports.cleanVm = async function cleanVm(
vmDir,
{ fixMetadata, remove, merge, mergeLimiter = defaultMergeLimiter, onLog = noop }
) {
const limitedMergeVhdChain = mergeLimiter(mergeVhdChain)
exports.cleanVm = async function cleanVm(vmDir, { remove, merge, onLog = noop }) {
const handler = this._handler
const vhds = new Set()
const vhdParents = { __proto__: null }
const vhdChildren = { __proto__: null }
const vhdsList = await listVhds(handler, vmDir)
// remove broken VHDs
await asyncMap(vhdsList.vhds, async path => {
try {
const vhd = new VhdFile(handler, path)
await vhd.readHeaderAndFooter(!vhdsList.interruptedVhds.has(path))
vhds.add(path)
if (vhd.footer.diskType === DISK_TYPE_DIFFERENCING) {
const parent = resolve('/', dirname(path), vhd.header.parentUnicodeName)
vhdParents[path] = parent
if (parent in vhdChildren) {
const error = new Error('this script does not support multiple VHD children')
error.parent = parent
error.child1 = vhdChildren[parent]
error.child2 = path
throw error // should we throw?
await asyncMap(
await handler.list(`${vmDir}/vdis`, {
filter: isVhdFile,
prependDir: true,
}),
async path => {
try {
const vhd = new Vhd(handler, path)
await vhd.readHeaderAndFooter()
vhds.add(path)
if (vhd.footer.diskType === DISK_TYPE_DIFFERENCING) {
const parent = resolve(dirname(path), vhd.header.parentUnicodeName)
vhdParents[path] = parent
if (parent in vhdChildren) {
const error = new Error('this script does not support multiple VHD children')
error.parent = parent
error.child1 = vhdChildren[parent]
error.child2 = path
throw error // should we throw?
}
vhdChildren[parent] = path
}
} catch (error) {
onLog(`error while checking the VHD with path ${path}`)
if (error?.code === 'ERR_ASSERTION' && remove) {
await handler.unlink(path)
}
vhdChildren[parent] = path
}
} catch (error) {
onLog(`error while checking the VHD with path ${path}`, { error })
if (error?.code === 'ERR_ASSERTION' && remove) {
onLog(`deleting broken ${path}`)
await handler.unlink(path)
}
}
})
)
// remove VHDs with missing ancestors
{
@@ -182,7 +132,6 @@ exports.cleanVm = async function cleanVm(
onLog(`the parent ${parent} of the VHD ${vhd} is missing`)
if (remove) {
onLog(`deleting orphan VHD ${vhd}`)
deletions.push(handler.unlink(vhd))
}
}
@@ -218,7 +167,7 @@ exports.cleanVm = async function cleanVm(
await asyncMap(xvas, async path => {
// check is not good enough to delete the file, the best we can do is report
// it
if (!(await this.isValidXva(path))) {
if (!(await isValidXva(path))) {
onLog(`the XVA with path ${path} is potentially broken`)
}
})
@@ -231,66 +180,38 @@ exports.cleanVm = async function cleanVm(
await asyncMap(jsons, async json => {
const metadata = JSON.parse(await handler.readFile(json))
const { mode } = metadata
let size
if (mode === 'full') {
const linkedXva = resolve('/', vmDir, metadata.xva)
const linkedXva = resolve(vmDir, metadata.xva)
if (xvas.has(linkedXva)) {
unusedXvas.delete(linkedXva)
size = await handler.getSize(linkedXva).catch(error => {
onLog(`failed to get size of ${json}`, { error })
})
} else {
onLog(`the XVA linked to the metadata ${json} is missing`)
if (remove) {
onLog(`deleting incomplete backup ${json}`)
await handler.unlink(json)
}
}
} else if (mode === 'delta') {
const linkedVhds = (() => {
const { vhds } = metadata
return Object.keys(vhds).map(key => resolve('/', vmDir, vhds[key]))
return Object.keys(vhds).map(key => resolve(vmDir, vhds[key]))
})()
// FIXME: find better approach by keeping as much of the backup as
// possible (existing disks) even if one disk is missing
if (linkedVhds.every(_ => vhds.has(_))) {
linkedVhds.forEach(_ => unusedVhds.delete(_))
size = await asyncMap(linkedVhds, vhd => handler.getSize(vhd)).then(sum, error => {
onLog(`failed to get size of ${json}`, { error })
})
} else {
onLog(`Some VHDs linked to the metadata ${json} are missing`)
if (remove) {
onLog(`deleting incomplete backup ${json}`)
await handler.unlink(json)
}
}
}
const metadataSize = metadata.size
if (size !== undefined && metadataSize !== size) {
onLog(`incorrect size in metadata: ${metadataSize ?? 'none'} instead of ${size}`)
// don't update if the the stored size is greater than found files,
// it can indicates a problem
if (fixMetadata && (metadataSize === undefined || metadataSize < size)) {
try {
metadata.size = size
await handler.writeFile(json, JSON.stringify(metadata), { flags: 'w' })
} catch (error) {
onLog(`failed to update size in backup metadata ${json}`, { error })
}
}
}
})
// TODO: parallelize by vm/job/vdi
const unusedVhdsDeletion = []
const toMerge = []
{
// VHD chains (as list from child to ancestor) to merge indexed by last
// ancestor
@@ -323,7 +244,6 @@ exports.cleanVm = async function cleanVm(
onLog(`the VHD ${vhd} is unused`)
if (remove) {
onLog(`deleting unused VHD ${vhd}`)
unusedVhdsDeletion.push(handler.unlink(vhd))
}
}
@@ -332,47 +252,26 @@ exports.cleanVm = async function cleanVm(
vhdChainsToMerge[vhd] = getUsedChildChainOrDelete(vhd)
})
// merge interrupted VHDs
vhdsList.interruptedVhds.forEach(parent => {
vhdChainsToMerge[parent] = [vhdChildren[parent], parent]
})
Object.values(vhdChainsToMerge).forEach(chain => {
Object.keys(vhdChainsToMerge).forEach(key => {
const chain = vhdChainsToMerge[key]
if (chain !== undefined) {
toMerge.push(chain)
unusedVhdsDeletion.push(mergeVhdChain(chain, { onLog, remove, merge }))
}
})
}
const doMerge = () => {
const promise = asyncMap(toMerge, async chain => limitedMergeVhdChain(chain, { handler, onLog, remove, merge }))
return merge ? promise.then(sizes => ({ size: sum(sizes) })) : promise
}
await Promise.all([
...unusedVhdsDeletion,
toMerge.length !== 0 && (merge ? Task.run({ name: 'merge' }, doMerge) : doMerge()),
unusedVhdsDeletion,
asyncMap(unusedXvas, path => {
onLog(`the XVA ${path} is unused`)
if (remove) {
onLog(`deleting unused XVA ${path}`)
return handler.unlink(path)
}
return remove && handler.unlink(path)
}),
asyncMap(xvaSums, path => {
// no need to handle checksums for XVAs deleted by the script, they will be handled by `unlink()`
if (!xvas.has(path.slice(0, -'.checksum'.length))) {
onLog(`the XVA checksum ${path} is unused`)
if (remove) {
onLog(`deleting unused XVA checksum ${path}`)
return handler.unlink(path)
}
return remove && handler.unlink(path)
}
}),
])
return {
// boolean whether some VHDs were merged (or should be merged)
merge: toMerge.length !== 0,
}
}

View File

@@ -1,14 +1,14 @@
const compareVersions = require('compare-versions')
const find = require('lodash/find.js')
const groupBy = require('lodash/groupBy.js')
const ignoreErrors = require('promise-toolbox/ignoreErrors.js')
const omit = require('lodash/omit.js')
const defer = require('golike-defer').default
const find = require('lodash/find')
const groupBy = require('lodash/groupBy')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const omit = require('lodash/omit')
const { asyncMap } = require('@xen-orchestra/async-map')
const { CancelToken } = require('promise-toolbox')
const { createVhdStreamWithLength } = require('vhd-lib')
const { defer } = require('golike-defer')
const { cancelableMap } = require('./_cancelableMap.js')
const { cancelableMap } = require('./_cancelableMap')
const TAG_BASE_DELTA = 'xo:base_delta'
exports.TAG_BASE_DELTA = TAG_BASE_DELTA
@@ -202,7 +202,6 @@ exports.importDeltaVm = defer(async function importDeltaVm(
blocked_operations: {
...vmRecord.blocked_operations,
start: 'Importing…',
start_on: 'Importing…',
},
ha_always_run: false,
is_a_template: false,
@@ -306,6 +305,9 @@ exports.importDeltaVm = defer(async function importDeltaVm(
}
}),
// Wait for VDI export tasks (if any) termination.
Promise.all(Object.values(streams).map(stream => stream.task)),
// Create VIFs.
asyncMap(Object.values(deltaVm.vifs), vif => {
let network = vif.$network$uuid && xapi.getObjectByUuid(vif.$network$uuid, undefined)

View File

@@ -1,4 +1,4 @@
const Disposable = require('promise-toolbox/Disposable.js')
const Disposable = require('promise-toolbox/Disposable')
const { join } = require('path')
const { mkdir, rmdir } = require('fs-extra')
const { tmpdir } = require('os')
@@ -10,7 +10,7 @@ exports.getTmpDir = async function getTmpDir() {
const path = join(tmpdir(), Math.random().toString(36).slice(2))
try {
await mkdir(path)
return new Disposable(() => rmdir(path), path)
return new Disposable(path, () => rmdir(path))
} catch (error) {
if (i === MAX_ATTEMPTS) {
throw error

View File

@@ -1,4 +1,4 @@
const fromCallback = require('promise-toolbox/fromCallback.js')
const fromCallback = require('promise-toolbox/fromCallback')
const { createLogger } = require('@xen-orchestra/log')
const { createParser } = require('parse-pairs')
const { execFile } = require('child_process')

View File

@@ -1,4 +1,4 @@
const fromCallback = require('promise-toolbox/fromCallback.js')
const fromCallback = require('promise-toolbox/fromCallback')
const { createParser } = require('parse-pairs')
const { execFile } = require('child_process')
@@ -7,25 +7,23 @@ const { execFile } = require('child_process')
const parse = createParser({
keyTransform: key => key.slice(5).toLowerCase(),
})
const makeFunction =
command =>
async (fields, ...args) => {
const info = await fromCallback(execFile, command, [
'--noheading',
'--nosuffix',
'--nameprefixes',
'--unbuffered',
'--units',
'b',
'-o',
String(fields),
...args,
])
return info
.trim()
.split(/\r?\n/)
.map(Array.isArray(fields) ? parse : line => parse(line)[fields])
}
const makeFunction = command => async (fields, ...args) => {
const info = await fromCallback(execFile, command, [
'--noheading',
'--nosuffix',
'--nameprefixes',
'--unbuffered',
'--units',
'b',
'-o',
String(fields),
...args,
])
return info
.trim()
.split(/\r?\n/)
.map(Array.isArray(fields) ? parse : line => parse(line)[fields])
}
exports.lvs = makeFunction('lvs')
exports.pvs = makeFunction('pvs')

View File

@@ -0,0 +1,46 @@
function fulfilledThen(cb) {
return typeof cb === 'function' ? SyncThenable.fromFunction(cb, this.value) : this
}
function rejectedThen(_, cb) {
return typeof cb === 'function' ? SyncThenable.fromFunction(cb, this.value) : this
}
class SyncThenable {
static resolve(value) {
if (value != null && typeof value.then === 'function') {
return value
}
return new this(false, value)
}
static fromFunction(fn, ...arg) {
try {
return this.resolve(fn(...arg))
} catch (error) {
return this.reject(error)
}
}
static reject(reason) {
return new this(true, reason)
}
// unwrap if it's a SyncThenable
static tryUnwrap(value) {
if (value instanceof this) {
if (value.then === rejectedThen) {
throw value.value
}
return value.value
}
return value
}
constructor(rejected, value) {
this.then = rejected ? rejectedThen : fulfilledThen
this.value = value
}
}
exports.SyncThenable = SyncThenable

View File

@@ -1,4 +1,5 @@
exports.watchStreamSize = function watchStreamSize(stream, container = { size: 0 }) {
exports.watchStreamSize = function watchStreamSize(stream) {
const container = { size: 0 }
stream.on('data', data => {
container.size += data.length
})

View File

@@ -1,4 +1,4 @@
const mapValues = require('lodash/mapValues.js')
const mapValues = require('lodash/mapValues')
const { dirname } = require('path')
function formatVmBackup(backup) {

View File

@@ -1,10 +1,10 @@
const assert = require('assert')
const fs = require('fs-extra')
const isGzipFile = async (handler, fd) => {
const isGzipFile = async fd => {
// https://tools.ietf.org/html/rfc1952.html#page-5
const magicNumber = Buffer.allocUnsafe(2)
assert.strictEqual((await handler.read(fd, magicNumber, 0)).bytesRead, magicNumber.length)
assert.strictEqual((await fs.read(fd, magicNumber, 0, magicNumber.length, 0)).bytesRead, magicNumber.length)
return magicNumber[0] === 31 && magicNumber[1] === 139
}
@@ -21,33 +21,32 @@ const isGzipFile = async (handler, fd) => {
// /^Ref:\d+/\d+\.checksum$/ and then validating the tar structure from it
//
// https://github.com/npm/node-tar/issues/234#issuecomment-538190295
const isValidTar = async (handler, size, fd) => {
const isValidTar = async (size, fd) => {
if (size <= 1024 || size % 512 !== 0) {
return false
}
const buf = Buffer.allocUnsafe(1024)
assert.strictEqual((await handler.read(fd, buf, size - buf.length)).bytesRead, buf.length)
assert.strictEqual((await fs.read(fd, buf, 0, buf.length, size - buf.length)).bytesRead, buf.length)
return buf.every(_ => _ === 0)
}
// TODO: find an heuristic for compressed files
async function isValidXva(path) {
const handler = this._handler
const isValidXva = async path => {
try {
const fd = await handler.openFile(path, 'r')
const fd = await fs.open(path, 'r')
try {
const size = await handler.getSize(fd)
const { size } = await fs.fstat(fd)
if (size < 20) {
// neither a valid gzip not tar
return false
}
return (await isGzipFile(handler, fd))
return (await isGzipFile(fd))
? true // gzip files cannot be validated at this time
: await isValidTar(handler, size, fd)
: await isValidTar(size, fd)
} finally {
handler.closeFile(fd).catch(noop)
fs.close(fd).catch(noop)
}
} catch (error) {
// never throw, log and report as valid to avoid side effects

View File

@@ -1,69 +0,0 @@
#!/usr/bin/env node
const { catchGlobalErrors } = require('@xen-orchestra/log/configure.js')
const { createLogger } = require('@xen-orchestra/log')
const { getSyncedHandler } = require('@xen-orchestra/fs')
const { join } = require('path')
const Disposable = require('promise-toolbox/Disposable')
const min = require('lodash/min')
const { getVmBackupDir } = require('../_getVmBackupDir.js')
const { RemoteAdapter } = require('../RemoteAdapter.js')
const { CLEAN_VM_QUEUE } = require('./index.js')
// -------------------------------------------------------------------
catchGlobalErrors(createLogger('xo:backups:mergeWorker'))
const { fatal, info, warn } = createLogger('xo:backups:mergeWorker')
// -------------------------------------------------------------------
const main = Disposable.wrap(async function* main(args) {
const handler = yield getSyncedHandler({ url: 'file://' + process.cwd() })
yield handler.lock(CLEAN_VM_QUEUE)
const adapter = new RemoteAdapter(handler)
const listRetry = async () => {
const timeoutResolver = resolve => setTimeout(resolve, 10e3)
for (let i = 0; i < 10; ++i) {
const entries = await handler.list(CLEAN_VM_QUEUE)
if (entries.length !== 0) {
return entries
}
await new Promise(timeoutResolver)
}
}
let taskFiles
while ((taskFiles = await listRetry()) !== undefined) {
const taskFileBasename = min(taskFiles)
const taskFile = join(CLEAN_VM_QUEUE, '_' + taskFileBasename)
// move this task to the end
await handler.rename(join(CLEAN_VM_QUEUE, taskFileBasename), taskFile)
try {
const vmDir = getVmBackupDir(String(await handler.readFile(taskFile)))
await adapter.cleanVm(vmDir, { merge: true, onLog: info, remove: true })
handler.unlink(taskFile).catch(error => warn('deleting task failure', { error }))
} catch (error) {
warn('failure handling task', { error })
}
}
})
info('starting')
main(process.argv.slice(2)).then(
() => {
info('bye :-)')
},
error => {
fatal(error)
process.exit(1)
}
)

View File

@@ -1,25 +0,0 @@
const { join, resolve } = require('path')
const { spawn } = require('child_process')
const { check } = require('proper-lockfile')
const CLEAN_VM_QUEUE = (exports.CLEAN_VM_QUEUE = '/xo-vm-backups/.queue/clean-vm/')
const CLI_PATH = resolve(__dirname, 'cli.js')
exports.run = async function runMergeWorker(remotePath) {
try {
// TODO: find a way to pass the acquire the lock and then pass it down the worker
if (await check(join(remotePath, CLEAN_VM_QUEUE))) {
// already locked, don't start another worker
return
}
spawn(CLI_PATH, {
cwd: remotePath,
detached: true,
stdio: 'inherit',
}).unref()
} catch (error) {
// we usually don't want to throw if the merge worker failed to start
return error
}
}

View File

@@ -8,9 +8,9 @@
"type": "git",
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"version": "0.15.1",
"version": "0.9.1",
"engines": {
"node": ">=14.6"
"node": ">=14.5"
},
"scripts": {
"postversion": "npm publish --access public"
@@ -18,28 +18,28 @@
"dependencies": {
"@vates/compose": "^2.0.0",
"@vates/disposable": "^0.1.1",
"@vates/parse-duration": "^0.1.1",
"@vates/multi-key-map": "^0.1.0",
"@vates/parse-duration": "^0.1.0",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/fs": "^0.18.0",
"@xen-orchestra/log": "^0.3.0",
"@xen-orchestra/fs": "^0.14.0",
"@xen-orchestra/log": "^0.2.0",
"@xen-orchestra/template": "^0.1.0",
"compare-versions": "^4.0.1",
"compare-versions": "^3.6.0",
"d3-time-format": "^3.0.0",
"end-of-stream": "^1.4.4",
"fs-extra": "^10.0.0",
"ensure-array": "^1.0.0",
"fs-extra": "^9.0.0",
"golike-defer": "^0.5.1",
"limit-concurrency-decorator": "^0.5.0",
"limit-concurrency-decorator": "^0.4.0",
"lodash": "^4.17.20",
"node-zone": "^0.4.0",
"parse-pairs": "^1.1.0",
"promise-toolbox": "^0.20.0",
"proper-lockfile": "^4.1.2",
"pump": "^3.0.0",
"vhd-lib": "^1.3.0",
"promise-toolbox": "^0.18.0",
"vhd-lib": "^1.0.0",
"yazl": "^2.5.1"
},
"peerDependencies": {
"@xen-orchestra/xapi": "^0.8.0"
"@xen-orchestra/xapi": "^0.6.0"
},
"license": "AGPL-3.0-or-later",
"author": {

View File

@@ -1,4 +1,4 @@
const { DIR_XO_CONFIG_BACKUPS, DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter.js')
const { DIR_XO_CONFIG_BACKUPS, DIR_XO_POOL_METADATA_BACKUPS } = require('./RemoteAdapter')
exports.parseMetadataBackupId = function parseMetadataBackupId(backupId) {
const [dir, ...rest] = backupId.split('/')

View File

@@ -1,26 +0,0 @@
const { AbstractWriter } = require('./_AbstractWriter.js')
exports.AbstractDeltaWriter = class AbstractDeltaWriter extends AbstractWriter {
checkBaseVdis(baseUuidToSrcVdi, baseVm) {
throw new Error('Not implemented')
}
cleanup() {
throw new Error('Not implemented')
}
prepare({ isFull }) {
throw new Error('Not implemented')
}
async transfer({ timestamp, deltaExport, sizeContainers }) {
try {
return await this._transfer({ timestamp, deltaExport, sizeContainers })
} finally {
// ensure all streams are properly closed
for (const stream of Object.values(deltaExport.streams)) {
stream.destroy()
}
}
}
}

View File

@@ -1,12 +0,0 @@
const { AbstractWriter } = require('./_AbstractWriter.js')
exports.AbstractFullWriter = class AbstractFullWriter extends AbstractWriter {
async run({ timestamp, sizeContainer, stream }) {
try {
return await this._run({ timestamp, sizeContainer, stream })
} finally {
// ensure stream is properly closed
stream.destroy()
}
}
}

View File

@@ -1,10 +0,0 @@
exports.AbstractWriter = class AbstractWriter {
constructor({ backup, settings }) {
this._backup = backup
this._settings = settings
}
beforeBackup() {}
afterBackup() {}
}

View File

@@ -1,51 +0,0 @@
const { createLogger } = require('@xen-orchestra/log')
const { join } = require('path')
const { BACKUP_DIR, getVmBackupDir } = require('../_getVmBackupDir.js')
const MergeWorker = require('../merge-worker/index.js')
const { formatFilenameDate } = require('../_filenameDate.js')
const { warn } = createLogger('xo:backups:MixinBackupWriter')
exports.MixinBackupWriter = (BaseClass = Object) =>
class MixinBackupWriter extends BaseClass {
#lock
#vmBackupDir
constructor({ remoteId, ...rest }) {
super(rest)
this._adapter = rest.backup.remoteAdapters[remoteId]
this._remoteId = remoteId
this.#vmBackupDir = getVmBackupDir(this._backup.vm.uuid)
}
_cleanVm(options) {
return this._adapter
.cleanVm(this.#vmBackupDir, { ...options, fixMetadata: true, onLog: warn, lock: false })
.catch(warn)
}
async beforeBackup() {
const { handler } = this._adapter
const vmBackupDir = this.#vmBackupDir
await handler.mktree(vmBackupDir)
this.#lock = await handler.lock(vmBackupDir)
}
async afterBackup() {
const { disableMergeWorker } = this._backup.config
const { merge } = await this._cleanVm({ remove: true, merge: disableMergeWorker })
await this.#lock.dispose()
// merge worker only compatible with local remotes
const { handler } = this._adapter
if (merge && !disableMergeWorker && typeof handler._getRealPath === 'function') {
await handler.outputFile(join(MergeWorker.CLEAN_VM_QUEUE, formatFilenameDate(new Date())), this._backup.vm.uuid)
const remotePath = handler._getRealPath()
await MergeWorker.run(remotePath)
}
}
}

View File

@@ -1,8 +0,0 @@
exports.MixinReplicationWriter = (BaseClass = Object) =>
class MixinReplicationWriter extends BaseClass {
constructor({ sr, ...rest }) {
super(rest)
this._sr = sr
}
}

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env node
const defer = require('golike-defer').default
const { Ref, Xapi } = require('xen-api')
const { defer } = require('golike-defer')
const pkg = require('./package.json')
@@ -77,11 +77,7 @@ ${cliName} v${pkg.version}
'xo:backup:sr': tgtSr.uuid,
'xo:copy_of': srcSnapshotUuid,
}),
Promise.all(
['start', 'start_on'].map(op =>
tgtVm.update_blocked_operations(op, 'Start operation for this vm is blocked, clone it if you want to use it.')
)
),
tgtVm.update_blocked_operations('start', 'Start operation for this vm is blocked, clone it if you want to use it.'),
Promise.all(
userDevices.map(userDevice => {
const srcDisk = srcDisks[userDevice]

View File

@@ -18,7 +18,7 @@
"preferGlobal": true,
"dependencies": {
"golike-defer": "^0.5.1",
"xen-api": "^0.35.1"
"xen-api": "^0.31.0"
},
"scripts": {
"postversion": "npm publish"

View File

@@ -1 +0,0 @@
../../scripts/babel-eslintrc.js

View File

@@ -28,6 +28,9 @@
},
"preferGlobal": false,
"main": "dist/",
"files": [
"dist/"
],
"browserslist": [
">2%"
],
@@ -42,6 +45,7 @@
"@babel/cli": "^7.0.0",
"@babel/core": "^7.0.0",
"@babel/preset-env": "^7.0.0",
"@babel/preset-flow": "^7.0.0",
"cross-env": "^7.0.2",
"rimraf": "^3.0.0"
},

View File

@@ -1,7 +1,7 @@
{
"private": false,
"name": "@xen-orchestra/defined",
"version": "0.0.1",
"version": "0.0.0",
"license": "ISC",
"description": "Utilities to help handling (possibly) undefined values",
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@xen-orchestra/defined",
@@ -16,13 +16,30 @@
"url": "https://vates.fr"
},
"preferGlobal": false,
"main": "dist/",
"files": [
"dist/"
],
"browserslist": [
">2%"
],
"engines": {
"node": ">=6"
},
"devDependencies": {
"@babel/cli": "^7.0.0",
"@babel/core": "^7.0.0",
"@babel/preset-env": "^7.0.0",
"cross-env": "^7.0.2",
"rimraf": "^3.0.0"
},
"scripts": {
"build": "cross-env NODE_ENV=production babel --source-maps --out-dir=dist/ src/",
"clean": "rimraf dist/",
"dev": "cross-env NODE_ENV=development babel --watch --source-maps --out-dir=dist/ src/",
"prebuild": "yarn run clean",
"predev": "yarn run prebuild",
"prepublishOnly": "yarn run build",
"postversion": "npm publish"
}
}

View File

@@ -11,7 +11,7 @@
// process.env.http_proxy
// ])
// ```
function defined() {
export default function defined() {
let args = arguments
let n = args.length
if (n === 1) {
@@ -29,7 +29,6 @@ function defined() {
}
}
}
module.exports = exports = defined
// Usage:
//
@@ -40,7 +39,7 @@ module.exports = exports = defined
// const getFriendName = _ => _.friends[0].name
// const friendName = get(getFriendName, props.user)
// ```
function get(accessor, arg) {
export const get = (accessor, arg) => {
try {
return accessor(arg)
} catch (error) {
@@ -50,7 +49,6 @@ function get(accessor, arg) {
}
}
}
exports.get = get
// Usage:
//
@@ -60,6 +58,4 @@ exports.get = get
// _ => new ProxyAgent(_)
// )
// ```
exports.ifDef = function ifDef(value, thenFn) {
return value !== undefined ? thenFn(value) : value
}
export const ifDef = (value, thenFn) => (value !== undefined ? thenFn(value) : value)

View File

@@ -1,7 +1,7 @@
{
"private": false,
"name": "@xen-orchestra/emit-async",
"version": "0.1.0",
"version": "0.0.0",
"license": "ISC",
"description": "Emit an event for async listeners to settle",
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@xen-orchestra/emit-async",
@@ -16,13 +16,30 @@
"url": "https://vates.fr"
},
"preferGlobal": false,
"main": "dist/",
"files": [
"dist/"
],
"browserslist": [
">2%"
],
"engines": {
"node": ">=6"
},
"devDependencies": {
"@babel/cli": "^7.0.0",
"@babel/core": "^7.0.0",
"@babel/preset-env": "^7.0.0",
"cross-env": "^7.0.2",
"rimraf": "^3.0.0"
},
"scripts": {
"build": "cross-env NODE_ENV=production babel --source-maps --out-dir=dist/ src/",
"clean": "rimraf dist/",
"dev": "cross-env NODE_ENV=development babel --watch --source-maps --out-dir=dist/ src/",
"prebuild": "yarn run clean",
"predev": "yarn run prebuild",
"prepublishOnly": "yarn run build",
"postversion": "npm publish"
}
}

View File

@@ -1,4 +1,4 @@
module.exports = function emitAsync(event) {
export default function emitAsync(event) {
let opts
let i = 1

View File

@@ -1 +0,0 @@
../../scripts/babel-eslintrc.js

View File

@@ -1,7 +1,7 @@
{
"private": false,
"name": "@xen-orchestra/fs",
"version": "0.18.0",
"version": "0.14.0",
"license": "AGPL-3.0-or-later",
"description": "The File System for Xen Orchestra backups.",
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@xen-orchestra/fs",
@@ -13,27 +13,30 @@
},
"preferGlobal": true,
"main": "dist/",
"files": [
"dist/"
],
"engines": {
"node": ">=14"
"node": ">=8.10"
},
"dependencies": {
"@marsaud/smb2": "^0.18.0",
"@marsaud/smb2": "^0.17.2",
"@sindresorhus/df": "^3.1.1",
"@sullux/aws-sdk": "^1.0.5",
"@vates/coalesce-calls": "^0.1.0",
"@xen-orchestra/async-map": "^0.1.2",
"aws-sdk": "^2.686.0",
"decorator-synchronized": "^0.6.0",
"decorator-synchronized": "^0.5.0",
"execa": "^5.0.0",
"fs-extra": "^10.0.0",
"fs-extra": "^9.0.0",
"get-stream": "^6.0.0",
"limit-concurrency-decorator": "^0.5.0",
"limit-concurrency-decorator": "^0.4.0",
"lodash": "^4.17.4",
"promise-toolbox": "^0.20.0",
"promise-toolbox": "^0.18.0",
"proper-lockfile": "^4.1.2",
"readable-stream": "^3.0.6",
"through2": "^4.0.2",
"xo-remote-parser": "^0.7.0"
"tmp": "^0.2.1",
"xo-remote-parser": "^0.6.0"
},
"devDependencies": {
"@babel/cli": "^7.0.0",
@@ -42,10 +45,12 @@
"@babel/plugin-proposal-function-bind": "^7.0.0",
"@babel/plugin-proposal-nullish-coalescing-operator": "^7.4.4",
"@babel/preset-env": "^7.0.0",
"@babel/preset-flow": "^7.0.0",
"async-iterator-to-stream": "^1.1.0",
"babel-plugin-lodash": "^3.3.2",
"cross-env": "^7.0.2",
"dotenv": "^10.0.0",
"dotenv": "^8.0.0",
"index-modules": "^0.3.0",
"rimraf": "^3.0.0"
},
"scripts": {

View File

@@ -1,21 +1,33 @@
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
// @flow
// $FlowFixMe
import getStream from 'get-stream'
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import limit from 'limit-concurrency-decorator'
import path, { basename } from 'path'
import { coalesceCalls } from '@vates/coalesce-calls'
import synchronized from 'decorator-synchronized'
import { fromCallback, fromEvent, ignoreErrors, timeout } from 'promise-toolbox'
import { limitConcurrency } from 'limit-concurrency-decorator'
import { parse } from 'xo-remote-parser'
import { pipeline } from 'stream'
import { randomBytes } from 'crypto'
import { synchronized } from 'decorator-synchronized'
import { type Readable, type Writable } from 'stream'
import normalizePath from './_normalizePath'
import { createChecksumStream, validChecksumOfReadStream } from './checksum'
const { dirname } = path.posix
type Data = Buffer | Readable | string
type Disposable<T> = {| dispose: () => void | Promise<void>, value?: T |}
type FileDescriptor = {| fd: mixed, path: string |}
type LaxReadable = Readable & Object
type LaxWritable = Writable & Object
type RemoteInfo = { used?: number, size?: number }
type File = FileDescriptor | string
const checksumFile = file => file + '.checksum'
const computeRate = (hrtime, size) => {
const computeRate = (hrtime: number[], size: number) => {
const seconds = hrtime[0] + hrtime[1] / 1e9
return size / seconds
}
@@ -29,8 +41,6 @@ const ignoreEnoent = error => {
}
}
const noop = Function.prototype
class PrefixWrapper {
constructor(handler, prefix) {
this._prefix = prefix
@@ -63,7 +73,11 @@ class PrefixWrapper {
}
export default class RemoteHandlerAbstract {
constructor(remote, options = {}) {
_highWaterMark: number
_remote: Object
_timeout: number
constructor(remote: any, options: Object = {}) {
if (remote.url === 'test://') {
this._remote = remote
} else {
@@ -74,7 +88,7 @@ export default class RemoteHandlerAbstract {
}
;({ highWaterMark: this._highWaterMark, timeout: this._timeout = DEFAULT_TIMEOUT } = options)
const sharedLimit = limitConcurrency(options.maxParallelOperations ?? DEFAULT_MAX_PARALLEL_OPERATIONS)
const sharedLimit = limit(options.maxParallelOperations ?? DEFAULT_MAX_PARALLEL_OPERATIONS)
this.closeFile = sharedLimit(this.closeFile)
this.getInfo = sharedLimit(this.getInfo)
this.getSize = sharedLimit(this.getSize)
@@ -90,28 +104,25 @@ export default class RemoteHandlerAbstract {
this.unlink = sharedLimit(this.unlink)
this.write = sharedLimit(this.write)
this.writeFile = sharedLimit(this.writeFile)
this._forget = coalesceCalls(this._forget)
this._sync = coalesceCalls(this._sync)
}
// Public members
get type() {
get type(): string {
throw new Error('Not implemented')
}
addPrefix(prefix) {
addPrefix(prefix: string) {
prefix = normalizePath(prefix)
return prefix === '/' ? this : new PrefixWrapper(this, prefix)
}
async closeFile(fd) {
async closeFile(fd: FileDescriptor): Promise<void> {
await this.__closeFile(fd)
}
// TODO: remove method
async createOutputStream(file, { checksum = false, dirMode, ...options } = {}) {
async createOutputStream(file: File, { checksum = false, dirMode, ...options }: Object = {}): Promise<LaxWritable> {
if (typeof file === 'string') {
file = normalizePath(file)
}
@@ -138,6 +149,7 @@ export default class RemoteHandlerAbstract {
stream.on('error', forwardError)
checksumStream.pipe(stream)
// $FlowFixMe
checksumStream.checksumWritten = checksumStream.checksum
.then(value => this._outputFile(checksumFile(path), value, { flags: 'wx' }))
.catch(forwardError)
@@ -145,7 +157,10 @@ export default class RemoteHandlerAbstract {
return checksumStream
}
createReadStream(file, { checksum = false, ignoreMissingChecksum = false, ...options } = {}) {
createReadStream(
file: File,
{ checksum = false, ignoreMissingChecksum = false, ...options }: Object = {}
): Promise<LaxReadable> {
if (typeof file === 'string') {
file = normalizePath(file)
}
@@ -182,7 +197,7 @@ export default class RemoteHandlerAbstract {
checksum =>
streamP.then(stream => {
const { length } = stream
stream = validChecksumOfReadStream(stream, String(checksum).trim())
stream = (validChecksumOfReadStream(stream, String(checksum).trim()): LaxReadable)
stream.length = length
return stream
@@ -196,31 +211,16 @@ export default class RemoteHandlerAbstract {
)
}
/**
* write a stream to a file using a temporary file
*
* @param {string} path
* @param {ReadableStream} input
* @param {object} [options]
* @param {boolean} [options.checksum]
* @param {number} [options.dirMode]
* @param {(this: RemoteHandlerAbstract, path: string) => Promise<undefined>} [options.validator] Function that will be called before the data is commited to the remote, if it fails, file should not exist
*/
async outputStream(path, input, { checksum = true, dirMode, validator } = {}) {
path = normalizePath(path)
let checksumStream
if (checksum) {
checksumStream = createChecksumStream()
pipeline(input, checksumStream, noop)
input = checksumStream
}
await this._outputStream(path, input, {
// write a stream to a file using a temporary file
async outputStream(
path: string,
input: Readable | Promise<Readable>,
{ checksum = true, dirMode }: { checksum?: boolean, dirMode?: number } = {}
): Promise<void> {
return this._outputStream(normalizePath(path), await input, {
checksum,
dirMode,
validator,
})
if (checksum) {
await this._outputFile(checksumFile(path), await checksumStream.checksum, { dirMode, flags: 'wx' })
}
}
// Free the resources possibly dedicated to put the remote at work, when it
@@ -230,73 +230,73 @@ export default class RemoteHandlerAbstract {
// as mount), forgetting them might breaking other processes using the same
// remote.
@synchronized()
async forget() {
async forget(): Promise<void> {
await this._forget()
}
async getInfo() {
async getInfo(): Promise<RemoteInfo> {
return timeout.call(this._getInfo(), this._timeout)
}
async getSize(file) {
async getSize(file: File): Promise<number> {
return timeout.call(this._getSize(typeof file === 'string' ? normalizePath(file) : file), this._timeout)
}
async list(dir, { filter, ignoreMissing = false, prependDir = false } = {}) {
try {
const virtualDir = normalizePath(dir)
dir = normalizePath(dir)
async list(
dir: string,
{ filter, prependDir = false }: { filter?: (name: string) => boolean, prependDir?: boolean } = {}
): Promise<string[]> {
const virtualDir = normalizePath(dir)
dir = normalizePath(dir)
let entries = await timeout.call(this._list(dir), this._timeout)
if (filter !== undefined) {
entries = entries.filter(filter)
}
if (prependDir) {
entries.forEach((entry, i) => {
entries[i] = virtualDir + '/' + entry
})
}
return entries
} catch (error) {
if (ignoreMissing && error?.code === 'ENOENT') {
return []
}
throw error
let entries = await timeout.call(this._list(dir), this._timeout)
if (filter !== undefined) {
entries = entries.filter(filter)
}
if (prependDir) {
entries.forEach((entry, i) => {
entries[i] = virtualDir + '/' + entry
})
}
return entries
}
async lock(path) {
async lock(path: string): Promise<Disposable> {
path = normalizePath(path)
return { dispose: await this._lock(path) }
}
async mkdir(dir, { mode } = {}) {
async mkdir(dir: string, { mode }: { mode?: number } = {}): Promise<void> {
await this.__mkdir(normalizePath(dir), { mode })
}
async mktree(dir, { mode } = {}) {
async mktree(dir: string, { mode }: { mode?: number } = {}): Promise<void> {
await this._mktree(normalizePath(dir), { mode })
}
openFile(path, flags) {
openFile(path: string, flags: string): Promise<FileDescriptor> {
return this.__openFile(path, flags)
}
async outputFile(file, data, { dirMode, flags = 'wx' } = {}) {
async outputFile(
file: string,
data: Data,
{ dirMode, flags = 'wx' }: { dirMode?: number, flags?: string } = {}
): Promise<void> {
await this._outputFile(normalizePath(file), data, { dirMode, flags })
}
async read(file, buffer, position) {
async read(file: File, buffer: Buffer, position?: number): Promise<{| bytesRead: number, buffer: Buffer |}> {
return this._read(typeof file === 'string' ? normalizePath(file) : file, buffer, position)
}
async readFile(file, { flags = 'r' } = {}) {
async readFile(file: string, { flags = 'r' }: { flags?: string } = {}): Promise<Buffer> {
return this._readFile(normalizePath(file), { flags })
}
async rename(oldPath, newPath, { checksum = false } = {}) {
async rename(oldPath: string, newPath: string, { checksum = false }: Object = {}) {
oldPath = normalizePath(oldPath)
newPath = normalizePath(newPath)
@@ -307,11 +307,11 @@ export default class RemoteHandlerAbstract {
return p
}
async rmdir(dir) {
async rmdir(dir: string): Promise<void> {
await timeout.call(this._rmdir(normalizePath(dir)).catch(ignoreEnoent), this._timeout)
}
async rmtree(dir) {
async rmtree(dir: string): Promise<void> {
await this._rmtree(normalizePath(dir))
}
@@ -320,11 +320,11 @@ export default class RemoteHandlerAbstract {
//
// This method MUST ALWAYS be called before using the handler.
@synchronized()
async sync() {
async sync(): Promise<void> {
await this._sync()
}
async test() {
async test(): Promise<Object> {
const SIZE = 1024 * 1024 * 10
const testFileName = normalizePath(`${Date.now()}.test`)
const data = await fromCallback(randomBytes, SIZE)
@@ -359,11 +359,11 @@ export default class RemoteHandlerAbstract {
}
}
async truncate(file, len) {
async truncate(file: string, len: number): Promise<void> {
await this._truncate(file, len)
}
async unlink(file, { checksum = true } = {}) {
async unlink(file: string, { checksum = true }: Object = {}): Promise<void> {
file = normalizePath(file)
if (checksum) {
@@ -373,21 +373,21 @@ export default class RemoteHandlerAbstract {
await this._unlink(file).catch(ignoreEnoent)
}
async write(file, buffer, position) {
async write(file: File, buffer: Buffer, position: number): Promise<{| bytesWritten: number, buffer: Buffer |}> {
await this._write(typeof file === 'string' ? normalizePath(file) : file, buffer, position)
}
async writeFile(file, data, { flags = 'wx' } = {}) {
async writeFile(file: string, data: Data, { flags = 'wx' }: { flags?: string } = {}): Promise<void> {
await this._writeFile(normalizePath(file), data, { flags })
}
// Methods that can be called by private methods to avoid parallel limit on public methods
async __closeFile(fd) {
async __closeFile(fd: FileDescriptor): Promise<void> {
await timeout.call(this._closeFile(fd.fd), this._timeout)
}
async __mkdir(dir, { mode } = {}) {
async __mkdir(dir: string, { mode }: { mode?: number } = {}): Promise<void> {
try {
await this._mkdir(dir, { mode })
} catch (error) {
@@ -400,7 +400,7 @@ export default class RemoteHandlerAbstract {
}
}
async __openFile(path, flags) {
async __openFile(path: string, flags: string): Promise<FileDescriptor> {
path = normalizePath(path)
return {
@@ -411,11 +411,11 @@ export default class RemoteHandlerAbstract {
// Methods that can be implemented by inheriting classes
async _closeFile(fd) {
async _closeFile(fd: mixed): Promise<void> {
throw new Error('Not implemented')
}
async _createOutputStream(file, { dirMode, ...options } = {}) {
async _createOutputStream(file: File, { dirMode, ...options }: Object = {}): Promise<LaxWritable> {
try {
return await this._createWriteStream(file, { ...options, highWaterMark: this._highWaterMark })
} catch (error) {
@@ -428,40 +428,40 @@ export default class RemoteHandlerAbstract {
return this._createOutputStream(file, options)
}
async _createReadStream(file, options) {
async _createReadStream(file: File, options?: Object): Promise<LaxReadable> {
throw new Error('Not implemented')
}
// createWriteStream takes highWaterMark as option even if it's not documented.
// Source: https://stackoverflow.com/questions/55026306/how-to-set-writeable-highwatermark
async _createWriteStream(file, options) {
async _createWriteStream(file: File, options: Object): Promise<LaxWritable> {
throw new Error('Not implemented')
}
// called to finalize the remote
async _forget() {}
async _forget(): Promise<void> {}
async _getInfo() {
async _getInfo(): Promise<Object> {
return {}
}
async _lock(path) {
async _lock(path: string): Promise<Function> {
return () => Promise.resolve()
}
async _getSize(file) {
async _getSize(file: File): Promise<number> {
throw new Error('Not implemented')
}
async _list(dir) {
async _list(dir: string): Promise<string[]> {
throw new Error('Not implemented')
}
async _mkdir(dir) {
async _mkdir(dir: string): Promise<void> {
throw new Error('Not implemented')
}
async _mktree(dir, { mode } = {}) {
async _mktree(dir: string, { mode }: { mode?: number } = {}): Promise<void> {
try {
return await this.__mkdir(dir, { mode })
} catch (error) {
@@ -474,11 +474,11 @@ export default class RemoteHandlerAbstract {
return this._mktree(dir, { mode })
}
async _openFile(path, flags) {
async _openFile(path: string, flags: string): Promise<mixed> {
throw new Error('Not implemented')
}
async _outputFile(file, data, { dirMode, flags }) {
async _outputFile(file: string, data: Data, { dirMode, flags }: { dirMode?: number, flags?: string }): Promise<void> {
try {
return await this._writeFile(file, data, { flags })
} catch (error) {
@@ -491,40 +491,42 @@ export default class RemoteHandlerAbstract {
return this._outputFile(file, data, { flags })
}
async _outputStream(path, input, { dirMode, validator }) {
async _outputStream(path: string, input: Readable, { checksum, dirMode }: { checksum?: boolean, dirMode?: number }) {
const tmpPath = `${dirname(path)}/.${basename(path)}`
const output = await this.createOutputStream(tmpPath, {
checksum,
dirMode,
})
try {
await fromCallback(pipeline, input, output)
if (validator !== undefined) {
await validator.call(this, tmpPath)
}
await this.rename(tmpPath, path)
input.pipe(output)
await fromEvent(output, 'finish')
await output.checksumWritten
// $FlowFixMe
await input.task
await this.rename(tmpPath, path, { checksum })
} catch (error) {
await this.unlink(tmpPath)
await this.unlink(tmpPath, { checksum })
throw error
}
}
_read(file, buffer, position) {
_read(file: File, buffer: Buffer, position?: number): Promise<{| bytesRead: number, buffer: Buffer |}> {
throw new Error('Not implemented')
}
_readFile(file, options) {
_readFile(file: string, options?: Object): Promise<Buffer> {
return this._createReadStream(file, { ...options, highWaterMark: this._highWaterMark }).then(getStream.buffer)
}
async _rename(oldPath, newPath) {
async _rename(oldPath: string, newPath: string) {
throw new Error('Not implemented')
}
async _rmdir(dir) {
async _rmdir(dir: string) {
throw new Error('Not implemented')
}
async _rmtree(dir) {
async _rmtree(dir: string) {
try {
return await this._rmdir(dir)
} catch (error) {
@@ -546,13 +548,13 @@ export default class RemoteHandlerAbstract {
}
// called to initialize the remote
async _sync() {}
async _sync(): Promise<void> {}
async _unlink(file) {
async _unlink(file: string): Promise<void> {
throw new Error('Not implemented')
}
async _write(file, buffer, position) {
async _write(file: File, buffer: Buffer, position: number): Promise<void> {
const isPath = typeof file === 'string'
if (isPath) {
file = await this.__openFile(file, 'r+')
@@ -566,11 +568,11 @@ export default class RemoteHandlerAbstract {
}
}
async _writeFd(fd, buffer, position) {
async _writeFd(fd: FileDescriptor, buffer: Buffer, position: number): Promise<void> {
throw new Error('Not implemented')
}
async _writeFile(file, data, options) {
async _writeFile(file: string, data: Data, options: { flags?: string }): Promise<void> {
throw new Error('Not implemented')
}
}

View File

@@ -1,7 +1,10 @@
// @flow
import through2 from 'through2'
import { createHash } from 'crypto'
import { defer, fromEvent } from 'promise-toolbox'
import { invert } from 'lodash'
import { type Readable, type Transform } from 'stream'
// Format: $<algorithm>$<salt>$<encrypted>
//
@@ -24,7 +27,7 @@ const ID_TO_ALGORITHM = invert(ALGORITHM_TO_ID)
// const checksumStream = source.pipe(createChecksumStream())
// checksumStream.resume() // make the data flow without an output
// console.log(await checksumStream.checksum)
export const createChecksumStream = (algorithm = 'md5') => {
export const createChecksumStream = (algorithm: string = 'md5'): Transform & { checksum: Promise<string> } => {
const algorithmId = ALGORITHM_TO_ID[algorithm]
if (!algorithmId) {
@@ -51,7 +54,10 @@ export const createChecksumStream = (algorithm = 'md5') => {
// Check if the checksum of a readable stream is equals to an expected checksum.
// The given stream is wrapped in a stream which emits an error event
// if the computed checksum is not equals to the expected checksum.
export const validChecksumOfReadStream = (stream, expectedChecksum) => {
export const validChecksumOfReadStream = (
stream: Readable,
expectedChecksum: string
): Readable & { checksumVerified: Promise<void> } => {
const algorithmId = expectedChecksum.slice(1, expectedChecksum.indexOf('$', 1))
if (!algorithmId) {
@@ -60,7 +66,7 @@ export const validChecksumOfReadStream = (stream, expectedChecksum) => {
const hash = createHash(ID_TO_ALGORITHM[algorithmId])
const wrapper = stream.pipe(
const wrapper: any = stream.pipe(
through2(
{ highWaterMark: 0 },
(chunk, enc, callback) => {

View File

@@ -133,14 +133,6 @@ handlers.forEach(url => {
await handler.outputFile('dir/file', '')
expect(await handler.list('dir', { prependDir: true })).toEqual(['/dir/file'])
})
it('throws ENOENT if no such directory', async () => {
expect((await rejectionOf(handler.list('dir'))).code).toBe('ENOENT')
})
it('can returns empty for missing directory', async () => {
expect(await handler.list('dir', { ignoreMissing: true })).toEqual([])
})
})
describe('#mkdir()', () => {

View File

@@ -1,12 +1,16 @@
// @flow
import execa from 'execa'
import { parse } from 'xo-remote-parser'
import type RemoteHandler from './abstract'
import RemoteHandlerLocal from './local'
import RemoteHandlerNfs from './nfs'
import RemoteHandlerS3 from './s3'
import RemoteHandlerSmb from './smb'
import RemoteHandlerSmbMount from './smb-mount'
export type { default as RemoteHandler } from './abstract'
export type Remote = { url: string }
const HANDLERS = {
file: RemoteHandlerLocal,
nfs: RemoteHandlerNfs,
@@ -20,19 +24,13 @@ try {
HANDLERS.smb = RemoteHandlerSmb
}
export const getHandler = (remote, ...rest) => {
const Handler = HANDLERS[parse(remote.url).type]
export const getHandler = (remote: Remote, ...rest: any): RemoteHandler => {
// FIXME: should be done in xo-remote-parser.
const type = remote.url.split('://')[0]
const Handler = HANDLERS[type]
if (!Handler) {
throw new Error('Unhandled remote type')
}
return new Handler(remote, ...rest)
}
export const getSyncedHandler = async (...opts) => {
const handler = getHandler(...opts)
await handler.sync()
return {
dispose: () => handler.forget(),
value: handler,
}
}

View File

@@ -84,7 +84,7 @@ export default class LocalHandler extends RemoteHandlerAbstract {
}
_lock(path) {
return lockfile.lock(this._getFilePath(path))
return lockfile.lock(path)
}
_mkdir(dir, { mode }) {

View File

@@ -8,6 +8,7 @@ export default class NfsHandler extends MountHandler {
super(remote, opts, {
type: 'nfs',
device: `${host}${port !== undefined ? ':' + port : ''}:${path}`,
defaultOptions: 'vers=3',
})
}

View File

@@ -1,9 +1,9 @@
import aws from '@sullux/aws-sdk'
import assert from 'assert'
import http from 'http'
import { parse } from 'xo-remote-parser'
import RemoteHandlerAbstract from './abstract'
import { createChecksumStream } from './checksum'
// endpoints https://docs.aws.amazon.com/general/latest/gr/s3.html
@@ -16,8 +16,9 @@ const IDEAL_FRAGMENT_SIZE = Math.ceil(MAX_OBJECT_SIZE / MAX_PARTS_COUNT) // the
export default class S3Handler extends RemoteHandlerAbstract {
constructor(remote, _opts) {
super(remote)
const { host, path, username, password, protocol, region } = parse(remote.url)
const params = {
const { host, path, username, password } = parse(remote.url)
// https://www.zenko.io/blog/first-things-first-getting-started-scality-s3-server/
this._s3 = aws({
accessKeyId: username,
apiVersion: '2006-03-01',
endpoint: host,
@@ -27,16 +28,7 @@ export default class S3Handler extends RemoteHandlerAbstract {
httpOptions: {
timeout: 600000,
},
}
if (protocol === 'http') {
params.httpOptions.agent = new http.Agent()
params.sslEnabled = false
}
if (region !== undefined) {
params.region = region
}
this._s3 = aws(params).s3
}).s3
const splitPath = path.split('/').filter(s => s.length)
this._bucket = splitPath.shift()
@@ -51,69 +43,46 @@ export default class S3Handler extends RemoteHandlerAbstract {
return { Bucket: this._bucket, Key: this._dir + file }
}
async _isNotEmptyDir(path) {
const result = await this._s3.listObjectsV2({
Bucket: this._bucket,
MaxKeys: 1,
Prefix: this._dir + path + '/',
})
return result.Contents.length !== 0
}
async _isFile(path) {
try {
await this._s3.headObject(this._createParams(path))
return true
} catch (error) {
if (error.code === 'NotFound') {
return false
async _outputStream(path, input, { checksum }) {
let inputStream = input
if (checksum) {
const checksumStream = createChecksumStream()
const forwardError = error => {
checksumStream.emit('error', error)
}
throw error
input.pipe(checksumStream)
input.on('error', forwardError)
inputStream = checksumStream
}
}
async _outputStream(path, input, { validator }) {
await this._s3.upload(
{
...this._createParams(path),
Body: input,
Body: inputStream,
},
{ partSize: IDEAL_FRAGMENT_SIZE, queueSize: 1 }
)
if (validator !== undefined) {
try {
await validator.call(this, path)
} catch (error) {
await this.unlink(path)
throw error
if (checksum) {
const checksum = await inputStream.checksum
const params = {
...this._createParams(path + '.checksum'),
Body: checksum,
}
await this._s3.upload(params)
}
await input.task
}
async _writeFile(file, data, options) {
return this._s3.putObject({ ...this._createParams(file), Body: data })
}
async _createReadStream(path, options) {
if (!(await this._isFile(path))) {
const error = new Error(`ENOENT: no such file '${path}'`)
error.code = 'ENOENT'
error.path = path
throw error
}
async _createReadStream(file, options) {
// https://github.com/Sullux/aws-sdk/issues/11
return this._s3.getObject.raw(this._createParams(path)).createReadStream()
return this._s3.getObject.raw(this._createParams(file)).createReadStream()
}
async _unlink(path) {
await this._s3.deleteObject(this._createParams(path))
if (await this._isNotEmptyDir(path)) {
const error = new Error(`EISDIR: illegal operation on a directory, unlink '${path}'`)
error.code = 'EISDIR'
error.path = path
throw error
}
async _unlink(file) {
return this._s3.deleteObject(this._createParams(file))
}
async _list(dir) {
@@ -137,16 +106,6 @@ export default class S3Handler extends RemoteHandlerAbstract {
return [...uniq]
}
async _mkdir(path) {
if (await this._isFile(path)) {
const error = new Error(`ENOTDIR: file already exists, mkdir '${path}'`)
error.code = 'ENOTDIR'
error.path = path
throw error
}
// nothing to do, directories do not exist, they are part of the files' path
}
async _rename(oldPath, newPath) {
const size = await this._getSize(oldPath)
const multipartParams = await this._s3.createMultipartUpload({ ...this._createParams(newPath) })
@@ -183,32 +142,9 @@ export default class S3Handler extends RemoteHandlerAbstract {
}
const params = this._createParams(file)
params.Range = `bytes=${position}-${position + buffer.length - 1}`
try {
const result = await this._s3.getObject(params)
result.Body.copy(buffer)
return { bytesRead: result.Body.length, buffer }
} catch (e) {
if (e.code === 'NoSuchKey') {
if (await this._isNotEmptyDir(file)) {
const error = new Error(`${file} is a directory`)
error.code = 'EISDIR'
error.path = file
throw error
}
}
throw e
}
}
async _rmdir(path) {
if (await this._isNotEmptyDir(path)) {
const error = new Error(`ENOTEMPTY: directory not empty, rmdir '${path}`)
error.code = 'ENOTEMPTY'
error.path = path
throw error
}
// nothing to do, directories do not exist, they are part of the files' path
const result = await this._s3.getObject(params)
result.Body.copy(buffer)
return { bytesRead: result.Body.length, buffer }
}
async _write(file, buffer, position) {

View File

@@ -0,0 +1 @@
module.exports = require('../../@xen-orchestra/babel-config')(require('./package.json'))

View File

@@ -66,10 +66,6 @@ configure([
// if filter is a string, then it is pattern
// (https://github.com/visionmedia/debug#wildcards) which is
// matched against the namespace of the logs
//
// If it's an array, it will be handled as an array of filters
// and the transport will be used if any one of them match the
// current log
filter: process.env.DEBUG,
transport: transportConsole(),

View File

@@ -1,132 +1 @@
const createConsoleTransport = require('./transports/console')
const { LEVELS, resolve } = require('./levels')
const { compileGlobPattern } = require('./utils')
// ===================================================================
const compileFilter = filter => {
if (filter === undefined) {
return
}
const type = typeof filter
if (type === 'function') {
return filter
}
if (type === 'string') {
const re = compileGlobPattern(filter)
return log => re.test(log.namespace)
}
if (Array.isArray(filter)) {
const filters = filter.map(compileFilter).filter(_ => _ !== undefined)
const { length } = filters
if (length === 0) {
return
}
if (length === 1) {
return filters[0]
}
return log => {
for (let i = 0; i < length; ++i) {
if (filters[i](log)) {
return true
}
}
return false
}
}
throw new TypeError('unsupported `filter`')
}
const createTransport = config => {
if (typeof config === 'function') {
return config
}
if (Array.isArray(config)) {
const transports = config.map(createTransport)
const { length } = transports
return function () {
for (let i = 0; i < length; ++i) {
transports[i].apply(this, arguments)
}
}
}
const level = resolve(config.level)
const filter = compileFilter([config.filter, level === undefined ? undefined : log => log.level >= level])
let transport = createTransport(config.transport)
if (filter !== undefined) {
const orig = transport
transport = function (log) {
if (filter(log)) {
return orig.apply(this, arguments)
}
}
}
return transport
}
const symbol = typeof Symbol !== 'undefined' ? Symbol.for('@xen-orchestra/log') : '@@@xen-orchestra/log'
const { env } = process
global[symbol] = createTransport({
// display warnings or above, and all that are enabled via DEBUG or
// NODE_DEBUG env
filter: [env.DEBUG, env.NODE_DEBUG].filter(Boolean).join(','),
level: resolve(env.LOG_LEVEL, LEVELS.INFO),
transport: createConsoleTransport(),
})
const configure = config => {
global[symbol] = createTransport(config)
}
exports.configure = configure
// -------------------------------------------------------------------
const catchGlobalErrors = logger => {
// patch process
const onUncaughtException = error => {
logger.error('uncaught exception', { error })
}
const onUnhandledRejection = error => {
logger.warn('possibly unhandled rejection', { error })
}
const onWarning = error => {
logger.warn('Node warning', { error })
}
process.on('uncaughtException', onUncaughtException)
process.on('unhandledRejection', onUnhandledRejection)
process.on('warning', onWarning)
// patch EventEmitter
const EventEmitter = require('events')
const { prototype } = EventEmitter
const { emit } = prototype
function patchedEmit(event, error) {
if (event === 'error' && this.listenerCount(event) === 0) {
logger.error('unhandled error event', { error })
return false
}
return emit.apply(this, arguments)
}
prototype.emit = patchedEmit
return () => {
process.removeListener('uncaughtException', onUncaughtException)
process.removeListener('unhandledRejection', onUnhandledRejection)
process.removeListener('warning', onWarning)
if (prototype.emit === patchedEmit) {
prototype.emit = emit
}
}
}
exports.catchGlobalErrors = catchGlobalErrors
module.exports = require('./dist/configure')

View File

@@ -1,7 +1,7 @@
{
"private": false,
"name": "@xen-orchestra/log",
"version": "0.3.0",
"version": "0.2.0",
"license": "ISC",
"description": "Logging system with decoupled producers/consumer",
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@xen-orchestra/log",
@@ -16,6 +16,12 @@
"url": "https://vates.fr"
},
"preferGlobal": false,
"main": "dist/",
"files": [
"configure.js",
"dist/",
"transports/"
],
"browserslist": [
">2%"
],
@@ -24,9 +30,24 @@
},
"dependencies": {
"lodash": "^4.17.4",
"promise-toolbox": "^0.20.0"
"promise-toolbox": "^0.18.0"
},
"devDependencies": {
"@babel/cli": "^7.0.0",
"@babel/core": "^7.0.0",
"@babel/preset-env": "^7.0.0",
"babel-plugin-lodash": "^3.3.2",
"cross-env": "^7.0.2",
"index-modules": "^0.3.0",
"rimraf": "^3.0.0"
},
"scripts": {
"build": "cross-env NODE_ENV=production babel --source-maps --out-dir=dist/ src/",
"clean": "rimraf dist/",
"dev": "cross-env NODE_ENV=development babel --watch --source-maps --out-dir=dist/ src/",
"prebuild": "yarn run clean",
"predev": "yarn run prebuild",
"prepublishOnly": "yarn run build",
"postversion": "npm publish"
}
}

View File

@@ -0,0 +1,105 @@
import createConsoleTransport from './transports/console'
import LEVELS, { resolve } from './levels'
import { compileGlobPattern } from './utils'
// ===================================================================
const createTransport = config => {
if (typeof config === 'function') {
return config
}
if (Array.isArray(config)) {
const transports = config.map(createTransport)
const { length } = transports
return function () {
for (let i = 0; i < length; ++i) {
transports[i].apply(this, arguments)
}
}
}
let { filter } = config
let transport = createTransport(config.transport)
const level = resolve(config.level)
if (filter !== undefined) {
if (typeof filter === 'string') {
const re = compileGlobPattern(filter)
filter = log => re.test(log.namespace)
}
const orig = transport
transport = function (log) {
if ((level !== undefined && log.level >= level) || filter(log)) {
return orig.apply(this, arguments)
}
}
} else if (level !== undefined) {
const orig = transport
transport = function (log) {
if (log.level >= level) {
return orig.apply(this, arguments)
}
}
}
return transport
}
const symbol = typeof Symbol !== 'undefined' ? Symbol.for('@xen-orchestra/log') : '@@@xen-orchestra/log'
const { env } = process
global[symbol] = createTransport({
// display warnings or above, and all that are enabled via DEBUG or
// NODE_DEBUG env
filter: [env.DEBUG, env.NODE_DEBUG].filter(Boolean).join(','),
level: resolve(env.LOG_LEVEL, LEVELS.INFO),
transport: createConsoleTransport(),
})
export const configure = config => {
global[symbol] = createTransport(config)
}
// -------------------------------------------------------------------
export const catchGlobalErrors = logger => {
// patch process
const onUncaughtException = error => {
logger.error('uncaught exception', { error })
}
const onUnhandledRejection = error => {
logger.warn('possibly unhandled rejection', { error })
}
const onWarning = error => {
logger.warn('Node warning', { error })
}
process.on('uncaughtException', onUncaughtException)
process.on('unhandledRejection', onUnhandledRejection)
process.on('warning', onWarning)
// patch EventEmitter
const EventEmitter = require('events')
const { prototype } = EventEmitter
const { emit } = prototype
function patchedEmit(event, error) {
if (event === 'error' && this.listenerCount(event) === 0) {
logger.error('unhandled error event', { error })
return false
}
return emit.apply(this, arguments)
}
prototype.emit = patchedEmit
return () => {
process.removeListener('uncaughtException', onUncaughtException)
process.removeListener('unhandledRejection', onUnhandledRejection)
process.removeListener('warning', onWarning)
if (prototype.emit === patchedEmit) {
prototype.emit = emit
}
}
}

View File

@@ -1,5 +1,5 @@
const createTransport = require('./transports/console')
const { LEVELS, resolve } = require('./levels')
import createTransport from './transports/console'
import LEVELS, { resolve } from './levels'
const symbol = typeof Symbol !== 'undefined' ? Symbol.for('@xen-orchestra/log') : '@@@xen-orchestra/log'
if (!(symbol in global)) {
@@ -68,7 +68,5 @@ prototype.wrap = function (message, fn) {
}
}
const createLogger = namespace => new Logger(namespace)
module.exports = exports = createLogger
exports.createLogger = createLogger
export const createLogger = namespace => new Logger(namespace)
export { createLogger as default }

View File

@@ -1,5 +1,5 @@
const LEVELS = Object.create(null)
exports.LEVELS = LEVELS
export { LEVELS as default }
// https://github.com/trentm/node-bunyan#levels
LEVELS.FATAL = 60 // service/app is going down
@@ -8,8 +8,7 @@ LEVELS.WARN = 40 // something went wrong but it's not fatal
LEVELS.INFO = 30 // detail on unusual but normal operation
LEVELS.DEBUG = 20
const NAMES = Object.create(null)
exports.NAMES = NAMES
export const NAMES = Object.create(null)
for (const name in LEVELS) {
NAMES[LEVELS[name]] = name
}
@@ -17,7 +16,7 @@ for (const name in LEVELS) {
// resolves to the number representation of a level
//
// returns `defaultLevel` if invalid
const resolve = (level, defaultLevel) => {
export const resolve = (level, defaultLevel) => {
const type = typeof level
if (type === 'number') {
if (level in NAMES) {
@@ -31,7 +30,6 @@ const resolve = (level, defaultLevel) => {
}
return defaultLevel
}
exports.resolve = resolve
Object.freeze(LEVELS)
Object.freeze(NAMES)

View File

@@ -1,8 +1,8 @@
/* eslint-env jest */
const { forEach, isInteger } = require('lodash')
import { forEach, isInteger } from 'lodash'
const { LEVELS, NAMES, resolve } = require('./levels')
import LEVELS, { NAMES, resolve } from './levels'
describe('LEVELS', () => {
it('maps level names to their integer values', () => {

View File

@@ -0,0 +1,79 @@
import LEVELS, { NAMES } from '../levels'
const { DEBUG, ERROR, FATAL, INFO, WARN } = LEVELS
let formatLevel, formatNamespace
if (process.stdout !== undefined && process.stdout.isTTY && process.stderr !== undefined && process.stderr.isTTY) {
const ansi = (style, str) => `\x1b[${style}m${str}\x1b[0m`
const LEVEL_STYLES = {
[DEBUG]: '2',
[ERROR]: '1;31',
[FATAL]: '1;31',
[INFO]: '1',
[WARN]: '1;33',
}
formatLevel = level => {
const style = LEVEL_STYLES[level]
const name = NAMES[level]
return style === undefined ? name : ansi(style, name)
}
const NAMESPACE_COLORS = [
196,
202,
208,
214,
220,
226,
190,
154,
118,
82,
46,
47,
48,
49,
50,
51,
45,
39,
33,
27,
21,
57,
93,
129,
165,
201,
200,
199,
198,
197,
]
formatNamespace = namespace => {
// https://werxltd.com/wp/2010/05/13/javascript-implementation-of-javas-string-hashcode-method/
let hash = 0
for (let i = 0, n = namespace.length; i < n; ++i) {
hash = ((hash << 5) - hash + namespace.charCodeAt(i)) | 0
}
return ansi(`1;38;5;${NAMESPACE_COLORS[Math.abs(hash) % NAMESPACE_COLORS.length]}`, namespace)
}
} else {
formatLevel = str => NAMES[str]
formatNamespace = str => str
}
const consoleTransport = ({ data, level, namespace, message, time }) => {
const fn =
/* eslint-disable no-console */
level < INFO ? console.log : level < WARN ? console.info : level < ERROR ? console.warn : console.error
/* eslint-enable no-console */
const args = [time.toISOString(), formatNamespace(namespace), formatLevel(level), message]
if (data != null) {
args.push(data)
}
fn.apply(console, args)
}
export default () => consoleTransport

View File

@@ -0,0 +1,64 @@
import fromCallback from 'promise-toolbox/fromCallback'
import prettyFormat from 'pretty-format' // eslint-disable-line node/no-extraneous-import
import { createTransport } from 'nodemailer' // eslint-disable-line node/no-extraneous-import
import { evalTemplate, required } from '../utils'
import { NAMES } from '../levels'
export default ({
// transport options (https://nodemailer.com/smtp/)
auth,
authMethod,
host,
ignoreTLS,
port,
proxy,
requireTLS,
secure,
service,
tls,
// message options (https://nodemailer.com/message/)
bcc,
cc,
from = required('from'),
to = required('to'),
subject = '[{{level}} - {{namespace}}] {{time}} {{message}}',
}) => {
const transporter = createTransport(
{
auth,
authMethod,
host,
ignoreTLS,
port,
proxy,
requireTLS,
secure,
service,
tls,
disableFileAccess: true,
disableUrlAccess: true,
},
{
bcc,
cc,
from,
to,
}
)
return log =>
fromCallback(cb =>
transporter.sendMail(
{
subject: evalTemplate(subject, key =>
key === 'level' ? NAMES[log.level] : key === 'time' ? log.time.toISOString() : log[key]
),
text: prettyFormat(log.data),
},
cb
)
)
}

View File

@@ -0,0 +1,7 @@
export default () => {
const memoryLogger = log => {
logs.push(log)
}
const logs = (memoryLogger.logs = [])
return memoryLogger
}

View File

@@ -0,0 +1,41 @@
import fromCallback from 'promise-toolbox/fromCallback'
import splitHost from 'split-host'
import { createClient, Facility, Severity, Transport } from 'syslog-client'
import LEVELS from '../levels'
// https://github.com/paulgrove/node-syslog-client#syslogseverity
const LEVEL_TO_SEVERITY = {
[LEVELS.FATAL]: Severity.Critical,
[LEVELS.ERROR]: Severity.Error,
[LEVELS.WARN]: Severity.Warning,
[LEVELS.INFO]: Severity.Informational,
[LEVELS.DEBUG]: Severity.Debug,
}
const facility = Facility.User
export default target => {
const opts = {}
if (target !== undefined) {
if (target.startsWith('tcp://')) {
target = target.slice(6)
opts.transport = Transport.Tcp
} else if (target.startsWith('udp://')) {
target = target.slice(6)
opts.transport = Transport.Udp
}
;({ host: target, port: opts.port } = splitHost(target))
}
const client = createClient(target, opts)
return log =>
fromCallback(cb =>
client.log(log.message, {
facility,
severity: LEVEL_TO_SEVERITY[log.level],
})
)
}

View File

@@ -1,20 +1,19 @@
const escapeRegExp = require('lodash/escapeRegExp')
import escapeRegExp from 'lodash/escapeRegExp'
// ===================================================================
const TPL_RE = /\{\{(.+?)\}\}/g
const evalTemplate = (tpl, data) => {
export const evalTemplate = (tpl, data) => {
const getData = typeof data === 'function' ? (_, key) => data(key) : (_, key) => data[key]
return tpl.replace(TPL_RE, getData)
}
exports.evalTemplate = evalTemplate
// -------------------------------------------------------------------
const compileGlobPatternFragment = pattern => pattern.split('*').map(escapeRegExp).join('.*')
const compileGlobPattern = pattern => {
export const compileGlobPattern = pattern => {
const no = []
const yes = []
pattern.split(/[\s,]+/).forEach(pattern => {
@@ -41,22 +40,19 @@ const compileGlobPattern = pattern => {
return new RegExp(raw.join(''))
}
exports.compileGlobPattern = compileGlobPattern
// -------------------------------------------------------------------
const required = name => {
export const required = name => {
throw new Error(`missing required arg ${name}`)
}
exports.required = required
// -------------------------------------------------------------------
const serializeError = error => ({
export const serializeError = error => ({
...error, // Copy enumerable properties.
code: error.code,
message: error.message,
name: error.name,
stack: error.stack,
})
exports.serializeError = serializeError

View File

@@ -1,6 +1,6 @@
/* eslint-env jest */
const { compileGlobPattern } = require('./utils')
import { compileGlobPattern } from './utils'
describe('compileGlobPattern()', () => {
it('works', () => {

View File

@@ -1,54 +1 @@
const { LEVELS, NAMES } = require('../levels')
const { DEBUG, ERROR, FATAL, INFO, WARN } = LEVELS
let formatLevel, formatNamespace
if (process.stdout !== undefined && process.stdout.isTTY && process.stderr !== undefined && process.stderr.isTTY) {
const ansi = (style, str) => `\x1b[${style}m${str}\x1b[0m`
const LEVEL_STYLES = {
[DEBUG]: '2',
[ERROR]: '1;31',
[FATAL]: '1;31',
[INFO]: '1',
[WARN]: '1;33',
}
formatLevel = level => {
const style = LEVEL_STYLES[level]
const name = NAMES[level]
return style === undefined ? name : ansi(style, name)
}
const NAMESPACE_COLORS = [
196, 202, 208, 214, 220, 226, 190, 154, 118, 82, 46, 47, 48, 49, 50, 51, 45, 39, 33, 27, 21, 57, 93, 129, 165, 201,
200, 199, 198, 197,
]
formatNamespace = namespace => {
// https://werxltd.com/wp/2010/05/13/javascript-implementation-of-javas-string-hashcode-method/
let hash = 0
for (let i = 0, n = namespace.length; i < n; ++i) {
hash = ((hash << 5) - hash + namespace.charCodeAt(i)) | 0
}
return ansi(`1;38;5;${NAMESPACE_COLORS[Math.abs(hash) % NAMESPACE_COLORS.length]}`, namespace)
}
} else {
formatLevel = str => NAMES[str]
formatNamespace = str => str
}
const consoleTransport = ({ data, level, namespace, message, time }) => {
const fn =
/* eslint-disable no-console */
level < INFO ? console.log : level < WARN ? console.info : level < ERROR ? console.warn : console.error
/* eslint-enable no-console */
const args = [time.toISOString(), formatNamespace(namespace), formatLevel(level), message]
if (data != null) {
args.push(data)
}
fn.apply(console, args)
}
const createTransport = () => consoleTransport
module.exports = exports = createTransport
module.exports = require('../dist/transports/console.js')

View File

@@ -1,66 +1 @@
const fromCallback = require('promise-toolbox/fromCallback')
const nodemailer = require('nodemailer') // eslint-disable-line node/no-extraneous-import
const prettyFormat = require('pretty-format') // eslint-disable-line node/no-extraneous-import
const { evalTemplate, required } = require('../utils')
const { NAMES } = require('../levels')
function createTransport({
// transport options (https://nodemailer.com/smtp/)
auth,
authMethod,
host,
ignoreTLS,
port,
proxy,
requireTLS,
secure,
service,
tls,
// message options (https://nodemailer.com/message/)
bcc,
cc,
from = required('from'),
to = required('to'),
subject = '[{{level}} - {{namespace}}] {{time}} {{message}}',
}) {
const transporter = nodemailer.createTransport(
{
auth,
authMethod,
host,
ignoreTLS,
port,
proxy,
requireTLS,
secure,
service,
tls,
disableFileAccess: true,
disableUrlAccess: true,
},
{
bcc,
cc,
from,
to,
}
)
return log =>
fromCallback(cb =>
transporter.sendMail(
{
subject: evalTemplate(subject, key =>
key === 'level' ? NAMES[log.level] : key === 'time' ? log.time.toISOString() : log[key]
),
text: prettyFormat(log.data),
},
cb
)
)
}
module.exports = exports = createTransport
module.exports = require('../dist/transports/email.js')

View File

@@ -1,9 +1 @@
function createTransport() {
const memoryLogger = log => {
logs.push(log)
}
const logs = (memoryLogger.logs = [])
return memoryLogger
}
module.exports = exports = createTransport
module.exports = require('../dist/transports/memory.js')

View File

@@ -1,43 +1 @@
const fromCallback = require('promise-toolbox/fromCallback')
const splitHost = require('split-host')
const { createClient, Facility, Severity, Transport } = require('syslog-client')
const LEVELS = require('../levels')
// https://github.com/paulgrove/node-syslog-client#syslogseverity
const LEVEL_TO_SEVERITY = {
[LEVELS.FATAL]: Severity.Critical,
[LEVELS.ERROR]: Severity.Error,
[LEVELS.WARN]: Severity.Warning,
[LEVELS.INFO]: Severity.Informational,
[LEVELS.DEBUG]: Severity.Debug,
}
const facility = Facility.User
function createTransport(target) {
const opts = {}
if (target !== undefined) {
if (target.startsWith('tcp://')) {
target = target.slice(6)
opts.transport = Transport.Tcp
} else if (target.startsWith('udp://')) {
target = target.slice(6)
opts.transport = Transport.Udp
}
;({ host: target, port: opts.port } = splitHost(target))
}
const client = createClient(target, opts)
return log =>
fromCallback(cb =>
client.log(log.message, {
facility,
severity: LEVEL_TO_SEVERITY[log.level],
})
)
}
module.exports = exports = createTransport
module.exports = require('../dist/transports/syslog.js')

View File

@@ -0,0 +1 @@
module.exports = require('../../@xen-orchestra/babel-config')(require('./package.json'))

View File

@@ -1,39 +0,0 @@
const camelCase = require('lodash/camelCase')
const { defineProperties, defineProperty, keys } = Object
const noop = Function.prototype
const MIXIN_CYCLIC_DESCRIPTOR = {
configurable: true,
get() {
throw new Error('cyclic dependency')
},
}
module.exports = function mixin(object, mixins, args) {
// add lazy property for each of the mixin, this allows mixins to depend on
// one another without any special ordering
const descriptors = {}
keys(mixins).forEach(name => {
const Mixin = mixins[name]
name = camelCase(name)
descriptors[name] = {
configurable: true,
get: () => {
defineProperty(object, name, MIXIN_CYCLIC_DESCRIPTOR)
const instance = new Mixin(object, ...args)
defineProperty(object, name, {
value: instance,
})
return instance
},
}
})
defineProperties(object, descriptors)
// access all mixin properties to trigger their creation
keys(descriptors).forEach(name => {
noop(object[name])
})
}

Some files were not shown because too many files have changed in this diff Show More