logging-observability

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Logging & Observability

日志与可观测性(Logging & Observability)

Structured logging with correlation IDs and context propagation.
基于correlation IDs和context propagation的结构化日志方案。

When to Use This Skill

何时使用该方案

  • Need request tracing across distributed services
  • Want structured JSON logs for aggregation
  • Require performance timing for operations
  • Building background workers with metrics
  • 需要在分布式服务中进行请求追踪
  • 希望使用结构化JSON日志进行聚合
  • 需要对操作进行性能计时
  • 构建带有指标收集的后台Worker

Core Concepts

核心概念

Effective observability requires:
  1. Correlation IDs - Track requests across services
  2. Structured logging - JSON format for machine parsing
  3. Context propagation - Carry metadata through async calls
  4. Performance timing - Measure operation durations
有效的可观测性需要满足以下几点:
  1. Correlation IDs - 跨服务追踪请求
  2. Structured logging - 采用JSON格式以便机器解析
  3. Context propagation - 在异步调用中传递元数据
  4. Performance timing - 测量操作执行时长

Implementation

实现方案

Python

Python 实现

python
import contextvars
import uuid
import json
import logging
import time
import functools
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, Dict, Any, Callable, TypeVar
from contextlib import contextmanager
python
import contextvars
import uuid
import json
import logging
import time
import functools
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, Dict, Any, Callable, TypeVar
from contextlib import contextmanager

Context variable for request-scoped logging

Context variable for request-scoped logging

_logging_context: contextvars.ContextVar["LoggingContext"] = contextvars.ContextVar( "logging_context", default=None )
@dataclass class LoggingContext: """Request-scoped logging context.""" correlation_id: str = field(default_factory=lambda: str(uuid.uuid4())) user_id: Optional[str] = None request_path: Optional[str] = None request_method: Optional[str] = None service_name: str = "api" environment: str = "development" extra: Dict[str, Any] = field(default_factory=dict) start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
def to_dict(self) -> Dict[str, Any]:
    return {
        "correlation_id": self.correlation_id,
        "user_id": self.user_id,
        "request_path": self.request_path,
        "request_method": self.request_method,
        "service_name": self.service_name,
        "environment": self.environment,
        **self.extra,
    }

def with_extra(self, **kwargs) -> "LoggingContext":
    return LoggingContext(
        correlation_id=self.correlation_id,
        user_id=self.user_id,
        request_path=self.request_path,
        request_method=self.request_method,
        service_name=self.service_name,
        environment=self.environment,
        extra={**self.extra, **kwargs},
        start_time=self.start_time,
    )
def get_logging_context() -> Optional[LoggingContext]: return _logging_context.get()
def set_logging_context(context: LoggingContext) -> contextvars.Token: return _logging_context.set(context)
def reset_logging_context(token: contextvars.Token) -> None: _logging_context.reset(token)
class StructuredJsonFormatter(logging.Formatter): """JSON log formatter with context injection."""
def format(self, record: logging.LogRecord) -> str:
    log_entry: Dict[str, Any] = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "level": record.levelname,
        "logger": record.name,
        "message": record.getMessage(),
    }
    
    context = get_logging_context()
    if context:
        log_entry.update(context.to_dict())
    
    if record.exc_info:
        import traceback
        log_entry["exception"] = {
            "type": record.exc_info[0].__name__ if record.exc_info[0] else None,
            "message": str(record.exc_info[1]) if record.exc_info[1] else None,
            "traceback": traceback.format_exception(*record.exc_info),
        }
    
    # Add extra fields
    skip_keys = {
        "name", "msg", "args", "created", "filename", "funcName",
        "levelname", "levelno", "lineno", "module", "msecs",
        "pathname", "process", "processName", "relativeCreated",
        "stack_info", "exc_info", "exc_text", "thread", "threadName", "message",
    }
    for key, value in record.__dict__.items():
        if key not in skip_keys:
            log_entry[key] = value
    
    return json.dumps(log_entry, default=str)
