R&Dトレンドレポート

第11回MapReduce処理をやってみよう![実践編2]

Java以外でMapReduceする

前回はJavaのネイティブプログラミングでMapReduceを実践してみましたが、今回はもっと手軽な方法を試してみたいと思います。Hadoopに付属しているstreamingユーティリティ(以下ストリーミング)を使うという方法です。ストリーミングを使用することでMap, Reduce処理は独立したプログラムで記述することができます。

それぞれの処理の入出力は標準入出力を使用しますが、逆に言うと標準入出力を使用する処理であれば言語は問いません。つまり、PerlやRuby, PHP,AWKといった使い慣れた言語でHadoopの仕組みを利用できるようになります。すばらしい!

PerlでMapReduceする

今回はPerlを使用しますが、標準入出力を使える言語であれば何でも構いません。Map,Reduceの処理は次の図のようになります。

図1 PerlによるMapReduce処理
図1 PerlによるMapReduce処理

ネイティブな環境とストリーミングの違いはSort&Shuffleの出力(Reduceの入力)です。ネイティブ環境では、

  • キー、[バリュー1,バリュー2,バリュー3]

だったのに対し、ストリーミング環境では

  • キー、バリュー1
  • キー、バリュー2
  • キー、バリュー3

というふうに出力されます。

それではMap, ReduceをPerlで書いてみましょう。

まず準備としてMeCabのPerlのモジュールを導入します。MeCab本体は前回までの準備で入っているものとします。

MeCabのサイトからmecab-perl-0.98.tar.gzをダウンロード&展開します。

$ perl Makefile.PL
$ make
$ make test
# make install

この作業は全てのサーバで必要になります。

MapプログラムをPerlで書く

それではJavaで書いたプログラムをPerl化する作業に入りましょう。入力と出力は変わりません。前回の入出力を転載しておきます。

入力

1287144100    299    名無しさんにズームイン!    LnjhWICN        sage    迫力ねえよ、香川w
1287144803    300    名無しさんにズームイン!    3M/Iym08        sage    利根川なんかちがう。
1287144804    301    名無しさんにズームイン!    xueZONEO        sage    利根川キター!
出力

1287144000    迫力
1287144000    ねえ
1287144000    よ
1287144000    、
1287144000    香川
1287144000    w

1287144600    利根川
1287144600    なんか
1287144600    ちがう
1287144600    。

1287144600    利根川
1287144600    キター
1287144600    !

キーがunixtime(シリアルな秒)を600秒(10分)で丸めた値で、バリューとして品詞解析したパーツを並べています。1行が品詞の数だけ複数行に展開されたようなイメージです。キーが600秒で丸められていますので、2行目と3行目は同一のキーとして扱われています。

リスト1 Mapプログラム(J2chMap.pl)
#!/usr/bin/perl
#
#

use strict;
use MeCab;
use Data::Dumper;
use Encode qw(encode decode);
use utf8;

use constant TIMESEC=>600;

# 標準入力から読み込み
while (<STDIN>)
{
  chomp; # 改行削除
  my @line = split(/¥t/, $_); # タブでスプリット

  # 時刻を丸める。
  my $time = int($line[0]/TIMESEC)*TIMESEC;

  my $body = $line[6];
  $body = decode("utf8",  $body);

  if( my $buf = isAA($body) )
  {
    next;
  }

  $body = noGomi($body);

  my $mecab = MeCab::Tagger->new();
  my $node = $mecab->parseToNode($body);

  while( $node = $node->{next} ) 
  {
    next if ( !  defined $node->{surface} );

    my $surface = $node->{surface};
    next if ( isGomi($surface) );
    my($hinsi, $hinsi2) = (split( /,/, $node->{feature} ))[0..1];

    if ( $hinsi eq encode("utf8", "名詞" ) && $hinsi2 ne encode("utf8", "非自立"))
    {
      # キーバリューで出力。セパレータはタブ。
      print $time, "¥t", $surface,"\n";
    }
  }
}


sub isAA
{
  my $s = shift;

  $s =~ /.*[/ ̄_\ ]{2,}.*/;
  return $&; 
}

