aws-sdk-java-v2-messaging

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

AWS SDK for Java 2.x - Messaging (SQS & SNS)

AWS SDK for Java 2.x - 消息服务(SQS & SNS)

Overview

概述

Provide comprehensive AWS messaging patterns using AWS SDK for Java 2.x for both SQS and SNS services. Include client setup, queue management, message operations, subscription management, and Spring Boot integration patterns.
提供基于AWS SDK for Java 2.x的SQS和SNS服务的完整AWS消息模式实现,包括客户端配置、队列管理、消息操作、订阅管理以及Spring Boot集成模式。

When to Use

适用场景

Use this skill when working with:
  • Amazon SQS queues for message queuing
  • SNS topics for event publishing and notification
  • FIFO queues and standard queues
  • Dead Letter Queues (DLQ) for message handling
  • SNS subscriptions with email, SMS, SQS, Lambda endpoints
  • Pub/sub messaging patterns and event-driven architectures
  • Spring Boot integration with AWS messaging services
  • Testing strategies using LocalStack or Testcontainers
在以下场景中使用本技能:
  • 使用Amazon SQS队列实现消息队列功能
  • 使用SNS主题实现事件发布与通知
  • FIFO队列与标准队列的使用
  • 死信队列(DLQ)的消息处理
  • 配置SNS订阅(支持邮箱、SMS、SQS、Lambda端点)
  • 发布/订阅消息模式与事件驱动架构
  • Spring Boot与AWS消息服务的集成
  • 使用LocalStack或Testcontainers进行测试

Quick Start

快速开始

Dependencies

依赖

xml
<!-- SQS -->
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>sqs</artifactId>
</dependency>

<!-- SNS -->
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>sns</artifactId>
</dependency>
xml
<!-- SQS -->
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>sqs</artifactId>
</dependency>

<!-- SNS -->
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>sns</artifactId>
</dependency>

Basic Client Setup

基础客户端配置

java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sns.SnsClient;

SqsClient sqsClient = SqsClient.builder()
    .region(Region.US_EAST_1)
    .build();

SnsClient snsClient = SnsClient.builder()
    .region(Region.US_EAST_1)
    .build();
java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sns.SnsClient;

SqsClient sqsClient = SqsClient.builder()
    .region(Region.US_EAST_1)
    .build();

SnsClient snsClient = SnsClient.builder()
    .region(Region.US_EAST_1)
    .build();

Examples

示例

Basic SQS Operations

基础SQS操作

Create and Send Message

创建队列并发送消息

java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.*;

// Setup SQS client
SqsClient sqsClient = SqsClient.builder()
    .region(Region.US_EAST_1)
    .build();

// Create queue
String queueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
    .queueName("my-queue")
    .build()).queueUrl();

// Send message
String messageId = sqsClient.sendMessage(SendMessageRequest.builder()
    .queueUrl(queueUrl)
    .messageBody("Hello, SQS!")
    .build()).messageId();
java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.*;

// 配置SQS客户端
SqsClient sqsClient = SqsClient.builder()
    .region(Region.US_EAST_1)
    .build();

// 创建队列
String queueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
    .queueName("my-queue")
    .build()).queueUrl();

// 发送消息
String messageId = sqsClient.sendMessage(SendMessageRequest.builder()
    .queueUrl(queueUrl)
    .messageBody("Hello, SQS!")
    .build()).messageId();

Receive and Delete Message

接收并删除消息

java
// Receive messages with long polling
ReceiveMessageResponse response = sqsClient.receiveMessage(ReceiveMessageRequest.builder()
    .queueUrl(queueUrl)
    .maxNumberOfMessages(10)
    .waitTimeSeconds(20)
    .build());

