Carpe Diem

備忘録

gRPCにおける各RPC方式の実装方法【Bidirectional streaming RPC】

概要

前回の

gRPCにおける各RPC方式の実装方法【Client streaming RPC】 - Carpe Diem

に引き続き、最後はBidirectional streaming RPCの実装方法を紹介します。
チャットのようなリアルタイム通信や、大きなデータを少しずつ処理したい時、Simple RPCで毎回コネクションを張るコストを避けたい時などに有用です。

環境

  • golang 1.9.2
  • grpc 1.7.2
  • protobuf 3.4.0

成果物

最終的に出来たコードはこちら

github.com

実装

リアルタイムで送信した文字列を大文字化するサービスを作ります。

proto

ポイントはリクエスト・レスポンスの両方にstreamを付けることです。

syntax = "proto3";

package uppercase;

service UppercaseService {
  rpc Transform(stream UppercaseRequest) returns (stream UppercaseResponse) {}
}

message UppercaseRequest {
  string message = 1;
}

message UppercaseResponse {
  string message = 1;
}

サーバ

上のprotoをコンパイルすると

type UppercaseServiceServer interface {
        Transform(UppercaseService_TransformServer) error
}

というコードが生成されるので、これを実装します。

package main

import (
    "io"
    "log"
    "net"
    "strings"

    pb "github.com/jun06t/grpc-sample/bidirectional-streaming/proto"
    "google.golang.org/grpc"
)

const (
    port = ":8080"
)

type server struct{}

func (s *server) Transform(stream pb.UppercaseService_TransformServer) error {
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }

        resp := &pb.UppercaseResponse{
            Message: strings.ToUpper(in.Message),
        }
        err = stream.Send(resp)
        if err != nil {
            return err
        }
    }
}

func main() {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatal(err)
    }

    s := grpc.NewServer()
    pb.RegisterUppercaseServiceServer(s, new(server))
    err = s.Serve(lis)
    if err != nil {
        log.Fatal(err)
    }
}

ポイント

  • stream.Recv()でデータを取得
  • 変換後stream.Send()データを送信
  • err == io.EOFでリクエストの終了を検知
  • return nilでRPCを終了させる

クライアント

package main

import (
    "io"
    "log"

    pb "github.com/jun06t/grpc-sample/bidirectional-streaming/proto"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
)

const (
    address = "localhost:8080"
)

func main() {
    conn, err := grpc.Dial(address, grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := pb.NewUppercaseServiceClient(conn)

    stream, err := client.Transform(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    done := make(chan struct{})

    go receive(done, stream)

    data := []string{"tokyo", "001", "Japan"}
    err = send(data, stream)
    if err != nil {
        log.Fatal(err)
    }

    <-done
}

func send(data []string, stream pb.UppercaseService_TransformClient) (err error) {
    for _, v := range data {
        log.Println("send message: ", v)
        err = stream.Send(&pb.UppercaseRequest{Message: v})
        if err != nil {
            return err
        }
    }
    err = stream.CloseSend()
    if err != nil {
        return err
    }

    return nil
}

func receive(done chan struct{}, stream pb.UppercaseService_TransformClient) {
    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            // read done.
            close(done)
            return
        }
        if err != nil {
            log.Fatal(err)
        }
        log.Println("received message: ", resp.Message)
    }
}

ポイント

  • stream.Send()データを送信
  • stream.Recv()で変換後のデータを取得
  • stream.CloseSend()でリクエストを終了させる
  • err == io.EOFでレスポンスの終了を検知

動作確認

サーバの起動

$ go run server/main.go 

クライアントの実行

$ go run client/main.go 
2017/11/13 17:55:31 send message:  tokyo
2017/11/13 17:55:31 send message:  001
2017/11/13 17:55:31 send message:  Japan
2017/11/13 17:55:31 received message:  TOKYO
2017/11/13 17:55:31 received message:  001
2017/11/13 17:55:31 received message:  JAPAN

まとめ

一回のコネクションで複数のリクエストとレスポンスを扱うことができました。
大きなデータも分割すればリアルタイムで処理できるので、Batch処理などにも向いていると思いました。

ソース