Custom HTTP request implementation instead of got.

This commit is contained in:
Julien Fontanet 2016-01-16 16:56:51 +01:00
parent fc82f185cb
commit 23f1965398
7 changed files with 207 additions and 135 deletions

View File

@ -55,7 +55,7 @@
"fatfs": "^0.10.3",
"fs-extra": "^0.26.2",
"fs-promise": "^0.3.1",
"got": "^5.0.0",
"get-stream": "^1.1.0",
"graceful-fs": "^4.1.2",
"hashy": "~0.4.2",
"highland": "^2.5.1",

View File

@ -3,7 +3,6 @@ $find = require 'lodash.find'
$findIndex = require 'lodash.findindex'
$forEach = require 'lodash.foreach'
endsWith = require 'lodash.endswith'
got = require('got')
startsWith = require 'lodash.startswith'
{coroutine: $coroutine} = require 'bluebird'
{

View File

@ -6,7 +6,6 @@ $isArray = require 'lodash.isarray'
endsWith = require 'lodash.endswith'
escapeStringRegexp = require 'escape-string-regexp'
eventToPromise = require 'event-to-promise'
got = require('got')
sortBy = require 'lodash.sortby'
startsWith = require 'lodash.startswith'
{coroutine: $coroutine} = require 'bluebird'
@ -767,17 +766,16 @@ handleExport = $coroutine (req, res, {xapi, id, compress, onlyMetadata}) ->
compress: compress ? true,
onlyMetadata: onlyMetadata ? false
})
upstream = stream.response
res.on('close', () ->
stream.cancel()
)
# Remove the filename as it is already part of the URL.
upstream.headers['content-disposition'] = 'attachment'
stream.headers['content-disposition'] = 'attachment'
res.writeHead(
upstream.statusCode,
upstream.statusMessage ? '',
upstream.headers
stream.statusCode,
stream.statusMessage ? '',
stream.headers
)
stream.pipe(res)
return
@ -818,14 +816,8 @@ handleVmImport = $coroutine (req, res, { xapi, srId }) ->
# See https://github.com/nodejs/node/issues/3319
req.setTimeout(43200000) # 12 hours
contentLength = req.headers['content-length']
if !contentLength
res.writeHead(411)
res.end('Content length is mandatory')
return
try
vm = yield xapi.importVm(req, contentLength, { srId })
vm = yield xapi.importVm(req, { srId })
res.end(format.response(0, vm.$id))
catch e
res.writeHead(500)

121
src/http-request.js Normal file
View File

@ -0,0 +1,121 @@
import assign from 'lodash.assign'
import getStream from 'get-stream'
import isString from 'lodash.isstring'
import startsWith from 'lodash.startswith'
import { parse as parseUrl } from 'url'
import { request as httpRequest } from 'http'
import { request as httpsRequest } from 'https'
import { stringify as formatQueryString } from 'querystring'
// -------------------------------------------------------------------
export default (...args) => {
let req
const pResponse = new Promise((resolve, reject) => {
const opts = {}
for (let i = 0, { length } = args; i < length; ++i) {
const arg = args[i]
assign(opts, isString(arg) ? parseUrl(arg) : arg)
}
const {
body,
headers: { ...headers } = {},
protocol,
query,
...rest
} = opts
if (headers['content-length'] == null && body != null) {
let tmp
if (isString(body)) {
headers['content-length'] = Buffer.byteLength(body)
} else if (
(
(tmp = body.headers) &&
(tmp = tmp['content-length']) != null
) ||
(tmp = body.length) != null
) {
headers['content-length'] = tmp
}
}
if (query) {
rest.path = `${rest.pathname || rest.path || '/'}?${
isString(query)
? query
: formatQueryString(query)
}`
}
// Some headers can be explicitly removed by setting them to null.
const headersToRemove = []
for (const header in headers) {
if (headers[header] === null) {
delete headers[header]
headersToRemove.push(header)
}
}
req = (
protocol && startsWith(protocol.toLowerCase(), 'https')
? httpsRequest
: httpRequest
)({
...rest,
headers
})
for (let i = 0, { length } = headersToRemove; i < length; ++i) {
req.removeHeader(headersToRemove[i])
}
if (body) {
if (typeof body.pipe === 'function') {
body.pipe(req)
} else {
req.end(body)
}
} else {
req.end()
}
req.on('error', reject)
req.once('response', resolve)
}).then(response => {
response.cancel = () => {
req.abort()
}
response.readAll = () => getStream(response)
const length = response.headers['content-length']
if (length) {
response.length = length
}
const code = response.statusCode
if (code < 200 || code >= 300) {
const error = new Error(response.statusMessage)
error.code = code
Object.defineProperty(error, 'response', {
configurable: true,
value: response,
writable: true
})
throw error
}
return response
})
pResponse.cancel = () => {
req.emit('error', new Error('HTTP request canceled!'))
req.abort()
}
pResponse.readAll = () => pResponse.then(response => response.readAll())
pResponse.request = req
return pResponse
}

