Carpe Diem

備忘録

gRPCで大きなファイルのやり取りをする

概要

gRPCで4MB以上のデータ転送をしようとすると

rpc error: code = ResourceExhausted desc = grpc: received message larger than max (xxxxxxx vs. 4194304)

のようなエラーが出ます。この上限はデフォルト値なのでgrpc.MaxRecvMsgSize()grpc.MaxCallSendMsgSize()を使うことで変更可能ですが、ドキュメントでも以下のように

Protocol Buffers are not designed to handle large messages. As a general rule of thumb, if you are dealing in messages larger than a megabyte each, it may be time to consider an alternate strategy.

ref: Techniques  |  Protocol Buffers  |  Google Developers

と、1メッセージがメガバイトを超えるのであれば別の手段で転送することを考えるべき、と言っています。

なので

のようにストリームで送るのが良さそうです。

環境

  • golang 1.11.5
  • grpc 1.18.0
  • protobuf 3.6.1
  • webp 1.0.2

画像変換サーバを作ってみる

要件

クライアントから画像をアップロードするとwebp画像に変換するサーバを作ってみます。

成果物

今回のコードはこちらです。Bidirectional streaming RPCを使っています。

github.com

proto定義

service Converter {
  rpc Convert(stream ConvertRequest) returns (stream ConvertResponse) {}
}

message ConvertRequest {
  oneof value {
    Meta  meta  = 1;
    Chunk chunk = 2;
  }
}

message Meta {
  string id      = 1;
  string type    = 2;
  string quality = 3;
}

message Chunk {
  bytes data     = 1;
  int64 position = 2;
}

message ConvertResponse {
  bytes data     = 1;
  int64 position = 2;
}

ポイント

  • rpcのリクエスト・レスポンスの両方にstreamを付ける
  • リクエストにはメタ情報と、分割送信するためのデータチャンクを定義
  • メタ情報は1度きりで良いのでoneofで一方だけ送信するようにする

サーバ

長いのでポイントだけ紹介します。

データ受信

func receive(stream pb.Converter_ConvertServer, w io.Writer) (meta, error) {
    m := meta{}
    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return m, err
        }

        if mt := resp.GetMeta(); mt != nil {
            m.qa = mt.Quality
        }
        if chunk := resp.GetChunk(); chunk != nil {
            _, err := w.Write(chunk.Data)
            if err != nil {
                return m, err
            }
        }
    }

    return m, nil
}
  • メタ情報とチャンクで処理を分ける

データ変換

cmd := exec.Command("cwebp", "-quiet", "-mt", "-q", m.qa, "-o", dst, src)
err = cmd.Run()
if err != nil {
    return err
}
  • ローカルに受信したデータを書き出す
  • golangにはwebpのエンコード実装がないので、ローカルのcwebpコマンドで変換する

データ送信

func send(stream pb.Converter_ConvertServer, r io.Reader) error {
    buf := make([]byte, 1024)
    for {
        n, err := r.Read(buf)
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }

        data := &pb.ConvertResponse{
            Data:     buf,
            Position: int64(n),
        }
        stream.Send(data)
    }

    return nil
}
  • データを細かく(今回は1024byte毎に)してレスポンスをストリームで返す
  • データ転送が完了したらreturn nilしてクライアント側に終了を伝える

クライアント

データ送信部分

func send(stream pb.Converter_ConvertClient, src io.Reader, id string) error {
    meta := &pb.ConvertRequest{
        Value: &pb.ConvertRequest_Meta{Meta: &pb.Meta{Id: id, Type: "png", Quality: "90"}},
    }
    stream.Send(meta)

    buf := make([]byte, bufSize)
    for {
        n, err := src.Read(buf)
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }

        data := &pb.ConvertRequest{
            Value: &pb.ConvertRequest_Chunk{Chunk: &pb.Chunk{Data: buf, Position: int64(n)}},
        }
        err = stream.Send(data)
        if err != nil {
            return err
        }
    }

    err := stream.CloseSend()
    if err != nil {
        return err
    }

    return nil
}
  • 最初にメタ情報を送信
  • 画像データをチャンクに分けて送信
  • データ転送が完了したらCloseSendで終了をサーバ側に伝える

データ受信部分

func receive(stream pb.Converter_ConvertClient, dst io.Writer) error {
    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        _, err = dst.Write(resp.Data)
        if err != nil {
            return err
        }
    }

    return nil
}
  • サーバのレスポンスが完了するまで(io.EOFを受け取るまで)データを書き込む

動作検証

サーバ側を起動しておいて

$ go run main.go

クライアント側を実行してローカルにあるtestimage.pngをwebpに変換させます。

$ go run main.go

f:id:quoll00:20190203001240p:plain

13.2MBという大きな画像ファイルを転送でき、1.2MBまで圧縮されました!