From neo4j-skills
Reads Neo4j nodes/relationships into Apache Spark DataFrames and writes DataFrames back to Neo4j using the official connector. Covers PySpark/Scala setup, Databricks clusters, partitioning, and Delta Lake pipelines.
npx claudepluginhub neo4j-contrib/neo4j-skills
This skill is limited to using the following tools:
- Reading Neo4j nodes/relationships into Spark DataFrames
| Connector | Spark | Scala | Databricks Runtime | Neo4j |
|---|---|---|---|---|
| 5.4.x | 3.3, 3.4, 3.5 | 2.12, 2.13 | 12.2, 13.3, 14.3 LTS | 4.4, 5.x, 2025.x |
Maven artifact (Scala 2.12, Spark 3):
org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3
Scala 2.13 variant:
org.neo4j:neo4j-connector-apache-spark_2.13:5.4.2_for_spark_3
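If you are not setting spark.jars.packages in code, the same coordinate can be pulled at launch time instead (assumes access to Maven Central; spark-submit accepts the same flag):
pyspark --packages org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3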
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.appName("neo4j-app")
.config("spark.jars.packages",
"org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
.config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
.config("neo4j.authentication.type", "basic")
.config("neo4j.authentication.basic.username", "neo4j")
.config("neo4j.authentication.basic.password", "password")
.getOrCreate())
val spark = SparkSession.builder
.appName("neo4j-app")
.config("spark.jars.packages",
"org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
.config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
.config("neo4j.authentication.type", "basic")
.config("neo4j.authentication.basic.username", "neo4j")
.config("neo4j.authentication.basic.password", "password")
.getOrCreate()
org.neo4j:neo4j-connector-apache-spark_2.12 — match Scala version to runtime
neo4j.url neo4j+s://xxxx.databases.neo4j.io
neo4j.authentication.type basic
neo4j.authentication.basic.username {{secrets/neo4j/username}}
neo4j.authentication.basic.password {{secrets/neo4j/password}}
# Store credentials once:
# databricks secrets create-scope --scope neo4j
# databricks secrets put --scope neo4j --key url
# databricks secrets put --scope neo4j --key username
# databricks secrets put --scope neo4j --key password
neo4j_url = dbutils.secrets.get(scope="neo4j", key="url")
neo4j_user = dbutils.secrets.get(scope="neo4j", key="username")
neo4j_pass = dbutils.secrets.get(scope="neo4j", key="password")
spark.conf.set("neo4j.url", neo4j_url)
spark.conf.set("neo4j.authentication.type", "basic")
spark.conf.set("neo4j.authentication.basic.username", neo4j_user)
spark.conf.set("neo4j.authentication.basic.password", neo4j_pass)
| Option | Description | Default |
|---|---|---|
| neo4j.url | Bolt/Neo4j URI | — (required) |
| neo4j.authentication.type | none, basic, kerberos, bearer | basic |
| neo4j.authentication.basic.username | Username | driver default |
| neo4j.authentication.basic.password | Password | driver default |
| neo4j.authentication.bearer.token | Bearer token | — |
| neo4j.database | Target database | driver default |
| neo4j.access.mode | read or write | read |
| neo4j.encryption.enabled | TLS (ignored with +s/+ssc URI) | false |
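The table lists the session-level keys; the same options can also be passed per read or write by dropping the neo4j. prefix. A minimal sketch, reusing the neo4j_url/neo4j_user/neo4j_pass variables from the secrets example and a placeholder database name mydb:
# Per-read options override the session config
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("url", neo4j_url)
    .option("authentication.basic.username", neo4j_user)
    .option("authentication.basic.password", neo4j_pass)
    .option("database", "mydb")
    .option("labels", ":Person")
    .load())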
Three mutually exclusive read modes — use exactly one per .read() call.
# PySpark
df = (spark.read.format("org.neo4j.spark.DataSource")
.option("labels", ":Person")
.load())
df.printSchema()
df.show()
// Scala
val df = spark.read
.format("org.neo4j.spark.DataSource")
.option("labels", ":Person")
.load()
Multi-label filter (AND): .option("labels", ":Person:Employee")
Result includes <id> (internal Neo4j id) and <labels> columns.
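If downstream steps do not need the connector metadata, those columns can be dropped like any other Spark column (a small sketch using the labels-mode df above):
# Drop the connector's metadata columns, keeping only node properties
people_only = df.drop("<id>", "<labels>")
people_only.show()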
df = (spark.read.format("org.neo4j.spark.DataSource")
.option("query", "MATCH (p:Person)-[:ACTED_IN]->(m:Movie) RETURN p.name AS actor, m.title AS movie, m.year AS year")
.load())
Use explicit RETURN aliases — they become DataFrame column names. No SKIP/LIMIT in query (connector handles pagination).
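If only a sample is needed, limit on the Spark side instead; for example, with the query-mode df above:
# Sample rows without putting SKIP/LIMIT into the Cypher query itself
df.limit(100).show()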
df = (spark.read.format("org.neo4j.spark.DataSource")
.option("relationship", "BOUGHT")
.option("relationship.source.labels", ":Customer")
.option("relationship.target.labels", ":Product")
.load())
Result columns: <rel.id>, <rel.type>, <source.*>, <target.*>, plus relationship properties.
df = (spark.read.format("org.neo4j.spark.DataSource")
.option("labels", ":Transaction")
.option("partitions", "10") # parallel partitions (default: 1)
.option("batch.size", "5000") # rows per partition batch (default: 5000)
.option("schema.flatten.limit", "100") # rows sampled for schema inference
.load())
Full read options reference: references/read-patterns.md
| SaveMode | Cypher | Requires |
|---|---|---|
| Append | CREATE | nothing extra |
| Overwrite | MERGE | node.keys (nodes) or *.node.keys (rels) |
| ErrorIfExists | CREATE + error if exists | — |
Always create uniqueness constraints on node.keys properties before writing in Overwrite mode.
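A minimal sketch of creating such a constraint up front, assuming the official neo4j Python driver (5.x) is available on the Spark driver and reusing the credential variables from the secrets example; person_name is an arbitrary constraint name:
from neo4j import GraphDatabase

# Uniqueness constraint backing node.keys = "name" for the :Person writes below
with GraphDatabase.driver(neo4j_url, auth=(neo4j_user, neo4j_pass)) as driver:
    driver.execute_query(
        "CREATE CONSTRAINT person_name IF NOT EXISTS "
        "FOR (p:Person) REQUIRE p.name IS UNIQUE"
    )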
from pyspark.sql import Row
people = spark.createDataFrame([
    Row(name="Alice", age=30),
    Row(name="Bob", age=25),
])
(people.write.format("org.neo4j.spark.DataSource")
.mode("Append")
.option("labels", ":Person")
.save())
(people.write.format("org.neo4j.spark.DataSource")
.mode("Overwrite")
.option("labels", ":Person")
.option("node.keys", "name") # comma-separated; df_col:node_prop if names differ
.save())
node.keys with rename: .option("node.keys", "df_col:node_property,id:personId")
import org.apache.spark.sql.SaveMode
peopleDF.write
.format("org.neo4j.spark.DataSource")
.mode(SaveMode.Overwrite)
.option("labels", ":Person")
.option("node.keys", "name")
.save()
Use coalesce(1) before relationship writes to avoid deadlocks.
rel_df = spark.createDataFrame([
{"cust_id": "C1", "prod_id": "P1", "qty": 3},
{"cust_id": "C2", "prod_id": "P2", "qty": 1},
])
(rel_df.coalesce(1)
.write.format("org.neo4j.spark.DataSource")
.mode("Append")
.option("relationship", "BOUGHT")
.option("relationship.save.strategy", "keys")
.option("relationship.source.labels", ":Customer")
.option("relationship.source.save.mode", "Match") # require existing nodes
.option("relationship.source.node.keys", "cust_id:id")
.option("relationship.target.labels", ":Product")
.option("relationship.target.save.mode", "Match")
.option("relationship.target.node.keys", "prod_id:id")
.option("relationship.properties", "qty:quantity")
.save())
relationship.source.save.mode / relationship.target.save.mode:
- Match — find existing nodes (fail if missing)
- Append — always CREATE new nodes
- Overwrite — MERGE nodes (see the sketch below)
Full write options reference: references/write-patterns.md
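If the endpoint nodes might not exist yet, Overwrite on both sides MERGEs them as part of the relationship write; a sketch reusing rel_df from above:
(rel_df.coalesce(1)
    .write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("relationship", "BOUGHT")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.source.save.mode", "Overwrite")  # MERGE source nodes
    .option("relationship.source.node.keys", "cust_id:id")
    .option("relationship.target.labels", ":Product")
    .option("relationship.target.save.mode", "Overwrite")  # MERGE target nodes
    .option("relationship.target.node.keys", "prod_id:id")
    .option("relationship.properties", "qty:quantity")
    .save())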
# Read from Delta table (Unity Catalog or DBFS)
delta_df = spark.read.format("delta").table("catalog.schema.customers")
# Optional: filter/transform in Spark before writing
filtered = delta_df.filter("active = true").select("customer_id", "name", "region")
# Write to Neo4j
(filtered.write.format("org.neo4j.spark.DataSource")
.mode("Overwrite")
.option("labels", ":Customer")
.option("node.keys", "customer_id")
.option("batch.size", "20000")
.save())
Pipeline pattern for relationships — load both node sets first, then write edges:
# Step 1: ensure nodes exist
customers_df.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
.option("labels", ":Customer").option("node.keys", "customer_id").save()
products_df.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
.option("labels", ":Product").option("node.keys", "product_id").save()
# Step 2: write relationships (single partition)
orders_df.coalesce(1).write.format("org.neo4j.spark.DataSource").mode("Append") \
.option("relationship", "ORDERED") \
.option("relationship.save.strategy", "keys") \
.option("relationship.source.labels", ":Customer") \
.option("relationship.source.save.mode", "Match") \
.option("relationship.source.node.keys", "customer_id:customer_id") \
.option("relationship.target.labels", ":Product") \
.option("relationship.target.save.mode", "Match") \
.option("relationship.target.node.keys", "product_id:product_id") \
.save()
| Scenario | Recommendation |
|---|---|
| Node writes (no lock contention) | repartition(N) where N ≤ Neo4j CPU cores |
| Relationship writes (lock risk) | coalesce(1) — single partition |
| Large datasets | batch.size 10000–20000 (adjust to heap) |
| MERGE-heavy loads | Add uniqueness constraint on node.keys properties first |
# Aggressive batch — monitor Neo4j heap; OOM risk above 50k
(big_df.repartition(8)
.write.format("org.neo4j.spark.DataSource")
.mode("Overwrite")
.option("labels", ":Event")
.option("node.keys", "event_id")
.option("batch.size", "20000")
.save())
| Error | Cause | Fix |
|---|---|---|
| ClassNotFoundException: org.neo4j.spark.DataSource | JAR not on classpath | Add spark.jars.packages or attach library |
| Deadlock on relationship write | Multiple partitions locking nodes | coalesce(1) before write |
| Duplicate nodes on Overwrite | No uniqueness constraint on keys | CREATE CONSTRAINT FOR (n:Label) REQUIRE n.prop IS UNIQUE |
| OOM on Neo4j side | batch.size too large | Reduce to 5000–10000; check heap |
| Schema inferred as all string columns | No APOC, schema not sampled | Set schema.flatten.limit higher; or use query mode with explicit types |
| "Access mode is read" error on write | Session opened in read mode | Remove neo4j.access.mode or set to write |
| Databricks Shared cluster fails | Unity Catalog shared mode unsupported | Switch to Single User access mode |
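A quick smoke test helps separate classpath, URL, and credential problems from job-level issues; a minimal sketch using query mode with the default single partition:
# Should return a single row (ok = 1) when the JAR, URL, and auth are all correct
(spark.read.format("org.neo4j.spark.DataSource")
    .option("query", "RETURN 1 AS ok")
    .load()
    .show())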
- Connector artifact matches the cluster's Scala version (and ends with _for_spark_3)
- node.keys set when using Overwrite mode
- Uniqueness constraints created on node.keys properties before MERGE writes
- coalesce(1) applied before relationship writes
- batch.size sized to Neo4j heap (start 5000, tune up)
- query mode: no SKIP/LIMIT in Cypher (connector paginates internally)