neo4j-spark-skill

Neo4j Connector for Apache Spark

When to Use

  • Reading Neo4j nodes/relationships into Spark DataFrames
  • Writing Spark DataFrames to Neo4j as nodes or relationships
  • Databricks notebooks connecting to Neo4j
  • Delta Lake → Neo4j ingestion pipelines
  • Partitioned parallel reads from large Neo4j graphs

When NOT to Use

  • Python bolt driver / execute_query
    neo4j-driver-python-skill
  • Cypher query writing
    neo4j-cypher-skill
  • GDS graph algorithms
    neo4j-gds-skill
  • Spring Boot + Neo4j
    neo4j-spring-data-skill

Version Matrix

| Connector | Spark | Scala | Databricks Runtime | Neo4j |
| --- | --- | --- | --- | --- |
| 5.4.x | 3.3, 3.4, 3.5 | 2.12, 2.13 | 12.2, 13.3, 14.3 LTS | 4.4, 5.x, 2025.x |

Maven artifact (Scala 2.12, Spark 3):
`org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3`
Scala 2.13 variant:
`org.neo4j:neo4j-connector-apache-spark_2.13:5.4.2_for_spark_3`

Setup

Standalone Spark (PySpark)

独立Spark(PySpark)

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("neo4j-app")
    .config("spark.jars.packages",
            "org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
    .config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
    .config("neo4j.authentication.type", "basic")
    .config("neo4j.authentication.basic.username", "neo4j")
    .config("neo4j.authentication.basic.password", "password")
    .getOrCreate())
```

Standalone Spark (Scala)

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("neo4j-app")
  .config("spark.jars.packages",
    "org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
  .config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
  .config("neo4j.authentication.type", "basic")
  .config("neo4j.authentication.basic.username", "neo4j")
  .config("neo4j.authentication.basic.password", "password")
  .getOrCreate()
```

Databricks — Cluster Installation

  1. Cluster → Libraries → Install New → Maven
  2. Search:
    `org.neo4j:neo4j-connector-apache-spark_2.12`
    — match Scala version to runtime
  3. Cluster → Advanced Options → Spark tab — add config:
    neo4j.url neo4j+s://xxxx.databases.neo4j.io
    neo4j.authentication.type basic
    neo4j.authentication.basic.username {{secrets/neo4j/username}}
    neo4j.authentication.basic.password {{secrets/neo4j/password}}
  4. Use Single user access mode (Unity Catalog shared mode not supported); verify the install with the sketch below
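
To confirm the library and cluster config are in place before building pipelines, a minimal notebook sketch (the `:Person` label is a placeholder; use any label that exists in the target graph):

```python
# Cluster-level Spark config should be visible to the notebook session.
for key in ("neo4j.url", "neo4j.authentication.type"):
    print(key, "=", spark.conf.get(key, "<not set>"))

# Smoke-test read: a ClassNotFoundException here means the connector JAR is not attached.
smoke_df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":Person")    # placeholder label
    .load())
print(smoke_df.count())
```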

Databricks — Secrets (preferred over plaintext)

Store credentials once:

```
databricks secrets create-scope --scope neo4j
databricks secrets put --scope neo4j --key url
databricks secrets put --scope neo4j --key username
databricks secrets put --scope neo4j --key password
```

Read them back in the notebook and apply them to the session config:

```python
neo4j_url = dbutils.secrets.get(scope="neo4j", key="url")
neo4j_user = dbutils.secrets.get(scope="neo4j", key="username")
neo4j_pass = dbutils.secrets.get(scope="neo4j", key="password")

spark.conf.set("neo4j.url", neo4j_url)
spark.conf.set("neo4j.authentication.type", "basic")
spark.conf.set("neo4j.authentication.basic.username", neo4j_user)
spark.conf.set("neo4j.authentication.basic.password", neo4j_pass)
```

---

Key Configuration Options

| Option | Description | Default |
| --- | --- | --- |
| `neo4j.url` | Bolt/Neo4j URI | — (required) |
| `neo4j.authentication.type` | `none`, `basic`, `kerberos`, `bearer` | `basic` |
| `neo4j.authentication.basic.username` | Username | driver default |
| `neo4j.authentication.basic.password` | Password | driver default |
| `neo4j.authentication.bearer.token` | Bearer token | |
| `neo4j.database` | Target database | driver default |
| `neo4j.access.mode` | `read` or `write` | `read` |
| `neo4j.encryption.enabled` | TLS (ignored with `+s`/`+ssc` URIs) | `false` |
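
The database and access-mode keys follow the same `spark.conf.set` pattern used in the secrets section above. A minimal sketch (the `analytics` database name is a placeholder):

```python
# Point all subsequent connector reads/writes at a specific database.
spark.conf.set("neo4j.database", "analytics")   # placeholder name

# Only needed if writes fail with an "Access mode is read" error (see Common Errors).
spark.conf.set("neo4j.access.mode", "write")
```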

Reading from Neo4j

Three mutually exclusive read modes — use exactly one per `.read()` call.

Label scan (nodes)

```python
# PySpark
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":Person")
    .load())
df.printSchema()
df.show()
```

```scala
// Scala
val df = spark.read
  .format("org.neo4j.spark.DataSource")
  .option("labels", ":Person")
  .load()
```

Multi-label filter (AND): `.option("labels", ":Person:Employee")`
Result includes `<id>` (internal Neo4j id) and `<labels>` columns.
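
Since `<id>` and `<labels>` are ordinary column names to Spark, they can be selected alongside node properties. A small sketch, assuming the `:Person` nodes above carry a `name` property:

```python
# The internal-id and labels columns behave like regular DataFrame columns.
df.select("<id>", "<labels>", "name").show(5)
```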

Cypher query read

```python
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("query", "MATCH (p:Person)-[:ACTED_IN]->(m:Movie) RETURN p.name AS actor, m.title AS movie, m.year AS year")
    .load())
```

Use explicit RETURN aliases — they become DataFrame column names. No `SKIP`/`LIMIT` in the query (the connector handles pagination).
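
When only a sample of rows is needed, limit on the Spark side after the load instead. A minimal sketch:

```python
# Row limiting happens in Spark, not in the Cypher query.
sample_df = df.limit(100)
sample_df.show()
```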

Relationship scan

```python
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("relationship", "BOUGHT")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.target.labels", ":Product")
    .load())
```

Result columns: `<rel.id>`, `<rel.type>`, `<source.*>`, `<target.*>`, plus relationship properties.

Read partition tuning

```python
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":Transaction")
    .option("partitions", "10")        # parallel partitions (default: 1)
    .option("batch.size", "5000")      # rows per partition batch (default: 5000)
    .option("schema.flatten.limit", "100")  # rows sampled for schema inference
    .load())
```

Full read options reference: references/read-patterns.md

Writing to Neo4j

SaveMode

| SaveMode | Cypher | Requires |
| --- | --- | --- |
| Append | `CREATE` | nothing extra |
| Overwrite | `MERGE` | `node.keys` (nodes) or `*.node.keys` (rels) |
| ErrorIfExists | `CREATE` + error if exists | — |

Always create uniqueness constraints on `node.keys` properties before writing in Overwrite mode.

Write nodes — Append (CREATE)

```python
from pyspark.sql import Row

people = spark.createDataFrame([
    {"name": "Alice", "age": 30},
    {"name": "Bob",   "age": 25},
])

(people.write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("labels", ":Person")
    .save())
```

Write nodes — Overwrite (MERGE)

```python
(people.write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Person")
    .option("node.keys", "name")       # comma-separated; df_col:node_prop if names differ
    .save())
```

`node.keys` with rename: `.option("node.keys", "df_col:node_property,id:personId")`

Write nodes — Scala

```scala
import org.apache.spark.sql.SaveMode

peopleDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("labels", ":Person")
  .option("node.keys", "name")
  .save()
```

Write relationships

Use `coalesce(1)` before relationship writes to avoid deadlocks.

```python
rel_df = spark.createDataFrame([
    {"cust_id": "C1", "prod_id": "P1", "qty": 3},
    {"cust_id": "C2", "prod_id": "P2", "qty": 1},
])

(rel_df.coalesce(1)
    .write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("relationship", "BOUGHT")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.source.save.mode", "Match")          # require existing nodes
    .option("relationship.source.node.keys", "cust_id:id")
    .option("relationship.target.labels", ":Product")
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.node.keys", "prod_id:id")
    .option("relationship.properties", "qty:quantity")
    .save())
```

`relationship.source.save.mode` / `relationship.target.save.mode`:
  • Match — find existing nodes (fail if missing)
  • Append — always CREATE new nodes
  • Overwrite — MERGE nodes (see the sketch below)
Full write options reference: references/write-patterns.md
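
If the endpoint nodes might not exist yet, the Match save mode will fail; per the list above, switching the source and target save modes to Overwrite MERGEs the endpoints on their keys instead. A sketch reusing `rel_df` from the example above, assuming uniqueness constraints already exist on the key properties:

```python
# Variant of the write above: MERGE endpoint nodes instead of requiring them to exist.
# Still a single partition to avoid relationship-write deadlocks.
(rel_df.coalesce(1)
    .write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("relationship", "BOUGHT")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.source.save.mode", "Overwrite")   # MERGE source nodes on cust_id
    .option("relationship.source.node.keys", "cust_id:id")
    .option("relationship.target.labels", ":Product")
    .option("relationship.target.save.mode", "Overwrite")   # MERGE target nodes on prod_id
    .option("relationship.target.node.keys", "prod_id:id")
    .option("relationship.properties", "qty:quantity")
    .save())
```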

Databricks — Delta Lake → Neo4j Pipeline

```python
# Read from Delta table (Unity Catalog or DBFS)
delta_df = spark.read.format("delta").table("catalog.schema.customers")

# Optional: filter/transform in Spark before writing
filtered = delta_df.filter("active = true").select("customer_id", "name", "region")

# Write to Neo4j
(filtered.write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Customer")
    .option("node.keys", "customer_id")
    .option("batch.size", "20000")
    .save())
```

Pipeline pattern for relationships — load both node sets first, then write edges:

```python
# Step 1: ensure nodes exist
(customers_df.write.format("org.neo4j.spark.DataSource").mode("Overwrite")
    .option("labels", ":Customer").option("node.keys", "customer_id").save())
(products_df.write.format("org.neo4j.spark.DataSource").mode("Overwrite")
    .option("labels", ":Product").option("node.keys", "product_id").save())

# Step 2: write relationships (single partition)
(orders_df.coalesce(1).write.format("org.neo4j.spark.DataSource").mode("Append")
    .option("relationship", "ORDERED")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.node.keys", "customer_id:customer_id")
    .option("relationship.target.labels", ":Product")
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.node.keys", "product_id:product_id")
    .save())
```

---

Write Performance Tuning

| Scenario | Recommendation |
| --- | --- |
| Node writes (no lock contention) | `repartition(N)` where N ≤ Neo4j CPU cores |
| Relationship writes (lock risk) | `coalesce(1)` — single partition |
| Large datasets | `batch.size` 10000–20000 (adjust to heap) |
| MERGE-heavy loads | Add uniqueness constraint on `node.keys` properties first |

```python
# Aggressive batch — monitor Neo4j heap; OOM risk above 50k
(big_df.repartition(8)
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Event")
    .option("node.keys", "event_id")
    .option("batch.size", "20000")
    .save())
```

---

Common Errors

| Error | Cause | Fix |
| --- | --- | --- |
| `ClassNotFoundException: org.neo4j.spark.DataSource` | JAR not on classpath | Add `spark.jars.packages` or attach library |
| Deadlock on relationship write | Multiple partitions locking nodes | `coalesce(1)` before write |
| Duplicate nodes on Overwrite | No uniqueness constraint on keys | `CREATE CONSTRAINT FOR (n:Label) REQUIRE n.prop IS UNIQUE` (Neo4j 5.x; use `CREATE CONSTRAINT ON (n:Label) ASSERT n.prop IS UNIQUE` on 4.4) |
| OOM on Neo4j side | `batch.size` too large | Reduce to 5000–10000; check heap |
| Schema all `string` columns | No APOC, schema not sampled | Set `schema.flatten.limit` higher; or use `query` mode with explicit types |
| `Access mode is read` error on write | Session opened in read mode | Remove `neo4j.access.mode` or set to `write` |
| Databricks Shared cluster fails | Unity Catalog shared mode unsupported | Switch to Single User access mode |
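
For the all-`string` schema case, one option besides raising `schema.flatten.limit` is to cast columns on the Spark side after the load. A minimal sketch with hypothetical column names:

```python
from pyspark.sql.functions import col

# Cast string-typed columns back to the intended types (hypothetical columns).
typed_df = (raw_df
    .withColumn("year", col("year").cast("int"))
    .withColumn("price", col("price").cast("double")))
```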

Checklist

  • Connector JAR version matches Spark version suffix (`_for_spark_3`)
  • Scala version in artifact matches cluster runtime (2.12 vs 2.13)
  • Credentials in Databricks secrets or env vars — not hardcoded
  • `node.keys` set when using `Overwrite` mode
  • Uniqueness constraint created on `node.keys` properties before MERGE writes
  • `coalesce(1)` applied before relationship writes
  • `batch.size` sized to Neo4j heap (start 5000, tune up)
  • Delta Lake → Neo4j: nodes written before relationships
  • `query` mode: no `SKIP`/`LIMIT` in Cypher (connector paginates internally)
  • Databricks: Single User access mode (not Shared)