Perl Hackers Hub

第22回Coroを使ったやさしいクローラの作り方(3)

(1)こちら⁠2)こちらから。

並列・分散処理のためのクローラ

ある程度大規模なクローラを作る場合は、一定時間内になるべく多くのURLから効率的にデータを収集する必要があります。接続先のホストごとに同時接続数やウェイトなどの配慮をしつつ、全体としては大量のリクエストを同時に処理する必要があります。検索エンジンのためのクローラ、定期的に同じURLにアクセスする必要がある更新チェックやフィードアグリゲータなどは、特に該当するでしょう。

Perlで高性能なクローラを書く場合の選択肢は2つあります。一つはforkによるマルチプロセスによるもので、Parallel::ForkManagerやParalell::Preforkを使うケースや、ジョブキューのためのフレームワークを使ったワーカがこれに該当します。もう一つはAnyEventやCoroを使ったI/O多重化によるアプローチです。筆者が好んで使うのはこれら2つを組み合わせた、I/Oを多重化しつつ、CPUのコア数に合わせた少数のプロセスを立ち上げるという方式です。

以降では、1プロセス内で複数のリクエストを同時に実行するI/O多重化によるアプローチでのクローラの作り方を紹介します。PerlでI/O多重化を実現するモジュールは数多くあり、それぞれ長い歴史がありますが、筆者がお勧めするのはAnyEventとCoroです。いずれもlibevの作者であるMark Lehmannによるものです。AnyEventは環境によって利用可能なイベントループのライブラリを自動で判別し、統一したインタフェースで利用可能にするモジュールです。Coroはコルーチンによる協調スレッドを実現するためのモジュールです。

AnyEventとCoroの違い

Coroを使うと複雑な非同期処理を、いわゆる「コールバック地獄」に陥ることなくシンプルに記述できます。たとえば1秒ごとに何か処理を実行したいというコードは、AnyEventとCoroだと次のような違いがあります。

AnyEventの場合
use AE;
my $i = 0;
my $cv = AE::cv;

# 1秒ごとに、指定した関数を実行せよ
my $watcher = AE::timer 0, 1, sub {
  warn $i++;
  $cv->send("done") if $i >= 10;
};
# CV = condition variable が利用可能になるまで待機
warn $cv->recv;
Coroの場合
use Coro;
my $main = Coro::current;
async {
    my $i = 0;
    while (1) {
        # async で囲まれた部分だけがsleep によって1 秒止まる
        Coro::Timer::sleep 1;
        warn $i++;
        last if ( $i > 10 );
    }
    # 次の切り替え時にメインスレッドに切り替わるようにスケジューラに指示
    $main->ready;
};
schedule;

Coroで「定期的に何かを実行」したければ、単にwhile文やfor文によるループを使って、その中でsleepを呼びます。Coro::Timer::sleepが呼ばれたタイミングでCoroのスレッドが停止し、別のスレッドが実行可能な状態になり、1秒後にもとのスレッドが実行可能な状態になり戻ってきます。

AnyEventとCoroの使い分け

Coroは黒魔術ですので、汎用的なモジュールや複雑にならない程度の非同期処理であればAnyEventで書いたほうが無難です。ただ、実際にプロダクションで動くコードで、一度でもCoroを使うことを選択したのであれば、Coroを積極的に使ってしまったほうがよいでしょう。癖はありますが、Coroを使わないと実現できないようなことが多くあり、AnyEventを使うよりもコードを短く、すっきりさせることができるでしょう。

Coro::Timerの内部は次のようになっています。

sub sleep($) {
   my $w = AE::timer $_[0], 0, Coro::rouse_cb;
   Coro::rouse_wait;
}

内部ではAE::timer を使って指定秒数後にCoro::rouse_cbが呼び出されるようにして、実行されるまで現在のcoroを停止させる(別のcoroが実行されるようにする)ということになります。この例を見てわかるように、AnyEventのcallback方式で記述可能なものは、すべてCoro::rouse_cb とCoro::rouse_wait を使ってCoroの流儀に変換できます。逆に、一度Coroを使用することを前提にして書かれたコードを、Coroを使わずにAnyEventだけで書き直す作業は非常に困難です。

そのためAnyEventとCoroの使い分けは、汎用的なライブラリにはAnyEventを使い、複雑な処理内容を含む実際のプロダクトやCoroを使わないとできないことがある場合はCoroを使う、というのがお勧めです。

Coroを使った典型的なクローラのひな型

Coroを使ってホストごとの接続数制限やウェイトを実装したクローラのサンプルはリスト1のようになります。実際には各処理は複数のクラスに分割したほうがよいですが、見通しが良いように1つのファイルにまとめてあります。

