alembic
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAlembic Database Migration Management Skill
Alembic数据库迁移管理Skill
Overview
概述
This skill provides comprehensive guidance for managing database migrations using Alembic in customer support environments. It covers everything from initial setup through complex production deployment scenarios, with a focus on maintaining data integrity and minimizing downtime for support operations.
本Skill为客户支持环境中使用Alembic管理数据库迁移提供全面指导,涵盖从初始设置到复杂生产部署场景的所有内容,重点在于维护数据完整性并最小化支持业务的停机时间。
Core Concepts
核心概念
What is Alembic?
什么是Alembic?
Alembic is a lightweight database migration tool for use with SQLAlchemy. It provides a way to manage changes to your database schema over time through version-controlled migration scripts. For customer support systems, this means:
- Version Control: Track all schema changes in your support database
- Reproducibility: Apply the same migrations across dev, staging, and production
- Rollback Capability: Safely revert problematic changes
- Team Collaboration: Merge schema changes from multiple developers
- Data Preservation: Migrate data during schema transformations
Alembic是一款搭配SQLAlchemy使用的轻量级数据库迁移工具。它通过版本控制的迁移脚本,提供了随时间管理数据库架构变更的方式。对于客户支持系统而言,这意味着:
- 版本控制:追踪支持数据库中的所有架构变更
- 可复现性:在开发、预发布和生产环境中应用相同的迁移
- 回滚能力:安全回滚有问题的变更
- 团队协作:合并来自多名开发者的架构变更
- 数据保留:在架构转换过程中迁移数据
Migration Lifecycle in Support Systems
支持系统中的迁移生命周期
- Development: Create migrations locally while developing new features
- Testing: Validate migrations in staging environment
- Review: Code review migration scripts before production
- Deployment: Apply migrations to production with minimal downtime
- Monitoring: Track migration status and handle failures
- Rollback: Revert if issues arise in production
- 开发阶段:在开发新功能时本地创建迁移
- 测试阶段:在预发布环境中验证迁移
- 评审阶段:在生产部署前对迁移脚本进行代码评审
- 部署阶段:以最小停机时间将迁移应用到生产环境
- 监控阶段:追踪迁移状态并处理故障
- 回滚阶段:若生产环境出现问题则执行回滚
Installation and Initial Setup
安装与初始设置
Installing Alembic
安装Alembic
bash
undefinedbash
undefinedInstall Alembic with PostgreSQL support
Install Alembic with PostgreSQL support
pip install alembic psycopg2-binary sqlalchemy
pip install alembic psycopg2-binary sqlalchemy
Or add to requirements.txt
Or add to requirements.txt
alembic>=1.13.0
sqlalchemy>=2.0.0
psycopg2-binary>=2.9.0
undefinedalembic>=1.13.0
sqlalchemy>=2.0.0
psycopg2-binary>=2.9.0
undefinedInitialize Alembic in Your Project
在项目中初始化Alembic
bash
undefinedbash
undefinedInitialize Alembic (creates alembic/ directory and alembic.ini)
Initialize Alembic (creates alembic/ directory and alembic.ini)
alembic init alembic
alembic init alembic
For multiple database support
For multiple database support
alembic init --template multidb alembic
This creates:
- `alembic/`: Directory containing migration scripts
- `alembic/versions/`: Where individual migration files live
- `alembic/env.py`: Migration environment configuration
- `alembic.ini`: Alembic configuration filealembic init --template multidb alembic
此操作会创建:
- `alembic/`: 存放迁移脚本的目录
- `alembic/versions/`: 单个迁移文件的存储位置
- `alembic/env.py`: 迁移环境配置文件
- `alembic.ini`: Alembic配置文件Configure Database Connection
配置数据库连接
Edit to set your database URL:
alembic.iniini
undefined编辑设置数据库URL:
alembic.iniini
undefinedFor development
For development
sqlalchemy.url = postgresql://user:password@localhost/support_dev
sqlalchemy.url = postgresql://user:password@localhost/support_dev
For production (use environment variables)
For production (use environment variables)
sqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASSWORD)s@%(DB_HOST)s/%(DB_NAME)s
Better approach - use environment variables in `env.py`:
```python
import os
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import contextsqlalchemy.url = postgresql://%(DB_USER)s:%(DB_PASSWORD)s@%(DB_HOST)s/%(DB_NAME)s
更优方案 - 在`env.py`中使用环境变量:
```python
import os
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import contextImport your models
Import your models
from myapp.models import Base
from myapp.models import Base
This is the Alembic Config object
This is the Alembic Config object
config = context.config
config = context.config
Override sqlalchemy.url from environment
Override sqlalchemy.url from environment
db_url = os.getenv('DATABASE_URL', 'postgresql://localhost/support_dev')
config.set_main_option('sqlalchemy.url', db_url)
db_url = os.getenv('DATABASE_URL', 'postgresql://localhost/support_dev')
config.set_main_option('sqlalchemy.url', db_url)
Set up target metadata for autogenerate
Set up target metadata for autogenerate
target_metadata = Base.metadata
undefinedtarget_metadata = Base.metadata
undefinedCreating Migrations
创建迁移
Manual Migration Creation
手动创建迁移
Create a migration manually when you need precise control:
bash
undefined当需要精确控制时,手动创建迁移:
bash
undefinedCreate empty migration file
Create empty migration file
alembic revision -m "add ticket priority column"
This generates a file like `versions/abc123_add_ticket_priority_column.py`:
```python
"""add ticket priority column
Revision ID: abc123
Revises: def456
Create Date: 2025-01-15 10:30:00.000000
"""
from alembic import op
import sqlalchemy as saalembic revision -m "add ticket priority column"
这会生成类似`versions/abc123_add_ticket_priority_column.py`的文件:
```python
"""add ticket priority column
Revision ID: abc123
Revises: def456
Create Date: 2025-01-15 10:30:00.000000
"""
from alembic import op
import sqlalchemy as sarevision identifiers
revision identifiers
revision = 'abc123'
down_revision = 'def456'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Add priority column to tickets table
op.add_column('tickets',
sa.Column('priority', sa.String(20), nullable=True, server_default='normal')
)
# Create index for performance
op.create_index('ix_tickets_priority', 'tickets', ['priority'])def downgrade() -> None:
# Remove index first
op.drop_index('ix_tickets_priority', 'tickets')
# Remove column
op.drop_column('tickets', 'priority')undefinedrevision = 'abc123'
down_revision = 'def456'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Add priority column to tickets table
op.add_column('tickets',
sa.Column('priority', sa.String(20), nullable=True, server_default='normal')
)
# Create index for performance
op.create_index('ix_tickets_priority', 'tickets', ['priority'])def downgrade() -> None:
# Remove index first
op.drop_index('ix_tickets_priority', 'tickets')
# Remove column
op.drop_column('tickets', 'priority')undefinedAutogenerate Migrations
自动生成迁移
Let Alembic detect schema changes automatically:
bash
undefined让Alembic自动检测架构变更:
bash
undefinedGenerate migration by comparing models to database
Generate migration by comparing models to database
alembic revision --autogenerate -m "add customer satisfaction table"
**Important**: Always review autogenerated migrations! They may miss:
- Renamed columns (appears as drop + add)
- Changed column types requiring data conversion
- Complex constraints
- Data migrations
Example autogenerated migration:
```python
"""add customer satisfaction table
Revision ID: xyz789
Revises: abc123
Create Date: 2025-01-15 11:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = 'xyz789'
down_revision = 'abc123'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Auto-generated - review before running!
op.create_table(
'customer_satisfaction',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('ticket_id', sa.Integer(), nullable=False),
sa.Column('rating', sa.Integer(), nullable=False),
sa.Column('feedback', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(['ticket_id'], ['tickets.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_index('ix_satisfaction_ticket_id', 'customer_satisfaction', ['ticket_id'])
op.create_index('ix_satisfaction_created_at', 'customer_satisfaction', ['created_at'])
def downgrade() -> None:
op.drop_index('ix_satisfaction_created_at', 'customer_satisfaction')
op.drop_index('ix_satisfaction_ticket_id', 'customer_satisfaction')
op.drop_table('customer_satisfaction')alembic revision --autogenerate -m "add customer satisfaction table"
**重要提示**:务必评审自动生成的迁移!它们可能会遗漏:
- 重命名的列(显示为删除+添加)
- 需要数据转换的列类型变更
- 复杂约束
- 数据迁移
自动生成的迁移示例:
```python
"""add customer satisfaction table
Revision ID: xyz789
Revises: abc123
Create Date: 2025-01-15 11:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = 'xyz789'
down_revision = 'abc123'
branch_labels = None
depends_on = None
def upgrade() -> None:
# Auto-generated - review before running!
op.create_table(
'customer_satisfaction',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('ticket_id', sa.Integer(), nullable=False),
sa.Column('rating', sa.Integer(), nullable=False),
sa.Column('feedback', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(['ticket_id'], ['tickets.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id')
)
op.create_index('ix_satisfaction_ticket_id', 'customer_satisfaction', ['ticket_id'])
op.create_index('ix_satisfaction_created_at', 'customer_satisfaction', ['created_at'])
def downgrade() -> None:
op.drop_index('ix_satisfaction_created_at', 'customer_satisfaction')
op.drop_index('ix_satisfaction_ticket_id', 'customer_satisfaction')
op.drop_table('customer_satisfaction')Data Migrations
数据迁移
Migrating Data During Schema Changes
架构变更期间迁移数据
When you need to transform existing data:
python
"""convert ticket status to new enum
Revision ID: data001
Revises: xyz789
Create Date: 2025-01-15 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column
revision = 'data001'
down_revision = 'xyz789'
def upgrade() -> None:
# Create new status column
op.add_column('tickets',
sa.Column('status_new', sa.String(50), nullable=True)
)
# Migrate data using bulk update
tickets = table('tickets',
column('status', sa.String),
column('status_new', sa.String)
)
# Map old statuses to new ones
status_mapping = {
'open': 'OPEN',
'in_progress': 'IN_PROGRESS',
'pending': 'WAITING_ON_CUSTOMER',
'resolved': 'RESOLVED',
'closed': 'CLOSED'
}
connection = op.get_bind()
for old_status, new_status in status_mapping.items():
connection.execute(
tickets.update().where(
tickets.c.status == old_status
).values(status_new=new_status)
)
# Make new column non-nullable now that data is migrated
op.alter_column('tickets', 'status_new', nullable=False)
# Drop old column and rename new one
op.drop_column('tickets', 'status')
op.alter_column('tickets', 'status_new', new_column_name='status')
def downgrade() -> None:
# Reverse the migration
op.add_column('tickets',
sa.Column('status_old', sa.String(50), nullable=True)
)
tickets = table('tickets',
column('status', sa.String),
column('status_old', sa.String)
)
# Reverse mapping
reverse_mapping = {
'OPEN': 'open',
'IN_PROGRESS': 'in_progress',
'WAITING_ON_CUSTOMER': 'pending',
'RESOLVED': 'resolved',
'CLOSED': 'closed'
}
connection = op.get_bind()
for new_status, old_status in reverse_mapping.items():
connection.execute(
tickets.update().where(
tickets.c.status == new_status
).values(status_old=old_status)
)
op.alter_column('tickets', 'status_old', nullable=False)
op.drop_column('tickets', 'status')
op.alter_column('tickets', 'status_old', new_column_name='status')当需要转换现有数据时:
python
"""convert ticket status to new enum
Revision ID: data001
Revises: xyz789
Create Date: 2025-01-15 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column
revision = 'data001'
down_revision = 'xyz789'
def upgrade() -> None:
# Create new status column
op.add_column('tickets',
sa.Column('status_new', sa.String(50), nullable=True)
)
# Migrate data using bulk update
tickets = table('tickets',
column('status', sa.String),
column('status_new', sa.String)
)
# Map old statuses to new ones
status_mapping = {
'open': 'OPEN',
'in_progress': 'IN_PROGRESS',
'pending': 'WAITING_ON_CUSTOMER',
'resolved': 'RESOLVED',
'closed': 'CLOSED'
}
connection = op.get_bind()
for old_status, new_status in status_mapping.items():
connection.execute(
tickets.update().where(
tickets.c.status == old_status
).values(status_new=new_status)
)
# Make new column non-nullable now that data is migrated
op.alter_column('tickets', 'status_new', nullable=False)
# Drop old column and rename new one
op.drop_column('tickets', 'status')
op.alter_column('tickets', 'status_new', new_column_name='status')
def downgrade() -> None:
# Reverse the migration
op.add_column('tickets',
sa.Column('status_old', sa.String(50), nullable=True)
)
tickets = table('tickets',
column('status', sa.String),
column('status_old', sa.String)
)
# Reverse mapping
reverse_mapping = {
'OPEN': 'open',
'IN_PROGRESS': 'in_progress',
'WAITING_ON_CUSTOMER': 'pending',
'RESOLVED': 'resolved',
'CLOSED': 'closed'
}
connection = op.get_bind()
for new_status, old_status in reverse_mapping.items():
connection.execute(
tickets.update().where(
tickets.c.status == new_status
).values(status_old=old_status)
)
op.alter_column('tickets', 'status_old', nullable=False)
op.drop_column('tickets', 'status')
op.alter_column('tickets', 'status_old', new_column_name='status')Large Data Migrations with Batching
批量处理的大数据迁移
For large tables, process data in batches:
python
"""add computed resolution time to tickets
Revision ID: data002
Revises: data001
Create Date: 2025-01-15 13:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column, select
revision = 'data002'
down_revision = 'data001'
def upgrade() -> None:
# Add new column
op.add_column('tickets',
sa.Column('resolution_time_seconds', sa.Integer(), nullable=True)
)
connection = op.get_bind()
tickets = table('tickets',
column('id', sa.Integer),
column('created_at', sa.DateTime),
column('resolved_at', sa.DateTime),
column('resolution_time_seconds', sa.Integer)
)
# Process in batches to avoid memory issues
batch_size = 1000
offset = 0
while True:
# Get batch of tickets that need processing
batch = connection.execute(
select(
tickets.c.id,
tickets.c.created_at,
tickets.c.resolved_at
).where(
sa.and_(
tickets.c.resolved_at.isnot(None),
tickets.c.resolution_time_seconds.is_(None)
)
).limit(batch_size).offset(offset)
).fetchall()
if not batch:
break
# Update batch
for row in batch:
if row.resolved_at and row.created_at:
resolution_time = (row.resolved_at - row.created_at).total_seconds()
connection.execute(
tickets.update().where(
tickets.c.id == row.id
).values(resolution_time_seconds=int(resolution_time))
)
offset += batch_size
# Now make column non-nullable for future rows
op.alter_column('tickets', 'resolution_time_seconds',
nullable=False, server_default='0')
def downgrade() -> None:
op.drop_column('tickets', 'resolution_time_seconds')对于大型表,分批处理数据:
python
"""add computed resolution time to tickets
Revision ID: data002
Revises: data001
Create Date: 2025-01-15 13:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import table, column, select
revision = 'data002'
down_revision = 'data001'
def upgrade() -> None:
# Add new column
op.add_column('tickets',
sa.Column('resolution_time_seconds', sa.Integer(), nullable=True)
)
connection = op.get_bind()
tickets = table('tickets',
column('id', sa.Integer),
column('created_at', sa.DateTime),
column('resolved_at', sa.DateTime),
column('resolution_time_seconds', sa.Integer)
)
# Process in batches to avoid memory issues
batch_size = 1000
offset = 0
while True:
# Get batch of tickets that need processing
batch = connection.execute(
select(
tickets.c.id,
tickets.c.created_at,
tickets.c.resolved_at
).where(
sa.and_(
tickets.c.resolved_at.isnot(None),
tickets.c.resolution_time_seconds.is_(None)
)
).limit(batch_size).offset(offset)
).fetchall()
if not batch:
break
# Update batch
for row in batch:
if row.resolved_at and row.created_at:
resolution_time = (row.resolved_at - row.created_at).total_seconds()
connection.execute(
tickets.update().where(
tickets.c.id == row.id
).values(resolution_time_seconds=int(resolution_time))
)
offset += batch_size
# Now make column non-nullable for future rows
op.alter_column('tickets', 'resolution_time_seconds',
nullable=False, server_default='0')
def downgrade() -> None:
op.drop_column('tickets', 'resolution_time_seconds')Running Migrations
执行迁移
Upgrade Database to Latest
将数据库升级到最新版本
bash
undefinedbash
undefinedUpgrade to latest revision (head)
Upgrade to latest revision (head)
alembic upgrade head
alembic upgrade head
See what would be executed (SQL only, don't run)
See what would be executed (SQL only, don't run)
alembic upgrade head --sql
alembic upgrade head --sql
Upgrade one step at a time
Upgrade one step at a time
alembic upgrade +1
alembic upgrade +1
Upgrade to specific revision
Upgrade to specific revision
alembic upgrade abc123
undefinedalembic upgrade abc123
undefinedDowngrade Database
回滚数据库
bash
undefinedbash
undefinedDowngrade one revision
Downgrade one revision
alembic downgrade -1
alembic downgrade -1
Downgrade to specific revision
Downgrade to specific revision
alembic downgrade abc123
alembic downgrade abc123
Downgrade to base (empty database)
Downgrade to base (empty database)
alembic downgrade base
alembic downgrade base
Generate SQL for downgrade without executing
Generate SQL for downgrade without executing
alembic downgrade -1 --sql
undefinedalembic downgrade -1 --sql
undefinedCheck Current Status
查看当前状态
bash
undefinedbash
undefinedShow current database revision
Show current database revision
alembic current
alembic current
Show current revision with details
Show current revision with details
alembic current --verbose
alembic current --verbose
Show migration history
Show migration history
alembic history
alembic history
Show history with current revision marked
Show history with current revision marked
alembic history --indicate-current
alembic history --indicate-current
Show specific revision range
Show specific revision range
alembic history -r base:head
undefinedalembic history -r base:head
undefinedBranching and Merging
分支与合并
Why Branch Migrations?
为什么要对迁移进行分支?
In customer support systems, you might have:
- Feature branches: New features developed in parallel
- Hotfix branches: Urgent fixes that can't wait for feature completion
- Team branches: Multiple teams working on different modules
在客户支持系统中,你可能会遇到以下场景:
- 功能分支:并行开发的新功能
- 热修复分支:无法等待功能完成的紧急修复
- 团队分支:多个团队在不同模块上工作
Creating a Branch
创建分支
bash
undefinedbash
undefinedCreate base for new branch
Create base for new branch
alembic revision -m "create reporting branch"
--head=base
--branch-label=reporting
--version-path=alembic/versions/reporting
--head=base
--branch-label=reporting
--version-path=alembic/versions/reporting
alembic revision -m "create reporting branch"
--head=base
--branch-label=reporting
--version-path=alembic/versions/reporting
--head=base
--branch-label=reporting
--version-path=alembic/versions/reporting
Add migration to specific branch
Add migration to specific branch
alembic revision -m "add report tables"
--head=reporting@head
--head=reporting@head
Example branch structure:
base
├── main branch
│ ├── abc123: initial schema
│ ├── def456: add tickets
│ └── ghi789: add users
└── reporting branch
├── rep001: create reports table
└── rep002: add scheduled reports
undefinedalembic revision -m "add report tables"
--head=reporting@head
--head=reporting@head
分支结构示例:
base
├── main branch
│ ├── abc123: initial schema
│ ├── def456: add tickets
│ └── ghi789: add users
└── reporting branch
├── rep001: create reports table
└── rep002: add scheduled reports
undefinedWorking with Multiple Branches
多分支协作
bash
undefinedbash
undefinedShow all branch heads
Show all branch heads
alembic heads
alembic heads
Show branch points
Show branch points
alembic branches
alembic branches
Upgrade specific branch
Upgrade specific branch
alembic upgrade reporting@head
alembic upgrade reporting@head
Upgrade all branches
Upgrade all branches
alembic upgrade heads
undefinedalembic upgrade heads
undefinedMerging Branches
合并分支
When features are ready to merge:
bash
undefined当功能开发完成准备合并时:
bash
undefinedMerge two branches
Merge two branches
alembic merge -m "merge reporting into main"
main@head reporting@head
main@head reporting@head
Generated merge migration:
```python
"""merge reporting into main
Revision ID: merge001
Revises: ghi789, rep002
Create Date: 2025-01-15 14:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = 'merge001'
down_revision = ('ghi789', 'rep002') # Multiple parents
branch_labels = None
depends_on = None
def upgrade() -> None:
# Usually empty for simple merges
# Add code if you need to reconcile conflicting changes
pass
def downgrade() -> None:
passalembic merge -m "merge reporting into main"
main@head reporting@head
main@head reporting@head
生成的合并迁移:
```python
"""merge reporting into main
Revision ID: merge001
Revises: ghi789, rep002
Create Date: 2025-01-15 14:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = 'merge001'
down_revision = ('ghi789', 'rep002') # Multiple parents
branch_labels = None
depends_on = None
def upgrade() -> None:
# Usually empty for simple merges
# Add code if you need to reconcile conflicting changes
pass
def downgrade() -> None:
passCross-Branch Dependencies
跨分支依赖
When one branch depends on another:
bash
undefined当一个分支依赖另一个分支时:
bash
undefinedCreate migration that depends on specific revision from another branch
Create migration that depends on specific revision from another branch
alembic revision -m "reporting needs user table"
--head=reporting@head
--depends-on=def456 # Revision from main branch
--head=reporting@head
--depends-on=def456 # Revision from main branch
undefinedalembic revision -m "reporting needs user table"
--head=reporting@head
--depends-on=def456 # Revision from main branch
--head=reporting@head
--depends-on=def456 # Revision from main branch
undefinedTesting Migrations
迁移测试
Unit Testing Migrations
单元测试迁移
python
undefinedpython
undefinedtests/test_migrations.py
tests/test_migrations.py
import pytest
from alembic import command
from alembic.config import Config
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker
@pytest.fixture
def alembic_config():
"""Provide Alembic configuration for testing"""
config = Config("alembic.ini")
config.set_main_option(
"sqlalchemy.url",
"postgresql://localhost/support_test"
)
return config
@pytest.fixture
def test_db(alembic_config):
"""Create test database and apply migrations"""
# Create engine
engine = create_engine(
alembic_config.get_main_option("sqlalchemy.url")
)
# Run migrations to head
command.upgrade(alembic_config, "head")
yield engine
# Cleanup - downgrade to base
command.downgrade(alembic_config, "base")
engine.dispose()def test_migration_creates_tickets_table(test_db):
"""Test that migrations create expected tables"""
inspector = inspect(test_db)
tables = inspector.get_table_names()
assert 'tickets' in tables
assert 'users' in tables
assert 'customer_satisfaction' in tablesdef test_tickets_table_structure(test_db):
"""Test ticket table has correct columns"""
inspector = inspect(test_db)
columns = {col['name']: col for col in inspector.get_columns('tickets')}
assert 'id' in columns
assert 'priority' in columns
assert 'status' in columns
assert 'created_at' in columns
assert 'resolution_time_seconds' in columns
# Check column types
assert columns['priority']['type'].python_type == str
assert columns['status']['type'].python_type == strdef test_migration_upgrade_downgrade_cycle(alembic_config):
"""Test that upgrade -> downgrade -> upgrade works"""
# Start at base
command.downgrade(alembic_config, "base")
# Upgrade to head
command.upgrade(alembic_config, "head")
# Downgrade one step
command.downgrade(alembic_config, "-1")
# Upgrade back to head
command.upgrade(alembic_config, "head")
# Should complete without errorsdef test_data_migration_preserves_data(test_db):
"""Test that data migrations don't lose data"""
from sqlalchemy.orm import sessionmaker
from myapp.models import Ticket
Session = sessionmaker(bind=test_db)
session = Session()
# Insert test data
ticket = Ticket(
title="Test ticket",
status="OPEN",
priority="high"
)
session.add(ticket)
session.commit()
ticket_id = ticket.id
session.close()
# Run a migration that modifies tickets table
# (This would be a specific revision)
# command.upgrade(alembic_config, "specific_revision")
# Verify data still exists
session = Session()
retrieved = session.query(Ticket).filter_by(id=ticket_id).first()
assert retrieved is not None
assert retrieved.title == "Test ticket"
session.close()undefinedimport pytest
from alembic import command
from alembic.config import Config
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker
@pytest.fixture
def alembic_config():
"""Provide Alembic configuration for testing"""
config = Config("alembic.ini")
config.set_main_option(
"sqlalchemy.url",
"postgresql://localhost/support_test"
)
return config
@pytest.fixture
def test_db(alembic_config):
"""Create test database and apply migrations"""
# Create engine
engine = create_engine(
alembic_config.get_main_option("sqlalchemy.url")
)
# Run migrations to head
command.upgrade(alembic_config, "head")
yield engine
# Cleanup - downgrade to base
command.downgrade(alembic_config, "base")
engine.dispose()def test_migration_creates_tickets_table(test_db):
"""Test that migrations create expected tables"""
inspector = inspect(test_db)
tables = inspector.get_table_names()
assert 'tickets' in tables
assert 'users' in tables
assert 'customer_satisfaction' in tablesdef test_tickets_table_structure(test_db):
"""Test ticket table has correct columns"""
inspector = inspect(test_db)
columns = {col['name']: col for col in inspector.get_columns('tickets')}
assert 'id' in columns
assert 'priority' in columns
assert 'status' in columns
assert 'created_at' in columns
assert 'resolution_time_seconds' in columns
# Check column types
assert columns['priority']['type'].python_type == str
assert columns['status']['type'].python_type == strdef test_migration_upgrade_downgrade_cycle(alembic_config):
"""Test that upgrade -> downgrade -> upgrade works"""
# Start at base
command.downgrade(alembic_config, "base")
# Upgrade to head
command.upgrade(alembic_config, "head")
# Downgrade one step
command.downgrade(alembic_config, "-1")
# Upgrade back to head
command.upgrade(alembic_config, "head")
# Should complete without errorsdef test_data_migration_preserves_data(test_db):
"""Test that data migrations don't lose data"""
from sqlalchemy.orm import sessionmaker
from myapp.models import Ticket
Session = sessionmaker(bind=test_db)
session = Session()
# Insert test data
ticket = Ticket(
title="Test ticket",
status="OPEN",
priority="high"
)
session.add(ticket)
session.commit()
ticket_id = ticket.id
session.close()
# Run a migration that modifies tickets table
# (This would be a specific revision)
# command.upgrade(alembic_config, "specific_revision")
# Verify data still exists
session = Session()
retrieved = session.query(Ticket).filter_by(id=ticket_id).first()
assert retrieved is not None
assert retrieved.title == "Test ticket"
session.close()undefinedIntegration Testing
集成测试
python
undefinedpython
undefinedtests/test_migration_integration.py
tests/test_migration_integration.py
import pytest
from alembic import command
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.runtime.migration import MigrationContext
def test_no_pending_migrations(alembic_config, test_db):
"""Ensure all migrations are applied in test environment"""
script = ScriptDirectory.from_config(alembic_config)
with test_db.connect() as connection:
context = MigrationContext.configure(connection)
current_heads = set(context.get_current_heads())
script_heads = set(script.get_heads())
assert current_heads == script_heads, \
f"Database has pending migrations. Current: {current_heads}, Expected: {script_heads}"def test_migration_order_is_valid(alembic_config):
"""Verify migration chain has no gaps or conflicts"""
script = ScriptDirectory.from_config(alembic_config)
# Get all revisions
revisions = list(script.walk_revisions())
# Check each revision has valid down_revision
for revision in revisions:
if revision.down_revision is not None:
if isinstance(revision.down_revision, tuple):
# Merge point
for down_rev in revision.down_revision:
assert script.get_revision(down_rev) is not None
else:
assert script.get_revision(revision.down_revision) is not Nonedef test_check_command_detects_drift(alembic_config, test_db):
"""Test that check command detects schema drift"""
# This test verifies that works correctly
try:
command.check(alembic_config)
# If no exception, database matches models
assert True
except Exception as e:
# If exception, there's drift between DB and models
pytest.fail(f"Schema drift detected: {e}")
alembic checkundefinedimport pytest
from alembic import command
from alembic.config import Config
from alembic.script import ScriptDirectory
from alembic.runtime.migration import MigrationContext
def test_no_pending_migrations(alembic_config, test_db):
"""Ensure all migrations are applied in test environment"""
script = ScriptDirectory.from_config(alembic_config)
with test_db.connect() as connection:
context = MigrationContext.configure(connection)
current_heads = set(context.get_current_heads())
script_heads = set(script.get_heads())
assert current_heads == script_heads, \
f"Database has pending migrations. Current: {current_heads}, Expected: {script_heads}"def test_migration_order_is_valid(alembic_config):
"""Verify migration chain has no gaps or conflicts"""
script = ScriptDirectory.from_config(alembic_config)
# Get all revisions
revisions = list(script.walk_revisions())
# Check each revision has valid down_revision
for revision in revisions:
if revision.down_revision is not None:
if isinstance(revision.down_revision, tuple):
# Merge point
for down_rev in revision.down_revision:
assert script.get_revision(down_rev) is not None
else:
assert script.get_revision(revision.down_revision) is not Nonedef test_check_command_detects_drift(alembic_config, test_db):
"""Test that check command detects schema drift"""
# This test verifies that works correctly
try:
command.check(alembic_config)
# If no exception, database matches models
assert True
except Exception as e:
# If exception, there's drift between DB and models
pytest.fail(f"Schema drift detected: {e}")
alembic checkundefinedTesting Migration Performance
迁移性能测试
python
undefinedpython
undefinedtests/test_migration_performance.py
tests/test_migration_performance.py
import time
import pytest
from alembic import command
def test_migration_completes_within_time_limit(alembic_config):
"""Ensure migrations complete within acceptable time"""
# Downgrade to base
command.downgrade(alembic_config, "base")
# Time the upgrade
start = time.time()
command.upgrade(alembic_config, "head")
duration = time.time() - start
# Assert completes within 60 seconds
assert duration < 60, f"Migration took {duration}s, exceeds 60s limit"@pytest.mark.slow
def test_data_migration_with_large_dataset(alembic_config, test_db):
"""Test data migration performance with realistic data volume"""
from sqlalchemy.orm import sessionmaker
from myapp.models import Ticket
Session = sessionmaker(bind=test_db)
session = Session()
# Create 10,000 test tickets
tickets = [
Ticket(
title=f"Test ticket {i}",
status="OPEN",
priority="normal"
)
for i in range(10000)
]
session.bulk_save_objects(tickets)
session.commit()
session.close()
# Run data migration and measure time
start = time.time()
command.upgrade(alembic_config, "data002") # Specific data migration
duration = time.time() - start
# Should process 10k records in reasonable time
assert duration < 30, f"Data migration took {duration}s for 10k records"undefinedimport time
import pytest
from alembic import command
def test_migration_completes_within_time_limit(alembic_config):
"""Ensure migrations complete within acceptable time"""
# Downgrade to base
command.downgrade(alembic_config, "base")
# Time the upgrade
start = time.time()
command.upgrade(alembic_config, "head")
duration = time.time() - start
# Assert completes within 60 seconds
assert duration < 60, f"Migration took {duration}s, exceeds 60s limit"@pytest.mark.slow
def test_data_migration_with_large_dataset(alembic_config, test_db):
"""Test data migration performance with realistic data volume"""
from sqlalchemy.orm import sessionmaker
from myapp.models import Ticket
Session = sessionmaker(bind=test_db)
session = Session()
# Create 10,000 test tickets
tickets = [
Ticket(
title=f"Test ticket {i}",
status="OPEN",
priority="normal"
)
for i in range(10000)
]
session.bulk_save_objects(tickets)
session.commit()
session.close()
# Run data migration and measure time
start = time.time()
command.upgrade(alembic_config, "data002") # Specific data migration
duration = time.time() - start
# Should process 10k records in reasonable time
assert duration < 30, f"Data migration took {duration}s for 10k records"undefinedCI/CD Integration
CI/CD集成
GitHub Actions Workflow
GitHub Actions工作流
yaml
undefinedyaml
undefined.github/workflows/migrations.yml
.github/workflows/migrations.yml
name: Database Migrations
on:
pull_request:
paths:
- 'alembic/versions/'
- 'myapp/models/'
- 'alembic.ini'
- 'alembic/env.py'
push:
branches:
- main
- develop
jobs:
test-migrations:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: support_test
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest pytest-cov
- name: Run migration tests
env:
DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
run: |
# Test upgrade to head
alembic upgrade head
# Test downgrade to base
alembic downgrade base
# Test upgrade again
alembic upgrade head
# Run pytest for migration tests
pytest tests/test_migrations.py -v
- name: Check for schema drift
env:
DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
run: |
alembic check
- name: Validate migration history
run: |
# Check for multiple heads (should be only one)
HEADS_COUNT=$(alembic heads | wc -l)
if [ "$HEADS_COUNT" -gt 1 ]; then
echo "ERROR: Multiple heads detected. Please merge branches."
alembic heads
exit 1
fireview-migration-sql:
runs-on: ubuntu-latest
if: github.event_name == 'pull_request'
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Generate SQL for review
run: |
# Generate SQL without executing
alembic upgrade head --sql > migration.sql
- name: Upload SQL artifact
uses: actions/upload-artifact@v3
with:
name: migration-sql
path: migration.sql
- name: Comment PR with SQL
uses: actions/github-script@v6
with:
script: |
const fs = require('fs');
const sql = fs.readFileSync('migration.sql', 'utf8');
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: `## Migration SQL\n\n\`\`\`sql\n${sql}\n\`\`\``
});undefinedname: Database Migrations
on:
pull_request:
paths:
- 'alembic/versions/'
- 'myapp/models/'
- 'alembic.ini'
- 'alembic/env.py'
push:
branches:
- main
- develop
jobs:
test-migrations:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: support_test
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest pytest-cov
- name: Run migration tests
env:
DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
run: |
# Test upgrade to head
alembic upgrade head
# Test downgrade to base
alembic downgrade base
# Test upgrade again
alembic upgrade head
# Run pytest for migration tests
pytest tests/test_migrations.py -v
- name: Check for schema drift
env:
DATABASE_URL: postgresql://postgres:postgres@localhost/support_test
run: |
alembic check
- name: Validate migration history
run: |
# Check for multiple heads (should be only one)
HEADS_COUNT=$(alembic heads | wc -l)
if [ "$HEADS_COUNT" -gt 1 ]; then
echo "ERROR: Multiple heads detected. Please merge branches."
alembic heads
exit 1
fireview-migration-sql:
runs-on: ubuntu-latest
if: github.event_name == 'pull_request'
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Generate SQL for review
run: |
# Generate SQL without executing
alembic upgrade head --sql > migration.sql
- name: Upload SQL artifact
uses: actions/upload-artifact@v3
with:
name: migration-sql
path: migration.sql
- name: Comment PR with SQL
uses: actions/github-script@v6
with:
script: |
const fs = require('fs');
const sql = fs.readFileSync('migration.sql', 'utf8');
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: `## Migration SQL\n\n\`\`\`sql\n${sql}\n\`\`\``
});undefinedDeployment Script
部署脚本
bash
#!/bin/bashbash
#!/bin/bashscripts/deploy_migrations.sh
scripts/deploy_migrations.sh
set -e # Exit on error
echo "Starting database migration deployment..."
set -e # Exit on error
echo "Starting database migration deployment..."
Environment variables
Environment variables
DB_HOST="${DB_HOST:-localhost}"
DB_NAME="${DB_NAME:-support_prod}"
DB_USER="${DB_USER:-postgres}"
DATABASE_URL="postgresql://${DB_USER}:${DB_PASSWORD}@${DB_HOST}/${DB_NAME}"
DB_HOST="${DB_HOST:-localhost}"
DB_NAME="${DB_NAME:-support_prod}"
DB_USER="${DB_USER:-postgres}"
DATABASE_URL="postgresql://${DB_USER}:${DB_PASSWORD}@${DB_HOST}/${DB_NAME}"
Configuration
Configuration
BACKUP_DIR="./backups"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="${BACKUP_DIR}/pre_migration_${TIMESTAMP}.sql"
BACKUP_DIR="./backups"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="${BACKUP_DIR}/pre_migration_${TIMESTAMP}.sql"
Create backup directory
Create backup directory
mkdir -p "$BACKUP_DIR"
mkdir -p "$BACKUP_DIR"
1. Backup database before migration
1. Backup database before migration
echo "Creating database backup..."
pg_dump "$DATABASE_URL" > "$BACKUP_FILE"
echo "Backup created: $BACKUP_FILE"
echo "Creating database backup..."
pg_dump "$DATABASE_URL" > "$BACKUP_FILE"
echo "Backup created: $BACKUP_FILE"
2. Check current migration status
2. Check current migration status
echo "Current migration status:"
alembic current
echo "Current migration status:"
alembic current
3. Show pending migrations
3. Show pending migrations
echo "Pending migrations:"
alembic history --verbose | grep -A 5 "head"
echo "Pending migrations:"
alembic history --verbose | grep -A 5 "head"
4. Run migrations with timeout
4. Run migrations with timeout
echo "Running migrations..."
timeout 300 alembic upgrade head || {
echo "ERROR: Migration failed or timed out!"
echo "Restoring from backup..."
psql "$DATABASE_URL" < "$BACKUP_FILE"
exit 1
}
echo "Running migrations..."
timeout 300 alembic upgrade head || {
echo "ERROR: Migration failed or timed out!"
echo "Restoring from backup..."
psql "$DATABASE_URL" < "$BACKUP_FILE"
exit 1
}
5. Verify migration success
5. Verify migration success
echo "Verifying migration status..."
CURRENT_REV=$(alembic current | grep "Rev:" | awk '{print $2}')
HEAD_REV=$(alembic heads | awk '{print $1}')
if [ "$CURRENT_REV" != "$HEAD_REV" ]; then
echo "ERROR: Migration incomplete. Current: $CURRENT_REV, Expected: $HEAD_REV"
echo "Restoring from backup..."
psql "$DATABASE_URL" < "$BACKUP_FILE"
exit 1
fi
echo "Migration completed successfully!"
echo "Current revision: $CURRENT_REV"
echo "Verifying migration status..."
CURRENT_REV=$(alembic current | grep "Rev:" | awk '{print $2}')
HEAD_REV=$(alembic heads | awk '{print $1}')
if [ "$CURRENT_REV" != "$HEAD_REV" ]; then
echo "ERROR: Migration incomplete. Current: $CURRENT_REV, Expected: $HEAD_REV"
echo "Restoring from backup..."
psql "$DATABASE_URL" < "$BACKUP_FILE"
exit 1
fi
echo "Migration completed successfully!"
echo "Current revision: $CURRENT_REV"
6. Cleanup old backups (keep last 10)
6. Cleanup old backups (keep last 10)
echo "Cleaning up old backups..."
ls -t "$BACKUP_DIR"/*.sql | tail -n +11 | xargs -r rm
echo "Deployment complete!"
undefinedecho "Cleaning up old backups..."
ls -t "$BACKUP_DIR"/*.sql | tail -n +11 | xargs -r rm
echo "Deployment complete!"
undefinedProduction Best Practices
生产环境最佳实践
Pre-Deployment Checklist
部署前检查清单
- Migration tested in development environment
- Migration tested in staging with production-like data
- Migration reviewed by at least one team member
- Downgrade path tested and verified
- Performance impact assessed for large tables
- Database backup plan in place
- Rollback procedure documented
- Maintenance window scheduled (if needed)
- Team notified of deployment
- Monitoring alerts configured
- 迁移已在开发环境中测试
- 迁移已在类生产数据的预发布环境中测试
- 迁移已至少经过一名团队成员评审
- 回滚路径已测试并验证
- 已评估对大型表的性能影响
- 已制定数据库备份计划
- 已记录回滚流程
- 已安排维护窗口(若需要)
- 已通知团队部署事宜
- 已配置监控告警
Zero-Downtime Migrations
零停机迁移
For critical support systems that can't go offline:
Phase 1: Additive Changes
python
"""add new column (phase 1)
Revision ID: zd001
"""
def upgrade() -> None:
# Add new column as nullable
op.add_column('tickets',
sa.Column('new_field', sa.String(100), nullable=True)
)
def downgrade() -> None:
op.drop_column('tickets', 'new_field')Phase 2: Data Migration (Background)
python
"""populate new column (phase 2)
Revision ID: zd002
"""
def upgrade() -> None:
# Update in small batches during low-traffic periods
connection = op.get_bind()
batch_size = 100
while True:
result = connection.execute(
"""
UPDATE tickets
SET new_field = calculate_value(old_field)
WHERE new_field IS NULL
LIMIT {batch_size}
""".format(batch_size=batch_size)
)
if result.rowcount == 0:
break
# Small delay to reduce database load
import time
time.sleep(0.1)
def downgrade() -> None:
connection = op.get_bind()
connection.execute("UPDATE tickets SET new_field = NULL")Phase 3: Make Required
python
"""make new column required (phase 3)
Revision ID: zd003
"""
def upgrade() -> None:
# Now that all rows have values, make it non-nullable
op.alter_column('tickets', 'new_field',
nullable=False,
server_default='default_value'
)
def downgrade() -> None:
op.alter_column('tickets', 'new_field',
nullable=True,
server_default=None
)Phase 4: Remove Old Column (Optional)
python
"""remove old column (phase 4)
Revision ID: zd004
"""
def upgrade() -> None:
op.drop_column('tickets', 'old_field')
def downgrade() -> None:
op.add_column('tickets',
sa.Column('old_field', sa.String(100), nullable=True)
)对于无法离线的关键支持系统:
阶段1:添加式变更
python
"""add new column (phase 1)
Revision ID: zd001
"""
def upgrade() -> None:
# Add new column as nullable
op.add_column('tickets',
sa.Column('new_field', sa.String(100), nullable=True)
)
def downgrade() -> None:
op.drop_column('tickets', 'new_field')阶段2:后台数据迁移
python
"""populate new column (phase 2)
Revision ID: zd002
"""
def upgrade() -> None:
# Update in small batches during low-traffic periods
connection = op.get_bind()
batch_size = 100
while True:
result = connection.execute(
"""
UPDATE tickets
SET new_field = calculate_value(old_field)
WHERE new_field IS NULL
LIMIT {batch_size}
""".format(batch_size=batch_size)
)
if result.rowcount == 0:
break
# Small delay to reduce database load
import time
time.sleep(0.1)
def downgrade() -> None:
connection = op.get_bind()
connection.execute("UPDATE tickets SET new_field = NULL")阶段3:设为必填项
python
"""make new column required (phase 3)
Revision ID: zd003
"""
def upgrade() -> None:
# Now that all rows have values, make it non-nullable
op.alter_column('tickets', 'new_field',
nullable=False,
server_default='default_value'
)
def downgrade() -> None:
op.alter_column('tickets', 'new_field',
nullable=True,
server_default=None
)阶段4:移除旧列(可选)
python
"""remove old column (phase 4)
Revision ID: zd004
"""
def upgrade() -> None:
op.drop_column('tickets', 'old_field')
def downgrade() -> None:
op.add_column('tickets',
sa.Column('old_field', sa.String(100), nullable=True)
)Handling Migration Failures
处理迁移失败
python
undefinedpython
undefinedalembic/env.py additions for error handling
alembic/env.py additions for error handling
from alembic import context
import logging
logger = logging.getLogger('alembic.env')
def run_migrations_online():
"""Run migrations in 'online' mode with error handling"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix='sqlalchemy.',
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
transaction_per_migration=True, # Rollback individual migrations
compare_type=True,
compare_server_default=True
)
try:
with context.begin_transaction():
context.run_migrations()
except Exception as e:
logger.error(f"Migration failed: {e}")
logger.error("Rolling back transaction...")
# Transaction automatically rolled back
raise
else:
logger.info("Migration completed successfully")undefinedfrom alembic import context
import logging
logger = logging.getLogger('alembic.env')
def run_migrations_online():
"""Run migrations in 'online' mode with error handling"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix='sqlalchemy.',
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
transaction_per_migration=True, # Rollback individual migrations
compare_type=True,
compare_server_default=True
)
try:
with context.begin_transaction():
context.run_migrations()
except Exception as e:
logger.error(f"Migration failed: {e}")
logger.error("Rolling back transaction...")
# Transaction automatically rolled back
raise
else:
logger.info("Migration completed successfully")undefinedAdvanced Configuration
高级配置
Custom Migration Template
自定义迁移模板
Create custom template for your organization:
python
undefined为你的组织创建自定义模板:
python
undefinedalembic/script.py.mako
alembic/script.py.mako
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
Author: ${author if author else 'Support Team'}
Jira: ${jira_ticket if jira_ticket else 'N/A'}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
Author: ${author if author else 'Support Team'}
Jira: ${jira_ticket if jira_ticket else 'N/A'}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
revision identifiers, used by Alembic.
revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
"""Apply migration changes"""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Revert migration changes"""
${downgrades if downgrades else "pass"}
undefinedrevision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
"""Apply migration changes"""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Revert migration changes"""
${downgrades if downgrades else "pass"}
undefinedMulti-Database Support
多数据库支持
For systems with separate databases (e.g., main DB + analytics):
python
undefined对于拥有独立数据库的系统(例如主数据库+分析数据库):
python
undefinedalembic/env.py for multiple databases
alembic/env.py for multiple databases
def run_migrations_online():
"""Run migrations for multiple databases"""
# Configuration for each database
engines = {
'main': {
'url': os.getenv('MAIN_DB_URL'),
'target_metadata': main_metadata
},
'analytics': {
'url': os.getenv('ANALYTICS_DB_URL'),
'target_metadata': analytics_metadata
}
}
for name, config in engines.items():
logger.info(f"Running migrations for {name} database")
engine = create_engine(config['url'])
with engine.connect() as connection:
context.configure(
connection=connection,
target_metadata=config['target_metadata'],
upgrade_token=f"{name}_upgrade",
downgrade_token=f"{name}_downgrade"
)
with context.begin_transaction():
context.run_migrations(engine_name=name)undefineddef run_migrations_online():
"""Run migrations for multiple databases"""
# Configuration for each database
engines = {
'main': {
'url': os.getenv('MAIN_DB_URL'),
'target_metadata': main_metadata
},
'analytics': {
'url': os.getenv('ANALYTICS_DB_URL'),
'target_metadata': analytics_metadata
}
}
for name, config in engines.items():
logger.info(f"Running migrations for {name} database")
engine = create_engine(config['url'])
with engine.connect() as connection:
context.configure(
connection=connection,
target_metadata=config['target_metadata'],
upgrade_token=f"{name}_upgrade",
downgrade_token=f"{name}_downgrade"
)
with context.begin_transaction():
context.run_migrations(engine_name=name)undefinedTroubleshooting
故障排除
Common Issues and Solutions
常见问题与解决方案
Multiple Heads Error
bash
undefined多Head错误
bash
undefinedProblem: "Multiple heads exist"
Problem: "Multiple heads exist"
Solution: Merge the branches
Solution: Merge the branches
alembic merge heads -m "merge branches"
**Migration Out of Sync**
```bashalembic merge heads -m "merge branches"
**迁移不同步**
```bashProblem: Database revision doesn't match migration history
Problem: Database revision doesn't match migration history
Solution: Stamp database to specific revision
Solution: Stamp database to specific revision
alembic stamp head
alembic stamp head
Or stamp to specific revision
Or stamp to specific revision
alembic stamp abc123
**Failed Migration Cleanup**
```bashalembic stamp abc123
**迁移失败后的清理**
```bashProblem: Migration failed midway
Problem: Migration failed midway
Solution: Manual cleanup
Solution: Manual cleanup
1. Check current state
1. Check current state
alembic current
alembic current
2. Manually fix database issues
2. Manually fix database issues
psql $DATABASE_URL
psql $DATABASE_URL
3. Stamp to correct revision
3. Stamp to correct revision
alembic stamp previous_working_revision
alembic stamp previous_working_revision
4. Try migration again
4. Try migration again
alembic upgrade head
**Circular Dependencies**
```bashalembic upgrade head
**循环依赖**
```bashProblem: "Circular dependency detected"
Problem: "Circular dependency detected"
Solution: Use depends_on instead of down_revision
Solution: Use depends_on instead of down_revision
alembic revision -m "fix circular dependency"
--head=branch_a@head
--depends-on=branch_b_revision
--head=branch_a@head
--depends-on=branch_b_revision
undefinedalembic revision -m "fix circular dependency"
--head=branch_a@head
--depends-on=branch_b_revision
--head=branch_a@head
--depends-on=branch_b_revision
undefinedSummary
总结
This skill covered comprehensive Alembic usage for customer support systems:
- Setup: Installation, configuration, and initialization
- Creating Migrations: Manual and autogenerated approaches
- Data Migrations: Transforming data during schema changes
- Running Migrations: Upgrade, downgrade, and status commands
- Branching: Managing parallel development streams
- Testing: Unit, integration, and performance testing
- CI/CD: Automation and deployment strategies
- Production: Zero-downtime migrations and best practices
- Advanced: Custom templates and multi-database support
- Troubleshooting: Common issues and solutions
Always remember:
- Review autogenerated migrations
- Test migrations thoroughly before production
- Keep backups before major migrations
- Plan for rollback scenarios
- Monitor migration performance
- Document complex migrations
For more examples, see EXAMPLES.md in this skill package.
本Skill涵盖了客户支持系统中Alembic的全面使用方法:
- 设置:安装、配置与初始化
- 创建迁移:手动与自动生成两种方式
- 数据迁移:架构变更期间的数据转换
- 执行迁移:升级、回滚与状态命令
- 分支管理:并行开发流的管理
- 测试:单元、集成与性能测试
- CI/CD:自动化与部署策略
- 生产环境:零停机迁移与最佳实践
- 高级功能:自定义模板与多数据库支持
- 故障排除:常见问题与解决方案
请始终记住:
- 评审自动生成的迁移
- 生产部署前彻底测试迁移
- 重大迁移前保留备份
- 规划回滚场景
- 监控迁移性能
- 记录复杂迁移
更多示例,请查看本Skill包中的EXAMPLES.md。