Carpe Diem

備忘録

GCPのCloud PubSubで考慮すること

概要

GCPのCloud PubSubはメッセージング基盤として非常に有用です。
ただし利用する上で考慮すべきことも多々あるのでまとめておきます。

パラメータに関しては主にGoのクライアントライブラリをベースに説明します。

環境

  • Go 1.15.2
  • google-cloud-go v0.72.0

SLO/SLA

Cloud PubSubのSLOは99.95%です。
自身が提供するサービスのSLOを満たすかはこれを元に利用しましょう。

それを満たせなかったときのSLAとしては以下のように決められています。

各月の稼働率 対象サービスが SLO を満たさなかった場合、お客様の翌月以降の請求書に対して返金される月次請求額の割合
99%~99.95% 未満 10%
95%~99% 未満 25%
95% 未満 50%

ref: Pub/Sub Service Level Agreement (SLA)  |  Google Cloud

At least Once

PubSubの配信はat least onceを保証しています。
exactly onceではないため、subscribe側は冪等な処理をする必要があります。

exactly onceを実現するには以下のようにCloud Dataflowを組み合わせるか、

Streaming with Pub/Sub  |  Cloud Dataflow  |  Google Cloud

messageについているunique idを利用して、KVSを用いたステート管理をして自前で重複を排除する形になります。

Resources | DoiT International

Publish

メッセージ送信のバッチ処理

PubSubのクライアントライブラリは複数のメッセージをバッチにまとめて 1 回の呼び出しでサービスに送信します。
メッセージは

  • リクエストのサイズ(バイト単位)
  • メッセージの数
  • 時間

に応じてバッチ処理できます。これらのうちどれかがしきい値に達したらバッチ処理されます。

t := client.Topic(topicID)
t.PublishSettings.ByteThreshold = 5000
t.PublishSettings.CountThreshold = 10
t.PublishSettings.DelayThreshold = 100 * time.Millisecond

goの場合デフォルト値は以下です。

パラメータ デフォルト値
ByteThreshold 1e6(=1MB)
CountThreshold 100
DelayThreshold 10 * time.Millisecond

ref: google-cloud-go/pubsub/topic.go at 5c440192105fe3e9b5dd1b584118b309113935e3 · googleapis/google-cloud-go · GitHub

内部でキューを持ちバッチ処理を行っているため、Publishメソッド自体は非同期な処理です。

結果の取得はGet()メソッドで行います。
これは結果が返るまでブロックするため(=リクエストが1度もコケなくてもByteThreshold, CountThreshold, DelayThresholdのどれかを満たすまで待つ。コケたら再試行の分より長い時間待つ)、Publishするたびに呼び出すのでなくExampleのようにまとめて後から取得する方がレイテンシを低減できます。

r := topic.Publish(ctx, &pubsub.Message{
    Data: []byte("hello world"),
})
results = append(results, r)
// Do other work ...
for _, r := range results {
    id, err := r.Get(ctx)
    if err != nil {
        // TODO: Handle error.
    }
    fmt.Printf("Published a message with a message ID: %s\n", id)
}

リトライ

goのクライアントライブラリでは以下の値が設定されてます。

パラメータ 説明 デフォルト値
Initial リクエストがコケて最初に再試行するまでの時間 100ms
Multiplier 再試行間隔を少しずつ増やす際の掛け算値 1.3
Max 再試行するたびに掛け算した値がこの値を超えると、今後再試行間隔は増えなくなる(=一定時間でretryになる) 60s
Topic.PublishSettings.Timeout この時間を超えたら強制的に終了 60s

google-cloud-go/pubsub/apiv1/publisher_client.go at 5c440192105fe3e9b5dd1b584118b309113935e3 · googleapis/google-cloud-go · GitHub google-cloud-go/pubsub/topic.go at 99f7212bd70bb333c1aa1c7a57348b4dfd80d31b · googleapis/google-cloud-go · GitHub

実際にTopic.PublishSettings.Timeoutを10minにしてretryの動きを見ると以下のように再試行間隔が増加していきました。
Maxパラメータを超えて間隔が増えて言ったのは謎ですが

GoのクライアントライブラリはAPI call部分はgax-goに依存している事が多く、こちらがretryなどの処理をよろしくやってくれています。

gax-go/v2/invoke.go at 21fd469b63bc1a2bed9b99b614b4b9dc54a8aeae · googleapis/gax-go · GitHub

Go SDKエラーは?

topic.PublishSettings.Timeout = 120 * time.Second

の状態でパケットフィルタによって通信を止めた状態で、

log.Println("publish")
r := topic.Publish(ctx, &pubsub.Message{
        Data: []byte("hello world"),
})
id, err := r.Get(ctx)
if err != nil {
        log.Println(err)
        return err
}

を実行してみると、

2022/06/29 15:45:35 publish
2022/06/29 15:47:35 context deadline exceeded

