谷本 心 in せろ部屋 このページをアンテナに追加 RSSフィード

2016-01-06

[]AWS Lambda + Javaは、なぜ1回目と3回目の処理が重いのか?

以前のエントリーで、AWS LambdaでJavaを使ってDynamoDBを呼び出した際に、初回起動にとても時間が掛かったという話を書きました。

http://d.hatena.ne.jp/cero-t/20160101/1451665326


今回は、この辺りの原因をもう少し追求してみます。


なぜ1回目と3回目のアクセスが遅いのか?

AWS Lambdaの中身はよく知りませんが、おそらく、アップロードしたモジュールTomcatみたいなコンテナとして起動させて、外部からコールしているんだろうと予想しました。それであれば、2回目以降のアクセスが早くなることは理解ができます。

ただ、1回目と3回目だけが極端に遅くて、2回目、4回目以降は早くなるというところは腑に落ちません。


その辺りを調べるべく、staticなカウンタを使って、値がどんな風に変化するかを調べてみました。

こんなソースコードです。

public class Pid {
    static AtomicLong counter = new AtomicLong();

    public String myHandler() {
        long count = counter.incrementAndGet();
        String name = ManagementFactory.getRuntimeMXBean().getName();
        System.out.println("Name: " + name);
        System.out.println("Count: " + count);
        return "SUCCESS";
    }
}

出力された結果は、次のようになりました。

回数NameCount
1回目1@ip-10-0-aaa-bbb.ap-northeast-1.compute.internal1
2回目1@ip-10-0-aaa-bbb.ap-northeast-1.compute.internal2
3回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal1
4回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal2
5回目1@ip-10-0-aaa-bbb.ap-northeast-1.compute.internal3
6回目1@ip-10-0-aaa-bbb.ap-northeast-1.compute.internal4
7回目1@ip-10-0-aaa-bbb.ap-northeast-1.compute.internal5
8回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal3
9回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal4
10回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal5
11回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal6
12回目1@ip-10-0-xxx-yyy.ap-northeast-1.compute.internal7

サーバIPアドレスが2種類あり、それぞれのサーバで、1から順番にカウントアップしていることが分かります。

なるほど、2台のサーバでロードバランシングしているのだと。そのため、それぞれのサーバの初回起動である、1回目と3回目の処理に時間が掛かるのですね。なかなか納得いく結果でした。

ちなみにロードバランシングは毎回このような結果になるわけではなく、1回目と2回目がそれぞれ別のサーバに行く(=処理に時間が掛かる)こともあります。


どんなコンテナを使っているのか?

先ほど「Tomcatみたいなコンテナ」を使っているんじゃないかと推測しましたが、実際、どんなコンテナを使っているのでしょうか。スレッドダンプを取って、確かめてみました。


こんなコードです。

public class StackTrace {
    public String myHandler() {
        new Exception("For stack trace").printStackTrace();
        Arrays.stream(ManagementFactory.getThreadMXBean().dumpAllThreads(true, true))
                .forEach(System.out::println);
        return "SUCCESS";
    }
}

結果、こうなりました。

java.lang.Exception: For stack trace
	at cero.ninja.aws.analyze.StackTrace.myHandler(StackTrace.java:8)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at lambdainternal.EventHandlerLoader$PojoMethodRequestHandler.handleRequest(EventHandlerLoader.java:434)
	at lambdainternal.EventHandlerLoader$PojoHandlerAsStreamHandler.handleRequest(EventHandlerLoader.java:365)
	at lambdainternal.EventHandlerLoader$2.call(EventHandlerLoader.java:967)
	at lambdainternal.AWSLambda.startRuntime(AWSLambda.java:231)
	at lambdainternal.AWSLambda.<clinit>(AWSLambda.java:59)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at lambdainternal.LambdaRTEntry.main(LambdaRTEntry.java:93)

"Signal Dispatcher" Id=4 RUNNABLE

"Finalizer" Id=3 WAITING on java.lang.ref.ReferenceQueue$Lock@19bb089b
	at java.lang.Object.wait(Native Method)
(略)

"Reference Handler" Id=2 WAITING on java.lang.ref.Reference$Lock@4563e9ab
	at java.lang.Object.wait(Native Method)
(略)

"main" Id=1 RUNNABLE
	at sun.management.ThreadImpl.dumpThreads0(Native Method)
	at sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:446)
	at cero.ninja.aws.analyze.StackTrace.myHandler(StackTrace.java:9)
(略)

何やらシンプルな独自コンテナを使っているみたいです。何度か実行してみても結果は同じでした。

ソースコードがないので推測になりますが、アップロードされたzipをロードして起動する独自コンテナがあり、外部からAPIコールされた際にAWSLambdaクラスあたりが処理を受け取って、zip内のハンドラを呼び出しているのでしょう。


