谷本 心 in せろ部屋 このページをアンテナに追加 RSSフィード

2017-12-15

[]ReactorでN+1問題な処理を実装してみた話

最近、格ゲーツイートが増えてる @ です。前のエントリーに書いた「18年ぶりに出る続編」のβテストがついに始まりまして、最近は夜な夜なコンボをやるなどしています。

シビアな反応が要求される格闘ゲームにおいて、継続して勝ち続けるためにはどうしても反射神経が必要となり、機械のような反射神経、つまり「反応装置」にならなくてはいけません、そうだから今日のテーマは「Reactor」なのです、、、みたいな流れを考えたんですが、どうにも苦しいですよね。ろくにスベることもできない中ですが、Javaアドベントカレンダー15日目が始まりました。

https://qiita.com/advent-calendar/2017/java


さて、Project ReactorはReactiveなノンブロッキング処理を書くためのライブラリです。最近はSpring 5.0などでも全面的に利用されているため話題になりがちです。今日はこれを勉強しながら、業務にありそうなケースを実装してみます。

なお、同期とか非同期とかブロッキングとかノンブロッキングとかReactiveとかの言葉の定義は、この際、置いておきます。実装を見て、雰囲気で掴んでください。


目次

0. はじめに

1. ノンブロッキング処理を体感する

2. Reactorらしいコードを書く

3. ノンブロッキングなら、シングルスレッドでも早いのか?

4. ブロッキング処理の場合はどうなる?

5. ブロッキング処理も、マルチスレッドで高速化


0〜2までは、Reactor初心者が苦慮してコードを書いていった話。

2〜5は、ノンブロッキング処理やブロッキング処理を、それぞれシングルスレッド、マルチスレッドで試してみた話です。

長いエントリーなので、興味がある部分を拾い読みしてもらえればと思います。


0. はじめに

まずは事前の準備や、目的などを説明します。


事前にやること

Reactorについて完全に素人だったので、まずは入門のスライドを読んでから、ハンズオン(チュートリアル)をやりました。


Spring 5に備えるリアクティブプログラミング入門

https://www.slideshare.net/TakuyaIwatsuka/spring-5


Reactive Webアプリケーション - そしてSpring 5へ #jjug_ccc #ccc_ef3

https://www.slideshare.net/makingx/reactive-web-spring-5-jjugccc-cccef3


