はじめに
前回は、データ処理の方法を整理し、また、宣言型言語をインターフェースとして用いる並列データベースなどのデータ処理系を詳細に見ていく準備として、当該データ処理系における実行プランの作成の流れをかんたんに説明しました。今回は、当該データ処理系において、どのように実行プランを並列化するかについて、その概要を説明します。
データ処理における並列性について
並列データベースをはじめとするデータ処理系は、SQL文などの問い合わせ(クエリ)の内容に応じてデータ処理を行うものであり、問い合わせの観点においては、当該処理系において用いられる並列性(Parallelism)は、次の2つに分類することができます。
- 問い合わせ間の並列性(Inter-Query Parallelism)
- 問い合わせ内の並列性(Intra-Query Parallelism)
問い合わせ間の並列性は、複数の異なる問い合わせを並列に実行するときの並列性の形態です。問い合わせ間の並列性を活用することにより、システム全体のスループットの向上が期待でき、当該並列性は、特にOLTPと称されるデータベース処理において活用されています。
一方、問い合わせ内の並列性は、1つの問い合わせを並列に実行可能な複数のサブタスクに分解して実行するときの並列性の形態です。問い合わせ内の並列性を活用することにより、当該問い合わせのレイテンシの低下が期待でき、当該並列性は、特にOLAPと称されるデータベース処理において活用されています。
問い合わせ間の並列性は、複数の独立な問い合わせを並列に実行することにより得られることから、当該並列性を活用する方法は明らかであると考えられます。一方、問い合わせ内の並列性においては、問い合わせを並列実行可能な複数のサブタスクに分解する方法は必ずしも明らかではありません。したがって、以降においては、問い合わせ内の並列性を深堀りして見ていきます。
問い合わせ内の並列性
問い合わせ内の並列性は、さらに次の2つの並列性に分類することができます。
- オペレータ間の並列性(Inter-Operator Parallelism)
- オペレータ内の並列性(Intra-Operator Parallelism)
オペレータ間の並列性は、問い合わせにおける複数の異なるオペレータを並列に実行するときの並列性の形態です。オペレータ間の並列性は、さらに次の2つに分類することができます。
- パイプライン並列性(Pipelined Parallelism)
- 独立並列性(Independent Parallelism)
パイプライン並列性(図1(a))は、あるオペレータが出力するデータを別のあるオペレータが入力する場合において、これらのオペレータを並列に実行するときの並列性の形態です。独立並列性(図1(b))は、データの依存関係がない複数の独立したオペレータを並列に実行するときの並列性の形態です。
一方、オペレータ内の並列性(図1(c))は、1つのオペレータを並列実行可能な複数のサブタスクに分解して実行するときの並列性の形態です。多くの並列データ処理系においては、データを複数に分割(パーティショニング)し、オペレータの複数のインスタンスが当該分割データ(パーティション)を並列に読み出すことにより、オペレータ内の並列性を活用します。したがって、データ処理の文脈においては、オペレータ内の並列性はパーティション並列性(Pipelined Parallelism)とも呼ばれます。
ここまで出てきた並列性を次にまとめておきます。
- 問い合わせ間の並列性(Inter-query Parallelism)
- 問い合わせ内の並列性(Intra-query Parallelism)
- オペレータ間の並列性(Inter-operator Parallelism)
- パイプライン並列性(Pipelined Parallelism)
- 独立並列性(Independent Parallelism)
- オペレータ内の並列性(Intra-operator Parallelism)
- パーティション並列性(Partitioned Parallelism)
次回以降は、おもに問い合わせ内の並列性に焦点を当て、データ処理アルゴリズムがどのように並列化できるかを説明していきます。以下では、その準備として、パーティション並列性を活用する際に用いるデータ分割方法(パーティショニング方法)について述べ、さらに近年の並列データ処理系で用いられている並列性の概要を見ていきます。
データの分割方法
データ処理において用いられるデータの分割方法には、おもに次の3つの方法があります。
- ラウンドロビン分割(Round-Robin Partitioning)
- ハッシュ分割(Hash Partitioning)
- 範囲分割(Range Partitioning)
ラウンドロビン分割(図2(a))は、最も単純な分割方法であり、その名前が示すとおり、データの構成要素であるレコード(タプル)を順繰りに計算機に割り振ります。ハッシュ分割(図2(b))は、レコード内の属性の値にハッシュ関数を適用し、その結果に基づいて当該レコードを計算機に割り振ります。範囲分割(図2(c))は、レコード内の属性の値を用い、事前に決定された属性値の範囲と計算機の割り当てに従い、当該レコードを計算機に割り振ります。
ラウンドロビン分割を用いて格納されたデータにおいては、ある値を有するレコードを探したい場合に、当該レコードが格納されている計算機を判別することはできないため、すべてのデータを読み出す必要があります。一方、ハッシュ分割や範囲分割を用いて格納されたデータにおいては、属性値を用いて、事前に与えられているハッシュ関数の結果や範囲の割り当てに従い、レコードが格納されている計算機を判別することができる場合があります。
また、範囲分割においては、多くの場合、近い値をもつレコードが同じ計算機に割り振られるため、ハッシュ分割を用いた場合と比較して、データが値でクラスタ化されており、処理系が当該クラスタを利用して処理の効率化を図ることができる場合があります。しかしながら、レコードにおいて値の偏りがある場合は、ハッシュ分割を用いた場合と比較して、データが計算機に均等に分割されないことがあります。
ケーススタディ―Hadoop/Impala/Presto/Sparkで活用されている並列性
少し抽象的な話が続いたので、Hadoopなどのデータ処理系において、実際に活用されている並列性を見てみることにします。なお、データの管理を行うストレージとしては、分散ファイルシステムであるHDFSを用いているものとします。HDFSは、データをブロックなる小さい塊(64MBから512MB程度)に分け、当該ブロックをラウンドロビン分割により複数の計算機に分割します。それぞれの処理系の詳細については、また後の回で述べる予定です。
Hadoop MapReduce
Hadoop MapReduceは、Mapなる処理オペレータとReduceなる処理オペレータから構成され、当該処理系においては、アプリケーション開発者が当該処理を組み合わせることによりデータ処理を実現します。Mapにおいては、複数のMapインスタンスがパーティション並列性を活用し、HDFSに格納されているラウンドロビン分割されたデータを読み出します。Mapの出力結果は出力キーによりハッシュ分割され、Reduceにおいて、複数のReduceインスタンスがパーティション並列性を活用し、ハッシュ分割された当該データを読み出します。
Impala/Presto
Cloudera Impalaは、SQLをユーザインターフェースとして用いる並列データ処理系であり、一般的な並列データベースに類するデータ処理アーキテクチャを採用しています。Impalaにおいては、Hadoop MapReduceと同様に、各オペレータの複数のインスタンスが、パーティション並列性を活用し、HDFSにある分割データを読み出します。加えて、ハッシュ結合[1]などにおいては、パイプライン並列性を活用します。すなわち、Impalaのハッシュ結合においては、複数のデータを結合する際に、図3に示すように、先に複数のデータに対してハッシュ表を構築(ビルド:Build)し、その後、当該ハッシュ表に対してパイプラインで参照(プローブ:Probe)を行います[2]。当該アルゴリズムについては、後の回で詳しく説明する予定ですので、ここでは概要を把握していただくだけで結構です。
PrestoはImpalaと同様にSQLをユーザインターフェースとして用いる並列データ処理系であり、Impalaに類するデータ処理アーキテクチャを採用しています。
Spark
Sparkは、Hadoopに類したデータ処理系であり、当該処理系においては、アプリケーション開発者が事前に定義されている多数のオペレータを組み合わせることにより、データ処理を実現します。Sparkにおいては、HadoopやImpalaなどと同様に、各オペレータの複数のインスタンスがパーティション並列性を活用し、データを読み出します。また、アプリケーション開発者における実行プランの記述によっては、パイプライン並列性を活用することができると考えられます。
並列データ処理系においては、データを複数の計算機に分割して格納することは自然であるため、分割データを並列に読み出すことから得られるパーティション並列性は、その他の並列性と比較して、活用しやすい並列性であると考えられます。このため、上記の実際のシステムにおいても、パーティション並列性を活用し、また場合に応じて、パイプライン並列性(や独立並列性)を活用していることが見て取れます。
おわりに
今回は、宣言型言語によるデータ処理における並列性について説明しました。次回は、並列性の指標(スケーラビリティ)や、当該並列性を活用したデータ処理アルゴリズムについて触れていく予定です。
謝辞
Impalaの実装においては、Cloudera Japanの嶋内翔さん、および、ClouderaのMarcel Kornacker博士にご助言をいただきました。