Carpe Diem

備忘録

Bigtableの条件付き書き込みパターン

概要

BigtableにはCheckAndMutateRow APIがあり、書き込む際に条件をつけることで更新のロストなどを防ぐ仕組みが用意されています。

※更新のロストについては以下を参考にしてください

トランザクションの分離レベルで出てくる用語 - Carpe Diem

今回はよくある書き込みパターンとその実装例を紹介します。

環境

条件付き書き込み

GoのSDKでは条件付き書き込みにはNewCondMutationを使います。

NewCondMutation(条件, 一致した時の変更処理, 一致しなかった時の変更処理)

と言った形式で利用します。

条件にはFilter interfaceを使います。
主なFilterとしては

などがあり、複数組み合わせて条件を絞ることも可能です。

条件が一致しなかったら書き込み

データが存在しなかったら書き込み

RedisのSETNXにもあるような、存在しなかったら書き込むパターンです。
初回のみ書き込みOKというケースですね。

func (c *Client) writeOnce(rowKey, columnFamilyName, value string) error {
        tbl := c.cli.Open(tableName)

        mut := bigtable.NewMutation()
        mut.Set(columnFamilyName, columnQualifier, bigtable.Timestamp(0), []byte(value))

        filter := bigtable.ChainFilters(
                bigtable.FamilyFilter(columnFamilyName),
        )
        conditionalMutation := bigtable.NewCondMutation(filter, nil, mut)

        if err := tbl.Apply(context.Background(), rowKey, conditionalMutation); err != nil {
                return err
        }

        return nil
}

存在しなかったら、という条件を「書き込み先の列ファミリを持たなかったら」という条件で表現しています。

動作確認

複数回書き込んだとしても

func writeOnce(client *Client) error {
        rowKey := "write once"
        err := client.writeOnce(rowKey, columnFamilyName, "1")
        if err != nil {
                return err
        }
        err = client.writeOnce(rowKey, columnFamilyName, "2")
        if err != nil {
                return err
        }
        err = client.read(rowKey, columnFamilyName)
        if err != nil {
                return err
        }
        return nil
}

初回の値しか保存されません。

row: write once, column: data:value, value: 1, timestamp: 0

条件が一致したら書き込み

CAS (Compare And Swap)

Read-Modify-Writeパターンの際に楽観ロックとして使うパターンです。

func (c *Client) compareAndSwap(rowKey, columnFamilyName, old, value string) (bool, error) {
        tbl := c.cli.Open(tableName)

        mut := bigtable.NewMutation()
        mut.Set(columnFamilyName, columnQualifier, bigtable.Timestamp(0), []byte(value))

        filter := bigtable.ChainFilters(
                bigtable.FamilyFilter(columnFamilyName),
                bigtable.ColumnFilter(columnQualifier),
                bigtable.ValueFilter(old),
        )
        conditionalMutation := bigtable.NewCondMutation(filter, mut, nil)
        var match bool
        opt := bigtable.GetCondMutationResult(&match)

        if err := tbl.Apply(context.Background(), rowKey, conditionalMutation, opt); err != nil {
                return false, err
        }

        return match, nil
}

読み込みした時の値(old)が、書き込む際にも変わっていなかったら書き込みを実行します。
ValueFilterを使って値の一致をチェックしています。

動作確認

以下のように初回の値をまず取得して、比較する値とします。
リクエストが並行に飛んだようなケースですね。

func compareAndSwap(client *Client) error {
        rowKey := "compare and swap"
        err := client.write(rowKey, columnFamilyName, "first")
        if err != nil {
                return err
        }
        row, err := client.getRow(rowKey, columnFamilyName)
        if err != nil {
                return err
        }
        old := getValue(row, columnFamilyName, columnQualifier)

        match, err := client.compareAndSwap(rowKey, columnFamilyName, old, "second")
        if err != nil {
                return err
        }
        fmt.Println("second:", match)
        match, err = client.compareAndSwap(rowKey, columnFamilyName, old, "third")
        if err != nil {
                return err
        }
        fmt.Println("third:", match)
        err = client.read(rowKey, columnFamilyName)
        if err != nil {
                return err
        }
        return nil
}

結果としては先に書き込まれたsecondの方のみ反映されます。

second: true
third: false
row: compare and swap, column: data:value, value: second, timestamp: 0

thirdの方はsecondの書き込みによって値が変わってしまったので反映されなくなっています。

