Ruby Freaks Lounge

第29回Reactorで非同期処理をやってみよう(1)

はじめに

WebサービスのAPIをコールするような、ネットワークを介した通信処理は、今日では頻繁に行われています。

ローカルマシンのみで完結する処理と比べると、通信が必要な処理は多大な時間が必要になります。相手サーバへの接続、相手サーバ側での処理、相手サーバからの受信など、何もすることなくただ待つだけの時間が存在します。

この無駄な時間の間に他の処理ができるならば、トータルの処理時間を大幅に短縮することが可能になります。これを実現するためにスレッドがよく使われています。しかしマルチスレッドプログラミングはいろいろと注意を払う点も多く、使いにくさを感じている方も多いのではないでしょうか。

今回はReactorパターンという、マルチスレッドとは違ったアプローチで非同期処理を実現してみたいと思います。

複数のwebサーバからHTML文章を取得してみる

同期処理

ひとまず非同期処理を忘れて、シーケンシャルに処理をしてみましょう。

非同期処理を採用する理由は、何も処理をしないでただ待っている無駄な時間を有効利用するためでした。逆に考えると、無駄な時間が存在しない(相手サーバが瞬時に処理を返す)ならば、同期的な処理がもっとも処理時間を短くできます。

リスト1 ひとつずつ同期的に取得
hosts.each do |host|
  sock = TCPSocket.new(host, 80)
  sock.write(request)
  sock.read
  sock.close
end

しかし、現実には無駄な時間は必ず発生します。上記のコードでは、ブロック内のすべてのメソッドコールで発生するでしょう。ひとつのサーバでの遅延は後続の処理の開始を遅らせる結果となり、それが積もり積もって全体の処理時間に無視できない影響を与えます。

マルチスレッド処理

次に処理時間を短縮するため、スレッドを使って並列的に処理を行ってみます。

本当にすべての処理が並列に動くとしたら、一番重いサーバの処理時間が全体の処理時間と等しくなります。実際はすべての処理が同時に並列に動くわけではないし、コンテキストスイッチのコストもあるため、処理時間はもう少しかかることになります。それでも、同期処理とは比較にならないほど高速に処理されるはずです。

リスト2 スレッドを使用
threads = hosts.map do |host|
  Thread.new(host) do |h|
    sock = TCPSocket.new(h, 80)
    sock.write(request)
    sock.read
    sock.close
  end
end
threads.each{|t| t.join}

Reactorパターンを使う

writeやreadのメソッドでの無駄な時間を減らすためにはどうしたらよいでしょう。

一番無駄なのが、相手サーバからデータが届いていないのにreadメソッドでずっと待つことです。データが読める状態になってはじめてreadメソッドをコールすれば、無駄な時間を短縮できることになります。

一番最初に書き込み可能になったソケット、また読み込み可能になったソケットに対して、それぞれwrite、readメソッドを呼んであげるために、ここではselectメソッドを使用します。selectメソッドは、対象のソケットが書き込み可能、または読み込み可能になるまで待機します。用意のできたソケットが出てきたとき、それを通知してくれます。イベントが発生してから処理を行うため、イベント駆動、イベントループなどとも呼ばれています。

リスト3 Reactorパターンで非同期処理を行う
# ①
write_socks = hosts.map do |host|
  TCPSocket.new(host, 80)
end
read_socks = []

# ②
write_proc = lambda{|sock|
  sock.write(request)
}

# ③
read_proc = lambda{|sock|
  sock.read
  sock.close
}

# ④
until (write_socks + read_socks).empty?

  # ⑤
  r_socks, w_socks, e_socks = IO.select(read_socks, write_socks)

  # ⑥
  if ws = w_socks.first
    write_proc.call(ws)
    read_socks << ws
    write_socks.delete(ws)
  end

  # ⑦
  if rs = r_socks.first
    read_proc.call(rs)
    read_socks.delete(rs)
  end
end

