これなら使える!ビッグデータ分析基盤のエコシステム

第3回ストリーミング処理とバッチ処理によるデータ収集 ~ Embulk編 ~

データ分析の最初の手順であるデータ収集には大きく分けてストリーミング処理とバッチ処理という2つの方法があります。前回は、ストリーミング処理の1つであるFluentdによるデータ収集の方法を紹介しました。今回はもう一方のバッチ処理による収集をEmbulkというツールで行う方法を解説します。

図1 今回紹介する処理(図中の色のついた部分)
図1 今回紹介する処理(図中の色のついた部分)

Embulkでマスターデータをバッチ処理として収集する

Embulkとは

Fluentdはリアルタイムに生成されるデータに対しては有効なツールでした。しかし、RDBのマスターデータや外部サービスから定期的に数GB生成されるようなデータに対してはFluentdではカバーできないという課題もわかってきました。

こうしたケースに対して、これまでユーザは自前のスクリプトやETL(Extract、Transform、Load)ツールを作って、データ収集を行っていましたが、エラーハンドリングやリトライの仕組み、インポート速度といったパフォーマンスの問題などの課題があり、スクリプトを作ること自体が大変労力のかかる作業となってしまっていました。

こうした課題を解決するために、Embulkという並列データ転送フレームワークが開発されました。

Embulkの特徴として、Fluentdと同じプラグイン機構を持っていることが挙げられます。入力、出力、データ加工などのプラグインを書くことで足りない機能を補完し、現場で使えるツールに拡張していくことができるのです。また、これらがオープンソースとして公開されており、各データソース毎にさまざまな制約があるという課題を多くの人と共有することで、ノウハウが詰まったスクリプトを継続的にメンテナンスできるようになります。EmbulkのPluginのリストはこちらで確認することができます。

そして、Embulkでデータのインポートは、3つのステップで実現することができます。

  1. guess
    データを一部読み込み、自動でスキーマを推定し、設定ファイルを生成します。
  2. preview
    設定ファイルのスキーマ情報を元に読み込んだ際のプレビューを行います。
    ここで想定と異なるスキーマに成ってしまっていた場合には、設定ファイルを手動で修正します。
  3. run
    完成した設定ファイルを元にEmbulkを動かし、データの転送を実行します。

Embulkをインストールする

では、Embulkをインストールしてみましょう。前回のFluentdと同様に、Ubuntu14.04にセットアップをします。なお、EmbulkはWindowsでも動作させることが可能です

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc

マスターデータのフォーマット

Embulkをインストールして、データを収集する準備はできました。それでは対象となるデータについて見てみましょう。

今回は、PostgreSQLにマスターデータが格納されていることにします。一般的にマスターデータにはユーザ情報が格納されていると思います。今回は前回使ったアクセスログとは関係ないデータで試します、下記のようにテーブルが定義されているとします。

表1 PostgreSQL上のマスターデータ
user_id sex last_update closed_account_time age city device country
1190452 1 2011-07-08 22:13:19 0001-01-01 00:00:00 21 miyagi smart phone japan
1581708 1 2010-04-04 23:33:52 0001-01-01 00:00:00 40 tokyo feature phone japan
1629941 0 2009-11-04 20:41:10 0001-01-01 00:00:00 35 tokyo smart phone japan

このデータをローカルのPostgreSQLのtestdb以下で定義しておきます。また合わせてサンプルデータもインポートしておきます。

create table users (
  user_id bigint PRIMARY KEY,
  sex smallint,
  last_update timestamp NOT NULL,
  closed_account_time timestamp NOT NULL,
  age smallint,
  city varchar(30),
  device varchar(30),
  country varchar(30)
);

INSERT INTO users (user_id, sex, last_update, closed_account_time, age, city, device, country)
VALUES
(1190452, 1, '2011-07-08 22:13:19', '0001-01-01 00:00:00', 21, 'miyagi' ,'smart_phone', 'japan'),
(1581708, 1, '2010-04-04 23:33:52', '0001-01-01 00:00:00', 40, 'tokyo', 'feature phone', 'japan'),
(1629941, 0, '2009-11-04 20:41:10', '0001-01-01 00:00:00', 35, 'tokyo', 'smart phone', 'japan')

また、分析エンジン側のMySQLにも同様のテーブルを作っておきます。

