Hatena::ブログ(Diary)

bose999の試験管の中の話 このページをアンテナに追加 RSSフィード Twitter

bose999が所属するネットパイロティング株式会社ではSaaS開発・インフラエンジニアを
募集中です! 一緒にBtoBサービスを生み出して行きませんか?


また学生のインフラエンジニアのアルバイトも募集しています。
BtoBサービスを行なっている会社でアルバイトをしてみて
システム系の仕事の現場を経験してみませんか?

2013-04-15

ZeroMQのJava実装のJeroMQを試す

ZeroMQ

http://zguide.zeromq.org/page:all


JeroMQ

https://github.com/zeromq/jeromq


ZeroMQはシンプルで高速なプロセス間通信を実現する

C/C++ライブラリライブラリラッパーは対応言語が豊富。

キューは永続化はされない。メモリで処理されるので高速。

ライセンスLGPL


こちらのエントリが詳しい

ØMQ(zeromq)について調査する。


JeroMQはZeroMQがC/C++で書かれてるのに対してJavaで書かれている。

libzmq 3.2.2に基づいた0.3.0-SNAPSHOTを今回は使用する。

ZeroMQと違いJava実装なのでJVMjarがあれば動いてしまう手軽さが良い。

またパフォーマンスも頑張っている。ライセンスLGPL


f:id:bose999:20130415223348p:image


では上記のような

Client REQUEST <-> ROUTER Broker DEALER <-> REPLY Worker

という1対Nの構成をJeroMQで実装してみる。


通信はMessagePackで文字列シリアライズして渡し、

受け取り側でデシリアライズして文字列に戻す。


pom.xml

<?xml version="1.0" encoding="UTF-16"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>jp.teche</groupId>
	<artifactId>jeromq.sample</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>
	<name>JeroMQ-Sample</name>
	<description />

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<repositories>
		<repository>
			<id>msgpack.org</id>
			<name>MessagePack Repository for Maven</name>
			<url>http://msgpack.org/maven2</url>
		</repository>
		<repository>
			<id>typesafe.com</id>
			<name>typesafe.com</name>
			<url>http://repo.typesafe.com/typesafe/repo</url>
		</repository>
	</repositories>

	<dependencies>
		<dependency>
			<groupId>org.msgpack</groupId>
			<artifactId>msgpack</artifactId>
			<version>0.6.7</version>
		</dependency>
		<dependency>
			<groupId>org.jeromq</groupId>
			<artifactId>jeromq</artifactId>
			<version>0.3.0-SNAPSHOT</version>
		</dependency>
	</dependencies>
	<plugins>
		<plugin>
			<inherited>true</inherited>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-compiler-plugin</artifactId>
			<configuration>
				<source>1.7</source>
				<target>1.7</target>
				<optimize>true</optimize>
				<debug>true</debug>
			</configuration>
		</plugin>
	</plugins>
</project>
package jp.techie.jeromq.sample.one2many;

import java.io.IOException;

import org.msgpack.MessagePack;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
 * JeroMQ Sample Client
 * @author bose999
 * 
 */
public class ClientMaker {

