Redis State Management

A comprehensive skill for mastering Redis state management patterns in distributed systems. This skill covers caching strategies, session management, pub/sub messaging, distributed locks, data structures, and production-ready patterns using redis-py.

When to Use This Skill

Use this skill when:
  • Implementing high-performance caching layers for web applications
  • Managing user sessions in distributed environments
  • Building real-time messaging and event distribution systems
  • Coordinating distributed processes with locks and synchronization
  • Storing and querying structured data with Redis data structures
  • Optimizing application performance with Redis
  • Scaling applications horizontally with shared state
  • Implementing rate limiting, counters, and analytics
  • Building microservices with Redis as a communication layer
  • Managing temporary data with automatic expiration (TTL)
  • Implementing leaderboards, queues, and real-time features

Core Concepts

Redis Fundamentals

Redis (Remote Dictionary Server) is an in-memory data structure store used as:
  • Database: Persistent key-value storage
  • Cache: High-speed data layer
  • Message Broker: Pub/sub and stream messaging
  • Session Store: Distributed session management
Key Characteristics:
  • In-memory storage (microsecond latency)
  • Optional persistence (RDB snapshots, AOF logs)
  • Rich data structures beyond key-value
  • Atomic operations on complex data types
  • Built-in replication and clustering
  • Pub/sub messaging support
  • Lua scripting for complex operations
  • Pipelining for batch operations
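Pipelining, listed above, batches several commands into one network round trip. A minimal sketch of the pattern follows — `MiniClient` and `MiniPipeline` are in-memory stand-ins invented here for illustration, so the batching semantics can be shown without a live server; with redis-py you would call `pipeline()` on a real client the same way:

```python
class MiniPipeline:
    """Queues commands and applies them all in one execute() call."""

    def __init__(self, store):
        self._store = store
        self._ops = []

    def set(self, key, value):
        self._ops.append(("set", key, value))
        return self

    def incr(self, key):
        self._ops.append(("incr", key))
        return self

    def execute(self):
        # With a real client this is the single network round trip
        results = []
        for op in self._ops:
            if op[0] == "set":
                self._store[op[1]] = op[2]
                results.append(True)
            else:
                self._store[op[1]] = int(self._store.get(op[1], 0)) + 1
                results.append(self._store[op[1]])
        self._ops = []
        return results

class MiniClient:
    """In-memory stand-in for a redis-py client (illustration only)."""

    def __init__(self):
        self.store = {}

    def pipeline(self):
        return MiniPipeline(self.store)

def record_page_view(r, page_id: int):
    """Two commands, one round trip (with a real redis-py client)."""
    pipe = r.pipeline()
    pipe.set(f"page:{page_id}:last_view", "now")
    pipe.incr(f"page:{page_id}:views")
    return pipe.execute()
```

`execute()` returns one result per queued command, in order, which is also how redis-py's pipeline behaves.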

Redis Data Structures

Redis provides multiple data types for different use cases:
  1. Strings: Simple key-value pairs, binary safe
    • Use for: Cache values, counters, flags, JSON objects
    • Max size: 512 MB
    • Commands: SET, GET, INCR, APPEND
  2. Hashes: Field-value maps (objects)
    • Use for: User profiles, configuration objects, small entities
    • Efficient for storing objects with multiple fields
    • Commands: HSET, HGET, HMGET, HINCRBY
  3. Lists: Ordered collections (linked lists)
    • Use for: Queues, activity feeds, recent items
    • Operations at head/tail are O(1)
    • Commands: LPUSH, RPUSH, LPOP, RPOP, LRANGE
  4. Sets: Unordered unique collections
    • Use for: Tags, unique visitors, relationships
    • Set operations: union, intersection, difference
    • Commands: SADD, SMEMBERS, SISMEMBER, SINTER
  5. Sorted Sets: Ordered sets with scores
    • Use for: Leaderboards, time-series, priority queues
    • Range queries by score or rank
    • Commands: ZADD, ZRANGE, ZRANGEBYSCORE, ZRANK
  6. Streams: Append-only logs with consumer groups
    • Use for: Event sourcing, activity logs, message queues
    • Built-in consumer group support
    • Commands: XADD, XREAD, XREADGROUP
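The sorted-set commands above map naturally onto a leaderboard. A hedged sketch — the `Leaderboard` wrapper is illustrative, and `FakeSortedSetClient` is an in-memory stand-in so the semantics can be demonstrated without a server; a real redis-py client exposes the same `zincrby`/`zrevrange` methods:

```python
class Leaderboard:
    """Leaderboard backed by a Redis sorted set (ZINCRBY / ZREVRANGE)."""

    def __init__(self, client, name: str):
        self.client = client
        self.key = f"leaderboard:{name}"

    def add_score(self, player: str, points: float):
        # ZINCRBY atomically adds points to the player's score
        self.client.zincrby(self.key, points, player)

    def top(self, n: int = 10):
        # ZREVRANGE returns members ordered by score, highest first
        return self.client.zrevrange(self.key, 0, n - 1, withscores=True)

class FakeSortedSetClient:
    """In-memory stand-in mimicking the two sorted-set calls used above."""

    def __init__(self):
        self.data = {}

    def zincrby(self, key, amount, member):
        zset = self.data.setdefault(key, {})
        zset[member] = zset.get(member, 0) + amount
        return zset[member]

    def zrevrange(self, key, start, end, withscores=False):
        zset = self.data.get(key, {})
        ordered = sorted(zset.items(), key=lambda kv: (-kv[1], kv[0]))
        sliced = ordered[start:end + 1]
        return sliced if withscores else [member for member, _ in sliced]
```

Usage: `Leaderboard(redis.Redis(decode_responses=True), "daily")` works identically, with scores accumulating server-side.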

Connection Management

**Connection Pools:** Redis connections are expensive to create. Always use connection pools:

```python
import redis

# Connection pool (recommended)
pool = redis.ConnectionPool(host='localhost', port=6379, db=0, max_connections=10)
r = redis.Redis(connection_pool=pool)

# Direct connection (avoid in production)
r = redis.Redis(host='localhost', port=6379, db=0)
```

**Best Practices:**
- Use connection pools for all applications
- Set appropriate max_connections based on workload
- Enable decode_responses=True for string data
- Configure socket_timeout and socket_keepalive
- Handle connection errors with retries
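The last point can be sketched as a small retry wrapper with exponential backoff. This is an illustrative helper, not part of redis-py; the exception types to retry on are passed in by the caller (for redis-py you would pass `redis.exceptions.ConnectionError`):

```python
import time

def with_retries(fn, retries=3, base_delay=0.05, retry_on=(ConnectionError,)):
    """Call fn(), retrying transient errors with exponential backoff."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except retry_on:
            if attempt == retries:
                raise  # out of attempts - surface the error
            # 0.05s, 0.1s, 0.2s, ... between attempts
            time.sleep(base_delay * (2 ** attempt))
```

Usage with a real client might look like `with_retries(lambda: r.get("user:1"), retry_on=(redis.exceptions.ConnectionError,))`.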

Data Persistence

Redis offers two persistence mechanisms:
RDB (Redis Database): Point-in-time snapshots
  • Compact binary format
  • Fast restart times
  • Lower disk I/O
  • Potential data loss between snapshots
AOF (Append-Only File): Log of write operations
  • Better durability (fsync policies)
  • Larger files, slower restarts
  • Can be automatically rewritten/compacted
  • Minimal data loss potential
Hybrid Approach: RDB + AOF for best of both worlds
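The hybrid approach can be expressed directly in `redis.conf`. A sketch — the directive names are real, but the thresholds here are illustrative and should be tuned per workload:

```conf
# RDB: snapshot if at least 1 write in 900s, or 10 writes in 300s
save 900 1
save 300 10

# AOF: log every write, fsync once per second
appendonly yes
appendfsync everysec

# Rewrite the AOF with an RDB preamble (hybrid format, Redis 4.0+)
aof-use-rdb-preamble yes
```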

RESP 3 Protocol

Redis Serialization Protocol version 3 (RESP3) offers:
  • Client-side caching support
  • Better data type support
  • Push notifications
  • Performance improvements

```python
import redis
from redis.cache import CacheConfig

# Enable RESP3 with client-side caching
r = redis.Redis(host='localhost', port=6379, protocol=3, cache_config=CacheConfig())
```

Caching Strategies

Cache-Aside (Lazy Loading)

Pattern: Application checks cache first, loads from database on miss
```python
import redis
import json
from typing import Optional, Dict, Any

r = redis.Redis(decode_responses=True)

def get_user(user_id: int) -> Optional[Dict[str, Any]]:
    """Cache-aside pattern for user data."""
    cache_key = f"user:{user_id}"

    # Try cache first
    cached_data = r.get(cache_key)
    if cached_data:
        return json.loads(cached_data)

    # Cache miss - load from database
    user_data = database.get_user(user_id)  # Your DB query
    if user_data:
        # Store in cache with 1 hour TTL
        r.setex(cache_key, 3600, json.dumps(user_data))

    return user_data
```
Advantages:
  • Only requested data is cached (efficient memory usage)
  • Cache failures don't break the application
  • Simple to implement
Disadvantages:
  • Cache miss penalty (latency spike)
  • Thundering herd on popular items
  • Stale data until cache expiration

Write-Through Cache

Pattern: Write to cache and database simultaneously
```python
def update_user(user_id: int, user_data: Dict[str, Any]) -> bool:
    """Write-through pattern for user updates."""
    cache_key = f"user:{user_id}"

    # Write to database first
    success = database.update_user(user_id, user_data)

    if success:
        # Update cache immediately
        r.setex(cache_key, 3600, json.dumps(user_data))

    return success
```
Advantages:
  • Cache always consistent with database
  • No read penalty for recently written data
Disadvantages:
  • Write latency increases
  • Unused data may be cached
  • Extra cache write overhead

Write-Behind (Write-Back) Cache

Pattern: Write to cache immediately, sync to database asynchronously
```python
import redis
import json
from queue import Queue
from threading import Thread
from typing import Dict, Any

r = redis.Redis(decode_responses=True)
write_queue = Queue()

def async_writer():
    """Background worker to sync cache to database."""
    while True:
        user_id, user_data = write_queue.get()
        try:
            database.update_user(user_id, user_data)
        except Exception as e:
            # Log error, potentially retry
            print(f"Failed to write user {user_id}: {e}")
        finally:
            write_queue.task_done()

# Start background writer
Thread(target=async_writer, daemon=True).start()

def update_user_fast(user_id: int, user_data: Dict[str, Any]):
    """Write-behind pattern for fast writes."""
    cache_key = f"user:{user_id}"

    # Write to cache immediately (fast)
    r.setex(cache_key, 3600, json.dumps(user_data))

    # Queue database write (async)
    write_queue.put((user_id, user_data))
```

**Advantages:**
- Minimal write latency
- Can batch database writes
- Handles write spikes

**Disadvantages:**
- Risk of data loss if cache fails
- Complex error handling
- Consistency challenges

Cache Invalidation Strategies

**Time-based Expiration (TTL):**

```python
# Set key with expiration
r.setex("session:abc123", 1800, session_data)  # 30 minutes

# Or set TTL on existing key
r.expire("user:profile:123", 3600)  # 1 hour

# Check remaining TTL
ttl = r.ttl("user:profile:123")
```

**Event-based Invalidation:**

```python
def update_product(product_id: int, product_data: dict):
    """Invalidate cache on update."""
    # Update database
    database.update_product(product_id, product_data)

    # Invalidate related caches
    r.delete(f"product:{product_id}")
    r.delete(f"product_list:category:{product_data['category']}")
    r.delete("products:featured")
```

**Pattern-based Invalidation:**

```python
# Delete all keys matching pattern
def invalidate_user_cache(user_id: int):
    """Invalidate all cache entries for a user."""
    pattern = f"user:{user_id}:*"

    # Find and delete matching keys
    for key in r.scan_iter(match=pattern, count=100):
        r.delete(key)
```
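A refinement worth adding to time-based expiration (a helper sketched here, not part of redis-py): jitter the TTL so that keys cached together do not all expire in the same instant and hit the database at once:

```python
import random

def jittered_ttl(base_ttl: int, jitter_fraction: float = 0.1) -> int:
    """Return base_ttl varied by up to +/- jitter_fraction, minimum 1 second."""
    jitter = int(base_ttl * jitter_fraction)
    return max(1, base_ttl + random.randint(-jitter, jitter))
```

Usage: `r.setex(cache_key, jittered_ttl(3600), payload)` spreads expirations of a batch of keys over roughly a 12-minute window instead of a single moment.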

Cache Stampede Prevention

Problem: Multiple requests simultaneously miss cache and query database
Solution 1: Probabilistic Early Expiration
```python
import time
import random

def get_with_early_expiration(key: str, ttl: int = 3600, beta: float = 1.0):
    """Prevent stampede with probabilistic early recomputation."""
    value = r.get(key)

    if value is None:
        # Cache miss - compute, cache, and record the computation time
        value = compute_value(key)
        r.setex(key, ttl, value)
        r.set(f"{key}:timestamp", time.time())
        return value

    # Check if we should recompute early
    current_time = time.time()
    delta = current_time - float(r.get(f"{key}:timestamp") or 0)
    expiry = ttl * random.random() * beta

    if delta > expiry:
        # Recompute before the key actually expires
        value = compute_value(key)
        r.setex(key, ttl, value)
        r.set(f"{key}:timestamp", current_time)

    return value
```
Solution 2: Locking
```python
import time
from contextlib import contextmanager

@contextmanager
def cache_lock(key: str, timeout: int = 10):
    """Acquire lock for cache computation."""
    lock_key = f"{key}:lock"
    identifier = str(time.time())

    # Try to acquire lock
    if r.set(lock_key, identifier, nx=True, ex=timeout):
        try:
            yield True
        finally:
            # Release lock (check-and-delete; a Lua script would make this atomic)
            if r.get(lock_key) == identifier:
                r.delete(lock_key)
    else:
        yield False

def get_with_lock(key: str):
    """Use lock to prevent stampede."""
    value = r.get(key)

    if value is None:
        with cache_lock(key) as acquired:
            if acquired:
                # We got the lock - compute value
                value = compute_value(key)
                r.setex(key, 3600, value)
            else:
                # Someone else is computing - wait and retry
                time.sleep(0.1)
                value = r.get(key) or compute_value(key)

    return value
```

Session Management

Distributed Session Storage

Basic Session Management:
```python
import redis
import json
import uuid
from datetime import datetime
from typing import Optional

r = redis.Redis(decode_responses=True)

class SessionManager:
    def __init__(self, ttl: int = 1800):
        """Session manager with Redis backend.

        Args:
            ttl: Session timeout in seconds (default 30 minutes)
        """
        self.ttl = ttl

    def create_session(self, user_id: int, data: dict = None) -> str:
        """Create new session and return session ID."""
        session_id = str(uuid.uuid4())
        session_key = f"session:{session_id}"

        session_data = {
            "user_id": user_id,
            "created_at": datetime.utcnow().isoformat(),
            "data": data or {}
        }

        r.setex(session_key, self.ttl, json.dumps(session_data))
        return session_id

    def get_session(self, session_id: str) -> Optional[dict]:
        """Retrieve session data and refresh TTL."""
        session_key = f"session:{session_id}"
        session_data = r.get(session_key)

        if session_data:
            # Refresh TTL on access (sliding expiration)
            r.expire(session_key, self.ttl)
            return json.loads(session_data)

        return None

    def update_session(self, session_id: str, data: dict) -> bool:
        """Update session data."""
        session_key = f"session:{session_id}"
        session_data = self.get_session(session_id)

        if session_data:
            session_data["data"].update(data)
            r.setex(session_key, self.ttl, json.dumps(session_data))
            return True

        return False

    def delete_session(self, session_id: str) -> bool:
        """Delete session (logout)."""
        session_key = f"session:{session_id}"
        return r.delete(session_key) > 0
```

Session with Hash Storage

More efficient for session objects:
```python
class HashSessionManager:
    """Session manager using Redis hashes for better performance."""

    def __init__(self, ttl: int = 1800):
        self.ttl = ttl

    def create_session(self, user_id: int, **kwargs) -> str:
        """Create session using hash."""
        session_id = str(uuid.uuid4())
        session_key = f"session:{session_id}"

        # Store as hash for efficient field access
        session_fields = {
            "user_id": str(user_id),
            "created_at": datetime.utcnow().isoformat(),
            **{k: str(v) for k, v in kwargs.items()}
        }

        r.hset(session_key, mapping=session_fields)
        r.expire(session_key, self.ttl)

        return session_id

    def get_field(self, session_id: str, field: str) -> str:
        """Get single session field efficiently."""
        session_key = f"session:{session_id}"
        value = r.hget(session_key, field)

        if value:
            r.expire(session_key, self.ttl)  # Refresh TTL

        return value

    def set_field(self, session_id: str, field: str, value: str) -> bool:
        """Update single session field."""
        session_key = f"session:{session_id}"

        if r.exists(session_key):
            r.hset(session_key, field, value)
            r.expire(session_key, self.ttl)
            return True

        return False

    def get_all(self, session_id: str) -> dict:
        """Get all session fields."""
        session_key = f"session:{session_id}"
        data = r.hgetall(session_key)

        if data:
            r.expire(session_key, self.ttl)

        return data
```

User Activity Tracking

```python
def track_user_activity(user_id: int, action: str):
    """Track user activity with automatic expiration."""
    activity_key = f"user:{user_id}:activity"
    timestamp = datetime.utcnow().isoformat()

    # Add activity to list
    r.lpush(activity_key, json.dumps({"action": action, "timestamp": timestamp}))

    # Keep only last 100 activities
    r.ltrim(activity_key, 0, 99)

    # Set expiration (30 days)
    r.expire(activity_key, 2592000)

def get_recent_activity(user_id: int, limit: int = 10) -> list:
    """Get recent user activities."""
    activity_key = f"user:{user_id}:activity"
    activities = r.lrange(activity_key, 0, limit - 1)

    return [json.loads(a) for a in activities]
```

Pub/Sub Patterns

Basic Publisher/Subscriber

**Publisher:**

```python
import redis
import json
from datetime import datetime

r = redis.Redis(decode_responses=True)

def publish_event(channel: str, message: dict):
    """Publish event to channel."""
    r.publish(channel, json.dumps(message))

# Example usage
publish_event("notifications", {
    "type": "user_signup",
    "user_id": 12345,
    "timestamp": datetime.utcnow().isoformat()
})
```

**Subscriber:**

```python
import redis
import json

def handle_message(message):
    """Process received message."""
    data = json.loads(message['data'])
    print(f"Received: {data}")

# Initialize pubsub
r = redis.Redis(decode_responses=True)
p = r.pubsub()

# Subscribe to channels
p.subscribe('notifications', 'alerts')

# Listen for messages
for message in p.listen():
    if message['type'] == 'message':
        handle_message(message)
```

Pattern-Based Subscriptions

```python
# Subscribe to multiple channels with glob-style patterns
p = r.pubsub()
p.psubscribe('user:*', 'notification:*')

# Get messages from pattern subscriptions
for message in p.listen():
    if message['type'] == 'pmessage':
        channel = message['channel']
        pattern = message['pattern']
        data = message['data']
        print(f"Pattern {pattern} matched {channel}: {data}")
```

Async Pub/Sub with Background Thread

```python
import redis
import time

r = redis.Redis(decode_responses=True)
p = r.pubsub()

def message_handler(message):
    """Handle messages in background thread."""
    print(f"Handler received: {message['data']}")

# Subscribe with handler
p.subscribe(**{'notifications': message_handler, 'alerts': message_handler})

# Run in background thread
thread = p.run_in_thread(sleep_time=0.001)

# Publish some messages
r.publish('notifications', 'Hello!')
r.publish('alerts', 'Warning!')
time.sleep(1)

# Stop background thread
thread.stop()
```

Async Pub/Sub with asyncio

```python
import asyncio
import redis.asyncio as redis

async def reader(channel: redis.client.PubSub):
    """Async message reader."""
    while True:
        message = await channel.get_message(ignore_subscribe_messages=True, timeout=None)
        if message is not None:
            print(f"Received: {message}")

            # Stop on specific message
            if message["data"].decode() == "STOP":
                break

async def pubsub_example():
    """Async pub/sub example."""
    r = redis.from_url("redis://localhost")

    async with r.pubsub() as pubsub:
        # Subscribe to channels
        await pubsub.subscribe("channel:1", "channel:2")

        # Create reader task
        reader_task = asyncio.create_task(reader(pubsub))

        # Publish messages
        await r.publish("channel:1", "Hello")
        await r.publish("channel:2", "World")
        await r.publish("channel:1", "STOP")

        # Wait for reader to finish
        await reader_task

    await r.close()

# Run async example
asyncio.run(pubsub_example())
```

Sharded Pub/Sub (Redis 7.0+)

```python
from redis.cluster import RedisCluster, ClusterNode

# Connect to cluster
rc = RedisCluster(startup_nodes=[
    ClusterNode('localhost', 6379),
    ClusterNode('localhost', 6380)
])

# Create sharded pubsub
p = rc.pubsub()
p.ssubscribe('foo')

# Get message from specific node
message = p.get_sharded_message(target_node=ClusterNode('localhost', 6379))
```

Distributed Locks

Simple Lock Implementation

python
import redis
import time
import uuid

class RedisLock:
    """Simple distributed lock using Redis."""

    def __init__(self, redis_client: redis.Redis, key: str, timeout: int = 10):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())

    def acquire(self, blocking: bool = True, timeout: float = None) -> bool:
        """Acquire lock."""
        end_time = time.time() + (timeout or self.timeout)

        while True:
            # Try to set lock with NX (only if not exists) and EX (expiration)
            if self.redis.set(self.key, self.identifier, nx=True, ex=self.timeout):
                return True

            if not blocking:
                return False

            if timeout and time.time() > end_time:
                return False

            # Wait before retry
            time.sleep(0.01)

    def release(self) -> bool:
        """Release lock only if we own it."""
        # Use Lua script for atomic check-and-delete
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """

        result = self.redis.eval(lua_script, 1, self.key, self.identifier)
        return result == 1

    def __enter__(self):
        """Context manager support."""
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager cleanup."""
        self.release()

Usage example


r = redis.Redis()
lock = RedisLock(r, "resource:123", timeout=5)

with lock:
    # Critical section - only one process at a time
    print("Processing resource 123")
    process_resource()

Advanced Lock with Auto-Renewal


python
import threading

class RenewableLock:
    """Distributed lock with automatic renewal."""

    def __init__(self, redis_client: redis.Redis, key: str, timeout: int = 10):
        self.redis = redis_client
        self.key = f"lock:{key}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())
        self.renewal_thread = None
        self.stop_renewal = threading.Event()

    def _renew_lock(self):
        """Background task to renew lock."""
        while not self.stop_renewal.is_set():
            time.sleep(self.timeout / 3)  # Renew at 1/3 of timeout

            # Renew only if we still own the lock
            lua_script = """
            if redis.call("get", KEYS[1]) == ARGV[1] then
                return redis.call("expire", KEYS[1], ARGV[2])
            else
                return 0
            end
            """

            result = self.redis.eval(lua_script, 1, self.key,
                                   self.identifier, self.timeout)

            if result == 0:
                # We lost the lock
                self.stop_renewal.set()

    def acquire(self, blocking: bool = True, timeout: float = None) -> bool:
        """Acquire lock and start auto-renewal."""
        if self.redis.set(self.key, self.identifier, nx=True, ex=self.timeout):
            # Start renewal thread
            self.stop_renewal.clear()
            self.renewal_thread = threading.Thread(target=self._renew_lock, daemon=True)
            self.renewal_thread.start()
            return True

        return False

    def release(self) -> bool:
        """Release lock and stop renewal."""
        self.stop_renewal.set()

        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """

        result = self.redis.eval(lua_script, 1, self.key, self.identifier)
        return result == 1
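The renewal cadence above (sleep for a third of the TTL) means the background thread fires its first renewal well before expiry, leaving roughly two further chances if a renewal attempt fails. A small helper (not in the original, purely illustrative of the arithmetic) makes the margin explicit:

```python
def first_renewal_margin(timeout: float) -> float:
    """TTL remaining when the first renewal fires, given the
    sleep-for-timeout/3 cadence used by RenewableLock."""
    return timeout - timeout / 3

# With a 9-second TTL, the first renewal fires with 6 seconds to spare
print(first_renewal_margin(9))
```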

Redlock Algorithm (Multiple Redis Instances)


python
class Redlock:
    """Redlock algorithm for distributed locking across multiple Redis instances."""

    def __init__(self, redis_instances: list):
        """
        Args:
            redis_instances: List of Redis client connections
        """
        self.instances = redis_instances
        self.quorum = len(redis_instances) // 2 + 1

    def acquire(self, resource: str, ttl: int = 10000) -> tuple:
        """
        Acquire lock across multiple Redis instances.

        Returns:
            (success: bool, lock_identifier: str)
        """
        identifier = str(uuid.uuid4())
        start_time = int(time.time() * 1000)

        # Try to acquire lock on all instances
        acquired = 0
        for instance in self.instances:
            try:
                if instance.set(f"lock:{resource}", identifier,
                              nx=True, px=ttl):
                    acquired += 1
            except Exception:
                pass

        # Calculate elapsed time
        elapsed = int(time.time() * 1000) - start_time
        validity_time = ttl - elapsed - 100  # drift compensation

        # Check if we got quorum
        if acquired >= self.quorum and validity_time > 0:
            return True, identifier
        else:
            # Release locks if we didn't get quorum
            self._release_all(resource, identifier)
            return False, None

    def _release_all(self, resource: str, identifier: str):
        """Release lock on all instances."""
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """

        for instance in self.instances:
            try:
                instance.eval(lua_script, 1, f"lock:{resource}", identifier)
            except Exception:
                pass
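The quorum and validity arithmetic inside `acquire` can be isolated and unit-tested without any Redis instances. This is a pure-Python sketch mirroring the math above (the 100 ms drift compensation is the same constant the class uses):

```python
def redlock_quorum(n_instances: int) -> int:
    """Majority of instances needed for a valid Redlock acquisition."""
    return n_instances // 2 + 1

def validity_window(ttl_ms: int, elapsed_ms: int, drift_ms: int = 100) -> int:
    """Time the lock can safely be considered held after acquisition;
    non-positive means acquisition took too long and must be abandoned."""
    return ttl_ms - elapsed_ms - drift_ms

# 5 instances need 3 grants; a 10s TTL acquired in 150ms is valid for 9.75s
print(redlock_quorum(5), validity_window(10000, 150))
```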

Data Structures and Operations


Working with Hashes



User profile storage


def save_user_profile(user_id: int, profile: dict):
    """Save user profile as hash."""
    key = f"user:profile:{user_id}"
    r.hset(key, mapping=profile)
    r.expire(key, 86400)  # 24 hour TTL

def get_user_profile(user_id: int) -> dict:
    """Get complete user profile."""
    key = f"user:profile:{user_id}"
    return r.hgetall(key)

def update_user_field(user_id: int, field: str, value: str):
    """Update single profile field."""
    key = f"user:profile:{user_id}"
    r.hset(key, field, value)

Example usage


save_user_profile(123, {
    "username": "alice",
    "email": "alice@example.com",
    "age": "30"
})

Atomic increment


r.hincrby("user:profile:123", "login_count", 1)

Working with Lists



Job queue implementation


import json

def enqueue_job(queue_name: str, job_data: dict):
    """Add job to queue."""
    key = f"queue:{queue_name}"
    r.rpush(key, json.dumps(job_data))

def dequeue_job(queue_name: str, timeout: int = 0) -> dict:
    """Get job from queue (blocking)."""
    key = f"queue:{queue_name}"

    if timeout > 0:
        # Blocking pop with timeout
        result = r.blpop(key, timeout=timeout)
        if result:
            _, job_data = result
            return json.loads(job_data)
    else:
        # Non-blocking pop
        job_data = r.lpop(key)
        if job_data:
            return json.loads(job_data)

    return None

Activity feed


def add_to_feed(user_id: int, activity: dict):
    """Add activity to user feed."""
    key = f"feed:{user_id}"
    r.lpush(key, json.dumps(activity))
    r.ltrim(key, 0, 99)    # Keep only latest 100 items
    r.expire(key, 604800)  # 7 days

def get_feed(user_id: int, start: int = 0, end: int = 19) -> list:
    """Get user feed with pagination."""
    key = f"feed:{user_id}"
    items = r.lrange(key, start, end)
    return [json.loads(item) for item in items]

Working with Sets



Tags and relationships


def add_tags(item_id: int, tags: list):
    """Add tags to item, maintaining a reverse index for lookups."""
    key = f"item:{item_id}:tags"
    r.sadd(key, *tags)
    # Reverse index: tag -> set of item IDs
    for tag in tags:
        r.sadd(f"tag:{tag}:items", item_id)

def get_tags(item_id: int) -> set:
    """Get all tags for item."""
    key = f"item:{item_id}:tags"
    return r.smembers(key)

def find_items_with_all_tags(tags: list) -> set:
    """Find items having all specified tags (intersects the reverse index)."""
    keys = [f"tag:{tag}:items" for tag in tags]
    return r.sinter(*keys)

Online users tracking


def user_online(user_id: int):
    """Mark user as online."""
    r.sadd("users:online", user_id)
    # Heartbeat key with 60s TTL (EXPIRE alone is a no-op on a missing key)
    r.setex(f"user:{user_id}:heartbeat", 60, "1")

def user_offline(user_id: int):
    """Mark user as offline."""
    r.srem("users:online", user_id)

def get_online_users() -> set:
    """Get all online users."""
    return r.smembers("users:online")

def get_online_count() -> int:
    """Get count of online users."""
    return r.scard("users:online")

Working with Sorted Sets



Leaderboard implementation


def update_score(leaderboard: str, user_id: int, score: float):
    """Update user score in leaderboard."""
    key = f"leaderboard:{leaderboard}"
    r.zadd(key, {user_id: score})

def get_leaderboard(leaderboard: str, start: int = 0, end: int = 9) -> list:
    """Get top players (descending order)."""
    key = f"leaderboard:{leaderboard}"
    # ZREVRANGE for descending order (highest scores first)
    return r.zrevrange(key, start, end, withscores=True)

def get_user_rank(leaderboard: str, user_id: int) -> int:
    """Get user's rank (0-indexed)."""
    key = f"leaderboard:{leaderboard}"
    # ZREVRANK for descending rank
    rank = r.zrevrank(key, user_id)
    return rank if rank is not None else -1

def get_user_score(leaderboard: str, user_id: int) -> float:
    """Get user's score."""
    key = f"leaderboard:{leaderboard}"
    score = r.zscore(key, user_id)
    return score if score is not None else 0.0

def get_score_range(leaderboard: str, min_score: float, max_score: float) -> list:
    """Get users within score range."""
    key = f"leaderboard:{leaderboard}"
    return r.zrangebyscore(key, min_score, max_score, withscores=True)
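When paginating a leaderboard, translating page numbers into the inclusive start/end indices that ZREVRANGE expects is an easy place for off-by-one errors. A small helper (hypothetical, not part of the original API) makes the mapping explicit:

```python
def page_bounds(page: int, page_size: int = 10) -> tuple:
    """Translate a 1-based page number into ZREVRANGE start/end
    indices (both inclusive)."""
    start = (page - 1) * page_size
    end = start + page_size - 1
    return start, end

# Page 1 of a 10-row leaderboard covers indices 0..9
start, end = page_bounds(1)
```

These bounds would then be passed straight to `get_leaderboard(name, start, end)`.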

Time-based sorted set (activity stream)


def add_activity(user_id: int, activity: str):
    """Add timestamped activity."""
    key = f"user:{user_id}:activities"
    timestamp = time.time()
    r.zadd(key, {activity: timestamp})

    # Keep only last 24 hours
    cutoff = timestamp - 86400
    r.zremrangebyscore(key, '-inf', cutoff)

def get_recent_activities(user_id: int, count: int = 10) -> list:
    """Get recent activities."""
    key = f"user:{user_id}:activities"
    # Get most recent (highest timestamps)
    return r.zrevrange(key, 0, count - 1, withscores=True)

Working with Streams



Event stream


def add_event(stream_key: str, event_data: dict) -> str:
    """Add event to stream."""
    # Returns auto-generated ID (timestamp-sequence)
    event_id = r.xadd(stream_key, event_data)
    return event_id

def read_events(stream_key: str, count: int = 10, start_id: str = '0') -> list:
    """Read events from stream."""
    events = r.xread({stream_key: start_id}, count=count)

    # events format: [(stream_name, [(id, data), (id, data), ...])]
    if events:
        _, event_list = events[0]
        return event_list

    return []

Consumer groups


def create_consumer_group(stream_key: str, group_name: str):
    """Create consumer group for stream."""
    try:
        r.xgroup_create(name=stream_key, groupname=group_name, id='0')
    except redis.ResponseError as e:
        if "BUSYGROUP" not in str(e):
            raise

def read_from_group(stream_key: str, group_name: str,
                    consumer_name: str, count: int = 10) -> list:
    """Read events as consumer in group."""
    # Read new messages with '>'
    events = r.xreadgroup(
        groupname=group_name,
        consumername=consumer_name,
        streams={stream_key: '>'},
        count=count,
        block=5000  # 5 second timeout
    )

    if events:
        _, event_list = events[0]
        return event_list

    return []

def acknowledge_event(stream_key: str, group_name: str, event_id: str):
    """Acknowledge processed event."""
    r.xack(stream_key, group_name, event_id)

Example: Processing events with consumer group


def process_events(stream_key: str, group_name: str, consumer_name: str):
    """Process events from stream."""
    create_consumer_group(stream_key, group_name)

    while True:
        events = read_from_group(stream_key, group_name, consumer_name, count=10)

        for event_id, event_data in events:
            try:
                # Process event
                process_event(event_data)

                # Acknowledge successful processing
                acknowledge_event(stream_key, group_name, event_id)
            except Exception as e:
                print(f"Failed to process event {event_id}: {e}")
                # Event remains unacknowledged for retry
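Auto-generated stream IDs encode a millisecond timestamp and a per-millisecond sequence number, so the event time can be recovered from the ID itself. A small parser (hypothetical helper, assuming IDs are returned as strings, i.e. `decode_responses=True`) illustrates the format:

```python
def parse_stream_id(stream_id: str) -> tuple:
    """Split a Redis stream ID such as '1700000000000-0' into its
    millisecond timestamp and sequence number."""
    ms, seq = stream_id.split('-')
    return int(ms), int(seq)

# The sequence number disambiguates events added in the same millisecond
timestamp_ms, sequence = parse_stream_id("1700000000000-0")
```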

Performance Optimization


Pipelining for Batch Operations



Without pipelining (slow - multiple round trips)


for i in range(1000):
    r.set(f"key:{i}", f"value:{i}")

With pipelining (fast - single round trip)


pipe = r.pipeline()
for i in range(1000):
    pipe.set(f"key:{i}", f"value:{i}")
results = pipe.execute()

Pipelining with reads


pipe = r.pipeline()
for i in range(100):
    pipe.get(f"key:{i}")
values = pipe.execute()
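The speedup from pipelining comes almost entirely from saved network round trips. A back-of-envelope cost model (illustrative only; it ignores server processing time and bandwidth) makes the difference concrete:

```python
def total_latency_ms(n_commands: int, rtt_ms: float, pipelined: bool) -> float:
    """Rough network-cost model: sequential calls pay one round trip per
    command, while a pipeline pays a single round trip for the whole batch."""
    round_trips = 1 if pipelined else n_commands
    return round_trips * rtt_ms

# 1000 SETs at a 1ms RTT: ~1000ms sequentially vs ~1ms pipelined
sequential = total_latency_ms(1000, 1.0, pipelined=False)
batched = total_latency_ms(1000, 1.0, pipelined=True)
```

In practice very large pipelines are usually chunked (e.g. a few thousand commands per `execute()`) to bound memory on both client and server.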

Builder pattern with pipeline


class DataLoader:
    def __init__(self):
        self.pipeline = r.pipeline()

    def add_user(self, user_id: int, user_data: dict):
        """Add user data."""
        self.pipeline.hset(f"user:{user_id}", mapping=user_data)
        return self

    def add_to_set(self, set_name: str, value: str):
        """Add to set."""
        self.pipeline.sadd(set_name, value)
        return self

    def execute(self):
        """Execute all pipelined commands."""
        return self.pipeline.execute()

Usage


loader = DataLoader()
results = (loader
    .add_user(1, {"name": "Alice", "email": "alice@example.com"})
    .add_user(2, {"name": "Bob", "email": "bob@example.com"})
    .add_to_set("active_users", "1")
    .add_to_set("active_users", "2")
    .execute())

Transactions with WATCH



Optimistic locking with WATCH


def transfer_credits(from_user: int, to_user: int, amount: int) -> bool:
    """Transfer credits between users with optimistic locking."""
    with r.pipeline() as pipe:
        while True:
            try:
                # Watch the keys we're going to modify
                pipe.watch(f"user:{from_user}:credits", f"user:{to_user}:credits")

                # Get current values (pipeline runs in immediate mode after WATCH)
                from_credits = int(pipe.get(f"user:{from_user}:credits") or 0)
                to_credits = int(pipe.get(f"user:{to_user}:credits") or 0)

                # Check if transfer is possible
                if from_credits < amount:
                    pipe.unwatch()
                    return False

                # Start transaction
                pipe.multi()
                pipe.set(f"user:{from_user}:credits", from_credits - amount)
                pipe.set(f"user:{to_user}:credits", to_credits + amount)

                # Execute transaction
                pipe.execute()
                return True

            except redis.WatchError:
                # Key was modified by another client - retry
                continue

Lua scripts for atomic operations


increment_script = """
local current = redis.call('GET', KEYS[1])
if not current then
    current = 0
end
local new_val = tonumber(current) + tonumber(ARGV[1])
redis.call('SET', KEYS[1], new_val)
return new_val
"""

Register and use Lua script


increment = r.register_script(increment_script)
new_value = increment(keys=['counter:views'], args=[1])

Lua Scripts for Complex Operations



Rate limiting with Lua


rate_limit_script = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('INCR', key)

if current == 1 then
    redis.call('EXPIRE', key, window)
end

if current > limit then
    return 0
else
    return 1
end
"""

rate_limiter = r.register_script(rate_limit_script)

def is_allowed(user_id: int, limit: int = 100, window: int = 60) -> bool:
    """Check if user is within rate limit."""
    key = f"rate_limit:{user_id}"
    result = rate_limiter(keys=[key], args=[limit, window])
    return result == 1
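The policy behind the Lua script can be exercised without a Redis server. This in-process analogue (a sketch, not part of the original) uses aligned time buckets, whereas the Lua version's window starts at the first request after expiry; the accept/reject behavior within one window is the same:

```python
class FixedWindowLimiter:
    """In-process analogue of the INCR + EXPIRE fixed-window limiter,
    useful for unit-testing rate-limit policy without Redis."""

    def __init__(self, limit: int, window: int):
        self.limit = limit
        self.window = window
        self.counts = {}  # window bucket -> request count

    def is_allowed(self, now: float) -> bool:
        bucket = int(now // self.window)  # aligned window start
        count = self.counts.get(bucket, 0) + 1
        self.counts[bucket] = count
        return count <= self.limit
```

Note that both variants allow up to `2 * limit` requests across a window boundary; a sliding-window or token-bucket script avoids that burst.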

Get-or-set pattern with Lua


get_or_set_script = """
local value = redis.call('GET', KEYS[1])
if value then
    return value
else
    redis.call('SET', KEYS[1], ARGV[1])
    redis.call('EXPIRE', KEYS[1], ARGV[2])
    return ARGV[1]
end
"""

get_or_set = r.register_script(get_or_set_script)

def get_or_compute(key: str, compute_fn, ttl: int = 3600):
    """Get value from cache or compute and cache it."""
    # The sentinel must match the comparison below
    # (assumes decode_responses=True so the script returns str)
    value = get_or_set(keys=[key], args=["__COMPUTING__", ttl])

    if value == "__COMPUTING__":
        # We set the placeholder - compute the real value
        computed = compute_fn()
        r.setex(key, ttl, computed)
        return computed

    return value
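The stampede-protection idea — the first caller atomically claims the key with a sentinel and computes the value while later callers get the cached result — can be mirrored with a plain dict for unit tests. A sketch (hypothetical helper, single-threaded analogue of the Lua flow above):

```python
def get_or_compute_local(cache: dict, key: str, compute_fn,
                         sentinel: str = "__COMPUTING__"):
    """Dict-backed mirror of the Lua get-or-set flow: the first caller
    claims the key with a sentinel and computes; later callers see the
    cached result."""
    value = cache.setdefault(key, sentinel)  # atomic claim in the dict analogue
    if value == sentinel:
        value = compute_fn()
        cache[key] = value
    return value
```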
undefined
get_or_set_script = """ local value = redis.call('GET', KEYS[1]) if value then return value else redis.call('SET', KEYS[1], ARGV[1]) redis.call('EXPIRE', KEYS[1], ARGV[2]) return ARGV[1] end """
get_or_set = r.register_script(get_or_set_script)
def get_or_compute(key: str, compute_fn, ttl: int = 3600): """从缓存获取值,或计算并存入缓存。""" value = get_or_set(keys=[key], args=["COMPUTING", ttl])
if value == "__COMPUTING__":
    # 我们设置了占位符 - 计算实际值
    computed = compute_fn()
    r.setex(key, ttl, computed)
    return computed

return value
undefined

Production Patterns


High Availability with Sentinel


python
from redis.sentinel import Sentinel

Connect to Sentinel


sentinel = Sentinel([
    ('sentinel1', 26379),
    ('sentinel2', 26379),
    ('sentinel3', 26379)
], socket_timeout=0.5)

Get master connection


master = sentinel.master_for('mymaster', socket_timeout=0.5)

Get replica connection (for read-only operations)


replica = sentinel.slave_for('mymaster', socket_timeout=0.5)

Use master for writes


master.set('key', 'value')

Use replica for reads (optional, for load distribution)


value = replica.get('key')

Async Redis with asyncio


python
import asyncio
import redis.asyncio as redis

async def async_redis_operations():
    """Async Redis operations example."""
    # Create async connection (from_url itself is synchronous;
    # only the commands are awaited)
    r = redis.from_url("redis://localhost")

    try:
        # Async operations
        await r.set("async_key", "async_value")
        value = await r.get("async_key")
        print(f"Value: {value}")

        # Async pipeline: commands are queued synchronously,
        # only execute() is awaited
        async with r.pipeline(transaction=True) as pipe:
            pipe.set("key1", "value1")
            pipe.set("key2", "value2")
            pipe.get("key1")
            results = await pipe.execute()

        print(f"Pipeline results: {results}")

    finally:
        await r.close()

Run async operations


asyncio.run(async_redis_operations())

Connection Pool Configuration


python
import socket
import redis

Production-ready connection pool


pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=50,           # Max pool size
    socket_timeout=5,             # Socket timeout
    socket_connect_timeout=5,     # Connection timeout
    socket_keepalive=True,        # Keep TCP connection alive
    socket_keepalive_options={
        socket.TCP_KEEPIDLE: 60,
        socket.TCP_KEEPINTVL: 10,
        socket.TCP_KEEPCNT: 3
    },
    retry_on_timeout=True,        # Retry on timeout
    health_check_interval=30,     # Health check every 30s
    decode_responses=True         # Auto-decode bytes to strings
)

r = redis.Redis(connection_pool=pool)

Error Handling and Resilience

import redis
from redis.exceptions import ConnectionError, TimeoutError
import time

class ResilientRedisClient:
    """Redis client with retry logic and circuit breaker."""

    def __init__(self, max_retries: int = 3, backoff: float = 0.1):
        self.redis = redis.Redis(
            host='localhost',
            port=6379,
            socket_timeout=5,
            retry_on_timeout=True
        )
        self.max_retries = max_retries
        self.backoff = backoff

    def get_with_retry(self, key: str, default=None):
        """Get value with exponential backoff retry."""
        for attempt in range(self.max_retries):
            try:
                return self.redis.get(key) or default
            except (ConnectionError, TimeoutError) as e:
                if attempt == self.max_retries - 1:
                    # Log error and return default
                    print(f"Redis error after {self.max_retries} attempts: {e}")
                    return default

                # Exponential backoff
                wait_time = self.backoff * (2 ** attempt)
                time.sleep(wait_time)

    def set_with_retry(self, key: str, value: str, ttl: int = None) -> bool:
        """Set value with retry logic."""
        for attempt in range(self.max_retries):
            try:
                if ttl:
                    return self.redis.setex(key, ttl, value)
                else:
                    return self.redis.set(key, value)
            except (ConnectionError, TimeoutError) as e:
                if attempt == self.max_retries - 1:
                    print(f"Redis error after {self.max_retries} attempts: {e}")
                    return False

                wait_time = self.backoff * (2 ** attempt)
                time.sleep(wait_time)
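With the defaults above (`backoff=0.1`, `max_retries=3`), the wait doubles after each failed attempt, and the final attempt gives up without sleeping. The schedule is easy to sanity-check in isolation (the helper name is ours, mirroring the retry loop above):

```python
def backoff_schedule(backoff: float, max_retries: int) -> list:
    """Wait applied after each failed attempt except the last, which gives up."""
    return [backoff * (2 ** attempt) for attempt in range(max_retries - 1)]

# Class defaults: wait 0.1s after the first failure, 0.2s after the second
print(backoff_schedule(0.1, 3))
```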

Monitoring and Metrics

def get_redis_info(section: str = None) -> dict:
    """Get Redis server information."""
    return r.info(section=section)

def monitor_memory_usage():
    """Monitor Redis memory usage."""
    info = r.info('memory')

    used_memory = info['used_memory_human']
    peak_memory = info['used_memory_peak_human']
    memory_fragmentation = info['mem_fragmentation_ratio']

    print(f"Used Memory: {used_memory}")
    print(f"Peak Memory: {peak_memory}")
    print(f"Fragmentation Ratio: {memory_fragmentation}")

    return info

def monitor_stats():
    """Monitor Redis statistics."""
    info = r.info('stats')

    total_connections = info['total_connections_received']
    total_commands = info['total_commands_processed']
    ops_per_sec = info['instantaneous_ops_per_sec']

    print(f"Total Connections: {total_connections}")
    print(f"Total Commands: {total_commands}")
    print(f"Ops/sec: {ops_per_sec}")

    return info

def get_slow_log(count: int = 10):
    """Get slow query log."""
    slow_log = r.slowlog_get(count)

    for entry in slow_log:
        print(f"Command: {entry['command']}")
        print(f"Duration: {entry['duration']} microseconds")
        print(f"Time: {entry['start_time']}")
        print("---")

    return slow_log
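The `stats` section also reports `keyspace_hits` and `keyspace_misses`, which give the cache hit ratio. A minimal sketch (the helper name is ours; it works with any client exposing `info()`, such as the `r` used above):

```python
def cache_hit_ratio(client) -> float:
    """Fraction of keyspace lookups that found an existing key (0.0 if none yet)."""
    info = client.info('stats')
    hits = info.get('keyspace_hits', 0)
    misses = info.get('keyspace_misses', 0)
    total = hits + misses
    return hits / total if total else 0.0
```

A persistently low ratio on a caching workload usually means TTLs are too short or the working set does not fit in memory.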

Best Practices


Key Naming Conventions


Use consistent, hierarchical naming:

# Good naming patterns
user:123:profile              # User profile data
user:123:sessions:abc         # User session
cache:product:456             # Cached product
queue:emails:pending          # Email queue
lock:resource:789             # Resource lock
counter:api:requests:daily    # Daily API request counter
leaderboard:global:score      # Global leaderboard

# Avoid
u123                          # Too cryptic
user_profile_123              # Underscores are less common than colons
123:user                      # Wrong hierarchy
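A tiny helper (our own convention, not part of redis-py) keeps these patterns consistent across a codebase:

```python
def make_key(*parts) -> str:
    """Build a hierarchical Redis key, e.g. make_key('user', 123, 'profile')."""
    return ":".join(str(p) for p in parts)

print(make_key("cache", "product", 456))  # cache:product:456
```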

Memory Management


# Set TTL on all temporary data
r.setex("temp:data", 3600, value)  # Expires in 1 hour

# Limit collection sizes
r.lpush("activity_log", entry)
r.ltrim("activity_log", 0, 999)    # Keep only 1000 items

# Use appropriate data structures:
# a hash is more memory-efficient than multiple top-level keys
r.hset("user:123", mapping={"name": "Alice", "email": "alice@example.com"})

# vs.
r.set("user:123:name", "Alice")
r.set("user:123:email", "alice@example.com")

# Monitor memory usage
if r.info('memory')['used_memory'] > threshold:
    # Implement eviction or cleanup
    cleanup_old_data()

Security


# Use authentication
r = redis.Redis(
    host='localhost',
    port=6379,
    password='your-secure-password',
    username='your-username'   # ACL usernames require Redis 6+
)

# Use SSL/TLS for production
pool = redis.ConnectionPool(
    host='redis.example.com',
    port=6380,
    connection_class=redis.SSLConnection,
    ssl_cert_reqs='required',
    ssl_ca_certs='/path/to/ca-cert.pem'
)

# Credential provider pattern
from redis import UsernamePasswordCredentialProvider

creds_provider = UsernamePasswordCredentialProvider("username", "password")
r = redis.Redis(
    host="localhost",
    port=6379,
    credential_provider=creds_provider
)

Testing

import fakeredis
import pytest

@pytest.fixture
def redis_client():
    """Provide fake Redis client for testing."""
    return fakeredis.FakeRedis(decode_responses=True)

def test_caching(redis_client):
    """Test caching logic."""
    # Test cache miss
    assert redis_client.get("test_key") is None

    # Test cache set
    redis_client.setex("test_key", 60, "test_value")
    assert redis_client.get("test_key") == "test_value"

    # Test expiration
    assert redis_client.ttl("test_key") <= 60

def test_session_management(redis_client):
    """Test session operations."""
    session_manager = SessionManager(redis_client)

    # Create session
    session_id = session_manager.create_session(user_id=123)
    assert session_id is not None

    # Get session
    session = session_manager.get_session(session_id)
    assert session['user_id'] == 123

    # Delete session
    assert session_manager.delete_session(session_id) is True
    assert session_manager.get_session(session_id) is None

Examples


Example 1: User Session Management with Redis

import redis
import json
import uuid
from datetime import datetime, timedelta

class UserSessionManager:
    """Complete user session management with Redis."""

    def __init__(self, redis_client: redis.Redis, ttl: int = 1800):
        self.redis = redis_client
        self.ttl = ttl

    def create_session(self, user_id: int, user_data: dict = None) -> str:
        """Create new user session."""
        session_id = str(uuid.uuid4())
        session_key = f"session:{session_id}"

        session_data = {
            "user_id": user_id,
            "created_at": datetime.utcnow().isoformat(),
            "last_accessed": datetime.utcnow().isoformat(),
            "data": user_data or {}
        }

        # Store session with TTL
        self.redis.setex(session_key, self.ttl, json.dumps(session_data))

        # Track user's active sessions
        self.redis.sadd(f"user:{user_id}:sessions", session_id)

        return session_id

    def get_session(self, session_id: str) -> dict:
        """Get session and refresh TTL."""
        session_key = f"session:{session_id}"
        session_data = self.redis.get(session_key)

        if session_data:
            session = json.loads(session_data)
            session['last_accessed'] = datetime.utcnow().isoformat()

            # Refresh TTL
            self.redis.setex(session_key, self.ttl, json.dumps(session))

            return session

        return None

    def delete_session(self, session_id: str) -> bool:
        """Delete session."""
        session = self.get_session(session_id)
        if not session:
            return False

        user_id = session['user_id']

        # Remove session
        self.redis.delete(f"session:{session_id}")

        # Remove from user's session set
        self.redis.srem(f"user:{user_id}:sessions", session_id)

        return True

    def delete_all_user_sessions(self, user_id: int):
        """Delete all sessions for a user."""
        sessions_key = f"user:{user_id}:sessions"
        session_ids = self.redis.smembers(sessions_key)

        for session_id in session_ids:
            self.redis.delete(f"session:{session_id}")

        self.redis.delete(sessions_key)

    def get_user_sessions(self, user_id: int) -> list:
        """Get all active sessions for a user."""
        sessions_key = f"user:{user_id}:sessions"
        session_ids = self.redis.smembers(sessions_key)

        sessions = []
        for session_id in session_ids:
            session = self.get_session(session_id)
            if session:
                session['session_id'] = session_id
                sessions.append(session)

        return sessions

# Usage
r = redis.Redis(decode_responses=True)
session_mgr = UserSessionManager(r)

# Create session
session_id = session_mgr.create_session(
    user_id=123,
    user_data={"role": "admin", "permissions": ["read", "write"]}
)

# Get session
session = session_mgr.get_session(session_id)
print(f"User ID: {session['user_id']}")

# List all user sessions
sessions = session_mgr.get_user_sessions(123)
print(f"Active sessions: {len(sessions)}")

# Logout (delete session)
session_mgr.delete_session(session_id)

Example 2: Real-Time Leaderboard

import redis
import time

class Leaderboard:
    """Real-time leaderboard using Redis sorted sets."""

    def __init__(self, redis_client: redis.Redis, name: str):
        self.redis = redis_client
        self.key = f"leaderboard:{name}"

    def add_score(self, player_id: str, score: float):
        """Add or update player score."""
        self.redis.zadd(self.key, {player_id: score})

    def increment_score(self, player_id: str, increment: float):
        """Increment player score."""
        self.redis.zincrby(self.key, increment, player_id)

    def get_top(self, count: int = 10) -> list:
        """Get top players."""
        # ZREVRANGE for highest scores first
        players = self.redis.zrevrange(self.key, 0, count - 1, withscores=True)

        return [
            {
                "rank": idx + 1,
                "player_id": player_id,
                "score": score
            }
            for idx, (player_id, score) in enumerate(players)
        ]

    def get_rank(self, player_id: str) -> dict:
        """Get player rank and score."""
        score = self.redis.zscore(self.key, player_id)
        if score is None:
            return None

        # ZREVRANK for rank (0-indexed, highest first)
        rank = self.redis.zrevrank(self.key, player_id)

        return {
            "player_id": player_id,
            "rank": rank + 1 if rank is not None else None,
            "score": score
        }

    def get_around(self, player_id: str, count: int = 5) -> list:
        """Get players around a specific player."""
        rank = self.redis.zrevrank(self.key, player_id)
        if rank is None:
            return []

        # Get players before and after
        start = max(0, rank - count)
        end = rank + count

        players = self.redis.zrevrange(self.key, start, end, withscores=True)

        return [
            {
                "rank": start + idx + 1,
                "player_id": pid,
                "score": score,
                "is_current": pid == player_id
            }
            for idx, (pid, score) in enumerate(players)
        ]

    def get_total_players(self) -> int:
        """Get total number of players."""
        return self.redis.zcard(self.key)

    def remove_player(self, player_id: str) -> bool:
        """Remove player from leaderboard."""
        return self.redis.zrem(self.key, player_id) > 0

# Usage
r = redis.Redis(decode_responses=True)
leaderboard = Leaderboard(r, "global")

# Add scores
leaderboard.add_score("alice", 1500)
leaderboard.add_score("bob", 2000)
leaderboard.add_score("charlie", 1800)
leaderboard.increment_score("alice", 200)  # alice now at 1700

# Get top 10
top_players = leaderboard.get_top(10)
for player in top_players:
    print(f"#{player['rank']}: {player['player_id']} - {player['score']}")

# Get player rank
alice_stats = leaderboard.get_rank("alice")
print(f"Alice is rank {alice_stats['rank']} with {alice_stats['score']} points")

# Get players around alice
nearby = leaderboard.get_around("alice", count=2)
for player in nearby:
    marker = " <-- YOU" if player['is_current'] else ""
    print(f"#{player['rank']}: {player['player_id']} - {player['score']}{marker}")
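`get_rank` and `get_total_players` together are enough to derive a percentile. A hypothetical helper (the name and formula are illustrative, not part of the class above):

```python
def score_percentile(rank: int, total_players: int) -> float:
    """Percent of players this 1-based rank beats or ties (rank 1 = 100%)."""
    return 100.0 * (total_players - rank + 1) / total_players

print(score_percentile(1, 200))    # top of the board
print(score_percentile(200, 200))  # last place
```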

Example 3: Distributed Rate Limiter

import redis
import time

class RateLimiter:
    """Distributed rate limiter using Redis."""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

        # Lua script for atomic rate limiting
        self.rate_limit_script = self.redis.register_script("""
            local key = KEYS[1]
            local limit = tonumber(ARGV[1])
            local window = tonumber(ARGV[2])

            local current = redis.call('INCR', key)

            if current == 1 then
                redis.call('EXPIRE', key, window)
            end

            if current > limit then
                return {0, limit, current - 1}
            else
                return {1, limit, current}
            end
        """)

    def check_rate_limit(self, identifier: str, limit: int, window: int) -> dict:
        """
        Check if request is within rate limit.

        Args:
            identifier: User ID, IP address, or API key
            limit: Maximum requests allowed
            window: Time window in seconds

        Returns:
            dict with allowed (bool), limit, current, remaining
        """
        key = f"rate_limit:{identifier}:{int(time.time() // window)}"

        allowed, max_limit, current = self.rate_limit_script(
            keys=[key],
            args=[limit, window]
        )

        return {
            "allowed": bool(allowed),
            "limit": max_limit,
            "current": current,
            "remaining": max(0, max_limit - current),
            "reset_at": (int(time.time() // window) + 1) * window
        }

    def sliding_window_check(self, identifier: str, limit: int, window: int) -> dict:
        """
        Sliding window rate limiter using sorted sets.
        More accurate than the fixed window, but costlier, and the
        check-then-add below is not atomic: under heavy concurrency a few
        extra requests can slip through. Wrap it in a Lua script if strict
        enforcement is required.
        """
        key = f"rate_limit:sliding:{identifier}"
        now = time.time()
        window_start = now - window

        # Remove old entries
        self.redis.zremrangebyscore(key, 0, window_start)

        # Count current requests
        current = self.redis.zcard(key)

        if current < limit:
            # Add new request
            self.redis.zadd(key, {str(now): now})
            self.redis.expire(key, window)

            return {
                "allowed": True,
                "limit": limit,
                "current": current + 1,
                "remaining": limit - current - 1
            }
        else:
            return {
                "allowed": False,
                "limit": limit,
                "current": current,
                "remaining": 0
            }

# Usage
r = redis.Redis(decode_responses=True)
limiter = RateLimiter(r)

# API rate limiting: 100 requests per minute
user_id = "user_123"
result = limiter.check_rate_limit(user_id, limit=100, window=60)

if result["allowed"]:
    print(f"Request allowed. {result['remaining']} requests remaining.")
    # Process request
else:
    print(f"Rate limit exceeded. Try again at {result['reset_at']}")
    # Return 429 Too Many Requests

# More accurate sliding window
result = limiter.sliding_window_check(user_id, limit=100, window=60)
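The fixed-window key embeds `int(time.time() // window)`, so every request in the same window increments the same counter. The bucket arithmetic and the `reset_at` value can be checked in isolation (pure functions mirroring `check_rate_limit` above):

```python
def window_bucket(now: float, window: int) -> int:
    """Index of the fixed window a timestamp falls into."""
    return int(now // window)

def window_reset_at(now: float, window: int) -> int:
    """Unix time when the current window's counter expires."""
    return (window_bucket(now, window) + 1) * window

# At t=125 with a 60s window: bucket 2, counter resets at t=180
print(window_bucket(125, 60), window_reset_at(125, 60))
```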

Example 4: Distributed Job Queue

import redis
import json
import time
import uuid
from typing import Optional

class JobQueue:
    """Distributed job queue with Redis."""

    def __init__(self, redis_client: redis.Redis, queue_name: str = "default"):
        self.redis = redis_client
        self.queue_name = queue_name
        self.queue_key = f"queue:{queue_name}"
        self.processing_key = f"queue:{queue_name}:processing"

    def enqueue(self, job_type: str, payload: dict, priority: int = 0) -> str:
        """
        Add job to queue.

        Args:
            job_type: Type of job (for routing to workers)
            payload: Job data
            priority: Higher priority = processed first (0 = normal)

        Returns:
            job_id
        """
        job_id = str(uuid.uuid4())

        job_data = {
            "id": job_id,
            "type": job_type,
            "payload": payload,
            "enqueued_at": time.time(),
            "attempts": 0
        }

        # Add to queue (use ZADD for priority queue)
        score = -priority  # Negative for higher priority first
        self.redis.zadd(self.queue_key, {json.dumps(job_data): score})

        return job_id

    def dequeue(self, timeout: int = 0) -> Optional[dict]:
        """
        Get next job from queue.

        Args:
            timeout: Block for this many seconds (0 = no blocking)

        Returns:
            Job data or None
        """
        # Get highest priority job (lowest score)
        jobs = self.redis.zrange(self.queue_key, 0, 0)

        if not jobs:
            if timeout > 0:
                time.sleep(min(timeout, 1))
                return self.dequeue(timeout - 1)
            return None

        job_json = jobs[0]

        # Claim the job: remove it from the queue and add it to the
        # processing set. ZREM's reply tells us whether this worker won
        # the race against other consumers.
        pipe = self.redis.pipeline()
        pipe.zrem(self.queue_key, job_json)
        pipe.zadd(self.processing_key, {job_json: time.time()})
        removed, _ = pipe.execute()

        if not removed:
            # Another worker claimed this job first; try again
            return self.dequeue(timeout)

        job_data = json.loads(job_json)
        job_data['attempts'] += 1

        return job_data

    def complete(self, job_data: dict):
        """Mark job as completed."""
        # Rebuild the exact JSON stored in the processing set: the stored
        # member still carries the pre-dequeue attempt count.
        job_json = json.dumps({**job_data, 'attempts': job_data['attempts'] - 1})

        # Remove from processing
        self.redis.zrem(self.processing_key, job_json)

    def retry(self, job_data: dict, delay: int = 0):
        """Retry failed job."""
        job_json = json.dumps({**job_data, 'attempts': job_data['attempts'] - 1})

        # Remove from processing
        self.redis.zrem(self.processing_key, job_json)

        if delay > 0:
            time.sleep(delay)

        # Re-enqueue with the incremented attempt count
        self.redis.zadd(self.queue_key, {json.dumps(job_data): 0})

    def get_stats(self) -> dict:
        """Get queue statistics."""
        return {
            "queued": self.redis.zcard(self.queue_key),
            "processing": self.redis.zcard(self.processing_key)
        }
python
import redis
import json
import time
import uuid
from typing import Optional, Callable

class JobQueue:
    """基于Redis的分布式任务队列。"""

    def __init__(self, redis_client: redis.Redis, queue_name: str = "default"):
        self.redis = redis_client
        self.queue_name = queue_name
        self.queue_key = f"queue:{queue_name}"
        self.processing_key = f"queue:{queue_name}:processing"

    def enqueue(self, job_type: str, payload: dict, priority: int = 0) -> str:
        """
        将任务加入队列。

        参数:
            job_type: 任务类型(用于路由到对应工作器)
            payload: 任务数据
            priority: 优先级越高越先处理(0为普通)

        返回:
            job_id
        """
        job_id = str(uuid.uuid4())

        job_data = {
            "id": job_id,
            "type": job_type,
            "payload": payload,
            "enqueued_at": time.time(),
            "attempts": 0
        }

        # 加入队列(使用ZADD实现优先级队列)
        score = -priority  # 负数实现优先级高的先处理
        self.redis.zadd(self.queue_key, {json.dumps(job_data): score})

        return job_id

    def dequeue(self, timeout: int = 0) -> Optional[dict]:
        """
        从队列获取下一个任务。

        参数:
            timeout: 阻塞等待时间(秒,0为非阻塞)

        返回:
            任务数据或None
        """
        # 获取最高优先级任务(分数最低)
        jobs = self.redis.zrange(self.queue_key, 0, 0)

        if not jobs:
            if timeout > 0:
                time.sleep(min(timeout, 1))
                return self.dequeue(timeout - 1)
            return None

        job_json = jobs[0]

        # 尝试将任务移入处理中集合以完成认领;
        # 若已被其他工作器先取走,则继续查找下一个任务。
        pipe = self.redis.pipeline()
        pipe.zrem(self.queue_key, job_json)
        pipe.zadd(self.processing_key, {job_json: time.time()})
        removed, _ = pipe.execute()
        if removed == 0:
            return self.dequeue(timeout)

        job_data = json.loads(job_json)
        job_data['attempts'] += 1

        return job_data

    def complete(self, job_data: dict):
        """标记任务为已完成。"""
        # 重建处理中集合里存储的JSON:dequeue()解析后将attempts加1,
        # 这里要减回去,否则ZREM无法匹配对应成员。
        job_json = json.dumps({**job_data, "attempts": job_data["attempts"] - 1})

        # 从处理中集合移除
        self.redis.zrem(self.processing_key, job_json)

    def retry(self, job_data: dict, delay: int = 0):
        """重试失败的任务。"""
        # 按存储时的形式(attempts未加1)移除处理中集合里的成员
        stored_json = json.dumps({**job_data, "attempts": job_data["attempts"] - 1})
        self.redis.zrem(self.processing_key, stored_json)

        # 延迟后重新加入队列,保留已递增的attempts计数
        if delay > 0:
            time.sleep(delay)

        self.redis.zadd(self.queue_key, {json.dumps(job_data): 0})

    def get_stats(self) -> dict:
        """获取队列统计信息。"""
        return {
            "queued": self.redis.zcard(self.queue_key),
            "processing": self.redis.zcard(self.processing_key)
        }
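The `processing_key` scores recorded by `dequeue()` are claim timestamps, which is what makes crash recovery possible: a reaper process can re-queue jobs whose worker died mid-processing. A minimal sketch under assumed names — `requeue_stale`, `stale_cutoff`, and `VISIBILITY_TIMEOUT` are hypothetical helpers, not part of `JobQueue`:

```python
import time

VISIBILITY_TIMEOUT = 300  # assumed bound (seconds) a claimed job may stay unfinished


def stale_cutoff(now: float, visibility_timeout: float = VISIBILITY_TIMEOUT) -> float:
    """Scores in the processing set are claim timestamps; anything older is stale."""
    return now - visibility_timeout


def requeue_stale(r, queue_name: str = "default") -> int:
    """Move jobs claimed longer than VISIBILITY_TIMEOUT ago back onto the queue."""
    queue_key = f"queue:{queue_name}"
    processing_key = f"queue:{queue_name}:processing"
    cutoff = stale_cutoff(time.time())
    moved = 0
    # Scores are the time.time() recorded by dequeue(), so a range query finds stale claims
    for job_json in r.zrangebyscore(processing_key, "-inf", cutoff):
        pipe = r.pipeline()
        pipe.zrem(processing_key, job_json)
        pipe.zadd(queue_key, {job_json: 0})
        removed, _ = pipe.execute()
        if removed:  # only count jobs we actually reclaimed
            moved += 1
    return moved
```

Run this periodically (e.g. from a cron-style loop) alongside the workers; the ZREM result guards against double-requeuing a job another reaper already moved.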

Worker example

工作器示例

class Worker:
    """Job worker."""

    def __init__(self, queue: JobQueue, handlers: dict):
        self.queue = queue
        self.handlers = handlers

    def process_jobs(self):
        """Process jobs from queue."""
        print("Worker started. Waiting for jobs...")

        while True:
            job = self.queue.dequeue(timeout=5)

            if job:
                print(f"Processing job {job['id']} (type: {job['type']})")

                try:
                    # Get handler for job type
                    handler = self.handlers.get(job['type'])

                    if handler:
                        handler(job['payload'])
                        self.queue.complete(job)
                        print(f"Job {job['id']} completed")
                    else:
                        print(f"No handler for job type: {job['type']}")
                        self.queue.complete(job)

                except Exception as e:
                    print(f"Job {job['id']} failed: {e}")

                    if job['attempts'] < 3:
                        # Retry with exponential backoff
                        delay = 2 ** job['attempts']
                        print(f"Retrying in {delay}s...")
                        self.queue.retry(job, delay=delay)
                    else:
                        print(f"Job {job['id']} failed permanently")
                        self.queue.complete(job)
class Worker:
    """任务工作器。"""

    def __init__(self, queue: JobQueue, handlers: dict):
        self.queue = queue
        self.handlers = handlers

    def process_jobs(self):
        """处理队列中的任务。"""
        print("工作器已启动。等待任务...")

        while True:
            job = self.queue.dequeue(timeout=5)

            if job:
                print(f"处理任务 {job['id']}(类型: {job['type']})")

                try:
                    # 获取对应任务类型的处理器
                    handler = self.handlers.get(job['type'])

                    if handler:
                        handler(job['payload'])
                        self.queue.complete(job)
                        print(f"任务 {job['id']} 已完成")
                    else:
                        print(f"无对应任务类型的处理器: {job['type']}")
                        self.queue.complete(job)

                except Exception as e:
                    print(f"任务 {job['id']} 处理失败: {e}")

                    if job['attempts'] < 3:
                        # 指数退避重试
                        delay = 2 ** job['attempts']
                        print(f"{delay}秒后重试...")
                        self.queue.retry(job, delay=delay)
                    else:
                        print(f"任务 {job['id']} 永久失败")
                        self.queue.complete(job)

Usage

使用

r = redis.Redis(decode_responses=True)
queue = JobQueue(r, "email_queue")
r = redis.Redis(decode_responses=True)
queue = JobQueue(r, "email_queue")

Enqueue jobs

加入任务

job_id = queue.enqueue("send_email", {
    "to": "user@example.com",
    "subject": "Welcome!",
    "body": "Thanks for signing up."
}, priority=1)
job_id = queue.enqueue("send_email", {
    "to": "user@example.com",
    "subject": "欢迎!",
    "body": "感谢注册。"
}, priority=1)

Define handlers

定义处理器

def send_email_handler(payload):
    print(f"Sending email to {payload['to']}")
    # Email sending logic here
    time.sleep(1)  # Simulate work

handlers = {"send_email": send_email_handler}

def send_email_handler(payload):
    print(f"向 {payload['to']} 发送邮件")
    # 邮件发送逻辑
    time.sleep(1)  # 模拟工作

handlers = {"send_email": send_email_handler}

Start worker

启动工作器

worker = Worker(queue, handlers)
worker = Worker(queue, handlers)

worker.process_jobs() # This blocks - run in separate process

worker.process_jobs() # 阻塞运行 - 在独立进程中执行


Example 5: Real-Time Event Streaming

示例5:实时事件流

python
import redis
import json
import time
from typing import Callable, Optional

class EventStream:
    """Real-time event streaming with Redis Streams."""

    def __init__(self, redis_client: redis.Redis, stream_name: str):
        self.redis = redis_client
        self.stream_name = stream_name

    def publish(self, event_type: str, data: dict) -> str:
        """Publish event to stream."""
        event = {
            "type": event_type,
            "data": json.dumps(data),
            "timestamp": time.time()
        }

        # Add to stream (returns auto-generated ID)
        event_id = self.redis.xadd(self.stream_name, event, maxlen=10000)
        return event_id

    def read_events(self, last_id: str = '0', count: int = 10) -> list:
        """Read events from stream."""
        events = self.redis.xread(
            {self.stream_name: last_id},
            count=count,
            block=1000  # 1 second timeout
        )

        if not events:
            return []

        _, event_list = events[0]

        return [
            {
                "id": event_id,
                "type": event_data[b'type'].decode(),
                "data": json.loads(event_data[b'data'].decode()),
                "timestamp": float(event_data[b'timestamp'])
            }
            for event_id, event_data in event_list
        ]

    def create_consumer_group(self, group_name: str):
        """Create consumer group for parallel processing."""
        try:
            self.redis.xgroup_create(
                name=self.stream_name,
                groupname=group_name,
                id='0',
                mkstream=True
            )
        except redis.ResponseError as e:
            if "BUSYGROUP" not in str(e):
                raise

    def consume_events(self, group_name: str, consumer_name: str,
                      count: int = 10) -> list:
        """Consume events as part of consumer group."""
        events = self.redis.xreadgroup(
            groupname=group_name,
            consumername=consumer_name,
            streams={self.stream_name: '>'},
            count=count,
            block=5000
        )

        if not events:
            return []

        _, event_list = events[0]

        return [
            {
                "id": event_id,
                "type": event_data[b'type'].decode(),
                "data": json.loads(event_data[b'data'].decode()),
                "timestamp": float(event_data[b'timestamp'])
            }
            for event_id, event_data in event_list
        ]

    def acknowledge(self, group_name: str, event_id: str):
        """Acknowledge processed event."""
        self.redis.xack(self.stream_name, group_name, event_id)

    def get_pending(self, group_name: str) -> list:
        """Get pending (unacknowledged) events."""
        pending = self.redis.xpending_range(
            name=self.stream_name,
            groupname=group_name,
            min='-',
            max='+',
            count=100
        )

        return pending
python
import redis
import json
import time
from typing import Callable, Optional

class EventStream:
    """基于Redis Streams的实时事件流。"""

    def __init__(self, redis_client: redis.Redis, stream_name: str):
        self.redis = redis_client
        self.stream_name = stream_name

    def publish(self, event_type: str, data: dict) -> str:
        """向流发布事件。"""
        event = {
            "type": event_type,
            "data": json.dumps(data),
            "timestamp": time.time()
        }

        # 加入流(返回自动生成的ID)
        event_id = self.redis.xadd(self.stream_name, event, maxlen=10000)
        return event_id

    def read_events(self, last_id: str = '0', count: int = 10) -> list:
        """从流中读取事件。"""
        events = self.redis.xread(
            {self.stream_name: last_id},
            count=count,
            block=1000  # 1秒超时
        )

        if not events:
            return []

        _, event_list = events[0]

        return [
            {
                "id": event_id,
                "type": event_data[b'type'].decode(),
                "data": json.loads(event_data[b'data'].decode()),
                "timestamp": float(event_data[b'timestamp'])
            }
            for event_id, event_data in event_list
        ]

    def create_consumer_group(self, group_name: str):
        """创建消费者组以实现并行处理。"""
        try:
            self.redis.xgroup_create(
                name=self.stream_name,
                groupname=group_name,
                id='0',
                mkstream=True
            )
        except redis.ResponseError as e:
            if "BUSYGROUP" not in str(e):
                raise

    def consume_events(self, group_name: str, consumer_name: str,
                      count: int = 10) -> list:
        """以组内消费者身份消费事件。"""
        events = self.redis.xreadgroup(
            groupname=group_name,
            consumername=consumer_name,
            streams={self.stream_name: '>'},
            count=count,
            block=5000
        )

        if not events:
            return []

        _, event_list = events[0]

        return [
            {
                "id": event_id,
                "type": event_data[b'type'].decode(),
                "data": json.loads(event_data[b'data'].decode()),
                "timestamp": float(event_data[b'timestamp'])
            }
            for event_id, event_data in event_list
        ]

    def acknowledge(self, group_name: str, event_id: str):
        """确认已处理的事件。"""
        self.redis.xack(self.stream_name, group_name, event_id)

    def get_pending(self, group_name: str) -> list:
        """获取未确认的事件。"""
        pending = self.redis.xpending_range(
            name=self.stream_name,
            groupname=group_name,
            min='-',
            max='+',
            count=100
        )

        return pending

Usage Example: Activity Feed

使用示例:活动流

r = redis.Redis()
activity_stream = EventStream(r, "user_activity")
r = redis.Redis()
activity_stream = EventStream(r, "user_activity")

Publish events

发布事件

activity_stream.publish("user_signup", { "user_id": 123, "email": "alice@example.com" })
activity_stream.publish("post_created", { "user_id": 123, "post_id": 456, "title": "My First Post" })
activity_stream.publish("user_signup", { "user_id": 123, "email": "alice@example.com" })
activity_stream.publish("post_created", { "user_id": 123, "post_id": 456, "title": "我的第一篇帖子" })

Read events (simple consumer)

读取事件(简单消费者)

last_id = '0'
while True:
    events = activity_stream.read_events(last_id, count=10)

    for event in events:
        print(f"Event: {event['type']}")
        print(f"Data: {event['data']}")
        last_id = event['id']

    if not events:
        break
last_id = '0'
while True:
    events = activity_stream.read_events(last_id, count=10)

    for event in events:
        print(f"事件: {event['type']}")
        print(f"数据: {event['data']}")
        last_id = event['id']

    if not events:
        break

Consumer group example

消费者组示例

activity_stream.create_consumer_group("processors")
activity_stream.create_consumer_group("processors")

Worker consuming events

工作器消费事件

while True:
    events = activity_stream.consume_events(
        group_name="processors",
        consumer_name="worker-1",
        count=10
    )

    for event in events:
        try:
            # Process event
            process_event(event)

            # Acknowledge
            activity_stream.acknowledge("processors", event['id'])
        except Exception as e:
            print(f"Failed to process event {event['id']}: {e}")
            # Event remains unacknowledged for retry
while True:
    events = activity_stream.consume_events(
        group_name="processors",
        consumer_name="worker-1",
        count=10
    )

    for event in events:
        try:
            # 处理事件
            process_event(event)

            # 确认处理
            activity_stream.acknowledge("processors", event['id'])
        except Exception as e:
            print(f"处理事件 {event['id']} 失败: {e}")
            # 事件保持未确认状态以便重试
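`get_pending` shows which events were delivered but never acknowledged; to actually recover them after a consumer crash, Redis 6.2+ offers XAUTOCLAIM, which claims long-idle pending entries for a new consumer. A hedged sketch — `reclaim_stuck_events`, `is_stale`, and `STALE_MS` are hypothetical names, and the XAUTOCLAIM reply is taken positionally since its shape varies slightly across server versions:

```python
STALE_MS = 60_000  # assumed bound: an event idle this long without an ACK is stuck


def is_stale(idle_ms: int, threshold_ms: int = STALE_MS) -> bool:
    """True when a pending entry has been idle past the reclaim threshold."""
    return idle_ms >= threshold_ms


def reclaim_stuck_events(redis_client, stream_name, group_name, consumer_name):
    """Transfer long-idle pending entries to this consumer for reprocessing.

    XAUTOCLAIM (Redis 6.2+) walks the pending entries list and claims any
    entry idle longer than min_idle_time; the caller reprocesses and XACKs.
    """
    reply = redis_client.xautoclaim(
        name=stream_name,
        groupname=group_name,
        consumername=consumer_name,
        min_idle_time=STALE_MS,
        start_id="0-0",
        count=10,
    )
    claimed = reply[1]  # list of (event_id, fields) pairs ready to reprocess
    return claimed
```

A recovery worker would call this on a timer, reprocess each claimed event, then `acknowledge()` it so it leaves the pending list.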

Example 6: Cache-Aside Pattern with Multi-Level Caching

示例6:带多级缓存的缓存旁路模式

python
import redis
import json
import time
from typing import Optional, Any, Callable

class MultiLevelCache:
    """Multi-level caching with Redis and local cache."""

    def __init__(self, redis_client: redis.Redis,
                 local_cache_size: int = 100,
                 local_ttl: int = 60,
                 redis_ttl: int = 3600):
        self.redis = redis_client
        self.local_cache = {}
        self.local_cache_size = local_cache_size
        self.local_ttl = local_ttl
        self.redis_ttl = redis_ttl

    def _make_key(self, namespace: str, key: str) -> str:
        """Generate cache key."""
        return f"cache:{namespace}:{key}"

    def get(self, namespace: str, key: str,
            compute_fn: Optional[Callable] = None) -> Optional[Any]:
        """
        Get value from cache with fallback to compute function.

        Lookup order: Local cache → Redis → Compute function
        """
        cache_key = self._make_key(namespace, key)

        # Level 1: Local cache
        if cache_key in self.local_cache:
            entry = self.local_cache[cache_key]
            if time.time() < entry['expires_at']:
                return entry['value']
            else:
                del self.local_cache[cache_key]

        # Level 2: Redis cache
        redis_value = self.redis.get(cache_key)
        if redis_value:
            value = json.loads(redis_value)

            # Populate local cache
            self._set_local(cache_key, value)

            return value

        # Level 3: Compute function
        if compute_fn:
            value = compute_fn()
            if value is not None:
                self.set(namespace, key, value)
            return value

        return None

    def set(self, namespace: str, key: str, value: Any):
        """Set value in both cache levels."""
        cache_key = self._make_key(namespace, key)
        serialized = json.dumps(value)

        # Set in Redis
        self.redis.setex(cache_key, self.redis_ttl, serialized)

        # Set in local cache
        self._set_local(cache_key, value)

    def _set_local(self, key: str, value: Any):
        """Set value in local cache with simple eviction."""
        # Evict the entry expiring soonest when at capacity; with a constant
        # TTL this is FIFO by insertion order, not true LRU.
        if len(self.local_cache) >= self.local_cache_size:
            # Remove the entry closest to expiry
            oldest_key = min(
                self.local_cache.keys(),
                key=lambda k: self.local_cache[k]['expires_at']
            )
            del self.local_cache[oldest_key]

        self.local_cache[key] = {
            'value': value,
            'expires_at': time.time() + self.local_ttl
        }

    def delete(self, namespace: str, key: str):
        """Delete from all cache levels."""
        cache_key = self._make_key(namespace, key)

        # Delete from Redis
        self.redis.delete(cache_key)

        # Delete from local cache
        if cache_key in self.local_cache:
            del self.local_cache[cache_key]

    def invalidate_namespace(self, namespace: str):
        """Invalidate all keys in namespace."""
        pattern = f"cache:{namespace}:*"

        # Delete from Redis
        for key in self.redis.scan_iter(match=pattern, count=100):
            self.redis.delete(key)

        # Delete from local cache
        to_delete = [
            k for k in self.local_cache.keys()
            if k.startswith(f"cache:{namespace}:")
        ]
        for k in to_delete:
            del self.local_cache[k]
python
import redis
import json
import time
from typing import Optional, Any, Callable

class MultiLevelCache:
    """带Redis与本地缓存的多级缓存。"""

    def __init__(self, redis_client: redis.Redis,
                 local_cache_size: int = 100,
                 local_ttl: int = 60,
                 redis_ttl: int = 3600):
        self.redis = redis_client
        self.local_cache = {}
        self.local_cache_size = local_cache_size
        self.local_ttl = local_ttl
        self.redis_ttl = redis_ttl

    def _make_key(self, namespace: str, key: str) -> str:
        """生成缓存键。"""
        return f"cache:{namespace}:{key}"

    def get(self, namespace: str, key: str,
            compute_fn: Optional[Callable] = None) -> Optional[Any]:
        """
        从缓存获取值,回退到计算函数。

        查找顺序: 本地缓存 → Redis → 计算函数
        """
        cache_key = self._make_key(namespace, key)

        # 一级缓存:本地缓存
        if cache_key in self.local_cache:
            entry = self.local_cache[cache_key]
            if time.time() < entry['expires_at']:
                return entry['value']
            else:
                del self.local_cache[cache_key]

        # 二级缓存:Redis
        redis_value = self.redis.get(cache_key)
        if redis_value:
            value = json.loads(redis_value)

            # 填充本地缓存
            self._set_local(cache_key, value)

            return value

        # 三级回退:计算函数
        if compute_fn:
            value = compute_fn()
            if value is not None:
                self.set(namespace, key, value)
            return value

        return None

    def set(self, namespace: str, key: str, value: Any):
        """在两级缓存中设置值。"""
        cache_key = self._make_key(namespace, key)
        serialized = json.dumps(value)

        # 存入Redis
        self.redis.setex(cache_key, self.redis_ttl, serialized)

        # 存入本地缓存
        self._set_local(cache_key, value)

    def _set_local(self, key: str, value: Any):
        """存入本地缓存,带简单淘汰。"""
        # 达到容量时淘汰最先过期的条目;TTL恒定时相当于按插入顺序的FIFO,
        # 并非真正的LRU。
        if len(self.local_cache) >= self.local_cache_size:
            # 移除最接近过期的条目
            oldest_key = min(
                self.local_cache.keys(),
                key=lambda k: self.local_cache[k]['expires_at']
            )
            del self.local_cache[oldest_key]

        self.local_cache[key] = {
            'value': value,
            'expires_at': time.time() + self.local_ttl
        }

    def delete(self, namespace: str, key: str):
        """从两级缓存中删除。"""
        cache_key = self._make_key(namespace, key)

        # 从Redis删除
        self.redis.delete(cache_key)

        # 从本地缓存删除
        if cache_key in self.local_cache:
            del self.local_cache[cache_key]

    def invalidate_namespace(self, namespace: str):
        """失效命名空间下的所有键。"""
        pattern = f"cache:{namespace}:*"

        # 从Redis删除
        for key in self.redis.scan_iter(match=pattern, count=100):
            self.redis.delete(key)

        # 从本地缓存删除
        to_delete = [
            k for k in self.local_cache.keys()
            if k.startswith(f"cache:{namespace}:")
        ]
        for k in to_delete:
            del self.local_cache[k]

Usage

使用

r = redis.Redis(decode_responses=True)
cache = MultiLevelCache(r)

def get_user(user_id: int) -> dict:
    """Get user with multi-level caching."""
    return cache.get(
        namespace="users",
        key=str(user_id),
        compute_fn=lambda: database.query_user(user_id)
    )
r = redis.Redis(decode_responses=True)
cache = MultiLevelCache(r)

def get_user(user_id: int) -> dict:
    """带多级缓存的用户获取。"""
    return cache.get(
        namespace="users",
        key=str(user_id),
        compute_fn=lambda: database.query_user(user_id)
    )

First call: Queries database, caches result

首次调用:查询数据库,缓存结果

user = get_user(123)
user = get_user(123)

Second call: Returns from local cache (fastest)

第二次调用:从本地缓存返回(最快)

user = get_user(123)
user = get_user(123)

Update user

更新用户

def update_user(user_id: int, data: dict):
    database.update_user(user_id, data)

    # Invalidate cache
    cache.delete("users", str(user_id))
def update_user(user_id: int, data: dict):
    database.update_user(user_id, data)

    # 失效缓存
    cache.delete("users", str(user_id))

Invalidate all user caches

失效所有用户缓存

cache.invalidate_namespace("users")
cache.invalidate_namespace("users")
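One failure mode the class above doesn't address is a cache stampede: when a hot key expires, every caller runs `compute_fn` against the database at once. A common mitigation is a short-lived SET NX lock so only one caller recomputes while the rest briefly wait. A sketch under those assumptions — `get_with_stampede_guard`, `lock_key_for`, and `LOCK_TTL` are hypothetical helpers, shown for a `decode_responses=True` client:

```python
import json
import time
import uuid

LOCK_TTL = 10  # assumed upper bound (seconds) on how long a recompute may take


def lock_key_for(cache_key: str) -> str:
    """Derive the per-key lock name (naming convention is an assumption)."""
    return f"{cache_key}:lock"


def get_with_stampede_guard(r, cache_key, compute_fn, ttl=3600):
    """Cache-aside read where only one caller recomputes a missing key."""
    value = r.get(cache_key)
    if value is not None:
        return json.loads(value)

    token = str(uuid.uuid4())
    # NX: only the first caller acquires the lock; EX bounds it if we crash
    if r.set(lock_key_for(cache_key), token, nx=True, ex=LOCK_TTL):
        try:
            result = compute_fn()
            r.setex(cache_key, ttl, json.dumps(result))
            return result
        finally:
            # Release only our own lock (best-effort; a Lua script makes this atomic)
            if r.get(lock_key_for(cache_key)) == token:
                r.delete(lock_key_for(cache_key))

    # Someone else is computing: wait briefly, then re-read
    time.sleep(0.05)
    value = r.get(cache_key)
    return json.loads(value) if value is not None else compute_fn()
```

The check-then-delete release has a small race window; production code would use a Lua script or `redis.lock.Lock` instead.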

Example 7: Geo-Location with Redis

示例7:基于Redis的地理位置

python
import redis

class GeoLocation:
    """Geo-spatial indexing and queries with Redis."""

    def __init__(self, redis_client: redis.Redis, index_name: str):
        self.redis = redis_client
        self.key = f"geo:{index_name}"

    def add_location(self, location_id: str, longitude: float, latitude: float):
        """Add location to geo index."""
        self.redis.geoadd(self.key, longitude, latitude, location_id)

    def add_locations(self, locations: list):
        """Batch add locations.

        Args:
            locations: List of (location_id, longitude, latitude) tuples
        """
        self.redis.geoadd(self.key, *[
            item for loc in locations
            for item in (loc[1], loc[2], loc[0])
        ])

    def get_position(self, location_id: str) -> tuple:
        """Get coordinates of a location."""
        result = self.redis.geopos(self.key, location_id)
        if result and result[0]:
            return result[0]  # (longitude, latitude)
        return None

    def find_nearby(self, longitude: float, latitude: float,
                   radius: float, unit: str = 'km', count: int = None) -> list:
        """
        Find locations within radius.

        Args:
            longitude: Center longitude
            latitude: Center latitude
            radius: Search radius
            unit: Distance unit ('m', 'km', 'mi', 'ft')
            count: Maximum results
        """
        args = {
            'longitude': longitude,
            'latitude': latitude,
            'radius': radius,
            'unit': unit,
            'withdist': True,
            'withcoord': True,
            'sort': 'ASC'
        }

        if count:
            args['count'] = count

        results = self.redis.georadius(self.key, **args)

        return [
            {
                'location_id': location_id,
                'distance': distance,
                'coordinates': (lon, lat)
            }
            for location_id, distance, (lon, lat) in results
        ]

    def find_nearby_member(self, location_id: str, radius: float,
                          unit: str = 'km', count: int = None) -> list:
        """Find locations near an existing member."""
        args = {
            'member': location_id,
            'radius': radius,
            'unit': unit,
            'withdist': True,
            'sort': 'ASC'
        }

        if count:
            args['count'] = count

        results = self.redis.georadiusbymember(self.key, **args)

        return [
            {
                'location_id': loc_id,
                'distance': distance
            }
            for loc_id, distance in results
            if loc_id != location_id  # Exclude self
        ]

    def distance_between(self, location_id1: str, location_id2: str,
                        unit: str = 'km') -> float:
        """Calculate distance between two locations."""
        return self.redis.geodist(self.key, location_id1, location_id2, unit)
python
import redis

class GeoLocation:
    """基于Redis的地理空间索引与查询。"""

    def __init__(self, redis_client: redis.Redis, index_name: str):
        self.redis = redis_client
        self.key = f"geo:{index_name}"

    def add_location(self, location_id: str, longitude: float, latitude: float):
        """向地理索引添加位置。"""
        self.redis.geoadd(self.key, longitude, latitude, location_id)

    def add_locations(self, locations: list):
        """批量添加位置。

        参数:
            locations: (location_id, longitude, latitude)元组列表
        """
        self.redis.geoadd(self.key, *[
            item for loc in locations
            for item in (loc[1], loc[2], loc[0])
        ])

    def get_position(self, location_id: str) -> tuple:
        """获取位置坐标。"""
        result = self.redis.geopos(self.key, location_id)
        if result and result[0]:
            return result[0]  # (经度, 纬度)
        return None

    def find_nearby(self, longitude: float, latitude: float,
                   radius: float, unit: str = 'km', count: int = None) -> list:
        """
        查找指定半径内的位置。

        参数:
            longitude: 中心经度
            latitude: 中心纬度
            radius: 搜索半径
            unit: 距离单位('m', 'km', 'mi', 'ft')
            count: 最大结果数
        """
        args = {
            'longitude': longitude,
            'latitude': latitude,
            'radius': radius,
            'unit': unit,
            'withdist': True,
            'withcoord': True,
            'sort': 'ASC'
        }

        if count:
            args['count'] = count

        results = self.redis.georadius(self.key, **args)

        return [
            {
                'location_id': location_id,
                'distance': distance,
                'coordinates': (lon, lat)
            }
            for location_id, distance, (lon, lat) in results
        ]

    def find_nearby_member(self, location_id: str, radius: float,
                          unit: str = 'km', count: int = None) -> list:
        """查找指定位置附近的其他位置。"""
        args = {
            'member': location_id,
            'radius': radius,
            'unit': unit,
            'withdist': True,
            'sort': 'ASC'
        }

        if count:
            args['count'] = count

        results = self.redis.georadiusbymember(self.key, **args)

        return [
            {
                'location_id': loc_id,
                'distance': distance
            }
            for loc_id, distance in results
            if loc_id != location_id  # 排除自身
        ]

    def distance_between(self, location_id1: str, location_id2: str,
                        unit: str = 'km') -> float:
        """计算两个位置间的距离。"""
        return self.redis.geodist(self.key, location_id1, location_id2, unit)

Usage Example: Restaurant finder

使用示例:餐厅查找器

r = redis.Redis(decode_responses=True)
restaurants = GeoLocation(r, "restaurants")
r = redis.Redis(decode_responses=True)
restaurants = GeoLocation(r, "restaurants")

Add restaurants

添加餐厅

restaurants.add_locations([
    ("rest1", -122.4194, 37.7749),  # San Francisco
    ("rest2", -122.4068, 37.7849),
    ("rest3", -122.4312, 37.7652),
])
restaurants.add_locations([
    ("rest1", -122.4194, 37.7749),  # 旧金山
    ("rest2", -122.4068, 37.7849),
    ("rest3", -122.4312, 37.7652),
])

Find restaurants near coordinates

查找指定坐标附近的餐厅

nearby = restaurants.find_nearby(
    longitude=-122.4194,
    latitude=37.7749,
    radius=5,
    unit='km',
    count=10
)

for restaurant in nearby:
    print(f"{restaurant['location_id']}: {restaurant['distance']:.2f} km away")
nearby = restaurants.find_nearby(
    longitude=-122.4194,
    latitude=37.7749,
    radius=5,
    unit='km',
    count=10
)

for restaurant in nearby:
    print(f"{restaurant['location_id']}: {restaurant['distance']:.2f} 公里远")

Find restaurants near a specific restaurant

查找指定餐厅附近的餐厅

similar = restaurants.find_nearby_member("rest1", radius=2, unit='km')
similar = restaurants.find_nearby_member("rest1", radius=2, unit='km')

Get distance between two restaurants

计算两个餐厅间的距离

distance = restaurants.distance_between("rest1", "rest2", unit='km')
print(f"Distance: {distance:.2f} km")
distance = restaurants.distance_between("rest1", "rest2", unit='km')
print(f"距离: {distance:.2f} 公里")
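Note that GEORADIUS and GEORADIUSBYMEMBER are deprecated as of Redis 6.2 in favor of GEOSEARCH. On newer servers the same query can go through redis-py's `geosearch`; a sketch assuming redis-py 4+ — `find_nearby_geosearch` and `validate_unit` are hypothetical stand-ins for the class method above, not part of `GeoLocation`:

```python
VALID_UNITS = {"m", "km", "mi", "ft"}  # distance units GEOSEARCH accepts


def validate_unit(unit: str) -> str:
    """Fail fast on a bad distance unit before hitting the server."""
    if unit not in VALID_UNITS:
        raise ValueError(f"unsupported unit: {unit}")
    return unit


def find_nearby_geosearch(r, key, longitude, latitude, radius, unit="km", count=None):
    """GEOSEARCH equivalent of find_nearby (Redis 6.2+, redis-py 4+)."""
    results = r.geosearch(
        name=key,
        longitude=longitude,
        latitude=latitude,
        radius=radius,
        unit=validate_unit(unit),
        withdist=True,
        withcoord=True,
        sort="ASC",
        count=count,
    )
    # Same (member, distance, (lon, lat)) tuples as GEORADIUS with these flags
    return [
        {"location_id": member, "distance": dist, "coordinates": (lon, lat)}
        for member, dist, (lon, lat) in results
    ]
```

GEOSEARCH also supports rectangular queries (`width`/`height` instead of `radius`), which GEORADIUS never offered.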

Quick Reference

快速参考

Common Operations

常见操作


Connection

连接

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

Strings

字符串

r.set('key', 'value')
r.setex('key', 3600, 'value')  # With TTL
r.get('key')
r.incr('counter')
r.set('key', 'value')
r.setex('key', 3600, 'value')  # 带TTL
r.get('key')
r.incr('counter')

Hashes

哈希

r.hset('user:123', 'name', 'Alice')
r.hset('user:123', mapping={'name': 'Alice', 'age': 30})
r.hget('user:123', 'name')
r.hgetall('user:123')
r.hset('user:123', 'name', 'Alice')
r.hset('user:123', mapping={'name': 'Alice', 'age': 30})
r.hget('user:123', 'name')
r.hgetall('user:123')

Lists

列表

r.lpush('queue', 'item')
r.rpush('queue', 'item')
r.lpop('queue')
r.lrange('queue', 0, -1)
r.lpush('queue', 'item')
r.rpush('queue', 'item')
r.lpop('queue')
r.lrange('queue', 0, -1)

Sets

集合

r.sadd('tags', 'python', 'redis')
r.smembers('tags')
r.sismember('tags', 'python')
r.sadd('tags', 'python', 'redis')
r.smembers('tags')
r.sismember('tags', 'python')

Sorted Sets

有序集合

r.zadd('leaderboard', {'alice': 100, 'bob': 200})
r.zrange('leaderboard', 0, -1, withscores=True)
r.zrank('leaderboard', 'alice')
r.zadd('leaderboard', {'alice': 100, 'bob': 200})
r.zrange('leaderboard', 0, -1, withscores=True)
r.zrank('leaderboard', 'alice')

Expiration

过期

r.expire('key', 3600)
r.ttl('key')
r.expire('key', 3600)
r.ttl('key')

Pipelining

流水线

pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
results = pipe.execute()
pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
results = pipe.execute()
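A pipeline batches commands but does not make a read-modify-write safe on its own; for that, redis-py's `transaction` helper wraps WATCH/MULTI/EXEC and retries automatically when another client changes a watched key. A sketch — `transfer_points` and `can_transfer` are hypothetical names, not redis-py API:

```python
def can_transfer(balance: int, amount: int) -> bool:
    """Pure guard: a transfer is allowed only when the balance covers it."""
    return balance >= amount


def transfer_points(r, src: str, dst: str, amount: int) -> None:
    """Atomically move points between two counters using optimistic locking.

    r.transaction() WATCHes src, runs the function, and re-runs it if
    another client modifies src before EXEC.
    """
    def txn(pipe):
        # Before pipe.multi(), reads execute immediately (watch mode)
        balance = int(pipe.get(src) or 0)
        if not can_transfer(balance, amount):
            raise ValueError("insufficient balance")
        pipe.multi()
        pipe.decrby(src, amount)
        pipe.incrby(dst, amount)

    r.transaction(txn, src)
```

Use this pattern whenever a decision (the balance check) must hold at the moment the writes commit; a plain pipeline cannot guarantee that.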

Time Complexity

时间复杂度

  • GET, SET: O(1)
  • HGET, HSET: O(1)
  • LPUSH, RPUSH, LPOP, RPOP: O(1)
  • SADD, SREM, SISMEMBER: O(1)
  • ZADD, ZREM: O(log(N))
  • ZRANGE, ZREVRANGE: O(log(N)+M) where M is result size
  • SCAN, SSCAN, HSCAN, ZSCAN: O(1) per iteration

Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: State Management, Distributed Systems, Performance Optimization
Compatible With: redis-py, Redis 6.0+, Redis 7.0+
  • GET, SET: O(1)
  • HGET, HSET: O(1)
  • LPUSH, RPUSH, LPOP, RPOP: O(1)
  • SADD, SREM, SISMEMBER: O(1)
  • ZADD, ZREM: O(log(N))
  • ZRANGE, ZREVRANGE: O(log(N)+M) 其中M为结果数量
  • SCAN, SSCAN, HSCAN, ZSCAN: O(1) 每次迭代

技能版本: 1.0.0
最后更新: 2025年10月
技能分类: 状态管理、分布式系统、性能优化
兼容版本: redis-py, Redis 6.0+, Redis 7.0+