feat(fs): add timeouts to atomic operations (#3534)

Fixes #3467
This commit is contained in:
Enishowk
2018-10-16 08:38:46 +00:00
committed by Julien Fontanet
parent a8ad13f60e
commit 61db0269a2
4 changed files with 162 additions and 34 deletions

View File

@@ -2,7 +2,7 @@
import getStream from 'get-stream'
import { randomBytes } from 'crypto'
import { fromCallback, fromEvent, ignoreErrors } from 'promise-toolbox'
import { fromCallback, fromEvent, ignoreErrors, timeout } from 'promise-toolbox'
import { type Readable, type Writable } from 'stream'
import { parse } from 'xo-remote-parser'
@@ -17,12 +17,18 @@ type File = FileDescriptor | string
const checksumFile = file => file + '.checksum'
export const DEFAULT_TIMEOUT = 10000
export default class RemoteHandlerAbstract {
_remote: Object
constructor (remote: any) {
this._remote = { ...remote, ...parse(remote.url) }
if (this._remote.type !== this.type) {
throw new Error('Incorrect remote type')
if (remote.url === 'test://') {
this._remote = remote
} else {
this._remote = { ...remote, ...parse(remote.url) }
if (this._remote.type !== this.type) {
throw new Error('Incorrect remote type')
}
}
}
@@ -121,7 +127,7 @@ export default class RemoteHandlerAbstract {
newPath: string,
{ checksum = false }: Object = {}
) {
let p = this._rename(oldPath, newPath)
let p = timeout.call(this._rename(oldPath, newPath), DEFAULT_TIMEOUT)
if (checksum) {
p = Promise.all([
p,
@@ -142,7 +148,7 @@ export default class RemoteHandlerAbstract {
prependDir = false,
}: { filter?: (name: string) => boolean, prependDir?: boolean } = {}
): Promise<string[]> {
let entries = await this._list(dir)
let entries = await timeout.call(this._list(dir), DEFAULT_TIMEOUT)
if (filter !== undefined) {
entries = entries.filter(filter)
}
@@ -165,28 +171,30 @@ export default class RemoteHandlerAbstract {
{ checksum = false, ignoreMissingChecksum = false, ...options }: Object = {}
): Promise<LaxReadable> {
const path = typeof file === 'string' ? file : file.path
const streamP = this._createReadStream(file, options).then(stream => {
// detect early errors
let promise = fromEvent(stream, 'readable')
const streamP = timeout
.call(this._createReadStream(file, options), DEFAULT_TIMEOUT)
.then(stream => {
// detect early errors
let promise = fromEvent(stream, 'readable')
// try to add the length prop if missing and not a range stream
if (
stream.length === undefined &&
options.end === undefined &&
options.start === undefined
) {
promise = Promise.all([
promise,
ignoreErrors.call(
this.getSize(file).then(size => {
stream.length = size
})
),
])
}
// try to add the length prop if missing and not a range stream
if (
stream.length === undefined &&
options.end === undefined &&
options.start === undefined
) {
promise = Promise.all([
promise,
ignoreErrors.call(
this.getSize(file).then(size => {
stream.length = size
})
),
])
}
return promise.then(() => stream)
})
return promise.then(() => stream)
})
if (!checksum) {
return streamP
@@ -224,7 +232,10 @@ export default class RemoteHandlerAbstract {
}
async openFile (path: string, flags?: string): Promise<FileDescriptor> {
return { fd: await this._openFile(path, flags), path }
return {
fd: await timeout.call(this._openFile(path, flags), DEFAULT_TIMEOUT),
path,
}
}
async _openFile (path: string, flags?: string): Promise<mixed> {
@@ -232,7 +243,7 @@ export default class RemoteHandlerAbstract {
}
async closeFile (fd: FileDescriptor): Promise<void> {
await this._closeFile(fd.fd)
await timeout.call(this._closeFile(fd.fd), DEFAULT_TIMEOUT)
}
async _closeFile (fd: mixed): Promise<void> {
@@ -252,10 +263,13 @@ export default class RemoteHandlerAbstract {
{ checksum = false, ...options }: Object = {}
): Promise<LaxWritable> {
const path = typeof file === 'string' ? file : file.path
const streamP = this._createOutputStream(file, {
flags: 'wx',
...options,
})
const streamP = timeout.call(
this._createOutputStream(file, {
flags: 'wx',
...options,
}),
DEFAULT_TIMEOUT
)
if (!checksum) {
return streamP
@@ -290,7 +304,7 @@ export default class RemoteHandlerAbstract {
ignoreErrors.call(this._unlink(checksumFile(file)))
}
await this._unlink(file)
await timeout.call(this._unlink(file), DEFAULT_TIMEOUT)
}
async _unlink (file: mixed): Promise<void> {
@@ -298,7 +312,7 @@ export default class RemoteHandlerAbstract {
}
async getSize (file: mixed): Promise<number> {
return this._getSize(file)
return timeout.call(this._getSize(file), DEFAULT_TIMEOUT)
}
async _getSize (file: mixed): Promise<number> {

View File

@@ -0,0 +1,111 @@
/* eslint-env jest */
import { TimeoutError } from 'promise-toolbox'
import AbstractHandler, { DEFAULT_TIMEOUT } from './abstract'
class TestHandler extends AbstractHandler {
constructor (impl) {
super({ url: 'test://' })
Object.keys(impl).forEach(method => {
this[`_${method}`] = impl[method]
})
}
}
describe('rename()', () => {
it(`return TimeoutError after ${DEFAULT_TIMEOUT} ms`, async () => {
const testHandler = new TestHandler({
rename: () => new Promise(() => {}),
})
const promise = testHandler.rename('oldPath', 'newPath')
jest.advanceTimersByTime(DEFAULT_TIMEOUT)
await expect(promise).rejects.toThrowError(TimeoutError)
})
})
describe('list()', () => {
it(`return TimeoutError after ${DEFAULT_TIMEOUT} ms`, async () => {
const testHandler = new TestHandler({
list: () => new Promise(() => {}),
})
const promise = testHandler.list()
jest.advanceTimersByTime(DEFAULT_TIMEOUT)
await expect(promise).rejects.toThrowError(TimeoutError)
})
})
describe('createReadStream()', () => {
it(`return TimeoutError after ${DEFAULT_TIMEOUT} ms`, async () => {
const testHandler = new TestHandler({
createReadStream: () => new Promise(() => {}),
})
const promise = testHandler.createReadStream('file')
jest.advanceTimersByTime(DEFAULT_TIMEOUT)
await expect(promise).rejects.toThrowError(TimeoutError)
})
})
describe('openFile()', () => {
it(`return TimeoutError after ${DEFAULT_TIMEOUT} ms`, async () => {
const testHandler = new TestHandler({
openFile: () => new Promise(() => {}),
})
const promise = testHandler.openFile('path')
jest.advanceTimersByTime(DEFAULT_TIMEOUT)
await expect(promise).rejects.toThrowError(TimeoutError)
})
})
describe('closeFile()', () => {
it(`return TimeoutError after ${DEFAULT_TIMEOUT} ms`, async () => {
const testHandler = new TestHandler({
closeFile: () => new Promise(() => {}),
})
const promise = testHandler.closeFile({ fd: undefined, path: '' })
jest.advanceTimersByTime(DEFAULT_TIMEOUT)
await expect(promise).rejects.toThrowError(TimeoutError)
})
})
describe('createOutputStream()', () => {
it(`return TimeoutError after ${DEFAULT_TIMEOUT} ms`, async () => {
const testHandler = new TestHandler({
createOutputStream: () => new Promise(() => {}),
})
const promise = testHandler.createOutputStream('File')
jest.advanceTimersByTime(DEFAULT_TIMEOUT)
await expect(promise).rejects.toThrowError(TimeoutError)
})
})
describe('unlink()', () => {
it(`return TimeoutError after ${DEFAULT_TIMEOUT} ms`, async () => {
const testHandler = new TestHandler({
unlink: () => new Promise(() => {}),
})
const promise = testHandler.unlink('')
jest.advanceTimersByTime(DEFAULT_TIMEOUT)
await expect(promise).rejects.toThrowError(TimeoutError)
})
})
describe('getSize()', () => {
it(`return TimeoutError after ${DEFAULT_TIMEOUT} ms`, async () => {
const testHandler = new TestHandler({
getSize: () => new Promise(() => {}),
})
const promise = testHandler.getSize('')
jest.advanceTimersByTime(DEFAULT_TIMEOUT)
await expect(promise).rejects.toThrowError(TimeoutError)
})
})

View File

@@ -15,9 +15,11 @@
### Bug fixes
- [Remotes] Fix removal of broken remotes [#3327](https://github.com/vatesfr/xen-orchestra/issues/3327) (PR [#3521](https://github.com/vatesfr/xen-orchestra/pull/3521))
- [Backups] Fix stuck backups due to broken NFS remotes [#3467](https://github.com/vatesfr/xen-orchestra/issues/3467) (PR [#3534](https://github.com/vatesfr/xen-orchestra/pull/3534))
### Released packages
- @xen-orchestra/fs v0.4.0
- vhd-lib v0.4.0
- xen-api v0.20.0
- xo-server-usage-report v0.7.0

View File

@@ -33,6 +33,7 @@
}
},
"jest": {
"timers": "fake",
"collectCoverage": true,
"projects": [
"<rootDir>"