def configure_logging( level: str = "INFO", service_name: str = "api", environment: str = "development", ) -> None: """Configure structured logging.""" formatter = StructuredJsonFormatter()
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, level.upper()))

for handler in root_logger.handlers[:]:
    root_logger.removeHandler(handler)

handler = logging.StreamHandler()
handler.setFormatter(formatter)
root_logger.addHandler(handler)

set_logging_context(LoggingContext(
    service_name=service_name,
    environment=environment,
))
class ContextLogger: """Logger wrapper that includes context automatically."""
def __init__(self, name: str):
    self._logger = logging.getLogger(name)

def _log(self, level: int, message: str, exc_info: bool = False, **kwargs: Any) -> None:
    context = get_logging_context()
    extra = {}
    if context:
        extra.update(context.to_dict())
    extra.update(kwargs)
    self._logger.log(level, message, exc_info=exc_info, extra=extra)

def debug(self, message: str, **kwargs: Any) -> None:
    self._log(logging.DEBUG, message, **kwargs)

def info(self, message: str, **kwargs: Any) -> None:
    self._log(logging.INFO, message, **kwargs)

def warning(self, message: str, **kwargs: Any) -> None:
    self._log(logging.WARNING, message, **kwargs)

def error(self, message: str, exc_info: bool = False, **kwargs: Any) -> None:
    self._log(logging.ERROR, message, exc_info=exc_info, **kwargs)

def exception(self, message: str, **kwargs: Any) -> None:
    self._log(logging.ERROR, message, exc_info=True, **kwargs)
def get_logger(name: str) -> ContextLogger: return ContextLogger(name)
_logging_context: contextvars.ContextVar["LoggingContext"] = contextvars.ContextVar( "logging_context", default=None )
@dataclass class LoggingContext: """Request-scoped logging context.""" correlation_id: str = field(default_factory=lambda: str(uuid.uuid4())) user_id: Optional[str] = None request_path: Optional[str] = None request_method: Optional[str] = None service_name: str = "api" environment: str = "development" extra: Dict[str, Any] = field(default_factory=dict) start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
def to_dict(self) -> Dict[str, Any]:
    return {
        "correlation_id": self.correlation_id,
        "user_id": self.user_id,
        "request_path": self.request_path,
        "request_method": self.request_method,
        "service_name": self.service_name,
        "environment": self.environment,
        **self.extra,
    }

def with_extra(self, **kwargs) -> "LoggingContext":
    return LoggingContext(
        correlation_id=self.correlation_id,
        user_id=self.user_id,
        request_path=self.request_path,
        request_method=self.request_method,
        service_name=self.service_name,
        environment=self.environment,
        extra={**self.extra, **kwargs},
        start_time=self.start_time,
    )
def get_logging_context() -> Optional[LoggingContext]: return _logging_context.get()
def set_logging_context(context: LoggingContext) -> contextvars.Token: return _logging_context.set(context)
def reset_logging_context(token: contextvars.Token) -> None: _logging_context.reset(token)
class StructuredJsonFormatter(logging.Formatter): """JSON log formatter with context injection."""
def format(self, record: logging.LogRecord) -> str:
    log_entry: Dict[str, Any] = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "level": record.levelname,
        "logger": record.name,
        "message": record.getMessage(),
    }
    
    context = get_logging_context()
    if context:
        log_entry.update(context.to_dict())
    
    if record.exc_info:
        import traceback
        log_entry["exception"] = {
            "type": record.exc_info[0].__name__ if record.exc_info[0] else None,
            "message": str(record.exc_info[1]) if record.exc_info[1] else None,
            "traceback": traceback.format_exception(*record.exc_info),
        }
    
    # Add extra fields
    skip_keys = {
        "name", "msg", "args", "created", "filename", "funcName",
        "levelname", "levelno", "lineno", "module", "msecs",
        "pathname", "process", "processName", "relativeCreated",
        "stack_info", "exc_info", "exc_text", "thread", "threadName", "message",
    }
    for key, value in record.__dict__.items():
        if key not in skip_keys:
            log_entry[key] = value
    
    return json.dumps(log_entry, default=str)
