Hatena::ブログ(Diary)

cooldaemonの備忘録 RSSフィード

2011-09-02

RabbitMQ のクラスタリング機能にキューのミラーリングが追加されたので RabbitFoot (AnyEvent::RabbitMQ) から試してみる

クラスタリングやキューのミラーリングの詳細は、下記参照の事。

RabbitMQ をクラスタリングする

今回はサーバを複数用意できなかったので、一つのサーバ上で RabbitMQ を二つ起動する。

% RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit1 rabbitmq-server
% RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit2 rabbitmq-server

起動したら、rabbit2 の方を一旦停止(ErlangVM は停止しない。RabbitMQ だけ停止)して、クラスタリングの設定を行ってから再起動する。

% rabbitmqctl -n rabbit2 stop_app
% rabbitmqctl -n rabbit2 reset
% rabbitmqctl -n rabbit2 cluster rabbit1@`hostname -s`
% rabbitmqctl -n rabbit2 start_app

クラスタリング構成が正しく組まれているか確認するには、下記のコマンドを実行する。

% rabbitmqctl -n rabbit1 cluster_status
Cluster status of node rabbit1@ljob04 ...
[{nodes,[{disc,[rabbit1@ljob04]},{ram,[rabbit2@ljob04]}]},
 {running_nodes,[rabbit2@ljob04,rabbit1@ljob04]}]
...done.

ミラーリングされたキューを作る

今回は、Perl Script からキューを作る。

use Coro;
use Net::RabbitFoot;

my $rf = Net::RabbitFoot->new(
    verbose => 1
)->load_xml_spec()->connect(
    host    => 'localhost',
    port    => 5672,
    user    => 'guest',
    pass    => 'guest',
    vhost   => '/',
    timeout => 1,
);

my $ch = $rf->open_channel();

$ch->declare_queue(
    queue     => 'test_q',
    arguments => {
        'x-ha-policy' => 'all',
    },
);

$rf->close();

arguments テーブルを使用しているので AMQP 0-8 から使える。

x-ha-policy に 'all' を指定しているので、クラスタリングされている全てのノード上にキューがミラーリングされる。

x-ha-policy の詳細は、RabbitMQ - Highly Available Queues 参照の事。

キューのミラーリング状態を確認するには、下記のコマンドを実行する。

% rabbitmqctl -n rabbit1 list_queues name messages pid slave_pids synchronised_slave_pids
Listing queues ...
test_q  0       <rabbit1@ljob04.2.5022.0>       [<rabbit2@ljob04.1.4932.0>]     [<rabbit2@ljob04.1.4932.0>]
...done.

上記の場合、マスターとなるキューは rabbit1 上に存在し、コピーが rabbit2 に存在している。

念のため、幾つかメッセージをキューに追加しておく。

use Data::Dumper;
for (1..2) {
    $ch->publish(
        routing_key => 'test_q',
        body        => 'Hello HA Queue.',
        on_return   => unblock_sub {die Dumper(shift)},
    );
}

ノードを停止・起動してみる

キューのマスターが存在している rabbit1 を停止する。

% rabbitmqctl -n rabbit1 stop

状態を確認してみると…

% rabbitmqctl -n rabbit2 list_queues name messages pid slave_pids synchronised_slave_pids
Listing queues ...
test_q  2       <rabbit2@ljob04.1.4932.0>       []      []
...done.

キューのマスターが rabbit2 に移っており、コピーは空になっている。

この状態でキューにメッセージを追加しておく(この時点でメッセージ数 4)。追加の方法は、前述を参照の事。

次に、rabbit1 を起動する。

% RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit1 rabbitmq-server

クラスタリングの設定は保存されているので、これだけで良い。

状態を確認してみると…

% rabbitmqctl -n rabbit2 list_queues name messages pid slave_pids synchronised_slave_pids
Listing queues ...
test_q  4       <rabbit2@ljob04.1.4932.0>       [<rabbit1@ljob04.3.225.0>]      []
...done.

