Compare commits

...

1 Commits

Author SHA1 Message Date
Julien Fontanet
3facbcda99 feat(xen-api/_watchEvents): detect and fix desynchornizations 2019-04-08 15:46:26 +02:00
2 changed files with 54 additions and 13 deletions

View File

@@ -0,0 +1,8 @@
const handler = {
get(target, property) {
const value = target[property]
return value !== undefined ? value : 0
},
}
export const create = () => new Proxy({ __proto__: null }, handler)

View File

@@ -15,6 +15,7 @@ import {
pTimeout, pTimeout,
} from 'promise-toolbox' } from 'promise-toolbox'
import * as MultiCounter from './_MultiCounter'
import autoTransport from './transports/auto' import autoTransport from './transports/auto'
import coalesceCalls from './_coalesceCalls' import coalesceCalls from './_coalesceCalls'
import debug from './_debug' import debug from './_debug'
@@ -99,6 +100,7 @@ export class Xapi extends EventEmitter {
this._sessionId = undefined this._sessionId = undefined
this._status = DISCONNECTED this._status = DISCONNECTED
this._counter = MultiCounter.create()
this._debounce = opts.debounce ?? 200 this._debounce = opts.debounce ?? 200
this._objects = new Collection() this._objects = new Collection()
this._objectsByRef = { __proto__: null } this._objectsByRef = { __proto__: null }
@@ -773,6 +775,10 @@ export class Xapi extends EventEmitter {
this._objects.set(object.$id, object) this._objects.set(object.$id, object)
objectsByRef[ref] = object objectsByRef[ref] = object
if (prev === undefined) {
++this._counter[type]
}
if (type === 'pool') { if (type === 'pool') {
this._pool = object this._pool = object
@@ -785,10 +791,6 @@ export class Xapi extends EventEmitter {
} }
}) })
} else if (type === 'task') { } else if (type === 'task') {
if (prev === undefined) {
++this._nTasks
}
const taskWatchers = this._taskWatchers const taskWatchers = this._taskWatchers
const taskWatcher = taskWatchers[ref] const taskWatcher = taskWatchers[ref]
if (taskWatcher !== undefined) { if (taskWatcher !== undefined) {
@@ -820,6 +822,7 @@ export class Xapi extends EventEmitter {
} }
async _refreshCachedRecords(types) { async _refreshCachedRecords(types) {
const counter = this._counter
const toRemoveByType = { __proto__: null } const toRemoveByType = { __proto__: null }
types.forEach(type => { types.forEach(type => {
toRemoveByType[type] = new Set() toRemoveByType[type] = new Set()
@@ -851,8 +854,15 @@ export class Xapi extends EventEmitter {
this._removeRecordFromCache(type, ref) this._removeRecordFromCache(type, ref)
}) })
if (type === 'task') { const count = refs.length
this._nTasks = refs.length if (counter[type] !== count) {
console.warn(
'_refreshCachedRecords(%s): xapi=%d != local=%d',
type,
count,
counter[type]
)
counter[type] = count
} }
} catch (error) { } catch (error) {
// there is nothing ideal to do here, do not interrupt event // there is nothing ideal to do here, do not interrupt event
@@ -873,9 +883,7 @@ export class Xapi extends EventEmitter {
this._objects.unset(object.$id) this._objects.unset(object.$id)
delete byRefs[ref] delete byRefs[ref]
if (type === 'task') { --this._counter[type]
--this._nTasks
}
} }
const taskWatchers = this._taskWatchers const taskWatchers = this._taskWatchers
@@ -927,6 +935,16 @@ export class Xapi extends EventEmitter {
this._resolveObjectsFetched() this._resolveObjectsFetched()
this._resolveObjectsFetched = undefined this._resolveObjectsFetched = undefined
const IGNORED_TYPES = {
__proto__: null,
message: true,
role: true,
session: true,
user: true,
VBD_metrics: true,
VIF_metrics: true,
}
// event loop // event loop
const debounce = this._debounce const debounce = this._debounce
while (true) { while (true) {
@@ -959,10 +977,25 @@ export class Xapi extends EventEmitter {
fromToken = result.token fromToken = result.token
this._processEvents(result.events) this._processEvents(result.events)
// detect and fix disappearing tasks (e.g. when toolstack restarts) // detect and fix desynchronized records
if (result.valid_ref_counts.task !== this._nTasks) { const localCounts = this._counter
await this._refreshCachedRecords(['task']) const xapiCounts = result.valid_ref_counts
} await this._refreshCachedRecords(
types.filter(type => {
if (type in IGNORED_TYPES) {
return false
}
// XAPI uses lowercased types in events, but this may change, so we
// handle both
let xapiCount = xapiCounts[type]
if (xapiCount === undefined) {
xapiCount = xapiCounts[type.toLowerCase()]
}
return localCounts[type] !== xapiCount
})
)
} }
} }
} }