Two Sigma Style Guide


Overview


Two Sigma is a systematic investment manager with ~$60B AUM, known for applying machine learning, distributed computing, and alternative data to financial markets. They operate like a technology company, investing heavily in data infrastructure, ML platforms, and research tooling.

Core Philosophy


"We're a technology company that happens to be in finance."
"Data is the new oil, but only if you can refine it."
"The best model is worthless without the infrastructure to deploy it."
Two Sigma believes that competitive advantage comes from data infrastructure and research velocity—the ability to test more ideas faster than competitors.
"我们是一家恰好涉足金融领域的科技公司。"
"数据是新的石油,但只有能提炼它才有价值。"
"没有部署基础设施,再好的模型也毫无用处。"
Two Sigma认为,竞争优势来自数据基础设施和研究速度——比竞争对手更快测试更多想法的能力。

Design Principles


  1. Data Platform First: Build the platform, then the models.
  2. Feature Store: Features are first-class citizens, versioned and shared.
  3. Reproducibility: Every experiment must be reproducible (a minimal sketch follows this list).
  4. Scale Horizontally: Design for 1000x more data than you have today.
  5. Research Velocity: Reduce time from idea to tested hypothesis.
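
Principle 3 is concrete enough to sketch. A minimal illustration, with an invented `ExperimentSpec` standing in for real experiment-tracking tooling: pin every input that affects a run (data snapshot, feature versions, code revision, seed) and derive the run ID from their hash, so the same spec always names the same result.

python
import hashlib
import json
from dataclasses import asdict, dataclass
from typing import Dict


@dataclass(frozen=True)
class ExperimentSpec:
    """Everything a rerun needs, nothing ambient."""
    data_snapshot: str               # e.g., a dated, immutable data version
    feature_versions: Dict[str, int]
    code_revision: str               # git SHA
    hyperparams: Dict[str, float]
    random_seed: int

    def run_id(self) -> str:
        """Deterministic ID: identical spec -> identical ID."""
        payload = json.dumps(asdict(self), sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()[:12]


spec = ExperimentSpec(
    data_snapshot='prices/2024-01-31',
    feature_versions={'momentum_20d': 3},
    code_revision='a1b2c3d',
    hyperparams={'learning_rate': 0.05},
    random_seed=42,
)
print(spec.run_id())  # stable across machines and reruns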

When Building ML Trading Systems


Always


  • Version everything: data, features, models, code
  • Store point-in-time snapshots (no lookahead bias; see the sketch after this list)
  • Track feature lineage and dependencies
  • Use distributed backtesting for scale
  • Monitor model drift in production
  • Separate feature engineering from model training
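
Point-in-time storage is the load-bearing item on this list. A minimal sketch of the idea, with hypothetical column names: each record carries both the date it describes and the time it became knowable, and every query filters on the latter.

python
import pandas as pd

# Hypothetical snapshot table: `knowledge_time` records when a value
# became knowable, which can lag `obs_date` (late feeds, restatements).
snapshots = pd.DataFrame({
    'symbol':         ['AAPL', 'AAPL', 'AAPL'],
    'obs_date':       pd.to_datetime(['2024-03-29', '2024-03-29', '2024-04-01']),
    'knowledge_time': pd.to_datetime(['2024-03-29', '2024-04-15', '2024-04-01']),
    'eps':            [1.50, 1.42, 1.10],   # 1.50 later restated to 1.42
})

def as_of(df: pd.DataFrame, ts: pd.Timestamp) -> pd.DataFrame:
    """Latest value per (symbol, obs_date) that was knowable at ts."""
    known = df[df['knowledge_time'] <= ts]
    return (known.sort_values('knowledge_time')
                 .groupby(['symbol', 'obs_date'], as_index=False)
                 .last())

# A backtest running on 2024-04-01 must see eps=1.50, never the
# restated 1.42 that only became known on 2024-04-15.
print(as_of(snapshots, pd.Timestamp('2024-04-01')))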

Never


  • Use future data in features (even accidentally)
  • Train and test on overlapping time periods
  • Ignore transaction costs in backtests (a costing sketch follows this list)
  • Deploy models without monitoring
  • Hardcode features in model code
  • Trust a single backtest run
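
The transaction-cost item fails silently: a cost-free backtest still produces plausible-looking curves. A minimal sketch of netting costs out of simulated returns, with a flat assumed cost rate standing in for a real spread-and-impact model:

python
import pandas as pd

def net_returns(positions: pd.DataFrame,
                asset_returns: pd.DataFrame,
                cost_bps: float = 5.0) -> pd.Series:
    """Strategy returns net of a flat per-dollar-traded cost.

    `positions` holds portfolio weights (dates x symbols) and
    `asset_returns` the matching asset returns. The flat cost_bps is an
    assumption standing in for a real spread/impact/borrow model.
    """
    gross = (positions.shift(1) * asset_returns).sum(axis=1)  # trade yesterday's weights
    # Turnover per $1 of NAV; first row counts the initial buy-in.
    turnover = positions.diff().fillna(positions).abs().sum(axis=1)
    return gross - turnover * cost_bps / 1e4

# Even a few bps of cost can erase a high-turnover strategy's edge.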

Prefer


  • Feature stores over ad-hoc feature computation
  • Declarative pipelines over imperative scripts (sketched after this list)
  • Ensemble methods over single models
  • Online learning for adaptation
  • A/B testing for model deployment
  • Distributed compute over single-machine
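
One way to read "declarative over imperative": a feature is described as data (inputs plus transform) and one generic engine runs every feature, instead of each researcher maintaining a bespoke script. A hypothetical sketch:

python
import pandas as pd

# Each spec says WHAT the feature is; a single generic engine decides
# HOW and WHEN to run it (scheduling, caching, backfills).
FEATURE_SPECS = {
    'momentum_20d': {
        'inputs': ['close'],
        'transform': lambda df: df['close'].pct_change(20),
    },
    'vol_60d': {
        'inputs': ['close'],
        'transform': lambda df: df['close'].pct_change().rolling(60).std(),
    },
}

def run_feature(name: str, data: pd.DataFrame) -> pd.Series:
    """Generic engine: validate declared inputs, apply the transform."""
    spec = FEATURE_SPECS[name]
    missing = set(spec['inputs']) - set(data.columns)
    if missing:
        raise ValueError(f'{name} is missing inputs: {missing}')
    return spec['transform'](data)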

Code Patterns


Feature Store Architecture


python
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from typing import Callable, List

import pandas as pd


@dataclass
class FeatureDefinition:
    """Record describing one versioned feature (assumed shape)."""
    name: str
    computation: Callable
    dependencies: List[str]
    lookback_window: timedelta
    description: str
    version: int
    created_at: datetime


class FeatureStore:
    """
    Two Sigma's insight: features are the real IP.
    Centralize, version, and share them.
    """
    
    def __init__(self, storage_backend, metadata_db):
        self.storage = storage_backend  # e.g., S3, HDFS
        self.metadata = metadata_db     # e.g., PostgreSQL
    
    def register_feature(self, 
                         name: str,
                         computation: Callable,
                         dependencies: List[str],
                         lookback_window: timedelta,
                         description: str):
        """Register a new feature definition."""
        feature_def = FeatureDefinition(
            name=name,
            computation=computation,
            dependencies=dependencies,
            lookback_window=lookback_window,
            description=description,
            version=self.get_next_version(name),
            created_at=datetime.utcnow()
        )
        self.metadata.save(feature_def)
        return feature_def
    
    def compute_feature(self, 
                        feature_name: str, 
                        as_of_date: date,
                        universe: List[str]) -> pd.DataFrame:
        """
        Compute feature values as of a specific date.
        CRITICAL: No future information leakage.
        """
        feature_def = self.metadata.get_latest(feature_name)
        
        # Get point-in-time data for dependencies
        dependency_data = {}
        for dep in feature_def.dependencies:
            dependency_data[dep] = self.get_pit_data(
                dep, 
                as_of_date, 
                lookback=feature_def.lookback_window
            )
        
        # Compute feature
        result = feature_def.computation(dependency_data, universe, as_of_date)
        
        # Cache result
        self.storage.save(
            feature_name=feature_name,
            version=feature_def.version,
            as_of_date=as_of_date,
            values=result
        )
        
        return result
    
    def get_training_data(self,
                          features: List[str],
                          target: str,
                          start_date: date,
                          end_date: date,
                          universe: List[str]) -> pd.DataFrame:
        """
        Get feature matrix for training.
        Each row is (date, symbol) with features computed as-of that date.
        """
        rows = []
        
        # Iterate trading days (business-day calendar as a simplification)
        for current_date in pd.bdate_range(start_date, end_date):
            feature_values = {}
            
            for feature_name in features:
                feature_values[feature_name] = self.compute_feature(
                    feature_name, current_date, universe
                )
            
            # Target must be from the FUTURE (what we're predicting)
            target_values = self.get_future_target(
                target, current_date, universe
            )
            
            row = self.merge_features_and_target(
                feature_values, target_values, current_date
            )
            rows.append(row)
        
        return pd.concat(rows)
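
A usage sketch for the class above. The backends and the `momentum_20d` feature are invented for illustration; the point is the workflow shape: register a definition once, then compute it point-in-time everywhere.

python
from datetime import date, timedelta

def momentum_20d(deps, universe, as_of_date):
    """20-day momentum from point-in-time closes (illustrative)."""
    closes = deps['close_prices']        # DataFrame: dates x symbols
    return closes[universe].pct_change(20).iloc[-1].to_frame('momentum_20d')

store = FeatureStore(storage_backend=blob_store,  # stand-in backends,
                     metadata_db=metadata_db)     # not real services
store.register_feature(
    name='momentum_20d',
    computation=momentum_20d,
    dependencies=['close_prices'],
    lookback_window=timedelta(days=30),
    description='20-day price momentum',
)
values = store.compute_feature('momentum_20d', as_of_date=date(2024, 6, 3),
                               universe=['AAPL', 'MSFT'])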

Distributed Backtesting


python
from datetime import date
from typing import Dict, List, Tuple, Type

import pandas as pd
from sklearn.model_selection import ParameterGrid

# SparkCluster, Strategy, and BacktestResult are assumed project types:
# a cluster wrapper exposing .map(), a fit/predict strategy base class,
# and a per-run result record.


class DistributedBacktester:
    """
    Two Sigma approach: parallelize backtesting across cluster.
    Test hundreds of configurations simultaneously.
    """
    
    def __init__(self, cluster: SparkCluster, feature_store: FeatureStore):
        self.cluster = cluster
        self.feature_store = feature_store  # shared store, not rebuilt per run
    
    def run_parameter_sweep(self,
                            strategy_class: Type[Strategy],
                            param_grid: Dict[str, List],
                            universe: List[str],
                            start_date: date,
                            end_date: date,
                            n_splits: int = 5) -> pd.DataFrame:
        """
        Distributed parameter sweep with cross-validation.
        """
        # Generate all parameter combinations
        param_combinations = list(ParameterGrid(param_grid))
        
        # Create time-series cross-validation splits
        cv_splits = self.create_ts_splits(start_date, end_date, n_splits)
        
        # Distribute work: (params, cv_fold) pairs
        work_items = [
            (params, train_dates, test_dates)
            for params in param_combinations
            for train_dates, test_dates in cv_splits
        ]
        
        # Run in parallel on cluster
        results = self.cluster.map(
            self.run_single_backtest,
            work_items,
            strategy_class=strategy_class,
            universe=universe
        )
        
        return self.aggregate_results(results)
    
    def run_single_backtest(self,
                            params: Dict,
                            train_dates: Tuple[date, date],
                            test_dates: Tuple[date, date],
                            strategy_class: Type[Strategy],
                            universe: List[str]) -> BacktestResult:
        """Single backtest run on a worker node."""
        
        # Get training data
        train_data = self.feature_store.get_training_data(
            features=strategy_class.required_features(),
            target='forward_returns',
            start_date=train_dates[0],
            end_date=train_dates[1],
            universe=universe
        )
        
        # Train strategy
        strategy = strategy_class(**params)
        strategy.fit(train_data)
        
        # Run backtest on test period
        test_data = self.feature_store.get_training_data(
            features=strategy_class.required_features(),
            target='forward_returns',
            start_date=test_dates[0],
            end_date=test_dates[1],
            universe=universe
        )
        
        returns = self.simulate_trading(strategy, test_data)
        
        return BacktestResult(
            params=params,
            train_period=train_dates,
            test_period=test_dates,
            returns=returns,
            sharpe=self.calculate_sharpe(returns),
            max_drawdown=self.calculate_max_drawdown(returns)
        )
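
`create_ts_splits` is referenced above but not shown. A minimal walk-forward version, under the assumption that an embargo gap between train and test keeps overlapping-horizon labels from leaking (this enforces the "never train and test on overlapping periods" rule):

python
from datetime import date, timedelta
from typing import List, Tuple

DateRange = Tuple[date, date]

def create_ts_splits(start: date, end: date, n_splits: int,
                     embargo_days: int = 5) -> List[Tuple[DateRange, DateRange]]:
    """Expanding-window walk-forward splits.

    Each fold trains on strictly earlier data, with an embargo gap so
    labels computed over a forward horizon cannot straddle the boundary.
    """
    total_days = (end - start).days
    fold_days = total_days // (n_splits + 1)
    splits = []
    for i in range(1, n_splits + 1):
        test_start = start + timedelta(days=i * fold_days)
        test_end = min(test_start + timedelta(days=fold_days - 1), end)
        train_end = test_start - timedelta(days=embargo_days + 1)
        splits.append(((start, train_end), (test_start, test_end)))
    return splits

# e.g., 5 folds over 2020-2023, each training only on strictly earlier data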

Alternative Data Pipeline


python
from datetime import date, datetime, timedelta
from typing import Any, Dict

import numpy as np
import pandas as pd


class AlternativeDataPipeline:
    """
    Two Sigma's edge: alternative data processed at scale.
    Satellite imagery, credit card data, web scraping, etc.
    """
    
    def __init__(self, raw_storage, processed_storage, feature_store):
        self.raw = raw_storage
        self.processed = processed_storage
        self.feature_store = feature_store
    
    def ingest_satellite_imagery(self, 
                                  source: str,
                                  date: date) -> Dict[str, Any]:
        """
        Example: count cars in retail parking lots.
        """
        # Download raw imagery
        raw_images = self.fetch_images(source, date)
        
        # Store raw with metadata
        raw_path = self.raw.save(
            data=raw_images,
            source=source,
            date=date,
            ingested_at=datetime.utcnow()
        )
        
        # Process: detect and count vehicles (the CV helpers below are
        # assumed models, not shown here)
        processed = {}
        for location_id, image in raw_images.items():
            vehicle_count = self.detect_vehicles(image)
            processed[location_id] = {
                'vehicle_count': vehicle_count,
                'image_quality': self.assess_quality(image),
                'weather_conditions': self.detect_weather(image)
            }
        
        # Store processed
        processed_path = self.processed.save(
            data=processed,
            source=source,
            date=date,
            processing_version='v2.3'
        )
        
        return processed
    
    def build_retail_traffic_feature(self):
        """
        Convert satellite data into trading feature.
        """
        def compute(deps, universe, as_of_date):
            # Get satellite data up to as_of_date
            satellite = deps['satellite_parking_counts']
            
            # Map locations to companies
            location_mapping = deps['location_to_ticker']
            
            # Aggregate by company
            company_traffic = {}
            for ticker in universe:
                locations = location_mapping.get(ticker, [])
                counts = [satellite.get(loc, {}).get('vehicle_count', np.nan) 
                          for loc in locations]
                
                # Year-over-year change (careful about seasonality)
                current = np.nanmean(counts)
                year_ago = self.get_year_ago_counts(ticker, as_of_date)
                
                company_traffic[ticker] = {
                    'parking_lot_traffic': current,
                    'traffic_yoy_change': (current - year_ago) / year_ago if year_ago else np.nan
                }
            
            return pd.DataFrame(company_traffic).T
        
        self.feature_store.register_feature(
            name='retail_parking_traffic',
            computation=compute,
            dependencies=['satellite_parking_counts', 'location_to_ticker'],
            lookback_window=timedelta(days=7),
            description='YoY change in parking lot traffic from satellite imagery'
        )
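
A usage sketch under the same assumptions; the vendor name and backends are invented. The daily job ingests imagery, and the one-time registration makes the derived signal available to every strategy through the feature store.

python
from datetime import date

pipeline = AlternativeDataPipeline(
    raw_storage=raw_store,               # stand-in storage backends
    processed_storage=processed_store,
    feature_store=feature_store,
)

# Daily ingestion job: raw images in, per-location counts out.
counts = pipeline.ingest_satellite_imagery(
    source='example-imagery-vendor',     # hypothetical vendor
    date=date(2024, 6, 3),
)

# One-time registration; strategies then request 'retail_parking_traffic'
# like any other versioned feature.
pipeline.build_retail_traffic_feature()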

Model Monitoring and Drift Detection


python
from datetime import datetime

import numpy as np
import pandas as pd
from scipy import stats


class ModelMonitor:
    """
    Two Sigma approach: continuous monitoring of production models.
    Detect drift before it becomes a problem.
    """
    
    def __init__(self, model_registry, metrics_store):
        self.registry = model_registry
        self.metrics = metrics_store
        self.drift_thresholds = {}
    
    def monitor_model(self, 
                      model_id: str,
                      predictions: pd.Series,
                      actuals: pd.Series,
                      features: pd.DataFrame):
        """Real-time monitoring of model performance."""
        
        # Rank information coefficient (IC) and directional hit rate
        ic = stats.spearmanr(predictions, actuals).correlation
        hit_rate = (np.sign(predictions) == np.sign(actuals)).mean()
        
        # Feature drift detection
        feature_drift = self.detect_feature_drift(model_id, features)
        
        # Prediction distribution drift
        pred_drift = self.detect_prediction_drift(model_id, predictions)
        
        # Store metrics
        self.metrics.log(
            model_id=model_id,
            timestamp=datetime.utcnow(),
            ic=ic,
            hit_rate=hit_rate,
            feature_drift=feature_drift,
            prediction_drift=pred_drift
        )
        
        # Alert on significant drift
        if feature_drift > self.drift_thresholds.get(model_id, 0.1):
            self.alert(f"Feature drift detected for {model_id}: {feature_drift}")
        
        if ic < 0:
            self.alert(f"Negative IC for {model_id}: {ic}")
    
    def detect_feature_drift(self, 
                             model_id: str, 
                             current_features: pd.DataFrame) -> float:
        """
        Compare current feature distribution to training distribution.
        Uses Population Stability Index (PSI).
        """
        training_stats = self.registry.get_training_stats(model_id)
        
        psi_scores = []
        for col in current_features.columns:
            expected = training_stats[col]
            actual = current_features[col]
            psi = self.calculate_psi(expected, actual)
            psi_scores.append(psi)
        
        return np.mean(psi_scores)
    
    def calculate_psi(self, expected: pd.Series, actual: pd.Series) -> float:
        """Population Stability Index for drift detection."""
        # Decile bin edges taken from the training-time (expected) distribution
        bins = np.percentile(expected, np.linspace(0, 100, 11))
        
        expected_pct = np.histogram(expected, bins=bins)[0] / len(expected)
        actual_pct = np.histogram(actual, bins=bins)[0] / len(actual)
        
        # Avoid division by zero
        expected_pct = np.clip(expected_pct, 0.001, 1)
        actual_pct = np.clip(actual_pct, 0.001, 1)
        
        psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
        return psi
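
`detect_prediction_drift` is called in `monitor_model` but never defined. One reasonable implementation reuses the PSI machinery against a baseline sample of predictions stored at deployment time; `get_baseline_predictions` is an assumed registry method, and a two-sample Kolmogorov-Smirnov test would be an equally standard choice.

python
    # Slots into ModelMonitor alongside detect_feature_drift.
    def detect_prediction_drift(self,
                                model_id: str,
                                predictions: pd.Series) -> float:
        """PSI between live predictions and the deployment-time baseline."""
        baseline = self.registry.get_baseline_predictions(model_id)
        return self.calculate_psi(expected=baseline, actual=predictions)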

Mental Model


Two Sigma approaches ML trading by asking:
  1. What data do we have? And what data could we get?
  2. What features can we extract? Systematic feature engineering.
  3. How do we avoid lookahead bias? Point-in-time everything.
  4. How do we scale? Distributed compute for backtesting.
  5. How do we monitor? Continuous drift detection.

Signature Two Sigma Moves


  • Feature stores as central infrastructure
  • Point-in-time data management
  • Distributed backtesting at scale
  • Alternative data pipelines
  • Continuous model monitoring
  • Experiment tracking and reproducibility
  • Declarative feature definitions
  • Horizontal scaling for research