From 53023be33af420675d0060eb95c99a8038457564 Mon Sep 17 00:00:00 2001
From: Chocobozzz <me@florianbigard.com>
Date: Thu, 23 Jun 2022 10:29:43 +0200
Subject: [PATCH] Fix fast restream in saved permanent live

---
 server/initializers/constants.ts              |   8 +-
 .../job-queue/handlers/video-live-ending.ts   |  14 +-
 server/lib/live/live-utils.ts                 |  10 +-
 server/tests/api/live/index.ts                |   1 +
 server/tests/api/live/live-fast-restream.ts   | 128 ++++++++++++++++++
 server/tests/api/live/live-save-replay.ts     |  41 ------
 server/tests/api/live/live.ts                 |   8 +-
 .../server-commands/server/config-command.ts  |   9 +-
 shared/server-commands/videos/live-command.ts |  28 +++-
 9 files changed, 182 insertions(+), 65 deletions(-)
 create mode 100644 server/tests/api/live/live-fast-restream.ts

diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index 9201f95b3..6c6628d28 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -865,7 +865,7 @@ if (isTestInstance() === true) {
 
   PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000
 
-  VIDEO_LIVE.CLEANUP_DELAY = 5000
+  VIDEO_LIVE.CLEANUP_DELAY = getIntEnv('PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY') ?? 5000
   VIDEO_LIVE.SEGMENT_TIME_SECONDS.DEFAULT_LATENCY = 2
   VIDEO_LIVE.SEGMENT_TIME_SECONDS.SMALL_LATENCY = 1
   VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION = 1
@@ -1174,3 +1174,9 @@ function buildLanguages () {
 function generateContentHash () {
   return randomBytes(20).toString('hex')
 }
+
+function getIntEnv (path: string) {
+  if (process.env[path]) return parseInt(process.env[path])
+
+  return undefined
+}
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
index feec257fc..450bda2fd 100644
--- a/server/lib/job-queue/handlers/video-live-ending.ts
+++ b/server/lib/job-queue/handlers/video-live-ending.ts
@@ -4,7 +4,7 @@ import { join } from 'path'
 import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg'
 import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
 import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
-import { cleanupNormalLive, cleanupPermanentLive, cleanupTMPLiveFiles, LiveSegmentShaStore } from '@server/lib/live'
+import { cleanupUnsavedNormalLive, cleanupPermanentLive, cleanupTMPLiveFiles, LiveSegmentShaStore } from '@server/lib/live'
 import {
   generateHLSMasterPlaylistFilename,
   generateHlsSha256SegmentsFilename,
@@ -22,15 +22,17 @@ import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
 import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
 import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models'
 import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
-import { logger } from '../../../helpers/logger'
+import { logger, loggerTagsFactory } from '../../../helpers/logger'
+
+const lTags = loggerTagsFactory('live', 'job')
 
 async function processVideoLiveEnding (job: Job) {
   const payload = job.data as VideoLiveEndingPayload
 
-  logger.info('Processing video live ending for %s.', payload.videoId, { payload })
+  logger.info('Processing video live ending for %s.', payload.videoId, { payload, ...lTags() })
 
   function logError () {
-    logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId)
+    logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId, lTags())
   }
 
   const liveVideo = await VideoModel.load(payload.videoId)
@@ -73,8 +75,6 @@ async function saveReplayToExternalVideo (options: {
 }) {
   const { liveVideo, liveSession, publishedAt, replayDirectory } = options
 
-  await cleanupTMPLiveFiles(getLiveDirectory(liveVideo))
-
   const video = new VideoModel({
     name: `${liveVideo.name} - ${new Date(publishedAt).toLocaleString()}`,
     isLive: false,
@@ -243,7 +243,7 @@ async function cleanupLiveAndFederate (options: {
     if (live.permanentLive) {
       await cleanupPermanentLive(video, streamingPlaylist)
     } else {
-      await cleanupNormalLive(video, streamingPlaylist)
+      await cleanupUnsavedNormalLive(video, streamingPlaylist)
     }
   }
 
diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts
index 6365e23db..6305a97a8 100644
--- a/server/lib/live/live-utils.ts
+++ b/server/lib/live/live-utils.ts
@@ -10,20 +10,20 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) {
   return 'concat-' + num[1] + '.ts'
 }
 
-async function cleanupPermanentLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) {
+async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
   const hlsDirectory = getLiveDirectory(video)
 
   await cleanupTMPLiveFiles(hlsDirectory)
 
-  if (streamingPlaylist) await streamingPlaylist.destroy()
+  await streamingPlaylist.destroy()
 }
 
-async function cleanupNormalLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) {
+async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
   const hlsDirectory = getLiveDirectory(video)
 
   await remove(hlsDirectory)
 
-  if (streamingPlaylist) await streamingPlaylist.destroy()
+  await streamingPlaylist.destroy()
 }
 
 async function cleanupTMPLiveFiles (hlsDirectory: string) {
@@ -49,7 +49,7 @@ async function cleanupTMPLiveFiles (hlsDirectory: string) {
 
 export {
   cleanupPermanentLive,
-  cleanupNormalLive,
+  cleanupUnsavedNormalLive,
   cleanupTMPLiveFiles,
   buildConcatenatedName
 }
diff --git a/server/tests/api/live/index.ts b/server/tests/api/live/index.ts
index 71bc150d8..c88943f65 100644
--- a/server/tests/api/live/index.ts
+++ b/server/tests/api/live/index.ts
@@ -1,4 +1,5 @@
 import './live-constraints'
+import './live-fast-restream'
 import './live-socket-messages'
 import './live-permanent'
 import './live-rtmps'
diff --git a/server/tests/api/live/live-fast-restream.ts b/server/tests/api/live/live-fast-restream.ts
new file mode 100644
index 000000000..4b5d041ec
--- /dev/null
+++ b/server/tests/api/live/live-fast-restream.ts
@@ -0,0 +1,128 @@
+/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
+
+import 'mocha'
+import * as chai from 'chai'
+import { wait } from '@shared/core-utils'
+import { HttpStatusCode, LiveVideoCreate, VideoPrivacy } from '@shared/models'
+import {
+  cleanupTests,
+  createSingleServer,
+  makeRawRequest,
+  PeerTubeServer,
+  setAccessTokensToServers,
+  setDefaultVideoChannel,
+  stopFfmpeg,
+  waitJobs
+} from '@shared/server-commands'
+
+const expect = chai.expect
+
+describe('Fast restream in live', function () {
+  let server: PeerTubeServer
+
+  async function createLiveWrapper (options: { permanent: boolean, replay: boolean }) {
+    const attributes: LiveVideoCreate = {
+      channelId: server.store.channel.id,
+      privacy: VideoPrivacy.PUBLIC,
+      name: 'my super live',
+      saveReplay: options.replay,
+      permanentLive: options.permanent
+    }
+
+    const { uuid } = await server.live.create({ fields: attributes })
+    return uuid
+  }
+
+  async function fastRestreamWrapper ({ replay }: { replay: boolean }) {
+    const liveVideoUUID = await createLiveWrapper({ permanent: true, replay })
+    await waitJobs([ server ])
+
+    const rtmpOptions = {
+      videoId: liveVideoUUID,
+      copyCodecs: true,
+      fixtureName: 'video_short.mp4'
+    }
+
+    // Streaming session #1
+    let ffmpegCommand = await server.live.sendRTMPStreamInVideo(rtmpOptions)
+    await server.live.waitUntilPublished({ videoId: liveVideoUUID })
+    await stopFfmpeg(ffmpegCommand)
+    await server.live.waitUntilWaiting({ videoId: liveVideoUUID })
+
+    // Streaming session #2
+    ffmpegCommand = await server.live.sendRTMPStreamInVideo(rtmpOptions)
+    await server.live.waitUntilSegmentGeneration({ videoUUID: liveVideoUUID, segment: 0, playlistNumber: 0, totalSessions: 2 })
+
+    return { ffmpegCommand, liveVideoUUID }
+  }
+
+  async function ensureLastLiveWorks (liveId: string) {
+    // Equivalent to PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY
+    for (let i = 0; i < 100; i++) {
+      const video = await server.videos.get({ id: liveId })
+      expect(video.streamingPlaylists).to.have.lengthOf(1)
+
+      await server.live.getSegment({ videoUUID: liveId, segment: 0, playlistNumber: 0 })
+      await makeRawRequest(video.streamingPlaylists[0].playlistUrl, HttpStatusCode.OK_200)
+
+      await wait(100)
+    }
+  }
+
+  async function runTest (replay: boolean) {
+    const { ffmpegCommand, liveVideoUUID } = await fastRestreamWrapper({ replay })
+
+    await ensureLastLiveWorks(liveVideoUUID)
+
+    await stopFfmpeg(ffmpegCommand)
+    await server.live.waitUntilWaiting({ videoId: liveVideoUUID })
+
+    // Wait for replays
+    await waitJobs([ server ])
+
+    const { total, data: sessions } = await server.live.listSessions({ videoId: liveVideoUUID })
+
+    expect(total).to.equal(2)
+    expect(sessions).to.have.lengthOf(2)
+
+    for (const session of sessions) {
+      expect(session.error).to.be.null
+
+      if (replay) {
+        expect(session.replayVideo).to.exist
+
+        await server.videos.get({ id: session.replayVideo.uuid })
+      } else {
+        expect(session.replayVideo).to.not.exist
+      }
+    }
+  }
+
+  before(async function () {
+    this.timeout(120000)
+
+    const env = { 'PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY': '10000' }
+    server = await createSingleServer(1, {}, { env })
+
+    // Get the access tokens
+    await setAccessTokensToServers([ server ])
+    await setDefaultVideoChannel([ server ])
+
+    await server.config.enableMinimumTranscoding(false, true)
+    await server.config.enableLive({ allowReplay: true, transcoding: true, resolutions: 'min' })
+  })
+
+  it('Should correctly fast reastream in a permanent live with and without save replay', async function () {
+    this.timeout(240000)
+
+    // A test can take a long time, so prefer to run them in parallel
+    await Promise.all([
+      runTest(true),
+      runTest(false)
+    ])
+  })
+
+  after(async function () {
+    await cleanupTests([ server ])
+  })
+})
diff --git a/server/tests/api/live/live-save-replay.ts b/server/tests/api/live/live-save-replay.ts
index 99d500711..7ddcb04ef 100644
--- a/server/tests/api/live/live-save-replay.ts
+++ b/server/tests/api/live/live-save-replay.ts
@@ -12,7 +12,6 @@ import {
   createMultipleServers,
   doubleFollow,
   findExternalSavedVideo,
-  makeRawRequest,
   PeerTubeServer,
   setAccessTokensToServers,
   setDefaultVideoChannel,
@@ -442,46 +441,6 @@ describe('Save replay setting', function () {
       await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404)
       await checkLiveCleanup(servers[0], liveVideoUUID, [])
     })
-
-    it('Should correctly save replays with multiple sessions', async function () {
-      this.timeout(120000)
-
-      liveVideoUUID = await createLiveWrapper({ permanent: true, replay: true })
-      await waitJobs(servers)
-
-      // Streaming session #1
-      ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
-      await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
-      await stopFfmpeg(ffmpegCommand)
-      await servers[0].live.waitUntilWaiting({ videoId: liveVideoUUID })
-
-      // Streaming session #2
-      ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
-      await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
-
-      await wait(5000)
-      const video = await servers[0].videos.get({ id: liveVideoUUID })
-      expect(video.streamingPlaylists).to.have.lengthOf(1)
-      await makeRawRequest(video.streamingPlaylists[0].playlistUrl)
-
-      await stopFfmpeg(ffmpegCommand)
-      await waitUntilLiveWaitingOnAllServers(servers, liveVideoUUID)
-
-      // Wait for replays
-      await waitJobs(servers)
-
-      const { total, data: sessions } = await servers[0].live.listSessions({ videoId: liveVideoUUID })
-
-      expect(total).to.equal(2)
-      expect(sessions).to.have.lengthOf(2)
-
-      for (const session of sessions) {
-        expect(session.error).to.be.null
-        expect(session.replayVideo).to.exist
-
-        await servers[0].videos.get({ id: session.replayVideo.uuid })
-      }
-    })
   })
 
   after(async function () {
diff --git a/server/tests/api/live/live.ts b/server/tests/api/live/live.ts
index 9b8fbe3e2..5d354aad1 100644
--- a/server/tests/api/live/live.ts
+++ b/server/tests/api/live/live.ts
@@ -395,7 +395,7 @@ describe('Test live', function () {
         for (let i = 0; i < resolutions.length; i++) {
           const segmentNum = 3
           const segmentName = `${i}-00000${segmentNum}.ts`
-          await commands[0].waitUntilSegmentGeneration({ videoUUID: video.uuid, resolution: i, segment: segmentNum })
+          await commands[0].waitUntilSegmentGeneration({ videoUUID: video.uuid, playlistNumber: i, segment: segmentNum })
 
           const subPlaylist = await servers[0].streamingPlaylists.get({
             url: `${servers[0].url}/static/streaming-playlists/hls/${video.uuid}/${i}.m3u8`
@@ -628,9 +628,9 @@ describe('Test live', function () {
         commands[0].waitUntilPublished({ videoId: liveVideoReplayId })
       ])
 
-      await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoId, resolution: 0, segment: 2 })
-      await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoReplayId, resolution: 0, segment: 2 })
-      await commands[0].waitUntilSegmentGeneration({ videoUUID: permanentLiveVideoReplayId, resolution: 0, segment: 2 })
+      await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoId, playlistNumber: 0, segment: 2 })
+      await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoReplayId, playlistNumber: 0, segment: 2 })
+      await commands[0].waitUntilSegmentGeneration({ videoUUID: permanentLiveVideoReplayId, playlistNumber: 0, segment: 2 })
 
       {
         const video = await servers[0].videos.get({ id: permanentLiveVideoReplayId })
diff --git a/shared/server-commands/server/config-command.ts b/shared/server-commands/server/config-command.ts
index 5320dead4..3803aaf95 100644
--- a/shared/server-commands/server/config-command.ts
+++ b/shared/server-commands/server/config-command.ts
@@ -39,15 +39,18 @@ export class ConfigCommand extends AbstractCommand {
   enableLive (options: {
     allowReplay?: boolean
     transcoding?: boolean
+    resolutions?: 'min' | 'max' // Default max
   } = {}) {
+    const { allowReplay, transcoding, resolutions = 'max' } = options
+
     return this.updateExistingSubConfig({
       newConfig: {
         live: {
           enabled: true,
-          allowReplay: options.allowReplay ?? true,
+          allowReplay: allowReplay ?? true,
           transcoding: {
-            enabled: options.transcoding ?? true,
-            resolutions: ConfigCommand.getCustomConfigResolutions(true)
+            enabled: transcoding ?? true,
+            resolutions: ConfigCommand.getCustomConfigResolutions(resolutions === 'max')
           }
         }
       }
diff --git a/shared/server-commands/videos/live-command.ts b/shared/server-commands/videos/live-command.ts
index 2ff65881b..3df47ed4d 100644
--- a/shared/server-commands/videos/live-command.ts
+++ b/shared/server-commands/videos/live-command.ts
@@ -154,13 +154,33 @@ export class LiveCommand extends AbstractCommand {
 
   waitUntilSegmentGeneration (options: OverrideCommandOptions & {
     videoUUID: string
-    resolution: number
+    playlistNumber: number
+    segment: number
+    totalSessions?: number
+  }) {
+    const { playlistNumber, segment, videoUUID, totalSessions = 1 } = options
+    const segmentName = `${playlistNumber}-00000${segment}.ts`
+
+    return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, totalSessions * 2, false)
+  }
+
+  getSegment (options: OverrideCommandOptions & {
+    videoUUID: string
+    playlistNumber: number
     segment: number
   }) {
-    const { resolution, segment, videoUUID } = options
-    const segmentName = `${resolution}-00000${segment}.ts`
+    const { playlistNumber, segment, videoUUID } = options
 
-    return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, 2, false)
+    const segmentName = `${playlistNumber}-00000${segment}.ts`
+    const url = `${this.server.url}/static/streaming-playlists/hls/${videoUUID}/${segmentName}`
+
+    return this.getRawRequest({
+      ...options,
+
+      url,
+      implicitToken: false,
+      defaultExpectedStatus: HttpStatusCode.OK_200
+    })
   }
 
   async waitUntilReplacedByReplay (options: OverrideCommandOptions & {