chore(xo-server): move checksum streams into own module

This commit is contained in:
Julien Fontanet 2018-03-05 17:16:03 +01:00
parent 927d3135c4
commit e4b11a793b
3 changed files with 104 additions and 110 deletions

View File

@ -3,12 +3,9 @@ import through2 from 'through2'
import { ignoreErrors } from 'promise-toolbox' import { ignoreErrors } from 'promise-toolbox'
import { parse } from 'xo-remote-parser' import { parse } from 'xo-remote-parser'
import { import { getPseudoRandomBytes, streamToBuffer } from '../utils'
addChecksumToReadStream,
getPseudoRandomBytes, import { addChecksumToReadStream, validChecksumOfReadStream } from './checksum'
streamToBuffer,
validChecksumOfReadStream,
} from '../utils'
const checksumFile = file => file + '.checksum' const checksumFile = file => file + '.checksum'

View File

@ -0,0 +1,95 @@
import invert from 'lodash/invert'
import through2 from 'through2'
import { createHash } from 'crypto'
import { defer, fromEvent } from 'promise-toolbox'
const ALGORITHM_TO_ID = {
md5: '1',
sha256: '5',
sha512: '6',
}
const ID_TO_ALGORITHM = invert(ALGORITHM_TO_ID)
// Wrap a readable stream in a stream with a checksum promise
// attribute which is resolved at the end of an input stream.
// (Finally .checksum contains the checksum of the input stream)
//
// Example:
// const sourceStream = ...
// const targetStream = ...
// const checksumStream = addChecksumToReadStream(sourceStream)
// await Promise.all([
// fromEvent(checksumStream.pipe(targetStream), 'finish'),
// checksumStream.checksum.then(console.log)
// ])
export const addChecksumToReadStream = (stream, algorithm = 'md5') => {
const algorithmId = ALGORITHM_TO_ID[algorithm]
if (!algorithmId) {
throw new Error(`unknown algorithm: ${algorithm}`)
}
const hash = createHash(algorithm)
const { promise, resolve } = defer()
const wrapper = stream.pipe(
through2(
(chunk, enc, callback) => {
hash.update(chunk)
callback(null, chunk)
},
callback => {
resolve(hash.digest('hex'))
callback()
}
)
)
stream.on('error', error => wrapper.emit('error', error))
wrapper.checksum = promise.then(hash => `$${algorithmId}$$${hash}`)
return wrapper
}
// Check if the checksum of a readable stream is equals to an expected checksum.
// The given stream is wrapped in a stream which emits an error event
// if the computed checksum is not equals to the expected checksum.
export const validChecksumOfReadStream = (stream, expectedChecksum) => {
const algorithmId = expectedChecksum.slice(
1,
expectedChecksum.indexOf('$', 1)
)
if (!algorithmId) {
throw new Error(`unknown algorithm: ${algorithmId}`)
}
const hash = createHash(ID_TO_ALGORITHM[algorithmId])
const wrapper = stream.pipe(
through2(
{ highWaterMark: 0 },
(chunk, enc, callback) => {
hash.update(chunk)
callback(null, chunk)
},
callback => {
const checksum = `$${algorithmId}$$${hash.digest('hex')}`
callback(
checksum !== expectedChecksum
? new Error(
`Bad checksum (${checksum}), expected: ${expectedChecksum}`
)
: null
)
}
)
)
stream.on('error', error => wrapper.emit('error', error))
wrapper.checksumVerified = fromEvent(wrapper, 'end')
return wrapper
}

View File

