Hatena::ブログ(Diary)

naoyaのはてなダイアリー

May 11, 2008

MapReduce

"MapReduce" は Google のバックエンドで利用されている並列計算システムです。検索エンジンのインデックス作成をはじめとする、大規模な入力データに対するバッチ処理を想定して作られたシステムです。

MapReduce の面白いところは、map() と reduce() という二つの関数の組み合わせを定義するだけで、大規模データに対する様々な計算問題を解決することができる点です。

MapReduce の計算モデル

map() にはその計算問題のデータとしての key-value ペアが次々に渡ってきます。map() では key-value 値のペアを異なる複数の key-value ペアに変換します。reduce() には、map() で作った key-value ペアを同一の key で束ねたものが順番に渡ってきます。その key-values ペアを任意の形式に変換することで、最終的な出力を得ます。

2004 年に発表された Google の MapReduce 論文 (Google Research Publication: MapReduce) では例として、あるドキュメント内から単語を探す、以下の疑似コードが紹介されています。

map(String key, String value):
  // key: document name
  // value: document contents
  for each word w in value:
    EmitIntermediate(w, "1");

まずは map() です。key-value ペアとして (ドキュメント名 => ドキュメントの内容) が渡って来ます。ドキュメントの内容を解析すると単語のリストが得られます。この単語リストから、(単語 => 1) という key-value ペアを作成して次のフェーズに渡します。EmitIntermediate() が、作った (単語 => 1) の key-value ペアを次のフェーズに渡す関数です。

reduce(String key, Iterator values):
  // key: a word
  // values: a list of counts
  int result = 0;
  for each v in values:
    result += ParseInt(v);
    Emit(AsString(result));

次は reduce() です。reduce() には、map() で作成した key-value ペアが、同一のキーで束ねられたものが渡ってきます。引数 values が Iterator になっているところからも分かるとおり、キーに対する値はリストです。この例では (apple => [1, 1, 1, 1, 1]) といったデータになります。

目的は単語を数えることです。与えられた key-values ペアに対して values に含まれる 1 の数を数えて emit します。これで (apple => 5) というレコードが emit されます。次の reduce() 呼び出しにはまた別の単語に関する key-values が渡って来て、同様に (単語 => 出現回数) のレコードが emit されていきます。

map() と reduce() を定義して MapReduce のシステムに処理を依頼すると、Google のデータセンターにある数千ものホストにこれら map() + reduce() の処理が分散されて計算されます。入力を複数の部分に分割したり、map() で出力したデータを reduce() 用のデータ構造に束ねたりといったことは MapReduce システムが面倒を見ます。また、計算途中にホストが故障した場合に、別のホストが処理を引き継ぐといった冗長性も備えています。Google の論文の最後には、MapReduce に計算を実行させるための C++ のコードが掲載されています。

Perl によるデモ - MapReduce::Lite

MapReduce は、ネットワーク上での複数ホストでの分散 / テラバイト級のデータを扱う前提 / 冗長性などを放棄して、map() と reduce() の組み合わせにより多数の問題を解くフレームワーク部分のみに絞ってみた場合にはそれほど難しくはありません。(もちろん、この前提では、役にも立ちません。)

単一ホスト上で動作させることを前提に Perl で MapReduce のこのコア部分を MapReduce::Lite として実装してみました。MapReduce::Lite という名前とは裏腹に、Moose と ithreads を使っているので非常にヘビーです。ソースコードは github で公開しています。

この MapReduce::Lite を使って Apache のアクセスログを解析し、あるログファイルに含まれる HTTP ステータスコードの出現回数を数えてみます。MapReduce を操作するプログラマになりきった気持ちで、以下のようなログ解析用のコードを書きます。

#!/usr/bin/env/perl

package Analog::Mapper;
use Moose;
with 'MapReduce::Lite::Mapper';

sub map {
    my ($self, $key, $value) = @_;
    my @elements = split /\s+/, $value;
    if ($elements[8]) {
        $self->emit($elements[8], 1);
    }
}

package Analog::Reducer;
use Moose;
with 'MapReduce::Lite::Reducer';

sub reduce {
    my ($self, $key, $values) = @_;
    $self->emit($key, $values->size);
}

package main;
use FindBin::libs;
use MapReduce::Lite;

my $spec = MapReduce::Lite::Spec->new(intermidate_dir => "./tmp");

