概要
以下のような購入時のドメインイベントをメッセージング基盤に非同期で渡してから順番通りに実行したい、となった時に便利なのが順序保証をしてくれるPubSubのordering keyです。
- 購入処理
- 各マイクロサービスへの状態変更リクエスト
- 行動ログ追記
- 購入完了メールの送信
ただordering keyを使う上での注意点はあまり見かけなかったのでこちらにまとめようと思います。
結論から言うとordering keyの粒度はなるべく細かくしないとスケールしないという話です。
環境
- cloud.google.com/go/pubsub v1.19.0
考慮すべき点
同じordering keyのメッセージは同じパーティションにpublishされる
Cloud PubSubはメッセージング基盤の概念であるパーティション(Kinesisでいうシャード)をカプセル化しており、ユーザが意識しなくて済むようにしています。
しかしながら実際にはパーティションは存在し、同じorderingKeyのメッセージは同じパーティションにpublishされるとドキュメントに書いてあります。
したがってordering keyの粒度が粗いとhot shardが発生することになります。
そのため以下のようなリソース制限があります。
メッセージに順序指定キーが含まれている場合、パブリッシャーの最大スループットは各順序指定キーあたり 1 MBps
ref: Pub/Sub quotas and limits | Cloud Pub/Sub Documentation | Google Cloud
Publish時になんだか詰まるなぁとなった場合はこのケースを疑ってみると良いでしょう。
Publishに一度失敗すると以降全て失敗する
以下の順にPublishして、
- message001をpublish
- message002をpublish
- message003をpublish
最初のmessage001が失敗した場合、そのまま後続のpublishが成功してしまうと順序保証がされなくなってしまいます。
かといって全てシーケンシャルに実行し、コケたら再実行するロジックだとPublisherのスケーラビリティがなくなります。
そこでPubSubのSDKは順序保証を保ったままスケーラビリティも保証するため、非同期に並行してPublishはできるものの以下の制約があります。
そうなると一度失敗するとずっと失敗してしまうことになるため、その対処のためのメソッドResumePublish()がSDKで提供されています。
t := client.Topic(topicID) t.EnableMessageOrdering = true key := "some-ordering-key" res := t.Publish(ctx, &pubsub.Message{ Data: []byte("some-message"), OrderingKey: key, }) _, err = res.Get(ctx) if err != nil { // Resume publish on an ordering key that has had unrecoverable errors. // After such an error publishes with this ordering key will fail // until this method is called. t.ResumePublish(key) }
ref: Resume publishing with ordering keys | Cloud Pub/Sub Documentation | Google Cloud
なのでエラーが発生したら必ず呼ぶようにしましょう。
Subscriberはordering keyのグループ単位で割り当てられる
メッセージング基盤は通常パーティションの数とSubscriberの数の最小値がSubscribeのスケール限界になります。
つまりこのパーティション数の場合、Subscriberを増やしても処理はスケールしません。
GCPのPubSubはその点非常に洗練されており、
という要件を満たしていますが、ordering keyでは制約が発生します。
They assign subscribers to groups of ordering keys that are more fine-grained than a partition.
ref: Google Cloud Pub/Sub Ordered Delivery | by Kamal Aboul-Hosn | Google Cloud - Community | Medium
とあるようにPubSubではパーティションより粒度の細かいordering keyのグループ単位でSubscriberを割り当てます。
つまりordering keyの数とSubscriberの数の最小がSubscribeのスケール限界になります。
例えば上記のように4種類のordering keyを使い回す、といった設計にしてしまうと、Subscriberをいくら増やしても4並行以上はスケールしません。
※同じTopicにordering keyのないメッセージが含まれてる場合であれば他のSubscriberがそれらのメッセージは分散して処理してくれます
同じorderingKeyは直列に実行される
Subscriber側のGo SDK実装を確認すると
- ordering keyがない場合はすべてgoroutineで並行実行される
- ordering keyがある場合は同じordering keyは直列実行される
であったため、ordering keyの粒度が粗いと実行に時間がかかることが分かります。
ordering keyではない場合
以下のようにworker(MaxOutstandingMessages)によるセマフォはありますが、メッセージに対するcallbackは並行実行されることが分かります。
func (s *ReceiveScheduler) Add(key string, item interface{}, handle func(item interface{})) error { if key == "" { // Spawn a worker. s.workers <- struct{}{} go func() { // Unordered keys can be handled immediately. handle(item) <-s.workers }() return nil }
ordering keyの場合
一方ordering keyがある場合は内部でキューを持っており、
- メッセージ及びそのcallback処理をキューに追加する
- キューを順次実行する
の2つはgoroutineで並行処理されてますが、キュー自体は直列で実行されるため時間がかかります。
s.mu.Lock() _, ok := s.m[key] s.m[key] = append(s.m[key], func() { handle(item) }) s.mu.Unlock() if ok { // Someone is already working on this key. return nil } // Spawn a worker. s.workers <- struct{}{} go func() { defer func() { <-s.workers }() // Key-Loop: loop through the available items in the key's queue. for { s.mu.Lock() if len(s.m[key]) == 0 { // We're done processing items - the queue is empty. Delete // the queue from the map and free up the worker. delete(s.m, key) s.mu.Unlock() return } // Pop an item from the queue. next := s.m[key][0] s.m[key] = s.m[key][1:] s.mu.Unlock() next() // Handle next in queue. } }()
したがって処理の重いメッセージによるhead-of-lineブロッキングが発生することになります。
subscribe時に同じordering keyの処理の途中でコケると1つ目から再配信される
例えば以下のような1500円かかった購入イベントをordering keyで順序保証してメッセージを処理するとしましょう。
order001: 商品1をカートに入れる order001: 商品2をカートに入れる order001: 購入処理の開始 order001: ポイントを1000円分消費する order001: クレジットカードで500円消費する order001: 加盟店の売上金に1500円を付与する order001: 購入完了メールを送る
ここで途中のクレジットカード処理でコケたとしましょう。
order001: 商品1をカートに入れる →ack order001: 商品2をカートに入れる →ack order001: 購入処理の開始 →ack order001: ポイントを1000円分消費する →ack order001: クレジットカードで500円消費する →nack order001: 加盟店の売上金に1500円を付与する order001: 購入完了メールを送る
通常のPubSubであればnackしたメッセージのみ再配信されますが、
order001: クレジットカードで500円消費する ←再配信 order001: 加盟店の売上金に1500円を付与する order001: 購入完了メールを送る
ordering keyでは同じordering keyのすべてのメッセージが再配信されます。
order001: 商品1をカートに入れる ←再配信 order001: 商品2をカートに入れる ←再配信 order001: 購入処理の開始 ←再配信 order001: ポイントを1000円分消費する ←再配信 order001: クレジットカードで500円消費する ←再配信 order001: 加盟店の売上金に1500円を付与する order001: 購入完了メールを送る
したがって場合によっては非常に多くの重複メッセージが発生します。
一連のイベントがすべて冪等に処理されるように実装する必要がありますし、前述の直列処理の観点から再配信になった場合は1から始まるので非常に時間がかかります。
ちなみにドキュメントにこう明記されています。
順序指定キーを持つメッセージを再配信する場合は、確認済みメッセージを含む、同じ順序キーを持つメッセージもすべて再配信します。
ref: Ordering messages | Cloud Pub/Sub Documentation | Google Cloud
ordering keyの設定は最初しかできない
サブスクリプションでordering keyを有効化するのは作成時しかできません。
なので切り替えたい場合は新しくサブスクリプションを作成する必要があります。
まとめ
ordering keyは便利な機能ではありますが、PubSubが本来解決していた
といった要件を満たさなくなってしまう可能性があります。
なのでordering keyの粒度はできる限り細かく設計するようにしましょう。