grpc学习

背景

gRPC是一个高性能、通用的开源RPC框架,其由Google主要面向移动应用开发并基于HTTP/2协议标准而设计,基于ProtoBuf序列化协议开发,且支持众多开发语言。关于ProtoBuf的详细内容可以参考我的这篇文章

根据官方页面显示,目前支持的语言如下:

也就是说在这些语言平台上,完全可以用grpc实现rpc通信,同时又由于protobuf的编码效率高,所以可以实现替代jsonrpc

HelloWorld

国际惯例,我们先用go语言写一个grpc版本的hellworld。首先要安装grpc库

go get -u google.golang.org/grpc

protobuf定义及编译

首先定义protobuf文件如下:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";

package helloworld;

service Greeter {
    rpc SayHello (HelloRequest) returns (HelloReply) {}
}


message HelloRequest {
    string name = 1;
}

message HelloReply {
    string msg = 1;
}

几点需要注意的,首先protobuf版本为proto3,其中定义了两个message,分别用于请求和响应,又定义了一个service,其中有一个rpc方法,接受请求返回响应

然后执行下面命令生成pb.go文件

protoc ./helloworld/helloworld.proto --go_out=plugins=grpc:./

我们看一下生成的go代码,首先定义的两个message各自成为一个struct

type HelloRequest struct {
	Name                 string   `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}
type HelloReply struct {
	Msg                  string   `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

这两个struct中含有我们定义message时定义的成员变量。之后我们还定义了一个名为Greeter的服务,它变成了一个interface

type GreeterServer interface {
	SayHello(context.Context, *HelloRequest) (*HelloReply, error)
}

这就是要我们写服务时实现这个接口才能正确定义grpc

服务端

服务端代码如下

type server struct {

}

func (s *server)SayHello(ctx context.Context, in *helloworld.HelloRequest) (*helloworld.HelloReply, error){
	fmt.Println("received:",in.Name)
	return &helloworld.HelloReply{Msg:"hello " + in.Name},nil
}

func main() {
	lis,err:=net.Listen("tcp",":1234")
	if err!=nil {
		log.Fatal("listen:",err.Error())
	}
	s:=grpc.NewServer()
	helloworld.RegisterGreeterServer(s,&server{})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

和写普通rpc代码类型,先定义一个服务类,不过这里要实现我们定义的接口。在main入口里可以发现grpc还是基于tcp传输的,这里的逻辑是定义tcp连接和grpc服务,然后将我们刚才写的服务类注册到grpc中,最后用grpc提供的方法去接管tcp连接即可

客户端

客户端代码如下

func main() {
	conn,err:=grpc.Dial("127.0.0.1:1234",grpc.WithInsecure())
	if err!=nil {
		log.Fatal("dial:",err.Error())
	}
	defer conn.Close()

	c:=helloworld.NewGreeterClient(conn)

	ctx,cancel:=context.WithTimeout(context.Background(),time.Second)
	defer cancel()

	r,err:=c.SayHello(ctx,&helloworld.HelloRequest{Name:"world"})
	if err!=nil {
		log.Fatal("dial:",err.Error())
	}
	fmt.Println("greeting",r.Msg)
}

可能是为了降低大家学习成本,grpc的接口设置和go rpc标准包类似。首先利用grpc提供的接口去发起一个连接。接下来,grpc很方便的帮我们包装了一个client类,其中有我们可以调用的rpc方法,省去了之前用字符串表示的麻烦,随后就像函数调用一样去调用远程方法即可,返回一个响应,然后我们从响应中读数据。

有一个小问题需要注意的是,用grpc去发起连接时,目标地址如果是本机地址的话不能像go标准包那样简写为”:1234”,需要像上面代码那样写出全名。

最后先运行服务端代码再运行客户端代码,就成功实现利用grpc通信。该节示例代码见这里

grpc相关概念

服务类型

grpc定义了四种服务

单项rpc

就是最普通的请求响应模式

rpc SayHello(HelloRequest) returns (HelloResponse){
}

服务端流式rpc

客户端向服务端发起一个请求,服务端返回一个数据流响应

rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse){
}

客户端流式rpc

客户端向服务端发送数据流请求,客户端仅返回一个响应

rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse) {
}

双向流式rpc

客户端与服务端都以数据流形式通信

rpc BidiHello(stream HelloRequest) returns (stream HelloResponse){
}

RPC终止与取消

在客户端与服务端,二者对调用的成功与否是独立,可能会出现不同的判断。客户端与服务端可在任何时间取消一个rpc,rpc会立即终止,但只影响之后状态,取消前完成的不会回滚。

超时

客户端可以指定超时时间,超过这个时间没有响应测返回DEADLINE_EXCEEDED错误,服务端也可查询还有多久去完成响应。

详细内容

实际上上面的helloworld已经可以说明grpc的基本使用,下面只是针对另外几种rpc服务类型进行介绍,这里引用官方的例子,完整代码见这里

protobuf定义