create table users (
  user_id bigint PRIMARY KEY,
  sex smallint,
  last_update DATETIME,
  closed_account_time DATETIME,
  age smallint,
  city varchar(30),
  device varchar(30),
  country varchar(30)
);

PostgreSQLのマスタデータを分析エンジンにインポートする

Pluginのインストール

マスターデータを格納するローカルのPostgreSQLから分析エンジンにしているMySQLへのインポートを行います。そこで今回は、embulk-input-postgresqlとembulk-output-mysqlを使って行います。このプラグインで、MySQLやPostgreSQLなどのJDBCが使えるデータベースに対してのインポートエクスポートが可能になります。Embulkのpluginのリストはこちらで確認ができます。

$ embulk gem install embulk-input-postgresql
$ embulk gem install embulk-output-mysql

Guessコマンドによるスキーマの推定

プラグインのインストールができたら、まずはGuessコマンドを実施します。入力元を参照するためのシードファイル(seed.yml)をはじめに作る必要があります。シードファイルには、データベースへの認証情報や、データ取得するための任意のクエリの設定や、データの型変換などの設定を書くことができます。

また設定のパートとして主にinとexecとoutの3ヵ所があり、inではデータの読み込み元を設定し、execでは読み込んだデータを加工する処理を加え(今回は利用していません⁠⁠、outでは出力先を設定することができます。

seed.yml
in:
  type: postgresql
  host: localhost
  user: postgres
  password: "password"
  database: testdb
  table: users
  select: "*"
out:
  type: mysql
  host: HOSTNAME
  user: mysql
  password: "password"
  database: testdb
  table: users
  mode: insert
  column_options:
    last_update: {type: datetime}
    closed_account_time: {type: datetime}

ここでの注意点として、データベースによってはスキーマが異なる可能性がありますが、そうした際にはcolumn_optionsを使って対応することができます。上記ではtimestampで読み込んでいたデータをdatetimeに変換をしています。

$ embulk guess seed.yml -o guess.yml
2015-07-29 18:10:15.650 +0900: Embulk v0.6.19
2015-07-29 18:10:17.095 +0900 [INFO] (guess): Loaded plugin embulk-input-postgresql (0.6.0)
in:
  type: postgresql
  host: localhost
  user: postgres
  password: password
  database: testdb
  table: users
  select: '*'
  column_options:
    last_update: {type: string, timestamp_format: '%Y/%m/%d %H:%M:%S', timezone: '+0900'}
    closed_account_time: {type: string, timestamp_format: '%Y/%m/%d %H:%M:%S', timezone: '+0900'}
out:
  type: mysql
  host: HOSTNAME
  user: mysql
  password: password
  database: testdb
  table: users
  mode: insert
  column_options:
    last_update: {type: datetime}
    closed_account_time: {type: datetime}

Created 'guess.yml' file.

Previewコマンドによるデータチェック

previewコマンドでは、先ほどのシードファイルを元に処理されるデータの確認を行います。これにより各カラムが設定されたスキーマを元にデータを表示することができます。

$ embulk preview guess.yml
2015-07-29 18:15:50.053 +0900: Embulk v0.6.19
2015-07-29 18:15:51.473 +0900 [INFO] (preview): Loaded plugin embulk-input-postgresql (0.6.0)
2015-07-29 18:15:51.549 +0900 [INFO] (preview): SQL: SET search_path TO "public"
2015-07-29 18:15:51.769 +0900 [INFO] (preview): SQL: SET search_path TO "public"
2015-07-29 18:15:51.770 +0900 [INFO] (preview): SQL: DECLARE cur NO SCROLL CURSOR FOR SELECT * FROM "users"
2015-07-29 18:15:51.771 +0900 [INFO] (preview): SQL: FETCH FORWARD 10000 FROM cur
2015-07-29 18:15:51.773 +0900 [INFO] (preview): > 0.00 seconds
2015-07-29 18:15:51.795 +0900 [INFO] (preview): SQL: FETCH FORWARD 10000 FROM cur
2015-07-29 18:15:51.796 +0900 [INFO] (preview): > 0.00 seconds
+--------------+----------+-------------------------+-------------------------------+----------+-------------+---------------+----------------+
| user_id:long | sex:long |   last_update:timestamp | closed_account_time:timestamp | age:long | city:string | device:string | country:string |
+--------------+----------+-------------------------+-------------------------------+----------+-------------+---------------+----------------+
|    1,190,452 |        1 | 2011-07-08 13:13:19 UTC |       0000-12-29 15:00:00 UTC |       21 |      miyagi |   smart_phone |          japan |
|    1,581,708 |        1 | 2010-04-04 14:33:52 UTC |       0000-12-29 15:00:00 UTC |       40 |       tokyo | feature phone |          japan |
|    1,629,941 |        0 | 2009-11-04 11:41:10 UTC |       0000-12-29 15:00:00 UTC |       35 |       tokyo |   smart phone |          japan |
+--------------+----------+-------------------------+-------------------------------+----------+-------------+---------------+----------------+

Runコマンドによるデータ転送

プレビューでスキーマの設定に問題がないことを確認したらRunコマンドを実行します。これにより、先ほどのプレビューで表示したデータがMySQLにインポートされます。

$ embulk run guess.yml
2015-07-29 18:12:46.199 +0900: Embulk v0.6.19
2015-07-29 18:12:48.901 +0900 [INFO] (transaction): Loaded plugin embulk-input-postgresql (0.6.0)
2015-07-29 18:12:48.951 +0900 [INFO] (transaction): Loaded plugin embulk-output-mysql (0.4.1)
2015-07-29 18:12:49.158 +0900 [INFO] (transaction): SQL: SET search_path TO "public"
2015-07-29 18:12:49.339 +0900 [INFO] (transaction): Connecting to jdbc:mysql://HOSTNAME:3306/testdb options {user=mysql, tcpKeepAlive=true, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000}
2015-07-29 18:12:49.653 +0900 [INFO] (transaction): Using insert mode
2015-07-29 18:12:49.668 +0900 [INFO] (transaction): SQL: DROP TABLE IF EXISTS `users_0000000055b8990f21bc7980_bl_tmp000`
2015-07-29 18:12:49.675 +0900 [INFO] (transaction): > 0.01 seconds
2015-07-29 18:12:49.684 +0900 [INFO] (transaction): SQL: CREATE TABLE IF NOT EXISTS `users_0000000055b8990f21bc7980_bl_tmp000` (`user_id` BIGINT, `sex` SMALLINT, `last_update` datetime, `closed_account_time` datetime, `age` SMALLINT, `city` VARCHAR(30), `device` VARCHAR(30), `country` VARCHAR(30))
2015-07-29 18:12:49.714 +0900 [INFO] (transaction): > 0.03 seconds
2015-07-29 18:12:49.756 +0900 [INFO] (transaction): {done:  0 / 1, running: 0}
2015-07-29 18:12:49.795 +0900 [INFO] (task-0000): Connecting to jdbc:mysql://HOSTNAME:3306/testdb options {user=mysql, tcpKeepAlive=true, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000}
2015-07-29 18:12:49.807 +0900 [INFO] (task-0000): Prepared SQL: INSERT INTO `users_0000000055b8990f21bc7980_bl_tmp000` (`user_id`, `sex`, `last_update`, `closed_account_time`, `age`, `city`, `device`, `country`) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
2015-07-29 18:12:50.010 +0900 [INFO] (task-0000): SQL: SET search_path TO "public"
2015-07-29 18:12:50.011 +0900 [INFO] (task-0000): SQL: DECLARE cur NO SCROLL CURSOR FOR SELECT * FROM "users"
2015-07-29 18:12:50.012 +0900 [INFO] (task-0000): SQL: FETCH FORWARD 10000 FROM cur
2015-07-29 18:12:50.013 +0900 [INFO] (task-0000): > 0.00 seconds
2015-07-29 18:12:50.056 +0900 [INFO] (task-0000): SQL: FETCH FORWARD 10000 FROM cur
2015-07-29 18:12:50.057 +0900 [INFO] (task-0000): > 0.00 seconds
2015-07-29 18:12:50.062 +0900 [INFO] (task-0000): Loading 3 rows
2015-07-29 18:12:50.075 +0900 [INFO] (task-0000): > 0.01 seconds (loaded 3 rows in total)
2015-07-29 18:12:50.076 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-07-29 18:12:50.076 +0900 [INFO] (transaction): Connecting to jdbc:mysql://HOSTNAME:3306/testdb options {user=mysql, tcpKeepAlive=true, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=2700000}
2015-07-29 18:12:50.091 +0900 [INFO] (transaction): SQL: INSERT INTO `users` (`user_id`, `sex`, `last_update`, `closed_account_time`, `age`, `city`, `device`, `country`) SELECT `user_id`, `sex`, `last_update`, `closed_account_time`, `age`, `city`, `device`, `country` FROM `users_0000000055b8990f21bc7980_bl_tmp000`
2015-07-29 18:12:50.092 +0900 [INFO] (transaction): > 0.00 seconds (3 rows)
2015-07-29 18:12:50.108 +0900 [INFO] (transaction): Connecting to jdbc:mysql://HOSTNAME:3306/testdb options {user=mysql, tcpKeepAlive=true, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000}
2015-07-29 18:12:50.125 +0900 [INFO] (transaction): SQL: DROP TABLE IF EXISTS `users_0000000055b8990f21bc7980_bl_tmp000`
2015-07-29 18:12:50.137 +0900 [INFO] (transaction): > 0.01 seconds
2015-07-29 18:12:50.137 +0900 [INFO] (main): Committed.
2015-07-29 18:12:50.138 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}

