谷本 心 in せろ部屋 このページをアンテナに追加 RSSフィード

2017-12-15

[]ReactorでN+1問題な処理を実装してみた話

最近、格ゲーツイートが増えてる @ です。前のエントリーに書いた「18年ぶりに出る続編」のβテストがついに始まりまして、最近は夜な夜なコンボをやるなどしています。

シビアな反応が要求される格闘ゲームにおいて、継続して勝ち続けるためにはどうしても反射神経が必要となり、機械のような反射神経、つまり「反応装置」にならなくてはいけません、そうだから今日のテーマは「Reactor」なのです、、、みたいな流れを考えたんですが、どうにも苦しいですよね。ろくにスベることもできない中ですが、Javaアドベントカレンダー15日目が始まりました。

https://qiita.com/advent-calendar/2017/java


さて、Project ReactorはReactiveなノンブロッキング処理を書くためのライブラリです。最近はSpring 5.0などでも全面的に利用されているため話題になりがちです。今日はこれを勉強しながら、業務にありそうなケースを実装してみます。

なお、同期とか非同期とかブロッキングとかノンブロッキングとかReactiveとかの言葉の定義は、この際、置いておきます。実装を見て、雰囲気で掴んでください。


目次

0. はじめに

1. ノンブロッキング処理を体感する

2. Reactorらしいコードを書く

3. ノンブロッキングなら、シングルスレッドでも早いのか?

4. ブロッキング処理の場合はどうなる?

5. ブロッキング処理も、マルチスレッドで高速化


0〜2までは、Reactor初心者が苦慮してコードを書いていった話。

2〜5は、ノンブロッキング処理やブロッキング処理を、それぞれシングルスレッド、マルチスレッドで試してみた話です。

長いエントリーなので、興味がある部分を拾い読みしてもらえればと思います。


0. はじめに

まずは事前の準備や、目的などを説明します。


事前にやること

Reactorについて完全に素人だったので、まずは入門のスライドを読んでから、ハンズオン(チュートリアル)をやりました。


Spring 5に備えるリアクティブプログラミング入門

https://www.slideshare.net/TakuyaIwatsuka/spring-5


Reactive Webアプリケーション - そしてSpring 5へ #jjug_ccc #ccc_ef3

https://www.slideshare.net/makingx/reactive-web-spring-5-jjugccc-cccef3


