Setsuna-0.0.2リリース~

SetsunaのVersion-0.0.2をリリースしました。


Github
Sourceforge.jpダウンロードページ


今回のリリースで一番大きな追加機能はServerモードの追加です。
0.0.1の時は、Setsunaにデータを入れる方法はパイプ入力しかなかったので、
コマンドラインで何かの出力をパイプでSetsunaに繋ぐしかなかったので、
Setsunaが動いているサーバ以外からデータをSetsunaに流せなかった
のが不便でした。そこでNetWork越しにデータを流せるようにこの機能を追加しました。


起動方法は簡単で"-server true"という表記を付けるだけです。ただServerモードの
場合はsetsuna.jar内に含まれないライブラリにも依存しているので従来の-jar指定での
起動は出来なくなっています。以下が起動サンプルです。


java -classpath ./:./lib/msgpack/*:setsuna.jar setsuna.core.SetsunaMain -server true
(※リリース物のsetsuna.jarがあるディレクトリで実行している想定です)
これでPort番号10028番でサーバで待ち受けています。
※起動ポート番号を変更する場合は"-bindport"オプションで変更できます。


で、今回Serverモードを実装するにあたってどのようなプロトコルと仕組みにしようか
悩んだのですが、独自に実装するのは止めて、MessagePack-RPCを利用しました。
MessagePack-RPCとても高速でありかつ、各言語のバインディングが作成されているため、
言語別でクライアントを別途作成する必要がないため、採用させていただきました。
MessagePack-RPCの詳しい情報はこちら


そして、サーバ側に定義されているリモートメソッドは以下になります。

int next (String[] sendData)
凄く単純にしました。パイプ入力をメソッド化したイメージです。
引数はデータをカラム単位で分解した後のデータを配列にして渡します。
つまり以下のPingの結果の場合

192.168.11.1 からの応答: バイト数 =32 時間 =16ms TTL=64
半角スペースでばらしてSetsunaに送り込みたい場合以下のような配列を渡すことになります。

String[] sendData = new String[7];
sendData[0] = "192.168.11.1";
sendData[1] = "からの応答:";
sendData[2] = "バイト数";
sendData[3] = "=32";
sendData[4] = "時間";
sendData[5] = "=16ms";
sendData[6] = "TTL=64";
今回も前回のSetsunaと同じで、Setsuna起動時に-columnオプションでカラム定義を決めれば
その定義に従って内部DBにテーブルが作られます。もし-columnを指定しなければ、最初に送られて
きたデータをもとに、テーブルが作られます。基本的に、入り口がパイプからMessagePackのサーバに
変わっただけで後の機能は全て同じ動きをします。


そして、サーバからの戻り値は3種類あります。

                                                                                        • -

0=データ入力成功
-9=カラム定義と送られてきたデータが合っていない
-1=サーバ側でエラー発生

                                                                                        • -

この戻り値を見て以降のクライアントでなんだかの処理をしてもらえればと思います。
実際にこれを送り込むJavaでのクライアント実装のサンプルはリリース物の
test/ServerClientSampleディレクトリのSetsunaServerModeClientSample.javaになります。
コードはこんな感じです。


1
2 import org.msgpack.rpc.Client;
3 import org.msgpack.rpc.loop.EventLoop;
4
5 /**
6 * Setsunaを"-server true"にて立ち上げた場合にデータを投入する

7 * Clientのサンプル.

8 * SetsunaのServerモードはMessagePack-RPCで出来ているため、

9 * ClientもMessagePack-RPCで作成する必要がある。

10 * Setsuna側には以下のメソッドが定義されているので

11 * それをに実装する.

12 * -- 定義メソッド ----------

13 * int next (String[] data)

14 * --------------------------

15 * 引数は1レコード当たりのデータを表す配列

16 * 戻り値は呼び出しの成否(0=正常終了, -9=カラム定義と合わない , -1=内部エラー)

17 *
18 * @author T.Okuyama
19 */
20 public class SetsunaServerModeClientSample {
21
22 /**
23 * SetsunaをRPC越しに呼び出し
24 * 渡す値はカラム数分のString値の配列.一貫して同じ長さ分を送ること
25 * 戻り値:正常終了 0, カラム定義と合わない -9, 内部エラー -1
26 */
27 public static interface RPCInterface {
28 /**
29 *
30 * @param data カラムデータの配列。必ずカラム数分必要
31 * @return int 結果:正常終了=0, カラム定義と合わない=-9, 内部エラー=-1
32 */
33 int next(String[] data);
34 }
35
36 public static void main(String[] args) throws Exception {
37 EventLoop loop = EventLoop.defaultEventLoop();
38
39 Client cli = new Client("localhost", 10028, loop);
40 RPCInterface iface = cli.proxy(RPCInterface.class);
41
42 String[] list = new String[3];
43 list[0] = "aa";
44 list[1] = "bb";
45 list[2] = "cc";
46 for (int i = 0; i < 1000; i++) {
47 // データを登録する
48 System.out.println(iface.next(list));
49 }
50 cli.close();
51 loop.shutdown();
52 }
53 }
このサンプルクライアントでポイントはまず、27行目〜34行目のリモートメソッドの定義で
上に示したサーバ側の定義をインプリしてます。そして37行目でEventLoopクラスを作成し39行目でクライアントを
Setsunaのアドレスとポートと先程のEventLoopを指定して作成。
そして次の40行目で、リモートメソッドを設定。
42行目〜45行目でSetsunaに渡したいデータを作成して、48行目でリモートメソッドを呼び出しています。


見てわかる通りMessagePack-RPCのおかげで非常に簡潔にかけます。
他言語のバインディングも本家サイトに出ているので、基本同じ要領だと思います。
これで他のサーバからデータが流せるので、イベント処理を集約出来ると思います。
ただ1台に集約するとそのサーバがこけたらどうするなどは今後の課題です。




この他の追加機能ですが、主にデバッグ系の機能と取り込みデータを
データベースにマッピングする辺りを強化しました。
デバッグ&Setsuna自身のログ機能
"-debug"オプション
前のバージョンではSetsunaにどのようなデータがAdapterで送り込まれ、どのようなTriggerが動き、どのようなSQL
検証用に流れどのようなUserEventが実行されたか分かりませんでした。さすがにSetsunaを利用して開発するうえで
これでは厳しいので、デバッグログを出せるようにしました。
"-debug"というオプション設定で出力されます。実際に指定すると以下のようになります。

