Carpe Diem

備忘録

EmbulkでMongoDBのデータをBigQueryへ

概要

ユーザの行動ログで利用しているデータがDBにあるので、それをまるっとBigQueryへ書き込む方法をEmbulkを使って説明します。

BigQueryにマスタデータを保存する理由は?

単純に行動ログに保存される関連データがIDのみで保存されていると、詳細が知りたい時にDBの情報が必要になりフローが複雑化します。

例)ユーザはquest001を攻略した。quest001の詳細(クエスト名など)は?

一方行動ログにマスタデータの情報を付加するのは冗長なログになってしまいます。
なので行動ログとマスタデータをそれぞれBigQueryに保存し、分析側だけで閉じれるようにする、という方針です。

環境

サンプルコード

今回書いたサンプルコードはこちらです。

github.com

MongoDB to BigQuery

MongoDBからデータを抽出し、BigQueryへ書き込みしてみます。

プラグインインストール

まずは各プラグインのインストール

embulk gem install embulk-input-mongodb
embulk gem install embulk-output-bigquery
embulk gem install embulk-filter-expand_json

設定ファイルの作成

in

以下のパラメータを設定をします。

パラメータ
type mongodb
uri mongodb://hostname:port/db_name
collection コレクション名
query データを抽出する際のクエリ
projection 抽出するフィールドの指定
sort ソート
json_column_name 各レコードJSONとして生成されるので、後にfilterする際の名称

liquidテンプレートを使って一部環境変数で設定できるようにしておきます。

in:
  type: mongodb
  uri: "mongodb://{{env.MONGO_HOST}}:27017/{{env.DB_NAME}}"
  collection: 'users'
  query: '{ updatedAt: { $gte: 1 } }'
  projection: '{ name: 1, updatedAt: 1 }'
  sort: '{ updatedAt: 1 }'
  json_column_name: record

filter

embulk-filter-expand_jsonを使ってJSONのレコードを型を持ったレコードに変換します。
またビルトインのrenameフィルタでカラム名を変更します。

filters:
  - type: expand_json
    json_column_name: record
    expanded_columns:
      - {name: _id, type: string}
      - {name: name, type: string}
      - {name: updatedAt, type: long}
  - type: rename
    columns:
      _id: id
      updatedAt: updated_at

out

embulk-output-bigqueryで転送先の設定を行います。

out:
  type: bigquery
  auth_method: json_key
  json_keyfile: ./key.json
  project: {{env.GCP_PROJECT}}
  dataset: {{env.BQ_DATASET}}
  table: {{env.BQ_TABLE}}
  auto_create_table: true
  schema_file: ./schema.json

※鍵ファイルjson_keyfileは自分で用意してください

schema.json

BigQueryのテーブルスキーマは以下にしています。

[
  {
    "name": "id",
    "type": "STRING",
    "mode": "REQUIRED"
  },
  {
    "name": "name",
    "type": "STRING",
    "mode": "NULLABLE"
  },
  {
    "name": "updated_at",
    "type": "INTEGER",
    "mode": "NULLABLE"
  }
]

実行

環境変数を指定してpreviewまで一通り完了したら実際に実行します。

$ embulk run config.yml.liquid
2019-06-19 18:30:32.057 +0900 [INFO] (main): Started Embulk v0.9.17
2019-06-19 18:30:32.122 +0900 [INFO] (0001:transaction): Loaded plugin embulk-input-mongodb (0.7.0)
2019-06-19 18:30:34.433 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-bigquery (0.4.13)
2019-06-19 18:30:34.471 +0900 [INFO] (0001:transaction): Loaded plugin embulk-filter-expand_json (0.3.0)
...
2019-06-19 18:31:46.620 +0900 [INFO] (0001:transaction): embulk-output-bigquery: delete /var/folders/2c/k6glwp7d5tjcdf2b5snh1m9r9k8syg/T/embulk_output_bigquery_20190619-72999-w17sht.72999.2028.csv
2019-06-19 18:31:46.628 +0900 [INFO] (main): Committed.
2019-06-19 18:31:46.628 +0900 [INFO] (main): Next config diff: {"in":{"last_record":{}},"out":{}}

結果

BigQueryの方を確認すると以下のようにデータがちゃんと登録されてました。

f:id:quoll00:20190619182708p:plain

まとめ

Embulk自体の学習コストは多少かかりますが、一度仕組みを作ってしまえば今後はそれをコピペして簡単に新しいフィールドの追加やコレクションの連携が可能になります。

ソース