refactor(nbd-client): remove unused method and default iterator
This commit is contained in:
parent
b644cbe28d
commit
fc1357db93
@ -3,7 +3,6 @@ import { Socket } from 'node:net'
|
||||
import { connect } from 'node:tls'
|
||||
import { fromCallback, pRetry, pDelay, pTimeout, pFromCallback } from 'promise-toolbox'
|
||||
import { readChunkStrict } from '@vates/read-chunk'
|
||||
import { createLogger } from '@xen-orchestra/log'
|
||||
|
||||
import {
|
||||
INIT_PASSWD,
|
||||
@ -21,9 +20,6 @@ import {
|
||||
OPTS_MAGIC,
|
||||
NBD_CMD_DISC,
|
||||
} from './constants.mjs'
|
||||
import { Readable } from 'node:stream'
|
||||
|
||||
const { warn } = createLogger('vates:nbd-client')
|
||||
|
||||
// documentation is here : https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md
|
||||
|
||||
@ -337,58 +333,4 @@ export default class NbdClient {
|
||||
this.#readBlockResponse()
|
||||
})
|
||||
}
|
||||
|
||||
async *readBlocks(indexGenerator = 2 * 1024 * 1024) {
|
||||
// default : read all blocks
|
||||
if (typeof indexGenerator === 'number') {
|
||||
const exportSize = Number(this.#exportSize)
|
||||
const chunkSize = indexGenerator
|
||||
|
||||
indexGenerator = function* () {
|
||||
const nbBlocks = Math.ceil(exportSize / chunkSize)
|
||||
for (let index = 0; index < nbBlocks; index++) {
|
||||
yield { index, size: chunkSize }
|
||||
}
|
||||
}
|
||||
}
|
||||
const readAhead = []
|
||||
const readAheadMaxLength = this.#readAhead
|
||||
const makeReadBlockPromise = (index, size) => {
|
||||
const promise = pRetry(() => this.readBlock(index, size), {
|
||||
tries: this.#readBlockRetries,
|
||||
onRetry: async err => {
|
||||
warn('will retry reading block ', index, err)
|
||||
await this.reconnect()
|
||||
},
|
||||
})
|
||||
// error is handled during unshift
|
||||
promise.catch(() => {})
|
||||
return promise
|
||||
}
|
||||
|
||||
// read all blocks, but try to keep readAheadMaxLength promise waiting ahead
|
||||
for (const { index, size } of indexGenerator()) {
|
||||
// stack readAheadMaxLength promises before starting to handle the results
|
||||
if (readAhead.length === readAheadMaxLength) {
|
||||
// any error will stop reading blocks
|
||||
yield readAhead.shift()
|
||||
}
|
||||
|
||||
readAhead.push(makeReadBlockPromise(index, size))
|
||||
}
|
||||
while (readAhead.length > 0) {
|
||||
yield readAhead.shift()
|
||||
}
|
||||
}
|
||||
|
||||
stream(chunkSize) {
|
||||
async function* iterator() {
|
||||
for await (const chunk of this.readBlocks(chunkSize)) {
|
||||
yield chunk
|
||||
}
|
||||
}
|
||||
// create a readable stream instead of returning the iterator
|
||||
// since iterators don't like unshift and partial reading
|
||||
return Readable.from(iterator())
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,3 @@
|
||||
import NbdClient from '../index.mjs'
|
||||
import { spawn, exec } from 'node:child_process'
|
||||
import fs from 'node:fs/promises'
|
||||
import { test } from 'tap'
|
||||
@ -7,6 +6,7 @@ import { pFromCallback } from 'promise-toolbox'
|
||||
import { Socket } from 'node:net'
|
||||
import { NBD_DEFAULT_PORT } from '../constants.mjs'
|
||||
import assert from 'node:assert'
|
||||
import MultiNbdClient from '../multi.mjs'
|
||||
|
||||
const FILE_SIZE = 10 * 1024 * 1024
|
||||
|
||||
@ -81,7 +81,7 @@ test('it works with unsecured network', async tap => {
|
||||
const path = await createTempFile(FILE_SIZE)
|
||||
|
||||
let nbdServer = await spawnNbdKit(path)
|
||||
const client = new NbdClient(
|
||||
const client = new MultiNbdClient(
|
||||
{
|
||||
address: '127.0.0.1',
|
||||
exportname: 'MY_SECRET_EXPORT',
|
||||
@ -147,17 +147,6 @@ CYu1Xn/FVPx1HoRgWc7E8wFhDcA/P3SJtfIQWHB9FzSaBflKGR4t8WCE2eE8+cTB
|
||||
nbdServer = await spawnNbdKit(path)
|
||||
}
|
||||
}
|
||||
|
||||
// we can reuse the conneciton to read other blocks
|
||||
// default iterator
|
||||
const nbdIteratorWithDefaultBlockIterator = client.readBlocks()
|
||||
let nb = 0
|
||||
for await (const block of nbdIteratorWithDefaultBlockIterator) {
|
||||
nb++
|
||||
tap.equal(block.length, 2 * 1024 * 1024)
|
||||
}
|
||||
|
||||
tap.equal(nb, 5)
|
||||
assert.rejects(() => client.readBlock(100, CHUNK_SIZE))
|
||||
|
||||
await client.disconnect()
|
||||
|
@ -15,7 +15,16 @@ const { fuHeader, checksumStruct } = require('./_structs')
|
||||
const assert = require('node:assert')
|
||||
|
||||
exports.createNbdRawStream = function createRawStream(nbdClient) {
|
||||
const stream = Readable.from(nbdClient.readBlocks(), { objectMode: false })
|
||||
const exportSize = Number(nbdClient.exportSize)
|
||||
const chunkSize = 2 * 1024 * 1024
|
||||
|
||||
const indexGenerator = function* () {
|
||||
const nbBlocks = Math.ceil(exportSize / chunkSize)
|
||||
for (let index = 0; index < nbBlocks; index++) {
|
||||
yield { index, size: chunkSize }
|
||||
}
|
||||
}
|
||||
const stream = Readable.from(nbdClient.readBlocks(indexGenerator), { objectMode: false })
|
||||
|
||||
stream.on('error', () => nbdClient.disconnect())
|
||||
stream.on('end', () => nbdClient.disconnect())
|
||||
@ -26,6 +35,7 @@ exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStr
|
||||
const bufFooter = await readChunkStrict(sourceStream, FOOTER_SIZE)
|
||||
|
||||
const header = unpackHeader(await readChunkStrict(sourceStream, HEADER_SIZE))
|
||||
header.tableOffset = FOOTER_SIZE + HEADER_SIZE
|
||||
// compute BAT in order
|
||||
const batSize = Math.ceil((header.maxTableEntries * 4) / SECTOR_SIZE) * SECTOR_SIZE
|
||||
await skipStrict(sourceStream, header.tableOffset - (FOOTER_SIZE + HEADER_SIZE))
|
||||
@ -47,7 +57,6 @@ exports.createNbdVhdStream = async function createVhdStream(nbdClient, sourceStr
|
||||
precLocator = offset
|
||||
}
|
||||
}
|
||||
header.tableOffset = FOOTER_SIZE + HEADER_SIZE
|
||||
const rawHeader = fuHeader.pack(header)
|
||||
checksumStruct(rawHeader, fuHeader)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user