Hatena::ブログ(Diary)

CLOVER

2016-08-28

SLF4J Localizationを試す

SLF4J extensionsの中にある、SLF4J Localizationですが、こちらをちょっと試しておきたいなと思いまして。

Localization

Javadocは、こちら。

SLF4J 1.7.21 API

このSLF4J Localizationを使うことで、ログメッセージを国際化対応させることができます。

使い方はある程度ドキュメントを見たらわかるのですが、

  • 通常のSLF4JのLoggerの代わりに、LocLoggerを使う
  • ログメッセージを記述した.propertiesファイルを用意する(使用するLocaleの分だけ)
  • 用意したログメッセージの.propertiesファイルの各キーに対応する、Enumを用意する
  • 用意したEnumを引数に、LocLoggerを使ってログ出力

といった感じです。

要するにログメッセージは外部ファイル管理になり、メッセージはEnumで指定するという使い方になります。

また、Pluggable Annotation Processing APIを使用しており、Enumとプロパティファイルのキーが合わなかったりすると、コンパイルエラーになります。

内部的にはCompiler Assisted Localization (CAL10N)というものを使っているので、さらに詳細はこちらを参照。

CAL10N Home

Manual

では、使ってみましょう。

ちなみに、ログメッセージの国際化対応というよりは、プロパティファイルでの管理を試してみたい、という動機だったりします…。

準備

まずは、Maven依存関係から。

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-ext</artifactId>
            <version>1.7.21</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

「slf4j-ext」が必要になります。「slf4j-api」は、「slf4j-ext」が推移的に解決してくれるので、なくてもいいです。ログ出力は今回はslf4j-simpleとしました。

JUnitはただの実行用です。

使ってみる

それでは、SLF4J Localizationを使ってみましょう。

とりあえず、雛形的にこんなコードを用意。
src/test/java/org/littlewings/slf4j/localization/Slf4jLocalizationTest.java

package org.littlewings.slf4j.localization;

import java.util.Locale;

import ch.qos.cal10n.IMessageConveyor;
import ch.qos.cal10n.MessageConveyor;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.cal10n.LocLogger;
import org.slf4j.cal10n.LocLoggerFactory;

public class Slf4jLocalizationTest {
    // ここに、テストを書く!
}

まずは、普通にSLF4Jを使った場合。見慣れたものかと思います。

    @Test
    public void useNormalSlf4j() {
        Logger logger = LoggerFactory.getLogger(getClass());
        logger.info("Hello SLF4J!!");
        logger.info("Hello {}", "SLF4J!!");
    }

続いて、SLF4J Localizationを使う場合。

最初にEnumを定義しておく必要があります。
src/test/java/org/littlewings/slf4j/localization/HelloLocalization.java

package org.littlewings.slf4j.localization;

import ch.qos.cal10n.BaseName;
import ch.qos.cal10n.Locale;
import ch.qos.cal10n.LocaleData;

@BaseName("hello-localization")
@LocaleData(value = @Locale("ja_JP"),
        defaultCharset = "UTF-8")
public enum HelloLocalization {
    HELLO_WORLD,
    HELLO_WORLD_WITH_ARGS
}

作成したEnumでは、@BaseNameアノテーションで作成するログメッセージを管理する、基底部分の名前を定義します。また、@LocaleDataで利用するLocaleを記述する必要があります。今回は、「ja_JP」のみとしました。

また、用意するプロパティファイルは、native2asciiをかけておく必要はなかったりします(かかっていてもいいみたいですが)。

とりあえず、用意したプロパティファイルはこちら。今回は@Localeで「ja_JP」を指定したので、用意するファイルは「hello-localization_ja_JP.properties」となります。
src/test/resources/hello-localization_ja_JP.properties

HELLO_WORLD=こんにちは、世界
HELLO_WORLD_WITH_ARGS=こんにちは、{0} {1}

プロパティファイルのエンコーディングの指定は、@LocaleDataのdefaultCharset、または@Localeのcharsetで指定が可能です。

@LocaleData(value = @Locale("ja_JP"),
        defaultCharset = "UTF-8")

Pick your charset, per locale (no native2ascii)

しれっと引数を取るメッセージも用意していますが、こちらについてはMessageFormatの書式で指定することができます。

HELLO_WORLD_WITH_ARGS=こんにちは、{0} {1}

Retrieving internationalized messages

では、呼び出し側を書いてみましょう。

    @Test
    public void useLocLogger() {
        IMessageConveyor messageConveyor = new MessageConveyor(Locale.JAPAN);
        LocLoggerFactory loggerFactory = new LocLoggerFactory(messageConveyor);
        LocLogger logger = loggerFactory.getLocLogger(getClass());

        logger.info(HelloLocalization.HELLO_WORLD);
        logger.info(HelloLocalization.HELLO_WORLD_WITH_ARGS, "SLF4J", "Localization");
    }

通常のSLF4Jとは違い、先にIMessageConveyorのインスタンスを生成する必要があります。IMessageConveyorは、CAL10N側のインターフェースです。

        IMessageConveyor messageConveyor = new MessageConveyor(Locale.JAPAN);

このIMessageConveyorを使ってLocLoggerFactoryを生成し、それからLocLoggerを取得します。

        LocLoggerFactory loggerFactory = new LocLoggerFactory(messageConveyor);
        LocLogger logger = loggerFactory.getLocLogger(getClass());

あとは、LocLoggerのinfoやdebugなどの各種ログ出力時に、第1引数に先ほど作成したEnumを渡してログ出力を行います。

        logger.info(HelloLocalization.HELLO_WORLD);
        logger.info(HelloLocalization.HELLO_WORLD_WITH_ARGS, "SLF4J", "Localization");

実行結果はこちら。

[main] INFO org.littlewings.slf4j.localization.Slf4jLocalizationTest - こんにちは、世界
[main] INFO org.littlewings.slf4j.localization.Slf4jLocalizationTest - こんにちは、SLF4J Localization

なお、LocLoggerはSLF4JのLoggerを拡張したものでもあるので、通常のSLF4JのLoggerとしても使うことができます。

        logger.error("エラー!");

ところで、Pluggable Annotation Processing APIでチェックが入るよ、ということでしたが、例えばEnum側にしか存在しないキーを書いて

@BaseName("hello-localization")
@LocaleData(value = @Locale("ja_JP"),
        defaultCharset = "UTF-8")
public enum HelloLocalization {
    HELLO_WORLD,
    HELLO_WORLD_WITH_ARGS,
    ONLY_ENUM_KEY
}

コンパイルしようとすると、プロパティファイルの方にそんなキーないよ、と怒られます。

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:testCompile (default-testCompile) on project slf4j-localization-example: Compilation failure
[ERROR] /path/to/src/test/java/org/littlewings/slf4j/localization/HelloLocalization.java:[10,8] Key [ONLY_ENUM_KEY] present in enum type [org.littlewings.slf4j.localization.HelloLocalization] but absent in resource bundle named [hello-localization] for locale [ja_JP]

反対に、プロパティファイルにしかないキーを定義しても

HELLO_WORLD=こんにちは、世界
HELLO_WORLD_WITH_ARGS=こんにちは、{0} {1}
ONLY_PROPERTY_FILE_KEY=こんにちは、{0} {1}

やっぱりエラーになります。

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:testCompile (default-testCompile) on project slf4j-localization-example: Compilation failure
[ERROR] /paht/to/src/test/java/org/littlewings/slf4j/localization/HelloLocalization.java:[10,8] Key [ONLY_PROPERTY_FILE_KEY] present in resource bundle named [hello-localization] for locale [ja_JP] but absent in enum type [org.littlewings.slf4j.localization.HelloLocalization]

詳細は、このあたりを。

JSR-269 support, i.e. verification at compile time

使うかどうかは考えどころですが、こういうのがあるということは押さえておこうかなと思います。

2016-08-27

Reactor CoreのSchedulersとParallelFluxで遊ぶ

Reactorをちょっとずつ触ってみようという話、次はSchedulersとParallelFluxということで。

Schedulers

https://github.com/reactor/reactor-core#parallelflux=title=ParallelFlux

まあ、GitHubに書かれている内容を上から試していってるだけなんですけど。

Schedulersではタスクをどのスケジューラー上(実際はスレッドプールっぽいですけど)で実行するか、ParallelFluxではタスクを分割しての並行実行についてをテーマとしているみたいです。

とりあえず、使ってみましょう。

準備

Maven依存関係は、以下のとおり。

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.0.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

そういえば、Reactor Coreが3.0.0.RELEASEになりましたね。

特に値の検証とかはやらないんですけど、テストコードで実行するのでJUnitを入れています。

テストコードの全体像は、以下のようにしています。
src/test/java/org/littlewings/reactor/schedulerparallelism/ReactorSchedulerParallelismTest.java

package org.littlewings.reactor.schedulerparallelism;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ReactorSchedulerParallelismTest {
    // ここにテストを書く!
}

Schedulers

では、まずはGitHub上のサンプルに習ってSchedulersから。

こんなコードを用意。

    @Test
    public void schedulers() throws InterruptedException {
        Mono.fromCallable(() -> LocalDateTime.now())
                .repeat()
                .publishOn(Schedulers.single())
                .flatMap(dateTime ->
                        Mono.fromCallable(() -> {
                            TimeUnit.SECONDS.sleep(1L);
                            return dateTime;
                        }).subscribeOn(Schedulers.parallel()), 8)
                .subscribe(dateTime ->
                        System.out.printf("[%s] time -> %s%n", Thread.currentThread().getName(), dateTime)
                );

        TimeUnit.SECONDS.sleep(3L);
    }

これを実行すると、こんな結果になります。

[parallel-3] time -> 2016-08-27T18:03:40.819
[parallel-3] time -> 2016-08-27T18:03:40.820
[parallel-3] time -> 2016-08-27T18:03:40.820
[parallel-4] time -> 2016-08-27T18:03:40.820
[parallel-4] time -> 2016-08-27T18:03:40.820
[parallel-4] time -> 2016-08-27T18:03:40.820
[parallel-4] time -> 2016-08-27T18:03:40.820
[parallel-4] time -> 2016-08-27T18:03:40.820
[parallel-2] time -> 2016-08-27T18:03:40.820
[parallel-2] time -> 2016-08-27T18:03:40.820
[parallel-7] time -> 2016-08-27T18:03:40.820
[parallel-7] time -> 2016-08-27T18:03:40.820
[parallel-7] time -> 2016-08-27T18:03:40.820
[parallel-7] time -> 2016-08-27T18:03:40.820
[parallel-7] time -> 2016-08-27T18:03:40.820
[parallel-1] time -> 2016-08-27T18:03:40.820
[parallel-1] time -> 2016-08-27T18:03:40.820

〜省略〜

書いたコードがどういう意味か?という話ですが、Mono#fromCallableで現在時刻を取得する処理をrepeatで繰り返します。

        Mono.fromCallable(() -> LocalDateTime.now())
                .repeat()