sub noGomi
{
  my $s = shift;

  $s =~ s/(h?ttp:¥/¥/|h?ttps:¥/¥/|sssp:¥/¥/){1}[¥w¥.-¥/:¥#¥?¥=¥&¥;¥%¥~¥+]+|>?>[0-9- ]+//g;

  return $s;
}

sub isGomi
{
  my $s = shift;

  return $s =~ /^[¥゚¥/¥?¥!¥(¥),0-9a-zA-Z0-9$%”!#&、;:?|{}@`*+_?><∴∀Д▼△▲▽]/;
}

動作確認

基本的に標準入力、出力をベースに動作していますので、linuxであれば以下のように動作確認が可能です。

$ cat 2ch_4.txt | perl ./J2chMap.pl
1287144000    迫力
1287144000    ねえ
1287144000    よ
1287144000    香川
1287144000    w
・
・
$

コマンドラインで入力データをパイプで渡すことで簡単にテストが可能です。Mapの出力であるキーバリューが確認できればOKです(セパレータはタブ文字です⁠⁠。

ReduceプログラムをPerlで書く

続けてReduceプログラムですがこちらも標準入力から読み込み、標準出力に出力するというプログラムになります。Sort&Shuffleフェーズでキー順にソートされたデータが渡されます。ネイティブ実装との違いはキーに対してバリューがリスト化されていないことです。Reduceプログラムではその点に注意しましょう。

入力です。

1287144000    迫力
1287144000    香川
1287144000    w
1287144600    利根川
1287144600    利根川
1287144600    キター
1287144600    !

次に出力です。

1287144000    1,迫力
1287144000    1,香川
1287144000    1,w
1287144600    2,利根川
1287144600    1,キター
1287144600    1,!
リスト2 Reduceプログラム(J2chRed.pl)
#!/usr/bin/perl
#
#

use strict;
use Data::Dumper;
use Encode qw(encode decode);
use utf8;

use constant LIMIT=>5;

my $hash;
my $_time;

# 標準入力から読み込み
while (<STDIN>)
{
  chomp;
  my @line = split(/¥t/, $_);

  my $time = $line[0];
  my $word = $line[1];

  $word = decode("utf8",  $word);

  # 最初の1行目のみ
  if ( ! $_time ) 
  {
    # 現在のキーを保持。
    $_time = $time;
  }

# キーである$timeが保持している値と変わった。
  if ( $_time != $time && $_time )
  {
    # バッファにたまったデータを出力する。
    showBuf($hash, LIMIT);

    # バッファを初期化
    $hash = undef;
  }

  # 単語ごとにカウントする。
  if ( ! exists ( $hash->{$word} ) ) 
  {
    $hash->{$word} =  1;  
  }
  else
  {
    $hash->{$word} += 1;
  }

  # 現在のキーを保持する。
  $_time = $time;

}

# 残ったバッファを出力する。
showBuf($hash, LIMIT);


sub showBuf
{
  my ($buf, $limit) = @_; 

  my @arr = sort{ $buf->{$b} <=> $buf->{$a} } keys %$buf;
  my $cnt = 0;
  foreach my $a ( @arr )
  {
    my $value = $buf->{$a};
    # キーバリューを出力する。セパレータはタブ。
    print $_time, "¥t", $value, ",", encode("utf8", $a), "\n";
    last if ( ++$cnt >= $limit ) ; 
  }
}

濃いグレーのハッチング部分ポイントです。

直前のキーを$_timeに保持し、現在のキーと比較することでキーが変わったことを検知します。これはキーがソートされているという前提でのみ動作しますが、Hadoopの場合、Reduce処理に渡されるデータは必ずソートされているという⁠お約束⁠があるため成立します。

動作確認

こちらもMap同様、コマンドラインで動作確認が可能です。ただし入力についてはキーでソートされている前提ですので、以下のようにsortコマンドを間にかませて実行しましょう。

$ cat 2ch_4.txt | perl ./J2chMap.pl | sort | perl ./J2chRed.pl
1287136800      2,メニュー
1287136800      2,実況
1287136800      2,中
1287136800      1,風
1287136800      1,天国
1287137400      4,覇
1287137400      3,ばん
1287137400      3,こん
1287137400      2,夜勤
・
・
$

Hadoopで動かしてみましょう

それではこれらのMap, Reduceを実際にHadoopで動作させてみます。Hadoopではstreamingユーティリティがjarで用意されていますので、それを引数に指定します。さらにストリーミングのオプションを指定します。

$ hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar ¥…①
	-input 2ch_4.txt  ¥…②
	-output 2ch_4_result ¥…③
	-mapper /path/to/J2chMap.pl ¥…④
	-reducer /path/to/J2chRed.pl ¥…⑤
	-inputformat TextInputFormat …⑥
  • ①jarファイルの指定。streaming用のjarが用意されていますのでそれを指定します。
  • ②入力ファイルの指定。ここではHDFS上のファイルを指定します。
  • ③出力ディレクトリの指定。ここではHDFS上のディレクトリを指定します。すでに存在すると動かないので、以下のように事前に削除しておく必要があります。
$ hadoop dfs -rmr 2ch_4_result
  • ④mapperプログラムの指定。先ほど作成したJ2chMap.plを指定します。これは全てのサーバから参照されるパスにないといけません。今回はnfsで共有しているのでコピー等の必要はありません(-fileオプションを使用するとHDFS経由でプログラムを転送してくれるようです⁠⁠。
  • ⑤reduceプログラムの指定。J2chRed.plを指定します。mapperと同じく全てのノードサーバから参照できる必要があります。
  • ⑥入力フォーマットの指定。テキストフォーマットを指定します。他にはXMLやバイナリといったフォーマットを指定できます。

実行時の出力は以下のようになります。

10/11/08 23:27:18 INFO mapred.FileInputFormat: Total input paths to process : 1
10/11/08 23:27:19 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hdfs/h/mapred/local]
10/11/08 23:27:19 INFO streaming.StreamJob: Running job: job_201011072308_0012
10/11/08 23:27:19 INFO streaming.StreamJob: To kill this job, run:
10/11/08 23:27:19 INFO streaming.StreamJob: /usr/local/apache_proj/hadoop/bin/../bin/hadoop job  -Dmapred.job.tracker=hadoop1:9001 -kill job_201011072308_0012
10/11/08 23:27:19 INFO streaming.StreamJob: Tracking URL: http://hadoop1:50030/jobdetails.jsp?jobid=job_201011072308_0012
10/11/08 23:27:20 INFO streaming.StreamJob:  map 0%  reduce 0%
10/11/08 23:27:33 INFO streaming.StreamJob:  map 3%  reduce 0%
10/11/08 23:27:36 INFO streaming.StreamJob:  map 10%  reduce 0%
10/11/08 23:27:39 INFO streaming.StreamJob:  map 20%  reduce 0%

《中略》

10/11/08 23:29:27 INFO streaming.StreamJob:  map 100%  reduce 32%
10/11/08 23:29:31 INFO streaming.StreamJob:  map 100%  reduce 49%
10/11/08 23:29:33 INFO streaming.StreamJob:  map 100%  reduce 83%
10/11/08 23:29:34 INFO streaming.StreamJob:  map 100%  reduce 100%
10/11/08 23:29:37 INFO streaming.StreamJob: Job complete: job_201011072308_0012
10/11/08 23:29:37 INFO streaming.StreamJob: Output: 2ch_4_result
$

管理画面では進捗状況が図2のように見えます。

図2
図2

mapタスクが20分割され並列で処理されました。reduceタスクは4つに分割され処理されています。

結果の取得

結果はHDFS上に保存されますので、コマンドで取得します。

$ hadoop dfs -getmerge 2ch_4_result 2ch_4_result.txt

-getmergeコマンドを使用することで、複数のファイルに分かれた結果が1つのファイルにマージされてダウンロードされます。

さてファイルの中身は?

《略》

1287150000      93,土下座
1287150000      67,焼き
1287150000      47,゚
1287150000      31,カイジ
1287150000      31,利根川
1287139800      2,そう
1287139800      2,オッケイ
1287139800      2,羽鳥
1287139800      2,足
1287139800      2,今日
1287142800      4,借金
1287142800      4,侍
1287142800      2,ウイルス
1287142800      2,自由
1287142800      2,大丈夫
1287144600      242,゚
1287144600      154,これ
1287144600      154,ざわ
1287144600      119,利根川
1287144600      110,∀
1287146400      112,映画
1287146400      98,これ
1287146400      89,カット
1287146400      64,人
1287146400      63,原作
1287148200      230,ざわざわ
1287148200      114,ざわ
1287148200      105,カイジ
1287148200      88,これ
1287148200      85,利根川
1287151200      152,/
1287151200      102,.
1287151200      59,映画
1287151200      54,-
1287151200      47,カイジ
1287138000      4,ヤバ
1287138000      3,宣伝

《略》

なんとなくそれっぽいのが出ていますね。が、キー順でのソートが全体で有効になっていないようです。

これはReducerが4つになった影響で、それぞれのReducerないではキー順でのソートが行われていますが、全体に対してのソートが行われないためです。そのため、今回の出力も4つのブロックに分かれています。しかし、最終的にこのデータをDBに入れたりすることを考えるとそれほど問題ではありません。

ストリーミングを使用することで、簡単にHadoopを使用できることがわかったと思います。PerlやRubyといった手慣れた言語でさっさと書けるのが魅力ですね。

それでは次回はまとめになります。

おすすめ記事

記事・ニュース一覧