Patch upload reworked.

This commit is contained in:
Julien Fontanet 2015-05-12 17:34:13 +02:00
parent 6e151a9f8b
commit 883a30c7ad
5 changed files with 146 additions and 128 deletions

View File

@ -1,109 +0,0 @@
$debug = (require 'debug') 'xo:api:vm'
{$coroutine, $wait} = require '../fibers-utils'
#=====================================================================
set = $coroutine (params) ->
{pool} = params
xapi = @getXAPI pool
for param, field of {
'name_label'
'name_description'
}
continue unless param of params
$wait xapi.call "pool.set_#{field}", pool.ref, params[param]
return true
set.params = {
id: {
type: 'string',
},
name_label: {
type: 'string',
optional: true,
},
name_description: {
type: 'string',
optional: true,
},
}
set.resolve = {
pool: ['id', 'pool'],
}
exports.set = set
#---------------------------------------------------------------------
# Upload a patch and apply it
# If host is given, only apply to a host and not the whole pool
# FIXME: remove and implements uploadPatch instead
patch = $coroutine ({pool, host}) ->
xapi = @getXAPI pool
taskRef = $wait xapi.call 'task.create', 'Patch upload from XO', ''
@watchTask taskRef
.then $coroutine (patchRef) ->
$debug 'Patch upload succeeded'
if not host
xapi.call 'pool_patch.pool_apply', patchRef
else
host = @getObject host
xapi.call 'pool_patch.apply', patchRef, host.ref
return
.catch (error) ->
$debug 'Patch upload failed: %j', error
return
.finally $coroutine ->
xapi.call 'task.destroy', taskRef
return
if not host
host = @getObject pool.master, 'host'
url = $wait @registerProxyRequest {
# Receive a POST but send a PUT.
method: 'put'
proxyMethod: 'post'
hostname: host.address
pathname: '/pool_patch_upload'
query: {
session_id: xapi.sessionId
task_id: taskRef
}
}
return {
$sendTo: url
}
patch.params = {
pool: { type: 'string' },
host: { type: 'string', optional: true },
}
patch.resolve = {
pool: ['pool', 'pool'],
}
exports.patch = patch
#---------------------------------------------------------------------
installPatch = $coroutine ({pool, patch: patchUuid}) ->
return @getXAPI(pool).installPoolPatchOnAllHosts(patchUuid)
installPatch.params = {
pool: { type: 'string' }
patch: { type: 'string' }
}
installPatch.resolve = {
pool: ['pool', 'pool']
}
exports.installPatch = installPatch

75
src/api/pool.js Normal file
View File

@ -0,0 +1,75 @@
// ===================================================================
export async function set (params) {
const {pool} = params
delete params.pool
await this.getXAPI(pool).setPoolProperties(params)
}
set.params = {
id: {
type: 'string'
},
name_label: {
type: 'string',
optional: true
},
name_description: {
type: 'string',
optional: true
}
}
set.resolve = {
pool: ['id', 'pool']
}
// -------------------------------------------------------------------
export async function installPatch ({pool, patch: patchUuid}) {
await this.getXAPI(pool).installPoolPatchOnAllHosts(patchUuid)
}
installPatch.params = {
pool: {
type: 'string'
},
patch: {
type: 'string'
}
}
installPatch.resolve = {
pool: ['pool', 'pool']
}
// -------------------------------------------------------------------
async function handlePatchUpload (req, res, {pool}) {
const {headers: {['content-length']: contentLength}} = req
if (!contentLength) {
res.writeHead(400)
res.end('Content length is mandatory')
return
}
await this.getXAPI(pool).uploadPoolPatch(req, contentLength)
}
export async function uploadPatch ({pool}) {
return {
$sendTo: await this.registerHttpRequest(handlePatchUpload, {pool})
}
}
uploadPatch.params = {
pool: { type: 'string' }
}
uploadPatch.resolve = {
pool: ['pool', 'pool']
}
// TODO: compatibility
export {uploadPatch as patch}

View File

@ -31,7 +31,7 @@ import {readFile} from 'fs-promise'
import Api from './api'
import WebServer from 'http-server-plus'
import wsProxy from './ws-proxy'
import XO from './xo'
import Xo from './xo'
// ===================================================================
@ -370,7 +370,7 @@ export default async function main (args) {
// Create the main object which will connects to Xen servers and
// manages all the models.
const xo = new XO()
const xo = new Xo()
await xo.start({
redis: {
uri: config.redis && config.redis.uri
@ -393,7 +393,7 @@ export default async function main (args) {
setUpConsoleProxy(webServer, xo)
// Must be set up before the API.
connect.use(bind(xo.handleProxyRequest, xo))
connect.use(bind(xo._handleProxyRequest, xo))
// Must be set up before the static files.
setUpApi(webServer, xo)

View File

@ -327,7 +327,7 @@ export default class Xapi extends XapiBase {
// -----------------------------------------------------------------
async _uploadPoolPatch (stream, length) {
async uploadPoolPatch (stream, length) {
const task = await this._createTask('Patch upload from XO')
// TODO: Update when xen-api >= 0.5
@ -378,7 +378,7 @@ export default class Xapi extends XapiBase {
})
const length = await eventToPromise(proxy, 'length')
return this._uploadPoolPatch(proxy, length)
return this.uploadPoolPatch(proxy, length)
}
async installPoolPatchOnHost (patchUuid, hostId) {

View File

@ -422,28 +422,59 @@ export default class Xo extends EventEmitter {
// -----------------------------------------------------------------
async registerProxyRequest (opts) {
if (isString(opts)) {
opts = parseUrl(opts)
} else {
opts.method = opts.method != null ?
opts.method.toUpperCase() :
'GET'
_handleHttpRequest (req, res, next) {
const {url} = req
opts.proxyMethod = opts.proxyMethod != null ?
opts.proxyMethod.toUpperCase() :
opts.method
const {_httpRequestWatchers: watchers} = this
const watcher = watchers[url]
if (!watcher) {
next()
return
}
delete watchers[url]
opts.createdAt = Date.now()
const {fn, data} = watcher
Bluebird.try(watcher, [data]).then(
result => {
if (result != null) {
res.end(JSON.stringify(result))
}
},
error => {
console.error('HTTP request error', error)
const url = `/${await generateToken()} `
this._proxyRequests[url] = opts
if (!res.headersSent) {
res.writeHead(500)
}
res.end('unknown error')
}
)
}
async registerHttpRequest (fn, data) {
const {_httpRequestWatchers: watchers} = this
const url = await (function generateUniqueUrl () {
return generateToken().then(token => {
const url = `/api/${token}`
return url in watchers ?
generateUrl() :
url
})
})()
watchers[url] = {
fn,
data
}
return url
}
handleProxyRequest (req, res, next) {
// -----------------------------------------------------------------
_handleProxyRequest (req, res, next) {
const {url} = req
const request = this._proxyRequests[url]
if (!request || req.method !== request.proxyMethod) {
@ -482,6 +513,27 @@ export default class Xo extends EventEmitter {
})
}
async registerProxyRequest (opts) {
if (isString(opts)) {
opts = parseUrl(opts)
} else {
opts.method = opts.method != null ?
opts.method.toUpperCase() :
'GET'
opts.proxyMethod = opts.proxyMethod != null ?
opts.proxyMethod.toUpperCase() :
opts.method
}
opts.createdAt = Date.now()
const url = `/${await generateToken()} `
this._proxyRequests[url] = opts
return url
}
// -----------------------------------------------------------------
watchTask (ref) {