appwrite-python
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAppwrite Python SDK
Appwrite Python SDK
Installation
安装
bash
pip install appwritebash
pip install appwriteSetting Up the Client
客户端设置
python
from appwrite.client import Client
from appwrite.id import ID
from appwrite.query import Query
from appwrite.services.users import Users
from appwrite.services.tablesdb import TablesDB
from appwrite.services.storage import Storage
from appwrite.services.functions import Functions
from appwrite.enums.o_auth_provider import OAuthProvider
import os
client = (Client()
.set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
.set_project(os.environ['APPWRITE_PROJECT_ID'])
.set_key(os.environ['APPWRITE_API_KEY']))python
from appwrite.client import Client
from appwrite.id import ID
from appwrite.query import Query
from appwrite.services.users import Users
from appwrite.services.tablesdb import TablesDB
from appwrite.services.storage import Storage
from appwrite.services.functions import Functions
from appwrite.enums.o_auth_provider import OAuthProvider
import os
client = (Client()
.set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
.set_project(os.environ['APPWRITE_PROJECT_ID'])
.set_key(os.environ['APPWRITE_API_KEY']))Code Examples
代码示例
User Management
用户管理
python
users = Users(client)python
users = Users(client)Create user
Create user
user = users.create(ID.unique(), 'user@example.com', None, 'password123', 'User Name')
user = users.create(ID.unique(), 'user@example.com', None, 'password123', 'User Name')
List users
List users
result = users.list([Query.limit(25)])
result = users.list([Query.limit(25)])
Get user
Get user
fetched = users.get('[USER_ID]')
fetched = users.get('[USER_ID]')
Delete user
Delete user
users.delete('[USER_ID]')
undefinedusers.delete('[USER_ID]')
undefinedDatabase Operations
数据库操作
Note: Use(not the deprecatedTablesDBclass) for all new code. Only useDatabasesif the existing codebase already relies on it or the user explicitly requests it.DatabasesTip: Prefer keyword arguments (e.g.,) over positional arguments for all SDK method calls. Only use positional style if the existing codebase already uses it or the user explicitly requests it.database_id='...'
python
tables_db = TablesDB(client)注意: 所有新代码请使用(而非已弃用的TablesDB类)。仅当现有代码库已依赖Databases或用户明确要求时,才使用该类。Databases提示: 对于所有SDK方法调用,优先使用关键字参数(例如)而非位置参数。仅当现有代码库已使用位置参数或用户明确要求时,才使用该风格。database_id='...'
python
tables_db = TablesDB(client)Create database
Create database
db = tables_db.create(ID.unique(), 'My Database')
db = tables_db.create(ID.unique(), 'My Database')
Create row
Create row
doc = tables_db.create_row('[DATABASE_ID]', '[TABLE_ID]', ID.unique(), {
'title': 'Hello World'
})
doc = tables_db.create_row('[DATABASE_ID]', '[TABLE_ID]', ID.unique(), {
'title': 'Hello World'
})
Query rows
Query rows
results = tables_db.list_rows('[DATABASE_ID]', '[TABLE_ID]', [
Query.equal('title', 'Hello World'),
Query.limit(10)
])
results = tables_db.list_rows('[DATABASE_ID]', '[TABLE_ID]', [
Query.equal('title', 'Hello World'),
Query.limit(10)
])
Get row
Get row
row = tables_db.get_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')
row = tables_db.get_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')
Update row
Update row
tables_db.update_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]', {
'title': 'Updated'
})
tables_db.update_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]', {
'title': 'Updated'
})
Delete row
Delete row
tables_db.delete_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')
undefinedtables_db.delete_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')
undefinedString Column Types
字符串列类型
Note: The legacytype is deprecated. Use explicit column types for all new columns.string
| Type | Max characters | Indexing | Storage |
|---|---|---|---|
| 16,383 | Full index (if size ≤ 768) | Inline in row |
| 16,383 | Prefix only | Off-page |
| 4,194,303 | Prefix only | Off-page |
| 1,073,741,823 | Prefix only | Off-page |
- is stored inline and counts towards the 64 KB row size limit. Prefer for short, indexed fields like names, slugs, or identifiers.
varchar - ,
text, andmediumtextare stored off-page (only a 20-byte pointer lives in the row), so they don't consume the row size budget.longtextis not required for these types.size
python
undefined注意: 旧版类型已被弃用。所有新列请使用明确的列类型。string
| 类型 | 最大字符数 | 索引 | 存储方式 |
|---|---|---|---|
| 16,383 | 全索引(若长度≤768) | 行内存储 |
| 16,383 | 仅前缀索引 | 页外存储 |
| 4,194,303 | 仅前缀索引 | 页外存储 |
| 1,073,741,823 | 仅前缀索引 | 页外存储 |
- 存储在行内,占用64KB行大小限制。适合短的、需要索引的字段,如名称、别名或标识符。
varchar - 、
text和mediumtext存储在页外(行内仅保留20字节指针),因此不会占用行大小配额。这些类型无需指定longtext。size
python
undefinedCreate table with explicit string column types
Create table with explicit string column types
tables_db.create_table(
database_id='[DATABASE_ID]',
table_id=ID.unique(),
name='articles',
columns=[
{'key': 'title', 'type': 'varchar', 'size': 255, 'required': True}, # inline, fully indexable
{'key': 'summary', 'type': 'text', 'required': False}, # off-page, prefix index only
{'key': 'body', 'type': 'mediumtext', 'required': False}, # up to ~4 M chars
{'key': 'raw_data', 'type': 'longtext', 'required': False}, # up to ~1 B chars
]
)
undefinedtables_db.create_table(
database_id='[DATABASE_ID]',
table_id=ID.unique(),
name='articles',
columns=[
{'key': 'title', 'type': 'varchar', 'size': 255, 'required': True}, # inline, fully indexable
{'key': 'summary', 'type': 'text', 'required': False}, # off-page, prefix index only
{'key': 'body', 'type': 'mediumtext', 'required': False}, # up to ~4 M chars
{'key': 'raw_data', 'type': 'longtext', 'required': False}, # up to ~1 B chars
]
)
undefinedQuery Methods
查询方法
python
undefinedpython
undefinedFiltering
Filtering
Query.equal('field', 'value') # == (or pass list for IN)
Query.not_equal('field', 'value') # !=
Query.less_than('field', 100) # <
Query.less_than_equal('field', 100) # <=
Query.greater_than('field', 100) # >
Query.greater_than_equal('field', 100) # >=
Query.between('field', 1, 100) # 1 <= field <= 100
Query.is_null('field') # is null
Query.is_not_null('field') # is not null
Query.starts_with('field', 'prefix') # starts with
Query.ends_with('field', 'suffix') # ends with
Query.contains('field', 'sub') # contains (string or array)
Query.search('field', 'keywords') # full-text search (requires index)
Query.equal('field', 'value') # == (or pass list for IN)
Query.not_equal('field', 'value') # !=
Query.less_than('field', 100) # <
Query.less_than_equal('field', 100) # <=
Query.greater_than('field', 100) # >
Query.greater_than_equal('field', 100) # >=
Query.between('field', 1, 100) # 1 <= field <= 100
Query.is_null('field') # is null
Query.is_not_null('field') # is not null
Query.starts_with('field', 'prefix') # starts with
Query.ends_with('field', 'suffix') # ends with
Query.contains('field', 'sub') # contains (string or array)
Query.search('field', 'keywords') # full-text search (requires index)
Sorting
Sorting
Query.order_asc('field')
Query.order_desc('field')
Query.order_asc('field')
Query.order_desc('field')
Pagination
Pagination
Query.limit(25) # max rows (default 25, max 100)
Query.offset(0) # skip N rows
Query.cursor_after('[ROW_ID]') # cursor pagination (preferred)
Query.cursor_before('[ROW_ID]')
Query.limit(25) # max rows (default 25, max 100)
Query.offset(0) # skip N rows
Query.cursor_after('[ROW_ID]') # cursor pagination (preferred)
Query.cursor_before('[ROW_ID]')
Selection & Logic
Selection & Logic
Query.select(['field1', 'field2']) # return only specified fields
Query.or_queries([Query.equal('a', 1), Query.equal('b', 2)]) # OR
Query.and_queries([Query.greater_than('age', 18), Query.less_than('age', 65)]) # AND (default)
undefinedQuery.select(['field1', 'field2']) # return only specified fields
Query.or_queries([Query.equal('a', 1), Query.equal('b', 2)]) # OR
Query.and_queries([Query.greater_than('age', 18), Query.less_than('age', 65)]) # AND (default)
undefinedFile Storage
文件存储
python
from appwrite.input_file import InputFile
storage = Storage(client)python
from appwrite.input_file import InputFile
storage = Storage(client)Upload file
Upload file
file = storage.create_file('[BUCKET_ID]', ID.unique(), InputFile.from_path('/path/to/file.png'))
file = storage.create_file('[BUCKET_ID]', ID.unique(), InputFile.from_path('/path/to/file.png'))
List files
List files
files = storage.list_files('[BUCKET_ID]')
files = storage.list_files('[BUCKET_ID]')
Delete file
Delete file
storage.delete_file('[BUCKET_ID]', '[FILE_ID]')
undefinedstorage.delete_file('[BUCKET_ID]', '[FILE_ID]')
undefinedInputFile Factory Methods
InputFile工厂方法
python
from appwrite.input_file import InputFile
InputFile.from_path('/path/to/file.png') # from filesystem path
InputFile.from_bytes(byte_data, 'file.png') # from bytes
InputFile.from_string('Hello world', 'hello.txt') # from string contentpython
from appwrite.input_file import InputFile
InputFile.from_path('/path/to/file.png') # from filesystem path
InputFile.from_bytes(byte_data, 'file.png') # from bytes
InputFile.from_string('Hello world', 'hello.txt') # from string contentTeams
团队管理
python
from appwrite.services.teams import Teams
teams = Teams(client)python
from appwrite.services.teams import Teams
teams = Teams(client)Create team
Create team
team = teams.create(ID.unique(), 'Engineering')
team = teams.create(ID.unique(), 'Engineering')
List teams
List teams
team_list = teams.list()
team_list = teams.list()
Create membership (invite user by email)
Create membership (invite user by email)
membership = teams.create_membership('[TEAM_ID]', roles=['editor'], email='user@example.com')
membership = teams.create_membership('[TEAM_ID]', roles=['editor'], email='user@example.com')
List memberships
List memberships
members = teams.list_memberships('[TEAM_ID]')
members = teams.list_memberships('[TEAM_ID]')
Update membership roles
Update membership roles
teams.update_membership('[TEAM_ID]', '[MEMBERSHIP_ID]', roles=['admin'])
teams.update_membership('[TEAM_ID]', '[MEMBERSHIP_ID]', roles=['admin'])
Delete team
Delete team
teams.delete('[TEAM_ID]')
> **Role-based access:** Use `Role.team('[TEAM_ID]')` for all team members or `Role.team('[TEAM_ID]', 'editor')` for a specific team role when setting permissions.teams.delete('[TEAM_ID]')
> **基于角色的访问控制:** 设置权限时,使用`Role.team('[TEAM_ID]')`表示所有团队成员,或使用`Role.team('[TEAM_ID]', 'editor')`表示特定团队角色。Serverless Functions
无服务器函数
python
functions = Functions(client)python
functions = Functions(client)Execute function
Execute function
execution = functions.create_execution('[FUNCTION_ID]', body='{"key": "value"}')
execution = functions.create_execution('[FUNCTION_ID]', body='{"key": "value"}')
List executions
List executions
executions = functions.list_executions('[FUNCTION_ID]')
undefinedexecutions = functions.list_executions('[FUNCTION_ID]')
undefinedWriting a Function Handler (Python runtime)
编写函数处理程序(Python运行时)
python
undefinedpython
undefinedsrc/main.py — Appwrite Function entry point
src/main.py — Appwrite Function entry point
def main(context):
# context.req — request object
# .body — raw request body (string)
# .body_json — parsed JSON body (dict, or None if not JSON)
# .headers — request headers (dict)
# .method — HTTP method (GET, POST, etc.)
# .path — URL path
# .query — parsed query parameters (dict)
# .query_string — raw query string
context.log('Processing: ' + context.req.method + ' ' + context.req.path)
if context.req.method == 'GET':
return context.res.json({'message': 'Hello from Appwrite Function!'})
data = context.req.body_json or {}
if 'name' not in data:
context.error('Missing name field')
return context.res.json({'error': 'Name is required'}, 400)
# Response methods
return context.res.json({'success': True}) # JSON response
# return context.res.text('Hello') # plain text
# return context.res.empty() # 204 No Content
# return context.res.redirect('https://example.com') # 302 Redirect
# return context.res.send('data', 200, {'X-Custom': '1'}) # custom responseundefineddef main(context):
# context.req — request object
# .body — raw request body (string)
# .body_json — parsed JSON body (dict, or None if not JSON)
# .headers — request headers (dict)
# .method — HTTP method (GET, POST, etc.)
# .path — URL path
# .query — parsed query parameters (dict)
# .query_string — raw query string
context.log('Processing: ' + context.req.method + ' ' + context.req.path)
if context.req.method == 'GET':
return context.res.json({'message': 'Hello from Appwrite Function!'})
data = context.req.body_json or {}
if 'name' not in data:
context.error('Missing name field')
return context.res.json({'error': 'Name is required'}, 400)
# Response methods
return context.res.json({'success': True}) # JSON response
# return context.res.text('Hello') # plain text
# return context.res.empty() # 204 No Content
# return context.res.redirect('https://example.com') # 302 Redirect
# return context.res.send('data', 200, {'X-Custom': '1'}) # custom responseundefinedServer-Side Rendering (SSR) Authentication
服务端渲染(SSR)认证
SSR apps (Flask, Django, FastAPI, etc.) use the server SDK to handle auth. You need two clients:
- Admin client — uses an API key, creates sessions, bypasses rate limits (reusable singleton)
- Session client — uses a session cookie, acts on behalf of a user (create per-request, never share)
python
from appwrite.client import Client
from appwrite.services.account import Account
from flask import request, jsonify, make_response, redirectSSR应用(Flask、Django、FastAPI等)使用服务器SDK处理认证。您需要两个客户端:
- 管理员客户端 — 使用API密钥,可创建会话,绕过速率限制(可复用单例)
- 会话客户端 — 使用会话Cookie,代表用户执行操作(每个请求创建一次,切勿共享)
python
from appwrite.client import Client
from appwrite.services.account import Account
from flask import request, jsonify, make_response, redirectAdmin client (reusable)
Admin client (reusable)
admin_client = (Client()
.set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
.set_project('[PROJECT_ID]')
.set_key(os.environ['APPWRITE_API_KEY']))
admin_client = (Client()
.set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
.set_project('[PROJECT_ID]')
.set_key(os.environ['APPWRITE_API_KEY']))
Session client (create per-request)
Session client (create per-request)
session_client = (Client()
.set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
.set_project('[PROJECT_ID]'))
session = request.cookies.get('a_session_[PROJECT_ID]')
if session:
session_client.set_session(session)
undefinedsession_client = (Client()
.set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
.set_project('[PROJECT_ID]'))
session = request.cookies.get('a_session_[PROJECT_ID]')
if session:
session_client.set_session(session)
undefinedEmail/Password Login
邮箱/密码登录
python
@app.post('/login')
def login():
account = Account(admin_client)
session = account.create_email_password_session(
request.json['email'], request.json['password']
)
# Cookie name must be a_session_<PROJECT_ID>
resp = make_response(jsonify({'success': True}))
resp.set_cookie('a_session_[PROJECT_ID]', session['secret'],
httponly=True, secure=True, samesite='Strict',
expires=session['expire'], path='/')
return resppython
@app.post('/login')
def login():
account = Account(admin_client)
session = account.create_email_password_session(
request.json['email'], request.json['password']
)
# Cookie name must be a_session_<PROJECT_ID>
resp = make_response(jsonify({'success': True}))
resp.set_cookie('a_session_[PROJECT_ID]', session['secret'],
httponly=True, secure=True, samesite='Strict',
expires=session['expire'], path='/')
return respAuthenticated Requests
已认证请求
python
@app.get('/user')
def get_user():
session = request.cookies.get('a_session_[PROJECT_ID]')
if not session:
return jsonify({'error': 'Unauthorized'}), 401
session_client = (Client()
.set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
.set_project('[PROJECT_ID]')
.set_session(session))
account = Account(session_client)
return jsonify(account.get())python
@app.get('/user')
def get_user():
session = request.cookies.get('a_session_[PROJECT_ID]')
if not session:
return jsonify({'error': 'Unauthorized'}), 401
session_client = (Client()
.set_endpoint('https://<REGION>.cloud.appwrite.io/v1')
.set_project('[PROJECT_ID]')
.set_session(session))
account = Account(session_client)
return jsonify(account.get())OAuth2 SSR Flow
OAuth2 SSR流程
python
undefinedpython
undefinedStep 1: Redirect to OAuth provider
Step 1: Redirect to OAuth provider
@app.get('/oauth')
def oauth():
account = Account(admin_client)
redirect_url = account.create_o_auth2_token(
OAuthProvider.Github,
'https://example.com/oauth/success',
'https://example.com/oauth/failure',
)
return redirect(redirect_url)
@app.get('/oauth')
def oauth():
account = Account(admin_client)
redirect_url = account.create_o_auth2_token(
OAuthProvider.Github,
'https://example.com/oauth/success',
'https://example.com/oauth/failure',
)
return redirect(redirect_url)
Step 2: Handle callback — exchange token for session
Step 2: Handle callback — exchange token for session
@app.get('/oauth/success')
def oauth_success():
account = Account(admin_client)
session = account.create_session(request.args['userId'], request.args['secret'])
resp = make_response(jsonify({'success': True}))
resp.set_cookie('a_session_[PROJECT_ID]', session['secret'],
httponly=True, secure=True, samesite='Strict',
expires=session['expire'], path='/')
return resp
> **Cookie security:** Always use `httponly`, `secure`, and `samesite='Strict'` to prevent XSS. The cookie name must be `a_session_<PROJECT_ID>`.
> **Forwarding user agent:** Call `session_client.set_forwarded_user_agent(request.headers.get('user-agent'))` to record the end-user's browser info for debugging and security.@app.get('/oauth/success')
def oauth_success():
account = Account(admin_client)
session = account.create_session(request.args['userId'], request.args['secret'])
resp = make_response(jsonify({'success': True}))
resp.set_cookie('a_session_[PROJECT_ID]', session['secret'],
httponly=True, secure=True, samesite='Strict',
expires=session['expire'], path='/')
return resp
> **Cookie安全:** 始终使用`httponly`、`secure`和`samesite='Strict'`以防止XSS攻击。Cookie名称必须为`a_session_<PROJECT_ID>`。
>
> **转发用户代理:** 调用`session_client.set_forwarded_user_agent(request.headers.get('user-agent'))`以记录终端用户的浏览器信息,用于调试和安全目的。Error Handling
错误处理
python
from appwrite.exception import AppwriteException
try:
row = tables_db.get_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')
except AppwriteException as e:
print(e.message) # human-readable error message
print(e.code) # HTTP status code (int)
print(e.type) # Appwrite error type string (e.g. 'document_not_found')
print(e.response) # full response body (dict)Common error codes:
| Code | Meaning |
|---|---|
| Unauthorized — missing or invalid session/API key |
| Forbidden — insufficient permissions for this action |
| Not found — resource does not exist |
| Conflict — duplicate ID or unique constraint violation |
| Rate limited — too many requests, retry after backoff |
python
from appwrite.exception import AppwriteException
try:
row = tables_db.get_row('[DATABASE_ID]', '[TABLE_ID]', '[ROW_ID]')
except AppwriteException as e:
print(e.message) # human-readable error message
print(e.code) # HTTP status code (int)
print(e.type) # Appwrite error type string (e.g. 'document_not_found')
print(e.response) # full response body (dict)常见错误码:
| 代码 | 含义 |
|---|---|
| 未授权 — 缺少或无效的会话/API密钥 |
| 禁止访问 — 操作权限不足 |
| 未找到 — 资源不存在 |
| 冲突 — 重复ID或唯一约束违反 |
| 请求受限 — 请求过多,请在退避后重试 |
Permissions & Roles (Critical)
权限与角色(关键)
Appwrite uses permission strings to control access to resources. Each permission pairs an action (, , , , or which grants create + update + delete) with a role target. By default, no user has access unless permissions are explicitly set at the document/file level or inherited from the collection/bucket settings. Permissions are arrays of strings built with the and helpers.
readupdatedeletecreatewritePermissionRolepython
from appwrite.permission import Permission
from appwrite.role import RoleAppwrite使用权限字符串控制资源访问。每个权限将操作(、、、,或(包含create+update+delete))与角色目标配对。默认情况下,所有用户均无访问权限,除非在文档/文件级别明确设置权限,或从集合/存储桶设置继承权限。权限是使用和助手构建的字符串数组。
readupdatedeletecreatewritePermissionRolepython
from appwrite.permission import Permission
from appwrite.role import RoleDatabase Row with Permissions
带权限的数据库行
python
doc = tables_db.create_row('[DATABASE_ID]', '[TABLE_ID]', ID.unique(), {
'title': 'Hello World'
}, [
Permission.read(Role.user('[USER_ID]')), # specific user can read
Permission.update(Role.user('[USER_ID]')), # specific user can update
Permission.read(Role.team('[TEAM_ID]')), # all team members can read
Permission.read(Role.any()), # anyone (including guests) can read
])python
doc = tables_db.create_row('[DATABASE_ID]', '[TABLE_ID]', ID.unique(), {
'title': 'Hello World'
}, [
Permission.read(Role.user('[USER_ID]')), # specific user can read
Permission.update(Role.user('[USER_ID]')), # specific user can update
Permission.read(Role.team('[TEAM_ID]')), # all team members can read
Permission.read(Role.any()), # anyone (including guests) can read
])File Upload with Permissions
带权限的文件上传
python
file = storage.create_file('[BUCKET_ID]', ID.unique(), InputFile.from_path('/path/to/file.png'), [
Permission.read(Role.any()),
Permission.update(Role.user('[USER_ID]')),
Permission.delete(Role.user('[USER_ID]')),
])When to set permissions: Set document/file-level permissions when you need per-resource access control. If all documents in a collection share the same rules, configure permissions at the collection/bucket level and leave document permissions empty.
Common mistakes:
- Forgetting permissions — the resource becomes inaccessible to all users (including the creator)
withRole.any()/write/update— allows any user, including unauthenticated guests, to modify or remove the resourcedelete on sensitive data — makes the resource publicly readablePermission.read(Role.any())
python
file = storage.create_file('[BUCKET_ID]', ID.unique(), InputFile.from_path('/path/to/file.png'), [
Permission.read(Role.any()),
Permission.update(Role.user('[USER_ID]')),
Permission.delete(Role.user('[USER_ID]')),
])何时设置权限: 当您需要按资源进行访问控制时,设置文档/文件级权限。如果集合中的所有文档共享相同规则,请在集合/存储桶级别配置权限,并保持文档权限为空。常见错误:
- 忘记设置权限 — 资源对所有用户(包括创建者)均不可访问
- 对
/write/update使用delete— 允许任何用户(包括未认证访客)修改或删除资源Role.any()- 对敏感数据使用
— 使资源可公开读取Permission.read(Role.any())