Mirror of https://github.com/zoriya/Kyoo.git (synced 2025-12-06 06:36:25 +00:00)

Compare commits: 16 commits, 7b38c12b85...a1b975cc5d
Commits:
- a1b975cc5d
- 4f2b2d2cd2
- d3ccd14fe0
- 7f5bc2f57c
- c2c9bbe555
- 20e6fbbc33
- 5f9064ec37
- 433b90a3fb
- 81c6f68509
- 96ac331903
- f1c2724a7b
- 12fe7c157f
- c29ad99ca0
- a99f29074c
- f449a0878a
- 097985ab6d
.github/workflows/auth-hurl.yml (vendored, 2 changes)

@@ -15,7 +15,7 @@ jobs:
     postgres:
       image: postgres:15
       ports:
-        - "5432:5432"
+        - "5432:5432"
       env:
         POSTGRES_USER: kyoo
         POSTGRES_PASSWORD: password
.github/workflows/coding-style.yml (vendored, 2 changes)

@@ -37,7 +37,7 @@ jobs:
         run: biome ci .

   scanner:
-    name: "Lint scanner/autosync"
+    name: "Lint scanner"
     runs-on: ubuntu-latest
     steps:
      - uses: actions/checkout@v6
.github/workflows/docker.yml (vendored, 5 changes)

@@ -34,11 +34,6 @@ jobs:
           label: scanner
           image: ${{ github.repository_owner }}/kyoo_scanner

-        - context: ./autosync
-          dockerfile: Dockerfile
-          label: autosync
-          image: ${{ github.repository_owner }}/kyoo_autosync
-
         - context: ./transcoder
           dockerfile: Dockerfile
           label: transcoder
.github/workflows/native-build.yml (vendored, 2 changes)

@@ -2,7 +2,7 @@ name: Native build
 on:
   push:
     tags:
-      - v*
+      - v*

 jobs:
   update:
.github/workflows/release.yml (vendored, 2 changes)

@@ -2,7 +2,7 @@ name: Release
 on:
   push:
     tags:
-      - v*
+      - v*

 jobs:
   update:
.gitignore (vendored, 2 changes)

@@ -7,6 +7,4 @@
 log.html
 output.xml
 report.html
-chart/charts
-chart/Chart.lock
 tmp
@@ -13,4 +13,7 @@ pkgs.mkShell {
   ];

   SHARP_FORCE_GLOBAL_LIBVIPS = 1;
+  shellHook = ''
+    export LD_LIBRARY_PATH=${pkgs.stdenv.cc.cc.lib}/lib:$LD_LIBRARY_PATH
+  '';
 }
@@ -52,8 +52,7 @@ export const base = new Elysia({ name: "base" })
 		console.error(code, error);
 		return {
 			status: 500,
-			message: "message" in error ? (error?.message ?? code) : code,
-			details: error,
+			message: "Internal server error",
 		} as KError;
 	})
 	.get("/health", () => ({ status: "healthy" }) as const, {
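The error-handler change above is worth a note: echoing the caught error back as `details` can leak stack traces and internals to API clients, so the patch logs the full error server-side and returns only a generic message. A minimal sketch of the pattern (the `KError` shape comes from the diff; the helper name is hypothetical):

	// Log everything server-side; return nothing sensitive to the client.
	type KError = { status: number; message: string };

	function toInternalError(error: unknown): KError {
		console.error(500, error);
		return { status: 500, message: "Internal server error" };
	}
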
@@ -139,9 +139,9 @@ const processOne = record("download", async () => {
 	const column = sql.raw(img.column);

 	await tx.execute(sql`
-		update ${table} set ${column} = ${ret}
-		where ${column}->'id' = ${sql.raw(`'"${img.id}"'::jsonb`)}
-	`);
+		update ${table} set ${column} = ${ret}
+		where ${column}->'id' = ${sql.raw(`'"${img.id}"'::jsonb`)}
+	`);

 	await tx.delete(mqueue).where(eq(mqueue.id, item.id));
 } catch (err: any) {
@@ -4,6 +4,7 @@ import { roles, staff } from "~/db/schema";
 import { conflictUpdateAllExcept, unnestValues } from "~/db/utils";
 import type { SeedStaff } from "~/models/staff";
 import { record } from "~/otel";
+import { uniqBy } from "~/utils";
 import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";

 export const insertStaff = record(
@@ -13,13 +14,16 @@ export const insertStaff = record(

 	return await db.transaction(async (tx) => {
 		const imgQueue: ImageTask[] = [];
-		const people = seed.map((x) => ({
-			...x.staff,
-			image: enqueueOptImage(imgQueue, {
-				url: x.staff.image,
-				column: staff.image,
-			}),
-		}));
+		const people = uniqBy(
+			seed.map((x) => ({
+				...x.staff,
+				image: enqueueOptImage(imgQueue, {
+					url: x.staff.image,
+					column: staff.image,
+				}),
+			})),
+			(x) => x.slug,
+		);
 		const ret = await tx
 			.insert(staff)
 			.select(unnestValues(people, staff))
@@ -36,7 +40,7 @@ export const insertStaff = record(

 		const rval = seed.map((x, i) => ({
 			showPk,
-			staffPk: ret[i].pk,
+			staffPk: ret.find(y => y.slug === x.staff.slug)!.pk,
 			kind: x.kind,
 			order: i,
 			character: {
@@ -831,6 +831,9 @@ export const videosWriteH = new Elysia({ prefix: "/videos", tags: ["videos"] })
 	.post(
 		"",
 		async ({ body, status }) => {
+			if (body.length === 0) {
+				return status(422, { status: 422, message: "No videos" });
+			}
 			return await db.transaction(async (tx) => {
 				let vids: { pk: number; id: string; path: string; guess: Guess }[] = [];
 				try {
@@ -925,6 +928,7 @@ export const videosWriteH = new Elysia({ prefix: "/videos", tags: ["videos"] })
 					description:
 						"Invalid rendering specified. (conflicts with an existing video)",
 				},
+				422: KError,
 			},
 		},
 	)
@@ -91,7 +91,7 @@ export const seasonRelations = relations(seasons, ({ one, many }) => ({

 export const seasonTrRelations = relations(seasonTranslations, ({ one }) => ({
 	season: one(seasons, {
-		relationName: "season_translation",
+		relationName: "season_translations",
 		fields: [seasonTranslations.pk],
 		references: [seasons.pk],
 	}),
@@ -75,6 +75,10 @@ export function conflictUpdateAllExcept<

 // drizzle is bugged and doesn't allow js arrays to be used in raw sql.
 export function sqlarr(array: unknown[]): string {
+	function escapeStr(str: string) {
+		return str.replaceAll("\\", "\\\\").replaceAll('"', '\\"');
+	}
+
 	return `{${array
 		.map((item) =>
 			item === "null" || item === null || item === undefined
@@ -82,8 +86,8 @@ export function sqlarr(array: unknown[]): string {
 				: Array.isArray(item)
 					? sqlarr(item)
 					: typeof item === "object"
-						? `"${JSON.stringify(item).replaceAll("\\", "\\\\").replaceAll('"', '\\"')}"`
-						: `"${item?.toString().replaceAll('"', '\\"')}"`,
+						? `"${escapeStr(JSON.stringify(item))}"`
+						: `"${escapeStr(item.toString())}"`,
 		)
 		.join(", ")}}`;
 }
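For context, `sqlarr` renders a JS array as a Postgres array literal, and the patch centralizes escaping in `escapeStr` so objects and plain values get the same treatment. The ordering matters: backslashes must be doubled before quotes are escaped, or the backslashes added for quotes would themselves be escaped again. A standalone sketch (sample data is illustrative):

	// Backslashes first, then quotes.
	function escapeStr(str: string): string {
		return str.replaceAll("\\", "\\\\").replaceAll('"', '\\"');
	}

	// Prints `{"a", "say \"hi\"", "c:\\tmp"}` — a valid Postgres text[] literal.
	console.log(
		`{${["a", 'say "hi"', "c:\\tmp"].map((x) => `"${escapeStr(x)}"`).join(", ")}}`,
	);
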
@@ -28,3 +28,13 @@ export function getFile(path: string): BunFile | S3File {

 	return Bun.file(path);
 }
+
+export function uniqBy<T>(a: T[], key: (val: T) => string) {
+	const seen: Record<string, boolean> = {};
+	return a.filter((item) => {
+		const k = key(item);
+		if (seen[k]) return false;
+		seen[k] = true;
+		return true;
+	});
+}
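`uniqBy` keeps the first element seen for each key, which is what lets `insertStaff` above dedupe people by slug before the bulk insert. A self-contained usage sketch (the sample data is hypothetical):

	function uniqBy<T>(a: T[], key: (val: T) => string): T[] {
		const seen: Record<string, boolean> = {};
		return a.filter((item) => {
			const k = key(item);
			if (seen[k]) return false;
			seen[k] = true;
			return true;
		});
	}

	const people = [
		{ slug: "alice", role: "director" },
		{ slug: "alice", role: "writer" },
		{ slug: "bob", role: "actor" },
	];
	// [{ slug: "alice", role: "director" }, { slug: "bob", role: "actor" }]
	console.log(uniqBy(people, (x) => x.slug));
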
@@ -104,4 +104,60 @@ describe("Serie seeding", () => {
 			],
 		});
 	});
+
+	it("Can create a serie with quotes", async () => {
+		const [resp, body] = await createSerie({
+			...madeInAbyss,
+			slug: "quote-test",
+			seasons: [
+				{
+					...madeInAbyss.seasons[0],
+					translations: {
+						en: {
+							...madeInAbyss.seasons[0].translations.en,
+							name: "Season'1",
+						},
+					},
+				},
+				{
+					...madeInAbyss.seasons[1],
+					translations: {
+						en: {
+							...madeInAbyss.seasons[0].translations.en,
+							name: 'Season"2',
+							description: `This's """""quote, idk'''''`,
+						},
+					},
+				},
+			],
+		});
+
+		expectStatus(resp, body).toBe(201);
+		expect(body.id).toBeString();
+		expect(body.slug).toBe("quote-test");
+
+		const ret = await db.query.shows.findFirst({
+			where: eq(shows.id, body.id),
+			with: {
+				seasons: {
+					orderBy: seasons.seasonNumber,
+					with: { translations: true },
+				},
+				entries: {
+					with: {
+						translations: true,
+						evj: { with: { video: true } },
+					},
+				},
+			},
+		});
+
+		expect(ret).not.toBeNull();
+		expect(ret!.seasons).toBeArrayOfSize(2);
+		expect(ret!.seasons[0].translations[0].name).toBe("Season'1");
+		expect(ret!.seasons[1].translations[0].name).toBe('Season"2');
+		expect(ret!.entries).toBeArrayOfSize(
+			madeInAbyss.entries.length + madeInAbyss.extras.length,
+		);
+	});
 });
@@ -1,6 +1,6 @@
 {
 	"compilerOptions": {
-		"target": "ES2021",
+		"target": "ES2022",
 		"module": "ES2022",
 		"moduleResolution": "node",
 		"esModuleInterop": true,
@@ -88,7 +88,9 @@ func setupOtel(e *echo.Echo) (func(), error) {
 	otel.SetTracerProvider(tp)

 	e.Use(otelecho.Middleware("kyoo.auth", otelecho.WithSkipper(func(c echo.Context) bool {
-		return c.Path() == "/auth/health" || c.Path() == "/auth/ready"
+		return (c.Path() == "/auth/health" ||
+			c.Path() == "/auth/ready" ||
+			strings.HasPrefix(c.Path(), "/.well-known/"))
 	})))

 	return func() {
chart/.gitignore (vendored, new file, 2 lines)

@@ -0,0 +1,2 @@
+charts
chart/Chart.lock (new file, 6 lines)

@@ -0,0 +1,6 @@
+dependencies:
+- name: postgres
+  repository: oci://registry-1.docker.io/cloudpirates
+  version: 0.12.4
+digest: sha256:e486b44703c7a97eee25f7715ab040d197d79c41ea1c422ae009b1f68985f544
+generated: "2025-12-01T20:17:25.152279487+01:00"
@@ -12,4 +12,4 @@ dependencies:
   - condition: postgres.enabled
     name: postgres
     repository: oci://registry-1.docker.io/cloudpirates
-    version: 0.12.0
+    version: 0.12.4
@@ -469,7 +469,7 @@ postgres:
     existingSecret: "{{ .Values.global.postgres.infra.existingSecret }}"
     secretKeys:
       # set the postgres user password to the same as our user
-      passwordKey: "{{ .Values.global.postgres.infra.passwordKey }}"
+      adminPasswordKey: "{{ .Values.global.postgres.infra.passwordKey }}"
   initdb:
     scripts:
       kyoo_api.sql: |
@@ -88,7 +88,7 @@ services:
     env_file:
       - ./.env
     volumes:
-      - images:/app/images
+      - images:/images
     labels:
       - "traefik.enable=true"
       - "traefik.http.routers.swagger.rule=PathPrefix(`/swagger`)"
@@ -177,7 +177,7 @@ services:
     profiles: ['qsv']

   traefik:
-    image: traefik:v3.5
+    image: traefik:v3.6
     restart: on-failure
     command:
       - "--providers.docker=true"
@@ -58,7 +58,7 @@ services:
     env_file:
       - ./.env
     volumes:
-      - images:/app/images
+      - images:/images
     labels:
       - "traefik.enable=true"
      - "traefik.http.routers.swagger.rule=PathPrefix(`/swagger`)"
@@ -126,7 +126,7 @@ services:
     profiles: ["qsv"]

   traefik:
-    image: traefik:v3.5
+    image: traefik:v3.6
     restart: unless-stopped
     command:
       - "--providers.docker=true"
@@ -84,6 +84,7 @@ export const login = async (
 export const logout = async () => {
 	const accounts = readAccounts();
 	const account = accounts.find((x) => x.selected);
+	removeAccounts((x) => x.selected);
 	if (account) {
 		await queryFn({
 			method: "DELETE",
@@ -92,7 +93,6 @@ export const logout = async () => {
 			parser: null,
 		});
 	}
-	removeAccounts((x) => x.selected);
 };

 export const deleteAccount = async () => {
@@ -18,6 +18,7 @@ create table scanner.requests(
 	external_id jsonb not null default '{}'::jsonb,
 	videos jsonb not null default '[]'::jsonb,
 	status scanner.request_status not null default 'pending',
+	error jsonb,
 	started_at timestamptz,
 	created_at timestamptz not null default now()::timestamptz,
 	constraint unique_kty unique nulls not distinct (kind, title, year)
@@ -5,13 +5,15 @@ from fastapi import FastAPI

 from scanner.client import KyooClient
 from scanner.fsscan import FsScanner
-from scanner.otel import instrument
+from scanner.log import configure_logging
+from scanner.otel import setup_otelproviders, instrument
 from scanner.providers.composite import CompositeProvider
 from scanner.providers.themoviedatabase import TheMovieDatabase
 from scanner.requests import RequestCreator, RequestProcessor

 from .database import get_db, init_pool, migrate
 from .routers.routes import router
+from .routers.health import router as health_router


 @asynccontextmanager
@@ -24,6 +26,10 @@ async def lifespan(_):
     ):
         # there's no way someone else used the same id, right?
         is_master = await db.fetchval("select pg_try_advisory_lock(198347)")
+        is_http = not is_master and await db.fetchval("select pg_try_advisory_lock(645633)")
+        if is_http:
+            yield
+            return
         if is_master:
             await migrate()
             processor = RequestProcessor(pool, client, tmdb)
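The lifespan change above elects roles with Postgres advisory locks: the first process to grab the master lock runs migrations and the request processor, one more process grabs a second lock and serves HTTP only. A sketch of the same election in TypeScript, assuming the node-postgres client (the lock ids are copied from the patch):

	import { Client } from "pg";

	// Only one connection can hold a given advisory lock, so exactly one
	// process becomes master; one more becomes the HTTP-only replica.
	async function electRole(client: Client): Promise<"master" | "http" | "worker"> {
		const master = await client.query("select pg_try_advisory_lock(198347)");
		if (master.rows[0].pg_try_advisory_lock) return "master";
		const http = await client.query("select pg_try_advisory_lock(645633)");
		return http.rows[0].pg_try_advisory_lock ? "http" : "worker";
	}
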
@@ -68,4 +74,7 @@ app = FastAPI(
     lifespan=lifespan,
 )
 app.include_router(router)
+app.include_router(health_router)
+configure_logging()
+setup_otelproviders()
 instrument(app)
@@ -3,7 +3,7 @@ from logging import getLogger
 from types import TracebackType
 from typing import Literal

-from aiohttp import ClientSession
+from aiohttp import ClientResponse, ClientResponseError, ClientSession
 from pydantic import TypeAdapter

 from .models.movie import Movie
@@ -38,9 +38,19 @@ class KyooClient(metaclass=Singleton):
     ):
         await self._client.close()

+    async def raise_for_status(self, r: ClientResponse):
+        if r.status >= 400:
+            raise ClientResponseError(
+                r.request_info,
+                r.history,
+                status=r.status,
+                message=await r.text(),
+                headers=r.headers,
+            )
+
     async def get_videos_info(self) -> VideoInfo:
         async with self._client.get("videos") as r:
-            r.raise_for_status()
+            await self.raise_for_status(r)
             return VideoInfo(**await r.json())

     async def create_videos(self, videos: list[Video]) -> list[VideoCreated]:
@@ -48,7 +58,7 @@ class KyooClient(metaclass=Singleton):
             "videos",
             data=TypeAdapter(list[Video]).dump_json(videos, by_alias=True),
         ) as r:
-            r.raise_for_status()
+            await self.raise_for_status(r)
             return TypeAdapter(list[VideoCreated]).validate_json(await r.text())

     async def delete_videos(self, videos: list[str] | set[str]):
@@ -56,14 +66,14 @@ class KyooClient(metaclass=Singleton):
             "videos",
             data=TypeAdapter(list[str] | set[str]).dump_json(videos, by_alias=True),
         ) as r:
-            r.raise_for_status()
+            await self.raise_for_status(r)

     async def create_movie(self, movie: Movie) -> Resource:
         async with self._client.post(
             "movies",
             data=movie.model_dump_json(by_alias=True),
         ) as r:
-            r.raise_for_status()
+            await self.raise_for_status(r)
             return Resource.model_validate(await r.json())

     async def create_serie(self, serie: Serie) -> Resource:
@@ -71,7 +81,7 @@ class KyooClient(metaclass=Singleton):
             "series",
             data=serie.model_dump_json(by_alias=True),
         ) as r:
-            r.raise_for_status()
+            await self.raise_for_status(r)
             return Resource.model_validate(await r.json())

     async def link_videos(
@@ -100,4 +110,4 @@ class KyooClient(metaclass=Singleton):
                 by_alias=True,
             ),
         ) as r:
-            r.raise_for_status()
+            await self.raise_for_status(r)
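The motivation for the custom `raise_for_status` above is that aiohttp's built-in version throws away the response body, so the server's error message never reaches the logs. The same pattern with `fetch` in TypeScript (the helper name is hypothetical):

	// Read the body before throwing, so the server's error text ends up
	// in the exception instead of just the status code.
	async function fetchOrThrow(url: string, init?: RequestInit): Promise<Response> {
		const r = await fetch(url, init);
		if (r.status >= 400) {
			throw new Error(`${r.status} ${r.statusText}: ${await r.text()}`);
		}
		return r;
	}
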
scanner/scanner/log.py (new file, 32 lines)

@@ -0,0 +1,32 @@
+import logging
+import os
+import sys
+
+from opentelemetry.sdk._logs import LoggingHandler
+
+
+def configure_logging():
+    root_logger = logging.getLogger()
+    root_logger.setLevel(logging.DEBUG)
+
+    logging.getLogger("watchfiles").setLevel(logging.WARNING)
+    logging.getLogger("rebulk").setLevel(logging.WARNING)
+
+    # Add stdout handler
+    stdout_handler = logging.StreamHandler(sys.stdout)
+    # set logging level via STDOUT_LOG_LEVEL env var or default to INFO
+    stdout_handler.setLevel(
+        getattr(logging, os.getenv("STDOUT_LOG_LEVEL", "INFO").upper())
+    )
+    stdout_handler.setFormatter(
+        logging.Formatter(
+            fmt="[{levelname}][{name}] {message}",
+            style="{",
+        )
+    )
+    root_logger.addHandler(stdout_handler)
+
+    # Add OpenTelemetry handler
+    # set logging level via OTEL_LOG_LEVEL env var
+    # https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration
+    root_logger.addHandler(LoggingHandler())
@@ -1,6 +1,6 @@
 from __future__ import annotations

 from datetime import datetime
-from typing import Literal
+from typing import Any, Literal

 from pydantic import Field
@@ -31,4 +31,5 @@ class RequestRet(Model):
 		"running",
 		"failed",
 	]
+	error: dict[str, Any] | None
 	started_at: datetime | None
@@ -1,81 +1,77 @@
 import logging
 import os
 import sys

 from fastapi import FastAPI
-from opentelemetry import metrics, trace
-from opentelemetry._logs import set_logger_provider
-from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
-    OTLPLogExporter as GrpcLogExporter,
-)
-from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
-    OTLPMetricExporter as GrpcMetricExporter,
-)
-from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
-    OTLPSpanExporter as GrpcSpanExporter,
-)
-from opentelemetry.exporter.otlp.proto.http._log_exporter import (
-    OTLPLogExporter as HttpLogExporter,
-)
-from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
-    OTLPMetricExporter as HttpMetricExporter,
-)
-from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
-    OTLPSpanExporter as HttpSpanExporter,
-)
+from opentelemetry import trace, metrics, _logs
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
+from opentelemetry.sdk._logs import LoggerProvider
+from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
+from opentelemetry.sdk.resources import Resource
 from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
 from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
 from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
-from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
-from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
-from opentelemetry.sdk.metrics import MeterProvider
-from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
-from opentelemetry.sdk.resources import SERVICE_NAME, Resource
-from opentelemetry.sdk.trace import TracerProvider
-from opentelemetry.sdk.trace.export import BatchSpanProcessor
+
+logger = logging.getLogger(__name__)
+
+
+def setup_otelproviders() -> tuple[object, object, object]:
+    import os
+
+    if not (os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "").strip()):
+        logger.info(
+            "OTEL_EXPORTER_OTLP_ENDPOINT not specified, skipping otel provider setup."
+        )
+        return None, None, None
+
+    # choose exporters (grpc vs http) ...
+    if os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL", "").lower().strip() == "grpc":
+        from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
+        from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
+            OTLPMetricExporter,
+        )
+        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
+            OTLPSpanExporter,
+        )
+
+        logger.info("Using gRPC libs for OpenTelemetry exporter.")
+    else:
+        from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
+        from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
+            OTLPMetricExporter,
+        )
+        from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
+            OTLPSpanExporter,
+        )
+
+        logger.info("Using HTTP libs for OpenTelemetry exporter.")
+
+    resource = Resource.create(
+        {"service.name": os.getenv("OTEL_SERVICE_NAME", "kyoo.scanner")}
+    )
+
+    # Traces
+    tracer_provider = TracerProvider(resource=resource)
+    tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
+    trace.set_tracer_provider(tracer_provider)
+
+    # Metrics
+    meter_provider = MeterProvider(
+        resource=resource,
+        metric_readers=[PeriodicExportingMetricReader(OTLPMetricExporter())],
+    )
+    metrics.set_meter_provider(meter_provider)
+
+    # Logs — install logger provider + processor/exporter
+    logger_provider = LoggerProvider(resource=resource)
+    logger_provider.add_log_record_processor(BatchLogRecordProcessor(OTLPLogExporter()))
+    _logs.set_logger_provider(logger_provider)
+
+    return tracer_provider, meter_provider, logger_provider


 def instrument(app: FastAPI):
-    proto = os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf")
-    resource = Resource.create(attributes={SERVICE_NAME: "kyoo.scanner"})
-
-    provider = LoggerProvider(resource=resource)
-    provider.add_log_record_processor(
-        BatchLogRecordProcessor(
-            HttpLogExporter() if proto == "http/protobuf" else GrpcLogExporter()
-        )
-    )
-    set_logger_provider(provider)
-    logging.basicConfig(
-        handlers=[
-            LoggingHandler(level=logging.DEBUG, logger_provider=provider),
-            logging.StreamHandler(sys.stdout),
-        ],
-        level=logging.DEBUG,
-    )
-    logging.getLogger("watchfiles").setLevel(logging.WARNING)
-    logging.getLogger("rebulk").setLevel(logging.WARNING)
-
-    provider = TracerProvider(resource=resource)
-    provider.add_span_processor(
-        BatchSpanProcessor(
-            HttpSpanExporter() if proto == "http/protobuf" else GrpcSpanExporter()
-        )
-    )
-    trace.set_tracer_provider(provider)
-
-    provider = MeterProvider(
-        metric_readers=[
-            PeriodicExportingMetricReader(
-                HttpMetricExporter()
-                if proto == "http/protobuf"
-                else GrpcMetricExporter()
-            )
-        ],
-        resource=resource,
-    )
-    metrics.set_meter_provider(provider)
-
     FastAPIInstrumentor.instrument_app(
         app,
         http_capture_headers_server_request=[".*"],
@@ -47,7 +47,7 @@ class Provider(ABC):
         search = await self.search_movies(title, year, language=[])
         if not any(search):
             raise ProviderError(
-                f"Couldn't find a movie with title {title}. (year: {year}"
+                f"Couldn't find a movie with title {title}. (year: {year})"
             )
         ret = await self.get_movie(
             {k: v.data_id for k, v in search[0].external_id.items()}
@@ -68,7 +68,7 @@ class Provider(ABC):
         search = await self.search_series(title, year, language=[])
         if not any(search):
             raise ProviderError(
-                f"Couldn't find a serie with title {title}. (year: {year}"
+                f"Couldn't find a serie with title {title}. (year: {year})"
             )
         ret = await self.get_serie(
             {k: v.data_id for k, v in search[0].external_id.items()}
@@ -420,6 +420,8 @@ class TheMovieDatabase(Provider):
                     (x["episode_number"] for x in season["episodes"]), None
                 ),
                 "entries_count": len(season["episodes"]),
+                # there can be gaps in episodes (like 1,2,5,6,7)
+                "episodes": [x["episode_number"] for x in season["episodes"]],
             },
         )
@@ -429,9 +431,9 @@ class TheMovieDatabase(Provider):
         # TODO: batch those
         ret = await asyncio.gather(
             *[
-                self._get_entry(serie_id, s.season_number, s.extra["first_entry"] + e)
+                self._get_entry(serie_id, s.season_number, e)
                 for s in seasons
-                for e in range(0, s.extra["entries_count"])
+                for e in s.extra["episodes"]
             ]
         )
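The TMDB fix above switches from assuming contiguous episode numbers (`first_entry + index`) to iterating the real `episode_number` list, since seasons can skip numbers. A tiny sketch of the difference (the sample numbers are illustrative):

	// Episode numbers can have gaps.
	const episodes = [1, 2, 5, 6, 7]; // actual episode_number values

	// Old approach assumes contiguity: requests 1..5, so nonexistent
	// episodes 3 and 4 get fetched and 6, 7 are missed.
	const oldIds = episodes.map((_, i) => 1 + i); // [1, 2, 3, 4, 5]

	// New approach uses the actual numbers.
	const newIds = episodes; // [1, 2, 5, 6, 7]
	console.log(oldIds, newIds);
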
@@ -1,5 +1,6 @@
 from asyncio import CancelledError, Event, TaskGroup
 from logging import getLogger
+from traceback import TracebackException
 from typing import cast

 from asyncpg import Connection, Pool
@@ -40,6 +41,8 @@ class RequestCreator:
             """
             delete from scanner.requests
             where status = 'failed'
+                or (status = 'running'
+                    and now() - started_at > interval '1 hour')
             """
         )
@@ -161,11 +164,22 @@ class RequestProcessor:
                 update
                     scanner.requests
                 set
-                    status = 'failed'
+                    status = 'failed',
+                    error = $2
                 where
                     pk = $1
                 """,
                 request.pk,
+                {
+                    "title": type(e).__name__,
+                    "message": str(e),
+                    "traceback": [
+                        line
+                        for part in TracebackException.from_exception(e).format()
+                        for line in part.split("\n")
+                        if line.strip()
+                    ],
+                },
             )
             return True
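The failure handler above now persists a structured error (`title`, `message`, and `traceback` as a list of non-empty lines) into the new `error jsonb` column. An equivalent serialization in TypeScript (the shape is copied from the patch; the helper name is hypothetical):

	// Serialize an exception into the { title, message, traceback } shape
	// stored in scanner.requests.error.
	function serializeError(e: Error): {
		title: string;
		message: string;
		traceback: string[];
	} {
		return {
			title: e.constructor.name, // e.g. "TypeError"
			message: e.message,
			traceback: (e.stack ?? "")
				.split("\n")
				.filter((line) => line.trim() !== ""),
		};
	}
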
@@ -28,6 +28,7 @@ class StatusService:
                 title,
                 year,
                 status,
+                error,
                 started_at
             from
                 scanner.requests