ping -i 1 -c 1000 192.168.1.1 | grep --line-buffered ttl | java -jar setsuna.jar -debug on
指定してるは最後の"-debug on"という部分です。これでコンソールに標準入力から送り込まれたデータがデバッグされます。
以下のような感じになります。

Debug : Sat Mar 24 18:02:50 JST 2012 - - Pipe Input=[64 bytes from 192.168.1.1: icmp_seq=1 ttl=64 time=0.237 ms]
{"COLUMN0":"64","COLUMN1":"bytes","COLUMN2":"from","COLUMN3":"192.168.1.1:","COLUMN4":"icmp_seq=1","COLUMN5":"ttl=64","COLUMN6":"time=0.237","COLUMN7":"ms"}
Debug : Sat Mar 24 18:02:50 JST 2012 - - Pipe Input=[64 bytes from 192.168.1.1: icmp_seq=2 ttl=64 time=0.297 ms]
{"COLUMN0":"64","COLUMN1":"bytes","COLUMN2":"from","COLUMN3":"192.168.1.1:","COLUMN4":"icmp_seq=2","COLUMN5":"ttl=64","COLUMN6":"time=0.297","COLUMN7":"ms"}
Debug : Sat Mar 24 18:02:51 JST 2012 - - Pipe Input=[64 bytes from 192.168.1.1: icmp_seq=3 ttl=64 time=0.245 ms]
デバッグで出力される文字列は先頭が"Debug"という文字始まる行が実際に送り込まれたデータ文字列です。
その他の"-trigger"、"-query"、"-event"、"-eventquery"で指定した内容や実行した内容も同じように
"Debug"が先頭について出力されます。
ただ、見ても分かるようにデバッグ以外の出力も混じっているので非常に見辛いです。
これを改善するのが、以下の指定です。

ping -i 1 -c 1000 192.168.1.1 | grep --line-buffered ttl | java -jar setsuna.jar -debug only
"-debug only"となっているのがその指定です。さきほどは"on"だった部分を"only"とすることで、以下のようになります。

Debug : Sat Mar 24 18:29:20 JST 2012 - - Pipe Input=[64 bytes from 192.168.1.1: icmp_seq=1 ttl=64 time=0.263 ms]
Debug : Sat Mar 24 18:29:20 JST 2012 - - Pipe Input=[64 bytes from 192.168.1.1: icmp_seq=2 ttl=64 time=0.243 ms]
Debug : Sat Mar 24 18:29:21 JST 2012 - - Pipe Input=[64 bytes from 192.168.1.1: icmp_seq=3 ttl=64 time=0.229 ms]
デバッグのみの出力になっているのがわかってもらえると思います。


"-errorlog"オプション
このオプションはSetsunaが吐き出すエラー内容をファイルに出力するためのものです。
従来はコンソールにしか出なかったので、それをファイルに出力出来るようにしました。
以下のように指定します。

ping -i 1 -c 1000 192.168.1.1 | grep --line-buffered ttl | java -jar setsuna.jar -errorlog /var/log/setsuna_error
これで"/var/log/setsuna_error"にエラー出力が出るようになります。


インプットデータの読み込み開始レコード指定&カラム定義違いのデータを送り込んだ際に無視する指定
"-offsetオプション"
Setsunaにインプットされるデータを内部DBに取り込みを開始する位置をレコード数で設定できます。
設定は以下のようになります。このオプションは必ず"-column"オプションを指定する必要があります。

ping -i 1 -c 1000 192.168.1.1 | java -jar setsuna.jar -column "col1 col2 col3 col4 col5 col6 col7 col8" -offset 2
"-offset 2"の部分の指定です。これで、インプットされるデータの2レコード目から取り込まれます。
1行目はとりこまれず無視されます。
利用される想定は以下のようにpingのデータをSetsunaに送り込みたい場合

[setsuna]$ ping -i 1 -c 1000 192.168.1.1
PING 192.168.1.1 (192.168.1.1) 56(84) bytes of data.
64 bytes from 192.168.1.1: icmp_seq=1 ttl=64 time=0.277 ms
64 bytes from 192.168.1.1: icmp_seq=2 ttl=64 time=0.278 ms
1行目のデータのフォーマットと2行目が異なるためそのまま取り込むとExceptionが発行されます。
これは"-column"指定で定義したカラム定義が8カラムあるのに対し1行目の
"PING 192.168.1.1 (192.168.1.1) 56(84) bytes of data."をデフォルトの半角スペースで分解すると
以下のように7カラムしかないためです。

col1="PING"
col2="192.168.1.1"
col3="(192.168.1.1)"
col4="56(84)"
col5="bytes"
col6="of"
col7="data."
col8=データがない!!
このような場合に従来はgrepなどでSetsunaにデータを送り込む前に、不要なデータは省いてもらってたんですが、
このオプションを使うことで取り込み開始位置を調整することで、入力データのヘッダーをとばしたり出来ます。


"-skiperror"オプション
"-offset"は取り込み開始位置を調整できますが、取り込みを開始した出してから定義違いのデータがきた場合には
やはりExceptionが発生します。そこで"-skiperror"オプションを指定すると、途中に定義違いのデータが来た
場合にそのデータを取り込まずに無視します。以下のように指定します。

ping -i 1 -c 1000 192.168.1.1 | java -jar setsuna.jar -column "col1 col2 col3 col4 col5 col6 col7 col8" -skiperror true
最後の"-skiperror true"です。この指定は-columnの指定は必須ではありません。"-column"を指定しない場合は、
最初のレコードから自動的にカラム定義が作成されます。




以上が今回の追加機能です。これ以外にはいくつかの効率化とバグFixをしています。
追加機能も含めて全ての機能は"-help"オプションで確認してもらえます。
まだまだ、発展途上のプロジェクトなので、気軽にご意見いただけるとありがたいです!!
さあ、次は分散化でも考え始めるかな〜。

ComplexEventProcessingエンジン Setsuna をリリースしました!!

やっとリリースできました!

Sourceforge.jpダウンロードページ


名前はSetsuna(セツナ)としました。
時間の単位を表す言葉である刹那からきています。
刹那の短さでデータを処理出来ればいいなということでこの名前にしました。