それをシングルスレッド上(Schedulers#single)で動かします。

                .publishOn(Schedulers.single())

次に、FlatMapで1度sleepを置いていますが、この時に並列度8にしてSchedulers#parallelで並列実行可能にします。ログ出力している時に、並列っぽく動いているのはこのためですね。

                .flatMap(dateTime ->
                        Mono.fromCallable(() -> {
                            TimeUnit.SECONDS.sleep(1L);
                            return dateTime;
                        }).subscribeOn(Schedulers.parallel()), 8)

最後に、現在のスレッド名と共にMono#fromCallableで取得したLocalDateTimeを出力しています。

                .subscribe(dateTime ->
                        System.out.printf("[%s] time -> %s%n", Thread.currentThread().getName(), dateTime)
                );

ParallelFlux

続いて、ParallelFlux。

    @Test
    public void parallelFlux() throws InterruptedException {
        Mono.fromCallable(() -> LocalDateTime.now())
                .repeat()
                .parallel(8) // parallelism
                .runOn(Schedulers.parallel())
                .sequential()
                .subscribe(dateTime ->
                        System.out.printf("[%s] time -> %s%n", Thread.currentThread().getName(), dateTime)
                );

        TimeUnit.SECONDS.sleep(3L);
    }

コードとしてはParallelFluxという名前は出てきていませんが、Mono#repeatを呼んだ後のFlux#parallelの後からは、ParallelFluxになっています。

こちらを実行すると、こんな結果になります。

[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.439
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.432
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.437
[parallel-1] time -> 2016-08-27T18:17:56.443
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.439
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.432
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.437
[parallel-1] time -> 2016-08-27T18:17:56.443
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.439
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.432
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.437
[parallel-1] time -> 2016-08-27T18:17:56.443
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.439

〜省略〜

スレッド名、ひとつだけですね…。このあたりはもう少し後で見てみましょう。

組んだコードを見直してみます。Mono#fromCallable、repeatの後に、並列度8でSchedulers#parallel上で動作するように設定します。

        Mono.fromCallable(() -> LocalDateTime.now())
                .repeat()
                .parallel(8) // parallelism
                .runOn(Schedulers.parallel())

その結果を、シーケンシャルにsubscribeするように設定しています。

                .sequential()

ちなみに、sequencialという名前ですが、結果の渡ってくる順番については"unordered"なのでご注意を。実際にログを見たらわかりますが、LocalDateTime#nowの順番で並ぶわけではありません。

で、最後に結果を出力、と。

                .subscribe(dateTime ->
                        System.out.printf("[%s] time -> %s%n", Thread.currentThread().getName(), dateTime)
                );

なるほど。とりあえず、GitHubに載っているものは試しました。

でも、これだけでは面白くありません。もう少しいろいろやってみましょう。

ParallelFlux#groups

先ほどのParallelFluxの例でしたが、sequentialを使いました。もうひとつ、ParallelFlux#groupsを使う方法もあるようです。

使ってみたコードが、こちら。

    @Test
    public void parallelFluxGroups() throws InterruptedException {
        Mono.fromCallable(() -> LocalDateTime.now())
                .repeat()
                .parallel(8) // parallelism
                .runOn(Schedulers.parallel())
                .groups()
                .subscribe(grouped ->
                        grouped.subscribe(dateTime ->
                                System.out.printf("[%s] group[%s] time -> %s%n", Thread.currentThread().getName(), grouped.key(), dateTime)
                        )
                );

        TimeUnit.SECONDS.sleep(3L);
    }

結果は、このように。

[parallel-1] group[0] time -> 2016-08-27T18:23:52.936
[parallel-1] group[0] time -> 2016-08-27T18:23:52.936
[parallel-1] group[0] time -> 2016-08-27T18:23:52.936
[parallel-1] group[0] time -> 2016-08-27T18:23:52.936

〜省略〜

[parallel-3] group[2] time -> 2016-08-27T18:23:52.895
[parallel-3] group[2] time -> 2016-08-27T18:23:52.895
[parallel-3] group[2] time -> 2016-08-27T18:23:52.895
[parallel-3] group[2] time -> 2016-08-27T18:23:52.895
[parallel-3] group[2] time -> 2016-08-27T18:23:52.895
[parallel-3] group[2] time -> 2016-08-27T18:23:52.895

〜省略〜

[parallel-4] group[3] time -> 2016-08-27T18:23:53.070
[parallel-4] group[3] time -> 2016-08-27T18:23:53.070
[parallel-4] group[3] time -> 2016-08-27T18:23:53.070
[parallel-7] group[6] time -> 2016-08-27T18:23:53.042
[parallel-7] group[6] time -> 2016-08-27T18:23:53.042
[parallel-2] group[1] time -> 2016-08-27T18:23:53.068
[parallel-2] group[1] time -> 2016-08-27T18:23:53.068
[parallel-2] group[1] time -> 2016-08-27T18:23:53.068
[parallel-2] group[1] time -> 2016-08-27T18:23:53.068
[parallel-2] group[1] time -> 2016-08-27T18:23:53.068

とまあ、並列っぽく動いている感じになります。

先ほどのsequentialの例から変わったのは、ここだけですね。

                .groups()
                .subscribe(grouped ->
                        grouped.subscribe(dateTime ->
                                System.out.printf("[%s] group[%s] time -> %s%n", Thread.currentThread().getName(), grouped.key(), dateTime)
                        )

sequentialの部分をgroupsに変えています。すると、そこから返るParallelFluxが

Flux<GroupedFlux<java.lang.Integer,T>>

となります。subscribe時に、GroupedFluxに対してさらにsubscribeすることになります。GroupedFluxもFluxです。

groupsを使うと、グループ単位でsubscribe部の並列実行が可能になるようですね。

delay

先ほどの例では、開始とともにMono#fromCallableに対してsubscribeにし行っていたのですが、Mono#delay〜を使用することでそのタイミングをコントロールすることができます。

今回は、500ミリ秒おきにsubscribeしてみましょう。

    @Test
    public void schedulersDelayed() throws InterruptedException {
        Mono.fromCallable(() -> LocalDateTime.now())
                .delaySubscription(Duration.of(500L, ChronoUnit.MILLIS))
                .repeat()
                .parallel(8)
                .runOn(Schedulers.parallel())
                .subscribe(dateTime ->
                        System.out.printf("[%s] time -> %s%n", Thread.currentThread().getName(), dateTime)
                );

        TimeUnit.SECONDS.sleep(5L);
    }

結果。今回は、これで全部です。subscribe時も、ParallelFlux#runOnで割り当てたスケジューラ上で各スレッドで実行されているようですね。

[parallel-1] time -> 2016-08-27T18:31:03.225
[parallel-2] time -> 2016-08-27T18:31:03.727
[parallel-3] time -> 2016-08-27T18:31:04.228
[parallel-4] time -> 2016-08-27T18:31:04.728
[parallel-5] time -> 2016-08-27T18:31:05.229
[parallel-6] time -> 2016-08-27T18:31:05.730
[parallel-7] time -> 2016-08-27T18:31:06.231
[parallel-8] time -> 2016-08-27T18:31:06.732
[parallel-1] time -> 2016-08-27T18:31:07.233

Schedulerとスレッド

1番最初に乗せた例はflatMapなどいろいろあったので、もっと単純にしてみましょう。

    @Test
    public void fromCallThreadSingle() throws InterruptedException {
        Mono.fromCallable(() -> Entry.of(Thread.currentThread().getName(), LocalDateTime.now()))
                .repeat()
                // .publishOn(Schedulers.single())
                .subscribe(entry ->
                        System.out.printf("[%s] entry -> %s%n", Thread.currentThread().getName(), entry)
                );

        TimeUnit.SECONDS.sleep(3L);
    }

ただ、ここからはMono#fromCallableがどのスレッドで呼ばれたかも見ていくことにします。

        Mono.fromCallable(() -> Entry.of(Thread.currentThread().getName(), LocalDateTime.now()))

Entryというクラスの定義は、こんな感じです。
src/test/java/org/littlewings/reactor/schedulerparallelism/Entry.java

package org.littlewings.reactor.schedulerparallelism;

import java.time.LocalDateTime;

public class Entry {
    private String name;
    private LocalDateTime time;

    public static Entry of(String name, LocalDateTime time) {
        Entry entry = new Entry();
        entry.name = name;
        entry.time = time;
        return entry;
    }

    public String getName() {
        return name;
    }

    public LocalDateTime getTime() {
        return time;
    }

    @Override
    public String toString() {
        return String.format("thread[%s], time[%s]", name, time);
    }
}

で、動かしてみると全部mainスレッド上で動き出します。

[main] entry -> thread[main], time[2016-08-27T18:34:38.095]
[main] entry -> thread[main], time[2016-08-27T18:34:38.095]
[main] entry -> thread[main], time[2016-08-27T18:34:38.095]
[main] entry -> thread[main], time[2016-08-27T18:34:38.095]
[main] entry -> thread[main], time[2016-08-27T18:34:38.095]
[main] entry -> thread[main], time[2016-08-27T18:34:38.095]
[main] entry -> thread[main], time[2016-08-27T18:34:38.095]
[main] entry -> thread[main], time[2016-08-27T18:34:38.095]

〜省略〜

ここで、repeatの後のFlux#publishOnにSchedulers#singleを渡しているところのコメントアウトを外すと

                .repeat()
                .publishOn(Schedulers.single())

こうなります。

[single-1] entry -> thread[single-1], time[2016-08-27T18:35:12.752]
[single-1] entry -> thread[single-1], time[2016-08-27T18:35:12.752]
[single-1] entry -> thread[single-1], time[2016-08-27T18:35:12.752]
[single-1] entry -> thread[single-1], time[2016-08-27T18:35:12.752]
[single-1] entry -> thread[single-1], time[2016-08-27T18:35:12.752]

どのスケジューラー上で動かすかは、両方のスレッドに影響するんですね、と。

続いて、ParallelFlux。

    @Test
    public void fromCallThreadParallel() throws InterruptedException {
        Mono.fromCallable(() -> Entry.of(Thread.currentThread().getName(), LocalDateTime.now()))
                .repeat()
                .parallel(8)
                .runOn(Schedulers.parallel())
                .subscribe(entry ->
                        System.out.printf("[%s] entry -> %s%n", Thread.currentThread().getName(), entry)
                );

        TimeUnit.SECONDS.sleep(3L);
    }

このサンプルの場合、こういう結果になります。

[parallel-3] entry -> thread[parallel-3], time[2016-08-27T18:38:53.333]
[parallel-3] entry -> thread[parallel-3], time[2016-08-27T18:38:53.333]
[parallel-3] entry -> thread[parallel-3], time[2016-08-27T18:38:53.333]
[parallel-3] entry -> thread[parallel-3], time[2016-08-27T18:38:53.333]
[parallel-3] entry -> thread[parallel-3], time[2016-08-27T18:38:53.333]
[parallel-3] entry -> thread[parallel-3], time[2016-08-27T18:38:53.333]

〜省略〜

[parallel-4] entry -> thread[parallel-4], time[2016-08-27T18:38:53.317]
[parallel-4] entry -> thread[parallel-4], time[2016-08-27T18:38:53.317]
[parallel-4] entry -> thread[parallel-4], time[2016-08-27T18:38:53.317]
[parallel-4] entry -> thread[parallel-4], time[2016-08-27T18:38:53.317]
[parallel-4] entry -> thread[parallel-4], time[2016-08-27T18:38:53.317]

〜省略〜

[parallel-6] entry -> thread[parallel-2], time[2016-08-27T18:38:53.379]
[parallel-6] entry -> thread[parallel-2], time[2016-08-27T18:38:53.379]
[parallel-6] entry -> thread[parallel-2], time[2016-08-27T18:38:53.379]
[parallel-6] entry -> thread[parallel-6], time[2016-08-27T18:38:53.381]
[parallel-6] entry -> thread[parallel-6], time[2016-08-27T18:38:53.381]
[parallel-6] entry -> thread[parallel-6], time[2016-08-27T18:38:53.381]

こちらも、Mono#fromCallableとsubscribeの両方の動作に使用されるスレッドに影響があることがわかりました。

なお、ParallelFlux#parallelで並列度を指定する他、Schedulerも並列度が指定できるのですが、両方指定している場合はParallelFlux#parallelで指定した並列度で動作するようです。そりゃあそうか、という気はしますね…。

Schedulerとdelayとスレッド

delayさせてみましょう。

    @Test
    public void fromCallThreadSingleWithDelayed() throws InterruptedException {
        Mono.fromCallable(() -> Entry.of(Thread.currentThread().getName(), LocalDateTime.now()))
                .delaySubscription(Duration.of(500L, ChronoUnit.MILLIS))
                .repeat()
                // .publishOn(Schedulers.single())
                .subscribe(entry ->
                        System.out.printf("[%s] entry -> %s%n", Thread.currentThread().getName(), entry)
                );

        TimeUnit.SECONDS.sleep(3L);
    }

結果は、こんな感じ。まあ、事実上singleなので、そうですねと。

[timer-1] entry -> thread[timer-1], time[2016-08-27T18:41:45.534]
[timer-1] entry -> thread[timer-1], time[2016-08-27T18:41:46.036]
[timer-1] entry -> thread[timer-1], time[2016-08-27T18:41:46.537]
[timer-1] entry -> thread[timer-1], time[2016-08-27T18:41:47.038]
[timer-1] entry -> thread[timer-1], time[2016-08-27T18:41:47.539]

続いてParallelFlux。

    @Test
    public void fromCallThreadParallelWithDelayed() throws InterruptedException {
        Mono.fromCallable(() -> Entry.of(Thread.currentThread().getName(), LocalDateTime.now()))
                .delaySubscription(Duration.of(500L, ChronoUnit.MILLIS))
                .repeat()
                .parallel(8)
                .runOn(Schedulers.parallel())
                .subscribe(entry ->
                        System.out.printf("[%s] entry -> %s%n", Thread.currentThread().getName(), entry)
                );

        TimeUnit.SECONDS.sleep(3L);
    }

こちらでは、Mono#fromCallableを呼ぶスレッドがシングルスレッドになり、subscribe側で各スレッドが使われるようになります。

[parallel-1] entry -> thread[timer-1], time[2016-08-27T18:43:01.802]
[parallel-2] entry -> thread[timer-1], time[2016-08-27T18:43:02.305]
[parallel-3] entry -> thread[timer-1], time[2016-08-27T18:43:02.806]
[parallel-4] entry -> thread[timer-1], time[2016-08-27T18:43:03.306]
[parallel-5] entry -> thread[timer-1], time[2016-08-27T18:43:03.807]

まとめ

Reactor Coreで、SchedulersとParallelFluxをちょっとかじってみました。

興味本位でいくつかパターンを試してみましたが、けっこう挙動が変わるんですね。ちょっとずつ慣れていきましょう。

2016-08-20

はじめてのReactor Core

Reacive〜と名の付くものに関する話題について、そろそろ少しずつ追ってみようかなということで。

とりあえず、触りつつ試しつつ始めてみようと思い、まずはReactorを触ってみることにしました。

Project Reactor

GitHub - reactor/reactor: A set of foundational libraries for building reactive cloud-ready applications on the JVM.

Reactorとは、Pivotalが開発しているReactive Streamsの実装っぽいです。

Reactive Webアプリケーション - そしてSpring 5へ #jjug_ccc #ccc_ef3

まあ、いろいろ難しいことは抜きにして、単純に動かしてみましょう。

準備

今回は、Reactor Coreを使ってコードを書いて試すことにします。

GitHub - reactor/reactor-core: Non-Blocking Reactive Foundation for the JVM

Reactor Coreは3.0.0.RC2が最新のようですが、Maven Centralにはまだないので、SpringのMilestoneリポジトリを加えます。

        <repository>
            <id>spring-milestone</id>
            <name>Spring Milestone Repository</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>

あとは、Reactor Coreを動作確認用のテストライブラリをMaven依存関係に記述。

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.0.0.RC2</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.5.2</version>
            <scope>test</scope>
        </dependency>

はじめてのReactor

とりあえずまったく経験がないのですが、Reactor Coreのドキュメントや軽く調べてみるとFluxとMonoというものを使ってみるようです。

Getting Started

これを書いた後に気付きましたが、ハンズオンを見たらよかったかも…。
GitHub - reactor/lite-rx-api-hands-on: Lite Rx API Hands-On with Reactor Core 3.x

その他、参考。
Reactor Core 2.5: もう一つのJava向けReactive Extensions実装 - Qiita

JavaでReactiveプログラミング ~Reactor~ - Qiita

感覚的には、Fluxが複数の値を扱えるもので、Monoが単発の値を扱えるものみたいですね。これをStreamによく似たAPIで扱える感じです。


src/test/java/org/littlewings/reactor/helloworld/HelloReactor.java

package org.littlewings.reactor.helloworld;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static org.assertj.core.api.Assertions.assertThat;

public class HelloReactor {
    // ここに、テストを書く!
}

Flux

では、まずはFluxから。今回は、ListからFluxを作成して、Subscirbeで受け取ってみます。

    @Test
    public void flux() {
        List<String> received = new ArrayList<>();

        Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .subscribe(s -> received.add(s));

        assertThat(received).containsExactly("Hello", "World", "Reactor", "Flux");
    }

単純に、Fluxを作成した時の値がそのまま渡ってきています。

mapで変換。

    @Test
    public void fluxMap() {
        List<String> received = new ArrayList<>();

        Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .map(s -> "★" + s + "★")
                .subscribe(s -> received.add(s));

        assertThat(received).containsExactly("★Hello★", "★World★", "★Reactor★", "★Flux★");
    }

takeで、最初の2つを選ぶ。

    @Test
    public void fluxTake() {
        List<String> received = new ArrayList<>();

        Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .take((2))
                .subscribe(s -> received.add(s));

        assertThat(received).containsExactly("Hello", "World");
    }

filterで絞り込み。

    @Test
    public void fluxFilter() {
        List<String> received = new ArrayList<>();

        Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .filter(s -> s.contains("e"))
                .subscribe(s -> received.add(s));

        assertThat(received).containsExactly("Hello", "Reactor");

    }

bufferで、複数の要素をまとめてsubscribe時にListで受け取る。

    @Test
    public void fluxBuffer() {
        List<List<String>> received = new ArrayList<>();

        Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .buffer(2)
                .subscribe(strings -> received.add(strings));

        assertThat(received).containsExactly(Arrays.asList("Hello", "World"), Arrays.asList("Reactor", "Flux"));
    }

2つのFluxをマージ。

    @Test
    public void fluxMergeWith() {
        List<String> received = new ArrayList<>();

        Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .mergeWith(Flux.fromIterable(Arrays.asList("A", "B", "C", "D")))
                .subscribe(s -> received.add(s));

        assertThat(received).containsExactly("Hello", "World", "Reactor", "Flux", "A", "B", "C", "D");
    }

2つのFluxをzipで結合。

    @Test
    public void fluxZip() {
        List<String> received = new ArrayList<>();

        Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .zipWith(Flux.fromIterable(Arrays.asList("A", "B", "C", "D")))
                .subscribe(tuple -> received.add(tuple.getT1() + "-" + tuple.getT2()));

        assertThat(received).containsExactly("Hello-A", "World-B", "Reactor-C", "Flux-D");
    }

もちろん、これらの操作を組み合わせて使うこともできます。

    @Test
    public void fluxUsing() {
        List<String> received = new ArrayList<>();

        Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .take(2)
                .map(s -> "★" + s + "★")
                .mergeWith(Flux.fromIterable(Arrays.asList("A", "B")))
                .buffer(2)
                .subscribe(strings -> received.add(strings.get(0) + "-" + strings.get(1)));

        assertThat(received).containsExactly("★Hello★-★World★", "A-B");
    }

また、subscribeで受け取るだけではなく、Iterableなどの形で戻り値としても返却することも可能です。

    @Test
    public void fluxBlock() {
        Iterable<String> result = Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .map(s -> "★" + s + "★")
                .toIterable();

        assertThat(result).containsExactly("★Hello★", "★World★", "★Reactor★", "★Flux★");
    }

ブロックするのは嫌われるでしょうから、あんまりやらないでしょうけれど。

他にもいろいろFluxに関する操作はあるみたいですが、とりあえずこのくらいで。

Mono

続いてMono。

今回は、Mono#createでインスタンスを作成。

    @Test
    public void mono() {
        AtomicReference<String> result = new AtomicReference<>();

        Mono.fromSupplier(() -> "Hello World")
                .subscribe(s -> result.set(s));

        assertThat(result.get()).isEqualTo("Hello World");
    }

Fluxのようにmapなどの操作が可能です。

    @Test
    public void monoMap() {
        AtomicReference<String> result = new AtomicReference<>();

        Mono.fromSupplier(() -> "Hello World")
                .map(s -> "★" + s + "★")
                .subscribe(s -> result.set(s));

        assertThat(result.get()).isEqualTo("★Hello World★");
    }

subscribeではなく、結果を戻り値として受け取ることもできます。

    @Test
    public void monoResult() {
        String result = Mono.fromSupplier(() -> "Hello World")
                .map(s -> "★" + s + "★")
                .block();

        assertThat(result).isEqualTo("★Hello World★");
    }

blockと、いかにも使うのをためらいそうな名前ですが。

FluxとMonoの変換

FluxとMonoは、両者の変換が可能です。

たとえば、FluxからMonoにするには、Flux#nextなどが使えます。

    @Test
    public void fluxToMono() {
        Mono<String> mono = Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .next();

        assertThat(mono.block()).isEqualTo("Hello");
    }

他にもlastや、単一要素のみ保持している場合にはsingleが使えたりします。

Flux#collectを使って、単一の要素にしてMonoにすることもできます。

    @Test
    public void fluxToMonoCollect() {
        Mono<String> mono = Flux.fromIterable(Arrays.<CharSequence>asList("Hello", "World", "Reactor", "Flux"))
                .collect(Collectors.joining("-"));

        assertThat(mono.block()).isEqualTo("Hello-World-Reactor-Flux");
    }

そして、MonoからFlux。今回は、Mono#fluxで変換。

    @Test
    public void monoToFlux() {
        Flux<String> flux = Mono.fromSupplier(() -> "Hello World")
                .flux();

        assertThat(flux.toIterable()).containsExactly("Hello World");
    }

この他、Mono#flatMapでFluxにすることもできるようです。

    @Test
    public void monoToFluxFlatMap() {
        Flux<String> flux = Mono.fromSupplier(() -> "Hello World")
                .flatMap(s -> Flux.fromIterable(Arrays.asList(s)));

        assertThat(flux.toIterable()).containsExactly("Hello World");
    }

まとめ

とりえあずAPIに触れる程度の意味で、Reactorを試してみました。

まだ「Streamっぽいなー」くらいにしか思っていないのですが、これからちょっとずつ調べていこうかなと思います。

2016-08-14

Apache DeltaSpikeのData Moduleを試す(Test-Control/Arquillianテスト付き)

Apache DeltaSpikeには、Data Moduleというものがあります。

Data Module

サンプルを見ていると雰囲気はなんとなくわかるのですが、JPAでRepositoryパターンを実装するためのもので、

といった、どこかで見たような機能が使えるようになります(Spring Data JPA)。

で、せっかくなので今回のエントリはこういう構成でいってみたいと思います。

  • Data Moduleの簡単な紹介
  • Test-Controlを使った単体テスト
  • Aqruillian(WildFly Remote)を使ってJTAを絡めたインテグレーションテスト

以降、順に書いていきます。

Data Moduleを使ってみる

それでは、まずはData Moduleを使ってみましょう。

準備

Maven依存関係としては、以下のように定義します。

        <!-- Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Java EE Web Profile -->
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-web-api</artifactId>
            <version>7.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- Apache DeltaSpike Core & Data Module -->
        <dependency>
            <groupId>org.apache.deltaspike.core</groupId>
            <artifactId>deltaspike-core-api</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.deltaspike.core</groupId>
            <artifactId>deltaspike-core-impl</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.deltaspike.modules</groupId>
            <artifactId>deltaspike-data-module-api</artifactId>
            <version>1.7.1</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.deltaspike.modules</groupId>
            <artifactId>deltaspike-data-module-impl</artifactId>
            <version>1.7.1</version>
            <scope>runtime</scope>
        </dependency>

Scalaを使うのは、ご愛嬌…。

Scalaを使うので、Scalaのプラグインも足しておきます。
※本質的ではありませんが

            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-Xlint</arg>
                        <arg>-unchecked</arg>
                        <arg>-deprecation</arg>
                        <arg>-feature</arg>
                    </args>
                    <recompileMode>incremental</recompileMode>
                </configuration>
            </plugin>

beans.xmlも用意します。
src/main/resources/META-INF/beans.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
                           http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
       bean-discovery-mode="annotated">
</beans>

Entityを作成する

JPAで使うEntityを作成します。お題は、書籍とします。
src/main/scala/org/littlewings/javaee7/entity/Book.scala

package org.littlewings.javaee7.entity

import javax.persistence.{Column, Entity, Id, Table}

import scala.beans.BeanProperty

object Book {
  def apply(isbn: String, title: String, price: Int): Book = {
    val book = new Book
    book.isbn = isbn
    book.title = title
    book.price = price
    book
  }
}

@Entity
@Table(name = "book")
@SerialVersionUID(1L)
class Book extends Serializable {
  @Id
  @BeanProperty
  var isbn: String = _

  @Column
  @BeanProperty
  var title: String = _

  @Column
  @BeanProperty
  var price: Int = _
}

Data Moduleを使う

Data Moduleの使い方の基本は、@Repositoryアノテーションの付与とEntityRepositoryインターフェースを実装したインターフェースの作成です。
※Scalaなのでトレイトになっていますが
src/main/scala/org/littlewings/javaee7/repository/BookRepository.scala

package org.littlewings.javaee7.repository

import org.apache.deltaspike.data.api.{EntityRepository, Query, Repository}
import org.littlewings.javaee7.entity.Book

@Repository
trait BookRepository extends EntityRepository[Book, String]

これだけで、基本的な操作(save/remove/findBy(PrimaryKey)/findAll/count)などはできるようになります。

Repositories

実装自体は、Partial-Beanという仕組みで生成しているようです。

Partial-Bean Module

メソッドの命名規則によるクエリの生成も可能です。

  def findByPriceGreaterThanEqualsOrderByPriceDesc(price: Int): java.util.List[Book]

メソッドの実装自体は、行う必要はありません。

使える命名規則としては、以下のようになっています。

  • Equal
  • NotEqual
  • Like
  • GreaterThan
  • GreaterThanEquals
  • LessThan
  • LessThanEquals
  • Between
  • IsNull
  • IsNotNull

Using Method Expressions

ORDER BYやLIMITといったことも可能です。

Query Ordering

Query Limits

それでも表現できない場合は、@Queryアノテーションを使用してクエリを書きます。
Query Annotations

例えば、こんな感じで。

  @Query("SELECT COUNT(b) FROM Book b WHERE b.price >= ?1")
  def countPriceGreaterThan(price: Int): Long

  @Query("SELECT b FROM Book b WHERE b.title LIKE ?1 ORDER BY b.price DESC")
  def findByTitleLike(title: String): java.util.List[Book]

パラメータは、「?N」なインデックス形式で指定するんですねぇ。こちらもメソッドの実装自体は、行う必要はありません。

QueryResultを使用することで、オプションやページングの指定ができたり

Query Options

Pagination

OptionalやStreamも使える模様。

Java 8 Semantics

細かくは書ききれないので、興味のある方はドキュメントを…。

というわけで、今回はこんなRepositoryを作成しました。
src/main/scala/org/littlewings/javaee7/repository/BookRepository.scala

package org.littlewings.javaee7.repository

import org.apache.deltaspike.data.api.{EntityRepository, Query, Repository}
import org.littlewings.javaee7.entity.Book

@Repository
trait BookRepository extends EntityRepository[Book, String] {
  def findByPriceGreaterThanEqualsOrderByPriceDesc(price: Int): java.util.List[Book]

  @Query("SELECT COUNT(b) FROM Book b WHERE b.price >= ?1")
  def countPriceGreaterThan(price: Int): Long

  @Query("SELECT b FROM Book b WHERE b.title LIKE ?1 ORDER BY b.price DESC")
  def findByTitleLike(title: String): java.util.List[Book]
}

Test-Controlで単体テストする

それでは、動作確認を兼ねてApache DeltaSpikeのTest-Controlでテストしてみましょう。

準備

まずは、Test-Control用のモジュールおよびCDIコンテナ制御の依存関係、JPA実装、テストライブラリが必要なので追加します。データベースは、H2とします。
JPAの実装はHibernateですが、このあとWildFlyで使う関係上、WildFly 10.0.0.Finalに含まれているHibernateと同じバージョンを使用しています。

        <!-- Unit Test -->
        <!-- Apache DeltaSpike Test-Control Module -->
        <dependency>
            <groupId>org.apache.deltaspike.modules</groupId>
            <artifactId>deltaspike-test-control-module-api</artifactId>
            <version>1.7.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.deltaspike.modules</groupId>
            <artifactId>deltaspike-test-control-module-impl</artifactId>
            <version>1.7.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.deltaspike.cdictrl</groupId>
            <artifactId>deltaspike-cdictrl-weld</artifactId>
            <version>1.7.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.jboss.weld.se</groupId>
            <artifactId>weld-se-core</artifactId>
            <version>2.3.5.Final</version>
            <scope>test</scope>
        </dependency>
        <!-- JPA Implementation for UnitTest -->
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-entitymanager</artifactId>
            <version>5.0.7.Final</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <version>1.4.192</version>
            <scope>test</scope>
        </dependency>
        <!-- UnitTest Library -->
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_${scala.major.version}</artifactId>
            <version>3.0.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

※「${scala.major.version}」は、「2.11」です

persistence.xml

先ほどのコード例では、Repositoryやエンティティは書きましたが、永続化ユニットの設定は書いていません。

単体テスト向けということで、こんなpersistence.xmlを用意。
src/main/resources/META-INF/persistence.xml

<?xml version="1.0" encoding="UTF-8"?>
<persistence xmlns="http://xmlns.jcp.org/xml/ns/persistence"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence
                                 http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd"
             version="2.1">
    <persistence-unit name="unit-test.persistence.unit" transaction-type="RESOURCE_LOCAL">
        <provider>org.hibernate.jpa.HibernatePersistenceProvider</provider>
        <properties>
            <property name="javax.persistence.jdbc.driver" value="org.h2.Driver"/>
            <property name="javax.persistence.jdbc.url"
                      value="jdbc:h2:mem:test;DB_CLOSE_DELAY=-1"/>
            <property name="hibernate.dialect" value="org.hibernate.dialect.H2Dialect"/>
            <property name="hibernate.hbm2ddl.auto" value="update"/>
            <property name="hibernate.show_sql" value="true"/>
            <property name="hibernate.format_sql" value="true"/>
        </properties>
    </persistence-unit>
</persistence>

EntityManagerのProducerを作成する

続いて、EntityManagerをCDI管理BeanとするためのProducerを作成します。こちらは、ProjectStageがUnitTestで有効となるように実装しました。
src/main/scala/org/littlewings/javaee7/producer/EntityManagerProducers.scala

package org.littlewings.javaee7.producer

import javax.enterprise.context.{ApplicationScoped, Dependent}
import javax.enterprise.inject.Produces
import javax.persistence.{EntityManager, EntityManagerFactory, Persistence, PersistenceContext}

import org.apache.deltaspike.core.api.exclude.Exclude
import org.apache.deltaspike.core.api.projectstage.ProjectStage

object EntityManagerProducers {
  @Dependent
  @Exclude(exceptIfProjectStage = Array(classOf[ProjectStage.UnitTest]))
  class UnitTestEntityManagerProducer {
    @Produces
    @ApplicationScoped
    def entityManagerFactoryProducer: EntityManagerFactory = Persistence.createEntityManagerFactory("unit-test.persistence.unit")

    @Produces
    def entityManagerProducer(emf: EntityManagerFactory): EntityManager =
      emf.createEntityManager
  }
}

※あとでハマったのですが、@Excludeは@Producesに付けるのでは効かないことがわかり、こういう形になりました

テストコード

では、テストコードを書きます。

ざっくり、こんな感じで確認。
src/test/scala/org/littlewings/javaee7/repository/DeltaSpikeRepositoryTest.scala

package org.littlewings.javaee7.repository

import javax.inject.Inject
import javax.persistence.EntityManager

import org.apache.deltaspike.testcontrol.api.junit.CdiTestRunner
import org.junit.runner.RunWith
import org.junit.{Before, Test}
import org.littlewings.javaee7.entity.Book
import org.scalatest.Matchers
import org.scalatest.junit.JUnitSuite

@RunWith(classOf[CdiTestRunner])
class DeltaSpikeRepositoryTest extends JUnitSuite with Matchers {
  @Inject
  var bookRepository: BookRepository = _

  @Inject
  var em: EntityManager = _

  @Before
  def setUp(): Unit = {
    val tx = em.getTransaction
    tx.begin()
    em.createNativeQuery("TRUNCATE TABLE book").executeUpdate()
    tx.commit()
  }

  @Test
  def save(): Unit = {
    bookRepository.save(Book("978-4774183169", "パーフェクト Java EE", 3456))
    bookRepository.save(Book("978-4798140926", "Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築", 4104))
    bookRepository.save(Book("978-4798124605", "Beginning Java EE 6~GlassFish 3で始めるエンタープライズJava (Programmer's SELECTION)", 4536))

    bookRepository.count should be(3L)
  }

  @Test
  def findByPrimaryKey(): Unit = {
    bookRepository.save(Book("978-4774183169", "パーフェクト Java EE", 3456))

    bookRepository.findBy("978-4774183169").price should be(3456)
  }

  @Test
  def update(): Unit = {
    val tx = em.getTransaction
    tx.begin()
    bookRepository.save(Book("978-4774183169", "パーフェクト Java EE", 3456))
    tx.commit()

    bookRepository.findBy("978-4774183169").price should be(3456)

    val tx2 = em.getTransaction
    tx2.begin()
    bookRepository.save(Book("978-4774183169", "パーフェクト Java EE", 4456))
    tx2.commit()

    bookRepository.findBy("978-4774183169").price should be(4456)
  }

  @Test
  def usingMethodExpressions(): Unit = {
    bookRepository.save(Book("978-4774183169", "パーフェクト Java EE", 3456))
    bookRepository.save(Book("978-4798140926", "Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築", 4104))
    bookRepository.save(Book("978-4798124605", "Beginning Java EE 6~GlassFish 3で始めるエンタープライズJava (Programmer's SELECTION)", 4536))

    val resultBooks = bookRepository.findByPriceGreaterThanEqualsOrderByPriceDesc(4000)
    resultBooks should have size (2)
    resultBooks.get(0).isbn should be("978-4798124605") // Begging Java EE 6
    resultBooks.get(1).isbn should be("978-4798140926") // Java EE 7
  }

  @Test
  def usingQueryAnnotation(): Unit = {
    bookRepository.save(Book("978-4774183169", "パーフェクト Java EE", 3456))
    bookRepository.save(Book("978-4798140926", "Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築", 4104))
    bookRepository.save(Book("978-4798124605", "Beginning Java EE 6~GlassFish 3で始めるエンタープライズJava (Programmer's SELECTION)", 4536))

    bookRepository.countPriceGreaterThan(4000) should be(2L)

    val resultBooks = bookRepository.findByTitleLike("%GlassFish%")
    resultBooks should have size (1)
    resultBooks.get(0).isbn should be("978-4798124605")
  }
}

テストの内容については、そう奇抜なことはしていないので、省略…。

実行は、「mvn test」で。

Arquillianを使ってインテグレーションテスト

では、最後にArquillianを使ってアプリケーションサーバー(ここではWildFly)にデプロイして確認してみます。

Java EE環境でやるということで、トランザクション管理はJTAの@Transactionalを使って管理したいと思います。

Apache DeltaSpikeにも@Transactionalというアノテーションがあるのですが、今回はこちらは置いておきます。
JPA Module

という条件を満たす実装、テストコードを書いてみましたよ、というお話です。

準備

ArquillianをWildFly Remoteで使うということと、デプロイの関係上Shrinkwrapが必要です。

Dependency Managementに、bomを追加。

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.jboss.arquillian</groupId>
                <artifactId>arquillian-bom</artifactId>
                <version>1.1.11.Final</version>
                <scope>import</scope>
                <type>pom</type>
            </dependency>
            <dependency>
                <groupId>org.jboss.shrinkwrap</groupId>
                <artifactId>shrinkwrap-depchain</artifactId>
                <version>1.2.6</version>
                <scope>import</scope>
                <type>pom</type>
            </dependency>
        </dependencies>
    </dependencyManagement>

あとは依存関係にArquillianとWildFly Remote、ShrinkwrapのMaven Resolverを足していきます。

        <!-- Arquillian Integration Test -->
        <dependency>
            <groupId>org.jboss.arquillian.junit</groupId>
            <artifactId>arquillian-junit-container</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.wildfly.arquillian</groupId>
            <artifactId>wildfly-arquillian-container-remote</artifactId>
            <version>2.0.0.Final</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.jboss.shrinkwrap.resolver</groupId>
            <artifactId>shrinkwrap-resolver-api-maven</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.jboss.shrinkwrap.resolver</groupId>
            <artifactId>shrinkwrap-resolver-spi-maven</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.jboss.shrinkwrap.resolver</groupId>
            <artifactId>shrinkwrap-resolver-impl-maven</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.jboss.shrinkwrap.resolver</groupId>
            <artifactId>shrinkwrap-resolver-impl-maven-archive</artifactId>
            <scope>test</scope>
        </dependency>

参考)
Creating Deployable Archives with ShrinkWrap · Arquillian Guides
GitHub - shrinkwrap/resolver: ShrinkWrap Resolvers

インテグレーションテスト用のソースコードおよび設定は、「src/integration-test」配下に置くことにして、テスト対象は「**/*IT」とします。

            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.12</version>
                <executions>
                    <execution>
                        <id>add-test-source</id>
                        <phase>generate-test-sources</phase>
                        <goals>
                            <goal>add-test-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/integration-test/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-failsafe-plugin</artifactId>
                <version>2.19.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>integration-test</goal>
                            <goal>verify</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <includes>
                        <include>**/*IT</include>
                    </includes>
                </configuration>
            </plugin>

Arquillianの設定としては、今回はServlet 3.0のみを指定しておきます。
src/integration-test/resources/aqruillian.xml

<?xml version="1.0" encoding="UTF-8"?>
<arquillian xmlns="http://jboss.org/schema/arquillian"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://jboss.org/schema/arquillian
                                http://jboss.org/schema/arquillian/arquillian_1_0.xsd">

    <defaultProtocol type="Servlet 3.0"/>

</arquillian>

またWildFlyをダウンロード、展開し、ProjectStageをIntegrationTestとして起動しておきます。

$ wildfly-10.0.0.Final/bin/standalone.sh -Dorg.apache.deltaspike.ProjectStage=IntegrationTest

Serviceクラスの実装

@Transactionalを使うということで、先ほど作成したRepositoryを使ったServiceクラスを作成します。
src/main/scala/org/littlewings/javaee7/service/BookService.scala

package org.littlewings.javaee7.service

import javax.enterprise.context.ApplicationScoped
import javax.inject.Inject
import javax.transaction.Transactional

import org.littlewings.javaee7.entity.Book
import org.littlewings.javaee7.repository.BookRepository

@ApplicationScoped
class BookService {
  @Inject
  var bookRepository: BookRepository = _

  @Transactional
  def save(book: Book): Book = bookRepository.save(book)

  @Transactional
  def saveFail(book: Book): Book = {
    save(book)
    throw new RuntimeException("Oops!!")
  }

  @Transactional
  def findByIsbn(isbn: String): Book = bookRepository.findBy(isbn)

  @Transactional
  def countPriceGreaterThan(price: Int): Long = bookRepository.countPriceGreaterThan(price)

  @Transactional
  def findByTitleLike(title: String): java.util.List[Book] = bookRepository.findByTitleLike(title)
}

ロールバックを確認するためのメソッドも入れています。

DataSourceとpersistence.xmlとEntityManagerのProducer

インテグレーションテストでは、単体テストで使っていたH2はやめて、MySQLで確認することにします。

データベースについては、MySQL側に事前にこんなテーブルを作っておきます。

CREATE TABLE IF NOT EXISTS book(
  isbn VARCHAR(14),
  title VARCHAR(200),
  price INT(10),
  PRIMARY KEY(isbn)
);

データソースは、こんなスクリプトでJDBCドライバのデプロイとともに、一気に作成。JDBCドライバは、MySQLのドライバがあらかじめMavenローカルリポジトリにあるものとします。
src/integration-test/resources/jboss-deploy-and-create-ds-script

connect
deploy ~/.m2/repository/mysql/mysql-connector-java/6.0.3/mysql-connector-java-6.0.3.jar
data-source add --name=mysqlDs --driver-name=mysql-connector-java-6.0.3.jar --driver-class=com.mysql.cj.jdbc.Driver --jndi-name=java:jboss/datasources/jdbc/mysqlDs --jta=true --connection-url=jdbc:mysql://localhost:3306/test --user-name=kazuhira --password=password
cd /subsystem=datasources/data-source=mysqlDs
./connection-properties=useUnicode:add(value=true)
./connection-properties=characterEncoding:add(value=utf-8)
./connection-properties=characterSetResults:add(value=utf-8)
./connection-properties=useServerPrepStmts:add(value=true)
./connection-properties=useLocalSessionState:add(value=true)
./connection-properties=elideSetAutoCommits:add(value=true)
./connection-properties=alwaysSendSetIsolation:add(value=false)
./connection-properties=useSSL:add(value=false)
reload

実行。

$ /path/to/wildfly-10.0.0.Final/bin/jboss-cli.sh --file=src/integration-test/resources/jboss-deploy-and-create-ds-script 

これで、データソースの作成まで完了です。

このデータソースをpersistence.xmlの永続化ユニットとして使うように定義し

    <persistence-unit name="integration-test.persistence.unit" transaction-type="JTA">
        <jta-data-source>java:jboss/datasources/jdbc/mysqlDs</jta-data-source>
        <properties>
            <property name="hibernate.dialect" value="org.hibernate.dialect.MySQL57InnoDBDialect"/>
            <property name="hibernate.show_sql" value="true"/>
            <property name="hibernate.format_sql" value="true"/>
            <property name="hibernate.hbm2ddl.auto" value="validate"/>
        </properties>
    </persistence-unit>

EntityManagerのProducer側にも追加します。

object EntityManagerProducers {
  @Dependent
  @Exclude(exceptIfProjectStage = Array(classOf[ProjectStage.IntegrationTest]))
  class IntegrationTestEntityManagerProducer {
    @PersistenceContext(unitName = "integration-test.persistence.unit")
    var entityManager: EntityManager = _

    @Produces
    @ApplicationScoped
    def entityManagerProducer: EntityManager = entityManager
  }

  @Dependent
  @Exclude(exceptIfProjectStage = Array(classOf[ProjectStage.UnitTest]))
  class UnitTestEntityManagerProducer {
    // 省略
  }
}

テストを書いて実行

では、これでArquillianを使ったテストを書いていきます。
src/integration-test/scala/org/littlewings/javaee7/service/ArquillianWithDeltaSpikeServiceIT.scala

package org.littlewings.javaee7.service

import javax.inject.Inject
import javax.persistence.EntityManager
import javax.transaction.UserTransaction

import org.jboss.arquillian.container.test.api.Deployment
import org.jboss.arquillian.junit.Arquillian
import org.jboss.shrinkwrap.api.ShrinkWrap
import org.jboss.shrinkwrap.api.spec.WebArchive
import org.jboss.shrinkwrap.resolver.api.maven.Maven
import org.junit.runner.RunWith
import org.junit.{Before, Test}
import org.littlewings.javaee7.entity.Book
import org.scalatest.Matchers
import org.scalatest.junit.JUnitSuite

@RunWith(classOf[Arquillian])
class ArquillianWithDeltaSpikeServiceIT extends JUnitSuite with Matchers {
  @Inject
  var userTransaction: UserTransaction = _

  @Inject
  var em: EntityManager = _

  @Inject
  var bookService: BookService = _

  @Before
  def setUp(): Unit = {
    userTransaction.begin()
    em.createNativeQuery("TRUNCATE TABLE book").executeUpdate()
    userTransaction.commit()
  }

  @Test
  def save(): Unit = {
    bookService.save(Book("978-4774183169", "パーフェクト Java EE", 3456))
    bookService.save(Book("978-4798140926", "Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築", 4104))
    bookService.save(Book("978-4798124605", "Beginning Java EE 6~GlassFish 3で始めるエンタープライズJava (Programmer's SELECTION)", 4536))

    bookService.findByIsbn("978-4774183169").price should be(3456)
  }

  @Test
  def rollback(): Unit = {
    val thrown = the[RuntimeException] thrownBy bookService.saveFail(Book("978-4774183169", "パーフェクト Java EE", 3456))
    thrown.getMessage should be("Oops!!")

    bookService.findByIsbn("978-4774183169") should be(null)
  }

  @Test
  def usingQueryAnnotation(): Unit = {
    bookService.save(Book("978-4774183169", "パーフェクト Java EE", 3456))
    bookService.save(Book("978-4798140926", "Java EE 7徹底入門 標準Javaフレームワークによる高信頼性Webシステムの構築", 4104))
    bookService.save(Book("978-4798124605", "Beginning Java EE 6~GlassFish 3で始めるエンタープライズJava (Programmer's SELECTION)", 4536))

    val resultBooks = bookService.findByTitleLike("%徹底入門%")
    resultBooks should have size(1)
    resultBooks.get(0).isbn should be("978-4798140926")
  }
}

object ArquillianWithDeltaSpikeServiceIT {
  @Deployment
  def createDeployment: WebArchive = {
    ShrinkWrap
      .create(classOf[WebArchive])
      .addPackages(true, "org.littlewings.javaee7")
      .addAsResource("META-INF/apache-deltaspike.properties", "META-INF/apache-deltaspike.properties")
      .addAsResource("META-INF/beans.xml", "META-INF/beans.xml")
      .addAsResource("META-INF/persistence.xml", "META-INF/persistence.xml")
      .addAsLibraries(
        Maven
          .resolver
          .loadPomFromFile("pom.xml")
          .importRuntimeDependencies
          .resolve("org.scalatest:scalatest_2.11:3.0.0")
          .withTransitivity
          .asFile: _*
      )
  }
}

ではこれで、テストを実行。

$ mvn test-compile failsafe:integration-test

単体テストのコードは動かしたくなかったので、この指定…

ところが、これはうまくいきません。

このコードをそのまま動かすと、以下のようなエラーを見ることになります。

java.lang.IllegalStateException: A JTA EntityManager cannot use getTransaction()

これは、Apache DeltaSpikeのData Moduleが依存する、JPA ModuleがRESOURCE_LOCALを想定していることが理由です。

参考)
DeltaSpike DataをJava EEアプリケーションサーバー上で動かす(2) | なるほど!ザ・Weld

これを回避するためには、META-INF/apache-deltaspile.propertiesに以下のような記述をします。
src/main/resources/META-INF/apache-deltaspike.properties

globalAlternatives.org.apache.deltaspike.jpa.spi.transaction.TransactionStrategy=org.apache.deltaspike.jpa.impl.transaction.ContainerManagedTransactionStrategy

コンテナ管理のトランザクション制御を利用するようにしましょう、と。TransactionStrategyインターフェースの実装で選択するのですが、以下のパッケージに実装があります。
https://github.com/apache/deltaspike/tree/deltaspike-1.7.1/deltaspike/modules/jpa/impl/src/main/java/org/apache/deltaspike/jpa/impl/transaction

デフォルトはResourceLocalTransactionStrategyで、その他にBeanManagedUserTransactionStrategy、ContainerManagedTransactionStrategy、EnvironmentAwareTransactionStrategyがありますが、今回の目的で使えるのはContainerManagedTransactionStrategyだけです。

また、ドキュメントの方には、beans.xmlにalternativesを書けばいいよと書かれていましたが、うまく動作しませんでした…。

この設定を入れると、テストをパスするようになります。

めでたし、めでたし。

単体テストはどうした

ですが、ここで「src/main/resources/META-INF/apache-deltaspike.properties」にTransactionStrategyの設定を書いてしまったので、実は単体テスト側の動作を破壊したことになります。

さて、どうしましょう?

「src/main/resources/META-INF/apache-deltaspike.properties」の内容は変えないとして、単体テストではResourceLocalTransactionStrategyを使いたいわけです。また、globalAlternativesについてはProjectStageは見てくれません。

今回は、別の設定ファイルを追加して、apache-deltaspike.propertiesより優先度の高いファイルを作り、その内容を上書きする方法で対処しました。

次のようなクラスを作成します。追加するファイルは、「unit-test-apache-deltaspike.properties」とします。
src/test/scala/org/littlewings/javaee7/repository/UnitTestApacheDeltaSpikePropertyFileConfig.scala

package org.littlewings.javaee7.repository

import org.apache.deltaspike.core.api.config.PropertyFileConfig

class UnitTestApacheDeltaSpikePropertyFileConfig extends PropertyFileConfig {
  override def getPropertyFileName: String = "META-INF/unit-test-apache-deltaspike.properties"

  override def isOptional: Boolean = false
}

このクラスをService Providerの仕組みに乗せます。
src/test/resources/META-INF/services/org.apache.deltaspike.core.api.config.PropertyFileConfig

org.littlewings.javaee7.repository.UnitTestApacheDeltaSpikePropertyFileConfig

「unit-test-apache-deltaspike.properties」の内容は、以下のように書きます。
src/test/resources/META-INF/unit-test-apache-deltaspike.properties

deltaspike_ordinal=110
globalAlternatives.org.apache.deltaspike.jpa.spi.transaction.TransactionStrategy=org.apache.deltaspike.jpa.impl.transaction.ResourceLocalTransactionStrategy

TransactionStrategyをResourceLocalTransactionStrategyにするのはもちろんですが、deltaspike_ordinalを100よりも大きくすることで、オリジナルのapache-deltaspile.propertiesよりも優先度を上にします。
Providing configuration using ConfigSources

Propertiesファイルの優先度は100で、値が大きいほど優先度が高くなります。システムプロパティが最高で、400になっています。

今回は、これで単体テストではResourceLocalTransactionStrategyを使い、インテグレーションテストではContainerManagedTransactionStrategyを使うようにしました。

まとめ

Apache DeltaSpikeのData Moduleを使い、JPAのクエリ実行を試しつつ、Test-Controlで単体テスト、ArquillianでインテグレーションテストをしてJTAとの統合まで確認してみました。

Data Module自体より、ProjectStageとTransactionStrategyにとてもハマったのですが、なんとか目標にしていた形には持っていけました。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/cdi-jpa-deltaspike-data

参考)
DeltaSpike Dataの紹介 | なるほど!ザ・Weld
DeltaSpike DataをJava EEアプリケーションサーバー上で動かす(1) | なるほど!ザ・Weld
DeltaSpike DataをJava EEアプリケーションサーバー上で動かす(2) | なるほど!ザ・Weld
Creating Deployable Archives with ShrinkWrap · Arquillian Guides
GitHub - shrinkwrap/resolver: ShrinkWrap Resolvers
build - Prevent unit tests in maven but allow integration tests - Stack Overflow

