gRPC入门系列之3-stream模式

gRPC入门系列之3-stream模式

December 3, 2023
Grpc, 微服务
Grpc

gRPC的流模式有三种:

  • 客户端流模式,客户端向服务端发送多条消息,发送完毕后等待服务端返回结果。这可用于数据上报等场景。
  • 服务端流模式,客户端向服务端发送一条消息,服务端向客户端返回多条消息。这可用于服务端向客户端推送消息的场景,比如实时发送股票行情。
  • 双向流模式,客户端和服务端都向对方发送多条消息。你可能会在聊天应用中使用到这种模式。

下面我们来实现相关代码。

proto文件定义 #

首先,我们来编写proto文件,定义相关接口。我们在下面的示例中实现一个简单的calculator功能,分别实现加法、求平均值、生成多个随机数三种方法,分别使用客户端流、服务端流、双向流模式。

syntax = "proto3";

package proto;

option go_package = "git.gqnotes.com/guoqiang/grpcexercises/calculator/pb";

service CalculateService {
  // 求和-客户端流式
  rpc Sum(stream SumRequest) returns(SumResponse) {}
  // 生成一定数量的随机数-服务端流式
  rpc RandomNums(RandomNumsRequest) returns(stream RandomNumsResponse) {}
  // 双向流式求平均值
  rpc Average(stream AverageRequest) returns(stream AverageResponse) {}
}

message SumRequest {
  int64 num =1;
}


message SumResponse {
  int64 total = 1;
}

message RandomNumsRequest {
  int64 num =1;
}

message RandomNumsResponse {
  int64 num = 1;
}

message AverageRequest {
  int64 num =1;
}

message AverageResponse {
  float average = 1;
}

生成相关代码 #

我们修改一下Makefile文件,添加如下代码:

calculator-gen:
	 protoc -Icalculator/proto/ --go_out=./calculator --go_opt=module=git.gqnotes.com/guoqiang/grpcexercises/calculator --go-grpc_out=./calculator --go-grpc_opt=module=git.gqnotes.com/guoqiang/grpcexercises/calculator calculator/proto/*.proto

执行make calculator-gen,生成相关代码。

编写服务端代码 #

我们在server目录下新建相关go文件,编写服务端代码。我们先来实现Sum方法,代码如下:

package main

import (
	"git.gqnotes.com/guoqiang/grpcexercises/calculator/pb"
	"io"
)

// Sum 计算和-客户端流式
func (s *Server) Sum(stream pb.CalculateService_SumServer) error {
	var sum int64

	for {
		req, err := stream.Recv()

		if err == io.EOF {
			break
		}

		if err != nil {
			return err
		}

		sum += req.Num
	}

	if err := stream.SendAndClose(&pb.SumResponse{Total: sum}); err != nil {
		return err
	}

	return nil
}

在上面的代码中,我们从stream中读取数据,如果err为io.EOF,则结束循环,否则将读取到的数据累加到sum变量中。最后,我们将计算结果返回给客户端。

下面我们来实现main函数:

package main

import (
	"git.gqnotes.com/guoqiang/grpcexercises/calculator/pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
	"log"
	"net"
)

func main() {
	// 实现服务端逻辑,监听5633端口
	lis, err := net.Listen("tcp", ":5633")

	if err != nil {
		log.Fatalf("failed to listen: %v", err)
		return
	}

	s := grpc.NewServer()

	// 反射服务
	reflection.Register(s)

	// 注册服务
	pb.RegisterCalculateServiceServer(s, &Server{})

	log.Println("grpc server start")

	if err = s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
		return
	}
}

type Server struct {
	pb.UnimplementedCalculateServiceServer
}

如你所见,我们启动了一个grpc服务,使用的端口是5633,并且进行了反射注册(这只是为了后面的测试,不是必须的)。

RandomNums和Average方法的视线,大家可以访问代码仓库查看。

编写客户端代码 #

在client目录下新建相关go文件,编写客户端代码。我们先来实现Sum方法,代码如下:

Sum.go
package main

import (
	"context"
	"git.gqnotes.com/guoqiang/grpcexercises/calculator/pb"
	"io"
	"log"
)

// Sum 计算和-客户端流式
func Sum(client pb.CalculateServiceClient) {
	// 构造一个切片
	nums := []int64{1, 2, 3, 4, 5}

	stream, err := client.Sum(context.Background())

	if err != nil {
		log.Fatalf("failed to call: %v", err)
		return
	}

	// 发送数据
	for i := 0; i < len(nums); i++ {
		err = stream.Send(&pb.SumRequest{
			Num: nums[i],
		})

		if err == io.EOF {
			break
		}

		if err != nil {
			log.Fatalf("failed to send: %v", err)
			return
		}
	}

	// 接收数据
	resp, err := stream.CloseAndRecv()

	if err != nil {
		log.Fatalf("failed to recv: %v", err)
	}

	log.Printf("nums:%+v\tresp:%+v\n", nums, resp)
}

main.go
package main

import (
	"git.gqnotes.com/guoqiang/grpcexercises/calculator/pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"log"
)

func main() {
	conn, err := grpc.Dial(":5633", grpc.WithTransportCredentials(insecure.NewCredentials()))

	if err != nil {
		log.Fatalf("failed to dial: %v", err)
		return
	}

	defer conn.Close()

	// 实例化客户端
	client := pb.NewCalculateServiceClient(conn)

	Sum(client)

	Average(client)
}

启动服务 #

我们修改一下Makefile文件,添加如下代码:

calculator-build:
	 go build -o ./calculator/bin/server-osx ./calculator/server
	 go build -o ./calculator/bin/client-osx ./calculator/client

执行make calculator-build,生成可执行文件。然后执行./calculator/bin/server-osx启动服务端程序。

在一个新的终端中,执行./calculator/bin/client-osx启动客户端程序,可以看到如下输出:

2023/12/03 10:32:02 nums:[1 2 3 4 5]    resp:total:15
2023/12/03 10:32:02 num:1       recv:average:1
2023/12/03 10:32:02 num:2       recv:average:1.5
2023/12/03 10:32:02 num:3       recv:average:2
2023/12/03 10:32:02 num:4       recv:average:2.5
2023/12/03 10:32:02 num:5       recv:average:3

在第一行,调用的是Sum方法,客户端输入了5个数字:[1 2 3 4 5],服务端返回了这5个数字的和15。而在后面的输出中,调用的是Average方法,客户端输入了5个数字:[1 2 3 4 5],服务端返回的是已经输入的数字的平均值。

使用evans测试接口 #

在上面,我们是通过编写客户端程序来测试接口。如果不想编写客户端代码,指向测试接口,可以使用evans。

我们先修改一下Makefile文件,添加如下代码:

calculator-evans-reflect:
	evans -r repl --port 5633

注意,上面的5633是grpc服务监听的端口。

接下来,执行make calculator-evans-reflect,启动evans,看到如下输出:

gq@gqdeMacBook-Air grpcexercises % make calculator-evans-reflect
evans -r repl --port 5633

  ______
 |  ____|
 | |__    __   __   __ _   _ __    ___
 |  __|   \ \ / /  / _. | | '_ \  / __|
 | |____   \ V /  | (_| | | | | | \__ \
 |______|   \_/    \__,_| |_| |_| |___/

 more expressive universal gRPC client


127.0.0.1:5633> 

我们再来温习一下几个evans的命令:

  • show package:显示包名。
  • package xx :使用xx包。
  • show service:显示服务名。
  • service xx:使用xx服务。
  • show message:显示消息名。
  • call xx:调用方法xx。

我们先执行几个命令,查看一下package和service:

127.0.0.1:5633> show package
+-------------------------+
|         PACKAGE         |
+-------------------------+
| grpc.reflection.v1      |
| grpc.reflection.v1alpha |
| proto                   |
+-------------------------+

127.0.0.1:5633> package proto

proto@127.0.0.1:5633> show service
+------------------+------------+-------------------+--------------------+
|     SERVICE      |    RPC     |   REQUEST TYPE    |   RESPONSE TYPE    |
+------------------+------------+-------------------+--------------------+
| CalculateService | Sum        | SumRequest        | SumResponse        |
| CalculateService | RandomNums | RandomNumsRequest | RandomNumsResponse |
| CalculateService | Average    | AverageRequest    | AverageResponse    |
+------------------+------------+-------------------+--------------------+

proto@127.0.0.1:5633> service CalculateService

proto.CalculateService@127.0.0.1:5633> 

接下来,让我们分别测试一下Sum、RandomNums和Average方法。

Sum:

proto.CalculateService@127.0.0.1:5633> call Sum
num (TYPE_INT64) => 1
num (TYPE_INT64) => 2
num (TYPE_INT64) => 3
num (TYPE_INT64) => 
{
  "total": "6"
}

proto.CalculateService@127.0.0.1:5633> 
当我们已经完成输入后,可以按Ctrl+D来结束输入,再次按Ctrl+D来退出evans。

RandomNums:

proto.CalculateService@127.0.0.1:5633> call RandomNums
num (TYPE_INT64) => 3
{
  "num": "752"
}
{
  "num": "798"
}
{
  "num": "528"
}

proto.CalculateService@127.0.0.1:5633> 

在上面的测试中,我们输入的参数是3,服务端返回了三个随机数。

Average:

proto.CalculateService@127.0.0.1:5633> call Average
num (TYPE_INT64) => 5
num (TYPE_INT64) => {
  "average": 5
}
num (TYPE_INT64) => 2
num (TYPE_INT64) => {
  "average": 3.5
}
num (TYPE_INT64) => 3
num (TYPE_INT64) => {
  "average": 3.3333333
}
num (TYPE_INT64) => 

proto.CalculateService@127.0.0.1:5633> 

在上面的测试中,我们分别输入了三个参数5、2、3,服务端返回了三次,每次返回的都是已经输入的数字的平均值。

感兴趣的朋友,可以查看我的代码仓库地址