From af7aa29c91bb248e0a4c7deb0f0a6d37a01aa366 Mon Sep 17 00:00:00 2001 From: Florent BEAUCHAMP Date: Wed, 15 Nov 2023 10:04:09 +0100 Subject: [PATCH] feat(nbd-client): various fixes (#6964) --- @vates/nbd-client/index.mjs | 75 +++++++++++++++++++++++++++++-------- CHANGELOG.unreleased.md | 1 + 2 files changed, 60 insertions(+), 16 deletions(-) diff --git a/@vates/nbd-client/index.mjs b/@vates/nbd-client/index.mjs index 9d31ca697..dfcfc9df2 100644 --- a/@vates/nbd-client/index.mjs +++ b/@vates/nbd-client/index.mjs @@ -1,7 +1,7 @@ import assert from 'node:assert' import { Socket } from 'node:net' import { connect } from 'node:tls' -import { fromCallback, pRetry, pDelay, pTimeout } from 'promise-toolbox' +import { fromCallback, pRetry, pDelay, pTimeout, pFromCallback } from 'promise-toolbox' import { readChunkStrict } from '@vates/read-chunk' import { createLogger } from '@xen-orchestra/log' @@ -21,6 +21,7 @@ import { OPTS_MAGIC, NBD_CMD_DISC, } from './constants.mjs' +import { Readable } from 'node:stream' const { warn } = createLogger('vates:nbd-client') @@ -40,6 +41,7 @@ export default class NbdClient { #readBlockRetries #reconnectRetry #connectTimeout + #messageTimeout // AFAIK, there is no guaranty the server answers in the same order as the queries // so we handle a backlog of command waiting for response and handle concurrency manually @@ -52,7 +54,14 @@ export default class NbdClient { #reconnectingPromise constructor( { address, port = NBD_DEFAULT_PORT, exportname, cert }, - { connectTimeout = 6e4, waitBeforeReconnect = 1e3, readAhead = 10, readBlockRetries = 5, reconnectRetry = 5 } = {} + { + connectTimeout = 6e4, + messageTimeout = 6e4, + waitBeforeReconnect = 1e3, + readAhead = 10, + readBlockRetries = 5, + reconnectRetry = 5, + } = {} ) { this.#serverAddress = address this.#serverPort = port @@ -63,6 +72,7 @@ export default class NbdClient { this.#readBlockRetries = readBlockRetries this.#reconnectRetry = reconnectRetry this.#connectTimeout = connectTimeout + this.#messageTimeout = messageTimeout } get exportSize() { @@ -116,12 +126,24 @@ export default class NbdClient { return } + const queryId = this.#nextCommandQueryId + this.#nextCommandQueryId++ + const buffer = Buffer.alloc(28) buffer.writeInt32BE(NBD_REQUEST_MAGIC, 0) // it is a nbd request buffer.writeInt16BE(0, 4) // no command flags for a disconnect buffer.writeInt16BE(NBD_CMD_DISC, 6) // we want to disconnect from nbd server - await this.#write(buffer) - await this.#serverSocket.destroy() + buffer.writeBigUInt64BE(queryId, 8) + buffer.writeBigUInt64BE(0n, 16) + buffer.writeInt32BE(0, 24) + const promise = pFromCallback(cb => { + this.#serverSocket.end(buffer, 'utf8', cb) + }) + try { + await pTimeout.call(promise, this.#messageTimeout) + } catch (error) { + this.#serverSocket.destroy() + } this.#serverSocket = undefined this.#connected = false } @@ -195,11 +217,13 @@ export default class NbdClient { } #read(length) { - return readChunkStrict(this.#serverSocket, length) + const promise = readChunkStrict(this.#serverSocket, length) + return pTimeout.call(promise, this.#messageTimeout) } #write(buffer) { - return fromCallback.call(this.#serverSocket, 'write', buffer) + const promise = fromCallback.call(this.#serverSocket, 'write', buffer) + return pTimeout.call(promise, this.#messageTimeout) } async #readInt32() { @@ -232,19 +256,20 @@ export default class NbdClient { } try { this.#waitingForResponse = true - const magic = await this.#readInt32() + const buffer = await this.#read(16) + const magic = buffer.readInt32BE(0) if (magic !== NBD_REPLY_MAGIC) { throw new Error(`magic number for block answer is wrong : ${magic} ${NBD_REPLY_MAGIC}`) } - const error = await this.#readInt32() + const error = buffer.readInt32BE(4) if (error !== 0) { // @todo use error code from constants.mjs throw new Error(`GOT ERROR CODE : ${error}`) } - const blockQueryId = await this.#readInt64() + const blockQueryId = buffer.readBigUInt64BE(8) const query = this.#commandQueryBacklog.get(blockQueryId) if (!query) { throw new Error(` no query associated with id ${blockQueryId}`) @@ -281,7 +306,13 @@ export default class NbdClient { buffer.writeInt16BE(NBD_CMD_READ, 6) // we want to read a data block buffer.writeBigUInt64BE(queryId, 8) // byte offset in the raw disk - buffer.writeBigUInt64BE(BigInt(index) * BigInt(size), 16) + const offset = BigInt(index) * BigInt(size) + const remaining = this.#exportSize - offset + if (remaining < BigInt(size)) { + size = Number(remaining) + } + + buffer.writeBigUInt64BE(offset, 16) buffer.writeInt32BE(size, 24) return new Promise((resolve, reject) => { @@ -307,14 +338,15 @@ export default class NbdClient { }) } - async *readBlocks(indexGenerator) { + async *readBlocks(indexGenerator = 2 * 1024 * 1024) { // default : read all blocks - if (indexGenerator === undefined) { - const exportSize = this.#exportSize - const chunkSize = 2 * 1024 * 1024 + if (typeof indexGenerator === 'number') { + const exportSize = Number(this.#exportSize) + const chunkSize = indexGenerator + indexGenerator = function* () { - const nbBlocks = Math.ceil(Number(exportSize / BigInt(chunkSize))) - for (let index = 0; BigInt(index) < nbBlocks; index++) { + const nbBlocks = Math.ceil(exportSize / chunkSize) + for (let index = 0; index < nbBlocks; index++) { yield { index, size: chunkSize } } } @@ -348,4 +380,15 @@ export default class NbdClient { yield readAhead.shift() } } + + stream(chunkSize) { + async function* iterator() { + for await (const chunk of this.readBlocks(chunkSize)) { + yield chunk + } + } + // create a readable stream instead of returning the iterator + // since iterators don't like unshift and partial reading + return Readable.from(iterator()) + } } diff --git a/CHANGELOG.unreleased.md b/CHANGELOG.unreleased.md index 4278e3e36..daf3888aa 100644 --- a/CHANGELOG.unreleased.md +++ b/CHANGELOG.unreleased.md @@ -32,6 +32,7 @@ +- @vates/nbd-client patch - @xen-orchestra/backups patch - @xen-orchestra/vmware-explorer patch - xo-server-netbox minor