# structlog — Structured Logging for Python

structlog turns log entries into dictionaries processed through a chain of functions, giving you structured output (JSON, logfmt, pretty console) without sacrificing performance. It has been in production since 2013 and supports threads, asyncio, and greenlets.

## Why structlog over stdlib `logging`

| stdlib `logging` | structlog |
|---|---|
| String messages, hard to parse | Key-value dictionaries, machine-readable |
| Global mutable state | Immutable bound loggers, safe to pass around |
| Complex handler/formatter hierarchy | Simple processor chain of plain callables |
| No built-in context propagation | `contextvars` integration out of the box |
| Verbose boilerplate per file | One `get_logger()` call per file |

## Installation

```bash
pip install structlog
```

For pretty dev exceptions (recommended):

```bash
pip install structlog rich
```

Windows only (for colors):

```bash
pip install structlog rich colorama
```

## Core Concepts

### Event dict

Every log call builds a dictionary (the `event_dict`). Context bound via `bind()` is merged with the kwargs of the log call.

### Processor chain

A list of callables, each with signature `(logger, method_name, event_dict) -> event_dict`. They run in order; the last one (the renderer) must return a string or bytes.

### Bound logger

The object returned by `get_logger()`. It is immutable: calling `bind()` returns a new logger. Use `contextvars` for mutable global context.

## Basic Usage

```python
import structlog

log = structlog.get_logger()
```

### Simple log

```python
log.info("user_login", user_id=42, ip="1.2.3.4")
# → 2024-01-01 12:00:00 [info ] user_login user_id=42 ip=1.2.3.4
```

### Bind context to a local logger

```python
log = log.bind(request_id="abc-123", user_id=42)
log.info("processing_started")
log.warning("slow_query", duration_ms=1500)
```

Both entries include `request_id` and `user_id` automatically.

### Unbind a key

```python
log = log.unbind("user_id")
```

### Replace all context

```python
log = log.new(request_id="new-456")
```

### Log levels

```python
log.debug("debug_event")
log.info("info_event")
log.warning("warn_event")
log.error("error_event")
log.critical("critical_event")
```

Exception with traceback:

```python
try:
    1 / 0
except ZeroDivisionError:
    log.exception("division_failed")  # captures exc_info automatically
```

### asyncio

```python
import asyncio
import structlog

logger = structlog.get_logger()

async def handle_request():
    await logger.ainfo("async_request", path="/api/items")
    # sync methods also work inside async code:
    logger.info("sync_log_in_async")
```

## Configuration

Call `structlog.configure()` once at app startup, before any loggers are created.

```python
import structlog

structlog.configure(
    processors=[...],          # list of processor callables
    wrapper_class=...,         # bound logger class (default: FilteringBoundLogger)
    context_class=dict,        # context storage class
    logger_factory=...,        # factory for the underlying output logger
    cache_logger_on_first_use=True,  # freeze config for performance (disable in tests)
)
```

**Important:** `get_logger()` returns a lazy proxy, so it is safe to call at module level before `configure()`. Never call `bind()` or `new()` at module/class scope, as that freezes the default config. Use `get_logger(initial_key=value)` for pre-populated contexts instead.


## Recommended Configurations

### Development (pretty console output)

```python
import logging
import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.StackInfoRenderer(),
        structlog.dev.set_exc_info,
        structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S", utc=False),
        structlog.dev.ConsoleRenderer(),  # colorful, human-readable
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.DEBUG),
    logger_factory=structlog.PrintLoggerFactory(),
    cache_logger_on_first_use=False,  # keep False during development
)
```

### Production (JSON output, stdlib integration)

```python
import logging
import sys
import structlog

# Configure stdlib logging first
logging.basicConfig(
    format="%(message)s",
    stream=sys.stdout,
    level=logging.INFO,
)

structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,  # drop below-threshold entries early
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer(),  # final renderer → JSON string
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)
```

### Dev/Prod auto-switch (single config)

```python
import sys
import structlog

shared_processors = [
    structlog.contextvars.merge_contextvars,
    structlog.stdlib.add_log_level,
    structlog.processors.TimeStamper(fmt="iso"),
    structlog.processors.StackInfoRenderer(),
]

if sys.stderr.isatty():
    # Terminal session → pretty output
    processors = shared_processors + [structlog.dev.ConsoleRenderer()]
else:
    # Docker / CI / production → JSON with structured tracebacks
    processors = shared_processors + [
        structlog.processors.dict_tracebacks,
        structlog.processors.JSONRenderer(),
    ]

structlog.configure(processors=processors)
```

## Context Variables (Request-scoped Logging)

Use `contextvars` to bind values like `request_id` once per request and have them appear in all log entries, even those in deeply nested functions.

### Setup

`merge_contextvars` must be first in the processor chain:

```python
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # ← first!
        ...
    ]
)
```

### Usage pattern

```python
from structlog.contextvars import (
    bind_contextvars,
    unbind_contextvars,
    clear_contextvars,
    bound_contextvars,  # context manager
)
```

In your request middleware / handler entry point:

