Remotes refactoring + SMB implementation.
Julien Fontanet 2016-01-27 11:24:52 +01:00
commit 8c672b23b5
11 changed files with 595 additions and 285 deletions

View File

@ -34,6 +34,7 @@
"node": ">=0.12 <5"
},
"dependencies": {
"@marsaud/smb2-promise": "^0.1.0",
"app-conf": "^0.4.0",
"babel-runtime": "^5",
"base64url": "^1.0.5",
@ -95,6 +96,7 @@
"lodash.pick": "^3.0.0",
"lodash.sortby": "^3.1.4",
"lodash.startswith": "^3.0.1",
"lodash.trim": "^3.0.1",
"loud-rejection": "^1.2.0",
"make-error": "^1",
"micromatch": "^2.3.2",
@ -114,7 +116,8 @@
"ws": "~0.8.0",
"xen-api": "^0.7.2",
"xml2js": "~0.4.6",
"xo-collection": "^0.4.0"
"xo-collection": "^0.4.0",
"xo-remote-parser": "^0.1.0"
},
"devDependencies": {
"babel-eslint": "^4.0.10",

View File

@ -5,7 +5,7 @@ export async function getAll () {
getAll.permission = 'admin'
getAll.description = 'Gets all existing fs remote points'
export async function get (id) {
export async function get ({id}) {
return await this.getRemote(id)
}
@ -15,7 +15,7 @@ get.params = {
id: {type: 'string'}
}
export async function list (id) {
export async function list ({id}) {
return await this.listRemoteBackups(id)
}

View File

@ -572,12 +572,13 @@ exports.rollingSnapshot = rollingSnapshot
#---------------------------------------------------------------------
backup = $coroutine ({vm, pathToFile, compress, onlyMetadata}) ->
yield @backupVm({vm, pathToFile, compress, onlyMetadata})
backup = $coroutine ({vm, remoteId, file, compress, onlyMetadata}) ->
yield @backupVm({vm, remoteId, file, compress, onlyMetadata})
backup.params = {
id: { type: 'string' }
pathToFile: { type: 'string' }
id: {type: 'string'}
remoteId: { type: 'string' }
file: { type: 'string' }
compress: { type: 'boolean', optional: true }
onlyMetadata: { type: 'boolean', optional: true }
}
@ -615,14 +616,9 @@ exports.importBackup = importBackup
#---------------------------------------------------------------------
rollingBackup = $coroutine ({vm, remoteId, tag, depth, compress, onlyMetadata}) ->
remote = yield @getRemote remoteId
if not remote?.path?
throw new Error "No such Remote #{remoteId}"
if not remote.enabled
throw new Error "Backup remote #{remoteId} is disabled"
return yield @rollingBackupVm({
vm,
path: remote.path,
remoteId,
tag,
depth,
compress,

View File

@ -1,6 +1,8 @@
import Collection from '../collection/redis'
import Model from '../model'
import { forEach } from '../utils'
import {
forEach
} from '../utils'
// ===================================================================

View File

@ -1,140 +0,0 @@
import filter from 'lodash.filter'
import fs from 'fs-promise'
import {exec} from 'child_process'
import {
forEach,
noop,
promisify
} from './utils'
const execAsync = promisify(exec)
class NfsMounter {
async _loadRealMounts () {
let stdout
try {
[stdout] = await execAsync('findmnt -P -t nfs,nfs4 --output SOURCE,TARGET --noheadings')
} catch (exc) {
// When no mounts are found, the call pretends to fail...
}
const mounted = {}
if (stdout) {
const regex = /^SOURCE="([^:]*):(.*)" TARGET="(.*)"$/
forEach(stdout.split('\n'), m => {
if (m) {
const match = regex.exec(m)
mounted[match[3]] = {
host: match[1],
share: match[2]
}
}
})
}
this._realMounts = mounted
return mounted
}
_fullPath (path) {
return path
}
_matchesRealMount (mount) {
return this._fullPath(mount.path) in this._realMounts
}
async _mount (mount) {
const path = this._fullPath(mount.path)
await fs.ensureDir(path)
return await execAsync(`mount -t nfs ${mount.host}:${mount.share} ${path}`)
}
async forget (mount) {
try {
await this._umount(mount)
} catch (_) {
// We have to go on...
}
}
async _umount (mount) {
const path = this._fullPath(mount.path)
await execAsync(`umount ${path}`)
}
async sync (mount) {
await this._loadRealMounts()
if (this._matchesRealMount(mount) && !mount.enabled) {
try {
await this._umount(mount)
} catch (exc) {
mount.enabled = true
mount.error = exc.message
}
} else if (!this._matchesRealMount(mount) && mount.enabled) {
try {
await this._mount(mount)
} catch (exc) {
mount.enabled = false
mount.error = exc.message
}
}
return mount
}
async disableAll (mounts) {
await this._loadRealMounts()
forEach(mounts, async mount => {
if (this._matchesRealMount(mount)) {
try {
await this._umount(mount)
} catch (_) {
// We have to go on...
}
}
})
}
}
class LocalHandler {
constructor () {
this.forget = noop
this.disableAll = noop
}
async sync (local) {
if (local.enabled) {
try {
await fs.ensureDir(local.path)
await fs.access(local.path, fs.R_OK | fs.W_OK)
} catch (exc) {
local.enabled = false
local.error = exc.message
}
}
return local
}
}
export default class RemoteHandler {
constructor () {
this.handlers = {
nfs: new NfsMounter(),
local: new LocalHandler()
}
}
async sync (remote) {
return await this.handlers[remote.type].sync(remote)
}
async forget (remote) {
return await this.handlers[remote.type].forget(remote)
}
async disableAll (remotes) {
const promises = []
forEach(['local', 'nfs'], type => promises.push(this.handlers[type].disableAll(filter(remotes, remote => remote.type === type))))
await Promise.all(promises)
}
}

View File

@ -0,0 +1,114 @@
import eventToPromise from 'event-to-promise'
import getStream from 'get-stream'
import {
parse
} from 'xo-remote-parser'
export default class RemoteHandlerAbstract {
constructor (remote) {
this._remote = parse({...remote})
if (this._remote.type !== this.type) {
throw new Error('Incorrect remote type')
}
}
get type () {
throw new Error('Not implemented')
}
/**
* Asks the handler to sync the state of the effective remote with its metadata
*/
async sync () {
return this._sync()
}
async _sync () {
throw new Error('Not implemented')
}
/**
* Free the resources possibly dedicated to putting the remote to work, when it is no longer needed
*/
async forget () {
return this._forget()
}
async _forget () {
throw new Error('Not implemented')
}
async outputFile (file, data, options) {
return this._outputFile(file, data, options)
}
async _outputFile (file, data, options) {
const stream = await this.createOutputStream(file)
const promise = eventToPromise(stream, 'finish')
stream.end(data)
return promise
}
async readFile (file, options) {
return this._readFile(file, options)
}
async _readFile (file, options) {
return getStream(await this.createReadStream(file, options))
}
async rename (oldPath, newPath) {
return this._rename(oldPath, newPath)
}
async _rename (oldPath, newPath) {
throw new Error('Not implemented')
}
async list (dir = '.') {
return this._list(dir)
}
async _list (dir) {
throw new Error('Not implemented')
}
async createReadStream (file, options) {
const stream = await this._createReadStream(file)
if (stream.length === undefined) {
try {
stream.length = await this.getSize(file)
} catch (_) {}
}
return stream
}
async _createReadStream (file, options) {
throw new Error('Not implemented')
}
async createOutputStream (file, options) {
return this._createOutputStream(file, options)
}
async _createOutputStream (file, options) {
throw new Error('Not implemented')
}
async unlink (file) {
return this._unlink(file)
}
async _unlink (file) {
throw new Error('Not implemented')
}
async getSize (file) {
return this._getSize(file)
}
async _getSize (file) {
throw new Error('Not implemented')
}
}
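For orientation (not part of the commit), here is a minimal sketch of what a concrete handler has to provide: a type getter matching what xo-remote-parser reports for the remote, plus the underscore-prefixed primitives the public methods delegate to. The MemoryHandler name and its in-memory store are purely hypothetical, and the sketch assumes xo-remote-parser would report type 'memory' for such a remote, which it does not today.
import RemoteHandlerAbstract from './abstract'
export default class MemoryHandler extends RemoteHandlerAbstract {
constructor (remote) {
super(remote) // parses the remote and checks its type
this._files = Object.create(null)
}
get type () {
return 'memory'
}
async _sync () {
// Nothing to mount: the remote is always usable.
return this._remote
}
async _forget () {}
async _outputFile (file, data) {
this._files[file] = String(data)
}
async _readFile (file) {
return this._files[file]
}
async _list () {
return Object.keys(this._files)
}
async _getSize (file) {
return this._files[file].length
}
}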

View File

@ -0,0 +1,84 @@
import fs from 'fs-promise'
import startsWith from 'lodash.startswith'
import {
dirname,
resolve
} from 'path'
import RemoteHandlerAbstract from './abstract'
import {
noop
} from '../utils'
export default class LocalHandler extends RemoteHandlerAbstract {
get type () {
return 'local'
}
_getFilePath (file) {
const parts = [this._remote.path]
if (file) {
parts.push(file)
}
const path = resolve.apply(null, parts)
if (!startsWith(path, this._remote.path)) {
throw new Error('Remote path is unavailable')
}
return path
}
async _sync () {
if (this._remote.enabled) {
try {
await fs.ensureDir(this._remote.path)
await fs.access(this._remote.path, fs.R_OK | fs.W_OK)
} catch (exc) {
this._remote.enabled = false
this._remote.error = exc.message
}
}
return this._remote
}
async _forget () {
return noop()
}
async _outputFile (file, data, options) {
const path = this._getFilePath(file)
await fs.ensureDir(dirname(path))
await fs.writeFile(this._getFilePath(file), data, options)
}
async _readFile (file, options) {
return fs.readFile(this._getFilePath(file), options)
}
async _rename (oldPath, newPath) {
return fs.rename(this._getFilePath(oldPath), this._getFilePath(newPath))
}
async _list (dir = '.') {
return fs.readdir(this._getFilePath(dir))
}
async _createReadStream (file, options) {
return fs.createReadStream(this._getFilePath(file), options)
}
async _createOutputStream (file, options) {
const path = this._getFilePath(file)
await fs.ensureDir(dirname(path))
return fs.createWriteStream(path, options)
}
async _unlink (file) {
return fs.unlink(this._getFilePath(file))
}
async _getSize (file) {
const stats = await fs.stat(this._getFilePath(file))
return stats.size
}
}
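A hedged usage sketch of the public API against a local remote (the directory below is made up, and it assumes xo-remote-parser derives type 'local' and path from a file:// URL, the format the removed _developRemote() used to handle, while keeping the other fields):
import LocalHandler from './remote-handlers/local'
async function localExample () {
const handler = new LocalHandler({
url: 'file:///var/lib/xoa-backups', // made-up directory
enabled: true
})
await handler.sync() // checks the path is readable and writable
await handler.outputFile('test/hello.txt', 'hi') // creates missing parent directories
console.log(await handler.list('test')) // [ 'hello.txt' ]
console.log(await handler.getSize('test/hello.txt')) // 2
}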

View File

@ -0,0 +1,77 @@
import execa from 'execa'
import fs from 'fs-promise'
import LocalHandler from './local'
import {
forEach
} from '../utils'
export default class NfsHandler extends LocalHandler {
get type () {
return 'nfs'
}
async _loadRealMounts () {
let stdout
const mounted = {}
try {
[stdout] = await execa('findmnt', ['-P', '-t', 'nfs,nfs4', '--output', 'SOURCE,TARGET', '--noheadings'])
const regex = /^SOURCE="([^:]*):(.*)" TARGET="(.*)"$/
forEach(stdout.split('\n'), m => {
if (m) {
const match = regex.exec(m)
mounted[match[3]] = {
host: match[1],
share: match[2]
}
}
})
} catch (exc) {
// findmnt exits with an error when no mounts are found; treat that as an empty list
}
this._realMounts = mounted
return mounted
}
_matchesRealMount (remote) {
return remote.path in this._realMounts
}
async _mount (remote) {
await fs.ensureDir(remote.path)
return execa('mount', ['-t', 'nfs', `${remote.host}:${remote.share}`, remote.path])
}
async _sync () {
await this._loadRealMounts()
if (this._matchesRealMount(this._remote) && !this._remote.enabled) {
try {
await this._umount(this._remote)
} catch (exc) {
this._remote.enabled = true
this._remote.error = exc.message
}
} else if (!this._matchesRealMount(this._remote) && this._remote.enabled) {
try {
await this._mount(this._remote)
} catch (exc) {
this._remote.enabled = false
this._remote.error = exc.message
}
}
return this._remote
}
async _forget () {
try {
await this._umount(this._remote)
} catch (_) {
// We have to go on...
}
}
async _umount (remote) {
await execa('umount', [remote.path])
}
}
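The regex in _loadRealMounts expects the PAIRS output of findmnt -P; a hedged illustration with a made-up host, share and mount point (the /tmp/xo-server/mounts/<id> location matches what the removed _developRemote() used):
// Example line from `findmnt -P -t nfs,nfs4 --output SOURCE,TARGET --noheadings`
const sample = 'SOURCE="nas.example.org:/export/backups" TARGET="/tmp/xo-server/mounts/4a7c"'
const regex = /^SOURCE="([^:]*):(.*)" TARGET="(.*)"$/
const [, host, share, target] = regex.exec(sample)
// host === 'nas.example.org', share === '/export/backups'
// target === '/tmp/xo-server/mounts/4a7c' (used as the key in _realMounts)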

src/remote-handlers/smb.js Normal file
View File

@ -0,0 +1,144 @@
import Smb2 from '@marsaud/smb2-promise'
import RemoteHandlerAbstract from './abstract'
import {
noop
} from '../utils'
export default class SmbHandler extends RemoteHandlerAbstract {
constructor (remote) {
super(remote)
this._forget = noop
}
get type () {
return 'smb'
}
_getClient (remote) {
return new Smb2({
share: `\\\\${remote.host}`,
domain: remote.domain,
username: remote.username,
password: remote.password,
autoCloseTimeout: 0
})
}
_getFilePath (file) {
if (file === '.') {
file = undefined
}
const parts = []
if (this._remote.path !== '') {
parts.push(this._remote.path)
}
if (file) {
parts.push(...file.split('/'))
}
return parts.join('\\')
}
_dirname (file) {
const parts = file.split('\\')
parts.pop()
return parts.join('\\')
}
async _sync () {
if (this._remote.enabled) {
try {
// Check access (so far, smb2 does not publicly expose connect...)
await this.list()
} catch (error) {
this._remote.enabled = false
this._remote.error = error.message
}
}
return this._remote
}
async _outputFile (file, data, options) {
const client = this._getClient(this._remote)
const path = this._getFilePath(file)
const dir = this._dirname(path)
try {
if (dir) {
await client.ensureDir(dir)
}
return client.writeFile(path, data, options)
} finally {
client.close()
}
}
async _readFile (file, options) {
const client = this._getClient(this._remote)
try {
return client.readFile(this._getFilePath(file), options)
} finally {
client.close()
}
}
async _rename (oldPath, newPath) {
const client = this._getClient(this._remote)
try {
return client.rename(this._getFilePath(oldPath), this._getFilePath(newPath))
} finally {
client.close()
}
}
async _list (dir = '.') {
const client = this._getClient(this._remote)
try {
return client.readdir(this._getFilePath(dir))
} finally {
client.close()
}
}
async _createReadStream (file, options) {
const client = this._getClient(this._remote)
const stream = await client.createReadStream(this._getFilePath(file), options) // FIXME ensure that options are properly handled by @marsaud/smb2
stream.on('end', () => client.close())
return stream
}
async _createOutputStream (file, options) {
const client = this._getClient(this._remote)
const path = this._getFilePath(file)
const dir = this._dirname(path)
let stream
try {
if (dir) {
await client.ensureDir(dir)
}
stream = await client.createWriteStream(path, options) // FIXME ensure that options are properly handled by @marsaud/smb2
} catch (err) {
client.close()
throw err
}
stream.on('finish', () => client.close())
return stream
}
async _unlink (file) {
const client = this._getClient(this._remote)
try {
return client.unlink(this._getFilePath(file))
} finally {
client.close()
}
}
async _getSize (file) {
const client = this._getClient(this._remote)
try {
return client.getSize(this._getFilePath(file))
} finally {
client.close()
}
}
}
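A hedged sketch of how the backup code is expected to drive an SMB handler once xo.getRemoteHandler() (added to the Xo mixin further down) has built one; the remote id and file names are made up. _getFilePath() turns the forward slashes used throughout backups.js into backslashes for smb2.
async function smbExample (xo) {
const handler = await xo.getRemoteHandler('my-smb-remote-id') // made-up id
await handler.sync() // lists the share root to check the credentials
await handler.outputFile('test/info.json', '{}') // stored as <remote path>\test\info.json
console.log(await handler.list('test')) // names of the files under test/
}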

View File

@ -6,17 +6,6 @@ import filter from 'lodash.filter'
import findIndex from 'lodash.findindex'
import sortBy from 'lodash.sortby'
import startsWith from 'lodash.startswith'
import {
createReadStream,
createWriteStream,
ensureDir,
readdir,
readFile,
rename,
stat,
unlink,
writeFile
} from 'fs-promise'
import {
basename,
dirname
@ -59,20 +48,19 @@ export default class {
}
async listRemoteBackups (remoteId) {
const remote = await this._xo.getRemote(remoteId)
const path = remote.path
const handler = await this._xo.getRemoteHandler(remoteId)
// List backups. (Except delta backups)
const xvaFilter = file => endsWith(file, '.xva')
const files = await readdir(path)
const files = await handler.list()
const backups = filter(files, xvaFilter)
// List delta backups.
const deltaDirs = filter(files, file => startsWith(file, 'vm_delta_'))
for (const deltaDir of deltaDirs) {
const files = await readdir(`${path}/${deltaDir}`)
const files = await handler.list(deltaDir)
const deltaBackups = filter(files, xvaFilter)
backups.push(...mapToArray(
@ -84,11 +72,11 @@ export default class {
return backups
}
// TODO: move into utils and rename!
async _openAndwaitReadableFile (path, errorMessage) {
const stream = createReadStream(path)
// TODO: move into utils and rename! Not until we can pass a handler instead of a remote...?
async _openAndwaitReadableFile (handler, file, errorMessage) {
let stream
try {
stream = await handler.createReadStream(file)
await eventToPromise(stream, 'readable')
} catch (error) {
if (error.code === 'ENOENT') {
@ -97,16 +85,14 @@ export default class {
throw error
}
stream.length = (await stat(path)).size
return stream
}
async importVmBackup (remoteId, file, sr) {
const remote = await this._xo.getRemote(remoteId)
const path = `${remote.path}/${file}`
const handler = await this._xo.getRemoteHandler(remoteId)
const stream = await this._openAndwaitReadableFile(
path,
handler,
file,
'VM to import not found in this remote'
)
@ -179,39 +165,48 @@ export default class {
// TODO: The other backup methods must use this function !
// Prerequisite: The backups array must be ordered. (old to new backups)
async _removeOldBackups (backups, path, n) {
async _removeOldBackups (backups, handler, dir, n) {
if (n <= 0) {
return
}
const getPath = (file, dir) => dir ? `${dir}/${file}` : file
await Promise.all(
mapToArray(backups.slice(0, n), backup => unlink(`${path}/${backup}`))
mapToArray(backups.slice(0, n), async backup => await handler.unlink(getPath(backup, dir)))
)
}
// -----------------------------------------------------------------
async _listVdiBackups (path) {
const files = await readdir(path)
async _listVdiBackups (handler, dir) {
let files
try {
files = await handler.list(dir)
} catch (error) {
if (error.code === 'ENOENT') {
files = []
} else {
throw error
}
}
const backups = sortBy(filter(files, fileName => isVdiBackup(fileName)))
let i
// Avoid an unstable state: no full VDI backup (base) found at the beginning of the array.
for (i = 0; i < backups.length && isDeltaVdiBackup(backups[i]); i++);
await this._removeOldBackups(backups, path, i)
await this._removeOldBackups(backups, handler, dir, i)
return backups.slice(i)
}
async _deltaVdiBackup ({vdi, path, depth}) {
async _deltaVdiBackup ({vdi, handler, dir, depth}) {
const xapi = this._xo.getXapi(vdi)
const backupDirectory = `vdi_${vdi.uuid}`
vdi = xapi.getObject(vdi._xapiId)
path = `${path}/${backupDirectory}`
await ensureDir(path)
dir = `${dir}/${backupDirectory}`
const backups = await this._listVdiBackups(path)
const backups = await this._listVdiBackups(handler, dir)
// Make snapshot.
const date = safeDateFormat(new Date())
@ -234,7 +229,7 @@ export default class {
// Export full or delta backup.
const vdiFilename = `${date}_${isFull ? 'full' : 'delta'}.vhd`
const backupFullPath = `${path}/${vdiFilename}`
const backupFullPath = `${dir}/${vdiFilename}`
try {
const sourceStream = await xapi.exportVdi(currentSnapshot.$id, {
@ -242,7 +237,7 @@ export default class {
format: VDI_FORMAT_VHD
})
const targetStream = createWriteStream(backupFullPath, { flags: 'wx' })
const targetStream = await handler.createOutputStream(backupFullPath, { flags: 'wx' })
sourceStream.on('error', error => targetStream.emit('error', error))
await Promise.all([
@ -252,8 +247,7 @@ export default class {
} catch (error) {
// Remove the new (corrupt) backup and delete the new VDI base.
xapi.deleteVdi(currentSnapshot.$id).catch(noop)
await unlink(backupFullPath).catch(noop)
await handler.unlink(backupFullPath).catch(noop)
throw error
}
@ -266,8 +260,11 @@ export default class {
}
}
async _mergeDeltaVdiBackups ({path, depth}) {
const backups = await this._listVdiBackups(path)
async _mergeDeltaVdiBackups ({handler, dir, depth}) {
if (handler.type === 'smb') {
throw new Error('VDI merging is not available through SMB')
}
const backups = await this._listVdiBackups(handler, dir)
let i = backups.length - depth
// No merge.
@ -279,36 +276,37 @@ export default class {
const vhdUtil = `${__dirname}/../../bin/vhd-util`
for (; i > 0 && isDeltaVdiBackup(backups[i]); i--) {
const backup = `${path}/${backups[i]}`
const parent = `${path}/${backups[i - 1]}`
const backup = `${dir}/${backups[i]}`
const parent = `${dir}/${backups[i - 1]}`
try {
await execa(vhdUtil, ['modify', '-n', backup, '-p', parent])
await execa(vhdUtil, ['coalesce', '-n', backup])
await execa(vhdUtil, ['modify', '-n', `${handler.path}/${backup}`, '-p', `${handler.path}/${parent}`]) // FIXME not ok at least with smb remotes
await execa(vhdUtil, ['coalesce', '-n', `${handler.path}/${backup}`]) // FIXME not ok at least with smb remotes
} catch (e) {
console.error('Unable to use vhd-util.', e)
throw e
}
await unlink(backup)
await handler.unlink(backup)
}
// The base was removed: do two or more full backups exist?
// => Remove old backups before the most recent full.
if (i > 0) {
for (i--; i >= 0; i--) {
await unlink(`${path}/${backups[i]}`)
await handler.unlink(`${dir}/${backups[i]}`)
}
return
}
// Rename the first old full backup to the new full backup.
await rename(`${path}/${backups[0]}`, `${path}/${newFull}`)
await handler.rename(`${dir}/${backups[0]}`, `${dir}/${newFull}`)
}
async _importVdiBackupContent (xapi, file, vdiId) {
async _importVdiBackupContent (xapi, handler, file, vdiId) {
const stream = await this._openAndwaitReadableFile(
handler,
file,
'VDI to import not found in this remote'
)
@ -319,11 +317,14 @@ export default class {
}
async importDeltaVdiBackup ({vdi, remoteId, filePath}) {
const remote = await this._xo.getRemote(remoteId)
const path = dirname(`${remote.path}/${filePath}`)
const handler = await this._xo.getRemoteHandler(remoteId)
return this._importDeltaVdiBackup(vdi, handler, filePath)
}
async _importDeltaVdiBackup (vdi, handler, filePath) {
const dir = dirname(filePath)
const filename = basename(filePath)
const backups = await this._listVdiBackups(path)
const backups = await this._listVdiBackups(handler, dir)
// Search file. (delta or full backup)
const i = findIndex(backups, backup =>
@ -347,34 +348,44 @@ export default class {
const xapi = this._xo.getXapi(vdi)
for (; j <= i; j++) {
await this._importVdiBackupContent(xapi, `${path}/${backups[j]}`, vdi._xapiId)
await this._importVdiBackupContent(xapi, handler, `${dir}/${backups[j]}`, vdi._xapiId)
}
}
// -----------------------------------------------------------------
async _listDeltaVmBackups (path) {
const files = await readdir(path)
async _listDeltaVmBackups (handler, dir) {
const files = await handler.list(dir)
return await sortBy(filter(files, (fileName) => /^\d+T\d+Z_.*\.(?:xva|json)$/.test(fileName)))
}
async _failedRollingDeltaVmBackup (xapi, path, fulFilledVdiBackups) {
async _failedRollingDeltaVmBackup (xapi, handler, dir, fulFilledVdiBackups) {
await Promise.all(
mapToArray(fulFilledVdiBackups, async vdiBackup => {
const { newBaseId, backupDirectory, vdiFilename } = vdiBackup.value()
await xapi.deleteVdi(newBaseId)
await unlink(`${path}/${backupDirectory}/${vdiFilename}`).catch(noop)
await handler.unlink(`${dir}/${backupDirectory}/${vdiFilename}`).catch(noop)
})
)
}
async rollingDeltaVmBackup ({vm, remoteId, tag, depth}) {
const remote = await this._xo.getRemote(remoteId)
const directory = `vm_delta_${tag}_${vm.uuid}`
const path = `${remote.path}/${directory}`
await ensureDir(path)
if (!remote) {
throw new Error(`No such Remote ${remoteId}`)
}
if (!remote.enabled) {
throw new Error(`Remote ${remoteId} is disabled`)
}
const handler = await this._xo.getRemoteHandler(remote)
if (handler.type === 'smb') {
throw new Error('Delta Backup is not supported for smb remotes')
}
const dir = `vm_delta_${tag}_${vm.uuid}`
const info = {
vbds: [],
@ -408,7 +419,7 @@ export default class {
if (!info.vdis[vdiUUID]) {
info.vdis[vdiUUID] = { ...vdi }
promises.push(
this._deltaVdiBackup({vdi: vdiXo, path, depth}).then(
this._deltaVdiBackup({handler, vdi: vdiXo, dir, depth}).then(
vdiBackup => {
const { backupDirectory, vdiFilename } = vdiBackup
info.vdis[vdiUUID].xoPath = `${backupDirectory}/${vdiFilename}`
@ -435,29 +446,29 @@ export default class {
}
if (fail) {
console.error(`Remove successful backups in ${path}`, fulFilledVdiBackups)
await this._failedRollingDeltaVmBackup(xapi, path, fulFilledVdiBackups)
console.error(`Remove successful backups in ${handler.path}/${dir}`, fulFilledVdiBackups)
await this._failedRollingDeltaVmBackup(xapi, handler, dir, fulFilledVdiBackups)
throw new Error('Rolling delta vm backup failed.')
}
const backups = await this._listDeltaVmBackups(path)
const backups = await this._listDeltaVmBackups(handler, dir)
const date = safeDateFormat(new Date())
const backupFormat = `${date}_${vm.name_label}`
const xvaPath = `${path}/${backupFormat}.xva`
const infoPath = `${path}/${backupFormat}.json`
const xvaPath = `${dir}/${backupFormat}.xva`
const infoPath = `${dir}/${backupFormat}.json`
try {
await Promise.all([
this.backupVm({vm, pathToFile: xvaPath, onlyMetadata: true}),
writeFile(infoPath, JSON.stringify(info), {flag: 'wx'})
this._backupVm(vm, handler, xvaPath, {onlyMetadata: true}),
handler.outputFile(infoPath, JSON.stringify(info), {flag: 'wx'})
])
} catch (e) {
await Promise.all([
unlink(xvaPath).catch(noop),
unlink(infoPath).catch(noop),
this._failedRollingDeltaVmBackup(xapi, path, fulFilledVdiBackups)
handler.unlink(xvaPath).catch(noop),
handler.unlink(infoPath).catch(noop),
this._failedRollingDeltaVmBackup(xapi, handler, dir, fulFilledVdiBackups)
])
throw e
@ -467,12 +478,12 @@ export default class {
await Promise.all(
mapToArray(vdiBackups, vdiBackup => {
const { backupDirectory } = vdiBackup.value()
return this._mergeDeltaVdiBackups({path: `${path}/${backupDirectory}`, depth})
return this._mergeDeltaVdiBackups({handler, dir: `${dir}/${backupDirectory}`, depth})
})
)
// Remove x2 files : json AND xva files.
await this._removeOldBackups(backups, path, backups.length - (depth - 1) * 2)
await this._removeOldBackups(backups, handler, dir, backups.length - (depth - 1) * 2)
// Remove old vdi bases.
Promise.all(
@ -486,37 +497,37 @@ export default class {
).catch(noop)
// Returns relative path.
return `${directory}/${backupFormat}`
return `${dir}/${backupFormat}`
}
async _importVmMetadata (xapi, file) {
async _importVmMetadata (xapi, handler, file) {
const stream = await this._openAndwaitReadableFile(
handler,
file,
'VM metadata to import not found in this remote'
)
return await xapi.importVm(stream, { onlyMetadata: true })
}
async _importDeltaVdiBackupFromVm (xapi, vmId, remoteId, directory, vdiInfo) {
async _importDeltaVdiBackupFromVm (xapi, vmId, handler, directory, vdiInfo) {
const vdi = await xapi.createVdi(vdiInfo.virtual_size, vdiInfo)
const vdiId = vdi.$id
await this.importDeltaVdiBackup({
vdi: this._xo.getObject(vdiId),
remoteId,
filePath: `${directory}/${vdiInfo.xoPath}`
})
await this._importDeltaVdiBackup(
this._xo.getObject(vdiId),
handler,
`${directory}/${vdiInfo.xoPath}`
)
return vdiId
}
async importDeltaVmBackup ({sr, remoteId, filePath}) {
const remote = await this._xo.getRemote(remoteId)
const fullBackupPath = `${remote.path}/${filePath}`
const handler = await this._xo.getRemoteHandler(remoteId)
const xapi = this._xo.getXapi(sr)
// Import vm metadata.
const vm = await this._importVmMetadata(xapi, `${fullBackupPath}.xva`)
const vm = await this._importVmMetadata(xapi, handler, `${filePath}.xva`)
const vmName = vm.name_label
// Disable start and change the VM name label during import.
@ -529,7 +540,7 @@ export default class {
// Because XenServer creates VBDs linked to the VDIs of the backup VM if it exists.
await xapi.destroyVbdsFromVm(vm.uuid)
const info = JSON.parse(await readFile(`${fullBackupPath}.json`))
const info = JSON.parse(await handler.readFile(`${filePath}.json`))
// Import VDIs.
const vdiIds = {}
@ -565,8 +576,22 @@ export default class {
// -----------------------------------------------------------------
async backupVm ({vm, pathToFile, compress, onlyMetadata}) {
const targetStream = createWriteStream(pathToFile, { flags: 'wx' })
async backupVm ({vm, remoteId, file, compress, onlyMetadata}) {
const remote = await this._xo.getRemote(remoteId)
if (!remote) {
throw new Error(`No such Remote ${remoteId}`)
}
if (!remote.enabled) {
throw new Error(`Backup remote ${remoteId} is disabled`)
}
const handler = await this._xo.getRemoteHandler(remote)
return this._backupVm(vm, handler, file, {compress, onlyMetadata})
}
async _backupVm (vm, handler, file, {compress, onlyMetadata}) {
const targetStream = await handler.createOutputStream(file, { flags: 'wx' })
const promise = eventToPromise(targetStream, 'finish')
const sourceStream = await this._xo.getXapi(vm).exportVm(vm._xapiId, {
@ -578,26 +603,28 @@ export default class {
await promise
}
async rollingBackupVm ({vm, path, tag, depth, compress, onlyMetadata}) {
await ensureDir(path)
const files = await readdir(path)
async rollingBackupVm ({vm, remoteId, tag, depth, compress, onlyMetadata}) {
const remote = await this._xo.getRemote(remoteId)
if (!remote) {
throw new Error(`No such Remote ${remoteId}`)
}
if (!remote.enabled) {
throw new Error(`Backup remote ${remoteId} is disabled`)
}
const handler = await this._xo.getRemoteHandler(remote)
const files = await handler.list()
const reg = new RegExp('^[^_]+_' + escapeStringRegexp(`${tag}_${vm.name_label}.xva`))
const backups = sortBy(filter(files, (fileName) => reg.test(fileName)))
const date = safeDateFormat(new Date())
const backupFullPath = `${path}/${date}_${tag}_${vm.name_label}.xva`
const file = `${date}_${tag}_${vm.name_label}.xva`
await this.backupVm({vm, pathToFile: backupFullPath, compress, onlyMetadata})
const promises = []
for (let surplus = backups.length - (depth - 1); surplus > 0; surplus--) {
const oldBackup = backups.shift()
promises.push(unlink(`${path}/${oldBackup}`))
}
await Promise.all(promises)
return backupFullPath
await this._backupVm(vm, handler, file, {compress, onlyMetadata})
await this._removeOldBackups(backups, handler, undefined, backups.length - (depth - 1))
}
async rollingSnapshotVm (vm, tag, depth) {

View File

@ -1,14 +1,15 @@
import startsWith from 'lodash.startswith'
import RemoteHandler from '../remote-handler'
import { Remotes } from '../models/remote'
import RemoteHandlerLocal from '../remote-handlers/local'
import RemoteHandlerNfs from '../remote-handlers/nfs'
import RemoteHandlerSmb from '../remote-handlers/smb'
import {
forEach
} from '../utils'
import {
NoSuchObject
} from '../api-errors'
import {
forEach,
mapToArray
} from '../utils'
Remotes
} from '../models/remote'
// ===================================================================
@ -29,33 +30,30 @@ export default class {
})
xo.on('start', async () => {
// TODO: Should it be private?
this.remoteHandler = new RemoteHandler()
await this.initRemotes()
await this.syncAllRemotes()
})
xo.on('stop', () => this.disableAllRemotes())
xo.on('stop', () => this.forgetAllRemotes())
}
_developRemote (remote) {
const _remote = { ...remote }
if (startsWith(_remote.url, 'file://')) {
_remote.type = 'local'
_remote.path = _remote.url.slice(6)
} else if (startsWith(_remote.url, 'nfs://')) {
_remote.type = 'nfs'
const url = _remote.url.slice(6)
const [host, share] = url.split(':')
_remote.path = '/tmp/xo-server/mounts/' + _remote.id
_remote.host = host
_remote.share = share
async getRemoteHandler (remote) {
if (typeof remote === 'string') {
remote = await this.getRemote(remote)
}
return _remote
const Handler = {
file: RemoteHandlerLocal,
smb: RemoteHandlerSmb,
nfs: RemoteHandlerNfs
}
const type = remote.url.split('://')[0]
if (!Handler[type]) {
throw new Error('Unhandled remote type')
}
return new Handler[type](remote)
}
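// Hedged examples of the dispatch above: the file:// and nfs://host:path forms
// match what the removed _developRemote() used to parse; the exact SMB URL
// syntax is not shown in this commit and is left to xo-remote-parser.
//   'file:///var/lib/xoa-backups'      -> RemoteHandlerLocal
//   'nfs://nas.example.org:/export/xo' -> RemoteHandlerNfs
//   'smb://…'                          -> RemoteHandlerSmb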
async getAllRemotes () {
return mapToArray(await this._remotes.get(), this._developRemote)
return this._remotes.get()
}
async _getRemote (id) {
@ -68,7 +66,7 @@ export default class {
}
async getRemote (id) {
return this._developRemote((await this._getRemote(id)).properties)
return (await this._getRemote(id)).properties
}
async createRemote ({name, url}) {
@ -79,9 +77,10 @@ export default class {
async updateRemote (id, {name, url, enabled, error}) {
const remote = await this._getRemote(id)
this._updateRemote(remote, {name, url, enabled, error})
const props = await this.remoteHandler.sync(this._developRemote(remote.properties))
const handler = await this.getRemoteHandler(remote.properties)
const props = await handler.sync()
this._updateRemote(remote, props)
return await this._developRemote(this._remotes.save(remote).properties)
return (await this._remotes.save(remote)).properties
}
_updateRemote (remote, {name, url, enabled, error}) {
@ -96,8 +95,8 @@ export default class {
}
async removeRemote (id) {
const remote = await this.getRemote(id)
await this.remoteHandler.forget(remote)
const handler = await this.getRemoteHandler(id)
await handler.forget()
await this._remotes.remove(id)
}
@ -110,9 +109,13 @@ export default class {
}
// TODO: Should it be private?
async disableAllRemotes () {
async forgetAllRemotes () {
const remotes = await this.getAllRemotes()
this.remoteHandler.disableAll(remotes)
for (let remote of remotes) {
try {
(await this.getRemoteHandler(remote)).forget()
} catch (_) {}
}
}
// TODO: Should it be private?