Carpe Diem

備忘録

MongoDBのデータをElasticsearchにリアルタイム同期

概要

以前はMongoRiverを使う - Carpe Diemで紹介したようにRiverという機能を使って同期を実現させていました。
しかしながらElasticsearchがRiverを廃止することを決め、バージョン2.x以降は使うことができなくなりました。

そこで調べてみて挙がったのは以下です。

ツール 用途 デメリット
logstash 様々なデータをElasticsearchへ投入するツール。
公式が薦めてる
inputにmongoのプラグインがない
beats ログの解析に使える。ダッシュボードもすぐ用意できる mongoのデータを送るようなプラグインがない
embulk バルクローダー。fluentdのバッチ版 バッチ版というようにリアルタイム同期ではない
fluentd リアルタイムな同期システムを作れる データがすでにあるものに対しての全同期には不向き
transporter 全同期可能。リアルタイムなデータ同期もできる sharded mongo clusterに未対応
mongo-connector 同上 デフォルト設定だとログが少ない

以上の結果からmongo-connectorを使用することにしました。

mongo-connectorのいい点

  • mongodb labsが作っている
  • 初回の全同期が可能
  • oplogを追従するのでリアルタイムに同期できる
  • Sharded clusterに対応
  • DB、Collectionの指定が可能
  • Arrayに対応
  • プロセスが落ちてからもoplogの位置を見失わなければ再開可能

特に1つ目は大きくて、個人開発だとやはり保守に限界があるのでElasticsearchの更新に追従できなくなる事が多いです。
MongoDBを作ってるチームなのでそこは安心感があります。

環境

  • Ubuntu 14.04
  • MongoDB 3.2.9
  • Elasticsearch 2.3.5
  • mongo-connector 2.4.1

実装

準備

レプリカセットが必須なので、DockerでMongoDBのレプリカセットを構築 - Carpe Diemを参考に用意してください。

インストール

$ sudo pip install mongo-connector

Elasticsearchを使う場合はelastic2-doc-managerもインストールしてください

$ sudo pip install elastic2-doc-manager

config.json

コレクションの指定などをしたい場合はconfigで詳細を設定します。

  • mongodbはlocalhost:27017
  • elasticsearchはlocalhost:9200
  • mongoのtest_dbというDBのcol1col2というコレクションを、Elasticsearchのtest_indexというインデクスのtype1type2というタイプに変換したい

だと以下のようになります。

{
  "mainAddress": "localhost:27017",
  "oplogFile": "/var/log/mongo-connector/oplog.timestamp",
  "verbosity": 1,
  "logging": {
    "type": "file",
    "filename": "/var/log/mongo-connector/mongo-connector.log",
    "__rotationWhen": "D",
    "__rotationInterval": 1,
    "__rotationBackups": 10
  },
  "namespaces": {
    "include": ["test_db.col1", "test_db.col2"],
    "mapping": {
      "test_db.col1": "test_index.type1",
      "test_db.col2": "test_index.typ2"
    }
  },
  "docManagers": [
    {
      "docManager": "elastic2_doc_manager",
      "targetURL": "localhost:9200"
    }
  ]
}

詳細は以下です。

Configuration Options · yougov/mongo-connector Wiki · GitHub

設定例はgithubの例を見てください。

https://github.com/mongodb-labs/mongo-connector/blob/master/config.json

実行

$ mongo-connector -c config.json

で実行できます。フォアグラウンドで実行されるのでsupervisorなどで管理したほうが良いです。

注意点

以下実際に動かしてみて詰まったところをまとめます。

データソース(mainAddress)にmongosを指定することはできない、と書いてあるが実際はできる

shardで分けたmongod自体を指定して、と書いてありますが、ソースはmongosOK前提としたロジックであり、動かしてみるとmongos指定で動きます。


Elasticsearchのスペックが意外と必要

2000万件ほどのデータを同期してみましたが、デフォルトの設定だとc4.largeくらいないと途中で詰まって全然進まなくなります。


ELBは経由させないほうがいい

Elasticsearchの前に置いたELBを経由させると、ロードバランシングのせいでエラーが起きることがあります。

