Java以外でMapReduceする
前回はJavaのネイティブプログラミングでMapReduceを実践してみましたが、今回はもっと手軽な方法を試してみたいと思います。Hadoopに付属しているstreamingユーティリティ(以下ストリーミング)を使うという方法です。ストリーミングを使用することでMap, Reduce処理は独立したプログラムで記述することができます。
それぞれの処理の入出力は標準入出力を使用しますが、逆に言うと標準入出力を使用する処理であれば言語は問いません。つまり、PerlやRuby, PHP,AWKといった使い慣れた言語でHadoopの仕組みを利用できるようになります。すばらしい!
PerlでMapReduceする
今回はPerlを使用しますが、標準入出力を使える言語であれば何でも構いません。Map,Reduceの処理は次の図のようになります。
ネイティブな環境とストリーミングの違いはSort&Shuffleの出力(Reduceの入力)です。ネイティブ環境では、
だったのに対し、ストリーミング環境では
- キー、バリュー1
- キー、バリュー2
- キー、バリュー3
というふうに出力されます。
それではMap, ReduceをPerlで書いてみましょう。
まず準備としてMeCabのPerlのモジュールを導入します。MeCab本体は前回までの準備で入っているものとします。
MeCabのサイトからmecab-perl-0.98.tar.gzをダウンロード&展開します。
この作業は全てのサーバで必要になります。
MapプログラムをPerlで書く
それではJavaで書いたプログラムをPerl化する作業に入りましょう。入力と出力は変わりません。前回の入出力を転載しておきます。
キーがunixtime(シリアルな秒)を600秒(10分)で丸めた値で、バリューとして品詞解析したパーツを並べています。1行が品詞の数だけ複数行に展開されたようなイメージです。キーが600秒で丸められていますので、2行目と3行目は同一のキーとして扱われています。
動作確認
基本的に標準入力、出力をベースに動作していますので、linuxであれば以下のように動作確認が可能です。
コマンドラインで入力データをパイプで渡すことで簡単にテストが可能です。Mapの出力であるキーバリューが確認できればOKです(セパレータはタブ文字です)。
ReduceプログラムをPerlで書く
続けてReduceプログラムですがこちらも標準入力から読み込み、標準出力に出力するというプログラムになります。Sort&Shuffleフェーズでキー順にソートされたデータが渡されます。ネイティブ実装との違いはキーに対してバリューがリスト化されていないことです。Reduceプログラムではその点に注意しましょう。
入力です。
次に出力です。
濃いグレーのハッチング部分ポイントです。
直前のキーを$_timeに保持し、現在のキーと比較することでキーが変わったことを検知します。これはキーがソートされているという前提でのみ動作しますが、Hadoopの場合、Reduce処理に渡されるデータは必ずソートされているという”お約束”があるため成立します。
動作確認
こちらもMap同様、コマンドラインで動作確認が可能です。ただし入力についてはキーでソートされている前提ですので、以下のようにsortコマンドを間にかませて実行しましょう。
Hadoopで動かしてみましょう
それではこれらのMap, Reduceを実際にHadoopで動作させてみます。Hadoopではstreamingユーティリティがjarで用意されていますので、それを引数に指定します。さらにストリーミングのオプションを指定します。
- ①jarファイルの指定。streaming用のjarが用意されていますのでそれを指定します。
- ②入力ファイルの指定。ここではHDFS上のファイルを指定します。
- ③出力ディレクトリの指定。ここではHDFS上のディレクトリを指定します。すでに存在すると動かないので、以下のように事前に削除しておく必要があります。
- ④mapperプログラムの指定。先ほど作成したJ2chMap.plを指定します。これは全てのサーバから参照されるパスにないといけません。今回はnfsで共有しているのでコピー等の必要はありません(-fileオプションを使用するとHDFS経由でプログラムを転送してくれるようです)。
- ⑤reduceプログラムの指定。J2chRed.plを指定します。mapperと同じく全てのノードサーバから参照できる必要があります。
- ⑥入力フォーマットの指定。テキストフォーマットを指定します。他にはXMLやバイナリといったフォーマットを指定できます。
実行時の出力は以下のようになります。
管理画面では進捗状況が図2のように見えます。
mapタスクが20分割され並列で処理されました。reduceタスクは4つに分割され処理されています。
結果の取得
結果はHDFS上に保存されますので、コマンドで取得します。
-getmergeコマンドを使用することで、複数のファイルに分かれた結果が1つのファイルにマージされてダウンロードされます。
さてファイルの中身は?
なんとなくそれっぽいのが出ていますね。が、キー順でのソートが全体で有効になっていないようです。
これはReducerが4つになった影響で、それぞれのReducerないではキー順でのソートが行われていますが、全体に対してのソートが行われないためです。そのため、今回の出力も4つのブロックに分かれています。しかし、最終的にこのデータをDBに入れたりすることを考えるとそれほど問題ではありません。
ストリーミングを使用することで、簡単にHadoopを使用できることがわかったと思います。PerlやRubyといった手慣れた言語でさっさと書けるのが魅力ですね。
それでは次回はまとめになります。