Compare commits
2 Commits
improveFor
...
feat_nbd_s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3e1227c710 | ||
|
|
ddc73fb836 |
32
@vates/nbd-client/bench.mjs
Normal file
32
@vates/nbd-client/bench.mjs
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
import NbdClient from "./client.mjs";
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async function bench(){
|
||||||
|
const client = new NbdClient({
|
||||||
|
address:'localhost',
|
||||||
|
port: 9000,
|
||||||
|
exportname: 'bench_export'
|
||||||
|
})
|
||||||
|
await client.connect()
|
||||||
|
console.log('connected', client.exportSize)
|
||||||
|
|
||||||
|
for(let chunk_size=16*1024; chunk_size < 16*1024*1024; chunk_size *=2){
|
||||||
|
|
||||||
|
|
||||||
|
let i=0
|
||||||
|
const start = + new Date()
|
||||||
|
for await(const block of client.readBlocks(chunk_size) ){
|
||||||
|
i++
|
||||||
|
if((i*chunk_size) % (16*1024*1024) ===0){
|
||||||
|
process.stdout.write('.')
|
||||||
|
}
|
||||||
|
if(i*chunk_size > 1024*1024*1024) break
|
||||||
|
}
|
||||||
|
console.log(chunk_size,Math.round( (i*chunk_size/1024/1024*1000)/ (new Date() - start)))
|
||||||
|
|
||||||
|
}
|
||||||
|
await client.disconnect()
|
||||||
|
}
|
||||||
|
|
||||||
|
bench()
|
||||||
@@ -74,7 +74,7 @@ export default class NbdClient {
|
|||||||
this.#serverSocket = connect({
|
this.#serverSocket = connect({
|
||||||
socket: this.#serverSocket,
|
socket: this.#serverSocket,
|
||||||
rejectUnauthorized: false,
|
rejectUnauthorized: false,
|
||||||
cert: this.#serverCert,
|
cert: this.#serverCert
|
||||||
})
|
})
|
||||||
this.#serverSocket.once('error', reject)
|
this.#serverSocket.once('error', reject)
|
||||||
this.#serverSocket.once('secureConnect', () => {
|
this.#serverSocket.once('secureConnect', () => {
|
||||||
@@ -88,7 +88,11 @@ export default class NbdClient {
|
|||||||
async #unsecureConnect() {
|
async #unsecureConnect() {
|
||||||
this.#serverSocket = new Socket()
|
this.#serverSocket = new Socket()
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
this.#serverSocket.connect(this.#serverPort, this.#serverAddress)
|
this.#serverSocket.connect({
|
||||||
|
port:this.#serverPort,
|
||||||
|
host: this.#serverAddress,
|
||||||
|
// @todo should test the onRead to limit buffer copy
|
||||||
|
})
|
||||||
this.#serverSocket.once('error', reject)
|
this.#serverSocket.once('error', reject)
|
||||||
this.#serverSocket.once('connect', () => {
|
this.#serverSocket.once('connect', () => {
|
||||||
this.#serverSocket.removeListener('error', reject)
|
this.#serverSocket.removeListener('error', reject)
|
||||||
@@ -232,19 +236,20 @@ export default class NbdClient {
|
|||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
this.#waitingForResponse = true
|
this.#waitingForResponse = true
|
||||||
const magic = await this.#readInt32()
|
const buffer = await this.#read(4+4+8)
|
||||||
|
const magic = buffer.readUInt32BE()
|
||||||
|
|
||||||
if (magic !== NBD_REPLY_MAGIC) {
|
if (magic !== NBD_REPLY_MAGIC) {
|
||||||
throw new Error(`magic number for block answer is wrong : ${magic} ${NBD_REPLY_MAGIC}`)
|
throw new Error(`magic number for block answer is wrong : ${magic} ${NBD_REPLY_MAGIC}`)
|
||||||
}
|
}
|
||||||
|
|
||||||
const error = await this.#readInt32()
|
const error = buffer.readUInt32BE(4)
|
||||||
if (error !== 0) {
|
if (error !== 0) {
|
||||||
// @todo use error code from constants.mjs
|
// @todo use error code from constants.mjs
|
||||||
throw new Error(`GOT ERROR CODE : ${error}`)
|
throw new Error(`GOT ERROR CODE : ${error}`)
|
||||||
}
|
}
|
||||||
|
|
||||||
const blockQueryId = await this.#readInt64()
|
const blockQueryId = buffer.readBigUInt64BE(8)
|
||||||
const query = this.#commandQueryBacklog.get(blockQueryId)
|
const query = this.#commandQueryBacklog.get(blockQueryId)
|
||||||
if (!query) {
|
if (!query) {
|
||||||
throw new Error(` no query associated with id ${blockQueryId}`)
|
throw new Error(` no query associated with id ${blockQueryId}`)
|
||||||
@@ -307,11 +312,11 @@ export default class NbdClient {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async *readBlocks(indexGenerator) {
|
async *readBlocks(indexGenerator = 2*1024*1024) {
|
||||||
// default : read all blocks
|
// default : read all blocks
|
||||||
if (indexGenerator === undefined) {
|
if (typeof indexGenerator === 'number') {
|
||||||
const exportSize = this.#exportSize
|
const exportSize = this.#exportSize
|
||||||
const chunkSize = 2 * 1024 * 1024
|
const chunkSize = indexGenerator
|
||||||
indexGenerator = function* () {
|
indexGenerator = function* () {
|
||||||
const nbBlocks = Math.ceil(Number(exportSize / BigInt(chunkSize)))
|
const nbBlocks = Math.ceil(Number(exportSize / BigInt(chunkSize)))
|
||||||
for (let index = 0; BigInt(index) < nbBlocks; index++) {
|
for (let index = 0; BigInt(index) < nbBlocks; index++) {
|
||||||
@@ -319,12 +324,14 @@ export default class NbdClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const readAhead = []
|
const readAhead = []
|
||||||
const readAheadMaxLength = this.#readAhead
|
const readAheadMaxLength = this.#readAhead
|
||||||
const makeReadBlockPromise = (index, size) => {
|
const makeReadBlockPromise = (index, size) => {
|
||||||
const promise = pRetry(() => this.readBlock(index, size), {
|
const promise = pRetry(() => this.readBlock(index, size), {
|
||||||
tries: this.#readBlockRetries,
|
tries: this.#readBlockRetries,
|
||||||
onRetry: async err => {
|
onRetry: async err => {
|
||||||
|
console.error(err)
|
||||||
warn('will retry reading block ', index, err)
|
warn('will retry reading block ', index, err)
|
||||||
await this.reconnect()
|
await this.reconnect()
|
||||||
},
|
},
|
||||||
@@ -336,6 +343,7 @@ export default class NbdClient {
|
|||||||
|
|
||||||
// read all blocks, but try to keep readAheadMaxLength promise waiting ahead
|
// read all blocks, but try to keep readAheadMaxLength promise waiting ahead
|
||||||
for (const { index, size } of indexGenerator()) {
|
for (const { index, size } of indexGenerator()) {
|
||||||
|
|
||||||
// stack readAheadMaxLength promises before starting to handle the results
|
// stack readAheadMaxLength promises before starting to handle the results
|
||||||
if (readAhead.length === readAheadMaxLength) {
|
if (readAhead.length === readAheadMaxLength) {
|
||||||
// any error will stop reading blocks
|
// any error will stop reading blocks
|
||||||
@@ -348,4 +356,4 @@ export default class NbdClient {
|
|||||||
yield readAhead.shift()
|
yield readAhead.shift()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1,12 +1,40 @@
|
|||||||
|
// https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md
|
||||||
|
|
||||||
export const INIT_PASSWD = Buffer.from('NBDMAGIC') // "NBDMAGIC" ensure we're connected to a nbd server
|
export const INIT_PASSWD = Buffer.from('NBDMAGIC') // "NBDMAGIC" ensure we're connected to a nbd server
|
||||||
export const OPTS_MAGIC = Buffer.from('IHAVEOPT') // "IHAVEOPT" start an option block
|
export const OPTS_MAGIC = Buffer.from('IHAVEOPT') // "IHAVEOPT" start an option block
|
||||||
export const NBD_OPT_REPLY_MAGIC = 1100100111001001n // magic received during negociation
|
export const NBD_OPT_REPLY_MAGIC = 1100100111001001n // magic received during negociation
|
||||||
|
|
||||||
export const NBD_OPT_EXPORT_NAME = 1
|
export const NBD_OPT_EXPORT_NAME = 1
|
||||||
export const NBD_OPT_ABORT = 2
|
export const NBD_OPT_ABORT = 2
|
||||||
export const NBD_OPT_LIST = 3
|
export const NBD_OPT_LIST = 3
|
||||||
export const NBD_OPT_STARTTLS = 5
|
export const NBD_OPT_STARTTLS = 5
|
||||||
export const NBD_OPT_INFO = 6
|
export const NBD_OPT_INFO = 6
|
||||||
export const NBD_OPT_GO = 7
|
export const NBD_OPT_GO = 7
|
||||||
|
export const NBD_OPT_STRUCTURED_REPLY = 8
|
||||||
|
export const NBD_OPT_LIST_META_CONTEXT = 9
|
||||||
|
export const NBD_OPT_SET_META_CONTEXT = 10
|
||||||
|
export const NBD_OPT_EXTENDED_HEADERS = 11
|
||||||
|
|
||||||
|
export const NBD_REP_ACK =1
|
||||||
|
export const NBD_REP_SERVER = 2
|
||||||
|
export const NBD_REP_INFO = 3
|
||||||
|
export const NBD_REP_META_CONTEXT = 4
|
||||||
|
export const NBD_REP_ERR_UNSUP = 0x80000001 // 2^32+1
|
||||||
|
export const NBD_REP_ERR_POLICY = 0x80000002
|
||||||
|
export const NBD_REP_ERR_INVALID = 0x80000003
|
||||||
|
export const NBD_REP_ERR_PLATFORM = 0x80000004
|
||||||
|
export const NBD_REP_ERR_TLS_REQD = 0x80000005
|
||||||
|
export const NBD_REP_ERR_UNKNOWN = 0x80000006
|
||||||
|
export const NBD_REP_ERR_SHUTDOWN = 0x80000007
|
||||||
|
export const NBD_REP_ERR_BLOCK_SIZE_REQD = 0x80000008
|
||||||
|
export const NBD_REP_ERR_TOO_BIG = 0x80000009
|
||||||
|
export const NBD_REP_ERR_EXT_HEADER_REQD = 0x8000000a
|
||||||
|
|
||||||
|
export const NBD_INFO_EXPORT = 0
|
||||||
|
export const NBD_INFO_NAME = 1
|
||||||
|
export const NBD_INFO_DESCRIPTION = 2
|
||||||
|
export const NBD_INFO_BLOCK_SIZE = 3
|
||||||
|
|
||||||
|
|
||||||
export const NBD_FLAG_HAS_FLAGS = 1 << 0
|
export const NBD_FLAG_HAS_FLAGS = 1 << 0
|
||||||
export const NBD_FLAG_READ_ONLY = 1 << 1
|
export const NBD_FLAG_READ_ONLY = 1 << 1
|
||||||
@@ -14,6 +42,9 @@ export const NBD_FLAG_SEND_FLUSH = 1 << 2
|
|||||||
export const NBD_FLAG_SEND_FUA = 1 << 3
|
export const NBD_FLAG_SEND_FUA = 1 << 3
|
||||||
export const NBD_FLAG_ROTATIONAL = 1 << 4
|
export const NBD_FLAG_ROTATIONAL = 1 << 4
|
||||||
export const NBD_FLAG_SEND_TRIM = 1 << 5
|
export const NBD_FLAG_SEND_TRIM = 1 << 5
|
||||||
|
export const NBD_FLAG_SEND_WRITE_ZEROES = 1 << 6
|
||||||
|
export const NBD_FLAG_SEND_DF = 1 << 7
|
||||||
|
export const NBD_FLAG_CAN_MULTI_CONN = 1 << 8
|
||||||
|
|
||||||
export const NBD_FLAG_FIXED_NEWSTYLE = 1 << 0
|
export const NBD_FLAG_FIXED_NEWSTYLE = 1 << 0
|
||||||
|
|
||||||
@@ -36,6 +67,15 @@ export const NBD_CMD_RESIZE = 8
|
|||||||
export const NBD_REQUEST_MAGIC = 0x25609513 // magic number to create a new NBD request to send to the server
|
export const NBD_REQUEST_MAGIC = 0x25609513 // magic number to create a new NBD request to send to the server
|
||||||
export const NBD_REPLY_MAGIC = 0x67446698 // magic number received from the server when reading response to a nbd request
|
export const NBD_REPLY_MAGIC = 0x67446698 // magic number received from the server when reading response to a nbd request
|
||||||
export const NBD_REPLY_ACK = 1
|
export const NBD_REPLY_ACK = 1
|
||||||
|
export const NBD_SIMPLE_REPLY_MAGIC = 0x67446698
|
||||||
|
export const NBD_STRUCTURED_REPLY_MAGIC = 0x668e33ef
|
||||||
|
export const NBD_REPLY_TYPE_NONE = 0
|
||||||
|
export const NBD_REPLY_TYPE_OFFSET_DATA = 1
|
||||||
|
export const NBD_REPLY_TYPE_OFFSET_HOLE = 2
|
||||||
|
export const NBD_REPLY_TYPE_BLOCK_STATUS = 5
|
||||||
|
export const NBD_REPLY_TYPE_ERROR = 1 << 15 +1
|
||||||
|
export const NBD_REPLY_TYPE_ERROR_OFFSET = 1 << 15 +2
|
||||||
|
|
||||||
|
|
||||||
export const NBD_DEFAULT_PORT = 10809
|
export const NBD_DEFAULT_PORT = 10809
|
||||||
export const NBD_DEFAULT_BLOCK_SIZE = 64 * 1024
|
export const NBD_DEFAULT_BLOCK_SIZE = 64 * 1024
|
||||||
|
|||||||
292
@vates/nbd-client/server.mjs
Normal file
292
@vates/nbd-client/server.mjs
Normal file
@@ -0,0 +1,292 @@
|
|||||||
|
import assert, { deepEqual, strictEqual, notStrictEqual } from 'node:assert'
|
||||||
|
import { createServer } from 'node:net'
|
||||||
|
import { fromCallback } from 'promise-toolbox'
|
||||||
|
import { readChunkStrict } from '@vates/read-chunk'
|
||||||
|
import {
|
||||||
|
INIT_PASSWD,
|
||||||
|
NBD_CMD_READ,
|
||||||
|
NBD_DEFAULT_PORT,
|
||||||
|
NBD_FLAG_FIXED_NEWSTYLE,
|
||||||
|
NBD_FLAG_HAS_FLAGS,
|
||||||
|
NBD_OPT_EXPORT_NAME,
|
||||||
|
NBD_OPT_REPLY_MAGIC,
|
||||||
|
NBD_REPLY_ACK,
|
||||||
|
NBD_REQUEST_MAGIC,
|
||||||
|
OPTS_MAGIC,
|
||||||
|
NBD_CMD_DISC,
|
||||||
|
NBD_REP_ERR_UNSUP,
|
||||||
|
NBD_CMD_WRITE,
|
||||||
|
NBD_OPT_GO,
|
||||||
|
NBD_OPT_INFO,
|
||||||
|
NBD_INFO_EXPORT,
|
||||||
|
NBD_REP_INFO,
|
||||||
|
NBD_SIMPLE_REPLY_MAGIC,
|
||||||
|
NBD_REP_ERR_UNKNOWN,
|
||||||
|
} from './constants.mjs'
|
||||||
|
import { PassThrough } from 'node:stream'
|
||||||
|
|
||||||
|
export default class NbdServer {
|
||||||
|
#server
|
||||||
|
#clients = new Map()
|
||||||
|
constructor(port = NBD_DEFAULT_PORT) {
|
||||||
|
this.#server = createServer()
|
||||||
|
this.#server.listen(port)
|
||||||
|
this.#server.on('connection', client => this.#handleNewConnection(client))
|
||||||
|
}
|
||||||
|
|
||||||
|
// will wait for a client to connect and upload the file to this server
|
||||||
|
downloadStream(key) {
|
||||||
|
strictEqual(this.#clients.has(key), false)
|
||||||
|
const stream = new PassThrough()
|
||||||
|
const offset = BigInt(0)
|
||||||
|
this.#clients.set(key, { length: BigInt(2 * 1024 * 1024 * 1024 * 1024), stream, offset, key })
|
||||||
|
return stream
|
||||||
|
}
|
||||||
|
|
||||||
|
// will wait for a client to connect and downlaod this stream
|
||||||
|
uploadStream(key, source, length) {
|
||||||
|
strictEqual(this.#clients.has(key), false)
|
||||||
|
notStrictEqual(length, undefined)
|
||||||
|
const offset = BigInt(0)
|
||||||
|
this.#clients.set(key, { length: BigInt(length), stream: source, offset, key })
|
||||||
|
}
|
||||||
|
|
||||||
|
#read(socket, length) {
|
||||||
|
return readChunkStrict(socket, length)
|
||||||
|
}
|
||||||
|
async #readInt32(socket) {
|
||||||
|
const buffer = await this.#read(socket, 4)
|
||||||
|
return buffer.readUInt32BE()
|
||||||
|
}
|
||||||
|
|
||||||
|
#write(socket, buffer) {
|
||||||
|
return fromCallback.call(socket, 'write', buffer)
|
||||||
|
}
|
||||||
|
async #writeInt16(socket, int16) {
|
||||||
|
const buffer = Buffer.alloc(2)
|
||||||
|
buffer.writeUInt16BE(int16)
|
||||||
|
return this.#write(socket, buffer)
|
||||||
|
}
|
||||||
|
async #writeInt32(socket, int32) {
|
||||||
|
const buffer = Buffer.alloc(4)
|
||||||
|
buffer.writeUInt32BE(int32)
|
||||||
|
return this.#write(socket, buffer)
|
||||||
|
}
|
||||||
|
async #writeInt64(socket, int64) {
|
||||||
|
const buffer = Buffer.alloc(8)
|
||||||
|
buffer.writeBigUInt64BE(int64)
|
||||||
|
return this.#write(socket, buffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
async #openExport(key) {
|
||||||
|
if (!this.#clients.has(key)) {
|
||||||
|
// export does not exists
|
||||||
|
const err = new Error('Export not found ')
|
||||||
|
err.code = 'ENOTFOUND'
|
||||||
|
throw err
|
||||||
|
}
|
||||||
|
const { length } = this.#clients.get(key)
|
||||||
|
return length
|
||||||
|
}
|
||||||
|
|
||||||
|
async #sendOptionResponse(socket, option, response, data = Buffer.alloc(0)) {
|
||||||
|
await this.#writeInt64(socket, NBD_OPT_REPLY_MAGIC)
|
||||||
|
await this.#writeInt32(socket, option)
|
||||||
|
await this.#writeInt32(socket, response)
|
||||||
|
await this.#writeInt32(socket, data.length)
|
||||||
|
await this.#write(socket, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param {Socket} socket
|
||||||
|
* @returns true if server is waiting for more options
|
||||||
|
*/
|
||||||
|
async #readOption(socket) {
|
||||||
|
console.log('wait for option')
|
||||||
|
const magic = await this.#read(socket, 8)
|
||||||
|
console.log(magic.toString('ascii'), magic.length, OPTS_MAGIC.toString('ascii'))
|
||||||
|
deepEqual(magic, OPTS_MAGIC)
|
||||||
|
const option = await this.#readInt32(socket)
|
||||||
|
const length = await this.#readInt32(socket)
|
||||||
|
console.log({ option, length })
|
||||||
|
const data = length > 0 ? await this.#read(socket, length) : undefined
|
||||||
|
switch (option) {
|
||||||
|
case NBD_OPT_EXPORT_NAME: {
|
||||||
|
const exportNameLength = data.readInt32BE()
|
||||||
|
const key = data.slice(4, exportNameLength + 4).toString()
|
||||||
|
let exportSize
|
||||||
|
try {
|
||||||
|
exportSize = await this.#openExport(key)
|
||||||
|
} catch (err) {
|
||||||
|
if (err.code === 'ENOTFOUND') {
|
||||||
|
this.#sendOptionResponse(socket, option, NBD_REP_ERR_UNKNOWN)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
throw err
|
||||||
|
}
|
||||||
|
socket.key = key
|
||||||
|
await this.#writeInt64(socket, exportSize)
|
||||||
|
await this.#writeInt16(socket, NBD_FLAG_HAS_FLAGS /* transmission flag */)
|
||||||
|
await this.#write(socket, Buffer.alloc(124) /* padding */)
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
case NBD_OPT_STARTTLS:
|
||||||
|
console.log('starttls')
|
||||||
|
// @todo not working
|
||||||
|
return true
|
||||||
|
*/
|
||||||
|
|
||||||
|
case NBD_OPT_GO:
|
||||||
|
case NBD_OPT_INFO: {
|
||||||
|
const exportNameLength = data.readInt32BE()
|
||||||
|
const key = data.slice(4, exportNameLength + 4).toString()
|
||||||
|
let exportSize
|
||||||
|
try {
|
||||||
|
exportSize = await this.#openExport(key)
|
||||||
|
} catch (err) {
|
||||||
|
if (err.code === 'ENOTFOUND') {
|
||||||
|
this.#sendOptionResponse(socket, option, NBD_REP_ERR_UNKNOWN)
|
||||||
|
// @todo should disconnect
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
throw err
|
||||||
|
}
|
||||||
|
socket.key = key
|
||||||
|
await this.#writeInt64(socket, NBD_OPT_REPLY_MAGIC)
|
||||||
|
await this.#writeInt32(socket, option)
|
||||||
|
await this.#writeInt32(socket, NBD_REP_INFO)
|
||||||
|
await this.#writeInt32(socket, 12)
|
||||||
|
// the export info
|
||||||
|
await this.#writeInt16(socket, NBD_INFO_EXPORT)
|
||||||
|
await this.#writeInt64(socket, exportSize)
|
||||||
|
await this.#writeInt16(socket, NBD_FLAG_HAS_FLAGS /* transmission flag */)
|
||||||
|
|
||||||
|
// an ACK at the end of the infos
|
||||||
|
await this.#sendOptionResponse(socket, option, NBD_REPLY_ACK) // no additionnal data
|
||||||
|
return option === NBD_OPT_INFO // we stays in option phase is option is INFO
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
// not supported
|
||||||
|
console.log('not supported', option, length, data?.toString())
|
||||||
|
await this.#sendOptionResponse(socket, option, NBD_REP_ERR_UNSUP) // no additionnal data
|
||||||
|
// wait for next option
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async #readCommand(socket) {
|
||||||
|
const key = socket.key
|
||||||
|
// this socket has an export key
|
||||||
|
notStrictEqual(key, undefined)
|
||||||
|
// this export key is still valid
|
||||||
|
strictEqual(this.#clients.has(key), true)
|
||||||
|
const client = this.#clients.get(key)
|
||||||
|
|
||||||
|
const buffer = await this.#read(socket, 28)
|
||||||
|
const magic = buffer.readInt32BE(0)
|
||||||
|
strictEqual(magic, NBD_REQUEST_MAGIC)
|
||||||
|
/* const commandFlags = */ buffer.readInt16BE(4)
|
||||||
|
const command = buffer.readInt16BE(6)
|
||||||
|
const cookie = buffer.readBigUInt64BE(8)
|
||||||
|
const offset = buffer.readBigUInt64BE(16)
|
||||||
|
const length = buffer.readInt32BE(24)
|
||||||
|
switch (command) {
|
||||||
|
case NBD_CMD_DISC:
|
||||||
|
console.log('gotdisconnect', client.offset)
|
||||||
|
await client.stream?.destroy()
|
||||||
|
// @todo : disconnect
|
||||||
|
return false
|
||||||
|
case NBD_CMD_READ: {
|
||||||
|
/** simple replies */
|
||||||
|
|
||||||
|
// read length byte from offset in export
|
||||||
|
|
||||||
|
// the client is writing in contiguous mode
|
||||||
|
assert.strictEqual(offset, client.offset)
|
||||||
|
client.offset += BigInt(length)
|
||||||
|
const data = await readChunkStrict(client.stream, length)
|
||||||
|
const reply = Buffer.alloc(16)
|
||||||
|
reply.writeInt32BE(NBD_SIMPLE_REPLY_MAGIC)
|
||||||
|
reply.writeInt32BE(0, 4) // no error
|
||||||
|
reply.writeBigInt64BE(cookie, 8)
|
||||||
|
await this.#write(socket, reply)
|
||||||
|
await this.#write(socket, data)
|
||||||
|
/* if we implement non stream read, we can handle read in parallel
|
||||||
|
const reply = Buffer.alloc(16+length)
|
||||||
|
reply.writeInt32BE(NBD_SIMPLE_REPLY_MAGIC)
|
||||||
|
reply.writeInt32BE(0,4)// no error
|
||||||
|
reply.writeBigInt64BE(cookie,8)
|
||||||
|
|
||||||
|
// read length byte from offset in export directly in the given buffer
|
||||||
|
// may do multiple read in parallel on the same export
|
||||||
|
size += length
|
||||||
|
socket.fd.read(reply, 16, length, Number(offset))
|
||||||
|
.then(()=>{
|
||||||
|
return this.#write(socket, reply)
|
||||||
|
})
|
||||||
|
.catch(err => console.error('NBD_CMD_READ',err)) */
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
case NBD_CMD_WRITE: {
|
||||||
|
// the client is writing in contiguous mode
|
||||||
|
assert.strictEqual(offset, client.offset)
|
||||||
|
|
||||||
|
const data = await this.#read(socket, length)
|
||||||
|
client.offset += BigInt(length)
|
||||||
|
await new Promise((resolve, reject) => {
|
||||||
|
if (!client.stream.write(data, 0, length, Number(offset))) {
|
||||||
|
client.stream.once('drain', err => (err ? reject(err) : resolve()))
|
||||||
|
} else {
|
||||||
|
process.nextTick(resolve)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
const reply = Buffer.alloc(16)
|
||||||
|
reply.writeInt32BE(NBD_SIMPLE_REPLY_MAGIC)
|
||||||
|
reply.writeInt32BE(0, 4) // no error
|
||||||
|
reply.writeBigInt64BE(cookie, 8)
|
||||||
|
await this.#write(socket, reply)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
console.log('GOT unsupported command ', command)
|
||||||
|
// fail to handle
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async #handleNewConnection(socket) {
|
||||||
|
const remoteAddress = socket.remoteAddress + ':' + socket.remotePort
|
||||||
|
console.log('new client connection from %s', remoteAddress)
|
||||||
|
|
||||||
|
socket.on('close', () => {
|
||||||
|
console.log('client ', remoteAddress, 'is done')
|
||||||
|
})
|
||||||
|
socket.on('error', error => {
|
||||||
|
throw error
|
||||||
|
})
|
||||||
|
// handshake
|
||||||
|
await this.#write(socket, INIT_PASSWD)
|
||||||
|
await this.#write(socket, OPTS_MAGIC)
|
||||||
|
|
||||||
|
// send flags , the bare minimum
|
||||||
|
await this.#writeInt16(socket, NBD_FLAG_FIXED_NEWSTYLE)
|
||||||
|
const clientFlag = await this.#readInt32(socket)
|
||||||
|
assert.strictEqual(clientFlag & NBD_FLAG_FIXED_NEWSTYLE, NBD_FLAG_FIXED_NEWSTYLE) // only FIXED_NEWSTYLE one is supported from the server options
|
||||||
|
|
||||||
|
// read client response flags
|
||||||
|
let waitingForOptions = true
|
||||||
|
while (waitingForOptions) {
|
||||||
|
waitingForOptions = await this.#readOption(socket)
|
||||||
|
}
|
||||||
|
|
||||||
|
let waitingForCommand = true
|
||||||
|
while (waitingForCommand) {
|
||||||
|
waitingForCommand = await this.#readCommand(socket)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#handleClientData(client, data) {}
|
||||||
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
import NbdClient from '../index.mjs'
|
import NbdClient from '../client.mjs'
|
||||||
import { spawn, exec } from 'node:child_process'
|
import { spawn, exec } from 'node:child_process'
|
||||||
import fs from 'node:fs/promises'
|
import fs from 'node:fs/promises'
|
||||||
import { test } from 'tap'
|
import { test } from 'tap'
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import { decorateClass } from '@vates/decorate-with'
|
|||||||
import { strict as assert } from 'node:assert'
|
import { strict as assert } from 'node:assert'
|
||||||
|
|
||||||
import extractOpaqueRef from './_extractOpaqueRef.mjs'
|
import extractOpaqueRef from './_extractOpaqueRef.mjs'
|
||||||
import NbdClient from '@vates/nbd-client'
|
import NbdClient from '@vates/nbd-client/client.mjs'
|
||||||
import { createNbdRawStream, createNbdVhdStream } from 'vhd-lib/createStreamNbd.js'
|
import { createNbdRawStream, createNbdVhdStream } from 'vhd-lib/createStreamNbd.js'
|
||||||
import { VDI_FORMAT_RAW, VDI_FORMAT_VHD } from './index.mjs'
|
import { VDI_FORMAT_RAW, VDI_FORMAT_VHD } from './index.mjs'
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user