```python
def process_request(request):
    clear_contextvars()  # reset from previous request!
    bind_contextvars(
        request_id=str(uuid.uuid4()),
        user_id=request.user.id,
        path=request.path,
    )
    # All log calls anywhere in this thread/coroutine will include these values
    handle(request)
```

Temporarily bind extra context:

```python
with bound_contextvars(operation="checkout"):
    log.info("starting_operation")
    do_checkout()
    log.info("operation_complete")
```

The `operation` key is gone after the `with` block.

### Flask example

```python
import uuid
import flask
import structlog

logger = structlog.get_logger()
app = flask.Flask(__name__)

@app.before_request
def bind_request_context():
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        request_id=str(uuid.uuid4()),
        peer=flask.request.access_route[0],
        path=flask.request.path,
    )
```

**FastAPI/Starlette warning:** context variables are isolated between sync and async execution contexts. Values bound in sync middleware won't appear in async route logs, and vice versa. Use a dedicated async middleware that calls `bind_contextvars` inside the async context.

### Passing context to worker threads

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

from structlog.contextvars import get_contextvars, bind_contextvars

def worker(ctx, item):
    bind_contextvars(**ctx)        # re-bind in the worker thread
    logger.info("processing_item", item=item)

ctx = get_contextvars()            # snapshot from the parent thread
with ThreadPoolExecutor() as pool:
    pool.map(partial(worker, ctx), items)
```

## Processors Reference

A processor is any callable with signature:

```python
def my_processor(logger, method_name: str, event_dict: dict) -> dict:
    event_dict["my_key"] = compute_value()
    return event_dict
```

### Built-in processors (most useful)

| Processor | Purpose |
|---|---|
| `merge_contextvars` | Merges contextvars into the event dict (use first) |
| `add_log_level` | Adds the `level` key |
| `TimeStamper(fmt="iso")` | Adds a `timestamp` in ISO 8601 |
| `StackInfoRenderer()` | Renders the `stack_info` key if present |
| `format_exc_info` | Renders the exception under the `exception` key |
| `dict_tracebacks` | Structured (dict) exception tracebacks |
| `UnicodeDecoder()` | Decodes bytes values to str |
| `CallsiteParameterAdder([...])` | Adds filename, func_name, lineno |
| `EventRenamer("msg")` | Renames the `event` key |
| `JSONRenderer()` | Renders the event dict to a JSON string |
| `ConsoleRenderer()` | Pretty colorful console output |
| `KeyValueRenderer()` | Simple `key=value` output |
| `DropEvent` | Raise this exception to silently drop an entry |

### Custom processor example

```python
def add_app_version(logger, method_name, event_dict):
    event_dict["app_version"] = "1.4.2"
    return event_dict

def drop_health_checks(logger, method_name, event_dict):
    if event_dict.get("path") == "/health":
        raise structlog.DropEvent
    return event_dict
```

### Log-level filtering

Only log WARNING and above:

```python
import logging
import structlog

structlog.configure(
    wrapper_class=structlog.make_filtering_bound_logger(logging.WARNING),
)
```

---

## Integration with stdlib `logging`

### Quickest start

```python
import structlog

structlog.stdlib.recreate_defaults()
# structlog now routes through stdlib logging with sensible defaults
```

### Full integration (ProcessorFormatter)

Routes both structlog and stdlib `logging` through the same processor chain, giving consistent output for your code and third-party libraries:

```python
import logging
import structlog

timestamper = structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S")
shared_processors = [
    structlog.stdlib.add_log_level,
    structlog.stdlib.ExtraAdder(),   # pass `extra=` kwargs through
    timestamper,
]

