community-feed

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Community Feed

社区信息流

Social feed with trending algorithms, cursor pagination, and engagement tracking.
支持热门算法、游标分页和互动追踪的社交信息流。

When to Use This Skill

何时使用该Skill

  • Building social feeds with infinite scroll
  • Need trending/hot content algorithms
  • Implementing like/engagement systems
  • Want efficient pagination for large datasets
  • 构建带无限滚动的社交信息流
  • 需要热门/热点内容算法
  • 实现点赞/互动系统
  • 为大型数据集实现高效分页

Core Concepts

核心概念

Cursor pagination beats offset for large datasets. Batch-load relationships to avoid N+1. Store trending scores as computed columns for efficient sorting.
对于大型数据集,游标分页优于偏移量分页。批量加载关联数据以避免N+1查询。将热门分数存储为计算列,以便高效排序。

Implementation

实现

Python

Python

python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, List, Dict
import base64
import json


@dataclass
class PaginatedPosts:
    """One page of feed results plus pagination metadata."""

    posts: List[Dict]           # post rows, each a dict with joined author/asset data
    total_count: int            # total matching posts across all pages
    has_more: bool              # True if another page exists after this one
    next_cursor: Optional[str]  # opaque cursor for the next page, or None


class CommunityFeedService:
    """Community feed service with cursor-based pagination.

    Supports a "trending" feed ordered by the precomputed trending_score
    column, a "following" feed restricted to authors the viewer follows,
    and a newest-first feed otherwise. Cursors are opaque base64-encoded
    JSON payloads carrying the sort key of the last item on a page.
    """

    def __init__(self, db):
        # db is a query-builder style client (table().select()... chains);
        # its concrete type is not visible here — assumed Supabase-like. TODO confirm.
        self.db = db

    async def get_feed(
        self,
        feed_type: str = "trending",
        viewer_id: Optional[str] = None,
        cursor: Optional[str] = None,
        limit: int = 20,
        tags: Optional[List[str]] = None,
    ) -> PaginatedPosts:
        """Return one page of the feed.

        Args:
            feed_type: "trending", "following", or anything else for newest-first.
            viewer_id: Current user; enables per-viewer like flags and "following".
            cursor: Opaque cursor from a previous page's next_cursor, or None.
            limit: Maximum number of posts per page.
            tags: If given, only posts containing all of these tags.
        """
        cursor_data = self._parse_cursor(cursor) if cursor else None

        query = self.db.table("community_posts").select(
            "*",
            "assets!inner(url, asset_type)",
            "users!inner(id, display_name, avatar_url)",
        )

        if tags:
            query = query.contains("tags", tags)

        # Feed-specific filtering and ordering.
        if feed_type == "following" and viewer_id:
            following = await self._get_following_ids(viewer_id)
            if not following:
                # Viewer follows nobody: the feed is empty by definition.
                return PaginatedPosts(posts=[], total_count=0, has_more=False, next_cursor=None)
            query = query.in_("user_id", following)

        if feed_type == "trending":
            query = query.order("trending_score", desc=True)
            if cursor_data:
                # NOTE(review): keying the cursor on score alone can skip or
                # duplicate posts with tied scores; a (score, id) tiebreaker
                # would make pagination exact — confirm against the db API.
                query = query.lt("trending_score", cursor_data["score"])
        else:
            query = query.order("created_at", desc=True)
            if cursor_data:
                query = query.lt("created_at", cursor_data["created_at"])

        # Fetch one extra row to detect whether another page exists.
        query = query.limit(limit + 1)
        result = query.execute()
        posts = result.data or []

        has_more = len(posts) > limit
        if has_more:
            posts = posts[:limit]

        # Batch-load the viewer's likes in one query to avoid N+1 lookups.
        if viewer_id and posts:
            liked_ids = await self._get_liked_post_ids(viewer_id, [p["id"] for p in posts])
            for post in posts:
                post["is_liked_by_viewer"] = post["id"] in liked_ids

        next_cursor = None
        if has_more and posts:
            next_cursor = self._generate_cursor(posts[-1], feed_type)

        return PaginatedPosts(
            posts=posts,
            total_count=await self._get_total_count(tags),
            has_more=has_more,
            next_cursor=next_cursor,
        )

    async def _get_following_ids(self, user_id: str) -> List[str]:
        """Return IDs of the users that user_id follows."""
        result = self.db.table("user_follows").select("following_id").eq("follower_id", user_id).execute()
        return [r["following_id"] for r in (result.data or [])]

    async def _get_liked_post_ids(self, user_id: str, post_ids: List[str]) -> set:
        """Return the subset of post_ids the user has liked (single batched query)."""
        result = self.db.table("post_likes").select("post_id").eq("user_id", user_id).in_("post_id", post_ids).execute()
        return {r["post_id"] for r in (result.data or [])}

    async def _get_total_count(self, tags: Optional[List[str]] = None) -> int:
        """Total number of posts matching the tag filter.

        Fix: this method was called by get_feed but never defined, so every
        page fetch raised AttributeError. Counting is expensive on large
        tables; callers should consider caching the result.
        """
        query = self.db.table("community_posts").select("id", count="exact")
        if tags:
            query = query.contains("tags", tags)
        result = query.execute()
        # Prefer the server-side count; fall back to the row count if the
        # client does not expose one.
        count = getattr(result, "count", None)
        return count if count is not None else len(result.data or [])

    def _parse_cursor(self, cursor: str) -> dict:
        """Decode an opaque cursor; return {} (cursor ignored) if malformed."""
        try:
            return json.loads(base64.b64decode(cursor).decode())
        except (ValueError, TypeError):
            # binascii.Error, UnicodeDecodeError and json.JSONDecodeError are
            # all ValueError subclasses; the old bare `except:` also swallowed
            # KeyboardInterrupt/SystemExit.
            return {}

    def _generate_cursor(self, post: dict, feed_type: str) -> str:
        """Encode the sort key of the last post on the page as an opaque cursor."""
        if feed_type == "trending":
            data = {"score": post.get("trending_score", 0)}
        else:
            data = {"created_at": post["created_at"]}
        return base64.b64encode(json.dumps(data).encode()).decode()
