Hatena::ブログ(Diary)

naoyaのはてなダイアリー

May 13, 2008

Hadoop Streaming

id:naoya:20080511:1210506301 のエントリのコメント欄で kzk さんに教えていただいた Hadoop Streaming を試しています。

Hadoop はオープンソースの MapReduce + 分散ファイルシステムです。Java で作られています。Yahoo! Inc のバックエンドや、Facebook、Amazon.com などでも利用されているとのことです。詳しくは CodeZine(コードジン) (kzk さんによる連載記事)を参照してください。

Hadoop Streaming

記事にもあります通り、Hadoop 拡張の Hadoop Streaming を使うと標準入出力を介するプログラムを記述するだけで、Hadoop による MapReduce を利用することができます。つまり、Java 以外の任意の言語で MapReduce が可能ということです。

例によって Apache のアクセスログからステータスコードの回数を数える計算を行ってみます。言語は Perl です。

#!/usr/local/bin/perl
use strict;
use warnings;

while (<>) {
    chomp;
    my @segments = split /\s+/;
    printf "%s\t%s\n", $segments[8], 1;
}

という Mapper (map.pl) と

#!/usr/local/bin/perl
use strict;
use warnings;

my %count;
while (<>) {
    chomp;
    my ($key, $value) = split /\t/;
    $count{$key}++;
}

while (my ($key, $value) = each %count) {
    printf "%s\t%s\n", $key, $value;
}

という Reducer (reduce.pl) を書いて Hadoop に入力を与えると、Hadoop クラスタに分散されて計算が実行されます。map() への入力は標準入力を介して行ベースで渡ってきます。reduce() への入力は key-value ペアがタブ区切りです。

[naoya@colinux hadoop]% hadoop jar ~/hadoop/contrib/hadoop-0.15.3-streaming.jar \
-input httpd_logs \
-output logc_output \
-mapper /home/naoya/work/hadoop/analog/map.pl \
-reducer /home/naoya/work/hadoop/analog/reduce.pl \
-inputformat TextInputFormat \
-outputformat TextOutputFormat

上記のようにコマンドラインで、Mapper と Reducer に map.pl、reduce.pl を与えてやります。

出力は HDFS (Hadoop による分散ファイルシステム) 上に保存されます。HDFS のコマンドで cat します。

