Loading...
Loading...
Implements OpenTelemetry (OTEL) logging with trace context correlation and structured logging. Use when setting up production logging with OTEL exporters, structlog/loguru integration, trace context propagation, and comprehensive test patterns. Covers Python implementations for FastAPI, Kafka consumers, and background jobs. Includes OTLP, Jaeger, and console exporters.
npx skill4agent add dawiddutoit/custom-claude otel-logging-patterns

from app.core.monitoring.otel_logger import initialize_otel_logger
# Call once at app startup
initialize_otel_logger(
log_level="INFO",
enable_console=True,
enable_otlp=True,
otlp_endpoint="localhost:4317"
)

from app.core.monitoring.otel_logger import logger, get_tracer
# Module-level initialization
logger = logger(__name__)
tracer = get_tracer(__name__)

from app.core.monitoring.otel_logger import trace_span, logger
logger = logger(__name__)
# Logs automatically include trace_id and span_id
with trace_span("process_order", order_id="12345") as span:
logger.info("processing_order", order_id="12345")
# Do work...
logger.info("order_processed", result_count=5)

app/core/monitoring/otel_logger.py

# app/shared/otel_config.py
from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import ConsoleLogExporter, SimpleLogRecordProcessor
from opentelemetry.sdk.resources import Resource


def setup_console_logging(service_name: str) -> LoggerProvider:
    """Set up OTEL logging with a console exporter.

    Args:
        service_name: Value recorded as the ``service.name`` resource attribute.

    Returns:
        The provider that was created and registered as the global
        logger provider.
    """
    resource = Resource.create({"service.name": service_name})
    logger_provider = LoggerProvider(resource=resource)

    # A provider takes log record *processors*; the processor wraps the
    # exporter. There is no ``SimpleLogRecordExporter`` class.
    logger_provider.add_log_record_processor(
        SimpleLogRecordProcessor(ConsoleLogExporter())
    )

    set_logger_provider(logger_provider)
    return logger_provider


# app/shared/logging_setup.py
import structlog
from opentelemetry.instrumentation.logging import LoggingInstrumentor
def _add_trace_context(logger, method_name, event_dict):
    """structlog processor: copy the active span's IDs into the event dict.

    Adds ``trace_id`` (32 hex chars) and ``span_id`` (16 hex chars) when a
    valid span context is current; otherwise leaves the event untouched.
    """
    # Function-scope import so this helper does not depend on a new
    # module-level import.
    from opentelemetry import trace

    span_context = trace.get_current_span().get_span_context()
    if span_context.is_valid:
        event_dict["trace_id"] = format(span_context.trace_id, "032x")
        event_dict["span_id"] = format(span_context.span_id, "016x")
    return event_dict


def setup_structlog() -> None:
    """Configure structlog with OTEL trace context integration.

    Events are rendered as JSON and handed to standard-library loggers.
    """
    # Enable OTEL logging instrumentation for the standard library loggers.
    LoggingInstrumentor().instrument()

    # Configure structlog
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            # Without this, nothing places trace_id/span_id into the
            # structlog event dict that JSONRenderer serialises.
            _add_trace_context,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.ExceptionRenderer(),
            structlog.processors.JSONRenderer(),
        ],
        # The stdlib-backed factory lives in ``structlog.stdlib``;
        # ``structlog.logging`` is not a structlog module.
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )
def get_logger(name: str):
    """Get a logger instance with trace context support.

    Args:
        name: Logger name, typically the calling module's ``__name__``.

    Returns:
        A structlog logger created with the globally configured settings.
    """
    # structlog's public factory is ``get_logger``; the package exposes no
    # ``logger`` callable, so the previous call raised AttributeError.
    return structlog.get_logger(name)


# app/shared/observability.py
from contextvars import ContextVar
from typing import Any

from opentelemetry import trace
# Context variables for request tracking
request_id_var: ContextVar[str | None] = ContextVar("request_id", default=None)
user_id_var: ContextVar[str | None] = ContextVar("user_id", default=None)
class ObservabilityContext:
"""Manage observability context (trace IDs, request IDs, user IDs)."""
@staticmethod
def set_request_id(request_id: str) -> None:
"""Set request ID in context."""
request_id_var.set(request_id)
@staticmethod
def get_tracer(name: str):
"""Get tracer instance."""
return trace.get_tracer(name)
@staticmethod
def set_span_attribute(key: str, value: any) -> None:
"""Set attribute on current span."""
span = trace.get_current_span()
if span.is_recording():
span.set_attribute(key, value)# app/config.py
from pydantic import Field
from pydantic_settings import BaseSettings
class Config(BaseSettings):
    """Settings that control OTEL logging and export."""

    # Whether OTEL logging/tracing is set up at all.
    otel_enabled: bool = True

    # Exporter selection: 'otlp', 'jaeger', or 'console'
    otel_exporter_type: str = "console"

    # Endpoint passed to the OTLP exporter.
    otel_otlp_endpoint: str = "localhost:4317"

    # Jaeger host and port.
    otel_jaeger_host: str = "localhost"
    otel_jaeger_port: int = 6831


# main.py
from app.config import Config
from app.shared.logging_setup import setup_structlog, get_logger
from app.shared.otel_config import OTELConfig
async def main() -> None:
    """Application entry point.

    Loads settings, configures structlog, sets up OTEL logging and tracing
    when ``otel_enabled`` is true, then logs ``service_started``.
    """
    settings = Config()
    setup_structlog()

    if settings.otel_enabled:
        telemetry = OTELConfig(
            service_name="my-service",
            exporter_type=settings.otel_exporter_type,
            otlp_endpoint=settings.otel_otlp_endpoint,
        )
        telemetry.setup_logging()
        telemetry.setup_tracing()

    startup_log = get_logger(__name__)
    startup_log.info("service_started")


