このステータスは次の意味があります。
status = 0 未処理(Forward前、およびForward失敗時)
status = 1 処理済(Forward成功時)
したがって、このステータスはForwardが無事に成功したことをあらわしています。
次に、未処理のステータスをつくるために、AP4Rプロセスを Ctrl+C
で終了した状態で注文処理を実行します。ステータスは未処理をあらわす"0"となっているでしょう。
>> y Ap4r::StoredMessage.find(:all)
---
- !ruby/object:Ap4r::StoredMessage
attributes:
status: "1"
updated_at: 2007-09-14 10:21:30
(省略)
- !ruby/object:Ap4r::StoredMessage
attributes:
status: "0"
updated_at: 2007-09-14 10:26:58
duplication_check_id: 82912650-448f-012a-cfbe-0016cb9ad524
id: "30"
queue: queue.async_shop.payment
object: "\x04\b\"\x10order_id=30"
created_at: 2007-09-14 10:26:58
headers: "\x04\b{\n\ :\rdelivery:\tonce:\x0Fqueue_name\
"\x1Dqueue.async_shop.payment:\x12dispatch_mode:\tHTTP:\x0Ftarget_url\
"-http://localhost:3000/async_shop/payment:\x12target_method\"\tPOST"
=> nil
SAF用テーブルに保管されているメッセージが多い場合には、次のようにすると簡単に一覧が見られます。左から順に、ID、キューの名前、生成日時をあらわしています。
>> y Ap4r::StoredMessage.find_status_of(:all).map{|sm| sm.to_summary_string}
---
- 29, queue.async_shop.payment, Fri Sep 14 10:21:30 +0900 2007
- 30, queue.async_shop.payment, Fri Sep 14 10:26:58 +0900 2007
=> nil
>> y Ap4r::StoredMessage.find_status_of(:unforwarded).map{|sm| sm.to_summary_string}
---
- 30, queue.async_shop.payment, Fri Sep 14 10:26:58 +0900 2007
=> nil
リカバリ方法
メッセージを再度AP4Rに Forward します。AP4R プロセスを起動しておきましょう。
% cd as_ap4r
% ruby script/mongrel_ap4r start -A config/queues_mysql.cfg
SAF用テーブルに保管されている、すべての未処理のメッセージをリカバリするには次のようにします。戻り値は、リカバリに成功したメッセージの数と失敗した数の配列になります。ここでは、未処理のメッセージは1つだったので、無事にリカバリができたことになります。
% cd as_rails
% ruby script/console
Loading development environment.
>> Ap4r::StoredMessage.reforward_all
=> [1, 0]
リカバリ対象のメッセージが大量にある場合は、引数を指定することで、その数ごとにひとつのデータベーストランザクションとしてまとめて処理できます。引数なしの場合は、10メッセージごとに処理しています。下記の例では、未処理のメッセージを10用意しました。3つずつまとめて再送処理を実行していますが、そのなかでエラーが発生しています。エラーのあった回の3つは処理が失敗に終わり、それ以外の7つは無事に成功しています。失敗したメッセージはさきほどと同様にして確認できます。
>> Ap4r::StoredMessage.find_status_of(:unforwarded).size
=> 10
>> Ap4r::StoredMessage.reforward_all 3
ActiveRecord::RecordNotFound: ...(省略)
=> [7, 3]
>> y Ap4r::StoredMessage.find_status_of(:unforwarded).map{|sm| sm.to_summary_string}
---
- 35, queue.async_shop.payment, Fri Sep 14 10:38:07 +0900 2007
- 36, queue.async_shop.payment, Fri Sep 14 10:38:19 +0900 2007
- 37, queue.async_shop.payment, Fri Sep 14 10:38:28 +0900 2007
=> nil
また、特定のメッセージのみリカバリするには、次のようにします。
>> Ap4r::StoredMessage.reforward 36
=> true
SAF用テーブルを管理するRailsアプリケーションを作成すれば、GUI環境でリカバリすることも可能です。RubyForgeからHelloWorldアプリケーションがダウンロードできますが、そちらにそのサンプル実装があります。HelloWorldアプリケーションの動かし方は、ホームページのGetting Started を参考にしてください。
図7 HelloWorldサンプルアプリケーション
HelloWorldアプリケーションを起動し、以下のURLにアクセスすると、SAFリカバリの画面が表示されます。
各メッセージの中身も参照できますので、必要に応じて「Recovery」のリンクを押してください。
図8 SAFリカバリ画面
DLQリカバリ
発生状況
メッセージの処理中のネットワーク障害や、メッセージを処理するアプリケーションのエラーなどで、メッセージは元のキューからDLQに移動されます。ネットワークやDBサーバの障害の場合、DLQに入ったメッセージを元のキューに戻すことで、再度処理が行われます。メッセージ内のデータがおかしいなど、アプリケーションのエラーでDLQに入った場合は、アプリケーションのロジックを変更するか、データ自体を修正してから元のキューに戻す必要があります。今回は後者については触れません。
状況の確認
AP4Rが起動中であれば、irbから確認できます。
まず、DLQに入った状況を作るために、わざとエラーを発生させてみましょう。ここでは簡単のため、 AsyncShopController#payment
アクションの頭で
raise "dummy exception to test DLQ"
として、エラーを発生させます。この状況でブラウザから注文処理を実行すると、ブラウザには"Order was successfully created."と表示されますが、RailsおよびAP4Rのログにはエラーが記録されます。Railsのログでは payment
アクションの処理中にダミーの例外が発生したこと、それによりAP4R側のログでメッセージ処理がエラーになったことが残ります。
では、DLQの中身を見てみましょう。まず、irbを起動して、DLQへの(druby越しの)参照 dlq
とキューの管理をしている ReliableMsg::QueueManager
( のインスタンス)への参照 qm
を取得します。
% irb -rubygems -rap4r
>> dlq = ReliableMsg::Queue.new "$dlq"
=> #<ReliableMsg::Queue:0x2838360 @queue="$dlq">
>> qm = dlq.send :qm
=> #<DRb::DRbObject:0x2832b04 @ref=nil, @uri="druby://localhost:6438">
次に、DLQ(内部では、$dlq
という名前になっています)の一覧を表示します。
>> y qm.list(:queue => "$dlq")
--- # 一部整形しています
- :max_deliveries: 5
:priority: 0
:created: 1189737438
:target_method: POST
:redelivery: 1
:queue: queue.async_shop.payment
:expires:
:id: 56205b90-4499-012a-1774-0016cb9ad556
:delivery: :once
:target_url: http://localhost:3000/async_shop/payment
:dispatch_mode: :HTTP
=> nil
ここではメッセージのヘッダー部分のみが表示されていますが、1件のメッセージがあることを確認できます。ヘッダーの中で、:queue
の箇所に元のキューの名前が記録されています。
リカバリ方法
ではリカバリを行います。DLQからメッセージを抜いて、元のキューに入れるという手順になります。さきほど payment
アクションに入れたダミーの例外は削除しておきましょう。
>> dlq.get{|m| dlq.put(m.object, m.headers) }
=> "5f7df210-4499-012a-1774-0016cb9ad556"
これで元のキューにメッセージが入り、dispatcherスレッドがRailsにリクエストしてpayment
アクションが呼ばれます。少し待ってからブラウザをリロードすると「Payed at」に時刻が表示されるはずです。
ここで、ちょっと横道に逸れて、リカバリしている最中にエラーが発生した場合を考えてみましょう。DLQにメッセージを1つ入れてから、それを取得したタイミングで例外を投げてみます。
>> y qm.list(:queue => "$dlq")
--- # 抜粋。一部整形
- :id: e232a830-4499-012a-1774-0016cb9ad556
=> nil
>> dlq.get{|m| raise "dummy recovery error"}
RuntimeError: dummy recovery error # 以下略
さて、DLQに入っていたメッセージはどうなっているでしょうか。
>> y qm.list(:queue => "$dlq")
--- # 抜粋。一部整形
- :id: e232a830-4499-012a-1774-0016cb9ad556
=> nil
同じ :id
でメッセージが残っていることが分かります。これは、reliable-msgの機能により、ブロック付きで ReliableMsg::Queue#get
を呼び出すと、( メッセージングの意味での)トランザクション内でブロックが実行されるためです。詳しくは、RDoc を参照してください。
DLQに入ったメッセージの確認と操作について説明しました。最低限のことは可能ですが、実用のためには、考慮すべき事項がいくつかあります。
リカバリが必要なメッセージ数が多い場合
メッセージの中身が大きい場合
リカバリ処理の自動化(例: DLQに30分入っているメッセージを自動的にリカバリする)
今後のAP4Rの拡張には、このようなことも含めていく予定です。
まとめ
全4回に渡ってAP4Rの解説をしてきました。AP4Rの概説にはじまり、キーワードとして「軽量」かつ「堅牢」をあげました。そして、それらを通じてアプリケーションを気軽に、かつ安心して非同期拡張することができることを、サンプルアプリケーションの作成を例にお話してきました。最後に、簡単なまとめをして連載の締めくくりとしたいと思います。
AP4Rとは
AP4Rとは、Asynchronous Pocessing for Rubyの略で、Rubyで非同期処理を実現するためのライブラリです。AP4Rをアプリケーションに適用することで、「 重い」処理を非同期に切り離し、ユーザーへの素早い応答が可能になります。また、疎結合化されることで処理の分散もしやすくなり、負荷増大時のスケーラビリティにもつながります。
「軽量」かつ「堅牢」
AP4RはRubyで書かれています。Rubyのもつ簡潔さ、柔軟さをうまくいかし、設定やAPIができるだけ使いやすいものになることを目指しています。そして、実装/テスト/運用をオールインワンでサポートした、導入しやすいライブラリとなるよう開発を進めています。
一方、簡単に導入できるライブラリでありながら、SAF機能により確実なメッセージの配送を実現しています。予期せずシステムを襲う異常時にもメッセージを死守する。これが信頼性のあるメッセージングに求められる必須要件となります。
シンプルなAPI
メッセージ送信の async_to
と、SAF機能のための transaction
の2つのAPIのみで非同期処理を組み込むことが可能です。また、引数の指定方法では、RubyやRailsとの親和性を考え、直感的に使えることを狙っています。
堅牢性の要、SAF機能
AP4RのメッセージングとしてのQoS(Quality of Service)は、at-least-onceです。すなわち、メッセージを生成するRailsプロセス(Producer)と、そのメッセージを処理するRailsプロセス(Consumer)の間で、最低でも 1回は処理が実行されることを保証します。データベースやネットワークの障害、プロセスの突然のダウンが発生してもメッセージが失なわれることがありません。AP4RではSAF(Store and Forward)という仕組みを通じて、このQoSを実現しています。
非同期でもテストはしっかり
非同期処理を含むアプリケーションでも、網羅的なテストが(比較的)簡単に実行できることは重要です。メッセージング処理では、複数のプロセスが関係することと、ネットワーク通信により時間がかかることの2点を考慮する必要があります。そこで、AP4Rでは、2つのテスト方法を用いて、テストしやすさと、網羅性を補完的に実現しています。メッセージのキューイングをスタブ化し素早く実行するfunctionalテストと、実際の通信も含め実動作となるべく近い環境で行うasyncテストです。
柔軟なシステム構成
AP4Rを利用した典型的なシステムの構成は、ウェブサーバ、Railsプロセス群、そしてAP4Rプロセスとなります。AP4Rを複数用意することでメッセージング層の負荷分散を図ったり、非同期処理用のプロセス群を分割することも出来ます。その一方、非同期メッセージ処理のリクエストは、HTTPなど、標準的なプロトコルを用いて送信されていますので、Railsに限らず、HTTPで受け付けているサービスと連携することも可能です。連携先は、URL変換フィルタの機能を利用して柔軟に対応できます。
また、様々な負荷分散のための設定を調整することで、処理量の増大に備えたスケールアウトや、メッセージの種類に応じた流量制御も可能です。
いざというときのリカバリ
稼動しはじめたシステムは、予期せぬエラーに見舞われることがあります。メッセージの通信中における障害や、データの不整合による処理の異常終了などです。こうした場合に備え、メッセージを復旧、再送する手だてが必要となります。AP4Rでは、SAFリカバリとDLQリカバリの方法を提供しています。
図9 軽量さと堅牢さのバランス
最後に
AP4Rでは今後の拡張として、Capistrano 用管理レシピ、Stompプロトコルのサポート、統合監視ツールへの対応などを予定しています。興味を持たれた方は、ユーザー向けのメーリングリスト がありますので、そちらで意見や質問、使い勝手のフィードバック、こんな機能が欲しいという要望などを頂ければと思います。筆者らのブログにトラックバックを飛ばしてもらうのでもOKです。
本連載に最後までお付き合いいただき、ありがとうござました。AP4Rについての理解が深まり、皆さんが立ちあげているサービスや手がけているプロジェクトで採用していただけたら幸いです。
AP4R関連リンク
RubyForge
開発者ブログ
その他