From 821e597092e0249c8142c387ed2cf96228999924 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Mon, 27 Apr 2026 18:55:24 +0200 Subject: [PATCH] Add `/api/videos/streams` route --- api/src/auth.ts | 20 +-- api/src/base.ts | 2 + api/src/controllers/streams.ts | 222 +++++++++++++++++++++++++++++++++ api/src/models/user.ts | 24 ++++ auth/models/users.go | 2 +- 5 files changed, 250 insertions(+), 20 deletions(-) create mode 100644 api/src/controllers/streams.ts create mode 100644 api/src/models/user.ts diff --git a/api/src/auth.ts b/api/src/auth.ts index e0cc3e6d..d2be7c8e 100644 --- a/api/src/auth.ts +++ b/api/src/auth.ts @@ -2,7 +2,7 @@ import { TypeCompiler } from "@sinclair/typebox/compiler"; import { Value } from "@sinclair/typebox/value"; import Elysia, { t } from "elysia"; import { createRemoteJWKSet, jwtVerify } from "jose"; -import { KError } from "./models/error"; +import { UserC } from "~/models/user"; import type { Prettify } from "./utils"; const jwtSecret = process.env.JWT_SECRET @@ -91,24 +91,6 @@ export const auth = new Elysia({ name: "auth" }) }) .as("scoped"); -const User = t.Object({ - id: t.String({ format: "uuid" }), - username: t.String(), - email: t.String({ format: "email" }), - createdDate: t.Date(), - lastSeen: t.Date(), - claims: t.Record(t.String(), t.Any()), - oidc: t.Record( - t.String(), - t.Object({ - id: t.String(), - username: t.String(), - profileUrl: t.Nullable(t.String({ format: "url" })), - }), - ), -}); -const UserC = TypeCompiler.Compile(t.Union([User, KError])); - export async function getUserInfo( id: string, headers: { authorization: string }, diff --git a/api/src/base.ts b/api/src/base.ts index d6db13a7..73215c9f 100644 --- a/api/src/base.ts +++ b/api/src/base.ts @@ -15,6 +15,7 @@ import { movies } from "./controllers/shows/movies"; import { series } from "./controllers/shows/series"; import { showsH } from "./controllers/shows/shows"; import { staffH } from "./controllers/staff"; +import { streamsH } from "./controllers/streams"; import { studiosH } from "./controllers/studios"; import { videosMetadata } from "./controllers/video-metadata"; import { videosReadH } from "./controllers/videos"; @@ -115,6 +116,7 @@ export const handlers = new Elysia({ prefix }) }, (app) => app + .use(streamsH) .use(showsH) .use(movies) .use(series) diff --git a/api/src/controllers/streams.ts b/api/src/controllers/streams.ts new file mode 100644 index 00000000..773633b6 --- /dev/null +++ b/api/src/controllers/streams.ts @@ -0,0 +1,222 @@ +import { TypeCompiler } from "@sinclair/typebox/compiler"; +import { and, desc, eq, sql } from "drizzle-orm"; +import { Elysia, t } from "elysia"; +import { auth } from "~/auth"; +import { db } from "~/db"; +import { history, profiles, videos } from "~/db/schema"; +import { sqlarr } from "~/db/utils"; +import { Entry } from "~/models/entry"; +import { KError } from "~/models/error"; +import { Show } from "~/models/show"; +import { User } from "~/models/user"; +import { AcceptLanguage, processLanguages } from "~/models/utils"; +import { uniq } from "~/utils"; +import { getVideos } from "./videos"; + +const TranscodeStatus = t.Object({ + index: t.Integer(), + quality: t.String(), + heads: t.Array( + t.Object({ + start: t.Number(), + end: t.Number(), + startHead: t.Integer(), + endHead: t.Integer(), + isRunning: t.Boolean(), + }), + ), +}); + +const ViewerTrack = t.Object({ + index: t.Integer(), + quality: t.String(), + head: t.Integer(), +}); + +const TranscoderViewer = t.Object({ + clientId: t.String(), + profileId: t.Nullable(t.String({ format: "uuid" })), + sessionId: t.Nullable(t.String()), + video: t.Nullable(ViewerTrack), + audio: t.Nullable(ViewerTrack), +}); + +const TranscoderStream = t.Object({ + path: t.String(), + sha: t.String(), + duration: t.Number(), + videos: t.Array(TranscodeStatus), + audios: t.Array(TranscodeStatus), + viewers: t.Array(TranscoderViewer), +}); +type TranscoderStream = typeof TranscoderStream.static; +const TranscoderStreamListC = TypeCompiler.Compile(t.Array(TranscoderStream)); + +const UserPageC = TypeCompiler.Compile(t.Object({ items: t.Array(User) })); + +const RunningViewerTrack = t.Object({ + index: t.Integer(), + quality: t.String(), + head: t.Integer(), +}); + +const RunningStream = t.Object({ + id: t.String({ format: "uuid" }), + path: t.String(), + duration: t.Number(), + videos: t.Array(TranscodeStatus), + audios: t.Array(TranscodeStatus), + viewers: t.Array( + t.Object({ + user: t.Nullable(User), + progress: t.Nullable(t.Number()), + video: t.Nullable(RunningViewerTrack), + audio: t.Nullable(RunningViewerTrack), + }), + ), + entries: t.Array(t.Omit(Entry, ["videos", "progress"])), + show: t.Nullable(Show), +}); + +export const streamsH = new Elysia({ tags: ["videos"] }).use(auth).get( + "videos/streams", + async ({ + headers: { authorization, "accept-language": langs }, + jwt: { sub, settings }, + status, + }) => { + let streams: TranscoderStream[]; + + try { + const response = await fetch( + new URL( + "/video/streams", + process.env.TRANSCODER_SERVER ?? "http://transcoder:7666", + ), + { + headers: authorization ? { authorization } : undefined, + }, + ); + + if (!response.ok) { + return status(502, { + status: 502, + message: "Cannot fetch running streams from transcoder.", + details: await response.text(), + }); + } + + streams = TranscoderStreamListC.Decode(await response.json()); + } catch (e) { + return status(502, { + status: 502, + message: "Cannot reach transcoder service.", + details: e, + }); + } + + if (!streams.length) return []; + + const usersById = new Map(); + try { + const resp: Response = await fetch( + new URL("/auth/users", process.env.AUTH_SERVER ?? "http://auth:4568"), + { + headers: authorization ? { authorization } : undefined, + }, + ); + if (!resp.ok) { + return status(502, { + status: 502, + message: "Cannot fetch users from auth service.", + details: await resp.text(), + }); + } + + const { items } = UserPageC.Decode(await resp.json()); + + for (const user of items) { + usersById.set(user.id, user); + } + } catch (e) { + return status(502, { + status: 502, + message: "Cannot reach auth service.", + details: e, + }); + } + + const paths = streams.map((x) => x.path); + const items = await getVideos({ + limit: paths.length, + filter: eq(videos.path, sql`any(${sqlarr(paths)})`), + languages: processLanguages(langs), + preferOriginal: settings.preferOriginal, + relations: ["show"], + userId: sub, + }); + + const profileIds = uniq( + streams + .flatMap((x) => x.viewers.map((v) => v.profileId)) + .filter((x): x is string => !!x), + ); + const videoIds = items.map((x) => x.id); + const progress = new Map(); + if (profileIds.length > 0 && videoIds.length > 0) { + const progressRows = await db + .selectDistinctOn([profiles.id, videos.id], { + profileId: profiles.id, + videoId: videos.id, + time: history.time, + }) + .from(history) + .innerJoin(profiles, eq(history.profilePk, profiles.pk)) + .innerJoin(videos, eq(history.videoPk, videos.pk)) + .where( + and( + eq(profiles.id, sql`any(${sqlarr(profileIds)}::uuid[])`), + eq(videos.id, sql`any(${sqlarr(videoIds)}::uuid[])`), + ), + ) + .orderBy(profiles.id, videos.id, desc(history.playedDate)); + + for (const row of progressRows) { + progress.set(`${row.profileId}:${row.videoId}`, row.time); + } + } + + const videosByPath = new Map(items.map((x) => [x.path, x])); + return streams.map((stream) => { + const video = videosByPath.get(stream.path); + return { + id: video!.id, + path: stream.path, + duration: stream.duration, + videos: stream.videos, + audios: stream.audios, + viewers: stream.viewers.map((viewer) => ({ + user: usersById.get(viewer.profileId ?? ""), + progress: progress.get(`${viewer.profileId}:${video?.id}`) ?? null, + video: viewer.video, + audio: viewer.audio, + })), + entries: video?.entries ?? [], + show: video?.show ?? null, + }; + }); + }, + { + detail: { + description: "List currently running streams", + }, + headers: t.Object({ + "accept-language": AcceptLanguage({ autoFallback: true }), + }), + response: { + 200: t.Array(RunningStream), + 422: KError, + 502: KError, + }, + }, +); diff --git a/api/src/models/user.ts b/api/src/models/user.ts new file mode 100644 index 00000000..d8cb7d0e --- /dev/null +++ b/api/src/models/user.ts @@ -0,0 +1,24 @@ +import { TypeCompiler } from "@sinclair/typebox/compiler"; +import { t } from "elysia"; +import { KError } from "./error"; + +export const User = t.Object({ + id: t.String({ format: "uuid" }), + username: t.String(), + email: t.String({ format: "email" }), + createdDate: t.Date(), + lastSeen: t.Date(), + claims: t.Record(t.String(), t.Any()), + oidc: t.Record( + t.String(), + t.Object({ + id: t.String(), + username: t.String(), + profileUrl: t.Nullable(t.String({ format: "url" })), + }), + ), +}); + +export type User = typeof User.static; + +export const UserC = TypeCompiler.Compile(t.Union([User, KError])); diff --git a/auth/models/users.go b/auth/models/users.go index de704348..a047d214 100644 --- a/auth/models/users.go +++ b/auth/models/users.go @@ -25,7 +25,7 @@ type User struct { // List of custom claims JWT created via get /jwt will have Claims jwt.MapClaims `json:"claims" example:"isAdmin: true"` // List of other login method available for this user. Access tokens wont be returned here. - Oidc map[string]OidcHandle `json:"oidc,omitempty"` + Oidc map[string]OidcHandle `json:"oidc"` } type OidcHandle struct {