Merge remote-tracking branch 'xen-api/master'

This commit is contained in:
Julien Fontanet 2017-10-05 17:33:20 +02:00
commit a0bf1bca8b
24 changed files with 5698 additions and 0 deletions

1
packages/xen-api/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/plot.dat

View File

@ -0,0 +1,24 @@
/benchmark/
/benchmarks/
*.bench.js
*.bench.js.map
/examples/
example.js
example.js.map
*.example.js
*.example.js.map
/fixture/
/fixtures/
*.fixture.js
*.fixture.js.map
*.fixtures.js
*.fixtures.js.map
/test/
/tests/
*.spec.js
*.spec.js.map
__snapshots__/

View File

@ -0,0 +1,9 @@
language: node_js
node_js:
- stable
- 6
- 4
# Use containers.
# http://docs.travis-ci.com/user/workers/container-based-infrastructure/
sudo: false

128
packages/xen-api/README.md Normal file
View File

@ -0,0 +1,128 @@
# xen-api [![Build Status](https://travis-ci.org/vatesfr/xen-orchestra.png?branch=master)](https://travis-ci.org/vatesfr/xen-orchestra)
> Connector to the Xen API
Tested with:
- XenServer 7.1
- XenServer 7
- XenServer 6.5
- XenServer 6.2
- XenServer 5.6
## Install
Installation of the [npm package](https://npmjs.org/package/xen-api):
```
> npm install --save xen-api
```
## Usage
### Library
```javascript
const { createClient } = require('xen-api')
const xapi = createClient({
url: 'https://xen1.company.net',
allowUnauthorized: false,
auth: {
user: 'root',
password: 'important secret password'
},
readOnly: false
})
```
Options:
- `url`: address of a host in the pool we are trying to connect to
- `allowUnauthorized`: whether to accept self-signed certificates
- `auth`: credentials used to sign in (can also be specified in the URL)
- `readOnly = false`: if true, no methods with side-effects can be called
```js
// Force connection.
xapi.connect().catch(error => {
console.error(error)
})
// Watch objects.
xapi.objects.on('add', objects => {
console.log('new objects:', objects)
})
```
> Note: all objects are frozen and cannot be altered!
Custom fields on objects (hidden ie. non enumerable):
- `$type`: the type of the object (`VM`, `task`, …);
- `$ref`: the (opaque) reference of the object;
- `$id`: the identifier of this object (its UUID if any, otherwise its reference);
- `$pool`: the pool object this object belongs to.
Furthermore, any field containing a reference (or references if an
array) can be resolved by prepending the field name with a `$`:
```javascript
console.log(xapi.pool.$master.$resident_VMs[0].name_label)
// vm1
```
### CLI
A CLI is provided to help exploration and discovery of the XAPI.
```
> xen-api https://xen1.company.net root
Password: ******
root@xen1.company.net> xapi.status
'connected'
root@xen1.company.net> xapi.pool.master
'OpaqueRef:ec7c5147-8aee-990f-c70b-0de916a8e993'
root@xen1.company.net> xapi.pool.$master.name_label
'xen1'
```
To ease searches, `find()` and `findAll()` functions are available:
```
root@xen1.company.net> findAll({ $type: 'vm' }).length
183
```
## Development
```
# Install dependencies
> npm install
# Run the tests
> npm test
# Continuously compile
> npm run dev
# Continuously run the tests
> npm run dev-test
# Build for production (automatically called by npm install)
> npm run build
```
## Contributions
Contributions are *very* welcomed, either on the documentation or on
the code.
You may:
- report any [issue](https://github.com/xen-api/issues)
you've encountered;
- fork and create a pull request.
## License
ISC © [Julien Fontanet](https://github.com/julien-f)

View File

@ -0,0 +1,50 @@
#!/usr/bin/env node
process.env.DEBUG = '*'
const defer = require('golike-defer').default
const pump = require('pump')
const { fromCallback } = require('promise-toolbox')
const { createClient } = require('../')
const { createOutputStream, resolveRef } = require('./utils')
defer(async ($defer, args) => {
let raw = false
if (args[0] === '--raw') {
raw = true
args.shift()
}
if (args.length < 2) {
return console.log('Usage: export-vdi [--raw] <XS URL> <VDI identifier> [<VHD file>]')
}
const xapi = createClient({
allowUnauthorized: true,
url: args[0],
watchEvents: false
})
await xapi.connect()
$defer(() => xapi.disconnect())
// https://xapi-project.github.io/xen-api/snapshots.html#downloading-a-disk-or-snapshot
const exportStream = await xapi.getResource('/export_raw_vdi/', {
query: {
format: raw ? 'raw' : 'vhd',
vdi: await resolveRef(xapi, 'VDI', args[1])
}
})
console.warn('Export task:', exportStream.headers['task-id'])
await fromCallback(cb => pump(
exportStream,
createOutputStream(args[2]),
cb
))
})(process.argv.slice(2)).catch(
console.error.bind(console, 'error')
)

View File

@ -0,0 +1,44 @@
#!/usr/bin/env node
process.env.DEBUG = '*'
const defer = require('golike-defer').default
const pump = require('pump')
const { fromCallback } = require('promise-toolbox')
const { createClient } = require('../')
const { createOutputStream, resolveRef } = require('./utils')
defer(async ($defer, args) => {
if (args.length < 2) {
return console.log('Usage: export-vm <XS URL> <VM identifier> [<XVA file>]')
}
const xapi = createClient({
allowUnauthorized: true,
url: args[0],
watchEvents: false
})
await xapi.connect()
$defer(() => xapi.disconnect())
// https://xapi-project.github.io/xen-api/importexport.html
const exportStream = await xapi.getResource('/export/', {
query: {
ref: await resolveRef(xapi, 'VM', args[1]),
use_compression: 'true'
}
})
console.warn('Export task:', exportStream.headers['task-id'])
await fromCallback(cb => pump(
exportStream,
createOutputStream(args[2]),
cb
))
})(process.argv.slice(2)).catch(
console.error.bind(console, 'error')
)

View File

@ -0,0 +1,40 @@
#!/usr/bin/env node
process.env.DEBUG = '*'
const defer = require('golike-defer').default
const { createClient } = require('../')
const { createInputStream, resolveRef } = require('./utils')
defer(async ($defer, args) => {
let raw = false
if (args[0] === '--raw') {
raw = true
args.shift()
}
if (args.length < 2) {
return console.log('Usage: import-vdi [--raw] <XS URL> <VDI identifier> [<VHD file>]')
}
const xapi = createClient({
allowUnauthorized: true,
url: args[0],
watchEvents: false
})
await xapi.connect()
$defer(() => xapi.disconnect())
// https://xapi-project.github.io/xen-api/snapshots.html#uploading-a-disk-or-snapshot
await xapi.putResource(createInputStream(args[2]), '/import_raw_vdi/', {
query: {
format: raw ? 'raw' : 'vhd',
vdi: await resolveRef(xapi, 'VDI', args[1])
}
})
})(process.argv.slice(2)).catch(
console.error.bind(console, 'error')
)

View File

@ -0,0 +1,31 @@
#!/usr/bin/env node
process.env.DEBUG = '*'
const defer = require('golike-defer').default
const { createClient } = require('../')
const { createInputStream, resolveRef } = require('./utils')
defer(async ($defer, args) => {
if (args.length < 1) {
return console.log('Usage: import-vm <XS URL> [<XVA file>] [<SR identifier>]')
}
const xapi = createClient({
allowUnauthorized: true,
url: args[0],
watchEvents: false
})
await xapi.connect()
$defer(() => xapi.disconnect())
// https://xapi-project.github.io/xen-api/importexport.html
await xapi.putResource(createInputStream(args[1]), '/import/', {
query: args[2] && { sr_id: await resolveRef(xapi, 'SR', args[2]) }
})
})(process.argv.slice(2)).catch(
console.error.bind(console, 'error')
)

View File

@ -0,0 +1,57 @@
#!/usr/bin/env node
require('source-map-support').install()
const { forEach, size } = require('lodash')
const { createClient } = require('../')
// ===================================================================
if (process.argv.length < 3) {
return console.log('Usage: log-events <XS URL>')
}
// ===================================================================
// Creation
const xapi = createClient({
allowUnauthorized: true,
url: process.argv[2]
})
// ===================================================================
// Method call
xapi.connect().then(() => {
xapi.call('VM.get_all_records')
.then(function (vms) {
console.log('%s VMs fetched', size(vms))
})
.catch(function (error) {
console.error(error)
})
})
// ===================================================================
// Objects
const objects = xapi.objects
objects.on('add', objects => {
forEach(objects, object => {
console.log('+ %s: %s', object.$type, object.$id)
})
})
objects.on('update', objects => {
forEach(objects, object => {
console.log('± %s: %s', object.$type, object.$id)
})
})
objects.on('remove', objects => {
forEach(objects, (value, id) => {
console.log('- %s', id)
})
})

View File

@ -0,0 +1,6 @@
{
"dependencies": {
"golike-defer": "^0.1.0",
"pump": "^1.0.2"
}
}

View File

@ -0,0 +1,41 @@
const { createReadStream, createWriteStream, statSync } = require('fs')
const { PassThrough } = require('stream')
const { isOpaqueRef } = require('../')
exports.createInputStream = path => {
if (path === undefined || path === '-') {
return process.stdin
}
const { size } = statSync(path)
const stream = createReadStream(path)
stream.length = size
return stream
}
exports.createOutputStream = path => {
if (path !== undefined && path !== '-') {
return createWriteStream(path)
}
// introduce a through stream because stdout is not a normal stream!
const stream = new PassThrough()
stream.pipe(process.stdout)
return stream
}
exports.resolveRef = (xapi, type, refOrUuidOrNameLabel) =>
isOpaqueRef(refOrUuidOrNameLabel)
? refOrUuidOrNameLabel
: xapi.call(`${type}.get_by_uuid`, refOrUuidOrNameLabel).catch(
() => xapi.call(`${type}.get_by_name_label`, refOrUuidOrNameLabel).then(
refs => {
if (refs.length === 1) {
return refs[0]
}
throw new Error(`no single match for ${type} with name label ${refOrUuidOrNameLabel}`)
}
)
)

View File

@ -0,0 +1,30 @@
# THIS IS AN AUTOGENERATED FILE. DO NOT EDIT THIS FILE DIRECTLY.
# yarn lockfile v1
end-of-stream@^1.1.0:
version "1.4.0"
resolved "https://registry.yarnpkg.com/end-of-stream/-/end-of-stream-1.4.0.tgz#7a90d833efda6cfa6eac0f4949dbb0fad3a63206"
dependencies:
once "^1.4.0"
golike-defer@^0.1.0:
version "0.1.0"
resolved "https://registry.yarnpkg.com/golike-defer/-/golike-defer-0.1.0.tgz#70a3d8991cdfe41845956bfb578f69bc3e49f525"
once@^1.3.1, once@^1.4.0:
version "1.4.0"
resolved "https://registry.yarnpkg.com/once/-/once-1.4.0.tgz#583b1aa775961d4b113ac17d9c50baef9dd76bd1"
dependencies:
wrappy "1"
pump@^1.0.2:
version "1.0.2"
resolved "https://registry.yarnpkg.com/pump/-/pump-1.0.2.tgz#3b3ee6512f94f0e575538c17995f9f16990a5d51"
dependencies:
end-of-stream "^1.1.0"
once "^1.3.1"
wrappy@1:
version "1.0.2"
resolved "https://registry.yarnpkg.com/wrappy/-/wrappy-1.0.2.tgz#b5243d8f3ec1aa35f1364605bc0d1036e30ab69f"

View File

@ -0,0 +1,4 @@
set yrange [ 0 : ]
set grid
plot for [i=2:4] "plot.dat" using 1:i with lines

View File

@ -0,0 +1,107 @@
{
"name": "xen-api",
"version": "0.14.2",
"license": "ISC",
"description": "Connector to the Xen API",
"keywords": [
"xen",
"api",
"xen-api",
"xenapi",
"xapi"
],
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/packages/xen-api",
"bugs": "https://github.com/vatesfr/xo-web/issues",
"repository": {
"type": "git",
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"author": {
"name": "Julien Fontanet",
"email": "julien.fontanet@vates.fr"
},
"preferGlobal": false,
"main": "dist/",
"bin": {
"xen-api": "dist/cli.js"
},
"files": [
"dist/"
],
"engines": {
"node": ">=4"
},
"dependencies": {
"babel-polyfill": "^6.23.0",
"blocked": "^1.2.1",
"debug": "^2.6.8",
"event-to-promise": "^0.8.0",
"exec-promise": "^0.7.0",
"http-request-plus": "^0.4.0",
"json-rpc-protocol": "^0.11.2",
"kindof": "^2.0.0",
"lodash": "^4.17.4",
"make-error": "^1.3.0",
"minimist": "^1.2.0",
"ms": "^2.0.0",
"promise-toolbox": "^0.9.5",
"pw": "0.0.4",
"xmlrpc": "^1.3.2",
"xo-collection": "^0.4.1"
},
"devDependencies": {
"babel-cli": "^6.24.1",
"babel-eslint": "^7.2.3",
"babel-plugin-lodash": "^3.2.11",
"babel-plugin-transform-decorators-legacy": "^1.3.4",
"babel-preset-env": "^1.6.0",
"babel-preset-stage-0": "^6.24.1",
"cross-env": "^5.0.5",
"dependency-check": "^2.9.1",
"husky": "^0.14.3",
"jest": "^20.0.4",
"rimraf": "^2.6.1",
"standard": "^10.0.3"
},
"scripts": {
"build": "cross-env NODE_ENV=production babel --source-maps --out-dir=dist/ src/",
"commitmsg": "npm test",
"dev": "cross-env NODE_ENV=development babel --watch --source-maps --out-dir=dist/ src/",
"dev-test": "jest --bail --watch",
"plot": "gnuplot -p memory-test.gnu",
"posttest": "standard && dependency-check ./package.json",
"prebuild": "rimraf dist/",
"predev": "npm run prebuild",
"prepublish": "npm run build",
"test": "jest"
},
"babel": {
"plugins": [
"lodash",
"transform-decorators-legacy"
],
"presets": [
[
"env",
{
"targets": {
"node": 4
}
}
],
"stage-0"
]
},
"jest": {
"roots": [
"<rootDir>/src"
],
"testRegex": "\\.spec\\.js$"
},
"standard": {
"ignore": [
"dist"
],
"parser": "babel-eslint"
}
}

106
packages/xen-api/src/cli.js Executable file
View File

@ -0,0 +1,106 @@
#!/usr/bin/env node
import 'babel-polyfill'
import blocked from 'blocked'
import createDebug from 'debug'
import eventToPromise from 'event-to-promise'
import execPromise from 'exec-promise'
import minimist from 'minimist'
import pw from 'pw'
import { asCallback, fromCallback } from 'promise-toolbox'
import { filter, find, isArray } from 'lodash'
import { start as createRepl } from 'repl'
import { createClient } from './'
// ===================================================================
function askPassword (prompt = 'Password: ') {
if (prompt) {
process.stdout.write(prompt)
}
return new Promise(resolve => {
pw(resolve)
})
}
// ===================================================================
const usage = 'Usage: xen-api <url> [<user> [<password>]]'
const main = async args => {
const opts = minimist(args, {
boolean: ['allow-unauthorized', 'help', 'read-only', 'verbose'],
alias: {
'allow-unauthorized': 'au',
debounce: 'd',
help: 'h',
'read-only': 'ro',
verbose: 'v'
}
})
if (opts.help) {
return usage
}
if (opts.verbose) {
// Does not work perfectly.
//
// https://github.com/visionmedia/debug/pull/156
createDebug.enable('xen-api,xen-api:*')
}
let auth
if (opts._.length > 1) {
const [ , user, password = await askPassword() ] = opts._
auth = { user, password }
}
{
const debug = createDebug('xen-api:perf')
blocked(ms => {
debug('blocked for %sms', ms | 0)
})
}
const xapi = createClient({
url: opts._[0],
allowUnauthorized: opts.au,
auth,
debounce: opts.debounce != null ? +opts.debounce : null,
readOnly: opts.ro
})
await xapi.connect()
const repl = createRepl({
prompt: `${xapi._humanId}> `
})
repl.context.xapi = xapi
repl.context.find = predicate => find(xapi.objects.all, predicate)
repl.context.findAll = predicate => filter(xapi.objects.all, predicate)
// Make the REPL waits for promise completion.
repl.eval = (evaluate => (cmd, context, filename, cb) => {
fromCallback(cb => {
evaluate.call(repl, cmd, context, filename, cb)
}).then(value =>
isArray(value) ? Promise.all(value) : value
)::asCallback(cb)
})(repl.eval)
await eventToPromise(repl, 'exit')
try {
await xapi.disconnect()
} catch (error) {}
}
export default main
if (!module.parent) {
execPromise(main)
}

View File

@ -0,0 +1,948 @@
import Collection from 'xo-collection'
import createDebug from 'debug'
import kindOf from 'kindof'
import ms from 'ms'
import httpRequest from 'http-request-plus'
import { BaseError } from 'make-error'
import { EventEmitter } from 'events'
import { filter, forEach, isArray, isObject, map, noop, omit, reduce, startsWith } from 'lodash'
import {
Cancel,
cancelable,
catchPlus as pCatch,
defer,
delay as pDelay,
fromEvents,
lastly
} from 'promise-toolbox'
import autoTransport from './transports/auto'
const debug = createDebug('xen-api')
// ===================================================================
// http://www.gnu.org/software/libc/manual/html_node/Error-Codes.html
const NETWORK_ERRORS = {
// Connection has been closed outside of our control.
ECONNRESET: true,
// Connection has been aborted locally.
ECONNABORTED: true,
// Host is up but refuses connection (typically: no such service).
ECONNREFUSED: true,
// TODO: ??
EINVAL: true,
// Host is not reachable (does not respond).
EHOSTUNREACH: true,
// Connection configured timed out has been reach.
ETIMEDOUT: true
}
const isNetworkError = ({code}) => NETWORK_ERRORS[code]
// -------------------------------------------------------------------
const XAPI_NETWORK_ERRORS = {
HOST_STILL_BOOTING: true,
HOST_HAS_NO_MANAGEMENT_IP: true
}
const isXapiNetworkError = ({code}) => XAPI_NETWORK_ERRORS[code]
// -------------------------------------------------------------------
const areEventsLost = ({code}) => code === 'EVENTS_LOST'
const isHostSlave = ({code}) => code === 'HOST_IS_SLAVE'
const isMethodUnknown = ({code}) => code === 'MESSAGE_METHOD_UNKNOWN'
const isSessionInvalid = ({code}) => code === 'SESSION_INVALID'
// -------------------------------------------------------------------
class XapiError extends BaseError {
constructor ([code, ...params]) {
super(`${code}(${params.join(', ')})`)
this.code = code
this.params = params
// slot than can be assigned later
this.method = null
}
}
export const wrapError = error => new XapiError(error)
// ===================================================================
const URL_RE = /^(?:(https?:)\/*)?(?:([^:]+):([^@]+)@)?([^/]+?)(?::([0-9]+))?\/?$/
const parseUrl = url => {
const matches = URL_RE.exec(url)
if (!matches) {
throw new Error('invalid URL: ' + url)
}
const [ , protocol = 'https:', username, password, hostname, port ] = matches
return { protocol, username, password, hostname, port }
}
// -------------------------------------------------------------------
const {
create: createObject,
defineProperties,
defineProperty,
freeze: freezeObject
} = Object
// -------------------------------------------------------------------
const OPAQUE_REF_PREFIX = 'OpaqueRef:'
export const isOpaqueRef = value =>
typeof value === 'string' &&
startsWith(value, OPAQUE_REF_PREFIX)
// -------------------------------------------------------------------
const RE_READ_ONLY_METHOD = /^[^.]+\.get_/
const isReadOnlyCall = (method, args) => (
args.length === 1 &&
isOpaqueRef(args[0]) &&
RE_READ_ONLY_METHOD.test(method)
)
// -------------------------------------------------------------------
const getKey = o => o.$id
// -------------------------------------------------------------------
const EMPTY_ARRAY = freezeObject([])
// -------------------------------------------------------------------
const getTaskResult = (task, onSuccess, onFailure) => {
const { status } = task
if (status === 'cancelled') {
return [ onFailure(new Cancel('task canceled')) ]
}
if (status === 'failure') {
return [ onFailure(wrapError(task.error_info)) ]
}
if (status === 'success') {
// the result might be:
// - empty string
// - an opaque reference
// - an XML-RPC value
return [ onSuccess(task.result) ]
}
}
// -------------------------------------------------------------------
// see https://gist.github.com/julien-f/675b01825302bcc85270dd74f15e7cb0
const parseEventId = id => typeof id === 'string'
? +id.split(',')[0]
: id
// -------------------------------------------------------------------
const MAX_TRIES = 5
const CONNECTED = 'connected'
const CONNECTING = 'connecting'
const DISCONNECTED = 'disconnected'
// -------------------------------------------------------------------
export class Xapi extends EventEmitter {
constructor (opts) {
super()
this._allowUnauthorized = opts.allowUnauthorized
this._auth = opts.auth
this._pool = null
this._readOnly = Boolean(opts.readOnly)
this._sessionId = null
const url = this._url = parseUrl(opts.url)
if (this._auth === undefined) {
const user = url.username
if (user !== undefined) {
this._auth = {
user,
password: url.password
}
delete url.username
delete url.password
}
}
if (opts.watchEvents !== false) {
this._debounce = opts.debounce == null
? 200
: opts.debounce
this._eventWatchers = createObject(null)
this._fromToken = ''
// Memoize this function _addObject().
this._getPool = () => this._pool
const objects = this._objects = new Collection()
objects.getKey = getKey
this._objectsByRefs = createObject(null)
this._objectsByRefs['OpaqueRef:NULL'] = null
this._taskWatchers = Object.create(null)
this.on('connected', this._watchEvents)
this.on('disconnected', () => {
this._fromToken = ''
objects.clear()
})
}
}
get _url () {
return this.__url
}
set _url (url) {
this.__url = url
this._call = autoTransport({
allowUnauthorized: this._allowUnauthorized,
url
})
}
get readOnly () {
return this._readOnly
}
set readOnly (ro) {
this._readOnly = Boolean(ro)
}
get sessionId () {
const id = this._sessionId
if (!id || id === CONNECTING) {
throw new Error('sessionId is only available when connected')
}
return id
}
get status () {
const id = this._sessionId
return id
? (
id === CONNECTING
? CONNECTING
: CONNECTED
)
: DISCONNECTED
}
get _humanId () {
return `${this._auth.user}@${this._url.hostname}`
}
// this method injects an artificial event on the pool and waits for
// this event to be received
barrier (type, ref) {
const eventWatchers = this._eventWatchers
if (eventWatchers === undefined) {
return Promise.reject(new Error('Xapi#barrier() requires events watching'))
}
if (type === undefined) {
type = 'pool'
ref = this._pool.$ref
}
return this._sessionCall(
'event.inject',
[ type, ref ]
).then(eventId => {
eventId = parseEventId(eventId)
const { promise, resolve } = defer()
;(
eventWatchers[ref] ||
(eventWatchers[ref] = [])
).push({ eventId, resolve })
return promise
})
}
connect () {
const {status} = this
if (status === CONNECTED) {
return Promise.reject(new Error('already connected'))
}
if (status === CONNECTING) {
return Promise.reject(new Error('already connecting'))
}
const auth = this._auth
if (auth === undefined) {
return Promise.reject(new Error('missing credentials'))
}
this._sessionId = CONNECTING
return this._transportCall('session.login_with_password', [
auth.user,
auth.password
]).then(
sessionId => {
this._sessionId = sessionId
debug('%s: connected', this._humanId)
this.emit(CONNECTED)
},
error => {
this._sessionId = null
throw error
}
)
}
disconnect () {
return Promise.resolve().then(() => {
const { status } = this
if (status === DISCONNECTED) {
return Promise.reject(new Error('already disconnected'))
}
this._sessionId = null
debug('%s: disconnected', this._humanId)
this.emit(DISCONNECTED)
})
}
// High level calls.
call (method, ...args) {
return this._readOnly && !isReadOnlyCall(method, args)
? Promise.reject(new Error(`cannot call ${method}() in read only mode`))
: this._sessionCall(method, args)
}
@cancelable
callAsync ($cancelToken, method, ...args) {
return this.call(`Async.${method}`, ...args).then(taskRef => {
$cancelToken.promise.then(() => {
this._sessionCall('task.cancel', taskRef).catch(noop)
})
return this.watchTask(taskRef)::lastly(() => {
this._sessionCall('task.destroy', taskRef).catch(noop)
})
})
}
// create a task and automatically destroy it when settled
createTask (nameLabel, nameDescription = '') {
const promise = this._sessionCall('task.create', [
nameLabel,
nameDescription
])
promise.then(taskRef => {
const destroy = () =>
this._sessionCall('task.destroy', taskRef).catch(noop)
this.watchTask(taskRef).then(destroy, destroy)
})
return promise
}
// Nice getter which returns the object for a given $id (internal to
// this lib), UUID (unique identifier that some objects have) or
// opaque reference (internal to XAPI).
getObject (idOrUuidOrRef, defaultValue) {
const object = typeof idOrUuidOrRef === 'string'
? (
// if there is an UUID, it is also the $id.
this._objects.all[idOrUuidOrRef] ||
this._objectsByRefs[idOrUuidOrRef]
)
: this._objects.all[idOrUuidOrRef.$id]
if (object) return object
if (arguments.length > 1) return defaultValue
throw new Error('there is not object can be matched to ' + idOrUuidOrRef)
}
// Returns the object for a given opaque reference (internal to
// XAPI).
getObjectByRef (ref, defaultValue) {
const object = this._objectsByRefs[ref]
if (object) return object
if (arguments.length > 1) return defaultValue
throw new Error('there is no object with the ref ' + ref)
}
// Returns the object for a given UUID (unique identifier that some
// objects have).
getObjectByUuid (uuid, defaultValue) {
// Objects ids are already UUIDs if they have one.
const object = this._objects.all[uuid]
if (object) return object
if (arguments.length > 1) return defaultValue
throw new Error('there is no object with the UUID ' + uuid)
}
@cancelable
getResource ($cancelToken, pathname, {
host,
query,
task
}) {
return this._autoTask(
task,
`Xapi#getResource ${pathname}`
).then(taskRef => {
query = { ...query, session_id: this.sessionId }
let taskResult
if (taskRef !== undefined) {
query.task_id = taskRef
taskResult = this.watchTask(taskRef)
if (typeof $cancelToken.addHandler === 'function') {
$cancelToken.addHandler(() => taskResult)
}
}
let promise = httpRequest(
$cancelToken,
this._url,
host && {
hostname: this.getObject(host).address
},
{
pathname,
query,
rejectUnauthorized: !this._allowUnauthorized
}
)
if (taskResult !== undefined) {
promise = promise.then(response => {
response.task = taskResult
return response
})
}
return promise
})
}
@cancelable
putResource ($cancelToken, body, pathname, {
host,
query,
task
} = {}) {
return this._autoTask(
task,
`Xapi#putResource ${pathname}`
).then(taskRef => {
query = { ...query, session_id: this.sessionId }
let taskResult
if (taskRef !== undefined) {
query.task_id = taskRef
taskResult = this.watchTask(taskRef)
if (typeof $cancelToken.addHandler === 'function') {
$cancelToken.addHandler(() => taskResult)
}
}
const headers = {}
// Xen API does not support chunk encoding.
const isStream = typeof body.pipe === 'function'
const { length } = body
if (isStream && length === undefined) {
// add a fake huge content length (1 PiB)
headers['content-length'] = '1125899906842624'
}
const doRequest = override => httpRequest.put(
$cancelToken,
this._url,
host && {
hostname: this.getObject(host).address
},
{
body,
headers,
pathname,
query,
rejectUnauthorized: !this._allowUnauthorized
},
override
)
const promise = isStream
// dummy request to probe for a redirection before consuming body
? doRequest({
body: '',
// omit task_id because this request will fail on purpose
query: 'task_id' in query
? omit(query, 'task_id')
: query,
maxRedirects: 0
}).then(
response => {
response.req.abort()
return doRequest()
},
error => {
let response
if (error != null && (response = error.response) != null) {
response.req.abort()
const { headers: { location }, statusCode } = response
if (statusCode === 302 && location !== undefined) {
return doRequest(location)
}
}
throw error
}
)
// http-request-plus correctly handle redirects if body is not a stream
: doRequest()
return promise.then(response => {
const { req } = response
if (req.finished) {
req.abort()
return taskResult
}
return fromEvents(req, ['close', 'finish']).then(() => {
req.abort()
return taskResult
})
})
})
}
watchTask (ref) {
const watchers = this._taskWatchers
if (watchers === undefined) {
throw new Error('Xapi#watchTask() requires events watching')
}
// allow task object to be passed
if (ref.$ref !== undefined) ref = ref.$ref
let watcher = watchers[ref]
if (watcher === undefined) {
// sync check if the task is already settled
const task = this.objects.all[ref]
if (task !== undefined) {
const result = getTaskResult(task, Promise.resolve, Promise.reject)
if (result) {
return result[0]
}
}
watcher = watchers[ref] = defer()
}
return watcher.promise
}
get pool () {
return this._pool
}
get objects () {
return this._objects
}
// return a promise which resolves to a task ref or undefined
_autoTask (task = this._taskWatchers !== undefined, name) {
if (task === false) {
return Promise.resolve()
}
if (task === true) {
return this.createTask(name)
}
// either a reference or a promise to a reference
return Promise.resolve(task)
}
// Medium level call: handle session errors.
_sessionCall (method, args) {
try {
if (startsWith(method, 'session.')) {
throw new Error('session.*() methods are disabled from this interface')
}
return this._transportCall(method, [this.sessionId].concat(args))
::pCatch(isSessionInvalid, () => {
// XAPI is sometimes reinitialized and sessions are lost.
// Try to login again.
debug('%s: the session has been reinitialized', this._humanId)
this._sessionId = null
return this.connect().then(() => this._sessionCall(method, args))
})
} catch (error) {
return Promise.reject(error)
}
}
_addObject (type, ref, object) {
const {_objectsByRefs: objectsByRefs} = this
const reservedKeys = {
id: true,
pool: true,
ref: true,
type: true
}
const getKey = (key, obj) => reservedKeys[key] && obj === object
? `$$${key}`
: `$${key}`
// Creates resolved properties.
forEach(object, function resolveObject (value, key, object) {
if (isArray(value)) {
if (!value.length) {
// If the array is empty, it isn't possible to be sure that
// it is not supposed to contain links, therefore, in
// benefice of the doubt, a resolved property is defined.
defineProperty(object, getKey(key, object), {
value: EMPTY_ARRAY
})
// Minor memory optimization, use the same empty array for
// everyone.
object[key] = EMPTY_ARRAY
} else if (isOpaqueRef(value[0])) {
// This is an array of refs.
defineProperty(object, getKey(key, object), {
get: () => freezeObject(map(value, (ref) => objectsByRefs[ref]))
})
freezeObject(value)
}
} else if (isObject(value)) {
forEach(value, resolveObject)
freezeObject(value)
} else if (isOpaqueRef(value)) {
defineProperty(object, getKey(key, object), {
get: () => objectsByRefs[value]
})
}
})
// All custom properties are read-only and non enumerable.
defineProperties(object, {
$id: { value: object.uuid || ref },
$pool: { get: this._getPool },
$ref: { value: ref },
$type: { value: type }
})
// Finally freezes the object.
freezeObject(object)
const objects = this._objects
// An object's UUID can change during its life.
const prev = objectsByRefs[ref]
let prevUuid
if (prev && (prevUuid = prev.uuid) && prevUuid !== object.uuid) {
objects.remove(prevUuid)
}
this._objects.set(object)
objectsByRefs[ref] = object
if (type === 'pool') {
this._pool = object
} else if (type === 'task') {
const taskWatchers = this._taskWatchers
let taskWatcher = taskWatchers[ref]
if (
taskWatcher !== undefined &&
getTaskResult(object, taskWatcher.resolve, taskWatcher.reject)
) {
delete taskWatchers[ref]
}
}
return object
}
_removeObject (ref) {
const byRefs = this._objectsByRefs
const object = byRefs[ref]
if (object !== undefined) {
this._objects.unset(object.$id)
delete byRefs[ref]
}
const taskWatchers = this._taskWatchers
const taskWatcher = taskWatchers[ref]
if (taskWatcher !== undefined) {
taskWatcher.reject(new Error('task has been detroyed before completion'))
delete taskWatchers[ref]
}
}
_processEvents (events) {
const eventWatchers = this._eventWatchers
forEach(events, event => {
let object
const { ref } = event
if (event.operation === 'del') {
this._removeObject(ref)
} else {
object = this._addObject(event.class, ref, event.snapshot)
}
if (eventWatchers !== undefined) {
const watchers = eventWatchers[ref]
if (watchers !== undefined) {
const eventId = parseEventId(event.id)
const { length } = watchers
let i = 0
let current
while (
i < length &&
(current = watchers[i]).eventId <= eventId
) {
current.resolve(object)
++i
}
if (i === length) {
delete eventWatchers[ref]
} else {
watchers.splice(0, i)
}
}
}
})
}
_watchEvents () {
const loop = () => this.status === CONNECTED && this._sessionCall('event.from', [
['*'],
this._fromToken,
60 + 0.1 // Force float.
]).then(onSuccess, onFailure)
const onSuccess = ({token, events}) => {
this._fromToken = token
this._processEvents(events)
const debounce = this._debounce
return debounce != null
? pDelay(debounce).then(loop)
: loop()
}
const onFailure = error => {
if (areEventsLost(error)) {
this._fromToken = ''
this._objects.clear()
return loop()
}
throw error
}
return loop()::pCatch(
isMethodUnknown,
// If the server failed, it is probably due to an excessively
// large response.
// Falling back to legacy events watch should be enough.
error => error && error.res && error.res.statusCode === 500,
() => this._watchEventsLegacy()
)
}
// This method watches events using the legacy `event.next` XAPI
// methods.
//
// It also has to manually get all objects first.
_watchEventsLegacy () {
const getAllObjects = () => {
return this._sessionCall('system.listMethods', []).then(methods => {
// Uses introspection to determine the methods to use to get
// all objects.
const getAllRecordsMethods = filter(
methods,
::/\.get_all_records$/.test
)
return Promise.all(map(
getAllRecordsMethods,
method => this._sessionCall(method, []).then(
objects => {
const type = method.slice(0, method.indexOf('.')).toLowerCase()
forEach(objects, (object, ref) => {
this._addObject(type, ref, object)
})
},
error => {
if (error.code !== 'MESSAGE_REMOVED') {
throw error
}
}
)
))
})
}
const watchEvents = () => this._sessionCall('event.register', [ ['*'] ]).then(loop)
const loop = () => this.status === CONNECTED && this._sessionCall('event.next', []).then(onSuccess, onFailure)
const onSuccess = events => {
this._processEvents(events)
const debounce = this._debounce
return debounce == null
? loop()
: pDelay(debounce).then(loop)
}
const onFailure = error => {
if (areEventsLost(error)) {
return this._sessionCall('event.unregister', [ ['*'] ]).then(watchEvents)
}
throw error
}
return getAllObjects().then(watchEvents)
}
}
Xapi.prototype._transportCall = reduce([
function (method, args) {
return this._call(method, args).catch(error => {
if (isArray(error)) {
error = wrapError(error)
}
error.method = method
throw error
})
},
call => function () {
let tries = 1
const loop = () => call.apply(this, arguments)
::pCatch(isNetworkError, isXapiNetworkError, error => {
debug('%s: network error %s', this._humanId, error.code)
if (++tries < MAX_TRIES) {
// TODO: ability to cancel the connection
// TODO: ability to force immediate reconnection
// TODO: implement back-off
return pDelay(5e3).then(loop)
}
debug('%s too many network errors (%s), give up', this._humanId, tries)
// mark as disconnected
this.disconnect()::pCatch(noop)
throw error
})
return loop()
},
call => function loop () {
return call.apply(this, arguments)
::pCatch(isHostSlave, ({params: [master]}) => {
debug('%s: host is slave, attempting to connect at %s', this._humanId, master)
const newUrl = {
...this._url,
hostname: master
}
this.emit('redirect', newUrl)
this._url = newUrl
return loop.apply(this, arguments)
})
},
call => function (method) {
const startTime = Date.now()
return call.apply(this, arguments).then(
result => {
debug(
'%s: %s(...) [%s] ==> %s',
this._humanId,
method,
ms(Date.now() - startTime),
kindOf(result)
)
return result
},
error => {
debug(
'%s: %s(...) [%s] =!> %s',
this._humanId,
method,
ms(Date.now() - startTime),
error
)
throw error
}
)
}
], (call, decorator) => decorator(call))
// ===================================================================
// The default value is a factory function.
export const createClient = opts => new Xapi(opts)

View File

@ -0,0 +1,29 @@
#!/usr/bin/env node
import { delay as pDelay } from 'promise-toolbox'
import { createClient } from './'
const xapi = (() => {
const [ , , url, user, password ] = process.argv
return createClient({
auth: { user, password },
url,
watchEvents: false
})
})()
xapi.connect()
// Get the pool record's ref.
.then(() => xapi.call('pool.get_all'))
// Injects lots of events.
.then(([ poolRef ]) => {
const loop = () => xapi.call('event.inject', 'pool', poolRef)
::pDelay(10) // A small delay is required to avoid overloading the Xen API.
.then(loop)
return loop()
})

View File

@ -0,0 +1,22 @@
#!/usr/bin/env node
import { createClient } from './'
let i = 0
setInterval(() => {
const usage = process.memoryUsage()
console.log(
'%s %s %s %s',
i++,
Math.round(usage.rss / 1e6),
Math.round(usage.heapTotal / 1e6),
Math.round(usage.heapUsed / 1e6)
)
}, 1e2)
const [ , , url, user, password ] = process.argv
createClient({
auth: { user, password },
readOnly: true,
url
}).connect()

View File

@ -0,0 +1,3 @@
import makeError from 'make-error'
export const UnsupportedTransport = makeError('UnsupportedTransport')

View File

@ -0,0 +1,36 @@
import jsonRpc from './json-rpc'
import xmlRpc from './xml-rpc'
import xmlRpcJson from './xml-rpc-json'
import { UnsupportedTransport } from './_utils'
const factories = [ jsonRpc, xmlRpcJson, xmlRpc ]
const { length } = factories
export default opts => {
let i = 0
let call
function create () {
const current = factories[i++](opts)
if (i < length) {
const currentI = i
call = (method, args) => current(method, args).catch(
error => {
if (error instanceof UnsupportedTransport) {
if (currentI === i) { // not changed yet
create()
}
return call(method, args)
}
throw error
}
)
} else {
call = current
}
}
create()
return (method, args) => call(method, args)
}

View File

@ -0,0 +1,38 @@
import httpRequestPlus from 'http-request-plus'
import { format, parse } from 'json-rpc-protocol'
import { UnsupportedTransport } from './_utils'
export default ({ allowUnauthorized, url }) => {
return (method, args) => httpRequestPlus.post(url, {
rejectUnauthorized: !allowUnauthorized,
body: format.request(0, method, args),
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json'
},
path: '/jsonrpc'
}).readAll('utf8').then(
text => {
let response
try {
response = parse(text)
} catch (error) {
throw new UnsupportedTransport()
}
if (response.type === 'response') {
return response.result
}
throw response.error
},
error => {
if (error.response !== undefined) { // HTTP error
throw new UnsupportedTransport()
}
throw error
}
)
}

View File

@ -0,0 +1,97 @@
import { createClient, createSecureClient } from 'xmlrpc'
import { promisify } from 'promise-toolbox'
import { UnsupportedTransport } from './_utils'
const logError = error => {
if (error.res) {
console.error(
'XML-RPC Error: %s (response status %s)',
error.message,
error.res.statusCode
)
console.error('%s', error.body)
}
throw error
}
const SPECIAL_CHARS = {
'\r': '\\r',
'\t': '\\t'
}
const SPECIAL_CHARS_RE = new RegExp(
Object.keys(SPECIAL_CHARS).join('|'),
'g'
)
const parseResult = result => {
const status = result.Status
// Return the plain result if it does not have a valid XAPI
// format.
if (status === undefined) {
return result
}
if (status !== 'Success') {
throw result.ErrorDescription
}
const value = result.Value
// XAPI returns an empty string (invalid JSON) for an empty
// result.
if (value === '') {
return ''
}
try {
return JSON.parse(value)
} catch (error) {
// XAPI JSON sometimes contains invalid characters.
if (!(error instanceof SyntaxError)) {
throw error
}
}
let replaced = false
const fixedValue = value.replace(SPECIAL_CHARS_RE, match => {
replaced = true
return SPECIAL_CHARS[match]
})
if (replaced) {
try {
return JSON.parse(fixedValue)
} catch (error) {
if (!(error instanceof SyntaxError)) {
throw error
}
}
}
throw new UnsupportedTransport()
}
export default ({
allowUnauthorized,
url: { hostname, path, port, protocol }
}) => {
const client = (
protocol === 'https:'
? createSecureClient
: createClient
)({
host: hostname,
path: '/json',
port,
rejectUnauthorized: !allowUnauthorized
})
const call = promisify(client.methodCall, client)
return (method, args) => call(method, args).then(
parseResult,
logError
)
}

View File

@ -0,0 +1,52 @@
import { createClient, createSecureClient } from 'xmlrpc'
import { promisify } from 'promise-toolbox'
const logError = error => {
if (error.res) {
console.error(
'XML-RPC Error: %s (response status %s)',
error.message,
error.res.statusCode
)
console.error('%s', error.body)
}
throw error
}
const parseResult = result => {
const status = result.Status
// Return the plain result if it does not have a valid XAPI
// format.
if (status === undefined) {
return result
}
if (status !== 'Success') {
throw result.ErrorDescription
}
return result.Value
}
export default ({
allowUnauthorized,
url: { hostname, path, port, protocol }
}) => {
const client = (
protocol === 'https:'
? createSecureClient
: createClient
)({
host: hostname,
port,
rejectUnauthorized: !allowUnauthorized
})
const call = promisify(client.methodCall, client)
return (method, args) => call(method, args).then(
parseResult,
logError
)
}

3795
packages/xen-api/yarn.lock Normal file

File diff suppressed because it is too large Load Diff