谷本 心 in せろ部屋 このページをアンテナに追加 RSSフィード

2017-12-21

[][]EXレイヤーの無限コンボが12/15のアップデートでどう変わったか #FEXL

前回、無限コンボについてブログを書いた後にFEXLがアップデートされ、無限コンボへの対策が取られました。アップデートで、どう変わったのか詳しく見てみます。

前回のエントリー → http://d.hatena.ne.jp/cero-t/20171214/1513251274


地上でのダッシュ小Pチェーンのループ

元々できたダッシュ小P-大P-ダッシュ小P-... というタイプの無限は簡単にはできなくなりました。手元で調べた限りですが、小Pの発生が総じて1F遅くなったようです(3F→4F)

ただ前回のエントリーで予想した通り「持続当てになる」パターンの無限は、いくつかできてしまいました。

https://twitter.com/twitter/statuses/941631711133044736 (スカロ、背中から)

https://twitter.com/twitter/statuses/942042937143078914 (カイリ、目押しも交えて)


これ、どうするのでしょうね。完全に無限をなくそうとすると、次の選択肢ぐらいしか思いつかないのですが。

1. 持続当てできなくする(←難しいし、悲しい)

2. チェーンコンボの硬直を増やす(←ありだと思うけど、違和感に繋がるかも)

3. チェーンコンボから大攻撃を出すと必ずダウン(←いやそれだとHADES強くなりすぎでしょ)

4. イージーコンビネーション以外のチェーンをなくす(←諸々、根本的な見直しになってしまう)


今回のアップデートと同じように、いくつかの持続当てについて細かな調整だけするというのは、イタチゴッコであって根本解決になりません。また、たとえばヒット数が増えればのけぞりが小さくなる、という類の調整は様々なルールをシンプルにしてきたこのシリーズらしくないと思います。

もちろんあくまでこの話の前提は「完全に無限をなくそうとすると」なので、無限が残っても良いと考えれば、この限りではないです。


空中でのダッシュ小P拾いのループ

これも調整されて、できなくなりました。前回のエントリーでは「浮きを高くするか低くするしかない」と書きましたが、浮きは低くなる方向に調整されました。それでも浮いている相手にチェーンコンボを入れることはできるので、とても絶妙な調整です。

今回の調整のおかげで「高さ」を「リソース」と捉えるようになり、どう消費し、どう稼ぐかみたいな事を考えるようになりました。


ほくとの即死コンボでこんな感じです。連昇激で打ち上げ、J大Pでも高さを維持し、小Pでいったん高度を下げて、残りの技を当てる、という感じです。

https://twitter.com/twitter/statuses/941742592772812800


ちょうど良い塩梅の高さ調整だと思いました。


スパコンのループ中にゲージが溜まるか?

空中コンボの浮きが調整されたので、浮かせた後にゲージを1本分(50)溜めるのが難しくなりました。

今のところ、自分で確認できたゲージ溜め最大は、アレンの空中コンボです。

https://www.youtube.com/watch?v=nCiyijkm2PQ


強氣による強化がなしだと、33溜まります。

・トリプルブレイク(1)-J大P(8)-ダッシュ小P(1)-下中P(5)-下中K(5)-ライジングドラゴン(13)-トリプルブレイク-


強氣を選べばゲージ増加量を増やせます。

インフィニティのデッキにゲージ10%アップの強氣が3つ付いてるので、1.3倍になって(端数は切り捨て)40溜まります。

上にURLを張った動画のコンボがこれで、11ループのコンボとなっています。

・トリプルブレイク(1)-J大P(10)-ダッシュ小P(1)-下中P(6)-下中K(6)-ライジングドラゴン(16)-トリプルブレイク-


もし強氣のカスタマイズができるようになり、仮にゲージアップを5つ積むと1.5倍で、44溜まることになります。

・トリプルブレイク(1)-J大P(12)-ダッシュ小P(1)-下中P(7)-下中K(7)-ライジングドラゴン(16)-トリプルブレイク-


ゲージが44も溜まれば17ループのコンボができることになるので、無限でないにせよ、ラウンドの開始から終了までずっとコンボを継続できるはずです。ちょっとゲージが溜まりすぎですが、強氣による恩恵だと思えばアリの範疇ですかね?


当たり判定が残ったまま落ちてくる必殺技

このタイプの無限は今のところありません。

ただ今後、そういうタイプの必殺技が出てきた時には、空中ヒット時には「きりもみヒット」するだけでなく「浮く力が弱い」という対策を取ることで、無限コンボを防ぐことができるようになりますね。


たとえばロッソのヴェズーヴィオの怒りで言うと、1段目が地上ヒットすれば相手が浮いて2段目が入るけど、1段目が空中ヒットすると浮きが低くなって2段目が空振りしてしまう、という感じの調整ができるということです。

あとエリアも、ジャクソンキックが空中ヒットすると浮きが低くなり、ポップアップニーは入らないけどパーティションブレイクなら入る、みたいな調整をされたら良いんじゃないかなと思います。


最後に

12/15のアップデートにより、安易な無限がなくなったのはとても良かったと思います。ただ同時にツイート数なども減ったことを考えると、もしかすると無限があったほうが話題にはなりやすかったのかも知れませんよねw

ただ、コンボを作る立場としては、アップデート後のほうが断然良いです。


それでも、持続当てを使ったループコンボや、強氣を使ってゲージを溜めるコンボ、また、今回のエントリーでは紹介していない、少し特殊なテクニックを使ったループコンボなどは残っています。

この辺りを完全に潰しに行くのか、潰すのはほどほどにするのか、今後のバージョンアップや次回のβテストなどがもしあれば、よく注目したいと思います。


・・・ただ僕としては無限対策云々よりも、スパコンのカス当たりから拾えるコンボを、復活させて欲しいのですけどね!


[][]EXレイヤーの無限コンボが12/15のアップデートでどう変わったか #FEXL

前回、無限コンボについてブログを書いた後にFEXLがアップデートされ、無限コンボへの対策が取られました。アップデートで、どう変わったのか詳しく見てみます。

前回のエントリー → http://d.hatena.ne.jp/cero-t/20171214/1513251274


地上でのダッシュ小Pチェーンのループ

元々できたダッシュ小P-大P-ダッシュ小P-... というタイプの無限は簡単にはできなくなりました。手元で調べた限りですが、小Pの発生が総じて1F遅くなったようです(3F→4F)

ただ前回のエントリーで予想した通り「持続当てになる」パターンの無限は、いくつかできてしまいました。

https://twitter.com/twitter/statuses/941631711133044736 (スカロ、背中から)

https://twitter.com/twitter/statuses/942042937143078914 (カイリ、目押しも交えて)


これ、どうするのでしょうね。完全に無限をなくそうとすると、次の選択肢ぐらいしか思いつかないのですが。

1. 持続当てできなくする(←難しいし、悲しい)

2. チェーンコンボの硬直を増やす(←ありだと思うけど、違和感に繋がるかも)

