aws-sdk-java-v2-messaging

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

AWS SDK for Java 2.x - Messaging (SQS & SNS)

AWS SDK for Java 2.x - 消息服务(SQS & SNS)

Overview

概述

Provide comprehensive AWS messaging patterns using AWS SDK for Java 2.x for both SQS and SNS services. Include client setup, queue management, message operations, subscription management, and Spring Boot integration patterns.
本文档介绍如何使用AWS SDK for Java 2.x为SQS和SNS服务实现全面的AWS消息模式,包括客户端配置、队列管理、消息操作、订阅管理以及Spring Boot集成模式。

When to Use

适用场景

Use this skill when working with:
  • Amazon SQS queues for message queuing
  • SNS topics for event publishing and notification
  • FIFO queues and standard queues
  • Dead Letter Queues (DLQ) for message handling
  • SNS subscriptions with email, SMS, SQS, Lambda endpoints
  • Pub/sub messaging patterns and event-driven architectures
  • Spring Boot integration with AWS messaging services
  • Testing strategies using LocalStack or Testcontainers
当你需要处理以下场景时,可以使用本技能:
  • 使用Amazon SQS队列进行消息排队
  • 使用SNS主题进行事件发布与通知
  • FIFO队列与标准队列
  • 用于消息处理的死信队列(DLQ)
  • 包含电子邮件、SMS、SQS、Lambda端点的SNS订阅
  • 发布/订阅消息模式与事件驱动架构
  • Spring Boot与AWS消息服务的集成
  • 使用LocalStack或Testcontainers进行测试的策略

Quick Start

快速开始

Dependencies

依赖

xml
<!-- SQS -->
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>sqs</artifactId>
</dependency>

<!-- SNS -->
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>sns</artifactId>
</dependency>
xml
<!-- SQS -->
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>sqs</artifactId>
</dependency>

<!-- SNS -->
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>sns</artifactId>
</dependency>

Basic Client Setup

基础客户端配置

java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sns.SnsClient;

SqsClient sqsClient = SqsClient.builder()
    .region(Region.US_EAST_1)
    .build();

SnsClient snsClient = SnsClient.builder()
    .region(Region.US_EAST_1)
    .build();
java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sns.SnsClient;

SqsClient sqsClient = SqsClient.builder()
    .region(Region.US_EAST_1)
    .build();

SnsClient snsClient = SnsClient.builder()
    .region(Region.US_EAST_1)
    .build();

Examples

示例

Basic SQS Operations

基础SQS操作

Create and Send Message

创建并发送消息

java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.*;

// Setup SQS client
SqsClient sqsClient = SqsClient.builder()
    .region(Region.US_EAST_1)
    .build();

// Create queue
String queueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
    .queueName("my-queue")
    .build()).queueUrl();

// Send message
String messageId = sqsClient.sendMessage(SendMessageRequest.builder()
    .queueUrl(queueUrl)
    .messageBody("Hello, SQS!")
    .build()).messageId();
java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.*;

// Setup SQS client
SqsClient sqsClient = SqsClient.builder()
    .region(Region.US_EAST_1)
    .build();

// Create queue
String queueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
    .queueName("my-queue")
    .build()).queueUrl();

// Send message
String messageId = sqsClient.sendMessage(SendMessageRequest.builder()
    .queueUrl(queueUrl)
    .messageBody("Hello, SQS!")
    .build()).messageId();

Receive and Delete Message

接收并删除消息

java
// Receive messages with long polling
ReceiveMessageResponse response = sqsClient.receiveMessage(ReceiveMessageRequest.builder()
    .queueUrl(queueUrl)
    .maxNumberOfMessages(10)
    .waitTimeSeconds(20)
    .build());

// Process and delete messages
response.messages().forEach(message -> {
    System.out.println("Received: " + message.body());
    sqsClient.deleteMessage(DeleteMessageRequest.builder()
        .queueUrl(queueUrl)
        .receiptHandle(message.receiptHandle())
        .build());
});
java
// Receive messages with long polling
ReceiveMessageResponse response = sqsClient.receiveMessage(ReceiveMessageRequest.builder()
    .queueUrl(queueUrl)
    .maxNumberOfMessages(10)
    .waitTimeSeconds(20)
    .build());

// Process and delete messages
response.messages().forEach(message -> {
    System.out.println("Received: " + message.body());
    sqsClient.deleteMessage(DeleteMessageRequest.builder()
        .queueUrl(queueUrl)
        .receiptHandle(message.receiptHandle())
        .build());
});

Basic SNS Operations