このSetsunaですが、CEPといわれるカテゴリに属するものです。
ではCEPとは??なんですが、これは語源そのままです。


Complex Event Processing


複数のイベントつまりデータを処理するエンジンです。
イメージとしてはデータの流れに直接処理を行えるイメージです。




あまり文章ばかりでもつまらないので、実際Setsunaを使いながら説明していきたいと思います。
では題材はこんなのにしたいと思います。
Apacheが重い!! ひょっとして攻撃されてる?!」


すいません、わけわからんタイトルですね。。
やりたいことは、Webサーバの負荷が一定値を超えた場合に、Apacheアクセスログの直近1万件以内からの
IPアドレス別でアクセス回数を合計しその合計値が、規定回数を超えていた場合に、アラート内容をSystemログに
出力するというものです。


ではこの処理をどのようにするか順を追って見ていきたいと思います。
※環境はCentOS5.5(64bit)前提として進めます。


Step1.サーバの負荷を監視する
Step1:まずはサーバの負荷を監視しないとダメです。
ここではtopコマンドのロードアベレージを負荷の指標に利用したいと思います。


$top -d 1
結果は以下のような感じです。

top - 16:47:19 up 8 min, 2 users, load average: 0.06, 0.11, 0.08
Tasks: 78 total, 1 running, 77 sleeping, 0 stopped, 0 zombie
Cpu(s): 2.5%us, 3.0%sy, 0.0%ni, 93.9%id, 0.2%wa, 0.1%hi, 0.3%si, 0.0%st
Mem: 793284k total, 180116k used, 613168k free, 12272k buffers
Swap: 1048568k total, 0k used, 1048568k free, 130704k cached

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
1 root 15 0 2072 636 544 S 0.0 0.1 0:00.72 init
2 root RT -5 0 0 0 S 0.0 0.0 0:00.00 migration/0
               ・
               ・
               ・

では早速これをSetsunaに流し込みたいと思います。
Setsunaは標準でパイプで繋ぐことでデータを流し込むことが可能です。
Setsunaをダウンダウンロードしてもらって適当な場所に解凍してもらうと
中にsetsuna.jarというファイルがあると思います。それが本体です。


実行可能jarなので、javaが動く環境であればこのファイル以外は何も必要ありません。

$top -b -d 1 | java -jar setsuna.jar
上記を実行すると以下のようなエラーが出ると思います。

setsuna.core.util.SetsunaException: The read data differs from definition information. Adapter name is PIPE
at setsuna.core.adapter.SetsunaCoreAdapterEngine.doEvent(SetsunaCoreAdapterEngine.java:67)
at setsuna.core.AbstractCoreEngine.run(AbstractCoreEngine.java:54)
これは、Setsunaの自動テーブルマッピングの機能がこけています。つまりSetsunaまでは
topの結果は届いています。ではどういったエラーか??
 それは、1行目のデータと2行目のデータの形式が違いますよということを言っています。
Setsunaはパイプライン入力で渡されたデータを内蔵するH2Databseに自動的に格納します。
そのため、格納するためにデータをレコードとして認識しかつ、カラムに分解しようとします。
Setsunaはデフォルトではデータ(レコード)の区切りを改行とし、カラムレベルへの分解に
半角スペースを利用します。(2つ以上続くスペースは自動的に1つにトリムされます)
topの結果をもう一度みてみると、


1: top - 16:47:19 up 8 min, 2 users, load average: 0.06, 0.11, 0.08
2: Tasks: 78 total, 1 running, 77 sleeping, 0 stopped, 0 zombie
行番号1と2では全くフォーマットが異なります。フォーマットとはスペースで分解した際の分解できた数です。
そこでロードアベレージを含む1行目だけをSetsunaに送り込んでみましょう。これにはgrepを利用します。
そしてこのデータの流れに名前を付けたいと思います。-streamオプションを利用すると名前が付けれます。

$top -b -d 1 | grep --line-buffered "top -" | java -jar setsuna.jar -stream top


Exceptionが出なくなり変わりに以下のような結果になっていないでしょうか?

{"COLUMN0":"top","COLUMN1":"-","COLUMN2":"17:01:58","COLUMN3":"up","COLUMN4":"23","COLUMN5":"min,","COLUMN6":"2","COLUMN7":"users,","COLUMN8":"load","COLUMN9":"average:","COLUMN10":"0.20,","COLUMN11":"0.12,","COLUMN12":"0.09"}
{"COLUMN0":"top","COLUMN1":"-","COLUMN2":"17:01:59","COLUMN3":"up","COLUMN4":"23","COLUMN5":"min,","COLUMN6":"2","COLUMN7":"users,","COLUMN8":"load","COLUMN9":"average:","COLUMN10":"0.20,","COLUMN11":"0.12,","COLUMN12":"0.09"}
{"COLUMN0":"top","COLUMN1":"-","COLUMN2":"17:02:00","COLUMN3":"up","COLUMN4":"23","COLUMN5":"min,","COLUMN6":"2","COLUMN7":"users,","COLUMN8":"load","COLUMN9":"average:","COLUMN10":"0.20,","COLUMN11":"0.12,","COLUMN12":"0.09"}
{"COLUMN0":"top","COLUMN1":"-","COLUMN2":"17:02:01","COLUMN3":"up","COLUMN4":"23","COLUMN5":"min,","COLUMN6":"2","COLUMN7":"users,","COLUMN8":"load","COLUMN9":"average:","COLUMN10":"0.20,","COLUMN11":"0.12,","COLUMN12":"0.09"}

これはSetsunaが取り込んだデータをJSONフォーマットで
カラム情報とセットで出力しています。
Setsunaはパイプラインでデータを与えるだけの場合このような動きになります。
ではそれぞれ見ていきましょう。


topのロードアベレージの行

top - 16:47:19 up 8 min, 2 users, load average: 0.06, 0.11, 0.08
Setsunaの結果

{
"COLUMN0":"top",
"COLUMN1":"-",
"COLUMN2":"17:01:58",
"COLUMN3":"up",
"COLUMN4":"23",
"COLUMN5":"min,",
"COLUMN6":"2",
"COLUMN7":"users,",
"COLUMN8":"load",
"COLUMN9":"average:",
"COLUMN10":"0.20,",
"COLUMN11":"0.12,",
"COLUMN12":"0.09"
}
マッピング出来ていますね。
目当てのロードアベレージの値は"COLUMN10"の値です。
これでSetsunaへのデータの流し込みは完了です。
この流し込む処理の部分はAdapterと呼んでいます。




