structlog — Structured Logging for Python
structlog turns log entries into dictionaries processed through a chain of functions, giving you structured output (JSON, logfmt, pretty console) without sacrificing performance. It has been in production since 2013 and supports threads, asyncio, and greenlets.
Why structlog over stdlib logging
| stdlib `logging` | structlog |
|---|---|
| String messages, hard to parse | Key-value dictionaries, machine-readable |
| Global mutable state | Immutable bound loggers, safe to pass around |
| Complex handler/formatter hierarchy | Simple processor chain of plain callables |
| No built-in context propagation | Works out of the box via `contextvars` |
| Verbose boilerplate per file | One `get_logger()` call per file |
Installation
```bash
pip install structlog
```

For pretty dev exceptions (recommended):

```bash
pip install structlog rich
```

Windows only (for colors):

```bash
pip install structlog rich colorama
```

Core Concepts
Event dict
Every log call builds a dictionary (the `event_dict`). Context bound via `bind()` is merged with the kwargs of the log call.
Processor chain
A list of callables, each with the signature `(logger, method_name, event_dict) -> event_dict`. They run in order; the last one must return a string/bytes (the renderer).
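Since processors are plain callables, the chain is easy to run by hand. This illustrative sketch (not structlog's internal code) applies two processors in order, ending with a renderer:

```python
import json

def add_level(logger, method_name, event_dict):
    event_dict["level"] = method_name
    return event_dict

def render_json(logger, method_name, event_dict):
    # the final processor (the renderer) returns a string, not a dict
    return json.dumps(event_dict, sort_keys=True)

def run_chain(processors, method_name, event_dict):
    result = event_dict
    for proc in processors:
        result = proc(None, method_name, result)
    return result

line = run_chain([add_level, render_json], "info", {"event": "user_login"})
# → {"event": "user_login", "level": "info"}
```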
Bound logger
The object returned by `get_logger()`. It is immutable: calling `bind()` returns a new logger. Use `contextvars` for mutable global context.

Basic Usage
```python
import structlog

log = structlog.get_logger()
```

Simple log
```python
log.info("user_login", user_id=42, ip="1.2.3.4")
# → 2024-01-01 12:00:00 [info ] user_login user_id=42 ip=1.2.3.4
```
Bind context to a local logger
```python
log = log.bind(request_id="abc-123", user_id=42)
log.info("processing_started")
log.warning("slow_query", duration_ms=1500)
# both entries include request_id and user_id automatically
```
Unbind a key
```python
log = log.unbind("user_id")
```
Replace all context
```python
log = log.new(request_id="new-456")
```

Log levels
```python
log.debug("debug_event")
log.info("info_event")
log.warning("warn_event")
log.error("error_event")
log.critical("critical_event")
```

Exception with traceback:
```python
try:
    1 / 0
except ZeroDivisionError:
    log.exception("division_failed")  # captures exc_info automatically
```

asyncio
```python
import asyncio

import structlog

logger = structlog.get_logger()

async def handle_request():
    await logger.ainfo("async_request", path="/api/items")
    # sync methods also work inside async code:
    logger.info("sync_log_in_async")
```

Configuration
Call `structlog.configure()` once at app startup, before any loggers are created.

```python
import structlog

structlog.configure(
    processors=[...],                # list of processor callables
    wrapper_class=...,               # bound logger class (default: FilteringBoundLogger)
    context_class=dict,              # context storage class
    logger_factory=...,              # factory for the underlying output logger
    cache_logger_on_first_use=True,  # freeze config for performance (disable in tests)
)
```

Important: `get_logger()` returns a lazy proxy, so it is safe to call at module level before `configure()`. Never call `bind()` or `new()` at module/class scope, as that freezes the default config. Use `get_logger(initial_key=value)` for pre-populated contexts instead.
Recommended Configurations
Development (pretty console output)
```python
import logging

import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.StackInfoRenderer(),
        structlog.dev.set_exc_info,
        structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S", utc=False),
        structlog.dev.ConsoleRenderer(),  # colorful, human-readable
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.DEBUG),
    logger_factory=structlog.PrintLoggerFactory(),
    cache_logger_on_first_use=False,  # keep False during development
)
```

Production (JSON output, stdlib integration)
```python
import logging
import sys

import structlog

# Configure stdlib logging first
logging.basicConfig(
    format="%(message)s",
    stream=sys.stdout,
    level=logging.INFO,
)

structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,  # drop below-threshold entries early
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer(),  # final renderer → JSON string
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)
```

Dev/Prod auto-switch (single config)
```python
import sys

import structlog

shared_processors = [
    structlog.contextvars.merge_contextvars,
    structlog.stdlib.add_log_level,
    structlog.processors.TimeStamper(fmt="iso"),
    structlog.processors.StackInfoRenderer(),
]

if sys.stderr.isatty():
    # Terminal session → pretty output
    processors = shared_processors + [structlog.dev.ConsoleRenderer()]
else:
    # Docker / CI / production → JSON with structured tracebacks
    processors = shared_processors + [
        structlog.processors.dict_tracebacks,
        structlog.processors.JSONRenderer(),
    ]

structlog.configure(processors=processors)
```

Context Variables (Request-scoped Logging)
Use `contextvars` to bind values like `request_id` once per request and have them appear in all log entries, even those in deeply nested functions.

Setup
`merge_contextvars` must be first in the processor chain:

```python
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # ← first!
        ...
    ]
)
```

Usage pattern
```python
from structlog.contextvars import (
    bind_contextvars,
    unbind_contextvars,
    clear_contextvars,
    bound_contextvars,  # context manager
)
```

In your request middleware / handler entry point:
```python
import uuid

def process_request(request):
    clear_contextvars()  # reset from previous request!
    bind_contextvars(
        request_id=str(uuid.uuid4()),
        user_id=request.user.id,
        path=request.path,
    )
    # All log calls anywhere in this thread/coroutine will include these values
    handle(request)
```

Temporarily bind extra context:
```python
with bound_contextvars(operation="checkout"):
    log.info("starting_operation")
    do_checkout()

log.info("operation_complete")  # operation key is gone here
```

Flask example
```python
import uuid

import flask
import structlog

logger = structlog.get_logger()
app = flask.Flask(__name__)

@app.before_request
def bind_request_context():
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        request_id=str(uuid.uuid4()),
        peer=flask.request.access_route[0],
        path=flask.request.path,
    )
```

FastAPI/Starlette warning: Context variables are isolated between sync and async execution contexts. Values bound in sync middleware won't appear in async route logs and vice versa. Use a dedicated async middleware that calls `bind_contextvars` inside the async context.
Passing context to worker threads
```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

from structlog.contextvars import bind_contextvars, get_contextvars

def worker(ctx, item):
    bind_contextvars(**ctx)  # re-bind in the worker thread
    logger.info("processing_item", item=item)

ctx = get_contextvars()  # snapshot from the parent thread
with ThreadPoolExecutor() as pool:
    pool.map(partial(worker, ctx), items)
```

Processors Reference
A processor is any callable with the signature:

```python
def my_processor(logger, method_name: str, event_dict: dict) -> dict:
    event_dict["my_key"] = compute_value()
    return event_dict
```

Built-in processors (most useful)
| Processor | Purpose |
|---|---|
| `contextvars.merge_contextvars` | Merges contextvars into the event dict (use first) |
| `add_log_level` | Adds the `level` key |
| `TimeStamper(fmt="iso")` | Adds an ISO 8601 `timestamp` |
| `StackInfoRenderer()` | Renders `stack_info` if present |
| `format_exc_info` | Renders the exception under the `exception` key |
| `dict_tracebacks` | Structured (dict) exception tracebacks |
| `UnicodeDecoder()` | Decodes bytes values to str |
| `CallsiteParameterAdder()` | Adds filename, func_name, lineno |
| `EventRenamer("message")` | Renames the `event` key |
| `JSONRenderer()` | Renders the event dict to a JSON string |
| `dev.ConsoleRenderer()` | Pretty colorful console output |
| `KeyValueRenderer()` | Simple `key=value` output |
| `DropEvent` | Raise this exception to silently drop an entry |
Custom processor example
```python
def add_app_version(logger, method_name, event_dict):
    event_dict["app_version"] = "1.4.2"
    return event_dict

def drop_health_checks(logger, method_name, event_dict):
    if event_dict.get("path") == "/health":
        raise structlog.DropEvent
    return event_dict
```

Log-level filtering
```python
import logging

# Only log WARNING and above:
structlog.configure(
    wrapper_class=structlog.make_filtering_bound_logger(logging.WARNING),
)
```

Integration with stdlib logging

Quickest start
```python
import structlog

structlog.stdlib.recreate_defaults()
# structlog now routes through stdlib logging with sensible defaults
```

Full integration (ProcessorFormatter)
Routes both structlog and stdlib `logging` through the same processor chain, giving consistent output for your code and third-party libraries:

```python
import logging

import structlog

timestamper = structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S")

shared_processors = [
    structlog.stdlib.add_log_level,
    structlog.stdlib.ExtraAdder(),  # pass `extra=` kwargs through
    timestamper,
]

structlog.configure(
    processors=shared_processors + [
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

formatter = structlog.stdlib.ProcessorFormatter(
    foreign_pre_chain=shared_processors,  # applied to stdlib-only entries
    processors=[
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        structlog.dev.ConsoleRenderer(),  # or JSONRenderer() for production
    ],
)

handler = logging.StreamHandler()
handler.setFormatter(formatter)
root_logger = logging.getLogger()
root_logger.addHandler(handler)
root_logger.setLevel(logging.INFO)
```

Note: When using `ProcessorFormatter`, do not use `render_to_log_kwargs()` in the processor chain; use `wrap_for_formatter` instead.

Note: If using the same output stream for both structlog and stdlib, use `WriteLogger` (not `PrintLogger`) to prevent interleaved output. `PrintLogger` calls `print()`, which writes message and newline separately.
Testing
```python
import structlog
from structlog.testing import capture_logs

def test_login_logs_user_id():
    with capture_logs() as cap:
        structlog.get_logger().bind(user_id=99).info("user_login")
    assert cap == [{"user_id": 99, "event": "user_login", "log_level": "info"}]
```

Capture with specific processors (e.g., contextvars)
```python
from structlog import contextvars, get_logger
from structlog.testing import capture_logs

def test_contextvars_appear():
    with capture_logs(processors=[contextvars.merge_contextvars]) as cap:
        contextvars.bind_contextvars(request_id="xyz")
        get_logger().info("hello")
    assert cap[0]["request_id"] == "xyz"
```

pytest fixture
```python
import pytest
import structlog
from structlog.testing import LogCapture

@pytest.fixture(name="log_output")
def fixture_log_output():
    return LogCapture()

@pytest.fixture(autouse=True)
def fixture_configure_structlog(log_output):
    structlog.configure(processors=[log_output])
    yield
    structlog.reset_defaults()

def test_something(log_output):
    do_something()
    assert log_output.entries[0]["event"] == "expected_event"
```

Important: Disable `cache_logger_on_first_use=True` in test configuration; cached loggers won't be affected by `capture_logs()`.
Best Practices
Canonical log lines
Bind context incrementally throughout a request and emit one final summary log entry. Less noise, more signal.

```python
logger = structlog.get_logger()

def handle_order(order_id):
    log = logger.bind(order_id=order_id)
    # ... process ...
    log.info("order_processed", items=5, total_usd=99.99, duration_ms=42)
```

Use events as identifiers, not messages
Bad, hard to query in log aggregators:

```python
log.info("User 42 logged in from 1.2.3.4")
```

Good, machine-readable and queryable:

```python
log.info("user_login", user_id=42, ip="1.2.3.4")
```

Log to stdout, let infrastructure handle the rest
structlog → stdout → systemd/Docker/Kubernetes → log aggregator (ELK, Graylog, Datadog).
Performance tip
In hot paths, create a local bound logger to avoid per-call proxy overhead:

```python
def process_batch(items):
    log = structlog.get_logger().bind(batch_size=len(items))  # one proxy resolution
    for item in items:
        log.debug("processing_item", item_id=item.id)
```

Common Pitfalls
| Pitfall | Fix / Consequence |
|---|---|
| Calling `bind()` at module scope | Use `get_logger(initial_key=value)` instead |
| Forgetting `clear_contextvars()` at request start | Old request's context leaks into new requests |
| Setting `cache_logger_on_first_use=True` in tests | Disable it so the test configuration takes effect |
| Using `PrintLogger` and stdlib on the same stream | Use `WriteLogger` to avoid interleaved output |
| Not calling `configure()` before the first log call | Logs use the default config (may not match your expected format) |
| Hybrid sync/async with FastAPI/Starlette | Contextvars don't cross sync↔async boundaries automatically |
| Putting `merge_contextvars` anywhere but first in the chain | Context vars won't appear in output |
Advanced Examples
Rename the `event` key to `message` for ECS/Datadog compatibility

```python
from structlog.processors import EventRenamer

structlog.configure(
    processors=[
        ...,
        EventRenamer("message"),  # renames event → message in output
        structlog.processors.JSONRenderer(),
    ]
)
```

Fine-grained per-module filtering
```python
def filter_noisy_module(logger, method_name, event_dict):
    if event_dict.get("func_name") in {"health_check", "ping"}:
        raise structlog.DropEvent
    return event_dict

structlog.configure(
    processors=[
        structlog.processors.CallsiteParameterAdder(
            [structlog.processors.CallsiteParameter.FUNC_NAME]
        ),
        filter_noisy_module,
        ...,
    ]
)
```

Output to stderr
```python
import sys

import structlog

structlog.configure(logger_factory=structlog.PrintLoggerFactory(sys.stderr))
```

Custom bound logger with domain-specific methods
```python
from structlog import BoundLoggerBase, PrintLogger, wrap_logger

class AppLogger(BoundLoggerBase):
    def user_action(self, action: str, **kw):
        return self._proxy_to_logger("info", action, status="ok", **kw)

    def user_error(self, action: str, **kw):
        return self._proxy_to_logger("warning", action, status="error", **kw)

log = wrap_logger(PrintLogger(), wrapper_class=AppLogger)
log.user_action("checkout", cart_size=3)
```

Reset context with `contextvars.Token`
from structlog.contextvars import bind_contextvars, reset_contextvars
def handler():
bind_contextvars(user="alice")
_helper()
log.info("back to alice") # user=alice
def _helper():
tokens = bind_contextvars(user="bob")
log.info("inside helper") # user=bob
reset_contextvars(**tokens) # restore previous valuespython
from structlog.contextvars import bind_contextvars, reset_contextvars
def handler():
bind_contextvars(user="alice")
_helper()
log.info("back to alice") # user=alice
def _helper():
tokens = bind_contextvars(user="bob")
log.info("inside helper") # user=bob
reset_contextvars(**tokens) # 恢复之前的值Quick Reference
快速参考
```python
import structlog

# Module-level logger (safe at import time)
logger = structlog.get_logger()

# Per-request context (in middleware)
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(request_id="...", user_id=1)

# Local immutable context
log = logger.bind(component="payments")
log = log.bind(order_id=42)         # new logger, old unchanged
log = log.unbind("order_id")
log = log.new(session_id="fresh")   # replace all context

# Log calls
log.debug / .info / .warning / .error / .critical("event_name", key=value)
log.exception("event_name")  # includes exc_info

# Async
await log.ainfo("async_event")

# Reset config (useful in tests)
structlog.reset_defaults()
```