Carpe Diem

備忘録

etcdを使った分散ロック

概要

前回のRedisを使った分散ロックでは、正確なロックを取るためにはZookeeperやetcdを使うと良い、とまとめていました。

なので今回はetcdを用いて分散ロックを実現します。

環境

  • etcd v3.4.15
  • pkg.go.dev/go.etcd.io/etcd v3.5.0
  • go 1.16.0

事前知識

分散ロックに必要なもの

分散ロックマネージャには以下の機能が必要です。

  • 自動リース機能
  • CAS
  • フェンシングトークンを発行する機能

1つ1つ説明していきます。

自動リース機能

ロックを取得したクライアントが何らかの要因で落ちた時、そのロックが解放されないままだと他のクライアントは永遠にロックを取得することができません。

したがって、ロックを取得したとしても、自動的に解放されるようTTLを付ける必要があります。

またこのTTLの管理はクライアント側でなく分散ロックを提供する側の機能である必要があります。
というのもクライアント側で生成したtimestampを使ったりすると、クライアント間で内部時計が揃っていないと

  • クライアントAではロックが有効な期間中
  • クライアントBでは内部時計が↑より未来になっており、ロックの有効期限を過ぎた

といった状態が発生するからです。

etcdの場合

v2では直接TTLを、v3ではLeaseというオブジェクトをセットすることでTTL管理するようになっています。

# grant a lease with 60 second TTL
$ etcdctl lease grant 60
lease 32695410dcc0ca06 granted with TTL(60s)

# attach key foo to lease 32695410dcc0ca06
$ etcdctl put --lease=32695410dcc0ca06 foo bar
OK

etcd docs | Interacting with etcd

CAS

CAS(Compare And Swap/Compare And Set)は、既存の値が期待する値と一致している時、アトミックに新しい値を書き込む処理です。値が一致しない時はエラーやfalseが返ります。

なので

  1. ロックの値が無ければ(==null)ロック(="1")する
  2. ロック中(=="1")であればアンロック(="0")する
  3. アンロック中(=="0")であればロック(="1")する

といった処理をアトミックに処理できます。

これにより例えば「他のクライアントがロック中なのにロックしようとすると、a, cのcompareを満たさないのでfalseになる」という処理が可能になります。

ここで重要なのはアトミックという点で、単純にプログラムでread-modify-writeしようとすると

  1. readする
  2. 条件比較する
  3. 条件を満たしていたのでデータを修正
  4. 並行で走っている別の処理が先にwriteしてしまう
  5. 修正したデータをwrite

といった問題が起きます。いわゆる更新のロスト問題(4での更新が消える)です。

etcdの場合

Lock()関数を使いますが、内部の実装は

cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)
// fetch current holder to complete uncontended path with only one RPC
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
    return nil, err
}

ref: etcd/mutex.go at f4001630d9ee47d54613a34ab5238feb88c89f6d · etcd-io/etcd · GitHub

のようにCASになっています。

フェンシングトークンを発行する機能

先にロックを取得したクライアントがGCなどでポーズし、TTL後に処理を続行してしまうケースの対応です。

f:id:quoll00:20210306094202p:plain

ref: How to do distributed locking — Martin Kleppmann’s blog

対策としてこのように単調増加なフェンシングトークンもストレージ側に保存するようにしておきます。 そしてストレージにwriteする際にこの値が以前の値より大きいかどうかの条件文を入れるようにします。

etcdの場合

keyをセットした際にレスポンスで返るrevisionが単調増加なのでそれを使います。

実装

実際の分散ロックのコードは以下のようになります。

func Lock(ctx context.Context, cli *clientv3.Client, key string) (int64, func(context.Context) error, error) {
        ss, err := concurrency.NewSession(cli, concurrency.WithTTL(lockTTL))
        if err != nil {
                return 0, nil, err
        }
        m := concurrency.NewMutex(ss, key)
        // Orphan ends the refresh for the session lease.
        ss.Orphan()

        // TryLock returns immediately if lock is held by another session.
        err = m.TryLock(ctx)
        if err != nil {
                return 0, nil, err
        }

        return m.Header().Revision, func(ctx context.Context) error {
                return m.Unlock(ctx)
        }, nil
}

実装上ポイント

