Loading...
Loading...
Compare original and translation side by side
| Source | Gather |
|---|---|
| Codebase | Existing models, database setup, connection patterns |
| Conversation | Student's specific use case (what they're building), constraints |
| Skill References | Domain patterns from |
| User Guidelines | Project conventions, proficiency level |
| 来源 | 需要收集的信息 |
|---|---|
| 代码库 | 现有模型、数据库配置、连接模式 |
| 对话内容 | 学生的具体使用场景(正在构建的项目)、约束条件 |
| 技能参考资料 | |
| 用户指南 | 项目约定、学生熟练程度 |
from sqlalchemy import Column, Integer, String, Float, ForeignKey
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class Expense(Base):
__tablename__ = 'expenses'
id = Column(Integer, primary_key=True)
description = Column(String(200))
amount = Column(Float)
category_id = Column(Integer, ForeignKey('categories.id'))from sqlalchemy import Column, Integer, String, Float, ForeignKey
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class Expense(Base):
__tablename__ = 'expenses'
id = Column(Integer, primary_key=True)
description = Column(String(200))
amount = Column(Float)
category_id = Column(Integer, ForeignKey('categories.id'))with Session(engine) as session:
new_expense = Expense(description="Groceries", amount=45.50, category_id=1)
session.add(new_expense)
session.commit() # All or nothingwith Session(engine) as session:
new_expense = Expense(description="Groceries", amount=45.50, category_id=1)
session.add(new_expense)
session.commit() # 要么全部成功,要么全部失败class Category(Base):
__tablename__ = 'categories'
id = Column(Integer, primary_key=True)
name = Column(String(50))
expenses = relationship("Expense", back_populates="category")
class Expense(Base):
__tablename__ = 'expenses'
id = Column(Integer, primary_key=True)
category = relationship("Category", back_populates="expenses")category = session.query(Category).first()
print(category.expenses) # All expenses in this categoryclass Category(Base):
__tablename__ = 'categories'
id = Column(Integer, primary_key=True)
name = Column(String(50))
expenses = relationship("Expense", back_populates="category")
class Expense(Base):
__tablename__ = 'expenses'
id = Column(Integer, primary_key=True)
category = relationship("Category", back_populates="expenses")category = session.query(Category).first()
print(category.expenses) # 获取该分类下的所有支出记录undefinedundefinedundefinedundefinedpostgresql+psycopg2://user:pass@host/dbname?sslmode=require?sslmode=require.envpostgresql+psycopg2://user:pass@host/dbname?sslmode=require?sslmode=require.env| Scenario | Pattern | Why |
|---|---|---|
| First database model | Single table, one Column type | Simplest mental model before relationships |
| Need to link data | Use relationship() + ForeignKey | ORM handles complex joins for you |
| Many concurrent requests | Connection pooling with pool_size | Neon scales compute; pooling maximizes it |
| Data consistency critical | Transactions with try/except | Rollback on error; guarantees atomicity |
| Want to scale to zero | Neon serverless + pool with echo_pool | Auto-pause when idle; wake on first request |
| Debugging queries | Enable echo=True in engine | See generated SQL |
| 场景 | 适用模式 | 原因 |
|---|---|---|
| 首次创建数据库模型 | 单表、单一列类型 | 在学习关联关系前,先建立最简单的认知模型 |
| 需要关联不同数据 | 使用relationship() + ForeignKey | ORM会自动处理复杂的连接查询 |
| 高并发请求场景 | 配置连接池pool_size | Neon可扩缩容计算资源;连接池能最大化资源利用率 |
| 数据一致性要求高 | 带try/except的事务 | 出错时回滚;保证原子性 |
| 需要缩容至零 | Neon无服务器 + 带echo_pool的连接池 | 空闲时自动暂停;首次请求时唤醒 |
| 调试查询语句 | 在engine中启用echo=True | 查看自动生成的SQL语句 |
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, func
from sqlalchemy.orm import declarative_base, relationship, Session
from datetime import datetime
Base = declarative_base()
class Category(Base):
__tablename__ = 'categories'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True)
expenses = relationship("Expense", back_populates="category")
class Expense(Base):
__tablename__ = 'expenses'
id = Column(Integer, primary_key=True)
description = Column(String(200))
amount = Column(Float)
date = Column(DateTime, default=datetime.utcnow)
category_id = Column(Integer, ForeignKey('categories.id'))
category = relationship("Category", back_populates="expenses")from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey, func
from sqlalchemy.orm import declarative_base, relationship, Session
from datetime import datetime
Base = declarative_base()
class Category(Base):
__tablename__ = 'categories'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True)
expenses = relationship("Expense", back_populates="category")
class Expense(Base):
__tablename__ = 'expenses'
id = Column(Integer, primary_key=True)
description = Column(String(200))
amount = Column(Float)
date = Column(DateTime, default=datetime.utcnow)
category_id = Column(Integer, ForeignKey('categories.id'))
category = relationship("Category", back_populates="expenses")import os
from dotenv import load_dotenv
load_dotenv()import os
from dotenv import load_dotenv
load_dotenv()undefinedundefineddef create_expense(session, description, amount, category_id):
"""Create a new expense."""
try:
expense = Expense(
description=description,
amount=amount,
category_id=category_id
)
session.add(expense)
session.commit()
return expense
except Exception as e:
session.rollback()
print(f"Error creating expense: {e}")
return None
def read_expenses(session, category_id=None):
"""Read expenses, optionally filtered by category."""
query = session.query(Expense)
if category_id:
query = query.filter(Expense.category_id == category_id)
return query.all()
def update_expense(session, expense_id, amount=None, description=None):
"""Update an expense."""
expense = session.query(Expense).filter(Expense.id == expense_id).first()
if expense:
if amount is not None:
expense.amount = amount
if description is not None:
expense.description = description
session.commit()
return expense
return None
def delete_expense(session, expense_id):
"""Delete an expense."""
expense = session.query(Expense).filter(Expense.id == expense_id).first()
if expense:
session.delete(expense)
session.commit()
return True
return Falsedef create_expense(session, description, amount, category_id):
"""创建新的支出记录。"""
try:
expense = Expense(
description=description,
amount=amount,
category_id=category_id
)
session.add(expense)
session.commit()
return expense
except Exception as e:
session.rollback()
print(f"创建支出记录出错:{e}")
return None
def read_expenses(session, category_id=None):
"""读取支出记录,可按分类过滤。"""
query = session.query(Expense)
if category_id:
query = query.filter(Expense.category_id == category_id)
return query.all()
def update_expense(session, expense_id, amount=None, description=None):
"""更新支出记录。"""
expense = session.query(Expense).filter(Expense.id == expense_id).first()
if expense:
if amount is not None:
expense.amount = amount
if description is not None:
expense.description = description
session.commit()
return expense
return None
def delete_expense(session, expense_id):
"""删除支出记录。"""
expense = session.query(Expense).filter(Expense.id == expense_id).first()
if expense:
session.delete(expense)
session.commit()
return True
return Falseundefinedundefinedundefinedundefineddef transfer_expense(session, expense_id, new_category_id):
"""Move expense to different category (must succeed fully or not at all)."""
try:
expense = session.query(Expense).filter(Expense.id == expense_id).first()
if not expense:
raise ValueError(f"Expense {expense_id} not found")
expense.category_id = new_category_id
session.commit()
return True
except Exception as e:
session.rollback()
print(f"Transaction failed, rolled back: {e}")
return Falsedef transfer_expense(session, expense_id, new_category_id):
"""将支出记录转移到其他分类(必须完全成功或完全失败)。"""
try:
expense = session.query(Expense).filter(Expense.id == expense_id).first()
if not expense:
raise ValueError(f"未找到支出记录 {expense_id}")
expense.category_id = new_category_id
session.commit()
return True
except Exception as e:
session.rollback()
print(f"事务失败,已回滚:{e}")
return FalseDATABASE_URL=postgresql+psycopg2://user:password@ep-ABC123.neon.tech/dbname?sslmode=requirefrom sqlalchemy.pool import QueuePool
engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=5,
max_overflow=10,
pool_pre_ping=True, # Verify connections before use
echo=False # Set to True for debugging
)DATABASE_URL=postgresql+psycopg2://user:password@ep-ABC123.neon.tech/dbname?sslmode=requirefrom sqlalchemy.pool import QueuePool
engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=5,
max_overflow=10,
pool_pre_ping=True, # 使用前验证连接有效性
echo=False # 调试时设置为True
)undefinedundefined
Register these as MCP tools so the agent (Budget Manager) can use them.
---
将这些函数注册为MCP工具,以便预算管理AI代理调用。
---.envsession.commit()session.rollback()pool_pre_ping=True?sslmode=require.envsession.commit()session.rollback()pool_pre_ping=True?sslmode=require| Mistake | Impact | Fix |
|---|---|---|
Forgetting | Changes not saved | Always call commit() or use context manager |
| Not rolling back on error | Partial data in database | Wrap in try/except with rollback() |
| Hardcoding credentials | Security breach | Use environment variables |
| No connection pooling | Neon compute scaling inefficient | Set |
| Raw user input in queries | SQL injection | Use parameterized queries (ORM does this) |
| 错误 | 影响 | 修复方案 |
|---|---|---|
忘记调用 | 变更未保存 | 始终调用commit()或使用上下文管理器 |
| 出错时未回滚 | 数据库中存在部分数据 | 包裹在try/except块中并调用rollback() |
| 硬编码凭证 | 安全漏洞 | 使用环境变量 |
| 未配置连接池 | Neon计算资源扩缩容效率低下 | 设置 |
| 查询中使用原始用户输入 | SQL注入风险 | 使用参数化查询(ORM已自动处理) |
references/budget-tracker-complete.py| 问题 | 原因 | 解决方案 |
|---|---|---|
| 未安装SQLAlchemy | 执行 |
| 缺少PostgreSQL驱动 | 执行 |
| 连接字符串错误或Neon服务离线 | 检查 |
| 插入重复的唯一字段值 | 检查值是否已存在,改用更新操作 |
| 关联的分类不存在 | 先创建分类,或使用有效的category_id |
| 查询速度慢 | 未创建索引、缺少关联关系 | 查看 |
| Problem | Cause | Solution |
|---|---|---|
| Not installed | |
| PostgreSQL driver missing | |
| Wrong connection string or Neon offline | Check |
| Inserting duplicate unique field | Check if value already exists, use update instead |
| Category doesn't exist | Create category first, or use valid category_id |
| Queries are slow | No indexes, missing relationships | Check |