neo4j-kafka-skill

Neo4j Kafka Skill

When to Use

  • Writing Kafka events into Neo4j (sink connector — Cypher, Pattern, CDC, CUD strategies)
  • Streaming Neo4j changes to Kafka topics (source connector — CDC or query-based)
  • Querying Neo4j change events natively without Kafka (`db.cdc.query`)
  • Configuring Confluent Cloud managed Neo4j sink connector
  • Setting up schema registry (Avro/JSON Schema) for typed Kafka messages
  • Enabling exactly-once semantics or dead-letter queue on sink

When NOT to Use

  • Cypher query authoring → `neo4j-cypher-skill`
  • Bulk CSV/JSON file import → `neo4j-import-skill`
  • GDS algorithms → `neo4j-gds-skill`
  • Live app write patterns → `neo4j-cypher-skill`

Decision Table — Which connector strategy?

| Use case | Strategy |
| --- | --- |
| Custom transformation of Kafka payload → graph | Sink: Cypher |
| Mirror another Neo4j CDC source | Sink: CDC (schema or source-id sub-strategy) |
| Map Kafka JSON fields to graph nodes/rels with no code | Sink: Pattern |
| Consume pre-formatted CUD JSON messages | Sink: CUD |
| Stream all Neo4j changes to Kafka (real-time) | Source: CDC (Neo4j 5.13+ EE/Aura BC/VDC) |
| Stream specific query results on a schedule | Source: Query |
| Consume CDC events in-process, no Kafka | Native CDC API (`db.cdc.query`) |

Prerequisites

  • Neo4j Connector for Kafka ≥ 5.0 (download from neo4j.com/labs/kafka or Confluent Hub)
  • Kafka Connect ≥ 3.x or Confluent Platform ≥ 7.x
  • For CDC source/sink: Neo4j 5.13+ Enterprise Edition, AuraDB Business Critical, or AuraDB VDC
  • For query source: any Neo4j edition
  • Java 11+

Core Connection Config (all connectors)

```json
{
  "neo4j.uri": "neo4j+s://your-instance.databases.neo4j.io:7687",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "${file:/opt/secrets.properties:neo4j.password}",
  "neo4j.database": "neo4j"
}
```
Authentication types: `BASIC` | `BEARER` | `KERBEROS` | `CUSTOM` | `NONE`
Never hardcode passwords; use a Kafka Connect secrets provider (`${file:...}` or `${env:...}`).
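The `${file:...}` reference only resolves if the Connect worker registers a config provider; a minimal sketch of the worker-level setup (file paths are illustrative):

```properties
# connect-distributed.properties (worker config, not connector config)
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# /opt/secrets.properties (the file referenced by ${file:/opt/secrets.properties:neo4j.password})
# neo4j.password=your-secret-here
```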

Sink Connector

Strategy 1 — Cypher

The connector auto-prepends `UNWIND $events AS __value`; write the query using `__value`:
```json
{
  "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
  "topics": "person-creates,person-updates",
  "neo4j.uri": "neo4j+s://...",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "secret",
  "neo4j.cypher.topic.person-creates":
    "MERGE (p:Person {id: __value.id}) SET p += __value.properties",
  "neo4j.cypher.topic.person-updates":
    "MATCH (p:Person {id: __value.id}) SET p += __value.properties",
  "neo4j.cypher.bind-value-as": "__value",
  "neo4j.cypher.bind-key-as": "__key",
  "neo4j.cypher.bind-header-as": "__header"
}
```
MERGE pattern (idempotent upsert):
```cypher
MERGE (p:Person {id: __value.id})
ON CREATE SET p.createdAt = datetime(), p += __value.properties
ON MATCH  SET p.updatedAt = datetime(), p += __value.properties
```
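To make the auto-prepend concrete: a message value such as `{"id": "p1", "properties": {"name": "Ada"}}` on `person-creates` would be executed roughly as below (a sketch; the sample payload is invented, and events are batched in practice):

```cypher
// Effective statement after the connector prepends the UNWIND
UNWIND $events AS __value
MERGE (p:Person {id: __value.id}) SET p += __value.properties
```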

Strategy 2 — Pattern

No Cypher needed: map message fields to the graph via pattern syntax:
```json
{
  "neo4j.pattern.topic.users": "(:User{!userId, name, email})",
  "neo4j.pattern.topic.friendships":
    "(:User{!userId: from.userId})-[:KNOWS{since}]->(:User{!userId: to.userId})"
}
```
Pattern rules:
  • `!prop` = key property (used for MERGE)
  • `prop: field.path` = map from a nested message field
  • `*` = map all message fields
  • `-prop` = exclude a property (cannot mix with inclusions)

Strategy 3 — CDC (mirror another Neo4j)

```json
{
  "neo4j.cdc.schema.topics": "neo4j-cdc-events"
}
```
Or with source-id tracking (stores the elementId as a property):
```json
{
  "neo4j.cdc.source-id.topics": "neo4j-cdc-events",
  "neo4j.cdc.source-id.label-name": "SourceEvent",
  "neo4j.cdc.source-id.property-name": "sourceId"
}
```

Exactly-Once Semantics (EOS)

Requires: connector ≥ 5.3.0, a Kafka broker with EOS support, and a NODE KEY constraint.
Step 1: create the constraint:
```cypher
CREATE CONSTRAINT kafka_offset_key IF NOT EXISTS
FOR (n:__KafkaOffset)
REQUIRE (n.strategy, n.topic, n.partition) IS NODE KEY;
```
Step 2: add to the connector config:
```json
{
  "neo4j.eos-offset-label": "__KafkaOffset"
}
```
Without EOS the connector provides at-least-once delivery, so write idempotent Cypher (MERGE, not CREATE).

Error Handling / DLQ

```json
{
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "errors.deadletterqueue.topic.name": "neo4j-dlq",
  "errors.deadletterqueue.context.headers.enable": "true",
  "errors.deadletterqueue.topic.replication.factor": "3"
}
```
`errors.tolerance=none` (the default) stops on the first error. Use `all` plus a DLQ for production.


Source Connector

CDC-Based Source (recommended, Neo4j 5.13+)

```json
{
  "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
  "neo4j.uri": "neo4j+s://...",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "secret",
  "neo4j.source-strategy": "CDC",
  "neo4j.start-from": "NOW",
  "neo4j.cdc.poll-interval": "1s",
  "neo4j.cdc.poll-duration": "5s",
  "neo4j.cdc.topic.person-creates.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.person-creates.patterns.0.operation": "CREATE",
  "neo4j.cdc.topic.person-updates.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.person-updates.patterns.0.operation": "UPDATE",
  "neo4j.cdc.topic.person-deletes.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.person-deletes.patterns.0.operation": "DELETE"
}
```
`neo4j.start-from` options: `NOW` | `EARLIEST` | a specific cursor string.
Multiple patterns per topic, indexed 0, 1, 2, ...:
```json
{
  "neo4j.cdc.topic.all-changes.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.all-changes.patterns.1.pattern": "(:Organization)"
}
```
Cursor warning: after a DB restore from backup, CDC cursors are invalidated. Reconfigure `neo4j.start-from`.

Query-Based Source (legacy / any edition)

```json
{
  "neo4j.source-strategy": "QUERY",
  "neo4j.query": "MATCH (p:Person) WHERE p.updatedAt > $lastCheck RETURN p.id AS id, p.name AS name, p.updatedAt AS updatedAt",
  "neo4j.query.streaming-property": "updatedAt",
  "neo4j.query.topic": "person-changes",
  "neo4j.query.polling-interval": "5s",
  "neo4j.query.polling-duration": "10s"
}
```
`$lastCheck` is auto-injected by the connector. `neo4j.query.streaming-property` must be returned by the query and should be indexed.
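The polling behaviour can be approximated in plain Python: keep the highest streaming-property value seen and use it as `$lastCheck` on the next poll (a sketch with invented data, not connector internals):

```python
def poll(rows: list, last_check: float):
    """Simulate one query-source poll: return rows newer than last_check
    and the advanced watermark (max updatedAt among the new rows)."""
    new = [r for r in rows if r["updatedAt"] > last_check]
    next_check = max((r["updatedAt"] for r in new), default=last_check)
    return new, next_check

table = [
    {"id": "a", "updatedAt": 10.0},
    {"id": "b", "updatedAt": 20.0},
]
batch, watermark = poll(table, last_check=0.0)        # both rows; watermark advances to 20.0
batch2, watermark = poll(table, last_check=watermark)  # nothing new on the second poll
```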


Native CDC API (no Kafka required)

Requires: Neo4j 5.13+ Enterprise, AuraDB BC, or AuraDB VDC.
Enable CDC first (self-managed; set in neo4j.conf):
```
db.cdc.enabled=true
```
On Aura: enabled by default on eligible tiers.

Cursor Bootstrap

```cypher
// Get cursor for "right now" — start tracking from this point forward
CALL db.cdc.current() YIELD id RETURN id AS cursor;

// Get earliest available cursor (replay from history start)
CALL db.cdc.earliest() YIELD id RETURN id AS cursor;
```
Cursors are exclusive: `db.cdc.current()` does NOT include the transaction it points to.

Query Changes

```cypher
// All changes since cursor
CALL db.cdc.query($cursor, []) YIELD id, txId, seq, metadata, event
RETURN id, txId, seq, metadata, event
ORDER BY txId, seq;
```
Filtered: nodes with label Person, CREATE only:
```cypher
CALL db.cdc.query($cursor, [
  {select: 'n', labels: ['Person'], operation: 'c'}
]) YIELD id, txId, seq, event
RETURN id, event.state.after.properties AS newProps
ORDER BY txId, seq;
```
Filtered: a specific relationship type with property-change tracking:
```cypher
CALL db.cdc.query($cursor, [
  {select: 'r', type: 'KNOWS', changesTo: ['since', 'strength']}
]) YIELD id, txId, seq, event
RETURN id, event.state.before AS before, event.state.after AS after;
```

Selector Reference

| Field | Values | Applies to |
| --- | --- | --- |
| `select` | `'e'` (all), `'n'` (nodes), `'r'` (rels) | both |
| `operation` | `'c'` (create), `'u'` (update), `'d'` (delete) | both |
| `labels` | `['Label1','Label2']` (node must have ALL) | nodes |
| `type` | `'REL_TYPE'` | relationships |
| `elementId` | specific element ID string | both |
| `key` | `{propName: value}` (requires key constraint) | both |
| `changesTo` | `['prop1','prop2']` (AND: all must change) | both |
| `authenticatedUser` | username string | both |
| `executingUser` | username string | both |
| `txMetadata` | `{key: value}` | both |

Event Structure

```
{
  id:       STRING,           // cursor for this event (use as next $cursor)
  txId:     INTEGER,          // transaction ID
  seq:      INTEGER,          // ordering within transaction
  metadata: {
    executingUser:     STRING,
    authenticatedUser: STRING,
    captureMode:       STRING,  // "DIFF" or "FULL"
    txStartTime:       DATETIME,
    txCommitTime:      DATETIME,
    txMetadata:        MAP
  },
  event: {
    elementId:  STRING,
    eventType:  STRING,         // "n" or "r"
    operation:  STRING,         // "c", "u", "d"
    labels:     [STRING],       // nodes only
    type:       STRING,         // relationships only
    keys:       MAP,
    state: {
      before: { properties: MAP },  // null on CREATE
      after:  { properties: MAP }   // null on DELETE
    }
  }
}
```
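Given this shape, finding which properties actually changed on an update is a dict diff over `state.before`/`state.after`; a sketch (the sample event is fabricated):

```python
def changed_props(event: dict) -> dict:
    """Map of property -> (before, after) for values that differ on an update.
    before/after may be None on CREATE/DELETE, so fall back to empty maps."""
    before = (event.get("state", {}).get("before") or {}).get("properties", {})
    after = (event.get("state", {}).get("after") or {}).get("properties", {})
    keys = set(before) | set(after)
    return {k: (before.get(k), after.get(k)) for k in keys if before.get(k) != after.get(k)}

evt = {
    "operation": "u",
    "state": {
        "before": {"properties": {"name": "Ada", "age": 36}},
        "after":  {"properties": {"name": "Ada", "age": 37}},
    },
}
print(changed_props(evt))  # {'age': (36, 37)}
```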

Cursor-Loop Pattern (Python)

```python
from neo4j import GraphDatabase

driver = GraphDatabase.driver("neo4j+s://...", auth=("neo4j", "password"))

def poll_changes(cursor: str, selectors: list) -> tuple[list, str]:
    records, _, _ = driver.execute_query(
        "CALL db.cdc.query($cursor, $selectors) YIELD id, txId, seq, event "
        "RETURN id, txId, seq, event ORDER BY txId, seq",
        cursor=cursor, selectors=selectors,
        database_="neo4j"
    )
    events = [r.data() for r in records]
    # Advance cursor to last event id; keep current if no events
    next_cursor = events[-1]["id"] if events else cursor
    return events, next_cursor
```

Bootstrap

```python
import time

with driver.session(database="neo4j") as s:
    cursor = s.run("CALL db.cdc.current() YIELD id RETURN id").single()["id"]

selectors = [{"select": "n", "labels": ["Person"]}]

while True:
    events, cursor = poll_changes(cursor, selectors)
    for e in events:
        print(e["event"]["operation"], e["event"]["elementId"])
    time.sleep(1)
```
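Advancing the cursor only after successful processing usually implies persisting it durably between polls, so a restart resumes where it left off; a minimal file-based sketch (file names and helpers are illustrative):

```python
import json
import os
import tempfile

def save_cursor(path: str, cursor: str) -> None:
    """Write the cursor atomically so a crash never leaves a torn file."""
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    with os.fdopen(fd, "w") as f:
        json.dump({"cursor": cursor}, f)
    os.replace(tmp, path)  # atomic rename over the old file

def load_cursor(path: str, default: str) -> str:
    """Return the persisted cursor, or the default if none was saved yet."""
    try:
        with open(path) as f:
            return json.load(f)["cursor"]
    except FileNotFoundError:
        return default

save_cursor("cdc-cursor.json", "abc123")
print(load_cursor("cdc-cursor.json", default="start"))  # abc123
```

In the loop above, call `save_cursor` after each batch is fully processed, and seed the loop with `load_cursor` instead of `db.cdc.current()` on restart.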

---

Confluent Cloud Managed Connector

Confluent Cloud hosts the Neo4j Sink connector as a fully managed service (no JAR upload needed).
Config differences vs self-managed:
  • No `connector.class` field (selected in the UI/API)
  • Credentials via the Confluent Cloud secret manager or direct JSON
  • Private endpoints supported (AWS PrivateLink, Azure Private Link, GCP PSC)
  • Managed upgrades; pin the connector version explicitly if needed
Required Confluent Cloud fields:
```json
{
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "...",
  "kafka.api.secret": "...",
  "input.data.format": "JSON",
  "neo4j.uri": "neo4j+s://...",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "..."
}
```
One strategy per topic: you cannot mix Cypher and Pattern on the same topic.


Schema Registry (Avro / JSON Schema)

The source connector always generates messages with schema support, so converters must be configured:
```json
{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://your-schema-registry",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://your-schema-registry"
}
```
For JSON Schema:
```json
{
  "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
  "value.converter.schema.registry.url": "https://..."
}
```
The sink converter must match the source: an Avro sink cannot consume JSON Schema source messages.


Common Errors

| Error | Cause | Fix |
| --- | --- | --- |
| `CDC is not enabled` | `db.cdc.enabled` not set / wrong tier | Enable in neo4j.conf or upgrade to EE/BC/VDC |
| `Invalid cursor` after DB restore | Restoring from backup invalidates cursors | Reset `neo4j.start-from` to `NOW` or `EARLIEST` |
| `Cannot merge node using null` | Key property missing in message | Validate message schema; add null check in Cypher |
| Messages replayed after restart | No EOS configured | Add `neo4j.eos-offset-label` + NODE KEY constraint |
| Connector stops on bad message | `errors.tolerance=none` (default) | Set `errors.tolerance=all` + DLQ topic |
| `SchemaException` on sink | Converter mismatch between source and sink | Match key/value converters on both ends |
| Empty events from `db.cdc.query` | Cursor points to the current position | Use `db.cdc.earliest()` to replay; wait for new txns |


References

  • Full connector config reference — all neo4j.* properties, defaults, types
  • CDC API patterns — cursor loop, selector examples, event structure detail
  • Neo4j Connector for Kafka docs
  • CDC docs


Checklist

  • CDC availability confirmed (Neo4j 5.13+ EE / Aura BC / VDC) if using CDC source or sink
  • Uniqueness/NODE KEY constraints created before sink import (MERGE uses them)
  • EOS constraint created if using `neo4j.eos-offset-label`
  • Credentials via secrets provider, not hardcoded in config
  • Cypher sink queries use MERGE (not CREATE) for idempotency
  • `errors.tolerance=all` + DLQ configured for production sink
  • Source: `neo4j.query.streaming-property` indexed
  • Schema registry converters match on both source and sink sides
  • After DB restore: CDC cursor reconfigured (`neo4j.start-from`)
  • CDC cursor-loop: advance cursor only after successful processing