Add ability to search a video with an URL

This commit is contained in:
Chocobozzz
2018-08-22 11:51:39 +02:00
parent 22a16e36f6
commit f6eebcb336
19 changed files with 244 additions and 135 deletions

View File

@@ -177,7 +177,8 @@ async function addFetchOutboxJob (actor: ActorModel) {
}
const payload = {
uris: [ actor.outboxUrl ]
uri: actor.outboxUrl,
type: 'activity' as 'activity'
}
return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
@@ -248,6 +249,7 @@ function saveActorAndServerAndModelIfNotExist (
} else if (actorCreated.type === 'Group') { // Video channel
actorCreated.VideoChannel = await saveVideoChannel(actorCreated, result, ownerActor, t)
actorCreated.VideoChannel.Actor = actorCreated
actorCreated.VideoChannel.Account = ownerActor.Account
}
return actorCreated

View File

@@ -1,8 +1,9 @@
import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../initializers'
import { doRequest } from '../../helpers/requests'
import { logger } from '../../helpers/logger'
import Bluebird = require('bluebird')
async function crawlCollectionPage <T> (uri: string, handler: (items: T[]) => Promise<any>) {
async function crawlCollectionPage <T> (uri: string, handler: (items: T[]) => Promise<any> | Bluebird<any>) {
logger.info('Crawling ActivityPub data on %s.', uri)
const options = {

View File

@@ -24,10 +24,8 @@ export {
async function processVideoShare (actorAnnouncer: ActorModel, activity: ActivityAnnounce) {
const objectUri = typeof activity.object === 'string' ? activity.object : activity.object.id
let video: VideoModel
const res = await getOrCreateAccountAndVideoAndChannel(objectUri)
video = res.video
const { video } = await getOrCreateAccountAndVideoAndChannel(objectUri)
return sequelizeTypescript.transaction(async t => {
// Add share entry

View File

@@ -23,7 +23,7 @@ async function processCreateActivity (activity: ActivityCreate) {
} else if (activityType === 'Dislike') {
return retryTransactionWrapper(processCreateDislike, actor, activity)
} else if (activityType === 'Video') {
return processCreateVideo(actor, activity)
return processCreateVideo(activity)
} else if (activityType === 'Flag') {
return retryTransactionWrapper(processCreateVideoAbuse, actor, activityObject as VideoAbuseObject)
} else if (activityType === 'Note') {
@@ -42,13 +42,10 @@ export {
// ---------------------------------------------------------------------------
async function processCreateVideo (
actor: ActorModel,
activity: ActivityCreate
) {
async function processCreateVideo (activity: ActivityCreate) {
const videoToCreateData = activity.object as VideoTorrentObject
const { video } = await getOrCreateAccountAndVideoAndChannel(videoToCreateData, actor)
const { video } = await getOrCreateAccountAndVideoAndChannel(videoToCreateData)
return video
}

View File

@@ -2,12 +2,13 @@ import { VideoCommentObject } from '../../../shared/models/activitypub/objects/v
import { sanitizeAndCheckVideoCommentObject } from '../../helpers/custom-validators/activitypub/video-comments'
import { logger } from '../../helpers/logger'
import { doRequest } from '../../helpers/requests'
import { ACTIVITY_PUB } from '../../initializers'
import { ACTIVITY_PUB, CRAWL_REQUEST_CONCURRENCY } from '../../initializers'
import { ActorModel } from '../../models/activitypub/actor'
import { VideoModel } from '../../models/video/video'
import { VideoCommentModel } from '../../models/video/video-comment'
import { getOrCreateActorAndServerAndModel } from './actor'
import { getOrCreateAccountAndVideoAndChannel } from './videos'
import * as Bluebird from 'bluebird'
async function videoCommentActivityObjectToDBAttributes (video: VideoModel, actor: ActorModel, comment: VideoCommentObject) {
let originCommentId: number = null
@@ -38,9 +39,9 @@ async function videoCommentActivityObjectToDBAttributes (video: VideoModel, acto
}
async function addVideoComments (commentUrls: string[], instance: VideoModel) {
for (const commentUrl of commentUrls) {
await addVideoComment(instance, commentUrl)
}
return Bluebird.map(commentUrls, commentUrl => {
return addVideoComment(instance, commentUrl)
}, { concurrency: CRAWL_REQUEST_CONCURRENCY })
}
async function addVideoComment (videoInstance: VideoModel, commentUrl: string) {

View File

@@ -11,7 +11,7 @@ import { isVideoFileInfoHashValid } from '../../helpers/custom-validators/videos
import { retryTransactionWrapper } from '../../helpers/database-utils'
import { logger } from '../../helpers/logger'
import { doRequest, doRequestAndSaveToFile } from '../../helpers/requests'
import { ACTIVITY_PUB, CONFIG, REMOTE_SCHEME, sequelizeTypescript, VIDEO_MIMETYPE_EXT } from '../../initializers'
import { ACTIVITY_PUB, CONFIG, CRAWL_REQUEST_CONCURRENCY, REMOTE_SCHEME, sequelizeTypescript, VIDEO_MIMETYPE_EXT } from '../../initializers'
import { AccountVideoRateModel } from '../../models/account/account-video-rate'
import { ActorModel } from '../../models/activitypub/actor'
import { TagModel } from '../../models/video/tag'
@@ -26,6 +26,8 @@ import { sendCreateVideo, sendUpdateVideo } from './send'
import { shareVideoByServerAndChannel } from './index'
import { isArray } from '../../helpers/custom-validators/misc'
import { VideoCaptionModel } from '../../models/video/video-caption'
import { JobQueue } from '../job-queue'
import { ActivitypubHttpFetcherPayload } from '../job-queue/handlers/activitypub-http-fetcher'
async function federateVideoIfNeeded (video: VideoModel, isNewVideo: boolean, transaction?: sequelize.Transaction) {
// If the video is not private and published, we federate it
@@ -178,10 +180,10 @@ function getOrCreateVideoChannel (videoObject: VideoTorrentObject) {
return getOrCreateActorAndServerAndModel(channel.id)
}
async function getOrCreateVideo (videoObject: VideoTorrentObject, channelActor: ActorModel) {
async function getOrCreateVideo (videoObject: VideoTorrentObject, channelActor: ActorModel, waitThumbnail = false) {
logger.debug('Adding remote video %s.', videoObject.id)
return sequelizeTypescript.transaction(async t => {
const videoCreated: VideoModel = await sequelizeTypescript.transaction(async t => {
const sequelizeOptions = {
transaction: t
}
@@ -191,10 +193,6 @@ async function getOrCreateVideo (videoObject: VideoTorrentObject, channelActor:
const videoData = await videoActivityObjectToDBAttributes(channelActor.VideoChannel, videoObject, videoObject.to)
const video = VideoModel.build(videoData)
// Don't block on remote HTTP request (we are in a transaction!)
generateThumbnailFromUrl(video, videoObject.icon)
.catch(err => logger.warn('Cannot generate thumbnail of %s.', videoObject.id, { err }))
const videoCreated = await video.save(sequelizeOptions)
// Process files
@@ -222,68 +220,100 @@ async function getOrCreateVideo (videoObject: VideoTorrentObject, channelActor:
videoCreated.VideoChannel = channelActor.VideoChannel
return videoCreated
})
const p = generateThumbnailFromUrl(videoCreated, videoObject.icon)
.catch(err => logger.warn('Cannot generate thumbnail of %s.', videoObject.id, { err }))
if (waitThumbnail === true) await p
return videoCreated
}
async function getOrCreateAccountAndVideoAndChannel (videoObject: VideoTorrentObject | string, actor?: ActorModel) {
type SyncParam = {
likes: boolean,
dislikes: boolean,
shares: boolean,
comments: boolean,
thumbnail: boolean
}
async function getOrCreateAccountAndVideoAndChannel (
videoObject: VideoTorrentObject | string,
syncParam: SyncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true }
) {
const videoUrl = typeof videoObject === 'string' ? videoObject : videoObject.id
const videoFromDatabase = await VideoModel.loadByUrlAndPopulateAccount(videoUrl)
if (videoFromDatabase) {
return {
video: videoFromDatabase,
actor: videoFromDatabase.VideoChannel.Account.Actor,
channelActor: videoFromDatabase.VideoChannel.Actor
}
}
if (videoFromDatabase) return { video: videoFromDatabase }
videoObject = await fetchRemoteVideo(videoUrl)
if (!videoObject) throw new Error('Cannot fetch remote video with url: ' + videoUrl)
const fetchedVideo = await fetchRemoteVideo(videoUrl)
if (!fetchedVideo) throw new Error('Cannot fetch remote video with url: ' + videoUrl)
if (!actor) {
const actorObj = videoObject.attributedTo.find(a => a.type === 'Person')
if (!actorObj) throw new Error('Cannot find associated actor to video ' + videoObject.url)
actor = await getOrCreateActorAndServerAndModel(actorObj.id)
}
const channelActor = await getOrCreateVideoChannel(videoObject)
const video = await retryTransactionWrapper(getOrCreateVideo, videoObject, channelActor)
const channelActor = await getOrCreateVideoChannel(fetchedVideo)
const video = await retryTransactionWrapper(getOrCreateVideo, fetchedVideo, channelActor, syncParam.thumbnail)
// Process outside the transaction because we could fetch remote data
logger.info('Adding likes of video %s.', video.uuid)
await crawlCollectionPage<string>(videoObject.likes, (items) => createRates(items, video, 'like'))
logger.info('Adding dislikes of video %s.', video.uuid)
await crawlCollectionPage<string>(videoObject.dislikes, (items) => createRates(items, video, 'dislike'))
logger.info('Adding likes/dislikes/shares/comments of video %s.', video.uuid)
logger.info('Adding shares of video %s.', video.uuid)
await crawlCollectionPage<string>(videoObject.shares, (items) => addVideoShares(items, video))
const jobPayloads: ActivitypubHttpFetcherPayload[] = []
logger.info('Adding comments of video %s.', video.uuid)
await crawlCollectionPage<string>(videoObject.comments, (items) => addVideoComments(items, video))
if (syncParam.likes === true) {
await crawlCollectionPage<string>(fetchedVideo.likes, items => createRates(items, video, 'like'))
.catch(err => logger.error('Cannot add likes of video %s.', video.uuid, { err }))
} else {
jobPayloads.push({ uri: fetchedVideo.likes, videoId: video.id, type: 'video-likes' as 'video-likes' })
}
return { actor, channelActor, video }
if (syncParam.dislikes === true) {
await crawlCollectionPage<string>(fetchedVideo.dislikes, items => createRates(items, video, 'dislike'))
.catch(err => logger.error('Cannot add dislikes of video %s.', video.uuid, { err }))
} else {
jobPayloads.push({ uri: fetchedVideo.dislikes, videoId: video.id, type: 'video-dislikes' as 'video-dislikes' })
}
if (syncParam.shares === true) {
await crawlCollectionPage<string>(fetchedVideo.shares, items => addVideoShares(items, video))
.catch(err => logger.error('Cannot add shares of video %s.', video.uuid, { err }))
} else {
jobPayloads.push({ uri: fetchedVideo.shares, videoId: video.id, type: 'video-shares' as 'video-shares' })
}
if (syncParam.comments === true) {
await crawlCollectionPage<string>(fetchedVideo.comments, items => addVideoComments(items, video))
.catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err }))
} else {
jobPayloads.push({ uri: fetchedVideo.shares, videoId: video.id, type: 'video-shares' as 'video-shares' })
}
await Bluebird.map(jobPayloads, payload => JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }))
return { video }
}
async function createRates (actorUrls: string[], video: VideoModel, rate: VideoRateType) {
let rateCounts = 0
const tasks: Bluebird<number>[] = []
for (const actorUrl of actorUrls) {
const actor = await getOrCreateActorAndServerAndModel(actorUrl)
const p = AccountVideoRateModel
.create({
videoId: video.id,
accountId: actor.Account.id,
type: rate
})
.then(() => rateCounts += 1)
await Bluebird.map(actorUrls, async actorUrl => {
try {
const actor = await getOrCreateActorAndServerAndModel(actorUrl)
const [ , created ] = await AccountVideoRateModel
.findOrCreate({
where: {
videoId: video.id,
accountId: actor.Account.id
},
defaults: {
videoId: video.id,
accountId: actor.Account.id,
type: rate
}
})
tasks.push(p)
}
await Promise.all(tasks)
if (created) rateCounts += 1
} catch (err) {
logger.warn('Cannot add rate %s for actor %s.', rate, actorUrl, { err })
}
}, { concurrency: CRAWL_REQUEST_CONCURRENCY })
logger.info('Adding %d %s to video %s.', rateCounts, rate, video.uuid)
@@ -294,34 +324,35 @@ async function createRates (actorUrls: string[], video: VideoModel, rate: VideoR
}
async function addVideoShares (shareUrls: string[], instance: VideoModel) {
for (const shareUrl of shareUrls) {
// Fetch url
const { body } = await doRequest({
uri: shareUrl,
json: true,
activityPub: true
})
if (!body || !body.actor) {
logger.warn('Cannot add remote share with url: %s, skipping...', shareUrl)
continue
}
await Bluebird.map(shareUrls, async shareUrl => {
try {
// Fetch url
const { body } = await doRequest({
uri: shareUrl,
json: true,
activityPub: true
})
if (!body || !body.actor) throw new Error('Body of body actor is invalid')
const actorUrl = body.actor
const actor = await getOrCreateActorAndServerAndModel(actorUrl)
const actorUrl = body.actor
const actor = await getOrCreateActorAndServerAndModel(actorUrl)
const entry = {
actorId: actor.id,
videoId: instance.id,
url: shareUrl
}
await VideoShareModel.findOrCreate({
where: {
const entry = {
actorId: actor.id,
videoId: instance.id,
url: shareUrl
},
defaults: entry
})
}
}
await VideoShareModel.findOrCreate({
where: {
url: shareUrl
},
defaults: entry
})
} catch (err) {
logger.warn('Cannot add share %s.', shareUrl, { err })
}
}, { concurrency: CRAWL_REQUEST_CONCURRENCY })
}
async function fetchRemoteVideo (videoUrl: string): Promise<VideoTorrentObject> {
@@ -355,5 +386,6 @@ export {
videoFileActivityUrlToDBAttributes,
getOrCreateVideo,
getOrCreateVideoChannel,
addVideoShares
addVideoShares,
createRates
}

View File

@@ -1,22 +1,36 @@
import * as Bull from 'bull'
import { logger } from '../../../helpers/logger'
import { processActivities } from '../../activitypub/process'
import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
import { VideoModel } from '../../../models/video/video'
import { addVideoShares, createRates } from '../../activitypub/videos'
import { addVideoComments } from '../../activitypub/video-comments'
import { crawlCollectionPage } from '../../activitypub/crawl'
import { Activity } from '../../../../shared/models/activitypub'
type FetchType = 'activity' | 'video-likes' | 'video-dislikes' | 'video-shares' | 'video-comments'
export type ActivitypubHttpFetcherPayload = {
uris: string[]
uri: string
type: FetchType
videoId?: number
}
async function processActivityPubHttpFetcher (job: Bull.Job) {
logger.info('Processing ActivityPub fetcher in job %d.', job.id)
const payload = job.data as ActivitypubHttpBroadcastPayload
const payload = job.data as ActivitypubHttpFetcherPayload
for (const uri of payload.uris) {
await crawlCollectionPage<Activity>(uri, (items) => processActivities(items))
let video: VideoModel
if (payload.videoId) video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId)
const fetcherType: { [ id in FetchType ]: (items: any[]) => Promise<any> } = {
'activity': items => processActivities(items),
'video-likes': items => createRates(items, video, 'like'),
'video-dislikes': items => createRates(items, video, 'dislike'),
'video-shares': items => addVideoShares(items, video),
'video-comments': items => addVideoComments(items, video)
}
return crawlCollectionPage(payload.uri, fetcherType[payload.type])
}
// ---------------------------------------------------------------------------