make cancelToken an keyword optional argument.

This commit is contained in:
Nicolas Raynaud
2021-02-03 15:43:47 +01:00
parent 2f7af5c05a
commit 78dc03e23e
3 changed files with 12 additions and 8 deletions

View File

@@ -4,6 +4,7 @@
import getStream from 'get-stream'
import asyncMap from '@xen-orchestra/async-map'
import CancelToken from 'promise-toolbox/CancelToken'
import limit from 'limit-concurrency-decorator'
import path, { basename } from 'path'
import synchronized from 'decorator-synchronized'
@@ -173,15 +174,15 @@ export default class RemoteHandlerAbstract {
// write a stream to a file using a temporary file
async outputStream(
cancelToken,
input: Readable | Promise<Readable>,
path: string,
{ checksum = true, dirMode }: { checksum?: boolean, dirMode?: number } = {}
{ checksum = true, dirMode, cancelToken = CancelToken.none }: { checksum?: boolean, dirMode?: number } = {}
): Promise<void> {
path = normalizePath(path)
return this._outputStream(cancelToken, await input, normalizePath(path), {
return this._outputStream(await input, normalizePath(path), {
checksum,
dirMode,
cancelToken,
})
}
@@ -477,10 +478,9 @@ export default class RemoteHandlerAbstract {
}
async _outputStream(
cancelToken,
input: Readable,
path: string,
{ checksum, dirMode }: { checksum?: boolean, dirMode?: number }
{ checksum, dirMode, cancelToken = CancelToken.none }: { checksum?: boolean, dirMode?: number }
) {
const tmpPath = `${dirname(path)}/.${basename(path)}`
const output = await this._createOutputStreamChecksum(tmpPath, { checksum })

View File

@@ -5,6 +5,7 @@ import { parse } from 'xo-remote-parser'
import RemoteHandlerAbstract from './abstract'
import { createChecksumStream } from './checksum'
import CancelToken from 'promise-toolbox/CancelToken'
// endpoints https://docs.aws.amazon.com/general/latest/gr/s3.html
@@ -51,7 +52,10 @@ export default class S3Handler extends RemoteHandlerAbstract {
return { Bucket: this._bucket, Key: this._dir + file }
}
async _outputStream(cancelToken, input, path, { checksum }) {
async _outputStream(input, path, { checksum, cancelToken = CancelToken.none }) {
cancelToken.promise.then(reason => {
input.destroy(reason)
})
let inputStream = input
if (checksum) {
const checksumStream = createChecksumStream()

View File

@@ -1,7 +1,7 @@
// @flow
import asyncMap from '@xen-orchestra/async-map'
import createLogger from '@xen-orchestra/log'
import { fromEvent, ignoreErrors, timeout } from 'promise-toolbox'
import { ignoreErrors, timeout } from 'promise-toolbox'
import { parseDuration } from '@vates/parse-duration'
import { debounceWithKey, REMOVE_CACHE_ENTRY } from '../_pDebounceWithKey'
@@ -309,7 +309,7 @@ export default class metadataBackup {
await waitAll([
(async () => {
return timeout.call(
handler.outputStream(cancelToken, stream, fileName).catch(error => {
handler.outputStream(stream, fileName, { cancelToken }).catch(error => {
stream.destroy()
if (error.message !== 'aborted') {
throw error