Mirror of https://github.com/zoriya/Kyoo.git
Compare commits: 9 commits on renovate/p... (head 12fe7c157f)

Commits:
12fe7c157f
c29ad99ca0
a99f29074c
f449a0878a
097985ab6d
11c300ecf7
1e975ce238
b39fa4262d
d7699389bc
@@ -38,7 +38,7 @@ PUBLIC_URL=http://localhost:8901
# Set `verified` to true if you don't wanna manually verify users.
EXTRA_CLAIMS='{"permissions": ["core.read", "core.play"], "verified": false}'
# This is the permissions of the first user (aka the first user is admin)
FIRST_USER_CLAIMS='{"permissions": ["users.read", "users.write", "apikeys.read", "apikeys.write", "users.delete", "core.read", "core.write", "core.play", "scanner.trigger"], "verified": true}'
FIRST_USER_CLAIMS='{"permissions": ["users.read", "users.write", "users.delete", "apikeys.read", "apikeys.write", "core.read", "core.write", "core.play", "scanner.trigger"], "verified": true}'

# Guest (meaning unlogged in users) can be:
# unauthorized (they need to connect before doing anything)
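The two FIRST_USER_CLAIMS lines differ only in the ordering of the permission strings; both values must parse as JSON at startup. A minimal pre-flight check for these variables, assuming only that they hold a JSON object with a `permissions` list and a `verified` flag (the validation rules themselves are illustrative, not Kyoo's own):

```python
# Sketch: sanity-check the claims JSON read from the environment.
# EXTRA_CLAIMS / FIRST_USER_CLAIMS come from the .env snippet above;
# the checks below are illustrative assumptions, not Kyoo's validation.
import json
import os


def check_claims(var: str) -> dict:
    claims = json.loads(os.environ.get(var, "{}"))  # JSONDecodeError (a ValueError) if malformed
    if not isinstance(claims, dict):
        raise ValueError(f"{var}: expected a JSON object")
    perms = claims.get("permissions", [])
    if not isinstance(perms, list) or not all(isinstance(p, str) for p in perms):
        raise ValueError(f"{var}: 'permissions' must be a list of strings")
    if not isinstance(claims.get("verified", False), bool):
        raise ValueError(f"{var}: 'verified' must be a boolean")
    return claims


for var in ("EXTRA_CLAIMS", "FIRST_USER_CLAIMS"):
    print(var, check_claims(var))
```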
.github/workflows/auth-hurl.yml (2)
@@ -15,7 +15,7 @@ jobs:
    postgres:
      image: postgres:15
      ports:
        - "5432:5432"
        - "5432:5432"
      env:
        POSTGRES_USER: kyoo
        POSTGRES_PASSWORD: password
.github/workflows/coding-style.yml (2)
@@ -37,7 +37,7 @@ jobs:
        run: biome ci .

  scanner:
    name: "Lint scanner/autosync"
    name: "Lint scanner"
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v6
.github/workflows/docker.yml (5)
@@ -34,11 +34,6 @@ jobs:
          label: scanner
          image: ${{ github.repository_owner }}/kyoo_scanner

        - context: ./autosync
          dockerfile: Dockerfile
          label: autosync
          image: ${{ github.repository_owner }}/kyoo_autosync

        - context: ./transcoder
          dockerfile: Dockerfile
          label: transcoder
.github/workflows/native-build.yml (2)
@@ -2,7 +2,7 @@ name: Native build
on:
  push:
    tags:
      - v*
      - v*

jobs:
  update:
.github/workflows/release.yml (2)
@@ -2,7 +2,7 @@ name: Release
on:
  push:
    tags:
      - v*
      - v*

jobs:
  update:
.gitignore (2)
@@ -7,6 +7,4 @@
log.html
output.xml
report.html
chart/charts
chart/Chart.lock
tmp
@@ -211,6 +211,7 @@ func (h *Handler) createApiJwt(apikey string) (string, error) {
	Time: time.Now().UTC().Add(time.Hour),
}
jwt := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
jwt.Header["kid"] = h.config.JwtKid
t, err := jwt.SignedString(h.config.JwtPrivateKey)
if err != nil {
	return "", err
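The added jwt.Header["kid"] assignment stamps the signing key id into the token header so verifiers can pick the matching public key out of a JWKS document. A hypothetical Python sketch of the same idea using PyJWT; the key, claims, and kid value are placeholders, not Kyoo's:

```python
# Hypothetical illustration with PyJWT: set a `kid` header when signing,
# playing the same role as jwt.Header["kid"] in the Go hunk above.
import jwt  # PyJWT
from cryptography.hazmat.primitives.asymmetric import rsa

private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

token = jwt.encode(
    {"sub": "apikey", "iss": "kyoo"},  # illustrative claims
    private_key,
    algorithm="RS256",
    headers={"kid": "my-key-id"},
)
# The unverified header includes the alg and the kid set above.
print(jwt.get_unverified_header(token))
```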
chart/.gitignore (new file, 2)
@@ -0,0 +1,2 @@
charts
chart/Chart.lock (new file, 6)
@@ -0,0 +1,6 @@
dependencies:
- name: postgres
  repository: oci://registry-1.docker.io/cloudpirates
  version: 0.12.4
digest: sha256:e486b44703c7a97eee25f7715ab040d197d79c41ea1c422ae009b1f68985f544
generated: "2025-12-01T20:17:25.152279487+01:00"
@@ -12,4 +12,4 @@ dependencies:
- condition: postgres.enabled
  name: postgres
  repository: oci://registry-1.docker.io/cloudpirates
  version: 0.12.0
  version: 0.12.4
@@ -469,7 +469,7 @@ postgres:
    existingSecret: "{{ .Values.global.postgres.infra.existingSecret }}"
    secretKeys:
      # set the postgres user password to the same as our user
      passwordKey: "{{ .Values.global.postgres.infra.passwordKey }}"
      adminPasswordKey: "{{ .Values.global.postgres.infra.passwordKey }}"
  initdb:
    scripts:
      kyoo_api.sql: |
@@ -20,5 +20,5 @@ create table scanner.requests(
	status scanner.request_status not null default 'pending',
	started_at timestamptz,
	created_at timestamptz not null default now()::timestamptz,
	constraint unique_kty unique(kind, title, year)
	constraint unique_kty unique nulls not distinct (kind, title, year)
);
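With `unique nulls not distinct` (available since PostgreSQL 15), two rows that share kind and title but carry a NULL year now collide on the constraint, so repeated requests without a year can be deduplicated at insert time. A minimal asyncpg sketch of that effect; the DSN is a placeholder and the remaining columns of scanner.requests are assumed to have defaults:

```python
# Sketch: show that NULL years now participate in the unique_kty constraint.
import asyncio
import asyncpg


async def main() -> None:
    db = await asyncpg.connect("postgresql://kyoo:password@localhost:5432/kyoo")
    query = """
        insert into scanner.requests(kind, title, year)
        values ($1, $2, $3)
        on conflict on constraint unique_kty do nothing
    """
    await db.execute(query, "movie", "Inception", None)
    # The second insert with the same (kind, title, NULL) is now a no-op;
    # with the old plain unique constraint both rows would have been kept.
    await db.execute(query, "movie", "Inception", None)
    await db.close()


asyncio.run(main())
```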
@@ -5,13 +5,15 @@ from fastapi import FastAPI

from scanner.client import KyooClient
from scanner.fsscan import FsScanner
from scanner.otel import instrument
from scanner.log import configure_logging
from scanner.otel import setup_otelproviders, instrument
from scanner.providers.composite import CompositeProvider
from scanner.providers.themoviedatabase import TheMovieDatabase
from scanner.requests import RequestCreator, RequestProcessor

from .database import get_db, init_pool, migrate
from .routers.routes import router
from .routers.health import router as health_router


@asynccontextmanager
@@ -68,4 +70,7 @@ app = FastAPI(
    lifespan=lifespan,
)
app.include_router(router)
app.include_router(health_router)
configure_logging()
setup_otelproviders()
instrument(app)
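The new wiring mounts the health router and then runs configure_logging(), setup_otelproviders(), and instrument(app) before the app serves traffic. A quick sketch of poking the resulting app with FastAPI's TestClient, assuming the module shown above is importable as `scanner` and exposes `app` (the lifespan, and with it the database setup, is deliberately not started):

```python
# Sketch: exercise the composed app without running its lifespan.
from fastapi.testclient import TestClient

from scanner import app  # assumption: the module above is the `scanner` package

client = TestClient(app)  # no `with` block, so lifespan/DB setup never runs
print(client.get("/health").json())  # {"status": "healthy"}
print(client.get("/ready").json())   # {"status": "healthy"}
```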
scanner/scanner/log.py (new file, 32)
@@ -0,0 +1,32 @@
import logging
import os
import sys

from opentelemetry.sdk._logs import LoggingHandler


def configure_logging():
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    logging.getLogger("watchfiles").setLevel(logging.WARNING)
    logging.getLogger("rebulk").setLevel(logging.WARNING)

    # Add stdout handler
    stdout_handler = logging.StreamHandler(sys.stdout)
    # set logging level via STDOUT_LOG_LEVEL env var or default to INFO
    stdout_handler.setLevel(
        getattr(logging, os.getenv("STDOUT_LOG_LEVEL", "INFO").upper())
    )
    stdout_handler.setFormatter(
        logging.Formatter(
            fmt="[{levelname}][{name}] {message}",
            style="{",
        )
    )
    root_logger.addHandler(stdout_handler)

    # Add OpenTelemetry handler
    # set logging level via OTEL_LOG_LEVEL env var
    # https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration
    root_logger.addHandler(LoggingHandler())
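A short usage sketch for the new module: STDOUT_LOG_LEVEL is the env var read above, while the logger name and message are arbitrary:

```python
# Sketch: drive configure_logging() and observe the stdout format.
import logging
import os

os.environ.setdefault("STDOUT_LOG_LEVEL", "DEBUG")  # read by configure_logging()

from scanner.log import configure_logging  # module added in this diff

configure_logging()
# Prints "[INFO][scanner.demo] hello"; the OTel handler is a no-op until
# a logger provider has been installed (see setup_otelproviders below).
logging.getLogger("scanner.demo").info("hello")
```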
@@ -1,4 +1,5 @@
from __future__ import annotations
from datetime import datetime
from typing import Literal

from pydantic import Field
@@ -18,3 +19,16 @@ class Request(Model, extra="allow"):
    class Video(Model):
        id: str
        episodes: list[Guess.Episode]


class RequestRet(Model):
    id: str
    kind: Literal["episode", "movie"]
    title: str
    year: int | None
    status: Literal[
        "pending",
        "running",
        "failed",
    ]
    started_at: datetime | None
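RequestRet mirrors one row of scanner.requests and is what the new status endpoint returns. A minimal sketch of validating row-shaped dicts into it, the same way status.py does further down; the sample values are made up:

```python
# Sketch: validate a row-shaped dict into the RequestRet model above.
from pydantic import TypeAdapter

from scanner.models.request import RequestRet  # model added in this diff

rows = [
    {
        "id": "1",
        "kind": "movie",
        "title": "Inception",
        "year": None,
        "status": "pending",
        "started_at": None,
    },
]
print(TypeAdapter(list[RequestRet]).validate_python(rows))
```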
@@ -1,81 +1,77 @@
import logging
import os
import sys

from fastapi import FastAPI
from opentelemetry import metrics, trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
    OTLPLogExporter as GrpcLogExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
    OTLPMetricExporter as GrpcMetricExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
    OTLPSpanExporter as GrpcSpanExporter,
)
from opentelemetry.exporter.otlp.proto.http._log_exporter import (
    OTLPLogExporter as HttpLogExporter,
)
from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
    OTLPMetricExporter as HttpMetricExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter as HttpSpanExporter,
)
from opentelemetry import trace, metrics, _logs
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

logger = logging.getLogger(__name__)


def setup_otelproviders() -> tuple[object, object, object]:
    import os

    if not (os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "").strip()):
        logger.info(
            "OTEL_EXPORTER_OTLP_ENDPOINT not specified, skipping otel provider setup."
        )
        return None, None, None

    # choose exporters (grpc vs http) ...
    if os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL", "").lower().strip() == "grpc":
        from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
        from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
            OTLPMetricExporter,
        )
        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
            OTLPSpanExporter,
        )

        logger.info("Using gRPC libs for OpenTelemetry exporter.")

    else:
        from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
        from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
            OTLPMetricExporter,
        )
        from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
            OTLPSpanExporter,
        )

        logger.info("Using HTTP libs for OpenTelemetry exporter.")

    resource = Resource.create(
        {"service.name": os.getenv("OTEL_SERVICE_NAME", "kyoo.scanner")}
    )

    # Traces
    tracer_provider = TracerProvider(resource=resource)
    tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
    trace.set_tracer_provider(tracer_provider)

    # Metrics
    meter_provider = MeterProvider(
        resource=resource,
        metric_readers=[PeriodicExportingMetricReader(OTLPMetricExporter())],
    )
    metrics.set_meter_provider(meter_provider)

    # Logs — install logger provider + processor/exporter
    logger_provider = LoggerProvider(resource=resource)
    logger_provider.add_log_record_processor(BatchLogRecordProcessor(OTLPLogExporter()))
    _logs.set_logger_provider(logger_provider)

    return tracer_provider, meter_provider, logger_provider


def instrument(app: FastAPI):
    proto = os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf")
    resource = Resource.create(attributes={SERVICE_NAME: "kyoo.scanner"})

    provider = LoggerProvider(resource=resource)
    provider.add_log_record_processor(
        BatchLogRecordProcessor(
            HttpLogExporter() if proto == "http/protobuf" else GrpcLogExporter()
        )
    )
    set_logger_provider(provider)
    logging.basicConfig(
        handlers=[
            LoggingHandler(level=logging.DEBUG, logger_provider=provider),
            logging.StreamHandler(sys.stdout),
        ],
        level=logging.DEBUG,
    )
    logging.getLogger("watchfiles").setLevel(logging.WARNING)
    logging.getLogger("rebulk").setLevel(logging.WARNING)

    provider = TracerProvider(resource=resource)
    provider.add_span_processor(
        BatchSpanProcessor(
            HttpSpanExporter() if proto == "http/protobuf" else GrpcSpanExporter()
        )
    )
    trace.set_tracer_provider(provider)

    provider = MeterProvider(
        metric_readers=[
            PeriodicExportingMetricReader(
                HttpMetricExporter()
                if proto == "http/protobuf"
                else GrpcMetricExporter()
            )
        ],
        resource=resource,
    )
    metrics.set_meter_provider(provider)

    FastAPIInstrumentor.instrument_app(
        app,
        http_capture_headers_server_request=[".*"],
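Once setup_otelproviders() has installed the global tracer, meter, and logger providers, application code only needs the opentelemetry API. A small sketch of emitting a span and a counter afterwards; the instrument, span, and metric names are illustrative:

```python
# Sketch: emit a span and a counter through the globally installed providers.
from opentelemetry import metrics, trace

from scanner.otel import setup_otelproviders  # function shown in the hunk above

setup_otelproviders()  # no-op unless OTEL_EXPORTER_OTLP_ENDPOINT is set

tracer = trace.get_tracer("scanner.demo")
meter = metrics.get_meter("scanner.demo")
scan_counter = meter.create_counter("demo.scans")

with tracer.start_as_current_span("demo.scan"):
    scan_counter.add(1, {"kind": "movie"})
```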
scanner/scanner/routers/health.py (new file, 15)
@@ -0,0 +1,15 @@
from fastapi import APIRouter

router = APIRouter()


@router.get("/health")
def get_health():
    return {"status": "healthy"}


@router.get("/ready")
def get_ready():
    # child spans (`select 1` & db connection reset) was still logged,
    # since i don't really wanna deal with it, let's just do that.
    return {"status": "healthy"}
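Both probes are unauthenticated and return a constant body, which makes them easy targets for container health checks. A quick sketch of polling them with httpx; the address is a placeholder:

```python
# Sketch: poll the liveness/readiness endpoints added above.
import httpx

base = "http://localhost:8000"  # placeholder; use the scanner's actual address
print(httpx.get(f"{base}/health").json())  # {"status": "healthy"}
print(httpx.get(f"{base}/ready").json())   # {"status": "healthy"}
```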
@@ -1,9 +1,9 @@
from typing import Annotated
from typing import Annotated, Literal

from asyncpg import Connection
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Security
from fastapi import APIRouter, BackgroundTasks, Depends, Security

from scanner.database import get_db_fapi
from scanner.models.request import RequestRet
from scanner.status import StatusService

from ..fsscan import create_scanner
from ..jwt import validate_bearer
@@ -11,6 +11,19 @@ from ..jwt import validate_bearer
router = APIRouter()


@router.get("/scan")
async def get_scan_status(
    svc: Annotated[StatusService, Depends(StatusService.create)],
    _: Annotated[None, Security(validate_bearer, scopes=["scanner.trigger"])],
    status: Literal["pending", "running", "failed"] | None = None,
) -> list[RequestRet]:
    """
    Get scan status, know what tasks are running, pending or failed.
    """

    return await svc.list_requests(status=status)


@router.put(
    "/scan",
    status_code=204,
@@ -29,25 +42,3 @@ async def trigger_scan(
        await scanner.scan()

    tasks.add_task(run)


@router.get("/health")
def get_health():
    return {"status": "healthy"}


@router.get("/ready")
def get_ready():
    # child spans (`select 1` & db connection reset) was still logged,
    # since i don't really wanna deal with it, let's just do that.
    return {"status": "healthy"}


# async def get_ready(db: Annotated[Connection, Depends(get_db_fapi)]):
#     try:
#         _ = await db.execute("select 1")
#         return {"status": "healthy", "database": "healthy"}
#     except Exception as e:
#         raise HTTPException(
#             status_code=500, detail={"status": "unhealthy", "database": str(e)}
#         )
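The new GET /scan endpoint requires a bearer token carrying the scanner.trigger scope and accepts an optional status query parameter. A sketch of calling it with httpx; the address and token are placeholders:

```python
# Sketch: list scan requests through the new endpoint, filtered to failures.
import httpx

resp = httpx.get(
    "http://localhost:8000/scan",  # placeholder scanner address
    params={"status": "failed"},   # optional: pending | running | failed
    headers={"Authorization": "Bearer <token with scanner.trigger scope>"},
)
resp.raise_for_status()
for req in resp.json():
    print(req)
```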
scanner/scanner/status.py (new file, 41)
@@ -0,0 +1,41 @@
from typing import Literal

from asyncpg import Connection
from pydantic import TypeAdapter

from scanner.database import get_db

from .models.request import RequestRet


class StatusService:
    def __init__(self, database: Connection):
        self._database = database

    @classmethod
    async def create(cls):
        async with get_db() as db:
            yield StatusService(db)

    async def list_requests(
        self, *, status: Literal["pending", "running", "failed"] | None = None
    ) -> list[RequestRet]:
        ret = await self._database.fetch(
            f"""
            select
                pk::text as id,
                kind,
                title,
                year,
                status,
                started_at
            from
                scanner.requests
            order by
                started_at,
                pk
            {"where status = $1" if status is not None else ""}
            """,
            *([status] if status is not None else []),
        )
        return TypeAdapter(list[RequestRet]).validate_python(ret)
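As shown above, the optional status filter is interpolated after the order by clause, but standard SQL requires where to precede order by, so the filtered variant would fail. A corrected sketch of the same parameterized query with the clause moved; the table and columns are those from the migration earlier in this diff:

```python
# Sketch: the same listing query with the filter placed before `order by`,
# which is where SQL requires it.
from typing import Literal

from asyncpg import Connection


async def list_requests(
    db: Connection,
    status: Literal["pending", "running", "failed"] | None = None,
):
    where = "where status = $1" if status is not None else ""
    args = [status] if status is not None else []
    return await db.fetch(
        f"""
        select pk::text as id, kind, title, year, status, started_at
        from scanner.requests
        {where}
        order by started_at, pk
        """,
        *args,
    )
```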