2016-08-11

Apache DeltaSpikeのScheduler Moduleを試す

Apache DeltaSpikeには、Scheduler Moduleというものがあり、こちらを使って定期的に動かすジョブを作成することができます。

Scheduler Module

ジョブのスケジュールはCRON形式で指定でき、実装としてはQuartzを使っているようです。

Quartz Enterprize Job Scheduler

こちらを利用すると、EJBのTimerServiceを使わなくてもCDIでジョブ起動ができる、と。
EJB TimerServiceのPersistenceみたいなのはないみたいですが

では、試してみましょう。

準備

まずは、ビルド定義から。
build.sbt

name := "cdi-deltaspike-scheduler"

organization := "org.littlewings"

scalaVersion := "2.11.8"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

enablePlugins(JettyPlugin)

webappWebInfClasses := true

artifactName := {
  (scalaVersion: ScalaVersion, module: ModuleID, artifact: Artifact) =>
    // artifact.name + "." + artifact.extension
    "ROOT." + artifact.extension
}

fork in Test := true

libraryDependencies ++= Seq(
  "javax" % "javaee-web-api" % "7.0" % Provided,
  "org.apache.deltaspike.core" % "deltaspike-core-api" % "1.7.1" % Compile,
  "org.apache.deltaspike.core" % "deltaspike-core-impl" % "1.7.1" % Runtime,
  "org.apache.deltaspike.modules" % "deltaspike-scheduler-module-api" % "1.7.1" % Compile,
  "org.apache.deltaspike.modules" % "deltaspike-scheduler-module-impl" % "1.7.1" % Runtime,
  "org.apache.deltaspike.cdictrl" % "deltaspike-cdictrl-api" % "1.7.1" % Compile,
  "org.apache.deltaspike.cdictrl" % "deltaspike-cdictrl-weld" % "1.7.1" % Runtime,
  "org.quartz-scheduler" % "quartz" % "2.2.1" % Compile
)

