Hatena::ブログ(Diary)

CLOVER

2016-06-28

RabbitMQのRoutingを試してみる

RabbitMQのチュートリアルで遊んでみるシリーズ、今度はRoutingです。

RabbitMQ - RabbitMQ tutorial - Routing

このひとつ前のチュートリアルでは、Producer側が送ったメッセージがConsumerにブロードキャストするといったものでした。

今回のチュートリアルでは、Consumer側がメッセージのサブセットを受け取れるようにしてみます。このエントリでは、単純なメッセージの振り分けに加えて、元のチュートリアルのログ出力サンプルをマネたものを作ってみたいと思います。

メッセージをブロードキャストしていた時の振り返り

Producer側から送ったメッセージをブロードキャストしていた時のConsumer側のコードは、こんな感じでした。

        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, "logs", "");

Channel#queueBindの第3引数が空文字です。

この第3引数は、routingKeyという特別なもので、このチュートリアルではこれをバインディングキーと呼ぶそうです(Channel#queueBindで、Exchangeとキューをバインドして関連付けますが、その時のキーとなっているので)。

バインディングキーの意味は、Exchangeの種類に依存します。このひとつ前のチュートリアルでは、Exchangeの種類は「fanout」となっていましたが、この場合はバインディングキーは無視されるそうです。

Exchangeの種類については、ここを見ればよいみたいですね。

RabbitMQ - AMQP 0-9-1 Model Explained

それでは、進めてみましょう。

準備

まず、RabbitMQは起動済みとします。

そして、クライアントライブラリにJavaを利用するので、Maven依存関係を定義します。こちらです。

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.2</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.4.1</version>
            <scope>test</scope>
        </dependency>

テストコード用に、JUnitとAssertJ付きです。

テストコードの雛形

テストコードで動作確認をしますが、import文などの大枠はこんな感じです。
src/test/java/org/littlewings/rabbitmq/routing/RoutingTest.java

package org.littlewings.rabbitmq.routing;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class RoutingTest {
    // ここに、テストを書く!
}

Producer側を書く

ここでは、Producerから送信するメッセージを、特定のExchange、キューにバインドしたConsumerに送るコードを書きます。

前回のチュートリアルでは「fanout」でブロードキャストしたわけですが、今回は「direct」を使用します。

「direct」を使用すると、routingKeyにマッチしたキューにメッセージが送信されることになります。

書いたコードは、こちらです。

    public void publish(String routingKey, String... messages) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("kazuhira");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("messageExchange", "direct");

        Arrays.asList(messages).forEach(message -> {
            try {
                channel.basicPublish("messageExchange", routingKey, null, message.getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

        channel.close();
        connection.close();
    }

routingKeyは、メソッドの引数としてもらうようにしました。

Exchangeを「direct」で定義して

        channel.exchangeDeclare("messageExchange", "direct");

メッセージをroutingKeyを指定して送信します。Channel#basicPublishの第2引数に、routingKeyを指定しているところがポイントです。

        Arrays.asList(messages).forEach(message -> {
            try {
                channel.basicPublish("messageExchange", routingKey, null, message.getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

Consuer側を書く

続いて、Consumer側を書いてみます。

書いたコードは、こちら。

    public List<String> subscribe(String... routingKeys) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("kazuhira");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("messageExchange", "direct");
        String queueName = channel.queueDeclare().getQueue();

        Arrays.asList(routingKeys).forEach(routingKey -> {
            try {
                channel.queueBind(queueName, "messageExchange", routingKey);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

        List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>());

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) {
                receivedMessages.add(new String(bytes, StandardCharsets.UTF_8));
            }
        };

        channel.basicConsume(queueName, true, consumer);

        TimeUnit.SECONDS.sleep(5L);

        return receivedMessages;
    }

テストできるように、受信したメッセージはListで戻すようにしています。

ここでのポイントは、ExchangeをProducer側と同じく「direct」で宣言していることと

        channel.exchangeDeclare("messageExchange", "direct");

Channel#queueBindで、routingKeyを第3引数に指定していることですね。なお、あとでも書きますが、Channel#queueBindでは複数のroutingKeyをバインドさせることができます。

        String queueName = channel.queueDeclare().getQueue();

        Arrays.asList(routingKeys).forEach(routingKey -> {
            try {
                channel.queueBind(queueName, "messageExchange", routingKey);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

ひとつのroutingKeyにバインドさせてみる

それでは、動かしてみましょう。まずは、ひとつのrougingKeyを使ってバインドさせてみます。

書いたコードは、こちら。

    @Test
    public void routingOneToOneBinding() throws IOException, TimeoutException, ExecutionException, InterruptedException {
        // Source messages
        List<String> messages = Arrays.asList("Hello World!", "Hello RabbitMQ!!");

        // Subscribe
        ExecutorService es = Executors.newFixedThreadPool(2);
        Future<List<String>> f1 = es.submit(() -> subscribe("black"));
        Future<List<String>> f2 = es.submit(() -> subscribe("white"));

        // Subscribe側が起動しきるまで、待機
        TimeUnit.SECONDS.sleep(2L);

        // Publish
        publish("black", messages.toArray(new String[messages.size()]));

        assertThat(f1.get())
                .containsExactlyElementsOf(messages);
        assertThat(f2.get())
                .isEmpty();

        es.shutdown();
        es.awaitTermination(5L, TimeUnit.SECONDS);
    }

送信するメッセージは2つとし、それぞれ「black」と「white」というroutingKeyでExchange、キューにバインドします。

        // Source messages
        List<String> messages = Arrays.asList("Hello World!", "Hello RabbitMQ!!");

        // Subscribe
        ExecutorService es = Executors.newFixedThreadPool(2);
        Future<List<String>> f1 = es.submit(() -> subscribe("black"));
        Future<List<String>> f2 = es.submit(() -> subscribe("white"));

そして、routingKey「black」で送信。

        // Publish
        publish("black", messages.toArray(new String[messages.size()]));

すると、「black」でバインドしたConsumer側はメッセージを受信しますが、「white」でバインドしたConsumer側はメッセージを受信しないという結果になります。

        assertThat(f1.get())
                .containsExactlyElementsOf(messages);
        assertThat(f2.get())
                .isEmpty();

ちゃんと振り分けてくれましたね。

複数のroutingKeyにバインドさせる

最後に、Consumerを複数のroutingKeyにバインドさせてみます。

書いたコードはこちら。

    @Test
    public void routingMultipleBinding() throws IOException, TimeoutException, ExecutionException, InterruptedException {
        // Subscribe
        ExecutorService es = Executors.newFixedThreadPool(2);
        Future<List<String>> f1 = es.submit(() -> subscribe("info"));
        Future<List<String>> f2 = es.submit(() -> subscribe("debug", "info", "warn", "error"));

        // Subscribe側が起動しきるまで、待機
        TimeUnit.SECONDS.sleep(2L);

        // Publish
        publish("debug", "debug-message1");
        publish("info", "info-message1", "info-message2");
        publish("warn", "warn-message1");
        publish("error", "error-message1", "error-message2");

        assertThat(f1.get())
                .containsExactlyElementsOf(Arrays.asList("info-message1", "info-message2"));
        assertThat(f2.get())
                .containsExactlyElementsOf(Arrays.asList("debug-message1", "info-message1", "info-message2", "warn-message1", "error-message1", "error-message2"));

        es.shutdown();
        es.awaitTermination(5L, TimeUnit.SECONDS);
    }

ここでは、元のチュートリアルと同じ感じで、ログレベルを模したものにしました。

Consumer側は、「info」のみにバインドさせたものと、「debug」、「info」、「warn」、「error」の4種類のroutingKeyにバインドさせたものの2つを用意します。

        // Subscribe
        ExecutorService es = Executors.newFixedThreadPool(2);
        Future<List<String>> f1 = es.submit(() -> subscribe("info"));
        Future<List<String>> f2 = es.submit(() -> subscribe("debug", "info", "warn", "error"));

Producer側は、「debug」、「info」、「warn」、「error」の4種類のroutingKeyに対して、それぞれメッセージを送ります。

        // Publish
        publish("debug", "debug-message1");
        publish("info", "info-message1", "info-message2");
        publish("warn", "warn-message1");
        publish("error", "error-message1", "error-message2");

すると、ひとつめのConsumerは「info」のみ、もうひとつのConsumerは「info」を含むすべてのメッセージを受信できました。

        assertThat(f1.get())
                .containsExactlyElementsOf(Arrays.asList("info-message1", "info-message2"));
        assertThat(f2.get())
                .containsExactlyElementsOf(Arrays.asList("debug-message1", "info-message1", "info-message2", "warn-message1", "error-message1", "error-message2"));

OKそうですね。

まとめ

今回は、複数のConsumerに対して、routingKeyでExchangeおよびキューの配信先をコントロールするといったチュートリアルを実践してみました。

RabbitMQのチュートリアルは動かしてみてもなかなかすぐにピンとこない感じがするのですが…これはけっこう動かしやすかったかなと思います。

もうちょっと、用語とかは理解していかないといけない気はしますけれどね。

2016-06-24

Lucene KuromojiでN-Best解を求める

Lucene 6.0から、KuromojiにN-Best解を求めることができる機能が入っていたそうです。

Lucene Change Log

[LUCENE-6837] Add N-best output capability to JapaneseTokenizer - ASF JIRA

moco(beta)’s backup: Hello Lucene 6.0! ?その1:PointValues を使ってみる

全然気付いていませんでした…。というか、N-Best解が求められるということが、どういうことか知りませんでした…。

Lucenen 5.5リリース前くらいに、パッチが投げられたということで資料などもあったようです。特に、最初のYahooさんの資料は必見ですね。

第17回Lucene/Solr勉強会 #SolrJP ? Apache Lucene Solrによる形態素解析の課題とN-bestの提案

Solr の形態素解析の新機能 N-best 解を試す - おうちらぼ

N-Best解を求めるということ

N-Best解を求めるというのは、形態素解析で得られるひとつの結果ではなくて、その他にとり得るパターンも候補として取り出すことができるもののようです。で、どこまでのパターンを許容するかを、コストとして指定すると。

このあたりは、MeCabのコスト計算を見ることになりました…。

MeCab: Yet Another Japanese Dependency StructureAnalyzer

日本テレビ東京で学ぶMeCabのコスト計算 | mwSoft

MeCab で N-Best 解の累積コストを出力する - あらびき日記

形態素解析での分割を行う時、MeCabのフォーマットでいくと「連接コスト + 単語生起コスト (文頭から累積) (%pc)」が最小となるものを選ぶようですが、ここでその他の解も出力しますよ、と。

Lucene KuromojiでN-Best解を求める

Lucene Kuromojiの場合はSEARCHモードやEXTENDEDモードなどありますが、N-Bestで複数の解を求めさせることで、SEARCHモードなどとは別のアプローチで適合率の低下を抑えつつ、再現率の向上を目指しているのだとか。

Apache Solrでは使えそうですね。ElasticsearchのKuromojiでは、まだ先のバージョンになりそうな感じです。

Lucene KuromojiでN-Best解を求めるには、JapaneseTokenizerを使います。JapaneseAnalyzerではありません。
また、JapaneseTokenizerのモードは、NORMALとすることになります。
※Atilika Kuromojiでは、N-Best解は求められなさそう?

コストはJapaneseTokenizer#setNBestCostで指定し、また適切なコスト差分を求めるためのJapaneseTokenizer#calcNBestCostも利用することができます。

今回は、単純にJapaneseTokenizer#setNBestCostでコストを指定してN-Best解を求めてみたいと思います。

とりあえず、結果を見ないとよくわからないので…。

準備

ビルド定義は、こんな感じ。
build.sbt

name := "lucene-kuromoji-n-best"

version := "0.0.1-SNAPSHOT"

organization := "org.littlewings"

scalaVersion := "2.11.8"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

libraryDependencies += "org.apache.lucene" % "lucene-analyzers-kuromoji" % "6.1.0"

N-Best解を含む結果を出力するプログラムを書いてみる

それでは、Lucene Kuromojiを使ってN-Best解を含んだ形態素解析結果を出力するプログラムを作ってみます。

繰り返しますが、Lucene KuromojiでN-Best解を使うには、JapaneseTokenizerでNORMALモードとし、setNBestCostをintで指定する必要があります。

    val tokenizer = new JapaneseTokenizer(null, true, JapaneseTokenizer.Mode.NORMAL)
    tokenizer.setNBestCost(nbestCost)  // intで指定

NBestCostのデフォルト値は、0となります。

ですので、N-Best解を求めるにあたって指定するコストを0から変えつつ結果表示するプログラムをこんな感じに書いてみました。
src/main/scala/org/littlewings/lucene/kuromoji/KuromojiNBest.scala

package org.littlewings.lucene.kuromoji

import scala.language.postfixOps
import scala.sys.process._
import java.io.{ByteArrayInputStream, File, StringReader}
import java.nio.charset.StandardCharsets

import org.apache.lucene.analysis.ja.{GraphvizFormatter, JapaneseTokenizer}
import org.apache.lucene.analysis.ja.dict.ConnectionCosts
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute

import scala.language.postfixOps

object KuromojiNBest {
  def main(args: Array[String]): Unit = {
    val targets = Array(
      /** ここで、形態素解析する文章とNBestCostを指定 */
    )

    targets.foreach {
      case (word, costs) => costs.foreach(cost => displayNBestAndViterbi(word, cost))
    }
  }

  def displayNBestAndViterbi(target: String, nbestCost: Int): Unit = {
    val graphvizFormatter = new GraphvizFormatter(ConnectionCosts.getInstance)

    val tokenizer = new JapaneseTokenizer(null, true, JapaneseTokenizer.Mode.NORMAL)
    tokenizer.setReader(new StringReader(target))
    tokenizer.setNBestCost(nbestCost)
    tokenizer.setGraphvizFormatter(graphvizFormatter)

    val charTermAttr = tokenizer.addAttribute(classOf[CharTermAttribute])

    tokenizer.reset()

    val words =
      Iterator
        .continually(tokenizer.incrementToken())
        .takeWhile(identity)
        .map(_ => charTermAttr.toString)
        .toArray

    tokenizer.end()
    tokenizer.close()

    println {
      s"""|Input = ${
        target
      }, NBest = ${
        nbestCost
      }
          |${
        words.map(w => "  " + w).mkString(System.lineSeparator())
      }""".stripMargin
    }

    val dotOutput = graphvizFormatter.finish()
    "dot -Tgif" #< new ByteArrayInputStream(dotOutput.getBytes(StandardCharsets.UTF_8)) #> new File(s"target/${
      target
    }.gif") !
  }
}

この部分で、文章とNBestCostを与えつつ結果を見ていこうと思います。

    val targets = Array(
      /** ここで、形態素解析する文章とNBestCostを指定 */
    )

    targets.foreach {
      case (word, costs) => costs.foreach(cost => displayNBestAndViterbi(word, cost))
    }

なお、最後にGIFファイルを作成していますが、これは以下のエントリの内容を使って、形態素解析時のトークナイズの様子をビジュアル化するためのものです。

Lucene Kuromojiのトークナイズを、Graphvizを使ってビジュアル化する - CLOVER

では、他のエントリに習って、最初はこれで試してみます。

    val targets = Array(
      ("デジタル一眼レフ", Seq(0, 2000))
    )

    targets.foreach {
      case (word, costs) => costs.foreach(cost => displayNBestAndViterbi(word, cost))
    }

結果は、このように。

Input = デジタル一眼レフ, NBest = 0
  デジタル
  一
  眼
  レフ
Input = デジタル一眼レフ, NBest = 2000
  デジタル
  一
  一眼
  眼
  レフ

NBestCostが0(デフォルト)の時は「デジタル」「一」「眼」「レフ」ですが、NBestCostを2000にすると、「デジタル」「一」「一眼」「眼」「レフ」が得られるようになります。

この時に得られたGraphvizでの画像は、こちら。
f:id:Kazuhira:20160625005351g:image

NBestCostが0の時に選ばれたのが緑の線でトレースされているものですが、NBestCostが2000の時に得られたものは…どれでしょう…。

とりあえずいったん置いておいて、次に「水性ボールペン」で試してみます。

    val targets = Array(
      ("水性ボールペン", Seq(0, 2000, 5677))
    )

    targets.foreach {
      case (word, costs) => costs.foreach(cost => displayNBestAndViterbi(word, cost))
    }

「水性ボールペン」の場合は、NBestCostが2000では解が変わらなかったので、もっと大きく上げると他の候補が得られました。

結果は、こちら。

Input = 水性ボールペン, NBest = 0
  水性
  ボールペン
Input = 水性ボールペン, NBest = 2000
  水性
  ボールペン
Input = 水性ボールペン, NBest = 5677
  水
  水性
  性
  ボール
  ボールペン
  ペン

図はこうなりました。
f:id:Kazuhira:20160625005352g:image

どうもピンとこないので、もっと簡単な例で試してみましょう。

    val targets = Array(
      ("ボールペン", Seq(0, 5677))
    )

    targets.foreach {
      case (word, costs) => costs.foreach(cost => displayNBestAndViterbi(word, cost))
    }

「ボールペン」、です。こちらもNBestCostをけっこう大きく取らないと、他の候補が得られませんでした。

結果はこのように。

Input = ボールペン, NBest = 0
  ボールペン
Input = ボールペン, NBest = 5677
  ボール
  ボールペン
  ペン

図はこちら。
f:id:Kazuhira:20160625005353g:image

ここまでくると、なんとか読めるようになります。

NBestCostが0の時に選ばれているのが

3593 - 283 = 3310

そして、NBestCostを5677と指定した時に得られたものが

(4214 - 283) + (4994 + 62) = 8987

のパターンですね。

つまり、

8987 - 3310 = 5677

というわけで、5677をNBestCostに指定すると、他の候補が得られたわけですね。

ここで足しているコストが、MeCabのフォーマットでいう「連接コスト + 単語生起コスト (文頭から累積) (%pc)」というわけです。

このあたりは、ホントにここを見るとよいです。

MeCab: Yet Another Japanese Dependency StructureAnalyzer

日本テレビ東京で学ぶMeCabのコスト計算 | mwSoft

MeCabでN-Best解を求める

で、これをMeCabでも求められるようなので、合わせて見てみました。

「ボールペン」。

$ echo ボールペン | mecab -F"%m,%phl,%phr,%pb,%pw,%pc,%pn\n" -N2
ボールペン,1285,1285,*,3593,3310,3310
EOS
ボール,1285,1285, ,4214,3931,3931
ペン,1285,1285, ,4994,8987,5056
EOS

「-N2」で、N-Best解を2つ求める、と。

出力する内容を「-F」でフォーマット指定するのですが、「%pw」が単語生起コスト、「%pc」が連接コスト + 単語生起コスト (文頭から累積)です。

この出力では連結コストは見えませんが、単語生起コストに連結コストを加えたものが「%pc」なので、右から2番目の値を見ればよいということになります。
また、累積なので複数単語の場合は最後のものを見ればよいわけですね。

つまり、「ボールペン」なら3310、「ボール」と「ペン」なら8987です。

先ほど試してみた、他の言葉でも試してみましょう。

デジタル一眼レフ」。ここでは、求めるN-Best解を3にしました。

$ echo デジタル一眼レフ | mecab -F"%m,%phl,%phr,%pb,%pw,%pc,%pn\n" -N3
デジタル,1285,1285,*,3083,2800,2800
一,1295,1295,*,3485,4660,1860
眼,1285,1285,*,4540,7621,2961
レフ,1285,1285,*,3657,11340,3719
EOS
デジタル,1285,1285,*,3083,2800,2800
一眼,1285,1285, ,5117,7979,5179
レフ,1285,1285,*,3657,11340,3361
EOS
デジタル,1292,1292, ,6228,5250,5250
一眼,1285,1285, ,5117,7979,2729
レフ,1285,1285,*,3657,11340,3361
EOS

この場合は、先ほどのKuromojiの例と同じ結果を求めるには、眼と一眼の差、7979と7621の差、358をコスト指定すればよいみたいです。

最後に、「水性ボールペン」。

$ echo 水性ボールペン | mecab -F"%m,%phl,%phr,%pb,%pw,%pc,%pn\n" -N3
水性,1285,1285,*,5599,5316,5316
ボールペン,1285,1285,*,3593,8971,3655
EOS
水,1285,1285, ,7385,7102,7102
性,1298,1298, ,7390,9402,2300
ボールペン,1285,1285,*,3593,8971,-431
EOS
水性,1285,1285,*,5599,5316,5316
ボール,1285,1285, ,4214,9592,4276
ペン,1285,1285, ,4994,14648,5056
EOS

この場合、Kuromojiの例と同じ結果となるコストは、14648と8971の差っぽいですねぇ…。

まとめ

Lucene Kuromojiに追加されたN-Best解を求める方法で、NORMALモードでもSEARCHモードとは異なる方法で、複数の候補が出せるようになったことがわかりました。

コスト指定は悩みどころのような気がしますが、なかなか面白そうです。

ただ、JapaneseAnalyzerではNBestCostは指定できないので、実際にAnalyzerとして使う場合には各Filterを自分で組み上げたAnalyzerを書かないとダメそうですね(Solrを使う場合は、この限りではなさそうですが)。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/lucene-examples/tree/master/lucene-kuromoji-n-best

2016-06-19

RabbitMQでPublish/Subscribeして遊ぶ

RabbitMQのチュートリアル3段、Publish/Subscribeをやってみます。

RabbitMQ - RabbitMQ tutorial - Publish/Subscribe

これまでのチュートリアルは、キューを作成して、ひとつのProducerがひとつのConsumerにメッセージを配信する構成でした。今度は、メッセージを複数のConsumerに送ります。このパターンは、「Publish/Subscribe」として知られています。

こちらのチュートリアルを見つつ、ひとつのProducerから投げたメッセージを、複数のConsumerが受け取るコードを書いて動かしてみます。

ここでは、こういう動作を行うコードを書きます。

  • Producerがメッセージを送る
  • キューがバッファとしてメッセージを保存する
  • Consumerがメッセージを受け取る

RabbitMQの考えとして、Producerはキューに直接メッセージを送らないというものがあるようです。実際、Producerはキューに入ったメッセージが配信されたかどうかは知りません。

代わりに、ProducerはExchangeにメッセージを送信するだけです。Exchangeは、受け取ったメッセージをどのように扱うべきかを知っています。特定のキューに追加されるべきか?多くのキューに追加されるべきか?または破棄されるべきか?これは、Exchangeの種類によって決まります。

今回のチュートリアルでは、このあたりを見ていくことになります。

準備

まずは、Maven依存関係。

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.2</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.4.1</version>
            <scope>test</scope>
        </dependency>

RabbitMQのJava Clientと、テストコード用のJUnit/AssertJを加えています。

また、RabbitMQはデフォルト状態+アクセス可能なユーザーを作成した状態で起動しているものとします。

テストコードの雛形

テストコードの全体像は、こんな感じにしました。
src/test/java/org/littlewings/rabbitmq/publishsubscribe/PublishSubscribeTest.java

package org.littlewings.rabbitmq.publishsubscribe;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class PublishSubscribeTest {
    @Test
    public void publishSubscribe() throws IOException, TimeoutException, ExecutionException, InterruptedException {
        // Source messages
        List<String> messages = Arrays.asList("Hello World!", "Hello RabbitMQ!!");

        // Subscribe
        ExecutorService es = Executors.newFixedThreadPool(2);
        Future<List<String>> f1 = es.submit(this::subscribe);
        Future<List<String>> f2 = es.submit(this::subscribe);

        // Subscribe側が起動しきるまで、待機
        TimeUnit.SECONDS.sleep(2L);

        // Publish
        publish(messages.toArray(new String[messages.size()]));

        assertThat(f1.get())
                .containsExactlyElementsOf(messages);
        assertThat(f2.get())
                .containsExactlyElementsOf(messages);

        es.shutdown();
        es.awaitTermination(5L, TimeUnit.SECONDS);
    }

    // ここに、Publish/Subscribeのコードを書く!
}

先にConsumerを2つ起動し、それからProducerにメッセージを投げてもらいます。投げるメッセージは2つで、それぞれ「Hello World!」と「Hello RabbitMQ!!」です。2つのComsumerからは、この両方のメッセージが受け取れているかどうか確認します。Comsumer側は、スレッドを2つ使って2つ分のComsumerがいることを表現します。

なお、ちょっと待機時間が入っているのは、Consumerが起動しきる前にProducerがメッセージを投げてしまうと、Consumerがいない状態のメッセージは破棄されてしまうからです…。

送信側(Producer)

Producer側のコードは、こんな感じ。

    private void publish(String... messages) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("kazuhira");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("logs", "fanout");

        Arrays.asList(messages).forEach(message -> {
            try {
                channel.basicPublish("logs", "", null, message.getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

        channel.close();
        connection.close();
    }

基本的にはキューを直接使っていた時と変わりませんが、変化があったのはここ。

        channel.exchangeDeclare("logs", "fanout");

Channel#exchangeDeclareを使用します。ここでは「logs」というのはExchangeの名前を指しますが、「fanout」というのがExchangeの種類を指します。

Exchangeの種類は「direct」、「topic」、「headers」、「fanout」のいずれかを使用できます。今回使用している「fanout」はとても単純なもので、すべてのキューにメッセージをブロードキャストするものだとか。

ブロードキャスト…?

RabbitMQ - AMQP 0-9-1 Model Explained

名前なしのExchange

利用可能なExchangeは、rabbitmqctlコマンドで確認することができます。

# sudo -u rabbitmq rabbitmqctl list_exchanges
Listing exchanges ...
amq.headers	headers
amq.topic	topic
	direct
amq.rabbitmq.log	topic
amq.fanout	fanout
amq.rabbitmq.trace	topic
amq.direct	direct
amq.match	headers

この「amq.*」となっているExchangeは、デフォルトの(名前なしの)Exchangeで作成すればすぐに使えるようになっています。

これまでのチュートリアルではExchangeについては意識しておらず、Channel#basicPublishを呼び出す時に、第1引数を空文字にしていました。

channel.basicPublish("", "hello", null, message.getBytes());

第1引数はExchangeの名前を表すらしく、ここで空文字を指定したということは名前なしのExchangeを使用していることになるようです。


そして、今回の例ではExchangeの名前を指定しています、と。

        channel.exchangeDeclare("logs", "fanout");

受信側(Comsumer)

Comsumer側のコードは、こんな感じになりました。

    private List<String> subscribe() throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("kazuhira");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("logs", "fanout");
        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, "logs", "");

        List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>());

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) {
                receivedMessages.add(new String(bytes, StandardCharsets.UTF_8));
            }
        };

        channel.basicConsume(queueName, true, consumer);

        TimeUnit.SECONDS.sleep(10L);

        return receivedMessages;
    }

Consumer側の特徴は、Channel#queueDeclare、queueBindの使用でしょうか。

単純なメッセージ送信、受信のコードでは、キューの名前を指定してConsumerの実装を紐づけていましたが、今回はProducer側でキューの作成をしていません。とはいえ、Channel#basicConsumerでの登録時にはキューの名前が必要になります。

そこで、引数なしのChannel#queueDeclareを呼び出すことで、ランダムな名前の空で、新しいキューを作成します。このキューは、クライアントが切断すると削除されます。
non-durable、exclusive、autodeleteなキューとなるそうな。

        channel.exchangeDeclare("logs", "fanout");
        String queueName = channel.queueDeclare().getQueue();

ランダムなキューの名前のサンプルは、「amq.gen-JzTY20BRgKO-HjmUJj0wLg」といった感じです。

そして、キューへのバインドを行います。この時に、先ほど取得したキューの名前、そしてExchangeの名前を使用します。

        channel.queueBind(queueName, "logs", "");

なお、現在のバインドされている状態は、「rabbitmqctl list_bindings」で確認することができます。

# sudo -u rabbitmq rabbitmqctl list_bindings
Listing bindings ...
	exchange	amq.gen-5rTz4mjD2G09MUACN_QIhw	queue	amq.gen-5rTz4mjD2G09MUACN_QIhw	[]
	exchange	amq.gen-x69BVWFhg5UPtTYJgtlg5A	queue	amq.gen-x69BVWFhg5UPtTYJgtlg5A	[]
logs	exchange	amq.gen-5rTz4mjD2G09MUACN_QIhw	queue		[]
logs	exchange	amq.gen-x69BVWFhg5UPtTYJgtlg5A	queue		[]

あとは、Consumerの実装を作成してChannel#basicConsumeで登録すると、Producerのメッセージ送信に反応して受信が行われます。

        channel.basicConsume(queueName, true, consumer);

まとめ

今回は、RabbitMQを使用してPublish/Subscribeについて書いてみました。

Exchangeが出てきたりと、新しいことも学びましたが、Exchangeの種類やキューへのバインドについてはちょっと理解が怪しいです。もう少し進めながら、ちょっとずつわかるといいなーという感じですね。

2016-06-18

Payara MicroでHazelcastの設定をする

Payara Microで、「--hzConfigFile」という起動オプションを使用することで、Hazelcastの設定ファイルを与えることができるようです。

なんとなくオプションが追加されていたのは気付いていましたが、これが使えるようになったのはPayara 4.1.153からっぽいですね。

Payara Micro Command Line Options

Payara 4.1.153以前はPayara MicroではHazelcastの設定に付いては、マルチキャストアドレス/ポート、コミュニケーションポートの開始ポート、そもそもクラスタを無効化するくらいが指定できたようですが、設定ファイルが使えるとだいぶ設定の幅が広がりますね。

せっかくなので、ちょっと試してみました。

準備

まずは、動作対象となるアプリケーションを作ってみます。

pom.xmlは、こんな感じに。
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.littlewings</groupId>
    <artifactId>payara-micro-hazelcast-configuration</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <failOnMissingWebXml>false</failOnMissingWebXml>
    </properties>

    <build>
        <finalName>app</finalName>
    </build>

    <dependencies>
        <dependency>
            <groupId>fish.payara.extras</groupId>
            <artifactId>payara-micro</artifactId>
            <version>4.1.1.162</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>

アプリケーションとしては、HttpSessionを使った簡単なJAX-RS APIを用意します。
src/main/java/org/littlewings/payara/rest/JaxrsApplication.java

package org.littlewings.payara.rest;

import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;

@ApplicationPath("rest")
public class JaxrsApplication extends Application {
}

src/main/java/org/littlewings/payara/rest/HelloWorldResource.java

package org.littlewings.payara.rest;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;

@Path("helloworld")
public class HelloWorldResource {
    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String helloWorld(@Context HttpServletRequest request) {
        HttpSession session = request.getSession();

        String time = (String) session.getAttribute("time");
        if (time == null) {
            LocalDateTime now = LocalDateTime.now();
            time = now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            session.setAttribute("time", time);
        }

        return time;
    }
}

web.xmlには、distributableを付けておきます。
src/main/webapp/WEB-INF/web.xml

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
         version="3.1">
    <distributable/>
</web-app>

で、アプリケーションをビルドしておきます。

$ mvn package

以降、このアプリケーションを起動する時に、Payara MicroにデプロイしつつHazelcastの設定を変えていってみます。

今回利用するPayara MicroのJARファイルは、payara-micro-4.1.1.162.jarとなります。

Payara Server & Payara Micro - Downloads

また、今回はPayara Microで起動するアプリケーションは、Node 2つでクラスタを組むものとします。2つ目のNodeのHTTPリッスンポートは、9080とします。

起動時には、以下のコマンドで実行することを前提とします。以降では、以下のコマンドの[Hazelcastの設定ファイル]の部分に該当するファイルを作成、指定して実行していきます。

## Node 1
$ java -jar payara-micro-4.1.1.162.jar --deploy target/app.war --hzConfigFile [Hazelcastの設定ファイル]

## Node 2
$ java -jar ../payara-micro-4.1.1.162.jar --deploy target/app.war --port 9080 --hzConfigFile [Hazelcastの設定ファイル]

HttpSessionが共有されているかどうかの動作確認には、以下のコマンドを使っています。

## Node 1
$ curl -b cookie.txt -c cookie.txt http://localhost:8080/app/rest/helloworld
2016-06-18 21:18:28

## Node 2
$ curl -b cookie.txt -c cookie.txt http://localhost:9080/app/rest/helloworld
2016-06-18 21:18:28

HttpSessionが共有されていれば、同じ日時が返ってきます、と。

グループ名を変えてみる

まず、設定ファイルの内容が反映されているか確認するために、グループ名を変えてみることにします。ここでいうグループ名とは、Hazelcastがクラスタを構成するためのグループで、Hazelcastクラスタはひとつのグループに属することになります。

Creating Cluster Groups

最初は、設定なしの状態で起動してみましょう。

$ java -jar payara-micro-4.1.1.162.jar --deploy target/app.war

起動時に、Hazelcastクラスタに属するメンバーが以下のように出力されます。

[2016-06-18T20:27:17.624+0900] [Payara Micro 4.1] [INFO] [] [com.hazelcast.cluster.impl.MulticastJoiner] [tid: _ThreadID=1 _ThreadName=main] [timeMillis: 1466249237624] [levelValue: 800] [[
  [172.17.0.1]:5900 [development] [3.6.2] 


Members [1] {
	Member [172.17.0.1]:5900 this
}
]]

ここで、次のような設定ファイルを用意します。groupタグの中が、グループの設定です。クラスタグループ名は、「my-cluster」にしました。
注)マルチキャスト有効化の設定が入っていますが、これを書かないとクラスタ構成自体も無効化されてしまうようなので、設定ファイルを与える場合は明示する必要があるようです
hazelcast-custom-group.xml

<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.6.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <group>
        <name>my-cluster</name>
        <password>password</password>
    </group>

    <network>
        <join>
            <multicast enabled="true"/>
        </join>
    </network>
</hazelcast>

では、起動してみます。

$ java -jar ../payara-micro-4.1.1.162.jar --deploy target/app.war --hzConfigFile hazelcast-custom-group.xml

起動時に認識したメンバーの出力が、以下のようになりました。

[2016-06-18T20:34:14.325+0900] [Payara Micro 4.1] [INFO] [] [com.hazelcast.cluster.impl.MulticastJoiner] [tid: _ThreadID=1 _ThreadName=main] [timeMillis: 1466249654325] [levelValue: 800] [[
  [172.17.0.1]:5701 [my-cluster] [3.6.2] 


Members [1] {
	Member [172.17.0.1]:5701 this
}
]]

※コミュニケーションポートの設定がHazelcastのデフォルトに戻っているので、5701に…

ちゃんと「my-cluster」になっていますね。

  [172.17.0.1]:5701 [my-cluster] [3.6.2] 

続いて、他の設定も変えていってみましょう。

マルチキャストを無効化する

(デフォルトの)Hazelcastはマルチキャストを使ってクラスタを構成しますが(Node Discovery)、開発環境等でこれを無効化するには、ネットワーク設定でマルチキャストを使わないようにします。

Join

hazelcast-disable-mc.xml

<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.6.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <network>
        <join>
            <multicast enabled="false"/>
        </join>
    </network>
</hazelcast>

これで、スタンドアロンになります。

まあ、Payara Microの場合はHazelcastの設定ファイルを与えてしまうと、マルチキャストを明示的に有効化しないとスタンドアロンになるみたいですが…。

ちなみに、単にクラスタを無効化したいのであれば、Hazelcastの設定ファイルを与えずとも「--noCluster」を使えばOKです。起動もちょこっと速くなりますね。

$ java -jar ../payara-micro-4.1.1.162.jar --deploy target/app.war --noCluster

この方法でもクラスタは構成しなくなります。

TCPでクラスタを構成する

Payara Micro+Hazelcastのデフォルトのクラスタ構成(Node Discovery)はマルチキャストですが、これをTCPに変更してみます。

Join

設定ファイルの例は、以下のとおり。
hazelcast-enable-tcp.xml

<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.6.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <network>
        <port auto-increment="true">6000</port>
        <join>
            <multicast enabled="false"/>
            <tcp-ip enabled="true">
                <member>localhost</member>
            </tcp-ip>
        </join>
    </network>
</hazelcast>

今回はわかりやすいように、コミュニケーションポートも6000に変えてみました(Payara Microのデフォルトだと5900)。また、localhostの範囲でクラスタを作るようにしています。

これで、TCPでHazelcastクラスタが構成されます。

[2016-06-18T20:43:26.613+0900] [Payara Micro 4.1] [INFO] [] [com.hazelcast.cluster.ClusterService] [tid: _ThreadID=47 _ThreadName=hz._hzInstance_1_dev.generic-operation.thread-0] [timeMillis: 1466250206613] [levelValue: 800] [[
  [localhost]:6000 [dev] [3.6.2] 

Members [2] {
	Member [localhost]:6000 this
	Member [localhost]:6001
}
]]

Mapの設定をしてみる

続いて、Payara MicroがHttpSessionのデータ保存に利用する、Distributed Mapの設定をしてみましょう。

Map

今回は、Distributed Map名「/app」に対して、いろいろ設定してみました。
hazelcast-custom-map.xml

<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.6.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <network>
        <join>
            <multicast enabled="true"/>
        </join>
    </network>

    <map name="/app">
        <in-memory-format>BINARY</in-memory-format>
        <backup-count>0</backup-count>
        <async-backup-count>1</async-backup-count>

        <near-cache name="default">
            <in-memory-format>BINARY</in-memory-format>
            <max-size>100</max-size>
            <time-to-live-seconds>600</time-to-live-seconds>
            <max-idle-seconds>60</max-idle-seconds>
            <invalidate-on-change>true</invalidate-on-change>
            <cache-local-entries>false</cache-local-entries>
        </near-cache>
    </map>
</hazelcast>

ムダにNear Cacheを使ってみたり、バックアップは非同期で作成するような設定にしています。

なんで名前が「/app」かなのですが、WARファイル名前というか、コンテキストパスに依存しています?

今回のWARファイルの名前は、「app.war」です。
※pom.xmlより

    <build>
        <finalName>app</finalName>
    </build>

なお、「default」という特殊な名前のDistributed Mapの設定を行うと、他に作成されるDistributed Mapにも設定が引き継がれるので注意しておきましょう。

    <map name="default">

明示的に名前を付けて設定を定義した場合は、「default」には引きずられないようです。

Hazelcastの設定をしてみよう - CLOVER

Mananagement Centerを使う

最後は、Hazelcastが提供するManagement Centerと接続するように設定してみましょう。

Management Center

このManagement Centerを使うと、Hazelcastクラスタのモニタリングができるようになります。
※Payara自体がどの程度Hazelcastの状態を見れるかはわかっていません、スミマセン…

Management Centerを使うには、まずHazelcastのダウンロードページよりHazelcastのディストリビューションをダウンロードしてくる必要があります。

Download - Hazelcast - The Leading Open Source In-Memory Data Grid

今回は、Payaraが依存しているHazelcast 3.6.2のtar.gzファイルをダウンロードしました。

展開。

$ tar -zxvf hazelcast-3.6.2.tar.gz

ll hazelcast-3.6.2/mancenter
合計 34924
drwxr-xr-x 2 xxxxx xxxxx     4096  618 19:51 ./
drwxr-xr-x 9 xxxxx xxxxx     4096  618 19:51 ../
-rw-r--r-- 1 xxxxx xxxxx 35743282  618 19:51 mancenter-3.6.2.war
-rw-r--r-- 1 xxxxx xxxxx      453  618 19:51 startManCenter.bat
-rw-r--r-- 1 xxxxx xxxxx      306  618 19:51 startManCenter.sh

このManagement Center、単独でも実行可能WARとして起動できるのですが、今回はこれもPayara Microにデプロイしてみます。

クラスタを構成する必要はないので、「--noCluster」を付与します。また、HTTPリッスンポートは20000とします。

$ java -jar payara-micro-4.1.1.162.jar --deploy hazelcast-3.6.2/mancenter/mancenter-3.6.2.war --noCluster --port 20000

この設定で、「http://localhost:20000/mancenter-3.6.2」にアクセスすると以下のような画面が表示されます。
f:id:Kazuhira:20160618205749p:image

デフォルトだと、そのまま「login」でログインできます。

まだクラスタから情報が届いていないので、Hazelcastクラスタに関する情報はありません。
f:id:Kazuhira:20160618210237p:image

それでは、アプリケーション側のHazelcastの設定を行います。
hazelcast-mancenter.xml

<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.6.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <network>
        <join>
            <multicast enabled="true"/>
        </join>
    </network>

    <management-center enabled="true" update-interval="3">http://localhost:20000/mancenter-3.6.2</management-center>
</hazelcast>

「management-center」タグを使うことで、Management Centerへの接続設定を行うことができます。

今回は、「http://localhost:20000/mancenter-3.6.2」へ3秒に1度情報を更新する設定になっています。

Distributed Mapなどの設定は、デフォルトです。

ここで、2 Node起動してみます。すると、Management Centerの方でHazelcastクラスタを検知しているのでManagement Centerの開いているブラウザをリロードします。
f:id:Kazuhira:20160618210239p:image

ここで、このまま「Connect」ボタンを押すと、Hazelcastクラスタの情報が参照できます。
f:id:Kazuhira:20160618210240p:image

「/app」というMapsへのリンクがあるので、こちらを見てみます。
f:id:Kazuhira:20160618210242p:image

その後でcurlでアプリケーションに対してアクセスしてみます。

## to Node 1
curl -b cookie.txt -c cookie.txt http://localhost:8080/app/rest/helloworld

## to Node 2
curl -b cookie.txt -c cookie.txt http://localhost:9080/app/rest/helloworld

すると、「/app」Map側に情報が反映されます。
f:id:Kazuhira:20160618210243p:image

とまあ、こんな感じにPayara Microが使っているHttpSession(Distributed Map)の情報が見れました、と。

まとめ

Payara Microで起動するアプリケーションに対して、Hazelcastの設定ファイルを与えていくつか挙動を変えてみました。

Management CenterなどはちょっとHazelcastの製品寄りの話な気もしますが…あと、Payara自体の管理機能は見ていませんが…。

まあ、Payara Microを使ってもある程度Hazelcastの設定を制御できそうなことがわかってよかったです。

2016-06-16

Apache Spark(DataFrame API/Spark SQL)で、MySQLのデータを読み書きする

Apache SparkのDataFrame API、Spark SQLで、通常のJDBCアクセス可能なデータベースに対しても操作ができそうな感じだったので、ちょっと試してみました。



Thrift JDBC/ODBC serverを使わない?パターンっぽいですが…。

JDBC To Other Databases

お題

まず、対象のデータベースはMySQLとします。また、データのテーマは書籍で、bookテーブルとします。

このテーブルに対して、Apache Sparkでデータを読み書きしてみましょう。

テーブル定義は、こちら。

CREATE TABLE book(
  isbn VARCHAR(14),
  title VARCHAR(255),
  price INT,
  PRIMARY KEY(isbn)
);

準備

ビルド定義は、このようにしました。
build.sbt

name := "spark-sql-mysql"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.8"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

fork in Test := false

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided",
  "mysql" % "mysql-connector-java" % "6.0.2" % "runtime",
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

Spark SQLは、providedにしておきました。また、テストコードで確認しようかなと思い、ScalaTestを加えています。

Case Classとテストコードの雛形

今回は、データを扱うのにCase Classを使うことにしました。

簡単にですが、Bookクラスを定義。
src/test/scala/org/littlewings/spark/mysql/Book.scala

package org.littlewings.spark.mysql

case class Book(isbn: String, title: String, price: Int)

テストコードの雛形は、こんな感じにしました。お題の書籍データは、テストコード内に持っておきます。
src/test/scala/org/littlewings/spark/mysql/SparkSqlMySqlSpec.scala

package org.littlewings.spark.mysql

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{FunSpec, Matchers}

class SparkSqlMySqlSpec extends FunSpec with Matchers {
  val javaeeBook: Book = Book("978-4774127804", "Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava", 4200)
  val springBootBook: Book = Book("978-4777518654", "はじめてのSpring Boot―「Spring Framework」で簡単Javaアプリ開発", 2500)
  val sparkBook: Book = Book("978-4774181240", "詳解 Apache Spark", 3888)

  val books: Array[Book] = Array(javaeeBook, springBootBook, sparkBook)

  describe("Spark SQL MySQL Spec") {
    // ここに、テストを書く!
  }
}

ここに、テストコードを埋めていきます。

データを保存する

それでは、まずはデータを保存してみましょう。

できあがったコードは、このようになりました。

    it("write to MySQL, from DataFrame") {
      val conf =
        new SparkConf()
          .setAppName("Spark SQL from MySQL")
          .setMaster("local[*]")

      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)

      sqlContext.sql(
        """|CREATE TEMPORARY TABLE book
          |USING jdbc
          |OPTIONS (
          | url 'jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false',
          | user 'kazuhira',
          | password 'password',
          | dbtable 'book'
          |)""".stripMargin)

      val bookDf = sqlContext.createDataFrame(books)
      bookDf.write.insertInto("book")

      sc.stop()
    }

最初にCREATE TEMPORARY TABLEする必要があるようです。OPTIONSでは、JDBCの接続URLやユーザー名、パスワードなどのJDBC接続関連のプロパティを設定します。また、dbtableというパラメーターも設定します。

      sqlContext.sql(
        """|CREATE TEMPORARY TABLE book
          |USING jdbc
          |OPTIONS (
          | url 'jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false',
          | user 'kazuhira',
          | password 'password',
          | dbtable 'book'
          |)""".stripMargin)

この状態のSQLContextに対して、あらかじめArrayとして作成しておいた書籍データをDataFrameに変換して保存します。

      val bookDf = sqlContext.createDataFrame(books)
      bookDf.write.insertInto("book")

これでなんと、MySQLにデータが保存できました。

mysql> SELECT * FROM book;
+----------------+---------------------------------------------------------------------------------+-------+
| isbn           | title                                                                           | price |
+----------------+---------------------------------------------------------------------------------+-------+
| 978-4774127804 | Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava                     |  4200 |
| 978-4774181240 | 詳解 Apache Spark                                                               |  3888 |
| 978-4777518654 | はじめてのSpring Boot―「Spring Framework」で簡単Javaアプリ開発                  |  2500 |
+----------------+---------------------------------------------------------------------------------+-------+
3 rows in set (0.00 sec)

DataFrame APIでデータを読み出す

では、今度は保存したデータを読み出してみましょう。ここでは、DataFrame APIを使います。

できあがったコードは、こんな感じ。

    it("read from MySQL, as DataFrame") {
      val conf =
        new SparkConf()
          .setAppName("Spark SQL from MySQL")
          .setMaster("local[*]")

      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)

      import sqlContext.implicits._

      val options = Map(
        "url" -> "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false",
        "user" -> "kazuhira",
        "password" -> "password",
        "dbtable" -> "book"
      )

      val jdbcDf = sqlContext.read.format("jdbc").options(options).load()
      val resultBooks =
        jdbcDf
          .orderBy('price.asc)
          .map(row => Book(row.getString(0), row.getString(1), row.getInt(2)))
          .collect()

      resultBooks should have size(3)
      resultBooks(0) should be(springBootBook)
      resultBooks(1) should be(sparkBook)
      resultBooks(2) should be(javaeeBook)

      sc.stop()
    }

先ほどと変わったところは、SQLContext#read#formatで「jdbc」を指定し、そのあとoptionsに接続に関する情報はMapとして渡して接続します。こちらの場合は、CREATE TEMPORARY TABLEしなくても、このコードで動きました…。

      val options = Map(
        "url" -> "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false",
        "user" -> "kazuhira",
        "password" -> "password",
        "dbtable" -> "book"
      )

      val jdbcDf = sqlContext.read.format("jdbc").options(options).load()

あとは、DataFrame APIに沿って操作すればOKっぽいです。

      val resultBooks =
        jdbcDf
          .orderBy('price.asc)
          .map(row => Book(row.getString(0), row.getString(1), row.getInt(2)))
          .collect()

WHERE句を使ったりもできます。

      val jdbcDf = sqlContext.read.format("jdbc").options(options).load()
      val resultBooks =
        jdbcDf
          .where('price >= 4000)
          .orderBy('price.asc)
          .map(row => Book(row.getString(0), row.getString(1), row.getInt(2)))
          .collect()

      resultBooks should have size(1)
      resultBooks(0) should be(javaeeBook)

Spark SQLでデータを読み出す

最後は、Spark SQLで書いてみます。

コードは、こんな感じになりました。

    it("read from MySQL, as SQL") {
      val conf =
        new SparkConf()
          .setAppName("Spark SQL from MySQL")
          .setMaster("local[*]")

      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)

      val options = Map(
        "url" -> "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&characterSetResults=utf-8&useServerPrepStmts=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false",
        "user" -> "kazuhira",
        "password" -> "password",
        "dbtable" -> "book"
      )

      val jdbcDf = sqlContext.read.format("jdbc").options(options).load()
      jdbcDf.registerTempTable("book")

      val selectDf =
        sqlContext.sql("SELECT isbn, title, price FROM book ORDER BY price ASC")

      val resultBooks =
        selectDf
          .map(row => Book(row.getString(0), row.getString(1), row.getInt(2)))
          .collect()

      resultBooks should have size(3)
      resultBooks(0) should be(springBootBook)
      resultBooks(1) should be(sparkBook)
      resultBooks(2) should be(javaeeBook)

      sc.stop()
    }

接続あたりまではDataFrame APIの時とかなり近いですが、こちらの場合はregisterTempTableを行っておく必要があります。

      jdbcDf.registerTempTable("book")

あとはSQLContext#sqlでクエリが投げられるので、ここからDataFrameを取得、操作します。

      val selectDf =
        sqlContext.sql("SELECT isbn, title, price FROM book ORDER BY price ASC")

      val resultBooks =
        selectDf
          .map(row => Book(row.getString(0), row.getString(1), row.getInt(2)))
          .collect()

WHERE句を使っても大丈夫。

      val jdbcDf = sqlContext.read.format("jdbc").options(options).load()
      jdbcDf.registerTempTable("book")

      val selectDf =
        sqlContext.sql("SELECT isbn, title, price FROM book WHERE price >= 4000 ORDER BY price ASC")

      val resultBooks =
        selectDf
          .map(row => Book(row.getString(0), row.getString(1), row.getInt(2)))
          .collect()

      resultBooks should have size(1)
      resultBooks(0) should be(javaeeBook)

まとめ

JDBC接続可能なデータベース(今回はMySQL)に対して、Apache Sparkで接続してDataFrame API/Spark SQLでいろいろ操作しました。

あんまりふつうのRDBMSにつなげるような例を見かけなかったので、動かすまでにはそれなりに苦労しましたが、まあなんとかなってよかったです。

今回のコードを書くにあたっては、最初に載せたオフィシャルのドキュメント以外に、以下のページを参考にしています。

Apache Spark の JdbcRDD を使ってみた結果 - Qiita

Spark SQL MySQL Example with JDBC