// Process and delete messages
response.messages().forEach(message -> {
    System.out.println("Received: " + message.body());
    sqsClient.deleteMessage(DeleteMessageRequest.builder()
        .queueUrl(queueUrl)
        .receiptHandle(message.receiptHandle())
        .build());
});
java
// 使用长轮询接收消息
ReceiveMessageResponse response = sqsClient.receiveMessage(ReceiveMessageRequest.builder()
    .queueUrl(queueUrl)
    .maxNumberOfMessages(10)
    .waitTimeSeconds(20)
    .build());

// 处理并删除消息
response.messages().forEach(message -> {
    System.out.println("Received: " + message.body());
    sqsClient.deleteMessage(DeleteMessageRequest.builder()
        .queueUrl(queueUrl)
        .receiptHandle(message.receiptHandle())
        .build());
});

Basic SNS Operations

基础SNS操作

Create Topic and Publish

创建主题并发布消息

java
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.*;

// Setup SNS client
SnsClient snsClient = SnsClient.builder()
    .region(Region.US_EAST_1)
    .build();

// Create topic
String topicArn = snsClient.createTopic(CreateTopicRequest.builder()
    .name("my-topic")
    .build()).topicArn();

// Publish message
String messageId = snsClient.publish(PublishRequest.builder()
    .topicArn(topicArn)
    .subject("Test Notification")
    .message("Hello, SNS!")
    .build()).messageId();
java
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.*;

// 配置SNS客户端
SnsClient snsClient = SnsClient.builder()
    .region(Region.US_EAST_1)
    .build();

// 创建主题
String topicArn = snsClient.createTopic(CreateTopicRequest.builder()
    .name("my-topic")
    .build()).topicArn();

// 发布消息
String messageId = snsClient.publish(PublishRequest.builder()
    .topicArn(topicArn)
    .subject("Test Notification")
    .message("Hello, SNS!")
    .build()).messageId();

Advanced Examples

进阶示例

FIFO Queue Pattern

FIFO队列模式

java
// Create FIFO queue
Map<QueueAttributeName, String> attributes = Map.of(
    QueueAttributeName.FIFO_QUEUE, "true",
    QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true"
);

String fifoQueueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
    .queueName("my-queue.fifo")
    .attributes(attributes)
    .build()).queueUrl();

// Send FIFO message with group ID
String fifoMessageId = sqsClient.sendMessage(SendMessageRequest.builder()
    .queueUrl(fifoQueueUrl)
    .messageBody("Order #12345")
    .messageGroupId("orders")
    .messageDeduplicationId(UUID.randomUUID().toString())
    .build()).messageId();
java
// 创建FIFO队列
Map<QueueAttributeName, String> attributes = Map.of(
    QueueAttributeName.FIFO_QUEUE, "true",
    QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true"
);

String fifoQueueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
    .queueName("my-queue.fifo")
    .attributes(attributes)
    .build()).queueUrl();

// 发送带分组ID的FIFO消息
String fifoMessageId = sqsClient.sendMessage(SendMessageRequest.builder()
    .queueUrl(fifoQueueUrl)
    .messageBody("Order #12345")
    .messageGroupId("orders")
    .messageDeduplicationId(UUID.randomUUID().toString())
    .build()).messageId();

SNS to SQS Subscription

SNS到SQS的订阅

java
// Create SQS queue for subscription
String subscriptionQueueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
    .queueName("notification-subscriber")
    .build()).queueUrl();

// Get queue ARN
String queueArn = sqsClient.getQueueAttributes(GetQueueAttributesRequest.builder()
    .queueUrl(subscriptionQueueUrl)
    .attributeNames(QueueAttributeName.QUEUE_ARN)
    .build()).attributes().get(QueueAttributeName.QUEUE_ARN);

// Subscribe SQS to SNS
String subscriptionArn = snsClient.subscribe(SubscribeRequest.builder()
    .protocol("sqs")
    .endpoint(queueArn)
    .topicArn(topicArn)
    .build()).subscriptionArn();
java
// 创建用于订阅的SQS队列
String subscriptionQueueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
    .queueName("notification-subscriber")
    .build()).queueUrl();