基础SNS操作

Create Topic and Publish

创建主题并发布消息

java
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.*;

// Setup SNS client
SnsClient snsClient = SnsClient.builder()
    .region(Region.US_EAST_1)
    .build();

// Create topic
String topicArn = snsClient.createTopic(CreateTopicRequest.builder()
    .name("my-topic")
    .build()).topicArn();

// Publish message
String messageId = snsClient.publish(PublishRequest.builder()
    .topicArn(topicArn)
    .subject("Test Notification")
    .message("Hello, SNS!")
    .build()).messageId();
java
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.*;

// Setup SNS client
SnsClient snsClient = SnsClient.builder()
    .region(Region.US_EAST_1)
    .build();

// Create topic
String topicArn = snsClient.createTopic(CreateTopicRequest.builder()
    .name("my-topic")
    .build()).topicArn();

// Publish message
String messageId = snsClient.publish(PublishRequest.builder()
    .topicArn(topicArn)
    .subject("Test Notification")
    .message("Hello, SNS!")
    .build()).messageId();

Advanced Examples

进阶示例

FIFO Queue Pattern

FIFO队列模式

java
// Create FIFO queue
Map<QueueAttributeName, String> attributes = Map.of(
    QueueAttributeName.FIFO_QUEUE, "true",
    QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true"
);

String fifoQueueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
    .queueName("my-queue.fifo")
    .attributes(attributes)
    .build()).queueUrl();

// Send FIFO message with group ID
String fifoMessageId = sqsClient.sendMessage(SendMessageRequest.builder()
    .queueUrl(fifoQueueUrl)
    .messageBody("Order #12345")
    .messageGroupId("orders")
    .messageDeduplicationId(UUID.randomUUID().toString())
    .build()).messageId();
java
// Create FIFO queue
Map<QueueAttributeName, String> attributes = Map.of(
    QueueAttributeName.FIFO_QUEUE, "true",
    QueueAttributeName.CONTENT_BASED_DEDUPLICATION, "true"
);

String fifoQueueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
    .queueName("my-queue.fifo")
    .attributes(attributes)
    .build()).queueUrl();

// Send FIFO message with group ID
String fifoMessageId = sqsClient.sendMessage(SendMessageRequest.builder()
    .queueUrl(fifoQueueUrl)
    .messageBody("Order #12345")
    .messageGroupId("orders")
    .messageDeduplicationId(UUID.randomUUID().toString())
    .build()).messageId();

SNS to SQS Subscription

SNS到SQS的订阅

java
// Create SQS queue for subscription
String subscriptionQueueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
    .queueName("notification-subscriber")
    .build()).queueUrl();

// Get queue ARN
String queueArn = sqsClient.getQueueAttributes(GetQueueAttributesRequest.builder()
    .queueUrl(subscriptionQueueUrl)
    .attributeNames(QueueAttributeName.QUEUE_ARN)
    .build()).attributes().get(QueueAttributeName.QUEUE_ARN);

// Subscribe SQS to SNS
String subscriptionArn = snsClient.subscribe(SubscribeRequest.builder()
    .protocol("sqs")
    .endpoint(queueArn)
    .topicArn(topicArn)
    .build()).subscriptionArn();
java
// Create SQS queue for subscription
String subscriptionQueueUrl = sqsClient.createQueue(CreateQueueRequest.builder()
    .queueName("notification-subscriber")
    .build()).queueUrl();

// Get queue ARN
String queueArn = sqsClient.getQueueAttributes(GetQueueAttributesRequest.builder()
    .queueUrl(subscriptionQueueUrl)
    .attributeNames(QueueAttributeName.QUEUE_ARN)
    .build()).attributes().get(QueueAttributeName.QUEUE_ARN);

// Subscribe SQS to SNS
String subscriptionArn = snsClient.subscribe(SubscribeRequest.builder()
    .protocol("sqs")
    .endpoint(queueArn)
    .topicArn(topicArn)
    .build()).subscriptionArn();

Spring Boot Integration Example

Spring Boot集成示例

java
@Service
@RequiredArgsConstructor
public class OrderNotificationService {

    private final SnsClient snsClient;
    private final ObjectMapper objectMapper;

    @Value("${aws.sns.order-topic-arn}")
    private String orderTopicArn;

