feat(xen-api/_watchEvents): detect and fix desynchornizations
This commit is contained in:
8
packages/xen-api/src/_MultiCounter.js
Normal file
8
packages/xen-api/src/_MultiCounter.js
Normal file
@@ -0,0 +1,8 @@
|
||||
const handler = {
|
||||
get(target, property) {
|
||||
const value = target[property]
|
||||
return value !== undefined ? value : 0
|
||||
},
|
||||
}
|
||||
|
||||
export const create = () => new Proxy({ __proto__: null }, handler)
|
||||
@@ -15,6 +15,7 @@ import {
|
||||
pTimeout,
|
||||
} from 'promise-toolbox'
|
||||
|
||||
import * as MultiCounter from './_MultiCounter'
|
||||
import autoTransport from './transports/auto'
|
||||
import coalesceCalls from './_coalesceCalls'
|
||||
import debug from './_debug'
|
||||
@@ -99,6 +100,7 @@ export class Xapi extends EventEmitter {
|
||||
this._sessionId = undefined
|
||||
this._status = DISCONNECTED
|
||||
|
||||
this._counter = MultiCounter.create()
|
||||
this._debounce = opts.debounce ?? 200
|
||||
this._objects = new Collection()
|
||||
this._objectsByRef = { __proto__: null }
|
||||
@@ -773,6 +775,10 @@ export class Xapi extends EventEmitter {
|
||||
this._objects.set(object.$id, object)
|
||||
objectsByRef[ref] = object
|
||||
|
||||
if (prev === undefined) {
|
||||
++this._counter[type]
|
||||
}
|
||||
|
||||
if (type === 'pool') {
|
||||
this._pool = object
|
||||
|
||||
@@ -785,10 +791,6 @@ export class Xapi extends EventEmitter {
|
||||
}
|
||||
})
|
||||
} else if (type === 'task') {
|
||||
if (prev === undefined) {
|
||||
++this._nTasks
|
||||
}
|
||||
|
||||
const taskWatchers = this._taskWatchers
|
||||
const taskWatcher = taskWatchers[ref]
|
||||
if (taskWatcher !== undefined) {
|
||||
@@ -820,6 +822,7 @@ export class Xapi extends EventEmitter {
|
||||
}
|
||||
|
||||
async _refreshCachedRecords(types) {
|
||||
const counter = this._counter
|
||||
const toRemoveByType = { __proto__: null }
|
||||
types.forEach(type => {
|
||||
toRemoveByType[type] = new Set()
|
||||
@@ -851,8 +854,15 @@ export class Xapi extends EventEmitter {
|
||||
this._removeRecordFromCache(type, ref)
|
||||
})
|
||||
|
||||
if (type === 'task') {
|
||||
this._nTasks = refs.length
|
||||
const count = refs.length
|
||||
if (counter[type] !== count) {
|
||||
console.warn(
|
||||
'_refreshCachedRecords(%s): xapi=%d != local=%d',
|
||||
type,
|
||||
count,
|
||||
counter[type]
|
||||
)
|
||||
counter[type] = count
|
||||
}
|
||||
} catch (error) {
|
||||
// there is nothing ideal to do here, do not interrupt event
|
||||
@@ -873,9 +883,7 @@ export class Xapi extends EventEmitter {
|
||||
this._objects.unset(object.$id)
|
||||
delete byRefs[ref]
|
||||
|
||||
if (type === 'task') {
|
||||
--this._nTasks
|
||||
}
|
||||
--this._counter[type]
|
||||
}
|
||||
|
||||
const taskWatchers = this._taskWatchers
|
||||
@@ -927,6 +935,16 @@ export class Xapi extends EventEmitter {
|
||||
this._resolveObjectsFetched()
|
||||
this._resolveObjectsFetched = undefined
|
||||
|
||||
const IGNORED_TYPES = {
|
||||
__proto__: null,
|
||||
message: true,
|
||||
role: true,
|
||||
session: true,
|
||||
user: true,
|
||||
VBD_metrics: true,
|
||||
VIF_metrics: true,
|
||||
}
|
||||
|
||||
// event loop
|
||||
const debounce = this._debounce
|
||||
while (true) {
|
||||
@@ -959,10 +977,25 @@ export class Xapi extends EventEmitter {
|
||||
fromToken = result.token
|
||||
this._processEvents(result.events)
|
||||
|
||||
// detect and fix disappearing tasks (e.g. when toolstack restarts)
|
||||
if (result.valid_ref_counts.task !== this._nTasks) {
|
||||
await this._refreshCachedRecords(['task'])
|
||||
}
|
||||
// detect and fix desynchronized records
|
||||
const localCounts = this._counter
|
||||
const xapiCounts = result.valid_ref_counts
|
||||
await this._refreshCachedRecords(
|
||||
types.filter(type => {
|
||||
if (type in IGNORED_TYPES) {
|
||||
return false
|
||||
}
|
||||
|
||||
// XAPI uses lowercased types in events, but this may change, so we
|
||||
// handle both
|
||||
let xapiCount = xapiCounts[type]
|
||||
if (xapiCount === undefined) {
|
||||
xapiCount = xapiCounts[type.toLowerCase()]
|
||||
}
|
||||
|
||||
return localCounts[type] !== xapiCount
|
||||
})
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user