Perl Hackers Hub

第2回AnyEventでイベント駆動プログラミング (3)

AnyEventベースのモジュール

前回まで紹介したのはAnyEventの基本APIのみです。CPANにはこれらをベースにさらに複雑な機能を実装した様々なモジュールがあり、それらをうまく組み合わせて簡単に高機能なイベント駆動プログラムを構築することができます。代表的なものとしては、以下のようなモジュールが登録されています。

  • 非同期でHTTP通信を行うAnyEvent::HTTP
  • 低レイヤのI/Oを簡単にするAnyEvent::Handle
  • ソケット接続を行うAnyEvent::Socket
  • データベース処理を非同期で行うAnyEvent::DBI
  • CouchDBと非同期に通信を行うAnyEvent::CouchDB
  • PSGI/Plackを非同期エンジンで動かすTwiggy

これらを組み合わせて使うことにより、ブラウザと非同期に通信しつつ裏ではデータベースおよびイントラネットのほかのREST APIとも非同期で通信しながら処理を進めるWebサーバ、なんていうものも簡単に作れてしまうわけです。

AnyEvent::Socketを使った簡易HTTPクライアント

リスト11は、任意のホストに接続し、GET / HTTP/1.0というHTTPリクエストは発行したあと、読み込めるだけデータを読み込むサンプルです。AnyEvent::Socketを使ってソケット接続を行います。

リスト11 簡易HTTPクライアント
use strict;
use AnyEvent;
use AnyEvent::Socket;

my $cv = AnyEvent->condvar;

my $guard; $guard = tcp_connect 'your.host.name', 80, sub {
    my ($fh) = @_;

    undef $guard;

    my $w; $w = AnyEvent->io( ……(1)
        fh => $fh,
        poll => "w",
        cb => sub {
            undef $w;
            my $buf = "GET / HTTP 1.0\r\n\r\n";
            my $length = syswrite( $fh, $buf, length($buf) );

            if ($length != length($buf)) {
                warn "failed to write";
                $cv->send;
            }

            my $r; $r = AnyEvent->io( ……(2)
                fh => $fh,
                poll => "r",
                cb => sub {
                    my $length = sysread( $fh, my $buf, 8192 );
                    if ($length > 0) {
                        print $buf, "\n";
                    } else {
                        undef $r;
                        $cv->send;
                    }
                }
            );
        }
    );
};

$cv->recv;

ソケットがつながったあとHTTPリクエストを送信しなくてはならないので、(1)でI/Oウォッチャーにpoll=> "w"を指定して書き込み可能になるまで待ちます。リクエストを送信したあと(2)で再度I/Oウォッチャーを設定し、今度は書き込み可能になるまで待ったあと、読み込みを行います。この読み込み用のI/Oウォッチャーは読み込みが不可能になるまで読み込み続けたいため、sysread( )が成功している間は$rを解放しないようにします。読み込みに失敗したときに$rを解放し、$cv->sendを呼び出します。

高速に複数サーバとHTTP接続する

AnyEvent::HTTP

前項では簡易HTTP通信を行うスクリプトをスクラッチで実装しましたが、それでは対応できないパターンも当然ありますので、実際にAnyEventでHTTP通信する場合はAnyEvent::HTTPを使用するべきです。AnyEvent::HTTPならリスト12のように書くだけで任意のURLの内容をGETで取得できます。POSTなどのリクエストも同様に簡単に書けます。

リスト12 AnyEvent….::HTTPの使用例
use strict;
use AnyEvent;
use AnyEvent::HTTP;

my $cv = AnyEvent->condvar;

my $guard; $guard = http_get 'http://gihyo.jp' => sub {
    my ($body, $headers) = @_;
    undef $guard;
    print $body;
    $cv->send;
};
$cv->recv;

ただ、これだけでは非同期I/Oの良いところがまったく使いこなせていません。最初に説明したとおり、協力式マルチタスキングを実装するためには「待ち」の時間をうまく利用する必要があるのですが、この例では1個のURLしか取得しにいっていないため待ちを活かせていません。待ちを活かすには、複数のURLをなるたけ速く取得したいような状況が必要です。

複数URLを並行して取得する

それでは普通の書き方とAnyEventを用いた書き方の違いをはっきり見るために、WebサイトのHTMLをダウンロードするスクリプトをそれぞれの書き方で実装してみましょう。標準入力から1行ずつURLを受け取り、それらをダウンロードするスクリプトhttp.plを実装します。

http.plを次のように実行すると入力待ち状態になるので、URLを入力するとHTMLを取得します。

> perl http.pl
http://gihyo.jp # 入力
+ http://gihyo.jp -> 200

上記のように標準入力からURLを受け取るようにしておけば、複数のURLをファイルに記入して次のように標準入力から渡すこともできます。

> cat urls.txt | perl http.pl

普通の書き方の場合

従来の方式で実装すると、リスト13のようになります。標準入力を1行ずつ読み込みながら、LWP::UserAgentを使用して指定されたURLを1個ずつダウンロードしにいきます。

リスト13 従来方式の実装
use strict;
use LWP::UserAgent;

main() unless caller();

sub main {
    my $ua = LWP::UserAgent->new();
    while (my $url = <STDIN>) {
        chomp $url;
        my $res = $ua->get($url);
        print " + $url -> ", $res->code, "\n";
    }
}

このスクリプトが抱える潜在的な問題は、取得すべきURLリストの中にレスポンスの遅いサイトが存在した場合に顕在化します。そのサイトに当たってしまった場合、データが返ってくるのを待ってから次の処理を行うことになります。サーバには接続できるもののレスポンスが返ってこないURLを最初に指定してしまうと、リクエストがタイムアウトするまで待ってからほかのURLを取得しにいくことになり、ほかのサイトのレスポンスがどんなに速くても最初に大幅に時間をロスしてしまいます。遅いURLが複数個存在した場合はさらに遅延は深刻化します。