structlog.configure(
    processors=shared_processors + [
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

formatter = structlog.stdlib.ProcessorFormatter(
    foreign_pre_chain=shared_processors,   # applied to stdlib-only entries
    processors=[
        structlog.stdlib.ProcessorFormatter.remove_processors_meta,
        structlog.dev.ConsoleRenderer(),   # or JSONRenderer() for production
    ],
)

handler = logging.StreamHandler()
handler.setFormatter(formatter)
root_logger = logging.getLogger()
root_logger.addHandler(handler)
root_logger.setLevel(logging.INFO)
```

**Note:** when using `ProcessorFormatter`, do not use `render_to_log_kwargs()` in the processor chain; use `wrap_for_formatter` instead.

**Note:** if structlog and stdlib share the same output stream, use `WriteLogger` (not `PrintLogger`) to prevent interleaved output. `PrintLogger` calls `print()`, which writes the message and the newline separately.


## Testing

```python
from structlog.testing import capture_logs
import structlog

def test_login_logs_user_id():
    with capture_logs() as cap:
        structlog.get_logger().bind(user_id=99).info("user_login")

    assert cap == [{"user_id": 99, "event": "user_login", "log_level": "info"}]
```

### Capture with specific processors (e.g., contextvars)

```python
from structlog import contextvars, get_logger
from structlog.testing import capture_logs

def test_contextvars_appear():
    with capture_logs(processors=[contextvars.merge_contextvars]) as cap:
        contextvars.bind_contextvars(request_id="xyz")
        get_logger().info("hello")

    assert cap[0]["request_id"] == "xyz"
```

### pytest fixture

```python
import pytest
import structlog
from structlog.testing import LogCapture

@pytest.fixture(name="log_output")
def fixture_log_output():
    return LogCapture()

@pytest.fixture(autouse=True)
def fixture_configure_structlog(log_output):
    structlog.configure(processors=[log_output])
    yield
    structlog.reset_defaults()

def test_something(log_output):
    do_something()
    assert log_output.entries[0]["event"] == "expected_event"
```

**Important:** do not set `cache_logger_on_first_use=True` in test configuration; cached loggers won't be affected by `capture_logs()`.


## Best Practices

### Canonical log lines

Bind context incrementally throughout a request and emit one final summary log entry. Less noise, more signal.

```python
logger = structlog.get_logger()

def handle_order(order_id):
    log = logger.bind(order_id=order_id)  # distinct local name; rebinding a module-level `log` here would raise UnboundLocalError
    # ... process ...
    log.info("order_processed", items=5, total_usd=99.99, duration_ms=42)
```

### Use events as identifiers, not messages

Bad: hard to query in log aggregators:

```python
log.info("User 42 logged in from 1.2.3.4")
```

Good: machine-readable, queryable:

```python
log.info("user_login", user_id=42, ip="1.2.3.4")
```

### Log to stdout, let infrastructure handle the rest

structlog → stdout → systemd/Docker/Kubernetes → log aggregator (ELK, Graylog, Datadog).

### Performance tip

In hot paths, create a local bound logger to avoid per-call proxy overhead:

```python
def process_batch(items):
    log = structlog.get_logger().bind(batch_size=len(items))  # one proxy resolution
    for item in items:
        log.debug("processing_item", item_id=item.id)
```

## Common Pitfalls

| Pitfall | Fix |
|---|---|
| Calling `bind()`/`new()` at module scope | Use `get_logger(key=value)` for initial values instead |
| Forgetting `clear_contextvars()` at request start | Old request's context leaks into new requests |
| `cache_logger_on_first_use=True` in tests | `capture_logs()` won't work; disable it in test setup |
| Using `PrintLogger` alongside `logging.StreamHandler` on the same stream | Use `WriteLogger` to avoid interleaved output |
| Not calling `structlog.configure()` before the first log | Logs use the default config (may not match your expected format) |
| Hybrid sync/async with FastAPI/Starlette | Contextvars don't cross sync↔async boundaries automatically |
| Putting `merge_contextvars` anywhere but first | Context vars won't appear in output |


## Advanced Examples

### Rename the `event` key to `message` for ECS/Datadog compatibility

```python
from structlog.processors import EventRenamer

structlog.configure(
    processors=[
        ...
        EventRenamer("message"),   # renames event → message in output
        structlog.processors.JSONRenderer(),
    ]
)
```

### Fine-grained per-module filtering

```python
def filter_noisy_module(logger, method_name, event_dict):
    if event_dict.get("func_name") in {"health_check", "ping"}:
        raise structlog.DropEvent
    return event_dict

structlog.configure(
    processors=[
        structlog.processors.CallsiteParameterAdder(
            [structlog.processors.CallsiteParameter.FUNC_NAME]
        ),
        filter_noisy_module,
        ...
    ]
)
```

### Output to stderr

```python
import sys
structlog.configure(logger_factory=structlog.PrintLoggerFactory(sys.stderr))
```

### Custom bound logger with domain-specific methods

```python
from structlog import BoundLoggerBase, PrintLogger, wrap_logger

class AppLogger(BoundLoggerBase):
    def user_action(self, action: str, **kw):
        return self._proxy_to_logger("info", action, status="ok", **kw)

    def user_error(self, action: str, **kw):
        return self._proxy_to_logger("warning", action, status="error", **kw)

log = wrap_logger(PrintLogger(), wrapper_class=AppLogger)
log.user_action("checkout", cart_size=3)
```

### Reset context with `contextvars` tokens

```python
from structlog.contextvars import bind_contextvars, reset_contextvars

def handler():
    bind_contextvars(user="alice")
    _helper()
    log.info("back to alice")   # user=alice

def _helper():
    tokens = bind_contextvars(user="bob")
    log.info("inside helper")   # user=bob
    reset_contextvars(**tokens)  # restore previous values
```

## Quick Reference

```python
import structlog

# Module-level logger (safe at import time)
logger = structlog.get_logger()

# Per-request context (in middleware)
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(request_id="...", user_id=1)

# Local immutable context
log = logger.bind(component="payments")
log = log.bind(order_id=42)        # new logger, old unchanged
log = log.unbind("order_id")
log = log.new(session_id="fresh")  # replace all context

# Log calls
log.debug / .info / .warning / .error / .critical("event_name", key=value)
log.exception("event_name")        # includes exc_info

# Async
await log.ainfo("async_event")

# Reset config (useful in tests)
structlog.reset_defaults()
```