今回は2ちゃんねるデータを処理するという実践を行ってみます。
品詞解析ライブラリの導入
品詞解析にはいくつかのツールがありますが、
- MeCabのサイト
- URL:http://
mecab. sourceforge. net/
MeCab本体と、
MeCab本体のセットアップ
mecab-0.
$ ./configure $ make $ make check # make install
/usr/
辞書のセットアップ
MeCabの動作には辞書が必要ですので、
mecab-ipadic-2.
$ ./configure --with-charset=utf8 $ make # make install
以上でMeCabの導入が終わりました。
CMeCabの導入にはprotocol bufferとsconsが必要となります。
protocol buffer
URL:http://
このページから最新版をダウンロードします。
$ ./configure $ make # make install
scons
$ wget http://prdownloads.sourceforge.net/scons/scons-1.2.0-1.noarch.rpm # rpm -Uhv scons.rpm
CMeCab
CMeCabのサイトで公開されている。cmecab-1.
$ cd cmecab-1.7 $ ant
bin/
ライブラリの生成
続いてネイティブライブラリの生成です。sconsを使用してコンパイルしますが、
37 javahome = '/usr/lib/jvm/java' ←適切なパスに置き換える
sconsを実行します。
$ cd jni $ scons # cp libCMeCab_protobuf.so libCMeCab.so /usr/local/lib
環境変数の設定
今回jarファイルを/usr/
12 # Extra Java CLASSPATH elements. Optional.
13 export HADOOP_CLASSPATH=/usr/local/apache_proj/myproj/lib/cmecab-1.7.jar
cmecab-1.
$HADOOP_
1 <?xml version="1.0"?>
2 <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
3 <!-- Put site-specific property overrides in this file. -->
4 <configuration>
5 <property>
6 <name>mapred.child.java.opts</name>
7 <value>-Djava.library.path=/usr/local/lib</value>
8 </property>
9 <property>
10 <name>mapred.job.tracker</name>
11 <value>hadoop1:9001</value>
12 </property>
13 </configuration>
また、
export LD_LIBRARY_PATH=/usr/local/lib
ここでhadoopを再起動します。
$ stop-all.sh $ start-all.sh
これで準備が整いました!
MapReduceプログラム
ようやくプログラム本体に入れます。今回はJAVAのネイティブなプログラムで機能の実装を行います。
基本的な流れをおさらいすると以下の通りです。
プログラムの全体像
MapReduce処理はデータの入出力がキーとバリューと決まっているため、
プログラムの全体像はたったこれだけです。
public class J2ch
{
// Map処理
static class J2chMapper extends Mapper <LongWritable, Text, IntWritable, Text>
{
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
}
}
// Reduce処理
static class J2chReducer extends Reducer <IntWritable, Text, IntWritable, Text>
{
public void reduce(IntWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException
{
}
}
// メイン関数
public static void main(String[] args ) throws Exception
{
if ( args.length != 2 )
{
System.err.println("Usage: hogehoge");
System.exit(-1);
}
// ジョブの定義
Job job = new Job();
job.setJarByClass(J2ch.class);
// 入力出力ファイルの定義(引数の1、2)
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// mapperとreducerクラスを定義
job.setMapperClass(J2chMapper.class);
job.setReducerClass(J2chReducer.class);
// 最終出力のキーとバリューの型を定義
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1 );
}
}
メイン処理ではジョブの定義とmapperとreducerは誰?
ここにMap処理、
Map処理
まずMap処理から見ていきましょう。
Map処理で入力とされる2ちゃんねるデータは以下のようなフォーマットとなります
- ※内容は先日放送された映画
「カイジ 人生逆転ゲーム」 の実況スレです。香川照之さん扮する利根川が” fuck you!” というシーン付近ですね。^^
1287144100 299 名無しさんにズームイン! LnjhWICN sage 迫力ねえよ、香川w
1287144803 300 名無しさんにズームイン! 3M/Iym08 sage 利根川なんかちがう。
1287144804 301 名無しさんにズームイン! xueZONEO sage 利根川キター!
それではMap処理の出力はどうでしょうか?
1287144000 迫力
1287144000 ねえ
1287144000 よ
1287144000 、
1287144000 香川
1287144000 w
1287144600 利根川
1287144600 なんか
1287144600 ちがう
1287144600 。
1287144600 利根川
1287144600 キター
1287144600 !
キーがunixtime
それでは実際のコードを見てみましょう。innerクラスJ2chMapperです。
static class J2chMapper extends Mapper <LongWritable, Text, IntWritable, Text>
{
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
// keyは使いません。
String line = value.toString(); // Text型で渡される1行のデータをString型に変換する。
String[] arr = line.split("\t"); // タブで分割する。
int _time = Integer.parseInt(arr[0]); // unixtimeを取得。
int time = (int)Math.floor(_time/600)*600; // 600秒で丸める。後のキーとなる。
String b = arr[6]; // ボディーテキストを取得
if ( isAA(b) ) return; // AAを含むモノは削除
String body = noGomi(b); // ゴミを削除(URLや>>123などを削除)
Tagger tagger = new StandardTagger("UTF-8", ""); // CMeCabのクラス
Node node = tagger.parse(body); // 品詞解析
// 品詞解析の結果の数(単語)だけループする。
while (node.hasNext())
{
String surface = node.next(); // 単語の取得
String feature = node.feature(); // 単語の解析結果
// 各項目の値を取得。以下を期待
// 名詞,副詞可能,*,*,*,*,本日,ホンジツ,ホンジツ
String featureArr[] = feature.split(",");
// 名詞でかつ内容がゴミではない場合、
// Map処理の結果としてキーバリューを出力する。
if ( featureArr[0].equals("名詞" ) && ! isGomi(surface))
{
//丸めたunixtime、単語のペアを出力Reducerに渡される。
context.write(new IntWritable(time), new Text(surface));
}
}
}
}
static class J2chMapper extends Mapper <LongWritable, Text, IntWritable, Text>
この部分で、
- 入力キー、
バリュー (LongWritable, Text) - 出力キー、
バリュー (IntWritable, Text)
を定義しています。map()には入力キー、
つまり、
key=0
value=“1287144100 299 名無しさんにズームイン! LnjhWICN sage 迫力ねえよ、香川w”
というふうに1行目が処理されます。2行目は、
key=96
value=“1287144803 300 名無しさんにズームイン! 3M/Iym08 sage 利根川なんかちがう。”
という具合です。
今回のmap処理ではバイト位置は不要ですので、
品詞解析している部分を別のクラスにして隠蔽化、
内容は単純なテキスト処理ですので、
Recude処理
続いてReduce処理です。
Reduce処理の入力はMap処理の出力になります。名詞のみに絞った場合下記のようになりました。
1287144000 迫力
1287144000 香川
1287144000 w
1287144600 利根川
1287144600 利根川
1287144600 キター
1287144600 !
次に出力です。
1287144000 1,迫力
1287144000 1,香川
1287144000 1,w
1287144600 2,利根川
1287144600 1,キター
1287144600 1,!
丸めたunixtimeごとに出現回数の多い単語順にベスト5が出力されます。これが最終的なMapReduceの出力となります
それでは実際のコードを見てみましょう。J2chReduerクラスです。
static class J2chReducer extends Reducer <IntWritable, Text, IntWritable, Text>
{
public void reduce(IntWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException
{
// hashtableオブジェクトを作成
// Text=カウントというハッシュを作りたい。
Hashtable kvs = new Hashtable();
// valueにTextのリストが渡されるのでイテレーションする
for( Text value : values )
{
// Textの取り出し。
String k = value.toString();
// kvsオブジェクトに存在すれば。
if ( kvs.containsKey( k ))
{
// すでに存在するキーのバリューを+1する。出現回数を+1する。
Integer n = kvs.get(k) + 1;
kvs.put(k, n);
}
else // 無ければ
{
// kvsオブジェクトにキーと出現回数(1)を追加する。
kvs.put(k, 1);
}
}
// 配列のリストを作成してkvsの中身をセットする。
ArrayList entries = new ArrayList(kvs.entrySet());
// バリューの中身でソートする。(出現回数でソートする)
Collections.sort(entries, new Comparator(){
public int compare(Object obj1, Object obj2){
Map.Entry ent1 =(Map.Entry)obj1;
Map.Entry ent2 =(Map.Entry)obj2;
int val1 = Integer.parseInt(ent1.getValue().toString());
int val2 = Integer.parseInt(ent2.getValue().toString());
return (val2 - val1); // 降順
}
});
// ソート後の配列のリストを走査(降順)
// 最大4まで配列を回す
for( int i = 0; i < entries.size() && i < 5; i++ )
{
String word = (String)((Map.Entry)entries.get(i)).getKey();
int cnt = Integer.parseInt(((Map.Entry)entries.get(i)).getValue().toString());
// キーはIntWritable、バリューはText
context.write(key, new Text( key + "," + cnt + "," + word));
}
}
}
こちらもJ2chMapperと同様に、
static class J2chReducer extends Reducer <IntWritable, Text, IntWritable, Text>
の部分が、
- 入力キー、
バリュー (IntWritable, Text) - 出力キー、
バリュー (IntWritable, Text)
を定義しています。Mapperの出力とReducerの入力が同じであることに注意してください。
さて、
1287144000 迫力
1287144000 香川
1287144000 w
1287144600 利根川
1287144600 利根川
1287144600 キター
1287144600 !
実際には
1287144000 [迫力,香川,w]
1287144600 [利根川,利根川,キター,!]
というふうに、
リストで渡された名詞の単語を一度Hashtableに登録し、
これで当初の目的である、
いかがでしたか? 思ってたよりずっと簡単に感じたんではないでしょうか? Map,Reduceのそれぞれの処理での入出力のフォーマットがある程度決まっていることで、
Map、
コンパイル
ソースのコンパイルですが、
$ javac -cp $HADOOP_HOME/hadoop-0.20.2-core.jar:$HADOOP_HOME/myproj/lib/cmecab-1.7.jar J2ch.java
jarファイルの作成は以下のように行います。
$ jar cvf J2ch.jar J2ch*.class
実行
hadoopコマンドにJ2ch.
hadoop jar プログラムのjar メイン関数を含んだクラス 第一引数 第二引数
という構文になります。
$ hadoop jar J2ch.jar J2ch 2ch_4.txt 2ch_4_result
ここでは、
$ hadoop dfs -rmr 2ch_4_result
というふうにディレクトリを削除しましょう。
出力データ例
「カイジ 人生逆転ゲーム」
- ※1287144000が21:00を表します。1287151200が23:00を表します
(放送開始から放送終了まで)。
ざっと眺めてみると、
1287144000 1287144000,53,カイジ
1287144000 1287144000,37,俺
1287144000 1287144000,35,女
1287144000 1287144000,34,カット
1287144000 1287144000,26,出
1287144600 1287144600,118,利根川
1287144600 1287144600,114,これ
1287144600 1287144600,110,www
1287144600 1287144600,108,ざわ
1287144600 1287144600,108,船井
1287145200 1287145200,161,カット
1287145200 1287145200,156,原作
1287145200 1287145200,154,映画
1287145200 1287145200,151,カイジ
1287145200 1287145200,126,展開
1287145800 1287145800,225,カット
1287145800 1287145800,201,ビール
1287145800 1287145800,182,カイジ
1287145800 1287145800,169,ペ
1287145800 1287145800,164,映画
1287146400 1287146400,96,映画
1287146400 1287146400,87,カット
1287146400 1287146400,66,落
1287146400 1287146400,65,これ
1287146400 1287146400,60,原作
1287147000 1287147000,126,落
1287147000 1287147000,101,カイジ
1287147000 1287147000,95,原作
1287147000 1287147000,90,これ
1287147000 1287147000,84,佐原
1287147600 1287147600,135,ざわざわ
1287147600 1287147600,115,ざわ
1287147600 1287147600,69,カード
1287147600 1287147600,54,耳
1287147600 1287147600,53,これ
1287148200 1287148200,105,カイジ
1287148200 1287148200,91,CM
1287148200 1287148200,85,利根川
1287148200 1287148200,81,ざわざわ
1287148200 1287148200,76,顔
1287148800 1287148800,88,遠藤
1287148800 1287148800,82,カイジ
1287148800 1287148800,65,映画
1287148800 1287148800,60,原作
1287148800 1287148800,51,何
1287149400 1287149400,198,キタ
1287149400 1287149400,91,利根川
1287149400 1287149400,85,カイジ
1287149400 1287149400,84,血
1287149400 1287149400,76,ざわ
1287150000 1287150000,91,土下座
1287150000 1287150000,88,焼
1287150000 1287150000,31,カイジ
1287150000 1287150000,29,利根川
1287150000 1287150000,22,演技
1287150600 1287150600,118,カイジ
1287150600 1287150600,116,映画
1287150600 1287150600,93,カット
1287150600 1287150600,84,原作
1287150600 1287150600,62,遠藤
1287151200 1287151200,52,映画
1287151200 1287151200,47,カイジ
1287151200 1287151200,35,原作
1287151200 1287151200,33,思
1287151200 1287151200,32,人
全プログラムリスト
最後に全リストを載せておきます。アスキーアートの除外や意味のない単語の削除など上の例では省いていた関数を載せいてます。
J2ch.java
import java.io.IOException;
import java.util.Hashtable;
import java.text.BreakIterator;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.regex.*;
import net.moraleboost.mecab.Node;
import net.moraleboost.mecab.Tagger;
import net.moraleboost.mecab.impl.StandardTagger;
import net.moraleboost.mecab.MeCabException;
public class J2ch
{
static final int TIMESEC = 600; // 丸める秒数
static class J2chMapper extends Mapper <LongWritable, Text, IntWritable, Text>
{
private Boolean isAA( String s)
{
Pattern p = Pattern.compile(".*[/ ̄_\ ]{2}+.*");
Matcher m = p.matcher(s);
return m.matches();
}
private String noGomi( String s )
{
Pattern p = Pattern.compile("(h?ttp://|h?ttps://|sssp://){1}[\\w\\.\\-/:\\#\\?\\=\\&\\;\\%\\~\\+]+|>?>[0-9- ]+");
Matcher m = p.matcher(s);
String t = m.replaceAll("");
return t;
}
private Boolean isGomi( String s )
{
Pattern p = Pattern.compile("^[!()━・゚?/:;0-9a-zA-Z∀Д\\`=│*1234567890.\"'#~{}&,:;+<>%$_$%”!#&、;:?|{}@`*+_?><|∴▼△▲▽-]$");
Matcher m = p.matcher(s);
return m.matches();
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
// 入力データ
// 1288400498\t94\t主侍 </b>◆HIPPER/a9g <b>\t+rXxBAvc\t\t\t 友近とアジアンかって
String line = value.toString();
String[] arr = line.split("\t");
int _time = Integer.parseInt(arr[0]);
int time = (int)Math.floor(_time/TIMESEC)*TIMESEC; // TIMESECで丸める。
if ( arr.length != 7 ) return;
String b = arr[6]; // ボディーテキストを取得
if ( isAA(b) ) return; // AAを含むモノは削除
String body = noGomi(b); // ゴミを削除(URL,>>123など)
Tagger tagger = new StandardTagger("UTF-8", "");
Node node = tagger.parse(body);
// 名詞,副詞可能,*,*,*,*,本日,ホンジツ,ホンジツ
while (node.hasNext())
{
String surface = node.next();
String feature = node.feature();
String featureArr[] = feature.split(",");
if ( featureArr[0].equals("名詞" ))
{
context.write(new IntWritable(time), new Text(surface));
}
}
}
}
static class J2chReducer extends Reducer <IntWritable, Text, IntWritable, Text>
{
public void reduce(IntWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException
{
Hashtable<String, Integer> kvs = new Hashtable<String, Integer>();
for( Text value : values )
{
String k = value.toString();
if ( kvs.containsKey( k ))
{
Integer n = kvs.get(k) + 1;
kvs.put(k, n);
}
else
{
kvs.put(k, 1);
}
}
ArrayList entries = new ArrayList(kvs.entrySet());
Collections.sort(entries, new Comparator(){
public int compare(Object obj1, Object obj2){
Map.Entry ent1 =(Map.Entry)obj1;
Map.Entry ent2 =(Map.Entry)obj2;
int val1 = Integer.parseInt(ent1.getValue().toString());
int val2 = Integer.parseInt(ent2.getValue().toString());
return (val2 - val1);
}
});
for( int i = 0; i < entries.size() && i < 5; i++ )
{
String word = (String)((Map.Entry)entries.get(i)).getKey();
int cnt = Integer.parseInt(((Map.Entry)entries.get(i)).getValue().toString());
context.write(key, new Text( key + "," + cnt + "," + word));
}
}
}
public static void main(String[] args ) throws Exception
{
if ( args.length != 2 )
{
System.err.println("Usage: hogehoge");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(J2ch.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(J2chMapper.class);
job.setReducerClass(J2chReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1 );
}
}
次回はJavaを使わないMapReduceの書き方をご紹介します!