for (@ARGV) {
    my $in = $spec->create_input;
    $in->file($_);
    $in->mapper('Analog::Mapper');
}

$spec->out->reducer('Analog::Reducer');
$spec->out->num_tasks(3);

mapreduce($spec);

Analog::Mapper に map() を、Analog::Reducer に reduce() を定義します。

  • map() には (ログのファイル名 => ログの一行) のペアが渡ってくるので、行を解析してステータスコードに相当する箇所を取り出し、先の Google 論文の例に同じく (ステータスコード => 1) というペアにして emit します。
  • reduce() の引数には key-values ペアが渡ってきます。values は List::RubyLike (参考:http://d.hatena.ne.jp/naoya/20080419/1208579525) になっているので、size メソッドでリストの要素数を数えて emit します。
  • main 部分では、MapReduce::Lite に計算命令を出すためにパラメータを調整します。

このスクリプトを analog.pl として実行すると、

[naoya@colinux MapReduce-Lite]% perl examples/analog.pl /var/log/httpd/access_log
200 => 4606
304 => 262
404 => 24
500 => 43

という出力が得られます。

もう一つ別に、/etc/passwd から各列の単語の出現回数を数えるプログラムも作ってみました。

#!/usr/bin/env/perl

package TermCount::Mapper;
use Moose;
with 'MapReduce::Lite::Mapper';

sub map {
    my ($self, $key, $value) = @_;
    for (split /:/, $value) {
        next unless $_;
        if (! m!^\d+$!) {
            $self->emit($_ => 1);
        }
    }
}

package TermCount::Reducer;
use Moose;
with 'MapReduce::Lite::Reducer';

sub reduce {
    my ($self, $key, $values) = @_;
    $self->emit( $key => $values->size );
}

package main;

use FindBin::libs;
use MapReduce::Lite;

my $spec = MapReduce::Lite::Spec->new(intermidate_dir => "./tmp");

for (@ARGV) {
    my $in = $spec->create_input;
    $in->file($_);
    $in->mapper('TermCount::Mapper');
}

$spec->out->reducer('TermCount::Reducer');
$spec->out->num_tasks(1);

mapreduce($spec);

map() では /etc/passwd の各行を ":" で split して列に分けて emit、reduce() では例によって 1 の数を数えます。

[naoya@colinux MapReduce-Lite]% perl examples/termcount.pl /etc/passwd | tail
proxy => 2
root => 2
sshd => 1
sync => 2
sys => 2
telnetd => 1
uml-net => 1
uucp => 2
www-data => 2
x => 26

という出力が得られます。左が列の単語、右が出現回数です。

ところでこの単語の出現回数出力、ちょっと手を加えるだけで検索エンジンの転置インデックスが作れることが分かります。map の emit 時に (単語 => 1) ではなく (単語 => ドキュメントID) とします。reduce では (単語 => ドキュメントIDのリスト) とします。出力は単語順に並べられているので、これだけでごくシンプルではありますが、転置インデックスの完成です。MapReduce が検索エンジンのインデックスを作成するのに有効であることが分かります。

GFS と MapReduce

さて、この単一ホストでのみ動作する実装から、大規模並列処理が可能になるまでにどのようなハードルがあるかを考察してみます。

まず、入力ファイルがテラバイト級であった場合です。実はここが一番難しいところです。

Google では入力ファイルを 64MB 程度の chunk に分断して、それを各計算機の Map タスクの入力とします。この時ある特定のホストのローカルに保存された1TBのファイルを 64MB に分断してネットワークで配送するのでは、1TB のディスク I/Oとネットワーク I/O が発生し、そこがボトルネックになってしまいます。Google では分散ファイルシステムの GFS のノードと MapReduce のノードが同一のホストで動作します。GFS には巨大なファイルが分散されて保存されています。MapReduce は、極力対象の入力ファイルをローカルに持つ GFS のノードを選び、そのホストに MapReduce のワーカーを担当させる方法で、最適化を行います。

すなわち、MapReduce は GFS のような分散ファイルシステムとセットであるのが大前提ということになります。

タスクの分散

次に、ネットワーク上での分散です。ここはそれほど難しくないでしょう。ネットワーク上にマスタのホストを用意し、各計算ノード(ワーカー) はマスタへ接続します。マスタはクライアントから計算要求を受け取ったら、入力ファイルの分割、Map、Reduce など各タスクを、計算の状態遷移に合わせてワーカーに指示を出します。

試しに MapReduce::Lite の実装をベースにネットワーク対応のコードも書いてみましたが、まずまず動作しています。(こちらはまだ公開していません。)

Shuffle フェーズ

Map がはき出した key-value ペアを、Reduce 用のデータに集約する処理も考慮すべき点です。

Google の MapReduce ではこの処理は Shuffle と呼ばれています。ネットワーク上のデータ転送のボトルネックを回避するために、Mapper は key-value ペアを小さな中間ファイルとしてローカルのディスクに書き込みます。Reducer は、Mapper のローカルから RPC でその中間ファイルを取得します。このとき、Mapper は特定の分割関数 (key に対するハッシュ関数の結果に Reducer の数の mod を取ったもの) に従い中間ファイルを分割して保存しておき、Reducer はマスタの指示により、その分割されたファイル群から自分自身が必要とする中間ファイルだけを取得するようになっています。

Reducer は中間ファイルを集め終わった後、それらをキーによってソートします。データの数が膨大になると、このソートのアルゴリズムが律速になるように思います。2004年時点の Google のシステムではメモリにフィットする場合はメモリ内でソート、そうでない場合は二次記憶を使う外部ソートアルゴリズムを利用していたようです。

MapReduce::Lite ではメモリ内での処理に限定して、Tie::Hash::Sorted を使っています。

冗長性の確保

冗長性の確保も悩ましい問題です。

マスタがタスクのキューになり、ワーカーの故障を検知したら他のワーカーにタスクを割り振るなどの実装を行うことになるでしょう。ここでもやはり GFS が鍵になります。GFS ではあるデータを分散して冗長に持っていますから、仮にあるホストが故障しても、他のホストが同一のデータを持っていることを保証します。GFS と MapReduce がペアになることで、入力データの分散と冗長性の確保が可能になります。

マスタの冗長性確保も考慮すべき問題です。おそらく MySQL のバイナリログのような形で、タスクの履歴を共有しておき、一方が故障した場合にはもう一方が履歴から処理を再開する、などの実装を行うことになるでしょう。

その他

他にも考慮すべき問題があります。低速なマシンにタスクが割り当てられてしまった場合への対処、イレギュラーな入力によりプログラムがエラーになっても計算が継続できるような仕組み、ネットワーク上でのコマンド/データのやりとりに利用するプロトコルなどなどです。

MapReduce とは何か

こうして MapReduce の全体像を見たとき一つ分かるのは、MapReduce は大規模データを、多数のデータに分割してストリームのようにみせかけ分散処理するためのシステムであるという点です。巨大なデータでも MapReduce のアーキテクチャならストリーム的に処理できるので、各計算をメモリにうまくフィットさせて高速に処理することが可能です。

テラバイト級のデータを細かな塊に分割し、Map と Reduce でも key-value ペアはストリームのように流れていきます。分割されたデータが細かな粒となって次から次へと分散ファイルシステムと MapReduce システムの中を移動し、その途中で姿を変えて、最終出力のストレージへと溜まっていきます。この姿からは TCP/IP のパケットによるストリームデータのやりとりや、UNIX のパイプ & フィルタなどが連想されます。

Google 社内における MapReduce のコードベースは大変な勢いで成長しているようです。2004年の論文から4年が経過した2008年現在、それがどのようなものになっているか、非常に興味が沸きます。

参考文献

kzkkzk 2008/05/12 00:01 HadoopにはHadoop Streamingというaddonが有って、標準入出力を介する事で任意の言語でMapReduceプログラムを記述出来ます。是非試してみて下さい:-)

- インストール方法
-- http://wiki.apache.org/hadoop/Running_Hadoop_On_Ubuntu_Linux_(Single-Node_Cluster)
-- http://www.michael-noll.com/wiki/Running_Hadoop_On_Ubuntu_Linux_(Multi-Node_Cluster)

- HadoopStreamingの資料
-- http://hadoop.apache.org/core/docs/r0.15.3/streaming.html

naoyanaoya 2008/05/12 00:15 おお、了解! 試してみます!

naoyanaoya 2008/05/12 00:16 ドキュメントのサンプルが -mapper /bin/cat -reducer /bin/wc とかでいきなり萌えました。これはッ!

kzkkzk 2008/05/12 00:21 あ、これ貼り忘れました。

http://www.michael-noll.com/wiki/Writing_An_Hadoop_MapReduce_Program_In_Python