谷本 心 in せろ部屋 このページをアンテナに追加 RSSフィード

2017-12-15

[]ReactorでN+1問題な処理を実装してみた話

最近、格ゲーツイートが増えてる @ です。前のエントリーに書いた「18年ぶりに出る続編」のβテストがついに始まりまして、最近は夜な夜なコンボをやるなどしています。

シビアな反応が要求される格闘ゲームにおいて、継続して勝ち続けるためにはどうしても反射神経が必要となり、機械のような反射神経、つまり「反応装置」にならなくてはいけません、そうだから今日のテーマは「Reactor」なのです、、、みたいな流れを考えたんですが、どうにも苦しいですよね。ろくにスベることもできない中ですが、Javaアドベントカレンダー15日目が始まりました。

https://qiita.com/advent-calendar/2017/java


さて、Project ReactorはReactiveなノンブロッキング処理を書くためのライブラリです。最近はSpring 5.0などでも全面的に利用されているため話題になりがちです。今日はこれを勉強しながら、業務にありそうなケースを実装してみます。

なお、同期とか非同期とかブロッキングとかノンブロッキングとかReactiveとかの言葉の定義は、この際、置いておきます。実装を見て、雰囲気で掴んでください。


目次

0. はじめに

1. ノンブロッキング処理を体感する

2. Reactorらしいコードを書く

3. ノンブロッキングなら、シングルスレッドでも早いのか?

4. ブロッキング処理の場合はどうなる?

5. ブロッキング処理も、マルチスレッドで高速化


0〜2までは、Reactor初心者が苦慮してコードを書いていった話。

2〜5は、ノンブロッキング処理やブロッキング処理を、それぞれシングルスレッド、マルチスレッドで試してみた話です。

長いエントリーなので、興味がある部分を拾い読みしてもらえればと思います。


0. はじめに

まずは事前の準備や、目的などを説明します。


事前にやること

Reactorについて完全に素人だったので、まずは入門のスライドを読んでから、ハンズオン(チュートリアル)をやりました。


Spring 5に備えるリアクティブプログラミング入門

https://www.slideshare.net/TakuyaIwatsuka/spring-5


Reactive Webアプリケーション - そしてSpring 5へ #jjug_ccc #ccc_ef3

https://www.slideshare.net/makingx/reactive-web-spring-5-jjugccc-cccef3


