Compare commits

...

1 Commit

Author              SHA1         Message                                                                                    Date
Florent BEAUCHAMP   24f4e15cc6   feat(@xen-orchestra/backups): use parallel reading with NBD + blocks to only one remote   2023-05-17 14:35:04 +02:00
3 changed files with 30 additions and 14 deletions

View File

@@ -267,7 +267,7 @@ class VmBackup {
     await this._callWriters(
       writer =>
         writer.transfer({
-          deltaExport: forkDeltaExport(deltaExport),
+          deltaExport: this._writers.size > 1 ? forkDeltaExport(deltaExport) : deltaExport,
          sizeContainers,
          timestamp,
        }),

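Note on this first change: forkDeltaExport presumably tees every stream in the delta export so that each writer can consume its own copy; with a single remote there is nothing to share, so the guard on this._writers.size skips the fork entirely. A minimal sketch of what such a fork helper could look like, assuming a simple PassThrough-based tee (forkStream is hypothetical, not the helper used by @xen-orchestra/backups):

'use strict'
const { PassThrough } = require('node:stream')

// Hypothetical tee: every consumer gets its own PassThrough fed from the
// single source stream. A real implementation must also propagate
// backpressure, which this sketch ignores.
function forkStream(source, count) {
  const forks = Array.from({ length: count }, () => new PassThrough())
  source.on('data', chunk => forks.forEach(fork => fork.write(chunk)))
  source.on('end', () => forks.forEach(fork => fork.end()))
  source.on('error', error => forks.forEach(fork => fork.destroy(error)))
  return forks
}

With a single writer, a tee like this would buffer every chunk a second time for nothing, which is the overhead the new ternary avoids.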
View File

@@ -1,7 +1,7 @@
 'use strict'
 const { readChunkStrict, skipStrict } = require('@vates/read-chunk')
 const { Readable } = require('node:stream')
-const { unpackHeader } = require('./Vhd/_utils')
+const { unpackHeader, unpackFooter } = require('./Vhd/_utils')
 const {
   FOOTER_SIZE,
   HEADER_SIZE,
@@ -24,6 +24,7 @@ exports.createNbdRawStream = async function createRawStream(nbdClient) {
 exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStream) {
   const bufFooter = await readChunkStrict(sourceStream, FOOTER_SIZE)
+  const footer = unpackFooter(bufFooter)
   const header = unpackHeader(await readChunkStrict(sourceStream, HEADER_SIZE))
   // compute BAT in order
@@ -70,19 +71,20 @@ exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStream) {
   }
   async function* iterator() {
-    yield bufFooter
-    yield rawHeader
-    yield bat
+    yield { buffer: bufFooter, type: 'footer', footer }
+    yield { buffer: rawHeader, type: 'header', header }
+    yield { buffer: bat, type: 'bat' }
     let precBlocOffset = FOOTER_SIZE + HEADER_SIZE + batSize
     for (let i = 0; i < PARENT_LOCATOR_ENTRIES; i++) {
-      const parentLocatorOffset = header.parentLocatorEntry[i].platformDataOffset
-      const space = header.parentLocatorEntry[i].platformDataSpace * SECTOR_SIZE
+      const parentLocatorEntry = header.parentLocatorEntry[i]
+      const parentLocatorOffset = parentLocatorEntry.platformDataOffset
+      const space = parentLocatorEntry.platformDataSpace * SECTOR_SIZE
       if (space > 0) {
         await skipStrict(sourceStream, parentLocatorOffset - precBlocOffset)
         const data = await readChunkStrict(sourceStream, space)
         precBlocOffset = parentLocatorOffset + space
-        yield data
+        yield { ...parentLocatorEntry, buffer: data, type: 'parentLocator', id: i }
       }
     }
@@ -96,16 +98,25 @@ exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStream) {
       }
     })
     const bitmap = Buffer.alloc(SECTOR_SIZE, 255)
+    let index = 0
     for await (const block of nbdIterator) {
-      yield bitmap // don't forget the bitmap before the block
-      yield block
+      const buffer = Buffer.concat([bitmap, block])
+      yield { buffer, type: 'block', id: entries[index] }
+      index++
     }
-    yield bufFooter
+    yield { buffer: bufFooter, type: 'footer', footer }
   }
-  const stream = Readable.from(iterator())
+  async function* bufferIterator() {
+    for await (const { buffer } of iterator()) {
+      yield buffer
+    }
+  }
+  const stream = Readable.from(bufferIterator())
   stream.length = (offsetSector + blockSizeInSectors + 1) /* end footer */ * SECTOR_SIZE
+  stream._nbd = true
+  stream._iterator = iterator
   stream.on('error', () => nbdClient.disconnect())
   stream.on('end', () => nbdClient.disconnect())
   return stream

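The reworked iterator yields annotated chunks ({ buffer, type, ... }) in VHD on-disk order: footer, header, BAT, parent locators, data blocks, then the trailing footer; the bitmap and block data are now concatenated so one typed chunk is one complete VHD block. bufferIterator strips the annotations so Readable.from() still exposes a plain byte stream, while stream._iterator hands the typed generator to consumers that can use it. A sketch of how a caller might pick between the two views (names taken from the diff above; since both views presumably pull from the same source stream and NBD connection, only one of them may be consumed):

'use strict'

// Sketch: consume either the typed view or the raw byte view, never both.
async function dumpVhd(stream) {
  if (stream._iterator !== undefined) {
    // typed view: { buffer, type, ... } chunks, e.g. type 'block' with an id
    for await (const chunk of stream._iterator()) {
      console.log(chunk.type, chunk.buffer.length)
    }
  } else {
    // raw view: the Readable yields plain buffers
    for await (const buffer of stream) {
      console.log('raw', buffer.length)
    }
  }
}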
View File

@@ -160,7 +160,12 @@ class StreamParser {
     yield* this.blocks()
   }
 }
+
 exports.parseVhdStream = async function* parseVhdStream(stream) {
-  const parser = new StreamParser(stream)
-  yield* parser.parse()
+  if (stream._iterator) {
+    yield* stream._iterator()
+  } else {
+    const parser = new StreamParser(stream)
+    yield* parser.parse()
+  }
 }
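parseVhdStream now dispatches on the _iterator property set by createNbdVhdStream: an NBD-backed stream already knows its own layout and can yield typed chunks directly, so the byte-level StreamParser only runs for ordinary VHD streams. Assuming both paths yield items of the same { type, buffer, ... } shape (which the shared call site implies), a caller does not need to care which path was taken:

'use strict'
// The require path is assumed for the sake of the example.
const { parseVhdStream } = require('vhd-lib/parseVhdStream')

// Count data blocks regardless of whether `stream` came from an NBD export
// (fast path via stream._iterator) or from a plain VHD byte stream.
async function countBlocks(stream) {
  let nBlocks = 0
  for await (const item of parseVhdStream(stream)) {
    if (item.type === 'block') {
      nBlocks++
    }
  }
  return nBlocks
}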