Webプロジェクトとして作成します。
project/plugins.sbt

logLevel := Level.Warn

addSbtPlugin("com.earldouglas" % "xsbt-web-plugin" % "2.1.0")

依存関係としては、Apache DeltaSpikeのCoreモジュール。

  "org.apache.deltaspike.core" % "deltaspike-core-api" % "1.7.1" % Compile,
  "org.apache.deltaspike.core" % "deltaspike-core-impl" % "1.7.1" % Runtime,

Schedulerモジュール。

  "org.apache.deltaspike.modules" % "deltaspike-scheduler-module-api" % "1.7.1" % Compile,
  "org.apache.deltaspike.modules" % "deltaspike-scheduler-module-impl" % "1.7.1" % Runtime,

そして、Container-Controlモジュールが必要です。

  "org.apache.deltaspike.cdictrl" % "deltaspike-cdictrl-api" % "1.7.1" % Compile,
  "org.apache.deltaspike.cdictrl" % "deltaspike-cdictrl-weld" % "1.7.1" % Runtime,

Quartzについては、デフォルトのスケジューラーとしてApache DeltaSpikeのSchedule Moduleが採用しているのですが、Quartz自体への依存関係は明示的に記述する必要があります。

  "org.quartz-scheduler" % "quartz" % "2.2.1" % Compile

