9 Commits

Author SHA1 Message Date
acelinkio
12fe7c157f .github remove autosync references + fix whitespace (#1198) 2025-12-02 09:26:04 +01:00
c29ad99ca0 Fix pg admin password (#1186) 2025-12-02 09:22:49 +01:00
acelinkio
a99f29074c scanner: adding the probes back (#1197) 2025-12-01 18:30:23 -08:00
Arlan Lloyd
f449a0878a adding the probes back 2025-12-02 02:26:34 +00:00
acelinkio
097985ab6d scanner: refactor otel integration (#1194) 2025-12-01 23:50:28 +01:00
11c300ecf7 Add status api to get scanner's status (#1195) 2025-12-01 20:18:21 +01:00
1e975ce238 Set null not distinct in scanner request constraint 2025-12-01 20:15:57 +01:00
b39fa4262d Add status api to get scanner's status 2025-12-01 20:04:16 +01:00
d7699389bc Fix missing kid in apikeys jwt 2025-12-01 20:04:16 +01:00
20 changed files with 208 additions and 112 deletions

View File

@@ -38,7 +38,7 @@ PUBLIC_URL=http://localhost:8901
# Set `verified` to true if you don't want to manually verify users.
EXTRA_CLAIMS='{"permissions": ["core.read", "core.play"], "verified": false}'
# This is the permissions of the first user (aka the first user is admin)
FIRST_USER_CLAIMS='{"permissions": ["users.read", "users.write", "apikeys.read", "apikeys.write", "users.delete", "core.read", "core.write", "core.play", "scanner.trigger"], "verified": true}'
FIRST_USER_CLAIMS='{"permissions": ["users.read", "users.write", "users.delete", "apikeys.read", "apikeys.write", "core.read", "core.write", "core.play", "scanner.trigger"], "verified": true}'
# Guest (meaning unlogged in users) can be:
# unauthorized (they need to connect before doing anything)

View File

@@ -37,7 +37,7 @@ jobs:
run: biome ci .
scanner:
name: "Lint scanner/autosync"
name: "Lint scanner"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6

View File

@@ -34,11 +34,6 @@ jobs:
label: scanner
image: ${{ github.repository_owner }}/kyoo_scanner
- context: ./autosync
dockerfile: Dockerfile
label: autosync
image: ${{ github.repository_owner }}/kyoo_autosync
- context: ./transcoder
dockerfile: Dockerfile
label: transcoder

2
.gitignore vendored
View File

@@ -7,6 +7,4 @@
log.html
output.xml
report.html
chart/charts
chart/Chart.lock
tmp

View File

@@ -211,6 +211,7 @@ func (h *Handler) createApiJwt(apikey string) (string, error) {
Time: time.Now().UTC().Add(time.Hour),
}
jwt := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
jwt.Header["kid"] = h.config.JwtKid
t, err := jwt.SignedString(h.config.JwtPrivateKey)
if err != nil {
return "", err

2
chart/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
charts

6
chart/Chart.lock Normal file
View File

@@ -0,0 +1,6 @@
dependencies:
- name: postgres
repository: oci://registry-1.docker.io/cloudpirates
version: 0.12.4
digest: sha256:e486b44703c7a97eee25f7715ab040d197d79c41ea1c422ae009b1f68985f544
generated: "2025-12-01T20:17:25.152279487+01:00"

View File

@@ -12,4 +12,4 @@ dependencies:
- condition: postgres.enabled
name: postgres
repository: oci://registry-1.docker.io/cloudpirates
version: 0.12.0
version: 0.12.4

View File

@@ -469,7 +469,7 @@ postgres:
existingSecret: "{{ .Values.global.postgres.infra.existingSecret }}"
secretKeys:
# set the postgres user password to the same as our user
passwordKey: "{{ .Values.global.postgres.infra.passwordKey }}"
adminPasswordKey: "{{ .Values.global.postgres.infra.passwordKey }}"
initdb:
scripts:
kyoo_api.sql: |

View File

@@ -20,5 +20,5 @@ create table scanner.requests(
status scanner.request_status not null default 'pending',
started_at timestamptz,
created_at timestamptz not null default now()::timestamptz,
constraint unique_kty unique(kind, title, year)
constraint unique_kty unique nulls not distinct (kind, title, year)
);

View File

@@ -5,13 +5,15 @@ from fastapi import FastAPI
from scanner.client import KyooClient
from scanner.fsscan import FsScanner
from scanner.otel import instrument
from scanner.log import configure_logging
from scanner.otel import setup_otelproviders, instrument
from scanner.providers.composite import CompositeProvider
from scanner.providers.themoviedatabase import TheMovieDatabase
from scanner.requests import RequestCreator, RequestProcessor
from .database import get_db, init_pool, migrate
from .routers.routes import router
from .routers.health import router as health_router
@asynccontextmanager
@@ -68,4 +70,7 @@ app = FastAPI(
lifespan=lifespan,
)
app.include_router(router)
app.include_router(health_router)
configure_logging()
setup_otelproviders()
instrument(app)

32
scanner/scanner/log.py Normal file
View File

@@ -0,0 +1,32 @@
import logging
import os
import sys
from opentelemetry.sdk._logs import LoggingHandler
def configure_logging():
    """Configure the root logger with a stdout handler and an OpenTelemetry handler.

    The root logger is set to DEBUG so handlers decide what to keep; noisy
    third-party loggers (watchfiles, rebulk) are capped at WARNING.
    """
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    # Silence chatty dependencies regardless of the configured levels.
    logging.getLogger("watchfiles").setLevel(logging.WARNING)
    logging.getLogger("rebulk").setLevel(logging.WARNING)

    # Add stdout handler
    stdout_handler = logging.StreamHandler(sys.stdout)
    # set logging level via STDOUT_LOG_LEVEL env var or default to INFO
    level_name = os.getenv("STDOUT_LOG_LEVEL", "INFO").upper()
    # Fall back to INFO instead of crashing with AttributeError when the env
    # var holds an unknown level name (e.g. "TRACE" or a typo).
    stdout_handler.setLevel(getattr(logging, level_name, logging.INFO))
    stdout_handler.setFormatter(
        logging.Formatter(
            fmt="[{levelname}][{name}] {message}",
            style="{",
        )
    )
    root_logger.addHandler(stdout_handler)

    # Add OpenTelemetry handler
    # set logging level via OTEL_LOG_LEVEL env var
    # https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration
    root_logger.addHandler(LoggingHandler())

View File

@@ -1,4 +1,5 @@
from __future__ import annotations
from datetime import datetime
from typing import Literal
from pydantic import Field
@@ -18,3 +19,16 @@ class Request(Model, extra="allow"):
class Video(Model):
    """A video file together with the episodes associated with it."""

    id: str
    # Episodes contained in this video, typed via the Guess model.
    episodes: list[Guess.Episode]
class RequestRet(Model):
    """API representation of a row of the `scanner.requests` table."""

    # Request primary key (`pk`), serialized as text.
    id: str
    kind: Literal["episode", "movie"]
    title: str
    # Release year; None when unknown.
    year: int | None
    # Lifecycle state of the scan request.
    status: Literal[
        "pending",
        "running",
        "failed",
    ]
    # When processing began; None if it has not started yet.
    started_at: datetime | None

View File

@@ -1,81 +1,77 @@
import logging
import os
import sys
from fastapi import FastAPI
from opentelemetry import metrics, trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
OTLPLogExporter as GrpcLogExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
OTLPMetricExporter as GrpcMetricExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter as GrpcSpanExporter,
)
from opentelemetry.exporter.otlp.proto.http._log_exporter import (
OTLPLogExporter as HttpLogExporter,
)
from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
OTLPMetricExporter as HttpMetricExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter as HttpSpanExporter,
)
from opentelemetry import trace, metrics, _logs
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
logger = logging.getLogger(__name__)
def setup_otelproviders() -> (
    tuple[TracerProvider, MeterProvider, LoggerProvider] | tuple[None, None, None]
):
    """Install the global OpenTelemetry tracer, meter, and logger providers.

    Exporters are chosen from environment variables:
    - `OTEL_EXPORTER_OTLP_ENDPOINT`: required; setup is skipped when unset/empty.
    - `OTEL_EXPORTER_OTLP_PROTOCOL`: `grpc` selects the gRPC exporters,
      anything else falls back to HTTP.
    - `OTEL_SERVICE_NAME`: service name resource attribute (default `kyoo.scanner`).

    Returns the three providers, or `(None, None, None)` when setup is skipped.
    """
    # NOTE(review): local import; harmless if `os` is also imported at module
    # scope, but kept here so the function is self-contained.
    import os

    if not (os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "").strip()):
        logger.info(
            "OTEL_EXPORTER_OTLP_ENDPOINT not specified, skipping otel provider setup."
        )
        return None, None, None

    # choose exporters (grpc vs http) — imported lazily so only the selected
    # protocol's dependencies are loaded.
    if os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL", "").lower().strip() == "grpc":
        from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
        from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
            OTLPMetricExporter,
        )
        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
            OTLPSpanExporter,
        )

        logger.info("Using gRPC libs for OpenTelemetry exporter.")
    else:
        from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
        from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
            OTLPMetricExporter,
        )
        from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
            OTLPSpanExporter,
        )

        logger.info("Using HTTP libs for OpenTelemetry exporter.")

    resource = Resource.create(
        {"service.name": os.getenv("OTEL_SERVICE_NAME", "kyoo.scanner")}
    )

    # Traces
    tracer_provider = TracerProvider(resource=resource)
    tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
    trace.set_tracer_provider(tracer_provider)

    # Metrics
    meter_provider = MeterProvider(
        resource=resource,
        metric_readers=[PeriodicExportingMetricReader(OTLPMetricExporter())],
    )
    metrics.set_meter_provider(meter_provider)

    # Logs — install logger provider + processor/exporter
    logger_provider = LoggerProvider(resource=resource)
    logger_provider.add_log_record_processor(BatchLogRecordProcessor(OTLPLogExporter()))
    _logs.set_logger_provider(logger_provider)

    return tracer_provider, meter_provider, logger_provider
