R&Dトレンドレポート

第10回MapReduce処理をやってみよう![実践編]

今回は2ちゃんねるデータを処理するという実践を行ってみます。

品詞解析ライブラリの導入

品詞解析にはいくつかのツールがありますが、今回はMeCabというツールを使用します。

MeCabのサイト
URL:http://mecab.sourceforge.net/

MeCab本体と、CMeCabというMeCabをJNIで使用するライブラリを導入します。

MeCab本体のセットアップ

mecab-0.98.tar.gzをダウンロード&展開します。

$ ./configure
$ make
$ make check
# make install

/usr/local/libにlibmecab.so.1が入ります。

辞書のセットアップ

MeCabの動作には辞書が必要ですので、辞書のセットアップを行います。

mecab-ipadic-2.7.0-20070801.tar.gzをダウンロード&展開します。

$ ./configure --with-charset=utf8
$ make
# make install

以上でMeCabの導入が終わりました。

CMeCabの導入にはprotocol bufferとsconsが必要となります。

protocol buffer

URL:http://code.google.com/p/protobuf/downloads/list

このページから最新版をダウンロードします。

protobuf-2.3.0.tar.bz2 をダウンロード&展開
$ ./configure
$ make
# make install

scons

ダウンロード&インストール
$ wget http://prdownloads.sourceforge.net/scons/scons-1.2.0-1.noarch.rpm
# rpm -Uhv scons.rpm

CMeCab

CMeCabのサイトで公開されている。cmecab-1.7.tar.gzをダウンロード&展開

jarファイルの生成
$ cd cmecab-1.7
$ ant

bin/cmecab-1.7.jarが生成されるので、任意のクラスパスにコピーします。ここでは、/usr/local/apache_proj/myproj/lib/を作成しそこにコピーしました。

ライブラリの生成

続いてネイティブライブラリの生成です。sconsを使用してコンパイルしますが、javahomeのパスが間違っている場合は、jni/SConstructファイルを次のように編集します。

37     javahome = '/usr/lib/jvm/java' ←適切なパスに置き換える

sconsを実行します。

$ cd jni
$ scons
# cp libCMeCab_protobuf.so libCMeCab.so /usr/local/lib

環境変数の設定

今回jarファイルを/usr/local/apache_proj/myproj/libにおきましたが、hadoopはこのパスのことを知りません。hadoopの環境変数に設定してあげる必要があります。$HADOOP_HOME/conf/hadoop-env.shを編集します。13行目がコメントになっていると思いますので、コメントを外し、cmecab-1.7.jarへのパスを記述します。

12 # Extra Java CLASSPATH elements.  Optional.
13 export HADOOP_CLASSPATH=/usr/local/apache_proj/myproj/lib/cmecab-1.7.jar

cmecab-1.7.jarからJNIによりCMeCabライブラリが呼び出されますが、下記の設定が必要です。MapReduceの処理はJVMのプロセスが新規で起こされますが、そのJVMにパラメータを渡すことができます。cmecab-1.7.jarで使用されるCMeCabのライブラリへのパスを記述する必要があります。

