Go Redis

Redis 是一个开源的内存数据结构存储系统,常用作数据库、缓存和消息代理。它支持多种数据结构,包括字符串、哈希、列表、集合、有序集合等。

Go 语言中最常用的 Redis 客户端库是 go-redis。

Go 连接 Redis 非常简单,这是一段示例代码:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "math/rand"
    "strconv"
    "sync"
    "time"

    "github.com/redis/go-redis/v9"
)

// 1. 基本 Redis 操作
func basicRedisOperations() {
    fmt.Println("=== 基本 Redis 操作 ===")

    // 创建 Redis 客户端
    rdb := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379", // Redis 服务器地址
        Password: "",               // 密码
        DB:       0,                // 数据库编号
    })

    ctx := context.Background()

    // 测试连接
    pong, err := rdb.Ping(ctx).Result()
    if err != nil {
        log.Fatalf("连接 Redis 失败: %v", err)
    }
    fmt.Printf("Redis 连接成功: %s\n", pong)

    // 字符串操作
    fmt.Println("\n1. 字符串操作:")
    err = rdb.Set(ctx, "name", "Go Redis Demo", 0).Err()
    if err != nil {
        log.Fatal(err)
    }

    val, err := rdb.Get(ctx, "name").Result()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("获取键 'name': %s\n", val)

    // 设置过期时间
    err = rdb.SetEX(ctx, "temp_key", "临时数据", 10*time.Second).Err()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("设置带过期时间的键")

    // 哈希操作
    fmt.Println("\n2. 哈希操作:")
    user := map[string]interface{}{
        "name":  "张三",
        "age":   30,
        "email": "zhang@example.com",
    }

    err = rdb.HSet(ctx, "user:1001", user).Err()
    if err != nil {
        log.Fatal(err)
    }

    // 获取哈希字段
    name, err := rdb.HGet(ctx, "user:1001", "name").Result()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("用户姓名: %s\n", name)

    // 获取所有字段
    userData, err := rdb.HGetAll(ctx, "user:1001").Result()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("用户数据: %+v\n", userData)

    // 列表操作
    fmt.Println("\n3. 列表操作:")
    err = rdb.RPush(ctx, "tasks", "任务1", "任务2", "任务3").Err()
    if err != nil {
        log.Fatal(err)
    }

    tasks, err := rdb.LRange(ctx, "tasks", 0, -1).Result()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("任务列表: %v\n", tasks)

    // 从左侧弹出任务
    task, err := rdb.LPop(ctx, "tasks").Result()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("执行任务: %s\n", task)

    // 集合操作
    fmt.Println("\n4. 集合操作:")
    err = rdb.SAdd(ctx, "tags", "go", "redis", "database", "cache").Err()
    if err != nil {
        log.Fatal(err)
    }

    tags, err := rdb.SMembers(ctx, "tags").Result()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("标签集合: %v\n", tags)

    // 检查成员是否存在
    isMember, err := rdb.SIsMember(ctx, "tags", "go").Result()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("'go' 是否在标签中: %t\n", isMember)

    // 有序集合操作
    fmt.Println("\n5. 有序集合操作:")
    zsetData := []redis.Z{
        {Score: 100, Member: "用户A"},
        {Score: 85, Member: "用户B"},
        {Score: 95, Member: "用户C"},
    }
    err = rdb.ZAdd(ctx, "leaderboard", zsetData...).Err()
    if err != nil {
        log.Fatal(err)
    }

    // 获取排名前3
    leaders, err := rdb.ZRevRangeWithScores(ctx, "leaderboard", 0, 2).Result()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("排行榜前3:")
    for i, leader := range leaders {
        fmt.Printf("第%d名: %s (分数: %.0f)\n", i+1, leader.Member, leader.Score)
    }

    rdb.Close()
}

// 2. 缓存系统示例
type CacheService struct {
    rdb *redis.Client
    ctx context.Context
}

func NewCacheService() *CacheService {
    rdb := redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "",
        DB:       0,
    })

    return &CacheService{
        rdb: rdb,
        ctx: context.Background(),
    }
}

