Embulkとは
fluentdのバッチ版のようなツールで、データを一括転送したい時に利用します。
ref: GitHub - embulk/embulk: Embulk: Pluggable Bulk Data Loader.
メリット
自前で実装すると面倒な
- 並列実行でパフォーマンスの最適化
- validation
- dry-run
- エラーリカバリ
- 冪等なリトライ
といった部分をサポートしてくれている上、
- fluentdと同様にプラガブルに作られているので、inputもoutputも開発者が自由に開発できる
ため、プラグインが豊富なので大抵のユースケースをカバーできます。
なので自前でデータ転送をスクリプトやら簡易コードで書いたりすることはできるけれど、後々の運用を考えるとEmbulkを導入した方がいいケースは多々あると思います。
環境
サンプルコード
今回書いたサンプルコードはこちらです。
準備
インストール
Javaが必要なので予めインストールしておきます。
sudo apt install openjdk-8-jdk
ドキュメント通りにインストールコマンドを実行します。
curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar" chmod +x ~/.embulk/bin/embulk echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc source ~/.bashrc
Embulkの型
Embulkの型としては以下が利用できます。
型 | 説明 |
---|---|
boolean | true / false |
long | 64bit符号付き整数型 |
timestamp | 日付。ナノ秒を含む |
double | 64bit浮動小数点型 |
string | 文字列 |
ref: Configuration — Embulk 0.8 documentation
プラグイン
先に述べたように、Embulkはプラガブルに作られているので開発者が自由に開発できます。
どんなプラグインがあるかは以下から探すと良いです。
List of Embulk Plugins by Category
インストールも
$ embulk gem install emublk-filter-expand_json
といった感じで簡単にできます。
基本的な使い方
サンプル
ドキュメントにサンプル実行方法があるので試してみます。
example
まずはexample
コマンドでサンプルファイルを生成します。
$ embulk example ./try1
guess
次にguess
コマンドで最小限の設定を補完した設定ファイルを生成します。
$ embulk guess ./try1/seed.yml -o config.yml
これにより
in: type: file path_prefix: '/home/vagrant/./try1/csv/sample_' out: type: stdout
というファイルが
in: type: file path_prefix: /home/vagrant/./try1/csv/sample_ decoders: - {type: gzip} parser: charset: UTF-8 newline: LF type: csv delimiter: ',' quote: '"' escape: '"' null_string: 'NULL' trim_if_not_quoted: false skip_header_lines: 1 allow_extra_columns: false allow_optional_columns: false columns: - {name: id, type: long} - {name: account, type: long} - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'} - {name: purchase, type: timestamp, format: '%Y%m%d'} - {name: comment, type: string} out: {type: stdout}
という形になります。
preview
preview
でdry-runができます。
$ embulk preview config.yml 2019-06-18 04:33:57.454 +0000: Embulk v0.9.17 2019-06-18 04:33:57.827 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected. 2019-06-18 04:33:59.601 +0000 [INFO] (main): Gem's home and path are set by default: "/home/vagrant/.embulk/lib/gems" 2019-06-18 04:34:00.497 +0000 [INFO] (main): Started Embulk v0.9.17 2019-06-18 04:34:00.571 +0000 [INFO] (0001:preview): Listing local files at directory '/home/vagrant/./try1/csv' filtering filename by prefix 'sample_' 2019-06-18 04:34:00.572 +0000 [INFO] (0001:preview): "follow_symlinks" is set false. Note that symbolic links to directories are skipped. 2019-06-18 04:34:00.573 +0000 [INFO] (0001:preview): Loading files [/home/vagrant/./try1/csv/sample_01.csv.gz] 2019-06-18 04:34:00.584 +0000 [INFO] (0001:preview): Try to read 32,768 bytes from input source +---------+--------------+-------------------------+-------------------------+----------------------------+ | id:long | account:long | time:timestamp | purchase:timestamp | comment:string | +---------+--------------+-------------------------+-------------------------+----------------------------+ | 1 | 32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC | embulk | | 2 | 14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC | embulk jruby | | 3 | 27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin | | 4 | 11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC | | +---------+--------------+-------------------------+-------------------------+----------------------------+
run
run
で実際に実行します。この例ではstdoutにoutputしています。
$ embulk run config.yml 2019-06-18 04:35:33.838 +0000: Embulk v0.9.17 2019-06-18 04:35:34.677 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected. 2019-06-18 04:35:38.497 +0000 [INFO] (main): Gem's home and path are set by default: "/home/vagrant/.embulk/lib/gems" 2019-06-18 04:35:40.151 +0000 [INFO] (main): Started Embulk v0.9.17 2019-06-18 04:35:40.250 +0000 [INFO] (0001:transaction): Listing local files at directory '/home/vagrant/./try1/csv' filtering filename by prefix 'sample_' 2019-06-18 04:35:40.255 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped. 2019-06-18 04:35:40.257 +0000 [INFO] (0001:transaction): Loading files [/home/vagrant/./try1/csv/sample_01.csv.gz] 2019-06-18 04:35:40.353 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=2 / tasks=1 2019-06-18 04:35:40.365 +0000 [INFO] (0001:transaction): {done: 0 / 1, running: 0} 1,32864,2015-01-27 19:23:49,20150127,embulk 2,14824,2015-01-27 19:01:23,20150127,embulk jruby 3,27559,2015-01-28 02:20:02,20150128,Embulk "csv" parser plugin 4,11270,2015-01-29 11:54:36,20150129, 2019-06-18 04:35:40.605 +0000 [INFO] (0001:transaction): {done: 1 / 1, running: 0} 2019-06-18 04:35:40.609 +0000 [INFO] (main): Committed. 2019-06-18 04:35:40.611 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"/home/vagrant/./try1/csv/sample_01.csv.gz"},"out":{}}
filter
EmbulkはETL(Extract, Transform, Load)ツールとも言われます。
in
はExtract(データの抽出)、out
はLoad(データの書き出し)が行えます。
そしてfilter
ではTransform(データの加工)が行えます。
例えばembulk-filter-hashは値をハッシュ化します。
filters: - type: hash columns: - { name: account }
これを追加すると先程のexampleの結果が以下のようになります。
+---------+------------------------------------------------------------------+-------------------------+-------------------------+----------------------------+ | id:long | account:string | time:timestamp | purchase:timestamp | comment:string | +---------+------------------------------------------------------------------+-------------------------+-------------------------+----------------------------+ | 1 | 5001005fce342d61388dbfdd08106571334f950e64bfc909206a5eb9d5bf9792 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC | embulk | | 2 | 715d9526f896b1d61038e6563101c6caed617c79061201657660a9ea9e545bee | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC | embulk jruby | | 3 | 78900179607eef6f475fd5887fcf9f238ebb9265967e3ff72f175902656e9676 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin | | 4 | 0ca09f3815a1a11d4c18f8f468291f391d2ee0716a1f96083eb0bde233afa43c | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC | | +---------+------------------------------------------------------------------+-------------------------+-------------------------+----------------------------+
liquidテンプレートエンジン
liquidというテンプレートエンジンを使うと、変数を使って値を注入することが可能です。
例えばdev, stg, prdなど環境によって変わる部分は環境変数を使うと良いです。
ファイル名に.liquid
のsuffixを付けた上で
in: type: mongodb uri: "mongodb://{{env.MONGO_HOST}}:27017/{{env.DB_NAME}}" collection: 'users' query: '{ updatedAt: { $gte: 1 } }' projection: '{ name: 1, updatedAt: 1 }' sort: '{ updatedAt: 1 }' out: type: stdout
のように{{ }}
で変数を扱えます。
今回は環境変数なのでenv.
という変数で指定できます。
$ export MONGO_HOST=localhost $ export DB_NAME=testdb $ embulk preview config.yml.liquid
で環境変数を注入した状態で実行できます。
まとめ
データ転送ツールEmbulkの基本的な使い方を説明しました。
先に説明したようにデータ転送系の運用が非常に楽になると感じました。
またDigdagと組み合わせることでより柔軟なワークフローも実現できます。