Hatena::ブログ(Diary)

okuyamaooの日記

2012-03-24

Setsuna-0.0.2リリース~

21:52

SetsunaのVersion-0.0.2をリリースしました。


Github

Sourceforge.jpダウンロードページ


今回のリリースで一番大きな追加機能はServerモードの追加です。

0.0.1の時は、Setsunaにデータを入れる方法はパイプ入力しかなかったので、

コマンドラインで何かの出力をパイプでSetsunaに繋ぐしかなかったので、

Setsunaが動いているサーバ以外からデータをSetsunaに流せなかった

のが不便でした。そこでNetWork越しにデータを流せるようにこの機能を追加しました。


起動方法は簡単で"-server true"という表記を付けるだけです。ただServerモードの

場合はsetsuna.jar内に含まれないライブラリにも依存しているので従来の-jar指定での

起動は出来なくなっています。以下が起動サンプルです。

 java -classpath ./:./lib/msgpack/*:setsuna.jar setsuna.core.SetsunaMain -server true

(※リリース物のsetsuna.jarがあるディレクトリで実行している想定です)

これでPort番号10028番でサーバで待ち受けています。

※起動ポート番号を変更する場合は"-bindport"オプションで変更できます。


で、今回Serverモードを実装するにあたってどのようなプロトコルと仕組みにしようか

悩んだのですが、独自に実装するのは止めて、MessagePack-RPCを利用しました。

MessagePack-RPCとても高速でありかつ、各言語のバインディングが作成されているため、

言語別でクライアントを別途作成する必要がないため、採用させていただきました。

MessagePack-RPCの詳しい情報はこちら


そして、サーバ側に定義されているリモートメソッドは以下になります。

 int next (String[] sendData)

凄く単純にしました。パイプ入力をメソッド化したイメージです。

引数はデータをカラム単位で分解した後のデータを配列にして渡します。

つまり以下のPingの結果の場合

192.168.11.1 からの応答: バイト数 =32 時間 =16ms TTL=64

半角スペースでばらしてSetsunaに送り込みたい場合以下のような配列を渡すことになります。

String[] sendData = new String[7];
sendData[0] = "192.168.11.1";
sendData[1] = "からの応答:";
sendData[2] = "バイト数";
sendData[3] = "=32";
sendData[4] = "時間";
sendData[5] = "=16ms";
sendData[6] = "TTL=64";

今回も前回のSetsunaと同じで、Setsuna起動時に-columnオプションでカラム定義を決めれば

その定義に従って内部DBにテーブルが作られます。もし-columnを指定しなければ、最初に送られて

きたデータをもとに、テーブルが作られます。基本的に、入り口がパイプからMessagePackのサーバ

変わっただけで後の機能は全て同じ動きをします。


そして、サーバからの戻り値は3種類あります。

---------------------------------------------

0=データ入力成功

-9=カラム定義と送られてきたデータが合っていない

-1=サーバ側でエラー発生

---------------------------------------------

この戻り値を見て以降のクライアントでなんだかの処理をしてもらえればと思います。

実際にこれを送り込むJavaでのクライアント実装のサンプルはリリース物の

test/ServerClientSampleディレクトリのSetsunaServerModeClientSample.javaになります。

コードはこんな感じです。

1
2  import org.msgpack.rpc.Client;
3  import org.msgpack.rpc.loop.EventLoop;
4   
5  /**
6   * Setsunaを"-server true"にて立ち上げた場合にデータを投入する
7 * Clientのサンプル.
8 * SetsunaのServerモードはMessagePack-RPCで出来ているため、
9 * ClientもMessagePack-RPCで作成する必要がある。
10 * Setsuna側には以下のメソッドが定義されているので
11 * それをに実装する.
12 * -- 定義メソッド ----------
13 * int next (String[] data)
14 * --------------------------
15 * 引数は1レコード当たりのデータを表す配列
16 * 戻り値は呼び出しの成否(0=正常終了, -9=カラム定義と合わない , -1=内部エラー)
17 * 18 * @author T.Okuyama 19 */ 20 public class SetsunaServerModeClientSample { 21 22 /** 23 * SetsunaをRPC越しに呼び出し 24 * 渡す値はカラム数分のString値の配列.一貫して同じ長さ分を送ること 25 * 戻り値:正常終了 0, カラム定義と合わない -9, 内部エラー -1 26 */ 27 public static interface RPCInterface { 28 /** 29 * 30 * @param data カラムデータの配列。必ずカラム数分必要 31 * @return int 結果:正常終了=0, カラム定義と合わない=-9, 内部エラー=-1 32 */ 33 int next(String[] data); 34 } 35 36 public static void main(String[] args) throws Exception { 37 EventLoop loop = EventLoop.defaultEventLoop(); 38 39 Client cli = new Client("localhost", 10028, loop); 40 RPCInterface iface = cli.proxy(RPCInterface.class); 41 42 String[] list = new String[3]; 43 list[0] = "aa"; 44 list[1] = "bb"; 45 list[2] = "cc"; 46 for (int i = 0; i < 1000; i++) { 47 // データを登録する 48 System.out.println(iface.next(list)); 49 } 50 cli.close(); 51 loop.shutdown(); 52 } 53 }

このサンプルクライアントでポイントはまず、27行目〜34行目のリモートメソッドの定義で

上に示したサーバ側の定義をインプリしてます。そして37行目でEventLoopクラスを作成し39行目でクライアント

Setsunaのアドレスとポートと先程のEventLoopを指定して作成。

そして次の40行目で、リモートメソッドを設定。

42行目〜45行目でSetsunaに渡したいデータを作成して、48行目でリモートメソッドを呼び出しています。


見てわかる通りMessagePack-RPCのおかげで非常に簡潔にかけます。

他言語のバインディングも本家サイトに出ているので、基本同じ要領だと思います。

これで他のサーバからデータが流せるので、イベント処理を集約出来ると思います。

ただ1台に集約するとそのサーバがこけたらどうするなどは今後の課題です。



この他の追加機能ですが、主にデバッグ系の機能と取り込みデータを

データベースマッピングする辺りを強化しました。

デバッグ&Setsuna自身のログ機能

"-debug"オプション

前のバージョンではSetsunaにどのようなデータがAdapterで送り込まれ、どのようなTriggerが動き、どのようなSQL

検証用に流れどのようなUserEventが実行されたか分かりませんでした。さすがにSetsunaを利用して開発するうえで

これでは厳しいので、デバッグログを出せるようにしました。

"-debug"というオプション設定で出力されます。実際に指定すると以下のようになります。

 ping -i 1 -c 1000 192.168.1.1 | grep --line-buffered ttl | java -jar setsuna.jar -debug on

指定してるは最後の"-debug on"という部分です。これでコンソールに標準入力から送り込まれたデータがデバッグされます。

以下のような感じになります。

Debug : Sat Mar 24 18:02:50 JST 2012 - - Pipe Input=[64 bytes from 192.168.1.1: icmp_seq=1 ttl=64 time=0.237 ms]
{"COLUMN0":"64","COLUMN1":"bytes","COLUMN2":"from","COLUMN3":"192.168.1.1:","COLUMN4":"icmp_seq=1","COLUMN5":"ttl=64","COLUMN6":"time=0.237","COLUMN7":"ms"}
Debug : Sat Mar 24 18:02:50 JST 2012 - - Pipe Input=[64 bytes from 192.168.1.1: icmp_seq=2 ttl=64 time=0.297 ms]
{"COLUMN0":"64","COLUMN1":"bytes","COLUMN2":"from","COLUMN3":"192.168.1.1:","COLUMN4":"icmp_seq=2","COLUMN5":"ttl=64","COLUMN6":"time=0.297","COLUMN7":"ms"}
Debug : Sat Mar 24 18:02:51 JST 2012 - - Pipe Input=[64 bytes from 192.168.1.1: icmp_seq=3 ttl=64 time=0.245 ms]

デバッグで出力される文字列は先頭が"Debug"という文字始まる行が実際に送り込まれたデータ文字列です。

その他の"-trigger"、"-query"、"-event"、"-eventquery"で指定した内容や実行した内容も同じように

"Debug"が先頭について出力されます。

ただ、見ても分かるようにデバッグ以外の出力も混じっているので非常に見辛いです。

これを改善するのが、以下の指定です。

 ping -i 1 -c 1000 192.168.1.1 | grep --line-buffered ttl | java -jar setsuna.jar -debug only

"-debug only"となっているのがその指定です。さきほどは"on"だった部分を"only"とすることで、以下のようになります。

Debug : Sat Mar 24 18:29:20 JST 2012 - - Pipe Input=[64 bytes from 192.168.1.1: icmp_seq=1 ttl=64 time=0.263 ms]
Debug : Sat Mar 24 18:29:20 JST 2012 - - Pipe Input=[64 bytes from 192.168.1.1: icmp_seq=2 ttl=64 time=0.243 ms]
Debug : Sat Mar 24 18:29:21 JST 2012 - - Pipe Input=[64 bytes from 192.168.1.1: icmp_seq=3 ttl=64 time=0.229 ms]

デバッグのみの出力になっているのがわかってもらえると思います。


"-errorlog"オプション

このオプションはSetsunaが吐き出すエラー内容をファイルに出力するためのものです。

従来はコンソールにしか出なかったので、それをファイルに出力出来るようにしました。

以下のように指定します。

 ping -i 1 -c 1000 192.168.1.1 | grep --line-buffered ttl | java -jar setsuna.jar -errorlog /var/log/setsuna_error

これで"/var/log/setsuna_error"にエラー出力が出るようになります。


インプットデータの読み込み開始レコード指定&カラム定義違いのデータを送り込んだ際に無視する指定

"-offsetオプション"

Setsunaにインプットされるデータを内部DBに取り込みを開始する位置をレコード数で設定できます。

設定は以下のようになります。このオプションは必ず"-column"オプションを指定する必要があります。

ping -i 1 -c 1000 192.168.1.1 | java -jar setsuna.jar -column "col1 col2 col3 col4 col5 col6 col7 col8" -offset 2

"-offset 2"の部分の指定です。これで、インプットされるデータの2レコード目から取り込まれます。

1行目はとりこまれず無視されます。

利用される想定は以下のようにpingのデータをSetsunaに送り込みたい場合

[setsuna]$ ping -i 1 -c 1000 192.168.1.1
PING 192.168.1.1 (192.168.1.1) 56(84) bytes of data.
64 bytes from 192.168.1.1: icmp_seq=1 ttl=64 time=0.277 ms
64 bytes from 192.168.1.1: icmp_seq=2 ttl=64 time=0.278 ms

1行目のデータのフォーマットと2行目が異なるためそのまま取り込むとExceptionが発行されます。

これは"-column"指定で定義したカラム定義が8カラムあるのに対し1行目の

"PING 192.168.1.1 (192.168.1.1) 56(84) bytes of data."をデフォルトの半角スペースで分解すると

以下のように7カラムしかないためです。

col1="PING" 
col2="192.168.1.1"
col3="(192.168.1.1)"
col4="56(84)"
col5="bytes"
col6="of"
col7="data."
col8=データがない!!

このような場合に従来はgrepなどでSetsunaにデータを送り込む前に、不要なデータは省いてもらってたんですが、

このオプションを使うことで取り込み開始位置を調整することで、入力データのヘッダーをとばしたり出来ます。


"-skiperror"オプション

"-offset"は取り込み開始位置を調整できますが、取り込みを開始した出してから定義違いのデータがきた場合には

やはりExceptionが発生します。そこで"-skiperror"オプションを指定すると、途中に定義違いのデータが来た

場合にそのデータを取り込まずに無視します。以下のように指定します。

ping -i 1 -c 1000 192.168.1.1 | java -jar setsuna.jar -column "col1 col2 col3 col4 col5 col6 col7 col8" -skiperror true

最後の"-skiperror true"です。この指定は-columnの指定は必須ではありません。"-column"を指定しない場合は、

最初のレコードから自動的にカラム定義が作成されます。



以上が今回の追加機能です。これ以外にはいくつかの効率化とバグFixをしています。

追加機能も含めて全ての機能は"-help"オプションで確認してもらえます。

まだまだ、発展途上のプロジェクトなので、気軽にご意見いただけるとありがたいです!!

さあ、次は分散化でも考え始めるかな〜。

2012-03-15

ComplexEventProcessingエンジン Setsuna をリリースしました!!

01:17

やっとリリースできました!

Sourceforge.jpダウンロードページ


名前はSetsuna(セツナ)としました。

時間の単位を表す言葉である刹那からきています。

刹那の短さでデータを処理出来ればいいなということでこの名前にしました。



このSetsunaですが、CEPといわれるカテゴリに属するものです。

ではCEPとは??なんですが、これは語源そのままです。


Complex Event Processing


複数のイベントつまりデータを処理するエンジンです。

イメージとしてはデータの流れに直接処理を行えるイメージです。



あまり文章ばかりでもつまらないので、実際Setsunaを使いながら説明していきたいと思います。

では題材はこんなのにしたいと思います。

Apacheが重い!! ひょっとして攻撃されてる?!」


すいません、わけわからんタイトルですね。。

やりたいことは、Webサーバの負荷が一定値を超えた場合に、Apacheアクセスログの直近1万件以内からの

IPアドレス別でアクセス回数を合計しその合計値が、規定回数を超えていた場合に、アラート内容をSystemログに

出力するというものです。


ではこの処理をどのようにするか順を追って見ていきたいと思います。

※環境はCentOS5.5(64bit)前提として進めます。


Step1.サーバの負荷を監視する

Step1:まずはサーバの負荷を監視しないとダメです。

ここではtopコマンドのロードアベレージを負荷の指標に利用したいと思います。

$top -d 1

結果は以下のような感じです。

top - 16:47:19 up 8 min,  2 users,  load average: 0.06, 0.11, 0.08
Tasks:  78 total,   1 running,  77 sleeping,   0 stopped,   0 zombie
Cpu(s):  2.5%us,  3.0%sy,  0.0%ni, 93.9%id,  0.2%wa,  0.1%hi,  0.3%si,  0.0%st
Mem:    793284k total,   180116k used,   613168k free,    12272k buffers
Swap:  1048568k total,        0k used,  1048568k free,   130704k cached

  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
    1 root      15   0  2072  636  544 S  0.0  0.1   0:00.72 init
    2 root      RT  -5     0    0    0 S  0.0  0.0   0:00.00 migration/0
               ・
               ・
               ・

では早速これをSetsunaに流し込みたいと思います。

Setsunaは標準でパイプで繋ぐことでデータを流し込むことが可能です。

Setsunaをダウンダウンロードしてもらって適当な場所に解凍してもらうと

中にsetsuna.jarというファイルがあると思います。それが本体です。


実行可能jarなので、javaが動く環境であればこのファイル以外は何も必要ありません。

$top -b -d 1 | java -jar setsuna.jar

上記を実行すると以下のようなエラーが出ると思います。

setsuna.core.util.SetsunaException: The read data differs from definition information. Adapter name is PIPE
        at setsuna.core.adapter.SetsunaCoreAdapterEngine.doEvent(SetsunaCoreAdapterEngine.java:67)
        at setsuna.core.AbstractCoreEngine.run(AbstractCoreEngine.java:54)

これは、Setsunaの自動テーブルマッピングの機能がこけています。つまりSetsunaまでは

topの結果は届いています。ではどういったエラーか??

 それは、1行目のデータと2行目のデータの形式が違いますよということを言っています。

Setsunaはパイプライン入力で渡されたデータを内蔵するH2Databseに自動的に格納します。

そのため、格納するためにデータをレコードとして認識しかつ、カラムに分解しようとします。

Setsunaはデフォルトではデータ(レコード)の区切りを改行とし、カラムレベルへの分解に

半角スペースを利用します。(2つ以上続くスペースは自動的に1つにトリムされます)

topの結果をもう一度みてみると、

1: top - 16:47:19 up 8 min,  2 users,  load average: 0.06, 0.11, 0.08
2: Tasks:  78 total,   1 running,  77 sleeping,   0 stopped,   0 zombie

行番号1と2では全くフォーマットが異なります。フォーマットとはスペースで分解した際の分解できた数です。

そこでロードアベレージを含む1行目だけをSetsunaに送り込んでみましょう。これにはgrepを利用します。

そしてこのデータの流れに名前を付けたいと思います。-streamオプションを利用すると名前が付けれます。

$top -b -d 1 | grep --line-buffered "top -" | java -jar setsuna.jar -stream top


Exceptionが出なくなり変わりに以下のような結果になっていないでしょうか?

{"COLUMN0":"top","COLUMN1":"-","COLUMN2":"17:01:58","COLUMN3":"up","COLUMN4":"23","COLUMN5":"min,","COLUMN6":"2","COLUMN7":"users,","COLUMN8":"load","COLUMN9":"average:","COLUMN10":"0.20,","COLUMN11":"0.12,","COLUMN12":"0.09"}
{"COLUMN0":"top","COLUMN1":"-","COLUMN2":"17:01:59","COLUMN3":"up","COLUMN4":"23","COLUMN5":"min,","COLUMN6":"2","COLUMN7":"users,","COLUMN8":"load","COLUMN9":"average:","COLUMN10":"0.20,","COLUMN11":"0.12,","COLUMN12":"0.09"}
{"COLUMN0":"top","COLUMN1":"-","COLUMN2":"17:02:00","COLUMN3":"up","COLUMN4":"23","COLUMN5":"min,","COLUMN6":"2","COLUMN7":"users,","COLUMN8":"load","COLUMN9":"average:","COLUMN10":"0.20,","COLUMN11":"0.12,","COLUMN12":"0.09"}
{"COLUMN0":"top","COLUMN1":"-","COLUMN2":"17:02:01","COLUMN3":"up","COLUMN4":"23","COLUMN5":"min,","COLUMN6":"2","COLUMN7":"users,","COLUMN8":"load","COLUMN9":"average:","COLUMN10":"0.20,","COLUMN11":"0.12,","COLUMN12":"0.09"}

これはSetsunaが取り込んだデータをJSONフォーマットで

カラム情報とセットで出力しています。

Setsunaはパイプラインでデータを与えるだけの場合このような動きになります。

ではそれぞれ見ていきましょう。


topのロードアベレージの行

top - 16:47:19 up 8 min,  2 users,  load average: 0.06, 0.11, 0.08

Setsunaの結果

{
 "COLUMN0":"top",
 "COLUMN1":"-",
 "COLUMN2":"17:01:58",
 "COLUMN3":"up",
 "COLUMN4":"23",
 "COLUMN5":"min,",
 "COLUMN6":"2",
 "COLUMN7":"users,",
 "COLUMN8":"load",
 "COLUMN9":"average:",
 "COLUMN10":"0.20,",
 "COLUMN11":"0.12,",
 "COLUMN12":"0.09"
}

マッピング出来ていますね。

目当てのロードアベレージの値は"COLUMN10"の値です。

これでSetsunaへのデータの流し込みは完了です。

この流し込む処理の部分はAdapterと呼んでいます。



Step2

では次に、もう一つのデータであるApacheアクセスログを流し込みたいと思います。

このデータはアクセスIPの抽出に利用したいと思います。

先ほどと同じ手順でSetsunaにデータを流し込みます。

このデータの流れにはlogという名前を付けます。

$tail -f /etc/httpd/logs/access_log | java -jar setsuna.jar -sep " - -" -stream log

今回は先ほどと違い、-sepという項目がついています。

これは、レコードデータをカラムに分解する際のセパレータを指定しています。

僕の環境のアクセスログは以下のようなフォーマットでした。

192.168.1.100 - - [15/Mar/2012:17:29:52 +0900] "GET /index.php HTTP/1.1" 200 130 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.79 Safari/535.11"

ここからIPだけを取り出したいので、データを分解するセパレータに" - -"という文字列

利用しました。これで先頭のIP部分の直後で分解できます。

そして実際にSetsunaに流しこんだ結果が下です。


{"COLUMN0":"192.168.1.100","COLUMN1":" [15/Mar/2012:17:46:45 +0900] "GET /index.php HTTP/1.1" 200 130 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.79 Safari/535.11""}

"COLUMN0"にIP情報がマッピングされているのがわかります。



Step3

ここまではデータをSetsunaに流し込むだけでした、ここからは実際にデータを処理して負荷を検知してみます。

負荷の検知には先ほどのtopのデータの流れのロードアベレージの値を使いたいと思います。

そして、負荷を判定するための条件を設定したいと思います。それには-triggerオプションを利用します。

そして、より複雑な条件であるアクセス頻度の高いIPを割り出すために、-queryオプションを使い、

SQLにて、アクセスログのデータ内から基底回数を超えるアクセスを行っているIPがあった場合に、

Systemログにアラーを書きだそうと思います。


まず、先ほどのデータをクリアするために、2つのSetsunaプロセスを止めてください。Crtl+Cなどで強制終了でOKです。

そしてまずアクセスログを流し込みます。

$tail -f /etc/httpd/logs/access_log | java -jar setsuna.jar -sep " - -" -stream log

そして、topの結果を流し込むんですが、その前に、-triggerを作りたいと思います。

triggerではロードアベレージが規定値を超えていないかをチェックさせます。


			
  • trigger "COLUMN10 > 1"

ここで利用するのは先ほどの流しことんだデータを参考にCOLUMN10を指定しています。

これは直近のロードアベレージの値です。そして、'>'という条件記号を使って、

'1'とい数値以上を指定しています。

ここで注目してもらいたいのは、上の分解したtopのCOLUMN10の値です。

 "COLUMN10":"0.20,"

COLUMN10の値は数値の後ろに','が入っています。この文字である"0.20,"を数値変換すると

当然最後の','のせいで変換エラーになってしまいます。

しかし、Setsunaの持つ'>'や'<'条件記号はこういった値を可能な限り数値化します。

つまり最後の','は外して数値に変換してから比べます。

これは、対象データの中から数値を最初に見つけたところから数値ではなくなるまでの間の

値を数値として認識するように出来ているからです。なので、"Count=10"みたいな値で

あっても10と認識することができます。

このほかに条件記号は完全一致を指定する'='や、部分一致を指定する'like'が存在します。

ではこれでtriggerは完成です。


次にアクセス頻度の高いIPを見つける-queryです。これは完全にSQLで記述します。

既に実行中のアクセスログストリームを利用したいと思います。

アクセス頻度の高いIPを抽出するクエリだけ記述します。

select 
 count(*) 
from 
 (select count(column0)  as gcnt,
               column0 
   from 
     log \ // 20120321:修正 
   group by column0) 
where (gcnt*3) < (select 
                    avg(cnt) 
                  from (select 
                         count(column0) as cnt,
                               column0
                        from 
                         log
                        where 
                         C_TIME > DATEADD(SECOND, -60, current_timestamp)
                        group by column0)
);

これで直近の60秒のログからアクセスしてきたIPアクセス回数の平均を

大きく超える(3倍)IPが存在しているかをカウントいてます。

ここで注目していただもらいたいのは、下から3行目のC_TIMEという項目です。

これはSetsunaがデータをテーブルに格納する際に自動的に作成しているカラムです。

このカラムには、データを投入した日時がTimestamp型で登録されます。

なので、このように現在時刻から60秒前のような指定が可能になっています。

"C_TIME > DATEADD(SECOND, -60, current_timestamp)"の部分。

このほかにlong型のシーケンス値をもっています。これは0始まりのシーケンス

なので、単純にデータをソートする際はそちらが便利です。

この条件指定の部分をQueryと呼んでいます。



さて-tirggerと-queryでデータをハンドリング出来れば後は、UserEventと言う最後の部分です。

ここは、ユーザが自由に定義した処理を指定する部分です。

今回の例ではloggerと言われるsyslogにログを出力する標準のライブラリを使いたいと思います。

UserEventを利用する場合は-eventと記述し、それ以降に実行したコマンドを記述します。

以下のようになります。

 -event "logger Warning!! System busy"

そして今までの全てを繋ぐと以下のようになります。


$top -b -d 1 | grep --line-buffered "top -" | java -jar setsuna.jar -stream top\
 -trigger "COLUMN10 > 1" \
 -query "select \
          * \
        from \
         (select count(column0)  as gcnt,\
                       column0 \
           from \
             log \  // 20120321:修正
           group by column0) \
        where (gcnt*3) < (select \
                            avg(cnt) \
                          from (select \
                                 count(column0) as cnt,\
                                       column0\
                                from \
                                 log\
                                where\ 
                                 C_TIME > DATEADD(SECOND, -60, current_timestamp)\
                                group by column0)"\
 -event "logger Warning!! System busy"

これでtopのロードアベレージ値が1以上の場合に、直近60秒間のアクセスの間に異常に

アクセス数が多いIPアドレスを見つけてsyslogにワーニングを出力できます。


※2012/03/20 追記

  • eventで指定したコマンドには引数としてこのUserEventを実行した引き金となる

Adapterからのデータが1レコードが渡されるので第一引数で取得して使えます。

こんな感じです。

 -event "logger "

実際には下のようになります。

 logger "{"COLUMN0":"top","COLUMN1":"-","COLUMN2":"17:01:58","COLUMN3":"up","COLUMN4":"23","COLUMN5":"min,","COLUMN6":"2","COLUMN7":"users,","COLUMN8":"load","COLUMN9":"average:","COLUMN10":"0.20,","COLUMN11":"0.12,","COLUMN12":"0.09"}"

1レコードではなく、SQLなどでデータを取得して、それを利用したい場合は

  • eventqueryを使います。

これはUserEventでSQLを実行するもので書き方は-queryと全く同じです。

ただ結果が標準出力に出力されるので、Setsunaそのものをさらに別のコマンドにパイプで繋いで、-eventqueryの結果を利用出来ます。

バージョン0.0.1では-eventか、-eventqueryのどちらかしか指定出来ませんが

次のバージョンで、このクエリの結果を-eventで指定したコマンドにも渡せる様にする予定です。



このように、複数のデータをSetsunaに流しておけばそれらを複合してデータの変化を掴むことが

出来ます。ユーザイベントも指定は自由なので、メールを出したり、今回の例で言うとiptablesなどで

IPを遮断することもできます。

ただし、データベースではないためデータを蓄積することはできません。SQLでデータが引けるため

データは蓄積されるように見えますが、自動的に古いデータから消えていきます。(デフォルト10分前)

これは大量のインプットデータに高速に処理を行うために内臓DBインメモリであるためです。

あくまで処理を行うエンジンというところですね。



現在はパイプのインプットしかないので今後はサーバ型の開発を行い、外部からNW越しにデータを

流せるようにする予定です。ログコレクタも最近は熱いので、それらともつなげたいです(fluentdとか良いですね!!)。

2012-02-24

Complex Event Processing Engineをつくりはじめた

00:58

久しぶりの投稿だ。。



今まではokuyamaメインでOSS活動をしていたけども、新たにプロダクトを始めました。

新しいプロダクトはCEPと言われる種類のソフトウェアです。

そもそも、CEPってなんだ??

Complex Event Processingの略になります。

訳すと、複合イベント処理です。

でっ、何が出来るかなんですが


リアルタイム処理が得意なエンジンです。

これだけではピンとこないので、少し例を考えてみます。


例1)

あるサーバの監視がしたいとします。

監視には馴染みのsarや、topを使うと便利ですよね。

例えばtopを-bなどの標準出力に結果を書き出すモードで動かして、

これをパイプでgrepにつないでロードアベレージ

部分を取り出して、1を超えてたらメールを投げたい。こんな場合どう作りましょう?

ロードアベレージを取り出すまではこのままで良さそうです。

じゃあこれをファイルにリダイレクトして、それを監視しするスクリプトを書きましょうか。

結構面倒です。それに発見までにタイムラグがありそうです。


これを整理すると、

ロードアベレージというデータの流れに

条件検索を行って、

結果次第で処理をしています。


では、次は例2)

例1に別の要素を加えたらどうでしょう?

ロードアベレージが規定値を越えただけでメールが着たら夜寝れないので、

もう少し監視を賢くしたいです。

たとえばロードアベレージが規定値を超えてる場合にすぐメールを送るのではなく、

サイトの応答速度が3秒を越えてる場合にだけおくりたい。

または、I/O-Waitが30% を越えてる場合にメールを送りたい。

考え出すとキリがありません。


しかもロードアベレージがヤバくなってからサイトの応答速度を計測してたのでは遅そうです。

この処理は先ほどのロードアベレージだけではなくサイトの応答時間や、I/O-Waitなど、

ロードアベレージがこうなって、応答時間がこうなった場合という複数の条件検索をおこなって

満たした場合に処理をおこなっています。



こういった複数の条件を満たした場合を検知するには結構な仕組みが必要だと思います

しかも条件をみたしたと同時に処理をすぐに行わなければならない。



そこでCEPです。

CEPはリアルタイムに複数のデータに横断的に条件マッチングを行ってそれに

その結果次第でイベントを発生させることに特化しています。

okuyamaのようにデータをためたりはできません。でも、一定期間のデータを

蓄えることは出来ます。なので、その一定期間に対して処理をすることも

得意です。さっきの例でいうと、今のロードアベレージではなく直近5分のロード

アベレージの平均がこうだったらとか、サイトの応答が3秒以上かかっていることが

5分以内に100回以上あったらなどを見つけることが出来ます。

そしてユーザが作成したイベントを勝手に呼び出してくれます。

そしてなによりこの一連の処理を相当高速におこないます。

なのでアクセスログのような凄い勢いでつくりだされるデータを

リアルタイムに処理することにも向いています。


このあたりのお話と今作っているエンジンのお話を今度

神戸である勉強会でお話する予定です。

http://atnd.org/events/25695



まだリリースはしてませんが、一応以下に実装中のコードがあります。

http://sourceforge.jp/projects/setsuna/

2011-07-29

Java7速攻テスト

| 00:46

というわけで、Java7がリリースされました。

色々と面白そうな機能がありますね。


僕はSCTPなどが結構注目なんですが、取り合えずJava7上で現行のokuyamaを動かして、

どれぐら単純比較で性能が違うのか検証してみました。


okuyama側は特にJava7に合わせて変更などはしていないので、リリースVersion-0.8.8のままです。

ちなみに、okuyamaはThread、java.lang系、java.util系、java.io系、java.util.concurrent系などを

いたるところで使っています。

コンパイルのみJdk6とJdk7でそれぞれ実行前に行いました。

okuyamaのモードは永続化用のWALファイルは常に吐き出して、それ後データをメモリに持つモードと、

ファイルに持つモードで計測しました。ファイルに持つ場合はファイルへのI/Oランダムリードです。

メモリ展開はSeriarizeMapもためしました。この場合はObjectのシリアライズ、デシリアライズ且つ、zip圧縮が行われます。

以下は実行したPCのスペックです。1台で行いました。

--------------------------------

CPUIntel Corei5 650 3.20GHz

Memory:4GB

HDDSATA 7200rpm 500GB×1

OS:CentOS5.5 (64bit)

--------------------------------

okuyamaのGCオプションは以下です。

"-Xmx1024m -Xms1024m -server -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseParNewGC"


テストは同時に8クライアントスレッドで生成し、1分アクセスをSet、Get共にし続けます。

実際にアクセスを行うクラスは以下です。

http://sourceforge.jp/projects/okuyama/svn/view/trunk/test/ResponseTestThread.java?view=markup&revision=637&root=okuyama

このクラスを下記のクラスで8個インスタンス化してThreadとして実行します。

http://sourceforge.jp/projects/okuyama/svn/view/trunk/test/ResponseTest.java?view=markup&revision=442&root=okuyama



それではそれぞれの結果です。

-----------------------------------------------------------------

モード:WALファイル+Key&Value共にメモリ

-----------------------------------------------------------------

●Set

■Java6

60秒 = 1045091

17418 QPS

■Java7

60秒 = 1201714

20028 QPS


●Get

■Java6

60秒 = 1359632

22660 QPS

■Java7

60秒 = 1553264

25887 QPS


-----------------------------------------------------------------

モード:WALファイル+Key&Value共にメモリ+SerializeMap

-----------------------------------------------------------------

●Set

■Java6

60秒 = 383142

6385 QPS

■Java7

60秒 = 418954

6982 QPS


●Get

■Java6

60秒 = 708405

11806 QPS

■Java7

60秒 = 799126

13318 QPS



-----------------------------------------------------------------

モード:WALファイル+Key=メモリ&Value=ファイル

-----------------------------------------------------------------

●Set

■Java6

60秒 = 837324

13955 QPS

■Java7

60秒 = 939385

15656 QPS


●Get

■Java6

60秒 = 1273324

21222 QPS

■Java7

60秒 = 1423650

23737 QPS



こんな結果になりました。

総じて性能が向上しているのが分かります。

だいたい10%〜15%といったところでしょか。

まだどこがどうなってこういう結果が出ているのか

調査できていないので、そのへんもやりたいですね。

とりあえず速攻テストでした。

2011-07-09

「第1回分散処理ワークショップ in Tokyo」を開催しました

| 01:53

7/8に「第1回分散処理ワークショップ」を@doryokujinさんと共同で開催しました。

ATNDリンク

参加いただきました皆様ありがとうございましたm(_  _)m


当日の資料やその他の情報を可能な限りまとめてみます。


@understeerさんのよるToggetter

    http://togetter.com/li/159266


当日のUStream(2元中継でおこないました)

その1

    http://www.ustream.tv/channel/dist-study-1#utm_campaign=togetter.com&utm_source=8743151&utm_medium=social

その2

    http://www.ustream.tv/channel/dist-study-2#utm_campaign=unknown&utm_source=8743162&utm_medium=social


当日のスピーカの方々の資料とUStream録画(当日発表順)(※現時点で私が認識している範囲です)

  ・okuyama - @okuyamaoo

    資料:http://www.slideshare.net/okuyamaoo/20110708-dist-studyokuyama

    Usthttp://www.ustream.tv/recorded/15864685#dist_study

  ・MongoDB - @doryokujinさん

    資料:http://www.slideshare.net/doryokujin/mongodb-replicationshardingmapreduce

    Usthttp://www.ustream.tv/recorded/15864876

  ・Hibari - @tatsuya6502さん

    資料:http://www.slideshare.net/tatsuya6502/hibari-nosql-at-diststudy-20110708

    Usthttp://www.ustream.tv/recorded/15865171

  ・Cassandra - @yukimさん

    資料:http://m7a.me/s/diststudy1/#1

    Usthttp://www.ustream.tv/recorded/15865706

  ・HBase - @ueshinさん

    資料:http://www.slideshare.net/ueshin/20110708-h-base1-in-tokyo

    Usthttp://www.ustream.tv/recorded/15866025

  ・MesgpackとFluentとKumofsについて - @frsyuki

    Usthttp://www.ustream.tv/recorded/15866352



当日は@ueshinさんと@frsyukiさんのお二人から飛び込みで発表頂きました。ありがとうございました。

また、会場をご提供頂きそして、運営を強力にバックアップいただきました、ニフティ

UStreamtogetterでの情報まとめや、受付、質問タイムの運営などを強力にバックアップいただいた、

@understeerさんをはじめてする皆様、本当にありがとうございました。

そして@doryokujinさん頼りっぱなしになってすいませんでした。


今回はどちらかというとNOSQLのイントロダクションとなりましたが、一部で深い話も出掛かっていましたので

次回はNOSQLを使ったブッチャけ開発トークや、ブッチャけ運用トークなどが出来れば面白いかなとも思っています。

僕自身okuyamaを実装してはいますが、それを使ったプロダクトの開発や、運用ではまだまだ迷うことも多いので

NOSQLの実システム適応や運用はまだまだ試行錯誤のフェーズなのかなとも考えています。

そのあたりをパート別で座談会出来ても面白いかなと思っています。

このへんは@doryokujinさんと反省会をして考える予定です。

関東ではこのままの勢いで近いうちに第2回が出来れば良いかなーと考えています。

後は、僕の地元でもある関西でも是非したいなーと思っています。

関東、関西両方に言えることなんですが、「こんなこと議論したいよ」などあれば@okuyamaoo

メンション頂ければと思います。