Hatena::ブログ(Diary)

CLOVER

2017-07-23

Infinispan 9.1で追加された、Scattered Cacheを試す

先日、Infinispan 9.1.0.Finalがリリースされました。

Infinispan: Infinispan 9.1 ”Bastille”

新機能もいくつか増えていますが、今回は新しいCache、「Scattered Cache」を試してみたいと思います。

Scattered cache

A new clustered cache, similar to a distributed cache, but with a higher write throughput.

http://blog.infinispan.org/2017/07/infinispan-91-bastille.html

Scattered Cacheとは?

ドキュメントについては、こちら。

Scattered Mode

ざっと抜き出すと、こういう性格みたいです

  • クラスタリングすることで線形にスケールするCacheで、Distributed Cacheによく似ている
  • Distributed Cacheとは異なり、データの位置は固定されていない
  • Consistent Hashアルゴリズムを使用して、Primary Ownerを決定する
  • バックアップは、前回データを書き込んだNodeに格納される
  • バックアップの正確な位置は重要ではない(というか、わからない?)

バックアップについては、いまひとつ掴めないような…。

  • Distributed Cacheと異なり、書き込みが単一のRPCとなる利点がある
  • 読み取りは、常にPrimary Ownerを対象にする必要がある
  • 結果、書き込みは速くなり、読み込みは遅くなる可能性がある

つまり、書き込みが集中するアプリケーションに適したCacheですと。

  • 複数のバックアップコピーを保持した場合、メモリの消費量が増加する
  • 期限切れのバックアップコピーを削除するため、無効化するためのメッセージをクラスタ内にブロードキャストするが、オーバーヘッドがある
  • とても大きなクラスタだと、パフォーマンスが低下する

バックアップがあちこちにできそうな感じですね、それを無効化するメッセージをブロードキャストしてデータの
状態を保つ、と。

  • NodeがクラッシュしてPrimaryが失われた場合は、クラスタ内のバックアッププロセスを調整して、最後に書き込まれたバックアップコピーを探す
  • このため、ネットワークトラフィックが増加する
  • データライターもバックアップであるため、トランスポートレベルでmachine/rack/siteのIDを指定しても、同じmachine/rack/siteで複数の障害が発生した場合にクラスタを回復することができない
  • トランザクションと非同期レプリケーションはサポートされていない

といった感じです。

なんか、バックアップがあちこちにありそうなイメージの説明です。で、それを無効化する…と。

まあ、説明の読み解きはこのくらいにして、使ってみましょう。

準備

sbtでの依存関係の定義は、こちら。

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "9.1.0.Final" % Compile,
  "net.jcip" % "jcip-annotations" % "1.0" % Provided,
  "org.scalatest" %% "scalatest" % "3.0.3" % Test
)

ScalaTestは、テストコード用です。

Infinispanの設定ファイルは、こちら。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:9.1 http://www.infinispan.org/schemas/infinispan-config-9.1.xsd"
        xmlns="urn:infinispan:config:9.1">
    <jgroups>
        <stack-file name="udp" path="default-configs/default-jgroups-udp.xml"/>
    </jgroups>
    <cache-container>
        <jmx duplicate-domains="true"/>
        <transport cluster="test-cluster" stack="udp"/>

        <!-- あとで -->

    </cache-container>
</infinispan>

テストコードの雛形

テストコードを書くにあたり、こんな雛形を用意。
src/test/scala/org/littlewings/infinispan/scattered/ScatteredCacheSpec.scala

package org.littlewings.infinispan.scattered

import java.util.concurrent.TimeUnit

import org.infinispan.Cache
import org.infinispan.commons.util.Util
import org.infinispan.distribution.group.impl.PartitionerConsistentHash
import org.infinispan.manager.{DefaultCacheManager, EmbeddedCacheManager}
import org.infinispan.util.function.SerializableToIntFunction
import org.scalatest.{FunSuite, Matchers}

class ScatteredCacheSpec extends FunSuite with Matchers {
  // あとでテストを書く!

  def withCache[K, V](cacheName: String, numInstances: Int = 1)(fun: Cache[K, V] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml"))

    managers.foreach(_.getCache[K, V](cacheName))

    try {
      val cache = managers(0).getCache[K, V](cacheName)

      fun(cache)
    } finally {
      managers.foreach(_.stop())
    }
  }

  def withCacheWithManagers[K, V](cacheName: String, numInstances: Int = 1)(fun: (Cache[K, V], IndexedSeq[EmbeddedCacheManager]) => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml"))

    managers.foreach(_.getCache[K, V](cacheName))

    try {
      val cache = managers(0).getCache[K, V](cacheName)

      fun(cache, managers)
    } finally {
      managers.foreach(_.stop())
    }
  }
}

簡単にクラスタを構成できるヘルパーメソッド付きですが、クラスタを構成する各EmbeddedCacheManagerを取得できるパターンも用意。

使ってみる

それでは、Scattered Cacheを使ってみます。

まずは、設定ファイルに最小構成で定義。

        <scattered-cache name="simpleScatteredCache"/>

Scattered Cacheの設定項目自体は、こちらを見るとよいでしょう。

urn:infinispan:config:9.1

とりあえず、Getting Started的な。

  test("simple use Scattered Cache") {
    withCache[String, String]("simpleScatteredCache", 3) { cache =>
      (1 to 10).foreach(i => cache.put(s"key$i", s"value$i"))

      (1 to 10).foreach(i => cache.get(s"key$i") should be(s"value$i"))

      cache should have size (10)
    }
  }

ふつーのCacheですね。

