概要
前回の
gRPCにおける各RPC方式の実装方法【Simple RPC】 - Carpe Diem
に引き続き、今回はServer streaming RPCの実装方法を紹介します。
サーバ側から複数のレスポンスを送ることができるので、フィードなどをReactiveに取得したい時に使ったり、サーバサイドプッシュを使いたい時などで有用です。
環境
- golang 1.9.2
- grpc 1.7.2
- protobuf 3.4.0
成果物
最終的に出来たコードはこちら
実装
リクエストを送ると、定期的にフィードを取得するサービスを作ります。
proto
レスポンスの方にstream
を付けるのがポイントです。
syntax = "proto3"; package feed; service Feeder { rpc GetNewFeed (Empty) returns (stream FeedResponse) {} } message Empty {} message FeedResponse { string message = 1; }
サーバ
上のprotoをコンパイルすると
type FeederServer interface { GetNewFeed(*Empty, Feeder_GetNewFeedServer) error }
というコードが生成されるので、これを実装します。
package main import ( "log" "net" "time" pb "github.com/jun06t/grpc-sample/server-streaming/proto" "google.golang.org/grpc" ) const ( port = ":8080" ) type server struct{} func (s *server) GetNewFeed(in *pb.Empty, stream pb.Feeder_GetNewFeedServer) error { feed := []string{"article1", "article2", "article3"} for _, v := range feed { // 1秒毎にメッセージを送信 err := stream.Send(&pb.FeedResponse{Message: v}) if err != nil { return err } time.Sleep(1 * time.Second) } // RPC終了 return nil } func main() { lis, err := net.Listen("tcp", port) if err != nil { log.Fatal(err) } s := grpc.NewServer() pb.RegisterFeederServer(s, new(server)) err = s.Serve(lis) if err != nil { log.Fatal(err) } }
ポイント
stream.Send()
でレスポンスを何度も送ることができるreturn nil
でRPCを終了させる
クライアント
package main import ( "fmt" "io" "log" pb "github.com/jun06t/grpc-sample/server-streaming/proto" "golang.org/x/net/context" "google.golang.org/grpc" ) const ( address = "localhost:8080" ) func main() { conn, err := grpc.Dial(address, grpc.WithInsecure()) if err != nil { log.Fatal(err) } defer conn.Close() client := pb.NewFeederClient(conn) stream, err := client.GetNewFeed(context.Background(), new(pb.Empty)) if err != nil { log.Fatal(err) } for { article, err := stream.Recv() // RPCの終了を検知 if err == io.EOF { break } if err != nil { log.Fatal(err) } fmt.Println(article) } }
ポイント
stream.Recv()
でデータを受け取るerr == io.EOF
でRPCの終了を検知
動作確認
サーバの起動
$ go run server/main.go
クライアントの実行
$ go run client/main.go message:"article1" message:"article2" message:"article3"
1秒毎にメッセージを送信してくれました。