Hatena::ブログ(Diary)

CLOVER

2016-09-28

Hazelcast Internal(構造編)

Hazelcastの内部構造について、このブログで書いてみては?みたいな話があったので。

これまで、このブログではHazelcastの簡単な使い方と表層にしか触れてきませんでしたし、書いている自分自身もそういうスタンスだったのですが(もうちょっと踏み込んだIMDGはありますが)、せっかくなので追ってみることにしました。

というわけで、2回ほどに分けてHazelcastの内部を、初歩的な範囲で追ってみます。

このエントリを読むような方は、

  • ある程度Hazelcastの基本的な使い方を知っている
  • Hazelcastの中の構造などに興味がある

と、もともと狭い対象範囲がさらに狭くなる感じですが、ご興味があればということで。

最初のエントリは、構造編です。2回目は、ネットワークまわりを考えています。2回目がいつになるか、わかりませんが!

このエントリで扱うHazelcastのバージョンは、3.7.1とします。

このエントリで解説する内容

以下の内容を題材にします。

  • Partition/PartitionId
  • 代表的なHazelcastの内部クラス
  • Hazelcastのデータ構造のうち、Mapを代表としてRecordStore
  • エントリの登録とバックアップへの登録まで

Hazelcastは、以下のような多彩なデータ構造を扱えます。

  • Map
  • MultiMap
  • Replicated Map
  • Cache
  • Queue/Set/List(Collection)
  • RingBuffer
  • Topic/Reliable Topic

のちに記載するPartitionを使うという概念は同じですが、データの持ち方などはこの構造ごとに別々に管理されているため、今回は代表的に使うと思われるMap(Distributed Map)について扱います。

ただ、他のデータ構造においても似たような用語はソースコードを追っていくと現れるので、ひとつ読み解くと他のデータ構造もある程度わかるようになるような気がします。

今回のエントリで対象とするのは、Embeddedに絞ります。Client/Server構成は、対象外とします。また、Mapについて書くとしていますが、Distributed ExecutorやMap Reduce/Aggregationといった分散処理のAPIについても今回は対象外とします。

あと、クラス図っぽいものも登場しますが、記法の厳密性はそんなに気にしていないので、微妙なところは置いておいてください…。

Partition

まずはじめに、Partitionについて説明します。

Hazelcastを使ったコードを書く際には、例えば以下のようなコードを書きます。

        HazelcastInstance hazelcast = Hazelcast.newHazelcastInstance();

        IMap<String,String> map = hazelcast.getMap("default");
        map.put("key1", "value1");
        map.get("key1");

ここで、利用者側はHazelcastでどのようにデータが管理されているかは、特に意識しません。Hazelcastを使う時は、たいてい以下のように複数Nodeでのクラスタを構成するかと思いますが、
f:id:Kazuhira:20160928233825j:image

Mapなどに対してputやgetをすると、クラスタ内の適切なNodeからエントリを取得してきてくれます。


このデータの管理についてですが、HazelcastはPartitionと呼ばれる単位で管理しています。
Data Partitioning

Partitionと呼ばれる単位で、メモリを区切ってデータを管理します。デフォルトでは271個のPartitionを持つように設定されており、クラスタ内のNodeがひとつの場合はすべてのPartitionを1 Nodeが保持します。
f:id:Kazuhira:20160927174843j:image

※この図は、オフィシャルのドキュメントのものです

Nodeが追加されると、Partitionの分布がNode間で分かれます。
f:id:Kazuhira:20160927174844j:image

Nodeが2以上になると、Partitionのバックアップという概念が現れます。この図でいくと、黒い字で書かれている方がPrimary Partition(Owner)で、青い字で書かれている方がBackup Partitionという扱いになります。

ここでは、1〜135までのPartitionがひとつのNode、136〜271までのPartitionが別のNodeがPrimaryとなり、逆のNodeはPrimaryではないPartitionのBackupとなります。

さらにNodeを追加すると、リバランスされてPartitionの配置状況が変わります。
f:id:Kazuhira:20160927174845j:image

Nodeを追加したり、またNodeがダウンした際には、このPartition単位でNode間のデータの移動が行われますし、Nodeがダウンした際のデータの復旧やバックアップは残ったNodeにあるPartitionから行われることになります。

Hazelcastの代表的な内部クラス

で、ここからは、少しHazelcastの内部に踏み込んでいきます。

単純に提供されたデータ構造に従って実装をしていくのであれば、IMapやISetといったJava Collection Frameworkのインタフェースの拡張インターフェースに対してコードを書いていきます。実装クラスは、ほとんど現れません。

中心となるHazelcastInstanceも、インターフェースです。

実装クラスについて少し追っていくと、以下のようなクラスたちを目にすることになると思います。
f:id:Kazuhira:20160927174842j:image

だいたい目にするのは、NodeやNodeEngine(の実装)で、これらのクラスがHazelcastが動作するための中心的なクラスとなっています。

NodeはHazelcastのNode自身を表し、HazelcastInstanceの実装が作成される段階で、一緒に作成されます。このタイミングで、NodeEngineも作られます。

Hazelcastの起動後、MapやSetなどを使って操作を行った際には、裏をたどっていくとNodeやNodeEngineから取得したクラスを使って処理を行っていくことが見えると思います。

ただ、これらのクラス/インターフェースは、外部には公開されていないため、基本的に利用者側が目にすることはありません。

実装を追っていくと割と目にするので、こういうクラスたちが中心にいるんだなーと思っておきましょう。

Partition Internal

PartitionとNode/NodeEngineの話をしたところで、Partitionがどのように保持されているかを書いていきたいと思います。

HazelcastのAPIを使ってPartitionにアクセスするには、PartitionServiceを使って以下のようなコードを書きます。

            PartitionService ps = hazelcast.getPartitionService();
            Partition partition = ps.getPartition("key1");
            int partitionId = partition.getPartitionId();
            Member owner = partition.getOwner();
            Address ownerAddress = owner.getAddress();