クラスタ内のNode数は、3としています。

    withCache[String, String]("simpleScatteredCache", 3) { cache =>

少し、設定まわりを見てみましょう。

  test("Scattered Cache, default configuration") {
    withCache[String, String]("simpleScatteredCache", 3) { cache =>
      (1 to 10).foreach(i => cache.put(s"key$i", s"value$i"))

      val dm = cache.getAdvancedCache.getDistributionManager
      dm.getReadConsistentHash.toString should startWith("PartitionerConsistentHash:ScatteredConsistentHash{ns=")
      dm.getWriteConsistentHash.toString should startWith("PartitionerConsistentHash:ScatteredConsistentHash{ns=")

      dm.getReadConsistentHash.getNumSegments should be(256)
      dm.getWriteConsistentHash.getNumSegments should be(256)

      cache.getCacheConfiguration.clustering.hash.numOwners should be(1)

      val cacheTopology = dm.getCacheTopology

      (1 to 10).foreach { i =>
        val distributionInfo = cacheTopology.getDistribution(s"key$i")
        distributionInfo.writeOwners should have size (1)
        distributionInfo.writeBackups should be(empty)
      }
    }
  }

ConsistentHashとしては、PartitionerConsistentHash/ScatteredConsistentHashという階層になっているようです。

Segment数は、256。

      dm.getReadConsistentHash.getNumSegments should be(256)
      dm.getWriteConsistentHash.getNumSegments should be(256)

データのオーナー数は、なんと1です。

      cache.getCacheConfiguration.clustering.hash.numOwners should be(1)

変更することもできません。これで、バックアップ大丈夫?とか思うのですが、これでも大丈夫です。

この状態なので、Owner数は1、バックアップ先は0となっていますけれど…。

      val cacheTopology = dm.getCacheTopology

      (1 to 10).foreach { i =>
        val distributionInfo = cacheTopology.getDistribution(s"key$i")
        distributionInfo.writeOwners should have size (1)
        distributionInfo.writeBackups should be(empty)
      }

なので、Nodeダウンの時の挙動が気になるわけですが

  test("Scattered Cache, node down") {
    withCacheWithManagers[String, String]("simpleScatteredCache", 3) { (cache, managers) =>
      (1 to 100).foreach(i => cache.put(s"key$i", s"value$i"))
      cache should have size (100)

      val anotherCache = managers(1).getCache[String, String]("simpleScatteredCache")
      anotherCache should have size (100)

      cache.getCacheManager.stop()
      TimeUnit.SECONDS.sleep(3L)
      anotherCache should have size (100)

      managers(2).stop()
      TimeUnit.SECONDS.sleep(3L)
      anotherCache should have size (100)

      anotherCache.stop()
    }
  }

急激に多重障害でも起こさなければ、Distributed Cacheと同じようにバックアップから復旧してくれるようです。

データの配置状況を確認してみましょう。

  test("Scattered Cache, data distribution") {
    withCache[String, String]("simpleScatteredCache", 3) { cache =>
      (1 to 10).foreach(i => cache.put(s"key$i", s"value$i"))

      val self = cache.getCacheManager.getAddress
      val dm = cache.getAdvancedCache.getDistributionManager
      val cacheTopology = dm.getCacheTopology

      println(s"self = $self")
      (1 to 10).foreach(i => println(cacheTopology.getDistribution(s"key$i").primary()))

      (1 to 10).foreach(i => cache.put(s"key$i", s"value2-$i"))

      println(s"self = $self")
      (1 to 10).foreach(i => println(cacheTopology.getDistribution(s"key$i").primary()))
    }
  }

1回、途中でputし直しています。

結果。なお、selfというのはLocal Nodeです。

## 1回目
self = xxxxx-61451
xxxxx-61451
xxxxx-61451
xxxxx-61451
xxxxx-16211
xxxxx-3970
xxxxx-61451
xxxxx-16211
xxxxx-61451
xxxxx-3970
xxxxx-3970


## 2回目
self = xxxxx-61451
xxxxx-61451
xxxxx-61451
xxxxx-61451
xxxxx-16211
xxxxx-3970
xxxxx-61451
xxxxx-16211
xxxxx-61451
xxxxx-3970
xxxxx-3970

やっぱり、バックアップ先が表示されませんけどね…。

Consistent Hashで、キーから配置先のSegmentを算出していることの確認。

  test("Scattered Cache, hash segment") {
    withCache[String, String]("simpleScatteredCache", 3) { cache =>
      (1 to 10).foreach(i => cache.put(s"key$i", s"value$i"))

      val dm = cache.getAdvancedCache.getDistributionManager
      val readConHash = dm.getReadConsistentHash.asInstanceOf[PartitionerConsistentHash]
      val writeConHash = dm.getWriteConsistentHash.asInstanceOf[PartitionerConsistentHash]

      val readSegmentSize = readConHash.getNumSegments
      val readHash = readConHash.getHashFunction
      val writeSegmentSize = writeConHash.getNumSegments
      val writeHash = writeConHash.getHashFunction

      (1 to 10).foreach { i =>
        val key = s"key$i"
        ((readHash.hash(key) & Integer.MAX_VALUE) / Util.getSegmentSize(readSegmentSize)) should be(readConHash.getSegment(key))
        ((writeHash.hash(key) & Integer.MAX_VALUE) / Util.getSegmentSize(writeSegmentSize)) should be(writeConHash.getSegment(key))
      }
    }
  }

Distributed Stream APIの使用も可能です。

  test("Scattered Cache, stream api") {
    withCache[String, String]("simpleScatteredCache", 3) { cache =>
      (1 to 10).foreach(i => cache.put(s"key$i", s"value$i"))

      val sum =
        cache
          .values
          .stream
          .mapToInt(new SerializableToIntFunction[String] {
            override def applyAsInt(value: String): Int =
              Integer.parseInt(value.replace("value", ""))
          })
          .sum

      sum should be(55)
    }
  }

データの配置をコントロールしてみましょう。

デフォルトだと、HashFunctionPartitionerが使用されます。

なので、データの配置状況を確認すると

  test("Scattered Cache, non key partitioner") {
    withCache[String, String]("simpleScatteredCache", 3) { cache =>
      (1 to 10).foreach { i =>
        val partition = (i % 3) + 1
        cache.put(s"key$partition-$i", s"value$partition-$i")
      }

      val dm = cache.getAdvancedCache.getDistributionManager
      val cacheTopology = dm.getCacheTopology

      val keys = cache.keySet

      keys.forEach { key =>
        println(s"key => $key")
        println(cacheTopology.getWriteOwners(key))
      }
    }
  }

まあ、バラバラになります。
※グルーピングのために、「keyN-M」としています

key => key3-5
[xxxxx-41610]
key => key3-8
[xxxxx-41610]
key => key3-2
[xxxxx-41610]
key => key1-9
[xxxxx-41610]
key => key2-7
[xxxxx-41610]
key => key2-4
[xxxxx-25138]
key => key2-10
[xxxxx-36570]
key => key1-3
[xxxxx-36570]
key => key1-6
[xxxxx-36570]
key => key2-1
[xxxxx-36570]

ここで、KeyPartitionerを作成してみます。内容は適当です。
src/test/scala/org/littlewings/infinispan/scattered/SimpleKeyPartitioner.scala

package org.littlewings.infinispan.scattered

import org.infinispan.configuration.cache.HashConfiguration
import org.infinispan.distribution.ch.KeyPartitioner

class SimpleKeyPartitioner extends KeyPartitioner {
  var configuration: HashConfiguration = _

  override def init(configuration: HashConfiguration): Unit = {
    this.configuration = configuration
  }

  override def getSegment(key: Any): Int =
    (Integer.parseInt(key.asInstanceOf[String].replaceAll("""key(\d+)-\d+""", "$1")) * 150 + 73) % configuration.numSegments
}

設定ファイル上では、このように設定。

        <scattered-cache name="keyPartitionedScatteredCache"
                         key-partitioner="org.littlewings.infinispan.scattered.SimpleKeyPartitioner"/>

あとは、テストを実行。

  test("Scattered Cache, key partitioner") {
    withCache[String, String]("keyPartitionedScatteredCache", 3) { cache =>
      (1 to 10).foreach { i =>
        val partition = (i % 3) + 1
        cache.put(s"key$partition-$i", s"value$partition-$i")
      }

      val dm = cache.getAdvancedCache.getDistributionManager
      val cacheTopology = dm.getCacheTopology

      val keys = cache.keySet

      keys.forEach { key =>
        println(s"key => $key")
        println(cacheTopology.getWriteOwners(key))
      }
    }
  }

とりあえず(?)、配置先が「keyN」ごとにグルーピングされましたね。

key => key2-10
[xxxxx-22564]
key => key1-9
[xxxxx-22564]
key => key1-3
[xxxxx-22564]
key => key1-6
[xxxxx-22564]
key => key2-7
[xxxxx-22564]
key => key2-4
[xxxxx-22564]
key => key2-1
[xxxxx-22564]
key => key3-5
[xxxxx-29791]
key => key3-8
[xxxxx-29791]
key => key3-2
[xxxxx-29791]

サンプルとしては、こんなところで。

少し中身を

と、ここまで試してみたところで、軽く中身の方を見ておきたいと思います。

説明が特徴的なRead/Writeのところですが、確かにPrimary Ownerにアクセスしにいくコードが目立ちます。

Rread - ScatteredDistributionInterceptor#handleReadCommand
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/interceptors/distribution/ScatteredDistributionInterceptor.java#L595

Write - ScatteredDistributionInterceptor#handleWriteCommand
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/interceptors/distribution/ScatteredDistributionInterceptor.java#L149

バックアップ先は、次の(?)Memberらしいですよ。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/interceptors/distribution/ScatteredDistributionInterceptor.java#L569

バックアップとかの実装は、確かにある意味固定的(?)な感じが。

あと、今回の説明では記載しませんでしたが、Scattered Cacheの「invalidation-batch-size」という設定は、ScatteredVersionManagerImplクラスで
使用されます。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/scattered/impl/ScatteredVersionManagerImpl.java

登録・削除したキーを保持しておいて、その数が「invalidation-batch-size」を超えるとクリーンアップが始まります。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/scattered/impl/ScatteredVersionManagerImpl.java#L210
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/scattered/impl/ScatteredVersionManagerImpl.java#L465

登録の契機は、ScatteredDistributionInterceptorだったりします。
https://github.com/infinispan/infinispan/blob/9.1.0.Final/core/src/main/java/org/infinispan/interceptors/distribution/ScatteredDistributionInterceptor.java

…とりあえず、このくらいで。

まとめ

Infinispan 9.1.0.Finalで追加された、Scattered Cacheを試してみました。内部的な挙動はちゃんと把握しきれていませんが、使い方と実装コードの
なんとなくの雰囲気はつかめたのでいいかなと思います。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-scattered-cache

2017-07-14

Docker Compose 1.13.0から「scale」が非推奨になり、「up --scale」になったという話

Docker Composeで、「scale」コマンドを使用すると起動しているコンテナをスケールさせることができます。

docker-compose scale

こちらが、実行すると警告されるようになったという話。

例えば、こういうdocker-compose.ymlを用意して
docker-compose.yml

version: "3.3"
services:
  infinispan:
    image: jboss/infinispan-server:9.0.3.Final

起動。

$ docker-compose up

「scale」で、コンテナをスケールさせます。

$ docker-compose scale infinispan=3
WARNING: The scale command is deprecated. Use the up command with the --scale flag instead.
Starting ispn_infinispan_1 ... done
Creating ispn_infinispan_2 ... 
Creating ispn_infinispan_3 ... 
Creating ispn_infinispan_2 ... done
Creating ispn_infinispan_3 ... done

すると、スケールするにはするのですが、よくよく見ると警告されています。

WARNING: The scale command is deprecated. Use the up command with the --scale flag instead.

確かに、「scale」コマンド上でもdeprecatedになっています。

Note: This command is deprecated. Use the up command with the --scale flag instead.

https://docs.docker.com/compose/reference/scale/

「--scale」を使え、と。

で、Release Notesを見てみると、1.13.0から「scale」コマンドが非推奨となり、「--scale」オプションになったようです。

Docker Compose release notes

「--scale」は、「up」コマンドのオプションになります。

docker-compose up

試してみましょう。コンテナを3つにして起動してみます。

$ docker-compose up --scale infinispan=3
Creating ispn_infinispan_1 ... 
Creating ispn_infinispan_2 ... 
Creating ispn_infinispan_3 ... 
Creating ispn_infinispan_1 ... done
Creating ispn_infinispan_2 ... done
Creating ispn_infinispan_3 ... done

〜省略〜

確かに、3つ起動しました。

これは便利ですね、覚えておきましょう。

Apache Kafkaのkafka-console-producer.shでキーを指定する

Apache Kafkaには付属のツールとして、コンソールを使ったProducer/Consumerが付いています。

このうち、Producer(kafka-console-producer.sh)を使った時に、キーを指定してみようというお話。

まずは、Apache Kafkaのクラスタを用意します。Brokerは3つ、Apache ZooKeeperはひとつとします。

Topicを作成。最初はキーを指定しないパターンでやってみます。Partition数は3、レプリケーションは2とします。

$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 3 --topic my-topic
Created topic "my-topic".

確認。

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 3	Replicas: 3,5	Isr: 3,5
	Topic: my-topic	Partition: 1	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 2	Leader: 5	Replicas: 5,4	Isr: 5,4

Producer(kafka-console-producer.sh)でデータを登録してみます。

$ bin/kafka-console-producer.sh --broker-list 172.21.0.3:9092 --topic my-topic
>value1
>value2
>value3
>value4
>value5
>value6
>value7
>value8
>value9

Consumer(bin/kafka-console-consumer.sh)で、最初からデータを読み出してみます。

$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.3:9092 --topic my-topic --from-beginning
value1
value4
value7
value2
value5
value8
value3
value6
value9

けっこうすごい順番になりました。パーティションごとにデータを抜いてきている感じがしますねぇ。

なお、キー指定がない場合のパーティション振り分けは、ラウンドロビンなのでした。
https://github.com/apache/kafka/blob/0.11.0.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L57-L66

今度は、キーを指定するようにしてみます。先ほどとは別のTopicを作ってみましょう。

$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 3 --topic my-topic-with-key
Created topic "my-topic-with-key".

Partition数とレプリケーション数は同じです。

describe。

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic-with-key
Topic:my-topic-with-key	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: my-topic-with-key	Partition: 0	Leader: 4	Replicas: 4,5	Isr: 4,5
	Topic: my-topic-with-key	Partition: 1	Leader: 5	Replicas: 5,3	Isr: 5,3
	Topic: my-topic-with-key	Partition: 2	Leader: 3	Replicas: 3,4	Isr: 3,4

データ登録。キーは、データ3つにつき1回切り替えます。

$ bin/kafka-console-producer.sh --broker-list 172.21.0.3:9092 --topic my-topic-with-key --property "parse.key=true" --property "key.separator=:"
>key1:value1
>key1:value2
>key1:value3
>key2:value4
>key2:value5
>key2:value6
>key3:value7
>key3:value8
>key3:value9

プロパティ「parse.key」をtrueにすることでキーのパースを有効に、セパレーターはプロパティ「key.separator」で指定することができます。
https://github.com/apache/kafka/blob/0.11.0.0/core/src/main/scala/kafka/tools/ConsoleProducer.scala#L299-L302

今回は、セパレーターを「:」としました。

データの取得。

$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.3:9092 --topic my-topic-with-key --from-beginning
value1
value2
value3
value4
value5
value6
value7
value8
value9

今回は、順番どおりになりました、と。キーを指定された場合の振り分けの実装は、こちら。
https://github.com/apache/kafka/blob/0.11.0.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L68-L69

Trifectaで見ると、ちゃんと(?)振り分けられているのが確認できます。

キー指定なし。
f:id:Kazuhira:20170715002255p:image

キー指定あり。
f:id:Kazuhira:20170715002336p:image

ちょっと偏っていますが…ご愛嬌…。

Apache Kafkaでクラスタを構成してみる

Apache KafkaをしばらくSingle Nodeで扱ってきましたが、そろそろクラスタを構成してみたいと思います。

クラスタの構成の仕方は、Apache Kafkaのドキュメント、Quick Startを見ると書いてあったりします。

Quick Start / Step 6: Setting up a multi-broker cluster

また、このあたりも参考に。

CentOS 7でKafkaクラスタを構築する | 俺的備忘録 〜なんかいろいろ〜

複数台マシンを用いたKafkaクラスタの構築方法 - 夢とガラクタの集積場

Apache Kafkaのクラスタ構成を試してみる - Qiita

各Brokerのidを設定し、Apache ZooKeeperの接続先を合わせてあげればいいみたいです。

というわけで、クラスタを組んでみましょう。今回は、こんな構成でいきます。

  • Apache ZooKeeper … 172.21.0.2
  • Apache Kafka - Broker 1 … 172.21.0.3
  • Apache Kafka - Broker 2 … 172.21.0.4
  • Apache Kafka - Broker 3 … 172.21.0.5

Apache ZooKeeper 1台に、Apache Kafka 3台という構成。

Apache Kafka側で設定するポイントは、config/server.propertiesです。設定したポイントを抜粋すると、こんな感じ。

## Broker 1
broker.id=1
listeners=PLAINTEXT://172.21.0.3:9092
zookeeper.connect=172.21.0.2:2181


## Broker 2
broker.id=2
listeners=PLAINTEXT://172.21.0.4:9092
zookeeper.connect=172.21.0.2:2181


## Broker 3
broker.id=3
listeners=PLAINTEXT://172.21.0.5:9092
zookeeper.connect=172.21.0.2:2181

listenersはリモート接続を受け付けるためですが、BrokerにはユニークなIDを設定する必要があるみたいで、それが「broker.id」です。
あとは、Apache ZooKeeprの接続先を合わせておきました。

Apache ZooKeeperは、とりあえずふつうに起動します。

$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

あとは、各Brokerを順次起動。

## Broker 1
$ bin/kafka-server-start.sh -daemon config/server.properties

## Broker 2
$ bin/kafka-server-start.sh -daemon config/server.properties

## Broker 3
$ bin/kafka-server-start.sh -daemon config/server.properties

kafka-managerで見ると、こんな感じになっています。
f:id:Kazuhira:20170714232527p:image

Topicを追加してみましょう。Broker 1で実行します。Partition数は3、レプリケーションは2に設定。

$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 3 --topic replicated-topic
Created topic "replicated-topic".

describeして、状態を見てみます。

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic replicated-topic
Topic:replicated-topic	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: replicated-topic	Partition: 1	Leader: 2	Replicas: 2,3	Isr: 2,3
	Topic: replicated-topic	Partition: 2	Leader: 3	Replicas: 3,1	Isr: 3,1

それぞれの意味は、こんな感じ。

・"leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
・"replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
・"isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.

https://kafka.apache.org/documentation/#quickstart_multibroker

Partition 0のリーダーはIDが1のBroker、バックアップは2、Partition 1はリーダーが2、バックアップが3、Partition 2はリーダーが3、バックアップが1
という感じですね。

データを登録(Producer)してみましょう。Broker 1を「--broker-list」に指定して実行します。

$ bin/kafka-console-producer.sh --broker-list 172.21.0.3:9092 --topic replicated-topic

Consumerは、Broker 2から読み出してみます。

$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.4:9092 --topic replicated-topic --from-beginning

では、データを登録して読み出してみます。

## Producer
>Hello Workd
>Kafka Cluster
>Apache ZooKeeper
>Oops!!

## Consumer
Hello Workd
Kafka Cluster
Apache ZooKeeper
Oops!!

OKそうですね。

どのBrokerから読み出してもOK。

$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.3:9092  --topic replicated-topic --from-beginning
Hello Workd
Oops!!
Kafka Cluster
Apache ZooKeeper

$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.5:9092 --topic replicated-topic --from-beginning
Hello Workd
Oops!!
Apache ZooKeeper
Kafka Cluster

「--from-beginning」をつけているので、最初から読み出してくれます。

全Brokerを指定してもかまいません。

## Producer
$ bin/kafka-console-producer.sh --broker-list 172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092 --topic replicated-topic

## Consumer
$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092 --topic replicated-topic

Consumerは、今度は最後から読むことにします。

実行。

## Producer
>Test
>Foo
>Bar
>Fuga
>Hoge

## Consumer
Test
Foo
Bar
Fuga
Hoge

こちらもOK、と。

まとめ

Apache Kafkaを使ってクラスタを構成して、付属のコンソールツールでデータの登録と読み出しをやってみました。

とりあえず組んでみる、というのであれば割と簡単にできるようです。

2017-07-08

はじめてのHazelcast Jet(Client/Server Distributed Stream API)

この前、はじめてHazelcast Jetを使って分散Stream APIでWord Countをしてみたのですが、

はじめてのHazelcast Jet(Embedded Distributed Stream API) - CLOVER

この時はEmbedded Modeで書いていました。今度は、同じネタをClient/Server Modeで行ってみたいと思います。

内容は前回とほとんど同じなので、似たような説明は割愛します。

準備

今回使ったMaven依存関係は、こちら。

        <dependency>
            <groupId>com.hazelcast.jet</groupId>
            <artifactId>hazelcast-jet-core</artifactId>
            <version>0.4</version>
        </dependency>

Embeddedの時にも書きましたが、アーティファクト「hazelcast-jet」を選択してもOKです。この場合は、
Maven Shade PluginでひとかたまりのJARとして依存関係に追加されるという話でした。

また、Hazelcast JetではEmbedded ModeであってもClient/Server Modeであっても、通常指定するモジュールでは
Client/Serverの区別はありません。同じように依存関係に追加すればOKです。

まあ、利用するAPIは一部違いますが。

お題

今回のお題は、このように。

  • Jet 0.4 is Released | Hazelcast Blogの内容を元に、WordCount
  • WordCountには、Stream APIを使用
  • 結果は、ソートして上位20件を取得
  • Hazelcast Jet Nodeは、Client Nodeひとつ、Server Node 3つで構成

前回は全部Embeddedな感じでしたが、今回はジョブを起動するのがClientになり、それ以外はServer、となります。
まあ、Server=Embeddedなとらえ方でいいのですが。

WordCount対象のテキストは、前回と同じようにこんな感じで。

$ head -n 20 src/main/resources/jet-0_4-release.txt
We are happy to announce the release of Hazelcast Jet 0.4. This is the first major release since our inital version and has many improvements bringing us closer to our final vision for Jet:

Improved streaming support including windowing support with event-time semantics.
Out of the box support for tumbling, sliding and session window aggregations.
New AggregateOperation abstraction with several built-in ones including count, average, sum, min, max and linear regression.
Hazelcast version updated to 3.8.2 and Hazelcast IMDG is now shaded inside hazelcast-jet.jar.
Several built-in diagnostic processors and unit test support for writing custom processors.
Many new code samples including several streaming examples and enrichment and co-group for batch operations.
New sources and sinks including ICache, socket and file.
Windowing Support with Event-Time semantics

The biggest improvement is the addition of tools for using Jet on infinite streams of data. Dealing with streaming data is fundamentally different than batch or micro-batch processing as both input and output is continuous. Most streaming computations also deal with some notion of time where you are interested in how a value changes over time. The typical way to deal with streaming data is to look at it in terms of “windows”, where a window represents a slice of the data stream, usually constrained for a period of time.

Jet 0.4 adds several processors which deal specifically with aggregation of streaming data into windows. Types of windowing supported can be summarised as follows:

Tumbling Windows: Fixed-size, non-overlapping and contiguous intervals. For example a tumbling window of 1 minute would mean that each window is of 1 minute length, are not overlapping, and contiguous to each other.
Sliding Windows: A variation on tumbling windows, sliding windows can have overlapping elements and slide in fixed increments as time advances.
Session Windows: Typically captures a burst of events which are separated by a period of inactivity. This can be used to window events by a user session, for example.
Jet also supports the notion of “event-time” where events can have their own timestamp and can arrive out of order. This is achieved by inserting watermarks into the stream of events which drive the passage of time forward.


サンプルプログラム

サンプルプログラムとしては、次の2つを用意します。

  • とりあえず浮いていてもらう、EmbeddedなHazelcast Jet Server
  • Hazelcast Jetクラスタにデータを入れ、WordCountを行うClient

Serverの方、なにか作るの?という感じかもしれませんが、まあ要ります。

先に簡単なEmbeddedなServerの方から。こちらは、前回とまったく同じです。
src/main/java/org/littlewings/hazelcast/jet/EmbeddedJetServer.java

package org.littlewings.hazelcast.jet;

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;

public class EmbeddedJetServer {
    public static void main(String... args) {
        JetInstance jet = Jet.newJetInstance();

        System.console().readLine("> stop enter.");

        jet.shutdown();
        Jet.shutdownAll();
    }
}

Enterを打ったら、おしまい。

Word Countの方も、基本的には前回と同じです。
src/main/java/org/littlewings/hazelcast/jet/WordCountRunner.java

package org.littlewings.hazelcast.jet;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.stream.DistributedCollectors;
import com.hazelcast.jet.stream.IStreamMap;

public class WordCountRunner {
    public static void main(String... args) {
        JetInstance jet = Jet.newJetClient();

        IStreamMap<Integer, String> map = jet.getMap("source");

        try (InputStream is = WordCountRunner.class.getClassLoader().getResourceAsStream("jet-0_4-release.txt");
             InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
             BufferedReader reader = new BufferedReader(isr);
             Stream<String> lines = reader.lines()) {
            AtomicInteger count = new AtomicInteger();
            lines.forEach(line -> map.put(count.incrementAndGet(), line));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        IStreamMap<String, Integer> wordCount =
                map
                        .stream()
                        .map(e -> e.getValue().replaceAll("[=()\"',.:;”-]", " ").trim())
                        .filter(line -> !line.isEmpty())
                        .flatMap(line -> Arrays.stream(line.split(" +")))
                        .map(word -> {
                            String lc = word.toLowerCase();
                            System.out.printf("[%s] %s%n", Thread.currentThread().getName(), lc);
                            return lc;
                        })
                        .collect(DistributedCollectors.toIMap(
                                "word-count-map",
                                word -> word,
                                word -> 1,
                                Integer::sum)
                        );

        List<Map.Entry<String, Integer>> top20 =
                wordCount
                        .stream()
                        .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                        .limit(20)
                        .collect(DistributedCollectors.toList());

        System.out.println("result:");
        top20.forEach(e -> System.out.println("  " + e));

        System.out.println();
        System.out.println("source map size = " + jet.getMap("source").size());
        System.out.println("word-count-map size = " + jet.getMap("word-count-map").size());

        jet.shutdown();
        Jet.shutdownAll();
    }
}

違うのは、ここだけです。

        JetInstance jet = Jet.newJetClient();

Hazelcast Jetを、HazelcastのClientとして動作させます。

あとはStream APIで、FunctionやCollectorをSerializableなものとして放り込みます。このあたりの型は、
Hazelcast Jetが面倒をみています。

動作確認

では、動作確認してみます。

Hazelcast Jet Serverを3 Node起動します。

## Node 1
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.jet.EmbeddedJetServer

## Node 2
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.jet.EmbeddedJetServer

## Node 3
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.jet.EmbeddedJetServer

クラスタが構成されます。

Members [3] {
	Member [172.20.0.1]:5701 - 1ff27fc1-798c-4449-8a3b-5721be2838b7 this
	Member [172.20.0.1]:5702 - 3c8e2714-d0b0-47d4-ace2-340e406a9ef3
	Member [172.20.0.1]:5703 - c368c771-a84f-4216-ae14-311f3f35b0cd
}

Word Countを実行。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.hazelcast.jet.WordCountRunner

どういうプランで実行されるかも表示されますが、これが出力されるのはServer側です。

7 09, 2017 12:40:20 午前 com.hazelcast.jet.impl.operation.ExecuteJobOperation
情報: [172.20.0.1]:5701 [jet] [0.4] [3.8.2] Start executing job 1: dag
    .vertex("read-map-word-count-map")
    .vertex("sort-34e8961fcc47").localParallelism(1)
    .vertex("limit-local-1a0370323cca").localParallelism(1)
    .vertex("accumulator").localParallelism(1)
    .vertex("combiner").localParallelism(1)
    .vertex("write-__jet_list_6a26dceb-1c18-4346-90d0-a174aa291b0f")
    .edge(between("read-map-word-count-map", "sort-34e8961fcc47").partitioned(?).distributed())
    .edge(between("sort-34e8961fcc47", "limit-local-1a0370323cca"))
    .edge(between("limit-local-1a0370323cca", "accumulator"))
    .edge(between("accumulator", "combiner").partitioned(?).distributed())
    .edge(between("combiner", "write-__jet_list_6a26dceb-1c18-4346-90d0-a174aa291b0f"))

また、各Server Nodeで処理が実行されていることもわかります。

## Node 1
情報: [172.20.0.1]:5701 [jet] [0.4] [3.8.2] Start execution of plan for job 0 from caller [172.20.0.1]:5703.
[hz._hzInstance_1_jet.jet.cooperative.thread-1] averaginglong
[hz._hzInstance_1_jet.jet.cooperative.thread-4] as
[hz._hzInstance_1_jet.jet.cooperative.thread-4] with
[hz._hzInstance_1_jet.jet.cooperative.thread-0] new
[hz._hzInstance_1_jet.jet.cooperative.thread-0] aggregateoperation
[hz._hzInstance_1_jet.jet.cooperative.thread-0] abstraction
[hz._hzInstance_1_jet.jet.cooperative.thread-0] with
[hz._hzInstance_1_jet.jet.cooperative.thread-0] several
[hz._hzInstance_1_jet.jet.cooperative.thread-0] built
[hz._hzInstance_1_jet.jet.cooperative.thread-0] in
[hz._hzInstance_1_jet.jet.cooperative.thread-0] ones
〜省略〜


## Node 2
[hz._hzInstance_1_jet.jet.cooperative.thread-2] vertex
[hz._hzInstance_1_jet.jet.cooperative.thread-2] source
[hz._hzInstance_1_jet.jet.cooperative.thread-2] dag
[hz._hzInstance_1_jet.jet.cooperative.thread-2] newvertex
[hz._hzInstance_1_jet.jet.cooperative.thread-2] source
[hz._hzInstance_1_jet.jet.cooperative.thread-2] sources
[hz._hzInstance_1_jet.jet.cooperative.thread-2] streamsocket
[hz._hzInstance_1_jet.jet.cooperative.thread-2] host
[hz._hzInstance_1_jet.jet.cooperative.thread-2] port
[hz._hzInstance_1_jet.jet.cooperative.thread-2] several
[hz._hzInstance_1_jet.jet.cooperative.thread-2] examples
[hz._hzInstance_1_jet.jet.cooperative.thread-2] with
[hz._hzInstance_1_jet.jet.cooperative.thread-2] windowing
[hz._hzInstance_1_jet.jet.cooperative.thread-2] have
〜省略〜


## Node 3
[hz._hzInstance_1_jet.jet.cooperative.thread-2] new
[hz._hzInstance_1_jet.jet.cooperative.thread-2] classes
[hz._hzInstance_1_jet.jet.cooperative.thread-2] have
[hz._hzInstance_1_jet.jet.cooperative.thread-2] been
[hz._hzInstance_1_jet.jet.cooperative.thread-2] added
[hz._hzInstance_1_jet.jet.cooperative.thread-2] to
[hz._hzInstance_1_jet.jet.cooperative.thread-2] make
[hz._hzInstance_1_jet.jet.cooperative.thread-2] it
[hz._hzInstance_1_jet.jet.cooperative.thread-2] easier
[hz._hzInstance_1_jet.jet.cooperative.thread-2] to
[hz._hzInstance_1_jet.jet.cooperative.thread-2] write
[hz._hzInstance_1_jet.jet.cooperative.thread-2] unit
〜省略〜

最後に、結果表示。これはClient側で出力されます。

情報: hz.client_0 [jet] [0.4] [3.8.2] Authenticated with server [172.20.0.1]:5702, server version:3.8.2 Local address: /172.20.0.1:44551
result:
  and=28
  the=25
  of=24
  jet=19
  for=18
  a=17
  is=16
  can=15
  to=15
  hazelcast=13
  as=11
  be=11
  in=10
  with=10
  new=9
  streaming=9
  processors=9
  time=8
  also=8
  windows=7

source map size = 87
word-count-map size = 330

結果は、Embedded Modeの時と同じです(そりゃそうだ)。

Client/Server Modeになった時のポイントとまとめ

Stream APIで組み上げられたジョブをHazelcast Jet Serverのクラスタに放り込むのはClientですが、Clientで
実装したクラスがシリアライズされてServer Node上で実行されます。

このため、Server Node上にClient側で実装したクラスが含まれている必要があります。

これを無視して、素のHazelcast Jet Serverを起動すると、Server側でClassNotFoundExceptionと言われてしまいます。

情報: hz.client_0 [jet] [0.4] [3.8.2] HazelcastClient 3.8.2 (20170518 - a60f944) is CLIENT_CONNECTED
[WARNING] 
java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:294)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: java.lang.ClassNotFoundException: org.littlewings.hazelcast.jet.WordCountRunner
	at com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject$Serializer.read(CustomClassLoadedObject.java:108)
	at com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject$Serializer.read(CustomClassLoadedObject.java:87)
	at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.read(StreamSerializerAdapter.java:48)
	at com.hazelcast.internal.serialization.impl.AbstractSerializationService.readObject(AbstractSerializationService.java:266)
	at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataInput.readObject(ByteArrayObjectDataInput.java:599)
	at com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject.read(CustomClassLoadedObject.java:54)
	at com.hazelcast.jet.Vertex.readData(Vertex.java:167)
〜省略〜

なので、今回はEmbeddedなサーバーを作り、同じMavenプロジェクトから起動するようにしました、と。

今回の範囲だと、注意点はこれくらいですね。

あとはすんなり動きました。

今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-jet-examples/tree/master/client-server-getting-started

2017-07-07

InfinispanのSpring Session Support(Remote)を試す

前に、Infinispan 9から入ったSpring Sessionのサポート(Embedded Mode)を試しました。

InfinispanのSpring Session Support(Embedded)を試す - CLOVER

今度は、Remote(Hot Rod)で試してみたいと思います。

Externalizing session using Spring Session

どういうものかというと、Spring Sessionでのデータの保存先を、Infinispan Server(Hot Rod)にできるという
話ですね。

プログラムを作るにあたり、前回同様、今回もSpring Boot向けのモジュールを使用します。

Using Infinispan with Spring Boot

GitHub - infinispan/infinispan-spring-boot: Infinispan Spring Boot

Infinispan: Spring Boot Starters

準備

作成したpom.xmlは、このような感じで。
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.littlewings</groupId>
    <artifactId>remote-spring-session</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>

        <scala-maven-plugin.version>3.2.2</scala-maven-plugin.version>

        <scala.version>2.12.2</scala.version>
        <infinispan.version>9.0.3.Final</infinispan.version>
        <infinispan-spring-boot.version>1.0.0.Final</infinispan-spring-boot.version>
        <spring-boot.version>1.5.4.RELEASE</spring-boot.version>
        <spring-session.version>1.3.1.RELEASE</spring-session.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.infinispan</groupId>
                <artifactId>infinispan-bom</artifactId>
                <version>${infinispan.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-spring-boot-starter</artifactId>
            <version>${infinispan-spring-boot.version}</version>
        </dependency>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-spring4-remote</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.session</groupId>
            <artifactId>spring-session</artifactId>
            <version>${spring-session.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>${scala-maven-plugin.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-Xlint</arg>
                        <arg>-unchecked</arg>
                        <arg>-deprecation</arg>
                        <arg>-feature</arg>
                    </args>
                    <recompileMode>incremental</recompileMode>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

主要なライブラリのバージョンは、BOMで指定。今回はInfinispanのものを優先したいので、こちらを先に書いておきます。

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.infinispan</groupId>
                <artifactId>infinispan-bom</artifactId>
                <version>${infinispan.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

InfinispanのSpring Boot用のモジュール、Spring 4向けのRemote(Hot Rod)用モジュールを加えます。

        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-spring-boot-starter</artifactId>
            <version>${infinispan-spring-boot.version}</version>
        </dependency>
        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-spring4-remote</artifactId>
        </dependency>

最後に、Spring BootのWeb用のStarterとSpring Sessionを追加。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.session</groupId>
            <artifactId>spring-session</artifactId>
            <version>${spring-session.version}</version>
        </dependency>

依存関係については、このくらいですね。

Infinispan Serverについては1 Nodeで起動済みとしますが、InfinispanのSpring Session向けのモジュールでは、セッション用のCacheとして
デフォルトで「sessions」という名前のCacheを利用するので、こちらを先に作成しておきます。

Infinispan Serverの管理ユーザーの作成。今回は「ispn-admin / ispn-password」とします。

$ bin/add-user.sh -u ispn-admin -p ispn-password
Added user 'ispn-admin' to file '/path/to/standalone/configuration/mgmt-users.properties'
Added user 'ispn-admin' to file '/path/to/domain/configuration/mgmt-users.properties'

Distributed Cacheの作成。

$ bin/ispn-cli.sh -c -u=ispn-admin -p=ispn-password
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/configurations=CONFIGURATIONS/distributed-cache-configuration=sessions-configuration:add(start=EAGER,mode=SYNC) 
{"outcome" => "success"}
[standalone@localhost:9990 /] /subsystem=datagrid-infinispan/cache-container=clustered/distributed-cache=sessions:add(configuration=sessions-configuration)
{"outcome" => "success"}

これで、準備はおしまいです。

サンプルコードの作成

サンプルコード自体は、Embedded Modeとほぼ同じものを使います。
src/main/scala/org/littlewings/infinispan/spring/App.scala

package org.littlewings.infinispan.spring

import org.infinispan.spring.session.configuration.EnableInfinispanRemoteHttpSession
import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication

object App {
  def main(args: Array[String]): Unit = {
    SpringApplication.run(classOf[App], args: _*)
  }
}

@EnableInfinispanRemoteHttpSession
@SpringBootApplication
class App

ポイントは、@EnableInfinispanRemoteHttpSessionアノテーションを付与しておくことです。@EnableInfinispanRemoteHttpSessionアノテーションでは、
Spring Sessionで使うInfinispanのCache名とセッションの有効期限を設定することができます。

デフォルトでは、Cacheの名前が「sessions」で、有効期限が30分です。
https://github.com/infinispan/infinispan/blob/9.0.3.Final/spring/spring4/spring4-remote/src/main/java/org/infinispan/spring/session/configuration/EnableInfinispanRemoteHttpSession.java


今回は、デフォルトのまま使用します。

あとは、RestControllerとセッション間で共有するBeanを作成します。
src/main/scala/org/littlewings/infinispan/spring/CounterController.scala

package org.littlewings.infinispan.spring

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import org.springframework.stereotype.Component
import org.springframework.web.bind.annotation.{GetMapping, RestController}
import org.springframework.web.context.annotation.SessionScope

import scala.collection.JavaConverters._

@RestController
class CounterController(counter: Counter) {
  @GetMapping(Array("counter/access"))
  def access: java.util.Map[String, AnyRef] = {
    counter.increment()
    Map[String, AnyRef](
      "value" -> Integer.valueOf(counter.value),
      "time" -> counter.time.format(DateTimeFormatter.ISO_DATE_TIME)
    ).asJava
  }
}

@SessionScope
@Component
@SerialVersionUID(1L)
class Counter extends Serializable {
  var value: Int = 0

  val time: LocalDateTime = LocalDateTime.now

  def increment(): Unit = value += 1
}

設定ファイルについては、このように。
src/main/resources/hotrod-client.properties

infinispan.client.hotrod.server_list=172.17.0.2:11222

これは、InfinispanのHot Rod Clientが読み込む設定ファイルです。「hotrod-client.properties」ファイルを用意しない場合は、
Spring Bootのapplication.propertiesに設定を書きます。
src/main/resources/application.properties

## hotrod-client.properties を使わない場合はこちらでも可(ファイルがない場合に、このプロパティが参照される)
infinispan.remote.server-list=172.17.0.2:11222

コメントにも似たようなことを書いていますが、「hotrod-client.properties」ファイルを用意した場合は、application.propertiesに書いた
設定は無視されます。

Using Client/Server mode

なお、hotrod-client.propertiesに設定できる項目、キーは、こちらを参照するとよいです。
https://github.com/infinispan/infinispan/blob/9.0.3.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/ConfigurationProperties.java

これで、準備はおしまいです。

確認

では、パッケージングして確認してみます。

$ mvn package

アプリケーションは、2つ起動させましょう。

# No.1
$ java -jar target/remote-spring-session-0.0.1-SNAPSHOT.jar

# No.2
$ java -jar target/remote-spring-session-0.0.1-SNAPSHOT.jar --server.port=9080

curlで各アプリケーションにアクセスして確認します。

## アプリケーション1
$ curl -c cookie.txt -b cookie.txt -i http://localhost:8080/counter/access
HTTP/1.1 200 
Set-Cookie: SESSION=32ebed0d-d477-4352-b2ab-e31163a2ebfb; Path=/; HttpOnly
Content-Type: application/json;charset=UTF-8
Transfer-Encoding: chunked
Date: Fri, 07 Jul 2017 14:36:03 GMT

{"value":1,"time":"2017-07-07T23:36:00.944"}


## アプリケーション2
$ curl -c cookie.txt -b cookie.txt -i http://localhost:9080/counter/access
HTTP/1.1 200 
Content-Type: application/json;charset=UTF-8
Transfer-Encoding: chunked
Date: Fri, 07 Jul 2017 14:36:21 GMT

{"value":2,"time":"2017-07-07T23:36:00.944"}


## アプリケーション1
$ curl -c cookie.txt -b cookie.txt -i http://localhost:908080/counter/access
HTTP/1.1 200 
Content-Type: application/json;charset=UTF-8
Transfer-Encoding: chunked
Date: Fri, 07 Jul 2017 14:36:56 GMT

{"value":3,"time":"2017-07-07T23:36:00.944"}


## アプリケーション2
$ curl -c cookie.txt -b cookie.txt -i http://localhost:809080/counter/access
HTTP/1.1 200 
Content-Type: application/json;charset=UTF-8
Transfer-Encoding: chunked
Date: Fri, 07 Jul 2017 14:37:23 GMT

{"value":4,"time":"2017-07-07T23:36:00.944"}


## アプリケーション1
$ curl -c cookie.txt -b cookie.txt -i http://localhost:908080/counter/access
HTTP/1.1 200 
Content-Type: application/json;charset=UTF-8
Transfer-Encoding: chunked
Date: Fri, 07 Jul 2017 14:37:32 GMT

{"value":5,"time":"2017-07-07T23:36:00.944"}

## アプリケーション2
$ curl -c cookie.txt -b cookie.txt -i http://localhost:809080/counter/access
HTTP/1.1 200 
Content-Type: application/json;charset=UTF-8
Transfer-Encoding: chunked
Date: Fri, 07 Jul 2017 14:37:41 GMT

{"value":6,"time":"2017-07-07T23:36:00.944"}


Bean作成時に生成された時間も変わっていませんし、インクリメントした値も共有されてそうなのでOKですね。

まとめ

InfinispanのSpring Session向けのモジュールを、今回はHot Rod Clientで、かつSpring Boot向けのStarterと一緒に使ってみました。

Infinispan ServerにつなぐところとInfinispanのSpring Session有効化のアノテーションが違うくらいで、さらっと使えて
いいですね。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-spring-session

2017-07-06

Spring Cloud StreamでSchema Evolution

Spring Cloud Streamのドキュメントで、Schema Evolutionというものが載っていて、ちょっと興味があったので
試してみることにしました。

Schema evolution support

Spring Cloud Stream Brooklyn.M1 is available

Schema Evolutionとは?

Spring Cloud Streamでは、SchemaベースのMessageConverterをサポートしていて、オブジェクトの
シリアライズ/デシリアライズ時に適用されます。

現在サポートされているのは、Apache Avroのみだそうです。

Welcome to Apache Avro!

Converterは2つあり、オブジェクトのシリアライズ/デシリアライズをする際のクラスの情報か、もしくはアプリケーションの起動時に
把握しているSchemaを使うもの。それからSchema Registryを使って実行時にSchemaを特定し、ドメインオブジェクトが
進化するに従い新しいSchemaを動的にSchema Registryに登録するものです。

Schema Evolution自体は、対象となるデータ構造のバージョンアップ(ダウン)に伴うSchema変更の際に、互換性を
もたせましょうという話みたいですね。

  • Backward Compatibility (Newer version can read old version)
  • Forward Compatibility (Older version can read new version)

Schema Evolution for Resilient Data microservices

Backward Compatibilityではフィールドのリネームに、Forward Compatibilityではフィールドのリネームと削除に耐えられる
ような感じですね。

参考)

GitHub - making-demo-scst/schema-evolution-demo: Schema Evolution with Spring Cloud Stream

GitHub - viniciusccarvalho/schema-evolution-samples: Samples for different schema evolution options

構成要素

この仕組みを構成する要素として、以下の2つがあります。

  • Schema Registry Server
  • Schema Registry Client

Schema Registry Serverは、Schemaを保存するサーバーで、RDBMSを使ってSchemaを保存します。デフォルトはインメモリ
データベース(H2 Database)を使用します。Spring Bootで作られているので、他のデータベースを使うようにカスタマイズ
することも可能です。

Schema Registry Server

Schema Registry Serverを使うには、「spring-cloud-stream-schema-server」というモジュールを使います。

内部的には、Spring Data JPAを使ってデータの管理を行い、またREST APIも提供します。

Schema Registry Server API

Schema Registry Clientは、Schema Registry Serverとのインターフェースを抽象化したものです。Schema Registry Clientを使う場合は、
「spring-cloud-stream-schema」モジュールを依存関係に追加します。

Spring Cloud Stream Reference Guide

Schema Registry Clientとなるのは、SourceやSink側になります。

お題

とまあ、前置きはこのくらいにして、Spring Cloud Streamの提供するSchema Evolutionを試してみましょう。

お題を書籍として、以下のようにSourceとSinkが把握しているSchemaのバージョンを変えていき、合わせてSourceに投入するデータが
表現するSchemaのバージョンも変えていってみましょう。

こんな感じで。


データ Ver.Source Known Schema Ver.Sink Known Schema Ver.
v1v1v1
v1v1v2
v1v2v2
v2v2v2
v2v2v1

データの方はSchemaに合わせたバージョンを、SourceとSinkはそれ自身が持つSchemaのバージョンを指します。

SourceとSinkは、バージョンごとにそれぞれMavenプロジェクトでのサブモジュールとします。

準備

まずは準備から。今回は、こういうMavenのマルチプロジェクト構成とします。

pom.xml
source-v1/pom.xml
source-v2/pom.xml
sink-v1/pom.xml
sink-v2/pom.xml
schema-registry-server/pom.xml

親pom.xmlがあって、SourceとSinkをサブモジュールとしてバージョンごとに用意します。それから、Schema Registry Server用にもひとつ。

親pom.xmlの内容は、こんな感じです。
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.littlewings</groupId>
    <artifactId>schema-evolution</artifactId>
    <packaging>pom</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <modules>
        <module>source-v1</module>
        <module>schema-registry-server</module>
        <module>sink-v1</module>
        <module>sink-v2</module>
        <module>source-v2</module>
    </modules>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-dependencies</artifactId>
                <version>Chelsea.SR2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>1.5.4.RELEASE</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Spring Cloud StreamのBOMのimport、Spring BootのMaven Pluginの設定を入れているくらいです。Apache Avroなどの設定は、今回は個別に
入れていくスタイルとします。

SourceとSinkのpom.xmlの主要な部分は、こんな感じです。

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-schema</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>src/main/resources/avro</sourceDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

他はアーティファクトのIDが違うくらいなので、割愛…。

メッセージブローカーはApache Kafkaとするので、「spring-cloud-starter-stream-kafka」を加えます。

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

Schema Evolutionのサポートを使うには、「spring-cloud-stream-schema」を追加して

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-schema</artifactId>
        </dependency>

Apache Avroへの依存関係と

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.2</version>
        </dependency>

プラグインの設定を加えます。

            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.2</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>src/main/resources/avro</sourceDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

これで、Apache AvroのIDLからJavaソースコードを自動生成してくれるようになります。これは、SourceおよびSinkで
行います。IDLは、各プロジェクトの「src/main/resources/avro」ディレクトリに置くように設定しています。

Apache Kafkaは、起動済みとします。

Schema Registry Serverの作成

先に、Schema Registry Serverから作りましょう。ドキュメントに沿って作成すればOKです。

Schema Registry Server

pom.xmlは、こんな感じです。
schema-registry-server/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>schema-evolution</artifactId>
        <groupId>org.littlewings</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>schema-registry-server</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-schema-server</artifactId>
        </dependency>
    </dependencies>
</project>

「spring-cloud-stream-schema-server」を追加しました。

あとは、@EnableSchemaRegistryServerアノテーションを付与したSpring Bootの起動クラスを作成すれば完了です。
schema-registry-server/src/main/java/org/littlewings/spring/cloud/SchemaRegistryServer.java

package org.littlewings.spring.cloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.schema.server.EnableSchemaRegistryServer;

@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServer {
    public static void main(String... args) {
        SpringApplication.run(SchemaRegistryServer.class, args);
    }
}

Source v1の作成

データを登録する、Sourceのversion 1を作成します。

ソースコードは、こんな感じ。
source-v1/src/main/java/org/littlewings/spring/cloud/SourceApp.java

package org.littlewings.spring.cloud;

import java.time.LocalDateTime;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@EnableBinding(Source.class)
@EnableSchemaRegistryClient
@RestController
public class SourceApp {
    public static void main(String... args) {
        SpringApplication.run(SourceApp.class, args);
    }

    Source source;

    public SourceApp(Source source) {
        this.source = source;
    }

    @PostMapping("register")
    public String register(@RequestBody Book book) {
        source.output().send(MessageBuilder.withPayload(book).build());

        System.out.printf(
                "[%s] send book, isbn = %s, title = %s, price = %d, version = %s%n",
                LocalDateTime.now(),
                book.getIsbn(),
                book.getTitle(),
                book.getPrice(),
                book.getVersion()
        );

        return "OK!!" + System.lineSeparator();
    }
}

@RestControllerとし、受け取ったデータをメッセージブローカーに送るわけですが、その時に内容を標準出力に表示するように
しています。特に意味はないのですが、あるバージョンのSchemaに対応したJavaクラスを使うという意味で…。

で、このBookクラスは、自分では実装しません。Apache AvroのIDLから、Maven Pluginで自動生成してもらいます。

作成したIDLはこちら。
source-v1/src/main/resources/avro/book.avsc

{
  "type": "record",
  "name": "Book",
  "namespace": "org.littlewings.spring.cloud",
  "fields": [
    { "name": "isbn", "type": "string" },
    { "name": "title", "type": "string", "default": "" },
    { "name": "price", "type": "int", "default": 0 },
    { "name": "version", "type": "string", "default": "" }
  ]
}

IDLは、こちらを見ながら作ってみました。

Schema Declaration / Complex Types / Records

namespaceは、パッケージになるみたいですねぇ…。

ビルドすると、今回の設定だと「source-v1/target/generated-sources/avro」にソースコードが出力されます。
source-v1/target/generated-sources/avro/org/littlewings/spring/cloud/Book.java

/**
 * Autogenerated by Avro
 *
 * DO NOT EDIT DIRECTLY
 */
package org.littlewings.spring.cloud;

import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Book extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {

〜省略〜

ちなみに、出力先のディレクトリを変えたかったらApache AvroのMaven Pluginで「outputDirectory」パラメーターで指定すれば
いいみたいですよ。

例えば、こんな感じ。

      <configuration>
        <sourceDirectory>src/main/resources/avro</sourceDirectory>
        <outputDirectory>src/main/java/</outputDirectory>
      </configuration>

続いて、設定です。
source-v1/src/main/resources/application.properties

spring.cloud.stream.bindings.output.destination=book
spring.cloud.stream.bindings.output.contentType=application/*+avro

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

spring.cloud.stream.schemaRegistryClient.endpoint=http://localhost:8990

ここでのポイントは、「spring.cloud.stream.bindings.output.contentType」を「application/*+avro」としていることですね。

spring.cloud.stream.bindings.output.contentType=application/*+avro

これで、Apache Avroを使うMessageConverterを自動設定してくれます。

この時のContent-Typeは、このようになるのだとか。prefixは設定可能で、subjectはPayloadのタイプから決定されます。

application/[prefix].[subject].v[version]+avro

あとは、Schema Registry Serverへの接続先も設定しています。実はこれ、デフォルト値なのでこの値だと不要なのですが、
設定として把握しておこうかと。

spring.cloud.stream.schemaRegistryClient.endpoint=http://localhost:8990

ここまでで、Source v1の準備はおしまいです。

Sink v1

続いて、Sink version1。Sourceと似たり寄ったりで、@EnableSchemaRegistryClientアノテーションを付与してあげればOKです。
sink-v1/src/main/java/org/littlewings/spring/cloud/SinkApp.java

package org.littlewings.spring.cloud;

import java.time.LocalDateTime;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient;

@SpringBootApplication
@EnableBinding(Sink.class)
@EnableSchemaRegistryClient
public class SinkApp {
    public static void main(String... args) {
        SpringApplication.run(SinkApp.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void println(Book book) {
        System.out.printf(
                "[%s] received book, isbn = %s, title = %s, price = %d, version = %s%n",
                LocalDateTime.now(),
                book.getIsbn(),
                book.getTitle(),
                book.getPrice(),
                book.getVersion()
        );
    }
}

Apache AvroのMaven Plugin設定は、Sourceの時と同じでOKです。

Apache AvroのIDLは、Sourceとまったく同じものを使用するので割愛。

sink-v1/src/main/resources/avro/book.avsc

〜省略〜

設定。
sink-v1/src/main/resources/application.properties

spring.cloud.stream.bindings.input.destination=book
spring.cloud.stream.bindings.input.group=book-group

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

spring.cloud.stream.schemaRegistryClient.endpoint=http://localhost:8990

server.port=18080

Schema Registry Serverへの接続情報を書いている以外は、あまり新しい情報はありません。Sourceとかぶらないように、Listen Portを
ずらしておいたくらいですね。

Source v2/Sink v2

最後は、SourceとSinkのversion 2をまとめて。

IDLは、それぞれのプロジェクトで同じものを使用します。

IDLの定義自体は、追加フィールドで確認します。
source-v2/src/main/resources/avro/book.avsc
sink-v2/src/main/resources/avro/book.avsc

{
  "type": "record",
  "name": "Book",
  "namespace": "org.littlewings.spring.cloud",
  "fields": [
    { "name": "isbn", "type": "string" },
    { "name": "title", "type": "string", "default": "" },
    { "name": "price", "type": "int", "default": 0 },
    { "name": "version", "type": "string", "default": "" },
    { "name": "tags", "type": { "type": "array", "items": "string" }, "default": [] }
  ]
}

「tags」というフィールドを追加しました。

SourceとSinkのコードには、増えたフィールドを使うコードに変更しておきます。

Source。

@SpringBootApplication
@EnableBinding(Source.class)
@EnableSchemaRegistryClient
@RestController
public class SourceApp {
    public static void main(String... args) {
        SpringApplication.run(SourceApp.class, args);
    }

    Source source;

    public SourceApp(Source source) {
        this.source = source;
    }

    @PostMapping("register")
    public String register(@RequestBody Book book) {
        source.output().send(MessageBuilder.withPayload(book).build());

        System.out.printf(
                "[%s] received book, isbn = %s, title = %s, price = %d, version = %s, tags = %s%n",
                LocalDateTime.now(),
                book.getIsbn(),
                book.getTitle(),
                book.getPrice(),
                book.getVersion(),
                book.getTags()
        );

        return "OK!!" + System.lineSeparator();
    }
}

Sink。

@SpringBootApplication
@EnableBinding(Sink.class)
@EnableSchemaRegistryClient
public class SinkApp {
    public static void main(String... args) {
        SpringApplication.run(SinkApp.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void println(Book book) {
        System.out.printf(
                "[%s] received book, isbn = %s, title = %s, price = %d, version = %s, tags = %s%n",
                LocalDateTime.now(),
                book.getIsbn(),
                book.getTitle(),
                book.getPrice(),
                book.getVersion(),
                book.getTags()
        );
    }
}

あとの設定は、version 1の時と同じです。

動作確認

それでは、パッケージングして動作確認してみましょう。

$ mvn package

最初に、Schema Registry Serverを起動しておきます。

$ java -jar schema-registry-server/target/schema-registry-server-0.0.1-SNAPSHOT.jar

Schema Registry Serverは、デフォルトで8990ポートでListenします。

Data v1 / Source v1 → Sink v1

では、SourceとSinkのversion 1を起動します。

## Source
$ java -jar source-v1/target/source-v1-0.0.1-SNAPSHOT.jar

## Sink
$ java -jar sink-v1/target/sink-v1-0.0.1-SNAPSHOT.jar

データをPOSTしてみます。curlで実行しようと思ったのですが、ちょっと面倒になってこういうスクリプトを用意。
post.sh

#!/bin/bash

FILE=$1
curl -XPOST -H 'Content-Type: application/json' http://localhost:8080/register -d @$FILE

はい。

で、次のようなデータを2つ放り込みます。
book-v1-1.json

{
  "isbn": "978-4798142470",
  "title": "Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発",
  "price": 4320,
  "version": "v1"
}

book-v1-2.json

{
  "isbn": "978-4774183169",
  "title": "パーフェクト Java EE",
  "price": 3456,
  "version": "v1"
}

データのフォーマットは、version 1です。

登録。

$ ./post.sh book-v1-1.json
OK!!
$ ./post.sh book-v1-2.json
OK!!

標準出力に、それぞれログが出力されます。

## Source
[2017-07-06T22:52:22.243] send book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v1
[2017-07-06T22:52:24.662] send book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v1

## Sink
[2017-07-06T22:52:23.098] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v1
[2017-07-06T22:52:24.669] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v1

とりあえず、Apache Avroを使って動きました。

Data v1 / Source v1 → Sink v2

ここで、Sink v1を落としてSink v2を起動させます。

$ java -jar sink-v2/target/sink-v2-0.0.1-SNAPSHOT.jar

もう1度、version 1形式のデータを登録してみます。

$ ./post.sh book-v1-1.json
OK!!
$ ./post.sh book-v1-2.json
OK!!

Sink側で、問題なく受け取れます。tagsの中身は、もちろん空ですが。

[2017-07-06T22:54:43.637] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v1, tags = []
[2017-07-06T22:54:44.542] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v1, tags = []

Data v1 / Source v2 → Sink v2

今度は、Source v1を落としてSource v2を起動します。

$ java -jar source-v2/target/source-v2-0.0.1-SNAPSHOT.jar

で、データを登録しようとすると、うまくいきま…せん。

$ ./post.sh book-v1-1.json 
{"timestamp":1499349527822,"status":500,"error":"Internal Server Error","exception":"org.springframework.messaging.MessageDeliveryException","message":"failed to send Message to channel 'output'; nested exception is java.lang.NullPointerException: null of array of org.littlewings.spring.cloud.Book","path":"/register"}

$ ./post.sh book-v1-2.json 
{"timestamp":1499349545349,"status":500,"error":"Internal Server Error","exception":"org.springframework.messaging.MessageDeliveryException","message":"failed to send Message to channel 'output'; nested exception is java.lang.NullPointerException: null of array of org.littlewings.spring.cloud.Book","path":"/register"}

tagsがないからですと…。これは…仕方ないかな…。

Data v2 / Source v2 → Sink v2

というわけで、データの形式もversion 2にします。

book-v2-1.json

{
  "isbn": "978-4798142470",
  "title": "Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発",
  "price": 4320,
  "version": "v2",
  "tags": [
    "Java", "Spring"
  ]
}

book-v2-2.json

{
  "isbn": "978-4774183169",
  "title": "パーフェクト Java EE",
  "price": 3456,
  "version": "v2",
  "tags": [
    "Java", "Java EE"
  ]
}

登録。

$ ./post.sh book-v2-1.json
OK!!
$ ./post.sh book-v2-2.json
OK!!

コンソールの出力結果。

## Source
[2017-07-06T23:02:04.908] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v2, tags = [Java, Spring]
[2017-07-06T23:02:07.321] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v2, tags = [Java, Java EE]

## Sink
[2017-07-06T23:02:04.935] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v2, tags = [Java, Spring]
[2017-07-06T23:02:07.327] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v2, tags = [Java, Java EE]

tagsの内容が渡っていますね。

Data v2 / Source v2 → Sink v1

最後に、Sinkをversion 1に戻してみます。

$ java -jar sink-v1/target/sink-v1-0.0.1-SNAPSHOT.jar

登録。

$ ./post.sh book-v2-1.json
OK!!
$ ./post.sh book-v2-2.json
OK!!

こちらは、Sinkでも受け取れます。

[2017-07-06T23:05:37.345] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v2
[2017-07-06T23:05:37.899] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v2

tagsはなかったことになりますけれど。

Sourceでは、出力されています。

[2017-07-06T23:05:36.770] received book, isbn = 978-4798142470, title = Spring徹底入門 Spring FrameworkによるJavaアプリケーション開発, price = 4320, version = v2, tags = [Java, Spring]
[2017-07-06T23:05:37.893] received book, isbn = 978-4774183169, title = パーフェクト Java EE, price = 3456, version = v2, tags = [Java, Java EE]

これで、SourceとSinkのSchema定義が異なるものであって、データの受け渡しができることが確認できました。

Schema Registry Serverをもう少し

Schema Registry Serverで保存されるSchema情報は、JPAのEntityとして表現されます。
https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream-schema-server/src/main/java/org/springframework/cloud/stream/schema/server/model/Schema.java

また、Schema登録時にはSubjectとFormatが一致していれば、特定のバージョンにマッピングして扱われ、そうでなければバージョン1として
扱うようです。
https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream-schema-server/src/main/java/org/springframework/cloud/stream/schema/server/controllers/ServerController.java

		List<Schema> registeredEntities = this.repository.findBySubjectAndFormatOrderByVersion(
				schema.getSubject(), schema.getFormat());
		if (registeredEntities == null || registeredEntities.size() == 0) {
			schema.setVersion(1);
			result = this.repository.save(schema);
		}
		else {
			result = validator.match(registeredEntities, schema.getDefinition());
			if (result == null) {
				schema.setVersion(
						registeredEntities.get(registeredEntities.size() - 1).getVersion()
								+ 1);
				result = this.repository.save(schema);
			}

		}

登録されているSchemaを、REST APIで見てみましょう。

Schema Registry Server API

登録は…いいかなと思うので、参照を。

まずは、こちら。

GET /{subject}/{format}

結果。

$ curl http://localhost:8990/book/avro | jq .
  {
    "definition": "{\"type\":\"record\",\"name\":\"Book\",\"namespace\":\"org.littlewings.spring.cloud\",\"fields\":[{\"name\":\"isbn\",\"type\":\"string\"},{\"name\":\"title\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"price\",\"type\":\"int\",\"default\":0},{\"name\":\"version\",\"type\":\"string\",\"default\":\"\"}]}",
    "format": "avro",
    "subject": "book",
    "version": 1,
    "id": 1
  },
  {
    "definition": "{\"type\":\"record\",\"name\":\"Book\",\"namespace\":\"org.littlewings.spring.cloud\",\"fields\":[{\"name\":\"isbn\",\"type\":\"string\"},{\"name\":\"title\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"price\",\"type\":\"int\",\"default\":0},{\"name\":\"version\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"tags\",\"type\":{\"type\":\"array\",\"items\":\"string\"},\"default\":[]}]}",
    "format": "avro",
    "subject": "book",
    "version": 2,
    "id": 2
  }
]

確かに、定義したSchemaが入ってますね。

続いて、バージョンを指定して。

GET /{subject}/{format}/{version}

ここで気づく。

$ curl http://localhost:8990/book/avro/v1 | jq .
{
  "definition": "{\"type\":\"record\",\"name\":\"Book\",\"namespace\":\"org.littlewings.spring.cloud\",\"fields\":[{\"name\":\"isbn\",\"type\":\"string\"},{\"name\":\"title\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"price\",\"type\":\"int\",\"default\":0},{\"name\":\"version\",\"type\":\"string\",\"default\":\"\"}]}",
  "format": "avro",
  "subject": "book",
  "version": 1,
  "id": 1
}

なんか、「v」って要るんですけど…。

http://localhost:8990/book/avro/v1

https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream-schema-server/src/main/java/org/springframework/cloud/stream/schema/server/controllers/ServerController.java#L114

まあ、いいですけど…。

Schema Registry Server & Clientを使ったSchema登録&解決について

ドキュメントに、Schema登録と解決時の図があるのですが…

Schema Registration and Resolution

小さすぎて見えないので、ちょっと貼っておきます…。

Schema Registration Process (Serialization)

Schema Registration Process (Serialization)

登録…シリアライズの最初のプロセスは、Channel送信時にSpecificRecordやGenericRecordであればSchemaを取得できるのでそちらを使い、POJOであればプロパティ
「spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnable」がtrueであれば動的にSchemaを推論します。

プロパティ「spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled」は、デフォルトでtrueです。

https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream-core-docs/src/main/asciidoc/images/schema_resolution.png
f:id:Kazuhira:20170706232501p:image

Schemaを取得後は、Schema Registry Serverよりメタデータ(バージョン)を取得します。次にキャッシュを参照し、見つからなければSchemaを
Schema Registry Serverに送信し、バージョン情報を取得します。Converterはその結果をキャッシュし、シリアライズのフェーズごとに
Schema Registry Serverを照会するオーバーヘッドを回避します。

https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream-core-docs/src/main/asciidoc/images/registration.png
f:id:Kazuhira:20170706232505p:image

Schemaのバージョンは、Content-Typeヘッダーに含まれます、と。

Schema Resolution Process (Deserialization)

Schema Resolution Process (Deserialization)

シリアライズ時のSchema解決について。Content-Typeヘッダーなどからバージョンを取得し、ConverterはメッセージのWriter Schemaを
取得するようにSchema Registry Serverにクエリを投げます(キャッシュされていなければ)。メッセージに対する正しいSchemaを
見つけたら、次にReader Schemaを取得しApache AvroのSchema解決サポートを利用してReader定義を読み込みます。

https://github.com/spring-cloud/spring-cloud-stream/blob/v1.2.2.RELEASE/spring-cloud-stream-core-docs/src/main/asciidoc/images/schema_reading.png

f:id:Kazuhira:20170706232641p:image

WriterとReaderで、異なるSchemaが使えるみたいですね。これは重要なことみたいです。

Spring Cloud Streamはどのようにメッセージを読むのかを決定するためにWriter Schemaをフェッチしますし、ちゃんとApache AvroのSchema Evolutionが
有効にするためにはReader Schemaがアプリケーションに適切に設定されている必要がありますと。

Apache AvroのSchema Resolutionについては、こちら。

Schema Resolution

「if both are records:」のところを見ればいいですね。

  • フィールドの順序は異なっていてもよく、名前で照会される
  • 両方のレコードで、同じ名前のフィールドは再帰的に解決される
  • Writerのレコードに存在するフィールドで、Reader側のレコードに同じ名前のフィールドがない場合、Writer側のフィールドの値は無視される
  • ReaderのレコードのSchemaにデフォルト値を含むフィールドがあり、WriterのSchemaに同じ名前のフィールドがない場合、Readerはそのデフォルト値を使用する
  • ReaderのレコードのSchemaにデフォルト値のないフィールドがあり、WriterのSchemaに同じ名前のフィールドがない場合、エラーとなる

まとめ

Spring Cloud Streamで、Schema Evolutionを試してみました。

確認するのにまあまあ時間がかかりましたが、とりあえず動かせたのと理屈はある程度見れたのでいいかなと。

Apache Avro、知っておいた方がいいんでしょうかね。