Streaming Data Processing
Build production-ready event streaming systems and real-time data pipelines using modern message brokers and stream processors.
When to Use This Skill
Use this skill when:
- Building event-driven architectures and microservices communication
- Processing real-time analytics, monitoring, or alerting systems
- Implementing data integration pipelines (CDC, ETL/ELT)
- Creating log or metrics aggregation systems
- Developing IoT platforms or high-frequency trading systems
Core Concepts
Message Brokers vs Stream Processors
Message Brokers (Kafka, Pulsar, Redpanda):
- Store and distribute event streams
- Provide durability, replay capability, partitioning
- Handle producer/consumer coordination
Stream Processors (Flink, Spark, Kafka Streams):
- Transform and aggregate streaming data
- Provide windowing, joins, stateful operations
- Execute complex event processing (CEP)
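Windowing is the core abstraction stream processors layer on top of raw event streams. As a toy illustration in plain Python (not a Flink or Kafka Streams API), a tumbling-window count groups events into fixed, non-overlapping time buckets:

```python
from collections import defaultdict

def tumbling_window_counts(events, window_ms):
    """Count events per (window, key) over fixed, non-overlapping windows.

    `events` is an iterable of (event_time_ms, key) pairs. Real stream
    processors do this incrementally and handle late events via watermarks.
    """
    counts = defaultdict(int)
    for ts, key in events:
        window_start = ts - ts % window_ms  # bucket start for this event
        counts[(window_start, key)] += 1
    return dict(counts)
```

With 1-second windows, events at t=5 ms and t=900 ms land in the `0` bucket while t=1100 ms starts the `1000` bucket.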
Delivery Guarantees
At-Most-Once:
- Messages may be lost, no duplicates
- Lowest overhead
- Use for: Metrics, logs where loss is acceptable
At-Least-Once:
- Messages never lost, may have duplicates
- Moderate overhead, requires idempotent consumers
- Use for: Most applications (default choice)
Exactly-Once:
- Messages never lost or duplicated
- Highest overhead, requires transactional processing
- Use for: Financial transactions, critical state updates
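At-least-once delivery pairs naturally with an idempotent consumer. A minimal in-memory sketch (the `event_id` field is illustrative; production systems usually track processed IDs in a database or cache so deduplication survives restarts):

```python
from collections import OrderedDict

class IdempotentHandler:
    """Wrap a handler so redelivered events (same event_id) are processed once."""

    def __init__(self, handler, max_tracked=10_000):
        self.handler = handler
        self.seen = OrderedDict()      # event_id -> True, insertion-ordered
        self.max_tracked = max_tracked

    def __call__(self, event):
        event_id = event["event_id"]
        if event_id in self.seen:
            return None                # duplicate delivery: skip silently
        result = self.handler(event)
        self.seen[event_id] = True     # mark only after successful handling
        if len(self.seen) > self.max_tracked:
            self.seen.popitem(last=False)  # evict the oldest tracked id
        return result
```

Marking the ID only after the handler succeeds keeps the at-least-once guarantee: a crash mid-processing causes a redelivery, not a lost event.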
Quick Start Guide
Step 1: Choose a Message Broker
See references/broker-selection.md for detailed comparison.
Quick decision:
- Apache Kafka: Mature ecosystem, enterprise features, event sourcing
- Redpanda: Low latency, Kafka-compatible, simpler operations (no ZooKeeper)
- Apache Pulsar: Multi-tenancy, geo-replication, tiered storage
- RabbitMQ: Traditional message queues, RPC patterns
Step 2: Choose a Stream Processor (if needed)
See references/processor-selection.md for detailed comparison.
Quick decision:
- Apache Flink: Millisecond latency, real-time analytics, CEP
- Apache Spark: Batch + stream hybrid, ML integration, analytics
- Kafka Streams: Embedded in microservices, no separate cluster
- ksqlDB: SQL interface for stream processing
Step 3: Implement Producer/Consumer Patterns
Choose language-specific guide:
- TypeScript/Node.js: references/typescript-patterns.md (KafkaJS)
- Python: references/python-patterns.md (confluent-kafka-python)
- Go: references/go-patterns.md (kafka-go)
- Java/Scala: references/java-patterns.md (Apache Kafka Java Client)
Common Patterns
Basic Producer Pattern
Send events to a topic with error handling:
1. Create producer with broker addresses
2. Configure delivery guarantees (acks, retries, idempotence)
3. Send messages with key (for partitioning) and value
4. Handle delivery callbacks or errors
5. Flush and close producer on shutdown
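The steps above can be sketched with confluent-kafka-python. The topic name, event shape, and broker address are illustrative; `main()` is defined but not invoked since it needs a running broker:

```python
import json

def make_record(event: dict) -> tuple[bytes, bytes]:
    """Key by entity id so related events land on the same partition."""
    key = str(event["user_id"]).encode("utf-8")
    value = json.dumps(event).encode("utf-8")
    return key, value

def main() -> None:
    # Imported here so make_record stays importable without the library.
    from confluent_kafka import Producer  # pip install confluent-kafka

    producer = Producer({
        "bootstrap.servers": "localhost:9092",
        "acks": "all",                 # wait for all in-sync replicas
        "enable.idempotence": True,    # no duplicates on internal retries
    })

    def delivery_report(err, msg):
        if err is not None:
            print(f"delivery failed: {err}")
        else:
            print(f"delivered to {msg.topic()}[{msg.partition()}]")

    key, value = make_record({"user_id": 42, "action": "login"})
    producer.produce("user-events", key=key, value=value,
                     callback=delivery_report)
    producer.flush()  # drain outstanding messages before shutdown

# main()  # run with a broker on localhost:9092
```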
Basic Consumer Pattern
Process events from topics with offset management:
1. Create consumer with broker addresses and group ID
2. Subscribe to topics
3. Poll for messages
4. Process each message
5. Commit offsets (auto or manual)
6. Handle errors (retry, DLQ, skip)
7. Close consumer gracefully
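The same steps with confluent-kafka-python and manual offset commits. Topic and group names are illustrative, and `handle` stands in for your processing logic; `main()` is not invoked since its loop needs a live broker:

```python
import json

def handle(raw: bytes) -> dict:
    """Placeholder processing step: decode and return the event."""
    return json.loads(raw)

def main() -> None:
    # Imported here so handle() stays importable without the library.
    from confluent_kafka import Consumer  # pip install confluent-kafka

    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "user-events-workers",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,       # commit manually after processing
    })
    consumer.subscribe(["user-events"])
    try:
        while True:
            msg = consumer.poll(timeout=1.0)   # step 3: poll
            if msg is None:
                continue
            if msg.error():
                print(f"consumer error: {msg.error()}")
                continue
            handle(msg.value())                # step 4: process
            consumer.commit(msg)               # step 5: commit after success
    finally:
        consumer.close()                       # step 7: leave the group cleanly

# main()  # run with a broker on localhost:9092
```

Committing only after `handle` returns gives at-least-once semantics: a crash before the commit replays the message on restart.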
Error Handling Strategy
For production systems, implement:
- Dead Letter Queue (DLQ): Send failed messages to separate topic
- Retry Logic: Configurable retry attempts with backoff
- Graceful Shutdown: Finish processing, commit offsets, close connections
- Monitoring: Track consumer lag, error rates, throughput
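The retry and DLQ pieces can be sketched independently of any broker client. `send_to_dlq` would typically produce the message to a dead-letter topic; both callback names here are illustrative:

```python
import time

def process_with_retry(event, handler, send_to_dlq,
                       max_attempts=3, base_delay=0.1):
    """Run handler(event); retry with exponential backoff, then dead-letter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return handler(event)
        except Exception as exc:
            if attempt == max_attempts:
                send_to_dlq(event, exc)    # retries exhausted: hand off to DLQ
                return None
            time.sleep(base_delay * 2 ** (attempt - 1))  # 0.1s, 0.2s, 0.4s, ...
    return None
```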
Decision Frameworks
Framework: Message Broker Selection
START: What are the requirements?
1. Need Kafka API compatibility?
YES → Kafka or Redpanda
NO → Continue
2. Is multi-tenancy critical?
YES → Apache Pulsar
NO → Continue
3. Operational simplicity priority?
YES → Redpanda (single binary, no ZooKeeper)
NO → Continue
4. Mature ecosystem needed?
YES → Apache Kafka
NO → Redpanda (better performance)
5. Task queues (not event streams)?
YES → RabbitMQ or message-queues skill
NO → Kafka/Redpanda/Pulsar
Framework: Stream Processor Selection
START: What is the latency requirement?
1. Millisecond-level latency needed?
YES → Apache Flink
NO → Continue
2. Batch + stream in same pipeline?
YES → Apache Spark Streaming
NO → Continue
3. Embedded in microservice?
YES → Kafka Streams
NO → Continue
4. SQL interface for analysts?
YES → ksqlDB
NO → Flink or Spark
5. Python primary language?
YES → Spark (PySpark) or Faust
NO → Flink (Java/Scala)
Framework: Language Selection
TypeScript/Node.js:
- API gateways, web services, real-time dashboards
- KafkaJS library (827 code snippets, high reputation)
Python:
- Data science, ML pipelines, analytics
- confluent-kafka-python (192 snippets, score 68.8)
Go:
- High-performance microservices, infrastructure tools
- kafka-go (42 snippets, idiomatic Go)
Java/Scala:
- Enterprise applications, Kafka Streams, Flink, Spark
- Apache Kafka Java Client (683 snippets, score 76.9)
Advanced Patterns
Event Sourcing
Store state changes as immutable events. See references/event-sourcing.md for:
- Event store design patterns
- Event schema evolution
- Snapshot strategies
- Temporal queries and audit trails
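The core idea of event sourcing fits in a few lines: state is never stored directly, only derived by folding over the append-only event log. A toy in-memory sketch (real event stores add persistence, schema versioning, and snapshots):

```python
class EventStore:
    """Append-only log; current state is a fold over all events."""

    def __init__(self):
        self._log = []

    def append(self, event):
        self._log.append(event)        # events are immutable facts

    def replay(self, apply, initial):
        state = initial
        for event in self._log:
            state = apply(state, event)
        return state

# Example reducer: a bank balance derived from deposit/withdraw events.
def apply_balance(balance, event):
    if event["type"] == "deposited":
        return balance + event["amount"]
    if event["type"] == "withdrew":
        return balance - event["amount"]
    return balance
```

Because the log is the source of truth, temporal queries and audit trails come for free: replay up to any point to reconstruct the state at that time.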
Change Data Capture (CDC)
Capture database changes as events. See references/cdc-patterns.md for:
- Debezium integration (MySQL, PostgreSQL, MongoDB)
- Real-time data synchronization
- Microservices data integration patterns
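Debezium emits change events as envelopes with `op`, `before`, and `after` fields. A minimal sketch of applying such events to an in-memory replica (assumes rows carry an `id` primary key, which is illustrative):

```python
def apply_cdc_event(table: dict, envelope: dict) -> dict:
    """Apply one Debezium-style change event to a dict keyed by primary key."""
    op = envelope["op"]
    if op in ("c", "u", "r"):          # create, update, snapshot read
        row = envelope["after"]
        table[row["id"]] = row
    elif op == "d":                    # delete: 'after' is null, use 'before'
        table.pop(envelope["before"]["id"], None)
    return table
```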
Exactly-Once Processing
Implement transactional guarantees. See references/exactly-once.md for:
- Idempotent producers
- Transactional consumers
- End-to-end exactly-once pipelines
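With confluent-kafka-python, the transactional producer API looks roughly like this (the topic, `transactional.id`, and payload are illustrative; `main()` is defined but not invoked since it needs a running broker):

```python
def transactional_config(transactional_id: str) -> dict:
    """Config for an exactly-once producer; the id must be stable per instance."""
    return {
        "bootstrap.servers": "localhost:9092",
        "transactional.id": transactional_id,  # enables fencing of zombie producers
        "enable.idempotence": True,
        "acks": "all",
    }

def main() -> None:
    from confluent_kafka import Producer  # pip install confluent-kafka

    producer = Producer(transactional_config("orders-processor-1"))
    producer.init_transactions()           # register with the coordinator
    producer.begin_transaction()
    try:
        producer.produce("orders", key=b"42", value=b'{"status": "paid"}')
        producer.commit_transaction()      # messages become visible atomically
    except Exception:
        producer.abort_transaction()       # read_committed consumers never see these
        raise

# main()  # run with a broker on localhost:9092
```

Consumers must set `isolation.level=read_committed` to skip messages from aborted transactions.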
Error Handling
Production-grade error management. See references/error-handling.md for:
- Dead letter queue patterns
- Retry strategies with exponential backoff
- Backpressure handling
- Circuit breakers for downstream failures
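A circuit breaker for downstream calls can be sketched without any framework; the threshold, reset window, and injectable `clock` are illustrative choices:

```python
import time

class CircuitBreaker:
    """Open after `threshold` consecutive failures; retry after `reset_after` s."""

    def __init__(self, threshold=5, reset_after=30.0, clock=time.monotonic):
        self.threshold = threshold
        self.reset_after = reset_after
        self.clock = clock
        self.failures = 0
        self.opened_at = None

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_after:
                raise RuntimeError("circuit open: downstream call skipped")
            self.opened_at = None          # half-open: allow one trial call
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = self.clock()  # trip the breaker
            raise
        self.failures = 0                  # success resets the failure count
        return result
```

While open, the breaker fails fast instead of piling backpressure onto an already struggling downstream service.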
Reference Files
Decision Guides
- references/broker-selection.md - Kafka vs Pulsar vs Redpanda comparison
- references/processor-selection.md - Flink vs Spark vs Kafka Streams
- references/delivery-guarantees.md - At-least-once, exactly-once patterns
Language-Specific Implementation
- references/typescript-patterns.md - KafkaJS patterns (producer, consumer, error handling)
- references/python-patterns.md - confluent-kafka-python patterns
- references/go-patterns.md - kafka-go patterns
- references/java-patterns.md - Apache Kafka Java client patterns
Advanced Topics
- references/event-sourcing.md - Event sourcing architecture
- references/cdc-patterns.md - Change Data Capture with Debezium
- references/exactly-once.md - Transactional processing
- references/error-handling.md - DLQ, retries, backpressure
- references/performance-tuning.md - Throughput optimization, partitioning strategies
Validation Scripts
Run these scripts for token-free validation and generation:
Validate Kafka Configuration
```bash
python scripts/validate-kafka-config.py --config producer.yaml
python scripts/validate-kafka-config.py --config consumer.yaml
```

Checks: broker connectivity, configuration validity, serialization format
Generate Schema Registry Templates
```bash
python scripts/generate-schema.py --type avro --entity User
python scripts/generate-schema.py --type protobuf --entity Event
```

Creates: Avro/Protobuf schema definitions for Schema Registry
Benchmark Throughput
```bash
bash scripts/benchmark-throughput.sh --broker localhost:9092 --topic test
```

Tests: Producer/consumer throughput, latency percentiles
Code Examples
TypeScript Example (KafkaJS)
See examples/typescript/ for:
- basic-producer.ts - Simple event producer with error handling
- basic-consumer.ts - Consumer with manual offset commits
- transactional-producer.ts - Exactly-once producer pattern
- consumer-with-dlq.ts - Dead letter queue implementation
Python Example (confluent-kafka-python)
See examples/python/ for:
- basic_producer.py - Producer with delivery callbacks
- basic_consumer.py - Consumer with error handling
- async_producer.py - AsyncIO producer (aiokafka)
- schema_registry.py - Avro serialization with Schema Registry
Go Example (kafka-go)
See examples/go/ for:
- basic_producer.go - Idiomatic Go producer
- basic_consumer.go - Consumer with manual commits
- high_perf_consumer.go - Concurrent processing pattern
- batch_producer.go - Batch message sending
Java Example (Apache Kafka)
See examples/java/ for:
- BasicProducer.java - Producer with idempotence
- BasicConsumer.java - Consumer with error recovery
- TransactionalProducer.java - Exactly-once transactions
- StreamsAggregation.java - Kafka Streams aggregation
Technology Comparison
Message Broker Comparison
| Feature | Kafka | Pulsar | Redpanda | RabbitMQ |
|---|---|---|---|---|
| Throughput | Very High | High | Very High | Medium |
| Latency | Medium | Medium | Low | Low |
| Event Replay | Yes | Yes | Yes | No |
| Multi-Tenancy | Manual | Native | Manual | Manual |
| Operational Complexity | Medium | High | Low | Low |
| Best For | Enterprise, big data | SaaS, IoT | Performance-critical | Task queues |
Stream Processor Comparison
| Feature | Flink | Spark | Kafka Streams | ksqlDB |
|---|---|---|---|---|
| Processing Model | True streaming | Micro-batch | Library | SQL engine |
| Latency | Millisecond | Second | Millisecond | Second |
| Deployment | Cluster | Cluster | Embedded | Server |
| Best For | Real-time analytics | Batch + stream | Microservices | Analysts |
Client Library Recommendations
| Language | Library | Trust Score | Snippets | Use Case |
|---|---|---|---|---|
| TypeScript | KafkaJS | High | 827 | Web services, APIs |
| Python | confluent-kafka-python | High (68.8) | 192 | Data pipelines, ML |
| Go | kafka-go | High | 42 | High-perf services |
| Java | Kafka Java Client | High (76.9) | 683 | Enterprise, Flink/Spark |
Related Skills
For authentication and security patterns, see the auth-security skill.
For infrastructure deployment (Kubernetes operators, Terraform), see the infrastructure-as-code skill.
For monitoring metrics and tracing, see the observability skill.
For API design patterns, see the api-design-principles skill.
For data architecture and warehousing, see the data-architecture skill.
Troubleshooting
Consumer Lag Issues
- Check partition count vs consumer count (match for parallelism)
- Increase consumer instances or reduce processing time
- Monitor with Kafka consumer lag metrics
Message Loss
- Verify producer acks=all configuration
- Check broker replication factor (>1)
- Ensure consumers commit offsets after processing
Duplicate Messages
- Implement idempotent consumers (track message IDs)
- Use exactly-once semantics (transactions)
- Design for at-least-once delivery
Performance Bottlenecks
- Increase partition count for parallelism
- Tune batch size and linger time
- Enable compression (GZIP, LZ4, Snappy)
- See references/performance-tuning.md for details
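For reference, the producer-side knobs above map to client configuration roughly like this (confluent-kafka-python / librdkafka property names; the values are illustrative starting points, not recommendations):

```python
# Producer tuning sketch: trade a little latency for much higher throughput.
producer_conf = {
    "bootstrap.servers": "localhost:9092",
    "batch.size": 64 * 1024,       # larger batches amortize per-request overhead
    "linger.ms": 10,               # wait up to 10 ms to fill a batch
    "compression.type": "lz4",     # trade CPU for network/disk throughput
}
```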