// 获取队列ARN
String queueArn = sqsClient.getQueueAttributes(GetQueueAttributesRequest.builder()
    .queueUrl(subscriptionQueueUrl)
    .attributeNames(QueueAttributeName.QUEUE_ARN)
    .build()).attributes().get(QueueAttributeName.QUEUE_ARN);

// 订阅SQS到SNS主题
String subscriptionArn = snsClient.subscribe(SubscribeRequest.builder()
    .protocol("sqs")
    .endpoint(queueArn)
    .topicArn(topicArn)
    .build()).subscriptionArn();

Spring Boot Integration Example

Spring Boot集成示例

java
@Service
@RequiredArgsConstructor
public class OrderNotificationService {

    private final SnsClient snsClient;
    private final ObjectMapper objectMapper;

    @Value("${aws.sns.order-topic-arn}")
    private String orderTopicArn;

    public void sendOrderNotification(Order order) {
        try {
            String jsonMessage = objectMapper.writeValueAsString(order);

            snsClient.publish(PublishRequest.builder()
                .topicArn(orderTopicArn)
                .subject("New Order Received")
                .message(jsonMessage)
                .messageAttributes(Map.of(
                    "orderType", MessageAttributeValue.builder()
                        .dataType("String")
                        .stringValue(order.getType())
                        .build()))
                .build());

        } catch (Exception e) {
            throw new RuntimeException("Failed to send order notification", e);
        }
    }
}
java
@Service
@RequiredArgsConstructor
public class OrderNotificationService {

    private final SnsClient snsClient;
    private final ObjectMapper objectMapper;

    @Value("${aws.sns.order-topic-arn}")
    private String orderTopicArn;

    public void sendOrderNotification(Order order) {
        try {
            String jsonMessage = objectMapper.writeValueAsString(order);

            snsClient.publish(PublishRequest.builder()
                .topicArn(orderTopicArn)
                .subject("New Order Received")
                .message(jsonMessage)
                .messageAttributes(Map.of(
                    "orderType", MessageAttributeValue.builder()
                        .dataType("String")
                        .stringValue(order.getType())
                        .build()))
                .build());

        } catch (Exception e) {
            throw new RuntimeException("Failed to send order notification", e);
        }
    }
}

Best Practices

最佳实践

SQS Best Practices

SQS最佳实践

  • Use long polling: Set
    waitTimeSeconds
    (20-40 seconds) to reduce empty responses
  • Batch operations: Use
    sendMessageBatch
    for multiple messages to reduce API calls
  • Visibility timeout: Set appropriately based on message processing time (default 30 seconds)
  • Delete messages: Always delete messages after successful processing
  • Handle duplicates: Implement idempotent processing for retries
  • Implement DLQ: Route failed messages to dead letter queues for analysis
  • Monitor queue depth: Use CloudWatch alarms for high queue backlog
  • Use FIFO queues: When message order and deduplication are critical
  • 使用长轮询:设置
    waitTimeSeconds
    (20-40秒)以减少空响应
  • 批量操作:使用
    sendMessageBatch
    发送多条消息,减少API调用次数
  • 可见性超时:根据消息处理时间合理设置(默认30秒)
  • 删除消息:消息处理成功后务必删除
  • 处理重复消息:实现幂等处理逻辑以应对重试
  • 配置死信队列:将处理失败的消息路由到死信队列进行分析
  • 监控队列深度:使用CloudWatch告警监控队列积压情况
  • 使用FIFO队列:当消息顺序和去重至关重要时使用

SNS Best Practices

