Go 缓存

在 Go 中,缓存主要用于提高应用性能,通过将频繁访问的数据存储在快速存储介质(通常是内存)中,减少对慢速数据源(如数据库、API)的访问。

groupcache 是 Go 语言中最经典的分布式缓存库,由 Google 开发,被用于生产环境。

这是一个使用缓存的示例:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "strconv"
    "sync"
    "time"

    "github.com/golang/groupcache"
)

// 1. 基本的内存缓存实现
type InMemoryCache struct {
    mu    sync.RWMutex
    items map[string]Item
}

type Item struct {
    Value      interface{}
    Expiration int64
}

func NewInMemoryCache() *InMemoryCache {
    return &InMemoryCache{
        items: make(map[string]Item),
    }
}

func (c *InMemoryCache) Set(key string, value interface{}, duration time.Duration) {
    c.mu.Lock()
    defer c.mu.Unlock()

    var expiration int64
    if duration > 0 {
        expiration = time.Now().Add(duration).UnixNano()
    }

    c.items[key] = Item{
        Value:      value,
        Expiration: expiration,
    }
}

func (c *InMemoryCache) Get(key string) (interface{}, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()

    item, found := c.items[key]
    if !found {
        return nil, false
    }

    if item.Expiration > 0 && time.Now().UnixNano() > item.Expiration {
        return nil, false
    }

    return item.Value, true
}

func (c *InMemoryCache) Delete(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.items, key)
}

func (c *InMemoryCache) Cleanup(interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for range ticker.C {
        c.mu.Lock()
        now := time.Now().UnixNano()
        for key, item := range c.items {
            if item.Expiration > 0 && now > item.Expiration {
                delete(c.items, key)
            }
        }
        c.mu.Unlock()
    }
}

// 2. 使用 groupcache 的分布式缓存示例
type User struct {
    ID    int    `json:"id"`
    Name  string `json:"name"`
    Email string `json:"email"`
}

// 模拟数据库
type UserDatabase struct {
    users map[int]*User
    mu    sync.RWMutex
}

func NewUserDatabase() *UserDatabase {
    return &UserDatabase{
        users: map[int]*User{
            1: {ID: 1, Name: "Alice", Email: "alice@example.com"},
            2: {ID: 2, Name: "Bob", Email: "bob@example.com"},
            3: {ID: 3, Name: "Charlie", Email: "charlie@example.com"},
        },
    }
}

func (db *UserDatabase) GetUser(id int) (*User, error) {
    db.mu.RLock()
    defer db.mu.RUnlock()

    user, exists := db.users[id]
    if !exists {
        return nil, fmt.Errorf("user not found: %d", id)
    }

    // 模拟数据库查询延迟
    time.Sleep(100 * time.Millisecond)
    return user, nil
}

// GroupCache 服务
type CacheServer struct {
    group *groupcache.Group
    db    *UserDatabase
}

func NewCacheServer(self string, peers ...string) *CacheServer {
    db := NewUserDatabase()

    // 创建 peer 池
    pool := groupcache.NewHTTPPool(self)
    pool.Set(peers...)

    // 创建缓存组
    userGroup := groupcache.NewGroup("users", 64<<20, groupcache.GetterFunc(
        func(ctx context.Context, key string, dest groupcache.Sink) error {
            log.Printf("缓存未命中,从数据库查询用户: %s", key)

            id, err := strconv.Atoi(key)
            if err != nil {
                return fmt.Errorf("invalid user id: %s", key)
            }

            user, err := db.GetUser(id)
            if err != nil {
                return err
            }

            // 序列化用户数据
            data, err := json.Marshal(user)
            if err != nil {
                return err
            }

            // 设置缓存过期时间(1分钟)
            dest.SetBytes(data, time.Now().Add(time.Minute))
            return nil
        },
    ))

    return &CacheServer{
        group: userGroup,
        db:    db,
    }
}