python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, List, Dict
import base64
import json


@dataclass
class PaginatedPosts:
    """One page of feed results plus pagination metadata."""

    posts: List[Dict]           # post rows, each a dict with joined author/asset data
    total_count: int            # total matching posts across all pages
    has_more: bool              # True if another page exists after this one
    next_cursor: Optional[str]  # opaque cursor for the next page, or None


class CommunityFeedService:
    """Community feed service with cursor-based pagination.

    Supports a "trending" feed ordered by the precomputed trending_score
    column, a "following" feed restricted to authors the viewer follows,
    and a newest-first feed otherwise. Cursors are opaque base64-encoded
    JSON payloads carrying the sort key of the last item on a page.
    """

    def __init__(self, db):
        # db is a query-builder style client (table().select()... chains);
        # its concrete type is not visible here — assumed Supabase-like. TODO confirm.
        self.db = db

    async def get_feed(
        self,
        feed_type: str = "trending",
        viewer_id: Optional[str] = None,
        cursor: Optional[str] = None,
        limit: int = 20,
        tags: Optional[List[str]] = None,
    ) -> PaginatedPosts:
        """Return one page of the feed.

        Args:
            feed_type: "trending", "following", or anything else for newest-first.
            viewer_id: Current user; enables per-viewer like flags and "following".
            cursor: Opaque cursor from a previous page's next_cursor, or None.
            limit: Maximum number of posts per page.
            tags: If given, only posts containing all of these tags.
        """
        cursor_data = self._parse_cursor(cursor) if cursor else None

        query = self.db.table("community_posts").select(
            "*",
            "assets!inner(url, asset_type)",
            "users!inner(id, display_name, avatar_url)",
        )

        if tags:
            query = query.contains("tags", tags)

        # Feed-specific filtering and ordering.
        if feed_type == "following" and viewer_id:
            following = await self._get_following_ids(viewer_id)
            if not following:
                # Viewer follows nobody: the feed is empty by definition.
                return PaginatedPosts(posts=[], total_count=0, has_more=False, next_cursor=None)
            query = query.in_("user_id", following)

        if feed_type == "trending":
            query = query.order("trending_score", desc=True)
            if cursor_data:
                # NOTE(review): keying the cursor on score alone can skip or
                # duplicate posts with tied scores; a (score, id) tiebreaker
                # would make pagination exact — confirm against the db API.
                query = query.lt("trending_score", cursor_data["score"])
        else:
            query = query.order("created_at", desc=True)
            if cursor_data:
                query = query.lt("created_at", cursor_data["created_at"])

        # Fetch one extra row to detect whether another page exists.
        query = query.limit(limit + 1)
        result = query.execute()
        posts = result.data or []

        has_more = len(posts) > limit
        if has_more:
            posts = posts[:limit]

        # Batch-load the viewer's likes in one query to avoid N+1 lookups.
        if viewer_id and posts:
            liked_ids = await self._get_liked_post_ids(viewer_id, [p["id"] for p in posts])
            for post in posts:
                post["is_liked_by_viewer"] = post["id"] in liked_ids

        next_cursor = None
        if has_more and posts:
            next_cursor = self._generate_cursor(posts[-1], feed_type)

        return PaginatedPosts(
            posts=posts,
            total_count=await self._get_total_count(tags),
            has_more=has_more,
            next_cursor=next_cursor,
        )

    async def _get_following_ids(self, user_id: str) -> List[str]:
        """Return IDs of the users that user_id follows."""
        result = self.db.table("user_follows").select("following_id").eq("follower_id", user_id).execute()
        return [r["following_id"] for r in (result.data or [])]

    async def _get_liked_post_ids(self, user_id: str, post_ids: List[str]) -> set:
        """Return the subset of post_ids the user has liked (single batched query)."""
        result = self.db.table("post_likes").select("post_id").eq("user_id", user_id).in_("post_id", post_ids).execute()
        return {r["post_id"] for r in (result.data or [])}

    async def _get_total_count(self, tags: Optional[List[str]] = None) -> int:
        """Total number of posts matching the tag filter.

        Fix: this method was called by get_feed but never defined, so every
        page fetch raised AttributeError. Counting is expensive on large
        tables; callers should consider caching the result.
        """
        query = self.db.table("community_posts").select("id", count="exact")
        if tags:
            query = query.contains("tags", tags)
        result = query.execute()
        # Prefer the server-side count; fall back to the row count if the
        # client does not expose one.
        count = getattr(result, "count", None)
        return count if count is not None else len(result.data or [])

    def _parse_cursor(self, cursor: str) -> dict:
        """Decode an opaque cursor; return {} (cursor ignored) if malformed."""
        try:
            return json.loads(base64.b64decode(cursor).decode())
        except (ValueError, TypeError):
            # binascii.Error, UnicodeDecodeError and json.JSONDecodeError are
            # all ValueError subclasses; the old bare `except:` also swallowed
            # KeyboardInterrupt/SystemExit.
            return {}

    def _generate_cursor(self, post: dict, feed_type: str) -> str:
        """Encode the sort key of the last post on the page as an opaque cursor."""
        if feed_type == "trending":
            data = {"score": post.get("trending_score", 0)}
        else:
            data = {"created_at": post["created_at"]}
        return base64.b64encode(json.dumps(data).encode()).decode()