このようにTopic.PublishSettings.Timeoutが過ぎてからdeadline exceededエラーが返りました。

終了前のFlush

プロセスを終了させる前にキューに溜まっているメッセージを前もってフラッシュ(全て送信)しておきましょう。

Javaならshutdown()、GoならStop()を実行します。
フラッシュ中は新規メッセージはpublishしようとしてもpublishできずキューに溜まりません。

処理としてはブロッキングで、全て送信が完了するかリトライ上限を超えて失敗するまでブロックします。プロセスの終了はこのフラッシュが終わるまで待つように実装しましょう。

フラッシュ時に失敗したメッセージはロストします。

Pull

非同期Pullと同期Pull

PubSubにはメッセージを取得するためのPull APIが、非同期Pullと同期Pullの2つあります。
サポートしているかどうかはクライアントライブラリに依ります。

非同期Pull (Streaming Pull)

gRPCのBidirectional streaming RPCを用いた永続的な双方向接続を使っています。
これにより

でメッセージを受信することができます。

ctx := context.Background()
client, err := pubsub.NewClient(ctx, "project-id")
if err != nil {
    // TODO: Handle error.
}
sub := client.Subscription("subName")
err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
    // TODO: Handle message.
    // NOTE: May be called concurrently; synchronize access to shared memory.
    m.Ack()
})
if err != context.Canceled {
    // TODO: Handle error.
}

同期Pull (Synchronous Pull)

以下のユースケースでは同期Pullが向いています。

  • ポーリングパターン
  • メッセージ数の上限が明確
    • Subscriberのインスタンス数が制限されているが、スパイクなどで一時的に大量のPublishがされる場合
ctx := context.Background()
client, err := pubsub.NewClient(ctx, "project-id")
if err != nil {
    // TODO: Handle error.
}
sub := client.Subscription("subName")
// Turn on synchronous mode. This makes the subscriber use the Pull RPC rather
// than the StreamingPull RPC, which is useful for guaranteeing MaxOutstandingMessages,
 // the max number of messages the client will hold in memory at a time.
sub.ReceiveSettings.Synchronous = true
err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
    // TODO: Handle message.
    // NOTE: May be called concurrently; synchronize access to shared memory.
    m.Ack()
})
if err != context.Canceled {
    // TODO: Handle error.
}

goの場合は自分が運用しているサービスにおいては同期Pullの方が比較的CPUリソースが低く済んでいました。

Ack Deadline

メッセージにはack期限(Ack Deadline)というものがあり

  • ack期限を過ぎる
  • Nackを返す

場合、メッセージは再配送されます。

処理中にack期限を超えてしまわないよう、基本的にクライアントライブラリはメッセージの受信時にack期限延長リクエストを送信します。
不要に延長リクエストを飛ばさないよう、この延長期間はメッセージの処理にかかった時間によって調整されていきます。ロジックとしてはパーセンタイルランクで決定されており、99%ileの時間を延長期間としてack期限を延長します。

具体的にいうとほとんどのメッセージが1分以内に処理が完了するものであった場合、「ack期限は1分後に延長する」というリクエストを1分毎(実装では猶予期間として5秒引いた間隔で)に送ります。

ただし

という上限と下限が設定されているので、それを超えたり下回る場合は上限下限値に上書きされます。

初期値は10秒なので、10 - graceperiod=5秒毎に

「10秒延長して」というリクエストが飛んでることが確認できます。

上記間隔で何回か延長し、最終的にs.ReceiveSettings.MaxExtensionで指定された期間まで、フェッチされたすべてのメッセージのack期限を自動的に延長します。

途中で処理に失敗した場合、Nackを返さないと延長されたack期限までそのメッセージは再配送されない(=再実行されません)です。
なので失敗時には必ずNackを呼び出すようにしましょう。

各種パラメータ

Pull側で使う各種パラメータの説明です。

パラメータ 説明 デフォルト値
MaxExtension ack期限を延長する最大値 60min
MaxExtensionPeriod ack期限の延長間隔を決定するロジックをパーセンタイルランクでなく固定値にする 0
MaxOutstandingMessages 一度に処理される未処理(=outstanding)メッセージの数。
負数にすると制限なしになる。
またReceiveに渡すcallbackの並行実行数上限でもある
1000
MaxOutstandingBytes 一度に処理される未処理メッセージのサイズ 1GB
NumGoroutines メッセージ受信処理の並行数
同期Pullの場合この設定は無視され1として扱われる
Receiveに渡すcallbackの並行実行数上限ではない。
10

pubsub package - cloud.google.com/go/pubsub - Go Packages

まとめ

メッセージング基盤はマイクロサービスでは必須のミドルウェアになります。
そのため可用性や冪等性、スループット、retryロジックなどには注意して利用する必要があります。
今回はそれらを中心に説明しました。