From 6b8a3452418613868eac84f70fc9deee9209777a Mon Sep 17 00:00:00 2001 From: Florent BEAUCHAMP Date: Fri, 8 Apr 2022 11:11:20 +0200 Subject: [PATCH] feat(xen-api): implement fallback addresses (#6061) --- packages/xen-api/src/cli.js | 7 +- packages/xen-api/src/index.js | 116 ++++++++++++++---- packages/xo-server/src/api/server.mjs | 4 + .../xo-server/src/xo-mixins/xen-servers.mjs | 31 ++++- 4 files changed, 133 insertions(+), 25 deletions(-) diff --git a/packages/xen-api/src/cli.js b/packages/xen-api/src/cli.js index 3b1347a0a..01572d6a3 100755 --- a/packages/xen-api/src/cli.js +++ b/packages/xen-api/src/cli.js @@ -44,7 +44,7 @@ const usage = 'Usage: xen-api [ []]' async function main(createClient) { const opts = minimist(process.argv.slice(2), { - string: ['session-id'], + string: ['session-id', 'fallback-addresses'], boolean: ['allow-unauthorized', 'help', 'read-only', 'verbose'], alias: { @@ -82,6 +82,10 @@ async function main(createClient) { }) } + let fallBackAddresses = [] + if (opts['fallback-addresses']) { + fallBackAddresses = opts['fallback-addresses'].split(',') + } const xapi = createClient({ url: opts._[0], allowUnauthorized: opts.au, @@ -89,6 +93,7 @@ async function main(createClient) { debounce: opts.debounce != null ? +opts.debounce : null, readOnly: opts.ro, syncStackTraces: true, + fallBackAddresses, }) await xapi.connect() diff --git a/packages/xen-api/src/index.js b/packages/xen-api/src/index.js index 7c76b364b..990b28b5a 100644 --- a/packages/xen-api/src/index.js +++ b/packages/xen-api/src/index.js @@ -8,6 +8,7 @@ import noop from 'lodash/noop' import omit from 'lodash/omit' import ProxyAgent from 'proxy-agent' import { coalesceCalls } from '@vates/coalesce-calls' +import { synchronized } from 'decorator-synchronized' import { Collection } from 'xo-collection' import { EventEmitter } from 'events' import { Index } from 'xo-collection/index' @@ -120,7 +121,7 @@ export class Xapi extends EventEmitter { delete url.username delete url.password } - + this._fallBackAddresses = opts.fallBackAddresses ?? [] this._allowUnauthorized = opts.allowUnauthorized if (opts.httpProxy !== undefined) { this._httpAgent = new ProxyAgent(this._httpProxy) @@ -681,6 +682,85 @@ export class Xapi extends EventEmitter { // Private // =========================================================================== + @synchronized() + async _updateUrlFromFallbackAdresses(method, args, startTime) { + debug('%s: will try to find a new master', this._humanId) + const hostAddresses = this._fallBackAddresses + + // there can be a master only if there is more than 1 host + if (hostAddresses.length < 2) { + return false + } + + // another call to _updateUrlFromFallbackAdresses has already connected us to the right host + try { + await this._transport('session.get_auth_user_name', []) + debug('%s: host was already ok', this._humanId) + // host is ok, nothing to do here + return true + } catch (error) { + if (!['EHOSTUNREACH', 'HOST_IS_SLAVE', 'ENOTFOUND', 'MESSAGE_PARAMETER_COUNT_MISMATCH'].includes(error.code)) { + throw this._augmentCallError(error, method, args, startTime) + } + } + + for (const hostAdress of hostAddresses) { + // we already tested the current master + if (hostAdress === this._url.hostname) { + debug(`%s: don't recheck currrent host`, this._humanId) + continue + } + // since this method is @syncrhonized , there can be only on call at once + this._setUrl({ ...this._url, hostname: hostAdress }) + + try { + // will not succeed without sessionId, but I only want to know if this host is entitled to answer (he's the master) + await this._transport('session.get_auth_user_name', []) + // success + return true + } catch (error) { + // this host is also down, try the next one + if (error.code === 'EHOSTUNREACH' || error.code === 'ENOTFOUND') { + debug(`%s: slave %s is alsoa unreachable`, this._humanId, hostAdress) + continue + } + + if (error.code === 'HOST_IS_SLAVE') { + debug('%s: slave %s told us the master is %s', this._humanId, hostAdress, error.params[0]) + this._setUrl({ ...this._url, hostname: hostAdress }) + return true // we found the master no need to check for the other slave + } + + if (error.code === 'MESSAGE_PARAMETER_COUNT_MISMATCH') { + debug('%s: found new master %s master', this._humanId, hostAdress) + // lucky : we found the master with the first host answering + return true // we found the master no need to check for the other slave + } + // any other error should be reported + throw this._augmentCallError(error, method, args, startTime) + } + } + return false + } + + _augmentCallError(error, method, args, startTime) { + // do not log the session ID + // + // TODO: should log at the session level to avoid logging sensitive + // values? + const params = args[0] === this._sessionId ? args.slice(1) : args + + error.call = { + method, + params: + // it pass server's credentials as param + method === 'session.login_with_password' ? '* obfuscated *' : replaceSensitiveValues(params, '* obfuscated *'), + } + + debug('%s: %s(...) [%s] =!> %s', this._humanId, method, ms(Date.now() - startTime), error) + return error + } + async _call(method, args, timeout = this._callTimeout(method, args)) { const startTime = Date.now() try { @@ -688,24 +768,14 @@ export class Xapi extends EventEmitter { debug('%s: %s(...) [%s] ==> %s', this._humanId, method, ms(Date.now() - startTime), kindOf(result)) return result } catch (error) { - // do not log the session ID - // - // TODO: should log at the session level to avoid logging sensitive - // values? - const params = args[0] === this._sessionId ? args.slice(1) : args - - error.call = { - method, - params: - // it pass server's credentials as param - method === 'session.login_with_password' - ? '* obfuscated *' - : replaceSensitiveValues(params, '* obfuscated *'), + if (this._fallBackAddresses?.length > 1 && ['EHOSTUNREACH', 'HOST_IS_SLAVE', 'ENOTFOUND'].includes(error.code)) { + const updatedHostUrl = await this._updateUrlFromFallbackAdresses(method, args, startTime) + if (updatedHostUrl) { + // try again + return this._call(method, args, timeout) + } } - - debug('%s: %s(...) [%s] =!> %s', this._humanId, method, ms(Date.now() - startTime), error) - - throw error + throw this._augmentCallError(error, method, args, startTime) } } @@ -782,8 +852,8 @@ export class Xapi extends EventEmitter { // unnecessary renewal _sessionOpenRetryOptions = { tries: 2, - when: { code: 'HOST_IS_SLAVE' }, - onRetry: error => { + when: [{ code: 'HOST_IS_SLAVE' }], + onRetry: async error => { this._setUrl({ ...this._url, hostname: error.params[0] }) }, } @@ -1050,7 +1120,6 @@ export class Xapi extends EventEmitter { delete taskWatchers[ref] } } - // read-only call, automatically retry in case of connection issues _roCall(method, args) { return pRetry(() => this._sessionCall(method, args), this._roCallRetryOptions) @@ -1091,6 +1160,7 @@ export class Xapi extends EventEmitter { await this._refreshCachedRecords(types) this._resolveObjectsFetched() this._resolveObjectsFetched = undefined + this.emit('eventFetchedSuccess') // event loop const debounce = this._debounce @@ -1122,6 +1192,10 @@ export class Xapi extends EventEmitter { // eslint-disable-next-line no-labels continue mainLoop } + if (code === 'EHOSTUNREACH') { + // maybe the master has been demoted + await this._findCurrentMaster() + } this.emit('eventFetchingError', error) this._watchEventsError = error diff --git a/packages/xo-server/src/api/server.mjs b/packages/xo-server/src/api/server.mjs index 194f32403..8420035b4 100644 --- a/packages/xo-server/src/api/server.mjs +++ b/packages/xo-server/src/api/server.mjs @@ -112,6 +112,10 @@ set.params = { optional: true, type: ['string', 'null'], }, + fallbackAddresses: { + type: ['string', 'null'], + optional: true, + }, } // ------------------------------------------------------------------- diff --git a/packages/xo-server/src/xo-mixins/xen-servers.mjs b/packages/xo-server/src/xo-mixins/xen-servers.mjs index ec2191c30..90be78888 100644 --- a/packages/xo-server/src/xo-mixins/xen-servers.mjs +++ b/packages/xo-server/src/xo-mixins/xen-servers.mjs @@ -14,7 +14,7 @@ import * as XenStore from '../_XenStore.mjs' import Xapi from '../xapi/index.mjs' import xapiObjectToXo from '../xapi-object-to-xo.mjs' import XapiStats from '../xapi-stats.mjs' -import { camelToSnakeCase, forEach, isEmpty, popProperty } from '../utils.mjs' +import { camelToSnakeCase, forEach, isEmpty, noop, popProperty } from '../utils.mjs' import { Servers } from '../models/server.mjs' // =================================================================== @@ -133,7 +133,7 @@ export default class { async updateXenServer( id, - { allowUnauthorized, enabled, error, host, label, password, readOnly, username, httpProxy } + { allowUnauthorized, enabled, error, host, label, password, readOnly, username, httpProxy, fallbackAddresses } ) { const server = await this._getXenServer(id) const xapi = this._xapis[id] @@ -176,6 +176,9 @@ export default class { // if value is null, pass undefined to the model , so it will delete this optionnal property from the Server object server.set('httpProxy', httpProxy === null ? undefined : httpProxy) } + if (fallbackAddresses !== undefined) { + server.set('fallBackAddresses', fallbackAddresses === null ? undefined : JSON.stringify(fallbackAddresses)) + } await this._servers.update(server) } @@ -297,6 +300,14 @@ export default class { }) } + async _updateFallBackAdresses(xapi) { + const hosts = await xapi.getAllRecords('host') + const fallbackAddresses = hosts.map(({ address }) => address) + for (const host of hosts) { + await this.updateXenServer(host.id, { fallbackAddresses }) + } + } + async connectXenServer(id) { const server = await this.getXenServer(id) @@ -305,7 +316,10 @@ export default class { } const { config } = this._app - + let fallBackAddresses + try { + fallBackAddresses = JSON.parse(server.fallBackAddresses) + } catch (e) {} let { poolMarkingInterval, poolMarkingMaxAge, poolMarkingPrefix, ...xapiOptions } = config.get('xapiOptions') poolMarkingInterval = parseDuration(poolMarkingInterval) poolMarkingMaxAge = parseDuration(poolMarkingMaxAge) @@ -316,6 +330,7 @@ export default class { ...xapiOptions, httpProxy: server.httpProxy, + fallBackAddresses, guessVhdSizeOnImport: config.get('guessVhdSizeOnImport'), auth: { @@ -341,6 +356,8 @@ export default class { throw new PoolAlreadyConnected(poolId, serverIdsByPool[poolId], server.id) } + await this._updateFallBackAdresses(xapi).cach(noop) + serverIdsByPool[poolId] = server.id xapi.xo = (() => { @@ -469,6 +486,14 @@ export default class { this.updateXenServer(id, { error: null })::ignoreErrors() + const xo = this + // when the data are loaded, update the fall back adresses of all the hosts of this pool + xapi.once('eventFetchedSuccess', async function eventFetchedSuccessListener() { + const hosts = await this.getAllRecords('host') + const hostAddresses = hosts.map(({ address }) => address).join(';') + await xo.updateXenServer(server.id, { fallbackAddresses: hostAddresses }) + }) + xapi.once('eventFetchingError', function eventFetchingErrorListener() { const timeout = setTimeout(() => { xapi.xo.uninstall()