Compare commits
62 Commits (xo-server/ ... xo-server/)

SHA1s: 28b9bbe54f, bf6bd7cbdc, ddcb2468a6, f048b58935, 09f6200c2e, 354692fb06,
2c5858c2e0, 1f41fd0436, e0bbefdfae, bc6fbb2797, b579cf8128, a94ed014b7,
0db991b668, 347ced6942, 5d7a775b2b, df732ab4bf, 31cd3953d6, 4666b13892,
37d7ddb4b0, 3abbaeb44b, 847ea49042, 779068c2ee, 140cd6882d, 2e295c2391,
596b0995f4, b61fe97893, 209aa2ebe6, c03a0e857e, 2854d698e6, 944163be0e,
269a9eaff0, 7f9c49cbc4, 2b6bfeeb15, fa9742bc92, 472e419abc, 169d11387b,
e59ac6d947, e193b45562, 1ac34f810e, e65e5c6e5f, af6365c76a, 8c672b23b5,
3b53f5ac11, ccdc744748, 261f0b4bf0, 495b59c2e5, d6e1c13c39, f7f13b9e07,
62564d747f, 1d5d59c4c0, e8380b8a12, c304d9cc62, aad4ebf287, 6c2f48181c,
480b6ff7d6, 4bdd6f972c, 6674d8456a, d1478ff694, cb20d46b74, 9dd2538043,
f25136a512, 03eb56ad2a
package.json (18 changes)
@@ -1,6 +1,6 @@
{
  "name": "xo-server",
-  "version": "4.12.1",
+  "version": "4.13.0-0",
  "license": "AGPL-3.0",
  "description": "Server part of Xen-Orchestra",
  "keywords": [
@@ -34,6 +34,7 @@
    "node": ">=0.12 <5"
  },
  "dependencies": {
    "@marsaud/smb2-promise": "^0.1.0",
    "app-conf": "^0.4.0",
    "babel-runtime": "^5",
    "base64url": "^1.0.5",
@@ -58,8 +59,9 @@
    "get-stream": "^1.1.0",
    "graceful-fs": "^4.1.2",
    "hashy": "~0.4.2",
    "helmet": "^1.1.0",
    "highland": "^2.5.1",
-    "http-server-plus": "^0.5.1",
+    "http-server-plus": "^0.6.4",
    "human-format": "^0.6.0",
    "is-my-json-valid": "^2.12.2",
    "jade": "^1.11.0",
@@ -85,9 +87,12 @@
    "lodash.get": "^3.7.0",
    "lodash.has": "^3.0.0",
    "lodash.includes": "^3.1.1",
    "lodash.invert": "^4.0.1",
    "lodash.isarray": "^3.0.0",
    "lodash.isboolean": "^3.0.2",
    "lodash.isempty": "^3.0.0",
    "lodash.isfunction": "^3.0.1",
    "lodash.isinteger": "^4.0.0",
    "lodash.isobject": "^3.0.0",
    "lodash.isstring": "^3.0.0",
    "lodash.keys": "^3.0.4",
@@ -95,6 +100,7 @@
    "lodash.pick": "^3.0.0",
    "lodash.sortby": "^3.1.4",
    "lodash.startswith": "^3.0.1",
    "lodash.trim": "^3.0.1",
    "loud-rejection": "^1.2.0",
    "make-error": "^1",
    "micromatch": "^2.3.2",
@@ -105,16 +111,21 @@
    "partial-stream": "0.0.0",
    "passport": "^0.3.0",
    "passport-local": "^1.0.0",
    "promise-toolbox": "^0.1.0",
    "proxy-http-request": "0.1.0",
    "redis": "^2.0.1",
    "schema-inspector": "^1.5.1",
    "semver": "^5.1.0",
    "serve-static": "^1.9.2",
    "stack-chain": "^1.3.3",
    "through2": "^2.0.0",
    "trace": "^2.0.1",
    "ws": "~0.8.0",
    "xen-api": "^0.7.2",
    "xml2js": "~0.4.6",
-    "xo-collection": "^0.4.0"
+    "xo-acl-resolver": "0.0.0-0",
+    "xo-collection": "^0.4.0",
+    "xo-remote-parser": "^0.1.0"
  },
  "devDependencies": {
    "babel-eslint": "^4.0.10",
@@ -129,7 +140,6 @@
    "leche": "^2.1.1",
    "mocha": "^2.2.1",
    "must": "^0.13.1",
    "node-inspector": "^0.12.2",
    "sinon": "^1.14.1",
    "standard": "^5.2.1"
  },
@@ -66,6 +66,8 @@ http:
  #socket: './http.sock'

  # Basic HTTPS.
  #
  # You can find the list of possible options there https://nodejs.org/docs/latest/api/tls.html#tls.createServer
  # -
  # # The only difference is the presence of the certificate and the
  # # key.
@@ -83,7 +85,7 @@ http:
  # # certificate authority up to the root.
  # #
  # # Default: undefined
-  # certificate: './certificate.pem'
+  # cert: './certificate.pem'

  # # File containing the private key (PEM format).
  # #
@@ -93,6 +95,10 @@ http:
  # # Default: undefined
  # key: './key.pem'

+  # If set to true, all HTTP traffic will be redirected to the first
+  # HTTPs configuration.
+  #redirectToHttps: true
+
# List of files/directories which will be served.
mounts:
  #'/': '/path/to/xo-web/dist/'
src/acl.js (123 deletions, file removed)
@@ -1,123 +0,0 @@
// These global variables are not a problem because the algorithm is
// synchronous.
let permissionsByObject
let getObject

// -------------------------------------------------------------------

const authorized = () => true // eslint-disable-line no-unused-vars
const forbiddden = () => false // eslint-disable-line no-unused-vars

function and (...checkers) { // eslint-disable-line no-unused-vars
  return function (object, permission) {
    for (const checker of checkers) {
      if (!checker(object, permission)) {
        return false
      }
    }
    return true
  }
}

function or (...checkers) { // eslint-disable-line no-unused-vars
  return function (object, permission) {
    for (const checker of checkers) {
      if (checker(object, permission)) {
        return true
      }
    }
    return false
  }
}

// -------------------------------------------------------------------

function checkMember (memberName) {
  return function (object, permission) {
    const member = object[memberName]
    return checkAuthorization(member, permission)
  }
}

function checkSelf ({ id }, permission) {
  const permissionsForObject = permissionsByObject[id]

  return (
    permissionsForObject &&
    permissionsForObject[permission]
  )
}

// ===================================================================

const checkAuthorizationByTypes = {
  host: or(checkSelf, checkMember('$pool')),

  message: checkMember('$object'),

  network: or(checkSelf, checkMember('$pool')),

  SR: or(checkSelf, checkMember('$pool')),

  task: checkMember('$host'),

  VBD: checkMember('VDI'),

  // Access to a VDI is granted if the user has access to the
  // containing SR or to a linked VM.
  VDI (vdi, permission) {
    // Check authorization for the containing SR.
    if (checkAuthorization(vdi.$SR, permission)) {
      return true
    }

    // Check authorization for each of the connected VMs.
    for (const { VM: vm } of vdi.$VBDs) {
      if (checkAuthorization(vm, permission)) {
        return true
      }
    }

    return false
  },

  VIF: or(checkMember('$network'), checkMember('$VM')),

  VM: or(checkSelf, checkMember('$container')),

  'VM-snapshot': checkMember('$snapshot_of'),

  'VM-template': authorized
}

function checkAuthorization (objectId, permission) {
  const object = getObject(objectId)
  const checker = checkAuthorizationByTypes[object.type] || checkSelf

  return checker(object, permission)
}

// -------------------------------------------------------------------

export default function (
  permissionsByObject_,
  getObject_,
  permissions
) {
  // Assign global variables.
  permissionsByObject = permissionsByObject_
  getObject = getObject_

  try {
    for (const [objectId, permission] of permissions) {
      if (!checkAuthorization(objectId, permission)) {
        return false
      }
    }

    return true
  } finally {
    // Free the global variables.
    permissionsByObject = getObject = null
  }
}
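Together with this deletion, package.json (above) gains an `xo-acl-resolver` dependency, which suggests the permission-resolution logic moves into that package rather than disappearing. A minimal standalone sketch of the combinator pattern the file relied on (simplified: `checkMember` here delegates straight to `checkSelf`, and all data is a hypothetical fixture):

```js
// Permissions indexed by object id (hypothetical fixture).
const permissionsByObject = { pool1: { view: true } }

// A checker is a predicate (object, permission) => boolean.
const checkSelf = (object, permission) =>
  Boolean(permissionsByObject[object.id] && permissionsByObject[object.id][permission])

const checkMember = memberName => (object, permission) =>
  checkSelf(object[memberName], permission)

// `or` grants access when any underlying checker grants it.
const or = (...checkers) => (object, permission) =>
  checkers.some(checker => checker(object, permission))

// A host is accessible directly or through its pool, as in the deleted file.
const checkHost = or(checkSelf, checkMember('$pool'))

const host = { id: 'host1', $pool: { id: 'pool1' } }
console.log(checkHost(host, 'view')) // true: granted through the pool
```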
@@ -36,5 +36,5 @@ resize.params = {
}

resize.resolve = {
-  vdi: ['id', 'VDI', 'administrate']
+  vdi: ['id', ['VDI', 'VDI-snapshot'], 'administrate']
}
@@ -16,3 +16,13 @@ export async function get ({namespace}) {
}

get.description = 'returns logs list for one namespace'

+function delete_ ({namespace, id}) {
+  const logger = this.getLogger(namespace)
+  logger.del(id)
+}
+
+delete_.description = 'deletes one or several logs from a namespace'
+delete_.permission = 'admin'
+
+export {delete_ as delete}
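`delete` is a reserved word in JavaScript, hence the `delete_` implementation name with a rename at export time; consumers still see a regular `delete` method. A hedged usage sketch (the API-context object is hypothetical; anything exposing `getLogger` works):

```js
import * as logs from './logs'

// Reserved words are legal property names, so the renamed export is
// reachable as an ordinary property; `call` supplies the API context.
function deleteJobLog (xoContext, id) {
  return logs.delete.call(xoContext, { namespace: 'jobs', id })
}
```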
@@ -5,7 +5,7 @@ export async function getAll () {
getAll.permission = 'admin'
getAll.description = 'Gets all existing fs remote points'

-export async function get (id) {
+export async function get ({id}) {
  return await this.getRemote(id)
}

@@ -15,7 +15,7 @@ get.params = {
  id: {type: 'string'}
}

-export async function list (id) {
+export async function list ({id}) {
  return await this.listRemoteBackups(id)
}
@@ -20,7 +20,7 @@ delete_.params = {
}

delete_.resolve = {
-  vdi: ['id', 'VDI', 'administrate'],
+  vdi: ['id', ['VDI', 'VDI-snapshot'], 'administrate'],
}

exports.delete = delete_
@@ -69,7 +69,7 @@ set.params = {
}

set.resolve = {
-  vdi: ['id', 'VDI', 'administrate'],
+  vdi: ['id', ['VDI', 'VDI-snapshot'], 'administrate'],
}

exports.set = set
@@ -89,7 +89,7 @@ migrate.params = {
}

migrate.resolve = {
-  vdi: ['id', 'VDI', 'administrate'],
+  vdi: ['id', ['VDI', 'VDI-snapshot'], 'administrate'],
  sr: ['sr_id', 'SR', 'administrate'],
}
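All three `resolve` declarations above share the same `[parameter, accepted type(s), required permission]` shape; the change widens the accepted types so these methods also work on VDI snapshots. A hypothetical interpreter for such declarations (illustrative only, not xo-server's actual resolver):

```js
// Each entry maps a method parameter to an XO object that must match one
// of the accepted types and pass an ACL check before the method runs.
function resolveParams (resolve, params, getObject, hasPermission) {
  const resolved = {}
  for (const key of Object.keys(resolve)) {
    const [param, types, permission] = resolve[key]
    const object = getObject(params[param], types) // throws on type mismatch
    if (!hasPermission(object.id, permission)) {
      throw new Error('unauthorized')
    }
    resolved[key] = object
  }
  return resolved
}
```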
@@ -191,69 +191,70 @@ exports.insertCd = insertCd

#---------------------------------------------------------------------

-migrate = $coroutine ({vm, host}) ->
-  yield @getXapi(vm).migrateVm(vm._xapiId, @getXapi(host), host._xapiId)
-  return
-
-migrate.params = {
-  # Identifier of the VM to migrate.
-  id: { type: 'string' }
-
-  # Identifier of the host to migrate to.
-  host_id: { type: 'string' }
-}
-
-migrate.resolve = {
-  vm: ['id', 'VM']
-  host: ['host_id', 'host', 'administrate']
-}
-
-exports.migrate = migrate
-
-#---------------------------------------------------------------------
-
-migratePool = $coroutine ({
+migrate = $coroutine ({
  vm,
-  host
-  sr
-  network
+  host,
+  mapVdisSrs,
+  mapVifsNetworks,
  migrationNetwork
}) ->
+  permissions = []
+
+  if mapVdisSrs
+    mapVdisSrsXapi = {}
+    forEach mapVdisSrs, (srId, vdiId) =>
+      vdiXapiId = @getObject(vdiId, 'VDI')._xapiId
+      mapVdisSrsXapi[vdiXapiId] = @getObject(srId, 'SR')._xapiId
+      permissions.push([
+        srId,
+        'administrate'
+      ])
+
+  if mapVifsNetworks
+    mapVifsNetworksXapi = {}
+    forEach mapVifsNetworks, (networkId, vifId) =>
+      vifXapiId = @getObject(vifId, 'VIF')._xapiId
+      mapVifsNetworksXapi[vifXapiId] = @getObject(networkId, 'network')._xapiId
+      permissions.push([
+        networkId,
+        'administrate'
+      ])
+
+  unless yield @hasPermissions(@session.get('user_id'), permissions)
+    throw new Unauthorized()
+
  yield @getXapi(vm).migrateVm(vm._xapiId, @getXapi(host), host._xapiId, {
    migrationNetworkId: migrationNetwork?._xapiId
-    networkId: network?._xapiId,
-    srId: sr?._xapiId,
+    mapVifsNetworks: mapVifsNetworksXapi,
+    mapVdisSrs: mapVdisSrsXapi,
  })
  return

-migratePool.params = {
+migrate.params = {

  # Identifier of the VM to migrate.
-  id: { type: 'string' }
+  vm: { type: 'string' }

  # Identifier of the host to migrate to.
-  target_host_id: { type: 'string' }
+  targetHost: { type: 'string' }

-  # Identifier of the target SR
-  target_sr_id: { type: 'string', optional: true }
+  # Map VDIs IDs --> SRs IDs
+  mapVdisSrs: { type: 'object', optional: true }

-  # Identifier of the target Network
-  target_network_id: { type: 'string', optional: true }
+  # Map VIFs IDs --> Networks IDs
+  mapVifsNetworks: { type: 'object', optional: true }

  # Identifier of the Network used for the migration
-  migration_network_id: { type: 'string', optional: true }
+  migrationNetwork: { type: 'string', optional: true }
}

-migratePool.resolve = {
-  vm: ['id', 'VM', 'administrate'],
-  host: ['target_host_id', 'host', 'administrate'],
-  sr: ['target_sr_id', 'SR', 'administrate'],
-  network: ['target_network_id', 'network', 'administrate'],
-  migrationNetwork: ['migration_network_id', 'network', 'administrate'],
+migrate.resolve = {
+  vm: ['vm', 'VM', 'administrate'],
+  host: ['targetHost', 'host', 'administrate'],
+  migrationNetwork: ['migrationNetwork', 'network', 'administrate'],
}

-# TODO: camel case.
-exports.migrate_pool = migratePool
+exports.migrate = migrate
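The two methods are merged into a single `vm.migrate` API call. A hypothetical client invocation using the new parameter names (ids are placeholders and `xo.call` stands in for an xo-lib style JSON-RPC client, inside an async context):

```js
await xo.call('vm.migrate', {
  vm: 'vm-id',
  targetHost: 'host-id',
  // all three are optional:
  mapVdisSrs: { 'vdi-id': 'sr-id' },
  mapVifsNetworks: { 'vif-id': 'network-id' },
  migrationNetwork: 'network-id'
})
```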
#---------------------------------------------------------------------

@@ -323,6 +324,9 @@ set = $coroutine (params) ->
  else
    yield xapi.call 'VM.remove_from_other_config', ref, 'auto_poweron'

+  if 'cpuWeight' of params
+    yield xapi.setVcpuWeight(VM._xapiId, params.cpuWeight)
+
  # Other fields.
  for param, fields of {
    'name_label'
@@ -360,6 +364,8 @@ set.params = {

  # Kernel arguments for PV VM.
  PV_args: { type: 'string', optional: true }

+  cpuWeight: { type: 'integer', optional: true}
}

set.resolve = {
@@ -572,12 +578,13 @@ exports.rollingSnapshot = rollingSnapshot

#---------------------------------------------------------------------

-backup = $coroutine ({vm, pathToFile, compress, onlyMetadata}) ->
-  yield @backupVm({vm, pathToFile, compress, onlyMetadata})
+backup = $coroutine ({vm, remoteId, file, compress, onlyMetadata}) ->
+  yield @backupVm({vm, remoteId, file, compress, onlyMetadata})

backup.params = {
-  id: { type: 'string' }
-  pathToFile: { type: 'string' }
+  id: {type: 'string'}
+  remoteId: { type: 'string' }
+  file: { type: 'string' }
  compress: { type: 'boolean', optional: true }
  onlyMetadata: { type: 'boolean', optional: true }
}
@@ -615,14 +622,9 @@ exports.importBackup = importBackup

#---------------------------------------------------------------------

rollingBackup = $coroutine ({vm, remoteId, tag, depth, compress, onlyMetadata}) ->
-  remote = yield @getRemote remoteId
-  if not remote?.path?
-    throw new Error "No such Remote #{remoteId}"
-  if not remote.enabled
-    throw new Error "Backup remote #{remoteId} is disabled"
  return yield @rollingBackupVm({
    vm,
-    path: remote.path,
+    remoteId,
    tag,
    depth,
    compress,
src/index.js (46 changes)
@@ -7,6 +7,7 @@ import blocked from 'blocked'
import createExpress from 'express'
import eventToPromise from 'event-to-promise'
import has from 'lodash.has'
+import helmet from 'helmet'
import includes from 'lodash.includes'
import isArray from 'lodash.isarray'
import isFunction from 'lodash.isfunction'
@@ -90,6 +91,8 @@ async function loadConfiguration () {
function createExpressApp () {
  const app = createExpress()

+  app.use(helmet())
+
  // Registers the cookie-parser and express-session middlewares,
  // necessary for connect-flash.
  app.use(cookieParser())
@@ -276,12 +279,18 @@ async function registerPlugins (xo) {

// ===================================================================

-async function makeWebServerListen (opts) {
-  // Read certificate and key if necessary.
-  const {certificate, key} = opts
-  if (certificate && key) {
-    [opts.certificate, opts.key] = await Promise.all([
-      readFile(certificate),
+async function makeWebServerListen ({
+  certificate,
+
+  // This property was called `certificate` before.
+  cert = certificate,
+
+  key,
+  ...opts
+}) {
+  if (cert && key) {
+    [opts.cert, opts.key] = await Promise.all([
+      readFile(cert),
      readFile(key)
    ])
  }
@@ -607,6 +616,31 @@ export default async function main (args) {
  // Express is used to manage non WebSocket connections.
  const express = createExpressApp()

+  if (config.http.redirectToHttps) {
+    let port
+    forEach(config.http.listen, listen => {
+      if (
+        listen.port &&
+        (listen.cert || listen.certificate)
+      ) {
+        port = listen.port
+        return false
+      }
+    })
+
+    if (port === undefined) {
+      warn('Could not setup HTTPs redirection: no HTTPs port found')
+    } else {
+      express.use((req, res, next) => {
+        if (req.secure) {
+          return next()
+        }
+
+        res.redirect(`https://${req.hostname}:${port}${req.originalUrl}`)
+      })
+    }
+  }
+
  // Must be set up before the API.
  setUpConsoleProxy(webServer, xo)
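The redirection middleware above is self-contained: any plain-HTTP request is bounced to the first HTTPS listener found in the configuration. A minimal standalone sketch of the same logic (assuming HTTPS is already served on port 443):

```js
import createExpress from 'express'

const app = createExpress()
const httpsPort = 443 // xo-server discovers this from config.http.listen

app.use((req, res, next) => {
  if (req.secure) {
    return next() // request already came in over HTTPS
  }
  res.redirect(`https://${req.hostname}:${httpsPort}${req.originalUrl}`)
})
```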
@@ -1,4 +1,5 @@
import highland from 'highland'
+import { forEach, noop } from '../utils'

// See: https://en.wikipedia.org/wiki/Syslog#Severity_level
const LEVELS = [
@@ -46,6 +47,19 @@ export default class LevelDbLogger {
    return highland(this._db.createReadStream())
      .filter(({value}) => value.namespace === this._namespace)
  }

+  del (id) {
+    if (!Array.isArray(id)) {
+      id = [id]
+    }
+    forEach(id, id => {
+      this._db.get(id, (err, value) => {
+        if (!err && value.namespace === this._namespace) {
+          this._db.del(id, noop)
+        }
+      })
+    })
+  }
}

// Create high level log methods.
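`del` normalizes its argument to an array and only deletes entries whose `namespace` matches the logger's own, so ids belonging to other namespaces are silently ignored. Usage sketch:

```js
// Inside an API method, as in api/logs above:
const logger = this.getLogger('jobs')

logger.del('5a1')          // delete a single entry
logger.del(['5a1', '5a2']) // or several at once
```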
@@ -1,6 +1,8 @@
import Collection from '../collection/redis'
import Model from '../model'
-import { forEach } from '../utils'
+import {
+  forEach
+} from '../utils'

// ===================================================================
@@ -1,140 +0,0 @@
import filter from 'lodash.filter'
import fs from 'fs-promise'
import {exec} from 'child_process'

import {
  forEach,
  noop,
  promisify
} from './utils'

const execAsync = promisify(exec)

class NfsMounter {
  async _loadRealMounts () {
    let stdout
    try {
      [stdout] = await execAsync('findmnt -P -t nfs,nfs4 --output SOURCE,TARGET --noheadings')
    } catch (exc) {
      // When no mounts are found, the call pretends to fail...
    }
    const mounted = {}
    if (stdout) {
      const regex = /^SOURCE="([^:]*):(.*)" TARGET="(.*)"$/
      forEach(stdout.split('\n'), m => {
        if (m) {
          const match = regex.exec(m)
          mounted[match[3]] = {
            host: match[1],
            share: match[2]
          }
        }
      })
    }
    this._realMounts = mounted
    return mounted
  }

  _fullPath (path) {
    return path
  }

  _matchesRealMount (mount) {
    return this._fullPath(mount.path) in this._realMounts
  }

  async _mount (mount) {
    const path = this._fullPath(mount.path)
    await fs.ensureDir(path)
    return await execAsync(`mount -t nfs ${mount.host}:${mount.share} ${path}`)
  }

  async forget (mount) {
    try {
      await this._umount(mount)
    } catch (_) {
      // We have to go on...
    }
  }

  async _umount (mount) {
    const path = this._fullPath(mount.path)
    await execAsync(`umount ${path}`)
  }

  async sync (mount) {
    await this._loadRealMounts()
    if (this._matchesRealMount(mount) && !mount.enabled) {
      try {
        await this._umount(mount)
      } catch (exc) {
        mount.enabled = true
        mount.error = exc.message
      }
    } else if (!this._matchesRealMount(mount) && mount.enabled) {
      try {
        await this._mount(mount)
      } catch (exc) {
        mount.enabled = false
        mount.error = exc.message
      }
    }
    return mount
  }

  async disableAll (mounts) {
    await this._loadRealMounts()
    forEach(mounts, async mount => {
      if (this._matchesRealMount(mount)) {
        try {
          await this._umount(mount)
        } catch (_) {
          // We have to go on...
        }
      }
    })
  }
}

class LocalHandler {
  constructor () {
    this.forget = noop
    this.disableAll = noop
  }

  async sync (local) {
    if (local.enabled) {
      try {
        await fs.ensureDir(local.path)
        await fs.access(local.path, fs.R_OK | fs.W_OK)
      } catch (exc) {
        local.enabled = false
        local.error = exc.message
      }
    }
    return local
  }
}

export default class RemoteHandler {
  constructor () {
    this.handlers = {
      nfs: new NfsMounter(),
      local: new LocalHandler()
    }
  }

  async sync (remote) {
    return await this.handlers[remote.type].sync(remote)
  }

  async forget (remote) {
    return await this.handlers[remote.type].forget(remote)
  }

  async disableAll (remotes) {
    const promises = []
    forEach(['local', 'nfs'], type => promises.push(this.handlers[type].disableAll(filter(remotes, remote => remote.type === type))))
    await Promise.all(promises)
  }
}
src/remote-handlers/abstract.js (177 additions, new file)
@@ -0,0 +1,177 @@
import eventToPromise from 'event-to-promise'
import getStream from 'get-stream'
import through2 from 'through2'

import {
  parse
} from 'xo-remote-parser'

import {
  addChecksumToReadStream,
  noop,
  validChecksumOfReadStream
} from '../utils'

export default class RemoteHandlerAbstract {
  constructor (remote) {
    this._remote = parse({...remote})
    if (this._remote.type !== this.type) {
      throw new Error('Incorrect remote type')
    }
  }

  get type () {
    throw new Error('Not implemented')
  }

  /**
   * Asks the handler to sync the state of the effective remote with its metadata
   */
  async sync () {
    return this._sync()
  }

  async _sync () {
    throw new Error('Not implemented')
  }

  /**
   * Free the resources possibly dedicated to put the remote at work, when it is no longer needed
   */
  async forget () {
    return this._forget()
  }

  async _forget () {
    throw new Error('Not implemented')
  }

  async outputFile (file, data, options) {
    return this._outputFile(file, data, options)
  }

  async _outputFile (file, data, options) {
    const stream = await this.createOutputStream(file)
    const promise = eventToPromise(stream, 'finish')
    stream.end(data)
    return promise
  }

  async readFile (file, options) {
    return this._readFile(file, options)
  }

  async _readFile (file, options) {
    return getStream(await this.createReadStream(file, options))
  }

  async rename (oldPath, newPath) {
    return this._rename(oldPath, newPath)
  }

  async _rename (oldPath, newPath) {
    throw new Error('Not implemented')
  }

  async list (dir = '.') {
    return this._list(dir)
  }

  async _list (dir) {
    throw new Error('Not implemented')
  }

  async createReadStream (file, {
    checksum = false,
    ignoreMissingChecksum = false,
    ...options
  } = {}) {
    const streamP = this._createReadStream(file, options).then(async stream => {
      await eventToPromise(stream, 'readable')

      if (stream.length === undefined) {
        stream.length = await this.getSize(file).catch(noop)
      }

      return stream
    })

    if (!checksum) {
      return streamP
    }

    try {
      checksum = await this.readFile(`${file}.checksum`)
    } catch (error) {
      if (error.code === 'ENOENT' && ignoreMissingChecksum) {
        return streamP
      }

      throw error
    }

    let stream = await streamP

    const { length } = stream
    stream = validChecksumOfReadStream(stream, checksum.toString())
    stream.length = length

    return stream
  }

  async _createReadStream (file, options) {
    throw new Error('Not implemented')
  }

  async createOutputStream (file, {
    checksum = false,
    ...options
  } = {}) {
    const streamP = this._createOutputStream(file, options)

    if (!checksum) {
      return streamP
    }

    const connectorStream = through2()
    const forwardError = error => {
      connectorStream.emit('error', error)
    }

    const streamWithChecksum = addChecksumToReadStream(connectorStream)
    streamWithChecksum.pipe(await streamP)
    streamWithChecksum.on('error', forwardError)

    streamWithChecksum.checksum
      .then(value => this.outputFile(`${file}.checksum`, value))
      .catch(forwardError)

    return connectorStream
  }

  async _createOutputStream (file, options) {
    throw new Error('Not implemented')
  }

  async unlink (file, {
    checksum = false
  } = {}) {
    if (checksum) {
      this._unlink(`${file}.checksum`).catch(noop)
    }

    return this._unlink(file)
  }

  async _unlink (file) {
    throw new Error('Not implemented')
  }

  async getSize (file) {
    return this._getSize(file)
  }

  async _getSize (file) {
    throw new Error('Not implemented')
  }
}
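Subclasses only implement the underscore-prefixed primitives; the public wrappers add the checksum plumbing. A hedged round-trip sketch (the remote descriptor shape is assumed, and `LocalHandler` is the concrete subclass defined below):

```js
import LocalHandler from './local'

async function roundTrip () {
  // Assumed descriptor shape: whatever xo-remote-parser's parse() accepts
  // and turns into { type: 'local', path: ... }.
  const handler = new LocalHandler({ url: 'file:///tmp/backups' })

  const out = await handler.createOutputStream('vm.xva', { checksum: true })
  out.end('fake payload') // also produces a vm.xva.checksum side file

  // Read back with verification: the stream emits an error on mismatch.
  const stream = await handler.createReadStream('vm.xva', { checksum: true })
  stream.resume()
}
```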
src/remote-handlers/local.js (84 additions, new file)
@@ -0,0 +1,84 @@
import fs from 'fs-promise'
import startsWith from 'lodash.startswith'
import {
  dirname,
  resolve
} from 'path'

import RemoteHandlerAbstract from './abstract'
import {
  noop
} from '../utils'

export default class LocalHandler extends RemoteHandlerAbstract {
  get type () {
    return 'local'
  }

  _getFilePath (file) {
    const parts = [this._remote.path]
    if (file) {
      parts.push(file)
    }
    const path = resolve.apply(null, parts)
    if (!startsWith(path, this._remote.path)) {
      throw new Error('Remote path is unavailable')
    }
    return path
  }

  async _sync () {
    if (this._remote.enabled) {
      try {
        await fs.ensureDir(this._remote.path)
        await fs.access(this._remote.path, fs.R_OK | fs.W_OK)
      } catch (exc) {
        this._remote.enabled = false
        this._remote.error = exc.message
      }
    }
    return this._remote
  }

  async _forget () {
    return noop()
  }

  async _outputFile (file, data, options) {
    const path = this._getFilePath(file)
    await fs.ensureDir(dirname(path))
    await fs.writeFile(this._getFilePath(file), data, options)
  }

  async _readFile (file, options) {
    return fs.readFile(this._getFilePath(file), options)
  }

  async _rename (oldPath, newPath) {
    return fs.rename(this._getFilePath(oldPath), this._getFilePath(newPath))
  }

  async _list (dir = '.') {
    return fs.readdir(this._getFilePath(dir))
  }

  async _createReadStream (file, options) {
    return fs.createReadStream(this._getFilePath(file), options)
  }

  async _createOutputStream (file, options) {
    const path = this._getFilePath(file)
    await fs.ensureDir(dirname(path))
    return fs.createWriteStream(path, options)
  }

  async _unlink (file) {
    return fs.unlink(this._getFilePath(file))
  }

  async _getSize (file) {
    const stats = await fs.stat(this._getFilePath(file))
    return stats.size
  }
}
src/remote-handlers/nfs.js (77 additions, new file)
@@ -0,0 +1,77 @@
import execa from 'execa'
import fs from 'fs-promise'

import LocalHandler from './local'
import {
  forEach
} from '../utils'

export default class NfsHandler extends LocalHandler {
  get type () {
    return 'nfs'
  }

  async _loadRealMounts () {
    let stdout
    const mounted = {}
    try {
      [stdout] = await execa('findmnt', ['-P', '-t', 'nfs,nfs4', '--output', 'SOURCE,TARGET', '--noheadings'])
      const regex = /^SOURCE="([^:]*):(.*)" TARGET="(.*)"$/
      forEach(stdout.split('\n'), m => {
        if (m) {
          const match = regex.exec(m)
          mounted[match[3]] = {
            host: match[1],
            share: match[2]
          }
        }
      })
    } catch (exc) {
      // When no mounts are found, the call pretends to fail...
    }

    this._realMounts = mounted
    return mounted
  }

  _matchesRealMount (remote) {
    return remote.path in this._realMounts
  }

  async _mount (remote) {
    await fs.ensureDir(remote.path)
    return execa('mount', ['-t', 'nfs', `${remote.host}:${remote.share}`, remote.path])
  }

  async _sync () {
    await this._loadRealMounts()
    if (this._matchesRealMount(this._remote) && !this._remote.enabled) {
      try {
        await this._umount(this._remote)
      } catch (exc) {
        this._remote.enabled = true
        this._remote.error = exc.message
      }
    } else if (!this._matchesRealMount(this._remote) && this._remote.enabled) {
      try {
        await this._mount(this._remote)
      } catch (exc) {
        this._remote.enabled = false
        this._remote.error = exc.message
      }
    }
    return this._remote
  }

  async _forget () {
    try {
      await this._umount(this._remote)
    } catch (_) {
      // We have to go on...
    }
  }

  async _umount (remote) {
    await execa('umount', [remote.path])
  }
}
src/remote-handlers/smb.js (144 additions, new file)
@@ -0,0 +1,144 @@
import Smb2 from '@marsaud/smb2-promise'

import RemoteHandlerAbstract from './abstract'
import {
  noop
} from '../utils'

export default class SmbHandler extends RemoteHandlerAbstract {
  constructor (remote) {
    super(remote)
    this._forget = noop
  }

  get type () {
    return 'smb'
  }

  _getClient (remote) {
    return new Smb2({
      share: `\\\\${remote.host}`,
      domain: remote.domain,
      username: remote.username,
      password: remote.password,
      autoCloseTimeout: 0
    })
  }

  _getFilePath (file) {
    if (file === '.') {
      file = undefined
    }
    const parts = []
    if (this._remote.path !== '') {
      parts.push(this._remote.path)
    }
    if (file) {
      parts.push(file.split('/'))
    }
    return parts.join('\\')
  }

  _dirname (file) {
    const parts = file.split('\\')
    parts.pop()
    return parts.join('\\')
  }

  async _sync () {
    if (this._remote.enabled) {
      try {
        // Check access (smb2 does not expose connect in public so far...)
        await this.list()
      } catch (error) {
        this._remote.enabled = false
        this._remote.error = error.message
      }
    }
    return this._remote
  }

  async _outputFile (file, data, options) {
    const client = this._getClient(this._remote)
    const path = this._getFilePath(file)
    const dir = this._dirname(path)
    try {
      if (dir) {
        await client.ensureDir(dir)
      }
      return client.writeFile(path, data, options)
    } finally {
      client.close()
    }
  }

  async _readFile (file, options) {
    const client = this._getClient(this._remote)
    try {
      return client.readFile(this._getFilePath(file), options)
    } finally {
      client.close()
    }
  }

  async _rename (oldPath, newPath) {
    const client = this._getClient(this._remote)
    try {
      return client.rename(this._getFilePath(oldPath), this._getFilePath(newPath))
    } finally {
      client.close()
    }
  }

  async _list (dir = '.') {
    const client = this._getClient(this._remote)
    try {
      return client.readdir(this._getFilePath(dir))
    } finally {
      client.close()
    }
  }

  async _createReadStream (file, options) {
    const client = this._getClient(this._remote)
    const stream = await client.createReadStream(this._getFilePath(file), options) // FIXME ensure that options are properly handled by @marsaud/smb2
    stream.on('end', () => client.close())
    return stream
  }

  async _createOutputStream (file, options) {
    const client = this._getClient(this._remote)
    const path = this._getFilePath(file)
    const dir = this._dirname(path)
    let stream
    try {
      if (dir) {
        await client.ensureDir(dir)
      }
      stream = await client.createWriteStream(path, options) // FIXME ensure that options are properly handled by @marsaud/smb2
    } catch (err) {
      client.close()
      throw err
    }
    stream.on('finish', () => client.close())
    return stream
  }

  async _unlink (file) {
    const client = this._getClient(this._remote)
    try {
      return client.unlink(this._getFilePath(file))
    } finally {
      client.close()
    }
  }

  async _getSize (file) {
    const client = await this._getClient(this._remote)
    try {
      return client.getSize(this._getFilePath(file))
    } finally {
      client.close()
    }
  }
}
src/utils.js (106 changes)
@@ -1,15 +1,22 @@
import base64url from 'base64url'
import eventToPromise from 'event-to-promise'
import forEach from 'lodash.foreach'
import has from 'lodash.has'
import humanFormat from 'human-format'
+import invert from 'lodash.invert'
import isArray from 'lodash.isarray'
import isString from 'lodash.isstring'
import kindOf from 'kindof'
import multiKeyHashInt from 'multikey-hash'
import xml2js from 'xml2js'
+import { defer } from 'promise-toolbox'
import {promisify} from 'bluebird'
-import {randomBytes} from 'crypto'
+import {
+  createHash,
+  randomBytes
+} from 'crypto'
import { Readable } from 'stream'
+import through2 from 'through2'
import {utcFormat as d3TimeFormat} from 'd3-time-format'

// ===================================================================
@@ -50,6 +57,90 @@ export const createRawObject = Object.create

// -------------------------------------------------------------------

const ALGORITHM_TO_ID = {
  md5: '1',
  sha256: '5',
  sha512: '6'
}

const ID_TO_ALGORITHM = invert(ALGORITHM_TO_ID)

// Wrap a readable stream in a stream with a checksum promise
// attribute which is resolved at the end of an input stream.
// (Finally .checksum contains the checksum of the input stream)
//
// Example:
//   const sourceStream = ...
//   const targetStream = ...
//   const checksumStream = addChecksumToReadStream(sourceStream)
//   await Promise.all([
//     eventToPromise(checksumStream.pipe(targetStream), 'finish'),
//     checksumStream.checksum.then(console.log)
//   ])
export const addChecksumToReadStream = (stream, algorithm = 'md5') => {
  const algorithmId = ALGORITHM_TO_ID[algorithm]

  if (!algorithmId) {
    throw new Error(`unknown algorithm: ${algorithm}`)
  }

  const hash = createHash(algorithm)
  const { promise, resolve } = defer()

  const wrapper = stream.pipe(through2(
    (chunk, enc, callback) => {
      hash.update(chunk)
      callback(null, chunk)
    },
    callback => {
      resolve(hash.digest('hex'))
      callback()
    }
  ))

  stream.on('error', error => wrapper.emit('error', error))
  wrapper.checksum = promise.then(hash => `$${algorithmId}$$${hash}`)

  return wrapper
}

// Check if the checksum of a readable stream equals an expected checksum.
// The given stream is wrapped in a stream which emits an error event
// if the computed checksum does not equal the expected checksum.
export const validChecksumOfReadStream = (stream, expectedChecksum) => {
  const algorithmId = expectedChecksum.slice(1, expectedChecksum.indexOf('$', 1))

  if (!algorithmId) {
    throw new Error(`unknown algorithm: ${algorithmId}`)
  }

  const hash = createHash(ID_TO_ALGORITHM[algorithmId])

  const wrapper = stream.pipe(through2(
    { highWaterMark: 0 },
    (chunk, enc, callback) => {
      hash.update(chunk)
      callback(null, chunk)
    },
    callback => {
      const checksum = `$${algorithmId}$$${hash.digest('hex')}`

      callback(
        checksum !== expectedChecksum
          ? new Error(`Bad checksum (${checksum}), expected: ${expectedChecksum}`)
          : null
      )
    }
  ))

  stream.on('error', error => wrapper.emit('error', error))
  wrapper.checksumVerified = eventToPromise(wrapper, 'end')

  return wrapper
}
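The checksum string embeds the algorithm id between `$` separators (`$1$$<hex>` for md5), which is how `validChecksumOfReadStream` knows which hash to recompute. A small usage sketch mirroring the in-source comment above:

```js
import { createReadStream, createWriteStream } from 'fs'

const input = addChecksumToReadStream(createReadStream('disk.vhd'))
input.pipe(createWriteStream('copy.vhd'))
input.checksum.then(sum => {
  console.log(sum) // e.g. '$1$$d41d8cd98f00b204e9800998ecf8427e' (the '1' is md5's id)
})
```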
// -------------------------------------------------------------------

// Ensure the value is an array, wrap it if necessary.
export function ensureArray (value) {
  if (value === undefined) {
@@ -289,6 +380,19 @@ export function parseSize (size) {

// -------------------------------------------------------------------

+const _has = Object.prototype.hasOwnProperty
+
+// Removes an own property from an object and returns its value.
+export const popProperty = obj => {
+  for (const prop in obj) {
+    if (_has.call(obj, prop)) {
+      return extractProperty(obj, prop)
+    }
+  }
+}
+
// -------------------------------------------------------------------

// Format a date in ISO 8601 in a safe way to be used in filenames
// (even on Windows).
export const safeDateFormat = d3TimeFormat('%Y%m%dT%H%M%SZ')
@@ -1,4 +1,3 @@
import includes from 'lodash.includes'
-import isArray from 'lodash.isarray'

import {
@@ -228,7 +227,6 @@ const TRANSFORMS = {
      other: otherConfig,
      os_version: guestMetrics && guestMetrics.os_version || null,
      power_state: obj.power_state,
-      snapshot_time: toTimestamp(obj.snapshot_time),
      snapshots: link(obj, 'snapshots'),
      tags: obj.tags,
      VIFs: link(obj, 'VIFs'),
@@ -277,6 +275,7 @@ const TRANSFORMS = {
    } else if (obj.is_a_snapshot) {
      vm.type += '-snapshot'

+      vm.snapshot_time = toTimestamp(obj.snapshot_time)
      vm.$snapshot_of = link(obj, 'snapshot_of')
    } else if (obj.is_a_template) {
      vm.type += '-template'
@@ -307,8 +306,10 @@ const TRANSFORMS = {
        })(),
        install_repository: otherConfig['install-repository']
      }
+    } else if (includes(obj.current_operations, 'migrate_send')) {
+      vm.id = obj.$ref
    }

+    if (obj.VCPUs_params && obj.VCPUs_params.weight) {
+      vm.cpuWeight = obj.VCPUs_params.weight
+    }

    if (!isHvm) {
@@ -325,9 +326,12 @@ const TRANSFORMS = {
      type: 'SR',

      content_type: obj.content_type,

+      // TODO: Should it replace usage?
+      physical_usage: +obj.physical_utilisation,
+
      name_description: obj.name_description,
      name_label: obj.name_label,
-      physical_usage: +obj.physical_utilisation,
      size: +obj.physical_size,
      SR_type: obj.type,
      tags: obj.tags,
@@ -384,27 +388,32 @@ const TRANSFORMS = {

  // -----------------------------------------------------------------

-  // TODO: should we have a VDI-snapshot type like we have with VMs?
  vdi (obj) {
    if (!obj.managed) {
      return
    }

-    return {
+    const vdi = {
      type: 'VDI',

      name_description: obj.name_description,
      name_label: obj.name_label,
      size: +obj.virtual_size,
      snapshots: link(obj, 'snapshots'),
-      snapshot_time: toTimestamp(obj.snapshot_time),
      tags: obj.tags,
      usage: +obj.physical_utilisation,

-      $snapshot_of: link(obj, 'snapshot_of'),
      $SR: link(obj, 'SR'),
      $VBDs: link(obj, 'VBDs')
    }

+    if (obj.is_a_snapshot) {
+      vdi.type += '-snapshot'
+      vdi.snapshot_time = toTimestamp(obj.snapshot_time)
+      vdi.$snapshot_of = link(obj, 'snapshot_of')
+    }

+    return vdi
  },

  // -----------------------------------------------------------------
src/xapi.js (110 changes)
@@ -3,8 +3,12 @@ import every from 'lodash.every'
import fatfs from 'fatfs'
import fatfsBuffer, { init as fatfsBufferInit } from './fatfs-buffer'
import find from 'lodash.find'
+import isBoolean from 'lodash.isboolean'
import includes from 'lodash.includes'
+// import isFinite from 'lodash.isfinite'
import isFunction from 'lodash.isfunction'
+import isInteger from 'lodash.isinteger'
+import isObject from 'lodash.isobject'
import pick from 'lodash.pick'
import sortBy from 'lodash.sortby'
import unzip from 'julien-f-unzip'
@@ -19,6 +23,8 @@ import {
  debounce,
  deferrable
} from './decorators'
+import { satisfies as versionSatisfies } from 'semver'
+
import {
  bufferToStream,
  camelToSnakeCase,
@@ -89,7 +95,7 @@ const put = (stream, {
    })
  }

-  return promise.readAll
+  return promise.readAll()
}

return makeRequest().readAll()
@@ -106,6 +112,12 @@ const asInteger = value => String(value)

const filterUndefineds = obj => pick(obj, value => value !== undefined)

+const prepareXapiParam = param => {
+  // if (isFinite(param) && !isInteger(param)) { return asFloat(param) }
+  if (isInteger(param)) { return asInteger(param) }
+  if (isBoolean(param)) { return asBoolean(param) }
+  if (isObject(param)) { return map(filterUndefineds(param), prepareXapiParam) }
+}
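Based only on the branches visible above, `prepareXapiParam` normalizes JavaScript values into the string-based representations the XAPI wire format expects. Illustrative evaluations:

```js
prepareXapiParam(1024)  // → '1024' (integers become strings)
prepareXapiParam(true)  // → 'true' (assuming asBoolean mirrors asInteger)
prepareXapiParam({ vga: true, x: undefined })  // → { vga: 'true' }, undefined dropped
prepareXapiParam('str') // → undefined, with only the branches shown here
```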
// ===================================================================

const typeToNamespace = createRawObject()
@@ -352,12 +364,11 @@ export default class Xapi extends XapiBase {
    await Promise.all(mapToArray(values, (value, name) => {
      if (value !== undefined) {
        name = camelToSnakeCase(name)

-        const removal = this.call(remove, ref, name).catch(noop)
+        const removal = this.call(remove, ref, name)

        return value === null
          ? removal
-          : removal.catch(noop).then(() => this.call(add, ref, name, value))
+          : removal.catch(noop).then(() => this.call(add, ref, name, prepareXapiParam(value)))
      }
    }))
  }
@@ -1259,9 +1270,16 @@ export default class Xapi extends XapiBase {

  // Create a snapshot of the VM and returns a delta export object.
  @deferrable.onFailure
-  async exportDeltaVm ($onFailure, vmId, baseVmId = undefined) {
+  async exportDeltaVm ($onFailure, vmId, baseVmId = undefined, {
+    snapshotNameLabel = undefined
+  } = {}) {
    const vm = await this.snapshotVm(vmId)
    $onFailure(() => this._deleteVm(vm, true))
+    if (snapshotNameLabel) {
+      this._setObjectProperties(vm, {
+        nameLabel: snapshotNameLabel
+      }).catch(noop)
+    }

    const baseVm = baseVmId && this.getObject(baseVmId)

@@ -1322,6 +1340,7 @@ export default class Xapi extends XapiBase {
    // TODO: make non-enumerable?
    streams: await streams::pAll(),

+    version: '1.0.0',
    vbds,
    vdis,
    vifs,
@@ -1341,8 +1360,15 @@ export default class Xapi extends XapiBase {
  async importDeltaVm ($onFailure, delta, {
    deleteBase = false,
    name_label = delta.vm.name_label,
-    srId = this.pool.default_SR
+    srId = this.pool.default_SR,
+    disableStartAfterImport = true
  } = {}) {
+    const { version } = delta
+
+    if (!versionSatisfies(version, '^1')) {
+      throw new Error(`Unsupported delta backup version: ${version}`)
+    }
+
    const remoteBaseVmUuid = delta.vm.other_config[TAG_BASE_DELTA]
    let baseVm
    if (remoteBaseVmUuid) {
@@ -1447,7 +1473,11 @@ export default class Xapi extends XapiBase {
      // Import VDI contents.
      Promise.all(mapToArray(
        newVdis,
-        (vdi, id) => this._importVdiContent(vdi, streams[`${id}.vhd`], VDI_FORMAT_VHD)
+        async (vdi, id) => {
+          for (const stream of ensureArray(streams[`${id}.vhd`])) {
+            await this._importVdiContent(vdi, stream, VDI_FORMAT_VHD)
+          }
+        }
      )),

      // Wait for VDI export tasks (if any) termination.
@@ -1472,23 +1502,42 @@ export default class Xapi extends XapiBase {
      this._setObjectProperties(vm, {
        name_label
      }),
+      // FIXME: move
      this._updateObjectMapProperty(vm, 'blocked_operations', {
-        start: 'Do not start this VM, clone it if you want to use it.' // FIXME: move
+        start: disableStartAfterImport
+          ? 'Do not start this VM, clone it if you want to use it.'
+          : null
      })
    ])

    return vm
  }

-  async _migrateVMWithStorageMotion (vm, hostXapi, host, {
+  async _migrateVmWithStorageMotion (vm, hostXapi, host, {
    migrationNetwork = find(host.$PIFs, pif => pif.management).$network, // TODO: handle not found
-    sr = host.$pool.$default_SR, // TODO: handle not found
-    vifsMap = {}
+    mapVdisSrs,
+    mapVifsNetworks
  }) {
+    // VDIs/SRs mapping
    const vdis = {}
+    const defaultSrRef = host.$pool.$default_SR.$ref
    for (const vbd of vm.$VBDs) {
-      if (vbd.type !== 'CD') {
-        vdis[vbd.$VDI.$ref] = sr.$ref
+      const vdi = vbd.$VDI
+      if (vbd.type === 'Disk') {
+        vdis[vdi.$ref] = mapVdisSrs && mapVdisSrs[vdi.$id]
+          ? hostXapi.getObject(mapVdisSrs[vdi.$id]).$ref
+          : defaultSrRef
      }
    }

+    // VIFs/Networks mapping
+    let vifsMap = {}
+    if (vm.$pool !== host.$pool) {
+      const defaultNetworkRef = find(host.$PIFs, pif => pif.management).$network.$ref
+      for (const vif of vm.$VIFs) {
+        vifsMap[vif.$ref] = mapVifsNetworks && mapVifsNetworks[vif.$id]
+          ? hostXapi.getObject(mapVifsNetworks[vif.$id]).$ref
+          : defaultNetworkRef
+      }
+    }

@@ -1572,8 +1621,8 @@ export default class Xapi extends XapiBase {

  async migrateVm (vmId, hostXapi, hostId, {
    migrationNetworkId,
-    networkId,
-    srId
+    mapVifsNetworks,
+    mapVdisSrs
  } = {}) {
    const vm = this.getObject(vmId)
    if (!isVmRunning(vm)) {
@@ -1586,25 +1635,15 @@ export default class Xapi extends XapiBase {
    const useStorageMotion = (
      accrossPools ||
      migrationNetworkId ||
-      networkId ||
-      srId
+      mapVifsNetworks ||
+      mapVdisSrs
    )

    if (useStorageMotion) {
-      const vifsMap = {}
-      if (accrossPools || networkId) {
-        const {$ref: networkRef} = networkId
-          ? this.getObject(networkId)
-          : find(host.$PIFs, pif => pif.management).$network
-        for (const vif of vm.$VIFs) {
-          vifsMap[vif.$ref] = networkRef
-        }
-      }
-
-      await this._migrateVMWithStorageMotion(vm, hostXapi, host, {
-        migrationNetwork: migrationNetworkId && this.getObject(migrationNetworkId),
-        sr: srId && this.getObject(srId),
-        vifsMap
+      await this._migrateVmWithStorageMotion(vm, hostXapi, host, {
+        migrationNetwork: migrationNetworkId && hostXapi.getObject(migrationNetworkId),
+        mapVdisSrs,
+        mapVifsNetworks
      })
    } else {
      try {
@@ -1615,7 +1654,7 @@ export default class Xapi extends XapiBase {
      }

      // Retry using motion storage.
-      await this._migrateVMWithStorageMotion(vm, hostXapi, host, {})
+      await this._migrateVmWithStorageMotion(vm, hostXapi, host, {})
    }
  }
@@ -1629,6 +1668,12 @@ export default class Xapi extends XapiBase {
    )
  }

+  async setVcpuWeight (vmId, weight) {
+    weight = weight || null // Take all falsy values as a removal (0 included)
+    const vm = this.getObject(vmId)
+    await this._updateObjectMapProperty(vm, 'VCPUs_params', {weight})
+  }
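Because the setter coerces any falsy weight to `null`, passing `0` removes the `weight` entry from `VCPUs_params` instead of storing it; this is also what `vm.set`'s new `cpuWeight` parameter ends up calling. Sketch (inside an async context):

```js
await xapi.setVcpuWeight(vmId, 512) // set an explicit scheduler weight
await xapi.setVcpuWeight(vmId, 0)   // falsy: removes the entry entirely
```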
  _startVm (vm) {
    debug(`Starting VM ${vm.name_label}`)

@@ -2070,6 +2115,7 @@ export default class Xapi extends XapiBase {

    const task = this._watchTask(taskRef)
    await Promise.all([
+      stream.checksumVerified,
      task,
      put(stream, {
        hostname: host.address,
@@ -6,21 +6,11 @@ import filter from 'lodash.filter'
import findIndex from 'lodash.findindex'
import sortBy from 'lodash.sortby'
import startsWith from 'lodash.startswith'
-import {
-  createReadStream,
-  createWriteStream,
-  ensureDir,
-  readdir,
-  readFile,
-  rename,
-  stat,
-  unlink,
-  writeFile
-} from 'fs-promise'
import {
  basename,
  dirname
} from 'path'
+import { satisfies as versionSatisfies } from 'semver'

import xapiObjectToXo from '../xapi-object-to-xo'
import {
@@ -39,6 +29,9 @@ import {

// ===================================================================

+const DELTA_BACKUP_EXT = '.json'
+const DELTA_BACKUP_EXT_LENGTH = DELTA_BACKUP_EXT.length
+
// Test if a file is a vdi backup. (full or delta)
const isVdiBackup = name => /^\d+T\d+Z_(?:full|delta)\.vhd$/.test(name)

@@ -48,7 +41,27 @@ const isDeltaVdiBackup = name => /^\d+T\d+Z_delta\.vhd$/.test(name)
// Get the timestamp of a vdi backup. (full or delta)
const getVdiTimestamp = name => {
  const arr = /^(\d+T\d+Z)_(?:full|delta)\.vhd$/.exec(name)
-  return arr[1] || undefined
+  return arr[1]
}
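The `|| undefined` was dead code: when the regex matches, group 1 is always a non-empty string, and when it does not match, `exec` returns `null` so `arr[1]` throws before the fallback is reached. Behaviour of the three helpers:

```js
isVdiBackup('20160101T120000Z_full.vhd')       // → true
isDeltaVdiBackup('20160101T120000Z_full.vhd')  // → false
isDeltaVdiBackup('20160101T120000Z_delta.vhd') // → true
getVdiTimestamp('20160101T120000Z_delta.vhd')  // → '20160101T120000Z'
```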
+const getDeltaBackupNameWithoutExt = name => name.slice(0, -DELTA_BACKUP_EXT_LENGTH)
+const isDeltaBackup = name => endsWith(name, DELTA_BACKUP_EXT)
+
+async function checkFileIntegrity (handler, name) {
+  let stream
+
+  try {
+    stream = await handler.createReadStream(name, { checksum: true })
+  } catch (error) {
+    if (error.code === 'ENOENT') {
+      return
+    }
+
+    throw error
+  }
+
+  stream.resume()
+  await eventToPromise(stream, 'finish')
+}
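`checkFileIntegrity` reads the whole file through the checksum-validating stream, so corruption surfaces as a stream error; a missing file is deliberately not an error. Hypothetical usage (inside an async context, with a remote handler as above):

```js
try {
  await checkFileIntegrity(handler, 'vm_delta_xxx/vdi_xxx/20160101T120000Z_full.vhd')
} catch (error) {
  console.error('backup unreadable or checksum mismatch', error)
}
```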
|
||||
// ===================================================================
|
||||
@@ -59,57 +72,35 @@ export default class {
|
||||
}
|
||||
|
||||
async listRemoteBackups (remoteId) {
|
||||
const remote = await this._xo.getRemote(remoteId)
|
||||
const path = remote.path
|
||||
const handler = await this._xo.getRemoteHandler(remoteId)
|
||||
|
||||
// List backups. (Except delta backups)
|
||||
const xvaFilter = file => endsWith(file, '.xva')
|
||||
// List backups. (No delta)
|
||||
const backupFilter = file => endsWith(file, '.xva')
|
||||
|
||||
const files = await readdir(path)
|
||||
const backups = filter(files, xvaFilter)
|
||||
const files = await handler.list()
|
||||
const backups = filter(files, backupFilter)
|
||||
|
||||
// List delta backups.
|
||||
const deltaDirs = filter(files, file => startsWith(file, 'vm_delta_'))
|
||||
|
||||
for (const deltaDir of deltaDirs) {
|
||||
const files = await readdir(`${path}/${deltaDir}`)
|
||||
const deltaBackups = filter(files, xvaFilter)
|
||||
const files = await handler.list(deltaDir)
|
||||
const deltaBackups = filter(files, isDeltaBackup)
|
||||
|
||||
backups.push(...mapToArray(
|
||||
deltaBackups,
|
||||
deltaBackup => `${deltaDir}/${deltaBackup}`
|
||||
deltaBackup => {
|
||||
return `${deltaDir}/${getDeltaBackupNameWithoutExt(deltaBackup)}`
|
||||
}
|
||||
))
|
||||
}
|
||||
|
||||
return backups
|
||||
}
|
||||
|
||||
// TODO: move into utils and rename!
|
||||
async _openAndwaitReadableFile (path, errorMessage) {
|
||||
const stream = createReadStream(path)
|
||||
|
||||
try {
|
||||
await eventToPromise(stream, 'readable')
|
||||
} catch (error) {
|
||||
if (error.code === 'ENOENT') {
|
||||
throw new Error(errorMessage)
|
||||
}
|
||||
throw error
|
||||
}
|
||||
|
||||
stream.length = (await stat(path)).size
|
||||
|
||||
return stream
|
||||
}
|
||||
|
||||
async importVmBackup (remoteId, file, sr) {
|
||||
const remote = await this._xo.getRemote(remoteId)
|
||||
const path = `${remote.path}/${file}`
|
||||
const stream = await this._openAndwaitReadableFile(
|
||||
path,
|
||||
'VM to import not found in this remote'
|
||||
)
|
||||
|
||||
const handler = await this._xo.getRemoteHandler(remoteId)
|
||||
const stream = await handler.createReadStream(file)
|
||||
const xapi = this._xo.getXapi(sr)
|
||||
|
||||
await xapi.importVm(stream, { srId: sr._xapiId })
|
||||
@@ -137,7 +128,9 @@ export default class {
|
||||
|
||||
// 2. Copy.
|
||||
const dstVm = await (async () => {
|
||||
const delta = await srcXapi.exportDeltaVm(srcVm.$id, localBaseUuid)
|
||||
const delta = await srcXapi.exportDeltaVm(srcVm.$id, localBaseUuid, {
|
||||
snapshotNameLabel: `XO_DELTA_EXPORT: ${targetSr.name_label} (${targetSr.uuid})`
|
||||
})
|
||||
$onFailure(async () => {
|
||||
await Promise.all(mapToArray(
|
||||
delta.streams,
|
||||
@@ -179,39 +172,127 @@ export default class {
|
||||
|
||||
// TODO: The other backup methods must use this function !
|
||||
// Prerequisite: The backups array must be ordered. (old to new backups)
|
||||
async _removeOldBackups (backups, path, n) {
|
||||
async _removeOldBackups (backups, handler, dir, n) {
|
||||
if (n <= 0) {
|
||||
return
|
||||
}
|
||||
|
||||
const getPath = (file, dir) => dir ? `${dir}/${file}` : file
|
||||
|
||||
await Promise.all(
|
||||
mapToArray(backups.slice(0, n), backup => unlink(`${path}/${backup}`))
|
||||
mapToArray(backups.slice(0, n), async backup => await handler.unlink(getPath(backup, dir)))
|
||||
)
|
||||
}

   // -----------------------------------------------------------------

-  async _listVdiBackups (path) {
-    const files = await readdir(path)
+  async _legacyImportDeltaVdiBackup (xapi, { vmId, handler, dir, vdiInfo }) {
+    const vdi = await xapi.createVdi(vdiInfo.virtual_size, vdiInfo)
+    const vdiId = vdi.$id
+
+    // dir = vm_delta_xxx
+    // xoPath = vdi_xxx/timestamp_(full|delta).vhd
+    // vdiDir = vdi_xxx
+    const { xoPath } = vdiInfo
+    const filePath = `${dir}/${xoPath}`
+    const vdiDir = dirname(xoPath)
+
+    const backups = await this._listDeltaVdiDependencies(handler, filePath)
+
+    for (const backup of backups) {
+      const stream = await handler.createReadStream(`${dir}/${vdiDir}/${backup}`)
+
+      await xapi.importVdiContent(vdiId, stream, {
+        format: VDI_FORMAT_VHD
+      })
+    }
+
+    return vdiId
+  }

+  async _legacyImportDeltaVmBackup (xapi, { remoteId, handler, filePath, info, sr }) {
+    // Import vm metadata.
+    const vm = await (async () => {
+      const stream = await handler.createReadStream(`${filePath}.xva`)
+      return await xapi.importVm(stream, { onlyMetadata: true })
+    })()
+
+    const vmName = vm.name_label
+    const dir = dirname(filePath)
+
+    // Disable start and change the VM name label during import.
+    await Promise.all([
+      xapi.addForbiddenOperationToVm(vm.$id, 'start', 'Delta backup import...'),
+      xapi._setObjectProperties(vm, { name_label: `[Importing...] ${vmName}` })
+    ])
+
+    // Destroy VBDs if necessary: XenServer creates VBDs linked to the
+    // VDIs of the backup VM if it still exists.
+    await xapi.destroyVbdsFromVm(vm.uuid)
+
+    // Import VDIs.
+    const vdiIds = {}
+    await Promise.all(
+      mapToArray(
+        info.vdis,
+        async vdiInfo => {
+          vdiInfo.sr = sr._xapiId
+
+          const vdiId = await this._legacyImportDeltaVdiBackup(xapi, { vmId: vm.$id, handler, dir, vdiInfo })
+          vdiIds[vdiInfo.uuid] = vdiId
+        }
+      )
+    )
+
+    await Promise.all(
+      mapToArray(
+        info.vbds,
+        vbdInfo => {
+          xapi.attachVdiToVm(vdiIds[vbdInfo.xoVdi], vm.$id, vbdInfo)
+        }
+      )
+    )
+
+    // Import done: re-enable start and restore the real VM name.
+    await Promise.all([
+      xapi.removeForbiddenOperationFromVm(vm.$id, 'start'),
+      xapi._setObjectProperties(vm, { name_label: vmName })
+    ])
+
+    return vm
+  }

   // -----------------------------------------------------------------

+  async _listVdiBackups (handler, dir) {
+    let files
+
+    try {
+      files = await handler.list(dir)
+    } catch (error) {
+      if (error.code === 'ENOENT') {
+        files = []
+      } else {
+        throw error
+      }
+    }

     const backups = sortBy(filter(files, fileName => isVdiBackup(fileName)))
     let i

-    // Avoid unstable state: No full vdi found to the beginning of array. (base)
+    // Avoid an unstable state: drop leading deltas that have no full VDI (base) before them.
     for (i = 0; i < backups.length && isDeltaVdiBackup(backups[i]); i++);
-    await this._removeOldBackups(backups, path, i)
+    await this._removeOldBackups(backups, handler, dir, i)

     return backups.slice(i)
   }
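
The loop above skips any leading deltas that lost their full base, so the returned list always starts with a full backup. A sketch on made-up file names (the suffix test approximates `isDeltaVdiBackup`):

```js
// Drop delta backups that precede the first full backup, as
// _listVdiBackups does before returning its listing.
const isDelta = name => name.endsWith('_delta.vhd')

const trimOrphanDeltas = backups => {
  let i = 0
  while (i < backups.length && isDelta(backups[i])) {
    i++
  }
  return backups.slice(i) // entries before i would be deleted on the remote
}

console.log(trimOrphanDeltas([
  '20160201T000000Z_delta.vhd', // orphan: no full base before it
  '20160202T000000Z_full.vhd',
  '20160203T000000Z_delta.vhd'
]))
// → [ '20160202T000000Z_full.vhd', '20160203T000000Z_delta.vhd' ]
```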

-  async _deltaVdiBackup ({vdi, path, depth}) {
-    const xapi = this._xo.getXapi(vdi)
+  async _deltaVdiBackup (xapi, {vdi, handler, dir, depth}) {
     const backupDirectory = `vdi_${vdi.uuid}`

     vdi = xapi.getObject(vdi._xapiId)
-    path = `${path}/${backupDirectory}`
-    await ensureDir(path)
+    dir = `${dir}/${backupDirectory}`

-    const backups = await this._listVdiBackups(path)
+    const backups = await this._listVdiBackups(handler, dir)

     // Make snapshot.
     const date = safeDateFormat(new Date())
@@ -234,7 +315,7 @@ export default class {

     // Export full or delta backup.
     const vdiFilename = `${date}_${isFull ? 'full' : 'delta'}.vhd`
-    const backupFullPath = `${path}/${vdiFilename}`
+    const backupFullPath = `${dir}/${vdiFilename}`

     try {
       const sourceStream = await xapi.exportVdi(currentSnapshot.$id, {
@@ -242,16 +323,27 @@ export default class {
         format: VDI_FORMAT_VHD
       })

-      const targetStream = createWriteStream(backupFullPath, { flags: 'wx' })
+      const targetStream = await handler.createOutputStream(backupFullPath, {
+        // FIXME: the checksum is not computed for full VDI backups.
+        // In the merge case, a delta merged into a full VDI backup would
+        // force us to re-read the whole resulting file, which means a
+        // significant transfer time on the network!
+        checksum: !isFull,
+        flags: 'wx'
+      })

       sourceStream.on('error', error => targetStream.emit('error', error))
-      await eventToPromise(sourceStream.pipe(targetStream), 'finish')
-    } catch (e) {
-      // Remove the new backup (corrupt) and delete the new VDI base.
-      await xapi.deleteVdi(currentSnapshot.$id)
-      await unlink(backupFullPath).catch(noop)
-
-      throw e
+      await Promise.all([
+        eventToPromise(sourceStream.pipe(targetStream), 'finish'),
+        sourceStream.task
+      ])
+    } catch (error) {
+      // Remove the new backup (corrupt) and delete the new VDI base.
+      xapi.deleteVdi(currentSnapshot.$id).catch(noop)
+      await handler.unlink(backupFullPath, { checksum: true }).catch(noop)
+
+      throw error
     }

     // Returns the relative path (subdir and VDI filename), old/new base.
@@ -263,8 +355,12 @@ export default class {
     }
   }
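
The export path pipes a XAPI stream into the handler's output stream and forwards source errors onto the target so the `finish` wait can reject. The same pattern in isolation, with Node built-ins only (the file paths are made up, and `eventToPromise` is inlined for the sketch):

```js
// Standalone sketch of the pipe-and-wait pattern above, using Node streams.
// Source errors are forwarded so the 'finish' wait rejects on failure.
const { createReadStream, createWriteStream } = require('fs')

const eventToPromise = (emitter, event) =>
  new Promise((resolve, reject) => {
    emitter.on(event, resolve)
    emitter.on('error', reject)
  })

async function copy (from, to) {
  const source = createReadStream(from)
  const target = createWriteStream(to, { flags: 'wx' }) // fail if it exists

  source.on('error', error => target.emit('error', error))
  await eventToPromise(source.pipe(target), 'finish')
}

copy('./backup.vhd', './backup-copy.vhd').catch(console.error)
```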

-  async _mergeDeltaVdiBackups ({path, depth}) {
-    const backups = await this._listVdiBackups(path)
+  async _mergeDeltaVdiBackups ({handler, dir, depth}) {
+    if (handler.type === 'smb') {
+      throw new Error('VDI merging is not available through SMB')
+    }
+
+    const backups = await this._listVdiBackups(handler, dir)
     let i = backups.length - depth

     // No merge.
@@ -272,55 +368,49 @@ export default class {
       return
     }

-    const newFull = `${getVdiTimestamp(backups[i])}_full.vhd`
     const vhdUtil = `${__dirname}/../../bin/vhd-util`

+    const timestamp = getVdiTimestamp(backups[i])
+    const newFullBackup = `${dir}/${timestamp}_full.vhd`
+
+    await checkFileIntegrity(handler, `${dir}/${backups[i]}`)

     for (; i > 0 && isDeltaVdiBackup(backups[i]); i--) {
-      const backup = `${path}/${backups[i]}`
-      const parent = `${path}/${backups[i - 1]}`
+      const backup = `${dir}/${backups[i]}`
+      const parent = `${dir}/${backups[i - 1]}`
+
+      const path = handler._remote.path // FIXME: private attribute!

       try {
-        await execa(vhdUtil, ['modify', '-n', backup, '-p', parent])
-        await execa(vhdUtil, ['coalesce', '-n', backup])
+        await checkFileIntegrity(handler, `${dir}/${backups[i - 1]}`)
+        await execa(vhdUtil, ['modify', '-n', `${path}/${backup}`, '-p', `${path}/${parent}`]) // FIXME: not OK, at least with SMB remotes
+        await execa(vhdUtil, ['coalesce', '-n', `${path}/${backup}`]) // FIXME: not OK, at least with SMB remotes
       } catch (e) {
         console.error('Unable to use vhd-util.', e)
         throw e
       }

-      await unlink(backup)
+      await handler.unlink(backup, { checksum: true })
     }

-    // The base was removed, it exists two full backups or more ?
-    // => Remove old backups before the most recent full.
+    // The base was removed; are there two or more full backups left?
+    // If so, remove the old backups that precede the most recent full one.
     if (i > 0) {
       for (i--; i >= 0; i--) {
-        await unlink(`${path}/${backups[i]}`)
+        await handler.unlink(`${dir}/${backups[i]}`, { checksum: true })
       }

       return
     }

     // Rename the first old full backup to the new full backup.
-    await rename(`${path}/${backups[0]}`, `${path}/${newFull}`)
+    await handler.rename(`${dir}/${backups[0]}`, newFullBackup)
   }
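
The merge walks backwards from the retention boundary, re-parenting and coalescing each delta into its predecessor until a full backup is reached. A dry-run sketch of the plan that loop produces (no vhd-util invocation; the file names are made up):

```js
// Dry-run sketch of the coalesce plan computed by _mergeDeltaVdiBackups:
// starting at the retention boundary, each delta is merged into the
// backup just before it, until a full backup is reached.
const isDelta = name => name.endsWith('_delta.vhd')

const mergePlan = (backups, depth) => {
  const plan = []
  for (let i = backups.length - depth; i > 0 && isDelta(backups[i]); i--) {
    plan.push({ child: backups[i], parent: backups[i - 1] })
  }
  return plan
}

console.log(mergePlan(
  ['01_full.vhd', '02_delta.vhd', '03_delta.vhd', '04_delta.vhd'],
  2
))
// → [ { child: '03_delta.vhd', parent: '02_delta.vhd' },
//     { child: '02_delta.vhd', parent: '01_full.vhd' } ]
```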

-  async _importVdiBackupContent (xapi, file, vdiId) {
-    const stream = await this._openAndwaitReadableFile(
-      file,
-      'VDI to import not found in this remote'
-    )
-
-    await xapi.importVdiContent(vdiId, stream, {
-      format: VDI_FORMAT_VHD
-    })
-  }
-
-  async importDeltaVdiBackup ({vdi, remoteId, filePath}) {
-    const remote = await this._xo.getRemote(remoteId)
-    const path = dirname(`${remote.path}/${filePath}`)
-    const backups = await this._listVdiBackups(path)
+  async _listDeltaVdiDependencies (handler, filePath) {
+    const dir = dirname(filePath)
+    const filename = basename(filePath)
+    const backups = await this._listVdiBackups(handler, dir)

     // Search file. (delta or full backup)
     const i = findIndex(backups, backup =>
@@ -340,81 +430,83 @@ export default class {
       throw new Error(`Unable to find the full vdi backup of: ${filePath}`)
     }

-    // Restore...
-    const xapi = this._xo.getXapi(vdi)
-
-    for (; j <= i; j++) {
-      await this._importVdiBackupContent(xapi, `${path}/${backups[j]}`, vdi._xapiId)
-    }
+    return backups.slice(j, i + 1)
   }
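
The dependency list walks back from the requested file to the nearest preceding full backup, then returns that whole chain. The same logic in isolation (file names and the `isFull` test are made up):

```js
// Sketch of _listDeltaVdiDependencies: given a sorted listing and a target
// file, return the nearest preceding full backup plus every delta up to
// and including the target.
const isFull = name => name.endsWith('_full.vhd')

const listDependencies = (backups, target) => {
  const i = backups.indexOf(target)
  if (i === -1) {
    throw new Error(`no such backup: ${target}`)
  }
  let j = i
  while (j >= 0 && !isFull(backups[j])) {
    j--
  }
  if (j < 0) {
    throw new Error(`unable to find the full vdi backup of: ${target}`)
  }
  return backups.slice(j, i + 1)
}

console.log(listDependencies(
  ['01_full.vhd', '02_delta.vhd', '03_delta.vhd'],
  '03_delta.vhd'
))
// → [ '01_full.vhd', '02_delta.vhd', '03_delta.vhd' ]
```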

   // -----------------------------------------------------------------

-  async _listDeltaVmBackups (path) {
-    const files = await readdir(path)
-    return await sortBy(filter(files, (fileName) => /^\d+T\d+Z_.*\.(?:xva|json)$/.test(fileName)))
+  async _listDeltaVmBackups (handler, dir) {
+    const files = await handler.list(dir)
+    return await sortBy(filter(files, isDeltaBackup))
   }

-  async _failedRollingDeltaVmBackup (xapi, path, fulFilledVdiBackups) {
+  async _failedRollingDeltaVmBackup (xapi, handler, dir, fulFilledVdiBackups) {
     await Promise.all(
       mapToArray(fulFilledVdiBackups, async vdiBackup => {
         const { newBaseId, backupDirectory, vdiFilename } = vdiBackup.value()

         await xapi.deleteVdi(newBaseId)
-        await unlink(`${path}/${backupDirectory}/${vdiFilename}`).catch(noop)
+        await handler.unlink(`${dir}/${backupDirectory}/${vdiFilename}`, { checksum: true }).catch(noop)
       })
     )
   }

   async rollingDeltaVmBackup ({vm, remoteId, tag, depth}) {
     const remote = await this._xo.getRemote(remoteId)
-    const directory = `vm_delta_${tag}_${vm.uuid}`
-    const path = `${remote.path}/${directory}`

-    await ensureDir(path)
+    if (!remote) {
+      throw new Error(`No such Remote ${remoteId}`)
+    }
+    if (!remote.enabled) {
+      throw new Error(`Remote ${remoteId} is disabled`)
+    }
+
+    const handler = await this._xo.getRemoteHandler(remote)
+    if (handler.type === 'smb') {
+      throw new Error('Delta Backup is not supported for smb remotes')
+    }
+
+    const dir = `vm_delta_${tag}_${vm.uuid}`

     const info = {
-      vbds: [],
-      vdis: {}
+      version: '1.0.0',
+      vbds: {},
+      vdis: {},
+      vifs: {}
     }

     const promises = []
     const xapi = this._xo.getXapi(vm)
+    vm = xapi.getObject(vm._xapiId)

-    for (const vbdId of vm.$VBDs) {
-      const vbd = this._xo.getObject(vbdId)
-
-      if (!vbd.VDI) {
-        continue
-      }
+    for (const vbd of vm.$VBDs) {
+      const vdiId = vbd.VDI
+
+      if (!vdiId || vbd.type !== 'Disk') {
+        continue
+      }

       if (vbd.is_cd_drive) {
         continue
       }

-      const vdiXo = this._xo.getObject(vbd.VDI)
-      const vdi = xapi.getObject(vdiXo._xapiId)
-      const vdiUUID = vdi.uuid
-
-      info.vbds.push({
-        ...xapi.getObject(vbd._xapiId),
-        xoVdi: vdiUUID
-      })
+      info.vbds[vbd.$ref] = vbd

       // Warning: several VBDs may reference the same VDI.
-      if (!info.vdis[vdiUUID]) {
-        info.vdis[vdiUUID] = { ...vdi }
-        promises.push(
-          this._deltaVdiBackup({vdi: vdiXo, path, depth}).then(
-            vdiBackup => {
-              const { backupDirectory, vdiFilename } = vdiBackup
-              info.vdis[vdiUUID].xoPath = `${backupDirectory}/${vdiFilename}`
-
-              return vdiBackup
-            }
-          )
-        )
-      }
+      if (info.vdis[vdiId]) {
+        continue
+      }
+
+      const vdi = vbd.$VDI
+
+      info.vdis[vdiId] = { ...vdi }
+      promises.push(
+        this._deltaVdiBackup(xapi, {vdi, handler, dir, depth}).then(
+          vdiBackup => {
+            const { backupDirectory, vdiFilename } = vdiBackup
+            info.vdis[vdiId].xoPath = `${backupDirectory}/${vdiFilename}`

+            return vdiBackup
+          }
+        )
+      )
     }

     const vdiBackups = await pSettle(promises)
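
Several VBDs can point at the same VDI, which is why the loop above keys `info.vdis` by VDI id and schedules each disk's backup only once. The dedup in isolation (the VBD objects are made-up samples, not real XAPI records):

```js
// Sketch of the VBD → VDI dedup above: back up each disk once even when
// several VBDs reference the same VDI. Objects are made-up samples.
const vbds = [
  { type: 'Disk', is_cd_drive: false, VDI: 'vdi-a' },
  { type: 'Disk', is_cd_drive: false, VDI: 'vdi-a' }, // same disk, second VBD
  { type: 'CD', is_cd_drive: true, VDI: 'vdi-cd' }
]

const vdisToBackup = {}
for (const vbd of vbds) {
  const vdiId = vbd.VDI
  if (!vdiId || vbd.type !== 'Disk' || vbd.is_cd_drive) {
    continue
  }
  if (vdisToBackup[vdiId]) {
    continue // already scheduled
  }
  vdisToBackup[vdiId] = true
}

console.log(Object.keys(vdisToBackup)) // → [ 'vdi-a' ]
```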
@@ -432,29 +524,28 @@ export default class {
     }

     if (fail) {
-      console.error(`Remove successful backups in ${path}`, fulFilledVdiBackups)
-      await this._failedRollingDeltaVmBackup(xapi, path, fulFilledVdiBackups)
+      console.error(`Remove successful backups in ${dir}`, fulFilledVdiBackups)
+      await this._failedRollingDeltaVmBackup(xapi, handler, dir, fulFilledVdiBackups)

       throw new Error('Rolling delta vm backup failed.')
     }

-    const backups = await this._listDeltaVmBackups(path)
     const date = safeDateFormat(new Date())
     const backupFormat = `${date}_${vm.name_label}`

-    const xvaPath = `${path}/${backupFormat}.xva`
-    const infoPath = `${path}/${backupFormat}.json`
+    const infoPath = `${dir}/${backupFormat}${DELTA_BACKUP_EXT}`

     try {
-      await Promise.all([
-        this.backupVm({vm, pathToFile: xvaPath, onlyMetadata: true}),
-        writeFile(infoPath, JSON.stringify(info), {flag: 'wx'})
-      ])
+      for (const vif of vm.$VIFs) {
+        info.vifs[vif.$ref] = vif
+      }
+
+      info.vm = vm
+
+      await handler.outputFile(infoPath, JSON.stringify(info, null, 2), {flag: 'wx'})
     } catch (e) {
       await Promise.all([
-        unlink(xvaPath).catch(noop),
-        unlink(infoPath).catch(noop),
-        this._failedRollingDeltaVmBackup(xapi, path, fulFilledVdiBackups)
+        handler.unlink(infoPath).catch(noop),
+        this._failedRollingDeltaVmBackup(xapi, handler, dir, fulFilledVdiBackups)
       ])

       throw e
@@ -464,12 +555,26 @@ export default class {
     await Promise.all(
       mapToArray(vdiBackups, vdiBackup => {
         const { backupDirectory } = vdiBackup.value()
-        return this._mergeDeltaVdiBackups({path: `${path}/${backupDirectory}`, depth})
+        return this._mergeDeltaVdiBackups({handler, dir: `${dir}/${backupDirectory}`, depth})
       })
     )

-    // Remove x2 files: the json AND the xva file.
-    await this._removeOldBackups(backups, path, backups.length - (depth - 1) * 2)
+    // Remove old vm backups.
+    const backups = await this._listDeltaVmBackups(handler, dir)
+    const nOldBackups = backups.length - depth
+
+    if (nOldBackups > 0) {
+      await Promise.all(
+        mapToArray(backups.slice(0, nOldBackups), async backup => {
+          // Remove the json file.
+          await handler.unlink(`${dir}/${backup}`)
+
+          // Remove the xva file.
+          // Version 0.0.0 (legacy) delta backup.
+          handler.unlink(`${dir}/${getDeltaBackupNameWithoutExt(backup)}.xva`).catch(noop)
+        })
+      )
+    }

     // Remove old vdi bases.
     Promise.all(
@@ -483,87 +588,69 @@ export default class {
     ).catch(noop)

     // Returns a relative path.
-    return `${directory}/${backupFormat}`
+    return `${dir}/${backupFormat}`
   }

-  async _importVmMetadata (xapi, file) {
-    const stream = await this._openAndwaitReadableFile(
-      file,
-      'VM metadata to import not found in this remote'
-    )
-    return await xapi.importVm(stream, { onlyMetadata: true })
-  }
-
-  async _importDeltaVdiBackupFromVm (xapi, vmId, remoteId, directory, vdiInfo) {
-    const vdi = await xapi.createVdi(vdiInfo.virtual_size, vdiInfo)
-    const vdiId = vdi.$id
-
-    await this.importDeltaVdiBackup({
-      vdi: this._xo.getObject(vdiId),
-      remoteId,
-      filePath: `${directory}/${vdiInfo.xoPath}`
-    })
-
-    return vdiId
-  }

   async importDeltaVmBackup ({sr, remoteId, filePath}) {
-    const remote = await this._xo.getRemote(remoteId)
-    const fullBackupPath = `${remote.path}/${filePath}`
+    const handler = await this._xo.getRemoteHandler(remoteId)
     const xapi = this._xo.getXapi(sr)

-    // Import vm metadata.
-    const vm = await this._importVmMetadata(xapi, `${fullBackupPath}.xva`)
-    const vmName = vm.name_label
+    const delta = JSON.parse(await handler.readFile(`${filePath}${DELTA_BACKUP_EXT}`))
+    let vm
+    const { version } = delta

-    // Disable start and change the VM name label during import.
-    await Promise.all([
-      xapi.addForbiddenOperationToVm(vm.$id, 'start', 'Delta backup import...'),
-      xapi._setObjectProperties(vm, { name_label: `[Importing...] ${vmName}` })
-    ])
-
-    // Destroy VBDs if necessary: XenServer creates VBDs linked to the
-    // VDIs of the backup VM if it still exists.
-    await xapi.destroyVbdsFromVm(vm.uuid)
-
-    const info = JSON.parse(await readFile(`${fullBackupPath}.json`))
-
-    // Import VDIs.
-    const vdiIds = {}
-    await Promise.all(
-      mapToArray(
-        info.vdis,
-        async vdiInfo => {
-          vdiInfo.sr = sr._xapiId
-
-          const vdiId = await this._importDeltaVdiBackupFromVm(xapi, vm.$id, remoteId, dirname(filePath), vdiInfo)
-          vdiIds[vdiInfo.uuid] = vdiId
-        }
-      )
-    )
-
-    await Promise.all(
-      mapToArray(
-        info.vbds,
-        vbdInfo => {
-          xapi.attachVdiToVm(vdiIds[vbdInfo.xoVdi], vm.$id, vbdInfo)
-        }
-      )
-    )
-
-    // Import done: re-enable start and restore the real VM name.
-    await Promise.all([
-      xapi.removeForbiddenOperationFromVm(vm.$id, 'start'),
-      xapi._setObjectProperties(vm, { name_label: vmName })
-    ])
+    if (!version) {
+      // Legacy import. (Version 0.0.0)
+      vm = await this._legacyImportDeltaVmBackup(xapi, {
+        remoteId, handler, filePath, info: delta, sr
+      })
+    } else if (versionSatisfies(delta.version, '^1')) {
+      const basePath = dirname(filePath)
+      const streams = delta.streams = {}
+
+      await Promise.all(
+        mapToArray(
+          delta.vdis,
+          async (vdi, id) => {
+            const vdisFolder = `${basePath}/${dirname(vdi.xoPath)}`
+            const backups = await this._listDeltaVdiDependencies(handler, `${basePath}/${vdi.xoPath}`)
+
+            streams[`${id}.vhd`] = await Promise.all(mapToArray(backups, async backup =>
+              handler.createReadStream(`${vdisFolder}/${backup}`, { checksum: true, ignoreMissingChecksum: true })
+            ))
+          }
+        )
+      )
+
+      vm = await xapi.importDeltaVm(delta, {
+        srId: sr._xapiId,
+        disableStartAfterImport: false
+      })
+    } else {
+      throw new Error(`Unsupported delta backup version: ${version}`)
+    }

     return xapiObjectToXo(vm).id
   }
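
The import dispatches on the `version` field written into the JSON metadata: no version means a legacy (0.0.0) backup, a `^1` version uses the stream-based import, anything else is rejected. A sketch of that branching (`satisfies` is a toy stand-in for the `versionSatisfies` helper, which behaves like semver range matching):

```js
// Sketch of the version dispatch in importDeltaVmBackup. `satisfies` is a
// toy stand-in for semver range matching against '^1'.
const satisfies = version => /^1\./.test(version)

function importDelta (delta) {
  const { version } = delta
  if (!version) {
    return 'legacy import (version 0.0.0)'
  }
  if (satisfies(version)) {
    return 'stream-based import (version ^1)'
  }
  throw new Error(`Unsupported delta backup version: ${version}`)
}

console.log(importDelta({}))                   // legacy
console.log(importDelta({ version: '1.0.0' })) // stream-based
```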

   // -----------------------------------------------------------------

-  async backupVm ({vm, pathToFile, compress, onlyMetadata}) {
-    const targetStream = createWriteStream(pathToFile, { flags: 'wx' })
+  async backupVm ({vm, remoteId, file, compress, onlyMetadata}) {
+    const remote = await this._xo.getRemote(remoteId)
+
+    if (!remote) {
+      throw new Error(`No such Remote ${remoteId}`)
+    }
+    if (!remote.enabled) {
+      throw new Error(`Backup remote ${remoteId} is disabled`)
+    }
+
+    const handler = await this._xo.getRemoteHandler(remote)
+    return this._backupVm(vm, handler, file, {compress, onlyMetadata})
+  }
+
+  async _backupVm (vm, handler, file, {compress, onlyMetadata}) {
+    const targetStream = await handler.createOutputStream(file, { flags: 'wx' })
     const promise = eventToPromise(targetStream, 'finish')

     const sourceStream = await this._xo.getXapi(vm).exportVm(vm._xapiId, {
@@ -575,26 +662,28 @@ export default class {
     await promise
   }

-  async rollingBackupVm ({vm, path, tag, depth, compress, onlyMetadata}) {
-    await ensureDir(path)
-    const files = await readdir(path)
+  async rollingBackupVm ({vm, remoteId, tag, depth, compress, onlyMetadata}) {
+    const remote = await this._xo.getRemote(remoteId)
+
+    if (!remote) {
+      throw new Error(`No such Remote ${remoteId}`)
+    }
+    if (!remote.enabled) {
+      throw new Error(`Backup remote ${remoteId} is disabled`)
+    }
+
+    const handler = await this._xo.getRemoteHandler(remote)
+
+    const files = await handler.list()

     const reg = new RegExp('^[^_]+_' + escapeStringRegexp(`${tag}_${vm.name_label}.xva`))
     const backups = sortBy(filter(files, (fileName) => reg.test(fileName)))

     const date = safeDateFormat(new Date())
-    const backupFullPath = `${path}/${date}_${tag}_${vm.name_label}.xva`
+    const file = `${date}_${tag}_${vm.name_label}.xva`

-    await this.backupVm({vm, pathToFile: backupFullPath, compress, onlyMetadata})
-
-    const promises = []
-    for (let surplus = backups.length - (depth - 1); surplus > 0; surplus--) {
-      const oldBackup = backups.shift()
-      promises.push(unlink(`${path}/${oldBackup}`))
-    }
-    await Promise.all(promises)
-
-    return backupFullPath
+    await this._backupVm(vm, handler, file, {compress, onlyMetadata})
+    await this._removeOldBackups(backups, handler, undefined, backups.length - (depth - 1))
   }
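
Rolling backups are matched by name: a date prefix, then the tag and VM label. The regexp in isolation (sample names are made up, and the escaping helper is inlined, doing what the escape-string-regexp dependency does in xo-server):

```js
// Sketch of the rolling-backup name matching above. The literal part of
// the name is regexp-escaped before being compiled into a pattern.
const escapeRegExp = s => s.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')

const tag = 'daily'
const vmName = 'web01'
const reg = new RegExp('^[^_]+_' + escapeRegExp(`${tag}_${vmName}.xva`))

const files = [
  '20160201T100000Z_daily_web01.xva',
  '20160201T100000Z_weekly_web01.xva',
  'notes.txt'
]

console.log(files.filter(name => reg.test(name)))
// → [ '20160201T100000Z_daily_web01.xva' ]
```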
   async rollingSnapshotVm (vm, tag, depth) {

@@ -8,7 +8,6 @@ import {
 } from '../api-errors'
 import {
   createRawObject,
-  noop,
   mapToArray
 } from '../utils'

@@ -94,7 +93,9 @@ export default class {
         return this.loadPlugin(id)
       }
     })
-      .catch(noop)
+      .catch(error => {
+        console.error('register plugin %s: %s', name, error && error.stack || error)
+      })
   }

   async _getPlugin (id) {
@@ -1,14 +1,15 @@
 import startsWith from 'lodash.startswith'

-import RemoteHandler from '../remote-handler'
-import { Remotes } from '../models/remote'
-import {
-  forEach
-} from '../utils'
+import RemoteHandlerLocal from '../remote-handlers/local'
+import RemoteHandlerNfs from '../remote-handlers/nfs'
+import RemoteHandlerSmb from '../remote-handlers/smb'
 import {
   NoSuchObject
 } from '../api-errors'
+import {
+  forEach,
+  mapToArray
+} from '../utils'
+import {
+  Remotes
+} from '../models/remote'

// ===================================================================

@@ -29,33 +30,30 @@ export default class {
     })

     xo.on('start', async () => {
-      // TODO: Should it be private?
-      this.remoteHandler = new RemoteHandler()
-
       await this.initRemotes()
       await this.syncAllRemotes()
     })
-    xo.on('stop', () => this.disableAllRemotes())
+    xo.on('stop', () => this.forgetAllRemotes())
   }

-  _developRemote (remote) {
-    const _remote = { ...remote }
-    if (startsWith(_remote.url, 'file://')) {
-      _remote.type = 'local'
-      _remote.path = _remote.url.slice(6)
-    } else if (startsWith(_remote.url, 'nfs://')) {
-      _remote.type = 'nfs'
-      const url = _remote.url.slice(6)
-      const [host, share] = url.split(':')
-      _remote.path = '/tmp/xo-server/mounts/' + _remote.id
-      _remote.host = host
-      _remote.share = share
-    }
-    return _remote
+  async getRemoteHandler (remote) {
+    if (typeof remote === 'string') {
+      remote = await this.getRemote(remote)
+    }
+
+    const Handler = {
+      file: RemoteHandlerLocal,
+      smb: RemoteHandlerSmb,
+      nfs: RemoteHandlerNfs
+    }
+
+    const type = remote.url.split('://')[0]
+    if (!Handler[type]) {
+      throw new Error('Unhandled remote type')
+    }
+    return new Handler[type](remote)
   }
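
The new `getRemoteHandler` maps the remote URL's scheme to a handler class. The dispatch pattern in isolation (toy handler classes standing in for the real local/NFS/SMB handlers):

```js
// Sketch of the scheme → handler dispatch in getRemoteHandler, with toy
// classes standing in for the real remote handlers.
class LocalHandler { constructor (remote) { this.remote = remote } }
class NfsHandler { constructor (remote) { this.remote = remote } }
class SmbHandler { constructor (remote) { this.remote = remote } }

const HANDLERS = {
  file: LocalHandler,
  nfs: NfsHandler,
  smb: SmbHandler
}

function getHandler (remote) {
  const type = remote.url.split('://')[0]
  const Handler = HANDLERS[type]
  if (!Handler) {
    throw new Error('Unhandled remote type')
  }
  return new Handler(remote)
}

console.log(getHandler({ url: 'nfs://host:/share' }) instanceof NfsHandler) // true
```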

   async getAllRemotes () {
-    return mapToArray(await this._remotes.get(), this._developRemote)
+    return this._remotes.get()
   }

   async _getRemote (id) {
@@ -68,7 +66,7 @@ export default class {
   }

   async getRemote (id) {
-    return this._developRemote((await this._getRemote(id)).properties)
+    return (await this._getRemote(id)).properties
   }

   async createRemote ({name, url}) {
@@ -79,9 +77,10 @@ export default class {
   async updateRemote (id, {name, url, enabled, error}) {
     const remote = await this._getRemote(id)
     this._updateRemote(remote, {name, url, enabled, error})
-    const props = await this.remoteHandler.sync(this._developRemote(remote.properties))
+    const handler = await this.getRemoteHandler(remote.properties)
+    const props = await handler.sync()
     this._updateRemote(remote, props)
-    return await this._developRemote(this._remotes.save(remote).properties)
+    return (await this._remotes.save(remote)).properties
   }

   _updateRemote (remote, {name, url, enabled, error}) {
@@ -96,8 +95,8 @@ export default class {
   }

   async removeRemote (id) {
-    const remote = await this.getRemote(id)
-    await this.remoteHandler.forget(remote)
+    const handler = await this.getRemoteHandler(id)
+    await handler.forget()
     await this._remotes.remove(id)
   }

@@ -110,9 +109,13 @@ export default class {
   }

   // TODO: Should it be private?
-  async disableAllRemotes () {
+  async forgetAllRemotes () {
     const remotes = await this.getAllRemotes()
-    this.remoteHandler.disableAll(remotes)
+    for (let remote of remotes) {
+      try {
+        (await this.getRemoteHandler(remote)).forget()
+      } catch (_) {}
+    }
   }

   // TODO: Should it be private?

75
src/xo.js
@@ -1,3 +1,4 @@
+import checkAuthorization from 'xo-acl-resolver'
 import filter from 'lodash.filter'
 import fs from 'fs-promise'
 import includes from 'lodash.includes'
@@ -15,7 +16,6 @@ import {
 } from 'hashy'

 import mixins from './xo-mixins'
-import checkAuthorization from './acl'
 import Connection from './connection'
 import LevelDbLogger from './loggers/leveldb'
 import Xapi from './xapi'
@@ -31,7 +31,8 @@ import {
   generateToken,
   isEmpty,
   mapToArray,
-  noop
+  noop,
+  popProperty
 } from './utils'
 import {Groups} from './models/group'
 import {
@@ -95,6 +96,7 @@ export default class Xo extends EventEmitter {
     this._authenticationFailures = createRawObject()
     this._authenticationProviders = new Set()
     this._httpRequestWatchers = createRawObject()
+    this._xenObjConflicts = createRawObject() // TODO: clean when a server is disconnected.

     // Connects to Redis.
     this._redis = createRedisClient(config.redis && config.redis.uri)
@@ -706,27 +708,31 @@ export default class Xo extends EventEmitter {
     return server
   }

-  _onXenAdd (xapiObjects, xapiIdsToXo, toRetry) {
-    const {_objects: objects} = this
+  _onXenAdd (xapiObjects, xapiIdsToXo, toRetry, conId) {
+    const conflicts = this._xenObjConflicts
+    const objects = this._objects

     forEach(xapiObjects, (xapiObject, xapiId) => {
       try {
         const xoObject = xapiObjectToXo(xapiObject)
-        if (!xoObject) {
-          return
-        }
-
-        const prevId = xapiIdsToXo[xapiId]
-        const currId = xoObject.id
-
-        if (prevId !== currId) {
-          // If there was a previous XO object for this XAPI object
-          // (with a different id), removes it.
-          if (prevId) {
-            objects.unset(prevId)
-          }
-
-          xapiIdsToXo[xapiId] = currId
-        }
-
-        objects.set(xoObject)
+        if (xoObject) {
+          const xoId = xoObject.id
+          xapiIdsToXo[xapiId] = xoId
+
+          const previous = objects.get(xoId, undefined)
+          if (
+            previous &&
+            previous._xapiRef !== xapiObject.$ref
+          ) {
+            (
+              conflicts[xoId] ||
+              (conflicts[xoId] = createRawObject())
+            )[conId] = xoObject
+          } else {
+            objects.set(xoId, xoObject)
+          }
+        }
       } catch (error) {
         console.error('ERROR: xapiObjectToXo', error)
@@ -736,16 +742,33 @@ export default class Xo extends EventEmitter {
     })
   }
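
With several servers connected to the same pool, two connections can produce XO objects with the same id; the add path parks the extra copy in a per-id conflict map keyed by connection instead of overwriting the visible object. A toy model of that bookkeeping (a `Map` and plain objects instead of xo-server's collection and `createRawObject`):

```js
// Toy model of the conflict bookkeeping in _onXenAdd: when two connections
// emit an object with the same XO id, the second copy is parked in
// conflicts[xoId][conId] instead of replacing the visible object.
const objects = new Map()
const conflicts = {}

function onAdd (conId, xoObject, xapiRef) {
  const xoId = xoObject.id
  const previous = objects.get(xoId)
  if (previous && previous.xapiRef !== xapiRef) {
    (conflicts[xoId] || (conflicts[xoId] = {}))[conId] = xoObject
  } else {
    objects.set(xoId, { ...xoObject, xapiRef })
  }
}

onAdd('server-1', { id: 'vm-1' }, 'ref:aaa')
onAdd('server-2', { id: 'vm-1' }, 'ref:bbb') // parked as a conflict

console.log(objects.get('vm-1').xapiRef)    // 'ref:aaa'
console.log(Object.keys(conflicts['vm-1'])) // [ 'server-2' ]
```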

-  _onXenRemove (xapiObjects, xapiIdsToXo, toRetry) {
-    const {_objects: objects} = this
+  _onXenRemove (xapiObjects, xapiIdsToXo, toRetry, conId) {
+    const conflicts = this._xenObjConflicts
+    const objects = this._objects

     forEach(xapiObjects, (_, xapiId) => {
       toRetry && delete toRetry[xapiId]

       const xoId = xapiIdsToXo[xapiId]
+      if (!xoId) {
+        // This object was not known previously.
+        return
+      }

-      if (xoId) {
-        delete xapiIdsToXo[xapiId]
+      delete xapiIdsToXo[xapiId]
+
+      const objConflicts = conflicts[xoId]
+      if (objConflicts) {
+        if (objConflicts[conId]) {
+          delete objConflicts[conId]
+        } else {
+          objects.set(xoId, popProperty(objConflicts))
+        }
+
+        if (isEmpty(objConflicts)) {
+          delete conflicts[xoId]
+        }
+      } else {
+        objects.unset(xoId)
       }
     })
@@ -764,7 +787,9 @@ export default class Xo extends EventEmitter {
     })

     xapi.xo = (() => {
-      // Maps ids of XAPI objects to ids of XO objecs.
+      const conId = server.id
+
+      // Maps ids of XAPI objects to ids of XO objects.
       const xapiIdsToXo = createRawObject()

       // Map of XAPI objects which failed to be transformed to XO
@@ -776,10 +801,10 @@ export default class Xo extends EventEmitter {
       let toRetryNext = createRawObject()

       const onAddOrUpdate = objects => {
-        this._onXenAdd(objects, xapiIdsToXo, toRetryNext)
+        this._onXenAdd(objects, xapiIdsToXo, toRetryNext, conId)
       }
       const onRemove = objects => {
-        this._onXenRemove(objects, xapiIdsToXo, toRetry)
+        this._onXenRemove(objects, xapiIdsToXo, toRetry, conId)
       }
       const onFinish = () => {
         if (xapi.pool) {