feat(xo-server/vhd): createReadStream (#2763)
A stream to a synthetic full VHD.
This commit is contained in:
parent
86b0d5e2b7
commit
36d2de049f
68
packages/xo-server/src/ag2s.js
Normal file
68
packages/xo-server/src/ag2s.js
Normal file
@ -0,0 +1,68 @@
|
||||
import { Readable } from 'stream'
|
||||
|
||||
// return the next value of the iterator but if it is a promise, resolve it and
|
||||
// reinject it
|
||||
//
|
||||
// this enables the use of a simple generator instead of an async generator
|
||||
// (which are less widely supported)
|
||||
const next = async (iterator, arg) => {
|
||||
let cursor = iterator.next(arg)
|
||||
if (typeof cursor.then === 'function') {
|
||||
return cursor
|
||||
}
|
||||
let value
|
||||
while (
|
||||
!cursor.done &&
|
||||
(value = cursor.value) != null &&
|
||||
typeof value.then === 'function'
|
||||
) {
|
||||
let success = false
|
||||
try {
|
||||
value = await value
|
||||
success = true
|
||||
} catch (error) {
|
||||
cursor = iterator.throw(error)
|
||||
}
|
||||
if (success) {
|
||||
cursor = iterator.next(value)
|
||||
}
|
||||
}
|
||||
return cursor
|
||||
}
|
||||
|
||||
// Create a readable stream from a generator
|
||||
//
|
||||
// generator can be async or can yield promises to wait for them
|
||||
export const createReadable = (generator, options) => {
|
||||
const readable = new Readable(options)
|
||||
readable._read = size => {
|
||||
const iterator = generator(size)
|
||||
readable._destroy = (error, cb) => {
|
||||
iterator.throw(error)
|
||||
cb(error)
|
||||
}
|
||||
let running = false
|
||||
const read = (readable._read = async size => {
|
||||
if (running) {
|
||||
return
|
||||
}
|
||||
running = true
|
||||
try {
|
||||
let cursor
|
||||
do {
|
||||
cursor = await next(iterator, size)
|
||||
if (cursor.done) {
|
||||
return readable.push(null)
|
||||
}
|
||||
} while (readable.push(cursor.value))
|
||||
} catch (error) {
|
||||
readable.emit('error', error)
|
||||
} finally {
|
||||
running = false
|
||||
}
|
||||
})
|
||||
return read(size)
|
||||
}
|
||||
|
||||
return readable
|
||||
}
|
@ -13,7 +13,7 @@ import pick from 'lodash/pick'
|
||||
import tmp from 'tmp'
|
||||
import xml2js from 'xml2js'
|
||||
import { randomBytes } from 'crypto'
|
||||
import { resolve } from 'path'
|
||||
import { dirname, resolve } from 'path'
|
||||
import { utcFormat, utcParse } from 'd3-time-format'
|
||||
import {
|
||||
all as pAll,
|
||||
@ -319,6 +319,12 @@ export const popProperty = obj => {
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
|
||||
// resolve a relative path from a file
|
||||
export const resolveRelativeFromFile = (file, path) =>
|
||||
resolve('/', dirname(file), path).slice(1)
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
|
||||
// Format a date in ISO 8601 in a safe way to be used in filenames
|
||||
// (even on Windows).
|
||||
export const safeDateFormat = utcFormat('%Y%m%dT%H%M%SZ')
|
||||
|
@ -15,6 +15,8 @@ declare export function asyncMap<K, V1, V2>(
|
||||
|
||||
declare export function getPseudoRandomBytes(n: number): Buffer
|
||||
|
||||
declare export function resolveRelativeFromFile(file: string, path: string): string
|
||||
|
||||
declare export function safeDateFormat(timestamp: number): string
|
||||
|
||||
declare export function serializeError(error: Error): Object
|
||||
|
@ -9,7 +9,8 @@ import { fromEvent } from 'promise-toolbox'
|
||||
|
||||
import type RemoteHandler from './remote-handlers/abstract'
|
||||
import constantStream from './constant-stream'
|
||||
import { noop, streamToBuffer } from './utils'
|
||||
import { createReadable } from './ag2s'
|
||||
import { noop, resolveRelativeFromFile, streamToBuffer } from './utils'
|
||||
|
||||
const VHD_UTIL_DEBUG = 0
|
||||
const debug = VHD_UTIL_DEBUG ? str => console.log(`[vhd-util]${str}`) : noop
|
||||
@ -186,21 +187,24 @@ function checksumStruct (rawStruct, struct) {
|
||||
|
||||
// Format:
|
||||
//
|
||||
// - Footer (512)
|
||||
// - Header (1024)
|
||||
//
|
||||
// - BAT (batSize @ header.tableOffset)
|
||||
// - Block i (@ blockOffset(i))
|
||||
// - bitmap (blockBitmapSize)
|
||||
// - data (header.blockSize)
|
||||
//
|
||||
// - Footer (512)
|
||||
// 1. Footer (512)
|
||||
// 2. Header (1024)
|
||||
// 3. Unordered entries
|
||||
// - BAT (batSize @ header.tableOffset)
|
||||
// - Blocks (@ blockOffset(i))
|
||||
// - bitmap (blockBitmapSize)
|
||||
// - data (header.blockSize)
|
||||
// - Parent locators (parentLocatorSize(i) @ parentLocatorOffset(i))
|
||||
// 4. Footer (512 @ vhdSize - 512)
|
||||
//
|
||||
// Variables:
|
||||
//
|
||||
// - batSize = min(1, ceil(header.maxTableEntries * 4)) * sectorSize
|
||||
// - batSize = min(1, ceil(header.maxTableEntries * 4 / sectorSize)) * sectorSize
|
||||
// - blockBitmapSize = ceil(header.blockSize / sectorSize / 8 / sectorSize) * sectorSize
|
||||
// - blockOffset(i) = bat[i] * sectorSize
|
||||
// - nBlocks = ceil(footer.currentSize / header.blockSize)
|
||||
// - parentLocatorOffset(i) = header.parentLocatorEntry[i].platformDataOffset
|
||||
// - parentLocatorSize(i) = header.parentLocatorEntry[i].platformDataSpace * sectorSize
|
||||
// - sectorSize = 512
|
||||
export class Vhd {
|
||||
constructor (handler, path) {
|
||||
@ -223,6 +227,10 @@ export class Vhd {
|
||||
return this._readStream(start, n).then(streamToBuffer)
|
||||
}
|
||||
|
||||
containsBlock (id) {
|
||||
return this._getBatEntry(id) !== BLOCK_UNUSED
|
||||
}
|
||||
|
||||
// Returns the first address after metadata. (In bytes)
|
||||
getEndOfHeaders () {
|
||||
const { header } = this
|
||||
@ -703,7 +711,7 @@ export default concurrency(2)(async function vhdMerge (
|
||||
blockId < childVhd.header.maxTableEntries;
|
||||
blockId++
|
||||
) {
|
||||
if (childVhd._getBatEntry(blockId) !== BLOCK_UNUSED) {
|
||||
if (childVhd.containsBlock(blockId)) {
|
||||
mergedDataSize += await parentVhd.coalesceBlock(childVhd, blockId)
|
||||
}
|
||||
}
|
||||
@ -761,6 +769,139 @@ export async function chainVhd (
|
||||
return false
|
||||
}
|
||||
|
||||
export const createReadStream = (handler, path) =>
|
||||
createReadable(function * () {
|
||||
const fds = []
|
||||
|
||||
try {
|
||||
const vhds = []
|
||||
while (true) {
|
||||
const fd = yield handler.openFile(path, 'r')
|
||||
fds.push(fd)
|
||||
const vhd = new Vhd(handler, fd)
|
||||
vhds.push(vhd)
|
||||
yield vhd.readHeaderAndFooter()
|
||||
yield vhd.readBlockTable()
|
||||
|
||||
if (vhd.footer.diskType === HARD_DISK_TYPE_DYNAMIC) {
|
||||
break
|
||||
}
|
||||
|
||||
path = resolveRelativeFromFile(path, vhd.header.parentUnicodeName)
|
||||
}
|
||||
const nVhds = vhds.length
|
||||
|
||||
// this the VHD we want to synthetize
|
||||
const vhd = vhds[0]
|
||||
|
||||
// data of our synthetic VHD
|
||||
// TODO: empty parentUuid and parentLocatorEntry-s in header
|
||||
let header = {
|
||||
...vhd.header,
|
||||
tableOffset: {
|
||||
high: 0,
|
||||
low: 512 + 1024,
|
||||
},
|
||||
parentUnicodeName: '',
|
||||
}
|
||||
|
||||
const bat = Buffer.allocUnsafe(
|
||||
Math.ceil(4 * header.maxTableEntries / VHD_SECTOR_SIZE) *
|
||||
VHD_SECTOR_SIZE
|
||||
)
|
||||
let footer = {
|
||||
...vhd.footer,
|
||||
diskType: HARD_DISK_TYPE_DYNAMIC,
|
||||
}
|
||||
const sectorsPerBlockData = vhd.sectorsPerBlock
|
||||
const sectorsPerBlock =
|
||||
sectorsPerBlockData + vhd.bitmapSize / VHD_SECTOR_SIZE
|
||||
|
||||
const nBlocks = Math.ceil(
|
||||
uint32ToUint64(footer.currentSize) / header.blockSize
|
||||
)
|
||||
|
||||
const blocksOwner = new Array(nBlocks)
|
||||
for (
|
||||
let iBlock = 0,
|
||||
blockOffset = Math.ceil((512 + 1024 + bat.length) / VHD_SECTOR_SIZE);
|
||||
iBlock < nBlocks;
|
||||
++iBlock
|
||||
) {
|
||||
let blockSector = BLOCK_UNUSED
|
||||
for (let i = 0; i < nVhds; ++i) {
|
||||
if (vhds[i].containsBlock(iBlock)) {
|
||||
blocksOwner[iBlock] = i
|
||||
blockSector = blockOffset
|
||||
blockOffset += sectorsPerBlock
|
||||
break
|
||||
}
|
||||
}
|
||||
bat.writeUInt32BE(blockSector, iBlock * 4)
|
||||
}
|
||||
|
||||
footer = fuFooter.pack(footer)
|
||||
checksumStruct(footer, fuFooter)
|
||||
yield footer
|
||||
|
||||
header = fuHeader.pack(header)
|
||||
checksumStruct(header, fuHeader)
|
||||
yield header
|
||||
|
||||
yield bat
|
||||
|
||||
const bitmap = Buffer.alloc(vhd.bitmapSize, 0xff)
|
||||
for (let iBlock = 0; iBlock < nBlocks; ++iBlock) {
|
||||
const owner = blocksOwner[iBlock]
|
||||
if (owner === undefined) {
|
||||
continue
|
||||
}
|
||||
|
||||
yield bitmap
|
||||
|
||||
const blocksByVhd = new Map()
|
||||
const emitBlockSectors = function * (iVhd, i, n) {
|
||||
const vhd = vhds[iVhd]
|
||||
if (!vhd.containsBlock(iBlock)) {
|
||||
yield * emitBlockSectors(iVhd + 1, i, n)
|
||||
return
|
||||
}
|
||||
let block = blocksByVhd.get(vhd)
|
||||
if (block === undefined) {
|
||||
block = yield vhd._readBlock(iBlock)
|
||||
blocksByVhd.set(vhd, block)
|
||||
}
|
||||
const { bitmap, data } = block
|
||||
if (vhd.footer.diskType === HARD_DISK_TYPE_DYNAMIC) {
|
||||
yield data.slice(i * VHD_SECTOR_SIZE, n * VHD_SECTOR_SIZE)
|
||||
return
|
||||
}
|
||||
while (i < n) {
|
||||
const hasData = mapTestBit(bitmap, i)
|
||||
const start = i
|
||||
do {
|
||||
++i
|
||||
} while (i < n && mapTestBit(bitmap, i) === hasData)
|
||||
if (hasData) {
|
||||
yield data.slice(start * VHD_SECTOR_SIZE, i * VHD_SECTOR_SIZE)
|
||||
} else {
|
||||
yield * emitBlockSectors(iVhd + 1, start, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
yield * emitBlockSectors(owner, 0, sectorsPerBlock)
|
||||
}
|
||||
|
||||
yield footer
|
||||
} finally {
|
||||
for (let i = 0, n = fds.length; i < n; ++i) {
|
||||
handler.closeFile(fds[i]).catch(error => {
|
||||
console.warn('createReadStream, closeFd', i, error)
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
export async function readVhdMetadata (handler: RemoteHandler, path: string) {
|
||||
const vhd = new Vhd(handler, path)
|
||||
await vhd.readHeaderAndFooter()
|
||||
|
@ -4,7 +4,7 @@
|
||||
import defer from 'golike-defer'
|
||||
import { type Pattern, createPredicate } from 'value-matcher'
|
||||
import { type Readable, PassThrough } from 'stream'
|
||||
import { basename, dirname, resolve } from 'path'
|
||||
import { basename, dirname } from 'path'
|
||||
import { isEmpty, last, mapValues, noop, values } from 'lodash'
|
||||
import { timeout as pTimeout } from 'promise-toolbox'
|
||||
|
||||
@ -19,10 +19,15 @@ import {
|
||||
type Vm,
|
||||
type Xapi,
|
||||
} from '../../xapi'
|
||||
import { asyncMap, safeDateFormat, serializeError } from '../../utils'
|
||||
import {
|
||||
asyncMap,
|
||||
resolveRelativeFromFile,
|
||||
safeDateFormat,
|
||||
serializeError,
|
||||
} from '../../utils'
|
||||
import mergeVhd, {
|
||||
HARD_DISK_TYPE_DIFFERENCING,
|
||||
chainVhd,
|
||||
createReadStream as createVhdReadStream,
|
||||
readVhdMetadata,
|
||||
} from '../../vhd-merge'
|
||||
|
||||
@ -148,28 +153,6 @@ const listReplicatedVms = (
|
||||
return values(vms).sort(compareSnapshotTime)
|
||||
}
|
||||
|
||||
// returns the chain of parents of this VHD
|
||||
//
|
||||
// TODO: move to vhd-merge module
|
||||
const getVhdChain = async (
|
||||
handler: RemoteHandler,
|
||||
path: string
|
||||
): Promise<Object[]> => {
|
||||
const chain = []
|
||||
|
||||
while (true) {
|
||||
const vhd = await readVhdMetadata(handler, path)
|
||||
vhd.path = path
|
||||
chain.push(vhd)
|
||||
if (vhd.header.type !== HARD_DISK_TYPE_DIFFERENCING) {
|
||||
break
|
||||
}
|
||||
path = resolveRelativeFromFile(path, vhd.header.parentUnicodeName)
|
||||
}
|
||||
|
||||
return chain
|
||||
}
|
||||
|
||||
const importers: $Dict<
|
||||
(
|
||||
handler: RemoteHandler,
|
||||
@ -186,16 +169,10 @@ const importers: $Dict<
|
||||
|
||||
const streams = {}
|
||||
await asyncMap(vdis, async (vdi, id) => {
|
||||
const chain = await getVhdChain(
|
||||
streams[`${id}.vhd`] = await createVhdReadStream(
|
||||
handler,
|
||||
resolveRelativeFromFile(metadataFilename, vhds[id])
|
||||
)
|
||||
streams[`${id}.vhd`] = await asyncMap(chain, ({ path }) =>
|
||||
handler.createReadStream(path, {
|
||||
checksum: true,
|
||||
ignoreMissingChecksum: true,
|
||||
})
|
||||
)
|
||||
})
|
||||
|
||||
const delta: DeltaVmImport = {
|
||||
@ -250,10 +227,6 @@ const parseVmBackupId = (id: string) => {
|
||||
}
|
||||
}
|
||||
|
||||
// used to resolve the xva field from the metadata
|
||||
const resolveRelativeFromFile = (file: string, path: string): string =>
|
||||
resolve('/', dirname(file), path).slice(1)
|
||||
|
||||
const unboxIds = (pattern?: SimpleIdPattern): string[] => {
|
||||
if (pattern === undefined) {
|
||||
return []
|
||||
|
Loading…
Reference in New Issue
Block a user