Move Xen servers management to a new mixin.

This commit is contained in:
Julien Fontanet 2016-02-29 12:36:03 +01:00
parent f780ba2c5a
commit 31e3117190
2 changed files with 356 additions and 344 deletions

View File

@ -0,0 +1,355 @@
import Xapi from '../xapi'
import xapiObjectToXo from '../xapi-object-to-xo'
import XapiStats from '../xapi-stats'
import {
JsonRpcError,
NoSuchObject
} from '../api-errors'
import {
camelToSnakeCase,
createRawObject,
forEach,
isEmpty,
isString,
noop,
popProperty
} from '../utils'
import {
Servers
} from '../models/server'
// ===================================================================
class NoSuchXenServer extends NoSuchObject {
constructor (id) {
super(id, 'xen server')
}
}
// ===================================================================
export default class {
constructor (xo) {
this._objectConflicts = createRawObject() // TODO: clean when a server is disconnected.
this._servers = new Servers({
connection: xo._redis,
prefix: 'xo:server',
indexes: ['host']
})
this._stats = new XapiStats()
this._xapis = createRawObject()
this._xo = xo
xo.on('start', async () => {
// Connects to existing servers.
const servers = await this._servers.get()
for (let server of servers) {
if (server.enabled) {
this.connectXenServer(server.id).catch(error => {
console.error(
`[WARN] ${server.host}:`,
error[0] || error.stack || error.code || error
)
})
}
}
})
// TODO: disconnect servers on stop.
}
async registerXenServer ({host, username, password, readOnly = false}) {
// FIXME: We are storing passwords which is bad!
// Could we use tokens instead?
// TODO: use plain objects
const server = await this._servers.create({
host,
username,
password,
readOnly: readOnly ? 'true' : undefined,
enabled: 'true'
})
return server.properties
}
async unregisterXenServer (id) {
this.disconnectXenServer(id).catch(noop)
if (!await this._servers.remove(id)) { // eslint-disable-line space-before-keywords
throw new NoSuchXenServer(id)
}
}
async updateXenServer (id, {host, username, password, readOnly, enabled}) {
const server = await this._getXenServer(id)
if (host) server.set('host', host)
if (username) server.set('username', username)
if (password) server.set('password', password)
if (enabled !== undefined) {
server.set('enabled', enabled ? 'true' : undefined)
}
if (readOnly !== undefined) {
server.set('readOnly', readOnly ? 'true' : undefined)
const xapi = this._xapis[id]
if (xapi) {
xapi.readOnly = readOnly
}
}
await this._servers.update(server)
}
// TODO: this method will no longer be async when servers are
// integrated to the main collection.
async _getXenServer (id) {
const server = await this._servers.first(id)
if (!server) {
throw new NoSuchXenServer(id)
}
return server
}
_onXenAdd (xapiObjects, xapiIdsToXo, toRetry, conId) {
const conflicts = this._objectConflicts
const objects = this._xo._objects
forEach(xapiObjects, (xapiObject, xapiId) => {
try {
const xoObject = xapiObjectToXo(xapiObject)
if (!xoObject) {
return
}
const xoId = xoObject.id
xapiIdsToXo[xapiId] = xoId
const previous = objects.get(xoId, undefined)
if (
previous &&
previous._xapiRef !== xapiObject.$ref
) {
(
conflicts[xoId] ||
(conflicts[xoId] = createRawObject())
)[conId] = xoObject
} else {
objects.set(xoId, xoObject)
}
} catch (error) {
console.error('ERROR: xapiObjectToXo', error)
toRetry[xapiId] = xapiObject
}
})
}
_onXenRemove (xapiObjects, xapiIdsToXo, toRetry, conId) {
const conflicts = this._objectConflicts
const objects = this._xo._objects
forEach(xapiObjects, (_, xapiId) => {
toRetry && delete toRetry[xapiId]
const xoId = xapiIdsToXo[xapiId]
if (!xoId) {
// This object was not known previously.
return
}
delete xapiIdsToXo[xapiId]
const objConflicts = conflicts[xoId]
if (objConflicts) {
if (objConflicts[conId]) {
delete objConflicts[conId]
} else {
objects.set(xoId, popProperty(objConflicts))
}
if (isEmpty(objConflicts)) {
delete conflicts[xoId]
}
} else {
objects.unset(xoId)
}
})
}
async connectXenServer (id) {
const server = (await this._getXenServer(id)).properties
const xapi = this._xapis[server.id] = new Xapi({
url: server.host,
auth: {
user: server.username,
password: server.password
},
readOnly: Boolean(server.readOnly)
})
xapi.xo = (() => {
const conId = server.id
// Maps ids of XAPI objects to ids of XO objects.
const xapiIdsToXo = createRawObject()
// Map of XAPI objects which failed to be transformed to XO
// objects.
//
// At each `finish` there will be another attempt to transform
// until they succeed.
let toRetry
let toRetryNext = createRawObject()
const onAddOrUpdate = objects => {
this._onXenAdd(objects, xapiIdsToXo, toRetryNext, conId)
}
const onRemove = objects => {
this._onXenRemove(objects, xapiIdsToXo, toRetry, conId)
}
const onFinish = () => {
if (xapi.pool) {
this._xapis[xapi.pool.$id] = xapi
}
if (!isEmpty(toRetry)) {
onAddOrUpdate(toRetry)
toRetry = null
}
if (!isEmpty(toRetryNext)) {
toRetry = toRetryNext
toRetryNext = createRawObject()
}
}
const { objects } = xapi
const addObject = object => {
// TODO: optimize.
onAddOrUpdate({ [object.$id]: object })
return xapiObjectToXo(object)
}
return {
install () {
objects.on('add', onAddOrUpdate)
objects.on('update', onAddOrUpdate)
objects.on('remove', onRemove)
objects.on('finish', onFinish)
onAddOrUpdate(objects.all)
},
uninstall () {
objects.removeListener('add', onAddOrUpdate)
objects.removeListener('update', onAddOrUpdate)
objects.removeListener('remove', onRemove)
objects.removeListener('finish', onFinish)
onRemove(objects.all)
},
addObject,
getData: (id, key) => {
const value = xapi.getObject(id).other_config[`xo:${camelToSnakeCase(key)}`]
return value && JSON.parse(value)
},
setData: async (id, key, value) => {
await xapi._updateObjectMapProperty(
xapi.getObject(id),
'other_config',
{ [`xo:${camelToSnakeCase(key)}`]: JSON.stringify(value) }
)
// Register the updated object.
addObject(await xapi._waitObject(id))
}
}
})()
xapi.xo.install()
try {
await xapi.connect()
} catch (error) {
if (error.code === 'SESSION_AUTHENTICATION_FAILED') {
throw new JsonRpcError('authentication failed')
}
if (error.code === 'EHOSTUNREACH') {
throw new JsonRpcError('host unreachable')
}
throw error
}
}
async disconnectXenServer (id) {
const xapi = this._xapis[id]
if (!xapi) {
throw new NoSuchXenServer(id)
}
delete this._xapis[id]
if (xapi.pool) {
delete this._xapis[xapi.pool.id]
}
xapi.xo.uninstall()
return xapi.disconnect()
}
// Returns the XAPI connection associated to an object.
getXapi (object, type) {
if (isString(object)) {
object = this.getObject(object, type)
}
const { $pool: poolId } = object
if (!poolId) {
throw new Error(`object ${object.id} does not belong to a pool`)
}
const xapi = this._xapis[poolId]
if (!xapi) {
throw new Error(`no connection found for object ${object.id}`)
}
return xapi
}
getXapiVmStats (vm, granularity) {
const xapi = this.getXapi(vm)
return this._stats.getVmPoints(xapi, vm._xapiId, granularity)
}
getXapiHostStats (host, granularity) {
const xapi = this.getXapi(host)
return this._stats.getHostPoints(xapi, host._xapiId, granularity)
}
async mergeXenPools (sourceId, targetId, force = false) {
const sourceXapi = this.getXapi(sourceId)
const {
_auth: { user, password },
_url: { hostname }
} = this.getXapi(targetId)
// We don't want the events of the source XAPI to interfere with
// the events of the new XAPI.
sourceXapi.xo.uninstall()
try {
await sourceXapi.joinPool(hostname, user, password, force)
} catch (e) {
sourceXapi.xo.install()
throw e
}
await this.unregisterXenServer(sourceId)
}
}

