begin introducing copyFileRange() method to remotes.
This commit is contained in:
@@ -1,22 +1,14 @@
|
||||
import execa from 'execa'
|
||||
import fs from 'fs-extra'
|
||||
import { ignoreErrors, fromCallback } from 'promise-toolbox'
|
||||
import { ignoreErrors } from 'promise-toolbox'
|
||||
import { join } from 'path'
|
||||
import { tmpdir } from 'os'
|
||||
|
||||
import LocalHandler from './local'
|
||||
import { Syscall6 } from 'syscall'
|
||||
import normalizePath from './_normalizePath'
|
||||
import { randomBytes } from 'crypto'
|
||||
|
||||
const sudoExeca = (command, args, opts) =>
|
||||
execa('sudo', [command, ...args], opts)
|
||||
|
||||
const computeRate = (hrtime: number[], size: number) => {
|
||||
const seconds = hrtime[0] + hrtime[1] / 1e9
|
||||
return size / seconds
|
||||
}
|
||||
|
||||
export default class MountHandler extends LocalHandler {
|
||||
constructor(
|
||||
remote,
|
||||
@@ -64,71 +56,6 @@ export default class MountHandler extends LocalHandler {
|
||||
return this._realPath
|
||||
}
|
||||
|
||||
async test(): Promise<Object> {
|
||||
/**
|
||||
* @returns the number of byte effectively copied, needs to be called in a loop!
|
||||
*/
|
||||
function copy_file_range(fdIn, offsetIn, fdOut, offsetOut, dataLen, flags = 0) {
|
||||
// we are stuck on linux x86_64
|
||||
function wrapOffset(offsetIn) {
|
||||
if (offsetIn == null)
|
||||
return 0
|
||||
const offsetInBuffer = new Uint32Array(2)
|
||||
new DataView(offsetInBuffer.buffer).setBigUint64(0, BigInt(offsetIn), true)
|
||||
return offsetInBuffer
|
||||
}
|
||||
|
||||
const SYS_copy_file_range = 326
|
||||
const [ret, _, errno] = Syscall6(SYS_copy_file_range, fdIn, wrapOffset(offsetIn), fdOut, wrapOffset(offsetOut), data.byteLength, 0)
|
||||
if (errno !== 0) {
|
||||
throw new Error('Error no', errno)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
const SIZE = 1024 * 1024 * 10
|
||||
const testFileName = normalizePath(`${Date.now()}.test`)
|
||||
const testFileName2 = normalizePath(`${Date.now()}__2.test`)
|
||||
const data = await fromCallback(randomBytes, SIZE)
|
||||
let step = 'write'
|
||||
try {
|
||||
const writeStart = process.hrtime()
|
||||
await this._outputFile(testFileName, data, { flags: 'wx' })
|
||||
const writeDuration = process.hrtime(writeStart)
|
||||
step = 'duplicate'
|
||||
const fd1 = await this._openFile(testFileName, 'r')
|
||||
const fd2 = await this._openFile(testFileName2, 'w')
|
||||
console.log('_openFile', fd1, fd2, data.byteLength)
|
||||
|
||||
const res = copy_file_range(fd1, 0, fd2, null, data.byteLength, 0)
|
||||
console.log('copy_file_range', res)
|
||||
await this._closeFile(fd2)
|
||||
|
||||
step = 'read'
|
||||
const readStart = process.hrtime()
|
||||
const read = await this._readFile(testFileName, { flags: 'r' })
|
||||
const readDuration = process.hrtime(readStart)
|
||||
|
||||
if (!data.equals(read)) {
|
||||
throw new Error('output and input did not match')
|
||||
}
|
||||
return {
|
||||
success: true,
|
||||
writeRate: computeRate(writeDuration, SIZE),
|
||||
readRate: computeRate(readDuration, SIZE),
|
||||
}
|
||||
} catch (error) {
|
||||
return {
|
||||
success: false,
|
||||
step,
|
||||
file: testFileName,
|
||||
error: error.message || String(error),
|
||||
}
|
||||
} finally {
|
||||
ignoreErrors.call(this._unlink(testFileName))
|
||||
}
|
||||
}
|
||||
|
||||
async _sync() {
|
||||
// in case of multiple `sync`s, ensure we properly close previous keeper
|
||||
{
|
||||
|
||||
@@ -311,6 +311,26 @@ export default class RemoteHandlerAbstract {
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Slightly different from the copy_file_range linux system call:
|
||||
* - offsets are mandatory (because some remote handlers don't have a current pointer for files)
|
||||
* - flags is fixed to 0
|
||||
* - will not return until copy is finished.
|
||||
*
|
||||
* @param fdIn read open file descriptor
|
||||
* @param offsetIn either start offset in the source file
|
||||
* @param fdOut write open file descriptor (not append!)
|
||||
* @param offsetOut offset in the target file
|
||||
* @param dataLen how long to copy
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
async copyFileRange(fdIn, offsetIn, fdOut, offsetOut, dataLen) {
|
||||
// default implementation goes through the network
|
||||
const buffer = Buffer.alloc(dataLen)
|
||||
await this._read(fdIn, buffer, offsetIn)
|
||||
await this._write(fdOut, buffer, offsetOut)
|
||||
}
|
||||
|
||||
async readFile(
|
||||
file: string,
|
||||
{ flags = 'r' }: { flags?: string } = {}
|
||||
@@ -358,13 +378,30 @@ export default class RemoteHandlerAbstract {
|
||||
|
||||
async test(): Promise<Object> {
|
||||
const SIZE = 1024 * 1024 * 10
|
||||
const testFileName = normalizePath(`${Date.now()}.test`)
|
||||
const now = Date.now()
|
||||
const testFileName = normalizePath(`${now}.test`)
|
||||
const testFileName2 = normalizePath(`${now}__dup.test`)
|
||||
const data = await fromCallback(randomBytes, SIZE)
|
||||
let step = 'write'
|
||||
try {
|
||||
const writeStart = process.hrtime()
|
||||
await this._outputFile(testFileName, data, { flags: 'wx' })
|
||||
const writeDuration = process.hrtime(writeStart)
|
||||
step = 'duplicate'
|
||||
const fd1 = await this.openFile(testFileName, 'r')
|
||||
try {
|
||||
const fd2 = await this.openFile(testFileName2, 'wx')
|
||||
try {
|
||||
const cloneStart = process.hrtime()
|
||||
await this.copyFileRange(fd1, 0, fd2, 0, data.byteLength)
|
||||
const cloneDuration = process.hrtime(cloneStart)
|
||||
console.log('cloneDuration', cloneDuration)
|
||||
} finally {
|
||||
await this._closeFile(fd2)
|
||||
}
|
||||
} finally {
|
||||
await this._closeFile(fd1)
|
||||
}
|
||||
|
||||
step = 'read'
|
||||
const readStart = process.hrtime()
|
||||
@@ -374,6 +411,11 @@ export default class RemoteHandlerAbstract {
|
||||
if (!data.equals(read)) {
|
||||
throw new Error('output and input did not match')
|
||||
}
|
||||
|
||||
const read2 = await this._readFile(testFileName2, { flags: 'r' })
|
||||
if (!data.equals(read2)) {
|
||||
throw new Error('duplicated and input did not match')
|
||||
}
|
||||
return {
|
||||
success: true,
|
||||
writeRate: computeRate(writeDuration, SIZE),
|
||||
@@ -388,6 +430,7 @@ export default class RemoteHandlerAbstract {
|
||||
}
|
||||
} finally {
|
||||
ignoreErrors.call(this._unlink(testFileName))
|
||||
ignoreErrors.call(this._unlink(testFileName2))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -428,7 +471,7 @@ export default class RemoteHandlerAbstract {
|
||||
// Methods that can be called by private methods to avoid parallel limit on public methods
|
||||
|
||||
async __closeFile(fd: FileDescriptor): Promise<void> {
|
||||
await timeout.call(this._closeFile(fd.fd), this._timeout)
|
||||
await timeout.call(this._closeFile(fd), this._timeout)
|
||||
}
|
||||
|
||||
async __mkdir(dir: string): Promise<void> {
|
||||
@@ -660,4 +703,5 @@ function createPrefixWrapperMethods() {
|
||||
defineProperty(pPw, name, descriptor)
|
||||
})
|
||||
}
|
||||
|
||||
createPrefixWrapperMethods()
|
||||
|
||||
@@ -3,6 +3,28 @@ import fs from 'fs-extra'
|
||||
import { fromEvent } from 'promise-toolbox'
|
||||
|
||||
import RemoteHandlerAbstract from './abstract'
|
||||
import { Syscall6 } from 'syscall'
|
||||
|
||||
/**
|
||||
* @returns the number of byte effectively copied, needs to be called in a loop!
|
||||
*/
|
||||
function copyFileRangeSyscall(fdIn, offsetIn, fdOut, offsetOut, dataLen, flags = 0) {
|
||||
// we are stuck on linux x86_64
|
||||
function wrapOffset(offsetIn) {
|
||||
if (offsetIn == null)
|
||||
return 0
|
||||
const offsetInBuffer = new Uint32Array(2)
|
||||
new DataView(offsetInBuffer.buffer).setBigUint64(0, BigInt(offsetIn), true)
|
||||
return offsetInBuffer
|
||||
}
|
||||
|
||||
const SYS_copy_file_range = 326
|
||||
const [copied, _, errno] = Syscall6(SYS_copy_file_range, fdIn, wrapOffset(offsetIn), fdOut, wrapOffset(offsetOut), dataLen, flags)
|
||||
if (errno !== 0) {
|
||||
throw new Error('Error no ' + errno)
|
||||
}
|
||||
return copied
|
||||
}
|
||||
|
||||
export default class LocalHandler extends RemoteHandlerAbstract {
|
||||
get type() {
|
||||
@@ -18,7 +40,7 @@ export default class LocalHandler extends RemoteHandlerAbstract {
|
||||
}
|
||||
|
||||
async _closeFile(fd) {
|
||||
return fs.close(fd)
|
||||
return fs.close(fd.fd)
|
||||
}
|
||||
|
||||
async _createReadStream(file, options) {
|
||||
@@ -81,6 +103,23 @@ export default class LocalHandler extends RemoteHandlerAbstract {
|
||||
return fs.open(this._getFilePath(path), flags)
|
||||
}
|
||||
|
||||
/**
|
||||
* Slightly different from the linux system call:
|
||||
* - offsets are mandatory (because some remote handlers don't have a current pointer for files)
|
||||
* - flags is fixed to 0
|
||||
* - will not return until copy is finished.
|
||||
*
|
||||
* @param fdIn read open file descriptor
|
||||
* @param offsetIn either start offset in the source file
|
||||
* @param fdOut write open file descriptor (not append!)
|
||||
* @param offsetOut offset in the target file
|
||||
* @param dataLen how long to copy
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
async copyFileRange(fdIn, offsetIn, fdOut, offsetOut, dataLen) {
|
||||
await copyFileRangeSyscall(fdIn.fd, offsetIn, fdOut.fd, offsetOut, dataLen)
|
||||
}
|
||||
|
||||
async _read(file, buffer, position) {
|
||||
const needsClose = typeof file === 'string'
|
||||
file = needsClose ? await fs.open(this._getFilePath(file), 'r') : file.fd
|
||||
|
||||
Reference in New Issue
Block a user