feat(backups): merge worker concurrency (#6965)

This commit is contained in:
Florent BEAUCHAMP 2023-08-30 09:38:37 +02:00 committed by GitHub
parent ba81d0e08a
commit f5d3bc1f2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 47 additions and 30 deletions

View File

@ -2,18 +2,21 @@
// eslint-disable-next-line eslint-comments/disable-enable-pair
/* eslint-disable n/shebang */
import { asyncEach } from '@vates/async-each'
import { catchGlobalErrors } from '@xen-orchestra/log/configure'
import { createLogger } from '@xen-orchestra/log'
import { getSyncedHandler } from '@xen-orchestra/fs'
import { join } from 'node:path'
import { load as loadConfig } from 'app-conf'
import Disposable from 'promise-toolbox/Disposable'
import min from 'lodash/min.js'
import { getVmBackupDir } from '../_getVmBackupDir.mjs'
import { RemoteAdapter } from '../RemoteAdapter.mjs'
import { CLEAN_VM_QUEUE } from './index.mjs'
const APP_NAME = 'xo-merge-worker'
const APP_DIR = new URL('.', import.meta.url).pathname
// -------------------------------------------------------------------
catchGlobalErrors(createLogger('xo:backups:mergeWorker'))
@ -34,6 +37,7 @@ const main = Disposable.wrap(async function* main(args) {
for (let i = 0; i < 10; ++i) {
const entries = await handler.list(CLEAN_VM_QUEUE)
if (entries.length !== 0) {
entries.sort()
return entries
}
await new Promise(timeoutResolver)
@ -42,38 +46,47 @@ const main = Disposable.wrap(async function* main(args) {
let taskFiles
while ((taskFiles = await listRetry()) !== undefined) {
const taskFileBasename = min(taskFiles)
const previousTaskFile = join(CLEAN_VM_QUEUE, taskFileBasename)
const taskFile = join(CLEAN_VM_QUEUE, '_' + taskFileBasename)
const { concurrency } = await loadConfig(APP_NAME, {
appDir: APP_DIR,
ignoreUnknownFormats: true,
})
await asyncEach(
taskFiles,
async taskFileBasename => {
const previousTaskFile = join(CLEAN_VM_QUEUE, taskFileBasename)
const taskFile = join(CLEAN_VM_QUEUE, '_' + taskFileBasename)
// move this task to the end
try {
await handler.rename(previousTaskFile, taskFile)
} catch (error) {
// this error occurs if the task failed too many times (i.e. too many `_` prefixes)
// there is nothing more that can be done
if (error.code === 'ENAMETOOLONG') {
await handler.unlink(previousTaskFile)
}
// move this task to the end
try {
await handler.rename(previousTaskFile, taskFile)
} catch (error) {
// this error occurs if the task failed too many times (i.e. too many `_` prefixes)
// there is nothing more that can be done
if (error.code === 'ENAMETOOLONG') {
await handler.unlink(previousTaskFile)
}
throw error
}
try {
const vmDir = getVmBackupDir(String(await handler.readFile(taskFile)))
try {
await adapter.cleanVm(vmDir, { merge: true, logInfo: info, logWarn: warn, remove: true })
} catch (error) {
// consider the clean successful if the VM dir is missing
if (error.code !== 'ENOENT') {
throw error
}
}
handler.unlink(taskFile).catch(error => warn('deleting task failure', { error }))
} catch (error) {
warn('failure handling task', { error })
}
try {
const vmDir = getVmBackupDir(String(await handler.readFile(taskFile)))
try {
await adapter.cleanVm(vmDir, { merge: true, logInfo: info, logWarn: warn, remove: true })
} catch (error) {
// consider the clean successful if the VM dir is missing
if (error.code !== 'ENOENT') {
throw error
}
}
handler.unlink(taskFile).catch(error => warn('deleting task failure', { error }))
} catch (error) {
warn('failure handling task', { error })
}
},
{ concurrency }
)
}
})

View File

@ -0,0 +1 @@
concurrency = 1

View File

@ -17,6 +17,7 @@
"test-integration": "node--test *.integ.mjs"
},
"dependencies": {
"@iarna/toml": "^2.2.5",
"@kldzj/stream-throttle": "^1.1.1",
"@vates/async-each": "^1.0.0",
"@vates/cached-dns.lookup": "^1.0.0",
@ -30,6 +31,7 @@
"@xen-orchestra/fs": "^4.0.1",
"@xen-orchestra/log": "^0.6.0",
"@xen-orchestra/template": "^0.1.0",
"app-conf": "^2.3.0",
"compare-versions": "^6.0.0",
"d3-time-format": "^4.1.0",
"decorator-synchronized": "^0.6.0",

View File

@ -11,6 +11,7 @@
- [REST API] Add support for `filter` and `limit` parameters to `backups/logs` and `restore/logs` collections [Forum#64789](https://xcp-ng.org/forum/post/64789)
- [Pool/Advanced] Ability to set a crash dump SR [#5060](https://github.com/vatesfr/xen-orchestra/issues/5060) (PR [#6973](https://github.com/vatesfr/xen-orchestra/pull/6973))
- [Plugin/transport-email] Local hostname can now be configured [Forum#7579](https://xcp-ng.org/forum/topic/7579)
- [Backups] Add setting `concurrency` in a new configuration file `xo-merge-worker` (PR [#6787](https://github.com/vatesfr/xen-orchestra/pull/6787))
### Bug fixes
@ -40,7 +41,7 @@
<!--packages-start-->
- @xen-orchestra/backups patch
- @xen-orchestra/backups minor
- @xen-orchestra/mixins minor
- @xen-orchestra/xapi patch
- xen-api patch

View File

@ -2162,7 +2162,7 @@
resolved "https://registry.yarnpkg.com/@humanwhocodes/object-schema/-/object-schema-1.2.1.tgz#b520529ec21d8e5945a1851dfd1c32e94e39ff45"
integrity sha512-ZnQMnLV4e7hDlUvw8H+U8ASL02SS2Gn6+9Ac3wGGLIe7+je2AeAOxPY+izIPJDfFDb7eDjev0Us8MO1iFRN8hA==
"@iarna/toml@^2.2.0", "@iarna/toml@^2.2.1":
"@iarna/toml@^2.2.0", "@iarna/toml@^2.2.1", "@iarna/toml@^2.2.5":
version "2.2.5"
resolved "https://registry.yarnpkg.com/@iarna/toml/-/toml-2.2.5.tgz#b32366c89b43c6f8cefbdefac778b9c828e3ba8c"
integrity sha512-trnsAYxU3xnS1gPHPyU961coFyLkh4gAD/0zQ5mymY4yOZ+CYvsPqUbOFSw0aDM4y0tV7tiFxL/1XfXPNC6IPg==