Carpe Diem

備忘録

Cloud PubSubのOrdering Keyで考慮すること

概要

以下のような購入時のドメインイベントをメッセージング基盤に非同期で渡してから順番通りに実行したい、となった時に便利なのが順序保証をしてくれるPubSubのordering keyです。

  1. 購入処理
  2. 各マイクロサービスへの状態変更リクエス
  3. 行動ログ追記
  4. 購入完了メールの送信

ただordering keyを使う上での注意点はあまり見かけなかったのでこちらにまとめようと思います。

結論から言うとordering keyの粒度はなるべく細かくしないとスケールしないという話です。

環境

  • cloud.google.com/go/pubsub v1.19.0

考慮すべき点

同じordering keyのメッセージは同じパーティションにpublishされる

Cloud PubSubはメッセージング基盤の概念であるパーティションKinesisでいうシャード)をカプセル化しており、ユーザが意識しなくて済むようにしています。

しかしながら実際にはパーティションは存在し、同じorderingKeyのメッセージは同じパーティションにpublishされるとドキュメントに書いてあります。

f:id:quoll00:20220330063507p:plain

したがってordering keyの粒度が粗いとhot shardが発生することになります。

f:id:quoll00:20220330063716p:plain

そのため以下のようなリソース制限があります。

メッセージに順序指定キーが含まれている場合、パブリッシャーの最大スループットは各順序指定キーあたり 1 MBps

ref: Pub/Sub quotas and limits  |  Cloud Pub/Sub Documentation  |  Google Cloud

Publish時になんだか詰まるなぁとなった場合はこのケースを疑ってみると良いでしょう。

Publishに一度失敗すると以降全て失敗する

以下の順にPublishして、

  1. message001をpublish
  2. message002をpublish
  3. message003をpublish

最初のmessage001が失敗した場合、そのまま後続のpublishが成功してしまうと順序保証がされなくなってしまいます。

かといって全てシーケンシャルに実行し、コケたら再実行するロジックだとPublisherのスケーラビリティがなくなります。

そこでPubSubのSDKは順序保証を保ったままスケーラビリティも保証するため、非同期に並行してPublishはできるものの以下の制約があります。

  • Publishに失敗するとSDKのPublisherの内部キューにある残りの全てのメッセージも失敗する
  • 後続のPublishリクエストも失敗する

※GoのSDKだとこの辺の処理

そうなると一度失敗するとずっと失敗してしまうことになるため、その対処のためのメソッドResumePublish()SDKで提供されています。

    t := client.Topic(topicID)
    t.EnableMessageOrdering = true
    key := "some-ordering-key"

    res := t.Publish(ctx, &pubsub.Message{
        Data:        []byte("some-message"),
        OrderingKey: key,
    })
    _, err = res.Get(ctx)
    if err != nil {
        // Resume publish on an ordering key that has had unrecoverable errors.
        // After such an error publishes with this ordering key will fail
        // until this method is called.
        t.ResumePublish(key)
    }

ref: Resume publishing with ordering keys  |  Cloud Pub/Sub Documentation  |  Google Cloud

なのでエラーが発生したら必ず呼ぶようにしましょう。

Subscriberはordering keyのグループ単位で割り当てられる

メッセージング基盤は通常パーティションの数とSubscriberの数の最小値がSubscribeのスケール限界になります。

f:id:quoll00:20220330065052p:plain

つまりこのパーティション数の場合、Subscriberを増やしても処理はスケールしません。

f:id:quoll00:20220330065141p:plain

GCPのPubSubはその点非常に洗練されており、

  • パーティション数を意識せず、Subscriberを増やした分スケールする
  • 処理の重いメッセージによるhead-of-lineブロッキングが発生しない
  • hot shardを発生させない

という要件を満たしていますが、ordering keyでは制約が発生します

GCPのPubSubテックリードのブログ記事に

They assign subscribers to groups of ordering keys that are more fine-grained than a partition.

ref: Google Cloud Pub/Sub Ordered Delivery | by Kamal Aboul-Hosn | Google Cloud - Community | Medium

とあるようにPubSubではパーティションより粒度の細かいordering keyのグループ単位でSubscriberを割り当てます。

つまりordering keyの数とSubscriberの数の最小がSubscribeのスケール限界になります。

f:id:quoll00:20220330070501p:plain

例えば上記のように4種類のordering keyを使い回す、といった設計にしてしまうと、Subscriberをいくら増やしても4並行以上はスケールしません。

※同じTopicにordering keyのないメッセージが含まれてる場合であれば他のSubscriberがそれらのメッセージは分散して処理してくれます

同じorderingKeyは直列に実行される

Subscriber側のGo SDK実装を確認すると

  • ordering keyがない場合はすべてgoroutineで並行実行される
  • ordering keyがある場合は同じordering keyは直列実行される