Coroを使うメリットとして、SemaphoreやChannelによって1プロセス内での排他制御やメッセージングが高速に行えるというものがあります。サンプルコードと共に見ていきましょう。

ホストごとの並列数を制限する

クローラを書くうえで、相手先のサーバに過大な負荷をかけないように1ホストあたりの同時接続数を制限したい場合、Coro::Semaphoreを使うのがよいでしょう。セマフォSemaphoreは共有リソースへのロックの獲得と解放を制御するためのしくみです。

たとえば同時接続数を4件に制限したい場合、典型的には次のようなコードになります。

my $hosts = {};
sub task {
    my $url = shift;
    my $host = URI->new($url)->host;
    # ホストごとのセマフォを呼び出す、なければ新しく作る
    my $semaphore = $hosts->{$host} ||= Coro::Semaphore->new(4);
    # 4 件以上呼ばれたら、その時点で別スレッドに切り替わる
    my $guard = $semaphore->guard;
    # 何らかの処理
    ...
}

上記のコードでは、$semaphore->guardを使って「自動解放されるロック」を作っています。guardを使うと、guardオブジェクトが格納されている変数の参照カウンタがゼロになり、そのオブジェクトが解放されたタイミングで何らかの処理が実行されるという機構を作ることができます。CoroやAnyEventを使う場合、よくこの「Guard」という概念が出てきます。

$semaphore->guardを使わずに、$semaphore->downでロックの確保、$semaphore->upでロックの解放と明示的に行うこともできますが、個人的にはguardを使うことをお勧めします。最初のうちは処理の開始時にdown、処理の終了時にupと呼び出したほうがわかりやすく感じるかもしれませんが、downだけ呼ばれてupが呼ばれないと、ロックされたままプログラムが停止してしまいます。しかしguardを使うと、特定のスレッドがエラーで終了してしまっても、guardオブジェクトが破棄されれば確実に$semaphore->upが呼び出されます。非同期タスクの寿命とguardオブジェクトの寿命を一致させるようにすることで、ロックの解放忘れがなくなります。

詳しい使い方はperldoc Coro::Semaphoreを参照してください。

スレッド間の連携を行う

クローラに必要な処理を複数のスレッドに分担させ、それらをCoro::Channelを使って連携します。Coro::current->descを使って役割に応じてスレッドに名前を付けておくと、デバッグの際に便利です。URLを受け取ってHTTPレスポンスを取得するためのfetcher(1)⁠、取得したレスポンスを解析するためのparser(2)⁠、解析した結果をファイルなどに書き出すためのupdater(3)という3種類のスレッドを動かしています。

もしこれが、それぞれcallback方式で記述されていた場合にはどうなるでしょうか。

リスト1 CoroのSemaphoreとChannelを使った典型的なクローラのためのコード(coro_crawler.pl)
use strict;
use Coro;
use Coro::Channel;
use Coro::Semaphore;
use Coro::Timer;
use URI;
use FurlX::Coro;
use Web::Query;
use Try::Tiny;

my @done;
my @fail;
sub done { push @done, $_[0] }
sub fail { push @fail, $_[0] }

my %queue;

sub queue {
    my $name = shift;
    $queue{$name} ||= Coro::Channel->new;
}

sub logger {
    my $msg = shift;
    # 時刻と現在のスレッド名を出力
    warn localtime . sprintf ": %s %s\n", Coro::current->desc, $msg;
}

# 全体の同時接続数制限
my $global_lock = Coro::Semaphore->new(5);
sub global_lock {
    $global_lock->guard;
}

# ホストごとの接続数制限
my %lock;
my $use_sleep = 1;
sub host_lock {
    my $url = shift;
    my $host = URI->new($url)->host;
    my $sem = $lock{$host} ||= Coro::Semaphore->new(1);
    # 最後の接続から一定時間ウェイトを入れる
    if ($use_sleep && $sem->count == 0) {
        my $guard = $sem->guard;
        Coro::Timer::sleep 3;
        return $guard;
    }
    $sem->guard;
}
(1)
sub fetcher {
    my $url = queue("fetch")->get;
    my $lock = host_lock($url);
    my $glock = global_lock();
    my $ua = FurlX::Coro->new;
    logger($url);
    my $res = $ua->get($url);
    queue("parse")->put([$url, $res]);
}
(2)
sub parser {
    my $data = queue("parse")->get;
    my ($url, $res) = @$data;
    logger($url);
    # タイトルを抜き出す場合
    my $title = Web::Query->new_from_html($res->content)
                ->find("title")->text;
    queue("update")->put([$url, $title])
}
(3)
sub updater {
    my $data = queue("update")->get;
    my ($url, $res) = @$data;
    logger($url);
    warn $res;
    done($res);
}
sub create_worker {
    my ( $name, $code, $num ) = @_;
    for ( 0 .. $num ) {
        my $desc = $name . "_" . $_;
        async_pool {
            Coro::current->desc($desc);
            while (1) {
                try {
                    $code->()
                } catch {
                    warn $_;
                    fail($_);
                }
            }
        }
    }
}

