Fix client player error on fast restream

This commit is contained in:
Chocobozzz 2024-08-09 09:42:58 +02:00
parent d47d95cb6f
commit 25684e837c
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
12 changed files with 141 additions and 41 deletions

View File

@ -562,14 +562,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
if (this.video.isLive) { if (this.video.isLive) {
player.one('ended', () => { player.one('ended', () => {
this.zone.run(() => { this.zone.run(() => this.endLive())
// We changed the video, it's not a live anymore
if (!this.video.isLive) return
this.video.state.id = VideoState.LIVE_ENDED
this.updatePlayerOnNoLive()
})
}) })
} }
@ -884,6 +877,7 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
.subscribe(({ type, payload }) => { .subscribe(({ type, payload }) => {
if (type === 'state-change') return this.handleLiveStateChange(payload.state) if (type === 'state-change') return this.handleLiveStateChange(payload.state)
if (type === 'views-change') return this.handleLiveViewsChange(payload.viewers) if (type === 'views-change') return this.handleLiveViewsChange(payload.viewers)
if (type === 'force-end') return this.endLive()
}) })
} }
@ -992,4 +986,13 @@ export class VideoWatchComponent implements OnInit, OnDestroy {
peertubeLink: false peertubeLink: false
} }
} }
private endLive () {
// We changed the video, it's not a live anymore
if (!this.video.isLive) return
this.video.state.id = VideoState.LIVE_ENDED
this.updatePlayerOnNoLive()
}
} }

View File

