Compare commits

...

1 Commits

Author SHA1 Message Date
Julien Fontanet
47732f7f5a fix(xen-api): remove Promise.race 2019-04-04 11:57:43 +02:00
2 changed files with 170 additions and 37 deletions

View File

@@ -0,0 +1,142 @@
function request() {
if (this._requested) {
return
}
this._requested = true
const resolve = this._resolve
if (resolve !== undefined) {
this._resolve = undefined
resolve()
}
const listeners = this._listeners
if (listeners !== undefined) {
this._listeners = undefined
for (let i = 0, n = listeners.length; i < n; ++i) {
listeners[i].call(this)
}
}
}
const INTERNAL = {}
function Source(signals) {
const request_ = (this.request = request.bind(
(this.signal = new Signal(INTERNAL))
))
if (signals === undefined) {
return
}
const n = signals.length
for (let i = 0; i < n; ++i) {
if (signals[i].requested) {
request_()
return
}
}
for (let i = 0; i < n; ++i) {
signals[i].addListener(request_)
}
}
class Subscription {
constructor(signal, listener) {
this._listener = listener
this._signal = signal
}
get closed() {
return this._signal === undefined
}
unsubscribe() {
const signal = this._signal
if (signal !== undefined) {
const listener = this._listener
this._listener = this._signal = undefined
const listeners = signal._listeners
if (listeners !== undefined) {
const i = listeners.indexOf(listener)
if (i !== -1) {
listeners.splice(i, 1)
}
}
}
}
}
const closedSubscription = new Subscription()
export default class Signal {
static source(signals) {
return new Source(signals)
}
constructor(executor) {
this._listeners = undefined
this._promise = undefined
this._requested = false
this._resolve = undefined
if (executor !== INTERNAL) {
executor(request.bind(this))
}
}
get description() {
return this._description
}
get requested() {
return this._requested
}
throwIfRequested() {
if (this._requested) {
throw new Error('this signal has been requested')
}
}
// ===========================================================================
// Promise like API
// ===========================================================================
then(listener) {
if (typeof listener !== 'function') {
return this
}
let promise = this._promise
if (promise === undefined) {
const requested = this._requested
promise = this._promise = requested
? Promise.resolve()
: new Promise(resolve => {
this._resolve = resolve
})
}
return promise.then(listener)
}
// ===========================================================================
// Observable like API (but not compatible)
// ===========================================================================
subscribe(listener) {
if (this._requested) {
listener.call(this)
return closedSubscription
}
const listeners = this._listeners
if (listeners === undefined) {
this._listeners = [listener]
} else {
listeners.push(listener)
}
return new Subscription(this, listener)
}
}

View File

@@ -25,6 +25,7 @@ import isReadOnlyCall from './_isReadOnlyCall'
import makeCallSetting from './_makeCallSetting'
import parseUrl from './_parseUrl'
import replaceSensitiveValues from './_replaceSensitiveValues'
import Signal from './_Signal'
import XapiError from './_XapiError'
// ===================================================================
@@ -92,19 +93,15 @@ export class Xapi extends EventEmitter {
this._allowUnauthorized = opts.allowUnauthorized
this._setUrl(url)
this._connected = new Promise(resolve => {
this._resolveConnected = resolve
})
this._disconnected = Promise.resolve()
this._connected = Signal.source()
this._disconnected = Signal.source()
this._sessionId = undefined
this._status = DISCONNECTED
this._debounce = opts.debounce ?? 200
this._objects = new Collection()
this._objectsByRef = { __proto__: null }
this._objectsFetched = new Promise(resolve => {
this._resolveObjectsFetched = resolve
})
this._objectsFetched = Signal.source()
this._eventWatchers = { __proto__: null }
this._taskWatchers = { __proto__: null }
this._watchedTypes = undefined
@@ -130,11 +127,11 @@ export class Xapi extends EventEmitter {
// ===========================================================================
get connected() {
return this._connected
return this._connected.signal
}
get disconnected() {
return this._disconnected
return this._disconnected.signal
}
get pool() {
@@ -161,9 +158,7 @@ export class Xapi extends EventEmitter {
assert(status === DISCONNECTED)
this._status = CONNECTING
this._disconnected = new Promise(resolve => {
this._resolveDisconnected = resolve
})
this._disconnected = Signal.source()
try {
await this._sessionOpen()
@@ -186,8 +181,7 @@ export class Xapi extends EventEmitter {
debug('%s: connected', this._humanId)
this._status = CONNECTED
this._resolveConnected()
this._resolveConnected = undefined
this._connected.request()
this.emit(CONNECTED)
} catch (error) {
ignoreErrors.call(this.disconnect())
@@ -204,9 +198,7 @@ export class Xapi extends EventEmitter {
}
if (status === CONNECTED) {
this._connected = new Promise(resolve => {
this._resolveConnected = resolve
})
this._connected = Signal.source()
} else {
assert(status === CONNECTING)
}
@@ -220,8 +212,7 @@ export class Xapi extends EventEmitter {
debug('%s: disconnected', this._humanId)
this._status = DISCONNECTED
this._resolveDisconnected()
this._resolveDisconnected = undefined
this._disconnected.request()
this.emit(DISCONNECTED)
}
@@ -672,12 +663,18 @@ export class Xapi extends EventEmitter {
}
_interruptOnDisconnect(promise) {
return Promise.race([
promise,
this._disconnected.then(() => {
throw new Error('disconnected')
}),
])
let subscription
const pWrapper = new Promise((resolve, reject) => {
subscription = this._disconnected.signal.subscribe(() => {
reject(new Error('disconnected'))
})
promise.then(resolve, reject)
})
const clean = () => {
subscription.unsubscribe()
}
pWrapper.then(clean, clean)
return pWrapper
}
async _sessionCall(method, args, timeout) {
@@ -881,10 +878,8 @@ export class Xapi extends EventEmitter {
async _watchEvents() {
// eslint-disable-next-line no-labels
mainLoop: while (true) {
if (this._resolveObjectsFetched === undefined) {
this._objectsFetched = new Promise(resolve => {
this._resolveObjectsFetched = resolve
})
if (this._objectsFetched.signal.requested) {
this._objectsFetched = Signal.source()
}
await this._connected
@@ -912,8 +907,7 @@ export class Xapi extends EventEmitter {
// initial fetch
await this._refreshCachedRecords(types)
this._resolveObjectsFetched()
this._resolveObjectsFetched = undefined
this._objectsFetched.request()
// event loop
const debounce = this._debounce
@@ -960,10 +954,8 @@ export class Xapi extends EventEmitter {
//
// It also has to manually get all objects first.
async _watchEventsLegacy() {
if (this._resolveObjectsFetched === undefined) {
this._objectsFetched = new Promise(resolve => {
this._resolveObjectsFetched = resolve
})
if (this._objectsFetched.signal.requested) {
this._objectsFetched = Signal.source()
}
await this._connected
@@ -972,8 +964,7 @@ export class Xapi extends EventEmitter {
// initial fetch
await this._refreshCachedRecords(types)
this._resolveObjectsFetched()
this._resolveObjectsFetched = undefined
this._objectsFetched.request()
await this._sessionCall('event.register', [types])