Compare commits: feat_smart...feat_dedup

13 commits: 862d9a6a7f, 06cabcfb21, 50f378ec1e, 506a6aad08, 447112b583, b380e085d2, d752b1ed70, 16f4fcfd04, 69a0e0e563, 456e4f213b, a6d24a6dfa, 391c778515, 4e125ede88
@@ -660,13 +660,14 @@ export class RemoteAdapter {
    return path
  }

  async writeVhd(path, input, { checksum = true, validator = noop, writeBlockConcurrency } = {}) {
  async writeVhd(path, input, { checksum = true, validator = noop, writeBlockConcurrency, dedup = false } = {}) {
    const handler = this._handler
    if (this.useVhdDirectory()) {
      const dataPath = `${dirname(path)}/data/${uuidv4()}.vhd`
      const size = await createVhdDirectoryFromStream(handler, dataPath, input, {
        concurrency: writeBlockConcurrency,
        compression: this.#getCompressionType(),
        dedup,
        async validator() {
          await input.task
          return validator.apply(this, arguments)
@@ -123,19 +123,19 @@ export async function checkAliases(
) {
  const aliasFound = []
  for (const alias of aliasPaths) {
    const target = await resolveVhdAlias(handler, alias)

    if (!isVhdFile(target)) {
      logWarn('alias references non VHD target', { alias, target })
      if (remove) {
        logInfo('removing alias and non VHD target', { alias, target })
        await handler.unlink(target)
        await handler.unlink(alias)
      }
      continue
    }

    let target
    try {
      target = await resolveVhdAlias(handler, alias)

      if (!isVhdFile(target)) {
        logWarn('alias references non VHD target', { alias, target })
        if (remove) {
          logInfo('removing alias and non VHD target', { alias, target })
          await handler.unlink(target)
          await handler.unlink(alias)
        }
        continue
      }

    const { dispose } = await openVhd(handler, target)
    try {
      await dispose()
@@ -17,6 +17,7 @@ const DEFAULT_XAPI_VM_SETTINGS = {
  concurrency: 2,
  copyRetention: 0,
  deleteFirst: false,
  dedup: false,
  diskPerVmConcurrency: 0, // not limited by default
  exportRetention: 0,
  fullInterval: 0,
@@ -160,6 +160,7 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
    )

    metadataContent = {
      dedup: settings.dedup,
      jobId,
      mode: job.mode,
      scheduleId,

@@ -208,6 +209,7 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
        // no checksum for VHDs, because they will be invalidated by
        // merges and chainings
        checksum: false,
        dedup: settings.dedup,
        validator: tmpPath => checkVhd(handler, tmpPath),
        writeBlockConcurrency: this._config.writeBlockConcurrency,
      })
@@ -45,6 +45,34 @@ When `useVhdDirectory` is enabled on the remote, the directory containing the VH
         └─ <uuid>.vhd
```

#### vhd directory with deduplication

The difference with the non-dedup mode is that a hash is computed for each VHD block. The hash is split into 4-character tokens and the data is stored in `xo-block-store/{token1}/.../{token15}/{token16}.source`.
A hard link is then made from this source to the destination folder in `<vdis>/<job UUID>/<VDI UUID>/blocks/{number}/{number}`.

```
<remote>
├─ xo-block-store
│  └─ {4 char}
│     └─ ...
│        └─ {4 char}.source
└─ xo-vm-backups
   ├─ index.json // TODO
   └─ <VM UUID>
      ├─ cache.json.gz
      ├─ vdis
      │  └─ <job UUID>
      │     └─ <VDI UUID>
      │        ├─ index.json // TODO
      │        ├─ <YYYYMMDD>T<HHmmss>.alias.vhd // contains the relative path to a VHD directory
      │        └─ data
      │           ├─ <uuid>.vhd // VHD directory format is described in vhd-lib/Vhd/VhdDirectory.js
      ├─ <YYYYMMDD>T<HHmmss>.json // backup metadata
      ├─ <YYYYMMDD>T<HHmmss>.xva
      └─ <YYYYMMDD>T<HHmmss>.xva.checksum
```
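As an illustration of the path scheme described above, here is a minimal sketch (not part of the diff) of mapping a block's content to its store path; the helper name is hypothetical, and the split logic mirrors `#computeDeduplicationPath` in the local handler further down this diff.

```js
import { createHash } from 'node:crypto'

// Hypothetical standalone helper: hash the block, split the hex digest into
// 4-character tokens and append '.source', as described above.
function blockStorePath(blockData) {
  const hash = createHash('sha256').update(blockData).digest('hex') // 64 hex chars
  const tokens = hash.match(/.{4}/g) // 16 tokens of 4 characters
  return `xo-block-store/${tokens.join('/')}.source`
}

// e.g. blockStorePath(someBlock) →
// 'xo-block-store/09a3/cd9e/.../dd40.source'
```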

## Cache for a VM

In a VM directory, if the file `cache.json.gz` exists, it contains the metadata for all the backups of this VM.
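As a hedged illustration (not part of the diff), reading that cache from a local remote could look like the sketch below; the path layout follows the tree above, and the exact metadata shape is an assumption.

```js
import { readFileSync } from 'node:fs'
import { gunzipSync } from 'node:zlib'

// Hypothetical reader: decompress and parse the per-VM backup metadata cache.
function readVmBackupCache(remoteRoot, vmUuid) {
  const buf = readFileSync(`${remoteRoot}/xo-vm-backups/${vmUuid}/cache.json.gz`)
  return JSON.parse(gunzipSync(buf).toString('utf-8'))
}
```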

@xen-orchestra/backups/docs/dedup.md (new file, 23 lines)
@@ -0,0 +1,23 @@

# Deduplication

- This uses an additional inode (or equivalent on the FS) for each distinct block in the `xo-block-store` sub folder
- This will not work well with immutability/object lock
- Only blocks of VHD directories are deduplicated
- Prerequisites: the FS must support hard links and extended attributes (see the `checkSupport` sketch after this list)
- A key (full backup) does not take more space on the remote than a delta. It will take more inodes, and more time since we'll have to read all the blocks.
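A short usage sketch of the support check mentioned in the prerequisites above, based on the `checkSupport()` method and the test file added later in this diff; the import path and remote URL are placeholders, not taken from the PR.

```js
import { Disposable } from 'promise-toolbox'
import { getSyncedHandler } from '@xen-orchestra/fs'

await Disposable.use(getSyncedHandler({ url: 'file:///path/to/remote' }), async handler => {
  // checkSupport() probes the FS with temporary files: it writes a source,
  // sets the hash extended attribute, hard links it and verifies the result
  const supported = await handler.checkSupport()
  if (supported.hardLink !== true || supported.extendedAttributes !== true) {
    console.warn('deduplication is not usable on this remote, writes will fall back to plain files')
  }
})
```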

When a new block is written to the remote, a hash is computed. If a file with this hash doesn't exist in `xo-block-store`, it is created, then the hash is added as an extended attribute.
A hard link, sharing data and extended attributes, is then created to the destination.

When deleting a block which has a hash extended attribute, a check is done on the xo-block-store: if there is no other link, the block is deleted. The directory containing it stays.

When merging blocks, the unlink method is called before overwriting an existing block.
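To make the write/delete mechanism above concrete, here is a simplified, self-contained sketch using plain Node APIs; it is not the XO implementation (that lives in the local handler shown later in this diff) and it skips extended attributes for brevity.

```js
import { createHash } from 'node:crypto'
import { dirname } from 'node:path'
import fs from 'fs-extra'

// Simplified write path: ensure the block exists exactly once in the store,
// then hard link it to its destination.
async function writeDedupBlock(storeRoot, destination, data) {
  const hash = createHash('sha256').update(data).digest('hex')
  const source = `${storeRoot}/${hash.match(/.{4}/g).join('/')}.source`
  await fs.ensureDir(dirname(source))
  try {
    await fs.writeFile(source, data, { flag: 'wx' }) // fails if it already exists
  } catch (error) {
    if (error.code !== 'EEXIST') throw error // already in the store: nothing more to do
  }
  await fs.ensureDir(dirname(destination))
  await fs.link(source, destination)
}

// Simplified delete path: remove the destination, then drop the source
// only when no other hard link still uses it.
async function unlinkDedupBlock(source, destination) {
  await fs.unlink(destination)
  const { nlink } = await fs.stat(source).catch(() => ({ nlink: 0 }))
  if (nlink === 1) {
    await fs.unlink(source)
  }
}
```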

### troubleshooting

Since all the blocks are hard linked, you can convert a deduplicated remote to a non-deduplicated one by deleting the xo-block-store directory.

Two new methods have been added to the local FS handler:

- deduplicationGarbageCollector(), which should be called from the root of the FS: it will clean any block without other links, and any empty directory
- deduplicationStats(), which computes the number of blocks in the store and how many times they are used
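A usage sketch for these two methods, adapted from the test file added later in this diff; the import path and remote path are placeholders.

```js
import { Disposable } from 'promise-toolbox'
import { getSyncedHandler } from '@xen-orchestra/fs'

await Disposable.use(getSyncedHandler({ url: 'file:///path/to/remote' }), async handler => {
  // remove store entries that are no longer hard linked anywhere, and empty directories
  await handler.deduplicationGarbageCollector()

  // nbSourceBlocks: distinct blocks kept in the store
  // nbBlocks: total references (hard links) to them outside the store
  const { nbSourceBlocks, nbBlocks } = await handler.deduplicationStats()
  console.log(`${nbBlocks} block references share ${nbSourceBlocks} stored blocks`)
})
```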
@@ -16,6 +16,7 @@ function formatVmBackup(backup) {
    }),

    id: backup.id,
    dedup: backup.dedup,
    jobId: backup.jobId,
    mode: backup.mode,
    scheduleId: backup.scheduleId,
@@ -34,6 +34,7 @@
    "bind-property-descriptor": "^2.0.0",
    "decorator-synchronized": "^0.6.0",
    "execa": "^5.0.0",
    "fs-extended-attributes": "^1.0.1",
    "fs-extra": "^11.1.0",
    "get-stream": "^6.0.0",
    "limit-concurrency-decorator": "^0.5.0",
@@ -268,9 +268,9 @@ export default class RemoteHandlerAbstract {
    await this._mktree(normalizePath(dir), { mode })
  }

  async outputFile(file, data, { dirMode, flags = 'wx' } = {}) {
  async outputFile(file, data, { dedup = false, dirMode, flags = 'wx' } = {}) {
    const encryptedData = this.#encryptor.encryptData(data)
    await this._outputFile(normalizePath(file), encryptedData, { dirMode, flags })
    await this._outputFile(normalizePath(file), encryptedData, { dedup, dirMode, flags })
  }

  async read(file, buffer, position) {

@@ -319,8 +319,8 @@ export default class RemoteHandlerAbstract {
    await timeout.call(this._rmdir(normalizePath(dir)).catch(ignoreEnoent), this._timeout)
  }

  async rmtree(dir) {
    await this._rmtree(normalizePath(dir))
  async rmtree(dir, { dedup } = {}) {
    await this._rmtree(normalizePath(dir), { dedup })
  }

  // Asks the handler to sync the state of the effective remote with its'

@@ -397,6 +397,10 @@ export default class RemoteHandlerAbstract {
    }
  }

  async checkSupport() {
    return {}
  }

  async test() {
    const SIZE = 1024 * 1024 * 10
    const testFileName = normalizePath(`${Date.now()}.test`)

@@ -437,14 +441,14 @@ export default class RemoteHandlerAbstract {
    await this._truncate(file, len)
  }

  async __unlink(file, { checksum = true } = {}) {
  async __unlink(file, { checksum = true, dedup = false } = {}) {
    file = normalizePath(file)

    if (checksum) {
      ignoreErrors.call(this._unlink(checksumFile(file)))
    }

    await this._unlink(file).catch(ignoreEnoent)
    await this._unlink(file, { dedup }).catch(ignoreEnoent)
  }

  async write(file, buffer, position) {

@@ -560,17 +564,16 @@ export default class RemoteHandlerAbstract {
    throw new Error('Not implemented')
  }

  async _outputFile(file, data, { dirMode, flags }) {
  async _outputFile(file, data, { dirMode, flags, dedup = false }) {
    try {
      return await this._writeFile(file, data, { flags })
      return await this._writeFile(file, data, { dedup, flags })
    } catch (error) {
      if (error.code !== 'ENOENT') {
        throw error
      }
    }

    await this._mktree(dirname(file), { mode: dirMode })
    return this._outputFile(file, data, { flags })
    return this._outputFile(file, data, { dedup, flags })
  }

  async _outputStream(path, input, { dirMode, validator }) {

@@ -613,7 +616,7 @@ export default class RemoteHandlerAbstract {
    throw new Error('Not implemented')
  }

  async _rmtree(dir) {
  async _rmtree(dir, { dedup } = {}) {
    try {
      return await this._rmdir(dir)
    } catch (error) {

@@ -624,7 +627,7 @@ export default class RemoteHandlerAbstract {

    const files = await this._list(dir)
    await asyncEach(files, file =>
      this._unlink(`${dir}/${file}`).catch(error => {
      this._unlink(`${dir}/${file}`, { dedup }).catch(error => {
        // Unlink dir behavior is not consistent across platforms
        // https://github.com/nodejs/node-v0.x-archive/issues/5791
        if (error.code === 'EISDIR' || error.code === 'EPERM') {

@@ -639,7 +642,7 @@ export default class RemoteHandlerAbstract {
  // called to initialize the remote
  async _sync() {}

  async _unlink(file) {
  async _unlink(file, opts) {
    throw new Error('Not implemented')
  }
@@ -19,7 +19,8 @@ try {
} catch (_) {}

export const getHandler = (remote, ...rest) => {
  const Handler = HANDLERS[parse(remote.url).type]
  const { type } = parse(remote.url)
  const Handler = HANDLERS[type]
  if (!Handler) {
    throw new Error('Unhandled remote type')
  }
@@ -1,10 +1,17 @@
import df from '@sindresorhus/df'
import fs from 'fs-extra'
// import fsx from 'fs-extended-attributes'
import lockfile from 'proper-lockfile'
import { createLogger } from '@xen-orchestra/log'
import { fromEvent, retry } from 'promise-toolbox'
import { asyncEach } from '@vates/async-each'
import { fromEvent, fromCallback, ignoreErrors, retry } from 'promise-toolbox'
import { synchronized } from 'decorator-synchronized'

import RemoteHandlerAbstract from './abstract'
import { normalize as normalizePath } from './path'

import assert from 'node:assert'
import { createHash, randomBytes } from 'node:crypto'

const { info, warn } = createLogger('xo:fs:local')
@@ -37,6 +44,10 @@ export default class LocalHandler extends RemoteHandlerAbstract {
  #addSyncStackTrace
  #retriesOnEagain

  #supportDedup
  #dedupDirectory = '/xo-block-store'
  #hashMethod = 'sha256'
  #attributeKey = `user.hash.${this.#hashMethod}`
  constructor(remote, opts = {}) {
    super(remote)
@@ -194,16 +205,267 @@ export default class LocalHandler extends RemoteHandlerAbstract {
    return this.#addSyncStackTrace(fs.truncate, this.getFilePath(file), len)
  }

  async _unlink(file) {
    const filePath = this.getFilePath(file)
  async #localUnlink(filePath) {
    return await this.#addSyncStackTrace(retry, () => fs.unlink(filePath), this.#retriesOnEagain)
  }

  async _unlink(file, { dedup } = {}) {
    const filePath = this.getFilePath(file)
    let hash
    // only try to read the dedup source if we are deleting something deduplicated
    if (dedup === true) {
      try {
        // get the hash before deleting the file
        hash = await this.#getExtendedAttribute(file, this.#attributeKey)
      } catch (err) {
        // whatever: fall back to a normal delete
      }
    }

    // delete the file in place
    await this.#localUnlink(filePath)

    // a hash implies we are on a deduplicated file
    if (hash !== undefined) {
      const dedupPath = this.getFilePath(this.#computeDeduplicationPath(hash))
      await this.#removeExtendedAttribute(file, this.#attributeKey)
      try {
        const { nlink } = await fs.stat(dedupPath)
        // get the number of copies still using these data
        // delete the source if it is alone
        if (nlink === 1) {
          await this.#localUnlink(dedupPath)
        }
      } catch (error) {
        // no problem if another process deleted the source or if we unlink the source file directly
        if (error.code !== 'ENOENT') {
          throw error
        }
      }
    }
  }

  _writeFd(file, buffer, position) {
    return this.#addSyncStackTrace(fs.write, file.fd, buffer, 0, buffer.length, position)
  }

  _writeFile(file, data, { flags }) {
  #localWriteFile(file, data, { flags }) {
    return this.#addSyncStackTrace(fs.writeFile, this.getFilePath(file), data, { flag: flags })
  }

  async _writeFile(file, data, { flags, dedup }) {
    if (dedup === true) {
      // only compute support once, and only if needed
      if (this.#supportDedup === undefined) {
        const supported = await this.checkSupport()
        this.#supportDedup = supported.hardLink === true && supported.extendedAttributes === true
      }
      if (this.#supportDedup) {
        const hash = this.#hash(data)
        // create the file (if not already present) in the store
        const dedupPath = await this.#writeDeduplicationSource(hash, data)
        // hard link to the target place
        // this linked file will have the same extended attributes
        // (used for unlink)
        return this.#link(dedupPath, file)
      }
    }
    // fallback
    return this.#localWriteFile(file, data, { flags })
  }

  #hash(data) {
    return createHash(this.#hashMethod).update(data).digest('hex')
  }

  async #getExtendedAttribute(file, attributeName) {
    try {
      return this._readFile(file + attributeName)
    } catch (err) {
      if (err.code === 'ENOENT') {
        return
      }
      throw err
    }
  }

  async #setExtendedAttribute(file, attributeName, value) {
    return this._writeFile(file + attributeName, value)
  }

  async #removeExtendedAttribute(file, attributeName) {
    return this._unlink(file + attributeName)
  }

  /*
  async #getExtendedAttribute(file, attributeName) {
    return new Promise((resolve, reject) => {
      fsx.get(this.getFilePath(file), attributeName, (err, res) => {
        if (err) {
          reject(err)
        } else {
          // res is a buffer
          // it is null if the file doesn't have this attribute
          if (res !== null) {
            resolve(res.toString('utf-8'))
          }
          resolve(undefined)
        }
      })
    })
  }

  async #setExtendedAttribute(file, attributeName, value) {
    return new Promise((resolve, reject) => {
      fsx.set(this.getFilePath(file), attributeName, value, (err, res) => {
        if (err) {
          reject(err)
        } else {
          resolve(res)
        }
      })
    })
  }

  async #removeExtendedAttribute(file, attributeName) {

  }
  */

  // create a hard link between two files
  #link(source, dest) {
    return fs.link(this.getFilePath(source), this.getFilePath(dest))
  }

  // split the path to keep a sane number of files per directory
  #computeDeduplicationPath(hash) {
    assert.strictEqual(hash.length % 4, 0)
    let path = this.#dedupDirectory
    for (let i = 0; i < hash.length; i++) {
      if (i % 4 === 0) {
        path += '/'
      }
      path += hash[i]
    }
    path += '.source'
    return path
  }

  async #writeDeduplicationSource(hash, data) {
    const path = this.#computeDeduplicationPath(hash)
    try {
      // flags ensure it fails if it already exists
      // _outputFile will create the directory tree
      await this._outputFile(path, data, { flags: 'wx' })
    } catch (error) {
      // if it is already present: not a problem
      if (error.code === 'EEXIST') {
        // it should already have the extended attributes, nothing more to do
        return path
      }
      throw error
    }

    try {
      await this.#setExtendedAttribute(path, this.#attributeKey, hash)
    } catch (error) {
      if (error.code !== 'ENOENT') {
        throw error
      }
      // if a concurrent process deleted the dedup source: recreate it
      return this.#writeDeduplicationSource(path, hash)
    }
    return path
  }

  /**
   * delete empty dirs
   * delete source files that don't have any more links
   *
   * @returns Promise
   */
  async deduplicationGarbageCollector(dir = this.#dedupDirectory, alreadyVisited = false) {
    try {
      await this._rmdir(dir)
      return
    } catch (error) {
      if (error.code !== 'ENOTEMPTY') {
        throw error
      }
    }
    // the directory may not be empty after a first visit
    if (alreadyVisited) {
      return
    }

    const files = await this._list(dir)
    await asyncEach(
      files,
      async file => {
        const stat = await fs.stat(this.getFilePath(`${dir}/${file}`))
        // have to check the stat to ensure we don't try to delete
        // the directories: they don't have links
        if (stat.isDirectory()) {
          return this.deduplicationGarbageCollector(`${dir}/${file}`)
        }
        if (stat.nlink === 1) {
          return fs.unlink(this.getFilePath(`${dir}/${file}`))
        }
      },
      { concurrency: 2 } // since we do a recursive traversal of a deep tree
    )
    return this.deduplicationGarbageCollector(dir, true)
  }

  async deduplicationStats(dir = this.#dedupDirectory) {
    let nbSourceBlocks = 0
    let nbBlocks = 0
    try {
      const files = await this._list(dir)
      await asyncEach(
        files,
        async file => {
          const stat = await fs.stat(this.getFilePath(`${dir}/${file}`))
          if (stat.isDirectory()) {
            const { nbSourceBlocks: nbSourceInChild, nbBlocks: nbBlockInChild } = await this.deduplicationStats(
              `${dir}/${file}`
            )
            nbSourceBlocks += nbSourceInChild
            nbBlocks += nbBlockInChild
          } else {
            nbSourceBlocks++
            nbBlocks += stat.nlink - 1 // ignore the current link
          }
        },
        { concurrency: 2 }
      )
    } catch (err) {
      if (err.code !== 'ENOENT') {
        throw err
      }
    }
    return { nbSourceBlocks, nbBlocks }
  }

  @synchronized()
  async checkSupport() {
    const supported = await super.checkSupport()
    const sourceFileName = normalizePath(`${Date.now()}.sourcededup`)
    const destFileName = normalizePath(`${Date.now()}.destdedup`)
    try {
      const SIZE = 1024 * 1024
      const data = await fromCallback(randomBytes, SIZE)
      const hash = this.#hash(data)
      await this._outputFile(sourceFileName, data, { flags: 'wx', dedup: false })
      await this.#setExtendedAttribute(sourceFileName, this.#attributeKey, hash)
      await this.#link(sourceFileName, destFileName)
      const linkedData = await this._readFile(destFileName)
      const { nlink } = await fs.stat(this.getFilePath(destFileName))
      // contains the right data and the link counter
      supported.hardLink = nlink === 2 && linkedData.equals(data)
      supported.extendedAttributes = hash === (await this.#getExtendedAttribute(sourceFileName, this.#attributeKey))
    } catch (error) {
      warn(`error while testing the dedup`, { error })
    } finally {
      ignoreErrors.call(this._unlink(sourceFileName))
      ignoreErrors.call(this._unlink(destFileName))
    }
    return supported
  }
}

@xen-orchestra/fs/src/local.test.js (new file, 107 lines)
@@ -0,0 +1,107 @@
import { after, beforeEach, describe, it } from 'node:test'
import assert from 'node:assert'
import fs from 'node:fs/promises'
import { getSyncedHandler } from './index.js'
import { Disposable, pFromCallback } from 'promise-toolbox'
import tmp from 'tmp'
import execa from 'execa'
import { rimraf } from 'rimraf'
import { randomBytes } from 'node:crypto'

// https://xkcd.com/221/
const data =
'H2GbLa0F2J4LHFLRwLP9zN4dGWJpdx1T6eGWra8BRlV9fBpRGtWIOSKXjU8y7fnxAWVGWpbYPYCwRigvxRSTcuaQsCtwvDNKMmFwYpsGMS14akgBD3EpOMPpKIRRySOsOeknpr48oopO1n9eq0PxGbOcY4Q9aojRu9rn1SMNyjq7YGzwVQEm6twA3etKGSYGvPJVTs2riXm7u6BhBh9VZtQDxQEy5ttkHiZUpgLi6QshSpMjL7dHco8k6gzGcxfpoyS5IzaQeXqDOeRjE6HNn27oUXpze5xRYolQhxA7IqdfzcYwWTqlaZb7UBUZoFCiFs5Y6vPlQVZ2Aw5YganLV1ZcIz78j6TAtXJAfXrDhksm9UteQul8RYT0Ur8AJRYgiGXOsXrWWBKm3CzZci6paLZ2jBmGfgVuBJHlvgFIjOHiVozjulGD4SwKQ2MNqUOylv89NTP1BsJuZ7MC6YCm5yix7FswoE7Y2NhDFqzEQvseRQFyz52AsfuqRY7NruKHlO7LOSI932che2WzxBAwy78Sk1eRHQLsZ37dLB4UkFFIq6TvyjJKznTMAcx9HDOSrFeke6KfsDB1A4W3BAxJk40oAcFMeM72Lg97sJExMJRz1m1nGQJEiGCcnll9G6PqEfHjoOhdDLgN2xewUyvbuRuKEXXxD1H6Tz1iWReyRGSagQNLXvqkKoHoxu3bvSi8nWrbtEY6K2eHLeF5bYubYGXc5VsfiCQNPEzQV4ECzaPdolRtbpRFMcB5aWK70Oew3HJkEcN7IkcXI9vlJKnFvFMqGOHKujd4Tyjhvru2UFh0dAkEwojNzz7W0XlASiXRneea9FgiJNLcrXNtBkvIgw6kRrgbXI6DPJdWDpm3fmWS8EpOICH3aTiXRLQUDZsReAaOsfau1FNtP4JKTQpG3b9rKkO5G7vZEWqTi69mtPGWmyOU47WL1ifJtlzGiFbZ30pcHMc0u4uopHwEQq6ZwM5S6NHvioxihhHQHO8JU2xvcjg5OcTEsXtMwIapD3re'
const hash = '09a3cd9e135114cb870a0b5cf0dfd3f4be994662d0c715b65bcfc5e3b635dd40'
const dataPath = 'xo-block-store/09a3/cd9e/1351/14cb/870a/0b5c/f0df/d3f4/be99/4662/d0c7/15b6/5bcf/c5e3/b635/dd40.source'

let dir
describe('dedup tests', () => {
  beforeEach(async () => {
    dir = await pFromCallback(cb => tmp.dir(cb))
  })
  after(async () => {
    await rimraf(dir)
  })

  it('works in the general case', async () => {
    await Disposable.use(getSyncedHandler({ url: `file://${dir}` }, { dedup: true }), async handler => {
      await handler.outputFile('in/a/sub/folder/file', data, { dedup: true })
      assert.doesNotReject(handler.list('xo-block-store'))
      assert.strictEqual((await handler.list('xo-block-store')).length, 1)
      assert.strictEqual((await handler.list('in/a/sub/folder')).length, 1)
      assert.strictEqual((await handler.readFile('in/a/sub/folder/file')).toString('utf-8'), data)
      const value = (await execa('getfattr', ['-n', 'user.hash.sha256', '--only-value', dir + '/in/a/sub/folder/file']))
        .stdout
      assert.strictEqual(value, hash)
      // the source file is created
      assert.strictEqual((await handler.readFile(dataPath)).toString('utf-8'), data)

      await handler.outputFile('in/anotherfolder/file', data, { dedup: true })
      assert.strictEqual((await handler.list('in/anotherfolder')).length, 1)
      assert.strictEqual((await handler.readFile('in/anotherfolder/file')).toString('utf-8'), data)

      await handler.unlink('in/a/sub/folder/file', { dedup: true })
      // the source is still here
      assert.strictEqual((await handler.readFile(dataPath)).toString('utf-8'), data)
      assert.strictEqual((await handler.readFile('in/anotherfolder/file')).toString('utf-8'), data)

      await handler.unlink('in/anotherfolder/file', { dedup: true })
      // the source should have been deleted
      assert.strictEqual(
        (
          await handler.list(
            'xo-block-store/09a3/cd9e/1351/14cb/870a/0b5c/f0df/d3f4/be99/4662/d0c7/15b6/5bcf/c5e3/b635'
          )
        ).length,
        0
      )
      assert.strictEqual((await handler.list('in/anotherfolder')).length, 0)
    })
  })

  it('garbage collector and stats', async () => {
    await Disposable.use(getSyncedHandler({ url: `file://${dir}` }, { dedup: true }), async handler => {
      await handler.outputFile('in/anotherfolder/file', data, { dedup: true })
      await handler.outputFile('in/anotherfolder/same', data, { dedup: true })
      await handler.outputFile('in/a/sub/folder/file', randomBytes(1024), { dedup: true })

      let stats = await handler.deduplicationStats()
      assert.strictEqual(stats.nbBlocks, 3)
      assert.strictEqual(stats.nbSourceBlocks, 2)

      await fs.unlink(`${dir}/in/a/sub/folder/file`, { dedup: true })
      assert.strictEqual((await handler.list('xo-block-store')).length, 2)

      await handler.deduplicationGarbageCollector()
      stats = await handler.deduplicationStats()
      assert.strictEqual(stats.nbBlocks, 2)
      assert.strictEqual(stats.nbSourceBlocks, 1)

      assert.strictEqual((await handler.list('xo-block-store')).length, 1)
    })
  })

  it('computes support', async () => {
    await Disposable.use(getSyncedHandler({ url: `file://${dir}` }, { dedup: true }), async handler => {
      const supported = await handler.checkSupport()
      assert.strictEqual(supported.hardLink, true, 'hard link support is not present in local fs')
      assert.strictEqual(supported.extendedAttributes, true, 'extended attributes support is not present in local fs')
    })
  })

  it('handles edge case: source deleted', async () => {
    await Disposable.use(getSyncedHandler({ url: `file://${dir}` }, { dedup: true }), async handler => {
      await handler.outputFile('in/a/sub/folder/edge', data, { dedup: true })
      await handler.unlink(dataPath, { dedup: true })
      // no error if the source is already deleted
      await assert.doesNotReject(() => handler.unlink('in/a/sub/folder/edge', { dedup: true }))
    })
  })

  it('handles edge case: non deduplicated file', async () => {
    await Disposable.use(getSyncedHandler({ url: `file://${dir}` }, { dedup: true }), async handler => {
      await handler.outputFile('in/a/sub/folder/edge', data, { dedup: false })
      // no error if deleting a non dedup file with the dedup flag
      await assert.doesNotReject(() => handler.unlink('in/a/sub/folder/edge', { dedup: true }))
    })
  })
})
@@ -228,6 +228,11 @@ export default class S3Handler extends RemoteHandlerAbstract {
    },
  })

  async _writeFile(file, data, options) {
    if (options?.dedup ?? false) {
      throw new Error(
        "S3 remotes don't support deduplication from XO, please use the deduplication of your S3 provider if any"
      )
    }
    return this.#s3.send(
      new PutObjectCommand({
        ...this.#createParams(file),
@@ -104,7 +104,7 @@ describe('VhdAbstract', async () => {
  it('renames and unlink a VhdDirectory', async () => {
    const initalSize = 4
    const vhdDirectory = `${tempDir}/randomfile.dir`
    await createRandomVhdDirectory(vhdDirectory, initalSize)
    await createRandomVhdDirectory(vhdDirectory, initalSize, { dedup: true })

    await Disposable.use(async function* () {
      const handler = yield getSyncedHandler({ url: 'file:///' })

@@ -116,11 +116,24 @@ describe('VhdAbstract', async () => {
      // it should clean an existing directory
      await fs.mkdir(targetFileName)
      await fs.writeFile(`${targetFileName}/dummy`, 'I exists')
      await VhdAbstract.unlink(handler, `${targetFileName}/dummy`)
      await VhdAbstract.unlink(handler, `${targetFileName}`)
      assert.equal(await fs.exists(`${targetFileName}/dummy`), false)
    })
  })

  it('unlinks a deduplicated VhdDirectory', async () => {
    const initalSize = 4
    const vhdDirectory = `${tempDir}/random.vhd`
    await createRandomVhdDirectory(vhdDirectory, initalSize, { dedup: true })

    await Disposable.use(async function* () {
      const handler = yield getSyncedHandler({ url: 'file:///' })

      await VhdAbstract.unlink(handler, vhdDirectory)
      assert.equal(await fs.exists(vhdDirectory), false)
    })
  })

  it('Creates, renames and unlink alias', async () => {
    const initalSize = 4
    const rawFileName = `${tempDir}/randomfile`
@@ -206,7 +206,14 @@ exports.VhdAbstract = class VhdAbstract {
      await handler.unlink(resolved)
    } catch (err) {
      if (err.code === 'EISDIR') {
        await handler.rmtree(resolved)
        // @todo : should we open it ?
        const chunkFilters = await handler.readFile(resolved + '/chunk-filters.json').then(JSON.parse, error => {
          if (error.code === 'ENOENT') {
            return []
          }
          throw error
        })
        await handler.rmtree(resolved, { dedup: chunkFilters[1] === true })
      } else {
        throw err
      }
@@ -19,6 +19,7 @@ const NULL_COMPRESSOR = {
}

const COMPRESSORS = {
  none: NULL_COMPRESSOR,
  gzip: {
    compress: (
      gzip => buffer =>

@@ -78,6 +79,7 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
  #header
  footer
  #compressor
  #dedup

  get compressionType() {
    return this.#compressor.id

@@ -102,8 +104,9 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
    this.#uncheckedBlockTable = blockTable
  }

  static async open(handler, path, { flags = 'r+' } = {}) {
    const vhd = new VhdDirectory(handler, path, { flags })
  static async open(handler, path, { compression, flags = 'r+' } = {}) {
    const dedup = path.endsWith('dedup.vhd')
    const vhd = new VhdDirectory(handler, path, { compression, dedup, flags })

    // openning a file for reading does not trigger EISDIR as long as we don't really read from it :
    // https://man7.org/linux/man-pages/man2/open.2.html

@@ -117,9 +120,9 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
    }
  }

  static async create(handler, path, { flags = 'wx+', compression } = {}) {
  static async create(handler, path, { flags = 'wx+', compression, dedup } = {}) {
    await handler.mktree(path)
    const vhd = new VhdDirectory(handler, path, { flags, compression })
    const vhd = new VhdDirectory(handler, path, { flags, compression, dedup })
    return {
      dispose: () => {},
      value: vhd,

@@ -132,6 +135,7 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
    this._path = path
    this._opts = opts
    this.#compressor = getCompressor(opts?.compression)
    this.#dedup = opts?.dedup ?? false
    this.writeBlockAllocationTable = synchronized()(this.writeBlockAllocationTable)
  }

@@ -158,7 +162,7 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
    }
  }

  async _writeChunk(partName, buffer) {
  async _writeChunk(partName, buffer, dedup = false) {
    assert.notStrictEqual(
      this._opts?.flags,
      'r',

@@ -168,7 +172,7 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
    // in case of VhdDirectory, we want to create the file if it does not exists
    const flags = this._opts?.flags === 'r+' ? 'w' : this._opts?.flags
    const compressed = await this.#compressor.compress(buffer)
    return this._handler.outputFile(this.#getChunkPath(partName), compressed, { flags })
    return this._handler.outputFile(this.#getChunkPath(partName), compressed, { flags, dedup })
  }

  // put block in subdirectories to limit impact when doing directory listing

@@ -262,6 +266,10 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
    }
    try {
      const blockExists = this.containsBlock(blockId)
      if (blockExists && this.#dedup) {
        // this will trigger the dedup store cleaning if needed
        await this._handler.unlink(this._getFullBlockPath(blockId), { dedup: true })
      }
      await this._handler.rename(childBlockPath, this._getFullBlockPath(blockId))
      if (!blockExists) {
        setBitmap(this.#blockTable, blockId)

@@ -285,7 +293,7 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
  }

  async writeEntireBlock(block) {
    await this._writeChunk(this.#getBlockPath(block.id), block.buffer)
    await this._writeChunk(this.#getBlockPath(block.id), block.buffer, this.#dedup)
    setBitmap(this.#blockTable, block.id)
  }
@@ -8,8 +8,8 @@ const { asyncEach } = require('@vates/async-each')

const { warn } = createLogger('vhd-lib:createVhdDirectoryFromStream')

const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression }) {
  const vhd = yield VhdDirectory.create(handler, path, { compression })
const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, { concurrency, compression, dedup }) {
  const vhd = yield VhdDirectory.create(handler, path, { compression, dedup })
  await asyncEach(
    parseVhdStream(inputStream),
    async function (item) {

@@ -45,10 +45,10 @@ exports.createVhdDirectoryFromStream = async function createVhdDirectoryFromStre
  handler,
  path,
  inputStream,
  { validator, concurrency = 16, compression } = {}
  { validator, concurrency = 16, compression, dedup } = {}
) {
  try {
    const size = await buildVhd(handler, path, inputStream, { concurrency, compression })
    const size = await buildVhd(handler, path, inputStream, { concurrency, compression, dedup })
    if (validator !== undefined) {
      await validator.call(this, path)
    }
@@ -62,7 +62,7 @@ exports.recoverRawContent = async function recoverRawContent(vhdName, rawName, o
}

// @ todo how can I call vhd-cli copy from here
async function convertToVhdDirectory(rawFileName, vhdFileName, path) {
async function convertToVhdDirectory(rawFileName, vhdFileName, path, { dedup = false } = {}) {
  fs.mkdirp(path)

  const srcVhd = await fs.open(vhdFileName, 'r')

@@ -95,15 +95,17 @@ async function convertToVhdDirectory(rawFileName, vhdFileName, path) {
    await fs.read(srcRaw, blockData, 0, blockData.length, offset)
    await fs.writeFile(path + '/blocks/0/' + i, Buffer.concat([bitmap, blockData]))
  }

  await fs.writeFile(path + '/chunk-filters.json', JSON.stringify(['none', dedup]))
  await fs.close(srcRaw)
}
exports.convertToVhdDirectory = convertToVhdDirectory

exports.createRandomVhdDirectory = async function createRandomVhdDirectory(path, sizeMB) {
exports.createRandomVhdDirectory = async function createRandomVhdDirectory(path, sizeMB, { dedup = false } = {}) {
  fs.mkdirp(path)
  const rawFileName = `${path}/temp.raw`
  await createRandomFile(rawFileName, sizeMB)
  const vhdFileName = `${path}/temp.vhd`
  await convertFromRawToVhd(rawFileName, vhdFileName)
  await convertToVhdDirectory(rawFileName, vhdFileName, path)
  await convertToVhdDirectory(rawFileName, vhdFileName, path, { dedup })
}
@@ -533,8 +533,7 @@ const xoItemToRender = {
    <span>
      <Icon icon='xo-cloud-config' /> <ShortDate timestamp={createdAt} />
    </span>
  )
  ,
  ),
  // XO objects.
  pool: props => <Pool {...props} />,

@@ -602,6 +601,7 @@ const xoItemToRender = {
      </span>{' '}
      <span className='tag tag-warning'>{backup.remote.name}</span>{' '}
      {backup.size !== undefined && <span className='tag tag-info'>{formatSize(backup.size)}</span>}{' '}
      {backup.dedup === true && <span className='tag tag-info'>deduplicated</span>}{' '}
      <FormattedDate
        value={new Date(backup.timestamp)}
        month='long'