Go Select
Go 的 select 语句是用于处理多个 channel 操作的强大工具,它允许 goroutine 同时等待多个通信操作。select 会阻塞,直到其中一个 case 可以执行,然后执行该 case。
下面是一个示例:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// 1. 基本的 select 使用
func basicSelectExample() {
fmt.Println("=== 基本 select 示例 ===")
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "来自 ch1 的消息"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "来自 ch2 的消息"
}()
// select 会等待第一个可用的 channel
select {
case msg1 := <-ch1:
fmt.Printf("收到: %s\n", msg1)
case msg2 := <-ch2:
fmt.Printf("收到: %s\n", msg2)
}
fmt.Println()
}
// 2. 带超时的 select
func selectWithTimeout() {
fmt.Println("=== 带超时的 select ===")
ch := make(chan string)
go func() {
time.Sleep(3 * time.Second) // 模拟耗时操作
ch <- "操作完成"
}()
select {
case result := <-ch:
fmt.Printf("成功: %s\n", result)
case <-time.After(2 * time.Second): // 2秒超时
fmt.Println("操作超时!")
}
fmt.Println()
}
// 3. 非阻塞的 channel 操作(使用 default)
func nonBlockingSelect() {
fmt.Println("=== 非阻塞 select ===")
ch := make(chan string, 1) // 缓冲大小为 1
// 尝试发送(不会阻塞)
select {
case ch <- "消息":
fmt.Println("消息已发送")
default:
fmt.Println("发送失败,channel 已满")
}
// 尝试接收(不会阻塞)
select {
case msg := <-ch:
fmt.Printf("收到: %s\n", msg)
default:
fmt.Println("没有消息可接收")
}
// 再次尝试接收(此时 channel 为空)
select {
case msg := <-ch:
fmt.Printf("收到: %s\n", msg)
default:
fmt.Println("没有消息可接收")
}
fmt.Println()
}
// 4. 多路复用:同时监听多个 channel
func multiplexingExample() {
fmt.Println("=== 多路复用示例 ===")
worker1 := make(chan string)
worker2 := make(chan string)
done := make(chan bool)
// 启动两个工作协程
go worker("Worker 1", worker1, done)
go worker("Worker 2", worker2, done)
var results []string
activeWorkers := 2
for activeWorkers > 0 {
select {
case result := <-worker1:
results = append(results, result)
fmt.Printf("收到 Worker1 结果: %s\n", result)
case result := <-worker2:
results = append(results, result)
fmt.Printf("收到 Worker2 结果: %s\n", result)
case <-done:
activeWorkers--
fmt.Printf("一个 worker 完成,剩余: %d\n", activeWorkers)
}
}
fmt.Printf("所有结果: %v\n", results)
fmt.Println()
}
func worker(name string, resultChan chan<- string, done chan<- bool) {
defer func() { done <- true }()
// 模拟工作时间
workTime := time.Duration(rand.Intn(3)+1) * time.Second
time.Sleep(workTime)
resultChan <- fmt.Sprintf("%s 工作了 %v", name, workTime)
}
// 5. 使用 select 实现优雅关闭
func gracefulShutdownExample() {
fmt.Println("=== 优雅关闭示例 ===")
jobs := make(chan int, 5)
shutdown := make(chan bool)
done := make(chan bool)
// 启动工作协程
go func() {
for {
select {
case job := <-jobs:
fmt.Printf("处理任务 %d\n", job)
time.Sleep(500 * time.Millisecond)
case <-shutdown:
fmt.Println("收到关闭信号,完成剩余工作...")
// 处理剩余任务
for {
select {
case job := <-jobs:
fmt.Printf("处理剩余任务 %d\n", job)
default:
fmt.Println("所有任务完成,关闭 worker")
done <- true
return
}
}
}
}
}()
// 发送一些任务
for i := 1; i <= 5; i++ {
jobs <- i
}
// 等待一会儿,然后发送关闭信号
time.Sleep(1 * time.Second)
close(shutdown) // 发送关闭信号
// 再发送一些任务(这些应该会被处理)
for i := 6; i <= 8; i++ {
jobs <- i
}
<-done // 等待 worker 完成
fmt.Println("Worker 已关闭")
fmt.Println()
}
// 6. 定时器与 Ticker 结合 select
func timerAndTickerExample() {
fmt.Println("=== 定时器和 Ticker 示例 ===")
ticker := time.NewTicker(500 * time.Millisecond)
timer := time.NewTimer(3 * time.Second)
done := make(chan bool)
go func() {
for {
select {
case <-done:
fmt.Println("协程结束")
return
case t := <-ticker.C:
fmt.Printf("Tick at %v\n", t.Format("15:04:05.000"))
case <-timer.C:
fmt.Println("定时器触发!")
ticker.Stop()
done <- true
return
}
}
}()
<-done
fmt.Println()
}
// 7. 优先级选择
func prioritySelectExample() {
fmt.Println("=== 优先级选择示例 ===")
highPriority := make(chan string, 10)
lowPriority := make(chan string, 10)
done := make(chan bool)
// 填充一些数据
go func() {
for i := 0; i < 3; i++ {
lowPriority <- fmt.Sprintf("低优先级消息 %d", i)
time.Sleep(100 * time.Millisecond)
}
for i := 0; i < 3; i++ {
highPriority <- fmt.Sprintf("高优先级消息 %d", i)
time.Sleep(100 * time.Millisecond)
}
time.Sleep(1 * time.Second)
done <- true
}()
var messages []string
for {
select {
case msg := <-highPriority:
messages = append(messages, msg)
fmt.Printf("处理高优先级: %s\n", msg)
case <-done:
fmt.Printf("所有消息: %v\n", messages)
fmt.Println()
return
default:
// 如果没有高优先级消息,处理低优先级
select {
case msg := <-lowPriority:
messages = append(messages, msg)
fmt.Printf("处理低优先级: %s\n", msg)
default:
// 都没有消息,短暂等待
time.Sleep(50 * time.Millisecond)
}
}
}
}
// 8. 使用 select 实现限制并发
func rateLimitingExample() {
fmt.Println("=== 速率限制示例 ===")
requests := make(chan int, 10)
limiter := time.Tick(200 * time.Millisecond) // 每200ms一个令牌
// 发送请求
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
// 处理请求,但受速率限制
for req := range requests {
<-limiter // 等待令牌
fmt.Printf("处理请求 %d at %v\n", req, time.Now().Format("15:04:05.000"))
}
fmt.Println()
}
// 9. 复杂的多路复用:任务分发和结果收集
func taskDispatcherExample() {
fmt.Println("=== 任务分发器示例 ===")
type Task struct {
ID int
Data string
}
tasks := make(chan Task, 10)
results := make(chan string, 10)
done := make(chan bool)
workers := 3
var wg sync.WaitGroup
// 启动 workers
for i := 0; i < workers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for task := range tasks {
// 模拟处理时间
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
result := fmt.Sprintf("Worker%d 处理了任务%d: %s", workerID, task.ID, task.Data)
results <- result
}
}(i)
}
// 结果收集器
go func() {
wg.Wait()
close(results)
done <- true
}()
// 发送任务
for i := 0; i < 8; i++ {
tasks <- Task{ID: i, Data: fmt.Sprintf("数据%d", i)}
}
close(tasks)
// 收集结果
var allResults []string
collecting := true
for collecting {
select {
case result, ok := <-results:
if ok {
allResults = append(allResults, result)
fmt.Println(result)
} else {
collecting = false
}
case <-time.After(1 * time.Second):
fmt.Println("收集超时")
collecting = false
}
}
<-done
fmt.Printf("共收集到 %d 个结果\n", len(allResults))
fmt.Println()
}
// 10. 高级模式:使用 select 实现超时控制组
func timeoutGroupExample() {
fmt.Println("=== 超时控制组示例 ===")
fastWorker := func() chan string {
ch := make(chan string)
go func() {
time.Sleep(100 * time.Millisecond)
ch <- "快速任务完成"
}()
return ch
}
slowWorker := func() chan string {
ch := make(chan string)
go func() {
time.Sleep(2 * time.Second)
ch <- "慢速任务完成"
}()
return ch
}
// 启动多个任务
tasks := []chan string{
fastWorker(),
slowWorker(),
fastWorker(),
}
timeout := time.After(1 * time.Second)
var completed []string
for i, taskCh := range tasks {
select {
case result := <-taskCh:
completed = append(completed, result)
fmt.Printf("任务 %d 完成: %s\n", i, result)
case <-timeout:
fmt.Printf("超时!已完成 %d/%d 个任务\n", len(completed), len(tasks))
return
}
}
fmt.Println("所有任务完成!")
fmt.Println()
}
func main() {
rand.Seed(time.Now().UnixNano())
basicSelectExample()
selectWithTimeout()
nonBlockingSelect()
multiplexingExample()
gracefulShutdownExample()
timerAndTickerExample()
prioritySelectExample()
rateLimitingExample()
taskDispatcherExample()
timeoutGroupExample()
fmt.Println("=== Select 关键特性总结 ===")
fmt.Println("1. 随机选择: 当多个 case 同时就绪时,随机选择一个")
fmt.Println("2. 阻塞特性: 没有 default 时会阻塞直到有 case 就绪")
fmt.Println("3. 超时控制: 结合 time.After 实现超时")
fmt.Println("4. 非阻塞操作: 使用 default 实现非阻塞")
fmt.Println("5. 多路复用: 同时监听多个 channel")
fmt.Println("6. 优雅关闭: 通过关闭 channel 通知退出")
}
Select 关键特性
- 随机选择
当多个 case 同时就绪时,Go 会随机选择一个执行,这保证了公平性。
- 阻塞行为
没有 default:阻塞直到有 case 就绪
有 default:非阻塞,立即执行 default
- 常见模式
超时控制 :time.After()
非阻塞操作 :使用 default
优雅关闭 :通过关闭 channel 通知
速率限制 :time.Tick()
- 最佳实践
总是处理超时情况
使用 close()来广播关闭信号
避免在 select 中执行耗时操作
合理使用缓冲 channel 防止阻塞
Select 是 Go 并发编程的核心特性,熟练掌握它对于编写高效的并发程序至关重要。
通关密语:select