インクリメントなど一部のユースケースでは条件付き書き込みでなくReadModifyWriteRowを使う選択肢もあります。

バージョン管理

順序保証されないメッセージング基盤を経由してデータの更新を行う際に、古いデータによって上書きされないようにするパターンです。

v1v2v3といった感じで変更されたものの、メッセージング基盤によって書き込み順序が変わってもv3が最新として保持されるようにします。

func (c *Client) updateWhenNewVersion(rowKey, columnFamilyName, value string, version string) error {
        tbl := c.cli.Open(tableName)

        mut := bigtable.NewMutation()
        mut.Set(columnFamilyName, columnQualifier, bigtable.Timestamp(0), []byte(value))
        mut.Set(columnFamilyName, versionColumnQualifier, bigtable.Timestamp(0), []byte(version))

        // if no data, set initial value
        filter := bigtable.ChainFilters(
                bigtable.FamilyFilter(columnFamilyName),
                bigtable.ColumnFilter(versionColumnQualifier),
        )
        nxCond := bigtable.NewCondMutation(filter, nil, mut)
        var match bool
        opt := bigtable.GetCondMutationResult(&match)
        if err := tbl.Apply(context.Background(), rowKey, nxCond, opt); err != nil {
                return err
        }
        if !match {
                return nil
        }

        // if data exists and version is more than current version, updates value
        filter = bigtable.ChainFilters(
                bigtable.FamilyFilter(columnFamilyName),
                bigtable.ColumnFilter(versionColumnQualifier),
                bigtable.ValueRangeFilter(nil, []byte(version)),
        )
        conditionalMutation := bigtable.NewCondMutation(filter, mut, nil)

        if err := tbl.Apply(context.Background(), rowKey, conditionalMutation); err != nil {
                return err
        }

        return nil
}

ここではValueRangeFilterを使って「現在のデータが自分より新しくない」ことを条件としています。

動作確認

以下のように132の順にデータが流れたとします。

func updateWhenNewVersion(client *Client) error {
        rowKey := "update when new version"
        err := client.updateWhenNewVersion(rowKey, columnFamilyName, "first data", "1")
        if err != nil {
                return err
        }
        err = client.updateWhenNewVersion(rowKey, columnFamilyName, "third data", "3")
        if err != nil {
                return err
        }
        err = client.updateWhenNewVersion(rowKey, columnFamilyName, "second data", "2")
        if err != nil {
                return err
        }
        err = client.read(rowKey, columnFamilyName)
        if err != nil {
                return err
        }
        return nil
}

しかし結果としてはちゃんと3の値が保持されます。

row: update when new version, column: data:value, value: third data, timestamp: 0
row: update when new version, column: data:version, value: 3, timestamp: 0

注意

マルチクラスタでは利用できない

以下のユースケースでは、条件付き書き込みを使用できません。

  • 複数クラスタ ルーティングが含まれるアプリ プロファイルを使用している場合。
  • 複数の単一クラスタ アプリ プロファイルを使用していて、インスタンス内の他のクラスタの同じ行と列に書き込まれるデータと競合する可能性のある書き込みを送信する場合。単一クラスタ ルーティングでは、書き込みリクエストは単一クラスタに送信されてから複製されます。

ref: https://cloud.google.com/bigtable/docs/writes#not-conditional

とあるように、マルチクラスタではデフォルト設定だと条件付き書き込みを使用できません。

マルチクラスタ構成で条件付き書き込みを利用したい場合は、アプリプロファイルを利用して書き込みをシングルクラスタルーティングにする必要があります。

以下にまとめました。

christina04.hatenablog.com

シングルクラスタでもアトミック性が保証されないケースも

Bigtableの条件付き書き込みは、ドキュメント上では行に対するアトミック性を保証しています。

確認に続いて書き込みが行われるこのプロセスは、単一のアトミックな処理として完了します。

Writes  |  Cloud Bigtable Documentation  |  Google Cloud

しかしながら条件によってはアトミック性が保証されないケースもあるそうです。

For ConditionalRowMutation we use single conditional row mutation (should be atomic) to acquire lock and also single mutation to keep it alive. However, in some conditions, like the cluster being overloaded, or the request taking longer than expected, locking in Bigtable will become non-exclusive.

ref: Google Cloud Bigtable – Expect the Unexpected - inovex GmbH

サンプルコード

今回のサンプルコードはこちらです。

github.com

まとめ

Bigtableにおける条件付き書き込みのパターンと実装例を紹介しました。