azure-eventhub-py
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAzure Event Hubs SDK for Python
适用于Python的Azure Event Hubs SDK
Big data streaming platform for high-throughput event ingestion.
大数据流处理平台,用于高吞吐量事件摄入。
Installation
安装
bash
pip install azure-eventhub azure-identitybash
pip install azure-eventhub azure-identityFor checkpointing with blob storage
For checkpointing with blob storage
pip install azure-eventhub-checkpointstoreblob-aio
undefinedpip install azure-eventhub-checkpointstoreblob-aio
undefinedEnvironment Variables
环境变量
bash
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
EVENT_HUB_NAME=my-eventhub
STORAGE_ACCOUNT_URL=https://<account>.blob.core.windows.net
CHECKPOINT_CONTAINER=checkpointsbash
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
EVENT_HUB_NAME=my-eventhub
STORAGE_ACCOUNT_URL=https://<account>.blob.core.windows.net
CHECKPOINT_CONTAINER=checkpointsAuthentication
身份验证
python
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient
credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
eventhub_name = "my-eventhub"python
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient
credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
eventhub_name = "my-eventhub"Producer
Producer
producer = EventHubProducerClient(
fully_qualified_namespace=namespace,
eventhub_name=eventhub_name,
credential=credential
)
producer = EventHubProducerClient(
fully_qualified_namespace=namespace,
eventhub_name=eventhub_name,
credential=credential
)
Consumer
Consumer
consumer = EventHubConsumerClient(
fully_qualified_namespace=namespace,
eventhub_name=eventhub_name,
consumer_group="$Default",
credential=credential
)
undefinedconsumer = EventHubConsumerClient(
fully_qualified_namespace=namespace,
eventhub_name=eventhub_name,
consumer_group="$Default",
credential=credential
)
undefinedClient Types
客户端类型
| Client | Purpose |
|---|---|
| Send events to Event Hub |
| Receive events from Event Hub |
| Track consumer progress |
| 客户端 | 用途 |
|---|---|
| 向Event Hub发送事件 |
| 从Event Hub接收事件 |
| 跟踪消费者处理进度 |
Send Events
发送事件
python
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential
producer = EventHubProducerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
credential=DefaultAzureCredential()
)
with producer:
# Create batch (handles size limits)
event_data_batch = producer.create_batch()
for i in range(10):
try:
event_data_batch.add(EventData(f"Event {i}"))
except ValueError:
# Batch is full, send and create new one
producer.send_batch(event_data_batch)
event_data_batch = producer.create_batch()
event_data_batch.add(EventData(f"Event {i}"))
# Send remaining
producer.send_batch(event_data_batch)python
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential
producer = EventHubProducerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
credential=DefaultAzureCredential()
)
with producer:
# Create batch (handles size limits)
event_data_batch = producer.create_batch()
for i in range(10):
try:
event_data_batch.add(EventData(f"Event {i}"))
except ValueError:
# Batch is full, send and create new one
producer.send_batch(event_data_batch)
event_data_batch = producer.create_batch()
event_data_batch.add(EventData(f"Event {i}"))
# Send remaining
producer.send_batch(event_data_batch)Send to Specific Partition
发送至指定分区
python
undefinedpython
undefinedBy partition ID
By partition ID
event_data_batch = producer.create_batch(partition_id="0")
event_data_batch = producer.create_batch(partition_id="0")
By partition key (consistent hashing)
By partition key (consistent hashing)
event_data_batch = producer.create_batch(partition_key="user-123")
undefinedevent_data_batch = producer.create_batch(partition_key="user-123")
undefinedReceive Events
接收事件
Simple Receive
简单接收
python
from azure.eventhub import EventHubConsumerClient
def on_event(partition_context, event):
print(f"Partition: {partition_context.partition_id}")
print(f"Data: {event.body_as_str()}")
partition_context.update_checkpoint(event)
consumer = EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential()
)
with consumer:
consumer.receive(
on_event=on_event,
starting_position="-1", # Beginning of stream
)python
from azure.eventhub import EventHubConsumerClient
def on_event(partition_context, event):
print(f"Partition: {partition_context.partition_id}")
print(f"Data: {event.body_as_str()}")
partition_context.update_checkpoint(event)
consumer = EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential()
)
with consumer:
consumer.receive(
on_event=on_event,
starting_position="-1", # Beginning of stream
)With Blob Checkpoint Store (Production)
搭配Blob检查点存储(生产环境)
python
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential
checkpoint_store = BlobCheckpointStore(
blob_account_url="https://<account>.blob.core.windows.net",
container_name="checkpoints",
credential=DefaultAzureCredential()
)
consumer = EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential(),
checkpoint_store=checkpoint_store
)
def on_event(partition_context, event):
print(f"Received: {event.body_as_str()}")
# Checkpoint after processing
partition_context.update_checkpoint(event)
with consumer:
consumer.receive(on_event=on_event)python
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential
checkpoint_store = BlobCheckpointStore(
blob_account_url="https://<account>.blob.core.windows.net",
container_name="checkpoints",
credential=DefaultAzureCredential()
)
consumer = EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential(),
checkpoint_store=checkpoint_store
)
def on_event(partition_context, event):
print(f"Received: {event.body_as_str()}")
# Checkpoint after processing
partition_context.update_checkpoint(event)
with consumer:
consumer.receive(on_event=on_event)Async Client
异步客户端
python
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential
import asyncio
async def send_events():
credential = DefaultAzureCredential()
async with EventHubProducerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
credential=credential
) as producer:
batch = await producer.create_batch()
batch.add(EventData("Async event"))
await producer.send_batch(batch)
async def receive_events():
async def on_event(partition_context, event):
print(event.body_as_str())
await partition_context.update_checkpoint(event)
async with EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential()
) as consumer:
await consumer.receive(on_event=on_event)
asyncio.run(send_events())python
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential
import asyncio
async def send_events():
credential = DefaultAzureCredential()
async with EventHubProducerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
credential=credential
) as producer:
batch = await producer.create_batch()
batch.add(EventData("Async event"))
await producer.send_batch(batch)
async def receive_events():
async def on_event(partition_context, event):
print(event.body_as_str())
await partition_context.update_checkpoint(event)
async with EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential()
) as consumer:
await consumer.receive(on_event=on_event)
asyncio.run(send_events())Event Properties
事件属性
python
event = EventData("My event body")python
event = EventData("My event body")Set properties
Set properties
event.properties = {"custom_property": "value"}
event.content_type = "application/json"
event.properties = {"custom_property": "value"}
event.content_type = "application/json"
Read properties (on receive)
Read properties (on receive)
print(event.body_as_str())
print(event.sequence_number)
print(event.offset)
print(event.enqueued_time)
print(event.partition_key)
undefinedprint(event.body_as_str())
print(event.sequence_number)
print(event.offset)
print(event.enqueued_time)
print(event.partition_key)
undefinedGet Event Hub Info
获取Event Hub信息
python
with producer:
info = producer.get_eventhub_properties()
print(f"Name: {info['name']}")
print(f"Partitions: {info['partition_ids']}")
for partition_id in info['partition_ids']:
partition_info = producer.get_partition_properties(partition_id)
print(f"Partition {partition_id}: {partition_info['last_enqueued_sequence_number']}")python
with producer:
info = producer.get_eventhub_properties()
print(f"Name: {info['name']}")
print(f"Partitions: {info['partition_ids']}")
for partition_id in info['partition_ids']:
partition_info = producer.get_partition_properties(partition_id)
print(f"Partition {partition_id}: {partition_info['last_enqueued_sequence_number']}")Best Practices
最佳实践
- Use batches for sending multiple events
- Use checkpoint store in production for reliable processing
- Use async client for high-throughput scenarios
- Use partition keys for ordered delivery within a partition
- Handle batch size limits — catch ValueError when batch is full
- Use context managers (/
with) for proper cleanupasync with - Set appropriate consumer groups for different applications
- 使用批量发送来处理多个事件
- 在生产环境中使用检查点存储以实现可靠处理
- 使用异步客户端应对高吞吐量场景
- 使用分区键保证分区内的有序投递
- 处理批量大小限制 —— 当批量已满时捕获ValueError异常
- 使用上下文管理器 (/
with) 以确保资源正确清理async with - 为不同应用设置合适的消费者组
Reference Files
参考文件
| File | Contents |
|---|---|
| references/checkpointing.md | Checkpoint store patterns, blob checkpointing, checkpoint strategies |
| references/partitions.md | Partition management, load balancing, starting positions |
| scripts/setup_consumer.py | CLI for Event Hub info, consumer setup, and event sending/receiving |
| 文件 | 内容 |
|---|---|
| references/checkpointing.md | 检查点存储模式、Blob检查点、检查点策略 |
| references/partitions.md | 分区管理、负载均衡、起始位置设置 |
| scripts/setup_consumer.py | 用于Event Hub信息查看、消费者设置以及事件收发的CLI工具 |