これなら使える!ビッグデータ分析基盤のエコシステム

第9回[最終回] データパイプラインのためのワークフロー管理

基本KPIや応用KPIが決まり、実際に、毎日の運用の中で定期的にデータを更新して、可視化するためには、一連の処理を自動化する必要があります。今回は、データパイプラインを扱うためのワークフロー管理ツールを紹介していきます。

データパイプラインとワークフロー管理

データパイプライン ⁠以下、パイプライン)とは、データ処理を行なう小さなタスク(1回のファイルコピーや、SQLの実行など)を順次実行することにより、最終的に求める結果を得るための一連のプロセスを指します。狭義には、単体のシステム内で完結するパイプラインを指します(SparkやGoogle Cloud Dataflow、など⁠⁠。

また、広義には、複数のシステムを組み合わせて大きなパイプラインを構成することもあります(MySQLから取り出したデータをRedshiftで集計する、など⁠⁠。今回、取り上げるパイプラインとは、広義のパイプラインで、その実現にパイプラインを扱うワークフロー管理が必要です。

ワークフロー管理 とは、指定した時間に複数のジョブを実行し、各ジョブの進行を管理するためのシステムを指します。たとえば、ジョブのスケジュール実行、ジョブの依存関係の解決、エラー時の通知や自動リトライ、指定したジョブの再実行、過去の実行履歴の管理などといった機能があります。

パイプラインを実現するのに、次のようにワークフロー管理を行なっているケースが多いようです。

  1. 自分でスクリプトを書く(リトライ制御などもスクリプトの中に記述する⁠⁠。
  2. Jenkinsを用いる(Jenkinsには依存解決、エラー通知など、ワークフロー管理に必要な機能がある⁠⁠。
  3. JP1を用いる(国内のIT部門では定番のワークフロー管理システム⁠⁠。

しかし、毎回スクリプトを書くのは大変ですし、ちょっとしたパイプラインにJenkinsやJP1を導入するのも大袈裟です。もっと手軽にパイプラインを管理できるツールが欲しい、ということで作られているのが、最近のワークフロー管理ツールです。

ETLツールとの違い

ところで、この分野の伝統的なソフトウェアといえばETLツールですが、それを使っているという話はほとんど耳にしません。従来のデータウェアハウスのように、要件定義やテーブル設計が明確な世界では使われているのかもしれませんが、ビッグデータ界隈とは相性が悪いのかもしれません。

ETLツールは、それ自体が大量のデータ処理を行うため、Javaによる高速実行や分散処理を想定して設計されています。ところが、最近は性能が必要なところではHadoopなどが用いられるようになり、ワークフロー管理はAPIを呼び出すだけになりつつあるため、既存のETLツールではオーバースペックとなり、より簡潔なツールが求められているのでしょう。

今後、クラウドサービスの普及が進むにつれて、ますますAPIによってシステム連携が行われるようになると、ワークフロー管理もAPIによるコントロールが中心となり、ソフトウェアとしては軽量化されていくことが考えられます。

ワークフロー管理の役割

前提として、パイプライン実現のために求められるワークフロー管理の要件をまとめておきます。

大量のデータを扱う場合、1つのジョブに数時間かかることもよくあります。たとえば、データのロードに1時間、前処理に1時間、集計に1時間、結果のアウトプットに30分、といった具合です。

ここで最後のアウトプットで予期せぬエラーが発生し、スクリプトの実行に失敗したとしましょう。最初からやりなおすとまた3時間以上かかってしまうので、最後のアウトプットだけやり直したいでしょう。そうするとスクリプトを修正して、手動でリカバリ作業を行うことになります。スクリプトを実行した翌日になってエラーに気付くのもよくあることで、そうするとますます時間を使ってしまいます。

ワークフロー管理の役割のひとつは、こうしたエラーのリカバリを簡単にすることにあります。最初からリカバリのことを想定し、どこでエラーが起きたらどうやり直すのかを意識しながらプロセスを記述します。大きなジョブを複数の小さな「タスク」に分割し、各タスクはリトライ可能となるように実装します。

もうひとつ重要な役割が、タスクの並列実行です。ビッグデータ分析エンジンで手軽に大量のデータにクエリを実行できるようになってはいますが、⁠データが大きすぎるので24分割してください」などと言われる場合があります。多くのシステムは「巨大なタスクを一つ実行」するよりも「小さな多数のタスクを並列実行」する方が効率良く動作するようになっているためです。経験的には、一つのタスクが数分から1時間程度で終わるように分割すると効率的なように思います。ただし、並列度があまりに高すぎるのは駄目で、どんなシステムであれ、同時に100万リクエストも発行しようものなら大量のエラーを受け取ることになるでしょう。

そこで必要になるのが「タスクキュー」のモデルで、それにより並列処理を適度に抑制することができます。たとえ100万タスクをキューに入れても、並列度を20に設定しておけば、一度に20以上のリクエストは実行されなくなります。利用するAPIやクエリ実行エンジンによって最適な並列度は異なるため、複数のキューを使い分けて並列度をコントロールするのが理想です。

最後に必要なのが、タスクの依存関係の解決です。タスクキューで多数のタスクを並列実行し、エラーのリトライを繰り返す中で、タスクの実行順序を保証するには、各タスクの依存関係を記述できなければなりません。

これらが、ワークフロー管理に求める必須要件で、これらに加えてジョブのスケジュール実行や、わかりやすいUIなどをパッケージにまとめたものが、パイプライン処理のためのワークフロー管理ソフトウェアです。

既存のソフトウェア

ビッグデータ界隈でよく耳にするソフトウェアをいくつか取り上げます。概ね下にいくほど新しいツールです。

Apache Oozie

Apache Oozieは、Hadoop標準のワークフロー管理ツールです。Hadoopに渡すパラメータなどをまとめてXMLに記述しておくと、その定義にしたがってジョブを順次実行してくれます。

Apache Falcon

Apache Falconは、 Hortonworksに組み込まれているワークフロー管理ツールです。Oozieの後継として作られているようで、こちらもXMLにジョブのパラメータを記述するタイプです。

Azkaban

Azkabanは、LinkedInで開発され、オープンソースとして公開されているJava製のワークフロー管理ツールです。タスクごとに個別のテキストファイル(ini形式)を作成し、それらをzipファイルにアーカイブしてREST APIでアップロードすると実行される仕組みになっています。Hadoop以外の拡張パッケージはあまり多くありません。リトライ時には、手動でパラメータを再設定した上で、ブラウザから再実行します。

Azkaban UI
Azkaban UI

Luigi

Luigiは、Spotifyで開発され、オープンソースとして公開されているPython製のパイプラインのためのワークフロー管理ツール(このあたりから「パイプライン」という呼び方が一般的になったよう)です。Hadoop以外にも、MySQLやPostgreSQL、Sparkなどさまざまなエンジンに標準対応しています。タスクをPythonのクラスとして定義する仕様で、プログラミング能力が求められます。

データベースやサーバがなくとも、単体のスクリプトとして実行できるため、手軽さは抜群です。ただし、それは欠点でもあり、完了したタスクの管理がユーザ任せになるなど、利用する側に負担を強いることになります。自分でコーディングするのが苦にならないPythonプログラマには扱いやすいですが、そうでなければ敷居が高そうです。

Luigi
Luigi

Amazon Data Pipeline

Amazon Data Pipelineは、AWSで提供されている名前の通りパイプラインのためのワークフロー管理サービスです。Webコンソール、またはJSONでジョブを記述します。決まった時間にEMRを起動し、Hiveの実行結果をS3に書き出し、それをRedshiftで集計する、などといったAWSに閉じたパイプラインを実行するのに良いです。

Data Pipeline
Data Pipeline

Azure Data Factory

Azure Data Factoryは、Microsoft Azureで提供されているパイプラインのためのワークフロー管理サービスです。Webコンソール、またはJSONでジョブを記述します。Hadoopクラスタの起動やHiveクエリの実行などが出来るようで、Amazon Data Pipelineと似ています。

Azure
Azure

Google Cloud Dataflow

Google Cloud Dataflowは、Googleで提供されているパイプラインのためのワークフロー管理サービスです。Java SDKが提供されており、処理内容はJavaで記述します。これはどちらかというと狭義のパイプラインであり、ここから他のサービスをコントロールするのではなく、Googleに格納されたデータをGoogleの計算リソースでパイプライン処理するのに用いられます。

Dataflow
Dataflow

Dataswarm

Dataswarmは、Facebook社内で使われているというPython製のワークフロー管理ツール。ジョブの記述にPythonを用いるのはLuigiと同じですが、クラスを定義するのではなく、一連のタスクをDAGとして記述するようです。ソースコードは公開されておらず、利用はできません。

Airflow

Airflowは、Airbnbで開発され、オープンソースとして公開されているPython製のワークフロー管理ツールです。⁠Dataswarmにインスパイアされた」とあるように、DAGとしてジョブを記述するようになっています。

Airflow
Airflow

パイプラインを記述する3つのスタイル

これまで紹介したワークフロー管理ツールを利用するための記述方法には、大きく分けて次の3つのスタイルがあります。

  1. 宣言型: 宣言型とは、XMLやJSONなどで実行したい内容を記述するタイプを指します。
    宣言型は、Java製のツールに多いです。また、あらかじめ用意された機能では不十分な場合に、Javaクラスとして拡張モジュールを登録するか、あるいは外部コマンド(orスクリプト)を用意する必要があります。

  2. 手続き型: 手続き型とは、プログラムに組み込む形でパイプラインを記述するタイプを指します。
    パイプラインを実装する過程で、自前でスクリプトを書くことが多いために、最初からスクリプト言語でパイプラインを記述した方が効率が良くなります。そこで、データ処理と相性のいい(Pandasが使える)Pythonをパイプラインの記述に用いることが増えてきています。

  3. DAG型: DAG(Directed Acyclic Graphs)とは、ノードとノードが矢印で結び付いたデータ構造のことで、それを基礎としたプログラミングモデルを、ここでは特にDAGスタイルとして区別します。
    これも手続き型と同じくスクリプトにパイプラインを埋め込む点は同じですが、プログラムの書き方が異なります。

これらのスタイルが具体的にどう異なるか、についての説明は今回は省きます。ワークフロー管理ツールは、ビッグデータ分析エンジンと同じようにまだまだホットな領域で、さまざまなツールが生まれ、デファクトスタンダードはまだありません。こうしたさまざまなツールがある中で自分たちのユースケースに合わせてツールを取捨選択し、自分たちのビッグデータ分析エンジンを効率的に運用するために活用してみてください。

ビッグデータ分析基盤を支えるエコシステムまとめ

この連載では、ビッグデータ分析基盤の第一歩となるエコシステムを紹介しました。

ログコレクタとして、ストリーミング処理のためのFlunetd、バッチ処理のためのEmbulkを紹介し、アクセスログやRDBからデータのインポートを行いました。

次に、分析エンジン上に収集されたデータをアドホックに分析する環境として、JupyterとPandasを利用しながら、ウェブサイト全体の指標となる基本KPI分析と、ユーザのパスに着目した応用KPI分析を行いました。

そして、最終回となる今回では、これらの処理をパイプラインとして実運用で定期実行するためのワークフロー管理システムを紹介しました。

ここで紹介したシステムを利用することで、さまざまなビッグデータ分析エンジンにも柔軟に対応して、ビッグデータ分析の第一歩を踏み出すことができるでしょう。

しかし、今回のビッグデータ分析基盤の構成は、あくまでも第一歩であり、この先には、リアルタイム処理や機械学習エンジンなど考えるべき仕組みがまだまだあります。

そのためにも柔軟にシステムの追加ができるように考えていきながら、自分たちのビッグデータ分析基盤に合ったエコシステムを選んでいきましょう。

おすすめ記事

記事・ニュース一覧