messaging-testing-rabbitmq
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseRabbitMQ Integration Testing
RabbitMQ集成测试
Quick References: Seefor @SpringRabbitTest details,quick-ref/spring-rabbit-test.mdfor Testcontainers patterns.quick-ref/testcontainers-rabbitmq.md
快速参考:查看获取@SpringRabbitTest详细信息,查看quick-ref/spring-rabbit-test.md获取Testcontainers使用模式。quick-ref/testcontainers-rabbitmq.md
Testing Approach Selection
测试方案选择
| Approach | Speed | Fidelity | Best For |
|---|---|---|---|
| @SpringRabbitTest + Harness | Fast (no broker) | Medium (spy/capture) | Testing listener logic with Spring context |
| TestRabbitTemplate | Fast (no broker) | Low (no routing) | Testing send/receive without real broker |
| Testcontainers RabbitMQContainer | Slow (~5s startup) | Highest (real broker) | Full integration tests, exchange/queue routing |
Decision rule: Use @SpringRabbitTest for unit-testing listeners in Spring context. Use Testcontainers for end-to-end message flow with real routing.
| 方案 | 速度 | 保真度 | 最佳适用场景 |
|---|---|---|---|
| @SpringRabbitTest + Harness | 快(无需消息代理) | 中等(间谍/捕获) | 在Spring上下文环境中测试监听器逻辑 |
| TestRabbitTemplate | 快(无需消息代理) | 低(无路由功能) | 无需真实消息代理的发送/接收测试 |
| Testcontainers RabbitMQContainer | 慢(启动约5秒) | 最高(真实消息代理) | 完整集成测试、交换机/队列路由验证 |
决策规则:在Spring上下文环境中对监听器进行单元测试时使用@SpringRabbitTest。需要验证带真实路由的端到端消息流时使用Testcontainers。
Java/Spring: @SpringRabbitTest + RabbitListenerTestHarness
Java/Spring:@SpringRabbitTest + RabbitListenerTestHarness
Dependencies
依赖
xml
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>xml
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>Spy Pattern — Verify Listener Called
间谍模式 — 验证监听器被调用
java
@SpringBootTest
@SpringRabbitTest
class OrderConsumerSpyTest {
@Autowired
private RabbitListenerTestHarness harness;
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void shouldInvokeListener() throws Exception {
OrderConsumer spy = harness.getSpy("orderListener");
assertThat(spy).isNotNull();
LatchCountDownAndCallRealMethodAnswer answer =
harness.getLatchAnswerFor("orderListener", 1);
rabbitTemplate.convertAndSend("orders.exchange", "orders.created",
new OrderEvent("123", "CREATED"));
assertThat(answer.await(10)).isTrue();
verify(spy).handleOrder(argThat(e -> e.getOrderId().equals("123")));
}
}java
@SpringBootTest
@SpringRabbitTest
class OrderConsumerSpyTest {
@Autowired
private RabbitListenerTestHarness harness;
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void shouldInvokeListener() throws Exception {
OrderConsumer spy = harness.getSpy("orderListener");
assertThat(spy).isNotNull();
LatchCountDownAndCallRealMethodAnswer answer =
harness.getLatchAnswerFor("orderListener", 1);
rabbitTemplate.convertAndSend("orders.exchange", "orders.created",
new OrderEvent("123", "CREATED"));
assertThat(answer.await(10)).isTrue();
verify(spy).handleOrder(argThat(e -> e.getOrderId().equals("123")));
}
}Capture Pattern — Inspect Invocation Data
捕获模式 — 检查调用数据
java
@SpringBootTest
@SpringRabbitTest
class OrderConsumerCaptureTest {
@Autowired
private RabbitListenerTestHarness harness;
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void shouldCaptureInvocationData() throws Exception {
rabbitTemplate.convertAndSend("orders.exchange", "orders.created",
new OrderEvent("456", "PAID"));
InvocationData data = harness.getNextInvocationDataFor(
"orderListener", 10, TimeUnit.SECONDS);
assertThat(data).isNotNull();
OrderEvent captured = (OrderEvent) data.getArguments()[0];
assertThat(captured.getOrderId()).isEqualTo("456");
assertThat(captured.getStatus()).isEqualTo("PAID");
}
}java
@SpringBootTest
@SpringRabbitTest
class OrderConsumerCaptureTest {
@Autowired
private RabbitListenerTestHarness harness;
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void shouldCaptureInvocationData() throws Exception {
rabbitTemplate.convertAndSend("orders.exchange", "orders.created",
new OrderEvent("456", "PAID"));
InvocationData data = harness.getNextInvocationDataFor(
"orderListener", 10, TimeUnit.SECONDS);
assertThat(data).isNotNull();
OrderEvent captured = (OrderEvent) data.getArguments()[0];
assertThat(captured.getOrderId()).isEqualTo("456");
assertThat(captured.getStatus()).isEqualTo("PAID");
}
}Request-Reply Test
请求-响应测试
java
@SpringBootTest
@SpringRabbitTest
class OrderServiceReplyTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void shouldReturnOrderResponse() {
OrderRequest request = new OrderRequest("item-1", 2);
OrderResponse response = (OrderResponse) rabbitTemplate.convertSendAndReceive(
"orders.exchange", "orders.create", request);
assertThat(response).isNotNull();
assertThat(response.getStatus()).isEqualTo("CREATED");
}
}java
@SpringBootTest
@SpringRabbitTest
class OrderServiceReplyTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void shouldReturnOrderResponse() {
OrderRequest request = new OrderRequest("item-1", 2);
OrderResponse response = (OrderResponse) rabbitTemplate.convertSendAndReceive(
"orders.exchange", "orders.create", request);
assertThat(response).isNotNull();
assertThat(response.getStatus()).isEqualTo("CREATED");
}
}Java/Spring: TestRabbitTemplate
Java/Spring:TestRabbitTemplate
For testing without a running broker:
java
@SpringBootTest
@SpringRabbitTest
class NoBrokerTest {
@Autowired
private TestRabbitTemplate testRabbitTemplate;
@Test
void shouldSendWithoutBroker() {
testRabbitTemplate.convertAndSend("orders.exchange", "orders.created",
new OrderEvent("789", "CREATED"));
// TestRabbitTemplate routes directly to @RabbitListener methods
// Verify side effects (database writes, service calls, etc.)
}
}无需运行消息代理的测试场景:
java
@SpringBootTest
@SpringRabbitTest
class NoBrokerTest {
@Autowired
private TestRabbitTemplate testRabbitTemplate;
@Test
void shouldSendWithoutBroker() {
testRabbitTemplate.convertAndSend("orders.exchange", "orders.created",
new OrderEvent("789", "CREATED"));
// TestRabbitTemplate直接路由到@RabbitListener方法
// 验证副作用(数据库写入、服务调用等)
}
}Java/Spring: Testcontainers RabbitMQContainer
Java/Spring:Testcontainers RabbitMQContainer
With @ServiceConnection (Spring Boot 3.1+)
结合@ServiceConnection(Spring Boot 3.1+)
java
@SpringBootTest
@Testcontainers
class RabbitIntegrationTest {
@Container
@ServiceConnection
static RabbitMQContainer rabbit = new RabbitMQContainer("rabbitmq:3.13-management");
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private OrderRepository orderRepository;
@Test
void shouldProduceAndConsumeOrder() {
rabbitTemplate.convertAndSend("orders.exchange", "orders.created",
new OrderEvent("123", "CREATED"));
await().atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
Optional<Order> order = orderRepository.findById("123");
assertThat(order).isPresent();
assertThat(order.get().getStatus()).isEqualTo("CREATED");
});
}
}java
@SpringBootTest
@Testcontainers
class RabbitIntegrationTest {
@Container
@ServiceConnection
static RabbitMQContainer rabbit = new RabbitMQContainer("rabbitmq:3.13-management");
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private OrderRepository orderRepository;
@Test
void shouldProduceAndConsumeOrder() {
rabbitTemplate.convertAndSend("orders.exchange", "orders.created",
new OrderEvent("123", "CREATED"));
await().atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
Optional<Order> order = orderRepository.findById("123");
assertThat(order).isPresent();
assertThat(order.get().getStatus()).isEqualTo("CREATED");
});
}
}Pre-Provisioned Exchanges and Queues
预配置交换机和队列
java
static RabbitMQContainer rabbit = new RabbitMQContainer("rabbitmq:3.13-management")
.withExchange("orders.exchange", "direct")
.withQueue("orders.queue")
.withBinding("orders.exchange", "orders.queue",
Map.of(), "orders.created", "queue");java
static RabbitMQContainer rabbit = new RabbitMQContainer("rabbitmq:3.13-management")
.withExchange("orders.exchange", "direct")
.withQueue("orders.queue")
.withBinding("orders.exchange", "orders.queue",
Map.of(), "orders.created", "queue");With @DynamicPropertySource (pre-3.1)
结合@DynamicPropertySource(3.1之前版本)
java
@DynamicPropertySource
static void rabbitProperties(DynamicPropertyRegistry registry) {
registry.add("spring.rabbitmq.host", rabbit::getHost);
registry.add("spring.rabbitmq.port", rabbit::getAmqpPort);
registry.add("spring.rabbitmq.username", rabbit::getAdminUsername);
registry.add("spring.rabbitmq.password", rabbit::getAdminPassword);
}java
@DynamicPropertySource
static void rabbitProperties(DynamicPropertyRegistry registry) {
registry.add("spring.rabbitmq.host", rabbit::getHost);
registry.add("spring.rabbitmq.port", rabbit::getAmqpPort);
registry.add("spring.rabbitmq.username", rabbit::getAdminUsername);
registry.add("spring.rabbitmq.password", rabbit::getAdminPassword);
}Node.js: amqplib + Testcontainers
Node.js:amqplib + Testcontainers
typescript
import { RabbitMQContainer } from "@testcontainers/rabbitmq";
import amqp from "amqplib";
describe("RabbitMQ Integration", () => {
let container: StartedTestContainer;
let connection: amqp.Connection;
beforeAll(async () => {
container = await new RabbitMQContainer("rabbitmq:3.13-management").start();
connection = await amqp.connect(container.getAmqpUrl());
}, 60_000);
afterAll(async () => {
await connection.close();
await container.stop();
});
it("should produce and consume messages", async () => {
const channel = await connection.createChannel();
const queue = "test-queue";
await channel.assertQueue(queue, { durable: false });
const message = { orderId: "123", status: "CREATED" };
channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)));
const received = await new Promise<any>((resolve) => {
channel.consume(queue, (msg) => {
if (msg) resolve(JSON.parse(msg.content.toString()));
});
});
expect(received.orderId).toBe("123");
await channel.close();
});
});typescript
import { RabbitMQContainer } from "@testcontainers/rabbitmq";
import amqp from "amqplib";
describe("RabbitMQ Integration", () => {
let container: StartedTestContainer;
let connection: amqp.Connection;
beforeAll(async () => {
container = await new RabbitMQContainer("rabbitmq:3.13-management").start();
connection = await amqp.connect(container.getAmqpUrl());
}, 60_000);
afterAll(async () => {
await connection.close();
await container.stop();
});
it("should produce and consume messages", async () => {
const channel = await connection.createChannel();
const queue = "test-queue";
await channel.assertQueue(queue, { durable: false });
const message = { orderId: "123", status: "CREATED" };
channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)));
const received = await new Promise<any>((resolve) => {
channel.consume(queue, (msg) => {
if (msg) resolve(JSON.parse(msg.content.toString()));
});
});
expect(received.orderId).toBe("123");
await channel.close();
});
});Python: pika + Testcontainers
Python:pika + Testcontainers
python
import pytest
import pika
import json
from testcontainers.rabbitmq import RabbitMqContainer
@pytest.fixture(scope="module")
def rabbitmq():
with RabbitMqContainer("rabbitmq:3.13-management") as container:
yield container
def test_produce_and_consume(rabbitmq):
params = pika.ConnectionParameters(
host=rabbitmq.get_container_host_ip(),
port=rabbitmq.get_exposed_port(5672),
credentials=pika.PlainCredentials("guest", "guest"),
)
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue="test-queue")
message = {"orderId": "123", "status": "CREATED"}
channel.basic_publish(exchange="", routing_key="test-queue",
body=json.dumps(message))
method, props, body = channel.basic_get(queue="test-queue", auto_ack=True)
assert method is not None
assert json.loads(body)["orderId"] == "123"
connection.close()python
import pytest
import pika
import json
from testcontainers.rabbitmq import RabbitMqContainer
@pytest.fixture(scope="module")
def rabbitmq():
with RabbitMqContainer("rabbitmq:3.13-management") as container:
yield container
def test_produce_and_consume(rabbitmq):
params = pika.ConnectionParameters(
host=rabbitmq.get_container_host_ip(),
port=rabbitmq.get_exposed_port(5672),
credentials=pika.PlainCredentials("guest", "guest"),
)
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue="test-queue")
message = {"orderId": "123", "status": "CREATED"}
channel.basic_publish(exchange="", routing_key="test-queue",
body=json.dumps(message))
method, props, body = channel.basic_get(queue="test-queue", auto_ack=True)
assert method is not None
assert json.loads(body)["orderId"] == "123"
connection.close()Anti-Patterns
反模式
| Anti-Pattern | Problem | Solution |
|---|---|---|
Not using | Manual harness setup | Annotation auto-configures harness and template |
Ignoring | Flaky or hanging tests | Always pass timeout to |
| Hardcoded exchange/queue names in tests | Coupling to production config | Use constants or test-specific names |
No | Assertions run before consumption | Use Awaitility or CountDownLatch |
| Starting broker per test method | Extremely slow | Use static container shared across tests |
| 反模式 | 问题 | 解决方案 |
|---|---|---|
不使用 | 手动配置Harness繁琐 | 该注解会自动配置Harness和模板 |
忽略 | 测试不稳定或挂起 | 始终为 |
| 测试中硬编码交换机/队列名称 | 与生产配置耦合 | 使用常量或测试专用名称 |
异步消费者未使用 | 断言在消费完成前执行 | 使用Awaitility或CountDownLatch |
| 每个测试方法启动一次消息代理 | 测试速度极慢 | 使用静态容器在多个测试间共享 |
Quick Troubleshooting
快速故障排查
| Problem | Cause | Solution |
|---|---|---|
| Harness returns null spy | Listener ID mismatch | Verify |
| "No queue bound" error | Exchange/queue not declared | Use |
| Message not received | Wrong routing key | Verify exchange type and binding key match |
| Connection refused in tests | Container not ready | Use |
| TestRabbitTemplate silent failure | No listener found | Ensure |
| 问题 | 原因 | 解决方案 |
|---|---|---|
| Harness返回null间谍对象 | 监听器ID不匹配 | 验证 |
| "无绑定队列"错误 | 交换机/队列未声明 | 使用 |
| 未接收到消息 | 路由键错误 | 验证交换机类型和绑定键匹配 |
| 测试中连接被拒绝 | 容器未就绪 | 使用 |
| TestRabbitTemplate无响应失败 | 未找到监听器 | 确保 |
Reference Documentation
参考文档
Cross-reference: For Spring AMQP producer/consumer patterns, seeskill. For generic Testcontainers patterns, seespring-amqpskill.testcontainers
交叉参考:关于Spring AMQP生产者/消费者模式,请查看技能。关于通用Testcontainers模式,请查看spring-amqp技能。testcontainers