ai-querying-databases
# Build AI That Answers Questions About Your Database
Guide the user through building an AI that takes plain English questions and returns answers from a SQL database. The pattern: understand the schema, generate SQL, validate it, run it, and explain the results.

## When you need this

- Sales reps asking "how many deals closed last month?" without writing SQL
- Executives asking revenue questions in plain English
- Support agents looking up customer records by description
- Internal data assistants for non-technical staff
- Any "chat with your database" feature
## How it's different from document search

| | Document search | Database querying (this skill) |
|---|---|---|
| Data type | Unstructured text (PDFs, articles, docs) | Structured data (tables, rows, columns) |
| How it works | Embed + retrieve passages | Understand schema + generate SQL |
| Output | Text answer grounded in passages | Data from query results + interpretation |
| Key challenge | Finding relevant passages | Writing correct, safe SQL |
## Step 1: Understand the setup

Ask the user:

- What database? (Postgres, MySQL, SQLite, Snowflake, BigQuery, etc.)
- What tables matter? (all of them, or a subset?)
- Who asks questions? (technical users, business users, customers?)
- Read-only access? (this should always be yes for AI-generated SQL)
## Step 2: Connect to your database

Use SQLAlchemy for provider-agnostic database access:

```python
from sqlalchemy import create_engine, inspect, text

# PostgreSQL
engine = create_engine("postgresql://user:pass@host:5432/mydb")

# MySQL
engine = create_engine("mysql+pymysql://user:pass@host:3306/mydb")

# SQLite (for development)
engine = create_engine("sqlite:///local.db")

# Snowflake
engine = create_engine("snowflake://user:pass@account/db/schema")

# BigQuery
engine = create_engine("bigquery://project/dataset")
```
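The connection strings above embed credentials inline for brevity; in practice they usually come from the environment, which also satisfies the "No raw credentials" item in the Step 8 checklist. A minimal sketch, assuming hypothetical `DB_*` environment variable names:

```python
import os
from urllib.parse import quote_plus

def database_url_from_env(prefix="DB"):
    """Compose a SQLAlchemy URL from environment variables so credentials
    never appear in source code. The variable names are illustrative."""
    user = quote_plus(os.environ[f"{prefix}_USER"])
    password = quote_plus(os.environ[f"{prefix}_PASSWORD"])  # escape @ : / etc.
    host = os.environ.get(f"{prefix}_HOST", "localhost")
    port = os.environ.get(f"{prefix}_PORT", "5432")
    name = os.environ[f"{prefix}_NAME"]
    return f"postgresql://{user}:{password}@{host}:{port}/{name}"
```

Percent-encoding the password matters: characters like `@` or `:` in a raw password would otherwise break URL parsing.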
### Build schema descriptions for the AI

The AI needs to understand your tables to write correct SQL:

```python
def get_schema_description(engine, tables=None):
    """Build a text description of database schema for the AI."""
    inspector = inspect(engine)
    tables = tables or inspector.get_table_names()
    descriptions = []
    for table in tables:
        columns = inspector.get_columns(table)
        col_descs = []
        for col in columns:
            col_descs.append(f"  - {col['name']} ({col['type']})")
        pk = inspector.get_pk_constraint(table)
        pk_cols = pk['constrained_columns'] if pk else []
        desc = f"Table: {table}\n"
        if pk_cols:
            desc += f"  Primary key: {', '.join(pk_cols)}\n"
        desc += "  Columns:\n" + "\n".join(col_descs)
        descriptions.append(desc)
    return "\n\n".join(descriptions)

schema = get_schema_description(engine)
print(schema)
```

### Add business context (optional but helpful)
Raw column names like `cust_ltv_90d` don't mean much to the AI. Add descriptions:

```python
TABLE_DESCRIPTIONS = {
    "orders": "Customer orders with amounts, dates, and status",
    "customers": "Customer profiles with contact info and signup date",
    "products": "Product catalog with names, prices, and categories",
}

COLUMN_DESCRIPTIONS = {
    "orders.cust_ltv_90d": "Customer lifetime value over the last 90 days in USD",
    "orders.gmv": "Gross merchandise value (total order amount before discounts)",
}

def get_enriched_schema(engine, table_descs=None, col_descs=None):
    """Schema description with business context."""
    inspector = inspect(engine)
    table_descs = table_descs or {}
    col_descs = col_descs or {}
    descriptions = []
    for table in inspector.get_table_names():
        desc = f"Table: {table}"
        if table in table_descs:
            desc += f" -- {table_descs[table]}"
        desc += "\n  Columns:\n"
        for col in inspector.get_columns(table):
            col_key = f"{table}.{col['name']}"
            col_desc = f"  - {col['name']} ({col['type']})"
            if col_key in col_descs:
                col_desc += f" -- {col_descs[col_key]}"
            desc += col_desc + "\n"
        descriptions.append(desc)
    return "\n".join(descriptions)
```

## Step 3: Build the text-to-SQL pipeline
Two-stage approach: first pick the relevant tables, then generate SQL.

### Stage 1: Table selection (for databases with many tables)

```python
import dspy

class SelectTables(dspy.Signature):
    """Given a database schema and a user question, select which tables
    are needed to answer the question."""
    schema: str = dspy.InputField(desc="Database schema description")
    question: str = dspy.InputField(desc="User's question in plain English")
    tables: list[str] = dspy.OutputField(desc="List of table names needed")
    reasoning: str = dspy.OutputField(desc="Why these tables are needed")
```

### Stage 2: SQL generation

```python
class GenerateSQL(dspy.Signature):
    """Write a SQL SELECT query to answer the user's question.
    Only use tables and columns that exist in the schema."""
    schema: str = dspy.InputField(desc="Database schema for relevant tables")
    question: str = dspy.InputField(desc="User's question in plain English")
    sql: str = dspy.OutputField(desc="SQL SELECT query (read-only, no mutations)")
```

### The full pipeline

```python
class DatabaseQA(dspy.Module):
    def __init__(self, engine, schema, use_table_selection=False):
        self.engine = engine
        self.full_schema = schema
        self.use_table_selection = use_table_selection
        if use_table_selection:
            self.select_tables = dspy.ChainOfThought(SelectTables)
        self.generate_sql = dspy.ChainOfThought(GenerateSQL)
        self.interpret = dspy.ChainOfThought(InterpretResults)

    def forward(self, question):
        # Pick relevant tables (for large schemas)
        if self.use_table_selection:
            selected = self.select_tables(
                schema=self.full_schema, question=question
            )
            schema = filter_schema(self.full_schema, selected.tables)
        else:
            schema = self.full_schema
        # Generate SQL
        result = self.generate_sql(schema=schema, question=question)
        sql = result.sql.strip().rstrip(";")
        # Validate (see Step 4)
        validate_sql(sql)
        # Execute
        rows = execute_query(self.engine, sql)
        # Interpret results
        interpretation = self.interpret(
            question=question, sql=sql, results=str(rows[:20])
        )
        return dspy.Prediction(
            sql=sql, rows=rows, answer=interpretation.answer
        )
```

### Helper: filter schema to selected tables
```python
def filter_schema(full_schema, table_names):
    """Keep only the schema sections for selected tables."""
    sections = full_schema.split("\n\n")
    filtered = []
    for section in sections:
        for table in table_names:
            # Match the header exactly (followed by a newline or " -- ")
            # so "order" doesn't also match the "orders" section.
            header = f"Table: {table}"
            if section == header or section.startswith((header + "\n", header + " ")):
                filtered.append(section)
                break
    return "\n\n".join(filtered)
```
## Step 4: Validate SQL before execution

Never run AI-generated SQL without validation. Use `dspy.Assert` for hard constraints and `dspy.Suggest` for style guidance:

```python
import sqlparse

def validate_sql(sql):
    """Validate AI-generated SQL is safe and well-formed."""
    sql_upper = sql.strip().upper()
    # Hard safety constraints
    dspy.Assert(
        sql_upper.startswith("SELECT"),
        "Only SELECT queries are allowed. Do not generate INSERT, UPDATE, DELETE, DROP, or ALTER."
    )
    dangerous = ["INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE", "EXEC"]
    for keyword in dangerous:
        dspy.Assert(
            keyword not in sql_upper.split("SELECT", 1)[0],
            f"Query contains forbidden keyword: {keyword}"
        )
    # Syntax check
    parsed = sqlparse.parse(sql)
    dspy.Assert(len(parsed) == 1, "Generate exactly one SQL statement")
    dspy.Assert(
        parsed[0].get_type() == "SELECT",
        "Only SELECT statements are allowed"
    )
    # Style suggestions (soft constraints -- the AI will retry if possible)
    dspy.Suggest(
        "JOIN" not in sql_upper or "ON" in sql_upper,
        "Use explicit JOIN ... ON syntax instead of implicit joins in WHERE"
    )
```
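`dspy.Assert` is designed to fail inside a compiled program so the model can retry; for unit-testing the safety rules themselves, the same hard constraints can be mirrored in a dependency-free predicate. A sketch, where `is_safe_select` is a hypothetical helper rather than part of the pipeline above:

```python
import re

def is_safe_select(sql: str) -> bool:
    """Mirror the hard constraints from validate_sql() without DSPy:
    a single statement that starts with SELECT and contains no
    mutating keywords. Note: a semicolon inside a string literal would
    fool this split; use a real SQL parser (e.g. sqlparse) in production."""
    statements = [s for s in sql.split(";") if s.strip()]
    if len(statements) != 1:
        return False
    body = statements[0].strip().upper()
    if not body.startswith("SELECT"):
        return False
    dangerous = ["INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE", "EXEC"]
    # Word-boundary matching avoids false hits on columns like updated_at
    return not any(re.search(rf"\b{k}\b", body) for k in dangerous)
```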
### Execute with safety limits

```python
from sqlalchemy import text

def execute_query(engine, sql, row_limit=100, timeout_seconds=30):
    """Execute validated SQL with safety limits."""
    # Add row limit if not present (naive substring check)
    if "LIMIT" not in sql.upper():
        sql = f"{sql} LIMIT {row_limit}"
    with engine.connect() as conn:
        # Statement-timeout support varies by dialect; if this option is
        # ignored, configure the timeout at the driver or server level
        conn = conn.execution_options(timeout=timeout_seconds)
        result = conn.execute(text(sql))
        columns = list(result.keys())
        rows = [dict(zip(columns, row)) for row in result.fetchall()]
    return rows
```
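One caveat: the substring check `"LIMIT" not in sql.upper()` skips appending a limit whenever any identifier contains the word, e.g. a `credit_limit` column. A slightly tighter stdlib alternative (a sketch; `ensure_row_limit` is a hypothetical helper, and a real SQL parser is more robust still):

```python
import re

def ensure_row_limit(sql, row_limit=100):
    """Append a LIMIT clause unless the statement already ends with one.
    Matching a trailing LIMIT [OFFSET] clause avoids false positives on
    identifiers such as credit_limit."""
    if re.search(r"\bLIMIT\s+\d+(\s+OFFSET\s+\d+)?\s*$", sql, flags=re.IGNORECASE):
        return sql
    return f"{sql} LIMIT {row_limit}"
```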
## Step 5: Interpret results

Convert raw query results back to a natural language answer:

```python
class InterpretResults(dspy.Signature):
    """Convert SQL query results into a clear, natural language answer
    to the user's original question."""
    question: str = dspy.InputField(desc="The user's original question")
    sql: str = dspy.InputField(desc="The SQL query that was run")
    results: str = dspy.InputField(desc="Query results as a string")
    answer: str = dspy.OutputField(desc="Natural language answer to the question")
```
## Step 6: Handle large schemas

For databases with 50+ tables, sending the full schema to the AI is expensive and confusing. Use embedding-based schema retrieval:

```python
import chromadb

def build_schema_index(engine, table_descriptions=None):
    """Build a searchable index of table schemas."""
    client = chromadb.PersistentClient(path="./schema_index")
    collection = client.get_or_create_collection("table_schemas")
    inspector = inspect(engine)
    table_descriptions = table_descriptions or {}
    for table in inspector.get_table_names():
        columns = inspector.get_columns(table)
        col_names = [c["name"] for c in columns]
        # Searchable description
        desc = table_descriptions.get(table, table)
        searchable = f"{table}: {desc}. Columns: {', '.join(col_names)}"
        collection.upsert(
            documents=[searchable],
            ids=[table],
            metadatas=[{"table": table}],
        )
    return collection

class SchemaRetriever(dspy.Retrieve):
    """Retrieve relevant table schemas based on the question."""
    def __init__(self, collection, engine, k=5):
        super().__init__(k=k)
        self.collection = collection
        self.engine = engine

    def forward(self, query, k=None):
        k = k or self.k
        results = self.collection.query(query_texts=[query], n_results=k)
        # Get full schema for matched tables
        tables = [m["table"] for m in results["metadatas"][0]]
        schema = get_schema_description(self.engine, tables=tables)
        return dspy.Prediction(passages=[schema])
```

Then use it in your pipeline:

```python
class LargeSchemaQA(dspy.Module):
    def __init__(self, engine, schema_collection):
        self.engine = engine
        self.schema_retriever = SchemaRetriever(schema_collection, engine, k=5)
        self.generate_sql = dspy.ChainOfThought(GenerateSQL)
        self.interpret = dspy.ChainOfThought(InterpretResults)

    def forward(self, question):
        schema = self.schema_retriever(question).passages[0]
        result = self.generate_sql(schema=schema, question=question)
        sql = result.sql.strip().rstrip(";")
        validate_sql(sql)
        rows = execute_query(self.engine, sql)
        interpretation = self.interpret(
            question=question, sql=sql, results=str(rows[:20])
        )
        return dspy.Prediction(sql=sql, rows=rows, answer=interpretation.answer)
```
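If a vector store feels heavy for a few dozen tables, plain token overlap between the question and each table's searchable description is a workable first cut. A sketch; `rank_tables_by_overlap` is a hypothetical stand-in for the Chroma query, not part of the pipeline above:

```python
import re

def rank_tables_by_overlap(question, table_docs, k=5):
    """Score each table by how many question tokens appear in its
    searchable description; return the top-k table names.
    table_docs maps table name -> description string."""
    q_tokens = set(re.findall(r"[a-z0-9_]+", question.lower()))
    scores = {}
    for table, doc in table_docs.items():
        doc_tokens = set(re.findall(r"[a-z0-9_]+", doc.lower()))
        scores[table] = len(q_tokens & doc_tokens)
    # Sort by score (descending), then name for deterministic ties
    ranked = sorted(scores, key=lambda t: (-scores[t], t))
    return ranked[:k]
```

Embeddings handle synonyms ("revenue" vs "gmv") far better; token overlap is only a dependency-free baseline.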
## Step 7: Test and optimize

### SQL execution accuracy metric

```python
def sql_accuracy(example, prediction, trace=None):
    """Check if the generated SQL returns the correct answer."""
    try:
        # Compare results (not SQL text -- many valid SQL queries per question)
        expected = set(str(r) for r in example.expected_rows)
        actual = set(str(r) for r in prediction.rows)
        return float(expected == actual)
    except Exception:
        return 0.0

def answer_quality(example, prediction, trace=None):
    """Check if the natural language answer is correct."""
    judge = dspy.Predict("question, expected_answer, predicted_answer -> is_correct: bool")
    result = judge(
        question=example.question,
        expected_answer=example.answer,
        predicted_answer=prediction.answer,
    )
    return float(result.is_correct)
```
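The result-set comparison is deliberately order-insensitive: two queries that sort differently still score 1.0. Restated inline (with `SimpleNamespace` standing in for the example and prediction objects) so it can be checked without a database:

```python
from types import SimpleNamespace

def sql_accuracy(example, prediction, trace=None):
    """Same comparison as the metric above: rows as a set of strings, so
    row order doesn't matter. Note: sets also collapse duplicate rows."""
    try:
        expected = set(str(r) for r in example.expected_rows)
        actual = set(str(r) for r in prediction.rows)
        return float(expected == actual)
    except Exception:
        return 0.0

example = SimpleNamespace(expected_rows=[{"month": "2024-05", "orders": 1247}])
hit = SimpleNamespace(rows=[{"month": "2024-05", "orders": 1247}])
miss = SimpleNamespace(rows=[{"month": "2024-05", "orders": 99}])
```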
### Build training data

```python
trainset = [
    dspy.Example(
        question="How many orders were placed last month?",
        answer="There were 1,247 orders placed last month.",
        expected_sql="SELECT COUNT(*) FROM orders WHERE created_at >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '1 month')",
    ).with_inputs("question"),
    # Add 20-50 question/answer pairs covering your common queries
]
```
### Optimize

```python
optimizer = dspy.MIPROv2(metric=answer_quality, auto="medium")
optimized = optimizer.compile(DatabaseQA(engine, schema), trainset=trainset)
optimized.save("optimized_db_qa.json")
```
## Step 8: Security and production

### Security checklist

| Control | How |
|---|---|
| Read-only database user | Connect as a user with SELECT-only grants |
| Query timeout | Set a statement timeout on the connection |
| Row limit | Always append `LIMIT` to queries |
| Table allowlist | Only include permitted tables in the schema |
| SQL validation | Run `validate_sql()` before executing |
| Audit logging | Log every question, generated SQL, and results |
| No raw credentials | Use environment variables or a secrets manager |
### Audit logging

```python
import json
from datetime import datetime

def log_query(question, sql, row_count, user_id=None):
    entry = {
        "timestamp": datetime.now().isoformat(),
        "user_id": user_id,
        "question": question,
        "sql": sql,
        "row_count": row_count,
    }
    with open("query_audit.jsonl", "a") as f:
        f.write(json.dumps(entry) + "\n")
```
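Because the log is JSON Lines, one record per line, ad-hoc analysis needs only the standard library. A sketch that tallies logged queries per user; `queries_per_user` is an illustrative helper, not part of the pipeline:

```python
import json
from collections import Counter

def queries_per_user(audit_path="query_audit.jsonl"):
    """Count audit-log entries per user_id from a JSONL audit file."""
    counts = Counter()
    with open(audit_path) as f:
        for line in f:
            entry = json.loads(line)
            counts[entry.get("user_id")] += 1
    return counts
```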
### Table allowlist

```python
ALLOWED_TABLES = {"orders", "products", "customers", "categories"}

def get_safe_schema(engine, allowed=ALLOWED_TABLES):
    inspector = inspect(engine)
    all_tables = set(inspector.get_table_names())
    tables = list(all_tables & allowed)
    return get_schema_description(engine, tables=tables)
```
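The intersection step can be exercised on its own (a sketch with a hypothetical list of discovered tables); sorting the result keeps the schema prompt stable across runs, which helps prompt caching:

```python
ALLOWED_TABLES = {"orders", "products", "customers", "categories"}

def safe_table_list(discovered, allowed=ALLOWED_TABLES):
    """Drop any discovered table not on the allowlist; sort for stability."""
    return sorted(set(discovered) & set(allowed))
```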
## Key patterns

- Two-stage pipeline: table selection + SQL generation works better than one giant prompt
- Validate before executing: never run AI-generated SQL without safety checks
- Compare results, not SQL: many valid SQL queries produce the same answer
- Business context matters: column descriptions improve accuracy more than extra examples
- Start with a small table allowlist: expand as you build confidence
- Read-only, always: the AI database user should never have write permissions
## Additional resources

- For worked examples, see examples.md
- Use `/ai-serving-apis` to put your database assistant behind a REST API
- Use `/ai-building-pipelines` for complex multi-step query workflows
- Use `/ai-checking-outputs` for additional SQL validation patterns
- Use `/ai-following-rules` to enforce query policies (e.g., no queries on PII columns)
- Use `/ai-improving-accuracy` to measure and optimize query quality
- Use `/ai-tracing-requests` to debug individual query failures