Step2
では次に、もう一つのデータであるApacheアクセスログを流し込みたいと思います。
このデータはアクセスIPの抽出に利用したいと思います。
先ほどと同じ手順でSetsunaにデータを流し込みます。
このデータの流れにはlogという名前を付けます。

$tail -f /etc/httpd/logs/access_log | java -jar setsuna.jar -sep " - -" -stream log
今回は先ほどと違い、-sepという項目がついています。
これは、レコードデータをカラムに分解する際のセパレータを指定しています。
僕の環境のアクセスログは以下のようなフォーマットでした。

192.168.1.100 - - [15/Mar/2012:17:29:52 +0900] "GET /index.php HTTP/1.1" 200 130 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.79 Safari/535.11"
ここからIPだけを取り出したいので、データを分解するセパレータに" - -"という文字列を
利用しました。これで先頭のIP部分の直後で分解できます。
そして実際にSetsunaに流しこんだ結果が下です。


{"COLUMN0":"192.168.1.100","COLUMN1":" [15/Mar/2012:17:46:45 +0900] "GET /index.php HTTP/1.1" 200 130 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.79 Safari/535.11""}

"COLUMN0"にIP情報がマッピングされているのがわかります。




Step3
ここまではデータをSetsunaに流し込むだけでした、ここからは実際にデータを処理して負荷を検知してみます。
負荷の検知には先ほどのtopのデータの流れのロードアベレージの値を使いたいと思います。
そして、負荷を判定するための条件を設定したいと思います。それには-triggerオプションを利用します。
そして、より複雑な条件であるアクセス頻度の高いIPを割り出すために、-queryオプションを使い、
SQLにて、アクセスログのデータ内から基底回数を超えるアクセスを行っているIPがあった場合に、
Systemログにアラーを書きだそうと思います。


まず、先ほどのデータをクリアするために、2つのSetsunaプロセスを止めてください。Crtl+Cなどで強制終了でOKです。
そしてまずアクセスログを流し込みます。

$tail -f /etc/httpd/logs/access_log | java -jar setsuna.jar -sep " - -" -stream log
そして、topの結果を流し込むんですが、その前に、-triggerを作りたいと思います。
triggerではロードアベレージが規定値を超えていないかをチェックさせます。

  • trigger "COLUMN10 > 1"

ここで利用するのは先ほどの流しことんだデータを参考にCOLUMN10を指定しています。
これは直近のロードアベレージの値です。そして、'>'という条件記号を使って、
'1'とい数値以上を指定しています。
ここで注目してもらいたいのは、上の分解したtopのCOLUMN10の値です。

"COLUMN10":"0.20,"
COLUMN10の値は数値の後ろに','が入っています。この文字である"0.20,"を数値変換すると
当然最後の','のせいで変換エラーになってしまいます。
しかし、Setsunaの持つ'>'や'<'条件記号はこういった値を可能な限り数値化します。
つまり最後の','は外して数値に変換してから比べます。
これは、対象データの中から数値を最初に見つけたところから数値ではなくなるまでの間の
値を数値として認識するように出来ているからです。なので、"Count=10"みたいな値で
あっても10と認識することができます。
このほかに条件記号は完全一致を指定する'='や、部分一致を指定する'like'が存在します。
ではこれでtriggerは完成です。


次にアクセス頻度の高いIPを見つける-queryです。これは完全にSQLで記述します。
既に実行中のアクセスログのストリームを利用したいと思います。
アクセス頻度の高いIPを抽出するクエリだけ記述します。

select
count(*)
from
(select count(column0) as gcnt,
column0
from
log \ // 20120321:修正
group by column0)
where (gcnt*3) < (select
avg(cnt)
from (select
count(column0) as cnt,
column0
from
log
where
C_TIME > DATEADD(SECOND, -60, current_timestamp)
group by column0)
);
これで直近の60秒のログからアクセスしてきたIPアクセス回数の平均を
大きく超える(3倍)IPが存在しているかをカウントいてます。
ここで注目していただもらいたいのは、下から3行目のC_TIMEという項目です。
これはSetsunaがデータをテーブルに格納する際に自動的に作成しているカラムです。
このカラムには、データを投入した日時がTimestamp型で登録されます。
なので、このように現在時刻から60秒前のような指定が可能になっています。
"C_TIME > DATEADD(SECOND, -60, current_timestamp)"の部分。
このほかにlong型のシーケンス値をもっています。これは0始まりのシーケンス値
なので、単純にデータをソートする際はそちらが便利です。
この条件指定の部分をQueryと呼んでいます。




さて-tirggerと-queryでデータをハンドリング出来れば後は、UserEventと言う最後の部分です。
ここは、ユーザが自由に定義した処理を指定する部分です。
今回の例ではloggerと言われるsyslogにログを出力する標準のライブラリを使いたいと思います。
UserEventを利用する場合は-eventと記述し、それ以降に実行したコマンドを記述します。
以下のようになります。

-event "logger Warning!! System busy"
そして今までの全てを繋ぐと以下のようになります。



$top -b -d 1 | grep --line-buffered "top -" | java -jar setsuna.jar -stream top\
-trigger "COLUMN10 > 1" \
-query "select \
* \
from \
(select count(column0) as gcnt,\
column0 \
from \
log \ // 20120321:修正
group by column0) \
where (gcnt*3) < (select \
avg(cnt) \
from (select \
count(column0) as cnt,\
column0\
from \
log\
where\
C_TIME > DATEADD(SECOND, -60, current_timestamp)\
group by column0)"\
-event "logger Warning!! System busy"
これでtopのロードアベレージ値が1以上の場合に、直近60秒間のアクセスの間に異常に
アクセス数が多いIPアドレスを見つけてsyslogにワーニングを出力できます。


※2012/03/20 追記

  • eventで指定したコマンドには引数としてこのUserEventを実行した引き金となる

Adapterからのデータが1レコードが渡されるので第一引数で取得して使えます。
こんな感じです。


-event "logger "
実際には下のようになります。

logger "{"COLUMN0":"top","COLUMN1":"-","COLUMN2":"17:01:58","COLUMN3":"up","COLUMN4":"23","COLUMN5":"min,","COLUMN6":"2","COLUMN7":"users,","COLUMN8":"load","COLUMN9":"average:","COLUMN10":"0.20,","COLUMN11":"0.12,","COLUMN12":"0.09"}"

1レコードではなく、SQLなどでデータを取得して、それを利用したい場合は

  • eventqueryを使います。

これはUserEventでSQLを実行するもので書き方は-queryと全く同じです。
ただ結果が標準出力に出力されるので、Setsunaそのものをさらに別のコマンドにパイプで繋いで、-eventqueryの結果を利用出来ます。
バージョン0.0.1では-eventか、-eventqueryのどちらかしか指定出来ませんが
次のバージョンで、このクエリの結果を-eventで指定したコマンドにも渡せる様にする予定です。




このように、複数のデータをSetsunaに流しておけばそれらを複合してデータの変化を掴むことが
出来ます。ユーザイベントも指定は自由なので、メールを出したり、今回の例で言うとiptablesなどで
IPを遮断することもできます。
ただし、データベースではないためデータを蓄積することはできません。SQLでデータが引けるため
データは蓄積されるように見えますが、自動的に古いデータから消えていきます。(デフォルト10分前)
これは大量のインプットデータに高速に処理を行うために内臓DBがインメモリであるためです。
あくまで処理を行うエンジンというところですね。




現在はパイプのインプットしかないので今後はサーバ型の開発を行い、外部からNW越しにデータを
流せるようにする予定です。ログコレクタも最近は熱いので、それらともつなげたいです(fluentdとか良いですね!!)。

Complex Event Processing Engineをつくりはじめた

久しぶりの投稿だ。。




今まではokuyamaメインでOSS活動をしていたけども、新たにプロダクトを始めました。
新しいプロダクトはCEPと言われる種類のソフトウェアです。
そもそも、CEPってなんだ??
Complex Event Processingの略になります。
訳すと、複合イベント処理です。
でっ、何が出来るかなんですが


リアルタイム処理が得意なエンジンです。
これだけではピンとこないので、少し例を考えてみます。


例1)
あるサーバの監視がしたいとします。
監視には馴染みのsarや、topを使うと便利ですよね。
例えばtopを-bなどの標準出力に結果を書き出すモードで動かして、
これをパイプでgrepにつないでロードアベレージ
部分を取り出して、1を超えてたらメールを投げたい。こんな場合どう作りましょう?
ロードアベレージを取り出すまではこのままで良さそうです。
じゃあこれをファイルにリダイレクトして、それを監視しするスクリプトを書きましょうか。
結構面倒です。それに発見までにタイムラグがありそうです。


