Go Redis
Redis 是一个开源的内存数据结构存储系统,常用作数据库、缓存和消息代理。它支持多种数据结构,包括字符串、哈希、列表、集合、有序集合等。
Go 语言中最常用的 Redis 客户端库是 go-redis。
Go 连接 Redis 非常简单,这是一段示例代码:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"math/rand"
"strconv"
"sync"
"time"
"github.com/redis/go-redis/v9"
)
// 1. 基本 Redis 操作
func basicRedisOperations() {
fmt.Println("=== 基本 Redis 操作 ===")
// 创建 Redis 客户端
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis 服务器地址
Password: "", // 密码
DB: 0, // 数据库编号
})
ctx := context.Background()
// 测试连接
pong, err := rdb.Ping(ctx).Result()
if err != nil {
log.Fatalf("连接 Redis 失败: %v", err)
}
fmt.Printf("Redis 连接成功: %s\n", pong)
// 字符串操作
fmt.Println("\n1. 字符串操作:")
err = rdb.Set(ctx, "name", "Go Redis Demo", 0).Err()
if err != nil {
log.Fatal(err)
}
val, err := rdb.Get(ctx, "name").Result()
if err != nil {
log.Fatal(err)
}
fmt.Printf("获取键 'name': %s\n", val)
// 设置过期时间
err = rdb.SetEX(ctx, "temp_key", "临时数据", 10*time.Second).Err()
if err != nil {
log.Fatal(err)
}
fmt.Println("设置带过期时间的键")
// 哈希操作
fmt.Println("\n2. 哈希操作:")
user := map[string]interface{}{
"name": "张三",
"age": 30,
"email": "zhang@example.com",
}
err = rdb.HSet(ctx, "user:1001", user).Err()
if err != nil {
log.Fatal(err)
}
// 获取哈希字段
name, err := rdb.HGet(ctx, "user:1001", "name").Result()
if err != nil {
log.Fatal(err)
}
fmt.Printf("用户姓名: %s\n", name)
// 获取所有字段
userData, err := rdb.HGetAll(ctx, "user:1001").Result()
if err != nil {
log.Fatal(err)
}
fmt.Printf("用户数据: %+v\n", userData)
// 列表操作
fmt.Println("\n3. 列表操作:")
err = rdb.RPush(ctx, "tasks", "任务1", "任务2", "任务3").Err()
if err != nil {
log.Fatal(err)
}
tasks, err := rdb.LRange(ctx, "tasks", 0, -1).Result()
if err != nil {
log.Fatal(err)
}
fmt.Printf("任务列表: %v\n", tasks)
// 从左侧弹出任务
task, err := rdb.LPop(ctx, "tasks").Result()
if err != nil {
log.Fatal(err)
}
fmt.Printf("执行任务: %s\n", task)
// 集合操作
fmt.Println("\n4. 集合操作:")
err = rdb.SAdd(ctx, "tags", "go", "redis", "database", "cache").Err()
if err != nil {
log.Fatal(err)
}
tags, err := rdb.SMembers(ctx, "tags").Result()
if err != nil {
log.Fatal(err)
}
fmt.Printf("标签集合: %v\n", tags)
// 检查成员是否存在
isMember, err := rdb.SIsMember(ctx, "tags", "go").Result()
if err != nil {
log.Fatal(err)
}
fmt.Printf("'go' 是否在标签中: %t\n", isMember)
// 有序集合操作
fmt.Println("\n5. 有序集合操作:")
zsetData := []redis.Z{
{Score: 100, Member: "用户A"},
{Score: 85, Member: "用户B"},
{Score: 95, Member: "用户C"},
}
err = rdb.ZAdd(ctx, "leaderboard", zsetData...).Err()
if err != nil {
log.Fatal(err)
}
// 获取排名前3
leaders, err := rdb.ZRevRangeWithScores(ctx, "leaderboard", 0, 2).Result()
if err != nil {
log.Fatal(err)
}
fmt.Println("排行榜前3:")
for i, leader := range leaders {
fmt.Printf("第%d名: %s (分数: %.0f)\n", i+1, leader.Member, leader.Score)
}
rdb.Close()
}
// 2. 缓存系统示例
type CacheService struct {
rdb *redis.Client
ctx context.Context
}
func NewCacheService() *CacheService {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
return &CacheService{
rdb: rdb,
ctx: context.Background(),
}
}
func (c *CacheService) Close() {
c.rdb.Close()
}
// 设置缓存
func (c *CacheService) Set(key string, value interface{}, expiration time.Duration) error {
data, err := json.Marshal(value)
if err != nil {
return err
}
return c.rdb.Set(c.ctx, key, data, expiration).Err()
}
// 获取缓存
func (c *CacheService) Get(key string, dest interface{}) error {
data, err := c.rdb.Get(c.ctx, key).Result()
if err != nil {
return err
}
return json.Unmarshal([]byte(data), dest)
}
// 删除缓存
func (c *CacheService) Delete(key string) error {
return c.rdb.Del(c.ctx, key).Err()
}
// 检查缓存是否存在
func (c *CacheService) Exists(key string) bool {
result, err := c.rdb.Exists(c.ctx, key).Result()
if err != nil {
return false
}
return result > 0
}
// 缓存示例使用
type Product struct {
ID int `json:"id"`
Name string `json:"name"`
Price int `json:"price"`
}
func demonstrateCache() {
fmt.Println("\n=== 缓存系统示例 ===")
cache := NewCacheService()
defer cache.Close()
// 模拟数据库查询
getProductFromDB := func(id int) (*Product, error) {
time.Sleep(100 * time.Millisecond) // 模拟数据库查询延迟
return &Product{
ID: id,
Name: fmt.Sprintf("产品%d", id),
Price: id * 100,
}, nil
}
// 带缓存的查询
getProductWithCache := func(id int) (*Product, error) {
cacheKey := fmt.Sprintf("product:%d", id)
var product Product
// 先尝试从缓存获取
if cache.Exists(cacheKey) {
err := cache.Get(cacheKey, &product)
if err == nil {
fmt.Printf("从缓存获取产品 %d\n", id)
return &product, nil
}
}
// 缓存未命中,从数据库获取
fmt.Printf("缓存未命中,从数据库查询产品 %d\n", id)
productPtr, err := getProductFromDB(id)
if err != nil {
return nil, err
}
// 设置缓存,过期时间5分钟
cache.Set(cacheKey, productPtr, 5*time.Minute)
return productPtr, nil
}
// 测试缓存效果
start := time.Now()
// 第一次查询(会缓存)
product1, err := getProductWithCache(1)
if err != nil {
log.Fatal(err)
}
fmt.Printf("产品1: %+v\n", product1)
// 第二次查询(从缓存获取)
product1Cached, err := getProductWithCache(1)
if err != nil {
log.Fatal(err)
}
fmt.Printf("缓存产品1: %+v\n", product1Cached)
// 查询不同产品
product2, err := getProductWithCache(2)
if err != nil {
log.Fatal(err)
}
fmt.Printf("产品2: %+v\n", product2)
duration := time.Since(start)
fmt.Printf("总查询时间: %v\n", duration)
}
// 3. 分布式锁示例
type DistributedLock struct {
rdb *redis.Client
ctx context.Context
key string
token string
timeout time.Duration
}
func NewDistributedLock(rdb *redis.Client, key string, timeout time.Duration) *DistributedLock {
return &DistributedLock{
rdb: rdb,
ctx: context.Background(),
key: key,
token: generateLockToken(),
timeout: timeout,
}
}
func generateLockToken() string {
return fmt.Sprintf("lock-%d-%d", time.Now().UnixNano(), rand.Intn(1000))
}
// 获取锁
func (dl *DistributedLock) Acquire() (bool, error) {
return dl.rdb.SetNX(dl.ctx, dl.key, dl.token, dl.timeout).Result()
}
// 释放锁
func (dl *DistributedLock) Release() error {
// 使用 Lua 脚本确保原子性:只有锁的持有者才能释放
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`
result, err := dl.rdb.Eval(dl.ctx, script, []string{dl.key}, dl.token).Result()
if err != nil {
return err
}
if result.(int64) == 0 {
return fmt.Errorf("锁已被其他客户端持有")
}
return nil
}
// 锁示例使用
func demonstrateDistributedLock() {
fmt.Println("\n=== 分布式锁示例 ===")
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
lockKey := "resource:lock"
var wg sync.WaitGroup
// 模拟多个客户端同时竞争锁
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(clientID int) {
defer wg.Done()
lock := NewDistributedLock(rdb, lockKey, 10*time.Second)
// 尝试获取锁
acquired, err := lock.Acquire()
if err != nil {
log.Printf("客户端 %d 获取锁失败: %v", clientID, err)
return
}
if acquired {
fmt.Printf("客户端 %d 获取锁成功\n", clientID)
// 模拟处理资源
time.Sleep(2 * time.Second)
fmt.Printf("客户端 %d 处理完成\n", clientID)
// 释放锁
err = lock.Release()
if err != nil {
log.Printf("客户端 %d 释放锁失败: %v", clientID, err)
} else {
fmt.Printf("客户端 %d 释放锁成功\n", clientID)
}
} else {
fmt.Printf("客户端 %d 获取锁失败(被其他客户端持有)\n", clientID)
}
}(i)
time.Sleep(500 * time.Millisecond) // 错开启动时间
}
wg.Wait()
}
// 4. 发布订阅示例
type PubSubService struct {
rdb *redis.Client
ctx context.Context
}
func NewPubSubService() *PubSubService {
return &PubSubService{
rdb: redis.NewClient(&redis.Options{
Addr: "localhost:6379",
}),
ctx: context.Background(),
}
}
func (ps *PubSubService) Close() {
ps.rdb.Close()
}
// 发布消息
func (ps *PubSubService) Publish(channel, message string) error {
return ps.rdb.Publish(ps.ctx, channel, message).Err()
}
// 订阅频道
func (ps *PubSubService) Subscribe(channels ...string) *redis.PubSub {
return ps.rdb.Subscribe(ps.ctx, channels...)
}
func demonstratePubSub() {
fmt.Println("\n=== 发布订阅示例 ===")
pubsub := NewPubSubService()
defer pubsub.Close()
// 启动订阅者
subscriber := pubsub.Subscribe("news", "updates")
defer subscriber.Close()
// 处理接收到的消息
go func() {
for {
msg, err := subscriber.ReceiveMessage(pubsub.ctx)
if err != nil {
log.Printf("接收消息错误: %v", err)
return
}
fmt.Printf("收到消息 - 频道: %s, 内容: %s\n", msg.Channel, msg.Payload)
}
}()
// 发布消息
time.Sleep(100 * time.Millisecond) // 等待订阅者准备
messages := []struct {
channel string
message string
}{
{"news", "今日新闻: Go 1.20 发布"},
{"updates", "系统更新: 新增缓存功能"},
{"news", "技术分享: Redis 最佳实践"},
}
for _, m := range messages {
err := pubsub.Publish(m.channel, m.message)
if err != nil {
log.Printf("发布消息失败: %v", err)
} else {
fmt.Printf("发布消息到频道 %s: %s\n", m.channel, m.message)
}
time.Sleep(1 * time.Second)
}
}
// 5. 排行榜系统示例
type LeaderboardService struct {
rdb *redis.Client
ctx context.Context
}
func NewLeaderboardService() *LeaderboardService {
return &LeaderboardService{
rdb: redis.NewClient(&redis.Options{
Addr: "localhost:6379",
}),
ctx: context.Background(),
}
}
func (ls *LeaderboardService) Close() {
ls.rdb.Close()
}
// 添加分数
func (ls *LeaderboardService) AddScore(userID string, score int) error {
return ls.rdb.ZIncrBy(ls.ctx, "leaderboard", float64(score), userID).Err()
}
// 获取排名
func (ls *LeaderboardService) GetRank(userID string) (int64, error) {
return ls.rdb.ZRevRank(ls.ctx, "leaderboard", userID).Result()
}
// 获取排行榜
func (ls *LeaderboardService) GetTopN(n int64) ([]redis.Z, error) {
return ls.rdb.ZRevRangeWithScores(ls.ctx, "leaderboard", 0, n-1).Result()
}
// 获取用户分数
func (ls *LeaderboardService) GetScore(userID string) (float64, error) {
return ls.rdb.ZScore(ls.ctx, "leaderboard", userID).Result()
}
func demonstrateLeaderboard() {
fmt.Println("\n=== 排行榜系统示例 ===")
leaderboard := NewLeaderboardService()
defer leaderboard.Close()
// 清空排行榜
leaderboard.rdb.Del(leaderboard.ctx, "leaderboard")
// 添加测试数据
users := []struct {
id string
score int
}{
{"user1", 1000}, {"user2", 1500}, {"user3", 800},
{"user4", 2000}, {"user5", 1200},
}
for _, user := range users {
err := leaderboard.AddScore(user.id, user.score)
if err != nil {
log.Fatal(err)
}
fmt.Printf("添加用户 %s 分数: %d\n", user.id, user.score)
}
// 获取排行榜
top3, err := leaderboard.GetTopN(3)
if err != nil {
log.Fatal(err)
}
fmt.Println("\n排行榜前3:")
for i, user := range top3 {
rank, _ := leaderboard.GetRank(user.Member.(string))
fmt.Printf("第%d名: %s (分数: %.0f, 排名: %d)\n",
i+1, user.Member, user.Score, rank+1)
}
// 更新用户分数
fmt.Println("\n更新用户分数:")
leaderboard.AddScore("user3", 500) // user3 增加500分
score, _ := leaderboard.GetScore("user3")
rank, _ := leaderboard.GetRank("user3")
fmt.Printf("user3 新分数: %.0f, 新排名: %d\n", score, rank+1)
}
// 6. 限流器示例
type RateLimiter struct {
rdb *redis.Client
ctx context.Context
}
func NewRateLimiter() *RateLimiter {
return &RateLimiter{
rdb: redis.NewClient(&redis.Options{
Addr: "localhost:6379",
}),
ctx: context.Background(),
}
}
// 滑动窗口限流
func (rl *RateLimiter) IsAllowed(key string, limit int, window time.Duration) (bool, error) {
now := time.Now().UnixNano()
windowMicro := window.Microseconds()
pipeline := rl.rdb.Pipeline()
// 移除时间窗口之外的数据
removeByScore := pipeline.ZRemRangeByScore(rl.ctx, key, "0",
strconv.FormatInt(now-windowMicro, 10))
// 添加当前请求
add := pipeline.ZAdd(rl.ctx, key, redis.Z{
Score: float64(now),
Member: strconv.FormatInt(now, 10),
})
// 设置过期时间
expire := pipeline.Expire(rl.ctx, key, window)
// 获取当前窗口内的请求数量
count := pipeline.ZCard(rl.ctx, key)
_, err := pipeline.Exec(rl.ctx)
if err != nil {
return false, err
}
_ = removeByScore
_ = add
_ = expire
currentCount, err := count.Result()
if err != nil {
return false, err
}
return currentCount <= int64(limit), nil
}
func demonstrateRateLimiting() {
fmt.Println("\n=== 限流器示例 ===")
limiter := NewRateLimiter()
// 测试限流:每分钟最多5次请求
key := "api:user1"
limit := 5
window := time.Minute
for i := 1; i <= 10; i++ {
allowed, err := limiter.IsAllowed(key, limit, window)
if err != nil {
log.Fatal(err)
}
if allowed {
fmt.Printf("请求 %d: 允许\n", i)
} else {
fmt.Printf("请求 %d: 拒绝(超过限制)\n", i)
}
time.Sleep(100 * time.Millisecond)
}
}
// 7. 事务和管道示例
func demonstrateTransaction() {
fmt.Println("\n=== 事务和管道示例 ===")
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
defer rdb.Close()
ctx := context.Background()
// 管道操作(批量执行)
fmt.Println("1. 管道操作:")
pipeline := rdb.Pipeline()
set1 := pipeline.Set(ctx, "pipe1", "value1", 0)
set2 := pipeline.Set(ctx, "pipe2", "value2", 0)
get1 := pipeline.Get(ctx, "pipe1")
_, err := pipeline.Exec(ctx)
if err != nil {
log.Fatal(err)
}
value, _ := get1.Result()
fmt.Printf("管道操作结果: %s\n", value)
// 事务操作(Watch + Multi)
fmt.Println("\n2. 事务操作:")
key := "counter"
// 监视键
err = rdb.Watch(ctx, func(tx *redis.Tx) error {
// 获取当前值
current, err := tx.Get(ctx, key).Int()
if err != nil && err != redis.Nil {
return err
}
// 在事务中执行操作
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Set(ctx, key, current+1, 0)
return nil
})
return err
}, key)
if err != nil {
log.Printf("事务执行失败: %v", err)
} else {
fmt.Println("事务执行成功")
}
}
func main() {
if len(os.Args) < 2 {
fmt.Println("Redis 示例用法:")
fmt.Println(" 基本操作: go run redis.go basic")
fmt.Println(" 缓存系统: go run redis.go cache")
fmt.Println(" 分布式锁: go run redis.go lock")
fmt.Println(" 发布订阅: go run redis.go pubsub")
fmt.Println(" 排行榜: go run redis.go leaderboard")
fmt.Println(" 限流器: go run redis.go ratelimit")
fmt.Println(" 事务: go run redis.go transaction")
fmt.Println(" 全部演示: go run redis.go all")
return
}
mode := os.Args[1]
switch mode {
case "basic":
basicRedisOperations()
case "cache":
demonstrateCache()
case "lock":
demonstrateDistributedLock()
case "pubsub":
demonstratePubSub()
case "leaderboard":
demonstrateLeaderboard()
case "ratelimit":
demonstrateRateLimiting()
case "transaction":
demonstrateTransaction()
case "all":
basicRedisOperations()
demonstrateCache()
demonstrateDistributedLock()
demonstratePubSub()
demonstrateLeaderboard()
demonstrateRateLimiting()
demonstrateTransaction()
default:
fmt.Println("未知模式")
}
}
Redis 编程关键要点
- 核心概念
连接管理 :使用 redis.NewClient()创建客户端
上下文传递 :使用 context.Context 控制超时和取消
连接池 :客户端自动管理连接池
- 数据结构操作
字符串 :Set(), Get(), Incr()
哈希 :HSet(), HGet(), HGetAll()
列表 :LPush(), RPop(), LRange()
集合 :SAdd(), SMembers(), SIsMember()
有序集合 :ZAdd(), ZRange(), ZScore()
- 高级特性
事务 :Watch(), Multi(), Exec()
管道 :Pipeline()批量操作
发布订阅 :Publish(), Subscribe()
Lua 脚本 :Eval()执行原子操作
- 最佳实践
错误处理 :检查所有 Redis 操作的错误
连接管理 :及时关闭客户端连接
超时设置 :设置合理的超时时间
重试机制 :实现适当的重试逻辑
监控指标 :监控 Redis 性能指标
- 适用场景
缓存 :热点数据缓存
会话存储 :用户会话管理
消息队列 :异步任务处理
排行榜 :实时排名系统
分布式锁 :资源访问控制
限流 :API 访问频率控制
Go 的 Redis 客户端库功能强大且易于使用,非常适合构建高性能的分布式应用。
通关密语:redis