Compare commits

...

1 Commits

Author SHA1 Message Date
Florent Beauchamp
8932492b33 fix: identify and reconnect to master 2022-04-15 16:10:03 +02:00
2 changed files with 152 additions and 43 deletions

View File

@@ -12,7 +12,17 @@ import { synchronized } from 'decorator-synchronized'
import { Collection } from 'xo-collection'
import { EventEmitter } from 'events'
import { Index } from 'xo-collection/index'
import { cancelable, defer, fromCallback, fromEvents, ignoreErrors, pDelay, pRetry, pTimeout } from 'promise-toolbox'
import {
cancelable,
defer,
fromCallback,
fromEvents,
ignoreErrors,
pDelay,
pRetry,
pTimeout,
TimeoutError,
} from 'promise-toolbox'
import { limitConcurrency } from 'limit-concurrency-decorator'
import autoTransport from './transports/auto'
@@ -202,6 +212,7 @@ export class Xapi extends EventEmitter {
const status = this._status
if (status === CONNECTED) {
console.log('already connected')
return
}
@@ -213,15 +224,20 @@ export class Xapi extends EventEmitter {
})
try {
console.log('will open session')
await this._sessionOpen()
console.log('session opened')
debug('%s: connected', this._humanId)
this._status = CONNECTED
this._resolveConnected()
this._resolveConnected !== undefined && this._resolveConnected()
this._resolveConnected = undefined
console.log('_resolveConnected done')
this.emit(CONNECTED)
console.log('_emited')
} catch (error) {
ignoreErrors.call(this.disconnect())
console.log('connect error', { error })
await ignoreErrors.call(this.disconnect())
throw error
}
@@ -231,21 +247,24 @@ export class Xapi extends EventEmitter {
const status = this._status
if (status === DISCONNECTED) {
// console.log('disconnect- already disconected')
return
}
if (status === CONNECTED) {
// console.log('disconnect-already conected')
this._connected = new Promise(resolve => {
this._resolveConnected = resolve
})
} else {
// console.log('disconnect-connecting')
assert.strictEqual(status, CONNECTING)
}
const sessionId = this._sessionId
if (sessionId !== undefined) {
this._sessionId = undefined
ignoreErrors.call(this._call('session.logout', [sessionId]))
await ignoreErrors.call(this._call('session.logout', [sessionId]))
}
debug('%s: disconnected', this._humanId)
@@ -678,12 +697,39 @@ export class Xapi extends EventEmitter {
return this._addSyncStackTrace(watcher.promise)
}
async _checkIfConnected() {
try {
await this._transport('host.get_servertime', ['1', '1'])
// host is ok but with a broken call ? , nothing to do here
return true
} catch (error) {
if (error.code === 'SESSION_INVALID') {
return true
}
if (
[
'EHOSTUNREACH',
'HOST_IS_SLAVE',
'ENOTFOUND',
'ECONNRESET',
'HOST_OFFLINE',
'ETIMEDOUT',
'ECONNRESET',
].includes(error.code)
) {
return false
}
throw error
}
}
// ===========================================================================
// Private
// ===========================================================================
@synchronized()
async _updateUrlFromFallbackAdresses(method, args, startTime) {
console.log('_updateUrlFromFallbackAdresses', method)
debug('%s: will try to find a new master', this._humanId)
const hostAddresses = this._fallBackAddresses
@@ -693,49 +739,92 @@ export class Xapi extends EventEmitter {
}
// another call to _updateUrlFromFallbackAdresses has already connected us to the right host
try {
await this._transport('session.get_auth_user_name', [])
debug('%s: host was already ok', this._humanId)
// host is ok, nothing to do here
if ((await this._checkIfConnected()) === true) {
console.log('already reconnected', method)
return true
} catch (error) {
if (!['EHOSTUNREACH', 'HOST_IS_SLAVE', 'ENOTFOUND', 'MESSAGE_PARAMETER_COUNT_MISMATCH'].includes(error.code)) {
throw this._augmentCallError(error, method, args, startTime)
}
const checkAndReconnect = async () => {
try {
console.log('check and reconnect', this._sessionId, this.status)
if (!(await this._checkIfConnected())) {
return false
}
//this._status = DISCONNECTED
//await this.connect()
// maybe it's in connecting mode and we just need to wait ?
return true
} catch (e) {
console.log(e)
return false
}
}
for (const hostAdress of hostAddresses) {
console.log('check ', hostAdress, hostAddresses)
// we already tested the current master
/* when retrying it can be a broken url
if (hostAdress === this._url.hostname) {
console.log('current master')
debug(`%s: don't recheck currrent host`, this._humanId)
continue
}
// since this method is @syncrhonized , there can be only on call at once
}*/
// since this method is @synchronized, there can be only one call at a time
this._setUrl({ ...this._url, hostname: hostAdress })
try {
await new Promise((resolve, reject) => {
let timeouted = false
this._transport('host.get_servertime', ['1', '1'])
.then(res => {
if (!timeouted) {
clearTimeout(timeout)
resolve(res)
}
})
.catch(e => {
clearTimeout(timeout)
reject(e)
})
const timeout = setTimeout(() => {
timeouted = true
const error = new Error()
error.code = 'ETIMEDOUT'
reject(error)
}, 5000)
})
// will not succeed without sessionId, but I only want to know if this host is entitled to answer (he's the master)
await this._transport('session.get_auth_user_name', [])
// success
return true
} catch (error) {
console.log(error.code)
// this host is also down, try the next one
if (error.code === 'EHOSTUNREACH' || error.code === 'ENOTFOUND') {
debug(`%s: slave %s is alsoa unreachable`, this._humanId, hostAdress)
if (
['EHOSTUNREACH', 'ENOTFOUND', 'ECONNRESET', 'HOST_OFFLINE', 'ETIMEDOUT', 'ECONNRESET'].includes(error.code)
) {
debug(`%s: slave %s is also unreachable`, this._humanId, hostAdress)
continue
}
if (error.code === 'HOST_IS_SLAVE') {
debug('%s: slave %s told us the master is %s', this._humanId, hostAdress, error.params[0])
this._setUrl({ ...this._url, hostname: hostAdress })
return true // we found the master no need to check for the other slave
// check if the master is alive
if (await checkAndReconnect()) {
return true
}
break
}
if (error.code === 'MESSAGE_PARAMETER_COUNT_MISMATCH') {
if (error.code === 'SESSION_INVALID') {
debug('%s: found new master %s master', this._humanId, hostAdress)
// lucky : we found the master with the first host answering
return true // we found the master no need to check for the other slave
// lucky : we found the master with the first host answering no need to check for the other slave
// also , no need to check
return true
}
// any other error should be reported
throw this._augmentCallError(error, method, args, startTime)
}
@@ -762,19 +851,29 @@ export class Xapi extends EventEmitter {
}
async _call(method, args, timeout = this._callTimeout(method, args)) {
// console.log(method)
const startTime = Date.now()
try {
const result = await pTimeout.call(this._addSyncStackTrace(this._transport(method, args)), timeout)
debug('%s: %s(...) [%s] ==> %s', this._humanId, method, ms(Date.now() - startTime), kindOf(result))
// console.log(method, 'success')
return result
} catch (error) {
if (this._fallBackAddresses?.length > 1 && ['EHOSTUNREACH', 'HOST_IS_SLAVE', 'ENOTFOUND'].includes(error.code)) {
const updatedHostUrl = await this._updateUrlFromFallbackAdresses(method, args, startTime)
if (updatedHostUrl) {
// try again
return this._call(method, args, timeout)
}
if (
this._fallBackAddresses?.length > 1 &&
(['EHOSTUNREACH', 'HOST_IS_SLAVE', 'ENOTFOUND', 'ECONNRESET', 'HOST_OFFLINE'].includes(error.code) ||
error instanceof TimeoutError)
) {
const success = await this._updateUrlFromFallbackAdresses(method, args, startTime)
// try again whether it succeed or fails
// @todo should we update the first parameter ( session ID ) if we reconnected ?
// console.log('call again', method)
// _call is not synchronized:
// this means the order of the calls can change: we relaunch this method, but any other call
// in the backlog may have been chosen to be relaunched first, messing up the order
throw error
}
// console.log('will escape retry', method, error.code)
throw this._augmentCallError(error, method, args, startTime)
}
}
@@ -889,6 +988,7 @@ export class Xapi extends EventEmitter {
// it's not the same XAPI, we need to refetch the available types and reset
// the event loop in that case
if (this._pool.$ref !== oldPoolRef) {
// console.log('pool ref changed')
// Uses introspection to list available types.
const types = (this._types = (await this._interruptOnDisconnect(this._call('system.listMethods')))
.filter(isGetAllRecordsMethod)
@@ -1128,16 +1228,19 @@ export class Xapi extends EventEmitter {
_watchEvents = coalesceCalls(this._watchEvents)
// eslint-disable-next-line no-dupe-class-members
async _watchEvents() {
// console.log('_watchEvents')
// eslint-disable-next-line no-labels
mainLoop: while (true) {
// console.log('mainloop')
if (this._resolveObjectsFetched === undefined) {
this._objectsFetched = new Promise(resolve => {
this._resolveObjectsFetched = resolve
})
}
// console.log('will await connection')
await this._connected
// console.log('connected')
// compute the initial token for the event loop
//
// we need to do this before the initial fetch to avoid losing events
@@ -1149,7 +1252,7 @@ export class Xapi extends EventEmitter {
return this._watchEventsLegacy()
}
console.warn('_watchEvents', error)
console.warn('_watchEvents sessionCall', error)
await pDelay(this._eventPollDelay)
continue
}
@@ -1165,14 +1268,16 @@ export class Xapi extends EventEmitter {
// event loop
const debounce = this._debounce
while (true) {
// console.log('loop', this._debounce)
await pDelay(debounce)
await this._connected
// console.log('is connected')
let result
try {
// don't use _sessionCall because a session failure should break the
// loop and trigger a complete refetch
// console.log('will call event.from')
result = await this._call(
'event.from',
[
@@ -1183,30 +1288,34 @@ export class Xapi extends EventEmitter {
],
EVENT_TIMEOUT * 1e3 * 1.1
)
// console.log('got response event.from')
this._lastEventFetchedTimestamp = Date.now()
this._watchEventsError = undefined
this.emit('eventFetchingSuccess')
} catch (error) {
const code = error?.code
if (code === 'EVENTS_LOST' || code === 'SESSION_INVALID') {
console.warn('_watchEvents event.from', error.code)
if (
code === 'EVENTS_LOST' ||
code === 'SESSION_INVALID' ||
code === 'ECONNRESET' ||
code === 'ECONNREFUSED'
) {
// eslint-disable-next-line no-labels
// console.log(' continue main loop')
continue mainLoop
}
if (code === 'EHOSTUNREACH') {
// maybe the master has been demoted
await this._findCurrentMaster()
}
this.emit('eventFetchingError', error)
this._watchEventsError = error
console.warn('_watchEvents', error)
await pDelay(this._eventPollDelay)
// console.log('continue loop')
continue
}
fromToken = result.token
this._processEvents(result.events)
// console.log('processed event ')
// detect and fix disappearing tasks (e.g. when toolstack restarts)
//
// FIXME: only if 'task' in 'types

View File

@@ -14,7 +14,7 @@ import * as XenStore from '../_XenStore.mjs'
import Xapi from '../xapi/index.mjs'
import xapiObjectToXo from '../xapi-object-to-xo.mjs'
import XapiStats from '../xapi-stats.mjs'
import { camelToSnakeCase, forEach, isEmpty, noop, popProperty } from '../utils.mjs'
import { camelToSnakeCase, forEach, isEmpty, popProperty } from '../utils.mjs'
import { Servers } from '../models/server.mjs'
// ===================================================================
@@ -177,7 +177,7 @@ export default class {
server.set('httpProxy', httpProxy === null ? undefined : httpProxy)
}
if (fallbackAddresses !== undefined) {
server.set('fallBackAddresses', fallbackAddresses === null ? undefined : JSON.stringify(fallbackAddresses))
server.set('fallBackAddresses', fallbackAddresses)
}
await this._servers.update(server)
}
@@ -304,7 +304,7 @@ export default class {
const hosts = await xapi.getAllRecords('host')
const fallbackAddresses = hosts.map(({ address }) => address)
for (const host of hosts) {
await this.updateXenServer(host.id, { fallbackAddresses })
await this.updateXenServer(host.uuid, { fallbackAddresses: JSON.stringify(fallbackAddresses) })
}
}
@@ -355,8 +355,8 @@ export default class {
if (serverIdsByPool[poolId] !== undefined) {
throw new PoolAlreadyConnected(poolId, serverIdsByPool[poolId], server.id)
}
await this._updateFallBackAdresses(xapi).catch(noop)
// @todo check id servers collection is ready here, no such object error on uuid
// await this._updateFallBackAdresses(xapi).catch(noop)
serverIdsByPool[poolId] = server.id
@@ -490,8 +490,8 @@ export default class {
// when the data are loaded, update the fallback addresses of all the hosts of this pool
xapi.once('eventFetchedSuccess', async function eventFetchedSuccessListener() {
const hosts = await this.getAllRecords('host')
const hostAddresses = hosts.map(({ address }) => address).join(';')
await xo.updateXenServer(server.id, { fallbackAddresses: hostAddresses })
const hostAddresses = hosts.map(({ address }) => address)
await xo.updateXenServer(server.id, { fallbackAddresses: JSON.stringify(hostAddresses) })
})
xapi.once('eventFetchingError', function eventFetchingErrorListener() {