service RouteGuide {

    rpc GetFeature(Point) returns (Feature) {}

    rpc ListFeatures(Rectangle) returns (stream Feature) {}

    rpc RecordRoute(stream Point) returns (RouteSummary) {}

    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

message Point {
    int32 latitude = 1;
    int32 longitude = 2;
}

message Rectangle {
    Point lo = 1;

    Point hi = 2;
}

message Feature {
    string name = 1;

    Point location = 2;
}

message RouteNote {
    Point location = 1;

    string message = 2;
}

message RouteSummary {
    int32 point_count = 1;

    int32 feature_count = 2;

    int32 distance = 3;

    int32 elapsed_time = 4;
}

如上所述,我们对四种rpc分别定义了一个方法。编译命令如下

protoc --go_out=plugins=grpc:./ route/route.proto

创建服务端

回看刚才生成的代码,有这样一个接口

type RouteGuideServer interface {
	GetFeature(context.Context, *Point) (*Feature, error)
	ListFeatures(*Rectangle, RouteGuide_ListFeaturesServer) error
	RecordRoute(RouteGuide_RecordRouteServer) error
	RouteChat(RouteGuide_RouteChatServer) error
}

我们的服务端首先要创建一个类实现这个接口

普通rpc

func (s *routeGuideServer)GetFeature(ctx context.Context, point *route.Point) (*route.Feature, error){
	for _,feature:=range s.savedFeatures{
		if proto.Equal(feature.Location,point){
			return feature,nil
		}
	}
	return &route.Feature{Location:point},nil
}

很简单,参数中的Point和Feature都是我们定义的message,通过输入一个point,然后遍历feature集合,查找符合条件的feature。注意比较两个message可以使用proto包提供的equal方法

服务端流式rpc

func (s *routeGuideServer)ListFeatures(rect *route.Rectangle, stream route.RouteGuide_ListFeaturesServer) error{
	for _,feature := range s.savedFeatures{
		if inRange(feature.Location,rect) {
			if err:=stream.Send(feature);err!=nil{
				return err
			}
		}
	}
	return nil
}

形象的说,这种rpc就是向服务端发送一个请求,服务端以流的形式返回数据。如代码所示,我们指定一个区域,然后判断所有保存的feature是否在区域内,每判断一个,如果符合条件就以send形式发出。grpc为我们做了很形象的包装,对于输出流,我们只需send即可,不必考虑缓存,同步之类的问题,非常方便。唯一需要注意的是,每send一个数据,就需要判断是否报错,然后随时停止。

客户端流式rpc

func (s *routeGuideServer)RecordRoute(stream route.RouteGuide_RecordRouteServer) error{
	var pointCount, featureCount, distance int32
	var lastPoint *route.Point
	startTime := time.Now()
	for {
		point,err:=stream.Recv()
		if err == io.EOF{
			endTime := time.Now()
			return stream.SendAndClose(&route.RouteSummary{
				PointCount:pointCount,
				FeatureCount:featureCount,
				Distance:distance,
				ElapsedTime:int32(endTime.Sub(startTime).Seconds()),
			})
		}
		if err!=nil{
			return err
		}
		pointCount++
		for _,feature := range s.savedFeatures{
			if proto.Equal(feature.Location,point){
				featureCount++
			}
		}
		if lastPoint != nil{
			distance += calcDistance(lastPoint,point)
		}
		lastPoint = point
	}
}

我们定义的是客户端给服务端发送流式数据,然后服务端返回一个响应。如代码所示,grpc也给我提供了一个形象的封装,对于输入流,我们只需要Recv即可,然后判断输入流是否结束,通过EOF标志位,如果结束返回一个响应。注意这里返回的方法是通过调用SendAndClose方法。通过查看SendAndClose实现,他和上面的send方法一样,都是调用了SendMsg方法。

这段实现的大概意思是,客户端发送一个流,流中包含多个point数据的路径,我们遍历这个流,判断每个point是否在已有的feature集合中,然后计算路径长度,最后返回一个RouteSummary信息。

双向流式rpc

func (s *routeGuideServer)RouteChat(stream route.RouteGuide_RouteChatServer) error{
	for{
		in,err:=stream.Recv()
		if err==io.EOF{
			return nil
		}
		if err!=nil{
			return err
		}
		key := serialize(in.Location)

		s.mu.Lock()
		s.routeNotes[key] = append(s.routeNotes[key],in)
		rn :=make([]*route.RouteNote,len(s.routeNotes[key]))
		copy(rn,s.routeNotes[key])
		s.mu.Unlock()

		for _,note:=range rn{
			if err:=stream.Send(note);err!=nil {
				return err
			}
		}
	}
}

既然是双向流,那么就是即能收也能发,如代码所示,每次循环通过Recv从客户端接受一个数据,经过一系列处理后通过send给客户端发送数据。最后知道客户端发送完毕或中间报错为止

最后服务端main方法如下

func newServer() *routeGuideServer {
	s := &routeGuideServer{routeNotes: make(map[string][]*route.RouteNote)}
	data := exampleData
	if err := json.Unmarshal(data, &s.savedFeatures); err != nil {
		log.Fatalf("Failed to load default features: %v", err)
	}
	return s
}

func main() {
	lis,err:=net.Listen("tcp",":1234")
	if err!=nil{
		log.Fatal("listen error:",err.Error())
	}
	server:=grpc.NewServer()
	route.RegisterRouteGuideServer(server,newServer())
	server.Serve(lis)
}

和helloworld一样,显示创建grpc服务实例,然后注册服务,最后让grpc接管tcp连接

创建客户端

首先创建连接部分还是和helloworld一样

conn,err:=grpc.Dial("127.0.0.1:1234",grpc.WithInsecure())
if err!=nil{
	log.Fatal("dail error",err.Error())
}
defer conn.Close()
client:=pb.NewRouteGuideClient(conn)

我们接下来还是对四种服务进行调用

普通rpc

printFeature(client, &pb.Point{Latitude: 409146138, Longitude: -746188906})

func printFeature(client pb.RouteGuideClient, point *pb.Point) {
	log.Printf("Getting feature for point (%d, %d)", point.Latitude, point.Longitude)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	feature, err := client.GetFeature(ctx, point)
	if err != nil {
		log.Fatalf("%v.GetFeatures(_) = _, %v: ", client, err)
	}
	log.Println(feature)
}

对于普通的rpc我们当做函数调用即可,输入一个point返回一个feature

服务端流式rpc

printFeatures(client, &pb.Rectangle{
	Lo: &pb.Point{Latitude: 400000000, Longitude: -750000000},
	Hi: &pb.Point{Latitude: 420000000, Longitude: -730000000},
})

func printFeatures(client pb.RouteGuideClient, rect *pb.Rectangle) {
	log.Printf("Looking for features within %v", rect)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	stream, err := client.ListFeatures(ctx, rect)
	if err != nil {
		log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
	}
	for {
		feature, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
		}
		log.Println(feature)
	}
}

