(obj: T[], key1: string, key2?: string): T[] {
return obj.sort((a, b) => {
const elem1 = key2 ? a[key1][key2] : a[key1]
const elem2 = key2 ? b[key1][key2] : b[key1]
diff --git a/packages/tests/src/api/transcoding/split-audio-and-video.ts b/packages/tests/src/api/transcoding/split-audio-and-video.ts
index bd213c135..b9b9ba82d 100644
--- a/packages/tests/src/api/transcoding/split-audio-and-video.ts
+++ b/packages/tests/src/api/transcoding/split-audio-and-video.ts
@@ -1,7 +1,7 @@
/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
-import { join } from 'path'
-import { HttpStatusCode } from '@peertube/peertube-models'
+import { sortBy } from '@peertube/peertube-core-utils'
+import { HLSTranscodingPayload, HttpStatusCode } from '@peertube/peertube-models'
import { areMockObjectStorageTestsDisabled } from '@peertube/peertube-node-utils'
import {
cleanupTests,
@@ -15,6 +15,8 @@ import {
import { DEFAULT_AUDIO_RESOLUTION } from '@peertube/peertube-server/core/initializers/constants.js'
import { checkDirectoryIsEmpty, checkTmpIsEmpty } from '@tests/shared/directories.js'
import { completeCheckHlsPlaylist } from '@tests/shared/streaming-playlists.js'
+import { expect } from 'chai'
+import { join } from 'path'
describe('Test HLS with audio and video splitted', function () {
let servers: PeerTubeServer[] = []
@@ -50,6 +52,30 @@ describe('Test HLS with audio and video splitted', function () {
await completeCheckHlsPlaylist({ servers, videoUUID: uuid, hlsOnly, splittedAudio: true, objectStorageBaseUrl })
})
+ if (concurrency === 1) {
+ it('Should have processed audio just after first video resolution', async function () {
+ const { data } = await servers[0].jobs.list({ jobType: 'video-transcoding' })
+
+ const hlsJobs = data.filter(job => {
+ const data = job.data as HLSTranscodingPayload
+
+ return data.videoUUID === videoUUIDs[0] && data.type === 'new-resolution-to-hls'
+ })
+
+ const dataForVideo = sortBy(hlsJobs, 'processedOn')
+
+ expect((dataForVideo[0].data as HLSTranscodingPayload).resolution).to.equal(720)
+
+ // FIXME: next job after parent succeeded should be audio but bullmq seems to fire another one when it updates the child state
+ // It's the reason why another video stream transcoding is executed before the audio
+ try {
+ expect((dataForVideo[1].data as HLSTranscodingPayload).resolution).to.equal(0)
+ } catch (err) {
+ expect((dataForVideo[2].data as HLSTranscodingPayload).resolution).to.equal(0)
+ }
+ })
+ }
+
it('Should upload an audio file and transcode it to HLS', async function () {
this.timeout(120000)
diff --git a/server/core/lib/transcoding/shared/job-builders/abstract-job-builder.ts b/server/core/lib/transcoding/shared/job-builders/abstract-job-builder.ts
index 701f5dd4a..b9004d1d8 100644
--- a/server/core/lib/transcoding/shared/job-builders/abstract-job-builder.ts
+++ b/server/core/lib/transcoding/shared/job-builders/abstract-job-builder.ts
@@ -23,8 +23,8 @@ export abstract class AbstractJobBuilder {
}) {
const { video, videoFile, isNewVideo, user, videoFileAlreadyLocked } = options
- let mergeOrOptimizePayload: P
- let children: P[][] = []
+ let mergeOrOptimizePayload: P & { higherPriority?: boolean }
+ let children: (P & { higherPriority?: boolean })[][] = []
const mutexReleaser = videoFileAlreadyLocked
? () => {}
@@ -72,16 +72,16 @@ export abstract class AbstractJobBuilder
{
// HLS version of max resolution
if (CONFIG.TRANSCODING.HLS.ENABLED === true) {
- const hasSplitAndioTranscoding = CONFIG.TRANSCODING.HLS.SPLIT_AUDIO_AND_VIDEO && videoFile.hasAudio()
+ const hasSplitAudioTranscoding = CONFIG.TRANSCODING.HLS.SPLIT_AUDIO_AND_VIDEO && videoFile.hasAudio()
// We had some issues with a web video quick transcoded while producing a HLS version of it
const copyCodecs = !quickTranscode
- const hlsPayloads: P[] = []
+ const hlsPayloads: (P & { higherPriority?: boolean })[] = []
hlsPayloads.push(
this.buildHLSJobPayload({
- deleteWebVideoFiles: !CONFIG.TRANSCODING.WEB_VIDEOS.ENABLED && !hasSplitAndioTranscoding,
+ deleteWebVideoFiles: !CONFIG.TRANSCODING.WEB_VIDEOS.ENABLED && !hasSplitAudioTranscoding,
separatedAudio: CONFIG.TRANSCODING.HLS.SPLIT_AUDIO_AND_VIDEO,
@@ -94,20 +94,24 @@ export abstract class AbstractJobBuilder
{
})
)
- if (hasSplitAndioTranscoding) {
+ if (hasSplitAudioTranscoding) {
hlsAudioAlreadyGenerated = true
hlsPayloads.push(
- this.buildHLSJobPayload({
- deleteWebVideoFiles: !CONFIG.TRANSCODING.WEB_VIDEOS.ENABLED,
- separatedAudio: CONFIG.TRANSCODING.HLS.SPLIT_AUDIO_AND_VIDEO,
+ {
+ higherPriority: true,
- copyCodecs,
- resolution: 0,
- fps: 0,
- video,
- isNewVideo
- })
+ ...this.buildHLSJobPayload({
+ deleteWebVideoFiles: !CONFIG.TRANSCODING.WEB_VIDEOS.ENABLED,
+ separatedAudio: CONFIG.TRANSCODING.HLS.SPLIT_AUDIO_AND_VIDEO,
+
+ copyCodecs,
+ resolution: 0,
+ fps: 0,
+ video,
+ isNewVideo
+ })
+ }
)
}
@@ -262,7 +266,8 @@ export abstract class AbstractJobBuilder
{
protected abstract createJobs (options: {
video: MVideoFullLight
- payloads: [ [ P ], ...(P[][]) ] // Array of sequential jobs to create that depend on parent job
+ // Array of sequential jobs to create that depend on parent job
+ payloads: [ [ (P & { higherPriority?: boolean }) ], ...((P & { higherPriority?: boolean })[][]) ]
user: MUserId | null
}): Promise
diff --git a/server/core/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts b/server/core/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts
index ea5cf5fd1..fa4b39a2a 100644
--- a/server/core/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts
+++ b/server/core/lib/transcoding/shared/job-builders/transcoding-job-queue-builder.ts
@@ -8,7 +8,6 @@ import {
import { CreateJobArgument, JobQueue } from '@server/lib/job-queue/index.js'
import { VideoJobInfoModel } from '@server/models/video/video-job-info.js'
import { MUserId, MVideo } from '@server/types/models/index.js'
-import Bluebird from 'bluebird'
import { getTranscodingJobPriority } from '../../transcoding-priority.js'
import { AbstractJobBuilder } from './abstract-job-builder.js'
@@ -18,21 +17,29 @@ type Payload =
NewWebVideoResolutionTranscodingPayload |
HLSTranscodingPayload
+type PayloadWithPriority = Payload & { higherPriority?: boolean }
+
export class TranscodingJobQueueBuilder extends AbstractJobBuilder {
protected async createJobs (options: {
video: MVideo
- payloads: [ [ Payload ], ...(Payload[][]) ] // Array of sequential jobs to create that depend on parent job
+ // Array of sequential jobs to create that depend on parent job
+ payloads: [ [ PayloadWithPriority ], ...(PayloadWithPriority[][]) ]
user: MUserId | null
}): Promise {
const { video, payloads, user } = options
+ const priority = await getTranscodingJobPriority({ user, type: 'vod', fallback: undefined })
+
const parent = payloads[0][0]
payloads.shift()
- const nextTranscodingSequentialJobs = await Bluebird.mapSeries(payloads, p => {
- return Bluebird.mapSeries(p, payload => {
- return this.buildTranscodingJob({ payload, user })
+ const nextTranscodingSequentialJobs = payloads.map(p => {
+ return p.map(payload => {
+ return this.buildTranscodingJob({
+ payload,
+ priority: payload.higherPriority ? priority - 1 : priority
+ })
})
})
@@ -44,7 +51,11 @@ export class TranscodingJobQueueBuilder extends AbstractJobBuilder {
}
}
- const parentJob = await this.buildTranscodingJob({ payload: parent, user, hasChildren: payloads.length !== 0 })
+ const parentJob = this.buildTranscodingJob({
+ payload: parent,
+ priority: parent.higherPriority ? priority - 1 : priority,
+ hasChildren: payloads.length !== 0
+ })
await JobQueue.Instance.createSequentialJobFlow(parentJob, transcodingJobBuilderJob)
@@ -52,16 +63,16 @@ export class TranscodingJobQueueBuilder extends AbstractJobBuilder {
await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode')
}
- private async buildTranscodingJob (options: {
+ private buildTranscodingJob (options: {
payload: VideoTranscodingPayload
hasChildren?: boolean
- user: MUserId | null // null means we don't want priority
+ priority: number
}) {
- const { user, payload, hasChildren = false } = options
+ const { priority, payload, hasChildren = false } = options
return {
type: 'video-transcoding' as 'video-transcoding',
- priority: await getTranscodingJobPriority({ user, type: 'vod', fallback: undefined }),
+ priority,
payload: { ...payload, hasChildren }
}
}
diff --git a/server/core/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts b/server/core/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts
index 44cee513a..e8ed40139 100644
--- a/server/core/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts
+++ b/server/core/lib/transcoding/shared/job-builders/transcoding-runner-job-builder.ts
@@ -26,12 +26,14 @@ type Payload = {
options: Omit[0], 'priority'>
}
+type PayloadWithPriority = Payload & { higherPriority?: boolean }
+
// eslint-disable-next-line max-len
export class TranscodingRunnerJobBuilder extends AbstractJobBuilder {
protected async createJobs (options: {
video: MVideo
- payloads: [ [ Payload ], ...(Payload[][]) ] // Array of sequential jobs to create that depend on parent job
+ payloads: [ [ PayloadWithPriority ], ...(PayloadWithPriority[][]) ] // Array of sequential jobs to create that depend on parent job
user: MUserId | null
}): Promise {
const { payloads, user } = options
@@ -39,7 +41,12 @@ export class TranscodingRunnerJobBuilder extends AbstractJobBuilder {
const parent = payloads[0][0]
payloads.shift()
- const parentJob = await this.createJob({ payload: parent, user })
+ const priority = await getTranscodingJobPriority({ user, type: 'vod', fallback: 0 })
+
+ const parentJob = await this.createJob({
+ payload: parent,
+ priority: parent.higherPriority ? priority - 1 : priority
+ })
for (const parallelPayloads of payloads) {
let lastJob = parentJob
@@ -47,8 +54,8 @@ export class TranscodingRunnerJobBuilder extends AbstractJobBuilder {
for (const parallelPayload of parallelPayloads) {
lastJob = await this.createJob({
payload: parallelPayload,
- dependsOnRunnerJob: lastJob,
- user
+ priority: parallelPayload.higherPriority ? priority - 1 : priority,
+ dependsOnRunnerJob: lastJob
})
}
@@ -58,10 +65,10 @@ export class TranscodingRunnerJobBuilder extends AbstractJobBuilder {
private async createJob (options: {
payload: Payload
- user: MUserId | null
+ priority: number
dependsOnRunnerJob?: MRunnerJob
}) {
- const { dependsOnRunnerJob, payload, user } = options
+ const { dependsOnRunnerJob, payload, priority } = options
const builder = new payload.Builder()
@@ -69,7 +76,7 @@ export class TranscodingRunnerJobBuilder extends AbstractJobBuilder {
...(payload.options as any), // FIXME: typings
dependsOnRunnerJob,
- priority: await getTranscodingJobPriority({ user, type: 'vod', fallback: 0 })
+ priority
})
}
diff --git a/server/core/lib/views/shared/video-viewer-counters.ts b/server/core/lib/views/shared/video-viewer-counters.ts
index ba43ee39a..364c8071f 100644
--- a/server/core/lib/views/shared/video-viewer-counters.ts
+++ b/server/core/lib/views/shared/video-viewer-counters.ts
@@ -126,8 +126,6 @@ export class VideoViewerCounters {
getTotalViewersOf (video: MVideoImmutable) {
const viewers = this.viewersPerVideo.get(video.id)
- logger.error('toto', { viewers })
-
return viewers?.reduce((p, c) => p + c.viewerCount, 0) || 0
}