DiaryException このページをアンテナに追加 RSSフィード Twitter

2016-12-03(土)

[]Raspberry PiOS管理をBerryBootベースにした

BerryBoot 配布サイトからzipダウンロードして解凍FAT32フォーマットしたSDカードファイル群を置いて、Raspberry Piに挿して起動。

ネットワーク設定や国設定を済ませれば、OS選択画面になる。

自分ビルドしたOSイメージインストールできるようだが、Home - Berryboot OS Imagesに割と新しめのイメージがあるので、これを利用した。

2016-11-05(土)

[]yahoo/Pulsar (standalone) 用のDockerfileを作った

PulsarYahooが作ったPub/Subシステムマルチテナント環境運用されることを前提にした設計、大量のトピック・パーティションがある時Kafkaよりパフォーマンスに優れる、などの特徴がある。

先日、Pulsar勉強会に参加して、そこそこに魅力が伝わってきたので、Dockerコンテナでサクッと試せるよう、Dockerfileを書いた。ついでにJavaクライアントも書いた。

使い方

Brokerの起動

まずDockerコンテナでPulsar Brokerを立ち上げる(standaloneモードだと同時にZookeeperやBookKeeperも立ち上がる)。

$ docker run -d \
        -p 8080:8080 -p 6650:6650 \
        --env BROKER_HOST=192.168.99.100 \
        laclefyoshi/pulsar-docker-standalone

ポート8080はBrokerのHTTPエンドポイントポートポート6650はBrokerのサービスポートで、クライアントHTTPエンドポイント(8080)からBroker情報を受け取り、実際にデータをやり取りするのはサービス(6650)を使う。クライアントが使うのはこの2つの通信経路だけ。

環境変数 BROKER_HOST はクライアントから見て、どのホストにBrokerがあるのかを指定するためのもの。Brokerとクライアントが同じマシンにあるのなら 127.0.0.1 でも可。

Javaクライアントの準備

GitHub - laclefyoshi/pulsar-docker: PulsarをDockerコンテナで使うclient-example に入り、Mavenビルドする。

$ git clone https://github.com/laclefyoshi/pulsar-docker.git
$ cd pulsar-docker/client-example/
$ mvn clean package
$ ls target/
classes/
generated-sources/
maven-archiver/
maven-status/
original-pulsar-client-example-1.0-SNAPSHOT.jar
pulsar-client-example-1.0-SNAPSHOT.jar

トピックなどの設定は、standaloneモードで用意されているデフォルトのものハードコーディングされている。

Concumerの起動

Brokerからメッセージを受け取るクライアントとしてConsumerを立ち上げる。BrokerのHTTPエンドポイント引数指定する。

$ java -cp target/pulsar-client-example-1.0-SNAPSHOT.jar \
       org.saekiyoshiyasu.PulsarConsumer http://192.168.99.100:8080
Producerの起動

Consumerとは別のターミナルで、Brokerへメッセージ送信するクライアントとしてProducerを立ち上げる。Consumerと同様、BrokerのHTTPエンドポイント引数指定する。

$ java -cp target/pulsar-client-example-1.0-SNAPSHOT.jar \
       org.saekiyoshiyasu.PulsarProducer http://192.168.99.100:8080
$
Consumerでのメッセージ受け取り

起動したままのConsumerは、Producer送信したメッセージを受け取り、内容を出力する。

Received message: Hello 0
Received message: Hello 1
Received message: Hello 2
Received message: Hello 3
Received message: Hello 4
Received message: Hello 5
Received message: Hello 6
Received message: Hello 7
Received message: Hello 8
Received message: Hello 9

standaloneモードじゃないclusterモード

Pulsarのドキュメントによると、clusterモードでの起動には以下の手順が必要のよう。

  1. Zookeeperの設定&起動
  2. Global Zookeeperの設定&起動
  3. クラスタメタデータ更新
  4. BookKeeperの設定&起動
  5. Brokerの設定&起動
  6. Service Discoveryの設定&起動
  7. テナント設定

一筋縄ではいかなかったので、今後の課題とする。

2016-08-21(日)

[]リセットピンをIOピンとして使う

ATtiny13だとPB5、ATtiny2313だとPA2、ATmega8だとPC6がリセットピンになっているので、そのままだとIOピンとして使えないが、FUSE設定でRSTDISBL=0にすると(RisetDisabledが有効になり)IOピンとして有効になる。

RSTDISBLは、ATtinyだとHIGHの0bit目で、ATmegaだとHIGHの7bit目で設定する様子。

2016-08-12(金)

[]ESP8266であれこれする本を書いた

