Go UDP

UDP(User Datagram Protocol)是一种无连接的、不可靠的传输层协议。与 TCP 不同,UDP 不建立连接,不保证数据包的顺序和可靠性,但具有更低的延迟和开销。

下面是一个示例:

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net"
    "os"
    "strconv"
    "strings"
    "sync"
    "time"
)

// 1. 基本 UDP 服务器和客户端
func basicUDPServer(port string) {
    fmt.Printf("启动基本 UDP 服务器 :%s\n", port)

    addr, err := net.ResolveUDPAddr("udp", ":"+port)
    if err != nil {
        log.Fatalf("解析地址失败: %v", err)
    }

    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        log.Fatalf("启动服务器失败: %v", err)
    }
    defer conn.Close()

    fmt.Printf("UDP 服务器监听在 %s\n", conn.LocalAddr().String())

    buffer := make([]byte, 1024)

    for {
        // 读取数据(会阻塞直到收到数据)
        n, clientAddr, err := conn.ReadFromUDP(buffer)
        if err != nil {
            log.Printf("读取数据错误: %v", err)
            continue
        }

        message := string(buffer[:n])
        fmt.Printf("收到来自 %s 的消息: %s\n", clientAddr.String(), message)

        // 回复客户端
        response := fmt.Sprintf("服务器回复: %s", strings.ToUpper(message))
        _, err = conn.WriteToUDP([]byte(response), clientAddr)
        if err != nil {
            log.Printf("发送回复错误: %v", err)
        }
    }
}

func basicUDPClient(serverAddr string) {
    fmt.Printf("连接 UDP 服务器: %s\n", serverAddr)

    // 解析服务器地址
    remoteAddr, err := net.ResolveUDPAddr("udp", serverAddr)
    if err != nil {
        log.Fatalf("解析服务器地址失败: %v", err)
    }

    // 创建客户端连接(UDP 不需要实际连接)
    conn, err := net.DialUDP("udp", nil, remoteAddr)
    if err != nil {
        log.Fatalf("创建客户端失败: %v", err)
    }
    defer conn.Close()

    // 发送消息
    messages := []string{
        "Hello UDP!",
        "这是第二条消息",
        "测试消息",
        "quit",
    }

    for _, message := range messages {
        fmt.Printf("发送消息: %s\n", message)

        // 发送数据
        _, err = conn.Write([]byte(message))
        if err != nil {
            log.Printf("发送失败: %v", err)
            continue
        }

        // 设置读取超时
        conn.SetReadDeadline(time.Now().Add(3 * time.Second))

        // 接收回复
        buffer := make([]byte, 1024)
        n, err := conn.Read(buffer)
        if err != nil {
            if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
                fmt.Println("接收超时(UDP 数据包可能丢失)")
                continue
            }
            log.Printf("接收错误: %v", err)
            continue
        }

        response := string(buffer[:n])
        fmt.Printf("服务器回复: %s\n", response)

        time.Sleep(1 * time.Second)

        if message == "quit" {
            break
        }
    }
}

// 2. 并发 UDP 服务器 with 连接跟踪
type UDPServer struct {
    conn     *net.UDPConn
    clients  map[string]*net.UDPAddr // 跟踪客户端
    clientsMu sync.RWMutex
}

func NewUDPServer(port string) *UDPServer {
    addr, err := net.ResolveUDPAddr("udp", ":"+port)
    if err != nil {
        log.Fatal(err)
    }

    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        log.Fatal(err)
    }

    return &UDPServer{
        conn:    conn,
        clients: make(map[string]*net.UDPAddr),
    }
}

func (s *UDPServer) Start() {
    fmt.Printf("并发 UDP 服务器启动 :%s\n", s.conn.LocalAddr().(*net.UDPAddr).Port)

    buffer := make([]byte, 4096) // 更大的缓冲区

    for {
        n, clientAddr, err := s.conn.ReadFromUDP(buffer)
        if err != nil {
            log.Printf("读取错误: %v", err)
            continue
        }

        // 处理每个客户端请求的 goroutine
        go s.handleClient(clientAddr, buffer[:n])
    }
}

func (s *UDPServer) handleClient(clientAddr *net.UDPAddr, data []byte) {
    clientKey := clientAddr.String()

    // 注册新客户端
    s.clientsMu.Lock()
    if _, exists := s.clients[clientKey]; !exists {
        s.clients[clientKey] = clientAddr
        fmt.Printf("新客户端连接: %s, 总客户端数: %d\n", clientKey, len(s.clients))
    }
    s.clientsMu.Unlock()

    message := string(data)
    fmt.Printf("来自 %s: %s\n", clientKey, message)

    // 处理命令
    if strings.HasPrefix(message, "/") {
        s.handleCommand(clientAddr, message)
    } else {
        s.broadcastMessage(clientAddr, message)
    }
}

