feat(nbd-client): first implementation (#6444)
This commit is contained in:
parent
756d2fe4e7
commit
0dd91c1efe
@ -29,6 +29,7 @@ module.exports = {
|
||||
},
|
||||
{
|
||||
files: ['*.spec.{,c,m}js'],
|
||||
excludedFiles: '@vates/nbd-client/*',
|
||||
rules: {
|
||||
'n/no-unsupported-features/node-builtins': [
|
||||
'error',
|
||||
|
16
@vates/nbd-client/.USAGE.md
Normal file
16
@vates/nbd-client/.USAGE.md
Normal file
@ -0,0 +1,16 @@
|
||||
### `new NdbClient({address, exportname, secure = true, port = 10809})`
|
||||
|
||||
create a new nbd client
|
||||
|
||||
```js
|
||||
import NbdClient from '@vates/nbd-client'
|
||||
const client = new NbdClient({
|
||||
address: 'MY_NBD_HOST',
|
||||
exportname: 'MY_SECRET_EXPORT',
|
||||
cert: 'Server certificate', // optional, will use encrypted link if provided
|
||||
})
|
||||
|
||||
await client.connect()
|
||||
const block = await client.readBlock(blockIndex, BlockSize)
|
||||
await client.disconnect()
|
||||
```
|
1
@vates/nbd-client/.npmignore
Symbolic link
1
@vates/nbd-client/.npmignore
Symbolic link
@ -0,0 +1 @@
|
||||
../../scripts/npmignore
|
47
@vates/nbd-client/README.md
Normal file
47
@vates/nbd-client/README.md
Normal file
@ -0,0 +1,47 @@
|
||||
<!-- DO NOT EDIT MANUALLY, THIS FILE HAS BEEN GENERATED -->
|
||||
|
||||
# @vates/nbd-client
|
||||
|
||||
[](https://npmjs.org/package/@vates/nbd-client)  [](https://bundlephobia.com/result?p=@vates/nbd-client) [](https://npmjs.org/package/@vates/nbd-client)
|
||||
|
||||
## Install
|
||||
|
||||
Installation of the [npm package](https://npmjs.org/package/@vates/nbd-client):
|
||||
|
||||
```
|
||||
> npm install --save @vates/nbd-client
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
### `new NdbClient({address, exportname, secure = true, port = 10809})`
|
||||
|
||||
create a new nbd client
|
||||
|
||||
```js
|
||||
import NbdClient from '@vates/nbd-client'
|
||||
const client = new NbdClient({
|
||||
address: 'MY_NBD_HOST',
|
||||
exportname: 'MY_SECRET_EXPORT',
|
||||
cert: 'Server certificate', // optional, will use encrypted link if provided
|
||||
})
|
||||
|
||||
await client.connect()
|
||||
const block = await client.readBlock(blockIndex, BlockSize)
|
||||
await client.disconnect()
|
||||
```
|
||||
|
||||
## Contributions
|
||||
|
||||
Contributions are _very_ welcomed, either on the documentation or on
|
||||
the code.
|
||||
|
||||
You may:
|
||||
|
||||
- report any [issue](https://github.com/vatesfr/xen-orchestra/issues)
|
||||
you've encountered;
|
||||
- fork and create a pull request.
|
||||
|
||||
## License
|
||||
|
||||
[ISC](https://spdx.org/licenses/ISC) © [Vates SAS](https://vates.fr)
|
42
@vates/nbd-client/constants.js
Normal file
42
@vates/nbd-client/constants.js
Normal file
@ -0,0 +1,42 @@
|
||||
'use strict'
|
||||
exports.INIT_PASSWD = Buffer.from('NBDMAGIC') // "NBDMAGIC" ensure we're connected to a nbd server
|
||||
exports.OPTS_MAGIC = Buffer.from('IHAVEOPT') // "IHAVEOPT" start an option block
|
||||
exports.NBD_OPT_REPLY_MAGIC = 1100100111001001n // magic received during negociation
|
||||
exports.NBD_OPT_EXPORT_NAME = 1
|
||||
exports.NBD_OPT_ABORT = 2
|
||||
exports.NBD_OPT_LIST = 3
|
||||
exports.NBD_OPT_STARTTLS = 5
|
||||
exports.NBD_OPT_INFO = 6
|
||||
exports.NBD_OPT_GO = 7
|
||||
|
||||
exports.NBD_FLAG_HAS_FLAGS = 1 << 0
|
||||
exports.NBD_FLAG_READ_ONLY = 1 << 1
|
||||
exports.NBD_FLAG_SEND_FLUSH = 1 << 2
|
||||
exports.NBD_FLAG_SEND_FUA = 1 << 3
|
||||
exports.NBD_FLAG_ROTATIONAL = 1 << 4
|
||||
exports.NBD_FLAG_SEND_TRIM = 1 << 5
|
||||
|
||||
exports.NBD_FLAG_FIXED_NEWSTYLE = 1 << 0
|
||||
|
||||
exports.NBD_CMD_FLAG_FUA = 1 << 0
|
||||
exports.NBD_CMD_FLAG_NO_HOLE = 1 << 1
|
||||
exports.NBD_CMD_FLAG_DF = 1 << 2
|
||||
exports.NBD_CMD_FLAG_REQ_ONE = 1 << 3
|
||||
exports.NBD_CMD_FLAG_FAST_ZERO = 1 << 4
|
||||
|
||||
exports.NBD_CMD_READ = 0
|
||||
exports.NBD_CMD_WRITE = 1
|
||||
exports.NBD_CMD_DISC = 2
|
||||
exports.NBD_CMD_FLUSH = 3
|
||||
exports.NBD_CMD_TRIM = 4
|
||||
exports.NBD_CMD_CACHE = 5
|
||||
exports.NBD_CMD_WRITE_ZEROES = 6
|
||||
exports.NBD_CMD_BLOCK_STATUS = 7
|
||||
exports.NBD_CMD_RESIZE = 8
|
||||
|
||||
exports.NBD_REQUEST_MAGIC = 0x25609513 // magic number to create a new NBD request to send to the server
|
||||
exports.NBD_REPLY_MAGIC = 0x67446698 // magic number received from the server when reading response to a nbd request
|
||||
exports.NBD_REPLY_ACK = 1
|
||||
|
||||
exports.NBD_DEFAULT_PORT = 10809
|
||||
exports.NBD_DEFAULT_BLOCK_SIZE = 64 * 1024
|
243
@vates/nbd-client/index.js
Normal file
243
@vates/nbd-client/index.js
Normal file
@ -0,0 +1,243 @@
|
||||
'use strict'
|
||||
const assert = require('node:assert')
|
||||
const { Socket } = require('node:net')
|
||||
const { connect } = require('node:tls')
|
||||
const {
|
||||
INIT_PASSWD,
|
||||
NBD_CMD_READ,
|
||||
NBD_DEFAULT_BLOCK_SIZE,
|
||||
NBD_DEFAULT_PORT,
|
||||
NBD_FLAG_FIXED_NEWSTYLE,
|
||||
NBD_FLAG_HAS_FLAGS,
|
||||
NBD_OPT_EXPORT_NAME,
|
||||
NBD_OPT_REPLY_MAGIC,
|
||||
NBD_OPT_STARTTLS,
|
||||
NBD_REPLY_ACK,
|
||||
NBD_REPLY_MAGIC,
|
||||
NBD_REQUEST_MAGIC,
|
||||
OPTS_MAGIC,
|
||||
} = require('./constants.js')
|
||||
const { fromCallback } = require('promise-toolbox')
|
||||
const { readChunkStrict } = require('@vates/read-chunk')
|
||||
|
||||
// documentation is here : https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md
|
||||
|
||||
module.exports = class NbdClient {
|
||||
#serverAddress
|
||||
#serverCert
|
||||
#serverPort
|
||||
#serverSocket
|
||||
|
||||
#exportName
|
||||
#exportSize
|
||||
|
||||
// AFAIK, there is no guaranty the server answers in the same order as the queries
|
||||
// so we handle a backlog of command waiting for response and handle concurrency manually
|
||||
|
||||
#waitingForResponse // there is already a listenner waiting for a response
|
||||
#nextCommandQueryId = BigInt(0)
|
||||
#commandQueryBacklog // map of command waiting for an response queryId => { size/*in byte*/, resolve, reject}
|
||||
|
||||
constructor({ address, port = NBD_DEFAULT_PORT, exportname, cert }) {
|
||||
this.#serverAddress = address
|
||||
this.#serverPort = port
|
||||
this.#exportName = exportname
|
||||
this.#serverCert = cert
|
||||
}
|
||||
|
||||
get exportSize() {
|
||||
return this.#exportSize
|
||||
}
|
||||
|
||||
async #tlsConnect() {
|
||||
return new Promise((resolve, reject) => {
|
||||
this.#serverSocket = connect({
|
||||
socket: this.#serverSocket,
|
||||
rejectUnauthorized: false,
|
||||
cert: this.#serverCert,
|
||||
})
|
||||
this.#serverSocket.once('error', reject)
|
||||
this.#serverSocket.once('secureConnect', () => {
|
||||
this.#serverSocket.removeListener('error', reject)
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// mandatory , at least to start the handshake
|
||||
async #unsecureConnect() {
|
||||
this.#serverSocket = new Socket()
|
||||
return new Promise((resolve, reject) => {
|
||||
this.#serverSocket.connect(this.#serverPort, this.#serverAddress)
|
||||
this.#serverSocket.once('error', reject)
|
||||
this.#serverSocket.once('connect', () => {
|
||||
this.#serverSocket.removeListener('error', reject)
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
async connect() {
|
||||
// first we connect to the serve without tls, and then we upgrade the connection
|
||||
// to tls during the handshake
|
||||
await this.#unsecureConnect()
|
||||
await this.#handshake()
|
||||
|
||||
// reset internal state if we reconnected a nbd client
|
||||
this.#commandQueryBacklog = new Map()
|
||||
this.#waitingForResponse = false
|
||||
}
|
||||
|
||||
async disconnect() {
|
||||
await this.#serverSocket.destroy()
|
||||
}
|
||||
|
||||
// we can use individual read/write from the socket here since there is no concurrency
|
||||
async #sendOption(option, buffer = Buffer.alloc(0)) {
|
||||
await this.#write(OPTS_MAGIC)
|
||||
await this.#writeInt32(option)
|
||||
await this.#writeInt32(buffer.length)
|
||||
await this.#write(buffer)
|
||||
assert.strictEqual(await this.#readInt64(), NBD_OPT_REPLY_MAGIC) // magic number everywhere
|
||||
assert.strictEqual(await this.#readInt32(), option) // the option passed
|
||||
assert.strictEqual(await this.#readInt32(), NBD_REPLY_ACK) // ACK
|
||||
const length = await this.#readInt32()
|
||||
assert.strictEqual(length, 0) // length
|
||||
}
|
||||
|
||||
// we can use individual read/write from the socket here since there is only one handshake at once, no concurrency
|
||||
async #handshake() {
|
||||
assert((await this.#read(8)).equals(INIT_PASSWD))
|
||||
assert((await this.#read(8)).equals(OPTS_MAGIC))
|
||||
const flagsBuffer = await this.#read(2)
|
||||
const flags = flagsBuffer.readInt16BE(0)
|
||||
assert.strictEqual(flags & NBD_FLAG_FIXED_NEWSTYLE, NBD_FLAG_FIXED_NEWSTYLE) // only FIXED_NEWSTYLE one is supported from the server options
|
||||
await this.#writeInt32(NBD_FLAG_FIXED_NEWSTYLE) // client also support NBD_FLAG_C_FIXED_NEWSTYLE
|
||||
|
||||
if (this.#serverCert !== undefined) {
|
||||
// upgrade socket to TLS if needed
|
||||
await this.#sendOption(NBD_OPT_STARTTLS)
|
||||
await this.#tlsConnect()
|
||||
}
|
||||
|
||||
// send export name we want to access.
|
||||
// it's implictly closing the negociation phase.
|
||||
await this.#write(OPTS_MAGIC)
|
||||
await this.#writeInt32(NBD_OPT_EXPORT_NAME)
|
||||
const exportNameBuffer = Buffer.from(this.#exportName)
|
||||
await this.#writeInt32(exportNameBuffer.length)
|
||||
await this.#write(exportNameBuffer)
|
||||
|
||||
// 8 (export size ) + 2 (flags) + 124 zero = 134
|
||||
// must read all to ensure nothing stays in the buffer
|
||||
const answer = await this.#read(134)
|
||||
this.#exportSize = answer.readBigUInt64BE(0)
|
||||
const transmissionFlags = answer.readInt16BE(8)
|
||||
assert.strictEqual(transmissionFlags & NBD_FLAG_HAS_FLAGS, NBD_FLAG_HAS_FLAGS, 'NBD_FLAG_HAS_FLAGS') // must always be 1 by the norm
|
||||
|
||||
// note : xapi server always send NBD_FLAG_READ_ONLY (3) as a flag
|
||||
}
|
||||
|
||||
#read(length) {
|
||||
return readChunkStrict(this.#serverSocket, length)
|
||||
}
|
||||
|
||||
#write(buffer) {
|
||||
return fromCallback.call(this.#serverSocket, 'write', buffer)
|
||||
}
|
||||
|
||||
async #readInt32() {
|
||||
const buffer = await this.#read(4)
|
||||
return buffer.readInt32BE(0)
|
||||
}
|
||||
|
||||
async #readInt64() {
|
||||
const buffer = await this.#read(8)
|
||||
return buffer.readBigUInt64BE(0)
|
||||
}
|
||||
|
||||
#writeInt32(int) {
|
||||
const buffer = Buffer.alloc(4)
|
||||
buffer.writeInt32BE(int)
|
||||
return this.#write(buffer)
|
||||
}
|
||||
|
||||
// when one read fail ,stop everything
|
||||
async #rejectAll(error) {
|
||||
this.#commandQueryBacklog.forEach(({ reject }) => {
|
||||
reject(error)
|
||||
})
|
||||
await this.disconnect()
|
||||
}
|
||||
|
||||
async #readBlockResponse() {
|
||||
// ensure at most one read occur in parallel
|
||||
if (this.#waitingForResponse) {
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
this.#waitingForResponse = true
|
||||
const magic = await this.#readInt32()
|
||||
|
||||
if (magic !== NBD_REPLY_MAGIC) {
|
||||
throw new Error(`magic number for block answer is wrong : ${magic} ${NBD_REPLY_MAGIC}`)
|
||||
}
|
||||
|
||||
const error = await this.#readInt32()
|
||||
if (error !== 0) {
|
||||
// @todo use error code from constants.mjs
|
||||
throw new Error(`GOT ERROR CODE : ${error}`)
|
||||
}
|
||||
|
||||
const blockQueryId = await this.#readInt64()
|
||||
const query = this.#commandQueryBacklog.get(blockQueryId)
|
||||
if (!query) {
|
||||
throw new Error(` no query associated with id ${blockQueryId}`)
|
||||
}
|
||||
this.#commandQueryBacklog.delete(blockQueryId)
|
||||
const data = await this.#read(query.size)
|
||||
query.resolve(data)
|
||||
this.#waitingForResponse = false
|
||||
if (this.#commandQueryBacklog.size > 0) {
|
||||
await this.#readBlockResponse()
|
||||
}
|
||||
} catch (error) {
|
||||
// reject all the promises
|
||||
// we don't need to call readBlockResponse on failure
|
||||
// since we will empty the backlog
|
||||
await this.#rejectAll(error)
|
||||
}
|
||||
}
|
||||
|
||||
async readBlock(index, size = NBD_DEFAULT_BLOCK_SIZE) {
|
||||
const queryId = this.#nextCommandQueryId
|
||||
this.#nextCommandQueryId++
|
||||
|
||||
// create and send command at once to ensure there is no concurrency issue
|
||||
const buffer = Buffer.alloc(28)
|
||||
buffer.writeInt32BE(NBD_REQUEST_MAGIC, 0) // it is a nbd request
|
||||
buffer.writeInt16BE(0, 4) // no command flags for a simple block read
|
||||
buffer.writeInt16BE(NBD_CMD_READ, 6) // we want to read a data block
|
||||
buffer.writeBigUInt64BE(queryId, 8)
|
||||
// byte offset in the raw disk
|
||||
buffer.writeBigUInt64BE(BigInt(index) * BigInt(size), 16)
|
||||
buffer.writeInt32BE(size, 24)
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
// this will handle one block response, but it can be another block
|
||||
// since server does not guaranty to handle query in order
|
||||
this.#commandQueryBacklog.set(queryId, {
|
||||
size,
|
||||
resolve,
|
||||
reject,
|
||||
})
|
||||
// really send the command to the server
|
||||
this.#write(buffer).catch(reject)
|
||||
|
||||
// #readBlockResponse never throws directly
|
||||
// but if it fails it will reject all the promises in the backlog
|
||||
this.#readBlockResponse()
|
||||
})
|
||||
}
|
||||
}
|
76
@vates/nbd-client/nbdclient.spec.js
Normal file
76
@vates/nbd-client/nbdclient.spec.js
Normal file
@ -0,0 +1,76 @@
|
||||
'use strict'
|
||||
const NbdClient = require('./index.js')
|
||||
const { spawn } = require('node:child_process')
|
||||
const fs = require('node:fs/promises')
|
||||
const { test } = require('tap')
|
||||
const tmp = require('tmp')
|
||||
const { pFromCallback } = require('promise-toolbox')
|
||||
const { asyncEach } = require('@vates/async-each')
|
||||
|
||||
const FILE_SIZE = 2 * 1024 * 1024
|
||||
|
||||
async function createTempFile(size) {
|
||||
const tmpPath = await pFromCallback(cb => tmp.file(cb))
|
||||
const data = Buffer.alloc(size, 0)
|
||||
for (let i = 0; i < size; i += 4) {
|
||||
data.writeUInt32BE(i, i)
|
||||
}
|
||||
await fs.writeFile(tmpPath, data)
|
||||
|
||||
return tmpPath
|
||||
}
|
||||
|
||||
test('it works with unsecured network', async tap => {
|
||||
const path = await createTempFile(FILE_SIZE)
|
||||
|
||||
const nbdServer = spawn(
|
||||
'nbdkit',
|
||||
[
|
||||
'file',
|
||||
path,
|
||||
'--newstyle', //
|
||||
'--exit-with-parent',
|
||||
'--read-only',
|
||||
'--export-name=MY_SECRET_EXPORT',
|
||||
],
|
||||
{
|
||||
stdio: ['inherit', 'inherit', 'inherit'],
|
||||
}
|
||||
)
|
||||
|
||||
const client = new NbdClient({
|
||||
address: 'localhost',
|
||||
exportname: 'MY_SECRET_EXPORT',
|
||||
secure: false,
|
||||
})
|
||||
|
||||
await client.connect()
|
||||
tap.equal(client.exportSize, BigInt(FILE_SIZE))
|
||||
const CHUNK_SIZE = 128 * 1024 // non default size
|
||||
const indexes = []
|
||||
for (let i = 0; i < FILE_SIZE / CHUNK_SIZE; i++) {
|
||||
indexes.push(i)
|
||||
}
|
||||
// read mutiple blocks in parallel
|
||||
await asyncEach(
|
||||
indexes,
|
||||
async i => {
|
||||
const block = await client.readBlock(i, CHUNK_SIZE)
|
||||
let blockOk = true
|
||||
let firstFail
|
||||
for (let j = 0; j < CHUNK_SIZE; j += 4) {
|
||||
const wanted = i * CHUNK_SIZE + j
|
||||
const found = block.readUInt32BE(j)
|
||||
blockOk = blockOk && found === wanted
|
||||
if (!blockOk && firstFail === undefined) {
|
||||
firstFail = j
|
||||
}
|
||||
}
|
||||
tap.ok(blockOk, `check block ${i} content`)
|
||||
},
|
||||
{ concurrency: 8 }
|
||||
)
|
||||
await client.disconnect()
|
||||
nbdServer.kill()
|
||||
await fs.unlink(path)
|
||||
})
|
35
@vates/nbd-client/package.json
Normal file
35
@vates/nbd-client/package.json
Normal file
@ -0,0 +1,35 @@
|
||||
{
|
||||
"private": false,
|
||||
"name": "@vates/nbd-client",
|
||||
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@vates/nbd-client",
|
||||
"bugs": "https://github.com/vatesfr/xen-orchestra/issues",
|
||||
"repository": {
|
||||
"directory": "@vates/nbd-client",
|
||||
"type": "git",
|
||||
"url": "https://github.com/vatesfr/xen-orchestra.git"
|
||||
},
|
||||
"author": {
|
||||
"name": "Vates SAS",
|
||||
"url": "https://vates.fr"
|
||||
},
|
||||
"license": "ISC",
|
||||
"version": "0.0.1",
|
||||
"engines": {
|
||||
"node": ">=14.0"
|
||||
},
|
||||
"dependencies": {
|
||||
"@vates/async-each": "^1.0.0",
|
||||
"@vates/read-chunk": "^1.0.0",
|
||||
"@xen-orchestra/async-map": "^0.1.2",
|
||||
"promise-toolbox": "^0.21.0",
|
||||
"xen-api": "^1.2.2"
|
||||
},
|
||||
"devDependencies": {
|
||||
"tap": "^16.3.0",
|
||||
"tmp": "^0.2.1"
|
||||
},
|
||||
"scripts": {
|
||||
"postversion": "npm publish --access public",
|
||||
"test-integration": "tap *.spec.js"
|
||||
}
|
||||
}
|
@ -27,6 +27,7 @@
|
||||
|
||||
<!--packages-start-->
|
||||
|
||||
- @vates/nbd-client major
|
||||
- @vates/read-chunk patch
|
||||
- @xen-orchestra/log minor
|
||||
- xo-remote-parser patch
|
||||
|
@ -4,7 +4,7 @@ FROM ubuntu:xenial
|
||||
# https://qastack.fr/programming/25899912/how-to-install-nvm-in-docker
|
||||
|
||||
RUN apt-get update
|
||||
RUN apt-get install -y curl qemu-utils blktap-utils vmdk-stream-converter git libxml2-utils libfuse2
|
||||
RUN apt-get install -y curl qemu-utils blktap-utils vmdk-stream-converter git libxml2-utils libfuse2 nbdkit
|
||||
ENV NVM_DIR /usr/local/nvm
|
||||
RUN mkdir -p /usr/local/nvm
|
||||
RUN cd /usr/local/nvm
|
||||
|
@ -61,6 +61,7 @@
|
||||
"testPathIgnorePatterns": [
|
||||
"/@vates/decorate-with/",
|
||||
"/@vates/event-listeners-manager/",
|
||||
"/@vates/nbd-client/",
|
||||
"/@vates/predicates/",
|
||||
"/@xen-orchestra/audit-core/",
|
||||
"/dist/",
|
||||
|
Loading…
Reference in New Issue
Block a user