なぜコンストラクタで処理すると早いの?

そういえば、もう一つ、謎な挙動がありました。

それは、Credentialsを取るという重めの処理をコンストラクタで実行すれば、処理時間がかなり短くなるというものです。


ハンドラの中でCredentialsを取ると、実測値で24秒ぐらい、課金対象値で22秒ぐらいでした。

コンストラクタでCredentialsを取っておくと、実測値で8秒ぐらい、課金対象値で6秒ぐらいでした。

ここでいう実測値とは、手元のストップウォッチを使って計測したという意味です。


ここから推測できることは、コンストラクタは事前に処理されていて、そこは課金対象外になるのかも知れません。


・・・あれ、それなら、コンストラクタで重い処理をがっつり走らせて、ハンドラでその結果を取り出せば、課金額を抑えられるじゃないですか?

ということで、ハンドラの中で10秒スリープする場合と、コンストラクタスリープした場合の比較をしてみました。


こんな2つのクラスで試してみます。

public class Wait1 {
    static long origin = System.currentTimeMillis();

    public String myHandler() {
        try {
            System.out.println("Before wait: " + (System.currentTimeMillis() - origin));
            Thread.sleep(10000);
            System.out.println("After wait: " + (System.currentTimeMillis() - origin));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "SUCCESS";
    }
}
public class Wait2 {
    static long origin = System.currentTimeMillis();

