Compare commits

...

10 Commits

Author SHA1 Message Date
Florent BEAUCHAMP
f134fd33bf feat(@vates/nbd-client): export a stream from a remote nbd export 2023-08-02 09:13:17 +02:00
Florent BEAUCHAMP
28794fa820 feat(@vates/nbd-client): make defautl iterator chunk size a parameter 2023-08-02 09:09:42 +02:00
Florent BEAUCHAMP
66847f04b4 fix(@vates/nbd-client): group socket reading 2023-08-02 09:08:36 +02:00
Florent BEAUCHAMP
1dbfc6d0a2 fix(nbd-client) better handling of last unaligned block 2023-08-02 09:07:39 +02:00
Florent BEAUCHAMP
12c2083651 fix: code cleanup 2023-07-27 11:05:36 +02:00
Florent BEAUCHAMP
b1ed98a8fd wip 2023-07-26 16:59:34 +02:00
Florent BEAUCHAMP
f495e7110d dumb implementation 2023-07-26 07:03:06 +02:00
Florent BEAUCHAMP
e9b92780b9 fix nraynaud 2023-07-25 09:50:40 +02:00
Florent BEAUCHAMP
d6ac1c2598 fixes 2023-07-25 09:49:31 +02:00
Florent BEAUCHAMP
7e1dd7c26f feat(vmware-explorer): handle sesparse files 2023-07-25 09:49:31 +02:00
4 changed files with 204 additions and 162 deletions

View File

@@ -21,6 +21,7 @@ import {
OPTS_MAGIC,
NBD_CMD_DISC,
} from './constants.mjs'
import { Readable } from 'node:stream'
const { warn } = createLogger('vates:nbd-client')
@@ -232,19 +233,20 @@ export default class NbdClient {
}
try {
this.#waitingForResponse = true
const magic = await this.#readInt32()
const buffer = await this.#read(16)
const magic = buffer.readInt32BE(0)
if (magic !== NBD_REPLY_MAGIC) {
throw new Error(`magic number for block answer is wrong : ${magic} ${NBD_REPLY_MAGIC}`)
}
const error = await this.#readInt32()
const error = buffer.readInt32BE(4)
if (error !== 0) {
// @todo use error code from constants.mjs
throw new Error(`GOT ERROR CODE : ${error}`)
}
const blockQueryId = await this.#readInt64()
const blockQueryId = buffer.readBigUInt64BE(8)
const query = this.#commandQueryBacklog.get(blockQueryId)
if (!query) {
throw new Error(` no query associated with id ${blockQueryId}`)
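For reference, the single 16-byte read above covers the whole NBD simple reply header; a minimal sketch of that layout, assuming a Buffer that already holds exactly one reply (parseSimpleReply is an illustrative helper, not part of the client):

```js
// NBD simple reply, big-endian, 16 bytes total:
//   bytes 0-3   magic  (0x67446698)
//   bytes 4-7   error  (0 on success)
//   bytes 8-15  handle (the 64-bit query id sent with the request)
function parseSimpleReply(buffer) {
  return {
    magic: buffer.readUInt32BE(0),
    error: buffer.readUInt32BE(4),
    handle: buffer.readBigUInt64BE(8),
  }
}
```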
@@ -281,7 +283,13 @@ export default class NbdClient {
buffer.writeInt16BE(NBD_CMD_READ, 6) // we want to read a data block
buffer.writeBigUInt64BE(queryId, 8)
// byte offset in the raw disk
buffer.writeBigUInt64BE(BigInt(index) * BigInt(size), 16)
const offset = BigInt(index) * BigInt(size)
const remaining = this.#exportSize - offset
if (remaining < BigInt(size)) {
size = Number(remaining)
}
buffer.writeBigUInt64BE(offset, 16)
buffer.writeInt32BE(size, 24)
return new Promise((resolve, reject) => {
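Worked numbers for the clamping above, with an illustrative export size that is not a multiple of the block size:

```js
// illustrative only: a 5 MiB + 100 byte export read in 2 MiB blocks
const exportSize = 5n * 1024n * 1024n + 100n
const blockSize = 2 * 1024 * 1024
for (let index = 0; BigInt(index) * BigInt(blockSize) < exportSize; index++) {
  const offset = BigInt(index) * BigInt(blockSize)
  const remaining = exportSize - offset
  const size = remaining < BigInt(blockSize) ? Number(remaining) : blockSize
  console.log(index, size) // 0 2097152, 1 2097152, 2 1048676
}
```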
@@ -307,14 +315,15 @@ export default class NbdClient {
})
}
async *readBlocks(indexGenerator) {
async *readBlocks(indexGenerator = 2 * 1024 * 1024) {
// default : read all blocks
if (indexGenerator === undefined) {
const exportSize = this.#exportSize
const chunkSize = 2 * 1024 * 1024
if (typeof indexGenerator === 'number') {
const exportSize = Number(this.#exportSize)
const chunkSize = indexGenerator
indexGenerator = function* () {
const nbBlocks = Math.ceil(Number(exportSize / BigInt(chunkSize)))
for (let index = 0; BigInt(index) < nbBlocks; index++) {
const nbBlocks = Math.ceil(exportSize / chunkSize)
for (let index = 0; index < nbBlocks; index++) {
yield { index, size: chunkSize }
}
}
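A hedged usage sketch of the new numeric form (client stands for an already connected NbdClient, which is not shown in this diff); each yielded value resolves to the data of one block:

```js
// read the whole export in 1 MiB chunks; readBlocks builds the
// { index, size } generator itself when given a number
for await (const block of client.readBlocks(1024 * 1024)) {
  // block is at most 1 MiB, the last one may be shorter
  console.log('got', block.length, 'bytes')
}
```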
@@ -348,4 +357,15 @@ export default class NbdClient {
yield readAhead.shift()
}
}
stream(chunk_size) {
const self = this
async function* iterator() {
for await (const chunk of self.readBlocks(chunk_size)) {
yield chunk
}
}
// create a readable stream instead of returning the iterator
// since iterators don't like unshift and partial reading
return Readable.from(iterator())
}
}
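A usage sketch for the new stream() export, assuming an already connected client; pipeline and createWriteStream are the standard Node.js helpers:

```js
import { createWriteStream } from 'node:fs'
import { pipeline } from 'node:stream/promises'

// dump the whole NBD export to a local raw file, reading 2 MiB at a time
await pipeline(client.stream(2 * 1024 * 1024), createWriteStream('/tmp/export.raw'))
```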

View File

@@ -1,18 +1,54 @@
import _computeGeometryForSize from 'vhd-lib/_computeGeometryForSize.js'
import { createFooter, createHeader } from 'vhd-lib/_createFooterHeader.js'
import { FOOTER_SIZE } from 'vhd-lib/_constants.js'
import { DISK_TYPES, FOOTER_SIZE } from 'vhd-lib/_constants.js'
import { notEqual, strictEqual } from 'node:assert'
import { unpackFooter, unpackHeader } from 'vhd-lib/Vhd/_utils.js'
import { VhdAbstract } from 'vhd-lib'
// from https://github.com/qemu/qemu/commit/98eb9733f4cf2eeab6d12db7e758665d2fd5367b#
// one big difference with the other versions of VMDK is that the grain tables are actually sparse, they are pre-allocated but not used in grain order,
// so we have to read the grain directory to know where to find the grain tables
function readInt64(buffer, index) {
const n = buffer.readBigInt64LE(index * 8 /* size of an int64 in bytes */)
if (n > Number.MAX_SAFE_INTEGER) {
const SE_SPARSE_DIR_NON_ALLOCATED = 0
const SE_SPARSE_DIR_ALLOCATED = 1
const SE_SPARSE_GRAIN_NON_ALLOCATED = 0 // check in parent
const SE_SPARSE_GRAIN_UNMAPPED = 1 // grain has been unmapped, but index of previous grain still readable for reclamation
const SE_SPARSE_GRAIN_ZERO = 2
const SE_SPARSE_GRAIN_ALLOCATED = 3
const VHD_BLOCK_SIZE_BYTES = 2 * 1024 * 1024
const GRAIN_SIZE_BYTES = 4 * 1024
const GRAIN_TABLE_COUNT = 4 * 1024
const ones = n => (1n << BigInt(n)) - 1n
function asNumber(n) {
if (n > Number.MAX_SAFE_INTEGER)
throw new Error(`can't handle ${n} ${Number.MAX_SAFE_INTEGER} ${n & 0x00000000ffffffffn}`)
}
return +n
return Number(n)
}
const readInt64 = (buffer, index) => asNumber(buffer.readBigInt64LE(index * 8))
/**
* @returns {{topNibble: number, low60: bigint}} topNibble is the first 4 bits of the 64-bit entry, low60 is the remaining 60 bits
*/
function readTaggedEntry(buffer, index) {
const entry = buffer.readBigInt64LE(index * 8)
return { topNibble: Number(entry >> 60n), low60: entry & ones(60) }
}
function readSeSparseDir(buffer, index) {
const { topNibble, low60 } = readTaggedEntry(buffer, index)
return { type: topNibble, tableIndex: asNumber(low60) }
}
function readSeSparseTable(buffer, index) {
const { topNibble, low60 } = readTaggedEntry(buffer, index)
// https://lists.gnu.org/archive/html/qemu-block/2019-06/msg00934.html
const topIndexPart = low60 >> 48n // bring the top 12 bits down
const bottomIndexPart = (low60 & ones(48)) << 12n // bring the bottom 48 bits up
return { type: topNibble, grainIndex: asNumber(bottomIndexPart | topIndexPart) }
}
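Each 64-bit sesparse entry packs a 4-bit type in the top nibble and a 60-bit payload below it; for grain table entries the payload additionally stores the grain index with its bottom 48 bits shifted up and its top 12 bits brought down. A small round-trip sketch with arbitrary sample values:

```js
// sample values: type 3 (SE_SPARSE_GRAIN_ALLOCATED), grain index 0x12345
const ones = n => (1n << BigInt(n)) - 1n
const type = 3n
const grainIndex = 0x12345n

// pack: low 12 bits of the index go to bits 48-59, upper 48 bits to bits 0-47
const low60 = ((grainIndex & ones(12)) << 48n) | (grainIndex >> 12n)
const entry = (type << 60n) | low60

// unpack, mirroring readSeSparseTable above
const topNibble = Number(entry >> 60n)
const payload = entry & ones(60)
const decoded = ((payload & ones(48)) << 12n) | (payload >> 48n)
console.log(topNibble, decoded.toString(16)) // → 3 12345
```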
export default class VhdEsxiSeSparse extends VhdAbstract {
@@ -25,27 +61,22 @@ export default class VhdEsxiSeSparse extends VhdAbstract {
#header
#footer
#grainDirectory
// as we will read all grains with data, we load everything in memory
// in theory, that can be 512MB of data for a 2TB fully allocated disk
// but our use case is to transfer a relatively small diff
// and random access is expensive in HTTP, and migration is a one time cost
// so let's go with the naive approach, and future me will have to handle a more
// clever approach if necessary
// grain at zero won't be stored
#grainIndex // Map blockId => []
#grainMap = new Map()
#grainSize
#grainTableSize
#grainTableOffset
#grainOffset
#grainDirOffsetBytes
#grainDirSizeBytes
#grainTableOffsetBytes
#grainOffsetBytes
static async open(esxi, datastore, path, parentVhd, opts) {
const vhd = new VhdEsxiSeSparse(esxi, datastore, path, parentVhd, opts)
await vhd.readHeaderAndFooter()
return vhd
}
get path() {
return this.#path
}
constructor(esxi, datastore, path, parentVhd, { lookMissingBlockInParent = true } = {}) {
super()
this.#esxi = esxi
@@ -63,156 +94,149 @@ export default class VhdEsxiSeSparse extends VhdAbstract {
return this.#footer
}
async #readGrain(start, length = 4 * 1024) {
return (await this.#esxi.download(this.#datastore, this.#path, `${start}-${start + length - 1}`)).buffer()
}
containsBlock(blockId) {
notEqual(this.#grainDirectory, undefined, "bat must be loaded to use containsBlock")
// a grain table is 4096 entries of 4KB
// a grain table covers 8 vhd blocks
// grain tables always exist in sesparse
// depending on the parameters we also look into the parent data
notEqual(this.#grainIndex, undefined, "bat must be loaded to use containsBlock")
return (
this.#grainDirectory.readInt32LE(blockId * 4) !== 0 ||
this.#grainIndex.get(blockId) !== undefined ||
(this.#lookMissingBlockInParent && this.#parentVhd.containsBlock(blockId))
)
}
async #read(start, end) {
return (await this.#esxi.download(this.#datastore, this.#path, `${start}-${end}`)).buffer()
async #read(start, length) {
const buffer = await (
await this.#esxi.download(this.#datastore, this.#path, `${start}-${start + length - 1}`)
).buffer()
strictEqual(buffer.length, length)
return buffer
}
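The - 1 is there because HTTP Range requests are inclusive on both ends: asking for length bytes starting at start means requesting bytes start through start + length - 1. A rough equivalent with a plain fetch (the URL is illustrative; the real esxi.download helper handles authentication and datastore paths):

```js
const start = 2048
const length = 4096
// requests bytes 2048-6143 inclusive, i.e. exactly 4096 bytes
const res = await fetch('https://esxi.example/folder/vm/disk-sesparse.vmdk?dsName=datastore1', {
  headers: { range: `bytes=${start}-${start + length - 1}` },
})
const buffer = Buffer.from(await res.arrayBuffer())
if (buffer.length !== length) throw new Error(`expected ${length} bytes, got ${buffer.length}`)
```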
async readHeaderAndFooter() {
const buffer = await this.#read(0, 2048)
strictEqual(buffer.readBigInt64LE(0), 0xcafebaben)
const vmdkHeaderBuffer = await this.#read(0, 2048)
strictEqual(readInt64(buffer, 1), 0x200000001) // version 2.1
strictEqual(vmdkHeaderBuffer.readBigInt64LE(0), 0xcafebaben)
strictEqual(readInt64(vmdkHeaderBuffer, 1), 0x200000001) // version 2.1
const capacity = readInt64(buffer, 2)
const grain_size = readInt64(buffer, 3)
this.#grainDirOffsetBytes = readInt64(vmdkHeaderBuffer, 16) * 512
// console.log('grainDirOffsetBytes', this.#grainDirOffsetBytes)
this.#grainDirSizeBytes = readInt64(vmdkHeaderBuffer, 17) * 512
// console.log('grainDirSizeBytes', this.#grainDirSizeBytes)
const grain_tables_offset = readInt64(buffer, 18)
const grain_tables_size = readInt64(buffer, 19)
this.#grainOffset = readInt64(buffer, 24)
const grainSizeSectors = readInt64(vmdkHeaderBuffer, 3)
const grainSizeBytes = grainSizeSectors * 512 // 8 sectors = 4KB default
strictEqual(grainSizeBytes, GRAIN_SIZE_BYTES) // we only support default grain size
this.#grainSize = grain_size * 512 // 8 sectors / 4KB default
this.#grainTableOffset = grain_tables_offset * 512
this.#grainTableSize = grain_tables_size * 512
this.#grainTableOffsetBytes = readInt64(vmdkHeaderBuffer, 18) * 512
// console.log('grainTableOffsetBytes', this.#grainTableOffsetBytes)
const size = capacity * grain_size * 512
this.#header = unpackHeader(createHeader(Math.ceil(size / (4096 * 512))))
const geometry = _computeGeometryForSize(size)
const actualSize = geometry.actualSize
const grainTableCount = (readInt64(vmdkHeaderBuffer, 4) * 512) / 8 // count is the number of 64b entries in each table
// console.log('grainTableCount', grainTableCount)
strictEqual(grainTableCount, GRAIN_TABLE_COUNT) // we only support tables of 4096 entries (default)
this.#grainOffsetBytes = readInt64(vmdkHeaderBuffer, 24) * 512
// console.log('grainOffsetBytes', this.#grainOffsetBytes)
const sizeBytes = readInt64(vmdkHeaderBuffer, 2) * 512
// console.log('sizeBytes', sizeBytes)
const nbBlocks = Math.ceil(sizeBytes / VHD_BLOCK_SIZE_BYTES)
this.#header = unpackHeader(createHeader(nbBlocks))
const geometry = _computeGeometryForSize(sizeBytes)
this.#footer = unpackFooter(
createFooter(actualSize, Math.floor(Date.now() / 1000), geometry, FOOTER_SIZE, this.#parentVhd.footer.diskType)
createFooter(sizeBytes, Math.floor(Date.now() / 1000), geometry, FOOTER_SIZE, DISK_TYPES.DYNAMIC)
)
}
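All of these header fields are stored as counts of 512-byte sectors, hence the * 512 everywhere; the VHD header is then sized from the capacity in 2 MiB blocks. Worked numbers for an assumed 20 GiB delta:

```js
// assumed values, for illustration only
const capacitySectors = 41943040                 // readInt64(vmdkHeaderBuffer, 2) for a 20 GiB disk
const sizeBytes = capacitySectors * 512          // 21474836480 bytes
const VHD_BLOCK_SIZE_BYTES = 2 * 1024 * 1024
const nbBlocks = Math.ceil(sizeBytes / VHD_BLOCK_SIZE_BYTES) // 10240 BAT entries in the resulting VHD
```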
async readBlockAllocationTable() {
const CHUNK_SIZE = 64 * 512
this.#grainIndex = new Map()
strictEqual(this.#grainTableSize % CHUNK_SIZE, 0)
for (let chunkIndex = 0, grainIndex = 0; chunkIndex < this.#grainTableSize / CHUNK_SIZE; chunkIndex++) {
process.stdin.write('.')
const start = chunkIndex * CHUNK_SIZE + this.#grainTableOffset
const end = start + 4096 * 8 - 1
const buffer = await this.#read(start, end)
for (let indexInChunk = 0; indexInChunk < 4096; indexInChunk++) {
const entry = buffer.readBigInt64LE(indexInChunk * 8)
switch (entry) {
case 0n: // not allocated, go to parent
break
case 1n: // unmapped
break
}
if (entry > 3n) {
this.#grainMap.set(grainIndex)
grainIndex++
}
}
}
// read grain directory and the grain tables
const nbBlocks = this.header.maxTableEntries
this.#grainDirectory = await this.#read(2048 /* header length */, 2048 + nbBlocks * 4 - 1)
}
// we're lucky: a grain address can address exactly a full block
async readBlock(blockId) {
notEqual(this.#grainDirectory, undefined, 'grainDirectory is not loaded')
const sectorOffset = this.#grainDirectory.readInt32LE(blockId * 4)
const buffer = (await this.#parentVhd.readBlock(blockId)).buffer
if (sectorOffset === 0) {
strictEqual(this.#lookMissingBlockInParent, true, "shouldn't have empty block in a delta alone")
return {
id: blockId,
bitmap: buffer.slice(0, 512),
data: buffer.slice(512),
buffer,
}
}
const offset = sectorOffset * 512
const graintable = await this.#read(offset, offset + 4096 * 4 /* grain table length */ - 1)
strictEqual(graintable.length, 4096 * 4)
// we have no guarantee that data are ordered or contiguous
// let's construct ranges to limit the number of queries
let rangeStart, offsetStart, offsetEnd
const changeRange = async (index, offset) => {
if (offsetStart !== undefined) {
// if there was a
if (offset === offsetEnd) {
offsetEnd++
return
}
const grains = await this.#read(offsetStart * 512, offsetEnd * 512 - 1)
grains.copy(buffer, (rangeStart + 1) /* block bitmap */ * 512)
}
if (offset) {
// we're at the beginning of a range present in the file
rangeStart = index
offsetStart = offset
offsetEnd = offset + 1
} else {
// we're at the beginning of a range from the parent or empty
rangeStart = undefined
offsetStart = undefined
offsetEnd = undefined
}
}
for (let i = 0; i < graintable.length / 4; i++) {
const grainOffset = graintable.readInt32LE(i * 4)
if (grainOffset === 0) {
await changeRange()
// from parent
const tableSizeBytes = GRAIN_TABLE_COUNT * 8
const grainDirBuffer = await this.#read(this.#grainDirOffsetBytes, this.#grainDirSizeBytes)
// read the grain dir ( first level )
for (let grainDirIndex = 0; grainDirIndex < grainDirBuffer.length / 8; grainDirIndex++) {
const { type: grainDirType, tableIndex } = readSeSparseDir(grainDirBuffer, grainDirIndex)
if (grainDirType === SE_SPARSE_DIR_NON_ALLOCATED) {
// no grain table allocated at all in this grain dir
continue
}
if (grainOffset === 1) {
await changeRange()
// this is an emptied grain, no data, don't look into parent
buffer.fill(0, (i + 1) /* block bitmap */ * 512)
strictEqual(grainDirType, SE_SPARSE_DIR_ALLOCATED)
// read the corresponding grain table ( second level )
const grainTableBuffer = await this.#read(
this.#grainTableOffsetBytes + tableIndex * tableSizeBytes,
tableSizeBytes
)
// offset in bytes if >0, grainType if <=0
let grainOffsets = []
let blockId = grainDirIndex * 8
const addGrain = val => {
grainOffsets.push(val)
// 4096 grains of 4KB per dir entry => 16MB per grain dir entry
// 1 vhd block = 2MB
// 512 grains => 1 block
// 8 blocks per dir entry
if (grainOffsets.length === 512) {
this.#grainIndex.set(blockId, grainOffsets)
grainOffsets = []
blockId++
}
}
if (grainOffset > 1) {
// non empty grain
await changeRange(i, grainOffset)
for (let grainTableIndex = 0; grainTableIndex < grainTableBuffer.length / 8; grainTableIndex++) {
const { type: grainType, grainIndex } = readSeSparseTable(grainTableBuffer, grainTableIndex)
if (grainType === SE_SPARSE_GRAIN_ALLOCATED) {
// this is ok in 32 bits int with VMDK smaller than 2TB
const offsetByte = grainIndex * GRAIN_SIZE_BYTES + this.#grainOffsetBytes
addGrain(offsetByte)
} else {
// multiply by -1 to differentiate type and offset
// no offset can be zero
addGrain(-grainType)
}
}
}
await changeRange()
return {
id: blockId,
bitmap: buffer.slice(0, 512),
data: buffer.slice(512),
buffer,
strictEqual(grainOffsets.length, 0)
}
}
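The slicing into 512-entry arrays follows from the fixed geometry: one grain table has 4096 entries of 4 KiB grains, so it describes 16 MiB of virtual disk, i.e. eight 2 MiB VHD blocks of 512 grains each. As a sketch:

```js
const GRAIN_SIZE_BYTES = 4 * 1024
const GRAIN_TABLE_COUNT = 4 * 1024
const VHD_BLOCK_SIZE_BYTES = 2 * 1024 * 1024

const bytesPerGrainTable = GRAIN_TABLE_COUNT * GRAIN_SIZE_BYTES         // 16 MiB of virtual disk
const grainsPerVhdBlock = VHD_BLOCK_SIZE_BYTES / GRAIN_SIZE_BYTES       // 512 grains per VHD block
const vhdBlocksPerDirEntry = bytesPerGrainTable / VHD_BLOCK_SIZE_BYTES  // 8 blocks per grain dir entry

// so grain table entry t of grain dir entry d lands in
//   blockId = d * vhdBlocksPerDirEntry + Math.floor(t / grainsPerVhdBlock)
// at position (t % grainsPerVhdBlock) * GRAIN_SIZE_BYTES inside that block's data
```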
async readBlock(blockId) {
let changed = false
const parentBlock = await this.#parentVhd.readBlock(blockId)
const parentBuffer = parentBlock.buffer
const grainOffsets = this.#grainIndex.get(blockId) // may be undefined if the block is only present in the parent and lookMissingBlockInParent=true
const EMPTY_GRAIN = Buffer.alloc(GRAIN_SIZE_BYTES, 0)
for (const index in grainOffsets) {
const value = grainOffsets[index]
let data
if (value > 0) {
// it's the offset in byte of a grain type SE_SPARSE_GRAIN_ALLOCATED
data = await this.#read(value, GRAIN_SIZE_BYTES)
} else {
// back to the real grain type
const type = value * -1
switch (type) {
case SE_SPARSE_GRAIN_ZERO:
case SE_SPARSE_GRAIN_UNMAPPED:
data = EMPTY_GRAIN
break
case SE_SPARSE_GRAIN_NON_ALLOCATED:
/* from parent */
break
default:
throw new Error(`can't handle grain type ${type}`)
}
}
if (data) {
changed = true
data.copy(parentBuffer, index * GRAIN_SIZE_BYTES + 512 /* block bitmap */)
}
}
// no need to copy if all the data comes from the parent
return changed
? {
id: blockId,
bitmap: parentBuffer.slice(0, 512),
data: parentBuffer.slice(512),
buffer: parentBuffer,
}
: parentBlock
}
}
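Putting the pieces together, a hedged usage sketch (esxi, the datastore name, the path and parentVhd are placeholders coming from the rest of vmware-explorer):

```js
const vhd = await VhdEsxiSeSparse.open(esxi, 'datastore1', 'vm/disk-000001-sesparse.vmdk', parentVhd, {
  lookMissingBlockInParent: true,
})
// the BAT (grain directory + grain tables) must be loaded before reading blocks
await vhd.readBlockAllocationTable()
if (vhd.containsBlock(0)) {
  const { data } = await vhd.readBlock(0) // 2 MiB of data, merged over the parent block
}
```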

View File

@@ -1,13 +1,10 @@
import VHDEsxiSeSparse from './VhdEsxiSeSparse.mjs'
import VhdEsxiCowd from './VhdEsxiCowd.mjs'
// import VhdEsxiSeSparse from "./VhdEsxiSeSparse.mjs";
export default async function openDeltaVmdkasVhd(esxi, datastore, path, parentVhd, opts) {
let vhd
if (path.endsWith('-sesparse.vmdk')) {
throw new Error(
`sesparse VMDK reading is not functional yet ${path}. For now, this VM can only be migrated if it doesn't have any snapshots and if it is halted.`
)
// vhd = new VhdEsxiSeSparse(esxi, datastore, path, parentVhd, opts)
vhd = new VHDEsxiSeSparse(esxi, datastore, path, parentVhd, opts)
} else {
if (path.endsWith('-delta.vmdk')) {
vhd = new VhdEsxiCowd(esxi, datastore, path, parentVhd, opts)

View File

@@ -8,6 +8,7 @@
> Users must be able to say: “Nice enhancement, I'm eager to test it”
- [Backup/Restore] Button to open the raw log in the REST API (PR [#6936](https://github.com/vatesfr/xen-orchestra/pull/6936))
- [Vmware/Import] Support ESXi 6.5+ with snapshots (PR [#6909](https://github.com/vatesfr/xen-orchestra/pull/6909))
### Bug fixes
@@ -45,7 +46,7 @@
- @vates/read-chunk minor
- complex-matcher patch
- xen-api patch
- xo-server patch
- xo-server minor
- xo-server-transport-xmpp patch
- xo-server-audit patch
- xo-web minor