From ad807fadb4b113a9805ff03745ec971994e32850 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Sun, 5 Apr 2026 12:06:47 +0200 Subject: [PATCH] Prepare next video automatically (#1423) --- api/.env.example | 2 + api/src/controllers/profiles/history.ts | 46 ++++++++++++++--------- api/src/websockets.ts | 50 ++++++++++++++++++++++--- chart/templates/api/deployment.yaml | 2 + transcoder/src/api/metadata.go | 43 +++++++++++++++++++++ 5 files changed, 121 insertions(+), 22 deletions(-) diff --git a/api/.env.example b/api/.env.example index bb25b658..44ba726d 100644 --- a/api/.env.example +++ b/api/.env.example @@ -8,6 +8,8 @@ JWT_SECRET= JWT_ISSUER=$PUBLIC_URL # keibi's server to retrieve the public jwt secret AUTH_SERVER=http://auth:4568 +# transcoder url used to prepare videos beforehand. +TRANSCODER_SERVER=http://transcoder:7666 IMAGES_PATH=./images diff --git a/api/src/controllers/profiles/history.ts b/api/src/controllers/profiles/history.ts index e024dcc6..b4cedfcc 100644 --- a/api/src/controllers/profiles/history.ts +++ b/api/src/controllers/profiles/history.ts @@ -46,6 +46,7 @@ export async function updateProgress(userPk: number, progress: SeedHistory[]) { const hist = await updateHistory(tx, userPk, progress); if (hist.created.length + hist.updated.length !== progress.length) { tx.rollback(); + throw "unreachable"; } // only return new and entries whose status has changed. // we don't need to update the watchlist every 10s when watching a video. @@ -53,7 +54,11 @@ export async function updateProgress(userPk: number, progress: SeedHistory[]) { ...hist.created, ...hist.updated.filter((x) => x.percent >= 95), ]); - return { status: 201, inserted: hist.created.length } as const; + return { + status: 201, + inserted: hist.created.length, + history: hist, + } as const; }); } catch (e) { if (!(e instanceof TransactionRollbackError)) throw e; @@ -72,26 +77,33 @@ async function updateHistory( return dbTx.transaction(async (tx) => { // `for("update", { of: history })` will put the `kyoo.history` instead // of `history` in the sql and that triggers a sql error. - const existing = ( - await tx - .select({ videoId: videos.id }) - .from(history) - .for("update", { of: sql`history` as any }) - .innerJoin(videos, eq(videos.pk, history.videoPk)) - .where( - and( - eq(history.profilePk, userPk), - lte(sql`now() - ${history.playedDate}`, sql`interval '1 day'`), - ), - ) - ).map((x) => x.videoId); + const existing = await tx + .select({ + videoPk: videos.pk, + videoId: videos.id, + percent: history.percent, + }) + .from(history) + .for("update", { of: sql`history` as any }) + .innerJoin(videos, eq(videos.pk, history.videoPk)) + .where( + and( + eq(history.profilePk, userPk), + lte(sql`now() - ${history.playedDate}`, sql`interval '1 day'`), + ), + ); const toUpdate = traverse( - progress.filter((x) => x.videoId && existing.includes(x.videoId)), + progress.filter( + (x) => x.videoId && existing.map((x) => x.videoId).includes(x.videoId), + ), ); const newEntries = traverse( progress - .filter((x) => !x.videoId || !existing.includes(x.videoId)) + .filter( + (x) => + !x.videoId || !existing.map((x) => x.videoId).includes(x.videoId), + ) .map((x) => ({ ...x, external: x.external ?? false, @@ -174,7 +186,7 @@ async function updateHistory( playedDate: history.playedDate, }); - return { created, updated }; + return { created, updated, existing }; }); } diff --git a/api/src/websockets.ts b/api/src/websockets.ts index 6cc2bc7a..d41792dc 100644 --- a/api/src/websockets.ts +++ b/api/src/websockets.ts @@ -1,8 +1,14 @@ +import { getLogger } from "@logtape/logtape"; import type { TObject, TString } from "@sinclair/typebox"; +import { eq } from "drizzle-orm"; import Elysia, { type TSchema, t } from "elysia"; import { auth } from "./auth"; import { updateProgress } from "./controllers/profiles/history"; import { getOrCreateProfile } from "./controllers/profiles/profile"; +import { getVideos } from "./controllers/videos"; +import { videos } from "./db/schema"; + +const logger = getLogger(); const actionMap = { ping: handler({ @@ -16,11 +22,9 @@ const actionMap = { time: t.Integer({ minimum: 0, }), - videoId: t.Nullable( - t.String({ - format: "uuid", - }), - ), + videoId: t.String({ + format: "uuid", + }), entry: t.String(), }), permissions: ["core.read"], @@ -38,7 +42,43 @@ const actionMap = { const ret = await updateProgress(profilePk, [ { ...body, playedDate: null }, ]); + ws.send({ action: "watch", ...ret }); + + if (ret.status !== 201) return; + + const old = ret.history.existing.find((x) => x.videoId === body.videoId); + if (!old) return; + + if ( + (old.percent < 50 && body.percent >= 50) || + (old.percent < 75 && body.percent >= 75) + ) { + const [vid] = await getVideos({ + filter: eq(videos.id, body.videoId), + limit: 1, + relations: ["next"], + languages: ["*"], + userId: ws.data.jwt.sub, + }); + if (!vid) return; + + logger.info("Preparing next video {videoId}", { + videoId: vid.id, + }); + const path = Buffer.from(vid.path, "utf8").toString("base64url"); + await fetch( + new URL( + `/video/${path}/prepare`, + process.env.TRANSCODER_SERVER ?? "http://transcoder:7666", + ), + { + headers: { + authorization: ws.data.headers.authorization!, + }, + }, + ); + } }, }), }; diff --git a/chart/templates/api/deployment.yaml b/chart/templates/api/deployment.yaml index 7a9b5b60..a2bf5c05 100644 --- a/chart/templates/api/deployment.yaml +++ b/chart/templates/api/deployment.yaml @@ -60,6 +60,8 @@ spec: value: {{ .Values.kyoo.address | quote }} - name: AUTH_SERVER value: "http://{{ include "kyoo.auth.fullname" . }}:4568" + - name: TRANSCODER_SERVER + value: "http://{{ include "kyoo.transcoder.fullname" . }}:7666" - name: IMAGES_PATH value: "/images" - name: PGUSER diff --git a/transcoder/src/api/metadata.go b/transcoder/src/api/metadata.go index 91ca67ae..a314fc19 100644 --- a/transcoder/src/api/metadata.go +++ b/transcoder/src/api/metadata.go @@ -1,6 +1,7 @@ package api import ( + "context" "fmt" "io" "mime" @@ -22,6 +23,7 @@ func RegisterMetadataHandlers(e *echo.Group, metadata *src.MetadataService) { h := mhandler{metadata} e.GET("/:path/info", h.GetInfo) + e.GET("/:path/prepare", h.Prepare) e.GET("/:path/subtitle/:name", h.GetSubtitle) e.GET("/:path/attachment/:name", h.GetAttachment) e.GET("/:path/thumbnails.png", h.GetThumbnails) @@ -75,6 +77,47 @@ func (h *mhandler) GetInfo(c *echo.Context) error { return c.JSON(http.StatusOK, ret) } +// @Summary Prepare metadata +// +// @Description Starts metadata preparation in background (info, extract, thumbs, keyframes). +// +// @Tags metadata +// @Param path path string true "Base64 of a video's path" format(base64) example(L3ZpZGVvL2J1YmJsZS5ta3YK) +// +// @Success 202 "Preparation started" +// @Router /:path/prepare [get] +func (h *mhandler) Prepare(c *echo.Context) error { + path, sha, err := getPath(c) + if err != nil { + return err + } + + go func(path string, sha string) { + bgCtx := context.Background() + + info, err := h.metadata.GetMetadata(bgCtx, path, sha) + if err != nil { + fmt.Printf("failed to prepare metadata for %s: %v\n", path, err) + return + } + + // thumb & subs are already extracted in `GetMetadata` + + for _, video := range info.Videos { + if _, err := h.metadata.GetKeyframes(info, true, video.Index); err != nil { + fmt.Printf("failed to extract video keyframes for %s (stream %d): %v\n", path, video.Index, err) + } + } + for _, audio := range info.Audios { + if _, err := h.metadata.GetKeyframes(info, false, audio.Index); err != nil { + fmt.Printf("failed to extract audio keyframes for %s (stream %d): %v\n", path, audio.Index, err) + } + } + }(path, sha) + + return c.NoContent(http.StatusAccepted) +} + // @Summary Get subtitle // // @Description Get a specific subtitle.