他のアプローチと比べると、コードが長くて複雑に見えるかもしれません。ひとつひとつ追ってみましょう。

①の部分では、書き込み対象のソケット群と読み込み対象のソケット群を用意しています。webサーバからHTMLファイルを取得するときは、まずリクエストを送信する必要があります。そのため、ここではリクエストを送信するソケットを、サーバーの数分だけ用意しています。一方、読み込みが可能なソケットはまだないはずなので、空配列になります。

②、③の部分では、書き込み可能、または読み込み可能となったソケットに対する処理を記述しています。書き込み可能となったソケットに対してはリクエストを送信します。読み込み可能となったソケットからはレスポンスを受信し、そのソケットをクローズしています。このように、イベントが発生した際に呼ばれる処理をコールバックと呼びます。

④のブロックがイベントループにあたります。書き込みソケット群、および読み込みソケット群が空になるまでこのループの中の処理を行います。

⑤では、selectを使って、対象ソケットが読み込み可能、または書き込み可能になるのを待っています。いずれかのソケットでイベントが発生すると、処理は⑥以降へと進みます。

⑥では書き込み可能状態となったソケットに対して処理を行います。②で定義したコールバック処理を実行し、その後で対象ソケットを書き込みソケット群から読み込みソケット群に移動させています。こうすることで、送信が完了したソケットは相手のサーバからのレスポンスを待つことができるようになります。

⑦では、読み込み可能状態となったソケットに対して処理を行います。③で定義したブロックをコールし、その後で対象ソケットを読み込みソケット群から除外しています。相手サーバからのレスポンスを受け取ったこのソケットはもう仕事を終えているため、これ以上イベント待ちする必要はありません。

なぜReactorパターンか

一見、Reactorパターンのコードは煩雑で処理の流れもよく分かりません。スレッドを使ったほうがシンプルに見えます。Reactorパターンを使うと何が嬉しいのでしょうか。

なんといっても一番の利点は、シングルスレッドだという点です。マルチスレッドで一番問題になるのが、スレッド間のデータの共有方法です。同時に共通のオブジェクトにアクセスしにいったときに整合性が取れなくなる可能性があります。

リスト4 スレッドの間違った使い方
count = 0
threads = []
10.times{threads << Thread.new{puts count; count += 1}}
threads.each{|t| t.join}

上記リストの場合、それぞれのスレッド内でのcountの値が何になるのかは分かりません。2つのスレッドが同じ値のcountを使用する可能性は十分にあります。マルチスレッドで、スレッド間の共有データにアクセスするときはロックなどを使う必要があります。

Railsのwebサーバとして昔はよく使用されていたmongrelは、マルチスレッドで動作していますが、Railsに処理をさせる部分は大きなロックの中です。バージョン2.2以前のRailsはスレッドセーフではなかったためです。

このようにマルチスレッドを使用するときは、実際に書きたいコード以外の部分で頭を使う必要があります。それを嫌って巨大なロックを使用すると、マルチスレッドの恩恵が大きく失われてしまうことになります。

Reactor パターンはひとつのスレッドの中で動作します。共有データについて意識を払う必要はありません。スレッド切り替えのコンテキストスイッチも発生せず、 selectで待っている間はリソースも消費しません。selectの代わりにepollが使えるならば、いくらIOストリームが増えたとしても処理速度は変化しません。

だけど、分かりにくい

Reactorパターンのコードは理解するのが面倒です。処理の流れが把握しにくいため、慣れていないと思わぬところでバグを仕込んでしまう可能性があります。

また、イベントの発生しないIOをselectに渡してしまうと、処理が返ってこないことになってしまいます。

しかし、使いにくいなら使いやすくすればいいのです。そう考えたRubyistたちが、Reactorパターンを内包したライブラリをたくさん用意してくれています。

Reactor、Rev、NeverBlockなどは、耳にしたことがある方も多いでしょう。次回はその中でもおそらく一番利用されている、EventMachineについてお話をしようと思います。

おすすめ記事

記事・ニュース一覧