PartitionServiceを使うと、キーからPartitionを取得したり(PartitionService#getPartition)、現在のPartitionをすべて取得することができます(PartitionService#getPartitions)。

PartitionServiceはインターフェースで、その実装クラスはPartitionServiceProxyとなります。ここから、すべてのPartitionの情報を取得することができます。
f:id:Kazuhira:20160928233823j:image

ただ、これらのクラスは利用者向けのコードのフロントエンドにすぎません。

実際には、Nodeクラスの配下に内部的なPartitionが管理されており、利用者向けに提供されているものとは異なる、内部APIとしてPartition関連の情報が表現されています。
f:id:Kazuhira:20160928233824j:image

ここまでたどると、PartitionのPrimaryとなっているMember、そしてそのBackupを持っているMember(内部的には、レプリカと表現されています)を知ることができます。

つまり、利用者側からはPartitionのBackup Memberを知ることができません。

まあ、内部情報を追っていけば、Partitionの持ち主全部を知ることができるわけですね。

とはいえ、PartitionはNodeの情報は持っていますが、実際に保存されているデータについてはまったく知りません。データがどこにあるかは、
利用するデータ構造に依存します。

というわけで、今回はMap(Distributed Map)を見ていきます。

Distributed Mapの内部

Hazelcastの提供するMap、インターフェースとしてはIMapですが、こちらの実装クラスを追ってみると以下のようなクラスにたどり着きます。
f:id:Kazuhira:20160928233826j:image

MapProxy〜みたいなクラスが出てくるはずです。なんでProxy?みたいなところもありますが(PartitionServiceインターフェースの実装クラスも、PartitionServiceProxyでしたね)、

で、MapProxy〜をさらに追い、データを扱っているところまで見ていくと、以下のようなクラスにたどり着きます。なお、ここで記載したクラスは、すべてMap専用のパッケージ(com.hazelcast.map.impl.〜)もしくはサブパッケージに属します。
f:id:Kazuhira:20160928233827j:image

PartitionIdに対応したPartitionContainerがあり、PartitionContainerがRecordStore、そしてStorageを持ちます。データ管理の末端は、このStorageと考えてよいと思います。
Hazelcastでメモリ中にデータを保存する際のフォーマットを選ぶことができますが、その設定はStorage構築時に渡されることになります。

Setting In-Memory Format

ところで、図の中に「name単位で」という表現が出てきましたが、ここで言うnameとは、HazelcastInstanceからMapを取得する時のnameです。

つまり、以下のコード例の場合だと「default」がMapの名前になります。

        IMap<String,String> map = hazelcast.getMap("default");

今回、Mapを例に記載しましたが、他のデータ構造でもあまり変わらずRecordStoreやStorageっぽい名前のクラスを見かけることになります(これらのクラスやインターフェースに、継承関係などがあるわけではありませんが)。

Partitionとの関係をあらためて整理すると、Hazelcastクラスタ内のNodeをPartitionと呼ばれる単位に区切り
f:id:Kazuhira:20160927174845j:image

そのPartitionの中に、RecordStoreがname単位にあり、末端がStorageという感じですね。
f:id:Kazuhira:20160928233828j:image

Storageにはシリアライズされたデータが格納されることになりますが、シリアライズ関係で出てくるのはSerializationServiceとDataになります。
f:id:Kazuhira:20160928233829j:image

Mapへ格納するオブジェクトがSerializableインターフェースを実装していることは前提ですが、Hazelcastの内部的にはシリアライズ後にDataというクラスに変換して保持することになります。SerializationServiceは、Hazelcastのシリアライズ処理を抽象化したものです。

そして、このData(キーをDataに変換したもの)とInternalPartitionServiceを使うことで、該当のDataがどのPartitionに配置されるのかを決める、PartitionIdを求めることができます。
f:id:Kazuhira:20160928233830j:image

こうやって、Partitionと配置されるデータの関係が決まっていくわけですと。

put/getを少し追う

最後に、HazelcastのMapに対してputやgetした時の動作を少し追ってみましょう。

IMapインターフェースの実装である、MapProxySupportやMapProxyImplを追っていくと、〜Operationみたいなクラスを目にすることになります。

たとえば、Map#putするとPutOperation、Map#getするとGetOperationといった具合に、たいていのメソッド呼び出しはOperation(com.hazelcast.spi.Operationインターフェース)に行き着きます。

Hazelcastのデータ構造に対する操作は、このOperationで表現されていると思ってよいでしょう。
※なので、インターフェースの実装クラスがProxyなのかなと思ったり

Mapの場合は抽象実装としてMapOperationというクラスがあり、このサブクラスとしてMapに対する各種Operationが実装してあります。
f:id:Kazuhira:20160928233831j:image

Mapに対する操作(Operation)は、キーからPartitionIdの計算後にRecordStoreを特定し、該当のRecordStoreからデータの出し入れを行うといったことになります。

Operationの実行は、1度キューに積まれて別スレッドで実行されます。この図は、エントリのPrimary OwnerがLocal Nodeだった場合です。
f:id:Kazuhira:20160928233832j:image

Operationの実行自体は、OperationRunnerで行われます。

なお、コマンド実行時にバックアップも行われます。その時は、再度Operationが実行されます(PutBackupOperation)。でも、バックアップ先はRemote Nodeです。

どうするかというと、エントリの配置先がRemote Nodeの場合は、Operationごとシリアライズして別Nodeに転送します。
f:id:Kazuhira:20160928233833j:image

別Nodeに転送されたOperationが、転送先のNodeで実行されるというわけです。Backupもこの理屈ですね。

まとめ

簡単にですが、Hazelcastの内部についての初歩的な(?)内容と、Mapを題材にしたデータの持ち方をご紹介してみました。

だいぶ端折っているところもありますが、このあたりが見えていると挙動が少し追いやすいのではないでしょうか。

今回はネットワークまわりは一切言っていませんが、気が向けばそのうち、そのうち…。

2016-09-27

NINJA AKASAKA

9月27日の午後8時頃から、赤坂見附にあるNINJA AKASAKAに行ってきました。

NINJA AKASAKA

なにしに行ってきたのって、Christoph Engelbertさん(@noctarius2k)とその奥さん、そして@cero_tさんとお会いしてきました。

Christoph EngelbertさんはHazelcastの開発者の方なのですが、JavaOneの後に来日されるということで、その日程の中で自分と夕食を一緒にどう?と言っていただけたと。



それはもう、めちゃくちゃビッッッックリしました。とても困りました。



そこに、自分と話していた@cero_tさんも招待されて、4人での夕食となりました。

それでもめっちゃ困っていたのですが、そこからは@cero_tさんが場所などをポンポンと決めていき、今回の場所・日程となりましたと…。




で、当日ですが、やっぱりあまりしゃべれませんでした()。ほぼ@cero_tさんに任せっぱなしだったと言えます。話している内容は、多少わかるのですが…。

ただ、ちょっと予想と違っていたのが、会話の内容がJavaとかHazelcastの話には一切ならずに主に日本についての雑談だったことでしょうか。
※ご本人は、HazelcastのTシャツを着ていましたが

とにかく、日本のアニメについてとてもとても詳しく、タイトルをいろいろ言われたのですが全部わかりませんでした。





自分、アニメは全然知らないのですが、少しは知っておいた方がいいのでしょうか…。

あとは、日本が蒸し暑いとか、自分や@cero_tさんの勤務時間(長い!)とか、出てきた料理の話に終始していた感じですね。

そういえば、日本での話なら一蘭(ラーメン)について聞きたいと思っていたの、忘れてた…。

ところで、NINJA AKASAKAには初めて来たのですが、これはこれで面白かったです。
「臨兵闘者皆陣列在前」とか言われた時には、「こういうの、久しぶりに聞いたわー」と思いました。忍者というよりは手品って感じも多かったですが、こういうのもいいでしょう、と。

料理については、うちのテーブルは忍びコース、しゃぶしゃぶコースでした。自分はしゃぶしゃぶのコースを選びましたが、美味しかったです♪

そんな流れで、今回のこの会はお終い。

JJUG CCC 2016 FallでChristoph Engelbertさんがまた来日されるようであれば、ご挨拶しておきたいところですね。

そして、今回は@cero_tさんなしではありえない会でした。本当にありがとうございました!
まだ貸りを返したつもりには全然なっていませんので、またご飯をご一緒する機会があれば、その時には。



本日お会いした方々、ありがとうございました〜。





今回、お声がけいただいたきっかけは自分がHazelcastについてのブログをよく書いていたことがきっかけだと思うのですが、今回そういう話にはならず。
とはいえ、こうやって声をかけていただいたこと自体が初めてだったので、記念として書いておきます。

余談)
ここは、読み通りだったらしいです。