create_worker( fetcher => \&fetcher, 1000 );
create_worker( parser => \&parser, 1 );
create_worker( updater => \&updater, 1 );

# 取得するURL
my @list = qw(
    http://localhost/1
    http://localhost/2
    http://local.example.com/2
    http://local.example.com/3
);

my $stop_flag = 0;
my $force_exit = 0;
my $total = scalar @list;
my $main = Coro::current;

# シグナルを受け取って「終了」フラグを立てる
$SIG{INT} = sub {
    if ($stop_flag == 1) { $force_exit = 1 }
    $stop_flag = 1;
    if ($force_exit) {
       die "exit";
    }
};

# manager
async {
    while (1) {
        Coro::Timer::sleep 1;
# 作業途中のデータが失われないようにするとよい
# 新規のジョブは受け取らないようにして
# 実行中のジョブが終了するのを待つ、など
        if ($stop_flag) {
            warn "signal recieved!!! ";
            async {
                Coro::Timer::sleep 10;
                $stop_flag = 0
            }
        }
        warn sprintf "Task: %s/%s Fail: %s",
        scalar @done, $total, scalar @fail;
        my $done = scalar @done + scalar @fail;
        if ($done == $total) {
            warn "All task done!";
            $main->ready;
        }
    }
};

queue("fetch")->put($_) for @list;
schedule;



http_fetch($url, sub {
    my ($url, $res) = @_;
    parse_response($res, sub {
        my $res = shift;
        update_databse($res, sub {
            warn "done!"
        })
    })
});

こんな具合にネストが深いものになってしまうでしょう。もちろんコーディングテクニックで解消できる部分も多くありますし、シンプルなワーカをキューを使って連携するというふうにすれば、AnyEventを使う場合でも見通しの良いコードになるでしょう。

複数のセマフォを組み合わせて使う

ある程度大きなクローラを書くことになると、複数の制限を加えることになるでしょう。⁠生成するCoroの最大数」⁠ホストごとの最大接続数」⁠プロセス中の全体での最大同時接続数」をそれぞれセマフォを使って制限します。セマフォによって何も処理しない休止中のスレッドが出てくるので、CoroのスレッドをHTTPクライアントの同時接続数より多く作り待機させます。

最後の接続から一定秒数のウェイトを入れるような処理も、単にCoro::Timer::sleepを使って実装できます。条件が整うまではそのスレッドを一時停止して別のスレッドに切り替わるといった、AnyEventでは書くことが難しい処理でもすんなりと直感的に書けるのがCoroの良いところです。Coroのスレッド作成はforkで子プロセスを作るよりも非常に高速、省メモリであるため、100や200程度であれば気軽に作成できます。

複数プロセスで共有するキューを作る

Coro::Channelを使うことで別のスレッドにデータを受け渡すことができますが、これはあくまでそのプロセス内のオンメモリでの処理でしかありません。複数のプロセス、複数のサーバに跨ってデータを送受信するには何らかのストレージにデータを保存しなくてはいけません。またI/OではなくCPUがボトルネックになるような処理がある場合は別プロセスに分割したほうがよいので、複数のプロセス間でデータを受け渡すためのキューサーバがあると何かと便利です。ここではRedis[5]を使ってみましょう。

RedisのLIST型を使う

Redisにはちょうどキューとして使えるデータ型とコマンドが存在しているので、複数プロセスで共有するキューを作ることができます。publish側はメッセージをrpushで追加していき、subscribe側はblpopで先頭から取り出していくことでRedisのLISTをキューとして使うことができます。blpopのbはblockingを意味します。LISTが空だった場合に、即座にレスポンスを返さずに指定時間までLISTにデータが追加されるのを待つことができます。

PerlからRedisにアクセスするためのCPANモジュールはいくつかありますが、AnyEvent::Redisを使うことでほかのCoroと並列して動作させることができます。RedisのLISTにCoro::Channel 風のインタフェースを付けてみましょう。

