chore(xen-api): rework events watching (#4103)

This commit is contained in:
Julien Fontanet
2019-03-29 15:59:51 +01:00
committed by Pierre Donias
parent d7527f280c
commit 485b8fe993

View File

@@ -4,7 +4,7 @@ import kindOf from 'kindof'
import ms from 'ms'
import httpRequest from 'http-request-plus'
import { EventEmitter } from 'events'
import { forEach, forOwn, isArray, map, noop, omit } from 'lodash'
import { isArray, map, noop, omit } from 'lodash'
import {
cancelable,
defer,
@@ -13,7 +13,6 @@ import {
pDelay,
pRetry,
pTimeout,
TimeoutError,
} from 'promise-toolbox'
import autoTransport from './transports/auto'
@@ -33,12 +32,6 @@ import XapiError from './_XapiError'
// in seconds!
const EVENT_TIMEOUT = 60
// -------------------------------------------------------------------
const areEventsLost = ({ code }) => code === 'EVENTS_LOST'
const isMethodUnknown = ({ code }) => code === 'MESSAGE_METHOD_UNKNOWN'
// ===================================================================
const { defineProperties, freeze, keys: getKeys } = Object
@@ -49,10 +42,6 @@ export const NULL_REF = 'OpaqueRef:NULL'
// -------------------------------------------------------------------
const getKey = o => o.$id
// -------------------------------------------------------------------
const RESERVED_FIELDS = {
id: true,
pool: true,
@@ -79,6 +68,7 @@ export class Xapi extends EventEmitter {
this._callTimeout = makeCallSetting(opts.callTimeout, 0)
this._httpInactivityTimeout = opts.httpInactivityTimeout ?? 5 * 60 * 1e3 // 5 mins
this._eventPollDelay = opts.eventPollDelay ?? 60 * 1e3 // 1 min
this._pool = null
this._readOnly = Boolean(opts.readOnly)
this._RecordsByType = { __proto__: null }
@@ -108,14 +98,16 @@ export class Xapi extends EventEmitter {
this._disconnected = Promise.resolve()
this._sessionId = undefined
this._status = DISCONNECTED
;(this._objects = new Collection()).getKey = getKey
this._debounce = opts.debounce == null ? 200 : opts.debounce
this._debounce = opts.debounce ?? 200
this._objects = new Collection()
this._objectsByRef = { __proto__: null }
this._objectsFetched = new Promise(resolve => {
this._resolveObjectsFetched = resolve
})
this._eventWatchers = { __proto__: null }
this._taskWatchers = { __proto__: null }
this._watchedTypes = undefined
this._watching = false
this.on(DISCONNECTED, this._clearObjects)
this._clearObjects()
const { watchEvents } = opts
if (watchEvents !== false) {
if (isArray(watchEvents)) {
@@ -602,19 +594,9 @@ export class Xapi extends EventEmitter {
throw new Error('no object with UUID: ' + uuid)
}
// manually run events watching if set to `false` in constructor
watchEvents() {
this._eventWatchers = { __proto__: null }
this._taskWatchers = { __proto__: null }
if (this.status === CONNECTED) {
this._watchEventsWrapper()
}
this.on('connected', this._watchEventsWrapper)
this.on('disconnected', () => {
this._objects.clear()
})
ignoreErrors.call(this._watchEvents())
}
watchTask(ref) {
@@ -684,15 +666,6 @@ export class Xapi extends EventEmitter {
}
}
_clearObjects() {
;(this._objectsByRef = { __proto__: null })[NULL_REF] = undefined
this._nTasks = 0
this._objects.clear()
this.objectsFetched = new Promise(resolve => {
this._resolveObjectsFetched = resolve
})
}
// return a promise which resolves to a task ref or undefined
_autoTask(task = this._taskWatchers !== undefined, name) {
if (task === false) {
@@ -786,11 +759,15 @@ export class Xapi extends EventEmitter {
// An object's UUID can change during its life.
const prev = objectsByRef[ref]
let prevUuid
if (prev && (prevUuid = prev.uuid) && prevUuid !== object.uuid) {
if (
prev !== undefined &&
(prevUuid = prev.uuid) !== undefined &&
prevUuid !== object.uuid
) {
objects.remove(prevUuid)
}
this._objects.set(object)
this._objects.set(object.$id, object)
objectsByRef[ref] = object
if (type === 'pool') {
@@ -822,6 +799,7 @@ export class Xapi extends EventEmitter {
}
_processEvents(events) {
const flush = this._objects.bufferEvents()
events.forEach(event => {
let type = event.class
const lcToTypes = this._lcToTypes
@@ -835,6 +813,54 @@ export class Xapi extends EventEmitter {
this._addRecordToCache(type, ref, event.snapshot)
}
})
flush()
}
async _refreshCachedRecords(types) {
const toRemoveByType = { __proto__: null }
types.forEach(type => {
toRemoveByType[type] = new Set()
})
const byRefs = this._objectsByRef
getKeys(byRefs).forEach(ref => {
const { $type } = byRefs[ref]
const toRemove = toRemoveByType[$type]
if (toRemove !== undefined) {
toRemove.add(ref)
}
})
const flush = this._objects.bufferEvents()
await Promise.all(
types.map(async type => {
try {
const toRemove = toRemoveByType[type]
const records = await this._sessionCall(`${type}.get_all_records`)
const refs = getKeys(records)
refs.forEach(ref => {
toRemove.delete(ref)
// we can bypass _processEvents here because they are all *add*
// event and all objects are of the same type
this._addRecordToCache(type, ref, records[ref])
})
toRemove.forEach(ref => {
this._removeRecordFromCache(type, ref)
})
if (type === 'task') {
this._nTasks = refs.length
}
} catch (error) {
// there is nothing ideal to do here, do not interrupt event
// handling
if (error?.code !== 'MESSAGE_REMOVED') {
console.warn('_refreshCachedRecords', type, error)
}
}
})
)
flush()
}
_removeRecordFromCache(type, ref) {
@@ -860,120 +886,80 @@ export class Xapi extends EventEmitter {
}
}
// - prevent multiple watches
// - swallow errors
async _watchEventsWrapper() {
if (!this._watching) {
this._watching = true
try {
await this._watchEvents()
} catch (error) {
console.error('_watchEventsWrapper', error)
}
this._watching = false
}
}
// TODO: cancelation
_watchEvents = coalesceCalls(this._watchEvents)
async _watchEvents() {
this._clearObjects()
// compute the initial token for the event loop
//
// we need to do this before the initial fetch to avoid losing events
let fromToken
try {
fromToken = await this._sessionCall('event.inject', [
'pool',
this._pool.$ref,
])
} catch (error) {
if (isMethodUnknown(error)) {
return this._watchEventsLegacy()
}
}
const types = this._watchedTypes || this._types
// initial fetch
const flush = this.objects.bufferEvents()
try {
await Promise.all(
types.map(async type => {
try {
// FIXME: use _transportCall to avoid auto-reconnection
forOwn(
await this._sessionCall(`${type}.get_all_records`),
(record, ref) => {
// we can bypass _processEvents here because they are all *add*
// event and all objects are of the same type
this._addRecordToCache(type, ref, record)
}
)
} catch (error) {
// there is nothing ideal to do here, do not interrupt event
// handling
if (error != null && error.code !== 'MESSAGE_REMOVED') {
console.warn('_watchEvents', 'initial fetch', type, error)
}
}
// eslint-disable-next-line no-labels
mainLoop: while (true) {
if (this._resolveObjectsFetched === undefined) {
this._objectsFetched = new Promise(resolve => {
this._resolveObjectsFetched = resolve
})
)
} finally {
flush()
}
this._resolveObjectsFetched()
// event loop
const debounce = this._debounce
while (true) {
if (debounce != null) {
await pDelay(debounce)
}
let result
await this._connected
// compute the initial token for the event loop
//
// we need to do this before the initial fetch to avoid losing events
let fromToken
try {
result = await this._sessionCall(
'event.from',
[
types,
fromToken,
EVENT_TIMEOUT + 0.1, // must be float for XML-RPC transport
],
EVENT_TIMEOUT * 1e3 * 1.1
)
fromToken = await this._sessionCall('event.inject', [
'pool',
this._pool.$ref,
])
} catch (error) {
if (error instanceof TimeoutError) {
if (error?.code === 'MESSAGE_METHOD_UNKNOWN') {
return this._watchEventsLegacy()
}
console.warn('_watchEvents', error)
await pDelay(this._eventPollDelay)
continue
}
const types = this._watchedTypes ?? this._types
// initial fetch
await this._refreshCachedRecords(types)
this._resolveObjectsFetched()
this._resolveObjectsFetched = undefined
// event loop
const debounce = this._debounce
while (true) {
await pDelay(debounce)
await this._connected
let result
try {
result = await this._sessionCall(
'event.from',
[
types,
fromToken,
EVENT_TIMEOUT + 0.1, // must be float for XML-RPC transport
],
EVENT_TIMEOUT * 1e3 * 1.1
)
} catch (error) {
if (error?.code === 'EVENTS_LOST') {
// eslint-disable-next-line no-labels
continue mainLoop
}
console.warn('_watchEvents', error)
await pDelay(this._eventPollDelay)
continue
}
if (areEventsLost(error)) {
return this._watchEvents()
fromToken = result.token
this._processEvents(result.events)
// detect and fix disappearing tasks (e.g. when toolstack restarts)
if (result.valid_ref_counts.task !== this._nTasks) {
await this._refreshCachedRecords(['task'])
}
throw error
}
fromToken = result.token
this._processEvents(result.events)
// detect and fix disappearing tasks (e.g. when toolstack restarts)
if (result.valid_ref_counts.task !== this._nTasks) {
await ignoreErrors.call(
this._sessionCall('task.get_all_records').then(tasks => {
const toRemove = new Set()
forOwn(this.objects.all, object => {
if (object.$type === 'task') {
toRemove.add(object.$ref)
}
})
forOwn(tasks, (task, ref) => {
toRemove.delete(ref)
this._addRecordToCache('task', ref, task)
})
toRemove.forEach(ref => {
this._removeRecordFromCache('task', ref)
})
})
)
}
}
}
@@ -982,55 +968,46 @@ export class Xapi extends EventEmitter {
// methods.
//
// It also has to manually get all objects first.
_watchEventsLegacy() {
const getAllObjects = async () => {
const flush = this.objects.bufferEvents()
async _watchEventsLegacy() {
if (this._resolveObjectsFetched === undefined) {
this._objectsFetched = new Promise(resolve => {
this._resolveObjectsFetched = resolve
})
}
await this._connected
const types = this._watchedTypes ?? this._types
// initial fetch
await this._refreshCachedRecords(types)
this._resolveObjectsFetched()
this._resolveObjectsFetched = undefined
await this._sessionCall('event.register', [types])
// event loop
const debounce = this._debounce
while (true) {
await pDelay(debounce)
try {
await Promise.all(
this._types.map(type =>
this._sessionCall(`${type}.get_all_records`).then(
objects => {
forEach(objects, (object, ref) => {
this._addRecordToCache(type, ref, object)
})
},
error => {
if (error.code !== 'MESSAGE_REMOVED') {
throw error
}
}
)
)
await this._connected
this._processEvents(
await this._sessionCall('event.next', undefined, EVENT_TIMEOUT * 1e3)
)
} finally {
flush()
} catch (error) {
if (error?.code === 'EVENTS_LOST') {
await ignoreErrors.call(
this._sessionCall('event.unregister', [types])
)
return this._watchEventsLegacy()
}
console.warn('_watchEventsLegacy', error)
await pDelay(this._eventPollDelay)
}
this._resolveObjectsFetched()
}
const watchEvents = () =>
this._sessionCall('event.register', [['*']]).then(loop)
const loop = () =>
this.status === CONNECTED &&
this._sessionCall('event.next').then(onSuccess, onFailure)
const onSuccess = events => {
this._processEvents(events)
const debounce = this._debounce
return debounce == null ? loop() : pDelay(debounce).then(loop)
}
const onFailure = error => {
if (areEventsLost(error)) {
return this._sessionCall('event.unregister', [['*']]).then(watchEvents)
}
throw error
}
return getAllObjects().then(watchEvents)
}
_wrapRecord(type, ref, data) {