ポイントは以下です。

  • SessionにTTLをセットする
  • Sessionはロックするたびに生成する
    • 同じSessionだと排他的にならない。revisionも同一
  • Orphan()でTTL(リース期限)が自動延長されないようにする
    • これをしない場合、TTLが過ぎてもkeepaliveが続く間ロックが取得できない
  • TryLock()はロック取得できなければ即時エラーが返る
    • ロック取得できるまでブロックしたければLock()を使う
  • Header().Revisionでフェンシングトークンを取得

使い方

使う際は以下のようにします。

func main() {
        cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
        if err != nil {
                log.Fatal(err)
        }
        defer cli.Close()

        rev, unlocker, err := Lock(context.Background(), cli, lockResource)
        if err != nil {
                log.Fatal(err)
        }
        // unlock
        defer func() {
                if err := unlocker(context.Background()); err != nil {
                        log.Fatal(err)
                }
                log.Println("unlocked")
        }()
        log.Println("acquired lock rev:", rev)

        // フェンシングトークン(rev)を使ったwrite処理
}

動作検証

同じリソースに対して2つの処理がロックをかけようとしてみます。

正常系(ロック取得を待たない)

他のトランザクションによってロックが取得されてた場合即時エラーを返すパターンです。

func Lock(ctx context.Context, cli *clientv3.Client, key string) (int64, func(context.Context) error, error) {
        ss, err := concurrency.NewSession(cli, concurrency.WithTTL(lockTTL))
        if err != nil {
                return 0, nil, err
        }
        m := concurrency.NewMutex(ss, key)
        // Orphan ends the refresh for the session lease.
        ss.Orphan()

        // TryLock returns immediately if lock is held by another session.
        err = m.TryLock(ctx)  // here
        if err != nil {
                return 0, nil, err
        }

        return m.Header().Revision, func(ctx context.Context) error {
                return m.Unlock(ctx)
        }, nil
}

transaction1

まだロックがかかっていないのでロック取得できます。

$ go run main.go
2021/03/06 09:32:59 acquired lock rev: 5
2021/03/06 09:33:04 unlocked

transaction2

transaction1でロックが取得されているので、こちらはコケてしまいます。

$ go run main.go
2021/03/06 09:33:02 mutex: Locked by another session
exit status 1

正常系(ロック取得を待つ)

先程のコードではTryLock()でしたが、待ちたい場合はLock()を使います。

func Lock(ctx context.Context, cli *clientv3.Client, key string) (int64, func(context.Context) error, error) {
        ss, err := concurrency.NewSession(cli, concurrency.WithTTL(lockTTL))
        if err != nil {
                return 0, nil, err
        }
        m := concurrency.NewMutex(ss, key)
        // Orphan ends the refresh for the session lease.
        ss.Orphan()

        // acquire lock for ss
        err = m.Lock(ctx)  // here
        if err != nil {
                return 0, nil, err
        }

        return m.Header().Revision, func(ctx context.Context) error {
                return m.Unlock(ctx)
        }, nil
}

transaction1

まだロックがかかっていないのでロック取得できます。

$ go run main.go
2021/03/06 09:33:50 acquired lock rev: 9
2021/03/06 09:33:55 unlocked

transaction2

実行後はしばらく待機していますが、transaction1の処理が終了後こちらもロックが取得できました。

$ go run main.go
2021/03/06 09:33:55 acquired lock rev: 11
2021/03/06 09:34:00 unlocked

またrevisionが常に増加していることが確認できます。

先にロックを取った方がアンロック前に死んだケース

自動リースの検証です。
ロックを取得したクライアントが何らかの要因で落ちたケースを想定します。

transaction1

ロックを取得後プロセスを落としてみます。TTL10secです。

$ go run main.go
2021/03/06 09:37:23 acquired lock rev: 19
^Csignal: interrupt

09:37:23にロック取得しています。

transaction2

TTLでロックがリースされるとロック取得できるようになります。

$ go run main.go
2021/03/06 09:37:33 acquired lock rev: 21
2021/03/06 09:37:38 unlocked

TTLである10秒後09:37:33にロック取得できています。

成果物

今回使用したコードはこちら↓
docker-composeのyamlも入っているので簡単に検証できます。

github.com

まとめ

分散ロックマネージャにとって必要な機能の説明と、それを満たしているetcdを用いた実際の分散ロックのコードを紹介しました。

参考

分散システムを考える上で以下の書籍がとても勉強になります。

www.oreilly.co.jp

www.oreilly.co.jp

その他リンク