feat(xapi/VDI_exportContent): create XAPI task during NBD export (#7228)

See zammad#19003
This commit is contained in:
Mathieu 2023-12-19 19:16:44 +01:00 committed by GitHub
parent c40e71ed49
commit e66bcf2a5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 13 deletions

View File

@ -3,6 +3,7 @@ import pCatch from 'promise-toolbox/catch'
import pRetry from 'promise-toolbox/retry' import pRetry from 'promise-toolbox/retry'
import { createLogger } from '@xen-orchestra/log' import { createLogger } from '@xen-orchestra/log'
import { decorateClass } from '@vates/decorate-with' import { decorateClass } from '@vates/decorate-with'
import { finished } from 'node:stream'
import { strict as assert } from 'node:assert' import { strict as assert } from 'node:assert'
import extractOpaqueRef from './_extractOpaqueRef.mjs' import extractOpaqueRef from './_extractOpaqueRef.mjs'
@ -106,12 +107,16 @@ class Vdi {
stream = createNbdRawStream(nbdClient) stream = createNbdRawStream(nbdClient)
} else { } else {
// raw export without nbd or vhd exports needs a resource stream // raw export without nbd or vhd exports needs a resource stream
const vdiName = await this.getField('VDI', ref, 'name_label')
stream = await this.getResource(cancelToken, '/export_raw_vdi/', { stream = await this.getResource(cancelToken, '/export_raw_vdi/', {
query, query,
task: await this.task_create(`Exporting content of VDI ${await this.getField('VDI', ref, 'name_label')}`), task: await this.task_create(`Exporting content of VDI ${vdiName}`),
}) })
if (nbdClient !== undefined && format === VDI_FORMAT_VHD) { if (nbdClient !== undefined && format === VDI_FORMAT_VHD) {
const taskRef = await this.task_create(`Exporting content of VDI ${vdiName} using NBD`)
stream = await createNbdVhdStream(nbdClient, stream) stream = await createNbdVhdStream(nbdClient, stream)
stream.on('progress', progress => this.call('task.set_progress', taskRef, progress))
finished(stream, () => this.task_destroy(taskRef))
} }
} }
return stream return stream

View File