インポートが完了したら、正しくデータが入っているかを確認してみましょう。

mysql> select * from users;                                                                               +---------+------+---------------------+---------------------+------+--------+---------------+---------+
| user_id | sex  | last_update         | closed_account_time | age  | city   | device        | country |
+---------+------+---------------------+---------------------+------+--------+---------------+---------+
| 1190452 |    1 | 2011-07-08 22:13:19 | 0001-01-01 00:00:00 |   21 | miyagi | smart_phone   | japan   |
| 1581708 |    1 | 2010-04-04 23:33:52 | 0001-01-01 00:00:00 |   40 | tokyo  | feature phone | japan   |
| 1629941 |    0 | 2009-11-04 20:41:10 | 0001-01-01 00:00:00 |   35 | tokyo  | smart phone   | japan   |
+---------+------+---------------------+---------------------+------+--------+---------------+---------+
3 rows in set (0.01 sec)

おまけ:Embulkをスケジュール実行する

先ほどまでの例では、1回で全部のデータをインポートする方法を採っていました。それ以外に、EmbulkをCronに登録して1日1度実行するといった利用例も考えられます。その場合に、ファイルから読み込むFileInput Pluginの場合にはlast_pathを利用することで、最後に読み込んだファイルパスを覚えておき、次回はそれ以降のファイルから読み出すことが可能です。