Project Setup

アプリケーションの実行は、WildFlyへデプロイして確認するものとします。とりあえず、WildFlyを起動しておきましょう。

$ bin/standalone.sh

Jobを作成する

Apache DeltaSpikeのSchedule ModuleでJobを作成する方法は、以下の2つがあります。

  • org.quartz.Jobインターフェースを実装したクラスを作成する
  • java.lang.Runnableインターフェースを実装したクラスを作成する

いずれにしろ、Quartzの上で動作します。

@Scheduled with org.quartz.Job or java.lang.Runnable

Jobの起動タイミングについては、CRON形式で記述します。
Configurable CRON expressions

それでは、まずはQuartzのJobインターフェースを実装する方法で実装してみましょう。
src/main/scala/org/littlewings/javaee7/cdi/QuartzBasedJob.scala

package org.littlewings.javaee7.cdi

import javax.enterprise.context.ApplicationScoped
import javax.inject.Inject

import org.apache.deltaspike.scheduler.api.Scheduled
import org.quartz.{Job, JobExecutionContext}
import org.slf4j.{Logger, LoggerFactory}

@Scheduled(cronExpression = "0 0/1 * * * ?")
@ApplicationScoped
class QuartzBasedJob extends Job {
  @Inject
  var applicationScopedMessageService: ApplicationScopedMessageService = _

