neo4j-import-skill

Neo4j Import Skill

When to Use

  • Importing CSV, JSON, or Parquet files into Neo4j
  • Batch-upserting nodes and relationships (UNWIND + CALL IN TRANSACTIONS)
  • Migrating relational data (SQL → graph)
  • Bulk-loading large datasets offline (neo4j-admin import)
  • Choosing between online (Cypher) and offline (admin) import methods
  • Verifying import completeness (counts, constraints, index states)

When NOT to Use

  • Unstructured docs, PDFs, vector chunks → use neo4j-document-import-skill
  • Live application writes (MERGE/CREATE in app code) → use neo4j-cypher-skill
  • neo4j-admin backup/restore/config → use neo4j-cli-tools-skill
  • GDS algorithm projection from an existing graph → use neo4j-gds-skill

Method Decision Table

| Dataset size | DB state | Source | Method |
|---|---|---|---|
| Any size | Online | CSV (Aura or local) | `LOAD CSV` + `CALL IN TRANSACTIONS` |
| < 1M rows | Online | List/API response | `UNWIND` + `CALL IN TRANSACTIONS` |
| > 10M rows | Offline (local/self-managed) | CSV / Parquet | `neo4j-admin database import full` |
| Any size | Online | APOC available | `apoc.periodic.iterate` + `apoc.load.csv` |
| Any size | Online | JSON/API | `apoc.load.json` or driver batching |
| Incremental delta | Offline (Enterprise) | CSV | `neo4j-admin database import incremental` |

Aura: only `https://` URLs — no `file:///`. Use neo4j-admin import only on self-managed.

Pre-Import Checklist

Run in this exact order — skipping a step causes hard-to-debug duplicates or missed index usage:

Constraints BEFORE import. Additional indexes AFTER import.

  • Constraints create implicit RANGE indexes used by MERGE during load and enforce uniqueness
  • Additional non-unique indexes (TEXT, RANGE on non-key props, FULLTEXT) are created after load — Neo4j populates them async from the committed data; poll `populationPercent` until 100%
  • Creating extra indexes before import slows every write during load with no benefit

  1. Create uniqueness constraints (enables the index used by MERGE):

    ```cypher
    CREATE CONSTRAINT IF NOT EXISTS FOR (n:Person) REQUIRE n.id IS UNIQUE;
    CREATE CONSTRAINT IF NOT EXISTS FOR (n:Movie)  REQUIRE n.movieId IS UNIQUE;
    ```

  2. Verify APOC if using `apoc.*` procedures:

    ```cypher
    RETURN apoc.version();
    ```

    If this fails → APOC is not installed. Use plain LOAD CSV instead.

  3. Confirm the target is PRIMARY (not a replica):

    ```cypher
    CALL dbms.cluster.role() YIELD role RETURN role;
    ```

    If role ≠ `PRIMARY` → stop. Redirect writes to the PRIMARY endpoint.

  4. Count source file rows before import (catch encoding issues early):

    ```bash
    wc -l data/persons.csv    # Linux/macOS
    ```

  5. Verify UTF-8 encoding — LOAD CSV requires UTF-8. Re-encode if needed:

    ```bash
    file -i persons.csv       # Check encoding
    iconv -f latin1 -t utf-8 persons.csv > persons_utf8.csv
    ```
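
As a quick sanity check before loading, the constraints from step 1 can be listed back. A minimal sketch using `SHOW CONSTRAINTS` (the `type` filter value is the standard Neo4j 5 name for node uniqueness constraints):

```cypher
// Confirm the uniqueness constraints exist before starting the load
SHOW CONSTRAINTS YIELD name, type, labelsOrTypes, properties
WHERE type = 'UNIQUENESS'
RETURN name, labelsOrTypes, properties;
```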

LOAD CSV Patterns

Basic node import with type coercion and null handling

```cypher
CYPHER 25
LOAD CSV WITH HEADERS FROM 'file:///persons.csv' AS row
CALL (row) {
  MERGE (p:Person {id: row.id})
  ON CREATE SET
    p.name       = row.name,
    p.age        = toIntegerOrNull(row.age),
    p.score      = toFloatOrNull(row.score),
    p.active     = toBoolean(row.active),
    p.born       = CASE WHEN row.born IS NOT NULL AND row.born <> '' THEN date(row.born) ELSE null END,
    p.createdAt  = datetime()
  ON MATCH SET
    p.updatedAt  = datetime()
} IN TRANSACTIONS OF 10000 ROWS
  ON ERROR CONTINUE
  REPORT STATUS AS s
RETURN s.transactionId, s.committed, s.errorMessage
```
Null/empty-string rules:
  • CSV missing column → `null` (safe)
  • CSV empty string `""` → stored as `""`, not `null` — use `nullIf(row.x, '')` to convert
  • `toInteger(null)` throws → always use `toIntegerOrNull()`
  • `toFloat(null)` throws → always use `toFloatOrNull()`
  • Neo4j never stores `null` properties — they are silently dropped on SET
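
A minimal sketch of these rules in one clause (the inline map stands in for a CSV row; values are illustrative):

```cypher
// Hypothetical row: empty-string nickname, numeric age arriving as text
WITH {id: 'p9', nickname: '', age: '41'} AS row
MERGE (p:Person {id: row.id})
SET p.nickname = nullIf(row.nickname, ''),   // '' -> null, so the property is dropped on SET
    p.age      = toIntegerOrNull(row.age)    // '41' -> 41; null input stays null, no error
RETURN p;
```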

Relationship import (nodes must exist first)

```cypher
CYPHER 25
LOAD CSV WITH HEADERS FROM 'file:///knows.csv' AS row
CALL (row) {
  MATCH (a:Person {id: row.fromId})
  MATCH (b:Person {id: row.toId})
  MERGE (a)-[:KNOWS {since: toIntegerOrNull(row.year)}]->(b)
} IN TRANSACTIONS OF 5000 ROWS
  ON ERROR CONTINUE
  REPORT STATUS AS s
```

Always import ALL nodes before ANY relationships — MATCH fails on missing nodes.

Tab-separated or custom delimiter

```cypher
CYPHER 25
LOAD CSV WITH HEADERS FROM 'file:///data.tsv' AS row FIELDTERMINATOR '\t'
CALL (row) { MERGE (p:Person {id: row.id}) }
IN TRANSACTIONS OF 10000 ROWS ON ERROR CONTINUE
```

Compressed files (ZIP / gzip — local files only)

```cypher
LOAD CSV WITH HEADERS FROM 'file:///archive.csv.gz' AS row ...
```

Cloud storage (Enterprise Edition)

| Scheme | Example |
|---|---|
| AWS S3 | `s3://my-bucket/data/persons.csv` |
| Google Cloud Storage | `gs://my-bucket/persons.csv` |
| Azure Blob | `azb://account/container/persons.csv` |
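
The LOAD CSV patterns above work unchanged against these URLs; a sketch using the S3 path from the table (bucket name illustrative):

```cypher
CYPHER 25
LOAD CSV WITH HEADERS FROM 's3://my-bucket/data/persons.csv' AS row
CALL (row) { MERGE (p:Person {id: row.id}) }
IN TRANSACTIONS OF 10000 ROWS ON ERROR CONTINUE
```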

Useful built-in functions inside LOAD CSV

```cypher
linenumber()   // current line number — use as fallback ID
file()         // absolute path of file being loaded
```

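A sketch using `linenumber()` as a fallback key when some rows lack a stable ID (assumes synthetic line-based IDs are acceptable for your model):

```cypher
CYPHER 25
LOAD CSV WITH HEADERS FROM 'file:///persons.csv' AS row
WITH row, linenumber() AS ln
CALL (row, ln) {
  // fall back to a line-derived ID when row.id is missing or empty
  MERGE (p:Person {id: coalesce(nullIf(row.id, ''), 'line-' + toString(ln))})
} IN TRANSACTIONS OF 10000 ROWS ON ERROR CONTINUE
```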

CALL IN TRANSACTIONS — Full Reference

Syntax

```cypher
CALL (row) {
  // write logic
} IN [n CONCURRENT] TRANSACTIONS
  [OF batchSize ROW[S]]
  [ON ERROR {CONTINUE | BREAK | FAIL | RETRY [FOR duration SECONDS] [THEN {CONTINUE|BREAK|FAIL}]}]
  [REPORT STATUS AS statusVar]
```

ON ERROR modes

| Mode | Behavior | Use when |
|---|---|---|
| `ON ERROR FAIL` | Default. Rolls back entire outer tx on first error | All-or-nothing strict import |
| `ON ERROR CONTINUE` | Skips failed batch, continues remaining batches | Resilient bulk load — track errors via REPORT STATUS |
| `ON ERROR BREAK` | Stops after first failed batch; keeps completed work | Semi-strict: stop early, keep successful batches |
| `ON ERROR RETRY` | Exponential backoff retry (default 30s) + fallback | Concurrent writes with deadlock risk |

`ON ERROR CONTINUE/BREAK` → the outer transaction succeeds even if inner batches fail. `ON ERROR FAIL` → cannot be combined with `REPORT STATUS AS`.

CONCURRENT TRANSACTIONS (parallel batches)

```cypher
CYPHER 25
LOAD CSV WITH HEADERS FROM 'file:///large.csv' AS row
CALL (row) {
  MERGE (p:Person {id: row.id}) SET p.name = row.name
} IN 4 CONCURRENT TRANSACTIONS OF 5000 ROWS
  ON ERROR RETRY FOR 30 SECONDS THEN CONTINUE
  REPORT STATUS AS s
```

Use CONCURRENT for read-heavy MERGE on non-overlapping key spaces. Risk: deadlocks on overlapping writes → combine with `ON ERROR RETRY`.

REPORT STATUS columns

| Column | Type | Meaning |
|---|---|---|
| `s.started` | BOOLEAN | Batch transaction started |
| `s.committed` | BOOLEAN | Batch committed successfully |
| `s.transactionId` | STRING | Transaction ID |
| `s.errorMessage` | STRING or null | Error detail if batch failed |
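
A sketch that uses these columns to surface only the failed batches after a resilient load:

```cypher
CYPHER 25
LOAD CSV WITH HEADERS FROM 'file:///persons.csv' AS row
CALL (row) {
  MERGE (p:Person {id: row.id})
} IN TRANSACTIONS OF 10000 ROWS
  ON ERROR CONTINUE
  REPORT STATUS AS s
WITH s WHERE s.committed = false   // keep only batches that did not commit
RETURN s.transactionId, s.errorMessage;
```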

Batch size guidance

| Row count | Recommended batch size | Notes |
|---|---|---|
| < 100k | 10 000 | Default is fine |
| 100k – 1M | 10 000 – 50 000 | Monitor heap; increase if fast |
| 1M – 10M | 50 000 – 100 000 | Enable CONCURRENT if CPUs available |
| > 10M online | 50 000 | Consider neo4j-admin import instead |
| Relationship import | 5 000 | Lower — each batch does 2x MATCH |

neo4j-admin import (Offline Bulk Load)

Fastest method: ~3 min for 31M nodes / 78M rels on SSD. DB must be stopped or non-existent.

Command structure

```bash
neo4j-admin database import full \
  --nodes=Person="persons_header.csv,persons.csv" \
  --nodes=Movie="movies_header.csv,movies.csv" \
  --relationships=ACTED_IN="acted_in_header.csv,acted_in.csv" \
  --relationships=DIRECTED="directed_header.csv,directed.csv" \
  --delimiter=, \
  --id-type=STRING \
  --bad-tolerance=0 \
  --threads=$(nproc) \
  --high-parallel-io=on \
  neo4j
```

For SSDs: always set `--high-parallel-io=on`. For large graphs (>34B nodes/rels): `--format=block`.

Dry run (2026.02+) — validate without writing:

```bash
neo4j-admin database import full --dry-run ...
```

Node header file format

persons_header.csv

```csv
personId:ID(Person),name,born:int,score:float,active:boolean,:LABEL
```

persons.csv (data file — no header row)

```csv
p001,Alice,1985,9.2,true,Person
p002,Bob,1990,7.1,false,Person
```

| Field | Meaning |
|---|---|
| `:ID` | Unique ID for relationship wiring (not stored as property by default) |
| `:ID(Group)` | Scoped ID space — use when node types share IDs |
| `:LABEL` | One or more labels; semicolon-separated: `Person;Employee` |
| `prop:int` | Typed property; types: `int long float double boolean byte short string` |
| `prop:date` | Temporal: `date localtime time localdatetime datetime duration` |
| `prop:int[]` | Array — semicolon-separated values in cell: `1;2;3` |
| `prop:vector` | Float vector (2025.10+) — semicolon-separated coordinates |

Relationship header file format

acted_in_header.csv

```csv
:START_ID(Person),:END_ID(Movie),role,:TYPE
```

acted_in.csv

```csv
p001,tt0133093,Neo,ACTED_IN
p002,tt0133093,Morpheus,ACTED_IN
```

`:START_ID` / `:END_ID` must reference the same `:ID` group as the node files.

Key flags

| Flag | Default | Notes |
|---|---|---|
| `--delimiter` | `,` | Single char or `TAB` |
| `--id-type` | `STRING` | STRING \| INTEGER \| ACTUAL |
| `--bad-tolerance` | `-1` (unlimited, changed 2025.12) | Set `0` for strict prod imports |
| `--threads` | CPU count | Set explicitly on shared hosts |
| `--max-off-heap-memory` | 90% RAM | Reduce if other services share host |
| `--high-parallel-io` | `off` | Set `on` for SSD/NVMe |
| `--format` | `standard` | `block` for >34B nodes/rels |
| `--overwrite-destination` | false | Required if DB already exists |
| `--dry-run` | false | 2026.02+ — validate without writing |

Schema file (--schema) [Enterprise, block format]

Pass a Cypher file with `CREATE CONSTRAINT` / `CREATE INDEX` statements; it is executed automatically after import completes. Constraints are created first (correct order enforced). File paths can be local or remote (`s3://`, `gs://`, `https://`).

```bash
neo4j-admin database import full \
  --format=block \
  --schema=schema.cypher \
  --nodes=Person="persons_header.csv,persons.csv" \
  neo4j
```

```cypher
// schema.cypher
CREATE CONSTRAINT person_id IF NOT EXISTS FOR (n:Person) REQUIRE n.id IS UNIQUE;
CREATE CONSTRAINT movie_id  IF NOT EXISTS FOR (n:Movie)  REQUIRE n.id IS UNIQUE;
CREATE RANGE INDEX person_email IF NOT EXISTS FOR (n:Person) ON (n.email);
CREATE TEXT  INDEX movie_title  IF NOT EXISTS FOR (n:Movie)  ON (n.title);
```

For incremental import, `DROP CONSTRAINT` / `DROP INDEX` are also supported [2025.02+] — used to remove indexes before the merge phase and recreate them after for faster writes.

Incremental import (Enterprise only)

Three-phase process — use when the DB must stay online during import preparation:

Phase 1: Prepare staging area

```bash
neo4j-admin database import incremental --stage=prepare \
  --nodes=Person=persons_header.csv,delta.csv --force neo4j
```

Phase 2: Build indexes (DB can be read-only during this phase)

```bash
neo4j-admin database import incremental --stage=build neo4j
```

Phase 3: Merge into live database (brief write-lock)

```bash
neo4j-admin database import incremental --stage=merge neo4j
```

Requires Enterprise Edition + `block` store format.

---

APOC Patterns (when APOC is available)

Verify first: `RETURN apoc.version()` — if it fails, use LOAD CSV or the driver instead.

apoc.periodic.iterate — batch-process existing graph data

```cypher
CALL apoc.periodic.iterate(
  "MATCH (p:Person) WHERE NOT (p)-[:HAS_ACCOUNT]->() RETURN p",
  "CREATE (p)-[:HAS_ACCOUNT]->(a:Account {id: randomUUID()})",
  {batchSize: 10000, parallel: false, retries: 2}
) YIELD batches, total, errorMessages
RETURN batches, total, errorMessages
```

| Config key | Default | Notes |
|---|---|---|
| `batchSize` | 10000 | Rows per inner transaction |
| `parallel` | false | Enable for non-overlapping writes; risk: deadlocks |
| `retries` | 0 | Retry failed batches N times with 100ms delay |

Prefer `CALL IN TRANSACTIONS` (native Cypher) over `apoc.periodic.iterate` for new code — it has `REPORT STATUS`, `CONCURRENT`, and `RETRY` built in without the APOC dependency.
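
For comparison, a sketch of the same backfill rewritten with native `CALL IN TRANSACTIONS` (syntax per the reference section above):

```cypher
CYPHER 25
MATCH (p:Person) WHERE NOT (p)-[:HAS_ACCOUNT]->()
CALL (p) {
  CREATE (p)-[:HAS_ACCOUNT]->(a:Account {id: randomUUID()})
} IN TRANSACTIONS OF 10000 ROWS
  ON ERROR CONTINUE
  REPORT STATUS AS s
RETURN s.transactionId, s.committed, s.errorMessage
```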

apoc.load.csv — load with config options

```cypher
CALL apoc.load.csv('file:///persons.csv', {
  header: true,
  sep: ',',
  skip: 1,
  limit: 1000000
}) YIELD lineNo, map, list
CALL (map) {
  MERGE (p:Person {id: map.id}) SET p.name = map.name
} IN TRANSACTIONS OF 10000 ROWS ON ERROR CONTINUE
```

apoc.load.json — load JSON from file or URL

```cypher
CALL apoc.load.json('https://api.example.com/persons') YIELD value
CALL (value) {
  MERGE (p:Person {id: value.id}) SET p.name = value.name
} IN TRANSACTIONS OF 1000 ROWS ON ERROR CONTINUE
```

Driver Batch Write Pattern

Use when the source is not a file (API responses, DB migrations). Collect rows into `BATCH_SIZE` (10 000) lists and call `UNWIND $rows AS row MERGE ...` per batch, as sketched below. Roughly 10x faster than row-at-a-time writes. → Python + JS examples

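A minimal sketch of the per-batch query the driver sends, with `$rows` bound to one collected list of maps (field names illustrative):

```cypher
// Executed once per batch with parameters {rows: [{id: 'p1', name: 'Alice'}, ...]}
UNWIND $rows AS row
MERGE (p:Person {id: row.id})
ON CREATE SET p.name = row.name
ON MATCH  SET p.updatedAt = datetime()
```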

MCP Tool Usage

| Operation | MCP tool | Notes |
|---|---|---|
| `SHOW CONSTRAINTS`, `SHOW INDEXES` | read-cypher | Always inspect before import |
| `CREATE CONSTRAINT`, `CREATE INDEX` | write-cypher | Gate: show planned constraint, confirm |
| LOAD CSV / CALL IN TRANSACTIONS | write-cypher | Gate: show row count + Cypher, confirm |
| Verify counts | read-cypher | Post-import: `MATCH (n:Label) RETURN count(n)` |
| Poll index state | read-cypher | Poll until all `state = 'ONLINE'` |

Write gate — before any bulk write via MCP, show:
  1. Query + affected labels
  2. Estimated row count from source
  3. `EXPLAIN` plan (see the sketch below)

Wait for user confirmation. Never auto-execute `CALL IN TRANSACTIONS` or `CREATE CONSTRAINT` without confirmation. Always pass the `database` param if not default: `{"code": "...", "database": "neo4j"}`.


Common Errors

| Error | Cause | Fix |
|---|---|---|
| `Couldn't load the external resource` | `file:///` path not in Neo4j import dir | Move file to `$NEO4J_HOME/import/`; check `dbms.security.allow_csv_import_from_file_urls=true` |
| `Cannot merge node using null property value` | MERGE key resolved to null | Validate `row.id IS NOT NULL` before MERGE; add `WHERE row.id IS NOT NULL` |
| `toInteger() called on null` | Null column fed to non-null-safe fn | Replace `toInteger()` → `toIntegerOrNull()`, `toFloat()` → `toFloatOrNull()` |
| `Node N already exists` / constraint violation mid-import | Duplicate source IDs | Dedup source CSV; use `MERGE` not `CREATE`; add `IF NOT EXISTS` to constraint |
| Heap overflow / OutOfMemoryError | Batch too large or file too large | Reduce batch size; switch to `CALL IN TRANSACTIONS`; neo4j-admin for offline |
| `Invalid input 'IN': expected...` | `PERIODIC COMMIT` used | Replace `USING PERIODIC COMMIT` → `CALL IN TRANSACTIONS` — PERIODIC COMMIT removed in Cypher 25 |
| neo4j-admin: `Bad input data` | Wrong header format or type mismatch | Check `:ID`, `:START_ID`, `:END_ID` present; check typed columns parse correctly |
| neo4j-admin: import fails silently | `--bad-tolerance` default was unlimited pre-2025.12 | Set `--bad-tolerance=0` to surface all errors |
| Index not used during MERGE | Constraint not created before import | Drop data, create constraint, re-import |
| Relationship import missing nodes | Relationships imported before nodes | Always import ALL node files before ANY relationship files |

Post-Import Validation

After import completes — run all of the following:

```cypher
// Row counts per label
MATCH (n:Person) RETURN count(n) AS persons;
MATCH ()-[:KNOWS]->() RETURN count(*) AS knows_rels;

// After import: create additional non-unique indexes (populated async)
CREATE TEXT INDEX movie_title IF NOT EXISTS FOR (n:Movie) ON (n.title);
CREATE RANGE INDEX person_born IF NOT EXISTS FOR (n:Person) ON (n.born);

// Poll population — wait until populationPercent = 100 before opening to queries
SHOW INDEXES YIELD name, state, populationPercent
WHERE state <> 'ONLINE' OR populationPercent < 100
RETURN name, state, populationPercent
ORDER BY populationPercent;

// Spot check: null keys = import bug
MATCH (p:Person) WHERE p.id IS NULL RETURN count(p) AS missing_id;
```

Do NOT run production queries until all indexes are ONLINE.

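An optional extra spot check: per-type relationship totals to compare against source row counts (a sketch; extend with per-label counts as needed):

```cypher
MATCH ()-[r]->()
RETURN type(r) AS relType, count(r) AS total
ORDER BY total DESC;
```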

Checklist

  • Uniqueness constraints created before any MERGE-based import
  • APOC availability verified if using `apoc.*` procedures
  • Target confirmed as PRIMARY (not replica)
  • Source files validated: UTF-8 encoding, expected row count, no BOM
  • LOAD CSV uses `toIntegerOrNull()` / `toFloatOrNull()` — never bare `toInteger()` / `toFloat()`
  • `nullIf(row.x, '')` applied where empty string ≠ null
  • `CALL IN TRANSACTIONS` used (not `USING PERIODIC COMMIT`)
  • `ON ERROR CONTINUE` + `REPORT STATUS` for production loads
  • Node import completed before relationship import
  • neo4j-admin: `--bad-tolerance=0` set; `--high-parallel-io=on` for SSD
  • Post-import: row counts match source; all indexes ONLINE
  • Write execution gate applied (MCP): showed query + estimate, got confirmation
  • Credentials in `.env`; `.env` in `.gitignore`