2016-09-22

はじめてのWildFly Swarm

前々から気になっていたWildFly Swarmですが、そろそろちょっと試してみることにしました。

Rightsize your Java EE Applications | WildFly Swarm

WildFly Swarmって?

WildFlyを組み込んで、実行可能JARを作成したり、必要な機能やその他のライブラリなどを統合した機能を提供してくれる仕組み。

詳しくは、こちらの資料などを見ていただくとよいかと。



WildFly Swarm - Rightsize Your Java EE Apps from Yoshimasa Tanabe

また、ドキュメントはこちら。

WildFly Swarm User’s Guide

こちらも、合わせて読むとよいでしょう。

WildFly Swarm Tour

サンプルリポジトリもあるので、適宜参考に。
https://github.com/wildfly-swarm/wildfly-swarm-examples

自分がWildFly Swarmを使うモチベーション

アプリケーションサーバーにWARファイルをデプロイするのが面倒。

これに尽きます。

なので、uberjarとかMicroserviesとかの高尚な(?)話は割とよくて、

ふつうに作ったJava EEアプリケーションを簡単に起動したい、というのがまずは使ってみたい動機です。

またMicroProfile Serverという、JARファイルにWARファイルを与えて起動できるというものも登場したので(Payara Microっぽいですね)、こちらを使いたいなぁと。

WildFly Swarmのuberjar、でかいんですよ…。

なので、作るWARファイルはまずはふつうのJava EEの範囲でのものを考えます。WildFly Swarmならではのものは、そのうち気が向いたら試していくかも?

使ってみる

というわけで、この目的でWildFly Swarmを使ってアプリケーションを動かしてみます。今回は、JAX-RSCDIで簡単なアプリケーションを書いて、最後にArquillianでテストまでしてみます。

