message-deliverability-optimization
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseMessage Deliverability Optimization
消息Deliverability优化
You are an expert in building sales bots that maintain high deliverability through sender reputation management. Your goal is to help developers create systems that rotate numbers, warm domains, and manage reputation to ensure messages reach prospects.
您是一位擅长构建通过sender reputation管理维持高deliverability的销售机器人专家。您的目标是帮助开发者创建能够轮换号码、预热域名(warm domains)并管理信誉的系统,确保消息触达潜在客户。
Why Deliverability Matters
为什么Deliverability至关重要
The Hidden Problem
隐藏的问题
You think:
- 1000 emails sent
- 50 opens (5% open rate)
- "Low engagement, need better copy"
Reality:
- 1000 emails sent
- 400 landed in inbox
- 600 went to spam
- 50 opens (12.5% of delivered)
Your copy might be fine. Deliverability is broken.您以为:
- 发送了1000封邮件
- 50次打开(5%的打开率)
- "用户参与度低,需要优化文案"
实际情况:
- 发送了1000封邮件
- 400封进入收件箱
- 600封进入垃圾邮件
- 50次打开(占送达邮件的12.5%)
您的文案可能没问题,是Deliverability出了问题。Email Deliverability
邮件Deliverability
Domain Reputation Management
域名信誉管理(Domain Reputation Management)
python
class DomainManager:
def __init__(self, domain):
self.domain = domain
self.reputation_score = 100
self.daily_send_limit = self.calculate_limit()
self.sent_today = 0
def calculate_limit(self):
"""Dynamic limit based on reputation"""
base_limit = 200 # New domain
# Increase with age and reputation
if self.domain_age_days > 90:
base_limit += 100
if self.reputation_score > 90:
base_limit *= 2
elif self.reputation_score < 70:
base_limit *= 0.5
return int(base_limit)
def can_send(self):
if self.sent_today >= self.daily_send_limit:
return False
if self.reputation_score < 50:
return False # Domain is burned
return True
def record_send(self, result):
self.sent_today += 1
# Update reputation based on results
if result.get("bounced"):
self.reputation_score -= 2
if result.get("spam_complaint"):
self.reputation_score -= 10
if result.get("opened"):
self.reputation_score += 0.1
if result.get("replied"):
self.reputation_score += 0.5python
class DomainManager:
def __init__(self, domain):
self.domain = domain
self.reputation_score = 100
self.daily_send_limit = self.calculate_limit()
self.sent_today = 0
def calculate_limit(self):
"""基于reputation的动态发送限制"""
base_limit = 200 # 新域名初始限制
# 根据域名时长和信誉调整限制
if self.domain_age_days > 90:
base_limit += 100
if self.reputation_score > 90:
base_limit *= 2
elif self.reputation_score < 70:
base_limit *= 0.5
return int(base_limit)
def can_send(self):
if self.sent_today >= self.daily_send_limit:
return False
if self.reputation_score < 50:
return False # 域名已失效
return True
def record_send(self, result):
self.sent_today += 1
# 根据发送结果更新信誉
if result.get("bounced"):
self.reputation_score -= 2
if result.get("spam_complaint"):
self.reputation_score -= 10
if result.get("opened"):
self.reputation_score += 0.1
if result.get("replied"):
self.reputation_score += 0.5Domain Warming
域名预热(Domain Warming)
python
def warm_new_domain(domain, target_volume):
"""Gradually increase sending volume for new domain"""
warming_schedule = [
{"day": 1, "sends": 20, "target": "engaged_list"},
{"day": 2, "sends": 30, "target": "engaged_list"},
{"day": 3, "sends": 50, "target": "engaged_list"},
{"day": 4, "sends": 75, "target": "engaged_list"},
{"day": 5, "sends": 100, "target": "mixed_list"},
{"day": 6, "sends": 150, "target": "mixed_list"},
{"day": 7, "sends": 200, "target": "mixed_list"},
# Continue gradually...
]
for day_config in warming_schedule:
schedule_warming_sends(
domain=domain,
count=day_config["sends"],
list_type=day_config["target"]
)
# Monitor during warmup
monitor_warmup_metrics(domain)python
def warm_new_domain(domain, target_volume):
"""逐步提升新域名的发送量"""
warming_schedule = [
{"day": 1, "sends": 20, "target": "engaged_list"},
{"day": 2, "sends": 30, "target": "engaged_list"},
{"day": 3, "sends": 50, "target": "engaged_list"},
{"day": 4, "sends": 75, "target": "engaged_list"},
{"day": 5, "sends": 100, "target": "mixed_list"},
{"day": 6, "sends": 150, "target": "mixed_list"},
{"day": 7, "sends": 200, "target": "mixed_list"},
# 持续逐步提升...
]
for day_config in warming_schedule:
schedule_warming_sends(
domain=domain,
count=day_config["sends"],
list_type=day_config["target"]
)
# 预热期间监控指标
monitor_warmup_metrics(domain)Domain Rotation
域名轮换(Domain Rotation)
python
class DomainRotator:
def __init__(self, domains):
self.domains = domains
self.domain_managers = {d: DomainManager(d) for d in domains}
def select_domain(self, prospect):
available = [
d for d in self.domains
if self.domain_managers[d].can_send()
]
if not available:
return None
# Select based on rotation strategy
if self.strategy == "round_robin":
return self.round_robin(available)
elif self.strategy == "reputation_weighted":
return self.reputation_weighted(available)
elif self.strategy == "recipient_matched":
return self.match_to_recipient(available, prospect)
def reputation_weighted(self, available):
"""Prefer domains with better reputation"""
weights = [
self.domain_managers[d].reputation_score
for d in available
]
total = sum(weights)
probs = [w/total for w in weights]
return random.choices(available, probs)[0]python
class DomainRotator:
def __init__(self, domains):
self.domains = domains
self.domain_managers = {d: DomainManager(d) for d in domains}
def select_domain(self, prospect):
available = [
d for d in self.domains
if self.domain_managers[d].can_send()
]
if not available:
return None
# 根据轮换策略选择域名
if self.strategy == "round_robin":
return self.round_robin(available)
elif self.strategy == "reputation_weighted":
return self.reputation_weighted(available)
elif self.strategy == "recipient_matched":
return self.match_to_recipient(available, prospect)
def reputation_weighted(self, available):
"""优先选择信誉更好的域名"""
weights = [
self.domain_managers[d].reputation_score
for d in available
]
total = sum(weights)
probs = [w/total for w in weights]
return random.choices(available, probs)[0]SMS Deliverability
SMS Deliverability
Number Rotation
号码轮换(Number Rotation)
python
class SMSNumberManager:
def __init__(self):
self.numbers = []
self.daily_limits = {}
self.carrier_scores = {}
def add_number(self, number, carrier=None):
self.numbers.append({
"number": number,
"carrier": carrier,
"reputation": 100,
"daily_sent": 0,
"daily_limit": 200, # Start conservative
"status": "active"
})
def select_number(self, recipient_number):
available = [n for n in self.numbers if self.can_send(n)]
if not available:
return None
# Consider carrier matching
recipient_carrier = lookup_carrier(recipient_number)
matching_carrier = [
n for n in available
if n["carrier"] == recipient_carrier
]
if matching_carrier:
return self.select_best(matching_carrier)
return self.select_best(available)
def select_best(self, numbers):
"""Select number with best reputation and capacity"""
return max(numbers, key=lambda n: (
n["reputation"],
n["daily_limit"] - n["daily_sent"]
))
def report_issue(self, number, issue_type):
for n in self.numbers:
if n["number"] == number:
if issue_type == "undelivered":
n["reputation"] -= 5
elif issue_type == "blocked":
n["reputation"] -= 20
if n["reputation"] < 50:
n["status"] = "burned"
elif issue_type == "carrier_block":
n["status"] = "carrier_blocked"python
class SMSNumberManager:
def __init__(self):
self.numbers = []
self.daily_limits = {}
self.carrier_scores = {}
def add_number(self, number, carrier=None):
self.numbers.append({
"number": number,
"carrier": carrier,
"reputation": 100,
"daily_sent": 0,
"daily_limit": 200, # 初始保守限制
"status": "active"
})
def select_number(self, recipient_number):
available = [n for n in self.numbers if self.can_send(n)]
if not available:
return None
# 考虑运营商匹配
recipient_carrier = lookup_carrier(recipient_number)
matching_carrier = [
n for n in available
if n["carrier"] == recipient_carrier
]
if matching_carrier:
return self.select_best(matching_carrier)
return self.select_best(available)
def select_best(self, numbers):
"""选择信誉和剩余容量最佳的号码"""
return max(numbers, key=lambda n: (
n["reputation"],
n["daily_limit"] - n["daily_sent"]
))
def report_issue(self, number, issue_type):
for n in self.numbers:
if n["number"] == number:
if issue_type == "undelivered":
n["reputation"] -= 5
elif issue_type == "blocked":
n["reputation"] -= 20
if n["reputation"] < 50:
n["status"] = "burned"
elif issue_type == "carrier_block":
n["status"] = "carrier_blocked"SMS Compliance
SMS合规(SMS Compliance)
python
def ensure_sms_compliance(message, recipient):
issues = []
# Check opt-in status
if not has_sms_consent(recipient):
issues.append("no_consent")
# Check time (TCPA)
recipient_time = get_local_time(recipient)
if not (8 <= recipient_time.hour <= 21):
issues.append("outside_hours")
# Check content
if contains_blocked_keywords(message):
issues.append("blocked_content")
# Check rate limits
recent_sms = count_recent_sms(recipient, hours=24)
if recent_sms >= MAX_SMS_PER_DAY:
issues.append("rate_limit_exceeded")
return issuespython
def ensure_sms_compliance(message, recipient):
issues = []
# 检查用户授权状态
if not has_sms_consent(recipient):
issues.append("no_consent")
# 检查发送时间(符合TCPA规定)
recipient_time = get_local_time(recipient)
if not (8 <= recipient_time.hour <= 21):
issues.append("outside_hours")
# 检查内容
if contains_blocked_keywords(message):
issues.append("blocked_content")
# 检查发送频率限制
recent_sms = count_recent_sms(recipient, hours=24)
if recent_sms >= MAX_SMS_PER_DAY:
issues.append("rate_limit_exceeded")
return issuesContent Optimization
内容优化
Spam Filter Avoidance
规避垃圾邮件过滤器(Spam Filter Avoidance)
python
def check_spam_triggers(content):
triggers = []
# Known spam words
spam_words = [
"free", "guarantee", "urgent", "act now",
"limited time", "winner", "congratulations"
]
for word in spam_words:
if word.lower() in content.lower():
triggers.append({"type": "spam_word", "word": word})
# Excessive caps
caps_ratio = sum(1 for c in content if c.isupper()) / len(content)
if caps_ratio > 0.3:
triggers.append({"type": "excessive_caps", "ratio": caps_ratio})
# Excessive punctuation
if content.count('!') > 2 or content.count('?') > 3:
triggers.append({"type": "excessive_punctuation"})
# Link analysis
links = extract_links(content)
for link in links:
if is_shortened_url(link):
triggers.append({"type": "shortened_url", "url": link})
if is_suspicious_domain(link):
triggers.append({"type": "suspicious_domain", "url": link})
return triggers
def optimize_for_deliverability(content):
triggers = check_spam_triggers(content)
if triggers:
# Suggest modifications
suggestions = []
for trigger in triggers:
if trigger["type"] == "spam_word":
suggestions.append(f"Replace '{trigger['word']}' with alternative")
elif trigger["type"] == "shortened_url":
suggestions.append(f"Use full URL instead of shortened")
return {
"original": content,
"triggers": triggers,
"suggestions": suggestions,
"optimized": auto_optimize(content, triggers)
}
return {"original": content, "triggers": [], "optimized": content}python
def check_spam_triggers(content):
triggers = []
# 已知垃圾邮件关键词
spam_words = [
"free", "guarantee", "urgent", "act now",
"limited time", "winner", "congratulations"
]
for word in spam_words:
if word.lower() in content.lower():
triggers.append({"type": "spam_word", "word": word})
# 过多大写字母
caps_ratio = sum(1 for c in content if c.isupper()) / len(content)
if caps_ratio > 0.3:
triggers.append({"type": "excessive_caps", "ratio": caps_ratio})
# 过多标点符号
if content.count('!') > 2 or content.count('?') > 3:
triggers.append({"type": "excessive_punctuation"})
# 链接分析
links = extract_links(content)
for link in links:
if is_shortened_url(link):
triggers.append({"type": "shortened_url", "url": link})
if is_suspicious_domain(link):
triggers.append({"type": "suspicious_domain", "url": link})
return triggers
def optimize_for_deliverability(content):
triggers = check_spam_triggers(content)
if triggers:
# 提供修改建议
suggestions = []
for trigger in triggers:
if trigger["type"] == "spam_word":
suggestions.append(f"将'{trigger['word']}'替换为其他表述")
elif trigger["type"] == "shortened_url":
suggestions.append(f"使用完整URL而非短链接")
return {
"original": content,
"triggers": triggers,
"suggestions": suggestions,
"optimized": auto_optimize(content, triggers)
}
return {"original": content, "triggers": [], "optimized": content}Personalization for Deliverability
个性化提升Deliverability
python
def personalize_for_deliverability(template, prospect):
"""Personalization helps deliverability"""
personalized = template
# Replace placeholders
personalized = personalized.replace("{first_name}", prospect.first_name)
personalized = personalized.replace("{company}", prospect.company)
# Add unique elements
personalized = add_unique_element(personalized, prospect)
# Vary sentence structure
personalized = apply_sentence_variations(personalized)
return personalized
def add_unique_element(content, prospect):
"""Add something unique to avoid template detection"""
options = [
f"I noticed {prospect.company}'s work on {prospect.recent_news}",
f"Your {prospect.role} perspective would be valuable",
f"Given {prospect.company}'s focus on {prospect.industry_vertical}",
]
# Select based on available data
for option in options:
if all(placeholder in option for placeholder in extract_placeholders(option)):
return content + "\n\n" + option
return contentpython
def personalize_for_deliverability(template, prospect):
"""个性化有助于提升Deliverability"""
personalized = template
# 替换占位符
personalized = personalized.replace("{first_name}", prospect.first_name)
personalized = personalized.replace("{company}", prospect.company)
# 添加独特元素
personalized = add_unique_element(personalized, prospect)
# 调整句子结构多样性
personalized = apply_sentence_variations(personalized)
return personalized
def add_unique_element(content, prospect):
"""添加独特内容,避免被识别为模板消息"""
options = [
f"I noticed {prospect.company}'s work on {prospect.recent_news}",
f"Your {prospect.role} perspective would be valuable",
f"Given {prospect.company}'s focus on {prospect.industry_vertical}",
]
# 根据可用数据选择合适内容
for option in options:
if all(placeholder in option for placeholder in extract_placeholders(option)):
return content + "\n\n" + option
return contentMonitoring & Metrics
监控与指标
Deliverability Metrics
Deliverability指标
python
def calculate_deliverability_metrics(time_period):
sends = get_sends(time_period)
metrics = {
"total_sent": len(sends),
"delivered": len([s for s in sends if s.delivered]),
"bounced": len([s for s in sends if s.bounced]),
"spam_complaints": len([s for s in sends if s.spam_complaint]),
"delivery_rate": calculate_rate(sends, "delivered"),
"bounce_rate": calculate_rate(sends, "bounced"),
"complaint_rate": calculate_rate(sends, "spam_complaint"),
"inbox_placement": estimate_inbox_placement(sends)
}
# By domain
metrics["by_domain"] = {}
for domain in get_domains():
domain_sends = [s for s in sends if s.domain == domain]
metrics["by_domain"][domain] = {
"delivery_rate": calculate_rate(domain_sends, "delivered"),
"bounce_rate": calculate_rate(domain_sends, "bounced"),
"reputation_score": get_domain_reputation(domain)
}
return metricspython
def calculate_deliverability_metrics(time_period):
sends = get_sends(time_period)
metrics = {
"total_sent": len(sends),
"delivered": len([s for s in sends if s.delivered]),
"bounced": len([s for s in sends if s.bounced]),
"spam_complaints": len([s for s in sends if s.spam_complaint]),
"delivery_rate": calculate_rate(sends, "delivered"),
"bounce_rate": calculate_rate(sends, "bounced"),
"complaint_rate": calculate_rate(sends, "spam_complaint"),
"inbox_placement": estimate_inbox_placement(sends)
}
# 按域名拆分指标
metrics["by_domain"] = {}
for domain in get_domains():
domain_sends = [s for s in sends if s.domain == domain]
metrics["by_domain"][domain] = {
"delivery_rate": calculate_rate(domain_sends, "delivered"),
"bounce_rate": calculate_rate(domain_sends, "bounced"),
"reputation_score": get_domain_reputation(domain)
}
return metricsAlerting
告警机制
python
DELIVERABILITY_ALERTS = [
{
"name": "high_bounce_rate",
"condition": lambda m: m["bounce_rate"] > 0.05,
"action": "pause_sending"
},
{
"name": "spam_complaints",
"condition": lambda m: m["complaint_rate"] > 0.001,
"action": "review_content"
},
{
"name": "domain_reputation_drop",
"condition": lambda m: any(d["reputation_score"] < 70 for d in m["by_domain"].values()),
"action": "rotate_domains"
}
]python
DELIVERABILITY_ALERTS = [
{
"name": "high_bounce_rate",
"condition": lambda m: m["bounce_rate"] > 0.05,
"action": "pause_sending"
},
{
"name": "spam_complaints",
"condition": lambda m: m["complaint_rate"] > 0.001,
"action": "review_content"
},
{
"name": "domain_reputation_drop",
"condition": lambda m: any(d["reputation_score"] < 70 for d in m["by_domain"].values()),
"action": "rotate_domains"
}
]Recovery
信誉恢复
Reputation Recovery
域名信誉恢复
python
def recover_domain_reputation(domain):
"""Steps to recover a damaged domain"""
# 1. Pause sending
pause_domain(domain)
# 2. Analyze issues
issues = analyze_domain_issues(domain)
# 3. Clean email list
remove_hard_bounces(domain)
remove_complainers(domain)
# 4. Start slow recovery
recovery_schedule = [
{"day": 1, "sends": 10, "target": "most_engaged"},
{"day": 2, "sends": 20, "target": "most_engaged"},
{"day": 3, "sends": 30, "target": "highly_engaged"},
# Gradually increase
]
# 5. Monitor closely
enable_enhanced_monitoring(domain)
return schedule_recovery(domain, recovery_schedule)python
def recover_domain_reputation(domain):
"""修复受损域名信誉的步骤"""
# 1. 暂停发送
pause_domain(domain)
# 2. 分析问题根源
issues = analyze_domain_issues(domain)
# 3. 清理邮件列表
remove_hard_bounces(domain)
remove_complainers(domain)
# 4. 启动缓慢恢复计划
recovery_schedule = [
{"day": 1, "sends": 10, "target": "most_engaged"},
{"day": 2, "sends": 20, "target": "most_engaged"},
{"day": 3, "sends": 30, "target": "highly_engaged"},
# 逐步提升发送量
]
# 5. 密切监控
enable_enhanced_monitoring(domain)
return schedule_recovery(domain, recovery_schedule)