streaming-data

Streaming Data Processing

Build production-ready event streaming systems and real-time data pipelines using modern message brokers and stream processors.

When to Use This Skill

Use this skill when:
  • Building event-driven architectures and microservices communication
  • Processing real-time analytics, monitoring, or alerting systems
  • Implementing data integration pipelines (CDC, ETL/ELT)
  • Creating log or metrics aggregation systems
  • Developing IoT platforms or high-frequency trading systems

Core Concepts

Message Brokers vs Stream Processors

Message Brokers (Kafka, Pulsar, Redpanda):
  • Store and distribute event streams
  • Provide durability, replay capability, partitioning
  • Handle producer/consumer coordination
Stream Processors (Flink, Spark, Kafka Streams):
  • Transform and aggregate streaming data
  • Provide windowing, joins, stateful operations
  • Execute complex event processing (CEP)
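
What "windowing" means concretely can be shown in a few lines. This is a plain-Python sketch of a tumbling-window count, not a stream-processor API; engines like Flink and Kafka Streams provide the same operation as a built-in, plus state management, joins, and event-time handling.

```python
# Tumbling windows: fixed-size, non-overlapping time buckets.
# Plain-Python illustration only; window_ms and the event shape are
# assumptions for the example.

def tumbling_window_counts(events, window_ms):
    """Count events per key in each fixed, non-overlapping window."""
    counts = {}
    for timestamp_ms, key in events:
        window_start = timestamp_ms - (timestamp_ms % window_ms)
        counts[(window_start, key)] = counts.get((window_start, key), 0) + 1
    return counts

events = [(1_000, "click"), (1_500, "click"), (61_000, "click")]
windows = tumbling_window_counts(events, window_ms=60_000)
# the [0s, 60s) window holds 2 clicks, the [60s, 120s) window holds 1
```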

Delivery Guarantees

At-Most-Once:
  • Messages may be lost, no duplicates
  • Lowest overhead
  • Use for: Metrics, logs where loss is acceptable
At-Least-Once:
  • Messages never lost, may have duplicates
  • Moderate overhead, requires idempotent consumers
  • Use for: Most applications (default choice)
Exactly-Once:
  • Messages never lost or duplicated
  • Highest overhead, requires transactional processing
  • Use for: Financial transactions, critical state updates
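
As a rough sketch, the three guarantees map onto producer configuration. The property names below follow confluent-kafka-python (librdkafka); the specific values are illustrative starting points, not tuned recommendations.

```python
# Producer configs per delivery guarantee (librdkafka property names).

AT_MOST_ONCE = {
    "bootstrap.servers": "localhost:9092",
    "acks": "0",     # fire-and-forget: no broker acknowledgement
    "retries": 0,    # never retry, so no duplicates (loss is possible)
}

AT_LEAST_ONCE = {
    "bootstrap.servers": "localhost:9092",
    "acks": "all",   # wait for all in-sync replicas before acking
    # retries stay at the default; consumers must be idempotent
}

EXACTLY_ONCE = {
    "bootstrap.servers": "localhost:9092",
    "acks": "all",
    "enable.idempotence": True,       # broker de-duplicates producer retries
    "transactional.id": "orders-tx",  # enables atomic multi-partition writes
}
```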

Quick Start Guide

Step 1: Choose a Message Broker

See references/broker-selection.md for detailed comparison.
Quick decision:
  • Apache Kafka: Mature ecosystem, enterprise features, event sourcing
  • Redpanda: Low latency, Kafka-compatible, simpler operations (no ZooKeeper)
  • Apache Pulsar: Multi-tenancy, geo-replication, tiered storage
  • RabbitMQ: Traditional message queues, RPC patterns

Step 2: Choose a Stream Processor (if needed)

See references/processor-selection.md for detailed comparison.
Quick decision:
  • Apache Flink: Millisecond latency, real-time analytics, CEP
  • Apache Spark: Batch + stream hybrid, ML integration, analytics
  • Kafka Streams: Embedded in microservices, no separate cluster
  • ksqlDB: SQL interface for stream processing

Step 3: Implement Producer/Consumer Patterns

Choose language-specific guide:
  • TypeScript/Node.js: references/typescript-patterns.md (KafkaJS)
  • Python: references/python-patterns.md (confluent-kafka-python)
  • Go: references/go-patterns.md (kafka-go)
  • Java/Scala: references/java-patterns.md (Apache Kafka Java Client)

Common Patterns

Basic Producer Pattern

Send events to a topic with error handling:
1. Create producer with broker addresses
2. Configure delivery guarantees (acks, retries, idempotence)
3. Send messages with key (for partitioning) and value
4. Handle delivery callbacks or errors
5. Flush and close producer on shutdown
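
The five steps above can be sketched without a live broker. `InMemoryProducer` is a hypothetical stand-in that records messages instead of sending them; with a real client such as confluent-kafka-python the same steps map onto `Producer(config)`, `produce()`, and `flush()`.

```python
# Broker-free sketch of the five producer steps.

class InMemoryProducer:
    """Hypothetical stand-in that records messages instead of sending them."""
    def __init__(self, config):
        self.config = config          # steps 1-2: addresses + guarantees
        self.sent = []

    def produce(self, topic, key, value, on_delivery):
        try:
            self.sent.append((topic, key, value))   # step 3: send keyed message
            on_delivery(None, (topic, key, value))  # step 4: success callback
        except Exception as err:
            on_delivery(err, None)                  # step 4: error callback

    def flush(self):
        return 0  # step 5: nothing left in the queue


delivered, failed = [], []

def on_delivery(err, msg):
    (failed if err else delivered).append(err or msg)

producer = InMemoryProducer({"bootstrap.servers": "localhost:9092",
                             "acks": "all", "enable.idempotence": True})
producer.produce("orders", key=b"user-42",        # key controls the partition
                 value=b'{"item": "book"}', on_delivery=on_delivery)
producer.flush()                                  # step 5: drain before exit
```

Only after `flush()` returns is it safe to exit: the delivery callback, not the `produce()` call, tells you a message actually made it.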

Basic Consumer Pattern

Process events from topics with offset management:
1. Create consumer with broker addresses and group ID
2. Subscribe to topics
3. Poll for messages
4. Process each message
5. Commit offsets (auto or manual)
6. Handle errors (retry, DLQ, skip)
7. Close consumer gracefully
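
A broker-free sketch of the loop above. `InMemoryConsumer` is a hypothetical stand-in; with confluent-kafka-python the steps map onto `Consumer(config)`, `subscribe()`, `poll()`, `commit()`, and `close()`.

```python
# Consumer loop with manual, after-processing offset commits.

class InMemoryConsumer:
    """Hypothetical stand-in that serves messages from a list."""
    def __init__(self, config, messages):
        self.config = config          # step 1: brokers + group.id
        self.messages = list(messages)
        self.position = 0             # next offset to hand out
        self.committed = -1           # last committed offset

    def subscribe(self, topics):      # step 2
        self.topics = topics

    def poll(self):                   # step 3
        if self.position >= len(self.messages):
            return None
        offset, value = self.position, self.messages[self.position]
        self.position += 1
        return offset, value

    def commit(self, offset):         # step 5 (manual)
        self.committed = offset

    def close(self):                  # step 7
        self.closed = True


consumer = InMemoryConsumer(
    {"bootstrap.servers": "localhost:9092", "group.id": "billing"},
    [b"evt-1", b"evt-2", b"evt-3"],
)
consumer.subscribe(["orders"])

processed = []
while (msg := consumer.poll()) is not None:
    offset, value = msg
    processed.append(value)    # step 4: process first...
    consumer.commit(offset)    # step 5: ...then commit, so a crash replays
consumer.close()               # step 7: graceful shutdown
```

Committing only after processing trades duplicates for safety: a crash between steps 4 and 5 means the message is redelivered, which is exactly the at-least-once contract.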

Error Handling Strategy

For production systems, implement:
  • Dead Letter Queue (DLQ): Send failed messages to separate topic
  • Retry Logic: Configurable retry attempts with backoff
  • Graceful Shutdown: Finish processing, commit offsets, close connections
  • Monitoring: Track consumer lag, error rates, throughput
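
The retry and DLQ bullets combine into one policy. A minimal sketch; the attempt limit, backoff constants, and the `orders.dlq` topic name are assumptions for illustration.

```python
# Retry with exponential backoff, then dead-letter on permanent failure.

MAX_ATTEMPTS = 3

def backoff_seconds(attempt, base=0.5, cap=30.0):
    """Exponential backoff: 0.5s, 1s, 2s, ... capped at `cap`."""
    return min(cap, base * (2 ** attempt))

def process_with_dlq(message, handler, dlq):
    for attempt in range(MAX_ATTEMPTS):
        try:
            return handler(message)
        except Exception as err:
            last_error = err
            delay = backoff_seconds(attempt)  # a real consumer sleeps(delay) here
    # all attempts failed: park the message on a dead-letter topic
    dlq.append({"message": message, "error": str(last_error),
                "topic": "orders.dlq"})
    return None

attempt_count = 0
def flaky(message):
    global attempt_count
    attempt_count += 1
    if attempt_count < 3:
        raise RuntimeError("transient glitch")
    return "ok"

def broken(message):
    raise ValueError("bad payload")

dlq = []
ok_result = process_with_dlq({"order": 1}, flaky, dlq)    # succeeds on retry
bad_result = process_with_dlq({"order": 2}, broken, dlq)  # exhausts retries
```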

Decision Frameworks

Framework: Message Broker Selection

START: What are the requirements?

1. Need Kafka API compatibility?
   YES → Kafka or Redpanda
   NO → Continue

2. Is multi-tenancy critical?
   YES → Apache Pulsar
   NO → Continue

3. Operational simplicity priority?
   YES → Redpanda (single binary, no ZooKeeper)
   NO → Continue

4. Mature ecosystem needed?
   YES → Apache Kafka
   NO → Redpanda (better performance)

5. Task queues (not event streams)?
   YES → RabbitMQ or message-queues skill
   NO → Kafka/Redpanda/Pulsar

Framework: Stream Processor Selection

START: What is the latency requirement?

1. Millisecond-level latency needed?
   YES → Apache Flink
   NO → Continue

2. Batch + stream in same pipeline?
   YES → Apache Spark Streaming
   NO → Continue

3. Embedded in microservice?
   YES → Kafka Streams
   NO → Continue

4. SQL interface for analysts?
   YES → ksqlDB
   NO → Flink or Spark

5. Python primary language?
   YES → Spark (PySpark) or Faust
   NO → Flink (Java/Scala)

Framework: Language Selection

TypeScript/Node.js:
  • API gateways, web services, real-time dashboards
  • KafkaJS library (827 code snippets, high reputation)
Python:
  • Data science, ML pipelines, analytics
  • confluent-kafka-python (192 snippets, score 68.8)
Go:
  • High-performance microservices, infrastructure tools
  • kafka-go (42 snippets, idiomatic Go)
Java/Scala:
  • Enterprise applications, Kafka Streams, Flink, Spark
  • Apache Kafka Java Client (683 snippets, score 76.9)

Advanced Patterns

Event Sourcing

Store state changes as immutable events. See references/event-sourcing.md for:
  • Event store design patterns
  • Event schema evolution
  • Snapshot strategies
  • Temporal queries and audit trails
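
The core idea fits in a few lines: state is never stored directly, only derived by replaying an append-only log of immutable events. A hypothetical account example; snapshots and schema evolution (see the reference above) matter once the log grows.

```python
# Event sourcing in miniature: append-only log, state derived by replay.

events = []  # the event store (in-memory stand-in; normally a Kafka topic)

def record(event_type, amount):
    events.append({"type": event_type, "amount": amount})  # never mutated

def current_balance():
    """Rebuild state by replaying every event from the beginning."""
    balance = 0
    for event in events:
        if event["type"] == "deposited":
            balance += event["amount"]
        else:
            balance -= event["amount"]
    return balance

record("deposited", 100)
record("withdrew", 30)
balance_now = current_balance()  # 70, derived rather than stored
```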

Change Data Capture (CDC)

Capture database changes as events. See references/cdc-patterns.md for:
  • Debezium integration (MySQL, PostgreSQL, MongoDB)
  • Real-time data synchronization
  • Microservices data integration patterns
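
The shape of a change event, loosely following the Debezium envelope (before/after row images plus an operation code). The field values and the `apply_change` replica logic are illustrative, not a full Debezium payload.

```python
# A Debezium-style change event and a consumer that mirrors it into a replica.

change_event = {
    "op": "u",                                  # c=create, u=update, d=delete
    "before": {"id": 7, "email": "old@x.com"},  # row image before the change
    "after":  {"id": 7, "email": "new@x.com"},  # row image after the change
    "source": {"table": "users"},
}

def apply_change(table, event):
    """Apply one change event to an in-memory replica keyed by id."""
    if event["op"] == "d":
        table.pop(event["before"]["id"], None)
    else:
        table[event["after"]["id"]] = event["after"]

replica = {}
apply_change(replica, change_event)  # replica now reflects the update
```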

Exactly-Once Processing

Implement transactional guarantees. See references/exactly-once.md for:
  • Idempotent producers
  • Transactional consumers
  • End-to-end exactly-once pipelines

Error Handling

Production-grade error management. See references/error-handling.md for:
  • Dead letter queue patterns
  • Retry strategies with exponential backoff
  • Backpressure handling
  • Circuit breakers for downstream failures

Reference Files

Decision Guides

  • references/broker-selection.md - Kafka vs Pulsar vs Redpanda comparison
  • references/processor-selection.md - Flink vs Spark vs Kafka Streams
  • references/delivery-guarantees.md - At-least-once, exactly-once patterns

Language-Specific Implementation

  • references/typescript-patterns.md - KafkaJS patterns (producer, consumer, error handling)
  • references/python-patterns.md - confluent-kafka-python patterns
  • references/go-patterns.md - kafka-go patterns
  • references/java-patterns.md - Apache Kafka Java client patterns

Advanced Topics

  • references/event-sourcing.md - Event sourcing architecture
  • references/cdc-patterns.md - Change Data Capture with Debezium
  • references/exactly-once.md - Transactional processing
  • references/error-handling.md - DLQ, retries, backpressure
  • references/performance-tuning.md - Throughput optimization, partitioning strategies

Validation Scripts

Run these scripts for token-free validation and generation:

Validate Kafka Configuration

```bash
python scripts/validate-kafka-config.py --config producer.yaml
python scripts/validate-kafka-config.py --config consumer.yaml
```
Checks: broker connectivity, configuration validity, serialization format

Generate Schema Registry Templates

```bash
python scripts/generate-schema.py --type avro --entity User
python scripts/generate-schema.py --type protobuf --entity Event
```
Creates: Avro/Protobuf schema definitions for Schema Registry

Benchmark Throughput

```bash
bash scripts/benchmark-throughput.sh --broker localhost:9092 --topic test
```
Tests: Producer/consumer throughput, latency percentiles

Code Examples

TypeScript Example (KafkaJS)

See examples/typescript/ for:
  • basic-producer.ts - Simple event producer with error handling
  • basic-consumer.ts - Consumer with manual offset commits
  • transactional-producer.ts - Exactly-once producer pattern
  • consumer-with-dlq.ts - Dead letter queue implementation

Python Example (confluent-kafka-python)

See examples/python/ for:
  • basic_producer.py - Producer with delivery callbacks
  • basic_consumer.py - Consumer with error handling
  • async_producer.py - AsyncIO producer (aiokafka)
  • schema_registry.py - Avro serialization with Schema Registry

Go Example (kafka-go)

See examples/go/ for:
  • basic_producer.go - Idiomatic Go producer
  • basic_consumer.go - Consumer with manual commits
  • high_perf_consumer.go - Concurrent processing pattern
  • batch_producer.go - Batch message sending

Java Example (Apache Kafka)

See examples/java/ for:
  • BasicProducer.java - Producer with idempotence
  • BasicConsumer.java - Consumer with error recovery
  • TransactionalProducer.java - Exactly-once transactions
  • StreamsAggregation.java - Kafka Streams aggregation

Technology Comparison

Message Broker Comparison

| Feature | Kafka | Pulsar | Redpanda | RabbitMQ |
|---|---|---|---|---|
| Throughput | Very High | High | Very High | Medium |
| Latency | Medium | Medium | Low | Low |
| Event Replay | Yes | Yes | Yes | No |
| Multi-Tenancy | Manual | Native | Manual | Manual |
| Operational Complexity | Medium | High | Low | Low |
| Best For | Enterprise, big data | SaaS, IoT | Performance-critical | Task queues |

Stream Processor Comparison

| Feature | Flink | Spark | Kafka Streams | ksqlDB |
|---|---|---|---|---|
| Processing Model | True streaming | Micro-batch | Library | SQL engine |
| Latency | Millisecond | Second | Millisecond | Second |
| Deployment | Cluster | Cluster | Embedded | Server |
| Best For | Real-time analytics | Batch + stream | Microservices | Analysts |

Client Library Recommendations

| Language | Library | Trust Score | Snippets | Use Case |
|---|---|---|---|---|
| TypeScript | KafkaJS | High | 827 | Web services, APIs |
| Python | confluent-kafka-python | High (68.8) | 192 | Data pipelines, ML |
| Go | kafka-go | High | 42 | High-perf services |
| Java | Kafka Java Client | High (76.9) | 683 | Enterprise, Flink/Spark |

Related Skills

For authentication and security patterns, see the auth-security skill. For infrastructure deployment (Kubernetes operators, Terraform), see the infrastructure-as-code skill. For monitoring metrics and tracing, see the observability skill. For API design patterns, see the api-design-principles skill. For data architecture and warehousing, see the data-architecture skill.

Troubleshooting

Consumer Lag Issues

  • Check partition count vs consumer count (match for parallelism)
  • Increase consumer instances or reduce processing time
  • Monitor with Kafka consumer lag metrics
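
Lag for a partition is simply the distance between the log end offset and the group's committed offset. A toy calculation; real numbers come from the broker's admin API or `kafka-consumer-groups.sh --describe`.

```python
# Consumer lag = log end offset - committed offset, per partition.

def consumer_lag(log_end_offset, committed_offset):
    return max(0, log_end_offset - committed_offset)

log_end   = {"orders-0": 5_230, "orders-1": 5_198}  # illustrative offsets
committed = {"orders-0": 5_230, "orders-1": 4_950}
lags = {p: consumer_lag(log_end[p], committed[p]) for p in log_end}
# orders-1 is 248 messages behind; orders-0 is fully caught up
```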

Message Loss

  • Verify producer acks=all configuration
  • Check broker replication factor (>1)
  • Ensure consumers commit offsets after processing
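
The first two bullets can be checked mechanically. `loss_safe` is a hypothetical helper, not a library function; the key name follows librdkafka, and the replication factor comes from the topic configuration.

```python
# Sanity check: acknowledged writes survive a single broker failure only
# when acks=all and the topic is replicated.

def loss_safe(producer_config, replication_factor):
    return producer_config.get("acks") == "all" and replication_factor > 1

loss_safe({"acks": "all"}, replication_factor=3)  # True
loss_safe({"acks": "1"}, replication_factor=3)    # False: leader-only ack
```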

Duplicate Messages

  • Implement idempotent consumers (track message IDs)
  • Use exactly-once semantics (transactions)
  • Design for at-least-once delivery
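
The first bullet in miniature: track processed message IDs so a redelivered (at-least-once) message is applied only once. An in-memory set is shown; production systems persist the IDs, ideally in the same transaction as the state change.

```python
# Idempotent consumer: duplicate deliveries are detected by message ID.

seen_ids = set()
balance = {"user-42": 0}

def apply_once(message):
    """Apply a credit event unless its ID was already processed."""
    if message["id"] in seen_ids:
        return False               # duplicate delivery: skip
    balance[message["user"]] += message["amount"]
    seen_ids.add(message["id"])
    return True

event = {"id": "m-1", "user": "user-42", "amount": 10}
first = apply_once(event)   # applied
second = apply_once(event)  # redelivery of the same event: ignored
```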

Performance Bottlenecks

  • Increase partition count for parallelism
  • Tune batch size and linger time
  • Enable compression (GZIP, LZ4, Snappy)
  • See references/performance-tuning.md for details
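
The batch, linger, and compression bullets translate into producer settings like these (librdkafka property names; the values are starting points to benchmark against defaults, not universal recommendations).

```python
# Throughput-oriented producer settings to benchmark against defaults.

THROUGHPUT_TUNED = {
    "bootstrap.servers": "localhost:9092",
    "linger.ms": 20,            # wait up to 20 ms so batches fill up
    "batch.size": 131072,       # 128 KiB batches
    "compression.type": "lz4",  # cheap CPU cost, good ratio for text data
}
```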