コピーがノード上に作られているが、同期は行われていない。

詳細は、RabbitMQ - Highly Available Queues の Unsynchronised Slaves を参照の事。

この状態で、更にキューにメッセージを追加しておく(この時点でメッセージ数 6)。

ここでメッセージを 4 つ受信すると同期状態となる(1 つずつメッセージを受信した方が解りやすい)。

for (1..4) {
    my $response = $ch->get(queue => 'test_q');
}

感想とか

クラスタリングを構築してもキューがミラーリングされるわけではなかったので、今までは、クラスタリング機能を使わずに運用していた。(2 ノード以上で運用し、1 ノード停止してもシステム全体が止まらないような設計を行っていた)

今後は、積極的にクラスタリング機能を使って行こうと思う。

2011-08-03

CCCrypt(Objective-C) で暗号化したデータを Crypt::OpenSSL::AES(perl) で復号化する

iPhone/iPad/iPod で暗号化(AES256 を使用)したデータをサーバ側で復号化するのに半日ハマったのでメモ。

まず、ObjC 側。CCCrypt の解説は方々に存在しているので割愛。IV に NULL を指定しているので 0x00 が 16 Byte という事になる。

CCCrypt(
  kCCEncrypt,
  kCCAlgorithmAES128,
  kCCOptionPKCS7Padding,
  key, keySize,
  NULL,
  data, dataSize,
  buffer, bufferSize,
  &bufferBytes
);

次に、Perl 側。

Crypt::CBC->new(
    -key         => $key,
    -literal_key => 1,
    -cipher      => 'Crypt::OpenSSL::AES',
    -header      => 'none',
    -iv          => pack('C*', map {0x00} (1..16)),
)->decrypt($data);

ポイントは下記の通り。

  • Crypt::CBC を使う
  • literal_key に true を指定しないと key に指定した値の MD5 が KEY として使用される
  • header を none にして IV を明示的に指定する
  • CCCrypt(ObjC)側で IV を NULL にしたなら 0x00 が 16Byte なバイナリ列を指定する

2011-05-06

Mochikit で sendXMLHttpRequest する際に Content-Type を設定しないと、CGI.pm で URL エンコードされた POST データを受け取れない

サーバの入れ替えを行う際に、時間を無駄にしたのでメモ。

旧環境では、下記のコードで問題なく動作していたのだが…

// ..snip..
var r = getXMLHttpRequest();
r.open('POST', CGI_PATH, true);

var args = formContents('form_id');
this.d = sendXMLHttpRequest(r, queryString(args[0], args[1]));
this.d.addCallbacks(
  bind(this.call_back, this),
  bind(this.error_back, this)
);
// ..snip..

新環境では、CGI.pm を new すると "POSTDATA" というキーにごそっと POST したデータが入ってしまう。

CGI.pm のコードを読んでみると…

if ($meth eq 'POST'
    && defined($ENV{'CONTENT_TYPE'})
    && $ENV{'CONTENT_TYPE'} !~ m|^application/x-www-form-urlencoded|
    && $ENV{'CONTENT_TYPE'} !~ m|^multipart/form-data| ) {
    my($param) = 'POSTDATA' ;
    $self->add_parameter($param) ;
  push (@{$self->{$param}},$query_string);
  undef $query_string ;
}

どうやら、好きな Parser を指定できるようにするための措置らしい。

"CGI.pm", "POSTDATA" などのキーワードでググると、古い記事が沢山引っかかるので、相当前からの仕様らしい…。

という事で、setRequestHeader を使って "Content-Type" してみた。

// ..snip..
var r = getXMLHttpRequest();
r.open('POST', CGI_PATH, true);
r.setRequestHeader('Content-Type', 'application/x-www-form-urlencoded');
// ..snip..

FireFox の場合、open の前で setRequestHeader するとエラーとなるので注意。

きっと一般的な Perl Web エンジニアの間では常識なんだろうなぁ orz

2010-06-19

