Java以外でMapReduceする
前回はJavaのネイティブプログラミングでMapReduceを実践してみましたが、
それぞれの処理の入出力は標準入出力を使用しますが、
PerlでMapReduceする
今回はPerlを使用しますが、

ネイティブな環境とストリーミングの違いはSort&Shuffleの出力
- キー、
[バリュー1,バリュー2,バリュー3]
だったのに対し、
- キー、
バリュー1 - キー、
バリュー2 - キー、
バリュー3
というふうに出力されます。
それではMap, ReduceをPerlで書いてみましょう。
まず準備としてMeCabのPerlのモジュールを導入します。MeCab本体は前回までの準備で入っているものとします。
MeCabのサイトからmecab-perl-0.
$ perl Makefile.PL $ make $ make test # make install
この作業は全てのサーバで必要になります。
MapプログラムをPerlで書く
それではJavaで書いたプログラムをPerl化する作業に入りましょう。入力と出力は変わりません。前回の入出力を転載しておきます。
入力
1287144100 299 名無しさんにズームイン! LnjhWICN sage 迫力ねえよ、香川w
1287144803 300 名無しさんにズームイン! 3M/Iym08 sage 利根川なんかちがう。
1287144804 301 名無しさんにズームイン! xueZONEO sage 利根川キター!
出力
1287144000 迫力
1287144000 ねえ
1287144000 よ
1287144000 、
1287144000 香川
1287144000 w
1287144600 利根川
1287144600 なんか
1287144600 ちがう
1287144600 。
1287144600 利根川
1287144600 キター
1287144600 !
キーがunixtime
#!/usr/bin/perl
#
#
use strict;
use MeCab;
use Data::Dumper;
use Encode qw(encode decode);
use utf8;
use constant TIMESEC=>600;
# 標準入力から読み込み
while (<STDIN>)
{
chomp; # 改行削除
my @line = split(/¥t/, $_); # タブでスプリット
# 時刻を丸める。
my $time = int($line[0]/TIMESEC)*TIMESEC;
my $body = $line[6];
$body = decode("utf8", $body);
if( my $buf = isAA($body) )
{
next;
}
$body = noGomi($body);
my $mecab = MeCab::Tagger->new();
my $node = $mecab->parseToNode($body);
while( $node = $node->{next} )
{
next if ( ! defined $node->{surface} );
my $surface = $node->{surface};
next if ( isGomi($surface) );
my($hinsi, $hinsi2) = (split( /,/, $node->{feature} ))[0..1];
if ( $hinsi eq encode("utf8", "名詞" ) && $hinsi2 ne encode("utf8", "非自立"))
{
# キーバリューで出力。セパレータはタブ。
print $time, "¥t", $surface,"\n";
}
}
}
sub isAA
{
my $s = shift;
$s =~ /.*[/ ̄_\ ]{2,}.*/;
return $&;
}
sub noGomi
{
my $s = shift;
$s =~ s/(h?ttp:¥/¥/|h?ttps:¥/¥/|sssp:¥/¥/){1}[¥w¥.-¥/:¥#¥?¥=¥&¥;¥%¥~¥+]+|>?>[0-9- ]+//g;
return $s;
}
sub isGomi
{
my $s = shift;
return $s =~ /^[¥゚¥/¥?¥!¥(¥),0-9a-zA-Z0-9$%”!#&、;:?|{}@`*+_?><∴∀Д▼△▲▽]/;
}
動作確認
基本的に標準入力、
$ cat 2ch_4.txt | perl ./J2chMap.pl 1287144000 迫力 1287144000 ねえ 1287144000 よ 1287144000 香川 1287144000 w ・ ・ $
コマンドラインで入力データをパイプで渡すことで簡単にテストが可能です。Mapの出力であるキーバリューが確認できればOKです
ReduceプログラムをPerlで書く
続けてReduceプログラムですがこちらも標準入力から読み込み、
入力です。
1287144000 迫力
1287144000 香川
1287144000 w
1287144600 利根川
1287144600 利根川
1287144600 キター
1287144600 !
次に出力です。
1287144000 1,迫力
1287144000 1,香川
1287144000 1,w
1287144600 2,利根川
1287144600 1,キター
1287144600 1,!
#!/usr/bin/perl
#
#
use strict;
use Data::Dumper;
use Encode qw(encode decode);
use utf8;
use constant LIMIT=>5;
my $hash;
my $_time;
# 標準入力から読み込み
while (<STDIN>)
{
chomp;
my @line = split(/¥t/, $_);
my $time = $line[0];
my $word = $line[1];
$word = decode("utf8", $word);
# 最初の1行目のみ
if ( ! $_time )
{
# 現在のキーを保持。
$_time = $time;
}
# キーである$timeが保持している値と変わった。
if ( $_time != $time && $_time )
{
# バッファにたまったデータを出力する。
showBuf($hash, LIMIT);
# バッファを初期化
$hash = undef;
}
# 単語ごとにカウントする。
if ( ! exists ( $hash->{$word} ) )
{
$hash->{$word} = 1;
}
else
{
$hash->{$word} += 1;
}
# 現在のキーを保持する。
$_time = $time;
}
# 残ったバッファを出力する。
showBuf($hash, LIMIT);
sub showBuf
{
my ($buf, $limit) = @_;
my @arr = sort{ $buf->{$b} <=> $buf->{$a} } keys %$buf;
my $cnt = 0;
foreach my $a ( @arr )
{
my $value = $buf->{$a};
# キーバリューを出力する。セパレータはタブ。
print $_time, "¥t", $value, ",", encode("utf8", $a), "\n";
last if ( ++$cnt >= $limit ) ;
}
}
濃いグレーのハッチング部分ポイントです。
直前のキーを$_timeに保持し、
動作確認
こちらもMap同様、
$ cat 2ch_4.txt | perl ./J2chMap.pl | sort | perl ./J2chRed.pl 1287136800 2,メニュー 1287136800 2,実況 1287136800 2,中 1287136800 1,風 1287136800 1,天国 1287137400 4,覇 1287137400 3,ばん 1287137400 3,こん 1287137400 2,夜勤 ・ ・ $
Hadoopで動かしてみましょう
それではこれらのMap, Reduceを実際にHadoopで動作させてみます。Hadoopではstreamingユーティリティがjarで用意されていますので、
$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar ¥…① -input 2ch_4.txt ¥…② -output 2ch_4_result ¥…③ -mapper /path/to/J2chMap.pl ¥…④ -reducer /path/to/J2chRed.pl ¥…⑤ -inputformat TextInputFormat …⑥
- ①jarファイルの指定。streaming用のjarが用意されていますのでそれを指定します。
- ②入力ファイルの指定。ここではHDFS上のファイルを指定します。
- ③出力ディレクトリの指定。ここではHDFS上のディレクトリを指定します。すでに存在すると動かないので、
以下のように事前に削除しておく必要があります。
$ hadoop dfs -rmr 2ch_4_result
- ④mapperプログラムの指定。先ほど作成したJ2chMap.
plを指定します。これは全てのサーバから参照されるパスにないといけません。今回はnfsで共有しているのでコピー等の必要はありません (-fileオプションを使用するとHDFS経由でプログラムを転送してくれるようです)。 - ⑤reduceプログラムの指定。J2chRed.
plを指定します。mapperと同じく全てのノードサーバから参照できる必要があります。 - ⑥入力フォーマットの指定。テキストフォーマットを指定します。他にはXMLやバイナリといったフォーマットを指定できます。
実行時の出力は以下のようになります。
10/11/08 23:27:18 INFO mapred.FileInputFormat: Total input paths to process : 1 10/11/08 23:27:19 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hdfs/h/mapred/local] 10/11/08 23:27:19 INFO streaming.StreamJob: Running job: job_201011072308_0012 10/11/08 23:27:19 INFO streaming.StreamJob: To kill this job, run: 10/11/08 23:27:19 INFO streaming.StreamJob: /usr/local/apache_proj/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=hadoop1:9001 -kill job_201011072308_0012 10/11/08 23:27:19 INFO streaming.StreamJob: Tracking URL: http://hadoop1:50030/jobdetails.jsp?jobid=job_201011072308_0012 10/11/08 23:27:20 INFO streaming.StreamJob: map 0% reduce 0% 10/11/08 23:27:33 INFO streaming.StreamJob: map 3% reduce 0% 10/11/08 23:27:36 INFO streaming.StreamJob: map 10% reduce 0% 10/11/08 23:27:39 INFO streaming.StreamJob: map 20% reduce 0% 《中略》 10/11/08 23:29:27 INFO streaming.StreamJob: map 100% reduce 32% 10/11/08 23:29:31 INFO streaming.StreamJob: map 100% reduce 49% 10/11/08 23:29:33 INFO streaming.StreamJob: map 100% reduce 83% 10/11/08 23:29:34 INFO streaming.StreamJob: map 100% reduce 100% 10/11/08 23:29:37 INFO streaming.StreamJob: Job complete: job_201011072308_0012 10/11/08 23:29:37 INFO streaming.StreamJob: Output: 2ch_4_result $
管理画面では進捗状況が図2のように見えます。

mapタスクが20分割され並列で処理されました。reduceタスクは4つに分割され処理されています。
結果の取得
結果はHDFS上に保存されますので、
$ hadoop dfs -getmerge 2ch_4_result 2ch_4_result.txt
-getmergeコマンドを使用することで、
さてファイルの中身は?
《略》
1287150000 93,土下座
1287150000 67,焼き
1287150000 47,゚
1287150000 31,カイジ
1287150000 31,利根川
1287139800 2,そう
1287139800 2,オッケイ
1287139800 2,羽鳥
1287139800 2,足
1287139800 2,今日
1287142800 4,借金
1287142800 4,侍
1287142800 2,ウイルス
1287142800 2,自由
1287142800 2,大丈夫
1287144600 242,゚
1287144600 154,これ
1287144600 154,ざわ
1287144600 119,利根川
1287144600 110,∀
1287146400 112,映画
1287146400 98,これ
1287146400 89,カット
1287146400 64,人
1287146400 63,原作
1287148200 230,ざわざわ
1287148200 114,ざわ
1287148200 105,カイジ
1287148200 88,これ
1287148200 85,利根川
1287151200 152,/
1287151200 102,.
1287151200 59,映画
1287151200 54,-
1287151200 47,カイジ
1287138000 4,ヤバ
1287138000 3,宣伝
《略》
なんとなくそれっぽいのが出ていますね。が、
これはReducerが4つになった影響で、
ストリーミングを使用することで、
それでは次回はまとめになります。