DiaryException このページをアンテナに追加 RSSフィード

2014-12-21(日)

[]Apache Kafka + Druidを使ってインタラクティブ時系列データを集計処理してみた

以下の内容をより実用的・網羅的にまとめ、Kindle電子書籍としてリリースした。

概要、参考資料

Druidは、リアルタイムに(ストリーミングな)時系列データ収集するコンポーネントと、収集したデータセットに対し集計クエリを受け付け処理結果を返すコンポーネントからなるデータベースシステムである

Google技術でいうところのDremel (VLDB2010) とPowerDrill (VLDB2012) との中間に位置するらしく、つまりCloudera Impalaとも似ている。Druidが基準にしている性能指標は、6TBのデータを1桁秒で処理することであると挙げられているが、それを実現するためのシステム要件それほど高くないメモリ特にあればあるだけ安心といった感じである)。

Druidを用いた時系列データ収集

Druidの現在のStableバージョンは0.6.160である。本稿でもこのバージョンを用いる。github上ではバージョン毎にtagが付いているので、コードdruid-0.6.160、そして依存するdruid-api-0.2.14.1を参照する。

最新版と比較すると分かる通り、多くのクラスメソッド現在も変更が加えられており、コードが安定していない。いつ安定するかも不明であるため、Druid内でやる(拡張プラグインを作る)か、Druid外でやる(入力データの前処理や出力結果データの後処理をするためのシステムプログラム別に用意する)かは、判断が難しい。本稿では情報陳腐化を遅らせるため、Druidのコードを書くことを出来る限り避けた活用方針とする。

システムの構築のために、VagrantDockerを用いた。Vagrantfileはno titleを用い、メモリを4GB確保している。

diff --git a/Vagrantfile b/Vagrantfile
index 2a1572e..c877612 100644
--- a/Vagrantfile
+++ b/Vagrantfile
@@ -13,7 +13,7 @@ $num_instances = 1
 $update_channel = "alpha"
 $enable_serial_logging = false
 $vb_gui = false
-$vb_memory = 1024
+$vb_memory = 4096
 $vb_cpus = 1

 # Attempt to apply the deprecated environment variable NUM_INSTANCES to
$ vagrant up  # VM立ち上げ
$ vagrant ssh  # VMに入る
CoreOS (alpha)
core@core-01 ~ $ mkdir zookeeper kafka druid clients
core@core-01 ~ $ which docker
/usr/bin/docker

以降のコンソール操作は、全てVM上でのものである

依存システムの構築

Druidは複数コンポーネントサーバからなる分散システムである。そのシステム構成する複数サーバのコーディネーションのためにZookeeperが用いられる。

Dockerfileは https://github.com/laclefyoshi/druid_test/blob/master/zookeeper

$ cd zookeeper
$ docker build -t laclefyoshi/zookeeper:1.0 .
$ docker run -d -p 2181:2181 --name zookeeper laclefyoshi/zookeeper:1.0

DruidはFirehoseと呼ばれるデータから時系列データを取得する。拡張プラグイン実装することで様々なものをFirehoseとすることができる。例えば、書籍Storm Blueprints: Patterns for Distributed Real-time Computation」の中では、Apache StormのTrident StateをFirehoseとして扱い、Stormの処理結果をそのままDruidの入力データとするような例が掲載されている。

本稿では、メッセージキューイングシステムであるApache Kafka (Version 0.8) をFirehoseとして用いる。Apache Kafkaが依存するZookeeperは上のものを用い、Druidと共用とする。

Dockerfileは https://github.com/laclefyoshi/druid_test/tree/master/kafka

$ cd kafka
$ docker build -t laclefyoshi/kafka:1.0 .
$ docker run -d -p 9092:9092 --link zookeeper:zookeeper --name kafka laclefyoshi/kafka:1.0
DruidシステムとDruidサービスの構築