$HADOOP_HOME/conf/mapred-site.xmlに追記(赤字部分⁠⁠。

 1 <?xml version="1.0"?>
 2 <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 3 <!-- Put site-specific property overrides in this file. -->
 4 <configuration>
 5   <property>
 6     <name>mapred.child.java.opts</name>
 7     <value>-Djava.library.path=/usr/local/lib</value>
 8   </property>
 9   <property>
10     <name>mapred.job.tracker</name>
11     <value>hadoop1:9001</value>
12   </property>
13 </configuration>

また、上記CMeCabライブラリがMeCabのライブラリを使用しているため、全てのサーバのhユーザの.bashrcファイルに環境変数を設定します。~h/.bashrcに下記を追記します。

export LD_LIBRARY_PATH=/usr/local/lib

ここでhadoopを再起動します。

$ stop-all.sh
$ start-all.sh

これで準備が整いました!

MapReduceプログラム

ようやくプログラム本体に入れます。今回はJAVAのネイティブなプログラムで機能の実装を行います。

基本的な流れをおさらいすると以下の通りです。

プログラムの全体像

MapReduce処理はデータの入出力がキーとバリューと決まっているため、Map, Reduce処理に何をキーにして、何をバリューにするか、にさえ注意すればわりと簡単に書くことができます。ここら辺が複雑な分散コンピューティングとの差であると言えます。

プログラムの全体像はたったこれだけです。

public class J2ch

{
           // Map処理
	static class J2chMapper extends Mapper <LongWritable, Text, IntWritable, Text>
	{
		public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException
		{
		}
	}

           // Reduce処理
	static class J2chReducer extends Reducer <IntWritable, Text, IntWritable, Text>
	{
		public void reduce(IntWritable key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException
		{
		}
	}

           // メイン関数
	public static void main(String[] args ) throws Exception
	{
		if ( args.length != 2 )
		{
			System.err.println("Usage: hogehoge");
			System.exit(-1);
		}
                       // ジョブの定義
		Job job = new Job();
		job.setJarByClass(J2ch.class);

                       // 入力出力ファイルの定義(引数の1、2)
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

                      // mapperとreducerクラスを定義
		job.setMapperClass(J2chMapper.class);
		job.setReducerClass(J2chReducer.class);

                       // 最終出力のキーとバリューの型を定義
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Text.class);

		System.exit(job.waitForCompletion(true) ? 0 : 1 );

	}
}

メイン処理ではジョブの定義とmapperとreducerは誰?というのを定義しています。色をつけた部分がメインとmapper, reducerの関係する部分です。

ここにMap処理、Reduce処理の部分に処理を埋めていけばそれでOKです。簡単ですよね!?

Map処理

まずMap処理から見ていきましょう。

Map処理で入力とされる2ちゃんねるデータは以下のようなフォーマットとなります(ある3行だけ抜粋&加工しています⁠⁠。

※内容は先日放送された映画「カイジ 人生逆転ゲーム」の実況スレです。香川照之さん扮する利根川が⁠fuck you!⁠というシーン付近ですね。^^
1287144100  299  名無しさんにズームイン!    LnjhWICN        sage   迫力ねえよ、香川w
1287144803  300  名無しさんにズームイン!    3M/Iym08        sage   利根川なんかちがう。
1287144804  301  名無しさんにズームイン!    xueZONEO        sage   利根川キター!

それではMap処理の出力はどうでしょうか?

1287144000    迫力
1287144000    ねえ
1287144000    よ
1287144000    、
1287144000    香川
1287144000    w

1287144600    利根川
1287144600    なんか
1287144600    ちがう
1287144600    。

1287144600    利根川
1287144600    キター
1287144600    !

キーがunixtime(シリアルな秒)を600秒(10分)で丸めた値で、バリューとして品詞解析したパーツを並べています。1行が品詞の数だけ複数行に展開されたようなイメージです。キーが600秒で丸められていますので、2行目と3行目は同一のキーとして扱われています。ただ⁠、⁠⁠。⁠などが出力されてしまうのはイマイチかもしれません。ここは名詞だけを出力するようにします。

それでは実際のコードを見てみましょう。innerクラスJ2chMapperです。

static class J2chMapper extends Mapper <LongWritable, Text, IntWritable, Text>
{
	public void map(LongWritable key, Text value, Context context)
		throws IOException, InterruptedException
	{
		// keyは使いません。
		String line = value.toString(); // Text型で渡される1行のデータをString型に変換する。
		String[] arr = line.split("\t");    // タブで分割する。
	
		int _time = Integer.parseInt(arr[0]); // unixtimeを取得。
		int time = (int)Math.floor(_time/600)*600; // 600秒で丸める。後のキーとなる。
	
		String b = arr[6]; // ボディーテキストを取得
	
		if ( isAA(b) ) return; // AAを含むモノは削除
		String body = noGomi(b); // ゴミを削除(URLや>>123などを削除)
	
		Tagger tagger = new StandardTagger("UTF-8", ""); // CMeCabのクラス
		Node node = tagger.parse(body); // 品詞解析
	
	 	// 品詞解析の結果の数(単語)だけループする。
	 	while (node.hasNext())
		{
			String surface = node.next(); // 単語の取得
			String feature = node.feature(); // 単語の解析結果	
	
			// 各項目の値を取得。以下を期待
			// 名詞,副詞可能,*,*,*,*,本日,ホンジツ,ホンジツ
			String featureArr[] = feature.split(",");
	
			// 名詞でかつ内容がゴミではない場合、
			// Map処理の結果としてキーバリューを出力する。
			if ( featureArr[0].equals("名詞" ) && ! isGomi(surface))
			{
	                                    //丸めたunixtime、単語のペアを出力Reducerに渡される。
				context.write(new IntWritable(time), new Text(surface));
			}
	
		}
	}
}
static class J2chMapper extends Mapper <LongWritable, Text, IntWritable, Text> 

この部分で、mapperクラスの

  • 入力キー、バリュー(LongWritable, Text)
  • 出力キー、バリュー(IntWritable, Text)

を定義しています。map()には入力キー、バリューが渡され実行されます(第三引数のContextは意識しなくてもいいです⁠⁠。今回のmapにおけるキーはファイルのバイト位置になります(デフォルトの入力フォーマットTextInputFormatが適用されているため⁠⁠。また、バリューでは行データが渡されます。

つまり、今回のデータの場合、

key=0
value=“1287144100  299  名無しさんにズームイン!    LnjhWICN        sage   迫力ねえよ、香川w”

というふうに1行目が処理されます。2行目は、

key=96
value=“1287144803  300  名無しさんにズームイン!    3M/Iym08        sage   利根川なんかちがう。”

という具合です。

今回のmap処理ではバイト位置は不要ですので、key変数は使っていません。⁠600⁠というマジックナンバーを使ってたりしますが、この辺はstaticな値として変数にした方がいいですね。

品詞解析している部分を別のクラスにして隠蔽化、抽象化した方が別の品詞解析エンジンを使ったときに柔軟に対応できますね、など突っ込みどころがあるかと思いますが、まずはこれだけのコードでmap処理がかけることに驚いてください。

内容は単純なテキスト処理ですので、コメントを追っていただけるとご理解いただけると思います。

Recude処理

続いてReduce処理です。

Reduce処理の入力はMap処理の出力になります。名詞のみに絞った場合下記のようになりました。

1287144000    迫力
1287144000    香川
1287144000    w
1287144600    利根川
1287144600    利根川
1287144600    キター
1287144600    !

次に出力です。

1287144000    1,迫力
1287144000    1,香川
1287144000    1,w
1287144600    2,利根川
1287144600    1,キター
1287144600    1,!

丸めたunixtimeごとに出現回数の多い単語順にベスト5が出力されます。これが最終的なMapReduceの出力となります(ファイル出力となる⁠⁠。

それでは実際のコードを見てみましょう。J2chReduerクラスです。

static class J2chReducer extends Reducer <IntWritable, Text, IntWritable, Text>
{
	public void reduce(IntWritable key, Iterable<Text> values, Context context)
		throws IOException, InterruptedException
	{
		// hashtableオブジェクトを作成
		// Text=カウントというハッシュを作りたい。
		Hashtable kvs = new Hashtable();
		
		// valueにTextのリストが渡されるのでイテレーションする
		for( Text value : values )
		{
			// Textの取り出し。
			String k = value.toString();

			// kvsオブジェクトに存在すれば。
			if ( kvs.containsKey( k ))
			{
				// すでに存在するキーのバリューを+1する。出現回数を+1する。
				Integer n = kvs.get(k) + 1;
				kvs.put(k, n);
			}
			else // 無ければ
			{
				// kvsオブジェクトにキーと出現回数(1)を追加する。
				kvs.put(k, 1);
			}
		}

		// 配列のリストを作成してkvsの中身をセットする。
		ArrayList entries = new ArrayList(kvs.entrySet());

		// バリューの中身でソートする。(出現回数でソートする)
		Collections.sort(entries, new Comparator(){
				public int compare(Object obj1, Object obj2){
				Map.Entry ent1 =(Map.Entry)obj1;
				Map.Entry ent2 =(Map.Entry)obj2;
				int val1 = Integer.parseInt(ent1.getValue().toString());
				int val2 = Integer.parseInt(ent2.getValue().toString());
				return (val2 - val1); // 降順
			}
		});

		// ソート後の配列のリストを走査(降順)
		// 最大4まで配列を回す
		for( int i = 0; i < entries.size() && i < 5; i++ )
		{
			String word = (String)((Map.Entry)entries.get(i)).getKey();
			int cnt = Integer.parseInt(((Map.Entry)entries.get(i)).getValue().toString());

			// キーはIntWritable、バリューはText
			context.write(key, new Text( key + "," + cnt + "," + word));
		}
	}
}

こちらもJ2chMapperと同様に、

static class J2chReducer extends Reducer <IntWritable, Text, IntWritable, Text>

の部分が、reducerクラスの

  • 入力キー、バリュー(IntWritable, Text)
  • 出力キー、バリュー(IntWritable, Text)

を定義しています。Mapperの出力とReducerの入力が同じであることに注意してください。

さて、reduce()関数ですが、第二引数がiterable<Text>となっています。mapの出力であればTextじゃないか、と思われるかもしれませんが、実際にはText型のリストが渡されます。つまり、mapの出力であると想定された下記は、

1287144000    迫力
1287144000    香川
1287144000    w
1287144600    利根川
1287144600    利根川
1287144600    キター
1287144600    !

実際には

1287144000    [迫力,香川,w]
1287144600    [利根川,利根川,キター,!]

というふうに、キーごとにTextのリストが渡されます。また、Reduce処理の前にキーでソートされますので、同一キーについては必ず1つにまとめられます。

リストで渡された名詞の単語を一度Hashtableに登録し、ArrayListに入れ直してバリューで降順にソートしています(明らかに無駄な動作をしているような気がしますが…⁠⁠。ソート後の単語と出現回数を上位5つまで出力しています。

これで当初の目的である、2ちゃんねるの実況スレッドから時間単位に出現数の多い単語を出力する、という機能が実装されました。

いかがでしたか? 思ってたよりずっと簡単に感じたんではないでしょうか? Map,Reduceのそれぞれの処理での入出力のフォーマットがある程度決まっていることで、肝心のやりたいことに集中できたと思います。

Map、Reduceというふうに役割がきっちり分かれていることで、どこに何を実装するべきかというのが自然に分担されたと思います。さらに、MapとReduceの間にキーでのソート、マージがあることもReduce処理の書きやすさを増していたと思います。

コンパイル

ソースのコンパイルですが、以下のようにクラスパスを指定してコンパイルします。

$ javac -cp $HADOOP_HOME/hadoop-0.20.2-core.jar:$HADOOP_HOME/myproj/lib/cmecab-1.7.jar  J2ch.java

jarファイルの作成は以下のように行います。

$ jar cvf J2ch.jar J2ch*.class

実行

hadoopコマンドにJ2ch.jarを渡して実行させます。

hadoop jar プログラムのjar メイン関数を含んだクラス 第一引数 第二引数

という構文になります。

$ hadoop jar J2ch.jar J2ch 2ch_4.txt 2ch_4_result

ここでは、第一引数に入力ファイル名、第二引数に出力ディレクトリを指定しています。入力ファイル、出力ディレクトリはいずれもHDFS上となります。出力先ディレクトリがすでにあるとエラーになるので、

$ hadoop dfs -rmr 2ch_4_result

というふうにディレクトリを削除しましょう。

出力データ例

「カイジ 人生逆転ゲーム」の実況スレのデータを処理しました。

※1287144000が21:00を表します。1287151200が23:00を表します(放送開始から放送終了まで⁠⁠。

ざっと眺めてみると、カイジという主人公の名前、利根川という相手役の名前、船井という人物、佐原、ざわざわ、土下座、焼、と原作をご存じの方や映画を見た方はピンとくるキーワードが並んでいるんではないでしょうか?? カット、原作という単語が多いのも原作からカットされているシーンが多いぞ、ということでしょうか。出現回数だけの解析結果としては傾向がつかめていると思います。おもしろいですね !

1287144000	1287144000,53,カイジ
1287144000	1287144000,37,俺
1287144000	1287144000,35,女
1287144000	1287144000,34,カット
1287144000	1287144000,26,出
1287144600	1287144600,118,利根川
1287144600	1287144600,114,これ
1287144600	1287144600,110,www
1287144600	1287144600,108,ざわ
1287144600	1287144600,108,船井
1287145200	1287145200,161,カット
1287145200	1287145200,156,原作
1287145200	1287145200,154,映画
1287145200	1287145200,151,カイジ
1287145200	1287145200,126,展開
1287145800	1287145800,225,カット
1287145800	1287145800,201,ビール
1287145800	1287145800,182,カイジ
1287145800	1287145800,169,ペ
1287145800	1287145800,164,映画
1287146400	1287146400,96,映画
1287146400	1287146400,87,カット
1287146400	1287146400,66,落
1287146400	1287146400,65,これ
1287146400	1287146400,60,原作
1287147000	1287147000,126,落
1287147000	1287147000,101,カイジ
1287147000	1287147000,95,原作
1287147000	1287147000,90,これ
1287147000	1287147000,84,佐原
1287147600	1287147600,135,ざわざわ
1287147600	1287147600,115,ざわ
1287147600	1287147600,69,カード
1287147600	1287147600,54,耳
1287147600	1287147600,53,これ
1287148200	1287148200,105,カイジ
1287148200	1287148200,91,CM
1287148200	1287148200,85,利根川
1287148200	1287148200,81,ざわざわ
1287148200	1287148200,76,顔
1287148800	1287148800,88,遠藤
1287148800	1287148800,82,カイジ
1287148800	1287148800,65,映画
1287148800	1287148800,60,原作
1287148800	1287148800,51,何
1287149400	1287149400,198,キタ
1287149400	1287149400,91,利根川
1287149400	1287149400,85,カイジ
1287149400	1287149400,84,血
1287149400	1287149400,76,ざわ
1287150000	1287150000,91,土下座
1287150000	1287150000,88,焼
1287150000	1287150000,31,カイジ
1287150000	1287150000,29,利根川
1287150000	1287150000,22,演技
1287150600	1287150600,118,カイジ
1287150600	1287150600,116,映画
1287150600	1287150600,93,カット
1287150600	1287150600,84,原作
1287150600	1287150600,62,遠藤
1287151200	1287151200,52,映画
1287151200	1287151200,47,カイジ
1287151200	1287151200,35,原作
1287151200	1287151200,33,思
1287151200	1287151200,32,人

全プログラムリスト

最後に全リストを載せておきます。アスキーアートの除外や意味のない単語の削除など上の例では省いていた関数を載せいてます。

J2ch.java
import java.io.IOException;
import java.util.Hashtable;
import java.text.BreakIterator;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.util.regex.*;

import net.moraleboost.mecab.Node;
import net.moraleboost.mecab.Tagger;
import net.moraleboost.mecab.impl.StandardTagger;
import net.moraleboost.mecab.MeCabException;

public class J2ch
{
	static final int TIMESEC = 600; // 丸める秒数

	static class J2chMapper extends Mapper <LongWritable, Text, IntWritable, Text>
	{
		private Boolean isAA( String s)
		{
			Pattern p = Pattern.compile(".*[/ ̄_\ ]{2}+.*");
			Matcher m = p.matcher(s);

			return m.matches();
		}

		private String noGomi( String s )
		{
			Pattern p = Pattern.compile("(h?ttp://|h?ttps://|sssp://){1}[\\w\\.\\-/:\\#\\?\\=\\&\\;\\%\\~\\+]+|>?>[0-9- ]+");
			Matcher m = p.matcher(s);
			String t = m.replaceAll("");

			return t;
		}


		private Boolean isGomi( String s )
		{
			Pattern p = Pattern.compile("^[!()━・゚?/:;0-9a-zA-Z∀Д\\`=│*1234567890.\"'#~{}&,:;+<>%$_$%”!#&、;:?|{}@`*+_?><|∴▼△▲▽-]$");
			Matcher m = p.matcher(s);

			return m.matches();
		}


		public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException
		{
			// 入力データ
			// 1288400498\t94\t主侍 </b>◆HIPPER/a9g <b>\t+rXxBAvc\t\t\t 友近とアジアンかって
			String line = value.toString();
			String[] arr = line.split("\t");

			int _time = Integer.parseInt(arr[0]);
			int time = (int)Math.floor(_time/TIMESEC)*TIMESEC; // TIMESECで丸める。

			if ( arr.length != 7 ) return;

			String b = arr[6]; // ボディーテキストを取得

			if ( isAA(b) ) return; // AAを含むモノは削除

			String body = noGomi(b); // ゴミを削除(URL,>>123など)

			Tagger tagger = new StandardTagger("UTF-8", "");

			Node node = tagger.parse(body);

			// 名詞,副詞可能,*,*,*,*,本日,ホンジツ,ホンジツ
			while (node.hasNext())
			{

				String surface = node.next();
				String feature = node.feature();
				String featureArr[] = feature.split(",");

				if ( featureArr[0].equals("名詞" ))
				{
					context.write(new IntWritable(time), new Text(surface));
				}

			}
		}
	}

	static class J2chReducer extends Reducer <IntWritable, Text, IntWritable, Text>
	{
		public void reduce(IntWritable key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException
		{
			Hashtable<String, Integer> kvs = new Hashtable<String, Integer>();

			for( Text value : values )
			{
				String k = value.toString();
				if ( kvs.containsKey( k ))
				{
					Integer n = kvs.get(k) + 1;
					kvs.put(k, n);
				}
				else
				{
					kvs.put(k, 1);
				}
			}


			ArrayList entries = new ArrayList(kvs.entrySet());

			Collections.sort(entries, new Comparator(){
				public int compare(Object obj1, Object obj2){
					Map.Entry ent1 =(Map.Entry)obj1;
					Map.Entry ent2 =(Map.Entry)obj2;
					int val1 = Integer.parseInt(ent1.getValue().toString());
					int val2 = Integer.parseInt(ent2.getValue().toString());
					return (val2 - val1);
				}
			});


			for( int i = 0; i < entries.size() && i < 5; i++ )
			{
				String word = (String)((Map.Entry)entries.get(i)).getKey();
				int cnt = Integer.parseInt(((Map.Entry)entries.get(i)).getValue().toString());
				context.write(key, new Text( key + "," + cnt + "," + word));
			}

		}
	}

	public static void main(String[] args ) throws Exception
	{
		if ( args.length != 2 )
		{
			System.err.println("Usage: hogehoge");
			System.exit(-1);
		}

		Job job = new Job();
		job.setJarByClass(J2ch.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.setMapperClass(J2chMapper.class);
		job.setReducerClass(J2chReducer.class);

		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Text.class);


		System.exit(job.waitForCompletion(true) ? 0 : 1 );
	}
}

次回はJavaを使わないMapReduceの書き方をご紹介します!

おすすめ記事

記事・ニュース一覧