Remote handlers refactored, and adding an SMB handler
parent 6c2f48181c
commit aad4ebf287
@@ -34,6 +34,7 @@
    "node": ">=0.12 <5"
  },
  "dependencies": {
    "@marsaud/smb2-promise": "^0.1.0",
    "app-conf": "^0.4.0",
    "babel-runtime": "^5",
    "base64url": "^1.0.5",
@@ -96,6 +97,7 @@
    "lodash.sortby": "^3.1.4",
    "lodash.startswith": "^3.0.1",
    "loud-rejection": "^1.2.0",
    "lodash.trim": "^3.0.1",
    "make-error": "^1",
    "micromatch": "^2.3.2",
    "minimist": "^1.2.0",
@@ -493,6 +493,11 @@ exports.snapshot = snapshot
#---------------------------------------------------------------------

rollingDeltaBackup = $coroutine ({vm, remote, tag, depth}) ->
  _remote = yield @getRemote remote
  if not _remote?.path?
    throw new Error "No such Remote #{remote}"
  if not _remote.enabled
    throw new Error "Backup remote #{remote} is disabled"
  return yield @rollingDeltaVmBackup({
    vm,
    remoteId: remote,
@@ -572,12 +577,18 @@ exports.rollingSnapshot = rollingSnapshot

#---------------------------------------------------------------------

backup = $coroutine ({vm, pathToFile, compress, onlyMetadata}) ->
  yield @backupVm({vm, pathToFile, compress, onlyMetadata})
backup = $coroutine ({vm, remoteId, file, compress, onlyMetadata}) ->
  remote = yield @getRemote remoteId
  if not remote?.path?
    throw new Error "No such Remote #{remoteId}"
  if not remote.enabled
    throw new Error "Backup remote #{remoteId} is disabled"
  yield @backupVm({vm, remoteId, file, compress, onlyMetadata})

backup.params = {
  id: { type: 'string' }
  pathToFile: { type: 'string' }
  id: {type: 'string'}
  remoteId: { type: 'string' }
  file: { type: 'string' }
  compress: { type: 'boolean', optional: true }
  onlyMetadata: { type: 'boolean', optional: true }
}
@@ -622,7 +633,7 @@ rollingBackup = $coroutine ({vm, remoteId, tag, depth, compress, onlyMetadata})
    throw new Error "Backup remote #{remoteId} is disabled"
  return yield @rollingBackupVm({
    vm,
    path: remote.path,
    remoteId,
    tag,
    depth,
    compress,
@@ -1,140 +0,0 @@
import filter from 'lodash.filter'
import fs from 'fs-promise'
import {exec} from 'child_process'

import {
  forEach,
  noop,
  promisify
} from './utils'

const execAsync = promisify(exec)

class NfsMounter {
  async _loadRealMounts () {
    let stdout
    try {
      [stdout] = await execAsync('findmnt -P -t nfs,nfs4 --output SOURCE,TARGET --noheadings')
    } catch (exc) {
      // When no mounts are found, the call pretends to fail...
    }
    const mounted = {}
    if (stdout) {
      const regex = /^SOURCE="([^:]*):(.*)" TARGET="(.*)"$/
      forEach(stdout.split('\n'), m => {
        if (m) {
          const match = regex.exec(m)
          mounted[match[3]] = {
            host: match[1],
            share: match[2]
          }
        }
      })
    }
    this._realMounts = mounted
    return mounted
  }

  _fullPath (path) {
    return path
  }

  _matchesRealMount (mount) {
    return this._fullPath(mount.path) in this._realMounts
  }

  async _mount (mount) {
    const path = this._fullPath(mount.path)
    await fs.ensureDir(path)
    return await execAsync(`mount -t nfs ${mount.host}:${mount.share} ${path}`)
  }

  async forget (mount) {
    try {
      await this._umount(mount)
    } catch (_) {
      // We have to go on...
    }
  }

  async _umount (mount) {
    const path = this._fullPath(mount.path)
    await execAsync(`umount ${path}`)
  }

  async sync (mount) {
    await this._loadRealMounts()
    if (this._matchesRealMount(mount) && !mount.enabled) {
      try {
        await this._umount(mount)
      } catch (exc) {
        mount.enabled = true
        mount.error = exc.message
      }
    } else if (!this._matchesRealMount(mount) && mount.enabled) {
      try {
        await this._mount(mount)
      } catch (exc) {
        mount.enabled = false
        mount.error = exc.message
      }
    }
    return mount
  }

  async disableAll (mounts) {
    await this._loadRealMounts()
    forEach(mounts, async mount => {
      if (this._matchesRealMount(mount)) {
        try {
          await this._umount(mount)
        } catch (_) {
          // We have to go on...
        }
      }
    })
  }
}

class LocalHandler {
  constructor () {
    this.forget = noop
    this.disableAll = noop
  }

  async sync (local) {
    if (local.enabled) {
      try {
        await fs.ensureDir(local.path)
        await fs.access(local.path, fs.R_OK | fs.W_OK)
      } catch (exc) {
        local.enabled = false
        local.error = exc.message
      }
    }
    return local
  }
}

export default class RemoteHandler {
  constructor () {
    this.handlers = {
      nfs: new NfsMounter(),
      local: new LocalHandler()
    }
  }

  async sync (remote) {
    return await this.handlers[remote.type].sync(remote)
  }

  async forget (remote) {
    return await this.handlers[remote.type].forget(remote)
  }

  async disableAll (remotes) {
    const promises = []
    forEach(['local', 'nfs'], type => promises.push(this.handlers[type].disableAll(filter(remotes, remote => remote.type === type))))
    await Promise.all(promises)
  }
}
src/remote-handlers/abstract.js (new file, 49 lines)
@@ -0,0 +1,49 @@
export default class RemoteHandlerAbstract {
  constructor (remote) {
    this._remote = remote
  }

  set (remote) {
    this._remote = remote
  }

  async sync () {
    throw new Error('Not implemented')
  }

  async forget () {
    throw new Error('Not implemented')
  }

  async outputFile (file, data, options) {
    throw new Error('Not implemented')
  }

  async readFile (file, options) {
    throw new Error('Not implemented')
  }

  async rename (oldPath, newPath) {
    throw new Error('Not implemented')
  }

  async list (dir = undefined) {
    throw new Error('Not implemented')
  }

  async createReadStream (file) {
    throw new Error('Not implemented')
  }

  async createOutputStream (file) {
    throw new Error('Not implemented')
  }

  async unlink (file) {
    throw new Error('Not implemented')
  }

  async getSize (file) {
    throw new Error('Not implemented')
  }
}
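
The abstract class above only fixes the handler contract; each concrete handler overrides the file operations. As a minimal sketch (not part of this commit), a hypothetical in-memory handler shows how the constructor/set contract is meant to be specialized, for example in tests:

import RemoteHandlerAbstract from './abstract'

// Hypothetical test double: keeps "files" in a plain object instead of touching a real remote.
class MemoryHandler extends RemoteHandlerAbstract {
  constructor (remote) {
    super(remote)
    this._files = {}
  }

  async sync () {
    // Nothing to mount or check: report the remote unchanged.
    return this._remote
  }

  async forget () {}

  async outputFile (file, data) {
    this._files[file] = data
  }

  async readFile (file) {
    return this._files[file]
  }

  async list () {
    return Object.keys(this._files)
  }
}
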
src/remote-handlers/local.js (new file, 70 lines)
@@ -0,0 +1,70 @@
import fs from 'fs-promise'
import RemoteHandlerAbstract from './abstract'
import {dirname} from 'path'
import {noop} from '../utils'

export default class LocalHandler extends RemoteHandlerAbstract {
  constructor (remote) {
    super(remote)
    this.forget = noop
  }

  _getFilePath (file) {
    const parts = [this._remote.path]
    if (file) {
      parts.push(file)
    }
    return parts.join('/')
  }

  async sync () {
    if (this._remote.enabled) {
      try {
        await fs.ensureDir(this._remote.path)
        await fs.access(this._remote.path, fs.R_OK | fs.W_OK)
      } catch (exc) {
        this._remote.enabled = false
        this._remote.error = exc.message
      }
    }
    return this._remote
  }

  async outputFile (file, data, options) {
    const path = this._getFilePath(file)
    await fs.ensureDir(dirname(path))
    await fs.writeFile(path, data, options)
  }

  async readFile (file, options) {
    return await fs.readFile(this._getFilePath(file), options)
  }

  async rename (oldPath, newPath) {
    return await fs.rename(this._getFilePath(oldPath), this._getFilePath(newPath))
  }

  async list (dir = undefined) {
    return await fs.readdir(this._getFilePath(dir))
  }

  async createReadStream (file) {
    return fs.createReadStream(this._getFilePath(file))
  }

  async createOutputStream (file, options) {
    const path = this._getFilePath(file)
    await fs.ensureDir(dirname(path))
    return fs.createWriteStream(path, options)
  }

  async unlink (file) {
    return fs.unlink(this._getFilePath(file))
  }

  async getSize (file) {
    const stats = await fs.stat(this._getFilePath(file))
    return stats.size
  }

}
src/remote-handlers/nfs.js (new file, 119 lines)
@@ -0,0 +1,119 @@
import fs from 'fs-promise'
import RemoteHandlerAbstract from './abstract'
import {dirname} from 'path'
import {exec} from 'child_process'
import {forEach, promisify} from '../utils'

const execAsync = promisify(exec)

export default class NfsHandler extends RemoteHandlerAbstract {
  _getFilePath (file) {
    const parts = [this._remote.path]
    if (file) {
      parts.push(file)
    }
    return parts.join('/')
  }

  async _loadRealMounts () {
    let stdout
    try {
      [stdout] = await execAsync('findmnt -P -t nfs,nfs4 --output SOURCE,TARGET --noheadings')
    } catch (exc) {
      // When no mounts are found, the call pretends to fail...
    }
    const mounted = {}
    if (stdout) {
      const regex = /^SOURCE="([^:]*):(.*)" TARGET="(.*)"$/
      forEach(stdout.split('\n'), m => {
        if (m) {
          const match = regex.exec(m)
          mounted[match[3]] = {
            host: match[1],
            share: match[2]
          }
        }
      })
    }
    this._realMounts = mounted
    return mounted
  }

  _matchesRealMount (remote) {
    return remote.path in this._realMounts
  }

  async _mount (remote) {
    await fs.ensureDir(remote.path)
    return await execAsync(`mount -t nfs ${remote.host}:${remote.share} ${remote.path}`)
  }

  async sync () {
    await this._loadRealMounts()
    if (this._matchesRealMount(this._remote) && !this._remote.enabled) {
      try {
        await this._umount(this._remote)
      } catch (exc) {
        this._remote.enabled = true
        this._remote.error = exc.message
      }
    } else if (!this._matchesRealMount(this._remote) && this._remote.enabled) {
      try {
        await this._mount(this._remote)
      } catch (exc) {
        this._remote.enabled = false
        this._remote.error = exc.message
      }
    }
    return this._remote
  }

  async forget () {
    try {
      await this._umount(this._remote)
    } catch (_) {
      // We have to go on...
    }
  }

  async _umount (remote) {
    await execAsync(`umount ${remote.path}`)
  }

  async outputFile (file, data, options) {
    const path = this._getFilePath(file)
    await fs.ensureDir(dirname(path))
    await fs.writeFile(path, data, options)
  }

  async readFile (file, options) {
    return await fs.readFile(this._getFilePath(file), options)
  }

  async rename (oldPath, newPath) {
    return await fs.rename(this._getFilePath(oldPath), this._getFilePath(newPath))
  }

  async list (dir = undefined) {
    return await fs.readdir(this._getFilePath(dir))
  }

  async createReadStream (file) {
    return fs.createReadStream(this._getFilePath(file))
  }

  async createOutputStream (file, options) {
    const path = this._getFilePath(file)
    await fs.ensureDir(dirname(path))
    return fs.createWriteStream(path, options)
  }

  async unlink (file) {
    return fs.unlink(this._getFilePath(file))
  }

  async getSize (file) {
    const stats = await fs.stat(this._getFilePath(file))
    return stats.size
  }
}
src/remote-handlers/smb.js (new file, 134 lines)
@@ -0,0 +1,134 @@
import Smb2 from '@marsaud/smb2-promise'
import {noop} from '../utils'
import RemoteHandlerAbstract from './abstract'

export default class SmbHandler extends RemoteHandlerAbstract {
  constructor (remote) {
    super(remote)
    this.forget = noop
  }

  _getClient (remote) {
    return new Smb2({
      share: `\\\\${remote.host}`,
      domain: remote.domain,
      username: remote.username,
      password: remote.password,
      autoCloseTimeout: 0
    })
  }

  _getFilePath (file) {
    const parts = []
    if (this._remote.path !== '') {
      parts.push(this._remote.path)
    }
    if (file) {
      parts.push(...file.split('/'))
    }
    return parts.join('\\')
  }

  _getDirname (file) {
    const parts = file.split('\\')
    parts.pop()
    return parts.join('\\')
  }

  async sync () {
    if (this._remote.enabled) {
      try {
        // Check access (smb2 does not expose connect in public so far...)
        await this.list()
      } catch (error) {
        this._remote.enabled = false
        this._remote.error = error.message
      }
    }
    return this._remote
  }

  async outputFile (file, data, options) {
    const client = this._getClient(this._remote)
    const path = this._getFilePath(file)
    const dir = this._getDirname(path)
    try {
      if (dir) {
        await client.ensureDir(dir)
      }
      return await client.writeFile(path, data, options)
    } finally {
      client.close()
    }
  }

  async readFile (file, options) {
    const client = this._getClient(this._remote)
    try {
      return await client.readFile(this._getFilePath(file), options)
    } finally {
      client.close()
    }
  }

  async rename (oldPath, newPath) {
    const client = this._getClient(this._remote)
    try {
      return await client.rename(this._getFilePath(oldPath), this._getFilePath(newPath))
    } finally {
      client.close()
    }
  }

  async list (dir = undefined) {
    const client = this._getClient(this._remote)
    try {
      return await client.readdir(this._getFilePath(dir))
    } finally {
      client.close()
    }
  }

  async createReadStream (file) {
    const client = this._getClient(this._remote)
    const stream = await client.createReadStream(this._getFilePath(file))
    stream.on('end', () => client.close())
    return stream
  }

  async createOutputStream (file, options) {
    const client = this._getClient(this._remote)
    const path = this._getFilePath(file)
    const dir = this._getDirname(path)
    let stream
    try {
      if (dir) {
        await client.ensureDir(dir)
      }
      stream = await client.createWriteStream(path, options/* , { flags: 'wx' }*/) // TODO ensure that wx flag is properly handled by @marsaud/smb2
    } catch (err) {
      client.close()
      throw err
    }
    stream.on('finish', () => client.close())
    return stream
  }

  async unlink (file) {
    const client = this._getClient(this._remote)
    try {
      return await client.unlink(this._getFilePath(file))
    } finally {
      client.close()
    }
  }

  async getSize (file) {
    const client = await this._getClient(this._remote)
    try {
      return await client.getSize(this._getFilePath(file))
    } finally {
      client.close()
    }
  }
}
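
The SMB handler opens a short-lived SMB2 connection per operation and closes it when the call (or stream) finishes. A brief usage sketch (not part of the commit, all values hypothetical) assuming a remote record with the fields produced by _developRemote in the remotes mixin further down:

import SmbHandler from '../remote-handlers/smb'

// Hypothetical SMB remote record (normally built by _developRemote from an smb:// URL).
const remote = {
  type: 'smb',
  host: 'fileserver\\backups',  // hypothetical server\share pair; _getClient prefixes it with two backslashes
  path: '',                     // optional sub-directory inside the share
  domain: 'CORP',
  username: 'xo',
  password: 'secret',
  enabled: true
}

async function demo () {
  const handler = new SmbHandler(remote)
  await handler.outputFile('test/hello.txt', 'hello') // '/' segments become '\' on the share
  return handler.list()                               // each call uses its own client
}
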
@@ -6,17 +6,6 @@ import filter from 'lodash.filter'
import findIndex from 'lodash.findindex'
import sortBy from 'lodash.sortby'
import startsWith from 'lodash.startswith'
import {
  createReadStream,
  createWriteStream,
  ensureDir,
  readdir,
  readFile,
  rename,
  stat,
  unlink,
  writeFile
} from 'fs-promise'
import {
  basename,
  dirname
@@ -60,19 +49,19 @@ export default class {

  async listRemoteBackups (remoteId) {
    const remote = await this._xo.getRemote(remoteId)
    const path = remote.path
    const handler = this._xo.getRemoteHandler(remote)

    // List backups. (Except delta backups)
    const xvaFilter = file => endsWith(file, '.xva')

    const files = await readdir(path)
    const files = await handler.list()
    const backups = filter(files, xvaFilter)

    // List delta backups.
    const deltaDirs = filter(files, file => startsWith(file, 'vm_delta_'))

    for (const deltaDir of deltaDirs) {
      const files = await readdir(`${path}/${deltaDir}`)
      const files = await handler.list(deltaDir)
      const deltaBackups = filter(files, xvaFilter)

      backups.push(...mapToArray(
@@ -84,11 +73,12 @@ export default class {
    return backups
  }

  // TODO: move into utils and rename!
  async _openAndwaitReadableFile (path, errorMessage) {
    const stream = createReadStream(path)

  // TODO: move into utils and rename! NO, until we may pass a handler instead of a remote...?
  async _openAndwaitReadableFile (remote, file, errorMessage) {
    const handler = this._xo.getRemoteHandler(remote)
    let stream
    try {
      stream = await handler.createReadStream(file)
      await eventToPromise(stream, 'readable')
    } catch (error) {
      if (error.code === 'ENOENT') {
@@ -97,16 +87,15 @@ export default class {
      throw error
    }

    stream.length = (await stat(path)).size

    stream.length = await handler.getSize(file)
    return stream
  }

  async importVmBackup (remoteId, file, sr) {
    const remote = await this._xo.getRemote(remoteId)
    const path = `${remote.path}/${file}`
    const stream = await this._openAndwaitReadableFile(
      path,
      remote,
      file,
      'VM to import not found in this remote'
    )

@@ -179,39 +168,50 @@ export default class {

  // TODO: The other backup methods must use this function !
  // Prerequisite: The backups array must be ordered. (old to new backups)
  async _removeOldBackups (backups, path, n) {
  async _removeOldBackups (backups, remote, dir, n) {
    if (n <= 0) {
      return
    }
    const handler = this._xo.getRemoteHandler(remote)
    const getPath = (file, dir) => dir ? `${dir}/${file}` : file

    await Promise.all(
      mapToArray(backups.slice(0, n), backup => unlink(`${path}/${backup}`))
      mapToArray(backups.slice(0, n), async backup => await handler.unlink(getPath(backup, dir)))
    )
  }

  // -----------------------------------------------------------------

  async _listVdiBackups (path) {
    const files = await readdir(path)
  async _listVdiBackups (remote, dir) {
    const handler = this._xo.getRemoteHandler(remote)
    let files
    try {
      files = await handler.list(dir)
    } catch (error) {
      if (error.code === 'ENOENT') {
        files = []
      } else {
        throw error
      }
    }
    const backups = sortBy(filter(files, fileName => isVdiBackup(fileName)))
    let i

    // Avoid unstable state: No full vdi found to the beginning of array. (base)
    for (i = 0; i < backups.length && isDeltaVdiBackup(backups[i]); i++);
    await this._removeOldBackups(backups, path, i)
    await this._removeOldBackups(backups, remote, dir, i)

    return backups.slice(i)
  }

  async _deltaVdiBackup ({vdi, path, depth}) {
  async _deltaVdiBackup ({vdi, remote, dir, depth}) {
    const xapi = this._xo.getXapi(vdi)
    const backupDirectory = `vdi_${vdi.uuid}`

    vdi = xapi.getObject(vdi._xapiId)
    path = `${path}/${backupDirectory}`
    await ensureDir(path)
    dir = `${dir}/${backupDirectory}`

    const backups = await this._listVdiBackups(path)
    const backups = await this._listVdiBackups(remote, dir)

    // Make snapshot.
    const date = safeDateFormat(new Date())
@@ -234,15 +234,16 @@ export default class {

    // Export full or delta backup.
    const vdiFilename = `${date}_${isFull ? 'full' : 'delta'}.vhd`
    const backupFullPath = `${path}/${vdiFilename}`
    const backupFullPath = `${dir}/${vdiFilename}`

    const handler = this._xo.getRemoteHandler(remote)
    try {
      const sourceStream = await xapi.exportVdi(currentSnapshot.$id, {
        baseId: isFull ? undefined : base.$id,
        format: VDI_FORMAT_VHD
      })

      const targetStream = createWriteStream(backupFullPath, { flags: 'wx' })
      const targetStream = await handler.createOutputStream(backupFullPath, { flags: 'wx' })

      sourceStream.on('error', error => targetStream.emit('error', error))
      await Promise.all([
@@ -252,8 +253,7 @@ export default class {
    } catch (error) {
      // Remove new backup. (corrupt) and delete new vdi base.
      xapi.deleteVdi(currentSnapshot.$id).catch(noop)
      await unlink(backupFullPath).catch(noop)

      await handler.unlink(backupFullPath).catch(noop)
      throw error
    }

@@ -266,8 +266,8 @@ export default class {
    }
  }

  async _mergeDeltaVdiBackups ({path, depth}) {
    const backups = await this._listVdiBackups(path)
  async _mergeDeltaVdiBackups ({remote, dir, depth}) {
    const backups = await this._listVdiBackups(remote, dir)
    let i = backups.length - depth

    // No merge.
@@ -278,37 +278,39 @@ export default class {
    const newFull = `${getVdiTimestamp(backups[i])}_full.vhd`
    const vhdUtil = `${__dirname}/../../bin/vhd-util`

    const handler = this._xo.getRemoteHandler(remote)
    for (; i > 0 && isDeltaVdiBackup(backups[i]); i--) {
      const backup = `${path}/${backups[i]}`
      const parent = `${path}/${backups[i - 1]}`
      const backup = `${dir}/${backups[i]}`
      const parent = `${dir}/${backups[i - 1]}`

      try {
        await execa(vhdUtil, ['modify', '-n', backup, '-p', parent])
        await execa(vhdUtil, ['coalesce', '-n', backup])
        await execa(vhdUtil, ['modify', '-n', `${remote.path}/${backup}`, '-p', `${remote.path}/${parent}`]) // FIXME not ok at least with smb remotes
        await execa(vhdUtil, ['coalesce', '-n', `${remote.path}/${backup}`]) // FIXME not ok at least with smb remotes
      } catch (e) {
        console.error('Unable to use vhd-util.', e)
        throw e
      }

      await unlink(backup)
      await handler.unlink(backup)
    }

    // The base was removed, it exists two full backups or more ?
    // => Remove old backups before the most recent full.
    if (i > 0) {
      for (i--; i >= 0; i--) {
        await unlink(`${path}/${backups[i]}`)
        await handler.unlink(`${dir}/${backups[i]}`)
      }

      return
    }

    // Rename the first old full backup to the new full backup.
    await rename(`${path}/${backups[0]}`, `${path}/${newFull}`)
    await handler.rename(`${dir}/${backups[0]}`, `${dir}/${newFull}`)
  }

  async _importVdiBackupContent (xapi, file, vdiId) {
  async _importVdiBackupContent (xapi, remote, file, vdiId) {
    const stream = await this._openAndwaitReadableFile(
      remote,
      file,
      'VDI to import not found in this remote'
    )
@@ -320,10 +322,10 @@ export default class {

  async importDeltaVdiBackup ({vdi, remoteId, filePath}) {
    const remote = await this._xo.getRemote(remoteId)
    const path = dirname(`${remote.path}/${filePath}`)

    const dir = dirname(filePath)
    const filename = basename(filePath)
    const backups = await this._listVdiBackups(path)
    const backups = await this._listVdiBackups(remote, dir)

    // Search file. (delta or full backup)
    const i = findIndex(backups, backup =>
@@ -347,34 +349,33 @@ export default class {
    const xapi = this._xo.getXapi(vdi)

    for (; j <= i; j++) {
      await this._importVdiBackupContent(xapi, `${path}/${backups[j]}`, vdi._xapiId)
      await this._importVdiBackupContent(xapi, remote, `${dir}/${backups[j]}`, vdi._xapiId)
    }
  }

  // -----------------------------------------------------------------

  async _listDeltaVmBackups (path) {
    const files = await readdir(path)
  async _listDeltaVmBackups (remote, dir) {
    const handler = this._xo.getRemoteHandler(remote)
    const files = await handler.list(dir)
    return await sortBy(filter(files, (fileName) => /^\d+T\d+Z_.*\.(?:xva|json)$/.test(fileName)))
  }

  async _failedRollingDeltaVmBackup (xapi, path, fulFilledVdiBackups) {
  async _failedRollingDeltaVmBackup (xapi, remote, dir, fulFilledVdiBackups) {
    const handler = this._xo.getRemoteHandler(remote)
    await Promise.all(
      mapToArray(fulFilledVdiBackups, async vdiBackup => {
        const { newBaseId, backupDirectory, vdiFilename } = vdiBackup.value()

        await xapi.deleteVdi(newBaseId)
        await unlink(`${path}/${backupDirectory}/${vdiFilename}`).catch(noop)
        await handler.unlink(`${dir}/${backupDirectory}/${vdiFilename}`).catch(noop)
      })
    )
  }

  async rollingDeltaVmBackup ({vm, remoteId, tag, depth}) {
    const remote = await this._xo.getRemote(remoteId)
    const directory = `vm_delta_${tag}_${vm.uuid}`
    const path = `${remote.path}/${directory}`

    await ensureDir(path)
    const dir = `vm_delta_${tag}_${vm.uuid}`

    const info = {
      vbds: [],
@@ -408,7 +409,7 @@ export default class {
        if (!info.vdis[vdiUUID]) {
          info.vdis[vdiUUID] = { ...vdi }
          promises.push(
            this._deltaVdiBackup({vdi: vdiXo, path, depth}).then(
            this._deltaVdiBackup({remote, vdi: vdiXo, dir, depth}).then(
              vdiBackup => {
                const { backupDirectory, vdiFilename } = vdiBackup
                info.vdis[vdiUUID].xoPath = `${backupDirectory}/${vdiFilename}`
@@ -435,29 +436,31 @@ export default class {
    }

    if (fail) {
      console.error(`Remove successful backups in ${path}`, fulFilledVdiBackups)
      await this._failedRollingDeltaVmBackup(xapi, path, fulFilledVdiBackups)
      console.error(`Remove successful backups in ${remote.path}/${dir}`, fulFilledVdiBackups)
      await this._failedRollingDeltaVmBackup(xapi, remote, dir, fulFilledVdiBackups)

      throw new Error('Rolling delta vm backup failed.')
    }

    const backups = await this._listDeltaVmBackups(path)
    const backups = await this._listDeltaVmBackups(remote, dir)
    const date = safeDateFormat(new Date())
    const backupFormat = `${date}_${vm.name_label}`

    const xvaPath = `${path}/${backupFormat}.xva`
    const infoPath = `${path}/${backupFormat}.json`
    const xvaPath = `${dir}/${backupFormat}.xva`
    const infoPath = `${dir}/${backupFormat}.json`

    const handler = this._xo.getRemoteHandler(remote)

    try {
      await Promise.all([
        this.backupVm({vm, pathToFile: xvaPath, onlyMetadata: true}),
        writeFile(infoPath, JSON.stringify(info), {flag: 'wx'})
        this.backupVm({vm, remoteId, file: xvaPath, onlyMetadata: true}),
        handler.outputFile(infoPath, JSON.stringify(info), {flag: 'wx'})
      ])
    } catch (e) {
      await Promise.all([
        unlink(xvaPath).catch(noop),
        unlink(infoPath).catch(noop),
        this._failedRollingDeltaVmBackup(xapi, path, fulFilledVdiBackups)
        handler.unlink(xvaPath).catch(noop),
        handler.unlink(infoPath).catch(noop),
        this._failedRollingDeltaVmBackup(xapi, remote, dir, fulFilledVdiBackups)
      ])

      throw e
@@ -467,12 +470,12 @@ export default class {
    await Promise.all(
      mapToArray(vdiBackups, vdiBackup => {
        const { backupDirectory } = vdiBackup.value()
        return this._mergeDeltaVdiBackups({path: `${path}/${backupDirectory}`, depth})
        return this._mergeDeltaVdiBackups({remote, dir: `${dir}/${backupDirectory}`, depth})
      })
    )

    // Remove x2 files : json AND xva files.
    await this._removeOldBackups(backups, path, backups.length - (depth - 1) * 2)
    await this._removeOldBackups(backups, remote, dir, backups.length - (depth - 1) * 2)

    // Remove old vdi bases.
    Promise.all(
@@ -486,11 +489,12 @@ export default class {
    ).catch(noop)

    // Returns relative path.
    return `${directory}/${backupFormat}`
    return `${dir}/${backupFormat}`
  }

  async _importVmMetadata (xapi, file) {
  async _importVmMetadata (xapi, remote, file) {
    const stream = await this._openAndwaitReadableFile(
      remote,
      file,
      'VM metadata to import not found in this remote'
    )
@@ -512,11 +516,10 @@ export default class {

  async importDeltaVmBackup ({sr, remoteId, filePath}) {
    const remote = await this._xo.getRemote(remoteId)
    const fullBackupPath = `${remote.path}/${filePath}`
    const xapi = this._xo.getXapi(sr)

    // Import vm metadata.
    const vm = await this._importVmMetadata(xapi, `${fullBackupPath}.xva`)
    const vm = await this._importVmMetadata(xapi, remote, `${filePath}.xva`)
    const vmName = vm.name_label

    // Disable start and change the VM name label during import.
@@ -529,7 +532,8 @@ export default class {
    // Because XenServer creates Vbds linked to the vdis of the backup vm if it exists.
    await xapi.destroyVbdsFromVm(vm.uuid)

    const info = JSON.parse(await readFile(`${fullBackupPath}.json`))
    const handler = this._xo.getRemoteHandler(remote)
    const info = JSON.parse(await handler.readFile(`${filePath}.json`))

    // Import VDIs.
    const vdiIds = {}
@@ -565,8 +569,10 @@ export default class {

  // -----------------------------------------------------------------

  async backupVm ({vm, pathToFile, compress, onlyMetadata}) {
    const targetStream = createWriteStream(pathToFile, { flags: 'wx' })
  async backupVm ({vm, remoteId, file, compress, onlyMetadata}) {
    const remote = await this._xo.getRemote(remoteId)
    const handler = this._xo.getRemoteHandler(remote)
    const targetStream = await handler.createOutputStream(file, { flags: 'wx' })
    const promise = eventToPromise(targetStream, 'finish')

    const sourceStream = await this._xo.getXapi(vm).exportVm(vm._xapiId, {
@@ -578,26 +584,19 @@ export default class {
    await promise
  }

  async rollingBackupVm ({vm, path, tag, depth, compress, onlyMetadata}) {
    await ensureDir(path)
    const files = await readdir(path)
  async rollingBackupVm ({vm, remoteId, tag, depth, compress, onlyMetadata}) {
    const remote = await this._xo.getRemote(remoteId)
    const handler = this._xo.getRemoteHandler(remote)
    const files = await handler.list()

    const reg = new RegExp('^[^_]+_' + escapeStringRegexp(`${tag}_${vm.name_label}.xva`))
    const backups = sortBy(filter(files, (fileName) => reg.test(fileName)))

    const date = safeDateFormat(new Date())
    const backupFullPath = `${path}/${date}_${tag}_${vm.name_label}.xva`
    const file = `${date}_${tag}_${vm.name_label}.xva`

    await this.backupVm({vm, pathToFile: backupFullPath, compress, onlyMetadata})

    const promises = []
    for (let surplus = backups.length - (depth - 1); surplus > 0; surplus--) {
      const oldBackup = backups.shift()
      promises.push(unlink(`${path}/${oldBackup}`))
    }
    await Promise.all(promises)

    return backupFullPath
    await this.backupVm({vm, remoteId, file, compress, onlyMetadata})
    await this._removeOldBackups(backups, remote, undefined, backups.length - (depth - 1))
  }

  async rollingSnapshotVm (vm, tag, depth) {
@@ -1,6 +1,8 @@
import startsWith from 'lodash.startswith'

import RemoteHandler from '../remote-handler'
import RemoteHandlerLocal from '../remote-handlers/local'
import RemoteHandlerNfs from '../remote-handlers/nfs'
import RemoteHandlerSmb from '../remote-handlers/smb'
import { Remotes } from '../models/remote'
import {
  NoSuchObject
@@ -30,12 +32,26 @@ export default class {

    xo.on('start', async () => {
      // TODO: Should it be private?
      this.remoteHandler = new RemoteHandler()
      this._remoteHandlers = {}

      await this.initRemotes()
      await this.syncAllRemotes()
    })
    xo.on('stop', () => this.disableAllRemotes())
    xo.on('stop', () => this.forgetAllRemotes())
  }

  getRemoteHandler (remote) {
    if (!(remote.id in this._remoteHandlers)) {
      const handlers = {
        'local': RemoteHandlerLocal,
        'nfs': RemoteHandlerNfs,
        'smb': RemoteHandlerSmb
      }
      this._remoteHandlers[remote.id] = new handlers[remote.type](remote)
    }
    const handler = this._remoteHandlers[remote.id]
    handler.set(remote)
    return handler
  }

  _developRemote (remote) {
@@ -50,6 +66,18 @@ export default class {
      _remote.path = '/tmp/xo-server/mounts/' + _remote.id
      _remote.host = host
      _remote.share = share
    } else if (startsWith(_remote.url, 'smb://')) {
      _remote.type = 'smb'
      const url = _remote.url.slice(6)
      const [auth, smb] = url.split('@')
      const [username, password] = auth.split(':')
      const [domain, sh] = smb.split('\\\\')
      const [host, path] = sh.split('\0')
      _remote.host = host
      _remote.path = path
      _remote.domain = domain
      _remote.username = username
      _remote.password = password
    }
    return _remote
  }
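
For orientation, the smb:// URL this parser expects packs the credentials, domain, host and path into one string: the domain and host are separated by two backslashes, the host and path by a NUL character. A small illustration (not part of the commit, hypothetical values) that simply mirrors the parsing above:

// Hypothetical URL, built explicitly so the separators stay visible:
const url = 'smb://xo:secret@CORP' + '\\\\' + 'fileserver\\share' + '\0' + 'backups'

const rest = url.slice(6)               // drop 'smb://'
const [auth, smb] = rest.split('@')
const [username, password] = auth.split(':')
const [domain, sh] = smb.split('\\\\')  // split on the literal two-character '\\'
const [host, path] = sh.split('\0')
// username: 'xo', password: 'secret', domain: 'CORP',
// host: 'fileserver\share', path: 'backups'
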
@@ -79,7 +107,8 @@ export default class {
  async updateRemote (id, {name, url, enabled, error}) {
    const remote = await this._getRemote(id)
    this._updateRemote(remote, {name, url, enabled, error})
    const props = await this.remoteHandler.sync(this._developRemote(remote.properties))
    const r = this._developRemote(remote.properties)
    const props = await this.getRemoteHandler(r).sync()
    this._updateRemote(remote, props)
    return await this._developRemote(this._remotes.save(remote).properties)
  }
@@ -97,7 +126,7 @@ export default class {

  async removeRemote (id) {
    const remote = await this.getRemote(id)
    await this.remoteHandler.forget(remote)
    await this.getRemoteHandler(remote).forget()
    await this._remotes.remove(id)
  }

@@ -110,9 +139,11 @@ export default class {
  }

  // TODO: Should it be private?
  async disableAllRemotes () {
  async forgetAllRemotes () {
    const remotes = await this.getAllRemotes()
    this.remoteHandler.disableAll(remotes)
    for (let remote of remotes) {
      await this.getRemoteHandler(remote).forget()
    }
  }

  // TODO: Should it be private?