9 Commits

Author       SHA1        Message                                                       Date
acelinkio    12fe7c157f  .github remove autosync references + fix whitespace (#1198)  2025-12-02 09:26:04 +01:00
             c29ad99ca0  Fix pg admin password (#1186)                                 2025-12-02 09:22:49 +01:00
acelinkio    a99f29074c  scanner: adding the probes back (#1197)                       2025-12-01 18:30:23 -08:00
Arlan Lloyd  f449a0878a  adding the probes back                                        2025-12-02 02:26:34 +00:00
acelinkio    097985ab6d  scanner: refactor otel integration (#1194)                    2025-12-01 23:50:28 +01:00
             11c300ecf7  Add status api to get scanner's status (#1195)                2025-12-01 20:18:21 +01:00
             1e975ce238  Set null not distinct in scanner request constraint           2025-12-01 20:15:57 +01:00
             b39fa4262d  Add status api to get scanner's status                        2025-12-01 20:04:16 +01:00
             d7699389bc  Fix missing kid in apikeys jwt                                2025-12-01 20:04:16 +01:00
20 changed files with 208 additions and 112 deletions


@@ -38,7 +38,7 @@ PUBLIC_URL=http://localhost:8901
 # Set `verified` to true if you don't wanna manually verify users.
 EXTRA_CLAIMS='{"permissions": ["core.read", "core.play"], "verified": false}'
 # This is the permissions of the first user (aka the first user is admin)
-FIRST_USER_CLAIMS='{"permissions": ["users.read", "users.write", "apikeys.read", "apikeys.write", "users.delete", "core.read", "core.write", "core.play", "scanner.trigger"], "verified": true}'
+FIRST_USER_CLAIMS='{"permissions": ["users.read", "users.write", "users.delete", "apikeys.read", "apikeys.write", "core.read", "core.write", "core.play", "scanner.trigger"], "verified": true}'
 # Guest (meaning unlogged in users) can be:
 # unauthorized (they need to connect before doing anything)


@@ -15,7 +15,7 @@ jobs:
       postgres:
         image: postgres:15
         ports:
           - "5432:5432"
         env:
           POSTGRES_USER: kyoo
           POSTGRES_PASSWORD: password


@@ -37,7 +37,7 @@ jobs:
         run: biome ci .
   scanner:
-    name: "Lint scanner/autosync"
+    name: "Lint scanner"
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v6


@@ -34,11 +34,6 @@ jobs:
           label: scanner
           image: ${{ github.repository_owner }}/kyoo_scanner
-        - context: ./autosync
-          dockerfile: Dockerfile
-          label: autosync
-          image: ${{ github.repository_owner }}/kyoo_autosync
         - context: ./transcoder
           dockerfile: Dockerfile
           label: transcoder


@@ -2,7 +2,7 @@ name: Native build
 on:
   push:
     tags:
       - v*
 jobs:
   update:


@@ -2,7 +2,7 @@ name: Release
 on:
   push:
     tags:
       - v*
 jobs:
   update:

.gitignore

@@ -7,6 +7,4 @@
 log.html
 output.xml
 report.html
-chart/charts
-chart/Chart.lock
 tmp


@@ -211,6 +211,7 @@ func (h *Handler) createApiJwt(apikey string) (string, error) {
 		Time: time.Now().UTC().Add(time.Hour),
 	}
 	jwt := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
+	jwt.Header["kid"] = h.config.JwtKid
 	t, err := jwt.SignedString(h.config.JwtPrivateKey)
 	if err != nil {
 		return "", err
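The added header matters because a verifier that loads keys from a JWKS uses kid to select the matching public key; without it, apikey JWTs cannot be matched to a key once several keys are published. As a rough illustration of that mechanic (a Python sketch with PyJWT and cryptography, using an invented key id; the project's actual change is the Go line above):

# Illustration only: sign a token with a `kid` header, then read the header
# back the way a JWKS-based verifier would before picking its public key.
import jwt  # PyJWT
from cryptography.hazmat.primitives.asymmetric import rsa

private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
token = jwt.encode(
    {"sub": "apikey-demo"},
    private_key,
    algorithm="RS256",
    headers={"kid": "demo-key-id"},  # hypothetical key id
)
kid = jwt.get_unverified_header(token)["kid"]  # "demo-key-id"
claims = jwt.decode(token, private_key.public_key(), algorithms=["RS256"])
print(kid, claims["sub"])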

chart/.gitignore

@@ -0,0 +1,2 @@
charts

chart/Chart.lock

@@ -0,0 +1,6 @@
dependencies:
- name: postgres
  repository: oci://registry-1.docker.io/cloudpirates
  version: 0.12.4
digest: sha256:e486b44703c7a97eee25f7715ab040d197d79c41ea1c422ae009b1f68985f544
generated: "2025-12-01T20:17:25.152279487+01:00"


@@ -12,4 +12,4 @@ dependencies:
 - condition: postgres.enabled
   name: postgres
   repository: oci://registry-1.docker.io/cloudpirates
-  version: 0.12.0
+  version: 0.12.4


@@ -469,7 +469,7 @@ postgres:
     existingSecret: "{{ .Values.global.postgres.infra.existingSecret }}"
     secretKeys:
       # set the postgres user password to the same as our user
-      passwordKey: "{{ .Values.global.postgres.infra.passwordKey }}"
+      adminPasswordKey: "{{ .Values.global.postgres.infra.passwordKey }}"
   initdb:
     scripts:
       kyoo_api.sql: |


@@ -20,5 +20,5 @@ create table scanner.requests(
     status scanner.request_status not null default 'pending',
     started_at timestamptz,
     created_at timestamptz not null default now()::timestamptz,
-    constraint unique_kty unique(kind, title, year)
+    constraint unique_kty unique nulls not distinct (kind, title, year)
 );
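A plain unique(kind, title, year) never fires when year is NULL because NULLs compare as distinct, so the same title without a year could be queued twice; with nulls not distinct (PostgreSQL 15+), the duplicate insert collides with the first row. A hedged asyncpg sketch of the effect (placeholder DSN, assumes the migration above has been applied):

# Illustration only: the second insert now conflicts on unique_kty even though
# year is NULL, so "on conflict do nothing" leaves a single pending request.
import asyncio

import asyncpg


async def main():
    db = await asyncpg.connect("postgresql://kyoo:password@localhost/kyoo")  # placeholder DSN
    for _ in range(2):
        await db.execute(
            """
            insert into scanner.requests(kind, title, year)
            values ('movie', 'some movie', null)
            on conflict on constraint unique_kty do nothing
            """
        )
    count = await db.fetchval(
        "select count(*) from scanner.requests where title = 'some movie'"
    )
    print(count)  # 1 with nulls not distinct; 2 with the old plain unique constraint
    await db.close()


asyncio.run(main())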


@@ -5,13 +5,15 @@ from fastapi import FastAPI
 from scanner.client import KyooClient
 from scanner.fsscan import FsScanner
-from scanner.otel import instrument
+from scanner.log import configure_logging
+from scanner.otel import setup_otelproviders, instrument
 from scanner.providers.composite import CompositeProvider
 from scanner.providers.themoviedatabase import TheMovieDatabase
 from scanner.requests import RequestCreator, RequestProcessor
 
 from .database import get_db, init_pool, migrate
 from .routers.routes import router
+from .routers.health import router as health_router
 
 
 @asynccontextmanager
@@ -68,4 +70,7 @@ app = FastAPI(
     lifespan=lifespan,
 )
 app.include_router(router)
+app.include_router(health_router)
+configure_logging()
+setup_otelproviders()
 instrument(app)

scanner/scanner/log.py

@@ -0,0 +1,32 @@
import logging
import os
import sys

from opentelemetry.sdk._logs import LoggingHandler


def configure_logging():
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    logging.getLogger("watchfiles").setLevel(logging.WARNING)
    logging.getLogger("rebulk").setLevel(logging.WARNING)

    # Add stdout handler
    stdout_handler = logging.StreamHandler(sys.stdout)
    # set logging level via STDOUT_LOG_LEVEL env var or default to INFO
    stdout_handler.setLevel(
        getattr(logging, os.getenv("STDOUT_LOG_LEVEL", "INFO").upper())
    )
    stdout_handler.setFormatter(
        logging.Formatter(
            fmt="[{levelname}][{name}] {message}",
            style="{",
        )
    )
    root_logger.addHandler(stdout_handler)

    # Add OpenTelemetry handler
    # set logging level via OTEL_LOG_LEVEL env var
    # https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration
    root_logger.addHandler(LoggingHandler())
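
A short usage sketch of the new module (assumes the scanner package and the OpenTelemetry SDK are importable): the stdout handler's threshold comes from STDOUT_LOG_LEVEL, while the root logger stays at DEBUG so the OpenTelemetry handler can apply its own OTEL_LOG_LEVEL filtering.

# Illustration only: raise the stdout threshold to WARNING; INFO records still
# reach the root logger (and the OTel handler) but are not printed.
import logging
import os

os.environ["STDOUT_LOG_LEVEL"] = "WARNING"

from scanner.log import configure_logging

configure_logging()
log = logging.getLogger("scanner.demo")
log.info("not printed to stdout")
log.warning("printed as [WARNING][scanner.demo] ...")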


@@ -1,4 +1,5 @@
 from __future__ import annotations
 
+from datetime import datetime
 from typing import Literal
 
 from pydantic import Field
@@ -18,3 +19,16 @@ class Request(Model, extra="allow"):
     class Video(Model):
         id: str
         episodes: list[Guess.Episode]
+
+
+class RequestRet(Model):
+    id: str
+    kind: Literal["episode", "movie"]
+    title: str
+    year: int | None
+    status: Literal[
+        "pending",
+        "running",
+        "failed",
+    ]
+    started_at: datetime | None
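RequestRet is the response model of the new GET /scan endpoint, one entry per row of scanner.requests; any mapping with those fields validates into it, which is how the StatusService further down converts database records. A small sketch with invented values:

# Illustration only: hypothetical rows shaped like the columns the status query selects.
from pydantic import TypeAdapter

from scanner.models.request import RequestRet  # assumes the scanner package is importable

rows = [
    {
        "id": "42",
        "kind": "movie",
        "title": "some movie",
        "year": None,
        "status": "pending",
        "started_at": None,
    },
]
requests = TypeAdapter(list[RequestRet]).validate_python(rows)
print(requests[0].kind, requests[0].status)  # movie pending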


@@ -1,81 +1,77 @@
 import logging
-import os
-import sys
 
 from fastapi import FastAPI
-from opentelemetry import metrics, trace
-from opentelemetry._logs import set_logger_provider
-from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
-    OTLPLogExporter as GrpcLogExporter,
-)
-from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
-    OTLPMetricExporter as GrpcMetricExporter,
-)
-from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
-    OTLPSpanExporter as GrpcSpanExporter,
-)
-from opentelemetry.exporter.otlp.proto.http._log_exporter import (
-    OTLPLogExporter as HttpLogExporter,
-)
-from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
-    OTLPMetricExporter as HttpMetricExporter,
-)
-from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
-    OTLPSpanExporter as HttpSpanExporter,
-)
+from opentelemetry import trace, metrics, _logs
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
+from opentelemetry.sdk._logs import LoggerProvider
+from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
+from opentelemetry.sdk.resources import Resource
 from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
 from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
 from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
-from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
-from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
-from opentelemetry.sdk.metrics import MeterProvider
-from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
-from opentelemetry.sdk.resources import SERVICE_NAME, Resource
-from opentelemetry.sdk.trace import TracerProvider
-from opentelemetry.sdk.trace.export import BatchSpanProcessor
+
+logger = logging.getLogger(__name__)
+
+
+def setup_otelproviders() -> tuple[object, object, object]:
+    import os
+
+    if not (os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "").strip()):
+        logger.info(
+            "OTEL_EXPORTER_OTLP_ENDPOINT not specified, skipping otel provider setup."
+        )
+        return None, None, None
+
+    # choose exporters (grpc vs http) ...
+    if os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL", "").lower().strip() == "grpc":
+        from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
+        from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
+            OTLPMetricExporter,
+        )
+        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
+            OTLPSpanExporter,
+        )
+
+        logger.info("Using gRPC libs for OpenTelemetry exporter.")
+    else:
+        from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
+        from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
+            OTLPMetricExporter,
+        )
+        from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
+            OTLPSpanExporter,
+        )
+
+        logger.info("Using HTTP libs for OpenTelemetry exporter.")
+
+    resource = Resource.create(
+        {"service.name": os.getenv("OTEL_SERVICE_NAME", "kyoo.scanner")}
+    )
+
+    # Traces
+    tracer_provider = TracerProvider(resource=resource)
+    tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
+    trace.set_tracer_provider(tracer_provider)
+
+    # Metrics
+    meter_provider = MeterProvider(
+        resource=resource,
+        metric_readers=[PeriodicExportingMetricReader(OTLPMetricExporter())],
+    )
+    metrics.set_meter_provider(meter_provider)
+
+    # Logs — install logger provider + processor/exporter
+    logger_provider = LoggerProvider(resource=resource)
+    logger_provider.add_log_record_processor(BatchLogRecordProcessor(OTLPLogExporter()))
+    _logs.set_logger_provider(logger_provider)
+
+    return tracer_provider, meter_provider, logger_provider
 
 
 def instrument(app: FastAPI):
-    proto = os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf")
-    resource = Resource.create(attributes={SERVICE_NAME: "kyoo.scanner"})
-
-    provider = LoggerProvider(resource=resource)
-    provider.add_log_record_processor(
-        BatchLogRecordProcessor(
-            HttpLogExporter() if proto == "http/protobuf" else GrpcLogExporter()
-        )
-    )
-    set_logger_provider(provider)
-    logging.basicConfig(
-        handlers=[
-            LoggingHandler(level=logging.DEBUG, logger_provider=provider),
-            logging.StreamHandler(sys.stdout),
-        ],
-        level=logging.DEBUG,
-    )
-    logging.getLogger("watchfiles").setLevel(logging.WARNING)
-    logging.getLogger("rebulk").setLevel(logging.WARNING)
-
-    provider = TracerProvider(resource=resource)
-    provider.add_span_processor(
-        BatchSpanProcessor(
-            HttpSpanExporter() if proto == "http/protobuf" else GrpcSpanExporter()
-        )
-    )
-    trace.set_tracer_provider(provider)
-
-    provider = MeterProvider(
-        metric_readers=[
-            PeriodicExportingMetricReader(
-                HttpMetricExporter()
-                if proto == "http/protobuf"
-                else GrpcMetricExporter()
-            )
-        ],
-        resource=resource,
-    )
-    metrics.set_meter_provider(provider)
-
     FastAPIInstrumentor.instrument_app(
         app,
         http_capture_headers_server_request=[".*"],
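In use, the new setup_otelproviders is a no-op (returning None, None, None) unless OTEL_EXPORTER_OTLP_ENDPOINT is set, and it picks the gRPC or HTTP exporter packages based on OTEL_EXPORTER_OTLP_PROTOCOL. A hedged sketch of driving it from the environment (placeholder collector URL; assumes the scanner package and the OTLP exporter packages are installed):

# Illustration only: point the SDK at a local collector, build the providers,
# then emit one span and flush it on the way out.
import os

os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "http://localhost:4318"  # placeholder
os.environ["OTEL_EXPORTER_OTLP_PROTOCOL"] = "http/protobuf"

from opentelemetry import trace

from scanner.otel import setup_otelproviders

tracer_provider, meter_provider, logger_provider = setup_otelproviders()
with trace.get_tracer("scanner.demo").start_as_current_span("demo-span"):
    pass
if tracer_provider is not None:
    tracer_provider.shutdown()  # flush pending spans before exit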


@@ -0,0 +1,15 @@
from fastapi import APIRouter

router = APIRouter()


@router.get("/health")
def get_health():
    return {"status": "healthy"}


@router.get("/ready")
def get_ready():
    # child spans (`select 1` & db connection reset) was still logged,
    # since i don't really wanna deal with it, let's just do that.
    return {"status": "healthy"}


@@ -1,9 +1,9 @@
-from typing import Annotated
+from typing import Annotated, Literal
 
-from asyncpg import Connection
-from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Security
+from fastapi import APIRouter, BackgroundTasks, Depends, Security
 
-from scanner.database import get_db_fapi
+from scanner.models.request import RequestRet
+from scanner.status import StatusService
 
 from ..fsscan import create_scanner
 from ..jwt import validate_bearer
@@ -11,6 +11,19 @@ from ..jwt import validate_bearer
 
 router = APIRouter()
 
+
+@router.get("/scan")
+async def get_scan_status(
+    svc: Annotated[StatusService, Depends(StatusService.create)],
+    _: Annotated[None, Security(validate_bearer, scopes=["scanner.trigger"])],
+    status: Literal["pending", "running", "failed"] | None = None,
+) -> list[RequestRet]:
+    """
+    Get scan status, know what tasks are running, pending or failed.
+    """
+    return await svc.list_requests(status=status)
+
+
 @router.put(
     "/scan",
     status_code=204,
@@ -29,25 +42,3 @@ async def trigger_scan(
     await scanner.scan()
 
     tasks.add_task(run)
-
-
-@router.get("/health")
-def get_health():
-    return {"status": "healthy"}
-
-
-@router.get("/ready")
-def get_ready():
-    # child spans (`select 1` & db connection reset) was still logged,
-    # since i don't really wanna deal with it, let's just do that.
-    return {"status": "healthy"}
-
-
-# async def get_ready(db: Annotated[Connection, Depends(get_db_fapi)]):
-#     try:
-#         _ = await db.execute("select 1")
-#         return {"status": "healthy", "database": "healthy"}
-#     except Exception as e:
-#         raise HTTPException(
-#             status_code=500, detail={"status": "unhealthy", "database": str(e)}
-#         )
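Calling the new endpoint requires a bearer token that carries the scanner.trigger scope; status is an optional filter. A hypothetical aiohttp client sketch (the scanner URL and token are placeholders):

# Illustration only: list failed scan requests via GET /scan?status=failed.
import asyncio

import aiohttp


async def main():
    headers = {"Authorization": "Bearer <api-key-or-jwt>"}  # placeholder token
    async with aiohttp.ClientSession(headers=headers) as session:
        async with session.get(
            "http://localhost:4389/scan",  # placeholder scanner URL
            params={"status": "failed"},
        ) as resp:
            resp.raise_for_status()
            for req in await resp.json():
                print(req["id"], req["kind"], req["title"], req["status"])


asyncio.run(main())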

scanner/scanner/status.py

@@ -0,0 +1,41 @@
from typing import Literal

from asyncpg import Connection
from pydantic import TypeAdapter

from scanner.database import get_db

from .models.request import RequestRet


class StatusService:
    def __init__(self, database: Connection):
        self._database = database

    @classmethod
    async def create(cls):
        async with get_db() as db:
            yield StatusService(db)

    async def list_requests(
        self, *, status: Literal["pending", "running", "failed"] | None = None
    ) -> list[RequestRet]:
        ret = await self._database.fetch(
            f"""
            select
                pk::text as id,
                kind,
                title,
                year,
                status,
                started_at
            from
                scanner.requests
            {"where status = $1" if status is not None else ""}
            order by
                started_at,
                pk
            """,
            *([status] if status is not None else []),
        )
        return TypeAdapter(list[RequestRet]).validate_python(ret)