aws-sdk-java-v2-messaging
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAWS SDK for Java 2.x - Messaging (SQS & SNS)
AWS SDK for Java 2.x - 消息服务(SQS & SNS)
Overview
概述
Provide comprehensive AWS messaging patterns using AWS SDK for Java 2.x for both SQS and SNS services. Include client setup, queue management, message operations, subscription management, and Spring Boot integration patterns.
提供基于AWS SDK for Java 2.x的SQS和SNS服务的完整AWS消息模式实现,包括客户端配置、队列管理、消息操作、订阅管理以及Spring Boot集成模式。
When to Use
适用场景
Use this skill when working with:
- Amazon SQS queues for message queuing
- SNS topics for event publishing and notification
- FIFO queues and standard queues
- Dead Letter Queues (DLQ) for message handling
- SNS subscriptions with email, SMS, SQS, Lambda endpoints
- Pub/sub messaging patterns and event-driven architectures
- Spring Boot integration with AWS messaging services
- Testing strategies using LocalStack or Testcontainers
在以下场景中使用本技能:
- 使用Amazon SQS队列实现消息队列功能
- 使用SNS主题实现事件发布与通知
- FIFO队列与标准队列的使用
- 死信队列(DLQ)的消息处理
- 配置SNS订阅(支持邮箱、SMS、SQS、Lambda端点)
- 发布/订阅消息模式与事件驱动架构
- Spring Boot与AWS消息服务的集成
- 使用LocalStack或Testcontainers进行测试
Quick Start
快速开始
Dependencies
依赖
xml
<!-- SQS -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
<!-- SNS -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sns</artifactId>
</dependency>xml
<!-- SQS -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
<!-- SNS -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sns</artifactId>
</dependency>Basic Client Setup
基础客户端配置
java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sns.SnsClient;
SqsClient sqsClient = SqsClient.builder()
.region(Region.US_EAST_1)
.build();
SnsClient snsClient = SnsClient.builder()
.region(Region.US_EAST_1)
.build();java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sns.SnsClient;
SqsClient sqsClient = SqsClient.builder()
.region(Region.US_EAST_1)
.build();
SnsClient snsClient = SnsClient.builder()
.region(Region.US_EAST_1)
.build();Examples
示例
Basic SQS Operations
基础SQS操作
Create and Send Message
创建队列并发送消息
java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.*;
// Setup SQS client
SqsClient sqsClient = SqsClient.builder()
.region(Region.US_EAST_1)
.build();
// Create queue
String queueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
.queueName("my-queue")
.build()).queueUrl();
// Send message
String messageId = sqsClient.sendMessage(SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody("Hello, SQS!")
.build()).messageId();java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.*;
// 配置SQS客户端
SqsClient sqsClient = SqsClient.builder()
.region(Region.US_EAST_1)
.build();
// 创建队列
String queueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
.queueName("my-queue")
.build()).queueUrl();
// 发送消息
String messageId = sqsClient.sendMessage(SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody("Hello, SQS!")
.build()).messageId();Receive and Delete Message
接收并删除消息
java
// Receive messages with long polling
ReceiveMessageResponse response = sqsClient.receiveMessage(ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(10)
.waitTimeSeconds(20)
.build());
// Process and delete messages
response.messages().forEach(message -> {
System.out.println("Received: " + message.body());
sqsClient.deleteMessage(DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(message.receiptHandle())
.build());
});java
// 使用长轮询接收消息
ReceiveMessageResponse response = sqsClient.receiveMessage(ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(10)
.waitTimeSeconds(20)
.build());
// 处理并删除消息
response.messages().forEach(message -> {
System.out.println("Received: " + message.body());
sqsClient.deleteMessage(DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(message.receiptHandle())
.build());
});Basic SNS Operations
基础SNS操作
Create Topic and Publish
创建主题并发布消息
java
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.*;
// Setup SNS client
SnsClient snsClient = SnsClient.builder()
.region(Region.US_EAST_1)
.build();
// Create topic
String topicArn = snsClient.createTopic(CreateTopicRequest.builder()
.name("my-topic")
.build()).topicArn();
// Publish message
String messageId = snsClient.publish(PublishRequest.builder()
.topicArn(topicArn)
.subject("Test Notification")
.message("Hello, SNS!")
.build()).messageId();java
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.*;
// 配置SNS客户端
SnsClient snsClient = SnsClient.builder()
.region(Region.US_EAST_1)
.build();
// 创建主题
String topicArn = snsClient.createTopic(CreateTopicRequest.builder()
.name("my-topic")
.build()).topicArn();
// 发布消息
String messageId = snsClient.publish(PublishRequest.builder()
.topicArn(topicArn)
.subject("Test Notification")
.message("Hello, SNS!")
.build()).messageId();Advanced Examples
进阶示例
FIFO Queue Pattern
FIFO队列模式
java
// Create FIFO queue
Map<QueueAttributeName, String> attributes = Map.of(
QueueAttributeName.FIFO_QUEUE, "true",
QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true"
);
String fifoQueueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
.queueName("my-queue.fifo")
.attributes(attributes)
.build()).queueUrl();
// Send FIFO message with group ID
String fifoMessageId = sqsClient.sendMessage(SendMessageRequest.builder()
.queueUrl(fifoQueueUrl)
.messageBody("Order #12345")
.messageGroupId("orders")
.messageDeduplicationId(UUID.randomUUID().toString())
.build()).messageId();java
// 创建FIFO队列
Map<QueueAttributeName, String> attributes = Map.of(
QueueAttributeName.FIFO_QUEUE, "true",
QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true"
);
String fifoQueueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
.queueName("my-queue.fifo")
.attributes(attributes)
.build()).queueUrl();
// 发送带分组ID的FIFO消息
String fifoMessageId = sqsClient.sendMessage(SendMessageRequest.builder()
.queueUrl(fifoQueueUrl)
.messageBody("Order #12345")
.messageGroupId("orders")
.messageDeduplicationId(UUID.randomUUID().toString())
.build()).messageId();SNS to SQS Subscription
SNS到SQS的订阅
java
// Create SQS queue for subscription
String subscriptionQueueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
.queueName("notification-subscriber")
.build()).queueUrl();
// Get queue ARN
String queueArn = sqsClient.getQueueAttributes(GetQueueAttributesRequest.builder()
.queueUrl(subscriptionQueueUrl)
.attributeNames(QueueAttributeName.QUEUE_ARN)
.build()).attributes().get(QueueAttributeName.QUEUE_ARN);
// Subscribe SQS to SNS
String subscriptionArn = snsClient.subscribe(SubscribeRequest.builder()
.protocol("sqs")
.endpoint(queueArn)
.topicArn(topicArn)
.build()).subscriptionArn();java
// 创建用于订阅的SQS队列
String subscriptionQueueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
.queueName("notification-subscriber")
.build()).queueUrl();
// 获取队列ARN
String queueArn = sqsClient.getQueueAttributes(GetQueueAttributesRequest.builder()
.queueUrl(subscriptionQueueUrl)
.attributeNames(QueueAttributeName.QUEUE_ARN)
.build()).attributes().get(QueueAttributeName.QUEUE_ARN);
// 订阅SQS到SNS主题
String subscriptionArn = snsClient.subscribe(SubscribeRequest.builder()
.protocol("sqs")
.endpoint(queueArn)
.topicArn(topicArn)
.build()).subscriptionArn();Spring Boot Integration Example
Spring Boot集成示例
java
@Service
@RequiredArgsConstructor
public class OrderNotificationService {
private final SnsClient snsClient;
private final ObjectMapper objectMapper;
@Value("${aws.sns.order-topic-arn}")
private String orderTopicArn;
public void sendOrderNotification(Order order) {
try {
String jsonMessage = objectMapper.writeValueAsString(order);
snsClient.publish(PublishRequest.builder()
.topicArn(orderTopicArn)
.subject("New Order Received")
.message(jsonMessage)
.messageAttributes(Map.of(
"orderType", MessageAttributeValue.builder()
.dataType("String")
.stringValue(order.getType())
.build()))
.build());
} catch (Exception e) {
throw new RuntimeException("Failed to send order notification", e);
}
}
}java
@Service
@RequiredArgsConstructor
public class OrderNotificationService {
private final SnsClient snsClient;
private final ObjectMapper objectMapper;
@Value("${aws.sns.order-topic-arn}")
private String orderTopicArn;
public void sendOrderNotification(Order order) {
try {
String jsonMessage = objectMapper.writeValueAsString(order);
snsClient.publish(PublishRequest.builder()
.topicArn(orderTopicArn)
.subject("New Order Received")
.message(jsonMessage)
.messageAttributes(Map.of(
"orderType", MessageAttributeValue.builder()
.dataType("String")
.stringValue(order.getType())
.build()))
.build());
} catch (Exception e) {
throw new RuntimeException("Failed to send order notification", e);
}
}
}Best Practices
最佳实践
SQS Best Practices
SQS最佳实践
- Use long polling: Set (20-40 seconds) to reduce empty responses
waitTimeSeconds - Batch operations: Use for multiple messages to reduce API calls
sendMessageBatch - Visibility timeout: Set appropriately based on message processing time (default 30 seconds)
- Delete messages: Always delete messages after successful processing
- Handle duplicates: Implement idempotent processing for retries
- Implement DLQ: Route failed messages to dead letter queues for analysis
- Monitor queue depth: Use CloudWatch alarms for high queue backlog
- Use FIFO queues: When message order and deduplication are critical
- 使用长轮询:设置(20-40秒)以减少空响应
waitTimeSeconds - 批量操作:使用发送多条消息,减少API调用次数
sendMessageBatch - 可见性超时:根据消息处理时间合理设置(默认30秒)
- 删除消息:消息处理成功后务必删除
- 处理重复消息:实现幂等处理逻辑以应对重试
- 配置死信队列:将处理失败的消息路由到死信队列进行分析
- 监控队列深度:使用CloudWatch告警监控队列积压情况
- 使用FIFO队列:当消息顺序和去重至关重要时使用
SNS Best Practices
SNS最佳实践
- Use filter policies: Reduce noise by filtering messages at the source
- Message attributes: Add metadata for subscription routing decisions
- Retry logic: Handle transient failures with exponential backoff
- Monitor failed deliveries: Set up CloudWatch alarms for failed notifications
- Security: Use IAM policies for access control and data encryption
- FIFO topics: Use when order and deduplication are critical
- Avoid large payloads: Keep messages under 256KB for optimal performance
- 使用过滤策略:在源头过滤消息以减少无效消息
- 消息属性:添加元数据用于订阅路由决策
- 重试逻辑:使用指数退避处理临时失败
- 监控投递失败:配置CloudWatch告警监控通知失败情况
- 安全性:使用IAM策略进行访问控制和数据加密
- FIFO主题:当消息顺序和去重至关重要时使用
- 避免大负载:保持消息大小在256KB以下以获得最佳性能
General Guidelines
通用指南
- Region consistency: Use the same region for all AWS resources
- Resource naming: Use consistent naming conventions for queues and topics
- Error handling: Implement proper exception handling and logging
- Testing: Use LocalStack for local development and testing
- Documentation: Document subscription endpoints and message formats
- 区域一致性:所有AWS资源使用同一区域
- 资源命名:队列和主题使用一致的命名规范
- 错误处理:实现完善的异常处理和日志记录
- 测试:使用LocalStack进行本地开发和测试
- 文档:记录订阅端点和消息格式
Instructions
操作指南
Setup AWS Credentials
配置AWS凭证
Configure AWS credentials using environment variables, AWS CLI, or IAM roles:
bash
export AWS_ACCESS_KEY_ID=your-access-key
export AWS_SECRET_ACCESS_KEY=your-secret-key
export AWS_REGION=us-east-1使用环境变量、AWS CLI或IAM角色配置AWS凭证:
bash
export AWS_ACCESS_KEY_ID=your-access-key
export AWS_SECRET_ACCESS_KEY=your-secret-key
export AWS_REGION=us-east-1Configure Clients
配置客户端
java
// Basic client configuration
SqsClient sqsClient = SqsClient.builder()
.region(Region.US_EAST_1)
.build();
// Advanced client with custom configuration
SnsClient snsClient = SnsClient.builder()
.region(Region.US_EAST_1)
.credentialsProvider(DefaultCredentialsProvider.create())
.httpClient(UrlConnectionHttpClient.create())
.build();java
// 基础客户端配置
SqsClient sqsClient = SqsClient.builder()
.region(Region.US_EAST_1)
.build();
// 自定义配置的进阶客户端
SnsClient snsClient = SnsClient.builder()
.region(Region.US_EAST_1)
.credentialsProvider(DefaultCredentialsProvider.create())
.httpClient(UrlConnectionHttpClient.create())
.build();Implement Message Processing
实现消息处理
- Connect to SQS/SNS using the AWS SDK clients
- Create queues and topics as needed
- Send/receive messages with appropriate timeout settings
- Process messages in batches for efficiency
- Delete messages after successful processing
- Handle failures with proper error handling and retries
- 连接:使用AWS SDK客户端连接到SQS/SNS
- 创建:根据需要创建队列和主题
- 收发:使用合适的超时设置发送/接收消息
- 批量处理:批量处理消息以提高效率
- 删除:处理成功后删除消息
- 故障处理:实现完善的错误处理和重试逻辑
Integrate with Spring Boot
Spring Boot集成
- Configure beans for and
SqsClientinSnsClientclasses@Configuration - Use to inject queue URLs and topic ARNs from properties
@Value - Create service classes with business logic for messaging operations
- Implement error handling with or custom retry logic
@Retryable - Test integration using Testcontainers or LocalStack
- 配置:在类中配置
@Configuration和SqsClient的BeanSnsClient - 注入:使用从配置文件中注入队列URL和主题ARN
@Value - 创建服务类:编写包含消息操作业务逻辑的服务类
- 错误处理:使用或自定义重试逻辑实现错误处理
@Retryable - 测试:使用Testcontainers或LocalStack进行集成测试
Monitor and Debug
监控与调试
- Use AWS CloudWatch for monitoring queue depth and message metrics
- Enable AWS SDK logging for debugging client operations
- Implement proper logging for message processing activities
- Use AWS X-Ray for distributed tracing in production environments
- 使用AWS CloudWatch监控队列深度和消息指标
- 启用AWS SDK日志以调试客户端操作
- 为消息处理活动实现完善的日志记录
- 在生产环境中使用AWS X-Ray进行分布式追踪
Troubleshooting
故障排查
Common Issues
常见问题
- Queue does not exist: Verify queue URL and permissions
- Message not received: Check visibility timeout and consumer logic
- Permission denied: Verify IAM policies and credentials
- Connection timeout: Check network connectivity and region configuration
- Rate limiting: Implement retry logic with exponential backoff
- 队列不存在:验证队列URL和权限
- 未接收到消息:检查可见性超时和消费者逻辑
- 权限拒绝:验证IAM策略和凭证
- 连接超时:检查网络连接和区域配置
- 速率限制:实现带指数退避的重试逻辑
Performance Optimization
性能优化
- Use long polling to reduce empty responses
- Batch message operations to minimize API calls
- Adjust visibility timeout based on processing time
- Implement connection pooling and reuse clients
- Use appropriate message sizes to avoid fragmentation
- 使用长轮询减少空响应
- 批量消息操作以最小化API调用
- 根据处理时间调整可见性超时
- 实现连接池并复用客户端
- 使用合适的消息大小避免碎片化
Detailed References
详细参考
For comprehensive API documentation and advanced patterns, see:
- references/detailed-sqs-operations.md - Complete SQS operations reference
- references/detailed-sns-operations.md - Complete SNS operations reference
- references/spring-boot-integration.md - Spring Boot integration patterns
- references/aws-official-documentation.md - Official AWS documentation and best practices
如需完整的API文档和进阶模式,请查看:
- references/detailed-sqs-operations.md - 完整SQS操作参考
- references/detailed-sns-operations.md - 完整SNS操作参考
- references/spring-boot-integration.md - Spring Boot集成模式
- references/aws-official-documentation.md - AWS官方文档与最佳实践
Constraints and Warnings
限制与注意事项
- Message Size: SQS and SNS messages limited to 256KB
- Visibility Timeout: SQS messages become visible again after timeout if not deleted
- FIFO Naming: FIFO queues and topics must end with suffix
.fifo - FIFO Throughput: FIFO queues have lower throughput limits (300 msg/sec)
- Message Retention: SQS messages retained maximum 14 days
- Dead Letter Queues: Configure DLQ to prevent message loss
- Subscription Limits: SNS topics have limits on number of subscriptions
- Filter Policies: SNS filter policies have complexity limits
- Cross-Region: SQS queues are region-specific; SNS topics can be cross-region
- Cost: Both services charge per API call and data transfer
- 消息大小:SQS和SNS消息大小限制为256KB
- 可见性超时:如果未删除,SQS消息在超时后会重新变为可见
- FIFO命名规则:FIFO队列和主题必须以后缀结尾
.fifo - FIFO吞吐量:FIFO队列的吞吐量限制较低(300条消息/秒)
- 消息保留期:SQS消息最长保留14天
- 死信队列:配置死信队列以避免消息丢失
- 订阅限制:SNS主题的订阅数量存在限制
- 过滤策略复杂度:SNS过滤策略存在复杂度限制
- 跨区域限制:SQS队列是区域特定的;SNS主题支持跨区域
- 成本:两项服务均按API调用次数和数据传输量收费