Use when reading from or writing to Neo4j with Apache Spark or Databricks using the Neo4j Connector for Apache Spark (org.neo4j:neo4j-connector-apache-spark). Covers SparkSession setup, DataFrame reads via labels/Cypher/relationship scan, DataFrame writes with SaveMode, node.keys for MERGE, relationship write mapping, partition and batch tuning, PySpark and Scala examples, Databricks cluster config, Databricks secrets for credentials, Delta Lake to Neo4j pipelines. Does NOT handle Cypher authoring — use neo4j-cypher-skill. Does NOT handle the Python bolt driver — use neo4j-driver-python-skill. Does NOT handle GDS algorithms — use neo4j-gds-skill.
Install:

npx skill4agent add neo4j-contrib/neo4j-skills neo4j-spark-skill

Related skills: neo4j-driver-python-skill, neo4j-cypher-skill, neo4j-gds-skill, neo4j-spring-data-skill.

Version compatibility:

| Connector | Spark | Scala | Databricks Runtime | Neo4j |
|---|---|---|---|---|
| 5.4.x | 3.3, 3.4, 3.5 | 2.12, 2.13 | 12.2, 13.3, 14.3 LTS | 4.4, 5.x, 2025.x |
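A quick way to see which row of this table applies is to print the running Spark version from a notebook; the sketch below assumes an active spark session (as on Databricks) and the output comment is illustrative:

```python
# Confirm the Spark version before picking an artifact: Spark 3.x pairs with the
# *_for_spark_3 artifacts, and the _2.12 / _2.13 suffix must match the cluster's
# Scala build.
print(spark.version)  # e.g. "3.5.0" on Databricks Runtime 14.3 LTS
```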
Maven coordinates (match the artifact to your Scala version):

org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3
org.neo4j:neo4j-connector-apache-spark_2.13:5.4.2_for_spark_3

SparkSession setup (PySpark):

from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("neo4j-app")
.config("spark.jars.packages",
"org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
.config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
.config("neo4j.authentication.type", "basic")
.config("neo4j.authentication.basic.username", "neo4j")
.config("neo4j.authentication.basic.password", "password")
.getOrCreate())

SparkSession setup (Scala):

val spark = SparkSession.builder
.appName("neo4j-app")
.config("spark.jars.packages",
"org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
.config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
.config("neo4j.authentication.type", "basic")
.config("neo4j.authentication.basic.username", "neo4j")
.config("neo4j.authentication.basic.password", "password")
.getOrCreate()

Databricks cluster config: install the connector as a cluster library (Maven coordinate org.neo4j:neo4j-connector-apache-spark_2.12) and put the connection settings in the cluster's Spark config:

neo4j.url neo4j+s://xxxx.databases.neo4j.io
neo4j.authentication.type basic
neo4j.authentication.basic.username {{secrets/neo4j/username}}
neo4j.authentication.basic.password {{secrets/neo4j/password}}

Databricks secrets for credentials:

# Store credentials once:
# databricks secrets create-scope --scope neo4j
# databricks secrets put --scope neo4j --key url
# databricks secrets put --scope neo4j --key username
# databricks secrets put --scope neo4j --key password
neo4j_url = dbutils.secrets.get(scope="neo4j", key="url")
neo4j_user = dbutils.secrets.get(scope="neo4j", key="username")
neo4j_pass = dbutils.secrets.get(scope="neo4j", key="password")
spark.conf.set("neo4j.url", neo4j_url)
spark.conf.set("neo4j.authentication.type", "basic")
spark.conf.set("neo4j.authentication.basic.username", neo4j_user)
spark.conf.set("neo4j.authentication.basic.password", neo4j_pass)| Option | Description | Default |
|---|---|---|
| url | Bolt/Neo4j URI | — (required) |
| authentication.type | Authentication type | basic |
| authentication.basic.username | Username | driver default |
| authentication.basic.password | Password | driver default |
| authentication.bearer.token | Bearer token | — |
| database | Target database | driver default |
| encryption.enabled | TLS (ignored with neo4j+s:// and neo4j+ssc:// URIs) | false |
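These options can also be set per read or write with .option(...), overriding the session-level neo4j.* config for just that DataFrame. A minimal sketch, assuming the session above and a hypothetical second database named analytics:

```python
# Per-DataFrame override: target a different database without changing the
# session-wide neo4j.* settings ("analytics" is a placeholder name).
df = (spark.read.format("org.neo4j.spark.DataSource")
      .option("database", "analytics")
      .option("labels", ":Person")
      .load())
```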
Read nodes by label with spark.read and the org.neo4j.spark.DataSource format:

# PySpark
df = (spark.read.format("org.neo4j.spark.DataSource")
.option("labels", ":Person")
.load())
df.printSchema()
df.show()

// Scala
val df = spark.read
.format("org.neo4j.spark.DataSource")
.option("labels", ":Person")
.load()

Multiple labels can be combined with .option("labels", ":Person:Employee"). Label reads return <id> and <labels> columns alongside the node properties.

Read with a Cypher query:

df = (spark.read.format("org.neo4j.spark.DataSource")
.option("query", "MATCH (p:Person)-[:ACTED_IN]->(m:Movie) RETURN p.name AS actor, m.title AS movie, m.year AS year")
.load())

Do not add SKIP or LIMIT to the query; the connector appends them itself when partitioning the read.

Read relationships:

df = (spark.read.format("org.neo4j.spark.DataSource")
.option("relationship", "BOUGHT")
.option("relationship.source.labels", ":Customer")
.option("relationship.target.labels", ":Product")
.load())

Relationship reads return <rel.id> and <rel.type> columns plus <source.*> and <target.*> columns for the endpoint node properties.

Tune read parallelism and schema sampling:

df = (spark.read.format("org.neo4j.spark.DataSource")
.option("labels", ":Transaction")
.option("partitions", "10") # parallel partitions (default: 1)
.option("batch.size", "5000") # rows per partition batch (default: 5000)
.option("schema.flatten.limit", "100") # rows sampled for schema inference
.load())

Write modes:

| SaveMode | Cypher | Requires |
|---|---|---|
| Append | CREATE | nothing extra |
| Overwrite | MERGE | node.keys |
Overwrite merges on the properties listed in node.keys; Append only creates new nodes.

Append nodes:

from pyspark.sql import Row
people = spark.createDataFrame([
{"name": "Alice", "age": 30},
{"name": "Bob", "age": 25},
])
(people.write.format("org.neo4j.spark.DataSource")
.mode("Append")
.option("labels", ":Person")
.save())

Merge nodes (Overwrite):

(people.write.format("org.neo4j.spark.DataSource")
.mode("Overwrite")
.option("labels", ":Person")
.option("node.keys", "name") # comma-separated; df_col:node_prop if names differ
.save())

When DataFrame columns and node properties have different names, map them in node.keys, e.g. .option("node.keys", "df_col:node_property,id:personId").

Scala:

import org.apache.spark.sql.SaveMode
peopleDF.write
.format("org.neo4j.spark.DataSource")
.mode(SaveMode.Overwrite)
.option("labels", ":Person")
.option("node.keys", "name")
.save()

Write relationships. Use coalesce(1) so a single partition writes them, avoiding node-lock deadlocks:

rel_df = spark.createDataFrame([
{"cust_id": "C1", "prod_id": "P1", "qty": 3},
{"cust_id": "C2", "prod_id": "P2", "qty": 1},
])
(rel_df.coalesce(1)
.write.format("org.neo4j.spark.DataSource")
.mode("Append")
.option("relationship", "BOUGHT")
.option("relationship.save.strategy", "keys")
.option("relationship.source.labels", ":Customer")
.option("relationship.source.save.mode", "Match") # require existing nodes
.option("relationship.source.node.keys", "cust_id:id")
.option("relationship.target.labels", ":Product")
.option("relationship.target.save.mode", "Match")
.option("relationship.target.node.keys", "prod_id:id")
.option("relationship.properties", "qty:quantity")
.save())

relationship.source.save.mode and relationship.target.save.mode control how endpoint nodes are handled: Match requires them to exist, Append always creates them, Overwrite merges them on the given node keys.

Delta Lake to Neo4j:

# Read from Delta table (Unity Catalog or DBFS)
delta_df = spark.read.format("delta").table("catalog.schema.customers")
# Optional: filter/transform in Spark before writing
filtered = delta_df.filter("active = true").select("customer_id", "name", "region")
# Write to Neo4j
(filtered.write.format("org.neo4j.spark.DataSource")
.mode("Overwrite")
.option("labels", ":Customer")
.option("node.keys", "customer_id")
.option("batch.size", "20000")
.save())

Full pipeline (nodes first, then relationships):

# Step 1: ensure nodes exist
customers_df.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
.option("labels", ":Customer").option("node.keys", "customer_id").save()
products_df.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
.option("labels", ":Product").option("node.keys", "product_id").save()
# Step 2: write relationships (single partition)
orders_df.coalesce(1).write.format("org.neo4j.spark.DataSource").mode("Append") \
.option("relationship", "ORDERED") \
.option("relationship.save.strategy", "keys") \
.option("relationship.source.labels", ":Customer") \
.option("relationship.source.save.mode", "Match") \
.option("relationship.source.node.keys", "customer_id:customer_id") \
.option("relationship.target.labels", ":Product") \
.option("relationship.target.save.mode", "Match") \
.option("relationship.target.node.keys", "product_id:product_id") \
.save()

Partition and batch tuning:

| Scenario | Recommendation |
|---|---|
| Node writes (no lock contention) | Multiple partitions are fine, e.g. repartition(8) |
| Relationship writes (lock risk) | coalesce(1) |
| Large datasets | Raise batch.size (e.g. 10000–20000) and watch Neo4j heap |
| MERGE-heavy loads | Add uniqueness constraint on the node.keys properties |
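For the MERGE-heavy row, creating the constraint up front turns each MERGE into an index lookup. One way is the connector's script option, which runs Cypher statements before the write; the sketch below uses a hypothetical events_df, and the option's availability and the constraint syntax should be verified against your connector and Neo4j versions:

```python
# Sketch: ensure a uniqueness constraint exists, then MERGE events on event_id.
# The DataFrame and constraint name are illustrative.
(events_df.write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Event")
    .option("node.keys", "event_id")
    .option("script",
            "CREATE CONSTRAINT event_id_unique IF NOT EXISTS "
            "FOR (e:Event) REQUIRE e.event_id IS UNIQUE")
    .save())
```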
# Aggressive batch — monitor Neo4j heap; OOM risk above 50k
(big_df.repartition(8)
.write.format("org.neo4j.spark.DataSource")
.mode("Overwrite")
.option("labels", ":Event")
.option("node.keys", "event_id")
.option("batch.size", "20000")
.save())

Troubleshooting:

| Error | Cause | Fix |
|---|---|---|
| Failed to find data source: org.neo4j.spark.DataSource | JAR not on classpath | Add the connector to spark.jars.packages |
| Deadlock on relationship write | Multiple partitions locking nodes | coalesce(1) |
| Duplicate nodes on Overwrite | No uniqueness constraint on keys | Create a uniqueness constraint on the node.keys properties |
| OOM on Neo4j side | batch.size too large | Reduce to 5000–10000; check heap |
| Schema infers all properties as String | No APOC, schema not sampled | Raise schema.flatten.limit |
| Writes rejected | Session opened in read mode | Remove |
| Databricks Shared cluster fails | Unity Catalog shared mode unsupported | Switch to Single User access mode |
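For the schema-inference row, one remedy is simply to sample more rows; a minimal sketch (the limit value is arbitrary):

```python
# When property types come back as String (no APOC, too few rows sampled),
# widen the sampling window used for schema inference.
df = (spark.read.format("org.neo4j.spark.DataSource")
      .option("labels", ":Transaction")
      .option("schema.flatten.limit", "1000")
      .load())
```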
Key reminders: use the _for_spark_3 artifacts with Spark 3.x; Overwrite requires node.keys; write relationships with coalesce(1); tune batch.size for large loads; never add SKIP or LIMIT to a query read.