From 188ce3f67d68fa573d22c54a5ac07aa661fa7dc2 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Wed, 19 Nov 2025 23:59:04 +0100 Subject: [PATCH 1/3] Log to stdout & otel for scanner --- scanner/scanner/otel.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/scanner/scanner/otel.py b/scanner/scanner/otel.py index 74ff0ace..41e39a87 100644 --- a/scanner/scanner/otel.py +++ b/scanner/scanner/otel.py @@ -1,5 +1,6 @@ import logging import os +import sys from fastapi import FastAPI from opentelemetry import metrics, trace @@ -45,8 +46,13 @@ def instrument(app: FastAPI): ) ) set_logger_provider(provider) - handler = LoggingHandler(level=logging.DEBUG, logger_provider=provider) - logging.basicConfig(handlers=[handler], level=logging.DEBUG) + logging.basicConfig( + handlers=[ + LoggingHandler(level=logging.DEBUG, logger_provider=provider), + logging.StreamHandler(sys.stdout), + ], + level=logging.DEBUG, + ) logging.getLogger("watchfiles").setLevel(logging.WARNING) logging.getLogger("rebulk").setLevel(logging.WARNING) From 37ec32b52dcdf8dea540c7d95d07642a23bfcdf0 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Thu, 20 Nov 2025 00:03:24 +0100 Subject: [PATCH 2/3] Name migrate span of scanner --- scanner/scanner/database.py | 3 +++ scanner/scanner/requests.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/scanner/scanner/database.py b/scanner/scanner/database.py index e5184339..547ecfcd 100644 --- a/scanner/scanner/database.py +++ b/scanner/scanner/database.py @@ -5,8 +5,10 @@ from logging import getLogger from typing import Any, cast from asyncpg import Connection, Pool, create_pool +from opentelemetry import trace logger = getLogger(__name__) +tracer = trace.get_tracer("kyoo.scanner") pool: Pool @@ -55,6 +57,7 @@ async def get_db_fapi(): yield db +@tracer.start_as_current_span("migrate") async def migrate(migrations_dir="./migrations"): async with get_db() as db: _ = await db.execute( diff --git a/scanner/scanner/requests.py b/scanner/scanner/requests.py index 50ad5bff..fd6b8bb3 100644 --- a/scanner/scanner/requests.py +++ b/scanner/scanner/requests.py @@ -75,7 +75,7 @@ class RequestProcessor: self._database.add_termination_listener(terminated) await self._database.add_listener("scanner_requests", process) - logger.info("Listening for requestes") + logger.info("Listening for requests") _ = await closed.wait() logger.info("stopping...") except CancelledError: From f7e801e57407b75d4ea81238a6ae876a87a66ee4 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Thu, 20 Nov 2025 09:44:00 +0100 Subject: [PATCH 3/3] Chunk identify scans --- scanner/scanner/fsscan.py | 43 +++++----- scanner/scanner/identifiers/identify.py | 107 +++++++++++++----------- 2 files changed, 81 insertions(+), 69 deletions(-) diff --git a/scanner/scanner/fsscan.py b/scanner/scanner/fsscan.py index 018d33a3..d738a734 100644 --- a/scanner/scanner/fsscan.py +++ b/scanner/scanner/fsscan.py @@ -1,3 +1,5 @@ +import asyncio +import itertools import os import re from contextlib import asynccontextmanager @@ -111,30 +113,33 @@ class FsScanner: logger.error("Unexpected error while monitoring files.", exc_info=e) async def _register(self, videos: list[str] | set[str]): - # TODO: we should probably chunk those - vids: list[Video] = [] - for path in list(videos): + async def process(path: str): try: vid = await identify(path) - vid = self._match(vid) - vids.append(vid) + return self._match(vid) except Exception as e: logger.error("Couldn't identify %s.", path, exc_info=e) - created = await self._client.create_videos(vids) + return None - await self._requests.enqueue( - [ - Request( - kind=x.guess.kind, - title=x.guess.title, - year=next(iter(x.guess.years), None), - external_id=x.guess.external_id, - videos=[Request.Video(id=x.id, episodes=x.guess.episodes)], - ) - for x in created - if not any(x.entries) and x.guess.kind != "extra" - ] - ) + for batch in itertools.batched(videos, 20): + vids = await asyncio.gather(*(process(path) for path in batch)) + created = await self._client.create_videos( + [v for v in vids if v is not None] + ) + + await self._requests.enqueue( + [ + Request( + kind=x.guess.kind, + title=x.guess.title, + year=next(iter(x.guess.years), None), + external_id=x.guess.external_id, + videos=[Request.Video(id=x.id, episodes=x.guess.episodes)], + ) + for x in created + if not any(x.entries) and x.guess.kind != "extra" + ] + ) def _match(self, video: Video) -> Video: video.for_ = [] diff --git a/scanner/scanner/identifiers/identify.py b/scanner/scanner/identifiers/identify.py index 90e61f1a..7b51ce45 100644 --- a/scanner/scanner/identifiers/identify.py +++ b/scanner/scanner/identifiers/identify.py @@ -1,15 +1,18 @@ +import os from collections.abc import Awaitable from hashlib import sha256 from itertools import zip_longest from logging import getLogger from typing import Callable, Literal, cast +from opentelemetry import trace from rebulk.match import Match from ..models.videos import Guess, Video from .guess.guess import guessit logger = getLogger(__name__) +tracer = trace.get_tracer("kyoo.scanner") pipeline: list[Callable[[str, Guess], Awaitable[Guess]]] = [ # TODO: add nfo scanner @@ -19,62 +22,66 @@ pipeline: list[Callable[[str, Guess], Awaitable[Guess]]] = [ async def identify(path: str) -> Video: - raw = guessit(path, expected_titles=[]) + with tracer.start_as_current_span(f"identify {os.path.basename(path)}") as span: + span.set_attribute("video.path", path) - # guessit should only return one (according to the doc) - title = raw.get("title", [])[0] - kind = raw.get("type", [])[0] - version = next(iter(raw.get("version", [])), None) - # apparently guessit can return multiples but tbh idk what to do with - # multiples part. we'll just ignore them for now - part = next(iter(raw.get("part", [])), None) + raw = guessit(path, expected_titles=[]) - years = raw.get("year", []) - seasons = raw.get("season", []) - episodes = raw.get("episode", []) + # guessit should only return one (according to the doc) + title = raw.get("title", [])[0] + kind = raw.get("type", [])[0] + version = next(iter(raw.get("version", [])), None) + # apparently guessit can return multiples but tbh idk what to do with + # multiples part. we'll just ignore them for now + part = next(iter(raw.get("part", [])), None) - # just strip the version & part number from the path - rendering_path = "".join( - c - for i, c in enumerate(path) - if not (version and version.start <= i < version.end) - and not (part and part.start <= i < part.end) - ) + years = raw.get("year", []) + seasons = raw.get("season", []) + episodes = raw.get("episode", []) - guess = Guess( - title=cast(str, title.value), - kind=cast(Literal["episode", "movie"], kind.value), - extra_kind=None, - years=[cast(int, y.value) for y in years], - episodes=[ - Guess.Episode(season=cast(int, s.value), episode=cast(int, e.value)) - for s, e in zip_longest( - seasons, - episodes, - fillvalue=seasons[-1] if any(seasons) else Match(0, 0, value=1), - ) - ], - external_id={}, - from_="guessit", - raw={ - k: [x.value if x.value is int else str(x.value) for x in v] - for k, v in raw.items() - }, - ) + # just strip the version & part number from the path + rendering_path = "".join( + c + for i, c in enumerate(path) + if not (version and version.start <= i < version.end) + and not (part and part.start <= i < part.end) + ) - for step in pipeline: - try: - guess = await step(path, guess) - except Exception as e: - logger.error("Couldn't run %s.", step.__name__, exc_info=e) + guess = Guess( + title=cast(str, title.value), + kind=cast(Literal["episode", "movie"], kind.value), + extra_kind=None, + years=[cast(int, y.value) for y in years], + episodes=[ + Guess.Episode(season=cast(int, s.value), episode=cast(int, e.value)) + for s, e in zip_longest( + seasons, + episodes, + fillvalue=seasons[-1] if any(seasons) else Match(0, 0, value=1), + ) + ], + external_id={}, + from_="guessit", + raw={ + k: [x.value if x.value is int else str(x.value) for x in v] + for k, v in raw.items() + }, + ) + span.set_attribute("video.name", guess.title) - return Video( - path=path, - rendering=sha256(rendering_path.encode()).hexdigest(), - part=cast(int, part.value) if part else None, - version=cast(int, version.value) if version else 1, - guess=guess, - ) + for step in pipeline: + try: + guess = await step(path, guess) + except Exception as e: + logger.error("Couldn't run %s.", step.__name__, exc_info=e) + + return Video( + path=path, + rendering=sha256(rendering_path.encode()).hexdigest(), + part=cast(int, part.value) if part else None, + version=cast(int, version.value) if version else 1, + guess=guess, + ) if __name__ == "__main__":