Compare commits

..

1 Commits

Author SHA1 Message Date
Florent Beauchamp
d47c7c6064 feat(s3): add completion control to s3 backup 2022-05-09 10:01:09 +02:00
19 changed files with 78 additions and 150 deletions

View File

@@ -32,7 +32,7 @@ exports.DIR_XO_CONFIG_BACKUPS = DIR_XO_CONFIG_BACKUPS
const DIR_XO_POOL_METADATA_BACKUPS = 'xo-pool-metadata-backups'
exports.DIR_XO_POOL_METADATA_BACKUPS = DIR_XO_POOL_METADATA_BACKUPS
const { debug, warn } = createLogger('xo:backups:RemoteAdapter')
const { warn } = createLogger('xo:backups:RemoteAdapter')
const compareTimestamp = (a, b) => a.timestamp - b.timestamp
@@ -224,7 +224,7 @@ class RemoteAdapter {
async deleteDeltaVmBackups(backups) {
const handler = this._handler
debug(`deleteDeltaVmBackups will delete ${backups.length} delta backups`, { backups })
// this will delete the json, unused VHDs will be detected by `cleanVm`
await asyncMapSettled(backups, ({ _filename }) => handler.unlink(_filename))
}

View File

@@ -5,7 +5,7 @@ const sum = require('lodash/sum')
const { asyncMap } = require('@xen-orchestra/async-map')
const { Constants, mergeVhd, openVhd, VhdAbstract, VhdFile } = require('vhd-lib')
const { isVhdAlias, resolveVhdAlias } = require('vhd-lib/aliases')
const { dirname, resolve, basename } = require('path')
const { dirname, resolve } = require('path')
const { DISK_TYPES } = Constants
const { isMetadataFile, isVhdFile, isXvaFile, isXvaSumFile } = require('./_backupType.js')
const { limitConcurrency } = require('limit-concurrency-decorator')
@@ -90,7 +90,7 @@ async function mergeVhdChain(chain, { handler, onLog, remove, merge }) {
asyncMap(children.slice(0, -1), child => {
onLog(`the VHD ${child} is unused`)
if (remove) {
onLog(`mergeVhdChain: deleting unused VHD ${child}`)
onLog(`deleting unused VHD ${child}`)
return VhdAbstract.unlink(handler, child)
}
}),
@@ -383,7 +383,7 @@ exports.cleanVm = async function cleanVm(
const vhdChainsToMerge = { __proto__: null }
const toCheck = new Set(unusedVhds)
let shouldDelete = false
const getUsedChildChainOrDelete = vhd => {
if (vhd in vhdChainsToMerge) {
const chain = vhdChainsToMerge[vhd]
@@ -409,64 +409,8 @@ exports.cleanVm = async function cleanVm(
onLog(`the VHD ${vhd} is unused`)
if (remove) {
onLog(`getUsedChildChainOrDelete: deleting unused VHD`, {
vhdChildren,
vhd,
})
// temporarly disabled
shouldDelete = true
// unusedVhdsDeletion.push(VhdAbstract.unlink(handler, vhd))
}
}
{
// eslint-disable-next-line no-console
const debug = console.debug
if (shouldDelete) {
const chains = { __proto__: null }
const queue = new Set(vhds)
function addChildren(parent, chain) {
queue.delete(parent)
const child = vhdChildren[parent]
if (child !== undefined) {
const childChain = chains[child]
if (childChain !== undefined) {
// if a chain already exists, use it
delete chains[child]
chain.push(...childChain)
} else {
chain.push(child)
addChildren(child, chain)
}
}
}
for (const vhd of queue) {
const chain = []
addChildren(vhd, chain)
chains[vhd] = chain
}
const entries = Object.entries(chains)
debug(`${vhds.size} VHDs (${unusedVhds.size} unused) found among ${entries.length} chains [`)
const decorateVhd = vhd => {
const shortPath = basename(vhd)
return unusedVhds.has(vhd) ? `${shortPath} [unused]` : shortPath
}
for (let i = 0, n = entries.length; i < n; ++i) {
debug(`in ${dirname(entries[i][0])}`)
debug(' [')
const [parent, children] = entries[i]
debug(' ' + decorateVhd(parent))
for (const child of children) {
debug(' ' + decorateVhd(child))
}
debug(' ]')
}
debug(']')
onLog(`deleting unused VHD ${vhd}`)
unusedVhdsDeletion.push(VhdAbstract.unlink(handler, vhd))
}
}

View File

@@ -3,10 +3,9 @@
const assert = require('assert')
const map = require('lodash/map.js')
const mapValues = require('lodash/mapValues.js')
const uuid = require('uuid')
const ignoreErrors = require('promise-toolbox/ignoreErrors')
const { asyncMap } = require('@xen-orchestra/async-map')
const { chainVhd, checkVhdChain, openVhd, VhdAbstract, VhdDirectory } = require('vhd-lib')
const { chainVhd, checkVhdChain, openVhd, VhdAbstract } = require('vhd-lib')
const { createLogger } = require('@xen-orchestra/log')
const { dirname } = require('path')
@@ -31,7 +30,6 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
const backupDir = getVmBackupDir(backup.vm.uuid)
const vdisDir = `${backupDir}/vdis/${backup.job.id}`
const vhdDebugData = {}
await asyncMap(baseUuidToSrcVdi, async ([baseUuid, srcVdi]) => {
let found = false
@@ -42,16 +40,6 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
})
const packedBaseUuid = packUuid(baseUuid)
await asyncMap(vhds, async path => {
await Disposable.use(openVhd(handler, path), async vhd => {
const isMergeable = await adapter.isMergeableParent(packedBaseUuid, path)
vhdDebugData[path] = {
uuid: uuid.stringify(vhd.footer.uuid),
parentUuid: uuid.stringify(vhd.header.parentUuid),
isVhdDirectory: vhd instanceof VhdDirectory,
disktype: vhd.footer.diskType,
isMergeable,
}
})
try {
await checkVhdChain(handler, path)
// Warning, this should not be written as found = found || await adapter.isMergeableParent(packedBaseUuid, path)
@@ -64,31 +52,13 @@ exports.DeltaBackupWriter = class DeltaBackupWriter extends MixinBackupWriter(Ab
found = found || isMergeable
} catch (error) {
warn('checkBaseVdis', { error })
Task.warning(
`Backup.checkBaseVdis: Error while checking existing VHD ${vdisDir}/${srcVdi.uuid} : ${error.toString()}`
)
await ignoreErrors.call(VhdAbstract.unlink(handler, path))
}
})
} catch (error) {
warn('checkBaseVdis', { error })
Task.warning(
`Backup.checkBaseVdis : Impossible to open ${vdisDir}/${
srcVdi.uuid
} folder to list precedent backups: ${error.toString()}`
)
}
if (!found) {
Task.warning(
`Backup.checkBaseVdis : Impossible to find the base of ${srcVdi.uuid} for a delta : fallback to a full `,
{
data: {
vhdDebugData,
baseUuid,
vdiuuid: srcVdi.uuid,
},
}
)
baseUuidToSrcVdi.delete(baseUuid)
}
})

View File

@@ -20,7 +20,6 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
vm => vm.other_config[TAG_COPY_SRC] === baseVm.uuid
)
if (replicatedVm === undefined) {
Task.warning(`Replication.checkBaseVdis: no replicated VMs`)
return baseUuidToSrcVdi.clear()
}
@@ -34,7 +33,6 @@ exports.DeltaReplicationWriter = class DeltaReplicationWriter extends MixinRepli
for (const uuid of baseUuidToSrcVdi.keys()) {
if (!replicatedVdis.has(uuid)) {
Task.warning(`Replication.checkBaseVdis: VDI ${uuid} is not in the list of already replicated VDI`)
baseUuidToSrcVdi.delete(uuid)
}
}

View File

@@ -6,7 +6,6 @@ const { join } = require('path')
const { getVmBackupDir } = require('../_getVmBackupDir.js')
const MergeWorker = require('../merge-worker/index.js')
const { formatFilenameDate } = require('../_filenameDate.js')
const { Task } = require('../Task.js')
const { warn } = createLogger('xo:backups:MixinBackupWriter')
@@ -34,7 +33,6 @@ exports.MixinBackupWriter = (BaseClass = Object) =>
})
} catch (error) {
warn(error)
Task.warning(`error while cleaning the backup folder : ${error.toString()}`)
return {}
}
}

