database-migrator

Database Migrator

A production-grade database migration system that moves schemas, data, and logic between database providers. This skill reads the full source database schema, analyzes every table, column, index, constraint, trigger, stored procedure, and view, then generates executable migration scripts for the target provider. It validates the migration with row counts and data checksums, and produces a comprehensive migration-plan.md with step-by-step instructions, rollback procedures, and estimated downtime.
This is not a schema designer. It is a cross-provider migration engine that handles the hard parts: data type incompatibilities, provider-specific SQL dialects, foreign key ordering, sequence/auto-increment translation, trigger rewrites, and stored procedure conversion.

Supported Migration Paths

| Source Provider | Target Provider | Complexity |
|---|---|---|
| PostgreSQL | MySQL | Medium |
| PostgreSQL | Supabase (Postgres) | Low |
| PostgreSQL | PlanetScale (MySQL) | Medium-High |
| PostgreSQL | MongoDB | High |
| MySQL | PostgreSQL | Medium |
| MySQL | Supabase (Postgres) | Medium |
| MySQL | PlanetScale (MySQL) | Low |
| MySQL | MongoDB | High |
| Supabase (Postgres) | PostgreSQL | Low |
| Supabase (Postgres) | MySQL | Medium |
| Supabase (Postgres) | PlanetScale (MySQL) | Medium-High |
| Supabase (Postgres) | MongoDB | High |
| PlanetScale (MySQL) | PostgreSQL | Medium |
| PlanetScale (MySQL) | Supabase (Postgres) | Medium |
| PlanetScale (MySQL) | MySQL | Low |
| PlanetScale (MySQL) | MongoDB | High |
| MongoDB | PostgreSQL | High |
| MongoDB | MySQL | High |
| MongoDB | Supabase (Postgres) | High |
| MongoDB | PlanetScale (MySQL) | High |

Complexity Notes

  • Low: Same underlying engine (e.g., Postgres to Supabase). Mostly connection string and permission changes.
  • Medium: Same paradigm, different dialect (e.g., Postgres to MySQL). Requires data type mapping and SQL dialect translation.
  • Medium-High: Different dialect plus provider constraints (e.g., PlanetScale has no foreign keys at the database level).
  • High: Paradigm shift (e.g., relational to document or vice versa). Requires schema redesign, not just translation.

When to Use

  • You are moving a production database from one provider to another
  • You are migrating from a self-hosted database to a managed service (or vice versa)
  • You need to replicate a schema across providers for multi-cloud or disaster recovery
  • You are consolidating multiple databases into a single provider
  • You need to move from a relational database to MongoDB (or vice versa) with a complete data model translation
  • You need a validated, auditable migration plan before executing anything in production

When NOT to Use

  • You are designing a new database schema from scratch (use database-schema-designer instead)
  • You are migrating application code between frameworks (use full-codebase-migrator instead)
  • You only need to change a few columns or add a table to an existing database (just write the ALTER statements directly)
  • You need real-time replication or CDC (change data capture) -- this skill generates point-in-time migration scripts, not streaming pipelines

Architecture

```
You (Commander)
 |
 |-- Phase 1: Source Schema Discovery
 |    |-- Connect to source (connection string or dump file)
 |    |-- Extract full schema: tables, columns, types, defaults, constraints
 |    |-- Extract indexes, foreign keys, unique constraints, check constraints
 |    |-- Extract triggers, stored procedures, functions, views
 |    |-- Extract sequences, enums, custom types
 |    |-- Extract row counts per table
 |    |-- Extract sample data for type inference (MongoDB)
 |
 |-- Phase 2: Schema Analysis and Mapping
 |    |-- Map source data types to target data types
 |    |-- Identify incompatible features (provider-specific)
 |    |-- Resolve foreign key creation order (topological sort)
 |    |-- Plan trigger and stored procedure conversion
 |    |-- Identify data that needs transformation
 |    |-- Flag potential data loss or precision changes
 |
 |-- Phase 3: Migration Script Generation
 |    |-- Generate DDL scripts (CREATE TABLE, INDEX, etc.)
 |    |-- Generate DML scripts (INSERT, data transformation)
 |    |-- Generate trigger and stored procedure translations
 |    |-- Generate validation queries (row counts, checksums)
 |    |-- Generate rollback scripts (DROP, reverse migration)
 |
 |-- Phase 4: Validation Plan
 |    |-- Row count comparison queries
 |    |-- Data checksum queries (MD5/SHA256 of key columns)
 |    |-- Foreign key integrity checks
 |    |-- Index existence verification
 |    |-- Trigger and procedure existence verification
 |    |-- Sample data spot-checks
 |
 |-- Phase 5: migration-plan.md Generation
 |    |-- Executive summary
 |    |-- Step-by-step execution guide
 |    |-- Rollback procedures
 |    |-- Estimated downtime
 |    |-- Risk assessment
 |    |-- Pre-migration checklist
 |    |-- Post-migration verification
```
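Phase 2's foreign-key ordering step can be sketched in a few lines. This is an illustrative example, not part of the skill itself: given each table's set of referenced tables, a topological sort yields a safe `CREATE TABLE` order (every referenced table is created first). The table names below are hypothetical.

```python
# Sketch of foreign-key-aware table ordering using a topological sort.
# graphlib is in the Python standard library (3.9+).
from graphlib import TopologicalSorter

def table_creation_order(foreign_keys):
    """foreign_keys: dict mapping table -> set of tables it references.
    Returns tables in an order where dependencies come first.
    Raises graphlib.CycleError on circular references, which must then be
    handled by deferring the FK to a post-load ALTER TABLE."""
    return list(TopologicalSorter(foreign_keys).static_order())

# Hypothetical schema: order_items references orders and products, etc.
fks = {
    "order_items": {"orders", "products"},
    "orders": {"customers"},
    "products": set(),
    "customers": set(),
}
order = table_creation_order(fks)
print(order)  # customers/products appear before orders before order_items
```

For cyclic references (table A references B, B references A), the standard workaround is to create both tables without the constraint and add one FK afterward with `ALTER TABLE ... ADD CONSTRAINT`.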

Execution Protocol

Follow these steps precisely when this skill is invoked.

Step 0: Gather Migration Parameters

Before doing any analysis, establish these parameters with the user:
  1. Source provider and version -- Which database engine and version? (e.g., PostgreSQL 15, MySQL 8.0, MongoDB 7.0)
  2. Target provider and version -- Where is the data going? (e.g., Supabase, PlanetScale, self-hosted Postgres 16)
  3. Connection method -- Will you connect live, or work from a schema dump file / migration files?
  4. Schema scope -- All schemas/databases, or specific ones? Which tables to include/exclude?
  5. Data migration -- Schema only, or schema + data? If data, full or partial (e.g., last 90 days)?
  6. Downtime tolerance -- Zero-downtime required, or is a maintenance window acceptable? How long?
  7. Data volume -- Approximate total size (GB) and largest table row count.
  8. Application dependencies -- What applications connect to this database? Do they need code changes?
  9. Output location -- Where to write migration scripts and migration-plan.md. Default to current directory.
If the user has already specified these in their prompt, skip the questions and proceed.
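The nine parameters above can be captured as a single config object that later phases consume. This is a hypothetical sketch; all field names are assumptions, not part of the skill's defined interface.

```python
# Illustrative container for the Step 0 answers.
from dataclasses import dataclass, field

@dataclass
class MigrationParams:
    source: str                     # e.g. "PostgreSQL 15"
    target: str                     # e.g. "Supabase"
    connection_method: str          # "live" or "dump"
    include_tables: list = field(default_factory=list)  # empty = all tables
    migrate_data: bool = True       # False = schema only
    downtime_window_minutes: int = 0  # 0 = zero-downtime required
    data_volume_gb: float = 0.0
    output_dir: str = "."           # where scripts and migration-plan.md go

params = MigrationParams(source="PostgreSQL 15", target="Supabase",
                         connection_method="dump")
print(params.output_dir)
```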

Step 1: Source Schema Discovery

This is the foundation of the entire migration. Extract everything from the source database.

1a. Schema Extraction -- Relational Databases (Postgres, MySQL, Supabase, PlanetScale)

Extract the following using information_schema queries or provider-specific catalog queries.
Tables and Columns:
For PostgreSQL / Supabase:
```sql
SELECT
    t.table_schema,
    t.table_name,
    c.column_name,
    c.ordinal_position,
    c.data_type,
    c.udt_name,
    c.character_maximum_length,
    c.numeric_precision,
    c.numeric_scale,
    c.is_nullable,
    c.column_default,
    c.is_identity,
    c.identity_generation,
    pgd.description AS column_comment
FROM information_schema.tables t
JOIN information_schema.columns c
    ON t.table_schema = c.table_schema AND t.table_name = c.table_name
LEFT JOIN pg_catalog.pg_statio_all_tables psat
    ON psat.schemaname = t.table_schema AND psat.relname = t.table_name
LEFT JOIN pg_catalog.pg_description pgd
    ON pgd.objoid = psat.relid AND pgd.objsubid = c.ordinal_position
WHERE t.table_schema NOT IN ('pg_catalog', 'information_schema')
    AND t.table_type = 'BASE TABLE'
ORDER BY t.table_schema, t.table_name, c.ordinal_position;
```
For MySQL / PlanetScale:
```sql
SELECT
    t.TABLE_SCHEMA,
    t.TABLE_NAME,
    c.COLUMN_NAME,
    c.ORDINAL_POSITION,
    c.DATA_TYPE,
    c.COLUMN_TYPE,
    c.CHARACTER_MAXIMUM_LENGTH,
    c.NUMERIC_PRECISION,
    c.NUMERIC_SCALE,
    c.IS_NULLABLE,
    c.COLUMN_DEFAULT,
    c.EXTRA,
    c.COLUMN_COMMENT
FROM information_schema.TABLES t
JOIN information_schema.COLUMNS c
    ON t.TABLE_SCHEMA = c.TABLE_SCHEMA AND t.TABLE_NAME = c.TABLE_NAME
WHERE t.TABLE_SCHEMA = DATABASE()
    AND t.TABLE_TYPE = 'BASE TABLE'
ORDER BY t.TABLE_SCHEMA, t.TABLE_NAME, c.ORDINAL_POSITION;
```
Primary Keys:
For PostgreSQL / Supabase:
```sql
SELECT
    tc.table_schema,
    tc.table_name,
    tc.constraint_name,
    kcu.column_name,
    kcu.ordinal_position
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
    ON tc.constraint_name = kcu.constraint_name
    AND tc.table_schema = kcu.table_schema
WHERE tc.constraint_type = 'PRIMARY KEY'
    AND tc.table_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY tc.table_schema, tc.table_name, kcu.ordinal_position;
```
For MySQL / PlanetScale:
```sql
SELECT
    tc.TABLE_SCHEMA,
    tc.TABLE_NAME,
    tc.CONSTRAINT_NAME,
    kcu.COLUMN_NAME,
    kcu.ORDINAL_POSITION
FROM information_schema.TABLE_CONSTRAINTS tc
JOIN information_schema.KEY_COLUMN_USAGE kcu
    ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
    AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA
    AND tc.TABLE_NAME = kcu.TABLE_NAME
WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
    AND tc.TABLE_SCHEMA = DATABASE()
ORDER BY tc.TABLE_SCHEMA, tc.TABLE_NAME, kcu.ORDINAL_POSITION;
```
Foreign Keys:
For PostgreSQL / Supabase:
```sql
SELECT
    tc.table_schema,
    tc.table_name,
    tc.constraint_name,
    kcu.column_name,
    ccu.table_schema AS foreign_table_schema,
    ccu.table_name AS foreign_table_name,
    ccu.column_name AS foreign_column_name,
    rc.update_rule,
    rc.delete_rule
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
    ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema
JOIN information_schema.constraint_column_usage ccu
    ON ccu.constraint_name = tc.constraint_name AND ccu.table_schema = tc.table_schema
JOIN information_schema.referential_constraints rc
    ON tc.constraint_name = rc.constraint_name AND tc.table_schema = rc.constraint_schema
WHERE tc.constraint_type = 'FOREIGN KEY'
    AND tc.table_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY tc.table_schema, tc.table_name;
```
For MySQL / PlanetScale:
```sql
SELECT
    tc.TABLE_SCHEMA,
    tc.TABLE_NAME,
    tc.CONSTRAINT_NAME,
    kcu.COLUMN_NAME,
    kcu.REFERENCED_TABLE_SCHEMA,
    kcu.REFERENCED_TABLE_NAME,
    kcu.REFERENCED_COLUMN_NAME,
    rc.UPDATE_RULE,
    rc.DELETE_RULE
FROM information_schema.TABLE_CONSTRAINTS tc
JOIN information_schema.KEY_COLUMN_USAGE kcu
    ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
    AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA
    AND tc.TABLE_NAME = kcu.TABLE_NAME
JOIN information_schema.REFERENTIAL_CONSTRAINTS rc
    ON tc.CONSTRAINT_NAME = rc.CONSTRAINT_NAME
    AND tc.TABLE_SCHEMA = rc.CONSTRAINT_SCHEMA
WHERE tc.CONSTRAINT_TYPE = 'FOREIGN KEY'
    AND tc.TABLE_SCHEMA = DATABASE()
ORDER BY tc.TABLE_SCHEMA, tc.TABLE_NAME;
```
Indexes:
For PostgreSQL / Supabase:
```sql
SELECT
    schemaname,
    tablename,
    indexname,
    indexdef
FROM pg_indexes
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY schemaname, tablename, indexname;
```
For MySQL / PlanetScale:
```sql
SELECT
    TABLE_SCHEMA,
    TABLE_NAME,
    INDEX_NAME,
    NON_UNIQUE,
    SEQ_IN_INDEX,
    COLUMN_NAME,
    INDEX_TYPE,
    SUB_PART,
    EXPRESSION
FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = DATABASE()
ORDER BY TABLE_SCHEMA, TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX;
```
Check Constraints:
For PostgreSQL / Supabase:
```sql
SELECT
    tc.table_schema,
    tc.table_name,
    tc.constraint_name,
    cc.check_clause
FROM information_schema.table_constraints tc
JOIN information_schema.check_constraints cc
    ON tc.constraint_name = cc.constraint_name AND tc.constraint_schema = cc.constraint_schema
WHERE tc.constraint_type = 'CHECK'
    AND tc.table_schema NOT IN ('pg_catalog', 'information_schema')
    AND cc.check_clause NOT LIKE '%IS NOT NULL%'
ORDER BY tc.table_schema, tc.table_name;
```
For MySQL 8.0+:
```sql
SELECT
    tc.TABLE_SCHEMA,
    tc.TABLE_NAME,
    tc.CONSTRAINT_NAME,
    cc.CHECK_CLAUSE
FROM information_schema.TABLE_CONSTRAINTS tc
JOIN information_schema.CHECK_CONSTRAINTS cc
    ON tc.CONSTRAINT_NAME = cc.CONSTRAINT_NAME AND tc.CONSTRAINT_SCHEMA = cc.CONSTRAINT_SCHEMA
WHERE tc.CONSTRAINT_TYPE = 'CHECK'
    AND tc.TABLE_SCHEMA = DATABASE()
ORDER BY tc.TABLE_SCHEMA, tc.TABLE_NAME;
```
Triggers:
For PostgreSQL / Supabase:
```sql
SELECT
    trigger_schema,
    trigger_name,
    event_manipulation,
    event_object_schema,
    event_object_table,
    action_statement,
    action_timing,
    action_orientation
FROM information_schema.triggers
WHERE trigger_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY trigger_schema, event_object_table, trigger_name;
```
For MySQL / PlanetScale:
```sql
SELECT
    TRIGGER_SCHEMA,
    TRIGGER_NAME,
    EVENT_MANIPULATION,
    EVENT_OBJECT_SCHEMA,
    EVENT_OBJECT_TABLE,
    ACTION_STATEMENT,
    ACTION_TIMING,
    ACTION_ORIENTATION
FROM information_schema.TRIGGERS
WHERE TRIGGER_SCHEMA = DATABASE()
ORDER BY TRIGGER_SCHEMA, EVENT_OBJECT_TABLE, TRIGGER_NAME;
```
Stored Procedures and Functions:
For PostgreSQL / Supabase:
```sql
SELECT
    n.nspname AS schema_name,
    p.proname AS function_name,
    pg_get_function_arguments(p.oid) AS arguments,
    pg_get_function_result(p.oid) AS return_type,
    CASE p.prokind
        WHEN 'f' THEN 'FUNCTION'
        WHEN 'p' THEN 'PROCEDURE'
        WHEN 'a' THEN 'AGGREGATE'
        WHEN 'w' THEN 'WINDOW'
    END AS kind,
    l.lanname AS language,
    pg_get_functiondef(p.oid) AS definition
FROM pg_proc p
JOIN pg_namespace n ON p.pronamespace = n.oid
JOIN pg_language l ON p.prolang = l.oid
WHERE n.nspname NOT IN ('pg_catalog', 'information_schema')
ORDER BY n.nspname, p.proname;
```
For MySQL / PlanetScale:
```sql
SELECT
    ROUTINE_SCHEMA,
    ROUTINE_NAME,
    ROUTINE_TYPE,
    DATA_TYPE,
    ROUTINE_DEFINITION,
    EXTERNAL_LANGUAGE
FROM information_schema.ROUTINES
WHERE ROUTINE_SCHEMA = DATABASE()
ORDER BY ROUTINE_SCHEMA, ROUTINE_NAME;
```
Views:
For PostgreSQL / Supabase:
```sql
SELECT
    table_schema,
    table_name AS view_name,
    view_definition
FROM information_schema.views
WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY table_schema, table_name;
```
For MySQL / PlanetScale:
```sql
SELECT
    TABLE_SCHEMA,
    TABLE_NAME AS VIEW_NAME,
    VIEW_DEFINITION
FROM information_schema.VIEWS
WHERE TABLE_SCHEMA = DATABASE()
ORDER BY TABLE_SCHEMA, TABLE_NAME;
```
Sequences (PostgreSQL / Supabase only):
```sql
SELECT
    schemaname,
    sequencename,
    data_type,
    start_value,
    min_value,
    max_value,
    increment_by,
    cycle,
    last_value
FROM pg_sequences
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY schemaname, sequencename;
```
Enums (PostgreSQL / Supabase only):
```sql
SELECT
    n.nspname AS schema_name,
    t.typname AS enum_name,
    string_agg(e.enumlabel, ', ' ORDER BY e.enumsortorder) AS enum_values
FROM pg_type t
JOIN pg_enum e ON t.oid = e.enumtypid
JOIN pg_namespace n ON t.typnamespace = n.oid
WHERE n.nspname NOT IN ('pg_catalog', 'information_schema')
GROUP BY n.nspname, t.typname
ORDER BY n.nspname, t.typname;
```
Row Counts:
For PostgreSQL / Supabase (fast estimate):
```sql
SELECT
    schemaname,
    relname AS table_name,
    n_live_tup AS estimated_row_count
FROM pg_stat_user_tables
ORDER BY n_live_tup DESC;
```
For MySQL / PlanetScale:
```sql
SELECT
    TABLE_SCHEMA,
    TABLE_NAME,
    TABLE_ROWS AS estimated_row_count,
    DATA_LENGTH,
    INDEX_LENGTH
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = DATABASE()
    AND TABLE_TYPE = 'BASE TABLE'
ORDER BY TABLE_ROWS DESC;
```
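A thin driver can run each of these catalog queries and collect the results into a single schema inventory. The sketch below is an illustration, not part of the skill: it accepts any DB-API 2.0 cursor (psycopg2, mysql-connector, etc.) and a dict of the queries shown above; the `queries` contents are supplied by the caller.

```python
# Hypothetical extraction driver: run each catalog query and gather rows
# as dicts keyed by column name, grouped under a section label.
def extract_inventory(cursor, queries):
    """queries: dict of section name -> catalog SQL string."""
    inventory = {}
    for section, sql in queries.items():
        cursor.execute(sql)
        cols = [d[0] for d in cursor.description]  # column names from the cursor
        inventory[section] = [dict(zip(cols, row)) for row in cursor.fetchall()]
    return inventory
```

The same inventory dict then feeds Phase 2 (type mapping) and Phase 4 (validation), so each query only runs once against the source.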

1b. Schema Extraction -- MongoDB

MongoDB has no enforced schema, so discovery requires collection scanning:
```javascript
// List all collections
db.getCollectionNames().forEach(function(collName) {
    print("--- Collection: " + collName + " ---");

    // Row count
    print("Document count: " + db[collName].countDocuments({}));

    // Sample documents for schema inference
    var sample = db[collName].aggregate([{ $sample: { size: 100 } }]).toArray();

    // Infer schema from sample
    var schema = {};
    sample.forEach(function(doc) {
        function inferType(obj, prefix) {
            for (var key in obj) {
                var fullKey = prefix ? prefix + "." + key : key;
                var val = obj[key];
                var type = typeof val;
                if (val === null) type = "null";
                else if (Array.isArray(val)) type = "array";
                else if (val instanceof ObjectId) type = "ObjectId";
                else if (val instanceof Date) type = "Date";
                else if (val instanceof NumberDecimal) type = "Decimal128";
                else if (type === "object") {
                    inferType(val, fullKey);
                    type = "object";
                }
                if (!schema[fullKey]) schema[fullKey] = {};
                schema[fullKey][type] = (schema[fullKey][type] || 0) + 1;
            }
        }
        inferType(doc, "");
    });

    printjson(schema);

    // Indexes
    printjson(db[collName].getIndexes());
});
```
Also extract:
  • Validators: `db.getCollectionInfos()` for JSON Schema validators
  • Capped collections: size and max document limits
  • Sharding config: `sh.status()` if sharded
  • Aggregation pipelines saved as views: `db.system.views.find()`
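The schema histogram produced by the inference script (field path → type counts) then has to be turned into relational column proposals. The sketch below is a hypothetical illustration of that step; the type mapping choices are assumptions, not a fixed rule of this skill.

```python
# Sketch: map an inferred MongoDB field-type histogram to a SQL column type.
# Mapping choices are illustrative; mixed-type fields fall back to JSONB.
MONGO_TO_SQL = {
    "string": "TEXT", "number": "DOUBLE PRECISION", "boolean": "BOOLEAN",
    "Date": "TIMESTAMP", "ObjectId": "CHAR(24)", "Decimal128": "NUMERIC",
    "array": "JSONB", "object": "JSONB",
}

def propose_column(type_counts):
    """type_counts: e.g. {"string": 98, "null": 2} from the inference script."""
    observed = {t for t in type_counts if t != "null"}
    nullable = "null" in type_counts
    if len(observed) == 1:
        sql_type = MONGO_TO_SQL.get(observed.pop(), "JSONB")
    else:
        sql_type = "JSONB"  # mixed types: keep the raw document shape
    return sql_type + (" NULL" if nullable else "")

print(propose_column({"string": 98, "null": 2}))  # TEXT NULL
```

Fields with mixed types across the sample are exactly the ones to flag for manual review in the migration plan, since any single-column mapping loses information.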

1c. Supabase-Specific Extraction

When the source or target is Supabase, also extract:
```sql
-- Row Level Security policies
SELECT
    schemaname,
    tablename,
    policyname,
    permissive,
    roles,
    cmd,
    qual,
    with_check
FROM pg_policies
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY schemaname, tablename, policyname;

-- RLS enabled tables
SELECT
    schemaname,
    tablename,
    rowsecurity
FROM pg_tables
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY schemaname, tablename;

-- Extensions
SELECT extname, extversion FROM pg_extension ORDER BY extname;

-- Publication/subscription (for realtime)
SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;
```

1d. PlanetScale-Specific Considerations

PlanetScale does not support:
  • Foreign key constraints at the database level (enforced at application level)
  • Stored procedures
  • Triggers
  • Events
When PlanetScale is the target, flag all of these for application-level handling. When PlanetScale is the source, note that foreign key relationships must be inferred from naming conventions and application code.
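Inferring foreign keys from naming conventions can be sketched as follows. This is a heuristic illustration only, with hypothetical table names; every inferred relationship must still be confirmed against application code before it becomes a constraint in the target.

```python
# Sketch: infer likely foreign keys from the common "<referenced>_id"
# column naming convention when the source (e.g. PlanetScale) has no
# database-level FK metadata to extract.
def infer_foreign_keys(tables):
    """tables: dict of table name -> list of column names.
    Returns (table, column, referenced_table) triples."""
    inferred = []
    for table, columns in tables.items():
        for col in columns:
            if not col.endswith("_id") or col == "id":
                continue
            base = col[:-3]                       # "customer_id" -> "customer"
            for candidate in (base, base + "s"):  # try singular, then plural
                if candidate in tables and candidate != table:
                    inferred.append((table, col, candidate))
                    break
    return inferred

print(infer_foreign_keys({
    "orders": ["id", "customer_id"],
    "customers": ["id", "name"],
}))  # [('orders', 'customer_id', 'customers')]
```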
PlanetScale不支持:
  • 数据库级外键约束(需在应用层实现)
  • 存储过程
  • 触发器
  • 事件
当PlanetScale作为目标时,需标记所有上述特性,说明需在应用层处理。当PlanetScale作为源时,需说明外键关系必须通过命名约定和应用代码推断。
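When PlanetScale is the source, the naming-convention inference described above can be semi-automated. A minimal sketch (the function name, input shape, and pluralization rules are illustrative assumptions; every guess still needs manual review against the application code):

```python
import re

def infer_foreign_keys(tables):
    """Guess foreign key relationships from `<name>_id` naming conventions
    when the source database (e.g. PlanetScale) stores no FK metadata.
    `tables` maps table name -> list of column names. Returns a list of
    (child_table, column, parent_table) guesses for manual review."""
    guesses = []
    for table, columns in tables.items():
        for col in columns:
            m = re.fullmatch(r"(\w+)_id", col)
            if not m:
                continue
            base = m.group(1)
            # try the bare name plus simple plural forms of the parent table
            for candidate in (base, base + "s", base + "es"):
                if candidate in tables and candidate != table:
                    guesses.append((table, col, candidate))
                    break
    return guesses
```

For example, `infer_foreign_keys({"users": ["id", "email"], "orders": ["id", "user_id"]})` yields the single guess `("orders", "user_id", "users")`.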

Step 2: Data Type Mapping

步骤2:数据类型映射

This is the core translation layer. Map every source type to the best target type.
这是核心转换层。需将每个源类型映射到最合适的目标类型。

2a. PostgreSQL to MySQL Type Map

2a. PostgreSQL到MySQL类型映射

| PostgreSQL Type | MySQL Type | Notes |
|---|---|---|
| SMALLINT | SMALLINT | Direct |
| INTEGER | INT | Direct |
| BIGINT | BIGINT | Direct |
| SERIAL | INT AUTO_INCREMENT | Remove DEFAULT nextval() |
| BIGSERIAL | BIGINT AUTO_INCREMENT | Remove DEFAULT nextval() |
| NUMERIC(p,s) | DECIMAL(p,s) | Direct |
| REAL | FLOAT | Direct |
| DOUBLE PRECISION | DOUBLE | Direct |
| MONEY | DECIMAL(19,4) | Loses currency formatting |
| BOOLEAN | TINYINT(1) | TRUE/FALSE to 1/0 |
| CHAR(n) | CHAR(n) | Direct |
| VARCHAR(n) | VARCHAR(n) | Direct |
| TEXT | LONGTEXT | MySQL TEXT is 65KB; LONGTEXT is 4GB |
| BYTEA | LONGBLOB | Binary data |
| DATE | DATE | Direct |
| TIME | TIME | Direct |
| TIMESTAMP | DATETIME(6) | MySQL TIMESTAMP has 2038 limit |
| TIMESTAMP WITH TIME ZONE | DATETIME(6) | Store timezone separately or use UTC |
| INTERVAL | VARCHAR(255) | No native interval in MySQL |
| UUID | CHAR(36) or BINARY(16) | CHAR(36) for readability, BINARY(16) for performance |
| JSON | JSON | Direct (MySQL 5.7+) |
| JSONB | JSON | Loses binary optimization; add generated columns for indexed paths |
| ARRAY | JSON | No native arrays in MySQL |
| HSTORE | JSON | Key-value to JSON object |
| INET | VARCHAR(45) | IPv4 and IPv6 |
| CIDR | VARCHAR(45) | Network address |
| MACADDR | VARCHAR(17) | MAC address string |
| POINT | POINT | Spatial type (requires spatial index changes) |
| GEOMETRY | GEOMETRY | Spatial type |
| TSVECTOR | FULLTEXT INDEX | Use FULLTEXT index on relevant columns |
| ENUM('a','b') | ENUM('a','b') | Direct (but MySQL ENUM has different behavior) |
| INT4RANGE | VARCHAR(255) | No native range types in MySQL |
| BIT(n) | BIT(n) | Direct |
| XML | LONGTEXT | No native XML in MySQL |
| PostgreSQL类型 | MySQL类型 | 说明 |
|---|---|---|
| SMALLINT | SMALLINT | 直接映射 |
| INTEGER | INT | 直接映射 |
| BIGINT | BIGINT | 直接映射 |
| SERIAL | INT AUTO_INCREMENT | 移除DEFAULT nextval() |
| BIGSERIAL | BIGINT AUTO_INCREMENT | 移除DEFAULT nextval() |
| NUMERIC(p,s) | DECIMAL(p,s) | 直接映射 |
| REAL | FLOAT | 直接映射 |
| DOUBLE PRECISION | DOUBLE | 直接映射 |
| MONEY | DECIMAL(19,4) | 丢失货币格式 |
| BOOLEAN | TINYINT(1) | TRUE/FALSE转换为1/0 |
| CHAR(n) | CHAR(n) | 直接映射 |
| VARCHAR(n) | VARCHAR(n) | 直接映射 |
| TEXT | LONGTEXT | MySQL TEXT最大65KB;LONGTEXT最大4GB |
| BYTEA | LONGBLOB | 二进制数据 |
| DATE | DATE | 直接映射 |
| TIME | TIME | 直接映射 |
| TIMESTAMP | DATETIME(6) | MySQL TIMESTAMP存在2038年限制 |
| TIMESTAMP WITH TIME ZONE | DATETIME(6) | 单独存储时区或使用UTC |
| INTERVAL | VARCHAR(255) | MySQL无原生间隔类型 |
| UUID | CHAR(36)或BINARY(16) | CHAR(36)可读性好,BINARY(16)性能高 |
| JSON | JSON | 直接映射(MySQL 5.7+) |
| JSONB | JSON | 丢失二进制优化;为索引路径添加生成列 |
| ARRAY | JSON | MySQL无原生数组类型 |
| HSTORE | JSON | 键值对转换为JSON对象 |
| INET | VARCHAR(45) | IPv4和IPv6 |
| CIDR | VARCHAR(45) | 网络地址 |
| MACADDR | VARCHAR(17) | MAC地址字符串 |
| POINT | POINT | 空间类型(需修改空间索引) |
| GEOMETRY | GEOMETRY | 空间类型 |
| TSVECTOR | FULLTEXT INDEX | 在相关列上使用全文索引 |
| ENUM('a','b') | ENUM('a','b') | 直接映射(但MySQL ENUM行为不同) |
| INT4RANGE | VARCHAR(255) | MySQL无原生范围类型 |
| BIT(n) | BIT(n) | 直接映射 |
| XML | LONGTEXT | MySQL无原生XML类型 |
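该映射表可以直接编码为按基础类型查找的转换函数,并透传 (p,s)/(n) 参数。A mapper for the table above can be encoded as a lookup keyed on the base type, carrying `(p,s)`/`(n)` arguments through. A minimal sketch (only a subset of the map is shown; the dict and function names are illustrative, not part of the skill's required interface):

```python
import re

# Subset of the PostgreSQL-to-MySQL map from section 2a.
# Parameterized types such as NUMERIC(p,s) keep their arguments.
PG_TO_MYSQL = {
    "SMALLINT": "SMALLINT",
    "INTEGER": "INT",
    "BIGINT": "BIGINT",
    "SERIAL": "INT AUTO_INCREMENT",
    "BIGSERIAL": "BIGINT AUTO_INCREMENT",
    "NUMERIC": "DECIMAL",
    "REAL": "FLOAT",
    "DOUBLE PRECISION": "DOUBLE",
    "BOOLEAN": "TINYINT(1)",
    "TEXT": "LONGTEXT",
    "BYTEA": "LONGBLOB",
    "TIMESTAMP": "DATETIME(6)",
    "UUID": "CHAR(36)",
    "JSONB": "JSON",
}

def map_pg_type(pg_type):
    """Map a PostgreSQL column type to its MySQL equivalent,
    preserving any (p,s) / (n) arguments from the source."""
    m = re.fullmatch(r"([A-Z ]+?)\s*(\(.+\))?", pg_type.strip().upper())
    base, args = m.group(1).strip(), m.group(2) or ""
    mapped = PG_TO_MYSQL.get(base, base)  # unmapped types pass through unchanged
    # don't append args when the target type already fixes them (e.g. TINYINT(1))
    return mapped if "(" in mapped else mapped + args
```

For example, `map_pg_type("NUMERIC(10,2)")` returns `"DECIMAL(10,2)"`, and `VARCHAR(255)` passes through unchanged, matching the table's direct mappings.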

2b. MySQL to PostgreSQL Type Map

2b. MySQL到PostgreSQL类型映射

| MySQL Type | PostgreSQL Type | Notes |
|---|---|---|
| TINYINT | SMALLINT | Direct |
| TINYINT(1) | BOOLEAN | If used as boolean |
| SMALLINT | SMALLINT | Direct |
| MEDIUMINT | INTEGER | No MEDIUMINT in Postgres |
| INT | INTEGER | Direct |
| BIGINT | BIGINT | Direct |
| INT AUTO_INCREMENT | SERIAL or GENERATED ALWAYS AS IDENTITY | Prefer IDENTITY for new schemas |
| FLOAT | REAL | Direct |
| DOUBLE | DOUBLE PRECISION | Direct |
| DECIMAL(p,s) | NUMERIC(p,s) | Direct |
| BIT(n) | BIT(n) | Direct |
| CHAR(n) | CHAR(n) | Direct |
| VARCHAR(n) | VARCHAR(n) | Direct |
| TINYTEXT | TEXT | Postgres TEXT has no size limit |
| TEXT | TEXT | Direct |
| MEDIUMTEXT | TEXT | Direct |
| LONGTEXT | TEXT | Direct |
| TINYBLOB | BYTEA | Direct |
| BLOB | BYTEA | Direct |
| MEDIUMBLOB | BYTEA | Direct |
| LONGBLOB | BYTEA | Direct |
| DATE | DATE | Direct |
| TIME | TIME | Direct |
| DATETIME | TIMESTAMP | Direct |
| TIMESTAMP | TIMESTAMP WITH TIME ZONE | MySQL TIMESTAMP is UTC-converted |
| YEAR | SMALLINT | No YEAR type in Postgres |
| ENUM('a','b') | VARCHAR + CHECK or CREATE TYPE | Prefer CREATE TYPE for Postgres enums |
| SET('a','b') | TEXT[] or VARCHAR + CHECK | Use array type |
| JSON | JSONB | Prefer JSONB for indexing |
| GEOMETRY | GEOMETRY (PostGIS) | Requires PostGIS extension |
| POINT | POINT | Native or PostGIS |
| BINARY(n) | BYTEA | Direct |
| VARBINARY(n) | BYTEA | Direct |
| MySQL类型 | PostgreSQL类型 | 说明 |
|---|---|---|
| TINYINT | SMALLINT | 直接映射 |
| TINYINT(1) | BOOLEAN | 若用作布尔值 |
| SMALLINT | SMALLINT | 直接映射 |
| MEDIUMINT | INTEGER | PostgreSQL无MEDIUMINT类型 |
| INT | INTEGER | 直接映射 |
| BIGINT | BIGINT | 直接映射 |
| INT AUTO_INCREMENT | SERIAL或GENERATED ALWAYS AS IDENTITY | 新Schema推荐使用IDENTITY |
| FLOAT | REAL | 直接映射 |
| DOUBLE | DOUBLE PRECISION | 直接映射 |
| DECIMAL(p,s) | NUMERIC(p,s) | 直接映射 |
| BIT(n) | BIT(n) | 直接映射 |
| CHAR(n) | CHAR(n) | 直接映射 |
| VARCHAR(n) | VARCHAR(n) | 直接映射 |
| TINYTEXT | TEXT | PostgreSQL TEXT无大小限制 |
| TEXT | TEXT | 直接映射 |
| MEDIUMTEXT | TEXT | 直接映射 |
| LONGTEXT | TEXT | 直接映射 |
| TINYBLOB | BYTEA | 直接映射 |
| BLOB | BYTEA | 直接映射 |
| MEDIUMBLOB | BYTEA | 直接映射 |
| LONGBLOB | BYTEA | 直接映射 |
| DATE | DATE | 直接映射 |
| TIME | TIME | 直接映射 |
| DATETIME | TIMESTAMP | 直接映射 |
| TIMESTAMP | TIMESTAMP WITH TIME ZONE | MySQL TIMESTAMP会转换为UTC |
| YEAR | SMALLINT | PostgreSQL无YEAR类型 |
| ENUM('a','b') | VARCHAR + CHECK或CREATE TYPE | 推荐使用CREATE TYPE创建PostgreSQL枚举 |
| SET('a','b') | TEXT[]或VARCHAR + CHECK | 使用数组类型 |
| JSON | JSONB | 推荐使用JSONB用于索引 |
| GEOMETRY | GEOMETRY (PostGIS) | 需要PostGIS扩展 |
| POINT | POINT | 原生类型或PostGIS类型 |
| BINARY(n) | BYTEA | 直接映射 |
| VARBINARY(n) | BYTEA | 直接映射 |

2c. Relational to MongoDB Type Map

2c. 关系型到MongoDB类型映射

| SQL Type | MongoDB (BSON) Type | Notes |
|---|---|---|
| INTEGER / INT | NumberInt (int32) | Direct |
| BIGINT | NumberLong (int64) | Direct |
| SERIAL / AUTO_INCREMENT | ObjectId or NumberLong | ObjectId preferred for _id |
| NUMERIC / DECIMAL | NumberDecimal (Decimal128) | Direct |
| FLOAT / REAL | Double | Direct |
| BOOLEAN | Boolean | Direct |
| CHAR / VARCHAR / TEXT | String | Direct |
| DATE | Date | Direct |
| TIMESTAMP | Date | MongoDB Date is millisecond precision |
| BYTEA / BLOB | BinData | Direct |
| UUID | String or BinData(4) | BinData(4) is more compact |
| JSON / JSONB | Object | Native -- embed directly |
| ARRAY | Array | Native -- embed directly |
| ENUM | String + validation | Use JSON Schema validator |
| SQL类型 | MongoDB (BSON)类型 | 说明 |
|---|---|---|
| INTEGER / INT | NumberInt (int32) | 直接映射 |
| BIGINT | NumberLong (int64) | 直接映射 |
| SERIAL / AUTO_INCREMENT | ObjectId或NumberLong | 推荐使用ObjectId作为_id |
| NUMERIC / DECIMAL | NumberDecimal (Decimal128) | 直接映射 |
| FLOAT / REAL | Double | 直接映射 |
| BOOLEAN | Boolean | 直接映射 |
| CHAR / VARCHAR / TEXT | String | 直接映射 |
| DATE | Date | 直接映射 |
| TIMESTAMP | Date | MongoDB Date为毫秒精度 |
| BYTEA / BLOB | BinData | 直接映射 |
| UUID | String或BinData(4) | BinData(4)更紧凑 |
| JSON / JSONB | Object | 原生类型——直接嵌入 |
| ARRAY | Array | 原生类型——直接嵌入 |
| ENUM | String + 验证 | 使用JSON Schema验证器 |

2d. MongoDB to Relational Type Map

2d. MongoDB到关系型类型映射

| MongoDB (BSON) Type | PostgreSQL Type | MySQL Type | Notes |
|---|---|---|---|
| ObjectId | CHAR(24) or UUID | CHAR(24) or BINARY(12) | Convert to hex string or generate new UUID |
| String | TEXT or VARCHAR | VARCHAR(n) or TEXT | Inspect max lengths in sample data |
| NumberInt (int32) | INTEGER | INT | Direct |
| NumberLong (int64) | BIGINT | BIGINT | Direct |
| Double | DOUBLE PRECISION | DOUBLE | Direct |
| NumberDecimal | NUMERIC | DECIMAL | Direct |
| Boolean | BOOLEAN | TINYINT(1) | Direct |
| Date | TIMESTAMP WITH TIME ZONE | DATETIME(3) | Direct |
| BinData | BYTEA | LONGBLOB | Direct |
| Array | JSONB or junction table | JSON or junction table | Simple arrays: JSONB/JSON; relational arrays: junction table |
| Embedded Object | JSONB or separate table | JSON or separate table | Decide based on query patterns |
| Null | NULL | NULL | Nullable columns |
| Regex | TEXT | VARCHAR | Store pattern as string |
| Timestamp (BSON) | TIMESTAMP | TIMESTAMP | Internal MongoDB type -- convert to standard timestamp |
| MongoDB (BSON)类型 | PostgreSQL类型 | MySQL类型 | 说明 |
|---|---|---|---|
| ObjectId | CHAR(24)或UUID | CHAR(24)或BINARY(12) | 转换为十六进制字符串或生成新UUID |
| String | TEXT或VARCHAR | VARCHAR(n)或TEXT | 检查样本数据中的最大长度 |
| NumberInt (int32) | INTEGER | INT | 直接映射 |
| NumberLong (int64) | BIGINT | BIGINT | 直接映射 |
| Double | DOUBLE PRECISION | DOUBLE | 直接映射 |
| NumberDecimal | NUMERIC | DECIMAL | 直接映射 |
| Boolean | BOOLEAN | TINYINT(1) | 直接映射 |
| Date | TIMESTAMP WITH TIME ZONE | DATETIME(3) | 直接映射 |
| BinData | BYTEA | LONGBLOB | 直接映射 |
| Array | JSONB或关联表 | JSON或关联表 | 简单数组:JSONB/JSON;关系型数组:关联表 |
| 嵌入对象 | JSONB或单独表 | JSON或单独表 | 根据查询模式决定 |
| Null | NULL | NULL | 可空列 |
| Regex | TEXT | VARCHAR | 将模式存储为字符串 |
| Timestamp (BSON) | TIMESTAMP | TIMESTAMP | MongoDB内部类型——转换为标准时间戳 |
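上表中标量、数组和嵌入对象的处理规则可以概括为一个逐文档的扁平化函数。The scalar/array/embedded-object rules above can be sketched as a per-document flattener. A minimal sketch, assuming arrays and embedded objects land in a JSON/JSONB column (the junction-table path from the table requires schema-specific code, and the function name is illustrative):

```python
import json

def flatten_document(doc):
    """Flatten one MongoDB document into a relational row (column -> value)
    following the section 2d mapping: `_id` becomes a 24-char hex string
    column, arrays and embedded objects are serialized to JSON text for a
    JSON/JSONB column, and scalars pass through unchanged."""
    row = {}
    for key, value in doc.items():
        if key == "_id":
            row["id"] = str(value)  # an ObjectId's str() is its 24-char hex form
        elif isinstance(value, (list, dict)):
            row[key] = json.dumps(value)  # or split into a junction/child table
        else:
            row[key] = value
    return row
```

For example, a document with `tags: ["a", "b"]` produces a `tags` column holding the JSON text `["a", "b"]`, ready for a JSONB (Postgres) or JSON (MySQL) column.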

Step 3: Schema Translation and Script Generation

步骤3:Schema转换与脚本生成

With the full schema extracted and type mapping established, generate the migration scripts.
在提取完整Schema并完成类型映射后,生成迁移脚本。

3a. Table Creation Order

3a. 表创建顺序

Foreign keys create dependencies between tables. Tables must be created in topological order (dependencies first):
  1. Build a directed graph where an edge from table A to table B means A has a foreign key referencing B.
  2. Perform a topological sort on this graph.
  3. If cycles exist (mutual foreign keys), break the cycle by deferring one foreign key constraint to be added after all tables are created.
Table creation order algorithm:
1. Find all tables with zero foreign key dependencies -- these go first.
2. Remove those tables from the graph.
3. Repeat until all tables are placed.
4. If the graph is not empty after exhaustion, cycles exist.
   For each cycle: create all tables without the cyclic FK, then ALTER TABLE to add it.
外键会在表之间创建依赖关系。表必须按拓扑顺序创建(先创建被依赖的表):
  1. 构建有向图,其中从表A到表B的边表示A有外键引用B。
  2. 对该图执行拓扑排序。
  3. 如果存在循环(相互外键),通过延迟添加其中一个外键约束来打破循环,待所有表创建完成后再添加。
表创建顺序算法:
1. 找到所有没有外键依赖的表——这些表优先创建。
2. 将这些表从图中移除。
3. 重复步骤1-2,直到所有表都被处理。
4. 如果遍历后图仍不为空,说明存在循环。
   对于每个循环:先创建所有表但不添加循环外键,然后通过ALTER TABLE添加外键。
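The four-step algorithm above is a topological sort (Kahn's algorithm) with cycle handling. A minimal sketch (the function name and input shape are illustrative assumptions):

```python
from collections import defaultdict

def table_creation_order(foreign_keys, tables):
    """Return (ordered_tables, deferred_fks) per section 3a.
    `foreign_keys` is a list of (child, parent) pairs meaning `child`
    references `parent`; parents must be created first. FKs caught in a
    cycle are returned separately, to be added later via ALTER TABLE."""
    deps = defaultdict(set)        # table -> set of tables it references
    for child, parent in foreign_keys:
        if child != parent:        # self-references never block creation order
            deps[child].add(parent)
    ordered, placed = [], set()
    remaining = set(tables)
    while remaining:
        # tables whose referenced tables are all already created go next
        ready = sorted(t for t in remaining if deps[t] <= placed)
        if not ready:              # cycle detected: emit the rest, defer their FKs
            ordered.extend(sorted(remaining))
            break
        ordered.extend(ready)
        placed.update(ready)
        remaining -= set(ready)
    deferred = [(c, p) for c, p in foreign_keys if c not in placed and c != p]
    return ordered, deferred
```

For example, with `orders -> users` and `items -> orders`, the order is `users, orders, items` with nothing deferred; a mutual `a <-> b` reference yields both tables plus both FKs in the deferred list, matching step 4 of the algorithm.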

3b. DDL Script Generation

3b. DDL脚本生成

For each table, generate the target-dialect CREATE TABLE statement. The script must include:
  1. Table definition with all columns, types (mapped per Step 2), defaults, and NOT NULL constraints
  2. Primary key definition (inline or as constraint)
  3. Unique constraints
  4. Check constraints (translated to target dialect)
  5. Foreign key constraints (respecting creation order from 3a)
  6. Indexes (translated to target syntax)
  7. Comments on tables and columns (if supported by target)
Template for each table in the output:
sql
-- =============================================================================
-- Table: [schema].[table_name]
-- Source: [source_provider] [schema].[table_name]
-- Rows (estimated): [N]
-- =============================================================================

CREATE TABLE [target_schema].[table_name] (
    [column definitions with mapped types]
);

-- Primary Key
ALTER TABLE [target_schema].[table_name]
    ADD CONSTRAINT pk_[table_name] PRIMARY KEY ([columns]);

-- Unique Constraints
ALTER TABLE [target_schema].[table_name]
    ADD CONSTRAINT uk_[table_name]_[columns] UNIQUE ([columns]);

-- Check Constraints
ALTER TABLE [target_schema].[table_name]
    ADD CONSTRAINT ck_[table_name]_[name] CHECK ([translated_expression]);

-- Foreign Keys (only if target supports them)
ALTER TABLE [target_schema].[table_name]
    ADD CONSTRAINT fk_[table_name]_[column]
    FOREIGN KEY ([column]) REFERENCES [target_schema].[referenced_table]([referenced_column])
    ON UPDATE [action] ON DELETE [action];

-- Indexes
CREATE INDEX idx_[table_name]_[columns] ON [target_schema].[table_name] ([columns]);
CREATE UNIQUE INDEX uidx_[table_name]_[columns] ON [target_schema].[table_name] ([columns]);

-- Comments
COMMENT ON TABLE [target_schema].[table_name] IS '[description]';
COMMENT ON COLUMN [target_schema].[table_name].[column] IS '[description]';
为每张表生成目标方言的CREATE TABLE语句。脚本必须包含:
  1. 表定义:包含所有列、映射后的类型、默认值和NOT NULL约束
  2. 主键定义(内联或作为约束)
  3. 唯一约束
  4. 检查约束(转换为目标方言)
  5. 外键约束(遵循3a中的创建顺序)
  6. 索引(转换为目标语法)
  7. 表和列的注释(如果目标支持)
输出中每张表的模板:
sql
-- =============================================================================
-- 表:[schema].[table_name]
-- 源:[source_provider] [schema].[table_name]
-- 预估行数:[N]
-- =============================================================================

CREATE TABLE [target_schema].[table_name] (
    [带映射类型的列定义]
);

-- 主键
ALTER TABLE [target_schema].[table_name]
    ADD CONSTRAINT pk_[table_name] PRIMARY KEY ([columns]);

-- 唯一约束
ALTER TABLE [target_schema].[table_name]
    ADD CONSTRAINT uk_[table_name]_[columns] UNIQUE ([columns]);

-- 检查约束
ALTER TABLE [target_schema].[table_name]
    ADD CONSTRAINT ck_[table_name]_[name] CHECK ([转换后的表达式]);

-- 外键(仅当目标支持时)
ALTER TABLE [target_schema].[table_name]
    ADD CONSTRAINT fk_[table_name]_[column]
    FOREIGN KEY ([column]) REFERENCES [target_schema].[referenced_table]([referenced_column])
    ON UPDATE [action] ON DELETE [action];

-- 索引
CREATE INDEX idx_[table_name]_[columns] ON [target_schema].[table_name] ([columns]);
CREATE UNIQUE INDEX uidx_[table_name]_[columns] ON [target_schema].[table_name] ([columns]);

-- 注释
COMMENT ON TABLE [target_schema].[table_name] IS '[描述]';
COMMENT ON COLUMN [target_schema].[table_name].[column] IS '[描述]';

3c. Sequence and Auto-Increment Translation

3c. 序列与自增字段转换

PostgreSQL to MySQL:
  • Replace SERIAL / BIGSERIAL with AUTO_INCREMENT
  • Replace GENERATED ALWAYS AS IDENTITY with AUTO_INCREMENT
  • Remove all CREATE SEQUENCE statements
  • Remove all DEFAULT nextval('sequence_name') defaults and use AUTO_INCREMENT on the column
  • After data load, set the AUTO_INCREMENT value: ALTER TABLE t AUTO_INCREMENT = [max_id + 1];
MySQL to PostgreSQL:
  • Replace AUTO_INCREMENT with GENERATED ALWAYS AS IDENTITY (preferred) or SERIAL
  • After data load, reset the sequence: SELECT setval('table_column_seq', (SELECT MAX(column) FROM table));
Relational to MongoDB:
  • Remove auto-increment entirely; use ObjectId for _id unless the application requires numeric IDs
  • If numeric IDs are required, document a counter collection pattern:
javascript
// Counter collection for auto-increment emulation
db.counters.insertOne({ _id: "table_name", seq: 0 });

// Get next ID
function getNextSequence(name) {
    var ret = db.counters.findOneAndUpdate(
        { _id: name },
        { $inc: { seq: 1 } },
        { returnDocument: "after" }
    );
    return ret.seq;
}
PostgreSQL到MySQL:
  • 将 SERIAL / BIGSERIAL 替换为 AUTO_INCREMENT
  • 将 GENERATED ALWAYS AS IDENTITY 替换为 AUTO_INCREMENT
  • 删除所有 CREATE SEQUENCE 语句
  • 删除所有 DEFAULT nextval('sequence_name'),改为在列上使用 AUTO_INCREMENT
  • 数据加载完成后,设置 AUTO_INCREMENT 值:ALTER TABLE t AUTO_INCREMENT = [max_id + 1];
MySQL到PostgreSQL:
  • 将 AUTO_INCREMENT 替换为 GENERATED ALWAYS AS IDENTITY(推荐)或 SERIAL
  • 数据加载完成后,重置序列:SELECT setval('table_column_seq', (SELECT MAX(column) FROM table));
关系型到MongoDB:
  • 完全移除自增;使用 ObjectId 作为 _id,除非应用需要数值ID
  • 如果需要数值ID,记录计数器集合模式:
javascript
// 用于模拟自增的计数器集合
db.counters.insertOne({ _id: "table_name", seq: 0 });

// 获取下一个ID
function getNextSequence(name) {
    var ret = db.counters.findOneAndUpdate(
        { _id: name },
        { $inc: { seq: 1 } },
        { returnDocument: "after" }
    );
    return ret.seq;
}

3d. Trigger Translation

3d. 触发器转换

Triggers are the most provider-specific feature. Each translation requires careful rewriting.
PostgreSQL triggers to MySQL:
  • PostgreSQL uses trigger functions (PL/pgSQL); MySQL uses inline trigger bodies
  • Replace NEW.column / OLD.column syntax (same in both, but the function wrapper differs)
  • Remove RETURN NEW; / RETURN OLD; (not needed in MySQL)
  • Replace TG_OP with separate triggers per operation
  • Replace RAISE EXCEPTION with SIGNAL SQLSTATE
MySQL triggers to PostgreSQL:
  • Wrap trigger body in a PL/pgSQL function
  • Add RETURN NEW; or RETURN NULL; as appropriate
  • Replace SIGNAL SQLSTATE with RAISE EXCEPTION
Relational triggers to MongoDB:
  • Document that MongoDB does not have database-level triggers
  • Recommend alternatives:
    • MongoDB Change Streams (for event-driven processing)
    • Application-level middleware (Mongoose pre/post hooks)
    • Atlas Triggers (if using MongoDB Atlas)
PlanetScale target:
  • PlanetScale does not support triggers
  • Document all trigger logic that must move to the application layer
  • Generate application-level middleware code or ORM hooks as replacements
触发器是最具服务商特异性的特性。每次转换都需要仔细重写。
PostgreSQL触发器到MySQL:
  • PostgreSQL使用触发器函数(PL/pgSQL);MySQL使用内联触发器体
  • 替换 NEW.column / OLD.column 语法(两者语法相同,但函数包装不同)
  • 移除 RETURN NEW; / RETURN OLD;(MySQL不需要)
  • 将 TG_OP 替换为按操作拆分的独立触发器
  • 将 RAISE EXCEPTION 替换为 SIGNAL SQLSTATE
MySQL触发器到PostgreSQL:
  • 将触发器体包装在PL/pgSQL函数中
  • 根据情况添加 RETURN NEW; 或 RETURN NULL;
  • 将 SIGNAL SQLSTATE 替换为 RAISE EXCEPTION
关系型触发器到MongoDB:
  • 说明MongoDB没有数据库级触发器
  • 推荐替代方案:
    • MongoDB Change Streams(用于事件驱动处理)
    • 应用层中间件(Mongoose pre/post钩子)
    • Atlas Triggers(如果使用MongoDB Atlas)
PlanetScale作为目标:
  • PlanetScale不支持触发器
  • 记录所有必须迁移到应用层的触发器逻辑
  • 生成应用层中间件代码或ORM钩子作为替代

3e. Stored Procedure and Function Translation

3e. 存储过程与函数转换

PostgreSQL to MySQL:
  • Replace CREATE OR REPLACE FUNCTION with CREATE PROCEDURE or CREATE FUNCTION
  • Replace PL/pgSQL syntax with MySQL procedural SQL
  • Replace RETURNS TABLE(...) with a result set from SELECT
  • Replace $$ delimiters with the DELIMITER // ... // pattern
  • Replace RAISE NOTICE with SELECT for debug output
  • Replace RAISE EXCEPTION with SIGNAL SQLSTATE
  • Replace PERFORM with DO or SELECT ... INTO @dummy
  • Replace the RETURNING clause (not available in MySQL; use LAST_INSERT_ID())
MySQL to PostgreSQL:
  • Replace the DELIMITER pattern with $$ delimiters
  • Replace SIGNAL SQLSTATE with RAISE EXCEPTION
  • Replace LAST_INSERT_ID() with a RETURNING clause or currval()
  • Replace GROUP_CONCAT with string_agg
  • Replace IFNULL with COALESCE
  • Replace the IF() function with CASE WHEN
Relational to MongoDB:
  • Stored procedures do not exist in MongoDB
  • Translate to:
    • Aggregation pipelines (for data processing logic)
    • Application-level service functions
    • MongoDB Atlas Functions (if using Atlas)
PlanetScale target:
  • PlanetScale does not support stored procedures
  • All procedural logic must move to the application layer
PostgreSQL到MySQL:
  • 将 CREATE OR REPLACE FUNCTION 替换为 CREATE PROCEDURE 或 CREATE FUNCTION
  • 将PL/pgSQL语法替换为MySQL过程化SQL
  • 将 RETURNS TABLE(...) 替换为SELECT返回的结果集
  • 将 $$ 分隔符替换为 DELIMITER // ... // 模式
  • 将 RAISE NOTICE 替换为 SELECT 用于调试输出
  • 将 RAISE EXCEPTION 替换为 SIGNAL SQLSTATE
  • 将 PERFORM 替换为 DO 或 SELECT ... INTO @dummy
  • 替换 RETURNING 子句(MySQL不支持;使用 LAST_INSERT_ID())
MySQL到PostgreSQL:
  • 将 DELIMITER 模式替换为 $$ 分隔符
  • 将 SIGNAL SQLSTATE 替换为 RAISE EXCEPTION
  • 将 LAST_INSERT_ID() 替换为 RETURNING 子句或 currval()
  • 将 GROUP_CONCAT 替换为 string_agg
  • 将 IFNULL 替换为 COALESCE
  • 将 IF() 函数替换为 CASE WHEN
关系型到MongoDB:
  • MongoDB没有存储过程
  • 转换为:
    • 聚合管道(用于数据处理逻辑)
    • 应用层服务函数
    • MongoDB Atlas Functions(如果使用Atlas)
PlanetScale作为目标:
  • PlanetScale不支持存储过程
  • 所有过程化逻辑必须迁移到应用层

3f. View Translation

3f. 视图转换

Views are generally straightforward to translate but may contain provider-specific SQL:
  1. Extract the view definition SQL
  2. Translate any provider-specific functions (see function mapping below)
  3. Translate data types in CAST expressions
  4. Adjust JOIN syntax if needed
  5. For materialized views (PostgreSQL), note that MySQL does not support them natively -- recommend creating a table with a refresh procedure instead
视图转换通常比较简单,但可能包含服务商特定的SQL:
  1. 提取视图定义SQL
  2. 转换任何服务商特定的函数(参见下方函数映射)
  3. 转换CAST表达式中的数据类型
  4. 根据需要调整JOIN语法
  5. 对于物化视图(PostgreSQL),说明MySQL不原生支持——推荐创建表并添加刷新过程

3g. Common SQL Function Mapping

3g. 常用SQL函数映射

| PostgreSQL | MySQL | Notes |
|---|---|---|
| NOW() | NOW() | Direct |
| CURRENT_TIMESTAMP | CURRENT_TIMESTAMP | Direct |
| string_agg(col, ',') | GROUP_CONCAT(col SEPARATOR ',') | Different syntax |
| COALESCE(a, b) | COALESCE(a, b) or IFNULL(a, b) | Direct |
| CONCAT_WS(',', a, b) | CONCAT_WS(',', a, b) | Direct |
| SUBSTRING(s FROM n FOR m) | SUBSTRING(s, n, m) | Different syntax |
| EXTRACT(EPOCH FROM ts) | UNIX_TIMESTAMP(ts) | Different function |
| TO_CHAR(ts, 'YYYY-MM-DD') | DATE_FORMAT(ts, '%Y-%m-%d') | Different format codes |
| INTERVAL '1 day' | INTERVAL 1 DAY | Different syntax |
| GENERATE_SERIES(1, 10) | Recursive CTE or sequence table | No direct equivalent |
| ARRAY_AGG(col) | JSON_ARRAYAGG(col) | MySQL 5.7+ |
| UNNEST(array_col) | JSON_TABLE(...) | MySQL 8.0+ |
| ANY(array) | IN (...) or JSON_CONTAINS | Different approach |
| ILIKE | LIKE (case-insensitive collation) | Set collation or use LOWER() |
| SIMILAR TO | REGEXP | Different regex engine |
| ~ (regex match) | REGEXP | Direct equivalent |
| gen_random_uuid() | UUID() | Direct equivalent |
| RETURNING id | LAST_INSERT_ID() | Different approach |
| ON CONFLICT DO UPDATE | INSERT ... ON DUPLICATE KEY UPDATE | Different syntax |
| LIMIT n OFFSET m | LIMIT m, n or LIMIT n OFFSET m | MySQL supports both |
| BOOLEAN true/false | 1/0 | Literal translation |
| ::type (cast) | CAST(x AS type) | Postgres shorthand |
| PostgreSQL | MySQL | 说明 |
|---|---|---|
| NOW() | NOW() | 直接映射 |
| CURRENT_TIMESTAMP | CURRENT_TIMESTAMP | 直接映射 |
| string_agg(col, ',') | GROUP_CONCAT(col SEPARATOR ',') | 语法不同 |
| COALESCE(a, b) | COALESCE(a, b)或IFNULL(a, b) | 直接映射 |
| CONCAT_WS(',', a, b) | CONCAT_WS(',', a, b) | 直接映射 |
| SUBSTRING(s FROM n FOR m) | SUBSTRING(s, n, m) | 语法不同 |
| EXTRACT(EPOCH FROM ts) | UNIX_TIMESTAMP(ts) | 函数不同 |
| TO_CHAR(ts, 'YYYY-MM-DD') | DATE_FORMAT(ts, '%Y-%m-%d') | 格式代码不同 |
| INTERVAL '1 day' | INTERVAL 1 DAY | 语法不同 |
| GENERATE_SERIES(1, 10) | 递归CTE或序列表 | 无直接等价函数 |
| ARRAY_AGG(col) | JSON_ARRAYAGG(col) | MySQL 5.7+支持 |
| UNNEST(array_col) | JSON_TABLE(...) | MySQL 8.0+支持 |
| ANY(array) | IN (...)或JSON_CONTAINS | 实现方式不同 |
| ILIKE | LIKE(不区分大小写排序规则) | 设置排序规则或使用LOWER() |
| SIMILAR TO | REGEXP | 正则引擎不同 |
| ~(正则匹配) | REGEXP | 直接等价 |
| gen_random_uuid() | UUID() | 直接等价 |
| RETURNING id | LAST_INSERT_ID() | 实现方式不同 |
| ON CONFLICT DO UPDATE | INSERT ... ON DUPLICATE KEY UPDATE | 语法不同 |
| LIMIT n OFFSET m | LIMIT m, n或LIMIT n OFFSET m | MySQL两种语法都支持 |
| BOOLEAN true/false | 1/0 | 字面量转换 |
| ::type(强制转换) | CAST(x AS type) | PostgreSQL简写语法 |

Step 4: Data Migration Scripts

步骤4:数据迁移脚本

4a. Data Export from Source

4a. 从源数据库导出数据

For relational sources, generate export commands:
PostgreSQL / Supabase:
bash
对于关系型源,生成导出命令:
PostgreSQL / Supabase:
bash

Full table export to CSV

将完整表导出为CSV

pg_dump --data-only --format=plain --table=[schema].[table] --file=[table].sql [dbname]
pg_dump --data-only --format=plain --table=[schema].[table] --file=[table].sql [dbname]

Or CSV format for cross-platform compatibility

或使用CSV格式以实现跨平台兼容性

psql -c "COPY [schema].[table] TO STDOUT WITH CSV HEADER" [dbname] > [table].csv

MySQL / PlanetScale:
bash
psql -c "COPY [schema].[table] TO STDOUT WITH CSV HEADER" [dbname] > [table].csv

MySQL / PlanetScale:
bash

Full table export

导出完整表

mysqldump --no-create-info --tab=/tmp/export --fields-terminated-by=',' --fields-enclosed-by='"' --lines-terminated-by='\n' [dbname] [table]
mysqldump --no-create-info --tab=/tmp/export --fields-terminated-by=',' --fields-enclosed-by='"' --lines-terminated-by='\n' [dbname] [table]

Or SELECT INTO OUTFILE

或使用SELECT INTO OUTFILE

mysql -e "SELECT * INTO OUTFILE '/tmp/[table].csv' FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\n' FROM [table]" [dbname]

MongoDB:
bash
mysql -e "SELECT * INTO OUTFILE '/tmp/[table].csv' FIELDS TERMINATED BY ',' ENCLOSED BY '\"' LINES TERMINATED BY '\n' FROM [table]" [dbname]

MongoDB:
bash

Export collection to JSON

将集合导出为JSON

mongoexport --db=[dbname] --collection=[collection] --out=[collection].json --jsonArray
mongoexport --db=[dbname] --collection=[collection] --out=[collection].json --jsonArray

Or BSON for binary data preservation

或使用BSON以保留二进制数据

mongodump --db=[dbname] --collection=[collection] --out=./dump/
mongodump --db=[dbname] --collection=[collection] --out=./dump/

4b. Data Transformation

4b. 数据转换

Generate transformation scripts for data that needs conversion:
sql
-- Example: PostgreSQL boolean to MySQL tinyint
-- In the INSERT or LOAD DATA statement:
-- Replace TRUE with 1, FALSE with 0

-- Example: PostgreSQL array to MySQL JSON
-- Transform: '{1,2,3}' becomes '[1,2,3]'

-- Example: PostgreSQL UUID to MySQL CHAR(36)
-- No transformation needed if stored as text

-- Example: PostgreSQL TIMESTAMP WITH TIME ZONE to MySQL DATETIME
-- Convert to UTC before export:
SET timezone = 'UTC';
COPY (SELECT id, created_at AT TIME ZONE 'UTC' AS created_at FROM table) TO STDOUT WITH CSV HEADER;
为需要转换的数据生成转换脚本:
sql
-- 示例:PostgreSQL布尔值转换为MySQL tinyint
-- 在INSERT或LOAD DATA语句中:
-- 将TRUE替换为1,FALSE替换为0

-- 示例:PostgreSQL数组转换为MySQL JSON
-- 转换:'{1,2,3}'变为'[1,2,3]'

-- 示例:PostgreSQL UUID转换为MySQL CHAR(36)
-- 如果存储为文本则无需转换

-- 示例:PostgreSQL TIMESTAMP WITH TIME ZONE转换为MySQL DATETIME
-- 导出前转换为UTC:
SET timezone = 'UTC';
COPY (SELECT id, created_at AT TIME ZONE 'UTC' AS created_at FROM table) TO STDOUT WITH CSV HEADER;
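上述布尔值和数组转换可以概括为CSV管道中的小工具函数。The boolean and array transforms above can be sketched as small helpers for a CSV pipeline (a minimal sketch; the function names are illustrative, and the array helper only covers flat, unquoted elements):

```python
def pg_bool_to_tinyint(value):
    """PostgreSQL CSV booleans ('t'/'f' or 'true'/'false') to MySQL 1/0."""
    return {"t": "1", "true": "1", "f": "0", "false": "0"}[value.strip().lower()]

def pg_array_to_json(literal):
    """PostgreSQL array literal '{1,2,3}' to a MySQL JSON array '[1,2,3]'.
    Quoted or nested array elements need a real parser instead."""
    inner = literal.strip()
    assert inner.startswith("{") and inner.endswith("}"), "not an array literal"
    return "[" + inner[1:-1] + "]"
```

For example, `pg_array_to_json("{1,2,3}")` returns `"[1,2,3]"`, and `pg_bool_to_tinyint("t")` returns `"1"`, matching the transformation comments above.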

4c. Data Import to Target

4c. 导入数据到目标数据库

PostgreSQL / Supabase (target):
sql
-- Disable triggers during load
ALTER TABLE [table] DISABLE TRIGGER ALL;

-- Disable foreign key checks during load
SET session_replication_role = replica;

-- Load data
COPY [schema].[table] FROM '/path/to/[table].csv' WITH CSV HEADER;

-- Reset sequences after load
SELECT setval(pg_get_serial_sequence('[schema].[table]', '[id_column]'),
    (SELECT COALESCE(MAX([id_column]), 0) FROM [schema].[table]));

-- Re-enable triggers
ALTER TABLE [table] ENABLE TRIGGER ALL;

-- Re-enable foreign key checks
SET session_replication_role = DEFAULT;

-- Analyze tables for query planner
ANALYZE [schema].[table];
MySQL / PlanetScale (target):
sql
-- Disable foreign key checks
SET FOREIGN_KEY_CHECKS = 0;
SET UNIQUE_CHECKS = 0;
SET AUTOCOMMIT = 0;

-- Load data
LOAD DATA INFILE '/path/to/[table].csv'
INTO TABLE [table]
FIELDS TERMINATED BY ',' ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 ROWS;

-- Reset auto_increment (MySQL does not allow a subquery here;
-- look up SELECT MAX(id) + 1 FROM [table]; first, then issue the ALTER with that literal value)
ALTER TABLE [table] AUTO_INCREMENT = [max_id + 1];

-- Re-enable checks
SET FOREIGN_KEY_CHECKS = 1;
SET UNIQUE_CHECKS = 1;
COMMIT;

-- Analyze tables
ANALYZE TABLE [table];
MongoDB (target):
bash
undefined
PostgreSQL / Supabase(目标):
sql
-- 加载期间禁用触发器
ALTER TABLE [table] DISABLE TRIGGER ALL;

-- 加载期间禁用外键检查
SET session_replication_role = replica;

-- 加载数据
COPY [schema].[table] FROM '/path/to/[table].csv' WITH CSV HEADER;

-- 加载完成后重置序列
SELECT setval(pg_get_serial_sequence('[schema].[table]', '[id_column]'),
    (SELECT COALESCE(MAX([id_column]), 0) FROM [schema].[table]));

-- 重新启用触发器
ALTER TABLE [table] ENABLE TRIGGER ALL;

-- 重新启用外键检查
SET session_replication_role = DEFAULT;

-- 分析表以优化查询计划
ANALYZE [schema].[table];
MySQL / PlanetScale(目标):
sql
-- 禁用外键检查
SET FOREIGN_KEY_CHECKS = 0;
SET UNIQUE_CHECKS = 0;
SET AUTOCOMMIT = 0;

-- 加载数据
LOAD DATA INFILE '/path/to/[table].csv'
INTO TABLE [table]
FIELDS TERMINATED BY ',' ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 ROWS;

-- 重置auto_increment
ALTER TABLE [table] AUTO_INCREMENT = (SELECT MAX(id) + 1 FROM [table]);

-- 重新启用检查
SET FOREIGN_KEY_CHECKS = 1;
SET UNIQUE_CHECKS = 1;
COMMIT;

-- 分析表
ANALYZE TABLE [table];
MongoDB(目标):
bash
-- 从JSON导入
mongoimport --db=[dbname] --collection=[collection] --file=[collection].json --jsonArray

-- 或从BSON转储导入
mongorestore --db=[dbname] --collection=[collection] ./dump/[dbname]/[collection].bson

Import from JSON

步骤5:验证计划

mongoimport --db=[dbname] --collection=[collection] --file=[collection].json --jsonArray
每次迁移都必须验证。为源和目标数据库生成验证查询。

Or from BSON dump

5a. 行数验证

mongorestore --db=[dbname] --collection=[collection] ./dump/[dbname]/[collection].bson
undefined
为每张表生成查询,对比源和目标的行数:
sql
-- 源(在源数据库运行)
SELECT '[table_name]' AS table_name, COUNT(*) AS row_count FROM [source_schema].[table_name]
UNION ALL
SELECT '[table_name_2]', COUNT(*) FROM [source_schema].[table_name_2]
-- ... 为所有表重复
ORDER BY table_name;

-- 目标(在目标数据库运行)
SELECT '[table_name]' AS table_name, COUNT(*) AS row_count FROM [target_schema].[table_name]
UNION ALL
SELECT '[table_name_2]', COUNT(*) FROM [target_schema].[table_name_2]
-- ... 为所有表重复
ORDER BY table_name;
预期结果:每张表的行数完全一致。

Step 5: Validation Plan

5b. 数据校验和验证

Every migration must be validated. Generate validation queries for both source and target.
为关键表生成校验和查询。对比源和目标中关键列的哈希值:
PostgreSQL(源或目标):
sql
SELECT
    MD5(string_agg(
        COALESCE(id::text, 'NULL') || '|' ||
        COALESCE(name, 'NULL') || '|' ||
        COALESCE(email, 'NULL') || '|' ||
        COALESCE(created_at::text, 'NULL'),
        ',' ORDER BY id
    )) AS table_checksum
FROM [schema].[table];
MySQL(源或目标):
sql
SELECT
    MD5(GROUP_CONCAT(
        CONCAT_WS('|',
            COALESCE(id, 'NULL'),
            COALESCE(name, 'NULL'),
            COALESCE(email, 'NULL'),
            COALESCE(created_at, 'NULL')
        )
        ORDER BY id SEPARATOR ','
    )) AS table_checksum
FROM [table];
MongoDB(源或目标):
javascript
-- 对集合中的所有文档生成哈希值
var hash = db[collection].aggregate([
    { $sort: { _id: 1 } },
    { $group: {
        _id: null,
        docs: { $push: { $concat: [
            { $toString: "$_id" }, "|",
            { $ifNull: ["$name", "NULL"] }, "|",
            { $ifNull: ["$email", "NULL"] }
        ]}}
    }},
    { $project: {
        checksum: { $function: {
            body: function(arr) { return hex_md5(arr.join(",")); },
            args: ["$docs"],
            lang: "js"
        }}
    }}
]);
预期结果:每个验证表的源和目标校验和匹配。

5a. Row Count Validation

5c. 外键完整性验证

Generate a query for each table that compares source and target row counts:
sql
-- Source (run on source database)
SELECT '[table_name]' AS table_name, COUNT(*) AS row_count FROM [source_schema].[table_name]
UNION ALL
SELECT '[table_name_2]', COUNT(*) FROM [source_schema].[table_name_2]
-- ... repeat for all tables
ORDER BY table_name;

-- Target (run on target database)
SELECT '[table_name]' AS table_name, COUNT(*) AS row_count FROM [target_schema].[table_name]
UNION ALL
SELECT '[table_name_2]', COUNT(*) FROM [target_schema].[table_name_2]
-- ... repeat for all tables
ORDER BY table_name;
Expected result: every table has identical row counts.
对于目标中的每个外键,验证引用完整性:
sql
-- 验证是否存在孤立外键
SELECT COUNT(*) AS orphaned_rows
FROM [child_table] c
LEFT JOIN [parent_table] p ON c.[fk_column] = p.[pk_column]
WHERE c.[fk_column] IS NOT NULL AND p.[pk_column] IS NULL;
预期结果:每个外键关系的孤立行数为零。

5b. Data Checksum Validation

5d. 索引验证

Generate checksum queries for critical tables. Compare a hash of key columns between source and target:
PostgreSQL (source or target):
sql
SELECT
    MD5(string_agg(
        COALESCE(id::text, 'NULL') || '|' ||
        COALESCE(name, 'NULL') || '|' ||
        COALESCE(email, 'NULL') || '|' ||
        COALESCE(created_at::text, 'NULL'),
        ',' ORDER BY id
    )) AS table_checksum
FROM [schema].[table];
MySQL (source or target):
sql
SELECT
    MD5(GROUP_CONCAT(
        CONCAT_WS('|',
            COALESCE(id, 'NULL'),
            COALESCE(name, 'NULL'),
            COALESCE(email, 'NULL'),
            COALESCE(created_at, 'NULL')
        )
        ORDER BY id SEPARATOR ','
    )) AS table_checksum
FROM [table];
MongoDB (source or target):
javascript
// Hash all documents in a collection
var hash = db[collection].aggregate([
    { $sort: { _id: 1 } },
    { $group: {
        _id: null,
        docs: { $push: { $concat: [
            { $toString: "$_id" }, "|",
            { $ifNull: ["$name", "NULL"] }, "|",
            { $ifNull: ["$email", "NULL"] }
        ]}}
    }},
    { $project: {
        checksum: { $function: {
            body: function(arr) { return hex_md5(arr.join(",")); },
            args: ["$docs"],
            lang: "js"
        }}
    }}
]);
Expected result: checksums match between source and target for every validated table.
验证所有索引已创建:
PostgreSQL / Supabase:
sql
SELECT indexname, indexdef
FROM pg_indexes
WHERE schemaname = '[target_schema]'
ORDER BY tablename, indexname;
MySQL / PlanetScale:
sql
SELECT TABLE_NAME, INDEX_NAME, COLUMN_NAME, NON_UNIQUE
FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = DATABASE()
ORDER BY TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX;
将索引数量和名称与迁移脚本中的预期索引列表对比。

5c. Foreign Key Integrity Validation

5e. 触发器与存储过程验证

For every foreign key in the target, verify referential integrity:
sql
-- Verify no orphaned foreign keys
SELECT COUNT(*) AS orphaned_rows
FROM [child_table] c
LEFT JOIN [parent_table] p ON c.[fk_column] = p.[pk_column]
WHERE c.[fk_column] IS NOT NULL AND p.[pk_column] IS NULL;
Expected result: zero orphaned rows for every foreign key relationship.
验证所有触发器和存储过程已创建:
sql
-- PostgreSQL:验证触发器
SELECT trigger_name, event_object_table, action_timing, event_manipulation
FROM information_schema.triggers
WHERE trigger_schema = '[target_schema]';

-- PostgreSQL:验证函数/存储过程
SELECT routine_name, routine_type
FROM information_schema.routines
WHERE routine_schema = '[target_schema]';

-- MySQL:验证触发器
SELECT TRIGGER_NAME, EVENT_OBJECT_TABLE, ACTION_TIMING, EVENT_MANIPULATION
FROM information_schema.TRIGGERS
WHERE TRIGGER_SCHEMA = DATABASE();

-- MySQL:验证存储过程
SELECT ROUTINE_NAME, ROUTINE_TYPE
FROM information_schema.ROUTINES
WHERE ROUTINE_SCHEMA = DATABASE();

5d. Index Verification

Verify all indexes were created:
PostgreSQL / Supabase:

```sql
SELECT indexname, indexdef
FROM pg_indexes
WHERE schemaname = '[target_schema]'
ORDER BY tablename, indexname;
```

MySQL / PlanetScale:

```sql
SELECT TABLE_NAME, INDEX_NAME, COLUMN_NAME, NON_UNIQUE
FROM information_schema.STATISTICS
WHERE TABLE_SCHEMA = DATABASE()
ORDER BY TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX;
```
Compare the count and names against the expected index list from the migration scripts.

5e. Trigger and Procedure Verification

Verify all triggers and procedures were created:
```sql
-- PostgreSQL: verify triggers
SELECT trigger_name, event_object_table, action_timing, event_manipulation
FROM information_schema.triggers
WHERE trigger_schema = '[target_schema]';

-- PostgreSQL: verify functions/procedures
SELECT routine_name, routine_type
FROM information_schema.routines
WHERE routine_schema = '[target_schema]';

-- MySQL: verify triggers
SELECT TRIGGER_NAME, EVENT_OBJECT_TABLE, ACTION_TIMING, EVENT_MANIPULATION
FROM information_schema.TRIGGERS
WHERE TRIGGER_SCHEMA = DATABASE();

-- MySQL: verify procedures
SELECT ROUTINE_NAME, ROUTINE_TYPE
FROM information_schema.ROUTINES
WHERE ROUTINE_SCHEMA = DATABASE();
```

5f. Sample Data Spot-Check

For the 5 largest tables, generate queries that compare specific rows:
```sql
-- Pick 10 random IDs from source, then verify those exact rows exist in target with matching data
-- Source:
SELECT * FROM [table] WHERE id IN ([random_id_1], [random_id_2], ..., [random_id_10]) ORDER BY id;

-- Target:
SELECT * FROM [table] WHERE id IN ([random_id_1], [random_id_2], ..., [random_id_10]) ORDER BY id;
```

Step 6: Rollback Plan

Every migration must have a tested rollback plan. Generate rollback scripts for each phase.

6a. Schema Rollback

Generate DROP statements in reverse topological order:
```sql
-- =============================================================================
-- ROLLBACK SCRIPT: Drop all migrated objects
-- Execute in this exact order (reverse dependency order)
-- =============================================================================

-- Drop views first (they depend on tables)
DROP VIEW IF EXISTS [target_schema].[view_name] CASCADE;

-- Drop triggers
DROP TRIGGER IF EXISTS [trigger_name] ON [target_schema].[table_name];

-- Drop functions and procedures
DROP FUNCTION IF EXISTS [target_schema].[function_name]([arg_types]);
DROP PROCEDURE IF EXISTS [target_schema].[procedure_name]([arg_types]);

-- Drop tables in reverse topological order (children before parents)
DROP TABLE IF EXISTS [target_schema].[child_table] CASCADE;
DROP TABLE IF EXISTS [target_schema].[parent_table] CASCADE;

-- Drop sequences (PostgreSQL)
DROP SEQUENCE IF EXISTS [target_schema].[sequence_name];

-- Drop custom types (PostgreSQL)
DROP TYPE IF EXISTS [target_schema].[type_name];

-- Drop schema if it was created for the migration
DROP SCHEMA IF EXISTS [target_schema];
```

6b. Data Rollback

If the migration replaces an existing database (not a fresh target):
  1. Pre-migration backup: Generate a backup command to run BEFORE the migration starts
  2. Restore from backup: Document the restore procedure

Pre-migration backup (PostgreSQL)

```bash
pg_dump --format=custom --file=pre_migration_backup_$(date +%Y%m%d_%H%M%S).dump [dbname]
```

Restore from backup (PostgreSQL)

```bash
pg_restore --clean --if-exists --dbname=[dbname] pre_migration_backup_[timestamp].dump
```

Pre-migration backup (MySQL)

```bash
mysqldump --single-transaction --routines --triggers --events [dbname] > pre_migration_backup_$(date +%Y%m%d_%H%M%S).sql
```

Restore from backup (MySQL)

```bash
mysql [dbname] < pre_migration_backup_[timestamp].sql
```

Pre-migration backup (MongoDB)

```bash
mongodump --db=[dbname] --out=pre_migration_backup_$(date +%Y%m%d_%H%M%S)/
```

Restore from backup (MongoDB)

```bash
mongorestore --db=[dbname] --drop pre_migration_backup_[timestamp]/[dbname]/
```

6c. Application Rollback

Document application changes needed if the migration is rolled back:
  • Connection string changes to revert
  • ORM/query changes to revert
  • Environment variable changes to revert
  • DNS / connection pooler changes to revert

Step 7: Downtime Estimation

Calculate estimated downtime based on data volume and migration method.

7a. Downtime Factors

| Factor | Impact on Downtime |
| --- | --- |
| Schema DDL execution | Seconds to low minutes (negligible for most schemas) |
| Data export from source | Dependent on data volume and disk I/O |
| Data transfer (network) | Dependent on data volume and network bandwidth |
| Data import to target | Dependent on data volume, indexes, and constraints |
| Index creation | Can be significant for large tables (minutes to hours) |
| Validation queries | Minutes for row counts; longer for checksums on large tables |
| Application switchover | Seconds (connection string change) to minutes (DNS propagation) |

7b. Estimation Formula

```text
Estimated downtime =
    Schema DDL:            ~1 minute per 100 tables
  + Data export:           ~1 minute per GB (SSD) or ~3 minutes per GB (HDD)
  + Data transfer:         data_size_gb / (network_bandwidth_mbps / 8 / 1024)
  + Data import:           ~2 minutes per GB (without indexes) or ~5 minutes per GB (with indexes)
  + Index creation:        ~1 minute per index per million rows
  + Constraint validation: ~30 seconds per foreign key per million rows
  + Validation:            ~2 minutes per 10 tables
  + Application switch:    ~5 minutes (conservative)
  + Buffer (20%):          total * 0.2
```
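Under the assumption that these rules of thumb hold, the formula can be sketched as a small estimator. The constants below are the heuristic rates above, not measurements; note that the bandwidth term in the formula yields seconds, so it is converted to minutes here.

```python
# Rough downtime estimator implementing the rule-of-thumb formula above.
# All constants are the heuristic rates from the formula, not benchmarks.

def estimate_downtime_minutes(
    tables: int,
    data_gb: float,
    network_mbps: float,
    indexes: int,
    index_rows_millions: float,   # average rows (millions) per indexed table
    foreign_keys: int,
    fk_rows_millions: float,      # average rows (millions) per FK-checked table
    ssd: bool = True,
    indexes_present_during_import: bool = False,
) -> float:
    schema_ddl = tables / 100                       # ~1 min per 100 tables
    export = data_gb * (1 if ssd else 3)            # ~1 min/GB SSD, ~3 min/GB HDD
    gb_per_second = network_mbps / 8 / 1024
    transfer = data_gb / gb_per_second / 60         # seconds -> minutes
    data_import = data_gb * (5 if indexes_present_during_import else 2)
    index_creation = indexes * index_rows_millions  # ~1 min per index per M rows
    fk_validation = foreign_keys * fk_rows_millions * 0.5  # ~30 s per FK per M rows
    validation = tables / 10 * 2                    # ~2 min per 10 tables
    switchover = 5                                  # conservative
    subtotal = (schema_ddl + export + transfer + data_import
                + index_creation + fk_validation + validation + switchover)
    return subtotal * 1.2                           # 20% buffer
```

For example, 100 tables and 10 GB over a 1 Gbps link comes out just under an hour and a half with the buffer applied.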

7c. Downtime Reduction Strategies

Document these options in the migration plan:
  1. Pre-create schema: Create all tables, indexes, and constraints before the maintenance window. Only data load happens during downtime.
  2. Create indexes after load: Load data without indexes, then create indexes. Faster overall but indexes are built from scratch.
  3. Parallel table loads: Load independent tables simultaneously (tables with no FK dependencies between them).
  4. Disable constraints during load: Turn off FK checks, unique checks during bulk load. Re-enable after.
  5. Use native replication for zero-downtime: For same-engine migrations (e.g., Postgres to Supabase), use logical replication to sync in real-time, then cut over.
  6. Dual-write period: Application writes to both old and new database during transition. Complex but eliminates downtime.
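As an illustration of strategy 4, the standard per-provider toggles can be wrapped around a bulk load. This is a sketch with session-scoped statements; on PostgreSQL, `SET session_replication_role` requires superuser or an equivalent grant.

```python
# Sketch: wrap bulk-load statements with the provider's constraint/trigger
# toggles (strategy 4). PostgreSQL's replica mode skips FK checks because FKs
# are enforced by internal triggers; MySQL exposes explicit session flags.

BULK_LOAD_WRAPPERS = {
    "postgresql": (
        ["SET session_replication_role = replica;"],
        ["SET session_replication_role = DEFAULT;"],
    ),
    "mysql": (
        ["SET FOREIGN_KEY_CHECKS = 0;", "SET UNIQUE_CHECKS = 0;"],
        ["SET UNIQUE_CHECKS = 1;", "SET FOREIGN_KEY_CHECKS = 1;"],
    ),
}

def wrap_bulk_load(provider: str, load_statements: list[str]) -> list[str]:
    """Return the load statements bracketed by disable/re-enable toggles."""
    pre, post = BULK_LOAD_WRAPPERS[provider]
    return [*pre, *load_statements, *post]
```

Remember that disabling checks means the load itself can introduce orphans, which is why the validation phase afterward is not optional.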

Step 8: Generate migration-plan.md

Write the final deliverable document. It is structured as follows:

Database Migration Plan: [Source Provider] to [Target Provider]

Generated: [timestamp]
Source: [provider] [version] at [host/connection]
Target: [provider] [version] at [host/connection]
Total tables: [N]
Total estimated rows: [N]
Total estimated data size: [N GB]
Estimated downtime: [N hours N minutes]
Migration strategy: [Offline / Online with dual-write / Replication-based]

Table of Contents

Executive Summary

[2-3 paragraphs: what is being migrated, why, key risks, data volume, estimated downtime, recommended strategy, and critical incompatibilities that require attention.]

Migration Scope

  • Source: [provider, version, host, schema/database name]
  • Target: [provider, version, host, schema/database name]
  • Tables included: [N] (list or "all")
  • Tables excluded: [list, if any, with reasons]
  • Data scope: [Full / Partial (e.g., last 90 days)]
  • Includes triggers: [Yes/No -- count]
  • Includes stored procedures: [Yes/No -- count]
  • Includes views: [Yes/No -- count]

Schema Inventory

Table Summary

| # | Table Name | Columns | Rows (est.) | Size (est.) | Foreign Keys | Indexes | Triggers | Notes |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 1 | users | 12 | 50,000 | 15 MB | 0 | 3 | 1 | -- |
| 2 | orders | 18 | 1,200,000 | 890 MB | 3 | 7 | 2 | Largest table |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |

Enums and Custom Types

[List all enums, custom types, and their translation strategy]

Sequences

[List all sequences and their translation strategy]

Data Type Mapping

[Table showing every column's source type, target type, and any transformation needed]
| Table | Column | Source Type | Target Type | Transformation | Risk |
| --- | --- | --- | --- | --- | --- |
| users | id | UUID | CHAR(36) | None | Low |
| users | metadata | JSONB | JSON | Loses GIN index; add generated columns | Medium |
| orders | total | NUMERIC(12,4) | DECIMAL(12,4) | None | Low |
| ... | ... | ... | ... | ... | ... |

Incompatibilities and Workarounds

[List every feature that does not translate directly, with the recommended workaround]
| Feature | Source Behavior | Target Limitation | Workaround |
| --- | --- | --- | --- |
| JSONB GIN indexes | Native indexed JSON queries | JSON type without GIN | Add generated columns + B-tree indexes |
| Array columns | Native ARRAY type | No native arrays | Store as JSON array |
| ... | ... | ... | ... |

Migration Scripts

[Include or reference the generated DDL scripts, organized by execution order]

Table Creation Order

  1. [table with no FK dependencies]
  2. [table with no FK dependencies]
  3. [table depending on #1]
  4. ...
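The ordering can be derived mechanically from the FK graph. A minimal sketch (a Kahn-style topological sort; table names are illustrative) that also surfaces the cyclic tables whose constraints must instead be added via deferred ALTER TABLE statements:

```python
# Sketch: compute a safe table creation order from FK dependencies.
# `deps` maps each table to the set of parent tables it references.
# Tables left over at the end participate in an FK cycle and must get
# their constraints via deferred ALTER TABLE statements instead.

def creation_order(deps: dict[str, set[str]]) -> tuple[list[str], set[str]]:
    remaining = {table: set(parents) for table, parents in deps.items()}
    order: list[str] = []
    while True:
        # Tables with no unresolved parents can be created now.
        ready = sorted(t for t, parents in remaining.items() if not parents)
        if not ready:
            break
        for table in ready:
            order.append(table)
            del remaining[table]
        # Created tables no longer block their children.
        for parents in remaining.values():
            parents.difference_update(ready)
    return order, set(remaining)  # (creation order, cyclic tables)
```

For example, `creation_order({"users": set(), "orders": {"users"}})` puts `users` before `orders`, while a mutual reference between two tables lands both in the cyclic set.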

DDL Scripts

[Full CREATE TABLE, INDEX, CONSTRAINT statements]

Deferred Constraints

[ALTER TABLE statements for cyclic foreign keys, to run after all tables exist]

Data Migration Procedure

[Step-by-step data export, transform, load instructions]

Trigger and Procedure Translation

[For each trigger and procedure: original source code, translated target code, behavioral differences]

Validation Plan

Row Count Checks

[Queries to compare row counts between source and target]

Data Checksum Checks

[Queries to compare checksums of critical tables]

Referential Integrity Checks

[Queries to verify foreign key integrity in the target]

Index Verification

[Queries to verify all indexes exist in the target]

Sample Data Spot-Checks

[Specific rows to compare between source and target]

Rollback Procedures

Full Rollback

[Complete rollback script to reverse the migration]

Partial Rollback (per-table)

[Rollback scripts for individual tables if a single table fails]

Application Rollback

[Steps to revert application connection strings and code changes]

Backup and Restore

[Backup commands to run before migration; restore commands for recovery]

Downtime Estimate

| Phase | Estimated Duration | Can Run Before Maintenance Window |
| --- | --- | --- |
| Pre-create schema | [N min] | Yes |
| Export data from source | [N min] | Yes (if acceptable staleness) |
| Transfer data | [N min] | Depends on strategy |
| Import data to target | [N min] | No (requires write lock) |
| Create indexes | [N min] | After import |
| Run validation | [N min] | After import |
| Application switchover | [N min] | Final step |
| Total maintenance window | [N hours N min] | -- |
| Total with buffer (20%) | [N hours N min] | -- |

Risk Assessment

| Risk | Likelihood | Impact | Mitigation |
| --- | --- | --- | --- |
| Data type precision loss | [L/M/H] | [L/M/H] | [Mitigation] |
| Trigger behavior difference | [L/M/H] | [L/M/H] | [Mitigation] |
| Application query incompatibility | [L/M/H] | [L/M/H] | [Mitigation] |
| Downtime exceeds estimate | [L/M/H] | [L/M/H] | [Mitigation] |
| Rollback needed after partial migration | [L/M/H] | [L/M/H] | [Mitigation] |
| Connection pooler incompatibility | [L/M/H] | [L/M/H] | [Mitigation] |

Pre-Migration Checklist

  • Source database credentials verified and tested
  • Target database provisioned and accessible
  • Target database version confirmed compatible
  • Network connectivity between source and target verified
  • Sufficient disk space on export machine (2x data size recommended)
  • Sufficient disk space on target (3x data size for import + index building)
  • Pre-migration backup of source database completed
  • Pre-migration backup of target database completed (if not empty)
  • All team members notified of maintenance window
  • Application deployment pipeline ready for connection string update
  • Monitoring and alerting configured for target database
  • Rollback procedure reviewed and tested on staging
  • Migration scripts tested on staging environment with production-like data
  • Application tested against target database on staging
  • DNS TTL lowered (if using DNS-based switchover)

Step-by-Step Execution Guide

Before Maintenance Window (can be done in advance)

  1. Run pre-migration backup of source database
  2. Execute schema DDL scripts on target (tables, indexes, types, enums)
  3. Verify schema created correctly (run index and constraint verification queries)
  4. Test application connectivity to target database (read-only)

During Maintenance Window

  1. Announce maintenance window start
  2. Set application to maintenance mode / read-only mode
  3. Verify no active writes to source database
  4. Export data from source database
  5. Transform data (if transformations needed)
  6. Disable foreign key checks and triggers on target
  7. Import data to target database
  8. Re-enable foreign key checks and triggers on target
  9. Reset sequences / auto-increment values
  10. Run validation: row counts
  11. Run validation: checksums on critical tables
  12. Run validation: foreign key integrity
  13. Run validation: sample data spot-checks
  14. If validation passes: update application connection string to target
  15. If validation fails: execute rollback procedure, restore source
  16. Deploy application with new connection string
  17. Verify application is functioning correctly
  18. Monitor error rates and query performance for 30 minutes
  19. Announce maintenance window end
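For step 9, the reset statements can be generated per table. A sketch, where table and column names are placeholders and `max_id` is the result of `SELECT MAX([pk])` on the imported table:

```python
# Sketch: generate sequence / auto-increment reset statements for step 9.
# After setval(max_id), PostgreSQL's next nextval() returns max_id + 1;
# MySQL's AUTO_INCREMENT takes the next value to hand out directly.

def reset_statement(provider: str, table: str, pk: str, max_id: int) -> str:
    if provider == "postgresql":
        # pg_get_serial_sequence resolves the sequence owned by the column
        return (f"SELECT setval(pg_get_serial_sequence('{table}', '{pk}'), "
                f"{max_id});")
    if provider == "mysql":
        return f"ALTER TABLE {table} AUTO_INCREMENT = {max_id + 1};"
    raise ValueError(f"unsupported provider: {provider}")
```

Skipping this step is a classic failure mode: inserts after cutover collide with migrated primary keys.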

After Maintenance Window

  1. Monitor target database performance for 24 hours
  2. Compare query performance (slow query log) against baseline
  3. Verify all scheduled jobs and background workers are functioning
  4. Keep source database running (read-only) for 7 days as safety net
  5. After 7 days with no issues: decommission source database
  6. Update documentation with new connection details
  7. Archive migration scripts and plan for audit trail

Post-Migration Verification

  • All tables present in target with correct schema
  • Row counts match between source and target for all tables
  • Checksums match for critical tables
  • All indexes present and functional
  • All foreign keys present and valid (or documented as application-level)
  • All triggers present and functional (or documented as moved to application)
  • All stored procedures present and functional (or documented as moved to application)
  • All views present and returning correct results
  • Application login and authentication working
  • Application CRUD operations working
  • Application search functionality working
  • Background jobs executing successfully
  • API response times within acceptable range
  • No increase in error rates
  • Monitoring dashboards updated to track target database

Application Changes Required

[List all application-level changes needed to work with the new database]
| Change | File/Service | Description | Priority |
| --- | --- | --- | --- |
| Connection string | .env / config | Update DATABASE_URL to target | Critical |
| ORM dialect | db/config | Change dialect from X to Y | Critical |
| Query syntax | [list files] | Rewrite provider-specific queries | High |
| Trigger logic | [list files] | Move trigger logic to application middleware | High |
| Stored proc calls | [list files] | Replace CALL/SELECT with application functions | High |
| Type handling | [list files] | Update type mappings (e.g., boolean handling) | Medium |

Handling Edge Cases

Extremely Large Tables (100M+ rows)

For tables exceeding 100 million rows:
  1. Chunked export: Export in batches of 1-10 million rows using `LIMIT/OFFSET` or range-based `WHERE id BETWEEN x AND y`.
  2. Parallel import: Split data files and import in parallel using multiple connections.
  3. Deferred index creation: Create the table without indexes, load data, then create indexes. This is significantly faster than loading into an indexed table.
  4. Partitioned loading: If the target supports partitioning, create partitions first and load into each partition in parallel.
  5. Progress tracking: Generate a progress script that reports rows loaded vs total rows.
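The chunked export in step 1 can be sketched with fixed-width id ranges (assuming a numeric, indexed primary key; `min_id` and `max_id` come from `SELECT MIN(id), MAX(id)`). Range predicates are generally preferable to `LIMIT/OFFSET`, which rescans all skipped rows on every batch:

```python
# Sketch: generate range-based export queries for a very large table.
# Fixed-width id ranges can produce uneven batches when ids are sparse;
# a keyset loop over the actual ids is the refinement for that case.

def chunk_queries(table: str, min_id: int, max_id: int, chunk: int = 1_000_000):
    lo = min_id
    while lo <= max_id:
        hi = min(lo + chunk - 1, max_id)
        yield (f"SELECT * FROM {table} "
               f"WHERE id BETWEEN {lo} AND {hi} ORDER BY id;")
        lo = hi + 1
```

The generator also gives a natural unit for the progress tracking in step 5: batches completed out of batches total.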

Schema Differences That Require Data Transformation

When a type mapping is not lossless:
  1. Precision loss: Flag any column where the target type has less precision. Example: PostgreSQL NUMERIC(38,18) to MySQL DECIMAL(38,18) is lossless, but NUMERIC(100,50) exceeds MySQL's limit of DECIMAL(65,30).
  2. Encoding issues: UTF-8 4-byte characters (emojis) require `utf8mb4` in MySQL, not `utf8`.
  3. Timezone handling: Document whether timestamps are stored as UTC or local time, and how the target handles timezone conversion.
  4. NULL vs empty string: PostgreSQL distinguishes NULL from empty string; some applications may not.
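Point 1 can be checked mechanically while building the type-mapping table. A sketch, using MySQL's documented DECIMAL caps of precision 65 and scale 30:

```python
# Sketch: flag NUMERIC(p, s) columns that cannot map losslessly to
# MySQL DECIMAL, which supports at most precision 65 and scale 30.

MYSQL_MAX_PRECISION = 65
MYSQL_MAX_SCALE = 30

def numeric_fits_mysql_decimal(precision: int, scale: int) -> bool:
    """True when NUMERIC(precision, scale) maps to DECIMAL without loss."""
    return precision <= MYSQL_MAX_PRECISION and scale <= MYSQL_MAX_SCALE
```

So `numeric_fits_mysql_decimal(38, 18)` passes, while `(100, 50)` fails and that column must be flagged for rounding, rescaling, or storage as text.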

MongoDB Document Flattening (MongoDB to Relational)

When migrating from MongoDB to a relational database:
  1. Top-level fields: Map directly to columns in a primary table.
  2. Embedded objects: Two strategies:
    • Flatten: Prefix column names with the object path (e.g., `address.street` becomes `address_street`). Use when the object is always present and has a fixed schema.
    • Separate table: Create a related table with a foreign key. Use when the object is optional or has a variable schema.
  3. Embedded arrays: Always create a junction or child table. Each array element becomes a row.
  4. Polymorphic documents: Documents in the same collection with different shapes. Options:
    • Single Table Inheritance: One wide table with nullable columns for each document shape.
    • Class Table Inheritance: Separate tables per document shape with a shared base table.
    • Discriminator column: Single table with a `type` column to distinguish shapes.
  5. Nested arrays of objects: Require recursive flattening into multiple tables with foreign keys at each level.
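A minimal sketch of strategies 1-3 for a single document, one level deep (field names are illustrative, and nested arrays of objects still need the recursive treatment from point 5):

```python
# Sketch: flatten one MongoDB document into a main-table row plus child-table
# rows. Embedded objects become prefixed columns; embedded arrays become rows
# in a child table keyed by the parent _id. Assumes array elements are dicts.

def flatten_document(doc: dict) -> tuple[dict, dict[str, list[dict]]]:
    row: dict = {}
    child_tables: dict[str, list[dict]] = {}
    for key, value in doc.items():
        if isinstance(value, dict):
            for sub_key, sub_value in value.items():
                row[f"{key}_{sub_key}"] = sub_value  # address.street -> address_street
        elif isinstance(value, list):
            child_tables[key] = [
                {"parent_id": doc["_id"], **element} for element in value
            ]
        else:
            row[key] = value
    return row, child_tables
```

A real converter layers type mapping and the polymorphism strategies from point 4 on top of this shape.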

PlanetScale Foreign Key Workarounds

Since PlanetScale does not support database-level foreign keys:
  1. Document all relationships in the migration plan as application-level constraints.
  2. Generate application-level validation code (e.g., a Prisma schema with `@relation`, or custom middleware).
  3. Generate orphan detection queries to run periodically:

```sql
-- Check for orphaned child rows (run periodically)
SELECT c.id, c.[fk_column]
FROM [child_table] c
LEFT JOIN [parent_table] p ON c.[fk_column] = p.id
WHERE c.[fk_column] IS NOT NULL AND p.id IS NULL;
```
  4. Document cascade delete logic that must be implemented in the application.

Supabase-Specific Migration Considerations

When migrating to Supabase:
  1. Row Level Security (RLS): Generate RLS policies based on the application's authorization model. Document that RLS must be enabled on all tables exposed via the Supabase API.
  2. Realtime subscriptions: Identify tables that need realtime and add them to the Supabase publication.
  3. Storage buckets: If the source database stores file references, map them to Supabase Storage.
  4. Edge Functions: If stored procedures contain API-callable logic, recommend migrating to Supabase Edge Functions.
  5. Auth integration: If the source has a users table, document how to integrate with Supabase Auth.

Multi-Schema or Multi-Database Migration

When the source has multiple schemas or databases:
  1. Map source schemas to target schemas (if the target supports multiple schemas).
  2. Merge schemas: If the target is single-schema (e.g., PlanetScale), prefix table names with the schema name.
  3. Cross-schema references: Identify and document all cross-schema foreign keys. These may need special handling.
  4. Schema-level permissions: Document permission differences between source and target.

Quality Checklist for the Migration Plan

Before delivering the plan, verify:
  • Every source table is accounted for in the schema inventory
  • Every column has a type mapping documented
  • Every foreign key has a creation order assigned
  • Every index is included in the DDL scripts
  • Every trigger is translated or documented as needing application-level migration
  • Every stored procedure is translated or documented as needing application-level migration
  • Every view is translated
  • Data type incompatibilities are flagged with workarounds
  • Row count validation queries are generated for every table
  • Checksum validation queries are generated for critical tables
  • Rollback scripts are complete and tested
  • Downtime estimate accounts for all phases
  • Pre-migration and post-migration checklists are included
  • Application changes are documented
  • The step-by-step execution guide is actionable -- a DBA can follow it without additional context