Hadoopでレコメンドシステムを作ろう

第3回レコメンドシステム-協調フィルタリングのHadoopへの実装[前編]

今回はいよいよHadoopを用いたレコメンドシステムについて説明します。

今回のポイントは以下の通りです。

  • 処理をMapReduceフレームワークへ変換することで、分散処理のメリットを享受
  • アウトプットからkeyについて着目し、処理ロジックを考える
  • 簡単な処理でも数段階のMapReduce処理を踏む場合がある

前回までのおさらい

分散処理の基本的な考え方は、大規模データあるいは処理する問題を小さく、かつ、互いに独立した単位に分割して並列に処理することで、各処理単位の出力を結合することで最終的な結果を得るというものです。Hadoopは数ある分散処理のフレームワークの実装のひとつで、システムレベルの詳細の多くを意識せず、処理ロジックに集中して設計できる特徴があります。

Hadoopで処理するため、前回紹介したユーザの映画評価の履歴をHDFSのディレクトリにコピーすると、HDFSは履歴を各ノードにブロック単位に分割して保存します。HDFSからMapReduceはこの履歴を読み込みます。

前回紹介したように、Hadoopはデータをクラスタ内のローカルディスクに分散し、そのデータがあるノード上で処理を実行するというデータローカリティを実現しています。

そこで単純に考えると、この履歴からアイテム間の相関係数を求めるためには、Mapでアイテムをkey、ユーザ名をvalueとして抽出し、Reduceでアイテムごとに集約してアイテムシーケンスを作成し、そのシーケンスをベクトルに見立てて相関係数を計算すれば良い気がします。

ですが、この方法では分散処理のメリットを生かせません。その大きな理由の1つには、MapおよびReduceの処理はメモリ上で行うので、履歴が増えた場合、全アイテムシーケンスを保存するとメモリ不足に陥ります。

欲しいアウトプットはアイテム間のコサイン関数による値なので、コサイン関数を構成する要素を見直してみます。

cos(a,b)=a*b/|a||b| =(a1*b1+a2*b2+a3*b3+....an*bn)/(√a1^2+a2^2+a3^2+....an^2)(√b1^2+b2^2+b3^2+....bn^2)

するとa1b1,...(アイテムa,bを両方アクセスしたユーザ数)および√a,√b(アイテムa、bをそれぞれアクセスしたユーザ数)がわかれば計算できることがわかります。

今までの処理ロジックを分散環境でそのまま実行するだけでなく、MapReduceのフレームワークに変換することで、処理をノード毎に実行でき、処理可能なデータサイズや処理時間をスケールアウトできる分散処理のメリットを生かせます。

結果から逆に考える

コサイン関数を計算するために、各要素を計算する方法をみてみます。ここでは履歴から考えるより、DVDを逆再生するように、要素から履歴へ逆変換する方法で考えます。Mapperはkeyとvalueの抽出、Reducerはkeyごとの集約を行うので、まず各処理のkeyについて注目します。次にReducerで集約されるvalueから直前のMapperで出力するkeyとvalueのペアを特定します。

図1 コサイン関数を計算するための処理フロー
図1 コサイン関数を計算するための処理フロー

この図において、青い文字はkey、赤い文字はvalueで、青い背景の枠内はkey、赤い背景の枠内はvalueに関わる処理で、それぞれMapperおよびReducer内で処理します。

この図より、keyとなる要素が処理に応じて異なり、処理内容が異なるMapおよびReduceフェーズがあるので、1回のMapReduceの処理では計算は完了しないことがわかります。

