Amazon Elastic MapReduceの使い方─Hadoopより手軽にはじめる大規模計算

第4回Java SDKでEMRを起動する

前回は、Amazon Elastic MapReduce Rubyを使ってAmazon Elastic MapReduce(以下EMR)を起動する方法を説明しました。今回はJava SDKを使って起動する方法を紹介していきます。

Java SDKを使う準備をする

実際に使用するにはSDKをダウンロードしておく必要があります。下記サイトからダウンロードしておきましょう。

AWS SDK for Java
URL:http://aws.amazon.com/sdkforjava/
Java Library for Amazon Elastic MapReduce
URL:http://aws.amazon.com/code/Elastic-MapReduce/2305

AWS SDK for Javaは、EMRだけでなくEC2、S3などほかのAWSを操作することもできますし、AWSオフィシャルのものです。積極的に利用することをお勧めします。

また、mavenを使用しているのであれば、設定(pom.xml)に以下を追加してダウンロードすることもできます(執筆時点での最新バージョンは1.1.4です⁠⁠。

リスト1 mavenの設定への追加
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-java-sdk</artifactId>
  <version>1.1.1</version>
</dependency>

Amazon Elastic MapReduce Clientのコンストラクタにプロパティファイルを渡す

では、実際にコードを見ていきましょう。ファイルが少し大きいので、一部抜粋しながら説明していきます。全体については、本記事の記事末にあるリンクからサンプルプログラムをダウンロードしてご確認ください。

リスト2 EMRManager.java(抜粋)
private static final String AWS_CREDENTIALS_PROPERTIES = "/example/emr/AWSCredentials.properties";

private AmazonElasticMapReduce emr;

public EMRManager() throws IOException {
  emr = new AmazonElasticMapReduceClient(
                new PropertiesCredentials(
                        EMRManager.class.getResourceAsStream(AWS_CREDENTIALS_PROPERTIES)));
}

SDKでEMRを操作するにはAmazonElasticMapReduceClientを使います。AmazonElasticMapReduceClientのコンストラクタに渡している「AWSCredentials.properties」は、AWSでの認証のためのプロパティファイルです。以下のように定義します。

リスト3 AWSCredentials.propertiesの内容
accessKey=YYYYYYYYYYYYYYYYY
secretKey=ZZZZZZZZZZZZZZZZZZ

これらは前回も出てきたアクセスキー、シークレットキーと同じものです。もしくはEMRManager.javaのプログラム中で、

public EMRManager() throws IOException {
  emr = new AmazonElasticMapReduceClient(new BasicAWSCredentials("accessKey", "secretKey"));
}

としてBasicAWSCredentialsを使って各キーを指定することもできます。

ステップを定義する

EMRを実行するにあたっては、まず「ステップ」を定義する作業が必要になります。ステップとは、MapReduceのそれぞれのJobのことです。このステップを複数定義して渡すことによって、EMRを一度起動するだけで複数のJobを実行することができます。

リスト4 EMRManager.java(ステップ定義部分 抜粋)
StepConfig stepConfig =
    new StepConfig()
            .withName("job")
            .withActionOnFailure(ACTION_ON_FAILURE)
            .withHadoopJarStep(new HadoopJarStepConfig()
                                        .withJar(MAPREDUCE_JAR)
                                        .withArgs(Arrays.asList(jobName, inputPath, outputPath)));

リスト4の3行目以降の意味は以下のとおりです。

StepConfig#withName
→Jobの名前を指定します。
StepConfig#withActionOnFailure
→Jobの実行に失敗した時の動作を指定します。CANCEL_AND_WAITはJobに失敗した場合、Job自体をキャンセルし、EMRを待機状態にします。
StepConfig#withHadoopJarStep
→実行するJarを定義します。
HadoopJarStepConfig#withJar
→Jarのパスを指定します。
HadoopJarStepConfig#withArgs
→Jarに渡すパラメータを指定します。

デバッグモードで起動するには

前回でも紹介しましたが、SDKでもデバッグモードで起動することができます。大まかには以下のようになります。

リスト5 EMRManager.java(デバッグモード部分 抜粋)
private static final String EMR_DEBUGGIN_NAME = "Debugging";

StepConfig enableDebugging =
    new StepConfig()
            .withName(EMR_DEBUGGIN_NAME)
            .withActionOnFailure(ACTION_ON_TERMINATE)
            .withHadoopJarStep(new StepFactory().newEnableDebuggingStep());

Jobと違うのは以下の2点です。

  • withActionOnFailureにTERMINATE_JOB_FLOWを指定している
    (実行に失敗した場合、EMR自体を終了させます)
  • withHadoopJarStepにデバッグ用のステップを指定している

Jobを実行させるリクエストを定義する

最後に、EMRに対してJobを実行させるリクエストを定義します。

リスト6 EMRManager.java(リクエスト定義部分 抜粋)
private static final String INSTANCE = "m1.large";
private static final String AVAILABILITY_ZONE = "us-east-1a";
private static final String HADOOP_VERSION = "0.20";
private static final String KEY_PAIR_NAME = "cluster-key";
private static final int INSTANCE_NUMBER = 10;
private static final String MAPREDUCE_JAR = "s3://example/mr.jar";
private static final String EMR_LOG_URI = "s3://emr-log/";

RunJobFlowRequest runJobFlowRequest =
    new RunJobFlowRequest()
            .withName(MAP_REDUCE_NAME)
            .withSteps(enableDebugging, stepConfig)
            .withLogUri(EMR_LOG_URI)
            .withInstances(new JobFlowInstancesConfig()
                                    .withEc2KeyName(KEY_PAIR_NAME)
                                    .withHadoopVersion(HADOOP_VERSION)
                                    .withInstanceCount(INSTANCE_NUMBER)
                                    .withKeepJobFlowAliveWhenNoSteps(true)
                                    .withMasterInstanceType(INSTANCE)
                                    .withSlaveInstanceType(INSTANCE)
                                    .withPlacement(new PlacementType()
                                                          .withAvailabilityZone(AVAILABILITY_ZONE)));

リスト6の3行目以降の意味を、runJobFlowRequestの部分とJobFlowInstancesConfigの部分に分けて見ていきましょう。まずはrunJobFlowRequestの部分です。

runJobFlowRequest#withName
→このJob全体の名前を定義します。
runJobFlowRequest#withSteps
→先ほど定義した実際のJobとデバッグ用のステップを定義します。
runJobFlowRequest#withLogUri
→今回はデバッグの指定をしているので出力先を定義します。
runJobFlowRequest#withInstances
→JobFlowInstancesConfigで細かいインスタンスの情報を定義します。

続いて、JobFlowInstancesConfigの部分の定義です。

JobFlowInstancesConfig#withEc2KeyName
→EC2で定義してあるSSHのキーを指定します。
JobFlowInstancesConfig#withHadoopVersion
→Hadoopのバージョンを定義します。現在は0.19と0.20を指定することができます。
JobFlowInstancesConfig#withInstanceCount
→インスタンスの起動数を定義します。このインスタンス数は全体でのインスタンス数となります。今回は10インスタンスとしました。そのため、マスタノード1台、スレーブノード9台という構成になります。
JobFlowInstancesConfig#withKeepJobFlowAliveWhenNoSteps
→ステップが終了した時の動作です。trueとしてあるので1つのJobが終了してもEMR自体は終了せずに待機状態になります。
JobFlowInstancesConfig#withMasterInstanceType
→マスタのインスタンスタイプを指定します。
JobFlowInstancesConfig#withSlaveInstanceType
→スレーブのインスタンスタイプを指定します。
JobFlowInstancesConfig#withPlacement
→PlacementTypeを定義し、インスタンスをどのEC2のゾーンに起動するかを定義します。

なお、今回は上記のように定義しましたが、ゾーンなどは任意です。特に指定しなくても大丈夫です。

Jobを実行させる

それでは最後にEMRを起動し、Jobを実行させてみましょう。

リスト7 EMRManager.java(job実行部分 抜粋)
RunJobFlowResult result = emr.runJobFlow(runJobFlowRequest);
String jobFlowId = result.getJobFlowId();

まずはAmazonElasticMapReduce#runJobFlowを渡します。実行するとRunJobFlowResultに結果が返ってきますが、⁠結果が返ってくる」と言ってもすぐに終了するものではありません。前回同様、JobフローIDを取得し、これ以降はこのJobフローIDを使って操作します。

また、今回は1度の起動で複数のJobを実行させることが前提なので、2回目以降のJobはリスト8のようにして実行させます。

リスト8 EMRManager.java(複数job実行部分 抜粋)
AddJobFlowStepsRequest addJobFlowStepsRequest =
    new AddJobFlowStepsRequest().withJobFlowId(jobFlowId)
                                .withSteps(stepConfig);
emr.addJobFlowSteps(addJobFlowStepsRequest);

それぞれの行の意味は以下のとおりです。

  1. AddJobFlowStepsRequestに追加のJobリクエストを定義
  2. AddJobFlowStepsRequest#withJobFlowIdにRunJobFlowResultで取得したJobフローIDを指定
  3. AddJobFlowStepsRequest#withStepsで新しいステップを定義
  4. AmazonElasticMapReduce#addJobFlowStepsに追加のリクエストを渡して完了

Jobが完了したか確認する

ここまでで起動させる方法を見てきましたが、実際にはJobが完了したのがエラーなのかを判断しなければなりません。Jobの状態を確認するコードはリスト9のようになります。

リスト9 EMRManager.java(Job状態確認部分 抜粋)
private static final String STEP_DETAIL_STATUS_PENDING = "PENDING";
private static final String STEP_DETAIL_STATUS_RUNNING = "RUNNING";
private static final String STEP_DETAIL_STATUS_COMPLETED = "COMPLETED";

String stepStatus = STEP_DETAIL_STATUS_PENDING;
while (stepStatus.equals(STEP_DETAIL_STATUS_PENDING) ||
       stepStatus.equals(STEP_DETAIL_STATUS_RUNNING)) {
  try {
    Thread.sleep(1 * (1000 * 60));
  } catch (Exception e) {
  }

  DescribeJobFlowsRequest describeJobFlowsRequest =
    new DescribeJobFlowsRequest().withJobFlowIds(jobFlowId);
  DescribeJobFlowsResult describeJobFlowsResult =
    emr.describeJobFlows(describeJobFlowsRequest);

  boolean found = false;
  for (JobFlowDetail jobFlowDetail : describeJobFlowsResult.getJobFlows()) {
    for (StepDetail stepDetail : jobFlowDetail.getSteps()) {
      if (stepDetail.getStepConfig().getName().equals("job1")) {
        stepStatus = stepDetail.getExecutionStatusDetail().getState();
        found = true;
        break;
      }
    }

    if (found) {
      break;
    }
  }
}

ここではステップの実行が待ち、もしくは実行している間、1分間隔でステップの状態を監視するようループの条件を設定しています。

まずは、Job状態を取得するリクエストを定義します。DescribeJobFlowsRequest#withJobFlowIdsにJobフローIDを指定します。そしてAmazonElasticMapReduce#describeJobFlowsでリクエストを送信します。結果は、DescribeJobFlowsResultとして戻されます。

DescribeJobFlowsResultからステップのステータスを取得し、待ちでも実行中でもなければメインのループを抜けます。Jobの実行に成功していれば、ステータスはCOMPLETEDが返されます。

以上で、Jobの実行から監視までが完了です。

EMRを終了させる

最後に忘れてはいけないのはEMR自体を終了させることです。これを忘れてしまうといつまでもEMR(正確にはインスタンス)が起動したままになり、ずっと課金されてしまうことになるためです。

終了のコードはリスト10のようになります。

リスト10 EMRManager.java(EMR終了部分 抜粋)
TerminateJobFlowsRequest request = new TerminateJobFlowsRequest();
request.setJobFlowIds(Arrays.asList(jobFlowId));
emr.terminateJobFlows(request);

以下の3つを行うのがポイントとなります。

  • TerminateJobFlowsRequestを定義する
  • 終了させるJobフローIDを指定する
  • AmazonElasticMapReduce#terminateJobFlowsを送信して完了

今回は、Java SDKを使ってEMRを起動する方法を説明してきました。Java Docにはここでは説明しきれなかったその他の機能についても書かれていますので、確認してみてください。

次回は、ここまで何度か出てきたデバッグモードの詳細を説明します。

  • 今回の記事内で紹介したサンプルコードはこちらからダウンロードしてください:samples.zip

おすすめ記事

記事・ニュース一覧