Engagement operations

Engagement operations

async def like_post(self, post_id: str, user_id: str) -> bool:
    """Record a like by user_id on post_id.

    Returns False if the user already liked the post, True on success.
    Fix: the source was syntactically invalid (header and body fused onto
    one line by the page renderer); reconstructed as valid Python.
    NOTE(review): the check-then-insert pair is not atomic — two concurrent
    requests can both pass the check. A UNIQUE (post_id, user_id) constraint
    on post_likes should back this up; confirm the schema has one.
    """
    existing = self.db.table("post_likes").select("id").eq("post_id", post_id).eq("user_id", user_id).execute()
    if existing.data:
        return False  # Already liked

    self.db.table("post_likes").insert({
        "post_id": post_id,
        "user_id": user_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }).execute()

    # Atomic increment via a DB function avoids a read-modify-write race.
    self.db.rpc("increment_like_count", {"post_id": post_id}).execute()
    return True
async def unlike_post(self, post_id: str, user_id: str) -> bool:
    """Remove user_id's like from post_id.

    Returns False if no like row existed, True on success.
    Fix: the source was syntactically invalid (statements fused onto one
    line by the page renderer); reconstructed as valid Python.
    """
    result = self.db.table("post_likes").delete().eq("post_id", post_id).eq("user_id", user_id).execute()
    if not result.data:
        return False

    # Atomic decrement keeps like_count consistent under concurrency.
    self.db.rpc("decrement_like_count", {"post_id": post_id}).execute()
    return True