    public void sendOrderNotification(Order order) {
        try {
            String jsonMessage = objectMapper.writeValueAsString(order);

            snsClient.publish(PublishRequest.builder()
                .topicArn(orderTopicArn)
                .subject("New Order Received")
                .message(jsonMessage)
                .messageAttributes(Map.of(
                    "orderType", MessageAttributeValue.builder()
                        .dataType("String")
                        .stringValue(order.getType())
                        .build()))
                .build());

        } catch (Exception e) {
            throw new RuntimeException("Failed to send order notification", e);
        }
    }
}
java
@Service
@RequiredArgsConstructor
public class OrderNotificationService {

    private final SnsClient snsClient;
    private final ObjectMapper objectMapper;

    @Value("${aws.sns.order-topic-arn}")
    private String orderTopicArn;

    public void sendOrderNotification(Order order) {
        try {
            String jsonMessage = objectMapper.writeValueAsString(order);

            snsClient.publish(PublishRequest.builder()
                .topicArn(orderTopicArn)
                .subject("New Order Received")
                .message(jsonMessage)
                .messageAttributes(Map.of(
                    "orderType", MessageAttributeValue.builder()
                        .dataType("String")
                        .stringValue(order.getType())
                        .build()))
                .build());

        } catch (Exception e) {
            throw new RuntimeException("Failed to send order notification", e);
        }
    }
}

Best Practices

最佳实践

SQS Best Practices

SQS最佳实践

  • Use long polling: Set
    waitTimeSeconds
    (20-40 seconds) to reduce empty responses
  • Batch operations: Use
    sendMessageBatch
    for multiple messages to reduce API calls
  • Visibility timeout: Set appropriately based on message processing time (default 30 seconds)
  • Delete messages: Always delete messages after successful processing
  • Handle duplicates: Implement idempotent processing for retries
  • Implement DLQ: Route failed messages to dead letter queues for analysis
  • Monitor queue depth: Use CloudWatch alarms for high queue backlog
  • Use FIFO queues: When message order and deduplication are critical
  • 使用长轮询:设置
    waitTimeSeconds
    (20-40秒)以减少空响应
  • 批量操作:使用
    sendMessageBatch
    批量发送多条消息,减少API调用次数
  • 可见性超时:根据消息处理时间合理设置(默认30秒)
  • 删除消息:处理成功后务必删除消息
  • 处理重复消息:实现幂等处理以应对重试
  • 实现DLQ:将处理失败的消息路由到死信队列进行分析
  • 监控队列深度:使用CloudWatch告警监控队列积压情况
  • 使用FIFO队列:当消息顺序和去重至关重要时使用

SNS Best Practices

SNS最佳实践

  • Use filter policies: Reduce noise by filtering messages at the source
  • Message attributes: Add metadata for subscription routing decisions
  • Retry logic: Handle transient failures with exponential backoff
  • Monitor failed deliveries: Set up CloudWatch alarms for failed notifications
  • Security: Use IAM policies for access control and data encryption
  • FIFO topics: Use when order and deduplication are critical
  • Avoid large payloads: Keep messages under 256KB for optimal performance
  • 使用过滤策略:在源头过滤消息以减少无效通知
  • 消息属性:添加元数据用于订阅路由决策
  • 重试逻辑:使用指数退避处理临时失败
  • 监控投递失败:设置CloudWatch告警监控通知失败情况
  • 安全性:使用IAM策略进行访问控制和数据加密
  • FIFO主题:当消息顺序和去重至关重要时使用
  • 避免大 payload:保持消息大小在256KB以下以获得最佳性能

General Guidelines

通用指南

  • Region consistency: Use the same region for all AWS resources
  • Resource naming: Use consistent naming conventions for queues and topics
  • Error handling: Implement proper exception handling and logging
  • Testing: Use LocalStack for local development and testing
  • Documentation: Document subscription endpoints and message formats
  • 区域一致性:所有AWS资源使用同一区域
  • 资源命名:队列和主题使用统一的命名规范
  • 错误处理:实现完善的异常处理和日志记录
  • 测试:使用LocalStack进行本地开发和测试
  • 文档:记录订阅端点和消息格式

Instructions

操作指南

Setup AWS Credentials

配置AWS凭证

Configure AWS credentials using environment variables, AWS CLI, or IAM roles:
bash
export AWS_ACCESS_KEY_ID=your-access-key
export AWS_SECRET_ACCESS_KEY=your-secret-key
export AWS_REGION=us-east-1
使用环境变量、AWS CLI或IAM角色配置AWS凭证:
bash
export AWS_ACCESS_KEY_ID=your-access-key
export AWS_SECRET_ACCESS_KEY=your-secret-key
export AWS_REGION=us-east-1

Configure Clients

配置客户端

