chore(xo-server): addChecksumToReadStream → createChecksumStream (#2725)
`addChecksumToReadStream` was overly complicated and its usage was limited. `createChecksumStream` is similar but does not pipe the readable stream in by itself.
This commit is contained in:
parent
3e89c62e72
commit
41f16846b6
@ -1,10 +1,9 @@
|
||||
import through2 from 'through2'
|
||||
import { fromEvent, ignoreErrors } from 'promise-toolbox'
|
||||
import { parse } from 'xo-remote-parser'
|
||||
|
||||
import { getPseudoRandomBytes, streamToBuffer } from '../utils'
|
||||
|
||||
import { addChecksumToReadStream, validChecksumOfReadStream } from './checksum'
|
||||
import { createChecksumStream, validChecksumOfReadStream } from './checksum'
|
||||
|
||||
const checksumFile = file => file + '.checksum'
|
||||
|
||||
@ -187,10 +186,11 @@ export default class RemoteHandlerAbstract {
|
||||
}
|
||||
|
||||
async refreshChecksum (path) {
|
||||
const stream = addChecksumToReadStream(await this.createReadStream(path))
|
||||
const stream = (await this.createReadStream(path)).pipe(
|
||||
createChecksumStream()
|
||||
)
|
||||
stream.resume() // start reading the whole file
|
||||
const checksum = await stream.checksum
|
||||
await this.outputFile(checksumFile(path), checksum)
|
||||
await this.outputFile(checksumFile(path), await stream.checksum)
|
||||
}
|
||||
|
||||
async createOutputStream (file, { checksum = false, ...options } = {}) {
|
||||
@ -204,21 +204,20 @@ export default class RemoteHandlerAbstract {
|
||||
return streamP
|
||||
}
|
||||
|
||||
const connectorStream = through2()
|
||||
const checksumStream = createChecksumStream()
|
||||
const forwardError = error => {
|
||||
connectorStream.emit('error', error)
|
||||
checksumStream.emit('error', error)
|
||||
}
|
||||
|
||||
const streamWithChecksum = addChecksumToReadStream(connectorStream)
|
||||
const stream = await streamP
|
||||
stream.on('error', forwardError)
|
||||
streamWithChecksum.pipe(stream)
|
||||
checksumStream.pipe(stream)
|
||||
|
||||
streamWithChecksum.checksum
|
||||
checksumStream.checksum
|
||||
.then(value => this.outputFile(checksumFile(path), value))
|
||||
.catch(forwardError)
|
||||
|
||||
return connectorStream
|
||||
return checksumStream
|
||||
}
|
||||
|
||||
async _createOutputStream (file, options) {
|
||||
|
@ -11,19 +11,17 @@ const ALGORITHM_TO_ID = {
|
||||
|
||||
const ID_TO_ALGORITHM = invert(ALGORITHM_TO_ID)
|
||||
|
||||
// Wrap a readable stream in a stream with a checksum promise
|
||||
// attribute which is resolved at the end of an input stream.
|
||||
// (Finally .checksum contains the checksum of the input stream)
|
||||
// Create a through stream which computes the checksum of all data going
|
||||
// through.
|
||||
//
|
||||
// Example:
|
||||
// const sourceStream = ...
|
||||
// const targetStream = ...
|
||||
// const checksumStream = addChecksumToReadStream(sourceStream)
|
||||
// await Promise.all([
|
||||
// fromEvent(checksumStream.pipe(targetStream), 'finish'),
|
||||
// checksumStream.checksum.then(console.log)
|
||||
// ])
|
||||
export const addChecksumToReadStream = (stream, algorithm = 'md5') => {
|
||||
// The `checksum` attribute is a promise which resolves at the end of the stream
|
||||
// with a string representation of the checksum.
|
||||
//
|
||||
// const source = ...
|
||||
// const checksumStream = source.pipe(createChecksumStream())
|
||||
// checksumStream.resume() // make the data flow without an output
|
||||
// console.log(await checksumStream.checksum)
|
||||
export const createChecksumStream = (algorithm = 'md5') => {
|
||||
const algorithmId = ALGORITHM_TO_ID[algorithm]
|
||||
|
||||
if (!algorithmId) {
|
||||
@ -31,25 +29,20 @@ export const addChecksumToReadStream = (stream, algorithm = 'md5') => {
|
||||
}
|
||||
|
||||
const hash = createHash(algorithm)
|
||||
const { promise, resolve } = defer()
|
||||
const { promise, resolve, reject } = defer()
|
||||
|
||||
const wrapper = stream.pipe(
|
||||
through2(
|
||||
(chunk, enc, callback) => {
|
||||
hash.update(chunk)
|
||||
callback(null, chunk)
|
||||
},
|
||||
callback => {
|
||||
resolve(hash.digest('hex'))
|
||||
callback()
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
stream.on('error', error => wrapper.emit('error', error))
|
||||
wrapper.checksum = promise.then(hash => `$${algorithmId}$$${hash}`)
|
||||
|
||||
return wrapper
|
||||
const stream = through2(
|
||||
(chunk, enc, callback) => {
|
||||
hash.update(chunk)
|
||||
callback(null, chunk)
|
||||
},
|
||||
callback => {
|
||||
resolve(`$${algorithmId}$$${hash.digest('hex')}`)
|
||||
callback()
|
||||
}
|
||||
).once('error', reject)
|
||||
stream.checksum = promise
|
||||
return stream
|
||||
}
|
||||
|
||||
// Check if the checksum of a readable stream is equals to an expected checksum.
|
||||
|
Loading…
Reference in New Issue
Block a user