Redisを使った簡易キューサーバ
package Rq;
# Redis as Queue
use strict;
use Coro;
use AnyEvent::Redis;
our %REDIS_SERVER = (server => '127.0.0.1', port => 6379);

sub new {
  my $class = shift;
  my $name = shift;
  my $self = {
    name => $name,
    redis => AnyEvent::Redis->new(%REDIS_SERVER),
  };
  bless $self, $class;
}
# LIST から取得
sub get {
  my $self = shift;
  while(1) {
    $self->{redis}->blpop($self->{name}, 10, rouse_cb);
    my $res = rouse_wait;
    return $res->[1] if ($res);
  }
}
# LIST に追加
sub put {
  my ($self, $msg) = @_;
  $self->{redis}->rpush($self->{name}, $msg);
}

1;
publish側
use strict;
use Rq;
use Coro;
use Coro::Timer;

my $queue = Rq->new("test_channel");
sub publish {
    my $message = shift;
    $queue->put($message);
}
my $broker = async {
    Coro::current->desc("broker");
    my $i = 0;
    while (1) {
        $i++;
        Coro::Timer::sleep 1;
        publish( "task: " . $i );
    }
};

schedule;
subscribe側
use strict;
use Coro;
use Coro::Timer;
use Rq;
my $channel = Rq->new("test_channel");
async {
    while(1) {
        my $msg = $channel->get;
        warn $msg;
    }
};
# $channel->get がブロッキング中でもタイマーは動き続ける
my $timer = async {
    while(1) {
        Coro::Timer::sleep 1;
        warn time;
    }
};
schedule;

これで複数のプロセス間でRedisを介してメッセージの送受信ができるようになりました。Coro::Channelと違って文字列しか受け渡すことができないので、キューを便利に使うためにはハッシュやArrayを格納する際のシリアライズ/デシリアライズの方法を決めておく必要があるでしょう。注意すべき点としては、異常終了時には処理中のデータが消えてしまうことです。

一般的なメッセージキューのためのミドルウェアでは、メッセージを受信したあとにAckコマンドやそれ相当の命令を送ることで初めて、受信したデータがキューから消えるようになっています。たとえばQ4Mの場合には、queue_waitとqueue_endを組み合わせて使用します。処理中のデータはほかのクライアントから「隠れて見え」て、処理が終わったらメッセージが削除されるようになっています。高い信頼性が求められる場合には、こういった処理が必要になるでしょう。

Coroを使うメリット

昨今は、マルチコア、メニーコアの時代になっているので、Coroによる1コアしか使うことができないシングルプロセス内での軽量スレッドや、I/O多重化による並行処理よりも、あまり深く考えずに愚直に「たくさんのプロセスを立ち上げたほうが速い」という状況もあり得るでしょう。1プロセス内でのCoro/AnyEventを使ったI/O多重化によるアプローチは、複数の処理を「並行」して動作させることができますが、複数のCPUコアを使って真に「並列」には動作しません。

Coroのメリットは、マルチプロセスによる並列処理と違って1プロセス内での状態の共有、協調動作が非常に容易にできるということにあります。たとえばセマフォによる同時接続数の管理など、複数のスレッドから状態を共有することによって容易に行えるようになっています。Coroを使った1プロセス内での並行処理を少数forkして行うといった構成の場合でも、1プロセスあたりの同時接続数が限られていれば、⁠1プロセスあたりの接続数×プロセス数」で同時接続数にも自然に制限がかかることになります。これが大量のプロセスが共有メモリを持たずに独立して動いている場合には、接続数を管理するための中央サーバが必要になってしまいますし、1リクエストごとに問い合わせるという非効率なしくみになってしまいます(もちろん厳格に管理したい場合はそういったものが必要になってしまいますが⁠⁠。

インメモリのキャッシュを活用する

また、プロセス内でのDNSのキャッシュ処理も、1プロセスの生存期間内に、同じサーバに何度もアクセスする場合は非常に効果的です[6]⁠。Keep-Alive接続に対応したいといったケースでも、複数のプロセスに跨ってしまえばコネクションを共有できませんが、1プロセス内でI/O多重化による並行処理を行っているのであれば、一度接続したホストへのコネクションをキャッシュしておくことができます。このように「1プロセス内で状態を共有する」ことによって得られるメリットが大きければ、CoroやAnyEventによるアプローチは非常に強力なものになります。

まとめ

本稿では、クローラを作る際に参考になる情報を紹介しました。

さて、次回の執筆者は久森達郎さんで、テーマは「Perlアプリケーションのテストと高速なCIContinuous Integration継続的インテグレーション)環境構築術」です。

おすすめ記事

記事・ニュース一覧