谷本 心 in せろ部屋 このページをアンテナに追加 RSSフィード

2017-03-20

TT-BA09で、テレビの音声をBluetoothで無線化してみた(失敗編)

成功の影に失敗あり。

テレビの音声をBluetoothでワイヤレス化しようという試みですが、一度失敗していたので、今回はその話を書きます。

成功した話は、前回のエントリーを読んでください。

http://d.hatena.ne.jp/cero-t/20170111/1484142249


テレビをBluetooth化するには「トランスミッター」と「レシーバー」が必要というのは前回説明した通りですが、それぞれ最初に選んだ製品は、トランスミッター側が前回も書いた「TT-BA09」で、レシーバー側はオーディオテクニカの「AT-PHA50BT」でした。

AT-PHA50BTは「ワイヤレスヘッドホンアンプ」とうたわれていて簡易アンプ機能がついているため、音量調整ができるだけでなくエフェクト機能などもついているので良さそうと思い、選びました。

これを、次のように接続しました。


テレビ → (光音声出力) → TT-BA09 → (Bluetooth/apt-X) → AT-PHA50BT → (ケーブル) → ヘッドホン


まぁトランスミッターとレシーバーを繋ぐときは、普通にこうなります。前回書いた「TT-BA08」のところがAT-PHA50BTになっただけです。


なんかおかしいぞ?

それで接続して使い始めたのですが、問題が2つありました

  1. 切断/再接続をすると、音が途切れ途切れになって使えなくなる
  2. 音量が変えられない

まず切断/再接続時の挙動ですが、接続中に一度AT-PHA50BT側(レシーバー側)の電源を入れ直すと、音が途切れ途切れになってしまい、実質的に使えなくなります。TT-BA09側(トランスミッター側)の電源を入れ直せば問題なくなりますし、まぁBluetooth機器だし、これぐらいの相性問題はあるよね、という認識で我慢して使おうかと思いました。


それより困るのが「音量を変えられない」という問題。AT-PHA50BT(レシーバー)のダイヤルをいくら回しても、ヘッドホンから聞こえる音量は全く変わらないのです。ヘッドホンアンプの役割を果たしてくれません。

もちろん、テレビ側の音量を変えても(光音声出力経由ですから)ヘッドホン側の音量は変わりません。ライブ音源なのに音が小さすぎたりすると、かなりガッカリです。


初期不良かな?

そう思って、試しにAT-PHA50BTをiPhoneBluetooth接続してみたのですが、音量もエフェクトも含めて、すべての機能がきちんと動きます。再接続しても問題ありません。


それで気づいたんですが、、、


あれ?

TT-BA09と繋ぎ直すと、さっきより音が大きくなってるぞ?


そうなんです、iPhoneで音量を変えた後に、改めてTT-BA09とAT-PHA50BTを接続し直すと、iPhoneで変えた時の音量のままになってるんです。いったい何を言ってるか分からねー(略)

要するにAT-PHA50BTは内部的に音量を保持していて、iPhone(などのスマホ)と接続した時には音量を変更できるけど、TT-BA09と接続した時には変更ができないと、そういう感じのようです。


なので、TT-BA09と繋いでいる時に音が大きい/小さいなーと思ったら、iPhoneと繋ぎ直して音量を調整して、またTT-BA09に繋ぎ直せばいいだけだ、ということです😇

ってできるわけないやろ💢


サポートへの問い合わせ

我慢しながら使おうかな、とも思ったのですが、一縷の望みを掛けて、サポートに問い合わせることにしました。

僕はトラブルシューターですから、こういう不具合報告はお手の物です。問題の状況や、いくつかの組み合わせで検証した結果などを、きちんと事実と推測に分けて報告をしました。もちろん、他社製品のせいだと言われないような予防線も張ります。こういう報告を送られると、逃げようがないですからね。


問い合わせを出して数日後、返信がありました。

返信内容は転載禁止なので詳しくは書けませんが、「スマートフォンと繋ぐことを前提にしたので、電話帳機能がない機器と繋ぐとボリュームとか効かなくなることがあります」みたいな事が書かれていました。


そんなこと、製品紹介のどこにも書かれてないやないか!!!💢💢

https://www.audio-technica.co.jp/atj/show_model.php?modelId=2661


・・・なんて怒りを抑えつつ、せめてと思い、「別に交換しろとは言わないから、せめてホームページに書いとけや!」に社会性フィルタを掛けた内容でメールを返信しておきました。すぐに「指摘を真摯に受け止めます」という旨の大人語な回答を頂きました。

2ヶ月以上前の話ですが、いまだ書かれている様子はないので、今後も変わらないでしょうね。


どうしようこれ

一時は我慢しながら使おうかと思ったAT-PHA50BTですが、このサポートとのやり取りも含めて諦めるしかないと判断し、前回の記事に書いたとおりTT-BA08を買い直したわけです。TT-BA08との組み合わせなら、何ら問題ありません。


と言うことで使わなくなったAT-PHA50BTですが、これ、何に使いましょうね。

iPhoneMacとはきちんと接続できるので、勉強会とかで「音を鳴らしたいけど、ケーブルが届かない!」という時に、サッと出してヒーローになる。そんな夢を見ながら、カバンに忍ばせておくことにしましょうかね。

2017-01-11

TT-BA09で、テレビの音声をBluetoothで無線化してみた(成功編)

BABYMETALのライブが放送される時期だけWOWOWを契約する、ちょっとWOWOWにとっては迷惑気味なメイトのCERO-METALデス!


