Loading...
Loading...
Configure and operate the Neo4j Connector for Kafka (sink + source) and the native Neo4j CDC API. Covers Cypher/Pattern/CUD sink strategies, CDC-based and query-based source, exactly-once semantics, DLQ error handling, Confluent Cloud managed connector, schema registry (Avro/JSON), and native db.cdc.query cursor-loop patterns (Neo4j 5.13+ Enterprise/Aura BC/VDC). Use when streaming Kafka events into Neo4j, streaming Neo4j changes to Kafka, or querying Neo4j change events without Kafka. Does NOT handle Cypher query authoring — use neo4j-cypher-skill. Does NOT handle bulk CSV/file import — use neo4j-import-skill. Does NOT handle GDS algorithms — use neo4j-gds-skill.
npx skill4agent add neo4j-contrib/neo4j-skills neo4j-kafka-skill

See also: `db.cdc.query` (covered below), neo4j-cypher-skill, neo4j-import-skill, neo4j-gds-skill.

| Use case | Strategy |
|---|---|
| Custom transformation of Kafka payload → graph | Sink: Cypher |
| Mirror another Neo4j CDC source | Sink: CDC (schema or source-id sub-strategy) |
| Map Kafka JSON fields to graph nodes/rels with no code | Sink: Pattern |
| Consume pre-formatted CUD JSON messages | Sink: CUD |
| Stream all Neo4j changes to Kafka (real-time) | Source: CDC (Neo4j 5.13+ EE/Aura BC/VDC) |
| Stream specific query results on a schedule | Source: Query |
| Consume CDC events in-process, no Kafka | Native CDC API (`db.cdc.query`) |
{
"neo4j.uri": "neo4j+s://your-instance.databases.neo4j.io:7687",
"neo4j.authentication.type": "BASIC",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "${file:/opt/secrets.properties:neo4j.password}",
"neo4j.database": "neo4j"
}

Authentication types: `BASIC`, `BEARER`, `KERBEROS`, `CUSTOM`, `NONE`. Secrets can be externalized with the `${file:...}` or `${env:...}` config providers. The Cypher sink strategy wraps each statement as `UNWIND $events AS __value`, so every incoming message is available as `__value`.

{
"connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
"topics": "person-creates,person-updates",
"neo4j.uri": "neo4j+s://...",
"neo4j.authentication.type": "BASIC",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "secret",
"neo4j.cypher.topic.person-creates":
"MERGE (p:Person {id: __value.id}) SET p += __value.properties",
"neo4j.cypher.topic.person-updates":
"MATCH (p:Person {id: __value.id}) SET p += __value.properties",
"neo4j.cypher.bind-value-as": "__value",
"neo4j.cypher.bind-key-as": "__key",
"neo4j.cypher.bind-header-as": "__header"
}

MERGE (p:Person {id: __value.id})
ON CREATE SET p.createdAt = datetime(), p += __value.properties
ON MATCH SET p.updatedAt = datetime(), p += __value.properties

{
"neo4j.pattern.topic.users": "(:User{!userId, name, email})",
"neo4j.pattern.topic.friendships":
"(:User{!userId: from.userId})-[:KNOWS{since}]->(:User{!userId: to.userId})"
}

Pattern syntax: `!prop` marks a key property (required), `prop: field.path` maps from a nested source field, `*` includes all fields, `-prop` excludes a field.

{
"neo4j.cdc.schema.topics": "neo4j-cdc-events"
}

{
"neo4j.cdc.source-id.topics": "neo4j-cdc-events",
"neo4j.cdc.source-id.label-name": "SourceEvent",
"neo4j.cdc.source-id.property-name": "sourceId"
}

CREATE CONSTRAINT kafka_offset_key IF NOT EXISTS
FOR (n:__KafkaOffset)
REQUIRE (n.strategy, n.topic, n.partition) IS NODE KEY;

{
"neo4j.eos-offset-label": "__KafkaOffset"
}

{
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "neo4j-dlq",
"errors.deadletterqueue.context.headers.enable": "true",
"errors.deadletterqueue.topic.replication.factor": "3"
}

The default `errors.tolerance=none` stops the connector on the first bad message; set it to `all` to skip (and optionally route) failures instead.

{
"connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
"neo4j.uri": "neo4j+s://...",
"neo4j.authentication.type": "BASIC",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "secret",
"neo4j.source-strategy": "CDC",
"neo4j.start-from": "NOW",
"neo4j.cdc.poll-interval": "1s",
"neo4j.cdc.poll-duration": "5s",
"neo4j.cdc.topic.person-creates.patterns.0.pattern": "(:Person)",
"neo4j.cdc.topic.person-creates.patterns.0.operation": "CREATE",
"neo4j.cdc.topic.person-updates.patterns.0.pattern": "(:Person)",
"neo4j.cdc.topic.person-updates.patterns.0.operation": "UPDATE",
"neo4j.cdc.topic.person-deletes.patterns.0.pattern": "(:Person)",
"neo4j.cdc.topic.person-deletes.patterns.0.operation": "DELETE"
}

`neo4j.start-from` accepts `NOW` or `EARLIEST`.

{
"neo4j.cdc.topic.all-changes.patterns.0.pattern": "(:Person)",
"neo4j.cdc.topic.all-changes.patterns.1.pattern": "(:Organization)"
}

See `neo4j.start-from` above for where publishing begins.

