testcontainers-python

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

testcontainers-python

testcontainers-python

Deep Knowledge: Use
mcp__documentation__fetch_docs
with technology:
testcontainers-python
for comprehensive documentation on all containers, wait strategies, and advanced patterns.
深度知识:使用
mcp__documentation__fetch_docs
工具,指定技术为
testcontainers-python
,可获取所有容器、等待策略及高级模式的完整文档。

Installation

安装

bash
pip install "testcontainers[postgres]"
pip install "testcontainers[postgres,redis,kafka]"  # multiple
pip install testcontainers  # base only (GenericContainer, DockerCompose)
bash
pip install "testcontainers[postgres]"
pip install "testcontainers[postgres,redis,kafka]"  # 多个模块
pip install testcontainers  # 仅基础模块(GenericContainer、DockerCompose)

Core Pattern — Session-Scoped Container

核心模式 — 会话作用域容器

python
import pytest
from testcontainers.postgres import PostgresContainer

@pytest.fixture(scope="session")
def postgres_container():
    with PostgresContainer("postgres:16-alpine") as pg:
        yield pg

def test_with_postgres(postgres_container):
    url = postgres_container.get_connection_url()  # sqlalchemy URL
    # Use url to connect
python
import pytest
from testcontainers.postgres import PostgresContainer

@pytest.fixture(scope="session")
def postgres_container():
    with PostgresContainer("postgres:16-alpine") as pg:
        yield pg

def test_with_postgres(postgres_container):
    url = postgres_container.get_connection_url()  # sqlalchemy 格式的URL
    # 使用该URL建立连接

Database Containers

数据库容器

python
from testcontainers.postgres import PostgresContainer
from testcontainers.mysql import MySqlContainer
from testcontainers.mongodb import MongoDbContainer
python
from testcontainers.postgres import PostgresContainer
from testcontainers.mysql import MySqlContainer
from testcontainers.mongodb import MongoDbContainer

PostgreSQL

PostgreSQL

with PostgresContainer("postgres:16-alpine") as pg: url = pg.get_connection_url() # postgresql+psycopg2://test:test@localhost:{port}/test
with PostgresContainer("postgres:16-alpine") as pg: url = pg.get_connection_url() # postgresql+psycopg2://test:test@localhost:{port}/test

MySQL

MySQL

with MySqlContainer("mysql:8.0") as mysql: url = mysql.get_connection_url()
with MySqlContainer("mysql:8.0") as mysql: url = mysql.get_connection_url()

MongoDB

MongoDB

with MongoDbContainer("mongo:7") as mongo: client = MongoClient(mongo.get_connection_url())
undefined
with MongoDbContainer("mongo:7") as mongo: client = MongoClient(mongo.get_connection_url())
undefined

Cache / Messaging Containers

缓存/消息队列容器

python
from testcontainers.redis import RedisContainer
from testcontainers.kafka import KafkaContainer
from testcontainers.rabbitmq import RabbitMqContainer
python
from testcontainers.redis import RedisContainer
from testcontainers.kafka import KafkaContainer
from testcontainers.rabbitmq import RabbitMqContainer

Redis

Redis

with RedisContainer("redis:7-alpine") as redis: host = redis.get_container_host_ip() port = redis.get_exposed_port(6379)
with RedisContainer("redis:7-alpine") as redis: host = redis.get_container_host_ip() port = redis.get_exposed_port(6379)

Kafka (uses KRaft by default in v4+)

Kafka(v4+版本默认使用KRaft)

with KafkaContainer("confluentinc/cp-kafka:7.6.1") as kafka: bootstrap = kafka.get_bootstrap_server()
with KafkaContainer("confluentinc/cp-kafka:7.6.1") as kafka: bootstrap = kafka.get_bootstrap_server()

RabbitMQ

RabbitMQ

with RabbitMqContainer("rabbitmq:3-management") as rmq: url = rmq.get_connection_params() # pika connection params
undefined
with RabbitMqContainer("rabbitmq:3-management") as rmq: url = rmq.get_connection_params() # pika 连接参数
undefined

GenericContainer + Wait Strategies

GenericContainer + 等待策略

python
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs

@pytest.fixture(scope="session")
def meilisearch():
    with (
        DockerContainer("getmeili/meilisearch:latest")
        .with_exposed_ports(7700)
        .with_env("MEILI_MASTER_KEY", "test_key")
        .with_kwargs(mem_limit="256m")
    ) as container:
        wait_for_logs(container, "Server listening on")
        yield container
python
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs

@pytest.fixture(scope="session")
def meilisearch():
    with (
        DockerContainer("getmeili/meilisearch:latest")
        .with_exposed_ports(7700)
        .with_env("MEILI_MASTER_KEY", "test_key")
        .with_kwargs(mem_limit="256m")
    ) as container:
        wait_for_logs(container, "Server listening on")
        yield container

All Wait Strategies

所有等待策略

