From d822463fe052dd594007bf53708cc74226403510 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Mon, 24 Nov 2025 17:31:46 +0100 Subject: [PATCH 1/6] Add a trace for api migrations --- api/src/db/index.ts | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/api/src/db/index.ts b/api/src/db/index.ts index adbe2eb5..cc4a8901 100644 --- a/api/src/db/index.ts +++ b/api/src/db/index.ts @@ -7,6 +7,7 @@ import { drizzle } from "drizzle-orm/node-postgres"; import { migrate as migrateDb } from "drizzle-orm/node-postgres/migrator"; import type { PoolConfig } from "pg"; import * as schema from "./schema"; +import { record } from "@elysiajs/opentelemetry"; const config: PoolConfig = { connectionString: process.env.POSTGRES_URL, @@ -112,7 +113,6 @@ const postgresConfig = await parseSslConfig(); // use this when using drizzle-kit since it can't parse await statements // const postgresConfig = config; -console.log("Connecting to postgres with config", postgresConfig); export const db = drizzle({ schema, connection: postgresConfig, @@ -123,22 +123,24 @@ instrumentDrizzleClient(db, { }); export const migrate = async () => { - try { - await db.execute( - sql.raw(` - create extension if not exists pg_trgm; - set pg_trgm.word_similarity_threshold = 0.4; - alter database "${postgresConfig.database}" set pg_trgm.word_similarity_threshold = 0.4; - `), - ); - } catch (err: any) { - console.error("Error while updating pg_trgm", err.message); - } - await migrateDb(db, { - migrationsSchema: "kyoo", - migrationsFolder: "./drizzle", + return record("migrate", async () => { + try { + await db.execute( + sql.raw(` + create extension if not exists pg_trgm; + set pg_trgm.word_similarity_threshold = 0.4; + alter database "${postgresConfig.database}" set pg_trgm.word_similarity_threshold = 0.4; + `), + ); + } catch (err: any) { + console.error("Error while updating pg_trgm", err.message); + } + await migrateDb(db, { + migrationsSchema: "kyoo", + migrationsFolder: "./drizzle", + }); + console.log(`Database ${postgresConfig.database} migrated!`); }); - console.log(`Database ${postgresConfig.database} migrated!`); }; export type Transaction = From 5f8ddd435adeee6840e3f3c77d5d7265b93ceb9e Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Tue, 25 Nov 2025 20:42:45 +0100 Subject: [PATCH 2/6] Use unnest for entries --- api/src/controllers/seed/insert/entries.ts | 6 +- api/src/db/index.ts | 2 +- api/src/db/utils.ts | 86 +++++++++++++++++++++- api/tests/misc/images.test.ts | 2 + api/tests/setup.ts | 1 + 5 files changed, 91 insertions(+), 6 deletions(-) diff --git a/api/src/controllers/seed/insert/entries.ts b/api/src/controllers/seed/insert/entries.ts index c53590c4..fb5c9e60 100644 --- a/api/src/controllers/seed/insert/entries.ts +++ b/api/src/controllers/seed/insert/entries.ts @@ -6,7 +6,7 @@ import { entryVideoJoin, videos, } from "~/db/schema"; -import { conflictUpdateAllExcept, values } from "~/db/utils"; +import { conflictUpdateAllExcept, unnestValues, values } from "~/db/utils"; import type { SeedEntry as SEntry, SeedExtra as SExtra } from "~/models/entry"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; import { guessNextRefresh } from "../refresh"; @@ -75,7 +75,7 @@ export const insertEntries = async ( }); const ret = await tx .insert(entries) - .values(vals) + .select(unnestValues(vals, entries)) .onConflictDoUpdate({ target: entries.slug, set: conflictUpdateAllExcept(entries, [ @@ -120,7 +120,7 @@ export const insertEntries = async ( await flushImageQueue(tx, imgQueue, 0); await tx .insert(entryTranslations) - .values(trans) + .select(unnestValues(trans, entryTranslations)) .onConflictDoUpdate({ target: [entryTranslations.pk, entryTranslations.language], set: conflictUpdateAllExcept(entryTranslations, ["pk", "language"]), diff --git a/api/src/db/index.ts b/api/src/db/index.ts index cc4a8901..c7132116 100644 --- a/api/src/db/index.ts +++ b/api/src/db/index.ts @@ -1,13 +1,13 @@ import os from "node:os"; import path from "node:path"; import tls, { type ConnectionOptions } from "node:tls"; +import { record } from "@elysiajs/opentelemetry"; import { instrumentDrizzleClient } from "@kubiks/otel-drizzle"; import { sql } from "drizzle-orm"; import { drizzle } from "drizzle-orm/node-postgres"; import { migrate as migrateDb } from "drizzle-orm/node-postgres/migrator"; import type { PoolConfig } from "pg"; import * as schema from "./schema"; -import { record } from "@elysiajs/opentelemetry"; const config: PoolConfig = { connectionString: process.env.POSTGRES_URL, diff --git a/api/src/db/utils.ts b/api/src/db/utils.ts index bab0eedf..e76423ed 100644 --- a/api/src/db/utils.ts +++ b/api/src/db/utils.ts @@ -8,12 +8,17 @@ import { type Subquery, sql, Table, + type TableConfig, View, ViewBaseConfig, } from "drizzle-orm"; import type { CasingCache } from "drizzle-orm/casing"; import type { AnyMySqlSelect } from "drizzle-orm/mysql-core"; -import type { AnyPgSelect, SelectedFieldsFlat } from "drizzle-orm/pg-core"; +import type { + AnyPgSelect, + PgTableWithColumns, + SelectedFieldsFlat, +} from "drizzle-orm/pg-core"; import type { AnySQLiteSelect } from "drizzle-orm/sqlite-core"; import type { WithSubquery } from "drizzle-orm/subquery"; import { db } from "./index"; @@ -70,7 +75,15 @@ export function conflictUpdateAllExcept< // drizzle is bugged and doesn't allow js arrays to be used in raw sql. export function sqlarr(array: unknown[]) { - return `{${array.map((item) => `"${item}"`).join(",")}}`; + return `{${array + .map((item) => + !item || item === "null" + ? "null" + : typeof item === "object" + ? `"${JSON.stringify(item).replaceAll('"', '\\"')}"` + : `"${item}"`, + ) + .join(", ")}}`; } // See https://github.com/drizzle-team/drizzle-orm/issues/4044 @@ -103,6 +116,75 @@ export function values( }; } +/* goal: + * unnestValues([{a: 1, b: 2}, {a: 3, b: 4}], tbl) + * + * ```sql + * select a, b, now() as updated_at from unnest($1::integer[], $2::integer[]); + * ``` + * params: + * $1: [1, 2] + * $2: [3, 4] + * + * select + */ +export const unnestValues = < + T extends Record, + C extends TableConfig = never, +>( + values: T[], + typeInfo: PgTableWithColumns, +) => { + if (values[0] === undefined) + throw new Error("Invalid values, expecting at least one items"); + const columns = getTableColumns(typeInfo); + const keys = Object.keys(values[0]).filter((x) => x in columns); + // @ts-expect-error: drizzle internal + const casing = db.dialect.casing as CasingCache; + const dbNames = Object.fromEntries( + Object.entries(columns).map(([k, v]) => [k, casing.getColumnCasing(v)]), + ); + const vals = values.reduce( + (acc, cur, i) => { + for (const k of keys) { + if (k in cur) acc[k].push(cur[k]); + else acc[k].push(null); + } + for (const k of Object.keys(cur)) { + if (k in acc) continue; + if (!(k in columns)) continue; + keys.push(k); + acc[k] = new Array(i).fill(null); + acc[k].push(cur[k]) + } + return acc; + }, + Object.fromEntries(keys.map((x) => [x, [] as unknown[]])), + ); + const computed = Object.entries(columns) + .filter(([k, v]) => (v.defaultFn || v.onUpdateFn) && !keys.includes(k)) + .map(([k]) => k); + return db + .select( + Object.fromEntries([ + ...keys.map((x) => [x, sql.raw(`"${dbNames[x]}"`)]), + ...computed.map((x) => [ + x, + (columns[x].defaultFn?.() ?? columns[x].onUpdateFn!()).as(dbNames[x]), + ]), + ]), + ) + .from( + sql`unnest(${sql.join( + keys.map( + (k) => + sql`${sqlarr(vals[k])}${sql.raw(`::${columns[k].getSQLType()}[]`)}`, + ), + sql.raw(", "), + )}) as v(${sql.raw(keys.map((x) => `"${dbNames[x]}"`).join(", "))})`, + ); +}; + export const coalesce = (val: SQL | SQLWrapper, def: SQL | Column) => { return sql`coalesce(${val}, ${def})`; }; diff --git a/api/tests/misc/images.test.ts b/api/tests/misc/images.test.ts index db5a3212..acaf6ff6 100644 --- a/api/tests/misc/images.test.ts +++ b/api/tests/misc/images.test.ts @@ -21,6 +21,7 @@ describe("images", () => { const release = await processImages(); // remove notifications to prevent other images to be downloaded (do not curl 20000 images for nothing) release(); + await db.delete(mqueue); const ret = await db.query.shows.findFirst({ where: eq(shows.slug, madeInAbyss.slug), @@ -45,6 +46,7 @@ describe("images", () => { const release = await processImages(); // remove notifications to prevent other images to be downloaded (do not curl 20000 images for nothing) release(); + await db.delete(mqueue); const failed = await db.query.mqueue.findFirst({ where: and( diff --git a/api/tests/setup.ts b/api/tests/setup.ts index 75382825..873c8192 100644 --- a/api/tests/setup.ts +++ b/api/tests/setup.ts @@ -3,6 +3,7 @@ import { beforeAll } from "bun:test"; process.env.PGDATABASE = "kyoo_test"; process.env.JWT_SECRET = "this is a secret"; process.env.JWT_ISSUER = "https://kyoo.zoriya.dev"; +process.env.IMAGES_PATH = "./images"; beforeAll(async () => { // lazy load this so env set before actually applies From a45e992339584e1bef4c20625d63d7aee8522175 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Wed, 26 Nov 2025 18:44:59 +0100 Subject: [PATCH 3/6] Properly type unnestValues return --- api/src/db/utils.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/api/src/db/utils.ts b/api/src/db/utils.ts index e76423ed..b0bb5040 100644 --- a/api/src/db/utils.ts +++ b/api/src/db/utils.ts @@ -155,7 +155,7 @@ export const unnestValues = < if (!(k in columns)) continue; keys.push(k); acc[k] = new Array(i).fill(null); - acc[k].push(cur[k]) + acc[k].push(cur[k]); } return acc; }, @@ -172,7 +172,11 @@ export const unnestValues = < x, (columns[x].defaultFn?.() ?? columns[x].onUpdateFn!()).as(dbNames[x]), ]), - ]), + ]) as { + [k in keyof typeof typeInfo.$inferInsert]-?: SQL.Aliased< + (typeof typeInfo.$inferInsert)[k] + >; + }, ) .from( sql`unnest(${sql.join( From 8fc279d2ed34ff8578b74cca5a071250d8a6336a Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Wed, 26 Nov 2025 19:42:17 +0100 Subject: [PATCH 4/6] Use unnest everywhere --- api/src/controllers/seed/images.ts | 10 ++++-- api/src/controllers/seed/insert/collection.ts | 4 +-- api/src/controllers/seed/insert/entries.ts | 7 ++-- api/src/controllers/seed/insert/seasons.ts | 6 ++-- api/src/controllers/seed/insert/shows.ts | 4 +-- api/src/controllers/seed/insert/staff.ts | 6 ++-- api/src/controllers/seed/insert/studios.ts | 16 ++++++--- api/src/controllers/videos.ts | 9 ++--- api/src/db/utils.ts | 33 ++++++++++++++++--- 9 files changed, 67 insertions(+), 28 deletions(-) diff --git a/api/src/controllers/seed/images.ts b/api/src/controllers/seed/images.ts index 552df70f..743b0a6e 100644 --- a/api/src/controllers/seed/images.ts +++ b/api/src/controllers/seed/images.ts @@ -11,6 +11,7 @@ import { db, type Transaction } from "~/db"; import { mqueue } from "~/db/schema/mqueue"; import type { Image } from "~/models/utils"; import { getFile } from "~/utils"; +import { unnestValues } from "~/db/utils"; export const imageDir = process.env.IMAGES_PATH ?? "/images"; export const defaultBlurhash = "000000"; @@ -83,9 +84,12 @@ export const flushImageQueue = async ( ) => { if (!imgQueue.length) return; record("enqueue images", async () => { - await tx - .insert(mqueue) - .values(imgQueue.map((x) => ({ kind: "image", message: x, priority }))); + await tx.insert(mqueue).select( + unnestValues( + imgQueue.map((x) => ({ kind: "image", message: x, priority })), + mqueue, + ), + ); await tx.execute(sql`notify kyoo_image`); }); }; diff --git a/api/src/controllers/seed/insert/collection.ts b/api/src/controllers/seed/insert/collection.ts index 0b1a9b9e..b93c8b5a 100644 --- a/api/src/controllers/seed/insert/collection.ts +++ b/api/src/controllers/seed/insert/collection.ts @@ -1,7 +1,7 @@ import { sql } from "drizzle-orm"; import { db } from "~/db"; import { shows, showTranslations } from "~/db/schema"; -import { conflictUpdateAllExcept } from "~/db/utils"; +import { conflictUpdateAllExcept, unnestValues } from "~/db/utils"; import type { SeedCollection } from "~/models/collections"; import type { SeedMovie } from "~/models/movie"; import type { SeedSerie } from "~/models/serie"; @@ -75,7 +75,7 @@ export const insertCollection = async ( await flushImageQueue(tx, imgQueue, 100); await tx .insert(showTranslations) - .values(trans) + .select(unnestValues(trans, showTranslations)) .onConflictDoUpdate({ target: [showTranslations.pk, showTranslations.language], set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]), diff --git a/api/src/controllers/seed/insert/entries.ts b/api/src/controllers/seed/insert/entries.ts index fb5c9e60..c6dcbc5c 100644 --- a/api/src/controllers/seed/insert/entries.ts +++ b/api/src/controllers/seed/insert/entries.ts @@ -6,7 +6,7 @@ import { entryVideoJoin, videos, } from "~/db/schema"; -import { conflictUpdateAllExcept, unnestValues, values } from "~/db/utils"; +import { conflictUpdateAllExcept, unnest, unnestValues } from "~/db/utils"; import type { SeedEntry as SEntry, SeedExtra as SExtra } from "~/models/entry"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; import { guessNextRefresh } from "../refresh"; @@ -169,11 +169,12 @@ export const insertEntries = async ( ), }) .from( - values(vids, { + unnest(vids, "vids", { entryPk: "integer", + entrySlug: "string", needRendering: "boolean", videoId: "uuid", - }).as("vids"), + }), ) .innerJoin(videos, eq(videos.id, sql`vids.videoId`)), ) diff --git a/api/src/controllers/seed/insert/seasons.ts b/api/src/controllers/seed/insert/seasons.ts index c0520da4..8caf947c 100644 --- a/api/src/controllers/seed/insert/seasons.ts +++ b/api/src/controllers/seed/insert/seasons.ts @@ -1,6 +1,6 @@ import { db } from "~/db"; import { seasons, seasonTranslations } from "~/db/schema"; -import { conflictUpdateAllExcept } from "~/db/utils"; +import { conflictUpdateAllExcept, unnestValues } from "~/db/utils"; import type { SeedSeason } from "~/models/season"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; import { guessNextRefresh } from "../refresh"; @@ -30,7 +30,7 @@ export const insertSeasons = async ( }); const ret = await tx .insert(seasons) - .values(vals) + .select(unnestValues(vals, seasons)) .onConflictDoUpdate({ target: seasons.slug, set: conflictUpdateAllExcept(seasons, [ @@ -66,7 +66,7 @@ export const insertSeasons = async ( await flushImageQueue(tx, imgQueue, -10); await tx .insert(seasonTranslations) - .values(trans) + .select(unnestValues(trans, seasonTranslations)) .onConflictDoUpdate({ target: [seasonTranslations.pk, seasonTranslations.language], set: conflictUpdateAllExcept(seasonTranslations, ["pk", "language"]), diff --git a/api/src/controllers/seed/insert/shows.ts b/api/src/controllers/seed/insert/shows.ts index 88cc8eab..5bb23af6 100644 --- a/api/src/controllers/seed/insert/shows.ts +++ b/api/src/controllers/seed/insert/shows.ts @@ -16,7 +16,7 @@ import { shows, showTranslations, } from "~/db/schema"; -import { conflictUpdateAllExcept, sqlarr } from "~/db/utils"; +import { conflictUpdateAllExcept, sqlarr, unnestValues } from "~/db/utils"; import type { SeedCollection } from "~/models/collections"; import type { SeedMovie } from "~/models/movie"; import type { SeedSerie } from "~/models/serie"; @@ -95,7 +95,7 @@ export const insertShow = async ( await flushImageQueue(tx, imgQueue, 200); await tx .insert(showTranslations) - .values(trans) + .select(unnestValues(trans, showTranslations)) .onConflictDoUpdate({ target: [showTranslations.pk, showTranslations.language], set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]), diff --git a/api/src/controllers/seed/insert/staff.ts b/api/src/controllers/seed/insert/staff.ts index 11cd689b..8ebb6be9 100644 --- a/api/src/controllers/seed/insert/staff.ts +++ b/api/src/controllers/seed/insert/staff.ts @@ -1,7 +1,7 @@ import { eq, sql } from "drizzle-orm"; import { db } from "~/db"; import { roles, staff } from "~/db/schema"; -import { conflictUpdateAllExcept } from "~/db/utils"; +import { conflictUpdateAllExcept, unnestValues } from "~/db/utils"; import type { SeedStaff } from "~/models/staff"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; @@ -22,7 +22,7 @@ export const insertStaff = async ( })); const ret = await tx .insert(staff) - .values(people) + .select(unnestValues(people, staff)) .onConflictDoUpdate({ target: staff.slug, set: conflictUpdateAllExcept(staff, ["pk", "id", "slug", "createdAt"]), @@ -50,7 +50,7 @@ export const insertStaff = async ( // - we want `order` to stay in sync (& without duplicates) // - we don't have ways to identify a role so we can't onConflict await tx.delete(roles).where(eq(roles.showPk, showPk)); - await tx.insert(roles).values(rval); + await tx.insert(roles).select(unnestValues(rval, roles)); return ret; }); diff --git a/api/src/controllers/seed/insert/studios.ts b/api/src/controllers/seed/insert/studios.ts index e6695af4..c9f8f5dc 100644 --- a/api/src/controllers/seed/insert/studios.ts +++ b/api/src/controllers/seed/insert/studios.ts @@ -1,6 +1,7 @@ +import { sql } from "drizzle-orm"; import { db } from "~/db"; import { showStudioJoin, studios, studioTranslations } from "~/db/schema"; -import { conflictUpdateAllExcept } from "~/db/utils"; +import { conflictUpdateAllExcept, sqlarr, unnestValues } from "~/db/utils"; import type { SeedStudio } from "~/models/studio"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; @@ -21,7 +22,7 @@ export const insertStudios = async ( const ret = await tx .insert(studios) - .values(vals) + .select(unnestValues(vals, studios)) .onConflictDoUpdate({ target: studios.slug, set: conflictUpdateAllExcept(studios, [ @@ -48,7 +49,7 @@ export const insertStudios = async ( await flushImageQueue(tx, imgQueue, -100); await tx .insert(studioTranslations) - .values(trans) + .select(unnestValues(trans, studioTranslations)) .onConflictDoUpdate({ target: [studioTranslations.pk, studioTranslations.language], set: conflictUpdateAllExcept(studioTranslations, ["pk", "language"]), @@ -56,7 +57,14 @@ export const insertStudios = async ( await tx .insert(showStudioJoin) - .values(ret.map((studio) => ({ showPk: showPk, studioPk: studio.pk }))) + .select( + db + .select({ + showPk: sql`${showPk}`.as("showPk"), + studioPk: sql`v.studioPk`.as("studioPk"), + }) + .from(sql`unnest(${sqlarr(ret.map((x) => x.pk))}) as v("studioPk")`), + ) .onConflictDoNothing(); return ret; }); diff --git a/api/src/controllers/videos.ts b/api/src/controllers/videos.ts index 49a1d6a5..87d35954 100644 --- a/api/src/controllers/videos.ts +++ b/api/src/controllers/videos.ts @@ -35,7 +35,8 @@ import { jsonbBuildObject, jsonbObjectAgg, sqlarr, - values, + unnest, + unnestValues, } from "~/db/utils"; import { Entry } from "~/models/entry"; import { KError } from "~/models/error"; @@ -129,10 +130,10 @@ async function linkVideos( slug: computeVideoSlug(entriesQ.slug, hasRenderingQ), }) .from( - values(links, { + unnest(links, "j", { video: "integer", entry: "jsonb", - }).as("j"), + }), ) .innerJoin(videos, eq(videos.pk, sql`j.video`)) .innerJoin( @@ -835,7 +836,7 @@ export const videosWriteH = new Elysia({ prefix: "/videos", tags: ["videos"] }) try { vids = await tx .insert(videos) - .values(body) + .select(unnestValues(body, videos)) .onConflictDoUpdate({ target: [videos.path], set: conflictUpdateAllExcept(videos, ["pk", "id", "createdAt"]), diff --git a/api/src/db/utils.ts b/api/src/db/utils.ts index b0bb5040..ab31b84c 100644 --- a/api/src/db/utils.ts +++ b/api/src/db/utils.ts @@ -74,14 +74,16 @@ export function conflictUpdateAllExcept< } // drizzle is bugged and doesn't allow js arrays to be used in raw sql. -export function sqlarr(array: unknown[]) { +export function sqlarr(array: unknown[]): string { return `{${array .map((item) => !item || item === "null" ? "null" - : typeof item === "object" - ? `"${JSON.stringify(item).replaceAll('"', '\\"')}"` - : `"${item}"`, + : Array.isArray(item) + ? sqlarr(item) + : typeof item === "object" + ? `"${JSON.stringify(item).replaceAll("\\", "\\\\").replaceAll('"', '\\"')}"` + : `"${item}"`, ) .join(", ")}}`; } @@ -137,6 +139,7 @@ export const unnestValues = < ) => { if (values[0] === undefined) throw new Error("Invalid values, expecting at least one items"); + const columns = getTableColumns(typeInfo); const keys = Object.keys(values[0]).filter((x) => x in columns); // @ts-expect-error: drizzle internal @@ -189,6 +192,28 @@ export const unnestValues = < ); }; +export const unnest = >( + values: T[], + name: string, + typeInfo: Record, +) => { + const keys = Object.keys(typeInfo); + const vals = values.reduce( + (acc, cur) => { + for (const k of keys) { + if (k in cur) acc[k].push(cur[k]); + else acc[k].push(null); + } + return acc; + }, + Object.fromEntries(keys.map((x) => [x, [] as unknown[]])), + ); + return sql`unnest(${sql.join( + keys.map((k) => sql`${sqlarr(vals[k])}${sql.raw(`::${typeInfo[k]}[]`)}`), + sql.raw(", "), + )}) as ${sql.raw(name)}(${sql.raw(keys.map((x) => `"${x}"`).join(", "))})`; +}; + export const coalesce = (val: SQL | SQLWrapper, def: SQL | Column) => { return sql`coalesce(${val}, ${def})`; }; From 464d720ef9f1761fb818dc9f600a184501970ff0 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Fri, 28 Nov 2025 17:11:29 +0100 Subject: [PATCH 5/6] Fix unnest issues --- api/src/controllers/seed/insert/collection.ts | 3 ++- api/src/controllers/seed/insert/entries.ts | 10 +++++----- api/src/controllers/seed/insert/shows.ts | 3 ++- api/src/controllers/seed/insert/studios.ts | 4 ++-- api/src/db/utils.ts | 2 +- api/tests/misc/images.test.ts | 12 ++++++++---- 6 files changed, 20 insertions(+), 14 deletions(-) diff --git a/api/src/controllers/seed/insert/collection.ts b/api/src/controllers/seed/insert/collection.ts index b93c8b5a..063bcc81 100644 --- a/api/src/controllers/seed/insert/collection.ts +++ b/api/src/controllers/seed/insert/collection.ts @@ -73,9 +73,10 @@ export const insertCollection = async ( }), ); await flushImageQueue(tx, imgQueue, 100); + // we can't unnest values here because show translations contains arrays. await tx .insert(showTranslations) - .select(unnestValues(trans, showTranslations)) + .values(trans) .onConflictDoUpdate({ target: [showTranslations.pk, showTranslations.language], set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]), diff --git a/api/src/controllers/seed/insert/entries.ts b/api/src/controllers/seed/insert/entries.ts index c6dcbc5c..34f8d1f1 100644 --- a/api/src/controllers/seed/insert/entries.ts +++ b/api/src/controllers/seed/insert/entries.ts @@ -161,22 +161,22 @@ export const insertEntries = async ( .select( db .select({ - entryPk: sql`vids.entryPk`.as("entry"), + entryPk: sql`vids."entryPk"`.as("entry"), videoPk: videos.pk, slug: computeVideoSlug( - sql`vids.entrySlug`, - sql`vids.needRendering`, + sql`vids."entrySlug"`, + sql`vids."needRendering"`, ), }) .from( unnest(vids, "vids", { entryPk: "integer", - entrySlug: "string", + entrySlug: "varchar(255)", needRendering: "boolean", videoId: "uuid", }), ) - .innerJoin(videos, eq(videos.id, sql`vids.videoId`)), + .innerJoin(videos, eq(videos.id, sql`vids."videoId"`)), ) .onConflictDoNothing() .returning({ diff --git a/api/src/controllers/seed/insert/shows.ts b/api/src/controllers/seed/insert/shows.ts index 5bb23af6..cf59f9a0 100644 --- a/api/src/controllers/seed/insert/shows.ts +++ b/api/src/controllers/seed/insert/shows.ts @@ -93,9 +93,10 @@ export const insertShow = async ( }), ); await flushImageQueue(tx, imgQueue, 200); + // we can't unnest values here because show translations contains arrays. await tx .insert(showTranslations) - .select(unnestValues(trans, showTranslations)) + .values(trans) .onConflictDoUpdate({ target: [showTranslations.pk, showTranslations.language], set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]), diff --git a/api/src/controllers/seed/insert/studios.ts b/api/src/controllers/seed/insert/studios.ts index c9f8f5dc..f15c4eb8 100644 --- a/api/src/controllers/seed/insert/studios.ts +++ b/api/src/controllers/seed/insert/studios.ts @@ -61,9 +61,9 @@ export const insertStudios = async ( db .select({ showPk: sql`${showPk}`.as("showPk"), - studioPk: sql`v.studioPk`.as("studioPk"), + studioPk: sql`v."studioPk"`.as("studioPk"), }) - .from(sql`unnest(${sqlarr(ret.map((x) => x.pk))}) as v("studioPk")`), + .from(sql`unnest(${sqlarr(ret.map((x) => x.pk))}::integer[]) as v("studioPk")`), ) .onConflictDoNothing(); return ret; diff --git a/api/src/db/utils.ts b/api/src/db/utils.ts index ab31b84c..e36da34a 100644 --- a/api/src/db/utils.ts +++ b/api/src/db/utils.ts @@ -77,7 +77,7 @@ export function conflictUpdateAllExcept< export function sqlarr(array: unknown[]): string { return `{${array .map((item) => - !item || item === "null" + item === "null" || item === null || item === undefined ? "null" : Array.isArray(item) ? sqlarr(item) diff --git a/api/tests/misc/images.test.ts b/api/tests/misc/images.test.ts index acaf6ff6..1a81d224 100644 --- a/api/tests/misc/images.test.ts +++ b/api/tests/misc/images.test.ts @@ -17,11 +17,11 @@ describe("images", () => { }); it("Create a serie download images", async () => { + await db.delete(mqueue); await createSerie(madeInAbyss); const release = await processImages(); // remove notifications to prevent other images to be downloaded (do not curl 20000 images for nothing) release(); - await db.delete(mqueue); const ret = await db.query.shows.findFirst({ where: eq(shows.slug, madeInAbyss.slug), @@ -32,12 +32,17 @@ describe("images", () => { }); it("Download 404 image", async () => { + await db.delete(mqueue); + const url404 = "https://mockhttp.org/status/404"; const [ret, body] = await createMovie({ ...dune, translations: { en: { ...dune.translations.en, - poster: "https://www.google.com/404", + poster: url404, + thumbnail: null, + banner: null, + logo: null, }, }, }); @@ -46,12 +51,11 @@ describe("images", () => { const release = await processImages(); // remove notifications to prevent other images to be downloaded (do not curl 20000 images for nothing) release(); - await db.delete(mqueue); const failed = await db.query.mqueue.findFirst({ where: and( eq(mqueue.kind, "image"), - eq(sql`${mqueue.message}->>'url'`, "https://www.google.com/404"), + eq(sql`${mqueue.message}->>'url'`, url404), ), }); expect(failed!.attempt).toBe(5); From 60d59d7f7bf5255e44f493b1a5b4a6069adc991e Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Fri, 28 Nov 2025 17:25:29 +0100 Subject: [PATCH 6/6] Wrap every insert with a trace --- api/src/controllers/seed/images.ts | 130 ++++---- api/src/controllers/seed/insert/collection.ts | 157 ++++----- api/src/controllers/seed/insert/entries.ts | 298 +++++++++--------- api/src/controllers/seed/insert/seasons.ts | 129 ++++---- api/src/controllers/seed/insert/shows.ts | 148 ++++----- api/src/controllers/seed/insert/staff.ts | 94 +++--- api/src/controllers/seed/insert/studios.ts | 119 +++---- api/src/db/index.ts | 38 ++- api/src/otel.ts | 11 +- api/tests/misc/images.test.ts | 2 +- 10 files changed, 576 insertions(+), 550 deletions(-) diff --git a/api/src/controllers/seed/images.ts b/api/src/controllers/seed/images.ts index 743b0a6e..39cf7784 100644 --- a/api/src/controllers/seed/images.ts +++ b/api/src/controllers/seed/images.ts @@ -1,5 +1,5 @@ import path from "node:path"; -import { getCurrentSpan, record, setAttributes } from "@elysiajs/opentelemetry"; +import { getCurrentSpan, setAttributes } from "@elysiajs/opentelemetry"; import { SpanStatusCode } from "@opentelemetry/api"; import { encode } from "blurhash"; import { and, eq, is, lt, type SQL, sql } from "drizzle-orm"; @@ -9,9 +9,10 @@ import type { PoolClient } from "pg"; import sharp from "sharp"; import { db, type Transaction } from "~/db"; import { mqueue } from "~/db/schema/mqueue"; -import type { Image } from "~/models/utils"; -import { getFile } from "~/utils"; import { unnestValues } from "~/db/utils"; +import type { Image } from "~/models/utils"; +import { record } from "~/otel"; +import { getFile } from "~/utils"; export const imageDir = process.env.IMAGES_PATH ?? "/images"; export const defaultBlurhash = "000000"; @@ -77,13 +78,10 @@ export const enqueueOptImage = ( }; }; -export const flushImageQueue = async ( - tx: Transaction, - imgQueue: ImageTask[], - priority: number, -) => { - if (!imgQueue.length) return; - record("enqueue images", async () => { +export const flushImageQueue = record( + "enqueueImages", + async (tx: Transaction, imgQueue: ImageTask[], priority: number) => { + if (!imgQueue.length) return; await tx.insert(mqueue).select( unnestValues( imgQueue.map((x) => ({ kind: "image", message: x, priority })), @@ -91,80 +89,76 @@ export const flushImageQueue = async ( ), ); await tx.execute(sql`notify kyoo_image`); - }); -}; + }, +); -export const processImages = async () => { - return record("download images", async () => { - let running = false; - async function processAll() { - if (running) return; - running = true; +export const processImages = record("processImages", async () => { + let running = false; + async function processAll() { + if (running) return; + running = true; - let found = true; - while (found) { - found = await processOne(); - } - running = false; + let found = true; + while (found) { + found = await processOne(); } + running = false; + } - const client = (await db.$client.connect()) as PoolClient; - client.on("notification", (evt) => { - if (evt.channel !== "kyoo_image") return; - processAll(); - }); - await client.query("listen kyoo_image"); - - // start processing old tasks - await processAll(); - return () => client.release(true); + const client = (await db.$client.connect()) as PoolClient; + client.on("notification", (evt) => { + if (evt.channel !== "kyoo_image") return; + processAll(); }); -}; + await client.query("listen kyoo_image"); -async function processOne() { - return record("download", async () => { - return await db.transaction(async (tx) => { - const [item] = await tx - .select() - .from(mqueue) - .for("update", { skipLocked: true }) - .where(and(eq(mqueue.kind, "image"), lt(mqueue.attempt, 5))) - .orderBy(mqueue.priority, mqueue.attempt, mqueue.createdAt) - .limit(1); + // start processing old tasks + await processAll(); + return () => client.release(true); +}); - if (!item) return false; +const processOne = record("download", async () => { + return await db.transaction(async (tx) => { + const [item] = await tx + .select() + .from(mqueue) + .for("update", { skipLocked: true }) + .where(and(eq(mqueue.kind, "image"), lt(mqueue.attempt, 5))) + .orderBy(mqueue.priority, mqueue.attempt, mqueue.createdAt) + .limit(1); - const img = item.message as ImageTask; - setAttributes({ "item.url": img.url }); - try { - const blurhash = await downloadImage(img.id, img.url); - const ret: Image = { id: img.id, source: img.url, blurhash }; + if (!item) return false; - const table = sql.raw(img.table); - const column = sql.raw(img.column); + const img = item.message as ImageTask; + setAttributes({ "item.url": img.url }); + try { + const blurhash = await downloadImage(img.id, img.url); + const ret: Image = { id: img.id, source: img.url, blurhash }; - await tx.execute(sql` + const table = sql.raw(img.table); + const column = sql.raw(img.column); + + await tx.execute(sql` update ${table} set ${column} = ${ret} where ${column}->'id' = ${sql.raw(`'"${img.id}"'::jsonb`)} `); - await tx.delete(mqueue).where(eq(mqueue.id, item.id)); - } catch (err: any) { - const span = getCurrentSpan(); - if (span) { - span.recordException(err); - span.setStatus({ code: SpanStatusCode.ERROR }); - } - console.error("Failed to download image", img.url, err.message); - await tx - .update(mqueue) - .set({ attempt: sql`${mqueue.attempt}+1` }) - .where(eq(mqueue.id, item.id)); + await tx.delete(mqueue).where(eq(mqueue.id, item.id)); + } catch (err: any) { + const span = getCurrentSpan(); + if (span) { + span.recordException(err); + span.setStatus({ code: SpanStatusCode.ERROR }); } - return true; - }); + console.error("Failed to download image", img.url, err.message); + await tx + .update(mqueue) + .set({ attempt: sql`${mqueue.attempt}+1` }) + .where(eq(mqueue.id, item.id)); + } + return true; }); -} +}); async function downloadImage(id: string, url: string): Promise { const low = await getFile(path.join(imageDir, `${id}.low.jpg`)) diff --git a/api/src/controllers/seed/insert/collection.ts b/api/src/controllers/seed/insert/collection.ts index 063bcc81..b804005e 100644 --- a/api/src/controllers/seed/insert/collection.ts +++ b/api/src/controllers/seed/insert/collection.ts @@ -1,86 +1,93 @@ import { sql } from "drizzle-orm"; import { db } from "~/db"; import { shows, showTranslations } from "~/db/schema"; -import { conflictUpdateAllExcept, unnestValues } from "~/db/utils"; +import { conflictUpdateAllExcept } from "~/db/utils"; import type { SeedCollection } from "~/models/collections"; import type { SeedMovie } from "~/models/movie"; import type { SeedSerie } from "~/models/serie"; +import { record } from "~/otel"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; type ShowTrans = typeof showTranslations.$inferInsert; -export const insertCollection = async ( - collection: SeedCollection | undefined, - show: (({ kind: "movie" } & SeedMovie) | ({ kind: "serie" } & SeedSerie)) & { - nextRefresh: string; +export const insertCollection = record( + "insertCollection", + async ( + collection: SeedCollection | undefined, + show: ( + | ({ kind: "movie" } & SeedMovie) + | ({ kind: "serie" } & SeedSerie) + ) & { + nextRefresh: string; + }, + ) => { + if (!collection) return null; + const { translations, ...col } = collection; + + return await db.transaction(async (tx) => { + const imgQueue: ImageTask[] = []; + const [ret] = await tx + .insert(shows) + .values({ + kind: "collection", + status: "unknown", + startAir: show.kind === "movie" ? show.airDate : show.startAir, + endAir: show.kind === "movie" ? show.airDate : show.endAir, + nextRefresh: show.nextRefresh, + entriesCount: 0, + original: {} as any, + ...col, + }) + .onConflictDoUpdate({ + target: shows.slug, + set: { + ...conflictUpdateAllExcept(shows, [ + "pk", + "id", + "slug", + "createdAt", + "startAir", + "endAir", + ]), + startAir: sql`least(${shows.startAir}, excluded.start_air)`, + endAir: sql`greatest(${shows.endAir}, excluded.end_air)`, + }, + }) + .returning({ pk: shows.pk, id: shows.id, slug: shows.slug }); + + const trans: ShowTrans[] = Object.entries(translations).map( + ([lang, tr]) => ({ + pk: ret.pk, + language: lang, + ...tr, + poster: enqueueOptImage(imgQueue, { + url: tr.poster, + column: showTranslations.poster, + }), + thumbnail: enqueueOptImage(imgQueue, { + url: tr.thumbnail, + column: showTranslations.thumbnail, + }), + logo: enqueueOptImage(imgQueue, { + url: tr.logo, + column: showTranslations.logo, + }), + banner: enqueueOptImage(imgQueue, { + url: tr.banner, + column: showTranslations.banner, + }), + }), + ); + await flushImageQueue(tx, imgQueue, 100); + // we can't unnest values here because show translations contains arrays. + await tx + .insert(showTranslations) + .values(trans) + .onConflictDoUpdate({ + target: [showTranslations.pk, showTranslations.language], + set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]), + }); + return ret; + }); }, -) => { - if (!collection) return null; - const { translations, ...col } = collection; - - return await db.transaction(async (tx) => { - const imgQueue: ImageTask[] = []; - const [ret] = await tx - .insert(shows) - .values({ - kind: "collection", - status: "unknown", - startAir: show.kind === "movie" ? show.airDate : show.startAir, - endAir: show.kind === "movie" ? show.airDate : show.endAir, - nextRefresh: show.nextRefresh, - entriesCount: 0, - original: {} as any, - ...col, - }) - .onConflictDoUpdate({ - target: shows.slug, - set: { - ...conflictUpdateAllExcept(shows, [ - "pk", - "id", - "slug", - "createdAt", - "startAir", - "endAir", - ]), - startAir: sql`least(${shows.startAir}, excluded.start_air)`, - endAir: sql`greatest(${shows.endAir}, excluded.end_air)`, - }, - }) - .returning({ pk: shows.pk, id: shows.id, slug: shows.slug }); - - const trans: ShowTrans[] = Object.entries(translations).map( - ([lang, tr]) => ({ - pk: ret.pk, - language: lang, - ...tr, - poster: enqueueOptImage(imgQueue, { - url: tr.poster, - column: showTranslations.poster, - }), - thumbnail: enqueueOptImage(imgQueue, { - url: tr.thumbnail, - column: showTranslations.thumbnail, - }), - logo: enqueueOptImage(imgQueue, { - url: tr.logo, - column: showTranslations.logo, - }), - banner: enqueueOptImage(imgQueue, { - url: tr.banner, - column: showTranslations.banner, - }), - }), - ); - await flushImageQueue(tx, imgQueue, 100); - // we can't unnest values here because show translations contains arrays. - await tx - .insert(showTranslations) - .values(trans) - .onConflictDoUpdate({ - target: [showTranslations.pk, showTranslations.language], - set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]), - }); - return ret; - }); -}; +); diff --git a/api/src/controllers/seed/insert/entries.ts b/api/src/controllers/seed/insert/entries.ts index 34f8d1f1..afff7c57 100644 --- a/api/src/controllers/seed/insert/entries.ts +++ b/api/src/controllers/seed/insert/entries.ts @@ -8,6 +8,7 @@ import { } from "~/db/schema"; import { conflictUpdateAllExcept, unnest, unnestValues } from "~/db/utils"; import type { SeedEntry as SEntry, SeedExtra as SExtra } from "~/models/entry"; +import { record } from "~/otel"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; import { guessNextRefresh } from "../refresh"; import { updateAvailableCount, updateAvailableSince } from "./shows"; @@ -42,161 +43,164 @@ const generateSlug = ( } }; -export const insertEntries = async ( - show: { pk: number; slug: string; kind: "movie" | "serie" | "collection" }, - items: (SeedEntry | SeedExtra)[], - onlyExtras = false, -) => { - if (!items.length) return []; +export const insertEntries = record( + "insertEntries", + async ( + show: { pk: number; slug: string; kind: "movie" | "serie" | "collection" }, + items: (SeedEntry | SeedExtra)[], + onlyExtras = false, + ) => { + if (!items.length) return []; - const retEntries = await db.transaction(async (tx) => { - const imgQueue: ImageTask[] = []; - const vals: EntryI[] = items.map((seed) => { - const { translations, videos, video, ...entry } = seed; - return { - ...entry, - showPk: show.pk, - slug: generateSlug(show.slug, seed), - thumbnail: enqueueOptImage(imgQueue, { - url: seed.thumbnail, - column: entries.thumbnail, - }), - nextRefresh: - entry.kind !== "extra" - ? guessNextRefresh(entry.airDate ?? new Date()) - : guessNextRefresh(new Date()), - episodeNumber: - entry.kind === "episode" - ? entry.episodeNumber - : entry.kind === "special" - ? entry.number - : undefined, - }; - }); - const ret = await tx - .insert(entries) - .select(unnestValues(vals, entries)) - .onConflictDoUpdate({ - target: entries.slug, - set: conflictUpdateAllExcept(entries, [ - "pk", - "showPk", - "id", - "slug", - "createdAt", - ]), - }) - .returning({ pk: entries.pk, id: entries.id, slug: entries.slug }); - - const trans: EntryTransI[] = items.flatMap((seed, i) => { - if (seed.kind === "extra") { - return [ - { - pk: ret[i].pk, - // yeah we hardcode the language to extra because if we want to support - // translations one day it won't be awkward - language: "extra", - name: seed.name, - description: null, - poster: undefined, - }, - ]; - } - - return Object.entries(seed.translations).map(([lang, tr]) => ({ - // assumes ret is ordered like items. - pk: ret[i].pk, - language: lang, - ...tr, - poster: - seed.kind === "movie" - ? enqueueOptImage(imgQueue, { - url: (tr as any).poster, - column: entryTranslations.poster, - }) - : undefined, - })); - }); - await flushImageQueue(tx, imgQueue, 0); - await tx - .insert(entryTranslations) - .select(unnestValues(trans, entryTranslations)) - .onConflictDoUpdate({ - target: [entryTranslations.pk, entryTranslations.language], - set: conflictUpdateAllExcept(entryTranslations, ["pk", "language"]), + const retEntries = await db.transaction(async (tx) => { + const imgQueue: ImageTask[] = []; + const vals: EntryI[] = items.map((seed) => { + const { translations, videos, video, ...entry } = seed; + return { + ...entry, + showPk: show.pk, + slug: generateSlug(show.slug, seed), + thumbnail: enqueueOptImage(imgQueue, { + url: seed.thumbnail, + column: entries.thumbnail, + }), + nextRefresh: + entry.kind !== "extra" + ? guessNextRefresh(entry.airDate ?? new Date()) + : guessNextRefresh(new Date()), + episodeNumber: + entry.kind === "episode" + ? entry.episodeNumber + : entry.kind === "special" + ? entry.number + : undefined, + }; }); + const ret = await tx + .insert(entries) + .select(unnestValues(vals, entries)) + .onConflictDoUpdate({ + target: entries.slug, + set: conflictUpdateAllExcept(entries, [ + "pk", + "showPk", + "id", + "slug", + "createdAt", + ]), + }) + .returning({ pk: entries.pk, id: entries.id, slug: entries.slug }); - return ret; - }); + const trans: EntryTransI[] = items.flatMap((seed, i) => { + if (seed.kind === "extra") { + return [ + { + pk: ret[i].pk, + // yeah we hardcode the language to extra because if we want to support + // translations one day it won't be awkward + language: "extra", + name: seed.name, + description: null, + poster: undefined, + }, + ]; + } - const vids = items.flatMap((seed, i) => { - if (seed.kind === "extra") { - return { - videoId: seed.video, + return Object.entries(seed.translations).map(([lang, tr]) => ({ + // assumes ret is ordered like items. + pk: ret[i].pk, + language: lang, + ...tr, + poster: + seed.kind === "movie" + ? enqueueOptImage(imgQueue, { + url: (tr as any).poster, + column: entryTranslations.poster, + }) + : undefined, + })); + }); + await flushImageQueue(tx, imgQueue, 0); + await tx + .insert(entryTranslations) + .select(unnestValues(trans, entryTranslations)) + .onConflictDoUpdate({ + target: [entryTranslations.pk, entryTranslations.language], + set: conflictUpdateAllExcept(entryTranslations, ["pk", "language"]), + }); + + return ret; + }); + + const vids = items.flatMap((seed, i) => { + if (seed.kind === "extra") { + return { + videoId: seed.video, + entryPk: retEntries[i].pk, + entrySlug: retEntries[i].slug, + needRendering: false, + }; + } + if (!seed.videos) return []; + return seed.videos.map((x, j) => ({ + videoId: x, entryPk: retEntries[i].pk, entrySlug: retEntries[i].slug, - needRendering: false, - }; + // The first video should not have a rendering. + needRendering: j !== 0 && seed.videos!.length > 1, + })); + }); + + if (vids.length === 0) { + // we have not added videos but we need to update the `entriesCount` + if (show.kind === "serie" && !onlyExtras) + await updateAvailableCount(db, [show.pk], true); + return retEntries.map((x) => ({ id: x.id, slug: x.slug, videos: [] })); } - if (!seed.videos) return []; - return seed.videos.map((x, j) => ({ - videoId: x, - entryPk: retEntries[i].pk, - entrySlug: retEntries[i].slug, - // The first video should not have a rendering. - needRendering: j !== 0 && seed.videos!.length > 1, + + const retVideos = await db.transaction(async (tx) => { + const ret = await tx + .insert(entryVideoJoin) + .select( + db + .select({ + entryPk: sql`vids."entryPk"`.as("entry"), + videoPk: videos.pk, + slug: computeVideoSlug( + sql`vids."entrySlug"`, + sql`vids."needRendering"`, + ), + }) + .from( + unnest(vids, "vids", { + entryPk: "integer", + entrySlug: "varchar(255)", + needRendering: "boolean", + videoId: "uuid", + }), + ) + .innerJoin(videos, eq(videos.id, sql`vids."videoId"`)), + ) + .onConflictDoNothing() + .returning({ + slug: entryVideoJoin.slug, + entryPk: entryVideoJoin.entryPk, + }); + + if (!onlyExtras) + await updateAvailableCount(tx, [show.pk], show.kind === "serie"); + + await updateAvailableSince(tx, [...new Set(vids.map((x) => x.entryPk))]); + return ret; + }); + + return retEntries.map((entry) => ({ + id: entry.id, + slug: entry.slug, + videos: retVideos.filter((x) => x.entryPk === entry.pk), })); - }); - - if (vids.length === 0) { - // we have not added videos but we need to update the `entriesCount` - if (show.kind === "serie" && !onlyExtras) - await updateAvailableCount(db, [show.pk], true); - return retEntries.map((x) => ({ id: x.id, slug: x.slug, videos: [] })); - } - - const retVideos = await db.transaction(async (tx) => { - const ret = await tx - .insert(entryVideoJoin) - .select( - db - .select({ - entryPk: sql`vids."entryPk"`.as("entry"), - videoPk: videos.pk, - slug: computeVideoSlug( - sql`vids."entrySlug"`, - sql`vids."needRendering"`, - ), - }) - .from( - unnest(vids, "vids", { - entryPk: "integer", - entrySlug: "varchar(255)", - needRendering: "boolean", - videoId: "uuid", - }), - ) - .innerJoin(videos, eq(videos.id, sql`vids."videoId"`)), - ) - .onConflictDoNothing() - .returning({ - slug: entryVideoJoin.slug, - entryPk: entryVideoJoin.entryPk, - }); - - if (!onlyExtras) - await updateAvailableCount(tx, [show.pk], show.kind === "serie"); - - await updateAvailableSince(tx, [...new Set(vids.map((x) => x.entryPk))]); - return ret; - }); - - return retEntries.map((entry) => ({ - id: entry.id, - slug: entry.slug, - videos: retVideos.filter((x) => x.entryPk === entry.pk), - })); -}; + }, +); export function computeVideoSlug(entrySlug: SQL | Column, needsRendering: SQL) { return sql` diff --git a/api/src/controllers/seed/insert/seasons.ts b/api/src/controllers/seed/insert/seasons.ts index 8caf947c..00254c2b 100644 --- a/api/src/controllers/seed/insert/seasons.ts +++ b/api/src/controllers/seed/insert/seasons.ts @@ -2,76 +2,77 @@ import { db } from "~/db"; import { seasons, seasonTranslations } from "~/db/schema"; import { conflictUpdateAllExcept, unnestValues } from "~/db/utils"; import type { SeedSeason } from "~/models/season"; +import { record } from "~/otel"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; import { guessNextRefresh } from "../refresh"; type SeasonI = typeof seasons.$inferInsert; type SeasonTransI = typeof seasonTranslations.$inferInsert; -export const insertSeasons = async ( - show: { pk: number; slug: string }, - items: SeedSeason[], -) => { - if (!items.length) return []; +export const insertSeasons = record( + "insertSeasons", + async (show: { pk: number; slug: string }, items: SeedSeason[]) => { + if (!items.length) return []; - return db.transaction(async (tx) => { - const imgQueue: ImageTask[] = []; - const vals: SeasonI[] = items.map((x) => { - const { translations, ...season } = x; - return { - ...season, - showPk: show.pk, - slug: - season.seasonNumber === 0 - ? `${show.slug}-specials` - : `${show.slug}-s${season.seasonNumber}`, - nextRefresh: guessNextRefresh(season.startAir ?? new Date()), - }; - }); - const ret = await tx - .insert(seasons) - .select(unnestValues(vals, seasons)) - .onConflictDoUpdate({ - target: seasons.slug, - set: conflictUpdateAllExcept(seasons, [ - "pk", - "showPk", - "id", - "slug", - "createdAt", - ]), - }) - .returning({ pk: seasons.pk, id: seasons.id, slug: seasons.slug }); - - const trans: SeasonTransI[] = items.flatMap((seed, i) => - Object.entries(seed.translations).map(([lang, tr]) => ({ - // assumes ret is ordered like items. - pk: ret[i].pk, - language: lang, - ...tr, - poster: enqueueOptImage(imgQueue, { - url: tr.poster, - column: seasonTranslations.poster, - }), - thumbnail: enqueueOptImage(imgQueue, { - url: tr.thumbnail, - column: seasonTranslations.thumbnail, - }), - banner: enqueueOptImage(imgQueue, { - url: tr.banner, - column: seasonTranslations.banner, - }), - })), - ); - await flushImageQueue(tx, imgQueue, -10); - await tx - .insert(seasonTranslations) - .select(unnestValues(trans, seasonTranslations)) - .onConflictDoUpdate({ - target: [seasonTranslations.pk, seasonTranslations.language], - set: conflictUpdateAllExcept(seasonTranslations, ["pk", "language"]), + return db.transaction(async (tx) => { + const imgQueue: ImageTask[] = []; + const vals: SeasonI[] = items.map((x) => { + const { translations, ...season } = x; + return { + ...season, + showPk: show.pk, + slug: + season.seasonNumber === 0 + ? `${show.slug}-specials` + : `${show.slug}-s${season.seasonNumber}`, + nextRefresh: guessNextRefresh(season.startAir ?? new Date()), + }; }); + const ret = await tx + .insert(seasons) + .select(unnestValues(vals, seasons)) + .onConflictDoUpdate({ + target: seasons.slug, + set: conflictUpdateAllExcept(seasons, [ + "pk", + "showPk", + "id", + "slug", + "createdAt", + ]), + }) + .returning({ pk: seasons.pk, id: seasons.id, slug: seasons.slug }); - return ret; - }); -}; + const trans: SeasonTransI[] = items.flatMap((seed, i) => + Object.entries(seed.translations).map(([lang, tr]) => ({ + // assumes ret is ordered like items. + pk: ret[i].pk, + language: lang, + ...tr, + poster: enqueueOptImage(imgQueue, { + url: tr.poster, + column: seasonTranslations.poster, + }), + thumbnail: enqueueOptImage(imgQueue, { + url: tr.thumbnail, + column: seasonTranslations.thumbnail, + }), + banner: enqueueOptImage(imgQueue, { + url: tr.banner, + column: seasonTranslations.banner, + }), + })), + ); + await flushImageQueue(tx, imgQueue, -10); + await tx + .insert(seasonTranslations) + .select(unnestValues(trans, seasonTranslations)) + .onConflictDoUpdate({ + target: [seasonTranslations.pk, seasonTranslations.language], + set: conflictUpdateAllExcept(seasonTranslations, ["pk", "language"]), + }); + + return ret; + }); + }, +); diff --git a/api/src/controllers/seed/insert/shows.ts b/api/src/controllers/seed/insert/shows.ts index cf59f9a0..a1fc5d19 100644 --- a/api/src/controllers/seed/insert/shows.ts +++ b/api/src/controllers/seed/insert/shows.ts @@ -16,94 +16,98 @@ import { shows, showTranslations, } from "~/db/schema"; -import { conflictUpdateAllExcept, sqlarr, unnestValues } from "~/db/utils"; +import { conflictUpdateAllExcept, sqlarr } from "~/db/utils"; import type { SeedCollection } from "~/models/collections"; import type { SeedMovie } from "~/models/movie"; import type { SeedSerie } from "~/models/serie"; import type { Original } from "~/models/utils"; +import { record } from "~/otel"; import { getYear } from "~/utils"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; type Show = typeof shows.$inferInsert; type ShowTrans = typeof showTranslations.$inferInsert; -export const insertShow = async ( - show: Omit, - original: Original & { - poster: string | null; - thumbnail: string | null; - banner: string | null; - logo: string | null; - }, - translations: - | SeedMovie["translations"] - | SeedSerie["translations"] - | SeedCollection["translations"], -) => { - return await db.transaction(async (tx) => { - const imgQueue: ImageTask[] = []; - const orig = { - ...original, - poster: enqueueOptImage(imgQueue, { - url: original.poster, - table: shows, - column: sql`${shows.original}['poster']`, - }), - thumbnail: enqueueOptImage(imgQueue, { - url: original.thumbnail, - table: shows, - column: sql`${shows.original}['thumbnail']`, - }), - banner: enqueueOptImage(imgQueue, { - url: original.banner, - table: shows, - column: sql`${shows.original}['banner']`, - }), - logo: enqueueOptImage(imgQueue, { - url: original.logo, - table: shows, - column: sql`${shows.original}['logo']`, - }), - }; - const ret = await insertBaseShow(tx, { ...show, original: orig }); - if ("status" in ret) return ret; - - const trans: ShowTrans[] = Object.entries(translations).map( - ([lang, tr]) => ({ - pk: ret.pk, - language: lang, - ...tr, - latinName: tr.latinName ?? null, +export const insertShow = record( + "insertShow", + async ( + show: Omit, + original: Original & { + poster: string | null; + thumbnail: string | null; + banner: string | null; + logo: string | null; + }, + translations: + | SeedMovie["translations"] + | SeedSerie["translations"] + | SeedCollection["translations"], + ) => { + return await db.transaction(async (tx) => { + const imgQueue: ImageTask[] = []; + const orig = { + ...original, poster: enqueueOptImage(imgQueue, { - url: tr.poster, - column: showTranslations.poster, + url: original.poster, + table: shows, + column: sql`${shows.original}['poster']`, }), thumbnail: enqueueOptImage(imgQueue, { - url: tr.thumbnail, - column: showTranslations.thumbnail, - }), - logo: enqueueOptImage(imgQueue, { - url: tr.logo, - column: showTranslations.logo, + url: original.thumbnail, + table: shows, + column: sql`${shows.original}['thumbnail']`, }), banner: enqueueOptImage(imgQueue, { - url: tr.banner, - column: showTranslations.banner, + url: original.banner, + table: shows, + column: sql`${shows.original}['banner']`, }), - }), - ); - await flushImageQueue(tx, imgQueue, 200); - // we can't unnest values here because show translations contains arrays. - await tx - .insert(showTranslations) - .values(trans) - .onConflictDoUpdate({ - target: [showTranslations.pk, showTranslations.language], - set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]), - }); - return ret; - }); -}; + logo: enqueueOptImage(imgQueue, { + url: original.logo, + table: shows, + column: sql`${shows.original}['logo']`, + }), + }; + const ret = await insertBaseShow(tx, { ...show, original: orig }); + if ("status" in ret) return ret; + + const trans: ShowTrans[] = Object.entries(translations).map( + ([lang, tr]) => ({ + pk: ret.pk, + language: lang, + ...tr, + latinName: tr.latinName ?? null, + poster: enqueueOptImage(imgQueue, { + url: tr.poster, + column: showTranslations.poster, + }), + thumbnail: enqueueOptImage(imgQueue, { + url: tr.thumbnail, + column: showTranslations.thumbnail, + }), + logo: enqueueOptImage(imgQueue, { + url: tr.logo, + column: showTranslations.logo, + }), + banner: enqueueOptImage(imgQueue, { + url: tr.banner, + column: showTranslations.banner, + }), + }), + ); + await flushImageQueue(tx, imgQueue, 200); + // we can't unnest values here because show translations contains arrays. + await tx + .insert(showTranslations) + .values(trans) + .onConflictDoUpdate({ + target: [showTranslations.pk, showTranslations.language], + set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]), + }); + return ret; + }); + }, +); async function insertBaseShow(tx: Transaction, show: Show) { function insert() { diff --git a/api/src/controllers/seed/insert/staff.ts b/api/src/controllers/seed/insert/staff.ts index 8ebb6be9..837bf74b 100644 --- a/api/src/controllers/seed/insert/staff.ts +++ b/api/src/controllers/seed/insert/staff.ts @@ -3,55 +3,61 @@ import { db } from "~/db"; import { roles, staff } from "~/db/schema"; import { conflictUpdateAllExcept, unnestValues } from "~/db/utils"; import type { SeedStaff } from "~/models/staff"; +import { record } from "~/otel"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; -export const insertStaff = async ( - seed: SeedStaff[] | undefined, - showPk: number, -) => { - if (!seed?.length) return []; +export const insertStaff = record( + "insertStaff", + async (seed: SeedStaff[] | undefined, showPk: number) => { + if (!seed?.length) return []; - return await db.transaction(async (tx) => { - const imgQueue: ImageTask[] = []; - const people = seed.map((x) => ({ - ...x.staff, - image: enqueueOptImage(imgQueue, { - url: x.staff.image, - column: staff.image, - }), - })); - const ret = await tx - .insert(staff) - .select(unnestValues(people, staff)) - .onConflictDoUpdate({ - target: staff.slug, - set: conflictUpdateAllExcept(staff, ["pk", "id", "slug", "createdAt"]), - }) - .returning({ pk: staff.pk, id: staff.id, slug: staff.slug }); - - const rval = seed.map((x, i) => ({ - showPk, - staffPk: ret[i].pk, - kind: x.kind, - order: i, - character: { - ...x.character, + return await db.transaction(async (tx) => { + const imgQueue: ImageTask[] = []; + const people = seed.map((x) => ({ + ...x.staff, image: enqueueOptImage(imgQueue, { - url: x.character.image, - table: roles, - column: sql`${roles.character}['image']`, + url: x.staff.image, + column: staff.image, }), - }, - })); + })); + const ret = await tx + .insert(staff) + .select(unnestValues(people, staff)) + .onConflictDoUpdate({ + target: staff.slug, + set: conflictUpdateAllExcept(staff, [ + "pk", + "id", + "slug", + "createdAt", + ]), + }) + .returning({ pk: staff.pk, id: staff.id, slug: staff.slug }); - await flushImageQueue(tx, imgQueue, -200); + const rval = seed.map((x, i) => ({ + showPk, + staffPk: ret[i].pk, + kind: x.kind, + order: i, + character: { + ...x.character, + image: enqueueOptImage(imgQueue, { + url: x.character.image, + table: roles, + column: sql`${roles.character}['image']`, + }), + }, + })); - // always replace all roles. this is because: - // - we want `order` to stay in sync (& without duplicates) - // - we don't have ways to identify a role so we can't onConflict - await tx.delete(roles).where(eq(roles.showPk, showPk)); - await tx.insert(roles).select(unnestValues(rval, roles)); + await flushImageQueue(tx, imgQueue, -200); - return ret; - }); -}; + // always replace all roles. this is because: + // - we want `order` to stay in sync (& without duplicates) + // - we don't have ways to identify a role so we can't onConflict + await tx.delete(roles).where(eq(roles.showPk, showPk)); + await tx.insert(roles).select(unnestValues(rval, roles)); + + return ret; + }); + }, +); diff --git a/api/src/controllers/seed/insert/studios.ts b/api/src/controllers/seed/insert/studios.ts index f15c4eb8..2906aeb8 100644 --- a/api/src/controllers/seed/insert/studios.ts +++ b/api/src/controllers/seed/insert/studios.ts @@ -3,69 +3,72 @@ import { db } from "~/db"; import { showStudioJoin, studios, studioTranslations } from "~/db/schema"; import { conflictUpdateAllExcept, sqlarr, unnestValues } from "~/db/utils"; import type { SeedStudio } from "~/models/studio"; +import { record } from "~/otel"; import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images"; type StudioI = typeof studios.$inferInsert; type StudioTransI = typeof studioTranslations.$inferInsert; -export const insertStudios = async ( - seed: SeedStudio[] | undefined, - showPk: number, -) => { - if (!seed?.length) return []; +export const insertStudios = record( + "insertStudios", + async (seed: SeedStudio[] | undefined, showPk: number) => { + if (!seed?.length) return []; - return await db.transaction(async (tx) => { - const vals: StudioI[] = seed.map((x) => { - const { translations, ...item } = x; - return item; - }); - - const ret = await tx - .insert(studios) - .select(unnestValues(vals, studios)) - .onConflictDoUpdate({ - target: studios.slug, - set: conflictUpdateAllExcept(studios, [ - "pk", - "id", - "slug", - "createdAt", - ]), - }) - .returning({ pk: studios.pk, id: studios.id, slug: studios.slug }); - - const imgQueue: ImageTask[] = []; - const trans: StudioTransI[] = seed.flatMap((x, i) => - Object.entries(x.translations).map(([lang, tr]) => ({ - pk: ret[i].pk, - language: lang, - name: tr.name, - logo: enqueueOptImage(imgQueue, { - url: tr.logo, - column: studioTranslations.logo, - }), - })), - ); - await flushImageQueue(tx, imgQueue, -100); - await tx - .insert(studioTranslations) - .select(unnestValues(trans, studioTranslations)) - .onConflictDoUpdate({ - target: [studioTranslations.pk, studioTranslations.language], - set: conflictUpdateAllExcept(studioTranslations, ["pk", "language"]), + return await db.transaction(async (tx) => { + const vals: StudioI[] = seed.map((x) => { + const { translations, ...item } = x; + return item; }); - await tx - .insert(showStudioJoin) - .select( - db - .select({ - showPk: sql`${showPk}`.as("showPk"), - studioPk: sql`v."studioPk"`.as("studioPk"), - }) - .from(sql`unnest(${sqlarr(ret.map((x) => x.pk))}::integer[]) as v("studioPk")`), - ) - .onConflictDoNothing(); - return ret; - }); -}; + const ret = await tx + .insert(studios) + .select(unnestValues(vals, studios)) + .onConflictDoUpdate({ + target: studios.slug, + set: conflictUpdateAllExcept(studios, [ + "pk", + "id", + "slug", + "createdAt", + ]), + }) + .returning({ pk: studios.pk, id: studios.id, slug: studios.slug }); + + const imgQueue: ImageTask[] = []; + const trans: StudioTransI[] = seed.flatMap((x, i) => + Object.entries(x.translations).map(([lang, tr]) => ({ + pk: ret[i].pk, + language: lang, + name: tr.name, + logo: enqueueOptImage(imgQueue, { + url: tr.logo, + column: studioTranslations.logo, + }), + })), + ); + await flushImageQueue(tx, imgQueue, -100); + await tx + .insert(studioTranslations) + .select(unnestValues(trans, studioTranslations)) + .onConflictDoUpdate({ + target: [studioTranslations.pk, studioTranslations.language], + set: conflictUpdateAllExcept(studioTranslations, ["pk", "language"]), + }); + + await tx + .insert(showStudioJoin) + .select( + db + .select({ + showPk: sql`${showPk}`.as("showPk"), + studioPk: sql`v."studioPk"`.as("studioPk"), + }) + .from( + sql`unnest(${sqlarr(ret.map((x) => x.pk))}::integer[]) as v("studioPk")`, + ), + ) + .onConflictDoNothing(); + return ret; + }); + }, +); diff --git a/api/src/db/index.ts b/api/src/db/index.ts index c7132116..95fc853e 100644 --- a/api/src/db/index.ts +++ b/api/src/db/index.ts @@ -1,12 +1,12 @@ import os from "node:os"; import path from "node:path"; import tls, { type ConnectionOptions } from "node:tls"; -import { record } from "@elysiajs/opentelemetry"; import { instrumentDrizzleClient } from "@kubiks/otel-drizzle"; import { sql } from "drizzle-orm"; import { drizzle } from "drizzle-orm/node-postgres"; import { migrate as migrateDb } from "drizzle-orm/node-postgres/migrator"; import type { PoolConfig } from "pg"; +import { record } from "~/otel"; import * as schema from "./schema"; const config: PoolConfig = { @@ -122,26 +122,24 @@ instrumentDrizzleClient(db, { maxQueryTextLength: 100_000_000, }); -export const migrate = async () => { - return record("migrate", async () => { - try { - await db.execute( - sql.raw(` - create extension if not exists pg_trgm; - set pg_trgm.word_similarity_threshold = 0.4; - alter database "${postgresConfig.database}" set pg_trgm.word_similarity_threshold = 0.4; - `), - ); - } catch (err: any) { - console.error("Error while updating pg_trgm", err.message); - } - await migrateDb(db, { - migrationsSchema: "kyoo", - migrationsFolder: "./drizzle", - }); - console.log(`Database ${postgresConfig.database} migrated!`); +export const migrate = record("migrate", async () => { + try { + await db.execute( + sql.raw(` + create extension if not exists pg_trgm; + set pg_trgm.word_similarity_threshold = 0.4; + alter database "${postgresConfig.database}" set pg_trgm.word_similarity_threshold = 0.4; + `), + ); + } catch (err: any) { + console.error("Error while updating pg_trgm", err.message); + } + await migrateDb(db, { + migrationsSchema: "kyoo", + migrationsFolder: "./drizzle", }); -}; + console.log(`Database ${postgresConfig.database} migrated!`); +}); export type Transaction = | typeof db diff --git a/api/src/otel.ts b/api/src/otel.ts index 1875b823..7dde8c37 100644 --- a/api/src/otel.ts +++ b/api/src/otel.ts @@ -1,4 +1,4 @@ -import { opentelemetry } from "@elysiajs/opentelemetry"; +import { record as elysiaRecord, opentelemetry } from "@elysiajs/opentelemetry"; import { OTLPMetricExporter as GrpcMetricExporter } from "@opentelemetry/exporter-metrics-otlp-grpc"; import { OTLPMetricExporter as HttpMetricExporter } from "@opentelemetry/exporter-metrics-otlp-proto"; import { OTLPTraceExporter as GrpcTraceExporter } from "@opentelemetry/exporter-trace-otlp-grpc"; @@ -32,3 +32,12 @@ export const otel = new Elysia() }), ) .as("global"); + +export function record any>( + spanName: string, + fn: T, +): T { + const wrapped = (...args: Parameters) => + elysiaRecord(spanName, () => fn(...args)); + return wrapped as T; +} diff --git a/api/tests/misc/images.test.ts b/api/tests/misc/images.test.ts index 1a81d224..e3d73c68 100644 --- a/api/tests/misc/images.test.ts +++ b/api/tests/misc/images.test.ts @@ -33,7 +33,7 @@ describe("images", () => { it("Download 404 image", async () => { await db.delete(mqueue); - const url404 = "https://mockhttp.org/status/404"; + const url404 = "https://mockhttp.org/status/404"; const [ret, body] = await createMovie({ ...dune, translations: {