@ -68,7 +68,7 @@ export class PeerTubeSocket {
this.liveVideosSocket = this.io(environment.apiUrl + '/live-videos') this.liveVideosSocket = this.io(environment.apiUrl + '/live-videos')
const types: LiveVideoEventType[] = [ 'views-change', 'state-change' ] const types: LiveVideoEventType[] = [ 'views-change', 'state-change', 'force-end' ]
for (const type of types) { for (const type of types) {
this.liveVideosSocket.on(type, (payload: LiveVideoEventPayload) => { this.liveVideosSocket.on(type, (payload: LiveVideoEventPayload) => {

View File

@ -130,6 +130,8 @@ export class Html5Hlsjs {
private dvrDuration: number = null private dvrDuration: number = null
private edgeMargin: number = null private edgeMargin: number = null
private liveEnded = false
private handlers: { [ id in 'play' | 'error' ]: EventListener } = { private handlers: { [ id in 'play' | 'error' ]: EventListener } = {
play: null, play: null,
error: null error: null
@ -260,6 +262,16 @@ export class Html5Hlsjs {
private _handleNetworkError (error: any) { private _handleNetworkError (error: any) {
if (navigator.onLine === false) return if (navigator.onLine === false) return
// We may have errors if the live ended because of a fast-restream in the same permanent live
if (this.liveEnded) {
logger.info('Forcing end of live stream after a network error');
(this.player as any)?.handleTechEnded_()
this.hls?.stopLoad()
return
}
if (this.errorCounts[Hlsjs.ErrorTypes.NETWORK_ERROR] <= this.maxNetworkErrorRecovery) { if (this.errorCounts[Hlsjs.ErrorTypes.NETWORK_ERROR] <= this.maxNetworkErrorRecovery) {
logger.info('trying to recover network error') logger.info('trying to recover network error')
@ -383,6 +395,8 @@ export class Html5Hlsjs {
} }
private initialize () { private initialize () {
this.liveEnded = false
this.buildBaseConfig() this.buildBaseConfig()
if ([ '', 'auto' ].includes(this.videoElement.preload) && !this.videoElement.autoplay && this.hlsjsConfig.autoStartLoad === undefined) { if ([ '', 'auto' ].includes(this.videoElement.preload) && !this.videoElement.autoplay && this.hlsjsConfig.autoStartLoad === undefined) {
@ -403,7 +417,7 @@ export class Html5Hlsjs {
this.hls.on(Hlsjs.Events.ERROR, (event, data) => this._onError(event, data)) this.hls.on(Hlsjs.Events.ERROR, (event, data) => this._onError(event, data))
this.hls.on(Hlsjs.Events.MANIFEST_PARSED, (event, data) => this._onMetaData(event, data)) this.hls.on(Hlsjs.Events.MANIFEST_PARSED, (event, data) => this._onMetaData(event, data))
this.hls.on(Hlsjs.Events.LEVEL_LOADED, (event, data) => { this.hls.on(Hlsjs.Events.LEVEL_LOADED, (_event, data) => {
// The DVR plugin will auto seek to "live edge" on start up // The DVR plugin will auto seek to "live edge" on start up
if (this.hlsjsConfig.liveSyncDuration) { if (this.hlsjsConfig.liveSyncDuration) {
this.edgeMargin = this.hlsjsConfig.liveSyncDuration this.edgeMargin = this.hlsjsConfig.liveSyncDuration
@ -412,6 +426,7 @@ export class Html5Hlsjs {
} }
if (this.isLive && !data.details.live) { if (this.isLive && !data.details.live) {
this.liveEnded = true
this.player.trigger('hlsjs-live-ended') this.player.trigger('hlsjs-live-ended')
} }

View File

@ -141,14 +141,9 @@ class P2pMediaLoaderPlugin extends Plugin {
initHlsJsPlayer(this.player, this.hlsjs) initHlsJsPlayer(this.player, this.hlsjs)
this.p2pEngine.on(Events.SegmentError, (segment: Segment, err) => { this.p2pEngine.on(Events.SegmentError, (segment: Segment, err) => {
if (navigator.onLine === false) return if (navigator.onLine === false || this.liveEnded) return
// We may have errors if the live ended because of a fast-restream in the same permanent live
if (this.liveEnded) {
(this.player as any).handleTechEnded_()
return
}
logger.error(`Segment ${segment.id} error.`, err) logger.clientError(`Segment ${segment.id} error.`, err)
if (this.options.redundancyUrlManager) { if (this.options.redundancyUrlManager) {
this.options.redundancyUrlManager.removeBySegmentUrl(segment.requestUrl) this.options.redundancyUrlManager.removeBySegmentUrl(segment.requestUrl)

View File

@ -7,11 +7,14 @@ class RedundancyUrlManager {
} }
removeBySegmentUrl (segmentUrl: string) { removeBySegmentUrl (segmentUrl: string) {
logger.info(`Removing redundancy of segment URL ${segmentUrl}.`)
const baseUrl = getBaseUrl(segmentUrl) const baseUrl = getBaseUrl(segmentUrl)
const oldLength = baseUrl.length
this.baseUrls = this.baseUrls.filter(u => u !== baseUrl && u !== baseUrl + '/') this.baseUrls = this.baseUrls.filter(u => u !== baseUrl && u !== baseUrl + '/')
if (oldLength !== this.baseUrls.length) {
logger.info(`Removed redundancy of segment URL ${segmentUrl}.`)
}
} }
buildUrl (url: string) { buildUrl (url: string) {

View File

@ -70,7 +70,7 @@ export class SegmentValidator {
throw new Error(`Unknown segment name ${filename}/${range} in segment validator`) throw new Error(`Unknown segment name ${filename}/${range} in segment validator`)
} }
debugLogger(`Validating ${filename} range ${segment.range}`) debugLogger(`Validating ${filename}` + (segment.range ? ` range ${segment.range}` : ''))
const calculatedSha = await this.sha256Hex(segment.data) const calculatedSha = await this.sha256Hex(segment.data)
if (calculatedSha !== hashShouldBe) { if (calculatedSha !== hashShouldBe) {

View File

@ -317,17 +317,20 @@ export class PeerTubeEmbed {
if (video.isLive) { if (video.isLive) {
this.liveManager.listenForChanges({ this.liveManager.listenForChanges({
video, video,
onPublishedVideo: () => { onPublishedVideo: () => {
this.liveManager.stopListeningForChanges(video) this.liveManager.stopListeningForChanges(video)
this.loadVideoAndBuildPlayer({ uuid: video.uuid, forceAutoplay: true }) this.loadVideoAndBuildPlayer({ uuid: video.uuid, forceAutoplay: true })
} },
onForceEnd: () => this.endLive(video, translations)
}) })
if (video.state.id === VideoState.WAITING_FOR_LIVE || video.state.id === VideoState.LIVE_ENDED) { if (video.state.id === VideoState.WAITING_FOR_LIVE || video.state.id === VideoState.LIVE_ENDED) {
this.liveManager.displayInfo({ state: video.state.id, translations }) this.liveManager.displayInfo({ state: video.state.id, translations })
this.peertubePlayer.disable() this.peertubePlayer.disable()
} else { } else {
this.correctlyHandleLiveEnding(translations) this.player.one('ended', () => this.endLive(video, translations))
} }
} }
@ -369,13 +372,13 @@ export class PeerTubeEmbed {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
private correctlyHandleLiveEnding (translations: Translations) { private endLive (video: VideoDetails, translations: Translations) {
this.player.one('ended', () => { // Display the live ended information
// Display the live ended information this.liveManager.displayInfo({ state: VideoState.LIVE_ENDED, translations })
this.liveManager.displayInfo({ state: VideoState.LIVE_ENDED, translations })
this.peertubePlayer.disable() this.peertubePlayer.unload()
}) this.peertubePlayer.disable()
this.peertubePlayer.setPoster(video.previewPath)
} }
private async handlePasswordError (err: PeerTubeServerError) { private async handlePasswordError (err: PeerTubeServerError) {

View File

@ -7,7 +7,8 @@ import { getBackendUrl } from './url'
export class LiveManager { export class LiveManager {
private liveSocket: Socket private liveSocket: Socket
private listeners = new Map<string, (payload: LiveVideoEventPayload) => void>() private stateChangeListeners = new Map<string, (payload: LiveVideoEventPayload) => void>()
private forceEndListeners = new Map<string, () => void>()
constructor ( constructor (
private readonly playerHTML: PlayerHTML private readonly playerHTML: PlayerHTML
@ -17,16 +18,19 @@ export class LiveManager {
async listenForChanges (options: { async listenForChanges (options: {
video: VideoDetails video: VideoDetails
onPublishedVideo: () => any onPublishedVideo: () => any
onForceEnd: () => any
}) { }) {
const { video, onPublishedVideo } = options const { video, onPublishedVideo, onForceEnd } = options
if (!this.liveSocket) { if (!this.liveSocket) {
const io = (await import('socket.io-client')).io const io = (await import('socket.io-client')).io
this.liveSocket = io(getBackendUrl() + '/live-videos') this.liveSocket = io(getBackendUrl() + '/live-videos')
} }
const listener = (payload: LiveVideoEventPayload) => { const stateChangeListener = (payload: LiveVideoEventPayload) => {
if (payload.state === VideoState.PUBLISHED) { if (payload.state === VideoState.PUBLISHED) {
this.playerHTML.removeInformation() this.playerHTML.removeInformation()
onPublishedVideo() onPublishedVideo()
@ -34,16 +38,28 @@ export class LiveManager {
} }
} }
this.liveSocket.on('state-change', listener) const forceEndListener = () => {
this.listeners.set(video.uuid, listener) onForceEnd()
}
this.liveSocket.on('state-change', stateChangeListener)
this.liveSocket.on('force-end', forceEndListener)
this.stateChangeListeners.set(video.uuid, stateChangeListener)
this.forceEndListeners.set(video.uuid, forceEndListener)
this.liveSocket.emit('subscribe', { videoId: video.id }) this.liveSocket.emit('subscribe', { videoId: video.id })
} }
stopListeningForChanges (video: VideoDetails) { stopListeningForChanges (video: VideoDetails) {
const listener = this.listeners.get(video.uuid) {
if (listener) { const listener = this.stateChangeListeners.get(video.uuid)
this.liveSocket.off('state-change', listener) if (listener) this.liveSocket.off('state-change', listener)
}
{
const listener = this.forceEndListeners.get(video.uuid)
if (listener) this.liveSocket.off('force-end', listener)
} }
this.liveSocket.emit('unsubscribe', { videoId: video.id }) this.liveSocket.emit('unsubscribe', { videoId: video.id })

View File

@ -1 +1 @@
export type LiveVideoEventType = 'state-change' | 'views-change' export type LiveVideoEventType = 'state-change' | 'views-change' | 'force-end'

View File

@ -1,7 +1,7 @@
/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
import { wait } from '@peertube/peertube-core-utils' import { wait } from '@peertube/peertube-core-utils'
import { LiveVideoEventPayload, VideoPrivacy, VideoState, VideoStateType } from '@peertube/peertube-models' import { LiveVideoCreate, LiveVideoEventPayload, VideoPrivacy, VideoState, VideoStateType } from '@peertube/peertube-models'
import { import {
PeerTubeServer, PeerTubeServer,
cleanupTests, cleanupTests,
@ -36,11 +36,13 @@ describe('Test live socket messages', function () {
describe('Live socket messages', function () { describe('Live socket messages', function () {
async function createLiveWrapper () { async function createLiveWrapper (options: Partial<LiveVideoCreate> = {}) {
const liveAttributes = { const liveAttributes = {
name: 'live video', name: 'live video',
channelId: servers[0].store.channel.id, channelId: servers[0].store.channel.id,
privacy: VideoPrivacy.PUBLIC privacy: VideoPrivacy.PUBLIC,
...options
} }
const { uuid } = await servers[0].live.create({ fields: liveAttributes }) const { uuid } = await servers[0].live.create({ fields: liveAttributes })
@ -173,6 +175,48 @@ describe('Test live socket messages', function () {
expect(stateChanges).to.have.lengthOf(1) expect(stateChanges).to.have.lengthOf(1)
}) })
it('Should correctly send a force end notification', async function () {
this.timeout(60000)
let hadForcedEndEvent = false
await servers[0].kill()
const env = { PEERTUBE_TEST_CONSTANTS_VIDEO_LIVE_CLEANUP_DELAY: '20000' }
await servers[0].run({}, { env })
const liveVideoUUID = await createLiveWrapper({ permanentLive: true })
{
const videoId = await servers[0].videos.getId({ uuid: liveVideoUUID })
const localSocket = servers[0].socketIO.getLiveNotificationSocket()
localSocket.on('force-end', () => { hadForcedEndEvent = true })
localSocket.emit('subscribe', { videoId })
}
// Streaming session #1
const rtmpOptions = {
videoId: liveVideoUUID,
copyCodecs: true,
fixtureName: 'video_short.mp4'
}
let ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo(rtmpOptions)
await servers[0].live.waitUntilPublished({ videoId: liveVideoUUID })
await stopFfmpeg(ffmpegCommand)
await servers[0].live.waitUntilWaiting({ videoId: liveVideoUUID })
// Streaming session #2
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo(rtmpOptions)
// eslint-disable-next-line no-unmodified-loop-condition
while (!hadForcedEndEvent) {
await wait(500)
}
})
}) })
after(async function () { after(async function () {

View File

@ -279,6 +279,8 @@ class LiveManager {
if (oldStreamingPlaylist) { if (oldStreamingPlaylist) {
if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid) if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid)
PeerTubeSocket.Instance.sendVideoForceEnd(video)
await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist) await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist)
} }

View File

@ -8,6 +8,7 @@ import { UserNotificationModelForApi } from '@server/types/models/user/index.js'
import { LiveVideoEventPayload, LiveVideoEventType } from '@peertube/peertube-models' import { LiveVideoEventPayload, LiveVideoEventType } from '@peertube/peertube-models'
import { logger } from '../helpers/logger.js' import { logger } from '../helpers/logger.js'
import { authenticateRunnerSocket, authenticateSocket } from '../middlewares/index.js' import { authenticateRunnerSocket, authenticateSocket } from '../middlewares/index.js'
import { isDevInstance } from '@peertube/peertube-node-utils'
class PeerTubeSocket { class PeerTubeSocket {
@ -20,7 +21,11 @@ class PeerTubeSocket {
private constructor () {} private constructor () {}
init (server: HTTPServer) { init (server: HTTPServer) {
const io = new SocketServer(server) const io = new SocketServer(server, {
cors: isDevInstance()
? { origin: 'http://localhost:5173', methods: [ 'GET', 'POST' ] }
: undefined
})
io.of('/user-notifications') io.of('/user-notifications')
.use(authenticateSocket) .use(authenticateSocket)
@ -88,6 +93,8 @@ class PeerTubeSocket {
} }
} }
// ---------------------------------------------------------------------------
sendVideoLiveNewState (video: MVideo) { sendVideoLiveNewState (video: MVideo) {
const data: LiveVideoEventPayload = { state: video.state } const data: LiveVideoEventPayload = { state: video.state }
const type: LiveVideoEventType = 'state-change' const type: LiveVideoEventType = 'state-change'
@ -110,6 +117,18 @@ class PeerTubeSocket {
.emit(type, data) .emit(type, data)
} }
sendVideoForceEnd (video: MVideo) {
const type: LiveVideoEventType = 'force-end'
logger.debug('Sending video live "force end" notification of %s.', video.url)
this.liveVideosNamespace
.in(video.id + '')
.emit(type)
}
// ---------------------------------------------------------------------------
@Debounce({ timeoutMS: 1000 }) @Debounce({ timeoutMS: 1000 })
sendAvailableJobsPingToRunners () { sendAvailableJobsPingToRunners () {
logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`) logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`)