前回は、Amazon Elastic MapReduce Rubyを使ってAmazon Elastic MapReduce(以下EMR)を起動する方法を説明しました。今回はJava SDKを使って起動する方法を紹介していきます。
Java SDKを使う準備をする
実際に使用するにはSDKをダウンロードしておく必要があります。下記サイトからダウンロードしておきましょう。
- AWS SDK for Java
- URL:http://aws.amazon.com/sdkforjava/
- Java Library for Amazon Elastic MapReduce
- URL:http://aws.amazon.com/code/Elastic-MapReduce/2305
AWS SDK for Javaは、EMRだけでなくEC2、S3などほかのAWSを操作することもできますし、AWSオフィシャルのものです。積極的に利用することをお勧めします。
また、mavenを使用しているのであれば、設定(pom.xml)に以下を追加してダウンロードすることもできます(執筆時点での最新バージョンは1.1.4です)。
Amazon Elastic MapReduce Clientのコンストラクタにプロパティファイルを渡す
では、実際にコードを見ていきましょう。ファイルが少し大きいので、一部抜粋しながら説明していきます。全体については、本記事の記事末にあるリンクからサンプルプログラムをダウンロードしてご確認ください。
SDKでEMRを操作するにはAmazonElasticMapReduceClientを使います。AmazonElasticMapReduceClientのコンストラクタに渡している「AWSCredentials.properties」は、AWSでの認証のためのプロパティファイルです。以下のように定義します。
これらは前回も出てきたアクセスキー、シークレットキーと同じものです。もしくはEMRManager.javaのプログラム中で、
としてBasicAWSCredentialsを使って各キーを指定することもできます。
ステップを定義する
EMRを実行するにあたっては、まず「ステップ」を定義する作業が必要になります。ステップとは、MapReduceのそれぞれのJobのことです。このステップを複数定義して渡すことによって、EMRを一度起動するだけで複数のJobを実行することができます。
リスト4の3行目以降の意味は以下のとおりです。
- StepConfig#withName
- →Jobの名前を指定します。
- StepConfig#withActionOnFailure
- →Jobの実行に失敗した時の動作を指定します。CANCEL_AND_WAITはJobに失敗した場合、Job自体をキャンセルし、EMRを待機状態にします。
- StepConfig#withHadoopJarStep
- →実行するJarを定義します。
- HadoopJarStepConfig#withJar
- →Jarのパスを指定します。
- HadoopJarStepConfig#withArgs
- →Jarに渡すパラメータを指定します。
デバッグモードで起動するには
前回でも紹介しましたが、SDKでもデバッグモードで起動することができます。大まかには以下のようになります。
Jobと違うのは以下の2点です。
- withActionOnFailureにTERMINATE_JOB_FLOWを指定している
(実行に失敗した場合、EMR自体を終了させます)
- withHadoopJarStepにデバッグ用のステップを指定している
Jobを実行させるリクエストを定義する
最後に、EMRに対してJobを実行させるリクエストを定義します。
リスト6の3行目以降の意味を、runJobFlowRequestの部分とJobFlowInstancesConfigの部分に分けて見ていきましょう。まずはrunJobFlowRequestの部分です。
- runJobFlowRequest#withName
- →このJob全体の名前を定義します。
- runJobFlowRequest#withSteps
- →先ほど定義した実際のJobとデバッグ用のステップを定義します。
- runJobFlowRequest#withLogUri
- →今回はデバッグの指定をしているので出力先を定義します。
- runJobFlowRequest#withInstances
- →JobFlowInstancesConfigで細かいインスタンスの情報を定義します。
続いて、JobFlowInstancesConfigの部分の定義です。
- JobFlowInstancesConfig#withEc2KeyName
- →EC2で定義してあるSSHのキーを指定します。
- JobFlowInstancesConfig#withHadoopVersion
- →Hadoopのバージョンを定義します。現在は0.19と0.20を指定することができます。
- JobFlowInstancesConfig#withInstanceCount
- →インスタンスの起動数を定義します。このインスタンス数は全体でのインスタンス数となります。今回は10インスタンスとしました。そのため、マスタノード1台、スレーブノード9台という構成になります。
- JobFlowInstancesConfig#withKeepJobFlowAliveWhenNoSteps
- →ステップが終了した時の動作です。trueとしてあるので1つのJobが終了してもEMR自体は終了せずに待機状態になります。
- JobFlowInstancesConfig#withMasterInstanceType
- →マスタのインスタンスタイプを指定します。
- JobFlowInstancesConfig#withSlaveInstanceType
- →スレーブのインスタンスタイプを指定します。
- JobFlowInstancesConfig#withPlacement
- →PlacementTypeを定義し、インスタンスをどのEC2のゾーンに起動するかを定義します。
なお、今回は上記のように定義しましたが、ゾーンなどは任意です。特に指定しなくても大丈夫です。
Jobを実行させる
それでは最後にEMRを起動し、Jobを実行させてみましょう。
まずはAmazonElasticMapReduce#runJobFlowを渡します。実行するとRunJobFlowResultに結果が返ってきますが、「結果が返ってくる」と言ってもすぐに終了するものではありません。前回同様、JobフローIDを取得し、これ以降はこのJobフローIDを使って操作します。
また、今回は1度の起動で複数のJobを実行させることが前提なので、2回目以降のJobはリスト8のようにして実行させます。
それぞれの行の意味は以下のとおりです。
- AddJobFlowStepsRequestに追加のJobリクエストを定義
- AddJobFlowStepsRequest#withJobFlowIdにRunJobFlowResultで取得したJobフローIDを指定
- AddJobFlowStepsRequest#withStepsで新しいステップを定義
- AmazonElasticMapReduce#addJobFlowStepsに追加のリクエストを渡して完了
Jobが完了したか確認する
ここまでで起動させる方法を見てきましたが、実際にはJobが完了したのがエラーなのかを判断しなければなりません。Jobの状態を確認するコードはリスト9のようになります。
ここではステップの実行が待ち、もしくは実行している間、1分間隔でステップの状態を監視するようループの条件を設定しています。
まずは、Job状態を取得するリクエストを定義します。DescribeJobFlowsRequest#withJobFlowIdsにJobフローIDを指定します。そしてAmazonElasticMapReduce#describeJobFlowsでリクエストを送信します。結果は、DescribeJobFlowsResultとして戻されます。
DescribeJobFlowsResultからステップのステータスを取得し、待ちでも実行中でもなければメインのループを抜けます。Jobの実行に成功していれば、ステータスはCOMPLETEDが返されます。
以上で、Jobの実行から監視までが完了です。
EMRを終了させる
最後に忘れてはいけないのはEMR自体を終了させることです。これを忘れてしまうといつまでもEMR(正確にはインスタンス)が起動したままになり、ずっと課金されてしまうことになるためです。
終了のコードはリスト10のようになります。
以下の3つを行うのがポイントとなります。
- TerminateJobFlowsRequestを定義する
- 終了させるJobフローIDを指定する
- AmazonElasticMapReduce#terminateJobFlowsを送信して完了
今回は、Java SDKを使ってEMRを起動する方法を説明してきました。Java Docにはここでは説明しきれなかったその他の機能についても書かれていますので、確認してみてください。
次回は、ここまで何度か出てきたデバッグモードの詳細を説明します。