Go Select

Go 的 select 语句是用于处理多个 channel 操作的强大工具,它允许 goroutine 同时等待多个通信操作。select 会阻塞,直到其中一个 case 可以执行,然后执行该 case。

下面是一个示例:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// 1. 基本的 select 使用
func basicSelectExample() {
    fmt.Println("=== 基本 select 示例 ===")

    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "来自 ch1 的消息"
    }()

    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "来自 ch2 的消息"
    }()

    // select 会等待第一个可用的 channel
    select {
    case msg1 := <-ch1:
        fmt.Printf("收到: %s\n", msg1)
    case msg2 := <-ch2:
        fmt.Printf("收到: %s\n", msg2)
    }
    fmt.Println()
}

// 2. 带超时的 select
func selectWithTimeout() {
    fmt.Println("=== 带超时的 select ===")

    ch := make(chan string)

    go func() {
        time.Sleep(3 * time.Second) // 模拟耗时操作
        ch <- "操作完成"
    }()

    select {
    case result := <-ch:
        fmt.Printf("成功: %s\n", result)
    case <-time.After(2 * time.Second): // 2秒超时
        fmt.Println("操作超时!")
    }
    fmt.Println()
}

// 3. 非阻塞的 channel 操作(使用 default)
func nonBlockingSelect() {
    fmt.Println("=== 非阻塞 select ===")

    ch := make(chan string, 1) // 缓冲大小为 1

    // 尝试发送(不会阻塞)
    select {
    case ch <- "消息":
        fmt.Println("消息已发送")
    default:
        fmt.Println("发送失败,channel 已满")
    }

    // 尝试接收(不会阻塞)
    select {
    case msg := <-ch:
        fmt.Printf("收到: %s\n", msg)
    default:
        fmt.Println("没有消息可接收")
    }

    // 再次尝试接收(此时 channel 为空)
    select {
    case msg := <-ch:
        fmt.Printf("收到: %s\n", msg)
    default:
        fmt.Println("没有消息可接收")
    }
    fmt.Println()
}

// 4. 多路复用:同时监听多个 channel
func multiplexingExample() {
    fmt.Println("=== 多路复用示例 ===")

    worker1 := make(chan string)
    worker2 := make(chan string)
    done := make(chan bool)

    // 启动两个工作协程
    go worker("Worker 1", worker1, done)
    go worker("Worker 2", worker2, done)

    var results []string
    activeWorkers := 2

    for activeWorkers > 0 {
        select {
        case result := <-worker1:
            results = append(results, result)
            fmt.Printf("收到 Worker1 结果: %s\n", result)
        case result := <-worker2:
            results = append(results, result)
            fmt.Printf("收到 Worker2 结果: %s\n", result)
        case <-done:
            activeWorkers--
            fmt.Printf("一个 worker 完成,剩余: %d\n", activeWorkers)
        }
    }

    fmt.Printf("所有结果: %v\n", results)
    fmt.Println()
}

func worker(name string, resultChan chan<- string, done chan<- bool) {
    defer func() { done <- true }()

    // 模拟工作时间
    workTime := time.Duration(rand.Intn(3)+1) * time.Second
    time.Sleep(workTime)

    resultChan <- fmt.Sprintf("%s 工作了 %v", name, workTime)
}

// 5. 使用 select 实现优雅关闭
func gracefulShutdownExample() {
    fmt.Println("=== 优雅关闭示例 ===")

    jobs := make(chan int, 5)
    shutdown := make(chan bool)
    done := make(chan bool)

    // 启动工作协程
    go func() {
        for {
            select {
            case job := <-jobs:
                fmt.Printf("处理任务 %d\n", job)
                time.Sleep(500 * time.Millisecond)
            case <-shutdown:
                fmt.Println("收到关闭信号,完成剩余工作...")
                // 处理剩余任务
                for {
                    select {
                    case job := <-jobs:
                        fmt.Printf("处理剩余任务 %d\n", job)
                    default:
                        fmt.Println("所有任务完成,关闭 worker")
                        done <- true
                        return
                    }
                }
            }
        }
    }()

    // 发送一些任务
    for i := 1; i <= 5; i++ {
        jobs <- i
    }

    // 等待一会儿,然后发送关闭信号
    time.Sleep(1 * time.Second)
    close(shutdown) // 发送关闭信号

    // 再发送一些任务(这些应该会被处理)
    for i := 6; i <= 8; i++ {
        jobs <- i
    }

    <-done // 等待 worker 完成
    fmt.Println("Worker 已关闭")
    fmt.Println()
}

// 6. 定时器与 Ticker 结合 select
func timerAndTickerExample() {
    fmt.Println("=== 定时器和 Ticker 示例 ===")

    ticker := time.NewTicker(500 * time.Millisecond)
    timer := time.NewTimer(3 * time.Second)
    done := make(chan bool)

    go func() {
        for {
            select {
            case <-done:
                fmt.Println("协程结束")
                return
            case t := <-ticker.C:
                fmt.Printf("Tick at %v\n", t.Format("15:04:05.000"))
            case <-timer.C:
                fmt.Println("定时器触发!")
                ticker.Stop()
                done <- true
                return
            }
        }
    }()

    <-done
    fmt.Println()
}