async def like_post(self, post_id: str, user_id: str) -> bool:
    """Record a like by user_id on post_id.

    Returns False if the user already liked the post, True on success.
    Fix: the source was syntactically invalid (header and body fused onto
    one line by the page renderer); reconstructed as valid Python.
    NOTE(review): the check-then-insert pair is not atomic — two concurrent
    requests can both pass the check. A UNIQUE (post_id, user_id) constraint
    on post_likes should back this up; confirm the schema has one.
    """
    existing = self.db.table("post_likes").select("id").eq("post_id", post_id).eq("user_id", user_id).execute()
    if existing.data:
        return False  # Already liked

    self.db.table("post_likes").insert({
        "post_id": post_id,
        "user_id": user_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }).execute()

    # Atomic increment via a DB function avoids a read-modify-write race.
    self.db.rpc("increment_like_count", {"post_id": post_id}).execute()
    return True
async def unlike_post(self, post_id: str, user_id: str) -> bool:
    """Remove user_id's like from post_id.

    Returns False if no like row existed, True on success.
    Fix: the source was syntactically invalid (statements fused onto one
    line by the page renderer); reconstructed as valid Python.
    """
    result = self.db.table("post_likes").delete().eq("post_id", post_id).eq("user_id", user_id).execute()
    if not result.data:
        return False

    # Atomic decrement keeps like_count consistent under concurrency.
    self.db.rpc("decrement_like_count", {"post_id": post_id}).execute()
    return True

Trending algorithm

Trending algorithm

def calculate_trending_score(
    like_count: int,
    comment_count: int,
    view_count: int,
    created_at: datetime,
    is_featured: bool = False,
) -> float:
    """Trending score = engagement / age**decay (higher engagement + newer = higher).

    Comments are weighted 2x likes, views 0.1x. Age is clamped to a
    0.1-hour floor so brand-new posts do not divide by ~zero. Featured
    posts get a 1.5x boost.

    Fix: the source was syntactically invalid (entire function fused onto
    one line by the page renderer); reconstructed as valid Python.

    Args:
        created_at: Creation time; must be timezone-aware, since it is
            subtracted from an aware UTC "now".
    """
    engagement = like_count * 1.0 + comment_count * 2.0 + view_count * 0.1
    age_hours = max((datetime.now(timezone.utc) - created_at).total_seconds() / 3600, 0.1)
    score = engagement / (age_hours ** 1.5)
    return score * 1.5 if is_featured else score
def calculate_trending_score(
    like_count: int,
    comment_count: int,
    view_count: int,
    created_at: datetime,
    is_featured: bool = False,
) -> float:
    """Trending score = engagement / age**decay (higher engagement + newer = higher).

    Comments are weighted 2x likes, views 0.1x. Age is clamped to a
    0.1-hour floor so brand-new posts do not divide by ~zero. Featured
    posts get a 1.5x boost.

    Fix: the source was syntactically invalid (entire function fused onto
    one line by the page renderer); reconstructed as valid Python.

    Args:
        created_at: Creation time; must be timezone-aware, since it is
            subtracted from an aware UTC "now".
    """
    engagement = like_count * 1.0 + comment_count * 2.0 + view_count * 0.1
    age_hours = max((datetime.now(timezone.utc) - created_at).total_seconds() / 3600, 0.1)
    score = engagement / (age_hours ** 1.5)
    return score * 1.5 if is_featured else score

SQL Schema

SQL Schema

sql
CREATE TABLE community_posts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id),
    title VARCHAR(200) NOT NULL,
    like_count INTEGER DEFAULT 0,
    comment_count INTEGER DEFAULT 0,
    view_count INTEGER DEFAULT 0,
    is_featured BOOLEAN DEFAULT FALSE,
    tags TEXT[] DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- Trending score, refreshed periodically (see refresh_trending_scores).
    -- Fix: this cannot be a GENERATED ... STORED column — PostgreSQL requires
    -- generation expressions to be IMMUTABLE, and NOW() is not, so the
    -- original GENERATED definition fails at CREATE TABLE.
    trending_score FLOAT NOT NULL DEFAULT 0
);

CREATE INDEX idx_posts_trending ON community_posts(trending_score DESC);
CREATE INDEX idx_posts_created ON community_posts(created_at DESC);

-- Recompute all trending scores; run from cron or a background worker.
CREATE FUNCTION refresh_trending_scores() RETURNS VOID AS $$
BEGIN
    UPDATE community_posts SET trending_score =
        (like_count + comment_count * 2 + view_count * 0.1) /
        POWER(GREATEST(EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600, 0.1), 1.5);
END;
$$ LANGUAGE plpgsql;

-- Atomic counter updates, called via RPC from the application layer.
CREATE FUNCTION increment_like_count(post_id UUID) RETURNS VOID AS $$
BEGIN UPDATE community_posts SET like_count = like_count + 1 WHERE id = post_id; END;
$$ LANGUAGE plpgsql;