さて、普段ヘッドホンで音楽を聴くようになってから、テレビでもライブなどを観る時にはテレビのスピーカーでは物足りなくなり、ヘッドホンで聴くようになりました。ただ3mぐらいのケーブルを使ってもテレビまで近いですし、キッチンで料理や洗い物をしながらテレビを観る時になんかには使えません。

そんな背景から、テレビの音声をワイヤレスにしてヘッドホンで聴きたいなーと思っていました。


ちょうどWOWOWBABYMETAL東京ドームライブが放送される年末年始のこのタイミングで、テレビの音声をBluetoothで飛ばして、ヘッドホンで聴ける環境を作ることにしました。


何がいるの?

そもそも、テレビの音声をBluetoothで飛ばすには何が必要か、という整理からです。接続の流れはこんな感じになります。


テレビ → Bluetoothトランスミッター → Blueotoothレシーバー → ヘッドホン


テレビからの音声入力を受けてBluetoothで飛ばすものを「トランスミッター」と呼び、Bluetoothで受信をする方を「レシーバー」と呼びます。ITエンジニアの皆様方にも分かりやすく説明すると、トランスミッターがWi-Fiルーターに相当し、レシーバーWi-Fi子機に相当します。

またレシーバーとヘッドホンをケーブルで繋ぐものもあれば、ヘッドホン自体がレシーバーになるワイヤレスヘッドホンなどもあります。


製品の選び方

Bluetoothトランスミッターやレシーバーを選ぶ際の観点になるのが、次の2つです。

  • 音声入力の方式(ヘッドホンジャック、赤と白のRCAピンプラグ、光デジタルなど)
  • 対応コーデックSBCAACapt-X、LDACなど)

音声入力の方式はまぁ分かるので割愛するとして、よく分からないのがコーデックです。

ざっくり調べた感じ、次のような違いがあるようです。

  • SBC: ほとんどの機器が対応するけど、音質が悪い。
  • AAC: 主にApple製品で使われる。SBCより音質が良い。
  • apt-X: Androidや多くの機器で使われる。AACより遅延が少ない。
  • apt-X HD: apt-Xをさらに高音質にしたもの。まだ対応製品が少ない。
  • LDAC: ハイレゾ音源に対応するもの。音質は一番良いがソニー製品でしか採用されていない。

対応製品が多いapt-Xが、現在ではほぼスタンダードになってるようです。apt-Xに対応したトランスミッターはいくつかあるのですが、AACやLDACに対応した単体のトランスミッターというのは、ちょっと見つけられませんでした。


実際に買った製品

それで今回用意したのは、次の製品です。

この組み合わせで、ばっちりワイヤレスで視聴できる環境が整いました。


TT-BA09

音声のトランスミッター、レシーバーの両方に使える製品です。TaoTronics社からは似たような製品がいくつか出ているのですが、このTT-BA09は光音声入力があったので、これを選びました。ちなみにTT-BA09には音量を調整する機能がありません。


TT-BA08

上の製品と同じく音声のトランスミッター、レシーバーの両方に使える製品です。TT-BA09よりも一回り小さな製品です。

こちらはレシーバーとして使う際に音量の調整ができるようになっています。通常、光音声入力では出力側での音量調整ができず、レシーバーで音量調整する必要があるため、これを選びました。


それで、繋いでみると。

さすがは同じメーカーの同一シリーズの製品でしょうか、全く何の問題もなく接続できました。

接続はこんな流れです。


テレビ → (光音声出力) → TT-BA09 → (Bluetooth/apt-X) → TT-BA08 → (ケーブル) → ヘッドホン


Bluetoothを切断/再接続をしても、全く問題なく音声を聞くことができます。

また、テレビの音量を変えてもヘッドホンから聞こえる音量は変わらないため、テレビは消音にし、音量をTT-BA08で調整するような使い方になります。音量を調整できないレシーバーだと、こういう事ができないんですよね。


遅延とか音質は?

Bluetoothでよく言われるのが音声の遅延。

apt-Xでは40msほどの遅延があると言われており、口と声がズレるのを気にする僕としては、遅延があるのははっきり分かります。遅延があまり分からない人でも、テレビとヘッドホンの音を両方出せば、はっきりとズレていることは分かるでしょう。

ただ、そうは言っても40msですから、ズレを検出しようとしない限りは、さほど気にならない程度でした。


そして気になる音質ですが、これはとても良いです。

ショボいテレビのスピーカーに比べれば音声はクリアに聞こえ、特に海外映画などの英語もだいぶ聞き取りやすくなりました。音楽番組などもしっかり低音が聞こえるのは、さすがヘッドホンですね。

音楽番組を流しっぱなしにしながらリビングキッチンでウロウロしても、ヘッドホンで聴けるのは最高です。


・・・ということで、特にトラブルも起きることないつまらない話になったのですが、実はこの前に、別メーカーの製品との組み合わせでトラブルがありました。せっかくなので、それについては、また改めて書きたいと思います。


See you!

2015-12-12

[Java]Stream APIをつくろう

最近、上司からのパワハラではなく、部下からの「部下ハラ」というものがあると聞きますが、

それって要するにバックプレッシャー型ハラスメントでありリアクティブであるよなと思うこの頃ですが、

皆さんいかがお過ごしでしょうか。


さて、順調な滑り出しからスタートした本エントリーは Java Advent Calenda 2015 の12日目になります。


前日は最年少(?)OpenJDKコミッタの @ さんによる

DeprecatedがJDK9で変わるかもしれない件(JEP 277: Enhanced Deprecation)