func (c *CacheService) Close() {
    c.rdb.Close()
}

// 设置缓存
func (c *CacheService) Set(key string, value interface{}, expiration time.Duration) error {
    data, err := json.Marshal(value)
    if err != nil {
        return err
    }
    return c.rdb.Set(c.ctx, key, data, expiration).Err()
}

// 获取缓存
func (c *CacheService) Get(key string, dest interface{}) error {
    data, err := c.rdb.Get(c.ctx, key).Result()
    if err != nil {
        return err
    }
    return json.Unmarshal([]byte(data), dest)
}

// 删除缓存
func (c *CacheService) Delete(key string) error {
    return c.rdb.Del(c.ctx, key).Err()
}

// 检查缓存是否存在
func (c *CacheService) Exists(key string) bool {
    result, err := c.rdb.Exists(c.ctx, key).Result()
    if err != nil {
        return false
    }
    return result > 0
}

// 缓存示例使用
type Product struct {
    ID    int    `json:"id"`
    Name  string `json:"name"`
    Price int    `json:"price"`
}

func demonstrateCache() {
    fmt.Println("\n=== 缓存系统示例 ===")

    cache := NewCacheService()
    defer cache.Close()

    // 模拟数据库查询
    getProductFromDB := func(id int) (*Product, error) {
        time.Sleep(100 * time.Millisecond) // 模拟数据库查询延迟
        return &Product{
            ID:    id,
            Name:  fmt.Sprintf("产品%d", id),
            Price: id * 100,
        }, nil
    }

    // 带缓存的查询
    getProductWithCache := func(id int) (*Product, error) {
        cacheKey := fmt.Sprintf("product:%d", id)
        var product Product

        // 先尝试从缓存获取
        if cache.Exists(cacheKey) {
            err := cache.Get(cacheKey, &product)
            if err == nil {
                fmt.Printf("从缓存获取产品 %d\n", id)
                return &product, nil
            }
        }

        // 缓存未命中,从数据库获取
        fmt.Printf("缓存未命中,从数据库查询产品 %d\n", id)
        productPtr, err := getProductFromDB(id)
        if err != nil {
            return nil, err
        }

        // 设置缓存,过期时间5分钟
        cache.Set(cacheKey, productPtr, 5*time.Minute)
        return productPtr, nil
    }

    // 测试缓存效果
    start := time.Now()

    // 第一次查询(会缓存)
    product1, err := getProductWithCache(1)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("产品1: %+v\n", product1)

    // 第二次查询(从缓存获取)
    product1Cached, err := getProductWithCache(1)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("缓存产品1: %+v\n", product1Cached)

    // 查询不同产品
    product2, err := getProductWithCache(2)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("产品2: %+v\n", product2)

    duration := time.Since(start)
    fmt.Printf("总查询时间: %v\n", duration)
}

// 3. 分布式锁示例
type DistributedLock struct {
    rdb     *redis.Client
    ctx     context.Context
    key     string
    token   string
    timeout time.Duration
}

func NewDistributedLock(rdb *redis.Client, key string, timeout time.Duration) *DistributedLock {
    return &DistributedLock{
        rdb:     rdb,
        ctx:     context.Background(),
        key:     key,
        token:   generateLockToken(),
        timeout: timeout,
    }
}

func generateLockToken() string {
    return fmt.Sprintf("lock-%d-%d", time.Now().UnixNano(), rand.Intn(1000))
}

// 获取锁
func (dl *DistributedLock) Acquire() (bool, error) {
    return dl.rdb.SetNX(dl.ctx, dl.key, dl.token, dl.timeout).Result()
}

// 释放锁
func (dl *DistributedLock) Release() error {
    // 使用 Lua 脚本确保原子性:只有锁的持有者才能释放
    script := `
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    `

    result, err := dl.rdb.Eval(dl.ctx, script, []string{dl.key}, dl.token).Result()
    if err != nil {
        return err
    }

    if result.(int64) == 0 {
        return fmt.Errorf("锁已被其他客户端持有")
    }

    return nil
}