java
// Basic client configuration
SqsClient sqsClient = SqsClient.builder()
    .region(Region.US_EAST_1)
    .build();

// Advanced client with custom configuration
SnsClient snsClient = SnsClient.builder()
    .region(Region.US_EAST_1)
    .credentialsProvider(DefaultCredentialsProvider.create())
    .httpClient(UrlConnectionHttpClient.create())
    .build();
java
// Basic client configuration
SqsClient sqsClient = SqsClient.builder()
    .region(Region.US_EAST_1)
    .build();

// Advanced client with custom configuration
SnsClient snsClient = SnsClient.builder()
    .region(Region.US_EAST_1)
    .credentialsProvider(DefaultCredentialsProvider.create())
    .httpClient(UrlConnectionHttpClient.create())
    .build();

Implement Message Processing

实现消息处理

  1. Connect to SQS/SNS using the AWS SDK clients
  2. Create queues and topics as needed
  3. Send/receive messages with appropriate timeout settings
  4. Process messages in batches for efficiency
  5. Delete messages after successful processing
  6. Handle failures with proper error handling and retries
  1. 连接:使用AWS SDK客户端连接到SQS/SNS
  2. 创建:根据需要创建队列和主题
  3. 发送/接收:使用适当的超时设置发送/接收消息
  4. 处理:批量处理消息以提高效率
  5. 删除:处理成功后删除消息
  6. 处理失败:实现完善的错误处理和重试逻辑

Integrate with Spring Boot

与Spring Boot集成

  1. Configure beans for
    SqsClient
    and
    SnsClient
    in
    @Configuration
    classes
  2. Use
    @Value
    to inject queue URLs and topic ARNs from properties
  3. Create service classes with business logic for messaging operations
  4. Implement error handling with
    @Retryable
    or custom retry logic
  5. Test integration using Testcontainers or LocalStack
  1. 配置:在
    @Configuration
    类中配置
    SqsClient
    SnsClient
    的Bean
  2. 注入:使用
    @Value
    从配置文件注入队列URL和主题ARN
  3. 创建服务类:编写包含消息操作业务逻辑的服务类
  4. 实现错误处理:使用
    @Retryable
    或自定义重试逻辑
  5. 测试集成:使用Testcontainers或LocalStack进行测试

Monitor and Debug

监控与调试

  • Use AWS CloudWatch for monitoring queue depth and message metrics
  • Enable AWS SDK logging for debugging client operations
  • Implement proper logging for message processing activities
  • Use AWS X-Ray for distributed tracing in production environments
  • 使用AWS CloudWatch监控队列深度和消息指标
  • 启用AWS SDK日志以调试客户端操作
  • 为消息处理活动实现完善的日志记录
  • 在生产环境中使用AWS X-Ray进行分布式追踪

Troubleshooting

故障排查

Common Issues

常见问题

  • Queue does not exist: Verify queue URL and permissions
  • Message not received: Check visibility timeout and consumer logic
  • Permission denied: Verify IAM policies and credentials
  • Connection timeout: Check network connectivity and region configuration
  • Rate limiting: Implement retry logic with exponential backoff
  • 队列不存在:验证队列URL和权限
  • 未收到消息:检查可见性超时和消费者逻辑
  • 权限拒绝:验证IAM策略和凭证
  • 连接超时:检查网络连接和区域配置
  • 速率限制:实现带指数退避的重试逻辑

Performance Optimization

性能优化

  • Use long polling to reduce empty responses
  • Batch message operations to minimize API calls
  • Adjust visibility timeout based on processing time
  • Implement connection pooling and reuse clients
  • Use appropriate message sizes to avoid fragmentation
  • 使用长轮询减少空响应
  • 批量处理消息以最小化API调用
  • 根据处理时间调整可见性超时
  • 实现连接池并复用客户端
  • 使用合适的消息大小避免碎片化

Detailed References

详细参考资料

For comprehensive API documentation and advanced patterns, see:
  • [@references/detailed-sqs-operations] - Complete SQS operations reference
  • [@references/detailed-sns-operations] - Complete SNS operations reference
  • [@references/spring-boot-integration] - Spring Boot integration patterns
  • [@references/aws-official-documentation] - Official AWS documentation and best practices
如需完整的API文档和进阶模式,请查看:
  • [@references/detailed-sqs-operations] - 完整的SQS操作参考
  • [@references/detailed-sns-operations] - 完整的SNS操作参考
  • [@references/spring-boot-integration] - Spring Boot集成模式
  • [@references/aws-official-documentation] - AWS官方文档和最佳实践