Compare commits
1 Commits
refactor_b
...
xen-api-ev
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3facbcda99 |
8
packages/xen-api/src/_MultiCounter.js
Normal file
8
packages/xen-api/src/_MultiCounter.js
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
const handler = {
|
||||||
|
get(target, property) {
|
||||||
|
const value = target[property]
|
||||||
|
return value !== undefined ? value : 0
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
export const create = () => new Proxy({ __proto__: null }, handler)
|
||||||
@@ -15,6 +15,7 @@ import {
|
|||||||
pTimeout,
|
pTimeout,
|
||||||
} from 'promise-toolbox'
|
} from 'promise-toolbox'
|
||||||
|
|
||||||
|
import * as MultiCounter from './_MultiCounter'
|
||||||
import autoTransport from './transports/auto'
|
import autoTransport from './transports/auto'
|
||||||
import coalesceCalls from './_coalesceCalls'
|
import coalesceCalls from './_coalesceCalls'
|
||||||
import debug from './_debug'
|
import debug from './_debug'
|
||||||
@@ -99,6 +100,7 @@ export class Xapi extends EventEmitter {
|
|||||||
this._sessionId = undefined
|
this._sessionId = undefined
|
||||||
this._status = DISCONNECTED
|
this._status = DISCONNECTED
|
||||||
|
|
||||||
|
this._counter = MultiCounter.create()
|
||||||
this._debounce = opts.debounce ?? 200
|
this._debounce = opts.debounce ?? 200
|
||||||
this._objects = new Collection()
|
this._objects = new Collection()
|
||||||
this._objectsByRef = { __proto__: null }
|
this._objectsByRef = { __proto__: null }
|
||||||
@@ -773,6 +775,10 @@ export class Xapi extends EventEmitter {
|
|||||||
this._objects.set(object.$id, object)
|
this._objects.set(object.$id, object)
|
||||||
objectsByRef[ref] = object
|
objectsByRef[ref] = object
|
||||||
|
|
||||||
|
if (prev === undefined) {
|
||||||
|
++this._counter[type]
|
||||||
|
}
|
||||||
|
|
||||||
if (type === 'pool') {
|
if (type === 'pool') {
|
||||||
this._pool = object
|
this._pool = object
|
||||||
|
|
||||||
@@ -785,10 +791,6 @@ export class Xapi extends EventEmitter {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
} else if (type === 'task') {
|
} else if (type === 'task') {
|
||||||
if (prev === undefined) {
|
|
||||||
++this._nTasks
|
|
||||||
}
|
|
||||||
|
|
||||||
const taskWatchers = this._taskWatchers
|
const taskWatchers = this._taskWatchers
|
||||||
const taskWatcher = taskWatchers[ref]
|
const taskWatcher = taskWatchers[ref]
|
||||||
if (taskWatcher !== undefined) {
|
if (taskWatcher !== undefined) {
|
||||||
@@ -820,6 +822,7 @@ export class Xapi extends EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async _refreshCachedRecords(types) {
|
async _refreshCachedRecords(types) {
|
||||||
|
const counter = this._counter
|
||||||
const toRemoveByType = { __proto__: null }
|
const toRemoveByType = { __proto__: null }
|
||||||
types.forEach(type => {
|
types.forEach(type => {
|
||||||
toRemoveByType[type] = new Set()
|
toRemoveByType[type] = new Set()
|
||||||
@@ -851,8 +854,15 @@ export class Xapi extends EventEmitter {
|
|||||||
this._removeRecordFromCache(type, ref)
|
this._removeRecordFromCache(type, ref)
|
||||||
})
|
})
|
||||||
|
|
||||||
if (type === 'task') {
|
const count = refs.length
|
||||||
this._nTasks = refs.length
|
if (counter[type] !== count) {
|
||||||
|
console.warn(
|
||||||
|
'_refreshCachedRecords(%s): xapi=%d != local=%d',
|
||||||
|
type,
|
||||||
|
count,
|
||||||
|
counter[type]
|
||||||
|
)
|
||||||
|
counter[type] = count
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
// there is nothing ideal to do here, do not interrupt event
|
// there is nothing ideal to do here, do not interrupt event
|
||||||
@@ -873,9 +883,7 @@ export class Xapi extends EventEmitter {
|
|||||||
this._objects.unset(object.$id)
|
this._objects.unset(object.$id)
|
||||||
delete byRefs[ref]
|
delete byRefs[ref]
|
||||||
|
|
||||||
if (type === 'task') {
|
--this._counter[type]
|
||||||
--this._nTasks
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const taskWatchers = this._taskWatchers
|
const taskWatchers = this._taskWatchers
|
||||||
@@ -927,6 +935,16 @@ export class Xapi extends EventEmitter {
|
|||||||
this._resolveObjectsFetched()
|
this._resolveObjectsFetched()
|
||||||
this._resolveObjectsFetched = undefined
|
this._resolveObjectsFetched = undefined
|
||||||
|
|
||||||
|
const IGNORED_TYPES = {
|
||||||
|
__proto__: null,
|
||||||
|
message: true,
|
||||||
|
role: true,
|
||||||
|
session: true,
|
||||||
|
user: true,
|
||||||
|
VBD_metrics: true,
|
||||||
|
VIF_metrics: true,
|
||||||
|
}
|
||||||
|
|
||||||
// event loop
|
// event loop
|
||||||
const debounce = this._debounce
|
const debounce = this._debounce
|
||||||
while (true) {
|
while (true) {
|
||||||
@@ -959,10 +977,25 @@ export class Xapi extends EventEmitter {
|
|||||||
fromToken = result.token
|
fromToken = result.token
|
||||||
this._processEvents(result.events)
|
this._processEvents(result.events)
|
||||||
|
|
||||||
// detect and fix disappearing tasks (e.g. when toolstack restarts)
|
// detect and fix desynchronized records
|
||||||
if (result.valid_ref_counts.task !== this._nTasks) {
|
const localCounts = this._counter
|
||||||
await this._refreshCachedRecords(['task'])
|
const xapiCounts = result.valid_ref_counts
|
||||||
}
|
await this._refreshCachedRecords(
|
||||||
|
types.filter(type => {
|
||||||
|
if (type in IGNORED_TYPES) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// XAPI uses lowercased types in events, but this may change, so we
|
||||||
|
// handle both
|
||||||
|
let xapiCount = xapiCounts[type]
|
||||||
|
if (xapiCount === undefined) {
|
||||||
|
xapiCount = xapiCounts[type.toLowerCase()]
|
||||||
|
}
|
||||||
|
|
||||||
|
return localCounts[type] !== xapiCount
|
||||||
|
})
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user