Compare commits
1 Commits
correct-ne
...
xen-api-Si
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
47732f7f5a |
142
packages/xen-api/src/_Signal.js
Normal file
142
packages/xen-api/src/_Signal.js
Normal file
@@ -0,0 +1,142 @@
|
||||
function request() {
|
||||
if (this._requested) {
|
||||
return
|
||||
}
|
||||
|
||||
this._requested = true
|
||||
|
||||
const resolve = this._resolve
|
||||
if (resolve !== undefined) {
|
||||
this._resolve = undefined
|
||||
resolve()
|
||||
}
|
||||
|
||||
const listeners = this._listeners
|
||||
if (listeners !== undefined) {
|
||||
this._listeners = undefined
|
||||
for (let i = 0, n = listeners.length; i < n; ++i) {
|
||||
listeners[i].call(this)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const INTERNAL = {}
|
||||
|
||||
function Source(signals) {
|
||||
const request_ = (this.request = request.bind(
|
||||
(this.signal = new Signal(INTERNAL))
|
||||
))
|
||||
|
||||
if (signals === undefined) {
|
||||
return
|
||||
}
|
||||
|
||||
const n = signals.length
|
||||
for (let i = 0; i < n; ++i) {
|
||||
if (signals[i].requested) {
|
||||
request_()
|
||||
return
|
||||
}
|
||||
}
|
||||
for (let i = 0; i < n; ++i) {
|
||||
signals[i].addListener(request_)
|
||||
}
|
||||
}
|
||||
|
||||
class Subscription {
|
||||
constructor(signal, listener) {
|
||||
this._listener = listener
|
||||
this._signal = signal
|
||||
}
|
||||
|
||||
get closed() {
|
||||
return this._signal === undefined
|
||||
}
|
||||
|
||||
unsubscribe() {
|
||||
const signal = this._signal
|
||||
if (signal !== undefined) {
|
||||
const listener = this._listener
|
||||
this._listener = this._signal = undefined
|
||||
|
||||
const listeners = signal._listeners
|
||||
if (listeners !== undefined) {
|
||||
const i = listeners.indexOf(listener)
|
||||
if (i !== -1) {
|
||||
listeners.splice(i, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
const closedSubscription = new Subscription()
|
||||
|
||||
export default class Signal {
|
||||
static source(signals) {
|
||||
return new Source(signals)
|
||||
}
|
||||
|
||||
constructor(executor) {
|
||||
this._listeners = undefined
|
||||
this._promise = undefined
|
||||
this._requested = false
|
||||
this._resolve = undefined
|
||||
|
||||
if (executor !== INTERNAL) {
|
||||
executor(request.bind(this))
|
||||
}
|
||||
}
|
||||
|
||||
get description() {
|
||||
return this._description
|
||||
}
|
||||
|
||||
get requested() {
|
||||
return this._requested
|
||||
}
|
||||
|
||||
throwIfRequested() {
|
||||
if (this._requested) {
|
||||
throw new Error('this signal has been requested')
|
||||
}
|
||||
}
|
||||
|
||||
// ===========================================================================
|
||||
// Promise like API
|
||||
// ===========================================================================
|
||||
|
||||
then(listener) {
|
||||
if (typeof listener !== 'function') {
|
||||
return this
|
||||
}
|
||||
|
||||
let promise = this._promise
|
||||
if (promise === undefined) {
|
||||
const requested = this._requested
|
||||
promise = this._promise = requested
|
||||
? Promise.resolve()
|
||||
: new Promise(resolve => {
|
||||
this._resolve = resolve
|
||||
})
|
||||
}
|
||||
return promise.then(listener)
|
||||
}
|
||||
|
||||
// ===========================================================================
|
||||
// Observable like API (but not compatible)
|
||||
// ===========================================================================
|
||||
|
||||
subscribe(listener) {
|
||||
if (this._requested) {
|
||||
listener.call(this)
|
||||
return closedSubscription
|
||||
}
|
||||
const listeners = this._listeners
|
||||
if (listeners === undefined) {
|
||||
this._listeners = [listener]
|
||||
} else {
|
||||
listeners.push(listener)
|
||||
}
|
||||
return new Subscription(this, listener)
|
||||
}
|
||||
}
|
||||
@@ -25,6 +25,7 @@ import isReadOnlyCall from './_isReadOnlyCall'
|
||||
import makeCallSetting from './_makeCallSetting'
|
||||
import parseUrl from './_parseUrl'
|
||||
import replaceSensitiveValues from './_replaceSensitiveValues'
|
||||
import Signal from './_Signal'
|
||||
import XapiError from './_XapiError'
|
||||
|
||||
// ===================================================================
|
||||
@@ -92,19 +93,15 @@ export class Xapi extends EventEmitter {
|
||||
this._allowUnauthorized = opts.allowUnauthorized
|
||||
this._setUrl(url)
|
||||
|
||||
this._connected = new Promise(resolve => {
|
||||
this._resolveConnected = resolve
|
||||
})
|
||||
this._disconnected = Promise.resolve()
|
||||
this._connected = Signal.source()
|
||||
this._disconnected = Signal.source()
|
||||
this._sessionId = undefined
|
||||
this._status = DISCONNECTED
|
||||
|
||||
this._debounce = opts.debounce ?? 200
|
||||
this._objects = new Collection()
|
||||
this._objectsByRef = { __proto__: null }
|
||||
this._objectsFetched = new Promise(resolve => {
|
||||
this._resolveObjectsFetched = resolve
|
||||
})
|
||||
this._objectsFetched = Signal.source()
|
||||
this._eventWatchers = { __proto__: null }
|
||||
this._taskWatchers = { __proto__: null }
|
||||
this._watchedTypes = undefined
|
||||
@@ -130,11 +127,11 @@ export class Xapi extends EventEmitter {
|
||||
// ===========================================================================
|
||||
|
||||
get connected() {
|
||||
return this._connected
|
||||
return this._connected.signal
|
||||
}
|
||||
|
||||
get disconnected() {
|
||||
return this._disconnected
|
||||
return this._disconnected.signal
|
||||
}
|
||||
|
||||
get pool() {
|
||||
@@ -161,9 +158,7 @@ export class Xapi extends EventEmitter {
|
||||
assert(status === DISCONNECTED)
|
||||
|
||||
this._status = CONNECTING
|
||||
this._disconnected = new Promise(resolve => {
|
||||
this._resolveDisconnected = resolve
|
||||
})
|
||||
this._disconnected = Signal.source()
|
||||
|
||||
try {
|
||||
await this._sessionOpen()
|
||||
@@ -186,8 +181,7 @@ export class Xapi extends EventEmitter {
|
||||
|
||||
debug('%s: connected', this._humanId)
|
||||
this._status = CONNECTED
|
||||
this._resolveConnected()
|
||||
this._resolveConnected = undefined
|
||||
this._connected.request()
|
||||
this.emit(CONNECTED)
|
||||
} catch (error) {
|
||||
ignoreErrors.call(this.disconnect())
|
||||
@@ -204,9 +198,7 @@ export class Xapi extends EventEmitter {
|
||||
}
|
||||
|
||||
if (status === CONNECTED) {
|
||||
this._connected = new Promise(resolve => {
|
||||
this._resolveConnected = resolve
|
||||
})
|
||||
this._connected = Signal.source()
|
||||
} else {
|
||||
assert(status === CONNECTING)
|
||||
}
|
||||
@@ -220,8 +212,7 @@ export class Xapi extends EventEmitter {
|
||||
debug('%s: disconnected', this._humanId)
|
||||
|
||||
this._status = DISCONNECTED
|
||||
this._resolveDisconnected()
|
||||
this._resolveDisconnected = undefined
|
||||
this._disconnected.request()
|
||||
this.emit(DISCONNECTED)
|
||||
}
|
||||
|
||||
@@ -672,12 +663,18 @@ export class Xapi extends EventEmitter {
|
||||
}
|
||||
|
||||
_interruptOnDisconnect(promise) {
|
||||
return Promise.race([
|
||||
promise,
|
||||
this._disconnected.then(() => {
|
||||
throw new Error('disconnected')
|
||||
}),
|
||||
])
|
||||
let subscription
|
||||
const pWrapper = new Promise((resolve, reject) => {
|
||||
subscription = this._disconnected.signal.subscribe(() => {
|
||||
reject(new Error('disconnected'))
|
||||
})
|
||||
promise.then(resolve, reject)
|
||||
})
|
||||
const clean = () => {
|
||||
subscription.unsubscribe()
|
||||
}
|
||||
pWrapper.then(clean, clean)
|
||||
return pWrapper
|
||||
}
|
||||
|
||||
async _sessionCall(method, args, timeout) {
|
||||
@@ -881,10 +878,8 @@ export class Xapi extends EventEmitter {
|
||||
async _watchEvents() {
|
||||
// eslint-disable-next-line no-labels
|
||||
mainLoop: while (true) {
|
||||
if (this._resolveObjectsFetched === undefined) {
|
||||
this._objectsFetched = new Promise(resolve => {
|
||||
this._resolveObjectsFetched = resolve
|
||||
})
|
||||
if (this._objectsFetched.signal.requested) {
|
||||
this._objectsFetched = Signal.source()
|
||||
}
|
||||
|
||||
await this._connected
|
||||
@@ -912,8 +907,7 @@ export class Xapi extends EventEmitter {
|
||||
|
||||
// initial fetch
|
||||
await this._refreshCachedRecords(types)
|
||||
this._resolveObjectsFetched()
|
||||
this._resolveObjectsFetched = undefined
|
||||
this._objectsFetched.request()
|
||||
|
||||
// event loop
|
||||
const debounce = this._debounce
|
||||
@@ -960,10 +954,8 @@ export class Xapi extends EventEmitter {
|
||||
//
|
||||
// It also has to manually get all objects first.
|
||||
async _watchEventsLegacy() {
|
||||
if (this._resolveObjectsFetched === undefined) {
|
||||
this._objectsFetched = new Promise(resolve => {
|
||||
this._resolveObjectsFetched = resolve
|
||||
})
|
||||
if (this._objectsFetched.signal.requested) {
|
||||
this._objectsFetched = Signal.source()
|
||||
}
|
||||
|
||||
await this._connected
|
||||
@@ -972,8 +964,7 @@ export class Xapi extends EventEmitter {
|
||||
|
||||
// initial fetch
|
||||
await this._refreshCachedRecords(types)
|
||||
this._resolveObjectsFetched()
|
||||
this._resolveObjectsFetched = undefined
|
||||
this._objectsFetched.request()
|
||||
|
||||
await this._sessionCall('event.register', [types])
|
||||
|
||||
|
||||
Reference in New Issue
Block a user