Compare commits

..

1 Commits

Author SHA1 Message Date
Julien Fontanet
282805966b WiP: feat(xen-api/getCachedRecord): getRecord + cache + events
Fixes #5088
2022-03-01 13:37:03 +01:00
362 changed files with 5809 additions and 14076 deletions

View File

@@ -1,7 +1,7 @@
'use strict'
module.exports = {
extends: ['plugin:eslint-comments/recommended', 'plugin:n/recommended', 'standard', 'standard-jsx', 'prettier'],
extends: ['plugin:eslint-comments/recommended', 'standard', 'standard-jsx', 'prettier'],
globals: {
__DEV__: true,
$Dict: true,
@@ -17,7 +17,6 @@ module.exports = {
{
files: ['cli.{,c,m}js', '*-cli.{,c,m}js', '**/*cli*/**/*.{,c,m}js'],
rules: {
'n/no-process-exit': 'off',
'no-console': 'off',
},
},
@@ -27,23 +26,6 @@ module.exports = {
sourceType: 'module',
},
},
{
files: ['*.spec.{,c,m}js'],
rules: {
'n/no-unsupported-features/node-builtins': [
'error',
{
version: '>=16',
},
],
'n/no-unsupported-features/es-syntax': [
'error',
{
version: '>=16',
},
],
},
},
],
parserOptions: {

16
.flowconfig Normal file
View File

@@ -0,0 +1,16 @@
[ignore]
<PROJECT_ROOT>/node_modules/.*
[include]
[libs]
[lints]
[options]
esproposal.decorators=ignore
esproposal.optional_chaining=enable
include_warnings=true
module.use_strict=true
[strict]

View File

@@ -6,18 +6,6 @@ labels: 'status: triaging :triangular_flag_on_post:, type: bug :bug:'
assignees: ''
---
**XOA or XO from the sources?**
If XOA:
- which release channel? (`stable` vs `latest`)
- please consider creating a support ticket in [your dedicated support area](https://xen-orchestra.com/#!/member/support)
If XO from the sources:
- Don't forget to [read this first](https://xen-orchestra.com/docs/community.html)
- As well as follow [this guide](https://xen-orchestra.com/docs/community.html#report-a-bug)
**Describe the bug**
A clear and concise description of what the bug is.
@@ -35,7 +23,7 @@ A clear and concise description of what you expected to happen.
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Environment (please provide the following information):**
**Desktop (please complete the following information):**
- Node: [e.g. 16.12.1]
- xo-server: [e.g. 5.82.3]

View File

@@ -1,13 +0,0 @@
name: CI
on: [push]
jobs:
build:
name: Test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: satackey/action-docker-layer-caching@v0.0.11
# Ignore the failure of a step and avoid terminating the job.
continue-on-error: true
- run: docker-compose -f docker/docker-compose.dev.yml build
- run: docker-compose -f docker/docker-compose.dev.yml up

8
.gitignore vendored
View File

@@ -1,4 +1,5 @@
/_book/
/coverage/
/node_modules/
/lerna-debug.log
/lerna-debug.log.*
@@ -10,6 +11,8 @@
/packages/*/dist/
/packages/*/node_modules/
/@xen-orchestra/proxy/src/app/mixins/index.mjs
/packages/vhd-cli/src/commands/index.js
/packages/xen-api/examples/node_modules/
@@ -33,6 +36,5 @@ yarn-error.log
yarn-error.log.*
.env
# code coverage
.nyc_output/
coverage/
# nyc test coverage
.nyc_output

23
.travis.yml Normal file
View File

@@ -0,0 +1,23 @@
language: node_js
node_js:
- 14
# Use containers.
# http://docs.travis-ci.com/user/workers/container-based-infrastructure/
sudo: false
addons:
apt:
packages:
- qemu-utils
- blktap-utils
- vmdk-stream-converter
before_install:
- curl -o- -L https://yarnpkg.com/install.sh | bash
- export PATH="$HOME/.yarn/bin:$PATH"
cache:
yarn: true
script:
- yarn run travis-tests

View File

@@ -1,30 +0,0 @@
Node does not cache queries to `dns.lookup`, which can lead application doing a lot of connections to have perf issues and to saturate Node threads pool.
This library attempts to mitigate these problems by providing a version of this function with a version short cache, applied on both errors and results.
> Limitation: `verbatim: false` option is not supported.
It has exactly the same API as the native method and can be used directly:
```js
import { createCachedLookup } from '@vates/cached-dns.lookup'
const lookup = createCachedLookup()
lookup('example.net', { all: true, family: 0 }, (error, result) => {
if (error != null) {
return console.warn(error)
}
console.log(result)
})
```
Or it can be used to replace the native implementation and speed up the whole app:
```js
// assign our cached implementation to dns.lookup
const restore = createCachedLookup().patchGlobal()
// to restore the previous implementation
restore()
```

View File

@@ -1 +0,0 @@
../../scripts/npmignore

View File

@@ -1,63 +0,0 @@
<!-- DO NOT EDIT MANUALLY, THIS FILE HAS BEEN GENERATED -->
# @vates/cached-dns.lookup
[![Package Version](https://badgen.net/npm/v/@vates/cached-dns.lookup)](https://npmjs.org/package/@vates/cached-dns.lookup) ![License](https://badgen.net/npm/license/@vates/cached-dns.lookup) [![PackagePhobia](https://badgen.net/bundlephobia/minzip/@vates/cached-dns.lookup)](https://bundlephobia.com/result?p=@vates/cached-dns.lookup) [![Node compatibility](https://badgen.net/npm/node/@vates/cached-dns.lookup)](https://npmjs.org/package/@vates/cached-dns.lookup)
> Cached implementation of dns.lookup
## Install
Installation of the [npm package](https://npmjs.org/package/@vates/cached-dns.lookup):
```
> npm install --save @vates/cached-dns.lookup
```
## Usage
Node does not cache queries to `dns.lookup`, which can lead application doing a lot of connections to have perf issues and to saturate Node threads pool.
This library attempts to mitigate these problems by providing a version of this function with a version short cache, applied on both errors and results.
> Limitation: `verbatim: false` option is not supported.
It has exactly the same API as the native method and can be used directly:
```js
import { createCachedLookup } from '@vates/cached-dns.lookup'
const lookup = createCachedLookup()
lookup('example.net', { all: true, family: 0 }, (error, result) => {
if (error != null) {
return console.warn(error)
}
console.log(result)
})
```
Or it can be used to replace the native implementation and speed up the whole app:
```js
// assign our cached implementation to dns.lookup
const restore = createCachedLookup().patchGlobal()
// to restore the previous implementation
restore()
```
## Contributions
Contributions are _very_ welcomed, either on the documentation or on
the code.
You may:
- report any [issue](https://github.com/vatesfr/xen-orchestra/issues)
you've encountered;
- fork and create a pull request.
## License
[ISC](https://spdx.org/licenses/ISC) © [Vates SAS](https://vates.fr)

View File

@@ -1,72 +0,0 @@
'use strict'
const assert = require('assert')
const dns = require('dns')
const LRU = require('lru-cache')
function reportResults(all, results, callback) {
if (all) {
callback(null, results)
} else {
const first = results[0]
callback(null, first.address, first.family)
}
}
exports.createCachedLookup = function createCachedLookup({ lookup = dns.lookup } = {}) {
const cache = new LRU({
max: 500,
// 1 minute: long enough to be effective, short enough so there is no need to bother with DNS TTLs
ttl: 60e3,
})
function cachedLookup(hostname, options, callback) {
let all = false
let family = 0
if (typeof options === 'function') {
callback = options
} else if (typeof options === 'number') {
family = options
} else if (options != null) {
assert.notStrictEqual(options.verbatim, false, 'not supported by this implementation')
;({ all = all, family = family } = options)
}
// cache by family option because there will be an error if there is no
// entries for the requestion family so we cannot easily cache all families
// and filter on reporting back
const key = hostname + '/' + family
const result = cache.get(key)
if (result !== undefined) {
setImmediate(reportResults, all, result, callback)
} else {
lookup(hostname, { all: true, family, verbatim: true }, function onLookup(error, results) {
// errors are not cached because this will delay recovery after DNS/network issues
//
// there are no reliable way to detect if the error is real or simply
// that there are no results for the requested hostname
//
// there should be much fewer errors than success, therefore it should
// not be a big deal to not cache them
if (error != null) {
return callback(error)
}
cache.set(key, results)
reportResults(all, results, callback)
})
}
}
cachedLookup.patchGlobal = function patchGlobal() {
const previous = dns.lookup
dns.lookup = cachedLookup
return function restoreGlobal() {
assert.strictEqual(dns.lookup, cachedLookup)
dns.lookup = previous
}
}
return cachedLookup
}

View File

@@ -1,32 +0,0 @@
{
"engines": {
"node": ">=8"
},
"dependencies": {
"lru-cache": "^7.0.4"
},
"private": false,
"name": "@vates/cached-dns.lookup",
"description": "Cached implementation of dns.lookup",
"keywords": [
"cache",
"dns",
"lookup"
],
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@vates/cached-dns.lookup",
"bugs": "https://github.com/vatesfr/xen-orchestra/issues",
"repository": {
"directory": "@vates/cached-dns.lookup",
"type": "git",
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"author": {
"name": "Vates SAS",
"url": "https://vates.fr"
},
"license": "ISC",
"version": "1.0.0",
"scripts": {
"postversion": "npm publish --access public"
}
}

View File

@@ -13,19 +13,15 @@ class Foo {
}
```
### `decorateClass(class, map)`
### `decorateMethodsWith(class, map)`
Decorates a number of accessors and methods directly, without using the decorator syntax:
Decorates a number of methods directly, without using the decorator syntax:
```js
import { decorateClass } from '@vates/decorate-with'
import { decorateMethodsWith } from '@vates/decorate-with'
class Foo {
get bar() {
// body
}
set bar(value) {
bar() {
// body
}
@@ -34,28 +30,22 @@ class Foo {
}
}
decorateClass(Foo, {
// getter and/or setter
bar: {
// without arguments
get: lodash.memoize,
decorateMethodsWith(Foo, {
// without arguments
bar: lodash.curry,
// with arguments
set: [lodash.debounce, 150],
},
// method (with or without arguments)
baz: lodash.curry,
// with arguments
baz: [lodash.debounce, 150],
})
```
The decorated class is returned, so you can export it directly.
To apply multiple transforms to an accessor/method, you can either call `decorateClass` multiple times or use [`@vates/compose`](https://www.npmjs.com/package/@vates/compose):
To apply multiple transforms to a method, you can either call `decorateMethodsWith` multiple times or use [`@vates/compose`](https://www.npmjs.com/package/@vates/compose):
```js
decorateClass(Foo, {
baz: compose([
decorateMethodsWith(Foo, {
bar: compose([
[lodash.debounce, 150]
lodash.curry,
])
@@ -79,8 +69,4 @@ class Foo {
}
```
Because it's a normal function, it can also be used with `decorateClass`, with `compose` or even by itself.
### `decorateMethodsWith(class, map)`
> Deprecated alias for [`decorateClass(class, map)`](#decorateclassclass-map).
Because it's a normal function, it can also be used with `decorateMethodsWith`, with `compose` or even by itself.

View File

@@ -31,19 +31,15 @@ class Foo {
}
```
### `decorateClass(class, map)`
### `decorateMethodsWith(class, map)`
Decorates a number of accessors and methods directly, without using the decorator syntax:
Decorates a number of methods directly, without using the decorator syntax:
```js
import { decorateClass } from '@vates/decorate-with'
import { decorateMethodsWith } from '@vates/decorate-with'
class Foo {
get bar() {
// body
}
set bar(value) {
bar() {
// body
}
@@ -52,28 +48,22 @@ class Foo {
}
}
decorateClass(Foo, {
// getter and/or setter
bar: {
// without arguments
get: lodash.memoize,
decorateMethodsWith(Foo, {
// without arguments
bar: lodash.curry,
// with arguments
set: [lodash.debounce, 150],
},
// method (with or without arguments)
baz: lodash.curry,
// with arguments
baz: [lodash.debounce, 150],
})
```
The decorated class is returned, so you can export it directly.
To apply multiple transforms to an accessor/method, you can either call `decorateClass` multiple times or use [`@vates/compose`](https://www.npmjs.com/package/@vates/compose):
To apply multiple transforms to a method, you can either call `decorateMethodsWith` multiple times or use [`@vates/compose`](https://www.npmjs.com/package/@vates/compose):
```js
decorateClass(Foo, {
baz: compose([
decorateMethodsWith(Foo, {
bar: compose([
[lodash.debounce, 150]
lodash.curry,
])
@@ -97,11 +87,7 @@ class Foo {
}
```
Because it's a normal function, it can also be used with `decorateClass`, with `compose` or even by itself.
### `decorateMethodsWith(class, map)`
> Deprecated alias for [`decorateClass(class, map)`](#decorateclassclass-map).
Because it's a normal function, it can also be used with `decorateMethodsWith`, with `compose` or even by itself.
## Contributions

View File

@@ -9,27 +9,14 @@ exports.decorateWith = function decorateWith(fn, ...args) {
const { getOwnPropertyDescriptor, defineProperty } = Object
function applyDecorator(decorator, value) {
return typeof decorator === 'function' ? decorator(value) : decorator[0](value, ...decorator.slice(1))
}
exports.decorateClass = exports.decorateMethodsWith = function decorateClass(klass, map) {
exports.decorateMethodsWith = function decorateMethodsWith(klass, map) {
const { prototype } = klass
for (const name of Object.keys(map)) {
const decorator = map[name]
const descriptor = getOwnPropertyDescriptor(prototype, name)
if (typeof decorator === 'function' || Array.isArray(decorator)) {
descriptor.value = applyDecorator(decorator, descriptor.value)
} else {
const { get, set } = decorator
if (get !== undefined) {
descriptor.get = applyDecorator(get, descriptor.get)
}
if (set !== undefined) {
descriptor.set = applyDecorator(set, descriptor.set)
}
}
const { value } = descriptor
const decorator = map[name]
descriptor.value = typeof decorator === 'function' ? decorator(value) : decorator[0](value, ...decorator.slice(1))
defineProperty(prototype, name, descriptor)
}
return klass

View File

@@ -3,9 +3,7 @@
const assert = require('assert')
const { describe, it } = require('tap').mocha
const { decorateClass, decorateWith, decorateMethodsWith, perInstance } = require('./')
const identity = _ => _
const { decorateWith, decorateMethodsWith, perInstance } = require('./')
describe('decorateWith', () => {
it('works', () => {
@@ -33,14 +31,11 @@ describe('decorateWith', () => {
})
})
describe('decorateClass', () => {
describe('decorateMethodsWith', () => {
it('works', () => {
class C {
foo() {}
bar() {}
get baz() {}
// eslint-disable-next-line accessor-pairs
set qux(_) {}
}
const expectedArgs = [Math.random(), Math.random()]
@@ -50,74 +45,27 @@ describe('decorateClass', () => {
const newFoo = () => {}
const newBar = () => {}
const newGetBaz = () => {}
const newSetQux = _ => {}
decorateClass(C, {
foo(fn) {
decorateMethodsWith(C, {
foo(method) {
assert.strictEqual(arguments.length, 1)
assert.strictEqual(fn, P.foo)
assert.strictEqual(method, P.foo)
return newFoo
},
bar: [
function (fn, ...args) {
assert.strictEqual(fn, P.bar)
function (method, ...args) {
assert.strictEqual(method, P.bar)
assert.deepStrictEqual(args, expectedArgs)
return newBar
},
...expectedArgs,
],
baz: {
get(fn) {
assert.strictEqual(arguments.length, 1)
assert.strictEqual(fn, descriptors.baz.get)
return newGetBaz
},
},
qux: {
set: [
function (fn, ...args) {
assert.strictEqual(fn, descriptors.qux.set)
assert.deepStrictEqual(args, expectedArgs)
return newSetQux
},
...expectedArgs,
],
},
})
const newDescriptors = Object.getOwnPropertyDescriptors(P)
assert.deepStrictEqual(newDescriptors.foo, { ...descriptors.foo, value: newFoo })
assert.deepStrictEqual(newDescriptors.bar, { ...descriptors.bar, value: newBar })
assert.deepStrictEqual(newDescriptors.baz, { ...descriptors.baz, get: newGetBaz })
assert.deepStrictEqual(newDescriptors.qux, { ...descriptors.qux, set: newSetQux })
})
it('throws if using an accessor decorator for a method', function () {
assert.throws(() =>
decorateClass(
class {
foo() {}
},
{ foo: { get: identity, set: identity } }
)
)
})
it('throws if using a method decorator for an accessor', function () {
assert.throws(() =>
decorateClass(
class {
get foo() {}
},
{ foo: identity }
)
)
})
})
it('decorateMethodsWith is an alias of decorateClass', function () {
assert.strictEqual(decorateMethodsWith, decorateClass)
})
describe('perInstance', () => {

View File

@@ -20,7 +20,7 @@
"url": "https://vates.fr"
},
"license": "ISC",
"version": "2.0.0",
"version": "1.0.0",
"engines": {
"node": ">=8.10"
},
@@ -29,6 +29,6 @@
"test": "tap"
},
"devDependencies": {
"tap": "^16.0.1"
"tap": "^15.1.6"
}
}

View File

@@ -1,50 +0,0 @@
> This library is compatible with Node's `EventEmitter` and web browsers' `EventTarget` APIs.
### API
```js
import { EventListenersManager } from '@vates/event-listeners-manager'
const events = new EventListenersManager(emitter)
// adding listeners
events.add('foo', onFoo).add('bar', onBar).on('baz', onBaz)
// removing a specific listener
events.remove('foo', onFoo)
// removing all listeners for a specific event
events.removeAll('foo')
// removing all listeners
events.removeAll()
```
### Typical use case
> Removing all listeners when no longer necessary.
Manually:
```js
const onFoo = () => {}
const onBar = () => {}
const onBaz = () => {}
emitter.on('foo', onFoo).on('bar', onBar).on('baz', onBaz)
// CODE LOGIC
emitter.off('foo', onFoo).off('bar', onBar).off('baz', onBaz)
```
With this library:
```js
const events = new EventListenersManager(emitter)
events.add('foo', () => {})).add('bar', () => {})).add('baz', () => {}))
// CODE LOGIC
events.removeAll()
```

View File

@@ -1 +0,0 @@
../../scripts/npmignore

View File

@@ -1,81 +0,0 @@
<!-- DO NOT EDIT MANUALLY, THIS FILE HAS BEEN GENERATED -->
# @vates/event-listeners-manager
[![Package Version](https://badgen.net/npm/v/@vates/event-listeners-manager)](https://npmjs.org/package/@vates/event-listeners-manager) ![License](https://badgen.net/npm/license/@vates/event-listeners-manager) [![PackagePhobia](https://badgen.net/bundlephobia/minzip/@vates/event-listeners-manager)](https://bundlephobia.com/result?p=@vates/event-listeners-manager) [![Node compatibility](https://badgen.net/npm/node/@vates/event-listeners-manager)](https://npmjs.org/package/@vates/event-listeners-manager)
## Install
Installation of the [npm package](https://npmjs.org/package/@vates/event-listeners-manager):
```
> npm install --save @vates/event-listeners-manager
```
## Usage
> This library is compatible with Node's `EventEmitter` and web browsers' `EventTarget` APIs.
### API
```js
import { EventListenersManager } from '@vates/event-listeners-manager'
const events = new EventListenersManager(emitter)
// adding listeners
events.add('foo', onFoo).add('bar', onBar).on('baz', onBaz)
// removing a specific listener
events.remove('foo', onFoo)
// removing all listeners for a specific event
events.removeAll('foo')
// removing all listeners
events.removeAll()
```
### Typical use case
> Removing all listeners when no longer necessary.
Manually:
```js
const onFoo = () => {}
const onBar = () => {}
const onBaz = () => {}
emitter.on('foo', onFoo).on('bar', onBar).on('baz', onBaz)
// CODE LOGIC
emitter.off('foo', onFoo).off('bar', onBar).off('baz', onBaz)
```
With this library:
```js
const events = new EventListenersManager(emitter)
events.add('foo', () => {})).add('bar', () => {})).add('baz', () => {}))
// CODE LOGIC
events.removeAll()
```
## Contributions
Contributions are _very_ welcomed, either on the documentation or on
the code.
You may:
- report any [issue](https://github.com/vatesfr/xen-orchestra/issues)
you've encountered;
- fork and create a pull request.
## License
[ISC](https://spdx.org/licenses/ISC) © [Vates SAS](https://vates.fr)

View File

@@ -1,56 +0,0 @@
'use strict'
exports.EventListenersManager = class EventListenersManager {
constructor(emitter) {
this._listeners = new Map()
this._add = (emitter.addListener || emitter.addEventListener).bind(emitter)
this._remove = (emitter.removeListener || emitter.removeEventListener).bind(emitter)
}
add(type, listener) {
let listeners = this._listeners.get(type)
if (listeners === undefined) {
listeners = new Set()
this._listeners.set(type, listeners)
}
// don't add the same listener multiple times (allowed on Node.js)
if (!listeners.has(listener)) {
listeners.add(listener)
this._add(type, listener)
}
return this
}
remove(type, listener) {
const allListeners = this._listeners
const listeners = allListeners.get(type)
if (listeners !== undefined && listeners.delete(listener)) {
this._remove(type, listener)
if (listeners.size === 0) {
allListeners.delete(type)
}
}
return this
}
removeAll(type) {
const allListeners = this._listeners
const remove = this._remove
const types = type !== undefined ? [type] : allListeners.keys()
for (const type of types) {
const listeners = allListeners.get(type)
if (listeners !== undefined) {
allListeners.delete(type)
for (const listener of listeners) {
remove(type, listener)
}
}
}
return this
}
}

View File

@@ -1,67 +0,0 @@
'use strict'
const t = require('tap')
const { EventEmitter } = require('events')
const { EventListenersManager } = require('./')
const noop = Function.prototype
// function spy (impl = Function.prototype) {
// function spy() {
// spy.calls.push([Array.from(arguments), this])
// }
// spy.calls = []
// return spy
// }
function assertListeners(t, event, listeners) {
t.strictSame(t.context.ee.listeners(event), listeners)
}
t.beforeEach(function (t) {
t.context.ee = new EventEmitter()
t.context.em = new EventListenersManager(t.context.ee)
})
t.test('.add adds a listener', function (t) {
t.context.em.add('foo', noop)
assertListeners(t, 'foo', [noop])
t.end()
})
t.test('.add does not add a duplicate listener', function (t) {
t.context.em.add('foo', noop).add('foo', noop)
assertListeners(t, 'foo', [noop])
t.end()
})
t.test('.remove removes a listener', function (t) {
t.context.em.add('foo', noop).remove('foo', noop)
assertListeners(t, 'foo', [])
t.end()
})
t.test('.removeAll removes all listeners of a given type', function (t) {
t.context.em.add('foo', noop).add('bar', noop).removeAll('foo')
assertListeners(t, 'foo', [])
assertListeners(t, 'bar', [noop])
t.end()
})
t.test('.removeAll removes all listeners', function (t) {
t.context.em.add('foo', noop).add('bar', noop).removeAll()
assertListeners(t, 'foo', [])
assertListeners(t, 'bar', [])
t.end()
})

View File

@@ -1,46 +0,0 @@
{
"engines": {
"node": ">=6"
},
"private": false,
"name": "@vates/event-listeners-manager",
"descriptions": "Easy way to clean up event listeners",
"keywords": [
"add",
"addEventListener",
"addListener",
"browser",
"clear",
"DOM",
"emitter",
"event",
"EventEmitter",
"EventTarget",
"management",
"manager",
"node",
"remove",
"removeEventListener",
"removeListener"
],
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@vates/event-listeners-manager",
"bugs": "https://github.com/vatesfr/xen-orchestra/issues",
"repository": {
"directory": "@vates/event-listeners-manager",
"type": "git",
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"author": {
"name": "Vates SAS",
"url": "https://vates.fr"
},
"license": "ISC",
"version": "1.0.0",
"scripts": {
"postversion": "npm publish --access public",
"test": "tap --branches=72"
},
"devDependencies": {
"tap": "^16.2.0"
}
}

View File

@@ -35,6 +35,6 @@
"test": "tap"
},
"devDependencies": {
"tap": "^16.0.1"
"tap": "^15.1.6"
}
}

View File

@@ -1,9 +1,6 @@
### `readChunk(stream, [size])`
- returns the next available chunk of data
- like `stream.read()`, a number of bytes can be specified
- returns with less data than expected if stream has ended
- returns `null` if the stream has ended and no data has been read
- returns `null` if the stream has ended
```js
import { readChunk } from '@vates/read-chunk'
@@ -14,13 +11,3 @@ import { readChunk } from '@vates/read-chunk'
}
})()
```
### `readChunkStrict(stream, [size])`
Similar behavior to `readChunk` but throws if the stream ended before the requested data could be read.
```js
import { readChunkStrict } from '@vates/read-chunk'
const chunk = await readChunkStrict(stream, 1024)
```

View File

@@ -16,12 +16,9 @@ Installation of the [npm package](https://npmjs.org/package/@vates/read-chunk):
## Usage
### `readChunk(stream, [size])`
- returns the next available chunk of data
- like `stream.read()`, a number of bytes can be specified
- returns with less data than expected if stream has ended
- returns `null` if the stream has ended and no data has been read
- returns `null` if the stream has ended
```js
import { readChunk } from '@vates/read-chunk'
@@ -33,16 +30,6 @@ import { readChunk } from '@vates/read-chunk'
})()
```
### `readChunkStrict(stream, [size])`
Similar behavior to `readChunk` but throws if the stream ended before the requested data could be read.
```js
import { readChunkStrict } from '@vates/read-chunk'
const chunk = await readChunkStrict(stream, 1024)
```
## Contributions
Contributions are _very_ welcomed, either on the documentation or on

View File

@@ -30,22 +30,3 @@ const readChunk = (stream, size) =>
onReadable()
})
exports.readChunk = readChunk
exports.readChunkStrict = async function readChunkStrict(stream, size) {
const chunk = await readChunk(stream, size)
if (chunk === null) {
throw new Error('stream has ended without data')
}
if (size !== undefined && chunk.length !== size) {
const error = new Error('stream has ended with not enough data')
Object.defineProperties(error, {
chunk: {
value: chunk,
},
})
throw error
}
return chunk
}

View File

@@ -4,7 +4,7 @@
const { Readable } = require('stream')
const { readChunk, readChunkStrict } = require('./')
const { readChunk } = require('./')
const makeStream = it => Readable.from(it, { objectMode: false })
makeStream.obj = Readable.from
@@ -43,27 +43,3 @@ describe('readChunk', () => {
})
})
})
const rejectionOf = promise =>
promise.then(
value => {
throw value
},
error => error
)
describe('readChunkStrict', function () {
it('throws if stream is empty', async () => {
const error = await rejectionOf(readChunkStrict(makeStream([])))
expect(error).toBeInstanceOf(Error)
expect(error.message).toBe('stream has ended without data')
expect(error.chunk).toEqual(undefined)
})
it('throws if stream ends with not enough data', async () => {
const error = await rejectionOf(readChunkStrict(makeStream(['foo', 'bar']), 10))
expect(error).toBeInstanceOf(Error)
expect(error.message).toBe('stream has ended with not enough data')
expect(error.chunk).toEqual(Buffer.from('foobar'))
})
})

View File

@@ -0,0 +1,3 @@
'use strict'
module.exports = require('../../@xen-orchestra/babel-config')(require('./package.json'))

View File

@@ -0,0 +1 @@
../../scripts/babel-eslintrc.js

View File

@@ -9,14 +9,28 @@
},
"version": "0.2.0",
"engines": {
"node": ">=14"
"node": ">=10"
},
"main": "dist/",
"scripts": {
"build": "cross-env NODE_ENV=production babel --source-maps --out-dir=dist/ src/",
"dev": "cross-env NODE_ENV=development babel --watch --source-maps --out-dir=dist/ src/",
"postversion": "npm publish --access public",
"test": "tap --lines 67 --functions 92 --branches 52 --statements 67"
"prebuild": "rimraf dist/",
"predev": "yarn run prebuild",
"prepublishOnly": "yarn run build"
},
"devDependencies": {
"@babel/cli": "^7.7.4",
"@babel/core": "^7.7.4",
"@babel/plugin-proposal-decorators": "^7.8.0",
"@babel/plugin-proposal-nullish-coalescing-operator": "^7.8.0",
"@babel/preset-env": "^7.7.4",
"cross-env": "^7.0.2",
"rimraf": "^3.0.0"
},
"dependencies": {
"@vates/decorate-with": "^2.0.0",
"@vates/decorate-with": "^1.0.0",
"@xen-orchestra/log": "^0.3.0",
"golike-defer": "^0.5.1",
"object-hash": "^2.0.1"
@@ -26,8 +40,5 @@
"author": {
"name": "Vates SAS",
"url": "https://vates.fr"
},
"devDependencies": {
"tap": "^16.0.1"
}
}

View File

@@ -1,14 +1,12 @@
'use strict'
const assert = require('assert')
const hash = require('object-hash')
const { createLogger } = require('@xen-orchestra/log')
const { decorateClass } = require('@vates/decorate-with')
const { defer } = require('golike-defer')
import assert from 'assert'
import hash from 'object-hash'
import { createLogger } from '@xen-orchestra/log'
import { decorateWith } from '@vates/decorate-with'
import { defer } from 'golike-defer'
const log = createLogger('xo:audit-core')
exports.Storage = class Storage {
export class Storage {
constructor() {
this._lock = Promise.resolve()
}
@@ -31,7 +29,7 @@ const ID_TO_ALGORITHM = {
5: 'sha256',
}
class AlteredRecordError extends Error {
export class AlteredRecordError extends Error {
constructor(id, nValid, record) {
super('altered record')
@@ -40,9 +38,8 @@ class AlteredRecordError extends Error {
this.record = record
}
}
exports.AlteredRecordError = AlteredRecordError
class MissingRecordError extends Error {
export class MissingRecordError extends Error {
constructor(id, nValid) {
super('missing record')
@@ -50,10 +47,8 @@ class MissingRecordError extends Error {
this.nValid = nValid
}
}
exports.MissingRecordError = MissingRecordError
const NULL_ID = 'nullId'
exports.NULL_ID = NULL_ID
export const NULL_ID = 'nullId'
const HASH_ALGORITHM_ID = '5'
const createHash = (data, algorithmId = HASH_ALGORITHM_ID) =>
@@ -62,12 +57,13 @@ const createHash = (data, algorithmId = HASH_ALGORITHM_ID) =>
excludeKeys: key => key === 'id',
})}`
class AuditCore {
export class AuditCore {
constructor(storage) {
assert.notStrictEqual(storage, undefined)
this._storage = storage
}
@decorateWith(defer)
async add($defer, subject, event, data) {
const time = Date.now()
$defer(await this._storage.acquireLock())
@@ -152,6 +148,7 @@ class AuditCore {
}
}
@decorateWith(defer)
async deleteRangeAndRewrite($defer, newest, oldest) {
assert.notStrictEqual(newest, undefined)
assert.notStrictEqual(oldest, undefined)
@@ -192,9 +189,3 @@ class AuditCore {
}
}
}
exports.AuditCore = AuditCore
decorateClass(AuditCore, {
add: defer,
deleteRangeAndRewrite: defer,
})

View File

@@ -1,9 +1,6 @@
'use strict'
/* eslint-env jest */
const assert = require('assert/strict')
const { afterEach, describe, it } = require('tap').mocha
const { AlteredRecordError, AuditCore, MissingRecordError, NULL_ID, Storage } = require('.')
import { AlteredRecordError, AuditCore, MissingRecordError, NULL_ID, Storage } from '.'
const asyncIteratorToArray = async asyncIterator => {
const array = []
@@ -75,7 +72,7 @@ const auditCore = new AuditCore(db)
const storeAuditRecords = async () => {
await Promise.all(DATA.map(data => auditCore.add(...data)))
const records = await asyncIteratorToArray(auditCore.getFrom())
assert.equal(records.length, DATA.length)
expect(records.length).toBe(DATA.length)
return records
}
@@ -86,11 +83,10 @@ describe('auditCore', () => {
const [newestRecord, deletedRecord] = await storeAuditRecords()
const nValidRecords = await auditCore.checkIntegrity(NULL_ID, newestRecord.id)
assert.equal(nValidRecords, DATA.length)
expect(nValidRecords).toBe(DATA.length)
await db.del(deletedRecord.id)
await assert.rejects(
auditCore.checkIntegrity(NULL_ID, newestRecord.id),
await expect(auditCore.checkIntegrity(NULL_ID, newestRecord.id)).rejects.toEqual(
new MissingRecordError(deletedRecord.id, 1)
)
})
@@ -101,8 +97,7 @@ describe('auditCore', () => {
alteredRecord.event = ''
await db.put(alteredRecord)
await assert.rejects(
auditCore.checkIntegrity(NULL_ID, newestRecord.id),
await expect(auditCore.checkIntegrity(NULL_ID, newestRecord.id)).rejects.toEqual(
new AlteredRecordError(alteredRecord.id, 1, alteredRecord)
)
})
@@ -112,8 +107,8 @@ describe('auditCore', () => {
await auditCore.deleteFrom(secondRecord.id)
assert.equal(await db.get(firstRecord.id), undefined)
assert.equal(await db.get(secondRecord.id), undefined)
expect(await db.get(firstRecord.id)).toBe(undefined)
expect(await db.get(secondRecord.id)).toBe(undefined)
await auditCore.checkIntegrity(secondRecord.id, thirdRecord.id)
})

View File

@@ -10,7 +10,7 @@
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"engines": {
"node": ">=8.3"
"node": ">=6"
},
"license": "AGPL-3.0-or-later",
"author": {

View File

@@ -1,3 +1,5 @@
#!/usr/bin/env node
'use strict'
// -----------------------------------------------------------------------------

View File

@@ -7,8 +7,8 @@
"bugs": "https://github.com/vatesfr/xen-orchestra/issues",
"dependencies": {
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/backups": "^0.25.0",
"@xen-orchestra/fs": "^1.0.3",
"@xen-orchestra/backups": "^0.20.0",
"@xen-orchestra/fs": "^0.20.0",
"filenamify": "^4.1.0",
"getopts": "^2.2.5",
"lodash": "^4.17.15",
@@ -27,7 +27,7 @@
"scripts": {
"postversion": "npm publish --access public"
},
"version": "0.7.3",
"version": "0.7.0",
"license": "AGPL-3.0-or-later",
"author": {
"name": "Vates SAS",

View File

@@ -6,7 +6,7 @@ const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { compileTemplate } = require('@xen-orchestra/template')
const { limitConcurrency } = require('limit-concurrency-decorator')
const { extractIdsFromSimplePattern } = require('./extractIdsFromSimplePattern.js')
const { extractIdsFromSimplePattern } = require('./_extractIdsFromSimplePattern.js')
const { PoolMetadataBackup } = require('./_PoolMetadataBackup.js')
const { Task } = require('./Task.js')
const { VmBackup } = require('./_VmBackup.js')
@@ -24,34 +24,6 @@ const getAdaptersByRemote = adapters => {
const runTask = (...args) => Task.run(...args).catch(noop) // errors are handled by logs
const DEFAULT_SETTINGS = {
reportWhen: 'failure',
}
const DEFAULT_VM_SETTINGS = {
bypassVdiChainsCheck: false,
checkpointSnapshot: false,
concurrency: 2,
copyRetention: 0,
deleteFirst: false,
exportRetention: 0,
fullInterval: 0,
healthCheckSr: undefined,
healthCheckVmsWithTags: [],
maxMergedDeltasPerRun: 2,
offlineBackup: false,
offlineSnapshot: false,
snapshotRetention: 0,
timeout: 0,
unconditionalSnapshot: false,
vmTimeout: 0,
}
const DEFAULT_METADATA_SETTINGS = {
retentionPoolMetadata: 0,
retentionXoMetadata: 0,
}
exports.Backup = class Backup {
constructor({ config, getAdapter, getConnectedRecord, job, schedule }) {
this._config = config
@@ -70,22 +42,17 @@ exports.Backup = class Backup {
'{job.name}': job.name,
'{vm.name_label}': vm => vm.name_label,
})
}
const { type } = job
const baseSettings = { ...DEFAULT_SETTINGS }
run() {
const type = this._job.type
if (type === 'backup') {
Object.assign(baseSettings, DEFAULT_VM_SETTINGS, config.defaultSettings, config.vm?.defaultSettings)
this.run = this._runVmBackup
return this._runVmBackup()
} else if (type === 'metadataBackup') {
Object.assign(baseSettings, DEFAULT_METADATA_SETTINGS, config.defaultSettings, config.metadata?.defaultSettings)
this.run = this._runMetadataBackup
return this._runMetadataBackup()
} else {
throw new Error(`No runner for the backup type ${type}`)
}
Object.assign(baseSettings, job.settings[''])
this._baseSettings = baseSettings
this._settings = { ...baseSettings, ...job.settings[schedule.id] }
}
async _runMetadataBackup() {
@@ -97,6 +64,13 @@ exports.Backup = class Backup {
}
const config = this._config
const settings = {
...config.defaultSettings,
...config.metadata.defaultSettings,
...job.settings[''],
...job.settings[schedule.id],
}
const poolIds = extractIdsFromSimplePattern(job.pools)
const isEmptyPools = poolIds.length === 0
const isXoMetadata = job.xoMetadata !== undefined
@@ -104,8 +78,6 @@ exports.Backup = class Backup {
throw new Error('no metadata mode found')
}
const settings = this._settings
const { retentionPoolMetadata, retentionXoMetadata } = settings
if (
@@ -217,7 +189,14 @@ exports.Backup = class Backup {
const schedule = this._schedule
const config = this._config
const settings = this._settings
const { settings } = job
const scheduleSettings = {
...config.defaultSettings,
...config.vm.defaultSettings,
...settings[''],
...settings[schedule.id],
}
await Disposable.use(
Disposable.all(
extractIdsFromSimplePattern(job.srs).map(id =>
@@ -245,15 +224,14 @@ exports.Backup = class Backup {
})
)
),
() => settings.healthCheckSr !== undefined ? this._getRecord('SR', settings.healthCheckSr) : undefined,
async (srs, remoteAdapters, healthCheckSr) => {
async (srs, remoteAdapters) => {
// remove adapters that failed (already handled)
remoteAdapters = remoteAdapters.filter(_ => _ !== undefined)
// remove srs that failed (already handled)
srs = srs.filter(_ => _ !== undefined)
if (remoteAdapters.length === 0 && srs.length === 0 && settings.snapshotRetention === 0) {
if (remoteAdapters.length === 0 && srs.length === 0 && scheduleSettings.snapshotRetention === 0) {
return
}
@@ -263,27 +241,23 @@ exports.Backup = class Backup {
remoteAdapters = getAdaptersByRemote(remoteAdapters)
const allSettings = this._job.settings
const baseSettings = this._baseSettings
const handleVm = vmUuid =>
runTask({ name: 'backup VM', data: { type: 'VM', id: vmUuid } }, () =>
Disposable.use(this._getRecord('VM', vmUuid), vm =>
new VmBackup({
baseSettings,
config,
getSnapshotNameLabel,
healthCheckSr,
job,
// remotes,
remoteAdapters,
schedule,
settings: { ...settings, ...allSettings[vm.uuid] },
settings: { ...scheduleSettings, ...settings[vmUuid] },
srs,
vm,
}).run()
)
)
const { concurrency } = settings
const { concurrency } = scheduleSettings
await asyncMapSettled(vmIds, concurrency === 0 ? handleVm : limitConcurrency(concurrency)(handleVm))
}
)

View File

@@ -1,64 +0,0 @@
'use strict'
const { Task } = require('./Task')
exports.HealthCheckVmBackup = class HealthCheckVmBackup {
#xapi
#restoredVm
constructor({ restoredVm, xapi }) {
this.#restoredVm = restoredVm
this.#xapi = xapi
}
async run() {
return Task.run(
{
name: 'vmstart',
},
async () => {
let restoredVm = this.#restoredVm
const xapi = this.#xapi
const restoredId = restoredVm.uuid
// remove vifs
await Promise.all(restoredVm.$VIFs.map(vif => xapi.callAsync('VIF.destroy', vif.$ref)))
const start = new Date()
// start Vm
await xapi.callAsync(
'VM.start',
restoredVm.$ref,
false, // Start paused?
false // Skip pre-boot checks?
)
const started = new Date()
const timeout = 10 * 60 * 1000
const startDuration = started - start
let remainingTimeout = timeout - startDuration
if (remainingTimeout < 0) {
throw new Error(`VM ${restoredId} not started after ${timeout / 1000} second`)
}
// wait for the 'Running' event to be really stored in local xapi object cache
restoredVm = await xapi.waitObjectState(restoredVm.$ref, vm => vm.power_state === 'Running', {
timeout: remainingTimeout,
})
const running = new Date()
remainingTimeout -= running - started
if (remainingTimeout < 0) {
throw new Error(`local xapi did not get Runnig state for VM ${restoredId} after ${timeout / 1000} second`)
}
// wait for the guest tool version to be defined
await xapi.waitObjectState(restoredVm.guest_metrics, gm => gm?.PV_drivers_version?.major !== undefined, {
timeout: remainingTimeout,
})
}
)
}
}

View File

@@ -8,7 +8,7 @@ const { Task } = require('./Task.js')
const { watchStreamSize } = require('./_watchStreamSize.js')
exports.ImportVmBackup = class ImportVmBackup {
constructor({ adapter, metadata, srUuid, xapi, settings: { newMacAddresses, mapVdisSrs = {} } = {} }) {
constructor({ adapter, metadata, srUuid, xapi, settings: { newMacAddresses, mapVdisSrs } = {} }) {
this._adapter = adapter
this._importDeltaVmSettings = { newMacAddresses, mapVdisSrs }
this._metadata = metadata
@@ -30,12 +30,7 @@ exports.ImportVmBackup = class ImportVmBackup {
} else {
assert.strictEqual(metadata.mode, 'delta')
const ignoredVdis = new Set(
Object.entries(this._importDeltaVmSettings.mapVdisSrs)
.filter(([_, srUuid]) => srUuid === null)
.map(([vdiUuid]) => vdiUuid)
)
backup = await adapter.readDeltaVmBackup(metadata, ignoredVdis)
backup = await adapter.readDeltaVmBackup(metadata)
Object.values(backup.streams).forEach(stream => watchStreamSize(stream, sizeContainer))
}

View File

@@ -1,16 +1,14 @@
'use strict'
const { asyncMap, asyncMapSettled } = require('@xen-orchestra/async-map')
const { synchronized } = require('decorator-synchronized')
const Disposable = require('promise-toolbox/Disposable')
const fromCallback = require('promise-toolbox/fromCallback')
const fromEvent = require('promise-toolbox/fromEvent')
const pDefer = require('promise-toolbox/defer')
const groupBy = require('lodash/groupBy.js')
const pickBy = require('lodash/pickBy.js')
const { dirname, join, normalize, resolve } = require('path')
const { createLogger } = require('@xen-orchestra/log')
const { createVhdDirectoryFromStream, openVhd, VhdAbstract, VhdDirectory, VhdSynthetic } = require('vhd-lib')
const { Constants, createVhdDirectoryFromStream, openVhd, VhdAbstract, VhdDirectory, VhdSynthetic } = require('vhd-lib')
const { deduped } = require('@vates/disposable/deduped.js')
const { decorateMethodsWith } = require('@vates/decorate-with')
const { compose } = require('@vates/compose')
@@ -18,7 +16,6 @@ const { execFile } = require('child_process')
const { readdir, stat } = require('fs-extra')
const { v4: uuidv4 } = require('uuid')
const { ZipFile } = require('yazl')
const zlib = require('zlib')
const { BACKUP_DIR } = require('./_getVmBackupDir.js')
const { cleanVm } = require('./_cleanVm.js')
@@ -80,7 +77,6 @@ class RemoteAdapter {
this._dirMode = dirMode
this._handler = handler
this._vhdDirectoryCompression = vhdDirectoryCompression
this._readCacheListVmBackups = synchronized.withKey()(this._readCacheListVmBackups)
}
get handler() {
@@ -264,8 +260,7 @@ class RemoteAdapter {
}
async deleteVmBackups(files) {
const metadatas = await asyncMap(files, file => this.readVmBackupMetadata(file))
const { delta, full, ...others } = groupBy(metadatas, 'mode')
const { delta, full, ...others } = groupBy(await asyncMap(files, file => this.readVmBackupMetadata(file)), 'mode')
const unsupportedModes = Object.keys(others)
if (unsupportedModes.length !== 0) {
@@ -282,9 +277,6 @@ class RemoteAdapter {
// don't merge in main process, unused VHDs will be merged in the next backup run
await this.cleanVm(dir, { remove: true, onLog: warn })
}
const dedupedVmUuid = new Set(metadatas.map(_ => _.vm.uuid))
await asyncMap(dedupedVmUuid, vmUuid => this.invalidateVmBackupListCache(vmUuid))
}
#getCompressionType() {
@@ -455,94 +447,34 @@ class RemoteAdapter {
return backupsByPool
}
async invalidateVmBackupListCache(vmUuid) {
await this.handler.unlink(`${BACKUP_DIR}/${vmUuid}/cache.json.gz`)
}
async #getCachabledDataListVmBackups(dir) {
async listVmBackups(vmUuid, predicate) {
const handler = this._handler
const backups = {}
const backups = []
try {
const files = await handler.list(dir, {
const files = await handler.list(`${BACKUP_DIR}/${vmUuid}`, {
filter: isMetadataFile,
prependDir: true,
})
await asyncMap(files, async file => {
try {
const metadata = await this.readVmBackupMetadata(file)
// inject an id usable by importVmBackupNg()
metadata.id = metadata._filename
backups[file] = metadata
if (predicate === undefined || predicate(metadata)) {
// inject an id usable by importVmBackupNg()
metadata.id = metadata._filename
backups.push(metadata)
}
} catch (error) {
warn(`can't read vm backup metadata`, { error, file, dir })
warn(`listVmBackups ${file}`, { error })
}
})
return backups
} catch (error) {
let code
if (error == null || ((code = error.code) !== 'ENOENT' && code !== 'ENOTDIR')) {
throw error
}
}
}
// use _ to mark this method as private by convention
// since we decorate it with synchronized.withKey in the constructor
// and # function are not writeable.
//
// read the list of backup of a Vm from cache
// if cache is missing or broken => regenerate it and return
async _readCacheListVmBackups(vmUuid) {
const dir = `${BACKUP_DIR}/${vmUuid}`
const path = `${dir}/cache.json.gz`
try {
const gzipped = await this.handler.readFile(path)
const text = await fromCallback(zlib.gunzip, gzipped)
return JSON.parse(text)
} catch (error) {
if (error.code !== 'ENOENT') {
warn('Cache file was unreadable', { vmUuid, error })
}
}
// nothing cached, or cache unreadable => regenerate it
const backups = await this.#getCachabledDataListVmBackups(dir)
if (backups === undefined) {
return
}
// detached async action, will not reject
this.#writeVmBackupsCache(path, backups)
return backups
}
async #writeVmBackupsCache(cacheFile, backups) {
try {
const text = JSON.stringify(backups)
const zipped = await fromCallback(zlib.gzip, text)
await this.handler.writeFile(cacheFile, zipped, { flags: 'w' })
} catch (error) {
warn('writeVmBackupsCache', { cacheFile, error })
}
}
async listVmBackups(vmUuid, predicate) {
const backups = []
const cached = await this._readCacheListVmBackups(vmUuid)
if (cached === undefined) {
return []
}
Object.values(cached).forEach(metadata => {
if (predicate === undefined || predicate(metadata)) {
backups.push(metadata)
}
})
return backups.sort(compareTimestamp)
}
@@ -598,42 +530,60 @@ class RemoteAdapter {
})
}
// open the hierarchy of ancestors until we find a full one
async _createSyntheticStream(handler, path) {
const disposableSynthetic = await VhdSynthetic.fromVhdChain(handler, path)
async _createSyntheticStream(handler, paths) {
let disposableVhds = []
// if it's a path : open all hierarchy of parent
if (typeof paths === 'string') {
let vhd
let vhdPath = paths
do {
const disposable = await openVhd(handler, vhdPath)
vhd = disposable.value
disposableVhds.push(disposable)
vhdPath = resolveRelativeFromFile(vhdPath, vhd.header.parentUnicodeName)
} while (vhd.footer.diskType !== Constants.DISK_TYPES.DYNAMIC)
} else {
// only open the list of path given
disposableVhds = paths.map(path => openVhd(handler, path))
}
// I don't want the vhds to be disposed on return
// but only when the stream is done ( or failed )
const disposables = await Disposable.all(disposableVhds)
const vhds = disposables.value
let disposed = false
const disposeOnce = async () => {
if (!disposed) {
disposed = true
try {
await disposableSynthetic.dispose()
await disposables.dispose()
} catch (error) {
warn('openVhd: failed to dispose VHDs', { error })
warn('_createSyntheticStream: failed to dispose VHDs', { error })
}
}
}
const synthetic = disposableSynthetic.value
const synthetic = new VhdSynthetic(vhds)
await synthetic.readHeaderAndFooter()
await synthetic.readBlockAllocationTable()
const stream = await synthetic.stream()
stream.on('end', disposeOnce)
stream.on('close', disposeOnce)
stream.on('error', disposeOnce)
return stream
}
async readDeltaVmBackup(metadata, ignoredVdis) {
async readDeltaVmBackup(metadata) {
const handler = this._handler
const { vbds, vhds, vifs, vm } = metadata
const { vbds, vdis, vhds, vifs, vm } = metadata
const dir = dirname(metadata._filename)
const vdis = ignoredVdis === undefined ? metadata.vdis : pickBy(metadata.vdis, vdi => !ignoredVdis.has(vdi.uuid))
const streams = {}
await asyncMapSettled(Object.keys(vdis), async ref => {
streams[`${ref}.vhd`] = await this._createSyntheticStream(handler, join(dir, vhds[ref]))
await asyncMapSettled(Object.keys(vdis), async id => {
streams[`${id}.vhd`] = await this._createSyntheticStream(handler, join(dir, vhds[id]))
})
return {
@@ -651,10 +601,7 @@ class RemoteAdapter {
}
async readVmBackupMetadata(path) {
// _filename is a private field used to compute the backup id
//
// it's enumerable to make it cacheable
return { ...JSON.parse(await this._handler.readFile(path)), _filename: path }
return Object.defineProperty(JSON.parse(await this._handler.readFile(path)), '_filename', { value: path })
}
}

View File

@@ -45,18 +45,7 @@ const forkDeltaExport = deltaExport =>
})
class VmBackup {
constructor({
config,
getSnapshotNameLabel,
healthCheckSr,
job,
remoteAdapters,
remotes,
schedule,
settings,
srs,
vm,
}) {
constructor({ config, getSnapshotNameLabel, job, remoteAdapters, remotes, schedule, settings, srs, vm }) {
if (vm.other_config['xo:backup:job'] === job.id && 'start' in vm.blocked_operations) {
// don't match replicated VMs created by this very job otherwise they
// will be replicated again and again
@@ -66,6 +55,7 @@ class VmBackup {
this.config = config
this.job = job
this.remoteAdapters = remoteAdapters
this.remotes = remotes
this.scheduleId = schedule.id
this.timestamp = undefined
@@ -79,7 +69,6 @@ class VmBackup {
this._fullVdisRequired = undefined
this._getSnapshotNameLabel = getSnapshotNameLabel
this._isDelta = job.mode === 'delta'
this._healthCheckSr = healthCheckSr
this._jobId = job.id
this._jobSnapshots = undefined
this._xapi = vm.$xapi
@@ -106,6 +95,7 @@ class VmBackup {
: [FullBackupWriter, FullReplicationWriter]
const allSettings = job.settings
Object.keys(remoteAdapters).forEach(remoteId => {
const targetSettings = {
...settings,
@@ -153,13 +143,6 @@ class VmBackup {
errors.push(error)
this.delete(writer)
warn(warnMessage, { error, writer: writer.constructor.name })
// these two steps are the only one that are not already in their own sub tasks
if (warnMessage === 'writer.checkBaseVdis()' || warnMessage === 'writer.beforeBackup()') {
Task.warning(
`the writer ${writer.constructor.name} has failed the step ${warnMessage} with error ${error.message}. It won't be used anymore in this job execution.`
)
}
}
})
if (writers.size === 0) {
@@ -190,10 +173,7 @@ class VmBackup {
const settings = this._settings
const doSnapshot =
settings.unconditionalSnapshot ||
this._isDelta ||
(!settings.offlineBackup && vm.power_state === 'Running') ||
settings.snapshotRetention !== 0
this._isDelta || (!settings.offlineBackup && vm.power_state === 'Running') || settings.snapshotRetention !== 0
if (doSnapshot) {
await Task.run({ name: 'snapshot' }, async () => {
if (!settings.bypassVdiChainsCheck) {
@@ -201,9 +181,7 @@ class VmBackup {
}
const snapshotRef = await vm[settings.checkpointSnapshot ? '$checkpoint' : '$snapshot']({
ignoreNobakVdis: true,
name_label: this._getSnapshotNameLabel(vm),
unplugVusbs: true,
})
this.timestamp = Date.now()
@@ -325,17 +303,22 @@ class VmBackup {
}
async _removeUnusedSnapshots() {
const allSettings = this.job.settings
const baseSettings = this._baseSettings
const jobSettings = this.job.settings
const baseVmRef = this._baseVm?.$ref
const { config } = this
const baseSettings = {
...config.defaultSettings,
...config.metadata.defaultSettings,
...jobSettings[''],
}
const snapshotsPerSchedule = groupBy(this._jobSnapshots, _ => _.other_config['xo:backup:schedule'])
const xapi = this._xapi
await asyncMap(Object.entries(snapshotsPerSchedule), ([scheduleId, snapshots]) => {
const settings = {
...baseSettings,
...allSettings[scheduleId],
...allSettings[this.vm.uuid],
...jobSettings[scheduleId],
...jobSettings[this.vm.uuid],
}
return asyncMap(getOldEntries(settings.snapshotRetention, snapshots), ({ $ref }) => {
if ($ref !== baseVmRef) {
@@ -414,24 +397,6 @@ class VmBackup {
this._fullVdisRequired = fullVdisRequired
}
async _healthCheck() {
const settings = this._settings
if (this._healthCheckSr === undefined) {
return
}
// check if current VM has tags
const { tags } = this.vm
const intersect = settings.healthCheckVmsWithTags.some(t => tags.includes(t))
if (settings.healthCheckVmsWithTags.length !== 0 && !intersect) {
return
}
await this._callWriters(writer => writer.healthCheck(this._healthCheckSr), 'writer.healthCheck()')
}
async run($defer) {
const settings = this._settings
assert(
@@ -441,9 +406,7 @@ class VmBackup {
await this._callWriters(async writer => {
await writer.beforeBackup()
$defer(async () => {
await writer.afterBackup()
})
$defer(() => writer.afterBackup())
}, 'writer.beforeBackup()')
await this._fetchJobSnapshots()
@@ -479,7 +442,6 @@ class VmBackup {
await this._fetchJobSnapshots()
await this._removeUnusedSnapshots()
}
await this._healthCheck()
}
}
exports.VmBackup = VmBackup

View File

@@ -3,4 +3,4 @@
exports.isMetadataFile = filename => filename.endsWith('.json')
exports.isVhdFile = filename => filename.endsWith('.vhd')
exports.isXvaFile = filename => filename.endsWith('.xva')
exports.isXvaSumFile = filename => filename.endsWith('.xva.checksum')
exports.isXvaSumFile = filename => filename.endsWith('.xva.cheksum')

View File

@@ -4,8 +4,6 @@ require('@xen-orchestra/log/configure.js').catchGlobalErrors(
require('@xen-orchestra/log').createLogger('xo:backups:worker')
)
require('@vates/cached-dns.lookup').createCachedLookup().patchGlobal()
const Disposable = require('promise-toolbox/Disposable')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { compose } = require('@vates/compose')

View File

@@ -5,9 +5,9 @@
const rimraf = require('rimraf')
const tmp = require('tmp')
const fs = require('fs-extra')
const uuid = require('uuid')
const { getHandler } = require('@xen-orchestra/fs')
const { pFromCallback } = require('promise-toolbox')
const crypto = require('crypto')
const { RemoteAdapter } = require('./RemoteAdapter')
const { VHDFOOTER, VHDHEADER } = require('./tests.fixtures.js')
const { VhdFile, Constants, VhdDirectory, VhdAbstract } = require('vhd-lib')
@@ -34,8 +34,7 @@ afterEach(async () => {
await handler.forget()
})
const uniqueId = () => uuid.v1()
const uniqueIdBuffer = () => Buffer.from(uniqueId(), 'utf-8')
const uniqueId = () => crypto.randomBytes(16).toString('hex')
async function generateVhd(path, opts = {}) {
let vhd
@@ -54,9 +53,10 @@ async function generateVhd(path, opts = {}) {
}
vhd.header = { ...VHDHEADER, ...opts.header }
vhd.footer = { ...VHDFOOTER, ...opts.footer, uuid: uniqueIdBuffer() }
vhd.footer = { ...VHDFOOTER, ...opts.footer }
vhd.footer.uuid = Buffer.from(crypto.randomBytes(16))
if (vhd.header.parentUuid) {
if (vhd.header.parentUnicodeName) {
vhd.footer.diskType = Constants.DISK_TYPES.DIFFERENCING
} else {
vhd.footer.diskType = Constants.DISK_TYPES.DYNAMIC
@@ -91,31 +91,24 @@ test('It remove broken vhd', async () => {
})
test('it remove vhd with missing or multiple ancestors', async () => {
// one with a broken parent, should be deleted
// one with a broken parent
await generateVhd(`${basePath}/abandonned.vhd`, {
header: {
parentUnicodeName: 'gone.vhd',
parentUuid: uniqueIdBuffer(),
parentUid: Buffer.from(crypto.randomBytes(16)),
},
})
// one orphan, which is a full vhd, no parent : should stay
// one orphan, which is a full vhd, no parent
const orphan = await generateVhd(`${basePath}/orphan.vhd`)
// a child to the orphan in the metadata : should stay
// a child to the orphan
await generateVhd(`${basePath}/child.vhd`, {
header: {
parentUnicodeName: 'orphan.vhd',
parentUuid: orphan.footer.uuid,
parentUid: orphan.footer.uuid,
},
})
await handler.writeFile(
`metadata.json`,
JSON.stringify({
mode: 'delta',
vhds: [`${basePath}/child.vhd`, `${basePath}/abandonned.vhd`],
}),
{ flags: 'w' }
)
// clean
let loggued = ''
const onLog = message => {
@@ -154,7 +147,7 @@ test('it remove backup meta data referencing a missing vhd in delta backup', asy
await generateVhd(`${basePath}/child.vhd`, {
header: {
parentUnicodeName: 'orphan.vhd',
parentUuid: orphan.footer.uuid,
parentUid: orphan.footer.uuid,
},
})
@@ -208,14 +201,14 @@ test('it merges delta of non destroyed chain', async () => {
const child = await generateVhd(`${basePath}/child.vhd`, {
header: {
parentUnicodeName: 'orphan.vhd',
parentUuid: orphan.footer.uuid,
parentUid: orphan.footer.uuid,
},
})
// a grand child
await generateVhd(`${basePath}/grandchild.vhd`, {
header: {
parentUnicodeName: 'child.vhd',
parentUuid: child.footer.uuid,
parentUid: child.footer.uuid,
},
})
@@ -224,12 +217,14 @@ test('it merges delta of non destroyed chain', async () => {
loggued.push(message)
}
await adapter.cleanVm('/', { remove: true, onLog })
expect(loggued[0]).toEqual(`incorrect size in metadata: 12000 instead of 209920`)
expect(loggued[0]).toEqual(`the parent /${basePath}/orphan.vhd of the child /${basePath}/child.vhd is unused`)
expect(loggued[1]).toEqual(`incorrect size in metadata: 12000 instead of 209920`)
loggued = []
await adapter.cleanVm('/', { remove: true, merge: true, onLog })
const [merging] = loggued
expect(merging).toEqual(`merging 1 children into /${basePath}/orphan.vhd`)
const [unused, merging] = loggued
expect(unused).toEqual(`the parent /${basePath}/orphan.vhd of the child /${basePath}/child.vhd is unused`)
expect(merging).toEqual(`merging /${basePath}/child.vhd into /${basePath}/orphan.vhd`)
const metadata = JSON.parse(await handler.readFile(`metadata.json`))
// size should be the size of children + grand children after the merge
@@ -259,7 +254,7 @@ test('it finish unterminated merge ', async () => {
const child = await generateVhd(`${basePath}/child.vhd`, {
header: {
parentUnicodeName: 'orphan.vhd',
parentUuid: orphan.footer.uuid,
parentUid: orphan.footer.uuid,
},
})
// a merge in progress file
@@ -315,7 +310,7 @@ describe('tests multiple combination ', () => {
mode: vhdMode,
header: {
parentUnicodeName: 'gone.vhd',
parentUuid: uniqueIdBuffer(),
parentUid: crypto.randomBytes(16),
},
})
@@ -329,7 +324,7 @@ describe('tests multiple combination ', () => {
mode: vhdMode,
header: {
parentUnicodeName: 'ancestor.vhd' + (useAlias ? '.alias.vhd' : ''),
parentUuid: ancestor.footer.uuid,
parentUid: ancestor.footer.uuid,
},
})
// a grand child vhd in metadata
@@ -338,7 +333,7 @@ describe('tests multiple combination ', () => {
mode: vhdMode,
header: {
parentUnicodeName: 'child.vhd' + (useAlias ? '.alias.vhd' : ''),
parentUuid: child.footer.uuid,
parentUid: child.footer.uuid,
},
})
@@ -353,7 +348,7 @@ describe('tests multiple combination ', () => {
mode: vhdMode,
header: {
parentUnicodeName: 'cleanAncestor.vhd' + (useAlias ? '.alias.vhd' : ''),
parentUuid: cleanAncestor.footer.uuid,
parentUid: cleanAncestor.footer.uuid,
},
})

View File

@@ -31,48 +31,71 @@ const computeVhdsSize = (handler, vhdPaths) =>
}
)
// chain is [ ancestor, child1, ..., childn]
// 1. Create a VhdSynthetic from all children
// 2. Merge the VhdSynthetic into the ancestor
// 3. Delete all (now) unused VHDs
// 4. Rename the ancestor with the merged data to the latest child
// chain is an array of VHDs from child to parent
//
// VhdSynthetic
// |
// /‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾‾\
// [ ancestor, child1, ...,child n-1, childn ]
// | \___________________/ ^
// | | |
// | unused VHDs |
// | |
// \___________rename_____________/
async function mergeVhdChain(chain, { handler, logInfo, remove, merge }) {
// the whole chain will be merged into parent, parent will be renamed to child
// and all the others will deleted
async function mergeVhdChain(chain, { handler, onLog, remove, merge }) {
assert(chain.length >= 2)
const chainCopy = [...chain]
const parent = chainCopy.pop()
const children = chainCopy
let child = chain[0]
const parent = chain[chain.length - 1]
const children = chain.slice(0, -1).reverse()
chain
.slice(1)
.reverse()
.forEach(parent => {
onLog(`the parent ${parent} of the child ${child} is unused`)
})
if (merge) {
logInfo(`merging children into parent`, { childrenCount: children.length, parent })
// `mergeVhd` does not work with a stream, either
// - make it accept a stream
// - or create synthetic VHD which is not a stream
if (children.length !== 1) {
// TODO: implement merging multiple children
children.length = 1
child = children[0]
}
onLog(`merging ${child} into ${parent}`)
let done, total
const handle = setInterval(() => {
if (done !== undefined) {
logInfo(`merging children in progress`, { children, parent, doneCount: done, totalCount: total })
onLog(`merging ${child}: ${done}/${total}`)
}
}, 10e3)
const mergedSize = await mergeVhd(handler, parent, handler, children, {
logInfo,
onProgress({ done: d, total: t }) {
done = d
total = t
},
remove,
})
const mergedSize = await mergeVhd(
handler,
parent,
handler,
child,
// children.length === 1
// ? child
// : await createSyntheticStream(handler, children),
{
onProgress({ done: d, total: t }) {
done = d
total = t
},
}
)
clearInterval(handle)
await Promise.all([
VhdAbstract.rename(handler, parent, child),
asyncMap(children.slice(0, -1), child => {
onLog(`the VHD ${child} is unused`)
if (remove) {
onLog(`deleting unused VHD ${child}`)
return VhdAbstract.unlink(handler, child)
}
}),
])
return mergedSize
}
}
@@ -115,19 +138,14 @@ const listVhds = async (handler, vmDir) => {
return { vhds, interruptedVhds, aliases }
}
async function checkAliases(
aliasPaths,
targetDataRepository,
{ handler, logInfo = noop, logWarn = console.warn, remove = false }
) {
async function checkAliases(aliasPaths, targetDataRepository, { handler, onLog = noop, remove = false }) {
const aliasFound = []
for (const path of aliasPaths) {
const target = await resolveVhdAlias(handler, path)
if (!isVhdFile(target)) {
logWarn('alias references non VHD target', { path, target })
onLog(`Alias ${path} references a non vhd target: ${target}`)
if (remove) {
logInfo('removing alias and non VHD target', { path, target })
await handler.unlink(target)
await handler.unlink(path)
}
@@ -142,13 +160,13 @@ async function checkAliases(
// error during dispose should not trigger a deletion
}
} catch (error) {
logWarn('missing or broken alias target', { target, path, error })
onLog(`target ${target} of alias ${path} is missing or broken`, { error })
if (remove) {
try {
await VhdAbstract.unlink(handler, path)
} catch (error) {
if (error.code !== 'ENOENT') {
logWarn('error deleting alias target', { target, path, error })
} catch (e) {
if (e.code !== 'ENOENT') {
onLog(`Error while deleting target ${target} of alias ${path}`, { error: e })
}
}
}
@@ -165,22 +183,20 @@ async function checkAliases(
entries.forEach(async entry => {
if (!aliasFound.includes(entry)) {
logWarn('no alias references VHD', { entry })
onLog(`the Vhd ${entry} is not referenced by a an alias`)
if (remove) {
logInfo('deleting unaliased VHD')
await VhdAbstract.unlink(handler, entry)
}
}
})
}
exports.checkAliases = checkAliases
const defaultMergeLimiter = limitConcurrency(1)
exports.cleanVm = async function cleanVm(
vmDir,
{ fixMetadata, remove, merge, mergeLimiter = defaultMergeLimiter, logInfo = noop, logWarn = console.warn }
{ fixMetadata, remove, merge, mergeLimiter = defaultMergeLimiter, onLog = noop }
) {
const limitedMergeVhdChain = mergeLimiter(mergeVhdChain)
@@ -211,9 +227,9 @@ exports.cleanVm = async function cleanVm(
})
} catch (error) {
vhds.delete(path)
logWarn('VHD check error', { path, error })
onLog(`error while checking the VHD with path ${path}`, { error })
if (error?.code === 'ERR_ASSERTION' && remove) {
logInfo('deleting broken path', { path })
onLog(`deleting broken ${path}`)
return VhdAbstract.unlink(handler, path)
}
}
@@ -225,12 +241,12 @@ exports.cleanVm = async function cleanVm(
const statePath = interruptedVhds.get(interruptedVhd)
interruptedVhds.delete(interruptedVhd)
logWarn('orphan merge state', {
onLog('orphan merge state', {
mergeStatePath: statePath,
missingVhdPath: interruptedVhd,
})
if (remove) {
logInfo('deleting orphan merge state', { statePath })
onLog(`deleting orphan merge state ${statePath}`)
await handler.unlink(statePath)
}
}
@@ -239,7 +255,7 @@ exports.cleanVm = async function cleanVm(
// check if alias are correct
// check if all vhd in data subfolder have a corresponding alias
await asyncMap(Object.keys(aliases), async dir => {
await checkAliases(aliases[dir], `${dir}/data`, { handler, logInfo, logWarn, remove })
await checkAliases(aliases[dir], `${dir}/data`, { handler, onLog, remove })
})
// remove VHDs with missing ancestors
@@ -261,9 +277,9 @@ exports.cleanVm = async function cleanVm(
if (!vhds.has(parent)) {
vhds.delete(vhdPath)
logWarn('parent VHD is missing', { parent, vhdPath })
onLog(`the parent ${parent} of the VHD ${vhdPath} is missing`)
if (remove) {
logInfo('deleting orphan VHD', { vhdPath })
onLog(`deleting orphan VHD ${vhdPath}`)
deletions.push(VhdAbstract.unlink(handler, vhdPath))
}
}
@@ -300,7 +316,7 @@ exports.cleanVm = async function cleanVm(
// check is not good enough to delete the file, the best we can do is report
// it
if (!(await this.isValidXva(path))) {
logWarn('XVA might be broken', { path })
onLog(`the XVA with path ${path} is potentially broken`)
}
})
@@ -314,7 +330,7 @@ exports.cleanVm = async function cleanVm(
try {
metadata = JSON.parse(await handler.readFile(json))
} catch (error) {
logWarn('failed to read metadata file', { json, error })
onLog(`failed to read metadata file ${json}`, { error })
jsons.delete(json)
return
}
@@ -325,9 +341,9 @@ exports.cleanVm = async function cleanVm(
if (xvas.has(linkedXva)) {
unusedXvas.delete(linkedXva)
} else {
logWarn('metadata XVA is missing', { json })
onLog(`the XVA linked to the metadata ${json} is missing`)
if (remove) {
logInfo('deleting incomplete backup', { json })
onLog(`deleting incomplete backup ${json}`)
jsons.delete(json)
await handler.unlink(json)
}
@@ -348,9 +364,9 @@ exports.cleanVm = async function cleanVm(
vhdsToJSons[path] = json
})
} else {
logWarn('some metadata VHDs are missing', { json, missingVhds })
onLog(`Some VHDs linked to the metadata ${json} are missing`, { missingVhds })
if (remove) {
logInfo('deleting incomplete backup', { json })
onLog(`deleting incomplete backup ${json}`)
jsons.delete(json)
await handler.unlink(json)
}
@@ -391,9 +407,9 @@ exports.cleanVm = async function cleanVm(
}
}
logWarn('unused VHD', { vhd })
onLog(`the VHD ${vhd} is unused`)
if (remove) {
logInfo('deleting unused VHD', { vhd })
onLog(`deleting unused VHD ${vhd}`)
unusedVhdsDeletion.push(VhdAbstract.unlink(handler, vhd))
}
}
@@ -417,7 +433,7 @@ exports.cleanVm = async function cleanVm(
const metadataWithMergedVhd = {}
const doMerge = async () => {
await asyncMap(toMerge, async chain => {
const merged = await limitedMergeVhdChain(chain, { handler, logInfo, logWarn, remove, merge })
const merged = await limitedMergeVhdChain(chain, { handler, onLog, remove, merge })
if (merged !== undefined) {
const metadataPath = vhdsToJSons[chain[0]] // all the chain should have the same metada file
metadataWithMergedVhd[metadataPath] = true
@@ -429,18 +445,18 @@ exports.cleanVm = async function cleanVm(
...unusedVhdsDeletion,
toMerge.length !== 0 && (merge ? Task.run({ name: 'merge' }, doMerge) : doMerge()),
asyncMap(unusedXvas, path => {
logWarn('unused XVA', { path })
onLog(`the XVA ${path} is unused`)
if (remove) {
logInfo('deleting unused XVA', { path })
onLog(`deleting unused XVA ${path}`)
return handler.unlink(path)
}
}),
asyncMap(xvaSums, path => {
// no need to handle checksums for XVAs deleted by the script, they will be handled by `unlink()`
if (!xvas.has(path.slice(0, -'.checksum'.length))) {
logInfo('unused XVA checksum', { path })
onLog(`the XVA checksum ${path} is unused`)
if (remove) {
logInfo('deleting unused XVA checksum', { path })
onLog(`deleting unused XVA checksum ${path}`)
return handler.unlink(path)
}
}
@@ -474,11 +490,11 @@ exports.cleanVm = async function cleanVm(
// don't warn if the size has changed after a merge
if (!merged && fileSystemSize !== size) {
logWarn('incorrect size in metadata', { size: size ?? 'none', fileSystemSize })
onLog(`incorrect size in metadata: ${size ?? 'none'} instead of ${fileSystemSize}`)
}
}
} catch (error) {
logWarn('failed to get metadata size', { metadataPath, error })
onLog(`failed to get size of ${metadataPath}`, { error })
return
}
@@ -488,7 +504,7 @@ exports.cleanVm = async function cleanVm(
try {
await handler.writeFile(metadataPath, JSON.stringify(metadata), { flags: 'w' })
} catch (error) {
logWarn('metadata size update failed', { metadataPath, error })
onLog(`failed to update size in backup metadata ${metadataPath} after merge`, { error })
}
}
})

View File

@@ -11,8 +11,6 @@ const { createVhdStreamWithLength } = require('vhd-lib')
const { defer } = require('golike-defer')
const { cancelableMap } = require('./_cancelableMap.js')
const { Task } = require('./Task.js')
const { pick } = require('lodash')
const TAG_BASE_DELTA = 'xo:base_delta'
exports.TAG_BASE_DELTA = TAG_BASE_DELTA
@@ -22,9 +20,6 @@ exports.TAG_COPY_SRC = TAG_COPY_SRC
const ensureArray = value => (value === undefined ? [] : Array.isArray(value) ? value : [value])
const resolveUuid = async (xapi, cache, uuid, type) => {
if (uuid == null) {
return uuid
}
let ref = cache.get(uuid)
if (ref === undefined) {
ref = await xapi.call(`${type}.get_by_uuid`, uuid)
@@ -65,6 +60,17 @@ exports.exportDeltaVm = async function exportDeltaVm(
return
}
// If the VDI name start with `[NOBAK]`, do not export it.
if (vdi.name_label.startsWith('[NOBAK]')) {
// FIXME: find a way to not create the VDI snapshot in the
// first time.
//
// The snapshot must not exist otherwise it could break the
// next export.
ignoreErrors.call(vdi.$destroy())
return
}
vbds[vbd.$ref] = vbd
const vdiRef = vdi.$ref
@@ -189,25 +195,19 @@ exports.importDeltaVm = defer(async function importDeltaVm(
let suspendVdi
if (vmRecord.power_state === 'Suspended') {
const vdi = vdiRecords[vmRecord.suspend_VDI]
if (vdi === undefined) {
Task.warning('Suspend VDI not available for this suspended VM', {
vm: pick(vmRecord, 'uuid', 'name_label'),
suspendVdi = await xapi.getRecord(
'VDI',
await xapi.VDI_create({
...vdi,
other_config: {
...vdi.other_config,
[TAG_BASE_DELTA]: undefined,
[TAG_COPY_SRC]: vdi.uuid,
},
sr: mapVdisSrRefs[vdi.uuid] ?? sr.$ref,
})
} else {
suspendVdi = await xapi.getRecord(
'VDI',
await xapi.VDI_create({
...vdi,
other_config: {
...vdi.other_config,
[TAG_BASE_DELTA]: undefined,
[TAG_COPY_SRC]: vdi.uuid,
},
sr: mapVdisSrRefs[vdi.uuid] ?? sr.$ref,
})
)
$defer.onFailure(() => suspendVdi.$destroy())
}
)
$defer.onFailure(() => suspendVdi.$destroy())
}
// 1. Create the VM.

View File

@@ -1,52 +0,0 @@
- [File structure on remote](#file-structure-on-remote)
- [Structure of `metadata.json`](#structure-of-metadatajson)
- [Task logs](#task-logs)
- [During backup](#during-backup)
## File structure on remote
```
<remote>
├─ xo-config-backups
│ └─ <schedule ID>
│ └─ <YYYYMMDD>T<HHmmss>
│ ├─ metadata.json
│ └─ data.json
└─ xo-pool-metadata-backups
└─ <schedule ID>
└─ <pool UUID>
└─ <YYYYMMDD>T<HHmmss>
├─ metadata.json
└─ data
```
## Structure of `metadata.json`
```ts
interface Metadata {
jobId: String
jobName: String
scheduleId: String
scheduleName: String
timestamp: number
pool?: Pool
poolMaster?: Host
}
```
## Task logs
### During backup
```
job.start(data: { reportWhen: ReportWhen })
├─ task.start(data: { type: 'pool', id: string, pool?: Pool, poolMaster?: Host })
│ ├─ task.start(data: { type: 'remote', id: string })
│ │ └─ task.end
│ └─ task.end
├─ task.start(data: { type: 'xo' })
│ ├─ task.start(data: { type: 'remote', id: string })
│ │ └─ task.end
│ └─ task.end
└─ job.end
```

View File

@@ -1,207 +0,0 @@
- [File structure on remote](#file-structure-on-remote)
- [Attributes](#attributes)
- [Of created snapshots](#of-created-snapshots)
- [Of created VMs and snapshots](#of-created-vms-and-snapshots)
- [Of created VMs](#of-created-vms)
- [Task logs](#task-logs)
- [During backup](#during-backup)
- [During restoration](#during-restoration)
- [API](#api)
- [Run description object](#run-description-object)
- [`IdPattern`](#idpattern)
- [Settings](#settings)
- [Writer API](#writer-api)
## File structure on remote
```
<remote>
└─ xo-vm-backups
├─ index.json // TODO
└─ <VM UUID>
├─ index.json // TODO
├─ vdis
│ └─ <job UUID>
│ └─ <VDI UUID>
│ ├─ index.json // TODO
│ └─ <YYYYMMDD>T<HHmmss>.vhd
├─ <YYYYMMDD>T<HHmmss>.json // backup metadata
├─ <YYYYMMDD>T<HHmmss>.xva
└─ <YYYYMMDD>T<HHmmss>.xva.checksum
```
## Attributes
### Of created snapshots
- `other_config`:
- `xo:backup:deltaChainLength` = n (number of delta copies/replicated since a full)
- `xo:backup:exported` = 'true' (added at the end of the backup)
### Of created VMs and snapshots
- `other_config`:
- `xo:backup:datetime`: format is UTC %Y%m%dT%H:%M:%SZ
- from snapshots: snapshot.snapshot_time
- with offline backup: formatDateTime(Date.now())
- `xo:backup:job` = job.id
- `xo:backup:schedule` = schedule.id
- `xo:backup:vm` = vm.uuid
### Of created VMs
- `name_label`: `${original name} - ${job name} - (${safeDateFormat(backup timestamp)})`
- tag:
- copy in delta mode: `Continuous Replication`
- copy in full mode: `Disaster Recovery`
- imported from backup: `restored from backup`
- `blocked_operations.start`: message
- for copies/replications only, added after complete transfer
- `other_config[xo:backup:sr]` = sr.uuid
## Task logs
### During backup
```
job.start(data: { mode: Mode, reportWhen: ReportWhen })
├─ task.info(message: 'vms', data: { vms: string[] })
├─ task.warning(message: string)
├─ task.start(data: { type: 'VM', id: string })
│ ├─ task.warning(message: string)
| ├─ task.start(message: 'clean-vm')
│ │ └─ task.end
│ ├─ task.start(message: 'snapshot')
│ │ └─ task.end
│ ├─ task.start(message: 'export', data: { type: 'SR' | 'remote', id: string, isFull: boolean })
│ │ ├─ task.warning(message: string)
│ │ ├─ task.start(message: 'transfer')
│ │ │ ├─ task.warning(message: string)
│ │ │ └─ task.end(result: { size: number })
│ │ │
│ │ │ // in case there is a healthcheck scheduled for this vm in this job
│ │ ├─ task.start(message: 'health check')
│ │ │ ├─ task.start(message: 'transfer')
│ │ │ │ └─ task.end(result: { size: number })
│ │ │ ├─ task.start(message: 'vmstart')
│ │ │ │ └─ task.end
│ │ │ └─ task.end
│ │ │
│ │ │ // in case of full backup, DR and CR
│ │ ├─ task.start(message: 'clean')
│ │ │ ├─ task.warning(message: string)
│ │ │ └─ task.end
│ │ └─ task.end
| ├─ task.start(message: 'clean-vm')
│ │ └─ task.end
│ └─ task.end
└─ job.end
```
### During restoration
```
task.start(message: 'restore', data: { jobId: string, srId: string, time: number })
├─ task.start(message: 'transfer')
│ └─ task.end(result: { id: string, size: number })
└─ task.end
```
## API
### Run description object
This is a JavaScript object containing all the information necessary to run a backup job.
```coffee
# Information about the job itself
job:
# Unique identifier
id: string
# Human readable identifier
name: string
# Whether this job is doing Full Backup / Disaster Recovery or
# Delta Backup / Continuous Replication
mode: 'full' | 'delta'
# For backup jobs, indicates which remotes to use
remotes: IdPattern
settings:
# Used for the whole job
'': Settings
# Used for a specific schedule
[ScheduleId]: Settings
# Used for a specific VM
[VmId]: Settings
# For replication jobs, indicates which SRs to use
srs: IdPattern
# Here for historical reasons
type: 'backup'
# Indicates which VMs to backup/replicate
vms: IdPattern
# Indicates which XAPI to use to connect to a specific VM or SR
recordToXapi:
[ObjectId]: XapiId
# Information necessary to connect to each remote
remotes:
[RemoteId]:
url: string
# Indicates which schedule is used for this run
schedule:
id: ScheduleId
# Information necessary to connect to each XAPI
xapis:
[XapiId]:
allowUnauthorized: boolean
credentials:
password: string
username: string
url: string
```
### `IdPattern`
For a single object:
```
{ id: string }
```
For multiple objects:
```
{ id: { __or: string[] } }
```
> This syntax is compatible with [`value-matcher`](https://github.com/vatesfr/xen-orchestra/tree/master/packages/value-matcher).
### Settings
Settings are described in [`@xen-orchestra/backups/Backup.js](https://github.com/vatesfr/xen-orchestra/blob/master/%40xen-orchestra/backups/Backup.js).
## Writer API
- `beforeBackup()`
- **Delta**
- `checkBaseVdis(baseUuidToSrcVdi, baseVm)`
- `prepare({ isFull })`
- `transfer({ timestamp, deltaExport, sizeContainers })`
- `cleanup()`
- `healthCheck(sr)`
- **Full**
- `run({ timestamp, sizeContainer, stream })`
- `afterBackup()`

View File

@@ -1,6 +1,4 @@
#!/usr/bin/env node
// eslint-disable-next-line eslint-comments/disable-enable-pair
/* eslint-disable n/shebang */
'use strict'

View File

@@ -8,7 +8,7 @@
"type": "git",
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"version": "0.25.0",
"version": "0.20.0",
"engines": {
"node": ">=14.6"
},
@@ -16,18 +16,16 @@
"postversion": "npm publish --access public"
},
"dependencies": {
"@vates/cached-dns.lookup": "^1.0.0",
"@vates/compose": "^2.1.0",
"@vates/decorate-with": "^2.0.0",
"@vates/decorate-with": "^1.0.0",
"@vates/disposable": "^0.1.1",
"@vates/parse-duration": "^0.1.1",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/fs": "^1.0.3",
"@xen-orchestra/fs": "^0.20.0",
"@xen-orchestra/log": "^0.3.0",
"@xen-orchestra/template": "^0.1.0",
"compare-versions": "^4.0.1",
"d3-time-format": "^3.0.0",
"decorator-synchronized": "^0.6.0",
"end-of-stream": "^1.4.4",
"fs-extra": "^10.0.0",
"golike-defer": "^0.5.1",
@@ -38,15 +36,11 @@
"promise-toolbox": "^0.21.0",
"proper-lockfile": "^4.1.2",
"uuid": "^8.3.2",
"vhd-lib": "^3.2.0",
"vhd-lib": "^3.1.0",
"yazl": "^2.5.1"
},
"devDependencies": {
"rimraf": "^3.0.2",
"tmp": "^0.2.1"
},
"peerDependencies": {
"@xen-orchestra/xapi": "^1.2.0"
"@xen-orchestra/xapi": "^0.9.0"
},
"license": "AGPL-3.0-or-later",
"author": {

View File

@@ -19,8 +19,6 @@ const { AbstractDeltaWriter } = require('./_AbstractDeltaWriter.js')
const { checkVhd } = require('./_checkVhd.js')
const { packUuid } = require('./_packUuid.js')
const { Disposable } = require('promise-toolbox')
const { HealthCheckVmBackup } = require('../HealthCheckVmBackup.js')
const { ImportVmBackup } = require('../ImportVmBackup.js')
const { warn } = createLogger('xo:backups:DeltaBackupWriter')
@@ -71,35 +69,6 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
return this._cleanVm({ merge: true })
}
healthCheck(sr) {
return Task.run(
{
name: 'health check',
},
async () => {
const xapi = sr.$xapi
const srUuid = sr.uuid
const adapter = this._adapter
const metadata = await adapter.readVmBackupMetadata(this._metadataFileName)
const { id: restoredId } = await new ImportVmBackup({
adapter,
metadata,
srUuid,
xapi,
}).run()
const restoredVm = xapi.getObject(restoredId)
try {
await new HealthCheckVmBackup({
restoredVm,
xapi,
}).run()
} finally {
await xapi.VM_destroy(restoredVm.$ref)
}
}
)
}
prepare({ isFull }) {
// create the task related to this export and ensure all methods are called in this context
const task = new Task({
@@ -111,9 +80,7 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
},
})
this.transfer = task.wrapFn(this.transfer)
this.healthCheck = task.wrapFn(this.healthCheck)
this.cleanup = task.wrapFn(this.cleanup)
this.afterBackup = task.wrapFn(this.afterBackup, true)
this.cleanup = task.wrapFn(this.cleanup, true)
return task.run(() => this._prepare())
}
@@ -189,7 +156,7 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
}/${adapter.getVhdFileName(basename)}`
)
const metadataFilename = (this._metadataFileName = `${backupDir}/${basename}.json`)
const metadataFilename = `${backupDir}/${basename}.json`
const metadataContent = {
jobId,
mode: job.mode,

View File

@@ -9,6 +9,4 @@ exports.AbstractWriter = class AbstractWriter {
beforeBackup() {}
afterBackup() {}
healthCheck(sr) {}
}

View File

@@ -6,9 +6,8 @@ const { join } = require('path')
const { getVmBackupDir } = require('../_getVmBackupDir.js')
const MergeWorker = require('../merge-worker/index.js')
const { formatFilenameDate } = require('../_filenameDate.js')
const { Task } = require('../Task.js')
const { info, warn } = createLogger('xo:backups:MixinBackupWriter')
const { warn } = createLogger('xo:backups:MixinBackupWriter')
exports.MixinBackupWriter = (BaseClass = Object) =>
class MixinBackupWriter extends BaseClass {
@@ -26,17 +25,11 @@ exports.MixinBackupWriter = (BaseClass = Object) =>
async _cleanVm(options) {
try {
return await Task.run({ name: 'clean-vm' }, () => {
return this._adapter.cleanVm(this.#vmBackupDir, {
...options,
fixMetadata: true,
logInfo: info,
logWarn: (message, data) => {
warn(message, data)
Task.warning(message, data)
},
lock: false,
})
return await this._adapter.cleanVm(this.#vmBackupDir, {
...options,
fixMetadata: true,
onLog: warn,
lock: false,
})
} catch (error) {
warn(error)
@@ -71,6 +64,5 @@ exports.MixinBackupWriter = (BaseClass = Object) =>
const remotePath = handler._getRealPath()
await MergeWorker.run(remotePath)
}
await this._adapter.invalidateVmBackupListCache(this._backup.vm.uuid)
}
}

View File

@@ -18,7 +18,7 @@
"preferGlobal": true,
"dependencies": {
"golike-defer": "^0.5.1",
"xen-api": "^1.2.1"
"xen-api": "^0.36.0"
},
"scripts": {
"postversion": "npm publish"

View File

@@ -0,0 +1,3 @@
'use strict'
module.exports = require('../../@xen-orchestra/babel-config')(require('./package.json'))

View File

@@ -0,0 +1 @@
../../scripts/babel-eslintrc.js

View File

@@ -27,17 +27,31 @@
"url": "https://vates.fr"
},
"preferGlobal": false,
"main": "dist/",
"browserslist": [
">2%"
],
"engines": {
"node": ">=8.3"
"node": ">=6"
},
"dependencies": {
"lodash": "^4.17.4",
"moment-timezone": "^0.5.14"
},
"devDependencies": {
"@babel/cli": "^7.0.0",
"@babel/core": "^7.0.0",
"@babel/preset-env": "^7.0.0",
"cross-env": "^7.0.2",
"rimraf": "^3.0.0"
},
"scripts": {
"build": "cross-env NODE_ENV=production babel --source-maps --out-dir=dist/ src/",
"clean": "rimraf dist/",
"dev": "cross-env NODE_ENV=development babel --watch --source-maps --out-dir=dist/ src/",
"prebuild": "yarn run clean",
"predev": "yarn run clean",
"prepublishOnly": "yarn run build",
"postversion": "npm publish"
}
}

View File

@@ -1,9 +1,7 @@
'use strict'
import moment from 'moment-timezone'
const moment = require('moment-timezone')
const next = require('./next')
const parse = require('./parse')
import next from './next'
import parse from './parse'
const MAX_DELAY = 2 ** 31 - 1
@@ -96,5 +94,4 @@ class Schedule {
}
}
const createSchedule = (...args) => new Schedule(...args)
exports.createSchedule = createSchedule
export const createSchedule = (...args) => new Schedule(...args)

View File

@@ -1,8 +1,6 @@
/* eslint-env jest */
'use strict'
const { createSchedule } = require('./')
import { createSchedule } from './'
jest.useFakeTimers()

View File

@@ -1,7 +1,5 @@
'use strict'
const moment = require('moment-timezone')
const sortedIndex = require('lodash/sortedIndex')
import moment from 'moment-timezone'
import sortedIndex from 'lodash/sortedIndex'
const NEXT_MAPPING = {
month: { year: 1 },
@@ -33,7 +31,7 @@ const setFirstAvailable = (date, unit, values) => {
}
// returns the next run, after the passed date
module.exports = (schedule, fromDate) => {
export default (schedule, fromDate) => {
let date = moment(fromDate)
.set({
second: 0,

View File

@@ -1,12 +1,10 @@
/* eslint-env jest */
'use strict'
import mapValues from 'lodash/mapValues'
import moment from 'moment-timezone'
const mapValues = require('lodash/mapValues')
const moment = require('moment-timezone')
const next = require('./next')
const parse = require('./parse')
import next from './next'
import parse from './parse'
const N = (pattern, fromDate = '2018-04-09T06:25') => {
const iso = next(parse(pattern), moment.utc(fromDate)).toISOString()

View File

@@ -1,5 +1,3 @@
'use strict'
const compareNumbers = (a, b) => a - b
const createParser = ({ fields: [...fields], presets: { ...presets } }) => {
@@ -150,7 +148,7 @@ const createParser = ({ fields: [...fields], presets: { ...presets } }) => {
return parse
}
module.exports = createParser({
export default createParser({
fields: [
{
name: 'minute',

View File

@@ -1,8 +1,6 @@
/* eslint-env jest */
'use strict'
const parse = require('./parse')
import parse from './parse'
describe('parse()', () => {
it('works', () => {

View File

@@ -22,7 +22,7 @@ await ee.emitAsync('start')
// error handling though:
await ee.emitAsync(
{
onError(error, event, listener) {
onError(error) {
console.warn(error)
},
},

View File

@@ -40,7 +40,7 @@ await ee.emitAsync('start')
// error handling though:
await ee.emitAsync(
{
onError(error, event, listener) {
onError(error) {
console.warn(error)
},
},

View File

@@ -1,7 +1,5 @@
'use strict'
const identity = v => v
module.exports = function emitAsync(event) {
let opts
let i = 1
@@ -19,18 +17,12 @@ module.exports = function emitAsync(event) {
}
const onError = opts != null && opts.onError
const addErrorHandler = onError
? (promise, listener) => promise.catch(error => onError(error, event, listener))
: identity
return Promise.all(
this.listeners(event).map(listener =>
addErrorHandler(
new Promise(resolve => {
resolve(listener.apply(this, args))
}),
listener
)
new Promise(resolve => {
resolve(listener.apply(this, args))
}).catch(onError)
)
)
}

View File

@@ -1,7 +1,7 @@
{
"private": false,
"name": "@xen-orchestra/emit-async",
"version": "1.0.0",
"version": "0.1.0",
"license": "ISC",
"description": "Emit an event for async listeners to settle",
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@xen-orchestra/emit-async",

View File

@@ -1,62 +0,0 @@
#!/usr/bin/env node
'use strict'
const Disposable = require('promise-toolbox/Disposable')
const { getBoundPropertyDescriptor } = require('bind-property-descriptor')
const { getSyncedHandler } = require('./')
const { getPrototypeOf, ownKeys } = Reflect
function getAllBoundDescriptors(object) {
const descriptors = { __proto__: null }
let current = object
do {
ownKeys(current).forEach(key => {
if (!(key in descriptors)) {
descriptors[key] = getBoundPropertyDescriptor(current, key, object)
}
})
} while ((current = getPrototypeOf(current)) !== null)
return descriptors
}
// https://gist.github.com/julien-f/18161f6032e808d6fa08782951ce3bfb
async function repl({ prompt, context } = {}) {
const repl = require('repl').start({
ignoreUndefined: true,
prompt,
})
if (context !== undefined) {
Object.defineProperties(repl.context, Object.getOwnPropertyDescriptors(context))
}
const { eval: evaluate } = repl
repl.eval = (cmd, context, filename, cb) => {
evaluate.call(repl, cmd, context, filename, (error, result) => {
if (error != null) {
return cb(error)
}
Promise.resolve(result).then(result => cb(undefined, result), cb)
})
}
return new Promise((resolve, reject) => {
repl.on('error', reject).on('exit', resolve)
})
}
async function* main([url]) {
if (url === undefined) {
throw new TypeError('missing arg <url>')
}
const handler = yield getSyncedHandler({ url })
await repl({
prompt: handler.type + '> ',
context: Object.create(null, getAllBoundDescriptors(handler)),
})
}
Disposable.wrap(main)(process.argv.slice(2)).catch(error => {
console.error('FATAL:', error)
process.exitCode = 1
})

View File

@@ -1,7 +1,7 @@
{
"private": false,
"name": "@xen-orchestra/fs",
"version": "1.0.3",
"version": "0.20.0",
"license": "AGPL-3.0-or-later",
"description": "The File System for Xen Orchestra backups.",
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@xen-orchestra/fs",
@@ -13,25 +13,18 @@
},
"preferGlobal": true,
"main": "dist/",
"bin": {
"xo-fs": "./cli.js"
},
"engines": {
"node": ">=14"
},
"dependencies": {
"@aws-sdk/client-s3": "^3.54.0",
"@aws-sdk/lib-storage": "^3.54.0",
"@aws-sdk/middleware-apply-body-checksum": "^3.58.0",
"@aws-sdk/node-http-handler": "^3.54.0",
"@marsaud/smb2": "^0.18.0",
"@sindresorhus/df": "^3.1.1",
"@vates/async-each": "^0.1.0",
"@sullux/aws-sdk": "^1.0.5",
"@vates/coalesce-calls": "^0.1.0",
"@vates/decorate-with": "^2.0.0",
"@vates/decorate-with": "^1.0.0",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/log": "^0.3.0",
"bind-property-descriptor": "^2.0.0",
"aws-sdk": "^2.686.0",
"decorator-synchronized": "^0.6.0",
"execa": "^5.0.0",
"fs-extra": "^10.0.0",
@@ -49,11 +42,12 @@
"@babel/core": "^7.0.0",
"@babel/plugin-proposal-decorators": "^7.1.6",
"@babel/plugin-proposal-function-bind": "^7.0.0",
"@babel/preset-env": "^7.8.0",
"@babel/plugin-proposal-nullish-coalescing-operator": "^7.4.4",
"@babel/preset-env": "^7.0.0",
"async-iterator-to-stream": "^1.1.0",
"babel-plugin-lodash": "^3.3.2",
"cross-env": "^7.0.2",
"dotenv": "^16.0.0",
"dotenv": "^15.0.0",
"rimraf": "^3.0.0"
},
"scripts": {

View File

@@ -1,18 +0,0 @@
/**
* @param {Readable} inputStream
* @param {Buffer} destinationBuffer
* @returns {Promise<int>} Buffer length
* @private
*/
export default function copyStreamToBuffer(inputStream, destinationBuffer) {
return new Promise((resolve, reject) => {
let index = 0
inputStream.on('data', chunk => {
chunk.copy(destinationBuffer, index)
index += chunk.length
})
inputStream.on('end', () => resolve(index))
inputStream.on('error', err => reject(err))
})
}

View File

@@ -1,21 +0,0 @@
/* eslint-env jest */
import { Readable } from 'readable-stream'
import copyStreamToBuffer from './_copyStreamToBuffer.js'
describe('copyStreamToBuffer', () => {
it('should copy the stream to the buffer', async () => {
const stream = new Readable({
read() {
this.push('hello')
this.push(null)
},
})
const buffer = Buffer.alloc(3)
await copyStreamToBuffer(stream, buffer)
expect(buffer.toString()).toBe('hel')
})
})

View File

@@ -1,13 +0,0 @@
/**
* @param {Readable} stream
* @returns {Promise<Buffer>}
* @private
*/
export default function createBufferFromStream(stream) {
return new Promise((resolve, reject) => {
const chunks = []
stream.on('data', chunk => chunks.push(chunk))
stream.on('end', () => resolve(Buffer.concat(chunks)))
stream.on('error', error => reject(error))
})
}

View File

@@ -1,19 +0,0 @@
/* eslint-env jest */
import { Readable } from 'readable-stream'
import createBufferFromStream from './_createBufferFromStream.js'
describe('createBufferFromStream', () => {
it('should create a buffer from a stream', async () => {
const stream = new Readable({
read() {
this.push('hello')
this.push(null)
},
})
const buffer = await createBufferFromStream(stream)
expect(buffer.toString()).toBe('hello')
})
})

View File

@@ -1,4 +0,0 @@
export default function guessAwsRegion(host) {
const matches = /^s3\.([^.]+)\.amazonaws.com$/.exec(host)
return matches !== null ? matches[1] : 'us-east-1'
}

View File

@@ -1,17 +0,0 @@
/* eslint-env jest */
import guessAwsRegion from './_guessAwsRegion.js'
describe('guessAwsRegion', () => {
it('should return region from AWS URL', async () => {
const region = guessAwsRegion('s3.test-region.amazonaws.com')
expect(region).toBe('test-region')
})
it('should return default region if none is found is AWS URL', async () => {
const region = guessAwsRegion('s3.amazonaws.com')
expect(region).toBe('us-east-1')
})
})

View File

@@ -0,0 +1,9 @@
import path from 'path'
const { resolve } = path.posix
// normalize the path:
// - does not contains `.` or `..` (cannot escape root dir)
// - always starts with `/`
const normalizePath = path => resolve('/', path)
export { normalizePath as default }

View File

@@ -1,21 +0,0 @@
import path from 'path'
const { basename, dirname, join, resolve, sep } = path.posix
export { basename, dirname, join }
// normalize the path:
// - does not contains `.` or `..` (cannot escape root dir)
// - always starts with `/`
// - no trailing slash (expect for root)
// - no duplicate slashes
export const normalize = path => resolve('/', path)
export function split(path) {
const parts = normalize(path).split(sep)
// remove first (empty) entry
parts.shift()
return parts
}

View File

@@ -1,7 +1,7 @@
import asyncMapSettled from '@xen-orchestra/async-map/legacy'
import getStream from 'get-stream'
import path, { basename } from 'path'
import { coalesceCalls } from '@vates/coalesce-calls'
import { createLogger } from '@xen-orchestra/log'
import { fromCallback, fromEvent, ignoreErrors, timeout } from 'promise-toolbox'
import { limitConcurrency } from 'limit-concurrency-decorator'
import { parse } from 'xo-remote-parser'
@@ -9,10 +9,10 @@ import { pipeline } from 'stream'
import { randomBytes } from 'crypto'
import { synchronized } from 'decorator-synchronized'
import { basename, dirname, normalize as normalizePath } from './_path'
import normalizePath from './_normalizePath'
import { createChecksumStream, validChecksumOfReadStream } from './checksum'
const { warn } = createLogger('@xen-orchestra:fs')
const { dirname } = path.posix
const checksumFile = file => file + '.checksum'
const computeRate = (hrtime, size) => {
@@ -360,12 +360,11 @@ export default class RemoteHandlerAbstract {
readRate: computeRate(readDuration, SIZE),
}
} catch (error) {
warn(`error while testing the remote at step ${step}`, { error })
return {
success: false,
step,
file: testFileName,
error,
error: error.message || String(error),
}
} finally {
ignoreErrors.call(this._unlink(testFileName))
@@ -552,9 +551,7 @@ export default class RemoteHandlerAbstract {
const files = await this._list(dir)
await asyncMapSettled(files, file =>
this._unlink(`${dir}/${file}`).catch(error => {
// Unlink dir behavior is not consistent across platforms
// https://github.com/nodejs/node-v0.x-archive/issues/5791
if (error.code === 'EISDIR' || error.code === 'EPERM') {
if (error.code === 'EISDIR') {
return this._rmtree(`${dir}/${file}`)
}
throw error

View File

@@ -1,33 +1,13 @@
import {
AbortMultipartUploadCommand,
CompleteMultipartUploadCommand,
CopyObjectCommand,
CreateMultipartUploadCommand,
DeleteObjectCommand,
GetObjectCommand,
HeadObjectCommand,
ListObjectsV2Command,
PutObjectCommand,
S3Client,
UploadPartCommand,
UploadPartCopyCommand,
} from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'
import { NodeHttpHandler } from '@aws-sdk/node-http-handler'
import { getApplyMd5BodyChecksumPlugin } from '@aws-sdk/middleware-apply-body-checksum'
import aws from '@sullux/aws-sdk'
import assert from 'assert'
import { Agent as HttpAgent } from 'http'
import { Agent as HttpsAgent } from 'https'
import http from 'http'
import https from 'https'
import pRetry from 'promise-toolbox/retry'
import { createLogger } from '@xen-orchestra/log'
import { decorateWith } from '@vates/decorate-with'
import { PassThrough, pipeline } from 'stream'
import { parse } from 'xo-remote-parser'
import copyStreamToBuffer from './_copyStreamToBuffer.js'
import createBufferFromStream from './_createBufferFromStream.js'
import guessAwsRegion from './_guessAwsRegion.js'
import RemoteHandlerAbstract from './abstract'
import { basename, join, split } from './_path'
import { asyncEach } from '@vates/async-each'
// endpoints https://docs.aws.amazon.com/general/latest/gr/s3.html
@@ -44,115 +24,78 @@ const { warn } = createLogger('xo:fs:s3')
export default class S3Handler extends RemoteHandlerAbstract {
constructor(remote, _opts) {
super(remote)
const {
allowUnauthorized,
host,
path,
username,
password,
protocol,
region = guessAwsRegion(host),
} = parse(remote.url)
this._s3 = new S3Client({
const { allowUnauthorized, host, path, username, password, protocol, region } = parse(remote.url)
const params = {
accessKeyId: username,
apiVersion: '2006-03-01',
endpoint: `${protocol}://${host}`,
forcePathStyle: true,
credentials: {
accessKeyId: username,
secretAccessKey: password,
endpoint: host,
s3ForcePathStyle: true,
secretAccessKey: password,
signatureVersion: 'v4',
httpOptions: {
timeout: 600000,
},
tls: protocol === 'https',
region,
requestHandler: new NodeHttpHandler({
socketTimeout: 600000,
httpAgent: new HttpAgent({
keepAlive: true,
}),
httpsAgent: new HttpsAgent({
rejectUnauthorized: !allowUnauthorized,
keepAlive: true,
}),
}),
})
}
if (protocol === 'http') {
params.httpOptions.agent = new http.Agent({ keepAlive: true })
params.sslEnabled = false
} else if (protocol === 'https') {
params.httpOptions.agent = new https.Agent({
rejectUnauthorized: !allowUnauthorized,
keepAlive: true,
})
}
if (region !== undefined) {
params.region = region
}
// Workaround for https://github.com/aws/aws-sdk-js-v3/issues/2673
this._s3.middlewareStack.use(getApplyMd5BodyChecksumPlugin(this._s3.config))
this._s3 = aws(params).s3
const parts = split(path)
this._bucket = parts.shift()
this._dir = join(...parts)
const splitPath = path.split('/').filter(s => s.length)
this._bucket = splitPath.shift()
this._dir = splitPath.join('/')
}
get type() {
return 's3'
}
_makeCopySource(path) {
return join(this._bucket, this._dir, path)
}
_makeKey(file) {
return join(this._dir, file)
}
_makePrefix(dir) {
const prefix = join(this._dir, dir, '/')
// no prefix for root
if (prefix !== './') {
return prefix
}
}
_createParams(file) {
return { Bucket: this._bucket, Key: this._makeKey(file) }
return { Bucket: this._bucket, Key: this._dir + file }
}
async _multipartCopy(oldPath, newPath) {
const size = await this._getSize(oldPath)
const CopySource = this._makeCopySource(oldPath)
const multipartParams = await this._s3.send(new CreateMultipartUploadCommand({ ...this._createParams(newPath) }))
const CopySource = `/${this._bucket}/${this._dir}${oldPath}`
const multipartParams = await this._s3.createMultipartUpload({ ...this._createParams(newPath) })
const param2 = { ...multipartParams, CopySource }
try {
const parts = []
let start = 0
while (start < size) {
const partNumber = parts.length + 1
const upload = await this._s3.send(
new UploadPartCopyCommand({
...multipartParams,
CopySource,
CopySourceRange: `bytes=${start}-${Math.min(start + MAX_PART_SIZE, size) - 1}`,
PartNumber: partNumber,
})
)
parts.push({ ETag: upload.CopyPartResult.ETag, PartNumber: partNumber })
const range = `bytes=${start}-${Math.min(start + MAX_PART_SIZE, size) - 1}`
const partParams = { ...param2, PartNumber: parts.length + 1, CopySourceRange: range }
const upload = await this._s3.uploadPartCopy(partParams)
parts.push({ ETag: upload.CopyPartResult.ETag, PartNumber: partParams.PartNumber })
start += MAX_PART_SIZE
}
await this._s3.send(
new CompleteMultipartUploadCommand({
...multipartParams,
MultipartUpload: { Parts: parts },
})
)
await this._s3.completeMultipartUpload({ ...multipartParams, MultipartUpload: { Parts: parts } })
} catch (e) {
await this._s3.send(new AbortMultipartUploadCommand(multipartParams))
await this._s3.abortMultipartUpload(multipartParams)
throw e
}
}
async _copy(oldPath, newPath) {
const CopySource = this._makeCopySource(oldPath)
const CopySource = `/${this._bucket}/${this._dir}${oldPath}`
try {
await this._s3.send(
new CopyObjectCommand({
...this._createParams(newPath),
CopySource,
})
)
await this._s3.copyObject({
...this._createParams(newPath),
CopySource,
})
} catch (e) {
// object > 5GB must be copied part by part
if (e.name === 'EntityTooLarge') {
if (e.code === 'EntityTooLarge') {
return this._multipartCopy(oldPath, newPath)
}
throw e
@@ -160,22 +103,20 @@ export default class S3Handler extends RemoteHandlerAbstract {
}
async _isNotEmptyDir(path) {
const result = await this._s3.send(
new ListObjectsV2Command({
Bucket: this._bucket,
MaxKeys: 1,
Prefix: this._makePrefix(path),
})
)
return result.Contents?.length > 0
const result = await this._s3.listObjectsV2({
Bucket: this._bucket,
MaxKeys: 1,
Prefix: this._dir + path + '/',
})
return result.Contents.length !== 0
}
async _isFile(path) {
try {
await this._s3.send(new HeadObjectCommand(this._createParams(path)))
await this._s3.headObject(this._createParams(path))
return true
} catch (error) {
if (error.name === 'NotFound') {
if (error.code === 'NotFound') {
return false
}
throw error
@@ -183,23 +124,13 @@ export default class S3Handler extends RemoteHandlerAbstract {
}
async _outputStream(path, input, { validator }) {
// Workaround for "ReferenceError: ReadableStream is not defined"
// https://github.com/aws/aws-sdk-js-v3/issues/2522
const Body = new PassThrough()
pipeline(input, Body, () => {})
const upload = new Upload({
client: this._s3,
queueSize: 1,
partSize: IDEAL_FRAGMENT_SIZE,
params: {
await this._s3.upload(
{
...this._createParams(path),
Body,
Body: input,
},
})
await upload.done()
{ partSize: IDEAL_FRAGMENT_SIZE, queueSize: 1 }
)
if (validator !== undefined) {
try {
await validator.call(this, path)
@@ -215,7 +146,7 @@ export default class S3Handler extends RemoteHandlerAbstract {
// https://www.backblaze.com/b2/docs/calling.html#error_handling
@decorateWith(pRetry.wrap, {
delays: [100, 200, 500, 1000, 2000],
when: e => e.$metadata?.httpStatusCode === 500,
when: e => e.code === 'InternalError',
onRetry(error) {
warn('retrying writing file', {
attemptNumber: this.attemptNumber,
@@ -226,31 +157,23 @@ export default class S3Handler extends RemoteHandlerAbstract {
},
})
async _writeFile(file, data, options) {
return this._s3.send(
new PutObjectCommand({
...this._createParams(file),
Body: data,
})
)
return this._s3.putObject({ ...this._createParams(file), Body: data })
}
async _createReadStream(path, options) {
try {
return (await this._s3.send(new GetObjectCommand(this._createParams(path)))).Body
} catch (e) {
if (e.name === 'NoSuchKey') {
const error = new Error(`ENOENT: no such file '${path}'`)
error.code = 'ENOENT'
error.path = path
throw error
}
throw e
if (!(await this._isFile(path))) {
const error = new Error(`ENOENT: no such file '${path}'`)
error.code = 'ENOENT'
error.path = path
throw error
}
// https://github.com/Sullux/aws-sdk/issues/11
return this._s3.getObject.raw(this._createParams(path)).createReadStream()
}
async _unlink(path) {
await this._s3.send(new DeleteObjectCommand(this._createParams(path)))
await this._s3.deleteObject(this._createParams(path))
if (await this._isNotEmptyDir(path)) {
const error = new Error(`EISDIR: illegal operation on a directory, unlink '${path}'`)
error.code = 'EISDIR'
@@ -260,40 +183,38 @@ export default class S3Handler extends RemoteHandlerAbstract {
}
async _list(dir) {
let NextContinuationToken
const uniq = new Set()
const Prefix = this._makePrefix(dir)
function splitPath(path) {
return path.split('/').filter(d => d.length)
}
do {
const result = await this._s3.send(
new ListObjectsV2Command({
Bucket: this._bucket,
Prefix,
Delimiter: '/',
// will only return path until delimiters
ContinuationToken: NextContinuationToken,
})
)
const prefix = [this._dir, dir].join('/')
const splitPrefix = splitPath(prefix)
const result = await this._s3.listObjectsV2({
Bucket: this._bucket,
Prefix: splitPrefix.join('/') + '/', // need slash at the end with the use of delimiters
Delimiter: '/', // will only return path until delimiters
})
if (result.IsTruncated) {
warn(`need pagination to browse the directory ${dir} completely`)
NextContinuationToken = result.NextContinuationToken
} else {
NextContinuationToken = undefined
}
if (result.IsTruncated) {
const error = new Error('more than 1000 objects, unsupported in this implementation')
error.dir = dir
throw error
}
// subdirectories
for (const entry of result.CommonPrefixes ?? []) {
uniq.add(basename(entry.Prefix))
}
const uniq = []
// files
for (const entry of result.Contents ?? []) {
uniq.add(basename(entry.Key))
}
} while (NextContinuationToken !== undefined)
// sub directories
for (const entry of result.CommonPrefixes) {
const line = splitPath(entry.Prefix)
uniq.push(line[line.length - 1])
}
// files
for (const entry of result.Contents) {
const line = splitPath(entry.Key)
uniq.push(line[line.length - 1])
}
return [...uniq]
return uniq
}
async _mkdir(path) {
@@ -309,14 +230,14 @@ export default class S3Handler extends RemoteHandlerAbstract {
// s3 doesn't have a rename operation, so copy + delete source
async _rename(oldPath, newPath) {
await this.copy(oldPath, newPath)
await this._s3.send(new DeleteObjectCommand(this._createParams(oldPath)))
await this._s3.deleteObject(this._createParams(oldPath))
}
async _getSize(file) {
if (typeof file !== 'string') {
file = file.fd
}
const result = await this._s3.send(new HeadObjectCommand(this._createParams(file)))
const result = await this._s3.headObject(this._createParams(file))
return +result.ContentLength
}
@@ -327,11 +248,11 @@ export default class S3Handler extends RemoteHandlerAbstract {
const params = this._createParams(file)
params.Range = `bytes=${position}-${position + buffer.length - 1}`
try {
const result = await this._s3.send(new GetObjectCommand(params))
const bytesRead = await copyStreamToBuffer(result.Body, buffer)
return { bytesRead, buffer }
const result = await this._s3.getObject(params)
result.Body.copy(buffer)
return { bytesRead: result.Body.length, buffer }
} catch (e) {
if (e.name === 'NoSuchKey') {
if (e.code === 'NoSuchKey') {
if (await this._isNotEmptyDir(file)) {
const error = new Error(`${file} is a directory`)
error.code = 'EISDIR'
@@ -358,28 +279,22 @@ export default class S3Handler extends RemoteHandlerAbstract {
// @todo : use parallel processing for unlink
async _rmtree(path) {
let NextContinuationToken
const Prefix = this._makePrefix(path)
do {
const result = await this._s3.send(
new ListObjectsV2Command({
Bucket: this._bucket,
Prefix,
ContinuationToken: NextContinuationToken,
})
)
const result = await this._s3.listObjectsV2({
Bucket: this._bucket,
Prefix: this._dir + path + '/',
ContinuationToken: NextContinuationToken,
})
NextContinuationToken = result.IsTruncated ? result.NextContinuationToken : undefined
await asyncEach(
result.Contents ?? [],
result.Contents,
async ({ Key }) => {
// _unlink will add the prefix, but Key contains everything
// also we don't need to check if we delete a directory, since the list only return files
await this._s3.send(
new DeleteObjectCommand({
Bucket: this._bucket,
Key,
})
)
await this._s3.deleteObject({
Bucket: this._bucket,
Key,
})
},
{
concurrency: 16,
@@ -395,9 +310,9 @@ export default class S3Handler extends RemoteHandlerAbstract {
const uploadParams = this._createParams(file)
let fileSize
try {
fileSize = +(await this._s3.send(new HeadObjectCommand(uploadParams))).ContentLength
fileSize = +(await this._s3.headObject(uploadParams)).ContentLength
} catch (e) {
if (e.name === 'NotFound') {
if (e.code === 'NotFound') {
fileSize = 0
} else {
throw e
@@ -405,19 +320,10 @@ export default class S3Handler extends RemoteHandlerAbstract {
}
if (fileSize < MIN_PART_SIZE) {
const resultBuffer = Buffer.alloc(Math.max(fileSize, position + buffer.length))
if (fileSize !== 0) {
const result = await this._s3.send(new GetObjectCommand(uploadParams))
await copyStreamToBuffer(result.Body, resultBuffer)
} else {
Buffer.alloc(0).copy(resultBuffer)
}
const fileContent = fileSize !== 0 ? (await this._s3.getObject(uploadParams)).Body : Buffer.alloc(0)
fileContent.copy(resultBuffer)
buffer.copy(resultBuffer, position)
await this._s3.send(
new PutObjectCommand({
...uploadParams,
Body: resultBuffer,
})
)
await this._s3.putObject({ ...uploadParams, Body: resultBuffer })
return { buffer, bytesWritten: buffer.length }
} else {
// using this trick: https://stackoverflow.com/a/38089437/72637
@@ -428,10 +334,10 @@ export default class S3Handler extends RemoteHandlerAbstract {
// `edit` will always be an upload part
// `suffix` will always be sourced from uploadPartCopy()
// Then everything will be sliced in 5Gb parts before getting uploaded
const multipartParams = await this._s3.send(new CreateMultipartUploadCommand(uploadParams))
const multipartParams = await this._s3.createMultipartUpload(uploadParams)
const copyMultipartParams = {
...multipartParams,
CopySource: this._makeCopySource(file),
CopySource: `/${this._bucket}/${this._dir + file}`,
}
try {
const parts = []
@@ -458,20 +364,14 @@ export default class S3Handler extends RemoteHandlerAbstract {
assert.strictEqual(fragmentEnd - prefixPosition <= MAX_PART_SIZE, true)
const range = `bytes=${prefixPosition}-${fragmentEnd - 1}`
const copyPrefixParams = { ...copyMultipartParams, PartNumber: partNumber++, CopySourceRange: range }
const part = await this._s3.send(new UploadPartCopyCommand(copyPrefixParams))
const part = await this._s3.uploadPartCopy(copyPrefixParams)
parts.push({ ETag: part.CopyPartResult.ETag, PartNumber: copyPrefixParams.PartNumber })
prefixPosition += prefixFragmentSize
}
if (prefixLastFragmentSize) {
// grab everything from the prefix that was too small to be copied, download and merge to the edit buffer.
const downloadParams = { ...uploadParams, Range: `bytes=${prefixPosition}-${prefixSize - 1}` }
let prefixBuffer
if (prefixSize > 0) {
const result = await this._s3.send(new GetObjectCommand(downloadParams))
prefixBuffer = await createBufferFromStream(result.Body)
} else {
prefixBuffer = Buffer.alloc(0)
}
const prefixBuffer = prefixSize > 0 ? (await this._s3.getObject(downloadParams)).Body : Buffer.alloc(0)
editBuffer = Buffer.concat([prefixBuffer, buffer])
editBufferOffset -= prefixLastFragmentSize
}
@@ -486,12 +386,11 @@ export default class S3Handler extends RemoteHandlerAbstract {
hasSuffix = suffixSize > 0
const prefixRange = `bytes=${complementOffset}-${complementOffset + complementSize - 1}`
const downloadParams = { ...uploadParams, Range: prefixRange }
const result = await this._s3.send(new GetObjectCommand(downloadParams))
const complementBuffer = await createBufferFromStream(result.Body)
const complementBuffer = (await this._s3.getObject(downloadParams)).Body
editBuffer = Buffer.concat([editBuffer, complementBuffer])
}
const editParams = { ...multipartParams, Body: editBuffer, PartNumber: partNumber++ }
const editPart = await this._s3.send(new UploadPartCommand(editParams))
const editPart = await this._s3.uploadPart(editParams)
parts.push({ ETag: editPart.ETag, PartNumber: editParams.PartNumber })
if (hasSuffix) {
// use ceil because the last fragment can be arbitrarily small.
@@ -502,19 +401,17 @@ export default class S3Handler extends RemoteHandlerAbstract {
assert.strictEqual(Math.min(fileSize, fragmentEnd) - suffixFragmentOffset <= MAX_PART_SIZE, true)
const suffixRange = `bytes=${suffixFragmentOffset}-${Math.min(fileSize, fragmentEnd) - 1}`
const copySuffixParams = { ...copyMultipartParams, PartNumber: partNumber++, CopySourceRange: suffixRange }
const suffixPart = (await this._s3.send(new UploadPartCopyCommand(copySuffixParams))).CopyPartResult
const suffixPart = (await this._s3.uploadPartCopy(copySuffixParams)).CopyPartResult
parts.push({ ETag: suffixPart.ETag, PartNumber: copySuffixParams.PartNumber })
suffixFragmentOffset = fragmentEnd
}
}
await this._s3.send(
new CompleteMultipartUploadCommand({
...multipartParams,
MultipartUpload: { Parts: parts },
})
)
await this._s3.completeMultipartUpload({
...multipartParams,
MultipartUpload: { Parts: parts },
})
} catch (e) {
await this._s3.send(new AbortMultipartUploadCommand(multipartParams))
await this._s3.abortMultipartUpload(multipartParams)
throw e
}
}

View File

@@ -1,14 +1,14 @@
import { parse } from 'xo-remote-parser'
import MountHandler from './_mount'
import { normalize } from './_path'
import normalizePath from './_normalizePath'
export default class SmbMountHandler extends MountHandler {
constructor(remote, opts) {
const { domain = 'WORKGROUP', host, password, path, username } = parse(remote.url)
super(remote, opts, {
type: 'cifs',
device: '//' + host + normalize(path),
device: '//' + host + normalizePath(path),
options: `domain=${domain}`,
env: {
USER: username,

View File

@@ -20,7 +20,7 @@
">2%"
],
"engines": {
"node": ">=8.3"
"node": ">=6"
},
"dependencies": {
"lodash": "^4.17.4",

View File

@@ -1,8 +1,8 @@
'use strict'
const fromCallback = require('promise-toolbox/fromCallback')
const nodemailer = require('nodemailer') // eslint-disable-line n/no-extraneous-require
const prettyFormat = require('pretty-format') // eslint-disable-line n/no-extraneous-require
const nodemailer = require('nodemailer') // eslint-disable-line n/no-extraneous-import
const prettyFormat = require('pretty-format') // eslint-disable-line n/no-extraneous-import
const { evalTemplate, required } = require('../utils')
const { NAMES } = require('../levels')

View File

@@ -1,9 +1,7 @@
'use strict'
const fromCallback = require('promise-toolbox/fromCallback')
// eslint-disable-next-line n/no-missing-require
const splitHost = require('split-host')
// eslint-disable-next-line n/no-missing-require
const { createClient, Facility, Severity, Transport } = require('syslog-client')
const LEVELS = require('../levels')

View File

@@ -2,12 +2,7 @@
const camelCase = require('lodash/camelCase')
const {
defineProperties,
defineProperty,
hasOwn = Function.prototype.call.bind(Object.prototype.hasOwnProperty),
keys,
} = Object
const { defineProperties, defineProperty, keys } = Object
const noop = Function.prototype
const MIXIN_CYCLIC_DESCRIPTOR = {
@@ -18,49 +13,23 @@ const MIXIN_CYCLIC_DESCRIPTOR = {
}
module.exports = function mixin(object, mixins, args) {
const importing = { __proto__: null }
const importers = { __proto__: null }
function instantiateMixin(name, Mixin) {
defineProperty(object, name, MIXIN_CYCLIC_DESCRIPTOR)
const instance = new Mixin(object, ...args)
defineProperty(object, name, {
value: instance,
})
return instance
}
// add lazy property for each of the mixin, this allows mixins to depend on
// one another without any special ordering
const descriptors = {
loadMixin(name) {
if (hasOwn(this, name)) {
return Promise.resolve(this[name])
}
let promise = importing[name]
if (promise === undefined) {
const clean = () => {
delete importing[name]
}
promise = importers[name]().then(Mixin => instantiateMixin(name, Mixin))
promise.then(clean, clean)
importing[name] = promise
}
return promise
},
}
const descriptors = {}
keys(mixins).forEach(name => {
const Mixin = mixins[name]
name = camelCase(name)
if (Mixin.prototype === undefined) {
importers[name] = Mixin(name)
} else {
descriptors[name] = {
configurable: true,
get: () => instantiateMixin(name, Mixin),
}
descriptors[name] = {
configurable: true,
get: () => {
defineProperty(object, name, MIXIN_CYCLIC_DESCRIPTOR)
const instance = new Mixin(object, ...args)
defineProperty(object, name, {
value: instance,
})
return instance
},
}
})
defineProperties(object, descriptors)

View File

@@ -16,7 +16,7 @@
},
"preferGlobal": false,
"engines": {
"node": ">=7.6"
"node": ">=6"
},
"dependencies": {
"bind-property-descriptor": "^2.0.0",

View File

@@ -1,16 +1,15 @@
import get from 'lodash/get.js'
import identity from 'lodash/identity.js'
import isEqual from 'lodash/isEqual.js'
import { createLogger } from '@xen-orchestra/log'
import { parseDuration } from '@vates/parse-duration'
import { watch } from 'app-conf'
'use strict'
const get = require('lodash/get')
const identity = require('lodash/identity')
const isEqual = require('lodash/isEqual')
const { createLogger } = require('@xen-orchestra/log')
const { parseDuration } = require('@vates/parse-duration')
const { watch } = require('app-conf')
const { warn } = createLogger('xo:mixins:config')
// if path is undefined, an empty string or an empty array, returns the root value
const niceGet = (value, path) => (path === undefined || path.length === 0 ? value : get(value, path))
export default class Config {
module.exports = class Config {
constructor(app, { appDir, appName, config }) {
this._config = config
const watchers = (this._watchers = new Set())
@@ -33,7 +32,7 @@ export default class Config {
}
get(path) {
const value = niceGet(this._config, path)
const value = get(this._config, path)
if (value === undefined) {
throw new TypeError('missing config entry: ' + path)
}
@@ -45,27 +44,20 @@ export default class Config {
}
getOptional(path) {
return niceGet(this._config, path)
return get(this._config, path)
}
watch(path, cb) {
// short syntax for the whole config: watch(cb)
if (typeof path === 'function') {
cb = path
path = undefined
}
// internal arg
const processor = arguments.length > 2 ? arguments[2] : identity
let prev
const watcher = config => {
try {
const value = processor(niceGet(config, path))
const value = processor(get(config, path))
if (!isEqual(value, prev)) {
const previous = prev
prev = value
cb(value, previous, path)
cb(value)
}
} catch (error) {
warn('watch', { error, path })

View File

@@ -1,7 +1,9 @@
import assert from 'assert'
import emitAsync from '@xen-orchestra/emit-async'
import EventEmitter from 'events'
import { createLogger } from '@xen-orchestra/log'
'use strict'
const assert = require('assert')
const emitAsync = require('@xen-orchestra/emit-async')
const EventEmitter = require('events')
const { createLogger } = require('@xen-orchestra/log')
const { debug, warn } = createLogger('xo:mixins:hooks')
@@ -17,7 +19,7 @@ const runHook = async (emitter, hook) => {
debug(`${hook} finished`)
}
export default class Hooks extends EventEmitter {
module.exports = class Hooks extends EventEmitter {
// Run *clean* async listeners.
//
// They normalize existing data, clear invalid entries, etc.

View File

@@ -1,144 +0,0 @@
import { createLogger } from '@xen-orchestra/log'
import { EventListenersManager } from '@vates/event-listeners-manager'
import { pipeline } from 'stream'
import { ServerResponse, request } from 'http'
import assert from 'assert'
import fromCallback from 'promise-toolbox/fromCallback'
import fromEvent from 'promise-toolbox/fromEvent'
import net from 'net'
import { parseBasicAuth } from './_parseBasicAuth.mjs'
const { debug, warn } = createLogger('xo:mixins:HttpProxy')
const IGNORED_HEADERS = new Set([
// https://datatracker.ietf.org/doc/html/rfc2616#section-13.5.1
'connection',
'keep-alive',
'proxy-authenticate',
'proxy-authorization',
'te',
'trailers',
'transfer-encoding',
'upgrade',
// don't forward original host
'host',
])
export default class HttpProxy {
#app
constructor(app, { httpServer }) {
// don't setup the proxy if httpServer is not present
//
// that can happen when the app is instanciated in another context like xo-server-recover-account
if (httpServer === undefined) {
return
}
this.#app = app
const events = new EventListenersManager(httpServer)
app.config.watch('http.proxy.enabled', (enabled = false) => {
events.removeAll()
if (enabled) {
events.add('connect', this.#handleConnect.bind(this)).add('request', this.#handleRequest.bind(this))
}
})
}
async #handleAuthentication(req, res, next) {
const auth = parseBasicAuth(req.headers['proxy-authorization'])
let authenticated = false
if (auth !== undefined) {
const app = this.#app
if (app.authenticateUser !== undefined) {
// xo-server
try {
const { user } = await app.authenticateUser(auth)
authenticated = user.permission === 'admin'
} catch (error) {}
} else {
// xo-proxy
authenticated = (await app.authentication.findProfile(auth)) !== undefined
}
}
if (authenticated) {
return next()
}
// https://datatracker.ietf.org/doc/html/rfc7235#section-3.2
res.statusCode = '407'
res.setHeader('proxy-authenticate', 'Basic realm="proxy"')
return res.end('Proxy Authentication Required')
}
// https://nodejs.org/api/http.html#event-connect
async #handleConnect(req, clientSocket, head) {
const { url } = req
debug('CONNECT proxy', { url })
// https://github.com/TooTallNate/proxy/blob/d677ef31fd4ca9f7e868b34c18b9cb22b0ff69da/proxy.js#L391-L398
const res = new ServerResponse(req)
res.assignSocket(clientSocket)
try {
await this.#handleAuthentication(req, res, async () => {
const { port, hostname } = new URL('http://' + req.url)
const serverSocket = net.connect(port || 80, hostname)
await fromEvent(serverSocket, 'connect')
clientSocket.write('HTTP/1.1 200 Connection Established\r\n\r\n')
serverSocket.write(head)
fromCallback(pipeline, clientSocket, serverSocket).catch(warn)
fromCallback(pipeline, serverSocket, clientSocket).catch(warn)
})
} catch (error) {
warn(error)
clientSocket.end()
}
}
async #handleRequest(req, res) {
const { url } = req
if (url.startsWith('/')) {
// not a proxy request
return
}
debug('HTTP proxy', { url })
try {
assert(url.startsWith('http:'), 'HTTPS should use connect')
await this.#handleAuthentication(req, res, async () => {
const { headers } = req
const pHeaders = {}
for (const key of Object.keys(headers)) {
if (!IGNORED_HEADERS.has(key)) {
pHeaders[key] = headers[key]
}
}
const pReq = request(url, { headers: pHeaders, method: req.method })
fromCallback(pipeline, req, pReq).catch(warn)
const pRes = await fromEvent(pReq, 'response')
res.writeHead(pRes.statusCode, pRes.statusMessage, pRes.headers)
await fromCallback(pipeline, pRes, res)
})
} catch (error) {
res.statusCode = 500
res.end('Internal Server Error')
warn(error)
}
}
}

View File

@@ -1,27 +0,0 @@
const RE = /^\s*basic\s+(.+?)\s*$/i
export function parseBasicAuth(header) {
if (header === undefined) {
return
}
const matches = RE.exec(header)
if (matches === null) {
return
}
let credentials = Buffer.from(matches[1], 'base64').toString()
const i = credentials.indexOf(':')
if (i === -1) {
credentials = { token: credentials }
} else {
// https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.1
credentials = {
username: credentials.slice(0, i),
password: credentials.slice(i + 1),
}
}
return credentials
}

View File

@@ -1,74 +0,0 @@
> This module provides an HTTP and HTTPS proxy for `xo-proxy` and `xo-server`.
- [Set up](#set-up)
- [Usage](#usage)
- [`xo-proxy`](#xo-proxy)
- [`xo-server`](#xo-server)
- [Use cases](#use-cases)
- [Access hosts in a private network](#access-hosts-in-a-private-network)
- [Allow upgrading xo-proxy via xo-server](#allow-upgrading-xo-proxy-via-xo-server)
## Set up
The proxy is disabled by default, to enable it, add the following lines to your config:
```toml
[http.proxy]
enabled = true
```
## Usage
For safety reasons, the proxy requires authentication to be used.
### `xo-proxy`
Use the authentication token:
```
$ cat ~/.config/xo-proxy/config.z-auto.json
{"authenticationToken":"J0BgKritQgPxoyZrBJ5ViafQfLk06YoyFwC3fmfO5wU"}
```
Proxy URL to use:
```
https://J0BgKritQgPxoyZrBJ5ViafQfLk06YoyFwC3fmfO5wU@xo-proxy.company.lan
```
### `xo-server`
> Only available for admin users.
You can use your credentials:
```
https://user:password@xo.company.lan
```
Or create a dedicated token with `xo-cli`:
```
$ xo-cli --createToken xoa.company.lan admin@admin.net
Password: ********
Successfully logged with admin@admin.net
Authentication token created
DiYBFavJwf9GODZqQJs23eAx9eh3KlsRhBi8RcoX0KM
```
And use it in the URL:
```
https://DiYBFavJwf9GODZqQJs23eAx9eh3KlsRhBi8RcoX0KM@xo.company.lan
```
## Use cases
### Access hosts in a private network
To access hosts in a private network, deploy an XO Proxy in this network, expose its port 443 and use it as an HTTP proxy to connect to your servers in XO.
### Allow upgrading xo-proxy via xo-server
If your xo-proxy does not have direct Internet access, you can use xo-server as an HTTP proxy to make upgrades possible.

View File

@@ -14,18 +14,16 @@
"url": "https://vates.fr"
},
"license": "AGPL-3.0-or-later",
"version": "0.5.0",
"version": "0.2.0",
"engines": {
"node": ">=12"
},
"dependencies": {
"@vates/event-listeners-manager": "^1.0.0",
"@vates/parse-duration": "^0.1.1",
"@xen-orchestra/emit-async": "^1.0.0",
"@xen-orchestra/emit-async": "^0.1.0",
"@xen-orchestra/log": "^0.3.0",
"app-conf": "^2.1.0",
"lodash": "^4.17.21",
"promise-toolbox": "^0.21.0"
"app-conf": "^2.0.0",
"lodash": "^4.17.21"
},
"scripts": {
"postversion": "npm publish --access public"

View File

@@ -0,0 +1,3 @@
'use strict'
module.exports = require('../../@xen-orchestra/babel-config')(require('./package.json'))

View File

@@ -0,0 +1 @@
../../scripts/babel-eslintrc.js

View File

@@ -1,7 +1,7 @@
{
"private": false,
"name": "@xen-orchestra/proxy-cli",
"version": "0.3.0",
"version": "0.2.0",
"license": "AGPL-3.0-or-later",
"description": "CLI for @xen-orchestra/proxy",
"keywords": [
@@ -18,17 +18,18 @@
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"preferGlobal": true,
"main": "dist/",
"bin": {
"xo-proxy-cli": "./index.mjs"
"xo-proxy-cli": "dist/index.js"
},
"engines": {
"node": ">=14.13"
"node": ">=12"
},
"dependencies": {
"@iarna/toml": "^2.2.0",
"@vates/read-chunk": "^0.1.2",
"ansi-colors": "^4.1.1",
"app-conf": "^2.1.0",
"app-conf": "^2.0.0",
"content-type": "^1.0.4",
"cson-parser": "^4.0.7",
"getopts": "^2.2.3",
@@ -38,8 +39,23 @@
"pumpify": "^2.0.1",
"split2": "^4.1.0"
},
"devDependencies": {
"@babel/cli": "^7.0.0",
"@babel/core": "^7.0.0",
"@babel/plugin-proposal-nullish-coalescing-operator": "^7.7.4",
"@babel/plugin-proposal-optional-chaining": "^7.0.0",
"@babel/preset-env": "^7.0.0",
"cross-env": "^7.0.2",
"rimraf": "^3.0.0"
},
"scripts": {
"postversion": "npm publish --access public"
"build": "cross-env NODE_ENV=production babel --source-maps --out-dir=dist/ src/",
"clean": "rimraf dist/",
"dev": "cross-env NODE_ENV=development babel --watch --source-maps --out-dir=dist/ src/",
"postversion": "npm publish --access public",
"prebuild": "yarn run clean",
"predev": "yarn run prebuild",
"prepublishOnly": "yarn run build"
},
"author": {
"name": "Vates SAS",

View File

@@ -10,14 +10,14 @@ import getopts from 'getopts'
import hrp from 'http-request-plus'
import split2 from 'split2'
import pumpify from 'pumpify'
import { extname } from 'path'
import { extname, join } from 'path'
import { format, parse } from 'json-rpc-protocol'
import { inspect } from 'util'
import { load as loadConfig } from 'app-conf'
import { pipeline } from 'stream'
import { readChunk } from '@vates/read-chunk'
const pkg = JSON.parse(fs.readFileSync(new URL('package.json', import.meta.url)))
import pkg from '../package.json'
const FORMATS = {
__proto__: null,
@@ -30,22 +30,30 @@ const parseValue = value => (value.startsWith('json:') ? JSON.parse(value.slice(
async function main(argv) {
const config = await loadConfig('xo-proxy', {
appDir: join(__dirname, '..'),
ignoreUnknownFormats: true,
})
const opts = getopts(argv, {
const { hostname = 'localhost', port } = config?.http?.listen?.https ?? {}
const {
_: args,
file,
help,
host,
raw,
token,
} = getopts(argv, {
alias: { file: 'f', help: 'h' },
boolean: ['help', 'raw'],
default: {
token: config.authenticationToken,
},
stopEarly: true,
string: ['file', 'host', 'token', 'url'],
string: ['file', 'host', 'token'],
})
const { _: args, file } = opts
if (opts.help || (file === '' && args.length === 0)) {
if (help || (file === '' && args.length === 0)) {
return console.log(
'%s',
`Usage:
@@ -70,29 +78,18 @@ ${pkg.name} v${pkg.version}`
const baseRequest = {
headers: {
'content-type': 'application/json',
cookie: `authenticationToken=${token}`,
},
pathname: '/api/v1',
protocol: 'https:',
rejectUnauthorized: false,
}
let { token } = opts
if (opts.url !== '') {
const { protocol, host, username } = new URL(opts.url)
Object.assign(baseRequest, { protocol, host })
if (username !== '') {
token = username
}
if (host !== '') {
baseRequest.host = host
} else {
baseRequest.protocol = 'https:'
if (opts.host !== '') {
baseRequest.host = opts.host
} else {
const { hostname = 'localhost', port } = config?.http?.listen?.https ?? {}
baseRequest.hostname = hostname
baseRequest.port = port
}
baseRequest.hostname = hostname
baseRequest.port = port
}
baseRequest.headers.cookie = `authenticationToken=${token}`
const call = async ({ method, params }) => {
if (callPath.length !== 0) {
process.stderr.write(`\n${colors.bold(`--- call #${callPath.join('.')}`)} ---\n\n`)
@@ -131,7 +128,7 @@ ${pkg.name} v${pkg.version}`
stdout.write(inspect(JSON.parse(line), { colors: true, depth: null }))
stdout.write('\n')
}
} else if (opts.raw && typeof result === 'string') {
} else if (raw && typeof result === 'string') {
stdout.write(result)
} else {
stdout.write(inspect(result, { colors: true, depth: null }))

View File

@@ -0,0 +1 @@
../../scripts/babel-eslintrc.js

View File

@@ -1,29 +0,0 @@
import Config from '@xen-orchestra/mixins/Config.mjs'
import Hooks from '@xen-orchestra/mixins/Hooks.mjs'
import HttpProxy from '@xen-orchestra/mixins/HttpProxy.mjs'
import mixin from '@xen-orchestra/mixin'
import { createDebounceResource } from '@vates/disposable/debounceResource.js'
import Api from './mixins/api.mjs'
import Appliance from './mixins/appliance.mjs'
import Authentication from './mixins/authentication.mjs'
import Backups from './mixins/backups.mjs'
import Logs from './mixins/logs.mjs'
import Remotes from './mixins/remotes.mjs'
import ReverseProxy from './mixins/reverseProxy.mjs'
export default class App {
constructor(opts) {
mixin(this, { Api, Appliance, Authentication, Backups, Config, Hooks, HttpProxy, Logs, Remotes, ReverseProxy }, [
opts,
])
const debounceResource = createDebounceResource()
this.config.watchDuration('resourceCacheDelay', delay => {
debounceResource.defaultDelay = delay
})
this.hooks.once('stop', debounceResource.flushAll)
this.debounceResource = debounceResource
}
}

View File

@@ -22,6 +22,27 @@ disableMergeWorker = false
snapshotNameLabelTpl = '[XO Backup {job.name}] {vm.name_label}'
vhdDirectoryCompression = 'brotli'
[backups.defaultSettings]
reportWhen = 'failure'
[backups.metadata.defaultSettings]
retentionPoolMetadata = 0
retentionXoMetadata = 0
[backups.vm.defaultSettings]
bypassVdiChainsCheck = false
checkpointSnapshot = false
concurrency = 2
copyRetention = 0
deleteFirst = false
exportRetention = 0
fullInterval = 0
offlineBackup = false
offlineSnapshot = false
snapshotRetention = 0
timeout = 0
vmTimeout = 0
# This is a work-around.
#
# See https://github.com/vatesfr/xen-orchestra/pull/4674
@@ -60,6 +81,11 @@ timeout = 600e3
disableFileRemotes = true
[xapiOptions]
# VDIs with `[NOBAK]` flag can be ignored while snapshotting an halted VM.
#
# This is disabled by default for the time being but will be turned on after enough testing.
ignoreNobakVdis = false
maxUncoalescedVdis = 1
watchEvents = ['network', 'PIF', 'pool', 'SR', 'task', 'VBD', 'VDI', 'VIF', 'VM']

View File

@@ -1,7 +1,7 @@
{
"private": true,
"name": "@xen-orchestra/proxy",
"version": "0.23.2",
"version": "0.19.0",
"license": "AGPL-3.0-or-later",
"description": "XO Proxy used to remotely execute backup jobs",
"keywords": [
@@ -19,7 +19,7 @@
},
"preferGlobal": true,
"bin": {
"xo-proxy": "./index.mjs"
"xo-proxy": "dist/index.mjs"
},
"engines": {
"node": ">=14.18"
@@ -27,26 +27,25 @@
"dependencies": {
"@iarna/toml": "^2.2.0",
"@koa/router": "^10.0.0",
"@vates/cached-dns.lookup": "^1.0.0",
"@vates/compose": "^2.1.0",
"@vates/decorate-with": "^2.0.0",
"@vates/decorate-with": "^1.0.0",
"@vates/disposable": "^0.1.1",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/backups": "^0.25.0",
"@xen-orchestra/fs": "^1.0.3",
"@xen-orchestra/backups": "^0.20.0",
"@xen-orchestra/fs": "^0.20.0",
"@xen-orchestra/log": "^0.3.0",
"@xen-orchestra/mixin": "^0.1.0",
"@xen-orchestra/mixins": "^0.5.0",
"@xen-orchestra/self-signed": "^0.1.3",
"@xen-orchestra/xapi": "^1.2.0",
"@xen-orchestra/mixins": "^0.2.0",
"@xen-orchestra/self-signed": "^0.1.0",
"@xen-orchestra/xapi": "^0.9.0",
"ajv": "^8.0.3",
"app-conf": "^2.1.0",
"app-conf": "^2.0.0",
"async-iterator-to-stream": "^1.1.0",
"fs-extra": "^10.0.0",
"get-stream": "^6.0.0",
"getopts": "^2.2.3",
"golike-defer": "^0.5.1",
"http-server-plus": "^0.11.1",
"http-server-plus": "^0.11.0",
"http2-proxy": "^5.0.53",
"json-rpc-protocol": "^0.13.1",
"jsonrpc-websocket-client": "^0.7.2",
@@ -60,19 +59,32 @@
"source-map-support": "^0.5.16",
"stoppable": "^1.0.6",
"xdg-basedir": "^5.1.0",
"xen-api": "^1.2.1",
"xo-common": "^0.8.0"
"xen-api": "^0.36.0",
"xo-common": "^0.7.0"
},
"devDependencies": {
"@babel/cli": "^7.0.0",
"@babel/core": "^7.0.0",
"@babel/plugin-proposal-class-properties": "^7.1.0",
"@babel/plugin-proposal-decorators": "^7.0.0",
"@babel/plugin-proposal-nullish-coalescing-operator": "^7.7.4",
"@babel/plugin-proposal-optional-chaining": "^7.0.0",
"@babel/preset-env": "^7.0.0",
"@vates/toggle-scripts": "^1.0.0",
"ws": "^8.5.0"
"babel-plugin-transform-dev": "^2.0.1",
"cross-env": "^7.0.2",
"index-modules": "^0.4.3"
},
"scripts": {
"_build": "index-modules --index-file index.mjs src/app/mixins && babel --delete-dir-on-start --keep-file-extension --source-maps --out-dir=dist/ src/",
"build": "cross-env NODE_ENV=production yarn run _build",
"dev": "cross-env NODE_ENV=development yarn run _build --watch",
"_postinstall": "./scripts/systemd-service-installer",
"postpack": "toggle-scripts -postinstall -preuninstall",
"prepack": "toggle-scripts +postinstall +preuninstall",
"prepublishOnly": "yarn run build",
"_preuninstall": "./scripts/systemd-service-installer",
"start": "./index.mjs"
"start": "./dist/index.mjs"
},
"author": {
"name": "Vates SAS",

Some files were not shown because too many files have changed in this diff Show More