Go UDP
UDP(User Datagram Protocol)是一种无连接的、不可靠的传输层协议。与 TCP 不同,UDP 不建立连接,不保证数据包的顺序和可靠性,但具有更低的延迟和开销。
下面是一个示例:
package main
import (
"encoding/json"
"fmt"
"log"
"net"
"os"
"strconv"
"strings"
"sync"
"time"
)
// 1. 基本 UDP 服务器和客户端
func basicUDPServer(port string) {
fmt.Printf("启动基本 UDP 服务器 :%s\n", port)
addr, err := net.ResolveUDPAddr("udp", ":"+port)
if err != nil {
log.Fatalf("解析地址失败: %v", err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatalf("启动服务器失败: %v", err)
}
defer conn.Close()
fmt.Printf("UDP 服务器监听在 %s\n", conn.LocalAddr().String())
buffer := make([]byte, 1024)
for {
// 读取数据(会阻塞直到收到数据)
n, clientAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
log.Printf("读取数据错误: %v", err)
continue
}
message := string(buffer[:n])
fmt.Printf("收到来自 %s 的消息: %s\n", clientAddr.String(), message)
// 回复客户端
response := fmt.Sprintf("服务器回复: %s", strings.ToUpper(message))
_, err = conn.WriteToUDP([]byte(response), clientAddr)
if err != nil {
log.Printf("发送回复错误: %v", err)
}
}
}
func basicUDPClient(serverAddr string) {
fmt.Printf("连接 UDP 服务器: %s\n", serverAddr)
// 解析服务器地址
remoteAddr, err := net.ResolveUDPAddr("udp", serverAddr)
if err != nil {
log.Fatalf("解析服务器地址失败: %v", err)
}
// 创建客户端连接(UDP 不需要实际连接)
conn, err := net.DialUDP("udp", nil, remoteAddr)
if err != nil {
log.Fatalf("创建客户端失败: %v", err)
}
defer conn.Close()
// 发送消息
messages := []string{
"Hello UDP!",
"这是第二条消息",
"测试消息",
"quit",
}
for _, message := range messages {
fmt.Printf("发送消息: %s\n", message)
// 发送数据
_, err = conn.Write([]byte(message))
if err != nil {
log.Printf("发送失败: %v", err)
continue
}
// 设置读取超时
conn.SetReadDeadline(time.Now().Add(3 * time.Second))
// 接收回复
buffer := make([]byte, 1024)
n, err := conn.Read(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
fmt.Println("接收超时(UDP 数据包可能丢失)")
continue
}
log.Printf("接收错误: %v", err)
continue
}
response := string(buffer[:n])
fmt.Printf("服务器回复: %s\n", response)
time.Sleep(1 * time.Second)
if message == "quit" {
break
}
}
}
// 2. 并发 UDP 服务器 with 连接跟踪
type UDPServer struct {
conn *net.UDPConn
clients map[string]*net.UDPAddr // 跟踪客户端
clientsMu sync.RWMutex
}
func NewUDPServer(port string) *UDPServer {
addr, err := net.ResolveUDPAddr("udp", ":"+port)
if err != nil {
log.Fatal(err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatal(err)
}
return &UDPServer{
conn: conn,
clients: make(map[string]*net.UDPAddr),
}
}
func (s *UDPServer) Start() {
fmt.Printf("并发 UDP 服务器启动 :%s\n", s.conn.LocalAddr().(*net.UDPAddr).Port)
buffer := make([]byte, 4096) // 更大的缓冲区
for {
n, clientAddr, err := s.conn.ReadFromUDP(buffer)
if err != nil {
log.Printf("读取错误: %v", err)
continue
}
// 处理每个客户端请求的 goroutine
go s.handleClient(clientAddr, buffer[:n])
}
}
func (s *UDPServer) handleClient(clientAddr *net.UDPAddr, data []byte) {
clientKey := clientAddr.String()
// 注册新客户端
s.clientsMu.Lock()
if _, exists := s.clients[clientKey]; !exists {
s.clients[clientKey] = clientAddr
fmt.Printf("新客户端连接: %s, 总客户端数: %d\n", clientKey, len(s.clients))
}
s.clientsMu.Unlock()
message := string(data)
fmt.Printf("来自 %s: %s\n", clientKey, message)
// 处理命令
if strings.HasPrefix(message, "/") {
s.handleCommand(clientAddr, message)
} else {
s.broadcastMessage(clientAddr, message)
}
}
func (s *UDPServer) handleCommand(clientAddr *net.UDPAddr, command string) {
parts := strings.Fields(command)
if len(parts) == 0 {
return
}
var response string
switch parts[0] {
case "/time":
response = fmt.Sprintf("服务器时间: %s", time.Now().Format("15:04:05.000"))
case "/clients":
s.clientsMu.RLock()
count := len(s.clients)
response = fmt.Sprintf("在线客户端数: %d", count)
s.clientsMu.RUnlock()
case "/stats":
response = "UDP 服务器状态: 运行中"
default:
response = "未知命令。可用命令: /time, /clients, /stats"
}
s.sendToClient(clientAddr, response)
}
func (s *UDPServer) broadcastMessage(senderAddr *net.UDPAddr, message string) {
s.clientsMu.RLock()
defer s.clientsMu.RUnlock()
broadcastMsg := fmt.Sprintf("[%s]: %s", senderAddr.String(), message)
for _, clientAddr := range s.clients {
if clientAddr.String() != senderAddr.String() {
go s.sendToClient(clientAddr, broadcastMsg)
}
}
}
func (s *UDPServer) sendToClient(clientAddr *net.UDPAddr, message string) {
_, err := s.conn.WriteToUDP([]byte(message), clientAddr)
if err != nil {
log.Printf("发送给 %s 失败: %v", clientAddr.String(), err)
// 移除失效的客户端
s.clientsMu.Lock()
delete(s.clients, clientAddr.String())
s.clientsMu.Unlock()
}
}
func (s *UDPServer) Stop() {
s.conn.Close()
fmt.Println("UDP 服务器已关闭")
}
// 3. UDP 广播示例
func udpBroadcastSender(broadcastAddr string, port int) {
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", broadcastAddr, port))
if err != nil {
log.Fatal(err)
}
conn, err := net.DialUDP("udp", nil, addr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
// 设置广播权限
// 注意:在某些系统上可能需要特殊权限
fmt.Printf("开始广播到 %s:%d\n", broadcastAddr, port)
for i := 1; i <= 5; i++ {
message := fmt.Sprintf("广播消息 #%d - 时间: %s", i, time.Now().Format("15:04:05"))
_, err := conn.Write([]byte(message))
if err != nil {
log.Printf("广播发送失败: %v", err)
continue
}
fmt.Printf("发送广播: %s\n", message)
time.Sleep(2 * time.Second)
}
}
func udpBroadcastReceiver(port int) {
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", port))
if err != nil {
log.Fatal(err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
fmt.Printf("监听广播消息端口 :%d\n", port)
buffer := make([]byte, 1024)
for {
n, srcAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
log.Printf("读取广播错误: %v", err)
continue
}
message := string(buffer[:n])
fmt.Printf("收到广播来自 %s: %s\n", srcAddr.String(), message)
}
}
// 4. 可靠 UDP 协议实现(简单重传机制)
type ReliableUDPClient struct {
serverAddr *net.UDPAddr
conn *net.UDPConn
sequence uint32
mu sync.Mutex
}
func NewReliableUDPClient(server string) *ReliableUDPClient {
serverAddr, err := net.ResolveUDPAddr("udp", server)
if err != nil {
log.Fatal(err)
}
conn, err := net.DialUDP("udp", nil, serverAddr)
if err != nil {
log.Fatal(err)
}
return &ReliableUDPClient{
serverAddr: serverAddr,
conn: conn,
sequence: 1,
}
}
func (c *ReliableUDPClient) SendReliable(message string) error {
c.mu.Lock()
seq := c.sequence
c.sequence++
c.mu.Unlock()
// 创建可靠消息格式:序列号|数据
reliableMsg := fmt.Sprintf("%d|%s", seq, message)
maxRetries := 3
timeout := 2 * time.Second
for attempt := 1; attempt <= maxRetries; attempt++ {
fmt.Printf("发送消息 (序列号 %d, 尝试 %d): %s\n", seq, attempt, message)
// 发送消息
_, err := c.conn.Write([]byte(reliableMsg))
if err != nil {
return fmt.Errorf("发送失败: %v", err)
}
// 等待确认
c.conn.SetReadDeadline(time.Now().Add(timeout))
ackBuffer := make([]byte, 1024)
n, err := c.conn.Read(ackBuffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
fmt.Printf("尝试 %d 超时,重试...\n", attempt)
continue
}
return fmt.Errorf("接收确认失败: %v", err)
}
ackMsg := string(ackBuffer[:n])
if ackMsg == fmt.Sprintf("ACK:%d", seq) {
fmt.Printf("消息 %d 确认成功\n", seq)
return nil
}
}
return fmt.Errorf("消息 %d 发送失败,达到最大重试次数", seq)
}
func reliableUDPServer(port string) {
addr, err := net.ResolveUDPAddr("udp", ":"+port)
if err != nil {
log.Fatal(err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
fmt.Printf("可靠 UDP 服务器启动 :%s\n", port)
receivedSequences := make(map[uint32]bool)
buffer := make([]byte, 1024)
for {
n, clientAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
log.Printf("读取错误: %v", err)
continue
}
message := string(buffer[:n])
parts := strings.SplitN(message, "|", 2)
if len(parts) != 2 {
log.Printf("无效消息格式: %s", message)
continue
}
seq, err := strconv.ParseUint(parts[0], 10, 32)
if err != nil {
log.Printf("无效序列号: %s", parts[0])
continue
}
seqNum := uint32(seq)
data := parts[1]
// 检查重复消息
if receivedSequences[seqNum] {
fmt.Printf("收到重复消息,序列号: %d, 数据: %s\n", seqNum, data)
} else {
receivedSequences[seqNum] = true
fmt.Printf("收到新消息,序列号: %d, 数据: %s\n", seqNum, data)
}
// 发送确认
ack := fmt.Sprintf("ACK:%d", seqNum)
_, err = conn.WriteToUDP([]byte(ack), clientAddr)
if err != nil {
log.Printf("发送确认失败: %v", err)
}
// 简单清理过期的序列号(生产环境需要更复杂的机制)
if len(receivedSequences) > 1000 {
receivedSequences = make(map[uint32]bool)
}
}
}
// 5. UDP 多播示例
func udpMulticastSender(multicastAddr string, port int) {
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", multicastAddr, port))
if err != nil {
log.Fatal(err)
}
conn, err := net.DialUDP("udp", nil, addr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
fmt.Printf("多播发送到 %s:%d\n", multicastAddr, port)
for i := 1; i <= 10; i++ {
message := fmt.Sprintf("多播消息 #%d - 时间: %s", i, time.Now().Format("15:04:05.000"))
_, err := conn.Write([]byte(message))
if err != nil {
log.Printf("多播发送失败: %v", err)
continue
}
fmt.Printf("发送多播: %s\n", message)
time.Sleep(1 * time.Second)
}
}
func udpMulticastReceiver(multicastAddr string, port int) {
addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", multicastAddr, port))
if err != nil {
log.Fatal(err)
}
conn, err := net.ListenMulticastUDP("udp", nil, addr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
fmt.Printf("加入多播组 %s:%d\n", multicastAddr, port)
buffer := make([]byte, 1024)
for {
n, srcAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
log.Printf("读取多播错误: %v", err)
continue
}
message := string(buffer[:n])
fmt.Printf("收到多播来自 %s: %s\n", srcAddr.String(), message)
}
}
// 6. 性能测试:UDP vs 简单模拟
func udpPerformanceTest(serverAddr string) {
fmt.Println("=== UDP 性能测试 ===")
conn, err := net.Dial("udp", serverAddr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
messageSize := 100 // 字节
messageCount := 1000
message := strings.Repeat("X", messageSize)
start := time.Now()
for i := 0; i < messageCount; i++ {
_, err := conn.Write([]byte(message))
if err != nil {
log.Printf("发送错误: %v", err)
}
}
duration := time.Since(start)
totalData := int64(messageSize * messageCount)
fmt.Printf("发送 %d 条消息 (%d bytes)\n", messageCount, totalData)
fmt.Printf("总时间: %v\n", duration)
fmt.Printf("吞吐量: %.2f msg/秒\n", float64(messageCount)/duration.Seconds())
fmt.Printf("带宽: %.2f MB/秒\n", float64(totalData)/duration.Seconds()/1024/1024)
}
// 7. JSON 协议化的 UDP 通信
type UDPMessage struct {
Type string `json:"type"`
Sequence int `json:"sequence,omitempty"`
Timestamp int64 `json:"timestamp"`
Data interface{} `json:"data"`
}
func jsonUDPServer(port string) {
addr, err := net.ResolveUDPAddr("udp", ":"+port)
if err != nil {
log.Fatal(err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
fmt.Printf("JSON UDP 服务器启动 :%s\n", port)
buffer := make([]byte, 4096)
sequence := 1
for {
n, clientAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
log.Printf("读取错误: %v", err)
continue
}
var msg UDPMessage
if err := json.Unmarshal(buffer[:n], &msg); err != nil {
log.Printf("JSON 解析错误: %v", err)
continue
}
fmt.Printf("收到 JSON 消息: %+v\n", msg)
// 准备回复
response := UDPMessage{
Type: "response",
Sequence: sequence,
Timestamp: time.Now().UnixMilli(),
Data: fmt.Sprintf("已处理: %v", msg.Data),
}
sequence++
responseData, err := json.Marshal(response)
if err != nil {
log.Printf("JSON 序列化错误: %v", err)
continue
}
_, err = conn.WriteToUDP(responseData, clientAddr)
if err != nil {
log.Printf("发送回复错误: %v", err)
}
}
}
func main() {
if len(os.Args) < 2 {
fmt.Println("UDP 示例用法:")
fmt.Println(" 基本服务器: go run udp.go server [port]")
fmt.Println(" 基本客户端: go run udp.go client [host:port]")
fmt.Println(" 并发服务器: go run udp.go concurrent [port]")
fmt.Println(" 广播发送: go run udp.go broadcast-send [port]")
fmt.Println(" 广播接收: go run udp.go broadcast-recv [port]")
fmt.Println(" 可靠UDP服务器: go run udp.go reliable-server [port]")
fmt.Println(" 多播发送: go run udp.go multicast-send [port]")
fmt.Println(" 多播接收: go run udp.go multicast-recv [port]")
fmt.Println(" JSON服务器: go run udp.go json-server [port]")
return
}
mode := os.Args[1]
port := "8080"
if len(os.Args) > 2 {
port = os.Args[2]
}
switch mode {
case "server":
basicUDPServer(port)
case "client":
addr := "localhost:" + port
if len(os.Args) > 3 {
addr = os.Args[2] + ":" + os.Args[3]
}
basicUDPClient(addr)
case "concurrent":
server := NewUDPServer(port)
server.Start()
case "broadcast-send":
udpBroadcastSender("255.255.255.255", 9999)
case "broadcast-recv":
udpBroadcastReceiver(9999)
case "reliable-server":
reliableUDPServer(port)
case "multicast-send":
udpMulticastSender("224.0.0.1", 8888)
case "multicast-recv":
udpMulticastReceiver("224.0.0.1", 8888)
case "json-server":
jsonUDPServer(port)
default:
fmt.Println("未知模式")
}
}
UDP 编程关键要点
- 核心差异(vs TCP)
无连接 :不需要 Accept(),直接读写
数据报 :基于消息而非流
地址跟踪 :需要手动管理客户端地址
- 重要函数
net.ResolveUDPAddr()- 解析 UDP 地址
net.ListenUDP()- 创建 UDP 服务器
net.DialUDP()- 创建 UDP 客户端
ReadFromUDP()/WriteToUDP()- 读写数据
- UDP 适用场景
实时应用 :音视频流、游戏
广播/多播 :服务发现、通知
DNS 查询 :简单的请求-响应
监控数据 :允许少量数据丢失
- 最佳实践
处理数据包丢失 :实现重传机制
数据包大小 :注意 MTU(通常 1500 字节)
错误处理 :UDP 更易出现网络错误
超时控制 :设置合理的读写超时
消息边界 :保持消息的完整性
UDP 在需要低延迟和低开销的场景中非常有用,但需要应用层处理可靠性和顺序问题。
通关密语:udp