def configure_logging( level: str = "INFO", service_name: str = "api", environment: str = "development", ) -> None: """Configure structured logging.""" formatter = StructuredJsonFormatter()
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, level.upper()))

for handler in root_logger.handlers[:]:
    root_logger.removeHandler(handler)

handler = logging.StreamHandler()
handler.setFormatter(formatter)
root_logger.addHandler(handler)

set_logging_context(LoggingContext(
    service_name=service_name,
    environment=environment,
))
class ContextLogger: """Logger wrapper that includes context automatically."""
def __init__(self, name: str):
    self._logger = logging.getLogger(name)

def _log(self, level: int, message: str, exc_info: bool = False, **kwargs: Any) -> None:
    context = get_logging_context()
    extra = {}
    if context:
        extra.update(context.to_dict())
    extra.update(kwargs)
    self._logger.log(level, message, exc_info=exc_info, extra=extra)

def debug(self, message: str, **kwargs: Any) -> None:
    self._log(logging.DEBUG, message, **kwargs)

def info(self, message: str, **kwargs: Any) -> None:
    self._log(logging.INFO, message, **kwargs)

def warning(self, message: str, **kwargs: Any) -> None:
    self._log(logging.WARNING, message, **kwargs)

def error(self, message: str, exc_info: bool = False, **kwargs: Any) -> None:
    self._log(logging.ERROR, message, exc_info=exc_info, **kwargs)

def exception(self, message: str, **kwargs: Any) -> None:
    self._log(logging.ERROR, message, exc_info=True, **kwargs)
def get_logger(name: str) -> ContextLogger: return ContextLogger(name)

Performance timing decorator

Performance timing decorator

F = TypeVar("F", bound=Callable[..., Any])
def timed( operation_name: str = None, log_args: bool = False, threshold_ms: float = None, ) -> Callable[[F], F]: """Decorator to log function execution time.""" def decorator(func: F) -> F: name = operation_name or func.name logger = get_logger(func.module)
    @functools.wraps(func)
    async def async_wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            result = await func(*args, **kwargs)
            duration_ms = (time.perf_counter() - start) * 1000
            
            if threshold_ms is None or duration_ms >= threshold_ms:
                log_kwargs = {"operation": name, "duration_ms": round(duration_ms, 2)}
                if log_args:
                    log_kwargs["args"] = str(args)[:200]
                logger.info(f"{name} completed", **log_kwargs)
            
            return result
        except Exception as e:
            duration_ms = (time.perf_counter() - start) * 1000
            logger.error(f"{name} failed", operation=name, duration_ms=round(duration_ms, 2), exc_info=True)
            raise
    
    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
            duration_ms = (time.perf_counter() - start) * 1000
            
            if threshold_ms is None or duration_ms >= threshold_ms:
                logger.info(f"{name} completed", operation=name, duration_ms=round(duration_ms, 2))
            
            return result
        except Exception as e:
            duration_ms = (time.perf_counter() - start) * 1000
            logger.error(f"{name} failed", operation=name, duration_ms=round(duration_ms, 2), exc_info=True)
            raise
    
    if asyncio.iscoroutinefunction(func):
        return async_wrapper
    return sync_wrapper

return decorator
@contextmanager def worker_job_context(worker_name: str, job_id: str, user_id: str = None): """Context manager for worker job execution.""" context = LoggingContext( correlation_id=job_id, user_id=user_id, service_name=worker_name, extra={"job_id": job_id}, ) token = set_logging_context(context) logger = get_logger(worker_name)
start_time = time.perf_counter()

try:
    logger.info("Job started", job_id=job_id)
    yield
    
    duration_ms = (time.perf_counter() - start_time) * 1000
    logger.info("Job completed", job_id=job_id, duration_ms=round(duration_ms, 2))
