grpc学习
背景
gRPC是一个高性能、通用的开源RPC框架,其由Google主要面向移动应用开发并基于HTTP/2协议标准而设计,基于ProtoBuf序列化协议开发,且支持众多开发语言。关于ProtoBuf的详细内容可以参考我的这篇文章
根据官方页面显示,目前支持的语言如下:
也就是说在这些语言平台上,完全可以用grpc实现rpc通信,同时又由于protobuf的编码效率高,所以可以实现替代jsonrpc
HelloWorld
国际惯例,我们先用go语言写一个grpc版本的hellworld。首先要安装grpc库
go get -u google.golang.org/grpc
protobuf定义及编译
首先定义protobuf文件如下:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
package helloworld;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string msg = 1;
}
几点需要注意的,首先protobuf版本为proto3,其中定义了两个message,分别用于请求和响应,又定义了一个service,其中有一个rpc方法,接受请求返回响应
然后执行下面命令生成pb.go文件
protoc ./helloworld/helloworld.proto --go_out=plugins=grpc:./
我们看一下生成的go代码,首先定义的两个message各自成为一个struct
type HelloRequest struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
type HelloReply struct {
Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
这两个struct中含有我们定义message时定义的成员变量。之后我们还定义了一个名为Greeter的服务,它变成了一个interface
type GreeterServer interface {
SayHello(context.Context, *HelloRequest) (*HelloReply, error)
}
这就是要我们写服务时实现这个接口才能正确定义grpc
服务端
服务端代码如下
type server struct {
}
func (s *server)SayHello(ctx context.Context, in *helloworld.HelloRequest) (*helloworld.HelloReply, error){
fmt.Println("received:",in.Name)
return &helloworld.HelloReply{Msg:"hello " + in.Name},nil
}
func main() {
lis,err:=net.Listen("tcp",":1234")
if err!=nil {
log.Fatal("listen:",err.Error())
}
s:=grpc.NewServer()
helloworld.RegisterGreeterServer(s,&server{})
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
和写普通rpc代码类型,先定义一个服务类,不过这里要实现我们定义的接口。在main入口里可以发现grpc还是基于tcp传输的,这里的逻辑是定义tcp连接和grpc服务,然后将我们刚才写的服务类注册到grpc中,最后用grpc提供的方法去接管tcp连接即可
客户端
客户端代码如下
func main() {
conn,err:=grpc.Dial("127.0.0.1:1234",grpc.WithInsecure())
if err!=nil {
log.Fatal("dial:",err.Error())
}
defer conn.Close()
c:=helloworld.NewGreeterClient(conn)
ctx,cancel:=context.WithTimeout(context.Background(),time.Second)
defer cancel()
r,err:=c.SayHello(ctx,&helloworld.HelloRequest{Name:"world"})
if err!=nil {
log.Fatal("dial:",err.Error())
}
fmt.Println("greeting",r.Msg)
}
可能是为了降低大家学习成本,grpc的接口设置和go rpc标准包类似。首先利用grpc提供的接口去发起一个连接。接下来,grpc很方便的帮我们包装了一个client类,其中有我们可以调用的rpc方法,省去了之前用字符串表示的麻烦,随后就像函数调用一样去调用远程方法即可,返回一个响应,然后我们从响应中读数据。
有一个小问题需要注意的是,用grpc去发起连接时,目标地址如果是本机地址的话不能像go标准包那样简写为”:1234”,需要像上面代码那样写出全名。
最后先运行服务端代码再运行客户端代码,就成功实现利用grpc通信。该节示例代码见这里
grpc相关概念
服务类型
grpc定义了四种服务
单项rpc
就是最普通的请求响应模式
rpc SayHello(HelloRequest) returns (HelloResponse){
}
服务端流式rpc
客户端向服务端发起一个请求,服务端返回一个数据流响应
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse){
}
客户端流式rpc
客户端向服务端发送数据流请求,客户端仅返回一个响应
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse) {
}
双向流式rpc
客户端与服务端都以数据流形式通信
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse){
}
RPC终止与取消
在客户端与服务端,二者对调用的成功与否是独立,可能会出现不同的判断。客户端与服务端可在任何时间取消一个rpc,rpc会立即终止,但只影响之后状态,取消前完成的不会回滚。
超时
客户端可以指定超时时间,超过这个时间没有响应测返回DEADLINE_EXCEEDED错误,服务端也可查询还有多久去完成响应。
详细内容
实际上上面的helloworld已经可以说明grpc的基本使用,下面只是针对另外几种rpc服务类型进行介绍,这里引用官方的例子,完整代码见这里。
protobuf定义
service RouteGuide {
rpc GetFeature(Point) returns (Feature) {}
rpc ListFeatures(Rectangle) returns (stream Feature) {}
rpc RecordRoute(stream Point) returns (RouteSummary) {}
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
message Rectangle {
Point lo = 1;
Point hi = 2;
}
message Feature {
string name = 1;
Point location = 2;
}
message RouteNote {
Point location = 1;
string message = 2;
}
message RouteSummary {
int32 point_count = 1;
int32 feature_count = 2;
int32 distance = 3;
int32 elapsed_time = 4;
}
如上所述,我们对四种rpc分别定义了一个方法。编译命令如下
protoc --go_out=plugins=grpc:./ route/route.proto
创建服务端
回看刚才生成的代码,有这样一个接口
type RouteGuideServer interface {
GetFeature(context.Context, *Point) (*Feature, error)
ListFeatures(*Rectangle, RouteGuide_ListFeaturesServer) error
RecordRoute(RouteGuide_RecordRouteServer) error
RouteChat(RouteGuide_RouteChatServer) error
}
我们的服务端首先要创建一个类实现这个接口
普通rpc
func (s *routeGuideServer)GetFeature(ctx context.Context, point *route.Point) (*route.Feature, error){
for _,feature:=range s.savedFeatures{
if proto.Equal(feature.Location,point){
return feature,nil
}
}
return &route.Feature{Location:point},nil
}
很简单,参数中的Point和Feature都是我们定义的message,通过输入一个point,然后遍历feature集合,查找符合条件的feature。注意比较两个message可以使用proto包提供的equal方法
服务端流式rpc
func (s *routeGuideServer)ListFeatures(rect *route.Rectangle, stream route.RouteGuide_ListFeaturesServer) error{
for _,feature := range s.savedFeatures{
if inRange(feature.Location,rect) {
if err:=stream.Send(feature);err!=nil{
return err
}
}
}
return nil
}
形象的说,这种rpc就是向服务端发送一个请求,服务端以流的形式返回数据。如代码所示,我们指定一个区域,然后判断所有保存的feature是否在区域内,每判断一个,如果符合条件就以send形式发出。grpc为我们做了很形象的包装,对于输出流,我们只需send即可,不必考虑缓存,同步之类的问题,非常方便。唯一需要注意的是,每send一个数据,就需要判断是否报错,然后随时停止。
客户端流式rpc
func (s *routeGuideServer)RecordRoute(stream route.RouteGuide_RecordRouteServer) error{
var pointCount, featureCount, distance int32
var lastPoint *route.Point
startTime := time.Now()
for {
point,err:=stream.Recv()
if err == io.EOF{
endTime := time.Now()
return stream.SendAndClose(&route.RouteSummary{
PointCount:pointCount,
FeatureCount:featureCount,
Distance:distance,
ElapsedTime:int32(endTime.Sub(startTime).Seconds()),
})
}
if err!=nil{
return err
}
pointCount++
for _,feature := range s.savedFeatures{
if proto.Equal(feature.Location,point){
featureCount++
}
}
if lastPoint != nil{
distance += calcDistance(lastPoint,point)
}
lastPoint = point
}
}
我们定义的是客户端给服务端发送流式数据,然后服务端返回一个响应。如代码所示,grpc也给我提供了一个形象的封装,对于输入流,我们只需要Recv即可,然后判断输入流是否结束,通过EOF标志位,如果结束返回一个响应。注意这里返回的方法是通过调用SendAndClose方法。通过查看SendAndClose实现,他和上面的send方法一样,都是调用了SendMsg方法。
这段实现的大概意思是,客户端发送一个流,流中包含多个point数据的路径,我们遍历这个流,判断每个point是否在已有的feature集合中,然后计算路径长度,最后返回一个RouteSummary信息。
双向流式rpc
func (s *routeGuideServer)RouteChat(stream route.RouteGuide_RouteChatServer) error{
for{
in,err:=stream.Recv()
if err==io.EOF{
return nil
}
if err!=nil{
return err
}
key := serialize(in.Location)
s.mu.Lock()
s.routeNotes[key] = append(s.routeNotes[key],in)
rn :=make([]*route.RouteNote,len(s.routeNotes[key]))
copy(rn,s.routeNotes[key])
s.mu.Unlock()
for _,note:=range rn{
if err:=stream.Send(note);err!=nil {
return err
}
}
}
}
既然是双向流,那么就是即能收也能发,如代码所示,每次循环通过Recv从客户端接受一个数据,经过一系列处理后通过send给客户端发送数据。最后知道客户端发送完毕或中间报错为止
最后服务端main方法如下
func newServer() *routeGuideServer {
s := &routeGuideServer{routeNotes: make(map[string][]*route.RouteNote)}
data := exampleData
if err := json.Unmarshal(data, &s.savedFeatures); err != nil {
log.Fatalf("Failed to load default features: %v", err)
}
return s
}
func main() {
lis,err:=net.Listen("tcp",":1234")
if err!=nil{
log.Fatal("listen error:",err.Error())
}
server:=grpc.NewServer()
route.RegisterRouteGuideServer(server,newServer())
server.Serve(lis)
}
和helloworld一样,显示创建grpc服务实例,然后注册服务,最后让grpc接管tcp连接
创建客户端
首先创建连接部分还是和helloworld一样
conn,err:=grpc.Dial("127.0.0.1:1234",grpc.WithInsecure())
if err!=nil{
log.Fatal("dail error",err.Error())
}
defer conn.Close()
client:=pb.NewRouteGuideClient(conn)
我们接下来还是对四种服务进行调用
普通rpc
printFeature(client, &pb.Point{Latitude: 409146138, Longitude: -746188906})
func printFeature(client pb.RouteGuideClient, point *pb.Point) {
log.Printf("Getting feature for point (%d, %d)", point.Latitude, point.Longitude)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
feature, err := client.GetFeature(ctx, point)
if err != nil {
log.Fatalf("%v.GetFeatures(_) = _, %v: ", client, err)
}
log.Println(feature)
}
对于普通的rpc我们当做函数调用即可,输入一个point返回一个feature
服务端流式rpc
printFeatures(client, &pb.Rectangle{
Lo: &pb.Point{Latitude: 400000000, Longitude: -750000000},
Hi: &pb.Point{Latitude: 420000000, Longitude: -730000000},
})
func printFeatures(client pb.RouteGuideClient, rect *pb.Rectangle) {
log.Printf("Looking for features within %v", rect)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.ListFeatures(ctx, rect)
if err != nil {
log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
}
for {
feature, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
}
log.Println(feature)
}
}
前面说过,这种类型的rpc是客户端发送一个请求,服务端返回一个数据流。这里我们调用ListFeatures方法,给我们返回一个流,然后我们遍历这个流,通过Recv()一次接受一个数据,并通过EOF标志位判断流是否结束
客户端流式rpc
runRecordRoute(client)
func runRecordRoute(client pb.RouteGuideClient) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2
var points []*pb.Point
for i := 0; i < pointCount; i++ {
points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.RecordRoute(ctx)
if err != nil {
log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
for _, point := range points {
if err := stream.Send(point); err != nil {
log.Fatalf("%v.Send(%v) = %v", stream, point, err)
}
}
reply, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)
}
对于这种类型,客户端是发送一系列数据,而服务端返回一个响应,如上面代码所述,我们先随机一些point,然后通过send方法一个一个发送给客户端,发送完后,调用CloseAndRecv关闭流然后等待服务端响应。CloseAndRecv和服务端使用的SendAndClose相对应。
双向流式rpc
runRouteChat(client)
func runRouteChat(client pb.RouteGuideClient) {
notes := []*pb.RouteNote{
{Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "First message"},
{Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Second message"},
{Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Third message"},
{Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "Fourth message"},
{Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Fifth message"},
{Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Sixth message"},
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.RouteChat(ctx)
if err != nil {
log.Fatalf("%v.RouteChat(_) = _, %v", client, err)
}
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
for _, note := range notes {
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc
}
由于客户端与服务端都是收发流数据,在服务端是收到一个数据处理一个然后返回一个,所以我们在客户端不断的发送数据的同时,也启动一个goroutine去接受数据,在发送完数据后要关闭输出流。
总结
- 对于普通rpc,客户端就当做方法调用一样去调用某个远端的方法,服务端接受数据后按照返回参数返回值即可
- 对于服务端流式rpc,客户端通过调用方法发送一个数据,然后利用返回值stream去不断接受数据;而服务端在接受到请求后,不断用send发送数据。
- 对于客户端流式rpc,客户端通过调用方法获得一个stream后,不断的send发送数据,发送完毕后调用CloseAndRecv关闭输出流并等待响应;而服务端不断使用Recv去接受数据,接受完之后调用SendAndClose关闭输入流发送响应
- 对于双向流式rpc,客户端通过调用方法获得一个stream后,不断的send发送数据,并在发送的时候启动一个goroutine通过Recv接受数据;而服务端则不断的接受数据、处理数据、返回数据。客户端在发送完毕后注意通过CloseSend关闭输出流