  @Inject
  var sessionScopedMessageService: SessionScopedMessageService = _

  @Inject
  var requestScopedMessageService: RequestScopedMessageService = _

  @Inject
  var pseudoScopedMessageService: PseudoScopedMessageService = _

  val logger: Logger = LoggerFactory.getLogger(getClass)

  override def execute(context: JobExecutionContext): Unit = {
    logger.info("[{}] startup job", getClass.getSimpleName)

    applicationScopedMessageService.loggingMessage()
    sessionScopedMessageService.loggingMessage()
    requestScopedMessageService.loggingMessage()
    pseudoScopedMessageService.loggingMessage()

    logger.info("[{}] end job", getClass.getSimpleName)
  }
}

@ScheduleアノテーションおよびcronExpressionで、起動タイミングを指定します。また、CDI管理Beanとして宣言しておきます。クラス自体は、QuartzのJobインターフェースを実装して作成します。

@Scheduled(cronExpression = "0 0/1 * * * ?")
@ApplicationScoped
class QuartzBasedJob extends Job {

今回は、1分毎に起動するJobを作成しました。

あとは、Job#executeメソッドを実装すればOKです。

  override def execute(context: JobExecutionContext): Unit = {
    logger.info("[{}] startup job", getClass.getSimpleName)

    applicationScopedMessageService.loggingMessage()
    sessionScopedMessageService.loggingMessage()
    requestScopedMessageService.loggingMessage()
    pseudoScopedMessageService.loggingMessage()

    logger.info("[{}] end job", getClass.getSimpleName)
  }

と書くと、QuartzのJobを作成するのと何が違う?という話になりますが、このJobへは@InjectでCDI管理Beanをインジェクションすることができます。