except Exception as e:
    duration_ms = (time.perf_counter() - start_time) * 1000
    logger.exception("Job failed", job_id=job_id, duration_ms=round(duration_ms, 2))
    raise
finally:
    reset_logging_context(token)
undefined
F = TypeVar("F", bound=Callable[..., Any])
def timed( operation_name: str = None, log_args: bool = False, threshold_ms: float = None, ) -> Callable[[F], F]: """Decorator to log function execution time.""" def decorator(func: F) -> F: name = operation_name or func.name logger = get_logger(func.module)
    @functools.wraps(func)
    async def async_wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            result = await func(*args, **kwargs)
            duration_ms = (time.perf_counter() - start) * 1000
            
            if threshold_ms is None or duration_ms >= threshold_ms:
                log_kwargs = {"operation": name, "duration_ms": round(duration_ms, 2)}
                if log_args:
                    log_kwargs["args"] = str(args)[:200]
                logger.info(f"{name} completed", **log_kwargs)
            
            return result
        except Exception as e:
            duration_ms = (time.perf_counter() - start) * 1000
            logger.error(f"{name} failed", operation=name, duration_ms=round(duration_ms, 2), exc_info=True)
            raise
    
    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
            duration_ms = (time.perf_counter() - start) * 1000
            
            if threshold_ms is None or duration_ms >= threshold_ms:
                logger.info(f"{name} completed", operation=name, duration_ms=round(duration_ms, 2))
            
            return result
        except Exception as e:
            duration_ms = (time.perf_counter() - start) * 1000
            logger.error(f"{name} failed", operation=name, duration_ms=round(duration_ms, 2), exc_info=True)
            raise
    
    if asyncio.iscoroutinefunction(func):
        return async_wrapper
    return sync_wrapper

return decorator
@contextmanager def worker_job_context(worker_name: str, job_id: str, user_id: str = None): """Context manager for worker job execution.""" context = LoggingContext( correlation_id=job_id, user_id=user_id, service_name=worker_name, extra={"job_id": job_id}, ) token = set_logging_context(context) logger = get_logger(worker_name)
start_time = time.perf_counter()

try:
    logger.info("Job started", job_id=job_id)
    yield
    
    duration_ms = (time.perf_counter() - start_time) * 1000
    logger.info("Job completed", job_id=job_id, duration_ms=round(duration_ms, 2))
except Exception as e:
    duration_ms = (time.perf_counter() - start_time) * 1000
    logger.exception("Job failed", job_id=job_id, duration_ms=round(duration_ms, 2))
    raise
finally:
    reset_logging_context(token)
undefined

TypeScript

TypeScript 实现

typescript
import { AsyncLocalStorage } from 'async_hooks';
import { randomUUID } from 'crypto';

interface LoggingContext {
  correlationId: string;
  userId?: string;
  requestPath?: string;
  requestMethod?: string;
  serviceName: string;
  environment: string;
  extra: Record<string, unknown>;
  startTime: Date;
}

const asyncLocalStorage = new AsyncLocalStorage<LoggingContext>();

function createContext(overrides: Partial<LoggingContext> = {}): LoggingContext {
  return {
    correlationId: randomUUID(),
    serviceName: 'api',
    environment: process.env.NODE_ENV || 'development',
    extra: {},
    startTime: new Date(),
    ...overrides,
  };
}

function getContext(): LoggingContext | undefined {
  return asyncLocalStorage.getStore();
}

function runWithContext<T>(context: LoggingContext, fn: () => T): T {
  return asyncLocalStorage.run(context, fn);
}

interface LogEntry {
  timestamp: string;
  level: string;
  message: string;
  correlationId?: string;
  userId?: string;
  serviceName?: string;
  [key: string]: unknown;
}

class ContextLogger {
  constructor(private name: string) {}

