|
|
|
|
@@ -12,7 +12,17 @@ import { synchronized } from 'decorator-synchronized'
|
|
|
|
|
import { Collection } from 'xo-collection'
|
|
|
|
|
import { EventEmitter } from 'events'
|
|
|
|
|
import { Index } from 'xo-collection/index'
|
|
|
|
|
import { cancelable, defer, fromCallback, fromEvents, ignoreErrors, pDelay, pRetry, pTimeout } from 'promise-toolbox'
|
|
|
|
|
import {
|
|
|
|
|
cancelable,
|
|
|
|
|
defer,
|
|
|
|
|
fromCallback,
|
|
|
|
|
fromEvents,
|
|
|
|
|
ignoreErrors,
|
|
|
|
|
pDelay,
|
|
|
|
|
pRetry,
|
|
|
|
|
pTimeout,
|
|
|
|
|
TimeoutError,
|
|
|
|
|
} from 'promise-toolbox'
|
|
|
|
|
import { limitConcurrency } from 'limit-concurrency-decorator'
|
|
|
|
|
|
|
|
|
|
import autoTransport from './transports/auto'
|
|
|
|
|
@@ -202,6 +212,7 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
const status = this._status
|
|
|
|
|
|
|
|
|
|
if (status === CONNECTED) {
|
|
|
|
|
console.log('already connected')
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -213,15 +224,20 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
console.log('will open session')
|
|
|
|
|
await this._sessionOpen()
|
|
|
|
|
|
|
|
|
|
console.log('session opened')
|
|
|
|
|
debug('%s: connected', this._humanId)
|
|
|
|
|
this._status = CONNECTED
|
|
|
|
|
this._resolveConnected()
|
|
|
|
|
this._resolveConnected !== undefined && this._resolveConnected()
|
|
|
|
|
this._resolveConnected = undefined
|
|
|
|
|
console.log('_resolveConnected done')
|
|
|
|
|
this.emit(CONNECTED)
|
|
|
|
|
console.log('_emited')
|
|
|
|
|
} catch (error) {
|
|
|
|
|
ignoreErrors.call(this.disconnect())
|
|
|
|
|
console.log('connect error', { error })
|
|
|
|
|
await ignoreErrors.call(this.disconnect())
|
|
|
|
|
|
|
|
|
|
throw error
|
|
|
|
|
}
|
|
|
|
|
@@ -231,21 +247,24 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
const status = this._status
|
|
|
|
|
|
|
|
|
|
if (status === DISCONNECTED) {
|
|
|
|
|
// console.log('disconnect- already disconected')
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (status === CONNECTED) {
|
|
|
|
|
// console.log('disconnect-already conected')
|
|
|
|
|
this._connected = new Promise(resolve => {
|
|
|
|
|
this._resolveConnected = resolve
|
|
|
|
|
})
|
|
|
|
|
} else {
|
|
|
|
|
// console.log('disconnect-connecting')
|
|
|
|
|
assert.strictEqual(status, CONNECTING)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const sessionId = this._sessionId
|
|
|
|
|
if (sessionId !== undefined) {
|
|
|
|
|
this._sessionId = undefined
|
|
|
|
|
ignoreErrors.call(this._call('session.logout', [sessionId]))
|
|
|
|
|
await ignoreErrors.call(this._call('session.logout', [sessionId]))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
debug('%s: disconnected', this._humanId)
|
|
|
|
|
@@ -678,12 +697,39 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
return this._addSyncStackTrace(watcher.promise)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async _checkIfConnected() {
|
|
|
|
|
try {
|
|
|
|
|
await this._transport('host.get_servertime', ['1', '1'])
|
|
|
|
|
// host is ok but with a broken call ? , nothing to do here
|
|
|
|
|
return true
|
|
|
|
|
} catch (error) {
|
|
|
|
|
if (error.code === 'SESSION_INVALID') {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
if (
|
|
|
|
|
[
|
|
|
|
|
'EHOSTUNREACH',
|
|
|
|
|
'HOST_IS_SLAVE',
|
|
|
|
|
'ENOTFOUND',
|
|
|
|
|
'ECONNRESET',
|
|
|
|
|
'HOST_OFFLINE',
|
|
|
|
|
'ETIMEDOUT',
|
|
|
|
|
'ECONNRESET',
|
|
|
|
|
].includes(error.code)
|
|
|
|
|
) {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
throw error
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ===========================================================================
|
|
|
|
|
// Private
|
|
|
|
|
// ===========================================================================
|
|
|
|
|
|
|
|
|
|
@synchronized()
|
|
|
|
|
async _updateUrlFromFallbackAdresses(method, args, startTime) {
|
|
|
|
|
console.log('_updateUrlFromFallbackAdresses', method)
|
|
|
|
|
debug('%s: will try to find a new master', this._humanId)
|
|
|
|
|
const hostAddresses = this._fallBackAddresses
|
|
|
|
|
|
|
|
|
|
@@ -693,49 +739,92 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// another call to _updateUrlFromFallbackAdresses has already connected us to the right host
|
|
|
|
|
try {
|
|
|
|
|
await this._transport('session.get_auth_user_name', [])
|
|
|
|
|
debug('%s: host was already ok', this._humanId)
|
|
|
|
|
// host is ok, nothing to do here
|
|
|
|
|
if ((await this._checkIfConnected()) === true) {
|
|
|
|
|
console.log('already reconnected', method)
|
|
|
|
|
return true
|
|
|
|
|
} catch (error) {
|
|
|
|
|
if (!['EHOSTUNREACH', 'HOST_IS_SLAVE', 'ENOTFOUND', 'MESSAGE_PARAMETER_COUNT_MISMATCH'].includes(error.code)) {
|
|
|
|
|
throw this._augmentCallError(error, method, args, startTime)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const checkAndReconnect = async () => {
|
|
|
|
|
try {
|
|
|
|
|
console.log('check and reconnect', this._sessionId, this.status)
|
|
|
|
|
if (!(await this._checkIfConnected())) {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
//this._status = DISCONNECTED
|
|
|
|
|
//await this.connect()
|
|
|
|
|
// maybe it's in connecting mode and we just need to wait ?
|
|
|
|
|
return true
|
|
|
|
|
} catch (e) {
|
|
|
|
|
console.log(e)
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (const hostAdress of hostAddresses) {
|
|
|
|
|
console.log('check ', hostAdress, hostAddresses)
|
|
|
|
|
// we already tested the current master
|
|
|
|
|
/* when retrying it can be a broken url
|
|
|
|
|
if (hostAdress === this._url.hostname) {
|
|
|
|
|
console.log('current master')
|
|
|
|
|
debug(`%s: don't recheck currrent host`, this._humanId)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
// since this method is @syncrhonized , there can be only on call at once
|
|
|
|
|
}*/
|
|
|
|
|
// since this method is @synchronized , there can be only on call at once
|
|
|
|
|
this._setUrl({ ...this._url, hostname: hostAdress })
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
await new Promise((resolve, reject) => {
|
|
|
|
|
let timeouted = false
|
|
|
|
|
|
|
|
|
|
this._transport('host.get_servertime', ['1', '1'])
|
|
|
|
|
.then(res => {
|
|
|
|
|
if (!timeouted) {
|
|
|
|
|
clearTimeout(timeout)
|
|
|
|
|
resolve(res)
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
.catch(e => {
|
|
|
|
|
clearTimeout(timeout)
|
|
|
|
|
reject(e)
|
|
|
|
|
})
|
|
|
|
|
const timeout = setTimeout(() => {
|
|
|
|
|
timeouted = true
|
|
|
|
|
const error = new Error()
|
|
|
|
|
error.code = 'ETIMEDOUT'
|
|
|
|
|
reject(error)
|
|
|
|
|
}, 5000)
|
|
|
|
|
})
|
|
|
|
|
// will not succeed without sessionId, but I only want to know if this host is entitled to answer (he's the master)
|
|
|
|
|
await this._transport('session.get_auth_user_name', [])
|
|
|
|
|
|
|
|
|
|
// success
|
|
|
|
|
return true
|
|
|
|
|
} catch (error) {
|
|
|
|
|
console.log(error.code)
|
|
|
|
|
// this host is also down, try the next one
|
|
|
|
|
if (error.code === 'EHOSTUNREACH' || error.code === 'ENOTFOUND') {
|
|
|
|
|
debug(`%s: slave %s is alsoa unreachable`, this._humanId, hostAdress)
|
|
|
|
|
if (
|
|
|
|
|
['EHOSTUNREACH', 'ENOTFOUND', 'ECONNRESET', 'HOST_OFFLINE', 'ETIMEDOUT', 'ECONNRESET'].includes(error.code)
|
|
|
|
|
) {
|
|
|
|
|
debug(`%s: slave %s is also unreachable`, this._humanId, hostAdress)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (error.code === 'HOST_IS_SLAVE') {
|
|
|
|
|
debug('%s: slave %s told us the master is %s', this._humanId, hostAdress, error.params[0])
|
|
|
|
|
this._setUrl({ ...this._url, hostname: hostAdress })
|
|
|
|
|
return true // we found the master no need to check for the other slave
|
|
|
|
|
// check if the master is alive
|
|
|
|
|
if (await checkAndReconnect()) {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (error.code === 'MESSAGE_PARAMETER_COUNT_MISMATCH') {
|
|
|
|
|
if (error.code === 'SESSION_INVALID') {
|
|
|
|
|
debug('%s: found new master %s master', this._humanId, hostAdress)
|
|
|
|
|
// lucky : we found the master with the first host answering
|
|
|
|
|
return true // we found the master no need to check for the other slave
|
|
|
|
|
// lucky : we found the master with the first host answering no need to check for the other slave
|
|
|
|
|
// also , no need to check
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// any other error should be reported
|
|
|
|
|
throw this._augmentCallError(error, method, args, startTime)
|
|
|
|
|
}
|
|
|
|
|
@@ -762,19 +851,29 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async _call(method, args, timeout = this._callTimeout(method, args)) {
|
|
|
|
|
// console.log(method)
|
|
|
|
|
const startTime = Date.now()
|
|
|
|
|
try {
|
|
|
|
|
const result = await pTimeout.call(this._addSyncStackTrace(this._transport(method, args)), timeout)
|
|
|
|
|
debug('%s: %s(...) [%s] ==> %s', this._humanId, method, ms(Date.now() - startTime), kindOf(result))
|
|
|
|
|
// console.log(method, 'success')
|
|
|
|
|
return result
|
|
|
|
|
} catch (error) {
|
|
|
|
|
if (this._fallBackAddresses?.length > 1 && ['EHOSTUNREACH', 'HOST_IS_SLAVE', 'ENOTFOUND'].includes(error.code)) {
|
|
|
|
|
const updatedHostUrl = await this._updateUrlFromFallbackAdresses(method, args, startTime)
|
|
|
|
|
if (updatedHostUrl) {
|
|
|
|
|
// try again
|
|
|
|
|
return this._call(method, args, timeout)
|
|
|
|
|
}
|
|
|
|
|
if (
|
|
|
|
|
this._fallBackAddresses?.length > 1 &&
|
|
|
|
|
(['EHOSTUNREACH', 'HOST_IS_SLAVE', 'ENOTFOUND', 'ECONNRESET', 'HOST_OFFLINE'].includes(error.code) ||
|
|
|
|
|
error instanceof TimeoutError)
|
|
|
|
|
) {
|
|
|
|
|
const success = await this._updateUrlFromFallbackAdresses(method, args, startTime)
|
|
|
|
|
// try again whether it succeed or fails
|
|
|
|
|
// @todo should we update the first parameter ( session ID ) if we reconnected ?
|
|
|
|
|
// console.log('call again', method)
|
|
|
|
|
// _call is not synchronized
|
|
|
|
|
// that means that the _call order will be changed : we relaunch the method, but any other in the backlog
|
|
|
|
|
// may have been choosen to relaunch , messing the order
|
|
|
|
|
throw error
|
|
|
|
|
}
|
|
|
|
|
// console.log('will escape retry', method, error.code)
|
|
|
|
|
throw this._augmentCallError(error, method, args, startTime)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -889,6 +988,7 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
// it's not the same XAPI, we need to refetch the available types and reset
|
|
|
|
|
// the event loop in that case
|
|
|
|
|
if (this._pool.$ref !== oldPoolRef) {
|
|
|
|
|
// console.log('pool ref changed')
|
|
|
|
|
// Uses introspection to list available types.
|
|
|
|
|
const types = (this._types = (await this._interruptOnDisconnect(this._call('system.listMethods')))
|
|
|
|
|
.filter(isGetAllRecordsMethod)
|
|
|
|
|
@@ -1128,16 +1228,19 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
_watchEvents = coalesceCalls(this._watchEvents)
|
|
|
|
|
// eslint-disable-next-line no-dupe-class-members
|
|
|
|
|
async _watchEvents() {
|
|
|
|
|
// console.log('_watchEvents')
|
|
|
|
|
// eslint-disable-next-line no-labels
|
|
|
|
|
mainLoop: while (true) {
|
|
|
|
|
// console.log('mainloop')
|
|
|
|
|
if (this._resolveObjectsFetched === undefined) {
|
|
|
|
|
this._objectsFetched = new Promise(resolve => {
|
|
|
|
|
this._resolveObjectsFetched = resolve
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
// console.log('will await connection')
|
|
|
|
|
|
|
|
|
|
await this._connected
|
|
|
|
|
|
|
|
|
|
// console.log('connected')
|
|
|
|
|
// compute the initial token for the event loop
|
|
|
|
|
//
|
|
|
|
|
// we need to do this before the initial fetch to avoid losing events
|
|
|
|
|
@@ -1149,7 +1252,7 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
return this._watchEventsLegacy()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
console.warn('_watchEvents', error)
|
|
|
|
|
console.warn('_watchEvents sessionCall', error)
|
|
|
|
|
await pDelay(this._eventPollDelay)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
@@ -1165,14 +1268,16 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
// event loop
|
|
|
|
|
const debounce = this._debounce
|
|
|
|
|
while (true) {
|
|
|
|
|
// console.log('loop', this._debounce)
|
|
|
|
|
await pDelay(debounce)
|
|
|
|
|
|
|
|
|
|
await this._connected
|
|
|
|
|
|
|
|
|
|
// console.log('is connected')
|
|
|
|
|
let result
|
|
|
|
|
try {
|
|
|
|
|
// don't use _sessionCall because a session failure should break the
|
|
|
|
|
// loop and trigger a complete refetch
|
|
|
|
|
// console.log('will call event.from')
|
|
|
|
|
result = await this._call(
|
|
|
|
|
'event.from',
|
|
|
|
|
[
|
|
|
|
|
@@ -1183,30 +1288,34 @@ export class Xapi extends EventEmitter {
|
|
|
|
|
],
|
|
|
|
|
EVENT_TIMEOUT * 1e3 * 1.1
|
|
|
|
|
)
|
|
|
|
|
// console.log('got response event.from')
|
|
|
|
|
this._lastEventFetchedTimestamp = Date.now()
|
|
|
|
|
this._watchEventsError = undefined
|
|
|
|
|
this.emit('eventFetchingSuccess')
|
|
|
|
|
} catch (error) {
|
|
|
|
|
const code = error?.code
|
|
|
|
|
if (code === 'EVENTS_LOST' || code === 'SESSION_INVALID') {
|
|
|
|
|
console.warn('_watchEvents event.from', error.code)
|
|
|
|
|
if (
|
|
|
|
|
code === 'EVENTS_LOST' ||
|
|
|
|
|
code === 'SESSION_INVALID' ||
|
|
|
|
|
code === 'ECONNRESET' ||
|
|
|
|
|
code === 'ECONNREFUSED'
|
|
|
|
|
) {
|
|
|
|
|
// eslint-disable-next-line no-labels
|
|
|
|
|
// console.log(' continue main loop')
|
|
|
|
|
continue mainLoop
|
|
|
|
|
}
|
|
|
|
|
if (code === 'EHOSTUNREACH') {
|
|
|
|
|
// maybe the master has been demoted
|
|
|
|
|
await this._findCurrentMaster()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
this.emit('eventFetchingError', error)
|
|
|
|
|
this._watchEventsError = error
|
|
|
|
|
console.warn('_watchEvents', error)
|
|
|
|
|
await pDelay(this._eventPollDelay)
|
|
|
|
|
// console.log('continue loop')
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fromToken = result.token
|
|
|
|
|
this._processEvents(result.events)
|
|
|
|
|
|
|
|
|
|
// console.log('processed event ')
|
|
|
|
|
// detect and fix disappearing tasks (e.g. when toolstack restarts)
|
|
|
|
|
//
|
|
|
|
|
// FIXME: only if 'task' in 'types
|
|
|
|
|
|