Compare commits

...

1 Commits

Author SHA1 Message Date
Julien Fontanet
282805966b WiP: feat(xen-api/getCachedRecord): getRecord + cache + events
Fixes #5088
2022-03-01 13:37:03 +01:00
5 changed files with 144 additions and 48 deletions

View File

@@ -1,14 +1,25 @@
import assert from 'assert'
import dns from 'dns'
import httpRequest from 'http-request-plus'
import kindOf from 'kindof'
import ms from 'ms'
import httpRequest from 'http-request-plus'
import pCatch from 'promise-toolbox/catch'
import { coalesceCalls } from '@vates/coalesce-calls'
import { Collection } from 'xo-collection'
import { EventEmitter } from 'events'
import { map, noop, omit } from 'lodash'
import { cancelable, defer, fromCallback, fromEvents, ignoreErrors, pDelay, pRetry, pTimeout } from 'promise-toolbox'
import { limitConcurrency } from 'limit-concurrency-decorator'
import {
cancelable,
CancelToken,
defer,
fromCallback,
fromEvents,
ignoreErrors,
pDelay,
pRetry,
pTimeout,
} from 'promise-toolbox'
import autoTransport from './transports/auto'
import debug from './_debug'
@@ -132,6 +143,7 @@ export class Xapi extends EventEmitter {
this._lastEventFetchedTimestamp = undefined
this._debounce = opts.debounce ?? 200
this._cacheAllRecords = opts.watchEvents !== 'lazy'
this._objects = new Collection()
this._objectsByRef = { __proto__: null }
this._objectsFetched = new Promise(resolve => {
@@ -284,7 +296,13 @@ export class Xapi extends EventEmitter {
}
async getRecord(type, ref) {
return this._wrapRecord(type, ref, await this._roCall(`${type}.get_record`, [ref]))
const record = this._wrapRecord(type, ref, await this._roCall(`${type}.get_record`, [ref]))
const records = this._objectsByRef
if (records !== undefined) {
records[ref] = record
this._objects[record.$id] = record
}
return record
}
async getRecordByUuid(type, uuid) {
@@ -590,10 +608,29 @@ export class Xapi extends EventEmitter {
return taskRef
}
async getCachedRecord(type, ref) {
const records = this._objectsByRef
let record = records[ref]
if (record !== undefined) {
return record
}
this._watchType(type)
record = await pCatch.call(this.getRecord(type, ref), { code: 'HANDLE_INVALID' }, noop)
records[ref] = record
return record
}
// Nice getter which returns the object for a given $id (internal to
// this lib), UUID (unique identifier that some objects have) or
// opaque reference (internal to XAPI).
getObject(idOrUuidOrRef, defaultValue) {
if (!this._cacheAllRecords) {
throw new Error('disabled')
}
if (typeof idOrUuidOrRef === 'object') {
idOrUuidOrRef = idOrUuidOrRef.$id
}
@@ -610,6 +647,10 @@ export class Xapi extends EventEmitter {
// Returns the object for a given opaque reference (internal to
// XAPI).
getObjectByRef(ref, defaultValue) {
if (!this._cacheAllRecords) {
throw new Error('disabled')
}
const object = this._objectsByRef[ref]
if (object !== undefined) return object
@@ -622,6 +663,10 @@ export class Xapi extends EventEmitter {
// Returns the object for a given UUID (unique identifier that some
// objects have).
getObjectByUuid(uuid, defaultValue) {
if (!this._cacheAllRecords) {
throw new Error('disabled')
}
// Objects ids are already UUIDs if they have one.
const object = this._objects.all[uuid]
@@ -666,10 +711,10 @@ export class Xapi extends EventEmitter {
// Private
// ===========================================================================
async _call(method, args, timeout = this._callTimeout(method, args)) {
async _call(method, args, { cancelToken = CancelToken.none, timeout = this._callTimeout(method, args) } = {}) {
const startTime = Date.now()
try {
const result = await pTimeout.call(this._addSyncStackTrace(this._transport(method, args)), timeout)
const result = await pTimeout.call(this._addSyncStackTrace(this._transport(method, args, cancelToken)), timeout)
debug('%s: %s(...) [%s] ==> %s', this._humanId, method, ms(Date.now() - startTime), kindOf(result))
return result
} catch (error) {
@@ -732,7 +777,7 @@ export class Xapi extends EventEmitter {
this._status !== DISCONNECTED && error?.code === 'SESSION_INVALID' && this._auth.password !== undefined,
onRetry: () => this._sessionOpen(),
}
async _sessionCall(method, args, timeout) {
async _sessionCall(method, args, opts) {
if (method.startsWith('session.')) {
return Promise.reject(new Error('session.*() methods are disabled from this interface'))
}
@@ -747,7 +792,7 @@ export class Xapi extends EventEmitter {
newArgs.push.apply(newArgs, args)
}
return this._call(method, newArgs, timeout)
return this._call(method, newArgs, opts)
}, this._sessionCallRetryOptions)
} catch (error) {
if (error?.code === 'SESSION_INVALID') {
@@ -1031,8 +1076,18 @@ export class Xapi extends EventEmitter {
}
// read-only call, automatically retry in case of connection issues
_roCall(method, args) {
return pRetry(() => this._sessionCall(method, args), this._roCallRetryOptions)
_roCall(...args) {
return pRetry(() => this._sessionCall(...args), this._roCallRetryOptions)
}
_cancelEventCall = noop
_watchType(type) {
const watchedTypes = this._watchedTypes
if (!watchedTypes.includes(type)) {
watchedTypes.push(type)
this._cancelEventCall()
}
}
_watchEvents = coalesceCalls(this._watchEvents)
@@ -1067,9 +1122,11 @@ export class Xapi extends EventEmitter {
const types = this._watchedTypes ?? this._types
// initial fetch
await this._refreshCachedRecords(types)
this._resolveObjectsFetched()
this._resolveObjectsFetched = undefined
if (this._cacheAllRecords) {
await this._refreshCachedRecords(types)
this._resolveObjectsFetched()
this._resolveObjectsFetched = undefined
}
// event loop
const debounce = this._debounce
@@ -1082,16 +1139,28 @@ export class Xapi extends EventEmitter {
try {
// don't use _sessionCall because a session failure should break the
// loop and trigger a complete refetch
result = await this._call(
'event.from',
[
this._sessionId,
types,
fromToken,
EVENT_TIMEOUT + 0.1, // must be float for XML-RPC transport
],
EVENT_TIMEOUT * 1e3 * 1.1
result = await pRetry(
() => {
const source = CancelToken.source()
this._cancelEventCall = source.cancel
return this._call(
'event.from',
[
this._sessionId,
types,
fromToken,
EVENT_TIMEOUT + 0.1, // must be float for XML-RPC transport
],
{
cancelToken: source.token,
timeout: EVENT_TIMEOUT * 1e3 * 1.1,
}
)
},
{ when: CancelToken.isCancelToken }
)
this._lastEventFetchedTimestamp = Date.now()
this._watchEventsError = undefined
this.emit('eventFetchingSuccess')
@@ -1151,7 +1220,17 @@ export class Xapi extends EventEmitter {
try {
await this._connected
this._processEvents(await this._roCall('event.next', undefined, EVENT_TIMEOUT * 1e3))
this._processEvents(
await pRetry(
() => {
const source = CancelToken.source()
this._cancelEventCall = source.cancel
return this._roCall('event.next', undefined, { cancelToken: source.token, timeout: EVENT_TIMEOUT * 1e3 })
},
{ when: CancelToken.isCancelToken }
)
)
} catch (error) {
if (error?.code === 'EVENTS_LOST') {
await ignoreErrors.call(this._sessionCall('event.unregister', [types]))
@@ -1219,20 +1298,29 @@ export class Xapi extends EventEmitter {
addMethods(proto)
}
})(xapi)
const disabledGetter = this._cacheAllRecords
? undefined
: () => {
throw new Error('disabled')
}
fields.forEach(field => {
props[`set_${field}`] = function (value) {
return xapi.setField(this.$type, this.$ref, field, value)
}
props[`set_${field}`] =
disabledGetter ||
function (value) {
return xapi.setField(this.$type, this.$ref, field, value)
}
const $field = (field in RESERVED_FIELDS ? '$$' : '$') + field
const value = data[field]
if (Array.isArray(value)) {
if (value.length === 0 || Ref.is(value[0])) {
getters[$field] = function () {
const value = this[field]
return value.length === 0 ? value : value.map(getObjectByRef)
}
getters[$field] =
disabledGetter ||
function () {
const value = this[field]
return value.length === 0 ? value : value.map(getObjectByRef)
}
}
props[`add_${field}`] = function (value) {
@@ -1242,23 +1330,27 @@ export class Xapi extends EventEmitter {
return xapi.call(`${type}.remove_${field}`, this.$ref, value).then(noop)
}
} else if (value !== null && typeof value === 'object') {
getters[$field] = function () {
const value = this[field]
const result = {}
getKeys(value).forEach(key => {
result[key] = xapi._objectsByRef[value[key]]
})
return result
}
getters[$field] =
disabledGetter ||
function () {
const value = this[field]
const result = {}
getKeys(value).forEach(key => {
result[key] = xapi._objectsByRef[value[key]]
})
return result
}
props[`update_${field}`] = function (entries, value) {
return typeof entries === 'string'
? xapi.setFieldEntry(this.$type, this.$ref, field, entries, value)
: xapi.setFieldEntries(this.$type, this.$ref, field, entries)
}
} else if (Ref.is(value)) {
getters[$field] = function () {
return xapi._objectsByRef[this[field]]
}
getters[$field] =
disabledGetter ||
function () {
return xapi._objectsByRef[this[field]]
}
}
})
const descriptors = {}

View File

@@ -14,14 +14,14 @@ export default opts => {
const current = factories[i++](opts)
if (i < length) {
const currentI = i
call = (method, args) =>
current(method, args).catch(error => {
call = (...args) =>
current(...args).catch(error => {
if (error instanceof UnsupportedTransport) {
if (currentI === i) {
// not changed yet
create()
}
return call(method, args)
return call(...args)
}
throw error
@@ -32,5 +32,5 @@ export default opts => {
}
create()
return (method, args) => call(method, args)
return (...args) => call(...args)
}

View File

@@ -12,9 +12,9 @@ export default ({ secureOptions, url, httpProxy }) => {
if (httpProxy !== undefined) {
agent = new ProxyAgent(httpProxy)
}
return (method, args) =>
return (method, args, cancelToken) =>
httpRequestPlus
.post(url, {
.post(cancelToken, url, {
...secureOptions,
body: format.request(0, method, args),
headers: {

View File

@@ -86,5 +86,5 @@ export default ({ secureOptions, url: { hostname, port, protocol }, httpProxy })
})
const call = promisify(client.methodCall, client)
return (method, args) => call(method, prepareXmlRpcParams(args)).then(parseResult, logError)
return (method, args, cancelToken) => call(method, prepareXmlRpcParams(args), cancelToken).then(parseResult, logError)
}

View File

@@ -45,5 +45,9 @@ export default ({ secureOptions, url: { hostname, port, protocol, httpProxy } })
})
const call = promisify(client.methodCall, client)
return (method, args) => call(method, prepareXmlRpcParams(args)).then(parseResult, logError)
return (method, args, cancelToken) =>
new Promise((resolve, reject) => {
call(method, prepareXmlRpcParams(args), cancelToken).then(resolve, reject)
cancelToken.addHandler(reject)
}).then(parseResult, logError)
}