SNS最佳实践

  • Use filter policies: Reduce noise by filtering messages at the source
  • Message attributes: Add metadata for subscription routing decisions
  • Retry logic: Handle transient failures with exponential backoff
  • Monitor failed deliveries: Set up CloudWatch alarms for failed notifications
  • Security: Use IAM policies for access control and data encryption
  • FIFO topics: Use when order and deduplication are critical
  • Avoid large payloads: Keep messages under 256KB for optimal performance
  • 使用过滤策略:在源头过滤消息以减少无效消息
  • 消息属性:添加元数据用于订阅路由决策
  • 重试逻辑:使用指数退避处理临时失败
  • 监控投递失败:配置CloudWatch告警监控通知失败情况
  • 安全性:使用IAM策略进行访问控制和数据加密
  • FIFO主题:当消息顺序和去重至关重要时使用
  • 避免大负载:保持消息大小在256KB以下以获得最佳性能

General Guidelines

通用指南

  • Region consistency: Use the same region for all AWS resources
  • Resource naming: Use consistent naming conventions for queues and topics
  • Error handling: Implement proper exception handling and logging
  • Testing: Use LocalStack for local development and testing
  • Documentation: Document subscription endpoints and message formats
  • 区域一致性:所有AWS资源使用同一区域
  • 资源命名:队列和主题使用一致的命名规范
  • 错误处理:实现完善的异常处理和日志记录
  • 测试:使用LocalStack进行本地开发和测试
  • 文档:记录订阅端点和消息格式

Instructions

操作指南

Setup AWS Credentials

配置AWS凭证

Configure AWS credentials using environment variables, AWS CLI, or IAM roles:
bash
export AWS_ACCESS_KEY_ID=your-access-key
export AWS_SECRET_ACCESS_KEY=your-secret-key
export AWS_REGION=us-east-1
使用环境变量、AWS CLI或IAM角色配置AWS凭证:
bash
export AWS_ACCESS_KEY_ID=your-access-key
export AWS_SECRET_ACCESS_KEY=your-secret-key
export AWS_REGION=us-east-1

Configure Clients

配置客户端

java
// Basic client configuration
SqsClient sqsClient = SqsClient.builder()
    .region(Region.US_EAST_1)
    .build();

// Advanced client with custom configuration
SnsClient snsClient = SnsClient.builder()
    .region(Region.US_EAST_1)
    .credentialsProvider(DefaultCredentialsProvider.create())
    .httpClient(UrlConnectionHttpClient.create())
    .build();
java
// 基础客户端配置
SqsClient sqsClient = SqsClient.builder()
    .region(Region.US_EAST_1)
    .build();

// 自定义配置的进阶客户端
SnsClient snsClient = SnsClient.builder()
    .region(Region.US_EAST_1)
    .credentialsProvider(DefaultCredentialsProvider.create())
    .httpClient(UrlConnectionHttpClient.create())
    .build();

Implement Message Processing

实现消息处理

  1. Connect to SQS/SNS using the AWS SDK clients
  2. Create queues and topics as needed
  3. Send/receive messages with appropriate timeout settings
  4. Process messages in batches for efficiency
  5. Delete messages after successful processing
  6. Handle failures with proper error handling and retries
  1. 连接:使用AWS SDK客户端连接到SQS/SNS
  2. 创建:根据需要创建队列和主题
  3. 收发:使用合适的超时设置发送/接收消息
  4. 批量处理:批量处理消息以提高效率
  5. 删除:处理成功后删除消息
  6. 故障处理:实现完善的错误处理和重试逻辑

Integrate with Spring Boot

Spring Boot集成

  1. Configure beans for
    SqsClient
    and
    SnsClient
    in
    @Configuration
    classes
  2. Use
    @Value
    to inject queue URLs and topic ARNs from properties
  3. Create service classes with business logic for messaging operations
  4. Implement error handling with
    @Retryable
    or custom retry logic
  5. Test integration using Testcontainers or LocalStack
  1. 配置:在
    @Configuration
    类中配置
    SqsClient
    SnsClient
    的Bean
  2. 注入:使用
    @Value
    从配置文件中注入队列URL和主题ARN
  3. 创建服务类:编写包含消息操作业务逻辑的服务类
  4. 错误处理:使用
    @Retryable
    或自定义重试逻辑实现错误处理
  5. 测试:使用Testcontainers或LocalStack进行集成测试