ESP8266で始めるWi-Fi IoTプロジェクト

という内容にした。ESPUSBなんてものが出ていたりするので、まだまだ紹介していないができることは多いのだろうけれど、一旦、すぐにできるところとか、応用が効くノウハウとかをまとめたつもり。

myThingsのIDCFチャンネルとESP-WROOM-02とのMQTT双方向連携 - DiaryExceptionを書いていた頃には概ね実機検証完了していたのだけれど、それからいろいろあって半年以上延びてしまった。このタイミングでKindle Unlimited日本版が出たので、前に書いたものと同じようにセレクト登録しておいた。

秋ごろにはESP-WROOM-32なるものも出るらしいし、まだまだESP8266系は楽しめそうな予感がある。

2016-07-19(火)

[]KPLでKinesisに投げたデータをboto3で取得してみた

KPLはKinesis Producer Libraryのこと。

受信したデータを処理してKinesisに流す処理をStormのトポロジ(Java)で実装した。で、Kinesisに入ったデータPythonで処理するために、boto3を使ったのだが、思ったように動かず、試行錯誤したのでメモ

Aggregated Recordのフォーマットドキュメントが古い

まず、boto3のAPIに書かれている通りに、Kinesisからデータを取得するプログラムを書いた

import boto3

for record in boto3.client("kinesis").get_records(...)["Records"]:
  data = record["Data"]
  print(data)

とすると、投入したデータが出てくると思ったが、実際はなにやら色々付いたものが出てきた。

少なくとも以下のようなパターンデータを取得した。RECORD_DATAが実際に投げた文字列である

  • \xf3\x89\x9a\xc2\n\nPARTITION_KEY\x1a...RECORD_DATA...
  • \xf3\x89\x9a\xc2\n\nPARTITION_KEY\n\nPARTITION_KEY\x1a...RECORD_DATA...
  • RECORD_DATA

どうやら、KPLを使うとデフォルトAggregated Record有効になるらしく、上2つはKPL Aggregated Record Formatと突き合わせると、\xf3\x89\x9a\xc2というMagicNumberは一致するが、その後に続くデータはProtobufMessageではなかった。

\n\nPARTITION_KEYに出てくる値は、Aggregated RecordにまとめられたRECORD_DATAで指定されていたパーティションキーの集合のようだった。

それを読み飛ばして、\x1a以降がProtobufMessageで、末尾16バイトは確かにチェックサムだったので、どうやら正しくは下記の構造をしているようだった。

0               4                    M                  N          N+15
+---+---+---+---+---+---+....+---+---+==================+---+...+---+
|  MAGIC NUMBER | \n\nPARTITION KEY+ | PROTOBUF MESSAGE |    MD5    |
+---+---+---+---+---+---+....+---+---+==================+---+...+---+

なので、最終的にAggregated RecordからRECORD_DATAを取り出すために、PARTITION_KEYに\x1a が出てこないことを前提として、そしてこれからもProtobufMessageの始まりが\x1aであることを前提にして以下のようにした。

from generated.messages_pb2 import AggregatedRecord  # KPLのソースにある.protoファイルをコンパイルした

if data[:4] == "\xF3\x89\x9A\xC2":
  idx = data.index("\x1A")
  ar = AggregatedRecord()
  ar.ParseFromString(data[idx:-16])
  for record in ar.records:
      record_data = record.data
      print(record_data)
else:
  print(data)

1日分のデータを取得してみたが、とりあえず全件ちゃんと取れている。

もっと楽に解決できたかも

ここまでプログラムを書いて、KCL(Kinesis Client Library)とかkinesis-aggregationとかライブラリを見つけて、Deaggregationしてくれるとか書いてあるので、げんなり。

2016-06-26(日)

Stormコンポーネントを起動・監視するMonitスクリプトを書いた

This command should be run under supervision with a tool like daemontools or monit.

https://github.com/apache/storm/blob/master/bin/storm.py#L526

などとコードの中に書いている割には、そのためのスクリプト提供していないので、自分で書いた。

更新が止まったdaemontoolsよりはMonitの方が長く使えるだろうということで、Monitを選択した。

Stormの配置

Stormの各種コンポーネントバックグラウンド起動するためのスクリプトと、コンポーネント強制終了させるためのスクリプト作成する。

$ sudo mv apache-storm-1.0.1 /opt/

$ sudo vi /opt/apache-storm-1.0.1/bin/storm_daemon.sh
#!/bin/sh
COMMAND=$1
/opt/apache-storm-1.0.1/bin/storm $COMMAND >/dev/null 2>&1 &

