feat(nbd-client): various fixes (#6964)

This commit is contained in:
Florent BEAUCHAMP
2023-11-15 10:04:09 +01:00
committed by GitHub
parent 315d626055
commit af7aa29c91
2 changed files with 60 additions and 16 deletions

View File

@@ -1,7 +1,7 @@
import assert from 'node:assert'
import { Socket } from 'node:net'
import { connect } from 'node:tls'
import { fromCallback, pRetry, pDelay, pTimeout } from 'promise-toolbox'
import { fromCallback, pRetry, pDelay, pTimeout, pFromCallback } from 'promise-toolbox'
import { readChunkStrict } from '@vates/read-chunk'
import { createLogger } from '@xen-orchestra/log'
@@ -21,6 +21,7 @@ import {
OPTS_MAGIC,
NBD_CMD_DISC,
} from './constants.mjs'
import { Readable } from 'node:stream'
const { warn } = createLogger('vates:nbd-client')
@@ -40,6 +41,7 @@ export default class NbdClient {
#readBlockRetries
#reconnectRetry
#connectTimeout
#messageTimeout
// AFAIK, there is no guaranty the server answers in the same order as the queries
// so we handle a backlog of command waiting for response and handle concurrency manually
@@ -52,7 +54,14 @@ export default class NbdClient {
#reconnectingPromise
constructor(
{ address, port = NBD_DEFAULT_PORT, exportname, cert },
{ connectTimeout = 6e4, waitBeforeReconnect = 1e3, readAhead = 10, readBlockRetries = 5, reconnectRetry = 5 } = {}
{
connectTimeout = 6e4,
messageTimeout = 6e4,
waitBeforeReconnect = 1e3,
readAhead = 10,
readBlockRetries = 5,
reconnectRetry = 5,
} = {}
) {
this.#serverAddress = address
this.#serverPort = port
@@ -63,6 +72,7 @@ export default class NbdClient {
this.#readBlockRetries = readBlockRetries
this.#reconnectRetry = reconnectRetry
this.#connectTimeout = connectTimeout
this.#messageTimeout = messageTimeout
}
get exportSize() {
@@ -116,12 +126,24 @@ export default class NbdClient {
return
}
const queryId = this.#nextCommandQueryId
this.#nextCommandQueryId++
const buffer = Buffer.alloc(28)
buffer.writeInt32BE(NBD_REQUEST_MAGIC, 0) // it is a nbd request
buffer.writeInt16BE(0, 4) // no command flags for a disconnect
buffer.writeInt16BE(NBD_CMD_DISC, 6) // we want to disconnect from nbd server
await this.#write(buffer)
await this.#serverSocket.destroy()
buffer.writeBigUInt64BE(queryId, 8)
buffer.writeBigUInt64BE(0n, 16)
buffer.writeInt32BE(0, 24)
const promise = pFromCallback(cb => {
this.#serverSocket.end(buffer, 'utf8', cb)
})
try {
await pTimeout.call(promise, this.#messageTimeout)
} catch (error) {
this.#serverSocket.destroy()
}
this.#serverSocket = undefined
this.#connected = false
}
@@ -195,11 +217,13 @@ export default class NbdClient {
}
#read(length) {
return readChunkStrict(this.#serverSocket, length)
const promise = readChunkStrict(this.#serverSocket, length)
return pTimeout.call(promise, this.#messageTimeout)
}
#write(buffer) {
return fromCallback.call(this.#serverSocket, 'write', buffer)
const promise = fromCallback.call(this.#serverSocket, 'write', buffer)
return pTimeout.call(promise, this.#messageTimeout)
}
async #readInt32() {
@@ -232,19 +256,20 @@ export default class NbdClient {
}
try {
this.#waitingForResponse = true
const magic = await this.#readInt32()
const buffer = await this.#read(16)
const magic = buffer.readInt32BE(0)
if (magic !== NBD_REPLY_MAGIC) {
throw new Error(`magic number for block answer is wrong : ${magic} ${NBD_REPLY_MAGIC}`)
}
const error = await this.#readInt32()
const error = buffer.readInt32BE(4)
if (error !== 0) {
// @todo use error code from constants.mjs
throw new Error(`GOT ERROR CODE : ${error}`)
}
const blockQueryId = await this.#readInt64()
const blockQueryId = buffer.readBigUInt64BE(8)
const query = this.#commandQueryBacklog.get(blockQueryId)
if (!query) {
throw new Error(` no query associated with id ${blockQueryId}`)
@@ -281,7 +306,13 @@ export default class NbdClient {
buffer.writeInt16BE(NBD_CMD_READ, 6) // we want to read a data block
buffer.writeBigUInt64BE(queryId, 8)
// byte offset in the raw disk
buffer.writeBigUInt64BE(BigInt(index) * BigInt(size), 16)
const offset = BigInt(index) * BigInt(size)
const remaining = this.#exportSize - offset
if (remaining < BigInt(size)) {
size = Number(remaining)
}
buffer.writeBigUInt64BE(offset, 16)
buffer.writeInt32BE(size, 24)
return new Promise((resolve, reject) => {
@@ -307,14 +338,15 @@ export default class NbdClient {
})
}
async *readBlocks(indexGenerator) {
async *readBlocks(indexGenerator = 2 * 1024 * 1024) {
// default : read all blocks
if (indexGenerator === undefined) {
const exportSize = this.#exportSize
const chunkSize = 2 * 1024 * 1024
if (typeof indexGenerator === 'number') {
const exportSize = Number(this.#exportSize)
const chunkSize = indexGenerator
indexGenerator = function* () {
const nbBlocks = Math.ceil(Number(exportSize / BigInt(chunkSize)))
for (let index = 0; BigInt(index) < nbBlocks; index++) {
const nbBlocks = Math.ceil(exportSize / chunkSize)
for (let index = 0; index < nbBlocks; index++) {
yield { index, size: chunkSize }
}
}
@@ -348,4 +380,15 @@ export default class NbdClient {
yield readAhead.shift()
}
}
stream(chunkSize) {
async function* iterator() {
for await (const chunk of this.readBlocks(chunkSize)) {
yield chunk
}
}
// create a readable stream instead of returning the iterator
// since iterators don't like unshift and partial reading
return Readable.from(iterator())
}
}