From 097985ab6d1d1891258a72ac3998cc5324b96cab Mon Sep 17 00:00:00 2001 From: acelinkio <31336038+acelinkio@users.noreply.github.com> Date: Mon, 1 Dec 2025 14:50:28 -0800 Subject: [PATCH] scanner: refactor otel integration (#1194) --- scanner/scanner/__init__.py | 5 +- scanner/scanner/log.py | 32 +++++++++ scanner/scanner/otel.py | 136 +++++++++++++++++------------------- 3 files changed, 102 insertions(+), 71 deletions(-) create mode 100644 scanner/scanner/log.py diff --git a/scanner/scanner/__init__.py b/scanner/scanner/__init__.py index e50accd9..bc26c521 100644 --- a/scanner/scanner/__init__.py +++ b/scanner/scanner/__init__.py @@ -5,7 +5,8 @@ from fastapi import FastAPI from scanner.client import KyooClient from scanner.fsscan import FsScanner -from scanner.otel import instrument +from scanner.log import configure_logging +from scanner.otel import setup_otelproviders, instrument from scanner.providers.composite import CompositeProvider from scanner.providers.themoviedatabase import TheMovieDatabase from scanner.requests import RequestCreator, RequestProcessor @@ -68,4 +69,6 @@ app = FastAPI( lifespan=lifespan, ) app.include_router(router) +configure_logging() +setup_otelproviders() instrument(app) diff --git a/scanner/scanner/log.py b/scanner/scanner/log.py new file mode 100644 index 00000000..02011720 --- /dev/null +++ b/scanner/scanner/log.py @@ -0,0 +1,32 @@ +import logging +import os +import sys + +from opentelemetry.sdk._logs import LoggingHandler + + +def configure_logging(): + root_logger = logging.getLogger() + root_logger.setLevel(logging.DEBUG) + + logging.getLogger("watchfiles").setLevel(logging.WARNING) + logging.getLogger("rebulk").setLevel(logging.WARNING) + + # Add stdout handler + stdout_handler = logging.StreamHandler(sys.stdout) + # set logging level via STDOUT_LOG_LEVEL env var or default to INFO + stdout_handler.setLevel( + getattr(logging, os.getenv("STDOUT_LOG_LEVEL", "INFO").upper()) + ) + stdout_handler.setFormatter( + logging.Formatter( + fmt="[{levelname}][{name}] {message}", + style="{", + ) + ) + root_logger.addHandler(stdout_handler) + + # Add OpenTelemetry handler + # set logging level via OTEL_LOG_LEVEL env var + # https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#general-sdk-configuration + root_logger.addHandler(LoggingHandler()) diff --git a/scanner/scanner/otel.py b/scanner/scanner/otel.py index 41e39a87..dc689659 100644 --- a/scanner/scanner/otel.py +++ b/scanner/scanner/otel.py @@ -1,81 +1,77 @@ import logging -import os -import sys - from fastapi import FastAPI -from opentelemetry import metrics, trace -from opentelemetry._logs import set_logger_provider -from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( - OTLPLogExporter as GrpcLogExporter, -) -from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( - OTLPMetricExporter as GrpcMetricExporter, -) -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( - OTLPSpanExporter as GrpcSpanExporter, -) -from opentelemetry.exporter.otlp.proto.http._log_exporter import ( - OTLPLogExporter as HttpLogExporter, -) -from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( - OTLPMetricExporter as HttpMetricExporter, -) -from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( - OTLPSpanExporter as HttpSpanExporter, -) +from opentelemetry import trace, metrics, _logs +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from opentelemetry.sdk.resources import Resource from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor -from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader -from opentelemetry.sdk.resources import SERVICE_NAME, Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor + +logger = logging.getLogger(__name__) + + +def setup_otelproviders() -> tuple[object, object, object]: + import os + + if not (os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "").strip()): + logger.info( + "OTEL_EXPORTER_OTLP_ENDPOINT not specified, skipping otel provider setup." + ) + return None, None, None + + # choose exporters (grpc vs http) ... + if os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL", "").lower().strip() == "grpc": + from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter + from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter, + ) + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, + ) + + logger.info("Using gRPC libs for OpenTelemetry exporter.") + + else: + from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter + from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( + OTLPMetricExporter, + ) + from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( + OTLPSpanExporter, + ) + + logger.info("Using HTTP libs for OpenTelemetry exporter.") + + resource = Resource.create( + {"service.name": os.getenv("OTEL_SERVICE_NAME", "kyoo.scanner")} + ) + + # Traces + tracer_provider = TracerProvider(resource=resource) + tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) + trace.set_tracer_provider(tracer_provider) + + # Metrics + meter_provider = MeterProvider( + resource=resource, + metric_readers=[PeriodicExportingMetricReader(OTLPMetricExporter())], + ) + metrics.set_meter_provider(meter_provider) + + # Logs — install logger provider + processor/exporter + logger_provider = LoggerProvider(resource=resource) + logger_provider.add_log_record_processor(BatchLogRecordProcessor(OTLPLogExporter())) + _logs.set_logger_provider(logger_provider) + + return tracer_provider, meter_provider, logger_provider def instrument(app: FastAPI): - proto = os.getenv("OTEL_EXPORTER_OTLP_PROTOCOL", "http/protobuf") - resource = Resource.create(attributes={SERVICE_NAME: "kyoo.scanner"}) - - provider = LoggerProvider(resource=resource) - provider.add_log_record_processor( - BatchLogRecordProcessor( - HttpLogExporter() if proto == "http/protobuf" else GrpcLogExporter() - ) - ) - set_logger_provider(provider) - logging.basicConfig( - handlers=[ - LoggingHandler(level=logging.DEBUG, logger_provider=provider), - logging.StreamHandler(sys.stdout), - ], - level=logging.DEBUG, - ) - logging.getLogger("watchfiles").setLevel(logging.WARNING) - logging.getLogger("rebulk").setLevel(logging.WARNING) - - provider = TracerProvider(resource=resource) - provider.add_span_processor( - BatchSpanProcessor( - HttpSpanExporter() if proto == "http/protobuf" else GrpcSpanExporter() - ) - ) - trace.set_tracer_provider(provider) - - provider = MeterProvider( - metric_readers=[ - PeriodicExportingMetricReader( - HttpMetricExporter() - if proto == "http/protobuf" - else GrpcMetricExporter() - ) - ], - resource=resource, - ) - metrics.set_meter_provider(provider) - FastAPIInstrumentor.instrument_app( app, http_capture_headers_server_request=[".*"],