$ sudo chmod a+x /opt/apache-storm-1.0.1/bin/storm_daemon.sh

$ sudo vi /opt/apache-storm-1.0.1/bin/storm_killer.sh
#!/bin/sh
TARGET=$1
/usr/bin/pkill -9 -f $TARGET

$ sudo chmod a+x /opt/apache-storm-1.0.1/bin/storm_killer.sh

Monitの配置

Monit自体はsystemdにプロセス管理させる。

$ sudo mv monit-5.17.1 /opt/

あとは M/Monit | Wiki 等を参考に。

Stormコンポーネント起動スクリプト作成

コンポーネントのためのMonitスクリプトは `/opt/monit-5.17.1/conf.d` に配置している。

一般にMonitの監視プロセスIDを記録したファイルを用いて行われるが、ここでは、プロセス名に特定文字列が含まれるプロセスがあるかどうかによる監視採用した。

Nimbus
check process storm_nimbus matching "org.apache.storm.daemon.nimbus"
 start program = "/opt/apache-storm-1.0.1/bin/storm_daemon.sh nimbus" with timeout 180 seconds
 stop program = "/opt/apache-storm-1.0.1/bin/storm_killer.sh 'org.apache.storm.daemon.nimbus'"
UI
check process storm_ui matching "org.apache.storm.ui.core"
 start program = "/opt/apache-storm-1.0.1/bin/storm_daemon.sh ui" with timeout 180 seconds
 stop program = "/opt/apache-storm-1.0.1/bin/storm_killer.sh 'org.apache.storm.ui.core'"
Logviewer
check process storm_logviewer matching "org.apache.storm.daemon.logviewer"
 start program = "/opt/apache-storm-1.0.1/bin/storm_daemon.sh logviewer" with timeout 180 seconds
 stop program = "/opt/apache-storm-1.0.1/bin/storm_killer.sh 'org.apache.storm.daemon.logviewer'"
Supervisor
check process storm_supervisor matching "org.apache.storm.daemon.supervisor"
 start program = "/opt/apache-storm-1.0.1/bin/storm_daemon.sh supervisor" with timeout 180 seconds
 stop program = "/opt/apache-storm-1.0.1/bin/storm_killer.sh 'org.apache.storm.daemon.supervisor'"
DRPCサーバ
check process storm_drpc matching "org.apache.storm.daemon.drpc"
 start program = "/opt/apache-storm-1.0.1/bin/storm_daemon.sh drpc" with timeout 180 seconds
 stop program = "/opt/apache-storm-1.0.1/bin/storm_killer.sh 'org.apache.storm.daemon.drpc'"
Pacemaker
check process storm_ui matching "org.apache.storm.pacemaker.pacemaker"
 start program = "/opt/apache-storm-1.0.1/bin/storm_daemon.sh pacemaker" with timeout 180 seconds
 stop program = "/opt/apache-storm-1.0.1/bin/storm_killer.sh 'org.apache.storm.pacemaker.pacemaker'"

2016-02-13(土)

[]Apache Storm DRPCのHTTP APIを使う

Apache Stormのdefaults.yamldrpc.http* の項目があることから分かるように、Storm 0.10.0のDRPCサーバはDRPCクライアントから利用されるほかに、HTTP APIを持っている(0.9.6にはない)。つまり、HTTPクライアントPerlHTTPクライアントライブラリでもcurlでもWebブラウザでも)を用いて、DRPCサーバに処理リクエストを投げ、トポロジの処理結果をレスポンスとして受け取ることができる。

何が嬉しいか

HTTPリクエストに対する処理がスケールアウトしやすい

StormのDRPCは、Distributed Remote Procedure Callの略だが、Stormのトポロジの入力を外部から受け付け、トポロジを通して得られた処理結果を外部に返すためのものである。Stormのトポロジは、入力データを流す始点となるSpoutと、データを処理するBoltに分けられるが、DRPCアプリケーション場合、Spoutは外部から入力データを受け付け次のBoltに流し、Boltはデータに処理を行いその結果を次のBoltに渡し、最後のBoltが最終的な処理結果のデータを外部に返す。

Stormのトポロジを構成するSpout、Boltはスケールアウトできるように設計されているので、処理が重い部分・並列分散処理させたい部分の並列数を増やす指定をすることで、実行するスレッド(あるいはプロセスマシン)を増やしたり、逆に処理を分散させたくない部分の並列数を1にしたりもできる。また、クラスタ構成するマシンの台数を増やすことで、アプリケーション内の並列分散処理の性能を高めることができる。マシンを増やした際の作業は、Stormの仕組みのおかげで、非常に簡単で、アプリケーションに変更は必要ない。