// 锁示例使用
func demonstrateDistributedLock() {
    fmt.Println("\n=== 分布式锁示例 ===")

    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer rdb.Close()

    lockKey := "resource:lock"
    var wg sync.WaitGroup

    // 模拟多个客户端同时竞争锁
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go func(clientID int) {
            defer wg.Done()

            lock := NewDistributedLock(rdb, lockKey, 10*time.Second)

            // 尝试获取锁
            acquired, err := lock.Acquire()
            if err != nil {
                log.Printf("客户端 %d 获取锁失败: %v", clientID, err)
                return
            }

            if acquired {
                fmt.Printf("客户端 %d 获取锁成功\n", clientID)

                // 模拟处理资源
                time.Sleep(2 * time.Second)
                fmt.Printf("客户端 %d 处理完成\n", clientID)

                // 释放锁
                err = lock.Release()
                if err != nil {
                    log.Printf("客户端 %d 释放锁失败: %v", clientID, err)
                } else {
                    fmt.Printf("客户端 %d 释放锁成功\n", clientID)
                }
            } else {
                fmt.Printf("客户端 %d 获取锁失败(被其他客户端持有)\n", clientID)
            }
        }(i)

        time.Sleep(500 * time.Millisecond) // 错开启动时间
    }

    wg.Wait()
}

// 4. 发布订阅示例
type PubSubService struct {
    rdb *redis.Client
    ctx context.Context
}

func NewPubSubService() *PubSubService {
    return &PubSubService{
        rdb: redis.NewClient(&redis.Options{
            Addr: "localhost:6379",
        }),
        ctx: context.Background(),
    }
}

func (ps *PubSubService) Close() {
    ps.rdb.Close()
}

// 发布消息
func (ps *PubSubService) Publish(channel, message string) error {
    return ps.rdb.Publish(ps.ctx, channel, message).Err()
}

// 订阅频道
func (ps *PubSubService) Subscribe(channels ...string) *redis.PubSub {
    return ps.rdb.Subscribe(ps.ctx, channels...)
}

func demonstratePubSub() {
    fmt.Println("\n=== 发布订阅示例 ===")

    pubsub := NewPubSubService()
    defer pubsub.Close()

    // 启动订阅者
    subscriber := pubsub.Subscribe("news", "updates")
    defer subscriber.Close()

    // 处理接收到的消息
    go func() {
        for {
            msg, err := subscriber.ReceiveMessage(pubsub.ctx)
            if err != nil {
                log.Printf("接收消息错误: %v", err)
                return
            }
            fmt.Printf("收到消息 - 频道: %s, 内容: %s\n", msg.Channel, msg.Payload)
        }
    }()

    // 发布消息
    time.Sleep(100 * time.Millisecond) // 等待订阅者准备

    messages := []struct {
        channel string
        message string
    }{
        {"news", "今日新闻: Go 1.20 发布"},
        {"updates", "系统更新: 新增缓存功能"},
        {"news", "技术分享: Redis 最佳实践"},
    }

    for _, m := range messages {
        err := pubsub.Publish(m.channel, m.message)
        if err != nil {
            log.Printf("发布消息失败: %v", err)
        } else {
            fmt.Printf("发布消息到频道 %s: %s\n", m.channel, m.message)
        }
        time.Sleep(1 * time.Second)
    }
}

// 5. 排行榜系统示例
type LeaderboardService struct {
    rdb *redis.Client
    ctx context.Context
}

func NewLeaderboardService() *LeaderboardService {
    return &LeaderboardService{
        rdb: redis.NewClient(&redis.Options{
            Addr: "localhost:6379",
        }),
        ctx: context.Background(),
    }
}

func (ls *LeaderboardService) Close() {
    ls.rdb.Close()
}

// 添加分数
func (ls *LeaderboardService) AddScore(userID string, score int) error {
    return ls.rdb.ZIncrBy(ls.ctx, "leaderboard", float64(score), userID).Err()
}

// 获取排名
func (ls *LeaderboardService) GetRank(userID string) (int64, error) {
    return ls.rdb.ZRevRank(ls.ctx, "leaderboard", userID).Result()
}

// 获取排行榜
func (ls *LeaderboardService) GetTopN(n int64) ([]redis.Z, error) {
    return ls.rdb.ZRevRangeWithScores(ls.ctx, "leaderboard", 0, n-1).Result()
}

// 获取用户分数
func (ls *LeaderboardService) GetScore(userID string) (float64, error) {
    return ls.rdb.ZScore(ls.ctx, "leaderboard", userID).Result()
}

func demonstrateLeaderboard() {
    fmt.Println("\n=== 排行榜系统示例 ===")

    leaderboard := NewLeaderboardService()
    defer leaderboard.Close()

    // 清空排行榜
    leaderboard.rdb.Del(leaderboard.ctx, "leaderboard")

    // 添加测试数据
    users := []struct {
        id    string
        score int
    }{
        {"user1", 1000}, {"user2", 1500}, {"user3", 800},
        {"user4", 2000}, {"user5", 1200},
    }

    for _, user := range users {
        err := leaderboard.AddScore(user.id, user.score)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("添加用户 %s 分数: %d\n", user.id, user.score)
    }

    // 获取排行榜
    top3, err := leaderboard.GetTopN(3)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println("\n排行榜前3:")
    for i, user := range top3 {
        rank, _ := leaderboard.GetRank(user.Member.(string))
        fmt.Printf("第%d名: %s (分数: %.0f, 排名: %d)\n",
            i+1, user.Member, user.Score, rank+1)
    }

    // 更新用户分数
    fmt.Println("\n更新用户分数:")
    leaderboard.AddScore("user3", 500) // user3 增加500分
    score, _ := leaderboard.GetScore("user3")
    rank, _ := leaderboard.GetRank("user3")
    fmt.Printf("user3 新分数: %.0f, 新排名: %d\n", score, rank+1)
}

// 6. 限流器示例
type RateLimiter struct {
    rdb *redis.Client
    ctx context.Context
}

func NewRateLimiter() *RateLimiter {
    return &RateLimiter{
        rdb: redis.NewClient(&redis.Options{
            Addr: "localhost:6379",
        }),
        ctx: context.Background(),
    }
}

// 滑动窗口限流
func (rl *RateLimiter) IsAllowed(key string, limit int, window time.Duration) (bool, error) {
    now := time.Now().UnixNano()
    windowMicro := window.Microseconds()

    pipeline := rl.rdb.Pipeline()

    // 移除时间窗口之外的数据
    removeByScore := pipeline.ZRemRangeByScore(rl.ctx, key, "0",
        strconv.FormatInt(now-windowMicro, 10))

    // 添加当前请求
    add := pipeline.ZAdd(rl.ctx, key, redis.Z{
        Score:  float64(now),
        Member: strconv.FormatInt(now, 10),
    })

    // 设置过期时间
    expire := pipeline.Expire(rl.ctx, key, window)

    // 获取当前窗口内的请求数量
    count := pipeline.ZCard(rl.ctx, key)

    _, err := pipeline.Exec(rl.ctx)
    if err != nil {
        return false, err
    }

    _ = removeByScore
    _ = add
    _ = expire

    currentCount, err := count.Result()
    if err != nil {
        return false, err
    }

    return currentCount <= int64(limit), nil
}

func demonstrateRateLimiting() {
    fmt.Println("\n=== 限流器示例 ===")

    limiter := NewRateLimiter()

    // 测试限流:每分钟最多5次请求
    key := "api:user1"
    limit := 5
    window := time.Minute

    for i := 1; i <= 10; i++ {
        allowed, err := limiter.IsAllowed(key, limit, window)
        if err != nil {
            log.Fatal(err)
        }

        if allowed {
            fmt.Printf("请求 %d: 允许\n", i)
        } else {
            fmt.Printf("请求 %d: 拒绝(超过限制)\n", i)
        }

        time.Sleep(100 * time.Millisecond)
    }
}

// 7. 事务和管道示例
func demonstrateTransaction() {
    fmt.Println("\n=== 事务和管道示例 ===")

    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer rdb.Close()

    ctx := context.Background()

    // 管道操作(批量执行)
    fmt.Println("1. 管道操作:")
    pipeline := rdb.Pipeline()

    set1 := pipeline.Set(ctx, "pipe1", "value1", 0)
    set2 := pipeline.Set(ctx, "pipe2", "value2", 0)
    get1 := pipeline.Get(ctx, "pipe1")

    _, err := pipeline.Exec(ctx)
    if err != nil {
        log.Fatal(err)
    }

    value, _ := get1.Result()
    fmt.Printf("管道操作结果: %s\n", value)

    // 事务操作(Watch + Multi)
    fmt.Println("\n2. 事务操作:")
    key := "counter"

    // 监视键
    err = rdb.Watch(ctx, func(tx *redis.Tx) error {
        // 获取当前值
        current, err := tx.Get(ctx, key).Int()
        if err != nil && err != redis.Nil {
            return err
        }

        // 在事务中执行操作
        _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
            pipe.Set(ctx, key, current+1, 0)
            return nil
        })
        return err
    }, key)

    if err != nil {
        log.Printf("事务执行失败: %v", err)
    } else {
        fmt.Println("事务执行成功")
    }
}

func main() {
    if len(os.Args) < 2 {
        fmt.Println("Redis 示例用法:")
        fmt.Println("  基本操作: go run redis.go basic")
        fmt.Println("  缓存系统: go run redis.go cache")
        fmt.Println("  分布式锁: go run redis.go lock")
        fmt.Println("  发布订阅: go run redis.go pubsub")
        fmt.Println("  排行榜: go run redis.go leaderboard")
        fmt.Println("  限流器: go run redis.go ratelimit")
        fmt.Println("  事务: go run redis.go transaction")
        fmt.Println("  全部演示: go run redis.go all")
        return
    }

    mode := os.Args[1]

    switch mode {
    case "basic":
        basicRedisOperations()
    case "cache":
        demonstrateCache()
    case "lock":
        demonstrateDistributedLock()
    case "pubsub":
        demonstratePubSub()
    case "leaderboard":
        demonstrateLeaderboard()
    case "ratelimit":
        demonstrateRateLimiting()
    case "transaction":
        demonstrateTransaction()
    case "all":
        basicRedisOperations()
        demonstrateCache()
        demonstrateDistributedLock()
        demonstratePubSub()
        demonstrateLeaderboard()
        demonstrateRateLimiting()
        demonstrateTransaction()
    default:
        fmt.Println("未知模式")
    }
}

Redis 编程关键要点

  1. ​ 核心概念 ​

​ 连接管理 ​:使用 redis.NewClient()创建客户端

​ 上下文传递 ​:使用 context.Context 控制超时和取消

​ 连接池 ​:客户端自动管理连接池

  1. ​ 数据结构操作 ​

​ 字符串 ​:Set(), Get(), Incr()

​ 哈希 ​:HSet(), HGet(), HGetAll()

​ 列表 ​:LPush(), RPop(), LRange()

​ 集合 ​:SAdd(), SMembers(), SIsMember()

​ 有序集合 ​:ZAdd(), ZRange(), ZScore()

  1. ​ 高级特性 ​

​ 事务 ​:Watch(), Multi(), Exec()

​ 管道 ​:Pipeline()批量操作

​ 发布订阅 ​:Publish(), Subscribe()

​Lua 脚本 ​:Eval()执行原子操作

  1. ​ 最佳实践 ​

​ 错误处理 ​:检查所有 Redis 操作的错误

​ 连接管理 ​:及时关闭客户端连接

​ 超时设置 ​:设置合理的超时时间

​ 重试机制 ​:实现适当的重试逻辑

​ 监控指标 ​:监控 Redis 性能指标

  1. ​ 适用场景 ​

​ 缓存 ​:热点数据缓存

​ 会话存储 ​:用户会话管理

​ 消息队列 ​:异步任务处理

​ 排行榜 ​:实时排名系统

​ 分布式锁 ​:资源访问控制

​ 限流 ​:API 访问频率控制

Go 的 Redis 客户端库功能强大且易于使用,非常适合构建高性能的分布式应用。

通关密语:redis