概要
gRPCで4MB以上のデータ転送をしようとすると
rpc error: code = ResourceExhausted desc = grpc: received message larger than max (xxxxxxx vs. 4194304)
のようなエラーが出ます。この上限はデフォルト値なのでgrpc.MaxRecvMsgSize()やgrpc.MaxCallSendMsgSize()を使うことで変更可能ですが、ドキュメントでも以下のように
Protocol Buffers are not designed to handle large messages. As a general rule of thumb, if you are dealing in messages larger than a megabyte each, it may be time to consider an alternate strategy.
ref: Techniques | Protocol Buffers | Google Developers
と、1メッセージがメガバイトを超えるのであれば別の手段で転送することを考えるべき、と言っています。
なので
- gRPCにおける各RPC方式の実装方法【Client streaming RPC】 - Carpe Diem
- gRPCにおける各RPC方式の実装方法【Bidirectional streaming RPC】 - Carpe Diem
のようにストリームで送るのが良さそうです。
環境
- golang 1.11.5
- grpc 1.18.0
- protobuf 3.6.1
- webp 1.0.2
画像変換サーバを作ってみる
要件
クライアントから画像をアップロードするとwebp画像に変換するサーバを作ってみます。
成果物
今回のコードはこちらです。Bidirectional streaming RPCを使っています。
proto定義
service Converter { rpc Convert(stream ConvertRequest) returns (stream ConvertResponse) {} } message ConvertRequest { oneof value { Meta meta = 1; Chunk chunk = 2; } } message Meta { string id = 1; string type = 2; string quality = 3; } message Chunk { bytes data = 1; int64 position = 2; } message ConvertResponse { bytes data = 1; int64 position = 2; }
ポイント
サーバ
長いのでポイントだけ紹介します。
データ受信
func receive(stream pb.Converter_ConvertServer, w io.Writer) (meta, error) { m := meta{} for { resp, err := stream.Recv() if err == io.EOF { break } if err != nil { return m, err } if mt := resp.GetMeta(); mt != nil { m.qa = mt.Quality } if chunk := resp.GetChunk(); chunk != nil { _, err := w.Write(chunk.Data) if err != nil { return m, err } } } return m, nil }
- メタ情報とチャンクで処理を分ける
データ変換
cmd := exec.Command("cwebp", "-quiet", "-mt", "-q", m.qa, "-o", dst, src) err = cmd.Run() if err != nil { return err }
データ送信
func send(stream pb.Converter_ConvertServer, r io.Reader) error { buf := make([]byte, 1024) for { n, err := r.Read(buf) if err == io.EOF { break } if err != nil { return err } data := &pb.ConvertResponse{ Data: buf, Position: int64(n), } stream.Send(data) } return nil }
- データを細かく(今回は
1024byte
毎に)してレスポンスをストリームで返す - データ転送が完了したら
return nil
してクライアント側に終了を伝える
クライアント
データ送信部分
func send(stream pb.Converter_ConvertClient, src io.Reader, id string) error { meta := &pb.ConvertRequest{ Value: &pb.ConvertRequest_Meta{Meta: &pb.Meta{Id: id, Type: "png", Quality: "90"}}, } stream.Send(meta) buf := make([]byte, bufSize) for { n, err := src.Read(buf) if err == io.EOF { break } if err != nil { return err } data := &pb.ConvertRequest{ Value: &pb.ConvertRequest_Chunk{Chunk: &pb.Chunk{Data: buf, Position: int64(n)}}, } err = stream.Send(data) if err != nil { return err } } err := stream.CloseSend() if err != nil { return err } return nil }
- 最初にメタ情報を送信
- 画像データをチャンクに分けて送信
- データ転送が完了したら
CloseSend
で終了をサーバ側に伝える
データ受信部分
func receive(stream pb.Converter_ConvertClient, dst io.Writer) error { for { resp, err := stream.Recv() if err == io.EOF { break } if err != nil { return err } _, err = dst.Write(resp.Data) if err != nil { return err } } return nil }
- サーバのレスポンスが完了するまで(
io.EOF
を受け取るまで)データを書き込む
動作検証
サーバ側を起動しておいて
$ go run main.go
クライアント側を実行してローカルにあるtestimage.png
をwebpに変換させます。
$ go run main.go
13.2MB
という大きな画像ファイルを転送でき、1.2MB
まで圧縮されました!