// 7. 优先级选择
func prioritySelectExample() {
    fmt.Println("=== 优先级选择示例 ===")

    highPriority := make(chan string, 10)
    lowPriority := make(chan string, 10)
    done := make(chan bool)

    // 填充一些数据
    go func() {
        for i := 0; i < 3; i++ {
            lowPriority <- fmt.Sprintf("低优先级消息 %d", i)
            time.Sleep(100 * time.Millisecond)
        }
        for i := 0; i < 3; i++ {
            highPriority <- fmt.Sprintf("高优先级消息 %d", i)
            time.Sleep(100 * time.Millisecond)
        }
        time.Sleep(1 * time.Second)
        done <- true
    }()

    var messages []string

    for {
        select {
        case msg := <-highPriority:
            messages = append(messages, msg)
            fmt.Printf("处理高优先级: %s\n", msg)
        case <-done:
            fmt.Printf("所有消息: %v\n", messages)
            fmt.Println()
            return
        default:
            // 如果没有高优先级消息,处理低优先级
            select {
            case msg := <-lowPriority:
                messages = append(messages, msg)
                fmt.Printf("处理低优先级: %s\n", msg)
            default:
                // 都没有消息,短暂等待
                time.Sleep(50 * time.Millisecond)
            }
        }
    }
}

// 8. 使用 select 实现限制并发
func rateLimitingExample() {
    fmt.Println("=== 速率限制示例 ===")

    requests := make(chan int, 10)
    limiter := time.Tick(200 * time.Millisecond) // 每200ms一个令牌

    // 发送请求
    for i := 1; i <= 5; i++ {
        requests <- i
    }
    close(requests)

    // 处理请求,但受速率限制
    for req := range requests {
        <-limiter // 等待令牌
        fmt.Printf("处理请求 %d at %v\n", req, time.Now().Format("15:04:05.000"))
    }
    fmt.Println()
}

// 9. 复杂的多路复用:任务分发和结果收集
func taskDispatcherExample() {
    fmt.Println("=== 任务分发器示例 ===")

    type Task struct {
        ID   int
        Data string
    }

    tasks := make(chan Task, 10)
    results := make(chan string, 10)
    done := make(chan bool)
    workers := 3
    var wg sync.WaitGroup

    // 启动 workers
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for task := range tasks {
                // 模拟处理时间
                time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
                result := fmt.Sprintf("Worker%d 处理了任务%d: %s", workerID, task.ID, task.Data)
                results <- result
            }
        }(i)
    }

    // 结果收集器
    go func() {
        wg.Wait()
        close(results)
        done <- true
    }()

    // 发送任务
    for i := 0; i < 8; i++ {
        tasks <- Task{ID: i, Data: fmt.Sprintf("数据%d", i)}
    }
    close(tasks)

    // 收集结果
    var allResults []string
    collecting := true

    for collecting {
        select {
        case result, ok := <-results:
            if ok {
                allResults = append(allResults, result)
                fmt.Println(result)
            } else {
                collecting = false
            }
        case <-time.After(1 * time.Second):
            fmt.Println("收集超时")
            collecting = false
        }
    }

    <-done
    fmt.Printf("共收集到 %d 个结果\n", len(allResults))
    fmt.Println()
}

// 10. 高级模式:使用 select 实现超时控制组
func timeoutGroupExample() {
    fmt.Println("=== 超时控制组示例 ===")

    fastWorker := func() chan string {
        ch := make(chan string)
        go func() {
            time.Sleep(100 * time.Millisecond)
            ch <- "快速任务完成"
        }()
        return ch
    }

    slowWorker := func() chan string {
        ch := make(chan string)
        go func() {
            time.Sleep(2 * time.Second)
            ch <- "慢速任务完成"
        }()
        return ch
    }

    // 启动多个任务
    tasks := []chan string{
        fastWorker(),
        slowWorker(),
        fastWorker(),
    }

    timeout := time.After(1 * time.Second)
    var completed []string

    for i, taskCh := range tasks {
        select {
        case result := <-taskCh:
            completed = append(completed, result)
            fmt.Printf("任务 %d 完成: %s\n", i, result)
        case <-timeout:
            fmt.Printf("超时!已完成 %d/%d 个任务\n", len(completed), len(tasks))
            return
        }
    }

    fmt.Println("所有任务完成!")
    fmt.Println()
}

func main() {
    rand.Seed(time.Now().UnixNano())

    basicSelectExample()
    selectWithTimeout()
    nonBlockingSelect()
    multiplexingExample()
    gracefulShutdownExample()
    timerAndTickerExample()
    prioritySelectExample()
    rateLimitingExample()
    taskDispatcherExample()
    timeoutGroupExample()

    fmt.Println("=== Select 关键特性总结 ===")
    fmt.Println("1. 随机选择: 当多个 case 同时就绪时,随机选择一个")
    fmt.Println("2. 阻塞特性: 没有 default 时会阻塞直到有 case 就绪")
    fmt.Println("3. 超时控制: 结合 time.After 实现超时")
    fmt.Println("4. 非阻塞操作: 使用 default 实现非阻塞")
    fmt.Println("5. 多路复用: 同时监听多个 channel")
    fmt.Println("6. 优雅关闭: 通过关闭 channel 通知退出")
}

Select 关键特性

  1. ​ 随机选择

当多个 case 同时就绪时,Go 会随机选择一个执行,这保证了公平性。

  1. ​ 阻塞行为 ​

没有 default:阻塞直到有 case 就绪

有 default:非阻塞,立即执行 default

  1. ​ 常见模式 ​

​ 超时控制 ​:time.After()

​ 非阻塞操作 ​:使用 default

​ 优雅关闭 ​:通过关闭 channel 通知

​ 速率限制 ​:time.Tick()

  1. ​ 最佳实践 ​

总是处理超时情况

使用 close()来广播关闭信号

避免在 select 中执行耗时操作

合理使用缓冲 channel 防止阻塞

Select 是 Go 并发编程的核心特性,熟练掌握它对于编写高效的并发程序至关重要。

通关密语:select