Add checksum verification for delta backup on restore/merge. (fix vatesfr/xo-web#617)

This commit is contained in:
wescoeur 2016-01-20 10:19:31 +01:00
parent 2c5858c2e0
commit 354692fb06
5 changed files with 210 additions and 28 deletions

View File

@ -87,6 +87,7 @@
"lodash.get": "^3.7.0",
"lodash.has": "^3.0.0",
"lodash.includes": "^3.1.1",
"lodash.invert": "^4.0.1",
"lodash.isarray": "^3.0.0",
"lodash.isboolean": "^3.0.2",
"lodash.isempty": "^3.0.0",
@ -110,12 +111,14 @@
"partial-stream": "0.0.0",
"passport": "^0.3.0",
"passport-local": "^1.0.0",
"promise-toolbox": "^0.1.0",
"proxy-http-request": "0.1.0",
"redis": "^2.0.1",
"schema-inspector": "^1.5.1",
"semver": "^5.1.0",
"serve-static": "^1.9.2",
"stack-chain": "^1.3.3",
"through2": "^2.0.0",
"trace": "^2.0.1",
"ws": "~0.8.0",
"xen-api": "^0.7.2",

View File

@ -1,11 +1,16 @@
import eventToPromise from 'event-to-promise'
import getStream from 'get-stream'
import through2 from 'through2'
import {
parse
} from 'xo-remote-parser'
import { noop } from '../utils'
import {
addChecksumToReadStream,
noop,
validChecksumOfReadStream
} from '../utils'
export default class RemoteHandlerAbstract {
constructor (remote) {
@ -76,16 +81,40 @@ export default class RemoteHandlerAbstract {
throw new Error('Not implemented')
}
async createReadStream (file, options) {
const stream = await this._createReadStream(file)
async createReadStream (file, {
checksum = false,
ignoreMissingChecksum = false,
...options
} = {}) {
const streamP = this._createReadStream(file, options).then(async stream => {
await eventToPromise(stream, 'readable')
await Promise.all([
stream.length === undefined
? this.getSize(file).then(value => stream.length = value).catch(noop)
: false,
// FIXME: the readable event may have already been emitted.
eventToPromise(stream, 'readable')
])
if (stream.length === undefined) {
stream.length = await this.getSize(file).catch(noop)
}
return stream
})
if (!checksum) {
return streamP
}
try {
checksum = await this.readFile(`${file}.checksum`)
} catch (error) {
if (error.code === 'ENOENT' && ignoreMissingChecksum) {
return streamP
}
throw error
}
let stream = await streamP
const { length } = stream
stream = validChecksumOfReadStream(stream, checksum.toString())
stream.length = length
return stream
}
@ -94,15 +123,43 @@ export default class RemoteHandlerAbstract {
throw new Error('Not implemented')
}
async createOutputStream (file, options) {
return this._createOutputStream(file, options)
async createOutputStream (file, {
checksum = false,
...options
} = {}) {
const streamP = this._createOutputStream(file, options)
if (!checksum) {
return streamP
}
const connectorStream = through2()
const forwardError = error => {
connectorStream.emit('error', error)
}
const streamWithChecksum = addChecksumToReadStream(connectorStream)
streamWithChecksum.pipe(await streamP)
streamWithChecksum.on('error', forwardError)
streamWithChecksum.checksum
.then(value => this.outputFile(`${file}.checksum`, value))
.catch(forwardError)
return connectorStream
}
async _createOutputStream (file, options) {
throw new Error('Not implemented')
}
async unlink (file) {
async unlink (file, {
checksum = false
} = {}) {
if (checksum) {
this._unlink(`${file}.checksum`).catch(noop)
}
return this._unlink(file)
}

View File

@ -1,15 +1,22 @@
import base64url from 'base64url'
import eventToPromise from 'event-to-promise'
import forEach from 'lodash.foreach'
import has from 'lodash.has'
import humanFormat from 'human-format'
import invert from 'lodash.invert'
import isArray from 'lodash.isarray'
import isString from 'lodash.isstring'
import kindOf from 'kindof'
import multiKeyHashInt from 'multikey-hash'
import xml2js from 'xml2js'
import { defer } from 'promise-toolbox'
import {promisify} from 'bluebird'
import {randomBytes} from 'crypto'
import {
createHash,
randomBytes
} from 'crypto'
import { Readable } from 'stream'
import through2 from 'through2'
import {utcFormat as d3TimeFormat} from 'd3-time-format'
// ===================================================================
@ -50,6 +57,90 @@ export const createRawObject = Object.create
// -------------------------------------------------------------------
const ALGORITHM_TO_ID = {
md5: '1',
sha256: '5',
sha512: '6'
}
const ID_TO_ALGORITHM = invert(ALGORITHM_TO_ID)
// Wrap a readable stream in a stream exposing a `checksum` promise
// attribute, resolved once the input stream has been fully consumed.
// (i.e. `.checksum` ultimately contains the checksum of the input stream)
//
// Example:
//    const sourceStream = ...
//    const targetStream = ...
//    const checksumStream = addChecksumToReadStream(sourceStream)
//    await Promise.all([
//      eventToPromise(checksumStream.pipe(targetStream), 'finish'),
//      checksumStream.checksum.then(console.log)
//    ])
export const addChecksumToReadStream = (stream, algorithm = 'md5') => {
  const algorithmId = ALGORITHM_TO_ID[algorithm]
  if (!algorithmId) {
    throw new Error(`unknown algorithm: ${algorithm}`)
  }

  const hasher = createHash(algorithm)
  const { promise, resolve } = defer()

  // Pass chunks through unchanged while feeding them to the hasher.
  const onChunk = (chunk, enc, next) => {
    hasher.update(chunk)
    next(null, chunk)
  }

  // At end of input, settle the checksum promise with the hex digest.
  const onFlush = next => {
    resolve(hasher.digest('hex'))
    next()
  }

  const wrapped = stream.pipe(through2(onChunk, onFlush))

  // Forward errors from the source stream to the wrapper.
  stream.on('error', error => wrapped.emit('error', error))

  // Format: `$<algorithmId>$$<hex digest>`.
  wrapped.checksum = promise.then(digest => `$${algorithmId}$$${digest}`)

  return wrapped
}
// Check that the checksum of a readable stream equals an expected checksum.
//
// The expected checksum must have the format `$<algorithmId>$$<hex digest>`
// (as produced by `addChecksumToReadStream`).
//
// The given stream is wrapped in a stream which emits an error event
// if the computed checksum does not equal the expected checksum.
export const validChecksumOfReadStream = (stream, expectedChecksum) => {
  // Extract the algorithm id between the leading `$` and the next `$`.
  const algorithmId = expectedChecksum.slice(1, expectedChecksum.indexOf('$', 1))

  // Resolve the id to an algorithm name first: with an unknown or garbled id
  // `createHash(undefined)` would otherwise fail with a confusing TypeError.
  const algorithm = ID_TO_ALGORITHM[algorithmId]
  if (!algorithm) {
    throw new Error(`unknown algorithm id: ${algorithmId} (checksum: ${expectedChecksum})`)
  }

  const hash = createHash(algorithm)

  const wrapper = stream.pipe(through2(
    { highWaterMark: 0 },
    (chunk, enc, callback) => {
      hash.update(chunk)
      callback(null, chunk)
    },
    callback => {
      const checksum = `$${algorithmId}$$${hash.digest('hex')}`

      // Emitting the error from the flush callback makes the wrapper fail
      // before 'end'/'finish' when the checksums do not match.
      callback(
        checksum !== expectedChecksum
          ? new Error(`Bad checksum (${checksum}), expected: ${expectedChecksum}`)
          : null
      )
    }
  ))

  // Forward errors from the source stream to the wrapper.
  stream.on('error', error => wrapper.emit('error', error))

  // Resolved when the whole stream has been read and the checksum verified.
  wrapper.checksumVerified = eventToPromise(wrapper, 'end')

  return wrapper
}
// -------------------------------------------------------------------
// Ensure the value is an array, wrap it if necessary.
export function ensureArray (value) {
if (value === undefined) {

View File

@ -2113,6 +2113,7 @@ export default class Xapi extends XapiBase {
const task = this._watchTask(taskRef)
await Promise.all([
stream.checksumVerified,
task,
put(stream, {
hostname: host.address,

View File

@ -41,13 +41,29 @@ const isDeltaVdiBackup = name => /^\d+T\d+Z_delta\.vhd$/.test(name)
// Get the timestamp of a vdi backup. (full or delta)
//
// Returns `undefined` when the name does not look like a vdi backup
// instead of throwing on the failed regexp match.
const getVdiTimestamp = name => {
  const matches = /^(\d+T\d+Z)_(?:full|delta)\.vhd$/.exec(name)
  return matches ? matches[1] : undefined
}
const getDeltaBackupNameWithoutExt = name => name.slice(0, -DELTA_BACKUP_EXT_LENGTH)
const isDeltaBackup = name => endsWith(name, DELTA_BACKUP_EXT)
// Verify the stored checksum of a backup file by reading it entirely.
//
// Resolves silently when the file does not exist; rejects when the file
// cannot be opened for another reason or when its checksum does not match.
async function checkFileIntegrity (handler, name) {
  let stream
  try {
    stream = await handler.createReadStream(name, { checksum: true })
  } catch (error) {
    if (error.code !== 'ENOENT') {
      throw error
    }
    // Nothing to check: the file is simply absent.
    return
  }

  // Drain the stream: the checksum wrapper emits an error event if the
  // computed checksum differs from the stored one.
  stream.resume()
  await eventToPromise(stream, 'finish')
}
// ===================================================================
export default class {
@ -307,9 +323,17 @@ export default class {
format: VDI_FORMAT_VHD
})
const targetStream = await handler.createOutputStream(backupFullPath, { flags: 'wx' })
const targetStream = await handler.createOutputStream(backupFullPath, {
// FIXME: Checksum is not computed for full vdi backups.
// The problem is in the merge case, a delta merged in a full vdi
// backup forces us to browse the resulting file =>
// Significant transfer time on the network !
checksum: !isFull,
flags: 'wx'
})
sourceStream.on('error', error => targetStream.emit('error', error))
await Promise.all([
eventToPromise(sourceStream.pipe(targetStream), 'finish'),
sourceStream.task
@ -317,7 +341,8 @@ export default class {
} catch (error) {
// Remove new backup. (corrupt) and delete new vdi base.
xapi.deleteVdi(currentSnapshot.$id).catch(noop)
await handler.unlink(backupFullPath).catch(noop)
await handler.unlink(backupFullPath, { checksum: true }).catch(noop)
throw error
}
@ -343,9 +368,13 @@ export default class {
return
}
const newFull = `${getVdiTimestamp(backups[i])}_full.vhd`
const vhdUtil = `${__dirname}/../../bin/vhd-util`
const timestamp = getVdiTimestamp(backups[i])
const newFullBackup = `${dir}/${timestamp}_full.vhd`
await checkFileIntegrity(handler, `${dir}/${backups[i]}`)
for (; i > 0 && isDeltaVdiBackup(backups[i]); i--) {
const backup = `${dir}/${backups[i]}`
const parent = `${dir}/${backups[i - 1]}`
@ -353,6 +382,7 @@ export default class {
const path = handler._remote.path // FIXME, private attribute !
try {
await checkFileIntegrity(handler, `${dir}/${backups[i - 1]}`)
await execa(vhdUtil, ['modify', '-n', `${path}/${backup}`, '-p', `${path}/${parent}`]) // FIXME not ok at least with smb remotes
await execa(vhdUtil, ['coalesce', '-n', `${path}/${backup}`]) // FIXME not ok at least with smb remotes
} catch (e) {
@ -360,21 +390,21 @@ export default class {
throw e
}
await handler.unlink(backup)
await handler.unlink(backup, { checksum: true })
}
// The base was removed, it exists two full backups or more ?
// => Remove old backups before the most recent full.
if (i > 0) {
for (i--; i >= 0; i--) {
await handler.unlink(`${dir}/${backups[i]}`)
await handler.unlink(`${dir}/${backups[i]}`, { checksum: true })
}
return
}
// Rename the first old full backup to the new full backup.
await handler.rename(`${dir}/${backups[0]}`, `${dir}/${newFull}`)
await handler.rename(`${dir}/${backups[0]}`, newFullBackup)
}
async _listDeltaVdiDependencies (handler, filePath) {
@ -416,7 +446,7 @@ export default class {
const { newBaseId, backupDirectory, vdiFilename } = vdiBackup.value()
await xapi.deleteVdi(newBaseId)
await handler.unlink(`${dir}/${backupDirectory}/${vdiFilename}`).catch(noop)
await handler.unlink(`${dir}/${backupDirectory}/${vdiFilename}`, { checksum: true }).catch(noop)
})
)
}
@ -494,7 +524,7 @@ export default class {
}
if (fail) {
console.error(`Remove successful backups in ${handler.path}/${dir}`, fulFilledVdiBackups)
console.error(`Remove successful backups in ${dir}`, fulFilledVdiBackups)
await this._failedRollingDeltaVmBackup(xapi, handler, dir, fulFilledVdiBackups)
throw new Error('Rolling delta vm backup failed.')
@ -582,12 +612,12 @@ export default class {
mapToArray(
delta.vdis,
async (vdi, id) => {
const vdisFolder = dirname(vdi.xoPath)
const vdisFolder = `${basePath}/${dirname(vdi.xoPath)}`
const backups = await this._listDeltaVdiDependencies(handler, `${basePath}/${vdi.xoPath}`)
streams[`${id}.vhd`] = await Promise.all(
mapToArray(backups, backup => handler.createReadStream(`${basePath}/${vdisFolder}/${backup}`))
)
streams[`${id}.vhd`] = await Promise.all(mapToArray(backups, async backup =>
handler.createReadStream(`${vdisFolder}/${backup}`, { checksum: true, ignoreMissingChecksum: true })
))
}
)
)