@ -1,10 +1,8 @@
import base64url from 'base64url' import base64url from 'base64url'
import eventToPromise from 'event-to-promise'
import forEach from 'lodash/forEach' import forEach from 'lodash/forEach'
import has from 'lodash/has' import has from 'lodash/has'
import highland from 'highland' import highland from 'highland'
import humanFormat from 'human-format' import humanFormat from 'human-format'
import invert from 'lodash/invert'
import isArray from 'lodash/isArray' import isArray from 'lodash/isArray'
import isString from 'lodash/isString' import isString from 'lodash/isString'
import keys from 'lodash/keys' import keys from 'lodash/keys'
@ -14,24 +12,21 @@ import multiKeyHashInt from 'multikey-hash'
import pick from 'lodash/pick' import pick from 'lodash/pick'
import tmp from 'tmp' import tmp from 'tmp'
import xml2js from 'xml2js' import xml2js from 'xml2js'
import { randomBytes } from 'crypto'
import { resolve } from 'path' import { resolve } from 'path'
// Moment timezone can be loaded only one time, it's a workaround to load
// the latest version because cron module uses an old version of moment which
// does not implement `guess` function for example.
import 'moment-timezone'
import through2 from 'through2'
import { utcFormat, utcParse } from 'd3-time-format' import { utcFormat, utcParse } from 'd3-time-format'
import { import {
all as pAll, all as pAll,
defer,
fromCallback, fromCallback,
isPromise, isPromise,
promisify, promisify,
reflect as pReflect, reflect as pReflect,
} from 'promise-toolbox' } from 'promise-toolbox'
import { createHash, randomBytes } from 'crypto'
// Moment timezone can be loaded only one time, it's a workaround to load
// the latest version because cron module uses an old version of moment which
// does not implement `guess` function for example.
import 'moment-timezone'
// =================================================================== // ===================================================================
@ -106,99 +101,6 @@ export const diffItems = (coll1, coll2) => {
// ------------------------------------------------------------------- // -------------------------------------------------------------------
const ALGORITHM_TO_ID = {
md5: '1',
sha256: '5',
sha512: '6',
}
const ID_TO_ALGORITHM = invert(ALGORITHM_TO_ID)
// Wrap a readable stream in a stream with a checksum promise
// attribute which is resolved at the end of an input stream.
// (Finally .checksum contains the checksum of the input stream)
//
// Example:
// const sourceStream = ...
// const targetStream = ...
// const checksumStream = addChecksumToReadStream(sourceStream)
// await Promise.all([
// eventToPromise(checksumStream.pipe(targetStream), 'finish'),
// checksumStream.checksum.then(console.log)
// ])
export const addChecksumToReadStream = (stream, algorithm = 'md5') => {
const algorithmId = ALGORITHM_TO_ID[algorithm]
if (!algorithmId) {
throw new Error(`unknown algorithm: ${algorithm}`)
}
const hash = createHash(algorithm)
const { promise, resolve } = defer()
const wrapper = stream.pipe(
through2(
(chunk, enc, callback) => {
hash.update(chunk)
callback(null, chunk)
},
callback => {
resolve(hash.digest('hex'))
callback()
}
)
)
stream.on('error', error => wrapper.emit('error', error))
wrapper.checksum = promise.then(hash => `$${algorithmId}$$${hash}`)
return wrapper
}
// Check if the checksum of a readable stream is equals to an expected checksum.
// The given stream is wrapped in a stream which emits an error event
// if the computed checksum is not equals to the expected checksum.
export const validChecksumOfReadStream = (stream, expectedChecksum) => {
const algorithmId = expectedChecksum.slice(
1,
expectedChecksum.indexOf('$', 1)
)
if (!algorithmId) {
throw new Error(`unknown algorithm: ${algorithmId}`)
}
const hash = createHash(ID_TO_ALGORITHM[algorithmId])
const wrapper = stream.pipe(
through2(
{ highWaterMark: 0 },
(chunk, enc, callback) => {
hash.update(chunk)
callback(null, chunk)
},
callback => {
const checksum = `$${algorithmId}$$${hash.digest('hex')}`
callback(
checksum !== expectedChecksum
? new Error(
`Bad checksum (${checksum}), expected: ${expectedChecksum}`
)
: null
)
}
)
)
stream.on('error', error => wrapper.emit('error', error))
wrapper.checksumVerified = eventToPromise(wrapper, 'end')
return wrapper
}
// -------------------------------------------------------------------
// Ensure the value is an array, wrap it if necessary. // Ensure the value is an array, wrap it if necessary.
export function ensureArray (value) { export function ensureArray (value) {
if (value === undefined) { if (value === undefined) {