Compare commits
25 Commits
fix-iso-sm
...
flo_test_n
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
266ec24260 | ||
|
|
5bee3a4f22 | ||
|
|
4e1b29bdd0 | ||
|
|
1bfb8af28b | ||
|
|
f07eeb0897 | ||
|
|
e7443e94a9 | ||
|
|
7e678a38e3 | ||
|
|
3323abdf55 | ||
|
|
48a3477c37 | ||
|
|
df81e77f33 | ||
|
|
50791282a6 | ||
|
|
adb7feada9 | ||
|
|
3573dfc861 | ||
|
|
c78c4ae055 | ||
|
|
47977a278d | ||
|
|
2307dc4cbe | ||
|
|
b82923bb5d | ||
|
|
7b2a80fcfc | ||
|
|
aaa4da4390 | ||
|
|
326bafb7f2 | ||
|
|
f5db40244a | ||
|
|
cd8eaca453 | ||
|
|
3a9117553d | ||
|
|
16877d074b | ||
|
|
b136698bae |
1
@vates/nbd-client/.npmignore
Symbolic link
1
@vates/nbd-client/.npmignore
Symbolic link
@@ -0,0 +1 @@
|
||||
../../scripts/npmignore
|
||||
41
@vates/nbd-client/constants.mjs
Normal file
41
@vates/nbd-client/constants.mjs
Normal file
@@ -0,0 +1,41 @@
|
||||
// NBD protocol constants.
// Reference: https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md

// --- handshake magics ---
export const INIT_PASSWD = 'NBDMAGIC' // "NBDMAGIC" ensure we're connected to a nbd server
export const OPTS_MAGIC = 'IHAVEOPT' // "IHAVEOPT" start an option block
export const NBD_OPT_REPLY_MAGIC = 0x3e889045565a9 // magic received during negociation

// --- option codes sent by the client during negotiation ---
export const NBD_OPT_EXPORT_NAME = 1
export const NBD_OPT_ABORT = 2
export const NBD_OPT_LIST = 3
export const NBD_OPT_STARTTLS = 5
export const NBD_OPT_INFO = 6
export const NBD_OPT_GO = 7

// --- per-export transmission flags (sent by the server) ---
export const NBD_FLAG_HAS_FLAGS = 1 << 0
export const NBD_FLAG_READ_ONLY = 1 << 1
export const NBD_FLAG_SEND_FLUSH = 1 << 2
export const NBD_FLAG_SEND_FUA = 1 << 3
export const NBD_FLAG_ROTATIONAL = 1 << 4
export const NBD_FLAG_SEND_TRIM = 1 << 5

// --- global handshake flags ---
export const NBD_FLAG_FIXED_NEWSTYLE = 1 << 0

// --- per-command flags ---
export const NBD_CMD_FLAG_FUA = 1 << 0
export const NBD_CMD_FLAG_NO_HOLE = 1 << 1
export const NBD_CMD_FLAG_DF = 1 << 2
export const NBD_CMD_FLAG_REQ_ONE = 1 << 3
export const NBD_CMD_FLAG_FAST_ZERO = 1 << 4

// --- command types ---
export const NBD_CMD_READ = 0
export const NBD_CMD_WRITE = 1
export const NBD_CMD_DISC = 2
export const NBD_CMD_FLUSH = 3
export const NBD_CMD_TRIM = 4
export const NBD_CMD_CACHE = 5
export const NBD_CMD_WRITE_ZEROES = 6
export const NBD_CMD_BLOCK_STATUS = 7
export const NBD_CMD_RESIZE = 8

// --- transmission-phase magics ---
export const NBD_REQUEST_MAGIC = 0x25609513 // magic number to create a new NBD request to send to the server
export const NBD_REPLY_MAGIC = 0x67446698 // magic number received from the server when reading response to a nbd request