Lite Rx API Hands-on(チュートリアル

https://github.com/reactor/lite-rx-api-hands-on/


なんとなく、Java5世代の非同期脳で、MonoがFutureのようなもの、FluxがList版のFutureのようなもの、という理解をしました。実際にはJava8で追加されたCompletableFutureのようなもののようですが、それはおいとくとして。

とりあえずこれらでReactorの概要をざっくり掴んだのですが、とにかく機能が多すぎて、全く覚えきれません。Stream APIをもう一度学びなおしてるような気持ちです。


今回の目的

今回は、いわゆる「N+1問題」をReactorで実装するとどうなるか、という検証をしてみます。

テーブルAから一覧データをN件持ってきて、次にそれに関連するデータをテーブルBから持ってくるという処理において、クエリがN+1回発生してしまうせいで遅い、というアレです。SQLを工夫すればクエリ1回で済むやろっていうコメントは、本題からズレるので★1です!


今回は「生徒一覧」を取得したうえで、生徒の「成績一覧」を検索することを想定します。

また検索対象は、RDBMSのようなブロッキング処理しかできないデータソースの場合と、何かしらイイ感じのノンブロッキングなデータソースの場合とを、それぞれ想定して比較します。


コードで言うと、次のようなイメージです。

生徒一覧の取得
Flux<Student> fetchStudents(String className)

点数一覧の取得
Flux<Score> fetchScores(int year, int id)

取り出したい形
Map<Student, List<Score>>

生徒の一覧を取り、それぞれの生徒の(2017年の)成績を取り、それをMono/FluxではなくMapやListの形にして返すというものです。

なぜわざわざMapに変換するんだ、なぜここでブロックしてしまうんだ、というツッコミを受けそうですが、あくまでも「全ての結果が揃ってから返す」けど、中の処理を並列にすることで、レスポンスを早くしたいというケースを想定しました。課題設定に口出し無用。


1. ノンブロッキング処理を体感する

まずは手探りでノンブロッキングなコードを書く所までをやります。

この章で紹介するソースコードは次のURLにあります。

https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample1.java


配列Fluxにする

まずは生徒一覧取得メソッドの実装として、生徒の配列からFluxを作って返します。

モックなので固定値を返してしまうのですが、検索を模しているので、1データ取得するのに100ミリ秒掛かるという想定にしました。こんな感じです。

Flux<Student> fetchStudents(String className) {
    Student[] students = {
            new Student(1, "Muto"),
            new Student(2, "Miyoshi"),
            new Student(3, "Matsui"),
            // 略
            new Student(28, "Mori"),
            new Student(29, "Tanaka"),
            new Student(30, "Yagi"),
    };

    return Flux.interval(Duration.ofMillis(100))
            .map(i -> students[i.intValue()])
            .take(students.length);
}

intervalメソッドで100ミリ秒おきに、mapメソッド配列から生徒を取り出して、takeメソッド配列分だけ生徒を取得したら終える、という実装です。

30人いるので、シーケンシャルに行えば、3000ミリ秒、つまり3秒掛かる処理ですね。


点数の一覧を取得するところも、実装の内容はほぼ同じです。

Flux<Score> fetchScores(int year, int id) {
    final Score[] scores = {
            new Score(id, "国語", 80),
            new Score(id, "数学", 90),
            new Score(id, "英語", 85),
            new Score(id, "社会", 93),
            new Score(id, "理科", 72)
    };

    return Flux.interval(Duration.ofMillis(100))
            .map(i -> scores[i.intValue()])
            .take(scores.length);
}

これもシーケンシャルに行えば、5科目 * 100ミリ秒で、500ミリ秒掛かる処理です。30人分あるので、15秒掛かる計算になります。

つまり生徒一覧と成績一覧の取得処理をすべてシーケンシャルに行うと、18秒掛かることになります。それをもっとうまく並列に処理したいというのが今回のテーマです。


生徒の結果が返ってき次第、成績を取得する

ここまでに書いた2つのメソッドを使って、生徒の一覧を取得して、それぞれの生徒ごとに成績の一覧を取得し、それをMapに変換するような処理を書いてみます。

Map<Student, List<Score>> map = new HashMap<>();
fetchStudents("sg")
        .subscribe(student -> {
            fetchScores(2017, student.id)
                    .subscribe(score -> map.computeIfAbsent(student, s -> new ArrayList<>()).add(score));
        });

System.out.println(map);

Fluxのsubscribeメソッドを使えば、値が1件戻ってくるたびにその値を使った処理を行うことができます。

そのメソッドを使って、

えーっと、、、先にMapのインスタンスを作っておいて、

subscribeの中でMapに追加していく、と、いう・・・、

完全に「素人ですかって」怒られるタイプのコードですよね、これ。いや素人ですから💢

あと、勘が良い方がは気づかれたかも知れませんが、ConcurrentModificationExceptionが発生しかねなかったりもしますね。悪い例がすぎますね。


ちなみにこのコードを実行すると、ほとんど結果が表示されずに終わってしまいます。mainスレッドが終了するからでしょうね。

であれば、その後にスリープしてしまえばいいんです。

try {
    Thread.sleep(2000L);
} catch (InterruptedException e) {
    e.printStackTrace();
}

いや素人にも程があるやろ💢💢 ってレベルのコードができあがりました。


これで実行させてみると・・・

{
{17 Kurosawa=[17 国語 80, 17 数学 90]
, 8 Sato=[8 国語 80, 8 数学 90, 8 英語 85, 8 社会 93, 8 理科 72]
, 11 Notsu=[11 国語 80, 11 数学 90, 11 英語 85, 11 社会 93, 11 理科 72]
, 1 Muto=[1 国語 80, 1 数学 90, 1 英語 85, 1 社会 93, 1 理科 72]
, 13 Ooga=[13 国語 80, 13 数学 90, 13 英語 85, 13 社会 93, 13 理科 72]
, 4 Nakamoto=[4 国語 80, 4 数学 90, 4 英語 85, 4 社会 93, 4 理科 72]
, 3 Matsui=[3 国語 80, 3 数学 90, 3 英語 85, 3 社会 93, 3 理科 72]
, 12 Taguchi=[12 国語 80, 12 数学 90, 12 英語 85, 12 社会 93, 12 理科 72]
, 16 Isono=[16 国語 80, 16 数学 90, 16 英語 85]
, 10 Kikuchi=[10 国語 80, 10 数学 90, 10 英語 85, 10 社会 93, 10 理科 72]
, 5 Iida=[5 国語 80, 5 数学 90, 5 英語 85, 5 社会 93, 5 理科 72]
, 18 Kurashima=[18 国語 80]
, 2 Miyoshi=[2 国語 80, 2 数学 90, 2 英語 85, 2 社会 93, 2 理科 72]
, 15 Shiroi=[15 国語 80, 15 数学 90, 15 英語 85, 15 社会 93]
, 6 Horiuchi=[6 国語 80, 6 数学 90, 6 英語 85, 6 社会 93, 6 理科 72]
, 14 Sugimoto=[14 国語 80, 14 数学 90, 14 英語 85, 14 社会 93, 14 理科 72]
, 7 Sugisaki=[7 国語 80, 7 数学 90, 7 英語 85, 7 社会 93, 7 理科 72]
, 9 Mizuno=[9 国語 80, 9 数学 90, 9 英語 85, 9 社会 93, 9 理科 72]
}

あぁー、どうあれ、なんか期待通りのものが取れてるじゃないですか! 勝利!!


と思ってよく見たら、18人分しか取れていません。僕的にはNakamoto, Kikuchi, Mizunoが取れていれば良いのですが。

また、15人目〜18人目の生徒は途中までしか成績が取れていません。なるほど非同期で処理してる途中でSystem.out.printlnが呼ばれて、処理が終わってしまったわけですね。


であれば、2000Lとしたsleep時間を10000Lぐらいにすれば解決しますよね!

・・・みたいな話を続けてるといい加減怒られそうなので、そろそろ真面目にやりましょう。


ログを出そう

真面目にやる前に、少し脇道に逸れて、ログの話をします。

ReactorのFluxMonoは、どこで何が起きているか分かりにくいのを少しでも解消するためか、随所に埋め込めるlogメソッドが用意されています。


このlogメソッドを使えばコンソールに最低限の情報を出ます。ただもう少し詳しい情報が欲しくなるため、ログフォーマットを指定したlogback.xmlを用意しておきます。

<configuration>

  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <root level="debug">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>

もちろんpom.xmlにもlogback-classicのdependencyを追加しておいてください。

        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>

あとは見るべき所にログを埋め込んでいきます。今回はfetchStudentsメソッドと、fetchScoresメソッドの2箇所に入れておきます。

Map<Student, List<Score>> map = new HashMap<>();
fetchStudents("sg")
        .log("fetchStudents")
        .subscribe(student -> {
            fetchScores(2017, student.id)
                    .log("fetchScores")
                    .subscribe(score -> map.computeIfAbsent(student, s -> new ArrayList<>()).add(score));
        });

細かな挙動を把握したければもう少し違う箇所にもログを埋め込めば良いのですが、ログが多すぎても見通しが悪くなるため、この2箇所だけにしています。


これで実行して取れたログを見てみます。 # で始まっている部分は僕がつけたコメントです。

# まずは生徒取得処理の、onSubscribeとrequestが呼ばれる。
16:00:09.593 [main] INFO  fetchStudents - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.601 [main] INFO  fetchStudents - request(unbounded)

# 生徒取得処理のonNextが呼ばれて、1人目のMutoさんの値が取得できる。
16:00:09.710 [parallel-1] INFO  fetchStudents - onNext(1 Muto)

# Mutoさんの成績取得処理のためにonSubscribeとrequestが呼ばれる。
16:00:09.712 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.712 [parallel-1] INFO  fetchScores - request(unbounded)

# 2人目のMiyoshiさんの値が取得でき、成績の取得が始まる。
16:00:09.804 [parallel-1] INFO  fetchStudents - onNext(2 Miyoshi)
16:00:09.805 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.805 [parallel-1] INFO  fetchScores - request(unbounded)

# 1人目のMutoさんの国語の成績が取れる。これは、先ほどまでとは別のスレッドで行われる。
16:00:09.816 [parallel-2] INFO  fetchScores - onNext(1 国語 80)

# 3人目のMatsuiさんの値が取得でき、成績の取得が始まる。
16:00:09.906 [parallel-1] INFO  fetchStudents - onNext(3 Matsui)
16:00:09.906 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:09.906 [parallel-1] INFO  fetchScores - request(unbounded)

# 2人目のMiyoshiさんの国語の成績がまた別スレッドで取得できる。
16:00:09.907 [parallel-3] INFO  fetchScores - onNext(2 国語 80)

# 1人目のMutoさんの数学の成績が、国語が取れた時と同じスレッドで取得できる。
16:00:09.918 [parallel-2] INFO  fetchScores - onNext(1 数学 90)

# 4人目のNakamotoさんの値が取得でき、成績の取得が始まる。
16:00:10.006 [parallel-1] INFO  fetchStudents - onNext(4 Nakamoto)
16:00:10.006 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
16:00:10.006 [parallel-1] INFO  fetchScores - request(unbounded)

ここまでの流れを確認すると、次のような流れになります。


1. まずmainスレッドで、生徒取得(fetchStudents)のsubscribe登録を行う

2. 生徒の値が返ってくると、parallel-1スレッドでsubscribeの中に書いた処理が行われる。

3. parallel-1スレッドで、成績取得(fetchScores)のsubscribe登録を行う

4. 成績の値が返ってくると、parallel-2からparallel-4までのスレッドでsubscribeの中に書いた処理が行われる。


ソースコードはとてもダメでしたが、処理自体はおおむね期待通りになっていることが分かりました。

ノンブロッキングかどうかみたいな話は、また後で詳しくやります。


ちなみに最後の方はこうなっていました。

16:00:11.509 [parallel-1] INFO  fetchScores - onNext(16 英語 85)
16:00:11.511 [parallel-3] INFO  fetchScores - onNext(18 国語 80)
16:00:11.512 [parallel-4] INFO  fetchScores - onNext(15 社会 93)
16:00:11.512 [parallel-3] INFO  fetchScores - onNext(14 理科 72)
16:00:11.512 [parallel-3] INFO  fetchScores - onComplete()
16:00:11.514 [parallel-2] INFO  fetchScores - onNext(17 数学 90)
16:00:11.605 [parallel-1] INFO  fetchStudents - onNext(20 Yamaide)
16:00:11.606 [parallel-1] INFO  fetchScores - onSubscribe(FluxTake.TakeSubscriber)
{18 Kurashima=[18 国語 80], 16 Isono=[16 国語 80, 16 数学 90, 16 英語 85], # (以降、省略)
16:00:11.606 [parallel-1] INFO  fetchScores - request(unbounded)

20人目のYamaideさんの処理を始めたところで、2000ミリ秒のsleepが終わって結果が出力された、という感じですね。


2. Reactorらしいコードを書く

上のようなコードを書いたあと、どうするのが正解なのかよく分からないなという気持ちになり @ さんに質問をしたところ、コードを添削して諸々教えてくれました。それが今回このエントリーを書くきっかけにもなったのです。


Reactorらしい修正をしたソースコードは次のURLにあります。

https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample2.java


subscribeではなくflatMapを使うが良い

Stream APIにおいて、foreachでListやMapに値を追加していくのが悪手であることは皆さんご存知だと思いますが、それはReactorのAPIでも変わりありません。subscribeの中で外部の変数に作用すると、処理の見通しが悪くなります。


今回の目的を実現するコードは、次のように修正できます。

Mono<Map<Student, List<Score>>> monoMap = fetchStudents("sg")
        .flatMap(student -> fetchScores(2017, student.id)
                .collectList()
                .map(scores -> Tuples.of(student, scores))
        )
        .collectMap(Tuple2::getT1, Tuple2::getT2);

Map<Student, List<Score>> map = monoMap.block();
System.out.println(map);

subscribeではなくflatMapを使って結果を変換します。fetchScoresで取得した Flux<Student> をcollectListで Mono<List<Student>> にして、それを一度Tuple2にするという形です。

いやこれ、自分じゃ思いつかない流れですが・・・。


その後、collectMapでTuple2からMapを作り、blockを使って待ち受けて、Monoではない通常のMapを取得しました。

これを実行すると、きちんと30人分の成績を取得することができました。


配列からFluxを作る別の方法

ところで、配列からFluxを作るところも、map/takeを使うのではなく、次のような形で書く方法を教えてもらいました。

return Flux.interval(Duration.ofMillis(100))
        .zipWith(Flux.fromArray(students))
        .map(Tuple2::getT2);

こんな書き方、チュートリアルにはなかったよ!

まぁでも、なるほどですね。


ログも見てみよう

この書き方ではどのような順で処理が行われるのか、ログを見て確認をします。

上のコードに、ログ出力部分を加えます。

Mono<Map<Student, List<Score>>> monoMap = fetchStudents("sg")
        .log("fetchStudents")
        .flatMap(student -> fetchScores(2017, student.id)
                .log("fetchScores")
                .collectList()
                .map(scores -> Tuples.of(student, scores))
        )
        .collectMap(Tuple2::getT1, Tuple2::getT2);

ログを入れるのは、やはりこの2箇所です。


出力されたログは次の通りです。

# まずは生徒取得処理の、onSubscribeとrequestが呼ばれる。
15:35:16.944 [main] INFO  fetchStudents - onSubscribe(FluxMap.MapSubscriber)
15:35:16.947 [main] INFO  fetchStudents - request(256)

# 1人目のMutoさんの処理
15:35:17.073 [parallel-1] INFO  fetchStudents - onNext(1 Muto)
15:35:17.083 [parallel-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
15:35:17.084 [parallel-1] INFO  fetchScores - request(unbounded)

# 2人目のMiyoshiさんの処理
15:35:17.170 [parallel-1] INFO  fetchStudents - onNext(2 Miyoshi)
15:35:17.170 [parallel-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
15:35:17.170 [parallel-1] INFO  fetchScores - request(unbounded)

# ここで1人目のMutoさんの国語の成績が返ってきた
15:35:17.187 [parallel-2] INFO  fetchScores - onNext(1 国語 80)

# 3人目のMatsuiさんの処理
15:35:17.270 [parallel-1] INFO  fetchStudents - onNext(3 Matsui)
15:35:17.270 [parallel-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
15:35:17.270 [parallel-1] INFO  fetchScores - request(unbounded)

# 次は2人目のMiyoshiさんの国語の成績と、1人目のMutoさんの数学の成績が返ってきた
15:35:17.273 [parallel-3] INFO  fetchScores - onNext(2 国語 80)
15:35:17.285 [parallel-2] INFO  fetchScores - onNext(1 数学 90)

# 4人目のNakamotoさんの処理
15:35:17.368 [parallel-1] INFO  fetchStudents - onNext(4 Nakamoto)
15:35:17.369 [parallel-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
15:35:17.370 [parallel-1] INFO  fetchScores - request(unbounded)

# 2人目のMiyoshiさんの数学と、3人目のMatsuiさんの国語が返ってきた
15:35:17.374 [parallel-3] INFO  fetchScores - onNext(2 数学 90)
15:35:17.374 [parallel-4] INFO  fetchScores - onNext(3 国語 80)

流れ的には元のものと同じですね。


またログの最後の部分を見ると、きちんと30人分の成績を受信しきってから、表示をしていました。

15:35:20.174 [parallel-1] INFO  fetchScores - onNext(28 社会 93)
15:35:20.174 [parallel-4] INFO  fetchScores - onComplete()
15:35:20.270 [parallel-3] INFO  fetchScores - onNext(30 英語 85)
15:35:20.270 [parallel-2] INFO  fetchScores - onNext(29 社会 93)
15:35:20.276 [parallel-1] INFO  fetchScores - onNext(28 理科 72)
15:35:20.276 [parallel-1] INFO  fetchScores - onComplete()
15:35:20.371 [parallel-2] INFO  fetchScores - onNext(29 理科 72)
15:35:20.371 [parallel-3] INFO  fetchScores - onNext(30 社会 93)
15:35:20.371 [parallel-2] INFO  fetchScores - onComplete()
15:35:20.471 [parallel-3] INFO  fetchScores - onNext(30 理科 72)
15:35:20.471 [parallel-3] INFO  fetchScores - onComplete()
{22 Shintani=[22 国語 80, 22 数学 90, 22 英語 85, 22 社会 93, 22 理科 72], # (以降、省略)

また、ログのタイムスタンプから、およそ3秒半ほどですべての処理が終わっていることが分かります。つまり、シーケンシャルに行えば18秒掛かる処理を、うまく並行させて3秒半で終わらせているのです。

この3秒半というのは、30人分の情報を取得するのに掛かる3秒 + 最後の1人の成績を取りきるのに掛かる0.5秒と、よく一致しています。

ノンブロッキングな感じで、いいじゃないですか。


3. ノンブロッキングなら、シングルスレッドでも早いのか?

ここまでの処理を見て、ノンブロッキングと非同期(マルチスレッド)の違いがよく分からなくなった方もいるかも知れません。

上で「ノンブロッキングな感じ」と書きましたが、果たしてそのおかげで早かったのか、マルチスレッドの恩恵で早くなったのか、分かりにくいところがあります。


では、シングルスレッドにしてみればどうなるでしょうか。見てみましょう。

シングルスレッドにしたソースコードは次のURLから取得できます。

https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample3.java


publishOnでスレッドを制御

処理をシングルスレッドにする場合は、Fluxを生成している所でpublishOnメソッドを用いて、スレッドの作成ポリシーを決めます。ここではSchedulers.single() を用いてシングルスレッドで実行することにします。

return Flux.interval(Duration.ofMillis(100))
        .publishOn(Schedulers.single())
        .zipWith(Flux.fromArray(students))
        .map(Tuple2::getT2);

publishOn(Schedulers.single()) の処理を挟んだだけですね。

これをfetchStudents、fetshScoresのそれぞれで行います。


実行してみた

それではシングルスレッドになっているか、実行してログを確認してみましょう。

16:21:49.073 [main] INFO  fetchStudents - onSubscribe(FluxMap.MapSubscriber)
16:21:49.080 [main] INFO  fetchStudents - request(256)
16:21:49.197 [single-1] INFO  fetchStudents - onNext(1 Muto)
16:21:49.202 [single-1] INFO  fetchScores - onSubscribe(FluxMap.MapSubscriber)
16:21:49.202 [single-1] INFO  fetchScores - request(unbounded)
16:21:49.296 [single-1] INFO  fetchStudents - onNext(2 Miyoshi)
# (省略)
16:21:52.501 [single-1] INFO  fetchScores - onNext(29 理科 72)
16:21:52.501 [single-1] INFO  fetchScores - onComplete()
16:21:52.501 [single-1] INFO  fetchScores - onNext(30 社会 93)
16:21:52.603 [single-1] INFO  fetchScores - onNext(30 理科 72)
16:21:52.604 [single-1] INFO  fetchScores - onComplete()
{6 Horiuchi=[6 国語 80, 6 数学 90, 6 英語 85, 6 社会 93, 6 理科 72], # (以降、省略)

一部抜粋ですが、すべての処理が「single-1」スレッドで行われていました。


シングルスレッドで行われていたにも関わらず、処理は3.5秒程度で完了しています。これで「100ミリ秒待つ」というのがノンブロッキングで行われていることが、なんとなく実感できました。


4. ブロッキング処理の場合はどうなる?

さて、ここまでは「ノンブロッキング処理は早くていいね、18秒掛かる処理が3.5秒になったよ」みたいな話でした。

しかし、もしRDBMSのような、ブロッキング処理のあるデータソースから情報を取得しなくてはいけなくなった場合は、どうなるのでしょうか。それを模したコードを書いて検証してみます。


ブロッキング処理にしたソースコードは、ここにあります。

https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample4.java


Thread.sleepはブロッキング処理

ブロッキング処理を行うためには、Fluxを作成する処理の中で、Thread.sleepを行うのが良いです。

生徒や成績のFluxを行う処理を、次のように変更します。

Flux<Student> studentFlux = Flux.create(sink -> {
    for (Student student : students) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        sink.next(student);
    }
    sink.complete();
});
return studentFlux;

Flux.createメソッドで、sinkに値を入れていれていく形です。ここでThread.sleepを使うことで(先程までのFlux.intervalと異なり)このFluxの作成処理がブロッキング処理となるんです。ほぅ。


実行してログで確認

これまでの流れ通り、実行してログを確認してみましょう。

# mainスレッドでfetchStudentsの呼び出し
16:38:37.457 [main] INFO  fetchStudents - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:37.461 [main] INFO  fetchStudents - request(256)

# 1人目のMutoさんの処理をmainメソッドで行い、成績取得もすべてmainメソッド
16:38:37.570 [main] INFO  fetchStudents - onNext(1 Muto)
16:38:37.574 [main] INFO  fetchScores - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:37.575 [main] INFO  fetchScores - request(unbounded)
16:38:37.677 [main] INFO  fetchScores - onNext(1 国語 80)
16:38:37.781 [main] INFO  fetchScores - onNext(1 数学 90)
16:38:37.882 [main] INFO  fetchScores - onNext(1 英語 85)
16:38:37.983 [main] INFO  fetchScores - onNext(1 社会 93)
16:38:38.085 [main] INFO  fetchScores - onNext(1 理科 72)
16:38:38.087 [main] INFO  fetchScores - onComplete()
16:38:38.087 [main] INFO  fetchStudents - request(1)

# 2人目のMiyoshiさんの処理もとにかくmainメソッド
16:38:38.193 [main] INFO  fetchStudents - onNext(2 Miyoshi)
16:38:38.195 [main] INFO  fetchScores - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:38.195 [main] INFO  fetchScores - request(unbounded)
16:38:38.296 [main] INFO  fetchScores - onNext(2 国語 80)
16:38:38.401 [main] INFO  fetchScores - onNext(2 数学 90)
16:38:38.503 [main] INFO  fetchScores - onNext(2 英語 85)
16:38:38.606 [main] INFO  fetchScores - onNext(2 社会 93)
16:38:38.709 [main] INFO  fetchScores - onNext(2 理科 72)
16:38:38.709 [main] INFO  fetchScores - onComplete()
16:38:38.709 [main] INFO  fetchStudents - request(1)

# 3人目以降も同様
16:38:38.812 [main] INFO  fetchStudents - onNext(3 Matsui)
# (省略)
16:38:54.864 [main] INFO  fetchStudents - request(1)
16:38:54.967 [main] INFO  fetchStudents - onNext(29 Tanaka)
16:38:54.968 [main] INFO  fetchScores - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:54.968 [main] INFO  fetchScores - request(unbounded)
16:38:55.072 [main] INFO  fetchScores - onNext(29 国語 80)
16:38:55.176 [main] INFO  fetchScores - onNext(29 数学 90)
16:38:55.281 [main] INFO  fetchScores - onNext(29 英語 85)
16:38:55.386 [main] INFO  fetchScores - onNext(29 社会 93)
16:38:55.488 [main] INFO  fetchScores - onNext(29 理科 72)
16:38:55.488 [main] INFO  fetchScores - onComplete()
16:38:55.488 [main] INFO  fetchStudents - request(1)
16:38:55.592 [main] INFO  fetchStudents - onNext(30 Yagi)
16:38:55.592 [main] INFO  fetchScores - onSubscribe(FluxCreate.BufferAsyncSink)
16:38:55.592 [main] INFO  fetchScores - request(unbounded)
16:38:55.697 [main] INFO  fetchScores - onNext(30 国語 80)
16:38:55.798 [main] INFO  fetchScores - onNext(30 数学 90)
16:38:55.902 [main] INFO  fetchScores - onNext(30 英語 85)
16:38:56.004 [main] INFO  fetchScores - onNext(30 社会 93)
16:38:56.110 [main] INFO  fetchScores - onNext(30 理科 72)
16:38:56.110 [main] INFO  fetchScores - onComplete()
16:38:56.110 [main] INFO  fetchStudents - request(1)
16:38:56.110 [main] INFO  fetchStudents - onComplete()
{5 Iida=[5 国語 80, 5 数学 90, 5 英語 85, 5 社会 93, 5 理科 72], # (以降、省略)

すべての処理がmainスレッドで行われており、時間も18.5秒掛かっています。これはすべての処理がシーケンシャルに行われれば18秒である、という計算と一致します。

いかにReactorを使っていようとも、途中にRDBMSへのJDBCドライバー経由でのアクセスなど、ブロッキング処理が入るとこのようになってしまうのです。


5. ブロッキング処理も、マルチスレッドで高速化

ブロッキング処理を使うと、Reactorを使う意味がないのでしょうか、というとそういうわけでもありません。

Reactorを使って、ブロッキング処理であっても、マルチスレッド処理を書くことができます。


ブロッキング処理をマルチスレッド化したソースコードは、次の場所にあります。

https://github.com/cero-t/reactor-example/blob/master/src/main/java/ninja/cero/example/reactor/ReactorExample5.java


subscribeOnでスレッドを制御

先の章ではFluxを生成する際にpublishOnメソッドを用いてスレッドの制御をしましたが、それと同様に、subscibeする側でsubscribeOnメソッドを用いることでも、スレッドを制御することができるようになります。シングルスレッドで生成されたものを、マルチスレッドで分担して処理するという形になります。


次のように、fetchStudents、fetchScoresの直後にsubscribeOnを渡します。ここで渡しているSchedulers.elastic()は、必要なだけスレッドを起こすというものです。

Mono<Map<Student, List<Score>>> monoMap = fetchStudents("sg")
        .subscribeOn(Schedulers.elastic())
        .log("fetchStudents")
        .flatMap(student -> fetchScores(2017, student.id)
                .subscribeOn(Schedulers.elastic())
                .log("fetchScores")
                .collectList()
                .map(scores -> Tuples.of(student, scores))
        )
        .collectMap(Tuple2::getT1, Tuple2::getT2);

Map<Student, List<Score>> map = monoMap.block();
System.out.println(map);

これだけでマルチスレッド化ができます。元の流れと大きく変わっていないところがポイントですね。


それでは、実行してログを見てみましょう。

17:35:03.769 [main] INFO  fetchStudents - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:35:03.773 [main] INFO  fetchStudents - request(256)
17:35:03.883 [elastic-2] INFO  fetchStudents - onNext(1 Muto)
17:35:03.897 [elastic-2] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:35:03.898 [elastic-2] INFO  fetchScores - request(unbounded)
17:35:04.001 [elastic-2] INFO  fetchStudents - onNext(2 Miyoshi)
17:35:04.001 [elastic-3] INFO  fetchScores - onNext(1 国語 80)
17:35:04.001 [elastic-2] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:35:04.001 [elastic-2] INFO  fetchScores - request(unbounded)
17:35:04.103 [elastic-3] INFO  fetchScores - onNext(1 数学 90)
17:35:04.103 [elastic-2] INFO  fetchStudents - onNext(3 Matsui)
17:35:04.103 [elastic-2] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:35:04.103 [elastic-2] INFO  fetchScores - request(unbounded)
17:35:04.103 [elastic-4] INFO  fetchScores - onNext(2 国語 80)
17:35:04.204 [elastic-3] INFO  fetchScores - onNext(1 英語 85)
17:35:04.204 [elastic-4] INFO  fetchScores - onNext(2 数学 90)
17:35:04.204 [elastic-2] INFO  fetchStudents - onNext(4 Nakamoto)
17:35:04.205 [elastic-5] INFO  fetchScores - onNext(3 国語 80)
# (省略)
17:35:07.131 [elastic-5] INFO  fetchScores - onComplete()
17:35:07.136 [elastic-6] INFO  fetchScores - onNext(28 社会 93)
17:35:07.136 [elastic-7] INFO  fetchScores - onNext(29 英語 85)
17:35:07.137 [elastic-8] INFO  fetchScores - onNext(30 数学 90)
17:35:07.241 [elastic-7] INFO  fetchScores - onNext(29 社会 93)
17:35:07.241 [elastic-6] INFO  fetchScores - onNext(28 理科 72)
17:35:07.241 [elastic-8] INFO  fetchScores - onNext(30 英語 85)
17:35:07.242 [elastic-6] INFO  fetchScores - onComplete()
17:35:07.344 [elastic-8] INFO  fetchScores - onNext(30 社会 93)
17:35:07.344 [elastic-7] INFO  fetchScores - onNext(29 理科 72)
17:35:07.345 [elastic-7] INFO  fetchScores - onComplete()
17:35:07.449 [elastic-8] INFO  fetchScores - onNext(30 理科 72)
17:35:07.450 [elastic-8] INFO  fetchScores - onComplete()
{30 Yagi=[30 国語 80, 30 数学 90, 30 英語 85, 30 社会 93, 30 理科 72], # (以降、省略)

詳細な説明は割愛しますが、ノンブロッキングで行っていたときと同じような処理の流れとなりました。処理は3.7秒で、ノンブロッキングのときと大差はありません。また、このログで見えているだけでも「elastic-8」まであり、8スレッド使っていることが分かります。


elastic vs parallel

上の例では8スレッド使っていましたが、Fluxを生成する際のsleep時間などを少し調整すると、優に数十スレッドを使ってしまいました。これはこれで、スレッドを使いすぎる問題が起きかねません。


もう少し加減してスレッドを使って欲しい場合には、Schedulers.elastic()ではなく、Schedulers.parallel()を使います。こちらはスレッド数をCPU数分までに制限します(ただし最低は4)

Mono<Map<Student, List<Score>>> monoMap = fetchStudents("sg")
        .subscribeOn(Schedulers.parallel())
        .log("fetchStudents")
        .flatMap(student -> fetchScores(2017, student.id)
                .subscribeOn(Schedulers.parallel())
                .log("fetchScores")
                .collectList()
                .map(scores -> Tuples.of(student, scores))
        )
        .collectMap(Tuple2::getT1, Tuple2::getT2);

Map<Student, List<Score>> map = monoMap.block();
System.out.println(map);

これを実行した結果、こうなりました。

17:40:42.309 [main] INFO  fetchStudents - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:40:42.312 [main] INFO  fetchStudents - request(256)
17:40:42.427 [parallel-1] INFO  fetchStudents - onNext(1 Muto)
17:40:42.438 [parallel-1] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:40:42.438 [parallel-1] INFO  fetchScores - request(unbounded)
17:40:42.542 [parallel-1] INFO  fetchStudents - onNext(2 Miyoshi)
17:40:42.542 [parallel-2] INFO  fetchScores - onNext(1 国語 80)
17:40:42.542 [parallel-1] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:40:42.542 [parallel-1] INFO  fetchScores - request(unbounded)
17:40:42.646 [parallel-2] INFO  fetchScores - onNext(1 数学 90)
17:40:42.646 [parallel-3] INFO  fetchScores - onNext(2 国語 80)
17:40:42.646 [parallel-1] INFO  fetchStudents - onNext(3 Matsui)
17:40:42.648 [parallel-1] INFO  fetchScores - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
17:40:42.648 [parallel-1] INFO  fetchScores - request(unbounded)
17:40:42.751 [parallel-2] INFO  fetchScores - onNext(1 英語 85)
17:40:42.751 [parallel-3] INFO  fetchScores - onNext(2 数学 90)
17:40:42.751 [parallel-1] INFO  fetchStudents - onNext(4 Nakamoto)
17:40:42.751 [parallel-4] INFO  fetchScores - onNext(3 国語 80)
# (省略)
17:40:47.023 [parallel-1] INFO  fetchScores - onComplete()
17:40:47.125 [parallel-1] INFO  fetchScores - onNext(16 国語 80)
17:40:47.228 [parallel-1] INFO  fetchScores - onNext(16 数学 90)
17:40:47.333 [parallel-1] INFO  fetchScores - onNext(16 英語 85)
17:40:47.437 [parallel-1] INFO  fetchScores - onNext(16 社会 93)
17:40:47.542 [parallel-1] INFO  fetchScores - onNext(16 理科 72)
17:40:47.543 [parallel-1] INFO  fetchScores - onComplete()
17:40:47.648 [parallel-1] INFO  fetchScores - onNext(20 国語 80)
17:40:47.750 [parallel-1] INFO  fetchScores - onNext(20 数学 90)
17:40:47.853 [parallel-1] INFO  fetchScores - onNext(20 英語 85)
17:40:47.955 [parallel-1] INFO  fetchScores - onNext(20 社会 93)
17:40:48.060 [parallel-1] INFO  fetchScores - onNext(20 理科 72)
17:40:48.060 [parallel-1] INFO  fetchScores - onComplete()
17:40:48.163 [parallel-1] INFO  fetchScores - onNext(24 国語 80)
17:40:48.264 [parallel-1] INFO  fetchScores - onNext(24 数学 90)
17:40:48.365 [parallel-1] INFO  fetchScores - onNext(24 英語 85)
17:40:48.468 [parallel-1] INFO  fetchScores - onNext(24 社会 93)
17:40:48.569 [parallel-1] INFO  fetchScores - onNext(24 理科 72)
17:40:48.569 [parallel-1] INFO  fetchScores - onComplete()
17:40:48.671 [parallel-1] INFO  fetchScores - onNext(28 国語 80)
17:40:48.775 [parallel-1] INFO  fetchScores - onNext(28 数学 90)
17:40:48.879 [parallel-1] INFO  fetchScores - onNext(28 英語 85)
17:40:48.985 [parallel-1] INFO  fetchScores - onNext(28 社会 93)
17:40:49.087 [parallel-1] INFO  fetchScores - onNext(28 理科 72)
17:40:49.087 [parallel-1] INFO  fetchScores - onComplete()
{9 Mizuno=[9 国語 80, 9 数学 90, 9 英語 85, 9 社会 93, 9 理科 72], 1 # (以降、省略)

スレッド名が parallel-1 から parallel-4 までの4スレッドになりました。4スレッド使ってできる範囲で処理を分担し、最後は残った処理を1スレッドで片付けている、という形です。処理全体には7秒弱掛かっており、スレッド生成し放題のelasticに比べれば2倍近く時間が掛かっていますが、単一スレッドで行うよりもレスポンスは半分以下に短縮されています。


もちろんこの処理はReactorを使わずともJava標準のExecutorServiceなり何なりを使っても同じことができます。ただ、ノンブロッキングな処理も扱えるReactorと、統一した書き方ができるところがメリットの一つになると思います。いや、Reactorでブロッキング処理を扱うようなものを書けること自体、おまけみたいなものかも知れませんが。


ちなみにelasticやparallel以外にも、ExecutorServiceを渡すなどすることもできますが、shutdownのタイミングなど考えると管理が面倒なので、普通にelasticかparallelを使っておくのが良いと私は思っています。


まとめ

Reactorを使って、ノンブロッキング処理、ブロッキング処理を、それぞれどのように扱うかを確認しました。


特にデータアクセスや、マイクロサービス呼び出しのような待ち時間の多いシチュエーションにおいて、うまくノンブロッキング処理にできれば、レスポンスタイムを短縮できるんじゃないかなと思います。

もちろん、その分データソース側に負荷が掛かるため、データソース側が十分にスケールアウトできることが前提となりますが、現代であればそういう環境は手に入りやすいため、取り組む価値があると見ています。


今回のケースがReactorの最適なユースケースだというわけではないですが、業務において、このような使い方もできるという一例として、参考にしてもらえばと思います!

Enjoy, Reactor!

2016-01-04

[]続けて、JavaNode.jsとGoで外部ライブラリを使わないベンチマークをしてみた。

前回の記事では、DynamoDBを呼び出す処理の簡単なベンチマークを行いました。

http://d.hatena.ne.jp/cero-t/20160101/1451665326


ライブラリを伴ったときの初回起動ではJavaが不利な感じの結果になりましたが、ライブラリを使わなければどのような差が出るのか、改めて確認してみました。


今回のベンチマークは、フィボナッチ数列の38番目を取るというものです。

ソースコードは前回と同じリポジトリに追加しています。

https://github.com/cero-t/lambda-benchmark


結果
回数JavaNode.jsGo
1回目2.9199.284.14
2回目2.6789.3024.141
3回目2.5799.3964.14

時間はいずれも秒。ミリ秒より下の精度は切り捨て。


今回はいずれもメモリ128MBで、同じ性能の環境で実施しました。

なおBilled Durationは、ほぼこの結果の2倍程度(=ウォームアップのための実行分)になっていて、オーバーヘッドはあまり感じられませんでした。


メモリの実使用量は

Java : 12MB

Node.js : 9MB

Go : 10MB

でした。


Python書けないマンなので、Pythonは計測していません。

他のサイトのベンチマークを見る限りは、数十秒から数分ぐらい掛かりそうな気がします。

考察

Javaが一番早く、Goはその1.5倍ぐらい、Node.jsJavaの3倍ぐらい時間が掛かるという結果でした。

ウォームアップをもうちょっと取ればJavaJITコンパイルが効くのかも知れませんが、Lambdaで動かすという性質上、本来ならウォームアップなしで動かしたいぐらいなので、これぐらいの比較で十分かと思います。


ちなみに僕の手元のMacBook Pro(Late 2013 / Core i5 2.4GHz)だと、Javaで0.28秒ぐらい、Goで0.33秒ぐらいでしたから、メモリ128MBのLambdaの性能はその1/10ぐらいということになりますね。


もちろん今回は対象がフィボナッチ数の計算という一つの処理なので、結果的にJavaが良かっただけかも知れません。様々なアルゴリズムで言語のベンチマークを行っているサイトでは、Goの方が有利な結果もいくつか出ています。

https://benchmarksgame.alioth.debian.org/u64q/go.html


ということで、ライブラリのローディングさえなければ、Javaだって悪くないということが分かりました。

まぁ実際、ライブラリを使わないことなんて、ほぼ考えられないですけどね!

2016-01-01

[]AWS LambdaでJavaNode.jsとGoの簡易ベンチマークをしてみた。

あけましておめでとうございます!

現場でいつも締め切りに追われデスマーチ化している皆様方におかれましては、年賀状がデスマーチ化していたのではないかと憂慮しておりますが、いかがお過ごしですか。

エンジニアの鑑みたいな僕としましては、年始の挨拶はSNSでサクッと済ませ、年末年始は紅白など見ながらAWS Lambdaのソースコードを書いていました。


ということで、Lambda。

Lambdaを書く時に最初に悩むのは、どの言語を選択するか、なのです。


まず手を出すのは、サクッと書けるNode.js

ただNode.jsの非同期なプログラミングになかなか馴染めず、わからん殺しをされてうんざりしていました。みんなどうしているのよとtwitterで問いかけたところ @ からPromiseを使うべしと教わり、なるほど確かにこれは便利ですナと思いつつも、これが標準で使えないぐらいLambdaのNode.jsが古いところにまたうんざりしました。


では手に馴染んでるJavaを使ってみたらどうかと思ったら、メモリイーターだし、なんとなくパフォーマンスが悪い感じでした。詳しいことは後ほどベンチマーク結果で明らかになるわけですが。


それなら消去法で残ったPythonなのですが、僕マジPython触ったことないレベルであり、これは若者たちに任せることにしているので、Pythonも選択肢から消えて。


本来、Lambdaみたいな用途にはGoが向いているはずなのだけど、いつ使えるようになるんだろうなぁなどと思って調べてみたら、Node.jsからGoを呼び出すというテクを見つけて、こりゃいいやとなりました。

http://blog.0x82.com/2014/11/24/aws-lambda-functions-in-go/


ただGoを呼ぶにしてもNode.jsを経由するためのオーバーヘッドはあるわけで、じゃぁ実際にどれぐらいパフォーマンス影響があるのか、調べてみることにしました。


処理の中身

簡単に試したいだけだったので、数行で済む程度のごく簡単な処理を書いてベンチマークすることにしました。


1. 引数で渡されたJSONをパースして、中身を表示する

2. DynamoDBから1件のデータを取得する


当初は1だけだったのですが、あまり性能に差が出なかったので2を追加した感じです。そんな経緯のベンチマークなので、実装も雑ですが、とりあえずソースをGitHubに置いておきました。

https://github.com/cero-t/lambda-benchmark


実行結果
回数Java(1)Java(2)Node.jsGo
1回目218006700900600
2回目13007300800500
3回目19000500200500
4回目10001300200400
5回目500200400500
6回目400100200500
7回目400300400500

時間はいずれもミリ秒のBilled duration


Java(1) : メモリ192MB。処理中にAmazonDynamoDBClientを初期化

Java(2) : メモリ192MB。処理前に(コンストラクタで)AmazonDynamoDBClientを初期化

Node.js : メモリ128MB

Go : メモリ128MB


メモリの実使用量は

Java : 68MB

Node.js : 29MB

Go : 15MB

でした。


考察など

正味の話、Lambdaで行っている処理などほとんどないので、性能的には大差ないかと思っていたのですが、思ったより特性が出ました。これは処理時間というよりは、関連ライブラリのロード時間な気がしますね。


Javaは最初の数回が20秒、実装を改善しても7秒とかなり遅かったのですが、ウォーミングアップが終わった後は200msぐらいまで早くなりました。呼ばれる頻度が高い処理であれば良いのでしょうけど、たまに呼ばれるような処理では、この特性はネックになる気がします。


ちなみにJavaだけは192MBで計測しましたが、最初にメモリ128MBで試したところ、30秒ぐらいで処理が強制的に中断されたり、60秒でタイムアウトするなど、計測になりませんでした。こういう所を見ても、Javaを使う時にはメモリを多め(CPU性能はメモリと比例して向上)にしなくてはいけない感じでした。


Node.jsも少しはウォーミングアップで性能が変わりますが、最初から性能は良い感じです。


Goを使った場合は性能が安定しており、ウォーミングアップしても性能が変わりません。メモリ使用量が少ないのも良いですね。Node.jsから呼び出す際のオーバーヘッドがあるせいか、性能的にはウォーミングアップ後のJavaNode.jsに一歩劣る感じでした。


なお、Pythonの実装をしてくれる人がいらっしゃれば、プルリクしていただければ、データを追加したいと思います。


結論

繰り返しになりますが、雑なベンチマークなので特性の一部しか掴んでないと思います。

そもそもJavaだけメモリが1.5倍になっている(=CPU性能も1.5倍になっている)ので公平ではないですし。


ただ「たまに行う処理」を「少ないリソース」で行うという観点では、Goで実装するのが良さそうです。GoはそのうちLambdaでも正式サポートされそうですしね。

きちんとしたベンチマークは、また別の機会にじっくり行ってみたいと思います。


ということで、Goを使う大義名分ができたベンチマークでした!

See you!

2014-12-15

[][]SqlTemplateっていうJdbcTemplateのラッパーを作ってみました。

SQLが書きたいんや!」という想いのもと、

Spring Bootと組み合わせて簡単に使える、

JdbcTemplateのラッパーライブラリを勢いで作ってみました。


GitHubに置いています。

https://github.com/cero-t/sqltemplate


JdbcTemplate / NamedParameterJdbcTemplateをベースにして、

 1. SQLファイルが使えること

 2. Date and Time APIに対応すること

 3. publicフィールドに対応すること

 4. APIが今風であること

の4つを目的にして作りました。


それならMirageでいいんじゃね? という想いは消えませんが、

Spring標準機能のみを使うことによる、政治的な使いやすさを取りました。


作りましたって言っても、ただのラッパーですので

ソースコードはすっごく小さくて、空行とコメントを入れても600行ぐらいしかありません。

ジェバンニでなくとも一晩でやってくれるぐらいのサイズです。


利用イメージ

exampleのプロジェクトも作っておきました。

https://github.com/cero-t/sqltemplate/tree/master/sqltemplate-example


使う側のソースコードは、こんな感じになります。

@Component
public class SampleProcess {
    @Autowired
    SqlTemplate query;

    public void process() {
        List<Emp> emps = query.forList("sql/selectAll.sql", Emp.class);
        emps.forEach(e -> System.out.println(e.ename));

        Emp emp = query.forObject("sql/selectByEmpno.sql", Emp.class, 7839);
        System.out.println(emp.ename);

        Map<String, Object> condition = new HashMap<>();
        condition.put("deptno", 30);
        condition.put("job", "SALESMAN");
        emps = query.forList("sql/selectByCondition.sql", Emp.class, condition);
        emps.forEach(e -> System.out.println(e.ename));
    }
}

forObjectで1件検索、forListで複数件検索。

第一引数がSQLファイル名で、第二引数が戻り値の型、

第三引数以降がSQLにバインドするパラメータです。


ちなみにIntelliJを使っていると、ファイル名にカーソルをあわせて

Ctrl (Command) + クリックでSQLファイルを開けるのが嬉しいですね。


SQLファイルは、こんな感じになります。

select
    *
from
    emp
inner join dept
    on emp.deptno = dept.deptno
where
    dept.deptno = :deptno
    and emp.job = :job

第三引数に指定したMapやEntityの値を、SQLのパラメータとしてバインドします。

内部的にはNamedParameterJdbcTemplateに処理を委譲しているだけです。


名前を指定せずに ? を使うこともできます。

select
    *
from
    emp
where
    empno = ?

第三引数以降に指定した任意の数の基本型(String、Date、Number)をバインドします。

こちらは内部的にJdbcTemplateに委譲しているだけです。


使い方

使うための設定は一つだけ。

@Configurationアノテーションをつけたクラスに

SqlTemplateを返すメソッドを作り、@Beanアノテーションをつけます。

@Bean
SqlTemplate sqlTemplate(JdbcTemplate jdbcTemplate, NamedParameterJdbcTemplate namedParameterJdbcTemplate) {
    return new SqlTemplate(jdbcTemplate, namedParameterJdbcTemplate);
}

この初期化の仕方は @ がpullリクエストで教えてくれました。

ありがとう!


ちなみにSqlTemplateのコンストラクタの第三引数には、

SQLファイルを読み込む際のテンプレートエンジンを指定することができます。

たとえばここで2-way SQLパーサーを指定すれば、

2-way SQLにも対応できるというスンポーです。


現時点でもFreeMarkerを使うことができるようにしているんですが、

一度も動作確認してないので、動くかどうか分かりません。てへぺろ。


いまあるAPI一覧

用意したメソッドの一覧は、以下になります。

<T> T forObject(String fileName, Class<T> clazz, Object... args)
<T> T forObject(String fileName, Class<T> clazz, Map<String, Object> params)
<T> T forObject(String fileName, Class<T> clazz, Object entity)

<T> List<T> forList(String fileName, Class<T> clazz, Object... args)
<T> List<T> forList(String fileName, Class<T> clazz, Map<String, Object> params)
<T> List<T> forList(String fileName, Class<T> clazz, Object entity)

int update(String fileName, Map<String, Object> params)
int update(String fileName, Object entity)
int update(String fileName, Object... args)

戻り値をMapにする「forMap」とか

実案件では欠かせない「batchUpdate」は、まだ作っていません。

委譲するだけなので、作っちゃえばいいんですけどね。


制限事項的なやつ

とりあえずコンセプト実証した程度なので、色々できません。

 1. 上に書いた通り、forMapとbatchUpdateがありません。

 2. JSR-310を使っているので、Java8でしか動きません。

 3. publicフィールドのないgetter/setterベースのJavaBeansは使えません。

 4. パッケージ名が変です。

 5. mvnリポジトリに置いてないです。

 6. README.mdちゃんと書け。


ひとまずは「こんなコンセプトでサクッとできたよ! 」っていう位置づけです。

ご自由に参考にしてください!

2014-12-14

[][]MyBatisをやめて、JdbcTemplateを使うわ。

以前のエントリーで、DBアクセスにはMyBatisを選んだと書きました。

http://d.hatena.ne.jp/cero-t/20141212/1418339302


そしたら渋谷JavaのLTで @ さんに拾ってもらっちゃいました。

http://www.slideshare.net/yukung/j-ooq-shibuyajava9


そんなこともあってMyBatisイチオシなエンジニアに思われたかも知れませんが、

ごめんなさい、

あの記事はあくまでも伏線で、僕、もうMyBatis使ってないんです!


MyBatisを使って半月ぐらいして、

どうにも我慢できなくないことが出てきました。


1. Spring Bootとの連携がイマイチ

Spring BootでMyBatisを使おうとすると、

前回のエントリーで書いた通り、ちょっと設定ファイルが必要になったり、

その設定ファイルの読み込みに失敗して、謎の無限ループが起きることがあるなど、

やや不可解なことがあります。


設定ファイルの問題というなら、設定ファイルを使わず、

ソースコードだけで設定できれば良いのですが、その方法も、結局よく分かりませんでした。


まぁMyBatisAutoConfigurationとかができてからが本番というか、

ないなら自分で作るゾ、ぐらいの勢いで挑む必要があるように思いました。


2. XMLSQLを書くと、インデントががが。

じゃぁMyBatisAutoConfigurationを作れば良いわけですが、

そういう気になれなかったのは、やっぱり、

XMLファイルにSQLを書くのが嫌だったから、でした。


だって、標準的なフォーマッタでフォーマットした瞬間、

インデントが全部消えるじゃないですか。


自動フォーマットをこよなく愛する僕としては、

フォーマットする手段がないというのは、いただけませんでした。


3. そこでJdbcTemplateですよ

じゃぁ何を使ってるのか?

で、結局、Springに標準でついているJdbcTemplateを使っています。


無設定で使えて、値のバインドは適切にできて、

NamedParameterJdbcTemplateならSQL変数が使えて、

Entityにはアノテーションとか付けなくて良くて、

余計な機能はなくて、シンプルに使える感じでした。


ただ、もちろん、JdbcTemplateも欠点だらけです。

 ・publicフィールドに対応していない

 ・Java8のLocalDateなど、Date and Time API (JSR-310) に対応していない

 ・RowMapperを求めるAPIになっているなど、APIに古くさいものが混ざっている

 ・そもそも、SQLファイルを読み込む機能なんてない!


なので、これらを補うような

独自ラッパーでラッピングして使うことにしました。


詳しい紹介はまた改めて書くとして、モノはここに置いています。

https://github.com/cero-t/sqltemplate


独自ライブラリではなく、Spring標準のJdbcTemplateを

ちょっとだけラッピングして使っているだけなので、

政治的な意味で使いやすいかな、と思っています。