3. チェーンコンボから大攻撃を出すと必ずダウン(←いやそれだとHADES強くなりすぎでしょ)

4. イージーコンビネーション以外のチェーンをなくす(←諸々、根本的な見直しになってしまう)


今回のアップデートと同じように、いくつかの持続当てについて細かな調整だけするというのは、イタチゴッコであって根本解決になりません。また、たとえばヒット数が増えればのけぞりが小さくなる、という類の調整は様々なルールをシンプルにしてきたこのシリーズらしくないと思います。

もちろんあくまでこの話の前提は「完全に無限をなくそうとすると」なので、無限が残っても良いと考えれば、この限りではないです。


空中でのダッシュ小P拾いのループ

これも調整されて、できなくなりました。前回のエントリーでは「浮きを高くするか低くするしかない」と書きましたが、浮きは低くなる方向に調整されました。それでも浮いている相手にチェーンコンボを入れることはできるので、とても絶妙な調整です。

今回の調整のおかげで「高さ」を「リソース」と捉えるようになり、どう消費し、どう稼ぐかみたいな事を考えるようになりました。


ほくとの即死コンボでこんな感じです。連昇激で打ち上げ、J大Pでも高さを維持し、小Pでいったん高度を下げて、残りの技を当てる、という感じです。

https://twitter.com/twitter/statuses/941742592772812800


ちょうど良い塩梅の高さ調整だと思いました。


スパコンのループ中にゲージが溜まるか?

空中コンボの浮きが調整されたので、浮かせた後にゲージを1本分(50)溜めるのが難しくなりました。

今のところ、自分で確認できたゲージ溜め最大は、アレンの空中コンボです。

https://www.youtube.com/watch?v=nCiyijkm2PQ


強氣による強化がなしだと、33溜まります。

・トリプルブレイク(1)-J大P(8)-ダッシュ小P(1)-下中P(5)-下中K(5)-ライジングドラゴン(13)-トリプルブレイク-


強氣を選べばゲージ増加量を増やせます。

インフィニティのデッキにゲージ10%アップの強氣が3つ付いてるので、1.3倍になって(端数は切り捨て)40溜まります。

上にURLを張った動画のコンボがこれで、11ループのコンボとなっています。

・トリプルブレイク(1)-J大P(10)-ダッシュ小P(1)-下中P(6)-下中K(6)-ライジングドラゴン(16)-トリプルブレイク-


もし強氣のカスタマイズができるようになり、仮にゲージアップを5つ積むと1.5倍で、44溜まることになります。

・トリプルブレイク(1)-J大P(12)-ダッシュ小P(1)-下中P(7)-下中K(7)-ライジングドラゴン(16)-トリプルブレイク-


ゲージが44も溜まれば17ループのコンボができることになるので、無限でないにせよ、ラウンドの開始から終了までずっとコンボを継続できるはずです。ちょっとゲージが溜まりすぎですが、強氣による恩恵だと思えばアリの範疇ですかね?


当たり判定が残ったまま落ちてくる必殺技

このタイプの無限は今のところありません。

ただ今後、そういうタイプの必殺技が出てきた時には、空中ヒット時には「きりもみヒット」するだけでなく「浮く力が弱い」という対策を取ることで、無限コンボを防ぐことができるようになりますね。


たとえばロッソのヴェズーヴィオの怒りで言うと、1段目が地上ヒットすれば相手が浮いて2段目が入るけど、1段目が空中ヒットすると浮きが低くなって2段目が空振りしてしまう、という感じの調整ができるということです。

あとエリアも、ジャクソンキックが空中ヒットすると浮きが低くなり、ポップアップニーは入らないけどパーティションブレイクなら入る、みたいな調整をされたら良いんじゃないかなと思います。


最後に

12/15のアップデートにより、安易な無限がなくなったのはとても良かったと思います。ただ同時にツイート数なども減ったことを考えると、もしかすると無限があったほうが話題にはなりやすかったのかも知れませんよねw

ただ、コンボを作る立場としては、アップデート後のほうが断然良いです。


それでも、持続当てを使ったループコンボや、強氣を使ってゲージを溜めるコンボ、また、今回のエントリーでは紹介していない、少し特殊なテクニックを使ったループコンボなどは残っています。

この辺りを完全に潰しに行くのか、潰すのはほどほどにするのか、今後のバージョンアップや次回のβテストなどがもしあれば、よく注目したいと思います。


・・・ただ僕としては無限対策云々よりも、スパコンのカス当たりから拾えるコンボを、復活させて欲しいのですけどね!

2017-12-07

[][]Goで格闘ゲームマクロを実装してみた

このところ、夜な夜な格ゲーの動画を撮ってはYoutubeにアップしては、SRK(shoryuken.com)に取り上げられて承認欲求を満たしている @ です。SRKは世界最大の格ゲーコミュニティだよ!

http://shoryuken.com/?s=Shin+Tanimoto


このエントリーは Go2 Advent Calendar 2017 の7日目です。

https://qiita.com/advent-calendar/2017/go2


はじめに

まえがき

学生時代、僕は格闘ゲームの連続技(コンボ)の研究を嗜んでおりまして、学校に行くぐらいならゲーセンを行く、試験勉強するぐらいならコンボの研究をする、卒論を書くぐらいならコンボビデオを撮るという、クズみたいな学生生活を送っておりました。当時、いまは亡き「ゲーメスト」というアーケードゲーム雑誌で体験ライターとして編集部に泊まり込んで記事を書くような体験もしました。

その頃よくプレイしていたストリートファイター系のゲームが、なんと18年の歳月を経て新作を出すというのです。これは万難を排してやらねばならぬと相成ったのですが、当時20歳前後だった僕も先日ついに40歳を迎えたわけで、さすがにもうあの頃のようには手が動きません。

しかし、いまの僕には、あの頃の自分にはなかったプログラミング能力があります。人間が手作業で行っていることを、機械に行わせるのが仕事です。そう、格闘ゲームも機械にやらせてしまえば良いじゃないか、というのが今回のテーマになります。


※なお、ゲームのネット対戦やオンラインランキングでマクロを使うのはマナー違反です。決してやらないでください。


目的

プレステ3プレステ4Xboxなどで使えるマクロを作ります。実装にはGoを使いました。


用意するもの

ゲーム機とPCをつなぐ、何かしらのデバイスが必要になります。今回はConsole Tuner社のTitan Oneという機器を利用します。

https://www.consoletuner.com/products/titan-one/

f:id:cero-t:20171207075818j:image:w360


これはUSBゲームコントローラーを様々なゲーム機で使えるようにするための変換器です。しかしただの変換機能だけでなく、PCと繋ぐことでマクロ機能やプログラミングを組んだりできるほか、API自体もDLLとして公開されているので、自作アプリからも利用することができます。今回の用途にはベストです。


Titan OneとDLLを使って、次のような流れで操作を流し込みます。