  @Inject
  var applicationScopedMessageService: ApplicationScopedMessageService = _

  @Inject
  var sessionScopedMessageService: SessionScopedMessageService = _

  @Inject
  var requestScopedMessageService: RequestScopedMessageService = _

  @Inject
  var pseudoScopedMessageService: PseudoScopedMessageService = _

ここで利用しているCDI管理Beanは、名称から自明かもしれませんが、各スコープに応じたCDI管理Beanです。
src/main/scala/org/littlewings/javaee7/cdi/MessageService.scala

package org.littlewings.javaee7.cdi

import java.time.LocalDateTime
import javax.enterprise.context.{ApplicationScoped, Dependent, RequestScoped, SessionScoped}

import org.slf4j.{Logger, LoggerFactory}

trait MessageServiceSupport {
  val logger: Logger = LoggerFactory.getLogger(getClass)

  def loggingMessage(): Unit = {
    logger.info(s"Hello ${getClass.getSimpleName}@${hashCode}, now = ${LocalDateTime.now}")
  }
}

@ApplicationScoped
class ApplicationScopedMessageService extends MessageServiceSupport

@SessionScoped
@SerialVersionUID(1L)
class SessionScopedMessageService extends MessageServiceSupport with Serializable

@RequestScoped
class RequestScopedMessageService extends MessageServiceSupport

@Dependent
class PseudoScopedMessageService extends MessageServiceSupport

何気に、SessionScopedやRequestScopedなCDI管理Beanも混じっています。これが動くのでしょうか…?

とりあえず、パッケージングして

> package

デプロイ。

$ cp target/scala-2.11/ROOT.war /path/to/wildfly-10.0.0.Final/standalone/deployments

すると、1分おきにジョブが実行されます。

01:05:00,041 INFO  [org.littlewings.javaee7.cdi.QuartzBasedJob] (DefaultQuartzScheduler_Worker-1) [QuartzBasedJob] startup job
01:05:00,130 INFO  [org.littlewings.javaee7.cdi.ApplicationScopedMessageService] (DefaultQuartzScheduler_Worker-1) Hello ApplicationScopedMessageService@1340878689, now = 2016-08-12T01:05:00.127
01:05:00,131 INFO  [org.littlewings.javaee7.cdi.SessionScopedMessageService] (DefaultQuartzScheduler_Worker-1) Hello SessionScopedMessageService@1552885928, now = 2016-08-12T01:05:00.131
01:05:00,131 INFO  [org.littlewings.javaee7.cdi.RequestScopedMessageService] (DefaultQuartzScheduler_Worker-1) Hello RequestScopedMessageService@1988410949, now = 2016-08-12T01:05:00.131
01:05:00,131 INFO  [org.littlewings.javaee7.cdi.PseudoScopedMessageService] (DefaultQuartzScheduler_Worker-1) Hello PseudoScopedMessageService@157109538, now = 2016-08-12T01:05:00.131
01:05:00,131 INFO  [org.littlewings.javaee7.cdi.QuartzBasedJob] (DefaultQuartzScheduler_Worker-1) [QuartzBasedJob] end job


01:06:00,001 INFO  [org.littlewings.javaee7.cdi.QuartzBasedJob] (DefaultQuartzScheduler_Worker-3) [QuartzBasedJob] startup job
01:06:00,004 INFO  [org.littlewings.javaee7.cdi.ApplicationScopedMessageService] (DefaultQuartzScheduler_Worker-3) Hello ApplicationScopedMessageService@1340878689, now = 2016-08-12T01:06:00.004
01:06:00,005 INFO  [org.littlewings.javaee7.cdi.SessionScopedMessageService] (DefaultQuartzScheduler_Worker-3) Hello SessionScopedMessageService@1186961072, now = 2016-08-12T01:06:00.005
01:06:00,005 INFO  [org.littlewings.javaee7.cdi.RequestScopedMessageService] (DefaultQuartzScheduler_Worker-3) Hello RequestScopedMessageService@1215852567, now = 2016-08-12T01:06:00.005
01:06:00,005 INFO  [org.littlewings.javaee7.cdi.PseudoScopedMessageService] (DefaultQuartzScheduler_Worker-3) Hello PseudoScopedMessageService@157109538, now = 2016-08-12T01:06:00.005
01:06:00,006 INFO  [org.littlewings.javaee7.cdi.QuartzBasedJob] (DefaultQuartzScheduler_Worker-3) [QuartzBasedJob] end job

一緒にハッシュコードも出力していますが、よくよく見るとSessionScopedやRequestScopedなCDI管理Beanは、都度インスタンスが作成されてインジェクションされているようですね。

In such scheduled-tasks CDI based dependency-injection is enabled. Furthermore, the request- and session-scope get started (and stopped) per job-execution. Therefore, the container-control module (of DeltaSpike) is required. That can be controlled via @Scheduled#startScopes (possible values: all scopes supported by the container-control module as well as {} for 'no scopes').

https://deltaspike.apache.org/documentation/scheduler.html#@Scheduledwithorg.quartz.Joborjava.lang.Runnable

とりあえず、最低限の動作は確認できました。

では、続いてRunnableインターフェースを実装したクラスで、Jobを作成してみましょう。
src/main/scala/org/littlewings/javaee7/cdi/RunnableBasedJob.scala

package org.littlewings.javaee7.cdi

import javax.enterprise.context.ApplicationScoped
import javax.inject.Inject

import org.apache.deltaspike.scheduler.api.Scheduled
import org.slf4j.{Logger, LoggerFactory}

@Scheduled(cronExpression = "0 0/1 * * * ?")
@ApplicationScoped
class RunnableBasedJob extends Runnable {
  @Inject
  var applicationScopedMessageService: ApplicationScopedMessageService = _

  @Inject
  var sessionScopedMessageService: SessionScopedMessageService = _

  @Inject
  var requestScopedMessageService: RequestScopedMessageService = _

  @Inject
  var pseudoScopedMessageService: PseudoScopedMessageService = _

  val logger: Logger = LoggerFactory.getLogger(getClass)