Druidシステムの上で、Druidサービスが動作する。Druidサービスは、サービス名の他、どのFirehoseをデータ入力源とし、どのようなデータを処理対象とするかを定義する。

Druidシステムの構築パターンには、スタンドアロン版とクラスタ版がある。本稿ではシンプルな方、スタンドアロン版の構築について紹介する。スタンドアロン版は、ストリーミングデータ入力永続化クエリの受付と処理・結果返却を1台のマシンで実行する。1つのスタンドアロンシステム上で動作するDruidサービスは1つである。ちなみに、クラスタ版になると、これらの処理が複数マシンで分担され、複数サービスを起動することができる。

Dockerfileは https://github.com/laclefyoshi/druid_test/tree/master/druid

$ cd druid
$ docker build -t laclefyoshi/druid:1.0 .
$ docker run -d -p 8083:8083 --link zookeeper:zookeeper --link kafka:kafka --name druid laclefyoshi/druid:1.0

最後に、Apache Kafkaにデータを投入するコンテナを用意する。これはいわゆるKafka Producerであり、Apache Kafkaに以下のようなJSONデータを送信する。

{"timestamp": "2014-12-21T10:17:00", "number": 7, "word": "fox"}

timestampはデータを送信した日時を示す。numberランダム整数で、wordランダム英単語を出力する。

Dockerfileは https://github.com/laclefyoshi/druid_test/tree/master/clients

$ cd clients
$ docker build -t laclefyoshi/clients:1.0 .
$ docker run -d --link zookeeper:zookeeper --link kafka:kafka --name clients laclefyoshi/clients:1.0

以上でシステムの構築は完了した。clientsコンテナからApache Kafkaに投入されたデータは、Druidによってキャッチされ、Druidの入力データとして収集され続ける。

$ docker ps
CONTAINER ID        IMAGE                       COMMAND                CREATED              STATUS              PORTS                    NAMES
4021acc277c7        laclefyoshi/clients:1.0     "bash send_data.sh"    About a minute ago   Up About a minute                            clients
9280e328e97d        laclefyoshi/druid:1.0       "bash ./start_druid_   9 minutes ago        Up 9 minutes        0.0.0.0:8083->8083/tcp   druid
9e17bab05cc2        laclefyoshi/kafka:1.0       "bash ./kafka_create   11 minutes ago       Up 11 minutes       0.0.0.0:9092->9092/tcp   kafka
c0614de5dce0        laclefyoshi/zookeeper:1.0   "./bin/zkServer.sh s   25 minutes ago       Up 25 minutes       0.0.0.0:2181->2181/tcp   zookeeper

Druidを用いた時系列データの集計

clientsコンテナが起動しいくらか時間が経ってから、以下の様なクエリHTTP POSTでdruidコンテナに投げる。

$ cat query.json
{
  "queryType":"timeBoundary",
  "dataSource":"druid_test",
  "intervals":["2014-12-01/2014-12-31"]
}

$ curl -X 'POST' -H 'Content-Type:application/json' -d @query.json http://localhost:8083/druid/v2/?pretty
[ {
  "timestamp" : "2014-12-21T10:17:00.000Z",
  "result" : {
    "minTime" : "2014-12-21T10:17:00.000Z",
    "maxTime" : "2014-12-21T10:51:00.000Z"
  }
} ]

このクエリは、druid_testという名前定義されたDruidサービスに、いつからいつまでのデータが投入されているか(集計対象になっているか)を確認するためのクエリであるサービス名前はDruidのruntime.propertiesのdruid.service定義し、タイムスタンプデータのどのフィールドにあるかはrealtime.specのパーサ設定定義されている。

次に、指定時間内を15分ずつ区切り、各15分間でどの単語が何回出現したかをカウントするクエリを投げる。ついでに、カウントしたら、降順でソートさせる。

