Compare commits

...

2 Commits

Author SHA1 Message Date
Florent BEAUCHAMP
3e1227c710 feat(nbd): implement server export from/to stream 2023-08-08 14:34:03 +02:00
Florent BEAUCHAMP
ddc73fb836 feat(nbd): server implementation working, no encryption 2023-08-08 11:21:39 +02:00
6 changed files with 383 additions and 11 deletions

View File

@@ -0,0 +1,32 @@
import NbdClient from "./client.mjs";
async function bench(){
const client = new NbdClient({
address:'localhost',
port: 9000,
exportname: 'bench_export'
})
await client.connect()
console.log('connected', client.exportSize)
for(let chunk_size=16*1024; chunk_size < 16*1024*1024; chunk_size *=2){
let i=0
const start = + new Date()
for await(const block of client.readBlocks(chunk_size) ){
i++
if((i*chunk_size) % (16*1024*1024) ===0){
process.stdout.write('.')
}
if(i*chunk_size > 1024*1024*1024) break
}
console.log(chunk_size,Math.round( (i*chunk_size/1024/1024*1000)/ (new Date() - start)))
}
await client.disconnect()
}
bench()

View File

@@ -74,7 +74,7 @@ export default class NbdClient {
this.#serverSocket = connect({ this.#serverSocket = connect({
socket: this.#serverSocket, socket: this.#serverSocket,
rejectUnauthorized: false, rejectUnauthorized: false,
cert: this.#serverCert, cert: this.#serverCert
}) })
this.#serverSocket.once('error', reject) this.#serverSocket.once('error', reject)
this.#serverSocket.once('secureConnect', () => { this.#serverSocket.once('secureConnect', () => {
@@ -88,7 +88,11 @@ export default class NbdClient {
async #unsecureConnect() { async #unsecureConnect() {
this.#serverSocket = new Socket() this.#serverSocket = new Socket()
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
this.#serverSocket.connect(this.#serverPort, this.#serverAddress) this.#serverSocket.connect({
port:this.#serverPort,
host: this.#serverAddress,
// @todo should test the onRead to limit buffer copy
})
this.#serverSocket.once('error', reject) this.#serverSocket.once('error', reject)
this.#serverSocket.once('connect', () => { this.#serverSocket.once('connect', () => {
this.#serverSocket.removeListener('error', reject) this.#serverSocket.removeListener('error', reject)
@@ -232,19 +236,20 @@ export default class NbdClient {
} }
try { try {
this.#waitingForResponse = true this.#waitingForResponse = true
const magic = await this.#readInt32() const buffer = await this.#read(4+4+8)
const magic = buffer.readUInt32BE()
if (magic !== NBD_REPLY_MAGIC) { if (magic !== NBD_REPLY_MAGIC) {
throw new Error(`magic number for block answer is wrong : ${magic} ${NBD_REPLY_MAGIC}`) throw new Error(`magic number for block answer is wrong : ${magic} ${NBD_REPLY_MAGIC}`)
} }
const error = await this.#readInt32() const error = buffer.readUInt32BE(4)
if (error !== 0) { if (error !== 0) {
// @todo use error code from constants.mjs // @todo use error code from constants.mjs
throw new Error(`GOT ERROR CODE : ${error}`) throw new Error(`GOT ERROR CODE : ${error}`)
} }
const blockQueryId = await this.#readInt64() const blockQueryId = buffer.readBigUInt64BE(8)
const query = this.#commandQueryBacklog.get(blockQueryId) const query = this.#commandQueryBacklog.get(blockQueryId)
if (!query) { if (!query) {
throw new Error(` no query associated with id ${blockQueryId}`) throw new Error(` no query associated with id ${blockQueryId}`)
@@ -307,11 +312,11 @@ export default class NbdClient {
}) })
} }
async *readBlocks(indexGenerator) { async *readBlocks(indexGenerator = 2*1024*1024) {
// default : read all blocks // default : read all blocks
if (indexGenerator === undefined) { if (typeof indexGenerator === 'number') {
const exportSize = this.#exportSize const exportSize = this.#exportSize
const chunkSize = 2 * 1024 * 1024 const chunkSize = indexGenerator
indexGenerator = function* () { indexGenerator = function* () {
const nbBlocks = Math.ceil(Number(exportSize / BigInt(chunkSize))) const nbBlocks = Math.ceil(Number(exportSize / BigInt(chunkSize)))
for (let index = 0; BigInt(index) < nbBlocks; index++) { for (let index = 0; BigInt(index) < nbBlocks; index++) {
@@ -319,12 +324,14 @@ export default class NbdClient {
} }
} }
} }
const readAhead = [] const readAhead = []
const readAheadMaxLength = this.#readAhead const readAheadMaxLength = this.#readAhead
const makeReadBlockPromise = (index, size) => { const makeReadBlockPromise = (index, size) => {
const promise = pRetry(() => this.readBlock(index, size), { const promise = pRetry(() => this.readBlock(index, size), {
tries: this.#readBlockRetries, tries: this.#readBlockRetries,
onRetry: async err => { onRetry: async err => {
console.error(err)
warn('will retry reading block ', index, err) warn('will retry reading block ', index, err)
await this.reconnect() await this.reconnect()
}, },
@@ -336,6 +343,7 @@ export default class NbdClient {
// read all blocks, but try to keep readAheadMaxLength promise waiting ahead // read all blocks, but try to keep readAheadMaxLength promise waiting ahead
for (const { index, size } of indexGenerator()) { for (const { index, size } of indexGenerator()) {
// stack readAheadMaxLength promises before starting to handle the results // stack readAheadMaxLength promises before starting to handle the results
if (readAhead.length === readAheadMaxLength) { if (readAhead.length === readAheadMaxLength) {
// any error will stop reading blocks // any error will stop reading blocks
@@ -348,4 +356,4 @@ export default class NbdClient {
yield readAhead.shift() yield readAhead.shift()
} }
} }
} }

View File

@@ -1,12 +1,40 @@
// https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md
export const INIT_PASSWD = Buffer.from('NBDMAGIC') // "NBDMAGIC" ensure we're connected to a nbd server export const INIT_PASSWD = Buffer.from('NBDMAGIC') // "NBDMAGIC" ensure we're connected to a nbd server
export const OPTS_MAGIC = Buffer.from('IHAVEOPT') // "IHAVEOPT" start an option block export const OPTS_MAGIC = Buffer.from('IHAVEOPT') // "IHAVEOPT" start an option block
export const NBD_OPT_REPLY_MAGIC = 1100100111001001n // magic received during negociation export const NBD_OPT_REPLY_MAGIC = 1100100111001001n // magic received during negociation
export const NBD_OPT_EXPORT_NAME = 1 export const NBD_OPT_EXPORT_NAME = 1
export const NBD_OPT_ABORT = 2 export const NBD_OPT_ABORT = 2
export const NBD_OPT_LIST = 3 export const NBD_OPT_LIST = 3
export const NBD_OPT_STARTTLS = 5 export const NBD_OPT_STARTTLS = 5
export const NBD_OPT_INFO = 6 export const NBD_OPT_INFO = 6
export const NBD_OPT_GO = 7 export const NBD_OPT_GO = 7
export const NBD_OPT_STRUCTURED_REPLY = 8
export const NBD_OPT_LIST_META_CONTEXT = 9
export const NBD_OPT_SET_META_CONTEXT = 10
export const NBD_OPT_EXTENDED_HEADERS = 11
export const NBD_REP_ACK =1
export const NBD_REP_SERVER = 2
export const NBD_REP_INFO = 3
export const NBD_REP_META_CONTEXT = 4
export const NBD_REP_ERR_UNSUP = 0x80000001 // 2^32+1
export const NBD_REP_ERR_POLICY = 0x80000002
export const NBD_REP_ERR_INVALID = 0x80000003
export const NBD_REP_ERR_PLATFORM = 0x80000004
export const NBD_REP_ERR_TLS_REQD = 0x80000005
export const NBD_REP_ERR_UNKNOWN = 0x80000006
export const NBD_REP_ERR_SHUTDOWN = 0x80000007
export const NBD_REP_ERR_BLOCK_SIZE_REQD = 0x80000008
export const NBD_REP_ERR_TOO_BIG = 0x80000009
export const NBD_REP_ERR_EXT_HEADER_REQD = 0x8000000a
export const NBD_INFO_EXPORT = 0
export const NBD_INFO_NAME = 1
export const NBD_INFO_DESCRIPTION = 2
export const NBD_INFO_BLOCK_SIZE = 3
export const NBD_FLAG_HAS_FLAGS = 1 << 0 export const NBD_FLAG_HAS_FLAGS = 1 << 0
export const NBD_FLAG_READ_ONLY = 1 << 1 export const NBD_FLAG_READ_ONLY = 1 << 1
@@ -14,6 +42,9 @@ export const NBD_FLAG_SEND_FLUSH = 1 << 2
export const NBD_FLAG_SEND_FUA = 1 << 3 export const NBD_FLAG_SEND_FUA = 1 << 3
export const NBD_FLAG_ROTATIONAL = 1 << 4 export const NBD_FLAG_ROTATIONAL = 1 << 4
export const NBD_FLAG_SEND_TRIM = 1 << 5 export const NBD_FLAG_SEND_TRIM = 1 << 5
export const NBD_FLAG_SEND_WRITE_ZEROES = 1 << 6
export const NBD_FLAG_SEND_DF = 1 << 7
export const NBD_FLAG_CAN_MULTI_CONN = 1 << 8
export const NBD_FLAG_FIXED_NEWSTYLE = 1 << 0 export const NBD_FLAG_FIXED_NEWSTYLE = 1 << 0
@@ -36,6 +67,15 @@ export const NBD_CMD_RESIZE = 8
export const NBD_REQUEST_MAGIC = 0x25609513 // magic number to create a new NBD request to send to the server export const NBD_REQUEST_MAGIC = 0x25609513 // magic number to create a new NBD request to send to the server
export const NBD_REPLY_MAGIC = 0x67446698 // magic number received from the server when reading response to a nbd request export const NBD_REPLY_MAGIC = 0x67446698 // magic number received from the server when reading response to a nbd request
export const NBD_REPLY_ACK = 1 export const NBD_REPLY_ACK = 1
export const NBD_SIMPLE_REPLY_MAGIC = 0x67446698
export const NBD_STRUCTURED_REPLY_MAGIC = 0x668e33ef
export const NBD_REPLY_TYPE_NONE = 0
export const NBD_REPLY_TYPE_OFFSET_DATA = 1
export const NBD_REPLY_TYPE_OFFSET_HOLE = 2
export const NBD_REPLY_TYPE_BLOCK_STATUS = 5
export const NBD_REPLY_TYPE_ERROR = 1 << 15 +1
export const NBD_REPLY_TYPE_ERROR_OFFSET = 1 << 15 +2
export const NBD_DEFAULT_PORT = 10809 export const NBD_DEFAULT_PORT = 10809
export const NBD_DEFAULT_BLOCK_SIZE = 64 * 1024 export const NBD_DEFAULT_BLOCK_SIZE = 64 * 1024

View File

@@ -0,0 +1,292 @@
import assert, { deepEqual, strictEqual, notStrictEqual } from 'node:assert'
import { createServer } from 'node:net'
import { fromCallback } from 'promise-toolbox'
import { readChunkStrict } from '@vates/read-chunk'
import {
INIT_PASSWD,
NBD_CMD_READ,
NBD_DEFAULT_PORT,
NBD_FLAG_FIXED_NEWSTYLE,
NBD_FLAG_HAS_FLAGS,
NBD_OPT_EXPORT_NAME,
NBD_OPT_REPLY_MAGIC,
NBD_REPLY_ACK,
NBD_REQUEST_MAGIC,
OPTS_MAGIC,
NBD_CMD_DISC,
NBD_REP_ERR_UNSUP,
NBD_CMD_WRITE,
NBD_OPT_GO,
NBD_OPT_INFO,
NBD_INFO_EXPORT,
NBD_REP_INFO,
NBD_SIMPLE_REPLY_MAGIC,
NBD_REP_ERR_UNKNOWN,
} from './constants.mjs'
import { PassThrough } from 'node:stream'
export default class NbdServer {
#server
#clients = new Map()
constructor(port = NBD_DEFAULT_PORT) {
this.#server = createServer()
this.#server.listen(port)
this.#server.on('connection', client => this.#handleNewConnection(client))
}
// will wait for a client to connect and upload the file to this server
downloadStream(key) {
strictEqual(this.#clients.has(key), false)
const stream = new PassThrough()
const offset = BigInt(0)
this.#clients.set(key, { length: BigInt(2 * 1024 * 1024 * 1024 * 1024), stream, offset, key })
return stream
}
// will wait for a client to connect and downlaod this stream
uploadStream(key, source, length) {
strictEqual(this.#clients.has(key), false)
notStrictEqual(length, undefined)
const offset = BigInt(0)
this.#clients.set(key, { length: BigInt(length), stream: source, offset, key })
}
#read(socket, length) {
return readChunkStrict(socket, length)
}
async #readInt32(socket) {
const buffer = await this.#read(socket, 4)
return buffer.readUInt32BE()
}
#write(socket, buffer) {
return fromCallback.call(socket, 'write', buffer)
}
async #writeInt16(socket, int16) {
const buffer = Buffer.alloc(2)
buffer.writeUInt16BE(int16)
return this.#write(socket, buffer)
}
async #writeInt32(socket, int32) {
const buffer = Buffer.alloc(4)
buffer.writeUInt32BE(int32)
return this.#write(socket, buffer)
}
async #writeInt64(socket, int64) {
const buffer = Buffer.alloc(8)
buffer.writeBigUInt64BE(int64)
return this.#write(socket, buffer)
}
async #openExport(key) {
if (!this.#clients.has(key)) {
// export does not exists
const err = new Error('Export not found ')
err.code = 'ENOTFOUND'
throw err
}
const { length } = this.#clients.get(key)
return length
}
async #sendOptionResponse(socket, option, response, data = Buffer.alloc(0)) {
await this.#writeInt64(socket, NBD_OPT_REPLY_MAGIC)
await this.#writeInt32(socket, option)
await this.#writeInt32(socket, response)
await this.#writeInt32(socket, data.length)
await this.#write(socket, data)
}
/**
*
* @param {Socket} socket
* @returns true if server is waiting for more options
*/
async #readOption(socket) {
console.log('wait for option')
const magic = await this.#read(socket, 8)
console.log(magic.toString('ascii'), magic.length, OPTS_MAGIC.toString('ascii'))
deepEqual(magic, OPTS_MAGIC)
const option = await this.#readInt32(socket)
const length = await this.#readInt32(socket)
console.log({ option, length })
const data = length > 0 ? await this.#read(socket, length) : undefined
switch (option) {
case NBD_OPT_EXPORT_NAME: {
const exportNameLength = data.readInt32BE()
const key = data.slice(4, exportNameLength + 4).toString()
let exportSize
try {
exportSize = await this.#openExport(key)
} catch (err) {
if (err.code === 'ENOTFOUND') {
this.#sendOptionResponse(socket, option, NBD_REP_ERR_UNKNOWN)
return false
}
throw err
}
socket.key = key
await this.#writeInt64(socket, exportSize)
await this.#writeInt16(socket, NBD_FLAG_HAS_FLAGS /* transmission flag */)
await this.#write(socket, Buffer.alloc(124) /* padding */)
return false
}
/*
case NBD_OPT_STARTTLS:
console.log('starttls')
// @todo not working
return true
*/
case NBD_OPT_GO:
case NBD_OPT_INFO: {
const exportNameLength = data.readInt32BE()
const key = data.slice(4, exportNameLength + 4).toString()
let exportSize
try {
exportSize = await this.#openExport(key)
} catch (err) {
if (err.code === 'ENOTFOUND') {
this.#sendOptionResponse(socket, option, NBD_REP_ERR_UNKNOWN)
// @todo should disconnect
return false
}
throw err
}
socket.key = key
await this.#writeInt64(socket, NBD_OPT_REPLY_MAGIC)
await this.#writeInt32(socket, option)
await this.#writeInt32(socket, NBD_REP_INFO)
await this.#writeInt32(socket, 12)
// the export info
await this.#writeInt16(socket, NBD_INFO_EXPORT)
await this.#writeInt64(socket, exportSize)
await this.#writeInt16(socket, NBD_FLAG_HAS_FLAGS /* transmission flag */)
// an ACK at the end of the infos
await this.#sendOptionResponse(socket, option, NBD_REPLY_ACK) // no additionnal data
return option === NBD_OPT_INFO // we stays in option phase is option is INFO
}
default:
// not supported
console.log('not supported', option, length, data?.toString())
await this.#sendOptionResponse(socket, option, NBD_REP_ERR_UNSUP) // no additionnal data
// wait for next option
return true
}
}
async #readCommand(socket) {
const key = socket.key
// this socket has an export key
notStrictEqual(key, undefined)
// this export key is still valid
strictEqual(this.#clients.has(key), true)
const client = this.#clients.get(key)
const buffer = await this.#read(socket, 28)
const magic = buffer.readInt32BE(0)
strictEqual(magic, NBD_REQUEST_MAGIC)
/* const commandFlags = */ buffer.readInt16BE(4)
const command = buffer.readInt16BE(6)
const cookie = buffer.readBigUInt64BE(8)
const offset = buffer.readBigUInt64BE(16)
const length = buffer.readInt32BE(24)
switch (command) {
case NBD_CMD_DISC:
console.log('gotdisconnect', client.offset)
await client.stream?.destroy()
// @todo : disconnect
return false
case NBD_CMD_READ: {
/** simple replies */
// read length byte from offset in export
// the client is writing in contiguous mode
assert.strictEqual(offset, client.offset)
client.offset += BigInt(length)
const data = await readChunkStrict(client.stream, length)
const reply = Buffer.alloc(16)
reply.writeInt32BE(NBD_SIMPLE_REPLY_MAGIC)
reply.writeInt32BE(0, 4) // no error
reply.writeBigInt64BE(cookie, 8)
await this.#write(socket, reply)
await this.#write(socket, data)
/* if we implement non stream read, we can handle read in parallel
const reply = Buffer.alloc(16+length)
reply.writeInt32BE(NBD_SIMPLE_REPLY_MAGIC)
reply.writeInt32BE(0,4)// no error
reply.writeBigInt64BE(cookie,8)
// read length byte from offset in export directly in the given buffer
// may do multiple read in parallel on the same export
size += length
socket.fd.read(reply, 16, length, Number(offset))
.then(()=>{
return this.#write(socket, reply)
})
.catch(err => console.error('NBD_CMD_READ',err)) */
return true
}
case NBD_CMD_WRITE: {
// the client is writing in contiguous mode
assert.strictEqual(offset, client.offset)
const data = await this.#read(socket, length)
client.offset += BigInt(length)
await new Promise((resolve, reject) => {
if (!client.stream.write(data, 0, length, Number(offset))) {
client.stream.once('drain', err => (err ? reject(err) : resolve()))
} else {
process.nextTick(resolve)
}
})
const reply = Buffer.alloc(16)
reply.writeInt32BE(NBD_SIMPLE_REPLY_MAGIC)
reply.writeInt32BE(0, 4) // no error
reply.writeBigInt64BE(cookie, 8)
await this.#write(socket, reply)
return true
}
default:
console.log('GOT unsupported command ', command)
// fail to handle
return true
}
}
async #handleNewConnection(socket) {
const remoteAddress = socket.remoteAddress + ':' + socket.remotePort
console.log('new client connection from %s', remoteAddress)
socket.on('close', () => {
console.log('client ', remoteAddress, 'is done')
})
socket.on('error', error => {
throw error
})
// handshake
await this.#write(socket, INIT_PASSWD)
await this.#write(socket, OPTS_MAGIC)
// send flags , the bare minimum
await this.#writeInt16(socket, NBD_FLAG_FIXED_NEWSTYLE)
const clientFlag = await this.#readInt32(socket)
assert.strictEqual(clientFlag & NBD_FLAG_FIXED_NEWSTYLE, NBD_FLAG_FIXED_NEWSTYLE) // only FIXED_NEWSTYLE one is supported from the server options
// read client response flags
let waitingForOptions = true
while (waitingForOptions) {
waitingForOptions = await this.#readOption(socket)
}
let waitingForCommand = true
while (waitingForCommand) {
waitingForCommand = await this.#readCommand(socket)
}
}
#handleClientData(client, data) {}
}

View File

@@ -1,4 +1,4 @@
import NbdClient from '../index.mjs' import NbdClient from '../client.mjs'
import { spawn, exec } from 'node:child_process' import { spawn, exec } from 'node:child_process'
import fs from 'node:fs/promises' import fs from 'node:fs/promises'
import { test } from 'tap' import { test } from 'tap'

View File

@@ -6,7 +6,7 @@ import { decorateClass } from '@vates/decorate-with'
import { strict as assert } from 'node:assert' import { strict as assert } from 'node:assert'
import extractOpaqueRef from './_extractOpaqueRef.mjs' import extractOpaqueRef from './_extractOpaqueRef.mjs'
import NbdClient from '@vates/nbd-client' import NbdClient from '@vates/nbd-client/client.mjs'
import { createNbdRawStream, createNbdVhdStream } from 'vhd-lib/createStreamNbd.js' import { createNbdRawStream, createNbdVhdStream } from 'vhd-lib/createStreamNbd.js'
import { VDI_FORMAT_RAW, VDI_FORMAT_VHD } from './index.mjs' import { VDI_FORMAT_RAW, VDI_FORMAT_VHD } from './index.mjs'