ゲーム機 <-(USB)- Titan One <-(USB)- Windows(DLL <- 自作アプリケーション)

なんとなくイメージが湧くでしょうか?


なぜアプリを自作するのか?

先ほど「マクロ機能やプログラミング環境が用意されている」と書いた通り、用意された環境を利用すれば、別にアプリを自作するまでもなくマクロを実行することができます。ただ、付属のプログラミング環境ではミリ秒単位の制御しかできず、格闘ゲームの60fps(framer per second)の制御をするには不便でした。格闘ゲーマーは物事をフレーム単位で考えていて、ミリ秒単位で考えるわけではないのです。

なので、フレーム単位で扱いやすくするための、簡単なアプリを自作することにしました。


なぜGoなのか?

今回のアプリは、配布が容易であることと、Mac/Windowsのいずれでも開発できるという点でGoを選びました。

僕はJavaエンジニアなので、最初はJavaでJNAを使って実装しました。Javaでも問題なく動くものができたのですが、ただちょっと配布するのが面倒だなと思い、かと言ってC#にするとMacでの開発環境が微妙なので、Goを選ぶに至りました。もちろんそんな消去法的な選択だけでなく、ちょっとGoを勉強してみたかったという背景もあります。


DLL呼び出しの実装

それではGoで実装していきます。僕はGo初心者で体系的に学んだこともないので、ここがおかしいよとか、ここもっとこうした方が良いよというのがあれば、教えてもらえると凄く嬉しいです。


DLLの読み込み

まずTitan Oneが提供しているDLL(gcdapi.dll)を読み込む部分を作ります。

goでDLLを読み込むにはsyscall.LoadDLLを使います。

dll, _err := syscall.LoadDLL("gcdapi.dll")
if _err != nil {
	log.Fatal("Error loading gcdapi.dll", _err)
}

これでDLLの読み込みができます。


ここで「Failed to load gcdapi.dll: %1 is not a valid Win32 application.」というエラーに悩まされました。

原因は「gcdapi.dllが32bit」だったことです。64bit環境でコンパイル/実行しようとするとエラーになるので、32bit環境向けにコンパイルする必要があります。

set GOARCH=386

これでOK

ここで1時間ほどハマっていました。


DLL内の関数呼び出し

DLLにある関数を呼び出すには、読み込んだdllのFindProc関数を使います。

gcdapi_Load, _err := dll.FindProc("gcdapi_Load")
if _err != nil {
	log.Fatal("cannot find gcdapi_Load", _err)
}

result, _, _err := gcdapi_Load.Call()
if result == 0 {
	log.Fatal("gcdapi cannot be loaded")
}

これでgcdapi_Loadという関数を実行できます。

DLL内にある他の関数も、同じ要領で先に読み込んでおきます。


また、関数引数を渡す必要がある場合は、uintptr型のポインタとして渡します。

次の例は、someFunction関数にarg1という値を渡す例です。

result, _, _ := someFunction.Call(uintptr(arg1))

戻り値はresultに入ります。


配列や構造体を引数に渡す

引数配列や構造体を渡す場合は、先に参照をポインタに変換してから、さらにuintptrに変換します。

正直、参照がどうなってるのかもはや僕には理解できないのですが、こうすれば動くということだけ確認しました。

func write(inputs *[36]int8) bool {
	result, _, _ := procWrite.Call(uintptr(unsafe.Pointer(inputs)))
	// 略
	return true
}

ちなみにここで配列ではなくスライスを渡していたために上手く動かず、2時間ほどハマりました。初学者はとにかくハマって時間をロストしますね。


また構造体を引数にする場合は、DLL側に用意された構造体と同じ名前、同じ型の構造体をGo側に用意します。

次の例ではGCAPI_REPORTという構造体を引数に渡しています。

type GCAPI_REPORT struct {
	console       uint8
	controller    uint8
	led           [4]uint8
	rumble        [2]uint8
	battery_level uint8
	input         [30]GCAPI_INPUT
}

type GCAPI_INPUT struct {
	value      int8
	prev_value int8
	press_tv   uint32
}

func read() bool {
	var report GCAPI_REPORT
	result, _, _ := procRead.Call(uintptr(unsafe.Pointer(&report)))
	log.Println(report)
	// 略
	retun true
}

構造体は恐ろしいぐらいにハマらずに値が入りました。簡単です。最高です。


MacでもWindowsでも実行したい

ちなみにここまで使ってきたsyscall.LoadDLLや、その戻り値であるsyscall.DLL、syscall.ProcなどはWindows環境でしか利用できず、Mac環境でコンパイルしようとするとエラーになってしまいます。

せめてMacではDLL呼び出し部分だけモックにして、アプリ全体としては動くようにしておきたいものです。そういうのもきちんと用意されていました。


同じパッケージ内に、同名の関数を定義した別ソースファイル(モック用関数群)を作り、そのファイルの先頭にこんなコメントを入れます。

// +build !windows

これでWindowsの環境以外でビルドした場合には、モック用関数群のファイルが利用されるようになります。

・・・ちょっと何を言ってるのか分からない感じなので、実物を見て頂いたほうが早いと思います。


Windows用のソースコードがこれ。

https://github.com/cero-t/cero-macro.t1/blob/master/gcapi/gcapi_dll_windows.go


Windows以外用のソースコードはこれです。

https://github.com/cero-t/cero-macro.t1/blob/master/gcapi/gcapi_dll_other.go


いいですね、簡単です。


次に入力側を作る

DLLの呼び出しが上手く行ったので、次は呼び出し側の設計/実装です。


懐かしのコマンド

20世紀に格闘ゲームをやっていた人にとって、「236p」という文字列波動拳コマンドにしか見えません。これはテンキー配列の「下 右下 右」に相当するもので、最後のpがパンチです。当時のパソコン通信インターネットで頻繁に用いられた記法です。同様に、昇龍拳は「623p」、竜巻旋風脚は「214k」となります。今回はこの表記をします。

また、パンチの小中大はそれぞれl, m, hで表現し、たとえば小パンチは「lp」、中キックは「mk」、しゃがみ大キックは「2,hk」とします。これは海外の格闘ゲーマーが用いている表記方法です。


この表記法を使い、たとえば次のようなテキストを受け取ることを想定します。

2,mk 8
3 1
6,lp 1

これはレバー下と中キックを8フレーム、右下に1フレーム、右と小パンチを1フレーム、というコマンドです。中足波動拳ですね。

このようなコマンドを受け取って、上で説明した関数の呼び出しを行うようにします。


入力コマンドの変換

格闘ゲームの入力は精度が命ですから、Go側の処理に時間が掛かってコマンドが不安定になってしまっては意味がありません。そのため、上に書いたテキストをあらかじめパースして配列などに変換しておき、その後にまとめてDLL経由でコマンドを流し込む処理を行うようにします。変換とDLL呼び出しを逐次行っていると、想定しない処理遅延が発生する可能性があるためです。


テキストをパースした後、次のような構造体に変換します。