func (s *CacheServer) GetUserHandler(w http.ResponseWriter, r *http.Request) {
    userID := r.URL.Query().Get("id")
    if userID == "" {
        http.Error(w, "Missing user id", http.StatusBadRequest)
        return
    }

    var data []byte
    ctx := context.Background()

    // 从缓存获取数据
    start := time.Now()
    err := s.group.Get(ctx, userID, groupcache.AllocatingByteSliceSink(&data))
    duration := time.Since(start)

    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    var user User
    if err := json.Unmarshal(data, &user); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    response := map[string]interface{}{
        "user":     user,
        "duration": duration.String(),
        "source":   "cache",
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(response)
}

// 3. 高级缓存策略示例
type CacheWithStats struct {
    cache     *InMemoryCache
    hits      int64
    misses    int64
    mu        sync.RWMutex
}

func NewCacheWithStats() *CacheWithStats {
    c := &CacheWithStats{
        cache: NewInMemoryCache(),
    }

    // 启动清理协程
    go c.cache.Cleanup(time.Minute)

    return c
}

func (c *CacheWithStats) Get(key string) (interface{}, bool) {
    c.mu.Lock()
    defer c.mu.Unlock()

    value, found := c.cache.Get(key)
    if found {
        c.hits++
    } else {
        c.misses++
    }

    return value, found
}

func (c *CacheWithStats) Set(key string, value interface{}, duration time.Duration) {
    c.cache.Set(key, value, duration)
}

func (c *CacheWithStats) Stats() (hits, misses int64, hitRatio float64) {
    c.mu.RLock()
    defer c.mu.RUnlock()

    total := c.hits + c.misses
    if total == 0 {
        return c.hits, c.misses, 0
    }

    return c.hits, c.misses, float64(c.hits) / float64(total) * 100
}

// 4. 缓存使用示例
func demonstrateBasicCache() {
    fmt.Println("=== 基本内存缓存示例 ===")

    cache := NewInMemoryCache()

    // 设置缓存,5秒过期
    cache.Set("greeting", "Hello, World!", 5*time.Second)
    cache.Set("number", 42, 10*time.Second) // 10秒过期
    cache.Set("permanent", "I never expire", 0) // 永不过期

    // 读取缓存
    if value, found := cache.Get("greeting"); found {
        fmt.Printf("获取到 greeting: %v\n", value)
    }

    if value, found := cache.Get("number"); found {
        fmt.Printf("获取到 number: %v\n", value)
    }

    // 等待过期
    fmt.Println("等待6秒让 greeting 过期...")
    time.Sleep(6 * time.Second)

    if value, found := cache.Get("greeting"); found {
        fmt.Printf("greeting 仍然存在: %v\n", value)
    } else {
        fmt.Println("greeting 已过期")
    }

    fmt.Println()
}

func demonstrateCacheWithStats() {
    fmt.Println("=== 带统计信息的缓存示例 ===")

    cache := NewCacheWithStats()

    // 模拟一些缓存操作
    for i := 0; i < 100; i++ {
        key := fmt.Sprintf("key%d", i%20) // 模拟局部性访问
        if _, found := cache.Get(key); !found {
            cache.Set(key, fmt.Sprintf("value%d", i), time.Minute)
        }
    }

    hits, misses, ratio := cache.Stats()
    fmt.Printf("缓存统计: 命中=%d, 未命中=%d, 命中率=%.2f%%\n", hits, misses, ratio)
    fmt.Println()
}

// 5. 实际应用:API 响应缓存
type APICache struct {
    cache *CacheWithStats
}

func NewAPICache() *APICache {
    return &APICache{
        cache: NewCacheWithStats(),
    }
}

func (ac *APICache) GetCachedResponse(key string, generateFunc func() (interface{}, error), expire time.Duration) (interface{}, error) {
    // 尝试从缓存获取
    if cached, found := ac.cache.Get(key); found {
        return cached, nil
    }

    // 缓存未命中,执行生成函数
    result, err := generateFunc()
    if err != nil {
        return nil, err
    }

    // 缓存结果
    ac.cache.Set(key, result, expire)
    return result, nil
}

func simulateExpensiveAPICall() (interface{}, error) {
    // 模拟昂贵的API调用
    time.Sleep(200 * time.Millisecond)
    return map[string]interface{}{
        "data":    "这是昂贵的API调用的结果",
        "time":    time.Now().Unix(),
        "message": "Hello from expensive API!",
    }, nil
}

func demonstrateAPICache() {
    fmt.Println("=== API 响应缓存示例 ===")

    apiCache := NewAPICache()

    // 第一次调用(未命中缓存)
    start := time.Now()
    result1, _ := apiCache.GetCachedResponse("api_data", simulateExpensiveAPICall, time.Minute)
    duration1 := time.Since(start)

    // 第二次调用(命中缓存)
    start = time.Now()
    result2, _ := apiCache.GetCachedResponse("api_data", simulateExpensiveAPICall, time.Minute)
    duration2 := time.Since(start)

    fmt.Printf("第一次调用耗时: %v\n", duration1)
    fmt.Printf("第二次调用耗时: %v\n", duration2)
    fmt.Printf("性能提升: %.2f%%\n", (float64(duration1-duration2)/float64(duration1))*100)
    fmt.Printf("结果相等: %t\n", result1 == result2)

    hits, misses, ratio := apiCache.cache.Stats()
    fmt.Printf("缓存统计: 命中=%d, 未命中=%d, 命中率=%.2f%%\n", hits, misses, ratio)
    fmt.Println()
}

// 6. 启动 GroupCache 服务器
func startGroupCacheServer(addr string, peers []string) {
    server := NewCacheServer(addr, peers...)

    http.HandleFunc("/user", server.GetUserHandler)

    fmt.Printf("启动缓存服务器: %s\n", addr)
    log.Fatal(http.ListenAndServe(addr, nil))
}

func main() {
    // 演示基本缓存
    demonstrateBasicCache()

    // 演示带统计的缓存
    demonstrateCacheWithStats()

    // 演示API缓存
    demonstrateAPICache()

    // 启动分布式缓存服务器(注释掉以避免阻塞,实际使用时取消注释)
    /*
    peers := []string{
        "http://localhost:8081",
        "http://localhost:8082",
        "http://localhost:8083",
    }

    // 在实际应用中,可以启动多个实例
    if len(os.Args) > 1 {
        addr := "http://localhost:" + os.Args[1]
        go startGroupCacheServer(":"+os.Args[1], peers)
    } else {
        go startGroupCacheServer(":8081", peers)
    }

    // 保持程序运行
    select {}
    */

    fmt.Println("=== 缓存最佳实践总结 ===")
    fmt.Println("1. 选择合适的缓存策略(LRU、TTL等)")
    fmt.Println("2. 设置合理的过期时间")
    fmt.Println("3. 监控缓存命中率")
    fmt.Println("4. 考虑缓存穿透、雪崩、击穿问题")
    fmt.Println("5. 分布式缓存要考虑一致性问题")
}

groupcache 是 Go 中最经典的分布式缓存解决方案,特别适合构建高可用的缓存系统。

通关密语:cache