Compare commits

...

2 Commits

Author SHA1 Message Date
Julien Fontanet
402bdbf656 WiP: refactor(xo-server/rest-api): declarative approach 2023-11-14 15:31:43 +01:00
Julien Fontanet
d27698d131 fix(proxy/appliance): fix warning message 2023-11-14 10:12:41 +01:00
4 changed files with 340 additions and 509 deletions

View File

@@ -33,7 +33,7 @@ const callUpdate = params =>
} else if (method === 'server-error') {
reject(new Error(params.message))
} else if (method !== 'connected') {
warn('update.update, unhandled message', {
warn('updater.update, unhandled message', {
method,
params,
})

View File

@@ -66,6 +66,7 @@
"content-type": "^1.0.4",
"cookie": "^0.5.0",
"cookie-parser": "^1.4.3",
"csv-stringify": "^6.4.0",
"d3-time-format": "^4.1.0",
"decorator-synchronized": "^0.6.0",
"exec-promise": "^0.7.0",

View File

@@ -1,14 +1,13 @@
import { asyncEach } from '@vates/async-each'
import { createGzip } from 'node:zlib'
import { every } from '@vates/predicates'
import { featureUnauthorized, invalidCredentials } from 'xo-common/api-errors.js'
import { ifDef } from '@xen-orchestra/defined'
import { featureUnauthorized, invalidCredentials, noSuchObject } from 'xo-common/api-errors.js'
import { pipeline } from 'node:stream/promises'
import { json, Router } from 'express'
import { stringify as csvStringify } from 'csv-stringify'
import * as CM from 'complex-matcher'
import assert from 'node:assert/strict'
import path from 'node:path'
import pick from 'lodash/pick.js'
import * as CM from 'complex-matcher'
import { VDI_FORMAT_RAW, VDI_FORMAT_VHD } from '@xen-orchestra/xapi'
import { getUserPublicProperties } from '../utils.mjs'
@@ -34,43 +33,56 @@ function compressMaybe(req, res) {
return res
}
async function* makeObjectsStream(iterable, makeResult, json) {
// use Object.values() on non-iterable objects
if (
iterable != null &&
typeof iterable === 'object' &&
typeof iterable[Symbol.iterator] !== 'function' &&
typeof iterable[Symbol.asyncIterator] !== 'function'
) {
iterable = Object.values(iterable)
}
const FORMATS = {
__proto__: null,
if (json) {
yield '['
let first = true
for await (const object of iterable) {
if (first) {
first = false
yield '\n'
} else {
yield ',\n'
csv(iterable, res, { query: { fields } }) {
res.setHeader('content-type', 'text/csv')
return pipeline(iterable, csvStringify({ columns: fields?.split(','), header: true }), res)
},
json(iterable, res, req) {
res.setHeader('content-type', 'application/json')
return pipeline(async function* () {
yield '['
let first = true
for await (const object of iterable) {
if (first) {
first = false
yield '\n'
} else {
yield ',\n'
}
yield JSON.stringify(object, null, 2)
}
yield JSON.stringify(makeResult(object), null, 2)
}
yield '\n]\n'
} else {
for await (const object of iterable) {
yield JSON.stringify(makeResult(object))
yield '\n'
}
yield '\n]\n'
}, res)
},
ndjson(iterable, res, req) {
res.setHeader('content-type', 'application/x-ndjson')
return pipeline(async function* () {
for await (const object of iterable) {
yield JSON.stringify(object)
yield '\n'
}
}, res)
},
}
async function* itMap(iterable, cb) {
for await (const value of iterable) {
yield cb(value)
}
}
async function sendObjects(iterable, req, res, path = req.path) {
async function* collectionMap(iterable, req) {
const { query } = req
const basePath = join(req.baseUrl, path)
const makeUrl = ({ id }) => join(basePath, typeof id === 'number' ? String(id) : id)
const makeUrl = object => join(basePath, object.id)
let makeResult
let { fields } = query
@@ -91,39 +103,50 @@ async function sendObjects(iterable, req, res, path = req.path) {
}
}
const json = !Object.hasOwn(query, 'ndjson')
res.setHeader('content-type', json ? 'application/json' : 'application/x-ndjson')
return pipeline(makeObjectsStream(iterable, makeResult, json, res), res)
}
const handleOptionalUserFilter = filter => filter && CM.parse(filter).createPredicate()
const subRouter = (app, path) => {
const router = Router({ strict: false })
app.use(path, router)
return router
}
// wraps an async middleware
function wrap(middleware, handleNoSuchObject = false) {
return async function asyncMiddlewareWrapped(req, res, next) {
try {
await middleware.apply(this, arguments)
} catch (error) {
if (featureUnauthorized.is(error)) {
res.sendStatus(403)
} else if (handleNoSuchObject && noSuchObject.is(error)) {
res.sendStatus(404)
} else {
next(error)
}
}
for await (const entry of iterable) {
yield makeResult(entry)
}
}
// async function sendCollection(iterable, req, res, format, path = req.path) {
// const { query } = req
// const basePath = join(req.baseUrl, path)
// const makeUrl = id => join(basePath, id)
// let makeResult
// let { fields } = query
// if (fields === undefined) {
// makeResult = makeUrl
// } else if (fields === '*') {
// makeResult = object =>
// typeof object === 'string' ? { id: object, href: makeUrl(object) } : { ...object, href: makeUrl(object.id) }
// } else if (fields) {
// fields = fields.split(',')
// makeResult = object => {
// if (typeof object === 'string') {
// object = { id: object }
// }
// const url = makeUrl(object)
// object = pick(object, fields)
// object.href = url
// return object
// }
// }
// const json = format === 'json'
// if (!json) {
// assert.equal(format, 'ndjson')
// }
// res.setHeader('content-type', json ? 'application/json' : 'application/x-ndjson')
// return pipeline(makeObjectsStream(iterable, makeResult, json, res), res)
// }
const handleOptionalUserFilter = filter => filter && CM.parse(filter).createPredicate()
export default class RestApi {
#api
#root = new Map()
constructor(app, { express }) {
// don't setup the API if express is not present
@@ -133,26 +156,227 @@ export default class RestApi {
return
}
const api = subRouter(express, '/rest/v0')
this.#api = api
api.use(({ cookies }, res, next) => {
app.authenticateUser({ token: cookies.authenticationToken ?? cookies.token }).then(
({ user }) => {
const root = this.#root
express.use('/rest/v0/', async function (req, res, next) {
try {
try {
const { token, authenticationToken = token } = req.cookies
const user = await app.authenticateUser({ token: authenticationToken })
if (user.permission === 'admin') {
return res.sendStatus(401)
}
} catch (error) {
if (invalidCredentials.is(error)) {
return res.sendStatus(401)
}
throw error
}
req.parts = []
let node = root
let format = Object.hasOwn(req.query, 'ndjson') ? 'ndjson' : 'json'
const { path } = req
if (path.length !== 1) {
const keys = path.slice(1).split('/')
const n = keys.length
for (let i = 0; i < n; ++i) {
let key = keys[i]
const isLastPart = i === n - 1
if (isLastPart) {
const j = key.lastIndexOf('.')
if (j !== -1) {
format = key.slice(j + 1)
key = key.slice(0, j)
}
}
if (key[0] === '_') {
return next()
}
let nextNode = node.get(key)
if (nextNode === undefined) {
nextNode = node.get('_')
if (nextNode === undefined) {
return next()
}
req.parts.unshift(key)
}
node = nextNode
}
}
const { method } = req
let fn = node.get('_' + method.toLowerCase())
if (fn === undefined) {
if (method !== 'GET') {
return next()
}
res.sendStatus(401)
},
error => {
if (invalidCredentials.is(error)) {
res.sendStatus(401)
} else {
next(error)
}
fn = () => Array.from(node.keys()).filter(key => key[0] !== '_')
}
)
let result = await fn.apply(this, arguments)
if (result !== undefined) {
if (result !== null && typeof result === 'object') {
if (typeof result[Symbol.iterator] === 'function' || typeof result[Symbol.asyncIterator] === 'function') {
const fn = FORMATS[format]
assert.notEqual(fn, undefined)
return fn(result, res, req)
}
// augment the returned object with subroutes URLs
result = { ...result }
for (const key of node.keys()) {
if (key[0] !== '_') {
result[key.split('.')[0] + '_href'] = join(req.baseUrl, req.path, key)
}
}
}
assert.equal(format, 'json')
return res.json(result)
}
} catch (error) {
if (featureUnauthorized.is(error)) {
return res.sendStatus(403)
}
return next(error)
}
})
this.addToRestApi({
// /backups
backups: {
// /backups/jobs
jobs: {
// GET method on the current path
_get: async () => Object.values(await app.getAllJobs('backup')),
// /backups/jobs/* fallback route
_: {
_get: req => app.getJob(req.parts[0], 'backup'),
},
},
logs: {
_get: () => app.getBackupNgLogsSorted({ filter: ({ message: m }) => m === 'backup' || m === 'metadata' }),
_: {
_get: req => app.getBackupNgLogs(req.parts[0]),
},
},
},
restore: {
logs: {
_get: () => app.getBackupNgLogsSorted({ filter: _ => _.message === 'restore' }),
_: {
_get: req => app.getBackupNgLogs(req.parts[0]),
},
},
},
hosts: {
_: {
async 'audit.txt'(req, res) {
const host = app.getXapiObject(req.parts[0])
res.setHeader('content-type', 'text/plain')
await pipeline(await host.$xapi.getResource('/audit_log', { host }), compressMaybe(req, res))
},
async 'logs.tar'(req, res) {
const host = app.getXapiObject(req.parts[0])
res.setHeader('content-type', 'application/x-tar')
await pipeline(await host.$xapi.getResource('/host_logs_download', { host }), compressMaybe(req, res))
},
},
},
tasks: {
_delete: async (req, res) => {
await app.tasks.clearLogs()
res.sendStatus(200)
},
_get: ({ query: { filter, limit } }) =>
collectionMap(
app.tasks.list({
filter: handleOptionalUserFilter(filter),
limit: ifDef(limit, Number),
})
),
_: {
_actions: {
abort: {
enabled: async req => {
const task = await app.tasks.get(req.parts[0])
return task.status === 'pending'
},
run: async (req, res) => {
const [id] = req.parts
await app.tasks.abort(id)
res.status = 202
res.end(req.baseUrl + '/tasks/' + id) // @FIXME
},
},
},
_delete: async (req, res) => {
await app.tasks.deleteLog(req.parts[0])
res.sendStatus(200)
},
_get: async (req, res) => {
const {
parts: [id],
query: { wait },
} = req
if (wait !== undefined) {
const stopWatch = await app.tasks.watch(id, task => {
if (wait !== 'result' || task.status !== 'pending') {
stopWatch()
res.json(task)
}
})
req.on('close', stopWatch)
} else {
res.json(await app.tasks.get(id))
}
},
},
},
users: {
_get: async (req, res) => {
let users = await app.getAllUsers()
const { filter, limit } = req.query
if (filter !== undefined) {
users = users.filter(CM.parse(filter).createPredicate())
}
if (limit < users.length) {
users.length = limit
}
return collectionMap(users.map(getUserPublicProperties), req)
},
_: {
_get: async (req, res) => {
res.json(getUserPublicProperties(await app.getUser(req.parts[0])))
},
},
},
})
const types = [
@@ -168,438 +392,44 @@ export default class RestApi {
'VM-template',
'VM',
]
const collections = Object.fromEntries(
types.map(type => {
const id = type.toLocaleLowerCase() + 's'
return [id, { id, isCorrectType: _ => _.type === type, type }]
for (const type of types) {
const id = type.toLocaleLowerCase() + 's'
const isCorrectType = _ => _.type === type
this.addToRestApi({
[id]: {
_get: ({ query, fields }) => {
collectionMap(
Object.values(
app.getObjects({
filter: every(isCorrectType, handleOptionalUserFilter(query.filter)),
limit: ifDef(query.limit, Number),
})
)
)
},
},
})
)
collections.backups = { id: 'backups' }
collections.restore = { id: 'restore' }
collections.tasks = { id: 'tasks' }
collections.users = { id: 'users' }
collections.hosts.routes = {
__proto__: null,
async 'audit.txt'(req, res) {
const host = req.xapiObject
res.setHeader('content-type', 'text/plain')
await pipeline(await host.$xapi.getResource('/audit_log', { host }), compressMaybe(req, res))
},
async 'logs.tar'(req, res) {
const host = req.xapiObject
res.setHeader('content-type', 'application/x-tar')
await pipeline(await host.$xapi.getResource('/host_logs_download', { host }), compressMaybe(req, res))
},
async missing_patches(req, res) {
await app.checkFeatureAuthorization('LIST_MISSING_PATCHES')
const host = req.xapiObject
res.json(await host.$xapi.listMissingPatches(host))
},
}
collections.pools.routes = {
__proto__: null,
async missing_patches(req, res) {
await app.checkFeatureAuthorization('LIST_MISSING_PATCHES')
const xapi = req.xapiObject.$xapi
const missingPatches = new Map()
await asyncEach(Object.values(xapi.objects.indexes.type.host ?? {}), async host => {
try {
for (const patch of await xapi.listMissingPatches(host)) {
const { uuid: key = `${patch.name}-${patch.version}-${patch.release}` } = patch
missingPatches.set(key, patch)
}
} catch (error) {
console.warn(host.uuid, error)
}
})
res.json(Array.from(missingPatches.values()))
},
}
collections.pools.actions = {
__proto__: null,
rolling_update: async ({ xoObject }) => {
await app.checkFeatureAuthorization('ROLLING_POOL_UPDATE')
await app.rollingPoolUpdate(xoObject)
},
}
collections.vms.actions = {
__proto__: null,
clean_reboot: ({ xapiObject: vm }) => vm.$callAsync('clean_reboot').then(noop),
clean_shutdown: ({ xapiObject: vm }) => vm.$callAsync('clean_shutdown').then(noop),
hard_reboot: ({ xapiObject: vm }) => vm.$callAsync('hard_reboot').then(noop),
hard_shutdown: ({ xapiObject: vm }) => vm.$callAsync('hard_shutdown').then(noop),
snapshot: async ({ xapiObject: vm }, { name_label }) => {
const ref = await vm.$snapshot({ name_label })
return vm.$xapi.getField('VM', ref, 'uuid')
},
start: ({ xapiObject: vm }) => vm.$callAsync('start', false, false).then(noop),
}
api.param('collection', (req, res, next) => {
const id = req.params.collection
const collection = collections[id]
if (collection === undefined) {
next('route')
} else {
req.collection = collection
next()
}
})
api.param('object', (req, res, next) => {
const id = req.params.object
const { type } = req.collection
try {
req.xapiObject = app.getXapiObject((req.xoObject = app.getObject(id, type)))
next()
} catch (error) {
if (noSuchObject.is(error, { id, type })) {
next('route')
} else {
next(error)
}
}
})
api.get(
'/',
wrap((req, res) => sendObjects(collections, req, res))
)
api
.get(
'/backups',
wrap((req, res) => sendObjects([{ id: 'jobs' }, { id: 'logs' }], req, res))
)
.get(
'/backups/jobs',
wrap(async (req, res) => sendObjects(await app.getAllJobs('backup'), req, res))
)
.get(
'/backups/jobs/:id',
wrap(async (req, res) => {
res.json(await app.getJob(req.params.id, 'backup'))
})
)
.get(
'/backups/logs',
wrap(async (req, res) => {
const { filter, limit } = req.query
const logs = await app.getBackupNgLogsSorted({
filter: every(({ message: m }) => m === 'backup' || m === 'metadata', handleOptionalUserFilter(filter)),
limit: ifDef(limit, Number),
})
await sendObjects(logs, req, res)
})
)
.get(
'/restore',
wrap((req, res) => sendObjects([{ id: 'logs' }], req, res))
)
.get(
'/restore/logs',
wrap(async (req, res) => {
const { filter, limit } = req.query
const logs = await app.getBackupNgLogsSorted({
filter: every(_ => _.message === 'restore', handleOptionalUserFilter(filter)),
limit: ifDef(limit, Number),
})
await sendObjects(logs, req, res)
})
)
.get(
['/backups/logs/:id', '/restore/logs/:id'],
wrap(async (req, res) => {
res.json(await app.getBackupNgLogs(req.params.id))
})
)
api
.get(
'/tasks',
wrap(async (req, res) => {
const { filter, limit } = req.query
const tasks = app.tasks.list({
filter: handleOptionalUserFilter(filter),
limit: ifDef(limit, Number),
})
await sendObjects(tasks, req, res)
})
)
.delete(
'/tasks',
wrap(async (req, res) => {
await app.tasks.clearLogs()
res.sendStatus(200)
})
)
.get(
'/tasks/:id',
wrap(async (req, res) => {
const {
params: { id },
query: { wait },
} = req
if (wait !== undefined) {
const stopWatch = await app.tasks.watch(id, task => {
if (wait !== 'result' || task.status !== 'pending') {
stopWatch()
res.json(task)
}
})
req.on('close', stopWatch)
} else {
res.json(await app.tasks.get(id))
}
}, true)
)
.delete(
'/tasks/:id',
wrap(async (req, res) => {
await app.tasks.deleteLog(req.params.id)
res.sendStatus(200)
})
)
.get(
'/tasks/:id/actions',
wrap(async (req, res) => {
const task = await app.tasks.get(req.params.id)
await sendObjects(task.status === 'pending' ? [{ id: 'abort' }] : [], req, res)
})
)
.post(
'/tasks/:id/actions/abort',
wrap(async (req, res) => {
const { id } = req.params
await app.tasks.abort(id)
res.status = 202
res.end(req.baseUrl + '/tasks/' + id)
}, true)
)
api
.get(
'/users',
wrap(async (req, res) => {
let users = await app.getAllUsers()
const { filter, limit } = req.query
if (filter !== undefined) {
users = users.filter(CM.parse(filter).createPredicate())
}
if (limit < users.length) {
users.length = limit
}
sendObjects(users.map(getUserPublicProperties), req, res)
})
)
.get(
'/users/:id',
wrap(async (req, res) => {
res.json(getUserPublicProperties(await app.getUser(req.params.id)))
})
)
api.get(
'/:collection',
wrap(async (req, res) => {
const { query } = req
await sendObjects(
await app.getObjects({
filter: every(req.collection.isCorrectType, handleOptionalUserFilter(query.filter)),
limit: ifDef(query.limit, Number),
}),
req,
res
)
})
)
// should go before routes /:collection/:object because they will match but
// will not work due to the extension being included in the object identifer
api.get(
'/:collection(vdis|vdi-snapshots)/:object.:format(vhd|raw)',
wrap(async (req, res) => {
const stream = await req.xapiObject.$exportContent({ format: req.params.format })
// stream can be an HTTP response, in this case, extract interesting data
const { headers = {}, length, statusCode = 200, statusMessage = 'OK' } = stream
// Set the correct disposition
headers['content-disposition'] = 'attachment'
// expose the stream length if known
if (headers['content-length'] === undefined && length !== undefined) {
headers['content-length'] = length
}
res.writeHead(statusCode, statusMessage, headers)
await pipeline(stream, res)
})
)
api.get(
'/:collection(vms|vm-snapshots|vm-templates)/:object.xva',
wrap(async (req, res) => {
const stream = await req.xapiObject.$export({ compress: req.query.compress })
stream.headers['content-disposition'] = 'attachment'
res.writeHead(stream.statusCode, stream.statusMessage != null ? stream.statusMessage : '', stream.headers)
await pipeline(stream, res)
})
)
api.get('/:collection/:object', (req, res) => {
let result = req.xoObject
// add locations of sub-routes for discoverability
const { routes } = req.collection
if (routes !== undefined) {
result = { ...result }
for (const route of Object.keys(routes)) {
result[route.split('.')[0] + '_href'] = join(req.baseUrl, req.path, route)
}
}
res.json(result)
})
api.patch(
'/:collection/:object',
json(),
wrap(async (req, res) => {
const obj = req.xapiObject
const promises = []
const { body } = req
for (const key of ['name_description', 'name_label']) {
const value = body[key]
if (value !== undefined) {
promises.push(obj['set_' + key](value))
}
}
await promises
res.sendStatus(204)
})
)
api.get(
'/:collection/:object/tasks',
wrap(async (req, res) => {
const { query } = req
const objectId = req.xoObject.id
const tasks = app.tasks.list({
filter: every(_ => _.status === 'pending' && _.objectId === objectId, handleOptionalUserFilter(query.filter)),
limit: ifDef(query.limit, Number),
})
await sendObjects(tasks, req, res, req.baseUrl + '/tasks')
})
)
api.get(
'/:collection/:object/actions',
wrap((req, res) => {
const { actions } = req.collection
return sendObjects(actions === undefined ? [] : Array.from(Object.keys(actions), id => ({ id })), req, res)
})
)
api.post('/:collection/:object/actions/:action', json(), (req, res, next) => {
const { action } = req.params
const fn = req.collection.actions?.[action]
if (fn === undefined) {
return next()
}
const { xapiObject, xoObject } = req
const task = app.tasks.create({ name: `REST: ${action} ${req.collection.type}`, objectId: xoObject.id })
const pResult = task.run(() => fn({ xapiObject, xoObject }, req.body))
if (Object.hasOwn(req.query, 'sync')) {
pResult.then(result => res.json(result), next)
} else {
pResult.catch(noop)
res.statusCode = 202
res.end(req.baseUrl + '/tasks/' + task.id)
}
})
api.get(
'/:collection/:object/:route',
wrap((req, res, next) => {
const handler = req.collection.routes?.[req.params.route]
if (handler !== undefined) {
return handler(req, res, next)
}
return next()
})
)
api.post(
'/:collection(srs)/:object/vdis',
wrap(async (req, res) => {
const sr = req.xapiObject
req.length = +req.headers['content-length']
const { name_label, name_description, raw } = req.query
const vdiRef = await sr.$importVdi(req, {
format: raw !== undefined ? VDI_FORMAT_RAW : VDI_FORMAT_VHD,
name_label,
name_description,
})
res.end(await sr.$xapi.getField('VDI', vdiRef, 'uuid'))
})
)
api.delete(
'/:collection(vdis|vdi-snapshots|vms|vm-snapshots|vm-templates)/:object',
wrap(async (req, res) => {
await req.xapiObject.$destroy()
res.sendStatus(200)
})
)
}
registerRestApi(spec, base = '/') {
for (const path of Object.keys(spec)) {
if (path[0] === '_') {
const handler = spec[path]
this.#api[path.slice(1)](base, json(), async (req, res, next) => {
try {
const result = await handler(req, res, next)
if (result !== undefined) {
const isIterable =
result !== null && typeof (result[Symbol.iterator] ?? result[Symbol.asyncIterator]) === 'function'
if (isIterable) {
await sendObjects(result, req, res)
} else {
res.json(result)
}
}
} catch (error) {
next(error)
addToRestApi(spec) {
const add = (node, spec) => {
for (const key of Object.keys(spec)) {
if (key.length !== 1 && key[0] === '_') {
if (node.has(key)) {
throw new Error('duplicate entry')
}
})
} else {
this.registerRestApi(spec[path], join(base, path))
node.set(key, spec[key])
} else {
let current = node.get(key)
if (current === undefined) {
current = new Map()
node.set(key, current)
}
add(current, spec[key])
}
}
}
return () => {
throw new Error('not implemented')
}
return add(this.#root, spec)
}
}

View File

@@ -7793,10 +7793,10 @@ csv-parser@^3.0.0:
dependencies:
minimist "^1.2.0"
csv-stringify@^6.0.0:
version "6.4.4"
resolved "https://registry.yarnpkg.com/csv-stringify/-/csv-stringify-6.4.4.tgz#92ef52e21b5cc39d20e7db9b6b913367460e2659"
integrity sha512-NDshLupGa7gp4UG4sSNIqwYJqgSwvds0SvENntxoVoVvTzXcrHvd5gG2MWpbRpSNvk59dlmIe1IwNvSxN4IVmg==
csv-stringify@^6.0.0, csv-stringify@^6.4.0:
version "6.4.0"
resolved "https://registry.yarnpkg.com/csv-stringify/-/csv-stringify-6.4.0.tgz#6d006dca9194700e44f9fbc541bee8bbbd4f459c"
integrity sha512-HQsw0QXiN5fdlO+R8/JzCZnR3Fqp8E87YVnhHlaPtNGJjt6ffbV0LpOkieIb1x6V1+xt878IYq77SpXHWAqKkA==
cuint@^0.2.2:
version "0.2.2"