345
src/xo.js
View File

@ -10,29 +10,22 @@ import {EventEmitter} from 'events'
import mixins from './xo-mixins' import mixins from './xo-mixins'
import Connection from './connection' import Connection from './connection'
import LevelDbLogger from './loggers/leveldb' import LevelDbLogger from './loggers/leveldb'
import Xapi from './xapi'
import xapiObjectToXo from './xapi-object-to-xo'
import XapiStats from './xapi-stats'
import {Acls} from './models/acl' import {Acls} from './models/acl'
import { import {
mixin mixin
} from './decorators' } from './decorators'
import { import {
camelToSnakeCase,
createRawObject, createRawObject,
forEach, forEach,
generateToken, generateToken,
isEmpty, isEmpty,
mapToArray, mapToArray,
noop, noop
popProperty
} from './utils' } from './utils'
import { import {
JsonRpcError,
NoSuchObject NoSuchObject
} from './api-errors' } from './api-errors'
import {ModelAlreadyExists} from './collection' import {ModelAlreadyExists} from './collection'
import {Servers} from './models/server'
import Token, {Tokens} from './models/token' import Token, {Tokens} from './models/token'
// =================================================================== // ===================================================================
@ -43,12 +36,6 @@ class NoSuchAuthenticationToken extends NoSuchObject {
} }
} }
class NoSuchXenServer extends NoSuchObject {
constructor (id) {
super(id, 'xen server')
}
}
// =================================================================== // ===================================================================
@mixin(mapToArray(mixins)) @mixin(mapToArray(mixins))
@ -61,12 +48,6 @@ export default class Xo extends EventEmitter {
this._objects = new XoCollection() this._objects = new XoCollection()
this._objects.createIndex('byRef', new XoUniqueIndex('_xapiRef')) this._objects.createIndex('byRef', new XoUniqueIndex('_xapiRef'))
// Connections to Xen servers.
this._xapis = createRawObject()
// Stats utils.
this._xapiStats = new XapiStats()
// Connections to users. // Connections to users.
this._nextConId = 0 this._nextConId = 0
this._connections = createRawObject() this._connections = createRawObject()
@ -74,7 +55,6 @@ export default class Xo extends EventEmitter {
this._authenticationFailures = createRawObject() this._authenticationFailures = createRawObject()
this._authenticationProviders = new Set() this._authenticationProviders = new Set()
this._httpRequestWatchers = createRawObject() this._httpRequestWatchers = createRawObject()
this._xenObjConflicts = createRawObject() // TODO: clean when a server is disconnected.
// Connects to Redis. // Connects to Redis.
this._redis = createRedisClient(config.redis && config.redis.uri) this._redis = createRedisClient(config.redis && config.redis.uri)
@ -97,11 +77,6 @@ export default class Xo extends EventEmitter {
prefix: 'xo:acl', prefix: 'xo:acl',
indexes: ['subject', 'object'] indexes: ['subject', 'object']
}) })
this._servers = new Servers({
connection: redis,
prefix: 'xo:server',
indexes: ['host']
})
this._tokens = new Tokens({ this._tokens = new Tokens({
connection: redis, connection: redis,
prefix: 'xo:token', prefix: 'xo:token',
@ -110,21 +85,6 @@ export default class Xo extends EventEmitter {
// --------------------------------------------------------------- // ---------------------------------------------------------------
// Connects to existing servers.
const servers = await this._servers.get()
for (let server of servers) {
if (server.enabled) {
this.connectXenServer(server.id).catch(error => {
console.error(
`[WARN] ${server.host}:`,
error[0] || error.stack || error.code || error
)
})
}
}
// ---------------------------------------------------------------
const handleStartError = error => { const handleStartError = error => {
console.error( console.error(
'[WARN] start error:', '[WARN] start error:',
@ -149,8 +109,6 @@ export default class Xo extends EventEmitter {
async stop () { async stop () {
this.stop = noop this.stop = noop
// TODO: disconnect all servers.
const handleStopError = error => { const handleStopError = error => {
console.error( console.error(
'[WARN] stop error:', '[WARN] stop error:',
@ -363,307 +321,6 @@ export default class Xo extends EventEmitter {
// ----------------------------------------------------------------- // -----------------------------------------------------------------
async registerXenServer ({host, username, password, readOnly = false}) {
// FIXME: We are storing passwords which is bad!
// Could we use tokens instead?
// TODO: use plain objects
const server = await this._servers.create({
host,
username,
password,
readOnly: readOnly ? 'true' : undefined,
enabled: 'true'
})
return server.properties
}
async unregisterXenServer (id) {
this.disconnectXenServer(id).catch(noop)
if (!await this._servers.remove(id)) { // eslint-disable-line space-before-keywords
throw new NoSuchXenServer(id)
}
}
async updateXenServer (id, {host, username, password, readOnly, enabled}) {
const server = await this._getXenServer(id)
if (host) server.set('host', host)
if (username) server.set('username', username)
if (password) server.set('password', password)
if (enabled !== undefined) {
server.set('enabled', enabled ? 'true' : undefined)
}
if (readOnly !== undefined) {
server.set('readOnly', readOnly ? 'true' : undefined)
const xapi = this._xapis[id]
if (xapi) {
xapi.readOnly = readOnly
}
}
await this._servers.update(server)
}
// TODO: this method will no longer be async when servers are
// integrated to the main collection.
async _getXenServer (id) {
const server = await this._servers.first(id)
if (!server) {
throw new NoSuchXenServer(id)
}
return server
}
_onXenAdd (xapiObjects, xapiIdsToXo, toRetry, conId) {
const conflicts = this._xenObjConflicts
const objects = this._objects
forEach(xapiObjects, (xapiObject, xapiId) => {
try {
const xoObject = xapiObjectToXo(xapiObject)
if (!xoObject) {
return
}
const xoId = xoObject.id
xapiIdsToXo[xapiId] = xoId
const previous = objects.get(xoId, undefined)
if (
previous &&
previous._xapiRef !== xapiObject.$ref
) {
(
conflicts[xoId] ||
(conflicts[xoId] = createRawObject())
)[conId] = xoObject
} else {
objects.set(xoId, xoObject)
}
} catch (error) {
console.error('ERROR: xapiObjectToXo', error)
toRetry[xapiId] = xapiObject
}
})
}
_onXenRemove (xapiObjects, xapiIdsToXo, toRetry, conId) {
const conflicts = this._xenObjConflicts
const objects = this._objects
forEach(xapiObjects, (_, xapiId) => {
toRetry && delete toRetry[xapiId]
const xoId = xapiIdsToXo[xapiId]
if (!xoId) {
// This object was not known previously.
return
}
delete xapiIdsToXo[xapiId]
const objConflicts = conflicts[xoId]
if (objConflicts) {
if (objConflicts[conId]) {
delete objConflicts[conId]
} else {
objects.set(xoId, popProperty(objConflicts))
}
if (isEmpty(objConflicts)) {
delete conflicts[xoId]
}
} else {
objects.unset(xoId)
}
})
}
async connectXenServer (id) {
const server = (await this._getXenServer(id)).properties
const xapi = this._xapis[server.id] = new Xapi({
url: server.host,
auth: {
user: server.username,
password: server.password
},
readOnly: Boolean(server.readOnly)
})
xapi.xo = (() => {
const conId = server.id
// Maps ids of XAPI objects to ids of XO objects.
const xapiIdsToXo = createRawObject()
// Map of XAPI objects which failed to be transformed to XO
// objects.
//
// At each `finish` there will be another attempt to transform
// until they succeed.
let toRetry
let toRetryNext = createRawObject()
const onAddOrUpdate = objects => {
this._onXenAdd(objects, xapiIdsToXo, toRetryNext, conId)
}
const onRemove = objects => {
this._onXenRemove(objects, xapiIdsToXo, toRetry, conId)
}
const onFinish = () => {
if (xapi.pool) {
this._xapis[xapi.pool.$id] = xapi
}
if (!isEmpty(toRetry)) {
onAddOrUpdate(toRetry)
toRetry = null
}
if (!isEmpty(toRetryNext)) {
toRetry = toRetryNext
toRetryNext = createRawObject()
}
}
const { objects } = xapi
const addObject = object => {
// TODO: optimize.
onAddOrUpdate({ [object.$id]: object })
return xapiObjectToXo(object)
}
return {
install () {
objects.on('add', onAddOrUpdate)
objects.on('update', onAddOrUpdate)
objects.on('remove', onRemove)
objects.on('finish', onFinish)
onAddOrUpdate(objects.all)
},
uninstall () {
objects.removeListener('add', onAddOrUpdate)
objects.removeListener('update', onAddOrUpdate)
objects.removeListener('remove', onRemove)
objects.removeListener('finish', onFinish)
onRemove(objects.all)
},
addObject,
getData: (id, key) => {
const value = xapi.getObject(id).other_config[`xo:${camelToSnakeCase(key)}`]
return value && JSON.parse(value)
},
setData: async (id, key, value) => {
await xapi._updateObjectMapProperty(
xapi.getObject(id),
'other_config',
{ [`xo:${camelToSnakeCase(key)}`]: JSON.stringify(value) }
)
// Register the updated object.
addObject(await xapi._waitObject(id))
}
}
})()
xapi.xo.install()
try {
await xapi.connect()
} catch (error) {
if (error.code === 'SESSION_AUTHENTICATION_FAILED') {
throw new JsonRpcError('authentication failed')
}
if (error.code === 'EHOSTUNREACH') {
throw new JsonRpcError('host unreachable')
}
throw error
}
}
async disconnectXenServer (id) {
const xapi = this._xapis[id]
if (!xapi) {
throw new NoSuchXenServer(id)
}
delete this._xapis[id]
if (xapi.pool) {
delete this._xapis[xapi.pool.id]
}
xapi.xo.uninstall()
return xapi.disconnect()
}
getXAPI () {
throw new Error(`Use <Xo#getXapi()> (camel case) instead.`)
}
// Returns the XAPI connection associated to an object.
getXapi (object, type) {
if (isString(object)) {
object = this.getObject(object, type)
}
const { $pool: poolId } = object
if (!poolId) {
throw new Error(`object ${object.id} does not belong to a pool`)
}
const xapi = this._xapis[poolId]
if (!xapi) {
throw new Error(`no connection found for object ${object.id}`)
}
return xapi
}
getXapiVmStats (vm, granularity) {
const xapi = this.getXapi(vm)
return this._xapiStats.getVmPoints(xapi, vm._xapiId, granularity)
}
getXapiHostStats (host, granularity) {
const xapi = this.getXapi(host)
return this._xapiStats.getHostPoints(xapi, host._xapiId, granularity)
}
async mergeXenPools (sourceId, targetId, force = false) {
const sourceXapi = this.getXapi(sourceId)
const {
_auth: { user, password },
_url: { hostname }
} = this.getXapi(targetId)
// We don't want the events of the source XAPI to interfere with
// the events of the new XAPI.
sourceXapi.xo.uninstall()
try {
await sourceXapi.joinPool(hostname, user, password, force)
} catch (e) {
sourceXapi.xo.install()
throw e
}
await this.unregisterXenServer(sourceId)
}
// -----------------------------------------------------------------
// Returns an object from its key or UUID. // Returns an object from its key or UUID.
// //
// TODO: should throw a NoSuchObject error on failure. // TODO: should throw a NoSuchObject error on failure.