feat(mixins/Tasks): db, log consolidation, abortion & watch

This commit is contained in:
Julien Fontanet 2023-04-17 11:49:09 +02:00
parent 60f6e54da1
commit f3c5e817a3
5 changed files with 215 additions and 134 deletions

View File

@ -1,58 +1,187 @@
import { createLogger } from '@xen-orchestra/log'
import { EventEmitter } from 'node:events'
import { makeOnProgress } from '@vates/task/combineEvents'
import { noSuchObject } from 'xo-common/api-errors.js'
import { Task } from '@vates/task'
import { TreeMap } from './_TreeMap.mjs'
import iteratee from 'lodash/iteratee.js'
import stubTrue from 'lodash/stubTrue.js'
export { Task }
const { debug } = createLogger('xo:mixins:Tasks')
const { warn } = createLogger('xo:mixins:Tasks')
export default class Tasks {
// contains instance of running tasks (required to interact with running tasks)
const formatId = timestamp => timestamp.toString(36).padStart(9, '0')
const noop = Function.prototype
export default class Tasks extends EventEmitter {
// contains consolidated logs of all live and finished tasks
#store
// contains instance of running tasks (required for interaction, e.g. aborting)
#tasks = new Map()
// contains tasks grouped by objects (tasks unrelated to objects are available under undefined)
#tasksByObject = new TreeMap()
#onProgress = makeOnProgress({
onRootTaskEnd: taskLog => {
const { id } = taskLog
this.#tasks.delete(id)
},
onTaskUpdate: async taskLog => {
try {
const { $root } = taskLog
create({ name, objectId }) {
const tasks = this.#tasks
let id
do {
id = Math.random().toString(36).slice(2)
} while (tasks.has(id))
$root.updatedAt = Date.now()
const byObject = this.#tasksByObject
const { id } = $root
await this.#store.put(id, $root)
this.emit(id, $root)
} catch (error) {
warn('failure on saving task log in store', { error, taskLog })
}
},
})
const task = new Task({
name,
onProgress: event => {
debug('task event', event)
if (event.type === 'end' && event.id === id) {
setTimeout(() => {
tasks.delete(id)
byObject.delete([objectId, id])
}, 600e3)
constructor(app) {
super()
app.hooks
.on('clean', () => this.#gc(app.config.getOptional('tasks.gc.keep') ?? 1e3))
.on('start', async () => {
this.#store = await app.getStore('tasks')
for await (const taskLog of this.list({ filter: _ => _.status === 'pending' })) {
taskLog.status = 'interrupted'
await this.#store.put(taskLog.id, taskLog)
}
},
return () => this.#store.close()
})
}
#gc(keep) {
return new Promise((resolve, reject) => {
const db = this.#store
let count = 1
const cb = () => {
if (--count === 0) {
resolve()
}
}
const stream = db.createKeyStream({
reverse: true,
})
const deleteEntry = key => {
++count
db.del(key, cb)
}
const onData =
keep !== 0
? () => {
if (--keep === 0) {
stream.on('data', deleteEntry)
stream.removeListener('data', onData)
}
}
: deleteEntry
stream.on('data', onData)
stream.on('end', cb).on('error', reject)
})
task.id = id
tasks.set(id, task)
byObject.set([objectId, id], task)
return task
}
getByObject(objectId) {
return this.#tasksByObject.get(objectId)
}
async abort(id) {
async abort(id, reason) {
const task = this.#tasks.get(id)
if (task === undefined) {
throw noSuchObject(id, 'task')
}
return task.abort()
return task.abort(reason)
}
async clearLogs() {
await this.#store.clear()
}
/**
* Creates a new XO task
*
* @param {object} params
* @param {string} params.name - Human readable name of the task
* @param {string} [params.objectId] - Identifier of the object this task is related to
* @param {string} [params.type] - Type of this task (e.g. `backup.vm`, `backup.metadata`)
*
* @returns {Task}
*/
create({ name, objectId, type }) {
const tasks = this.#tasks
const task = new Task({ data: { name, objectId, type }, onProgress: this.#onProgress })
// Use a compact, sortable, string representation of the creation date
//
// Due to the padding, dates are sortable up to 5188-04-22T11:04:28.415Z
let now = Date.now()
let id
while (tasks.has((id = formatId(now)))) {
// if the current id is already taken, use the next millisecond
++now
}
task.id = id
tasks.set(id, task)
return task
}
async deleteLog(id) {
await this.#store.del(id)
}
async get(id) {
try {
return await this.#store.get(id)
} catch (error) {
if (error.type === 'NotFoundError') {
throw noSuchObject(id, 'task')
}
throw error
}
}
async *list({ filter, limit = Infinity }) {
const predicate = filter === undefined ? stubTrue : typeof filter === 'function' ? filter : iteratee(filter)
for await (const [, taskLog] of this.#store.iterator()) {
if (predicate(taskLog)) {
yield taskLog
if (--limit < 1) {
break
}
}
}
}
async watch(id, cb) {
// live task
if (this.#tasks.has(id)) {
this.on(id, cb)
let stopWatch = () => {
this.off(id, cb)
stopWatch = noop
}
return () => stopWatch()
}
// unknown task will throw noSuchObject
const taskLog = await this.get(id)
// finished task
setImmediate(cb, taskLog)
return noop
}
}

View File

@ -1,66 +0,0 @@
function splitKey(key) {
return Array.isArray(key) ? [key[0], key.length < 2 ? undefined : key.slice(1)] : [key, undefined]
}
export class TreeMap extends Map {
delete(key) {
const [head, tail] = splitKey(key)
if (tail === undefined) {
return super.delete(head)
}
const value = super.get(head)
if (value instanceof TreeMap) {
if (value.delete(tail)) {
if (value.size === 0) {
return super.delete(head)
}
}
}
return false
}
get(key) {
const [head, tail] = splitKey(key)
const value = super.get(head)
if (tail === undefined) {
return value
}
if (value instanceof TreeMap) {
return value.get(tail)
}
}
has(key) {
const [head, tail] = splitKey(key)
if (!super.has(head)) {
return false
}
if (tail === undefined) {
return true
}
const value = super.get(head)
return value instanceof TreeMap && value.has(tail)
}
set(key, value) {
const [head, tail] = splitKey(key)
if (tail === undefined) {
return super.set(head, value)
}
let map = super.get(head)
if (!(map instanceof TreeMap)) {
map = new TreeMap()
super.set(head, map)
}
map.set(tail, value)
return this
}
}

View File

@ -1,28 +0,0 @@
import { strictEqual as assertEq } from 'node:assert'
import test from 'test'
import { TreeMap } from './_TreeMap.mjs'
test(function () {
const tree = new TreeMap()
assertEq(tree instanceof Map, true)
assertEq(tree.set([0, 1], 'foo'), tree)
assertEq(tree.has(0), true)
assertEq(tree.has([0, 1]), true)
assertEq(tree.get(0).get(1), 'foo')
assertEq(tree.get([0, 1]), 'foo')
assertEq(tree.delete([0, 1]), true)
assertEq(tree.has(0), false)
assertEq(tree.has([0, 1]), false)
assertEq(tree.get(0), undefined)
assertEq(tree.get([0, 1]), undefined)
assertEq(tree.delete([0, 1]), false)
})

View File

@ -1,5 +1,6 @@
import assert from 'assert'
import { fromEvent } from 'promise-toolbox'
import { Task } from '@xen-orchestra/mixins/Tasks.mjs'
export function getPermissionsForUser({ userId }) {
return this.getPermissionsForUser(userId)
@ -107,3 +108,39 @@ changeConnectedXapiHostname.params = {
newObject: { type: 'string', description: "new connection's XO object" },
oldObject: { type: 'string', description: "current connection's XO object" },
}
// -------------------------------------------------------------------
export async function createTask({ name, objectId, result, duration }) {
const task = this.tasks.create({ name, objectId })
task
.run(async () => {
const { abortSignal } = Task
let i = 0
const handle = setInterval(() => {
Task.set('i', i++)
}, 5e3)
try {
await new Promise((resolve, reject) => {
setTimeout(resolve, duration)
abortSignal.addEventListener('abort', () => reject(abortSignal.reason))
})
return result
} finally {
clearInterval(handle)
}
})
.catch(Function.prototype)
return task.id
}
createTask.permission = 'admin'
createTask.params = {
name: { type: 'string', default: 'xo task' },
objectId: { type: 'string', optional: true },
result: { optional: true },
duration: { type: 'number', default: 600e3 },
}

View File

@ -267,10 +267,18 @@ export default class RestApi {
})
)
api.get('/:collection/:object/tasks', (req, res) => {
const tasks = app.tasks.getByObject(req.xoObject.id)
sendObjects(tasks === undefined ? [] : Array.from(tasks.values()), req, res, '/tasks')
})
api.get(
'/:collection/:object/tasks',
wrap(async (req, res) => {
const { query } = req
const objectId = req.xoObject.id
const tasks = app.tasks.list({
filter: every(_ => _.status === 'pending' && _.objectId === objectId, handleOptionalUserFilter(query.filter)),
limit: ifDef(query.limit, Number),
})
sendObjects(await asyncIteratorToArray(tasks), req, res, req.baseUrl + '/tasks')
})
)
api.get('/:collection/:object/actions', (req, res) => {
const { actions } = req.collection
@ -289,6 +297,7 @@ export default class RestApi {
pResult.then(result => res.json(result), next)
} else {
pResult.catch(noop)
res.statusCode = 202
res.end(req.baseUrl + '/tasks/' + task.id)
}
})