Carpe Diem

備忘録

gRPCにおける各RPC方式の実装方法【Server streaming RPC】

概要

前回の

gRPCにおける各RPC方式の実装方法【Simple RPC】 - Carpe Diem

に引き続き、今回はServer streaming RPCの実装方法を紹介します。
サーバ側から複数のレスポンスを送ることができるので、フィードなどをReactiveに取得したい時に使ったり、サーバサイドプッシュを使いたい時などで有用です。

環境

  • golang 1.9.2
  • grpc 1.7.2
  • protobuf 3.4.0

成果物

最終的に出来たコードはこちら

github.com

実装

リクエストを送ると、定期的にフィードを取得するサービスを作ります。

proto

レスポンスの方にstreamを付けるのがポイントです。

syntax = "proto3";

package feed;

service Feeder {
  rpc GetNewFeed (Empty) returns (stream FeedResponse) {}
}

message Empty {}

message FeedResponse {
  string message = 1;
}

サーバ

上のprotoをコンパイルすると

type FeederServer interface {
    GetNewFeed(*Empty, Feeder_GetNewFeedServer) error
}

というコードが生成されるので、これを実装します。

package main

import (
    "log"
    "net"
    "time"

    pb "github.com/jun06t/grpc-sample/server-streaming/proto"
    "google.golang.org/grpc"
)

const (
    port = ":8080"
)

type server struct{}

func (s *server) GetNewFeed(in *pb.Empty, stream pb.Feeder_GetNewFeedServer) error {
    feed := []string{"article1", "article2", "article3"}

    for _, v := range feed {
        // 1秒毎にメッセージを送信
        err := stream.Send(&pb.FeedResponse{Message: v})
        if err != nil {
            return err
        }
        time.Sleep(1 * time.Second)
    }

    // RPC終了
    return nil
}

func main() {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatal(err)
    }

    s := grpc.NewServer()
    pb.RegisterFeederServer(s, new(server))
    err = s.Serve(lis)
    if err != nil {
        log.Fatal(err)
    }
}

ポイント

  • stream.Send()でレスポンスを何度も送ることができる
  • return nilでRPCを終了させる

クライアント

package main

import (
    "fmt"
    "io"
    "log"

    pb "github.com/jun06t/grpc-sample/server-streaming/proto"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
)

const (
    address = "localhost:8080"
)

func main() {
    conn, err := grpc.Dial(address, grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := pb.NewFeederClient(conn)

    stream, err := client.GetNewFeed(context.Background(), new(pb.Empty))
    if err != nil {
        log.Fatal(err)
    }
    for {
        article, err := stream.Recv()
        // RPCの終了を検知
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println(article)
    }
}

ポイント

  • stream.Recv()でデータを受け取る
  • err == io.EOFでRPCの終了を検知

動作確認

サーバの起動

$ go run server/main.go 

クライアントの実行

$ go run client/main.go 
message:"article1" 
message:"article2" 
message:"article3" 

1秒毎にメッセージを送信してくれました。

ソース