Twitter4JとDroolsでなんかリアルタイムな感じのもの その3 ルール編

やっとこさルールです。ここを見ないとDroolsが何なのか分かりませんね。
ルールはDrools Rule Language (DRL) という言語で記述します。

twitterRules1.drl

declare Status
    @role( event )
    @timestamp( createdAt )
    @expires( 2s )
end

まず、入力されるPOJOをEventとして定義します。Statusというのは1つのtweetを表す、Twitter4Jのインターフェースです。
@timestampはこのイベントのタイムスタンプです。タイムスタンプがルールマッチに関連する場合に重要になります。デフォルトではイベントがエントリポイントに投入された時刻がタイムスタンプになりますが、Statusの場合、createdAtというプロパティ、つまりtweetが投稿された時刻を持っているので、それをマッピングします。
また @expires( 2s ) というのはEventが 2秒後には GC の対象になってもよいということを明示的に指定します。OutOfMemoryを避けるためには重要です。

rule "Dump tweets"
when
    $s : Status() from entry-point "twitter"
then
    System.out.println( MessageFormat.format( "[{0,time,full}] @{1} - {2}",
                                              $s.getCreatedAt(),
                                              $s.getUser().getScreenName(),
                                              $s.getText() ) );
end

これがルールの本体です。

WHEN にはイベントがルールにマッチするための条件
THEN にはマッチした際に実行される処理
を記述します。

基本的にパターンマッチは Status() の括弧の中に記述されます。今回は何も書いていないので、全てのStatusがマッチします。 $s は変数のバインドです。マッチしたイベントは$sとして参照可能です。

これではちょっと面白く無いですね。
続いてtwitterRules2.drl

rule "Dump tweets from people laughing"
when
    $s : Status( text matches ".*lol.*" ) from entry-point "twitter"
then
...

textはStatusのプロパティ、tweet本文です。matchesはDRLのオペレータです。想像通り、正規表現にマッチします。「lol」のところを「なう」とかに変えて試してみよう。

twitterRules3.drl

rule "Dump tweets from US"
when
    $s : Status( ) from entry-point "twitter"
    $p : Place( countryCode == "US" ) from $s.place
then
...

Tweetメタデータも使えます。

twitterRules4.drl

rule "Dump tweets from user conversation"
when
    $s1 : Status( $name : user.screenName ) from entry-point "twitter"
    $s2 : Status( inReplyToScreenName == $name, this after[1ms,2m] $s1 ) from entry-point "twitter"
then
...

ここがイベント処理の真骨頂です。
このルールはふたつのStatus($s1, $s2)に対して、$s2が$s1にリプライしており、かつ$s2が$s1の後、2分以内に発生している、ということを表しています。つまり、2分以内の会話を抽出している、ということです。
これをJavaのロジックで記述しようとすると、少々面倒でしょう。(そして不要なオブジェクトを貯めすぎないよう、メモリ対策もしなければならない!)
今回の例、Tweetは「ある一瞬の時刻に発生するイベント」なので、時間の関係は「後(after)」「前(before)」「同時(coincides)」の3つだけですが、「一定の長さを持つイベント」の場合は「Aが終わったときにBが始まる(meets)」「Aが終わるときにBが終わる(finishes)」などなど、様々な関係を表すオペレータが定義されています。
http://www.slideshare.net/ge0ffrey/applying-cep-drools-fusion-drools-jbpm-bootcamps-2011
のスライド 24-25 ページを見てもらえるとイメージがつかめると思います。

twitterRules5.drl

rule "Count tweets in 10 seconds"
    duration( 10s )
when
    Number( $count : intValue ) from accumulate(
        Status() over window:time( 30s ) from entry-point "twitter",
        count(1) )
then
...

これは過去30秒間に発生したtweetの数をカウントするルールです。まあ、sampleストリームなので現実に世界中で発生した全てのtweetをカウントしているわけでは無いんですけどね。
accumulateの文法はちょっとキモいけど、短い文で書ける、ということは分かってもらえると思います。
duration( 10s ) という属性もポイントで、これにより「10秒ごとにこのルールを評価する」という動きにできます。

最後は複数ルールのコンビネーションです。
twitterRules6.drl

rule "Filter retweeted messages"
when
    $s : Status( retweetCount > 0 ) from entry-point "twitter"
then
    entryPoints["retweeted"].insert( $s );
end

rule "Filter non-retweeted messages"
when
    $s : Status( retweetCount == 0 ) from entry-point "twitter"
then
    entryPoints["non-retweeted"].insert( $s );
end

まずリツイートされたツイートと、そうでないツイートを別々のエントリポイント("retweeted", "non-retweeted")に振り分けます。もともと定義していないエントリポイントを、entryPoints["retweeted"]のようにダイナミックに用意できます。

rule "Dump retweeted"
when
    $s : Status( user.screenName matches "[a-g].*" ) from entry-point "retweeted"
then
...
end

rule "Dump non retweeted"
when
    $s : Status( user.screenName matches "[h-p].*" ) from entry-point "non-retweeted"
then
...

続いて、それぞれのエントリポイントからのイベントに対して、パターンマッチが適用されます。ま、これ自体はこのような段階を踏まなくても実装できるルールでしたが、エントリポイントをチェインすることでイベントの流れをルールに作ることができます。

一方で、この複数のルールが独立して定義されていることにも注目してください。Javaのif/thenで大量のルールを実装しようとすると「実行パス」に非常に頭を悩ませることになるでしょう。Droolsは各ルールを並列に管理し、各ルールのwhen節を元に効率的な評価ツリーを構築したうえで、イベントのマッチングを評価していきます。なのでユーザは単純にルールを並べることに専念できます。
基本的にはルールには「順番」の概念がありません。もし、「ルールAのあとにルールBを実行して欲しい」などの要件がある場合は、いくつかの属性や、「ルールフロー」が役にたちます。上記の「エントリポイントのチェイン」もフローを作るひとつの方法です。もしそれが「ビジネスプロセス」であればjBPM( http://www.jboss.org/jbpm )の出番かもしれません。

ここでひとまず終わりです。その4はあるかもないかも。