func (s *UDPServer) handleCommand(clientAddr *net.UDPAddr, command string) {
    parts := strings.Fields(command)
    if len(parts) == 0 {
        return
    }

    var response string

    switch parts[0] {
    case "/time":
        response = fmt.Sprintf("服务器时间: %s", time.Now().Format("15:04:05.000"))
    case "/clients":
        s.clientsMu.RLock()
        count := len(s.clients)
        response = fmt.Sprintf("在线客户端数: %d", count)
        s.clientsMu.RUnlock()
    case "/stats":
        response = "UDP 服务器状态: 运行中"
    default:
        response = "未知命令。可用命令: /time, /clients, /stats"
    }

    s.sendToClient(clientAddr, response)
}

func (s *UDPServer) broadcastMessage(senderAddr *net.UDPAddr, message string) {
    s.clientsMu.RLock()
    defer s.clientsMu.RUnlock()

    broadcastMsg := fmt.Sprintf("[%s]: %s", senderAddr.String(), message)

    for _, clientAddr := range s.clients {
        if clientAddr.String() != senderAddr.String() {
            go s.sendToClient(clientAddr, broadcastMsg)
        }
    }
}

func (s *UDPServer) sendToClient(clientAddr *net.UDPAddr, message string) {
    _, err := s.conn.WriteToUDP([]byte(message), clientAddr)
    if err != nil {
        log.Printf("发送给 %s 失败: %v", clientAddr.String(), err)

        // 移除失效的客户端
        s.clientsMu.Lock()
        delete(s.clients, clientAddr.String())
        s.clientsMu.Unlock()
    }
}

func (s *UDPServer) Stop() {
    s.conn.Close()
    fmt.Println("UDP 服务器已关闭")
}

// 3. UDP 广播示例
func udpBroadcastSender(broadcastAddr string, port int) {
    addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", broadcastAddr, port))
    if err != nil {
        log.Fatal(err)
    }

    conn, err := net.DialUDP("udp", nil, addr)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // 设置广播权限
    // 注意:在某些系统上可能需要特殊权限

    fmt.Printf("开始广播到 %s:%d\n", broadcastAddr, port)

    for i := 1; i <= 5; i++ {
        message := fmt.Sprintf("广播消息 #%d - 时间: %s", i, time.Now().Format("15:04:05"))

        _, err := conn.Write([]byte(message))
        if err != nil {
            log.Printf("广播发送失败: %v", err)
            continue
        }

        fmt.Printf("发送广播: %s\n", message)
        time.Sleep(2 * time.Second)
    }
}

func udpBroadcastReceiver(port int) {
    addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", port))
    if err != nil {
        log.Fatal(err)
    }

    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    fmt.Printf("监听广播消息端口 :%d\n", port)

    buffer := make([]byte, 1024)

    for {
        n, srcAddr, err := conn.ReadFromUDP(buffer)
        if err != nil {
            log.Printf("读取广播错误: %v", err)
            continue
        }

        message := string(buffer[:n])
        fmt.Printf("收到广播来自 %s: %s\n", srcAddr.String(), message)
    }
}

// 4. 可靠 UDP 协议实现(简单重传机制)
type ReliableUDPClient struct {
    serverAddr *net.UDPAddr
    conn       *net.UDPConn
    sequence   uint32
    mu         sync.Mutex
}

func NewReliableUDPClient(server string) *ReliableUDPClient {
    serverAddr, err := net.ResolveUDPAddr("udp", server)
    if err != nil {
        log.Fatal(err)
    }

    conn, err := net.DialUDP("udp", nil, serverAddr)
    if err != nil {
        log.Fatal(err)
    }

    return &ReliableUDPClient{
        serverAddr: serverAddr,
        conn:       conn,
        sequence:   1,
    }
}

