概要
ユーザの行動ログで利用しているデータがDBにあるので、それをまるっとBigQueryへ書き込む方法をEmbulkを使って説明します。
BigQueryにマスタデータを保存する理由は?
単純に行動ログに保存される関連データがIDのみで保存されていると、詳細が知りたい時にDBの情報が必要になりフローが複雑化します。
例)ユーザはquest001を攻略した。quest001の詳細(クエスト名など)は?
一方行動ログにマスタデータの情報を付加するのは冗長なログになってしまいます。
なので行動ログとマスタデータをそれぞれBigQueryに保存し、分析側だけで閉じれるようにする、という方針です。
環境
- Ubuntu 18.04
- Embulk v0.9.17
- embulk-input-mongodb v0.7.0
- embulk-output-bigquery v0.4.13
- embulk-filter-expand_json v0.3.0
サンプルコード
今回書いたサンプルコードはこちらです。
MongoDB to BigQuery
MongoDBからデータを抽出し、BigQueryへ書き込みしてみます。
プラグインインストール
まずは各プラグインのインストール
embulk gem install embulk-input-mongodb embulk gem install embulk-output-bigquery embulk gem install embulk-filter-expand_json
設定ファイルの作成
in
以下のパラメータを設定をします。
パラメータ | 例 |
---|---|
type | mongodb |
uri | mongodb://hostname:port/db_name |
collection | コレクション名 |
query | データを抽出する際のクエリ |
projection | 抽出するフィールドの指定 |
sort | ソート |
json_column_name | 各レコードJSONとして生成されるので、後にfilterする際の名称 |
liquidテンプレートを使って一部環境変数で設定できるようにしておきます。
in: type: mongodb uri: "mongodb://{{env.MONGO_HOST}}:27017/{{env.DB_NAME}}" collection: 'users' query: '{ updatedAt: { $gte: 1 } }' projection: '{ name: 1, updatedAt: 1 }' sort: '{ updatedAt: 1 }' json_column_name: record
filter
embulk-filter-expand_jsonを使ってJSONのレコードを型を持ったレコードに変換します。
またビルトインのrename
フィルタでカラム名を変更します。
filters: - type: expand_json json_column_name: record expanded_columns: - {name: _id, type: string} - {name: name, type: string} - {name: updatedAt, type: long} - type: rename columns: _id: id updatedAt: updated_at
out
embulk-output-bigqueryで転送先の設定を行います。
out: type: bigquery auth_method: json_key json_keyfile: ./key.json project: {{env.GCP_PROJECT}} dataset: {{env.BQ_DATASET}} table: {{env.BQ_TABLE}} auto_create_table: true schema_file: ./schema.json
※鍵ファイルjson_keyfile
は自分で用意してください
schema.json
BigQueryのテーブルスキーマは以下にしています。
[ { "name": "id", "type": "STRING", "mode": "REQUIRED" }, { "name": "name", "type": "STRING", "mode": "NULLABLE" }, { "name": "updated_at", "type": "INTEGER", "mode": "NULLABLE" } ]
実行
環境変数を指定してpreviewまで一通り完了したら実際に実行します。
$ embulk run config.yml.liquid 2019-06-19 18:30:32.057 +0900 [INFO] (main): Started Embulk v0.9.17 2019-06-19 18:30:32.122 +0900 [INFO] (0001:transaction): Loaded plugin embulk-input-mongodb (0.7.0) 2019-06-19 18:30:34.433 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13) 2019-06-19 18:30:34.471 +0900 [INFO] (0001:transaction): Loaded plugin embulk-filter-expand_json (0.3.0) ... 2019-06-19 18:31:46.620 +0900 [INFO] (0001:transaction): embulk-output-bigquery: delete /var/folders/2c/k6glwp7d5tjcdf2b5snh1m9r9k8syg/T/embulk_output_bigquery_20190619-72999-w17sht.72999.2028.csv 2019-06-19 18:31:46.628 +0900 [INFO] (main): Committed. 2019-06-19 18:31:46.628 +0900 [INFO] (main): Next config diff: {"in":{"last_record":{}},"out":{}}
結果
BigQueryの方を確認すると以下のようにデータがちゃんと登録されてました。
まとめ
Embulk自体の学習コストは多少かかりますが、一度仕組みを作ってしまえば今後はそれをコピペして簡単に新しいフィールドの追加やコレクションの連携が可能になります。