Compare commits
10 Commits
lite/rewor
...
feat_nbd_s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3e1227c710 | ||
|
|
ddc73fb836 | ||
|
|
a13fda5fe9 | ||
|
|
66bee59774 | ||
|
|
685400bbf8 | ||
|
|
5bef8fc411 | ||
|
|
aa7ff1449a | ||
|
|
3dca7f2a71 | ||
|
|
3dc2f649f6 | ||
|
|
9eb537c2f9 |
@@ -1,8 +1,11 @@
|
||||
'use strict'
|
||||
|
||||
module.exports = {
|
||||
arrowParens: 'avoid',
|
||||
jsxSingleQuote: true,
|
||||
semi: false,
|
||||
singleQuote: true,
|
||||
trailingComma: 'es5',
|
||||
|
||||
// 2020-11-24: Requested by nraynaud and approved by the rest of the team
|
||||
//
|
||||
|
||||
32
@vates/nbd-client/bench.mjs
Normal file
32
@vates/nbd-client/bench.mjs
Normal file
@@ -0,0 +1,32 @@
|
||||
import NbdClient from "./client.mjs";
|
||||
|
||||
|
||||
|
||||
async function bench(){
|
||||
const client = new NbdClient({
|
||||
address:'localhost',
|
||||
port: 9000,
|
||||
exportname: 'bench_export'
|
||||
})
|
||||
await client.connect()
|
||||
console.log('connected', client.exportSize)
|
||||
|
||||
for(let chunk_size=16*1024; chunk_size < 16*1024*1024; chunk_size *=2){
|
||||
|
||||
|
||||
let i=0
|
||||
const start = + new Date()
|
||||
for await(const block of client.readBlocks(chunk_size) ){
|
||||
i++
|
||||
if((i*chunk_size) % (16*1024*1024) ===0){
|
||||
process.stdout.write('.')
|
||||
}
|
||||
if(i*chunk_size > 1024*1024*1024) break
|
||||
}
|
||||
console.log(chunk_size,Math.round( (i*chunk_size/1024/1024*1000)/ (new Date() - start)))
|
||||
|
||||
}
|
||||
await client.disconnect()
|
||||
}
|
||||
|
||||
bench()
|
||||
@@ -74,7 +74,7 @@ export default class NbdClient {
|
||||
this.#serverSocket = connect({
|
||||
socket: this.#serverSocket,
|
||||
rejectUnauthorized: false,
|
||||
cert: this.#serverCert,
|
||||
cert: this.#serverCert
|
||||
})
|
||||
this.#serverSocket.once('error', reject)
|
||||
this.#serverSocket.once('secureConnect', () => {
|
||||
@@ -88,7 +88,11 @@ export default class NbdClient {
|
||||
async #unsecureConnect() {
|
||||
this.#serverSocket = new Socket()
|
||||
return new Promise((resolve, reject) => {
|
||||
this.#serverSocket.connect(this.#serverPort, this.#serverAddress)
|
||||
this.#serverSocket.connect({
|
||||
port:this.#serverPort,
|
||||
host: this.#serverAddress,
|
||||
// @todo should test the onRead to limit buffer copy
|
||||
})
|
||||
this.#serverSocket.once('error', reject)
|
||||
this.#serverSocket.once('connect', () => {
|
||||
this.#serverSocket.removeListener('error', reject)
|
||||
@@ -232,19 +236,20 @@ export default class NbdClient {
|
||||
}
|
||||
try {
|
||||
this.#waitingForResponse = true
|
||||
const magic = await this.#readInt32()
|
||||
const buffer = await this.#read(4+4+8)
|
||||
const magic = buffer.readUInt32BE()
|
||||
|
||||
if (magic !== NBD_REPLY_MAGIC) {
|
||||
throw new Error(`magic number for block answer is wrong : ${magic} ${NBD_REPLY_MAGIC}`)
|
||||
}
|
||||
|
||||
const error = await this.#readInt32()
|
||||
const error = buffer.readUInt32BE(4)
|
||||
if (error !== 0) {
|
||||
// @todo use error code from constants.mjs
|
||||
throw new Error(`GOT ERROR CODE : ${error}`)
|
||||
}
|
||||
|
||||
const blockQueryId = await this.#readInt64()
|
||||
const blockQueryId = buffer.readBigUInt64BE(8)
|
||||
const query = this.#commandQueryBacklog.get(blockQueryId)
|
||||
if (!query) {
|
||||
throw new Error(` no query associated with id ${blockQueryId}`)
|
||||
@@ -307,11 +312,11 @@ export default class NbdClient {
|
||||
})
|
||||
}
|
||||
|
||||
async *readBlocks(indexGenerator) {
|
||||
async *readBlocks(indexGenerator = 2*1024*1024) {
|
||||
// default : read all blocks
|
||||
if (indexGenerator === undefined) {
|
||||
if (typeof indexGenerator === 'number') {
|
||||
const exportSize = this.#exportSize
|
||||
const chunkSize = 2 * 1024 * 1024
|
||||
const chunkSize = indexGenerator
|
||||
indexGenerator = function* () {
|
||||
const nbBlocks = Math.ceil(Number(exportSize / BigInt(chunkSize)))
|
||||
for (let index = 0; BigInt(index) < nbBlocks; index++) {
|
||||
@@ -319,12 +324,14 @@ export default class NbdClient {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const readAhead = []
|
||||
const readAheadMaxLength = this.#readAhead
|
||||
const makeReadBlockPromise = (index, size) => {
|
||||
const promise = pRetry(() => this.readBlock(index, size), {
|
||||
tries: this.#readBlockRetries,
|
||||
onRetry: async err => {
|
||||
console.error(err)
|
||||
warn('will retry reading block ', index, err)
|
||||
await this.reconnect()
|
||||
},
|
||||
@@ -336,6 +343,7 @@ export default class NbdClient {
|
||||
|
||||
// read all blocks, but try to keep readAheadMaxLength promise waiting ahead
|
||||
for (const { index, size } of indexGenerator()) {
|
||||
|
||||
// stack readAheadMaxLength promises before starting to handle the results
|
||||
if (readAhead.length === readAheadMaxLength) {
|
||||
// any error will stop reading blocks
|
||||
@@ -348,4 +356,4 @@ export default class NbdClient {
|
||||
yield readAhead.shift()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,12 +1,40 @@
|
||||
// https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md
|
||||
|
||||
export const INIT_PASSWD = Buffer.from('NBDMAGIC') // "NBDMAGIC" ensure we're connected to a nbd server
|
||||
export const OPTS_MAGIC = Buffer.from('IHAVEOPT') // "IHAVEOPT" start an option block
|
||||
export const NBD_OPT_REPLY_MAGIC = 1100100111001001n // magic received during negociation
|
||||
|
||||
export const NBD_OPT_EXPORT_NAME = 1
|
||||
export const NBD_OPT_ABORT = 2
|
||||
export const NBD_OPT_LIST = 3
|
||||
export const NBD_OPT_STARTTLS = 5
|
||||
export const NBD_OPT_INFO = 6
|
||||
export const NBD_OPT_GO = 7
|
||||
export const NBD_OPT_STRUCTURED_REPLY = 8
|
||||
export const NBD_OPT_LIST_META_CONTEXT = 9
|
||||
export const NBD_OPT_SET_META_CONTEXT = 10
|
||||
export const NBD_OPT_EXTENDED_HEADERS = 11
|
||||
|
||||
export const NBD_REP_ACK =1
|
||||
export const NBD_REP_SERVER = 2
|
||||
export const NBD_REP_INFO = 3
|
||||
export const NBD_REP_META_CONTEXT = 4
|
||||
export const NBD_REP_ERR_UNSUP = 0x80000001 // 2^32+1
|
||||
export const NBD_REP_ERR_POLICY = 0x80000002
|
||||
export const NBD_REP_ERR_INVALID = 0x80000003
|
||||
export const NBD_REP_ERR_PLATFORM = 0x80000004
|
||||
export const NBD_REP_ERR_TLS_REQD = 0x80000005
|
||||
export const NBD_REP_ERR_UNKNOWN = 0x80000006
|
||||
export const NBD_REP_ERR_SHUTDOWN = 0x80000007
|
||||
export const NBD_REP_ERR_BLOCK_SIZE_REQD = 0x80000008
|
||||
export const NBD_REP_ERR_TOO_BIG = 0x80000009
|
||||
export const NBD_REP_ERR_EXT_HEADER_REQD = 0x8000000a
|
||||
|
||||
export const NBD_INFO_EXPORT = 0
|
||||
export const NBD_INFO_NAME = 1
|
||||
export const NBD_INFO_DESCRIPTION = 2
|
||||
export const NBD_INFO_BLOCK_SIZE = 3
|
||||
|
||||
|
||||
export const NBD_FLAG_HAS_FLAGS = 1 << 0
|
||||
export const NBD_FLAG_READ_ONLY = 1 << 1
|
||||
@@ -14,6 +42,9 @@ export const NBD_FLAG_SEND_FLUSH = 1 << 2
|
||||
export const NBD_FLAG_SEND_FUA = 1 << 3
|
||||
export const NBD_FLAG_ROTATIONAL = 1 << 4
|
||||
export const NBD_FLAG_SEND_TRIM = 1 << 5
|
||||
export const NBD_FLAG_SEND_WRITE_ZEROES = 1 << 6
|
||||
export const NBD_FLAG_SEND_DF = 1 << 7
|
||||
export const NBD_FLAG_CAN_MULTI_CONN = 1 << 8
|
||||
|
||||
export const NBD_FLAG_FIXED_NEWSTYLE = 1 << 0
|
||||
|
||||
@@ -36,6 +67,15 @@ export const NBD_CMD_RESIZE = 8
|
||||
export const NBD_REQUEST_MAGIC = 0x25609513 // magic number to create a new NBD request to send to the server
|
||||
export const NBD_REPLY_MAGIC = 0x67446698 // magic number received from the server when reading response to a nbd request
|
||||
export const NBD_REPLY_ACK = 1
|
||||
export const NBD_SIMPLE_REPLY_MAGIC = 0x67446698
|
||||
export const NBD_STRUCTURED_REPLY_MAGIC = 0x668e33ef
|
||||
export const NBD_REPLY_TYPE_NONE = 0
|
||||
export const NBD_REPLY_TYPE_OFFSET_DATA = 1
|
||||
export const NBD_REPLY_TYPE_OFFSET_HOLE = 2
|
||||
export const NBD_REPLY_TYPE_BLOCK_STATUS = 5
|
||||
export const NBD_REPLY_TYPE_ERROR = 1 << 15 +1
|
||||
export const NBD_REPLY_TYPE_ERROR_OFFSET = 1 << 15 +2
|
||||
|
||||
|
||||
export const NBD_DEFAULT_PORT = 10809
|
||||
export const NBD_DEFAULT_BLOCK_SIZE = 64 * 1024
|
||||
|
||||
292
@vates/nbd-client/server.mjs
Normal file
292
@vates/nbd-client/server.mjs
Normal file
@@ -0,0 +1,292 @@
|
||||
import assert, { deepEqual, strictEqual, notStrictEqual } from 'node:assert'
|
||||
import { createServer } from 'node:net'
|
||||
import { fromCallback } from 'promise-toolbox'
|
||||
import { readChunkStrict } from '@vates/read-chunk'
|
||||
import {
|
||||
INIT_PASSWD,
|
||||
NBD_CMD_READ,
|
||||
NBD_DEFAULT_PORT,
|
||||
NBD_FLAG_FIXED_NEWSTYLE,
|
||||
NBD_FLAG_HAS_FLAGS,
|
||||
NBD_OPT_EXPORT_NAME,
|
||||
NBD_OPT_REPLY_MAGIC,
|
||||
NBD_REPLY_ACK,
|
||||
NBD_REQUEST_MAGIC,
|
||||
OPTS_MAGIC,
|
||||
NBD_CMD_DISC,
|
||||
NBD_REP_ERR_UNSUP,
|
||||
NBD_CMD_WRITE,
|
||||
NBD_OPT_GO,
|
||||
NBD_OPT_INFO,
|
||||
NBD_INFO_EXPORT,
|
||||
NBD_REP_INFO,
|
||||
NBD_SIMPLE_REPLY_MAGIC,
|
||||
NBD_REP_ERR_UNKNOWN,
|
||||
} from './constants.mjs'
|
||||
import { PassThrough } from 'node:stream'
|
||||
|
||||
export default class NbdServer {
|
||||
#server
|
||||
#clients = new Map()
|
||||
constructor(port = NBD_DEFAULT_PORT) {
|
||||
this.#server = createServer()
|
||||
this.#server.listen(port)
|
||||
this.#server.on('connection', client => this.#handleNewConnection(client))
|
||||
}
|
||||
|
||||
// will wait for a client to connect and upload the file to this server
|
||||
downloadStream(key) {
|
||||
strictEqual(this.#clients.has(key), false)
|
||||
const stream = new PassThrough()
|
||||
const offset = BigInt(0)
|
||||
this.#clients.set(key, { length: BigInt(2 * 1024 * 1024 * 1024 * 1024), stream, offset, key })
|
||||
return stream
|
||||
}
|
||||
|
||||
// will wait for a client to connect and downlaod this stream
|
||||
uploadStream(key, source, length) {
|
||||
strictEqual(this.#clients.has(key), false)
|
||||
notStrictEqual(length, undefined)
|
||||
const offset = BigInt(0)
|
||||
this.#clients.set(key, { length: BigInt(length), stream: source, offset, key })
|
||||
}
|
||||
|
||||
#read(socket, length) {
|
||||
return readChunkStrict(socket, length)
|
||||
}
|
||||
async #readInt32(socket) {
|
||||
const buffer = await this.#read(socket, 4)
|
||||
return buffer.readUInt32BE()
|
||||
}
|
||||
|
||||
#write(socket, buffer) {
|
||||
return fromCallback.call(socket, 'write', buffer)
|
||||
}
|
||||
async #writeInt16(socket, int16) {
|
||||
const buffer = Buffer.alloc(2)
|
||||
buffer.writeUInt16BE(int16)
|
||||
return this.#write(socket, buffer)
|
||||
}
|
||||
async #writeInt32(socket, int32) {
|
||||
const buffer = Buffer.alloc(4)
|
||||
buffer.writeUInt32BE(int32)
|
||||
return this.#write(socket, buffer)
|
||||
}
|
||||
async #writeInt64(socket, int64) {
|
||||
const buffer = Buffer.alloc(8)
|
||||
buffer.writeBigUInt64BE(int64)
|
||||
return this.#write(socket, buffer)
|
||||
}
|
||||
|
||||
async #openExport(key) {
|
||||
if (!this.#clients.has(key)) {
|
||||
// export does not exists
|
||||
const err = new Error('Export not found ')
|
||||
err.code = 'ENOTFOUND'
|
||||
throw err
|
||||
}
|
||||
const { length } = this.#clients.get(key)
|
||||
return length
|
||||
}
|
||||
|
||||
async #sendOptionResponse(socket, option, response, data = Buffer.alloc(0)) {
|
||||
await this.#writeInt64(socket, NBD_OPT_REPLY_MAGIC)
|
||||
await this.#writeInt32(socket, option)
|
||||
await this.#writeInt32(socket, response)
|
||||
await this.#writeInt32(socket, data.length)
|
||||
await this.#write(socket, data)
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {Socket} socket
|
||||
* @returns true if server is waiting for more options
|
||||
*/
|
||||
async #readOption(socket) {
|
||||
console.log('wait for option')
|
||||
const magic = await this.#read(socket, 8)
|
||||
console.log(magic.toString('ascii'), magic.length, OPTS_MAGIC.toString('ascii'))
|
||||
deepEqual(magic, OPTS_MAGIC)
|
||||
const option = await this.#readInt32(socket)
|
||||
const length = await this.#readInt32(socket)
|
||||
console.log({ option, length })
|
||||
const data = length > 0 ? await this.#read(socket, length) : undefined
|
||||
switch (option) {
|
||||
case NBD_OPT_EXPORT_NAME: {
|
||||
const exportNameLength = data.readInt32BE()
|
||||
const key = data.slice(4, exportNameLength + 4).toString()
|
||||
let exportSize
|
||||
try {
|
||||
exportSize = await this.#openExport(key)
|
||||
} catch (err) {
|
||||
if (err.code === 'ENOTFOUND') {
|
||||
this.#sendOptionResponse(socket, option, NBD_REP_ERR_UNKNOWN)
|
||||
return false
|
||||
}
|
||||
throw err
|
||||
}
|
||||
socket.key = key
|
||||
await this.#writeInt64(socket, exportSize)
|
||||
await this.#writeInt16(socket, NBD_FLAG_HAS_FLAGS /* transmission flag */)
|
||||
await this.#write(socket, Buffer.alloc(124) /* padding */)
|
||||
|
||||
return false
|
||||
}
|
||||
/*
|
||||
case NBD_OPT_STARTTLS:
|
||||
console.log('starttls')
|
||||
// @todo not working
|
||||
return true
|
||||
*/
|
||||
|
||||
case NBD_OPT_GO:
|
||||
case NBD_OPT_INFO: {
|
||||
const exportNameLength = data.readInt32BE()
|
||||
const key = data.slice(4, exportNameLength + 4).toString()
|
||||
let exportSize
|
||||
try {
|
||||
exportSize = await this.#openExport(key)
|
||||
} catch (err) {
|
||||
if (err.code === 'ENOTFOUND') {
|
||||
this.#sendOptionResponse(socket, option, NBD_REP_ERR_UNKNOWN)
|
||||
// @todo should disconnect
|
||||
return false
|
||||
}
|
||||
throw err
|
||||
}
|
||||
socket.key = key
|
||||
await this.#writeInt64(socket, NBD_OPT_REPLY_MAGIC)
|
||||
await this.#writeInt32(socket, option)
|
||||
await this.#writeInt32(socket, NBD_REP_INFO)
|
||||
await this.#writeInt32(socket, 12)
|
||||
// the export info
|
||||
await this.#writeInt16(socket, NBD_INFO_EXPORT)
|
||||
await this.#writeInt64(socket, exportSize)
|
||||
await this.#writeInt16(socket, NBD_FLAG_HAS_FLAGS /* transmission flag */)
|
||||
|
||||
// an ACK at the end of the infos
|
||||
await this.#sendOptionResponse(socket, option, NBD_REPLY_ACK) // no additionnal data
|
||||
return option === NBD_OPT_INFO // we stays in option phase is option is INFO
|
||||
}
|
||||
default:
|
||||
// not supported
|
||||
console.log('not supported', option, length, data?.toString())
|
||||
await this.#sendOptionResponse(socket, option, NBD_REP_ERR_UNSUP) // no additionnal data
|
||||
// wait for next option
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
async #readCommand(socket) {
|
||||
const key = socket.key
|
||||
// this socket has an export key
|
||||
notStrictEqual(key, undefined)
|
||||
// this export key is still valid
|
||||
strictEqual(this.#clients.has(key), true)
|
||||
const client = this.#clients.get(key)
|
||||
|
||||
const buffer = await this.#read(socket, 28)
|
||||
const magic = buffer.readInt32BE(0)
|
||||
strictEqual(magic, NBD_REQUEST_MAGIC)
|
||||
/* const commandFlags = */ buffer.readInt16BE(4)
|
||||
const command = buffer.readInt16BE(6)
|
||||
const cookie = buffer.readBigUInt64BE(8)
|
||||
const offset = buffer.readBigUInt64BE(16)
|
||||
const length = buffer.readInt32BE(24)
|
||||
switch (command) {
|
||||
case NBD_CMD_DISC:
|
||||
console.log('gotdisconnect', client.offset)
|
||||
await client.stream?.destroy()
|
||||
// @todo : disconnect
|
||||
return false
|
||||
case NBD_CMD_READ: {
|
||||
/** simple replies */
|
||||
|
||||
// read length byte from offset in export
|
||||
|
||||
// the client is writing in contiguous mode
|
||||
assert.strictEqual(offset, client.offset)
|
||||
client.offset += BigInt(length)
|
||||
const data = await readChunkStrict(client.stream, length)
|
||||
const reply = Buffer.alloc(16)
|
||||
reply.writeInt32BE(NBD_SIMPLE_REPLY_MAGIC)
|
||||
reply.writeInt32BE(0, 4) // no error
|
||||
reply.writeBigInt64BE(cookie, 8)
|
||||
await this.#write(socket, reply)
|
||||
await this.#write(socket, data)
|
||||
/* if we implement non stream read, we can handle read in parallel
|
||||
const reply = Buffer.alloc(16+length)
|
||||
reply.writeInt32BE(NBD_SIMPLE_REPLY_MAGIC)
|
||||
reply.writeInt32BE(0,4)// no error
|
||||
reply.writeBigInt64BE(cookie,8)
|
||||
|
||||
// read length byte from offset in export directly in the given buffer
|
||||
// may do multiple read in parallel on the same export
|
||||
size += length
|
||||
socket.fd.read(reply, 16, length, Number(offset))
|
||||
.then(()=>{
|
||||
return this.#write(socket, reply)
|
||||
})
|
||||
.catch(err => console.error('NBD_CMD_READ',err)) */
|
||||
return true
|
||||
}
|
||||
case NBD_CMD_WRITE: {
|
||||
// the client is writing in contiguous mode
|
||||
assert.strictEqual(offset, client.offset)
|
||||
|
||||
const data = await this.#read(socket, length)
|
||||
client.offset += BigInt(length)
|
||||
await new Promise((resolve, reject) => {
|
||||
if (!client.stream.write(data, 0, length, Number(offset))) {
|
||||
client.stream.once('drain', err => (err ? reject(err) : resolve()))
|
||||
} else {
|
||||
process.nextTick(resolve)
|
||||
}
|
||||
})
|
||||
const reply = Buffer.alloc(16)
|
||||
reply.writeInt32BE(NBD_SIMPLE_REPLY_MAGIC)
|
||||
reply.writeInt32BE(0, 4) // no error
|
||||
reply.writeBigInt64BE(cookie, 8)
|
||||
await this.#write(socket, reply)
|
||||
return true
|
||||
}
|
||||
default:
|
||||
console.log('GOT unsupported command ', command)
|
||||
// fail to handle
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
async #handleNewConnection(socket) {
|
||||
const remoteAddress = socket.remoteAddress + ':' + socket.remotePort
|
||||
console.log('new client connection from %s', remoteAddress)
|
||||
|
||||
socket.on('close', () => {
|
||||
console.log('client ', remoteAddress, 'is done')
|
||||
})
|
||||
socket.on('error', error => {
|
||||
throw error
|
||||
})
|
||||
// handshake
|
||||
await this.#write(socket, INIT_PASSWD)
|
||||
await this.#write(socket, OPTS_MAGIC)
|
||||
|
||||
// send flags , the bare minimum
|
||||
await this.#writeInt16(socket, NBD_FLAG_FIXED_NEWSTYLE)
|
||||
const clientFlag = await this.#readInt32(socket)
|
||||
assert.strictEqual(clientFlag & NBD_FLAG_FIXED_NEWSTYLE, NBD_FLAG_FIXED_NEWSTYLE) // only FIXED_NEWSTYLE one is supported from the server options
|
||||
|
||||
// read client response flags
|
||||
let waitingForOptions = true
|
||||
while (waitingForOptions) {
|
||||
waitingForOptions = await this.#readOption(socket)
|
||||
}
|
||||
|
||||
let waitingForCommand = true
|
||||
while (waitingForCommand) {
|
||||
waitingForCommand = await this.#readCommand(socket)
|
||||
}
|
||||
}
|
||||
|
||||
#handleClientData(client, data) {}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
import NbdClient from '../index.mjs'
|
||||
import NbdClient from '../client.mjs'
|
||||
import { spawn, exec } from 'node:child_process'
|
||||
import fs from 'node:fs/promises'
|
||||
import { test } from 'tap'
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
"@xen-orchestra/async-map": "^0.1.2",
|
||||
"@xen-orchestra/backups": "^0.40.0",
|
||||
"@xen-orchestra/fs": "^4.0.1",
|
||||
"filenamify": "^4.1.0",
|
||||
"filenamify": "^6.0.0",
|
||||
"getopts": "^2.2.5",
|
||||
"lodash": "^4.17.15",
|
||||
"promise-toolbox": "^0.21.0"
|
||||
|
||||
@@ -18,7 +18,7 @@ export const MixinXapiWriter = (BaseClass = Object) =>
|
||||
const vdiRefs = await xapi.VM_getDisks(baseVm.$ref)
|
||||
for (const vdiRef of vdiRefs) {
|
||||
const vdi = xapi.getObject(vdiRef)
|
||||
if (vdi.$SR.uuid !== this._heathCheckSr.uuid) {
|
||||
if (vdi.$SR.uuid !== this._healthCheckSr.uuid) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,8 +30,8 @@
|
||||
"@xen-orchestra/fs": "^4.0.1",
|
||||
"@xen-orchestra/log": "^0.6.0",
|
||||
"@xen-orchestra/template": "^0.1.0",
|
||||
"compare-versions": "^5.0.1",
|
||||
"d3-time-format": "^3.0.0",
|
||||
"compare-versions": "^6.0.0",
|
||||
"d3-time-format": "^4.1.0",
|
||||
"decorator-synchronized": "^0.6.0",
|
||||
"golike-defer": "^0.5.1",
|
||||
"limit-concurrency-decorator": "^0.5.0",
|
||||
|
||||
@@ -1,2 +1,4 @@
|
||||
// Keeping this file to prevent applying the global monorepo config for now
|
||||
module.exports = {};
|
||||
module.exports = {
|
||||
trailingComma: "es5",
|
||||
};
|
||||
|
||||
@@ -4,53 +4,6 @@ All collections of `XenApiRecord` are stored inside the `xapiCollectionStore`.
|
||||
|
||||
To retrieve a collection, invoke `useXapiCollectionStore().get(type)`.
|
||||
|
||||
## TL;DR - How to extend a subscription
|
||||
|
||||
_**Note:** Once the extension grows in complexity, it's recommended to create a dedicated file for it (e.g. `host.extension.ts` for `host.store.ts`)._
|
||||
|
||||
```typescript
|
||||
type MyExtension1 = Extension<{ propA: string }>;
|
||||
|
||||
type MyExtension2 = Extension<{ propB: string }, { withB: true }>;
|
||||
|
||||
type Extensions = [
|
||||
XenApiRecordExtension<XenApiHost>, // If needed
|
||||
DeferExtension, // If needed
|
||||
MyExtension1,
|
||||
MyExtension2
|
||||
];
|
||||
|
||||
export const useHostStore = defineStore("host", () => {
|
||||
const hostCollection = useXapiCollectionStore().get("console");
|
||||
|
||||
const subscribe = <O extends Options<Extensions>>(options?: O) => {
|
||||
const originalSubscription = hostCollection.subscribe(options);
|
||||
|
||||
const myExtension1: PartialSubscription<MyExtension1> = {
|
||||
propA: "Hello",
|
||||
};
|
||||
|
||||
const myExtension2: PartialSubscription<MyExtension2> | undefined =
|
||||
options?.withB
|
||||
? {
|
||||
propB: "World",
|
||||
}
|
||||
: undefined;
|
||||
|
||||
return {
|
||||
...originalSubscription,
|
||||
...myExtension1,
|
||||
...myExtension2,
|
||||
};
|
||||
};
|
||||
|
||||
return {
|
||||
...hostCollection,
|
||||
subscribe,
|
||||
};
|
||||
});
|
||||
```
|
||||
|
||||
## Accessing a collection
|
||||
|
||||
In order to use a collection, you'll need to subscribe to it.
|
||||
@@ -87,102 +40,71 @@ export const useConsoleStore = defineStore("console", () =>
|
||||
|
||||
To extend the base Subscription, you'll need to override the `subscribe` method.
|
||||
|
||||
For that, you can use the `createSubscribe<RawObjectType, Extensions>((options) => { /* ... */})` helper.
|
||||
|
||||
### Define the extensions
|
||||
|
||||
Subscription extensions are defined as a simple extension (`Extension<object>`) or as a conditional
|
||||
extension (`Extension<object, object>`).
|
||||
Subscription extensions are defined as `(object | [object, RequiredOptions])[]`.
|
||||
|
||||
When using a conditional extension, the corresponding `object` type will be added to the subscription only if
|
||||
the the options passed to `subscribe(options)` do match the second argument or `Extension`.
|
||||
|
||||
There is two existing extensions:
|
||||
|
||||
- `XenApiRecordExtension<T extends XenApiRecord>`: a simple extension which defined all the base
|
||||
properties and methods (`records`, `getByOpaqueRef`, `getByUuid`, etc.)
|
||||
- `DeferExtension`: a conditional extension which add the `start` and `isStarted` properties if the
|
||||
`immediate` option is set to `false`.
|
||||
When using a tuple (`[object, RequiredOptions]`), the corresponding `object` type will be added to the subscription if
|
||||
the `RequiredOptions` for that tuple are present in the options passed to `subscribe`.
|
||||
|
||||
```typescript
|
||||
// Always present extension
|
||||
type PropABExtension = Extension<{
|
||||
type DefaultExtension = {
|
||||
propA: string;
|
||||
propB: ComputedRef<number>;
|
||||
}>;
|
||||
};
|
||||
|
||||
// Conditional extension 1
|
||||
type PropCExtension = Extension<
|
||||
type FirstConditionalExtension = [
|
||||
{ propC: ComputedRef<string> }, // <- This signature will be added
|
||||
{ optC: string } // <- if this condition is met
|
||||
>;
|
||||
];
|
||||
|
||||
// Conditional extension 2
|
||||
type PropDExtension = Extension<
|
||||
type SecondConditionalExtension = [
|
||||
{ propD: () => void }, // <- This signature will be added
|
||||
{ optD: number } // <- if this condition is met
|
||||
>;
|
||||
];
|
||||
|
||||
// Create the extensions array
|
||||
type Extensions = [
|
||||
XenApiRecordExtension<XenApiHost>,
|
||||
DeferExtension,
|
||||
PropABExtension,
|
||||
PropCExtension,
|
||||
PropDExtension
|
||||
DefaultExtension,
|
||||
FirstConditionalExtension,
|
||||
SecondConditionalExtension
|
||||
];
|
||||
```
|
||||
|
||||
### Define the `subscribe` method
|
||||
|
||||
You can then create the `subscribe` function with the help of `Options` and `Subscription` helper types.
|
||||
|
||||
This will allow to get correct completion and type checking for the `options` argument, and to get the correct return
|
||||
type based on passed options.
|
||||
|
||||
```typescript
|
||||
const subscribe = <O extends Options<Extensions>>(options?: O) => {
|
||||
return {
|
||||
// ...
|
||||
} as Subscription<Extensions, O>;
|
||||
};
|
||||
```
|
||||
|
||||
### Extend the subscription
|
||||
|
||||
The `PartialSubscription` type will help to define and check the data to add to subscription for each extension.
|
||||
### Define the subscription
|
||||
|
||||
```typescript
|
||||
export const useConsoleStore = defineStore("console", () => {
|
||||
const consoleCollection = useXapiCollectionStore().get("console");
|
||||
|
||||
const subscribe = <O extends Options<Extensions>>(options?: O) => {
|
||||
const subscribe = createSubscribe<"console", Extensions>((options) => {
|
||||
const originalSubscription = consoleCollection.subscribe(options);
|
||||
|
||||
const propABSubscription: PartialSubscription<PropABExtension> = {
|
||||
const extendedSubscription = {
|
||||
propA: "Some string",
|
||||
propB: computed(() => 42),
|
||||
};
|
||||
|
||||
const propCSubscription: PartialSubscription<PropCExtension> | undefined =
|
||||
options?.optC !== undefined
|
||||
? {
|
||||
propC: computed(() => "Some other string"),
|
||||
}
|
||||
: undefined;
|
||||
const propCSubscription = options?.optC !== undefined && {
|
||||
propC: computed(() => "Some other string"),
|
||||
};
|
||||
|
||||
const propDSubscription: PartialSubscription<PropDExtension> | undefined =
|
||||
options?.optD !== undefined
|
||||
? {
|
||||
propD: () => console.log("Hello"),
|
||||
}
|
||||
: undefined;
|
||||
const propDSubscription = options?.optD !== undefined && {
|
||||
propD: () => console.log("Hello"),
|
||||
};
|
||||
|
||||
return {
|
||||
...originalSubscription,
|
||||
...propABSubscription,
|
||||
...extendedSubscription,
|
||||
...propCSubscription,
|
||||
...propDSubscription,
|
||||
};
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
...consoleCollection,
|
||||
@@ -203,18 +125,20 @@ type Options = {
|
||||
|
||||
### Use the subscription
|
||||
|
||||
In each case, all the default properties (`records`, `getByUuid`, etc.) will be present.
|
||||
|
||||
```typescript
|
||||
const store = useConsoleStore();
|
||||
|
||||
// No options (Contains common properties: `propA`, `propB`, `records`, `getByUuid`, etc.)
|
||||
const subscription1 = store.subscribe();
|
||||
// No options (propA and propB will be present)
|
||||
const subscription = store.subscribe();
|
||||
|
||||
// optC option (Contains common properties + `propC`)
|
||||
const subscription2 = store.subscribe({ optC: "Hello" });
|
||||
// optC option (propA, propB and propC will be present)
|
||||
const subscription = store.subscribe({ optC: "Hello" });
|
||||
|
||||
// optD option (Contains common properties + `propD`)
|
||||
const subscription3 = store.subscribe({ optD: 12 });
|
||||
// optD option (propA, propB and propD will be present)
|
||||
const subscription = store.subscribe({ optD: 12 });
|
||||
|
||||
// optC and optD options (Contains common properties + `propC` + `propD`)
|
||||
const subscription4 = store.subscribe({ optC: "Hello", optD: 12 });
|
||||
// optC and optD options (propA, propB, propC and propD will be present)
|
||||
const subscription = store.subscribe({ optC: "Hello", optD: 12 });
|
||||
```
|
||||
|
||||
@@ -49,7 +49,7 @@
|
||||
"@rushstack/eslint-patch": "^1.1.0",
|
||||
"@types/node": "^16.11.41",
|
||||
"@vitejs/plugin-vue": "^4.2.3",
|
||||
"@vue/eslint-config-prettier": "^7.0.0",
|
||||
"@vue/eslint-config-prettier": "^8.0.0",
|
||||
"@vue/eslint-config-typescript": "^11.0.0",
|
||||
"@vue/tsconfig": "^0.1.3",
|
||||
"eslint-plugin-vue": "^9.0.0",
|
||||
|
||||
@@ -5,10 +5,9 @@ import type {
|
||||
XenApiVm,
|
||||
VM_OPERATION,
|
||||
RawObjectType,
|
||||
XenApiHostMetrics,
|
||||
} from "@/libs/xen-api";
|
||||
import type { Filter } from "@/types/filter";
|
||||
import type { XenApiRecordSubscription } from "@/types/subscription";
|
||||
import type { Subscription } from "@/types/xapi-collection";
|
||||
import { faSquareCheck } from "@fortawesome/free-regular-svg-icons";
|
||||
import { faFont, faHashtag, faList } from "@fortawesome/free-solid-svg-icons";
|
||||
import { utcParse } from "d3-time-format";
|
||||
@@ -117,14 +116,14 @@ export function getStatsLength(stats?: object | any[]) {
|
||||
|
||||
export function isHostRunning(
|
||||
host: XenApiHost,
|
||||
hostMetricsSubscription: XenApiRecordSubscription<XenApiHostMetrics>
|
||||
hostMetricsSubscription: Subscription<"host_metrics", object>
|
||||
) {
|
||||
return hostMetricsSubscription.getByOpaqueRef(host.metrics)?.live === true;
|
||||
}
|
||||
|
||||
export function getHostMemory(
|
||||
host: XenApiHost,
|
||||
hostMetricsSubscription: XenApiRecordSubscription<XenApiHostMetrics>
|
||||
hostMetricsSubscription: Subscription<"host_metrics", object>
|
||||
) {
|
||||
const hostMetrics = hostMetricsSubscription.getByOpaqueRef(host.metrics);
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { buildXoObject, parseDateTime } from "@/libs/utils";
|
||||
import type { RawTypeToRecord } from "@/types/xapi-collection";
|
||||
import { JSONRPCClient } from "json-rpc-2.0";
|
||||
import { castArray } from "lodash-es";
|
||||
|
||||
@@ -174,45 +175,6 @@ export interface XenApiMessage<T extends RawObjectType = RawObjectType>
|
||||
timestamp: string;
|
||||
}
|
||||
|
||||
export type XenApiAlarmType =
|
||||
| "cpu_usage"
|
||||
| "disk_usage"
|
||||
| "fs_usage"
|
||||
| "log_fs_usage"
|
||||
| "mem_usage"
|
||||
| "memory_free_kib"
|
||||
| "network_usage"
|
||||
| "physical_utilisation"
|
||||
| "sr_io_throughput_total_per_host";
|
||||
|
||||
export interface XenApiAlarm extends XenApiMessage {
|
||||
level: number;
|
||||
triggerLevel: number;
|
||||
type: XenApiAlarmType;
|
||||
}
|
||||
|
||||
export type RawTypeToRecord<T extends RawObjectType> = T extends "SR"
|
||||
? XenApiSr
|
||||
: T extends "VM"
|
||||
? XenApiVm
|
||||
: T extends "VM_guest_metrics"
|
||||
? XenApiVmGuestMetrics
|
||||
: T extends "VM_metrics"
|
||||
? XenApiVmMetrics
|
||||
: T extends "console"
|
||||
? XenApiConsole
|
||||
: T extends "host"
|
||||
? XenApiHost
|
||||
: T extends "host_metrics"
|
||||
? XenApiHostMetrics
|
||||
: T extends "message"
|
||||
? XenApiMessage
|
||||
: T extends "pool"
|
||||
? XenApiPool
|
||||
: T extends "task"
|
||||
? XenApiTask
|
||||
: never;
|
||||
|
||||
type WatchCallbackResult = {
|
||||
id: string;
|
||||
class: ObjectType;
|
||||
|
||||
@@ -1,33 +1,27 @@
|
||||
import type { XenApiAlarm } from "@/libs/xen-api";
|
||||
import { useXapiCollectionStore } from "@/stores/xapi-collection.store";
|
||||
import type {
|
||||
DeferExtension,
|
||||
Options,
|
||||
Subscription,
|
||||
XenApiRecordExtension,
|
||||
} from "@/types/subscription";
|
||||
import { createSubscribe } from "@/types/xapi-collection";
|
||||
import { defineStore } from "pinia";
|
||||
import { computed } from "vue";
|
||||
|
||||
type Extensions = [XenApiRecordExtension<XenApiAlarm>, DeferExtension];
|
||||
|
||||
export const useAlarmStore = defineStore("alarm", () => {
|
||||
const messageCollection = useXapiCollectionStore().get("message");
|
||||
|
||||
const subscribe = <O extends Options<Extensions>>(options?: O) => {
|
||||
const subscription = messageCollection.subscribe(options);
|
||||
const subscribe = createSubscribe<"message", []>((options) => {
|
||||
const originalSubscription = messageCollection.subscribe(options);
|
||||
|
||||
const extendedSubscription = {
|
||||
records: computed(() =>
|
||||
subscription.records.value.filter((record) => record.name === "alarm")
|
||||
originalSubscription.records.value.filter(
|
||||
(record) => record.name === "alarm"
|
||||
)
|
||||
),
|
||||
};
|
||||
|
||||
return {
|
||||
...subscription,
|
||||
...originalSubscription,
|
||||
...extendedSubscription,
|
||||
} as Subscription<Extensions, O>;
|
||||
};
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
...messageCollection,
|
||||
|
||||
@@ -1,88 +0,0 @@
|
||||
import { isHostRunning } from "@/libs/utils";
|
||||
import type {
|
||||
GRANULARITY,
|
||||
HostStats,
|
||||
XapiStatsResponse,
|
||||
} from "@/libs/xapi-stats";
|
||||
import type { XenApiHost, XenApiHostMetrics } from "@/libs/xen-api";
|
||||
import { useXenApiStore } from "@/stores/xen-api.store";
|
||||
import type {
|
||||
Extension,
|
||||
XenApiRecordExtension,
|
||||
XenApiRecordSubscription,
|
||||
} from "@/types/subscription";
|
||||
import type { PartialSubscription } from "@/types/subscription";
|
||||
import { computed } from "vue";
|
||||
import type { ComputedRef } from "vue";
|
||||
|
||||
type GetStatsExtension = Extension<{
|
||||
getStats: (
|
||||
hostUuid: XenApiHost["uuid"],
|
||||
granularity: GRANULARITY,
|
||||
ignoreExpired: boolean,
|
||||
opts: { abortSignal?: AbortSignal }
|
||||
) => Promise<XapiStatsResponse<HostStats> | undefined> | undefined;
|
||||
}>;
|
||||
|
||||
type RunningHostsExtension = Extension<
|
||||
{ runningHosts: ComputedRef<XenApiHost[]> },
|
||||
{ hostMetricsSubscription: XenApiRecordSubscription<XenApiHostMetrics> }
|
||||
>;
|
||||
|
||||
export type HostExtensions = [
|
||||
XenApiRecordExtension<XenApiHost>,
|
||||
GetStatsExtension,
|
||||
RunningHostsExtension
|
||||
];
|
||||
|
||||
export const getStatsSubscription = (
|
||||
hostSubscription: XenApiRecordSubscription<XenApiHost>
|
||||
): PartialSubscription<GetStatsExtension> => {
|
||||
const xenApiStore = useXenApiStore();
|
||||
|
||||
return {
|
||||
getStats: (
|
||||
hostUuid,
|
||||
granularity,
|
||||
ignoreExpired = false,
|
||||
{ abortSignal }
|
||||
) => {
|
||||
const host = hostSubscription.getByUuid(hostUuid);
|
||||
|
||||
if (host === undefined) {
|
||||
throw new Error(`Host ${hostUuid} could not be found.`);
|
||||
}
|
||||
|
||||
const xapiStats = xenApiStore.isConnected
|
||||
? xenApiStore.getXapiStats()
|
||||
: undefined;
|
||||
|
||||
return xapiStats?._getAndUpdateStats<HostStats>({
|
||||
abortSignal,
|
||||
host,
|
||||
ignoreExpired,
|
||||
uuid: host.uuid,
|
||||
granularity,
|
||||
});
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
export const runningHostsSubscription = (
|
||||
hostSubscription: XenApiRecordSubscription<XenApiHost>,
|
||||
hostMetricsSubscription:
|
||||
| XenApiRecordSubscription<XenApiHostMetrics>
|
||||
| undefined
|
||||
): PartialSubscription<RunningHostsExtension> | undefined => {
|
||||
if (hostMetricsSubscription === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return {
|
||||
runningHosts: computed(() =>
|
||||
hostSubscription.records.value.filter((host) =>
|
||||
isHostRunning(host, hostMetricsSubscription)
|
||||
)
|
||||
),
|
||||
};
|
||||
};
|
||||
@@ -1,28 +1,88 @@
|
||||
import { sortRecordsByNameLabel } from "@/libs/utils";
|
||||
import type { HostExtensions } from "@/stores/host.extension";
|
||||
import {
|
||||
getStatsSubscription,
|
||||
runningHostsSubscription,
|
||||
} from "@/stores/host.extension";
|
||||
import { isHostRunning, sortRecordsByNameLabel } from "@/libs/utils";
|
||||
import type {
|
||||
GRANULARITY,
|
||||
HostStats,
|
||||
XapiStatsResponse,
|
||||
} from "@/libs/xapi-stats";
|
||||
import type { XenApiHost } from "@/libs/xen-api";
|
||||
import { useXapiCollectionStore } from "@/stores/xapi-collection.store";
|
||||
import type { Options, Subscription } from "@/types/subscription";
|
||||
import { useXenApiStore } from "@/stores/xen-api.store";
|
||||
import type { Subscription } from "@/types/xapi-collection";
|
||||
import { createSubscribe } from "@/types/xapi-collection";
|
||||
import { defineStore } from "pinia";
|
||||
import { computed, type ComputedRef } from "vue";
|
||||
|
||||
type GetStats = (
|
||||
hostUuid: XenApiHost["uuid"],
|
||||
granularity: GRANULARITY,
|
||||
ignoreExpired: boolean,
|
||||
opts: { abortSignal?: AbortSignal }
|
||||
) => Promise<XapiStatsResponse<HostStats> | undefined> | undefined;
|
||||
|
||||
type GetStatsExtension = {
|
||||
getStats: GetStats;
|
||||
};
|
||||
|
||||
type RunningHostsExtension = [
|
||||
{ runningHosts: ComputedRef<XenApiHost[]> },
|
||||
{ hostMetricsSubscription: Subscription<"host_metrics", any> }
|
||||
];
|
||||
|
||||
type Extensions = [GetStatsExtension, RunningHostsExtension];
|
||||
|
||||
export const useHostStore = defineStore("host", () => {
|
||||
const xenApiStore = useXenApiStore();
|
||||
const hostCollection = useXapiCollectionStore().get("host");
|
||||
|
||||
hostCollection.setSort(sortRecordsByNameLabel);
|
||||
|
||||
const subscribe = <O extends Options<HostExtensions>>(options?: O) => {
|
||||
const subscription = hostCollection.subscribe(options);
|
||||
const { hostMetricsSubscription } = options ?? {};
|
||||
const subscribe = createSubscribe<"host", Extensions>((options) => {
|
||||
const originalSubscription = hostCollection.subscribe(options);
|
||||
|
||||
const getStats: GetStats = (
|
||||
hostUuid,
|
||||
granularity,
|
||||
ignoreExpired = false,
|
||||
{ abortSignal }
|
||||
) => {
|
||||
const host = originalSubscription.getByUuid(hostUuid);
|
||||
|
||||
if (host === undefined) {
|
||||
throw new Error(`Host ${hostUuid} could not be found.`);
|
||||
}
|
||||
|
||||
const xapiStats = xenApiStore.isConnected
|
||||
? xenApiStore.getXapiStats()
|
||||
: undefined;
|
||||
|
||||
return xapiStats?._getAndUpdateStats<HostStats>({
|
||||
abortSignal,
|
||||
host,
|
||||
ignoreExpired,
|
||||
uuid: host.uuid,
|
||||
granularity,
|
||||
});
|
||||
};
|
||||
|
||||
const extendedSubscription = {
|
||||
getStats,
|
||||
};
|
||||
|
||||
const hostMetricsSubscription = options?.hostMetricsSubscription;
|
||||
|
||||
const runningHostsSubscription = hostMetricsSubscription !== undefined && {
|
||||
runningHosts: computed(() =>
|
||||
originalSubscription.records.value.filter((host) =>
|
||||
isHostRunning(host, hostMetricsSubscription)
|
||||
)
|
||||
),
|
||||
};
|
||||
return {
|
||||
...subscription,
|
||||
...getStatsSubscription(subscription),
|
||||
...runningHostsSubscription(subscription, hostMetricsSubscription),
|
||||
} as Subscription<HostExtensions, O>;
|
||||
};
|
||||
...originalSubscription,
|
||||
...extendedSubscription,
|
||||
...runningHostsSubscription,
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
...hostCollection,
|
||||
|
||||
@@ -1,36 +1,31 @@
|
||||
import { getFirst } from "@/libs/utils";
|
||||
import type { XenApiPool } from "@/libs/xen-api";
|
||||
import { useXapiCollectionStore } from "@/stores/xapi-collection.store";
|
||||
import type {
|
||||
Extension,
|
||||
Options,
|
||||
PartialSubscription,
|
||||
Subscription,
|
||||
XenApiRecordExtension,
|
||||
} from "@/types/subscription";
|
||||
import { createSubscribe } from "@/types/xapi-collection";
|
||||
import { defineStore } from "pinia";
|
||||
import { computed, type ComputedRef } from "vue";
|
||||
|
||||
type PoolExtension = Extension<{
|
||||
type PoolExtension = {
|
||||
pool: ComputedRef<XenApiPool | undefined>;
|
||||
}>;
|
||||
};
|
||||
|
||||
type Extensions = [XenApiRecordExtension<XenApiPool>, PoolExtension];
|
||||
type Extensions = [PoolExtension];
|
||||
|
||||
export const usePoolStore = defineStore("pool", () => {
|
||||
const poolCollection = useXapiCollectionStore().get("pool");
|
||||
const subscribe = <O extends Options<Extensions>>(options?: O) => {
|
||||
const subscription = poolCollection.subscribe(options);
|
||||
|
||||
const extendedSubscription: PartialSubscription<PoolExtension> = {
|
||||
pool: computed(() => getFirst(subscription.records.value)),
|
||||
const subscribe = createSubscribe<"pool", Extensions>((options) => {
|
||||
const originalSubscription = poolCollection.subscribe(options);
|
||||
|
||||
const extendedSubscription = {
|
||||
pool: computed(() => getFirst(originalSubscription.records.value)),
|
||||
};
|
||||
|
||||
return {
|
||||
...subscription,
|
||||
...originalSubscription,
|
||||
...extendedSubscription,
|
||||
} as Subscription<Extensions, O>;
|
||||
};
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
...poolCollection,
|
||||
|
||||
@@ -1,56 +0,0 @@
|
||||
import useArrayRemovedItemsHistory from "@/composables/array-removed-items-history.composable";
|
||||
import useCollectionFilter from "@/composables/collection-filter.composable";
|
||||
import useCollectionSorter from "@/composables/collection-sorter.composable";
|
||||
import useFilteredCollection from "@/composables/filtered-collection.composable";
|
||||
import useSortedCollection from "@/composables/sorted-collection.composable";
|
||||
import type { XenApiTask } from "@/libs/xen-api";
|
||||
import type {
|
||||
Extension,
|
||||
PartialSubscription,
|
||||
XenApiRecordExtension,
|
||||
XenApiRecordSubscription,
|
||||
} from "@/types/subscription";
|
||||
import type { ComputedRef, Ref } from "vue";
|
||||
|
||||
type AdditionalTasksExtension = Extension<{
|
||||
pendingTasks: ComputedRef<XenApiTask[]>;
|
||||
finishedTasks: Ref<XenApiTask[]>;
|
||||
}>;
|
||||
|
||||
export type TaskExtensions = [
|
||||
XenApiRecordExtension<XenApiTask>,
|
||||
AdditionalTasksExtension
|
||||
];
|
||||
|
||||
export const additionalTasksSubscription = (
|
||||
taskSubscription: XenApiRecordSubscription<XenApiTask>
|
||||
): PartialSubscription<AdditionalTasksExtension> => {
|
||||
const { compareFn } = useCollectionSorter<XenApiTask>({
|
||||
initialSorts: ["-created"],
|
||||
});
|
||||
|
||||
const { predicate } = useCollectionFilter({
|
||||
initialFilters: [
|
||||
"!name_label:|(SR.scan host.call_plugin)",
|
||||
"status:pending",
|
||||
],
|
||||
});
|
||||
|
||||
const sortedTasks = useSortedCollection(taskSubscription.records, compareFn);
|
||||
|
||||
return {
|
||||
pendingTasks: useFilteredCollection<XenApiTask>(sortedTasks, predicate),
|
||||
finishedTasks: useArrayRemovedItemsHistory(
|
||||
sortedTasks,
|
||||
(task) => task.uuid,
|
||||
{
|
||||
limit: 50,
|
||||
onRemove: (tasks) =>
|
||||
tasks.map((task) => ({
|
||||
...task,
|
||||
finished: new Date().toISOString(),
|
||||
})),
|
||||
}
|
||||
),
|
||||
};
|
||||
};
|
||||
@@ -1,22 +1,64 @@
|
||||
import {
|
||||
additionalTasksSubscription,
|
||||
type TaskExtensions,
|
||||
} from "@/stores/task.extension";
|
||||
import useArrayRemovedItemsHistory from "@/composables/array-removed-items-history.composable";
|
||||
import useCollectionFilter from "@/composables/collection-filter.composable";
|
||||
import useCollectionSorter from "@/composables/collection-sorter.composable";
|
||||
import useFilteredCollection from "@/composables/filtered-collection.composable";
|
||||
import useSortedCollection from "@/composables/sorted-collection.composable";
|
||||
import type { XenApiTask } from "@/libs/xen-api";
|
||||
import { useXapiCollectionStore } from "@/stores/xapi-collection.store";
|
||||
import type { Options, Subscription } from "@/types/subscription";
|
||||
import { createSubscribe } from "@/types/xapi-collection";
|
||||
import { defineStore } from "pinia";
|
||||
import type { ComputedRef, Ref } from "vue";
|
||||
|
||||
type PendingTasksExtension = {
|
||||
pendingTasks: ComputedRef<XenApiTask[]>;
|
||||
};
|
||||
|
||||
type FinishedTasksExtension = {
|
||||
finishedTasks: Ref<XenApiTask[]>;
|
||||
};
|
||||
|
||||
type Extensions = [PendingTasksExtension, FinishedTasksExtension];
|
||||
|
||||
export const useTaskStore = defineStore("task", () => {
|
||||
const tasksCollection = useXapiCollectionStore().get("task");
|
||||
|
||||
const subscribe = <O extends Options<TaskExtensions>>(options?: O) => {
|
||||
const subscription = tasksCollection.subscribe(options);
|
||||
const subscribe = createSubscribe<"task", Extensions>(() => {
|
||||
const subscription = tasksCollection.subscribe();
|
||||
|
||||
const { compareFn } = useCollectionSorter<XenApiTask>({
|
||||
initialSorts: ["-created"],
|
||||
});
|
||||
|
||||
const sortedTasks = useSortedCollection(subscription.records, compareFn);
|
||||
|
||||
const { predicate } = useCollectionFilter({
|
||||
initialFilters: [
|
||||
"!name_label:|(SR.scan host.call_plugin)",
|
||||
"status:pending",
|
||||
],
|
||||
});
|
||||
|
||||
const extendedSubscription = {
|
||||
pendingTasks: useFilteredCollection<XenApiTask>(sortedTasks, predicate),
|
||||
finishedTasks: useArrayRemovedItemsHistory(
|
||||
sortedTasks,
|
||||
(task) => task.uuid,
|
||||
{
|
||||
limit: 50,
|
||||
onRemove: (tasks) =>
|
||||
tasks.map((task) => ({
|
||||
...task,
|
||||
finished: new Date().toISOString(),
|
||||
})),
|
||||
}
|
||||
),
|
||||
};
|
||||
|
||||
return {
|
||||
...subscription,
|
||||
...additionalTasksSubscription(subscription),
|
||||
} as Subscription<TaskExtensions, O>;
|
||||
};
|
||||
...extendedSubscription,
|
||||
};
|
||||
});
|
||||
|
||||
return { ...tasksCollection, subscribe };
|
||||
});
|
||||
|
||||
@@ -1,108 +0,0 @@
|
||||
import type {
|
||||
GRANULARITY,
|
||||
VmStats,
|
||||
XapiStatsResponse,
|
||||
} from "@/libs/xapi-stats";
|
||||
import { POWER_STATE, type XenApiHost, type XenApiVm } from "@/libs/xen-api";
|
||||
import { useXenApiStore } from "@/stores/xen-api.store";
|
||||
import type {
|
||||
Extension,
|
||||
PartialSubscription,
|
||||
XenApiRecordExtension,
|
||||
XenApiRecordSubscription,
|
||||
} from "@/types/subscription";
|
||||
import { computed, type ComputedRef } from "vue";
|
||||
|
||||
type RecordsByHostRefExtension = Extension<{
|
||||
recordsByHostRef: ComputedRef<Map<XenApiHost["$ref"], XenApiVm[]>>;
|
||||
}>;
|
||||
|
||||
type RunningVmsExtension = Extension<{
|
||||
runningVms: ComputedRef<XenApiVm[]>;
|
||||
}>;
|
||||
|
||||
type GetStatsExtension = Extension<
|
||||
{
|
||||
getStats: (
|
||||
id: XenApiVm["uuid"],
|
||||
granularity: GRANULARITY,
|
||||
ignoreExpired: boolean,
|
||||
opts: { abortSignal?: AbortSignal }
|
||||
) => Promise<XapiStatsResponse<VmStats> | undefined> | undefined;
|
||||
},
|
||||
{ hostSubscription: XenApiRecordSubscription<XenApiHost> }
|
||||
>;
|
||||
|
||||
export type VmExtensions = [
|
||||
XenApiRecordExtension<XenApiVm>,
|
||||
RecordsByHostRefExtension,
|
||||
RunningVmsExtension,
|
||||
GetStatsExtension
|
||||
];
|
||||
|
||||
export const recordsByHostRefSubscription = (
|
||||
vmSubscription: XenApiRecordSubscription<XenApiVm>
|
||||
): PartialSubscription<RecordsByHostRefExtension> => ({
|
||||
recordsByHostRef: computed(() => {
|
||||
const vmsByHostOpaqueRef = new Map<XenApiHost["$ref"], XenApiVm[]>();
|
||||
|
||||
vmSubscription.records.value.forEach((vm) => {
|
||||
if (!vmsByHostOpaqueRef.has(vm.resident_on)) {
|
||||
vmsByHostOpaqueRef.set(vm.resident_on, []);
|
||||
}
|
||||
|
||||
vmsByHostOpaqueRef.get(vm.resident_on)?.push(vm);
|
||||
});
|
||||
|
||||
return vmsByHostOpaqueRef;
|
||||
}),
|
||||
});
|
||||
|
||||
export const runningVmsSubscription = (
|
||||
vmSubscription: XenApiRecordSubscription<XenApiVm>
|
||||
): PartialSubscription<RunningVmsExtension> => ({
|
||||
runningVms: computed(() =>
|
||||
vmSubscription.records.value.filter(
|
||||
(vm) => vm.power_state === POWER_STATE.RUNNING
|
||||
)
|
||||
),
|
||||
});
|
||||
|
||||
export const getStatsSubscription = (
|
||||
vmSubscription: XenApiRecordSubscription<XenApiVm>,
|
||||
hostSubscription: XenApiRecordSubscription<XenApiHost> | undefined
|
||||
): PartialSubscription<GetStatsExtension> | undefined => {
|
||||
if (hostSubscription === undefined) {
|
||||
return;
|
||||
}
|
||||
|
||||
return {
|
||||
getStats: (id, granularity, ignoreExpired = false, { abortSignal }) => {
|
||||
const xenApiStore = useXenApiStore();
|
||||
|
||||
if (!xenApiStore.isConnected) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const vm = vmSubscription.getByUuid(id);
|
||||
|
||||
if (vm === undefined) {
|
||||
throw new Error(`VM ${id} could not be found.`);
|
||||
}
|
||||
|
||||
const host = hostSubscription.getByOpaqueRef(vm.resident_on);
|
||||
|
||||
if (host === undefined) {
|
||||
throw new Error(`VM ${id} is halted or host could not be found.`);
|
||||
}
|
||||
|
||||
return xenApiStore.getXapiStats()._getAndUpdateStats<VmStats>({
|
||||
abortSignal,
|
||||
host,
|
||||
ignoreExpired,
|
||||
uuid: vm.uuid,
|
||||
granularity,
|
||||
});
|
||||
},
|
||||
};
|
||||
};
|
||||
@@ -1,13 +1,36 @@
|
||||
import { sortRecordsByNameLabel } from "@/libs/utils";
|
||||
import {
|
||||
getStatsSubscription,
|
||||
recordsByHostRefSubscription,
|
||||
runningVmsSubscription,
|
||||
type VmExtensions,
|
||||
} from "@/stores/vm.extension";
|
||||
import type {
|
||||
GRANULARITY,
|
||||
VmStats,
|
||||
XapiStatsResponse,
|
||||
} from "@/libs/xapi-stats";
|
||||
import type { XenApiHost, XenApiVm } from "@/libs/xen-api";
|
||||
import { POWER_STATE } from "@/libs/xen-api";
|
||||
import { useXapiCollectionStore } from "@/stores/xapi-collection.store";
|
||||
import type { Options, Subscription } from "@/types/subscription";
|
||||
import { useXenApiStore } from "@/stores/xen-api.store";
|
||||
import { createSubscribe, type Subscription } from "@/types/xapi-collection";
|
||||
import { defineStore } from "pinia";
|
||||
import { computed, type ComputedRef } from "vue";
|
||||
|
||||
type GetStats = (
|
||||
id: XenApiVm["uuid"],
|
||||
granularity: GRANULARITY,
|
||||
ignoreExpired: boolean,
|
||||
opts: { abortSignal?: AbortSignal }
|
||||
) => Promise<XapiStatsResponse<VmStats> | undefined> | undefined;
|
||||
type DefaultExtension = {
|
||||
recordsByHostRef: ComputedRef<Map<XenApiHost["$ref"], XenApiVm[]>>;
|
||||
runningVms: ComputedRef<XenApiVm[]>;
|
||||
};
|
||||
|
||||
type GetStatsExtension = [
|
||||
{
|
||||
getStats: GetStats;
|
||||
},
|
||||
{ hostSubscription: Subscription<"host", object> }
|
||||
];
|
||||
|
||||
type Extensions = [DefaultExtension, GetStatsExtension];
|
||||
|
||||
export const useVmStore = defineStore("vm", () => {
|
||||
const vmCollection = useXapiCollectionStore().get("VM");
|
||||
@@ -18,16 +41,82 @@ export const useVmStore = defineStore("vm", () => {
|
||||
|
||||
vmCollection.setSort(sortRecordsByNameLabel);
|
||||
|
||||
const subscribe = <O extends Options<VmExtensions>>(options?: O) => {
|
||||
const subscription = vmCollection.subscribe(options);
|
||||
const subscribe = createSubscribe<"VM", Extensions>((options) => {
|
||||
const originalSubscription = vmCollection.subscribe(options);
|
||||
|
||||
const extendedSubscription = {
|
||||
recordsByHostRef: computed(() => {
|
||||
const vmsByHostOpaqueRef = new Map<XenApiHost["$ref"], XenApiVm[]>();
|
||||
|
||||
originalSubscription.records.value.forEach((vm) => {
|
||||
if (!vmsByHostOpaqueRef.has(vm.resident_on)) {
|
||||
vmsByHostOpaqueRef.set(vm.resident_on, []);
|
||||
}
|
||||
|
||||
vmsByHostOpaqueRef.get(vm.resident_on)?.push(vm);
|
||||
});
|
||||
|
||||
return vmsByHostOpaqueRef;
|
||||
}),
|
||||
runningVms: computed(() =>
|
||||
originalSubscription.records.value.filter(
|
||||
(vm) => vm.power_state === POWER_STATE.RUNNING
|
||||
)
|
||||
),
|
||||
};
|
||||
|
||||
const hostSubscription = options?.hostSubscription;
|
||||
|
||||
const getStatsSubscription:
|
||||
| {
|
||||
getStats: GetStats;
|
||||
}
|
||||
| undefined =
|
||||
hostSubscription !== undefined
|
||||
? {
|
||||
getStats: (
|
||||
id,
|
||||
granularity,
|
||||
ignoreExpired = false,
|
||||
{ abortSignal }
|
||||
) => {
|
||||
const xenApiStore = useXenApiStore();
|
||||
|
||||
if (!xenApiStore.isConnected) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const vm = originalSubscription.getByUuid(id);
|
||||
|
||||
if (vm === undefined) {
|
||||
throw new Error(`VM ${id} could not be found.`);
|
||||
}
|
||||
|
||||
const host = hostSubscription.getByOpaqueRef(vm.resident_on);
|
||||
|
||||
if (host === undefined) {
|
||||
throw new Error(
|
||||
`VM ${id} is halted or host could not be found.`
|
||||
);
|
||||
}
|
||||
|
||||
return xenApiStore.getXapiStats()._getAndUpdateStats<VmStats>({
|
||||
abortSignal,
|
||||
host,
|
||||
ignoreExpired,
|
||||
uuid: vm.uuid,
|
||||
granularity,
|
||||
});
|
||||
},
|
||||
}
|
||||
: undefined;
|
||||
|
||||
return {
|
||||
...subscription,
|
||||
...recordsByHostRefSubscription(subscription),
|
||||
...runningVmsSubscription(subscription),
|
||||
...getStatsSubscription(subscription, options?.hostSubscription),
|
||||
} as Subscription<VmExtensions, O>;
|
||||
};
|
||||
...originalSubscription,
|
||||
...extendedSubscription,
|
||||
...getStatsSubscription,
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
...vmCollection,
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
import type { RawObjectType, RawTypeToRecord } from "@/libs/xen-api";
|
||||
import type { RawObjectType } from "@/libs/xen-api";
|
||||
import { useXenApiStore } from "@/stores/xen-api.store";
|
||||
import type {
|
||||
DeferExtension,
|
||||
Options,
|
||||
RawTypeToRecord,
|
||||
SubscribeOptions,
|
||||
Subscription,
|
||||
XenApiRecordExtension,
|
||||
} from "@/types/subscription";
|
||||
} from "@/types/xapi-collection";
|
||||
import { tryOnUnmounted, whenever } from "@vueuse/core";
|
||||
import { defineStore } from "pinia";
|
||||
import { computed, readonly, ref } from "vue";
|
||||
@@ -15,7 +14,7 @@ export const useXapiCollectionStore = defineStore("xapiCollection", () => {
|
||||
|
||||
function get<T extends RawObjectType>(
|
||||
type: T
|
||||
): ReturnType<typeof createXapiCollection<T>> {
|
||||
): ReturnType<typeof createXapiCollection<T, RawTypeToRecord<T>>> {
|
||||
if (!collections.value.has(type)) {
|
||||
collections.value.set(type, createXapiCollection(type));
|
||||
}
|
||||
@@ -28,7 +27,7 @@ export const useXapiCollectionStore = defineStore("xapiCollection", () => {
|
||||
|
||||
const createXapiCollection = <
|
||||
T extends RawObjectType,
|
||||
R extends RawTypeToRecord<T> = RawTypeToRecord<T>
|
||||
R extends RawTypeToRecord<T>
|
||||
>(
|
||||
type: T
|
||||
) => {
|
||||
@@ -107,11 +106,9 @@ const createXapiCollection = <
|
||||
() => fetchAll()
|
||||
);
|
||||
|
||||
type Extensions = [XenApiRecordExtension<R>, DeferExtension];
|
||||
|
||||
function subscribe<O extends Options<Extensions>>(
|
||||
function subscribe<O extends SubscribeOptions<any>>(
|
||||
options?: O
|
||||
): Subscription<Extensions, O> {
|
||||
): Subscription<T, O> {
|
||||
const id = Symbol();
|
||||
|
||||
tryOnUnmounted(() => {
|
||||
@@ -134,14 +131,14 @@ const createXapiCollection = <
|
||||
|
||||
if (options?.immediate !== false) {
|
||||
start();
|
||||
return subscription as Subscription<Extensions, O>;
|
||||
return subscription as unknown as Subscription<T, O>;
|
||||
}
|
||||
|
||||
return {
|
||||
...subscription,
|
||||
start,
|
||||
isStarted: computed(() => subscriptions.value.has(id)),
|
||||
} as Subscription<Extensions, O>;
|
||||
} as unknown as Subscription<T, O>;
|
||||
}
|
||||
|
||||
const unsubscribe = (id: symbol) => subscriptions.value.delete(id);
|
||||
|
||||
@@ -1,74 +0,0 @@
|
||||
import type { XenApiRecord } from "@/libs/xen-api";
|
||||
import type { ComputedRef, Ref } from "vue";
|
||||
|
||||
type SimpleExtension<Value extends object> = { type: "simple"; value: Value };
|
||||
|
||||
type ConditionalExtension<Value extends object, Condition extends object> = {
|
||||
type: "conditional";
|
||||
value: Value;
|
||||
condition: Condition;
|
||||
};
|
||||
|
||||
type UnpackExtension<E, Options> = E extends SimpleExtension<infer Value>
|
||||
? Value
|
||||
: E extends ConditionalExtension<infer Value, infer Condition>
|
||||
? Options extends Condition
|
||||
? Value
|
||||
: object
|
||||
: object;
|
||||
|
||||
export type Extension<
|
||||
Value extends object,
|
||||
Condition extends object | undefined = undefined
|
||||
> = Condition extends object
|
||||
? ConditionalExtension<Value, Condition>
|
||||
: SimpleExtension<Value>;
|
||||
|
||||
export type Options<Extensions extends any[]> = Extensions extends [
|
||||
infer First,
|
||||
...infer Rest
|
||||
]
|
||||
? First extends ConditionalExtension<any, infer Condition>
|
||||
? Rest extends any[]
|
||||
? Partial<Condition> & Options<Rest>
|
||||
: Partial<Condition>
|
||||
: Rest extends any[]
|
||||
? Options<Rest>
|
||||
: object
|
||||
: object;
|
||||
|
||||
export type Subscription<
|
||||
Extensions extends any[],
|
||||
Options extends object
|
||||
> = Extensions extends [infer First, ...infer Rest]
|
||||
? UnpackExtension<First, Options> & Subscription<Rest, Options>
|
||||
: object;
|
||||
|
||||
export type PartialSubscription<E> = E extends SimpleExtension<infer Value>
|
||||
? Value
|
||||
: E extends ConditionalExtension<infer Value, any>
|
||||
? Value
|
||||
: never;
|
||||
|
||||
export type XenApiRecordExtension<T extends XenApiRecord<any>> = Extension<{
|
||||
records: ComputedRef<T[]>;
|
||||
getByOpaqueRef: (opaqueRef: T["$ref"]) => T | undefined;
|
||||
getByUuid: (uuid: T["uuid"]) => T | undefined;
|
||||
hasUuid: (uuid: T["uuid"]) => boolean;
|
||||
isReady: Readonly<Ref<boolean>>;
|
||||
isFetching: Readonly<Ref<boolean>>;
|
||||
isReloading: ComputedRef<boolean>;
|
||||
hasError: ComputedRef<boolean>;
|
||||
lastError: Readonly<Ref<string | undefined>>;
|
||||
}>;
|
||||
|
||||
export type DeferExtension = Extension<
|
||||
{
|
||||
start: () => void;
|
||||
isStarted: ComputedRef<boolean>;
|
||||
},
|
||||
{ immediate: false }
|
||||
>;
|
||||
|
||||
export type XenApiRecordSubscription<T extends XenApiRecord<any>> =
|
||||
PartialSubscription<XenApiRecordExtension<T>>;
|
||||
108
@xen-orchestra/lite/src/types/xapi-collection.ts
Normal file
108
@xen-orchestra/lite/src/types/xapi-collection.ts
Normal file
@@ -0,0 +1,108 @@
|
||||
import type {
|
||||
RawObjectType,
|
||||
XenApiConsole,
|
||||
XenApiHost,
|
||||
XenApiHostMetrics,
|
||||
XenApiMessage,
|
||||
XenApiPool,
|
||||
XenApiSr,
|
||||
XenApiTask,
|
||||
XenApiVm,
|
||||
XenApiVmGuestMetrics,
|
||||
XenApiVmMetrics,
|
||||
} from "@/libs/xen-api";
|
||||
import type { ComputedRef, Ref } from "vue";
|
||||
|
||||
type DefaultExtension<
|
||||
T extends RawObjectType,
|
||||
R extends RawTypeToRecord<T> = RawTypeToRecord<T>
|
||||
> = {
|
||||
records: ComputedRef<R[]>;
|
||||
getByOpaqueRef: (opaqueRef: R["$ref"]) => R | undefined;
|
||||
getByUuid: (uuid: R["uuid"]) => R | undefined;
|
||||
hasUuid: (uuid: R["uuid"]) => boolean;
|
||||
isReady: Readonly<Ref<boolean>>;
|
||||
isFetching: Readonly<Ref<boolean>>;
|
||||
isReloading: ComputedRef<boolean>;
|
||||
hasError: ComputedRef<boolean>;
|
||||
lastError: Readonly<Ref<string | undefined>>;
|
||||
};
|
||||
|
||||
type DeferExtension = [
|
||||
{
|
||||
start: () => void;
|
||||
isStarted: ComputedRef<boolean>;
|
||||
},
|
||||
{ immediate: false }
|
||||
];
|
||||
|
||||
type DefaultExtensions<T extends RawObjectType> = [
|
||||
DefaultExtension<T>,
|
||||
DeferExtension
|
||||
];
|
||||
|
||||
type GenerateSubscribeOptions<Extensions extends any[]> = Extensions extends [
|
||||
infer FirstExtension,
|
||||
...infer RestExtension
|
||||
]
|
||||
? FirstExtension extends [object, infer FirstCondition]
|
||||
? FirstCondition & GenerateSubscribeOptions<RestExtension>
|
||||
: GenerateSubscribeOptions<RestExtension>
|
||||
: object;
|
||||
|
||||
export type SubscribeOptions<Extensions extends any[]> = Partial<
|
||||
GenerateSubscribeOptions<Extensions> &
|
||||
GenerateSubscribeOptions<DefaultExtensions<any>>
|
||||
>;
|
||||
|
||||
type GenerateSubscription<
|
||||
Options extends object,
|
||||
Extensions extends any[]
|
||||
> = Extensions extends [infer FirstExtension, ...infer RestExtension]
|
||||
? FirstExtension extends [infer FirstObject, infer FirstCondition]
|
||||
? Options extends FirstCondition
|
||||
? FirstObject & GenerateSubscription<Options, RestExtension>
|
||||
: GenerateSubscription<Options, RestExtension>
|
||||
: FirstExtension & GenerateSubscription<Options, RestExtension>
|
||||
: object;
|
||||
|
||||
export type Subscription<
|
||||
T extends RawObjectType,
|
||||
Options extends object,
|
||||
Extensions extends any[] = []
|
||||
> = GenerateSubscription<Options, Extensions> &
|
||||
GenerateSubscription<Options, DefaultExtensions<T>>;
|
||||
|
||||
export function createSubscribe<
|
||||
T extends RawObjectType,
|
||||
Extensions extends any[],
|
||||
Options extends object = SubscribeOptions<Extensions>
|
||||
>(builder: (options?: Options) => Subscription<T, Options, Extensions>) {
|
||||
return function subscribe<O extends Options>(
|
||||
options?: O
|
||||
): Subscription<T, O, Extensions> {
|
||||
return builder(options);
|
||||
};
|
||||
}
|
||||
|
||||
export type RawTypeToRecord<T extends RawObjectType> = T extends "SR"
|
||||
? XenApiSr
|
||||
: T extends "VM"
|
||||
? XenApiVm
|
||||
: T extends "VM_guest_metrics"
|
||||
? XenApiVmGuestMetrics
|
||||
: T extends "VM_metrics"
|
||||
? XenApiVmMetrics
|
||||
: T extends "console"
|
||||
? XenApiConsole
|
||||
: T extends "host"
|
||||
? XenApiHost
|
||||
: T extends "host_metrics"
|
||||
? XenApiHostMetrics
|
||||
: T extends "message"
|
||||
? XenApiMessage
|
||||
: T extends "pool"
|
||||
? XenApiPool
|
||||
: T extends "task"
|
||||
? XenApiTask
|
||||
: never;
|
||||
@@ -21,7 +21,6 @@ import { useI18n } from "vue-i18n";
|
||||
|
||||
const { pendingTasks, finishedTasks, isReady, hasError } =
|
||||
useTaskStore().subscribe();
|
||||
|
||||
const { t } = useI18n();
|
||||
|
||||
const titleStore = usePageTitleStore();
|
||||
|
||||
@@ -43,7 +43,7 @@
|
||||
"app-conf": "^2.3.0",
|
||||
"async-iterator-to-stream": "^1.1.0",
|
||||
"fs-extra": "^11.1.0",
|
||||
"get-stream": "^6.0.0",
|
||||
"get-stream": "^7.0.1",
|
||||
"getopts": "^2.2.3",
|
||||
"golike-defer": "^0.5.1",
|
||||
"http-server-plus": "^1.0.0",
|
||||
|
||||
@@ -28,7 +28,7 @@
|
||||
"@vates/nbd-client": "^2.0.0",
|
||||
"@xen-orchestra/async-map": "^0.1.2",
|
||||
"@xen-orchestra/log": "^0.6.0",
|
||||
"d3-time-format": "^3.0.0",
|
||||
"d3-time-format": "^4.1.0",
|
||||
"golike-defer": "^0.5.1",
|
||||
"http-request-plus": "^1.0.0",
|
||||
"json-rpc-protocol": "^0.13.2",
|
||||
|
||||
@@ -6,7 +6,7 @@ import { decorateClass } from '@vates/decorate-with'
|
||||
import { strict as assert } from 'node:assert'
|
||||
|
||||
import extractOpaqueRef from './_extractOpaqueRef.mjs'
|
||||
import NbdClient from '@vates/nbd-client'
|
||||
import NbdClient from '@vates/nbd-client/client.mjs'
|
||||
import { createNbdRawStream, createNbdVhdStream } from 'vhd-lib/createStreamNbd.js'
|
||||
import { VDI_FORMAT_RAW, VDI_FORMAT_VHD } from './index.mjs'
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
> Users must be able to say: “I had this issue, happy to know it's fixed”
|
||||
|
||||
- [LDAP] Mark the _Id attribute_ setting as required
|
||||
- [Incremental Replication] Fix `TypeError: Cannot read properties of undefined (reading 'uuid') at #isAlreadyOnHealthCheckSr` [Forum#7492](https://xcp-ng.org/forum/topic/7492) (PR [#6969](https://github.com/vatesfr/xen-orchestra/pull/6969))
|
||||
|
||||
### Packages to release
|
||||
|
||||
@@ -29,6 +30,8 @@
|
||||
|
||||
<!--packages-start-->
|
||||
|
||||
- @xen-orchestra/backups patch
|
||||
- xen-api patch
|
||||
- xo-server patch
|
||||
- xo-server-auth-ldap patch
|
||||
- xo-web patch
|
||||
|
||||
@@ -25,7 +25,7 @@
|
||||
"jest": "^29.0.3",
|
||||
"lint-staged": "^13.0.3",
|
||||
"lodash": "^4.17.4",
|
||||
"prettier": "^2.0.5",
|
||||
"prettier": "^3.0.1",
|
||||
"promise-toolbox": "^0.21.0",
|
||||
"semver": "^7.3.6",
|
||||
"sorted-object": "^2.0.1",
|
||||
@@ -98,7 +98,7 @@
|
||||
"prettify": "prettier --ignore-path .gitignore --ignore-unknown --write .",
|
||||
"test": "npm run test-lint && npm run test-unit",
|
||||
"test-integration": "jest \".integ\\.spec\\.js$\" && scripts/run-script.js --parallel test-integration",
|
||||
"test-lint": "eslint --ignore-path .gitignore --ignore-pattern packages/xo-web .",
|
||||
"test-lint": "eslint --ignore-path .gitignore --ignore-pattern @xen-orchestra/lite --ignore-pattern packages/xo-web .",
|
||||
"test-unit": "jest \"^(?!.*\\.integ\\.spec\\.js$)\" && scripts/run-script.js --bail test"
|
||||
},
|
||||
"workspaces": [
|
||||
|
||||
@@ -419,7 +419,7 @@ export class Xapi extends EventEmitter {
|
||||
signal: $cancelToken,
|
||||
}),
|
||||
{
|
||||
when: error => error.response.statusCode === 302,
|
||||
when: error => error.response !== undefined && error.response.statusCode === 302,
|
||||
onRetry: async error => {
|
||||
const response = error.response
|
||||
if (response === undefined) {
|
||||
|
||||
@@ -35,7 +35,7 @@
|
||||
"ensure-array": "^1.0.0",
|
||||
"exec-promise": "^0.7.0",
|
||||
"inquirer": "^8.0.0",
|
||||
"ldapts": "^4.1.0",
|
||||
"ldapts": "^6.0.0",
|
||||
"promise-toolbox": "^0.21.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
||||
@@ -76,7 +76,7 @@
|
||||
"fast-xml-parser": "^4.0.0",
|
||||
"fatfs": "^0.10.4",
|
||||
"fs-extra": "^11.1.0",
|
||||
"get-stream": "^6.0.0",
|
||||
"get-stream": "^7.0.1",
|
||||
"golike-defer": "^0.5.1",
|
||||
"hashy": "^0.11.1",
|
||||
"helmet": "^3.9.0",
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import * as multiparty from 'multiparty'
|
||||
import assert from 'assert'
|
||||
import getStream from 'get-stream'
|
||||
import hrp from 'http-request-plus'
|
||||
import { createLogger } from '@xen-orchestra/log'
|
||||
import { defer } from 'golike-defer'
|
||||
import { format, JsonRpcError } from 'json-rpc-peer'
|
||||
import { getStreamAsBuffer } from 'get-stream'
|
||||
import { invalidParameters, noSuchObject } from 'xo-common/api-errors.js'
|
||||
import { pipeline } from 'stream'
|
||||
import { peekFooterFromVhdStream } from 'vhd-lib'
|
||||
@@ -187,7 +187,7 @@ async function handleImport(req, res, { type, name, description, vmdkData, srId,
|
||||
if (part.name !== 'file') {
|
||||
promises.push(
|
||||
(async () => {
|
||||
const buffer = await getStream.buffer(part)
|
||||
const buffer = await getStreamAsBuffer(part)
|
||||
vmdkData[part.name] = new Uint32Array(
|
||||
buffer.buffer,
|
||||
buffer.byteOffset,
|
||||
|
||||
@@ -80,7 +80,7 @@ set.params = {
|
||||
},
|
||||
shareByDefault: {
|
||||
type: 'boolean',
|
||||
optional: true
|
||||
optional: true,
|
||||
},
|
||||
subjects: {
|
||||
type: 'array',
|
||||
|
||||
@@ -4,12 +4,12 @@ import { asyncEach } from '@vates/async-each'
|
||||
import asyncMapSettled from '@xen-orchestra/async-map/legacy.js'
|
||||
import { Task } from '@xen-orchestra/mixins/Tasks.mjs'
|
||||
import concat from 'lodash/concat.js'
|
||||
import getStream from 'get-stream'
|
||||
import hrp from 'http-request-plus'
|
||||
import { createLogger } from '@xen-orchestra/log'
|
||||
import { defer } from 'golike-defer'
|
||||
import { format } from 'json-rpc-peer'
|
||||
import { FAIL_ON_QUEUE } from 'limit-concurrency-decorator'
|
||||
import { getStreamAsBuffer } from 'get-stream'
|
||||
import { ignoreErrors } from 'promise-toolbox'
|
||||
import { invalidParameters, noSuchObject, operationFailed, unauthorized } from 'xo-common/api-errors.js'
|
||||
import { Ref } from 'xen-api'
|
||||
@@ -1224,7 +1224,7 @@ async function handleVmImport(req, res, { data, srId, type, xapi }) {
|
||||
if (!(part.filename in tables)) {
|
||||
tables[part.filename] = {}
|
||||
}
|
||||
const buffer = await getStream.buffer(part)
|
||||
const buffer = await getStreamAsBuffer(part)
|
||||
tables[part.filename][part.name] = new Uint32Array(
|
||||
buffer.buffer,
|
||||
buffer.byteOffset,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import * as CM from 'complex-matcher'
|
||||
import getStream from 'get-stream'
|
||||
import { fromCallback } from 'promise-toolbox'
|
||||
import { getStreamAsBuffer } from 'get-stream'
|
||||
import { pipeline } from 'readable-stream'
|
||||
|
||||
import createNdJsonStream from '../_createNdJsonStream.mjs'
|
||||
@@ -81,7 +81,7 @@ getAllObjects.params = {
|
||||
export async function importConfig({ passphrase }) {
|
||||
return {
|
||||
$sendTo: await this.registerHttpRequest(async (req, res) => {
|
||||
await this.importConfig(await getStream.buffer(req), { passphrase })
|
||||
await this.importConfig(await getStreamAsBuffer(req), { passphrase })
|
||||
|
||||
res.end('config successfully imported')
|
||||
}),
|
||||
|
||||
@@ -533,8 +533,7 @@ const xoItemToRender = {
|
||||
<span>
|
||||
<Icon icon='xo-cloud-config' /> <ShortDate timestamp={createdAt} />
|
||||
</span>
|
||||
)
|
||||
,
|
||||
),
|
||||
// XO objects.
|
||||
pool: props => <Pool {...props} />,
|
||||
|
||||
|
||||
Reference in New Issue
Block a user