func (c *ReliableUDPClient) SendReliable(message string) error {
    c.mu.Lock()
    seq := c.sequence
    c.sequence++
    c.mu.Unlock()

    // 创建可靠消息格式:序列号|数据
    reliableMsg := fmt.Sprintf("%d|%s", seq, message)

    maxRetries := 3
    timeout := 2 * time.Second

    for attempt := 1; attempt <= maxRetries; attempt++ {
        fmt.Printf("发送消息 (序列号 %d, 尝试 %d): %s\n", seq, attempt, message)

        // 发送消息
        _, err := c.conn.Write([]byte(reliableMsg))
        if err != nil {
            return fmt.Errorf("发送失败: %v", err)
        }

        // 等待确认
        c.conn.SetReadDeadline(time.Now().Add(timeout))

        ackBuffer := make([]byte, 1024)
        n, err := c.conn.Read(ackBuffer)

        if err != nil {
            if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
                fmt.Printf("尝试 %d 超时,重试...\n", attempt)
                continue
            }
            return fmt.Errorf("接收确认失败: %v", err)
        }

        ackMsg := string(ackBuffer[:n])
        if ackMsg == fmt.Sprintf("ACK:%d", seq) {
            fmt.Printf("消息 %d 确认成功\n", seq)
            return nil
        }
    }

    return fmt.Errorf("消息 %d 发送失败,达到最大重试次数", seq)
}

func reliableUDPServer(port string) {
    addr, err := net.ResolveUDPAddr("udp", ":"+port)
    if err != nil {
        log.Fatal(err)
    }

    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    fmt.Printf("可靠 UDP 服务器启动 :%s\n", port)

    receivedSequences := make(map[uint32]bool)
    buffer := make([]byte, 1024)

    for {
        n, clientAddr, err := conn.ReadFromUDP(buffer)
        if err != nil {
            log.Printf("读取错误: %v", err)
            continue
        }

        message := string(buffer[:n])
        parts := strings.SplitN(message, "|", 2)

        if len(parts) != 2 {
            log.Printf("无效消息格式: %s", message)
            continue
        }

        seq, err := strconv.ParseUint(parts[0], 10, 32)
        if err != nil {
            log.Printf("无效序列号: %s", parts[0])
            continue
        }

        seqNum := uint32(seq)
        data := parts[1]

        // 检查重复消息
        if receivedSequences[seqNum] {
            fmt.Printf("收到重复消息,序列号: %d, 数据: %s\n", seqNum, data)
        } else {
            receivedSequences[seqNum] = true
            fmt.Printf("收到新消息,序列号: %d, 数据: %s\n", seqNum, data)
        }

        // 发送确认
        ack := fmt.Sprintf("ACK:%d", seqNum)
        _, err = conn.WriteToUDP([]byte(ack), clientAddr)
        if err != nil {
            log.Printf("发送确认失败: %v", err)
        }

        // 简单清理过期的序列号(生产环境需要更复杂的机制)
        if len(receivedSequences) > 1000 {
            receivedSequences = make(map[uint32]bool)
        }
    }
}

// 5. UDP 多播示例
func udpMulticastSender(multicastAddr string, port int) {
    addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", multicastAddr, port))
    if err != nil {
        log.Fatal(err)
    }

    conn, err := net.DialUDP("udp", nil, addr)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    fmt.Printf("多播发送到 %s:%d\n", multicastAddr, port)

    for i := 1; i <= 10; i++ {
        message := fmt.Sprintf("多播消息 #%d - 时间: %s", i, time.Now().Format("15:04:05.000"))

        _, err := conn.Write([]byte(message))
        if err != nil {
            log.Printf("多播发送失败: %v", err)
            continue
        }

        fmt.Printf("发送多播: %s\n", message)
        time.Sleep(1 * time.Second)
    }
}

func udpMulticastReceiver(multicastAddr string, port int) {
    addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", multicastAddr, port))
    if err != nil {
        log.Fatal(err)
    }

    conn, err := net.ListenMulticastUDP("udp", nil, addr)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    fmt.Printf("加入多播组 %s:%d\n", multicastAddr, port)

    buffer := make([]byte, 1024)

    for {
        n, srcAddr, err := conn.ReadFromUDP(buffer)
        if err != nil {
            log.Printf("读取多播错误: %v", err)
            continue
        }

        message := string(buffer[:n])
        fmt.Printf("收到多播来自 %s: %s\n", srcAddr.String(), message)
    }
}

// 6. 性能测试:UDP vs 简单模拟
func udpPerformanceTest(serverAddr string) {
    fmt.Println("=== UDP 性能测试 ===")

    conn, err := net.Dial("udp", serverAddr)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    messageSize := 100 // 字节
    messageCount := 1000
    message := strings.Repeat("X", messageSize)

    start := time.Now()

    for i := 0; i < messageCount; i++ {
        _, err := conn.Write([]byte(message))
        if err != nil {
            log.Printf("发送错误: %v", err)
        }
    }

    duration := time.Since(start)
    totalData := int64(messageSize * messageCount)

    fmt.Printf("发送 %d 条消息 (%d bytes)\n", messageCount, totalData)
    fmt.Printf("总时间: %v\n", duration)
    fmt.Printf("吞吐量: %.2f msg/秒\n", float64(messageCount)/duration.Seconds())
    fmt.Printf("带宽: %.2f MB/秒\n", float64(totalData)/duration.Seconds()/1024/1024)
}