Monitor and Debug

监控与调试

  • Use AWS CloudWatch for monitoring queue depth and message metrics
  • Enable AWS SDK logging for debugging client operations
  • Implement proper logging for message processing activities
  • Use AWS X-Ray for distributed tracing in production environments
  • 使用AWS CloudWatch监控队列深度和消息指标
  • 启用AWS SDK日志以调试客户端操作
  • 为消息处理活动实现完善的日志记录
  • 在生产环境中使用AWS X-Ray进行分布式追踪

Troubleshooting

故障排查

Common Issues

常见问题

  • Queue does not exist: Verify queue URL and permissions
  • Message not received: Check visibility timeout and consumer logic
  • Permission denied: Verify IAM policies and credentials
  • Connection timeout: Check network connectivity and region configuration
  • Rate limiting: Implement retry logic with exponential backoff
  • 队列不存在:验证队列URL和权限
  • 未接收到消息:检查可见性超时和消费者逻辑
  • 权限拒绝:验证IAM策略和凭证
  • 连接超时:检查网络连接和区域配置
  • 速率限制:实现带指数退避的重试逻辑

Performance Optimization

性能优化

  • Use long polling to reduce empty responses
  • Batch message operations to minimize API calls
  • Adjust visibility timeout based on processing time
  • Implement connection pooling and reuse clients
  • Use appropriate message sizes to avoid fragmentation
  • 使用长轮询减少空响应
  • 批量消息操作以最小化API调用
  • 根据处理时间调整可见性超时
  • 实现连接池并复用客户端
  • 使用合适的消息大小避免碎片化

Detailed References

详细参考

For comprehensive API documentation and advanced patterns, see:
  • references/detailed-sqs-operations.md - Complete SQS operations reference
  • references/detailed-sns-operations.md - Complete SNS operations reference
  • references/spring-boot-integration.md - Spring Boot integration patterns
  • references/aws-official-documentation.md - Official AWS documentation and best practices
如需完整的API文档和进阶模式,请查看:
  • references/detailed-sqs-operations.md - 完整SQS操作参考
  • references/detailed-sns-operations.md - 完整SNS操作参考
  • references/spring-boot-integration.md - Spring Boot集成模式
  • references/aws-official-documentation.md - AWS官方文档与最佳实践

Constraints and Warnings

限制与注意事项

  • Message Size: SQS and SNS messages limited to 256KB
  • Visibility Timeout: SQS messages become visible again after timeout if not deleted
  • FIFO Naming: FIFO queues and topics must end with
    .fifo
    suffix
  • FIFO Throughput: FIFO queues have lower throughput limits (300 msg/sec)
  • Message Retention: SQS messages retained maximum 14 days
  • Dead Letter Queues: Configure DLQ to prevent message loss
  • Subscription Limits: SNS topics have limits on number of subscriptions
  • Filter Policies: SNS filter policies have complexity limits
  • Cross-Region: SQS queues are region-specific; SNS topics can be cross-region
  • Cost: Both services charge per API call and data transfer
  • 消息大小:SQS和SNS消息大小限制为256KB
  • 可见性超时:如果未删除,SQS消息在超时后会重新变为可见
  • FIFO命名规则:FIFO队列和主题必须以
    .fifo
    后缀结尾
  • FIFO吞吐量:FIFO队列的吞吐量限制较低(300条消息/秒)
  • 消息保留期:SQS消息最长保留14天
  • 死信队列:配置死信队列以避免消息丢失
  • 订阅限制:SNS主题的订阅数量存在限制
  • 过滤策略复杂度:SNS过滤策略存在复杂度限制
  • 跨区域限制:SQS队列是区域特定的;SNS主题支持跨区域
  • 成本:两项服务均按API调用次数和数据传输量收费