Carpe Diem

備忘録

Embulkの基本的な使い方

Embulkとは

fluentdのバッチ版のようなツールで、データを一括転送したい時に利用します。

f:id:quoll00:20190618131523p:plain

ref: GitHub - embulk/embulk: Embulk: Pluggable Bulk Data Loader.

メリット

自前で実装すると面倒な

  • 並列実行でパフォーマンスの最適化
  • validation
  • dry-run
  • エラーリカバリ
  • 冪等なリトライ

といった部分をサポートしてくれている上、

  • fluentdと同様にプラガブルに作られているので、inputもoutputも開発者が自由に開発できる

ため、プラグインが豊富なので大抵のユースケースをカバーできます。

なので自前でデータ転送をスクリプトやら簡易コードで書いたりすることはできるけれど、後々の運用を考えるとEmbulkを導入した方がいいケースは多々あると思います。

環境

サンプルコード

今回書いたサンプルコードはこちらです。

github.com

準備

インストール

Javaが必要なので予めインストールしておきます。

sudo apt install openjdk-8-jdk

ドキュメント通りにインストールコマンドを実行します。

curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc

Embulkの型

Embulkの型としては以下が利用できます。

説明
boolean true / false
long 64bit符号付き整数型
timestamp 日付。ナノ秒を含む
double 64bit浮動小数点型
string 文字列

ref: Configuration — Embulk 0.8 documentation

プラグイン

先に述べたように、Embulkはプラガブルに作られているので開発者が自由に開発できます。
どんなプラグインがあるかは以下から探すと良いです。

List of Embulk Plugins by Category

インストールも

$ embulk gem install emublk-filter-expand_json

といった感じで簡単にできます。

基本的な使い方

サンプル

ドキュメントにサンプル実行方法があるので試してみます。

example

まずはexampleコマンドでサンプルファイルを生成します。

$ embulk example ./try1

guess

次にguessコマンドで最小限の設定を補完した設定ファイルを生成します。

$ embulk guess ./try1/seed.yml -o config.yml

これにより

in:
  type: file
  path_prefix: '/home/vagrant/./try1/csv/sample_'
out:
  type: stdout

というファイルが

in:
  type: file
  path_prefix: /home/vagrant/./try1/csv/sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    null_string: 'NULL'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}

という形になります。

preview

previewでdry-runができます。

$ embulk preview config.yml
2019-06-18 04:33:57.454 +0000: Embulk v0.9.17
2019-06-18 04:33:57.827 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2019-06-18 04:33:59.601 +0000 [INFO] (main): Gem's home and path are set by default: "/home/vagrant/.embulk/lib/gems"
2019-06-18 04:34:00.497 +0000 [INFO] (main): Started Embulk v0.9.17
2019-06-18 04:34:00.571 +0000 [INFO] (0001:preview): Listing local files at directory '/home/vagrant/./try1/csv' filtering filename by prefix 'sample_'
2019-06-18 04:34:00.572 +0000 [INFO] (0001:preview): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2019-06-18 04:34:00.573 +0000 [INFO] (0001:preview): Loading files [/home/vagrant/./try1/csv/sample_01.csv.gz]
2019-06-18 04:34:00.584 +0000 [INFO] (0001:preview): Try to read 32,768 bytes from input source
+---------+--------------+-------------------------+-------------------------+----------------------------+
| id:long | account:long |          time:timestamp |      purchase:timestamp |             comment:string |
+---------+--------------+-------------------------+-------------------------+----------------------------+
|       1 |       32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC |                     embulk |
|       2 |       14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC |               embulk jruby |
|       3 |       27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
|       4 |       11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC |                            |
+---------+--------------+-------------------------+-------------------------+----------------------------+

run

runで実際に実行します。この例ではstdoutにoutputしています。

$ embulk run config.yml 
2019-06-18 04:35:33.838 +0000: Embulk v0.9.17
2019-06-18 04:35:34.677 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2019-06-18 04:35:38.497 +0000 [INFO] (main): Gem's home and path are set by default: "/home/vagrant/.embulk/lib/gems"
2019-06-18 04:35:40.151 +0000 [INFO] (main): Started Embulk v0.9.17
2019-06-18 04:35:40.250 +0000 [INFO] (0001:transaction): Listing local files at directory '/home/vagrant/./try1/csv' filtering filename by prefix 'sample_'
2019-06-18 04:35:40.255 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2019-06-18 04:35:40.257 +0000 [INFO] (0001:transaction): Loading files [/home/vagrant/./try1/csv/sample_01.csv.gz]
2019-06-18 04:35:40.353 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=2 / tasks=1
2019-06-18 04:35:40.365 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,Embulk "csv" parser plugin
4,11270,2015-01-29 11:54:36,20150129,
2019-06-18 04:35:40.605 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2019-06-18 04:35:40.609 +0000 [INFO] (main): Committed.
2019-06-18 04:35:40.611 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"/home/vagrant/./try1/csv/sample_01.csv.gz"},"out":{}}

filter

EmbulkはETL(Extract, Transform, Load)ツールとも言われます。
inはExtract(データの抽出)、outはLoad(データの書き出し)が行えます。
そしてfilterではTransform(データの加工)が行えます。

例えばembulk-filter-hashは値をハッシュ化します。

filters:
  - type: hash
    columns:
    - { name: account }

これを追加すると先程のexampleの結果が以下のようになります。

+---------+------------------------------------------------------------------+-------------------------+-------------------------+----------------------------+
| id:long |                                                   account:string |          time:timestamp |      purchase:timestamp |             comment:string |
+---------+------------------------------------------------------------------+-------------------------+-------------------------+----------------------------+
|       1 | 5001005fce342d61388dbfdd08106571334f950e64bfc909206a5eb9d5bf9792 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC |                     embulk |
|       2 | 715d9526f896b1d61038e6563101c6caed617c79061201657660a9ea9e545bee | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC |               embulk jruby |
|       3 | 78900179607eef6f475fd5887fcf9f238ebb9265967e3ff72f175902656e9676 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
|       4 | 0ca09f3815a1a11d4c18f8f468291f391d2ee0716a1f96083eb0bde233afa43c | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC |                            |
+---------+------------------------------------------------------------------+-------------------------+-------------------------+----------------------------+

liquidテンプレートエンジン

liquidというテンプレートエンジンを使うと、変数を使って値を注入することが可能です。
例えばdev, stg, prdなど環境によって変わる部分は環境変数を使うと良いです。

ファイル名に.liquidのsuffixを付けた上で

in:
  type: mongodb
  uri: "mongodb://{{env.MONGO_HOST}}:27017/{{env.DB_NAME}}"
  collection: 'users'
  query: '{ updatedAt: { $gte: 1 } }'
  projection: '{ name: 1, updatedAt: 1 }'
  sort: '{ updatedAt: 1 }'
out:
  type: stdout

のように{{ }}で変数を扱えます。
今回は環境変数なのでenv.という変数で指定できます。

$ export MONGO_HOST=localhost
$ export DB_NAME=testdb
$ embulk preview config.yml.liquid

環境変数を注入した状態で実行できます。

まとめ

データ転送ツールEmbulkの基本的な使い方を説明しました。
先に説明したようにデータ転送系の運用が非常に楽になると感じました。
またDigdagと組み合わせることでより柔軟なワークフローも実現できます。