2016-08-25 08:00:26,733 [WARNING] elasticsearch:88 - POST http://elasticsearch.hoge.jp:80/_bulk [status:N/A request:10.011s]
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/connection/http_urllib3.py", line 94, in perform_request
    response = self.pool.urlopen(method, url, body, retries=False, headers=self.headers, **kw)
  File "/usr/local/lib/python2.7/dist-packages/urllib3/connectionpool.py", line 640, in urlopen
    _stacktrace=sys.exc_info()[2])
  File "/usr/local/lib/python2.7/dist-packages/urllib3/util/retry.py", line 238, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python2.7/dist-packages/urllib3/connectionpool.py", line 595, in urlopen
    chunked=chunked)
  File "/usr/local/lib/python2.7/dist-packages/urllib3/connectionpool.py", line 395, in _make_request
    self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
  File "/usr/local/lib/python2.7/dist-packages/urllib3/connectionpool.py", line 315, in _raise_timeout
    raise ReadTimeoutError(self, url, "Read timed out. (read timeout=%s)" % timeout_value)
ReadTimeoutError: HTTPConnectionPool(host=u'elasticsearch.hoge.jp', port=80): Read timed out. (read timeout=10)
2016-08-25 08:00:26,736 [CRITICAL] mongo_connector.oplog_manager:625 - Exception during collection dump
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/mongo_connector/oplog_manager.py", line 578, in do_dump
    upsert_all(dm)
  File "/usr/local/lib/python2.7/dist-packages/mongo_connector/oplog_manager.py", line 562, in upsert_all
    dm.bulk_upsert(docs_to_dump(namespace), mapped_ns, long_ts)
  File "/usr/local/lib/python2.7/dist-packages/mongo_connector/util.py", line 43, in wrapped
    reraise(new_type, exc_value, exc_tb)
  File "/usr/local/lib/python2.7/dist-packages/mongo_connector/util.py", line 32, in wrapped
    return f(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/mongo_connector/doc_managers/elastic2_doc_manager.py", line 229, in bulk_upsert
    for ok, resp in responses:
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 162, in streaming_bulk
    for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 91, in _process_bulk_chunk
    raise e
OperationFailed: ConnectionTimeout caused by - ReadTimeoutError(HTTPConnectionPool(host=u'elasticsearch.hoge.jp', port=80): Read timed out. (read timeout=10))

ですのでELBは経由させないほうが安定します。


max_docsが肥大化する

num_docsに比べてmax_docsが異常に多くなります。 f:id:quoll00:20160825005824p:plain

※Elasticsearchでシャード構成にしていないケースでは一致してました。


デフォルトだとログが非常に少ない

デフォルトは"verbosity": 0なのですが、この状態だとログがほとんど出ないためデバッグに非常に苦労します。なので最低でも1にした方がいいです。


InvalidBSON: 'utf8' codec can't decode byte

DBのドキュメントに変なものが混ざっているとこのようなエラーが出てきます。

2016-08-25 23:15:09,008 [CRITICAL] mongo_connector.oplog_manager:625 - Exception during collection dump
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/mongo_connector/oplog_manager.py", line 578, in do_dump
    upsert_all(dm)
  File "/usr/local/lib/python2.7/dist-packages/mongo_connector/oplog_manager.py", line 562, in upsert_all
    dm.bulk_upsert(docs_to_dump(namespace), mapped_ns, long_ts)
  File "/usr/local/lib/python2.7/dist-packages/mongo_connector/util.py", line 32, in wrapped
    return f(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/mongo_connector/doc_managers/elastic2_doc_manager.py", line 229, in bulk_upsert
    for ok, resp in responses:
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 161, in streaming_bulk
    for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer):
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 55, in _chunk_actions
    for action, data in actions:
  File "/usr/local/lib/python2.7/dist-packages/mongo_connector/doc_managers/elastic2_doc_manager.py", line 195, in docs_to_upsert
    for doc in docs:
  File "/usr/local/lib/python2.7/dist-packages/mongo_connector/oplog_manager.py", line 525, in docs_to_dump
    for doc in cursor:
  File "/usr/local/lib/python2.7/dist-packages/pymongo/cursor.py", line 1090, in next
    if len(self.__data) or self._refresh():
  File "/usr/local/lib/python2.7/dist-packages/pymongo/cursor.py", line 1032, in _refresh
    self.__max_await_time_ms))
  File "/usr/local/lib/python2.7/dist-packages/pymongo/cursor.py", line 903, in __send_message
    codec_options=self.__codec_options)
  File "/usr/local/lib/python2.7/dist-packages/pymongo/helpers.py", line 142, in _unpack_response
    "data": bson.decode_all(response[20:], codec_options)}
  File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 815, in decode_all
    reraise(InvalidBSON, exc_value, exc_tb)
  File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 807, in decode_all
    codec_options))
  File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 344, in _elements_to_dict
    for key, value in _iterate_elements(data, position, obj_end, opts):
  File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 337, in _iterate_elements
    (key, value, position) = _element_to_dict(data, position, obj_end, opts)
  File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 326, in _element_to_dict
    element_name)
  File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 178, in _get_array
    data, position, obj_end, opts, element_name)
  File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 147, in _get_object
    obj = _elements_to_dict(data, position + 4, end, opts)
  File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 344, in _elements_to_dict
    for key, value in _iterate_elements(data, position, obj_end, opts):
  File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 337, in _iterate_elements
    (key, value, position) = _element_to_dict(data, position, obj_end, opts)
  File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 326, in _element_to_dict
    element_name)
  File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 132, in _get_string
    opts.unicode_decode_error_handler, True)[0], end + 1