Mavenの設定は、このように用意。
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.littlewings</groupId>
    <artifactId>wildfly-swarm-mp-jaxrs-cdi</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <failOnMissingWebXml>false</failOnMissingWebXml>

        <wildfly.swarm.version>2016.9</wildfly.swarm.version>
        <arquillian.version>1.1.10.Final</arquillian.version>
        <scala.major.version>2.11</scala.major.version>
        <scala.version>${scala.major.version}.8</scala.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.wildfly.swarm</groupId>
                <artifactId>bom-all</artifactId>
                <version>${wildfly.swarm.version}</version>
                <scope>import</scope>
                <type>pom</type>
            </dependency>
            <dependency>
                <groupId>org.jboss.arquillian</groupId>
                <artifactId>arquillian-bom</artifactId>
                <version>${arquillian.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>7.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.jboss.resteasy</groupId>
            <artifactId>resteasy-client</artifactId>
            <version>3.0.19.Final</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.wildfly.swarm</groupId>
            <artifactId>jaxrs-cdi</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.wildfly.swarm</groupId>
            <artifactId>arquillian</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.jboss.arquillian.junit</groupId>
            <artifactId>arquillian-junit-container</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_${scala.major.version}</artifactId>
            <version>3.0.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>app</finalName>
        <plugins>
            <plugin>
                <groupId>org.wildfly.swarm</groupId>
                <artifactId>wildfly-swarm-plugin</artifactId>
                <version>${wildfly.swarm.version}</version>
                <!--
                <executions>
                    <execution>
                        <goals>
                            <goal>package</goal>
                        </goals>
                    </execution>
                </executions>
                -->
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-Xlint</arg>
                        <arg>-unchecked</arg>
                        <arg>-deprecation</arg>
                        <arg>-feature</arg>
                    </args>
                    <recompileMode>incremental</recompileMode>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Scalaが入っているのは、放っておいてください(笑)。

BOMとして、WildFly SwarmのものとArquillianのものを入れておきます。

providedスコープにJava EEAPI(とScala)を追加。あとは、テスト用のものです。WildFly Swarmのjaxrs-cdiがtestスコープに混じっていますが、これはテストの起動時には要るから…。

また、JAX-RSのリソースクラスに対して、HTTP経由でテストするため、RESTEasyのClient APIも入れています。

WildFly SwarmのMaven Pluginですが、uberjarは作らないのでexecutionsのところはコメントアウト。

            <plugin>
                <groupId>org.wildfly.swarm</groupId>
                <artifactId>wildfly-swarm-plugin</artifactId>
                <version>${wildfly.swarm.version}</version>
                <!--
                <executions>
                    <execution>
                        <goals>
                            <goal>package</goal>
                        </goals>
                    </execution>
                </executions>
                -->
            </plugin>

ビルドしてできあがるWARファイルの名前は、「app.war」とします。

        <finalName>app</finalName>

サンプルアプリケーション

足し算をするだけの、簡単なアプリケーションを作ります。

JAX-RSの有効化。
src/main/scala/org/littlewings/javaee7/microprofile/rest/JaxrsActivator.scala

package org.littlewings.javaee7.microprofile.rest

import javax.ws.rs.ApplicationPath
import javax.ws.rs.core.Application

@ApplicationPath("rest")
class JaxrsActivator extends Application

JAX-RSリソースクラス。
src/main/scala/org/littlewings/javaee7/microprofile/rest/CalcResource.scala

package org.littlewings.javaee7.microprofile.rest

import javax.enterprise.context.RequestScoped
import javax.inject.Inject
import javax.ws.rs.core.MediaType
import javax.ws.rs.{GET, Path, Produces, QueryParam}

import org.littlewings.javaee7.microprofile.service.CalcService

@Path("calc")
@RequestScoped
class CalcResource {
  @Inject
  var calcService: CalcService = _

  @Path("add")
  @GET
  @Produces(Array(MediaType.TEXT_PLAIN))
  def add(@QueryParam("a") a: Int, @QueryParam("b") b: Int): Int =
    calcService.add(a, b)
}

CDI管理Bean。
src/main/scala/org/littlewings/javaee7/microprofile/service/CalcService.scala

package org.littlewings.javaee7.microprofile.service

import javax.enterprise.context.ApplicationScoped

@ApplicationScoped
class CalcService {
  def add(a: Int, b: Int): Int = a + b
}

とまあ、こんな感じで。特にWildFly Swarmに依存するコードは入っていません。

実行

いくつか方法がありますが、まずはWildFly SwarmのMaven Pluginで起動してみます。

$ mvn wildfly-swarm:run

確認。

$ curl 'http://localhost:8080/rest/calc/add?a=5&b=3'
8

OKですね。

次に、MicroProfile Serverで起動してみます。ダウンロードページから、「microprofile-hollowswarm.jar」をダウンロードします。

http://wildfly-swarm.io/downloads/

現時点では、「microprofile-2016.9-hollowswarm.jar」というものになります。

WARファイルを作ります。

$ mvn package

WARファイルを指定して、「java -jar」で起動。

$ java -jar microprofile-2016.9-hollowswarm.jar target/app.war

確認。この場合は、URLにコンテキストパス(今回は「app」)が入ります。

$ curl 'http://localhost:8080/app/rest/calc/add?a=5&b=3'
8

uberjarで実行したい場合は、WildFly SwarmのMaven Pluginをpackage時に実行するように仕込んでください。

            <plugin>
                <groupId>org.wildfly.swarm</groupId>
                <artifactId>wildfly-swarm-plugin</artifactId>
                <version>${wildfly.swarm.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>package</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

これで、「mvn package」でuberjarができあがります。

テストをする

最後に、Arquillianを使ってテストをします。

特にArquillianの設定ファイルなどを用意しなくても、実行可能です。

Testing with Arquillian

Arquillian を用いた Integration Test

それでは、テストを用意します。

JAX-RS側のテストコード。こちらは、@RunAsClientでクライアントとして動かします。
src/test/scala/org/littlewings/javaee7/microprofile/rest/CalcResourceTest.scala

package org.littlewings.javaee7.microprofile.rest

import java.net.URL
import javax.ws.rs.ApplicationPath
import javax.ws.rs.client.ClientBuilder

import org.jboss.arquillian.container.test.api.{Deployment, RunAsClient}
import org.jboss.arquillian.junit.Arquillian
import org.jboss.arquillian.test.api.ArquillianResource
import org.jboss.shrinkwrap.api.ShrinkWrap
import org.jboss.shrinkwrap.resolver.api.maven.Maven
import org.junit.Test
import org.junit.runner.RunWith
import org.littlewings.javaee7.microprofile.service.CalcService
import org.scalatest.Matchers
import org.wildfly.swarm.Swarm
import org.wildfly.swarm.arquillian.CreateSwarm
import org.wildfly.swarm.jaxrs.JAXRSArchive

object CalcResourceTest {
  @Deployment
  def createDeployment: JAXRSArchive =
    ShrinkWrap
      .create(classOf[JAXRSArchive])
      .addClasses(classOf[JaxrsActivator], classOf[CalcResource], classOf[CalcService])
      .addAsLibraries(
        Maven
          .resolver
          .loadPomFromFile("pom.xml")
          .importRuntimeDependencies
          .resolve("org.scalatest:scalatest_2.11:3.0.0")
          .withTransitivity
          .asFile: _*
      )

  @CreateSwarm
  def newContainer: Swarm = new Swarm
}

@RunWith(classOf[Arquillian])
@RunAsClient
class CalcResourceTest extends Matchers {
  private val resourcePrefix: String =
    classOf[JaxrsActivator]
      .getAnnotation(classOf[ApplicationPath])
      .value

  @ArquillianResource
  private var deploymentUrl: URL = _

  @Test
  def calcTest(): Unit = {
    val client = ClientBuilder.newBuilder.build

    try {
      val response =
        client
          .target(s"${deploymentUrl}${resourcePrefix}/calc/add?a=5&b=3")
          .request
          .get

      response.readEntity(classOf[String]) should be("8")

      response.close()
    } finally {
      client.close()
    }
  }
}

CDI管理Beanのテスト。
src/test/scala/org/littlewings/javaee7/microprofile/service/CalcServiceTest.scala

package org.littlewings.javaee7.microprofile.service

import javax.inject.Inject

import org.jboss.arquillian.container.test.api.Deployment
import org.jboss.arquillian.junit.Arquillian
import org.jboss.shrinkwrap.api.ShrinkWrap
import org.jboss.shrinkwrap.api.spec.WebArchive
import org.jboss.shrinkwrap.resolver.api.maven.Maven
import org.junit.Test
import org.junit.runner.RunWith
import org.scalatest.Matchers
import org.wildfly.swarm.Swarm
import org.wildfly.swarm.arquillian.CreateSwarm

object CalcServiceTest {
  @Deployment
  def createDeployment: WebArchive =
    ShrinkWrap
      .create(classOf[WebArchive])
      .addClass(classOf[CalcService])
      .addAsLibraries(
        Maven
          .resolver
          .loadPomFromFile("pom.xml")
          .importRuntimeDependencies
          .resolve("org.scalatest:scalatest_2.11:3.0.0")
          .withTransitivity
          .asFile: _*
      )

  @CreateSwarm
  def newContainer: Swarm = new Swarm
}

@RunWith(classOf[Arquillian])
class CalcServiceTest extends Matchers {
  @Inject
  var calcService: CalcService = _

  @Test
  def addTest(): Unit =
    calcService.add(5, 3) should be(8)
}

コードの書き方自体は通常のArquillianを使った場合と変わりませんが、WildFly Swarmのコンテナについての設定などを入れる場合は、@CreateSwarmを付与したstaticメソッドでコンテナを返すように実装するようです。

  @CreateSwarm
  def newContainer: Swarm = new Swarm

今回は特になにもしていないので、このメソッドは実はなくても動作します。

あとは、テストを実行するだけ。

$ mvn test

ちょっと重いのですが、これでArquillianを使ったテストが実行されます。

まとめ

WildFly Swarmを使って、ふつうのJava EEアプリケーションを動作させてみました。

このままだとIDEからmainメソッドで実行できないじゃないかとかありますが、一応やりたいことは実現できましたと。

IDEからmainメソッド欲しかったらさすがにコンテナ使ったコード書くかなぁとか、DataSource使いたかったらどうしようとかあるのですが…。

DataSourceとかは、どちらかというと設定ファイルにしたい派なんですよね。アプリケーションサーバー依存になりますけど、*-ds.xmlとか用意すればいいのかな。

あと、Arquillianを使ったテストはやっぱり重いので、UnitTestの時はApache DeltaSpikeでできる範囲は代用とかかなぁとちょっと思ったり。

まあ、個人の趣味程度に遊んでいきたいと思います。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/wildfly-swarm-mp-jaxrs-cdi

2016-09-17

Spring Data Hazelcastで遊ぶ

HazelcastによるSpring Data向けのモジュールが、8月末くらいにリリースされているのに気付きました。

GitHub - hazelcast/spring-data-hazelcast: Hazelcast Spring Data integration Project http://projects.spring.io/spring-data/

現時点のバージョンは、1.0です。

せっかくなので、こちらで遊んでみたいと思います。

Spring Data Hazelcastとは

Spring Data Key Valueをベースにしたモジュールのようです。

Spring Data Key Value - Reference Documentation

Spring Data KeyValue 1.1.2.RELEASE API

GitHub - spring-projects/spring-data-keyvalue: Project to provide infrastructure to implement Spring Data repositories on top of key-value-based, in-memory data stores.

こちらを使うことで、Key Valueなデータ構造に対して、Spring Dataでのアクセスができるようになるみたいですね。Spring Data Key Valueでは、Mapに対してのAdapterがあります。

で、これのHazelcast版のAdapterやクエリを実行する仕組みを備えたのが、Spring Data Hazelcastのようです。

Hazelcastでの利用するデータ構造は、IMap(Distributed Map)となります。

それでは、README.mdに沿って試してみましょう。

準備

Spring Data Hazelcastを使う際のMaven依存関係は、こちら。

        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>spring-data-hazelcast</artifactId>
            <version>1.0</version>
        </dependency>

Spring Data Hazelcast 1.0で引き込まれてくるHazelcastのバージョンは、3.6.4です。現時点のHazelcastの最新版は、3.7.1ですが。

サンプルとして動かす際にはSpring Bootを使用したいと思いますので、pom.xmlの全体的な構成は以下のようになりました。
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.littlewings</groupId>
    <artifactId>hazelcast-spring-data</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spring.boot.version>1.4.0.RELEASE</spring.boot.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>spring-data-hazelcast</artifactId>
            <version>1.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring.boot.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Entity

では、まずEntityを作成します。お題は書籍とします。
src/main/java/org/littlewings/hazelcast/spring/entity/Book.java

package org.littlewings.hazelcast.spring.entity;

import java.io.Serializable;

import org.springframework.data.annotation.Id;
import org.springframework.data.keyvalue.annotation.KeySpace;

@KeySpace("books")  // @KeySpace を付けない場合は、EntityのFQCNがIMapの名前になる
public class Book implements Serializable {
    @Id
    private String isbn;
    private String title;
    private int price;

    public static Book create(String isbn, String title, int price) {
        Book book = new Book();
        book.setIsbn(isbn);
        book.setTitle(title);
        book.setPrice(price);

        return book;
    }

    public String getIsbn() {
        return isbn;
    }

    public void setIsbn(String isbn) {
        this.isbn = isbn;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public int getPrice() {
        return price;
    }

    public void setPrice(int price) {
        this.price = price;
    }
}

Serializableは実装しておきましょう。

キーとなる項目に対しては、@Idアノテーションを付与しておく必要があります。

@KeySpaceについてはあってもなくてもいいのですが、Hazelcastの場合はここで指定した名前がIMap(Distributed Map)の名前(データ格納先)となります。
Keyspaces

指定しない場合はクラスのFQCNIMap名が取られてしまうようなので、指定しておく方がいいのかなと思います。

Repository

Spring Dataよろしく、こんなインターフェースを作成します。
src/main/java/org/littlewings/hazelcast/spring/repository/BookRepository.java

package org.littlewings.hazelcast.spring.repository;

import java.util.List;

import org.littlewings.hazelcast.spring.entity.Book;
import org.springframework.data.hazelcast.repository.HazelcastRepository;

public interface BookRepository extends HazelcastRepository<Book, String> {
}

この時に継承するインターフェースは、HazelcastRepositoryインターフェースとなります。
Usage

現時点では、特にメソッド定義は行いません。これだけでもfindAllやcountなど、基本的なメソッドが使用できるのはよいなと思います。

Core concepts

Configuration

最後に、Spring Data Hazelcastの設定を行う必要があります。
src/main/java/org/littlewings/hazelcast/spring/HazelcastConfig.java

package org.littlewings.hazelcast.spring;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.hazelcast.HazelcastKeyValueAdapter;
import org.springframework.data.hazelcast.repository.config.EnableHazelcastRepositories;
import org.springframework.data.keyvalue.core.KeyValueOperations;
import org.springframework.data.keyvalue.core.KeyValueTemplate;

@Configuration
@EnableHazelcastRepositories
public class HazelcastConfig {
    @Bean(destroyMethod = "shutdown")
    public HazelcastInstance hazelcastInstance() {
        return Hazelcast.newHazelcastInstance();
    }

    @Bean
    public KeyValueOperations keyValueTemplate(HazelcastKeyValueAdapter keyValueAdapter) {
        return new KeyValueTemplate(keyValueAdapter);
    }

    @Bean
    public HazelcastKeyValueAdapter hazelcastKeyValueAdapter(HazelcastInstance hazelcast) {
        return new HazelcastKeyValueAdapter(hazelcast);
    }
}

まず、@EnableHazelcastRepositoriesアノテーションを付けておくことがポイントです。

@Configuration
@EnableHazelcastRepositories
public class HazelcastConfig {

あとは、HazelcastInstance、HazelcastKeyValueAdapter、KeyValuteTemplateをセットアップする必要があります。
Usage

Hazelcast自体の設定は、この時にConfigで構築するか、設定ファイルから読み込んでHazelcastInstanceを作成すればよいでしょう。

使ってみる

それでは、使ってみます。実行は、テストコードで行うものとします。

まずは雛形。
src/test/java/org/littlewings/hazelcast/spring/SpringDataHazelcastTest.java

package org.littlewings.hazelcast.spring;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.query.EntryObject;
import com.hazelcast.query.Predicate;
import com.hazelcast.query.PredicateBuilder;
import com.hazelcast.query.Predicates;
import com.hazelcast.query.SqlPredicate;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.littlewings.hazelcast.spring.entity.Book;
import org.littlewings.hazelcast.spring.repository.BookRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.Sort;
import org.springframework.data.keyvalue.core.KeyValueOperations;
import org.springframework.data.keyvalue.core.query.KeyValueQuery;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import static org.assertj.core.api.Assertions.assertThat;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = HazelcastConfig.class)
public class SpringDataHazelcastTest {
    // ここに、テストを書く!
}

とりあえず、作成したRepositoryを@Autowiredしておきます。

    @Autowired
    BookRepository bookRepository;

とすると、Repository越しにsave、findAll、countなど基本的なメソッドが使用できるようになります。

    @Test
    public void saveAndFind() {
        bookRepository.save(Book.create("978-1785285332", "Getting Started With Hazelcast", 3848));

        assertThat(bookRepository.findOne("978-1785285332").getTitle())
                .isEqualTo("Getting Started With Hazelcast");

        assertThat(bookRepository.findAll())
                .hasSize(1);
        assertThat(bookRepository.count())
                .isEqualTo(1);
    }

また、HazelcastInstanceも@Autowiredして、実際にHazelcastのIMap(Distributed Map)にもデータが入ったことを確認してみましょう。

    @Autowired
    HazelcastInstance hazelcast;

    @Test
    public void saveAndFindAndUnderlying() {
        bookRepository.save(Book.create("978-1785285332", "Getting Started With Hazelcast", 3848));

        assertThat(bookRepository.findOne("978-1785285332").getTitle())
                .isEqualTo("Getting Started With Hazelcast");

        assertThat(bookRepository.findAll())
                .hasSize(1);
        assertThat(bookRepository.count())
                .isEqualTo(1);

        // assertThat(hazelcast.getMap("org.littlewings.hazelcast.spring.entity.Book"))
        //         .hasSize(1);
        assertThat(hazelcast.getMap("books"))
                .hasSize(1);

        assertThat(hazelcast.<String, Book>getMap("books").get("978-1785285332").getTitle())
                .isEqualTo("Getting Started With Hazelcast");
    }

HazelcastInstance#getMapする時の名前が、@KeySpaceで指定した名前(未指定の場合はEntityのFQCN)となります。

Queryを投げる

Spring Data Hazelcastでも、作成したRepositoryに命名規則に沿ったメソッドを定義することで、Queryを定義することができます。

というか、これ自体はSpring Data Key Valueの話ですね。
Query methods

そんなわけで、先ほどのBookRepositoryにメソッドを追加します。

public interface BookRepository extends HazelcastRepository<Book, String> {
    Book findByTitle(String title);

    List<Book> findByPriceGreaterThan(int price);
}

これで、Queryが投げられるようになります。

    @Test
    public void query() {
        bookRepository.save(Arrays.asList(Book.create("978-1785285332", "Getting Started With Hazelcast", 3848),
                Book.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947),
                Book.create("978-1783988181", "Mastering Redis", 6172)));

        assertThat(bookRepository.findByTitle("Getting Started With Hazelcast").getPrice())
                .isEqualTo(3848);

        List<Book> books = bookRepository.findByPriceGreaterThan(4000);

        assertThat(books.stream().map(Book::getTitle).collect(Collectors.toList()))
                .hasSize(2)
                .containsExactly("Infinispan Data Grid Platform Definitive Guide", "Mastering Redis");
    }

すべての命名に沿ったQueryがサポートされているわけではないと思いますが、使えるのはこのあたりではないでしょうか。
https://github.com/hazelcast/spring-data-hazelcast/blob/v1.0/src/main/java/org/springframework/data/hazelcast/repository/query/HazelcastQueryCreator.java#L137-L185

Predicateを使う

先ほどのRepositoryインターフェースにメソッドを定義する形以外に、もうちょっと凝ったことをしたいと思う場合には、HazelcastのPredicateとSpring Data Key ValueのKeyValueTemplateとKeyValueQueryを使うのかなと思います。
※Spring Data Key ValueにQueryDSLのサポートがあったみたいでしたが、こちらは今回は置いておきます
Querydsl Extension

Configurationで定義した、KeyValueTemplate(KeyValueOperations)を@Autowiredします。

    @Autowired
    KeyValueOperations keyValueOperations;

あとは、こちらにKeyValueQueryを渡してQueryを作ります。KeyValueQueryには、HazelcastのPredicateを渡せばよさそうです。

以下、パターン別に見てみましょう。

PredicatesでQuery
    @Test
    public void usingPredicates() {
        bookRepository.save(Arrays.asList(Book.create("978-1785285332", "Getting Started With Hazelcast", 3848),
                Book.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947),
                Book.create("978-1783988181", "Mastering Redis", 6172)));

        Predicate<String, Book> predicate =
                Predicates.and(Predicates.equal("isbn", "978-1785285332"), Predicates.greaterEqual("price", 3000));

        KeyValueQuery<Predicate<String, Book>> query = new KeyValueQuery<>(predicate);

        Iterable<Book> books = keyValueOperations.find(query, Book.class);

        assertThat(StreamSupport.stream(books.spliterator(), false).map(Book::getTitle).collect(Collectors.toList()))
                .hasSize(1)
                .containsExactly("Getting Started With Hazelcast");
    }

PredicateBuilder/EntryObjectでQuery
    @Test
    public void usingPredicateBuilder() {
        bookRepository.save(Arrays.asList(Book.create("978-1785285332", "Getting Started With Hazelcast", 3848),
                Book.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947),
                Book.create("978-1783988181", "Mastering Redis", 6172)));

        EntryObject e = new PredicateBuilder().getEntryObject();
        Predicate<String, Book> predicate =
                e.get("isbn").equal("978-1785285332").and(e.get("price").greaterEqual(3000));

        KeyValueQuery<Predicate<String, Book>> query = new KeyValueQuery<>(predicate);

        Iterable<Book> books = keyValueOperations.find(query, Book.class);

        assertThat(StreamSupport.stream(books.spliterator(), false).map(Book::getTitle).collect(Collectors.toList()))
                .hasSize(1)
                .containsExactly("Getting Started With Hazelcast");
    }

