gRPC golang开发指南
主要基于官网介绍的文档总结而来。 需要先了解 protocol buffers 为什么使用gRPC通过gPRC,我们可以仅仅定义一次service 到.proto文件中,然后使用gRPC支持的任何开发语言开发客户端或服务器。 样例代码和环境的建立首先要确保golang开发环境的正确配置,go1.5+。 $ go get -u -v google.golang.org/grpc
本人在测试中遇到报错,主要原因在于样例需要 "golang.org/x/net"
"golang.org/x/text"
的支持,本人的解决方法如下 $GOPATH/src/golang.org/x/
目录下,如果golang.org/x/ 不存在则手动创建一个。 git clone https://github.com/golang/net.git
git clone https://github.com/golang/text.git
样例测试$ cd $GOPATH/src/google.golang.org/grpc/examples/route_guide
$ go run server/server.go
$ go run client/client.go
下面对样例的代码进行分析 服务定义gRPC使用 protocol buffers定义服务。 service RouteGuide {
...
}
然后可以在servie的定义rpc方法,指定对应的request和response类型。gPRC允许开发者定义4中service方法,这4中方法在样例RouteGuide 中都有用到。
// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
// Obtains the Features available within the given Rectangle. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field),as the rectangle may cover a large area and contain a // huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
// Accepts a stream of Points on a route being traversed,returning a // RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
// Accepts a stream of RouteNotes sent while a route is being traversed, // while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
样例中的.proto文件包含了服务端方法中使用的request和response类型所使用的类型的协议池消息类型定义( protocol buffer message type definitions )。 // Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
生成客户端和服务端代码根据.proto文件生成客户端和服务端所需的gRPC接口代码 protoc -I routeguide/ routeguide/route_guide.proto --go_out=plugins=grpc:routeguide
创建服务端服务端代码主要做两方面的工作: 实现RouteGuideAs you can see,our server has a routeGuideServer struct type that implements the generated RouteGuideServer interface: type routeGuideServer struct {
...
}
...
func (s *routeGuideServer) GetFeature(ctx context.Context,point *pb.Point) (*pb.Feature,error) {
...
}
...
func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle,stream pb.RouteGuide_ListFeaturesServer) error {
...
}
...
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
...
}
...
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
...
}
...
Simple RPCGetFeature,从客户端获取一个Point然后从数据库中返回对应的特征信息。 func (s *routeGuideServer) GetFeature(ctx context.Context,error) {
for _,feature := range s.savedFeatures {
if proto.Equal(feature.Location,point) {
return feature,nil
}
}
// No feature was found,return an unnamed feature
return &pb.Feature{"",point},nil
}
这个方法输入参数是一个RPC的context对象以及客户端发过来的点协议池(Point protocol buffer)请求。这个方法返回一个特征协议池(Feature protocol buffer)对象,对象中包含响应信息和错误。在这个方法中,我们为Feature转入了正确的信息然后和nil error一起返回,告诉gRPC服务器已经完成对RPC的处理,Feature可以返回给客户端了。 Server-side streaming RPCListFeatures是一个服务端stream的RPC,所以我们需要返回多个Features到客户端。 func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle,stream pb.RouteGuide_ListFeaturesServer) error {
for _,feature := range s.savedFeatures {
if inRange(feature.Location,rect) {
if err := stream.Send(feature); err != nil {
return err
}
}
}
return nil
}
可以看出,该方法获取一个request对象以及一个特殊的RouteGuide_ListFeaturesServer 来写相应。这个方法中我们用Send方法把所有需要返回的Feature特征写入到RouteGuide_ListFeaturesServer 中。最后返回一个nil error告诉gRPC服务端已经写好相应。如果期间有什么错误发生,我们返回一个非nil的error,gRPC会转换为正确的RPC状态发送到线路中。 Client-side streaming RPC. func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount,featureCount,distance int32
var lastPoint *pb.Point
startTime := time.Now()
for {
point,err := stream.Recv()
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,FeatureCount: featureCount,Distance: distance,ElapsedTime: int32(endTime.Sub(startTime).Seconds()),})
}
if err != nil {
return err
}
pointCount++
for _,feature := range s.savedFeatures {
if proto.Equal(feature.Location,point) {
featureCount++
}
}
if lastPoint != nil {
distance += calcDistance(lastPoint,point)
}
lastPoint = point
}
}
在这个方法中,我们使用RouteGuide_RecordRouteServer’s 的Recv方法不停的从客户端的请求中读取数据到requesst对象直到没有数据可读。服务器需要检测每次Recv返回的error,如果是nil,表示这个stream正常可以继续读,如果是io.EOF表示流已经停止了此时服务端可以返回RouteSummary。 Bidirectional streaming RPCfunc (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
in,err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
key := serialize(in.Location)
... // look for notes to be sent to client
for _,note := range s.routeNotes[key] {
if err := stream.Send(note); err != nil {
return err
}
}
}
}
这个方法中使用RouteGuide_RouteChatServer 流对象,可以用来读消息和写消息。然而这次我们通过流返回数据的同时客户端仍然在往他们的消息流中写消息。
Starting the serverflag.Parse()
lis,err := net.Listen("tcp",fmt.Sprintf(":%d",*port))
if err != nil {
log.Fatalf("Failed to listen: %v",err)
}
grpcServer := grpc.NewServer()
pb.RegisterRouteGuideServer(grpcServer,&routeGuideServer{})
... // determine whether to use TLS
grpcServer.Serve(lis)
如代码所示,我们创建和启动一个服务器需要下面4个步骤: err := net.Listen("tcp",fmt.Sprintf(":%d",*port)).
grpc.NewServer().
使用TLS func main() {
flag.Parse()
lis,err := net.Listen("tcp",fmt.Sprintf(":%d",*port))
if err != nil {
grpclog.Fatalf("Failed to listen: %v",err)
}
var opts []grpc.ServerOption
if *tls {
creds,err := credentials.NewServerTLSFromFile(*certFile,*keyFile)
if err != nil {
grpclog.Fatalf("Failed to generate credentials %v",err)
}
opts = []grpc.ServerOption{grpc.Creds(creds)}
}
grpcServer := grpc.NewServer(opts...)
pb.RegisterRouteGuideServer(grpcServer,newServer())
grpcServer.Serve(lis)
}
Creating the client创建客户端flag.Parse()
var opts []grpc.DialOption
if *tls {
var sn string
if *serverHostOverride != "" {
sn = *serverHostOverride
}
var creds credentials.TransportCredentials
if *caFile != "" {
var err error
creds,err = credentials.NewClientTLSFromFile(*caFile,sn)
if err != nil {
grpclog.Fatalf("Failed to create TLS credentials %v",err)
}
} else {
creds = credentials.NewClientTLSFromCert(nil,sn)
}
opts = append(opts,grpc.WithTransportCredentials(creds))
} else {
opts = append(opts,grpc.WithInsecure())
}
conn,err := grpc.Dial(*serverAddr,opts...)
if err != nil {
grpclog.Fatalf("fail to dial: %v",err)
}
defer conn.Close()
client := pb.NewRouteGuideClient(conn)
为了能够调用服务端的方法,我们首先创建一个gRPC通道来和服务端沟通。通过传入服务器地址和端口号给grpc.Dial()来创建。如代码,我们还可以使用DialOptions来设置grpc中的认证方法。 Calling service methods对应服务端的四种方法,客户端也要采用不同的调用方法。 Simple RPCfeature,err := client.GetFeature(context.Background(),&pb.Point{409146138,-746188906})
if err != nil {
...
}
从代码中看出,客户端调用方法GetFeature(在),传递协议池(protocol buffer object)对象pb.Point作为参数,同时传递一个context.Context 对象,可以让我们方便的改变RPC的行为,例如超时或取消RPC。 Server-side streaming RPCrect := &pb.Rectangle{ ... } // initialize a pb.Rectangle
stream,err := client.ListFeatures(context.Background(),rect)
if err != nil {
...
}
for {
feature,err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("%v.ListFeatures(_) = _,%v",client,err)
}
log.Println(feature)
}
cient.ListFeaturens参见.proto生成的route_guide.pb.go func (c *routeGuideClient) ListFeatures(ctx context.Context,in *Rectangle,opts ...grpc.CallOption) (RouteGuide_ListFeaturesClient,error) {
在这个方法中,同样的传递一个context对象和一个请求,但是返回一个RouteGuide_ListFeaturesClient实例,客户端可以从这个实例中读取得到服务端的响应。 Client-side streaming RPC// Create a random number of random points
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // Traverse at least two points
var points []*pb.Point
for i := 0; i < pointCount; i++ {
points = append(points,randomPoint(r))
}
log.Printf("Traversing %d points.",len(points))
stream,err := client.RecordRoute(context.Background())
if err != nil {
log.Fatalf("%v.RecordRoute(_) = _,%v",err)
}
for _,point := range points {
if err := stream.Send(point); err != nil {
log.Fatalf("%v.Send(%v) = %v",stream,point,err)
}
}
reply,err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v,want %v",err,nil)
}
log.Printf("Route summary: %v",reply)
同样参见route_guide.pb.go中RecordRoute的定义 func (c *routeGuideClient) RecordRoute(ctx context.Context,opts ...grpc.CallOption) (RouteGuide_RecordRouteClient,error) {
stream,err := grpc.NewClientStream(ctx,&_RouteGuide_serviceDesc.Streams[1],c.cc,"/routeguide.RouteGuide/RecordRoute",opts...)
RecordRoute方法仅仅需要传递一个context参数,然后返回一个RouteGuide_RecordRouteClient流对象用于客户端写消息和读消息。 RouteGuide_RecordRouteClient的Send()方法用于向客户端发送请求,一旦完成客户端的所有请求,客户端需要调用CloseAndRecv方法来让gRPC知道客户端已经完成请求并且期望获得一个响应。 Bidirectional streaming RPCstream,err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
for {
in,err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v",err)
}
log.Printf("Got message %s at point(%d,%d)",in.Message,in.Location.Latitude,in.Location.Longitude)
}
}()
for _,note := range notes {
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v",err)
}
}
stream.CloseSend()
<-waitc
和RecordRoute类型,方法RouteChat仅需要传递一个context对象,返回一个RouteGuide_RouteChatClient用于客户端读消息和写消息。 func (c *routeGuideClient) RouteChat(ctx context.Context,opts ...grpc.CallOption) (RouteGuide_RouteChatClient,&_RouteGuide_serviceDesc.Streams[2],"/routeguide.RouteGuide/RouteChat",opts...)
不过和RecordRoute不同的是,客户端在往客户端的stream里写消息的同时,服务端也在往服务端的stream中写消息。另外,该方法中客户端中读和写是分开独立运行的,没有先后顺序,还有就是客户端写消息完毕后使用CloseSend而不是CloseAndRecv 后记之前一直在CSDN上写文章,后面会逐步转换到简书上,还请大家多多支持。 (编辑:北几岛) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |