データ分析の最初の手順であるデータ収集には大きく分けてストリーミング処理とバッチ処理という2つの方法があります。前回は、ストリーミング処理の1つであるFluentdによるデータ収集の方法を紹介しました。今回はもう一方のバッチ処理による収集をEmbulkというツールで行う方法を解説します。
Embulkでマスターデータをバッチ処理として収集する
Embulkとは
Fluentdはリアルタイムに生成されるデータに対しては有効なツールでした。しかし、RDBのマスターデータや外部サービスから定期的に数GB生成されるようなデータに対してはFluentdではカバーできないという課題もわかってきました。
こうしたケースに対して、これまでユーザは自前のスクリプトやETL(Extract、Transform、Load)ツールを作って、データ収集を行っていましたが、エラーハンドリングやリトライの仕組み、インポート速度といったパフォーマンスの問題などの課題があり、スクリプトを作ること自体が大変労力のかかる作業となってしまっていました。
こうした課題を解決するために、Embulkという並列データ転送フレームワークが開発されました。
Embulkの特徴として、Fluentdと同じプラグイン機構を持っていることが挙げられます。入力、出力、データ加工などのプラグインを書くことで足りない機能を補完し、現場で使えるツールに拡張していくことができるのです。また、これらがオープンソースとして公開されており、各データソース毎にさまざまな制約があるという課題を多くの人と共有することで、ノウハウが詰まったスクリプトを継続的にメンテナンスできるようになります。EmbulkのPluginのリストはこちらで確認することができます。
そして、Embulkでデータのインポートは、3つのステップで実現することができます。
-
- guess
- データを一部読み込み、自動でスキーマを推定し、設定ファイルを生成します。
-
- preview
- 設定ファイルのスキーマ情報を元に読み込んだ際のプレビューを行います。
- ここで想定と異なるスキーマに成ってしまっていた場合には、設定ファイルを手動で修正します。
-
- run
- 完成した設定ファイルを元にEmbulkを動かし、データの転送を実行します。
Embulkをインストールする
では、Embulkをインストールしてみましょう。前回のFluentdと同様に、Ubuntu14.04にセットアップをします。なお、EmbulkはWindowsでも動作させることが可能です。
マスターデータのフォーマット
Embulkをインストールして、データを収集する準備はできました。それでは対象となるデータについて見てみましょう。
今回は、PostgreSQLにマスターデータが格納されていることにします。一般的にマスターデータにはユーザ情報が格納されていると思います。今回は前回使ったアクセスログとは関係ないデータで試します、下記のようにテーブルが定義されているとします。
表1 PostgreSQL上のマスターデータ
user_id |
sex |
last_update |
closed_account_time |
age |
city |
device |
country |
1190452 |
1 |
2011-07-08 22:13:19 |
0001-01-01 00:00:00 |
21 |
miyagi |
smart phone |
japan |
1581708 |
1 |
2010-04-04 23:33:52 |
0001-01-01 00:00:00 |
40 |
tokyo |
feature phone |
japan |
1629941 |
0 |
2009-11-04 20:41:10 |
0001-01-01 00:00:00 |
35 |
tokyo |
smart phone |
japan |
このデータをローカルのPostgreSQLのtestdb以下で定義しておきます。また合わせてサンプルデータもインポートしておきます。
また、分析エンジン側のMySQLにも同様のテーブルを作っておきます。
PostgreSQLのマスタデータを分析エンジンにインポートする
Pluginのインストール
マスターデータを格納するローカルのPostgreSQLから分析エンジンにしているMySQLへのインポートを行います。そこで今回は、embulk-input-postgresqlとembulk-output-mysqlを使って行います。このプラグインで、MySQLやPostgreSQLなどのJDBCが使えるデータベースに対してのインポートエクスポートが可能になります。Embulkのpluginのリストはこちらで確認ができます。
Guessコマンドによるスキーマの推定
プラグインのインストールができたら、まずはGuessコマンドを実施します。入力元を参照するためのシードファイル(seed.yml)をはじめに作る必要があります。シードファイルには、データベースへの認証情報や、データ取得するための任意のクエリの設定や、データの型変換などの設定を書くことができます。
また設定のパートとして主にinとexecとoutの3ヵ所があり、inではデータの読み込み元を設定し、execでは読み込んだデータを加工する処理を加え(今回は利用していません)、outでは出力先を設定することができます。
ここでの注意点として、データベースによってはスキーマが異なる可能性がありますが、そうした際にはcolumn_optionsを使って対応することができます。上記ではtimestampで読み込んでいたデータをdatetimeに変換をしています。
Previewコマンドによるデータチェック
previewコマンドでは、先ほどのシードファイルを元に処理されるデータの確認を行います。これにより各カラムが設定されたスキーマを元にデータを表示することができます。
Runコマンドによるデータ転送
プレビューでスキーマの設定に問題がないことを確認したらRunコマンドを実行します。これにより、先ほどのプレビューで表示したデータがMySQLにインポートされます。
インポートが完了したら、正しくデータが入っているかを確認してみましょう。
おまけ:Embulkをスケジュール実行する
先ほどまでの例では、1回で全部のデータをインポートする方法を採っていました。それ以外に、EmbulkをCronに登録して1日1度実行するといった利用例も考えられます。その場合に、ファイルから読み込むFileInput Pluginの場合にはlast_path
を利用することで、最後に読み込んだファイルパスを覚えておき、次回はそれ以降のファイルから読み出すことが可能です。
では、今回入力元となったPostgreSQLから毎日更新されたデータのみをインポートする場合を考えてみます。
embulk-input-postgresqlには、上記のlast_pathの仕組みはありません。しかし、設定内に任意のクエリを記載することも可能です。そこで、クエリの条件句にて差分のインポートを行います。ただしこの方法の場合には、レコードに作成日や更新日の情報を持ったカラムが必要となります。
それでは、前述のusersテーブルを例に挙げると、last_update
カラムを利用して、前日に更新されたデータのみを取得してみます
まずは、設定ファイルのin句を書き換えてみましょう。下記の設定ファイルでは、WHEREの条件が追加していることがわかります。これにより、実行日から1日前に更新されたデータを取得することが可能です。
最後に、この設定ファイルを元にembulk run load.yml
をcronで毎日実行するだけで、スケジュール実行を実現できます。
データインポートのまとめ
前回は、Fluentdによってリアルタイムに生成されるログデータをストリーミングでインポートし、今回はバッチ型のデータ転送フレームワークであるEmbulkによってマスターデータを保管するPostgresQLのデータを一括で分析エンジンにインポートしました。
これらの2つの方法を用いることで、従来、煩雑で手間のかかる作業であったデータ収集を手軽に行えるようになります。次回以降では、これら一元的に溜められたデータを用いて、基本KPI分析と応用KPI分析を進めていきます。