View File

@ -1,8 +1,8 @@
import endsWith from 'lodash.endswith'
import got from 'got'
import JSON5 from 'json5'
import { BaseError } from 'make-error'
import httpRequest from './http-request'
import { parseDateTime } from './xapi'
const RRD_STEP_SECONDS = 5
@ -391,8 +391,8 @@ export default class XapiStats {
// Execute one http request on a XenServer for get stats
// Return stats (Json format) or throws got exception
async _getJson (url) {
const response = await got(url, { rejectUnauthorized: false })
return JSON5.parse(response.body)
const body = await httpRequest(url, { rejectUnauthorized: false }).readAll()
return JSON5.parse(body)
}
async _getLastTimestamp (xapi, host, step) {

View File

@ -1,7 +1,6 @@
import createDebug from 'debug'
import eventToPromise from 'event-to-promise'
import find from 'lodash.find'
import got from 'got'
import includes from 'lodash.includes'
import isFunction from 'lodash.isfunction'
import sortBy from 'lodash.sortby'
@ -9,14 +8,13 @@ import fatfs from 'fatfs'
import fatfsBuffer, { init as fatfsBufferInit } from './fatfs-buffer'
import unzip from 'julien-f-unzip'
import { PassThrough } from 'stream'
import { request as httpRequest } from 'http'
import { stringify as formatQueryString } from 'querystring'
import { utcFormat, utcParse } from 'd3-time-format'
import {
wrapError as wrapXapiError,
Xapi as XapiBase
} from 'xen-api'
import httpRequest from './http-request'
import {debounce} from './decorators'
import {
bufferToStream,
@ -50,6 +48,37 @@ function extractOpaqueRef (str) {
return matches[0]
}
// HTTP put, use an ugly hack if the length is not known because XAPI
// does not support chunk encoding.
const put = (stream, {
headers: { ...headers } = {},
...opts
}) => {
const { length } = stream
if (length == null) {
headers['transfer-encoding'] = null
} else {
headers['content-length'] = length
}
const promise = httpRequest({
...opts,
body: stream,
headers,
method: 'put'
})
if (length != null || !promise.request) {
return promise.readAll()
}
promise.request.once('finish', () => {
promise.cancel()
})
return promise.catch(() => new Buffer(0))
}
// ===================================================================
const typeToNamespace = createRawObject()
@ -376,7 +405,7 @@ export default class Xapi extends XapiBase {
// FIXME: should be static
@debounce(24 * 60 * 60 * 1000)
async _getXenUpdates () {
const {body, statusCode} = await got(
const { readAll, statusCode } = await httpRequest(
'http://updates.xensource.com/XenServer/updates.xml'
)
@ -384,7 +413,7 @@ export default class Xapi extends XapiBase {
throw new JsonRpcError('cannot fetch patches list from Citrix')
}
const {patchdata: data} = parseXml(body)
const data = parseXml(await readAll()).patchdata
const patches = createRawObject()
forEach(data.patches.patch, patch => {
@ -536,22 +565,19 @@ export default class Xapi extends XapiBase {
// -----------------------------------------------------------------
async uploadPoolPatch (stream, length, patchName = 'unknown') {
async uploadPoolPatch (stream, patchName = 'unknown') {
const taskRef = await this._createTask('Upload: ' + patchName)
const [, patchRef] = await Promise.all([
got('http://' + this.pool.$master.address + '/pool_patch_upload', {
method: 'put',
body: stream,
const [ patchRef ] = await Promise.all([
this._watchTask(taskRef),
put(stream, {
hostname: this.pool.$master.address,
path: '/pool_patch_upload',
query: {
session_id: this.sessionId,
task_id: taskRef
},
headers: {
'content-length': length
}
}),
this._watchTask(taskRef)
})
])
return this._getOrWaitObject(patchRef)
@ -571,10 +597,9 @@ export default class Xapi extends XapiBase {
const PATCH_RE = /\.xsupdate$/
const proxy = new PassThrough()
got.stream(patchInfo.url).on('error', error => {
// TODO: better error handling
console.error(error)
}).pipe(unzip.Parse()).on('entry', entry => {
const stream = await httpRequest(patchInfo.url)
stream.pipe(unzip.Parse()).on('entry', entry => {
if (PATCH_RE.test(entry.path)) {
proxy.emit('length', entry.size)
entry.pipe(proxy)
@ -824,7 +849,6 @@ export default class Xapi extends XapiBase {
const vm = await targetXapi._getOrWaitObject(
await targetXapi._importVm(
stream,
stream.length,
sr,
false,
onVmCreation
@ -1062,10 +1086,9 @@ export default class Xapi extends XapiBase {
})
}
const stream = got.stream({
return httpRequest({
hostname: host.address,
path: onlyMetadata ? '/export_metadata/' : '/export/'
}, {
path: onlyMetadata ? '/export_metadata/' : '/export/',
query: {
ref: snapshotRef || vm.$ref,
session_id: this.sessionId,
@ -1073,28 +1096,6 @@ export default class Xapi extends XapiBase {
use_compression: compress ? 'true' : 'false'
}
})
const [ request, response ] = await Promise.all([
eventToPromise(stream, 'request'),
eventToPromise(stream, 'response')
])
// Provide a way to cancel the operation.
stream.cancel = () => {
request.abort()
}
const { headers: {
'content-length': length
} } = response
if (length) {
stream.length = length
}
// TODO: remove when no longer used.
stream.response = response
return stream
}
async _migrateVMWithStorageMotion (vm, hostXapi, host, {
@ -1129,30 +1130,7 @@ export default class Xapi extends XapiBase {
)
}
async _putWithoutLength (stream, hostname, path, query) {
const request = httpRequest({
hostname,
method: 'PUT',
path: `${path}?${formatQueryString(query)}`
})
request.removeHeader('transfer-encoding')
stream.pipe(request)
// An error can occur after the finish event and therefore will
// not be handled by eventToPromise().
//
// As a consequence, add an empty error handler to avoid a crash.
request.on('error', noop)
await eventToPromise(request, 'finish')
// The request will never finish because the XAPI has no way to no
// it is finished, therefore it will never send a response.
request.abort()
}
async _importVm (stream, length, sr, onlyMetadata = false, onVmCreation = undefined) {
async _importVm (stream, sr, onlyMetadata = false, onVmCreation = undefined) {
const taskRef = await this._createTask('VM import')
const query = {
force: onlyMetadata
@ -1172,17 +1150,6 @@ export default class Xapi extends XapiBase {
const path = onlyMetadata ? '/import_metadata/' : '/import/'
const upload = length
? got.put({
hostname: host.address,
path
}, {
body: stream,
headers: { 'content-length': length },
query
})
: this._putWithoutLength(stream, host.address, path, query)
if (onVmCreation) {
this._waitObject(
obj => obj && obj.current_operations && taskRef in obj.current_operations
@ -1191,7 +1158,11 @@ export default class Xapi extends XapiBase {
const [ vmRef ] = await Promise.all([
this._watchTask(taskRef).then(extractOpaqueRef),
upload
put(stream, {
hostname: host.address,
path,
query
})
])
// Importing a metadata archive of running VMs is currently
@ -1206,13 +1177,12 @@ export default class Xapi extends XapiBase {
}
// TODO: an XVA can contain multiple VMs
async importVm (stream, length, {
async importVm (stream, {
onlyMetadata = false,
srId
} = {}) {
return await this._getOrWaitObject(await this._importVm(
stream,
length,
srId && this.getObject(srId),
onlyMetadata
))
@ -1637,26 +1607,16 @@ export default class Xapi extends XapiBase {
if (baseId) {
query.base = this.getObject(baseId).$ref
}
const stream = got.stream({
return httpRequest({
hostname: host.address,
path: '/export_raw_vdi/'
}, {
path: '/export_raw_vdi/',
query
})
const request = await eventToPromise(stream, 'request')
// Provide a way to cancel the operation.
stream.cancel = () => {
request.abort()
}
return stream
}
// -----------------------------------------------------------------
async importVdiContent (vdiId, stream, { length, format = VDI_FORMAT_VHD } = {}) {
async importVdiContent (vdiId, stream, { format = VDI_FORMAT_VHD } = {}) {
const vdi = this.getObject(vdiId)
const taskRef = await this._createTask('VDI import')
@ -1669,16 +1629,12 @@ export default class Xapi extends XapiBase {
const host = vdi.$SR.$PBDs[0].$host
const upload = length
? got.put({
hostname: host.address,
path: '/import_raw_vdi/'
}, {
body: stream,
headers: { 'content-length': length },
query
})
: this._putWithoutLength(stream, host.address, '/import_raw_vdi/', query)
const upload = put(stream, {
hostname: host.address,
method: 'put',
path: '/import_raw_vdi/',
query
})
await Promise.all([
upload,

View File

@ -81,6 +81,7 @@ export default class {
return backups
}
// TODO: move into utils and rename!
async _openAndwaitReadableFile (path, errorMessage) {
const stream = createReadStream(path)
@ -93,20 +94,22 @@ export default class {
throw error
}
const stats = await stat(path)
stream.length = (await stat(path)).size
return [ stream, stats.size ]
return stream
}
async importVmBackup (remoteId, file, sr) {
const remote = await this._xo.getRemote(remoteId)
const path = `${remote.path}/${file}`
const [ stream, length ] = await this._openAndwaitReadableFile(
path, 'VM to import not found in this remote')
const stream = await this._openAndwaitReadableFile(
path,
'VM to import not found in this remote'
)
const xapi = this._xo.getXapi(sr)
await xapi.importVm(stream, length, { srId: sr._xapiId })
await xapi.importVm(stream, { srId: sr._xapiId })
}
// -----------------------------------------------------------------
@ -239,12 +242,12 @@ export default class {
}
async _importVdiBackupContent (xapi, file, vdiId) {
const [ stream, length ] = await this._openAndwaitReadableFile(
file, 'VDI to import not found in this remote'
const stream = await this._openAndwaitReadableFile(
file,
'VDI to import not found in this remote'
)
await xapi.importVdiContent(vdiId, stream, {
length,
format: VDI_FORMAT_VHD
})
}
@ -421,10 +424,11 @@ export default class {
}
async _importVmMetadata (xapi, file) {
const [ stream, length ] = await this._openAndwaitReadableFile(
file, 'VM metadata to import not found in this remote'
const stream = await this._openAndwaitReadableFile(
file,
'VM metadata to import not found in this remote'
)
return await xapi.importVm(stream, length, { onlyMetadata: true })
return await xapi.importVm(stream, { onlyMetadata: true })
}
async _importDeltaVdiBackupFromVm (xapi, vmId, remoteId, directory, vdiInfo) {