	 /**
     * Clientの生成と100回キューのやり取りをする
     * @param args none
     */
	public static void main(String[] args) {
		Context context = ZMQ.context(1);
		Socket request= context.socket(ZMQ.REQ);
		String requestAddress = "tcp://127.0.0.1:9001";
		request.connect(requestAddress);
		System.out.println("client - requestAddress:" + requestAddress);
		for (int i = 0; i < 100; i++) {
			long startTime = System.nanoTime();
			MessagePack msgpack = new MessagePack();
			String sendString = "Hello ( requestAddress: " + requestAddress + ")";
			byte[] sendStringBytes;
			try {
				// MessagePackでシリアライズして送る
				sendStringBytes = msgpack.write(sendString);
				request.send(sendStringBytes, 0);
				long endTime = System.nanoTime();
				
				// MessagePackで返答を文字列にデシリアライズ
				byte[] replyBytes = request.recv(0);
				String reply = msgpack.read(replyBytes, String.class);
				long executeTime = endTime - startTime;
				System.out.println(
						"Received reply ( requestAddress:" + requestAddress + ") forCount:" + i
						+ " [" + reply + "] " + executeTime + "ns"
				        );
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		request.close();
		context.term();
	}
}
package jp.techie.jeromq.sample.one2many;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;

/**
 * JeroMQ Sample Brocker
 * @author bose999
 *
 */
public class BrockerMaker{

    /**
     * Brokerの生成とポーリング実行
     * @param args none
     */
    public static void main (String[] args) {
    	Context context = ZMQ.context(1);
        Socket router = context.socket(ZMQ.ROUTER);
		String routerAddress = "tcp://127.0.0.1:9001";
		System.out.println("broker - routerAddress:" + routerAddress);
		router.bind(routerAddress);
		
		Socket dealer = context.socket(ZMQ.DEALER);
		String dealerAddressHead = "tcp://127.0.0.1:";
        int dealerPort = 10001;
        
		for (int i = 0; i < 30; i++){
	        	String backEndAddress = dealerAddressHead + dealerPort;
	        	dealer.bind(backEndAddress);
	        	System.out.println("broker - backEndAddress:" + backEndAddress);
	        	dealerPort++;
	    }
		
        Poller items = new Poller (2);
        items.register(router, Poller.POLLIN);
        items.register(dealer, Poller.POLLIN);

        boolean more = false;
        byte[] message;

        while (!Thread.currentThread().isInterrupted()) {            
            items.poll();
            if (items.pollin(0)) {
                while (true) {
                    message = router.recv(0);
                    more = router.hasReceiveMore();
                    dealer.send(message, more ? ZMQ.SNDMORE : 0);
                    if(!more){
                        break;
                    }
                }
            }
            if (items.pollin(1)) {
                while (true) {
                    message = dealer.recv(0);
                    more = dealer.hasReceiveMore();
                    router.send(message,  more ? ZMQ.SNDMORE : 0);
                    if(!more){
                        break;
                    }
                }
            }
        }
        router.close();
        System.out.println("broker - close router.");	
        dealer.close();
        System.out.println("broker - close dealer.");	
    }
}
package jp.techie.jeromq.sample.one2many;

import java.io.IOException;

import org.msgpack.MessagePack;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
 * JeroMQ Sample Worker
 * @author bose999
 *
 */
public class WorkerMaker {
	
	/**
     * Brokerの生成とポーリング実行
     * @param arg
     */
	public static void main (String[] args) {
        String replyAddressHead = "tcp://127.0.0.1:";
        int replyPort = 10001;        
        for (int i = 0; i < 30; i++){
        	// 30スレッド作ってポートを一意にしてWorkerとして処理を行わせる
        	WorkerExecuter workerExecuter = new WorkerExecuter();
        	String replyAddress = replyAddressHead + replyPort;
        	workerExecuter.setReplyAddress(replyAddress);
        	new Thread(workerExecuter).start();
        	replyPort++;
        }
    }
	
    /**
     * Worker複数スレッド実行
     * @author bose999
     *
     */
    private static class WorkerExecuter implements Runnable{
    	
    	/**
    	 * Worker TCP Address
    	 */
    	private String replyAddress = null;
    	    	
    	/**
    	 * setter
    	 * @param replyAddress
    	 */
    	public void setReplyAddress(String replyAddress){
    		this.replyAddress = replyAddress;
    	}
    	
		/* (non-Javadoc)
		 * @see java.lang.Runnable#run()
		 */
		public void run() {
			System.out.println("worker - replyAddress:" + this.replyAddress);
			Context context = ZMQ.context (1);
	        Socket responder = context.socket (ZMQ.REP);
	        responder.connect (this.replyAddress);
	        System.out.println("worker - replyAddress:" + this.replyAddress);
	        
	        while (!Thread.currentThread ().isInterrupted ()) {
	        	// clientから受信したMessagePackシリアライズ
	        	byte[] recvBytes = responder.recv(0);
	        	MessagePack msgpack = new MessagePack();
	            String recvString = "";
				try {
					// clientからbyte配列を受信してMessagePackでデシリアライズ
					recvString = msgpack.read(recvBytes, String.class);
					System.out.println(
							"worker - Received request ( replyAddress: " + this.replyAddress + "): " +
					         "[" + recvString + "]"
					         );
		            // clientに文字列をMessagePackでシリアライズして送信
					String sendString = recvString + " World ( replyAddress: " + this.replyAddress + ")";
					byte[] sendStringBytes = msgpack.write(sendString);
		            responder.send(sendStringBytes,0);
				} catch (IOException e) {
					e.printStackTrace();
				}
	        }
	        responder.close();
	        context.term();
	        System.out.println("worker - socket close ( replyAddress: " + this.replyAddress + ")");
	    }
    }
}

2012-10-16

2012-08-08

Mountain Lionへのログイン時にシェルを実行する

こんなかんじでシェル名を登録してやります。

rootユーザ権限で動くので処理記述内容は気をつけないといけません。

なので中身はsudo -u bose999 xxx.sh とかしたりしてます。


起動設定と確認方法

% sudo defaults write com.apple.loginwindow LoginHook /Library/Scripts/LoginHook.sh
% sudo defaults read com.apple.loginwindow LoginHook
/Library/Scripts/LoginHook.sh

起動設定を消す方法

% sudo defaults delete com.apple.loginwindow LoginHook

2012-08-06

cd移動 を easyに z.sh de GO!

インストールMacHomebrewだと下記で終わり。

% brew install z
% vi $HOME/.zshrc

.zshrc に追記するもの

.`brew --prefix`/etc/profile.d/z.sh
function precmd () {
  z --add "$(pwd -P)"
}

これで設定が完了。

シェルを使っているうちに $HOME/.z に

自分が使用したディレクトリが記録されて優先度等が決まります。


このデータを元を確認すると下記のようにzコマンド 引数なしで

自分が使用した数から決まった優先度が確認できます。

% z
8          /Users/bose999/Downloads
8          /Users/bose999/Pictures
8          /Users/bose999/bin
12         /usr/local/Library/Formula
16         /Applications
16         /Users/bose999/Desktop
16         /Users/bose999/data
20         /Users/bose999/Documents
24         /usr/local/Cellar
32         /usr/local
44         /usr/local/bin

どういう風に使うかという例

% z u l
% pwd
/usr/local
% z u l b
% pwd
/usr/local/bin
% z Ce
% pwd
/usr/local/Cellar
% z doc
% pwd
/Users/bose999/Documents
% z des
% pwd
/Users/bose999/Desktop

2012-08-05

HomebrewでMoutain Lionに入っていたものと別のvimをインストールする

すでにvimはMoutain Lionに入ってるのですが、

-clipboard なのでHomebrewインストールして

+clipboard にしてyankした文字列クリップボードに入るvim

インストールします。


また、macvimをCUIから使うという方法もありますが、

私のvimrcだと画面表示が崩れたりしたのでCUIはあえて

vimを入れて安定させました。


下記の手順で、vimのFormulaのRubyのパスを変更してますが、

コンパイルでエラーになったのでOSに入っているRubyから

Homebrewで入れたRubyにパスを変更しました。


brew tap homebrew/dupes というコマンドを

実行しているのは既にOSインストールされているものをあえて

HomebrewからインストールするためのFormulaを追加する為です。

% brew tap homebrew/dupes 
% brew install mercurial
% vi /usr/local/Library/Formula/vim
% brew install homebrew/dupes/vim

/usr/local/Library/Formula/vim オリジナル

"--with-ruby-command=/usr/bin/ruby",

/usr/local/Library/Formula/vim 変更後

"--with-ruby-command=/usr/local/bin/ruby",

Homebrewでバージョンの違うSubversionをインストールして使い分ける

まずは下記のような手順でインストールします。

brew tap homebrew/versions で通常提供されてるFormulaと

 別のバージョンのFormulaを入手しています。

% brew install subversion
% brew unlink subversion
% brew tap homebrew/versions
% brew link gettext
% brew install subversion16
% brew unlink subversion16
% brew unlink gettext

subversion1.6系を使う際は下記を実行します。

% brew unlink subversion
% brew link subversion16
% /usr/local/bin/svn --version 
svn, バージョン 1.6.17 (r1128011)
   コンパイル日時: Aug  5 2012, 06:19:45

Copyright (C) 2000-2009 CollabNet.
Subversion is open source software, see http://subversion.apache.org/
This product includes software developed by CollabNet (http://www.Collab.Net/).

以下のリポジトリアクセス (RA) モジュールが利用できます:

* ra_neon : Neon を利用して WebDAV (DeltaV) プロトコルでリポジトリにアクセスするモジュール。
  - 'http' スキームを操作します
  - 'https' スキームを操作します
* ra_svn : svn ネットワークプロトコルを使ってリポジトリにアクセスするモジュール。
  - Cyrus SASL 認証を併用
  - 'svn' スキームを操作します
* ra_local : ローカルディスク上のリポジトリにアクセスするモジュール。
  - 'file' スキームを操作します

subversion1.7系を使う際は下記を実行します。

% brew unlink subversion16
% brew link subversion
% /usr/local/bin/svn --version 
svn, バージョン 1.7.5 (r1336830)
   コンパイル日時: Aug  5 2012, 05:24:33

Copyright (C) 2012 The Apache Software Foundation.
This software consists of contributions made by many people; see the NOTICE
file for more information.
Subversion is open source software, see http://subversion.apache.org/

以下のリポジトリアクセス (RA) モジュールが利用できます:

* ra_neon : Neon を利用して WebDAV (DeltaV) プロトコルでリポジトリにアクセスするモジュール。
  - 'http' スキームを操作します
  - 'https' スキームを操作します
* ra_svn : svn ネットワークプロトコルを使ってリポジトリにアクセスするモジュール。
  - Cyrus SASL 認証を併用
  - 'svn' スキームを操作します
* ra_local : ローカルディスク上のリポジトリにアクセスするモジュール。
  - 'file' スキームを操作します
* ra_serf : serf を利用して WebDAV (DeltaV) プロトコルでリポジトリにアクセスするモジュール。
  - 'http' スキームを操作します
  - 'https' スキームを操作します