feat(mixins/Tasks): db, log consolidation, abortion & watch
This commit is contained in:
parent
60f6e54da1
commit
f3c5e817a3
@ -1,58 +1,187 @@
|
||||
import { createLogger } from '@xen-orchestra/log'
|
||||
import { EventEmitter } from 'node:events'
|
||||
import { makeOnProgress } from '@vates/task/combineEvents'
|
||||
import { noSuchObject } from 'xo-common/api-errors.js'
|
||||
import { Task } from '@vates/task'
|
||||
|
||||
import { TreeMap } from './_TreeMap.mjs'
|
||||
import iteratee from 'lodash/iteratee.js'
|
||||
import stubTrue from 'lodash/stubTrue.js'
|
||||
|
||||
export { Task }
|
||||
|
||||
const { debug } = createLogger('xo:mixins:Tasks')
|
||||
const { warn } = createLogger('xo:mixins:Tasks')
|
||||
|
||||
export default class Tasks {
|
||||
// contains instance of running tasks (required to interact with running tasks)
|
||||
const formatId = timestamp => timestamp.toString(36).padStart(9, '0')
|
||||
|
||||
const noop = Function.prototype
|
||||
|
||||
export default class Tasks extends EventEmitter {
|
||||
// contains consolidated logs of all live and finished tasks
|
||||
#store
|
||||
|
||||
// contains instance of running tasks (required for interaction, e.g. aborting)
|
||||
#tasks = new Map()
|
||||
|
||||
// contains tasks grouped by objects (tasks unrelated to objects are available under undefined)
|
||||
#tasksByObject = new TreeMap()
|
||||
#onProgress = makeOnProgress({
|
||||
onRootTaskEnd: taskLog => {
|
||||
const { id } = taskLog
|
||||
this.#tasks.delete(id)
|
||||
},
|
||||
onTaskUpdate: async taskLog => {
|
||||
try {
|
||||
const { $root } = taskLog
|
||||
|
||||
create({ name, objectId }) {
|
||||
const tasks = this.#tasks
|
||||
let id
|
||||
do {
|
||||
id = Math.random().toString(36).slice(2)
|
||||
} while (tasks.has(id))
|
||||
$root.updatedAt = Date.now()
|
||||
|
||||
const byObject = this.#tasksByObject
|
||||
const { id } = $root
|
||||
await this.#store.put(id, $root)
|
||||
this.emit(id, $root)
|
||||
} catch (error) {
|
||||
warn('failure on saving task log in store', { error, taskLog })
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
const task = new Task({
|
||||
name,
|
||||
onProgress: event => {
|
||||
debug('task event', event)
|
||||
if (event.type === 'end' && event.id === id) {
|
||||
setTimeout(() => {
|
||||
tasks.delete(id)
|
||||
byObject.delete([objectId, id])
|
||||
}, 600e3)
|
||||
constructor(app) {
|
||||
super()
|
||||
|
||||
app.hooks
|
||||
.on('clean', () => this.#gc(app.config.getOptional('tasks.gc.keep') ?? 1e3))
|
||||
.on('start', async () => {
|
||||
this.#store = await app.getStore('tasks')
|
||||
|
||||
for await (const taskLog of this.list({ filter: _ => _.status === 'pending' })) {
|
||||
taskLog.status = 'interrupted'
|
||||
await this.#store.put(taskLog.id, taskLog)
|
||||
}
|
||||
},
|
||||
|
||||
return () => this.#store.close()
|
||||
})
|
||||
}
|
||||
|
||||
#gc(keep) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const db = this.#store
|
||||
|
||||
let count = 1
|
||||
|
||||
const cb = () => {
|
||||
if (--count === 0) {
|
||||
resolve()
|
||||
}
|
||||
}
|
||||
const stream = db.createKeyStream({
|
||||
reverse: true,
|
||||
})
|
||||
|
||||
const deleteEntry = key => {
|
||||
++count
|
||||
db.del(key, cb)
|
||||
}
|
||||
|
||||
const onData =
|
||||
keep !== 0
|
||||
? () => {
|
||||
if (--keep === 0) {
|
||||
stream.on('data', deleteEntry)
|
||||
stream.removeListener('data', onData)
|
||||
}
|
||||
}
|
||||
: deleteEntry
|
||||
stream.on('data', onData)
|
||||
|
||||
stream.on('end', cb).on('error', reject)
|
||||
})
|
||||
task.id = id
|
||||
|
||||
tasks.set(id, task)
|
||||
byObject.set([objectId, id], task)
|
||||
|
||||
return task
|
||||
}
|
||||
|
||||
getByObject(objectId) {
|
||||
return this.#tasksByObject.get(objectId)
|
||||
}
|
||||
|
||||
async abort(id) {
|
||||
async abort(id, reason) {
|
||||
const task = this.#tasks.get(id)
|
||||
if (task === undefined) {
|
||||
throw noSuchObject(id, 'task')
|
||||
}
|
||||
return task.abort()
|
||||
return task.abort(reason)
|
||||
}
|
||||
|
||||
async clearLogs() {
|
||||
await this.#store.clear()
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new XO task
|
||||
*
|
||||
* @param {object} params
|
||||
* @param {string} params.name - Human readable name of the task
|
||||
* @param {string} [params.objectId] - Identifier of the object this task is related to
|
||||
* @param {string} [params.type] - Type of this task (e.g. `backup.vm`, `backup.metadata`)
|
||||
*
|
||||
* @returns {Task}
|
||||
*/
|
||||
create({ name, objectId, type }) {
|
||||
const tasks = this.#tasks
|
||||
|
||||
const task = new Task({ data: { name, objectId, type }, onProgress: this.#onProgress })
|
||||
|
||||
// Use a compact, sortable, string representation of the creation date
|
||||
//
|
||||
// Due to the padding, dates are sortable up to 5188-04-22T11:04:28.415Z
|
||||
let now = Date.now()
|
||||
let id
|
||||
while (tasks.has((id = formatId(now)))) {
|
||||
// if the current id is already taken, use the next millisecond
|
||||
++now
|
||||
}
|
||||
task.id = id
|
||||
|
||||
tasks.set(id, task)
|
||||
|
||||
return task
|
||||
}
|
||||
|
||||
async deleteLog(id) {
|
||||
await this.#store.del(id)
|
||||
}
|
||||
|
||||
async get(id) {
|
||||
try {
|
||||
return await this.#store.get(id)
|
||||
} catch (error) {
|
||||
if (error.type === 'NotFoundError') {
|
||||
throw noSuchObject(id, 'task')
|
||||
}
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
async *list({ filter, limit = Infinity }) {
|
||||
const predicate = filter === undefined ? stubTrue : typeof filter === 'function' ? filter : iteratee(filter)
|
||||
|
||||
for await (const [, taskLog] of this.#store.iterator()) {
|
||||
if (predicate(taskLog)) {
|
||||
yield taskLog
|
||||
|
||||
if (--limit < 1) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async watch(id, cb) {
|
||||
// live task
|
||||
if (this.#tasks.has(id)) {
|
||||
this.on(id, cb)
|
||||
|
||||
let stopWatch = () => {
|
||||
this.off(id, cb)
|
||||
stopWatch = noop
|
||||
}
|
||||
return () => stopWatch()
|
||||
}
|
||||
|
||||
// unknown task will throw noSuchObject
|
||||
const taskLog = await this.get(id)
|
||||
|
||||
// finished task
|
||||
setImmediate(cb, taskLog)
|
||||
return noop
|
||||
}
|
||||
}
|
||||
|
@ -1,66 +0,0 @@
|
||||
function splitKey(key) {
|
||||
return Array.isArray(key) ? [key[0], key.length < 2 ? undefined : key.slice(1)] : [key, undefined]
|
||||
}
|
||||
|
||||
export class TreeMap extends Map {
|
||||
delete(key) {
|
||||
const [head, tail] = splitKey(key)
|
||||
|
||||
if (tail === undefined) {
|
||||
return super.delete(head)
|
||||
}
|
||||
|
||||
const value = super.get(head)
|
||||
if (value instanceof TreeMap) {
|
||||
if (value.delete(tail)) {
|
||||
if (value.size === 0) {
|
||||
return super.delete(head)
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
get(key) {
|
||||
const [head, tail] = splitKey(key)
|
||||
|
||||
const value = super.get(head)
|
||||
if (tail === undefined) {
|
||||
return value
|
||||
}
|
||||
|
||||
if (value instanceof TreeMap) {
|
||||
return value.get(tail)
|
||||
}
|
||||
}
|
||||
|
||||
has(key) {
|
||||
const [head, tail] = splitKey(key)
|
||||
|
||||
if (!super.has(head)) {
|
||||
return false
|
||||
}
|
||||
if (tail === undefined) {
|
||||
return true
|
||||
}
|
||||
|
||||
const value = super.get(head)
|
||||
return value instanceof TreeMap && value.has(tail)
|
||||
}
|
||||
|
||||
set(key, value) {
|
||||
const [head, tail] = splitKey(key)
|
||||
|
||||
if (tail === undefined) {
|
||||
return super.set(head, value)
|
||||
}
|
||||
|
||||
let map = super.get(head)
|
||||
if (!(map instanceof TreeMap)) {
|
||||
map = new TreeMap()
|
||||
super.set(head, map)
|
||||
}
|
||||
map.set(tail, value)
|
||||
return this
|
||||
}
|
||||
}
|
@ -1,28 +0,0 @@
|
||||
import { strictEqual as assertEq } from 'node:assert'
|
||||
import test from 'test'
|
||||
|
||||
import { TreeMap } from './_TreeMap.mjs'
|
||||
|
||||
test(function () {
|
||||
const tree = new TreeMap()
|
||||
|
||||
assertEq(tree instanceof Map, true)
|
||||
|
||||
assertEq(tree.set([0, 1], 'foo'), tree)
|
||||
|
||||
assertEq(tree.has(0), true)
|
||||
assertEq(tree.has([0, 1]), true)
|
||||
|
||||
assertEq(tree.get(0).get(1), 'foo')
|
||||
assertEq(tree.get([0, 1]), 'foo')
|
||||
|
||||
assertEq(tree.delete([0, 1]), true)
|
||||
|
||||
assertEq(tree.has(0), false)
|
||||
assertEq(tree.has([0, 1]), false)
|
||||
|
||||
assertEq(tree.get(0), undefined)
|
||||
assertEq(tree.get([0, 1]), undefined)
|
||||
|
||||
assertEq(tree.delete([0, 1]), false)
|
||||
})
|
@ -1,5 +1,6 @@
|
||||
import assert from 'assert'
|
||||
import { fromEvent } from 'promise-toolbox'
|
||||
import { Task } from '@xen-orchestra/mixins/Tasks.mjs'
|
||||
|
||||
export function getPermissionsForUser({ userId }) {
|
||||
return this.getPermissionsForUser(userId)
|
||||
@ -107,3 +108,39 @@ changeConnectedXapiHostname.params = {
|
||||
newObject: { type: 'string', description: "new connection's XO object" },
|
||||
oldObject: { type: 'string', description: "current connection's XO object" },
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------
|
||||
|
||||
export async function createTask({ name, objectId, result, duration }) {
|
||||
const task = this.tasks.create({ name, objectId })
|
||||
task
|
||||
.run(async () => {
|
||||
const { abortSignal } = Task
|
||||
|
||||
let i = 0
|
||||
const handle = setInterval(() => {
|
||||
Task.set('i', i++)
|
||||
}, 5e3)
|
||||
try {
|
||||
await new Promise((resolve, reject) => {
|
||||
setTimeout(resolve, duration)
|
||||
|
||||
abortSignal.addEventListener('abort', () => reject(abortSignal.reason))
|
||||
})
|
||||
return result
|
||||
} finally {
|
||||
clearInterval(handle)
|
||||
}
|
||||
})
|
||||
.catch(Function.prototype)
|
||||
return task.id
|
||||
}
|
||||
|
||||
createTask.permission = 'admin'
|
||||
|
||||
createTask.params = {
|
||||
name: { type: 'string', default: 'xo task' },
|
||||
objectId: { type: 'string', optional: true },
|
||||
result: { optional: true },
|
||||
duration: { type: 'number', default: 600e3 },
|
||||
}
|
||||
|
@ -267,10 +267,18 @@ export default class RestApi {
|
||||
})
|
||||
)
|
||||
|
||||
api.get('/:collection/:object/tasks', (req, res) => {
|
||||
const tasks = app.tasks.getByObject(req.xoObject.id)
|
||||
sendObjects(tasks === undefined ? [] : Array.from(tasks.values()), req, res, '/tasks')
|
||||
})
|
||||
api.get(
|
||||
'/:collection/:object/tasks',
|
||||
wrap(async (req, res) => {
|
||||
const { query } = req
|
||||
const objectId = req.xoObject.id
|
||||
const tasks = app.tasks.list({
|
||||
filter: every(_ => _.status === 'pending' && _.objectId === objectId, handleOptionalUserFilter(query.filter)),
|
||||
limit: ifDef(query.limit, Number),
|
||||
})
|
||||
sendObjects(await asyncIteratorToArray(tasks), req, res, req.baseUrl + '/tasks')
|
||||
})
|
||||
)
|
||||
|
||||
api.get('/:collection/:object/actions', (req, res) => {
|
||||
const { actions } = req.collection
|
||||
@ -289,6 +297,7 @@ export default class RestApi {
|
||||
pResult.then(result => res.json(result), next)
|
||||
} else {
|
||||
pResult.catch(noop)
|
||||
res.statusCode = 202
|
||||
res.end(req.baseUrl + '/tasks/' + task.id)
|
||||
}
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user