From 354692fb063f48e36d8cb363952422d8041d0b3e Mon Sep 17 00:00:00 2001 From: wescoeur Date: Wed, 20 Jan 2016 10:19:31 +0100 Subject: [PATCH] Add checksum verification for delta backup on restore/merge. (fix vatesfr/xo-web#617) --- package.json | 3 ++ src/remote-handlers/abstract.js | 83 ++++++++++++++++++++++++----- src/utils.js | 93 ++++++++++++++++++++++++++++++++- src/xapi.js | 1 + src/xo-mixins/backups.js | 58 +++++++++++++++----- 5 files changed, 210 insertions(+), 28 deletions(-) diff --git a/package.json b/package.json index 8a3228b05..4d840c2b3 100644 --- a/package.json +++ b/package.json @@ -87,6 +87,7 @@ "lodash.get": "^3.7.0", "lodash.has": "^3.0.0", "lodash.includes": "^3.1.1", + "lodash.invert": "^4.0.1", "lodash.isarray": "^3.0.0", "lodash.isboolean": "^3.0.2", "lodash.isempty": "^3.0.0", @@ -110,12 +111,14 @@ "partial-stream": "0.0.0", "passport": "^0.3.0", "passport-local": "^1.0.0", + "promise-toolbox": "^0.1.0", "proxy-http-request": "0.1.0", "redis": "^2.0.1", "schema-inspector": "^1.5.1", "semver": "^5.1.0", "serve-static": "^1.9.2", "stack-chain": "^1.3.3", + "through2": "^2.0.0", "trace": "^2.0.1", "ws": "~0.8.0", "xen-api": "^0.7.2", diff --git a/src/remote-handlers/abstract.js b/src/remote-handlers/abstract.js index 08c472b1a..0da1f179a 100644 --- a/src/remote-handlers/abstract.js +++ b/src/remote-handlers/abstract.js @@ -1,11 +1,16 @@ import eventToPromise from 'event-to-promise' import getStream from 'get-stream' +import through2 from 'through2' import { parse } from 'xo-remote-parser' -import { noop } from '../utils' +import { + addChecksumToReadStream, + noop, + validChecksumOfReadStream +} from '../utils' export default class RemoteHandlerAbstract { constructor (remote) { @@ -76,16 +81,40 @@ export default class RemoteHandlerAbstract { throw new Error('Not implemented') } - async createReadStream (file, options) { - const stream = await this._createReadStream(file) + async createReadStream (file, { + checksum = false, + 
ignoreMissingChecksum = false, + ...options + } = {}) { + const streamP = this._createReadStream(file, options).then(async stream => { + await eventToPromise(stream, 'readable') - await Promise.all([ - stream.length === undefined - ? this.getSize(file).then(value => stream.length = value).catch(noop) - : false, - // FIXME: the readable event may have already been emitted. - eventToPromise(stream, 'readable') - ]) + if (stream.length === undefined) { + stream.length = await this.getSize(file).catch(noop) + } + + return stream + }) + + if (!checksum) { + return streamP + } + + try { + checksum = await this.readFile(`${file}.checksum`) + } catch (error) { + if (error.code === 'ENOENT' && ignoreMissingChecksum) { + return streamP + } + + throw error + } + + let stream = await streamP + + const { length } = stream + stream = validChecksumOfReadStream(stream, checksum.toString()) + stream.length = length return stream } @@ -94,15 +123,43 @@ export default class RemoteHandlerAbstract { throw new Error('Not implemented') } - async createOutputStream (file, options) { - return this._createOutputStream(file, options) + async createOutputStream (file, { + checksum = false, + ...options + } = {}) { + const streamP = this._createOutputStream(file, options) + + if (!checksum) { + return streamP + } + + const connectorStream = through2() + const forwardError = error => { + connectorStream.emit('error', error) + } + + const streamWithChecksum = addChecksumToReadStream(connectorStream) + streamWithChecksum.pipe(await streamP) + streamWithChecksum.on('error', forwardError) + + streamWithChecksum.checksum + .then(value => this.outputFile(`${file}.checksum`, value)) + .catch(forwardError) + + return connectorStream } async _createOutputStream (file, options) { throw new Error('Not implemented') } - async unlink (file) { + async unlink (file, { + checksum = false + } = {}) { + if (checksum) { + this._unlink(`${file}.checksum`).catch(noop) + } + return this._unlink(file) } diff --git 
a/src/utils.js b/src/utils.js index b288ad7b0..c7edea684 100644 --- a/src/utils.js +++ b/src/utils.js @@ -1,15 +1,22 @@ import base64url from 'base64url' +import eventToPromise from 'event-to-promise' import forEach from 'lodash.foreach' import has from 'lodash.has' import humanFormat from 'human-format' +import invert from 'lodash.invert' import isArray from 'lodash.isarray' import isString from 'lodash.isstring' import kindOf from 'kindof' import multiKeyHashInt from 'multikey-hash' import xml2js from 'xml2js' +import { defer } from 'promise-toolbox' import {promisify} from 'bluebird' -import {randomBytes} from 'crypto' +import { + createHash, + randomBytes +} from 'crypto' import { Readable } from 'stream' +import through2 from 'through2' import {utcFormat as d3TimeFormat} from 'd3-time-format' // =================================================================== @@ -50,6 +57,90 @@ export const createRawObject = Object.create // ------------------------------------------------------------------- +const ALGORITHM_TO_ID = { + md5: '1', + sha256: '5', + sha512: '6' +} + +const ID_TO_ALGORITHM = invert(ALGORITHM_TO_ID) + +// Wrap a readable stream in a stream with a checksum promise +// attribute which is resolved at the end of an input stream. +// (Finally .checksum contains the checksum of the input stream) +// +// Example: +// const sourceStream = ... +// const targetStream = ... 
+// const checksumStream = addChecksumToReadStream(sourceStream) +// await Promise.all([ +// eventToPromise(checksumStream.pipe(targetStream), 'finish'), +// checksumStream.checksum.then(console.log) +// ]) +export const addChecksumToReadStream = (stream, algorithm = 'md5') => { + const algorithmId = ALGORITHM_TO_ID[algorithm] + + if (!algorithmId) { + throw new Error(`unknown algorithm: ${algorithm}`) + } + + const hash = createHash(algorithm) + const { promise, resolve } = defer() + + const wrapper = stream.pipe(through2( + (chunk, enc, callback) => { + hash.update(chunk) + callback(null, chunk) + }, + callback => { + resolve(hash.digest('hex')) + callback() + } + )) + + stream.on('error', error => wrapper.emit('error', error)) + wrapper.checksum = promise.then(hash => `$${algorithmId}$$${hash}`) + + return wrapper +} + +// Check if the checksum of a readable stream is equal to an expected checksum. +// The given stream is wrapped in a stream which emits an error event +// if the computed checksum is not equal to the expected checksum. +export const validChecksumOfReadStream = (stream, expectedChecksum) => { + const algorithmId = expectedChecksum.slice(1, expectedChecksum.indexOf('$', 1)) + + if (!algorithmId) { + throw new Error(`unknown algorithm: ${algorithmId}`) + } + + const hash = createHash(ID_TO_ALGORITHM[algorithmId]) + + const wrapper = stream.pipe(through2( + { highWaterMark: 0 }, + (chunk, enc, callback) => { + hash.update(chunk) + callback(null, chunk) + }, + callback => { + const checksum = `$${algorithmId}$$${hash.digest('hex')}` + + callback( + checksum !== expectedChecksum + ? new Error(`Bad checksum (${checksum}), expected: ${expectedChecksum}`) + : null + ) + } + )) + + stream.on('error', error => wrapper.emit('error', error)) + wrapper.checksumVerified = eventToPromise(wrapper, 'end') + + return wrapper +} + +// ------------------------------------------------------------------- + // Ensure the value is an array, wrap it if necessary. 
export function ensureArray (value) { if (value === undefined) { diff --git a/src/xapi.js b/src/xapi.js index 1b402a139..2103d4739 100644 --- a/src/xapi.js +++ b/src/xapi.js @@ -2113,6 +2113,7 @@ export default class Xapi extends XapiBase { const task = this._watchTask(taskRef) await Promise.all([ + stream.checksumVerified, task, put(stream, { hostname: host.address, diff --git a/src/xo-mixins/backups.js b/src/xo-mixins/backups.js index f56b01ca6..ef641f704 100644 --- a/src/xo-mixins/backups.js +++ b/src/xo-mixins/backups.js @@ -41,13 +41,29 @@ const isDeltaVdiBackup = name => /^\d+T\d+Z_delta\.vhd$/.test(name) // Get the timestamp of a vdi backup. (full or delta) const getVdiTimestamp = name => { const arr = /^(\d+T\d+Z)_(?:full|delta)\.vhd$/.exec(name) - return arr[1] || undefined + return arr[1] } const getDeltaBackupNameWithoutExt = name => name.slice(0, -DELTA_BACKUP_EXT_LENGTH) - const isDeltaBackup = name => endsWith(name, DELTA_BACKUP_EXT) +async function checkFileIntegrity (handler, name) { + let stream + + try { + stream = await handler.createReadStream(name, { checksum: true }) + } catch (error) { + if (error.code === 'ENOENT') { + return + } + + throw error + } + + stream.resume() + await eventToPromise(stream, 'finish') +} + // =================================================================== export default class { @@ -307,9 +323,17 @@ export default class { format: VDI_FORMAT_VHD }) - const targetStream = await handler.createOutputStream(backupFullPath, { flags: 'wx' }) + const targetStream = await handler.createOutputStream(backupFullPath, { + // FIXME: Checksum is not computed for full vdi backups. + // The problem is in the merge case, a delta merged in a full vdi + // backup forces us to browse the resulting file => + // Significant transfer time on the network ! 
+ checksum: !isFull, + flags: 'wx' + }) sourceStream.on('error', error => targetStream.emit('error', error)) + await Promise.all([ eventToPromise(sourceStream.pipe(targetStream), 'finish'), sourceStream.task @@ -317,7 +341,8 @@ export default class { } catch (error) { // Remove new backup. (corrupt) and delete new vdi base. xapi.deleteVdi(currentSnapshot.$id).catch(noop) - await handler.unlink(backupFullPath).catch(noop) + await handler.unlink(backupFullPath, { checksum: true }).catch(noop) + throw error } @@ -343,9 +368,13 @@ export default class { return } - const newFull = `${getVdiTimestamp(backups[i])}_full.vhd` const vhdUtil = `${__dirname}/../../bin/vhd-util` + const timestamp = getVdiTimestamp(backups[i]) + const newFullBackup = `${dir}/${timestamp}_full.vhd` + + await checkFileIntegrity(handler, `${dir}/${backups[i]}`) + for (; i > 0 && isDeltaVdiBackup(backups[i]); i--) { const backup = `${dir}/${backups[i]}` const parent = `${dir}/${backups[i - 1]}` @@ -353,6 +382,7 @@ export default class { const path = handler._remote.path // FIXME, private attribute ! try { + await checkFileIntegrity(handler, `${dir}/${backups[i - 1]}`) await execa(vhdUtil, ['modify', '-n', `${path}/${backup}`, '-p', `${path}/${parent}`]) // FIXME not ok at least with smb remotes await execa(vhdUtil, ['coalesce', '-n', `${path}/${backup}`]) // FIXME not ok at least with smb remotes } catch (e) { @@ -360,21 +390,21 @@ export default class { throw e } - await handler.unlink(backup) + await handler.unlink(backup, { checksum: true }) } // The base was removed, it exists two full backups or more ? // => Remove old backups before the most recent full. if (i > 0) { for (i--; i >= 0; i--) { - await handler.unlink(`${dir}/${backups[i]}`) + await handler.unlink(`${dir}/${backups[i]}`, { checksum: true }) } return } // Rename the first old full backup to the new full backup. 
- await handler.rename(`${dir}/${backups[0]}`, `${dir}/${newFull}`) + await handler.rename(`${dir}/${backups[0]}`, newFullBackup) } async _listDeltaVdiDependencies (handler, filePath) { @@ -416,7 +446,7 @@ export default class { const { newBaseId, backupDirectory, vdiFilename } = vdiBackup.value() await xapi.deleteVdi(newBaseId) - await handler.unlink(`${dir}/${backupDirectory}/${vdiFilename}`).catch(noop) + await handler.unlink(`${dir}/${backupDirectory}/${vdiFilename}`, { checksum: true }).catch(noop) }) ) } @@ -494,7 +524,7 @@ export default class { } if (fail) { - console.error(`Remove successful backups in ${handler.path}/${dir}`, fulFilledVdiBackups) + console.error(`Remove successful backups in ${dir}`, fulFilledVdiBackups) await this._failedRollingDeltaVmBackup(xapi, handler, dir, fulFilledVdiBackups) throw new Error('Rolling delta vm backup failed.') @@ -582,12 +612,12 @@ export default class { mapToArray( delta.vdis, async (vdi, id) => { - const vdisFolder = dirname(vdi.xoPath) + const vdisFolder = `${basePath}/${dirname(vdi.xoPath)}` const backups = await this._listDeltaVdiDependencies(handler, `${basePath}/${vdi.xoPath}`) - streams[`${id}.vhd`] = await Promise.all( - mapToArray(backups, backup => handler.createReadStream(`${basePath}/${vdisFolder}/${backup}`)) - ) + streams[`${id}.vhd`] = await Promise.all(mapToArray(backups, async backup => + handler.createReadStream(`${vdisFolder}/${backup}`, { checksum: true, ignoreMissingChecksum: true }) + )) } ) )