では、今回入力元となったPostgreSQLから毎日更新されたデータのみをインポートする場合を考えてみます。

embulk-input-postgresqlには、上記のlast_pathの仕組みはありません。しかし、設定内に任意のクエリを記載することも可能です。そこで、クエリの条件句にて差分のインポートを行います。ただしこの方法の場合には、レコードに作成日や更新日の情報を持ったカラムが必要となります。

それでは、前述のusersテーブルを例に挙げると、last_updateカラムを利用して、前日に更新されたデータのみを取得してみます

まずは、設定ファイルのin句を書き換えてみましょう。下記の設定ファイルでは、WHEREの条件が追加していることがわかります。これにより、実行日から1日前に更新されたデータを取得することが可能です。

in:
  type: postgresql
  host: localhost
  user: postgres
  password: "password"
  database: testdb
  table: users
  select: "*"
  where: "last_update between (current_date - interval '1 day') and (current_date)"

最後に、この設定ファイルを元にembulk run load.ymlをcronで毎日実行するだけで、スケジュール実行を実現できます。

データインポートのまとめ

前回は、Fluentdによってリアルタイムに生成されるログデータをストリーミングでインポートし、今回はバッチ型のデータ転送フレームワークであるEmbulkによってマスターデータを保管するPostgresQLのデータを一括で分析エンジンにインポートしました。

これらの2つの方法を用いることで、従来、煩雑で手間のかかる作業であったデータ収集を手軽に行えるようになります。次回以降では、これら一元的に溜められたデータを用いて、基本KPI分析と応用KPI分析を進めていきます。

おすすめ記事

記事・ニュース一覧