  override def run(): Unit = {
    logger.info("[{}] startup job", getClass.getSimpleName)

    applicationScopedMessageService.loggingMessage()
    sessionScopedMessageService.loggingMessage()
    requestScopedMessageService.loggingMessage()
    pseudoScopedMessageService.loggingMessage()

    logger.info("[{}] end job", getClass.getSimpleName)
  }
}

QuartzのJobインターフェースを実装した場合と、やっていることはほとんど同じです。実装しているメソッドが、Runnable#runなくらいですね。

デプロイして動かした時のログは、このように。

01:08:00,445 INFO  [org.littlewings.javaee7.cdi.RunnableBasedJob] (DefaultQuartzScheduler_Worker-1) [RunnableBasedJob] startup job
01:08:00,552 INFO  [org.littlewings.javaee7.cdi.ApplicationScopedMessageService] (DefaultQuartzScheduler_Worker-1) Hello ApplicationScopedMessageService@1897461758, now = 2016-08-12T01:08:00.550
01:08:00,552 INFO  [org.littlewings.javaee7.cdi.SessionScopedMessageService] (DefaultQuartzScheduler_Worker-1) Hello SessionScopedMessageService@1805686172, now = 2016-08-12T01:08:00.552
01:08:00,553 INFO  [org.littlewings.javaee7.cdi.RequestScopedMessageService] (DefaultQuartzScheduler_Worker-1) Hello RequestScopedMessageService@1407251698, now = 2016-08-12T01:08:00.553
01:08:00,553 INFO  [org.littlewings.javaee7.cdi.PseudoScopedMessageService] (DefaultQuartzScheduler_Worker-1) Hello PseudoScopedMessageService@1527663713, now = 2016-08-12T01:08:00.553
01:08:00,553 INFO  [org.littlewings.javaee7.cdi.RunnableBasedJob] (DefaultQuartzScheduler_Worker-1) [RunnableBasedJob] end job


01:09:00,002 INFO  [org.littlewings.javaee7.cdi.RunnableBasedJob] (DefaultQuartzScheduler_Worker-3) [RunnableBasedJob] startup job
01:09:00,003 INFO  [org.littlewings.javaee7.cdi.ApplicationScopedMessageService] (DefaultQuartzScheduler_Worker-3) Hello ApplicationScopedMessageService@1897461758, now = 2016-08-12T01:09:00.003
01:09:00,004 INFO  [org.littlewings.javaee7.cdi.SessionScopedMessageService] (DefaultQuartzScheduler_Worker-3) Hello SessionScopedMessageService@1975586858, now = 2016-08-12T01:09:00.004
01:09:00,005 INFO  [org.littlewings.javaee7.cdi.RequestScopedMessageService] (DefaultQuartzScheduler_Worker-3) Hello RequestScopedMessageService@2064048354, now = 2016-08-12T01:09:00.005
01:09:00,006 INFO  [org.littlewings.javaee7.cdi.PseudoScopedMessageService] (DefaultQuartzScheduler_Worker-3) Hello PseudoScopedMessageService@1527663713, now = 2016-08-12T01:09:00.005
01:09:00,006 INFO  [org.littlewings.javaee7.cdi.RunnableBasedJob] (DefaultQuartzScheduler_Worker-3) [RunnableBasedJob] end job

JobとRunnableは同時に使えない

と、あたかもさらっと動かしたように書きましたが、実は上記までを忠実に実行すると、このアプリケーションは動作しません。

どういうことかというと、上記までのコードを両方書くと、「@ScheduledなCDI管理BeanがJobとRunnableの2つの実装方法で両方とも存在している」という状態になります。

Apache DeltaSpikeのScheduler Moduleは、この状態を許容していません。JobおよびRunnableを実装して作成したCDI管理BeanとしてのJobが存在すると、デプロイ時に以下のようなエラーを見ることになります。

java.lang.IllegalStateException: Please only annotate classes with @org.apache.deltaspike.scheduler.api.Scheduled of type org.quartz.Job or of type java.lang.Runnable, but not both!

なお、@ScheduledなCDI管理Beanの登録は、Apache DeltaSpikeが実装しているCDI Extensionsによって実現されています。

https://github.com/apache/deltaspike/blob/deltaspike-1.7.1/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerExtension.java#L44

JobおよびRunnableの使ったJobがそれぞれ定義されている場合は、このクラスが検出してエラーとします。

ドキュメントにも、一応それっぽいことが書いてあります。

Behind the scenes DeltaSpike registers an adapter for Quartz which just delegates to the run-method once the adapter gets called by Quartz. Technically you end up with almost the same, just with a reduced API for implementing (all) your scheduled jobs. Therefore the main difference is that your code is independent of Quartz-classes. However, you need to select org.quartz.Job or java.lang.Runnable for all your scheduled-tasks, bot not both!

https://deltaspike.apache.org/documentation/scheduler.html#@Scheduledwithorg.quartz.Joborjava.lang.Runnable

なお、JobもしくはRunnableのどちらかを使用することで統一していれば、@ScheduledなCDI管理Beanは、複数登録可能です。

このように。

@Scheduled(cronExpression = "0 0/1 * * * ?")
@ApplicationScoped
class QuartzBasedJob extends Job {
  // 省略
}

@Scheduled(cronExpression = "10 0/1 * * * ?")
@ApplicationScoped
class QuartzBasedJob2 extends Job {
  // 省略
}

もしくは、こう。

@Scheduled(cronExpression = "0 0/1 * * * ?")
@ApplicationScoped
class RunnableBasedJob extends Runnable {
  // 省略
}

@Scheduled(cronExpression = "10 0/1 * * * ?")
@ApplicationScoped
class RunnableBasedJob2 extends Runnable {
  // 省略
}

手動でJob登録する

これまでは、コンテナの起動時にJobを登録して、即時にスケジューリングしていました。

これを手動登録することもできます。

@Scheduled with org.quartz.Job or java.lang.Runnable
Manual Scheduler Control

ここで書かれている方法は、QuartzのJobのみになるようです。Runnableでやりたければ、ManagedExecutorServiceを使うことになりそうな?

Execute java.lang.Runnable with ManagedExecutorService

で、話を戻して手動でQuartzのJobを実行する方法ですが、@AlternativesなCDI管理Bean、QuartzSchedulerProducerをbeans.xmlに指定します。
src/main/resources/META-INF/beans.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee
                           http://xmlns.jcp.org/xml/ns/javaee/beans_1_1.xsd"
       bean-discovery-mode="annotated">
    <alternatives>
        <class>org.apache.deltaspike.scheduler.impl.QuartzSchedulerProducer</class>
    </alternatives>
</beans>

そして、これまでと同じようにJobを作成します。@Injectもふつうに使えます。
src/main/scala/org/littlewings/javaee7/cdi/ManualQuartzBasedJob.scala

package org.littlewings.javaee7.cdi

import javax.enterprise.context.ApplicationScoped
import javax.inject.Inject

import org.apache.deltaspike.scheduler.api.Scheduled
import org.quartz.{Job, JobExecutionContext}
import org.slf4j.{Logger, LoggerFactory}

@Scheduled(cronExpression = "30 0/1 * * * ?", onStartup = false)
@ApplicationScoped
class ManualQuartzBasedJob extends Job {
  @Inject
  var applicationScopedMessageService: ApplicationScopedMessageService = _

  @Inject
  var sessionScopedMessageService: SessionScopedMessageService = _

  @Inject
  var requestScopedMessageService: RequestScopedMessageService = _

  @Inject
  var pseudoScopedMessageService: PseudoScopedMessageService = _

  val logger: Logger = LoggerFactory.getLogger(getClass)

  override def execute(context: JobExecutionContext): Unit = {
    logger.info("[{}] startup job", getClass.getSimpleName)

    applicationScopedMessageService.loggingMessage()
    sessionScopedMessageService.loggingMessage()
    requestScopedMessageService.loggingMessage()
    pseudoScopedMessageService.loggingMessage()

    logger.info("[{}] end job", getClass.getSimpleName)
  }
}

先ほどまでのJobとほとんど同じですが、違いは@ScheudledのonStarupをfalseにしていることです。

@Scheduled(cronExpression = "30 0/1 * * * ?", onStartup = false)
@ApplicationScoped
class ManualQuartzBasedJob extends Job {

こうしておくと、コンテナ起動時にスケジューラーに登録されなくなります。

https://github.com/apache/deltaspike/blob/deltaspike-1.7.1/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerExtension.java#L103

ただ、こうするとJobを登録する処理が必要ですね。こちらはJAX-RSで作成しました。

JAX-RSの有効化。
src/main/scala/org/littlewings/javaee7/rest/JaxrsApplication.scala

package org.littlewings.javaee7.rest

import javax.ws.rs.ApplicationPath
import javax.ws.rs.core.Application

@ApplicationPath("rest")
class JaxrsApplication extends Application

Jobをスケジューラーに登録する、JAX-RSリソースクラス。
src/main/scala/org/littlewings/javaee7/rest/ManualJobResource.scala

package org.littlewings.javaee7.rest

import javax.enterprise.context.RequestScoped
import javax.inject.Inject
import javax.ws.rs.core.MediaType
import javax.ws.rs.{GET, Path, Produces}

import org.apache.deltaspike.scheduler.spi.Scheduler
import org.littlewings.javaee7.cdi.ManualQuartzBasedJob
import org.quartz.Job

@Path("job")
@RequestScoped
class ManualJobResource {
  @Inject
  var scheculer: Scheduler[Job] = _

  @GET
  @Path("start-job")
  @Produces(Array(MediaType.TEXT_PLAIN))
  def startJob: String = {
    scheculer.registerNewJob(classOf[ManualQuartzBasedJob])

    "Register Job!!"
  }
}

ここでのポイントは、Apache DeltaSpikeのSchedulerを@Injectして

  @Inject
  var scheculer: Scheduler[Job] = _

作成したJobのClassクラスをScheduler#registerNewJobすることですね。

    scheculer.registerNewJob(classOf[ManualQuartzBasedJob])

これで、Job登録完了です。

デプロイして確認してみます。デプロイ後、以下のコマンドを実行するとJobが登録され

$ curl http://localhost:8080/rest/job/start-job
Register Job!!

実行されるようになります。

01:32:30,012 INFO  [org.littlewings.javaee7.cdi.ManualQuartzBasedJob] (DefaultQuartzScheduler_Worker-5) [ManualQuartzBasedJob] startup job
01:32:30,013 INFO  [org.littlewings.javaee7.cdi.ApplicationScopedMessageService] (DefaultQuartzScheduler_Worker-5) Hello ApplicationScopedMessageService@1293488989, now = 2016-08-12T01:32:30.013
01:32:30,014 INFO  [org.littlewings.javaee7.cdi.SessionScopedMessageService] (DefaultQuartzScheduler_Worker-5) Hello SessionScopedMessageService@1057457274, now = 2016-08-12T01:32:30.014
01:32:30,015 INFO  [org.littlewings.javaee7.cdi.RequestScopedMessageService] (DefaultQuartzScheduler_Worker-5) Hello RequestScopedMessageService@2047151614, now = 2016-08-12T01:32:30.015
01:32:30,016 INFO  [org.littlewings.javaee7.cdi.PseudoScopedMessageService] (DefaultQuartzScheduler_Worker-5) Hello PseudoScopedMessageService@818428427, now = 2016-08-12T01:32:30.015
01:32:30,016 INFO  [org.littlewings.javaee7.cdi.ManualQuartzBasedJob] (DefaultQuartzScheduler_Worker-5) [ManualQuartzBasedJob] end job

その他、気になること

今回は特に試しませんでしたが、CRONのスケジュールを指定している部分には設定ファイルの内容を参照することもできるようです。

@Scheduled(cronExpression = "{myCronExpression}")

Configurable CRON expressions

こちらを使うと、JobのスケジューリングをProjectStageごとに管理することもできるようですね。

また、Quartzではなく、独自のスケジューラーを実装することも可能ではあるようです。

Custom Scheduler

まとめ

Apache DeltaSpikeのSchedule Moduleを試してみました。

内部実装はQuartzですが、CDI管理BeanをインジェクションできるJobを作成できるなど、ちょっと便利そうだなぁと思いました。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/javaee7-scala-examples/tree/master/cdi-deltaspike-scheduler