From 60d59d7f7bf5255e44f493b1a5b4a6069adc991e Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Fri, 28 Nov 2025 17:25:29 +0100 Subject: [PATCH] Wrap every insert with a trace --- api/src/controllers/seed/images.ts | 130 ++++---- api/src/controllers/seed/insert/collection.ts | 157 ++++----- api/src/controllers/seed/insert/entries.ts | 298 +++++++++--------- api/src/controllers/seed/insert/seasons.ts | 129 ++++---- api/src/controllers/seed/insert/shows.ts | 148 ++++----- api/src/controllers/seed/insert/staff.ts | 94 +++--- api/src/controllers/seed/insert/studios.ts | 119 +++---- api/src/db/index.ts | 38 ++- api/src/otel.ts | 11 +- api/tests/misc/images.test.ts | 2 +- 10 files changed, 576 insertions(+), 550 deletions(-) diff --git a/api/src/controllers/seed/images.ts b/api/src/controllers/seed/images.ts index 743b0a6e..39cf7784 100644 --- a/api/src/controllers/seed/images.ts +++ b/api/src/controllers/seed/images.ts @@ -1,5 +1,5 @@ import path from "node:path"; -import { getCurrentSpan, record, setAttributes } from "@elysiajs/opentelemetry"; +import { getCurrentSpan, setAttributes } from "@elysiajs/opentelemetry"; import { SpanStatusCode } from "@opentelemetry/api"; import { encode } from "blurhash"; import { and, eq, is, lt, type SQL, sql } from "drizzle-orm"; @@ -9,9 +9,10 @@ import type { PoolClient } from "pg"; import sharp from "sharp"; import { db, type Transaction } from "~/db"; import { mqueue } from "~/db/schema/mqueue"; -import type { Image } from "~/models/utils"; -import { getFile } from "~/utils"; import { unnestValues } from "~/db/utils"; +import type { Image } from "~/models/utils"; +import { record } from "~/otel"; +import { getFile } from "~/utils"; export const imageDir = process.env.IMAGES_PATH ?? "/images"; export const defaultBlurhash = "000000"; @@ -77,13 +78,10 @@ export const enqueueOptImage = ( }; }; -export const flushImageQueue = async ( - tx: Transaction, - imgQueue: ImageTask[], - priority: number, -) => { - if (!imgQueue.length) return; - record("enqueue images", async () => { +export const flushImageQueue = record( + "enqueueImages", + async (tx: Transaction, imgQueue: ImageTask[], priority: number) => { + if (!imgQueue.length) return; await tx.insert(mqueue).select( unnestValues( imgQueue.map((x) => ({ kind: "image", message: x, priority })), @@ -91,80 +89,76 @@ export const flushImageQueue = async ( ), ); await tx.execute(sql`notify kyoo_image`); - }); -}; + }, +); -export const processImages = async () => { - return record("download images", async () => { - let running = false; - async function processAll() { - if (running) return; - running = true; +export const processImages = record("processImages", async () => { + let running = false; + async function processAll() { + if (running) return; + running = true; - let found = true; - while (found) { - found = await processOne(); - } - running = false; + let found = true; + while (found) { + found = await processOne(); } + running = false; + } - const client = (await db.$client.connect()) as PoolClient; - client.on("notification", (evt) => { - if (evt.channel !== "kyoo_image") return; - processAll(); - }); - await client.query("listen kyoo_image"); - - // start processing old tasks - await processAll(); - return () => client.release(true); + const client = (await db.$client.connect()) as PoolClient; + client.on("notification", (evt) => { + if (evt.channel !== "kyoo_image") return; + processAll(); }); -}; + await client.query("listen kyoo_image"); -async function processOne() { - return record("download", async () => { - return await db.transaction(async (tx) => { - const [item] = await tx - .select() - .from(mqueue) - .for("update", { skipLocked: true }) - .where(and(eq(mqueue.kind, "image"), lt(mqueue.attempt, 5))) - .orderBy(mqueue.priority, mqueue.attempt, mqueue.createdAt) - .limit(1); + // start processing old tasks + await processAll(); + return () => client.release(true); +}); - if (!item) return false; +const processOne = record("download", async () => { + return await db.transaction(async (tx) => { + const [item] = await tx + .select() + .from(mqueue) + .for("update", { skipLocked: true }) + .where(and(eq(mqueue.kind, "image"), lt(mqueue.attempt, 5))) + .orderBy(mqueue.priority, mqueue.attempt, mqueue.createdAt) + .limit(1); - const img = item.message as ImageTask; - setAttributes({ "item.url": img.url }); - try { - const blurhash = await downloadImage(img.id, img.url); - const ret: Image = { id: img.id, source: img.url, blurhash }; + if (!item) return false; - const table = sql.raw(img.table); - const column = sql.raw(img.column); + const img = item.message as ImageTask; + setAttributes({ "item.url": img.url }); + try { + const blurhash = await downloadImage(img.id, img.url); + const ret: Image = { id: img.id, source: img.url, blurhash }; - await tx.execute(sql` + const table = sql.raw(img.table); + const column = sql.raw(img.column); + + await tx.execute(sql` update ${table} set ${column} = ${ret} where ${column}->'id' = ${sql.raw(`'"${img.id}"'::jsonb`)} `); - await tx.delete(mqueue).where(eq(mqueue.id, item.id)); - } catch (err: any) { - const span = getCurrentSpan(); - if (span) { - span.recordException(err); - span.setStatus({ code: SpanStatusCode.ERROR }); - } - console.error("Failed to download image", img.url, err.message); - await tx - .update(mqueue) - .set({ attempt: sql`${mqueue.attempt}+1` }) - .where(eq(mqueue.id, item.id)); + await tx.delete(mqueue).where(eq(mqueue.id, item.id)); + } catch (err: any) { + const span = getCurrentSpan(); + if (span) { + span.recordException(err); + span.setStatus({ code: SpanStatusCode.ERROR }); } - return true; - }); + console.error("Failed to download image", img.url, err.message); + await tx + .update(mqueue) + .set({ attempt: sql`${mqueue.attempt}+1` }) + .where(eq(mqueue.id, item.id)); + } + return true; }); -} +}); async function downloadImage(id: string, url: string): Promise { const low = await getFile(path.join(imageDir, `${id}.low.jpg`)) diff --git a/api/src/controllers/seed/insert/collection.ts b/api/src/controllers/seed/insert/collection.ts index 063bcc81..b804005e 100644 --- a/api/src/controllers/seed/insert/collection.ts +++ b/api/src/controllers/seed/insert/collection.ts @@ -1,86 +1,93 @@ import { sql } from "drizzle-orm"; import { db } from "~/db"; import { shows, showTranslations } from "~/db/schema"; -import { conflictUpdateAllExcept, unnestValues } from "~/db/utils"; +import { conflictUpdateAllExcept } from "~/db/utils"; import type { SeedCollection } from "~/models/collections"; import type { SeedMovie } from "~/models/movie"; import type { SeedSerie } from "~/models/serie"; +import { record } from "~/otel"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; type ShowTrans = typeof showTranslations.$inferInsert; -export const insertCollection = async ( - collection: SeedCollection | undefined, - show: (({ kind: "movie" } & SeedMovie) | ({ kind: "serie" } & SeedSerie)) & { - nextRefresh: string; +export const insertCollection = record( + "insertCollection", + async ( + collection: SeedCollection | undefined, + show: ( + | ({ kind: "movie" } & SeedMovie) + | ({ kind: "serie" } & SeedSerie) + ) & { + nextRefresh: string; + }, + ) => { + if (!collection) return null; + const { translations, ...col } = collection; + + return await db.transaction(async (tx) => { + const imgQueue: ImageTask[] = []; + const [ret] = await tx + .insert(shows) + .values({ + kind: "collection", + status: "unknown", + startAir: show.kind === "movie" ? show.airDate : show.startAir, + endAir: show.kind === "movie" ? show.airDate : show.endAir, + nextRefresh: show.nextRefresh, + entriesCount: 0, + original: {} as any, + ...col, + }) + .onConflictDoUpdate({ + target: shows.slug, + set: { + ...conflictUpdateAllExcept(shows, [ + "pk", + "id", + "slug", + "createdAt", + "startAir", + "endAir", + ]), + startAir: sql`least(${shows.startAir}, excluded.start_air)`, + endAir: sql`greatest(${shows.endAir}, excluded.end_air)`, + }, + }) + .returning({ pk: shows.pk, id: shows.id, slug: shows.slug }); + + const trans: ShowTrans[] = Object.entries(translations).map( + ([lang, tr]) => ({ + pk: ret.pk, + language: lang, + ...tr, + poster: enqueueOptImage(imgQueue, { + url: tr.poster, + column: showTranslations.poster, + }), + thumbnail: enqueueOptImage(imgQueue, { + url: tr.thumbnail, + column: showTranslations.thumbnail, + }), + logo: enqueueOptImage(imgQueue, { + url: tr.logo, + column: showTranslations.logo, + }), + banner: enqueueOptImage(imgQueue, { + url: tr.banner, + column: showTranslations.banner, + }), + }), + ); + await flushImageQueue(tx, imgQueue, 100); + // we can't unnest values here because show translations contains arrays. + await tx + .insert(showTranslations) + .values(trans) + .onConflictDoUpdate({ + target: [showTranslations.pk, showTranslations.language], + set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]), + }); + return ret; + }); }, -) => { - if (!collection) return null; - const { translations, ...col } = collection; - - return await db.transaction(async (tx) => { - const imgQueue: ImageTask[] = []; - const [ret] = await tx - .insert(shows) - .values({ - kind: "collection", - status: "unknown", - startAir: show.kind === "movie" ? show.airDate : show.startAir, - endAir: show.kind === "movie" ? show.airDate : show.endAir, - nextRefresh: show.nextRefresh, - entriesCount: 0, - original: {} as any, - ...col, - }) - .onConflictDoUpdate({ - target: shows.slug, - set: { - ...conflictUpdateAllExcept(shows, [ - "pk", - "id", - "slug", - "createdAt", - "startAir", - "endAir", - ]), - startAir: sql`least(${shows.startAir}, excluded.start_air)`, - endAir: sql`greatest(${shows.endAir}, excluded.end_air)`, - }, - }) - .returning({ pk: shows.pk, id: shows.id, slug: shows.slug }); - - const trans: ShowTrans[] = Object.entries(translations).map( - ([lang, tr]) => ({ - pk: ret.pk, - language: lang, - ...tr, - poster: enqueueOptImage(imgQueue, { - url: tr.poster, - column: showTranslations.poster, - }), - thumbnail: enqueueOptImage(imgQueue, { - url: tr.thumbnail, - column: showTranslations.thumbnail, - }), - logo: enqueueOptImage(imgQueue, { - url: tr.logo, - column: showTranslations.logo, - }), - banner: enqueueOptImage(imgQueue, { - url: tr.banner, - column: showTranslations.banner, - }), - }), - ); - await flushImageQueue(tx, imgQueue, 100); - // we can't unnest values here because show translations contains arrays. - await tx - .insert(showTranslations) - .values(trans) - .onConflictDoUpdate({ - target: [showTranslations.pk, showTranslations.language], - set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]), - }); - return ret; - }); -}; +); diff --git a/api/src/controllers/seed/insert/entries.ts b/api/src/controllers/seed/insert/entries.ts index 34f8d1f1..afff7c57 100644 --- a/api/src/controllers/seed/insert/entries.ts +++ b/api/src/controllers/seed/insert/entries.ts @@ -8,6 +8,7 @@ import { } from "~/db/schema"; import { conflictUpdateAllExcept, unnest, unnestValues } from "~/db/utils"; import type { SeedEntry as SEntry, SeedExtra as SExtra } from "~/models/entry"; +import { record } from "~/otel"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; import { guessNextRefresh } from "../refresh"; import { updateAvailableCount, updateAvailableSince } from "./shows"; @@ -42,161 +43,164 @@ const generateSlug = ( } }; -export const insertEntries = async ( - show: { pk: number; slug: string; kind: "movie" | "serie" | "collection" }, - items: (SeedEntry | SeedExtra)[], - onlyExtras = false, -) => { - if (!items.length) return []; +export const insertEntries = record( + "insertEntries", + async ( + show: { pk: number; slug: string; kind: "movie" | "serie" | "collection" }, + items: (SeedEntry | SeedExtra)[], + onlyExtras = false, + ) => { + if (!items.length) return []; - const retEntries = await db.transaction(async (tx) => { - const imgQueue: ImageTask[] = []; - const vals: EntryI[] = items.map((seed) => { - const { translations, videos, video, ...entry } = seed; - return { - ...entry, - showPk: show.pk, - slug: generateSlug(show.slug, seed), - thumbnail: enqueueOptImage(imgQueue, { - url: seed.thumbnail, - column: entries.thumbnail, - }), - nextRefresh: - entry.kind !== "extra" - ? guessNextRefresh(entry.airDate ?? new Date()) - : guessNextRefresh(new Date()), - episodeNumber: - entry.kind === "episode" - ? entry.episodeNumber - : entry.kind === "special" - ? entry.number - : undefined, - }; - }); - const ret = await tx - .insert(entries) - .select(unnestValues(vals, entries)) - .onConflictDoUpdate({ - target: entries.slug, - set: conflictUpdateAllExcept(entries, [ - "pk", - "showPk", - "id", - "slug", - "createdAt", - ]), - }) - .returning({ pk: entries.pk, id: entries.id, slug: entries.slug }); - - const trans: EntryTransI[] = items.flatMap((seed, i) => { - if (seed.kind === "extra") { - return [ - { - pk: ret[i].pk, - // yeah we hardcode the language to extra because if we want to support - // translations one day it won't be awkward - language: "extra", - name: seed.name, - description: null, - poster: undefined, - }, - ]; - } - - return Object.entries(seed.translations).map(([lang, tr]) => ({ - // assumes ret is ordered like items. - pk: ret[i].pk, - language: lang, - ...tr, - poster: - seed.kind === "movie" - ? enqueueOptImage(imgQueue, { - url: (tr as any).poster, - column: entryTranslations.poster, - }) - : undefined, - })); - }); - await flushImageQueue(tx, imgQueue, 0); - await tx - .insert(entryTranslations) - .select(unnestValues(trans, entryTranslations)) - .onConflictDoUpdate({ - target: [entryTranslations.pk, entryTranslations.language], - set: conflictUpdateAllExcept(entryTranslations, ["pk", "language"]), + const retEntries = await db.transaction(async (tx) => { + const imgQueue: ImageTask[] = []; + const vals: EntryI[] = items.map((seed) => { + const { translations, videos, video, ...entry } = seed; + return { + ...entry, + showPk: show.pk, + slug: generateSlug(show.slug, seed), + thumbnail: enqueueOptImage(imgQueue, { + url: seed.thumbnail, + column: entries.thumbnail, + }), + nextRefresh: + entry.kind !== "extra" + ? guessNextRefresh(entry.airDate ?? new Date()) + : guessNextRefresh(new Date()), + episodeNumber: + entry.kind === "episode" + ? entry.episodeNumber + : entry.kind === "special" + ? entry.number + : undefined, + }; }); + const ret = await tx + .insert(entries) + .select(unnestValues(vals, entries)) + .onConflictDoUpdate({ + target: entries.slug, + set: conflictUpdateAllExcept(entries, [ + "pk", + "showPk", + "id", + "slug", + "createdAt", + ]), + }) + .returning({ pk: entries.pk, id: entries.id, slug: entries.slug }); - return ret; - }); + const trans: EntryTransI[] = items.flatMap((seed, i) => { + if (seed.kind === "extra") { + return [ + { + pk: ret[i].pk, + // yeah we hardcode the language to extra because if we want to support + // translations one day it won't be awkward + language: "extra", + name: seed.name, + description: null, + poster: undefined, + }, + ]; + } - const vids = items.flatMap((seed, i) => { - if (seed.kind === "extra") { - return { - videoId: seed.video, + return Object.entries(seed.translations).map(([lang, tr]) => ({ + // assumes ret is ordered like items. + pk: ret[i].pk, + language: lang, + ...tr, + poster: + seed.kind === "movie" + ? enqueueOptImage(imgQueue, { + url: (tr as any).poster, + column: entryTranslations.poster, + }) + : undefined, + })); + }); + await flushImageQueue(tx, imgQueue, 0); + await tx + .insert(entryTranslations) + .select(unnestValues(trans, entryTranslations)) + .onConflictDoUpdate({ + target: [entryTranslations.pk, entryTranslations.language], + set: conflictUpdateAllExcept(entryTranslations, ["pk", "language"]), + }); + + return ret; + }); + + const vids = items.flatMap((seed, i) => { + if (seed.kind === "extra") { + return { + videoId: seed.video, + entryPk: retEntries[i].pk, + entrySlug: retEntries[i].slug, + needRendering: false, + }; + } + if (!seed.videos) return []; + return seed.videos.map((x, j) => ({ + videoId: x, entryPk: retEntries[i].pk, entrySlug: retEntries[i].slug, - needRendering: false, - }; + // The first video should not have a rendering. + needRendering: j !== 0 && seed.videos!.length > 1, + })); + }); + + if (vids.length === 0) { + // we have not added videos but we need to update the `entriesCount` + if (show.kind === "serie" && !onlyExtras) + await updateAvailableCount(db, [show.pk], true); + return retEntries.map((x) => ({ id: x.id, slug: x.slug, videos: [] })); } - if (!seed.videos) return []; - return seed.videos.map((x, j) => ({ - videoId: x, - entryPk: retEntries[i].pk, - entrySlug: retEntries[i].slug, - // The first video should not have a rendering. - needRendering: j !== 0 && seed.videos!.length > 1, + + const retVideos = await db.transaction(async (tx) => { + const ret = await tx + .insert(entryVideoJoin) + .select( + db + .select({ + entryPk: sql`vids."entryPk"`.as("entry"), + videoPk: videos.pk, + slug: computeVideoSlug( + sql`vids."entrySlug"`, + sql`vids."needRendering"`, + ), + }) + .from( + unnest(vids, "vids", { + entryPk: "integer", + entrySlug: "varchar(255)", + needRendering: "boolean", + videoId: "uuid", + }), + ) + .innerJoin(videos, eq(videos.id, sql`vids."videoId"`)), + ) + .onConflictDoNothing() + .returning({ + slug: entryVideoJoin.slug, + entryPk: entryVideoJoin.entryPk, + }); + + if (!onlyExtras) + await updateAvailableCount(tx, [show.pk], show.kind === "serie"); + + await updateAvailableSince(tx, [...new Set(vids.map((x) => x.entryPk))]); + return ret; + }); + + return retEntries.map((entry) => ({ + id: entry.id, + slug: entry.slug, + videos: retVideos.filter((x) => x.entryPk === entry.pk), })); - }); - - if (vids.length === 0) { - // we have not added videos but we need to update the `entriesCount` - if (show.kind === "serie" && !onlyExtras) - await updateAvailableCount(db, [show.pk], true); - return retEntries.map((x) => ({ id: x.id, slug: x.slug, videos: [] })); - } - - const retVideos = await db.transaction(async (tx) => { - const ret = await tx - .insert(entryVideoJoin) - .select( - db - .select({ - entryPk: sql`vids."entryPk"`.as("entry"), - videoPk: videos.pk, - slug: computeVideoSlug( - sql`vids."entrySlug"`, - sql`vids."needRendering"`, - ), - }) - .from( - unnest(vids, "vids", { - entryPk: "integer", - entrySlug: "varchar(255)", - needRendering: "boolean", - videoId: "uuid", - }), - ) - .innerJoin(videos, eq(videos.id, sql`vids."videoId"`)), - ) - .onConflictDoNothing() - .returning({ - slug: entryVideoJoin.slug, - entryPk: entryVideoJoin.entryPk, - }); - - if (!onlyExtras) - await updateAvailableCount(tx, [show.pk], show.kind === "serie"); - - await updateAvailableSince(tx, [...new Set(vids.map((x) => x.entryPk))]); - return ret; - }); - - return retEntries.map((entry) => ({ - id: entry.id, - slug: entry.slug, - videos: retVideos.filter((x) => x.entryPk === entry.pk), - })); -}; + }, +); export function computeVideoSlug(entrySlug: SQL | Column, needsRendering: SQL) { return sql` diff --git a/api/src/controllers/seed/insert/seasons.ts b/api/src/controllers/seed/insert/seasons.ts index 8caf947c..00254c2b 100644 --- a/api/src/controllers/seed/insert/seasons.ts +++ b/api/src/controllers/seed/insert/seasons.ts @@ -2,76 +2,77 @@ import { db } from "~/db"; import { seasons, seasonTranslations } from "~/db/schema"; import { conflictUpdateAllExcept, unnestValues } from "~/db/utils"; import type { SeedSeason } from "~/models/season"; +import { record } from "~/otel"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; import { guessNextRefresh } from "../refresh"; type SeasonI = typeof seasons.$inferInsert; type SeasonTransI = typeof seasonTranslations.$inferInsert; -export const insertSeasons = async ( - show: { pk: number; slug: string }, - items: SeedSeason[], -) => { - if (!items.length) return []; +export const insertSeasons = record( + "insertSeasons", + async (show: { pk: number; slug: string }, items: SeedSeason[]) => { + if (!items.length) return []; - return db.transaction(async (tx) => { - const imgQueue: ImageTask[] = []; - const vals: SeasonI[] = items.map((x) => { - const { translations, ...season } = x; - return { - ...season, - showPk: show.pk, - slug: - season.seasonNumber === 0 - ? `${show.slug}-specials` - : `${show.slug}-s${season.seasonNumber}`, - nextRefresh: guessNextRefresh(season.startAir ?? new Date()), - }; - }); - const ret = await tx - .insert(seasons) - .select(unnestValues(vals, seasons)) - .onConflictDoUpdate({ - target: seasons.slug, - set: conflictUpdateAllExcept(seasons, [ - "pk", - "showPk", - "id", - "slug", - "createdAt", - ]), - }) - .returning({ pk: seasons.pk, id: seasons.id, slug: seasons.slug }); - - const trans: SeasonTransI[] = items.flatMap((seed, i) => - Object.entries(seed.translations).map(([lang, tr]) => ({ - // assumes ret is ordered like items. - pk: ret[i].pk, - language: lang, - ...tr, - poster: enqueueOptImage(imgQueue, { - url: tr.poster, - column: seasonTranslations.poster, - }), - thumbnail: enqueueOptImage(imgQueue, { - url: tr.thumbnail, - column: seasonTranslations.thumbnail, - }), - banner: enqueueOptImage(imgQueue, { - url: tr.banner, - column: seasonTranslations.banner, - }), - })), - ); - await flushImageQueue(tx, imgQueue, -10); - await tx - .insert(seasonTranslations) - .select(unnestValues(trans, seasonTranslations)) - .onConflictDoUpdate({ - target: [seasonTranslations.pk, seasonTranslations.language], - set: conflictUpdateAllExcept(seasonTranslations, ["pk", "language"]), + return db.transaction(async (tx) => { + const imgQueue: ImageTask[] = []; + const vals: SeasonI[] = items.map((x) => { + const { translations, ...season } = x; + return { + ...season, + showPk: show.pk, + slug: + season.seasonNumber === 0 + ? `${show.slug}-specials` + : `${show.slug}-s${season.seasonNumber}`, + nextRefresh: guessNextRefresh(season.startAir ?? new Date()), + }; }); + const ret = await tx + .insert(seasons) + .select(unnestValues(vals, seasons)) + .onConflictDoUpdate({ + target: seasons.slug, + set: conflictUpdateAllExcept(seasons, [ + "pk", + "showPk", + "id", + "slug", + "createdAt", + ]), + }) + .returning({ pk: seasons.pk, id: seasons.id, slug: seasons.slug }); - return ret; - }); -}; + const trans: SeasonTransI[] = items.flatMap((seed, i) => + Object.entries(seed.translations).map(([lang, tr]) => ({ + // assumes ret is ordered like items. + pk: ret[i].pk, + language: lang, + ...tr, + poster: enqueueOptImage(imgQueue, { + url: tr.poster, + column: seasonTranslations.poster, + }), + thumbnail: enqueueOptImage(imgQueue, { + url: tr.thumbnail, + column: seasonTranslations.thumbnail, + }), + banner: enqueueOptImage(imgQueue, { + url: tr.banner, + column: seasonTranslations.banner, + }), + })), + ); + await flushImageQueue(tx, imgQueue, -10); + await tx + .insert(seasonTranslations) + .select(unnestValues(trans, seasonTranslations)) + .onConflictDoUpdate({ + target: [seasonTranslations.pk, seasonTranslations.language], + set: conflictUpdateAllExcept(seasonTranslations, ["pk", "language"]), + }); + + return ret; + }); + }, +); diff --git a/api/src/controllers/seed/insert/shows.ts b/api/src/controllers/seed/insert/shows.ts index cf59f9a0..a1fc5d19 100644 --- a/api/src/controllers/seed/insert/shows.ts +++ b/api/src/controllers/seed/insert/shows.ts @@ -16,94 +16,98 @@ import { shows, showTranslations, } from "~/db/schema"; -import { conflictUpdateAllExcept, sqlarr, unnestValues } from "~/db/utils"; +import { conflictUpdateAllExcept, sqlarr } from "~/db/utils"; import type { SeedCollection } from "~/models/collections"; import type { SeedMovie } from "~/models/movie"; import type { SeedSerie } from "~/models/serie"; import type { Original } from "~/models/utils"; +import { record } from "~/otel"; import { getYear } from "~/utils"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; type Show = typeof shows.$inferInsert; type ShowTrans = typeof showTranslations.$inferInsert; -export const insertShow = async ( - show: Omit, - original: Original & { - poster: string | null; - thumbnail: string | null; - banner: string | null; - logo: string | null; - }, - translations: - | SeedMovie["translations"] - | SeedSerie["translations"] - | SeedCollection["translations"], -) => { - return await db.transaction(async (tx) => { - const imgQueue: ImageTask[] = []; - const orig = { - ...original, - poster: enqueueOptImage(imgQueue, { - url: original.poster, - table: shows, - column: sql`${shows.original}['poster']`, - }), - thumbnail: enqueueOptImage(imgQueue, { - url: original.thumbnail, - table: shows, - column: sql`${shows.original}['thumbnail']`, - }), - banner: enqueueOptImage(imgQueue, { - url: original.banner, - table: shows, - column: sql`${shows.original}['banner']`, - }), - logo: enqueueOptImage(imgQueue, { - url: original.logo, - table: shows, - column: sql`${shows.original}['logo']`, - }), - }; - const ret = await insertBaseShow(tx, { ...show, original: orig }); - if ("status" in ret) return ret; - - const trans: ShowTrans[] = Object.entries(translations).map( - ([lang, tr]) => ({ - pk: ret.pk, - language: lang, - ...tr, - latinName: tr.latinName ?? null, +export const insertShow = record( + "insertShow", + async ( + show: Omit, + original: Original & { + poster: string | null; + thumbnail: string | null; + banner: string | null; + logo: string | null; + }, + translations: + | SeedMovie["translations"] + | SeedSerie["translations"] + | SeedCollection["translations"], + ) => { + return await db.transaction(async (tx) => { + const imgQueue: ImageTask[] = []; + const orig = { + ...original, poster: enqueueOptImage(imgQueue, { - url: tr.poster, - column: showTranslations.poster, + url: original.poster, + table: shows, + column: sql`${shows.original}['poster']`, }), thumbnail: enqueueOptImage(imgQueue, { - url: tr.thumbnail, - column: showTranslations.thumbnail, - }), - logo: enqueueOptImage(imgQueue, { - url: tr.logo, - column: showTranslations.logo, + url: original.thumbnail, + table: shows, + column: sql`${shows.original}['thumbnail']`, }), banner: enqueueOptImage(imgQueue, { - url: tr.banner, - column: showTranslations.banner, + url: original.banner, + table: shows, + column: sql`${shows.original}['banner']`, }), - }), - ); - await flushImageQueue(tx, imgQueue, 200); - // we can't unnest values here because show translations contains arrays. - await tx - .insert(showTranslations) - .values(trans) - .onConflictDoUpdate({ - target: [showTranslations.pk, showTranslations.language], - set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]), - }); - return ret; - }); -}; + logo: enqueueOptImage(imgQueue, { + url: original.logo, + table: shows, + column: sql`${shows.original}['logo']`, + }), + }; + const ret = await insertBaseShow(tx, { ...show, original: orig }); + if ("status" in ret) return ret; + + const trans: ShowTrans[] = Object.entries(translations).map( + ([lang, tr]) => ({ + pk: ret.pk, + language: lang, + ...tr, + latinName: tr.latinName ?? null, + poster: enqueueOptImage(imgQueue, { + url: tr.poster, + column: showTranslations.poster, + }), + thumbnail: enqueueOptImage(imgQueue, { + url: tr.thumbnail, + column: showTranslations.thumbnail, + }), + logo: enqueueOptImage(imgQueue, { + url: tr.logo, + column: showTranslations.logo, + }), + banner: enqueueOptImage(imgQueue, { + url: tr.banner, + column: showTranslations.banner, + }), + }), + ); + await flushImageQueue(tx, imgQueue, 200); + // we can't unnest values here because show translations contains arrays. + await tx + .insert(showTranslations) + .values(trans) + .onConflictDoUpdate({ + target: [showTranslations.pk, showTranslations.language], + set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]), + }); + return ret; + }); + }, +); async function insertBaseShow(tx: Transaction, show: Show) { function insert() { diff --git a/api/src/controllers/seed/insert/staff.ts b/api/src/controllers/seed/insert/staff.ts index 8ebb6be9..837bf74b 100644 --- a/api/src/controllers/seed/insert/staff.ts +++ b/api/src/controllers/seed/insert/staff.ts @@ -3,55 +3,61 @@ import { db } from "~/db"; import { roles, staff } from "~/db/schema"; import { conflictUpdateAllExcept, unnestValues } from "~/db/utils"; import type { SeedStaff } from "~/models/staff"; +import { record } from "~/otel"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; -export const insertStaff = async ( - seed: SeedStaff[] | undefined, - showPk: number, -) => { - if (!seed?.length) return []; +export const insertStaff = record( + "insertStaff", + async (seed: SeedStaff[] | undefined, showPk: number) => { + if (!seed?.length) return []; - return await db.transaction(async (tx) => { - const imgQueue: ImageTask[] = []; - const people = seed.map((x) => ({ - ...x.staff, - image: enqueueOptImage(imgQueue, { - url: x.staff.image, - column: staff.image, - }), - })); - const ret = await tx - .insert(staff) - .select(unnestValues(people, staff)) - .onConflictDoUpdate({ - target: staff.slug, - set: conflictUpdateAllExcept(staff, ["pk", "id", "slug", "createdAt"]), - }) - .returning({ pk: staff.pk, id: staff.id, slug: staff.slug }); - - const rval = seed.map((x, i) => ({ - showPk, - staffPk: ret[i].pk, - kind: x.kind, - order: i, - character: { - ...x.character, + return await db.transaction(async (tx) => { + const imgQueue: ImageTask[] = []; + const people = seed.map((x) => ({ + ...x.staff, image: enqueueOptImage(imgQueue, { - url: x.character.image, - table: roles, - column: sql`${roles.character}['image']`, + url: x.staff.image, + column: staff.image, }), - }, - })); + })); + const ret = await tx + .insert(staff) + .select(unnestValues(people, staff)) + .onConflictDoUpdate({ + target: staff.slug, + set: conflictUpdateAllExcept(staff, [ + "pk", + "id", + "slug", + "createdAt", + ]), + }) + .returning({ pk: staff.pk, id: staff.id, slug: staff.slug }); - await flushImageQueue(tx, imgQueue, -200); + const rval = seed.map((x, i) => ({ + showPk, + staffPk: ret[i].pk, + kind: x.kind, + order: i, + character: { + ...x.character, + image: enqueueOptImage(imgQueue, { + url: x.character.image, + table: roles, + column: sql`${roles.character}['image']`, + }), + }, + })); - // always replace all roles. this is because: - // - we want `order` to stay in sync (& without duplicates) - // - we don't have ways to identify a role so we can't onConflict - await tx.delete(roles).where(eq(roles.showPk, showPk)); - await tx.insert(roles).select(unnestValues(rval, roles)); + await flushImageQueue(tx, imgQueue, -200); - return ret; - }); -}; + // always replace all roles. this is because: + // - we want `order` to stay in sync (& without duplicates) + // - we don't have ways to identify a role so we can't onConflict + await tx.delete(roles).where(eq(roles.showPk, showPk)); + await tx.insert(roles).select(unnestValues(rval, roles)); + + return ret; + }); + }, +); diff --git a/api/src/controllers/seed/insert/studios.ts b/api/src/controllers/seed/insert/studios.ts index f15c4eb8..2906aeb8 100644 --- a/api/src/controllers/seed/insert/studios.ts +++ b/api/src/controllers/seed/insert/studios.ts @@ -3,69 +3,72 @@ import { db } from "~/db"; import { showStudioJoin, studios, studioTranslations } from "~/db/schema"; import { conflictUpdateAllExcept, sqlarr, unnestValues } from "~/db/utils"; import type { SeedStudio } from "~/models/studio"; +import { record } from "~/otel"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; type StudioI = typeof studios.$inferInsert; type StudioTransI = typeof studioTranslations.$inferInsert; -export const insertStudios = async ( - seed: SeedStudio[] | undefined, - showPk: number, -) => { - if (!seed?.length) return []; +export const insertStudios = record( + "insertStudios", + async (seed: SeedStudio[] | undefined, showPk: number) => { + if (!seed?.length) return []; - return await db.transaction(async (tx) => { - const vals: StudioI[] = seed.map((x) => { - const { translations, ...item } = x; - return item; - }); - - const ret = await tx - .insert(studios) - .select(unnestValues(vals, studios)) - .onConflictDoUpdate({ - target: studios.slug, - set: conflictUpdateAllExcept(studios, [ - "pk", - "id", - "slug", - "createdAt", - ]), - }) - .returning({ pk: studios.pk, id: studios.id, slug: studios.slug }); - - const imgQueue: ImageTask[] = []; - const trans: StudioTransI[] = seed.flatMap((x, i) => - Object.entries(x.translations).map(([lang, tr]) => ({ - pk: ret[i].pk, - language: lang, - name: tr.name, - logo: enqueueOptImage(imgQueue, { - url: tr.logo, - column: studioTranslations.logo, - }), - })), - ); - await flushImageQueue(tx, imgQueue, -100); - await tx - .insert(studioTranslations) - .select(unnestValues(trans, studioTranslations)) - .onConflictDoUpdate({ - target: [studioTranslations.pk, studioTranslations.language], - set: conflictUpdateAllExcept(studioTranslations, ["pk", "language"]), + return await db.transaction(async (tx) => { + const vals: StudioI[] = seed.map((x) => { + const { translations, ...item } = x; + return item; }); - await tx - .insert(showStudioJoin) - .select( - db - .select({ - showPk: sql`${showPk}`.as("showPk"), - studioPk: sql`v."studioPk"`.as("studioPk"), - }) - .from(sql`unnest(${sqlarr(ret.map((x) => x.pk))}::integer[]) as v("studioPk")`), - ) - .onConflictDoNothing(); - return ret; - }); -}; + const ret = await tx + .insert(studios) + .select(unnestValues(vals, studios)) + .onConflictDoUpdate({ + target: studios.slug, + set: conflictUpdateAllExcept(studios, [ + "pk", + "id", + "slug", + "createdAt", + ]), + }) + .returning({ pk: studios.pk, id: studios.id, slug: studios.slug }); + + const imgQueue: ImageTask[] = []; + const trans: StudioTransI[] = seed.flatMap((x, i) => + Object.entries(x.translations).map(([lang, tr]) => ({ + pk: ret[i].pk, + language: lang, + name: tr.name, + logo: enqueueOptImage(imgQueue, { + url: tr.logo, + column: studioTranslations.logo, + }), + })), + ); + await flushImageQueue(tx, imgQueue, -100); + await tx + .insert(studioTranslations) + .select(unnestValues(trans, studioTranslations)) + .onConflictDoUpdate({ + target: [studioTranslations.pk, studioTranslations.language], + set: conflictUpdateAllExcept(studioTranslations, ["pk", "language"]), + }); + + await tx + .insert(showStudioJoin) + .select( + db + .select({ + showPk: sql`${showPk}`.as("showPk"), + studioPk: sql`v."studioPk"`.as("studioPk"), + }) + .from( + sql`unnest(${sqlarr(ret.map((x) => x.pk))}::integer[]) as v("studioPk")`, + ), + ) + .onConflictDoNothing(); + return ret; + }); + }, +); diff --git a/api/src/db/index.ts b/api/src/db/index.ts index c7132116..95fc853e 100644 --- a/api/src/db/index.ts +++ b/api/src/db/index.ts @@ -1,12 +1,12 @@ import os from "node:os"; import path from "node:path"; import tls, { type ConnectionOptions } from "node:tls"; -import { record } from "@elysiajs/opentelemetry"; import { instrumentDrizzleClient } from "@kubiks/otel-drizzle"; import { sql } from "drizzle-orm"; import { drizzle } from "drizzle-orm/node-postgres"; import { migrate as migrateDb } from "drizzle-orm/node-postgres/migrator"; import type { PoolConfig } from "pg"; +import { record } from "~/otel"; import * as schema from "./schema"; const config: PoolConfig = { @@ -122,26 +122,24 @@ instrumentDrizzleClient(db, { maxQueryTextLength: 100_000_000, }); -export const migrate = async () => { - return record("migrate", async () => { - try { - await db.execute( - sql.raw(` - create extension if not exists pg_trgm; - set pg_trgm.word_similarity_threshold = 0.4; - alter database "${postgresConfig.database}" set pg_trgm.word_similarity_threshold = 0.4; - `), - ); - } catch (err: any) { - console.error("Error while updating pg_trgm", err.message); - } - await migrateDb(db, { - migrationsSchema: "kyoo", - migrationsFolder: "./drizzle", - }); - console.log(`Database ${postgresConfig.database} migrated!`); +export const migrate = record("migrate", async () => { + try { + await db.execute( + sql.raw(` + create extension if not exists pg_trgm; + set pg_trgm.word_similarity_threshold = 0.4; + alter database "${postgresConfig.database}" set pg_trgm.word_similarity_threshold = 0.4; + `), + ); + } catch (err: any) { + console.error("Error while updating pg_trgm", err.message); + } + await migrateDb(db, { + migrationsSchema: "kyoo", + migrationsFolder: "./drizzle", }); -}; + console.log(`Database ${postgresConfig.database} migrated!`); +}); export type Transaction = | typeof db diff --git a/api/src/otel.ts b/api/src/otel.ts index 1875b823..7dde8c37 100644 --- a/api/src/otel.ts +++ b/api/src/otel.ts @@ -1,4 +1,4 @@ -import { opentelemetry } from "@elysiajs/opentelemetry"; +import { record as elysiaRecord, opentelemetry } from "@elysiajs/opentelemetry"; import { OTLPMetricExporter as GrpcMetricExporter } from "@opentelemetry/exporter-metrics-otlp-grpc"; import { OTLPMetricExporter as HttpMetricExporter } from "@opentelemetry/exporter-metrics-otlp-proto"; import { OTLPTraceExporter as GrpcTraceExporter } from "@opentelemetry/exporter-trace-otlp-grpc"; @@ -32,3 +32,12 @@ export const otel = new Elysia() }), ) .as("global"); + +export function record any>( + spanName: string, + fn: T, +): T { + const wrapped = (...args: Parameters) => + elysiaRecord(spanName, () => fn(...args)); + return wrapped as T; +} diff --git a/api/tests/misc/images.test.ts b/api/tests/misc/images.test.ts index 1a81d224..e3d73c68 100644 --- a/api/tests/misc/images.test.ts +++ b/api/tests/misc/images.test.ts @@ -33,7 +33,7 @@ describe("images", () => { it("Download 404 image", async () => { await db.delete(mqueue); - const url404 = "https://mockhttp.org/status/404"; + const url404 = "https://mockhttp.org/status/404"; const [ret, body] = await createMovie({ ...dune, translations: {