feat(xo-server/proxy) (#4763)

See #4254
This commit is contained in:
Pierre Donias
2020-01-30 16:32:26 +01:00
committed by GitHub
parent d649a22b80
commit 218bd0ffc1
12 changed files with 626 additions and 46 deletions

View File

@@ -124,3 +124,16 @@ maxUncoalescedVdis = 1
vdiExportConcurrency = 12
vmExportConcurrency = 2
vmSnapshotConcurrency = 2
["xo-proxy"]
callTimeout = '1 min'
channel = 'xo-proxy-appliance'
namespace = 'xoProxyAppliance'
# The duration for which we can wait for the VM networks to be defined
vmNetworksTimeout = '1 min'
# The duration for which we can wait for the XOA to be upgraded
xoaUpgradeTimeout = '5 min'

View File

@@ -104,14 +104,17 @@
"proxy-agent": "^3.0.0",
"pug": "^2.0.0-rc.4",
"pump": "^3.0.0",
"pumpify": "^2.0.0",
"pw": "^0.0.4",
"readable-stream": "^3.2.0",
"redis": "^2.8.0",
"schema-inspector": "^1.6.8",
"semver": "^6.0.0",
"serve-static": "^1.13.1",
"set-cookie-parser": "^2.3.5",
"source-map-support": "^0.5.16",
"split-lines": "^2.0.0",
"split2": "^3.1.1",
"stack-chain": "^2.0.0",
"stoppable": "^1.0.5",
"strict-timeout": "^1.0.0",

View File

@@ -0,0 +1,38 @@
function onEnd() {
this.onError(new Error('unexpected end of stream'))
}
function onError(reject, error) {
reject(error)
removeListeners.call(this)
}
function onReadable(resolve, size) {
const data = this.stream.read(size)
if (data !== null) {
resolve(data)
removeListeners.call(this)
}
}
function removeListeners() {
const { onEnd, onError, onReadable, stream } = this
stream.removeListener('end', onEnd)
stream.removeListener('error', onError)
stream.removeListener('readable', onReadable)
}
function Reader(stream, size, resolve, reject) {
stream.on('end', (this.onEnd = onEnd.bind(this, reject)))
stream.on('error', (this.onError = onError.bind(this, reject)))
stream.on(
'readable',
(this.onReadable = onReadable.bind(this, resolve, size))
)
this.stream = stream
}
export default (stream, size) =>
new Promise((resolve, reject) => {
new Reader(stream, size, resolve, reject).onReadable()
})

View File

@@ -24,6 +24,10 @@ createJob.params = {
type: 'string',
optional: true,
},
proxy: {
type: 'string',
optional: true,
},
remotes: {
type: 'object',
optional: true,
@@ -85,6 +89,10 @@ editJob.params = {
type: 'string',
optional: true,
},
proxy: {
type: ['string', 'null'],
optional: true,
},
remotes: {
type: 'object',
optional: true,

View File

@@ -0,0 +1,135 @@
export function register({ vm, ...props }) {
return this.registerProxy({
vmUuid: vm?.uuid,
...props,
})
}
register.permission = 'admin'
register.params = {
address: {
type: 'string',
optional: true,
},
vm: {
type: 'string',
optional: true,
},
name: {
type: 'string',
optional: true,
},
authenticationToken: {
type: 'string',
},
}
register.resolve = {
vm: ['vm', 'VM', 'administrate'],
}
export async function unregister({ id }) {
await this.unregisterProxy(id)
}
unregister.permission = 'admin'
unregister.params = {
id: {
type: 'string',
},
}
export function destroy({ id }) {
return this.destroyProxy(id)
}
destroy.permission = 'admin'
destroy.params = {
id: {
type: 'string',
},
}
export function get({ id }) {
return this.getProxy(id)
}
get.permission = 'admin'
get.params = {
id: {
type: 'string',
},
}
export function getAll() {
return this.getAllProxies()
}
getAll.permission = 'admin'
export function update({ id, vm, ...props }) {
return this.updateProxy(id, {
vmUuid: vm?.uuid,
...props,
})
}
update.permission = 'admin'
update.params = {
id: {
type: 'string',
},
address: {
type: ['string', 'null'],
optional: true,
},
vm: {
type: ['string', 'null'],
optional: true,
},
name: {
type: 'string',
optional: true,
},
authenticationToken: {
type: 'string',
optional: true,
},
}
update.resolve = {
vm: ['vm', 'VM', 'administrate'],
}
export function deploy({ sr }) {
return this.deployProxy(sr._xapiId)
}
deploy.permission = 'admin'
deploy.params = {
sr: {
type: 'string',
},
}
deploy.resolve = {
sr: ['sr', 'SR', 'administrate'],
}
export function upgradeAppliance({ id }) {
return this.upgradeProxyAppliance(id)
}
upgradeAppliance.permission = 'admin'
upgradeAppliance.params = {
id: {
type: 'string',
},
}
export function checkHealth({ id }) {
return this.checkProxyHealth(id)
}
checkHealth.permission = 'admin'
checkHealth.params = {
id: {
type: 'string',
},
}

View File

@@ -42,30 +42,32 @@ list.params = {
id: { type: 'string' },
}
export async function create({ name, url, options }) {
return this.createRemote({ name, url, options })
export function create(props) {
return this.createRemote(props)
}
create.permission = 'admin'
create.description = 'Creates a new fs remote point'
create.params = {
name: { type: 'string' },
url: { type: 'string' },
options: { type: 'string', optional: true },
proxy: { type: 'string', optional: true },
url: { type: 'string' },
}
export async function set({ id, name, url, options, enabled }) {
await this.updateRemote(id, { name, url, options, enabled })
export async function set({ id, ...props }) {
await this.updateRemote(id, props)
}
set.permission = 'admin'
set.description = 'Modifies an existing fs remote point'
set.params = {
enabled: { type: 'boolean', optional: true },
id: { type: 'string' },
name: { type: 'string', optional: true },
url: { type: 'string', optional: true },
options: { type: ['string', 'null'], optional: true },
enabled: { type: 'boolean', optional: true },
proxy: { type: ['string', 'null'], optional: true },
url: { type: 'string', optional: true },
}
async function delete_({ id }) {

View File

@@ -224,7 +224,7 @@ export default class Xapi extends XapiBase {
// Wait for an object to be in a given state.
//
// Faster than _waitObject() with a function.
_waitObjectState(idOrUuidOrRef, predicate) {
async _waitObjectState(idOrUuidOrRef, predicate) {
const object = this.getObject(idOrUuidOrRef, null)
if (object && predicate(object)) {
return object

View File

@@ -91,6 +91,7 @@ export type BackupJob = {|
...$Exact<Job>,
compression?: 'native' | 'zstd' | '',
mode: Mode,
proxy?: string,
remotes?: SimpleIdPattern,
settings: $Dict<Settings>,
srs?: SimpleIdPattern,
@@ -609,7 +610,63 @@ export default class BackupNg {
}
}
const jobId = job.id
const srs = unboxIdsFromPattern(job.srs).map(id => {
const remoteIds = unboxIdsFromPattern(job.remotes)
const srIds = unboxIdsFromPattern(job.srs)
if (job.proxy !== undefined) {
const vmIds = Object.keys(vms)
const recordToXapi = {}
const servers = new Set()
const handleRecord = uuid => {
const serverId = app.getXenServerIdByObject(uuid)
recordToXapi[uuid] = serverId
servers.add(serverId)
}
vmIds.forEach(handleRecord)
srIds.forEach(handleRecord)
const remotes = {}
const xapis = {}
await waitAll([
asyncMap(remoteIds, async id => {
remotes[id] = await app.getRemoteWithCredentials(id)
}),
asyncMap([...servers], async id => {
const {
allowUnauthorized,
host,
password,
username,
} = await app.getXenServer(id)
xapis[id] = {
allowUnauthorized,
credentials: {
username,
password,
},
url: host,
}
}),
])
return app.callProxyMethod(job.proxy, 'backup.run', {
job: {
...job,
// Make sure we are passing only the VM to run which can be
// different than the VMs in the job itself.
vms: { __or: vmIds },
},
recordToXapi,
remotes,
schedule,
xapis,
})
}
const srs = srIds.map(id => {
const xapi = app.getXapi(id)
return {
__proto__: xapi.getObject(id),
@@ -617,7 +674,7 @@ export default class BackupNg {
}
})
const remotes = await Promise.all(
unboxIdsFromPattern(job.remotes).map(async id => ({
remoteIds.map(async id => ({
id,
handler: await app.getRemoteHandler(id),
}))

View File

@@ -0,0 +1,275 @@
import cookie from 'cookie'
import defer from 'golike-defer'
import parseSetCookie from 'set-cookie-parser'
import pumpify from 'pumpify'
import split2 from 'split2'
import synchronized from 'decorator-synchronized'
import { format, parse } from 'json-rpc-peer'
import { noSuchObject } from 'xo-common/api-errors'
import { NULL_REF } from 'xen-api'
import { omit } from 'lodash'
import { timeout } from 'promise-toolbox'
import Collection from '../collection/redis'
import parseDuration from '../_parseDuration'
import patch from '../patch'
import readChunk from '../_readStreamChunk'
import { generateToken } from '../utils'
const extractProperties = _ => _.properties
const omitToken = proxy => omit(proxy, 'authenticationToken')
const synchronizedWrite = synchronized()
export default class Proxy {
constructor(app, conf) {
this._app = app
this._xoProxyConf = conf['xo-proxy']
const db = (this._db = new Collection({
connection: app._redis,
indexes: ['address', 'vmUuid'],
prefix: 'xo:proxy',
}))
app.on('clean', () => db.rebuildIndexes())
app.on('start', () =>
app.addConfigManager(
'proxies',
() => db.get(),
proxies => db.update(proxies)
)
)
}
async _throwIfRegistered(address, vmUuid) {
if (address != null && (await this._db.exists({ address }))) {
throw new Error(
`A proxy with the address (${address}) is already registered`
)
}
if (vmUuid != null && (await this._db.exists({ vmUuid }))) {
throw new Error(`A proxy with the vm (${vmUuid}) is already registered`)
}
}
@synchronizedWrite
async registerProxy({
address,
authenticationToken,
name = `Proxy ${new Date().toISOString()}`,
vmUuid,
}) {
await this._throwIfRegistered(address, vmUuid)
return this._db
.add({
address,
authenticationToken,
name,
vmUuid,
})
.then(extractProperties)
}
unregisterProxy(id) {
return this._db.remove(id)
}
async destroyProxy(id) {
const { vmUuid } = await this._getProxy(id)
if (vmUuid !== undefined) {
await this._app.getXapi(vmUuid).deleteVm(vmUuid)
}
return this.unregisterProxy(id)
}
async _getProxy(id) {
const proxy = await this._db.first(id)
if (proxy === undefined) {
throw noSuchObject(id, 'proxy')
}
return extractProperties(proxy)
}
getProxy(id) {
return this._getProxy(id).then(omitToken)
}
getAllProxies() {
return this._db.get().then(proxies => proxies.map(omitToken))
}
@synchronizedWrite
async updateProxy(id, { address, authenticationToken, name, vmUuid }) {
// TODO: don't throw if these properties aren't modified
await this._throwIfRegistered(address, vmUuid)
const proxy = await this._getProxy(id)
patch(proxy, { address, authenticationToken, name, vmUuid })
return this._db
.update(proxy)
.then(extractProperties)
.then(omitToken)
}
async upgradeProxyAppliance(id) {
const { vmUuid } = await this._getProxy(id)
const xapi = this._app.getXapi(vmUuid)
await xapi.getObject(vmUuid).update_xenstore_data({
'vm-data/xoa-updater-channel': JSON.stringify(this._xoProxyConf.channel),
})
return xapi.rebootVm(vmUuid)
}
@defer
async deployProxy($defer, srId) {
const app = this._app
const xoProxyConf = this._xoProxyConf
const namespace = xoProxyConf.namespace
const {
[namespace]: { xva },
} = await app.getResourceCatalog()
const xapi = app.getXapi(srId)
let vm = await xapi.importVm(
await app.requestResource({
id: xva.id,
namespace,
version: xva.version,
}),
{ srId }
)
$defer.onFailure.call(xapi, '_deleteVm', vm)
const [
password,
proxyAuthenticationToken,
{ registrationToken, registrationEmail: email },
] = await Promise.all([
generateToken(10),
generateToken(),
app.getApplianceRegistration(),
])
const date = new Date().toISOString()
await Promise.all([
vm.add_tags('XOA Proxy'),
vm.set_name_label(`XOA Proxy ${date}`),
vm.update_xenstore_data({
'vm-data/system-account-xoa-password': password,
'vm-data/xo-proxy-authenticationToken': JSON.stringify(
proxyAuthenticationToken
),
'vm-data/xoa-updater-credentials': JSON.stringify({
email,
registrationToken,
}),
'vm-data/xoa-updater-channel': JSON.stringify(xoProxyConf.channel),
}),
])
await xapi.startVm(vm.$id)
await vm.update_xenstore_data({
'vm-data/system-account-xoa-password': null,
'vm-data/xo-proxy-authenticationToken': null,
'vm-data/xoa-updater-credentials': null,
})
// ensure appliance has an IP address
const vmNetworksTimeout = parseDuration(xoProxyConf.vmNetworksTimeout)
vm = await timeout.call(
xapi._waitObjectState(vm.$id, _ => _.guest_metrics !== NULL_REF),
vmNetworksTimeout
)
await timeout.call(
xapi._waitObjectState(
vm.guest_metrics,
guest_metrics => guest_metrics.networks['0/ip'] !== undefined
),
vmNetworksTimeout
)
// wait for the appliance to be upgraded
const xoaUpgradeTimeout = parseDuration(xoProxyConf.xoaUpgradeTimeout)
await timeout.call(
xapi._waitObjectState(
vm.$id,
({ xenstore_data }) =>
xenstore_data['vm-data/xoa-updater-channel'] === undefined
),
xoaUpgradeTimeout
)
const { id } = await this.registerProxy({
authenticationToken: proxyAuthenticationToken,
name: `Proxy ${date}`,
vmUuid: vm.uuid,
})
await this.checkProxyHealth(id)
}
checkProxyHealth(id) {
return this.callProxyMethod(id, 'system.getServerVersion')
}
async callProxyMethod(id, method, params, expectStream = false) {
const proxy = await this._getProxy(id)
if (proxy.address === undefined) {
if (proxy.vmUuid === undefined) {
throw new Error(
'proxy VM and proxy address should not be both undefined'
)
}
const vm = this._app.getXapi(proxy.vmUuid).getObjectByUuid(proxy.vmUuid)
if ((proxy.address = vm.$guest_metrics?.networks['0/ip']) === undefined) {
throw new Error(`cannot get the proxy VM IP (${proxy.vmUuid})`)
}
}
const response = await this._app.httpRequest({
body: format.request(0, method, params),
headers: {
'Content-Type': 'application/json',
Cookie: cookie.serialize(
'authenticationToken',
proxy.authenticationToken
),
},
host: proxy.address,
method: 'POST',
pathname: '/api/v1',
protocol: 'https:',
rejectUnauthorized: false,
timeout: parseDuration(this._xoProxyConf.callTimeout),
})
const authenticationToken = parseSetCookie(response, {
map: true,
}).authenticationToken?.value
if (authenticationToken !== undefined) {
await this.updateProxy(id, { authenticationToken })
}
const lines = pumpify(response, split2())
const firstLine = await readChunk(lines)
const { result, error } = parse(String(firstLine))
if (error !== undefined) {
throw error
}
const isStream = result.$responseType === 'ndjson'
if (isStream !== expectStream) {
throw new Error(
`expect the result ${expectStream ? '' : 'not'} to be a stream`
)
}
if (isStream) {
return lines
}
lines.destroy()
return result
}
}

View File

@@ -27,6 +27,7 @@ export default class {
indexes: ['enabled'],
})
this._remotesInfo = {}
this._xo = xo
xo.on('clean', () => this._remotes.rebuildIndexes())
xo.on('start', async () => {
@@ -84,8 +85,13 @@ export default class {
}
async testRemote(remoteId) {
const handler = await this.getRemoteHandler(remoteId)
const { readRate, writeRate, ...answer } = await handler.test()
const remote = await this._getRemote(remoteId)
const { readRate, writeRate, ...answer } =
remote.proxy !== undefined
? await this._xo.callProxyMethod(remote.proxy, 'remote.test', {
remote,
})
: await this.getRemoteHandler(remoteId).then(handler => handler.test())
if (answer.success) {
const benchmark = {
@@ -93,8 +99,6 @@ export default class {
timestamp: Date.now(),
writeRate,
}
const remote = await this._getRemote(remoteId)
await this._updateRemote(remoteId, {
benchmarks:
remote.benchmarks !== undefined
@@ -107,20 +111,27 @@ export default class {
}
async getAllRemotesInfo() {
const remotes = await this._remotes.get()
const remotesInfo = this._remotesInfo
await asyncMap(this._remotes.get(), async remote => {
const promise =
remote.proxy !== undefined
? this._xo.callProxyMethod(remote.proxy, 'remote.getInfo', {
remote,
})
: await this.getRemoteHandler(remote.id).then(handler =>
handler.getInfo()
)
await asyncMap(remotes, async remote => {
try {
const handler = await this.getRemoteHandler(remote.id)
await timeout.call(
handler.getInfo().then(info => {
this._remotesInfo[remote.id] = info
promise.then(info => {
remotesInfo[remote.id] = info
}),
5e3
)
} catch (_) {}
})
return this._remotesInfo
return remotesInfo
}
async getAllRemotes() {
@@ -135,16 +146,21 @@ export default class {
return remote.properties
}
getRemoteWithCredentials(id) {
return this._getRemote(id)
}
getRemote(id) {
return this._getRemote(id).then(obfuscateRemote)
}
async createRemote({ name, url, options }) {
async createRemote({ name, options, proxy, url }) {
const params = {
name,
url,
enabled: false,
error: '',
name,
proxy,
url,
}
if (options !== undefined) {
params.options = options
@@ -153,7 +169,7 @@ export default class {
return /* await */ this.updateRemote(remote.get('id'), { enabled: true })
}
updateRemote(id, { name, url, options, enabled }) {
updateRemote(id, { enabled, name, options, proxy, url }) {
const handlers = this._handlers
const handler = handlers[id]
if (handler !== undefined) {
@@ -162,10 +178,11 @@ export default class {
}
return this._updateRemote(id, {
name,
url,
options,
enabled,
name,
options,
proxy,
url,
})
}

View File

@@ -137,7 +137,7 @@ export default class {
username,
}
) {
const server = await this._getXenServer(id)
const server = await this.getXenServer(id)
const xapi = this._xapis[id]
const requireDisconnected =
allowUnauthorized !== undefined ||
@@ -184,7 +184,7 @@ export default class {
// TODO: this method will no longer be async when servers are
// integrated to the main collection.
async _getXenServer(id) {
async getXenServer(id) {
const server = await this._servers.first(id)
if (server === undefined) {
throw noSuchObject(id, 'xenServer')
@@ -193,6 +193,22 @@ export default class {
return server
}
getXenServerIdByObject(object, type) {
if (typeof object === 'string') {
object = this._xo.getObject(object, type)
}
const { $pool: poolId } = object
if (!poolId) {
throw new Error(`object ${object.id} does not belong to a pool`)
}
const serverId = this._serverIdsByPool[poolId]
if (serverId === undefined) {
throw new Error(`no connection found for object ${object.id}`)
}
return serverId
}
_onXenAdd(
newXapiObjects,
xapiIdsToXo,
@@ -285,7 +301,7 @@ export default class {
}
async connectXenServer(id) {
const server = (await this._getXenServer(id)).properties
const server = (await this.getXenServer(id)).properties
if (this._getXenServerStatus(id) !== 'disconnected') {
throw new Error('the server is already connected')
@@ -452,21 +468,7 @@ export default class {
// Returns the XAPI connection associated to an object.
getXapi(object, type) {
if (typeof object === 'string') {
object = this._xo.getObject(object, type)
}
const { $pool: poolId } = object
if (!poolId) {
throw new Error(`object ${object.id} does not belong to a pool`)
}
const xapi = this._xapis[this._serverIdsByPool[poolId]]
if (xapi === undefined) {
throw new Error(`no connection found for object ${object.id}`)
}
return xapi
return this._xapis[this.getXenServerIdByObject(object)]
}
// returns the XAPI object corresponding to an XO object
@@ -550,7 +552,7 @@ export default class {
await xapi.ejectHostFromPool(hostId)
this._getXenServer(this._serverIdsByPool[poolId])
this.getXenServer(this._serverIdsByPool[poolId])
.then(async ({ properties }) => {
const { id } = await this.registerXenServer({
...properties,

View File

@@ -11375,6 +11375,15 @@ pumpify@^1.3.5:
inherits "^2.0.3"
pump "^2.0.0"
pumpify@^2.0.0:
version "2.0.1"
resolved "https://registry.yarnpkg.com/pumpify/-/pumpify-2.0.1.tgz#abfc7b5a621307c728b551decbbefb51f0e4aa1e"
integrity sha512-m7KOje7jZxrmutanlkS1daj1dS6z6BgslzOXmcSEpIlCxM3VJH7lG5QLeck/6hgF6F4crFf01UtQmNsJfweTAw==
dependencies:
duplexify "^4.1.1"
inherits "^2.0.3"
pump "^3.0.0"
punycode@1.3.2:
version "1.3.2"
resolved "https://registry.yarnpkg.com/punycode/-/punycode-1.3.2.tgz#9653a036fb7c1ee42342f2325cceefea3926c48d"
@@ -11935,6 +11944,15 @@ readable-stream@2, readable-stream@^2.0.0, readable-stream@^2.0.1, readable-stre
string_decoder "^1.1.1"
util-deprecate "^1.0.1"
readable-stream@^3.0.0:
version "3.5.0"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.5.0.tgz#465d70e6d1087f6162d079cd0b5db7fbebfd1606"
integrity sha512-gSz026xs2LfxBPudDuI41V1lka8cxg64E66SGe78zJlsUofOg/yqwezdIcdfwik6B4h8LFmWPA9ef9X3FiNFLA==
dependencies:
inherits "^2.0.3"
string_decoder "^1.1.1"
util-deprecate "^1.0.1"
readable-stream@~1.0.0, readable-stream@~1.0.17, readable-stream@~1.0.31:
version "1.0.34"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-1.0.34.tgz#125820e34bc842d2f2aaafafe4c2916ee32c157c"
@@ -12583,6 +12601,11 @@ set-blocking@^2.0.0, set-blocking@~2.0.0:
resolved "https://registry.yarnpkg.com/set-blocking/-/set-blocking-2.0.0.tgz#045f9782d011ae9a6803ddd382b24392b3d890f7"
integrity sha1-BF+XgtARrppoA93TgrJDkrPYkPc=
set-cookie-parser@^2.3.5:
version "2.4.3"
resolved "https://registry.yarnpkg.com/set-cookie-parser/-/set-cookie-parser-2.4.3.tgz#9c917e75698a5633511c3c6a3435f334faabc240"
integrity sha512-+Eovq+TUyhqwUe+Ac9EaPlfEZOcQyy7uUPhcbEXEIsH73x/gOU56RO8wZDZW98fu3vSxhcPjuKDo1mIrmM7ixw==
set-value@^2.0.0, set-value@^2.0.1:
version "2.0.1"
resolved "https://registry.yarnpkg.com/set-value/-/set-value-2.0.1.tgz#a18d40530e6f07de4228c7defe4227af8cad005b"
@@ -12902,6 +12925,13 @@ split2@^2.1.0:
dependencies:
through2 "^2.0.2"
split2@^3.1.1:
version "3.1.1"
resolved "https://registry.yarnpkg.com/split2/-/split2-3.1.1.tgz#c51f18f3e06a8c4469aaab487687d8d956160bb6"
integrity sha512-emNzr1s7ruq4N+1993yht631/JH+jaj0NYBosuKmLcq+JkGQ9MmTw1RB1fGaTCzUuseRIClrlSLHRNYGwWQ58Q==
dependencies:
readable-stream "^3.0.0"
split@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/split/-/split-1.0.1.tgz#605bd9be303aa59fb35f9229fbea0ddec9ea07d9"