customerio-load-scale


Customer.io Load & Scale

Overview

Load testing and scaling strategies for high-volume Customer.io integrations.

Prerequisites

前置条件

  • Customer.io integration working
  • Load testing tools (k6, Artillery)
  • Staging environment with test workspace

Capacity Planning

Customer.io Rate Limits

| Endpoint | Limit | Notes |
| --- | --- | --- |
| Track API (identify/track) | 100 req/sec | Per workspace |
| App API (transactional) | 100 req/sec | Per workspace |
| Webhooks (outbound) | Varies | Based on plan |

Scaling Targets

| Volume | Architecture | Notes |
| --- | --- | --- |
| < 1M events/day | Single service | Direct API calls |
| 1-10M events/day | Queue-based | Message queue buffer |
| > 10M events/day | Distributed | Multiple workers |
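To pick a tier, it helps to convert daily volume into a sustained request rate and compare it against the 100 req/sec workspace limit. The sketch below is illustrative only: the 5x peak factor and per-worker throughput are assumptions you should replace with measured values.

```typescript
// Illustrative capacity math. The peak factor and per-worker rate
// are assumptions, not Customer.io guidance.
const WORKSPACE_LIMIT_RPS = 100; // Track API limit per workspace

function averageRps(eventsPerDay: number): number {
  return eventsPerDay / 86_400; // seconds in a day
}

function peakRps(eventsPerDay: number, peakFactor = 5): number {
  // Traffic is bursty; assume peaks run well above the daily average.
  return averageRps(eventsPerDay) * peakFactor;
}

function fitsUnderLimit(eventsPerDay: number, peakFactor = 5): boolean {
  return peakRps(eventsPerDay, peakFactor) < WORKSPACE_LIMIT_RPS;
}

function workersNeeded(eventsPerDay: number, perWorkerRps = 10, peakFactor = 5): number {
  // Per-worker throughput is an assumption; measure your own.
  return Math.ceil(peakRps(eventsPerDay, peakFactor) / perWorkerRps);
}

// 10M events/day already averages ~116 req/sec -- above the workspace
// limit even before peaks, which is why that tier needs queue buffering.
```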

Instructions

Step 1: Load Test Script (k6)

javascript
// load-tests/customerio.js
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate, Trend } from 'k6/metrics';

const errorRate = new Rate('errors');
const identifyDuration = new Trend('identify_duration');
const trackDuration = new Trend('track_duration');

const BASE_URL = 'https://track.customer.io/api/v1';
const AUTH = __ENV.CUSTOMERIO_AUTH; // base64(site_id:api_key)

export const options = {
  scenarios: {
    identify_load: {
      executor: 'ramping-arrival-rate',
      startRate: 10,
      timeUnit: '1s',
      preAllocatedVUs: 50,
      stages: [
        { target: 50, duration: '1m' },
        { target: 100, duration: '2m' },
        { target: 100, duration: '5m' },
        { target: 0, duration: '1m' },
      ],
      exec: 'identifyScenario',
    },
    track_load: {
      executor: 'ramping-arrival-rate',
      startRate: 10,
      timeUnit: '1s',
      preAllocatedVUs: 50,
      stages: [
        { target: 50, duration: '1m' },
        { target: 100, duration: '2m' },
        { target: 100, duration: '5m' },
        { target: 0, duration: '1m' },
      ],
      exec: 'trackScenario',
    },
  },
  thresholds: {
    'errors': ['rate<0.01'],
    'identify_duration': ['p(95)<500'],
    'track_duration': ['p(95)<500'],
  },
};

export function identifyScenario() {
  const userId = `load-test-${__VU}-${__ITER}`;
  const payload = JSON.stringify({
    email: `${userId}@loadtest.com`,
    _load_test: true,
    created_at: Math.floor(Date.now() / 1000),
  });

  const start = new Date();
  const res = http.put( // identify is a PUT in the Track API
    `${BASE_URL}/customers/${userId}`,
    payload,
    {
      headers: {
        'Authorization': `Basic ${AUTH}`,
        'Content-Type': 'application/json',
      },
    }
  );
  identifyDuration.add(new Date() - start);

  const success = check(res, {
    'identify status is 200': (r) => r.status === 200,
  });
  errorRate.add(!success);

  sleep(0.1);
}

export function trackScenario() {
  const userId = `load-test-${__VU}-${__ITER}`;
  const payload = JSON.stringify({
    name: 'load_test_event',
    data: {
      source: 'k6',
      timestamp: new Date().toISOString(),
    },
  });

  const start = new Date();
  const res = http.post(
    `${BASE_URL}/customers/${userId}/events`,
    payload,
    {
      headers: {
        'Authorization': `Basic ${AUTH}`,
        'Content-Type': 'application/json',
      },
    }
  );
  trackDuration.add(new Date() - start);

  const success = check(res, {
    'track status is 200': (r) => r.status === 200,
  });
  errorRate.add(!success);

  sleep(0.1);
}

Step 2: Horizontal Scaling

yaml
# k8s/scaled-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: customerio-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: customerio-worker
  template:
    metadata:
      labels:
        app: customerio-worker
    spec:
      containers:
        - name: worker
          image: customerio-worker:latest
          resources:
            requests:
              cpu: "500m"
              memory: "256Mi"
            limits:
              cpu: "1000m"
              memory: "512Mi"
          env:
            - name: CONCURRENCY
              value: "10"
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: customerio-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: customerio-worker
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: External
      external:
        metric:
          name: pubsub.googleapis.com|subscription|num_undelivered_messages
          selector:
            matchLabels:
              resource.labels.subscription_id: customerio-events
        target:
          type: AverageValue
          averageValue: 1000
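The Deployment passes a `CONCURRENCY` env var to each worker. One way a worker might honor it is with a simple counting semaphore that caps in-flight sends; this is a hand-rolled sketch, not part of any SDK.

```typescript
// Minimal counting semaphore; caps in-flight work at CONCURRENCY.
class Semaphore {
  private waiters: Array<() => void> = [];
  constructor(private slots: number) {}

  async acquire(): Promise<void> {
    if (this.slots > 0) {
      this.slots--;
      return;
    }
    // No free slot: park until release() hands one over.
    await new Promise<void>(resolve => this.waiters.push(resolve));
  }

  release(): void {
    const next = this.waiters.shift();
    if (next) next(); // hand the slot directly to a waiter
    else this.slots++;
  }
}

const concurrency = Number(process.env.CONCURRENCY ?? '10');
const gate = new Semaphore(concurrency);

async function withGate<T>(fn: () => Promise<T>): Promise<T> {
  await gate.acquire();
  try {
    return await fn();
  } finally {
    gate.release();
  }
}
```

Each Customer.io call in the worker would then be wrapped in `withGate(...)`, so raising `CONCURRENCY` in the Deployment directly raises per-pod parallelism.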

Step 3: Message Queue Architecture

typescript
// lib/scaled-processor.ts
import { Kafka, EachMessagePayload } from 'kafkajs';
import { TrackClient, RegionUS } from 'customerio-node';

const kafka = new Kafka({
  clientId: 'customerio-worker',
  brokers: process.env.KAFKA_BROKERS!.split(',')
});

const consumer = kafka.consumer({
  groupId: 'customerio-workers',
  sessionTimeout: 30000,
  heartbeatInterval: 3000
});

const client = new TrackClient(
  process.env.CUSTOMERIO_SITE_ID!,
  process.env.CUSTOMERIO_API_KEY!,
  { region: RegionUS }
);

interface CustomerIOEvent {
  type: 'identify' | 'track';
  userId: string;
  payload: any;
}

async function processMessage(message: EachMessagePayload): Promise<void> {
  const event: CustomerIOEvent = JSON.parse(message.message.value!.toString());

  if (event.type === 'identify') {
    await client.identify(event.userId, event.payload);
  } else if (event.type === 'track') {
    await client.track(event.userId, {
      name: event.payload.event,
      data: event.payload.properties
    });
  }
}

async function start(): Promise<void> {
  await consumer.connect();
  await consumer.subscribe({ topic: 'customerio-events', fromBeginning: false });

  await consumer.run({
    partitionsConsumedConcurrently: 10,
    eachMessage: async (payload) => {
      try {
        await processMessage(payload);
      } catch (error) {
        console.error('Processing error:', error);
        // Dead letter or retry logic
      }
    }
  });
}

start().catch(console.error);
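The `catch` block above is where a dead-letter path would plug in: retry a few times, then park the message instead of blocking the partition. The sketch below uses an injected sink so the retry logic stands alone; in practice the sink could be a kafkajs producer writing to a topic such as `customerio-events-dlq` (an assumed name, not anything Customer.io or kafkajs prescribes).

```typescript
// Retry a handler up to maxAttempts, then hand the message to a
// dead-letter sink. The sink is injected so this is broker-agnostic.
type DeadLetterSink = (value: string, error: Error) => Promise<void>;

async function processWithDeadLetter(
  value: string,
  handler: (value: string) => Promise<void>,
  deadLetter: DeadLetterSink,
  maxAttempts = 3
): Promise<boolean> {
  let lastError: Error = new Error('unreachable');
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await handler(value);
      return true; // processed successfully
    } catch (err) {
      lastError = err instanceof Error ? err : new Error(String(err));
    }
  }
  // Retries exhausted: park the message so the consumer keeps moving.
  await deadLetter(value, lastError);
  return false;
}
```

Inside `eachMessage`, `processMessage` becomes the handler and the sink publishes the raw message value plus the error message as headers to the DLQ topic.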

Step 4: Rate Limiter for Fair Usage

typescript
// lib/rate-limiter.ts
import Bottleneck from 'bottleneck';
import { TrackClient } from 'customerio-node';

// Respect Customer.io's 100 req/sec limit
// Leave headroom for other services
const limiter = new Bottleneck({
  reservoir: 80, // 80 tokens
  reservoirRefreshAmount: 80,
  reservoirRefreshInterval: 1000, // per second
  maxConcurrent: 20,
  minTime: 10 // Minimum 10ms between requests
});

// Track rate limit events
limiter.on('depleted', () => {
  console.warn('Rate limiter depleted, requests queued');
});

limiter.on('error', (error) => {
  console.error('Rate limiter error:', error);
});

export async function rateLimitedIdentify(
  client: TrackClient,
  userId: string,
  attributes: Record<string, any>
): Promise<void> {
  return limiter.schedule(() => client.identify(userId, attributes));
}

export async function rateLimitedTrack(
  client: TrackClient,
  userId: string,
  event: string,
  data?: Record<string, any>
): Promise<void> {
  return limiter.schedule(() =>
    client.track(userId, { name: event, data })
  );
}

// Get limiter stats (counts() is synchronous; currentReservoir() is async)
export async function getLimiterStats() {
  const counts = limiter.counts();
  return {
    queued: counts.QUEUED,
    running: counts.RUNNING,
    executing: counts.EXECUTING,
    reservoir: await limiter.currentReservoir()
  };
}
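Bottleneck's reservoir settings amount to a token bucket: 80 tokens, refilled to capacity once per second. A minimal hand-rolled equivalent makes the headroom arithmetic explicit; this is a teaching sketch, not a replacement for Bottleneck.

```typescript
// Token bucket mirroring the Bottleneck config above:
// capacity 80, refilled to capacity every intervalMs.
class TokenBucket {
  private tokens: number;
  private lastRefill: number;

  constructor(
    private capacity = 80,
    private intervalMs = 1000,
    now = Date.now()
  ) {
    this.tokens = capacity;
    this.lastRefill = now;
  }

  tryRemove(now = Date.now()): boolean {
    // Refill in whole intervals, like reservoirRefreshInterval.
    if (now - this.lastRefill >= this.intervalMs) {
      this.tokens = this.capacity;
      this.lastRefill = now;
    }
    if (this.tokens > 0) {
      this.tokens--;
      return true; // request may proceed
    }
    return false; // caller should queue, as Bottleneck does
  }
}
```

With capacity 80 against the 100 req/sec workspace limit, roughly 20 req/sec of headroom is left for other services writing to the same workspace.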

Step 5: Batch Processing

typescript
// lib/batch-sender.ts
import { TrackClient } from 'customerio-node';

interface BatchConfig {
  maxBatchSize: number;
  maxWaitMs: number;
  concurrency: number;
}

class BatchSender {
  private batch: Array<{ userId: string; operation: 'identify' | 'track'; data: any }> = [];
  private timer: NodeJS.Timeout | null = null;
  private processing = false;

  constructor(
    private client: TrackClient,
    private config: BatchConfig = { maxBatchSize: 100, maxWaitMs: 1000, concurrency: 10 }
  ) {}

  add(userId: string, operation: 'identify' | 'track', data: any): void {
    this.batch.push({ userId, operation, data });

    if (this.batch.length >= this.config.maxBatchSize) {
      this.flush();
    } else if (!this.timer) {
      this.timer = setTimeout(() => this.flush(), this.config.maxWaitMs);
    }
  }

  async flush(): Promise<void> {
    if (this.processing || this.batch.length === 0) return;

    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }

    this.processing = true;
    const items = this.batch.splice(0, this.config.maxBatchSize);

    // Process in parallel with limited concurrency
    for (let i = 0; i < items.length; i += this.config.concurrency) {
      const chunk = items.slice(i, i + this.config.concurrency);
      await Promise.allSettled(chunk.map(item => this.processItem(item)));
    }

    this.processing = false;
  }

  private async processItem(item: { userId: string; operation: string; data: any }): Promise<void> {
    if (item.operation === 'identify') {
      await this.client.identify(item.userId, item.data);
    } else {
      await this.client.track(item.userId, {
        name: item.data.event,
        data: item.data.properties
      });
    }
  }
}
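The flush loop above walks the batch in fixed-size chunks and settles each chunk with `Promise.allSettled`. The chunking itself can be factored into a small helper (illustrative; not part of the class above):

```typescript
// Split items into chunks of at most `size`, preserving order.
function chunk<T>(items: T[], size: number): T[][] {
  if (size <= 0) throw new RangeError('chunk size must be positive');
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// flush() could then read:
//   for (const c of chunk(items, this.config.concurrency)) {
//     await Promise.allSettled(c.map(item => this.processItem(item)));
//   }
```

`Promise.allSettled` (rather than `Promise.all`) matters here: one failed send must not abort the rest of the chunk.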

Step 6: Load Test Execution

bash
#!/bin/bash
# scripts/run-load-test.sh

# Set credentials
export CUSTOMERIO_AUTH=$(echo -n "$CIO_SITE_ID:$CIO_API_KEY" | base64)

# Run k6 load test
k6 run \
  --out json=results.json \
  --out influxdb=http://localhost:8086/k6 \
  load-tests/customerio.js

# Generate report
k6 run --summary-export=summary.json load-tests/customerio.js

echo "Load test complete. Results in results.json"

Scaling Checklist

  • Rate limits understood
  • Load tests written
  • Horizontal scaling configured
  • Message queue buffering
  • Rate limiting implemented
  • Batch processing enabled
  • Monitoring during tests

Error Handling

| Issue | Solution |
| --- | --- |
| Rate limited (429) | Reduce concurrency |
| Timeout errors | Increase timeout |
| Queue backlog | Scale workers |
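For 429s specifically, reducing concurrency pairs well with retrying after a growing delay. A sketch of exponential backoff with a cap follows; the base and cap values are arbitrary defaults, not Customer.io guidance, and the `statusCode` field on the error is an assumption to adapt to your client.

```typescript
// Exponential backoff delay: base * 2^(attempt-1), capped.
function backoffDelayMs(attempt: number, baseMs = 1000, capMs = 30_000): number {
  return Math.min(capMs, baseMs * 2 ** (attempt - 1));
}

// Retry wrapper with an injectable sleep so the schedule is testable.
async function retryOn429<T>(
  fn: () => Promise<T>,
  maxAttempts = 5,
  sleep: (ms: number) => Promise<void> = ms => new Promise(r => setTimeout(r, ms))
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await fn();
    } catch (err: any) {
      // Only back off on rate limiting; rethrow anything else.
      // Assumes the error carries statusCode; adapt to your client.
      if (err?.statusCode !== 429 || attempt >= maxAttempts) throw err;
      await sleep(backoffDelayMs(attempt));
    }
  }
}
```

Adding jitter on top of the pure doubling is a common refinement so that many workers hitting 429 at once do not retry in lockstep.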

Next Steps

After load testing, proceed to customerio-known-pitfalls for anti-patterns.