  private log(level: string, message: string, extra: Record<string, unknown> = {}): void {
    const context = getContext();
    
    const entry: LogEntry = {
      timestamp: new Date().toISOString(),
      level,
      logger: this.name,
      message,
      ...extra,
    };

    if (context) {
      entry.correlationId = context.correlationId;
      entry.userId = context.userId;
      entry.serviceName = context.serviceName;
      entry.requestPath = context.requestPath;
      Object.assign(entry, context.extra);
    }

    console.log(JSON.stringify(entry));
  }

  debug(message: string, extra?: Record<string, unknown>): void {
    this.log('DEBUG', message, extra);
  }

  info(message: string, extra?: Record<string, unknown>): void {
    this.log('INFO', message, extra);
  }

  warn(message: string, extra?: Record<string, unknown>): void {
    this.log('WARN', message, extra);
  }

  error(message: string, error?: Error, extra?: Record<string, unknown>): void {
    const errorInfo = error ? {
      errorType: error.name,
      errorMessage: error.message,
      stack: error.stack,
    } : {};
    this.log('ERROR', message, { ...errorInfo, ...extra });
  }
}

function getLogger(name: string): ContextLogger {
  return new ContextLogger(name);
}

// Performance timing decorator
function timed(operationName?: string, options: { threshold?: number } = {}) {
  return function <T extends (...args: any[]) => any>(
    target: any,
    propertyKey: string,
    descriptor: TypedPropertyDescriptor<T>
  ): TypedPropertyDescriptor<T> {
    const originalMethod = descriptor.value!;
    const name = operationName || propertyKey;
    const logger = getLogger(target.constructor.name);

    descriptor.value = async function (...args: any[]) {
      const start = performance.now();
      try {
        const result = await originalMethod.apply(this, args);
        const durationMs = performance.now() - start;
        
        if (!options.threshold || durationMs >= options.threshold) {
          logger.info(`${name} completed`, { operation: name, durationMs: Math.round(durationMs * 100) / 100 });
        }
        
        return result;
      } catch (error) {
        const durationMs = performance.now() - start;
        logger.error(`${name} failed`, error as Error, { operation: name, durationMs: Math.round(durationMs * 100) / 100 });
        throw error;
      }
    } as T;

    return descriptor;
  };
}

// Express middleware
function loggingMiddleware(serviceName: string) {
  return (req: any, res: any, next: any) => {
    const correlationId = req.headers['x-correlation-id'] || randomUUID();
    
    const context = createContext({
      correlationId,
      userId: req.user?.id,
      requestPath: req.path,
      requestMethod: req.method,
      serviceName,
    });

    runWithContext(context, () => {
      const logger = getLogger('http');
      const start = performance.now();

      logger.info('Request started', {
        queryParams: req.query,
        clientIp: req.ip,
      });

      res.on('finish', () => {
        const durationMs = performance.now() - start;
        logger.info('Request completed', {
          statusCode: res.statusCode,
          durationMs: Math.round(durationMs * 100) / 100,
        });
      });

      res.setHeader('X-Correlation-ID', correlationId);
      next();
    });
  };
}

export {
  LoggingContext,
  createContext,
  getContext,
  runWithContext,
  getLogger,
  timed,
  loggingMiddleware,
};
typescript
import { AsyncLocalStorage } from 'async_hooks';
import { randomUUID } from 'crypto';

interface LoggingContext {
  correlationId: string;
  userId?: string;
  requestPath?: string;
  requestMethod?: string;
  serviceName: string;
  environment: string;
  extra: Record<string, unknown>;
  startTime: Date;
}

const asyncLocalStorage = new AsyncLocalStorage<LoggingContext>();

function createContext(overrides: Partial<LoggingContext> = {}): LoggingContext {
  return {
    correlationId: randomUUID(),
    serviceName: 'api',
    environment: process.env.NODE_ENV || 'development',
    extra: {},
    startTime: new Date(),
    ...overrides,
  };
}

function getContext(): LoggingContext | undefined {
  return asyncLocalStorage.getStore();
}

function runWithContext<T>(context: LoggingContext, fn: () => T): T {
  return asyncLocalStorage.run(context, fn);
}

