概要
以前はMongoRiverを使う - Carpe Diemで紹介したようにRiver
という機能を使って同期を実現させていました。
しかしながらElasticsearchがRiverを廃止することを決め、バージョン2.x以降は使うことができなくなりました。
そこで調べてみて挙がったのは以下です。
ツール | 用途 | デメリット |
---|---|---|
logstash | 様々なデータをElasticsearchへ投入するツール。 公式が薦めてる |
inputにmongoのプラグインがない |
beats | ログの解析に使える。ダッシュボードもすぐ用意できる | mongoのデータを送るようなプラグインがない |
embulk | バルクローダー。fluentdのバッチ版 | バッチ版というようにリアルタイム同期ではない |
fluentd | リアルタイムな同期システムを作れる | データがすでにあるものに対しての全同期には不向き |
transporter | 全同期可能。リアルタイムなデータ同期もできる | sharded mongo clusterに未対応 |
mongo-connector | 同上 | デフォルト設定だとログが少ない |
以上の結果からmongo-connector
を使用することにしました。
mongo-connectorのいい点
- mongodb labsが作っている
- 初回の全同期が可能
- oplogを追従するのでリアルタイムに同期できる
- Sharded clusterに対応
- DB、Collectionの指定が可能
- Arrayに対応
- プロセスが落ちてからもoplogの位置を見失わなければ再開可能
特に1つ目は大きくて、個人開発だとやはり保守に限界があるのでElasticsearchの更新に追従できなくなる事が多いです。
MongoDBを作ってるチームなのでそこは安心感があります。
環境
- Ubuntu 14.04
- MongoDB 3.2.9
- Elasticsearch 2.3.5
- mongo-connector 2.4.1
実装
準備
レプリカセットが必須なので、DockerでMongoDBのレプリカセットを構築 - Carpe Diemを参考に用意してください。
インストール
$ sudo pip install mongo-connector
Elasticsearchを使う場合はelastic2-doc-manager
もインストールしてください
$ sudo pip install elastic2-doc-manager
config.json
コレクションの指定などをしたい場合はconfigで詳細を設定します。
- mongodbは
localhost:27017
- elasticsearchは
localhost:9200
- mongoの
test_db
というDBのcol1
、col2
というコレクションを、Elasticsearchのtest_index
というインデクスのtype1
、type2
というタイプに変換したい
だと以下のようになります。
{ "mainAddress": "localhost:27017", "oplogFile": "/var/log/mongo-connector/oplog.timestamp", "verbosity": 1, "logging": { "type": "file", "filename": "/var/log/mongo-connector/mongo-connector.log", "__rotationWhen": "D", "__rotationInterval": 1, "__rotationBackups": 10 }, "namespaces": { "include": ["test_db.col1", "test_db.col2"], "mapping": { "test_db.col1": "test_index.type1", "test_db.col2": "test_index.typ2" } }, "docManagers": [ { "docManager": "elastic2_doc_manager", "targetURL": "localhost:9200" } ] }
詳細は以下です。
Configuration Options · yougov/mongo-connector Wiki · GitHub
設定例はgithubの例を見てください。
https://github.com/mongodb-labs/mongo-connector/blob/master/config.json
実行
$ mongo-connector -c config.json
で実行できます。フォアグラウンドで実行されるのでsupervisorなどで管理したほうが良いです。
注意点
以下実際に動かしてみて詰まったところをまとめます。
データソース(mainAddress)にmongosを指定することはできない、と書いてあるが実際はできる
shardで分けたmongod自体を指定して、と書いてありますが、ソースはmongosOK前提としたロジックであり、動かしてみるとmongos指定で動きます。
Elasticsearchのスペックが意外と必要
2000万件ほどのデータを同期してみましたが、デフォルトの設定だとc4.large
くらいないと途中で詰まって全然進まなくなります。
ELBは経由させないほうがいい
Elasticsearchの前に置いたELBを経由させると、ロードバランシングのせいでエラーが起きることがあります。
2016-08-25 08:00:26,733 [WARNING] elasticsearch:88 - POST http://elasticsearch.hoge.jp:80/_bulk [status:N/A request:10.011s] Traceback (most recent call last): File "/usr/local/lib/python2.7/dist-packages/elasticsearch/connection/http_urllib3.py", line 94, in perform_request response = self.pool.urlopen(method, url, body, retries=False, headers=self.headers, **kw) File "/usr/local/lib/python2.7/dist-packages/urllib3/connectionpool.py", line 640, in urlopen _stacktrace=sys.exc_info()[2]) File "/usr/local/lib/python2.7/dist-packages/urllib3/util/retry.py", line 238, in increment raise six.reraise(type(error), error, _stacktrace) File "/usr/local/lib/python2.7/dist-packages/urllib3/connectionpool.py", line 595, in urlopen chunked=chunked) File "/usr/local/lib/python2.7/dist-packages/urllib3/connectionpool.py", line 395, in _make_request self._raise_timeout(err=e, url=url, timeout_value=read_timeout) File "/usr/local/lib/python2.7/dist-packages/urllib3/connectionpool.py", line 315, in _raise_timeout raise ReadTimeoutError(self, url, "Read timed out. (read timeout=%s)" % timeout_value) ReadTimeoutError: HTTPConnectionPool(host=u'elasticsearch.hoge.jp', port=80): Read timed out. (read timeout=10) 2016-08-25 08:00:26,736 [CRITICAL] mongo_connector.oplog_manager:625 - Exception during collection dump Traceback (most recent call last): File "/usr/local/lib/python2.7/dist-packages/mongo_connector/oplog_manager.py", line 578, in do_dump upsert_all(dm) File "/usr/local/lib/python2.7/dist-packages/mongo_connector/oplog_manager.py", line 562, in upsert_all dm.bulk_upsert(docs_to_dump(namespace), mapped_ns, long_ts) File "/usr/local/lib/python2.7/dist-packages/mongo_connector/util.py", line 43, in wrapped reraise(new_type, exc_value, exc_tb) File "/usr/local/lib/python2.7/dist-packages/mongo_connector/util.py", line 32, in wrapped return f(*args, **kwargs) File "/usr/local/lib/python2.7/dist-packages/mongo_connector/doc_managers/elastic2_doc_manager.py", line 229, in bulk_upsert for ok, resp in responses: File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 162, in streaming_bulk for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs): File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 91, in _process_bulk_chunk raise e OperationFailed: ConnectionTimeout caused by - ReadTimeoutError(HTTPConnectionPool(host=u'elasticsearch.hoge.jp', port=80): Read timed out. (read timeout=10))
ですのでELBは経由させないほうが安定します。
max_docs
が肥大化する
num_docs
に比べてmax_docs
が異常に多くなります。
※Elasticsearchでシャード構成にしていないケースでは一致してました。
デフォルトだとログが非常に少ない
デフォルトは"verbosity": 0
なのですが、この状態だとログがほとんど出ないためデバッグに非常に苦労します。なので最低でも1にした方がいいです。
InvalidBSON: 'utf8' codec can't decode byte
DBのドキュメントに変なものが混ざっているとこのようなエラーが出てきます。
2016-08-25 23:15:09,008 [CRITICAL] mongo_connector.oplog_manager:625 - Exception during collection dump Traceback (most recent call last): File "/usr/local/lib/python2.7/dist-packages/mongo_connector/oplog_manager.py", line 578, in do_dump upsert_all(dm) File "/usr/local/lib/python2.7/dist-packages/mongo_connector/oplog_manager.py", line 562, in upsert_all dm.bulk_upsert(docs_to_dump(namespace), mapped_ns, long_ts) File "/usr/local/lib/python2.7/dist-packages/mongo_connector/util.py", line 32, in wrapped return f(*args, **kwargs) File "/usr/local/lib/python2.7/dist-packages/mongo_connector/doc_managers/elastic2_doc_manager.py", line 229, in bulk_upsert for ok, resp in responses: File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 161, in streaming_bulk for bulk_actions in _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer): File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 55, in _chunk_actions for action, data in actions: File "/usr/local/lib/python2.7/dist-packages/mongo_connector/doc_managers/elastic2_doc_manager.py", line 195, in docs_to_upsert for doc in docs: File "/usr/local/lib/python2.7/dist-packages/mongo_connector/oplog_manager.py", line 525, in docs_to_dump for doc in cursor: File "/usr/local/lib/python2.7/dist-packages/pymongo/cursor.py", line 1090, in next if len(self.__data) or self._refresh(): File "/usr/local/lib/python2.7/dist-packages/pymongo/cursor.py", line 1032, in _refresh self.__max_await_time_ms)) File "/usr/local/lib/python2.7/dist-packages/pymongo/cursor.py", line 903, in __send_message codec_options=self.__codec_options) File "/usr/local/lib/python2.7/dist-packages/pymongo/helpers.py", line 142, in _unpack_response "data": bson.decode_all(response[20:], codec_options)} File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 815, in decode_all reraise(InvalidBSON, exc_value, exc_tb) File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 807, in decode_all codec_options)) File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 344, in _elements_to_dict for key, value in _iterate_elements(data, position, obj_end, opts): File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 337, in _iterate_elements (key, value, position) = _element_to_dict(data, position, obj_end, opts) File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 326, in _element_to_dict element_name) File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 178, in _get_array data, position, obj_end, opts, element_name) File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 147, in _get_object obj = _elements_to_dict(data, position + 4, end, opts) File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 344, in _elements_to_dict for key, value in _iterate_elements(data, position, obj_end, opts): File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 337, in _iterate_elements (key, value, position) = _element_to_dict(data, position, obj_end, opts) File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 326, in _element_to_dict element_name) File "/usr/local/lib/python2.7/dist-packages/bson/__init__.py", line 132, in _get_string opts.unicode_decode_error_handler, True)[0], end + 1 InvalidBSON: 'utf8' codec can't decode byte 0x8f in position 0: invalid start byte
対応方法としてはデータをきれいにしないといけないので一度該当のコレクションをmongoexportし、mongoimportでupsertしてください
$ mongoexport --host mongodb.hoge.jp -d test_db -c test_collection -o data.json $ mongoimport --host mongodb.hoge.jp -d test_db -c test_collection data.json —upsert
もしくは下記のissueで行っているように
"mainAddress": "mongodb://localhost:27017/?unicode_decode_error_handler=ignore",
とすると無視できます。
ReadTimeoutError
通常だと出ないらしいですが、ドキュメントが非常に多いと以下のようなtimeout(デフォルト10秒)によるエラーが起きます。
2016-12-12 14:29:06,226 [CRITICAL] mongo_connector.oplog_manager:630 - Exception during collection dump Traceback (most recent call last): File "/usr/local/lib/python2.7/dist-packages/mongo_connector/oplog_manager.py", line 583, in do_dump upsert_all(dm) File "/usr/local/lib/python2.7/dist-packages/mongo_connector/oplog_manager.py", line 567, in upsert_all dm.bulk_upsert(docs_to_dump(namespace), mapped_ns, long_ts) File "/usr/local/lib/python2.7/dist-packages/mongo_connector/util.py", line 43, in wrapped reraise(new_type, exc_value, exc_tb) File "/usr/local/lib/python2.7/dist-packages/mongo_connector/util.py", line 32, in wrapped return f(*args, **kwargs) File "/usr/local/lib/python2.7/dist-packages/mongo_connector/doc_managers/elastic2_doc_manager.py", line 229, in bulk_upsert for ok, resp in responses: File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 162, in streaming_bulk for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs): File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 91, in _process_bulk_chunk raise e OperationFailed: ConnectionTimeout caused by - ReadTimeoutError(HTTPConnectionPool(host=u'elasticsearch.example.com', port=9200): Read timed out. (read timeout=10))
この場合はtimeoutを延長すればOKです。
"docManagers": [ { "docManager": "elastic2_doc_manager", "targetURL": "elasticsearch.example.com", "args": { "clientOptions": { "timeout": 100 } } } ]
InvalidBSON: Date value out of range
oplogのtimestampがおかしくなって、pymongo
の範囲(0~9999年)を超える時に起きます。
対処法としては、pymongoにC extensionを導入すれば良いので、
Install PyMongo with C Extensions — MongoDB Ops Manager 1.3
こちらに沿ってpython-dev
をインストールした後にpymongo
を再インストールしてください。
データ同期が段々遅れる
[INFO] mongo_connector.oplog_manager:61 - OplogThread for replica set 'production_1' is 25744 seconds behind the oplog. [INFO] mongo_connector.oplog_manager:61 - OplogThread for replica set 'production_0' is 25864 seconds behind the oplog. [INFO] mongo_connector.oplog_manager:61 - OplogThread for replica set 'production_2' is 23896 seconds behind the oplog. [INFO] mongo_connector.oplog_manager:61 - OplogThread for replica set 'production_3' is 113880 seconds behind the oplog. [INFO] mongo_connector.oplog_manager:61 - OplogThread for replica set 'production_4' is 22731 seconds
こんな感じにbehind the oplog
がどんどん増えていく問題が起きました。
対応としてはbatchSize
をデフォルトの-1(上限なし)
ではなく、500~2000に調整したところbehindがなくなりました。
sizeを調整したところoplog.timestamp
が頻繁に更新されるようになったので、おそらくデフォルトの上限なしだと処理数が多すぎてoplog.timestampの更新がなかなか進まず、どんどんbehindしてしまうのではと考えています。
readPreferenceを設定したい
Mongoからはreadしかしないため、できればレプリカから読み込みたいですね。
その場合は以下のように
"mainAddress": "mongodb://localhost:27017/?readPreference=secondaryPreferred",
を設定すると反映できます。