概要
前回の
gRPCにおける各RPC方式の実装方法【Client streaming RPC】 - Carpe Diem
に引き続き、最後はBidirectional streaming RPCの実装方法を紹介します。
チャットのようなリアルタイム通信や、大きなデータを少しずつ処理したい時、Simple RPCで毎回コネクションを張るコストを避けたい時などに有用です。
環境
- golang 1.9.2
- grpc 1.7.2
- protobuf 3.4.0
成果物
最終的に出来たコードはこちら
実装
リアルタイムで送信した文字列を大文字化するサービスを作ります。
proto
ポイントはリクエスト・レスポンスの両方にstream
を付けることです。
syntax = "proto3"; package uppercase; service UppercaseService { rpc Transform(stream UppercaseRequest) returns (stream UppercaseResponse) {} } message UppercaseRequest { string message = 1; } message UppercaseResponse { string message = 1; }
サーバ
上のprotoをコンパイルすると
type UppercaseServiceServer interface { Transform(UppercaseService_TransformServer) error }
というコードが生成されるので、これを実装します。
package main import ( "io" "log" "net" "strings" pb "github.com/jun06t/grpc-sample/bidirectional-streaming/proto" "google.golang.org/grpc" ) const ( port = ":8080" ) type server struct{} func (s *server) Transform(stream pb.UppercaseService_TransformServer) error { for { in, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err } resp := &pb.UppercaseResponse{ Message: strings.ToUpper(in.Message), } err = stream.Send(resp) if err != nil { return err } } } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatal(err) } s := grpc.NewServer() pb.RegisterUppercaseServiceServer(s, new(server)) err = s.Serve(lis) if err != nil { log.Fatal(err) } }
ポイント
stream.Recv()
でデータを取得- 変換後
stream.Send()
データを送信 err == io.EOF
でリクエストの終了を検知return nil
でRPCを終了させる
クライアント
package main import ( "io" "log" pb "github.com/jun06t/grpc-sample/bidirectional-streaming/proto" "golang.org/x/net/context" "google.golang.org/grpc" ) const ( address = "localhost:8080" ) func main() { conn, err := grpc.Dial(address, grpc.WithInsecure()) if err != nil { log.Fatal(err) } defer conn.Close() client := pb.NewUppercaseServiceClient(conn) stream, err := client.Transform(context.Background()) if err != nil { log.Fatal(err) } done := make(chan struct{}) go receive(done, stream) data := []string{"tokyo", "001", "Japan"} err = send(data, stream) if err != nil { log.Fatal(err) } <-done } func send(data []string, stream pb.UppercaseService_TransformClient) (err error) { for _, v := range data { log.Println("send message: ", v) err = stream.Send(&pb.UppercaseRequest{Message: v}) if err != nil { return err } } err = stream.CloseSend() if err != nil { return err } return nil } func receive(done chan struct{}, stream pb.UppercaseService_TransformClient) { for { resp, err := stream.Recv() if err == io.EOF { // read done. close(done) return } if err != nil { log.Fatal(err) } log.Println("received message: ", resp.Message) } }
ポイント
stream.Send()
データを送信stream.Recv()
で変換後のデータを取得stream.CloseSend()
でリクエストを終了させるerr == io.EOF
でレスポンスの終了を検知
動作確認
サーバの起動
$ go run server/main.go
クライアントの実行
$ go run client/main.go 2017/11/13 17:55:31 send message: tokyo 2017/11/13 17:55:31 send message: 001 2017/11/13 17:55:31 send message: Japan 2017/11/13 17:55:31 received message: TOKYO 2017/11/13 17:55:31 received message: 001 2017/11/13 17:55:31 received message: JAPAN
まとめ
一回のコネクションで複数のリクエストとレスポンスを扱うことができました。
大きなデータも分割すればリアルタイムで処理できるので、Batch処理などにも向いていると思いました。