Compare commits

..

62 Commits

Author SHA1 Message Date
Julien Fontanet
28b9bbe54f 4.13.0-0 2016-02-04 18:01:04 +01:00
Julien Fontanet
bf6bd7cbdc Merge pull request #230 from vatesfr/pierre-vm-migration-details
Fix intra-pool migration on different SRs
2016-02-04 17:30:14 +01:00
Pierre
ddcb2468a6 Minor fixes 2016-02-04 17:26:27 +01:00
Pierre
f048b58935 Fix intra-pool migration on different SRs 2016-02-04 17:17:09 +01:00
Julien Fontanet
09f6200c2e Merge pull request #209 from vatesfr/abhamonr-checksum-verification-delta-backup
Create and verify checksums for VDI delta backups
2016-02-04 16:12:54 +01:00
wescoeur
354692fb06 Add checksum verification for delta backup on restore/merge. (fix vatesfr/xo-web#617) 2016-02-04 15:22:14 +01:00
Julien Fontanet
2c5858c2e0 Merge pull request #228 from vatesfr/pierre-vm-migration-details
Fix default migration
2016-02-04 15:18:31 +01:00
Pierre
1f41fd0436 Better handle of undefined maps 2016-02-04 11:36:04 +01:00
Pierre
e0bbefdfae Fix default migration 2016-02-04 11:02:43 +01:00
Julien Fontanet
bc6fbb2797 Xo#registerPlugin(): log errors. 2016-02-04 10:32:36 +01:00
Julien Fontanet
b579cf8128 Merge pull request #227 from vatesfr/https-redirect
Can redirect to HTTPs (fix vatesfr/xo-web#626).
2016-02-04 09:55:12 +01:00
Julien Fontanet
a94ed014b7 sample config: add redirectToHttps. 2016-02-04 09:52:37 +01:00
Julien Fontanet
0db991b668 Can redirect to HTTPs. 2016-02-03 17:39:39 +01:00
Julien Fontanet
347ced6942 Merge pull request #214 from vatesfr/better-https
Better https (fix vatesfr/xo-web#685)
2016-02-03 14:32:25 +01:00
Olivier Lambert
5d7a775b2b Merge pull request #225 from vatesfr/xo-acl-resolver
Use xo-acl-resolver.
2016-02-03 14:29:31 +01:00
Julien Fontanet
df732ab4bf Merge pull request #216 from vatesfr/vdi-snapshot-type
VDI-snapshot type
2016-02-03 14:14:42 +01:00
Fabrice Marsaud
31cd3953d6 Fixing VM object properties 2016-02-03 13:56:15 +01:00
Julien Fontanet
4666b13892 Use xo-acl-resolver. 2016-02-03 11:47:02 +01:00
Julien Fontanet
37d7ddb4b0 Merge pull request #224 from vatesfr/pierre-vm-migration-details
Custom VM migration (See vatesfr/xo-web#567)
2016-02-03 11:39:53 +01:00
Fabrice Marsaud
3abbaeb44b resolving VDI snapshots 2016-02-03 10:38:09 +01:00
Fabrice Marsaud
847ea49042 VDI-snapshot type 2016-02-03 09:57:42 +01:00
Julien Fontanet
779068c2ee HTTP security: use Helmet. 2016-02-02 20:49:33 +01:00
Julien Fontanet
140cd6882d Allows full TLS config. 2016-02-02 20:45:06 +01:00
Julien Fontanet
2e295c2391 Merge pull request #213 from vatesfr/fix-cpu-weight
Fixed cpuWeight removal for default
2016-02-02 10:24:32 +01:00
Fabrice Marsaud
596b0995f4 Prepare object values for xapi 2016-02-02 10:11:39 +01:00
Fabrice Marsaud
b61fe97893 Fixed cpuWeight removal for default 2016-02-02 09:55:40 +01:00
Julien Fontanet
209aa2ebe6 Add a TODO. 2016-02-01 17:14:05 +01:00
Julien Fontanet
c03a0e857e Merge pull request #188 from vatesfr/olivierlambert-cpu-weight
Ability to set vCPU weight
2016-02-01 16:00:21 +01:00
Olivier Lambert
2854d698e6 Implement vCPU weight 2016-02-01 15:56:36 +01:00
Pierre
944163be0e Bug fix: VDIs should be on chosen SRs. 2016-01-29 10:07:02 +01:00
Julien Fontanet
269a9eaff0 Xapi: small but important fix concerning imports. 2016-01-28 17:22:44 +01:00
Olivier Lambert
7f9c49cbc4 Merge pull request #208 from vatesfr/pierre-vm-migration-details
Custom VM migration. (See vatesfr/xo-web#567)
2016-01-28 17:02:28 +01:00
Julien Fontanet
2b6bfeeb15 Merge pull request #212 from vatesfr/contrep-better-snapshot-names
continous replication: clearer VM snapshot names.
2016-01-28 16:05:03 +01:00
Julien Fontanet
fa9742bc92 continous replication: clearer VM snapshot names. 2016-01-28 15:57:25 +01:00
Pierre
472e419abc Using forEach instead of for. Minor fixes. 2016-01-28 13:33:43 +01:00
Pierre
169d11387b Custom VM migration (See vatesfr/xo-web#567)
Optional parameters for migratePool:
- Migration network
- Map each VDI to an SR on destination host
- Map each VIF to a network on destination host
2016-01-28 13:33:43 +01:00
Julien Fontanet
e59ac6d947 Fixes regarding #660. 2016-01-28 11:42:06 +01:00
Olivier Lambert
e193b45562 Merge pull request #207 from vatesfr/abhamonr-avoid-metadata-imp-exp-delta-backups
Avoid metadata import/export in delta backups. (fix vatesfr/xo-web#651)
2016-01-28 11:35:04 +01:00
wescoeur
1ac34f810e Avoid metadata import/export in delta backups. (fix vatesfr/xo-web#651) 2016-01-28 11:12:21 +01:00
Olivier Lambert
e65e5c6e5f Merge pull request #211 from vatesfr/marsaudf-clear-logs#661
Marsaudf clear logs#661
2016-01-28 10:59:37 +01:00
Fabrice Marsaud
af6365c76a logger.delete 2016-01-28 09:00:15 +01:00
Julien Fontanet
8c672b23b5 Merge pull request #159 from vatesfr/marsaudf-smb-mounts#338
Remotes refactoring + SMB implementation.
2016-01-27 11:24:52 +01:00
Fabrice Marsaud
3b53f5ac11 fixes 2016-01-27 10:58:16 +01:00
Fabrice Marsaud
ccdc744748 fixes 2016-01-27 10:08:59 +01:00
Fabrice Marsaud
261f0b4bf0 typo fix 2016-01-27 09:11:45 +01:00
Fabrice Marsaud
495b59c2e5 update dependency 2016-01-26 17:34:00 +01:00
Fabrice Marsaud
d6e1c13c39 Handler and remotes reworked 2016-01-26 17:28:27 +01:00
Fabrice Marsaud
f7f13b9e07 PR feedback 2 2016-01-26 09:47:47 +01:00
Fabrice Marsaud
62564d747f Errors moved from API to core 2016-01-25 17:29:18 +01:00
Fabrice Marsaud
1d5d59c4c0 Remote handler reworked 2016-01-25 17:01:14 +01:00
Fabrice Marsaud
e8380b8a12 PR feedback 2016-01-25 11:45:53 +01:00
Fabrice Marsaud
c304d9cc62 No vdi merge through smb 2016-01-25 11:45:53 +01:00
Fabrice Marsaud
aad4ebf287 Remote handlers refactored, and adding a smb handler 2016-01-25 11:45:53 +01:00
Olivier Lambert
6c2f48181c Merge pull request #210 from vatesfr/handle-objects-conflicts
Properly handle multiple XAPI objects with the same XO id.
2016-01-22 16:02:19 +01:00
Julien Fontanet
480b6ff7d6 Properly handle multiple XAPI objects with the same XO id.
When there is a conflict, the existing object keep the place but when
it is removed, the other object (which is in the waiting list) will
take the new place.
2016-01-22 15:57:44 +01:00
Julien Fontanet
4bdd6f972c Remove node-inspector. 2016-01-21 16:44:42 +01:00
Olivier Lambert
6674d8456a Merge pull request #206 from vatesfr/olivierlambert-fixMigration
Correctly use destination host SR and network
2016-01-20 18:31:33 +01:00
Olivier Lambert
d1478ff694 select the correct migration network 2016-01-20 18:12:37 +01:00
Julien Fontanet
cb20d46b74 Merge pull request #205 from vatesfr/abhamonr-fix-avoid-errors-delta-backups
Delta backups: Fix various issues.
2016-01-20 17:36:33 +01:00
Olivier Lambert
9dd2538043 correctly use destination host SR and network 2016-01-20 17:26:47 +01:00
wescoeur
f25136a512 Avoid errors in delta backups. (fix)
- Wait the task end of vdi export.
- Now, in the error case of vdi backup,
  the current vdi snapshot is removed with catch(noop).
2016-01-20 17:19:41 +01:00
Julien Fontanet
03eb56ad2a Xapi#_updateObjectMapProperty(): do no hide remove errors. 2016-01-20 16:04:23 +01:00
23 changed files with 1239 additions and 665 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "xo-server",
"version": "4.12.1",
"version": "4.13.0-0",
"license": "AGPL-3.0",
"description": "Server part of Xen-Orchestra",
"keywords": [
@@ -34,6 +34,7 @@
"node": ">=0.12 <5"
},
"dependencies": {
"@marsaud/smb2-promise": "^0.1.0",
"app-conf": "^0.4.0",
"babel-runtime": "^5",
"base64url": "^1.0.5",
@@ -58,8 +59,9 @@
"get-stream": "^1.1.0",
"graceful-fs": "^4.1.2",
"hashy": "~0.4.2",
"helmet": "^1.1.0",
"highland": "^2.5.1",
"http-server-plus": "^0.5.1",
"http-server-plus": "^0.6.4",
"human-format": "^0.6.0",
"is-my-json-valid": "^2.12.2",
"jade": "^1.11.0",
@@ -85,9 +87,12 @@
"lodash.get": "^3.7.0",
"lodash.has": "^3.0.0",
"lodash.includes": "^3.1.1",
"lodash.invert": "^4.0.1",
"lodash.isarray": "^3.0.0",
"lodash.isboolean": "^3.0.2",
"lodash.isempty": "^3.0.0",
"lodash.isfunction": "^3.0.1",
"lodash.isinteger": "^4.0.0",
"lodash.isobject": "^3.0.0",
"lodash.isstring": "^3.0.0",
"lodash.keys": "^3.0.4",
@@ -95,6 +100,7 @@
"lodash.pick": "^3.0.0",
"lodash.sortby": "^3.1.4",
"lodash.startswith": "^3.0.1",
"lodash.trim": "^3.0.1",
"loud-rejection": "^1.2.0",
"make-error": "^1",
"micromatch": "^2.3.2",
@@ -105,16 +111,21 @@
"partial-stream": "0.0.0",
"passport": "^0.3.0",
"passport-local": "^1.0.0",
"promise-toolbox": "^0.1.0",
"proxy-http-request": "0.1.0",
"redis": "^2.0.1",
"schema-inspector": "^1.5.1",
"semver": "^5.1.0",
"serve-static": "^1.9.2",
"stack-chain": "^1.3.3",
"through2": "^2.0.0",
"trace": "^2.0.1",
"ws": "~0.8.0",
"xen-api": "^0.7.2",
"xml2js": "~0.4.6",
"xo-collection": "^0.4.0"
"xo-acl-resolver": "0.0.0-0",
"xo-collection": "^0.4.0",
"xo-remote-parser": "^0.1.0"
},
"devDependencies": {
"babel-eslint": "^4.0.10",
@@ -129,7 +140,6 @@
"leche": "^2.1.1",
"mocha": "^2.2.1",
"must": "^0.13.1",
"node-inspector": "^0.12.2",
"sinon": "^1.14.1",
"standard": "^5.2.1"
},

View File

@@ -66,6 +66,8 @@ http:
#socket: './http.sock'
# Basic HTTPS.
#
# You can find the list of possible options there https://nodejs.org/docs/latest/api/tls.html#tls.createServer
# -
# # The only difference is the presence of the certificate and the
# # key.
@@ -83,7 +85,7 @@ http:
# # certificate authority up to the root.
# #
# # Default: undefined
# certificate: './certificate.pem'
# cert: './certificate.pem'
# # File containing the private key (PEM format).
# #
@@ -93,6 +95,10 @@ http:
# # Default: undefined
# key: './key.pem'
# If set to true, all HTTP traffic will be redirected to the first
# HTTPs configuration.
#redirectToHttps: true
# List of files/directories which will be served.
mounts:
#'/': '/path/to/xo-web/dist/'

View File

@@ -1,123 +0,0 @@
// These global variables are not a problem because the algorithm is
// synchronous.
let permissionsByObject
let getObject
// -------------------------------------------------------------------
const authorized = () => true // eslint-disable-line no-unused-vars
const forbiddden = () => false // eslint-disable-line no-unused-vars
function and (...checkers) { // eslint-disable-line no-unused-vars
return function (object, permission) {
for (const checker of checkers) {
if (!checker(object, permission)) {
return false
}
}
return true
}
}
function or (...checkers) { // eslint-disable-line no-unused-vars
return function (object, permission) {
for (const checker of checkers) {
if (checker(object, permission)) {
return true
}
}
return false
}
}
// -------------------------------------------------------------------
function checkMember (memberName) {
return function (object, permission) {
const member = object[memberName]
return checkAuthorization(member, permission)
}
}
function checkSelf ({ id }, permission) {
const permissionsForObject = permissionsByObject[id]
return (
permissionsForObject &&
permissionsForObject[permission]
)
}
// ===================================================================
const checkAuthorizationByTypes = {
host: or(checkSelf, checkMember('$pool')),
message: checkMember('$object'),
network: or(checkSelf, checkMember('$pool')),
SR: or(checkSelf, checkMember('$pool')),
task: checkMember('$host'),
VBD: checkMember('VDI'),
// Access to a VDI is granted if the user has access to the
// containing SR or to a linked VM.
VDI (vdi, permission) {
// Check authorization for the containing SR.
if (checkAuthorization(vdi.$SR, permission)) {
return true
}
// Check authorization for each of the connected VMs.
for (const { VM: vm } of vdi.$VBDs) {
if (checkAuthorization(vm, permission)) {
return true
}
}
return false
},
VIF: or(checkMember('$network'), checkMember('$VM')),
VM: or(checkSelf, checkMember('$container')),
'VM-snapshot': checkMember('$snapshot_of'),
'VM-template': authorized
}
function checkAuthorization (objectId, permission) {
const object = getObject(objectId)
const checker = checkAuthorizationByTypes[object.type] || checkSelf
return checker(object, permission)
}
// -------------------------------------------------------------------
export default function (
permissionsByObject_,
getObject_,
permissions
) {
// Assign global variables.
permissionsByObject = permissionsByObject_
getObject = getObject_
try {
for (const [objectId, permission] of permissions) {
if (!checkAuthorization(objectId, permission)) {
return false
}
}
return true
} finally {
// Free the global variables.
permissionsByObject = getObject = null
}
}

View File

@@ -36,5 +36,5 @@ resize.params = {
}
resize.resolve = {
vdi: ['id', 'VDI', 'administrate']
vdi: ['id', ['VDI', 'VDI-snapshot'], 'administrate']
}

View File

@@ -16,3 +16,13 @@ export async function get ({namespace}) {
}
get.description = 'returns logs list for one namespace'
function delete_ ({namespace, id}) {
const logger = this.getLogger(namespace)
logger.del(id)
}
delete_.description = 'deletes on or several logs from a namespace'
delete_.permission = 'admin'
export {delete_ as delete}

View File

@@ -5,7 +5,7 @@ export async function getAll () {
getAll.permission = 'admin'
getAll.description = 'Gets all existing fs remote points'
export async function get (id) {
export async function get ({id}) {
return await this.getRemote(id)
}
@@ -15,7 +15,7 @@ get.params = {
id: {type: 'string'}
}
export async function list (id) {
export async function list ({id}) {
return await this.listRemoteBackups(id)
}

View File

@@ -20,7 +20,7 @@ delete_.params = {
}
delete_.resolve = {
vdi: ['id', 'VDI', 'administrate'],
vdi: ['id', ['VDI', 'VDI-snapshot'], 'administrate'],
}
exports.delete = delete_
@@ -69,7 +69,7 @@ set.params = {
}
set.resolve = {
vdi: ['id', 'VDI', 'administrate'],
vdi: ['id', ['VDI', 'VDI-snapshot'], 'administrate'],
}
exports.set = set
@@ -89,7 +89,7 @@ migrate.params = {
}
migrate.resolve = {
vdi: ['id', 'VDI', 'administrate'],
vdi: ['id', ['VDI', 'VDI-snapshot'], 'administrate'],
sr: ['sr_id', 'SR', 'administrate'],
}

View File

@@ -191,69 +191,70 @@ exports.insertCd = insertCd
#---------------------------------------------------------------------
migrate = $coroutine ({vm, host}) ->
yield @getXapi(vm).migrateVm(vm._xapiId, @getXapi(host), host._xapiId)
return
migrate.params = {
# Identifier of the VM to migrate.
id: { type: 'string' }
# Identifier of the host to migrate to.
host_id: { type: 'string' }
}
migrate.resolve = {
vm: ['id', 'VM']
host: ['host_id', 'host', 'administrate']
}
exports.migrate = migrate
#---------------------------------------------------------------------
migratePool = $coroutine ({
migrate = $coroutine ({
vm,
host
sr
network
host,
mapVdisSrs,
mapVifsNetworks,
migrationNetwork
}) ->
permissions = []
if mapVdisSrs
mapVdisSrsXapi = {}
forEach mapVdisSrs, (srId, vdiId) =>
vdiXapiId = @getObject(vdiId, 'VDI')._xapiId
mapVdisSrsXapi[vdiXapiId] = @getObject(srId, 'SR')._xapiId
permissions.push([
srId,
'administrate'
])
if mapVifsNetworks
mapVifsNetworksXapi = {}
forEach mapVifsNetworks, (networkId, vifId) =>
vifXapiId = @getObject(vifId, 'VIF')._xapiId
mapVifsNetworksXapi[vifXapiId] = @getObject(networkId, 'network')._xapiId
permissions.push([
networkId,
'administrate'
])
unless yield @hasPermissions(@session.get('user_id'), permissions)
throw new Unauthorized()
yield @getXapi(vm).migrateVm(vm._xapiId, @getXapi(host), host._xapiId, {
migrationNetworkId: migrationNetwork?._xapiId
networkId: network?._xapiId,
srId: sr?._xapiId,
mapVifsNetworks: mapVifsNetworksXapi,
mapVdisSrs: mapVdisSrsXapi,
})
return
migratePool.params = {
migrate.params = {
# Identifier of the VM to migrate.
id: { type: 'string' }
vm: { type: 'string' }
# Identifier of the host to migrate to.
target_host_id: { type: 'string' }
targetHost: { type: 'string' }
# Identifier of the target SR
target_sr_id: { type: 'string', optional: true }
# Map VDIs IDs --> SRs IDs
mapVdisSrs: { type: 'object', optional: true }
# Identifier of the target Network
target_network_id: { type: 'string', optional: true }
# Map VIFs IDs --> Networks IDs
mapVifsNetworks: { type: 'object', optional: true }
# Identifier of the Network use for the migration
migration_network_id: { type: 'string', optional: true }
migrationNetwork: { type: 'string', optional: true }
}
migratePool.resolve = {
vm: ['id', 'VM', 'administrate'],
host: ['target_host_id', 'host', 'administrate'],
sr: ['target_sr_id', 'SR', 'administrate'],
network: ['target_network_id', 'network', 'administrate'],
migrationNetwork: ['migration_network_id', 'network', 'administrate'],
migrate.resolve = {
vm: ['vm', 'VM', 'administrate'],
host: ['targetHost', 'host', 'administrate'],
migrationNetwork: ['migrationNetwork', 'network', 'administrate'],
}
# TODO: camel case.
exports.migrate_pool = migratePool
exports.migrate = migrate
#---------------------------------------------------------------------
@@ -323,6 +324,9 @@ set = $coroutine (params) ->
else
yield xapi.call 'VM.remove_from_other_config', ref, 'auto_poweron'
if 'cpuWeight' of params
yield xapi.setVcpuWeight(VM._xapiId, params.cpuWeight)
# Other fields.
for param, fields of {
'name_label'
@@ -360,6 +364,8 @@ set.params = {
# Kernel arguments for PV VM.
PV_args: { type: 'string', optional: true }
cpuWeight: { type: 'integer', optional: true}
}
set.resolve = {
@@ -572,12 +578,13 @@ exports.rollingSnapshot = rollingSnapshot
#---------------------------------------------------------------------
backup = $coroutine ({vm, pathToFile, compress, onlyMetadata}) ->
yield @backupVm({vm, pathToFile, compress, onlyMetadata})
backup = $coroutine ({vm, remoteId, file, compress, onlyMetadata}) ->
yield @backupVm({vm, remoteId, file, compress, onlyMetadata})
backup.params = {
id: { type: 'string' }
pathToFile: { type: 'string' }
id: {type: 'string'}
remoteId: { type: 'string' }
file: { type: 'string' }
compress: { type: 'boolean', optional: true }
onlyMetadata: { type: 'boolean', optional: true }
}
@@ -615,14 +622,9 @@ exports.importBackup = importBackup
#---------------------------------------------------------------------
rollingBackup = $coroutine ({vm, remoteId, tag, depth, compress, onlyMetadata}) ->
remote = yield @getRemote remoteId
if not remote?.path?
throw new Error "No such Remote #{remoteId}"
if not remote.enabled
throw new Error "Backup remote #{remoteId} is disabled"
return yield @rollingBackupVm({
vm,
path: remote.path,
remoteId,
tag,
depth,
compress,

View File

@@ -7,6 +7,7 @@ import blocked from 'blocked'
import createExpress from 'express'
import eventToPromise from 'event-to-promise'
import has from 'lodash.has'
import helmet from 'helmet'
import includes from 'lodash.includes'
import isArray from 'lodash.isarray'
import isFunction from 'lodash.isfunction'
@@ -90,6 +91,8 @@ async function loadConfiguration () {
function createExpressApp () {
const app = createExpress()
app.use(helmet())
// Registers the cookie-parser and express-session middlewares,
// necessary for connect-flash.
app.use(cookieParser())
@@ -276,12 +279,18 @@ async function registerPlugins (xo) {
// ===================================================================
async function makeWebServerListen (opts) {
// Read certificate and key if necessary.
const {certificate, key} = opts
if (certificate && key) {
[opts.certificate, opts.key] = await Promise.all([
readFile(certificate),
async function makeWebServerListen ({
certificate,
// The properties was called `certificate` before.
cert = certificate,
key,
...opts
}) {
if (cert && key) {
[opts.cert, opts.key] = await Promise.all([
readFile(cert),
readFile(key)
])
}
@@ -607,6 +616,31 @@ export default async function main (args) {
// Express is used to manage non WebSocket connections.
const express = createExpressApp()
if (config.http.redirectToHttps) {
let port
forEach(config.http.listen, listen => {
if (
listen.port &&
(listen.cert || listen.certificate)
) {
port = listen.port
return false
}
})
if (port === undefined) {
warn('Could not setup HTTPs redirection: no HTTPs port found')
} else {
express.use((req, res, next) => {
if (req.secure) {
return next()
}
res.redirect(`https://${req.hostname}:${port}${req.originalUrl}`)
})
}
}
// Must be set up before the API.
setUpConsoleProxy(webServer, xo)

View File

@@ -1,4 +1,5 @@
import highland from 'highland'
import { forEach, noop } from '../utils'
// See: https://en.wikipedia.org/wiki/Syslog#Severity_level
const LEVELS = [
@@ -46,6 +47,19 @@ export default class LevelDbLogger {
return highland(this._db.createReadStream())
.filter(({value}) => value.namespace === this._namespace)
}
del (id) {
if (!Array.isArray(id)) {
id = [id]
}
forEach(id, id => {
this._db.get(id, (err, value) => {
if (!err && value.namespace === this._namespace) {
this._db.del(id, noop)
}
})
})
}
}
// Create high level log methods.

View File

@@ -1,6 +1,8 @@
import Collection from '../collection/redis'
import Model from '../model'
import { forEach } from '../utils'
import {
forEach
} from '../utils'
// ===================================================================

View File

@@ -1,140 +0,0 @@
import filter from 'lodash.filter'
import fs from 'fs-promise'
import {exec} from 'child_process'
import {
forEach,
noop,
promisify
} from './utils'
const execAsync = promisify(exec)
class NfsMounter {
async _loadRealMounts () {
let stdout
try {
[stdout] = await execAsync('findmnt -P -t nfs,nfs4 --output SOURCE,TARGET --noheadings')
} catch (exc) {
// When no mounts are found, the call pretends to fail...
}
const mounted = {}
if (stdout) {
const regex = /^SOURCE="([^:]*):(.*)" TARGET="(.*)"$/
forEach(stdout.split('\n'), m => {
if (m) {
const match = regex.exec(m)
mounted[match[3]] = {
host: match[1],
share: match[2]
}
}
})
}
this._realMounts = mounted
return mounted
}
_fullPath (path) {
return path
}
_matchesRealMount (mount) {
return this._fullPath(mount.path) in this._realMounts
}
async _mount (mount) {
const path = this._fullPath(mount.path)
await fs.ensureDir(path)
return await execAsync(`mount -t nfs ${mount.host}:${mount.share} ${path}`)
}
async forget (mount) {
try {
await this._umount(mount)
} catch (_) {
// We have to go on...
}
}
async _umount (mount) {
const path = this._fullPath(mount.path)
await execAsync(`umount ${path}`)
}
async sync (mount) {
await this._loadRealMounts()
if (this._matchesRealMount(mount) && !mount.enabled) {
try {
await this._umount(mount)
} catch (exc) {
mount.enabled = true
mount.error = exc.message
}
} else if (!this._matchesRealMount(mount) && mount.enabled) {
try {
await this._mount(mount)
} catch (exc) {
mount.enabled = false
mount.error = exc.message
}
}
return mount
}
async disableAll (mounts) {
await this._loadRealMounts()
forEach(mounts, async mount => {
if (this._matchesRealMount(mount)) {
try {
await this._umount(mount)
} catch (_) {
// We have to go on...
}
}
})
}
}
class LocalHandler {
constructor () {
this.forget = noop
this.disableAll = noop
}
async sync (local) {
if (local.enabled) {
try {
await fs.ensureDir(local.path)
await fs.access(local.path, fs.R_OK | fs.W_OK)
} catch (exc) {
local.enabled = false
local.error = exc.message
}
}
return local
}
}
export default class RemoteHandler {
constructor () {
this.handlers = {
nfs: new NfsMounter(),
local: new LocalHandler()
}
}
async sync (remote) {
return await this.handlers[remote.type].sync(remote)
}
async forget (remote) {
return await this.handlers[remote.type].forget(remote)
}
async disableAll (remotes) {
const promises = []
forEach(['local', 'nfs'], type => promises.push(this.handlers[type].disableAll(filter(remotes, remote => remote.type === type))))
await Promise.all(promises)
}
}

View File

@@ -0,0 +1,177 @@
import eventToPromise from 'event-to-promise'
import getStream from 'get-stream'
import through2 from 'through2'
import {
parse
} from 'xo-remote-parser'
import {
addChecksumToReadStream,
noop,
validChecksumOfReadStream
} from '../utils'
export default class RemoteHandlerAbstract {
constructor (remote) {
this._remote = parse({...remote})
if (this._remote.type !== this.type) {
throw new Error('Incorrect remote type')
}
}
get type () {
throw new Error('Not implemented')
}
/**
* Asks the handler to sync the state of the effective remote with its' metadata
*/
async sync () {
return this._sync()
}
async _sync () {
throw new Error('Not implemented')
}
/**
* Free the resources possibly dedicated to put the remote at work, when it is no more needed
*/
async forget () {
return this._forget()
}
async _forget () {
throw new Error('Not implemented')
}
async outputFile (file, data, options) {
return this._outputFile(file, data, options)
}
async _outputFile (file, data, options) {
const stream = await this.createOutputStream(file)
const promise = eventToPromise(stream, 'finish')
stream.end(data)
return promise
}
async readFile (file, options) {
return this._readFile(file, options)
}
async _readFile (file, options) {
return getStream(await this.createReadStream(file, options))
}
async rename (oldPath, newPath) {
return this._rename(oldPath, newPath)
}
async _rename (oldPath, newPath) {
throw new Error('Not implemented')
}
async list (dir = '.') {
return this._list(dir)
}
async _list (dir) {
throw new Error('Not implemented')
}
async createReadStream (file, {
checksum = false,
ignoreMissingChecksum = false,
...options
} = {}) {
const streamP = this._createReadStream(file, options).then(async stream => {
await eventToPromise(stream, 'readable')
if (stream.length === undefined) {
stream.length = await this.getSize(file).catch(noop)
}
return stream
})
if (!checksum) {
return streamP
}
try {
checksum = await this.readFile(`${file}.checksum`)
} catch (error) {
if (error.code === 'ENOENT' && ignoreMissingChecksum) {
return streamP
}
throw error
}
let stream = await streamP
const { length } = stream
stream = validChecksumOfReadStream(stream, checksum.toString())
stream.length = length
return stream
}
async _createReadStream (file, options) {
throw new Error('Not implemented')
}
async createOutputStream (file, {
checksum = false,
...options
} = {}) {
const streamP = this._createOutputStream(file, options)
if (!checksum) {
return streamP
}
const connectorStream = through2()
const forwardError = error => {
connectorStream.emit('error', error)
}
const streamWithChecksum = addChecksumToReadStream(connectorStream)
streamWithChecksum.pipe(await streamP)
streamWithChecksum.on('error', forwardError)
streamWithChecksum.checksum
.then(value => this.outputFile(`${file}.checksum`, value))
.catch(forwardError)
return connectorStream
}
async _createOutputStream (file, options) {
throw new Error('Not implemented')
}
async unlink (file, {
checksum = false
} = {}) {
if (checksum) {
this._unlink(`${file}.checksum`).catch(noop)
}
return this._unlink(file)
}
async _unlink (file) {
throw new Error('Not implemented')
}
async getSize (file) {
return this._getSize(file)
}
async _getSize (file) {
throw new Error('Not implemented')
}
}

View File

@@ -0,0 +1,84 @@
import fs from 'fs-promise'
import startsWith from 'lodash.startswith'
import {
dirname,
resolve
} from 'path'
import RemoteHandlerAbstract from './abstract'
import {
noop
} from '../utils'
export default class LocalHandler extends RemoteHandlerAbstract {
get type () {
return 'local'
}
_getFilePath (file) {
const parts = [this._remote.path]
if (file) {
parts.push(file)
}
const path = resolve.apply(null, parts)
if (!startsWith(path, this._remote.path)) {
throw new Error('Remote path is unavailable')
}
return path
}
async _sync () {
if (this._remote.enabled) {
try {
await fs.ensureDir(this._remote.path)
await fs.access(this._remote.path, fs.R_OK | fs.W_OK)
} catch (exc) {
this._remote.enabled = false
this._remote.error = exc.message
}
}
return this._remote
}
async _forget () {
return noop()
}
async _outputFile (file, data, options) {
const path = this._getFilePath(file)
await fs.ensureDir(dirname(path))
await fs.writeFile(this._getFilePath(file), data, options)
}
async _readFile (file, options) {
return fs.readFile(this._getFilePath(file), options)
}
async _rename (oldPath, newPath) {
return fs.rename(this._getFilePath(oldPath), this._getFilePath(newPath))
}
async _list (dir = '.') {
return fs.readdir(this._getFilePath(dir))
}
async _createReadStream (file, options) {
return fs.createReadStream(this._getFilePath(file), options)
}
async _createOutputStream (file, options) {
const path = this._getFilePath(file)
await fs.ensureDir(dirname(path))
return fs.createWriteStream(path, options)
}
async _unlink (file) {
return fs.unlink(this._getFilePath(file))
}
async _getSize (file) {
const stats = await fs.stat(this._getFilePath(file))
return stats.size
}
}

View File

@@ -0,0 +1,77 @@
import execa from 'execa'
import fs from 'fs-promise'
import LocalHandler from './local'
import {
forEach
} from '../utils'
export default class NfsHandler extends LocalHandler {
get type () {
return 'nfs'
}
async _loadRealMounts () {
let stdout
const mounted = {}
try {
[stdout] = await execa('findmnt', ['-P', '-t', 'nfs,nfs4', '--output', 'SOURCE,TARGET', '--noheadings'])
const regex = /^SOURCE="([^:]*):(.*)" TARGET="(.*)"$/
forEach(stdout.split('\n'), m => {
if (m) {
const match = regex.exec(m)
mounted[match[3]] = {
host: match[1],
share: match[2]
}
}
})
} catch (exc) {
// When no mounts are found, the call pretends to fail...
}
this._realMounts = mounted
return mounted
}
_matchesRealMount (remote) {
return remote.path in this._realMounts
}
async _mount (remote) {
await fs.ensureDir(remote.path)
return execa('mount', ['-t', 'nfs', `${remote.host}:${remote.share}`, remote.path])
}
async _sync () {
await this._loadRealMounts()
if (this._matchesRealMount(this._remote) && !this._remote.enabled) {
try {
await this._umount(this._remote)
} catch (exc) {
this._remote.enabled = true
this._remote.error = exc.message
}
} else if (!this._matchesRealMount(this._remote) && this._remote.enabled) {
try {
await this._mount(this._remote)
} catch (exc) {
this._remote.enabled = false
this._remote.error = exc.message
}
}
return this._remote
}
async _forget () {
try {
await this._umount(this._remote)
} catch (_) {
// We have to go on...
}
}
async _umount (remote) {
await execa('umount', [remote.path])
}
}

144
src/remote-handlers/smb.js Normal file
View File

@@ -0,0 +1,144 @@
import Smb2 from '@marsaud/smb2-promise'
import RemoteHandlerAbstract from './abstract'
import {
noop
} from '../utils'
export default class SmbHandler extends RemoteHandlerAbstract {
constructor (remote) {
super(remote)
this._forget = noop
}
get type () {
return 'smb'
}
_getClient (remote) {
return new Smb2({
share: `\\\\${remote.host}`,
domain: remote.domain,
username: remote.username,
password: remote.password,
autoCloseTimeout: 0
})
}
_getFilePath (file) {
if (file === '.') {
file = undefined
}
const parts = []
if (this._remote.path !== '') {
parts.push(this._remote.path)
}
if (file) {
parts.push(file.split('/'))
}
return parts.join('\\')
}
_dirname (file) {
const parts = file.split('\\')
parts.pop()
return parts.join('\\')
}
async _sync () {
if (this._remote.enabled) {
try {
// Check access (smb2 does not expose connect in public so far...)
await this.list()
} catch (error) {
this._remote.enabled = false
this._remote.error = error.message
}
}
return this._remote
}
async _outputFile (file, data, options) {
const client = this._getClient(this._remote)
const path = this._getFilePath(file)
const dir = this._dirname(path)
try {
if (dir) {
await client.ensureDir(dir)
}
return client.writeFile(path, data, options)
} finally {
client.close()
}
}
async _readFile (file, options) {
const client = this._getClient(this._remote)
try {
return client.readFile(this._getFilePath(file), options)
} finally {
client.close()
}
}
async _rename (oldPath, newPath) {
const client = this._getClient(this._remote)
try {
return client.rename(this._getFilePath(oldPath), this._getFilePath(newPath))
} finally {
client.close()
}
}
async _list (dir = '.') {
const client = this._getClient(this._remote)
try {
return client.readdir(this._getFilePath(dir))
} finally {
client.close()
}
}
async _createReadStream (file, options) {
const client = this._getClient(this._remote)
const stream = await client.createReadStream(this._getFilePath(file), options) // FIXME ensure that options are properly handled by @marsaud/smb2
stream.on('end', () => client.close())
return stream
}
async _createOutputStream (file, options) {
const client = this._getClient(this._remote)
const path = this._getFilePath(file)
const dir = this._dirname(path)
let stream
try {
if (dir) {
await client.ensureDir(dir)
}
stream = await client.createWriteStream(path, options) // FIXME ensure that options are properly handled by @marsaud/smb2
} catch (err) {
client.close()
throw err
}
stream.on('finish', () => client.close())
return stream
}
async _unlink (file) {
const client = this._getClient(this._remote)
try {
return client.unlink(this._getFilePath(file))
} finally {
client.close()
}
}
async _getSize (file) {
const client = await this._getClient(this._remote)
try {
return client.getSize(this._getFilePath(file))
} finally {
client.close()
}
}
}

View File

@@ -1,15 +1,22 @@
import base64url from 'base64url'
import eventToPromise from 'event-to-promise'
import forEach from 'lodash.foreach'
import has from 'lodash.has'
import humanFormat from 'human-format'
import invert from 'lodash.invert'
import isArray from 'lodash.isarray'
import isString from 'lodash.isstring'
import kindOf from 'kindof'
import multiKeyHashInt from 'multikey-hash'
import xml2js from 'xml2js'
import { defer } from 'promise-toolbox'
import {promisify} from 'bluebird'
import {randomBytes} from 'crypto'
import {
createHash,
randomBytes
} from 'crypto'
import { Readable } from 'stream'
import through2 from 'through2'
import {utcFormat as d3TimeFormat} from 'd3-time-format'
// ===================================================================
@@ -50,6 +57,90 @@ export const createRawObject = Object.create
// -------------------------------------------------------------------
const ALGORITHM_TO_ID = {
md5: '1',
sha256: '5',
sha512: '6'
}
const ID_TO_ALGORITHM = invert(ALGORITHM_TO_ID)
// Wrap a readable stream in a stream with a checksum promise
// attribute which is resolved at the end of an input stream.
// (Finally .checksum contains the checksum of the input stream)
//
// Example:
// const sourceStream = ...
// const targetStream = ...
// const checksumStream = addChecksumToReadStream(sourceStream)
// await Promise.all([
// eventToPromise(checksumStream.pipe(targetStream), 'finish'),
// checksumStream.checksum.then(console.log)
// ])
export const addChecksumToReadStream = (stream, algorithm = 'md5') => {
const algorithmId = ALGORITHM_TO_ID[algorithm]
if (!algorithmId) {
throw new Error(`unknown algorithm: ${algorithm}`)
}
const hash = createHash(algorithm)
const { promise, resolve } = defer()
const wrapper = stream.pipe(through2(
(chunk, enc, callback) => {
hash.update(chunk)
callback(null, chunk)
},
callback => {
resolve(hash.digest('hex'))
callback()
}
))
stream.on('error', error => wrapper.emit('error', error))
wrapper.checksum = promise.then(hash => `$${algorithmId}$$${hash}`)
return wrapper
}
// Check if the checksum of a readable stream is equals to an expected checksum.
// The given stream is wrapped in a stream which emits an error event
// if the computed checksum is not equals to the expected checksum.
export const validChecksumOfReadStream = (stream, expectedChecksum) => {
const algorithmId = expectedChecksum.slice(1, expectedChecksum.indexOf('$', 1))
if (!algorithmId) {
throw new Error(`unknown algorithm: ${algorithmId}`)
}
const hash = createHash(ID_TO_ALGORITHM[algorithmId])
const wrapper = stream.pipe(through2(
{ highWaterMark: 0 },
(chunk, enc, callback) => {
hash.update(chunk)
callback(null, chunk)
},
callback => {
const checksum = `$${algorithmId}$$${hash.digest('hex')}`
callback(
checksum !== expectedChecksum
? new Error(`Bad checksum (${checksum}), expected: ${expectedChecksum}`)
: null
)
}
))
stream.on('error', error => wrapper.emit('error', error))
wrapper.checksumVerified = eventToPromise(wrapper, 'end')
return wrapper
}
// -------------------------------------------------------------------
// Ensure the value is an array, wrap it if necessary.
export function ensureArray (value) {
if (value === undefined) {
@@ -289,6 +380,19 @@ export function parseSize (size) {
// -------------------------------------------------------------------
const _has = Object.prototype.hasOwnProperty
// Removes an own property from an object and returns its value.
export const popProperty = obj => {
for (const prop in obj) {
if (_has.call(obj, prop)) {
return extractProperty(obj, prop)
}
}
}
// -------------------------------------------------------------------
// Format a date in ISO 8601 in a safe way to be used in filenames
// (even on Windows).
export const safeDateFormat = d3TimeFormat('%Y%m%dT%H%M%SZ')

View File

@@ -1,4 +1,3 @@
import includes from 'lodash.includes'
import isArray from 'lodash.isarray'
import {
@@ -228,7 +227,6 @@ const TRANSFORMS = {
other: otherConfig,
os_version: guestMetrics && guestMetrics.os_version || null,
power_state: obj.power_state,
snapshot_time: toTimestamp(obj.snapshot_time),
snapshots: link(obj, 'snapshots'),
tags: obj.tags,
VIFs: link(obj, 'VIFs'),
@@ -277,6 +275,7 @@ const TRANSFORMS = {
} else if (obj.is_a_snapshot) {
vm.type += '-snapshot'
vm.snapshot_time = toTimestamp(obj.snapshot_time)
vm.$snapshot_of = link(obj, 'snapshot_of')
} else if (obj.is_a_template) {
vm.type += '-template'
@@ -307,8 +306,10 @@ const TRANSFORMS = {
})(),
install_repository: otherConfig['install-repository']
}
} else if (includes(obj.current_operations, 'migrate_send')) {
vm.id = obj.$ref
}
if (obj.VCPUs_params && obj.VCPUs_params.weight) {
vm.cpuWeight = obj.VCPUs_params.weight
}
if (!isHvm) {
@@ -325,9 +326,12 @@ const TRANSFORMS = {
type: 'SR',
content_type: obj.content_type,
// TODO: Should it replace usage?
physical_usage: +obj.physical_utilisation,
name_description: obj.name_description,
name_label: obj.name_label,
physical_usage: +obj.physical_utilisation,
size: +obj.physical_size,
SR_type: obj.type,
tags: obj.tags,
@@ -384,27 +388,32 @@ const TRANSFORMS = {
// -----------------------------------------------------------------
// TODO: should we have a VDI-snapshot type like we have with VMs?
vdi (obj) {
if (!obj.managed) {
return
}
return {
const vdi = {
type: 'VDI',
name_description: obj.name_description,
name_label: obj.name_label,
size: +obj.virtual_size,
snapshots: link(obj, 'snapshots'),
snapshot_time: toTimestamp(obj.snapshot_time),
tags: obj.tags,
usage: +obj.physical_utilisation,
$snapshot_of: link(obj, 'snapshot_of'),
$SR: link(obj, 'SR'),
$VBDs: link(obj, 'VBDs')
}
if (obj.is_a_snapshot) {
vdi.type += '-snapshot'
vdi.snapshot_time = toTimestamp(obj.snapshot_time)
vdi.$snapshot_of = link(obj, 'snapshot_of')
}
return vdi
},
// -----------------------------------------------------------------

View File

@@ -3,8 +3,12 @@ import every from 'lodash.every'
import fatfs from 'fatfs'
import fatfsBuffer, { init as fatfsBufferInit } from './fatfs-buffer'
import find from 'lodash.find'
import isBoolean from 'lodash.isboolean'
import includes from 'lodash.includes'
// import isFinite from 'lodash.isfinite'
import isFunction from 'lodash.isfunction'
import isInteger from 'lodash.isinteger'
import isObject from 'lodash.isobject'
import pick from 'lodash.pick'
import sortBy from 'lodash.sortby'
import unzip from 'julien-f-unzip'
@@ -19,6 +23,8 @@ import {
debounce,
deferrable
} from './decorators'
import { satisfies as versionSatisfies } from 'semver'
import {
bufferToStream,
camelToSnakeCase,
@@ -89,7 +95,7 @@ const put = (stream, {
})
}
return promise.readAll
return promise.readAll()
}
return makeRequest().readAll()
@@ -106,6 +112,12 @@ const asInteger = value => String(value)
const filterUndefineds = obj => pick(obj, value => value !== undefined)
const prepareXapiParam = param => {
// if (isFinite(param) && !isInteger(param)) { return asFloat(param) }
if (isInteger(param)) { return asInteger(param) }
if (isBoolean(param)) { return asBoolean(param) }
if (isObject(param)) { return map(filterUndefineds(param), prepareXapiParam) }
}
// ===================================================================
const typeToNamespace = createRawObject()
@@ -352,12 +364,11 @@ export default class Xapi extends XapiBase {
await Promise.all(mapToArray(values, (value, name) => {
if (value !== undefined) {
name = camelToSnakeCase(name)
const removal = this.call(remove, ref, name).catch(noop)
const removal = this.call(remove, ref, name)
return value === null
? removal
: removal.catch(noop).then(() => this.call(add, ref, name, value))
: removal.catch(noop).then(() => this.call(add, ref, name, prepareXapiParam(value)))
}
}))
}
@@ -1259,9 +1270,16 @@ export default class Xapi extends XapiBase {
// Create a snapshot of the VM and returns a delta export object.
@deferrable.onFailure
async exportDeltaVm ($onFailure, vmId, baseVmId = undefined) {
async exportDeltaVm ($onFailure, vmId, baseVmId = undefined, {
snapshotNameLabel = undefined
} = {}) {
const vm = await this.snapshotVm(vmId)
$onFailure(() => this._deleteVm(vm, true))
if (snapshotNameLabel) {
this._setObjectProperties(vm, {
nameLabel: snapshotNameLabel
}).catch(noop)
}
const baseVm = baseVmId && this.getObject(baseVmId)
@@ -1322,6 +1340,7 @@ export default class Xapi extends XapiBase {
// TODO: make non-enumerable?
streams: await streams::pAll(),
version: '1.0.0',
vbds,
vdis,
vifs,
@@ -1341,8 +1360,15 @@ export default class Xapi extends XapiBase {
async importDeltaVm ($onFailure, delta, {
deleteBase = false,
name_label = delta.vm.name_label,
srId = this.pool.default_SR
srId = this.pool.default_SR,
disableStartAfterImport = true
} = {}) {
const { version } = delta
if (!versionSatisfies(version, '^1')) {
throw new Error(`Unsupported delta backup version: ${version}`)
}
const remoteBaseVmUuid = delta.vm.other_config[TAG_BASE_DELTA]
let baseVm
if (remoteBaseVmUuid) {
@@ -1447,7 +1473,11 @@ export default class Xapi extends XapiBase {
// Import VDI contents.
Promise.all(mapToArray(
newVdis,
(vdi, id) => this._importVdiContent(vdi, streams[`${id}.vhd`], VDI_FORMAT_VHD)
async (vdi, id) => {
for (const stream of ensureArray(streams[`${id}.vhd`])) {
await this._importVdiContent(vdi, stream, VDI_FORMAT_VHD)
}
}
)),
// Wait for VDI export tasks (if any) termination.
@@ -1472,23 +1502,42 @@ export default class Xapi extends XapiBase {
this._setObjectProperties(vm, {
name_label
}),
// FIXME: move
this._updateObjectMapProperty(vm, 'blocked_operations', {
start: 'Do not start this VM, clone it if you want to use it.' // FIXME: move
start: disableStartAfterImport
? 'Do not start this VM, clone it if you want to use it.'
: null
})
])
return vm
}
async _migrateVMWithStorageMotion (vm, hostXapi, host, {
async _migrateVmWithStorageMotion (vm, hostXapi, host, {
migrationNetwork = find(host.$PIFs, pif => pif.management).$network, // TODO: handle not found
sr = host.$pool.$default_SR, // TODO: handle not found
vifsMap = {}
mapVdisSrs,
mapVifsNetworks
}) {
// VDIs/SRs mapping
const vdis = {}
const defaultSrRef = host.$pool.$default_SR.$ref
for (const vbd of vm.$VBDs) {
if (vbd.type !== 'CD') {
vdis[vbd.$VDI.$ref] = sr.$ref
const vdi = vbd.$VDI
if (vbd.type === 'Disk') {
vdis[vdi.$ref] = mapVdisSrs && mapVdisSrs[vdi.$id]
? hostXapi.getObject(mapVdisSrs[vdi.$id]).$ref
: defaultSrRef
}
}
// VIFs/Networks mapping
let vifsMap = {}
if (vm.$pool !== host.$pool) {
const defaultNetworkRef = find(host.$PIFs, pif => pif.management).$network.$ref
for (const vif of vm.$VIFs) {
vifsMap[vif.$ref] = mapVifsNetworks && mapVifsNetworks[vif.$id]
? hostXapi.getObject(mapVifsNetworks[vif.$id]).$ref
: defaultNetworkRef
}
}
@@ -1572,8 +1621,8 @@ export default class Xapi extends XapiBase {
async migrateVm (vmId, hostXapi, hostId, {
migrationNetworkId,
networkId,
srId
mapVifsNetworks,
mapVdisSrs
} = {}) {
const vm = this.getObject(vmId)
if (!isVmRunning(vm)) {
@@ -1586,25 +1635,15 @@ export default class Xapi extends XapiBase {
const useStorageMotion = (
accrossPools ||
migrationNetworkId ||
networkId ||
srId
mapVifsNetworks ||
mapVdisSrs
)
if (useStorageMotion) {
const vifsMap = {}
if (accrossPools || networkId) {
const {$ref: networkRef} = networkId
? this.getObject(networkId)
: find(host.$PIFs, pif => pif.management).$network
for (const vif of vm.$VIFs) {
vifsMap[vif.$ref] = networkRef
}
}
await this._migrateVMWithStorageMotion(vm, hostXapi, host, {
migrationNetwork: migrationNetworkId && this.getObject(migrationNetworkId),
sr: srId && this.getObject(srId),
vifsMap
await this._migrateVmWithStorageMotion(vm, hostXapi, host, {
migrationNetwork: migrationNetworkId && hostXapi.getObject(migrationNetworkId),
mapVdisSrs,
mapVifsNetworks
})
} else {
try {
@@ -1615,7 +1654,7 @@ export default class Xapi extends XapiBase {
}
// Retry using motion storage.
await this._migrateVMWithStorageMotion(vm, hostXapi, host, {})
await this._migrateVmWithStorageMotion(vm, hostXapi, host, {})
}
}
}
@@ -1629,6 +1668,12 @@ export default class Xapi extends XapiBase {
)
}
async setVcpuWeight (vmId, weight) {
weight = weight || null // Take all falsy values as a removal (0 included)
const vm = this.getObject(vmId)
await this._updateObjectMapProperty(vm, 'VCPUs_params', {weight})
}
_startVm (vm) {
debug(`Starting VM ${vm.name_label}`)
@@ -2070,6 +2115,7 @@ export default class Xapi extends XapiBase {
const task = this._watchTask(taskRef)
await Promise.all([
stream.checksumVerified,
task,
put(stream, {
hostname: host.address,

View File

@@ -6,21 +6,11 @@ import filter from 'lodash.filter'
import findIndex from 'lodash.findindex'
import sortBy from 'lodash.sortby'
import startsWith from 'lodash.startswith'
import {
createReadStream,
createWriteStream,
ensureDir,
readdir,
readFile,
rename,
stat,
unlink,
writeFile
} from 'fs-promise'
import {
basename,
dirname
} from 'path'
import { satisfies as versionSatisfies } from 'semver'
import xapiObjectToXo from '../xapi-object-to-xo'
import {
@@ -39,6 +29,9 @@ import {
// ===================================================================
const DELTA_BACKUP_EXT = '.json'
const DELTA_BACKUP_EXT_LENGTH = DELTA_BACKUP_EXT.length
// Test if a file is a vdi backup. (full or delta)
const isVdiBackup = name => /^\d+T\d+Z_(?:full|delta)\.vhd$/.test(name)
@@ -48,7 +41,27 @@ const isDeltaVdiBackup = name => /^\d+T\d+Z_delta\.vhd$/.test(name)
// Get the timestamp of a vdi backup. (full or delta)
const getVdiTimestamp = name => {
const arr = /^(\d+T\d+Z)_(?:full|delta)\.vhd$/.exec(name)
return arr[1] || undefined
return arr[1]
}
const getDeltaBackupNameWithoutExt = name => name.slice(0, -DELTA_BACKUP_EXT_LENGTH)
const isDeltaBackup = name => endsWith(name, DELTA_BACKUP_EXT)
async function checkFileIntegrity (handler, name) {
let stream
try {
stream = await handler.createReadStream(name, { checksum: true })
} catch (error) {
if (error.code === 'ENOENT') {
return
}
throw error
}
stream.resume()
await eventToPromise(stream, 'finish')
}
// ===================================================================
@@ -59,57 +72,35 @@ export default class {
}
async listRemoteBackups (remoteId) {
const remote = await this._xo.getRemote(remoteId)
const path = remote.path
const handler = await this._xo.getRemoteHandler(remoteId)
// List backups. (Except delta backups)
const xvaFilter = file => endsWith(file, '.xva')
// List backups. (No delta)
const backupFilter = file => endsWith(file, '.xva')
const files = await readdir(path)
const backups = filter(files, xvaFilter)
const files = await handler.list()
const backups = filter(files, backupFilter)
// List delta backups.
const deltaDirs = filter(files, file => startsWith(file, 'vm_delta_'))
for (const deltaDir of deltaDirs) {
const files = await readdir(`${path}/${deltaDir}`)
const deltaBackups = filter(files, xvaFilter)
const files = await handler.list(deltaDir)
const deltaBackups = filter(files, isDeltaBackup)
backups.push(...mapToArray(
deltaBackups,
deltaBackup => `${deltaDir}/${deltaBackup}`
deltaBackup => {
return `${deltaDir}/${getDeltaBackupNameWithoutExt(deltaBackup)}`
}
))
}
return backups
}
// TODO: move into utils and rename!
async _openAndwaitReadableFile (path, errorMessage) {
const stream = createReadStream(path)
try {
await eventToPromise(stream, 'readable')
} catch (error) {
if (error.code === 'ENOENT') {
throw new Error(errorMessage)
}
throw error
}
stream.length = (await stat(path)).size
return stream
}
async importVmBackup (remoteId, file, sr) {
const remote = await this._xo.getRemote(remoteId)
const path = `${remote.path}/${file}`
const stream = await this._openAndwaitReadableFile(
path,
'VM to import not found in this remote'
)
const handler = await this._xo.getRemoteHandler(remoteId)
const stream = await handler.createReadStream(file)
const xapi = this._xo.getXapi(sr)
await xapi.importVm(stream, { srId: sr._xapiId })
@@ -137,7 +128,9 @@ export default class {
// 2. Copy.
const dstVm = await (async () => {
const delta = await srcXapi.exportDeltaVm(srcVm.$id, localBaseUuid)
const delta = await srcXapi.exportDeltaVm(srcVm.$id, localBaseUuid, {
snapshotNameLabel: `XO_DELTA_EXPORT: ${targetSr.name_label} (${targetSr.uuid})`
})
$onFailure(async () => {
await Promise.all(mapToArray(
delta.streams,
@@ -179,39 +172,127 @@ export default class {
// TODO: The other backup methods must use this function !
// Prerequisite: The backups array must be ordered. (old to new backups)
async _removeOldBackups (backups, path, n) {
async _removeOldBackups (backups, handler, dir, n) {
if (n <= 0) {
return
}
const getPath = (file, dir) => dir ? `${dir}/${file}` : file
await Promise.all(
mapToArray(backups.slice(0, n), backup => unlink(`${path}/${backup}`))
mapToArray(backups.slice(0, n), async backup => await handler.unlink(getPath(backup, dir)))
)
}
// -----------------------------------------------------------------
async _listVdiBackups (path) {
const files = await readdir(path)
async _legacyImportDeltaVdiBackup (xapi, { vmId, handler, dir, vdiInfo }) {
const vdi = await xapi.createVdi(vdiInfo.virtual_size, vdiInfo)
const vdiId = vdi.$id
// dir = vm_delta_xxx
// xoPath = vdi_xxx/timestamp_(full|delta).vhd
// vdiDir = vdi_xxx
const { xoPath } = vdiInfo
const filePath = `${dir}/${xoPath}`
const vdiDir = dirname(xoPath)
const backups = await this._listDeltaVdiDependencies(handler, filePath)
for (const backup of backups) {
const stream = await handler.createReadStream(`${dir}/${vdiDir}/${backup}`)
await xapi.importVdiContent(vdiId, stream, {
format: VDI_FORMAT_VHD
})
}
return vdiId
}
async _legacyImportDeltaVmBackup (xapi, { remoteId, handler, filePath, info, sr }) {
// Import vm metadata.
const vm = await (async () => {
const stream = await handler.createReadStream(`${filePath}.xva`)
return await xapi.importVm(stream, { onlyMetadata: true })
})()
const vmName = vm.name_label
const dir = dirname(filePath)
// Disable start and change the VM name label during import.
await Promise.all([
xapi.addForbiddenOperationToVm(vm.$id, 'start', 'Delta backup import...'),
xapi._setObjectProperties(vm, { name_label: `[Importing...] ${vmName}` })
])
// Destroy vbds if necessary. Why ?
// Because XenServer creates Vbds linked to the vdis of the backup vm if it exists.
await xapi.destroyVbdsFromVm(vm.uuid)
// Import VDIs.
const vdiIds = {}
await Promise.all(
mapToArray(
info.vdis,
async vdiInfo => {
vdiInfo.sr = sr._xapiId
const vdiId = await this._legacyImportDeltaVdiBackup(xapi, { vmId: vm.$id, handler, dir, vdiInfo })
vdiIds[vdiInfo.uuid] = vdiId
}
)
)
await Promise.all(
mapToArray(
info.vbds,
vbdInfo => {
xapi.attachVdiToVm(vdiIds[vbdInfo.xoVdi], vm.$id, vbdInfo)
}
)
)
// Import done, reenable start and set real vm name.
await Promise.all([
xapi.removeForbiddenOperationFromVm(vm.$id, 'start'),
xapi._setObjectProperties(vm, { name_label: vmName })
])
return vm
}
// -----------------------------------------------------------------
async _listVdiBackups (handler, dir) {
let files
try {
files = await handler.list(dir)
} catch (error) {
if (error.code === 'ENOENT') {
files = []
} else {
throw error
}
}
const backups = sortBy(filter(files, fileName => isVdiBackup(fileName)))
let i
// Avoid unstable state: No full vdi found to the beginning of array. (base)
for (i = 0; i < backups.length && isDeltaVdiBackup(backups[i]); i++);
await this._removeOldBackups(backups, path, i)
await this._removeOldBackups(backups, handler, dir, i)
return backups.slice(i)
}
async _deltaVdiBackup ({vdi, path, depth}) {
const xapi = this._xo.getXapi(vdi)
async _deltaVdiBackup (xapi, {vdi, handler, dir, depth}) {
const backupDirectory = `vdi_${vdi.uuid}`
vdi = xapi.getObject(vdi._xapiId)
path = `${path}/${backupDirectory}`
await ensureDir(path)
dir = `${dir}/${backupDirectory}`
const backups = await this._listVdiBackups(path)
const backups = await this._listVdiBackups(handler, dir)
// Make snapshot.
const date = safeDateFormat(new Date())
@@ -234,7 +315,7 @@ export default class {
// Export full or delta backup.
const vdiFilename = `${date}_${isFull ? 'full' : 'delta'}.vhd`
const backupFullPath = `${path}/${vdiFilename}`
const backupFullPath = `${dir}/${vdiFilename}`
try {
const sourceStream = await xapi.exportVdi(currentSnapshot.$id, {
@@ -242,16 +323,27 @@ export default class {
format: VDI_FORMAT_VHD
})
const targetStream = createWriteStream(backupFullPath, { flags: 'wx' })
const targetStream = await handler.createOutputStream(backupFullPath, {
// FIXME: Checksum is not computed for full vdi backups.
// The problem is in the merge case, a delta merged in a full vdi
// backup forces us to browse the resulting file =>
// Significant transfer time on the network !
checksum: !isFull,
flags: 'wx'
})
sourceStream.on('error', error => targetStream.emit('error', error))
await eventToPromise(sourceStream.pipe(targetStream), 'finish')
} catch (e) {
// Remove new backup. (corrupt) and delete new vdi base.
await xapi.deleteVdi(currentSnapshot.$id)
await unlink(backupFullPath).catch(noop)
throw e
await Promise.all([
eventToPromise(sourceStream.pipe(targetStream), 'finish'),
sourceStream.task
])
} catch (error) {
// Remove new backup. (corrupt) and delete new vdi base.
xapi.deleteVdi(currentSnapshot.$id).catch(noop)
await handler.unlink(backupFullPath, { checksum: true }).catch(noop)
throw error
}
// Returns relative path. (subdir and vdi filename), old/new base.
@@ -263,8 +355,12 @@ export default class {
}
}
async _mergeDeltaVdiBackups ({path, depth}) {
const backups = await this._listVdiBackups(path)
async _mergeDeltaVdiBackups ({handler, dir, depth}) {
if (handler.type === 'smb') {
throw new Error('VDI merging is not available through SMB')
}
const backups = await this._listVdiBackups(handler, dir)
let i = backups.length - depth
// No merge.
@@ -272,55 +368,49 @@ export default class {
return
}
const newFull = `${getVdiTimestamp(backups[i])}_full.vhd`
const vhdUtil = `${__dirname}/../../bin/vhd-util`
const timestamp = getVdiTimestamp(backups[i])
const newFullBackup = `${dir}/${timestamp}_full.vhd`
await checkFileIntegrity(handler, `${dir}/${backups[i]}`)
for (; i > 0 && isDeltaVdiBackup(backups[i]); i--) {
const backup = `${path}/${backups[i]}`
const parent = `${path}/${backups[i - 1]}`
const backup = `${dir}/${backups[i]}`
const parent = `${dir}/${backups[i - 1]}`
const path = handler._remote.path // FIXME, private attribute !
try {
await execa(vhdUtil, ['modify', '-n', backup, '-p', parent])
await execa(vhdUtil, ['coalesce', '-n', backup])
await checkFileIntegrity(handler, `${dir}/${backups[i - 1]}`)
await execa(vhdUtil, ['modify', '-n', `${path}/${backup}`, '-p', `${path}/${parent}`]) // FIXME not ok at least with smb remotes
await execa(vhdUtil, ['coalesce', '-n', `${path}/${backup}`]) // FIXME not ok at least with smb remotes
} catch (e) {
console.error('Unable to use vhd-util.', e)
throw e
}
await unlink(backup)
await handler.unlink(backup, { checksum: true })
}
// The base was removed, it exists two full backups or more ?
// => Remove old backups before the most recent full.
if (i > 0) {
for (i--; i >= 0; i--) {
await unlink(`${path}/${backups[i]}`)
await handler.unlink(`${dir}/${backups[i]}`, { checksum: true })
}
return
}
// Rename the first old full backup to the new full backup.
await rename(`${path}/${backups[0]}`, `${path}/${newFull}`)
await handler.rename(`${dir}/${backups[0]}`, newFullBackup)
}
async _importVdiBackupContent (xapi, file, vdiId) {
const stream = await this._openAndwaitReadableFile(
file,
'VDI to import not found in this remote'
)
await xapi.importVdiContent(vdiId, stream, {
format: VDI_FORMAT_VHD
})
}
async importDeltaVdiBackup ({vdi, remoteId, filePath}) {
const remote = await this._xo.getRemote(remoteId)
const path = dirname(`${remote.path}/${filePath}`)
async _listDeltaVdiDependencies (handler, filePath) {
const dir = dirname(filePath)
const filename = basename(filePath)
const backups = await this._listVdiBackups(path)
const backups = await this._listVdiBackups(handler, dir)
// Search file. (delta or full backup)
const i = findIndex(backups, backup =>
@@ -340,81 +430,83 @@ export default class {
throw new Error(`Unable to found full vdi backup of: ${filePath}`)
}
// Restore...
const xapi = this._xo.getXapi(vdi)
for (; j <= i; j++) {
await this._importVdiBackupContent(xapi, `${path}/${backups[j]}`, vdi._xapiId)
}
return backups.slice(j, i + 1)
}
// -----------------------------------------------------------------
async _listDeltaVmBackups (path) {
const files = await readdir(path)
return await sortBy(filter(files, (fileName) => /^\d+T\d+Z_.*\.(?:xva|json)$/.test(fileName)))
async _listDeltaVmBackups (handler, dir) {
const files = await handler.list(dir)
return await sortBy(filter(files, isDeltaBackup))
}
async _failedRollingDeltaVmBackup (xapi, path, fulFilledVdiBackups) {
async _failedRollingDeltaVmBackup (xapi, handler, dir, fulFilledVdiBackups) {
await Promise.all(
mapToArray(fulFilledVdiBackups, async vdiBackup => {
const { newBaseId, backupDirectory, vdiFilename } = vdiBackup.value()
await xapi.deleteVdi(newBaseId)
await unlink(`${path}/${backupDirectory}/${vdiFilename}`).catch(noop)
await handler.unlink(`${dir}/${backupDirectory}/${vdiFilename}`, { checksum: true }).catch(noop)
})
)
}
async rollingDeltaVmBackup ({vm, remoteId, tag, depth}) {
const remote = await this._xo.getRemote(remoteId)
const directory = `vm_delta_${tag}_${vm.uuid}`
const path = `${remote.path}/${directory}`
await ensureDir(path)
if (!remote) {
throw new Error(`No such Remote ${remoteId}`)
}
if (!remote.enabled) {
throw new Error(`Remote ${remoteId} is disabled`)
}
const handler = await this._xo.getRemoteHandler(remote)
if (handler.type === 'smb') {
throw new Error('Delta Backup is not supported for smb remotes')
}
const dir = `vm_delta_${tag}_${vm.uuid}`
const info = {
vbds: [],
vdis: {}
version: '1.0.0',
vbds: {},
vdis: {},
vifs: {}
}
const promises = []
const xapi = this._xo.getXapi(vm)
for (const vbdId of vm.$VBDs) {
const vbd = this._xo.getObject(vbdId)
vm = xapi.getObject(vm._xapiId)
if (!vbd.VDI) {
for (const vbd of vm.$VBDs) {
const vdiId = vbd.VDI
if (!vdiId || vbd.type !== 'Disk') {
continue
}
if (vbd.is_cd_drive) {
continue
}
const vdiXo = this._xo.getObject(vbd.VDI)
const vdi = xapi.getObject(vdiXo._xapiId)
const vdiUUID = vdi.uuid
info.vbds.push({
...xapi.getObject(vbd._xapiId),
xoVdi: vdiUUID
})
info.vbds[vbd.$ref] = vbd
// Warning: There may be the same VDI id for a VBD set.
if (!info.vdis[vdiUUID]) {
info.vdis[vdiUUID] = { ...vdi }
promises.push(
this._deltaVdiBackup({vdi: vdiXo, path, depth}).then(
vdiBackup => {
const { backupDirectory, vdiFilename } = vdiBackup
info.vdis[vdiUUID].xoPath = `${backupDirectory}/${vdiFilename}`
return vdiBackup
}
)
)
if (info.vdis[vdiId]) {
continue
}
const vdi = vbd.$VDI
info.vdis[vdiId] = { ...vdi }
promises.push(
this._deltaVdiBackup(xapi, {vdi, handler, dir, depth}).then(
vdiBackup => {
const { backupDirectory, vdiFilename } = vdiBackup
info.vdis[vdiId].xoPath = `${backupDirectory}/${vdiFilename}`
return vdiBackup
}
)
)
}
const vdiBackups = await pSettle(promises)
@@ -432,29 +524,28 @@ export default class {
}
if (fail) {
console.error(`Remove successful backups in ${path}`, fulFilledVdiBackups)
await this._failedRollingDeltaVmBackup(xapi, path, fulFilledVdiBackups)
console.error(`Remove successful backups in ${dir}`, fulFilledVdiBackups)
await this._failedRollingDeltaVmBackup(xapi, handler, dir, fulFilledVdiBackups)
throw new Error('Rolling delta vm backup failed.')
}
const backups = await this._listDeltaVmBackups(path)
const date = safeDateFormat(new Date())
const backupFormat = `${date}_${vm.name_label}`
const xvaPath = `${path}/${backupFormat}.xva`
const infoPath = `${path}/${backupFormat}.json`
const infoPath = `${dir}/${backupFormat}${DELTA_BACKUP_EXT}`
try {
await Promise.all([
this.backupVm({vm, pathToFile: xvaPath, onlyMetadata: true}),
writeFile(infoPath, JSON.stringify(info), {flag: 'wx'})
])
for (const vif of vm.$VIFs) {
info.vifs[vif.$ref] = vif
}
info.vm = vm
await handler.outputFile(infoPath, JSON.stringify(info, null, 2), {flag: 'wx'})
} catch (e) {
await Promise.all([
unlink(xvaPath).catch(noop),
unlink(infoPath).catch(noop),
this._failedRollingDeltaVmBackup(xapi, path, fulFilledVdiBackups)
handler.unlink(infoPath).catch(noop),
this._failedRollingDeltaVmBackup(xapi, handler, dir, fulFilledVdiBackups)
])
throw e
@@ -464,12 +555,26 @@ export default class {
await Promise.all(
mapToArray(vdiBackups, vdiBackup => {
const { backupDirectory } = vdiBackup.value()
return this._mergeDeltaVdiBackups({path: `${path}/${backupDirectory}`, depth})
return this._mergeDeltaVdiBackups({handler, dir: `${dir}/${backupDirectory}`, depth})
})
)
// Remove x2 files : json AND xva files.
await this._removeOldBackups(backups, path, backups.length - (depth - 1) * 2)
// Remove old vm backups.
const backups = await this._listDeltaVmBackups(handler, dir)
const nOldBackups = backups.length - depth
if (nOldBackups > 0) {
await Promise.all(
mapToArray(backups.slice(0, nOldBackups), async backup => {
// Remove json file.
await handler.unlink(`${dir}/${backup}`)
// Remove xva file.
// Version 0.0.0 (Legacy) Delta Backup.
handler.unlink(`${dir}/${getDeltaBackupNameWithoutExt(backup)}.xva`).catch(noop)
})
)
}
// Remove old vdi bases.
Promise.all(
@@ -483,87 +588,69 @@ export default class {
).catch(noop)
// Returns relative path.
return `${directory}/${backupFormat}`
}
async _importVmMetadata (xapi, file) {
const stream = await this._openAndwaitReadableFile(
file,
'VM metadata to import not found in this remote'
)
return await xapi.importVm(stream, { onlyMetadata: true })
}
async _importDeltaVdiBackupFromVm (xapi, vmId, remoteId, directory, vdiInfo) {
const vdi = await xapi.createVdi(vdiInfo.virtual_size, vdiInfo)
const vdiId = vdi.$id
await this.importDeltaVdiBackup({
vdi: this._xo.getObject(vdiId),
remoteId,
filePath: `${directory}/${vdiInfo.xoPath}`
})
return vdiId
return `${dir}/${backupFormat}`
}
async importDeltaVmBackup ({sr, remoteId, filePath}) {
const remote = await this._xo.getRemote(remoteId)
const fullBackupPath = `${remote.path}/${filePath}`
const handler = await this._xo.getRemoteHandler(remoteId)
const xapi = this._xo.getXapi(sr)
// Import vm metadata.
const vm = await this._importVmMetadata(xapi, `${fullBackupPath}.xva`)
const vmName = vm.name_label
const delta = JSON.parse(await handler.readFile(`${filePath}${DELTA_BACKUP_EXT}`))
let vm
const { version } = delta
// Disable start and change the VM name label during import.
await Promise.all([
xapi.addForbiddenOperationToVm(vm.$id, 'start', 'Delta backup import...'),
xapi._setObjectProperties(vm, { name_label: `[Importing...] ${vmName}` })
])
if (!version) {
// Legacy import. (Version 0.0.0)
vm = await this._legacyImportDeltaVmBackup(xapi, {
remoteId, handler, filePath, info: delta, sr
})
} else if (versionSatisfies(delta.version, '^1')) {
const basePath = dirname(filePath)
const streams = delta.streams = {}
// Destroy vbds if necessary. Why ?
// Because XenServer creates Vbds linked to the vdis of the backup vm if it exists.
await xapi.destroyVbdsFromVm(vm.uuid)
await Promise.all(
mapToArray(
delta.vdis,
async (vdi, id) => {
const vdisFolder = `${basePath}/${dirname(vdi.xoPath)}`
const backups = await this._listDeltaVdiDependencies(handler, `${basePath}/${vdi.xoPath}`)
const info = JSON.parse(await readFile(`${fullBackupPath}.json`))
// Import VDIs.
const vdiIds = {}
await Promise.all(
mapToArray(
info.vdis,
async vdiInfo => {
vdiInfo.sr = sr._xapiId
const vdiId = await this._importDeltaVdiBackupFromVm(xapi, vm.$id, remoteId, dirname(filePath), vdiInfo)
vdiIds[vdiInfo.uuid] = vdiId
}
streams[`${id}.vhd`] = await Promise.all(mapToArray(backups, async backup =>
handler.createReadStream(`${vdisFolder}/${backup}`, { checksum: true, ignoreMissingChecksum: true })
))
}
)
)
)
await Promise.all(
mapToArray(
info.vbds,
vbdInfo => {
xapi.attachVdiToVm(vdiIds[vbdInfo.xoVdi], vm.$id, vbdInfo)
}
)
)
// Import done, reenable start and set real vm name.
await Promise.all([
xapi.removeForbiddenOperationFromVm(vm.$id, 'start'),
xapi._setObjectProperties(vm, { name_label: vmName })
])
vm = await xapi.importDeltaVm(delta, {
srId: sr._xapiId,
disableStartAfterImport: false
})
} else {
throw new Error(`Unsupported delta backup version: ${version}`)
}
return xapiObjectToXo(vm).id
}
// -----------------------------------------------------------------
async backupVm ({vm, pathToFile, compress, onlyMetadata}) {
const targetStream = createWriteStream(pathToFile, { flags: 'wx' })
async backupVm ({vm, remoteId, file, compress, onlyMetadata}) {
const remote = await this._xo.getRemote(remoteId)
if (!remote) {
throw new Error(`No such Remote ${remoteId}`)
}
if (!remote.enabled) {
throw new Error(`Backup remote ${remoteId} is disabled`)
}
const handler = await this._xo.getRemoteHandler(remote)
return this._backupVm(vm, handler, file, {compress, onlyMetadata})
}
async _backupVm (vm, handler, file, {compress, onlyMetadata}) {
const targetStream = await handler.createOutputStream(file, { flags: 'wx' })
const promise = eventToPromise(targetStream, 'finish')
const sourceStream = await this._xo.getXapi(vm).exportVm(vm._xapiId, {
@@ -575,26 +662,28 @@ export default class {
await promise
}
async rollingBackupVm ({vm, path, tag, depth, compress, onlyMetadata}) {
await ensureDir(path)
const files = await readdir(path)
async rollingBackupVm ({vm, remoteId, tag, depth, compress, onlyMetadata}) {
const remote = await this._xo.getRemote(remoteId)
if (!remote) {
throw new Error(`No such Remote ${remoteId}`)
}
if (!remote.enabled) {
throw new Error(`Backup remote ${remoteId} is disabled`)
}
const handler = await this._xo.getRemoteHandler(remote)
const files = await handler.list()
const reg = new RegExp('^[^_]+_' + escapeStringRegexp(`${tag}_${vm.name_label}.xva`))
const backups = sortBy(filter(files, (fileName) => reg.test(fileName)))
const date = safeDateFormat(new Date())
const backupFullPath = `${path}/${date}_${tag}_${vm.name_label}.xva`
const file = `${date}_${tag}_${vm.name_label}.xva`
await this.backupVm({vm, pathToFile: backupFullPath, compress, onlyMetadata})
const promises = []
for (let surplus = backups.length - (depth - 1); surplus > 0; surplus--) {
const oldBackup = backups.shift()
promises.push(unlink(`${path}/${oldBackup}`))
}
await Promise.all(promises)
return backupFullPath
await this._backupVm(vm, handler, file, {compress, onlyMetadata})
await this._removeOldBackups(backups, handler, undefined, backups.length - (depth - 1))
}
async rollingSnapshotVm (vm, tag, depth) {

View File

@@ -8,7 +8,6 @@ import {
} from '../api-errors'
import {
createRawObject,
noop,
mapToArray
} from '../utils'
@@ -94,7 +93,9 @@ export default class {
return this.loadPlugin(id)
}
})
.catch(noop)
.catch(error => {
console.error('register plugin %s: %s', name, error && error.stack || error)
})
}
async _getPlugin (id) {

View File

@@ -1,14 +1,15 @@
import startsWith from 'lodash.startswith'
import RemoteHandler from '../remote-handler'
import { Remotes } from '../models/remote'
import RemoteHandlerLocal from '../remote-handlers/local'
import RemoteHandlerNfs from '../remote-handlers/nfs'
import RemoteHandlerSmb from '../remote-handlers/smb'
import {
forEach
} from '../utils'
import {
NoSuchObject
} from '../api-errors'
import {
forEach,
mapToArray
} from '../utils'
Remotes
} from '../models/remote'
// ===================================================================
@@ -29,33 +30,30 @@ export default class {
})
xo.on('start', async () => {
// TODO: Should it be private?
this.remoteHandler = new RemoteHandler()
await this.initRemotes()
await this.syncAllRemotes()
})
xo.on('stop', () => this.disableAllRemotes())
xo.on('stop', () => this.forgetAllRemotes())
}
_developRemote (remote) {
const _remote = { ...remote }
if (startsWith(_remote.url, 'file://')) {
_remote.type = 'local'
_remote.path = _remote.url.slice(6)
} else if (startsWith(_remote.url, 'nfs://')) {
_remote.type = 'nfs'
const url = _remote.url.slice(6)
const [host, share] = url.split(':')
_remote.path = '/tmp/xo-server/mounts/' + _remote.id
_remote.host = host
_remote.share = share
async getRemoteHandler (remote) {
if (typeof remote === 'string') {
remote = await this.getRemote(remote)
}
return _remote
const Handler = {
file: RemoteHandlerLocal,
smb: RemoteHandlerSmb,
nfs: RemoteHandlerNfs
}
const type = remote.url.split('://')[0]
if (!Handler[type]) {
throw new Error('Unhandled remote type')
}
return new Handler[type](remote)
}
async getAllRemotes () {
return mapToArray(await this._remotes.get(), this._developRemote)
return this._remotes.get()
}
async _getRemote (id) {
@@ -68,7 +66,7 @@ export default class {
}
async getRemote (id) {
return this._developRemote((await this._getRemote(id)).properties)
return (await this._getRemote(id)).properties
}
async createRemote ({name, url}) {
@@ -79,9 +77,10 @@ export default class {
async updateRemote (id, {name, url, enabled, error}) {
const remote = await this._getRemote(id)
this._updateRemote(remote, {name, url, enabled, error})
const props = await this.remoteHandler.sync(this._developRemote(remote.properties))
const handler = await this.getRemoteHandler(remote.properties)
const props = await handler.sync()
this._updateRemote(remote, props)
return await this._developRemote(this._remotes.save(remote).properties)
return (await this._remotes.save(remote)).properties
}
_updateRemote (remote, {name, url, enabled, error}) {
@@ -96,8 +95,8 @@ export default class {
}
async removeRemote (id) {
const remote = await this.getRemote(id)
await this.remoteHandler.forget(remote)
const handler = await this.getRemoteHandler(id)
await handler.forget()
await this._remotes.remove(id)
}
@@ -110,9 +109,13 @@ export default class {
}
// TODO: Should it be private?
async disableAllRemotes () {
async forgetAllRemotes () {
const remotes = await this.getAllRemotes()
this.remoteHandler.disableAll(remotes)
for (let remote of remotes) {
try {
(await this.getRemoteHandler(remote)).forget()
} catch (_) {}
}
}
// TODO: Should it be private?

View File

@@ -1,3 +1,4 @@
import checkAuthorization from 'xo-acl-resolver'
import filter from 'lodash.filter'
import fs from 'fs-promise'
import includes from 'lodash.includes'
@@ -15,7 +16,6 @@ import {
} from 'hashy'
import mixins from './xo-mixins'
import checkAuthorization from './acl'
import Connection from './connection'
import LevelDbLogger from './loggers/leveldb'
import Xapi from './xapi'
@@ -31,7 +31,8 @@ import {
generateToken,
isEmpty,
mapToArray,
noop
noop,
popProperty
} from './utils'
import {Groups} from './models/group'
import {
@@ -95,6 +96,7 @@ export default class Xo extends EventEmitter {
this._authenticationFailures = createRawObject()
this._authenticationProviders = new Set()
this._httpRequestWatchers = createRawObject()
this._xenObjConflicts = createRawObject() // TODO: clean when a server is disconnected.
// Connects to Redis.
this._redis = createRedisClient(config.redis && config.redis.uri)
@@ -706,27 +708,31 @@ export default class Xo extends EventEmitter {
return server
}
_onXenAdd (xapiObjects, xapiIdsToXo, toRetry) {
const {_objects: objects} = this
_onXenAdd (xapiObjects, xapiIdsToXo, toRetry, conId) {
const conflicts = this._xenObjConflicts
const objects = this._objects
forEach(xapiObjects, (xapiObject, xapiId) => {
try {
const xoObject = xapiObjectToXo(xapiObject)
if (!xoObject) {
return
}
if (xoObject) {
const prevId = xapiIdsToXo[xapiId]
const currId = xoObject.id
const xoId = xoObject.id
xapiIdsToXo[xapiId] = xoId
if (prevId !== currId) {
// If there was a previous XO object for this XAPI object
// (with a different id), removes it.
if (prevId) {
objects.unset(prevId)
}
xapiIdsToXo[xapiId] = currId
}
objects.set(xoObject)
const previous = objects.get(xoId, undefined)
if (
previous &&
previous._xapiRef !== xapiObject.$ref
) {
(
conflicts[xoId] ||
(conflicts[xoId] = createRawObject())
)[conId] = xoObject
} else {
objects.set(xoId, xoObject)
}
} catch (error) {
console.error('ERROR: xapiObjectToXo', error)
@@ -736,16 +742,33 @@ export default class Xo extends EventEmitter {
})
}
_onXenRemove (xapiObjects, xapiIdsToXo, toRetry) {
const {_objects: objects} = this
_onXenRemove (xapiObjects, xapiIdsToXo, toRetry, conId) {
const conflicts = this._xenObjConflicts
const objects = this._objects
forEach(xapiObjects, (_, xapiId) => {
toRetry && delete toRetry[xapiId]
const xoId = xapiIdsToXo[xapiId]
if (!xoId) {
// This object was not known previously.
return
}
if (xoId) {
delete xapiIdsToXo[xapiId]
delete xapiIdsToXo[xapiId]
const objConflicts = conflicts[xoId]
if (objConflicts) {
if (objConflicts[conId]) {
delete objConflicts[conId]
} else {
objects.set(xoId, popProperty(objConflicts))
}
if (isEmpty(objConflicts)) {
delete conflicts[xoId]
}
} else {
objects.unset(xoId)
}
})
@@ -764,7 +787,9 @@ export default class Xo extends EventEmitter {
})
xapi.xo = (() => {
// Maps ids of XAPI objects to ids of XO objecs.
const conId = server.id
// Maps ids of XAPI objects to ids of XO objects.
const xapiIdsToXo = createRawObject()
// Map of XAPI objects which failed to be transformed to XO
@@ -776,10 +801,10 @@ export default class Xo extends EventEmitter {
let toRetryNext = createRawObject()
const onAddOrUpdate = objects => {
this._onXenAdd(objects, xapiIdsToXo, toRetryNext)
this._onXenAdd(objects, xapiIdsToXo, toRetryNext, conId)
}
const onRemove = objects => {
this._onXenRemove(objects, xapiIdsToXo, toRetry)
this._onXenRemove(objects, xapiIdsToXo, toRetry, conId)
}
const onFinish = () => {
if (xapi.pool) {