前面说过,这种类型的rpc是客户端发送一个请求,服务端返回一个数据流。这里我们调用ListFeatures方法,给我们返回一个流,然后我们遍历这个流,通过Recv()一次接受一个数据,并通过EOF标志位判断流是否结束

客户端流式rpc

runRecordRoute(client)

func runRecordRoute(client pb.RouteGuideClient) {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	pointCount := int(r.Int31n(100)) + 2 
	var points []*pb.Point
	for i := 0; i < pointCount; i++ {
		points = append(points, randomPoint(r))
	}
	log.Printf("Traversing %d points.", len(points))
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	stream, err := client.RecordRoute(ctx)
	if err != nil {
		log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
	}
	for _, point := range points {
		if err := stream.Send(point); err != nil {
			log.Fatalf("%v.Send(%v) = %v", stream, point, err)
		}
	}
	reply, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
	}
	log.Printf("Route summary: %v", reply)
}

对于这种类型,客户端是发送一系列数据,而服务端返回一个响应,如上面代码所述,我们先随机一些point,然后通过send方法一个一个发送给客户端,发送完后,调用CloseAndRecv关闭流然后等待服务端响应。CloseAndRecv和服务端使用的SendAndClose相对应。

双向流式rpc

runRouteChat(client)

func runRouteChat(client pb.RouteGuideClient) {
	notes := []*pb.RouteNote{
		{Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "First message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Second message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Third message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "Fourth message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Fifth message"},
		{Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Sixth message"},
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	stream, err := client.RouteChat(ctx)
	if err != nil {
		log.Fatalf("%v.RouteChat(_) = _, %v", client, err)
	}
	waitc := make(chan struct{})
	go func() {
		for {
			in, err := stream.Recv()
			if err == io.EOF {
				// read done.
				close(waitc)
				return
			}
			if err != nil {
				log.Fatalf("Failed to receive a note : %v", err)
			}
			log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
		}
	}()
	for _, note := range notes {
		if err := stream.Send(note); err != nil {
			log.Fatalf("Failed to send a note: %v", err)
		}
	}
	stream.CloseSend()
	<-waitc
}

由于客户端与服务端都是收发流数据,在服务端是收到一个数据处理一个然后返回一个,所以我们在客户端不断的发送数据的同时,也启动一个goroutine去接受数据,在发送完数据后要关闭输出流。

总结

  1. 对于普通rpc,客户端就当做方法调用一样去调用某个远端的方法,服务端接受数据后按照返回参数返回值即可
  2. 对于服务端流式rpc,客户端通过调用方法发送一个数据,然后利用返回值stream去不断接受数据;而服务端在接受到请求后,不断用send发送数据。
  3. 对于客户端流式rpc,客户端通过调用方法获得一个stream后,不断的send发送数据,发送完毕后调用CloseAndRecv关闭输出流并等待响应;而服务端不断使用Recv去接受数据,接受完之后调用SendAndClose关闭输入流发送响应
  4. 对于双向流式rpc,客户端通过调用方法获得一个stream后,不断的send发送数据,并在发送的时候启动一个goroutine通过Recv接受数据;而服务端则不断的接受数据、处理数据、返回数据。客户端在发送完毕后注意通过CloseSend关闭输出流