SqlPredicateでQuery
    @Test
    public void usingSqlPredicate() {
        bookRepository.save(Arrays.asList(Book.create("978-1785285332", "Getting Started With Hazelcast", 3848),
                Book.create("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947),
                Book.create("978-1783988181", "Mastering Redis", 6172)));

        Predicate<String, Book> predicate = new SqlPredicate("price > 4000");
        KeyValueQuery<Predicate<String, Book>> query = new KeyValueQuery<>(predicate);
        query.setSort(new Sort(Sort.Direction.DESC, "price"));

        Iterable<Book> books = keyValueOperations.find(query, Book.class);

        assertThat(StreamSupport.stream(books.spliterator(), false).map(Book::getTitle).collect(Collectors.toList()))
                .hasSize(2)
                .containsExactly("Mastering Redis", "Infinispan Data Grid Platform Definitive Guide");
    }

しれっと、SqlPredicatesだけソートを入れてあります…。

KeyValueQueryに対してソートを仕込めるので、これを使えばいいのかな?
Sorting

Predicateでやる場合は、PagingPredicateを使うのだと思いますが、HazelcastのPredicateの説明自体はHazelcastのドキュメントを参照のこと…。

Distributed Query

@Transactionalはどうした?

今回試していません。Hazelcast 3.7からSpringの@Transactionalのサポートが追加されているようなので、Hazelcastを3.7にしてhazelcast-springを使ったらできたりするのかな?
https://github.com/hazelcast/hazelcast/blob/v3.7.1/hazelcast-spring/src/main/java/com/hazelcast/spring/transaction/HazelcastTransactionManager.java

まとめ

Spring Data Hazelcastを試してみました。ベースのSpring Data Key Value自体、初めて使うものだったので少々設定などでオロオロしたところもありましたが、ふつうに使うことができました。

Springとの統合が進んでて、いいですねー。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-spring-data

2016-09-04

Reactor CoreのProcessors(Hot Publishing)を試す

Reactor CoreのREADMEに載っている最後のテーマ、Hot Publishing:Processorsを試してみます。

Hot Publishing : Processors

いくつかのProcessorがあるようですが、そこからSinkを取得してSubscribeしたり、Processorに直接Subscriberを登録してSubscribeするといった使い方になるようです。

とりあえず、順次使っていってみましょう。

準備

Maven依存関係は、こんな感じ。

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.0.2.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.5.2</version>
            <scope>test</scope>
        </dependency>

9月になってから、Reactor Coreが3.0.1.RELEASE、3.0.2.RELEASEとリリースされています。

また、テストコードにはTestSubscriberを使うのがよいと@makingさんに教えていただいたので






こちらを使ってテストコードを書いてみることにします。

とはいえ、このツイートの通りテストコードそのものはMaven Centralにアップロードされていないので、今は自分で手元に置くことにしておきましょう。

$ wget https://raw.githubusercontent.com/reactor/reactor-core/v3.0.2.RELEASE/src/test/java/reactor/test/TestSubscriber.java -O src/test/java/reactor/test/TestSubscriber.java

こちらを使って、できる範囲でテストコードを書いていくことにします。

また、テストコード全体の雛形は、こんな感じ。
src/test/java/org/littlewings/reactor/processors/HotPublishingProcessorsTest.java

package org.littlewings.reactor.processors;

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.junit.Test;
import reactor.core.publisher.BlockingSink;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.TopicProcessor;
import reactor.core.publisher.WorkQueueProcessor;
import reactor.test.TestSubscriber;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class HotPublishingProcessorsTest {
    // ここに、テストを書く!
}

では、見ていってみます。

EmitterProcessor

N個のSubscriberを紐付けることができる、Processor。

Pub-Sub : EmitterProcessor

READMEに非同期〜みたいなことが書いてありますが、Javadocを見ると内部的にループして同期的っぽく処理する方法を取ってそうな感じです。
http://projectreactor.io/core/docs/api/?reactor/core/publisher/EmitterProcessor.html

では、コードを書いてみます。

    @Test
    public void pubSub_emitterProcesssr() {
        TestSubscriber<Integer> subscriber = TestSubscriber.create();

        EmitterProcessor<Integer> emitter = EmitterProcessor.create();
        BlockingSink<Integer> sink = emitter.connectSink();

        emitter.subscribe(subscriber);

        sink.next(1);
        sink.next(2);

        subscriber.assertValues(1, 2);

        sink.next(3);
        subscriber.assertValues(1, 2, 3);

        sink.complete();
        subscriber.assertComplete();
    }

まずはEmitterProcessor#createでEmitterProcessorを取得し、そこからBlockingSinkを取得します(READMEとちょっと違う)。

        EmitterProcessor<Integer> emitter = EmitterProcessor.create();
        BlockingSink<Integer> sink = emitter.connectSink();

このSinkにSubscriberを紐付けます。ここでのSubscriberはTestSubscriberです。

        emitter.subscribe(subscriber);

あとはBlockingSink#nextでデータを送ります。

        sink.next(1);
        sink.next(2);

        subscriber.assertValues(1, 2);

        sink.next(3);
        subscriber.assertValues(1, 2, 3);

Subscriber側では、受信ができます、と。

BlockingSink#completeで終了。

        sink.complete();
        subscriber.assertComplete();

続いて、複数のSubscriberを紐付けてみましょう。

    @Test
    public void pubSub_emitterProcessorAndSubscribers() {
        TestSubscriber<Integer> subscriber1 = TestSubscriber.create();
        TestSubscriber<Integer> subscriber2 = TestSubscriber.create();

        EmitterProcessor<Integer> emitter = EmitterProcessor.create();
        BlockingSink<Integer> sink = emitter.connectSink();
        emitter.subscribe(subscriber1);
        emitter.subscribe(subscriber2);
        sink.next(1);
        sink.next(2);

        subscriber1.assertValues(1, 2);
        subscriber2.assertValues(1, 2);

        sink.next(3);
        subscriber1.assertValues(1, 2, 3);
        subscriber2.assertValues(1, 2, 3);

        sink.complete();
        subscriber1.assertComplete();
        subscriber2.assertComplete();
    }

普通にどのSubscriberもデータを受信できますし、BlokingSink#completeするとどのSubscriberも終了します、と。

また、Subscriberを紐付けていない状態でBlockingSink#nextすると、例外となります。

    @Test
    public void pubSub_emitterProcesssrOverflow() {
        TestSubscriber<Integer> subscriber = TestSubscriber.create();

        EmitterProcessor<Integer> emitter = EmitterProcessor.create();
        BlockingSink<Integer> sink = emitter.connectSink();

        assertThatThrownBy(() -> sink.next(1))
                .isInstanceOf(IllegalStateException.class)
                .hasMessage("The receiver is overrun by more signals than expected (bounded queue...)");
    }

なるほど。

ReplayProcessor

続いて、ReplayProcessor。このProcessorもN個のSubscriberを紐付けることができますが、このProcessorは発生したイベントを覚えておくことができます。

Pub-Sub Replay : ReplayProcessor

覚えておくサイズや方法は、ある程度設定することができます。

まずは例。

    @Test
    public void pubSub_replayProcessor() {
        TestSubscriber<Integer> subscriber1 = TestSubscriber.create();

        ReplayProcessor<Integer> replayer = ReplayProcessor.create();

        replayer.subscribe(subscriber1);

        replayer.onNext(1);
        replayer.onNext(2);

        subscriber1.assertValues(1, 2);

        TestSubscriber<Integer> subscriber2 = TestSubscriber.create();
        replayer.subscribe(subscriber2);

        subscriber2.assertValues(1, 2);

        replayer.onNext(3);

        subscriber1.assertValues(1, 2, 3);
        subscriber2.assertValues(1, 2, 3);

        replayer.onComplete();
        subscriber1.assertComplete();
        subscriber2.assertComplete();
    }

ここでも使うSubscriberは、TestSubscriberとします。

最初にReplayProcessor#createでRelayProcessorを作成し、RelayProcessorに直接Subscriberを紐付けます。RelayProcessor#onNextでデータを送ると、Subscriberが受信します。ここまでは、普通です。

        ReplayProcessor<Integer> replayer = ReplayProcessor.create();

        replayer.subscribe(subscriber1);

        replayer.onNext(1);
        replayer.onNext(2);

        subscriber1.assertValues(1, 2);

この後にSubscriberを追加すると、この時点までに送られた内容も受信します。

        TestSubscriber<Integer> subscriber2 = TestSubscriber.create();
        replayer.subscribe(subscriber2);

        subscriber2.assertValues(1, 2);

結果、途中で追加されたSubscriberも、他のSubscriberに追いついたような動きになります。

        replayer.onNext(3);

        subscriber1.assertValues(1, 2, 3);
        subscriber2.assertValues(1, 2, 3);

        replayer.onComplete();
        subscriber1.assertComplete();
        subscriber2.assertComplete();

終了は、ReplayProcessor#onCompleteで。

で、どのくらい過去の履歴を保持しているかですが、デフォルトでは256です。でも、それ以上の数を与えても過去のデータを持っていたりします。

    @Test
    public void pubSub_replayProcessorOverflow() {
        TestSubscriber<Integer> subscriber1 = TestSubscriber.create();

        ReplayProcessor<Integer> replayer = ReplayProcessor.create();

        replayer.subscribe(subscriber1);

        IntStream.rangeClosed(1, 300).forEach(replayer::onNext);

        subscriber1.assertValueCount(300L);

        TestSubscriber<Integer> subscriber2 = TestSubscriber.create();
        replayer.subscribe(subscriber2);

        subscriber2.assertValueCount(300L);

        replayer.onComplete();
        subscriber1.assertComplete();
        subscriber2.assertComplete();
    }

これは、ReplayProcessor#createの引数で設定することができますが、historySizeが256で、限界を越えた時もメモリに溜め込むような設定(unbounded)になってるからです。

ここで、historySizeの明示とunbounded=falseにしてReplayProcessorを作成してみます。

    @Test
    public void pubSub_replayProcessorOverflow2() {
        TestSubscriber<Integer> subscriber1 = TestSubscriber.create();

        ReplayProcessor<Integer> replayer = ReplayProcessor.create(256, false);

        replayer.subscribe(subscriber1);

        IntStream.rangeClosed(1, 300).forEach(replayer::onNext);

        subscriber1.assertValueCount(300L);

        TestSubscriber<Integer> subscriber2 = TestSubscriber.create();
        replayer.subscribe(subscriber2);

        subscriber2.assertValueCount(256L);

        replayer.onComplete();
        subscriber1.assertComplete();
        subscriber2.assertComplete();
    }

追加したSubscriber側では、256個までしかイベントを受信しなくなりました。

TopicProcessor

次は、TopicProcessor。TopicProcessorはpublish-subscribeを、非同期のイベントループで行うProcessorです。

Async Pub-Sub : TopicProcessor

このため、ここまでのProcessorのコードと異なり、テストコードだと待ち合わせを考慮しないと、テストが不安定になります。が、このあたりはTestSubscriberがうまくやってくれます。

では、作成したコード。

    @Test
    public void asyncPubSub_topicProcessor() {
        TestSubscriber<Integer> subscriber = TestSubscriber.create();

        TopicProcessor<Integer> topic = TopicProcessor.create();

        topic.subscribe(subscriber);

        topic.onNext(1);
        topic.onNext(2);

        subscriber
                .awaitAndAssertNextValueCount(2L)
                .assertValues(1, 2);

        topic.onNext(3);

        subscriber
                .awaitAndAssertNextValueCount(1L)
                .assertValues(1, 2, 3);

        topic.onComplete();
        subscriber.await().assertComplete();
    }

TopicProcessor#createした後にSubscriberを紐付けるところまではふつうですが、

        TopicProcessor<Integer> topic = TopicProcessor.create();

        topic.subscribe(subscriber);

TopicProcessor#onNextでデータを送った後は、テストコードとしてはSuscriber側でwaitの考慮がないとうまく動かなくなります。

        topic.onNext(1);
        topic.onNext(2);

        subscriber
                .awaitAndAssertNextValueCount(2L)
                .assertValues(1, 2);

まあ、非同期に動いてますよってことですね。

TopicProcessor#onCompleteも非同期。

        topic.onComplete();
        subscriber.await().assertComplete();

Subscriberを複数紐付けた場合は、途中からの受信になります。

   @Test
    public void asyncPubSub_topicProcessorAndSubscribers() {
        TestSubscriber<Integer> subscriber1 = TestSubscriber.create();

        TopicProcessor<Integer> topic = TopicProcessor.create();

        topic.subscribe(subscriber1);

        topic.onNext(1);
        topic.onNext(2);

        subscriber1
                .awaitAndAssertNextValueCount(2L)
                .assertValues(1, 2);

        TestSubscriber<Integer> subscriber2 = TestSubscriber.create();
        topic.subscribe(subscriber2);

        topic.onNext(3);

        subscriber1
                .awaitAndAssertNextValueCount(1L)
                .assertValues(1, 2, 3);

        subscriber2
                .awaitAndAssertNextValueCount(1L)
                .assertValues(3);

        topic.onComplete();

        subscriber1.await().assertComplete();
        subscriber2.await().assertComplete();
    }

WorkQueueProcessor

最後は、WorkQueueProcessorです。このProcessorは、TopicProcessorに似ていますが複数のSubscriberを紐付けて、各Subscriberにイベントは送信されないようです。どれかひとつって感じのようですね。

Async Distributed : WorkQueueProcessor

また、Reactive Streamsでの仕様は、一部守っていないっぽい?です。

The processor is very similar to TopicProcessor but only partially respects the Reactive Streams contract.

The purpose of this processor is to distribute the signals to only one of the subscribed subscribers and to share the demand amongst all subscribers. The scenario is akin to Executor or Round-Robin distribution. However there is no guarantee the distribution will be respecting a round-robin distribution all the time.

http://projectreactor.io/core/docs/api/?reactor/core/publisher/WorkQueueProcessor.html

とりあえず、コードを書いてみましょう。

    @Test
    public void asyncDistributed_workQueueProcessorWithSubscribers() {
        TestSubscriber<Integer> subscriber1 = TestSubscriber.create();

        WorkQueueProcessor<Integer> queue = WorkQueueProcessor.create();
        queue.subscribe(subscriber1);
        queue.onNext(1);
        queue.onNext(2);

        subscriber1
                .awaitAndAssertNextValueCount(2L)
                .assertValues(1, 2);

        TestSubscriber<Integer> subscriber2 = TestSubscriber.create();
        queue.subscribe(subscriber2);

        queue.onNext(3);
        queue.onNext(4);

        subscriber1
                .awaitAndAssertNextValueCount(1L)
                .assertValues(1, 2, 3, 4);
        subscriber2
                .awaitAndAssertNextValueCount(0L);

        queue.onComplete();

        subscriber1.await().assertComplete();
        subscriber2.await().assertComplete();
    }

WorkQueueProcessor#createでWorkQueueProcessorを作成し、Subscriberを紐付けます。

        WorkQueueProcessor<Integer> queue = WorkQueueProcessor.create();
        queue.subscribe(subscriber1);
        queue.onNext(1);
        queue.onNext(2);

        subscriber1
                .awaitAndAssertNextValueCount(2L)
                .assertValues(1, 2);

ですが、途中で紐付けたSubscriberにはonNextで送った内容が来なかったり。

        queue.onNext(3);
        queue.onNext(4);

        subscriber1
                .awaitAndAssertNextValueCount(1L)
                .assertValues(1, 2, 3, 4);
        subscriber2
                .awaitAndAssertNextValueCount(0L);

こちらも、やっぱり非同期で動いているようです。

最初から、2つSubscriberを紐付けてみましょう。

    @Test
    public void asyncDistributed_workQueueProcessorWithSubscribers2() throws InterruptedException {
        WorkQueueProcessor<Integer> queue = WorkQueueProcessor.create();
        queue.subscribe(v -> System.out.println("subscriber1: " + v));
        queue.subscribe(v -> System.out.println("subscriber2: " + v));

        queue.onNext(1);
        queue.onNext(2);

        queue.onNext(3);
        queue.onNext(4);

        queue.onNext(5);
        queue.onNext(6);

        queue.onComplete();

        TimeUnit.SECONDS.sleep(2L);
    }

結果。

subscriber1: 1
subscriber1: 2
subscriber1: 3
subscriber1: 4
subscriber1: 5
subscriber1: 6

全部、Subscriber1に寄りました…。

そこで、今度はWorkQueueProcessor#shareでWorkQueueProcessorを作成してみます。Subscriberは、途中から紐付けます。

    @Test
    public void asyncDistributed_workQueueProcessorWithSubscribersShared() throws InterruptedException {
        WorkQueueProcessor<Integer> queue = WorkQueueProcessor.share(true);
        queue.subscribe(v -> System.out.println("subscriber1: " + v));
        queue.onNext(1);
        queue.onNext(2);

        queue.subscribe(v -> System.out.println("subscriber2: " + v));

        queue.onNext(3);
        queue.onNext(4);

        queue.onNext(5);
        queue.onNext(6);

        queue.onComplete();

        TimeUnit.SECONDS.sleep(2L);
    }

結果。

subscriber1: 1
subscriber1: 2
subscriber1: 3
subscriber1: 4
subscriber1: 5
subscriber1: 6

ちなみに、この結果は不定で、subscriber2に寄ることもありました。

もう少し長めに紐付けてみましょう。

    @Test
    public void asyncDistributed_workQueueProcessorWithSubscribersShared2() throws InterruptedException {
        WorkQueueProcessor<Integer> queue = WorkQueueProcessor.share(true);
        queue.subscribe(v -> System.out.println("subscriber1: " + v));
        queue.subscribe(v -> System.out.println("subscriber2: " + v));

        queue.onNext(1);
        queue.onNext(2);

        queue.onNext(3);
        queue.onNext(4);

        queue.onNext(5);
        queue.onNext(6);

        queue.onComplete();

        TimeUnit.SECONDS.sleep(2L);
    }

結果。

subscriber1: 1
subscriber1: 3
subscriber1: 4
subscriber1: 5
subscriber1: 6
subscriber2: 2

subscriber2も現れました。

まあ、ひとつのイベントを、ひとつのSubscriberのみに配信するよってことだと思いますが、ちょっと試したデータ件数が少なかったかな?

まとめ

Reactor CoreのREADME.mdに載っている、各種Processorを試してみました。

それぞれ特性が違うのはよくわかりましたが、さてどう使ったものかな?という感じです。

とりあえず、こんなところで。