Loading...
Loading...
Compare original and translation side by side
{
"workspace_id": "UUID",
"stf_id": "UUID",
"caller": "UUID | null",
"api_url": "http://api:8080",
"api_token": "internal_token",
"input": {
"operation": "...",
...user-defined parameters
},
"sources": {
"step_name": {
"output": {...previous step data}
}
}
}{
"workspace_id": "UUID",
"stf_id": "UUID",
"caller": "UUID | null",
"api_url": "http://api:8080",
"api_token": "internal_token",
"input": {
"operation": "...",
...user-defined parameters
},
"sources": {
"step_name": {
"output": {...previous step data}
}
}
}{
"output": {
"status": "success",
...custom result data
}
}{
"error": "Error message",
"type": "ErrorType"
}{
"output": {
"status": "success",
...custom result data
}
}{
"error": "Error message",
"type": "ErrorType"
}POST /api/v1/workspaces/{workspace_id}/sqlAuthorization: Bearer {api_token}
X-Internal-Bypass: true
X-Workspace-ID: {workspace_id}
X-STF-ID: {stf_id}{ "sql": "SELECT * FROM my_table LIMIT 10" }POST /api/v1/workspaces/{workspace_id}/sqlAuthorization: Bearer {api_token}
X-Internal-Bypass: true
X-Workspace-ID: {workspace_id}
X-STF-ID: {stf_id}{ "sql": "SELECT * FROM my_table LIMIT 10" }#!/usr/bin/env python3
import sys
import json
import requests
import logging
logging.basicConfig(stream=sys.stderr, level=logging.INFO)
def execute_sql(api_url, api_token, workspace_id, stf_id, sql):
"""Execute SQL via D6E internal API"""
url = f"{api_url}/api/v1/workspaces/{workspace_id}/sql"
headers = {
"Authorization": f"Bearer {api_token}",
"X-Internal-Bypass": "true",
"X-Workspace-ID": workspace_id,
"X-STF-ID": stf_id,
"Content-Type": "application/json"
}
response = requests.post(url, json={"sql": sql}, headers=headers)
response.raise_for_status()
return response.json()
def main():
try:
input_data = json.load(sys.stdin)
user_input = input_data["input"]
# Your business logic here
result = {"status": "success", "message": "Processed"}
print(json.dumps({"output": result}))
except Exception as e:
logging.error(f"Error: {str(e)}", exc_info=True)
print(json.dumps({"error": str(e), "type": type(e).__name__}))
sys.exit(1)
if __name__ == "__main__":
main()FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
RUN chmod +x main.py
ENTRYPOINT ["python3", "main.py"]requests>=2.31.0#!/usr/bin/env python3
import sys
import json
import requests
import logging
logging.basicConfig(stream=sys.stderr, level=logging.INFO)
def execute_sql(api_url, api_token, workspace_id, stf_id, sql):
"""Execute SQL via D6E internal API"""
url = f"{api_url}/api/v1/workspaces/{workspace_id}/sql"
headers = {
"Authorization": f"Bearer {api_token}",
"X-Internal-Bypass": "true",
"X-Workspace-ID": workspace_id,
"X-STF-ID": stf_id,
"Content-Type": "application/json"
}
response = requests.post(url, json={"sql": sql}, headers=headers)
response.raise_for_status()
return response.json()
def main():
try:
input_data = json.load(sys.stdin)
user_input = input_data["input"]
# Your business logic here
result = {"status": "success", "message": "Processed"}
print(json.dumps({"output": result}))
except Exception as e:
logging.error(f"Error: {str(e)}", exc_info=True)
print(json.dumps({"error": str(e), "type": type(e).__name__}))
sys.exit(1)
if __name__ == "__main__":
main()FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
RUN chmod +x main.py
ENTRYPOINT ["python3", "main.py"]requests>=2.31.0const axios = require("axios");
async function executeSql(apiUrl, apiToken, workspaceId, stfId, sql) {
const response = await axios.post(
`${apiUrl}/api/v1/workspaces/${workspaceId}/sql`,
{ sql },
{
headers: {
Authorization: `Bearer ${apiToken}`,
"X-Internal-Bypass": "true",
"X-Workspace-ID": workspaceId,
"X-STF-ID": stfId,
"Content-Type": "application/json",
},
}
);
return response.data;
}
async function main() {
try {
const input = await readStdin();
const data = JSON.parse(input);
// Your business logic here
const result = { status: "success", message: "Processed" };
console.log(JSON.stringify({ output: result }));
} catch (error) {
console.error("Error:", error.message);
console.log(
JSON.stringify({
error: error.message,
type: error.name,
})
);
process.exit(1);
}
}
function readStdin() {
return new Promise((resolve) => {
let data = "";
process.stdin.on("data", (chunk) => (data += chunk));
process.stdin.on("end", () => resolve(data));
});
}
main();FROM node:18-slim
WORKDIR /app
COPY package*.json ./
RUN npm ci --omit=dev
COPY index.js .
ENTRYPOINT ["node", "index.js"]const axios = require("axios");
async function executeSql(apiUrl, apiToken, workspaceId, stfId, sql) {
const response = await axios.post(
`${apiUrl}/api/v1/workspaces/${workspaceId}/sql`,
{ sql },
{
headers: {
Authorization: `Bearer ${apiToken}`,
"X-Internal-Bypass": "true",
"X-Workspace-ID": workspaceId,
"X-STF-ID": stfId,
"Content-Type": "application/json",
},
}
);
return response.data;
}
async function main() {
try {
const input = await readStdin();
const data = JSON.parse(input);
// Your business logic here
const result = { status: "success", message: "Processed" };
console.log(JSON.stringify({ output: result }));
} catch (error) {
console.error("Error:", error.message);
console.log(
JSON.stringify({
error: error.message,
type: error.name,
})
);
process.exit(1);
}
}
function readStdin() {
return new Promise((resolve) => {
let data = "";
process.stdin.on("data", (chunk) => (data += chunk));
process.stdin.on("end", () => resolve(data));
});
}
main();FROM node:18-slim
WORKDIR /app
COPY package*.json ./
RUN npm ci --omit=dev
COPY index.js .
ENTRYPOINT ["node", "index.js"]{"output": {...}}python:3.11-slim{"output": {...}}python:3.11-slim.dockerignore.dockerignoretry:
# Your logic
result = process_data(input_data)
print(json.dumps({"output": result}))
except ValueError as e:
# Validation errors
logging.error(f"Validation error: {str(e)}")
print(json.dumps({"error": str(e), "type": "ValidationError"}))
sys.exit(1)
except Exception as e:
# Unexpected errors
logging.error(f"Unexpected error: {str(e)}", exc_info=True)
print(json.dumps({"error": str(e), "type": type(e).__name__}))
sys.exit(1)try:
# Your logic
result = process_data(input_data)
print(json.dumps({"output": result}))
except ValueError as e:
# Validation errors
logging.error(f"Validation error: {str(e)}")
print(json.dumps({"error": str(e), "type": "ValidationError"}))
sys.exit(1)
except Exception as e:
# Unexpected errors
logging.error(f"Unexpected error: {str(e)}", exc_info=True)
print(json.dumps({"error": str(e), "type": type(e).__name__}))
sys.exit(1)import loggingimport loggingundefinedundefineddef validate_input(user_input):
required_fields = ["operation", "table_name"]
for field in required_fields:
if field not in user_input:
raise ValueError(f"Missing required field: {field}")
if user_input["operation"] not in ["query", "insert", "update"]:
raise ValueError(f"Invalid operation: {user_input['operation']}")
return Truedef validate_input(user_input):
required_fields = ["operation", "table_name"]
for field in required_fields:
if field not in user_input:
raise ValueError(f"Missing required field: {field}")
if user_input["operation"] not in ["query", "insert", "update"]:
raise ValueError(f"Invalid operation: {user_input['operation']}")
return Trueundefinedundefineddef safe_query(api_context, table_name, filters):
"""Execute a safe parameterized query"""
# Build WHERE clause safely
where_conditions = []
for key, value in filters.items():
# Simple validation
if not key.isidentifier():
raise ValueError(f"Invalid column name: {key}")
where_conditions.append(f"{key} = '{value}'")
where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"
sql = f"SELECT * FROM {table_name} WHERE {where_clause} LIMIT 100"
return execute_sql(
api_context["api_url"],
api_context["api_token"],
api_context["workspace_id"],
api_context["stf_id"],
sql
)def safe_query(api_context, table_name, filters):
"""Execute a safe parameterized query"""
# Build WHERE clause safely
where_conditions = []
for key, value in filters.items():
# Simple validation
if not key.isidentifier():
raise ValueError(f"Invalid column name: {key}")
where_conditions.append(f"{key} = '{value}'")
where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"
sql = f"SELECT * FROM {table_name} WHERE {where_clause} LIMIT 100"
return execute_sql(
api_context["api_url"],
api_context["api_token"],
api_context["workspace_id"],
api_context["stf_id"],
sql
)import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session():
"""Create session with retry logic"""
session = requests.Session()
retry = Retry(
total=3,
backoff_factor=0.3,
status_forcelist=[500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
def call_external_api(url, params):
"""Call external API with error handling"""
session = create_session()
try:
response = session.get(url, params=params, timeout=10)
response.raise_for_status()
return response.json()
except requests.Timeout:
raise Exception("External API timeout")
except requests.RequestException as e:
raise Exception(f"External API error: {str(e)}")import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session():
"""Create session with retry logic"""
session = requests.Session()
retry = Retry(
total=3,
backoff_factor=0.3,
status_forcelist=[500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
def call_external_api(url, params):
"""Call external API with error handling"""
session = create_session()
try:
response = session.get(url, params=params, timeout=10)
response.raise_for_status()
return response.json()
except requests.Timeout:
raise Exception("External API timeout")
except requests.RequestException as e:
raise Exception(f"External API error: {str(e)}")undefinedundefinedundefinedundefinedundefinedundefinedundefinedundefined// Create policy group
d6e_create_policy_group({ name: "my-stf-group" });
// Add STF to group
d6e_add_member_to_policy_group({
policy_group_id: "{group_id}",
member_type: "stf",
member_id: "{stf_id}",
});
// Grant access
d6e_create_policy({
policy_group_id: "{group_id}",
table_name: "my_table",
operation: "select",
mode: "allow",
});// Create policy group
d6e_create_policy_group({ name: "my-stf-group" });
// Add STF to group
d6e_add_member_to_policy_group({
policy_group_id: "{group_id}",
member_type: "stf",
member_id: "{stf_id}",
});
// Grant access
d6e_create_policy({
policy_group_id: "{group_id}",
table_name: "my_table",
operation: "select",
mode: "allow",
});{"output": {...}}undefined{"output": {...}}undefinedundefinedundefinedundefinedundefinedundefinedundefinedmy-stf/
├── main.py (or index.js, main.go) # Entry point
├── Dockerfile # Container definition
├── requirements.txt (or package.json, go.mod) # Dependencies
├── .dockerignore # Exclude files
└── README.md # Documentation.git
.gitignore
*.md
tests/
__pycache__/
*.pyc
node_modules/
.envmy-stf/
├── main.py (或 index.js, main.go) # 入口文件
├── Dockerfile # 容器定义文件
├── requirements.txt (或 package.json, go.mod) # 依赖声明文件
├── .dockerignore # 排除文件配置
└── README.md # 文档说明.git
.gitignore
*.md
tests/
__pycache__/
*.pyc
node_modules/
.envghcr.io/d6e-ai/stf-xxx:latestghcr.io/d6e-ai/stf-xxx:latestundefinedundefinedghcr.io/{org}/{stf-name}:latestghcr.io/{组织名}/{stf名称}:latestd6e_create_stf({
name: "{stf-name}",
description: "{Description of the STF functionality}",
});d6e_create_stf({
name: "{stf名称}",
description: "{STF功能描述}",
});d6e_create_stf_version({
stf_id: "{stf_id from Step 1}",
version: "1.0.0",
runtime: "docker",
code: '{"image":"ghcr.io/{org}/{stf-name}:latest"}',
});runtime"docker"code{"image":"ghcr.io/{org}/{stf-name}:latest"}d6e_create_stf_version({
stf_id: "{步骤1中的stf_id}",
version: "1.0.0",
runtime: "docker",
code: '{"image":"ghcr.io/{组织名}/{stf名称}:latest"}',
});runtime"docker"code'{"image":"ghcr.io/{组织名}/{stf名称}:latest"}'d6e_create_workflow({
name: "{stf-name}-workflow",
input_steps: [],
stf_steps: [
{
stf_id: "{stf_id}",
version: "1.0.0",
},
],
effect_steps: [],
});d6e_create_workflow({
name: "{stf名称}-workflow",
input_steps: [],
stf_steps: [
{
stf_id: "{stf_id}",
version: "1.0.0",
},
],
effect_steps: [],
});d6e_execute_workflow({
workflow_id: "{workflow_id}",
input: {
operation: "{operation_name}",
// ...operation-specific parameters
},
});d6e_execute_workflow({
workflow_id: "{workflow_id}",
input: {
operation: "{操作名称}",
// ...操作特定参数
},
});| Operation | Required Parameters | Optional | DB Required | Description |
|---|---|---|---|---|
| | | ❌/✅ | {Description} |
| | - | ❌/✅ | {Description} |
| 操作 | 必填参数 | 可选参数 | 是否需要数据库 | 描述 |
|---|---|---|---|---|
| | | ❌/✅ | {描述} |
| | - | ❌/✅ | {描述} |
{
"operation": "{operation_name}",
"param1": "value1",
"param2": "value2"
}{
"output": {
"status": "success",
"operation": "{operation_name}",
"data": {
// ... result data
}
}
}{
"operation": "{操作名称}",
"param1": "value1",
"param2": "value2"
}{
"output": {
"status": "success",
"operation": "{操作名称}",
"data": {
// ... 结果数据
}
}
}Use the Docker skill for {task description} in D6E.
Docker Image: ghcr.io/{org}/{stf-name}:latest
Steps:
1. Create STF with d6e_create_stf (name: "{stf-name}")
2. Create STF version with d6e_create_stf_version:
- runtime: "docker"
- code: "{\"image\":\"ghcr.io/{org}/{stf-name}:latest\"}"
3. Create workflow with d6e_create_workflow
4. Execute with d6e_execute_workflow
Supported operations:
- "{operation_1}": {description} (required: {required_params})
- "{operation_2}": {description} (required: {required_params})
Start with {recommended_first_operation} to verify the setup.在D6E中使用Docker技能完成{任务描述}。
Docker镜像:ghcr.io/{组织名}/{stf名称}:latest
步骤:
1. 使用d6e_create_stf创建STF(名称:"{stf名称}")
2. 使用d6e_create_stf_version创建STF版本:
- runtime: "docker"
- code: '{"image":"ghcr.io/{组织名}/{stf名称}:latest"}'
3. 使用d6e_create_workflow创建工作流
4. 使用d6e_execute_workflow执行工作流
支持的操作:
- "{操作1}":{描述}(必填参数:{必填参数列表})
- "{操作2}":{描述}(必填参数:{必填参数列表})
建议先执行{推荐的首个操作}以验证设置是否正确。{Specific task description}
Skill to use:
- Docker Image: ghcr.io/{org}/{stf-name}:latest
- Operation: {operation_name}
Parameters:
- param1: "value1"
- param2: "value2"
Include the following in the results:
- {Expected output item 1}
- {Expected output item 2}{具体任务描述}
使用的技能:
- Docker镜像:ghcr.io/{组织名}/{stf名称}:latest
- 操作:{操作名称}
参数:
- param1: "value1"
- param2: "value2"
结果中需包含:
- {预期输出项1}
- {预期输出项2}{Complete workflow description}
Docker Image: ghcr.io/{org}/{stf-name}:latest
Execution steps:
1. Create STF (name: "{stf-name}", runtime: "docker")
2. {First operation description}:
- operation: "{operation_1}"
- param1: value1
- param2: value2
3. {Second operation description}:
- operation: "{operation_2}"
- param1: value1
4. Display results:
- {Output item 1}
- {Output item 2}
{Additional instructions or requests}{完整工作流描述}
Docker镜像:ghcr.io/{组织名}/{stf名称}:latest
执行步骤:
1. 创建STF(名称:"{stf名称}", runtime: "docker")
2. {第一个操作描述}:
- operation: "{操作1}"
- param1: value1
- param2: value2
3. {第二个操作描述}:
- operation: "{操作2}"
- param1: value1
4. 显示结果:
- {输出项1}
- {输出项2}
{额外说明或要求}undefinedundefinedundefinedundefinedundefinedundefinedruntime: "docker"code'{"image":"..."}'runtime: "docker"code'{"image":"..."}'