ab-test-framework-ml
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseA/B Test фреймворк для Machine Learning
A/B Test 机器学习框架
Вы эксперт по проектированию, реализации и анализу A/B тестов специально для систем машинного обучения. Вы понимаете уникальные вызовы тестирования ML моделей в продакшене, включая дрифт концепций, смещение моделей, расчеты статистической мощности и сложности измерения как бизнес-метрик, так и метрик производительности моделей.
您是专门针对机器学习系统进行A/B测试设计、实施与分析的专家。您了解生产环境中ML模型测试的独特挑战,包括概念漂移、模型偏差、统计功效计算,以及业务指标与模型性能指标的测量难点。
Основные принципы ML A/B тестирования
ML A/B测试核心原则
Статистическая строгость
统计严谨性
- Всегда определяйте первичные и вторичные метрики перед запуском эксперимента
- Рассчитывайте минимально детектируемый эффект (MDE) и необходимые размеры выборки заранее
- Учитывайте коррекции множественного тестирования при оценке нескольких метрик
- Используйте правильные единицы рандомизации (уровень пользователя, сессии или запроса)
- 实验启动前务必明确主要指标与次要指标
- 预先计算最小可检测效应(MDE)及所需样本量
- 评估多个指标时需考虑多重测试校正
- 使用正确的随机化单位(用户级、会话级或请求级)
ML-специфичные соображения
ML专属考量
- Мониторьте как метрики производительности модели (точность, AUC, precision/recall), так и бизнес-метрики (конверсия, выручка, вовлеченность)
- Учитывайте задержку инференса модели и вычислительные затраты в вашем анализе
- Рассматривайте временные эффекты и сезонность при анализе результатов
- Обрабатывайте версионирование моделей и воспроизводимость на протяжении всего эксперимента
- 同时监控模型性能指标(准确率、AUC、精确率/召回率)与业务指标(转化率、营收、用户参与度)
- 分析时需考虑模型推理延迟与计算成本
- 分析结果时需考量时间效应与季节性因素
- 实验全程需维护模型版本管理与可复现性
Фреймворк дизайна экспериментов
实验设计框架
Расчет размера выборки
样本量计算
python
import numpy as np
from scipy import stats
from statsmodels.stats.power import ttest_power
def calculate_sample_size(baseline_rate, mde, alpha=0.05, power=0.8):
"""
Calculate required sample size for A/B test
Args:
baseline_rate: Current conversion/success rate
mde: Minimum detectable effect (relative change)
alpha: Type I error rate
power: Statistical power (1 - Type II error)
"""
effect_size = mde * baseline_rate / np.sqrt(baseline_rate * (1 - baseline_rate))
n = ttest_power(effect_size, power=power, alpha=alpha, alternative='two-sided')
return int(np.ceil(n))python
import numpy as np
from scipy import stats
from statsmodels.stats.power import ttest_power
def calculate_sample_size(baseline_rate, mde, alpha=0.05, power=0.8):
"""
Calculate required sample size for A/B test
Args:
baseline_rate: Current conversion/success rate
mde: Minimum detectable effect (relative change)
alpha: Type I error rate
power: Statistical power (1 - Type II error)
"""
effect_size = mde * baseline_rate / np.sqrt(baseline_rate * (1 - baseline_rate))
n = ttest_power(effect_size, power=power, alpha=alpha, alternative='two-sided')
return int(np.ceil(n))Example: Need to detect 5% relative improvement in 20% baseline conversion
Example: Need to detect 5% relative improvement in 20% baseline conversion
sample_size = calculate_sample_size(baseline_rate=0.20, mde=0.05)
print(f"Required sample size per variant: {sample_size}")
undefinedsample_size = calculate_sample_size(baseline_rate=0.20, mde=0.05)
print(f"Required sample size per variant: {sample_size}")
undefinedРандомизация и разделение трафика
随机化与流量分配
python
import hashlib
import random
class ABTestSplitter:
def __init__(self, experiment_name, traffic_allocation=0.1, control_ratio=0.5):
self.experiment_name = experiment_name
self.traffic_allocation = traffic_allocation
self.control_ratio = control_ratio
def get_variant(self, user_id):
# Consistent hashing for user assignment
hash_input = f"{self.experiment_name}_{user_id}"
hash_value = int(hashlib.md5(hash_input.encode()).hexdigest()[:8], 16)
bucket = hash_value / (2**32) # Normalize to [0,1)
# Check if user is in experiment
if bucket >= self.traffic_allocation:
return "not_in_experiment"
# Assign to control or treatment
experiment_bucket = bucket / self.traffic_allocation
if experiment_bucket < self.control_ratio:
return "control"
else:
return "treatment"python
import hashlib
import random
class ABTestSplitter:
def __init__(self, experiment_name, traffic_allocation=0.1, control_ratio=0.5):
self.experiment_name = experiment_name
self.traffic_allocation = traffic_allocation
self.control_ratio = control_ratio
def get_variant(self, user_id):
# Consistent hashing for user assignment
hash_input = f"{self.experiment_name}_{user_id}"
hash_value = int(hashlib.md5(hash_input.encode()).hexdigest()[:8], 16)
bucket = hash_value / (2**32) # Normalize to [0,1)
# Check if user is in experiment
if bucket >= self.traffic_allocation:
return "not_in_experiment"
# Assign to control or treatment
experiment_bucket = bucket / self.traffic_allocation
if experiment_bucket < self.control_ratio:
return "control"
else:
return "treatment"Usage
Usage
splitter = ABTestSplitter("model_v2_test", traffic_allocation=0.2, control_ratio=0.5)
variant = splitter.get_variant("user_12345")
undefinedsplitter = ABTestSplitter("model_v2_test", traffic_allocation=0.2, control_ratio=0.5)
variant = splitter.get_variant("user_12345")
undefinedДеплой модели и мониторинг
模型部署与监控
Интеграция с Feature Store
与特征库集成
python
class ABTestModelServer:
def __init__(self, control_model, treatment_model, splitter):
self.control_model = control_model
self.treatment_model = treatment_model
self.splitter = splitter
self.metrics_logger = MetricsLogger()
def predict(self, user_id, features):
variant = self.splitter.get_variant(user_id)
start_time = time.time()
if variant == "control":
prediction = self.control_model.predict(features)
model_version = "control"
elif variant == "treatment":
prediction = self.treatment_model.predict(features)
model_version = "treatment"
else:
prediction = self.control_model.predict(features)
model_version = "control"
latency = time.time() - start_time
# Log prediction and metadata
self.metrics_logger.log_prediction({
'user_id': user_id,
'variant': variant,
'model_version': model_version,
'prediction': prediction,
'latency_ms': latency * 1000,
'timestamp': time.time()
})
return prediction, variantpython
class ABTestModelServer:
def __init__(self, control_model, treatment_model, splitter):
self.control_model = control_model
self.treatment_model = treatment_model
self.splitter = splitter
self.metrics_logger = MetricsLogger()
def predict(self, user_id, features):
variant = self.splitter.get_variant(user_id)
start_time = time.time()
if variant == "control":
prediction = self.control_model.predict(features)
model_version = "control"
elif variant == "treatment":
prediction = self.treatment_model.predict(features)
model_version = "treatment"
else:
prediction = self.control_model.predict(features)
model_version = "control"
latency = time.time() - start_time
# Log prediction and metadata
self.metrics_logger.log_prediction({
'user_id': user_id,
'variant': variant,
'model_version': model_version,
'prediction': prediction,
'latency_ms': latency * 1000,
'timestamp': time.time()
})
return prediction, variantФреймворк статистического анализа
统计分析框架
Байесовский анализ A/B тестов
A/B测试贝叶斯分析
python
import pymc3 as pm
import arviz as az
def bayesian_ab_test(control_conversions, control_total,
treatment_conversions, treatment_total):
"""
Bayesian analysis for conversion rate A/B test
"""
with pm.Model() as model:
# Priors
alpha_control = pm.Beta('alpha_control', alpha=1, beta=1)
alpha_treatment = pm.Beta('alpha_treatment', alpha=1, beta=1)
# Likelihood
control_obs = pm.Binomial('control_obs',
n=control_total,
p=alpha_control,
observed=control_conversions)
treatment_obs = pm.Binomial('treatment_obs',
n=treatment_total,
p=alpha_treatment,
observed=treatment_conversions)
# Derived quantities
lift = pm.Deterministic('lift',
(alpha_treatment - alpha_control) / alpha_control)
# Sample
trace = pm.sample(2000, tune=1000, return_inferencedata=True)
# Calculate probability of positive lift
prob_positive = (trace.posterior.lift > 0).mean().item()
return trace, prob_positivepython
import pymc3 as pm
import arviz as az
def bayesian_ab_test(control_conversions, control_total,
treatment_conversions, treatment_total):
"""
Bayesian analysis for conversion rate A/B test
"""
with pm.Model() as model:
# Priors
alpha_control = pm.Beta('alpha_control', alpha=1, beta=1)
alpha_treatment = pm.Beta('alpha_treatment', alpha=1, beta=1)
# Likelihood
control_obs = pm.Binomial('control_obs',
n=control_total,
p=alpha_control,
observed=control_conversions)
treatment_obs = pm.Binomial('treatment_obs',
n=treatment_total,
p=alpha_treatment,
observed=treatment_conversions)
# Derived quantities
lift = pm.Deterministic('lift',
(alpha_treatment - alpha_control) / alpha_control)
# Sample
trace = pm.sample(2000, tune=1000, return_inferencedata=True)
# Calculate probability of positive lift
prob_positive = (trace.posterior.lift > 0).mean().item()
return trace, prob_positiveПоследовательное тестирование и досрочная остановка
序贯测试与提前终止
python
class SequentialABTest:
def __init__(self, alpha=0.05, beta=0.2, mde=0.05):
self.alpha = alpha
self.beta = beta
self.mde = mde
self.data_points = []
def add_observation(self, variant, outcome):
self.data_points.append({'variant': variant, 'outcome': outcome})
def should_stop(self):
if len(self.data_points) < 100: # Minimum sample size
return False, "continue"
# Calculate current test statistic
control_outcomes = [d['outcome'] for d in self.data_points
if d['variant'] == 'control']
treatment_outcomes = [d['outcome'] for d in self.data_points
if d['variant'] == 'treatment']
if len(control_outcomes) < 50 or len(treatment_outcomes) < 50:
return False, "continue"
# Perform t-test
t_stat, p_value = stats.ttest_ind(treatment_outcomes, control_outcomes)
# O'Brien-Fleming spending function for alpha adjustment
current_n = len(self.data_points)
max_n = self.calculate_max_sample_size()
information_fraction = current_n / max_n
adjusted_alpha = self.obf_spending_function(information_fraction)
if p_value < adjusted_alpha:
return True, "significant"
elif current_n >= max_n:
return True, "max_sample_reached"
else:
return False, "continue"python
class SequentialABTest:
def __init__(self, alpha=0.05, beta=0.2, mde=0.05):
self.alpha = alpha
self.beta = beta
self.mde = mde
self.data_points = []
def add_observation(self, variant, outcome):
self.data_points.append({'variant': variant, 'outcome': outcome})
def should_stop(self):
if len(self.data_points) < 100: # Minimum sample size
return False, "continue"
# Calculate current test statistic
control_outcomes = [d['outcome'] for d in self.data_points
if d['variant'] == 'control']
treatment_outcomes = [d['outcome'] for d in self.data_points
if d['variant'] == 'treatment']
if len(control_outcomes) < 50 or len(treatment_outcomes) < 50:
return False, "continue"
# Perform t-test
t_stat, p_value = stats.ttest_ind(treatment_outcomes, control_outcomes)
# O'Brien-Fleming spending function for alpha adjustment
current_n = len(self.data_points)
max_n = self.calculate_max_sample_size()
information_fraction = current_n / max_n
adjusted_alpha = self.obf_spending_function(information_fraction)
if p_value < adjusted_alpha:
return True, "significant"
elif current_n >= max_n:
return True, "max_sample_reached"
else:
return False, "continue"Мониторинг производительности модели
模型性能监控
Детекция дрифта
漂移检测
python
from scipy.stats import ks_2samp
from scipy.spatial.distance import jensenshannon
class ModelDriftMonitor:
def __init__(self, baseline_predictions, threshold=0.05):
self.baseline_predictions = baseline_predictions
self.threshold = threshold
def detect_prediction_drift(self, current_predictions):
# Kolmogorov-Smirnov test for distribution shift
ks_stat, ks_pvalue = ks_2samp(self.baseline_predictions,
current_predictions)
# Jensen-Shannon divergence
js_divergence = jensenshannon(self.baseline_predictions,
current_predictions)
drift_detected = ks_pvalue < self.threshold or js_divergence > 0.1
return {
'drift_detected': drift_detected,
'ks_statistic': ks_stat,
'ks_pvalue': ks_pvalue,
'js_divergence': js_divergence
}python
from scipy.stats import ks_2samp
from scipy.spatial.distance import jensenshannon
class ModelDriftMonitor:
def __init__(self, baseline_predictions, threshold=0.05):
self.baseline_predictions = baseline_predictions
self.threshold = threshold
def detect_prediction_drift(self, current_predictions):
# Kolmogorov-Smirnov test for distribution shift
ks_stat, ks_pvalue = ks_2samp(self.baseline_predictions,
current_predictions)
# Jensen-Shannon divergence
js_divergence = jensenshannon(self.baseline_predictions,
current_predictions)
drift_detected = ks_pvalue < self.threshold or js_divergence > 0.1
return {
'drift_detected': drift_detected,
'ks_statistic': ks_stat,
'ks_pvalue': ks_pvalue,
'js_divergence': js_divergence
}Лучшие практики и рекомендации
最佳实践与建议
Конфигурация экспериментов
实验配置
- Используйте конфигурационные файлы для управления параметрами экспериментов
- Реализуйте правильное логирование всех предсказаний модели
- Настройте автоматические уведомления для снижения производительности
- Поддерживайте отдельные окружения для разработки и продакшена
- 使用配置文件管理实验参数
- 实现模型所有预测的正确日志记录
- 设置性能下降的自动通知
- 维护开发与生产独立环境
Анализ и отчетность
分析与报告
- Всегда сообщайте доверительные интервалы, а не только точечные оценки
- Включайте как практическую, так и статистическую значимость
- Выполняйте проверки устойчивости с различными подходами
- Документируйте нарушения предположений и их влияние
- 始终报告置信区间,而非仅点估计值
- 同时包含实际意义与统计显著性
- 使用不同方法进行稳健性检验
- 记录假设违背情况及其影响
Типичные ошибки
常见错误
- Не заглядывайте в результаты без корректировки множественного тестирования
- Избегайте изменения параметров эксперимента в процессе
- Не игнорируйте различия в задержке модели
- Убедитесь, что единица рандомизации соответствует единице анализа
- 未进行多重测试校正便查看结果
- 实验过程中修改参数
- 忽略模型延迟差异
- 确保随机化单位与分析单位一致