これを整理すると、
ロードアベレージというデータの流れに
条件検索を行って、
結果次第で処理をしています。


では、次は例2)
例1に別の要素を加えたらどうでしょう?
ロードアベレージが規定値を越えただけでメールが着たら夜寝れないので、
もう少し監視を賢くしたいです。
たとえばロードアベレージが規定値を超えてる場合にすぐメールを送るのではなく、
サイトの応答速度が3秒を越えてる場合にだけおくりたい。
または、I/O-Waitが30% を越えてる場合にメールを送りたい。
考え出すとキリがありません。


しかもロードアベレージがヤバくなってからサイトの応答速度を計測してたのでは遅そうです。
この処理は先ほどのロードアベレージだけではなくサイトの応答時間や、I/O-Waitなど、
ロードアベレージがこうなって、応答時間がこうなった場合という複数の条件検索をおこなって
満たした場合に処理をおこなっています。




こういった複数の条件を満たした場合を検知するには結構な仕組みが必要だと思います
しかも条件をみたしたと同時に処理をすぐに行わなければならない。




そこでCEPです。
CEPはリアルタイムに複数のデータに横断的に条件マッチングを行ってそれに
その結果次第でイベントを発生させることに特化しています。
okuyamaのようにデータをためたりはできません。でも、一定期間のデータを
蓄えることは出来ます。なので、その一定期間に対して処理をすることも
得意です。さっきの例でいうと、今のロードアベレージではなく直近5分のロード
アベレージの平均がこうだったらとか、サイトの応答が3秒以上かかっていることが
5分以内に100回以上あったらなどを見つけることが出来ます。
そしてユーザが作成したイベントを勝手に呼び出してくれます。
そしてなによりこの一連の処理を相当高速におこないます。
なのでアクセスログのような凄い勢いでつくりだされるデータを
リアルタイムに処理することにも向いています。


このあたりのお話と今作っているエンジンのお話を今度
神戸である勉強会でお話する予定です。
http://atnd.org/events/25695




まだリリースはしてませんが、一応以下に実装中のコードがあります。
http://sourceforge.jp/projects/setsuna/

Java7速攻テスト

というわけで、Java7がリリースされました。
色々と面白そうな機能がありますね。


僕はSCTPなどが結構注目なんですが、取り合えずJava7上で現行のokuyamaを動かして、
どれぐら単純比較で性能が違うのか検証してみました。


okuyama側は特にJava7に合わせて変更などはしていないので、リリースVersion-0.8.8のままです。
ちなみに、okuyamaはThread、java.lang系、java.util系、java.io系、java.util.concurrent系などを
いたるところで使っています。
コンパイルのみJdk6とJdk7でそれぞれ実行前に行いました。
okuyamaのモードは永続化用のWALファイルは常に吐き出して、それ後データをメモリに持つモードと、
ファイルに持つモードで計測しました。ファイルに持つ場合はファイルへのI/Oはランダムリードです。
メモリ展開はSeriarizeMapもためしました。この場合はObjectのシリアライズ、デシリアライズ且つ、zip圧縮が行われます。
以下は実行したPCのスペックです。1台で行いました。

                                                              • -

CPU:Intel Corei5 650 3.20GHz
Memory:4GB
HDD:SATA 7200rpm 500GB×1
OS:CentOS5.5 (64bit)

                                                              • -

okuyamaのGCオプションは以下です。
"-Xmx1024m -Xms1024m -server -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseParNewGC"


テストは同時に8クライアントをスレッドで生成し、1分アクセスをSet、Get共にし続けます。
実際にアクセスを行うクラスは以下です。
http://sourceforge.jp/projects/okuyama/svn/view/trunk/test/ResponseTestThread.java?view=markup&revision=637&root=okuyama
このクラスを下記のクラスで8個インスタンス化してThreadとして実行します。
http://sourceforge.jp/projects/okuyama/svn/view/trunk/test/ResponseTest.java?view=markup&revision=442&root=okuyama




それではそれぞれの結果です。

                                                                                                                                • -

モード:WALファイル+Key&Value共にメモリ

                                                                                                                                • -

●Set
■Java6
60秒 = 1045091
17418 QPS
■Java7
60秒 = 1201714
20028 QPS


●Get
■Java6
60秒 = 1359632
22660 QPS
■Java7
60秒 = 1553264
25887 QPS

                                                                                                                                • -

モード:WALファイル+Key&Value共にメモリ+SerializeMap

                                                                                                                                • -

●Set
■Java6
60秒 = 383142
6385 QPS
■Java7
60秒 = 418954
6982 QPS


●Get
■Java6
60秒 = 708405
11806 QPS
■Java7
60秒 = 799126
13318 QPS



                                                                                                                                • -

モード:WALファイル+Key=メモリ&Value=ファイル

                                                                                                                                • -

●Set
■Java6
60秒 = 837324
13955 QPS
■Java7
60秒 = 939385
15656 QPS


●Get
■Java6
60秒 = 1273324
21222 QPS
■Java7
60秒 = 1423650
23737 QPS




こんな結果になりました。
総じて性能が向上しているのが分かります。
だいたい10%〜15%といったところでしょか。
まだどこがどうなってこういう結果が出ているのか
調査できていないので、そのへんもやりたいですね。
とりあえず速攻テストでした。

「第1回分散処理ワークショップ in Tokyo」を開催しました

7/8に「第1回分散処理ワークショップ」を@doryokujinさんと共同で開催しました。
ATNDリンク
参加いただきました皆様ありがとうございましたm(_  _)m


当日の資料やその他の情報を可能な限りまとめてみます。


@understeerさんのよるToggetter
    http://togetter.com/li/159266


当日のUStream(2元中継でおこないました)
その1
    http://www.ustream.tv/channel/dist-study-1#utm_campaign=togetter.com&utm_source=8743151&utm_medium=social
その2
    http://www.ustream.tv/channel/dist-study-2#utm_campaign=unknown&utm_source=8743162&utm_medium=social


当日のスピーカの方々の資料とUStream録画(当日発表順)(※現時点で私が認識している範囲です)

  ・okuyama - @okuyamaoo
    資料:http://www.slideshare.net/okuyamaoo/20110708-dist-studyokuyama
    Ust:http://www.ustream.tv/recorded/15864685#dist_study
  ・MongoDB - @doryokujinさん
    資料:http://www.slideshare.net/doryokujin/mongodb-replicationshardingmapreduce
    Ust:http://www.ustream.tv/recorded/15864876
  ・Hibari - @tatsuya6502さん
    資料:http://www.slideshare.net/tatsuya6502/hibari-nosql-at-diststudy-20110708
    Ust:http://www.ustream.tv/recorded/15865171
  ・Cassandra - @yukimさん
    資料:http://m7a.me/s/diststudy1/#1
    Ust:http://www.ustream.tv/recorded/15865706
  ・HBase - @ueshinさん
    資料:http://www.slideshare.net/ueshin/20110708-h-base1-in-tokyo
    Ust:http://www.ustream.tv/recorded/15866025
  ・MesgpackとFluentとKumofsについて - @frsyuki
    Ust:http://www.ustream.tv/recorded/15866352




当日は@ueshinさんと@frsyukiさんのお二人から飛び込みで発表頂きました。ありがとうございました。
また、会場をご提供頂きそして、運営を強力にバックアップいただきました、ニフティ
UStream、togetterでの情報まとめや、受付、質問タイムの運営などを強力にバックアップいただいた、
@understeerさんをはじめてする皆様、本当にありがとうございました。
そして@doryokujinさん頼りっぱなしになってすいませんでした。


今回はどちらかというとNOSQLのイントロダクションとなりましたが、一部で深い話も出掛かっていましたので
次回はNOSQLを使ったブッチャけ開発トークや、ブッチャけ運用トークなどが出来れば面白いかなとも思っています。
僕自身okuyamaを実装してはいますが、それを使ったプロダクトの開発や、運用ではまだまだ迷うことも多いので
NOSQLの実システム適応や運用はまだまだ試行錯誤のフェーズなのかなとも考えています。
そのあたりをパート別で座談会出来ても面白いかなと思っています。
このへんは@doryokujinさんと反省会をして考える予定です。
関東ではこのままの勢いで近いうちに第2回が出来れば良いかなーと考えています。
後は、僕の地元でもある関西でも是非したいなーと思っています。
関東、関西両方に言えることなんですが、「こんなこと議論したいよ」などあれば@okuyamaoo
メンション頂ければと思います。

okuyama-0.8.8リリースまとめ

okuyamaのバージョン0.8.8をリリースしたので、
今回の変更点をまとめておこうと思います。

今回の主要な追加はSerializeMapなんですが、機能は前々回と、
前回の日記にまとめたとおりです。
あの時点では(デ)シリアライザはベタ実装だったんですが、
せっかくなんでインターフェース規定して、設定ファイルから
インジェクション出来るようにしました。
取り合えずはObjectOutputStream関係を使った実装を同梱してます。
後ほど、前の記事で調査して凄く相性が良かったMessagePackを使った実装もリリースします。


インターフェースクラスは以下です。
http://sourceforge.jp/projects/okuyama/svn/view/trunk/src/okuyama/imdst/util/serializemap/ISerializer.java?view=markup&revision=706&root=okuyama
シリアライズのインターフェース下の通りで、シリアライズ対象のMapと、そのMapに格納されている
KeyとValueそれぞれのClassが渡ってきますので参考程度に利用できます。
返却値はシリアライズ処理後のbyte配列です。
public byte serialize(Map serializeTarget, Class mapKeyClazz, Class mapValueClazz);


シリアライズのインターフェース下の通りで、先ほどのシリアライズで処理したbyte配列が引数で渡ってきます。
それをMap復元して返します。
public Map deSerialize(byte deserializeTarget);



