diff --git a/@vates/nbd-client/index.mjs b/@vates/nbd-client/index.mjs
index dfcfc9df2..2f4b4b298 100644
--- a/@vates/nbd-client/index.mjs
+++ b/@vates/nbd-client/index.mjs
@@ -3,7 +3,6 @@ import { Socket } from 'node:net'
 import { connect } from 'node:tls'
 import { fromCallback, pRetry, pDelay, pTimeout, pFromCallback } from 'promise-toolbox'
 import { readChunkStrict } from '@vates/read-chunk'
-import { createLogger } from '@xen-orchestra/log'
 
 import {
   INIT_PASSWD,
@@ -21,9 +20,6 @@ import {
   OPTS_MAGIC,
   NBD_CMD_DISC,
 } from './constants.mjs'
-import { Readable } from 'node:stream'
-
-const { warn } = createLogger('vates:nbd-client')
 
 // documentation is here : https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md
 
@@ -337,58 +333,4 @@ export default class NbdClient {
       this.#readBlockResponse()
     })
   }
-
-  async *readBlocks(indexGenerator = 2 * 1024 * 1024) {
-    // default : read all blocks
-    if (typeof indexGenerator === 'number') {
-      const exportSize = Number(this.#exportSize)
-      const chunkSize = indexGenerator
-
-      indexGenerator = function* () {
-        const nbBlocks = Math.ceil(exportSize / chunkSize)
-        for (let index = 0; index < nbBlocks; index++) {
-          yield { index, size: chunkSize }
-        }
-      }
-    }
-    const readAhead = []
-    const readAheadMaxLength = this.#readAhead
-    const makeReadBlockPromise = (index, size) => {
-      const promise = pRetry(() => this.readBlock(index, size), {
-        tries: this.#readBlockRetries,
-        onRetry: async err => {
-          warn('will retry reading block ', index, err)
-          await this.reconnect()
-        },
-      })
-      // error is handled during unshift
-      promise.catch(() => {})
-      return promise
-    }
-
-    // read all blocks, but try to keep readAheadMaxLength promise waiting ahead
-    for (const { index, size } of indexGenerator()) {
-      // stack readAheadMaxLength promises before starting to handle the results
-      if (readAhead.length === readAheadMaxLength) {
-        // any error will stop reading blocks
-        yield readAhead.shift()
-      }
-
-      readAhead.push(makeReadBlockPromise(index, size))
-    }
-    while (readAhead.length > 0) {
-      yield readAhead.shift()
-    }
-  }
-
-  stream(chunkSize) {
-    async function* iterator() {
-      for await (const chunk of this.readBlocks(chunkSize)) {
-        yield chunk
-      }
-    }
-    // create a readable stream instead of returning the iterator
-    // since iterators don't like unshift and partial reading
-    return Readable.from(iterator())
-  }
 }
diff --git a/@vates/nbd-client/tests/nbdclient.integ.mjs b/@vates/nbd-client/tests/nbdclient.integ.mjs
index 91b94d9ef..5e00edb52 100644
--- a/@vates/nbd-client/tests/nbdclient.integ.mjs
+++ b/@vates/nbd-client/tests/nbdclient.integ.mjs
@@ -1,4 +1,3 @@
-import NbdClient from '../index.mjs'
 import { spawn, exec } from 'node:child_process'
 import fs from 'node:fs/promises'
 import { test } from 'tap'
@@ -7,6 +6,7 @@ import { pFromCallback } from 'promise-toolbox'
 import { Socket } from 'node:net'
 import { NBD_DEFAULT_PORT } from '../constants.mjs'
 import assert from 'node:assert'
+import MultiNbdClient from '../multi.mjs'
 
 const FILE_SIZE = 10 * 1024 * 1024
 
@@ -81,7 +81,7 @@ test('it works with unsecured network', async tap => {
   const path = await createTempFile(FILE_SIZE)
 
   let nbdServer = await spawnNbdKit(path)
-  const client = new NbdClient(
+  const client = new MultiNbdClient(
     {
       address: '127.0.0.1',
       exportname: 'MY_SECRET_EXPORT',
@@ -147,17 +147,6 @@ CYu1Xn/FVPx1HoRgWc7E8wFhDcA/P3SJtfIQWHB9FzSaBflKGR4t8WCE2eE8+cTB
       nbdServer = await spawnNbdKit(path)
     }
   }
-
-  // we can reuse the conneciton to read other blocks
-  // default iterator
-  const nbdIteratorWithDefaultBlockIterator = client.readBlocks()
-  let nb = 0
-  for await (const block of nbdIteratorWithDefaultBlockIterator) {
-    nb++
-    tap.equal(block.length, 2 * 1024 * 1024)
-  }
-
-  tap.equal(nb, 5)
   assert.rejects(() => client.readBlock(100, CHUNK_SIZE))
 
   await client.disconnect()
diff --git a/packages/vhd-lib/createStreamNbd.js b/packages/vhd-lib/createStreamNbd.js
index aef39c6bb..7ef60ebfa 100644
--- a/packages/vhd-lib/createStreamNbd.js
+++ b/packages/vhd-lib/createStreamNbd.js
@@ -15,7 +15,16 @@ const { fuHeader, checksumStruct } = require('./_structs')
 const assert = require('node:assert')
 
 exports.createNbdRawStream = function createRawStream(nbdClient) {
-  const stream = Readable.from(nbdClient.readBlocks(), { objectMode: false })
+  const exportSize = Number(nbdClient.exportSize)
+  const chunkSize = 2 * 1024 * 1024
+
+  const indexGenerator = function* () {
+    const nbBlocks = Math.ceil(exportSize / chunkSize)
+    for (let index = 0; index < nbBlocks; index++) {
+      yield { index, size: chunkSize }
+    }
+  }
+  const stream = Readable.from(nbdClient.readBlocks(indexGenerator), { objectMode: false })
   stream.on('error', () => nbdClient.disconnect())
   stream.on('end', () => nbdClient.disconnect())
 
@@ -26,6 +35,7 @@ exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStr
   const bufFooter = await readChunkStrict(sourceStream, FOOTER_SIZE)
 
   const header = unpackHeader(await readChunkStrict(sourceStream, HEADER_SIZE))
+  header.tableOffset = FOOTER_SIZE + HEADER_SIZE
   // compute BAT in order
   const batSize = Math.ceil((header.maxTableEntries * 4) / SECTOR_SIZE) * SECTOR_SIZE
   await skipStrict(sourceStream, header.tableOffset - (FOOTER_SIZE + HEADER_SIZE))
@@ -47,7 +57,6 @@ exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStr
      precLocator = offset
     }
   }
-  header.tableOffset = FOOTER_SIZE + HEADER_SIZE
 
   const rawHeader = fuHeader.pack(header)
   checksumStruct(rawHeader, fuHeader)
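
Usage note (not part of the patch): readBlocks no longer accepts a chunk size and no longer builds a default block iterator, so callers must supply their own generator of { index, size } entries, exactly as the patched createNbdRawStream does. Below is a minimal sketch of a consumer, assuming an already connected client (e.g. MultiNbdClient) exposing exportSize and readBlocks(indexGenerator); the function name and local variables are illustrative only.

// Sketch only: reads an entire NBD export in fixed-size chunks and returns the byte count.
// `nbdClient` is assumed to be connected and to expose `exportSize` and `readBlocks(indexGenerator)`.
async function readWholeExport(nbdClient) {
  const chunkSize = 2 * 1024 * 1024
  const exportSize = Number(nbdClient.exportSize)

  // same generator shape as in createNbdRawStream: one { index, size } entry per chunk
  function* indexGenerator() {
    const nbBlocks = Math.ceil(exportSize / chunkSize)
    for (let index = 0; index < nbBlocks; index++) {
      yield { index, size: chunkSize }
    }
  }

  // readBlocks is an async iterable yielding one Buffer per requested chunk, in order
  let total = 0
  for await (const block of nbdClient.readBlocks(indexGenerator)) {
    total += block.length
  }
  return total
}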