Lite Rx API Hands-on(チュートリアル

https://github.com/reactor/lite-rx-api-hands-on/


なんとなく、Java5世代の非同期脳で、MonoがFutureのようなもの、FluxがList版のFutureのようなもの、という理解をしました。実際にはJava8で追加されたCompletableFutureのようなもののようですが、それはおいとくとして。

とりあえずこれらでReactorの概要をざっくり掴んだのですが、とにかく機能が多すぎて、全く覚えきれません。Stream APIをもう一度学びなおしてるような気持ちです。


今回の目的

今回は、いわゆる「N+1問題」をReactorで実装するとどうなるか、という検証をしてみます。

テーブルAから一覧データをN件持ってきて、次にそれに関連するデータをテーブルBから持ってくるという処理において、クエリがN+1回発生してしまうせいで遅い、というアレです。SQLを工夫すればクエリ1回で済むやろっていうコメントは、本題からズレるので★1です!


今回は「生徒一覧」を取得したうえで、生徒の「成績一覧」を検索することを想定します。

また検索対象は、RDBMSのようなブロッキング処理しかできないデータソースの場合と、何かしらイイ感じのノンブロッキングなデータソースの場合とを、それぞれ想定して比較します。


コードで言うと、次のようなイメージです。

生徒一覧の取得
Flux<Student> fetchStudents(String className)

点数一覧の取得
Flux<Score> fetchScores(int year, int id)

取り出したい形
Map<Student, List<Score>>

生徒の一覧を取り、それぞれの生徒の(2017年の)成績を取り、それをMono/FluxではなくMapやListの形にして返すというものです。

なぜわざわざMapに変換するんだ、なぜここでブロックしてしまうんだ、というツッコミを受けそうですが、あくまでも「全ての結果が揃ってから返す」けど、中の処理を並列にすることで、レスポンスを早くしたいというケースを想定しました。課題設定に口出し無用。


1. ノンブロッキング処理を体感する

まずは手探りでノンブロッキングなコードを書く所までをやります。

この章で紹介するソースコードは次のURLにあります。

https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample1.java


配列Fluxにする

まずは生徒一覧取得メソッドの実装として、生徒の配列からFluxを作って返します。

モックなので固定値を返してしまうのですが、検索を模しているので、1データ取得するのに100ミリ秒掛かるという想定にしました。こんな感じです。

Flux<Student> fetchStudents(String className) {
    Student[] students = {
            new Student(1, "Muto"),
            new Student(2, "Miyoshi"),
            new Student(3, "Matsui"),
            // 略
            new Student(28, "Mori"),
            new Student(29, "Tanaka"),
            new Student(30, "Yagi"),
    };

    return Flux.interval(Duration.ofMillis(100))
            .map(i -> students[i.intValue()])
            .take(students.length);
}

intervalメソッドで100ミリ秒おきに、mapメソッド配列から生徒を取り出して、takeメソッド配列分だけ生徒を取得したら終える、という実装です。

30人いるので、シーケンシャルに行えば、3000ミリ秒、つまり3秒掛かる処理ですね。


点数の一覧を取得するところも、実装の内容はほぼ同じです。

Flux<Score> fetchScores(int year, int id) {
    final Score[] scores = {
            new Score(id, "国語", 80),
            new Score(id, "数学", 90),
            new Score(id, "英語", 85),
            new Score(id, "社会", 93),
            new Score(id, "理科", 72)
    };

    return Flux.interval(Duration.ofMillis(100))
            .map(i -> scores[i.intValue()])
            .take(scores.length);
}

これもシーケンシャルに行えば、5科目 * 100ミリ秒で、500ミリ秒掛かる処理です。30人分あるので、15秒掛かる計算になります。

つまり生徒一覧と成績一覧の取得処理をすべてシーケンシャルに行うと、18秒掛かることになります。それをもっとうまく並列に処理したいというのが今回のテーマです。


生徒の結果が返ってき次第、成績を取得する

ここまでに書いた2つのメソッドを使って、生徒の一覧を取得して、それぞれの生徒ごとに成績の一覧を取得し、それをMapに変換するような処理を書いてみます。

Map<Student, List<Score>> map = new HashMap<>();
fetchStudents("sg")
        .subscribe(student -> {
            fetchScores(2017, student.id)
                    .subscribe(score -> map.computeIfAbsent(student, s -> new ArrayList<>()).add(score));
        });

System.out.println(map);

Fluxのsubscribeメソッドを使えば、値が1件戻ってくるたびにその値を使った処理を行うことができます。

そのメソッドを使って、

えーっと、、、先にMapのインスタンスを作っておいて、

subscribeの中でMapに追加していく、と、いう・・・、

完全に「素人ですかって」怒られるタイプのコードですよね、これ。いや素人ですから💢

あと、勘が良い方がは気づかれたかも知れませんが、ConcurrentModificationExceptionが発生しかねなかったりもしますね。悪い例がすぎますね。


ちなみにこのコードを実行すると、ほとんど結果が表示されずに終わってしまいます。mainスレッドが終了するからでしょうね。

であれば、その後にスリープしてしまえばいいんです。

try {
    Thread.sleep(2000L);
} catch (InterruptedException e) {
    e.printStackTrace();
}

いや素人にも程があるやろ💢💢 ってレベルのコードができあがりました。


これで実行させてみると・・・

{
{17 Kurosawa=[17 国語 80, 17 数学 90]
, 8 Sato=[8 国語 80, 8 数学 90, 8 英語 85, 8 社会 93, 8 理科 72]
, 11 Notsu=[11 国語 80, 11 数学 90, 11 英語 85, 11 社会 93, 11 理科 72]
, 1 Muto=[1 国語 80, 1 数学 90, 1 英語 85, 1 社会 93, 1 理科 72]
, 13 Ooga=[13 国語 80, 13 数学 90, 13 英語 85, 13 社会 93, 13 理科 72]
, 4 Nakamoto=[4 国語 80, 4 数学 90, 4 英語 85, 4 社会 93, 4 理科 72]
, 3 Matsui=[3 国語 80, 3 数学 90, 3 英語 85, 3 社会 93, 3 理科 72]
, 12 Taguchi=[12 国語 80, 12 数学 90, 12 英語 85, 12 社会 93, 12 理科 72]
, 16 Isono=[16 国語 80, 16 数学 90, 16 英語 85]
, 10 Kikuchi=[10 国語 80, 10 数学 90, 10 英語 85, 10 社会 93, 10 理科 72]
, 5 Iida=[5 国語 80, 5 数学 90, 5 英語 85, 5 社会 93, 5 理科 72]
, 18 Kurashima=[18 国語 80]
, 2 Miyoshi=[2 国語 80, 2 数学 90, 2 英語 85, 2 社会 93, 2 理科 72]
, 15 Shiroi=[15 国語 80, 15 数学 90, 15 英語 85, 15 社会 93]
, 6 Horiuchi=[6 国語 80, 6 数学 90, 6 英語 85, 6 社会 93, 6 理科 72]
, 14 Sugimoto=[14 国語 80, 14 数学 90, 14 英語 85, 14 社会 93, 14 理科 72]
, 7 Sugisaki=[7 国語 80, 7 数学 90, 7 英語 85, 7 社会 93, 7 理科 72]
, 9 Mizuno=[9 国語 80, 9 数学 90, 9 英語 85, 9 社会 93, 9 理科 72]
}

あぁー、どうあれ、なんか期待通りのものが取れてるじゃないですか! 勝利!!


と思ってよく見たら、18人分しか取れていません。僕的にはNakamoto, Kikuchi, Mizunoが取れていれば良いのですが。

また、15人目〜18人目の生徒は途中までしか成績が取れていません。なるほど非同期で処理してる途中でSystem.out.printlnが呼ばれて、処理が終わってしまったわけですね。


であれば、2000Lとしたsleep時間を10000Lぐらいにすれば解決しますよね!

・・・みたいな話を続けてるといい加減怒られそうなので、そろそろ真面目にやりましょう。


ログを出そう

真面目にやる前に、少し脇道に逸れて、ログの話をします。

ReactorのFluxMonoは、どこで何が起きているか分かりにくいのを少しでも解消するためか、随所に埋め込めるlogメソッドが用意されています。


このlogメソッドを使えばコンソールに最低限の情報を出ます。ただもう少し詳しい情報が欲しくなるため、ログフォーマットを指定したlogback.xmlを用意しておきます。

<configuration>

  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <root level="debug">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>

もちろんpom.xmlにもlogback-classicのdependencyを追加しておいてください。

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>

あとは見るべき所にログを埋め込んでいきます。今回はfetchStudentsメソッドと、fetchScoresメソッドの2箇所に入れておきます。

Map<Student, List<Score>> map = new HashMap<>();
fetchStudents("sg")
        .log("fetchStudents")
        .subscribe(student -> {
            fetchScores(2017, student.id)
                    .log("fetchScores")
                    .subscribe(score -> map.computeIfAbsent(student, s -> new ArrayList<>()).add(score));
        });

細かな挙動を把握したければもう少し違う箇所にもログを埋め込めば良いのですが、ログが多すぎても見通しが悪くなるため、この2箇所だけにしています。


これで実行して取れたログを見てみます。 # で始まっている部分は僕がつけたコメントです。

# まずは生徒取得処理の、onSubscribeとrequestが呼ばれる。
16:00:09.593 [main] INFO  fetchStudents - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.601 [main] INFO  fetchStudents - request(unbounded)

# 生徒取得処理のonNextが呼ばれて、1人目のMutoさんの値が取得できる。
16:00:09.710 [parallel-1] INFO  fetchStudents - onNext(1 Muto)

# Mutoさんの成績取得処理のためにonSubscribeとrequestが呼ばれる。
16:00:09.712 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.712 [parallel-1] INFO  fetchScores - request(unbounded)

# 2人目のMiyoshiさんの値が取得でき、成績の取得が始まる。
16:00:09.804 [parallel-1] INFO  fetchStudents - onNext(2 Miyoshi)
16:00:09.805 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.805 [parallel-1] INFO  fetchScores - request(unbounded)

# 1人目のMutoさんの国語の成績が取れる。これは、先ほどまでとは別のスレッドで行われる。
16:00:09.816 [parallel-2] INFO  fetchScores - onNext(1 国語 80)

# 3人目のMatsuiさんの値が取得でき、成績の取得が始まる。
16:00:09.906 [parallel-1] INFO  fetchStudents - onNext(3 Matsui)
16:00:09.906 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.906 [parallel-1] INFO  fetchScores - request(unbounded)

# 2人目のMiyoshiさんの国語の成績がまた別スレッドで取得できる。
16:00:09.907 [parallel-3] INFO  fetchScores - onNext(2 国語 80)

# 1人目のMutoさんの数学の成績が、国語が取れた時と同じスレッドで取得できる。
16:00:09.918 [parallel-2] INFO  fetchScores - onNext(1 数学 90)

# 4人目のNakamotoさんの値が取得でき、成績の取得が始まる。
16:00:10.006 [parallel-1] INFO  fetchStudents - onNext(4 Nakamoto)
16:00:10.006 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:10.006 [parallel-1] INFO  fetchScores - request(unbounded)

ここまでの流れを確認すると、次のような流れになります。


1. まずmainスレッドで、生徒取得(fetchStudents)のsubscribe登録を行う

2. 生徒の値が返ってくると、parallel-1スレッドでsubscribeの中に書いた処理が行われる。

3. parallel-1スレッドで、成績取得(fetchScores)のsubscribe登録を行う

4. 成績の値が返ってくると、parallel-2からparallel-4までのスレッドでsubscribeの中に書いた処理が行われる。


ソースコードはとてもダメでしたが、処理自体はおおむね期待通りになっていることが分かりました。

ノンブロッキングかどうかみたいな話は、また後で詳しくやります。


ちなみに最後の方はこうなっていました。

16:00:11.509 [parallel-1] INFO  fetchScores - onNext(16 英語 85)
16:00:11.511 [parallel-3] INFO  fetchScores - onNext(18 国語 80)
16:00:11.512 [parallel-4] INFO  fetchScores - onNext(15 社会 93)
16:00:11.512 [parallel-3] INFO  fetchScores - onNext(14 理科 72)
16:00:11.512 [parallel-3] INFO  fetchScores - onComplete()
16:00:11.514 [parallel-2] INFO  fetchScores - onNext(17 数学 90)
16:00:11.605 [parallel-1] INFO  fetchStudents - onNext(20 Yamaide)
16:00:11.606 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
{18 Kurashima=[18 国語 80], 16 Isono=[16 国語 80, 16 数学 90, 16 英語 85], # (以降、省略)
16:00:11.606 [parallel-1] INFO  fetchScores - request(unbounded)

20人目のYamaideさんの処理を始めたところで、2000ミリ秒のsleepが終わって結果が出力された、という感じですね。


2. Reactorらしいコードを書く

上のようなコードを書いたあと、どうするのが正解なのかよく分からないなという気持ちになり @ さんに質問をしたところ、コードを添削して諸々教えてくれました。それが今回このエントリーを書くきっかけにもなったのです。


Reactorらしい修正をしたソースコードは次のURLにあります。

https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample2.java


subscribeではなくflatMapを使うが良い

Stream APIにおいて、foreachでListやMapに値を追加していくのが悪手であることは皆さんご存知だと思いますが、それはReactorのAPIでも変わりありません。subscribeの中で外部の変数に作用すると、処理の見通しが悪くなります。


今回の目的を実現するコードは、次のように修正できます。

Mono<Map<Student, List<Score>>> monoMap = fetchStudents("sg")
        .flatMap(student -> fetchScores(2017, student.id)
                .collectList()
                .map(scores -> Tuples.of(student, scores))
        )
        .collectMap(Tuple2::getT1, Tuple2::getT2);

Map<Student, List<Score>> map = monoMap.block();
System.out.println(map);

subscribeではなくflatMapを使って結果を変換します。fetchScoresで取得した Flux<Student> をcollectListで Mono<List<Student>> にして、それを一度Tuple2にするという形です。

いやこれ、自分じゃ思いつかない流れですが・・・。


その後、collectMapでTuple2からMapを作り、blockを使って待ち受けて、Monoではない通常のMapを取得しました。

これを実行すると、きちんと30人分の成績を取得することができました。


配列からFluxを作る別の方法

ところで、配列からFluxを作るところも、map/takeを使うのではなく、次のような形で書く方法を教えてもらいました。

return Flux.interval(Duration.ofMillis(100))
        .zipWith(Flux.fromArray(students))
        .map(Tuple2::getT2);

こんな書き方、チュートリアルにはなかったよ!

まぁでも、なるほどですね。


ログも見てみよう

この書き方ではどのような順で処理が行われるのか、ログを見て確認をします。

上のコードに、ログ出力部分を加えます。

Mono<Map<Student, List<Score>>> monoMap = fetchStudents("sg")
        .log("fetchStudents")
        .flatMap(student -> fetchScores(2017, student.id)
                .log("fetchScores")
                .collectList()
                .map(scores -> Tuples.of(student, scores))
        )
        .collectMap(Tuple2::getT1, Tuple2::getT2);

ログを入れるのは、やはりこの2箇所です。


出力されたログは次の通りです。

# まずは生徒取得処理の、onSubscribeとrequestが呼ばれる。
15:35:16.944 [main] INFO  fetchStudents - onSubscribe(FluxMap.MapSubscriber)
15:35:16.947 [main] INFO  fetchStudents - request(256)

# 1人目のMutoさんの処理
15:35:17.073 [parallel-1] INFO  fetchStudents - onNext(1 Muto)
15:35:17.083 [parallel-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
15:35:17.084 [parallel-1] INFO  fetchScores - request(unbounded)

# 2人目のMiyoshiさんの処理
15:35:17.170 [parallel-1] INFO  fetchStudents - onNext(2 Miyoshi)
15:35:17.170 [parallel-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
15:35:17.170 [parallel-1] INFO  fetchScores - request(unbounded)

# ここで1人目のMutoさんの国語の成績が返ってきた
15:35:17.187 [parallel-2] INFO  fetchScores - onNext(1 国語 80)

# 3人目のMatsuiさんの処理
15:35:17.270 [parallel-1] INFO  fetchStudents - onNext(3 Matsui)
15:35:17.270 [parallel-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
15:35:17.270 [parallel-1] INFO  fetchScores - request(unbounded)

# 次は2人目のMiyoshiさんの国語の成績と、1人目のMutoさんの数学の成績が返ってきた
15:35:17.273 [parallel-3] INFO  fetchScores - onNext(2 国語 80)
15:35:17.285 [parallel-2] INFO  fetchScores - onNext(1 数学 90)

# 4人目のNakamotoさんの処理
15:35:17.368 [parallel-1] INFO  fetchStudents - onNext(4 Nakamoto)
15:35:17.369 [parallel-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
15:35:17.370 [parallel-1] INFO  fetchScores - request(unbounded)

# 2人目のMiyoshiさんの数学と、3人目のMatsuiさんの国語が返ってきた
15:35:17.374 [parallel-3] INFO  fetchScores - onNext(2 数学 90)
15:35:17.374 [parallel-4] INFO  fetchScores - onNext(3 国語 80)

流れ的には元のものと同じですね。


またログの最後の部分を見ると、きちんと30人分の成績を受信しきってから、表示をしていました。

15:35:20.174 [parallel-1] INFO  fetchScores - onNext(28 社会 93)
15:35:20.174 [parallel-4] INFO  fetchScores - onComplete()
15:35:20.270 [parallel-3] INFO  fetchScores - onNext(30 英語 85)
15:35:20.270 [parallel-2] INFO  fetchScores - onNext(29 社会 93)
15:35:20.276 [parallel-1] INFO  fetchScores - onNext(28 理科 72)
15:35:20.276 [parallel-1] INFO  fetchScores - onComplete()
15:35:20.371 [parallel-2] INFO  fetchScores - onNext(29 理科 72)
15:35:20.371 [parallel-3] INFO  fetchScores - onNext(30 社会 93)
15:35:20.371 [parallel-2] INFO  fetchScores - onComplete()
15:35:20.471 [parallel-3] INFO  fetchScores - onNext(30 理科 72)
15:35:20.471 [parallel-3] INFO  fetchScores - onComplete()
{22 Shintani=[22 国語 80, 22 数学 90, 22 英語 85, 22 社会 93, 22 理科 72], # (以降、省略)

また、ログのタイムスタンプから、およそ3秒半ほどですべての処理が終わっていることが分かります。つまり、シーケンシャルに行えば18秒掛かる処理を、うまく並行させて3秒半で終わらせているのです。

この3秒半というのは、30人分の情報を取得するのに掛かる3秒 + 最後の1人の成績を取りきるのに掛かる0.5秒と、よく一致しています。

ノンブロッキングな感じで、いいじゃないですか。


3. ノンブロッキングなら、シングルスレッドでも早いのか?

ここまでの処理を見て、ノンブロッキングと非同期(マルチスレッド)の違いがよく分からなくなった方もいるかも知れません。

上で「ノンブロッキングな感じ」と書きましたが、果たしてそのおかげで早かったのか、マルチスレッドの恩恵で早くなったのか、分かりにくいところがあります。


では、シングルスレッドにしてみればどうなるでしょうか。見てみましょう。

シングルスレッドにしたソースコードは次のURLから取得できます。

https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample3.java


publishOnでスレッドを制御

処理をシングルスレッドにする場合は、Fluxを生成している所でpublishOnメソッドを用いて、スレッドの作成ポリシーを決めます。ここではSchedulers.single() を用いてシングルスレッドで実行することにします。

return Flux.interval(Duration.ofMillis(100))
        .publishOn(Schedulers.single())
        .zipWith(Flux.fromArray(students))
        .map(Tuple2::getT2);

publishOn(Schedulers.single()) の処理を挟んだだけですね。

これをfetchStudents、fetshScoresのそれぞれで行います。


実行してみた

それではシングルスレッドになっているか、実行してログを確認してみましょう。

16:21:49.073 [main] INFO  fetchStudents - onSubscribe(FluxMap.MapSubscriber)
16:21:49.080 [main] INFO  fetchStudents - request(256)
16:21:49.197 [single-1] INFO  fetchStudents - onNext(1 Muto)
16:21:49.202 [single-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
16:21:49.202 [single-1] INFO  fetchScores - request(unbounded)
16:21:49.296 [single-1] INFO  fetchStudents - onNext(2 Miyoshi)
# (省略)
16:21:52.501 [single-1] INFO  fetchScores - onNext(29 理科 72)
16:21:52.501 [single-1] INFO  fetchScores - onComplete()
16:21:52.501 [single-1] INFO  fetchScores - onNext(30 社会 93)
16:21:52.603 [single-1] INFO  fetchScores - onNext(30 理科 72)
16:21:52.604 [single-1] INFO  fetchScores - onComplete()
{6 Horiuchi=[6 国語 80, 6 数学 90, 6 英語 85, 6 社会 93, 6 理科 72], # (以降、省略)

一部抜粋ですが、すべての処理が「single-1」スレッドで行われていました。


シングルスレッドで行われていたにも関わらず、処理は3.5秒程度で完了しています。これで「100ミリ秒待つ」というのがノンブロッキングで行われていることが、なんとなく実感できました。


4. ブロッキング処理の場合はどうなる?

さて、ここまでは「ノンブロッキング処理は早くていいね、18秒掛かる処理が3.5秒になったよ」みたいな話でした。

しかし、もしRDBMSのような、ブロッキング処理のあるデータソースから情報を取得しなくてはいけなくなった場合は、どうなるのでしょうか。それを模したコードを書いて検証してみます。


ブロッキング処理にしたソースコードは、ここにあります。

https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample4.java


Thread.sleepはブロッキング処理

ブロッキング処理を行うためには、Fluxを作成する処理の中で、Thread.sleepを行うのが良いです。

生徒や成績のFluxを行う処理を、次のように変更します。

Flux<Student> studentFlux = Flux.create(sink -> {
    for (Student student : students) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        sink.next(student);
    }
    sink.complete();
});
return studentFlux;

Flux.createメソッドで、sinkに値を入れていれていく形です。ここでThread.sleepを使うことで(先程までのFlux.intervalと異なり)このFluxの作成処理がブロッキング処理となるんです。ほぅ。


実行してログで確認

これまでの流れ通り、実行してログを確認してみましょう。

# mainスレッドでfetchStudentsの呼び出し
16:38:37.457 [main] INFO  fetchStudents - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:37.461 [main] INFO  fetchStudents - request(256)

# 1人目のMutoさんの処理をmainメソッドで行い、成績取得もすべてmainメソッド
16:38:37.570 [main] INFO  fetchStudents - onNext(1 Muto)
16:38:37.574 [main] INFO  fetchScores - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:37.575 [main] INFO  fetchScores - request(unbounded)
16:38:37.677 [main] INFO  fetchScores - onNext(1 国語 80)
16:38:37.781 [main] INFO  fetchScores - onNext(1 数学 90)
16:38:37.882 [main] INFO  fetchScores - onNext(1 英語 85)
16:38:37.983 [main] INFO  fetchScores - onNext(1 社会 93)
16:38:38.085 [main] INFO  fetchScores - onNext(1 理科 72)
16:38:38.087 [main] INFO  fetchScores - onComplete()
16:38:38.087 [main] INFO  fetchStudents - request(1)

# 2人目のMiyoshiさんの処理もとにかくmainメソッド
16:38:38.193 [main] INFO  fetchStudents - onNext(2 Miyoshi)
16:38:38.195 [main] INFO  fetchScores - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:38.195 [main] INFO  fetchScores - request(unbounded)
16:38:38.296 [main] INFO  fetchScores - onNext(2 国語 80)
16:38:38.401 [main] INFO  fetchScores - onNext(2 数学 90)
16:38:38.503 [main] INFO  fetchScores - onNext(2 英語 85)
16:38:38.606 [main] INFO  fetchScores - onNext(2 社会 93)
16:38:38.709 [main] INFO  fetchScores - onNext(2 理科 72)
16:38:38.709 [main] INFO  fetchScores - onComplete()
16:38:38.709 [main] INFO  fetchStudents - request(1)

# 3人目以降も同様
16:38:38.812 [main] INFO  fetchStudents - onNext(3 Matsui)
# (省略)
16:38:54.864 [main] INFO  fetchStudents - request(1)
16:38:54.967 [main] INFO  fetchStudents - onNext(29 Tanaka)
16:38:54.968 [main] INFO  fetchScores - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:54.968 [main] INFO  fetchScores - request(unbounded)
16:38:55.072 [main] INFO  fetchScores - onNext(29 国語 80)
16:38:55.176 [main] INFO  fetchScores - onNext(29 数学 90)
16:38:55.281 [main] INFO  fetchScores - onNext(29 英語 85)
16:38:55.386 [main] INFO  fetchScores - onNext(29 社会 93)
16:38:55.488 [main] INFO  fetchScores - onNext(29 理科 72)
16:38:55.488 [main] INFO  fetchScores - onComplete()
16:38:55.488 [main] INFO  fetchStudents - request(1)
16:38:55.592 [main] INFO  fetchStudents - onNext(30 Yagi)
16:38:55.592 [main] INFO  fetchScores - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:55.592 [main] INFO  fetchScores - request(unbounded)
16:38:55.697 [main] INFO  fetchScores - onNext(30 国語 80)
16:38:55.798 [main] INFO  fetchScores - onNext(30 数学 90)
16:38:55.902 [main] INFO  fetchScores - onNext(30 英語 85)
16:38:56.004 [main] INFO  fetchScores - onNext(30 社会 93)
16:38:56.110 [main] INFO  fetchScores - onNext(30 理科 72)
16:38:56.110 [main] INFO  fetchScores - onComplete()
16:38:56.110 [main] INFO  fetchStudents - request(1)
16:38:56.110 [main] INFO  fetchStudents - onComplete()
{5 Iida=[5 国語 80, 5 数学 90, 5 英語 85, 5 社会 93, 5 理科 72], # (以降、省略)

すべての処理がmainスレッドで行われており、時間も18.5秒掛かっています。これはすべての処理がシーケンシャルに行われれば18秒である、という計算と一致します。

いかにReactorを使っていようとも、途中にRDBMSへのJDBCドライバー経由でのアクセスなど、ブロッキング処理が入るとこのようになってしまうのです。


5. ブロッキング処理も、マルチスレッドで高速化

ブロッキング処理を使うと、Reactorを使う意味がないのでしょうか、というとそういうわけでもありません。

Reactorを使って、ブロッキング処理であっても、マルチスレッド処理を書くことができます。


ブロッキング処理をマルチスレッド化したソースコードは、次の場所にあります。

https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample5.java


subscribeOnでスレッドを制御

先の章ではFluxを生成する際にpublishOnメソッドを用いてスレッドの制御をしましたが、それと同様に、subscibeする側でsubscribeOnメソッドを用いることでも、スレッドを制御することができるようになります。シングルスレッドで生成されたものを、マルチスレッドで分担して処理するという形になります。


次のように、fetchStudents、fetchScoresの直後にsubscribeOnを渡します。ここで渡しているSchedulers.elastic()は、必要なだけスレッドを起こすというものです。

Mono<Map<Student, List<Score>>> monoMap = fetchStudents("sg")
        .subscribeOn(Schedulers.elastic())
        .log("fetchStudents")
        .flatMap(student -> fetchScores(2017, student.id)
                .subscribeOn(Schedulers.elastic())
                .log("fetchScores")
                .collectList()
                .map(scores -> Tuples.of(student, scores))
        )
        .collectMap(Tuple2::getT1, Tuple2::getT2);

Map<Student, List<Score>> map = monoMap.block();
System.out.println(map);

これだけでマルチスレッド化ができます。元の流れと大きく変わっていないところがポイントですね。


それでは、実行してログを見てみましょう。

17:35:03.769 [main] INFO  fetchStudents - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:35:03.773 [main] INFO  fetchStudents - request(256)
17:35:03.883 [elastic-2] INFO  fetchStudents - onNext(1 Muto)
17:35:03.897 [elastic-2] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:35:03.898 [elastic-2] INFO  fetchScores - request(unbounded)
17:35:04.001 [elastic-2] INFO  fetchStudents - onNext(2 Miyoshi)
17:35:04.001 [elastic-3] INFO  fetchScores - onNext(1 国語 80)
17:35:04.001 [elastic-2] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:35:04.001 [elastic-2] INFO  fetchScores - request(unbounded)
17:35:04.103 [elastic-3] INFO  fetchScores - onNext(1 数学 90)
17:35:04.103 [elastic-2] INFO  fetchStudents - onNext(3 Matsui)
17:35:04.103 [elastic-2] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:35:04.103 [elastic-2] INFO  fetchScores - request(unbounded)
17:35:04.103 [elastic-4] INFO  fetchScores - onNext(2 国語 80)
17:35:04.204 [elastic-3] INFO  fetchScores - onNext(1 英語 85)
17:35:04.204 [elastic-4] INFO  fetchScores - onNext(2 数学 90)
17:35:04.204 [elastic-2] INFO  fetchStudents - onNext(4 Nakamoto)
17:35:04.205 [elastic-5] INFO  fetchScores - onNext(3 国語 80)
# (省略)
17:35:07.131 [elastic-5] INFO  fetchScores - onComplete()
17:35:07.136 [elastic-6] INFO  fetchScores - onNext(28 社会 93)
17:35:07.136 [elastic-7] INFO  fetchScores - onNext(29 英語 85)
17:35:07.137 [elastic-8] INFO  fetchScores - onNext(30 数学 90)
17:35:07.241 [elastic-7] INFO  fetchScores - onNext(29 社会 93)
17:35:07.241 [elastic-6] INFO  fetchScores - onNext(28 理科 72)
17:35:07.241 [elastic-8] INFO  fetchScores - onNext(30 英語 85)
17:35:07.242 [elastic-6] INFO  fetchScores - onComplete()
17:35:07.344 [elastic-8] INFO  fetchScores - onNext(30 社会 93)
17:35:07.344 [elastic-7] INFO  fetchScores - onNext(29 理科 72)
17:35:07.345 [elastic-7] INFO  fetchScores - onComplete()
17:35:07.449 [elastic-8] INFO  fetchScores - onNext(30 理科 72)
17:35:07.450 [elastic-8] INFO  fetchScores - onComplete()
{30 Yagi=[30 国語 80, 30 数学 90, 30 英語 85, 30 社会 93, 30 理科 72], # (以降、省略)

詳細な説明は割愛しますが、ノンブロッキングで行っていたときと同じような処理の流れとなりました。処理は3.7秒で、ノンブロッキングのときと大差はありません。また、このログで見えているだけでも「elastic-8」まであり、8スレッド使っていることが分かります。


elastic vs parallel

上の例では8スレッド使っていましたが、Fluxを生成する際のsleep時間などを少し調整すると、優に数十スレッドを使ってしまいました。これはこれで、スレッドを使いすぎる問題が起きかねません。


もう少し加減してスレッドを使って欲しい場合には、Schedulers.elastic()ではなく、Schedulers.parallel()を使います。こちらはスレッド数をCPU数分までに制限します(ただし最低は4)

Mono<Map<Student, List<Score>>> monoMap = fetchStudents("sg")
        .subscribeOn(Schedulers.parallel())
        .log("fetchStudents")
        .flatMap(student -> fetchScores(2017, student.id)
                .subscribeOn(Schedulers.parallel())
                .log("fetchScores")
                .collectList()
                .map(scores -> Tuples.of(student, scores))
        )
        .collectMap(Tuple2::getT1, Tuple2::getT2);

Map<Student, List<Score>> map = monoMap.block();
System.out.println(map);

これを実行した結果、こうなりました。

17:40:42.309 [main] INFO  fetchStudents - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:40:42.312 [main] INFO  fetchStudents - request(256)
17:40:42.427 [parallel-1] INFO  fetchStudents - onNext(1 Muto)
17:40:42.438 [parallel-1] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:40:42.438 [parallel-1] INFO  fetchScores - request(unbounded)
17:40:42.542 [parallel-1] INFO  fetchStudents - onNext(2 Miyoshi)
17:40:42.542 [parallel-2] INFO  fetchScores - onNext(1 国語 80)
17:40:42.542 [parallel-1] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:40:42.542 [parallel-1] INFO  fetchScores - request(unbounded)
17:40:42.646 [parallel-2] INFO  fetchScores - onNext(1 数学 90)
17:40:42.646 [parallel-3] INFO  fetchScores - onNext(2 国語 80)
17:40:42.646 [parallel-1] INFO  fetchStudents - onNext(3 Matsui)
17:40:42.648 [parallel-1] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:40:42.648 [parallel-1] INFO  fetchScores - request(unbounded)
17:40:42.751 [parallel-2] INFO  fetchScores - onNext(1 英語 85)
17:40:42.751 [parallel-3] INFO  fetchScores - onNext(2 数学 90)
17:40:42.751 [parallel-1] INFO  fetchStudents - onNext(4 Nakamoto)
17:40:42.751 [parallel-4] INFO  fetchScores - onNext(3 国語 80)
# (省略)
17:40:47.023 [parallel-1] INFO  fetchScores - onComplete()
17:40:47.125 [parallel-1] INFO  fetchScores - onNext(16 国語 80)
17:40:47.228 [parallel-1] INFO  fetchScores - onNext(16 数学 90)
17:40:47.333 [parallel-1] INFO  fetchScores - onNext(16 英語 85)
17:40:47.437 [parallel-1] INFO  fetchScores - onNext(16 社会 93)
17:40:47.542 [parallel-1] INFO  fetchScores - onNext(16 理科 72)
17:40:47.543 [parallel-1] INFO  fetchScores - onComplete()
17:40:47.648 [parallel-1] INFO  fetchScores - onNext(20 国語 80)
17:40:47.750 [parallel-1] INFO  fetchScores - onNext(20 数学 90)
17:40:47.853 [parallel-1] INFO  fetchScores - onNext(20 英語 85)
17:40:47.955 [parallel-1] INFO  fetchScores - onNext(20 社会 93)
17:40:48.060 [parallel-1] INFO  fetchScores - onNext(20 理科 72)
17:40:48.060 [parallel-1] INFO  fetchScores - onComplete()
17:40:48.163 [parallel-1] INFO  fetchScores - onNext(24 国語 80)
17:40:48.264 [parallel-1] INFO  fetchScores - onNext(24 数学 90)
17:40:48.365 [parallel-1] INFO  fetchScores - onNext(24 英語 85)
17:40:48.468 [parallel-1] INFO  fetchScores - onNext(24 社会 93)
17:40:48.569 [parallel-1] INFO  fetchScores - onNext(24 理科 72)
17:40:48.569 [parallel-1] INFO  fetchScores - onComplete()
17:40:48.671 [parallel-1] INFO  fetchScores - onNext(28 国語 80)
17:40:48.775 [parallel-1] INFO  fetchScores - onNext(28 数学 90)
17:40:48.879 [parallel-1] INFO  fetchScores - onNext(28 英語 85)
17:40:48.985 [parallel-1] INFO  fetchScores - onNext(28 社会 93)
17:40:49.087 [parallel-1] INFO  fetchScores - onNext(28 理科 72)
17:40:49.087 [parallel-1] INFO  fetchScores - onComplete()
{9 Mizuno=[9 国語 80, 9 数学 90, 9 英語 85, 9 社会 93, 9 理科 72], 1 # (以降、省略)

スレッド名が parallel-1 から parallel-4 までの4スレッドになりました。4スレッド使ってできる範囲で処理を分担し、最後は残った処理を1スレッドで片付けている、という形です。処理全体には7秒弱掛かっており、スレッド生成し放題のelasticに比べれば2倍近く時間が掛かっていますが、単一スレッドで行うよりもレスポンスは半分以下に短縮されています。


もちろんこの処理はReactorを使わずともJava標準のExecutorServiceなり何なりを使っても同じことができます。ただ、ノンブロッキングな処理も扱えるReactorと、統一した書き方ができるところがメリットの一つになると思います。いや、Reactorでブロッキング処理を扱うようなものを書けること自体、おまけみたいなものかも知れませんが。


ちなみにelasticやparallel以外にも、ExecutorServiceを渡すなどすることもできますが、shutdownのタイミングなど考えると管理が面倒なので、普通にelasticかparallelを使っておくのが良いと私は思っています。


まとめ

Reactorを使って、ノンブロッキング処理、ブロッキング処理を、それぞれどのように扱うかを確認しました。


特にデータアクセスや、マイクロサービス呼び出しのような待ち時間の多いシチュエーションにおいて、うまくノンブロッキング処理にできれば、レスポンスタイムを短縮できるんじゃないかなと思います。

もちろん、その分データソース側に負荷が掛かるため、データソース側が十分にスケールアウトできることが前提となりますが、現代であればそういう環境は手に入りやすいため、取り組む価値があると見ています。


今回のケースがReactorの最適なユースケースだというわけではないですが、業務において、このような使い方もできるという一例として、参考にしてもらえばと思います!

Enjoy, Reactor!

2016-12-08

Optimizing JavaというJavaパフォーマンス系の書籍が面白そう

急激な冷え込みのせいで「寒い!」というつぶやきがTLに散見されるこの頃ですが、皆さんお風邪など召していらっしゃらないでしょうか。

否応なしに寒いという言葉に反応してしまう、けなげなエンジニアの @ です。


このエントリーは Java Advent Calendar 2016 の8日目です。

昨日は @ さんの「Java Stream APIでハマったこと」で、

明日は @ さんの「マイクロベンチマークツール、JMHについて」でした。


今日のエントリーでは、Javaのパフォーマンス系書籍を紹介したいと思います。

Optimizing Java - O’Reilly Media

URLを見るにつけ、あのオライリー様のサイトですら拡張子が由緒正しい .do なのですから、日本のSIerStrutsを使うことをどうして否定できましょうか。

いえ、今日はそんな話題ではありません。


紹介したいのは上のリンク先の本、「Optimizing Java - Practical Techniques for Improved Performance Tuning」です。名前の通り、Javaのパフォーマンスに関する書籍です。まだEarly Releaseの段階で、全体の1/3ほどしか書かれていませんが、現状の版を入手したので紹介したいと思います。


ここまでで、「あれ、なんか似たような本がなかったっけ」と思った方がいらっしゃるかも知れません。そう、オライリー社からは2015年に「Javaパフォーマンス」という書籍が出版されています。

Javaパフォーマンス - O’Reilly Japan

こちらの日本語版では、私も監訳者まえがきを書かせて頂き、Java Day Tokyoで寺田佳央さんと共にサイン会を行いました。

当時はきっと「この寺田さんの横にいて本に落書きしてる人、誰なんだろう」と思われていたかも知れませんが、私を誰だと思ってるんでしょう、せろさんだぞ?


この2冊について、比較しながら紹介しましょう。


目次

Javaパフォーマンス」の目次は、次の通りです。

1章イントロダクション
2章パフォーマンステストのアプローチ
3章Javaパフォーマンスのツールボックス
4章JITコンパイラのしくみ
5章ガベージコレクションの基礎
6章ガベージコレクションアルゴリズム
7章ヒープのベストプラクティス
8章ネイティブメモリのベストプラクティス
9章スレッドと同期のパフォーマンス
10章Java EEのパフォーマンス
11章データベースのベストプラクティス
12章Java SEAPIのパフォーマンス

JavaのメモリやGCスレッドに関する紹介から、SE / EEやデータベースのパフォーマンスに広げた話をしています。


一方、「Optimizing Java」の目次は次の通りです。

Chapter 1Optimization and Performance Defined
Chapter 2Overview of the JVM
Chapter 3Hardware and Operating Systems
Chapter 4Performance Testing
Chapter 5Measurement and Bottom-Up Performance
Chapter 6Monitoring and Analysis
Chapter 7Hotspot GC Deep Dive
Chapter 8Garbage Collection Monitoring and Tuning
Chapter 9Hotspot JIT Compilation
Chapter 10Java Language Performance Techniques
Chapter 11Profiling
Chapter 12Concurrent Performance Techniques
Chapter 13The Future

うん、ほとんど一緒やん?


「Optimizing Java」には、「Javaパフォーマンス」では触れられていたSEやEEの話などはないため、そこが差分になりそうにも見えます。ただ正直、「Javaパフォーマンス」の10章以降はちょっと薄口な感じでしたので、そこを飛ばせばほとんど同じ内容を網羅していると言えます。


では、何が違うんでしょうか。


Javaパフォーマンス vs Optimizing Java

僕が見た限りでは「Javaパフォーマンス」は教科書に近い内容、「Optimizing Java」はやや読み物寄りの内容になっています。

「Optimizing Java」は、現在執筆されているChapter 5までしか読めていませんが、「Javaパフォーマンス」には書かれていなかったOSJVM周りのレイヤーの話や、テスト戦略の話など、少し目線が違った内容を書いていました。


たとえば、Javaのクラスファイルが「0xCAFEBABE」から始まっていることは、Javaに詳しい方なら既にご存じかと思います。ただ、その先はどうなっているのか。

書籍では次のように紹介されています。

  • Magic Number (0xCAFEBABE)
  • Version of Class File Format
  • Constant Pool
  • Access Flags
  • This Class Name
  • Super Class Name
  • Interfaces
  • Fields
  • Methods
  • Attributes

この先頭を取って

M V C A T S I F M A、

語呂合わせして

My Very Cute Animal Turn Savage In Full Moon Areas

なんて紹介されています。


「僕のとってもかわいい猫は、満月のエリアで凶暴になる」

・・・覚えやすいんですかね、これ?


あ、なんかふざけた本だなと思ったかも知れませんが、もちろん技術的な面もきちんと紹介されています。

あくまで上に書いたようなウィット(?)も挟みながら、Javaの領域だけでなく、必要に応じて低レイヤーにも触れて紹介する本となっているわけです。そのため、「Javaパフォーマンス」を読んだ方でも楽しめる本になるのではないかと思います。


で、いつ出るの? 日本語版は?

この本は2017年3月に出版予定となっています。


また、皆さん気になる日本語版ですが、残念ながらまだ翻訳されることは決まっていないようです。

ただ原著の人気が高かったり、この後に公開される6章以降の内容が「Javaパフォーマンス」とはまた違った切り口であり楽しめるのであれば、翻訳される可能性も十分にあるんじゃないかなと思っています。


そんなわけで、日本語版が出ることを祈りながら、このエントリーを書きました。

Stay tuned, see you!

2016-01-06

[]AWS Lambda + Javaは、なぜ1回目と3回目の処理が重いのか?

以前のエントリーで、AWS LambdaでJavaを使ってDynamoDBを呼び出した際に、初回起動にとても時間が掛かったという話を書きました。

http://d.hatena.ne.jp/cero-t/20160101/1451665326


今回は、この辺りの原因をもう少し追求してみます。


なぜ1回目と3回目のアクセスが遅いのか?

AWS Lambdaの中身はよく知りませんが、おそらく、アップロードしたモジュールをTomcatみたいなコンテナとして起動させて、外部からコールしているんだろうと予想しました。それであれば、2回目以降のアクセスが早くなることは理解ができます。

ただ、1回目と3回目だけが極端に遅くて、2回目、4回目以降は早くなるというところは腑に落ちません。


その辺りを調べるべく、staticなカウンタを使って、値がどんな風に変化するかを調べてみました。

こんなソースコードです。

public class Pid {
    static AtomicLong counter = new AtomicLong();

    public String myHandler() {
        long count = counter.incrementAndGet();
        String name = ManagementFactory.getRuntimeMXBean().getName();
        System.out.println("Name: " + name);
        System.out.println("Count: " + count);
        return "SUCCESS";
    }
}

出力された結果は、次のようになりました。

回数NameCount
1回目1@ip-10-0-aaa-bbb.ap-northeast-1.compute.internal1
2回目1@ip-10-0-aaa-bbb.ap-northeast-1.compute.internal2
3回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal1
4回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal2
5回目1@ip-10-0-aaa-bbb.ap-northeast-1.compute.internal3
6回目1@ip-10-0-aaa-bbb.ap-northeast-1.compute.internal4
7回目1@ip-10-0-aaa-bbb.ap-northeast-1.compute.internal5
8回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal3
9回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal4
10回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal5
11回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal6
12回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal7

サーバのIPアドレスが2種類あり、それぞれのサーバで、1から順番にカウントアップしていることが分かります。

なるほど、2台のサーバでロードバランシングしているのだと。そのため、それぞれのサーバの初回起動である、1回目と3回目の処理に時間が掛かるのですね。なかなか納得いく結果でした。

ちなみにロードバランシングは毎回このような結果になるわけではなく、1回目と2回目がそれぞれ別のサーバに行く(=処理に時間が掛かる)こともあります。


どんなコンテナを使っているのか?

先ほど「Tomcatみたいなコンテナ」を使っているんじゃないかと推測しましたが、実際、どんなコンテナを使っているのでしょうか。スレッドダンプを取って、確かめてみました。


こんなコードです。

public class StackTrace {
    public String myHandler() {
        new Exception("For stack trace").printStackTrace();
        Arrays.stream(ManagementFactory.getThreadMXBean().dumpAllThreads(true, true))
                .forEach(System.out::println);
        return "SUCCESS";
    }
}

結果、こうなりました。

java.lang.Exception: For stack trace
	at cero.ninja.aws.analyze.StackTrace.myHandler(StackTrace.java:8)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at lambdainternal.EventHandlerLoader$PojoMethodRequestHandler.handleRequest(EventHandlerLoader.java:434)
	at lambdainternal.EventHandlerLoader$PojoHandlerAsStreamHandler.handleRequest(EventHandlerLoader.java:365)
	at lambdainternal.EventHandlerLoader$2.call(EventHandlerLoader.java:967)
	at lambdainternal.AWSLambda.startRuntime(AWSLambda.java:231)
	at lambdainternal.AWSLambda.<clinit>(AWSLambda.java:59)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at lambdainternal.LambdaRTEntry.main(LambdaRTEntry.java:93)

"Signal Dispatcher" Id=4 RUNNABLE

"Finalizer" Id=3 WAITING on java.lang.ref.ReferenceQueue$Lock@19bb089b
	at java.lang.Object.wait(Native Method)
(略)

"Reference Handler" Id=2 WAITING on java.lang.ref.Reference$Lock@4563e9ab
	at java.lang.Object.wait(Native Method)
(略)

"main" Id=1 RUNNABLE
	at sun.management.ThreadImpl.dumpThreads0(Native Method)
	at sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:446)
	at cero.ninja.aws.analyze.StackTrace.myHandler(StackTrace.java:9)
(略)

何やらシンプルな独自コンテナを使っているみたいです。何度か実行してみても結果は同じでした。

ソースコードがないので推測になりますが、アップロードされたzipをロードして起動する独自コンテナがあり、外部からAPIコールされた際にAWSLambdaクラスあたりが処理を受け取って、zip内のハンドラを呼び出しているのでしょう。


なぜコンストラクタで処理すると早いの?

そういえば、もう一つ、謎な挙動がありました。

それは、Credentialsを取るという重めの処理をコンストラクタで実行すれば、処理時間がかなり短くなるというものです。


ハンドラの中でCredentialsを取ると、実測値で24秒ぐらい、課金対象値で22秒ぐらいでした。

コンストラクタでCredentialsを取っておくと、実測値で8秒ぐらい、課金対象値で6秒ぐらいでした。

ここでいう実測値とは、手元のストップウォッチを使って計測したという意味です。


ここから推測できることは、コンストラクタは事前に処理されていて、そこは課金対象外になるのかも知れません。


・・・あれ、それなら、コンストラクタで重い処理をがっつり走らせて、ハンドラでその結果を取り出せば、課金額を抑えられるじゃないですか?

ということで、ハンドラの中で10秒スリープする場合と、コンストラクタでスリープした場合の比較をしてみました。


こんな2つのクラスで試してみます。

public class Wait1 {
    static long origin = System.currentTimeMillis();

    public String myHandler() {
        try {
            System.out.println("Before wait: " + (System.currentTimeMillis() - origin));
            Thread.sleep(10000);
            System.out.println("After wait: " + (System.currentTimeMillis() - origin));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "SUCCESS";
    }
}
public class Wait2 {
    static long origin = System.currentTimeMillis();

    public Wait2() {
        try {
            System.out.println("Before wait: " + (System.currentTimeMillis() - origin));
            Thread.sleep(10000);
            System.out.println("After wait: " + (System.currentTimeMillis() - origin));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String myHandler() {
        System.out.println("Called: " + (System.currentTimeMillis() - origin));
        return "SUCCESS";
    }
}

結果は、、、


Wait1(ハンドラの中でsleep)

回数Before waitAftre wait課金対象値実測値
1回目3901039010100ms10秒
2回目3871038710100ms10秒
3回目209173091710100ms10秒
4回目688527885210100ms10秒

きっちり10秒sleepして、課金対象値もそのオーバーヘッド分ぐらい。ストップウォッチで計測した値も同じく10秒ぐらいになりました。


Wait2(コンストラクタでsleep)

回数Before waitAftre wait課金対象値実測値
1回目3381033814800ms25秒
2回目--100ms1秒以下
3回目2611035814800ms25秒
4回目--100ms1秒以下

えーっ、sleepは10秒だったのに、なぜか15秒分ぐらい課金されてしまい、ストップウォッチで計測すると25秒と、えらく時間が掛かりました。これは謎な挙動です。

2回目や4回目ではインスタンス生成が終わっているので、Before waitやAfter waitが出力されず、処理がすぐに終わるというのは納得ですが。


どうして、こんなことが起きるんでしょうか。

不思議に思って、CloudWatch Logsのログを確認してみると・・・

Before wait: 17 
START RequestId: 34ae18c3-b47b-11e5-858f-272ee689265f Version: $LATEST 
Before wait: 249 
After wait: 10250 
Called: 14408
END RequestId: 34ae18c3-b47b-11e5-858f-272ee689265f 
REPORT RequestId: 34ae18c3-b47b-11e5-858f-272ee689265f	Duration: 14531.57 ms	Billed Duration: 14600 ms Memory Size: 128 MB Max Memory Used: 27 MB	

最初のBefore waitの後にAfter waitがなく、Lambdaの処理がSTARTした後に再度Before waitが呼ばれ、After waitした後に、4秒ほど待ってから、ハンドラ処理が実行されてCalledが呼ばれていました。

なるほど、つまりこういうことでしょうか。


1. コンストラクタが実行され、10秒sleepの途中でタイムアウトして強制的に処理が止められ、インスタンス生成を諦めた(プロセスごと破棄された?)

2. 改めてコンストラクタが実行され、10秒sleepした。

3. AWS Lambda内の処理か何かで4秒ぐらい処理が掛かった。

4. ハンドラが実行された。

5. 2〜4の間が課金対象となり、15秒弱となった。

6. ストップウォッチで計測した時間は1から5までの間なので、25秒弱となった。


要するに、コンストラクタで重い処理を行うような悪いことを考える人への対策として、コンストラクタは一定時間で(おそらく10秒きっかりで)タイムアウトして、いったんプロセスは破棄される。

その後、改めてコンストラクタの処理がタイムアウト関係なく実行されたうえで、AWS Lambdaの内部処理と、ハンドラ処理が行われ、すべての処理が課金対象となる、ということころでしょうか。


コンストラクタの処理が短い場合は、どうなるの?

ということで、sleepの時間を短くして、再挑戦してみます。

public class Wait3 {
    static long origin = System.currentTimeMillis();

    public Wait3() {
        try {
            System.out.println("Before wait: " + (System.currentTimeMillis() - origin));
            Thread.sleep(2000);
            System.out.println("After wait: " + (System.currentTimeMillis() - origin));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String myHandler() {
        System.out.println("Called: " + (System.currentTimeMillis() - origin));
        return "SUCCESS";
    }
}

結果。

回数Before WaitAfter wait課金対象値実測値
1回目--100ms3秒
2回目--100ms1秒以内
3回目--100ms1秒以内

CloudWatch Logsでの出力

Before wait: 25 
After wait: 2026 
START RequestId: xxx Version: $LATEST 
Called: 2389 
END RequestId: xxx 
REPORT RequestId: xxx	Duration: 97.52 ms	Billed Duration: 100 ms Memory Size: 128 MB	Max Memory Used: 27 MB	 
START RequestId: yyy Version: $LATEST 
Called: 16385 
END RequestId: yyy 
REPORT RequestId: yyy	Duration: 6.04 ms	Billed Duration: 100 ms Memory Size: 128 MB	Max Memory Used: 27 MB	

今度はSTARTの前に、きちんとBefore waitもAfter waitも出力され、ハンドラ処理のみが課金対象となっていました。


まとめ

ここまでの話をまとめると、AWS Lambdaは・・・


1. 複数台のサーバで処理されるため、それぞれのサーバでの初回起動時には処理時間が掛かる。

2. 独自のコンテナを利用して、モジュールをデプロイしている。

3. コンストラクタの処理が軽い場合は、ハンドラ内の処理だけが課金対象となる。

4. コンストラクタの処理が重い場合は、コンストラクタの処理 + 5秒弱 + ハンドラ内の処理が課金対象となる。


ということですね。


・・・とは言え、Credentialsの処理をコンストラクタで行った場合に、実測値まで早くなる辺りは、少しだけ不可解です。というか、Credentialsの取得処理が重いこと自体が不可解なのですが。

この辺りはもう少し追試験をしてみれば解析できそうですが、長くなるので、今回はこの辺りまでにしたいと思います。


いやー、Lambdaさん、なかなかよく考えられてますね!

2013-09-30

[]俺様とJavaOne 2013(中編)

JavaOne 3日目、自分のセッション当日は

やっぱり直前まで資料準備&練習でバタバタしていました。

この性格、死ぬまで治らない予感!


Day 3 : 解析ツールのセッションは人気

3日目、僕のセッションは夜7時半からなので、

朝イチにあった面白そうなセッションに参加していました。


[CON5092] Diagnosing Your Application on the JVM

元BEAのStaffan Larsenのセッション。

朝イチにも関わらず、満席になる人気セッションでした。


内容は、診断・解析ツールについて、デモを交えて次々と紹介するセッションで

主に7u40から使えるようになった(7u4から使えてたものもあるけど)

「jcmd」(旧jrcmd)を中心に紹介されていました。


ちょっと列挙しますと・・・

jps : Javaプロセスの一覧を列挙する

jcmd : 引数なしならjpsと同じ

jcmd <pid> VM.uptime : Javaプロセスの起動後の経過時間

jcmd <pid> Thread.print : スレッドダンプ。jstackと同じ。

jcmd <pid> GC.heap_dump : ヒープダンプ。jmapと同じ。

jcmd -gcnew <pid> 1s : 毎秒のGC領域のサイズを見る

jcmd <pid> PerfCounter.print : JVM内部で保持している様々なカウンタを取得

などなど。


自分的に衝撃だった事と言えば、jcmdコマンドの引数で pid に 0 を指定すると

全てのJavaプロセスの情報をまとめて取れる、というところ。

実際に使う機会があるかどうかは分かりませんが、良い事を知った感がありますね(笑


またセッション後半は、このような解析ツールをリモートから実行するために、

jstatdやJMX Remoteを利用するという話や、その裏側の仕組みが説明されました。


正直、他の作業をしながら聞いていたので、かなり聞き逃してしまったのですが

リモートから診断・解析をする時に、何ができるか・できないかを判断するための

背景となる知識が得られる良い内容でした。まさにJavaOneらしい内容だったと言えます。


特に解析する機会が多いとか、開発ツールを作る立場であるとか、

そういう人は、このセッションの資料をきちんと読むべきだと思いますね、

っていうか、私も、きちんと読み直します!


ところで、セッションの後に

こんな風にスピーカーのStaffanさんに、お礼なのか挑発なのか分からないツイートをして

私のセッションに来て頂きました。我ながら、強引なことをしたもんです。


そんなわけで、夜には自分のセッションがあったわけですが

それについては、前後の話も含めて、またきちんと別エントリとして投稿します。


Day 4 - Lambda、Lambda、JFR

おはよう世界。

自分のセッションが終わった開放感からか、倒れ込むように寝てしまい

これは昼まで寝るかなと思ったら、意外と5時間睡眠ぐらいで目が覚めてしまい

時差ボケの威力を実感した早朝でした。


そんなわけで、朝から元気にセッションに参加します。

そう、僕のJavaOne参加はここから始まったわけです。


[CON2055] Programming with Lambda Expressions in Java

Agile Developer, Inc.の社長、Venkat Subramaniamによる

軽妙でウィットに溢れたLambdaのセッションでした。


内容的には、外部イテレーターから内部イテレーターの書き方の移り変わり、

Lambdaの文法やstreamの使い方や効果などを紹介するという

比較的、初級者向けのセッションなのですが、その語り口調が面白すぎて

本当に笑いの絶えないセッションでした。


直接的な表現よりも、間接的な表現を軽妙に語る事で面白さを増す感じでしたね、たとえば

  • 汚い → とても子供に見せられない。しっ見ちゃいけません!
  • 危険 → 何をやろうとしているんだ、家に帰って考え直せ!
  • 素敵 → これは食欲をそそる!

などなど。

って私はJavaOneに来て何を学んでるんですかね。


もう少し実用的なところをフィードバックすると、

やはりLambda時代にはAPIデザインが少し変わるということでしょうか。


たとえば自分で比較するユーティリティメソッドを書く際には、

isPriceLessThan(500, value) と書けるようなAPIを提供するのではなく、

isPriceLessThan(500).test(value) と書けるようなAPIを提供することで

Lambda式として利用できるようにしていました。


ちょっとこの辺り、自分でも消化しきれていないので

日本に帰ったら資料を見ながら復習しようと思います。


ってよく考えたら、このセッション、

テキストエディタだけで話してたから、資料ないんだった (^^;;


[CON7942] Java 8 Streams: Lambda in Top Gear

続けてのLambdaセッション。

Paul Sandozと、Lambdaの神Brian Goetzのセッションです。


streamのAPIは、集計処理などにおいて、

うまくparallel化ができるもの、できないものがあったり、

処理を途中で中断しても良いもの、全ての要素を走査するものがあるなど、

APIは、いくつかのカテゴリで「分類」することができます。


この分類次第で、parallel化した時のパフォーマンスなども変わってくるため

streamを使う際には、この分類をきちんと押さえておかなければいけない、

ということが説明されていました。ちょっと自分にはなかった視点でした。


この辺りは、資料をダウンロードして学び直す必要があるので、

ボロが出ないうちに、説明をこの辺で切り上げましょう (^^;


[CON5091] Java Flight Recorder Behind the Scenes

3日の朝イチに解析ツールの紹介していた、Staffan Larsenのセッション。

前半こそFlight Recorderの紹介だったのですが、

後半はFlight Recorderの設計の話が展開され、かなり興味深かったですね。

そんな後半の話だけピックアップして紹介します。


1. Thread buffers

Flight Recorderが取得した情報は、スレッドローカルのThread buffersに貯めてから、

共有のGlobal buffersに書き出します。

こうすることで、Global buffersへの書き込みが衝突することを抑えています。

この辺りは、Flight Recorderがメイン処理に影響を与えないようにするために

欠かせない、いわば当たり前の設計でしょう。


2. Flight Recorderは永久には情報を取り続けない。

メモリリークへの対策として、情報は一定期間かサイズごとに消すか上書きしています。

これも当たり前のことですが、私は過去にちょっとやらかした事があります (^^;


3. Flight Recorderは、アプリケーションのクラスやオブジェクトへの参照を持たない。

これも、メモリリークを防ぐうえで不可欠のポリシーです。


4. クラス名はIDに変換する。

クラス名(文字列)を、int程度の数値に変換することで、

ファイルやメモリの空間効率を向上させます。

また、そのクラス名と数値のマップをFlight Recorderの出力ファイルに

持たせておくことで、互換性や移植性にも配慮しています。

こういう細かいところも、きちんと工夫しているんですね。


5. クラス一覧自体も定期的にリセットする。

クラス一覧がメモリリークの原因にならないよう、

一定期間ごとにファイルに出力して、クラス一覧をクリアしてしまいます。

(このタイミングを「チェックポイント」と読んでいました)

最後にクラス一覧をマージするかどうかは、ちょっと分かりませんでした。

マージしないと重複が出てしまって、ファイル効率がよくない気がします。


6. スタックトレースのpoolを作る。

同じスタックトレースが何度も表れることが多いため、

スタックトレースのプールを作っておいて、(全く)同じスタックトレースが

発生した場合には、前のスタックトレースへの参照を使うだけにします。

なるほど、勉強になります。


というような、ENdoSnipeの開発者的にありがたい情報がたくさんありました。

もちろんアプリケーション開発をするうえでも、このようなメモリやデータの

効率化の仕組みを「発想」することは、とても大切だと思います。

まだまだやるべき事があるのだなと、改めて思い知らされた感じです。


[CON2959] Modular JavaScript

Luminis Technologies社のSander Mak、Paul Bakkerのセッション。

なんかJavaとの連携もありそうな感じのアジェンダが提示されていたんですが

実際には、JavaScriptのライブラリやフレームワークを使ったときの

packageやclassの可視性なんかを、延々延々とJavaScriptのソースで説明するセッションで、

Javaコードは全く出てきませんでした。


ここJavaOneやぞ!


4日目終わり

この後の時間帯に、kotlinのセッションがあったり、

あの #てらだよしお さんのJavaEEのセッションがあったのですが、

どうしても眠かったため、ホテルに戻って休んでいました。


聞くところによると、kotlinのセッションは10人いなかったそうです。

kotlinの過疎感ハンパない!(><)


そして寺田さんのセッションは「質問はTwitterでお願い」と言っていたにも関わらず、

バンバン質問が出て、大変だったそうです(そして、きちんと回答したそうです!)

そりゃOracleのエンジニアがJavaEEについて話したら、質問出るって!


ちなみに4日目の夜には、トレジャーアイランドで

Maroon5というバンドのライブなどあったのですが、

上にも書いた通り、ホテルに戻ってお休みしていました。


Folder5が来るんだったら、無理してでも行ったと思うんですけどね。

#行かねーよ。


そんなわけで、JavaOneも後半戦に差し掛かってきました。

2012-08-30

[]volatileとか使うなと怒られた話

JJUG Night Seminar ~ Java VM<&納涼会 ~

http://kokucheese.com/event/index/48437/

に参加してきました。


「スタックマシンとしてのJavaVM」なんて言う

一見さんお断りみたいなタイトルに集まった人たちはもちろんレベルが高く

参加者のjavap経験率が20%ぐらいになるなど、なかなか偏った集客具合でした。


そんな中、私もLTでBTraceの話をしようと、45枚のスライドと、2種類のデモを用意していました。

ただ客層を考えるに、最近Twitterでちょっと呟いていた

「longに複数スレッドから値を代入すると、想定外の値になる」話をした方が良いんじゃないかと思い、

観客にどちらが聞きたいか尋ねてみたところ、longの方が圧倒的人気。


そんなわけで、45枚のスライドをドブに捨てて(いつか再利用するよ!)

スライドなしで、longの話をしてきました。

longに1と-1を入れ続けると、4294967295や-4294967295になる

論より証拠、まずはWindowsXP 32bitなど、

普通の32bit OSで、以下のコードを実行してみてください。

public class VolatileTest {
	private static long longValue = 0;

	public static void main(String[] args) throws Exception {
		final int LOOP = 1000 * 1000 * 1000;

		Thread th1 = new Thread(new Runnable() {
			public void run() {
				for (int i = 0; i < LOOP; i++) {
					longValue = 1;
					check(longValue);
				}
			}
		});

		Thread th2 = new Thread(new Runnable() {
			public void run() {
				for (int i = 0; i < LOOP; i++) {
					longValue = -1;
					check(longValue);
				}
			}
		});

		th1.start();
		th2.start();

		th1.join();
		th2.join();

		System.out.println("Finished");
	}

	private static void check(long value) {
		if (value != 1 && value != -1) {
			throw new RuntimeException(String.valueOf(value));
		}
	}
}

2スレッドからlongValueに1と-1を入れ続け、

もし値が1と-1以外になった場合には例外を発生させる、というプログラムです。


素直に考えると「1」と「-1」以外になることはあり得ないのですが、

実際には「4294967295」や「-4294967295」になることがあります。

64bitなlongへの代入は、上位32bitと下位32bitに分けて行なう

なぜそのような事が起こるのでしょうか?

それは、longへの代入がアトミック(分割できない最小単位)ではないためです。


見出しに書いた通りですが、Javaのlongは64bitであり、

特に32bit版のJavaVMでは、上位32bitと下位32bitの代入は別々に行なわれるのです。


つまり上に書いたプログラムの代入部分は、こんな風に読み替えることができます。

// Thread1側 (longValue = 1)
longValue_upper32 = 0x00000000;
longValue_lower32 = 0x00000001;

// Thread2側 (longValue = -1)
longValue_upper32 = 0xFFFFFFFF;
longValue_lower32 = 0xFFFFFFFF;

この処理をマルチスレッドで動かすわけですから、値の状態として

 longValue_upper32 = 0x00000000;

 longValue_lower32 = 0xFFFFFFFF;

とか

 longValue_upper32 = 0xFFFFFFFF;

 longValue_lower32 = 0x00000001;

とかになることは、容易に想像ができます。


その結果、

 前者の0x00000000FFFFFFFFが「4294967295」

 後者の0xFFFFFFFF00000001が「-4294967295」

になるのです。


ただしこの問題、64bitなMac OSJavaでは発生しません。

少なくとも私のMacBook Air(Lion)では、代入を上位下位に分割せず

64bit分を一度に代入するため、常に1か-1になります。

そこでvolatileの登場です

「ここで出てくるのがvolatileです」

って言ったら櫻庭さんから「あり得ない!」っていきなり怒られたのですが(><)

めげずに説明を続けますと

private volatile static long longValue = 0;

と、volatileで宣言すれば32bit Javaでも問題は起きません。


volatileには「値の参照の際に、常に最新の値を見に行く」という性質があり、

ざっくり言えば、volatileの代入や参照はロックされているような振る舞いをします。


そのため、longValueの値を上位32bit、下位32bitともきちんと処理し終わってから

他のスレッドからlongValueの参照ができるようになります。


なんだかよく分からない修飾子ナンバーワンのvolatileですが、

この例を通して見て頂ければ、その振る舞いがよく分かるのではないかと思います。

でもホントはAtomicLongを使う。

とは言え、

実際にどう振る舞うかが保証(規定)されていないvolatileを

このような場所で使うのは、必ずしも安全とは言えません。


また、よく分からない修飾子ナンバーワンなのですから、

よく分からないままコピーされたり、よく分からないまま削除されたり(!)しかねません。


では代わりに何を使うべきかというと、AtomicLongです。

private static AtomicLong longValue = new AtomicLong(0);

AtomicLongのgetやsetは、その名前の通りアトミックに行なわれるため

マルチスレッドで操作しても問題は発生しません。

分かりやすい名前のため、可読性も高いと言えるでしょう。


ところで、

変数をvolatileで宣言しても、インクリメント処理はアトミックにならないため、

複数スレッドからインクリメント処理を行なうと、値が壊れる可能性があります。

(これはlongだけでなく、shortやintなどでも発生します)

private volatile static int intValue = 0;

// 複数スレッドから実行すると値を更新しないことがある。
public void increment() {
	intValue++;
}

そのような場合でも、

AtomicIntegerやAtomicLongのincrementAndGetメソッドを使うことで

インクリメント処理をアトミックに行なうことができるのです。

volatileの使いどころ

そんなわけで使いにくいvolatileですが、

ステートフラグとか、ダブルチェックロッキングとかでは使えますよね!

まとめ

  • 変数への代入が、いつもアトミックだと過信してはいけない。
  • volatile! volatile!
  • 普通にAtomicIntegerやAtomicLongを使ってください。