Carpe Diem

備忘録。https://github.com/jun06t

gRPCにおける各RPC方式の実装方法【Client streaming RPC】

概要

前回の

gRPCにおける各RPC方式の実装方法【Server streaming RPC】 - Carpe Diem

に引き続き、今回はClient streaming RPCの実装方法を紹介します。
こちらは大きなリクエストを分割して送りたい時に有用です。

環境

  • golang 1.9.2
  • grpc 1.7.2
  • protobuf 3.4.0

成果物

最終的に出来たコードはこちら

github.com

実装

画像をアップロードしてサーバ側で保存するサービスを作ります。

proto

ポイントはリクエストの方にstreamを付けることです。

syntax = "proto3";

package upload;

service Uploader {
  rpc Upload(stream Chunk) returns (UploadResponse) {}
}

message Chunk {
  bytes data = 1;
}

message UploadResponse {
  string status = 1;
}

サーバ

上のprotoをコンパイルすると

type UploaderServer interface {
    Upload(Uploader_UploadServer) error
}

というコードが生成されるので、これを実装します。

package main

import (
    "io"
    "log"
    "net"
    "os"

    pb "github.com/jun06t/grpc-sample/client-streaming/proto"
    "google.golang.org/grpc"
)

const (
    port = ":8080"
)

type server struct{}

func (s *server) Upload(stream pb.Uploader_UploadServer) error {
    file, err := os.Create("supercar.jpg")
    if err != nil {
        return err
    }
    defer file.Close()

    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        file.Write(resp.Data)
    }

    err = stream.SendAndClose(&pb.UploadResponse{"success"})
    if err != nil {
        return err
    }

    return nil
}

func main() {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatal(err)
    }

    s := grpc.NewServer()
    pb.RegisterUploaderServer(s, new(server))
    err = s.Serve(lis)
    if err != nil {
        log.Fatal(err)
    }
}

ポイント

  • stream.Recv()でデータを受け取る
  • err == io.EOFでリクエストの終了を検知
  • stream.SendAndClose()でレスポンスを送ってRPCを終了させる

クライアント

package main

import (
    "io"
    "log"
    "os"

    pb "github.com/jun06t/grpc-sample/client-streaming/proto"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
)

const (
    address = "localhost:8080"
)

func main() {
    conn, err := grpc.Dial(address, grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := pb.NewUploaderClient(conn)
    stream, err := client.Upload(context.Background())
    if err != nil {
        log.Fatal(err)
    }

    err = upload(stream)
    if err != nil {
        log.Fatal(err)
    }
}

func upload(stream pb.Uploader_UploadClient) error {
    file, err := os.Open("supercar.jpg")
    if err != nil {
        return err
    }
    defer file.Close()

    buf := make([]byte, 1024)
    for {
        _, err := file.Read(buf)
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }

        stream.Send(&pb.Chunk{Data: buf})
    }

    resp, err := stream.CloseAndRecv()
    if err != nil {
        return err
    }

    log.Println(resp.Status)
    return nil
}

ポイント

  • fileをbufに分割して送信している
  • stream.Send()でデータを複数回送る
  • stream.CloseAndRecv()でリクエストの完了を通知し、レスポンスを受け取る

動作確認

サーバの起動

$ go run server/main.go 

クライアントの実行

$ go run main.go 
2017/11/13 00:46:16 success

サーバ側のディレクトリを見てみると、supercar.jpgというファイルが保存されています。

$ md5 supercar.jpg 
MD5 (supercar.jpg) = fb7a18198978208362486a1ca31c0cd4

ハッシュ値もクライアントと一致しています。

まとめ

Protocol Buffersは大きなデータを送信するよう設計されていないので、画像のような大きなデータはこのように分割してStreamで扱うのが良いです。

ソース