From b66963fe6fdddf178dee4523fe65444c24698ede Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 27 Jun 2024 15:54:59 +0200 Subject: [PATCH] Runner can choose job type --- apps/peertube-runner/src/peertube-runner.ts | 26 +++++++++++++++-- apps/peertube-runner/src/server/server.ts | 20 ++++++------- .../src/server/shared/supported-job.ts | 11 ++++--- .../peertube-runner/video-transcription.ts | 29 ++++++++++++++++++- .../src/shared/peertube-runner-process.ts | 13 ++++++++- 5 files changed, 81 insertions(+), 18 deletions(-) diff --git a/apps/peertube-runner/src/peertube-runner.ts b/apps/peertube-runner/src/peertube-runner.ts index 67ca0e0ac..45787a4b2 100644 --- a/apps/peertube-runner/src/peertube-runner.ts +++ b/apps/peertube-runner/src/peertube-runner.ts @@ -1,8 +1,10 @@ #!/usr/bin/env node import { Command, InvalidArgumentError } from '@commander-js/extra-typings' +import { RunnerJobType } from '@peertube/peertube-models' import { listRegistered, registerRunner, unregisterRunner } from './register/index.js' import { RunnerServer } from './server/index.js' +import { getSupportedJobsList } from './server/shared/supported-job.js' import { ConfigManager, logger } from './shared/index.js' const program = new Command() @@ -25,9 +27,29 @@ const program = new Command() program.command('server') .description('Run in server mode, to execute remote jobs of registered PeerTube instances') - .action(async () => { + .option( + '--enable-job ', + 'Enable this job type (multiple --enable-job options can be specified). ' + + 'By default all supported jobs are enabled). ' + + 'Supported job types: ' + getSupportedJobsList().join(', '), + (value: RunnerJobType, previous: RunnerJobType[]) => [ ...previous, value ], + [] + ) + .action(async options => { try { - await RunnerServer.Instance.run() + let enabledJobs: Set + + if (options.enableJob) { + for (const jobType of options.enableJob) { + if (getSupportedJobsList().includes(jobType) !== true) { + throw new InvalidArgumentError(`${jobType} is not a supported job`) + } + + enabledJobs = new Set(options.enableJob) + } + } + + await new RunnerServer(enabledJobs).run() } catch (err) { logger.error(err, 'Cannot run PeerTube runner as server mode') process.exit(-1) diff --git a/apps/peertube-runner/src/server/server.ts b/apps/peertube-runner/src/server/server.ts index 61f30c5ce..c10d7c4c1 100644 --- a/apps/peertube-runner/src/server/server.ts +++ b/apps/peertube-runner/src/server/server.ts @@ -3,13 +3,13 @@ import { readdir } from 'fs/promises' import { join } from 'path' import { io, Socket } from 'socket.io-client' import { pick, shuffle, wait } from '@peertube/peertube-core-utils' -import { PeerTubeProblemDocument, ServerErrorCode } from '@peertube/peertube-models' +import { PeerTubeProblemDocument, RunnerJobType, ServerErrorCode } from '@peertube/peertube-models' import { PeerTubeServer as PeerTubeServerCommand } from '@peertube/peertube-server-commands' import { ConfigManager } from '../shared/index.js' import { IPCServer } from '../shared/ipc/index.js' import { logger } from '../shared/logger.js' import { JobWithToken, processJob } from './process/index.js' -import { isJobSupported } from './shared/index.js' +import { getSupportedJobsList, isJobSupported } from './shared/index.js' type PeerTubeServer = PeerTubeServerCommand & { runnerToken: string @@ -18,8 +18,6 @@ type PeerTubeServer = PeerTubeServerCommand & { } export class RunnerServer { - private static instance: RunnerServer - private servers: PeerTubeServer[] = [] private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = [] @@ -30,11 +28,17 @@ export class RunnerServer { private readonly sockets = new Map() - private constructor () {} + constructor (private readonly enabledJobs?: Set) {} async run () { logger.info('Running PeerTube runner in server mode') + const enabledJobsArray = this.enabledJobs + ? Array.from(this.enabledJobs) + : getSupportedJobsList() + + logger.info('Supported and enabled job types: ' + enabledJobsArray.join(', ')) + await ConfigManager.Instance.load() for (const registered of ConfigManager.Instance.getConfig().registeredInstances) { @@ -235,7 +239,7 @@ export class RunnerServer { const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken }) - const filtered = availableJobs.filter(j => isJobSupported(j)) + const filtered = availableJobs.filter(j => isJobSupported(j, this.enabledJobs)) if (filtered.length === 0) { logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`) @@ -315,8 +319,4 @@ export class RunnerServer { process.exit() } - - static get Instance () { - return this.instance || (this.instance = new this()) - } } diff --git a/apps/peertube-runner/src/server/shared/supported-job.ts b/apps/peertube-runner/src/server/shared/supported-job.ts index 38cd28140..3e14261a6 100644 --- a/apps/peertube-runner/src/server/shared/supported-job.ts +++ b/apps/peertube-runner/src/server/shared/supported-job.ts @@ -36,12 +36,15 @@ const supportedMatrix: { [ id in RunnerJobType ]: (payload: RunnerJobPayload) => } } -export function isJobSupported (job: { - type: RunnerJobType - payload: RunnerJobPayload -}) { +export function isJobSupported (job: { type: RunnerJobType, payload: RunnerJobPayload }, enabledJobs?: Set) { + if (enabledJobs && !enabledJobs.has(job.type)) return false + const fn = supportedMatrix[job.type] if (!fn) return false return fn(job.payload as any) } + +export function getSupportedJobsList () { + return Object.keys(supportedMatrix) +} diff --git a/packages/tests/src/peertube-runner/video-transcription.ts b/packages/tests/src/peertube-runner/video-transcription.ts index aa6f40c54..462bb21eb 100644 --- a/packages/tests/src/peertube-runner/video-transcription.ts +++ b/packages/tests/src/peertube-runner/video-transcription.ts @@ -14,6 +14,7 @@ import { import { checkPeerTubeRunnerCacheIsEmpty } from '@tests/shared/directories.js' import { PeerTubeRunnerProcess } from '@tests/shared/peertube-runner-process.js' import { checkAutoCaption, checkLanguage, checkNoCaption, uploadForTranscription } from '@tests/shared/transcription.js' +import { expect } from 'chai' describe('Test transcription in peertube-runner program', function () { let servers: PeerTubeServer[] = [] @@ -34,7 +35,7 @@ describe('Test transcription in peertube-runner program', function () { const registrationToken = await servers[0].runnerRegistrationTokens.getFirstRegistrationToken() peertubeRunner = new PeerTubeRunnerProcess(servers[0]) - await peertubeRunner.runServer() + await peertubeRunner.runServer({ jobType: 'video-transcription' }) await peertubeRunner.registerPeerTubeInstance({ registrationToken, runnerName: 'runner' }) }) @@ -71,6 +72,32 @@ describe('Test transcription in peertube-runner program', function () { }) }) + describe('When transcription is not enabled in runner', function () { + + before(async function () { + await peertubeRunner.unregisterPeerTubeInstance({ runnerName: 'runner' }) + peertubeRunner.kill() + await wait(500) + + const registrationToken = await servers[0].runnerRegistrationTokens.getFirstRegistrationToken() + await peertubeRunner.runServer({ jobType: 'live-rtmp-hls-transcoding' }) + await peertubeRunner.registerPeerTubeInstance({ registrationToken, runnerName: 'runner' }) + }) + + it('Should not run transcription', async function () { + this.timeout(60000) + + const uuid = await uploadForTranscription(servers[0]) + await wait(2000) + + const { data } = await servers[0].runnerJobs.list({ stateOneOf: [ RunnerJobState.PENDING ] }) + expect(data.some(j => j.type === 'video-transcription')).to.be.true + + await checkNoCaption(servers, uuid) + await checkLanguage(servers, uuid, null) + }) + }) + describe('Check cleanup', function () { it('Should have an empty cache directory', async function () { diff --git a/packages/tests/src/shared/peertube-runner-process.ts b/packages/tests/src/shared/peertube-runner-process.ts index 414d1f5bf..b4ba27d2f 100644 --- a/packages/tests/src/shared/peertube-runner-process.ts +++ b/packages/tests/src/shared/peertube-runner-process.ts @@ -3,6 +3,7 @@ import { execaNode } from 'execa' import { join } from 'path' import { root } from '@peertube/peertube-node-utils' import { PeerTubeServer } from '@peertube/peertube-server-commands' +import { RunnerJobType } from '../../../models/src/runners/runner-job-type.type.js' export class PeerTubeRunnerProcess { private app?: ChildProcess @@ -12,13 +13,19 @@ export class PeerTubeRunnerProcess { } runServer (options: { + jobType?: RunnerJobType hideLogs?: boolean // default true } = {}) { - const { hideLogs = true } = options + const { jobType, hideLogs = true } = options return new Promise((res, rej) => { const args = [ 'server', '--verbose', ...this.buildIdArg() ] + if (jobType) { + args.push('--enable-job') + args.push(jobType) + } + const forkOptions: ForkOptions = { detached: false, silent: true, @@ -27,6 +34,10 @@ export class PeerTubeRunnerProcess { this.app = fork(this.getRunnerPath(), args, forkOptions) + this.app.stderr.on('data', data => { + console.error(data.toString()) + }) + this.app.stdout.on('data', data => { const str = data.toString() as string