# app/use_cases/extract_orders.py
from opentelemetry import trace
from app.shared.logging_setup import get_logger
class ExtractOrdersUseCase:
    """Use case with observability.

    Fetches every order from the gateway and publishes each one, inside a
    single ``extract_orders`` span with structured log events.
    """

    def __init__(self, gateway, publisher) -> None:
        """Store collaborators and create this module's logger and tracer.

        Args:
            gateway: Object providing an awaitable ``fetch_all_orders()``.
            publisher: Object providing an awaitable ``publish(order)``.
        """
        self.gateway = gateway
        self.publisher = publisher
        self.logger = get_logger(__name__)
        self.tracer = trace.get_tracer(__name__)

    async def execute(self) -> int:
        """Execute with tracing.

        Returns:
            The number of orders fetched and published.

        Raises:
            Exception: Any error from the gateway or publisher is logged as
                ``extraction_failed`` and re-raised unchanged.
        """
        with self.tracer.start_as_current_span("extract_orders") as span:
            self.logger.info("starting_extraction")
            try:
                orders = await self.gateway.fetch_all_orders()
                self.logger.info("orders_fetched", count=len(orders))
                for order in orders:
                    await self.publisher.publish(order)
                span.set_attribute("orders_processed", len(orders))
                self.logger.info("extraction_completed", total=len(orders))
                return len(orders)
            except Exception as e:
                self.logger.error("extraction_failed", error=str(e))
                # No explicit span.record_exception(e) here: the
                # start_as_current_span context manager records an
                # exception that escapes it by default, so recording it
                # here as well produced a duplicate exception event.
                raise


# app/error_handlers.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from app.shared.logging_setup import get_logger
from app.shared.observability import ObservabilityContext
logger = get_logger(__name__)
async def global_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    """Global exception handler with OTEL logging.

    Logs the unhandled exception with request details and returns a generic
    500 response, so the exception text is never sent to the client.

    Args:
        request: The request being processed when the exception was raised.
        exc: The unhandled exception.

    Returns:
        A 500 JSON response containing a fixed detail message and the
        request ID read from the observability context.
    """
    request_id = ObservabilityContext.get_request_id()
    logger.error(
        "unhandled_exception",
        error=str(exc),
        error_type=type(exc).__name__,
        request_id=request_id,
        path=request.url.path,
        method=request.method,
        # Pass the exception object rather than True: the stack trace is
        # then taken from ``exc`` itself and does not depend on an exception
        # being active in the frame that invoked this handler.
        exc_info=exc,
    )
    return JSONResponse(
        status_code=500,
        content={"detail": "Internal server error", "request_id": request_id},
    )
def setup_error_handlers(app: FastAPI) -> None:
"""Register error handlers."""
app.add_exception_handler(Exception, global_exception_handler)

opentelemetry-api>=1.22.0
opentelemetry-sdk>=1.22.0
opentelemetry-exporter-otlp>=0.43b0
opentelemetry-instrumentation-logging>=0.43b0
structlog>=23.2.0
pydantic>=2.5.0

from opentelemetry import trace
from app.shared.logging_setup import get_logger
logger = get_logger(__name__)
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("process_payment"):
logger.info("payment_started", customer_id="cust-123")
logger.info("payment_completed", amount=99.99)
# All logs have the same trace_id and span_id

from app.shared.observability import ObservabilityContext
ObservabilityContext.set_request_id("req-12345")
ObservabilityContext.set_user_id("user-456")
logger.info("user_action", action="login")
# Log includes request_id and user_id automatically

import pytest
from opentelemetry._logs import get_logger_provider, set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider


def test_logging_configuration():
    """Test OTEL logging provider setup."""
    logger_provider = LoggerProvider()
    assert logger_provider is not None

    # NOTE(review): the global provider is meant to be set once per process;
    # confirm no earlier test has already registered one.
    set_logger_provider(logger_provider)

    # Identity check: the registered provider must be this exact object.
    assert get_logger_provider() is logger_provider


@pytest.fixture
def in_memory_log_exporter():
    """In-memory exporter for testing.

    Returns an object that collects exported log records in ``records``.
    """
    from opentelemetry.sdk._logs.export import LogExportResult

    class InMemoryExporter:
        def __init__(self):
            self.records = []

        def export(self, batch):
            # SDK log record processors call ``export``; an exporter that
            # only offers ``emit`` never receives any records.
            self.records.extend(batch)
            return LogExportResult.SUCCESS

        # Kept so existing callers of ``emit`` continue to work.
        emit = export

        def force_flush(self, timeout_millis=30000):
            # Nothing is buffered, so there is nothing to flush.
            return True

        def shutdown(self):
            # No resources to release.
            pass

    return InMemoryExporter()
def test_trace_context_in_logs(in_memory_log_exporter):
"""Verify trace context is included in logs."""
# Setup logger and tracer
# Log within span
# Verify trace context in exported logs
pass

LoggingInstrumentor().instrument()
BatchLogRecordProcessor
SimpleLogRecordProcessor
setup_structlog()

| Resource | Purpose |
|---|---|
| 10+ complete examples covering FastAPI, Kafka, background jobs, testing |
| Performance tuning, custom exporters, trace propagation details |