イベント駆動で非同期I/Oを実現した場合

AnyEventを使用し非同期で複数のURLを同時に見るようにすれば、そのようなことは起こりません。次に非同期の実装を見てみましょうリスト14⁠。

リスト14  AnyEvent::Handle、AnyEvent::HTTPで非同期I/Oの実装
use strict;
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::HTTP;

main() unless caller();

sub main {
    my $cv = AnyEvent->condvar;

    # 標準入力からURLを1行ずつもらうので、STDINを監視する
    # AnyEvent::Handleオブジェクトを作成する
    my $handle = AnyEvent::Handle->new(
        fh => \*STDIN,
        # エラー、もしくはEOF時に$cvに終了通知を送る
        on_eof => sub { $cv->end },
        on_error => sub { $cv->end },
    );

    # 最後の待ちを有効にするために1回beginを呼んでおく
    $cv->begin();

    my $w;
    my $read_stdin; $read_stdin = sub {
        $handle->push_read( line => sub {
            my ($handle, $url) = @_;
            chomp $url;
            if ($url) {
                $cv->begin(); # このURLを処理している間は
                                 # $cvの条件を満たさないように
                                 # フラグを立てておく

                # AnyEvent::HTTPを使ってリクエストを送る
                http_get $url, sub {
                    my ($body, $headers) = @_;
                    print " + $headers->{URL} -> " .
                        "$headers->{Status}\n";
                    
                    # URLを取得したのでフラグを落とす
                    $cv->end();
                };
            }
            # 1行読んだので、もう1行読むために$read_stdinを
            # イベントキューにいれてもらう
            $w = AnyEvent->timer(after => 0, cb => $read_stdin);
        } );
    };

    $w = AnyEvent->timer(after => 0, cb => $read_stdin);

    # $cv->end()がbegin()の回数分呼ばれるまで待つ
    $cv->recv();
}

普通の書き方と比べてコードは大分複雑になってしまいましたが、そのほとんどは終了条件を調整するためのものです。

リスト14では標準入力を監視するためにAnyEvent::Handleというモジュールを使っています。このモジュールはpush_read(line => sub { ... })と指定しておくことによって1行分データが溜まると任意のコールバックを呼んでくれるので、これを使って標準入力にURLが入力されたことを感知できます。

実際のHTTP処理はAnyEvent::HTTPに実装されているhttp_getが非同期で行ってくれるので、我々は標準入力から取得したURLをhttp_getに渡し、URL先が取得できた際に実行するコールバックを指定して呼び出すだけです。

このコードで重要なのは$cvの扱いです。最終行で$cv->recvとしていますが、これをしないとURLの取得や標準入力からの入力を待たずにスクリプトが終了してしまいます。$cv->recvを指定することによりスクリプトはここでブロックし、イベントループが処理を開始できるようになるわけです。ですが、逆にこの変数に適切に終了シグナルを送信しないと、このスクリプトはいつまでもブロックされたまま終了しません。

このスクリプトは基本的には標準入力が閉じられたら終了という前提ですので、まず標準入力を監視しているAnyEvent::HandleオブジェクトのエラーコールバックとEOFコールバックにそれぞれ$cv->endへの呼び出しを仕込んでおきます。

また、HTTP通信を始める前にも$cv->beginを呼び出し、結果が返ってきてから$cv->endを呼び出すようにしておきます。こちらはさっきと逆で、標準入力が閉じられたあとでもそれまでにリクエストされたURLがすべて取得されるまでスクリプトを終了しないために指定します。このように指定しておくことによって、

> cat urls.txt | perl http.pl

のようなURLのリストを受け取ったあとすぐに標準入力が閉じられてしまうような状況でも正しく動作するようにしています。

比較のため、存在しないURLやレスポンスが遅いサーバのURLなどを混ぜてリスト13とリスト14を実行してみてください。リスト13では遅いサーバのURLを取得しにいく際にプログラム全体が停止しますが、リスト14ではほかのURLを先に取得しにいきます。

以上見てきたように、非同期I/Oを使用するとコードは若干複雑になってしまいますが、使いどころさえ間違えなければ強力なツールとなります。特にいつ起こるかわからないイベント通知(Twitterの発言やDBの書き込みなど)をリアルタイムに感知および通知することが必要になる今どきのプログラミングでは、きっと重宝するはずです。

まとめ

今回はAnyEventの基本APIとAnyEventを利用したモジュールの使い方を紹介しましたが、AnyEventとイベント駆動プログラミングの世界ははるかに奥が深く可能性に満ちています。

AnyEventの基本APIに関してはそのシンプルさにだまされてはいけません。基本APIだけでも非常に複雑なイベント駆動なプログラムを書くことができますし、CPANにはこのほかにもさまざまなモジュールが登録されているので、基本APIとこれらのモジュールを組み合わせると驚くほど高機能なアプリケーションを簡単に書くことができてしまいます。

たとえば本連載の前回で紹介したPlackの非同期エンジンTwiggyの中で、TwitterのストリーミングAPIを扱うためのAnyEvent::Twitter::Streamやデータベースへの問い合わせを非同期に行うAnyEvent::DBIを利用することにより、高機能なTwitterプロキシなどを実装できたりします。

みなさんもこれを機会に非同期プログラミングに挑戦してみてはいかがでしょうか。

次回は村瀬大輔さんでテーマはDBIx::Classです!

おすすめ記事

記事・ニュース一覧