[naoya@colinux hadoop]% hadoop dfs -cat logc_output/*
304     262
200     4606
500     43
404     24

MapReduce における reduce() への入力データ構造

Hadoop Streaming は標準入出力を扱うだけで良く、これはUNIX プログラマにとっては非常になじみ深いプログラミングモデルであり、大きな利点であると思います。

ところで、Google 論文 にある Reduce の疑似コードを今一度見てみましょう。

reduce(String key, Iterator values):
  // key: a word
  // values: a list of counts
  int result = 0;
  for each v in values:
    result += ParseInt(v);
    Emit(AsString(result));

reduce に渡ってくる入力は key-value ペアではなく key-values ペアです。map() で emit された key-value ペアは、reduce() に渡る前に、同一のキーでまとめられて、且つキーの順にソートされます。すなわち

# key  => values (Iterator)
200    => [ 1,1,1,1,1,1,1,1,1,... ]
304    => [ 1,1,1,1,1,1,1,1,1,... ]
404    => [ 1,1,1,1,1,1,1,1,1,... ]
500    => [ 1,1,1,1,1,1,1,1,1,... ]

という形のデータ構造になっています。値はリストです。値はメモリに収まりきらないほど大きなリストになる可能性もあるので、Iterator になっています。

MapReduce の計算モデルが簡易でありつつ幅広い分野に応用が利くのは、この reduce() への入力データ構造が一つの鍵になっています。

最たるものはキーが昇順にソートされている点です。例えば MapReduce で検索エンジンの転置インデックスを作る場合は (単語 => ドキュメントIDのリスト) という入力が渡ってくることになります。この入力をストリームで処理して次々と emit してやると、reduce の出力もそのままキー、つまり単語順でソートされることになります。単語順でソートされていればその単語群に対する検索のアルゴリズムでアドバンテージが得られます。

これが、入力が不特定な順番で渡ってくるとなると、メモリに一度全部出力を溜めてから最後にソートする必要が出てきます。MapReduce で扱うようなデータは非常に膨大ですから、多くの場合単一のサーバーのメモリ内でのソートは難しい規模になるでしょう。このソートの必要性を排除するために、MapReduce は reduce() 前に分散環境であらかじめソートを行います。

Hadoop Streaming の reduce() への入力はソートこそされているものの、フラットな行入力です。つまり、

200	1
200	1
200	1
...
304	1
304	1
...

という具合です。このフラットな構造を単純に扱ったのでは MapReduce の利点が一つ失われてしまいます。現に、先のログ解析ではメモリに結果を一度溜めてしまっています。また、カウンタをハッシュテーブルで実装しているため出力がキー順になっていません。

Hadoop Streaming をより使いやすく

そこで、Hadoop Streaming の入力を構造化された状態で扱えるようにする簡単な Perl フレームワークを作りました。

map.pl は以下のように書きます。

#!/usr/bin/env perl

package Analog::Mapper;
use Moose;
with 'Hadoop::Mapper';

sub map {
    my ($self, $key, $value) = @_;

    my @segments = split /\s+/, $value;
    $self->emit($segments[8] => 1);
}

package main;
use FindBin::libs;

Analog::Mapper->run;

一方の reduce.pl は以下です。

#!/usr/bin/env perl

package Analog::Reducer;
use Moose;
with 'Hadoop::Reducer';

sub reduce {
    my ($self, $key, $values) = @_;

    my $count = 0;
    while ($values->has_next) {
        $count++;
        $values->next;
    }
    $self->emit($key, $count);
}

package main;
use FindBin::libs;

Analog::Reducer->run;

$values がイテレータになっている点にご注目ください。イテレータを回すことでストリームデータを構造化された入力として扱うことができます。

実際にこのフレームワークを使って計算させてみます。結果は以下のようになります。キーの順に並びます。

[naoya@colinux hadoop]% hadoop dfs -cat logc_output/*
200     4606
304     262
404     24
500     43

ソースは以下で公開しています。

半日弱ででっち上げたフレームワークなので、まだテストもありませんし、バグもありそうです。また、Hadoop Streaming はオプションにより入出力のフォーマットを切り替えることができますが、現時点では TextInputFormat / TextOutputFormat にしか対応していません。

Hadoop は Java 向け、ということでこれまで食わず嫌いでいましたが、Hadoop Streaming を知って俄然興味が沸いてきました。もう少し細部まで調べてみようと思います。kzk さん、ありがとうございます。連載の今後も期待しています。

kzkkzk 2008/05/14 06:06 お、早速試して頂いたようで有難うございます。

src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java

このファイルのreduceクラスを見ると、せっかくキー毎にまとまっているものをわざわざ分解してプログラムに渡しているように見えます。何故こうなってるのかirc.freenode.net #hadoopでも聞いて見たんですがよく分からず。

その他にもcombinerにJavaのクラスしか指定できない等、Streamingにはまだまだ制約も大きいので今改良しようと思ってる所です。

naoyanaoya 2008/05/14 11:11 ソース見てみました。確かに PipeReducer.java で構造化を崩していますね。標準入力に入力を投げる手前、単純化のためにフラットな構造に戻している、などでしょうか。

以下、独り言です。

構造化されたまま標準入力を使おうとした場合に、例えば key <tab>val<tab>val<tab> ... のような行を投げてしまうと、val が大量だった場合に問題になります。受け側であくまで入力をストリームとして処理できるようにするには現状の方法でも、今回自分が作成した Iterator のようなものをクライアント側で用意すれば可能です。

key
<tab>val
<tab>val
<tab>val

key
<tab>val
<tab>val
<tab>val

...
EOF

のようなテキストによる構造 (あるいはストリームな XML など) で流すというのもアリとは思いますが、org.apache.hadoop.mapred 以下の OutputFormat にはその類のものは無いように思いますし、現状は Hadoop Developer ではなくユーザーが頑張れ、ということか、と推測しています。(空気読みすぎ危険)

white9white9 2008/05/14 20:51 う〜。
難しいなぁ。
変なコメントでごめんなさい(--;)