Hatena::ブログ(Diary)

モバイル系ニートエンジニア`chobi_e`の日記 Twitter

2010-09-22

PHP+ZeroMQでお手軽メッセージング

PHPで扱えるなにかよいイベント通知がないかとtwitterでつぶやいたらid:heavenshellからZeroMQを教えてもらいました!


おかげさまで仕事のほうがさっくり終わりそうです(*´д`*)


ZeroMQとは?


様々なメッセージを高速にやり取りできるメッセージエンジンです。


シンプルに1対1のメッセージングや、多対多のメッセージングも行えて細かい接続などのハンドリングはZeroMQが行ってくれるのでとても楽チンです。


まずはZeroMQライブラリのインストールから行いましょう。


ZeroMQ及びZeroMQのPHPバインディングのインストール


CentOSの場合こんな感じで

  wget http://www.zeromq.org/local--files/area:download/zeromq-2.0.9.tar.gz
  tar zxf zeromq-2.0.9.tar.gz
  cd zeromq-2.0.9
  ./configure
  make && make install

  cd ..
  git clone git://github.com/mkoppanen/php-zmq.git
  cd php-zmq
  phpize && ./configure
  make && make install
  
  echo 'extension=zmq.so' > /etc/php.d/zeromq.ini
  service httpd restart

シェルで`php -m | grep zmq`と実行してみてzmqと書いてあれば上手く設定はすんでいるかと思います。


そいではまずは簡単なPUB/SUBのサンプルから


publisher.php

<?php
$server = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_PUB);
$server->bind("tcp://127.0.0.1:5555");

for(;;){
  echo "say Hello to subscriber." . PHP_EOL;
  $server->send("test Hello");
  sleep(1);
}
?>

subscriber.php

<?php
$queue = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_SUB);
$queue->connect("tcp://127.0.0.1:5555");
// testに前方一致したメッセージを拾うよ
$queue->setsockopt(ZMQ::SOCKOPT_SUBSCRIBE,"test");

var_dump($queue->recv());
?>

上のソースをコピペできたら二つの端末を用意して片方で`php publisher.php`もう片方で`php subscriber.php`と実行してみてください。


どうでしょう?subscriber.php側の端末でHelloと1秒毎に表示されているでしょうか?たったのこれだけでメッセージングが行えてしまうのですよ!


試しにpublisher.phpを実行している端末でCtrl+Cを押して、もう一度`php publisher.php`を実行してみましょう・・・。


実行したらまたpublishされたメッセージが正常に配送されています。


接続しているか、接続していないかを気にせずにメッセージングを行えるのでとっても簡単ですね!他のサンプルについてはZeroMQのPHPバインディングのソースにあるexampleの中、もしくは昨日下調べをしてメモっといたhttp://gist.github.com/589911を参照してもらえると良いかと思います。


ワタシも昨日調べたばっかりな上に日本語の情報が極端に少ないので良くわかっていませんがとりあえず使うにあたってはこんな感じで把握しておけば色々と遊べると思います!


ZeroMQのメッセージ種別


ZeroMQのメッセージングの種別はPUB,SUB,REQ,REP,XREQ,XREP,PUSH,PULL,PAIRと有り、それぞれのコンテキストに対応する種別でなければ通信をすることが出来ないようになっています。


メッセージの対応一覧
  PUB × SUB
  REQ × REP
  REQ × XREP
  XREQ × REP
  XREQ × XREP
  XREQ × XREQ
  XREP × XREP
  PUSH × PULL
  PAIR × PAIR

  • PUB(PUBLISH)は複数のSUB(SUBSCRIBER)に対して一斉にメッセージを通知するメッセージング種別です。
  • REP(REPLY)はREQ(REQUEST)からのメッセージに対して返事をするメッセージング種別です。

他のものについてはまだ調べてないですが、X系は複数クライアントに対して実行する。PUSH,PULLはキューイングできるメッセージング、PAIRやUPSTREAM/DOWNSTREAMは相互に接続してメッセージのやりとりをするようです(未確認)


もう少し興味深いサンプルコード


メッセージを配送するだけでは面白くないので、ネットオークションでよく見られる入札されたら残り時間が延長されるような仕組みを考えてみましょう。


  • 入札を待ち受けつつ(REP)、オークションの残り時間が変更されたら時間(unixtime)をPUBLISHするメッセージハブ(manager.php)
  • オークションの残り時間をカウントダウンしつつ、残り時間が変更されたら更新をするサブスクライバ(countdown.php)
  • オークションに入札(REQ)を行うクライアント(bid.php)


とりあえずはこんな感じで三つのスクリプトを書いてみましょう。


manager.php

<?php
/*
** オークションの時間管理と入札の管理を行ないます
** @author chobi_e
** @created_at 22:44 2010/09/22
**/
date_default_timezone_set("Asia/Tokyo");
$publisher = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_PUB);
$manager   = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);

$publisher->bind("tcp://127.0.0.1:5555");
$manager->bind("tcp://127.0.0.1:5556");

$remaining = strtotime("now + 1 minute");
$number = 0;
echo "[Info] オークションが開始されました。" . PHP_EOL;
for(;;){
	$now = time();

	if($now > $remaining){
		echo "[Info]オークションが終了しました." . PHP_EOL;
		exit(0);
	}
	
  try{
		$publisher->send($remaining);
    //ノンブロッキングでメッセージの受信を行ないます。
    $bid = $manager->recv(ZMQ::MODE_NOBLOCK);
  	$remaining = $remaining+20;
  	$number++;
		echo "[Info]{$number}入札目キタワァ*・゜゚・*:.。..。.:*・゜(n'∀')η゚・*:.。..。.:*・゜゚・* " .PHP_EOL;
    //とりあえず入札に対する返事はは常にOKとします。
    $manager->send("ok");
  }catch(ZMQSocketException $e){
  	//recvが行えない場合(reqがいない場合など)はEAGAINが即時に発生します。
    if ($e->getCode() === ZMQ::ERR_EAGAIN) {}
  }
  sleep(1);
}

countdown.php

<?php
/*
** オークションのカウントダウンを行ないます
** @author chobi_e
** @created_at 22:44 2010/09/22
**/

date_default_timezone_set("Asia/Tokyo");

$subscriber = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_SUB);
$subscriber->connect("tcp://127.0.0.1:5555");
// subscriberはPublishされたメッセージを何でも読みます
$subscriber->setsockopt(ZMQ::SOCKOPT_SUBSCRIBE,"");

$remaining = $subscriber->recv();
for(;;){
	$time = time();
	if($remaining < $time){
		echo "オークションが終了したみたいです". PHP_EOL;
		exit(0);
	}

	try{
		$_remaining = $subscriber->recv(ZMQ::MODE_NOBLOCK);
		if($remaining < $_remaining){
			echo "[Notice]入札があったみたいです!時間を20秒延長します" . PHP_EOL;
			$remaining = $_remaining;
		}
	}catch(ZMQSocketException $e){
    if ($e->getCode() === ZMQ::ERR_EAGAIN) {
    }
	}
	
	if($remaining-$time){
		printf("オークション終了まであと%s\r\n",countdown($remaining,$time));
	}
	sleep(1);
}



function countdown($remaining,$now){
  if($now > $remaining){
    return "00:00:00";
  }

  $hour    = (($remaining-$now)/3600);
  $minutes = (($remaining-$now)%(24*60*60)/60)%60;
  $second  = (($remaining-$now)%(24*60*60))%60%60;
  
  return sprintf("%02d:%02d:%02d",$hour,$minutes,$second);
}

bid.php

<?php
/*
** オークションへの入札を行ないます
** @author chobi_e
** @created_at 22:44 2010/09/22
**/
$requestor = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ);
$requestor->connect("tcp://127.0.0.1:5556");

echo "入札開始: " . PHP_EOL;
try{
	$requestor->send(1);
	$msg = $requestor->recv();
	echo "入札成功!: " . $msg . PHP_EOL;
}catch(ZMQSocketException $e){
	var_dump($e->getCode());
}

そいでは三つの端末を用意してauction.php,countdown.phpを実行しておきましょう。

auction.phpはオークションの開始、入札、終了の時だけシェルに文字を表示するようにしてあります。countdown.phpではauction.phpがPUBLISHしているオークションの終了時間と現在の時間を比べてカウントダウンを行ないます。


準備は良いですか?それではあなたの好きなタイミングで>|php bid.php|<を実行してみましょう。どうですか?オークションの時間が延長されましたね!


それではよく流れを見てみましょう。



bid.php(REQ)からauction.php(REP)に入札のメッセージが届くとオークションの残り時間に20秒足すようにしてあります。

auction.php(PUB)では毎秒オークションの残り時間をPUBLISHしていたので、そのメッセージをSUBSCRIBEしていたcountdown.phpでは正しくオークションの終了時間が更新されるワケですね。


どうでしょうか?メッセージングが行えるとこんなことが手軽に実装できるみたいです。


あとがき


実際問題としてはPUB/SUBのメッセージングは配送されない場合があったりするので同期したメッセージングには不適切です。今回の場合はひとまず実装が簡単なサンプルとしてオークションの終了時間を延長するには?ということで扱ってみましたが実際に使用する場合には同期できるほかの仕組みも考慮してください。


ワタシも昨日今日で調べたばっかりな上にあんまり参考にできるページがなかったので間違い&用語に不適切な単語があるかもしれませんが、もし誤りがあったらどうか教えてください。



はてなダイアリー初回で長めのエントリになってしまいましたが今後も技術的なtipsなどをメモっていこうとおもうので気長にどうぞよろしくお願いします!