Lite Rx API Hands-on(チュートリアル

https://github.com/reactor/lite-rx-api-hands-on/


なんとなく、Java5世代の非同期脳で、MonoがFutureのようなもの、FluxがList版のFutureのようなもの、という理解をしました。実際にはJava8で追加されたCompletableFutureのようなもののようですが、それはおいとくとして。

とりあえずこれらでReactorの概要をざっくり掴んだのですが、とにかく機能が多すぎて、全く覚えきれません。Stream APIをもう一度学びなおしてるような気持ちです。


今回の目的

今回は、いわゆる「N+1問題」をReactorで実装するとどうなるか、という検証をしてみます。

テーブルAから一覧データをN件持ってきて、次にそれに関連するデータをテーブルBから持ってくるという処理において、クエリがN+1回発生してしまうせいで遅い、というアレです。SQLを工夫すればクエリ1回で済むやろっていうコメントは、本題からズレるので★1です!


今回は「生徒一覧」を取得したうえで、生徒の「成績一覧」を検索することを想定します。

また検索対象は、RDBMSのようなブロッキング処理しかできないデータソースの場合と、何かしらイイ感じのノンブロッキングなデータソースの場合とを、それぞれ想定して比較します。


コードで言うと、次のようなイメージです。

生徒一覧の取得
Flux<Student> fetchStudents(String className)

点数一覧の取得
Flux<Score> fetchScores(int year, int id)

取り出したい形
Map<Student, List<Score>>

生徒の一覧を取り、それぞれの生徒の(2017年の)成績を取り、それをMono/FluxではなくMapやListの形にして返すというものです。

なぜわざわざMapに変換するんだ、なぜここでブロックしてしまうんだ、というツッコミを受けそうですが、あくまでも「全ての結果が揃ってから返す」けど、中の処理を並列にすることで、レスポンスを早くしたいというケースを想定しました。課題設定に口出し無用。


1. ノンブロッキング処理を体感する

まずは手探りでノンブロッキングなコードを書く所までをやります。

この章で紹介するソースコードは次のURLにあります。

https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample1.java


配列Fluxにする

まずは生徒一覧取得メソッドの実装として、生徒の配列からFluxを作って返します。

モックなので固定値を返してしまうのですが、検索を模しているので、1データ取得するのに100ミリ秒掛かるという想定にしました。こんな感じです。

Flux<Student> fetchStudents(String className) {
    Student[] students = {
            new Student(1, "Muto"),
            new Student(2, "Miyoshi"),
            new Student(3, "Matsui"),
            // 略
            new Student(28, "Mori"),
            new Student(29, "Tanaka"),
            new Student(30, "Yagi"),
    };

    return Flux.interval(Duration.ofMillis(100))
            .map(i -> students[i.intValue()])
            .take(students.length);
}

intervalメソッドで100ミリ秒おきに、mapメソッド配列から生徒を取り出して、takeメソッド配列分だけ生徒を取得したら終える、という実装です。

30人いるので、シーケンシャルに行えば、3000ミリ秒、つまり3秒掛かる処理ですね。


点数の一覧を取得するところも、実装の内容はほぼ同じです。

Flux<Score> fetchScores(int year, int id) {
    final Score[] scores = {
            new Score(id, "国語", 80),
            new Score(id, "数学", 90),
            new Score(id, "英語", 85),
            new Score(id, "社会", 93),
            new Score(id, "理科", 72)
    };

    return Flux.interval(Duration.ofMillis(100))
            .map(i -> scores[i.intValue()])
            .take(scores.length);
}

これもシーケンシャルに行えば、5科目 * 100ミリ秒で、500ミリ秒掛かる処理です。30人分あるので、15秒掛かる計算になります。

つまり生徒一覧と成績一覧の取得処理をすべてシーケンシャルに行うと、18秒掛かることになります。それをもっとうまく並列に処理したいというのが今回のテーマです。


生徒の結果が返ってき次第、成績を取得する

ここまでに書いた2つのメソッドを使って、生徒の一覧を取得して、それぞれの生徒ごとに成績の一覧を取得し、それをMapに変換するような処理を書いてみます。

Map<Student, List<Score>> map = new HashMap<>();
fetchStudents("sg")
        .subscribe(student -> {
            fetchScores(2017, student.id)
                    .subscribe(score -> map.computeIfAbsent(student, s -> new ArrayList<>()).add(score));
        });

System.out.println(map);

Fluxのsubscribeメソッドを使えば、値が1件戻ってくるたびにその値を使った処理を行うことができます。

そのメソッドを使って、

えーっと、、、先にMapのインスタンスを作っておいて、

subscribeの中でMapに追加していく、と、いう・・・、

完全に「素人ですかって」怒られるタイプのコードですよね、これ。いや素人ですから💢

あと、勘が良い方がは気づかれたかも知れませんが、ConcurrentModificationExceptionが発生しかねなかったりもしますね。悪い例がすぎますね。


ちなみにこのコードを実行すると、ほとんど結果が表示されずに終わってしまいます。mainスレッドが終了するからでしょうね。

であれば、その後にスリープしてしまえばいいんです。

try {
    Thread.sleep(2000L);
} catch (InterruptedException e) {
    e.printStackTrace();
}

いや素人にも程があるやろ💢💢 ってレベルのコードができあがりました。


これで実行させてみると・・・

{
{17 Kurosawa=[17 国語 80, 17 数学 90]
, 8 Sato=[8 国語 80, 8 数学 90, 8 英語 85, 8 社会 93, 8 理科 72]
, 11 Notsu=[11 国語 80, 11 数学 90, 11 英語 85, 11 社会 93, 11 理科 72]
, 1 Muto=[1 国語 80, 1 数学 90, 1 英語 85, 1 社会 93, 1 理科 72]
, 13 Ooga=[13 国語 80, 13 数学 90, 13 英語 85, 13 社会 93, 13 理科 72]
, 4 Nakamoto=[4 国語 80, 4 数学 90, 4 英語 85, 4 社会 93, 4 理科 72]
, 3 Matsui=[3 国語 80, 3 数学 90, 3 英語 85, 3 社会 93, 3 理科 72]
, 12 Taguchi=[12 国語 80, 12 数学 90, 12 英語 85, 12 社会 93, 12 理科 72]
, 16 Isono=[16 国語 80, 16 数学 90, 16 英語 85]
, 10 Kikuchi=[10 国語 80, 10 数学 90, 10 英語 85, 10 社会 93, 10 理科 72]
, 5 Iida=[5 国語 80, 5 数学 90, 5 英語 85, 5 社会 93, 5 理科 72]
, 18 Kurashima=[18 国語 80]
, 2 Miyoshi=[2 国語 80, 2 数学 90, 2 英語 85, 2 社会 93, 2 理科 72]
, 15 Shiroi=[15 国語 80, 15 数学 90, 15 英語 85, 15 社会 93]
, 6 Horiuchi=[6 国語 80, 6 数学 90, 6 英語 85, 6 社会 93, 6 理科 72]
, 14 Sugimoto=[14 国語 80, 14 数学 90, 14 英語 85, 14 社会 93, 14 理科 72]
, 7 Sugisaki=[7 国語 80, 7 数学 90, 7 英語 85, 7 社会 93, 7 理科 72]
, 9 Mizuno=[9 国語 80, 9 数学 90, 9 英語 85, 9 社会 93, 9 理科 72]
}

あぁー、どうあれ、なんか期待通りのものが取れてるじゃないですか! 勝利!!


と思ってよく見たら、18人分しか取れていません。僕的にはNakamoto, Kikuchi, Mizunoが取れていれば良いのですが。

また、15人目〜18人目の生徒は途中までしか成績が取れていません。なるほど非同期で処理してる途中でSystem.out.printlnが呼ばれて、処理が終わってしまったわけですね。


であれば、2000Lとしたsleep時間を10000Lぐらいにすれば解決しますよね!

・・・みたいな話を続けてるといい加減怒られそうなので、そろそろ真面目にやりましょう。


ログを出そう

真面目にやる前に、少し脇道に逸れて、ログの話をします。

ReactorのFluxMonoは、どこで何が起きているか分かりにくいのを少しでも解消するためか、随所に埋め込めるlogメソッドが用意されています。


このlogメソッドを使えばコンソールに最低限の情報を出ます。ただもう少し詳しい情報が欲しくなるため、ログフォーマットを指定したlogback.xmlを用意しておきます。

<configuration>

  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <root level="debug">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>

もちろんpom.xmlにもlogback-classicのdependencyを追加しておいてください。

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>

あとは見るべき所にログを埋め込んでいきます。今回はfetchStudentsメソッドと、fetchScoresメソッドの2箇所に入れておきます。

Map<Student, List<Score>> map = new HashMap<>();
fetchStudents("sg")
        .log("fetchStudents")
        .subscribe(student -> {
            fetchScores(2017, student.id)
                    .log("fetchScores")
                    .subscribe(score -> map.computeIfAbsent(student, s -> new ArrayList<>()).add(score));
        });

細かな挙動を把握したければもう少し違う箇所にもログを埋め込めば良いのですが、ログが多すぎても見通しが悪くなるため、この2箇所だけにしています。


これで実行して取れたログを見てみます。 # で始まっている部分は僕がつけたコメントです。

# まずは生徒取得処理の、onSubscribeとrequestが呼ばれる。
16:00:09.593 [main] INFO  fetchStudents - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.601 [main] INFO  fetchStudents - request(unbounded)

# 生徒取得処理のonNextが呼ばれて、1人目のMutoさんの値が取得できる。
16:00:09.710 [parallel-1] INFO  fetchStudents - onNext(1 Muto)

# Mutoさんの成績取得処理のためにonSubscribeとrequestが呼ばれる。
16:00:09.712 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.712 [parallel-1] INFO  fetchScores - request(unbounded)

# 2人目のMiyoshiさんの値が取得でき、成績の取得が始まる。
16:00:09.804 [parallel-1] INFO  fetchStudents - onNext(2 Miyoshi)
16:00:09.805 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.805 [parallel-1] INFO  fetchScores - request(unbounded)

# 1人目のMutoさんの国語の成績が取れる。これは、先ほどまでとは別のスレッドで行われる。
16:00:09.816 [parallel-2] INFO  fetchScores - onNext(1 国語 80)

# 3人目のMatsuiさんの値が取得でき、成績の取得が始まる。
16:00:09.906 [parallel-1] INFO  fetchStudents - onNext(3 Matsui)
16:00:09.906 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.906 [parallel-1] INFO  fetchScores - request(unbounded)

# 2人目のMiyoshiさんの国語の成績がまた別スレッドで取得できる。
16:00:09.907 [parallel-3] INFO  fetchScores - onNext(2 国語 80)

# 1人目のMutoさんの数学の成績が、国語が取れた時と同じスレッドで取得できる。
16:00:09.918 [parallel-2] INFO  fetchScores - onNext(1 数学 90)

# 4人目のNakamotoさんの値が取得でき、成績の取得が始まる。
16:00:10.006 [parallel-1] INFO  fetchStudents - onNext(4 Nakamoto)
16:00:10.006 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:10.006 [parallel-1] INFO  fetchScores - request(unbounded)

ここまでの流れを確認すると、次のような流れになります。


1. まずmainスレッドで、生徒取得(fetchStudents)のsubscribe登録を行う

2. 生徒の値が返ってくると、parallel-1スレッドでsubscribeの中に書いた処理が行われる。

3. parallel-1スレッドで、成績取得(fetchScores)のsubscribe登録を行う

4. 成績の値が返ってくると、parallel-2からparallel-4までのスレッドでsubscribeの中に書いた処理が行われる。


ソースコードはとてもダメでしたが、処理自体はおおむね期待通りになっていることが分かりました。

ノンブロッキングかどうかみたいな話は、また後で詳しくやります。


ちなみに最後の方はこうなっていました。

16:00:11.509 [parallel-1] INFO  fetchScores - onNext(16 英語 85)
16:00:11.511 [parallel-3] INFO  fetchScores - onNext(18 国語 80)
16:00:11.512 [parallel-4] INFO  fetchScores - onNext(15 社会 93)
16:00:11.512 [parallel-3] INFO  fetchScores - onNext(14 理科 72)
16:00:11.512 [parallel-3] INFO  fetchScores - onComplete()
16:00:11.514 [parallel-2] INFO  fetchScores - onNext(17 数学 90)
16:00:11.605 [parallel-1] INFO  fetchStudents - onNext(20 Yamaide)
16:00:11.606 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
{18 Kurashima=[18 国語 80], 16 Isono=[16 国語 80, 16 数学 90, 16 英語 85], # (以降、省略)
16:00:11.606 [parallel-1] INFO  fetchScores - request(unbounded)

20人目のYamaideさんの処理を始めたところで、2000ミリ秒のsleepが終わって結果が出力された、という感じですね。


2. Reactorらしいコードを書く

上のようなコードを書いたあと、どうするのが正解なのかよく分からないなという気持ちになり @ さんに質問をしたところ、コードを添削して諸々教えてくれました。それが今回このエントリーを書くきっかけにもなったのです。


Reactorらしい修正をしたソースコードは次のURLにあります。

https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample2.java


subscribeではなくflatMapを使うが良い

Stream APIにおいて、foreachでListやMapに値を追加していくのが悪手であることは皆さんご存知だと思いますが、それはReactorのAPIでも変わりありません。subscribeの中で外部の変数に作用すると、処理の見通しが悪くなります。


今回の目的を実現するコードは、次のように修正できます。

Mono<Map<Student, List<Score>>> monoMap = fetchStudents("sg")
        .flatMap(student -> fetchScores(2017, student.id)
                .collectList()
                .map(scores -> Tuples.of(student, scores))
        )
        .collectMap(Tuple2::getT1, Tuple2::getT2);

Map<Student, List<Score>> map = monoMap.block();
System.out.println(map);

subscribeではなくflatMapを使って結果を変換します。fetchScoresで取得した Flux<Student> をcollectListで Mono<List<Student>> にして、それを一度Tuple2にするという形です。

いやこれ、自分じゃ思いつかない流れですが・・・。


その後、collectMapでTuple2からMapを作り、blockを使って待ち受けて、Monoではない通常のMapを取得しました。

これを実行すると、きちんと30人分の成績を取得することができました。


配列からFluxを作る別の方法

ところで、配列からFluxを作るところも、map/takeを使うのではなく、次のような形で書く方法を教えてもらいました。

return Flux.interval(Duration.ofMillis(100))
        .zipWith(Flux.fromArray(students))
        .map(Tuple2::getT2);

こんな書き方、チュートリアルにはなかったよ!

まぁでも、なるほどですね。


ログも見てみよう

この書き方ではどのような順で処理が行われるのか、ログを見て確認をします。

上のコードに、ログ出力部分を加えます。

Mono<Map<Student, List<Score>>> monoMap = fetchStudents("sg")
        .log("fetchStudents")
        .flatMap(student -> fetchScores(2017, student.id)
                .log("fetchScores")
                .collectList()
                .map(scores -> Tuples.of(student, scores))
        )
        .collectMap(Tuple2::getT1, Tuple2::getT2);

ログを入れるのは、やはりこの2箇所です。


出力されたログは次の通りです。

# まずは生徒取得処理の、onSubscribeとrequestが呼ばれる。
15:35:16.944 [main] INFO  fetchStudents - onSubscribe(FluxMap.MapSubscriber)
15:35:16.947 [main] INFO  fetchStudents - request(256)

# 1人目のMutoさんの処理
15:35:17.073 [parallel-1] INFO  fetchStudents - onNext(1 Muto)
15:35:17.083 [parallel-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
15:35:17.084 [parallel-1] INFO  fetchScores - request(unbounded)

# 2人目のMiyoshiさんの処理
15:35:17.170 [parallel-1] INFO  fetchStudents - onNext(2 Miyoshi)
15:35:17.170 [parallel-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
15:35:17.170 [parallel-1] INFO  fetchScores - request(unbounded)

# ここで1人目のMutoさんの国語の成績が返ってきた
15:35:17.187 [parallel-2] INFO  fetchScores - onNext(1 国語 80)

# 3人目のMatsuiさんの処理
15:35:17.270 [parallel-1] INFO  fetchStudents - onNext(3 Matsui)
15:35:17.270 [parallel-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
15:35:17.270 [parallel-1] INFO  fetchScores - request(unbounded)

# 次は2人目のMiyoshiさんの国語の成績と、1人目のMutoさんの数学の成績が返ってきた
15:35:17.273 [parallel-3] INFO  fetchScores - onNext(2 国語 80)
15:35:17.285 [parallel-2] INFO  fetchScores - onNext(1 数学 90)

# 4人目のNakamotoさんの処理
15:35:17.368 [parallel-1] INFO  fetchStudents - onNext(4 Nakamoto)
15:35:17.369 [parallel-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
15:35:17.370 [parallel-1] INFO  fetchScores - request(unbounded)

# 2人目のMiyoshiさんの数学と、3人目のMatsuiさんの国語が返ってきた
15:35:17.374 [parallel-3] INFO  fetchScores - onNext(2 数学 90)
15:35:17.374 [parallel-4] INFO  fetchScores - onNext(3 国語 80)

流れ的には元のものと同じですね。


またログの最後の部分を見ると、きちんと30人分の成績を受信しきってから、表示をしていました。

15:35:20.174 [parallel-1] INFO  fetchScores - onNext(28 社会 93)
15:35:20.174 [parallel-4] INFO  fetchScores - onComplete()
15:35:20.270 [parallel-3] INFO  fetchScores - onNext(30 英語 85)
15:35:20.270 [parallel-2] INFO  fetchScores - onNext(29 社会 93)
15:35:20.276 [parallel-1] INFO  fetchScores - onNext(28 理科 72)
15:35:20.276 [parallel-1] INFO  fetchScores - onComplete()
15:35:20.371 [parallel-2] INFO  fetchScores - onNext(29 理科 72)
15:35:20.371 [parallel-3] INFO  fetchScores - onNext(30 社会 93)
15:35:20.371 [parallel-2] INFO  fetchScores - onComplete()
15:35:20.471 [parallel-3] INFO  fetchScores - onNext(30 理科 72)
15:35:20.471 [parallel-3] INFO  fetchScores - onComplete()
{22 Shintani=[22 国語 80, 22 数学 90, 22 英語 85, 22 社会 93, 22 理科 72], # (以降、省略)

また、ログのタイムスタンプから、およそ3秒半ほどですべての処理が終わっていることが分かります。つまり、シーケンシャルに行えば18秒掛かる処理を、うまく並行させて3秒半で終わらせているのです。

この3秒半というのは、30人分の情報を取得するのに掛かる3秒 + 最後の1人の成績を取りきるのに掛かる0.5秒と、よく一致しています。

ノンブロッキングな感じで、いいじゃないですか。


3. ノンブロッキングなら、シングルスレッドでも早いのか?

ここまでの処理を見て、ノンブロッキングと非同期(マルチスレッド)の違いがよく分からなくなった方もいるかも知れません。

上で「ノンブロッキングな感じ」と書きましたが、果たしてそのおかげで早かったのか、マルチスレッドの恩恵で早くなったのか、分かりにくいところがあります。


では、シングルスレッドにしてみればどうなるでしょうか。見てみましょう。

シングルスレッドにしたソースコードは次のURLから取得できます。

https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample3.java


publishOnでスレッドを制御

処理をシングルスレッドにする場合は、Fluxを生成している所でpublishOnメソッドを用いて、スレッドの作成ポリシーを決めます。ここではSchedulers.single() を用いてシングルスレッドで実行することにします。

return Flux.interval(Duration.ofMillis(100))
        .publishOn(Schedulers.single())
        .zipWith(Flux.fromArray(students))
        .map(Tuple2::getT2);

publishOn(Schedulers.single()) の処理を挟んだだけですね。

これをfetchStudents、fetshScoresのそれぞれで行います。


実行してみた

それではシングルスレッドになっているか、実行してログを確認してみましょう。

16:21:49.073 [main] INFO  fetchStudents - onSubscribe(FluxMap.MapSubscriber)
16:21:49.080 [main] INFO  fetchStudents - request(256)
16:21:49.197 [single-1] INFO  fetchStudents - onNext(1 Muto)
16:21:49.202 [single-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
16:21:49.202 [single-1] INFO  fetchScores - request(unbounded)
16:21:49.296 [single-1] INFO  fetchStudents - onNext(2 Miyoshi)
# (省略)
16:21:52.501 [single-1] INFO  fetchScores - onNext(29 理科 72)
16:21:52.501 [single-1] INFO  fetchScores - onComplete()
16:21:52.501 [single-1] INFO  fetchScores - onNext(30 社会 93)
16:21:52.603 [single-1] INFO  fetchScores - onNext(30 理科 72)
16:21:52.604 [single-1] INFO  fetchScores - onComplete()
{6 Horiuchi=[6 国語 80, 6 数学 90, 6 英語 85, 6 社会 93, 6 理科 72], # (以降、省略)

一部抜粋ですが、すべての処理が「single-1」スレッドで行われていました。


シングルスレッドで行われていたにも関わらず、処理は3.5秒程度で完了しています。これで「100ミリ秒待つ」というのがノンブロッキングで行われていることが、なんとなく実感できました。


4. ブロッキング処理の場合はどうなる?

さて、ここまでは「ノンブロッキング処理は早くていいね、18秒掛かる処理が3.5秒になったよ」みたいな話でした。

しかし、もしRDBMSのような、ブロッキング処理のあるデータソースから情報を取得しなくてはいけなくなった場合は、どうなるのでしょうか。それを模したコードを書いて検証してみます。


ブロッキング処理にしたソースコードは、ここにあります。

https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample4.java


Thread.sleepはブロッキング処理

ブロッキング処理を行うためには、Fluxを作成する処理の中で、Thread.sleepを行うのが良いです。

生徒や成績のFluxを行う処理を、次のように変更します。

Flux<Student> studentFlux = Flux.create(sink -> {
    for (Student student : students) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        sink.next(student);
    }
    sink.complete();
});
return studentFlux;

Flux.createメソッドで、sinkに値を入れていれていく形です。ここでThread.sleepを使うことで(先程までのFlux.intervalと異なり)このFluxの作成処理がブロッキング処理となるんです。ほぅ。


実行してログで確認

これまでの流れ通り、実行してログを確認してみましょう。

# mainスレッドでfetchStudentsの呼び出し
16:38:37.457 [main] INFO  fetchStudents - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:37.461 [main] INFO  fetchStudents - request(256)

# 1人目のMutoさんの処理をmainメソッドで行い、成績取得もすべてmainメソッド
16:38:37.570 [main] INFO  fetchStudents - onNext(1 Muto)
16:38:37.574 [main] INFO  fetchScores - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:37.575 [main] INFO  fetchScores - request(unbounded)
16:38:37.677 [main] INFO  fetchScores - onNext(1 国語 80)
16:38:37.781 [main] INFO  fetchScores - onNext(1 数学 90)
16:38:37.882 [main] INFO  fetchScores - onNext(1 英語 85)
16:38:37.983 [main] INFO  fetchScores - onNext(1 社会 93)
16:38:38.085 [main] INFO  fetchScores - onNext(1 理科 72)
16:38:38.087 [main] INFO  fetchScores - onComplete()
16:38:38.087 [main] INFO  fetchStudents - request(1)

# 2人目のMiyoshiさんの処理もとにかくmainメソッド
16:38:38.193 [main] INFO  fetchStudents - onNext(2 Miyoshi)
16:38:38.195 [main] INFO  fetchScores - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:38.195 [main] INFO  fetchScores - request(unbounded)
16:38:38.296 [main] INFO  fetchScores - onNext(2 国語 80)
16:38:38.401 [main] INFO  fetchScores - onNext(2 数学 90)
16:38:38.503 [main] INFO  fetchScores - onNext(2 英語 85)
16:38:38.606 [main] INFO  fetchScores - onNext(2 社会 93)
16:38:38.709 [main] INFO  fetchScores - onNext(2 理科 72)
16:38:38.709 [main] INFO  fetchScores - onComplete()
16:38:38.709 [main] INFO  fetchStudents - request(1)

# 3人目以降も同様
16:38:38.812 [main] INFO  fetchStudents - onNext(3 Matsui)
# (省略)
16:38:54.864 [main] INFO  fetchStudents - request(1)
16:38:54.967 [main] INFO  fetchStudents - onNext(29 Tanaka)
16:38:54.968 [main] INFO  fetchScores - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:54.968 [main] INFO  fetchScores - request(unbounded)
16:38:55.072 [main] INFO  fetchScores - onNext(29 国語 80)
16:38:55.176 [main] INFO  fetchScores - onNext(29 数学 90)
16:38:55.281 [main] INFO  fetchScores - onNext(29 英語 85)
16:38:55.386 [main] INFO  fetchScores - onNext(29 社会 93)
16:38:55.488 [main] INFO  fetchScores - onNext(29 理科 72)
16:38:55.488 [main] INFO  fetchScores - onComplete()
16:38:55.488 [main] INFO  fetchStudents - request(1)
16:38:55.592 [main] INFO  fetchStudents - onNext(30 Yagi)
16:38:55.592 [main] INFO  fetchScores - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:55.592 [main] INFO  fetchScores - request(unbounded)
16:38:55.697 [main] INFO  fetchScores - onNext(30 国語 80)
16:38:55.798 [main] INFO  fetchScores - onNext(30 数学 90)
16:38:55.902 [main] INFO  fetchScores - onNext(30 英語 85)
16:38:56.004 [main] INFO  fetchScores - onNext(30 社会 93)
16:38:56.110 [main] INFO  fetchScores - onNext(30 理科 72)
16:38:56.110 [main] INFO  fetchScores - onComplete()
16:38:56.110 [main] INFO  fetchStudents - request(1)
16:38:56.110 [main] INFO  fetchStudents - onComplete()
{5 Iida=[5 国語 80, 5 数学 90, 5 英語 85, 5 社会 93, 5 理科 72], # (以降、省略)

すべての処理がmainスレッドで行われており、時間も18.5秒掛かっています。これはすべての処理がシーケンシャルに行われれば18秒である、という計算と一致します。

いかにReactorを使っていようとも、途中にRDBMSへのJDBCドライバー経由でのアクセスなど、ブロッキング処理が入るとこのようになってしまうのです。


5. ブロッキング処理も、マルチスレッドで高速化

ブロッキング処理を使うと、Reactorを使う意味がないのでしょうか、というとそういうわけでもありません。

Reactorを使って、ブロッキング処理であっても、マルチスレッド処理を書くことができます。


ブロッキング処理をマルチスレッド化したソースコードは、次の場所にあります。

https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample5.java


subscribeOnでスレッドを制御

先の章ではFluxを生成する際にpublishOnメソッドを用いてスレッドの制御をしましたが、それと同様に、subscibeする側でsubscribeOnメソッドを用いることでも、スレッドを制御することができるようになります。シングルスレッドで生成されたものを、マルチスレッドで分担して処理するという形になります。


次のように、fetchStudents、fetchScoresの直後にsubscribeOnを渡します。ここで渡しているSchedulers.elastic()は、必要なだけスレッドを起こすというものです。

Mono<Map<Student, List<Score>>> monoMap = fetchStudents("sg")
        .subscribeOn(Schedulers.elastic())
        .log("fetchStudents")
        .flatMap(student -> fetchScores(2017, student.id)
                .subscribeOn(Schedulers.elastic())
                .log("fetchScores")
                .collectList()
                .map(scores -> Tuples.of(student, scores))
        )
        .collectMap(Tuple2::getT1, Tuple2::getT2);

Map<Student, List<Score>> map = monoMap.block();
System.out.println(map);

これだけでマルチスレッド化ができます。元の流れと大きく変わっていないところがポイントですね。


それでは、実行してログを見てみましょう。

17:35:03.769 [main] INFO  fetchStudents - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:35:03.773 [main] INFO  fetchStudents - request(256)
17:35:03.883 [elastic-2] INFO  fetchStudents - onNext(1 Muto)
17:35:03.897 [elastic-2] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:35:03.898 [elastic-2] INFO  fetchScores - request(unbounded)
17:35:04.001 [elastic-2] INFO  fetchStudents - onNext(2 Miyoshi)
17:35:04.001 [elastic-3] INFO  fetchScores - onNext(1 国語 80)
17:35:04.001 [elastic-2] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:35:04.001 [elastic-2] INFO  fetchScores - request(unbounded)
17:35:04.103 [elastic-3] INFO  fetchScores - onNext(1 数学 90)
17:35:04.103 [elastic-2] INFO  fetchStudents - onNext(3 Matsui)
17:35:04.103 [elastic-2] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:35:04.103 [elastic-2] INFO  fetchScores - request(unbounded)
17:35:04.103 [elastic-4] INFO  fetchScores - onNext(2 国語 80)
17:35:04.204 [elastic-3] INFO  fetchScores - onNext(1 英語 85)
17:35:04.204 [elastic-4] INFO  fetchScores - onNext(2 数学 90)
17:35:04.204 [elastic-2] INFO  fetchStudents - onNext(4 Nakamoto)
17:35:04.205 [elastic-5] INFO  fetchScores - onNext(3 国語 80)
# (省略)
17:35:07.131 [elastic-5] INFO  fetchScores - onComplete()
17:35:07.136 [elastic-6] INFO  fetchScores - onNext(28 社会 93)
17:35:07.136 [elastic-7] INFO  fetchScores - onNext(29 英語 85)
17:35:07.137 [elastic-8] INFO  fetchScores - onNext(30 数学 90)
17:35:07.241 [elastic-7] INFO  fetchScores - onNext(29 社会 93)
17:35:07.241 [elastic-6] INFO  fetchScores - onNext(28 理科 72)
17:35:07.241 [elastic-8] INFO  fetchScores - onNext(30 英語 85)
17:35:07.242 [elastic-6] INFO  fetchScores - onComplete()
17:35:07.344 [elastic-8] INFO  fetchScores - onNext(30 社会 93)
17:35:07.344 [elastic-7] INFO  fetchScores - onNext(29 理科 72)
17:35:07.345 [elastic-7] INFO  fetchScores - onComplete()
17:35:07.449 [elastic-8] INFO  fetchScores - onNext(30 理科 72)
17:35:07.450 [elastic-8] INFO  fetchScores - onComplete()
{30 Yagi=[30 国語 80, 30 数学 90, 30 英語 85, 30 社会 93, 30 理科 72], # (以降、省略)

詳細な説明は割愛しますが、ノンブロッキングで行っていたときと同じような処理の流れとなりました。処理は3.7秒で、ノンブロッキングのときと大差はありません。また、このログで見えているだけでも「elastic-8」まであり、8スレッド使っていることが分かります。


elastic vs parallel

上の例では8スレッド使っていましたが、Fluxを生成する際のsleep時間などを少し調整すると、優に数十スレッドを使ってしまいました。これはこれで、スレッドを使いすぎる問題が起きかねません。


もう少し加減してスレッドを使って欲しい場合には、Schedulers.elastic()ではなく、Schedulers.parallel()を使います。こちらはスレッド数をCPU数分までに制限します(ただし最低は4)

Mono<Map<Student, List<Score>>> monoMap = fetchStudents("sg")
        .subscribeOn(Schedulers.parallel())
        .log("fetchStudents")
        .flatMap(student -> fetchScores(2017, student.id)
                .subscribeOn(Schedulers.parallel())
                .log("fetchScores")
                .collectList()
                .map(scores -> Tuples.of(student, scores))
        )
        .collectMap(Tuple2::getT1, Tuple2::getT2);

Map<Student, List<Score>> map = monoMap.block();
System.out.println(map);

これを実行した結果、こうなりました。

17:40:42.309 [main] INFO  fetchStudents - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:40:42.312 [main] INFO  fetchStudents - request(256)
17:40:42.427 [parallel-1] INFO  fetchStudents - onNext(1 Muto)
17:40:42.438 [parallel-1] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:40:42.438 [parallel-1] INFO  fetchScores - request(unbounded)
17:40:42.542 [parallel-1] INFO  fetchStudents - onNext(2 Miyoshi)
17:40:42.542 [parallel-2] INFO  fetchScores - onNext(1 国語 80)
17:40:42.542 [parallel-1] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:40:42.542 [parallel-1] INFO  fetchScores - request(unbounded)
17:40:42.646 [parallel-2] INFO  fetchScores - onNext(1 数学 90)
17:40:42.646 [parallel-3] INFO  fetchScores - onNext(2 国語 80)
17:40:42.646 [parallel-1] INFO  fetchStudents - onNext(3 Matsui)
17:40:42.648 [parallel-1] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:40:42.648 [parallel-1] INFO  fetchScores - request(unbounded)
17:40:42.751 [parallel-2] INFO  fetchScores - onNext(1 英語 85)
17:40:42.751 [parallel-3] INFO  fetchScores - onNext(2 数学 90)
17:40:42.751 [parallel-1] INFO  fetchStudents - onNext(4 Nakamoto)
17:40:42.751 [parallel-4] INFO  fetchScores - onNext(3 国語 80)
# (省略)
17:40:47.023 [parallel-1] INFO  fetchScores - onComplete()
17:40:47.125 [parallel-1] INFO  fetchScores - onNext(16 国語 80)
17:40:47.228 [parallel-1] INFO  fetchScores - onNext(16 数学 90)
17:40:47.333 [parallel-1] INFO  fetchScores - onNext(16 英語 85)
17:40:47.437 [parallel-1] INFO  fetchScores - onNext(16 社会 93)
17:40:47.542 [parallel-1] INFO  fetchScores - onNext(16 理科 72)
17:40:47.543 [parallel-1] INFO  fetchScores - onComplete()
17:40:47.648 [parallel-1] INFO  fetchScores - onNext(20 国語 80)
17:40:47.750 [parallel-1] INFO  fetchScores - onNext(20 数学 90)
17:40:47.853 [parallel-1] INFO  fetchScores - onNext(20 英語 85)
17:40:47.955 [parallel-1] INFO  fetchScores - onNext(20 社会 93)
17:40:48.060 [parallel-1] INFO  fetchScores - onNext(20 理科 72)
17:40:48.060 [parallel-1] INFO  fetchScores - onComplete()
17:40:48.163 [parallel-1] INFO  fetchScores - onNext(24 国語 80)
17:40:48.264 [parallel-1] INFO  fetchScores - onNext(24 数学 90)
17:40:48.365 [parallel-1] INFO  fetchScores - onNext(24 英語 85)
17:40:48.468 [parallel-1] INFO  fetchScores - onNext(24 社会 93)
17:40:48.569 [parallel-1] INFO  fetchScores - onNext(24 理科 72)
17:40:48.569 [parallel-1] INFO  fetchScores - onComplete()
17:40:48.671 [parallel-1] INFO  fetchScores - onNext(28 国語 80)
17:40:48.775 [parallel-1] INFO  fetchScores - onNext(28 数学 90)
17:40:48.879 [parallel-1] INFO  fetchScores - onNext(28 英語 85)
17:40:48.985 [parallel-1] INFO  fetchScores - onNext(28 社会 93)
17:40:49.087 [parallel-1] INFO  fetchScores - onNext(28 理科 72)
17:40:49.087 [parallel-1] INFO  fetchScores - onComplete()
{9 Mizuno=[9 国語 80, 9 数学 90, 9 英語 85, 9 社会 93, 9 理科 72], 1 # (以降、省略)

スレッド名が parallel-1 から parallel-4 までの4スレッドになりました。4スレッド使ってできる範囲で処理を分担し、最後は残った処理を1スレッドで片付けている、という形です。処理全体には7秒弱掛かっており、スレッド生成し放題のelasticに比べれば2倍近く時間が掛かっていますが、単一スレッドで行うよりもレスポンスは半分以下に短縮されています。


もちろんこの処理はReactorを使わずともJava標準のExecutorServiceなり何なりを使っても同じことができます。ただ、ノンブロッキングな処理も扱えるReactorと、統一した書き方ができるところがメリットの一つになると思います。いや、Reactorでブロッキング処理を扱うようなものを書けること自体、おまけみたいなものかも知れませんが。


ちなみにelasticやparallel以外にも、ExecutorServiceを渡すなどすることもできますが、shutdownのタイミングなど考えると管理が面倒なので、普通にelasticかparallelを使っておくのが良いと私は思っています。


まとめ

Reactorを使って、ノンブロッキング処理、ブロッキング処理を、それぞれどのように扱うかを確認しました。


特にデータアクセスや、マイクロサービス呼び出しのような待ち時間の多いシチュエーションにおいて、うまくノンブロッキング処理にできれば、レスポンスタイムを短縮できるんじゃないかなと思います。

もちろん、その分データソース側に負荷が掛かるため、データソース側が十分にスケールアウトできることが前提となりますが、現代であればそういう環境は手に入りやすいため、取り組む価値があると見ています。


今回のケースがReactorの最適なユースケースだというわけではないですが、業務において、このような使い方もできるという一例として、参考にしてもらえばと思います!

Enjoy, Reactor!

2015-12-24

[][]JMXで情報を取得する時のベンチマーク

JMHを使って、JMX経由でMBeanの情報を取る際のパフォーマンスを測定してみた。

ベンチマークソースコードはこちら。

https://github.com/cero-t/Benchmarks/blob/master/src/main/java/ninja/cero/benchmark/JmxBenchmark.java


ベンチマーク環境はMacBook Pro Late 2013 (Core i5 2.4GHz) で、

他のアプリなども立ち上げっぱなしの環境なのでノイズは多めでだけど、

傾向を見たいだけなのであまり気にせず。


VirtualMachine.attachのパフォーマンス

VMオンデマンドアタッチする際のパフォーマンス。


No1 : VirtualMachine.attacheしてからシステムプロパティを取ってdetachする

No2 : キャッシュしていたVirtualMachineを使ってシステムプロパティを取得する

@Benchmark
public void no1_vmAttach() throws Exception {
    VirtualMachine vm = VirtualMachine.attach(PID);
    vm.getSystemProperties();
    vm.detach();
}

@Benchmark
public void no2_vmCachedGetProperties() throws Exception {
    vm.getSystemProperties();
}

結果

BenchmarkModeCntScoreErrorUnits
JmxBenchmark.no1_vmAttachthrpt10653.834± 108.790ops/s
JmxBenchmark.no2_vmCachedGetPropertiesthrpt102330.529± 270.268ops/s

アタッチありは1.5msec程度、キャッシュした場合は0.4msec程度。

ということで、アタッチに掛かる時間は1msec程度と推定。割とでかい。


JMXConnectorFactory.connectのパフォーマンス

続いて、VMに対するJMX接続を行う際のパフォーマンス。


No3 : JMXConnectorの取得処理と、クローズ処理をする

No4 : キャッシュしていたJMXConnectorを使って、MBeanServerConnectionの取得とMbean情報を取得する

public JMXConnector no3_vmCachedGetConnector() throws Exception {
    String connectorAddress = vm.getAgentProperties().getProperty("com.sun.management.jmxremote.localConnectorAddress");

    if (connectorAddress == null) {
        String agent = vm.getSystemProperties().getProperty("java.home") + File.separator + "lib" + File.separator + "management-agent.jar";
        vm.loadAgent(agent);
        connectorAddress = vm.getAgentProperties().getProperty("com.sun.management.jmxremote.localConnectorAddress");
    }

    JMXServiceURL serviceURL = new JMXServiceURL(connectorAddress);
    JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceURL);
    jmxConnector.close();
}

@Benchmark
public void no4_connectorCachedGetMBeanCount() throws Exception {
    MBeanServerConnection connection = connector.getMBeanServerConnection();
    connection.getMBeanCount();
}

結果

BenchmarkModeCntScoreErrorUnits
JmxBenchmark.no3_vmCachedGetConnectorthrpt10612.652± 95.547ops/s
JmxBenchmark.no4_connectorCachedGetMBeanCountthrpt109047.803± 1480.741ops/s

JMXの接続と切断は1.6msec程度。おおまかVMに対するアタッチと同じぐらい。

接続したあとの、MBeanServerへの接続とMBean情報取得は0.1msecぐらいで、これは無視できる小さい。


ThreadMXBeanからThreadCountを取るパフォーマンス

今回の主目的はこれ。

ThreadMXBeanを使って情報を取るのと、

MBeanServerConnection.getAttributeで名前を指定して情報を取るのと、どっちが早いか。


No5 : MBeanServerConnection.getAttributeの名前指定でThreadCountを取得する

No6 : ThreadMXBeanを取得してから、getThreadCountで取得する

No7 : キャッシュしていたThreadMXBeanから、getThreadCountで取得する

@Benchmark
public void no5_connectorCachedThreadCount() throws Exception {
    MBeanServerConnection connection = connector.getMBeanServerConnection();
    Object count = connection.getAttribute(new ObjectName(ManagementFactory.THREAD_MXBEAN_NAME), "ThreadCount");
    sum += (Integer) count;
} 

@Benchmark
public void no6_connectorCachedGetThreadCount() throws Exception {
    MBeanServerConnection connection = connector.getMBeanServerConnection();
    ThreadMXBean threadBean = ManagementFactory.newPlatformMXBeanProxy(
            connection, ManagementFactory.THREAD_MXBEAN_NAME, ThreadMXBean.class);
    sum += threadBean.getThreadCount();
}

@Benchmark
public void no7_beanCachedGetThreadCount() throws Exception {
    sum += threadMXBean.getThreadCount();
}

結果

BenchmarkModeCntScoreErrorUnits
JmxBenchmark.no5_connectorCachedThreadCountthrpt108199.887± 1269.164ops/s
JmxBenchmark.no6_connectorCachedGetThreadCountthrpt102662.147± 585.627ops/s
JmxBenchmark.no7_beanCachedGetThreadCountthrpt108148.967± 1705.518ops/s

ThreadMXBeanを毎回取る(No6)は明らかにパフォーマンスが悪いけど、

ThreadMXBeanをキャッシュしている限りは、ThreadMXBeanから情報を取るのと、

MBeanServerConnection.getAttributeで取ることに性能差はなし。


まとめ

1. VirtualMachineへのattach/detachは時間が掛かるので、キャッシュすべき

2. JMX接続の確立/切断は時間が掛かるので、キャッシュすべき

3. MBeanServerへの接続は時間が掛からないので、無理にキャッシュしなくてよい(close処理もないのでリソース管理もしてない?)

4. MBeanServerConnection.getAttributeでもThreadMXBeanを使っても性能差はないので、ThreadMXBeanを無理にキャッシュしなくてよい


ベンチマークソースコード

https://github.com/cero-t/Benchmarks/blob/master/src/main/java/ninja/cero/benchmark/JmxBenchmark.java


現場からは以上です。

2013-11-07

[]なぜ僕はCaliperではなくJMHを選んだのか。

今日、会社のblogのほうにJMHのエントリーを書きました。

そう、今日のテーマはマイクロベンチマークです。

Javaのマイクロベンチマークツール「JMH」 - Taste of Tech Topics

Javaのマイクロベンチマークに興味がある人は、

GoogleのCaliperというマイクロベンチマークツールを既にご存知かも知れません。

しかし上記のエントリーではCaliperには全く触れていません。


なぜか。

いや、Caliperを試そうとしたら、うまく動かせなかったんですよ (^^;


そんな、まともに動かせない所や、ドキュメントの更新のないCaliperに幻滅した後

JMHを試してみたら、思いのほか素直で使いやすかったため上で紹介するに至りました。


ちょっとここでは、その辺りの舞台裏を紹介してみます。


Caliperをはじめてみようとしたけど・・・

Caliperのトップページには、Get started的な始め方が記載されています。

http://code.google.com/p/caliper/


Mavenプロジェクトを作って、pom.xmlに追記して、

トップページにあるサンプルソースコードをコピーして・・・


あれ、えっと・・・?


一体、どうやって実行するのでしょう? まさか実行の仕方を調べるためだけに、

30分近いチュートリアルビデオを見なければいけないのでしょうか?

さすがにそれはありえないので、ドキュメントを見てみましょう。


Caliperはドキュメントを更新する気がないのかね。

Google Codeのドキュメントと言えば、Wiki。

https://code.google.com/p/caliper/w/list

早速、このWikiを見てみると・・・あぁ、UsersGuideというのがあるようです。


http://code.google.com/p/caliper/wiki/UsersGuide

何これ、目次だけでまだドキュメントが全然ありません。

QuickStartもないので、始めようがありませんでした。


この点、JMHも同じくドキュメントはほとんどありませんが

少なくともトップページで動かせるまでは面倒を見てくれるので

Caliperよりも親切だと言えます。


Code Samplesのコンパイルが通らない

トップページにあるサンプルだけではきっと動かないのだと思い、

リンクされてる「Very short tutorial examples.」を試してみることにしました。

https://code.google.com/p/caliper/source/browse/tutorial/Tutorial.java


うん、コンパイルが通らない (^^;


少し調べてみると、どうやらCaliperはバージョン1.0から大幅にAPIが変わって

アノテーションベースになるらしく、リンク先のサンプルはそれに合った内容となっています。


しかしいまMavenリポジトリにあるものは、命名規則ベースの古いバージョンで、

setUpやtimeXxxという名前のメソッドを探して実行するという、JUnit3のような思想になっています。


なんかこんな風に、複数のバージョンの情報が混在していて

きちんと情報が整理されていない辺りに、心が折れ始めました。


Caliperを動かしたら、いきなり30秒以上かかって死んだ。

その後もうしばらく調べてみて、どうやらCaliperMainクラスのmainメソッドから

ベンチマークを実行できることが分かりました。


ということで自分で簡単なベンチマークを書いてみたうえで

オプションなどつけずに実行してみました。


順調に進んでるかな・・・

Experiment selection: 
  Instruments:   [allocation, micro]
  User parameters:   {}
  Virtual machines:  [default]
  Selection type:    Full cartesian product

This selection yields 4 experiments.
Starting experiment 1 of 4: {instrument=allocation, method=WithInitialSize, vm=default, parameters={}}
Complete!
Starting experiment 2 of 4: {instrument=allocation, method=WithoutInitialSize, vm=default, parameters={}}
Complete!
Starting experiment 3 of 4: {instrument=micro, method=WithInitialSize, vm=default, parameters={}}
ERROR: Trial failed to complete (its results will not be included in the run):
  Trial exceeded the total allowable runtime (30s). The limit may be adjusted using the --time-limit flag.

・・・と思ったら

なんか30秒以上実行したせいでエラーとなりました。


引数を指定すれば30秒制限を突破できるようですが、

そもそも何を何回実行しようとしてエラーになったのか、さっぱり分からないので

30秒制限をオフにして良いのかどうかも分かりません。

だいぶ心が折れてきました。


Caliperでテスト終わったら、勝手に結果がアップロードされた。

先ほどのテストは、まだ続きがあります。

片方は30秒制限を受けましたが、もう一方は何やらきちんと終了したようです。

Starting experiment 4 of 4: {instrument=micro, method=WithoutInitialSize, vm=default, parameters={}}
Complete!

Execution complete: 1.151m.
Collected 45 measurements from:
  2 instrument(s)
  1 virtual machine(s)
  2 benchmark(s)
Results have been uploaded. View them at: https://microbenchmarks.appspot.com/runs/d94be1c2-8be0-48e7-8481-4492145eecd5

Process finished with exit code 0

えっと、結果は・・・1分と少々掛かって、えっと・・・

って、何!?

ベンチマークの結果がアップロードされたって?


そうなんです、普通に実行しただけなのに、ベンチマークの結果がGAEにアップロードされました。


恐らくオプションなどを指定してオフライン実行などもできるでしょうし、

こうやってアップロードされることで、より詳しく分析などもできるなど

嬉しいこともたくさんあるのだと思います。


しかしながら、簡単にマイクロベンチマークを取りたいという僕の想いとは

あまりに掛け離れた思想をしているCaliperに愛想がつきて、心は完全に折れました。


それに比べて、JMHは素直です。

まずは @GenerateMicroBenchmark アノテーションさえつければ動きますし、

オプションなど指定しなくともきちんと動き、分かりやすい結果を出してくれます。


少なくともいまこの時点で始めるのであれば、

JMHの方が使いやすいと言えるのではないのかな、と思います。


批判、コメント、アドバイス、どしどしお待ちしています!

2013-07-12

[]定義的プログラミング

定義的プログラミングっていう言葉を思いついたから、メモしとく。

イメージ的にはこんなん。


Before: 普通の処理コード

public Dto process(Map<String, String> input) {
	Dto dto = new Dto();
	dto.userName = input.get("ユーザ名");
	dto.password = input.get("パスワード");

	ValidateUtils.checkRequired(dto.userName);
	ValidateUtils.checkLength(dto.userName, 0, 24);
	ValidateUtils.checkRequired(dto.password);
	ValidateUtils.checkLength(dto.password, 8, 24);
	ValidateUtils.checkAlphanumericSymbol(dto.password);

After: 定義的プログラミング

static {
  define("ユーザ名", "userName", REQUIRED, ALPHANUMERIC);
  define("パスワード", "password", NOT_REQUIRED, ALPHANUMERIC_SYMBOL);
}

public static void define(String columnName, String propertyName, FieldType... types) {
	nameMap.put(columnName, propertyName);
	typeMap.put(columnName, types);
}

public T process(Map<String, String> input, Class<T> clazz) {
	// がんばる感じのコード
}

仕様書に載っている表をそのままコードに落とすようなイメージなので

バグが減らせるし、仕様変更にも対応しやすい。

状態遷移のような、複雑な処理ほど効果的だと思う。


自分で実装する時はよくこんな感じにするのだけど、

なかなか後輩には伝えにくかった。

名前をつけておけば伝えやすくなるかなと思って、メモしといた。

2013-06-29

[]JJUGナイトセミナー「from old Java to modern Java

先日のJJUGナイトセミナーで

「from old Java to modern Java」という話をしてきました。

公式サイト : http://www.java-users.jp/?p=551

togetter : http://togetter.com/li/521481


Java以前のCライクな書き方から、J2SE 1.4、5.0・・・からJava SE 8まで

歴史を辿りながらソースコードの変遷を紹介しました。

このスライドは、日経ソフトウエア 2013年7月号に執筆させて頂いた

特集記事「そのコードは古い」のJava編のアイデアを活用したものです。

http://itpro.nikkeibp.co.jp/article/MAG/20130617/485621/

雑誌記事のほうでは、文法が変化した背景やメリットなどにも触れていますので

スライドだけでは物足りない方は、ぜひ雑誌を手にとって頂ければと思います。


また、SlideShareに公開したスライドは、たくさんの方に見ていただけたようで

はてブのホッテントリや、SlideShareのHot on Twitterに掲載され、閲覧数も1万回を超えました。

つぶやいてくださった皆さん、ありがとうございます!


そのおかげで、Javaから遠ざかっていた方や、Java以外の言語を使っている方、

ナイトセミナーにはいらっしゃらない方々にも

Javaのイマを垣間見てもらうことができたのは、良かったかなと思います。


あと、スライドの最後に to be continued... とか入れてしまっているので

もしかしたら、どこかで続編をやるかも知れませんね (^^;