// --- client defaults ---
export const NBD_DEFAULT_PORT = 10809
export const NBD_DEFAULT_BLOCK_SIZE = 64 * 1024
export const MAX_BUFFER_LENGTH = 10 * 1024 * 1024
|
||||
272
@vates/nbd-client/index.mjs
Normal file
272
@vates/nbd-client/index.mjs
Normal file
@@ -0,0 +1,272 @@
|
||||
import assert from 'node:assert'
|
||||
import { Socket } from 'node:net'
|
||||
import { connect } from 'node:tls'
|
||||
import {
|
||||
INIT_PASSWD,
|
||||
MAX_BUFFER_LENGTH,
|
||||
NBD_CMD_READ,
|
||||
NBD_DEFAULT_BLOCK_SIZE,
|
||||
NBD_DEFAULT_PORT,
|
||||
NBD_FLAG_FIXED_NEWSTYLE,
|
||||
NBD_FLAG_HAS_FLAGS,
|
||||
NBD_OPT_EXPORT_NAME,
|
||||
NBD_OPT_REPLY_MAGIC,
|
||||
NBD_OPT_STARTTLS,
|
||||
NBD_REPLY_MAGIC,
|
||||
NBD_REQUEST_MAGIC,
|
||||
OPTS_MAGIC,
|
||||
} from './constants.mjs'
|
||||
|
||||
// documentation is here : https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md
|
||||
|
||||
export default class NbdClient {
|
||||
_serverAddress
|
||||
_serverCert
|
||||
_serverPort
|
||||
_serverSocket
|
||||
_useSecureConnection = false
|
||||
|
||||
_exportname
|
||||
_nbDiskBlocks = 0
|
||||
|
||||
_receptionBuffer = Buffer.alloc(0)
|
||||
_sendingBuffer = Buffer.alloc(0)
|
||||
|
||||
// ensure the read are resolved in the right order
|
||||
_rawReadResolve = []
|
||||
_rawReadLength = []
|
||||
|
||||
// AFAIK, there is no guaranty the server answer in the same order as the query
|
||||
_nextCommandQueryId = BigInt(0)
|
||||
_commandQueries = {} // map of queries waiting for an answer
|
||||
|
||||
constructor({ address, port = NBD_DEFAULT_PORT, exportname, cert, secure = true }) {
|
||||
this._address = address
|
||||
this._serverPort = port
|
||||
this._exportname = exportname
|
||||
this._serverCert = cert
|
||||
this._useSecureConnection = secure
|
||||
}
|
||||
|
||||
get nbBlocks() {
|
||||
return this._nbDiskBlocks
|
||||
}
|
||||
|
||||
_handleData(data) {
|
||||
if (data !== undefined) {
|
||||
this._receptionBuffer = Buffer.concat([this._receptionBuffer, Buffer.from(data)])
|
||||
}
|
||||
if (this._receptionBuffer.length > MAX_BUFFER_LENGTH) {
|
||||
throw new Error(
|
||||
`Buffer grown too much with a total size of ${this._receptionBuffer.length} bytes (last chunk is ${data.length})`
|
||||
)
|
||||
}
|
||||
// if we're waiting for a specific bit length (in the handshake for example or a block data)
|
||||
while (this._rawReadResolve.length > 0 && this._receptionBuffer.length >= this._rawReadLength[0]) {
|
||||
const resolve = this._rawReadResolve.shift()
|
||||
const waitingForLength = this._rawReadLength.shift()
|
||||
resolve(this._takeFromBuffer(waitingForLength))
|
||||
}
|
||||
if (this._rawReadResolve.length === 0 && this._receptionBuffer.length > 4) {
|
||||
if (this._receptionBuffer.readInt32BE(0) === NBD_REPLY_MAGIC) {
|
||||
this._readBlockResponse()
|
||||
}
|
||||
// keep the received bits in the buffer for subsequent use
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
async _addListenners() {
|
||||
const serverSocket = this._serverSocket
|
||||
serverSocket.on('data', data => this._handleData(data))
|
||||
|
||||
serverSocket.on('close', function () {
|
||||
console.log('Connection closed')
|
||||
})
|
||||
serverSocket.on('error', function (err) {
|
||||
throw err
|
||||
})
|
||||
}
|
||||
|
||||
async _tlsConnect() {
|
||||
return new Promise(resolve => {
|
||||
this._serverSocket = connect(
|
||||
{
|
||||
socket: this._serverSocket,
|
||||
rejectUnauthorized: false,
|
||||
cert: this._serverCert,
|
||||
},
|
||||
resolve
|
||||
)
|
||||
this._addListenners()
|
||||
})
|
||||
}
|
||||
async _unsecureConnect() {
|
||||
this._serverSocket = new Socket()
|
||||
this._addListenners()
|
||||
return new Promise((resolve, reject) => {
|
||||
this._serverSocket.connect(this._serverPort, this._serverAddress, () => {
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
}
|
||||
async connect() {
|
||||
await this._unsecureConnect()
|
||||
await this._handshake()
|
||||
}
|
||||
|
||||
async disconnect() {
|
||||
await this._serverSocket.destroy()
|
||||
}
|
||||
async _sendOption(option, buffer = Buffer.alloc(0)) {
|
||||
await this._writeToSocket(OPTS_MAGIC)
|
||||
await this._writeToSocketInt32(option)
|
||||
await this._writeToSocketInt32(buffer.length)
|
||||
await this._writeToSocket(buffer)
|
||||
assert(await this._readFromSocketInt64(), NBD_OPT_REPLY_MAGIC) // magic number everywhere
|
||||
assert(await this._readFromSocketInt32(), option) // the option passed
|
||||
assert(await this._readFromSocketInt32(), 1) // ACK
|
||||
const length = await this._readFromSocketInt32()
|
||||
assert(length === 0) // length
|
||||
}
|
||||
|
||||
async _handshake() {
|
||||
assert(await this._readFromSocket(8), INIT_PASSWD)
|
||||
assert(await this._readFromSocket(8), OPTS_MAGIC)
|
||||
const flagsBuffer = await this._readFromSocket(2)
|
||||
const flags = flagsBuffer.readInt16BE(0)
|
||||
assert(flags | NBD_FLAG_FIXED_NEWSTYLE) // only FIXED_NEWSTYLE one is supported from the server options
|
||||
await this._writeToSocketInt32(NBD_FLAG_FIXED_NEWSTYLE) // client also support NBD_FLAG_C_FIXED_NEWSTYLE
|
||||
|
||||
if (this._useSecureConnection) {
|
||||
// upgrade socket to TLS
|
||||
await this._sendOption(NBD_OPT_STARTTLS)
|
||||
await this._tlsConnect()
|
||||
}
|
||||
|
||||
// send export name required it also implictly closes the negociation phase
|
||||
await this._writeToSocket(Buffer.from(OPTS_MAGIC))
|
||||
await this._writeToSocketInt32(NBD_OPT_EXPORT_NAME)
|
||||
await this._writeToSocketInt32(this._exportname.length)
|
||||
|
||||
await this._writeToSocket(Buffer.from(this._exportname))
|
||||
// 8 + 2 + 124
|
||||
const answer = await this._readFromSocket(134)
|
||||
const exportSize = answer.readBigUInt64BE(0)
|
||||
const transmissionFlags = answer.readInt16BE(8)
|
||||
assert(transmissionFlags & NBD_FLAG_HAS_FLAGS, 'NBD_FLAG_HAS_FLAGS') // must always be 1 by the norm
|
||||
|
||||
// xapi server always send NBD_FLAG_READ_ONLY (3) as a flag
|
||||
|
||||
this._nbDiskBlocks = Number(exportSize / BigInt(NBD_DEFAULT_BLOCK_SIZE))
|
||||
this._exportSize = exportSize
|
||||
}
|
||||
|
||||
_takeFromBuffer(length) {
|
||||
const res = Buffer.from(this._receptionBuffer.slice(0, length))
|
||||
this._receptionBuffer = this._receptionBuffer.slice(length)
|
||||
return res
|
||||
}
|
||||
|
||||
_readFromSocket(length) {
|
||||
if (this._receptionBuffer.length >= length) {
|
||||
return this._takeFromBuffer(length)
|
||||
}
|
||||
return new Promise(resolve => {
|
||||
this._rawReadResolve.push(resolve)
|
||||
this._rawReadLength.push(length)
|
||||
})
|
||||
}
|
||||
|
||||
_writeToSocket(buffer) {
|
||||
return new Promise(resolve => {
|
||||
this._serverSocket.write(buffer, resolve)
|
||||
})
|
||||
}
|
||||
|
||||
async _readFromSocketInt32() {
|
||||
const buffer = await this._readFromSocket(4)
|
||||
|
||||
return buffer.readInt32BE(0)
|
||||
}
|
||||
|
||||
async _readFromSocketInt64() {
|
||||
const buffer = await this._readFromSocket(8)
|
||||
return buffer.readBigUInt64BE(0)
|
||||
}
|
||||
|
||||
_writeToSocketUInt32(int) {
|
||||
const buffer = Buffer.alloc(4)
|
||||
buffer.writeUInt32BE(int)
|
||||
return this._writeToSocket(buffer)
|
||||
}
|
||||
_writeToSocketInt32(int) {
|
||||
const buffer = Buffer.alloc(4)
|
||||
buffer.writeInt32BE(int)
|
||||
return this._writeToSocket(buffer)
|
||||
}
|
||||
|
||||
_writeToSocketInt16(int) {
|
||||
const buffer = Buffer.alloc(2)
|
||||
buffer.writeInt16BE(int)
|
||||
return this._writeToSocket(buffer)
|
||||
}
|
||||
_writeToSocketInt64(int) {
|
||||
const buffer = Buffer.alloc(8)
|
||||
buffer.writeBigUInt64BE(BigInt(int))
|
||||
return this._writeToSocket(buffer)
|
||||
}
|
||||
|
||||
async _readBlockResponse() {
|
||||
const magic = await this._readFromSocketInt32()
|
||||
|
||||
if (magic !== NBD_REPLY_MAGIC) {
|
||||
throw new Error(`magic number for block answer is wrong : ${magic}`)
|
||||
}
|
||||
// error
|
||||
const error = await this._readFromSocketInt32()
|
||||
if (error !== 0) {
|
||||
throw new Error(`GOT ERROR CODE : ${error}`)
|
||||
}
|
||||
|
||||
const blockQueryId = await this._readFromSocketInt64()
|
||||
const query = this._commandQueries[blockQueryId]
|
||||
if (!query) {
|
||||
throw new Error(` no query associated with id ${blockQueryId} ${Object.keys(this._commandQueries)}`)
|
||||
}
|
||||
delete this._commandQueries[blockQueryId]
|
||||
const data = await this._readFromSocket(query.size)
|
||||
assert.strictEqual(data.length, query.size)
|
||||
query.resolve(data)
|
||||
this._handleData()
|
||||
}
|
||||
|
||||
async readBlock(index, size = NBD_DEFAULT_BLOCK_SIZE) {
|
||||
const queryId = this._nextCommandQueryId
|
||||
this._nextCommandQueryId++
|
||||
|
||||
const buffer = Buffer.alloc(28)
|
||||
buffer.writeInt32BE(NBD_REQUEST_MAGIC, 0)
|
||||
buffer.writeInt16BE(0, 4) // no command flags for a simple block read
|
||||
buffer.writeInt16BE(NBD_CMD_READ, 6)
|
||||
buffer.writeBigUInt64BE(queryId, 8)
|
||||
// byte offset in the raw disk
|
||||
const offset = BigInt(index) * BigInt(size)
|
||||
buffer.writeBigUInt64BE(offset, 16)
|
||||
// ensure we do not read after the end of the export (which immediatly disconnect us)
|
||||
|
||||
const maxSize = Math.min(Number(this._exportSize - offset), size)
|
||||
// size wanted
|
||||
buffer.writeInt32BE(maxSize, 24)
|
||||
|
||||
return new Promise(resolve => {
|
||||
this._commandQueries[queryId] = {
|
||||
size: maxSize,
|
||||
resolve,
|
||||
}
|
||||
|
||||
// write command at once to ensure no concurrency issue
|
||||
this._writeToSocket(buffer)
|
||||
})
|
||||
}
|
||||
}
|
||||
75
@vates/nbd-client/nbdclient.spec.mjs
Normal file
75
@vates/nbd-client/nbdclient.spec.mjs
Normal file
@@ -0,0 +1,75 @@
|
||||
import assert from 'assert'
|
||||
import NbdClient from './index.mjs'
|
||||
import { spawn } from 'node:child_process'
|
||||
import fs from 'node:fs/promises'
|
||||
import { test } from 'tap'
|
||||
import tmp from 'tmp'
|
||||
import { pFromCallback } from 'promise-toolbox'
|
||||
import { asyncEach } from '@vates/async-each'
|
||||
|
||||
const FILE_SIZE = 2 * 1024 * 1024
|
||||
|
||||
// Build a temporary file of `size` bytes where every 4-byte-aligned 32-bit
// word contains its own byte offset, so a reader can verify any block content.
async function createTempFile(size) {
  const filePath = await pFromCallback(cb => tmp.file(cb))
  const content = Buffer.alloc(size, 0)
  let offset = 0
  while (offset < size) {
    content.writeUInt32BE(offset, offset)
    offset += 4
  }
  await fs.writeFile(filePath, content)
  return filePath
}
|
||||
|
||||
test('it works with unsecured network', async tap => {
  const filePath = await createTempFile(FILE_SIZE)

  // expose the file through a local nbdkit server (plain TCP, no TLS)
  const nbdServer = spawn(
    'nbdkit',
    [
      'file',
      filePath,
      '--newstyle', //
      '--exit-with-parent',
      '--read-only',
      '--export-name=MY_SECRET_EXPORT',
    ],
    {
      stdio: ['inherit', 'inherit', 'inherit'],
    }
  )

  const client = new NbdClient({
    address: 'localhost',
    exportname: 'MY_SECRET_EXPORT',
    secure: false,
  })
  await client.connect()

  const CHUNK_SIZE = 32 * 1024 // non default size
  const indexes = Array.from({ length: FILE_SIZE / CHUNK_SIZE }, (_, i) => i)

  // read mutiple blocks in parallel
  await asyncEach(
    indexes,
    async i => {
      const block = await client.readBlock(i, CHUNK_SIZE)
      let blockOk = true
      let firstFail
      for (let offset = 0; offset < CHUNK_SIZE; offset += 4) {
        // each aligned word of the fixture holds its absolute byte offset
        const expected = i * CHUNK_SIZE + offset
        const wordOk = block.readUInt32BE(offset) === expected
        if (!wordOk && firstFail === undefined) {
          firstFail = offset
        }
        blockOk = blockOk && wordOk
      }
      tap.ok(blockOk, `check block ${i} content`)
    },
    { concurrency: 8 }
  )

  await client.disconnect()
  nbdServer.kill()
  await fs.unlink(filePath)
})
|
||||
30
@vates/nbd-client/package.json
Normal file
30
@vates/nbd-client/package.json
Normal file
@@ -0,0 +1,30 @@
|
||||
{
|
||||
"private": true,
|
||||
"name": "@vates/nbd-client",
|
||||
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@vates/nbd-client",
|
||||
"bugs": "https://github.com/vatesfr/xen-orchestra/issues",
|
||||
"repository": {
|
||||
"directory": "@vates/nbd-client",
|
||||
"type": "git",
|
||||
"url": "https://github.com/vatesfr/xen-orchestra.git"
|
||||
},
|
||||
"author": {
|
||||
"name": "Vates SAS",
|
||||
"url": "https://vates.fr"
|
||||
},
|
||||
"license": "AGPL-3.0-or-later",
|
||||
"version": "0.0.1",
|
||||
"engines": {
|
||||
"node": ">=14.0"
|
||||
},
|
||||
"dependencies": {
|
||||
"@vates/async-each": "^1.0.0",
|
||||
"@xen-orchestra/async-map": "^0.1.2",
|
||||
"promise-toolbox": "^0.21.0",
|
||||
"xen-api": "^1.2.2"
|
||||
},
|
||||
"devDependencies": {
|
||||
"tap": "^16.3.0",
|
||||
"tmp": "^0.2.1"
|
||||
}
|
||||
}
|
||||
152
@vates/nbd-client/tests/index.mjs
Normal file
152
@vates/nbd-client/tests/index.mjs
Normal file
@@ -0,0 +1,152 @@
|
||||
import NbdClient from '../index.js'
|
||||
import { Xapi } from 'xen-api'
|
||||
import readline from 'node:readline'
|
||||
import { stdin as input, stdout as output } from 'node:process'
|
||||
import { asyncMap } from '@xen-orchestra/async-map'
|
||||
import { downloadVhd, getFullBlocks, getChangedNbdBlocks } from './utils.mjs'
|
||||
|
||||
const xapi = new Xapi({
|
||||
auth: {
|
||||
user: 'root',
|
||||
password: 'vateslab',
|
||||
},
|
||||
url: '172.16.210.11',
|
||||
allowUnauthorized: true,
|
||||
})
|
||||
await xapi.connect()
|
||||
|
||||
const networks = await xapi.call('network.get_all_records')
|
||||
|
||||
const nbdNetworks = Object.values(networks).filter(
|
||||
network => network.purpose.includes('nbd') || network.purpose.includes('insecure_nbd')
|
||||
)
|
||||
|
||||
let secure = false
|
||||
if (!nbdNetworks.length) {
|
||||
console.log(`you don't have any nbd enabled network`)
|
||||
console.log(`please add a purpose of nbd (to use tls) or insecure_nbd to oneof the host network`)
|
||||
process.exit()
|
||||
}
|
||||
|
||||
const network = nbdNetworks[0]
|
||||
secure = network.purpose.includes('nbd')
|
||||
console.log(`we will use network **${network.name_label}** ${secure ? 'with' : 'without'} TLS`)
|
||||
|
||||
const rl = readline.createInterface({ input, output })
|
||||
const question = text => {
|
||||
return new Promise(resolve => {
|
||||
rl.question(text, resolve)
|
||||
})
|
||||
}
|
||||
|
||||
let vmuuid, vmRef
|
||||
do {
|
||||
vmuuid = await question('VM uuid ? ')
|
||||
try {
|
||||
vmRef = xapi.getObject(vmuuid).$ref
|
||||
} catch (e) {
|
||||
// console.log(e)
|
||||
console.log('maybe the objects was not loaded, try again ')
|
||||
await new Promise(resolve => setTimeout(resolve, 1000))
|
||||
}
|
||||
} while (!vmRef)
|
||||
|
||||
const vdiRefs = (
|
||||
await asyncMap(await xapi.call('VM.get_VBDs', vmRef), async vbd => {
|
||||
const vdi = await xapi.call('VBD.get_VDI', vbd)
|
||||
return vdi
|
||||
})
|
||||
).filter(vdiRef => vdiRef !== 'OpaqueRef:NULL')
|
||||
|
||||
const vdiRef = vdiRefs[0]
|
||||
|
||||
const vdi = xapi.getObject(vdiRef)
|
||||
|
||||
console.log('Will work on vdi [', vdi.name_label, ']')
|
||||
const cbt_enabled = vdi.cbt_enabled
|
||||
console.log('Change block tracking is [', cbt_enabled ? 'enabled' : 'disabled', ']')
|
||||
|
||||
if (!cbt_enabled) {
|
||||
const shouldEnable = await question('would you like to enable it ? Y/n ')
|
||||
if (shouldEnable === 'Y') {
|
||||
await xapi.call('VDI.enable_cbt', vdiRef)
|
||||
console.log('CBT is now enable for this VDI')
|
||||
console.log('You must make a snapshot, write some data and relaunch this script to backup changes')
|
||||
} else {
|
||||
console.warn('did nothing')
|
||||
}
|
||||
process.exit()
|
||||
}
|
||||
|
||||
console.log('will search for suitable snapshots')
|
||||
const snapshots = vdi.snapshots.map(snapshotRef => xapi.getObject(snapshotRef)).filter(({ cbt_enabled }) => cbt_enabled)
|
||||
|
||||
if (snapshots.length < 2) {
|
||||
throw new Error(`not enough snapshots with cbt enabled , found ${snapshots.length} and 2 are needed`)
|
||||
}
|
||||
|
||||
console.log('found snapshots will compare last two snapshots with cbt_enabled')
|
||||
const snapshotRef = xapi.getObject(snapshots[snapshots.length - 1].uuid).$ref
|
||||
const snapshotTarget = xapi.getObject(snapshots[snapshots.length - 2].uuid).$ref
|
||||
console.log('older snapshot is ', xapi.getObject(snapshotRef).snapshot_time)
|
||||
console.log('newer one is ', xapi.getObject(snapshotTarget).snapshot_time)
|
||||
|
||||
console.log('## will get bitmap of changed blocks')
|
||||
const cbt = Buffer.from(await xapi.call('VDI.list_changed_blocks', snapshotRef, snapshotTarget), 'base64')
|
||||
|
||||
console.log('got changes')
|
||||
console.log('will connect to NBD server')
|
||||
|
||||
const nbd = (await xapi.call('VDI.get_nbd_info', snapshotTarget))[0]
|
||||
|
||||
if (!nbd) {
|
||||
console.error('Nbd is not enabled on the host')
|
||||
console.error('you should add `insecure_nbd` as the `purpose` of a network of this host')
|
||||
process.exit()
|
||||
}
|
||||
|
||||
nbd.secure = true
|
||||
// console.log(nbd)
|
||||
const client = new NbdClient(nbd)
|
||||
await client.connect()
|
||||
|
||||
// @todo : should also handle last blocks that could be incomplete
|
||||
|
||||
const stats = {}
|
||||
|
||||
for (const nbBlocksRead of [32, 16, 8, 4, 2, 1]) {
|
||||
const blockSize = nbBlocksRead * 64 * 1024
|
||||
stats[blockSize] = {}
|
||||
const MASK = 0x80
|
||||
const test = (map, bit) => ((map[bit >> 3] << (bit & 7)) & MASK) !== 0
|
||||
|
||||
const changed = []
|
||||
for (let i = 0; i < (cbt.length * 8) / nbBlocksRead; i++) {
|
||||
let blockChanged = false
|
||||
for (let j = 0; j < nbBlocksRead; j++) {
|
||||
blockChanged = blockChanged || test(cbt, i * nbBlocksRead + j)
|
||||
}
|
||||
if (blockChanged) {
|
||||
changed.push(i)
|
||||
}
|
||||
}
|
||||
console.log(changed.length, 'block changed')
|
||||
for (const concurrency of [32, 16, 8, 4, 2]) {
|
||||
const { speed } = await getChangedNbdBlocks(client, changed, concurrency, blockSize)
|
||||
stats[blockSize][concurrency] = speed
|
||||
}
|
||||
}
|
||||
console.log('speed summary')
|
||||
console.table(stats)
|
||||
|
||||
console.log('## will check full download of the base vdi ')
|
||||
|
||||
await getFullBlocks(client, 16, 512 * 1024) // a good sweet spot
|
||||
|
||||
console.log('## will check vhd delta export size and speed')
|
||||
|
||||
console.log('## will check full vhd export size and speed')
|
||||
await downloadVhd(xapi, {
|
||||
format: 'vhd',
|
||||
vdi: snapshotTarget,
|
||||
})
|
||||
97
@vates/nbd-client/tests/test-coherence.mjs
Normal file
97
@vates/nbd-client/tests/test-coherence.mjs
Normal file
@@ -0,0 +1,97 @@
|
||||
import NbdClient from '../index.js'
|
||||
import { Xapi } from 'xen-api'
|
||||
import { asyncMap } from '@xen-orchestra/async-map'
|
||||
import { downloadVhd, getFullBlocks } from './utils.mjs'
|
||||
import fs from 'fs/promises'
|
||||
|
||||
const xapi = new Xapi({
|
||||
auth: {
|
||||
user: 'root',
|
||||
password: 'vateslab',
|
||||
},
|
||||
url: '172.16.210.11',
|
||||
allowUnauthorized: true,
|
||||
})
|
||||
await xapi.connect()
|
||||
|
||||
const vmuuid = '123e4f2b-498e-d0af-15ae-f835a1e9f59f'
|
||||
let vmRef
|
||||
do {
|
||||
try {
|
||||
vmRef = xapi.getObject(vmuuid).$ref
|
||||
} catch (e) {
|
||||
console.log('maybe the objects was not loaded, try again ')
|
||||
await new Promise(resolve => setTimeout(resolve, 1000))
|
||||
}
|
||||
} while (!vmRef)
|
||||
|
||||
const vdiRefs = (
|
||||
await asyncMap(await xapi.call('VM.get_VBDs', vmRef), async vbd => {
|
||||
const vdi = await xapi.call('VBD.get_VDI', vbd)
|
||||
return vdi
|
||||
})
|
||||
).filter(vdiRef => vdiRef !== 'OpaqueRef:NULL')
|
||||
|
||||
const vdiRef = vdiRefs[0]
|
||||
|
||||
const vdi = xapi.getObject(vdiRef)
|
||||
|
||||
console.log('Will work on vdi [', vdi.name_label, ']')
|
||||
|
||||
console.log('will search for suitable snapshots')
|
||||
const snapshots = vdi.snapshots.map(snapshotRef => xapi.getObject(snapshotRef))
|
||||
|
||||
console.log('found snapshots will use the last one for tests')
|
||||
const snapshotRef = xapi.getObject(snapshots[snapshots.length - 1].uuid).$ref
|
||||
|
||||
console.log('will connect to NBD server')
|
||||
|
||||
const nbd = (await xapi.call('VDI.get_nbd_info', snapshotRef))[0]
|
||||
|
||||
if (!nbd) {
|
||||
console.error('Nbd is not enabled on the host')
|
||||
console.error('you should add `insecure_nbd` as the `purpose` of a network of this host')
|
||||
process.exit()
|
||||
}
|
||||
|
||||
if (!nbd) {
|
||||
console.error('Nbd is not enabled on the host')
|
||||
console.error('you should add `insecure_nbd` as the `purpose` of a network of this host')
|
||||
process.exit()
|
||||
}
|
||||
|
||||
const nbdClient = new NbdClient(nbd)
|
||||
await nbdClient.connect()
|
||||
let fd = await fs.open('/tmp/nbd.raw', 'w')
|
||||
await getFullBlocks({
|
||||
nbdClient,
|
||||
concurrency: 8,
|
||||
nbBlocksRead: 16 /* 1MB block */,
|
||||
fd,
|
||||
})
|
||||
console.log(' done nbd ')
|
||||
await fd.close()
|
||||
|
||||
fd = await fs.open('/tmp/export.raw', 'w')
|
||||
await downloadVhd({
|
||||
xapi,
|
||||
query: {
|
||||
format: 'raw',
|
||||
vdi: snapshotRef,
|
||||
},
|
||||
fd,
|
||||
})
|
||||
|
||||
fd.close()
|
||||
|
||||
fd = await fs.open('/tmp/export.vhd', 'w')
|
||||
await downloadVhd({
|
||||
xapi,
|
||||
query: {
|
||||
format: 'vhd',
|
||||
vdi: snapshotRef,
|
||||
},
|
||||
fd,
|
||||
})
|
||||
|
||||
fd.close()
|
||||
117
@vates/nbd-client/tests/test-transfer-speed.mjs
Normal file
117
@vates/nbd-client/tests/test-transfer-speed.mjs
Normal file
@@ -0,0 +1,117 @@
|
||||
import NbdClient from '../index.js'
|
||||
import { Xapi } from 'xen-api'
|
||||
import readline from 'node:readline'
|
||||
import { stdin as input, stdout as output } from 'node:process'
|
||||
import { asyncMap } from '@xen-orchestra/async-map'
|
||||
import { downloadVhd, getFullBlocks } from './utils.mjs'
|
||||
|
||||
const xapi = new Xapi({
|
||||
auth: {
|
||||
user: 'root',
|
||||
password: 'vateslab',
|
||||
},
|
||||
url: '172.16.210.11',
|
||||
allowUnauthorized: true,
|
||||
})
|
||||
await xapi.connect()
|
||||
|
||||
const networks = await xapi.call('network.get_all_records')
|
||||
console.log({ networks })
|
||||
const nbdNetworks = Object.values(networks).filter(
|
||||
network => network.purpose.includes('nbd') || network.purpose.includes('insecure_nbd')
|
||||
)
|
||||
|
||||
let secure = false
|
||||
if (!nbdNetworks.length) {
|
||||
console.log(`you don't have any nbd enabled network`)
|
||||
console.log(`please add a purpose of nbd (to use tls) or insecure_nbd to oneof the host network`)
|
||||
process.exit()
|
||||
}
|
||||
|
||||
const network = nbdNetworks[0]
|
||||
secure = network.purpose.includes('nbd')
|
||||
console.log(`we will use network **${network.name_label}** ${secure ? 'with' : 'without'} TLS`)
|
||||
|
||||
const rl = readline.createInterface({ input, output })
|
||||
const question = text => {
|
||||
return new Promise(resolve => {
|
||||
rl.question(text, resolve)
|
||||
})
|
||||
}
|
||||
|
||||
let vmuuid, vmRef
|
||||
do {
|
||||
vmuuid = '123e4f2b-498e-d0af-15ae-f835a1e9f59f' // await question('VM uuid ? ')
|
||||
try {
|
||||
vmRef = xapi.getObject(vmuuid).$ref
|
||||
} catch (e) {
|
||||
console.log(e)
|
||||
console.log('maybe the objects was not loaded, try again ')
|
||||
}
|
||||
} while (!vmRef)
|
||||
|
||||
const vdiRefs = (
|
||||
await asyncMap(await xapi.call('VM.get_VBDs', vmRef), async vbd => {
|
||||
const vdi = await xapi.call('VBD.get_VDI', vbd)
|
||||
return vdi
|
||||
})
|
||||
).filter(vdiRef => vdiRef !== 'OpaqueRef:NULL')
|
||||
|
||||
const vdiRef = vdiRefs[0]
|
||||
|
||||
const vdi = xapi.getObject(vdiRef)
|
||||
|
||||
console.log('Will work on vdi [', vdi.name_label, ']')
|
||||
|
||||
console.log('will search for suitable snapshots')
|
||||
const snapshots = vdi.snapshots.map(snapshotRef => xapi.getObject(snapshotRef))
|
||||
|
||||
console.log('found snapshots will use the last one for tests')
|
||||
const snapshotRef = xapi.getObject(snapshots[snapshots.length - 1].uuid).$ref
|
||||
|
||||
console.log('will connect to NBD server')
|
||||
|
||||
const nbd = (await xapi.call('VDI.get_nbd_info', snapshotRef))[0]
|
||||
|
||||
if (!nbd) {
|
||||
console.error('Nbd is not enabled on the host')
|
||||
console.error('you should add `insecure_nbd` as the `purpose` of a network of this host')
|
||||
process.exit()
|
||||
}
|
||||
|
||||
nbd.secure = secure
|
||||
const nbdClient = new NbdClient(nbd)
|
||||
await nbdClient.connect()
|
||||
|
||||
const maxDuration =
|
||||
parseInt(await question('Maximum duration per test in second ? (-1 for unlimited, default 30) '), 10) || 30
|
||||
console.log('Will start downloading blocks during ', maxDuration, 'seconds')
|
||||
|
||||
console.log('## will check the vhd download speed')
|
||||
|
||||
const stats = {}
|
||||
|
||||
for (const nbBlocksRead of [32, 16, 8, 4, 2, 1]) {
|
||||
stats[nbBlocksRead * 64 * 1024] = {}
|
||||
for (const concurrency of [32, 16, 8, 4, 2]) {
|
||||
const { speed } = await getFullBlocks({ nbdClient, concurrency, nbBlocksRead })
|
||||
|
||||
stats[concurrency] = speed
|
||||
}
|
||||
}
|
||||
|
||||
console.log('speed summary')
|
||||
console.table(stats)
|
||||
|
||||
console.log('## will check full vhd export size and speed')
|
||||
await downloadVhd(xapi, {
|
||||
format: 'vhd',
|
||||
vdi: snapshotRef,
|
||||
})
|
||||
|
||||
console.log('## will check full raw export size and speed')
|
||||
await downloadVhd(xapi, {
|
||||
format: 'raw',
|
||||
vdi: snapshotRef,
|
||||
})
|
||||
process.exit()
|
||||
116
@vates/nbd-client/tests/utils.mjs
Normal file
116
@vates/nbd-client/tests/utils.mjs
Normal file
@@ -0,0 +1,116 @@
|
||||
import { asyncEach } from '@vates/async-each'
|
||||
import { CancelToken } from 'promise-toolbox'
|
||||
import zlib from 'node:zlib'
|
||||
|
||||
/**
 * Read the given changed blocks through NBD, gzip them on the fly and report
 * read size, compression ratio and throughput.
 *
 * @param {NbdClient} nbdClient - a connected NBD client
 * @param {number[]} changed - indexes of the blocks to read
 * @param {number} concurrency - number of parallel block reads
 * @param {number} blockSize - block size in bytes
 * @param {number} [maxDuration=30] - stop launching new reads after this many
 *   seconds; remaining blocks are skipped to keep the benchmark bounded
 *   (was a hard-coded 30 s — now parameterized, default preserves behavior)
 * @returns {Promise<{speed: number}>} throughput in MB/s
 */
export async function getChangedNbdBlocks(nbdClient, changed, concurrency, blockSize, maxDuration = 30) {
  let nbModified = 0
  let size = 0
  let compressedSize = 0
  const start = new Date()
  console.log('### with concurrency ', concurrency, ' blockSize ', blockSize / 1024 / 1024, 'MB')
  // periodic progress report
  const interval = setInterval(() => {
    console.log(`${nbModified} block handled in ${new Date() - start} ms`)
  }, 5000)
  try {
    await asyncEach(
      changed,
      async blockIndex => {
        // time budget exhausted: skip the remaining blocks
        if (new Date() - start > maxDuration * 1000) {
          return
        }
        const data = await nbdClient.readBlock(blockIndex, blockSize)

        await new Promise((resolve, reject) => {
          zlib.gzip(data, { level: zlib.constants.Z_BEST_SPEED }, (err, compressed) => {
            // FIX: the error was silently discarded, which then crashed with a
            // TypeError on `compressed.length`
            if (err !== null) {
              reject(err)
              return
            }
            compressedSize += compressed.length
            resolve()
          })
        })
        size += data?.length ?? 0
        nbModified++
      },
      {
        concurrency,
      }
    )
  } finally {
    // FIX: the interval leaked (kept the process alive) when a read threw
    clearInterval(interval)
  }
  console.log('duration :', new Date() - start)
  console.log('read : ', size, 'octets, compressed: ', compressedSize, 'ratio ', size / compressedSize)
  console.log('speed : ', Math.round(((size / 1024 / 1024) * 1000) / (new Date() - start)), 'MB/s')
  return { speed: Math.round(((size / 1024 / 1024) * 1000) / (new Date() - start)) }
}
|
||||
|
||||
/**
 * Read the whole export through NBD, `nbBlocksRead` 64 KiB blocks at a time,
 * optionally writing the data to `fd`; reports and returns the throughput.
 *
 * @param {object} options
 * @param {NbdClient} options.nbdClient - a connected NBD client
 * @param {number} [options.concurrency=1] - number of parallel reads
 * @param {number} [options.nbBlocksRead=1] - 64 KiB blocks per read
 * @param {FileHandle} [options.fd] - if set, blocks are written at their disk offset
 * @param {number} [options.maxDuration=-1] - stop launching reads after this
 *   many seconds (-1 = unlimited); remaining blocks are skipped
 * @returns {Promise<{speed: number}>} throughput in MB/s
 */
export async function getFullBlocks({ nbdClient, concurrency = 1, nbBlocksRead = 1, fd, maxDuration = -1 } = {}) {
  const blockSize = nbBlocksRead * 64 * 1024
  let nbModified = 0
  let size = 0
  console.log('### with concurrency ', concurrency)
  const start = new Date()
  console.log(' max nb blocks ', nbdClient.nbBlocks / nbBlocksRead)
  // lazily yield block indexes so asyncEach pulls them on demand
  function* blockIterator() {
    for (let i = 0; i < nbdClient.nbBlocks / nbBlocksRead; i++) {
      yield i
    }
  }
  // periodic progress report
  const interval = setInterval(() => {
    console.log(`${nbModified} block handled in ${new Date() - start} ms`)
  }, 5000)
  try {
    await asyncEach(
      blockIterator(),
      async blockIndex => {
        // time budget exhausted: skip the remaining blocks
        if (maxDuration > 0 && new Date() - start > maxDuration * 1000) {
          return
        }
        const data = await nbdClient.readBlock(blockIndex, blockSize)
        size += data?.length ?? 0
        nbModified++
        // FIX: guard on `data` before dereferencing `data.length` — the
        // `data?.length` above shows it may be nullish
        if (fd && data) {
          await fd.write(data, 0, data.length, blockIndex * blockSize)
        }
      },
      {
        concurrency,
      }
    )
  } finally {
    // FIX: the interval leaked (kept the process alive) when a read threw
    clearInterval(interval)
  }
  if (new Date() - start < 10000) {
    // FIX: typos in the warning ("to high", "usefull")
    console.warn(
      `data set too small or performance too high, result won't be useful. Please relaunch with bigger snapshot or higher maximum data size `
    )
  }
  console.log('duration :', new Date() - start)
  console.log('nb blocks : ', nbModified)
  console.log('read : ', size, 'octets')
  const speed = Math.round(((size / 1024 / 1024) * 1000 * 100) / (new Date() - start)) / 100
  console.log('speed : ', speed, 'MB/s')
  return { speed }
}
|
||||
|
||||
/**
 * Stream a VDI export over HTTP (/export_raw_vdi), optionally writing it to
 * `fd`, and report size and throughput.
 *
 * @param {object} options
 * @param {Xapi} options.xapi - connected xen-api client
 * @param {object} options.query - passed to the export endpoint (e.g. { format, vdi })
 * @param {FileHandle} [options.fd] - if set, chunks are appended to this file
 * @param {number} [options.maxDuration=-1] - abort the download after this
 *   many seconds (-1 = unlimited)
 * @returns {Promise<{speed: number}>} throughput in MB/s
 *   (FIX: was computed and logged but never returned, unlike getFullBlocks)
 */
export async function downloadVhd({ xapi, query, fd, maxDuration = -1 } = {}) {
  const startStream = new Date()
  let sizeStream = 0
  let nbChunk = 0

  // periodic progress report
  const interval = setInterval(() => {
    console.log(`${nbChunk} chunks , ${sizeStream} octets handled in ${new Date() - startStream} ms`)
  }, 5000)
  try {
    const stream = await xapi.getResource(CancelToken.none, '/export_raw_vdi/', {
      query,
    })
    for await (const chunk of stream) {
      sizeStream += chunk.length

      if (fd) {
        await fd.write(chunk)
      }
      nbChunk++

      // time budget exhausted: stop consuming the stream
      if (maxDuration > 0 && new Date() - startStream > maxDuration * 1000) {
        break
      }
    }
  } finally {
    // FIX: the interval leaked (kept the process alive) when the stream threw
    clearInterval(interval)
  }
  console.log('Stream duration :', new Date() - startStream)
  console.log('Stream read : ', sizeStream, 'octets')
  const speed = Math.round(((sizeStream / 1024 / 1024) * 1000 * 100) / (new Date() - startStream)) / 100
  console.log('speed : ', speed, 'MB/s')
  return { speed }
}
|
||||
@@ -659,7 +659,7 @@ class RemoteAdapter {
|
||||
return path
|
||||
}
|
||||
|
||||
async writeVhd(path, input, { checksum = true, validator = noop, writeBlockConcurrency } = {}) {
|
||||
async writeVhd(path, input, { checksum = true, validator = noop, writeBlockConcurrency, nbdClient } = {}) {
|
||||
const handler = this._handler
|
||||
|
||||
if (this.#useVhdDirectory()) {
|
||||
@@ -671,6 +671,7 @@ class RemoteAdapter {
|
||||
await input.task
|
||||
return validator.apply(this, arguments)
|
||||
},
|
||||
nbdClient,
|
||||
})
|
||||
await VhdAbstract.createAlias(handler, path, dataPath)
|
||||
} else {
|
||||
|
||||
@@ -19,7 +19,7 @@ const { AbstractDeltaWriter } = require('./_AbstractDeltaWriter.js')
|
||||
const { checkVhd } = require('./_checkVhd.js')
|
||||
const { packUuid } = require('./_packUuid.js')
|
||||
const { Disposable } = require('promise-toolbox')
|
||||
|
||||
const NbdClient = require('nbd-client')
|
||||
const { warn } = createLogger('xo:backups:DeltaBackupWriter')
|
||||
|
||||
exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(AbstractDeltaWriter) {
|
||||
@@ -199,12 +199,24 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
|
||||
await checkVhd(handler, parentPath)
|
||||
}
|
||||
|
||||
// get nbd if possible
|
||||
const vdiRef = vm.$xapi.getObject(vdi.uuid).$ref
|
||||
let nbdClient
|
||||
try {
|
||||
const [nbdInfo] = await vm.$xapi.call('VDI.get_nbd_info', vdiRef)
|
||||
nbdClient = new NbdClient(nbdInfo)
|
||||
await nbdClient.connect()
|
||||
} catch (e) {
|
||||
console.log('NBD error', e)
|
||||
}
|
||||
|
||||
await adapter.writeVhd(path, deltaExport.streams[`${id}.vhd`], {
|
||||
// no checksum for VHDs, because they will be invalidated by
|
||||
// merges and chainings
|
||||
checksum: false,
|
||||
validator: tmpPath => checkVhd(handler, tmpPath),
|
||||
writeBlockConcurrency: this._backup.config.writeBlockConcurrency,
|
||||
nbdClient,
|
||||
})
|
||||
|
||||
if (isDelta) {
|
||||
|
||||
@@ -4,12 +4,12 @@ FROM ubuntu:xenial
|
||||
# https://qastack.fr/programming/25899912/how-to-install-nvm-in-docker
|
||||
|
||||
RUN apt-get update
|
||||
RUN apt-get install -y curl qemu-utils blktap-utils vmdk-stream-converter git libxml2-utils
|
||||
RUN apt-get install -y curl qemu-utils blktap-utils vmdk-stream-converter git libxml2-utils nbdkit g++
|
||||
ENV NVM_DIR /usr/local/nvm
|
||||
RUN mkdir -p /usr/local/nvm
|
||||
RUN cd /usr/local/nvm
|
||||
RUN curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.0/install.sh | bash
|
||||
ENV NODE_VERSION v17.0.1
|
||||
ENV NODE_VERSION v16.10.0
|
||||
RUN /bin/bash -c "source $NVM_DIR/nvm.sh && nvm install $NODE_VERSION && nvm use --delete-prefix $NODE_VERSION"
|
||||
|
||||
ENV NODE_PATH $NVM_DIR/versions/node/$NODE_VERSION/lib/node_modules
|
||||
|
||||
@@ -8,10 +8,10 @@ const { asyncEach } = require('@vates/async-each')
|
||||
|
||||
const { warn } = createLogger('vhd-lib:createVhdDirectoryFromStream')
|
||||
|
||||
const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression }) {
|
||||
const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression, nbdClient }) {
|
||||
const vhd = yield VhdDirectory.create(handler, path, { compression })
|
||||
await asyncEach(
|
||||
parseVhdStream(inputStream),
|
||||
parseVhdStream(inputStream, nbdClient),
|
||||
async function (item) {
|
||||
switch (item.type) {
|
||||
case 'footer':
|
||||
@@ -44,10 +44,10 @@ exports.createVhdDirectoryFromStream = async function createVhdDirectoryFromStre
|
||||
handler,
|
||||
path,
|
||||
inputStream,
|
||||
{ validator, concurrency = 16, compression } = {}
|
||||
{ validator, concurrency = 16, compression, nbdClient } = {}
|
||||
) {
|
||||
try {
|
||||
await buildVhd(handler, path, inputStream, { concurrency, compression })
|
||||
await buildVhd(handler, path, inputStream, { concurrency, compression, nbdClient })
|
||||
if (validator !== undefined) {
|
||||
await validator.call(this, path)
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ const { BLOCK_UNUSED, FOOTER_SIZE, HEADER_SIZE, SECTOR_SIZE } = require('./_cons
|
||||
const { readChunk } = require('@vates/read-chunk')
|
||||
const assert = require('assert')
|
||||
const { unpackFooter, unpackHeader, computeFullBlockSize } = require('./Vhd/_utils')
|
||||
|
||||
/*
|
||||
const cappedBufferConcat = (buffers, maxSize) => {
|
||||
let buffer = Buffer.concat(buffers)
|
||||
if (buffer.length > maxSize) {
|
||||
@@ -13,114 +13,6 @@ const cappedBufferConcat = (buffers, maxSize) => {
|
||||
return buffer
|
||||
}
|
||||
|
||||
exports.parseVhdStream = async function* parseVhdStream(stream) {
|
||||
let bytesRead = 0
|
||||
|
||||
// handle empty space between elements
|
||||
// ensure we read stream in order
|
||||
async function read(offset, size) {
|
||||
assert(bytesRead <= offset, `offset is ${offset} but we already read ${bytesRead} bytes`)
|
||||
if (bytesRead < offset) {
|
||||
// empty spaces
|
||||
await read(bytesRead, offset - bytesRead)
|
||||
}
|
||||
const buf = await readChunk(stream, size)
|
||||
assert.strictEqual(buf.length, size, `read ${buf.length} instead of ${size}`)
|
||||
bytesRead += size
|
||||
return buf
|
||||
}
|
||||
|
||||
const bufFooter = await read(0, FOOTER_SIZE)
|
||||
|
||||
const footer = unpackFooter(bufFooter)
|
||||
yield { type: 'footer', footer, offset: 0 }
|
||||
|
||||
const bufHeader = await read(FOOTER_SIZE, HEADER_SIZE)
|
||||
const header = unpackHeader(bufHeader, footer)
|
||||
|
||||
yield { type: 'header', header, offset: SECTOR_SIZE }
|
||||
const blockSize = header.blockSize
|
||||
assert.strictEqual(blockSize % SECTOR_SIZE, 0)
|
||||
|
||||
const fullBlockSize = computeFullBlockSize(blockSize)
|
||||
|
||||
const bitmapSize = fullBlockSize - blockSize
|
||||
|
||||
const index = []
|
||||
|
||||
for (const parentLocatorId in header.parentLocatorEntry) {
|
||||
const parentLocatorEntry = header.parentLocatorEntry[parentLocatorId]
|
||||
// empty parent locator entry, does not exist in the content
|
||||
if (parentLocatorEntry.platformDataSpace === 0) {
|
||||
continue
|
||||
}
|
||||
index.push({
|
||||
...parentLocatorEntry,
|
||||
type: 'parentLocator',
|
||||
offset: parentLocatorEntry.platformDataOffset,
|
||||
size: parentLocatorEntry.platformDataLength,
|
||||
id: parentLocatorId,
|
||||
})
|
||||
}
|
||||
|
||||
const batOffset = header.tableOffset
|
||||
const batSize = Math.max(1, Math.ceil((header.maxTableEntries * 4) / SECTOR_SIZE)) * SECTOR_SIZE
|
||||
|
||||
index.push({
|
||||
type: 'bat',
|
||||
offset: batOffset,
|
||||
size: batSize,
|
||||
})
|
||||
|
||||
// sometimes some parent locator are before the BAT
|
||||
index.sort((a, b) => a.offset - b.offset)
|
||||
while (index.length > 0) {
|
||||
const item = index.shift()
|
||||
const buffer = await read(item.offset, item.size)
|
||||
item.buffer = buffer
|
||||
|
||||
const { type } = item
|
||||
if (type === 'bat') {
|
||||
// found the BAT : read it and add block to index
|
||||
|
||||
let blockCount = 0
|
||||
for (let blockCounter = 0; blockCounter < header.maxTableEntries; blockCounter++) {
|
||||
const batEntrySector = buffer.readUInt32BE(blockCounter * 4)
|
||||
// unallocated block, no need to export it
|
||||
if (batEntrySector !== BLOCK_UNUSED) {
|
||||
const batEntryBytes = batEntrySector * SECTOR_SIZE
|
||||
// ensure the block is not before the bat
|
||||
assert.ok(batEntryBytes >= batOffset + batSize)
|
||||
index.push({
|
||||
type: 'block',
|
||||
id: blockCounter,
|
||||
offset: batEntryBytes,
|
||||
size: fullBlockSize,
|
||||
})
|
||||
blockCount++
|
||||
}
|
||||
}
|
||||
// sort again index to ensure block and parent locator are in the right order
|
||||
index.sort((a, b) => a.offset - b.offset)
|
||||
item.blockCount = blockCount
|
||||
} else if (type === 'block') {
|
||||
item.bitmap = buffer.slice(0, bitmapSize)
|
||||
item.data = buffer.slice(bitmapSize)
|
||||
}
|
||||
|
||||
yield item
|
||||
}
|
||||
|
||||
/**
|
||||
* the second footer is at filesize - 512 , there can be empty spaces between last block
|
||||
* and the start of the footer
|
||||
*
|
||||
* we read till the end of the stream, and use the last 512 bytes as the footer
|
||||
*/
|
||||
const bufFooterEnd = await readLastSector(stream)
|
||||
assert(bufFooter.equals(bufFooterEnd), 'footer1 !== footer2')
|
||||
}
|
||||
|
||||
function readLastSector(stream) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let bufFooterEnd = Buffer.alloc(0)
|
||||
@@ -133,4 +25,227 @@ function readLastSector(stream) {
|
||||
stream.on('end', () => resolve(bufFooterEnd))
|
||||
stream.on('error', reject)
|
||||
})
|
||||
} */
|
||||
|
||||
/**
 * Sequential VHD stream parser.
 *
 * Usage: iterate `parse()`; it first yields the metadata items (footer, header,
 * any parent locators located before the BAT, then the BAT itself), then yields
 * every allocated data block in stream order.
 *
 * The parser is strictly forward-only: `_read` asserts that requested offsets
 * never go backwards and skips over the gaps between items.
 */
class StreamParser {
  // number of bytes of bitmap prefixing each full block (fullBlockSize - blockSize)
  _bitmapSize = 0
  // total bytes consumed from the stream so far; used to skip gaps and enforce ordering
  _bytesRead = 0
  _stream = null
  // items (parent locators, BAT, blocks) still to be read, kept sorted by offset
  _index = []
  constructor(stream) {
    this._stream = stream
  }

  /**
   * Read exactly `size` bytes located at absolute `offset` in the stream.
   *
   * Throws (assert) if `offset` is before the current read position — the
   * stream can only be consumed forward.
   */
  async _read(offset, size) {
    assert(this._bytesRead <= offset, `offset is ${offset} but we already read ${this._bytesRead} bytes`)
    if (this._bytesRead < offset) {
      // empty spaces
      await this._read(this._bytesRead, offset - this._bytesRead)
    }
    const buf = await readChunk(this._stream, size)
    assert.strictEqual(buf.length, size, `read ${buf.length} instead of ${size}`)
    this._bytesRead += size
    return buf
  }

  /**
   * Yield the VHD metadata: footer, header, then every pre-BAT item up to and
   * including the BAT. As a side effect, populates `_index` with the allocated
   * blocks (and any post-BAT parent locators) for `blocks()` to consume,
   * and sets `_bitmapSize`.
   */
  async *headers() {
    const bufFooter = await this._read(0, FOOTER_SIZE)

    const footer = unpackFooter(bufFooter)

    yield { type: 'footer', footer, offset: 0 }
    const bufHeader = await this._read(FOOTER_SIZE, HEADER_SIZE)
    const header = unpackHeader(bufHeader, footer)

    yield { type: 'header', header, offset: SECTOR_SIZE }
    const blockSize = header.blockSize
    assert.strictEqual(blockSize % SECTOR_SIZE, 0)
    const fullBlockSize = computeFullBlockSize(blockSize)
    this._bitmapSize = fullBlockSize - blockSize

    let batFound = false

    for (const parentLocatorId in header.parentLocatorEntry) {
      const parentLocatorEntry = header.parentLocatorEntry[parentLocatorId]
      // empty parent locator entry, does not exist in the content
      if (parentLocatorEntry.platformDataSpace === 0) {
        continue
      }
      this._index.push({
        ...parentLocatorEntry,
        type: 'parentLocator',
        offset: parentLocatorEntry.platformDataOffset,
        size: parentLocatorEntry.platformDataLength,
        id: parentLocatorId,
      })
    }

    const batOffset = header.tableOffset
    // BAT size rounded up to a whole number of sectors (4 bytes per entry)
    const batSize = Math.max(1, Math.ceil((header.maxTableEntries * 4) / SECTOR_SIZE)) * SECTOR_SIZE

    this._index.push({
      type: 'bat',
      offset: batOffset,
      size: batSize,
    })

    // sometimes some parent locator are before the BAT
    this._index.sort((a, b) => a.offset - b.offset)

    while (!batFound) {
      const item = this._index.shift()
      const buffer = await this._read(item.offset, item.size)
      item.buffer = buffer

      const { type } = item
      if (type === 'bat') {
        // found the BAT : read it and add block to index

        let blockCount = 0
        for (let blockCounter = 0; blockCounter < header.maxTableEntries; blockCounter++) {
          const batEntrySector = buffer.readUInt32BE(blockCounter * 4)
          // unallocated block, no need to export it
          if (batEntrySector !== BLOCK_UNUSED) {
            const batEntryBytes = batEntrySector * SECTOR_SIZE
            // ensure the block is not before the bat
            assert.ok(batEntryBytes >= batOffset + batSize)
            this._index.push({
              type: 'block',
              id: blockCounter,
              offset: batEntryBytes,
              size: fullBlockSize,
            })
            blockCount++
          }
        }
        // sort again index to ensure block and parent locator are in the right order
        this._index.sort((a, b) => a.offset - b.offset)
        item.blockCount = blockCount
        batFound = true
      }
      yield item
    }
  }

  /**
   * Yield the remaining items of `_index` (mostly allocated blocks), each
   * enriched with `buffer`, `bitmap` and `data`.
   *
   * NOTE(review): the bitmap/data split is applied to every remaining item,
   * including any parent locator located after the BAT — consumers appear to
   * switch on `item.type`, so the extra fields look harmless; confirm.
   */
  async *blocks() {
    while (this._index.length > 0) {
      const item = this._index.shift()
      const buffer = await this._read(item.offset, item.size)

      item.bitmap = buffer.slice(0, this._bitmapSize)
      item.data = buffer.slice(this._bitmapSize)
      item.buffer = buffer
      yield item
    }
  }

  /** Full parse: metadata first, then data blocks. */
  async *parse() {
    yield* this.headers()
    yield* this.blocks()

    /**
     * the second footer is at filesize - 512 , there can be empty spaces between last block
     * and the start of the footer
     *
     * we read till the end of the stream, and use the last 512 bytes as the footer
     */
    // const bufFooterEnd = await readLastSector(this._stream)
    // assert(bufFooter.equals(bufFooterEnd), 'footer1 !== footer2')
  }
}
|
||||
|
||||
// hybrid mode : read the headers from the vhd stream, and read the blocks from nbd
class StreamNbdParser extends StreamParser {
  #nbdClient = null
  // NOTE(review): currently unused — kept because it is part of the constructor options
  #generateBlockSequentially = true
  // number of NBD block reads kept in flight at once
  #concurrency = 16

  constructor(stream, nbdClient, { generateBlockSequentially = true } = {}) {
    super(stream)
    this.#nbdClient = nbdClient
    this.#generateBlockSequentially = generateBlockSequentially
  }

  /**
   * Fetch the raw data of one VHD block through NBD and rebuild a block item
   * (synthetic all-dirty bitmap + data + concatenated buffer).
   *
   * @param {object} item - block descriptor from the BAT ({ id, size, … })
   * @returns {Promise<object>} item enriched with `bitmap`, `data`, `buffer`
   */
  async readBlockData(item) {
    // synthetic bitmap: every sector marked present
    const SECTOR_BITMAP = Buffer.alloc(512, 255)
    const client = this.#nbdClient
    // we read in a raw file, so the block position is id x length, and have nothing to do with the offset
    // in the vhd stream
    const rawDataLength = item.size - SECTOR_BITMAP.length
    let data = await client.readBlock(item.id, rawDataLength)

    // end of file , non aligned vhd block
    if (data.length < rawDataLength) {
      data = Buffer.concat([data, Buffer.alloc(rawDataLength - data.length)])
    }
    const buffer = Buffer.concat([SECTOR_BITMAP, data])
    const block = {
      ...item,
      size: rawDataLength,
      bitmap: SECTOR_BITMAP,
      data,
      buffer,
    }
    return block
  }

  /**
   * Yield the data blocks read through NBD, up to `#concurrency` reads in
   * flight; results are yielded in completion order, not index order.
   */
  async *blocks() {
    const index = this._index

    const promiseSlots = []

    // parallel yielding
    // FIX: must be an arrow function — the original plain `function` was called
    // as `next(i)`, so `this` was undefined in strict (module) code and
    // `this.readBlockData` threw a TypeError on the first block
    const next = position => {
      const item = index.shift()
      if (item) {
        // update this slot when the read ends
        promiseSlots[position] = this.readBlockData(item)
          .then(result => {
            return { result, position }
          })
          .catch(error => {
            console.error(error)
            throw error
          })
      } else {
        // no more block to handle
        promiseSlots[position] = undefined
      }
    }

    for (let i = 0; i < this.#concurrency; i++) {
      next(i)
    }

    let runningPromises = []
    while (true) {
      runningPromises = promiseSlots.filter(p => p !== undefined)
      if (runningPromises.length === 0) {
        // done
        break
      }
      // the failing promises will only be seen when ALL the running promises
      // fails
      const { result, position } = await Promise.any(runningPromises)
      next(position)
      yield result
    }
  }

  /** Full hybrid parse: metadata from the stream, blocks from NBD, then drop the stream. */
  async *parse() {
    yield* this.headers()

    yield* this.blocks()
    // the stream is no longer needed once the metadata has been read
    this._stream.destroy()
  }
}
|
||||
|
||||
exports.parseVhdStream = async function* parseVhdStream(stream, nbdClient) {
|
||||
let parser
|
||||
if (nbdClient) {
|
||||
parser = new StreamNbdParser(stream, nbdClient)
|
||||
} else {
|
||||
parser = new StreamParser(stream)
|
||||
}
|
||||
yield* parser.parse()
|
||||
}
|
||||
|
||||
@@ -36,6 +36,7 @@
|
||||
"@vates/disposable": "^0.1.1",
|
||||
"@vates/event-listeners-manager": "^1.0.1",
|
||||
"@vates/multi-key-map": "^0.1.0",
|
||||
"@vates/nbd-client": "^0.0.1",
|
||||
"@vates/parse-duration": "^0.1.1",
|
||||
"@vates/predicates": "^1.0.0",
|
||||
"@vates/read-chunk": "^1.0.0",
|
||||
|
||||
@@ -32,7 +32,7 @@ const gitDiff = (what, args = []) =>
|
||||
.split('\n')
|
||||
.filter(_ => _ !== '')
|
||||
const gitDiffFiles = (files = []) => gitDiff('files', files)
|
||||
const gitDiffIndex = () => gitDiff('index', ['--cached', 'HEAD'])
|
||||
const gitDiffIndex = () => gitDiff('index', ['--cached', 'HEAD~1'])
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
@@ -17528,7 +17528,7 @@ tap-yaml@^1.0.0:
|
||||
dependencies:
|
||||
yaml "^1.5.0"
|
||||
|
||||
tap@^16.0.1, tap@^16.1.0, tap@^16.2.0:
|
||||
tap@^16.0.1, tap@^16.1.0, tap@^16.2.0, tap@^16.3.0:
|
||||
version "16.3.0"
|
||||
resolved "https://registry.yarnpkg.com/tap/-/tap-16.3.0.tgz#8323fc66990951b52063a01dadffa0eaf3c55e96"
|
||||
integrity sha512-J9GffPUAbX6FnWbQ/jj7ktzd9nnDFP1fH44OzidqOmxUfZ1hPLMOvpS99LnDiP0H2mO8GY3kGN5XoY0xIKbNFA==
|
||||
|
||||
Reference in New Issue
Block a user