InvalidBSON: 'utf8' codec can't decode byte 0x8f in position 0: invalid start byte

対応方法としてはデータをきれいにしないといけないので一度該当のコレクションをmongoexportし、mongoimportでupsertしてください

$ mongoexport --host mongodb.hoge.jp -d test_db -c test_collection -o data.json
$ mongoimport --host mongodb.hoge.jp -d test_db -c test_collection data.json —upsert

もしくは下記のissueで行っているように

github.com

  "mainAddress": "mongodb://localhost:27017/?unicode_decode_error_handler=ignore",

とすると無視できます。


ReadTimeoutError

通常だと出ないらしいですが、ドキュメントが非常に多いと以下のようなtimeout(デフォルト10秒)によるエラーが起きます。

2016-12-12 14:29:06,226 [CRITICAL] mongo_connector.oplog_manager:630 - Exception during collection dump
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/mongo_connector/oplog_manager.py", line 583, in do_dump
    upsert_all(dm)
  File "/usr/local/lib/python2.7/dist-packages/mongo_connector/oplog_manager.py", line 567, in upsert_all
    dm.bulk_upsert(docs_to_dump(namespace), mapped_ns, long_ts)
  File "/usr/local/lib/python2.7/dist-packages/mongo_connector/util.py", line 43, in wrapped
    reraise(new_type, exc_value, exc_tb)
  File "/usr/local/lib/python2.7/dist-packages/mongo_connector/util.py", line 32, in wrapped
    return f(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/mongo_connector/doc_managers/elastic2_doc_manager.py", line 229, in bulk_upsert
    for ok, resp in responses:
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 162, in streaming_bulk
    for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 91, in _process_bulk_chunk
    raise e
OperationFailed: ConnectionTimeout caused by - ReadTimeoutError(HTTPConnectionPool(host=u'elasticsearch.example.com', port=9200): Read timed out. (read timeout=10))

github.com

この場合はtimeoutを延長すればOKです。

  "docManagers": [
    {
      "docManager": "elastic2_doc_manager",
      "targetURL": "elasticsearch.example.com",
      "args": {
        "clientOptions": {
          "timeout": 100
        }
      }
    }
  ]

InvalidBSON: Date value out of range

github.com

oplogのtimestampがおかしくなって、pymongoの範囲(0~9999年)を超える時に起きます。

対処法としては、pymongoにC extensionを導入すれば良いので、

Install PyMongo with C Extensions — MongoDB Ops Manager 1.3

こちらに沿ってpython-devをインストールした後にpymongoを再インストールしてください。


データ同期が段々遅れる

[INFO] mongo_connector.oplog_manager:61 - OplogThread for replica set 'production_1' is 25744 seconds behind the oplog.
[INFO] mongo_connector.oplog_manager:61 - OplogThread for replica set 'production_0' is 25864 seconds behind the oplog.
[INFO] mongo_connector.oplog_manager:61 - OplogThread for replica set 'production_2' is 23896 seconds behind the oplog.
[INFO] mongo_connector.oplog_manager:61 - OplogThread for replica set 'production_3' is 113880 seconds behind the oplog.
[INFO] mongo_connector.oplog_manager:61 - OplogThread for replica set 'production_4' is 22731 seconds 

こんな感じにbehind the oplogがどんどん増えていく問題が起きました。

対応としてはbatchSizeをデフォルトの-1(上限なし)ではなく、500~2000に調整したところbehindがなくなりました。

sizeを調整したところoplog.timestampが頻繁に更新されるようになったので、おそらくデフォルトの上限なしだと処理数が多すぎてoplog.timestampの更新がなかなか進まず、どんどんbehindしてしまうのではと考えています。


readPreferenceを設定したい

Mongoからはreadしかしないため、できればレプリカから読み込みたいですね。
その場合は以下のように

  "mainAddress": "mongodb://localhost:27017/?readPreference=secondaryPreferred",

を設定すると反映できます。

ソース