Compare commits

..

10 Commits

Author SHA1 Message Date
Florent BEAUCHAMP
3e1227c710 feat(nbd): implement server export from/to stream 2023-08-08 14:34:03 +02:00
Florent BEAUCHAMP
ddc73fb836 feat(nbd): server implementation working, no encryption 2023-08-08 11:21:39 +02:00
Florent BEAUCHAMP
a13fda5fe9 fix(backups/_MixinXapiWriter): typo _heathCheckSr → _healthCheckSr (#6969)
Fix `TypeError: Cannot read properties of undefined (reading 'uuid') at #isAlreadyOnHealthCheckSr`
2023-08-08 09:48:53 +02:00
Florent BEAUCHAMP
66bee59774 fix(xen-api/getResource): don't fail silently when HTTP request fails without response (#6970)
Seen while investigating zammad#16309
2023-08-08 09:39:18 +02:00
Julien Fontanet
685400bbf8 fix(xo-server): fix get-stream@3 usage
Fixes #6971

Introduced by 3dca7f2a7
2023-08-08 08:05:38 +02:00
Julien Fontanet
5bef8fc411 fix(lite): disable linting because it's broken
Introduced by 3dca7f2a7
2023-08-05 17:05:02 +02:00
Julien Fontanet
aa7ff1449a fix(lite): adapt ESLint config to prettier@3
Introduced by 3dca7f2a7
2023-08-04 22:09:55 +02:00
Julien Fontanet
3dca7f2a71 chore: update deps 2023-08-03 17:56:24 +02:00
Julien Fontanet
3dc2f649f6 chore: format with Prettier 2023-08-03 17:56:24 +02:00
Julien Fontanet
9eb537c2f9 chore: update dev deps 2023-08-03 17:56:24 +02:00
40 changed files with 2457 additions and 2096 deletions

View File

@@ -1,8 +1,11 @@
'use strict'
module.exports = {
arrowParens: 'avoid',
jsxSingleQuote: true,
semi: false,
singleQuote: true,
trailingComma: 'es5',
// 2020-11-24: Requested by nraynaud and approved by the rest of the team
//

View File

@@ -0,0 +1,32 @@
import NbdClient from "./client.mjs";
async function bench(){
const client = new NbdClient({
address:'localhost',
port: 9000,
exportname: 'bench_export'
})
await client.connect()
console.log('connected', client.exportSize)
for(let chunk_size=16*1024; chunk_size < 16*1024*1024; chunk_size *=2){
let i=0
const start = + new Date()
for await(const block of client.readBlocks(chunk_size) ){
i++
if((i*chunk_size) % (16*1024*1024) ===0){
process.stdout.write('.')
}
if(i*chunk_size > 1024*1024*1024) break
}
console.log(chunk_size,Math.round( (i*chunk_size/1024/1024*1000)/ (new Date() - start)))
}
await client.disconnect()
}
bench()

View File

@@ -74,7 +74,7 @@ export default class NbdClient {
this.#serverSocket = connect({
socket: this.#serverSocket,
rejectUnauthorized: false,
cert: this.#serverCert,
cert: this.#serverCert
})
this.#serverSocket.once('error', reject)
this.#serverSocket.once('secureConnect', () => {
@@ -88,7 +88,11 @@ export default class NbdClient {
async #unsecureConnect() {
this.#serverSocket = new Socket()
return new Promise((resolve, reject) => {
this.#serverSocket.connect(this.#serverPort, this.#serverAddress)
this.#serverSocket.connect({
port:this.#serverPort,
host: this.#serverAddress,
// @todo should test the onRead to limit buffer copy
})
this.#serverSocket.once('error', reject)
this.#serverSocket.once('connect', () => {
this.#serverSocket.removeListener('error', reject)
@@ -232,19 +236,20 @@ export default class NbdClient {
}
try {
this.#waitingForResponse = true
const magic = await this.#readInt32()
const buffer = await this.#read(4+4+8)
const magic = buffer.readUInt32BE()
if (magic !== NBD_REPLY_MAGIC) {
throw new Error(`magic number for block answer is wrong : ${magic} ${NBD_REPLY_MAGIC}`)
}
const error = await this.#readInt32()
const error = buffer.readUInt32BE(4)
if (error !== 0) {
// @todo use error code from constants.mjs
throw new Error(`GOT ERROR CODE : ${error}`)
}
const blockQueryId = await this.#readInt64()
const blockQueryId = buffer.readBigUInt64BE(8)
const query = this.#commandQueryBacklog.get(blockQueryId)
if (!query) {
throw new Error(` no query associated with id ${blockQueryId}`)
@@ -307,11 +312,11 @@ export default class NbdClient {
})
}
async *readBlocks(indexGenerator) {
async *readBlocks(indexGenerator = 2*1024*1024) {
// default : read all blocks
if (indexGenerator === undefined) {
if (typeof indexGenerator === 'number') {
const exportSize = this.#exportSize
const chunkSize = 2 * 1024 * 1024
const chunkSize = indexGenerator
indexGenerator = function* () {
const nbBlocks = Math.ceil(Number(exportSize / BigInt(chunkSize)))
for (let index = 0; BigInt(index) < nbBlocks; index++) {
@@ -319,12 +324,14 @@ export default class NbdClient {
}
}
}
const readAhead = []
const readAheadMaxLength = this.#readAhead
const makeReadBlockPromise = (index, size) => {
const promise = pRetry(() => this.readBlock(index, size), {
tries: this.#readBlockRetries,
onRetry: async err => {
console.error(err)
warn('will retry reading block ', index, err)
await this.reconnect()
},
@@ -336,6 +343,7 @@ export default class NbdClient {
// read all blocks, but try to keep readAheadMaxLength promise waiting ahead
for (const { index, size } of indexGenerator()) {
// stack readAheadMaxLength promises before starting to handle the results
if (readAhead.length === readAheadMaxLength) {
// any error will stop reading blocks
@@ -348,4 +356,4 @@ export default class NbdClient {
yield readAhead.shift()
}
}
}
}

View File

@@ -1,12 +1,40 @@
// https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md
export const INIT_PASSWD = Buffer.from('NBDMAGIC') // "NBDMAGIC" ensure we're connected to a nbd server
export const OPTS_MAGIC = Buffer.from('IHAVEOPT') // "IHAVEOPT" start an option block
export const NBD_OPT_REPLY_MAGIC = 1100100111001001n // magic received during negociation
export const NBD_OPT_EXPORT_NAME = 1
export const NBD_OPT_ABORT = 2
export const NBD_OPT_LIST = 3
export const NBD_OPT_STARTTLS = 5
export const NBD_OPT_INFO = 6
export const NBD_OPT_GO = 7
export const NBD_OPT_STRUCTURED_REPLY = 8
export const NBD_OPT_LIST_META_CONTEXT = 9
export const NBD_OPT_SET_META_CONTEXT = 10
export const NBD_OPT_EXTENDED_HEADERS = 11
export const NBD_REP_ACK =1
export const NBD_REP_SERVER = 2
export const NBD_REP_INFO = 3
export const NBD_REP_META_CONTEXT = 4
export const NBD_REP_ERR_UNSUP = 0x80000001 // 2^32+1
export const NBD_REP_ERR_POLICY = 0x80000002
export const NBD_REP_ERR_INVALID = 0x80000003
export const NBD_REP_ERR_PLATFORM = 0x80000004
export const NBD_REP_ERR_TLS_REQD = 0x80000005
export const NBD_REP_ERR_UNKNOWN = 0x80000006
export const NBD_REP_ERR_SHUTDOWN = 0x80000007
export const NBD_REP_ERR_BLOCK_SIZE_REQD = 0x80000008
export const NBD_REP_ERR_TOO_BIG = 0x80000009
export const NBD_REP_ERR_EXT_HEADER_REQD = 0x8000000a
export const NBD_INFO_EXPORT = 0
export const NBD_INFO_NAME = 1
export const NBD_INFO_DESCRIPTION = 2
export const NBD_INFO_BLOCK_SIZE = 3
export const NBD_FLAG_HAS_FLAGS = 1 << 0
export const NBD_FLAG_READ_ONLY = 1 << 1
@@ -14,6 +42,9 @@ export const NBD_FLAG_SEND_FLUSH = 1 << 2
export const NBD_FLAG_SEND_FUA = 1 << 3
export const NBD_FLAG_ROTATIONAL = 1 << 4
export const NBD_FLAG_SEND_TRIM = 1 << 5
export const NBD_FLAG_SEND_WRITE_ZEROES = 1 << 6
export const NBD_FLAG_SEND_DF = 1 << 7
export const NBD_FLAG_CAN_MULTI_CONN = 1 << 8
export const NBD_FLAG_FIXED_NEWSTYLE = 1 << 0
@@ -36,6 +67,15 @@ export const NBD_CMD_RESIZE = 8
export const NBD_REQUEST_MAGIC = 0x25609513 // magic number to create a new NBD request to send to the server
export const NBD_REPLY_MAGIC = 0x67446698 // magic number received from the server when reading response to a nbd request
export const NBD_REPLY_ACK = 1
export const NBD_SIMPLE_REPLY_MAGIC = 0x67446698
export const NBD_STRUCTURED_REPLY_MAGIC = 0x668e33ef
export const NBD_REPLY_TYPE_NONE = 0
export const NBD_REPLY_TYPE_OFFSET_DATA = 1
export const NBD_REPLY_TYPE_OFFSET_HOLE = 2
export const NBD_REPLY_TYPE_BLOCK_STATUS = 5
export const NBD_REPLY_TYPE_ERROR = 1 << 15 +1
export const NBD_REPLY_TYPE_ERROR_OFFSET = 1 << 15 +2
export const NBD_DEFAULT_PORT = 10809
export const NBD_DEFAULT_BLOCK_SIZE = 64 * 1024

View File

@@ -0,0 +1,292 @@
import assert, { deepEqual, strictEqual, notStrictEqual } from 'node:assert'
import { createServer } from 'node:net'
import { fromCallback } from 'promise-toolbox'
import { readChunkStrict } from '@vates/read-chunk'
import {
INIT_PASSWD,
NBD_CMD_READ,
NBD_DEFAULT_PORT,
NBD_FLAG_FIXED_NEWSTYLE,
NBD_FLAG_HAS_FLAGS,
NBD_OPT_EXPORT_NAME,
NBD_OPT_REPLY_MAGIC,
NBD_REPLY_ACK,
NBD_REQUEST_MAGIC,
OPTS_MAGIC,
NBD_CMD_DISC,
NBD_REP_ERR_UNSUP,
NBD_CMD_WRITE,
NBD_OPT_GO,
NBD_OPT_INFO,
NBD_INFO_EXPORT,
NBD_REP_INFO,
NBD_SIMPLE_REPLY_MAGIC,
NBD_REP_ERR_UNKNOWN,
} from './constants.mjs'
import { PassThrough } from 'node:stream'
export default class NbdServer {
#server
#clients = new Map()
constructor(port = NBD_DEFAULT_PORT) {
this.#server = createServer()
this.#server.listen(port)
this.#server.on('connection', client => this.#handleNewConnection(client))
}
// will wait for a client to connect and upload the file to this server
downloadStream(key) {
strictEqual(this.#clients.has(key), false)
const stream = new PassThrough()
const offset = BigInt(0)
this.#clients.set(key, { length: BigInt(2 * 1024 * 1024 * 1024 * 1024), stream, offset, key })
return stream
}
// will wait for a client to connect and downlaod this stream
uploadStream(key, source, length) {
strictEqual(this.#clients.has(key), false)
notStrictEqual(length, undefined)
const offset = BigInt(0)
this.#clients.set(key, { length: BigInt(length), stream: source, offset, key })
}
#read(socket, length) {
return readChunkStrict(socket, length)
}
async #readInt32(socket) {
const buffer = await this.#read(socket, 4)
return buffer.readUInt32BE()
}
#write(socket, buffer) {
return fromCallback.call(socket, 'write', buffer)
}
async #writeInt16(socket, int16) {
const buffer = Buffer.alloc(2)
buffer.writeUInt16BE(int16)
return this.#write(socket, buffer)
}
async #writeInt32(socket, int32) {
const buffer = Buffer.alloc(4)
buffer.writeUInt32BE(int32)
return this.#write(socket, buffer)
}
async #writeInt64(socket, int64) {
const buffer = Buffer.alloc(8)
buffer.writeBigUInt64BE(int64)
return this.#write(socket, buffer)
}
async #openExport(key) {
if (!this.#clients.has(key)) {
// export does not exists
const err = new Error('Export not found ')
err.code = 'ENOTFOUND'
throw err
}
const { length } = this.#clients.get(key)
return length
}
async #sendOptionResponse(socket, option, response, data = Buffer.alloc(0)) {
await this.#writeInt64(socket, NBD_OPT_REPLY_MAGIC)
await this.#writeInt32(socket, option)
await this.#writeInt32(socket, response)
await this.#writeInt32(socket, data.length)
await this.#write(socket, data)
}
/**
*
* @param {Socket} socket
* @returns true if server is waiting for more options
*/
async #readOption(socket) {
console.log('wait for option')
const magic = await this.#read(socket, 8)
console.log(magic.toString('ascii'), magic.length, OPTS_MAGIC.toString('ascii'))
deepEqual(magic, OPTS_MAGIC)
const option = await this.#readInt32(socket)
const length = await this.#readInt32(socket)
console.log({ option, length })
const data = length > 0 ? await this.#read(socket, length) : undefined
switch (option) {
case NBD_OPT_EXPORT_NAME: {
const exportNameLength = data.readInt32BE()
const key = data.slice(4, exportNameLength + 4).toString()
let exportSize
try {
exportSize = await this.#openExport(key)
} catch (err) {
if (err.code === 'ENOTFOUND') {
this.#sendOptionResponse(socket, option, NBD_REP_ERR_UNKNOWN)
return false
}
throw err
}
socket.key = key
await this.#writeInt64(socket, exportSize)
await this.#writeInt16(socket, NBD_FLAG_HAS_FLAGS /* transmission flag */)
await this.#write(socket, Buffer.alloc(124) /* padding */)
return false
}
/*
case NBD_OPT_STARTTLS:
console.log('starttls')
// @todo not working
return true
*/
case NBD_OPT_GO:
case NBD_OPT_INFO: {
const exportNameLength = data.readInt32BE()
const key = data.slice(4, exportNameLength + 4).toString()
let exportSize
try {
exportSize = await this.#openExport(key)
} catch (err) {
if (err.code === 'ENOTFOUND') {
this.#sendOptionResponse(socket, option, NBD_REP_ERR_UNKNOWN)
// @todo should disconnect
return false
}
throw err
}
socket.key = key
await this.#writeInt64(socket, NBD_OPT_REPLY_MAGIC)
await this.#writeInt32(socket, option)
await this.#writeInt32(socket, NBD_REP_INFO)
await this.#writeInt32(socket, 12)
// the export info
await this.#writeInt16(socket, NBD_INFO_EXPORT)
await this.#writeInt64(socket, exportSize)
await this.#writeInt16(socket, NBD_FLAG_HAS_FLAGS /* transmission flag */)
// an ACK at the end of the infos
await this.#sendOptionResponse(socket, option, NBD_REPLY_ACK) // no additionnal data
return option === NBD_OPT_INFO // we stays in option phase is option is INFO
}
default:
// not supported
console.log('not supported', option, length, data?.toString())
await this.#sendOptionResponse(socket, option, NBD_REP_ERR_UNSUP) // no additionnal data
// wait for next option
return true
}
}
async #readCommand(socket) {
const key = socket.key
// this socket has an export key
notStrictEqual(key, undefined)
// this export key is still valid
strictEqual(this.#clients.has(key), true)
const client = this.#clients.get(key)
const buffer = await this.#read(socket, 28)
const magic = buffer.readInt32BE(0)
strictEqual(magic, NBD_REQUEST_MAGIC)
/* const commandFlags = */ buffer.readInt16BE(4)
const command = buffer.readInt16BE(6)
const cookie = buffer.readBigUInt64BE(8)
const offset = buffer.readBigUInt64BE(16)
const length = buffer.readInt32BE(24)
switch (command) {
case NBD_CMD_DISC:
console.log('gotdisconnect', client.offset)
await client.stream?.destroy()
// @todo : disconnect
return false
case NBD_CMD_READ: {
/** simple replies */
// read length byte from offset in export
// the client is writing in contiguous mode
assert.strictEqual(offset, client.offset)
client.offset += BigInt(length)
const data = await readChunkStrict(client.stream, length)
const reply = Buffer.alloc(16)
reply.writeInt32BE(NBD_SIMPLE_REPLY_MAGIC)
reply.writeInt32BE(0, 4) // no error
reply.writeBigInt64BE(cookie, 8)
await this.#write(socket, reply)
await this.#write(socket, data)
/* if we implement non stream read, we can handle read in parallel
const reply = Buffer.alloc(16+length)
reply.writeInt32BE(NBD_SIMPLE_REPLY_MAGIC)
reply.writeInt32BE(0,4)// no error
reply.writeBigInt64BE(cookie,8)
// read length byte from offset in export directly in the given buffer
// may do multiple read in parallel on the same export
size += length
socket.fd.read(reply, 16, length, Number(offset))
.then(()=>{
return this.#write(socket, reply)
})
.catch(err => console.error('NBD_CMD_READ',err)) */
return true
}
case NBD_CMD_WRITE: {
// the client is writing in contiguous mode
assert.strictEqual(offset, client.offset)
const data = await this.#read(socket, length)
client.offset += BigInt(length)
await new Promise((resolve, reject) => {
if (!client.stream.write(data, 0, length, Number(offset))) {
client.stream.once('drain', err => (err ? reject(err) : resolve()))
} else {
process.nextTick(resolve)
}
})
const reply = Buffer.alloc(16)
reply.writeInt32BE(NBD_SIMPLE_REPLY_MAGIC)
reply.writeInt32BE(0, 4) // no error
reply.writeBigInt64BE(cookie, 8)
await this.#write(socket, reply)
return true
}
default:
console.log('GOT unsupported command ', command)
// fail to handle
return true
}
}
async #handleNewConnection(socket) {
const remoteAddress = socket.remoteAddress + ':' + socket.remotePort
console.log('new client connection from %s', remoteAddress)
socket.on('close', () => {
console.log('client ', remoteAddress, 'is done')
})
socket.on('error', error => {
throw error
})
// handshake
await this.#write(socket, INIT_PASSWD)
await this.#write(socket, OPTS_MAGIC)
// send flags , the bare minimum
await this.#writeInt16(socket, NBD_FLAG_FIXED_NEWSTYLE)
const clientFlag = await this.#readInt32(socket)
assert.strictEqual(clientFlag & NBD_FLAG_FIXED_NEWSTYLE, NBD_FLAG_FIXED_NEWSTYLE) // only FIXED_NEWSTYLE one is supported from the server options
// read client response flags
let waitingForOptions = true
while (waitingForOptions) {
waitingForOptions = await this.#readOption(socket)
}
let waitingForCommand = true
while (waitingForCommand) {
waitingForCommand = await this.#readCommand(socket)
}
}
#handleClientData(client, data) {}
}

View File

@@ -1,4 +1,4 @@
import NbdClient from '../index.mjs'
import NbdClient from '../client.mjs'
import { spawn, exec } from 'node:child_process'
import fs from 'node:fs/promises'
import { test } from 'tap'

View File

@@ -9,7 +9,7 @@
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/backups": "^0.40.0",
"@xen-orchestra/fs": "^4.0.1",
"filenamify": "^4.1.0",
"filenamify": "^6.0.0",
"getopts": "^2.2.5",
"lodash": "^4.17.15",
"promise-toolbox": "^0.21.0"

View File

@@ -18,7 +18,7 @@ export const MixinXapiWriter = (BaseClass = Object) =>
const vdiRefs = await xapi.VM_getDisks(baseVm.$ref)
for (const vdiRef of vdiRefs) {
const vdi = xapi.getObject(vdiRef)
if (vdi.$SR.uuid !== this._heathCheckSr.uuid) {
if (vdi.$SR.uuid !== this._healthCheckSr.uuid) {
return false
}
}

View File

@@ -30,8 +30,8 @@
"@xen-orchestra/fs": "^4.0.1",
"@xen-orchestra/log": "^0.6.0",
"@xen-orchestra/template": "^0.1.0",
"compare-versions": "^5.0.1",
"d3-time-format": "^3.0.0",
"compare-versions": "^6.0.0",
"d3-time-format": "^4.1.0",
"decorator-synchronized": "^0.6.0",
"golike-defer": "^0.5.1",
"limit-concurrency-decorator": "^0.5.0",

View File

@@ -1,2 +1,4 @@
// Keeping this file to prevent applying the global monorepo config for now
module.exports = {};
module.exports = {
trailingComma: "es5",
};

View File

@@ -4,53 +4,6 @@ All collections of `XenApiRecord` are stored inside the `xapiCollectionStore`.
To retrieve a collection, invoke `useXapiCollectionStore().get(type)`.
## TL;DR - How to extend a subscription
_**Note:** Once the extension grows in complexity, it's recommended to create a dedicated file for it (e.g. `host.extension.ts` for `host.store.ts`)._
```typescript
type MyExtension1 = Extension<{ propA: string }>;
type MyExtension2 = Extension<{ propB: string }, { withB: true }>;
type Extensions = [
XenApiRecordExtension<XenApiHost>, // If needed
DeferExtension, // If needed
MyExtension1,
MyExtension2
];
export const useHostStore = defineStore("host", () => {
const hostCollection = useXapiCollectionStore().get("console");
const subscribe = <O extends Options<Extensions>>(options?: O) => {
const originalSubscription = hostCollection.subscribe(options);
const myExtension1: PartialSubscription<MyExtension1> = {
propA: "Hello",
};
const myExtension2: PartialSubscription<MyExtension2> | undefined =
options?.withB
? {
propB: "World",
}
: undefined;
return {
...originalSubscription,
...myExtension1,
...myExtension2,
};
};
return {
...hostCollection,
subscribe,
};
});
```
## Accessing a collection
In order to use a collection, you'll need to subscribe to it.
@@ -87,102 +40,71 @@ export const useConsoleStore = defineStore("console", () =>
To extend the base Subscription, you'll need to override the `subscribe` method.
For that, you can use the `createSubscribe<RawObjectType, Extensions>((options) => { /* ... */})` helper.
### Define the extensions
Subscription extensions are defined as a simple extension (`Extension<object>`) or as a conditional
extension (`Extension<object, object>`).
Subscription extensions are defined as `(object | [object, RequiredOptions])[]`.
When using a conditional extension, the corresponding `object` type will be added to the subscription only if
the the options passed to `subscribe(options)` do match the second argument or `Extension`.
There is two existing extensions:
- `XenApiRecordExtension<T extends XenApiRecord>`: a simple extension which defined all the base
properties and methods (`records`, `getByOpaqueRef`, `getByUuid`, etc.)
- `DeferExtension`: a conditional extension which add the `start` and `isStarted` properties if the
`immediate` option is set to `false`.
When using a tuple (`[object, RequiredOptions]`), the corresponding `object` type will be added to the subscription if
the `RequiredOptions` for that tuple are present in the options passed to `subscribe`.
```typescript
// Always present extension
type PropABExtension = Extension<{
type DefaultExtension = {
propA: string;
propB: ComputedRef<number>;
}>;
};
// Conditional extension 1
type PropCExtension = Extension<
type FirstConditionalExtension = [
{ propC: ComputedRef<string> }, // <- This signature will be added
{ optC: string } // <- if this condition is met
>;
];
// Conditional extension 2
type PropDExtension = Extension<
type SecondConditionalExtension = [
{ propD: () => void }, // <- This signature will be added
{ optD: number } // <- if this condition is met
>;
];
// Create the extensions array
type Extensions = [
XenApiRecordExtension<XenApiHost>,
DeferExtension,
PropABExtension,
PropCExtension,
PropDExtension
DefaultExtension,
FirstConditionalExtension,
SecondConditionalExtension
];
```
### Define the `subscribe` method
You can then create the `subscribe` function with the help of `Options` and `Subscription` helper types.
This will allow to get correct completion and type checking for the `options` argument, and to get the correct return
type based on passed options.
```typescript
const subscribe = <O extends Options<Extensions>>(options?: O) => {
return {
// ...
} as Subscription<Extensions, O>;
};
```
### Extend the subscription
The `PartialSubscription` type will help to define and check the data to add to subscription for each extension.
### Define the subscription
```typescript
export const useConsoleStore = defineStore("console", () => {
const consoleCollection = useXapiCollectionStore().get("console");
const subscribe = <O extends Options<Extensions>>(options?: O) => {
const subscribe = createSubscribe<"console", Extensions>((options) => {
const originalSubscription = consoleCollection.subscribe(options);
const propABSubscription: PartialSubscription<PropABExtension> = {
const extendedSubscription = {
propA: "Some string",
propB: computed(() => 42),
};
const propCSubscription: PartialSubscription<PropCExtension> | undefined =
options?.optC !== undefined
? {
propC: computed(() => "Some other string"),
}
: undefined;
const propCSubscription = options?.optC !== undefined && {
propC: computed(() => "Some other string"),
};
const propDSubscription: PartialSubscription<PropDExtension> | undefined =
options?.optD !== undefined
? {
propD: () => console.log("Hello"),
}
: undefined;
const propDSubscription = options?.optD !== undefined && {
propD: () => console.log("Hello"),
};
return {
...originalSubscription,
...propABSubscription,
...extendedSubscription,
...propCSubscription,
...propDSubscription,
};
};
});
return {
...consoleCollection,
@@ -203,18 +125,20 @@ type Options = {
### Use the subscription
In each case, all the default properties (`records`, `getByUuid`, etc.) will be present.
```typescript
const store = useConsoleStore();
// No options (Contains common properties: `propA`, `propB`, `records`, `getByUuid`, etc.)
const subscription1 = store.subscribe();
// No options (propA and propB will be present)
const subscription = store.subscribe();
// optC option (Contains common properties + `propC`)
const subscription2 = store.subscribe({ optC: "Hello" });
// optC option (propA, propB and propC will be present)
const subscription = store.subscribe({ optC: "Hello" });
// optD option (Contains common properties + `propD`)
const subscription3 = store.subscribe({ optD: 12 });
// optD option (propA, propB and propD will be present)
const subscription = store.subscribe({ optD: 12 });
// optC and optD options (Contains common properties + `propC` + `propD`)
const subscription4 = store.subscribe({ optC: "Hello", optD: 12 });
// optC and optD options (propA, propB, propC and propD will be present)
const subscription = store.subscribe({ optC: "Hello", optD: 12 });
```

View File

@@ -49,7 +49,7 @@
"@rushstack/eslint-patch": "^1.1.0",
"@types/node": "^16.11.41",
"@vitejs/plugin-vue": "^4.2.3",
"@vue/eslint-config-prettier": "^7.0.0",
"@vue/eslint-config-prettier": "^8.0.0",
"@vue/eslint-config-typescript": "^11.0.0",
"@vue/tsconfig": "^0.1.3",
"eslint-plugin-vue": "^9.0.0",

View File

@@ -5,10 +5,9 @@ import type {
XenApiVm,
VM_OPERATION,
RawObjectType,
XenApiHostMetrics,
} from "@/libs/xen-api";
import type { Filter } from "@/types/filter";
import type { XenApiRecordSubscription } from "@/types/subscription";
import type { Subscription } from "@/types/xapi-collection";
import { faSquareCheck } from "@fortawesome/free-regular-svg-icons";
import { faFont, faHashtag, faList } from "@fortawesome/free-solid-svg-icons";
import { utcParse } from "d3-time-format";
@@ -117,14 +116,14 @@ export function getStatsLength(stats?: object | any[]) {
export function isHostRunning(
host: XenApiHost,
hostMetricsSubscription: XenApiRecordSubscription<XenApiHostMetrics>
hostMetricsSubscription: Subscription<"host_metrics", object>
) {
return hostMetricsSubscription.getByOpaqueRef(host.metrics)?.live === true;
}
export function getHostMemory(
host: XenApiHost,
hostMetricsSubscription: XenApiRecordSubscription<XenApiHostMetrics>
hostMetricsSubscription: Subscription<"host_metrics", object>
) {
const hostMetrics = hostMetricsSubscription.getByOpaqueRef(host.metrics);

View File

@@ -1,4 +1,5 @@
import { buildXoObject, parseDateTime } from "@/libs/utils";
import type { RawTypeToRecord } from "@/types/xapi-collection";
import { JSONRPCClient } from "json-rpc-2.0";
import { castArray } from "lodash-es";
@@ -174,45 +175,6 @@ export interface XenApiMessage<T extends RawObjectType = RawObjectType>
timestamp: string;
}
export type XenApiAlarmType =
| "cpu_usage"
| "disk_usage"
| "fs_usage"
| "log_fs_usage"
| "mem_usage"
| "memory_free_kib"
| "network_usage"
| "physical_utilisation"
| "sr_io_throughput_total_per_host";
export interface XenApiAlarm extends XenApiMessage {
level: number;
triggerLevel: number;
type: XenApiAlarmType;
}
export type RawTypeToRecord<T extends RawObjectType> = T extends "SR"
? XenApiSr
: T extends "VM"
? XenApiVm
: T extends "VM_guest_metrics"
? XenApiVmGuestMetrics
: T extends "VM_metrics"
? XenApiVmMetrics
: T extends "console"
? XenApiConsole
: T extends "host"
? XenApiHost
: T extends "host_metrics"
? XenApiHostMetrics
: T extends "message"
? XenApiMessage
: T extends "pool"
? XenApiPool
: T extends "task"
? XenApiTask
: never;
type WatchCallbackResult = {
id: string;
class: ObjectType;

View File

@@ -1,33 +1,27 @@
import type { XenApiAlarm } from "@/libs/xen-api";
import { useXapiCollectionStore } from "@/stores/xapi-collection.store";
import type {
DeferExtension,
Options,
Subscription,
XenApiRecordExtension,
} from "@/types/subscription";
import { createSubscribe } from "@/types/xapi-collection";
import { defineStore } from "pinia";
import { computed } from "vue";
type Extensions = [XenApiRecordExtension<XenApiAlarm>, DeferExtension];
export const useAlarmStore = defineStore("alarm", () => {
const messageCollection = useXapiCollectionStore().get("message");
const subscribe = <O extends Options<Extensions>>(options?: O) => {
const subscription = messageCollection.subscribe(options);
const subscribe = createSubscribe<"message", []>((options) => {
const originalSubscription = messageCollection.subscribe(options);
const extendedSubscription = {
records: computed(() =>
subscription.records.value.filter((record) => record.name === "alarm")
originalSubscription.records.value.filter(
(record) => record.name === "alarm"
)
),
};
return {
...subscription,
...originalSubscription,
...extendedSubscription,
} as Subscription<Extensions, O>;
};
};
});
return {
...messageCollection,

View File

@@ -1,88 +0,0 @@
import { isHostRunning } from "@/libs/utils";
import type {
GRANULARITY,
HostStats,
XapiStatsResponse,
} from "@/libs/xapi-stats";
import type { XenApiHost, XenApiHostMetrics } from "@/libs/xen-api";
import { useXenApiStore } from "@/stores/xen-api.store";
import type {
Extension,
XenApiRecordExtension,
XenApiRecordSubscription,
} from "@/types/subscription";
import type { PartialSubscription } from "@/types/subscription";
import { computed } from "vue";
import type { ComputedRef } from "vue";
type GetStatsExtension = Extension<{
getStats: (
hostUuid: XenApiHost["uuid"],
granularity: GRANULARITY,
ignoreExpired: boolean,
opts: { abortSignal?: AbortSignal }
) => Promise<XapiStatsResponse<HostStats> | undefined> | undefined;
}>;
type RunningHostsExtension = Extension<
{ runningHosts: ComputedRef<XenApiHost[]> },
{ hostMetricsSubscription: XenApiRecordSubscription<XenApiHostMetrics> }
>;
export type HostExtensions = [
XenApiRecordExtension<XenApiHost>,
GetStatsExtension,
RunningHostsExtension
];
export const getStatsSubscription = (
hostSubscription: XenApiRecordSubscription<XenApiHost>
): PartialSubscription<GetStatsExtension> => {
const xenApiStore = useXenApiStore();
return {
getStats: (
hostUuid,
granularity,
ignoreExpired = false,
{ abortSignal }
) => {
const host = hostSubscription.getByUuid(hostUuid);
if (host === undefined) {
throw new Error(`Host ${hostUuid} could not be found.`);
}
const xapiStats = xenApiStore.isConnected
? xenApiStore.getXapiStats()
: undefined;
return xapiStats?._getAndUpdateStats<HostStats>({
abortSignal,
host,
ignoreExpired,
uuid: host.uuid,
granularity,
});
},
};
};
export const runningHostsSubscription = (
hostSubscription: XenApiRecordSubscription<XenApiHost>,
hostMetricsSubscription:
| XenApiRecordSubscription<XenApiHostMetrics>
| undefined
): PartialSubscription<RunningHostsExtension> | undefined => {
if (hostMetricsSubscription === undefined) {
return undefined;
}
return {
runningHosts: computed(() =>
hostSubscription.records.value.filter((host) =>
isHostRunning(host, hostMetricsSubscription)
)
),
};
};

View File

@@ -1,28 +1,88 @@
import { sortRecordsByNameLabel } from "@/libs/utils";
import type { HostExtensions } from "@/stores/host.extension";
import {
getStatsSubscription,
runningHostsSubscription,
} from "@/stores/host.extension";
import { isHostRunning, sortRecordsByNameLabel } from "@/libs/utils";
import type {
GRANULARITY,
HostStats,
XapiStatsResponse,
} from "@/libs/xapi-stats";
import type { XenApiHost } from "@/libs/xen-api";
import { useXapiCollectionStore } from "@/stores/xapi-collection.store";
import type { Options, Subscription } from "@/types/subscription";
import { useXenApiStore } from "@/stores/xen-api.store";
import type { Subscription } from "@/types/xapi-collection";
import { createSubscribe } from "@/types/xapi-collection";
import { defineStore } from "pinia";
import { computed, type ComputedRef } from "vue";
type GetStats = (
hostUuid: XenApiHost["uuid"],
granularity: GRANULARITY,
ignoreExpired: boolean,
opts: { abortSignal?: AbortSignal }
) => Promise<XapiStatsResponse<HostStats> | undefined> | undefined;
type GetStatsExtension = {
getStats: GetStats;
};
type RunningHostsExtension = [
{ runningHosts: ComputedRef<XenApiHost[]> },
{ hostMetricsSubscription: Subscription<"host_metrics", any> }
];
type Extensions = [GetStatsExtension, RunningHostsExtension];
export const useHostStore = defineStore("host", () => {
const xenApiStore = useXenApiStore();
const hostCollection = useXapiCollectionStore().get("host");
hostCollection.setSort(sortRecordsByNameLabel);
const subscribe = <O extends Options<HostExtensions>>(options?: O) => {
const subscription = hostCollection.subscribe(options);
const { hostMetricsSubscription } = options ?? {};
const subscribe = createSubscribe<"host", Extensions>((options) => {
const originalSubscription = hostCollection.subscribe(options);
const getStats: GetStats = (
hostUuid,
granularity,
ignoreExpired = false,
{ abortSignal }
) => {
const host = originalSubscription.getByUuid(hostUuid);
if (host === undefined) {
throw new Error(`Host ${hostUuid} could not be found.`);
}
const xapiStats = xenApiStore.isConnected
? xenApiStore.getXapiStats()
: undefined;
return xapiStats?._getAndUpdateStats<HostStats>({
abortSignal,
host,
ignoreExpired,
uuid: host.uuid,
granularity,
});
};
const extendedSubscription = {
getStats,
};
const hostMetricsSubscription = options?.hostMetricsSubscription;
const runningHostsSubscription = hostMetricsSubscription !== undefined && {
runningHosts: computed(() =>
originalSubscription.records.value.filter((host) =>
isHostRunning(host, hostMetricsSubscription)
)
),
};
return {
...subscription,
...getStatsSubscription(subscription),
...runningHostsSubscription(subscription, hostMetricsSubscription),
} as Subscription<HostExtensions, O>;
};
...originalSubscription,
...extendedSubscription,
...runningHostsSubscription,
};
});
return {
...hostCollection,

View File

@@ -1,36 +1,31 @@
import { getFirst } from "@/libs/utils";
import type { XenApiPool } from "@/libs/xen-api";
import { useXapiCollectionStore } from "@/stores/xapi-collection.store";
import type {
Extension,
Options,
PartialSubscription,
Subscription,
XenApiRecordExtension,
} from "@/types/subscription";
import { createSubscribe } from "@/types/xapi-collection";
import { defineStore } from "pinia";
import { computed, type ComputedRef } from "vue";
type PoolExtension = Extension<{
type PoolExtension = {
pool: ComputedRef<XenApiPool | undefined>;
}>;
};
type Extensions = [XenApiRecordExtension<XenApiPool>, PoolExtension];
type Extensions = [PoolExtension];
export const usePoolStore = defineStore("pool", () => {
const poolCollection = useXapiCollectionStore().get("pool");
const subscribe = <O extends Options<Extensions>>(options?: O) => {
const subscription = poolCollection.subscribe(options);
const extendedSubscription: PartialSubscription<PoolExtension> = {
pool: computed(() => getFirst(subscription.records.value)),
const subscribe = createSubscribe<"pool", Extensions>((options) => {
const originalSubscription = poolCollection.subscribe(options);
const extendedSubscription = {
pool: computed(() => getFirst(originalSubscription.records.value)),
};
return {
...subscription,
...originalSubscription,
...extendedSubscription,
} as Subscription<Extensions, O>;
};
};
});
return {
...poolCollection,

View File

@@ -1,56 +0,0 @@
import useArrayRemovedItemsHistory from "@/composables/array-removed-items-history.composable";
import useCollectionFilter from "@/composables/collection-filter.composable";
import useCollectionSorter from "@/composables/collection-sorter.composable";
import useFilteredCollection from "@/composables/filtered-collection.composable";
import useSortedCollection from "@/composables/sorted-collection.composable";
import type { XenApiTask } from "@/libs/xen-api";
import type {
Extension,
PartialSubscription,
XenApiRecordExtension,
XenApiRecordSubscription,
} from "@/types/subscription";
import type { ComputedRef, Ref } from "vue";
type AdditionalTasksExtension = Extension<{
pendingTasks: ComputedRef<XenApiTask[]>;
finishedTasks: Ref<XenApiTask[]>;
}>;
export type TaskExtensions = [
XenApiRecordExtension<XenApiTask>,
AdditionalTasksExtension
];
export const additionalTasksSubscription = (
taskSubscription: XenApiRecordSubscription<XenApiTask>
): PartialSubscription<AdditionalTasksExtension> => {
const { compareFn } = useCollectionSorter<XenApiTask>({
initialSorts: ["-created"],
});
const { predicate } = useCollectionFilter({
initialFilters: [
"!name_label:|(SR.scan host.call_plugin)",
"status:pending",
],
});
const sortedTasks = useSortedCollection(taskSubscription.records, compareFn);
return {
pendingTasks: useFilteredCollection<XenApiTask>(sortedTasks, predicate),
finishedTasks: useArrayRemovedItemsHistory(
sortedTasks,
(task) => task.uuid,
{
limit: 50,
onRemove: (tasks) =>
tasks.map((task) => ({
...task,
finished: new Date().toISOString(),
})),
}
),
};
};

View File

@@ -1,22 +1,64 @@
import {
additionalTasksSubscription,
type TaskExtensions,
} from "@/stores/task.extension";
import useArrayRemovedItemsHistory from "@/composables/array-removed-items-history.composable";
import useCollectionFilter from "@/composables/collection-filter.composable";
import useCollectionSorter from "@/composables/collection-sorter.composable";
import useFilteredCollection from "@/composables/filtered-collection.composable";
import useSortedCollection from "@/composables/sorted-collection.composable";
import type { XenApiTask } from "@/libs/xen-api";
import { useXapiCollectionStore } from "@/stores/xapi-collection.store";
import type { Options, Subscription } from "@/types/subscription";
import { createSubscribe } from "@/types/xapi-collection";
import { defineStore } from "pinia";
import type { ComputedRef, Ref } from "vue";
type PendingTasksExtension = {
pendingTasks: ComputedRef<XenApiTask[]>;
};
type FinishedTasksExtension = {
finishedTasks: Ref<XenApiTask[]>;
};
type Extensions = [PendingTasksExtension, FinishedTasksExtension];
export const useTaskStore = defineStore("task", () => {
const tasksCollection = useXapiCollectionStore().get("task");
const subscribe = <O extends Options<TaskExtensions>>(options?: O) => {
const subscription = tasksCollection.subscribe(options);
const subscribe = createSubscribe<"task", Extensions>(() => {
const subscription = tasksCollection.subscribe();
const { compareFn } = useCollectionSorter<XenApiTask>({
initialSorts: ["-created"],
});
const sortedTasks = useSortedCollection(subscription.records, compareFn);
const { predicate } = useCollectionFilter({
initialFilters: [
"!name_label:|(SR.scan host.call_plugin)",
"status:pending",
],
});
const extendedSubscription = {
pendingTasks: useFilteredCollection<XenApiTask>(sortedTasks, predicate),
finishedTasks: useArrayRemovedItemsHistory(
sortedTasks,
(task) => task.uuid,
{
limit: 50,
onRemove: (tasks) =>
tasks.map((task) => ({
...task,
finished: new Date().toISOString(),
})),
}
),
};
return {
...subscription,
...additionalTasksSubscription(subscription),
} as Subscription<TaskExtensions, O>;
};
...extendedSubscription,
};
});
return { ...tasksCollection, subscribe };
});

View File

@@ -1,108 +0,0 @@
import type {
GRANULARITY,
VmStats,
XapiStatsResponse,
} from "@/libs/xapi-stats";
import { POWER_STATE, type XenApiHost, type XenApiVm } from "@/libs/xen-api";
import { useXenApiStore } from "@/stores/xen-api.store";
import type {
Extension,
PartialSubscription,
XenApiRecordExtension,
XenApiRecordSubscription,
} from "@/types/subscription";
import { computed, type ComputedRef } from "vue";
type RecordsByHostRefExtension = Extension<{
recordsByHostRef: ComputedRef<Map<XenApiHost["$ref"], XenApiVm[]>>;
}>;
type RunningVmsExtension = Extension<{
runningVms: ComputedRef<XenApiVm[]>;
}>;
type GetStatsExtension = Extension<
{
getStats: (
id: XenApiVm["uuid"],
granularity: GRANULARITY,
ignoreExpired: boolean,
opts: { abortSignal?: AbortSignal }
) => Promise<XapiStatsResponse<VmStats> | undefined> | undefined;
},
{ hostSubscription: XenApiRecordSubscription<XenApiHost> }
>;
export type VmExtensions = [
XenApiRecordExtension<XenApiVm>,
RecordsByHostRefExtension,
RunningVmsExtension,
GetStatsExtension
];
export const recordsByHostRefSubscription = (
vmSubscription: XenApiRecordSubscription<XenApiVm>
): PartialSubscription<RecordsByHostRefExtension> => ({
recordsByHostRef: computed(() => {
const vmsByHostOpaqueRef = new Map<XenApiHost["$ref"], XenApiVm[]>();
vmSubscription.records.value.forEach((vm) => {
if (!vmsByHostOpaqueRef.has(vm.resident_on)) {
vmsByHostOpaqueRef.set(vm.resident_on, []);
}
vmsByHostOpaqueRef.get(vm.resident_on)?.push(vm);
});
return vmsByHostOpaqueRef;
}),
});
export const runningVmsSubscription = (
vmSubscription: XenApiRecordSubscription<XenApiVm>
): PartialSubscription<RunningVmsExtension> => ({
runningVms: computed(() =>
vmSubscription.records.value.filter(
(vm) => vm.power_state === POWER_STATE.RUNNING
)
),
});
export const getStatsSubscription = (
vmSubscription: XenApiRecordSubscription<XenApiVm>,
hostSubscription: XenApiRecordSubscription<XenApiHost> | undefined
): PartialSubscription<GetStatsExtension> | undefined => {
if (hostSubscription === undefined) {
return;
}
return {
getStats: (id, granularity, ignoreExpired = false, { abortSignal }) => {
const xenApiStore = useXenApiStore();
if (!xenApiStore.isConnected) {
return undefined;
}
const vm = vmSubscription.getByUuid(id);
if (vm === undefined) {
throw new Error(`VM ${id} could not be found.`);
}
const host = hostSubscription.getByOpaqueRef(vm.resident_on);
if (host === undefined) {
throw new Error(`VM ${id} is halted or host could not be found.`);
}
return xenApiStore.getXapiStats()._getAndUpdateStats<VmStats>({
abortSignal,
host,
ignoreExpired,
uuid: vm.uuid,
granularity,
});
},
};
};

View File

@@ -1,13 +1,36 @@
import { sortRecordsByNameLabel } from "@/libs/utils";
import {
getStatsSubscription,
recordsByHostRefSubscription,
runningVmsSubscription,
type VmExtensions,
} from "@/stores/vm.extension";
import type {
GRANULARITY,
VmStats,
XapiStatsResponse,
} from "@/libs/xapi-stats";
import type { XenApiHost, XenApiVm } from "@/libs/xen-api";
import { POWER_STATE } from "@/libs/xen-api";
import { useXapiCollectionStore } from "@/stores/xapi-collection.store";
import type { Options, Subscription } from "@/types/subscription";
import { useXenApiStore } from "@/stores/xen-api.store";
import { createSubscribe, type Subscription } from "@/types/xapi-collection";
import { defineStore } from "pinia";
import { computed, type ComputedRef } from "vue";
type GetStats = (
id: XenApiVm["uuid"],
granularity: GRANULARITY,
ignoreExpired: boolean,
opts: { abortSignal?: AbortSignal }
) => Promise<XapiStatsResponse<VmStats> | undefined> | undefined;
type DefaultExtension = {
recordsByHostRef: ComputedRef<Map<XenApiHost["$ref"], XenApiVm[]>>;
runningVms: ComputedRef<XenApiVm[]>;
};
type GetStatsExtension = [
{
getStats: GetStats;
},
{ hostSubscription: Subscription<"host", object> }
];
type Extensions = [DefaultExtension, GetStatsExtension];
export const useVmStore = defineStore("vm", () => {
const vmCollection = useXapiCollectionStore().get("VM");
@@ -18,16 +41,82 @@ export const useVmStore = defineStore("vm", () => {
vmCollection.setSort(sortRecordsByNameLabel);
const subscribe = <O extends Options<VmExtensions>>(options?: O) => {
const subscription = vmCollection.subscribe(options);
const subscribe = createSubscribe<"VM", Extensions>((options) => {
const originalSubscription = vmCollection.subscribe(options);
const extendedSubscription = {
recordsByHostRef: computed(() => {
const vmsByHostOpaqueRef = new Map<XenApiHost["$ref"], XenApiVm[]>();
originalSubscription.records.value.forEach((vm) => {
if (!vmsByHostOpaqueRef.has(vm.resident_on)) {
vmsByHostOpaqueRef.set(vm.resident_on, []);
}
vmsByHostOpaqueRef.get(vm.resident_on)?.push(vm);
});
return vmsByHostOpaqueRef;
}),
runningVms: computed(() =>
originalSubscription.records.value.filter(
(vm) => vm.power_state === POWER_STATE.RUNNING
)
),
};
const hostSubscription = options?.hostSubscription;
const getStatsSubscription:
| {
getStats: GetStats;
}
| undefined =
hostSubscription !== undefined
? {
getStats: (
id,
granularity,
ignoreExpired = false,
{ abortSignal }
) => {
const xenApiStore = useXenApiStore();
if (!xenApiStore.isConnected) {
return undefined;
}
const vm = originalSubscription.getByUuid(id);
if (vm === undefined) {
throw new Error(`VM ${id} could not be found.`);
}
const host = hostSubscription.getByOpaqueRef(vm.resident_on);
if (host === undefined) {
throw new Error(
`VM ${id} is halted or host could not be found.`
);
}
return xenApiStore.getXapiStats()._getAndUpdateStats<VmStats>({
abortSignal,
host,
ignoreExpired,
uuid: vm.uuid,
granularity,
});
},
}
: undefined;
return {
...subscription,
...recordsByHostRefSubscription(subscription),
...runningVmsSubscription(subscription),
...getStatsSubscription(subscription, options?.hostSubscription),
} as Subscription<VmExtensions, O>;
};
...originalSubscription,
...extendedSubscription,
...getStatsSubscription,
};
});
return {
...vmCollection,

View File

@@ -1,11 +1,10 @@
import type { RawObjectType, RawTypeToRecord } from "@/libs/xen-api";
import type { RawObjectType } from "@/libs/xen-api";
import { useXenApiStore } from "@/stores/xen-api.store";
import type {
DeferExtension,
Options,
RawTypeToRecord,
SubscribeOptions,
Subscription,
XenApiRecordExtension,
} from "@/types/subscription";
} from "@/types/xapi-collection";
import { tryOnUnmounted, whenever } from "@vueuse/core";
import { defineStore } from "pinia";
import { computed, readonly, ref } from "vue";
@@ -15,7 +14,7 @@ export const useXapiCollectionStore = defineStore("xapiCollection", () => {
function get<T extends RawObjectType>(
type: T
): ReturnType<typeof createXapiCollection<T>> {
): ReturnType<typeof createXapiCollection<T, RawTypeToRecord<T>>> {
if (!collections.value.has(type)) {
collections.value.set(type, createXapiCollection(type));
}
@@ -28,7 +27,7 @@ export const useXapiCollectionStore = defineStore("xapiCollection", () => {
const createXapiCollection = <
T extends RawObjectType,
R extends RawTypeToRecord<T> = RawTypeToRecord<T>
R extends RawTypeToRecord<T>
>(
type: T
) => {
@@ -107,11 +106,9 @@ const createXapiCollection = <
() => fetchAll()
);
type Extensions = [XenApiRecordExtension<R>, DeferExtension];
function subscribe<O extends Options<Extensions>>(
function subscribe<O extends SubscribeOptions<any>>(
options?: O
): Subscription<Extensions, O> {
): Subscription<T, O> {
const id = Symbol();
tryOnUnmounted(() => {
@@ -134,14 +131,14 @@ const createXapiCollection = <
if (options?.immediate !== false) {
start();
return subscription as Subscription<Extensions, O>;
return subscription as unknown as Subscription<T, O>;
}
return {
...subscription,
start,
isStarted: computed(() => subscriptions.value.has(id)),
} as Subscription<Extensions, O>;
} as unknown as Subscription<T, O>;
}
const unsubscribe = (id: symbol) => subscriptions.value.delete(id);

View File

@@ -1,74 +0,0 @@
import type { XenApiRecord } from "@/libs/xen-api";
import type { ComputedRef, Ref } from "vue";
type SimpleExtension<Value extends object> = { type: "simple"; value: Value };
type ConditionalExtension<Value extends object, Condition extends object> = {
type: "conditional";
value: Value;
condition: Condition;
};
type UnpackExtension<E, Options> = E extends SimpleExtension<infer Value>
? Value
: E extends ConditionalExtension<infer Value, infer Condition>
? Options extends Condition
? Value
: object
: object;
export type Extension<
Value extends object,
Condition extends object | undefined = undefined
> = Condition extends object
? ConditionalExtension<Value, Condition>
: SimpleExtension<Value>;
export type Options<Extensions extends any[]> = Extensions extends [
infer First,
...infer Rest
]
? First extends ConditionalExtension<any, infer Condition>
? Rest extends any[]
? Partial<Condition> & Options<Rest>
: Partial<Condition>
: Rest extends any[]
? Options<Rest>
: object
: object;
export type Subscription<
Extensions extends any[],
Options extends object
> = Extensions extends [infer First, ...infer Rest]
? UnpackExtension<First, Options> & Subscription<Rest, Options>
: object;
export type PartialSubscription<E> = E extends SimpleExtension<infer Value>
? Value
: E extends ConditionalExtension<infer Value, any>
? Value
: never;
export type XenApiRecordExtension<T extends XenApiRecord<any>> = Extension<{
records: ComputedRef<T[]>;
getByOpaqueRef: (opaqueRef: T["$ref"]) => T | undefined;
getByUuid: (uuid: T["uuid"]) => T | undefined;
hasUuid: (uuid: T["uuid"]) => boolean;
isReady: Readonly<Ref<boolean>>;
isFetching: Readonly<Ref<boolean>>;
isReloading: ComputedRef<boolean>;
hasError: ComputedRef<boolean>;
lastError: Readonly<Ref<string | undefined>>;
}>;
export type DeferExtension = Extension<
{
start: () => void;
isStarted: ComputedRef<boolean>;
},
{ immediate: false }
>;
export type XenApiRecordSubscription<T extends XenApiRecord<any>> =
PartialSubscription<XenApiRecordExtension<T>>;

View File

@@ -0,0 +1,108 @@
import type {
RawObjectType,
XenApiConsole,
XenApiHost,
XenApiHostMetrics,
XenApiMessage,
XenApiPool,
XenApiSr,
XenApiTask,
XenApiVm,
XenApiVmGuestMetrics,
XenApiVmMetrics,
} from "@/libs/xen-api";
import type { ComputedRef, Ref } from "vue";
type DefaultExtension<
T extends RawObjectType,
R extends RawTypeToRecord<T> = RawTypeToRecord<T>
> = {
records: ComputedRef<R[]>;
getByOpaqueRef: (opaqueRef: R["$ref"]) => R | undefined;
getByUuid: (uuid: R["uuid"]) => R | undefined;
hasUuid: (uuid: R["uuid"]) => boolean;
isReady: Readonly<Ref<boolean>>;
isFetching: Readonly<Ref<boolean>>;
isReloading: ComputedRef<boolean>;
hasError: ComputedRef<boolean>;
lastError: Readonly<Ref<string | undefined>>;
};
type DeferExtension = [
{
start: () => void;
isStarted: ComputedRef<boolean>;
},
{ immediate: false }
];
type DefaultExtensions<T extends RawObjectType> = [
DefaultExtension<T>,
DeferExtension
];
type GenerateSubscribeOptions<Extensions extends any[]> = Extensions extends [
infer FirstExtension,
...infer RestExtension
]
? FirstExtension extends [object, infer FirstCondition]
? FirstCondition & GenerateSubscribeOptions<RestExtension>
: GenerateSubscribeOptions<RestExtension>
: object;
export type SubscribeOptions<Extensions extends any[]> = Partial<
GenerateSubscribeOptions<Extensions> &
GenerateSubscribeOptions<DefaultExtensions<any>>
>;
type GenerateSubscription<
Options extends object,
Extensions extends any[]
> = Extensions extends [infer FirstExtension, ...infer RestExtension]
? FirstExtension extends [infer FirstObject, infer FirstCondition]
? Options extends FirstCondition
? FirstObject & GenerateSubscription<Options, RestExtension>
: GenerateSubscription<Options, RestExtension>
: FirstExtension & GenerateSubscription<Options, RestExtension>
: object;
export type Subscription<
T extends RawObjectType,
Options extends object,
Extensions extends any[] = []
> = GenerateSubscription<Options, Extensions> &
GenerateSubscription<Options, DefaultExtensions<T>>;
export function createSubscribe<
T extends RawObjectType,
Extensions extends any[],
Options extends object = SubscribeOptions<Extensions>
>(builder: (options?: Options) => Subscription<T, Options, Extensions>) {
return function subscribe<O extends Options>(
options?: O
): Subscription<T, O, Extensions> {
return builder(options);
};
}
export type RawTypeToRecord<T extends RawObjectType> = T extends "SR"
? XenApiSr
: T extends "VM"
? XenApiVm
: T extends "VM_guest_metrics"
? XenApiVmGuestMetrics
: T extends "VM_metrics"
? XenApiVmMetrics
: T extends "console"
? XenApiConsole
: T extends "host"
? XenApiHost
: T extends "host_metrics"
? XenApiHostMetrics
: T extends "message"
? XenApiMessage
: T extends "pool"
? XenApiPool
: T extends "task"
? XenApiTask
: never;

View File

@@ -21,7 +21,6 @@ import { useI18n } from "vue-i18n";
const { pendingTasks, finishedTasks, isReady, hasError } =
useTaskStore().subscribe();
const { t } = useI18n();
const titleStore = usePageTitleStore();

View File

@@ -43,7 +43,7 @@
"app-conf": "^2.3.0",
"async-iterator-to-stream": "^1.1.0",
"fs-extra": "^11.1.0",
"get-stream": "^6.0.0",
"get-stream": "^7.0.1",
"getopts": "^2.2.3",
"golike-defer": "^0.5.1",
"http-server-plus": "^1.0.0",

View File

@@ -28,7 +28,7 @@
"@vates/nbd-client": "^2.0.0",
"@xen-orchestra/async-map": "^0.1.2",
"@xen-orchestra/log": "^0.6.0",
"d3-time-format": "^3.0.0",
"d3-time-format": "^4.1.0",
"golike-defer": "^0.5.1",
"http-request-plus": "^1.0.0",
"json-rpc-protocol": "^0.13.2",

View File

@@ -6,7 +6,7 @@ import { decorateClass } from '@vates/decorate-with'
import { strict as assert } from 'node:assert'
import extractOpaqueRef from './_extractOpaqueRef.mjs'
import NbdClient from '@vates/nbd-client'
import NbdClient from '@vates/nbd-client/client.mjs'
import { createNbdRawStream, createNbdVhdStream } from 'vhd-lib/createStreamNbd.js'
import { VDI_FORMAT_RAW, VDI_FORMAT_VHD } from './index.mjs'

View File

@@ -12,6 +12,7 @@
> Users must be able to say: “I had this issue, happy to know it's fixed”
- [LDAP] Mark the _Id attribute_ setting as required
- [Incremental Replication] Fix `TypeError: Cannot read properties of undefined (reading 'uuid') at #isAlreadyOnHealthCheckSr` [Forum#7492](https://xcp-ng.org/forum/topic/7492) (PR [#6969](https://github.com/vatesfr/xen-orchestra/pull/6969))
### Packages to release
@@ -29,6 +30,8 @@
<!--packages-start-->
- @xen-orchestra/backups patch
- xen-api patch
- xo-server patch
- xo-server-auth-ldap patch
- xo-web patch

View File

@@ -25,7 +25,7 @@
"jest": "^29.0.3",
"lint-staged": "^13.0.3",
"lodash": "^4.17.4",
"prettier": "^2.0.5",
"prettier": "^3.0.1",
"promise-toolbox": "^0.21.0",
"semver": "^7.3.6",
"sorted-object": "^2.0.1",
@@ -98,7 +98,7 @@
"prettify": "prettier --ignore-path .gitignore --ignore-unknown --write .",
"test": "npm run test-lint && npm run test-unit",
"test-integration": "jest \".integ\\.spec\\.js$\" && scripts/run-script.js --parallel test-integration",
"test-lint": "eslint --ignore-path .gitignore --ignore-pattern packages/xo-web .",
"test-lint": "eslint --ignore-path .gitignore --ignore-pattern @xen-orchestra/lite --ignore-pattern packages/xo-web .",
"test-unit": "jest \"^(?!.*\\.integ\\.spec\\.js$)\" && scripts/run-script.js --bail test"
},
"workspaces": [

View File

@@ -419,7 +419,7 @@ export class Xapi extends EventEmitter {
signal: $cancelToken,
}),
{
when: error => error.response.statusCode === 302,
when: error => error.response !== undefined && error.response.statusCode === 302,
onRetry: async error => {
const response = error.response
if (response === undefined) {

View File

@@ -35,7 +35,7 @@
"ensure-array": "^1.0.0",
"exec-promise": "^0.7.0",
"inquirer": "^8.0.0",
"ldapts": "^4.1.0",
"ldapts": "^6.0.0",
"promise-toolbox": "^0.21.0"
},
"devDependencies": {

View File

@@ -76,7 +76,7 @@
"fast-xml-parser": "^4.0.0",
"fatfs": "^0.10.4",
"fs-extra": "^11.1.0",
"get-stream": "^6.0.0",
"get-stream": "^7.0.1",
"golike-defer": "^0.5.1",
"hashy": "^0.11.1",
"helmet": "^3.9.0",

View File

@@ -1,10 +1,10 @@
import * as multiparty from 'multiparty'
import assert from 'assert'
import getStream from 'get-stream'
import hrp from 'http-request-plus'
import { createLogger } from '@xen-orchestra/log'
import { defer } from 'golike-defer'
import { format, JsonRpcError } from 'json-rpc-peer'
import { getStreamAsBuffer } from 'get-stream'
import { invalidParameters, noSuchObject } from 'xo-common/api-errors.js'
import { pipeline } from 'stream'
import { peekFooterFromVhdStream } from 'vhd-lib'
@@ -187,7 +187,7 @@ async function handleImport(req, res, { type, name, description, vmdkData, srId,
if (part.name !== 'file') {
promises.push(
(async () => {
const buffer = await getStream.buffer(part)
const buffer = await getStreamAsBuffer(part)
vmdkData[part.name] = new Uint32Array(
buffer.buffer,
buffer.byteOffset,

View File

@@ -80,7 +80,7 @@ set.params = {
},
shareByDefault: {
type: 'boolean',
optional: true
optional: true,
},
subjects: {
type: 'array',

View File

@@ -4,12 +4,12 @@ import { asyncEach } from '@vates/async-each'
import asyncMapSettled from '@xen-orchestra/async-map/legacy.js'
import { Task } from '@xen-orchestra/mixins/Tasks.mjs'
import concat from 'lodash/concat.js'
import getStream from 'get-stream'
import hrp from 'http-request-plus'
import { createLogger } from '@xen-orchestra/log'
import { defer } from 'golike-defer'
import { format } from 'json-rpc-peer'
import { FAIL_ON_QUEUE } from 'limit-concurrency-decorator'
import { getStreamAsBuffer } from 'get-stream'
import { ignoreErrors } from 'promise-toolbox'
import { invalidParameters, noSuchObject, operationFailed, unauthorized } from 'xo-common/api-errors.js'
import { Ref } from 'xen-api'
@@ -1224,7 +1224,7 @@ async function handleVmImport(req, res, { data, srId, type, xapi }) {
if (!(part.filename in tables)) {
tables[part.filename] = {}
}
const buffer = await getStream.buffer(part)
const buffer = await getStreamAsBuffer(part)
tables[part.filename][part.name] = new Uint32Array(
buffer.buffer,
buffer.byteOffset,

View File

@@ -1,6 +1,6 @@
import * as CM from 'complex-matcher'
import getStream from 'get-stream'
import { fromCallback } from 'promise-toolbox'
import { getStreamAsBuffer } from 'get-stream'
import { pipeline } from 'readable-stream'
import createNdJsonStream from '../_createNdJsonStream.mjs'
@@ -81,7 +81,7 @@ getAllObjects.params = {
export async function importConfig({ passphrase }) {
return {
$sendTo: await this.registerHttpRequest(async (req, res) => {
await this.importConfig(await getStream.buffer(req), { passphrase })
await this.importConfig(await getStreamAsBuffer(req), { passphrase })
res.end('config successfully imported')
}),

View File

@@ -533,8 +533,7 @@ const xoItemToRender = {
<span>
<Icon icon='xo-cloud-config' /> <ShortDate timestamp={createdAt} />
</span>
)
,
),
// XO objects.
pool: props => <Pool {...props} />,

3133
yarn.lock

File diff suppressed because it is too large Load Diff