interface LogEntry {
  timestamp: string;
  level: string;
  message: string;
  correlationId?: string;
  userId?: string;
  serviceName?: string;
  [key: string]: unknown;
}

class ContextLogger {
  constructor(private name: string) {}

  private log(level: string, message: string, extra: Record<string, unknown> = {}): void {
    const context = getContext();
    
    const entry: LogEntry = {
      timestamp: new Date().toISOString(),
      level,
      logger: this.name,
      message,
      ...extra,
    };

    if (context) {
      entry.correlationId = context.correlationId;
      entry.userId = context.userId;
      entry.serviceName = context.serviceName;
      entry.requestPath = context.requestPath;
      Object.assign(entry, context.extra);
    }

    console.log(JSON.stringify(entry));
  }

  debug(message: string, extra?: Record<string, unknown>): void {
    this.log('DEBUG', message, extra);
  }

  info(message: string, extra?: Record<string, unknown>): void {
    this.log('INFO', message, extra);
  }

  warn(message: string, extra?: Record<string, unknown>): void {
    this.log('WARN', message, extra);
  }

  error(message: string, error?: Error, extra?: Record<string, unknown>): void {
    const errorInfo = error ? {
      errorType: error.name,
      errorMessage: error.message,
      stack: error.stack,
    } : {};
    this.log('ERROR', message, { ...errorInfo, ...extra });
  }
}

function getLogger(name: string): ContextLogger {
  return new ContextLogger(name);
}

// Performance timing decorator
function timed(operationName?: string, options: { threshold?: number } = {}) {
  return function <T extends (...args: any[]) => any>(
    target: any,
    propertyKey: string,
    descriptor: TypedPropertyDescriptor<T>
  ): TypedPropertyDescriptor<T> {
    const originalMethod = descriptor.value!;
    const name = operationName || propertyKey;
    const logger = getLogger(target.constructor.name);

    descriptor.value = async function (...args: any[]) {
      const start = performance.now();
      try {
        const result = await originalMethod.apply(this, args);
        const durationMs = performance.now() - start;
        
        if (!options.threshold || durationMs >= options.threshold) {
          logger.info(`${name} completed`, { operation: name, durationMs: Math.round(durationMs * 100) / 100 });
        }
        
        return result;
      } catch (error) {
        const durationMs = performance.now() - start;
        logger.error(`${name} failed`, error as Error, { operation: name, durationMs: Math.round(durationMs * 100) / 100 });
        throw error;
      }
    } as T;

    return descriptor;
  };
}

// Express middleware
function loggingMiddleware(serviceName: string) {
  return (req: any, res: any, next: any) => {
    const correlationId = req.headers['x-correlation-id'] || randomUUID();
    
    const context = createContext({
      correlationId,
      userId: req.user?.id,
      requestPath: req.path,
      requestMethod: req.method,
      serviceName,
    });

    runWithContext(context, () => {
      const logger = getLogger('http');
      const start = performance.now();

      logger.info('Request started', {
        queryParams: req.query,
        clientIp: req.ip,
      });

      res.on('finish', () => {
        const durationMs = performance.now() - start;
        logger.info('Request completed', {
          statusCode: res.statusCode,
          durationMs: Math.round(durationMs * 100) / 100,
        });
      });

      res.setHeader('X-Correlation-ID', correlationId);
      next();
    });
  };
}

export {
  LoggingContext,
  createContext,
  getContext,
  runWithContext,
  getLogger,
  timed,
  loggingMiddleware,
};

Usage Examples

使用示例

API Routes with Context

带上下文的API路由

python
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware

app = FastAPI()
logger = get_logger(__name__)

class LoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        correlation_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
        
        context = LoggingContext(
            correlation_id=correlation_id,
            request_path=request.url.path,
            request_method=request.method,
        )
        token = set_logging_context(context)
        
        try:
            logger.info("Request started")
            start = time.perf_counter()
            
            response = await call_next(request)
            
            duration_ms = (time.perf_counter() - start) * 1000
            logger.info("Request completed", status_code=response.status_code, duration_ms=round(duration_ms, 2))
            
            response.headers["X-Correlation-ID"] = correlation_id
            return response
        finally:
            reset_logging_context(token)

app.add_middleware(LoggingMiddleware)

@app.post("/generate")
@timed("generate_asset")
async def generate_asset(request: GenerateRequest):
    logger.info("Starting generation", asset_type=request.asset_type)
    # All logs include correlation_id automatically
    return {"job_id": job.id}
python
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware

app = FastAPI()
logger = get_logger(__name__)

class LoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        correlation_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
        
        context = LoggingContext(
            correlation_id=correlation_id,
            request_path=request.url.path,
            request_method=request.method,
        )
        token = set_logging_context(context)
        
        try:
            logger.info("Request started")
            start = time.perf_counter()
            
            response = await call_next(request)
            
            duration_ms = (time.perf_counter() - start) * 1000
            logger.info("Request completed", status_code=response.status_code, duration_ms=round(duration_ms, 2))
            
            response.headers["X-Correlation-ID"] = correlation_id
            return response
        finally:
            reset_logging_context(token)

app.add_middleware(LoggingMiddleware)

@app.post("/generate")
@timed("generate_asset")
async def generate_asset(request: GenerateRequest):
    logger.info("Starting generation", asset_type=request.asset_type)
    # All logs include correlation_id automatically
    return {"job_id": job.id}

Background Workers

后台Worker

python
def process_job(job_id: str, user_id: str):
    with worker_job_context("generation_worker", job_id, user_id):
        logger.info("Processing generation job")
        # All logs include job_id and user_id
        result = generate_image(...)
        logger.info("Image generated", size_bytes=len(result))
python
def process_job(job_id: str, user_id: str):
    with worker_job_context("generation_worker", job_id, user_id):
        logger.info("Processing generation job")
        # All logs include job_id and user_id
        result = generate_image(...)
        logger.info("Image generated", size_bytes=len(result))

Timed Operations

计时操作

python
@timed("database_query", threshold_ms=100)
async def fetch_user(user_id: str):
    # Only logs if query takes > 100ms
    return await db.users.find_one({"id": user_id})
python
@timed("database_query", threshold_ms=100)
async def fetch_user(user_id: str):
    # Only logs if query takes > 100ms
    return await db.users.find_one({"id": user_id})

Best Practices

最佳实践

  1. Always use correlation IDs for distributed tracing
  2. Use JSON format for log aggregation tools
  3. Propagate context through async boundaries with contextvars
  4. Set thresholds to only log slow operations
  5. Never log passwords, tokens, or PII
  6. Use DEBUG for development, INFO for production
  1. 始终使用correlation IDs进行分布式追踪
  2. 为日志聚合工具使用JSON格式
  3. 使用contextvars在异步边界中传播上下文
  4. 设置阈值仅记录慢操作
  5. 切勿记录密码、令牌或PII(个人可识别信息)
  6. 开发环境使用DEBUG级别,生产环境使用INFO级别

Common Mistakes

常见错误

  • Missing correlation IDs (can't trace requests)
  • Unstructured log messages (hard to parse)
  • Logging sensitive data (security risk)
  • No context propagation (lose trace in async code)
  • Logging everything (noise and cost)
  • 缺失correlation IDs(无法追踪请求)
  • 非结构化日志消息(难以解析)
  • 记录敏感数据(存在安全风险)
  • 未进行上下文传播(在异步代码中丢失追踪)
  • 记录所有内容(产生日志噪音并增加成本)

Related Patterns

相关模式

  • anomaly-detection - Uses logs to detect issues
  • health-checks - Logs health status
  • error-sanitization - Safe error logging
  • anomaly-detection - 利用日志检测问题
  • health-checks - 记录健康状态
  • error-sanitization - 安全的错误日志记录