neo4j-kafka-skill
Neo4j Kafka Skill
When to Use
- Writing Kafka events into Neo4j (sink connector — Cypher, Pattern, CDC, CUD strategies)
- Streaming Neo4j changes to Kafka topics (source connector — CDC or query-based)
- Querying Neo4j change events natively without Kafka (`db.cdc.query`)
- Configuring Confluent Cloud managed Neo4j sink connector
- Setting up schema registry (Avro/JSON Schema) for typed Kafka messages
- Enabling exactly-once semantics or dead-letter queue on sink
When NOT to Use
- Cypher query authoring → `neo4j-cypher-skill`
- Bulk CSV/JSON file import → `neo4j-import-skill`
- GDS algorithms → `neo4j-gds-skill`
- Live app write patterns → `neo4j-cypher-skill`
Decision Table — Which connector strategy?
| Use case | Strategy |
|---|---|
| Custom transformation of Kafka payload → graph | Sink: Cypher |
| Mirror another Neo4j CDC source | Sink: CDC (schema or source-id sub-strategy) |
| Map Kafka JSON fields to graph nodes/rels with no code | Sink: Pattern |
| Consume pre-formatted CUD JSON messages | Sink: CUD |
| Stream all Neo4j changes to Kafka (real-time) | Source: CDC (Neo4j 5.13+ EE/Aura BC/VDC) |
| Stream specific query results on a schedule | Source: Query |
| Consume CDC events in-process, no Kafka | Native CDC API (`db.cdc.query`) |
Prerequisites
- Neo4j Connector for Kafka ≥ 5.0 (download from neo4j.com/labs/kafka or Confluent Hub)
- Kafka Connect ≥ 3.x or Confluent Platform ≥ 7.x
- For CDC source/sink: Neo4j 5.13+ Enterprise Edition, AuraDB Business Critical, or AuraDB VDC
- For query source: any Neo4j edition
- Java 11+
Core Connection Config (all connectors)
```json
{
  "neo4j.uri": "neo4j+s://your-instance.databases.neo4j.io:7687",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "${file:/opt/secrets.properties:neo4j.password}",
  "neo4j.database": "neo4j"
}
```

Authentication types: `BASIC` | `BEARER` | `KERBEROS` | `CUSTOM` | `NONE`

Never hardcode passwords; use a Kafka Connect secrets provider (`${file:...}` or `${env:...}`).
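The no-hardcoded-passwords rule can be checked mechanically before a config is submitted. The following is a minimal sketch, not part of the connector: `find_hardcoded_secrets` is a hypothetical helper, and the heuristic that sensitive keys end in `password`, `secret`, or `token` is an assumption. It flags such keys whose values are not `${provider:...}` placeholders.

```python
import re

# Assumption: keys ending in these suffixes hold sensitive values.
SENSITIVE_SUFFIXES = ("password", "secret", "token")
# Matches config-provider placeholders such as ${file:/path:key} or ${env:NAME}.
PLACEHOLDER = re.compile(r"^\$\{[^:}]+:[^}]+\}$")


def find_hardcoded_secrets(config: dict) -> list:
    """Return sensitive keys whose values are literal strings, not placeholders."""
    return [
        key
        for key, value in config.items()
        if key.lower().endswith(SENSITIVE_SUFFIXES)
        and not PLACEHOLDER.match(str(value))
    ]


good = {"neo4j.authentication.basic.password": "${file:/opt/secrets.properties:neo4j.password}"}
bad = {"neo4j.authentication.basic.password": "secret"}
print(find_hardcoded_secrets(good))  # []
print(find_hardcoded_secrets(bad))   # ['neo4j.authentication.basic.password']
```

A check like this could run in CI against connector JSON files; it is a lint, not a guarantee that the referenced secret exists.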
Sink Connector
Strategy 1 — Cypher
The connector auto-prepends `UNWIND $events AS __value`; write the query using `__value`:

```json
{
  "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
  "topics": "person-creates,person-updates",
  "neo4j.uri": "neo4j+s://...",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "${file:/opt/secrets.properties:neo4j.password}",
  "neo4j.cypher.topic.person-creates":
    "MERGE (p:Person {id: __value.id}) SET p += __value.properties",
  "neo4j.cypher.topic.person-updates":
    "MATCH (p:Person {id: __value.id}) SET p += __value.properties",
  "neo4j.cypher.bind-value-as": "__value",
  "neo4j.cypher.bind-key-as": "__key",
  "neo4j.cypher.bind-header-as": "__header"
}
```

MERGE pattern (idempotent upsert):

```cypher
MERGE (p:Person {id: __value.id})
ON CREATE SET p.createdAt = datetime(), p += __value.properties
ON MATCH SET p.updatedAt = datetime(), p += __value.properties
```
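A sink config like this one is typically registered through the Kafka Connect REST API, whose `PUT /connectors/{name}/config` endpoint creates or updates a connector. The sketch below only builds the request and does not send it. The worker URL `http://localhost:8083` and the connector name `neo4j-sink` are placeholders, and `build_put_request` is a hypothetical helper.

```python
import json
import urllib.request


def build_put_request(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a PUT /connectors/{name}/config request."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors/{name}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


config = {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "person-creates",
    "neo4j.cypher.topic.person-creates":
        "MERGE (p:Person {id: __value.id}) SET p += __value.properties",
}
req = build_put_request("http://localhost:8083", "neo4j-sink", config)
print(req.get_method(), req.full_url)
# To actually submit it against a running worker: urllib.request.urlopen(req)
```

Using `PUT` on the config endpoint keeps the call repeatable: sending the same config twice leaves the connector unchanged.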
Strategy 2 — Pattern
No Cypher needed — map message fields to graph via pattern syntax:
```json
{
  "neo4j.pattern.topic.users": "(:User{!userId, name, email})",
  "neo4j.pattern.topic.friendships":
    "(:User{!userId: from.userId})-[:KNOWS{since}]->(:User{!userId: to.userId})"
}
```

Pattern rules:
- `!prop` = key property (used for MERGE)
- `prop: field.path` = map from nested message field
- `*` = map all message fields
- `-prop` = exclude property (cannot mix with inclusions)
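To make the pattern rules concrete, here is a toy parser for the property list inside a node pattern. It illustrates the syntax only and is not the connector's parser; `parse_props` and its return shape are hypothetical.

```python
def parse_props(spec: str) -> dict:
    """Classify entries of a pattern property list such as '!userId, name, -email'."""
    result = {"keys": {}, "include": {}, "exclude": [], "all": False}
    for raw in (part.strip() for part in spec.split(",")):
        if not raw:
            continue
        if raw == "*":
            result["all"] = True
        elif raw.startswith("-"):
            result["exclude"].append(raw[1:].strip())
        else:
            is_key = raw.startswith("!")
            name, _, source = raw.lstrip("!").partition(":")
            name = name.strip()
            # Without an explicit 'prop: field.path', map from the same-named field.
            target = result["keys"] if is_key else result["include"]
            target[name] = source.strip() or name
    if result["exclude"] and result["include"]:
        raise ValueError("exclusions cannot be mixed with inclusions")
    return result


print(parse_props("!userId: from.userId, name"))
```

The final check mirrors the last rule in the list: a property list either names what to include or what to exclude, never both.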
Strategy 3 — CDC (mirror another Neo4j)
```json
{
  "neo4j.cdc.schema.topics": "neo4j-cdc-events"
}
```

Or with source-id tracking (stores elementId as property):

```json
{
  "neo4j.cdc.source-id.topics": "neo4j-cdc-events",
  "neo4j.cdc.source-id.label-name": "SourceEvent",
  "neo4j.cdc.source-id.property-name": "sourceId"
}
```
Exactly-Once Semantics (EOS)
Requires: connector ≥ 5.3.0, Kafka broker EOS support, and a NODE KEY constraint.

Step 1. Create the constraint:

```cypher
CREATE CONSTRAINT kafka_offset_key IF NOT EXISTS
FOR (n:__KafkaOffset)
REQUIRE (n.strategy, n.topic, n.partition) IS NODE KEY;
```

Step 2. Add to the connector config:

```json
{
  "neo4j.eos-offset-label": "__KafkaOffset"
}
```

Without EOS, the connector provides at-least-once delivery, so write idempotent Cypher (MERGE, not CREATE).
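Why at-least-once delivery calls for MERGE can be seen in a small in-memory simulation. Nothing here talks to Neo4j or Kafka: `create_like` and `merge_like` are hypothetical stand-ins for `CREATE` and `MERGE`, and the doubled batch mimics a replay after a connector restart.

```python
def create_like(store: list, event: dict) -> None:
    """Stand-in for CREATE: always appends a new node."""
    store.append(dict(event))


def merge_like(store: dict, event: dict) -> None:
    """Stand-in for MERGE on a key property: upserts by id."""
    store.setdefault(event["id"], {}).update(event)


batch = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Alan"}]
delivered = batch + batch  # the same batch delivered twice (at-least-once)

created, merged = [], {}
for event in delivered:
    create_like(created, event)
    merge_like(merged, event)

print(len(created))  # 4: duplicates after the replay
print(len(merged))   # 2: the replay is a no-op
```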
Error Handling / DLQ
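```json
{
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "errors.deadletterqueue.topic.name": "neo4j-dlq",
  "errors.deadletterqueue.context.headers.enable": "true",
  "errors.deadletterqueue.topic.replication.factor": "3"
}
```

`errors.tolerance`: with `none`, the connector task fails on the first bad record; with `all`, failed records are skipped and, when a dead-letter topic is configured, routed to it.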
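The difference between the two `errors.tolerance` modes can be sketched with a toy sink loop. This simulates the behaviour in plain Python and is not Kafka Connect code; `run_sink`, `write`, and the in-memory `dlq` list are hypothetical.

```python
def run_sink(records: list, write, tolerance: str = "none") -> tuple:
    """Process records; return (written, dlq) under the given tolerance mode."""
    written, dlq = [], []
    for record in records:
        try:
            written.append(write(record))
        except Exception as exc:
            if tolerance != "all":
                raise  # tolerance 'none': the task stops at the first bad record
            # tolerance 'all': keep going and park the record with its error context
            dlq.append({"record": record, "error": repr(exc)})
    return written, dlq


def write(record: dict) -> int:
    # Hypothetical writer that rejects records without an id.
    if record.get("id") is None:
        raise ValueError("missing id")
    return record["id"]


records = [{"id": 1}, {"name": "no id"}, {"id": 3}]
written, dlq = run_sink(records, write, tolerance="all")
print(written, len(dlq))  # [1, 3] 1
```

With `tolerance="none"` the same call raises on the second record, which is the simulated equivalent of a stopped connector task.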
Source Connector
CDC-Based Source (recommended, Neo4j 5.13+)
```json
{
  "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
  "neo4j.uri": "neo4j+s://...",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "${file:/opt/secrets.properties:neo4j.password}",
  "neo4j.source-strategy": "CDC",
  "neo4j.start-from": "NOW",
  "neo4j.cdc.poll-interval": "1s",
  "neo4j.cdc.poll-duration": "5s",
  "neo4j.cdc.topic.person-creates.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.person-creates.patterns.0.operation": "CREATE",
  "neo4j.cdc.topic.person-updates.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.person-updates.patterns.0.operation": "UPDATE",
  "neo4j.cdc.topic.person-deletes.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.person-deletes.patterns.0.operation": "DELETE"
}
```

`neo4j.start-from`: `NOW` | `EARLIEST`

Multiple patterns per topic, indexed 0, 1, 2...:

```json
{
  "neo4j.cdc.topic.all-changes.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.all-changes.patterns.1.pattern": "(:Organization)"
}
```

Cursor warning: after a DB restore from backup, CDC cursors are invalidated. Reconfigure `neo4j.start-from`.
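Because the pattern keys are indexed per topic, they are easy to generate. The helper below is a hypothetical convenience, not part of the connector; it only assembles `neo4j.cdc.topic.<topic>.patterns.<i>.*` keys in the shape used by the CDC source config.

```python
def cdc_topic_patterns(topics: dict) -> dict:
    """Expand {topic: [(pattern, operation_or_None), ...]} into connector keys."""
    config = {}
    for topic, patterns in topics.items():
        for index, (pattern, operation) in enumerate(patterns):
            prefix = f"neo4j.cdc.topic.{topic}.patterns.{index}"
            config[f"{prefix}.pattern"] = pattern
            if operation is not None:
                config[f"{prefix}.operation"] = operation
    return config


config = cdc_topic_patterns({
    "person-creates": [("(:Person)", "CREATE")],
    "all-changes": [("(:Person)", None), ("(:Organization)", None)],
})
for key, value in config.items():
    print(key, "=", value)
```

The resulting dictionary can be merged into the rest of the source connector config before it is serialized to JSON.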
Query-Based Source (legacy / any edition)
```json
{
  "neo4j.source-strategy": "QUERY",
  "neo4j.query": "MATCH (p:Person) WHERE p.updatedAt > $lastCheck RETURN p.id AS id, p.name AS name, p.updatedAt AS updatedAt",
  "neo4j.query.streaming-property": "updatedAt",
  "neo4j.query.topic": "person-changes",
  "neo4j.query.polling-interval": "5s",
  "neo4j.query.polling-duration": "10s"
}
```

`$lastCheck` tracks the last value seen for the property named by `neo4j.query.streaming-property`.
Native CDC API (no Kafka required)
Requires: Neo4j 5.13+ Enterprise, AuraDB BC, or AuraDB VDC.
Enable CDC first (self-managed: set in neo4j.conf):

```
db.cdc.enabled=true
```

On Aura: enabled by default on eligible tiers.
Cursor Bootstrap
```cypher
// Get cursor for "right now": start tracking from this point forward
CALL db.cdc.current() YIELD id RETURN id AS cursor;

// Get earliest available cursor (replay from history start)
CALL db.cdc.earliest() YIELD id RETURN id AS cursor;
```

Cursors are exclusive: `db.cdc.current()` does NOT include the transaction it points to.
Query Changes
```cypher
// All changes since cursor
CALL db.cdc.query($cursor, []) YIELD id, txId, seq, metadata, event
RETURN id, txId, seq, metadata, event
ORDER BY txId, seq;
```

Filtered (nodes with label Person, CREATE only):

```cypher
CALL db.cdc.query($cursor, [
  {select: 'n', labels: ['Person'], operation: 'c'}
]) YIELD id, txId, seq, event
RETURN id, event.state.after.properties AS newProps
ORDER BY txId, seq;
```

Filtered (specific relationship type with property change tracking):

```cypher
CALL db.cdc.query($cursor, [
  {select: 'r', type: 'KNOWS', changesTo: ['since', 'strength']}
]) YIELD id, txId, seq, event
RETURN id, event.state.before AS before, event.state.after AS after;
```
Selector Reference
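| Field | Values | Applies to |
|---|---|---|
| `select` | `'e'` (all entities), `'n'` (nodes), `'r'` (relationships) | both |
| `operation` | `'c'`, `'u'`, `'d'` | both |
| `labels` | list of label strings | nodes |
| `type` | relationship type string | relationships |
| `elementId` | specific element ID string | both |
| `key` | map of key property values | both |
| `changesTo` | list of property names | both |
| `authenticatedUser` | username string | both |
| `executingUser` | username string | both |
| `txMetadata` | map of transaction metadata | both |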
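Selectors are plain maps, so a small builder can catch mismatched fields before a query is sent. The sketch below is hypothetical (`make_selector` is not a driver or server API) and encodes only two rules visible in the filtered query examples: `labels` goes with node selectors and `type` goes with relationship selectors.

```python
def make_selector(select: str, **fields) -> dict:
    """Build a CDC selector map, rejecting fields that do not fit the entity kind."""
    if select not in ("n", "r"):
        raise ValueError("this sketch only handles 'n' (nodes) and 'r' (relationships)")
    if select == "n" and "type" in fields:
        raise ValueError("'type' applies to relationships only")
    if select == "r" and "labels" in fields:
        raise ValueError("'labels' applies to nodes only")
    return {"select": select, **fields}


selectors = [
    make_selector("n", labels=["Person"], operation="c"),
    make_selector("r", type="KNOWS", changesTo=["since", "strength"]),
]
print(selectors)
```

The resulting list can be passed as the `$selectors` parameter of a `db.cdc.query` call.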
Event Structure
```
{
  id: STRING,              // cursor for this event (use as next $cursor)
  txId: INTEGER,           // transaction ID
  seq: INTEGER,            // ordering within transaction
  metadata: {
    executingUser: STRING,
    authenticatedUser: STRING,
    captureMode: STRING,   // "DIFF" or "FULL"
    txStartTime: DATETIME,
    txCommitTime: DATETIME,
    txMetadata: MAP
  },
  event: {
    elementId: STRING,
    eventType: STRING,     // "n" or "r"
    operation: STRING,     // "c", "u", "d"
    labels: [STRING],      // nodes only
    type: STRING,          // relationships only
    keys: MAP,
    state: {
      before: { properties: MAP },  // null on CREATE
      after: { properties: MAP }    // null on DELETE
    }
  }
}
```
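As one way to consume this structure, the sketch below folds a list of events into an in-memory mirror keyed by `elementId`. It assumes that `state.after.properties` carries the full property map for creates and updates (which may not hold in `DIFF` capture mode), and `apply_events` is a hypothetical helper.

```python
def apply_events(mirror: dict, events: list) -> dict:
    """Apply create/update/delete events to a dict of elementId -> properties."""
    for item in sorted(events, key=lambda e: (e["txId"], e["seq"])):
        event = item["event"]
        if event["operation"] == "d":
            mirror.pop(event["elementId"], None)
        else:  # "c" or "u": take the state after the change
            mirror[event["elementId"]] = dict(event["state"]["after"]["properties"])
    return mirror


events = [
    {"txId": 2, "seq": 0, "event": {"elementId": "a", "operation": "u",
                                    "state": {"before": {"properties": {"name": "Ada"}},
                                              "after": {"properties": {"name": "Ada L."}}}}},
    {"txId": 1, "seq": 0, "event": {"elementId": "a", "operation": "c",
                                    "state": {"before": None,
                                              "after": {"properties": {"name": "Ada"}}}}},
    {"txId": 3, "seq": 0, "event": {"elementId": "b", "operation": "d",
                                    "state": {"before": {"properties": {}}, "after": None}}},
]
print(apply_events({"b": {"name": "old"}}, events))  # {'a': {'name': 'Ada L.'}}
```

Sorting by `(txId, seq)` matches the `ORDER BY txId, seq` used in the queries, so the mirror ends in the same state regardless of the order in which events arrive in the list.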
Cursor-Loop Pattern (Python)
```python
import time

from neo4j import GraphDatabase

driver = GraphDatabase.driver("neo4j+s://...", auth=("neo4j", "password"))


def poll_changes(cursor: str, selectors: list) -> tuple[list, str]:
    records, _, _ = driver.execute_query(
        "CALL db.cdc.query($cursor, $selectors) YIELD id, txId, seq, event "
        "RETURN id, txId, seq, event ORDER BY txId, seq",
        cursor=cursor, selectors=selectors,
        database_="neo4j"
    )
    events = [r.data() for r in records]
    # Advance cursor to last event id; keep current if no events
    next_cursor = events[-1]["id"] if events else cursor
    return events, next_cursor


# Bootstrap
with driver.session(database="neo4j") as s:
    cursor = s.run("CALL db.cdc.current() YIELD id RETURN id").single()["id"]

selectors = [{"select": "n", "labels": ["Person"]}]

while True:
    events, cursor = poll_changes(cursor, selectors)
    for e in events:
        print(e["event"]["operation"], e["event"]["elementId"])
    time.sleep(1)
```
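The loop above reassigns `cursor` before the events are handled, so a crash mid-batch would skip the unprocessed events. A variant that advances the cursor one event at a time, and only after the handler succeeds, is sketched below. `process_batch` and `handle` are hypothetical names, the event list stands in for the result of `poll_changes`, and persisting the cursor durably is left out.

```python
def process_batch(events: list, cursor: str, handle) -> str:
    """Handle events in order; return the id of the last event handled successfully."""
    for event in events:
        try:
            handle(event)
        except Exception:
            # Stop here: the next poll restarts from the last good cursor,
            # so the failed event and everything after it are fetched again.
            break
        cursor = event["id"]
    return cursor


def handle(event: dict) -> None:
    # Hypothetical handler that fails on one event.
    if event["id"] == "c2":
        raise RuntimeError("downstream unavailable")


events = [{"id": "c1"}, {"id": "c2"}, {"id": "c3"}]
print(process_batch(events, "c0", handle))  # c1
```

This relies on each event `id` being usable as the next `$cursor` and on cursors being exclusive, both of which are stated in the sections above. Because failed events are fetched again, the handler should be idempotent.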
Confluent Cloud Managed Connector
Confluent Cloud hosts the Neo4j Sink connector as a fully managed service (no JAR upload needed).
Config differences vs self-managed:
- No `connector.class` field; the connector is selected in the UI/API
- Credentials via Confluent Cloud secret manager or direct JSON
- Private endpoints supported (AWS PrivateLink, Azure Private Link, GCP PSC)
- Managed upgrades — pin connector version explicitly if needed
Required Confluent Cloud fields:
```json
{
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "...",
  "kafka.api.secret": "...",
  "input.data.format": "JSON",
  "neo4j.uri": "neo4j+s://...",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "..."
}
```

One strategy per topic: Cypher and Pattern cannot be mixed on the same topic.
Schema Registry (Avro / JSON Schema)
The source connector always generates messages with schema support, so converters must be configured:

```json
{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://your-schema-registry",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://your-schema-registry"
}
```

For JSON Schema:

```json
{
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.schema.registry.url": "https://..."
}
```

The sink converter must match the source: an Avro sink cannot consume JSON Schema source messages.
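The matching requirement is simple to verify when both connector configs are at hand. The check below is a hypothetical helper, not a Kafka Connect API; it compares only the converter class names and ignores registry URLs and other converter settings.

```python
CONVERTER_KEYS = ("key.converter", "value.converter")


def converter_mismatches(source: dict, sink: dict) -> list:
    """Return the converter keys whose classes differ between source and sink configs."""
    return [key for key in CONVERTER_KEYS if source.get(key) != sink.get(key)]


avro = "io.confluent.connect.avro.AvroConverter"
json_schema = "io.confluent.connect.json.JsonSchemaConverter"

source = {"key.converter": avro, "value.converter": avro}
sink = {"key.converter": avro, "value.converter": json_schema}
print(converter_mismatches(source, sink))  # ['value.converter']
```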
Common Errors
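| Error | Cause | Fix |
|---|---|---|
| CDC not available | CDC disabled or unsupported edition | Enable in neo4j.conf or upgrade to EE/BC/VDC |
| Cursor rejected after DB restore | Backup invalidates cursors | Reset `neo4j.start-from` |
| MERGE fails on a null key | Key property missing in message | Validate message schema; add null check in Cypher |
| Messages replayed after restart | No EOS configured | Add `neo4j.eos-offset-label` |
| Connector stops on bad message | `errors.tolerance=none` | Set `errors.tolerance=all` |
| Deserialization error on sink | Converter mismatch source/sink | Match key/value converters on both ends |
| Empty events from `db.cdc.query` | Cursor points to current | Use `db.cdc.earliest()` |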
References
- Full connector config reference — all neo4j.* properties, defaults, types
- CDC API patterns — cursor loop, selector examples, event structure detail
- Neo4j Connector for Kafka docs
- CDC docs
Checklist
- CDC availability confirmed (Neo4j 5.13+ EE / Aura BC / VDC) if using CDC source or sink
- Uniqueness/NODE KEY constraints created before sink import (MERGE uses them)
- EOS constraint created if using `neo4j.eos-offset-label`
- Credentials via secrets provider, not hardcoded in config
- Cypher sink queries use MERGE (not CREATE) for idempotency
- `errors.tolerance=all` + DLQ configured for production sink
- Source: `neo4j.query.streaming-property` indexed
- Schema registry converters match on both source and sink sides
- After DB restore: CDC cursor reconfigured (`neo4j.start-from`)
- CDC cursor-loop: advance cursor only after successful processing