From b47e097983999e6409239ae7cd9db1cb56c44fa3 Mon Sep 17 00:00:00 2001 From: Julien Fontanet Date: Thu, 28 Mar 2019 13:55:56 +0100 Subject: [PATCH] feat(xen-api/{get,put}Resource): add inactivity detection (#4090) --- packages/xen-api/src/index.js | 261 +++++++++++++++++----------------- 1 file changed, 132 insertions(+), 129 deletions(-) diff --git a/packages/xen-api/src/index.js b/packages/xen-api/src/index.js index b82e6f2ae..253f1ff9d 100644 --- a/packages/xen-api/src/index.js +++ b/packages/xen-api/src/index.js @@ -134,6 +134,7 @@ export class Xapi extends EventEmitter { this._allowUnauthorized = opts.allowUnauthorized this._callTimeout = makeCallSetting(opts.callTimeout, 0) + this._httpInactivityTimeout = opts.httpInactivityTimeout ?? 5 * 60 * 1e3 // 5 mins this._pool = null this._readOnly = Boolean(opts.readOnly) this._RecordsByType = createObject(null) @@ -499,157 +500,159 @@ export class Xapi extends EventEmitter { } @cancelable - getResource($cancelToken, pathname, { host, query, task } = {}) { - return this._autoTask(task, `Xapi#getResource ${pathname}`).then( - taskRef => { - query = { ...query, session_id: this.sessionId } - let taskResult - if (taskRef !== undefined) { - query.task_id = taskRef - taskResult = this.watchTask(taskRef) + async getResource($cancelToken, pathname, { host, query, task } = {}) { + const taskRef = await this._autoTask(task, `Xapi#getResource ${pathname}`) - if (typeof $cancelToken.addHandler === 'function') { - $cancelToken.addHandler(() => taskResult) - } - } + query = { ...query, session_id: this.sessionId } - let promise = pTimeout.call( - httpRequest( - $cancelToken, - this._url, - host && { - hostname: this.getObject(host).address, - }, - { - pathname, - query, - rejectUnauthorized: !this._allowUnauthorized, - } - ), - HTTP_TIMEOUT - ) + let pTaskResult + if (taskRef !== undefined) { + query.task_id = taskRef + pTaskResult = this.watchTask(taskRef) - if (taskResult !== undefined) { - promise = promise.then(response => { - response.task = taskResult - return response - }) - } + if (typeof $cancelToken.addHandler === 'function') { + $cancelToken.addHandler(() => pTaskResult) + } + } - return promise + const response = await httpRequest( + $cancelToken, + this._url, + host !== undefined && { + hostname: this.getObject(host).address, + }, + { + pathname, + query, + rejectUnauthorized: !this._allowUnauthorized, + + // this is an inactivity timeout (unclear in Node doc) + timeout: this._httpInactivityTimeout, } ) + + if (pTaskResult !== undefined) { + response.task = pTaskResult + } + + return response } @cancelable - putResource($cancelToken, body, pathname, { host, query, task } = {}) { + async putResource($cancelToken, body, pathname, { host, query, task } = {}) { if (this._readOnly) { - return Promise.reject( - new Error(new Error('cannot put resource in read only mode')) - ) + throw new Error('cannot put resource in read only mode') } - return this._autoTask(task, `Xapi#putResource ${pathname}`).then( - taskRef => { - query = { ...query, session_id: this.sessionId } + const taskRef = await this._autoTask(task, `Xapi#putResource ${pathname}`) - let taskResult - if (taskRef !== undefined) { - query.task_id = taskRef - taskResult = this.watchTask(taskRef) + query = { ...query, session_id: this.sessionId } - if (typeof $cancelToken.addHandler === 'function') { - $cancelToken.addHandler(() => taskResult) - } - } + let pTaskResult + if (taskRef !== undefined) { + query.task_id = taskRef + pTaskResult = this.watchTask(taskRef) - const headers = {} + if (typeof $cancelToken.addHandler === 'function') { + $cancelToken.addHandler(() => pTaskResult) + } + } - // Xen API does not support chunk encoding. - const isStream = typeof body.pipe === 'function' - const { length } = body - if (isStream && length === undefined) { - // add a fake huge content length (1 PiB) - headers['content-length'] = '1125899906842624' - } + const headers = {} - const doRequest = (...opts) => - pTimeout.call( - httpRequest.put( - $cancelToken, - this._url, - host && { - hostname: this.getObject(host).address, - }, - { - body, - headers, - query, - pathname, - rejectUnauthorized: !this._allowUnauthorized, - }, - ...opts - ), - HTTP_TIMEOUT - ) + // XAPI does not support chunk encoding so there is no proper way to send + // data without knowing its length + // + // as a work-around, a huge content length (1PiB) is added (so that the + // server won't prematurely cut the connection), and the connection will be + // cut once all the data has been sent without waiting for a response + const isStream = typeof body.pipe === 'function' + const useHack = isStream && body.length === undefined + if (useHack) { + console.warn( + this._humanId, + 'Xapi#putResource', + pathname, + 'missing length' + ) - // if a stream, sends a dummy request to probe for a - // redirection before consuming body - const promise = isStream - ? doRequest({ - body: '', + headers['content-length'] = '1125899906842624' + } - // omit task_id because this request will fail on purpose - query: 'task_id' in query ? omit(query, 'task_id') : query, + const doRequest = httpRequest.put.bind( + undefined, + $cancelToken, + this._url, + host !== undefined && { + hostname: this.getObject(host).address, + }, + { + body, + headers, + pathname, + query, + rejectUnauthorized: !this._allowUnauthorized, - maxRedirects: 0, - }).then( - response => { - response.cancel() - return doRequest() - }, - error => { - let response - if (error != null && (response = error.response) != null) { - response.cancel() - - const { - headers: { location }, - statusCode, - } = response - if (statusCode === 302 && location !== undefined) { - // ensure the original query is sent - return doRequest(location, { query }) - } - } - - throw error - } - ) - : doRequest() - - return promise.then(response => { - const { req } = response - - if (taskResult !== undefined) { - taskResult = taskResult.catch(error => { - error.url = response.url - throw error - }) - } - - if (req.finished) { - response.cancel() - return taskResult - } - - return fromEvents(req, ['close', 'finish']).then(() => { - response.cancel() - return taskResult - }) - }) + // this is an inactivity timeout (unclear in Node doc) + timeout: this._httpInactivityTimeout, } ) + + // if body is a stream, sends a dummy request to probe for a redirection + // before consuming body + const response = await (isStream + ? doRequest({ + body: '', + + // omit task_id because this request will fail on purpose + query: 'task_id' in query ? omit(query, 'task_id') : query, + + maxRedirects: 0, + }).then( + response => { + response.cancel() + return doRequest() + }, + error => { + let response + if (error != null && (response = error.response) != null) { + response.cancel() + + const { + headers: { location }, + statusCode, + } = response + if (statusCode === 302 && location !== undefined) { + // ensure the original query is sent + return doRequest(location, { query }) + } + } + + throw error + } + ) + : doRequest()) + + if (pTaskResult !== undefined) { + pTaskResult = pTaskResult.catch(error => { + error.url = response.url + throw error + }) + } + + if (!useHack) { + // consume the response + response.resume() + + return pTaskResult + } + + const { req } = response + if (!req.finished) { + await fromEvents(req, ['close', 'finish']) + } + response.cancel() + return pTaskResult } watchTask(ref) {