vim-ref のプラグインを自作する

vim-ref は、プラグインで拡張できる設計となっているので、試しに Twitter 検索用のプラグインを自作してみました。

と言っても、Twitter 検索を実行する適当なコマンドを Perl で自作し、プラグインの中から実行しているだけの簡単な仕組みです。

ref-twitter

search_twitter.pl

Net::Twitter や Term::ANSIColor を利用しているので、非常に行数は少ないのですが、Perl 環境が整っていない方は、準備として CPAN 祭りが必要です。

引数の文字コードは、UTF-8 固定です。

twitter.vim から使うには、PATH の通った場所に配置する必要があります。

twitter.vim

コマンド側で出力結果にエスケープシーケンスで色を付けていたのですが、何もせずに Vim のバッファに結果を表示するとエスケープシーケンスがそのまま表示されてしまいます。

そこで、下記からコードを拝借させて頂き、GVim でも色が付くように修正してから twitter.vim に組み込んでみました。

エスケープシーケンス による色付けを Vim のバッファで再現する - NaN days - subtech

ちなみに、vim-ref では、結果をキャッシュに保存する便利関数がプラグインから利用しやすい形で提供されています。しかし、常に最新の情報を参照したいので、今回はあえて利用していません。

vim-ref のプラグインを書く事は、そんなに敷居の高い行為ではないので、是非、お試しあれ。

2010-02-11

Perl で作成した RabbitMQ 専用クライアントライブラリを AnyEvent と Coro で非同期化しました

以前、AMQP と RabbitMQ を学ぶために RabbitFoot という名前の Perl 版のクライアントライブラリを作成したのですが、非同期化して欲しいと要望を頂いたので、AnyEvent と Coro を利用してみました。

AnyEvent と Coro は、今回、初めての利用となるため、識者からの厳しいツッコミがあると嬉しいです。

cooldaemon’s RabbitFoot at master - GitHub

Consume 用の Channel を五つ、Publish 用の Channel を一つ開き、Channel 間でメッセージを送受信する例は、下記の通りです。

use Coro;
use RabbitFoot;

my $rf = RabbitFoot->new()->load_xml_spec(
    '/path/to/fixed_amqp0-8.xml',
)->connect(
    host    => 'localhosti',
    port    => 5672,
    user    => 'guest',
    port    => 'guest',
    vhost   => '/',
);

my $main = $Coro::current;
my $done = 0;

my @queues = map {
    my $queue = 'test_q' . $_;
    my $ch = $rf->open_channel();

    $ch->declare_queue(queue => $queue);

    my $frame; $frame = $ch->consume(
        queue      => $queue,
        on_consume => unblock_sub {
            my $response = shift;
            return if 'stop' ne $response->{body}->payload;

            $ch->cancel(consumer_tag => $frame->method_frame->consumer_tag);
            $done++;
            $main->ready;
            schedule;
        },
    );

    $queue;
} (1 .. 5);

my $ch = $rf->open_channel();
for my $queue (@queues) {
    for (qw(hello stop)) {
        $ch->publish(
            routing_key => $queue,
            body        => $message,
        );
    }
}
schedule while $done < 5;

$ch->delete_queue(queue => $_) for @queues;

$rf->close;

consume メソッドの on_consume や、今回は利用していませんが publish メソッドの on_return には、sub ではなく unblock_sub を利用するのがポイントです。

sub を利用すると、イベントループからコールバック関数が呼ばれるので、Coro::rouse_cb を内部で利用している cancel は利用できません。

これで「一つのプロセスをスレッドにより分割し、一つのソケット接続を Channel により分割するので、資源を有効に利用できる」という AMQP の恩恵を、Perl 経由で享受できるハズです。*1

蛇足ですが、Coro はマルチコアに対応しているわけではないので、マルチコアを使い切りたいのであれば、プロセスを複数作る必要があります。

*1Erlang 層と Perl 層と繋ぐ際は、AnyEvent + Coro は必須ですね