diff --git a/packages/xo-server/src/remote-handlers/abstract.js b/packages/xo-server/src/remote-handlers/abstract.js index 1eb584d22..329015772 100644 --- a/packages/xo-server/src/remote-handlers/abstract.js +++ b/packages/xo-server/src/remote-handlers/abstract.js @@ -3,12 +3,9 @@ import through2 from 'through2' import { ignoreErrors } from 'promise-toolbox' import { parse } from 'xo-remote-parser' -import { - addChecksumToReadStream, - getPseudoRandomBytes, - streamToBuffer, - validChecksumOfReadStream, -} from '../utils' +import { getPseudoRandomBytes, streamToBuffer } from '../utils' + +import { addChecksumToReadStream, validChecksumOfReadStream } from './checksum' const checksumFile = file => file + '.checksum' diff --git a/packages/xo-server/src/remote-handlers/checksum.js b/packages/xo-server/src/remote-handlers/checksum.js new file mode 100644 index 000000000..a9a9ee430 --- /dev/null +++ b/packages/xo-server/src/remote-handlers/checksum.js @@ -0,0 +1,95 @@ +import invert from 'lodash/invert' +import through2 from 'through2' +import { createHash } from 'crypto' +import { defer, fromEvent } from 'promise-toolbox' + +const ALGORITHM_TO_ID = { + md5: '1', + sha256: '5', + sha512: '6', +} + +const ID_TO_ALGORITHM = invert(ALGORITHM_TO_ID) + +// Wrap a readable stream in a stream with a checksum promise +// attribute which is resolved at the end of an input stream. +// (Finally .checksum contains the checksum of the input stream) +// +// Example: +// const sourceStream = ... +// const targetStream = ... +// const checksumStream = addChecksumToReadStream(sourceStream) +// await Promise.all([ +// fromEvent(checksumStream.pipe(targetStream), 'finish'), +// checksumStream.checksum.then(console.log) +// ]) +export const addChecksumToReadStream = (stream, algorithm = 'md5') => { + const algorithmId = ALGORITHM_TO_ID[algorithm] + + if (!algorithmId) { + throw new Error(`unknown algorithm: ${algorithm}`) + } + + const hash = createHash(algorithm) + const { promise, resolve } = defer() + + const wrapper = stream.pipe( + through2( + (chunk, enc, callback) => { + hash.update(chunk) + callback(null, chunk) + }, + callback => { + resolve(hash.digest('hex')) + callback() + } + ) + ) + + stream.on('error', error => wrapper.emit('error', error)) + wrapper.checksum = promise.then(hash => `$${algorithmId}$$${hash}`) + + return wrapper +} + +// Check if the checksum of a readable stream is equals to an expected checksum. +// The given stream is wrapped in a stream which emits an error event +// if the computed checksum is not equals to the expected checksum. +export const validChecksumOfReadStream = (stream, expectedChecksum) => { + const algorithmId = expectedChecksum.slice( + 1, + expectedChecksum.indexOf('$', 1) + ) + + if (!algorithmId) { + throw new Error(`unknown algorithm: ${algorithmId}`) + } + + const hash = createHash(ID_TO_ALGORITHM[algorithmId]) + + const wrapper = stream.pipe( + through2( + { highWaterMark: 0 }, + (chunk, enc, callback) => { + hash.update(chunk) + callback(null, chunk) + }, + callback => { + const checksum = `$${algorithmId}$$${hash.digest('hex')}` + + callback( + checksum !== expectedChecksum + ? new Error( + `Bad checksum (${checksum}), expected: ${expectedChecksum}` + ) + : null + ) + } + ) + ) + + stream.on('error', error => wrapper.emit('error', error)) + wrapper.checksumVerified = fromEvent(wrapper, 'end') + + return wrapper +} diff --git a/packages/xo-server/src/utils.js b/packages/xo-server/src/utils.js index 8af96cf6f..d81aca405 100644 --- a/packages/xo-server/src/utils.js +++ b/packages/xo-server/src/utils.js @@ -1,10 +1,8 @@ import base64url from 'base64url' -import eventToPromise from 'event-to-promise' import forEach from 'lodash/forEach' import has from 'lodash/has' import highland from 'highland' import humanFormat from 'human-format' -import invert from 'lodash/invert' import isArray from 'lodash/isArray' import isString from 'lodash/isString' import keys from 'lodash/keys' @@ -14,24 +12,21 @@ import multiKeyHashInt from 'multikey-hash' import pick from 'lodash/pick' import tmp from 'tmp' import xml2js from 'xml2js' +import { randomBytes } from 'crypto' import { resolve } from 'path' - -// Moment timezone can be loaded only one time, it's a workaround to load -// the latest version because cron module uses an old version of moment which -// does not implement `guess` function for example. -import 'moment-timezone' - -import through2 from 'through2' import { utcFormat, utcParse } from 'd3-time-format' import { all as pAll, - defer, fromCallback, isPromise, promisify, reflect as pReflect, } from 'promise-toolbox' -import { createHash, randomBytes } from 'crypto' + +// Moment timezone can be loaded only one time, it's a workaround to load +// the latest version because cron module uses an old version of moment which +// does not implement `guess` function for example. +import 'moment-timezone' // =================================================================== @@ -106,99 +101,6 @@ export const diffItems = (coll1, coll2) => { // ------------------------------------------------------------------- -const ALGORITHM_TO_ID = { - md5: '1', - sha256: '5', - sha512: '6', -} - -const ID_TO_ALGORITHM = invert(ALGORITHM_TO_ID) - -// Wrap a readable stream in a stream with a checksum promise -// attribute which is resolved at the end of an input stream. -// (Finally .checksum contains the checksum of the input stream) -// -// Example: -// const sourceStream = ... -// const targetStream = ... -// const checksumStream = addChecksumToReadStream(sourceStream) -// await Promise.all([ -// eventToPromise(checksumStream.pipe(targetStream), 'finish'), -// checksumStream.checksum.then(console.log) -// ]) -export const addChecksumToReadStream = (stream, algorithm = 'md5') => { - const algorithmId = ALGORITHM_TO_ID[algorithm] - - if (!algorithmId) { - throw new Error(`unknown algorithm: ${algorithm}`) - } - - const hash = createHash(algorithm) - const { promise, resolve } = defer() - - const wrapper = stream.pipe( - through2( - (chunk, enc, callback) => { - hash.update(chunk) - callback(null, chunk) - }, - callback => { - resolve(hash.digest('hex')) - callback() - } - ) - ) - - stream.on('error', error => wrapper.emit('error', error)) - wrapper.checksum = promise.then(hash => `$${algorithmId}$$${hash}`) - - return wrapper -} - -// Check if the checksum of a readable stream is equals to an expected checksum. -// The given stream is wrapped in a stream which emits an error event -// if the computed checksum is not equals to the expected checksum. -export const validChecksumOfReadStream = (stream, expectedChecksum) => { - const algorithmId = expectedChecksum.slice( - 1, - expectedChecksum.indexOf('$', 1) - ) - - if (!algorithmId) { - throw new Error(`unknown algorithm: ${algorithmId}`) - } - - const hash = createHash(ID_TO_ALGORITHM[algorithmId]) - - const wrapper = stream.pipe( - through2( - { highWaterMark: 0 }, - (chunk, enc, callback) => { - hash.update(chunk) - callback(null, chunk) - }, - callback => { - const checksum = `$${algorithmId}$$${hash.digest('hex')}` - - callback( - checksum !== expectedChecksum - ? new Error( - `Bad checksum (${checksum}), expected: ${expectedChecksum}` - ) - : null - ) - } - ) - ) - - stream.on('error', error => wrapper.emit('error', error)) - wrapper.checksumVerified = eventToPromise(wrapper, 'end') - - return wrapper -} - -// ------------------------------------------------------------------- - // Ensure the value is an array, wrap it if necessary. export function ensureArray (value) { if (value === undefined) {