feat(xen-api/{get,put}Resource): add inactivity detection (#4090)

This commit is contained in:
Julien Fontanet 2019-03-28 13:55:56 +01:00 committed by GitHub
parent e44dbfb2a4
commit b47e097983
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -134,6 +134,7 @@ export class Xapi extends EventEmitter {
this._allowUnauthorized = opts.allowUnauthorized this._allowUnauthorized = opts.allowUnauthorized
this._callTimeout = makeCallSetting(opts.callTimeout, 0) this._callTimeout = makeCallSetting(opts.callTimeout, 0)
this._httpInactivityTimeout = opts.httpInactivityTimeout ?? 5 * 60 * 1e3 // 5 mins
this._pool = null this._pool = null
this._readOnly = Boolean(opts.readOnly) this._readOnly = Boolean(opts.readOnly)
this._RecordsByType = createObject(null) this._RecordsByType = createObject(null)
@ -499,157 +500,159 @@ export class Xapi extends EventEmitter {
} }
@cancelable @cancelable
getResource($cancelToken, pathname, { host, query, task } = {}) { async getResource($cancelToken, pathname, { host, query, task } = {}) {
return this._autoTask(task, `Xapi#getResource ${pathname}`).then( const taskRef = await this._autoTask(task, `Xapi#getResource ${pathname}`)
taskRef => {
query = { ...query, session_id: this.sessionId }
let taskResult
if (taskRef !== undefined) {
query.task_id = taskRef
taskResult = this.watchTask(taskRef)
if (typeof $cancelToken.addHandler === 'function') { query = { ...query, session_id: this.sessionId }
$cancelToken.addHandler(() => taskResult)
}
}
let promise = pTimeout.call( let pTaskResult
httpRequest( if (taskRef !== undefined) {
$cancelToken, query.task_id = taskRef
this._url, pTaskResult = this.watchTask(taskRef)
host && {
hostname: this.getObject(host).address,
},
{
pathname,
query,
rejectUnauthorized: !this._allowUnauthorized,
}
),
HTTP_TIMEOUT
)
if (taskResult !== undefined) { if (typeof $cancelToken.addHandler === 'function') {
promise = promise.then(response => { $cancelToken.addHandler(() => pTaskResult)
response.task = taskResult }
return response }
})
}
return promise const response = await httpRequest(
$cancelToken,
this._url,
host !== undefined && {
hostname: this.getObject(host).address,
},
{
pathname,
query,
rejectUnauthorized: !this._allowUnauthorized,
// this is an inactivity timeout (unclear in Node doc)
timeout: this._httpInactivityTimeout,
} }
) )
if (pTaskResult !== undefined) {
response.task = pTaskResult
}
return response
} }
@cancelable @cancelable
putResource($cancelToken, body, pathname, { host, query, task } = {}) { async putResource($cancelToken, body, pathname, { host, query, task } = {}) {
if (this._readOnly) { if (this._readOnly) {
return Promise.reject( throw new Error('cannot put resource in read only mode')
new Error(new Error('cannot put resource in read only mode'))
)
} }
return this._autoTask(task, `Xapi#putResource ${pathname}`).then( const taskRef = await this._autoTask(task, `Xapi#putResource ${pathname}`)
taskRef => {
query = { ...query, session_id: this.sessionId }
let taskResult query = { ...query, session_id: this.sessionId }
if (taskRef !== undefined) {
query.task_id = taskRef
taskResult = this.watchTask(taskRef)
if (typeof $cancelToken.addHandler === 'function') { let pTaskResult
$cancelToken.addHandler(() => taskResult) if (taskRef !== undefined) {
} query.task_id = taskRef
} pTaskResult = this.watchTask(taskRef)
const headers = {} if (typeof $cancelToken.addHandler === 'function') {
$cancelToken.addHandler(() => pTaskResult)
}
}
// Xen API does not support chunk encoding. const headers = {}
const isStream = typeof body.pipe === 'function'
const { length } = body
if (isStream && length === undefined) {
// add a fake huge content length (1 PiB)
headers['content-length'] = '1125899906842624'
}
const doRequest = (...opts) => // XAPI does not support chunk encoding so there is no proper way to send
pTimeout.call( // data without knowing its length
httpRequest.put( //
$cancelToken, // as a work-around, a huge content length (1PiB) is added (so that the
this._url, // server won't prematurely cut the connection), and the connection will be
host && { // cut once all the data has been sent without waiting for a response
hostname: this.getObject(host).address, const isStream = typeof body.pipe === 'function'
}, const useHack = isStream && body.length === undefined
{ if (useHack) {
body, console.warn(
headers, this._humanId,
query, 'Xapi#putResource',
pathname, pathname,
rejectUnauthorized: !this._allowUnauthorized, 'missing length'
}, )
...opts
),
HTTP_TIMEOUT
)
// if a stream, sends a dummy request to probe for a headers['content-length'] = '1125899906842624'
// redirection before consuming body }
const promise = isStream
? doRequest({
body: '',
// omit task_id because this request will fail on purpose const doRequest = httpRequest.put.bind(
query: 'task_id' in query ? omit(query, 'task_id') : query, undefined,
$cancelToken,
this._url,
host !== undefined && {
hostname: this.getObject(host).address,
},
{
body,
headers,
pathname,
query,
rejectUnauthorized: !this._allowUnauthorized,
maxRedirects: 0, // this is an inactivity timeout (unclear in Node doc)
}).then( timeout: this._httpInactivityTimeout,
response => {
response.cancel()
return doRequest()
},
error => {
let response
if (error != null && (response = error.response) != null) {
response.cancel()
const {
headers: { location },
statusCode,
} = response
if (statusCode === 302 && location !== undefined) {
// ensure the original query is sent
return doRequest(location, { query })
}
}
throw error
}
)
: doRequest()
return promise.then(response => {
const { req } = response
if (taskResult !== undefined) {
taskResult = taskResult.catch(error => {
error.url = response.url
throw error
})
}
if (req.finished) {
response.cancel()
return taskResult
}
return fromEvents(req, ['close', 'finish']).then(() => {
response.cancel()
return taskResult
})
})
} }
) )
// if body is a stream, sends a dummy request to probe for a redirection
// before consuming body
const response = await (isStream
? doRequest({
body: '',
// omit task_id because this request will fail on purpose
query: 'task_id' in query ? omit(query, 'task_id') : query,
maxRedirects: 0,
}).then(
response => {
response.cancel()
return doRequest()
},
error => {
let response
if (error != null && (response = error.response) != null) {
response.cancel()
const {
headers: { location },
statusCode,
} = response
if (statusCode === 302 && location !== undefined) {
// ensure the original query is sent
return doRequest(location, { query })
}
}
throw error
}
)
: doRequest())
if (pTaskResult !== undefined) {
pTaskResult = pTaskResult.catch(error => {
error.url = response.url
throw error
})
}
if (!useHack) {
// consume the response
response.resume()
return pTaskResult
}
const { req } = response
if (!req.finished) {
await fromEvents(req, ['close', 'finish'])
}
response.cancel()
return pTaskResult
} }
watchTask(ref) { watchTask(ref) {