python
from testcontainers.core.waiting_utils import (
    wait_for_logs,           # wait_for_logs(container, "started", timeout=60)
    wait_container_is_ready, # wait_container_is_ready(container)
)
from testcontainers.core.wait.http_wait import HttpWaitStrategy
from testcontainers.core.wait.log_wait import LogMessageWaitStrategy
from testcontainers.core.wait.exec_wait import ExecWaitStrategy
python
from testcontainers.core.waiting_utils import (
    wait_for_logs,           # wait_for_logs(container, "started", timeout=60)
    wait_container_is_ready, # wait_container_is_ready(container)
)
from testcontainers.core.wait.http_wait import HttpWaitStrategy
from testcontainers.core.wait.log_wait import LogMessageWaitStrategy
from testcontainers.core.wait.exec_wait import ExecWaitStrategy

HTTP health check

HTTP健康检查

container.with_wait_for( HttpWaitStrategy("/health").with_status_code(200).with_startup_timeout(60) )
container.with_wait_for( HttpWaitStrategy("/health").with_status_code(200).with_startup_timeout(60) )

Log message

日志消息

container.with_wait_for( LogMessageWaitStrategy(r".Application started.") )
container.with_wait_for( LogMessageWaitStrategy(r".Application started.") )

Exec command

执行命令

container.with_wait_for( ExecWaitStrategy(["pg_isready", "-U", "test"]) )
undefined
container.with_wait_for( ExecWaitStrategy(["pg_isready", "-U", "test"]) )
undefined

Docker Compose

Docker Compose

python
from testcontainers.compose import DockerCompose

@pytest.fixture(scope="session")
def compose():
    with DockerCompose(
        filepath="tests/",
        compose_file_name="docker-compose.test.yml",
        pull=True,
    ) as c:
        c.wait_for("http://localhost:8080/health")
        yield c
python
from testcontainers.compose import DockerCompose

@pytest.fixture(scope="session")
def compose():
    with DockerCompose(
        filepath="tests/",
        compose_file_name="docker-compose.test.yml",
        pull=True,
    ) as c:
        c.wait_for("http://localhost:8080/health")
        yield c

Multi-Container Network

多容器网络

python
from testcontainers.core.network import Network
from testcontainers.core.container import DockerContainer
from testcontainers.postgres import PostgresContainer

@pytest.fixture(scope="session")
def test_network():
    with Network() as net:
        yield net

@pytest.fixture(scope="session")
def pg(test_network):
    with (
        PostgresContainer("postgres:16-alpine")
        .with_network(test_network)
        .with_network_aliases("postgres")
    ) as pg:
        yield pg
python
from testcontainers.core.network import Network
from testcontainers.core.container import DockerContainer
from testcontainers.postgres import PostgresContainer

@pytest.fixture(scope="session")
def test_network():
    with Network() as net:
        yield net

@pytest.fixture(scope="session")
def pg(test_network):
    with (
        PostgresContainer("postgres:16-alpine")
        .with_network(test_network)
        .with_network_aliases("postgres")
    ) as pg:
        yield pg

Async Support

异步支持

python
import asyncpg

@pytest.fixture(scope="session")
async def async_pg(postgres_container):
    url = postgres_container.get_connection_url().replace(
        "postgresql+psycopg2://", "postgresql://"
    )
    pool = await asyncpg.create_pool(url)
    yield pool
    await pool.close()
python
import asyncpg

@pytest.fixture(scope="session")
async def async_pg(postgres_container):
    url = postgres_container.get_connection_url().replace(
        "postgresql+psycopg2://", "postgresql://"
    )
    pool = await asyncpg.create_pool(url)
    yield pool
    await pool.close()

Container Reuse (Dev Mode)

容器复用(开发模式)

bash
undefined
bash
undefined

~/.testcontainers.properties

~/.testcontainers.properties

testcontainers.reuse.enable=true

```python
container = PostgresContainer("postgres:16").with_kwargs(labels={"testcontainers.reuse": "true"})
testcontainers.reuse.enable=true

```python
container = PostgresContainer("postgres:16").with_kwargs(labels={"testcontainers.reuse": "true"})

CI/CD — GitHub Actions

CI/CD — GitHub Actions

yaml
jobs:
  integration:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.12" }
      - run: pip install -e ".[test]"
      - run: pytest -m integration
        env:
          DOCKER_HOST: unix:///var/run/docker.sock
          TESTCONTAINERS_RYUK_DISABLED: "false"
yaml
jobs:
  integration:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.12" }
      - run: pip install -e ".[test]"
      - run: pytest -m integration
        env:
          DOCKER_HOST: unix:///var/run/docker.sock
          TESTCONTAINERS_RYUK_DISABLED: "false"

Anti-Patterns

反模式

Anti-PatternSolution
New container per testSession-scoped with
scope="session"
Hardcoded
localhost:5432
container.get_exposed_port(5432)
Using
latest
image
Pin to
postgres:16-alpine
No wait strategyAlways use wait_for_logs or HttpWaitStrategy
反模式解决方案
每个测试创建新容器使用
scope="session"
设置会话作用域
硬编码
localhost:5432
使用
container.get_exposed_port(5432)
使用
latest
镜像
固定版本,如
postgres:16-alpine
未使用等待策略始终使用wait_for_logs或HttpWaitStrategy

Reference Documentation

参考文档

  • Comprehensive Reference
  • 完整参考