Go RPC

RPC(Remote Procedure Call)是一种计算机通信协议,允许程序调用另一个地址空间(通常是另一台机器上)的过程或函数,就像调用本地方法一样。Go 语言在标准库中提供了完整的 RPC 支持。

下面是一个示例:

package main

import (
    "fmt"
    "log"
    "net"
    "net/rpc"
    "net/rpc/jsonrpc"
    "os"
    "sync"
    "time"
)

// 1. 基本 RPC 服务示例

// 算术服务
type Arith int

type Args struct {
    A, B int
}

type Quotient struct {
    Quo, Rem int
}

// 乘法运算
func (t *Arith) Multiply(args *Args, reply *int) error {
    *reply = args.A * args.B
    fmt.Printf("乘法计算: %d * %d = %d\n", args.A, args.B, *reply)
    return nil
}

// 除法运算
func (t *Arith) Divide(args *Args, quo *Quotient) error {
    if args.B == 0 {
        return fmt.Errorf("除数不能为零")
    }
    quo.Quo = args.A / args.B
    quo.Rem = args.A % args.B
    fmt.Printf("除法计算: %d / %d = %d 余 %d\n", args.A, args.B, quo.Quo, quo.Rem)
    return nil
}

// 2. 用户管理服务
type User struct {
    ID       int    `json:"id"`
    Name     string `json:"name"`
    Email    string `json:"email"`
    CreateAt int64  `json:"create_at"`
}

type UserService struct {
    users    map[int]*User
    mu       sync.RWMutex
    nextID   int
}

func NewUserService() *UserService {
    return &UserService{
        users:  make(map[int]*User),
        nextID: 1,
    }
}

type UserArgs struct {
    ID    int    `json:"id"`
    Name  string `json:"name"`
    Email string `json:"email"`
}

type UserReply struct {
    User  *User  `json:"user"`
    Error string `json:"error,omitempty"`
}

type UsersReply struct {
    Users []*User `json:"users"`
    Count int     `json:"count"`
}

// 创建用户
func (u *UserService) CreateUser(args *UserArgs, reply *UserReply) error {
    u.mu.Lock()
    defer u.mu.Unlock()

    user := &User{
        ID:       u.nextID,
        Name:     args.Name,
        Email:    args.Email,
        CreateAt: time.Now().Unix(),
    }

    u.users[user.ID] = user
    u.nextID++

    reply.User = user
    fmt.Printf("创建用户: ID=%d, Name=%s, Email=%s\n", user.ID, user.Name, user.Email)
    return nil
}

// 获取用户
func (u *UserService) GetUser(args *UserArgs, reply *UserReply) error {
    u.mu.RLock()
    defer u.mu.RUnlock()

    user, exists := u.users[args.ID]
    if !exists {
        reply.Error = "用户不存在"
        return fmt.Errorf("用户不存在: %d", args.ID)
    }

    reply.User = user
    fmt.Printf("获取用户: ID=%d, Name=%s\n", user.ID, user.Name)
    return nil
}

// 获取所有用户
func (u *UserService) GetAllUsers(args *struct{}, reply *UsersReply) error {
    u.mu.RLock()
    defer u.mu.RUnlock()

    users := make([]*User, 0, len(u.users))
    for _, user := range u.users {
        users = append(users, user)
    }

    reply.Users = users
    reply.Count = len(users)
    fmt.Printf("获取所有用户,总数: %d\n", len(users))
    return nil
}

// 更新用户
func (u *UserService) UpdateUser(args *UserArgs, reply *UserReply) error {
    u.mu.Lock()
    defer u.mu.Unlock()

    user, exists := u.users[args.ID]
    if !exists {
        reply.Error = "用户不存在"
        return fmt.Errorf("用户不存在: %d", args.ID)
    }

    if args.Name != "" {
        user.Name = args.Name
    }
    if args.Email != "" {
        user.Email = args.Email
    }

    reply.User = user
    fmt.Printf("更新用户: ID=%d, Name=%s, Email=%s\n", user.ID, user.Name, user.Email)
    return nil
}

// 3. 计算器服务(支持多种运算)
type Calculator struct{}

type CalcArgs struct {
    A, B float64
    Op   string // 操作符: +, -, *, /, pow
}

type CalcReply struct {
    Result float64 `json:"result"`
    Error  string  `json:"error,omitempty"`
}

func (c *Calculator) Calculate(args *CalcArgs, reply *CalcReply) error {
    switch args.Op {
    case "+":
        reply.Result = args.A + args.B
    case "-":
        reply.Result = args.A - args.B
    case "*":
        reply.Result = args.A * args.B
    case "/":
        if args.B == 0 {
            reply.Error = "除数不能为零"
            return fmt.Errorf("除数不能为零")
        }
        reply.Result = args.A / args.B
    case "pow":
        result := 1.0
        for i := 0; i < int(args.B); i++ {
            result *= args.A
        }
        reply.Result = result
    default:
        reply.Error = "不支持的操作符"
        return fmt.Errorf("不支持的操作符: %s", args.Op)
    }

    fmt.Printf("计算: %.2f %s %.2f = %.2f\n", args.A, args.Op, args.B, reply.Result)
    return nil
}

// 4. 统计服务
type StatsService struct {
    requestCount int
    mu          sync.RWMutex
}

type StatsArgs struct {
    Action string // "increment", "get"
}

type StatsReply struct {
    Count int `json:"count"`
}

func (s *StatsService) HandleStats(args *StatsArgs, reply *StatsReply) error {
    s.mu.Lock()
    defer s.mu.Unlock()

    switch args.Action {
    case "increment":
        s.requestCount++
        fmt.Printf("请求计数增加,当前: %d\n", s.requestCount)
    case "get":
        // 只是读取,不修改
    default:
        return fmt.Errorf("未知操作: %s", args.Action)
    }

    reply.Count = s.requestCount
    return nil
}

// 5. 文件操作服务
type FileService struct{}

type FileArgs struct {
    Filename string `json:"filename"`
    Content  string `json:"content,omitempty"`
}

type FileReply struct {
    Content string `json:"content,omitempty"`
    Size    int    `json:"size,omitempty"`
    Error   string `json:"error,omitempty"`
}

func (f *FileService) ReadFile(args *FileArgs, reply *FileReply) error {
    content, err := os.ReadFile(args.Filename)
    if err != nil {
        reply.Error = err.Error()
        return err
    }

    reply.Content = string(content)
    reply.Size = len(content)
    fmt.Printf("读取文件: %s, 大小: %d 字节\n", args.Filename, len(content))
    return nil
}

func (f *FileService) WriteFile(args *FileArgs, reply *FileReply) error {
    err := os.WriteFile(args.Filename, []byte(args.Content), 0644)
    if err != nil {
        reply.Error = err.Error()
        return err
    }

    reply.Size = len(args.Content)
    fmt.Printf("写入文件: %s, 大小: %d 字节\n", args.Filename, len(args.Content))
    return nil
}

// 6. TCP RPC 服务器
func startTCPRPCServer(port string) {
    arith := new(Arith)
    rpc.Register(arith)

    calculator := new(Calculator)
    rpc.Register(calculator)

    userService := NewUserService()
    rpc.Register(userService)

    statsService := new(StatsService)
    rpc.Register(statsService)

    fileService := new(FileService)
    rpc.Register(fileService)

    listener, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatal("监听错误:", err)
    }

    fmt.Printf("TCP RPC 服务器启动在 %s\n", port)
    fmt.Println("可用服务: Arith, Calculator, UserService, StatsService, FileService")

    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Printf("接受连接错误: %v", err)
            continue
        }

        go rpc.ServeConn(conn)
    }
}

// 7. HTTP RPC 服务器
func startHTTPRPCServer(port string) {
    arith := new(Arith)
    rpc.Register(arith)

    calculator := new(Calculator)
    rpc.Register(calculator)

    userService := NewUserService()
    rpc.Register(userService)

    rpc.HandleHTTP()

    fmt.Printf("HTTP RPC 服务器启动在 http://localhost%s\n", port)
    log.Fatal(http.ListenAndServe(port, nil))
}

// 8. JSON-RPC 服务器(跨语言兼容)
func startJSONRPCServer(port string) {
    arith := new(Arith)
    rpc.Register(arith)

    calculator := new(Calculator)
    rpc.Register(calculator)

    listener, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatal("监听错误:", err)
    }

    fmt.Printf("JSON-RPC 服务器启动在 %s\n", port)

    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Printf("接受连接错误: %v", err)
            continue
        }

        go jsonrpc.ServeConn(conn)
    }
}

// 9. RPC 客户端示例
func demonstrateRPCClient(serverType, address string) {
    fmt.Printf("\n=== %s RPC 客户端测试 ===\n", serverType)

    var client *rpc.Client
    var err error

    switch serverType {
    case "tcp":
        client, err = rpc.Dial("tcp", address)
    case "http":
        client, err = rpc.DialHTTP("tcp", address)
    case "json":
        conn, err := net.Dial("tcp", address)
        if err != nil {
            log.Fatal("连接错误:", err)
        }
        client = jsonrpc.NewClient(conn)
    default:
        log.Fatal("不支持的服务器类型")
    }

    if err != nil {
        log.Fatal("连接错误:", err)
    }
    defer client.Close()

    // 测试算术服务
    fmt.Println("\n1. 测试算术服务:")

    // 乘法测试
    var multiplyResult int
    multiplyArgs := &Args{A: 15, B: 4}
    err = client.Call("Arith.Multiply", multiplyArgs, &multiplyResult)
    if err != nil {
        log.Printf("乘法调用错误: %v", err)
    } else {
        fmt.Printf("乘法结果: %d * %d = %d\n", multiplyArgs.A, multiplyArgs.B, multiplyResult)
    }

    // 除法测试
    var divideResult Quotient
    divideArgs := &Args{A: 17, B: 3}
    err = client.Call("Arith.Divide", divideArgs, &divideResult)
    if err != nil {
        log.Printf("除法调用错误: %v", err)
    } else {
        fmt.Printf("除法结果: %d / %d = %d 余 %d\n",
            divideArgs.A, divideArgs.B, divideResult.Quo, divideResult.Rem)
    }

    // 测试计算器服务
    fmt.Println("\n2. 测试计算器服务:")

    calcArgs := &CalcArgs{A: 10, B: 3, Op: "+"}
    var calcReply CalcReply
    err = client.Call("Calculator.Calculate", calcArgs, &calcReply)
    if err != nil {
        log.Printf("计算器调用错误: %v", err)
    } else {
        fmt.Printf("计算器结果: %.2f %s %.2f = %.2f\n",
            calcArgs.A, calcArgs.Op, calcArgs.B, calcReply.Result)
    }

    // 测试用户服务
    fmt.Println("\n3. 测试用户服务:")

    // 创建用户
    userArgs := &UserArgs{Name: "张三", Email: "zhang@example.com"}
    var userReply UserReply
    err = client.Call("UserService.CreateUser", userArgs, &userReply)
    if err != nil {
        log.Printf("创建用户错误: %v", err)
    } else {
        fmt.Printf("创建用户成功: ID=%d, Name=%s, Email=%s\n",
            userReply.User.ID, userReply.User.Name, userReply.User.Email)
    }

    // 获取用户
    getUserArgs := &UserArgs{ID: 1}
    var getUserReply UserReply
    err = client.Call("UserService.GetUser", getUserArgs, &getUserReply)
    if err != nil {
        log.Printf("获取用户错误: %v", err)
    } else {
        fmt.Printf("获取用户成功: ID=%d, Name=%s\n",
            getUserReply.User.ID, getUserReply.User.Name)
    }

    // 获取所有用户
    var usersReply UsersReply
    err = client.Call("UserService.GetAllUsers", &struct{}{}, &usersReply)
    if err != nil {
        log.Printf("获取所有用户错误: %v", err)
    } else {
        fmt.Printf("获取所有用户成功: 总数=%d\n", usersReply.Count)
        for _, user := range usersReply.Users {
            fmt.Printf("  - 用户%d: %s (%s)\n", user.ID, user.Name, user.Email)
        }
    }

    // 异步调用示例
    fmt.Println("\n4. 测试异步调用:")

    var asyncResult int
    asyncArgs := &Args{A: 8, B: 7}
    asyncCall := client.Go("Arith.Multiply", asyncArgs, &asyncResult, nil)

    // 执行其他工作
    fmt.Println("异步调用中,执行其他工作...")
    time.Sleep(100 * time.Millisecond)

    // 等待异步调用完成
    <-asyncCall.Done
    if asyncCall.Error != nil {
        log.Printf("异步调用错误: %v", asyncCall.Error)
    } else {
        fmt.Printf("异步乘法结果: %d * %d = %d\n", asyncArgs.A, asyncArgs.B, asyncResult)
    }
}

// 10. 高级特性:带超时的 RPC 客户端
type TimeoutClient struct {
    client  *rpc.Client
    timeout time.Duration
}

func NewTimeoutClient(network, address string, timeout time.Duration) (*TimeoutClient, error) {
    conn, err := net.DialTimeout(network, address, timeout)
    if err != nil {
        return nil, err
    }

    client := rpc.NewClient(conn)
    return &TimeoutClient{
        client:  client,
        timeout: timeout,
    }, nil
}

func (c *TimeoutClient) Call(serviceMethod string, args interface{}, reply interface{}) error {
    done := make(chan error, 1)

    go func() {
        done <- c.client.Call(serviceMethod, args, reply)
    }()

    select {
    case err := <-done:
        return err
    case <-time.After(c.timeout):
        return fmt.Errorf("RPC 调用超时")
    }
}

func demonstrateTimeoutClient() {
    fmt.Println("\n=== 超时客户端测试 ===")

    client, err := NewTimeoutClient("tcp", "localhost:1235", 2*time.Second)
    if err != nil {
        log.Printf("连接错误: %v", err)
        return
    }
    defer client.client.Close()

    var result int
    args := &Args{A: 5, B: 6}

    err = client.Call("Arith.Multiply", args, &result)
    if err != nil {
        fmt.Printf("调用失败: %v\n", err)
    } else {
        fmt.Printf("调用成功: %d * %d = %d\n", args.A, args.B, result)
    }
}

func main() {
    if len(os.Args) < 2 {
        fmt.Println("RPC 示例用法:")
        fmt.Println("  TCP 服务器: go run rpc.go tcp-server [port]")
        fmt.Println("  HTTP 服务器: go run rpc.go http-server [port]")
        fmt.Println("  JSON-RPC 服务器: go run rpc.go json-server [port]")
        fmt.Println("  TCP 客户端: go run rpc.go tcp-client [host:port]")
        fmt.Println("  HTTP 客户端: go run rpc.go http-client [host:port]")
        fmt.Println("  JSON-RPC 客户端: go run rpc.go json-client [host:port]")
        fmt.Println("  超时客户端测试: go run rpc.go timeout-client")
        return
    }

    mode := os.Args[1]
    port := ":1234"
    if len(os.Args) > 2 {
        port = ":" + os.Args[2]
    }

    switch mode {
    case "tcp-server":
        startTCPRPCServer(port)
    case "http-server":
        startHTTPRPCServer(port)
    case "json-server":
        startJSONRPCServer(port)
    case "tcp-client":
        address := "localhost:1234"
        if len(os.Args) > 2 {
            address = os.Args[2]
        }
        demonstrateRPCClient("tcp", address)
    case "http-client":
        address := "localhost:1234"
        if len(os.Args) > 2 {
            address = os.Args[2]
        }
        demonstrateRPCClient("http", address)
    case "json-client":
        address := "localhost:1234"
        if len(os.Args) > 2 {
            address = os.Args[2]
        }
        demonstrateRPCClient("json", address)
    case "timeout-client":
        demonstrateTimeoutClient()
    default:
        fmt.Println("未知模式")
    }
}

RPC 编程关键要点

  1. ​ 核心概念 ​

​ 服务注册 ​:rpc.Register()注册服务实例

​ 方法签名 ​:func (t *T) Method(args *ArgType, reply *ReplyType) error

​ 传输协议 ​:TCP、HTTP、JSON-RPC 等

  1. ​ 方法规则 ​

方法必须是导出的(大写字母开头)

必须有两个参数,且都是指针类型

第二个参数必须是指针类型

必须返回 error 类型

  1. ​ 重要特性 ​

​ 并发安全 ​:多个客户端可以同时调用

​ 类型安全 ​:编译时类型检查

​ 自动序列化 ​:gob 编码(可替换为 JSON 等)

  1. ​ 最佳实践 ​

​ 错误处理 ​:方法始终返回 error

​ 超时控制 ​:客户端设置合理的超时时间

​ 连接管理 ​:妥善处理连接建立和关闭

​ 服务发现 ​:在生产环境中需要服务发现机制

​ 负载均衡 ​:多个服务实例的负载均衡

  1. ​ 适用场景 ​

微服务架构中的服务间通信

分布式系统组件通信

需要强类型接口的远程调用

高性能的内部服务调用

Go 的 RPC 框架简单易用且性能优秀,非常适合构建分布式系统和微服务架构。

通关密语:rpc