谷本 心 in せろ部屋 このページをアンテナに追加 RSSフィード

2016-01-06

[]AWS Lambda + Javaは、なぜ1回目と3回目の処理が重いのか?

以前のエントリーで、AWS LambdaでJavaを使ってDynamoDBを呼び出した際に、初回起動にとても時間が掛かったという話を書きました。

http://d.hatena.ne.jp/cero-t/20160101/1451665326


今回は、この辺りの原因をもう少し追求してみます。


なぜ1回目と3回目のアクセスが遅いのか?

AWS Lambdaの中身はよく知りませんが、おそらく、アップロードしたモジュールTomcatみたいなコンテナとして起動させて、外部からコールしているんだろうと予想しました。それであれば、2回目以降のアクセスが早くなることは理解ができます。

ただ、1回目と3回目だけが極端に遅くて、2回目、4回目以降は早くなるというところは腑に落ちません。


その辺りを調べるべく、staticなカウンタを使って、値がどんな風に変化するかを調べてみました。

こんなソースコードです。

public class Pid {
    static AtomicLong counter = new AtomicLong();

    public String myHandler() {
        long count = counter.incrementAndGet();
        String name = ManagementFactory.getRuntimeMXBean().getName();
        System.out.println("Name: " + name);
        System.out.println("Count: " + count);
        return "SUCCESS";
    }
}

出力された結果は、次のようになりました。

回数NameCount
1回目1@ip-10-0-aaa-bbb.ap-northeast-1.compute.internal1
2回目1@ip-10-0-aaa-bbb.ap-northeast-1.compute.internal2
3回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal1
4回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal2
5回目1@ip-10-0-aaa-bbb.ap-northeast-1.compute.internal3
6回目1@ip-10-0-aaa-bbb.ap-northeast-1.compute.internal4
7回目1@ip-10-0-aaa-bbb.ap-northeast-1.compute.internal5
8回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal3
9回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal4
10回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal5
11回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal6
12回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal7

サーバIPアドレスが2種類あり、それぞれのサーバで、1から順番にカウントアップしていることが分かります。

なるほど、2台のサーバでロードバランシングしているのだと。そのため、それぞれのサーバの初回起動である、1回目と3回目の処理に時間が掛かるのですね。なかなか納得いく結果でした。

ちなみにロードバランシングは毎回このような結果になるわけではなく、1回目と2回目がそれぞれ別のサーバに行く(=処理に時間が掛かる)こともあります。


どんなコンテナを使っているのか?

先ほど「Tomcatみたいなコンテナ」を使っているんじゃないかと推測しましたが、実際、どんなコンテナを使っているのでしょうか。スレッドダンプを取って、確かめてみました。


こんなコードです。

public class StackTrace {
    public String myHandler() {
        new Exception("For stack trace").printStackTrace();
        Arrays.stream(ManagementFactory.getThreadMXBean().dumpAllThreads(true, true))
                .forEach(System.out::println);
        return "SUCCESS";
    }
}

結果、こうなりました。

java.lang.Exception: For stack trace
	at cero.ninja.aws.analyze.StackTrace.myHandler(StackTrace.java:8)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at lambdainternal.EventHandlerLoader$PojoMethodRequestHandler.handleRequest(EventHandlerLoader.java:434)
	at lambdainternal.EventHandlerLoader$PojoHandlerAsStreamHandler.handleRequest(EventHandlerLoader.java:365)
	at lambdainternal.EventHandlerLoader$2.call(EventHandlerLoader.java:967)
	at lambdainternal.AWSLambda.startRuntime(AWSLambda.java:231)
	at lambdainternal.AWSLambda.<clinit>(AWSLambda.java:59)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at lambdainternal.LambdaRTEntry.main(LambdaRTEntry.java:93)

"Signal Dispatcher" Id=4 RUNNABLE

"Finalizer" Id=3 WAITING on java.lang.ref.ReferenceQueue$Lock@19bb089b
	at java.lang.Object.wait(Native Method)
(略)

"Reference Handler" Id=2 WAITING on java.lang.ref.Reference$Lock@4563e9ab
	at java.lang.Object.wait(Native Method)
(略)

"main" Id=1 RUNNABLE
	at sun.management.ThreadImpl.dumpThreads0(Native Method)
	at sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:446)
	at cero.ninja.aws.analyze.StackTrace.myHandler(StackTrace.java:9)
(略)

何やらシンプルな独自コンテナを使っているみたいです。何度か実行してみても結果は同じでした。

ソースコードがないので推測になりますが、アップロードされたzipをロードして起動する独自コンテナがあり、外部からAPIコールされた際にAWSLambdaクラスあたりが処理を受け取って、zip内のハンドラを呼び出しているのでしょう。


なぜコンストラクタで処理すると早いの?

そういえば、もう一つ、謎な挙動がありました。

それは、Credentialsを取るという重めの処理をコンストラクタで実行すれば、処理時間がかなり短くなるというものです。


ハンドラの中でCredentialsを取ると、実測値で24秒ぐらい、課金対象値で22秒ぐらいでした。

コンストラクタでCredentialsを取っておくと、実測値で8秒ぐらい、課金対象値で6秒ぐらいでした。

ここでいう実測値とは、手元のストップウォッチを使って計測したという意味です。


ここから推測できることは、コンストラクタは事前に処理されていて、そこは課金対象外になるのかも知れません。