明日は日本唯一のJava Championの @ さんのエントリーです。

なかなかエラいところに挟まれてしまったなという感じです!


では本題、

今日のテーマは「Stream APIをつくろう」です。


Stream APIって、使う側がよくフォーカスされるのですが、

作る側がどうなっているのか、あるいはその使いどころはどこなのか、というのはあまり情報がないように思います。


今日はその作る側にフォーカスしたいと思います。

少し長いエントリーですが、ゆっくりとおつきあいください。


おしながき

1. なぜStream APIを作りたいのか

2. お題:ディレクトリの直下にあるテキストファイルをまとめて Stream<String> として扱う

3. そもそもStream APIってどうやって作るの?

4. お題:AWS S3にあるテキストファイルをまとめて Stream<String> として扱う

5. まとめ、そして次回予告


1. なぜSteam APIを作りたいのか

もともとのきっかけは、AWSのS3上にある大量のログファイルを

elasticsearchに流し込む処理を書いたことでした。


ファイルサイズが膨大なのはもちろんのこと、ファイル数自体も大量ですから、

処理対象となるファイルをリストアップするだけでも大変という状況です。

なのでファイルリストを少しずつ作りつつ、

ファイルを開いて少し読んでは流し込むという、逐次処理をしなくてはいけません。


Java7までなら、うんたらHandlerを渡して処理をグルグル回すところですが

Java8なので、せっかくだからStream APIを使うことにしてみました。


ただ先にも書いたように、Stream APIを提供する側の処理はあまり情報がなかったため

@さんや@さん、@さんに色々と質問しながら勉強しました。

皆さん、ありがとうございました。今日はその辺りをまとめたいと思います。


ちなみに言葉の定義としてはStream APIを作るんじゃなくて

Stream APIのProducerを作るっていう方が正しいとは思うんですが

ぱっと見での分かりやすさのためにこうしました。


2. お題:ディレクトリの直下にあるテキストファイルをまとめて Stream<String> として扱う

まず手始めに、ローカルファイルにあるファイルを処理する方法から考えます。

特定のディレクトリの直下にテキストファイルが大量にあり、

ファイル名などは別に表示しなくても良いから、

テキストファイルを読み込んで、一行ずつ処理したいというシチュエーションです。


Files.listとFiles.linesをどう組み合わせるの?

ディレクトリ内にあるファイルの一覧を Stream<Path> として取得するのは Files.list メソッド、

ファイルを読み込んで Stream<String> として取得するのは Files.lines メソッドが使えます。

では、これを組み合わせる時は、どうしたら良いんでしょうか。


変換はmapメソッドだったよなーと思いつつ、こんな事をやると。

Stream<Stream<String>> stream = Files.list(path)
        .map(p -> {
            try {
                return Files.lines(p);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
stream.forEach(System.out::println);

Stream<Stream<String>> になり、恐ろしく期待外れの結果になります。

java.util.stream.ReferencePipeline$Head@421faab1
java.util.stream.ReferencePipeline$Head@2b71fc7e
java.util.stream.ReferencePipeline$Head@5ce65a89

ではどうするかというと、そう、みんな大好きflatMapです。

try (Stream<String> stream = Files.list(path)
        .flatMap(p -> {
            try {
                return Files.lines(p);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        })) {
    stream.forEach(System.out::println);
}

こうすれば無事に結果をStream<String>として取り出すことができます。


出力例はこんな感じ。

This is file1.
Hello world.
This is file 2.
This is file3.
Good bye world.

ちなみにFiles.listで作ったStreamはきちんとcloseする必要があるので

try-with-resourcesを使うのがオススメです。


flatMapは毎回closeを呼び出してくれる

ちなみにflatMapがスグレモノなところは、

flatMapメソッドの中で作ったStreamを、毎回必ずcloseしてくれるところです。


上の例ではFiles.linesを利用しているので、別にcloseする必要はないのですが、

たとえばBufferedReaderやInputStreamなどを作った場合などには、closeする必要があります。

そういう時は、flatMapメソッド内で作るStreamにonCloseを書いておけば、毎回呼び出されるのです。

try (Stream<String> stream = Files.list(path)
        .flatMap(s -> {
            try {
                BufferedReader reader = Files.newBufferedReader(s);
                return reader.lines().onClose(() -> { // (1)
                    try {
                        reader.close(); // (2)
                        System.out.println("closed."); // (3)
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        })) {
    stream.forEach(System.out::println);
}

(1) でStreamを作ったところにonCloseを書いて

(2) でBufferedReaderをcloseしています。

This is file1.
Hello world.
closed.
This is file 2.
closed.
This is file3.
Good bye world.
closed.

(3) で書いた「closed.」が3回表示され、きちんと毎回closeされていることが分かります。


この章のまとめ

1. Streamの要素からStreamに変換する時は、flatMapだ!

2. flatMapで作ったStreamは、きちんとcloseしてくれるぞ!


3. そもそもStream APIってどうやって作るの?

前の章ではflatMapが便利だという知見を得ました。

次は、そもそもStream APIってどうやって作れば良いのかを考えます。


恥ずかしながら全然知らなかったので、twitterで聞いてみたところ

@さんと @さんから、

Iteratorを作って、SpliteratorにしてStreamSupporに渡すと良いと教わりました。


この件については、BufferedReader.linesの実装が分かりやすすぎるので引用したいと思います。

public Stream<String> lines() {
    Iterator<String> iter = new Iterator<String>() { // (1)
        String nextLine = null;

        @Override
        public boolean hasNext() {
            if (nextLine != null) {
                return true;
            } else {
                try {
                    nextLine = readLine();
                    return (nextLine != null);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }

        @Override
        public String next() {
            if (nextLine != null || hasNext()) {
                String line = nextLine;
                nextLine = null;
                return line;
            } else {
                throw new NoSuchElementException();
            }
        }
    };
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
            iter, Spliterator.ORDERED | Spliterator.NONNULL), false); // (2)
}

(1) Iteratorを実装して

(2) Spliteratorsを使ってSpliteratorに変換し、StreamSupportを使ってStreamに変換しています。

これがStreamを作る時の、ひとつの王道のようですね。


この章のまとめ

1. ルークよ、Iteratorを使え


4. お題:AWS S3にあるテキストファイルをまとめて Stream<String> として扱う

Streamのつくりかたが分かったところで、

次は、AWS S3にあるテキストファイルを処理する方法を考えます。

と、その前に、

AWSになじみのない方がここで読んでしまうことを防ぐために説明したいのですが、

この章の目的は、AWS S3の操作がしたいのではなく、

Java標準APIでStream APIを作れないようなパターンで、どのように操作できるかを考えることです。


なのでAWSだけでなく、たとえばDBや外部Webサーバなどでも同じパターンが使えますので、

AWSはあくまで一例として捉えてもらえればと思います。


今回利用するクラス

まず簡単に、AWS S3からファイルを取ってくる処理を実装するために必要なクラスを紹介します。


ちなみにAWS S3とはファイルストレージのサービスで、

ブラウザからのファイルアップロード / ダウンロードができる他にも

コマンドラインツール(AWS CLI)や、AWS SDK(AWS SDK for Javaなど)から操作ができます。

今回はAWS SDK for Javaを使うことにします


それで、使うクラスは以下の通りです。

  • AmazonS3Client
    • AWS S3にアクセスするためのクライアント。通信するくせにclose不要なやつ。
  • S3ObjectSummary
    • S3のファイル情報を持つクラス。Javaで言うPathやFileクラスに相当する。
  • ObjectListing
    • ファイル一覧取得結果のメタ情報を持つクラス。ここから List<S3ObjectSummary> を取り出すことができる。

あんまり多くないですね。


AWS S3へのアクセスを試す

まず簡単に、AWS S3にJavaからアクセスする方法を書いておきます。

型などがはっきり分かるよう、Stream APIを使わずに書きます。

AmazonS3Client client = new AmazonS3Client();
ObjectListing listing = client.listObjects("cero.ninja.public", "advent2015java"); // (1)
List<S3ObjectSummary> list = listing.getObjectSummaries(); // (2)

for (S3ObjectSummary s : list) {
    S3Object object = client.getObject(s.getBucketName(), s.getKey()); // (3)
    try (InputStream content = object.getObjectContent(); // (4)
         BufferedReader reader = new BufferedReader(new InputStreamReader(content))) {
        String line = reader.readLine();
        while (line != null) {
            System.out.println(line); // (5)
            line = reader.readLine();
        }
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

(1)「cero.ninja.public」という名前のバケットにある「advent2015java」というフォルダから情報を取り

(2) ファイル情報をListとして取り出して

(3) ファイルの中身を読み込んで

(4) InputStreamを取り出して

(5) 標準出力に表示しています。


簡単でしょう?

ただ、実は (1) のメソッド呼び出しは最大1000件までしか取得できないという制限があります。

1001件以降のデータを取るためには、別のメソッドを使う必要があります。

List<S3ObjectSummary> allList = new ArrayList<>(); // (6)

AmazonS3Client client = new AmazonS3Client();
ObjectListing listing = client.listObjects("cero.ninja.public", "advent2015java"); // (7)
List<S3ObjectSummary> list = listing.getObjectSummaries();

while (list.isEmpty() == false) {
    allList.addAll(list); // (8)
    listing = client.listNextBatchOfObjects(listing); // (9)
    list = listing.getObjectSummaries();
}

(6) Listを別に用意しておいて

(7) 最初の1000件を読み込み

(8) ファイル情報を (6) のListに入れながら

(9) 次の1000件を読む、を繰り返す


こんな風になります。

ただこれがたとえば10万件、100万件となってくると、Listがメモリを過剰に消費しますし

そもそも実際の処理を始めるまでのオーバーヘッドが大きくなりすぎます。


じゃぁwhileの中で処理しちゃえばいいやんか、という話になってきます。

AmazonS3Client client = new AmazonS3Client();
ObjectListing listing = client.listObjects("cero.ninja.public", "advent2015java");
List<S3ObjectSummary> list = listing.getObjectSummaries();

while (list.isEmpty() == false) {
    list.stream().forEach(this::doSomething); // (10)
    listing = client.listNextBatchOfObjects(listing);
    list = listing.getObjectSummaries();
}

(10) 取得した1000件を使ってすぐに処理を始めます。


これはこれで正解だと思いますしシンプルで良いのですが、

「S3から情報を取るクラスに処理を渡す」よりも

「S3から情報を取ってStreamにする」ほうが、何かと取り回しが楽なのですよね。


AWSへのアクセスをStreamにしよう

ということで、ここまで学んだ知識を利用して、

AWS S3にあるテキストファイルをまとめて Stream<String> にする方法を考えます。


まずはファイル一覧を取ってくる部分を、Iteratorにします。

public class S3Iterator implements Iterator<InputStream> {
    AmazonS3Client client;
    String bucketName;
    String key;

    ObjectListing listing;
    List<S3ObjectSummary> cache; // (1)

    public S3Iterator(String bucketName, String key) {
        this.client = new AmazonS3Client();
        this.bucketName = bucketName;
        this.key = key;
    }

    void load() {
        if (cache != null && cache.size() > 0) {
            return;
        }

        if (listing == null) {
            listing = client.listObjects(bucketName, key); // (2)
        } else {
            listing = client.listNextBatchOfObjects(listing); // (3)
        }

        cache = listing.getObjectSummaries(); // (4)
    }

    @Override
    public boolean hasNext() {
        if (cache != null && cache.size() > 0) {
            return true;
        }

        load();
        return cache.size() > 0;
    }

    @Override
    public InputStream next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }

        S3ObjectSummary summary = cache.remove(0); // (5)
        return client.getObject(summary.getBucketName(), summary.getKey()).getObjectContent(); // (6)
    }
}

(1) 読み込んだ1000件のファイル情報を保持しておくcacheを作っておき

(2) 初回はlistObjectsを使って読み込み

(3) 2回目以降はlistNextBatchOfObjectsを使って読み込みます。

(4) 読み込んだファイル情報はcacheに入れておき

(5) removeしながら取り出します。

(6) また、S3ObjectSummaryのStreamにはせず、InputStreamにしてから返します。

このようにすれば、Iteratorの外側でAmazonS3Clientを利用する必要がなくなるためです。


そして、このIteratorからStreamを作り、さらにflatMapを使ってStream<String>に集約しましょう。

Stream<InputStream> fileStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(
        new S3Iterator("cero.ninja.public", "advent2015java"), Spliterator.NONNULL), false); // (7)

Stream<String> stream = fileStream.flatMap(in -> { // (8)
    BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    return reader.lines() // (9)
            .onClose(() -> { // (10)
            reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    });
});

stream.forEach(System.out::println); // (11)

(7) IteratorからSpliteratorを作り、そこからStreamに変換します。これでファイル一覧が Stream<InputStream> になります。

(8) InputStreamから読み込んだ内容を集約するために、flatMapを使って

(9) BufferedReader.linesで作ったStream<String>を集約します。

(10) ここでonCloseを書いておくと、BufferedReaderが毎回きちんとcloseされます。flatMapさまさま。

(11) 最後に標準出力への書き出しをしていますが、もちろんこの Stream<String> はどのように扱うこともできます。


という感じで、AWS S3上にあるテキストファイルを Stram<String> にして

処理することができるようになりました。


gzipやzipだったらどうするの?

ちなみに大量のテキストファイルが存在する場合、たとえばそれがWebサーバのログだったりすると

たいていにおいてgzip圧縮されているかと思います。

そういう時は、読み込む時にGZIPInputStreamを使うと良いでしょう。


先に出した例とあまり変わらず、GZIPInputStreamを挟んだだけの形となります。

Stream<String> stream = fileStream.flatMap(in -> {
    BufferedReader reader;
    try {
        reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(in), StandardCharsets.UTF_8)); // (12)
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
    return reader.lines()
            .onClose(() -> {
                try {
                    reader.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
});

(12) のところにGZIPInputStreamを挟みました。


では、読み込む対象が、複数のファイルを持つようなzipファイルだった場合はどうすれば良いでしょうか。

実はこれが割と難敵で、zipで処理を回せるようなIteratorをもう一つ作る必要があります。


ZipInputStreamにStream<InputStream>を返すようなAPIを作ってくれれば良いのにと思うんですが

(ZipFileやFileSystemsならStreamを作りやすいんですが、ZipInputStreamからは作れない!)

そのあたりは自前で少し実装する必要があります。

さすがにエントリーが長くなってきたので、今回は割愛します。


Spliteratorを直接作っても良い

@さんから、Iteratorを実装するよりも

AbstractSpliteratorを実装したSpliteratorを作ったほうが楽だよという知見をもらったので

それを使ったサンプルも掲載しておきます。

public class S3Spliterator extends Spliterators.AbstractSpliterator<InputStream> {
    AmazonS3Client client;
    String bucketName;
    String key;

    List<S3ObjectSummary> cache;
    ObjectListing listing;

    public S3Spliterator(String bucketName, String key) {
        super(Long.MAX_VALUE, 0);
        this.client = new AmazonS3Client();
        this.bucketName = bucketName;
        this.key = key;
    }

    @Override
    public boolean tryAdvance(Consumer<? super InputStream> action) {
        if (cache == null) {
            listing = client.listObjects(bucketName, key);
            cache = listing.getObjectSummaries();
        } else if (cache.isEmpty()) {
            listing = client.listNextBatchOfObjects(listing);
            cache = listing.getObjectSummaries();
        }

        if (cache.isEmpty()) {
            return false;
        }

        S3ObjectSummary summary = cache.remove(0);
        S3ObjectInputStream in = client.getObject(summary.getBucketName(), summary.getKey()).getObjectContent();
        action.accept(in);
        return true;

    }
}

tryAdvanceメソッドをひとつ作れば良いだけですので、

hasNextやnextを実装する時のように、状態がどうなるかよく分からなくなって混乱することもありません。

確かに、Streamを作ることだけが目的となる場合は、こちらを使った方が楽になりそうですね。


念のため、これを使った処理も書いておきます。

Stream<InputStream> fileStream = StreamSupport.stream(
        new S3Spliterator("cero.ninja.public", "advent2015java"), false);

Stream<String> stream = fileStream.flatMap(in -> { // 以下略

StreamSupportに直接渡すだけで済む、ということですね。


この章のまとめ

1. 「1000件ずつ読み出して処理」みたいなやつは、IteratorにしてStreamにすれば処理しやすいよ!

2. AbstractSpliteratorを継承すると、Spliteratorを作りやすいよ!


5. まとめ、そして次回予告

では今回のまとめです。

今回のエントリーでは、Streamを作るところと、

その使いどころとしてAWS S3上の大量ファイルを例に挙げて説明しました。


Iteratorを作り、Spliteratorにして、そこからStreamを作る、

また、Stream.flatMapを使って複数リソースをシームレスに処理することができました。


Stream.flatMapを使うときちんとリソースのcloseができるのは、便利さがじわじわ効いてくる系のやつで、

たとえばStreamの代わりにIteratorを取り回そうとすると、リソースのcloseタイミングで困ることになります。

そういう点でも、Streamというのは取り回しやすい形なのかなと思います。


さて、長くなってきたので、今回のエントリーはここまでにしたいと思います。


でも実はもう一つ、言及したかったテーマがあります。

それが「Hot ProducerとCold Producer」というお話です。


JavaのStream APIは、どちらかと言えば「既に存在するListやMap」などに対する処理を

簡潔に書くところがクローズアップされているように思います。

これはCold Producer、あるいはCold Streamと呼ばれているそうです。


一方で、HTTPリクエストや、JMS、AMQP、あるいはTwitterのstreamなど、

いわゆるイベントを受け取るようなもの、あるいは無限Streamになるようなものは

Hot Producer、Hot Streamと呼ばれます。


JavaではHot Producerを扱う場合には、Listenerを実装する形が一般的ですが

Streamとして処理することもできるはずです。


Reactive StreamsがJava9に入るとか、バックプレッシャー型がどうしたと言われている中で、

このHot ProducerをStream APIを使って処理するような考え方が、

Javaにおいても、今後、重要になってくるはずです。

次回は、その辺りを考えてみたいと思います。


Stay metal,

See you!

2015-11-02

[]アメリカのプリペイドSIM事情 2015年版

JavaOne参加レポートを書かずに、SIMレポートを書く @ です。

参加レポートは会社ブログの方に書くと思うので、そちらを待っててくださいね!


ということで日本から持ち込んだSIMフリー端末をアメリカで使う場合、

どうするのが良いねんっていう話です。


AT&T or T-Mobile

SIMフリーのスマホをアメリカで使う場合、AT&TかT-MobileでSIMを買うことになります。

過去にJavaOneの会場で入りやすかったという経験から、今回もT-Mobileを選びました。


一緒に行った後輩はAT&Tをチョイスしていましたが、大きな差は見られず、

どちらもホテル、町中、JavaOne会場問わず利用できている感じでした。

またJavaOne会場はWi-Fiがかなり強化されていたおかげで、

人数が集まるキーノートセッション含め、スマホ(テザリング)に頼る必要は全くありませんでした。


ただ標準的なプランがT-Mobileは$40で、AT&Tが$45だったので

ちょっとだけT-Mobileの方がお得でした。


T-Mobileのどのプランがいいの?

T-Mobileにはプリペイドプランがいくつかあって、

(1) $40/月で通話/SMSし放題、データは1GBまで(超えると128kbpsに制限)

(2) $50/月で通話/SMSし放題、データは3GBまで(超えると128kbpsに制限)

(3) $60/月で通話/SMSし放題、データは7GBまで(超えると128kbpsに制限)

(4) $3/月で通話/SMSは10セント、データは別途$10/週で1GBまで(超えると通信不可)

この4つです。


(1)〜(3)は似たようなプランで、

通話もSMSも発着信ともし放題、データ通信は規定量までが高速、

それを超えると128kbpsに制限されます。


いまの時代、通話やSMSなんか使わないんじゃないかと思うんですが

日本みたいにLINEでヨロとか言えませんし、Uberの登録にもSMSが必要です。

(アメリカでは電話やSMSを受ける側もお金が必要になります)

なので意外と発着信がフリーなのは助かったりします。


ただしデータ通信が規定量を超えた時が大変で、お金を払っても追加することができません。

プラン変更も30日経過するまではできません。つまり低速回線から逃れられなくなります。

(今回はそれを踏んでしまったので大変でした)


データ通信量が読めない場合に使いやすいのが (4) のPay As You Goプランです。

データ通信は1GBごとに$10を払えば良いので、足りなくなった時に追加すれば済みます。

ただし電話やSMSを受けるには、少し残高に入れてく必要がありますが(入金は最低$10)


一週間の滞在で言えば、

月額($3)+データ通信1GB($10)+入金($10)でも

$23なので、$40のプランよりはお得になる計算です。

本当にこれで済むのかは微妙なところかも知れませんが。


以降では、今回の経験などメモしておきます。


1GBでは1週間もちませんでした

今回は10日間の滞在なので、$40のプランを選択しました。

店頭で支払った額は$40ちょうど。特に手数料など掛かりませんでした。


ただとても残念なことに、通信が1週間弱で1GBを超えてしまい、速度制限を受けました。

ちょいちょテザリングしたり、短時間とは言えリモートデスクトップもしたので

1GBを削るには十分すぎるデータ通信をしてしまったようです。


128kbps制限を受けると、もちろん使えなくはないものの、

画像がロードされなかったり、リモートデスクトップが絶望的になったり

インターネットマンにはちょっと辛い状況でした。


先に書いたように、この状態になるとお金を追加してもどうにもならないので、

もう一枚、SIMを買い足さざるを得なくなりました。

($10で1GBを追加できるもんだと思い込んでたんですよね・・・)


追加で買ったPay As You Go

それで次に購入したのがPay As You GoプランのSIM。

なぜか契約手数料が発生して、$10(本当は$15だけどキャンペーンで安く)追加で掛かりました。


お店の人の説明では、$40のプランなどにも手数料は掛かるようだったので

最初に$40のプランを$40だけで購入できたのは、

JavaOne前の特別キャンペーンだったのかも知れません。


そんなわけで、店頭で払ったのは

月額($3)+データ通信($10)+手数料($10)+税で、$24ほど。


残高を入れないとSMSを受信できないのですが

通話やSMSは、必要な時だけ日本から持ってきたケータイを使えば良いや、

という強い気持ちを持って、残高を追加するのはやめておきました。


どうすれば良かったんだろうね?

結果論的には、$50のプランが正解だったと思います。

日本ではだいたい月に5GB、時には7GB近く使っていることを考えれば

10日間で2GB前後になることは推測できたはずです。

今回は、後から追加できないと知らなかったので、選択をミスってしまいました。


利用量を読めないなら、使った分に応じて追加購入できる

Pay As You Goの方が良いと思います。

ただ、残高やら何やらを追加するのって面倒だったりしますし、

想定外に残高が吹っ飛んで残念な目に遭うこともあるので、

その手間を考えると、通常の月額プランの方が良いんじゃないかなと思います。


そんなわけで、次も機会があるなら、3GBのプランにするかなーと思った次第です。

いや、ソフトバンクのアメリカ放題を使えば、こんなことに悩まなくて済むんですけどね!

2015-10-19

[]DynamoDBでTomcatのセッション共有をするとハマるかも

AWSを仕事で使い始めて1年半、

ようやく頭がクラウド脳に切り替わってきた @ です。

好きなAWSサービスはKinesisです。まだ使ってませんけどね!


さて、今日のテーマは「AWSでTomcatのセッション共有」です。

EC2上で動くTomcatのセッションオブジェクトを、DynamoDBを使って共有するというものです。


話題としてはそれなりに枯れていると思うのですが、

実案件で使おうと思ったら問題が出そうになって困ってる、という話です。


発生する問題は?

どういう問題が起きるか、先に書いておきます。


発生する問題は、

複数のTomcatをELBで分散させている時に、

スケールインやスケールアウトが短時間に連続して発生すると、

セッションが巻き戻る(先祖返りする)可能性がある、というものです。


セッションが消えるならまだしも、

先祖返りするというのは、実案件において許されない感じです。


あっ、

そもそも「セッションなんか使うから問題が起きるんだ」というツッコミはナシでお願いします。

それは分かったうえで、やむを得ずセッションを使うならどうしようか、という検討なのです。


Tomcat + DynamoDBの組み合わせ方

Tomcatのセッション共有になぜDynamoDBを使うのか、どういう設定をするのか、

というのは、この辺りのエントリーで学びました。


Amazon DynamoDBによるTomcatセッション永続化とフェイルオーバー - Developers.IO

Tomcat 7.x時代の記事ですが、考え方や注意点がとても分かりやすく紹介されています。


AWSでセッションをクラスタリングする方法について考えてみた結果、DynamoDBがよさそうなので試してみた。 - Qiita

Tomcat8 / Spring Boot / DynamoDBの連携がかなり詳しく紹介されています。


これらのサイトでも紹介されている内容を踏まえると、

以下のような流れになりそうです。

1. ELBの設定でスティッキーにして、同一セッションIDは同じTomcatに振り分ける

2. TomcatからDynamoDBに非同期で書き込む。遅延時間は調整可能(最低1秒?)

3. Tomcatがセッションを持っていない場合に限り、DynamoDBを参照する。

 (スケールイン / スケールアウトなどが起きた時に、セッションを引き継ぐことができる)


この流れは、いわゆる「リードスルー方式」と「ライトビハインド方式」を組み合わせたものだと言えます。

一見、この流れで問題がなさそうなのですが、

よくよく考えるとセッションの巻き戻しが起きることが分かりました。


どういう時に問題が起きる?

問題の再現状況は以下の通りです。

  • ELB
  • Tomcat 2台(仮にTomcat1、Tomcat2と呼ぶ)
  • DynamoDB

この構成で「フェイルオーバー → 復帰 → フェイルオーバー」を、

セッションタイムアウトよりも短い時間内で繰り返すと、問題が発生します。


時系列で順を追って説明しますね。


(1) Tomcat1 + DynamoDBにセッション保持(Tomcat1 → DynamoDB)

ユーザがアクセスした際に、ELBによってTomcat1に振り分けられたとします。

以後、このユーザは必ずTomcat1に振り分けられるため

ユーザがセッションに書き込んだ内容は、Tomcat1からDynamoDBに永続化され

Tomcat1のメモリとDynamoDBの両方で保持されることになります。


(2) フェイルオーバー(DynamoDB → Tomcat2)

ここでELBからTomcat1への振り分けを遮断すると、

ユーザのアクセスは、ELBによってTomcat2に振り分けられます。

ここでセッション情報はDynamoDBからTomcat2にロードされるため

これまで蓄積してきたセッション情報が消失することも、巻き戻ることもありません。


(3) Tomcat2 + DynamoDBにセッション保持(Tomcat2 → DynamoDB)

ユーザのアクセスはTomcat2に振り分けられていますので

ユーザがセッションに書き込んだ内容は、

Tomcat2のメモリとDynamoDBの両方で保持されます。


(4) Tomcat1の復帰(DynamoDB → Tomcat1)

次に、ELBからTomcat1への振り分けを再開すると

ユーザのアクセスは、Tomcat1に振り分けられます。

(Tomcat2に固定されず、Tomcat1に戻るんですよね)


Tomcat1を一度再起動するなどして、メモリにあったセッション情報を空にしておけば

最新のセッション情報がDynamoDBからTomcat1にロードされるため

やはりセッションの巻き戻しはありません。


(5) Tomcat1 + DynamoDBにセッション保持(Tomcat1 → DynamoDB)

この状況でユーザがセッションに書き込んだ内容は、

Tomcat1のメモリとDynamoDBの両方で保持されます。

ここまでは問題ありません。


(6) 改めてフェイルオーバー(Tomcat2のみ)

ここで再度ELBからTomcat1への振り分けを遮断すると、

ユーザのアクセスは、ELBによってTomcat2に振り分けられます。


この時、Tomcat2のメモリ内には (3) の時に書き込んだセッション情報が存在するため

わざわざDynamoDBを読みに行かず、Tomcat2が保持しているセッション情報を利用します。

そのため (5) で更新した内容から (3) の内容まで、巻き戻しが発生してしまいます。


つまり、0〜1秒ぐらいのタイミング問題(避けられない事故)ならまだしも、

数分ぐらいのオペレーションでも、セッションの巻き戻りが起きることになります。


じゃぁどうするの?

ここまで見てきた通り、書き込みが非同期である以上、

巻き戻りの問題が発生することは避けられません。


もちろん運用上、このような操作(短時間でのフェイルオーバー)を行なわないようにするのは一つの解ですが、

たとえば「リリース失敗時の切り戻し」なんてことを考えると、発生する可能性はゼロではありません。

そのため「買い物カゴ」のような、巻き戻りが業務に影響してしまうものは、この方式では扱えません。


では、設定を修正して何とかできないか考えてみます。

そもそも「非同期書き込み」と言えば、リードスルー / ライトビハインド方式以外にも

 1. ライトスルー方式(DynamoDBの更新時に、全Tomcatも同時に更新する)

 2. Tomcatのメモリを一切使わない(セッションの読み書き時には必ずDynamoDBを利用する)

の2つが考えられます。


ただ、TomcatのPersistenceManagerを利用する限りは、

どう設定しても1にも2にもならないことが分かりました。


そもそもPersistenceManagerはセッション共有の仕組みではなく、

JavaVMのヒープを過剰に占有しないために永続化するものです。

それをセッション共有のために代用しているだけであり、

本気でセッション共有を考えられたものではありませんでした。


・・・という事で、今回は、セッションを利用することを諦めました (^^;

最初に「セッション使うなっていうツッコミはナシ」とか言っておきながら、すみません (^^;;


まず「買い物カゴ」のような、決して巻き戻ってはいけない重要な情報は

セッションで扱うことを諦め、直接DBに永続化することにしました。


一方で「ログイン情報」や「行動をトレースするための情報」など、

最悪、多少巻き戻っても業務に影響しないような情報のみ、セッションに残すようにしました。


ちゃんちゃん♪


他の選択肢 - Spring Session

そんなわけで、DynamoDB、というか、

PersistenceManagerを利用したセッション共有は、お手軽にできるものの、

問題がありそうだという結論に至りました。


それでもセッション共有は諦めきれず、

他の選択肢として Spring Session を確認してみました。

Spring Sessionは、Redisなどをバックエンドとして利用できるセッション共有の仕組みです。


軽くソースを読んでみたところ

 1. session#setAttributeすると「差分データ」として内部で保持される

 2. ServletFilterで、処理の終了後に「差分データ」をまとめてRedisに反映させる

 3. session#getAttributeしたオブジェクトに対して操作しても、差分は反映されないので注意

ということが分かりました。

この仕組みなら、フェイルオーバー時の挙動もあまり問題にならなさそうです。


ただ、2の通り、セッションには即時反映せず、リクエスト終了時に反映するため、

たとえば同一セッションで複数リクエストを処理する際には、

(たとえば同一セッションから同時にアクセスカウンターをインクリメントしようとしても)

上手くいかないことがありそうです。


即時反映をサポートをして欲しいというチケットが挙がっているので、

将来的には即時反映がサポートされるかも知れません。

https://github.com/spring-projects/spring-session/issues/250


また3の制約により、既存のアプリケーションにSpring Sessionを適用する場合には

多少ソースを改修しなければいけないこともあるでしょう。


そんな理由で、今回は採用を見送りました。

今後の案件でSessionを使わざるを得ない場合に、改めて検証してみたいと思います。


まとめ

1. TomcatのPersistenceManager + DynamoDBなどによるセッション共有は、

 書き込み遅延や、フェイルオーバー時の巻き戻りを受け入れざるを得ない。


2. Spring Session + Redisによるセッション共有は、

 同一セッションでの複数リクエストへの考慮をすること、

 セッション更新時に、きちんとsession#setAttributeすることをルール化すれば、

 それなりにきちんと使えそう。


3. そもそも、絶対に巻き戻っちゃいけないトランザクショナルなデータは

 セッションなんてあいまいなものに持たせず、RDB管理しようぜ。


まぁ結局、セッション使うなという所に戻ってくるのは、何ともですね。