neo4j-spark-skill
Neo4j Connector for Apache Spark
When to Use
- Reading Neo4j nodes/relationships into Spark DataFrames
- Writing Spark DataFrames to Neo4j as nodes or relationships
- Databricks notebooks connecting to Neo4j
- Delta Lake → Neo4j ingestion pipelines
- Partitioned parallel reads from large Neo4j graphs
When NOT to Use
- Python bolt driver / execute_query → neo4j-driver-python-skill
- Cypher query writing → neo4j-cypher-skill
- GDS graph algorithms → neo4j-gds-skill
- Spring Boot + Neo4j → neo4j-spring-data-skill
Version Matrix
| Connector | Spark | Scala | Databricks Runtime | Neo4j |
|---|---|---|---|---|
| 5.4.x | 3.3, 3.4, 3.5 | 2.12, 2.13 | 12.2, 13.3, 14.3 LTS | 4.4, 5.x, 2025.x |
Maven artifact (Scala 2.12, Spark 3):
`org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3`

Scala 2.13 variant:
`org.neo4j:neo4j-connector-apache-spark_2.13:5.4.2_for_spark_3`
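If you are unsure which suffixes apply, both versions can be checked from a running session; a quick sketch (the Scala check goes through Spark's JVM gateway, which is internal but commonly used for this):

```python
# Spark version decides the _for_spark_N suffix; Scala version decides _2.12 vs _2.13.
print(spark.version)  # e.g. "3.5.1"
print(spark.sparkContext._jvm.scala.util.Properties.versionString())  # e.g. "version 2.12.18"
```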
Setup
Standalone Spark (PySpark)
```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("neo4j-app")
    .config("spark.jars.packages",
            "org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
    .config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
    .config("neo4j.authentication.type", "basic")
    .config("neo4j.authentication.basic.username", "neo4j")
    .config("neo4j.authentication.basic.password", "password")
    .getOrCreate())
```
Standalone Spark (Scala)
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("neo4j-app")
  .config("spark.jars.packages",
          "org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
  .config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
  .config("neo4j.authentication.type", "basic")
  .config("neo4j.authentication.basic.username", "neo4j")
  .config("neo4j.authentication.basic.password", "password")
  .getOrCreate()
```
Databricks — Cluster Installation
- Cluster → Libraries → Install New → Maven
- Search: `org.neo4j:neo4j-connector-apache-spark_2.12` — match Scala version to runtime
- Cluster → Advanced Options → Spark tab — add config:
  - `neo4j.url neo4j+s://xxxx.databases.neo4j.io`
  - `neo4j.authentication.type basic`
  - `neo4j.authentication.basic.username {{secrets/neo4j/username}}`
  - `neo4j.authentication.basic.password {{secrets/neo4j/password}}`
- Use Single user access mode (Unity Catalog shared mode not supported)
Databricks — Secrets (preferred over plaintext)
Store credentials once:

```
databricks secrets create-scope --scope neo4j
databricks secrets put --scope neo4j --key url
databricks secrets put --scope neo4j --key username
databricks secrets put --scope neo4j --key password
```

```python
neo4j_url = dbutils.secrets.get(scope="neo4j", key="url")
neo4j_user = dbutils.secrets.get(scope="neo4j", key="username")
neo4j_pass = dbutils.secrets.get(scope="neo4j", key="password")

spark.conf.set("neo4j.url", neo4j_url)
spark.conf.set("neo4j.authentication.type", "basic")
spark.conf.set("neo4j.authentication.basic.username", neo4j_user)
spark.conf.set("neo4j.authentication.basic.password", neo4j_pass)
```
---

Key Configuration Options
| Option | Description | Default |
|---|---|---|
| `neo4j.url` | Bolt/Neo4j URI | — (required) |
| `neo4j.authentication.type` | Authentication scheme | `basic` |
| `neo4j.authentication.basic.username` | Username | driver default |
| `neo4j.authentication.basic.password` | Password | driver default |
| `neo4j.authentication.bearer.token` | Bearer token | — |
| `neo4j.database` | Target database | driver default |
| `neo4j.encryption.enabled` | TLS (ignored with `+s` URI schemes) | `false` |
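The same settings can also be supplied per read/write call instead of on the SparkSession, which helps when one job talks to more than one Neo4j instance. A sketch under the assumption that per-call options drop the `neo4j.` prefix; the second URL and the database name are placeholders, and the credential variables come from the Databricks secrets block above:

```python
# Per-call connection options override the session-level neo4j.* configuration.
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("url", "neo4j+s://analytics.databases.neo4j.io")  # hypothetical second instance
    .option("authentication.type", "basic")
    .option("authentication.basic.username", neo4j_user)
    .option("authentication.basic.password", neo4j_pass)
    .option("database", "movies")                              # assumed database name
    .option("labels", ":Person")
    .load())
```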
Reading from Neo4j
Three mutually exclusive read modes — use exactly one per `.read()` call.

Label scan (nodes)
```python
# PySpark
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":Person")
    .load())
df.printSchema()
df.show()
```

```scala
// Scala
val df = spark.read
  .format("org.neo4j.spark.DataSource")
  .option("labels", ":Person")
  .load()
```

Multi-label filter (AND): `.option("labels", ":Person:Employee")`

Result includes `<id>` (internal Neo4j id) and `<labels>` columns.
Cypher query read
```python
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("query", "MATCH (p:Person)-[:ACTED_IN]->(m:Movie) RETURN p.name AS actor, m.title AS movie, m.year AS year")
    .load())
```

Use explicit RETURN aliases — they become DataFrame column names. No `SKIP`/`LIMIT` in the query (the connector handles pagination).
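From here the aliased columns behave like any other DataFrame columns; a small sketch counting movies per actor on the query result above:

```python
from pyspark.sql import functions as F

# "actor" and "movie" exist because of the RETURN aliases in the Cypher above.
per_actor = df.groupBy("actor").agg(F.count("movie").alias("n_movies"))
per_actor.orderBy(F.desc("n_movies")).show()
```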
Relationship scan
```python
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("relationship", "BOUGHT")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.target.labels", ":Product")
    .load())
```

Result columns: `<rel.id>`, `<rel.type>`, `<source.*>`, `<target.*>`, plus relationship properties.
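To turn this into a flat edge list, the prefixed columns can be selected directly (backtick-quoted because of the dots). A minimal sketch — the `id` property on both node types is assumed from the write examples further below; use printSchema() to see the exact property columns your graph produces:

```python
# Inspect the generated columns first; node properties appear as "source.<prop>" / "target.<prop>".
df.printSchema()

# Flat edge list keyed by an assumed "id" property on Customer and Product nodes.
edges = df.selectExpr("`source.id` AS customer_id", "`target.id` AS product_id")
edges.show()
```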
Read partition tuning
```python
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":Transaction")
    .option("partitions", "10")             # parallel partitions (default: 1)
    .option("batch.size", "5000")           # rows per partition batch (default: 5000)
    .option("schema.flatten.limit", "100")  # rows sampled for schema inference
    .load())
```

Full read options reference: references/read-patterns.md
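A quick way to confirm the read actually fanned out as requested; a small sketch:

```python
# With the options above this should report 10 (one Spark partition per batch range).
print(df.rdd.getNumPartitions())
```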
Writing to Neo4j
SaveMode
| SaveMode | Cypher | Requires |
|---|---|---|
| Append | CREATE | nothing extra |
| Overwrite | MERGE | `node.keys` |

Always create uniqueness constraints on `node.keys` properties before writing in `Overwrite` mode.
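One way to put that constraint in place before the first Overwrite write, sketched with the neo4j Python driver 5.x (a separate dependency from this connector); label, property, and the connection variables from the secrets section above are assumptions:

```python
from neo4j import GraphDatabase

# Create the uniqueness constraint that backs MERGE on :Person(name).
# Run once per key property before the first Overwrite write.
with GraphDatabase.driver(neo4j_url, auth=(neo4j_user, neo4j_pass)) as driver:
    driver.execute_query(
        "CREATE CONSTRAINT person_name_unique IF NOT EXISTS "
        "FOR (p:Person) REQUIRE p.name IS UNIQUE"
    )
```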
Write nodes — Append (CREATE)
```python
from pyspark.sql import Row

people = spark.createDataFrame([
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25},
])

(people.write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("labels", ":Person")
    .save())
```
Write nodes — Overwrite (MERGE)
```python
(people.write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Person")
    .option("node.keys", "name")  # comma-separated; df_col:node_prop if names differ
    .save())
```

`node.keys` rename syntax: `.option("node.keys", "df_col:node_property,id:personId")`
Write nodes — Scala
```scala
import org.apache.spark.sql.SaveMode

peopleDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("labels", ":Person")
  .option("node.keys", "name")
  .save()
```
Write relationships
Use `coalesce(1)` before relationship writes to avoid deadlocks.

```python
rel_df = spark.createDataFrame([
{"cust_id": "C1", "prod_id": "P1", "qty": 3},
{"cust_id": "C2", "prod_id": "P2", "qty": 1},
])
(rel_df.coalesce(1)
.write.format("org.neo4j.spark.DataSource")
.mode("Append")
.option("relationship", "BOUGHT")
.option("relationship.save.strategy", "keys")
.option("relationship.source.labels", ":Customer")
.option("relationship.source.save.mode", "Match") # require existing nodes
.option("relationship.source.node.keys", "cust_id:id")
.option("relationship.target.labels", ":Product")
.option("relationship.target.save.mode", "Match")
.option("relationship.target.node.keys", "prod_id:id")
.option("relationship.properties", "qty:quantity")
    .save())
```

`relationship.source.save.mode` / `relationship.target.save.mode`:
- Match — find existing nodes (fail if missing)
- Append — always CREATE new nodes
- Overwrite — MERGE nodes
Full write options reference: references/write-patterns.md
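If the endpoint nodes may not exist yet, the same write can create them on the fly by switching both save modes to Overwrite; a sketch reusing the columns from the example above rather than a prescribed pattern:

```python
# Variant: MERGE the endpoint nodes instead of requiring them to pre-exist.
# Keep coalesce(1) to avoid lock contention on the node MERGEs.
(rel_df.coalesce(1)
    .write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("relationship", "BOUGHT")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.source.save.mode", "Overwrite")  # MERGE (:Customer {id: ...})
    .option("relationship.source.node.keys", "cust_id:id")
    .option("relationship.target.labels", ":Product")
    .option("relationship.target.save.mode", "Overwrite")  # MERGE (:Product {id: ...})
    .option("relationship.target.node.keys", "prod_id:id")
    .option("relationship.properties", "qty:quantity")
    .save())
```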
Databricks — Delta Lake → Neo4j Pipeline
```python
# Read from Delta table (Unity Catalog or DBFS)
delta_df = spark.read.format("delta").table("catalog.schema.customers")

# Optional: filter/transform in Spark before writing
filtered = delta_df.filter("active = true").select("customer_id", "name", "region")

# Write to Neo4j
(filtered.write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Customer")
    .option("node.keys", "customer_id")
    .option("batch.size", "20000")
    .save())
```
Pipeline pattern for relationships — load both node sets first, then write edges:

```python
# Step 1: ensure nodes exist
(customers_df.write.format("org.neo4j.spark.DataSource").mode("Overwrite")
    .option("labels", ":Customer").option("node.keys", "customer_id").save())

(products_df.write.format("org.neo4j.spark.DataSource").mode("Overwrite")
    .option("labels", ":Product").option("node.keys", "product_id").save())
# Step 2: write relationships (single partition)
(orders_df.coalesce(1).write.format("org.neo4j.spark.DataSource").mode("Append")
    .option("relationship", "ORDERED")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.node.keys", "customer_id:customer_id")
    .option("relationship.target.labels", ":Product")
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.node.keys", "product_id:product_id")
    .save())
```
---

Write Performance Tuning
| Scenario | Recommendation |
|---|---|
| Node writes (no lock contention) | Multiple partitions are fine (e.g. `repartition(8)`) |
| Relationship writes (lock risk) | `coalesce(1)` |
| Large datasets | Raise `batch.size` (e.g. 20000) |
| MERGE-heavy loads | Add uniqueness constraint on `node.keys` properties first |
```python
# Aggressive batch — monitor Neo4j heap; OOM risk above 50k
(big_df.repartition(8)
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Event")
    .option("node.keys", "event_id")
    .option("batch.size", "20000")
    .save())
```
---

Common Errors
| Error | Cause | Fix |
|---|---|---|
| Failed to find data source: org.neo4j.spark.DataSource | JAR not on classpath | Add the connector via `spark.jars.packages` or a cluster library |
| Deadlock on relationship write | Multiple partitions locking nodes | `coalesce(1)` before the write |
| Duplicate nodes on Overwrite | No uniqueness constraint on keys | Create a uniqueness constraint on the `node.keys` properties |
| OOM on Neo4j side | `batch.size` too large | Reduce `batch.size` to 5000–10000; check heap |
| All columns inferred as string | No APOC, schema not sampled | Raise `schema.flatten.limit` |
| | Session opened in read mode | Remove |
| Databricks Shared cluster fails | Unity Catalog shared mode unsupported | Switch to Single User access mode |
Checklist
- Connector JAR version matches Spark version suffix (`_for_spark_3`)
- Scala version in artifact matches cluster runtime (2.12 vs 2.13)
- Credentials in Databricks secrets or env vars — not hardcoded
- `node.keys` set when using `Overwrite` mode
- Uniqueness constraint created on `node.keys` properties before MERGE writes
- `coalesce(1)` applied before relationship writes
- `batch.size` sized to Neo4j heap (start 5000, tune up)
- Delta Lake → Neo4j: nodes written before relationships
- `query` mode: no `SKIP`/`LIMIT` in Cypher (connector paginates internally)
- Databricks: Single User access mode (not Shared)