・・・あれ、それなら、コンストラクタで重い処理をがっつり走らせて、ハンドラでその結果を取り出せば、課金額を抑えられるじゃないですか?

ということで、ハンドラの中で10秒スリープする場合と、コンストラクタスリープした場合の比較をしてみました。


こんな2つのクラスで試してみます。

public class Wait1 {
    static long origin = System.currentTimeMillis();

    public String myHandler() {
        try {
            System.out.println("Before wait: " + (System.currentTimeMillis() - origin));
            Thread.sleep(10000);
            System.out.println("After wait: " + (System.currentTimeMillis() - origin));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "SUCCESS";
    }
}
public class Wait2 {
    static long origin = System.currentTimeMillis();

    public Wait2() {
        try {
            System.out.println("Before wait: " + (System.currentTimeMillis() - origin));
            Thread.sleep(10000);
            System.out.println("After wait: " + (System.currentTimeMillis() - origin));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String myHandler() {
        System.out.println("Called: " + (System.currentTimeMillis() - origin));
        return "SUCCESS";
    }
}

結果は、、、


Wait1(ハンドラの中でsleep)

回数Before waitAftre wait課金対象値実測値
1回目3901039010100ms10秒
2回目3871038710100ms10秒
3回目209173091710100ms10秒
4回目688527885210100ms10秒

きっちり10秒sleepして、課金対象値もそのオーバーヘッド分ぐらい。ストップウォッチで計測した値も同じく10秒ぐらいになりました。


Wait2(コンストラクタでsleep)

回数Before waitAftre wait課金対象値実測値
1回目3381033814800ms25秒
2回目--100ms1秒以下
3回目2611035814800ms25秒
4回目--100ms1秒以下

えーっ、sleepは10秒だったのに、なぜか15秒分ぐらい課金されてしまい、ストップウォッチで計測すると25秒と、えらく時間が掛かりました。これは謎な挙動です。

2回目や4回目ではインスタンス生成が終わっているので、Before waitやAfter waitが出力されず、処理がすぐに終わるというのは納得ですが。


どうして、こんなことが起きるんでしょうか。

不思議に思って、CloudWatch Logsのログを確認してみると・・・

Before wait: 17 
START RequestId: 34ae18c3-b47b-11e5-858f-272ee689265f Version: $LATEST 
Before wait: 249 
After wait: 10250 
Called: 14408
END RequestId: 34ae18c3-b47b-11e5-858f-272ee689265f 
REPORT RequestId: 34ae18c3-b47b-11e5-858f-272ee689265f	Duration: 14531.57 ms	Billed Duration: 14600 ms Memory Size: 128 MB Max Memory Used: 27 MB	

最初のBefore waitの後にAfter waitがなく、Lambdaの処理がSTARTした後に再度Before waitが呼ばれ、After waitした後に、4秒ほど待ってから、ハンドラ処理が実行されてCalledが呼ばれていました。

なるほど、つまりこういうことでしょうか。


1. コンストラクタが実行され、10秒sleepの途中でタイムアウトして強制的に処理が止められ、インスタンス生成を諦めた(プロセスごと破棄された?)

2. 改めてコンストラクタが実行され、10秒sleepした。

3. AWS Lambda内の処理か何かで4秒ぐらい処理が掛かった。

4. ハンドラが実行された。

5. 2〜4の間が課金対象となり、15秒弱となった。

6. ストップウォッチで計測した時間は1から5までの間なので、25秒弱となった。


要するに、コンストラクタで重い処理を行うような悪いことを考える人への対策として、コンストラクタは一定時間で(おそらく10秒きっかりで)タイムアウトして、いったんプロセスは破棄される。

その後、改めてコンストラクタの処理がタイムアウト関係なく実行されたうえで、AWS Lambdaの内部処理と、ハンドラ処理が行われ、すべての処理が課金対象となる、ということころでしょうか。


コンストラクタの処理が短い場合は、どうなるの?

ということで、sleepの時間を短くして、再挑戦してみます。

public class Wait3 {
    static long origin = System.currentTimeMillis();

    public Wait3() {
        try {
            System.out.println("Before wait: " + (System.currentTimeMillis() - origin));
            Thread.sleep(2000);
            System.out.println("After wait: " + (System.currentTimeMillis() - origin));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String myHandler() {
        System.out.println("Called: " + (System.currentTimeMillis() - origin));
        return "SUCCESS";
    }
}

結果。

回数Before WaitAfter wait課金対象値実測値
1回目--100ms3秒
2回目--100ms1秒以内
3回目--100ms1秒以内

CloudWatch Logsでの出力

Before wait: 25 
After wait: 2026 
START RequestId: xxx Version: $LATEST 
Called: 2389 
END RequestId: xxx 
REPORT RequestId: xxx	Duration: 97.52 ms	Billed Duration: 100 ms Memory Size: 128 MB	Max Memory Used: 27 MB	 
START RequestId: yyy Version: $LATEST 
Called: 16385 
END RequestId: yyy 
REPORT RequestId: yyy	Duration: 6.04 ms	Billed Duration: 100 ms Memory Size: 128 MB	Max Memory Used: 27 MB	

今度はSTARTの前に、きちんとBefore waitもAfter waitも出力され、ハンドラ処理のみが課金対象となっていました。


まとめ

ここまでの話をまとめると、AWS Lambdaは・・・


1. 複数台のサーバで処理されるため、それぞれのサーバでの初回起動時には処理時間が掛かる。

2. 独自のコンテナを利用して、モジュールデプロイしている。

3. コンストラクタの処理が軽い場合は、ハンドラ内の処理だけが課金対象となる。

4. コンストラクタの処理が重い場合は、コンストラクタの処理 + 5秒弱 + ハンドラ内の処理が課金対象となる。


ということですね。


・・・とは言え、Credentialsの処理をコンストラクタで行った場合に、実測値まで早くなる辺りは、少しだけ不可解です。というか、Credentialsの取得処理が重いこと自体が不可解なのですが。

この辺りはもう少し追試験をしてみれば解析できそうですが、長くなるので、今回はこの辺りまでにしたいと思います。


いやー、Lambdaさん、なかなかよく考えられてますね!

2016-01-04

[]続けて、JavaNode.jsとGoで外部ライブラリを使わないベンチマークをしてみた。

前回の記事では、DynamoDBを呼び出す処理の簡単なベンチマークを行いました。

http://d.hatena.ne.jp/cero-t/20160101/1451665326


ライブラリを伴ったときの初回起動ではJavaが不利な感じの結果になりましたが、ライブラリを使わなければどのような差が出るのか、改めて確認してみました。


今回のベンチマークは、フィボナッチ数列の38番目を取るというものです。

ソースコードは前回と同じリポジトリに追加しています。

https://github.com/cero-t/lambda-benchmark


結果
回数JavaNode.jsGo
1回目2.9199.284.14
2回目2.6789.3024.141
3回目2.5799.3964.14

時間はいずれも秒。ミリ秒より下の精度は切り捨て。


今回はいずれもメモリ128MBで、同じ性能の環境で実施しました。

なおBilled Durationは、ほぼこの結果の2倍程度(=ウォームアップのための実行分)になっていて、オーバーヘッドはあまり感じられませんでした。


メモリの実使用量は

Java : 12MB

Node.js : 9MB

Go : 10MB

でした。


Python書けないマンなので、Pythonは計測していません。

他のサイトのベンチマークを見る限りは、数十秒から数分ぐらい掛かりそうな気がします。

考察

Javaが一番早く、Goはその1.5倍ぐらい、Node.jsJavaの3倍ぐらい時間が掛かるという結果でした。

ウォームアップをもうちょっと取ればJavaJITコンパイルが効くのかも知れませんが、Lambdaで動かすという性質上、本来ならウォームアップなしで動かしたいぐらいなので、これぐらいの比較で十分かと思います。


ちなみに僕の手元のMacBook Pro(Late 2013 / Core i5 2.4GHz)だと、Javaで0.28秒ぐらい、Goで0.33秒ぐらいでしたから、メモリ128MBのLambdaの性能はその1/10ぐらいということになりますね。


もちろん今回は対象がフィボナッチ数の計算という一つの処理なので、結果的にJavaが良かっただけかも知れません。様々なアルゴリズムで言語のベンチマークを行っているサイトでは、Goの方が有利な結果もいくつか出ています。

https://benchmarksgame.alioth.debian.org/u64q/go.html


ということで、ライブラリのローディングさえなければ、Javaだって悪くないということが分かりました。

まぁ実際、ライブラリを使わないことなんて、ほぼ考えられないですけどね!

2016-01-01

[]AWS LambdaでJavaNode.jsとGoの簡易ベンチマークをしてみた。

あけましておめでとうございます!

現場でいつも締め切りに追われデスマーチ化している皆様方におかれましては、年賀状デスマーチ化していたのではないかと憂慮しておりますが、いかがお過ごしですか。

エンジニアの鑑みたいな僕としましては、年始の挨拶はSNSでサクッと済ませ、年末年始は紅白など見ながらAWS Lambdaのソースコードを書いていました。


ということで、Lambda。

Lambdaを書く時に最初に悩むのは、どの言語を選択するか、なのです。


まず手を出すのは、サクッと書けるNode.js

ただNode.jsの非同期なプログラミングになかなか馴染めず、わからん殺しをされてうんざりしていました。みんなどうしているのよとtwitterで問いかけたところ @ からPromiseを使うべしと教わり、なるほど確かにこれは便利ですナと思いつつも、これが標準で使えないぐらいLambdaのNode.jsが古いところにまたうんざりしました。


では手に馴染んでるJavaを使ってみたらどうかと思ったら、メモリイーターだし、なんとなくパフォーマンスが悪い感じでした。詳しいことは後ほどベンチマーク結果で明らかになるわけですが。


それなら消去法で残ったPythonなのですが、僕マジPython触ったことないレベルであり、これは若者たちに任せることにしているので、Pythonも選択肢から消えて。


本来、Lambdaみたいな用途にはGoが向いているはずなのだけど、いつ使えるようになるんだろうなぁなどと思って調べてみたら、Node.jsからGoを呼び出すというテクを見つけて、こりゃいいやとなりました。

http://blog.0x82.com/2014/11/24/aws-lambda-functions-in-go/


ただGoを呼ぶにしてもNode.jsを経由するためのオーバーヘッドはあるわけで、じゃぁ実際にどれぐらいパフォーマンス影響があるのか、調べてみることにしました。


処理の中身

簡単に試したいだけだったので、数行で済む程度のごく簡単な処理を書いてベンチマークすることにしました。


1. 引数で渡されたJSONパースして、中身を表示する

2. DynamoDBから1件のデータを取得する


当初は1だけだったのですが、あまり性能に差が出なかったので2を追加した感じです。そんな経緯のベンチマークなので、実装も雑ですが、とりあえずソースをGitHubに置いておきました。

https://github.com/cero-t/lambda-benchmark


実行結果
回数Java(1)Java(2)Node.jsGo
1回目218006700900600
2回目13007300800500
3回目19000500200500
4回目10001300200400
5回目500200400500
6回目400100200500
7回目400300400500

時間はいずれもミリ秒のBilled duration


Java(1) : メモリ192MB。処理中にAmazonDynamoDBClientを初期化

Java(2) : メモリ192MB。処理前に(コンストラクタで)AmazonDynamoDBClientを初期化

Node.js : メモリ128MB

Go : メモリ128MB


メモリの実使用量は

Java : 68MB

Node.js : 29MB

Go : 15MB

でした。


考察など

正味の話、Lambdaで行っている処理などほとんどないので、性能的には大差ないかと思っていたのですが、思ったより特性が出ました。これは処理時間というよりは、関連ライブラリのロード時間な気がしますね。


Javaは最初の数回が20秒、実装を改善しても7秒とかなり遅かったのですが、ウォーミングアップが終わった後は200msぐらいまで早くなりました。呼ばれる頻度が高い処理であれば良いのでしょうけど、たまに呼ばれるような処理では、この特性はネックになる気がします。


ちなみにJavaだけは192MBで計測しましたが、最初にメモリ128MBで試したところ、30秒ぐらいで処理が強制的に中断されたり、60秒でタイムアウトするなど、計測になりませんでした。こういう所を見ても、Javaを使う時にはメモリを多め(CPU性能はメモリと比例して向上)にしなくてはいけない感じでした。


Node.jsも少しはウォーミングアップで性能が変わりますが、最初から性能は良い感じです。


Goを使った場合は性能が安定しており、ウォーミングアップしても性能が変わりません。メモリ使用量が少ないのも良いですね。Node.jsから呼び出す際のオーバーヘッドがあるせいか、性能的にはウォーミングアップ後のJavaNode.jsに一歩劣る感じでした。


なお、Pythonの実装をしてくれる人がいらっしゃれば、プルリクしていただければ、データを追加したいと思います。


結論

繰り返しになりますが、雑なベンチマークなので特性の一部しか掴んでないと思います。

そもそもJavaだけメモリが1.5倍になっている(=CPU性能も1.5倍になっている)ので公平ではないですし。


ただ「たまに行う処理」を「少ないリソース」で行うという観点では、Goで実装するのが良さそうです。GoはそのうちLambdaでも正式サポートされそうですしね。

きちんとしたベンチマークは、また別の機会にじっくり行ってみたいと思います。


ということで、Goを使う大義名分ができたベンチマークでした!

See you!

2015-12-12

[Java]Stream APIをつくろう

最近、上司からのパワハラではなく、部下からの「部下ハラ」というものがあると聞きますが、

それって要するにバックプレッシャー型ハラスメントでありリアクティブであるよなと思うこの頃ですが、

皆さんいかがお過ごしでしょうか。


さて、順調な滑り出しからスタートした本エントリーは Java Advent Calenda 2015 の12日目になります。


前日は最年少(?)OpenJDKコミッタの @ さんによる

DeprecatedがJDK9で変わるかもしれない件(JEP 277: Enhanced Deprecation)

明日は日本唯一のJava Championの @ さんのエントリーです。

なかなかエラいところに挟まれてしまったなという感じです!


では本題、

今日のテーマは「Stream APIをつくろう」です。


Stream APIって、使う側がよくフォーカスされるのですが、

作る側がどうなっているのか、あるいはその使いどころはどこなのか、というのはあまり情報がないように思います。


今日はその作る側にフォーカスしたいと思います。

少し長いエントリーですが、ゆっくりとおつきあいください。


おしながき

1. なぜStream APIを作りたいのか

2. お題:ディレクトリの直下にあるテキストファイルをまとめて Stream<String> として扱う

3. そもそもStream APIってどうやって作るの?

4. お題:AWS S3にあるテキストファイルをまとめて Stream<String> として扱う

5. まとめ、そして次回予告


1. なぜSteam APIを作りたいのか

もともとのきっかけは、AWSのS3上にある大量のログファイルを

elasticsearchに流し込む処理を書いたことでした。


ファイルサイズが膨大なのはもちろんのこと、ファイル数自体も大量ですから、

処理対象となるファイルをリストアップするだけでも大変という状況です。

なのでファイルリストを少しずつ作りつつ、

ファイルを開いて少し読んでは流し込むという、逐次処理をしなくてはいけません。


Java7までなら、うんたらHandlerを渡して処理をグルグル回すところですが

Java8なので、せっかくだからStream APIを使うことにしてみました。


ただ先にも書いたように、Stream APIを提供する側の処理はあまり情報がなかったため

@さんや@さん、@さんに色々と質問しながら勉強しました。

皆さん、ありがとうございました。今日はその辺りをまとめたいと思います。


ちなみに言葉の定義としてはStream APIを作るんじゃなくて

Stream APIのProducerを作るっていう方が正しいとは思うんですが

ぱっと見での分かりやすさのためにこうしました。


2. お題:ディレクトリの直下にあるテキストファイルをまとめて Stream<String> として扱う

まず手始めに、ローカルファイルにあるファイルを処理する方法から考えます。

特定のディレクトリの直下にテキストファイルが大量にあり、

ファイル名などは別に表示しなくても良いから、

テキストファイルを読み込んで、一行ずつ処理したいというシチュエーションです。


Files.listとFiles.linesをどう組み合わせるの?

ディレクトリ内にあるファイルの一覧を Stream<Path> として取得するのは Files.list メソッド、

ファイルを読み込んで Stream<String> として取得するのは Files.lines メソッドが使えます。

では、これを組み合わせる時は、どうしたら良いんでしょうか。


変換はmapメソッドだったよなーと思いつつ、こんな事をやると。

Stream<Stream<String>> stream = Files.list(path)
        .map(p -> {
            try {
                return Files.lines(p);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
stream.forEach(System.out::println);

Stream<Stream<String>> になり、恐ろしく期待外れの結果になります。

java.util.stream.ReferencePipeline$Head@421faab1
java.util.stream.ReferencePipeline$Head@2b71fc7e
java.util.stream.ReferencePipeline$Head@5ce65a89

ではどうするかというと、そう、みんな大好きflatMapです。

try (Stream<String> stream = Files.list(path)
        .flatMap(p -> {
            try {
                return Files.lines(p);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        })) {
    stream.forEach(System.out::println);
}

こうすれば無事に結果をStream<String>として取り出すことができます。


出力例はこんな感じ。

This is file1.
Hello world.
This is file 2.
This is file3.
Good bye world.

ちなみにFiles.listで作ったStreamはきちんとcloseする必要があるので

try-with-resourcesを使うのがオススメです。


flatMapは毎回closeを呼び出してくれる

ちなみにflatMapがスグレモノなところは、

flatMapメソッドの中で作ったStreamを、毎回必ずcloseしてくれるところです。


上の例ではFiles.linesを利用しているので、別にcloseする必要はないのですが、

たとえばBufferedReaderやInputStreamなどを作った場合などには、closeする必要があります。

そういう時は、flatMapメソッド内で作るStreamにonCloseを書いておけば、毎回呼び出されるのです。

try (Stream<String> stream = Files.list(path)
        .flatMap(s -> {
            try {
                BufferedReader reader = Files.newBufferedReader(s);
                return reader.lines().onClose(() -> { // (1)
                    try {
                        reader.close(); // (2)
                        System.out.println("closed."); // (3)
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        })) {
    stream.forEach(System.out::println);
}

(1) でStreamを作ったところにonCloseを書いて

(2) でBufferedReaderをcloseしています。

This is file1.
Hello world.
closed.
This is file 2.
closed.
This is file3.
Good bye world.
closed.

(3) で書いた「closed.」が3回表示され、きちんと毎回closeされていることが分かります。


この章のまとめ

1. Streamの要素からStreamに変換する時は、flatMapだ!

2. flatMapで作ったStreamは、きちんとcloseしてくれるぞ!


3. そもそもStream APIってどうやって作るの?

前の章ではflatMapが便利だという知見を得ました。

次は、そもそもStream APIってどうやって作れば良いのかを考えます。


恥ずかしながら全然知らなかったので、twitterで聞いてみたところ

@さんと @さんから、

Iteratorを作って、SpliteratorにしてStreamSupporに渡すと良いと教わりました。


この件については、BufferedReader.linesの実装が分かりやすすぎるので引用したいと思います。

public Stream<String> lines() {
    Iterator<String> iter = new Iterator<String>() { // (1)
        String nextLine = null;

        @Override
        public boolean hasNext() {
            if (nextLine != null) {
                return true;
            } else {
                try {
                    nextLine = readLine();
                    return (nextLine != null);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }

        @Override
        public String next() {
            if (nextLine != null || hasNext()) {
                String line = nextLine;
                nextLine = null;
                return line;
            } else {
                throw new NoSuchElementException();
            }
        }
    };
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
            iter, Spliterator.ORDERED | Spliterator.NONNULL), false); // (2)
}

(1) Iteratorを実装して

(2) Spliteratorsを使ってSpliteratorに変換し、StreamSupportを使ってStreamに変換しています。

これがStreamを作る時の、ひとつの王道のようですね。


この章のまとめ

1. ルークよ、Iteratorを使え


4. お題:AWS S3にあるテキストファイルをまとめて Stream<String> として扱う

Streamのつくりかたが分かったところで、

次は、AWS S3にあるテキストファイルを処理する方法を考えます。

と、その前に、

AWSになじみのない方がここで読んでしまうことを防ぐために説明したいのですが、

この章の目的は、AWS S3の操作がしたいのではなく、

Java標準APIでStream APIを作れないようなパターンで、どのように操作できるかを考えることです。


なのでAWSだけでなく、たとえばDBや外部Webサーバなどでも同じパターンが使えますので、

AWSはあくまで一例として捉えてもらえればと思います。


今回利用するクラス

まず簡単に、AWS S3からファイルを取ってくる処理を実装するために必要なクラスを紹介します。


ちなみにAWS S3とはファイルストレージのサービスで、

ブラウザからのファイルアップロード / ダウンロードができる他にも

コマンドラインツール(AWS CLI)や、AWS SDK(AWS SDK for Javaなど)から操作ができます。

今回はAWS SDK for Javaを使うことにします


それで、使うクラスは以下の通りです。

  • AmazonS3Client
    • AWS S3にアクセスするためのクライアント。通信するくせにclose不要なやつ。
  • S3ObjectSummary
    • S3のファイル情報を持つクラス。Javaで言うPathやFileクラスに相当する。
  • ObjectListing
    • ファイル一覧取得結果のメタ情報を持つクラス。ここから List<S3ObjectSummary> を取り出すことができる。

あんまり多くないですね。


AWS S3へのアクセスを試す

まず簡単に、AWS S3にJavaからアクセスする方法を書いておきます。

型などがはっきり分かるよう、Stream APIを使わずに書きます。

AmazonS3Client client = new AmazonS3Client();
ObjectListing listing = client.listObjects("cero.ninja.public", "advent2015java"); // (1)
List<S3ObjectSummary> list = listing.getObjectSummaries(); // (2)

for (S3ObjectSummary s : list) {
    S3Object object = client.getObject(s.getBucketName(), s.getKey()); // (3)
    try (InputStream content = object.getObjectContent(); // (4)
         BufferedReader reader = new BufferedReader(new InputStreamReader(content))) {
        String line = reader.readLine();
        while (line != null) {
            System.out.println(line); // (5)
            line = reader.readLine();
        }
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

(1)「cero.ninja.public」という名前のバケットにある「advent2015java」というフォルダから情報を取り

(2) ファイル情報をListとして取り出して

(3) ファイルの中身を読み込んで

(4) InputStreamを取り出して

(5) 標準出力に表示しています。


簡単でしょう?

ただ、実は (1) のメソッド呼び出しは最大1000件までしか取得できないという制限があります。

1001件以降のデータを取るためには、別のメソッドを使う必要があります。

List<S3ObjectSummary> allList = new ArrayList<>(); // (6)

AmazonS3Client client = new AmazonS3Client();
ObjectListing listing = client.listObjects("cero.ninja.public", "advent2015java"); // (7)
List<S3ObjectSummary> list = listing.getObjectSummaries();

while (list.isEmpty() == false) {
    allList.addAll(list); // (8)
    listing = client.listNextBatchOfObjects(listing); // (9)
    list = listing.getObjectSummaries();
}

(6) Listを別に用意しておいて

(7) 最初の1000件を読み込み

(8) ファイル情報を (6) のListに入れながら

(9) 次の1000件を読む、を繰り返す


こんな風になります。

ただこれがたとえば10万件、100万件となってくると、Listがメモリを過剰に消費しますし

そもそも実際の処理を始めるまでのオーバーヘッドが大きくなりすぎます。


じゃぁwhileの中で処理しちゃえばいいやんか、という話になってきます。

AmazonS3Client client = new AmazonS3Client();
ObjectListing listing = client.listObjects("cero.ninja.public", "advent2015java");
List<S3ObjectSummary> list = listing.getObjectSummaries();

while (list.isEmpty() == false) {
    list.stream().forEach(this::doSomething); // (10)
    listing = client.listNextBatchOfObjects(listing);
    list = listing.getObjectSummaries();
}

(10) 取得した1000件を使ってすぐに処理を始めます。


これはこれで正解だと思いますしシンプルで良いのですが、

「S3から情報を取るクラスに処理を渡す」よりも

「S3から情報を取ってStreamにする」ほうが、何かと取り回しが楽なのですよね。


AWSへのアクセスをStreamにしよう

ということで、ここまで学んだ知識を利用して、

AWS S3にあるテキストファイルをまとめて Stream<String> にする方法を考えます。


まずはファイル一覧を取ってくる部分を、Iteratorにします。

public class S3Iterator implements Iterator<InputStream> {
    AmazonS3Client client;
    String bucketName;
    String key;

    ObjectListing listing;
    List<S3ObjectSummary> cache; // (1)

    public S3Iterator(String bucketName, String key) {
        this.client = new AmazonS3Client();
        this.bucketName = bucketName;
        this.key = key;
    }

    void load() {
        if (cache != null && cache.size() > 0) {
            return;
        }

        if (listing == null) {
            listing = client.listObjects(bucketName, key); // (2)
        } else {
            listing = client.listNextBatchOfObjects(listing); // (3)
        }

        cache = listing.getObjectSummaries(); // (4)
    }

    @Override
    public boolean hasNext() {
        if (cache != null && cache.size() > 0) {
            return true;
        }

        load();
        return cache.size() > 0;
    }

    @Override
    public InputStream next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }

        S3ObjectSummary summary = cache.remove(0); // (5)
        return client.getObject(summary.getBucketName(), summary.getKey()).getObjectContent(); // (6)
    }
}

(1) 読み込んだ1000件のファイル情報を保持しておくcacheを作っておき

(2) 初回はlistObjectsを使って読み込み

(3) 2回目以降はlistNextBatchOfObjectsを使って読み込みます。

(4) 読み込んだファイル情報はcacheに入れておき

(5) removeしながら取り出します。

(6) また、S3ObjectSummaryのStreamにはせず、InputStreamにしてから返します。

このようにすれば、Iteratorの外側でAmazonS3Clientを利用する必要がなくなるためです。


そして、このIteratorからStreamを作り、さらにflatMapを使ってStream<String>に集約しましょう。

Stream<InputStream> fileStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(
        new S3Iterator("cero.ninja.public", "advent2015java"), Spliterator.NONNULL), false); // (7)

Stream<String> stream = fileStream.flatMap(in -> { // (8)
    BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    return reader.lines() // (9)
            .onClose(() -> { // (10)
            reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    });
});

stream.forEach(System.out::println); // (11)

(7) IteratorからSpliteratorを作り、そこからStreamに変換します。これでファイル一覧が Stream<InputStream> になります。

(8) InputStreamから読み込んだ内容を集約するために、flatMapを使って

(9) BufferedReader.linesで作ったStream<String>を集約します。

(10) ここでonCloseを書いておくと、BufferedReaderが毎回きちんとcloseされます。flatMapさまさま。

(11) 最後に標準出力への書き出しをしていますが、もちろんこの Stream<String> はどのように扱うこともできます。


という感じで、AWS S3上にあるテキストファイルを Stram<String> にして

処理することができるようになりました。


gzipやzipだったらどうするの?

ちなみに大量のテキストファイルが存在する場合、たとえばそれがWebサーバのログだったりすると

たいていにおいてgzip圧縮されているかと思います。

そういう時は、読み込む時にGZIPInputStreamを使うと良いでしょう。


先に出した例とあまり変わらず、GZIPInputStreamを挟んだだけの形となります。

Stream<String> stream = fileStream.flatMap(in -> {
    BufferedReader reader;
    try {
        reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(in), StandardCharsets.UTF_8)); // (12)
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
    return reader.lines()
            .onClose(() -> {
                try {
                    reader.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
});

(12) のところにGZIPInputStreamを挟みました。


では、読み込む対象が、複数のファイルを持つようなzipファイルだった場合はどうすれば良いでしょうか。

実はこれが割と難敵で、zipで処理を回せるようなIteratorをもう一つ作る必要があります。


ZipInputStreamにStream<InputStream>を返すようなAPIを作ってくれれば良いのにと思うんですが

(ZipFileやFileSystemsならStreamを作りやすいんですが、ZipInputStreamからは作れない!)

そのあたりは自前で少し実装する必要があります。

さすがにエントリーが長くなってきたので、今回は割愛します。


Spliteratorを直接作っても良い

@さんから、Iteratorを実装するよりも

AbstractSpliteratorを実装したSpliteratorを作ったほうが楽だよという知見をもらったので

それを使ったサンプルも掲載しておきます。

public class S3Spliterator extends Spliterators.AbstractSpliterator<InputStream> {
    AmazonS3Client client;
    String bucketName;
    String key;

    List<S3ObjectSummary> cache;
    ObjectListing listing;

    public S3Spliterator(String bucketName, String key) {
        super(Long.MAX_VALUE, 0);
        this.client = new AmazonS3Client();
        this.bucketName = bucketName;
        this.key = key;
    }

    @Override
    public boolean tryAdvance(Consumer<? super InputStream> action) {
        if (cache == null) {
            listing = client.listObjects(bucketName, key);
            cache = listing.getObjectSummaries();
        } else if (cache.isEmpty()) {
            listing = client.listNextBatchOfObjects(listing);
            cache = listing.getObjectSummaries();
        }

        if (cache.isEmpty()) {
            return false;
        }

        S3ObjectSummary summary = cache.remove(0);
        S3ObjectInputStream in = client.getObject(summary.getBucketName(), summary.getKey()).getObjectContent();
        action.accept(in);
        return true;

    }
}

tryAdvanceメソッドをひとつ作れば良いだけですので、

hasNextやnextを実装する時のように、状態がどうなるかよく分からなくなって混乱することもありません。

確かに、Streamを作ることだけが目的となる場合は、こちらを使った方が楽になりそうですね。


念のため、これを使った処理も書いておきます。

Stream<InputStream> fileStream = StreamSupport.stream(
        new S3Spliterator("cero.ninja.public", "advent2015java"), false);

Stream<String> stream = fileStream.flatMap(in -> { // 以下略

StreamSupportに直接渡すだけで済む、ということですね。


この章のまとめ

1. 「1000件ずつ読み出して処理」みたいなやつは、IteratorにしてStreamにすれば処理しやすいよ!

2. AbstractSpliteratorを継承すると、Spliteratorを作りやすいよ!


5. まとめ、そして次回予告

では今回のまとめです。

今回のエントリーでは、Streamを作るところと、

その使いどころとしてAWS S3上の大量ファイルを例に挙げて説明しました。


Iteratorを作り、Spliteratorにして、そこからStreamを作る、

また、Stream.flatMapを使って複数リソースをシームレスに処理することができました。


Stream.flatMapを使うときちんとリソースのcloseができるのは、便利さがじわじわ効いてくる系のやつで、

たとえばStreamの代わりにIteratorを取り回そうとすると、リソースのcloseタイミングで困ることになります。

そういう点でも、Streamというのは取り回しやすい形なのかなと思います。


さて、長くなってきたので、今回のエントリーはここまでにしたいと思います。


でも実はもう一つ、言及したかったテーマがあります。

それが「Hot ProducerとCold Producer」というお話です。


JavaのStream APIは、どちらかと言えば「既に存在するListやMap」などに対する処理を

簡潔に書くところがクローズアップされているように思います。

これはCold Producer、あるいはCold Streamと呼ばれているそうです。


一方で、HTTPリクエストや、JMS、AMQP、あるいはTwitterのstreamなど、

いわゆるイベントを受け取るようなもの、あるいは無限Streamになるようなものは

Hot Producer、Hot Streamと呼ばれます。


JavaではHot Producerを扱う場合には、Listenerを実装する形が一般的ですが

Streamとして処理することもできるはずです。


Reactive StreamsがJava9に入るとか、バックプレッシャー型がどうしたと言われている中で、

このHot ProducerをStream APIを使って処理するような考え方が、

Javaにおいても、今後、重要になってくるはずです。

次回は、その辺りを考えてみたいと思います。


Stay metal,

See you!

2011-10-03 Day 1 - 期待外れとCollectionとHotRockit

[] HotRockitかわいいよ、HotRockit

HotRockit: What to Expect from Oracle's Converged JVM

新情報もありました。


VisualVMなどなかった(気づかなかった?)頃には、

詳しい障害情報が取れたり、パフォーマンスがGUIで見えるMission Controlは衝撃的で、

私自身、開発用のJDKとしてはHotSpotよりもJRockitの方を好んで使っていました。

なのでHotRockitにも注目と期待をしています。


さて、そんなHotRockitですが、以下の機能を持つとのこと。


1. JRCMDがJCMDに名前を変えて、利用できるようになる。

print_threads、print_memusage、print_object_summary、heap_diagnotics、start_flightrecordingなどなど。

元々、HotSpotでも別コマンドで実現できたものもありますが、

FlightRecorder系の新しいコマンドなども追加されることになります。


2. JDPも正式名称を変えて、利用できるようになる。

JDPは JRockit Discovery Protocol から Java Discovery Protocol に名前を変えて

HotRockitで使えるようになります。

ネットワーク上の管理可能なJVMを見つけるために使うプロトコルで、

既に動いてないJVMだって見つけることができるそうです。


3. MBeanもアップデート

JRockitのMBeanも、HotRockitで使えるようになります。

DiagnosticCommandMBean、ProfilingMBean、PerfCounterMBeanなど。


4. そして、Java FlightRecorder

JRocit Flight Recorder は Java Flight Recorder という名前になって利用ができます。

Flight Recorderは名前の通り、飛行機のブラックボックスのようなもので、

Javaプロセスを常時記録して(それでいてオーバーヘッドが少ない)

問題が起きた時などに情報を取り出すことができるツールです。

トラブルシュート屋として注目のツールですね。


5. Java Mission Controlも来ました

もう皆さんお気づきですが、これまでJRockitホゲホゲという名前だったツール群は

いずれもJavaホゲホゲという名前になってHotRockitでも使えるようになるのです。

このMission Controlも例外ではありません。

「VisualVMとの棲み分けは?」とは誰もが思う疑問ですが、今の所、明瞭な回答はありません。


6. Memleak Serverだって?

前のJJUG CCCでも紹介されていたそうですが、私は初見だった、Memleak Server。

Mission ControlからHeapの情報を部分的に見に行くような機能のようですね?

デモではHeapの親子関係を図示して、メモリリークの親を見つけるということをやっていました。

これまた、トラブルシュート現場で役立ちそうなツールですね。


その他にも、

HotRockitではPermGen領域がなくなるとか、リアルタイムGCが使えるようになる、

なども説明されちえました。


質疑応答では「リアルタイムGCとG1GC、どっちがメインなの?」という

質問が出ていましたが、まだ決まってないとのこと。

また、JRockit Virtual Editionの統合などもまだ決まっていないとの事でした。


以上のように、JRockitの要素がHotSpotに入ってくることで

特にトラブルシュートの手法に大きな影響(効率化)が出てくると思います。

僕も、今のうちからもうちょっと勉強しておこうと、改めて思わされました。

[] コレクションの細かい話、だが面白い!

Collections Gathering

これぞJavaOne、これぞBoFという内容。

もう、すーーっごい細かいCollectionの話なんですが、だから面白かったんです。


Java7でCollections#sortが改善され、

TimSortとDual Pivot Quicksortが導入されてソートが2〜3から10倍早くなりました。

知ってたらドヤ顔できる系の情報ですね!


Java7でList/TreeMap/TreeSetのIteratorがかなり早くなって、

ArrayListのforeach文(拡張for文)も、普通のfor文より早くなりました。


これまでは

「統一感のために、ArrayListもforeach文を使おうぜ」派と

「ArrayListだけはfor文使った方がパフォーマンスいいから、for文使うぜ」派に

分派していましたが、これからは皆さん、foreach派になってもらえそうです。


「ArrayListだけはfor文使った方がパフォーマンスいいから、俺はfor文使うぜ!」

「はぁ? お前Java7からforeach文の方がパフォーマンス良くなったの知らないの? 知識、中途半端じゃね?」

とかって言う機会があるかと考えただけで、今からワクワクします!

# これが僕の聞き間違いで、誤情報だったらどうしよう・・・。


その他、Java7で増えたいくつかのキューイング系のクラス、

ThreadLocalRandomクラスの紹介や、

ConcurrentHashMapのオーバーヘッドが少なくなったことなどが紹介されました。


そして、Java8のCollectionも一瞬でしたが、紹介がありました!


1. 既存のCollection系のメソッドを、Lambdaベースで書けるようになる。

list.removeAll(s -> s.length() > 20);

2. ソート処理の並列化

list.parallelSort(cmparing(Person::getAge));

3. コレクションの並列処理

int max = list.parallel()
	.filter(x -> x.age >= 21)
	.map(x -> x.score)
	.reduce(0, Math::max);

これホントにJavaなんですか? っていう感じですが。

Java8以降は、並列化の強化、そのためのLambda導入のおかげで

ずいぶんとコードの見栄えが変わってくることになりそうです。