今回のテーマは、Fair Schedulerを使った複数ジョブの同時実行です。1つのHadoopクラスタを使って複数のジョブを実行する際、どのようにクラスタのリソースを割り振るかが重要になります。複数のMapReduceジョブを同時に実行した際に、デフォルトの設定とFair Schedulerを使った場合で、どのような挙動の違いが生じるか、見ていきましょう。
HadoopのSchedulerとは?
MapReduceジョブを実行する際、Hadoopクラスタ上では、予め指定されたslot数だけ、同時にMapタスクやReduceタスクが実行されます。デフォルトでは、1つのTaskTrackerあたり、Mapのslot数がとReduceのslot数がそれぞれ2に設定されています。そのため、ノードが10台ある環境では、MapとReduceのタスクがそれぞれ20ずつ同時に動くことができます。通常、1つのMapReduceを実行する際には、slotを目一杯使って処理が行われます。このslotを、複数のジョブに割り当てて実行させる仕組みが、Schedulerです。
Schedulerは、mapred-site.xmlのmapred.jobtracker.taskSchedulerプロパティで設定します。デフォルトでは、org.apache.hadoop.mapred.JobQueueTaskSchedulerが設定されています。これは、先に開始したジョブから順番にslotを使用するという単純な仕組みです。実際に、複数のMapReduceジョブを同時に動かすとどうなるか、halookで見てみましょう。
クラスタ全体でMapのslot数が8の環境で、以下の2つのジョブを実行します。
- ジョブA:Mapタスク数40、Reduceタスク数0
- ジョブB:Mapタスク数10、Reduceタスク数0
1Mapタスクの実行にかかる時間は1分とします。ジョブAだけをこのクラスタ上で動かすと、8タスクの同時実行×5回で5分かかります。ジョブBだけを動かすと、8タスク、2タスクを順に動かし、2分かかります。図1の(1)がジョブAを単体で動かしたもの、図1の(2)がジョブBを単体で動かしたものです。
図2が、ジョブAを、図3がジョブBを単体実行した結果です。どちらも、8タスクずつ同時に実行されていることが、halookのArrow Chart 画面の矢印の数およびConcurrent task num のグラフで確認できます。
今度は、ジョブAを開始した直後にジョブBを開始して、2つのジョブを同時に実行してやります。図1の(3)がジョブA、(4)がジョブBの実行です。先ほどの単体実行と比べてジョブBにかかる時間が大幅に伸びていることがわかります。これが、「先に開始したジョブのタスクから順番に実行される」という、デフォルトのSchedulerの設定によって後発ジョブが待たされた結果です。(4)のジョブを、図4で見てやるとその動きがはっきりします。開始してからずっと、Mapタスクが動いておらず、slotに空きができたタイミング、つまり、(3)のジョブが終了したタイミングから動き始めていることがわかります。
このように、先発のジョブが終わるまで後発のジョブが待たされると言うのがデフォルトの動きですが、運用上、このままでは使いづらい場面も多いでしょう。例えば、日時で実行されるデータのマージ処理が行われている間に、ちょっとした検索処理を実行したい場合などです。裏で何も動いていなければ3分で返ってくるはずの検索結果が、「タイミング次第では3時間待たされます」、となっては困ってしまいます。
Fair Schedulerの利用
ジョブの同時実行を制御する方法には、Fair Schedulerを使うやり方と、Capacity Schedulerを使うやり方があります。今回はその1つ、Fair Schedulerについて解説します。Fair Schedulerは、「Fair」の名前の通り、「公平に」slotを分け合うというのが基本的な考え方です。この考え方をベースに、ジョブの優先度などを細かく設定してやることによって、複数ジョブによるクラスタの共有が可能になります。
Fair Schedulerは、hadoopの「contrib」モジュールとして、ディストリビューションに同梱されています。詳しくは、Hadoopディストリビューションのsrc/contrib/fairscheduler ディレクトリにあるREADMEに使い方が記載されています。
Fair Schedulerを使うために必要な作業は、以下の2つです。
Fair SchedulerのJARファイルをHadoopのcontrib/fairscheduler ディレクトリからlib ディレクトリにコピーし、Hadoop のクラスパスに配置する
mapred-site.xmlのmapred.jobtracker.taskSchedulerプロパティを以下の値に設定する
まずは、これ以上の細かい設定はせず、先ほどのジョブAとジョブBを同時に動かしてみましょう。図5の(5)がジョブA、(6)がジョブBの実行結果です。Fair Schedulerを使っていなかった、(3)と(4)のジョブと比較してまずわかるのは、後から実行した(6)のジョブBが、(5)のジョブAより早く終わっていることです。
図6で(5)のジョブAの実行結果を見ると、開始直後は8つのMapタスクが同時に動いていますが、その後は同時実行数が4に減っていることがわかります。クラスタ全体で8あるslot数を、2つのジョブで4ずつに分け合っているため、このようになっています。
今度は図7で、(6)のジョブBの実行結果をみてやると、開始直後しばらくは待たされますがその後は4タスクずつMap処理が動いていることが確認できます。
Fair SchedulerのPreemption
Fair Schedulerには、Preemptionという機能が備わっています。これは、一定期間必要なslotを割り当てられなかったら他のジョブのタスクをkillしてslotを確保する、という仕組みです。図7で見たように、後発のジョブのタスクが開始するのは、先発のジョブの実行中のタスクが終了した後からになります。先発のジョブが、細かい多くのタスクからなるジョブであれば問題ありませんが、例えば1タスクに30分かかるジョブの場合、後発のジョブは30分程度開始まで待たされる可能性もあるということになります。そういった事態を回避するのがPreemptionです。
Preemptionはデフォルトではオフになっているため、mapred-site.xmlでmapred.fairscheduler.preemptionプロパティの値をtrueに設定してやります。さらに、mapred.fairscheduler.allocation.fileプロパティで設定したallocation file で、Preemptionに関する設定をしてやります。
以下は、30秒間スロットが割り当てられないとPreemptionを発動するようジョブBを設定して実行した結果です。
図9で、PreemptionによってジョブAのタスクの一部がkillされた様子が確認できます。killされたタスクは、Hadoopの機能によって、再実行が行われています。
Fair Schedulerの詳細な設定
Fair Schedulerは、poolと呼ばれる単位で、優先度を設定したり、最低限確保するslot数、最大で使うslot数などを設定することができます。先ほどのPreemptionを使うのではなく、ジョブAがそもそも最初から8 slotを全部使わないように設定することもできます。mapred-site.xmlのmapred.fairscheduler.allocation.fileプロパティで設定したallocation file で、各poolに関する細かい設定を行います。
以下のように設定し、動作を確認してみましょう。
pool A | 最大Map数:6 優先度:1.0(デフォルト) |
pool B | 最大Map数:指定なし 優先度:3.0 |
先ほどのジョブAをpool Aに、ジョブBをpool Bに割り当てて、ジョブAの開始直後にジョブBを実行してやります。
ジョブAが6つまでしかslotを使わないようになっているため、ジョブBが開始直後から2 slotを使うことができています。優先度が1.0と3.0で差があることから、その後はジョブAが2 slot、ジョブB が6 slotを使っています。
まとめ
デフォルトでは、実行した順番にMapReduceジョブが実行されますが、Fair Schedulerを使うことにより、slotを複数のジョブで共有することができるようになります。細かく設定が可能で、最低限確保するslot数や、逆に、使うslot数の上限を定めて、他のジョブがすぐに動けるようにslotに空きを作っておくことも可能です。
Scheduler機能は運用上必須と言えるでしょう。各ジョブが実際にどのようにslotを使っているかは、ここまで見たように、halookのビューで直感的に把握できます。運用後に、「他のジョブのせいで自分のジョブが終わらない!」などという事態にならないよう、しっかり設定の検討と確認を行いましょう。