Hatena::ブログ(Diary)

cooldaemonの備忘録 RSSフィード

2010-02-11

Perl で作成した RabbitMQ 専用クライアントライブラリを AnyEvent と Coro で非同期化しました

以前、AMQP と RabbitMQ を学ぶために RabbitFoot という名前の Perl 版のクライアントライブラリを作成したのですが、非同期化して欲しいと要望を頂いたので、AnyEvent と Coro を利用してみました。

AnyEvent と Coro は、今回、初めての利用となるため、識者からの厳しいツッコミがあると嬉しいです。

cooldaemon’s RabbitFoot at master - GitHub

Consume 用の Channel を五つ、Publish 用の Channel を一つ開き、Channel 間でメッセージを送受信する例は、下記の通りです。

use Coro;
use RabbitFoot;

my $rf = RabbitFoot->new()->load_xml_spec(
    '/path/to/fixed_amqp0-8.xml',
)->connect(
    host    => 'localhosti',
    port    => 5672,
    user    => 'guest',
    port    => 'guest',
    vhost   => '/',
);

my $main = $Coro::current;
my $done = 0;

my @queues = map {
    my $queue = 'test_q' . $_;
    my $ch = $rf->open_channel();

    $ch->declare_queue(queue => $queue);

    my $frame; $frame = $ch->consume(
        queue      => $queue,
        on_consume => unblock_sub {
            my $response = shift;
            return if 'stop' ne $response->{body}->payload;

            $ch->cancel(consumer_tag => $frame->method_frame->consumer_tag);
            $done++;
            $main->ready;
            schedule;
        },
    );

    $queue;
} (1 .. 5);

my $ch = $rf->open_channel();
for my $queue (@queues) {
    for (qw(hello stop)) {
        $ch->publish(
            routing_key => $queue,
            body        => $message,
        );
    }
}
schedule while $done < 5;

$ch->delete_queue(queue => $_) for @queues;

$rf->close;

consume メソッドの on_consume や、今回は利用していませんが publish メソッドの on_return には、sub ではなく unblock_sub を利用するのがポイントです。

sub を利用すると、イベントループからコールバック関数が呼ばれるので、Coro::rouse_cb を内部で利用している cancel は利用できません。

これで「一つのプロセスをスレッドにより分割し、一つのソケット接続を Channel により分割するので、資源を有効に利用できる」という AMQP の恩恵を、Perl 経由で享受できるハズです。*1

蛇足ですが、Coro はマルチコアに対応しているわけではないので、マルチコアを使い切りたいのであれば、プロセスを複数作る必要があります。

*1Erlang 層と Perl 層と繋ぐ際は、AnyEvent + Coro は必須ですね