であったため、ordering keyの粒度が粗いと実行に時間がかかることが分かります。

ordering keyではない場合

以下のようにworker(MaxOutstandingMessages)によるセマフォはありますが、メッセージに対するcallbackは並行実行されることが分かります。

func (s *ReceiveScheduler) Add(key string, item interface{}, handle func(item interface{})) error {
    if key == "" {
        // Spawn a worker.
        s.workers <- struct{}{}
        go func() {
            // Unordered keys can be handled immediately.
            handle(item)
            <-s.workers
        }()
        return nil
    }

ref: https://github.com/googleapis/google-cloud-go/blob/pubsub/v1.19.0/pubsub/internal/scheduler/receive_scheduler.go#L70-L79

ordering keyの場合

一方ordering keyがある場合は内部でキューを持っており、

  • メッセージ及びそのcallback処理をキューに追加する
  • キューを順次実行する

の2つはgoroutineで並行処理されてますが、キュー自体は直列で実行されるため時間がかかります。

   s.mu.Lock()
    _, ok := s.m[key]
    s.m[key] = append(s.m[key], func() {
        handle(item)
    })
    s.mu.Unlock()
    if ok {
        // Someone is already working on this key.
        return nil
    }

    // Spawn a worker.
    s.workers <- struct{}{}

    go func() {
        defer func() { <-s.workers }()

        // Key-Loop: loop through the available items in the key's queue.
        for {
            s.mu.Lock()
            if len(s.m[key]) == 0 {
                // We're done processing items - the queue is empty. Delete
                // the queue from the map and free up the worker.
                delete(s.m, key)
                s.mu.Unlock()
                return
            }
            // Pop an item from the queue.
            next := s.m[key][0]
            s.m[key] = s.m[key][1:]
            s.mu.Unlock()

            next() // Handle next in queue.
        }
    }()

ref: https://github.com/googleapis/google-cloud-go/blob/pubsub/v1.19.0/pubsub/internal/scheduler/receive_scheduler.go#L85-L119

したがって処理の重いメッセージによるhead-of-lineブロッキングが発生することになります。

subscribe時に同じordering keyの処理の途中でコケると1つ目から再配信される

例えば以下のような1500円かかった購入イベントをordering keyで順序保証してメッセージを処理するとしましょう。

order001: 商品1をカートに入れる
order001: 商品2をカートに入れる
order001: 購入処理の開始
order001: ポイントを1000円分消費する
order001: クレジットカードで500円消費する
order001: 加盟店の売上金に1500円を付与する
order001: 購入完了メールを送る

ここで途中のクレジットカード処理でコケたとしましょう。

order001: 商品1をカートに入れる →ack
order001: 商品2をカートに入れる →ack
order001: 購入処理の開始 →ack
order001: ポイントを1000円分消費する →ack
order001: クレジットカードで500円消費する →nack
order001: 加盟店の売上金に1500円を付与する
order001: 購入完了メールを送る

通常のPubSubであればnackしたメッセージのみ再配信されますが、

order001: クレジットカードで500円消費する ←再配信
order001: 加盟店の売上金に1500円を付与する
order001: 購入完了メールを送る

ordering keyでは同じordering keyのすべてのメッセージが再配信されます。

order001: 商品1をカートに入れる ←再配信
order001: 商品2をカートに入れる ←再配信
order001: 購入処理の開始 ←再配信
order001: ポイントを1000円分消費する ←再配信
order001: クレジットカードで500円消費する ←再配信
order001: 加盟店の売上金に1500円を付与する
order001: 購入完了メールを送る

したがって場合によっては非常に多くの重複メッセージが発生します。
一連のイベントがすべて冪等に処理されるように実装する必要がありますし、前述の直列処理の観点から再配信になった場合は1から始まるので非常に時間がかかります

ちなみにドキュメントにこう明記されています。

順序指定キーを持つメッセージを再配信する場合は、確認済みメッセージを含む、同じ順序キーを持つメッセージもすべて再配信します。

ref: Ordering messages  |  Cloud Pub/Sub Documentation  |  Google Cloud

ordering keyの設定は最初しかできない

サブスクリプションでordering keyを有効化するのは作成時しかできません。

f:id:quoll00:20220405162753p:plain

なので切り替えたい場合は新しくサブスクリプションを作成する必要があります。

まとめ

ordering keyは便利な機能ではありますが、PubSubが本来解決していた

  • パーティション数を意識せず、Subscriberを増やした分スケールする
  • 処理の重いメッセージによるhead-of-lineブロッキングが発生しない
  • hot shardを発生させない

といった要件を満たさなくなってしまう可能性があります。

なのでordering keyの粒度はできる限り細かく設計するようにしましょう。

参考