@ -16,6 +16,7 @@
- [HTTP] `http.useForwardedHeaders` setting can be enabled when XO is behind a reverse proxy to fetch clients IP addresses from `X-Forwarded-*` headers [Forum#67625](https://xcp-ng.org/forum/post/67625) (PR [#7233](https://github.com/vatesfr/xen-orchestra/pull/7233)) - [HTTP] `http.useForwardedHeaders` setting can be enabled when XO is behind a reverse proxy to fetch clients IP addresses from `X-Forwarded-*` headers [Forum#67625](https://xcp-ng.org/forum/post/67625) (PR [#7233](https://github.com/vatesfr/xen-orchestra/pull/7233))
- [Backup] Use multiple links to speed up NBD backup (PR [#7216](https://github.com/vatesfr/xen-orchestra/pull/7216)) - [Backup] Use multiple links to speed up NBD backup (PR [#7216](https://github.com/vatesfr/xen-orchestra/pull/7216))
- [Backup] Show if disk is differential or full in incremental backups (PR [#7222](https://github.com/vatesfr/xen-orchestra/pull/7222)) - [Backup] Show if disk is differential or full in incremental backups (PR [#7222](https://github.com/vatesfr/xen-orchestra/pull/7222))
- [VDI] Create XAPI task during NBD export (PR [#7228](https://github.com/vatesfr/xen-orchestra/pull/7228))
### Bug fixes ### Bug fixes

View File

@ -1,6 +1,6 @@
'use strict' 'use strict'
const { finished, Readable } = require('node:stream')
const { readChunkStrict, skipStrict } = require('@vates/read-chunk') const { readChunkStrict, skipStrict } = require('@vates/read-chunk')
const { Readable } = require('node:stream')
const { unpackHeader } = require('./Vhd/_utils') const { unpackHeader } = require('./Vhd/_utils')
const { const {
FOOTER_SIZE, FOOTER_SIZE,
@ -14,6 +14,9 @@ const {
const { fuHeader, checksumStruct } = require('./_structs') const { fuHeader, checksumStruct } = require('./_structs')
const assert = require('node:assert') const assert = require('node:assert')
const MAX_DURATION_BETWEEN_PROGRESS_EMIT = 5e3
const MIN_TRESHOLD_PERCENT_BETWEEN_PROGRESS_EMIT = 1
exports.createNbdRawStream = function createRawStream(nbdClient) { exports.createNbdRawStream = function createRawStream(nbdClient) {
const exportSize = Number(nbdClient.exportSize) const exportSize = Number(nbdClient.exportSize)
const chunkSize = 2 * 1024 * 1024 const chunkSize = 2 * 1024 * 1024
@ -31,7 +34,14 @@ exports.createNbdRawStream = function createRawStream(nbdClient) {
return stream return stream
} }
exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStream) { exports.createNbdVhdStream = async function createVhdStream(
nbdClient,
sourceStream,
{
maxDurationBetweenProgressEmit = MAX_DURATION_BETWEEN_PROGRESS_EMIT,
minTresholdPercentBetweenProgressEmit = MIN_TRESHOLD_PERCENT_BETWEEN_PROGRESS_EMIT,
} = {}
) {
const bufFooter = await readChunkStrict(sourceStream, FOOTER_SIZE) const bufFooter = await readChunkStrict(sourceStream, FOOTER_SIZE)
const header = unpackHeader(await readChunkStrict(sourceStream, HEADER_SIZE)) const header = unpackHeader(await readChunkStrict(sourceStream, HEADER_SIZE))
@ -78,10 +88,35 @@ exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStr
} }
} }
const totalLength = (offsetSector + blockSizeInSectors + 1) /* end footer */ * SECTOR_SIZE
let lengthRead = 0
let lastUpdate = 0
let lastLengthRead = 0
function throttleEmitProgress() {
const now = Date.now()
if (
lengthRead - lastLengthRead > (minTresholdPercentBetweenProgressEmit / 100) * totalLength ||
(now - lastUpdate > maxDurationBetweenProgressEmit && lengthRead !== lastLengthRead)
) {
stream.emit('progress', lengthRead / totalLength)
lastUpdate = now
lastLengthRead = lengthRead
}
}
function trackAndGet(buffer) {
lengthRead += buffer.length
throttleEmitProgress()
return buffer
}
async function* iterator() { async function* iterator() {
yield bufFooter yield trackAndGet(bufFooter)
yield rawHeader yield trackAndGet(rawHeader)
yield bat yield trackAndGet(bat)
let precBlocOffset = FOOTER_SIZE + HEADER_SIZE + batSize let precBlocOffset = FOOTER_SIZE + HEADER_SIZE + batSize
for (let i = 0; i < PARENT_LOCATOR_ENTRIES; i++) { for (let i = 0; i < PARENT_LOCATOR_ENTRIES; i++) {
@ -91,7 +126,7 @@ exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStr
await skipStrict(sourceStream, parentLocatorOffset - precBlocOffset) await skipStrict(sourceStream, parentLocatorOffset - precBlocOffset)
const data = await readChunkStrict(sourceStream, space) const data = await readChunkStrict(sourceStream, space)
precBlocOffset = parentLocatorOffset + space precBlocOffset = parentLocatorOffset + space
yield data yield trackAndGet(data)
} }
} }
@ -106,16 +141,20 @@ exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStr
}) })
const bitmap = Buffer.alloc(SECTOR_SIZE, 255) const bitmap = Buffer.alloc(SECTOR_SIZE, 255)
for await (const block of nbdIterator) { for await (const block of nbdIterator) {
yield bitmap // don't forget the bitmap before the block yield trackAndGet(bitmap) // don't forget the bitmap before the block
yield block yield trackAndGet(block)
} }
yield bufFooter yield trackAndGet(bufFooter)
} }
const stream = Readable.from(iterator(), { objectMode: false }) const stream = Readable.from(iterator(), { objectMode: false })
stream.length = (offsetSector + blockSizeInSectors + 1) /* end footer */ * SECTOR_SIZE stream.length = totalLength
stream._nbd = true stream._nbd = true
stream.on('error', () => nbdClient.disconnect()) finished(stream, () => {
stream.on('end', () => nbdClient.disconnect()) clearInterval(interval)
nbdClient.disconnect()
})
const interval = setInterval(throttleEmitProgress, maxDurationBetweenProgressEmit)
return stream return stream
} }