feat(xo-server): initial tasks infrastructure (#6625)

This commit is contained in:
Julien Fontanet 2023-01-17 16:12:04 +01:00 committed by GitHub
parent 645c8f32e3
commit 3c7d316b3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 397 additions and 3 deletions

54
@vates/task/.USAGE.md Normal file
View File

@ -0,0 +1,54 @@
```js
import { Task } from '@vates/task'
const task = new Task({
name: 'my task',
// if defined, a new detached task is created
//
// if not defined and created inside an existing task, the new task is considered a subtask
onProgress(event) {
// this function is called each time this task or one of it's subtasks change state
const { id, timestamp, type } = event
if (type === 'start') {
const { name, parentId } = event
} else if (type === 'end') {
const { result, status } = event
} else if (type === 'info' || type === 'warning') {
const { data, message } = event
} else if (type === 'property') {
const { name, value } = event
}
},
})
// this field is settable once before being observed
task.id
task.status
await task.abort()
// if fn rejects, the task will be marked as failed
const result = await task.runInside(fn)
// if fn rejects, the task will be marked as failed
// if fn resolves, the task will be marked as succeeded
const result = await task.run(fn)
// the abort signal of the current task if any, otherwise is `undefined`
Task.abortSignal
// sends an info on the current task if any, otherwise does nothing
Task.info(message, data)
// sends an info on the current task if any, otherwise does nothing
Task.warning(message, data)
// attaches a property to the current task if any, otherwise does nothing
//
// the latest value takes precedence
//
// examples:
// - progress
Task.set(property, value)
```

1
@vates/task/.npmignore Symbolic link
View File

@ -0,0 +1 @@
../../scripts/npmignore

85
@vates/task/README.md Normal file
View File

@ -0,0 +1,85 @@
<!-- DO NOT EDIT MANUALLY, THIS FILE HAS BEEN GENERATED -->
# @vates/task
[![Package Version](https://badgen.net/npm/v/@vates/task)](https://npmjs.org/package/@vates/task) ![License](https://badgen.net/npm/license/@vates/task) [![PackagePhobia](https://badgen.net/bundlephobia/minzip/@vates/task)](https://bundlephobia.com/result?p=@vates/task) [![Node compatibility](https://badgen.net/npm/node/@vates/task)](https://npmjs.org/package/@vates/task)
## Install
Installation of the [npm package](https://npmjs.org/package/@vates/task):
```
> npm install --save @vates/task
```
## Usage
```js
import { Task } from '@vates/task'
const task = new Task({
name: 'my task',
// if defined, a new detached task is created
//
// if not defined and created inside an existing task, the new task is considered a subtask
onProgress(event) {
// this function is called each time this task or one of it's subtasks change state
const { id, timestamp, type } = event
if (type === 'start') {
const { name, parentId } = event
} else if (type === 'end') {
const { result, status } = event
} else if (type === 'info' || type === 'warning') {
const { data, message } = event
} else if (type === 'property') {
const { name, value } = event
}
},
})
// this field is settable once before being observed
task.id
task.status
await task.abort()
// if fn rejects, the task will be marked as failed
const result = await task.runInside(fn)
// if fn rejects, the task will be marked as failed
// if fn resolves, the task will be marked as succeeded
const result = await task.run(fn)
// the abort signal of the current task if any, otherwise is `undefined`
Task.abortSignal
// sends an info on the current task if any, otherwise does nothing
Task.info(message, data)
// sends an info on the current task if any, otherwise does nothing
Task.warning(message, data)
// attaches a property to the current task if any, otherwise does nothing
//
// the latest value takes precedence
//
// examples:
// - progress
Task.set(property, value)
```
## Contributions
Contributions are _very_ welcomed, either on the documentation or on
the code.
You may:
- report any [issue](https://github.com/vatesfr/xen-orchestra/issues)
you've encountered;
- fork and create a pull request.
## License
[ISC](https://spdx.org/licenses/ISC) © [Vates SAS](https://vates.fr)

184
@vates/task/index.js Normal file
View File

@ -0,0 +1,184 @@
'use strict'
const assert = require('node:assert').strict
const { AsyncLocalStorage } = require('node:async_hooks')
// define a read-only, non-enumerable, non-configurable property
function define(object, property, value) {
Object.defineProperty(object, property, { value })
}
const noop = Function.prototype
const ABORTED = 'aborted'
const ABORTING = 'aborting'
const FAILURE = 'failure'
const PENDING = 'pending'
const SUCCESS = 'success'
exports.STATUS = { ABORTED, ABORTING, FAILURE, PENDING, SUCCESS }
const asyncStorage = new AsyncLocalStorage()
const getTask = () => asyncStorage.getStore()
exports.Task = class Task {
static get abortSignal() {
const task = getTask()
if (task !== undefined) {
return task.#abortController.signal
}
}
static info(message, data) {
const task = getTask()
if (task !== undefined) {
task.#emit('info', { data, message })
}
}
static run(opts, fn) {
return new this(opts).run(fn)
}
static set(name, value) {
const task = getTask()
if (task !== undefined) {
task.#emit('property', { name, value })
}
}
static warning(message, data) {
const task = getTask()
if (task !== undefined) {
task.#emit('warning', { data, message })
}
}
static wrap(opts, fn) {
// compatibility with @decorateWith
if (typeof fn !== 'function') {
;[fn, opts] = [opts, fn]
}
return function taskRun() {
return Task.run(typeof opts === 'function' ? opts.apply(this, arguments) : opts, () => fn.apply(this, arguments))
}
}
#abortController = new AbortController()
#onProgress
#parent
get id() {
return (this.id = Math.random().toString(36).slice(2))
}
set id(value) {
define(this, 'id', value)
}
#startData
#status = PENDING
get status() {
return this.#status
}
constructor({ name, onProgress }) {
this.#startData = { name }
if (onProgress !== undefined) {
this.#onProgress = onProgress
} else {
const parent = getTask()
if (parent !== undefined) {
this.#parent = parent
const { signal } = parent.#abortController
signal.addEventListener('abort', () => {
this.#abortController.abort(signal.reason)
})
this.#onProgress = parent.#onProgress
this.#startData.parentId = parent.id
} else {
this.#onProgress = noop
}
}
const { signal } = this.#abortController
signal.addEventListener('abort', () => {
if (this.status === PENDING) {
this.#status = this.#running ? ABORTING : ABORTED
}
})
}
abort(reason) {
this.#abortController.abort(reason)
}
#emit(type, data) {
data.id = this.id
data.timestamp = Date.now()
data.type = type
this.#onProgress(data)
}
#handleMaybeAbortion(result) {
if (this.status === ABORTING) {
this.#status = ABORTED
this.#emit('end', { status: ABORTED, result })
return true
}
return false;
}
async run(fn) {
const result = await this.runInside(fn)
if (this.status === PENDING) {
this.#status = SUCCESS
this.#emit('end', { status: SUCCESS, result })
}
return result
}
#running = false
async runInside(fn) {
assert.equal(this.status, PENDING)
assert.equal(this.#running, false)
this.#running = true
const startData = this.#startData
if (startData !== undefined) {
this.#startData = undefined
this.#emit('start', startData)
}
try {
const result = await asyncStorage.run(this, fn)
this.#handleMaybeAbortion(result)
this.#running = false
return result
} catch (result) {
if (!this.#handleMaybeAbortion(result)) {
this.#status = FAILURE
this.#emit('end', { status: FAILURE, result })
}
throw result
}
}
wrap(fn) {
const task = this
return function taskRun() {
return task.run(() => fn.apply(this, arguments))
}
}
wrapInside(fn) {
const task = this
return function taskRunInside() {
return task.runInside(() => fn.apply(this, arguments))
}
}
}

23
@vates/task/package.json Normal file
View File

@ -0,0 +1,23 @@
{
"private": false,
"name": "@vates/task",
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@vates/task",
"bugs": "https://github.com/vatesfr/xen-orchestra/issues",
"repository": {
"directory": "@vates/task",
"type": "git",
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"author": {
"name": "Vates SAS",
"url": "https://vates.fr"
},
"license": "ISC",
"version": "0.0.0",
"engines": {
"node": ">=14"
},
"scripts": {
"postversion": "npm publish --access public"
}
}

View File

@ -100,7 +100,7 @@ class Task {
* In case of error, the task will be failed.
*
* @typedef Result
* @param {() => Result)} fn
* @param {() => Result} fn
* @param {boolean} last - Whether the task should succeed if there is no error
* @returns Result
*/

View File

@ -0,0 +1,42 @@
import { createLogger } from '@xen-orchestra/log'
import { noSuchObject } from 'xo-common/api-errors.js'
import { Task } from '@vates/task'
export { Task }
const { debug } = createLogger('xo:mixins:Tasks')
export default class Tasks {
// contains instance of running tasks (required to interact with running tasks)
#tasks = new Map()
async create({ name }) {
const tasks = this.#tasks
let id
do {
id = Math.random().toString(36).slice(2)
} while (tasks.has(id))
const task = new Task({
name,
onProgress: event => {
debug('task event', event)
if (event.type === 'end' && event.id === id) {
tasks.delete(id)
}
},
})
task.id = id
tasks.set(id, task)
return task
}
async abort(id) {
const task = this.#tasks.get(id)
if (task === undefined) {
throw noSuchObject(id, 'task')
}
return task.abort()
}
}

View File

@ -21,12 +21,14 @@
"dependencies": {
"@vates/event-listeners-manager": "^1.0.1",
"@vates/parse-duration": "^0.1.1",
"@vates/task": "^0.0.0",
"@xen-orchestra/emit-async": "^1.0.0",
"@xen-orchestra/log": "^0.5.0",
"acme-client": "^5.0.0",
"app-conf": "^2.3.0",
"lodash": "^4.17.21",
"promise-toolbox": "^0.21.0"
"promise-toolbox": "^0.21.0",
"xo-common": "^0.8.0"
},
"scripts": {
"postversion": "npm publish --access public"

View File

@ -33,8 +33,10 @@
<!--packages-start-->
- @vates/task patch
- @xen-orchestra/backups patch
- @xen-orchestra/log minor
- @xen-orchestra/mixins feat
- @xen-orchestra/xapi patch
- vhd-lib patch
- xo-server minor

View File

@ -9,6 +9,7 @@ import mixin from '@xen-orchestra/mixin'
import mixinLegacy from '@xen-orchestra/mixin/legacy.js'
import stubTrue from 'lodash/stubTrue.js'
import SslCertificate from '@xen-orchestra/mixins/SslCertificate.mjs'
import Tasks from '@xen-orchestra/mixins/Tasks.mjs'
import { Collection as XoCollection } from 'xo-collection'
import { createClient as createRedisClient } from 'redis'
import { createDebounceResource } from '@vates/disposable/debounceResource.js'
@ -30,7 +31,7 @@ export default class Xo extends EventEmitter {
constructor(opts) {
super()
mixin(this, { Config, Hooks, HttpProxy, SslCertificate }, [opts])
mixin(this, { Config, Hooks, HttpProxy, SslCertificate, Tasks }, [opts])
// a lot of mixins adds listener for start/stop/… events
this.hooks.setMaxListeners(0)