def instrument(app: FastAPI):
proto = os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf")
resource = Resource.create(attributes={SERVICE_NAME: "kyoo.scanner"})
provider = LoggerProvider(resource=resource)
provider.add_log_record_processor(
BatchLogRecordProcessor(
HttpLogExporter() if proto == "http/protobuf" else GrpcLogExporter()
)
)
set_logger_provider(provider)
logging.basicConfig(
handlers=[
LoggingHandler(level=logging.DEBUG, logger_provider=provider),
logging.StreamHandler(sys.stdout),
],
level=logging.DEBUG,
)
logging.getLogger("watchfiles").setLevel(logging.WARNING)
logging.getLogger("rebulk").setLevel(logging.WARNING)
provider = TracerProvider(resource=resource)
provider.add_span_processor(
BatchSpanProcessor(
HttpSpanExporter() if proto == "http/protobuf" else GrpcSpanExporter()
)
)
trace.set_tracer_provider(provider)
provider = MeterProvider(
metric_readers=[
PeriodicExportingMetricReader(
HttpMetricExporter()
if proto == "http/protobuf"
else GrpcMetricExporter()
)
],
resource=resource,
)
metrics.set_meter_provider(provider)
FastAPIInstrumentor.instrument_app(
app,
http_capture_headers_server_request=[".*"],

View File

@@ -0,0 +1,15 @@
from fastapi import APIRouter

# Liveness/readiness endpoints; no auth so orchestrator probes can reach them.
router = APIRouter()


@router.get("/health")
def get_health():
    """Liveness probe: reports healthy as long as the process is running."""
    return {"status": "healthy"}


@router.get("/ready")
def get_ready():
    # NOTE(review): a DB-backed readiness check (`select 1`) was dropped
    # because its child spans (query + connection reset) kept getting logged;
    # readiness is now a static response. Confirm this is intentional.
    return {"status": "healthy"}

View File

@@ -1,9 +1,9 @@
from typing import Annotated
from typing import Annotated, Literal
from asyncpg import Connection
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Security
from fastapi import APIRouter, BackgroundTasks, Depends, Security
from scanner.database import get_db_fapi
from scanner.models.request import RequestRet
from scanner.status import StatusService
from ..fsscan import create_scanner
from ..jwt import validate_bearer
@@ -11,6 +11,19 @@ from ..jwt import validate_bearer
router = APIRouter()
@router.get("/scan")
async def get_scan_status(
    svc: Annotated[StatusService, Depends(StatusService.create)],
    _: Annotated[None, Security(validate_bearer, scopes=["scanner.trigger"])],
    status: Literal["pending", "running", "failed"] | None = None,
) -> list[RequestRet]:
    """List scanner requests and their current state.

    Requires the `scanner.trigger` scope. When `status` is given, only
    requests in that state are returned; otherwise all requests are listed.
    """
    return await svc.list_requests(status=status)
@router.put(
"/scan",
status_code=204,
@@ -29,25 +42,3 @@ async def trigger_scan(
await scanner.scan()
tasks.add_task(run)
@router.get("/health")
def get_health():
return {"status": "healthy"}
@router.get("/ready")
def get_ready():
# child spans (`select 1` & db connection reset) was still logged,
# since i don't really wanna deal with it, let's just do that.
return {"status": "healthy"}
# async def get_ready(db: Annotated[Connection, Depends(get_db_fapi)]):
# try:
# _ = await db.execute("select 1")
# return {"status": "healthy", "database": "healthy"}
# except Exception as e:
# raise HTTPException(
# status_code=500, detail={"status": "unhealthy", "database": str(e)}
# )

41
scanner/scanner/status.py Normal file
View File

@@ -0,0 +1,41 @@
from typing import Literal
from asyncpg import Connection
from pydantic import TypeAdapter
from scanner.database import get_db
from .models.request import RequestRet
class StatusService:
    """Read-only access to the scanner request queue (`scanner.requests`)."""

    def __init__(self, database: Connection):
        self._database = database

    @classmethod
    async def create(cls):
        """FastAPI dependency: yield a service bound to a pooled connection."""
        async with get_db() as db:
            yield StatusService(db)

    async def list_requests(
        self, *, status: Literal["pending", "running", "failed"] | None = None
    ) -> list[RequestRet]:
        """List scan requests, optionally filtered by `status`.

        Results are ordered by `started_at` then `pk`. The optional clause is a
        fixed literal (the value itself goes through `$1`), so there is no SQL
        injection risk.
        """
        # BUG FIX: the WHERE clause was previously interpolated AFTER the
        # ORDER BY clause, which is invalid SQL — every filtered call failed.
        # WHERE must precede ORDER BY.
        ret = await self._database.fetch(
            f"""
            select
                pk::text as id,
                kind,
                title,
                year,
                status,
                started_at
            from
                scanner.requests
            {"where status = $1" if status is not None else ""}
            order by
                started_at,
                pk
            """,
            *([status] if status is not None else []),
        )
        return TypeAdapter(list[RequestRet]).validate_python(ret)