    public Wait2() {
        try {
            System.out.println("Before wait: " + (System.currentTimeMillis() - origin));
            Thread.sleep(10000);
            System.out.println("After wait: " + (System.currentTimeMillis() - origin));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String myHandler() {
        System.out.println("Called: " + (System.currentTimeMillis() - origin));
        return "SUCCESS";
    }
}

結果は、、、


Wait1(ハンドラの中でsleep)

回数Before waitAftre wait課金対象値実測値
1回目3901039010100ms10秒
2回目3871038710100ms10秒
3回目209173091710100ms10秒
4回目688527885210100ms10秒

きっちり10秒sleepして、課金対象値もそのオーバーヘッド分ぐらい。ストップウォッチで計測した値も同じく10秒ぐらいになりました。


Wait2(コンストラクタでsleep)

回数Before waitAftre wait課金対象値実測値
1回目3381033814800ms25秒
2回目--100ms1秒以下
3回目2611035814800ms25秒
4回目--100ms1秒以下

えーっ、sleepは10秒だったのに、なぜか15秒分ぐらい課金されてしまい、ストップウォッチで計測すると25秒と、えらく時間が掛かりました。これは謎な挙動です。

2回目や4回目ではインスタンス生成が終わっているので、Before waitやAfter waitが出力されず、処理がすぐに終わるというのは納得ですが。


どうして、こんなことが起きるんでしょうか。

不思議に思って、CloudWatch Logsのログを確認してみると・・・

Before wait: 17 
START RequestId: 34ae18c3-b47b-11e5-858f-272ee689265f Version: $LATEST 
Before wait: 249 
After wait: 10250 
Called: 14408
END RequestId: 34ae18c3-b47b-11e5-858f-272ee689265f 
REPORT RequestId: 34ae18c3-b47b-11e5-858f-272ee689265f	Duration: 14531.57 ms	Billed Duration: 14600 ms Memory Size: 128 MB Max Memory Used: 27 MB	

最初のBefore waitの後にAfter waitがなく、Lambdaの処理がSTARTした後に再度Before waitが呼ばれ、After waitした後に、4秒ほど待ってから、ハンドラ処理が実行されてCalledが呼ばれていました。

なるほど、つまりこういうことでしょうか。


1. コンストラクタが実行され、10秒sleepの途中でタイムアウトして強制的に処理が止められ、インスタンス生成を諦めた(プロセスごと破棄された?)

2. 改めてコンストラクタが実行され、10秒sleepした。

3. AWS Lambda内の処理か何かで4秒ぐらい処理が掛かった。

4. ハンドラが実行された。

5. 2〜4の間が課金対象となり、15秒弱となった。

6. ストップウォッチで計測した時間は1から5までの間なので、25秒弱となった。


要するに、コンストラクタで重い処理を行うような悪いことを考える人への対策として、コンストラクタは一定時間で(おそらく10秒きっかりで)タイムアウトして、いったんプロセスは破棄される。

その後、改めてコンストラクタの処理がタイムアウト関係なく実行されたうえで、AWS Lambdaの内部処理と、ハンドラ処理が行われ、すべての処理が課金対象となる、ということころでしょうか。


コンストラクタの処理が短い場合は、どうなるの?

ということで、sleepの時間を短くして、再挑戦してみます。

public class Wait3 {
    static long origin = System.currentTimeMillis();

    public Wait3() {
        try {
            System.out.println("Before wait: " + (System.currentTimeMillis() - origin));
            Thread.sleep(2000);
            System.out.println("After wait: " + (System.currentTimeMillis() - origin));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String myHandler() {
        System.out.println("Called: " + (System.currentTimeMillis() - origin));
        return "SUCCESS";
    }
}

結果。

回数Before WaitAfter wait課金対象値実測値
1回目--100ms3秒
2回目--100ms1秒以内
3回目--100ms1秒以内

CloudWatch Logsでの出力

Before wait: 25 
After wait: 2026 
START RequestId: xxx Version: $LATEST 
Called: 2389 
END RequestId: xxx 
REPORT RequestId: xxx	Duration: 97.52 ms	Billed Duration: 100 ms Memory Size: 128 MB	Max Memory Used: 27 MB	 
START RequestId: yyy Version: $LATEST 
Called: 16385 
END RequestId: yyy 
REPORT RequestId: yyy	Duration: 6.04 ms	Billed Duration: 100 ms Memory Size: 128 MB	Max Memory Used: 27 MB	

今度はSTARTの前に、きちんとBefore waitもAfter waitも出力され、ハンドラ処理のみが課金対象となっていました。


まとめ

ここまでの話をまとめると、AWS Lambdaは・・・


1. 複数台のサーバで処理されるため、それぞれのサーバでの初回起動時には処理時間が掛かる。

2. 独自のコンテナを利用して、モジュールデプロイしている。

3. コンストラクタの処理が軽い場合は、ハンドラ内の処理だけが課金対象となる。

4. コンストラクタの処理が重い場合は、コンストラクタの処理 + 5秒弱 + ハンドラ内の処理が課金対象となる。


ということですね。


・・・とは言え、Credentialsの処理をコンストラクタで行った場合に、実測値まで早くなる辺りは、少しだけ不可解です。というか、Credentialsの取得処理が重いこと自体が不可解なのですが。

この辺りはもう少し追試験をしてみれば解析できそうですが、長くなるので、今回はこの辺りまでにしたいと思います。


いやー、Lambdaさん、なかなかよく考えられてますね!

2015-12-12

[Java]Stream APIをつくろう

最近、上司からのパワハラではなく、部下からの「部下ハラ」というものがあると聞きますが、

それって要するにバックプレッシャー型ハラスメントでありリアクティブであるよなと思うこの頃ですが、

皆さんいかがお過ごしでしょうか。


さて、順調な滑り出しからスタートした本エントリーは Java Advent Calenda 2015 の12日目になります。


前日は最年少(?)OpenJDKコミッタの @ さんによる

DeprecatedがJDK9で変わるかもしれない件(JEP 277: Enhanced Deprecation)

明日は日本唯一のJava Championの @ さんのエントリーです。

なかなかエラいところに挟まれてしまったなという感じです!


では本題、

今日のテーマは「Stream APIをつくろう」です。


Stream APIって、使う側がよくフォーカスされるのですが、

作る側がどうなっているのか、あるいはその使いどころはどこなのか、というのはあまり情報がないように思います。


今日はその作る側にフォーカスしたいと思います。

少し長いエントリーですが、ゆっくりとおつきあいください。


おしながき

1. なぜStream APIを作りたいのか

2. お題:ディレクトリの直下にあるテキストファイルをまとめて Stream<String> として扱う

3. そもそもStream APIってどうやって作るの?

4. お題:AWS S3にあるテキストファイルをまとめて Stream<String> として扱う

5. まとめ、そして次回予告


1. なぜSteam APIを作りたいのか

もともとのきっかけは、AWSのS3上にある大量のログファイルを

elasticsearchに流し込む処理を書いたことでした。


ファイルサイズが膨大なのはもちろんのこと、ファイル数自体も大量ですから、

処理対象となるファイルをリストアップするだけでも大変という状況です。

なのでファイルリストを少しずつ作りつつ、

ファイルを開いて少し読んでは流し込むという、逐次処理をしなくてはいけません。


Java7までなら、うんたらHandlerを渡して処理をグルグル回すところですが

Java8なので、せっかくだからStream APIを使うことにしてみました。


ただ先にも書いたように、Stream APIを提供する側の処理はあまり情報がなかったため

@さんや@さん、@さんに色々と質問しながら勉強しました。

皆さん、ありがとうございました。今日はその辺りをまとめたいと思います。


ちなみに言葉の定義としてはStream APIを作るんじゃなくて

Stream APIProducerを作るっていう方が正しいとは思うんですが

ぱっと見での分かりやすさのためにこうしました。


2. お題:ディレクトリの直下にあるテキストファイルをまとめて Stream<String> として扱う

まず手始めに、ローカルファイルにあるファイルを処理する方法から考えます。

特定のディレクトリの直下にテキストファイルが大量にあり、

ファイル名などは別に表示しなくても良いから、

テキストファイルを読み込んで、一行ずつ処理したいというシチュエーションです。


Files.listとFiles.linesをどう組み合わせるの?

ディレクトリ内にあるファイルの一覧を Stream<Path> として取得するのは Files.list メソッド

ファイルを読み込んで Stream<String> として取得するのは Files.lines メソッドが使えます。

では、これを組み合わせる時は、どうしたら良いんでしょうか。


変換はmapメソッドだったよなーと思いつつ、こんな事をやると。

Stream<Stream<String>> stream = Files.list(path)
        .map(p -> {
            try {
                return Files.lines(p);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
stream.forEach(System.out::println);

Stream<Stream<String>> になり、恐ろしく期待外れの結果になります。

java.util.stream.ReferencePipeline$Head@421faab1
java.util.stream.ReferencePipeline$Head@2b71fc7e
java.util.stream.ReferencePipeline$Head@5ce65a89

ではどうするかというと、そう、みんな大好きflatMapです。

try (Stream<String> stream = Files.list(path)
        .flatMap(p -> {
            try {
                return Files.lines(p);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        })) {
    stream.forEach(System.out::println);
}

こうすれば無事に結果をStream<String>として取り出すことができます。


出力例はこんな感じ。

This is file1.
Hello world.
This is file 2.
This is file3.
Good bye world.

ちなみにFiles.listで作ったStreamはきちんとcloseする必要があるので

try-with-resourcesを使うのがオススメです。


flatMapは毎回closeを呼び出してくれる

ちなみにflatMapがスグレモノなところは、

flatMapメソッドの中で作ったStreamを、毎回必ずcloseしてくれるところです。


上の例ではFiles.linesを利用しているので、別にcloseする必要はないのですが、

たとえばBufferedReaderやInputStreamなどを作った場合などには、closeする必要があります。

そういう時は、flatMapメソッド内で作るStreamにonCloseを書いておけば、毎回呼び出されるのです。

try (Stream<String> stream = Files.list(path)
        .flatMap(s -> {
            try {
                BufferedReader reader = Files.newBufferedReader(s);
                return reader.lines().onClose(() -> { // (1)
                    try {
                        reader.close(); // (2)
                        System.out.println("closed."); // (3)
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        })) {
    stream.forEach(System.out::println);
}

(1) でStreamを作ったところにonCloseを書いて

(2) でBufferedReaderをcloseしています。

This is file1.
Hello world.
closed.
This is file 2.
closed.
This is file3.
Good bye world.
closed.

(3) で書いた「closed.」が3回表示され、きちんと毎回closeされていることが分かります。


この章のまとめ

1. Streamの要素からStreamに変換する時は、flatMapだ!

2. flatMapで作ったStreamは、きちんとcloseしてくれるぞ!


3. そもそもStream APIってどうやって作るの?

前の章ではflatMapが便利だという知見を得ました。

次は、そもそもStream APIってどうやって作れば良いのかを考えます。


恥ずかしながら全然知らなかったので、twitterで聞いてみたところ

@さんと @さんから、

Iteratorを作って、SpliteratorにしてStreamSupporに渡すと良いと教わりました。


この件については、BufferedReader.linesの実装が分かりやすすぎるので引用したいと思います。

public Stream<String> lines() {
    Iterator<String> iter = new Iterator<String>() { // (1)
        String nextLine = null;

        @Override
        public boolean hasNext() {
            if (nextLine != null) {
                return true;
            } else {
                try {
                    nextLine = readLine();
                    return (nextLine != null);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }

        @Override
        public String next() {
            if (nextLine != null || hasNext()) {
                String line = nextLine;
                nextLine = null;
                return line;
            } else {
                throw new NoSuchElementException();
            }
        }
    };
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
            iter, Spliterator.ORDERED | Spliterator.NONNULL), false); // (2)
}

(1) Iteratorを実装して

(2) Spliteratorsを使ってSpliteratorに変換し、StreamSupportを使ってStreamに変換しています。

これがStreamを作る時の、ひとつの王道のようですね。


この章のまとめ

1. ルークよ、Iteratorを使え


4. お題:AWS S3にあるテキストファイルをまとめて Stream<String> として扱う

Streamのつくりかたが分かったところで、

次は、AWS S3にあるテキストファイルを処理する方法を考えます。

と、その前に、

AWSになじみのない方がここで読んでしまうことを防ぐために説明したいのですが、

この章の目的は、AWS S3の操作がしたいのではなく、

Java標準APIでStream APIを作れないようなパターンで、どのように操作できるかを考えることです。


なのでAWSだけでなく、たとえばDBや外部Webサーバなどでも同じパターンが使えますので、

AWSはあくまで一例として捉えてもらえればと思います。


今回利用するクラス

まず簡単に、AWS S3からファイルを取ってくる処理を実装するために必要なクラスを紹介します。


ちなみにAWS S3とはファイルストレージのサービスで、

ブラウザからのファイルアップロード / ダウンロードができる他にも

コマンドラインツール(AWS CLI)や、AWS SDKAWS SDK for Javaなど)から操作ができます。

今回はAWS SDK for Javaを使うことにします


それで、使うクラスは以下の通りです。

  • AmazonS3Client
  • S3ObjectSummary
    • S3のファイル情報を持つクラス。Javaで言うPathやFileクラスに相当する。
  • ObjectListing
    • ファイル一覧取得結果のメタ情報を持つクラス。ここから List<S3ObjectSummary> を取り出すことができる。

あんまり多くないですね。


AWS S3へのアクセスを試す

まず簡単に、AWS S3にJavaからアクセスする方法を書いておきます。

型などがはっきり分かるよう、Stream APIを使わずに書きます。

AmazonS3Client client = new AmazonS3Client();
ObjectListing listing = client.listObjects("cero.ninja.public", "advent2015java"); // (1)
List<S3ObjectSummary> list = listing.getObjectSummaries(); // (2)

for (S3ObjectSummary s : list) {
    S3Object object = client.getObject(s.getBucketName(), s.getKey()); // (3)
    try (InputStream content = object.getObjectContent(); // (4)
         BufferedReader reader = new BufferedReader(new InputStreamReader(content))) {
        String line = reader.readLine();
        while (line != null) {
            System.out.println(line); // (5)
            line = reader.readLine();
        }
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

(1)「cero.ninja.public」という名前のバケットにある「advent2015java」というフォルダから情報を取り

(2) ファイル情報をListとして取り出して

(3) ファイルの中身を読み込んで

(4) InputStreamを取り出して

(5) 標準出力に表示しています。


簡単でしょう?

ただ、実は (1) のメソッド呼び出しは最大1000件までしか取得できないという制限があります。

1001件以降のデータを取るためには、別のメソッドを使う必要があります。

List<S3ObjectSummary> allList = new ArrayList<>(); // (6)

AmazonS3Client client = new AmazonS3Client();
ObjectListing listing = client.listObjects("cero.ninja.public", "advent2015java"); // (7)
List<S3ObjectSummary> list = listing.getObjectSummaries();

while (list.isEmpty() == false) {
    allList.addAll(list); // (8)
    listing = client.listNextBatchOfObjects(listing); // (9)
    list = listing.getObjectSummaries();
}

(6) Listを別に用意しておいて

(7) 最初の1000件を読み込み

(8) ファイル情報を (6) のListに入れながら

(9) 次の1000件を読む、を繰り返す


こんな風になります。

ただこれがたとえば10万件、100万件となってくると、Listがメモリを過剰に消費しますし

そもそも実際の処理を始めるまでのオーバーヘッドが大きくなりすぎます。


じゃぁwhileの中で処理しちゃえばいいやんか、という話になってきます。

AmazonS3Client client = new AmazonS3Client();
ObjectListing listing = client.listObjects("cero.ninja.public", "advent2015java");
List<S3ObjectSummary> list = listing.getObjectSummaries();

while (list.isEmpty() == false) {
    list.stream().forEach(this::doSomething); // (10)
    listing = client.listNextBatchOfObjects(listing);
    list = listing.getObjectSummaries();
}

(10) 取得した1000件を使ってすぐに処理を始めます。


これはこれで正解だと思いますしシンプルで良いのですが、

「S3から情報を取るクラスに処理を渡す」よりも

「S3から情報を取ってStreamにする」ほうが、何かと取り回しが楽なのですよね。


AWSへのアクセスをStreamにしよう

ということで、ここまで学んだ知識を利用して、

AWS S3にあるテキストファイルをまとめて Stream<String> にする方法を考えます。


まずはファイル一覧を取ってくる部分を、Iteratorにします。

public class S3Iterator implements Iterator<InputStream> {
    AmazonS3Client client;
    String bucketName;
    String key;

    ObjectListing listing;
    List<S3ObjectSummary> cache; // (1)

    public S3Iterator(String bucketName, String key) {
        this.client = new AmazonS3Client();
        this.bucketName = bucketName;
        this.key = key;
    }

    void load() {
        if (cache != null && cache.size() > 0) {
            return;
        }

        if (listing == null) {
            listing = client.listObjects(bucketName, key); // (2)
        } else {
            listing = client.listNextBatchOfObjects(listing); // (3)
        }

        cache = listing.getObjectSummaries(); // (4)
    }

    @Override
    public boolean hasNext() {
        if (cache != null && cache.size() > 0) {
            return true;
        }

        load();
        return cache.size() > 0;
    }

    @Override
    public InputStream next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }

        S3ObjectSummary summary = cache.remove(0); // (5)
        return client.getObject(summary.getBucketName(), summary.getKey()).getObjectContent(); // (6)
    }
}

(1) 読み込んだ1000件のファイル情報を保持しておくcacheを作っておき

(2) 初回はlistObjectsを使って読み込み

(3) 2回目以降はlistNextBatchOfObjectsを使って読み込みます。

(4) 読み込んだファイル情報はcacheに入れておき

(5) removeしながら取り出します。

(6) また、S3ObjectSummaryのStreamにはせず、InputStreamにしてから返します。

このようにすれば、Iteratorの外側でAmazonS3Clientを利用する必要がなくなるためです。


そして、このIteratorからStreamを作り、さらにflatMapを使ってStream<String>に集約しましょう。

Stream<InputStream> fileStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(
        new S3Iterator("cero.ninja.public", "advent2015java"), Spliterator.NONNULL), false); // (7)

Stream<String> stream = fileStream.flatMap(in -> { // (8)
    BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    return reader.lines() // (9)
            .onClose(() -> { // (10)
            reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    });
});

stream.forEach(System.out::println); // (11)

(7) IteratorからSpliteratorを作り、そこからStreamに変換します。これでファイル一覧が Stream<InputStream> になります。

(8) InputStreamから読み込んだ内容を集約するために、flatMapを使って

(9) BufferedReader.linesで作ったStream<String>を集約します。

(10) ここでonCloseを書いておくと、BufferedReaderが毎回きちんとcloseされます。flatMapさまさま。

(11) 最後に標準出力への書き出しをしていますが、もちろんこの Stream<String> はどのように扱うこともできます。


という感じで、AWS S3上にあるテキストファイルを Stram<String> にして

処理することができるようになりました。


gzipzipだったらどうするの?

ちなみに大量のテキストファイルが存在する場合、たとえばそれがWebサーバのログだったりすると

たいていにおいてgzip圧縮されているかと思います。

そういう時は、読み込む時にGZIPInputStreamを使うと良いでしょう。


先に出した例とあまり変わらず、GZIPInputStreamを挟んだだけの形となります。

Stream<String> stream = fileStream.flatMap(in -> {
    BufferedReader reader;
    try {
        reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(in), StandardCharsets.UTF_8)); // (12)
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
    return reader.lines()
            .onClose(() -> {
                try {
                    reader.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
});

(12) のところにGZIPInputStreamを挟みました。


では、読み込む対象が、複数のファイルを持つようなzipファイルだった場合はどうすれば良いでしょうか。

実はこれが割と難敵で、zipで処理を回せるようなIteratorをもう一つ作る必要があります。


ZipInputStreamにStream<InputStream>を返すようなAPIを作ってくれれば良いのにと思うんですが

(ZipFileやFileSystemsならStreamを作りやすいんですが、ZipInputStreamからは作れない!)

そのあたりは自前で少し実装する必要があります。

さすがにエントリーが長くなってきたので、今回は割愛します。


Spliteratorを直接作っても良い

@さんから、Iteratorを実装するよりも

AbstractSpliteratorを実装したSpliteratorを作ったほうが楽だよという知見をもらったので

それを使ったサンプルも掲載しておきます。

public class S3Spliterator extends Spliterators.AbstractSpliterator<InputStream> {
    AmazonS3Client client;
    String bucketName;
    String key;

    List<S3ObjectSummary> cache;
    ObjectListing listing;

    public S3Spliterator(String bucketName, String key) {
        super(Long.MAX_VALUE, 0);
        this.client = new AmazonS3Client();
        this.bucketName = bucketName;
        this.key = key;
    }

    @Override
    public boolean tryAdvance(Consumer<? super InputStream> action) {
        if (cache == null) {
            listing = client.listObjects(bucketName, key);
            cache = listing.getObjectSummaries();
        } else if (cache.isEmpty()) {
            listing = client.listNextBatchOfObjects(listing);
            cache = listing.getObjectSummaries();
        }

        if (cache.isEmpty()) {
            return false;
        }

        S3ObjectSummary summary = cache.remove(0);
        S3ObjectInputStream in = client.getObject(summary.getBucketName(), summary.getKey()).getObjectContent();
        action.accept(in);
        return true;

    }
}

tryAdvanceメソッドをひとつ作れば良いだけですので、

hasNextやnextを実装する時のように、状態がどうなるかよく分からなくなって混乱することもありません。

確かに、Streamを作ることだけが目的となる場合は、こちらを使った方が楽になりそうですね。


念のため、これを使った処理も書いておきます。

Stream<InputStream> fileStream = StreamSupport.stream(
        new S3Spliterator("cero.ninja.public", "advent2015java"), false);

Stream<String> stream = fileStream.flatMap(in -> { // 以下略

StreamSupportに直接渡すだけで済む、ということですね。


この章のまとめ

1. 「1000件ずつ読み出して処理」みたいなやつは、IteratorにしてStreamにすれば処理しやすいよ!

2. AbstractSpliteratorを継承すると、Spliteratorを作りやすいよ!


5. まとめ、そして次回予告

では今回のまとめです。

今回のエントリーでは、Streamを作るところと、

その使いどころとしてAWS S3上の大量ファイルを例に挙げて説明しました。


Iteratorを作り、Spliteratorにして、そこからStreamを作る、

また、Stream.flatMapを使って複数リソースシームレスに処理することができました。


Stream.flatMapを使うときちんとリソースのcloseができるのは、便利さがじわじわ効いてくる系のやつで、

たとえばStreamの代わりにIteratorを取り回そうとすると、リソースのcloseタイミングで困ることになります。

そういう点でも、Streamというのは取り回しやすい形なのかなと思います。


さて、長くなってきたので、今回のエントリーはここまでにしたいと思います。


でも実はもう一つ、言及したかったテーマがあります。

それが「Hot ProducerとCold Producer」というお話です。


JavaのStream APIは、どちらかと言えば「既に存在するListやMap」などに対する処理を

簡潔に書くところがクローズアップされているように思います。

これはCold Producer、あるいはCold Streamと呼ばれているそうです。


一方で、HTTPリクエストや、JMS、AMQP、あるいはTwitterのstreamなど、

いわゆるイベントを受け取るようなもの、あるいは無限Streamになるようなものは

Hot Producer、Hot Streamと呼ばれます。


JavaではHot Producerを扱う場合には、Listenerを実装する形が一般的ですが

Streamとして処理することもできるはずです。


Reactive StreamsがJava9に入るとか、バックプレッシャー型がどうしたと言われている中で、

このHot ProducerをStream APIを使って処理するような考え方が、

Javaにおいても、今後、重要になってくるはずです。

次回は、その辺りを考えてみたいと思います。


Stay metal,

See you!

2013-11-07

[]なぜ僕はCaliperではなくJMHを選んだのか。

今日、会社のblogのほうにJMHのエントリーを書きました。

そう、今日のテーマはマイクロベンチマークです。

Javaのマイクロベンチマークツール「JMH」 - Taste of Tech Topics

Javaのマイクロベンチマークに興味がある人は、

GoogleのCaliperというマイクロベンチマークツールを既にご存知かも知れません。

しかし上記のエントリーではCaliperには全く触れていません。


なぜか。

いや、Caliperを試そうとしたら、うまく動かせなかったんですよ (^^;


そんな、まともに動かせない所や、ドキュメントの更新のないCaliperに幻滅した後

JMHを試してみたら、思いのほか素直で使いやすかったため上で紹介するに至りました。


ちょっとここでは、その辺りの舞台裏を紹介してみます。


Caliperをはじめてみようとしたけど・・・

Caliperのトップページには、Get started的な始め方が記載されています。

http://code.google.com/p/caliper/


Mavenプロジェクトを作って、pom.xmlに追記して、

トップページにあるサンプルソースコードをコピーして・・・


あれ、えっと・・・?


一体、どうやって実行するのでしょう? まさか実行の仕方を調べるためだけに、

30分近いチュートリアルビデオを見なければいけないのでしょうか?

さすがにそれはありえないので、ドキュメントを見てみましょう。


Caliperはドキュメントを更新する気がないのかね。

Google Codeのドキュメントと言えば、Wiki。

https://code.google.com/p/caliper/w/list

早速、このWikiを見てみると・・・あぁ、UsersGuideというのがあるようです。


http://code.google.com/p/caliper/wiki/UsersGuide

何これ、目次だけでまだドキュメントが全然ありません。

QuickStartもないので、始めようがありませんでした。


この点、JMHも同じくドキュメントはほとんどありませんが

少なくともトップページで動かせるまでは面倒を見てくれるので

Caliperよりも親切だと言えます。


Code Samplesのコンパイルが通らない

トップページにあるサンプルだけではきっと動かないのだと思い、

リンクされてる「Very short tutorial examples.」を試してみることにしました。

https://code.google.com/p/caliper/source/browse/tutorial/Tutorial.java


うん、コンパイルが通らない (^^;


少し調べてみると、どうやらCaliperはバージョン1.0から大幅にAPIが変わって

アノテーションベースになるらしく、リンク先のサンプルはそれに合った内容となっています。


しかしいまMavenリポジトリにあるものは、命名規則ベースの古いバージョンで、

setUpやtimeXxxという名前のメソッドを探して実行するという、JUnit3のような思想になっています。


なんかこんな風に、複数のバージョンの情報が混在していて

きちんと情報が整理されていない辺りに、心が折れ始めました。


Caliperを動かしたら、いきなり30秒以上かかって死んだ。

その後もうしばらく調べてみて、どうやらCaliperMainクラスのmainメソッドから

ベンチマークを実行できることが分かりました。


ということで自分で簡単なベンチマークを書いてみたうえで

オプションなどつけずに実行してみました。


順調に進んでるかな・・・

Experiment selection: 
  Instruments:   [allocation, micro]
  User parameters:   {}
  Virtual machines:  [default]
  Selection type:    Full cartesian product

This selection yields 4 experiments.
Starting experiment 1 of 4: {instrument=allocation, method=WithInitialSize, vm=default, parameters={}}
Complete!
Starting experiment 2 of 4: {instrument=allocation, method=WithoutInitialSize, vm=default, parameters={}}
Complete!
Starting experiment 3 of 4: {instrument=micro, method=WithInitialSize, vm=default, parameters={}}
ERROR: Trial failed to complete (its results will not be included in the run):
  Trial exceeded the total allowable runtime (30s). The limit may be adjusted using the --time-limit flag.

・・・と思ったら

なんか30秒以上実行したせいでエラーとなりました。


引数を指定すれば30秒制限を突破できるようですが、

そもそも何を何回実行しようとしてエラーになったのか、さっぱり分からないので

30秒制限をオフにして良いのかどうかも分かりません。

だいぶ心が折れてきました。


Caliperでテスト終わったら、勝手に結果がアップロードされた。

先ほどのテストは、まだ続きがあります。

片方は30秒制限を受けましたが、もう一方は何やらきちんと終了したようです。

Starting experiment 4 of 4: {instrument=micro, method=WithoutInitialSize, vm=default, parameters={}}
Complete!

Execution complete: 1.151m.
Collected 45 measurements from:
  2 instrument(s)
  1 virtual machine(s)
  2 benchmark(s)
Results have been uploaded. View them at: https://microbenchmarks.appspot.com/runs/d94be1c2-8be0-48e7-8481-4492145eecd5

Process finished with exit code 0

えっと、結果は・・・1分と少々掛かって、えっと・・・

って、何!?

ベンチマークの結果がアップロードされたって?


そうなんです、普通に実行しただけなのに、ベンチマークの結果がGAEにアップロードされました。


恐らくオプションなどを指定してオフライン実行などもできるでしょうし、

こうやってアップロードされることで、より詳しく分析などもできるなど

嬉しいこともたくさんあるのだと思います。


しかしながら、簡単にマイクロベンチマークを取りたいという僕の想いとは

あまりに掛け離れた思想をしているCaliperに愛想がつきて、心は完全に折れました。


それに比べて、JMHは素直です。

まずは @GenerateMicroBenchmark アノテーションさえつければ動きますし、

オプションなど指定しなくともきちんと動き、分かりやすい結果を出してくれます。


少なくともいまこの時点で始めるのであれば、

JMHの方が使いやすいと言えるのではないのかな、と思います。


批判、コメント、アドバイス、どしどしお待ちしています!

2009-03-24

[][]VirtualBox + UbuntuのFTPで問題。

VirtualBox + Ubuntuの環境で、FTPによるファイル転送が正常に行えなくなった。

ホストのWindows(FFFTP)からゲストのUbuntuにファイルを転送したら、なぜかファイルが壊れてしまう。

1.5MB程度のファイルなのだが、何度ファイルをアップロードしても、

元のファイルサイズより数KB小さくなってしまった。しかも毎回ファイルサイズが違う。


環境は、WindowsXP SP3上のVirtualBox 2.1.2 + Ubuntu JeOS 8.0.4-1で、

問題の原因は明確にはしていないけど、

FTPdにはvsftpdにしてもproftpdにしても結果は変わらなかった。


Ramdiskの問題も疑ったが、SSD上のデータを転送しても症状は変わらず。

結局、FTPでファイルを正常に転送することはできなくなった。

2008-07-15

[]JSR 292と、動的なJava

端的に書いてる記事を探したら、ちょっと古いのしかなかった。

ITmediaのJavaOne2006の記事。

Java SE 7ではホットスワップの機能拡張を行い、次に示すような機能がサポートされる。

Java VMがJavaのものではなくなる日 -- ITmedia エンタープライズ

動的言語のためのホゲホゲだと思って、あんまり見てなかったんだけど、

Javaにとっても、嬉しい拡張なんだよね。

たとえば、これのおかげで、クラスローダでごにょごにょしなくても、

クラス単位のホットデプロイが可能になるよね。


OSGiで動的更新(プラグ/アンプラグ)するのも良いけど、

サクサク感ある開発とか、FTPで.classアップロードして即反映させたいとか、

そういう用途なら、これを使うんじゃないの。


いや、実情をよく知らないまま書いてますけども。