Merge branch 'patch' into next-release

This commit is contained in:
Julien Fontanet
2015-05-06 14:50:40 +02:00
10 changed files with 497 additions and 8 deletions

View File

@@ -54,6 +54,7 @@ gulp.task(function buildEs6 () {
comments: false,
optional: [
'es7.asyncFunctions',
'es7.decorators',
'runtime'
]
}))

View File

@@ -29,6 +29,7 @@
},
"dependencies": {
"@julien-f/json-rpc": "^0.3.5",
"@julien-f/unzip": "^0.2.1",
"app-conf": "^0.3.4",
"babel-runtime": "^5",
"base64url": "1.0.4",
@@ -39,6 +40,7 @@
"exec-promise": "^0.5.1",
"fibers": "~1.0.5",
"fs-promise": "^0.3.1",
"got": "^2.9.2",
"graceful-fs": "^3.0.6",
"hashy": "~0.4.2",
"http-server-plus": "^0.5.1",
@@ -47,10 +49,11 @@
"lodash.assign": "^3.0.0",
"lodash.bind": "^3.0.0",
"lodash.clone": "^3.0.1",
"lodash.difference": "^3.0.1",
"lodash.difference": "^3.2.0",
"lodash.filter": "^3.1.0",
"lodash.find": "^3.0.0",
"lodash.findindex": "^3.0.0",
"lodash.findlast": "^3.2.0",
"lodash.foreach": "^3.0.1",
"lodash.has": "^3.0.0",
"lodash.includes": "^3.1.1",

View File

@@ -1,4 +1,8 @@
{$coroutine, $wait} = require '../fibers-utils'
$request = require('bluebird').promisify(require('request'))
{parseXml} = require '../utils'
$forEach = require 'lodash.foreach'
$find = require 'lodash.find'
#=====================================================================
@@ -206,3 +210,37 @@ createNetwork.resolve = {
}
createNetwork.permission = 'admin'
exports.createNetwork = createNetwork
#---------------------------------------------------------------------
# Returns an array of missing new patches in the host
# Returns an empty array if up-to-date
# Throws an error if the host is not running the latest XS version
listMissingPatches = $coroutine ({host}) ->
return @getXAPI(host).listMissingHostPatches(host)
listMissingPatches.params = {
host: { type: 'string' }
}
listMissingPatches.resolve = {
host: ['host', 'host'],
}
exports.listMissingPatches = listMissingPatches
#---------------------------------------------------------------------
installPatch = $coroutine ({host, patch: patchUuid}) ->
return @getXAPI(host).installPoolPatchOnHost(patchUuid, host.id)
installPatch.params = {
host: { type: 'string' }
patch: { type: 'string' }
}
installPatch.resolve = {
host: ['host', 'host']
}
exports.installPatch = installPatch

View File

@@ -38,17 +38,22 @@ set.resolve = {
exports.set = set
#---------------------------------------------------------------------
# Upload a patch and apply it
# If host is given, only apply to a host and not the whole pool
# FIXME
patch = $coroutine ({pool}) ->
# FIXME: remove and implements uploadPatch instead
patch = $coroutine ({pool, host}) ->
xapi = @getXAPI pool
host = @getObject pool.master, 'host'
taskRef = $wait xapi.call 'task.create', 'Patch upload from XO', ''
@watchTask taskRef
.then $coroutine (patchRef) ->
$debug 'Patch upload succeeded'
xapi.call 'pool_patch.pool_apply', patchRef
if not host
xapi.call 'pool_patch.pool_apply', patchRef
else
host = @getObject host
xapi.call 'pool_patch.apply', patchRef, host.ref
return
.catch (error) ->
$debug 'Patch upload failed: %j', error
@@ -57,6 +62,9 @@ patch = $coroutine ({pool}) ->
xapi.call 'task.destroy', taskRef
return
if not host
host = @getObject pool.master, 'host'
url = $wait @registerProxyRequest {
# Receive a POST but send a PUT.
method: 'put'
@@ -75,6 +83,7 @@ patch = $coroutine ({pool}) ->
patch.params = {
pool: { type: 'string' },
host: { type: 'string', optional: true },
}
patch.resolve = {
@@ -82,3 +91,19 @@ patch.resolve = {
}
exports.patch = patch
#---------------------------------------------------------------------
installPatch = $coroutine ({pool, patch: patchUuid}) ->
return @getXAPI(pool).installPoolPatchOnAllHosts(patchUuid)
installPatch.params = {
pool: { type: 'string' }
patch: { type: 'string' }
}
installPatch.resolve = {
pool: ['pool', 'pool']
}
exports.installPatch = installPatch

View File

@@ -1103,3 +1103,29 @@ stats.resolve = {
}
exports.stats = stats;
#---------------------------------------------------------------------
# Actions on a Docker container in a VM
# Can be: start, stop, pause, unpause, restart
dockerContainerAction = $coroutine ({host, vm, container, action}) ->
xapi = @getXAPI vm
host = @getObject vm.$container
args = {
vmuuid: vm.UUID,
container: container,
}
console.log args
return $wait xapi.call 'host.call_plugin', host.ref, 'xscontainer', action, args
dockerContainerAction.params = {
vm: { type: 'string' }
container: { type: 'string' }
action: { type: 'string' }
}
dockerContainerAction.resolve = {
vm: ['vm', 'VM'],
}
dockerContainerAction.permission = 'admin'
exports.dockerContainerAction = dockerContainerAction

33
src/decorators.js Normal file
View File

@@ -0,0 +1,33 @@
// Debounce decorator for methods.
//
// See: https://github.com/wycats/javascript-decorators
export const debounce = (duration) => (target, name, descriptor) => {
const {value: fn} = descriptor
// This symbol is used to store the related data directly on the
// current object.
const s = Symbol()
function debounced () {
let data = this[s] || (this[s] = {
lastCall: 0,
wrapper: null
})
const now = Date.now()
if (now > data.lastCall + duration) {
data.lastCall = now
try {
const result = fn.apply(this, arguments)
data.wrapper = () => result
} catch (error) {
data.wrapper = () => { throw error }
}
}
return data.wrapper()
}
debounced.reset = (obj) => { delete obj[s] }
descriptor.value = debounced
return descriptor
}

View File

@@ -434,6 +434,10 @@ module.exports = ->
if: $isVMRunning
val: -> @val.CPUs.number
}
version: -> @genval.software_version.product_version
build: -> @genval.software_version.build_number
}
#-------------------------------------------------------------------

View File

@@ -68,6 +68,17 @@ export const parseXml = (function () {
// -------------------------------------------------------------------
// Ponyfill for Promise.finally(cb)
export const pFinally = (promise, cb) => {
return promise.then(
(value) => constructor.resolve(cb()).then(() => value),
(reason) => constructor.resolve(cb()).then(() => {
throw reason
})
)
}
// -------------------------------------------------------------------
export function parseSize (size) {
let bytes = humanFormat.parse.raw(size, { scale: 'binary' })
if (bytes.unit && bytes.unit !== 'B') {

349
src/xapi.js Normal file
View File

@@ -0,0 +1,349 @@
import createDebug from 'debug'
import eventToPromise from 'event-to-promise'
import find from 'lodash.find'
import forEach from 'lodash.foreach'
import got from 'got'
import map from 'lodash.map'
import omit from 'lodash.omit'
import unzip from '@julien-f/unzip'
import {PassThrough} from 'stream'
import {promisify} from 'bluebird'
import {Xapi as XapiBase} from 'xen-api'
import {debounce} from './decorators'
import {JsonRpcError} from './api-errors'
import {parseXml, pFinally} from './utils'
const debug = createDebug('xo:xapi')
// ===================================================================
const gotPromise = promisify(got)
const wrapError = error => {
const e = new Error(error[0])
e.code = error[0]
e.params = error.slice(1)
return e
}
// ===================================================================
export default class Xapi extends XapiBase {
constructor (...args) {
super(...args)
const objectsWatchers = this._objectWatchers = Object.create(null)
const taskWatchers = this._taskWatchers = Object.create(null)
// TODO: This is necessary to get UUIDs for host.patches.
//
// It will no longer be useful when using xen-api >= 0.5.
this._refsToUuids = Object.create(null)
const onAddOrUpdate = objects => {
forEach(objects, object => {
const {
$id: id,
$ref: ref,
uuid
} = object
if (ref && uuid) {
this._refsToUuids[ref] = uuid
}
// Watched object.
if (id in objectsWatchers) {
objectsWatchers[id].resolve(object)
delete objectsWatchers[id]
}
if (ref in objectsWatchers) {
objectsWatchers[ref].resolve(object)
delete objectsWatchers[ref]
}
// Watched task.
if (ref in taskWatchers) {
const {status} = object
if (status === 'success') {
taskWatchers[ref].resolve(object.result)
} else if (status === 'failure') {
taskWatchers[ref].reject(wrapError(object.error_info))
} else {
return
}
delete taskWatchers[ref]
}
})
}
this.objects.on('add', onAddOrUpdate)
this.objects.on('update', onAddOrUpdate)
}
// FIXME: remove this backported methods when xen-api >= 0.5
getObject (idOrUuidOrRef, defaultValue) {
const {_objects: {all: objects}} = this
const object = (
// if there is an UUID, it is also the $id.
objects[idOrUuidOrRef] ||
objects[this._objectsByRefs[idOrUuidOrRef]]
)
if (object) return object
if (arguments.length > 1) return defaultValue
throw new Error('there is not object can be matched to ' + idOrUuidOrRef)
}
getObjectByRef (ref, defaultValue) {
const {
_refsToUuids: refsToUuids,
// Objects ids are already UUIDs if they have one.
_objects: {all: objectsByUuids}
} = this
if (ref in refsToUuids) {
return objectsByUuids[refsToUuids[ref]]
}
if (arguments.length > 1) {
return defaultValue
}
throw new Error('there is no object with the ref ' + ref)
}
getObjectByUuid (uuid, defaultValue) {
const {
// Objects ids are already UUIDs if they have one.
_objects: {all: objectsByUuids}
} = this
if (uuid in objectsByUuids) {
return objectsByUuids[uuid]
}
if (arguments.length > 1) {
return defaultValue
}
throw new Error('there is no object with the UUID ' + uuid)
}
// =================================================================
// Wait for an object to appear or to be updated.
//
// TODO: implements a timeout.
_waitObject (idOrUuidOrRef) {
let watcher = this._objectWatchers[idOrUuidOrRef]
if (!watcher) {
let resolve, reject
const promise = new Promise((resolve_, reject_) => {
resolve = resolve_
reject = reject_
})
// Register the watcher.
watcher = this._objectWatchers[idOrUuidOrRef] = {
promise,
resolve,
reject
}
}
return watcher.promise
}
// Returns the objects if already presents or waits for it.
async _getOrWaitObject (idOrUuidOrRef) {
return (
this.getObject(idOrUuidOrRef, undefined) ||
this._waitObject(idOrUuidOrRef)
)
}
// =================================================================
// Create a task.
//
// Returns the task object from the Xapi.
async _createTask (name, description = '') {
const ref = await this.call('task.create', name, description)
pFinally(this._watchTask(ref), () => {
this.call('task.destroy', ref)
})
return this._getOrWaitObject(ref)
}
// Waits for a task to be resolved.
_watchTask ({ref}) {
let watcher = this._taskWatchers[ref]
if (!watcher) {
let resolve, reject
const promise = new Promise((resolve_, reject_) => {
resolve = resolve_
reject = reject_
})
// Register the watcher.
watcher = this._taskWatchers[ref] = {
promise,
resolve,
reject
}
}
return watcher.promise
}
// =================================================================
// FIXME: should be static
@debounce(24 * 60 * 60 * 1000)
async _getXenUpdates () {
const [body, {statusCode}] = await gotPromise(
'http://updates.xensource.com/XenServer/updates.xml'
)
if (statusCode !== 200) {
throw new JsonRpcError('cannot fetch patches list from Citrix')
}
const {patchdata: data} = parseXml(body)
const patches = Object.create(null)
forEach(data.patches.patch, patch => {
patches[patch.uuid] = {
date: patch.timestamp,
description: patch['name-description'],
documentationUrl: patch.url,
guidance: patch['after-apply-guidance'],
name: patch['name-label'],
url: patch['patch-url'],
// TODO: what does it mean, should we handle it?
// version: patch.version,
}
})
const resolveVersionPatches = function (uuids) {
const versionPatches = Object.create(null)
forEach(uuids, ({uuid}) => {
versionPatches[uuid] = patches[uuid]
})
return versionPatches
}
const versions = Object.create(null)
let latestVersion
forEach(data.serverversions.version, version => {
versions[version.value] = {
date: version.timestamp,
name: version.name,
id: version.value,
documentationUrl: version.url,
patches: resolveVersionPatches(version.patch)
}
if (version.latest) {
latestVersion = versions[version.value]
}
})
return {
patches,
latestVersion,
versions
}
}
// =================================================================
async listMissingHostPatches (host) {
return omit(
(await this._getXenUpdates()).versions[host.version].patches,
// TODO: simplify when we start to use xen-api >= 0.5
map(host.patches, ref => {
const hostPatch = this.objects.all[this._refsToUuids[ref]]
return this._refsToUuids[hostPatch.pool_patch]
})
)
}
// -----------------------------------------------------------------
async _uploadPoolPatch (stream, length) {
const task = await this._createTask('Patch upload from XO')
// TODO: Update when xen-api >= 0.5
const poolMaster = this.objects.all[this._refsToUuids[this.pool.master]]
await Promise.all([
gotPromise('http://' + poolMaster.address + '/pool_patch_upload', {
method: 'put',
body: stream,
query: {
session_id: this.sessionId,
task_id: task.$ref
},
headers: {
'content-length': length
}
}),
this._watchTask(task)
]).then(([, patchRef]) => this._waitObject(patchRef))
}
async _getOrUploadPoolPatch (uuid) {
try {
return this.getObjectByUuid(uuid)
} catch (error) {}
const patchInfo = (await this._getXenUpdates()).patches[uuid]
const PATCH_RE = /\.xsupdate$/
const proxy = new PassThrough()
got(patchInfo.url).on('error', error => {
// TODO: better error handling
console.error(error)
}).pipe(unzip.Parse()).on('entry', entry => {
if (PATCH_RE.test(entry.path)) {
proxy.emit('length', entry.size)
entry.pipe(proxy)
} else {
entry.autodrain()
}
}).on('error', error => {
// TODO: better error handling
console.error(error)
})
const length = await eventToPromise(proxy, 'length')
return this._uploadPoolPatch(host, proxy, length)
}
async installPoolPatchOnHost (patchUuid, hostId) {
const patch = await this._getOrUploadPoolPatch(patchUuid)
const host = this.getObject(hostId)
await this.call('pool_patch.apply', patch.$ref, host.$ref)
}
async installPoolPatchOnAllHosts (patchUuid) {
const patch = await this._getOrUploadPoolPatch(patchUuid)
await this.call('pool_patch.pool_apply', patch.$ref)
}
// =================================================================
}

View File

@@ -6,17 +6,16 @@ import isString from 'lodash.isstring'
import pluck from 'lodash.pluck'
import proxyRequest from 'proxy-http-request'
import {createClient as createRedisClient} from 'then-redis'
import {createClient as createXapiClient} from 'xen-api'
import {EventEmitter} from 'events'
import {parse as parseUrl} from 'url'
import Connection from './connection'
import spec from './spec'
import Xapi from './xapi'
import User, {Users} from './models/user'
import {$MappedCollection as MappedCollection} from './MappedCollection'
import {Acls} from './models/acl'
import {generateToken} from './utils'
import {NoSuchObject} from './api-errors'
import {Servers} from './models/server'
import {Tokens} from './models/token'
@@ -272,7 +271,7 @@ export default class Xo extends EventEmitter {
async connectXenServer (id) {
const server = (await this._getXenServer(id)).properties
const xapi = this._xapis[server.id] = createXapiClient({
const xapi = this._xapis[server.id] = new Xapi({
url: server.host,
auth: {
user: server.username,