$ cat query.json
{
  "queryType": "groupBy",
  "dataSource": "druid_test",
  "granularity": "fifteen_minute",
  "dimensions": ["word"],
  "limitSpec": {
     "type": "default",
     "columns": [
       {"dimension": "count_word", "direction": "DESCENDING"}
      ],
     "limit": 10
  },
  "aggregations": [
    {"type": "count", "name": "count_word"}
  ],
  "intervals": ["2014-12-21T10:20/2014-12-21T10:50"]
} 

$ time curl -X 'POST' -H 'Content-Type:application/json' -d @query.json http://localhost:8083/druid/v2/?pretty
[ {
  "version" : "v1",
  "timestamp" : "2014-12-21T10:15:00.000Z",
  "event" : {
    "count_word" : 4,
    "word" : "lazy"
  }
}, {
  "version" : "v1",
  "timestamp" : "2014-12-21T10:15:00.000Z",
  "event" : {
    "count_word" : 4,
    "word" : "the"
  }
}, {
  "version" : "v1",
  "timestamp" : "2014-12-21T10:15:00.000Z",
  "event" : {
    "count_word" : 3,
    "word" : "over"
  }
}, {
  "version" : "v1",
  "timestamp" : "2014-12-21T10:15:00.000Z",
  "event" : {
    "count_word" : 3,
    "word" : "quick"
  }
}, {
  "version" : "v1",
  "timestamp" : "2014-12-21T10:15:00.000Z",
  "event" : {
    "count_word" : 2,
    "word" : "dog"
  }
}, {
  "version" : "v1",
  "timestamp" : "2014-12-21T10:15:00.000Z",
  "event" : {
    "count_word" : 2,
    "word" : "jumps"
  }
}, {
  "version" : "v1",
  "timestamp" : "2014-12-21T10:15:00.000Z",
  "event" : {
    "count_word" : 1,
    "word" : "fox"
  }
}, {
  "version" : "v1",
  "timestamp" : "2014-12-21T10:30:00.000Z",
  "event" : {
    "count_word" : 6,
    "word" : "brown"
  }
}, {
  "version" : "v1",
  "timestamp" : "2014-12-21T10:30:00.000Z",
  "event" : {
    "count_word" : 6,
    "word" : "quick"
  }
}, {
  "version" : "v1",
  "timestamp" : "2014-12-21T10:30:00.000Z",
  "event" : {
    "count_word" : 4,
    "word" : "fox"
  }
} ]
real    0m0.035s
user    0m0.003s
sys     0m0.001s

データは1分に1〜2個程度投げられているらしい。合計30分間、15分間隔の処理(つまり処理は2回行われた)に、0.03秒掛かっていることが分かる。

カウントの結果を見てみると、10:15から10:30の間に"lazy"と"the"が4回投げられているし、10:30から10:45の間に、"brown"が6回投げられていることが分かる。これは非常に単純なデータの集計である。例えばデータWebサーバログであったなら、IPアドレスユーザエージェント毎のアクセス数の集計などが可能になるし、SSHDログであれば、アクセス失敗のログ集計から異常検知に応用できる。

Druidが受け付けるクエリはgroupByの他に、ある数値フィールドの合計値を求めるものや、集計結果に対し上位N件を求めるもの、あるフィールドの値によりデータフィルタリングを実行するものなどがある。

まとめ

本稿では、Apache Kafkaに入ったデータ入力とするシンプルなDruidサービスを構築した。実際にデータ収集させ、集計クエリを投げ、結果を得ることを確認した。

DruidのFirehoseとしたいデータ源が、必ずしもDruidの入力に適したフォーマットデータを持っているとは限らない。そのため、データ源とFirehoseの間にApache Stormなど逐次ストリーミングデータ処理システムを挟むか、Druidのパーサを拡張することが必要とされる場面がほとんどだろう。

時間があるときに、クラスタ版Druidシステムの構築、Druidによるデータ永続化Hadoopとの連携を試してみたい。

2005 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2006 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2007 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2008 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2009 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2010 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2011 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2012 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2013 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2014 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2015 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2016 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |
2017 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 |