feat(backups/_forkStreamUnpipe): add more debug

This commit is contained in:
Julien Fontanet 2022-08-26 10:47:38 +02:00
parent c1846e6ff3
commit 58ab32a623

View File

@ -3,6 +3,8 @@
const eos = require('end-of-stream') const eos = require('end-of-stream')
const { PassThrough } = require('stream') const { PassThrough } = require('stream')
const { debug } = require('@xen-orchestra/log').createLogger('xo:backups:forkStreamUnpipe')
// create a new readable stream from an existing one which may be piped later // create a new readable stream from an existing one which may be piped later
// //
// in case of error in the new readable stream, it will simply be unpiped // in case of error in the new readable stream, it will simply be unpiped
@ -11,18 +13,23 @@ exports.forkStreamUnpipe = function forkStreamUnpipe(stream) {
const { forks = 0 } = stream const { forks = 0 } = stream
stream.forks = forks + 1 stream.forks = forks + 1
debug('forking', { forks: stream.forks })
const proxy = new PassThrough() const proxy = new PassThrough()
stream.pipe(proxy) stream.pipe(proxy)
eos(stream, error => { eos(stream, error => {
if (error !== undefined) { if (error !== undefined) {
debug('error on original stream, destroying fork', { error })
proxy.destroy(error) proxy.destroy(error)
} }
}) })
eos(proxy, _ => { eos(proxy, error => {
stream.forks-- debug('end of stream, unpiping', { error, forks: --stream.forks })
stream.unpipe(proxy) stream.unpipe(proxy)
if (stream.forks === 0) { if (stream.forks === 0) {
debug('no more forks, destroying original stream')
stream.destroy(new Error('no more consumers for this stream')) stream.destroy(new Error('no more consumers for this stream'))
} }
}) })