{
"neo4j.source-strategy": "QUERY",
"neo4j.query": "MATCH (p:Person) WHERE p.updatedAt > $lastCheck RETURN p.id AS id, p.name AS name, p.updatedAt AS updatedAt",
"neo4j.query.streaming-property": "updatedAt",
"neo4j.query.topic": "person-changes",
"neo4j.query.polling-interval": "5s",
"neo4j.query.polling-duration": "10s"
}

`$lastCheck` is bound automatically from the property named by `neo4j.query.streaming-property`. The native CDC API requires `db.cdc.enabled=true` in neo4j.conf.

// Get cursor for "right now" — start tracking from this point forward
CALL db.cdc.current() YIELD id RETURN id AS cursor;
// Get earliest available cursor (replay from history start)
CALL db.cdc.earliest() YIELD id RETURN id AS cursor;

Use `db.cdc.current()` when only changes from now on matter.

// All changes since cursor
CALL db.cdc.query($cursor, []) YIELD id, txId, seq, metadata, event
RETURN id, txId, seq, metadata, event
ORDER BY txId, seq;

CALL db.cdc.query($cursor, [
{select: 'n', labels: ['Person'], operation: 'c'}
]) YIELD id, txId, seq, event
RETURN id, event.state.after.properties AS newProps
ORDER BY txId, seq;

CALL db.cdc.query($cursor, [
{select: 'r', type: 'KNOWS', changesTo: ['since', 'strength']}
]) YIELD id, txId, seq, event
RETURN id, event.state.before AS before, event.state.after AS after;

| Field | Values | Applies to |
|---|---|---|
| `select` | `n` (node), `r` (relationship) | both |
| `operation` | `c` (create), `u` (update), `d` (delete) | both |
| `labels` | list of label strings | nodes |
| `type` | relationship type string | relationships |
| `elementId` | specific element ID string | both |
| `key` | key property map | both |
| `changesTo` | list of property names | both |
| `executingUser` | username string | both |
| `authenticatedUser` | username string | both |
| `txMetadata` | transaction metadata map | both |
{
id: STRING, // cursor for this event (use as next $cursor)
txId: INTEGER, // transaction ID
seq: INTEGER, // ordering within transaction
metadata: {
executingUser: STRING,
authenticatedUser: STRING,
captureMode: STRING, // "DIFF" or "FULL"
txStartTime: DATETIME,
txCommitTime: DATETIME,
txMetadata: MAP
},
event: {
elementId: STRING,
eventType: STRING, // "n" or "r"
operation: STRING, // "c", "u", "d"
labels: [STRING], // nodes only
type: STRING, // relationships only
keys: MAP,
state: {
before: { properties: MAP }, // null on CREATE
after: { properties: MAP } // null on DELETE
}
}
}

from neo4j import GraphDatabase
driver = GraphDatabase.driver("neo4j+s://...", auth=("neo4j", "password"))
def poll_changes(cursor: str, selectors: list) -> tuple[list, str]:
    """Fetch CDC events after *cursor* that match *selectors*.

    Returns the events (as plain dicts) together with the cursor to use
    on the next poll: the id of the last event seen, or the unchanged
    input cursor when nothing new arrived.
    """
    cdc_query = (
        "CALL db.cdc.query($cursor, $selectors) YIELD id, txId, seq, event "
        "RETURN id, txId, seq, event ORDER BY txId, seq"
    )
    records, _, _ = driver.execute_query(
        cdc_query,
        cursor=cursor,
        selectors=selectors,
        database_="neo4j",
    )
    events = [record.data() for record in records]
    if not events:
        return events, cursor
    return events, events[-1]["id"]
# Bootstrap
# Start the stream at "now"; earlier history is intentionally skipped
# (db.cdc.earliest() would replay from the start of retained history instead).
with driver.session(database="neo4j") as s:
    cursor = s.run("CALL db.cdc.current() YIELD id RETURN id").single()["id"]
# Selector: node events only, restricted to the Person label.
selectors = [{"select": "n", "labels": ["Person"]}]
import time
# Poll loop: each call returns a batch plus the cursor to resume from.
while True:
    events, cursor = poll_changes(cursor, selectors)
    for e in events:
        print(e["event"]["operation"], e["event"]["elementId"])
    time.sleep(1)

For the Confluent Cloud fully-managed connector, `connector.class` is preset by Confluent; configure:

{
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "...",
"kafka.api.secret": "...",
"input.data.format": "JSON",
"neo4j.uri": "neo4j+s://...",
"neo4j.authentication.type": "BASIC",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "..."
}

Avro converters (Schema Registry):

{
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "https://your-schema-registry",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://your-schema-registry"
}

JSON Schema converter:

{
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "https://..."
}

| Error | Cause | Fix |
|---|---|---|
| `db.cdc.query` unavailable | CDC not enabled (`db.cdc.enabled=false`) or unsupported edition | Enable in neo4j.conf or upgrade to EE/BC/VDC |
| Invalid cursor after restore | Backup invalidates cursors | Reset with `db.cdc.current()` / `db.cdc.earliest()` |
| Null key in sink Cypher | Key property missing in message | Validate message schema; add null check in Cypher |
| Messages replayed after restart | No EOS configured | Add `neo4j.eos-offset-label` |
| Connector stops on bad message | Default `errors.tolerance=none` | Set `errors.tolerance=all` (with DLQ) |
| Deserialization failures | Converter mismatch source/sink | Match key/value converters on both ends |
| Empty events from source | Cursor points to current position | Use `neo4j.query.streaming-property` (query source) or `neo4j.start-from=EARLIEST` (CDC source) |