-- Fix: the application also calls decrement_like_count (unlike_post) via RPC,
-- but it was never defined. Clamped at 0 to avoid negative counts.
CREATE FUNCTION decrement_like_count(post_id UUID) RETURNS VOID AS $$
BEGIN UPDATE community_posts SET like_count = GREATEST(like_count - 1, 0) WHERE id = post_id; END;
$$ LANGUAGE plpgsql;
sql
CREATE TABLE community_posts (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id),
    title VARCHAR(200) NOT NULL,
    like_count INTEGER DEFAULT 0,
    comment_count INTEGER DEFAULT 0,
    view_count INTEGER DEFAULT 0,
    is_featured BOOLEAN DEFAULT FALSE,
    tags TEXT[] DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- Trending score, refreshed periodically (see refresh_trending_scores).
    -- Fix: this cannot be a GENERATED ... STORED column — PostgreSQL requires
    -- generation expressions to be IMMUTABLE, and NOW() is not, so the
    -- original GENERATED definition fails at CREATE TABLE.
    trending_score FLOAT NOT NULL DEFAULT 0
);

CREATE INDEX idx_posts_trending ON community_posts(trending_score DESC);
CREATE INDEX idx_posts_created ON community_posts(created_at DESC);

-- Recompute all trending scores; run from cron or a background worker.
CREATE FUNCTION refresh_trending_scores() RETURNS VOID AS $$
BEGIN
    UPDATE community_posts SET trending_score =
        (like_count + comment_count * 2 + view_count * 0.1) /
        POWER(GREATEST(EXTRACT(EPOCH FROM (NOW() - created_at)) / 3600, 0.1), 1.5);
END;
$$ LANGUAGE plpgsql;

-- Atomic counter updates, called via RPC from the application layer.
CREATE FUNCTION increment_like_count(post_id UUID) RETURNS VOID AS $$
BEGIN UPDATE community_posts SET like_count = like_count + 1 WHERE id = post_id; END;
$$ LANGUAGE plpgsql;

-- Fix: the application also calls decrement_like_count (unlike_post) via RPC,
-- but it was never defined. Clamped at 0 to avoid negative counts.
CREATE FUNCTION decrement_like_count(post_id UUID) RETURNS VOID AS $$
BEGIN UPDATE community_posts SET like_count = GREATEST(like_count - 1, 0) WHERE id = post_id; END;
$$ LANGUAGE plpgsql;

Usage Examples

Usage Examples

python
feed_service = CommunityFeedService(db)
python
feed_service = CommunityFeedService(db)

Get trending feed

Get trending feed

result = await feed_service.get_feed(feed_type="trending", viewer_id="user_123", limit=20) for post in result.posts: print(f"{post['title']} - {post['like_count']} likes")
result = await feed_service.get_feed(feed_type="trending", viewer_id="user_123", limit=20) for post in result.posts: print(f"{post['title']} - {post['like_count']} likes")

Load next page

Load next page

if result.has_more: next_page = await feed_service.get_feed(feed_type="trending", cursor=result.next_cursor)
if result.has_more: next_page = await feed_service.get_feed(feed_type="trending", cursor=result.next_cursor)

Like a post

Like a post

await feed_service.like_post("post_456", "user_123")
await feed_service.like_post("post_456", "user_123")

Best Practices

最佳实践

  1. Use cursor pagination over offset for large datasets
  2. Batch-load relationships to avoid N+1 queries
  3. Store trending score as computed column for efficient sorting
  4. Use atomic database functions for counter updates
  5. Cache total counts (expensive to compute)
  1. 对于大型数据集,使用游标分页而非偏移量分页
  2. 批量加载关联数据以避免N+1查询
  3. 将热门分数存储为计算列以便高效排序
  4. 使用原子数据库函数更新计数器
  5. 缓存总计数(计算成本高)

Common Mistakes

常见错误

  • Using offset pagination (slow for large offsets)
  • N+1 queries for author/like data
  • Computing trending score on every query
  • Non-atomic counter updates (race conditions)
  • 使用偏移量分页(大偏移量时速度慢)
  • 获取作者/点赞数据时出现N+1查询
  • 每次查询都计算热门分数
  • 非原子的计数器更新(存在竞态条件)

Related Patterns

相关模式

  • analytics-pipeline (event tracking)
  • intelligent-cache (caching feeds)
  • analytics-pipeline(事件追踪)
  • intelligent-cache(信息流缓存)