analytics-pipeline

Analytics Pipeline

High-performance analytics with Redis counters and periodic database flush.

When to Use This Skill

  • Need high-throughput event tracking (thousands/second)
  • Want real-time counters without database bottlenecks
  • Building dashboards with time-series data
  • Tracking user activity, feature usage, or page views

Core Concepts

Write to Redis for speed, flush to PostgreSQL for persistence. Redis handles high write throughput, periodic workers batch-flush to the database.
Events → Redis Counters → Periodic Flush Worker → PostgreSQL → Dashboard Queries
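The flush worker later upserts into an `analytics_daily` table whose schema the text never shows. A minimal sketch consistent with the upsert's columns and its `ON CONFLICT (event_type, date)` target might look like this (hypothetical DDL; the `ensure_schema` helper and pool handling are illustrative, not part of the original):

```python
# Hypothetical schema for the analytics_daily table targeted by the flush
# worker's upsert. The (event_type, date) primary key is what the
# ON CONFLICT clause relies on.
ANALYTICS_DAILY_DDL = """
CREATE TABLE IF NOT EXISTS analytics_daily (
    event_type TEXT        NOT NULL,
    date       DATE        NOT NULL,
    count      BIGINT      NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (event_type, date)
);
"""

async def ensure_schema(pg_pool) -> None:
    """Create the analytics table if it does not exist yet."""
    async with pg_pool.acquire() as conn:
        await conn.execute(ANALYTICS_DAILY_DDL)
```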

Implementation

Python

python
from enum import Enum
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List, Optional
import redis.asyncio as redis


class AnalyticsEventType(str, Enum):
    GENERATION_COMPLETED = "generation_completed"
    USER_SIGNUP = "user_signup"
    FEATURE_USED = "feature_used"
    PAGE_VIEW = "page_view"


@dataclass
class AnalyticsEvent:
    event_type: AnalyticsEventType
    user_id: Optional[str] = None
    properties: Optional[Dict] = None
    timestamp: Optional[datetime] = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now(timezone.utc)


class AnalyticsKeys:
    """Redis key patterns for analytics counters."""
    PREFIX = "analytics"

    @staticmethod
    def daily_counter(event_type: str, date: Optional[datetime] = None) -> str:
        d = date or datetime.now(timezone.utc)
        return f"analytics:counter:{event_type}:{d.strftime('%Y-%m-%d')}"

    @staticmethod
    def hourly_counter(event_type: str, date: Optional[datetime] = None) -> str:
        d = date or datetime.now(timezone.utc)
        return f"analytics:counter:{event_type}:{d.strftime('%Y-%m-%d:%H')}"

    @staticmethod
    def user_daily_counter(user_id: str, event_type: str, date: Optional[datetime] = None) -> str:
        d = date or datetime.now(timezone.utc)
        return f"analytics:user:{user_id}:{event_type}:{d.strftime('%Y-%m-%d')}"

    @staticmethod
    def pending_flush_set() -> str:
        return "analytics:pending_flush"


class AnalyticsService:
    """High-performance analytics using Redis counters."""
    COUNTER_TTL = 7 * 24 * 60 * 60  # 7 days

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def track_event(self, event: AnalyticsEvent) -> None:
        pipe = self.redis.pipeline()

        # Daily counter
        daily_key = AnalyticsKeys.daily_counter(event.event_type.value, event.timestamp)
        pipe.incr(daily_key)
        pipe.expire(daily_key, self.COUNTER_TTL)

        # Hourly counter
        hourly_key = AnalyticsKeys.hourly_counter(event.event_type.value, event.timestamp)
        pipe.incr(hourly_key)
        pipe.expire(hourly_key, self.COUNTER_TTL)

        # Per-user counter
        if event.user_id:
            user_key = AnalyticsKeys.user_daily_counter(event.user_id, event.event_type.value, event.timestamp)
            pipe.incr(user_key)
            pipe.expire(user_key, self.COUNTER_TTL)

        # Track for flush
        pipe.sadd(AnalyticsKeys.pending_flush_set(), 
                  f"{event.event_type.value}:{event.timestamp.strftime('%Y-%m-%d')}")

        await pipe.execute()

    async def get_daily_count(self, event_type: AnalyticsEventType, date: Optional[datetime] = None) -> int:
        key = AnalyticsKeys.daily_counter(event_type.value, date)
        count = await self.redis.get(key)
        return int(count) if count else 0

    async def get_hourly_counts(self, event_type: AnalyticsEventType, date: Optional[datetime] = None) -> Dict[int, int]:
        d = date or datetime.now(timezone.utc)
        pipe = self.redis.pipeline()
        for hour in range(24):
            hour_dt = d.replace(hour=hour, minute=0, second=0, microsecond=0)
            pipe.get(AnalyticsKeys.hourly_counter(event_type.value, hour_dt))
        results = await pipe.execute()
        return {hour: int(count) if count else 0 for hour, count in enumerate(results)}
python
import asyncio
import logging

logger = logging.getLogger(__name__)


class AnalyticsFlushWorker:
    """Periodically flushes Redis counters to PostgreSQL."""
    FLUSH_INTERVAL = 300  # seconds between flushes (5 minutes)
    BATCH_SIZE = 100

    def __init__(self, redis_client: redis.Redis, pg_pool):
        self.redis = redis_client
        self.pg = pg_pool
        self._running = False

    async def start(self) -> None:
        self._running = True
        while self._running:
            try:
                await self.flush()
            except Exception:
                logger.exception("Analytics flush failed")
            await asyncio.sleep(self.FLUSH_INTERVAL)

    def stop(self) -> None:
        self._running = False

    async def flush(self) -> int:
        # Assumes the Redis client was created with decode_responses=True,
        # so set members come back as str rather than bytes.
        pending = await self.redis.smembers(AnalyticsKeys.pending_flush_set())
        if not pending:
            return 0

        flushed = 0
        pending_list = list(pending)

        for i in range(0, len(pending_list), self.BATCH_SIZE):
            batch = pending_list[i:i + self.BATCH_SIZE]
            counters = await self._collect_counters(batch)

            if counters:
                await self._write_to_postgres(counters)
                flushed += len(counters)
                await self.redis.srem(AnalyticsKeys.pending_flush_set(), *batch)

        return flushed

    async def _collect_counters(self, pending_keys: List[str]) -> List[tuple]:
        # Only daily counters are flushed; hourly and per-user counters
        # simply expire via their TTL.
        counters = []
        valid = []
        pipe = self.redis.pipeline()

        for pending in pending_keys:
            parts = pending.split(":", 1)
            if len(parts) != 2:
                continue
            event_type, date_str = parts
            key = AnalyticsKeys.daily_counter(event_type, datetime.fromisoformat(date_str))
            pipe.getdel(key)  # Atomic get-and-delete
            valid.append((event_type, date_str))

        results = await pipe.execute()

        # Zip against the parsed keys, not pending_keys: malformed entries
        # were skipped above, so the raw list can be longer than the results.
        for (event_type, date_str), count in zip(valid, results):
            if count:
                # Pass a date object so the driver maps it to the DATE column.
                counters.append((event_type, datetime.fromisoformat(date_str).date(), int(count)))

        return counters

    async def _write_to_postgres(self, counters: List[tuple]) -> None:
        async with self.pg.acquire() as conn:
            await conn.executemany("""
                INSERT INTO analytics_daily (event_type, date, count, updated_at)
                VALUES ($1, $2, $3, NOW())
                ON CONFLICT (event_type, date)
                DO UPDATE SET count = analytics_daily.count + EXCLUDED.count, updated_at = NOW()
            """, counters)

Usage Examples

使用示例

Track events

python
analytics = AnalyticsService(redis_client)
await analytics.track_event(AnalyticsEvent(
    event_type=AnalyticsEventType.GENERATION_COMPLETED,
    user_id="user_123",
    properties={"model": "gpt-4"},
))

Query real-time counts

python
today_count = await analytics.get_daily_count(AnalyticsEventType.GENERATION_COMPLETED)
hourly = await analytics.get_hourly_counts(AnalyticsEventType.GENERATION_COMPLETED)

Start flush worker

python
worker = AnalyticsFlushWorker(redis_client, pg_pool)
asyncio.create_task(worker.start())

Best Practices

  1. Use Redis pipelines for batched counter updates
  2. Set TTL on counters to prevent memory growth
  3. Use GETDEL for atomic flush to prevent double-counting
  4. Upsert on flush to handle duplicate dates gracefully
  5. Separate user vs global analytics tables for query efficiency
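A dashboard query typically has to merge the two stores: history comes from `analytics_daily` in PostgreSQL, while today's still-unflushed events live in Redis. A sketch of such a merge (the `daily_series` helper and its signature are hypothetical; it assumes the `AnalyticsService` and table used throughout this document):

```python
from datetime import datetime, timedelta, timezone
from typing import Dict

async def daily_series(analytics, pg_pool, event_type, days: int = 7) -> Dict:
    """Merge persisted daily counts with today's live Redis count."""
    today = datetime.now(timezone.utc).date()
    async with pg_pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT date, count FROM analytics_daily "
            "WHERE event_type = $1 AND date >= $2 ORDER BY date",
            event_type.value,
            today - timedelta(days=days),
        )
    series = {row["date"]: row["count"] for row in rows}
    # Today's events may not have been flushed yet, so add the live counter.
    series[today] = series.get(today, 0) + await analytics.get_daily_count(event_type)
    return series
```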

Common Mistakes

  • Not setting TTL on Redis keys (memory leak)
  • Using GET then DEL instead of GETDEL (race condition)
  • Flushing too frequently (database load)
  • Not batching flush operations
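The GET-then-DEL race from the second bullet can be made concrete with a pure in-memory analogue (a plain dict stands in for Redis here; no real Redis calls are made, and the interleaving is forced with `asyncio.sleep(0)`):

```python
import asyncio

async def flush_racy(store: dict, key: str) -> int:
    # GET then DEL: a concurrent increment can land between the two steps
    # and be deleted before it was ever read.
    count = store.get(key, 0)
    await asyncio.sleep(0)          # another task gets to run here
    store.pop(key, None)
    return count

async def flush_atomic(store: dict, key: str) -> int:
    # Analogue of Redis GETDEL (server >= 6.2): read + delete as one step.
    return store.pop(key, 0)

async def demo() -> int:
    store = {"events": 10}
    task = asyncio.create_task(flush_racy(store, "events"))
    await asyncio.sleep(0)                        # let the flush read 10
    store["events"] = store.get("events", 0) + 1  # concurrent increment
    return await task                             # flush then deletes the key

flushed = asyncio.run(demo())
# flushed == 10, but 11 events were tracked: one increment was silently lost.
```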

Related Patterns

  • metrics-collection (system metrics)
  • intelligent-cache (caching strategies)