type State struct {
	buttons [36]int8
	frames  uint16
}

buttonsが押すべきボタンの一覧です。配列のそれぞれがボタンやレバーに対応しており、押さない時は0、押した時は100という値が入ります。


変換は地味なコードなので割愛します。ここでやってます。

https://github.com/cero-t/cero-macro.t1/blob/master/processor/converter.go


正確な60fpsのコマンド実行

続いて、コマンド実行の処理です。State構造体の配列をきっちりと指定したフレームで呼び出す必要があります。

ちなみに格闘ゲームではよく「1/60秒」という表現が使われて、「光速は遅すぎる」など話題になることがありますが、少なくとも手元環境のプレステ3で確認したところ、1フレームは「1/60秒」ではなく「1/59.94秒」でした。


そのあたりも踏まえて、次のようなコードを書きました。ボタンを押した後に、そのフレーム数分(たとえば5フレームなら、約 5 * 16.6ms = 83ms)だけ待つというコードです。

ただしこの処理自体の実行時間も考慮する必要があるため、現在時刻など利用して、フレームレートを保つようにしています。

// フレームレート (frame per 1000 seconds)
const frameRate uint64 = 59940

func Process(states *[]State) {
	var totalFrames uint64
	start := time.Now().UnixNano()

	for _, state := range *states {
		// ボタンを押す
		gcapi.Push(&state.buttons)

		// フレーム数分だけsleepする
		totalFrames += uint64(state.frames)
		sleepTime := start + int64(totalFrames*1000*1000*1000*1000/frameRate) - time.Now().UnixNano()
		time.Sleep(time.Duration(sleepTime))
	}
}

正確に59.94fpsを保つためにわざわざアプリを自作したので、この処理が一番のキモと言えますね。


この関数を含む処理全体はここにあります。

https://github.com/cero-t/cero-macro.t1/blob/master/processor/processor.go


UIはどうする?

最後にUIです。

GoのUIをどうすべきかは議論があるようですが、「クロスプラットフォームで使えて、皆が慣れ親しんでいるUI」と言えば、コマンドラインCUI)とHTMLの2つでしょう。


まずはCUI

テキストファイルを読み込んでその中に書かれた操作コマンドを実行するCUIを作ります。

こんな感じで、実行時の引数にファイル名を指定します。

$ go run macro.go combo.txt

これでcombo.txtに書かれた内容が、ゲーム機に送られるのです。


このmacro.txtという渋い名前のファイルの中身は、こんな感じです。

9 34
hp 20
2,mk 8
6 1
2 1
3,hp 1

ジャンプ大パンチ 中足 昇龍拳、という感じです。


続いてGUI

もう一つのUIが、HTTPサーバとして起動し、画面で入力されたコマンドを実行するGUIです。実行時の引数にファイル名を指定しなければHTTPサーバとして立ち上がります。

$ go run macro.go
Server started at [::]:8080

それでlocalhostの(あるいは別のPCからIPアドレスを指定して)8080ポートにアクセスすれば、こんな画面が出てきます。

f:id:cero-t:20171207075959p:image

テキストボックスにコマンドを入力して「run」を押せば実行できる形です。

マクロで動かす以外にも簡単な操作をリモートでできるよう、他のボタンも置いておきました。「LP」ボタンを押すと、「lp 2」というマクロ(小パンチを2フレーム押す)が送られます。


なおHTMLファイルの使い方ですが、Goでリソースバンドルするのがちょっと面倒だったので、ソースコード内にヒアドキュメントで書いてしまいました。

func formHTML() string {
	return `
<!DOCTYPE html>
<html>
<head>
(略)
</body>
</html>
`
}

なおViewフレームワークには超軽量で有名な「Vanilla JS」を用いているため、この1ファイルだけでViewのレンダリングサーバとの通信処理なども実装できています。

http://vanilla-js.com/

はい。


HTTPクライアントサーバ関連のソース全体はこちらです。

https://github.com/cero-t/cero-macro.t1/blob/master/http/form-html.go

https://github.com/cero-t/cero-macro.t1/blob/master/http/http.go


実際、試してみたところ・・・!!

これで画面からDLLまですべて繋がったので、いざ実機で挑戦!


おぉ、動く動く! キャラが動くよ!


・・・あれ、コンボが途切れるぞ?

なんか、コンボ不安定だぞ?


そうなんです、どうやらTitan Oneは処理遅延が10ms程度あり、その遅延も安定しないため、格闘ゲームの1フレームを安定させて動かすのが難しいようなのです。

ここまで書いてきてこのオチはなかなか酷いのですが、別にこれで終わりというわけではありません。

より遅延が少ないデバイスが出てきて、それをAPI経由で叩けるようになれば、また再挑戦したいと思います。


あれ、この手元にあるデバイスは、何だろう??

f:id:cero-t:20171207080038j:image:w360


来年には、API経由で叩けるようになるのかなぁ。

2016-01-01

[]AWS LambdaでJavaとNode.jsとGoの簡易ベンチマークをしてみた。

あけましておめでとうございます!

現場でいつも締め切りに追われデスマーチ化している皆様方におかれましては、年賀状がデスマーチ化していたのではないかと憂慮しておりますが、いかがお過ごしですか。

エンジニアの鑑みたいな僕としましては、年始の挨拶はSNSでサクッと済ませ、年末年始は紅白など見ながらAWS Lambdaのソースコードを書いていました。


ということで、Lambda。

Lambdaを書く時に最初に悩むのは、どの言語を選択するか、なのです。


まず手を出すのは、サクッと書けるNode.js

ただNode.jsの非同期なプログラミングになかなか馴染めず、わからん殺しをされてうんざりしていました。みんなどうしているのよとtwitterで問いかけたところ @ からPromiseを使うべしと教わり、なるほど確かにこれは便利ですナと思いつつも、これが標準で使えないぐらいLambdaのNode.jsが古いところにまたうんざりしました。


では手に馴染んでるJavaを使ってみたらどうかと思ったら、メモリイーターだし、なんとなくパフォーマンスが悪い感じでした。詳しいことは後ほどベンチマーク結果で明らかになるわけですが。


それなら消去法で残ったPythonなのですが、僕マジPython触ったことないレベルであり、これは若者たちに任せることにしているので、Pythonも選択肢から消えて。


本来、Lambdaみたいな用途にはGoが向いているはずなのだけど、いつ使えるようになるんだろうなぁなどと思って調べてみたら、Node.jsからGoを呼び出すというテクを見つけて、こりゃいいやとなりました。

http://blog.0x82.com/2014/11/24/aws-lambda-functions-in-go/


ただGoを呼ぶにしてもNode.jsを経由するためのオーバーヘッドはあるわけで、じゃぁ実際にどれぐらいパフォーマンス影響があるのか、調べてみることにしました。


処理の中身

簡単に試したいだけだったので、数行で済む程度のごく簡単な処理を書いてベンチマークすることにしました。


1. 引数で渡されたJSONをパースして、中身を表示する

2. DynamoDBから1件のデータを取得する


当初は1だけだったのですが、あまり性能に差が出なかったので2を追加した感じです。そんな経緯のベンチマークなので、実装も雑ですが、とりあえずソースをGitHubに置いておきました。

https://github.com/cero-t/lambda-benchmark


実行結果
回数Java(1)Java(2)Node.jsGo
1回目218006700900600
2回目13007300800500
3回目19000500200500
4回目10001300200400
5回目500200400500
6回目400100200500
7回目400300400500

時間はいずれもミリ秒のBilled duration


Java(1) : メモリ192MB。処理中にAmazonDynamoDBClientを初期化

Java(2) : メモリ192MB。処理前に(コンストラクタで)AmazonDynamoDBClientを初期化

Node.js : メモリ128MB

Go : メモリ128MB


メモリの実使用量は

Java : 68MB

Node.js : 29MB

Go : 15MB

でした。


考察など

正味の話、Lambdaで行っている処理などほとんどないので、性能的には大差ないかと思っていたのですが、思ったより特性が出ました。これは処理時間というよりは、関連ライブラリのロード時間な気がしますね。


Javaは最初の数回が20秒、実装を改善しても7秒とかなり遅かったのですが、ウォーミングアップが終わった後は200msぐらいまで早くなりました。呼ばれる頻度が高い処理であれば良いのでしょうけど、たまに呼ばれるような処理では、この特性はネックになる気がします。


ちなみにJavaだけは192MBで計測しましたが、最初にメモリ128MBで試したところ、30秒ぐらいで処理が強制的に中断されたり、60秒でタイムアウトするなど、計測になりませんでした。こういう所を見ても、Javaを使う時にはメモリを多め(CPU性能はメモリと比例して向上)にしなくてはいけない感じでした。


Node.jsも少しはウォーミングアップで性能が変わりますが、最初から性能は良い感じです。


Goを使った場合は性能が安定しており、ウォーミングアップしても性能が変わりません。メモリ使用量が少ないのも良いですね。Node.jsから呼び出す際のオーバーヘッドがあるせいか、性能的にはウォーミングアップ後のJavaやNode.jsに一歩劣る感じでした。


なお、Pythonの実装をしてくれる人がいらっしゃれば、プルリクしていただければ、データを追加したいと思います。


結論

繰り返しになりますが、雑なベンチマークなので特性の一部しか掴んでないと思います。

そもそもJavaだけメモリが1.5倍になっている(=CPU性能も1.5倍になっている)ので公平ではないですし。


ただ「たまに行う処理」を「少ないリソース」で行うという観点では、Goで実装するのが良さそうです。GoはそのうちLambdaでも正式サポートされそうですしね。

きちんとしたベンチマークは、また別の機会にじっくり行ってみたいと思います。


ということで、Goを使う大義名分ができたベンチマークでした!

See you!

2015-12-24

[][]JMXで情報を取得する時のベンチマーク

JMHを使って、JMX経由でMBeanの情報を取る際のパフォーマンスを測定してみた。

ベンチマークのソースコードはこちら。

https://github.com/cero-t/Benchmarks/blob/master/src/main/java/ninja/cero/benchmark/JmxBenchmark.java


ベンチマーク環境はMacBook Pro Late 2013 (Core i5 2.4GHz) で、

他のアプリなども立ち上げっぱなしの環境なのでノイズは多めでだけど、

傾向を見たいだけなのであまり気にせず。


VirtualMachine.attachのパフォーマンス

VMにオンデマンドアタッチする際のパフォーマンス。


No1 : VirtualMachine.attacheしてからシステムプロパティを取ってdetachする

No2 : キャッシュしていたVirtualMachineを使ってシステムプロパティを取得する

@Benchmark
public void no1_vmAttach() throws Exception {
    VirtualMachine vm = VirtualMachine.attach(PID);
    vm.getSystemProperties();
    vm.detach();
}

@Benchmark
public void no2_vmCachedGetProperties() throws Exception {
    vm.getSystemProperties();
}

結果

BenchmarkModeCntScoreErrorUnits
JmxBenchmark.no1_vmAttachthrpt10653.834± 108.790ops/s
JmxBenchmark.no2_vmCachedGetPropertiesthrpt102330.529± 270.268ops/s

アタッチありは1.5msec程度、キャッシュした場合は0.4msec程度。

ということで、アタッチに掛かる時間は1msec程度と推定。割とでかい。


JMXConnectorFactory.connectのパフォーマンス

続いて、VMに対するJMX接続を行う際のパフォーマンス。


No3 : JMXConnectorの取得処理と、クローズ処理をする

No4 : キャッシュしていたJMXConnectorを使って、MBeanServerConnectionの取得とMbean情報を取得する

public JMXConnector no3_vmCachedGetConnector() throws Exception {
    String connectorAddress = vm.getAgentProperties().getProperty("com.sun.management.jmxremote.localConnectorAddress");

    if (connectorAddress == null) {
        String agent = vm.getSystemProperties().getProperty("java.home") + File.separator + "lib" + File.separator + "management-agent.jar";
        vm.loadAgent(agent);
        connectorAddress = vm.getAgentProperties().getProperty("com.sun.management.jmxremote.localConnectorAddress");
    }

    JMXServiceURL serviceURL = new JMXServiceURL(connectorAddress);
    JMXConnector jmxConnector = JMXConnectorFactory.connect(serviceURL);
    jmxConnector.close();
}

@Benchmark
public void no4_connectorCachedGetMBeanCount() throws Exception {
    MBeanServerConnection connection = connector.getMBeanServerConnection();
    connection.getMBeanCount();
}

結果

BenchmarkModeCntScoreErrorUnits
JmxBenchmark.no3_vmCachedGetConnectorthrpt10612.652± 95.547ops/s
JmxBenchmark.no4_connectorCachedGetMBeanCountthrpt109047.803± 1480.741ops/s

JMXの接続と切断は1.6msec程度。おおまかVMに対するアタッチと同じぐらい。

接続したあとの、MBeanServerへの接続とMBean情報取得は0.1msecぐらいで、これは無視できる小さい。


ThreadMXBeanからThreadCountを取るパフォーマンス

今回の主目的はこれ。

ThreadMXBeanを使って情報を取るのと、

MBeanServerConnection.getAttributeで名前を指定して情報を取るのと、どっちが早いか。


No5 : MBeanServerConnection.getAttributeの名前指定でThreadCountを取得する

No6 : ThreadMXBeanを取得してから、getThreadCountで取得する

No7 : キャッシュしていたThreadMXBeanから、getThreadCountで取得する

@Benchmark
public void no5_connectorCachedThreadCount() throws Exception {
    MBeanServerConnection connection = connector.getMBeanServerConnection();
    Object count = connection.getAttribute(new ObjectName(ManagementFactory.THREAD_MXBEAN_NAME), "ThreadCount");
    sum += (Integer) count;
} 

@Benchmark
public void no6_connectorCachedGetThreadCount() throws Exception {
    MBeanServerConnection connection = connector.getMBeanServerConnection();
    ThreadMXBean threadBean = ManagementFactory.newPlatformMXBeanProxy(
            connection, ManagementFactory.THREAD_MXBEAN_NAME, ThreadMXBean.class);
    sum += threadBean.getThreadCount();
}

@Benchmark
public void no7_beanCachedGetThreadCount() throws Exception {
    sum += threadMXBean.getThreadCount();
}

結果

BenchmarkModeCntScoreErrorUnits
JmxBenchmark.no5_connectorCachedThreadCountthrpt108199.887± 1269.164ops/s
JmxBenchmark.no6_connectorCachedGetThreadCountthrpt102662.147± 585.627ops/s
JmxBenchmark.no7_beanCachedGetThreadCountthrpt108148.967± 1705.518ops/s

ThreadMXBeanを毎回取る(No6)は明らかにパフォーマンスが悪いけど、

ThreadMXBeanをキャッシュしている限りは、ThreadMXBeanから情報を取るのと、

MBeanServerConnection.getAttributeで取ることに性能差はなし。


まとめ

1. VirtualMachineへのattach/detachは時間が掛かるので、キャッシュすべき

2. JMX接続の確立/切断は時間が掛かるので、キャッシュすべき

3. MBeanServerへの接続は時間が掛からないので、無理にキャッシュしなくてよい(close処理もないのでリソース管理もしてない?)

4. MBeanServerConnection.getAttributeでもThreadMXBeanを使っても性能差はないので、ThreadMXBeanを無理にキャッシュしなくてよい


ベンチマークのソースコード

https://github.com/cero-t/Benchmarks/blob/master/src/main/java/ninja/cero/benchmark/JmxBenchmark.java


現場からは以上です。

2015-12-12

[Java]Stream APIをつくろう

最近、上司からのパワハラではなく、部下からの「部下ハラ」というものがあると聞きますが、

それって要するにバックプレッシャー型ハラスメントでありリアクティブであるよなと思うこの頃ですが、

皆さんいかがお過ごしでしょうか。


さて、順調な滑り出しからスタートした本エントリーは Java Advent Calenda 2015 の12日目になります。


前日は最年少(?)OpenJDKコミッタの @ さんによる

DeprecatedがJDK9で変わるかもしれない件(JEP 277: Enhanced Deprecation)

明日は日本唯一のJava Championの @ さんのエントリーです。

なかなかエラいところに挟まれてしまったなという感じです!


では本題、

今日のテーマは「Stream APIをつくろう」です。


Stream APIって、使う側がよくフォーカスされるのですが、

作る側がどうなっているのか、あるいはその使いどころはどこなのか、というのはあまり情報がないように思います。


今日はその作る側にフォーカスしたいと思います。

少し長いエントリーですが、ゆっくりとおつきあいください。


おしながき

1. なぜStream APIを作りたいのか

2. お題:ディレクトリの直下にあるテキストファイルをまとめて Stream<String> として扱う

3. そもそもStream APIってどうやって作るの?

4. お題:AWS S3にあるテキストファイルをまとめて Stream<String> として扱う

5. まとめ、そして次回予告


1. なぜSteam APIを作りたいのか

もともとのきっかけは、AWSのS3上にある大量のログファイルを

elasticsearchに流し込む処理を書いたことでした。


ファイルサイズが膨大なのはもちろんのこと、ファイル数自体も大量ですから、

処理対象となるファイルをリストアップするだけでも大変という状況です。

なのでファイルリストを少しずつ作りつつ、

ファイルを開いて少し読んでは流し込むという、逐次処理をしなくてはいけません。


Java7までなら、うんたらHandlerを渡して処理をグルグル回すところですが

Java8なので、せっかくだからStream APIを使うことにしてみました。


ただ先にも書いたように、Stream APIを提供する側の処理はあまり情報がなかったため

@さんや@さん、@さんに色々と質問しながら勉強しました。

皆さん、ありがとうございました。今日はその辺りをまとめたいと思います。


ちなみに言葉の定義としてはStream APIを作るんじゃなくて

Stream APIのProducerを作るっていう方が正しいとは思うんですが

ぱっと見での分かりやすさのためにこうしました。


2. お題:ディレクトリの直下にあるテキストファイルをまとめて Stream<String> として扱う

まず手始めに、ローカルファイルにあるファイルを処理する方法から考えます。

特定のディレクトリの直下にテキストファイルが大量にあり、

ファイル名などは別に表示しなくても良いから、

テキストファイルを読み込んで、一行ずつ処理したいというシチュエーションです。


Files.listとFiles.linesをどう組み合わせるの?

ディレクトリ内にあるファイルの一覧を Stream<Path> として取得するのは Files.list メソッド、

ファイルを読み込んで Stream<String> として取得するのは Files.lines メソッドが使えます。

では、これを組み合わせる時は、どうしたら良いんでしょうか。


変換はmapメソッドだったよなーと思いつつ、こんな事をやると。

Stream<Stream<String>> stream = Files.list(path)
        .map(p -> {
            try {
                return Files.lines(p);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
stream.forEach(System.out::println);

Stream<Stream<String>> になり、恐ろしく期待外れの結果になります。

java.util.stream.ReferencePipeline$Head@421faab1
java.util.stream.ReferencePipeline$Head@2b71fc7e
java.util.stream.ReferencePipeline$Head@5ce65a89

ではどうするかというと、そう、みんな大好きflatMapです。

try (Stream<String> stream = Files.list(path)
        .flatMap(p -> {
            try {
                return Files.lines(p);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        })) {
    stream.forEach(System.out::println);
}

こうすれば無事に結果をStream<String>として取り出すことができます。


出力例はこんな感じ。

This is file1.
Hello world.
This is file 2.
This is file3.
Good bye world.

ちなみにFiles.listで作ったStreamはきちんとcloseする必要があるので

try-with-resourcesを使うのがオススメです。


flatMapは毎回closeを呼び出してくれる

ちなみにflatMapがスグレモノなところは、

flatMapメソッドの中で作ったStreamを、毎回必ずcloseしてくれるところです。


上の例ではFiles.linesを利用しているので、別にcloseする必要はないのですが、

たとえばBufferedReaderやInputStreamなどを作った場合などには、closeする必要があります。

そういう時は、flatMapメソッド内で作るStreamにonCloseを書いておけば、毎回呼び出されるのです。

try (Stream<String> stream = Files.list(path)
        .flatMap(s -> {
            try {
                BufferedReader reader = Files.newBufferedReader(s);
                return reader.lines().onClose(() -> { // (1)
                    try {
                        reader.close(); // (2)
                        System.out.println("closed."); // (3)
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        })) {
    stream.forEach(System.out::println);
}

(1) でStreamを作ったところにonCloseを書いて

(2) でBufferedReaderをcloseしています。

This is file1.
Hello world.
closed.
This is file 2.
closed.
This is file3.
Good bye world.
closed.

(3) で書いた「closed.」が3回表示され、きちんと毎回closeされていることが分かります。


この章のまとめ

1. Streamの要素からStreamに変換する時は、flatMapだ!

2. flatMapで作ったStreamは、きちんとcloseしてくれるぞ!


3. そもそもStream APIってどうやって作るの?

前の章ではflatMapが便利だという知見を得ました。

次は、そもそもStream APIってどうやって作れば良いのかを考えます。


恥ずかしながら全然知らなかったので、twitterで聞いてみたところ

@さんと @さんから、

Iteratorを作って、SpliteratorにしてStreamSupporに渡すと良いと教わりました。


この件については、BufferedReader.linesの実装が分かりやすすぎるので引用したいと思います。

public Stream<String> lines() {
    Iterator<String> iter = new Iterator<String>() { // (1)
        String nextLine = null;

        @Override
        public boolean hasNext() {
            if (nextLine != null) {
                return true;
            } else {
                try {
                    nextLine = readLine();
                    return (nextLine != null);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }

        @Override
        public String next() {
            if (nextLine != null || hasNext()) {
                String line = nextLine;
                nextLine = null;
                return line;
            } else {
                throw new NoSuchElementException();
            }
        }
    };
    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
            iter, Spliterator.ORDERED | Spliterator.NONNULL), false); // (2)
}

(1) Iteratorを実装して

(2) Spliteratorsを使ってSpliteratorに変換し、StreamSupportを使ってStreamに変換しています。

これがStreamを作る時の、ひとつの王道のようですね。


この章のまとめ

1. ルークよ、Iteratorを使え


4. お題:AWS S3にあるテキストファイルをまとめて Stream<String> として扱う

Streamのつくりかたが分かったところで、

次は、AWS S3にあるテキストファイルを処理する方法を考えます。

と、その前に、

AWSになじみのない方がここで読んでしまうことを防ぐために説明したいのですが、

この章の目的は、AWS S3の操作がしたいのではなく、

Java標準APIでStream APIを作れないようなパターンで、どのように操作できるかを考えることです。


なのでAWSだけでなく、たとえばDBや外部Webサーバなどでも同じパターンが使えますので、

AWSはあくまで一例として捉えてもらえればと思います。


今回利用するクラス

まず簡単に、AWS S3からファイルを取ってくる処理を実装するために必要なクラスを紹介します。


ちなみにAWS S3とはファイルストレージのサービスで、

ブラウザからのファイルアップロード / ダウンロードができる他にも

コマンドラインツール(AWS CLI)や、AWS SDK(AWS SDK for Javaなど)から操作ができます。

今回はAWS SDK for Javaを使うことにします


それで、使うクラスは以下の通りです。

  • AmazonS3Client
  • S3ObjectSummary
    • S3のファイル情報を持つクラス。Javaで言うPathやFileクラスに相当する。
  • ObjectListing
    • ファイル一覧取得結果のメタ情報を持つクラス。ここから List<S3ObjectSummary> を取り出すことができる。

あんまり多くないですね。


AWS S3へのアクセスを試す

まず簡単に、AWS S3にJavaからアクセスする方法を書いておきます。

型などがはっきり分かるよう、Stream APIを使わずに書きます。

AmazonS3Client client = new AmazonS3Client();
ObjectListing listing = client.listObjects("cero.ninja.public", "advent2015java"); // (1)
List<S3ObjectSummary> list = listing.getObjectSummaries(); // (2)

for (S3ObjectSummary s : list) {
    S3Object object = client.getObject(s.getBucketName(), s.getKey()); // (3)
    try (InputStream content = object.getObjectContent(); // (4)
         BufferedReader reader = new BufferedReader(new InputStreamReader(content))) {
        String line = reader.readLine();
        while (line != null) {
            System.out.println(line); // (5)
            line = reader.readLine();
        }
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

(1)「cero.ninja.public」という名前のバケットにある「advent2015java」というフォルダから情報を取り

(2) ファイル情報をListとして取り出して

(3) ファイルの中身を読み込んで

(4) InputStreamを取り出して

(5) 標準出力に表示しています。


簡単でしょう?

ただ、実は (1) のメソッド呼び出しは最大1000件までしか取得できないという制限があります。

1001件以降のデータを取るためには、別のメソッドを使う必要があります。

List<S3ObjectSummary> allList = new ArrayList<>(); // (6)

AmazonS3Client client = new AmazonS3Client();
ObjectListing listing = client.listObjects("cero.ninja.public", "advent2015java"); // (7)
List<S3ObjectSummary> list = listing.getObjectSummaries();

while (list.isEmpty() == false) {
    allList.addAll(list); // (8)
    listing = client.listNextBatchOfObjects(listing); // (9)
    list = listing.getObjectSummaries();
}

(6) Listを別に用意しておいて

(7) 最初の1000件を読み込み

(8) ファイル情報を (6) のListに入れながら

(9) 次の1000件を読む、を繰り返す


こんな風になります。

ただこれがたとえば10万件、100万件となってくると、Listがメモリを過剰に消費しますし

そもそも実際の処理を始めるまでのオーバーヘッドが大きくなりすぎます。


じゃぁwhileの中で処理しちゃえばいいやんか、という話になってきます。

AmazonS3Client client = new AmazonS3Client();
ObjectListing listing = client.listObjects("cero.ninja.public", "advent2015java");
List<S3ObjectSummary> list = listing.getObjectSummaries();

while (list.isEmpty() == false) {
    list.stream().forEach(this::doSomething); // (10)
    listing = client.listNextBatchOfObjects(listing);
    list = listing.getObjectSummaries();
}

(10) 取得した1000件を使ってすぐに処理を始めます。


これはこれで正解だと思いますしシンプルで良いのですが、

「S3から情報を取るクラスに処理を渡す」よりも

「S3から情報を取ってStreamにする」ほうが、何かと取り回しが楽なのですよね。


AWSへのアクセスをStreamにしよう

ということで、ここまで学んだ知識を利用して、

AWS S3にあるテキストファイルをまとめて Stream<String> にする方法を考えます。


まずはファイル一覧を取ってくる部分を、Iteratorにします。

public class S3Iterator implements Iterator<InputStream> {
    AmazonS3Client client;
    String bucketName;
    String key;

    ObjectListing listing;
    List<S3ObjectSummary> cache; // (1)

    public S3Iterator(String bucketName, String key) {
        this.client = new AmazonS3Client();
        this.bucketName = bucketName;
        this.key = key;
    }

    void load() {
        if (cache != null && cache.size() > 0) {
            return;
        }

        if (listing == null) {
            listing = client.listObjects(bucketName, key); // (2)
        } else {
            listing = client.listNextBatchOfObjects(listing); // (3)
        }

        cache = listing.getObjectSummaries(); // (4)
    }

    @Override
    public boolean hasNext() {
        if (cache != null && cache.size() > 0) {
            return true;
        }

        load();
        return cache.size() > 0;
    }

    @Override
    public InputStream next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }

        S3ObjectSummary summary = cache.remove(0); // (5)
        return client.getObject(summary.getBucketName(), summary.getKey()).getObjectContent(); // (6)
    }
}

(1) 読み込んだ1000件のファイル情報を保持しておくcacheを作っておき

(2) 初回はlistObjectsを使って読み込み

(3) 2回目以降はlistNextBatchOfObjectsを使って読み込みます。

(4) 読み込んだファイル情報はcacheに入れておき

(5) removeしながら取り出します。

(6) また、S3ObjectSummaryのStreamにはせず、InputStreamにしてから返します。

このようにすれば、Iteratorの外側でAmazonS3Clientを利用する必要がなくなるためです。


そして、このIteratorからStreamを作り、さらにflatMapを使ってStream<String>に集約しましょう。

Stream<InputStream> fileStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(
        new S3Iterator("cero.ninja.public", "advent2015java"), Spliterator.NONNULL), false); // (7)

Stream<String> stream = fileStream.flatMap(in -> { // (8)
    BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    return reader.lines() // (9)
            .onClose(() -> { // (10)
            reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    });
});

stream.forEach(System.out::println); // (11)

(7) IteratorからSpliteratorを作り、そこからStreamに変換します。これでファイル一覧が Stream<InputStream> になります。

(8) InputStreamから読み込んだ内容を集約するために、flatMapを使って

(9) BufferedReader.linesで作ったStream<String>を集約します。

(10) ここでonCloseを書いておくと、BufferedReaderが毎回きちんとcloseされます。flatMapさまさま。

(11) 最後に標準出力への書き出しをしていますが、もちろんこの Stream<String> はどのように扱うこともできます。


という感じで、AWS S3上にあるテキストファイルを Stram<String> にして

処理することができるようになりました。


gzipやzipだったらどうするの?

ちなみに大量のテキストファイルが存在する場合、たとえばそれがWebサーバのログだったりすると

たいていにおいてgzip圧縮されているかと思います。

そういう時は、読み込む時にGZIPInputStreamを使うと良いでしょう。


先に出した例とあまり変わらず、GZIPInputStreamを挟んだだけの形となります。

Stream<String> stream = fileStream.flatMap(in -> {
    BufferedReader reader;
    try {
        reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(in), StandardCharsets.UTF_8)); // (12)
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
    return reader.lines()
            .onClose(() -> {
                try {
                    reader.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
});

(12) のところにGZIPInputStreamを挟みました。


では、読み込む対象が、複数のファイルを持つようなzipファイルだった場合はどうすれば良いでしょうか。

実はこれが割と難敵で、zipで処理を回せるようなIteratorをもう一つ作る必要があります。


ZipInputStreamにStream<InputStream>を返すようなAPIを作ってくれれば良いのにと思うんですが

(ZipFileやFileSystemsならStreamを作りやすいんですが、ZipInputStreamからは作れない!)

そのあたりは自前で少し実装する必要があります。

さすがにエントリーが長くなってきたので、今回は割愛します。


Spliteratorを直接作っても良い

@さんから、Iteratorを実装するよりも

AbstractSpliteratorを実装したSpliteratorを作ったほうが楽だよという知見をもらったので

それを使ったサンプルも掲載しておきます。

public class S3Spliterator extends Spliterators.AbstractSpliterator<InputStream> {
    AmazonS3Client client;
    String bucketName;
    String key;

    List<S3ObjectSummary> cache;
    ObjectListing listing;

    public S3Spliterator(String bucketName, String key) {
        super(Long.MAX_VALUE, 0);
        this.client = new AmazonS3Client();
        this.bucketName = bucketName;
        this.key = key;
    }

    @Override
    public boolean tryAdvance(Consumer<? super InputStream> action) {
        if (cache == null) {
            listing = client.listObjects(bucketName, key);
            cache = listing.getObjectSummaries();
        } else if (cache.isEmpty()) {
            listing = client.listNextBatchOfObjects(listing);
            cache = listing.getObjectSummaries();
        }

        if (cache.isEmpty()) {
            return false;
        }

        S3ObjectSummary summary = cache.remove(0);
        S3ObjectInputStream in = client.getObject(summary.getBucketName(), summary.getKey()).getObjectContent();
        action.accept(in);
        return true;

    }
}

tryAdvanceメソッドをひとつ作れば良いだけですので、

hasNextやnextを実装する時のように、状態がどうなるかよく分からなくなって混乱することもありません。

確かに、Streamを作ることだけが目的となる場合は、こちらを使った方が楽になりそうですね。


念のため、これを使った処理も書いておきます。

Stream<InputStream> fileStream = StreamSupport.stream(
        new S3Spliterator("cero.ninja.public", "advent2015java"), false);

Stream<String> stream = fileStream.flatMap(in -> { // 以下略

StreamSupportに直接渡すだけで済む、ということですね。


この章のまとめ

1. 「1000件ずつ読み出して処理」みたいなやつは、IteratorにしてStreamにすれば処理しやすいよ!

2. AbstractSpliteratorを継承すると、Spliteratorを作りやすいよ!


5. まとめ、そして次回予告

では今回のまとめです。

今回のエントリーでは、Streamを作るところと、

その使いどころとしてAWS S3上の大量ファイルを例に挙げて説明しました。


Iteratorを作り、Spliteratorにして、そこからStreamを作る、

また、Stream.flatMapを使って複数リソースをシームレスに処理することができました。


Stream.flatMapを使うときちんとリソースのcloseができるのは、便利さがじわじわ効いてくる系のやつで、

たとえばStreamの代わりにIteratorを取り回そうとすると、リソースのcloseタイミングで困ることになります。

そういう点でも、Streamというのは取り回しやすい形なのかなと思います。


さて、長くなってきたので、今回のエントリーはここまでにしたいと思います。


でも実はもう一つ、言及したかったテーマがあります。

それが「Hot ProducerとCold Producer」というお話です。


JavaのStream APIは、どちらかと言えば「既に存在するListやMap」などに対する処理を

簡潔に書くところがクローズアップされているように思います。

これはCold Producer、あるいはCold Streamと呼ばれているそうです。


一方で、HTTPリクエストや、JMS、AMQP、あるいはTwitterのstreamなど、

いわゆるイベントを受け取るようなもの、あるいは無限Streamになるようなものは

Hot Producer、Hot Streamと呼ばれます。


JavaではHot Producerを扱う場合には、Listenerを実装する形が一般的ですが

Streamとして処理することもできるはずです。


Reactive StreamsがJava9に入るとか、バックプレッシャー型がどうしたと言われている中で、

このHot ProducerをStream APIを使って処理するような考え方が、

Javaにおいても、今後、重要になってくるはずです。

次回は、その辺りを考えてみたいと思います。


Stay metal,

See you!