Wrap every insert with a trace

This commit is contained in:
2025-11-28 17:25:29 +01:00
parent 464d720ef9
commit 60d59d7f7b
10 changed files with 576 additions and 550 deletions

View File

@@ -1,5 +1,5 @@
import path from "node:path";
import { getCurrentSpan, record, setAttributes } from "@elysiajs/opentelemetry";
import { getCurrentSpan, setAttributes } from "@elysiajs/opentelemetry";
import { SpanStatusCode } from "@opentelemetry/api";
import { encode } from "blurhash";
import { and, eq, is, lt, type SQL, sql } from "drizzle-orm";
@@ -9,9 +9,10 @@ import type { PoolClient } from "pg";
import sharp from "sharp";
import { db, type Transaction } from "~/db";
import { mqueue } from "~/db/schema/mqueue";
import type { Image } from "~/models/utils";
import { getFile } from "~/utils";
import { unnestValues } from "~/db/utils";
import type { Image } from "~/models/utils";
import { record } from "~/otel";
import { getFile } from "~/utils";
export const imageDir = process.env.IMAGES_PATH ?? "/images";
export const defaultBlurhash = "000000";
@@ -77,13 +78,10 @@ export const enqueueOptImage = (
};
};
export const flushImageQueue = async (
tx: Transaction,
imgQueue: ImageTask[],
priority: number,
) => {
export const flushImageQueue = record(
"enqueueImages",
async (tx: Transaction, imgQueue: ImageTask[], priority: number) => {
if (!imgQueue.length) return;
record("enqueue images", async () => {
await tx.insert(mqueue).select(
unnestValues(
imgQueue.map((x) => ({ kind: "image", message: x, priority })),
@@ -91,11 +89,10 @@ export const flushImageQueue = async (
),
);
await tx.execute(sql`notify kyoo_image`);
});
};
},
);
export const processImages = async () => {
return record("download images", async () => {
export const processImages = record("processImages", async () => {
let running = false;
async function processAll() {
if (running) return;
@@ -118,11 +115,9 @@ export const processImages = async () => {
// start processing old tasks
await processAll();
return () => client.release(true);
});
};
});
async function processOne() {
return record("download", async () => {
const processOne = record("download", async () => {
return await db.transaction(async (tx) => {
const [item] = await tx
.select()
@@ -163,8 +158,7 @@ async function processOne() {
}
return true;
});
});
}
});
async function downloadImage(id: string, url: string): Promise<string> {
const low = await getFile(path.join(imageDir, `${id}.low.jpg`))

View File

@@ -1,20 +1,26 @@
import { sql } from "drizzle-orm";
import { db } from "~/db";
import { shows, showTranslations } from "~/db/schema";
import { conflictUpdateAllExcept, unnestValues } from "~/db/utils";
import { conflictUpdateAllExcept } from "~/db/utils";
import type { SeedCollection } from "~/models/collections";
import type { SeedMovie } from "~/models/movie";
import type { SeedSerie } from "~/models/serie";
import { record } from "~/otel";
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
type ShowTrans = typeof showTranslations.$inferInsert;
export const insertCollection = async (
export const insertCollection = record(
"insertCollection",
async (
collection: SeedCollection | undefined,
show: (({ kind: "movie" } & SeedMovie) | ({ kind: "serie" } & SeedSerie)) & {
show: (
| ({ kind: "movie" } & SeedMovie)
| ({ kind: "serie" } & SeedSerie)
) & {
nextRefresh: string;
},
) => {
) => {
if (!collection) return null;
const { translations, ...col } = collection;
@@ -83,4 +89,5 @@ export const insertCollection = async (
});
return ret;
});
};
},
);

View File

@@ -8,6 +8,7 @@ import {
} from "~/db/schema";
import { conflictUpdateAllExcept, unnest, unnestValues } from "~/db/utils";
import type { SeedEntry as SEntry, SeedExtra as SExtra } from "~/models/entry";
import { record } from "~/otel";
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
import { guessNextRefresh } from "../refresh";
import { updateAvailableCount, updateAvailableSince } from "./shows";
@@ -42,11 +43,13 @@ const generateSlug = (
}
};
export const insertEntries = async (
export const insertEntries = record(
"insertEntries",
async (
show: { pk: number; slug: string; kind: "movie" | "serie" | "collection" },
items: (SeedEntry | SeedExtra)[],
onlyExtras = false,
) => {
) => {
if (!items.length) return [];
const retEntries = await db.transaction(async (tx) => {
@@ -196,7 +199,8 @@ export const insertEntries = async (
slug: entry.slug,
videos: retVideos.filter((x) => x.entryPk === entry.pk),
}));
};
},
);
export function computeVideoSlug(entrySlug: SQL | Column, needsRendering: SQL) {
return sql<string>`

View File

@@ -2,16 +2,16 @@ import { db } from "~/db";
import { seasons, seasonTranslations } from "~/db/schema";
import { conflictUpdateAllExcept, unnestValues } from "~/db/utils";
import type { SeedSeason } from "~/models/season";
import { record } from "~/otel";
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
import { guessNextRefresh } from "../refresh";
type SeasonI = typeof seasons.$inferInsert;
type SeasonTransI = typeof seasonTranslations.$inferInsert;
export const insertSeasons = async (
show: { pk: number; slug: string },
items: SeedSeason[],
) => {
export const insertSeasons = record(
"insertSeasons",
async (show: { pk: number; slug: string }, items: SeedSeason[]) => {
if (!items.length) return [];
return db.transaction(async (tx) => {
@@ -74,4 +74,5 @@ export const insertSeasons = async (
return ret;
});
};
},
);

View File

@@ -16,18 +16,21 @@ import {
shows,
showTranslations,
} from "~/db/schema";
import { conflictUpdateAllExcept, sqlarr, unnestValues } from "~/db/utils";
import { conflictUpdateAllExcept, sqlarr } from "~/db/utils";
import type { SeedCollection } from "~/models/collections";
import type { SeedMovie } from "~/models/movie";
import type { SeedSerie } from "~/models/serie";
import type { Original } from "~/models/utils";
import { record } from "~/otel";
import { getYear } from "~/utils";
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
type Show = typeof shows.$inferInsert;
type ShowTrans = typeof showTranslations.$inferInsert;
export const insertShow = async (
export const insertShow = record(
"insertShow",
async (
show: Omit<Show, "original">,
original: Original & {
poster: string | null;
@@ -39,7 +42,7 @@ export const insertShow = async (
| SeedMovie["translations"]
| SeedSerie["translations"]
| SeedCollection["translations"],
) => {
) => {
return await db.transaction(async (tx) => {
const imgQueue: ImageTask[] = [];
const orig = {
@@ -103,7 +106,8 @@ export const insertShow = async (
});
return ret;
});
};
},
);
async function insertBaseShow(tx: Transaction, show: Show) {
function insert() {

View File

@@ -3,12 +3,12 @@ import { db } from "~/db";
import { roles, staff } from "~/db/schema";
import { conflictUpdateAllExcept, unnestValues } from "~/db/utils";
import type { SeedStaff } from "~/models/staff";
import { record } from "~/otel";
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
export const insertStaff = async (
seed: SeedStaff[] | undefined,
showPk: number,
) => {
export const insertStaff = record(
"insertStaff",
async (seed: SeedStaff[] | undefined, showPk: number) => {
if (!seed?.length) return [];
return await db.transaction(async (tx) => {
@@ -25,7 +25,12 @@ export const insertStaff = async (
.select(unnestValues(people, staff))
.onConflictDoUpdate({
target: staff.slug,
set: conflictUpdateAllExcept(staff, ["pk", "id", "slug", "createdAt"]),
set: conflictUpdateAllExcept(staff, [
"pk",
"id",
"slug",
"createdAt",
]),
})
.returning({ pk: staff.pk, id: staff.id, slug: staff.slug });
@@ -54,4 +59,5 @@ export const insertStaff = async (
return ret;
});
};
},
);

View File

@@ -3,15 +3,15 @@ import { db } from "~/db";
import { showStudioJoin, studios, studioTranslations } from "~/db/schema";
import { conflictUpdateAllExcept, sqlarr, unnestValues } from "~/db/utils";
import type { SeedStudio } from "~/models/studio";
import { record } from "~/otel";
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
type StudioI = typeof studios.$inferInsert;
type StudioTransI = typeof studioTranslations.$inferInsert;
export const insertStudios = async (
seed: SeedStudio[] | undefined,
showPk: number,
) => {
export const insertStudios = record(
"insertStudios",
async (seed: SeedStudio[] | undefined, showPk: number) => {
if (!seed?.length) return [];
return await db.transaction(async (tx) => {
@@ -63,9 +63,12 @@ export const insertStudios = async (
showPk: sql`${showPk}`.as("showPk"),
studioPk: sql`v."studioPk"`.as("studioPk"),
})
.from(sql`unnest(${sqlarr(ret.map((x) => x.pk))}::integer[]) as v("studioPk")`),
.from(
sql`unnest(${sqlarr(ret.map((x) => x.pk))}::integer[]) as v("studioPk")`,
),
)
.onConflictDoNothing();
return ret;
});
};
},
);

View File

@@ -1,12 +1,12 @@
import os from "node:os";
import path from "node:path";
import tls, { type ConnectionOptions } from "node:tls";
import { record } from "@elysiajs/opentelemetry";
import { instrumentDrizzleClient } from "@kubiks/otel-drizzle";
import { sql } from "drizzle-orm";
import { drizzle } from "drizzle-orm/node-postgres";
import { migrate as migrateDb } from "drizzle-orm/node-postgres/migrator";
import type { PoolConfig } from "pg";
import { record } from "~/otel";
import * as schema from "./schema";
const config: PoolConfig = {
@@ -122,8 +122,7 @@ instrumentDrizzleClient(db, {
maxQueryTextLength: 100_000_000,
});
export const migrate = async () => {
return record("migrate", async () => {
export const migrate = record("migrate", async () => {
try {
await db.execute(
sql.raw(`
@@ -140,8 +139,7 @@ export const migrate = async () => {
migrationsFolder: "./drizzle",
});
console.log(`Database ${postgresConfig.database} migrated!`);
});
};
});
export type Transaction =
| typeof db

View File

@@ -1,4 +1,4 @@
import { opentelemetry } from "@elysiajs/opentelemetry";
import { record as elysiaRecord, opentelemetry } from "@elysiajs/opentelemetry";
import { OTLPMetricExporter as GrpcMetricExporter } from "@opentelemetry/exporter-metrics-otlp-grpc";
import { OTLPMetricExporter as HttpMetricExporter } from "@opentelemetry/exporter-metrics-otlp-proto";
import { OTLPTraceExporter as GrpcTraceExporter } from "@opentelemetry/exporter-trace-otlp-grpc";
@@ -32,3 +32,12 @@ export const otel = new Elysia()
}),
)
.as("global");
export function record<T extends (...args: any) => any>(
spanName: string,
fn: T,
): T {
const wrapped = (...args: Parameters<T>) =>
elysiaRecord(spanName, () => fn(...args));
return wrapped as T;
}