chore(xen-api/_watchEvents): new implementation (#3990)

- fetch each types independently: no more huge requests
- only fall back to legacy implementation if `event.inject` is not available
- can only watch certain types
- `Xapi#objectsFetched` is a promise which resolves when objects have been fetched
This commit is contained in:
Julien Fontanet 2019-02-26 09:45:21 +01:00 committed by Pierre Donias
parent 4216a5808a
commit 59106aa29e

View File

@ -8,6 +8,7 @@ import { EventEmitter } from 'events'
import { fibonacci } from 'iterable-backoff'
import {
forEach,
forOwn,
isArray,
isInteger,
map,
@ -270,17 +271,14 @@ export class Xapi extends EventEmitter {
super()
this._allowUnauthorized = opts.allowUnauthorized
this._auth = opts.auth
this._callTimeout = makeCallSetting(opts.callTimeout, 0)
this._debounce = opts.debounce == null ? 200 : opts.debounce
this._pool = null
this._readOnly = Boolean(opts.readOnly)
this._RecordsByType = createObject(null)
this._sessionId = null
;(this._objects = new Collection()).getKey = getKey
;(this._objectsByRef = createObject(null))[NULL_REF] = undefined
const url = (this._url = parseUrl(opts.url))
this._auth = opts.auth
const url = (this._url = parseUrl(opts.url))
if (this._auth === undefined) {
const user = url.username
if (user !== undefined) {
@ -293,7 +291,19 @@ export class Xapi extends EventEmitter {
}
}
if (opts.watchEvents !== false) {
;(this._objects = new Collection()).getKey = getKey
this._debounce = opts.debounce == null ? 200 : opts.debounce
this._watchedTypes = undefined
this._watching = false
this.on(DISCONNECTED, this._clearObjects)
this._clearObjects()
const { watchEvents } = opts
if (watchEvents !== false) {
if (Array.isArray(watchEvents)) {
this._watchedTypes = watchEvents
}
this.watchEvents()
}
}
@ -301,19 +311,14 @@ export class Xapi extends EventEmitter {
watchEvents() {
this._eventWatchers = createObject(null)
this._fromToken = ''
this._nTasks = 0
this._taskWatchers = Object.create(null)
if (this.status === CONNECTED) {
this._watchEvents()
this._watchEventsWrapper()
}
this.on('connected', this._watchEvents)
this.on('connected', this._watchEventsWrapper)
this.on('disconnected', () => {
this._fromToken = ''
this._objects.clear()
})
}
@ -805,6 +810,15 @@ export class Xapi extends EventEmitter {
return this._objects
}
_clearObjects() {
;(this._objectsByRef = createObject(null))[NULL_REF] = undefined
this._nTasks = 0
this._objects.clear()
this.objectsFetched = new Promise(resolve => {
this._resolveObjectsFetched = resolve
})
}
// return a promise which resolves to a task ref or undefined
_autoTask(task = this._taskWatchers !== undefined, name) {
if (task === false) {
@ -820,7 +834,7 @@ export class Xapi extends EventEmitter {
}
// Medium level call: handle session errors.
_sessionCall(method, args) {
_sessionCall(method, args, timeout = this._callTimeout(method, args)) {
try {
if (startsWith(method, 'session.')) {
throw new Error('session.*() methods are disabled from this interface')
@ -844,7 +858,7 @@ export class Xapi extends EventEmitter {
return this.connect().then(() => this._sessionCall(method, args))
}
),
this._callTimeout(method, args)
timeout
)
} catch (error) {
return Promise.reject(error)
@ -937,34 +951,97 @@ export class Xapi extends EventEmitter {
})
}
_watchEvents() {
const loop = () =>
this.status === CONNECTED &&
pTimeout
.call(
this._sessionCall('event.from', [
['*'],
this._fromToken,
EVENT_TIMEOUT + 0.1, // Force float.
]),
EVENT_TIMEOUT * 1.1e3 // 10% longer than the XenAPI timeout
// - prevent multiple watches
// - swallow errors
async _watchEventsWrapper() {
if (!this._watching) {
this._watching = true
await ignoreErrors.call(this._watchEvents())
this._watching = false
}
}
// TODO: cancelation
async _watchEvents() {
this._clearObjects()
// compute the initial token for the event loop
//
// we need to do this before the initial fetch to avoid losing events
let fromToken
try {
fromToken = await this._sessionCall('event.inject', [
'pool',
this._pool.$ref,
])
} catch (error) {
if (isMethodUnknown(error)) {
return this._watchEventsLegacy()
}
}
const types = this._watchedTypes || this._types
// initial fetch
await Promise.all(
types.map(async type => {
try {
// FIXME: use _transportCall to avoid auto-reconnection
forOwn(
await this._sessionCall(`${type}.get_all_records`),
(record, ref) => {
// we can bypass _processEvents here because they are all *add*
// event and all objects are of the same type
this._addObject(type, ref, record)
}
)
} catch (error) {
if (error == null || error.code !== 'MESSAGE_REMOVED') {
throw error
}
}
})
)
this._resolveObjectsFetched()
// event loop
const debounce = this._debounce
while (true) {
if (debounce != null) {
await pDelay(debounce)
}
let result
try {
result = await this._sessionCall(
'event.from',
[types, fromToken, EVENT_TIMEOUT],
EVENT_TIMEOUT * 1.1
)
.then(onSuccess, onFailure)
} catch (error) {
if (error instanceof TimeoutError) {
continue
}
if (areEventsLost(error)) {
return this._watchEvents()
}
throw error
}
const onSuccess = ({ events, token, valid_ref_counts: { task } }) => {
this._fromToken = token
this._processEvents(events)
fromToken = result.token
this._processEvents(result.events)
if (task !== this._nTasks) {
this._sessionCall('task.get_all_records')
.then(tasks => {
// detect and fix disappearing tasks (e.g. when toolstack restarts)
if (result.valid_ref_counts.task !== this._nTasks) {
ignoreErrors.call(
this._sessionCall('task.get_all_records').then(tasks => {
const toRemove = new Set()
forEach(this.objects.all, object => {
forOwn(this.objects.all, object => {
if (object.$type === 'task') {
toRemove.add(object.$ref)
}
})
forEach(tasks, (task, ref) => {
forOwn(tasks, (task, ref) => {
toRemove.delete(ref)
this._addObject('task', ref, task)
})
@ -972,40 +1049,9 @@ export class Xapi extends EventEmitter {
this._removeObject('task', ref)
})
})
.catch(noop)
)
}
const debounce = this._debounce
return debounce != null ? pDelay(debounce).then(loop) : loop()
}
const onFailure = error => {
if (error instanceof TimeoutError) {
return loop()
}
if (areEventsLost(error)) {
this._fromToken = ''
this._objects.clear()
return loop()
}
throw error
}
ignoreErrors.call(
pCatch.call(
loop(),
isMethodUnknown,
// If the server failed, it is probably due to an excessively
// large response.
// Falling back to legacy events watch should be enough.
error => error && error.res && error.res.statusCode === 500,
() => this._watchEventsLegacy()
)
)
}
// This method watches events using the legacy `event.next` XAPI
@ -1029,7 +1075,7 @@ export class Xapi extends EventEmitter {
}
)
)
)
).then(() => this._resolveObjectsFetched())
}
const watchEvents = () =>
@ -1065,8 +1111,7 @@ export class Xapi extends EventEmitter {
const nFields = fields.length
const xapi = this
const objectsByRef = this._objectsByRef
const getObjectByRef = ref => objectsByRef[ref]
const getObjectByRef = ref => this._objectsByRef[ref]
Record = function(ref, data) {
defineProperties(this, {
@ -1108,7 +1153,7 @@ export class Xapi extends EventEmitter {
const value = this[field]
const result = {}
getKeys(value).forEach(key => {
result[key] = objectsByRef[value[key]]
result[key] = xapi._objectsByRef[value[key]]
})
return result
}
@ -1122,7 +1167,7 @@ export class Xapi extends EventEmitter {
// a ref property, an user had the case on XenServer 7.0 on the CD VBD
// of a VM created by XenCenter
getters[$field] = function() {
return objectsByRef[this[field]]
return xapi._objectsByRef[this[field]]
}
}
})