|
|
|
|
@@ -1,14 +1,25 @@
|
|
|
|
|
import assert from 'assert'
|
|
|
|
|
import dns from 'dns'
|
|
|
|
|
import httpRequest from 'http-request-plus'
|
|
|
|
|
import kindOf from 'kindof'
|
|
|
|
|
import ms from 'ms'
|
|
|
|
|
import httpRequest from 'http-request-plus'
|
|
|
|
|
import pCatch from 'promise-toolbox/catch'
|
|
|
|
|
import { coalesceCalls } from '@vates/coalesce-calls'
|
|
|
|
|
import { Collection } from 'xo-collection'
|
|
|
|
|
import { EventEmitter } from 'events'
|
|
|
|
|
import { map, noop, omit } from 'lodash'
|
|
|
|
|
import { cancelable, defer, fromCallback, fromEvents, ignoreErrors, pDelay, pRetry, pTimeout } from 'promise-toolbox'
|
|
|
|
|
import { limitConcurrency } from 'limit-concurrency-decorator'
|
|
|
|
|
import {
|
|
|
|
|
cancelable,
|
|
|
|
|
CancelToken,
|
|
|
|
|
defer,
|
|
|
|
|
fromCallback,
|
|
|
|
|
fromEvents,
|
|
|
|
|
ignoreErrors,
|
|
|
|
|
pDelay,
|
|
|
|
|
pRetry,
|
|
|
|
|
pTimeout,
|
|
|
|
|
} from 'promise-toolbox'
|
|
|
|
|
|
|
|
|
|
import autoTransport from './transports/auto'
|
|
|
|
|
import debug from './_debug'
|
|
|
|
|
@@ -132,6 +143,7 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
this._lastEventFetchedTimestamp = undefined
|
|
|
|
|
|
|
|
|
|
this._debounce = opts.debounce ?? 200
|
|
|
|
|
this._cacheAllRecords = opts.watchEvents !== 'lazy'
|
|
|
|
|
this._objects = new Collection()
|
|
|
|
|
this._objectsByRef = { __proto__: null }
|
|
|
|
|
this._objectsFetched = new Promise(resolve => {
|
|
|
|
|
@@ -284,7 +296,13 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async getRecord(type, ref) {
|
|
|
|
|
return this._wrapRecord(type, ref, await this._roCall(`${type}.get_record`, [ref]))
|
|
|
|
|
const record = this._wrapRecord(type, ref, await this._roCall(`${type}.get_record`, [ref]))
|
|
|
|
|
const records = this._objectsByRef
|
|
|
|
|
if (records !== undefined) {
|
|
|
|
|
records[ref] = record
|
|
|
|
|
this._objects[record.$id] = record
|
|
|
|
|
}
|
|
|
|
|
return record
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async getRecordByUuid(type, uuid) {
|
|
|
|
|
@@ -590,10 +608,29 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
return taskRef
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async getCachedRecord(type, ref) {
|
|
|
|
|
const records = this._objectsByRef
|
|
|
|
|
let record = records[ref]
|
|
|
|
|
if (record !== undefined) {
|
|
|
|
|
return record
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
this._watchType(type)
|
|
|
|
|
|
|
|
|
|
record = await pCatch.call(this.getRecord(type, ref), { code: 'HANDLE_INVALID' }, noop)
|
|
|
|
|
records[ref] = record
|
|
|
|
|
|
|
|
|
|
return record
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Nice getter which returns the object for a given $id (internal to
|
|
|
|
|
// this lib), UUID (unique identifier that some objects have) or
|
|
|
|
|
// opaque reference (internal to XAPI).
|
|
|
|
|
getObject(idOrUuidOrRef, defaultValue) {
|
|
|
|
|
if (!this._cacheAllRecords) {
|
|
|
|
|
throw new Error('disabled')
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (typeof idOrUuidOrRef === 'object') {
|
|
|
|
|
idOrUuidOrRef = idOrUuidOrRef.$id
|
|
|
|
|
}
|
|
|
|
|
@@ -610,6 +647,10 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
// Returns the object for a given opaque reference (internal to
|
|
|
|
|
// XAPI).
|
|
|
|
|
getObjectByRef(ref, defaultValue) {
|
|
|
|
|
if (!this._cacheAllRecords) {
|
|
|
|
|
throw new Error('disabled')
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const object = this._objectsByRef[ref]
|
|
|
|
|
|
|
|
|
|
if (object !== undefined) return object
|
|
|
|
|
@@ -622,6 +663,10 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
// Returns the object for a given UUID (unique identifier that some
|
|
|
|
|
// objects have).
|
|
|
|
|
getObjectByUuid(uuid, defaultValue) {
|
|
|
|
|
if (!this._cacheAllRecords) {
|
|
|
|
|
throw new Error('disabled')
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Objects ids are already UUIDs if they have one.
|
|
|
|
|
const object = this._objects.all[uuid]
|
|
|
|
|
|
|
|
|
|
@@ -666,10 +711,10 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
// Private
|
|
|
|
|
// ===========================================================================
|
|
|
|
|
|
|
|
|
|
async _call(method, args, timeout = this._callTimeout(method, args)) {
|
|
|
|
|
async _call(method, args, { cancelToken = CancelToken.none, timeout = this._callTimeout(method, args) } = {}) {
|
|
|
|
|
const startTime = Date.now()
|
|
|
|
|
try {
|
|
|
|
|
const result = await pTimeout.call(this._addSyncStackTrace(this._transport(method, args)), timeout)
|
|
|
|
|
const result = await pTimeout.call(this._addSyncStackTrace(this._transport(method, args, cancelToken)), timeout)
|
|
|
|
|
debug('%s: %s(...) [%s] ==> %s', this._humanId, method, ms(Date.now() - startTime), kindOf(result))
|
|
|
|
|
return result
|
|
|
|
|
} catch (error) {
|
|
|
|
|
@@ -732,7 +777,7 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
this._status !== DISCONNECTED && error?.code === 'SESSION_INVALID' && this._auth.password !== undefined,
|
|
|
|
|
onRetry: () => this._sessionOpen(),
|
|
|
|
|
}
|
|
|
|
|
async _sessionCall(method, args, timeout) {
|
|
|
|
|
async _sessionCall(method, args, opts) {
|
|
|
|
|
if (method.startsWith('session.')) {
|
|
|
|
|
return Promise.reject(new Error('session.*() methods are disabled from this interface'))
|
|
|
|
|
}
|
|
|
|
|
@@ -747,7 +792,7 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
newArgs.push.apply(newArgs, args)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return this._call(method, newArgs, timeout)
|
|
|
|
|
return this._call(method, newArgs, opts)
|
|
|
|
|
}, this._sessionCallRetryOptions)
|
|
|
|
|
} catch (error) {
|
|
|
|
|
if (error?.code === 'SESSION_INVALID') {
|
|
|
|
|
@@ -1031,8 +1076,18 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// read-only call, automatically retry in case of connection issues
|
|
|
|
|
_roCall(method, args) {
|
|
|
|
|
return pRetry(() => this._sessionCall(method, args), this._roCallRetryOptions)
|
|
|
|
|
_roCall(...args) {
|
|
|
|
|
return pRetry(() => this._sessionCall(...args), this._roCallRetryOptions)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_cancelEventCall = noop
|
|
|
|
|
|
|
|
|
|
_watchType(type) {
|
|
|
|
|
const watchedTypes = this._watchedTypes
|
|
|
|
|
if (!watchedTypes.includes(type)) {
|
|
|
|
|
watchedTypes.push(type)
|
|
|
|
|
this._cancelEventCall()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_watchEvents = coalesceCalls(this._watchEvents)
|
|
|
|
|
@@ -1067,9 +1122,11 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
const types = this._watchedTypes ?? this._types
|
|
|
|
|
|
|
|
|
|
// initial fetch
|
|
|
|
|
await this._refreshCachedRecords(types)
|
|
|
|
|
this._resolveObjectsFetched()
|
|
|
|
|
this._resolveObjectsFetched = undefined
|
|
|
|
|
if (this._cacheAllRecords) {
|
|
|
|
|
await this._refreshCachedRecords(types)
|
|
|
|
|
this._resolveObjectsFetched()
|
|
|
|
|
this._resolveObjectsFetched = undefined
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// event loop
|
|
|
|
|
const debounce = this._debounce
|
|
|
|
|
@@ -1082,16 +1139,28 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
try {
|
|
|
|
|
// don't use _sessionCall because a session failure should break the
|
|
|
|
|
// loop and trigger a complete refetch
|
|
|
|
|
result = await this._call(
|
|
|
|
|
'event.from',
|
|
|
|
|
[
|
|
|
|
|
this._sessionId,
|
|
|
|
|
types,
|
|
|
|
|
fromToken,
|
|
|
|
|
EVENT_TIMEOUT + 0.1, // must be float for XML-RPC transport
|
|
|
|
|
],
|
|
|
|
|
EVENT_TIMEOUT * 1e3 * 1.1
|
|
|
|
|
result = await pRetry(
|
|
|
|
|
() => {
|
|
|
|
|
const source = CancelToken.source()
|
|
|
|
|
this._cancelEventCall = source.cancel
|
|
|
|
|
|
|
|
|
|
return this._call(
|
|
|
|
|
'event.from',
|
|
|
|
|
[
|
|
|
|
|
this._sessionId,
|
|
|
|
|
types,
|
|
|
|
|
fromToken,
|
|
|
|
|
EVENT_TIMEOUT + 0.1, // must be float for XML-RPC transport
|
|
|
|
|
],
|
|
|
|
|
{
|
|
|
|
|
cancelToken: source.token,
|
|
|
|
|
timeout: EVENT_TIMEOUT * 1e3 * 1.1,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
},
|
|
|
|
|
{ when: CancelToken.isCancelToken }
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
this._lastEventFetchedTimestamp = Date.now()
|
|
|
|
|
this._watchEventsError = undefined
|
|
|
|
|
this.emit('eventFetchingSuccess')
|
|
|
|
|
@@ -1151,7 +1220,17 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
await this._connected
|
|
|
|
|
this._processEvents(await this._roCall('event.next', undefined, EVENT_TIMEOUT * 1e3))
|
|
|
|
|
this._processEvents(
|
|
|
|
|
await pRetry(
|
|
|
|
|
() => {
|
|
|
|
|
const source = CancelToken.source()
|
|
|
|
|
this._cancelEventCall = source.cancel
|
|
|
|
|
|
|
|
|
|
return this._roCall('event.next', undefined, { cancelToken: source.token, timeout: EVENT_TIMEOUT * 1e3 })
|
|
|
|
|
},
|
|
|
|
|
{ when: CancelToken.isCancelToken }
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
} catch (error) {
|
|
|
|
|
if (error?.code === 'EVENTS_LOST') {
|
|
|
|
|
await ignoreErrors.call(this._sessionCall('event.unregister', [types]))
|
|
|
|
|
@@ -1219,20 +1298,29 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
addMethods(proto)
|
|
|
|
|
}
|
|
|
|
|
})(xapi)
|
|
|
|
|
const disabledGetter = this._cacheAllRecords
|
|
|
|
|
? undefined
|
|
|
|
|
: () => {
|
|
|
|
|
throw new Error('disabled')
|
|
|
|
|
}
|
|
|
|
|
fields.forEach(field => {
|
|
|
|
|
props[`set_${field}`] = function (value) {
|
|
|
|
|
return xapi.setField(this.$type, this.$ref, field, value)
|
|
|
|
|
}
|
|
|
|
|
props[`set_${field}`] =
|
|
|
|
|
disabledGetter ||
|
|
|
|
|
function (value) {
|
|
|
|
|
return xapi.setField(this.$type, this.$ref, field, value)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const $field = (field in RESERVED_FIELDS ? '$$' : '$') + field
|
|
|
|
|
|
|
|
|
|
const value = data[field]
|
|
|
|
|
if (Array.isArray(value)) {
|
|
|
|
|
if (value.length === 0 || Ref.is(value[0])) {
|
|
|
|
|
getters[$field] = function () {
|
|
|
|
|
const value = this[field]
|
|
|
|
|
return value.length === 0 ? value : value.map(getObjectByRef)
|
|
|
|
|
}
|
|
|
|
|
getters[$field] =
|
|
|
|
|
disabledGetter ||
|
|
|
|
|
function () {
|
|
|
|
|
const value = this[field]
|
|
|
|
|
return value.length === 0 ? value : value.map(getObjectByRef)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
props[`add_${field}`] = function (value) {
|
|
|
|
|
@@ -1242,23 +1330,27 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
return xapi.call(`${type}.remove_${field}`, this.$ref, value).then(noop)
|
|
|
|
|
}
|
|
|
|
|
} else if (value !== null && typeof value === 'object') {
|
|
|
|
|
getters[$field] = function () {
|
|
|
|
|
const value = this[field]
|
|
|
|
|
const result = {}
|
|
|
|
|
getKeys(value).forEach(key => {
|
|
|
|
|
result[key] = xapi._objectsByRef[value[key]]
|
|
|
|
|
})
|
|
|
|
|
return result
|
|
|
|
|
}
|
|
|
|
|
getters[$field] =
|
|
|
|
|
disabledGetter ||
|
|
|
|
|
function () {
|
|
|
|
|
const value = this[field]
|
|
|
|
|
const result = {}
|
|
|
|
|
getKeys(value).forEach(key => {
|
|
|
|
|
result[key] = xapi._objectsByRef[value[key]]
|
|
|
|
|
})
|
|
|
|
|
return result
|
|
|
|
|
}
|
|
|
|
|
props[`update_${field}`] = function (entries, value) {
|
|
|
|
|
return typeof entries === 'string'
|
|
|
|
|
? xapi.setFieldEntry(this.$type, this.$ref, field, entries, value)
|
|
|
|
|
: xapi.setFieldEntries(this.$type, this.$ref, field, entries)
|
|
|
|
|
}
|
|
|
|
|
} else if (Ref.is(value)) {
|
|
|
|
|
getters[$field] = function () {
|
|
|
|
|
return xapi._objectsByRef[this[field]]
|
|
|
|
|
}
|
|
|
|
|
getters[$field] =
|
|
|
|
|
disabledGetter ||
|
|
|
|
|
function () {
|
|
|
|
|
return xapi._objectsByRef[this[field]]
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
const descriptors = {}
|
|
|
|
|
|