View File

@@ -1,13 +1,15 @@
import get from 'lodash/get.js'
import identity from 'lodash/identity.js'
import isEqual from 'lodash/isEqual.js'
import { createLogger } from '@xen-orchestra/log'
import { parseDuration } from '@vates/parse-duration'
import { watch } from 'app-conf'
'use strict'
const get = require('lodash/get')
const identity = require('lodash/identity')
const isEqual = require('lodash/isEqual')
const { createLogger } = require('@xen-orchestra/log')
const { parseDuration } = require('@vates/parse-duration')
const { watch } = require('app-conf')
const { warn } = createLogger('xo:mixins:config')
export default class Config {
module.exports = class Config {
constructor(app, { appDir, appName, config }) {
this._config = config
const watchers = (this._watchers = new Set())

View File

@@ -1,7 +1,9 @@
import assert from 'assert'
import emitAsync from '@xen-orchestra/emit-async'
import EventEmitter from 'events'
import { createLogger } from '@xen-orchestra/log'
'use strict'
const assert = require('assert')
const emitAsync = require('@xen-orchestra/emit-async')
const EventEmitter = require('events')
const { createLogger } = require('@xen-orchestra/log')
const { debug, warn } = createLogger('xo:mixins:hooks')
@@ -17,7 +19,7 @@ const runHook = async (emitter, hook) => {
debug(`${hook} finished`)
}
export default class Hooks extends EventEmitter {
module.exports = class Hooks extends EventEmitter {
// Run *clean* async listeners.
//
// They normalize existing data, clear invalid entries, etc.

View File

@@ -1,15 +1,15 @@
import { createLogger } from '@xen-orchestra/log'
import { EventListenersManager } from '@vates/event-listeners-manager'
import { pipeline } from 'stream'
import { ServerResponse, request } from 'http'
import assert from 'assert'
import fromCallback from 'promise-toolbox/fromCallback'
import fromEvent from 'promise-toolbox/fromEvent'
import net from 'net'
'use strict'
import { parseBasicAuth } from './_parseBasicAuth.mjs'
const { debug, warn } = require('@xen-orchestra/log').createLogger('xo:mixins:HttpProxy')
const { EventListenersManager } = require('@vates/event-listeners-manager')
const { pipeline } = require('stream')
const { ServerResponse, request } = require('http')
const assert = require('assert')
const fromCallback = require('promise-toolbox/fromCallback')
const fromEvent = require('promise-toolbox/fromEvent')
const net = require('net')
const { debug, warn } = createLogger('xo:mixins:HttpProxy')
const { parseBasicAuth } = require('./_parseBasicAuth.js')
const IGNORED_HEADERS = new Set([
// https://datatracker.ietf.org/doc/html/rfc2616#section-13.5.1
@@ -26,7 +26,7 @@ const IGNORED_HEADERS = new Set([
'host',
])
export default class HttpProxy {
module.exports = class HttpProxy {
#app
constructor(app, { httpServer }) {

View File

@@ -1,6 +1,8 @@
'use strict'
const RE = /^\s*basic\s+(.+?)\s*$/i
export function parseBasicAuth(header) {
exports.parseBasicAuth = function parseBasicAuth(header) {
if (header === undefined) {
return
}

View File

@@ -1,6 +1,6 @@
import Config from '@xen-orchestra/mixins/Config.mjs'
import Hooks from '@xen-orchestra/mixins/Hooks.mjs'
import HttpProxy from '@xen-orchestra/mixins/HttpProxy.mjs'
import Config from '@xen-orchestra/mixins/Config.js'
import Hooks from '@xen-orchestra/mixins/Hooks.js'
import HttpProxy from '@xen-orchestra/mixins/HttpProxy.js'
import mixin from '@xen-orchestra/mixin'
import { createDebounceResource } from '@vates/disposable/debounceResource.js'

View File

@@ -33,10 +33,5 @@
<!--packages-start-->
- @xen-orchestra/xapi major
- @xen-orchestra/backups minor
- @xen-orchestra/mixins major
- xo-server patch
- @xen-orchestra/proxy patch
<!--packages-end-->

View File

@@ -138,22 +138,9 @@ This CLI is mainly used as a debug tool, there's no 100% guarantee on its stabil
> xo-cli --help
Usage:
xo-cli --register [--allowUnauthorized] [--expiresIn duration] <XO-Server URL> <username> [<password>]
xo-cli --register <XO-Server URL> <username> [<password>]
Registers the XO instance to use.
--allowUnauthorized, --au
Accept invalid certificate (e.g. self-signed).
--expiresIn duration
Can be used to change the validity duration of the
authorization token (default: one month).
xo-cli --createToken <params>…
Create an authentication token for XO API.
<params>…
Accept the same parameters as --register, see its usage.
xo-cli --unregister
Remove stored credentials.
@@ -173,6 +160,7 @@ Usage:
xo-cli <command> [<name>=<value>]...
Executes a command on the current XO instance.
```
#### Register your XO instance

View File

@@ -22,7 +22,7 @@ Cookie: authenticationToken=TN2YBOMYtXB_hHtf4wTzm9p5tTuqq2i15yeuhcz2xXM
The server will respond to an invalid token with a `401 Unauthorized` status.
**[Not implemented at this time]** The server can request that the client updates its token with a `Set-Cookie` header:
The server can request that the client updates its token with a `Set-Cookie` header:
```http
HTTP/1.1 200 OK

View File

@@ -46,5 +46,9 @@ export default async rawArgs => {
await dest.writeFooter()
await dest.writeHeader()
await dest.writeBlockAllocationTable()
if (directory) {
dest.finalize()
}
})
}

View File

@@ -11,6 +11,8 @@ const zlib = require('zlib')
const { debug } = createLogger('vhd-lib:VhdDirectory')
const CREATION_FILE_NAME = '.creating'
const NULL_COMPRESSOR = {
compress: buffer => buffer,
decompress: buffer => buffer,
@@ -119,6 +121,7 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
static async create(handler, path, { flags = 'wx+', compression } = {}) {
await handler.mkdir(path)
const vhd = new VhdDirectory(handler, path, { flags, compression })
await handler.writeFile(`${path}/${CREATION_FILE_NAME}`, +new Date())
return {
dispose: () => {},
value: vhd,
@@ -174,7 +177,19 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
return `blocks/${blockPrefix}/${blockSuffix}`
}
async readHeaderAndFooter() {
async readHeaderAndFooter(checkSecondFooter = true) {
// check that the vhd is complete ( )
if (checkSecondFooter) {
try {
const date = await this._handler.readFile(`${this._path}/${CREATION_FILE_NAME}`)
throw new Error(`the vhd ${this._path} is currently in creation since ${date}`, { path: this._path, date })
} catch (error) {
if (error.code !== 'ENOENT') {
throw error
}
// no temporary file indicating that the vhd is currently in creation
}
}
await this.#readChunkFilters()
let bufHeader, bufFooter
@@ -290,4 +305,8 @@ exports.VhdDirectory = class VhdDirectory extends VhdAbstract {
})
this.#compressor = getCompressor(chunkFilters[0])
}
async finalize() {
await this._handler.unlink(`${this._path}/${CREATION_FILE_NAME}`)
}
}

View File

@@ -26,6 +26,9 @@ const buildVhd = Disposable.wrap(async function* (handler, path, inputStream, {
case 'bat':
// it exists but I don't care
break
case 'end':
await vhd.finalize()
break
default:
throw new Error(`unhandled type of block generated by parser : ${item.type} while generating ${path}`)
}

View File

@@ -119,6 +119,9 @@ exports.parseVhdStream = async function* parseVhdStream(stream) {
*/
const bufFooterEnd = await readLastSector(stream)
assert(bufFooter.equals(bufFooterEnd), 'footer1 !== footer2')
yield {
type: 'end',
}
}
function readLastSector(stream) {

View File

@@ -20,7 +20,7 @@ Cookie: authenticationToken=TN2YBOMYtXB_hHtf4wTzm9p5tTuqq2i15yeuhcz2xXM
The server will respond to an invalid token with a `401 Unauthorized` status.
**[Not implemented at this time]** The server can request that the client updates its token with a `Set-Cookie` header:
The server can request that the client updates its token with a `Set-Cookie` header:
```http
HTTP/1.1 200 OK

View File

@@ -1,7 +1,7 @@
import Config from '@xen-orchestra/mixins/Config.mjs'
import Config from '@xen-orchestra/mixins/Config.js'
import forEach from 'lodash/forEach.js'
import Hooks from '@xen-orchestra/mixins/Hooks.mjs'
import HttpProxy from '@xen-orchestra/mixins/HttpProxy.mjs'
import Hooks from '@xen-orchestra/mixins/Hooks.js'
import HttpProxy from '@xen-orchestra/mixins/HttpProxy.js'
import includes from 'lodash/includes.js'
import isEmpty from 'lodash/isEmpty.js'
import iteratee from 'lodash/iteratee.js'