neo4j-import-skill
Neo4j Import Skill
When to Use
- Importing CSV, JSON, or Parquet files into Neo4j
- Batch-upserting nodes and relationships (UNWIND + CALL IN TRANSACTIONS)
- Migrating relational data (SQL → graph)
- Bulk-loading large datasets offline (neo4j-admin import)
- Choosing between online (Cypher) and offline (admin) import methods
- Verifying import completeness (counts, constraints, index states)
When NOT to Use
- Unstructured docs, PDFs, vector chunks → neo4j-document-import-skill
- Live application writes (MERGE/CREATE in app code) → neo4j-cypher-skill
- neo4j-admin backup/restore/config → neo4j-cli-tools-skill
- GDS algorithm projection from existing graph → neo4j-gds-skill
Method Decision Table
| Dataset size | DB state | Source | Method |
|---|---|---|---|
| Any size | Online | CSV (Aura or local) | LOAD CSV + CALL IN TRANSACTIONS |
| < 1M rows | Online | List/API response | UNWIND + CALL IN TRANSACTIONS |
| > 10M rows | Offline (local/self-managed) | CSV / Parquet | neo4j-admin database import |
| Any size | Online | APOC available | apoc.load.csv / apoc.periodic.iterate |
| Any size | Online | JSON/API | apoc.load.json |
| Incremental delta | Offline (Enterprise) | CSV | neo4j-admin database import incremental |

Aura: only https:// URLs — no file:///. Use neo4j-admin import only on self-managed.
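A minimal Aura-compatible sketch, assuming a hypothetical public CSV URL:

```cypher
CYPHER 25
LOAD CSV WITH HEADERS FROM 'https://example.com/exports/persons.csv' AS row
CALL (row) {
  MERGE (p:Person {id: row.id})
} IN TRANSACTIONS OF 10000 ROWS ON ERROR CONTINUE
```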
Pre-Import Checklist
Run in this exact order — skipping causes hard-to-debug duplicates or missed index usage:
Constraints BEFORE import. Additional indexes AFTER import.
- Constraints create implicit RANGE indexes used by MERGE during load + enforce uniqueness
- Additional non-unique indexes (TEXT, RANGE on non-key props, FULLTEXT) created after load — Neo4j populates them async from the committed data; poll populationPercent until 100%
- Creating extra indexes before import slows every write during load with no benefit
1. Create uniqueness constraints (enables index used by MERGE):

   ```cypher
   CREATE CONSTRAINT IF NOT EXISTS FOR (n:Person) REQUIRE n.id IS UNIQUE;
   CREATE CONSTRAINT IF NOT EXISTS FOR (n:Movie) REQUIRE n.movieId IS UNIQUE;
   ```

2. Verify APOC if using apoc.* procedures:

   ```cypher
   RETURN apoc.version();
   ```

   If this fails → APOC not installed. Use plain LOAD CSV instead.

3. Confirm target is PRIMARY (not replica):

   ```cypher
   CALL dbms.cluster.role() YIELD role RETURN role;
   ```

   If role ≠ PRIMARY → stop. Redirect writes to the PRIMARY endpoint.

4. Count source file rows before import (catch encoding issues early):

   ```bash
   wc -l data/persons.csv   # Linux/macOS
   ```

5. Verify UTF-8 encoding — LOAD CSV requires UTF-8. Re-encode if needed:

   ```bash
   file -i persons.csv                                      # check encoding
   iconv -f latin1 -t utf-8 persons.csv > persons_utf8.csv
   ```
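To confirm the constraints exist before loading, a quick read-only check (standard SHOW CONSTRAINTS columns):

```cypher
SHOW CONSTRAINTS YIELD name, type, labelsOrTypes, properties
WHERE 'Person' IN labelsOrTypes OR 'Movie' IN labelsOrTypes
RETURN name, type, labelsOrTypes, properties;
```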
LOAD CSV Patterns
Basic node import with type coercion and null handling
```cypher
CYPHER 25
LOAD CSV WITH HEADERS FROM 'file:///persons.csv' AS row
CALL (row) {
  MERGE (p:Person {id: row.id})
  ON CREATE SET
    p.name = row.name,
    p.age = toIntegerOrNull(row.age),
    p.score = toFloatOrNull(row.score),
    p.active = toBoolean(row.active),
    p.born = CASE WHEN row.born IS NOT NULL AND row.born <> '' THEN date(row.born) ELSE null END,
    p.createdAt = datetime()
  ON MATCH SET
    p.updatedAt = datetime()
} IN TRANSACTIONS OF 10000 ROWS
  ON ERROR CONTINUE
  REPORT STATUS AS s
RETURN s.transactionId, s.committed, s.errorMessage
```

Null/empty-string rules:
- CSV missing column → null (safe)
- CSV empty string → stored as "" not null — use nullIf(row.x, '') to convert
- toInteger(null) throws → always use toIntegerOrNull()
- toFloat(null) throws → always use toFloatOrNull()
- Neo4j never stores null properties — they are silently dropped on SET
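A minimal sketch of the nullIf conversion, assuming a hypothetical optional email column alongside born:

```cypher
CYPHER 25
LOAD CSV WITH HEADERS FROM 'file:///persons.csv' AS row
CALL (row) {
  MERGE (p:Person {id: row.id})
  // nullIf(x, '') turns empty cells into null; null is dropped on SET,
  // so the property is simply not written instead of storing ""
  SET p.email = nullIf(row.email, ''),
      p.born  = date(nullIf(row.born, ''))
} IN TRANSACTIONS OF 10000 ROWS ON ERROR CONTINUE
```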
Relationship import (nodes must exist first)
```cypher
CYPHER 25
LOAD CSV WITH HEADERS FROM 'file:///knows.csv' AS row
CALL (row) {
  MATCH (a:Person {id: row.fromId})
  MATCH (b:Person {id: row.toId})
  MERGE (a)-[:KNOWS {since: toIntegerOrNull(row.year)}]->(b)
} IN TRANSACTIONS OF 5000 ROWS
  ON ERROR CONTINUE
  REPORT STATUS AS s
```

Always import ALL nodes before ANY relationships — MATCH fails on missing nodes.
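An optional read-only pre-check before committing relationships, a sketch that counts rows whose endpoints are missing:

```cypher
LOAD CSV WITH HEADERS FROM 'file:///knows.csv' AS row
OPTIONAL MATCH (a:Person {id: row.fromId})
OPTIONAL MATCH (b:Person {id: row.toId})
WITH row, a, b WHERE a IS NULL OR b IS NULL
RETURN count(*) AS unmatched_rows;
```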
Tab-separated or custom delimiter
```cypher
CYPHER 25
LOAD CSV WITH HEADERS FROM 'file:///data.tsv' AS row FIELDTERMINATOR '\t'
CALL (row) { MERGE (p:Person {id: row.id}) }
IN TRANSACTIONS OF 10000 ROWS ON ERROR CONTINUE
```

Compressed files (ZIP / gzip — local files only)
```cypher
LOAD CSV WITH HEADERS FROM 'file:///archive.csv.gz' AS row ...
```

Cloud storage (Enterprise Edition)
| Scheme | Example |
|---|---|
| AWS S3 | s3://bucket/persons.csv |
| Google Cloud Storage | gs://bucket/persons.csv |
| Azure Blob | azb://container/persons.csv |
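A minimal S3 sketch, assuming a hypothetical bucket and cloud credentials already configured on the server:

```cypher
CYPHER 25
LOAD CSV WITH HEADERS FROM 's3://my-bucket/persons.csv' AS row
CALL (row) { MERGE (p:Person {id: row.id}) }
IN TRANSACTIONS OF 10000 ROWS ON ERROR CONTINUE
```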
Useful built-in functions inside LOAD CSV
```cypher
linenumber()  // current line number — use as fallback ID
file()        // absolute path of file being loaded
```

CALL IN TRANSACTIONS — Full Reference
Syntax
```cypher
CALL (row) {
  // write logic
} IN [n CONCURRENT] TRANSACTIONS
  [OF batchSize ROW[S]]
  [ON ERROR {CONTINUE | BREAK | FAIL | RETRY [FOR duration SECONDS] [THEN {CONTINUE|BREAK|FAIL}]}]
  [REPORT STATUS AS statusVar]
```

ON ERROR modes
| Mode | Behavior | Use when |
|---|---|---|
| ON ERROR FAIL | Default. Rolls back entire outer tx on first error | All-or-nothing strict import |
| ON ERROR CONTINUE | Skips failed batch, continues remaining batches | Resilient bulk load — track errors via REPORT STATUS |
| ON ERROR BREAK | Stops after first failed batch; keeps completed work | Semi-strict: stop early, keep successful batches |
| ON ERROR RETRY | Exponential backoff retry (default 30s) + fallback | Concurrent writes with deadlock risk |
CONCURRENT TRANSACTIONS (parallel batches)
```cypher
CYPHER 25
LOAD CSV WITH HEADERS FROM 'file:///large.csv' AS row
CALL (row) {
  MERGE (p:Person {id: row.id}) SET p.name = row.name
} IN 4 CONCURRENT TRANSACTIONS OF 5000 ROWS
  ON ERROR RETRY FOR 30 SECONDS THEN CONTINUE
  REPORT STATUS AS s
```

Use CONCURRENT for read-heavy MERGE on non-overlapping key spaces. Risk: deadlocks on overlapping writes → combine with ON ERROR RETRY.
REPORT STATUS columns
| Column | Type | Meaning |
|---|---|---|
| started | BOOLEAN | Batch transaction started |
| committed | BOOLEAN | Batch committed successfully |
| transactionId | STRING | Transaction ID |
| errorMessage | STRING or null | Error detail if batch failed |
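A sketch that summarizes batch outcomes after an ON ERROR CONTINUE run, grouping the status maps by commit result:

```cypher
CYPHER 25
LOAD CSV WITH HEADERS FROM 'file:///persons.csv' AS row
CALL (row) {
  MERGE (p:Person {id: row.id})
} IN TRANSACTIONS OF 10000 ROWS
  ON ERROR CONTINUE
  REPORT STATUS AS s
// one output row per input row; distinct transactionId counts batches
RETURN s.committed AS committed,
       count(DISTINCT s.transactionId) AS batches,
       collect(DISTINCT s.errorMessage)[..3] AS sample_errors
```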
Batch size guidance
| Row count | Recommended batch size | Notes |
|---|---|---|
| < 100k | 10 000 | Default is fine |
| 100k – 1M | 10 000 – 50 000 | Monitor heap; increase if fast |
| 1M – 10M | 50 000 – 100 000 | Enable CONCURRENT if CPUs available |
| > 10M online | 50 000 | Consider neo4j-admin import instead |
| Relationship import | 5 000 | Lower — each batch does 2x MATCH |
neo4j-admin import (Offline Bulk Load)
Fastest method: ~3 min for 31M nodes / 78M rels on SSD. DB must be stopped or non-existent.
Command structure
```bash
neo4j-admin database import full \
  --nodes=Person="persons_header.csv,persons.csv" \
  --nodes=Movie="movies_header.csv,movies.csv" \
  --relationships=ACTED_IN="acted_in_header.csv,acted_in.csv" \
  --relationships=DIRECTED="directed_header.csv,directed.csv" \
  --delimiter=, \
  --id-type=STRING \
  --bad-tolerance=0 \
  --threads=$(nproc) \
  --high-parallel-io=on \
  neo4j
```

For SSDs: always set --high-parallel-io=on. For large graphs (>34B nodes/rels): --format=block.

Dry run (2026.02+) — validate without writing:

```bash
neo4j-admin database import full --dry-run ...
```

Node header file format
persons_header.csv:

```
personId:ID,name,born:int,score:float,active:boolean,:LABEL
```

persons.csv (data file — no header row):

```
p001,Alice,1985,9.2,true,Person
p002,Bob,1990,7.1,false,Person
```

| Field | Meaning |
|---|---|
| `:ID` | Unique ID for relationship wiring (not stored as property by default) |
| `:ID(Group)` | Scoped ID space — use when node types share IDs |
| `:LABEL` | One or more labels; semicolon-separated: `Person;Employee` |
| `prop:int` | Typed property; types: `int long float double boolean byte short string` |
| `prop:date` | Temporal: `date localtime time localdatetime datetime duration` |
| `prop:int[]` | Array — semicolon-separated values in cell: `1;2;3` |
| `prop:vector` | Float vector (2025.10+) — semicolon-separated coordinates |
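For example, a header combining a group-scoped ID, an array property, and a label column might look like this (hypothetical file):

```
personId:ID(Person),name,skills:string[],:LABEL
```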
Relationship header file format
acted_in_header.csv:

```
:START_ID(Person),:END_ID(Movie),role,:TYPE
```

acted_in.csv:

```
p001,tt0133093,Neo,ACTED_IN
p002,tt0133093,Morpheus,ACTED_IN
```

`:START_ID` / `:END_ID` must reference the same `:ID` group as the node files.

Key flags
| Flag | Default | Notes |
|---|---|---|
| `--delimiter` | `,` | Single char or `\t` |
| `--id-type` | `STRING` | |
| `--bad-tolerance` | 1000 | Set `0` for strict production imports |
| `--threads` | CPU count | Set explicitly on shared hosts |
| `--max-off-heap-memory` | 90% RAM | Reduce if other services share host |
| `--high-parallel-io` | auto | Set `on` for SSD/NVMe storage |
| `--format` | | Use `block` for >34B nodes/rels |
| `--overwrite-destination` | false | Required if DB already exists |
| `--dry-run` | false | 2026.02+ — validate without writing |
Schema file (--schema) [Enterprise, block format]
Pass a Cypher file with CREATE CONSTRAINT / CREATE INDEX statements; it is executed automatically after import completes. Constraints are created first (correct order enforced). File paths can be local or remote (s3://, gs://, https://).

```bash
neo4j-admin database import full \
  --format=block \
  --schema=schema.cypher \
  --nodes=Person="persons_header.csv,persons.csv" \
  neo4j
```

```cypher
// schema.cypher
CREATE CONSTRAINT person_id IF NOT EXISTS FOR (n:Person) REQUIRE n.id IS UNIQUE;
CREATE CONSTRAINT movie_id IF NOT EXISTS FOR (n:Movie) REQUIRE n.id IS UNIQUE;
CREATE RANGE INDEX person_email IF NOT EXISTS FOR (n:Person) ON (n.email);
CREATE TEXT INDEX movie_title IF NOT EXISTS FOR (n:Movie) ON (n.title);
```

For incremental import, DROP CONSTRAINT / DROP INDEX are also supported [2025.02+] — used to remove indexes before the merge phase and recreate them after for faster writes.

Incremental import (Enterprise only)
Three-phase process — use when DB must stay online during import preparation:

```bash
# Phase 1: Prepare staging area
neo4j-admin database import incremental --stage=prepare \
  --nodes=Person=persons_header.csv,delta.csv --force neo4j

# Phase 2: Build indexes (DB can be read-only during this phase)
neo4j-admin database import incremental --stage=build neo4j

# Phase 3: Merge into live database (brief write-lock)
neo4j-admin database import incremental --stage=merge neo4j
```

Requires Enterprise Edition + `block` store format.

---

APOC Patterns (when APOC is available)
Verify first: RETURN apoc.version() — if it fails, use LOAD CSV or the driver instead.

apoc.periodic.iterate — batch-process existing graph data
```cypher
CALL apoc.periodic.iterate(
  "MATCH (p:Person) WHERE NOT (p)-[:HAS_ACCOUNT]->() RETURN p",
  "CREATE (p)-[:HAS_ACCOUNT]->(a:Account {id: randomUUID()})",
  {batchSize: 10000, parallel: false, retries: 2}
) YIELD batches, total, errorMessages
RETURN batches, total, errorMessages
```

| Config key | Default | Notes |
|---|---|---|
| batchSize | 10000 | Rows per inner transaction |
| parallel | false | Enable for non-overlapping writes; risk: deadlocks |
| retries | 0 | Retry failed batches N times with 100ms delay |

Prefer CALL IN TRANSACTIONS (native Cypher) over apoc.periodic.iterate for new code — it has REPORT STATUS, CONCURRENT, and RETRY built in without APOC dependency.

apoc.load.csv — load with config options
```cypher
CALL apoc.load.csv('file:///persons.csv', {
  header: true,
  sep: ',',
  skip: 1,
  limit: 1000000
}) YIELD lineNo, map, list
CALL (map) {
  MERGE (p:Person {id: map.id}) SET p.name = map.name
} IN TRANSACTIONS OF 10000 ROWS ON ERROR CONTINUE
```

apoc.load.json — load JSON from file or URL
```cypher
CALL apoc.load.json('https://api.example.com/persons') YIELD value
CALL (value) {
  MERGE (p:Person {id: value.id}) SET p.name = value.name
} IN TRANSACTIONS OF 1000 ROWS ON ERROR CONTINUE
```

Driver Batch Write Pattern
Use when the source is not a file (API responses, DB migrations). Collect rows into BATCH_SIZE (10 000) lists, call UNWIND $rows AS row MERGE ... per batch. ~10x faster than row-at-a-time; see the sketch below. → Python + JS examples
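A sketch of the per-batch query the driver would send, with $rows bound to one batch of maps (field names are illustrative):

```cypher
// run once per batch; $rows is a list of up to BATCH_SIZE maps
// collected by the driver, e.g. [{id: 'p001', name: 'Alice'}, ...]
UNWIND $rows AS row
MERGE (p:Person {id: row.id})
ON CREATE SET p.name = row.name
```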
MCP Tool Usage
| Operation | MCP tool | Notes |
|---|---|---|
| Inspect schema | | Always inspect before import |
| Create constraints | | Gate: show planned constraint, confirm |
| LOAD CSV / CALL IN TRANSACTIONS | | Gate: show row count + Cypher, confirm |
| Verify counts | | Post-import: counts per label vs source |
| Poll index state | | Poll until all indexes ONLINE |
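For the verify-counts step, typical read-only queries (a sketch):

```cypher
// node totals per label combination, largest first
MATCH (n) RETURN labels(n) AS labels, count(n) AS nodes ORDER BY nodes DESC;
// relationship totals per type
MATCH ()-[r]->() RETURN type(r) AS relType, count(r) AS rels ORDER BY rels DESC;
```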
Write gate — before any bulk write via MCP, show:
- Query + affected labels
- Estimated row count from source
- EXPLAIN plan

Wait for user confirmation. Never auto-execute CALL IN TRANSACTIONS or CREATE CONSTRAINT without confirmation.

Always pass the database param if not the default: {"code": "...", "database": "neo4j"}.
Common Errors
| Error | Cause | Fix |
|---|---|---|
| Couldn't load the external resource | File outside the import directory | Move file to the import/ directory |
| Cannot merge using null property value | MERGE key resolved to null | Validate the key column is non-null before MERGE |
| Type error from toInteger()/toFloat() | Null column fed to non-null-safe fn | Replace with toIntegerOrNull()/toFloatOrNull() |
| Constraint validation failed | Duplicate source IDs | Dedup source CSV; use MERGE |
| Heap overflow / OutOfMemoryError | Batch too large or file too large | Reduce batch size; switch to neo4j-admin import |
| Syntax error on USING PERIODIC COMMIT | Removed in favor of CALL IN TRANSACTIONS | Replace with CALL IN TRANSACTIONS |
| neo4j-admin: header/type errors | Wrong header format or type mismatch | Check header files against the field table above |
| neo4j-admin: import fails silently | Bad entries skipped under default tolerance (pre-2025.12) | Set --bad-tolerance=0 |
| Index not used during MERGE | Constraint not created before import | Drop data, create constraint, re-import |
| Relationship import missing nodes | Relationships imported before nodes | Always import ALL node files before ANY relationship files |
Post-Import Validation
After import completes — run all:

```cypher
// Row counts per label
MATCH (n:Person) RETURN count(n) AS persons;
MATCH ()-[:KNOWS]->() RETURN count(*) AS knows_rels;

// After import: create additional non-unique indexes (populated async)
CREATE TEXT INDEX movie_title IF NOT EXISTS FOR (n:Movie) ON (n.title);
CREATE RANGE INDEX person_born IF NOT EXISTS FOR (n:Person) ON (n.born);

// Poll population — wait until populationPercent = 100 before opening to queries
SHOW INDEXES YIELD name, state, populationPercent
WHERE state <> 'ONLINE' OR populationPercent < 100
RETURN name, state, populationPercent
ORDER BY populationPercent;

// Spot check: null keys = import bug
MATCH (p:Person) WHERE p.id IS NULL RETURN count(p) AS missing_id;
```

Do NOT run production queries until all indexes are ONLINE.
References
- LOAD CSV — Cypher Manual 25
- CALL IN TRANSACTIONS — Cypher Manual
- neo4j-admin database import
- APOC periodic execution
- APOC load procedures
- GraphAcademy: Importing CSV Data
- Indexes and constraints — types, MERGE lock semantics, import pre-flight
- Data Importer GUI — when to use, Aura access, multi-pass, gotchas
- Post-import refactoring — split lists, extract nodes, add labels, FK validation
Checklist
- Uniqueness constraints created before any MERGE-based import
- APOC availability verified if using apoc.* procedures
- Target confirmed as PRIMARY (not replica)
- Source files validated: UTF-8 encoding, expected row count, no BOM
- LOAD CSV uses toIntegerOrNull() / toFloatOrNull() — never bare toInteger() / toFloat()
- nullIf(row.x, '') applied where empty string ≠ null
- CALL IN TRANSACTIONS used (not USING PERIODIC COMMIT)
- ON ERROR CONTINUE + REPORT STATUS for production loads
- Node import completed before relationship import
- neo4j-admin: --bad-tolerance=0 set; --high-parallel-io=on for SSD
- Post-import: row counts match source; all indexes ONLINE
- Write execution gate applied (MCP): showed query + estimate, got confirmation
- Credentials in .env; .env in .gitignore