Compare commits

...

1 Commits

Author SHA1 Message Date
Julien Fontanet
282805966b WiP: feat(xen-api/getCachedRecord): getRecord + cache + events
Fixes #5088
2022-03-01 13:37:03 +01:00
5 changed files with 144 additions and 48 deletions

View File

@@ -1,14 +1,25 @@
import assert from 'assert' import assert from 'assert'
import dns from 'dns' import dns from 'dns'
import httpRequest from 'http-request-plus'
import kindOf from 'kindof' import kindOf from 'kindof'
import ms from 'ms' import ms from 'ms'
import httpRequest from 'http-request-plus' import pCatch from 'promise-toolbox/catch'
import { coalesceCalls } from '@vates/coalesce-calls' import { coalesceCalls } from '@vates/coalesce-calls'
import { Collection } from 'xo-collection' import { Collection } from 'xo-collection'
import { EventEmitter } from 'events' import { EventEmitter } from 'events'
import { map, noop, omit } from 'lodash' import { map, noop, omit } from 'lodash'
import { cancelable, defer, fromCallback, fromEvents, ignoreErrors, pDelay, pRetry, pTimeout } from 'promise-toolbox'
import { limitConcurrency } from 'limit-concurrency-decorator' import { limitConcurrency } from 'limit-concurrency-decorator'
import {
cancelable,
CancelToken,
defer,
fromCallback,
fromEvents,
ignoreErrors,
pDelay,
pRetry,
pTimeout,
} from 'promise-toolbox'
import autoTransport from './transports/auto' import autoTransport from './transports/auto'
import debug from './_debug' import debug from './_debug'
@@ -132,6 +143,7 @@ export class Xapi extends EventEmitter {
this._lastEventFetchedTimestamp = undefined this._lastEventFetchedTimestamp = undefined
this._debounce = opts.debounce ?? 200 this._debounce = opts.debounce ?? 200
this._cacheAllRecords = opts.watchEvents !== 'lazy'
this._objects = new Collection() this._objects = new Collection()
this._objectsByRef = { __proto__: null } this._objectsByRef = { __proto__: null }
this._objectsFetched = new Promise(resolve => { this._objectsFetched = new Promise(resolve => {
@@ -284,7 +296,13 @@ export class Xapi extends EventEmitter {
} }
async getRecord(type, ref) { async getRecord(type, ref) {
return this._wrapRecord(type, ref, await this._roCall(`${type}.get_record`, [ref])) const record = this._wrapRecord(type, ref, await this._roCall(`${type}.get_record`, [ref]))
const records = this._objectsByRef
if (records !== undefined) {
records[ref] = record
this._objects[record.$id] = record
}
return record
} }
async getRecordByUuid(type, uuid) { async getRecordByUuid(type, uuid) {
@@ -590,10 +608,29 @@ export class Xapi extends EventEmitter {
return taskRef return taskRef
} }
async getCachedRecord(type, ref) {
const records = this._objectsByRef
let record = records[ref]
if (record !== undefined) {
return record
}
this._watchType(type)
record = await pCatch.call(this.getRecord(type, ref), { code: 'HANDLE_INVALID' }, noop)
records[ref] = record
return record
}
// Nice getter which returns the object for a given $id (internal to // Nice getter which returns the object for a given $id (internal to
// this lib), UUID (unique identifier that some objects have) or // this lib), UUID (unique identifier that some objects have) or
// opaque reference (internal to XAPI). // opaque reference (internal to XAPI).
getObject(idOrUuidOrRef, defaultValue) { getObject(idOrUuidOrRef, defaultValue) {
if (!this._cacheAllRecords) {
throw new Error('disabled')
}
if (typeof idOrUuidOrRef === 'object') { if (typeof idOrUuidOrRef === 'object') {
idOrUuidOrRef = idOrUuidOrRef.$id idOrUuidOrRef = idOrUuidOrRef.$id
} }
@@ -610,6 +647,10 @@ export class Xapi extends EventEmitter {
// Returns the object for a given opaque reference (internal to // Returns the object for a given opaque reference (internal to
// XAPI). // XAPI).
getObjectByRef(ref, defaultValue) { getObjectByRef(ref, defaultValue) {
if (!this._cacheAllRecords) {
throw new Error('disabled')
}
const object = this._objectsByRef[ref] const object = this._objectsByRef[ref]
if (object !== undefined) return object if (object !== undefined) return object
@@ -622,6 +663,10 @@ export class Xapi extends EventEmitter {
// Returns the object for a given UUID (unique identifier that some // Returns the object for a given UUID (unique identifier that some
// objects have). // objects have).
getObjectByUuid(uuid, defaultValue) { getObjectByUuid(uuid, defaultValue) {
if (!this._cacheAllRecords) {
throw new Error('disabled')
}
// Objects ids are already UUIDs if they have one. // Objects ids are already UUIDs if they have one.
const object = this._objects.all[uuid] const object = this._objects.all[uuid]
@@ -666,10 +711,10 @@ export class Xapi extends EventEmitter {
// Private // Private
// =========================================================================== // ===========================================================================
async _call(method, args, timeout = this._callTimeout(method, args)) { async _call(method, args, { cancelToken = CancelToken.none, timeout = this._callTimeout(method, args) } = {}) {
const startTime = Date.now() const startTime = Date.now()
try { try {
const result = await pTimeout.call(this._addSyncStackTrace(this._transport(method, args)), timeout) const result = await pTimeout.call(this._addSyncStackTrace(this._transport(method, args, cancelToken)), timeout)
debug('%s: %s(...) [%s] ==> %s', this._humanId, method, ms(Date.now() - startTime), kindOf(result)) debug('%s: %s(...) [%s] ==> %s', this._humanId, method, ms(Date.now() - startTime), kindOf(result))
return result return result
} catch (error) { } catch (error) {
@@ -732,7 +777,7 @@ export class Xapi extends EventEmitter {
this._status !== DISCONNECTED && error?.code === 'SESSION_INVALID' && this._auth.password !== undefined, this._status !== DISCONNECTED && error?.code === 'SESSION_INVALID' && this._auth.password !== undefined,
onRetry: () => this._sessionOpen(), onRetry: () => this._sessionOpen(),
} }
async _sessionCall(method, args, timeout) { async _sessionCall(method, args, opts) {
if (method.startsWith('session.')) { if (method.startsWith('session.')) {
return Promise.reject(new Error('session.*() methods are disabled from this interface')) return Promise.reject(new Error('session.*() methods are disabled from this interface'))
} }
@@ -747,7 +792,7 @@ export class Xapi extends EventEmitter {
newArgs.push.apply(newArgs, args) newArgs.push.apply(newArgs, args)
} }
return this._call(method, newArgs, timeout) return this._call(method, newArgs, opts)
}, this._sessionCallRetryOptions) }, this._sessionCallRetryOptions)
} catch (error) { } catch (error) {
if (error?.code === 'SESSION_INVALID') { if (error?.code === 'SESSION_INVALID') {
@@ -1031,8 +1076,18 @@ export class Xapi extends EventEmitter {
} }
// read-only call, automatically retry in case of connection issues // read-only call, automatically retry in case of connection issues
_roCall(method, args) { _roCall(...args) {
return pRetry(() => this._sessionCall(method, args), this._roCallRetryOptions) return pRetry(() => this._sessionCall(...args), this._roCallRetryOptions)
}
_cancelEventCall = noop
_watchType(type) {
const watchedTypes = this._watchedTypes
if (!watchedTypes.includes(type)) {
watchedTypes.push(type)
this._cancelEventCall()
}
} }
_watchEvents = coalesceCalls(this._watchEvents) _watchEvents = coalesceCalls(this._watchEvents)
@@ -1067,9 +1122,11 @@ export class Xapi extends EventEmitter {
const types = this._watchedTypes ?? this._types const types = this._watchedTypes ?? this._types
// initial fetch // initial fetch
await this._refreshCachedRecords(types) if (this._cacheAllRecords) {
this._resolveObjectsFetched() await this._refreshCachedRecords(types)
this._resolveObjectsFetched = undefined this._resolveObjectsFetched()
this._resolveObjectsFetched = undefined
}
// event loop // event loop
const debounce = this._debounce const debounce = this._debounce
@@ -1082,16 +1139,28 @@ export class Xapi extends EventEmitter {
try { try {
// don't use _sessionCall because a session failure should break the // don't use _sessionCall because a session failure should break the
// loop and trigger a complete refetch // loop and trigger a complete refetch
result = await this._call( result = await pRetry(
'event.from', () => {
[ const source = CancelToken.source()
this._sessionId, this._cancelEventCall = source.cancel
types,
fromToken, return this._call(
EVENT_TIMEOUT + 0.1, // must be float for XML-RPC transport 'event.from',
], [
EVENT_TIMEOUT * 1e3 * 1.1 this._sessionId,
types,
fromToken,
EVENT_TIMEOUT + 0.1, // must be float for XML-RPC transport
],
{
cancelToken: source.token,
timeout: EVENT_TIMEOUT * 1e3 * 1.1,
}
)
},
{ when: CancelToken.isCancelToken }
) )
this._lastEventFetchedTimestamp = Date.now() this._lastEventFetchedTimestamp = Date.now()
this._watchEventsError = undefined this._watchEventsError = undefined
this.emit('eventFetchingSuccess') this.emit('eventFetchingSuccess')
@@ -1151,7 +1220,17 @@ export class Xapi extends EventEmitter {
try { try {
await this._connected await this._connected
this._processEvents(await this._roCall('event.next', undefined, EVENT_TIMEOUT * 1e3)) this._processEvents(
await pRetry(
() => {
const source = CancelToken.source()
this._cancelEventCall = source.cancel
return this._roCall('event.next', undefined, { cancelToken: source.token, timeout: EVENT_TIMEOUT * 1e3 })
},
{ when: CancelToken.isCancelToken }
)
)
} catch (error) { } catch (error) {
if (error?.code === 'EVENTS_LOST') { if (error?.code === 'EVENTS_LOST') {
await ignoreErrors.call(this._sessionCall('event.unregister', [types])) await ignoreErrors.call(this._sessionCall('event.unregister', [types]))
@@ -1219,20 +1298,29 @@ export class Xapi extends EventEmitter {
addMethods(proto) addMethods(proto)
} }
})(xapi) })(xapi)
const disabledGetter = this._cacheAllRecords
? undefined
: () => {
throw new Error('disabled')
}
fields.forEach(field => { fields.forEach(field => {
props[`set_${field}`] = function (value) { props[`set_${field}`] =
return xapi.setField(this.$type, this.$ref, field, value) disabledGetter ||
} function (value) {
return xapi.setField(this.$type, this.$ref, field, value)
}
const $field = (field in RESERVED_FIELDS ? '$$' : '$') + field const $field = (field in RESERVED_FIELDS ? '$$' : '$') + field
const value = data[field] const value = data[field]
if (Array.isArray(value)) { if (Array.isArray(value)) {
if (value.length === 0 || Ref.is(value[0])) { if (value.length === 0 || Ref.is(value[0])) {
getters[$field] = function () { getters[$field] =
const value = this[field] disabledGetter ||
return value.length === 0 ? value : value.map(getObjectByRef) function () {
} const value = this[field]
return value.length === 0 ? value : value.map(getObjectByRef)
}
} }
props[`add_${field}`] = function (value) { props[`add_${field}`] = function (value) {
@@ -1242,23 +1330,27 @@ export class Xapi extends EventEmitter {
return xapi.call(`${type}.remove_${field}`, this.$ref, value).then(noop) return xapi.call(`${type}.remove_${field}`, this.$ref, value).then(noop)
} }
} else if (value !== null && typeof value === 'object') { } else if (value !== null && typeof value === 'object') {
getters[$field] = function () { getters[$field] =
const value = this[field] disabledGetter ||
const result = {} function () {
getKeys(value).forEach(key => { const value = this[field]
result[key] = xapi._objectsByRef[value[key]] const result = {}
}) getKeys(value).forEach(key => {
return result result[key] = xapi._objectsByRef[value[key]]
} })
return result
}
props[`update_${field}`] = function (entries, value) { props[`update_${field}`] = function (entries, value) {
return typeof entries === 'string' return typeof entries === 'string'
? xapi.setFieldEntry(this.$type, this.$ref, field, entries, value) ? xapi.setFieldEntry(this.$type, this.$ref, field, entries, value)
: xapi.setFieldEntries(this.$type, this.$ref, field, entries) : xapi.setFieldEntries(this.$type, this.$ref, field, entries)
} }
} else if (Ref.is(value)) { } else if (Ref.is(value)) {
getters[$field] = function () { getters[$field] =
return xapi._objectsByRef[this[field]] disabledGetter ||
} function () {
return xapi._objectsByRef[this[field]]
}
} }
}) })
const descriptors = {} const descriptors = {}

View File

@@ -14,14 +14,14 @@ export default opts => {
const current = factories[i++](opts) const current = factories[i++](opts)
if (i < length) { if (i < length) {
const currentI = i const currentI = i
call = (method, args) => call = (...args) =>
current(method, args).catch(error => { current(...args).catch(error => {
if (error instanceof UnsupportedTransport) { if (error instanceof UnsupportedTransport) {
if (currentI === i) { if (currentI === i) {
// not changed yet // not changed yet
create() create()
} }
return call(method, args) return call(...args)
} }
throw error throw error
@@ -32,5 +32,5 @@ export default opts => {
} }
create() create()
return (method, args) => call(method, args) return (...args) => call(...args)
} }

View File

@@ -12,9 +12,9 @@ export default ({ secureOptions, url, httpProxy }) => {
if (httpProxy !== undefined) { if (httpProxy !== undefined) {
agent = new ProxyAgent(httpProxy) agent = new ProxyAgent(httpProxy)
} }
return (method, args) => return (method, args, cancelToken) =>
httpRequestPlus httpRequestPlus
.post(url, { .post(cancelToken, url, {
...secureOptions, ...secureOptions,
body: format.request(0, method, args), body: format.request(0, method, args),
headers: { headers: {

View File

@@ -86,5 +86,5 @@ export default ({ secureOptions, url: { hostname, port, protocol }, httpProxy })
}) })
const call = promisify(client.methodCall, client) const call = promisify(client.methodCall, client)
return (method, args) => call(method, prepareXmlRpcParams(args)).then(parseResult, logError) return (method, args, cancelToken) => call(method, prepareXmlRpcParams(args), cancelToken).then(parseResult, logError)
} }

View File

@@ -45,5 +45,9 @@ export default ({ secureOptions, url: { hostname, port, protocol, httpProxy } })
}) })
const call = promisify(client.methodCall, client) const call = promisify(client.methodCall, client)
return (method, args) => call(method, prepareXmlRpcParams(args)).then(parseResult, logError) return (method, args, cancelToken) =>
new Promise((resolve, reject) => {
call(method, prepareXmlRpcParams(args), cancelToken).then(resolve, reject)
cancelToken.addHandler(reject)
}).then(parseResult, logError)
} }