chore(xo-server/backups-ng): dont fork streams if 1 target

This commit is contained in:
Julien Fontanet 2018-03-30 12:10:35 +02:00
parent 2ac1093543
commit 505f06c1d8

View File

@ -671,7 +671,8 @@ export default class BackupNg {
const remotes = unboxIds(job.remotes) const remotes = unboxIds(job.remotes)
const srs = unboxIds(job.srs) const srs = unboxIds(job.srs)
if (remotes.length === 0 && srs.length === 0) { const nTargets = remotes.length + srs.length
if (nTargets === 0) {
throw new Error('export retention must be 0 without remotes and SRs') throw new Error('export retention must be 0 without remotes and SRs')
} }
@ -695,6 +696,15 @@ export default class BackupNg {
const exportTask = xva.task const exportTask = xva.task
xva = xva.pipe(createSizeStream()) xva = xva.pipe(createSizeStream())
const forkExport =
nTargets === 0
? () => xva
: () => {
const fork = xva.pipe(new PassThrough())
fork.task = exportTask
return fork
}
const dataBasename = `${basename}.xva` const dataBasename = `${basename}.xva`
const metadata: MetadataFull = { const metadata: MetadataFull = {
@ -715,7 +725,7 @@ export default class BackupNg {
await waitAll( await waitAll(
[ [
...remotes.map(async remoteId => { ...remotes.map(async remoteId => {
const fork = xva.pipe(new PassThrough()) const fork = forkExport()
const handler = await app.getRemoteHandler(remoteId) const handler = await app.getRemoteHandler(remoteId)
@ -742,8 +752,7 @@ export default class BackupNg {
} }
}), }),
...srs.map(async srId => { ...srs.map(async srId => {
const fork = xva.pipe(new PassThrough()) const fork = forkExport()
fork.task = exportTask
const xapi = app.getXapi(srId) const xapi = app.getXapi(srId)
const sr = xapi.getObject(srId) const sr = xapi.getObject(srId)
@ -841,49 +850,55 @@ export default class BackupNg {
const jsonMetadata = JSON.stringify(metadata) const jsonMetadata = JSON.stringify(metadata)
// create a fork of the delta export // create a fork of the delta export
const forkExport = (() => { const forkExport =
// replace the stream factories by fork factories nTargets === 1
const streams: any = mapValues(deltaExport.streams, lazyStream => { ? () => deltaExport
let forks = [] : (() => {
return () => { // replace the stream factories by fork factories
if (forks === undefined) { const streams: any = mapValues(
throw new Error( deltaExport.streams,
'cannot fork the stream after it has been created' lazyStream => {
) let forks = []
} return () => {
if (forks.length === 0) { if (forks === undefined) {
lazyStream().then( throw new Error(
stream => { 'cannot fork the stream after it has been created'
// $FlowFixMe )
forks.forEach(({ resolve }) => { }
const fork: any = stream.pipe(new PassThrough()) if (forks.length === 0) {
fork.task = stream.task lazyStream().then(
resolve(fork) stream => {
// $FlowFixMe
forks.forEach(({ resolve }) => {
const fork: any = stream.pipe(new PassThrough())
fork.task = stream.task
resolve(fork)
})
forks = undefined
},
error => {
// $FlowFixMe
forks.forEach(({ reject }) => {
reject(error)
})
forks = undefined
}
)
}
return new Promise((resolve, reject) => {
// $FlowFixMe
forks.push({ reject, resolve })
}) })
forks = undefined
},
error => {
// $FlowFixMe
forks.forEach(({ reject }) => {
reject(error)
})
forks = undefined
} }
) }
)
return () => {
return {
__proto__: deltaExport,
streams,
}
} }
return new Promise((resolve, reject) => { })()
// $FlowFixMe
forks.push({ reject, resolve })
})
}
})
return () => {
return {
__proto__: deltaExport,
streams,
}
}
})()
const mergeStart = 0 const mergeStart = 0
const mergeEnd = 0 const mergeEnd = 0