Runner can choose job type

This commit is contained in:
Chocobozzz 2024-06-27 15:54:59 +02:00
parent fd4831e502
commit b66963fe6f
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
5 changed files with 81 additions and 18 deletions

View File

@ -1,8 +1,10 @@
#!/usr/bin/env node #!/usr/bin/env node
import { Command, InvalidArgumentError } from '@commander-js/extra-typings' import { Command, InvalidArgumentError } from '@commander-js/extra-typings'
import { RunnerJobType } from '@peertube/peertube-models'
import { listRegistered, registerRunner, unregisterRunner } from './register/index.js' import { listRegistered, registerRunner, unregisterRunner } from './register/index.js'
import { RunnerServer } from './server/index.js' import { RunnerServer } from './server/index.js'
import { getSupportedJobsList } from './server/shared/supported-job.js'
import { ConfigManager, logger } from './shared/index.js' import { ConfigManager, logger } from './shared/index.js'
const program = new Command() const program = new Command()
@ -25,9 +27,29 @@ const program = new Command()
program.command('server') program.command('server')
.description('Run in server mode, to execute remote jobs of registered PeerTube instances') .description('Run in server mode, to execute remote jobs of registered PeerTube instances')
.action(async () => { .option(
'--enable-job <type>',
'Enable this job type (multiple --enable-job options can be specified). ' +
'By default all supported jobs are enabled). ' +
'Supported job types: ' + getSupportedJobsList().join(', '),
(value: RunnerJobType, previous: RunnerJobType[]) => [ ...previous, value ],
[]
)
.action(async options => {
try { try {
await RunnerServer.Instance.run() let enabledJobs: Set<RunnerJobType>
if (options.enableJob) {
for (const jobType of options.enableJob) {
if (getSupportedJobsList().includes(jobType) !== true) {
throw new InvalidArgumentError(`${jobType} is not a supported job`)
}
enabledJobs = new Set(options.enableJob)
}
}
await new RunnerServer(enabledJobs).run()
} catch (err) { } catch (err) {
logger.error(err, 'Cannot run PeerTube runner as server mode') logger.error(err, 'Cannot run PeerTube runner as server mode')
process.exit(-1) process.exit(-1)

View File

@ -3,13 +3,13 @@ import { readdir } from 'fs/promises'
import { join } from 'path' import { join } from 'path'
import { io, Socket } from 'socket.io-client' import { io, Socket } from 'socket.io-client'
import { pick, shuffle, wait } from '@peertube/peertube-core-utils' import { pick, shuffle, wait } from '@peertube/peertube-core-utils'
import { PeerTubeProblemDocument, ServerErrorCode } from '@peertube/peertube-models' import { PeerTubeProblemDocument, RunnerJobType, ServerErrorCode } from '@peertube/peertube-models'
import { PeerTubeServer as PeerTubeServerCommand } from '@peertube/peertube-server-commands' import { PeerTubeServer as PeerTubeServerCommand } from '@peertube/peertube-server-commands'
import { ConfigManager } from '../shared/index.js' import { ConfigManager } from '../shared/index.js'
import { IPCServer } from '../shared/ipc/index.js' import { IPCServer } from '../shared/ipc/index.js'
import { logger } from '../shared/logger.js' import { logger } from '../shared/logger.js'
import { JobWithToken, processJob } from './process/index.js' import { JobWithToken, processJob } from './process/index.js'
import { isJobSupported } from './shared/index.js' import { getSupportedJobsList, isJobSupported } from './shared/index.js'
type PeerTubeServer = PeerTubeServerCommand & { type PeerTubeServer = PeerTubeServerCommand & {
runnerToken: string runnerToken: string
@ -18,8 +18,6 @@ type PeerTubeServer = PeerTubeServerCommand & {
} }
export class RunnerServer { export class RunnerServer {
private static instance: RunnerServer
private servers: PeerTubeServer[] = [] private servers: PeerTubeServer[] = []
private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = [] private processingJobs: { job: JobWithToken, server: PeerTubeServer }[] = []
@ -30,11 +28,17 @@ export class RunnerServer {
private readonly sockets = new Map<PeerTubeServer, Socket>() private readonly sockets = new Map<PeerTubeServer, Socket>()
private constructor () {} constructor (private readonly enabledJobs?: Set<RunnerJobType>) {}
async run () { async run () {
logger.info('Running PeerTube runner in server mode') logger.info('Running PeerTube runner in server mode')
const enabledJobsArray = this.enabledJobs
? Array.from(this.enabledJobs)
: getSupportedJobsList()
logger.info('Supported and enabled job types: ' + enabledJobsArray.join(', '))
await ConfigManager.Instance.load() await ConfigManager.Instance.load()
for (const registered of ConfigManager.Instance.getConfig().registeredInstances) { for (const registered of ConfigManager.Instance.getConfig().registeredInstances) {
@ -235,7 +239,7 @@ export class RunnerServer {
const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken }) const { availableJobs } = await server.runnerJobs.request({ runnerToken: server.runnerToken })
const filtered = availableJobs.filter(j => isJobSupported(j)) const filtered = availableJobs.filter(j => isJobSupported(j, this.enabledJobs))
if (filtered.length === 0) { if (filtered.length === 0) {
logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`) logger.debug(`No job available on ${server.url} for runner ${server.runnerName}`)
@ -315,8 +319,4 @@ export class RunnerServer {
process.exit() process.exit()
} }
static get Instance () {
return this.instance || (this.instance = new this())
}
} }

View File

@ -36,12 +36,15 @@ const supportedMatrix: { [ id in RunnerJobType ]: (payload: RunnerJobPayload) =>
} }
} }
export function isJobSupported (job: { export function isJobSupported (job: { type: RunnerJobType, payload: RunnerJobPayload }, enabledJobs?: Set<RunnerJobType>) {
type: RunnerJobType if (enabledJobs && !enabledJobs.has(job.type)) return false
payload: RunnerJobPayload
}) {
const fn = supportedMatrix[job.type] const fn = supportedMatrix[job.type]
if (!fn) return false if (!fn) return false
return fn(job.payload as any) return fn(job.payload as any)
} }
export function getSupportedJobsList () {
return Object.keys(supportedMatrix)
}

View File

@ -14,6 +14,7 @@ import {
import { checkPeerTubeRunnerCacheIsEmpty } from '@tests/shared/directories.js' import { checkPeerTubeRunnerCacheIsEmpty } from '@tests/shared/directories.js'
import { PeerTubeRunnerProcess } from '@tests/shared/peertube-runner-process.js' import { PeerTubeRunnerProcess } from '@tests/shared/peertube-runner-process.js'
import { checkAutoCaption, checkLanguage, checkNoCaption, uploadForTranscription } from '@tests/shared/transcription.js' import { checkAutoCaption, checkLanguage, checkNoCaption, uploadForTranscription } from '@tests/shared/transcription.js'
import { expect } from 'chai'
describe('Test transcription in peertube-runner program', function () { describe('Test transcription in peertube-runner program', function () {
let servers: PeerTubeServer[] = [] let servers: PeerTubeServer[] = []
@ -34,7 +35,7 @@ describe('Test transcription in peertube-runner program', function () {
const registrationToken = await servers[0].runnerRegistrationTokens.getFirstRegistrationToken() const registrationToken = await servers[0].runnerRegistrationTokens.getFirstRegistrationToken()
peertubeRunner = new PeerTubeRunnerProcess(servers[0]) peertubeRunner = new PeerTubeRunnerProcess(servers[0])
await peertubeRunner.runServer() await peertubeRunner.runServer({ jobType: 'video-transcription' })
await peertubeRunner.registerPeerTubeInstance({ registrationToken, runnerName: 'runner' }) await peertubeRunner.registerPeerTubeInstance({ registrationToken, runnerName: 'runner' })
}) })
@ -71,6 +72,32 @@ describe('Test transcription in peertube-runner program', function () {
}) })
}) })
describe('When transcription is not enabled in runner', function () {
before(async function () {
await peertubeRunner.unregisterPeerTubeInstance({ runnerName: 'runner' })
peertubeRunner.kill()
await wait(500)
const registrationToken = await servers[0].runnerRegistrationTokens.getFirstRegistrationToken()
await peertubeRunner.runServer({ jobType: 'live-rtmp-hls-transcoding' })
await peertubeRunner.registerPeerTubeInstance({ registrationToken, runnerName: 'runner' })
})
it('Should not run transcription', async function () {
this.timeout(60000)
const uuid = await uploadForTranscription(servers[0])
await wait(2000)
const { data } = await servers[0].runnerJobs.list({ stateOneOf: [ RunnerJobState.PENDING ] })
expect(data.some(j => j.type === 'video-transcription')).to.be.true
await checkNoCaption(servers, uuid)
await checkLanguage(servers, uuid, null)
})
})
describe('Check cleanup', function () { describe('Check cleanup', function () {
it('Should have an empty cache directory', async function () { it('Should have an empty cache directory', async function () {

View File

@ -3,6 +3,7 @@ import { execaNode } from 'execa'
import { join } from 'path' import { join } from 'path'
import { root } from '@peertube/peertube-node-utils' import { root } from '@peertube/peertube-node-utils'
import { PeerTubeServer } from '@peertube/peertube-server-commands' import { PeerTubeServer } from '@peertube/peertube-server-commands'
import { RunnerJobType } from '../../../models/src/runners/runner-job-type.type.js'
export class PeerTubeRunnerProcess { export class PeerTubeRunnerProcess {
private app?: ChildProcess private app?: ChildProcess
@ -12,13 +13,19 @@ export class PeerTubeRunnerProcess {
} }
runServer (options: { runServer (options: {
jobType?: RunnerJobType
hideLogs?: boolean // default true hideLogs?: boolean // default true
} = {}) { } = {}) {
const { hideLogs = true } = options const { jobType, hideLogs = true } = options
return new Promise<void>((res, rej) => { return new Promise<void>((res, rej) => {
const args = [ 'server', '--verbose', ...this.buildIdArg() ] const args = [ 'server', '--verbose', ...this.buildIdArg() ]
if (jobType) {
args.push('--enable-job')
args.push(jobType)
}
const forkOptions: ForkOptions = { const forkOptions: ForkOptions = {
detached: false, detached: false,
silent: true, silent: true,
@ -27,6 +34,10 @@ export class PeerTubeRunnerProcess {
this.app = fork(this.getRunnerPath(), args, forkOptions) this.app = fork(this.getRunnerPath(), args, forkOptions)
this.app.stderr.on('data', data => {
console.error(data.toString())
})
this.app.stdout.on('data', data => { this.app.stdout.on('data', data => {
const str = data.toString() as string const str = data.toString() as string