取り合えず今のokuyamaで実装しているソースは以下です。
http://sourceforge.jp/projects/okuyama/svn/view/trunk/src/okuyama/imdst/util/serializemap/ObjectStreamSerializer.java?view=markup&revision=706&root=okuyama
この実装はシリアライズはObjectOutputStreamを利用してシリアライズしたものをZipで圧縮して、
シリアライズはそれをZip復元後、ObjectInputStreamでMap化しています。


設定方法は、DataNode.propertiesの"DataSaveMapType"と、"SerializerClassName"の設定を使います。
DataSaveMapType=serialize <=これでSerializeMapを使う宣言になります
SerializerClassName=okuyama.imdst.util.serializemap.ObjectStreamSerializer <=実装したシリアライズクラスを指定します。




残りの追加機能はしたの通りです。
1.OkuyamaClientからもデータの有効期限を設定可能にしました。
    今まででも、memcachedプロトコルであれば、有効期限は設定可能だったのですがOkuyamaClientでも
    使えるようにしました。利用方法はsetValueとsetNewValueの最終引数にInteger型で有効な秒数を
    渡すだけです。Integerの最大桁数まで設定可能です。


2.データ取得時に有効期限を自動的に延長するgetValueAndUpdateExpireTimeというメソッドをOkuyamaClientに追加しました。
    通常のgetValueと同様の動きをするのですが、このメソッドは実行時点でそのデータの有効期限内で
    あれば、最初に設定した有効期限分有効な期間が延長されます。
    利用想定としてはWeb系のセッション情報などで、セッション情報にアクセスすればセッションを
    破棄する時間を延ばすような場合です。


3.複数Tagを指定して紐付くKeyとValueを取得するgetMultiTagValuesメソッドをOkuyamaClientに追加しました。
    今までTagへの取得系処理はKey、KeyとValueのセットどちらの場合でも指定できるTagは1つだったんですが、
    それを、複数のTagを指定出来るようにしました。
    取得される値はKeyとValueがセットされたMapです。
    あと、複数指定なので挙動としてANDとORを指定できるようにしました。
    ANDなら指定したTag全てに紐付くKey-Valueセットだけ、ORならどれかのTagに紐付いていれば取得しれます。
    ただ、まだ実装があまりイケてないので、OkuyamaClientのインターフェースは変えませんが、内部実装は後々
    変えるかもしれません。


4.データ一括削除機能をUtilClientに追加しました。
    okuyama.imdst.client.UtilClientにokuyamaの全てのデータもしくは、指定したパーティションのデータを
    一括削除する機能を追加しました。truncateのような処理です。
    利用方法はしたの通りです
    java -classpath ./:./classes okuyama.imdst.client.UtilClient truncatedata 192.168.1.1 8888 all
    1つ目の引数は、'truncatedata'固定です。
    2つ目の引数は、MasterNodeの中で、MainMasterNodeという管理を行っているMasterNodeの接続IP情報です。
    3つ目の引数は、MasterNodeの起動ポート番号です。
    4つ目の引数は削除する領域指定です。'all'なら全てのデータを削除、パーティション用のPrefix値ならその領域だけ削除されます。
    削除はデータ量に依存して処理時間がかかります。


