Use unnest in insertion methods (#1185)

2025-11-28 17:28:11 +01:00
committed by GitHub
13 changed files with 700 additions and 538 deletions
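The gist of the change: bulk inserts that previously went through drizzle's `.values(rows)` (rendered as a `VALUES` list with one bind parameter per cell) now go through `.insert(table).select(unnestValues(rows, table))`, which passes one Postgres array per column and expands it server-side with `unnest`, so the statement shape no longer grows with the row count. A minimal sketch of the two shapes, using the `mqueue` insert from `flushImageQueue` in the first file below; the SQL in the comments is approximate (the varchar/jsonb/integer element types are assumptions, and `unnestValues` also applies column casing, quotes identifiers, and appends columns that have `defaultFn`/`onUpdateFn`):

```typescript
// Before: one bind parameter per cell, so large batches mean huge statements.
//   insert into mqueue ("kind", "message", "priority", ...)
//   values ($1, $2, $3, ...), ($4, $5, $6, ...), ...
await tx
	.insert(mqueue)
	.values(imgQueue.map((x) => ({ kind: "image", message: x, priority })));

// After: one array literal per column, expanded server-side.
//   insert into mqueue ("kind", "message", "priority", ...)
//   select "kind", "message", "priority", ...
//   from unnest('{...}'::varchar[], '{...}'::jsonb[], '{...}'::integer[])
//          as v("kind", "message", "priority")
await tx.insert(mqueue).select(
	unnestValues(
		imgQueue.map((x) => ({ kind: "image", message: x, priority })),
		mqueue,
	),
);
```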

View File

@@ -1,5 +1,5 @@
import path from "node:path";
import { getCurrentSpan, record, setAttributes } from "@elysiajs/opentelemetry";
import { getCurrentSpan, setAttributes } from "@elysiajs/opentelemetry";
import { SpanStatusCode } from "@opentelemetry/api";
import { encode } from "blurhash";
import { and, eq, is, lt, type SQL, sql } from "drizzle-orm";
@@ -9,7 +9,9 @@ import type { PoolClient } from "pg";
import sharp from "sharp";
import { db, type Transaction } from "~/db";
import { mqueue } from "~/db/schema/mqueue";
import { unnestValues } from "~/db/utils";
import type { Image } from "~/models/utils";
import { record } from "~/otel";
import { getFile } from "~/utils";
export const imageDir = process.env.IMAGES_PATH ?? "/images";
@@ -76,91 +78,87 @@ export const enqueueOptImage = (
};
};
export const flushImageQueue = async (
tx: Transaction,
imgQueue: ImageTask[],
priority: number,
) => {
if (!imgQueue.length) return;
record("enqueue images", async () => {
await tx
.insert(mqueue)
.values(imgQueue.map((x) => ({ kind: "image", message: x, priority })));
export const flushImageQueue = record(
"enqueueImages",
async (tx: Transaction, imgQueue: ImageTask[], priority: number) => {
if (!imgQueue.length) return;
await tx.insert(mqueue).select(
unnestValues(
imgQueue.map((x) => ({ kind: "image", message: x, priority })),
mqueue,
),
);
await tx.execute(sql`notify kyoo_image`);
});
};
},
);
export const processImages = async () => {
return record("download images", async () => {
let running = false;
async function processAll() {
if (running) return;
running = true;
export const processImages = record("processImages", async () => {
let running = false;
async function processAll() {
if (running) return;
running = true;
let found = true;
while (found) {
found = await processOne();
}
running = false;
let found = true;
while (found) {
found = await processOne();
}
running = false;
}
const client = (await db.$client.connect()) as PoolClient;
client.on("notification", (evt) => {
if (evt.channel !== "kyoo_image") return;
processAll();
});
await client.query("listen kyoo_image");
// start processing old tasks
await processAll();
return () => client.release(true);
const client = (await db.$client.connect()) as PoolClient;
client.on("notification", (evt) => {
if (evt.channel !== "kyoo_image") return;
processAll();
});
};
await client.query("listen kyoo_image");
async function processOne() {
return record("download", async () => {
return await db.transaction(async (tx) => {
const [item] = await tx
.select()
.from(mqueue)
.for("update", { skipLocked: true })
.where(and(eq(mqueue.kind, "image"), lt(mqueue.attempt, 5)))
.orderBy(mqueue.priority, mqueue.attempt, mqueue.createdAt)
.limit(1);
// start processing old tasks
await processAll();
return () => client.release(true);
});
if (!item) return false;
const processOne = record("download", async () => {
return await db.transaction(async (tx) => {
const [item] = await tx
.select()
.from(mqueue)
.for("update", { skipLocked: true })
.where(and(eq(mqueue.kind, "image"), lt(mqueue.attempt, 5)))
.orderBy(mqueue.priority, mqueue.attempt, mqueue.createdAt)
.limit(1);
const img = item.message as ImageTask;
setAttributes({ "item.url": img.url });
try {
const blurhash = await downloadImage(img.id, img.url);
const ret: Image = { id: img.id, source: img.url, blurhash };
if (!item) return false;
const table = sql.raw(img.table);
const column = sql.raw(img.column);
const img = item.message as ImageTask;
setAttributes({ "item.url": img.url });
try {
const blurhash = await downloadImage(img.id, img.url);
const ret: Image = { id: img.id, source: img.url, blurhash };
await tx.execute(sql`
const table = sql.raw(img.table);
const column = sql.raw(img.column);
await tx.execute(sql`
update ${table} set ${column} = ${ret}
where ${column}->'id' = ${sql.raw(`'"${img.id}"'::jsonb`)}
`);
await tx.delete(mqueue).where(eq(mqueue.id, item.id));
} catch (err: any) {
const span = getCurrentSpan();
if (span) {
span.recordException(err);
span.setStatus({ code: SpanStatusCode.ERROR });
}
console.error("Failed to download image", img.url, err.message);
await tx
.update(mqueue)
.set({ attempt: sql`${mqueue.attempt}+1` })
.where(eq(mqueue.id, item.id));
await tx.delete(mqueue).where(eq(mqueue.id, item.id));
} catch (err: any) {
const span = getCurrentSpan();
if (span) {
span.recordException(err);
span.setStatus({ code: SpanStatusCode.ERROR });
}
return true;
});
console.error("Failed to download image", img.url, err.message);
await tx
.update(mqueue)
.set({ attempt: sql`${mqueue.attempt}+1` })
.where(eq(mqueue.id, item.id));
}
return true;
});
}
});
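On the worker side, tasks are claimed one at a time with `for update skip locked`, presumably so several API instances can drain the queue without blocking on or double-processing the same row. The drizzle builder call above corresponds roughly to:

```typescript
// Approximate SQL for the dequeue step (column names depend on the schema's
// casing and are assumptions; the drizzle call above is the source of truth):
const dequeueSql = `
  select *
  from mqueue
  where kind = 'image' and attempt < 5
  order by priority, attempt, created_at
  limit 1
  for update skip locked
`;
```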
async function downloadImage(id: string, url: string): Promise<string> {
const low = await getFile(path.join(imageDir, `${id}.low.jpg`))

View File

@@ -5,81 +5,89 @@ import { conflictUpdateAllExcept } from "~/db/utils";
import type { SeedCollection } from "~/models/collections";
import type { SeedMovie } from "~/models/movie";
import type { SeedSerie } from "~/models/serie";
import { record } from "~/otel";
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
type ShowTrans = typeof showTranslations.$inferInsert;
export const insertCollection = async (
collection: SeedCollection | undefined,
show: (({ kind: "movie" } & SeedMovie) | ({ kind: "serie" } & SeedSerie)) & {
nextRefresh: string;
export const insertCollection = record(
"insertCollection",
async (
collection: SeedCollection | undefined,
show: (
| ({ kind: "movie" } & SeedMovie)
| ({ kind: "serie" } & SeedSerie)
) & {
nextRefresh: string;
},
) => {
if (!collection) return null;
const { translations, ...col } = collection;
return await db.transaction(async (tx) => {
const imgQueue: ImageTask[] = [];
const [ret] = await tx
.insert(shows)
.values({
kind: "collection",
status: "unknown",
startAir: show.kind === "movie" ? show.airDate : show.startAir,
endAir: show.kind === "movie" ? show.airDate : show.endAir,
nextRefresh: show.nextRefresh,
entriesCount: 0,
original: {} as any,
...col,
})
.onConflictDoUpdate({
target: shows.slug,
set: {
...conflictUpdateAllExcept(shows, [
"pk",
"id",
"slug",
"createdAt",
"startAir",
"endAir",
]),
startAir: sql`least(${shows.startAir}, excluded.start_air)`,
endAir: sql`greatest(${shows.endAir}, excluded.end_air)`,
},
})
.returning({ pk: shows.pk, id: shows.id, slug: shows.slug });
const trans: ShowTrans[] = Object.entries(translations).map(
([lang, tr]) => ({
pk: ret.pk,
language: lang,
...tr,
poster: enqueueOptImage(imgQueue, {
url: tr.poster,
column: showTranslations.poster,
}),
thumbnail: enqueueOptImage(imgQueue, {
url: tr.thumbnail,
column: showTranslations.thumbnail,
}),
logo: enqueueOptImage(imgQueue, {
url: tr.logo,
column: showTranslations.logo,
}),
banner: enqueueOptImage(imgQueue, {
url: tr.banner,
column: showTranslations.banner,
}),
}),
);
await flushImageQueue(tx, imgQueue, 100);
// we can't use unnest here: show translations contain array columns, and postgres' unnest would flatten them
await tx
.insert(showTranslations)
.values(trans)
.onConflictDoUpdate({
target: [showTranslations.pk, showTranslations.language],
set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]),
});
return ret;
});
},
) => {
if (!collection) return null;
const { translations, ...col } = collection;
return await db.transaction(async (tx) => {
const imgQueue: ImageTask[] = [];
const [ret] = await tx
.insert(shows)
.values({
kind: "collection",
status: "unknown",
startAir: show.kind === "movie" ? show.airDate : show.startAir,
endAir: show.kind === "movie" ? show.airDate : show.endAir,
nextRefresh: show.nextRefresh,
entriesCount: 0,
original: {} as any,
...col,
})
.onConflictDoUpdate({
target: shows.slug,
set: {
...conflictUpdateAllExcept(shows, [
"pk",
"id",
"slug",
"createdAt",
"startAir",
"endAir",
]),
startAir: sql`least(${shows.startAir}, excluded.start_air)`,
endAir: sql`greatest(${shows.endAir}, excluded.end_air)`,
},
})
.returning({ pk: shows.pk, id: shows.id, slug: shows.slug });
const trans: ShowTrans[] = Object.entries(translations).map(
([lang, tr]) => ({
pk: ret.pk,
language: lang,
...tr,
poster: enqueueOptImage(imgQueue, {
url: tr.poster,
column: showTranslations.poster,
}),
thumbnail: enqueueOptImage(imgQueue, {
url: tr.thumbnail,
column: showTranslations.thumbnail,
}),
logo: enqueueOptImage(imgQueue, {
url: tr.logo,
column: showTranslations.logo,
}),
banner: enqueueOptImage(imgQueue, {
url: tr.banner,
column: showTranslations.banner,
}),
}),
);
await flushImageQueue(tx, imgQueue, 100);
await tx
.insert(showTranslations)
.values(trans)
.onConflictDoUpdate({
target: [showTranslations.pk, showTranslations.language],
set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]),
});
return ret;
});
};
);

View File

@@ -6,8 +6,9 @@ import {
entryVideoJoin,
videos,
} from "~/db/schema";
import { conflictUpdateAllExcept, values } from "~/db/utils";
import { conflictUpdateAllExcept, unnest, unnestValues } from "~/db/utils";
import type { SeedEntry as SEntry, SeedExtra as SExtra } from "~/models/entry";
import { record } from "~/otel";
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
import { guessNextRefresh } from "../refresh";
import { updateAvailableCount, updateAvailableSince } from "./shows";
@@ -42,160 +43,164 @@ const generateSlug = (
}
};
export const insertEntries = async (
show: { pk: number; slug: string; kind: "movie" | "serie" | "collection" },
items: (SeedEntry | SeedExtra)[],
onlyExtras = false,
) => {
if (!items.length) return [];
export const insertEntries = record(
"insertEntries",
async (
show: { pk: number; slug: string; kind: "movie" | "serie" | "collection" },
items: (SeedEntry | SeedExtra)[],
onlyExtras = false,
) => {
if (!items.length) return [];
const retEntries = await db.transaction(async (tx) => {
const imgQueue: ImageTask[] = [];
const vals: EntryI[] = items.map((seed) => {
const { translations, videos, video, ...entry } = seed;
return {
...entry,
showPk: show.pk,
slug: generateSlug(show.slug, seed),
thumbnail: enqueueOptImage(imgQueue, {
url: seed.thumbnail,
column: entries.thumbnail,
}),
nextRefresh:
entry.kind !== "extra"
? guessNextRefresh(entry.airDate ?? new Date())
: guessNextRefresh(new Date()),
episodeNumber:
entry.kind === "episode"
? entry.episodeNumber
: entry.kind === "special"
? entry.number
: undefined,
};
});
const ret = await tx
.insert(entries)
.values(vals)
.onConflictDoUpdate({
target: entries.slug,
set: conflictUpdateAllExcept(entries, [
"pk",
"showPk",
"id",
"slug",
"createdAt",
]),
})
.returning({ pk: entries.pk, id: entries.id, slug: entries.slug });
const trans: EntryTransI[] = items.flatMap((seed, i) => {
if (seed.kind === "extra") {
return [
{
pk: ret[i].pk,
// we hardcode the language to "extra" so that, if we ever support
// translations for extras, it won't be awkward
language: "extra",
name: seed.name,
description: null,
poster: undefined,
},
];
}
return Object.entries(seed.translations).map(([lang, tr]) => ({
// assumes ret is ordered like items.
pk: ret[i].pk,
language: lang,
...tr,
poster:
seed.kind === "movie"
? enqueueOptImage(imgQueue, {
url: (tr as any).poster,
column: entryTranslations.poster,
})
: undefined,
}));
});
await flushImageQueue(tx, imgQueue, 0);
await tx
.insert(entryTranslations)
.values(trans)
.onConflictDoUpdate({
target: [entryTranslations.pk, entryTranslations.language],
set: conflictUpdateAllExcept(entryTranslations, ["pk", "language"]),
const retEntries = await db.transaction(async (tx) => {
const imgQueue: ImageTask[] = [];
const vals: EntryI[] = items.map((seed) => {
const { translations, videos, video, ...entry } = seed;
return {
...entry,
showPk: show.pk,
slug: generateSlug(show.slug, seed),
thumbnail: enqueueOptImage(imgQueue, {
url: seed.thumbnail,
column: entries.thumbnail,
}),
nextRefresh:
entry.kind !== "extra"
? guessNextRefresh(entry.airDate ?? new Date())
: guessNextRefresh(new Date()),
episodeNumber:
entry.kind === "episode"
? entry.episodeNumber
: entry.kind === "special"
? entry.number
: undefined,
};
});
const ret = await tx
.insert(entries)
.select(unnestValues(vals, entries))
.onConflictDoUpdate({
target: entries.slug,
set: conflictUpdateAllExcept(entries, [
"pk",
"showPk",
"id",
"slug",
"createdAt",
]),
})
.returning({ pk: entries.pk, id: entries.id, slug: entries.slug });
return ret;
});
const trans: EntryTransI[] = items.flatMap((seed, i) => {
if (seed.kind === "extra") {
return [
{
pk: ret[i].pk,
// we hardcode the language to "extra" so that, if we ever support
// translations for extras, it won't be awkward
language: "extra",
name: seed.name,
description: null,
poster: undefined,
},
];
}
const vids = items.flatMap((seed, i) => {
if (seed.kind === "extra") {
return {
videoId: seed.video,
return Object.entries(seed.translations).map(([lang, tr]) => ({
// assumes ret is ordered like items.
pk: ret[i].pk,
language: lang,
...tr,
poster:
seed.kind === "movie"
? enqueueOptImage(imgQueue, {
url: (tr as any).poster,
column: entryTranslations.poster,
})
: undefined,
}));
});
await flushImageQueue(tx, imgQueue, 0);
await tx
.insert(entryTranslations)
.select(unnestValues(trans, entryTranslations))
.onConflictDoUpdate({
target: [entryTranslations.pk, entryTranslations.language],
set: conflictUpdateAllExcept(entryTranslations, ["pk", "language"]),
});
return ret;
});
const vids = items.flatMap((seed, i) => {
if (seed.kind === "extra") {
return {
videoId: seed.video,
entryPk: retEntries[i].pk,
entrySlug: retEntries[i].slug,
needRendering: false,
};
}
if (!seed.videos) return [];
return seed.videos.map((x, j) => ({
videoId: x,
entryPk: retEntries[i].pk,
entrySlug: retEntries[i].slug,
needRendering: false,
};
// The first video should not have a rendering.
needRendering: j !== 0 && seed.videos!.length > 1,
}));
});
if (vids.length === 0) {
// no videos were added, but we still need to update the `entriesCount`
if (show.kind === "serie" && !onlyExtras)
await updateAvailableCount(db, [show.pk], true);
return retEntries.map((x) => ({ id: x.id, slug: x.slug, videos: [] }));
}
if (!seed.videos) return [];
return seed.videos.map((x, j) => ({
videoId: x,
entryPk: retEntries[i].pk,
entrySlug: retEntries[i].slug,
// The first video should not have a rendering.
needRendering: j !== 0 && seed.videos!.length > 1,
const retVideos = await db.transaction(async (tx) => {
const ret = await tx
.insert(entryVideoJoin)
.select(
db
.select({
entryPk: sql<number>`vids."entryPk"`.as("entry"),
videoPk: videos.pk,
slug: computeVideoSlug(
sql`vids."entrySlug"`,
sql`vids."needRendering"`,
),
})
.from(
unnest(vids, "vids", {
entryPk: "integer",
entrySlug: "varchar(255)",
needRendering: "boolean",
videoId: "uuid",
}),
)
.innerJoin(videos, eq(videos.id, sql`vids."videoId"`)),
)
.onConflictDoNothing()
.returning({
slug: entryVideoJoin.slug,
entryPk: entryVideoJoin.entryPk,
});
if (!onlyExtras)
await updateAvailableCount(tx, [show.pk], show.kind === "serie");
await updateAvailableSince(tx, [...new Set(vids.map((x) => x.entryPk))]);
return ret;
});
return retEntries.map((entry) => ({
id: entry.id,
slug: entry.slug,
videos: retVideos.filter((x) => x.entryPk === entry.pk),
}));
});
if (vids.length === 0) {
// no videos were added, but we still need to update the `entriesCount`
if (show.kind === "serie" && !onlyExtras)
await updateAvailableCount(db, [show.pk], true);
return retEntries.map((x) => ({ id: x.id, slug: x.slug, videos: [] }));
}
const retVideos = await db.transaction(async (tx) => {
const ret = await tx
.insert(entryVideoJoin)
.select(
db
.select({
entryPk: sql<number>`vids.entryPk`.as("entry"),
videoPk: videos.pk,
slug: computeVideoSlug(
sql`vids.entrySlug`,
sql`vids.needRendering`,
),
})
.from(
values(vids, {
entryPk: "integer",
needRendering: "boolean",
videoId: "uuid",
}).as("vids"),
)
.innerJoin(videos, eq(videos.id, sql`vids.videoId`)),
)
.onConflictDoNothing()
.returning({
slug: entryVideoJoin.slug,
entryPk: entryVideoJoin.entryPk,
});
if (!onlyExtras)
await updateAvailableCount(tx, [show.pk], show.kind === "serie");
await updateAvailableSince(tx, [...new Set(vids.map((x) => x.entryPk))]);
return ret;
});
return retEntries.map((entry) => ({
id: entry.id,
slug: entry.slug,
videos: retVideos.filter((x) => x.entryPk === entry.pk),
}));
};
},
);
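The video links follow the same pattern, but here the unnested tuples are joined against `videos` inside the `insert ... select`, so resolving `videoId` to `videos.pk` and computing the slug happen in a single statement. A rough sketch of what gets sent (the table name, quoted identifiers, and slug expression are assumptions based on the schema identifiers used above; `<...>` marks the array literals built by `sqlarr`):

```typescript
// Sketch only; the real query is built by drizzle from the code above.
const approximateSql = `
  insert into entry_video_join ("entryPk", "videoPk", slug)
  select vids."entryPk", videos.pk, <computed slug> as slug
  from unnest(
    <entryPks>::integer[],
    <entrySlugs>::varchar(255)[],
    <needRenderings>::boolean[],
    <videoIds>::uuid[]
  ) as vids("entryPk", "entrySlug", "needRendering", "videoId")
  inner join videos on videos.id = vids."videoId"
  on conflict do nothing
  returning slug, "entryPk"
`;
```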
export function computeVideoSlug(entrySlug: SQL | Column, needsRendering: SQL) {
return sql<string>`

View File

@@ -1,77 +1,78 @@
import { db } from "~/db";
import { seasons, seasonTranslations } from "~/db/schema";
import { conflictUpdateAllExcept } from "~/db/utils";
import { conflictUpdateAllExcept, unnestValues } from "~/db/utils";
import type { SeedSeason } from "~/models/season";
import { record } from "~/otel";
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
import { guessNextRefresh } from "../refresh";
type SeasonI = typeof seasons.$inferInsert;
type SeasonTransI = typeof seasonTranslations.$inferInsert;
export const insertSeasons = async (
show: { pk: number; slug: string },
items: SeedSeason[],
) => {
if (!items.length) return [];
export const insertSeasons = record(
"insertSeasons",
async (show: { pk: number; slug: string }, items: SeedSeason[]) => {
if (!items.length) return [];
return db.transaction(async (tx) => {
const imgQueue: ImageTask[] = [];
const vals: SeasonI[] = items.map((x) => {
const { translations, ...season } = x;
return {
...season,
showPk: show.pk,
slug:
season.seasonNumber === 0
? `${show.slug}-specials`
: `${show.slug}-s${season.seasonNumber}`,
nextRefresh: guessNextRefresh(season.startAir ?? new Date()),
};
});
const ret = await tx
.insert(seasons)
.values(vals)
.onConflictDoUpdate({
target: seasons.slug,
set: conflictUpdateAllExcept(seasons, [
"pk",
"showPk",
"id",
"slug",
"createdAt",
]),
})
.returning({ pk: seasons.pk, id: seasons.id, slug: seasons.slug });
const trans: SeasonTransI[] = items.flatMap((seed, i) =>
Object.entries(seed.translations).map(([lang, tr]) => ({
// assumes ret is ordered like items.
pk: ret[i].pk,
language: lang,
...tr,
poster: enqueueOptImage(imgQueue, {
url: tr.poster,
column: seasonTranslations.poster,
}),
thumbnail: enqueueOptImage(imgQueue, {
url: tr.thumbnail,
column: seasonTranslations.thumbnail,
}),
banner: enqueueOptImage(imgQueue, {
url: tr.banner,
column: seasonTranslations.banner,
}),
})),
);
await flushImageQueue(tx, imgQueue, -10);
await tx
.insert(seasonTranslations)
.values(trans)
.onConflictDoUpdate({
target: [seasonTranslations.pk, seasonTranslations.language],
set: conflictUpdateAllExcept(seasonTranslations, ["pk", "language"]),
return db.transaction(async (tx) => {
const imgQueue: ImageTask[] = [];
const vals: SeasonI[] = items.map((x) => {
const { translations, ...season } = x;
return {
...season,
showPk: show.pk,
slug:
season.seasonNumber === 0
? `${show.slug}-specials`
: `${show.slug}-s${season.seasonNumber}`,
nextRefresh: guessNextRefresh(season.startAir ?? new Date()),
};
});
const ret = await tx
.insert(seasons)
.select(unnestValues(vals, seasons))
.onConflictDoUpdate({
target: seasons.slug,
set: conflictUpdateAllExcept(seasons, [
"pk",
"showPk",
"id",
"slug",
"createdAt",
]),
})
.returning({ pk: seasons.pk, id: seasons.id, slug: seasons.slug });
return ret;
});
};
const trans: SeasonTransI[] = items.flatMap((seed, i) =>
Object.entries(seed.translations).map(([lang, tr]) => ({
// assumes ret is ordered like items.
pk: ret[i].pk,
language: lang,
...tr,
poster: enqueueOptImage(imgQueue, {
url: tr.poster,
column: seasonTranslations.poster,
}),
thumbnail: enqueueOptImage(imgQueue, {
url: tr.thumbnail,
column: seasonTranslations.thumbnail,
}),
banner: enqueueOptImage(imgQueue, {
url: tr.banner,
column: seasonTranslations.banner,
}),
})),
);
await flushImageQueue(tx, imgQueue, -10);
await tx
.insert(seasonTranslations)
.select(unnestValues(trans, seasonTranslations))
.onConflictDoUpdate({
target: [seasonTranslations.pk, seasonTranslations.language],
set: conflictUpdateAllExcept(seasonTranslations, ["pk", "language"]),
});
return ret;
});
},
);

View File

@@ -21,88 +21,93 @@ import type { SeedCollection } from "~/models/collections";
import type { SeedMovie } from "~/models/movie";
import type { SeedSerie } from "~/models/serie";
import type { Original } from "~/models/utils";
import { record } from "~/otel";
import { getYear } from "~/utils";
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
type Show = typeof shows.$inferInsert;
type ShowTrans = typeof showTranslations.$inferInsert;
export const insertShow = async (
show: Omit<Show, "original">,
original: Original & {
poster: string | null;
thumbnail: string | null;
banner: string | null;
logo: string | null;
},
translations:
| SeedMovie["translations"]
| SeedSerie["translations"]
| SeedCollection["translations"],
) => {
return await db.transaction(async (tx) => {
const imgQueue: ImageTask[] = [];
const orig = {
...original,
poster: enqueueOptImage(imgQueue, {
url: original.poster,
table: shows,
column: sql`${shows.original}['poster']`,
}),
thumbnail: enqueueOptImage(imgQueue, {
url: original.thumbnail,
table: shows,
column: sql`${shows.original}['thumbnail']`,
}),
banner: enqueueOptImage(imgQueue, {
url: original.banner,
table: shows,
column: sql`${shows.original}['banner']`,
}),
logo: enqueueOptImage(imgQueue, {
url: original.logo,
table: shows,
column: sql`${shows.original}['logo']`,
}),
};
const ret = await insertBaseShow(tx, { ...show, original: orig });
if ("status" in ret) return ret;
const trans: ShowTrans[] = Object.entries(translations).map(
([lang, tr]) => ({
pk: ret.pk,
language: lang,
...tr,
latinName: tr.latinName ?? null,
export const insertShow = record(
"insertShow",
async (
show: Omit<Show, "original">,
original: Original & {
poster: string | null;
thumbnail: string | null;
banner: string | null;
logo: string | null;
},
translations:
| SeedMovie["translations"]
| SeedSerie["translations"]
| SeedCollection["translations"],
) => {
return await db.transaction(async (tx) => {
const imgQueue: ImageTask[] = [];
const orig = {
...original,
poster: enqueueOptImage(imgQueue, {
url: tr.poster,
column: showTranslations.poster,
url: original.poster,
table: shows,
column: sql`${shows.original}['poster']`,
}),
thumbnail: enqueueOptImage(imgQueue, {
url: tr.thumbnail,
column: showTranslations.thumbnail,
}),
logo: enqueueOptImage(imgQueue, {
url: tr.logo,
column: showTranslations.logo,
url: original.thumbnail,
table: shows,
column: sql`${shows.original}['thumbnail']`,
}),
banner: enqueueOptImage(imgQueue, {
url: tr.banner,
column: showTranslations.banner,
url: original.banner,
table: shows,
column: sql`${shows.original}['banner']`,
}),
}),
);
await flushImageQueue(tx, imgQueue, 200);
await tx
.insert(showTranslations)
.values(trans)
.onConflictDoUpdate({
target: [showTranslations.pk, showTranslations.language],
set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]),
});
return ret;
});
};
logo: enqueueOptImage(imgQueue, {
url: original.logo,
table: shows,
column: sql`${shows.original}['logo']`,
}),
};
const ret = await insertBaseShow(tx, { ...show, original: orig });
if ("status" in ret) return ret;
const trans: ShowTrans[] = Object.entries(translations).map(
([lang, tr]) => ({
pk: ret.pk,
language: lang,
...tr,
latinName: tr.latinName ?? null,
poster: enqueueOptImage(imgQueue, {
url: tr.poster,
column: showTranslations.poster,
}),
thumbnail: enqueueOptImage(imgQueue, {
url: tr.thumbnail,
column: showTranslations.thumbnail,
}),
logo: enqueueOptImage(imgQueue, {
url: tr.logo,
column: showTranslations.logo,
}),
banner: enqueueOptImage(imgQueue, {
url: tr.banner,
column: showTranslations.banner,
}),
}),
);
await flushImageQueue(tx, imgQueue, 200);
// we can't use unnest here: show translations contain array columns, and postgres' unnest would flatten them
await tx
.insert(showTranslations)
.values(trans)
.onConflictDoUpdate({
target: [showTranslations.pk, showTranslations.language],
set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]),
});
return ret;
});
},
);
async function insertBaseShow(tx: Transaction, show: Show) {
function insert() {

View File

@@ -1,57 +1,63 @@
import { eq, sql } from "drizzle-orm";
import { db } from "~/db";
import { roles, staff } from "~/db/schema";
import { conflictUpdateAllExcept } from "~/db/utils";
import { conflictUpdateAllExcept, unnestValues } from "~/db/utils";
import type { SeedStaff } from "~/models/staff";
import { record } from "~/otel";
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
export const insertStaff = async (
seed: SeedStaff[] | undefined,
showPk: number,
) => {
if (!seed?.length) return [];
export const insertStaff = record(
"insertStaff",
async (seed: SeedStaff[] | undefined, showPk: number) => {
if (!seed?.length) return [];
return await db.transaction(async (tx) => {
const imgQueue: ImageTask[] = [];
const people = seed.map((x) => ({
...x.staff,
image: enqueueOptImage(imgQueue, {
url: x.staff.image,
column: staff.image,
}),
}));
const ret = await tx
.insert(staff)
.values(people)
.onConflictDoUpdate({
target: staff.slug,
set: conflictUpdateAllExcept(staff, ["pk", "id", "slug", "createdAt"]),
})
.returning({ pk: staff.pk, id: staff.id, slug: staff.slug });
const rval = seed.map((x, i) => ({
showPk,
staffPk: ret[i].pk,
kind: x.kind,
order: i,
character: {
...x.character,
return await db.transaction(async (tx) => {
const imgQueue: ImageTask[] = [];
const people = seed.map((x) => ({
...x.staff,
image: enqueueOptImage(imgQueue, {
url: x.character.image,
table: roles,
column: sql`${roles.character}['image']`,
url: x.staff.image,
column: staff.image,
}),
},
}));
}));
const ret = await tx
.insert(staff)
.select(unnestValues(people, staff))
.onConflictDoUpdate({
target: staff.slug,
set: conflictUpdateAllExcept(staff, [
"pk",
"id",
"slug",
"createdAt",
]),
})
.returning({ pk: staff.pk, id: staff.id, slug: staff.slug });
await flushImageQueue(tx, imgQueue, -200);
const rval = seed.map((x, i) => ({
showPk,
staffPk: ret[i].pk,
kind: x.kind,
order: i,
character: {
...x.character,
image: enqueueOptImage(imgQueue, {
url: x.character.image,
table: roles,
column: sql`${roles.character}['image']`,
}),
},
}));
// always replace all roles. this is because:
// - we want `order` to stay in sync (& without duplicates)
// - we have no way to identify a role, so we can't use onConflict
await tx.delete(roles).where(eq(roles.showPk, showPk));
await tx.insert(roles).values(rval);
await flushImageQueue(tx, imgQueue, -200);
return ret;
});
};
// always replace all roles. this is because:
// - we want `order` to stay in sync (& without duplicates)
// - we have no way to identify a role, so we can't use onConflict
await tx.delete(roles).where(eq(roles.showPk, showPk));
await tx.insert(roles).select(unnestValues(rval, roles));
return ret;
});
},
);

View File

@@ -1,63 +1,74 @@
import { sql } from "drizzle-orm";
import { db } from "~/db";
import { showStudioJoin, studios, studioTranslations } from "~/db/schema";
import { conflictUpdateAllExcept } from "~/db/utils";
import { conflictUpdateAllExcept, sqlarr, unnestValues } from "~/db/utils";
import type { SeedStudio } from "~/models/studio";
import { record } from "~/otel";
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
type StudioI = typeof studios.$inferInsert;
type StudioTransI = typeof studioTranslations.$inferInsert;
export const insertStudios = async (
seed: SeedStudio[] | undefined,
showPk: number,
) => {
if (!seed?.length) return [];
export const insertStudios = record(
"insertStudios",
async (seed: SeedStudio[] | undefined, showPk: number) => {
if (!seed?.length) return [];
return await db.transaction(async (tx) => {
const vals: StudioI[] = seed.map((x) => {
const { translations, ...item } = x;
return item;
});
const ret = await tx
.insert(studios)
.values(vals)
.onConflictDoUpdate({
target: studios.slug,
set: conflictUpdateAllExcept(studios, [
"pk",
"id",
"slug",
"createdAt",
]),
})
.returning({ pk: studios.pk, id: studios.id, slug: studios.slug });
const imgQueue: ImageTask[] = [];
const trans: StudioTransI[] = seed.flatMap((x, i) =>
Object.entries(x.translations).map(([lang, tr]) => ({
pk: ret[i].pk,
language: lang,
name: tr.name,
logo: enqueueOptImage(imgQueue, {
url: tr.logo,
column: studioTranslations.logo,
}),
})),
);
await flushImageQueue(tx, imgQueue, -100);
await tx
.insert(studioTranslations)
.values(trans)
.onConflictDoUpdate({
target: [studioTranslations.pk, studioTranslations.language],
set: conflictUpdateAllExcept(studioTranslations, ["pk", "language"]),
return await db.transaction(async (tx) => {
const vals: StudioI[] = seed.map((x) => {
const { translations, ...item } = x;
return item;
});
await tx
.insert(showStudioJoin)
.values(ret.map((studio) => ({ showPk: showPk, studioPk: studio.pk })))
.onConflictDoNothing();
return ret;
});
};
const ret = await tx
.insert(studios)
.select(unnestValues(vals, studios))
.onConflictDoUpdate({
target: studios.slug,
set: conflictUpdateAllExcept(studios, [
"pk",
"id",
"slug",
"createdAt",
]),
})
.returning({ pk: studios.pk, id: studios.id, slug: studios.slug });
const imgQueue: ImageTask[] = [];
const trans: StudioTransI[] = seed.flatMap((x, i) =>
Object.entries(x.translations).map(([lang, tr]) => ({
pk: ret[i].pk,
language: lang,
name: tr.name,
logo: enqueueOptImage(imgQueue, {
url: tr.logo,
column: studioTranslations.logo,
}),
})),
);
await flushImageQueue(tx, imgQueue, -100);
await tx
.insert(studioTranslations)
.select(unnestValues(trans, studioTranslations))
.onConflictDoUpdate({
target: [studioTranslations.pk, studioTranslations.language],
set: conflictUpdateAllExcept(studioTranslations, ["pk", "language"]),
});
await tx
.insert(showStudioJoin)
.select(
db
.select({
showPk: sql`${showPk}`.as("showPk"),
studioPk: sql`v."studioPk"`.as("studioPk"),
})
.from(
sql`unnest(${sqlarr(ret.map((x) => x.pk))}::integer[]) as v("studioPk")`,
),
)
.onConflictDoNothing();
return ret;
});
},
);
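The show↔studio join skips `unnestValues` and expands the returned studio pks with a raw `unnest` over a single integer array; roughly:

```typescript
// Approximate statement (the table and column names are assumptions;
// showPk is bound as a parameter, the pk array is inlined by sqlarr):
const approximateSql = `
  insert into show_studio_join ("showPk", "studioPk")
  select $1 as "showPk", v."studioPk"
  from unnest('{"1", "2"}'::integer[]) as v("studioPk")
  on conflict do nothing
`;
```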

View File

@@ -35,7 +35,8 @@ import {
jsonbBuildObject,
jsonbObjectAgg,
sqlarr,
values,
unnest,
unnestValues,
} from "~/db/utils";
import { Entry } from "~/models/entry";
import { KError } from "~/models/error";
@@ -129,10 +130,10 @@ async function linkVideos(
slug: computeVideoSlug(entriesQ.slug, hasRenderingQ),
})
.from(
values(links, {
unnest(links, "j", {
video: "integer",
entry: "jsonb",
}).as("j"),
}),
)
.innerJoin(videos, eq(videos.pk, sql`j.video`))
.innerJoin(
@@ -835,7 +836,7 @@ export const videosWriteH = new Elysia({ prefix: "/videos", tags: ["videos"] })
try {
vids = await tx
.insert(videos)
.values(body)
.select(unnestValues(body, videos))
.onConflictDoUpdate({
target: [videos.path],
set: conflictUpdateAllExcept(videos, ["pk", "id", "createdAt"]),

View File

@@ -6,6 +6,7 @@ import { sql } from "drizzle-orm";
import { drizzle } from "drizzle-orm/node-postgres";
import { migrate as migrateDb } from "drizzle-orm/node-postgres/migrator";
import type { PoolConfig } from "pg";
import { record } from "~/otel";
import * as schema from "./schema";
const config: PoolConfig = {
@@ -112,7 +113,6 @@ const postgresConfig = await parseSslConfig();
// use this when using drizzle-kit since it can't parse await statements
// const postgresConfig = config;
console.log("Connecting to postgres with config", postgresConfig);
export const db = drizzle({
schema,
connection: postgresConfig,
@@ -122,14 +122,14 @@ instrumentDrizzleClient(db, {
maxQueryTextLength: 100_000_000,
});
export const migrate = async () => {
export const migrate = record("migrate", async () => {
try {
await db.execute(
sql.raw(`
create extension if not exists pg_trgm;
set pg_trgm.word_similarity_threshold = 0.4;
alter database "${postgresConfig.database}" set pg_trgm.word_similarity_threshold = 0.4;
`),
create extension if not exists pg_trgm;
set pg_trgm.word_similarity_threshold = 0.4;
alter database "${postgresConfig.database}" set pg_trgm.word_similarity_threshold = 0.4;
`),
);
} catch (err: any) {
console.error("Error while updating pg_trgm", err.message);
@@ -139,7 +139,7 @@ export const migrate = async () => {
migrationsFolder: "./drizzle",
});
console.log(`Database ${postgresConfig.database} migrated!`);
};
});
export type Transaction =
| typeof db

View File

@@ -8,12 +8,17 @@ import {
type Subquery,
sql,
Table,
type TableConfig,
View,
ViewBaseConfig,
} from "drizzle-orm";
import type { CasingCache } from "drizzle-orm/casing";
import type { AnyMySqlSelect } from "drizzle-orm/mysql-core";
import type { AnyPgSelect, SelectedFieldsFlat } from "drizzle-orm/pg-core";
import type {
AnyPgSelect,
PgTableWithColumns,
SelectedFieldsFlat,
} from "drizzle-orm/pg-core";
import type { AnySQLiteSelect } from "drizzle-orm/sqlite-core";
import type { WithSubquery } from "drizzle-orm/subquery";
import { db } from "./index";
@@ -69,8 +74,18 @@ export function conflictUpdateAllExcept<
}
// drizzle is bugged and doesn't allow js arrays to be used in raw sql.
export function sqlarr(array: unknown[]) {
return `{${array.map((item) => `"${item}"`).join(",")}}`;
export function sqlarr(array: unknown[]): string {
return `{${array
.map((item) =>
item === "null" || item === null || item === undefined
? "null"
: Array.isArray(item)
? sqlarr(item)
: typeof item === "object"
? `"${JSON.stringify(item).replaceAll("\\", "\\\\").replaceAll('"', '\\"')}"`
: `"${item}"`,
)
.join(", ")}}`;
}
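`sqlarr` now has to serialize whole columns of arbitrary row values for `unnestValues`, so it grows handling for nulls, nested arrays, and JSON objects. A few expected outputs, read off the implementation above (illustrative, not taken from the repo's test suite):

```typescript
sqlarr([1, 3]);              // '{"1", "3"}'
sqlarr(["a", null, "b"]);    // '{"a", null, "b"}'
sqlarr([[1, 2], [3, 4]]);    // '{{"1", "2"}, {"3", "4"}}'
// Objects are JSON-encoded, then quote/backslash-escaped so they survive as a
// single array element: sqlarr([{ id: 1 }]) === '{"{\\"id\\":1}"}'
```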
// See https://github.com/drizzle-team/drizzle-orm/issues/4044
@@ -103,6 +118,102 @@ export function values<K extends string>(
};
}
/* goal:
 * unnestValues([{a: 1, b: 2}, {a: 3, b: 4}], tbl)
 *
 * generates roughly:
 * ```sql
 * select a, b, now() as updated_at
 * from unnest($1::integer[], $2::integer[]) as v(a, b);
 * ```
 * params (one array per column):
 * $1: [1, 3]
 * $2: [2, 4]
 */
export const unnestValues = <
T extends Record<string, unknown>,
C extends TableConfig = never,
>(
values: T[],
typeInfo: PgTableWithColumns<C>,
) => {
if (values[0] === undefined)
throw new Error("Invalid values, expecting at least one item");
const columns = getTableColumns(typeInfo);
const keys = Object.keys(values[0]).filter((x) => x in columns);
// @ts-expect-error: drizzle internal
const casing = db.dialect.casing as CasingCache;
const dbNames = Object.fromEntries(
Object.entries(columns).map(([k, v]) => [k, casing.getColumnCasing(v)]),
);
const vals = values.reduce(
(acc, cur, i) => {
for (const k of keys) {
if (k in cur) acc[k].push(cur[k]);
else acc[k].push(null);
}
for (const k of Object.keys(cur)) {
if (k in acc) continue;
if (!(k in columns)) continue;
keys.push(k);
acc[k] = new Array(i).fill(null);
acc[k].push(cur[k]);
}
return acc;
},
Object.fromEntries(keys.map((x) => [x, [] as unknown[]])),
);
const computed = Object.entries(columns)
.filter(([k, v]) => (v.defaultFn || v.onUpdateFn) && !keys.includes(k))
.map(([k]) => k);
return db
.select(
Object.fromEntries([
...keys.map((x) => [x, sql.raw(`"${dbNames[x]}"`)]),
...computed.map((x) => [
x,
(columns[x].defaultFn?.() ?? columns[x].onUpdateFn!()).as(dbNames[x]),
]),
]) as {
[k in keyof typeof typeInfo.$inferInsert]-?: SQL.Aliased<
(typeof typeInfo.$inferInsert)[k]
>;
},
)
.from(
sql`unnest(${sql.join(
keys.map(
(k) =>
sql`${sqlarr(vals[k])}${sql.raw(`::${columns[k].getSQLType()}[]`)}`,
),
sql.raw(", "),
)}) as v(${sql.raw(keys.map((x) => `"${dbNames[x]}"`).join(", "))})`,
);
};
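Rows passed to `unnestValues` don't all need the same keys: keys are collected as they appear, earlier rows are backfilled with `null`, and columns that declare a `defaultFn`/`onUpdateFn` but aren't in the data get appended as computed selections. A small hypothetical illustration (the `tags` table is not part of this repo):

```typescript
import { pgTable, text, varchar } from "drizzle-orm/pg-core";
import { unnestValues } from "~/db/utils";

// Hypothetical table, only to show the shape of the generated query.
const tags = pgTable("tags", {
	slug: varchar("slug", { length: 255 }).notNull(),
	color: text("color"),
});

// "color" only appears in the second row, so the first row is backfilled:
// slug = ["a", "b"], color = [null, "red"].
const q = unnestValues([{ slug: "a" }, { slug: "b", color: "red" }], tags);
// Roughly:
//   select "slug", "color"
//   from unnest('{"a", "b"}'::varchar(255)[], '{null, "red"}'::text[]) as v("slug", "color")
```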
export const unnest = <T extends Record<string, unknown>>(
values: T[],
name: string,
typeInfo: Record<keyof T, string>,
) => {
const keys = Object.keys(typeInfo);
const vals = values.reduce(
(acc, cur) => {
for (const k of keys) {
if (k in cur) acc[k].push(cur[k]);
else acc[k].push(null);
}
return acc;
},
Object.fromEntries(keys.map((x) => [x, [] as unknown[]])),
);
return sql`unnest(${sql.join(
keys.map((k) => sql`${sqlarr(vals[k])}${sql.raw(`::${typeInfo[k]}[]`)}`),
sql.raw(", "),
)}) as ${sql.raw(name)}(${sql.raw(keys.map((x) => `"${x}"`).join(", "))})`;
};
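Unlike `unnestValues`, the plain `unnest` helper doesn't read a drizzle table: callers name the alias and give each key's SQL type explicitly, which is what the `entryVideoJoin` insert and the `linkVideos` query above rely on. For instance, taking the `links` tuples from `linkVideos`:

```typescript
// Renders roughly as
//   unnest('{...}'::integer[], '{...}'::jsonb[]) as j("video", "entry")
// with one array literal per key, built by sqlarr in key order.
const fragment = unnest(links, "j", {
	video: "integer",
	entry: "jsonb",
});
```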
export const coalesce = <T>(val: SQL<T> | SQLWrapper, def: SQL<T> | Column) => {
return sql<T>`coalesce(${val}, ${def})`;
};

View File

@@ -1,4 +1,4 @@
import { opentelemetry } from "@elysiajs/opentelemetry";
import { record as elysiaRecord, opentelemetry } from "@elysiajs/opentelemetry";
import { OTLPMetricExporter as GrpcMetricExporter } from "@opentelemetry/exporter-metrics-otlp-grpc";
import { OTLPMetricExporter as HttpMetricExporter } from "@opentelemetry/exporter-metrics-otlp-proto";
import { OTLPTraceExporter as GrpcTraceExporter } from "@opentelemetry/exporter-trace-otlp-grpc";
@@ -32,3 +32,12 @@ export const otel = new Elysia()
}),
)
.as("global");
export function record<T extends (...args: any) => any>(
spanName: string,
fn: T,
): T {
const wrapped = (...args: Parameters<T>) =>
elysiaRecord(spanName, () => fn(...args));
return wrapped as T;
}
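This small wrapper is what lets the insert helpers above be declared as `record("name", async (...) => { ... })` while keeping their original signatures: every call to the returned function runs inside an OpenTelemetry span opened by `@elysiajs/opentelemetry`'s `record`. A minimal usage sketch (the `doWork` helper is hypothetical, just to show the pattern used throughout this commit):

```typescript
import { record } from "~/otel";

// Each call to doWork now runs inside a "doWork" span; the signature is unchanged.
export const doWork = record("doWork", async (id: string) => {
	return id.toUpperCase();
});
```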

View File

@@ -17,6 +17,7 @@ describe("images", () => {
});
it("Create a serie download images", async () => {
await db.delete(mqueue);
await createSerie(madeInAbyss);
const release = await processImages();
// remove notifications so the remaining images aren't downloaded (no need to curl 20000 images for nothing)
@@ -31,12 +32,17 @@ describe("images", () => {
});
it("Download 404 image", async () => {
await db.delete(mqueue);
const url404 = "https://mockhttp.org/status/404";
const [ret, body] = await createMovie({
...dune,
translations: {
en: {
...dune.translations.en,
poster: "https://www.google.com/404",
poster: url404,
thumbnail: null,
banner: null,
logo: null,
},
},
});
@@ -49,7 +55,7 @@ describe("images", () => {
const failed = await db.query.mqueue.findFirst({
where: and(
eq(mqueue.kind, "image"),
eq(sql`${mqueue.message}->>'url'`, "https://www.google.com/404"),
eq(sql`${mqueue.message}->>'url'`, url404),
),
});
expect(failed!.attempt).toBe(5);

View File

@@ -3,6 +3,7 @@ import { beforeAll } from "bun:test";
process.env.PGDATABASE = "kyoo_test";
process.env.JWT_SECRET = "this is a secret";
process.env.JWT_ISSUER = "https://kyoo.zoriya.dev";
process.env.IMAGES_PATH = "./images";
beforeAll(async () => {
// lazy load this so env set before actually applies