// 7. JSON 协议化的 UDP 通信
type UDPMessage struct {
    Type      string      `json:"type"`
    Sequence  int         `json:"sequence,omitempty"`
    Timestamp int64       `json:"timestamp"`
    Data      interface{} `json:"data"`
}

func jsonUDPServer(port string) {
    addr, err := net.ResolveUDPAddr("udp", ":"+port)
    if err != nil {
        log.Fatal(err)
    }

    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    fmt.Printf("JSON UDP 服务器启动 :%s\n", port)

    buffer := make([]byte, 4096)
    sequence := 1

    for {
        n, clientAddr, err := conn.ReadFromUDP(buffer)
        if err != nil {
            log.Printf("读取错误: %v", err)
            continue
        }

        var msg UDPMessage
        if err := json.Unmarshal(buffer[:n], &msg); err != nil {
            log.Printf("JSON 解析错误: %v", err)
            continue
        }

        fmt.Printf("收到 JSON 消息: %+v\n", msg)

        // 准备回复
        response := UDPMessage{
            Type:      "response",
            Sequence:  sequence,
            Timestamp: time.Now().UnixMilli(),
            Data:      fmt.Sprintf("已处理: %v", msg.Data),
        }

        sequence++

        responseData, err := json.Marshal(response)
        if err != nil {
            log.Printf("JSON 序列化错误: %v", err)
            continue
        }

        _, err = conn.WriteToUDP(responseData, clientAddr)
        if err != nil {
            log.Printf("发送回复错误: %v", err)
        }
    }
}

func main() {
    if len(os.Args) < 2 {
        fmt.Println("UDP 示例用法:")
        fmt.Println("  基本服务器: go run udp.go server [port]")
        fmt.Println("  基本客户端: go run udp.go client [host:port]")
        fmt.Println("  并发服务器: go run udp.go concurrent [port]")
        fmt.Println("  广播发送: go run udp.go broadcast-send [port]")
        fmt.Println("  广播接收: go run udp.go broadcast-recv [port]")
        fmt.Println("  可靠UDP服务器: go run udp.go reliable-server [port]")
        fmt.Println("  多播发送: go run udp.go multicast-send [port]")
        fmt.Println("  多播接收: go run udp.go multicast-recv [port]")
        fmt.Println("  JSON服务器: go run udp.go json-server [port]")
        return
    }

    mode := os.Args[1]
    port := "8080"
    if len(os.Args) > 2 {
        port = os.Args[2]
    }

    switch mode {
    case "server":
        basicUDPServer(port)

    case "client":
        addr := "localhost:" + port
        if len(os.Args) > 3 {
            addr = os.Args[2] + ":" + os.Args[3]
        }
        basicUDPClient(addr)

    case "concurrent":
        server := NewUDPServer(port)
        server.Start()

    case "broadcast-send":
        udpBroadcastSender("255.255.255.255", 9999)

    case "broadcast-recv":
        udpBroadcastReceiver(9999)

    case "reliable-server":
        reliableUDPServer(port)

    case "multicast-send":
        udpMulticastSender("224.0.0.1", 8888)

    case "multicast-recv":
        udpMulticastReceiver("224.0.0.1", 8888)

    case "json-server":
        jsonUDPServer(port)

    default:
        fmt.Println("未知模式")
    }
}

UDP 编程关键要点

  1. ​ 核心差异(vs TCP)​​

​ 无连接 ​:不需要 Accept(),直接读写

​ 数据报 ​:基于消息而非流

​ 地址跟踪 ​:需要手动管理客户端地址

  1. ​ 重要函数 ​

net.ResolveUDPAddr()- 解析 UDP 地址

net.ListenUDP()- 创建 UDP 服务器

net.DialUDP()- 创建 UDP 客户端

ReadFromUDP()/WriteToUDP()- 读写数据

  1. ​UDP 适用场景 ​

​ 实时应用 ​:音视频流、游戏

​ 广播/多播 ​:服务发现、通知

​DNS 查询 ​:简单的请求-响应

​ 监控数据 ​:允许少量数据丢失

  1. ​ 最佳实践 ​

​ 处理数据包丢失 ​:实现重传机制

​ 数据包大小 ​:注意 MTU(通常 1500 字节)

​ 错误处理 ​:UDP 更易出现网络错误

​ 超时控制 ​:设置合理的读写超时

​ 消息边界 ​:保持消息的完整性

UDP 在需要低延迟和低开销的场景中非常有用,但需要应用层处理可靠性和顺序问题。

通关密语:udp