5.MasterNodeの設定情報を取得する機能をUtilClientに追加しました。
    MasterNodeの持っている設定情報は起動後、DataNodeの追加や、MasterNodeの追加などで変化し続けます。
    そこで現時点でのMasterNodeが利用している設定情報を取得する機能を追加しました。
    MasterNodeの挙動がおかしい場合などこの機能でMasterNodeを起動させたまま状態診断ができます。
    利用方法はしたの通りです。
    java -classpath ./:./classes okuyama.imdst.client.UtilClient masterconfig 192.168.1.1 8888
    1つ目の引数は、'masterconfig'固定です。
    2つ目の引数は、設定情報を確認したいMasterNodeの接続IP情報です。
    3つ目の引数は、MasterNodeの起動ポート番号です。
    出力される情報はしたの通りで、
    "998,true,MainMasterNode=[true]- MyInfo=[127.0.0.1:8888]- MainMasterNodeInfo=[127.0.0.1:8888]- AllMasterNodeInfo=[127.0.0.1:8888 127.0.0.1:8889 127.0.0.1:11211]-
    CheckMasterNodeTargetInfo=- Algorithm [0]:mod [1]:consistenthash=[1]- AllDataNodeInfo=[{third=[localhost:7553 localhost:7554] sub=[localhost:6553 localhost:6554]
    main=[localhost:5553 localhost:5554]}"
    それぞれの意味は
    "MainMasterNode=[true]":このMasterNodeがMainMasterNodeとして稼動しているかの情報
    "MyInfo=[127.0.0.1:8888]]":このMasterNodeのユニーク名
    "MainMasterNodeInfo=[127.0.0.1:8888]":現在のMainMasterNodeの情報
    "AllMasterNodeInfo=[127.0.0.1:8888 127.0.0.1:8889 127.0.0.1:11211]":現在のokuyamaクラスタの全MasterNode情報
    "CheckMasterNodeTargetInfo=":このMasterNodeが生存確認を行う必要があるMasterNodeの情報

    "Algorithm [0]:mod [1]:consistenthash=[1]":現在の振り分けアルゴリズム(最後の[1]がその値)
    "main=[localhost:5553 localhost:5554]":全てのDataNodeの情報
    "sub=[localhost:6553 localhost:6554]":全てのSubDataNodeの情報
    "third=[localhost:7553 localhost:7554]":全てのThirdDataNodeの情報


6.バグFixと性能向上をしました。
    いくつかバグをFixしました。まともに動かないバグではなく、最適化系です。




リリース内容は以上です。
今回はSerializeMapが目玉なのですが、今okuyamaを利用いただいていれば、是非利用を検討してみてください。
KeyもValueもファイルを使っている場合は恩恵はありませんが、Key-Value両方もしくはKeyだけでもメモリに
載っているいるのであれば、レスポンスとのトレードオフにはなりますが飛躍的に格納できる件数が向上します。
検証を行ったところ以下の設定でKey:40byte、Value:1000byte程度のデータを1DataNodeで1億件以上格納できました。

OS:CentOS(64bit)
メモリ:4GB
CPU:Core  i5
HDD:500GB×2(ソフトウェアRAID0)



KeyManagerJob1.memoryMode=false
KeyManagerJob1.dataMemory=false
KeyManagerJob1.keyMemory=true
KeyManagerJob1.keySize=20000000
KeyManagerJob1.memoryLimitSize=98
KeyManagerJob1.virtualStoreDirs=./keymapfile/virtualdata1/
KeyManagerJob1.keyStoreDirs=./keymapfile/data1/,./keymapfile/data2/



  • Xmx:2880m
  • Xms:2880m

okuyamaのSerializeMapを検証してみた(Get編)(番外編MessagePack for Javaも試してみた)

前回の記事でokuyamaに追加予定の機能である、
SerializeMapというメモリの効率化を考えて実装してみたMap実装の
Set性能を図ってみましたが、Mapなので当然Getがいるので今回はその性能を計測してみます。


まず、その前にこのSerializeMapの構造を下手ですが絵にしてみました。



上記のような構造です。内部でConcurrentHashMapを1つだけ保持していて、
このMapのValueにさらにHashMapをもっています。このHashMapに実際にKeyとValue
セットが格納されます。そしてこのHashMapはSerializeした後に
圧縮してbyte型の配列でもっているので、実際にMapが格納されているわけではありません。
Set、Getなどの処理が走った場合は、処理依頼のKey値をハッシュ関数を使って数値にして、
それをConcurrentHashMapの要素数で割って出た余りの場所のHashMapのbyte配列をDserializeして
処理を行います。ConcurrentHashMapの要素数は初期化時に固定化するので、固定数を超えることはなく、
メモリ中のMapオブジェクトはConcurrentHashMap1つになります。


利用したPCは前回と全く同じ、JVMオプションも同じで、メモリ割り当てだけ1024MBにしました。


テスト用のPGMは以下です。
http://sourceforge.jp/projects/okuyama/svn/view/trunk/test/SerializeMapGetTest.java?view=markup&revision=684&root=okuyama
このPGMは事前に指定した件数だけ、要素を登録してそれに対して並列に指定スレッド数で、Get処理をランダムに行います。
今回のテストは、事前に300万件登録したところに、8スレッド並列Getを行うパターンと、16スレッド並列Getを行うパターン。
それと、事前登録件数を500万件に増やしたパターンの4パターンを試しました。


まず、ConcurrentHashMapをそのまま利用した場合の参考値です。
※メモリ割り当てが512MBでは初期の300万件が格納しきれないので、メモリだけ2048MBにしました。
ConcurrentHashMapのテスト結果
・300万件 - 8スレッド
 4208588 QPS


・300万件 - 16スレッド
 4263423 QPS




・500万件 - 8スレッド
 3333208 QPS


・500万件 - 16スレッド
 3360552 QPS


速いなー!!
300万件の時に400万QPS超えてるし。




次にSerializeMapのテスト結果です。


・300万件 - 8スレッド
 189167 QPS


・300万件 - 16スレッド
 191084 QPS




・500万件 - 8スレッド
 145988 QPS


・500万件 - 16スレッド
 144266 QPS




上記のような結果になりました。
ConcurrentHashMapと比べてしまうと、全然ですが、
一応20万QPSぐらい出ているので、そこそこのスピードは出るようです。
並列数でのスピード低下はあまりないようです。
当然同じ値にアクセスを繰り返せば遅くなりますが、ある程度ランダムにアクセスがある場合は大丈夫なようです。
それよりもデータ数の増加に影響を受けるようです。これは圧縮解除、Dserializeの部分で大きなデータを扱うからですね。




さてここからは番外編です。
シリアライズしてデータを持つということで、ここまでは(デ)シリアライザにJavaのObjectOutputStreamを利用していたのですが、
その他のものも試してみたいと思います、MessagePack for Javaをためさせていただきました。
バージョンは0.5.2を利用。
プログラムとしてはSerializeMapのシリアライズ処理とデシリアライズ処理の部分をmsgpack用に変更して、圧縮は既に
ライブラリ側で行っているという情報がありましたので除外。


シリアライズは以下のような構文
    public static byte dataSerialize(Map data) {
      return MessagePack.pack(data);
    }


デシリアライズは以下のような構文
    public static Map dataDeserialize(byte
data) {
      return (Map)MessagePack.unpack(data, tMap(TString, TString));
    }


ものすごく分かりやすいですね!!


では早速Setからテスト。
条件は[http://d.hatena.ne.jp/okuyamaoo/20110616:title=前回]とまったく同じ状態です。


・1秒当たりのセット数
TotalExecCount = 2981566
QPS = 298156
凄いです。秒間30万Setなので、標準のObjectシリアライズが6万QPS程度だったので、
大体5倍程度のスピードです。単純にConcurrentHashMapと比べて、2分の1程度のスピードが出ています。




つぎに、限界格納数


開始直後
1124725
2112027
2788948
3377631
3938942
大体、3秒で100万から70万件のペースでsizeが増えているので、
30〜25万QPSでているのが分かります。
非常に高速ですね。


そして、1000万を超えた当たりでは、
10337128
10462071
10597972
10760863
10907505
大体、3秒で13万件から15万件程度増えているので、5万QPSぐらでしょうか。
引き続きかなり高速です。


そして、1400万件程度のデータを格納した状態で、秒間のSet数が700QPS程度になりました。
14685807
14688025
14690492
14692575
14694516


この後、1480万程度でOutOfMemoryとなりました。
ConcurrentHashMapをただ利用した場合は200万件程度でOutOfMemoryでしたので、
大体7倍程度のデータが保持できて且つ、1000万件保持時点でもかなり高速です。


続いて、Getのテストです。
こちらも最初のほうに書いているのと同じ条件で試しました。
テスト結果です。


・300万件 - 8スレッド
 972738 QPS


・300万件 - 16スレッド
 963889 QPS




・500万件 - 8スレッド
 683249 QPS


・500万件 - 16スレッド
 684657 QPS




凄く速い!!
データの増加で遅くなることはどうしようもないとは思いますが、
300万件へのアクセスにいたっては、100万QPSに届きそうな勢いなので、
実にObjectOutputStreamの5倍です。500万件でも非常に高速であることが分かりますが。




シリアライズによりデータサイズがかなり小さくなり、そして高速なシリアライズ、デシリアライズは非常に有効ですね。
okuyamaでも利用させて頂こうかと考えております。