DRPCのHTTP APIを用いることで、このStormの並列分散処理の仕組みをそのまま、HTTPリクエストに対する処理に適用することができる。

このメリットイメージ図を551肉まん風に書いてみた。

f:id:LaclefYoshi:20160213215135p:image

マイクロサービスを作りやすい

1つのStormクラスタに、複数のDRPCアプリケーションデプロイすることができる。Netflixなどがよく言っている、マイクロサービスアーキテクチャ実装のために、この仕組みは適している。

1つのDRPCアプリケーションリクエストに対する重厚長大な処理をさせるよりも、複数のDRPCアプリケーションを1つのクラスタの中に並べ、それぞれのDRPCアプリケーションに小さな処理を担当させた方が、より変化に対応しやすい環境となる。たとえば、スケールアウトしやすかったり、処理のある一部分の変更が容易になる。

DRPCのHTTP API仕様

DRPCサーバHTTP APIstorm-core/src/clj/org/apache/storm/daemon/drpc.cljを見ると、

となっていることがわかる。引数を省略した場合、空文字関数に渡される。GETとPOSTの両APIに実行時の違いは無さそうなので、クライアントの都合でどちらを使うかを選択すれば良いようだ。

実際に使ってみた。

Stormクラスタセットアップ

最低限の設定として、ZookeeperとNimbus、DRPCサーバアドレス指定する(すべてlocalhost)。それを使ってコンポーネントを起動する。

$ curl -O http://ftp.meisei-u.ac.jp/mirror/apache/dist/storm/apache-storm-0.10.0/apache-storm-0.10.0.tar.gz
$ tar zxvf apache-storm-0.10.0.tar.gz; cd apache-storm-0.10.0/
$ cp conf/storm.yaml conf/storm.yaml.default
$ vim conf/storm.yaml
$ diff conf/storm.yaml.default conf/storm.yaml
18,23c18,22
< # storm.zookeeper.servers:
< #     - "server1"
< #     - "server2"
< #
< # nimbus.host: "nimbus"
< #
---
> storm.zookeeper.servers:
>     - "localhost"
>
> nimbus.host: "localhost"
>
37,39c36,37
< # drpc.servers:
< #     - "server1"
< #     - "server2"
---
> drpc.servers:
>     - "localhost"

$ ./bin/storm dev-zookeeper &
$ ./bin/storm nimbus &
$ ./bin/storm supervisor &
$ ./bin/storm drpc &

Storm DRPCアプリケーションデプロイ

DRPCストリームを含むStormトポロジを、Stormクラスタデプロイする。今回はapache-storm-0.10.0.tar.gzに同梱されている、storm-starterから2つ拝借する。

$ ./bin/storm jar \
  examples/storm-starter/storm-starter-topologies-0.10.0.jar \
  storm.starter.trident.TridentWordCount TridentWordCount
$ ./bin/storm jar \
  examples/storm-starter/storm-starter-topologies-0.10.0.jar \
  storm.starter.BasicDRPCTopology BasicDRPCTopology

DRPCのHTTP APIアクセス

TridentWordCountは時間が経つにつれストリームに流れた文の量が増え、単語カウントが大きくなっていく。適当なタイミングでアクセスしてみた。対象関数wordsで、引数として2つ「the」と「eat」をそれぞれGETとPOSTで渡してみた。

$ curl http://localhost:3774/drpc/words/the
[[2679]]

$ curl -X POST -d eat http://localhost:3774/drpc/words/
[[536]]

DRPCクライアントを用いた時と同様の返り値であるようだ。

次に、BasicDRPCTopologyについて。対象関数exclamationで、引数として「eat」と「Hello」をそれぞれGETとPOSTで渡してみた。

$ curl http://localhost:3774/drpc/exclamation/eat
eat!

$ curl -X POST -d Hello http://localhost:3774/drpc/exclamation
Hello!

こちらも想定通り。

ちなみに、空白を含む文字列などを引数に渡す際にはURLエンコードして渡してやればStormの方でデコードしてくれて、結果もデコードされた文字列で返される。

$ curl http://localhost:3774/drpc/exclamation/The%20cow%20jumped%20over%20the%20moon
The cow jumped over the moon!

DRPCはStormの機能の中でも割とマイナな方だと思っていたが、かなり実用的な進化をしていて嬉しい。drpc.https.* などの設定項目があることから、セキュアな環境もサポートしているようだ。

2005 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2006 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2007 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2008 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2009 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2010 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2011 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2012 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2013 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2014 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2015 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2016 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |