Go RPC
RPC(Remote Procedure Call)是一种计算机通信协议,允许程序调用另一个地址空间(通常是另一台机器上)的过程或函数,就像调用本地方法一样。Go 语言在标准库中提供了完整的 RPC 支持。
下面是一个示例:
package main
import (
"fmt"
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"os"
"sync"
"time"
)
// 1. 基本 RPC 服务示例
// 算术服务
type Arith int
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
// 乘法运算
func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
fmt.Printf("乘法计算: %d * %d = %d\n", args.A, args.B, *reply)
return nil
}
// 除法运算
func (t *Arith) Divide(args *Args, quo *Quotient) error {
if args.B == 0 {
return fmt.Errorf("除数不能为零")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
fmt.Printf("除法计算: %d / %d = %d 余 %d\n", args.A, args.B, quo.Quo, quo.Rem)
return nil
}
// 2. 用户管理服务
type User struct {
ID int `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
CreateAt int64 `json:"create_at"`
}
type UserService struct {
users map[int]*User
mu sync.RWMutex
nextID int
}
func NewUserService() *UserService {
return &UserService{
users: make(map[int]*User),
nextID: 1,
}
}
type UserArgs struct {
ID int `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
}
type UserReply struct {
User *User `json:"user"`
Error string `json:"error,omitempty"`
}
type UsersReply struct {
Users []*User `json:"users"`
Count int `json:"count"`
}
// 创建用户
func (u *UserService) CreateUser(args *UserArgs, reply *UserReply) error {
u.mu.Lock()
defer u.mu.Unlock()
user := &User{
ID: u.nextID,
Name: args.Name,
Email: args.Email,
CreateAt: time.Now().Unix(),
}
u.users[user.ID] = user
u.nextID++
reply.User = user
fmt.Printf("创建用户: ID=%d, Name=%s, Email=%s\n", user.ID, user.Name, user.Email)
return nil
}
// 获取用户
func (u *UserService) GetUser(args *UserArgs, reply *UserReply) error {
u.mu.RLock()
defer u.mu.RUnlock()
user, exists := u.users[args.ID]
if !exists {
reply.Error = "用户不存在"
return fmt.Errorf("用户不存在: %d", args.ID)
}
reply.User = user
fmt.Printf("获取用户: ID=%d, Name=%s\n", user.ID, user.Name)
return nil
}
// 获取所有用户
func (u *UserService) GetAllUsers(args *struct{}, reply *UsersReply) error {
u.mu.RLock()
defer u.mu.RUnlock()
users := make([]*User, 0, len(u.users))
for _, user := range u.users {
users = append(users, user)
}
reply.Users = users
reply.Count = len(users)
fmt.Printf("获取所有用户,总数: %d\n", len(users))
return nil
}
// 更新用户
func (u *UserService) UpdateUser(args *UserArgs, reply *UserReply) error {
u.mu.Lock()
defer u.mu.Unlock()
user, exists := u.users[args.ID]
if !exists {
reply.Error = "用户不存在"
return fmt.Errorf("用户不存在: %d", args.ID)
}
if args.Name != "" {
user.Name = args.Name
}
if args.Email != "" {
user.Email = args.Email
}
reply.User = user
fmt.Printf("更新用户: ID=%d, Name=%s, Email=%s\n", user.ID, user.Name, user.Email)
return nil
}
// 3. 计算器服务(支持多种运算)
type Calculator struct{}
type CalcArgs struct {
A, B float64
Op string // 操作符: +, -, *, /, pow
}
type CalcReply struct {
Result float64 `json:"result"`
Error string `json:"error,omitempty"`
}
func (c *Calculator) Calculate(args *CalcArgs, reply *CalcReply) error {
switch args.Op {
case "+":
reply.Result = args.A + args.B
case "-":
reply.Result = args.A - args.B
case "*":
reply.Result = args.A * args.B
case "/":
if args.B == 0 {
reply.Error = "除数不能为零"
return fmt.Errorf("除数不能为零")
}
reply.Result = args.A / args.B
case "pow":
result := 1.0
for i := 0; i < int(args.B); i++ {
result *= args.A
}
reply.Result = result
default:
reply.Error = "不支持的操作符"
return fmt.Errorf("不支持的操作符: %s", args.Op)
}
fmt.Printf("计算: %.2f %s %.2f = %.2f\n", args.A, args.Op, args.B, reply.Result)
return nil
}
// 4. 统计服务
type StatsService struct {
requestCount int
mu sync.RWMutex
}
type StatsArgs struct {
Action string // "increment", "get"
}
type StatsReply struct {
Count int `json:"count"`
}
func (s *StatsService) HandleStats(args *StatsArgs, reply *StatsReply) error {
s.mu.Lock()
defer s.mu.Unlock()
switch args.Action {
case "increment":
s.requestCount++
fmt.Printf("请求计数增加,当前: %d\n", s.requestCount)
case "get":
// 只是读取,不修改
default:
return fmt.Errorf("未知操作: %s", args.Action)
}
reply.Count = s.requestCount
return nil
}
// 5. 文件操作服务
type FileService struct{}
type FileArgs struct {
Filename string `json:"filename"`
Content string `json:"content,omitempty"`
}
type FileReply struct {
Content string `json:"content,omitempty"`
Size int `json:"size,omitempty"`
Error string `json:"error,omitempty"`
}
func (f *FileService) ReadFile(args *FileArgs, reply *FileReply) error {
content, err := os.ReadFile(args.Filename)
if err != nil {
reply.Error = err.Error()
return err
}
reply.Content = string(content)
reply.Size = len(content)
fmt.Printf("读取文件: %s, 大小: %d 字节\n", args.Filename, len(content))
return nil
}
func (f *FileService) WriteFile(args *FileArgs, reply *FileReply) error {
err := os.WriteFile(args.Filename, []byte(args.Content), 0644)
if err != nil {
reply.Error = err.Error()
return err
}
reply.Size = len(args.Content)
fmt.Printf("写入文件: %s, 大小: %d 字节\n", args.Filename, len(args.Content))
return nil
}
// 6. TCP RPC 服务器
func startTCPRPCServer(port string) {
arith := new(Arith)
rpc.Register(arith)
calculator := new(Calculator)
rpc.Register(calculator)
userService := NewUserService()
rpc.Register(userService)
statsService := new(StatsService)
rpc.Register(statsService)
fileService := new(FileService)
rpc.Register(fileService)
listener, err := net.Listen("tcp", port)
if err != nil {
log.Fatal("监听错误:", err)
}
fmt.Printf("TCP RPC 服务器启动在 %s\n", port)
fmt.Println("可用服务: Arith, Calculator, UserService, StatsService, FileService")
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("接受连接错误: %v", err)
continue
}
go rpc.ServeConn(conn)
}
}
// 7. HTTP RPC 服务器
func startHTTPRPCServer(port string) {
arith := new(Arith)
rpc.Register(arith)
calculator := new(Calculator)
rpc.Register(calculator)
userService := NewUserService()
rpc.Register(userService)
rpc.HandleHTTP()
fmt.Printf("HTTP RPC 服务器启动在 http://localhost%s\n", port)
log.Fatal(http.ListenAndServe(port, nil))
}
// 8. JSON-RPC 服务器(跨语言兼容)
func startJSONRPCServer(port string) {
arith := new(Arith)
rpc.Register(arith)
calculator := new(Calculator)
rpc.Register(calculator)
listener, err := net.Listen("tcp", port)
if err != nil {
log.Fatal("监听错误:", err)
}
fmt.Printf("JSON-RPC 服务器启动在 %s\n", port)
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("接受连接错误: %v", err)
continue
}
go jsonrpc.ServeConn(conn)
}
}
// 9. RPC 客户端示例
func demonstrateRPCClient(serverType, address string) {
fmt.Printf("\n=== %s RPC 客户端测试 ===\n", serverType)
var client *rpc.Client
var err error
switch serverType {
case "tcp":
client, err = rpc.Dial("tcp", address)
case "http":
client, err = rpc.DialHTTP("tcp", address)
case "json":
conn, err := net.Dial("tcp", address)
if err != nil {
log.Fatal("连接错误:", err)
}
client = jsonrpc.NewClient(conn)
default:
log.Fatal("不支持的服务器类型")
}
if err != nil {
log.Fatal("连接错误:", err)
}
defer client.Close()
// 测试算术服务
fmt.Println("\n1. 测试算术服务:")
// 乘法测试
var multiplyResult int
multiplyArgs := &Args{A: 15, B: 4}
err = client.Call("Arith.Multiply", multiplyArgs, &multiplyResult)
if err != nil {
log.Printf("乘法调用错误: %v", err)
} else {
fmt.Printf("乘法结果: %d * %d = %d\n", multiplyArgs.A, multiplyArgs.B, multiplyResult)
}
// 除法测试
var divideResult Quotient
divideArgs := &Args{A: 17, B: 3}
err = client.Call("Arith.Divide", divideArgs, ÷Result)
if err != nil {
log.Printf("除法调用错误: %v", err)
} else {
fmt.Printf("除法结果: %d / %d = %d 余 %d\n",
divideArgs.A, divideArgs.B, divideResult.Quo, divideResult.Rem)
}
// 测试计算器服务
fmt.Println("\n2. 测试计算器服务:")
calcArgs := &CalcArgs{A: 10, B: 3, Op: "+"}
var calcReply CalcReply
err = client.Call("Calculator.Calculate", calcArgs, &calcReply)
if err != nil {
log.Printf("计算器调用错误: %v", err)
} else {
fmt.Printf("计算器结果: %.2f %s %.2f = %.2f\n",
calcArgs.A, calcArgs.Op, calcArgs.B, calcReply.Result)
}
// 测试用户服务
fmt.Println("\n3. 测试用户服务:")
// 创建用户
userArgs := &UserArgs{Name: "张三", Email: "zhang@example.com"}
var userReply UserReply
err = client.Call("UserService.CreateUser", userArgs, &userReply)
if err != nil {
log.Printf("创建用户错误: %v", err)
} else {
fmt.Printf("创建用户成功: ID=%d, Name=%s, Email=%s\n",
userReply.User.ID, userReply.User.Name, userReply.User.Email)
}
// 获取用户
getUserArgs := &UserArgs{ID: 1}
var getUserReply UserReply
err = client.Call("UserService.GetUser", getUserArgs, &getUserReply)
if err != nil {
log.Printf("获取用户错误: %v", err)
} else {
fmt.Printf("获取用户成功: ID=%d, Name=%s\n",
getUserReply.User.ID, getUserReply.User.Name)
}
// 获取所有用户
var usersReply UsersReply
err = client.Call("UserService.GetAllUsers", &struct{}{}, &usersReply)
if err != nil {
log.Printf("获取所有用户错误: %v", err)
} else {
fmt.Printf("获取所有用户成功: 总数=%d\n", usersReply.Count)
for _, user := range usersReply.Users {
fmt.Printf(" - 用户%d: %s (%s)\n", user.ID, user.Name, user.Email)
}
}
// 异步调用示例
fmt.Println("\n4. 测试异步调用:")
var asyncResult int
asyncArgs := &Args{A: 8, B: 7}
asyncCall := client.Go("Arith.Multiply", asyncArgs, &asyncResult, nil)
// 执行其他工作
fmt.Println("异步调用中,执行其他工作...")
time.Sleep(100 * time.Millisecond)
// 等待异步调用完成
<-asyncCall.Done
if asyncCall.Error != nil {
log.Printf("异步调用错误: %v", asyncCall.Error)
} else {
fmt.Printf("异步乘法结果: %d * %d = %d\n", asyncArgs.A, asyncArgs.B, asyncResult)
}
}
// 10. 高级特性:带超时的 RPC 客户端
type TimeoutClient struct {
client *rpc.Client
timeout time.Duration
}
func NewTimeoutClient(network, address string, timeout time.Duration) (*TimeoutClient, error) {
conn, err := net.DialTimeout(network, address, timeout)
if err != nil {
return nil, err
}
client := rpc.NewClient(conn)
return &TimeoutClient{
client: client,
timeout: timeout,
}, nil
}
func (c *TimeoutClient) Call(serviceMethod string, args interface{}, reply interface{}) error {
done := make(chan error, 1)
go func() {
done <- c.client.Call(serviceMethod, args, reply)
}()
select {
case err := <-done:
return err
case <-time.After(c.timeout):
return fmt.Errorf("RPC 调用超时")
}
}
func demonstrateTimeoutClient() {
fmt.Println("\n=== 超时客户端测试 ===")
client, err := NewTimeoutClient("tcp", "localhost:1235", 2*time.Second)
if err != nil {
log.Printf("连接错误: %v", err)
return
}
defer client.client.Close()
var result int
args := &Args{A: 5, B: 6}
err = client.Call("Arith.Multiply", args, &result)
if err != nil {
fmt.Printf("调用失败: %v\n", err)
} else {
fmt.Printf("调用成功: %d * %d = %d\n", args.A, args.B, result)
}
}
func main() {
if len(os.Args) < 2 {
fmt.Println("RPC 示例用法:")
fmt.Println(" TCP 服务器: go run rpc.go tcp-server [port]")
fmt.Println(" HTTP 服务器: go run rpc.go http-server [port]")
fmt.Println(" JSON-RPC 服务器: go run rpc.go json-server [port]")
fmt.Println(" TCP 客户端: go run rpc.go tcp-client [host:port]")
fmt.Println(" HTTP 客户端: go run rpc.go http-client [host:port]")
fmt.Println(" JSON-RPC 客户端: go run rpc.go json-client [host:port]")
fmt.Println(" 超时客户端测试: go run rpc.go timeout-client")
return
}
mode := os.Args[1]
port := ":1234"
if len(os.Args) > 2 {
port = ":" + os.Args[2]
}
switch mode {
case "tcp-server":
startTCPRPCServer(port)
case "http-server":
startHTTPRPCServer(port)
case "json-server":
startJSONRPCServer(port)
case "tcp-client":
address := "localhost:1234"
if len(os.Args) > 2 {
address = os.Args[2]
}
demonstrateRPCClient("tcp", address)
case "http-client":
address := "localhost:1234"
if len(os.Args) > 2 {
address = os.Args[2]
}
demonstrateRPCClient("http", address)
case "json-client":
address := "localhost:1234"
if len(os.Args) > 2 {
address = os.Args[2]
}
demonstrateRPCClient("json", address)
case "timeout-client":
demonstrateTimeoutClient()
default:
fmt.Println("未知模式")
}
}
RPC 编程关键要点
- 核心概念
服务注册 :rpc.Register()注册服务实例
方法签名 :func (t *T) Method(args *ArgType, reply *ReplyType) error
传输协议 :TCP、HTTP、JSON-RPC 等
- 方法规则
方法必须是导出的(大写字母开头)
必须有两个参数,且都是指针类型
第二个参数必须是指针类型
必须返回 error 类型
- 重要特性
并发安全 :多个客户端可以同时调用
类型安全 :编译时类型检查
自动序列化 :gob 编码(可替换为 JSON 等)
- 最佳实践
错误处理 :方法始终返回 error
超时控制 :客户端设置合理的超时时间
连接管理 :妥善处理连接建立和关闭
服务发现 :在生产环境中需要服务发现机制
负载均衡 :多个服务实例的负载均衡
- 适用场景
微服务架构中的服务间通信
分布式系统组件通信
需要强类型接口的远程调用
高性能的内部服务调用
Go 的 RPC 框架简单易用且性能优秀,非常适合构建分布式系统和微服务架构。
通关密语:rpc