ここでMapperおよびReducerの設計に使えるポイントをまとめます。これらのポイントはこのコサイン関数だけでなく、他の関数や処理フローを実装する時にも使えます。

  • Mapperは入力データからkeyとvalueを入力データから指定して抽出するだけでなく、内部で抽出したデータに対して変換や演算処理を行うように設計できる。
  • keyはデータをユニーク(一意)に識別するためのもので、型として文字列(ユーザ名やアイテム名など)だけでなく、数値列、文字と数値の組み合わせ(ユーザIDやアイテムID⁠⁠、リストなども使える。
  • valueはkeyに関連付けられたデータで、keyと同じ型を取れ、複数のIDを組み合わせたリストや構造体も扱える。したがってReducerでの集約を、数値の集計だけでなく、結合してリスト作成することもできる。
  • Mapperの処理が終わってはじめて、Reducerの処理が始まる。
  • 同じkeyに関連付けられた全データ(keyが同じkeyとvalueの全ペア⁠⁠、は同じReducerにソートして送られる。
  • Reducerの出力はHDFSに書き出され、これを次のMapperの入力として使える。つまり、MapReduceの処理が何段階に分かれても、データローカリティを確保できる。

MapReduceへマップする

この処理を各段階ごとに見ていきます。

第一段階:履歴からユーザシーケンスへ

前回登場したアイテムシーケンスはアイテム毎にユーザを集約したリスト形式のデータですが、これとは逆にユーザシーケンスはユーザ毎にアイテムを集約したリスト形式のデータです。つまり、各ユーザが評価したアイテムのリストになっています。

この段階ではkeyをユーザ、valueをkeyが評価したアイテムのリストを得るために、次のようにMapperおよびReducerを設計します。この図ではユーザIDとアイテムIDをそれぞれユーザ名および映画のタイトルにしています。

図2 履歴からユーザシーケンスを作成するまでの処理フロー
図2 履歴からユーザシーケンスを作成するまでの処理フロー
  • ①は履歴の各行から、ユーザ名をkey、ユーザが評価した映画をvalueとして抽出します。
  • ②でユーザ名でMapperの出力結果、key とvalueのペアをソートします。ユーザ名が同じkeyとvalueのペアは同じReducerに送られます。
  • ③は各keyについてvalueを結合することで、リストを生成します。

次に、この処理を実装した例を示します。今回利用するデータはAmazonのレビューデータなので、先に示した履歴を次のように読み替えてください。

  • ユーザ名 → ユーザID
  • 映画のタイトル → アイテムID

このデータは各行がタブ区切りになっており、最初のカラムにユーザID、次のカラムにアイテムIDが入っています。

リスト1 Mapperの例
#!/usr/bin/perl

#Perlスクリプトの安全性を高める。
use strict;

#スクリプト内で利用する変数の宣言
my $mem_id;
my $item_id;

#標準入力からデータファイルを一行毎読みこむ
while (<STDIN>) {

        #改行コードの削除
        chomp $_;
        my @string = split ( /\t/, $_);
        $mem_id = $string[0];
        if ( $mem_id eq '' ) { next; }
        $item_id = $string[1];
        if ( $item_id eq '' ) { next; }

#出力形式はkeyを最初に、タブで区切ってvalueを一行毎に出力する。
#タブ以外でも可能であるが、その場合はkeyおよびvalueに含まれないものを使う。
        print ($mem_id . "\t" . $item_id . "\n" );
}
リスト2 Reducerの例
#!/usr/bin/perl

use strict;

#スクリプト内で利用する変数の宣言
my ($key,$value);

#key値の比較に用いる変数の初期化。ここではkeyが文字列であるので、空白化している。
my $key_current = "";
my $cnt = 0;
my @list = ();

#標準入力からMapperの出力データを一行毎読みこむ
while (<STDIN>) {
        chomp $_;

#入力したデータをkeyとvalueのペアに分割する。分割するデリミタはMap関数の出力形式に合わせる。
#ここでは前出のMap関数がタブ区切りで出力したので、デリミタをタブにする。
        ($key,$value) = split(/\t/,$_);

#key値が変わった=以前のkeyに関するデータの読み込みが終了のサインなので、
#対応するkey(以前のkey)に対する処理を開始する。
        if ( $key ne $key_current )  {
          #valueが存在するkeyについてvalueを出力する。
                if ( $#list > -1 ) {
                        print ($key_current . "\t");
                        foreach ( @list ) {
                                print ($_ . ":");
                        }
                        print ("\n");
                }
           #比較対象となるkeyを変更する。
                $key_current = $key;
                @list = ();
        }
       #keyについてvalueをリスト化する。
        push @list,$value;
}
#標準入力を読み終えた段階で、まだ出力していないkeyに対し、集約を行い、その結果を出力する。
print ($key . "\t");
if ( $#list > -1 ) {
        foreach ( @list ) {
                print ($_ . ":");
        }
        print ("\n");
}

次回はユーザシーケンスからの説明になります。

おすすめ記事

記事・ニュース一覧