Hatena::ブログ(Diary)

hamadakoichi blog このページをアンテナに追加 RSSフィード

2012-06-13

[][] 「Mobageを支える技術」を執筆しました  「Mobageを支える技術」を執筆しました - hamadakoichi blog を含むブックマーク  「Mobageを支える技術」を執筆しました - hamadakoichi blog のブックマークコメント

Mobageを支える技術」が 6/13 に発売されました。DeNAの11人の著者によりMobageの各技術領域の執筆が行われています。私は13章の「大規模データマイニング」を執筆しました。

Mobageを支える技術 ~ソーシャルゲームの舞台裏~ (WEB+DB PRESS plus)

Mobageを支える技術 ~ソーシャルゲームの舞台裏~ (WEB+DB PRESS plus)


内容・思い

13章「大規模データマイニング」の35Pageでは、読まれた方々が実際に大規模データを対象としたデータマイニング機械学習を「実行できる」ようになることを意図し執筆を行いました。Mahoutを用いたデータマイニング機械学習自体の実行とともに、Mahout実行に必要なデータの前処理や、活用に必要な結果データの解析の実装方法も含め、紹介しています。本章を読まれた皆さんそれぞれが、実際に大規模データを対象としたデータマイニング機械学習の実行を試し、各サービスをより魅力的なものとすることの一助となれたら嬉しいです。


当初、担当として与えられた35Pageをどう使うか、内容をどう絞るか、に最も悩みました。本技術領域は、統計解析、データマイニング機械学習、分散処理、等の各技術要素が絡んでおり、大規模分散での実行や活用に関し、日々挑戦を続けている領域です。理論・アルゴリズムから書き出すとその一部だけで全担当ページ数が埋まってしまいますし、固有のデータ・解析に特化しすぎたり専門的すぎる内容もまた読まれた方々自身での活用を困難にしてしまいます。

世の中のより多くのサービスで蓄積データが解析・活用され、継続的なサービス洗練が行われることに役立てる構成にしたい。そう考え、本章では、読まれた方々が実際に大規模データを対象として、Mahout を用い重要手法に関する一連の実行を行えることを意図した内容構成としています。Mahout が発展途上であるゆえに存在する、分野外の人に対する高い敷居を軽減し、データマイニング機械学習自身とともに、必要な関連するデータ前処理や、結果解析の実装・実行も行え、一連の実行を行えるよう意図しています。


Mahout は データマイニング機械学習の分散実装も提供されている非常に有用ライブラリです。しかしながら発展途上のためドキュメントがほとんどなくソースから読み解く必要があり、かつ、実装を理解するには 分散処理とともに、データマイニング機械学習の知識が必要であり、さらには、実行するためには、Mahout 実装にあった入力データ形式へ変換する実装を行う必要があり、結果を活用するためにも結果ファイルの実装も必要である、という様々な敷居が高く存在していると感じます。そして、それらを説明するドキュメントや書籍が欠けている状況。

担当章の内容が、これらの欠けている箇所を埋め敷居を下げ、読まれた皆さんそれぞれが実際に大規模データを対象とし、データマイニング機械学習の実行を試し、それぞれのサービスをより魅力的なものとすることに少しでも役に立てたら嬉しいです。


Mobage解析事例・構成

今回はページ数の制約上、実行を中心に書いていますが、Mobageでの大規模データマイニング事例、関連構成、PDCAサイクル、等に関し、カンファレンスで話した資料も以下に記載します。ぜひご覧下さい。

データマイニング機械学習 手法

また、データマイニング機械学習の各種方法論に関しても、導入的な内容から講師を行った内容を資料公開していますので興味がある方はこちらをご覧下さい。

データマイニング機械学習の実サービス適用事例

各サービス・ビジネス領域でのデータマイニング活用に関する事例に関しても、運営しているコミュニティで分野を越えてノウハウ共有・議論を行っています。興味のある方はこちらをご覧頂けたらと思います。


Mobageを支える技術

Mobageを支える技術DeNAの各技術領域に関し豪華執筆メンバーが執筆しています。興味がある分野がありましたらぜひご一読頂けたら嬉しいです。

Amazon 執筆者一覧

目次

Part1 ソーシャルゲーム開発技術

1章 ソーシャルゲーム概論

1-1 ソーシャルゲームとは

1-2 ソーシャルゲーム運用


2章 ブラウザベースのソーシャルゲーム(フィーチャーフォン)

2-1 フィーチャーフォンとは

2-2 フィーチャーフォン向けWebアプリケーション開発

2-3 フィーチャーフォン向けのHTML

2-4 Flash Lite

2-5 ソーシャルゲームUI設計

2-6 セキュリティ対策


3章 ブラウザベースのソーシャルゲーム(スマートフォン)

3-1 スマートフォンの利点

3-2 UIとUXにおける工夫

3-3 ハイパフォーマンス・ソーシャルゲーム

3-4 アニメーションテクニック


4章 アプリケーション版のソーシャルゲーム開発

4-1 アプリケーション方式の構成

4-2 アプリケーション形式のメリットとデメリット

4-3 ゲームエンジンを使う


Part2 ソーシャルゲーム運用技術

5章 35億PV/dayをさばくインフラ構成

5-1 ソーシャルゲームインフラの構成

5-2 Webアプリケーション層における工夫


6章 データベースレプリケーション

6-1 DB層で重要視される指標

6-2 レプリケーションの活用

6-3 レプリケーション遅延への対策

6-4 レプリケーション遅延を防ぐベストプラクティス


7章 データベースの高性能化/高可用性化

7-1 性能管理と台数削減

7-2 マスター分割(Sharding)の戦略

7-3 サービスの拡大/集約と無停止メンテナンス

7-4 マスターの自動フェイルオーバ


8章 数千台のサーバを運用する技術

8-1 サーバ情報の管理

8-2 サーバセットアップ

8-3 監視


Part3 ソーシャルゲーム効率化技術

9章 MySQLとの付き合い方

9-1 大規模環境におけるデータベースプログラミング

9-2 データベースプログラミングにおけるテスト手法

9-3 MySQLのストアドプロシージャ/トリガ/イベントスケジューラ

9-4 RESTful APIの考え方と実際


10章 Job QueueとMessage Queue

10-1 Job Queue/Message Queueの概要

10-2 Q4Mを利用したworkerの実装


11章 アプリケーションチューニング

11-1 キャッシュを利用する

11-2 さまざまなDNSキャッシュ

11-3 アプリケーションチューニング


12章 DevOps

12-1 ログ監視

12-2 運用


Part4 ソーシャルゲーム分析技術

13章 大規模データマイニング

13-1 ユーザの楽しさの法則とサービス洗練

13-2 Clustering

13-3 Frequent Pattern Mining

13-4 Classification

13-5 Recommendation

Mobageを支える技術 ~ソーシャルゲームの舞台裏~ (WEB+DB PRESS plus)

Mobageを支える技術 ~ソーシャルゲームの舞台裏~ (WEB+DB PRESS plus)

あわせて読みたい

書評

2012-05-04

[] 大規模データマイニング機械学習 Mahout 活用に向けて読んでおきたい12のプレゼン資料  大規模データマイニング・機械学習 Mahout 活用に向けて読んでおきたい12のプレゼン資料 - hamadakoichi blog を含むブックマーク  大規模データマイニング・機械学習 Mahout 活用に向けて読んでおきたい12のプレゼン資料 - hamadakoichi blog のブックマークコメント

2012年度が始まり1ヶ月が経ちました。2011年度は、大規模分散処理技術・データ基盤の普及が広く進んだ年だったと思います。2012年はそれら蓄積された大規模データを活用しデータマイニング機械学習を用い、ビジネス・サービス洗練を大きく広げていく年ではないでしょうか。

Mahoutは 大規模分散データマイニング機械学習ライブラリです。ApacheプロジェクトのOpen Sourceで、Hadoop上で動作しデータマイニング機械学習の大規模分散実行を行うことができます。


大規模分散 データマイニング機械学習を実行できる Mahout ですが、まだ「ドキュメント整備が発展途上で詳細を知るためにはソースコードから読み解く」必要がある場合が多く、また、活用には「対象とするデータマイニング機械学習の基礎知識」が必要なため、まだまだ活用の敷居が高いのが現状ではないでしょうか。

そこで今回は、これらの敷居を下げる、大規模データマイニング機械学習の実際の活用に必要な「対象データマイニング機械学習手法の導入的内容」から「Mahoutでの実際の実行方法」までのノウハウが公開されている、12のプレゼン資料を紹介したいと思います。Clustering、Classification、Pattern Mining、Recommendation、Algorithm全般、の各カテゴリごとに紹介します。


Clustering


Classification


Frequent Pattern Mining


Recommendation


Algorithms



以上、大規模データマイニング機械学習 活用へ向けた、「対象データマイニング機械学習手法の導入的内容」から「Mahoutでの実際の実行方法」までの12のプレゼン資料でした。他にもMahoutに関しオススメの資料がありましたらぜひ [Twitter:@hamadakoichi] へ教えて下さい。

2011-12-29

[][]「Mobageの大規模データマイニング」- #PRMU 2011 Big Data and Cloud で講演してきました 「Mobageの大規模データマイニング」- #PRMU 2011 Big Data and Cloud で講演してきました  - hamadakoichi blog を含むブックマーク 「Mobageの大規模データマイニング」- #PRMU 2011 Big Data and Cloud で講演してきました  - hamadakoichi blog のブックマークコメント

電子情報通信学会「パターン認識とメディア理解研究会 (PRMU: Pattern Recognition and Media Understanding)」@幕張メッセ国際会議場 で招待講演をしてきました。

Mobageの大規模データマイニング」に関して、話しています。

2900万人以上の登録会員をかかえるモバイルソーシャルゲームプラットフォーム「Mobage」では、1日20億超の行動情報が蓄積されています。これらの大規模行動データを対象に、データマイニング機械学習の各種方法論を適用することにより、隠された法則を解明・より良い解を導出し、迅速なサービス洗練を実現しています。今回の講演では、これらMobageでの大規模データマイニングに関し紹介しています。


PRMU 2011 Big Data and Cloud :Webスケール時代のパターン認識

一億台ものコンピュータが連携して地球規模でサービスを提供するクラウドが出現して以来,情報の共有が人間ネットワークを介して 瞬時に行われるソーシャルメディアが生まれ,さらには散在する情報を幅広く収集しそれを価値化して提供するという情報の社会化の 時代を迎えようとしています.3月11日の東日本大地震をきっかけに,地球規模の気象・環境・生態系,経済活動・人の動きを認識す ることに注目が集まっています.これらの分野に対する注目は一過性のものではなく,ネットワークの進化や経済発展と 環境への関心 が高まるにつれ,今後も引き続き発展することが考えられ,ここに,パターン認識・メディア理解技術が貢献できると考えられます.

そこで,10月のPRMU研究会では,「PRMUの拓く未来:Webスケール時代のパターン認識」という大きなテーマで開催し,「ソーシャルメディア」,「クラウド」,「地球環境」,「経済活動」の4つの分野におけるパターン認識研究のあるべき姿を議論いたしま

https://sites.google.com/site/bigdataandclould/

今回のPRMUのテーマは"Big Data and Cloud :Webスケール時代のパターン認識"でした。

Big Data and Cloud :Webスケール時代のパターン認識

今回のビッグデータに関するデータ処理、クラウド応用に関するものでとにかく日本で考えられる大規模データ処理を行っていそうな人をリストアップし参加してもらうことにしました。


Mobageの大規模データマイニングを紹介いただく濱田晃一(DeNA)さんから、統計数理の大御所、樋口知之(統計数理研)先生、金融系、消費者行動のモデリング、SNSデータ解析など考え付く一線の招待講演者を固めています。 最終日は、グーグルの栗原さん、楽天の森さんにもご参加いただき、機械学習気鋭の研究者杉山先生、web crawlingの鳥澤さん、PRMU委員長の山田さんのパネルで将来のWebスケール時代のデータ処理の方向性を議論します。

https://sites.google.com/site/bigdataandclould/

開催2日間を通じ、上記豪華メンバーが講演しています。Tweet まとめも以下に記載します。

PRMU研究会 −Big Data and Cloud:Webスケール時代のパターン認識−#PRMU - Togetter

学術・ビジネス連携、日本から世界への新たなサービス発信

今回、学術領域での講演を機会を頂け嬉しく思っています。セッション後や懇親会で、学術・ビジネス領域ともにたくさんの方々に声を掛けてもらえ、出会い、話すことができました。

楽しさのマイニングのユーザー体験還元。今後、世界中の人々へ対しても、それぞれの国民性・民族性にあった楽しさの提供。今後、学術・ビジネスお互い協力し、一緒に日本から世界へ新たなサービス発信をしていきたい。そう私は考えています。学術、ビジネス、双方の力を合わせて日本から世界へ。

最後に、招待講演の機会を下さったPRMU副委員長 NTTドコモの栄藤稔さん、私の各種調整を担当してく下さったパナソニックの石井育規さん、懇親会でたくさんの方々を紹介して下さった 中部大学の藤吉弘亘さん、ありがとうございました。統計数理研究所所長の樋口知之さんとも理論物理の博士つながりで今後のデータマイニングの展開に関し面白い議論をさせて頂けました。Googleの栗原賢一さんも博士時代の研究に統計物理に関する共通点があり機械学習、物理を超えて面白い議論をさせて頂きました。

今回の開催を通じ、出会い話し議論できたみなさんに感謝しています。ありがとうございました。今後ともよろしくお願い致します。


関連資料

参考文献

Mahout in Action

Mahout in Action

Hadoop徹底入門

Hadoop徹底入門

Hadoop 第2版

Hadoop 第2版

2011-05-16

[][][] Perl で MapReduce - Mahout Frequent Pattern Mining Data -  Perl で MapReduce - Mahout Frequent Pattern Mining Data - - hamadakoichi blog を含むブックマーク  Perl で MapReduce - Mahout Frequent Pattern Mining Data - - hamadakoichi blog のブックマークコメント

最近、Perl も書き始めてみたので、Hadoop 上で分散実行できる Perl での MapReduce 実装を紹介する。大規模データマイニング機械学習ライブラリ Apache Mahout の Parallel Frequent Pattern Mining の入力データを生成する Perl MapReduce 実装の紹介。

Frequent Pattern Mining 入門

Frequent Pattern Mining (Association Analysis )は、隠されたルールパターンを抽出するアルゴリズム。有名な例としては、1992年のウォルマートのクリスマス商戦で「おむつを買った人は半ダースのビールを買う可能性が最も高い」という頻出ルールを抽出し、商品陳列に活かした売上向上した事例。

入門資料:

データ

MovieLens Data Sets 100k Ratings のデータを例に紹介する。

このデータは 943 userが 1682 movieに対して行った 100,000 の評価データ。

以下、HDFSの movielens/ 下に配置し実行。


データ形式
 userid | itemid | rating | timestamp(unixtime)
データ例: ua.test
1	20	4	887431883
1	33	4	878542699
1	61	4	878542420
1	117	3	874965739
1	155	2	878542201
1	160	4	875072547
1	171	5	889751711
1	189	3	888732928
1	202	5	875072442
1	265	4	878542441
2	13	4	888551922
2	50	5	888552084
2	251	5	888552084
...

Perl MapReduce

Perl での MapperとReducerの実装。各userごとに 5段階評価の中で4以上の好評価のmovieのid を ","(カンマ)区切りでつなげ出力する。

Mapperで (key, value) を (userid, 4以上の評価movieid)で出力し、Reducerで useridごとに、movieidsをカンマ区切りでつなげる。

#!/usr/bin/perl
use strict;

my $SEP = "\t";
my $NSEP = ",";

main();

sub main{
        if($ARGV[0] eq 'map'){
                mapper();
        }elsif($ARGV[0] eq 'reduce'){
                reducer();
        }
}

sub mapper{
        while(my $line = <STDIN>){
                chomp($line);
                my @values = split(/$SEP/, $line);
		if(@values != 0){ 
         	        my ($userid, $itemid, $rating, $timestamp) = @values;		
			if($rating >= 4){
       				print join($SEP, @values), "\n";
			}
		}
        }
}

sub reducer{
	my %eval;
        while(my $line = <STDIN>){
                chomp($line);
                my ($userid, $itemid) = split(/$SEP/, $line);
		if(!defined($eval{$userid})){
			$eval{$userid} .= $itemid;
		}else{
			$eval{$userid} .= $NSEP.$itemid;
		}
        }
	while(my ($key, $itemids) = each(%eval)){
                print join($NSEP, $itemids), "\n";
	}
}

実行

Hadoop Streaming を用いた Perl MapReduce 実行

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-x.xx.x-streaming.jar  \
-files mapreduce.pl \
-mapper "mapreduce.pl map" \
-reducer "mapreduce.pl reduce" \ 
-numReduceTasks 50 \ 
-input movielens/ua.test \
-output movielens/out

出力

出力:movielens/out/part-00000, part-00001,...

以下のMahout の Paralel Frequent Patten Mining の入力データ形式で出力が行われる。

各userごとに 5段階評価の中で4以上の好評価のmovieのid が ","(カンマ)区切りでつなげられたデータ。


movielens/out/part-00000

227,229,243,288,294,300,380,449,450,748
246,249,250,257,276,1012
2,4,8,11,14,22,56,58,64,68,69,70,76,78,81,91,101,117,121,124,125,127,135,139,141,147,154,156,157,164,168,169,171,176,180,181,186,188,196,198,203,204,206,207,209,210,214,223,226,227,228,230,234,238,246,249,250,262,265,268,270,272,273,276,282,288,290,294,301,302,307,315,317,318,324,333,340,347,357,403,404,408,417,418,419,420,423,427,429,432,436,448,463,471,474,496,547,550,559,566,568,569,578,581,588,628,640,649,652,684,693,732,737,742,746,747,751,770,806,843,845,853,854,922,969,1129,1135,1172,1218,1220,1240
6,9,11,12,14,17,22,25,30,31,33,42,79,83,86,96,98,100,131,132,143,155,166,177,178,179,180,185,187,190,191,197,199,203,213,237,241,269,272,287,301,307,311,312,313,318,347,357,367,382,421,423,425,435,471,478,483,486,489,494,499,501,506,507,512,521,526,529,602,603,604,611,613,614,618,631,648,652,657,659,662,690,692,705,707,713,739,813,855,900,903,923,945,958,964,966,971,995,1020,1048,1086,1101,1125,1193,1194,1197,1198,1200
258,288,302,313,315,333,895,896,1127
...

※Mahout の Parallel Frequent Pattern Mining を用いた Java実装方法に関しても改めて別エントリで紹介します。

関連資料

2011-05-07

[][] Mahout RandomForest Driver 実装法 -大規模分散 機械学習・判別 -  Mahout RandomForest Driver 実装法 -大規模分散 機械学習・判別 -  - hamadakoichi blog を含むブックマーク  Mahout RandomForest Driver 実装法 -大規模分散 機械学習・判別 -  - hamadakoichi blog のブックマークコメント

Apache Mahout は、Hadoop上で動作する大規模分散データマイニング機械学習ライブラリ

Random Forest は大規模データで高精度の分類・判別を実現するアルゴリズム


Random Forestを、"R言語での実行のように容易"に "大規模分散 学習・判別"できるように、

Mahout を用いた各種 Driver を実装しました。

以下に実行方法、実装を紹介します。


  • org.mahoutjp.df.ForestDriver
    • Random Forest の分散学習から、分散判別、判別結果出力、および、精度評価まで行う Driver。
  • org.mahoutjp.df.ForestClassificationDriver
    • 生成された Forest Modelを用いて、分散判別、判別結果出力、および、精度評価まで行う Driver。

両 Driver とも、1コマンドで容易に分散実行できる実装にしています。

コマンドライン実行とともに、public static の ForestDriver.run(...), ForestClassificationDriver.run(...) も用意しており、シンプルに呼び出し実行できます。

Mahoutの Example SoucrCode にある、出力結果が読めない等の各種あった課題も、解決する実装にしています。


Random Forest 入門

Random Forest は、大規模データでも他分類器に比べて高精度・高速に判別できるアルゴリズム

入門資料:


org.mathoujp.df.ForestDriver: 分散学習・判別

実行法

Forest Modelの分散学習から、分散判別、判別結果出力、および、精度評価まで行えます。

$HADOOP_HOME/bin/hadoop jar mahoutjp-0.1-job.jar \
org.mahoutjp.df.ForestDriver \
- ... (Options)
実行オプション

以下の実行オプションを指定できます。

Options
  --data (-d) path                                   Data path 
  --descriptor (-dsc) descriptor [descriptor ...]    data descriptor (N:Numerical, C:Categorical, L:Label)
  --descriptor_out (-ds) file path                   Path to generated descriptor file 
  --selection (-sl) m                                Number of variables to select randomly at each tree-node
  --nbtrees (-t) nbtrees                             Number of trees to grow
  --forest_out (-fo) dir path                        Output path of Decision Forest
  -oob                                               Optional, estimate the out-of-bag error
  --seed (-sd) seed                                  Optional, seed value used to initialise the Random number generator
  --partial (-p)                                     Optional, use the Partial Data implementation
  --testdata (-td) dir path                          Test data path
  --predout (-po) dir path                           Path to generated prediction output file
  --help (-h)                                        Print out help
データ形式

入力形式:

dataid, attributes1, attribute2, ... (comma sep)

出力形式:

dataid, predictionIndex, realLabelIndex (tab sep)
実行例
$HADOOP_HOME/bin/hadoop jar mahoutjp-0.1-job.jar \
org.mahoutjp.df.ForestDriver \
-Dmapred.max.split.size=5074231 \
-d testdata/kdd/KDDTrain.arff \
-ds testdata/kdd/KDD.info \
-fo testdata/kdd/forest \
-dsc N 4 C 2 N C 4 N C 8 N 2 C 19 N L \
-oob \
-sl 7 \
-p \
-t 500 \
-td testdata/kdd/KDDTest \
-po testdata/kdd/predictions

※Data Descriptorの短縮形表記。個数 Type で短縮表記ができる。 "N N N I N N C C L I I I I I"は"3 N I N N 2 C L 5 I"。

データ:

NSL-KDD データ

データ定義行(@で始まる行)を削除。dataIdを1列目に付加。

学習データ:

dataid, attributes1, attribute2, ...

1000001,0,"tcp","ftp_data","SF",491,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2,2,0,0,0,0,1,0,0,150,25,0.17,0.03,0.17,0,0,0,0.05,0,"normal"
1000002,0,"udp","other","SF",146,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,13,1,0,0,0,0,0.08,0.15,0,255,1,0,0.6,0.88,0,0,0,0,0,"normal"
1000003,0,"tcp","private","S0",0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,123,6,1,1,0,0,0.05,0.07,0,255,26,0.1,0.05,0,0,1,1,0,0,"anomaly"
...

テストデータ:

testdata/kdd/Test/part-00000

2000001,0,"tcp","private","REJ",0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,229,10,0,0,1,1,0.04,0.06,0,255,10,0.04,0.06,0,0,0,0,1,1,"anomaly"
2000002,0,"tcp","private","REJ",0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,136,1,0,0,1,1,0.01,0.06,0,255,1,0,0.06,0,0,0,0,1,1,"anomaly"
2000003,2,"tcp","ftp_data","SF",12983,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,0,0,0,1,0,0,134,86,0.61,0.04,0.61,0.02,0,0,0,0,"normal"
...
出力結果:
  • testdata/kdd/predictions/part-00000, part-00001, part-00002

dataid, predictionIndex, realLabelIndex

testdata/kdd/predictions/part-00000

2000001	1	1
2000002	1	1
2000003	0	0
...
実行結果ログ(抜粋)

精度評価まで実行される。

11/05/07 21:32:09 INFO df.ForestDriver: Generating the descriptor...
11/05/07 21:32:09 INFO df.ForestDriver: generating the descriptor dataset...
11/05/07 21:32:12 INFO df.ForestDriver: storing the dataset description
11/05/07 21:32:13 INFO df.ForestDriver: Partial Mapred
11/05/07 21:32:13 INFO df.ForestDriver: Building the forest...
...
11/05/07 21:35:24 INFO df.ForestDriver: Build Time: 0h 3m 11s 360
11/05/07 21:35:26 INFO df.ForestDriver: oob error estimate : 0.0016114564232017972
11/05/07 21:35:26 INFO df.ForestDriver: Storing the forest in: testdata/kdd/forest/forest.seq
...
11/05/07 21:36:00 INFO df.ForestClassifier: # of data which cannot be predicted: 0
11/05/07 21:36:00 INFO df.ForestClassificationDriver: =======================================================
Summary
-------------------------------------------------------
Correctly Classified Instances          :      18013       79.9015%
Incorrectly Classified Instances        :       4531       20.0985%
Total Classified Instances              :      22544

=======================================================
Confusion Matrix
-------------------------------------------------------
a       b       <--Classified as
9453    4273     |  13726       a     = "normal"
258     8560     |  8818        b     = "anomaly"
Default Category: unknown: 2

org.mathoujp.df.ForestClassificationDriver: 分散判別

分散分類のみを実行。生成されたForest Modelを用いて、分散判別、判別結果出力、および、精度評価まで行えます。

実行法
$HADOOP_HOME/bin/hadoop jar mahoutjp-0.1-job.jar \
org.mahoutjp.df.ForestClassifyDriver \
-...(Options)
実行Option
  --testdata (-td) dir path       Test data path
  --dataset (-ds) file path       Dataset path
  --model (-m) dir path           Path to the Decision Forest
  --predout (-po) dir path        Path to generated predictions file
  --analyze (-a)                  Analyze Results
  --help (-h)                     Print out help
実行例
$HADOOP_HOME/bin/hadoop jar mahoutjp-0.1-job.jar \
org.mahoutjp.df.ForestClassificationDriver \
-td testdata/kdd/KDDTest \
-ds testdata/kdd/KDD.info \
-m testdata/kdd/forest \
-po testdata/kdd/predictions \
-a


ソース

概要:

  • Random Forestの分散学習・判別
    • org.mahoutjp.df.ForestDriver
  • Forest Modelによる分散判別
    • org.mahoutjp.df.ForestClassificationDriver
    • org.mahoutjp.df.ForestClassifier

org.mahoutjp.df.ForestDriver

package org.mahoutjp.df;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;

import org.apache.commons.cli2.CommandLine;
import org.apache.commons.cli2.Group;
import org.apache.commons.cli2.Option;
import org.apache.commons.cli2.OptionException;
import org.apache.commons.cli2.builder.ArgumentBuilder;
import org.apache.commons.cli2.builder.DefaultOptionBuilder;
import org.apache.commons.cli2.builder.GroupBuilder;
import org.apache.commons.cli2.commandline.Parser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.CommandLineUtil;
import org.apache.mahout.common.RandomUtils;
import org.apache.mahout.df.DFUtils;
import org.apache.mahout.df.DecisionForest;
import org.apache.mahout.df.ErrorEstimate;
import org.apache.mahout.df.builder.DefaultTreeBuilder;
import org.apache.mahout.df.callback.ForestPredictions;
import org.apache.mahout.df.data.Data;
import org.apache.mahout.df.data.DataLoader;
import org.apache.mahout.df.data.Dataset;
import org.apache.mahout.df.data.DescriptorException;
import org.apache.mahout.df.data.DescriptorUtils;
import org.apache.mahout.df.mapreduce.Builder;
import org.apache.mahout.df.mapreduce.inmem.InMemBuilder;
import org.apache.mahout.df.mapreduce.partial.PartialBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Random Forest Driver
 * 
 * @author hamadakoichi
 */
public class ForestDriver extends AbstractJob {

  private static final Logger log = LoggerFactory.getLogger(ForestDriver.class);

  @Override
  public int run(String[] args) throws Exception {

		DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
		ArgumentBuilder abuilder = new ArgumentBuilder();
		GroupBuilder gbuilder = new GroupBuilder();

		Option dataOpt = obuilder.withLongName("data").withShortName("d")
				.withRequired(true).withArgument(abuilder.withName("file path").withMinimum(1).withMaximum(1).create())
				.withDescription("Data file path")
				.create();

		Option descriptorOpt = obuilder
				.withLongName("descriptor").withShortName("dsc")
				.withRequired(true)
				.withArgument(abuilder.withName("descriptor").withMinimum(1).create())
				.withDescription("data descriptor file path").create();

		Option descPathOpt = obuilder.withLongName("descriptor_out")
				.withShortName("ds")
				.withRequired(true).withArgument(abuilder.withName("file path").withMinimum(1).withMaximum(1).create())
				.withDescription("Path to generated descriptor file").create();

		Option selectionOpt = obuilder
				.withLongName("selection").withShortName("sl")
				.withRequired(true)
				.withArgument(abuilder.withName("m").withMinimum(1).withMaximum(1).create())
				.withDescription("Number of variables to select randomly at each tree-node").create();

		Option oobOpt = obuilder.withShortName("oob").withRequired(false)
				.withDescription("Optional, estimate the out-of-bag error").create();

		Option seedOpt = obuilder
				.withLongName("seed").withShortName("sd")
				.withRequired(false)
				.withArgument(abuilder.withName("seed").withMinimum(1).withMaximum(1).create())
				.withDescription("Optional, seed value used to initialise the Random number generator").create();

		Option partialOpt = obuilder.withLongName("partial").withShortName("p")
				.withRequired(false).withDescription("Optional, use the Partial Data implementation").create();

		Option nbtreesOpt = obuilder.withLongName("nbtrees").withShortName("t")
				.withRequired(true).withArgument(abuilder.withName("nbtrees").withMinimum(1).withMaximum(1).create())
				.withDescription("Number of trees to grow").create();

		Option forestOutOpt = obuilder.withLongName("forest_out").withShortName("fo")
				.withRequired(true).withArgument(abuilder.withName("dir path").withMinimum(1).withMaximum(1).create())
				.withDescription("Output path of Decision Forest").create();

		
		//For Predictions
		Option testDataOpt = obuilder.withLongName("testdata").withShortName("td")
				.withRequired(true).withArgument(abuilder.withName("dir path").withMinimum(1).withMaximum(1).create())
				.withDescription("Test data path").create();

		Option predictionOutOpt = obuilder.withLongName("predout").withShortName("po")
				.withRequired(true).withArgument(abuilder.withName("dir path").withMinimum(1).withMaximum(1).create()).
				withDescription("Path to generated prediction output file").create();		

		Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h").create();

		Group group = gbuilder.withName("Options").withOption(oobOpt)
				.withOption(dataOpt).withOption(descriptorOpt).withOption(descPathOpt)
				.withOption(selectionOpt).withOption(seedOpt).withOption(partialOpt)
				.withOption(nbtreesOpt).withOption(forestOutOpt)
				.withOption(testDataOpt).withOption(predictionOutOpt)
				.withOption(helpOpt).create();
    
		try {
			Parser parser = new Parser();
			parser.setGroup(group);
			CommandLine cmdLine = parser.parse(args);

			if (cmdLine.hasOption("help")) {
				CommandLineUtil.printHelp(group);
				return -1;
			}

			// Forest Parameter
			boolean isPartial = cmdLine.hasOption(partialOpt);
			boolean isOob = cmdLine.hasOption(oobOpt);
			String dataName = cmdLine.getValue(dataOpt).toString();
			List<String> descriptor = convert(cmdLine.getValues(descriptorOpt));
			String descriptorName = cmdLine.getValue(descPathOpt).toString();
			String forestName = cmdLine.getValue(forestOutOpt).toString();
			int m = Integer.parseInt(cmdLine.getValue(selectionOpt).toString());
			int nbTrees = Integer.parseInt(cmdLine.getValue(nbtreesOpt).toString());
			Long seed = 0L ; 
			if (cmdLine.hasOption(seedOpt)) {
				seed = Long.valueOf(cmdLine.getValue(seedOpt).toString());
			}

			// Classification Parameters
			String testdataName =  cmdLine.getValue(testDataOpt).toString();
			String predictionOutName =  cmdLine.getValue(predictionOutOpt).toString();
			

			log.debug("data : {}", dataName);
			log.debug("descriptor: {}", descriptor);
			log.debug("descriptorOut: {}", descriptorName);
			log.debug("forestOut    : {}", forestName);
			log.debug("m : {}", m);
			log.debug("seed : {}", seed);
			log.debug("nbtrees : {}", nbTrees);
			log.debug("isPartial : {}", isPartial);
			log.debug("isOob : {}", isOob);
			log.debug("testData : {}", testdataName);
			log.debug("predictionOut : {}", predictionOutName);

			// Execute
			run(getConf(), dataName, descriptor, descriptorName, forestName, m, 
					seed, nbTrees, isPartial, isOob, testdataName, predictionOutName);
			
		} catch (OptionException e) {
			log.error("Exception", e);
			CommandLineUtil.printHelp(group);
			return -1;
		}
		return 0;
    }

    public static void run(Configuration conf, String dataPathName, List<String> description,
		String descriptorPathName, String forestPathName, int m, Long seed, int nbTrees,
		boolean isPartial, boolean isOob, 
		String testdataPathName, String predictionOutPathName)
    		throws DescriptorException, IOException, ClassNotFoundException, InterruptedException {
   
	  	// Create Descriptor
		Path dataPath = validateInput(dataPathName);
		Path descriptorPath = validateOutput(descriptorPathName);
	  	createDescriptor(conf, dataPath, description, descriptorPath);
		
		// Build Forest
		Path forestPath = validateOutput(forestPathName);
		buildForest(conf, dataPath, description,
				descriptorPath, forestPath, m, seed, nbTrees,
				isPartial, isOob);
		
		// Predict
		boolean analyze = true;
		ForestClassificationDriver.run(conf, forestPathName, testdataPathName, 
				descriptorPathName, predictionOutPathName, analyze);
    }	  
	/**
	 * Greate Descreptor
	 */	
	private static void createDescriptor(Configuration conf, Path dataPath, List<String> description,
			Path outPath) throws DescriptorException, IOException {
		
		log.info("Generating the descriptor...");
		String descriptor = DescriptorUtils.generateDescriptor(description);
		log.info("generating the descriptor dataset...");
		Dataset dataset = generateDataset(descriptor, dataPath);
		log.info("storing the dataset description");

		DFUtils.storeWritable(conf, outPath, dataset);
	}

	/**
	 * Generate DataSet
	 */	
	private static Dataset generateDataset(String descriptor, Path dataPath)
			throws IOException, DescriptorException {
		
		FileSystem fs = dataPath.getFileSystem(new Configuration());		
		Path[] files = DFUtils.listOutputFiles(fs, dataPath);
		return DataLoader.generateDataset(descriptor, fs, files[0]);
	}

	/**
	 * Build Forest
	 */	
	private static void buildForest(Configuration conf, Path dataPath, List<String> description,
			Path descriptorPath, Path forestPath, int m, Long seed, int nbTrees,
			boolean isPartial, boolean isOob)
			throws IOException, ClassNotFoundException,InterruptedException {

		FileSystem ofs = forestPath.getFileSystem(conf);
		if (ofs.exists(forestPath)) {
			log.error("Forest Output Path already exists");
			return;
		}

		DefaultTreeBuilder treeBuilder = new DefaultTreeBuilder();
		treeBuilder.setM(m);
		Dataset dataset = Dataset.load(conf, descriptorPath);

		ForestPredictions callback = isOob ? new ForestPredictions(dataset
				.nbInstances(), dataset.nblabels()) : null;

		Builder forestBuilder;

		if (isPartial) {
			log.info("Partial Mapred");
			forestBuilder = new PartialBuilder(treeBuilder, dataPath,
					descriptorPath, seed, conf);
		} else {
			log.info("InMemory Mapred");
			forestBuilder = new InMemBuilder(treeBuilder, dataPath,
					descriptorPath, seed, conf);
		}

		forestBuilder.setOutputDirName(forestPath.getName());

		log.info("Building the forest...");
		long time = System.currentTimeMillis();

		DecisionForest forest = forestBuilder.build(nbTrees, callback);

		time = System.currentTimeMillis() - time;
		log.info("Build Time: {}", DFUtils.elapsedTime(time));

		if (isOob) {
			Random rng;
			if (seed != null) {
				rng = RandomUtils.getRandom(seed);
			} else {
				rng = RandomUtils.getRandom();
			}

			FileSystem fs = dataPath.getFileSystem(conf);
			int[] labels = Data.extractLabels(dataset, fs, dataPath);

			log.info("oob error estimate : "
					+ ErrorEstimate.errorRate(labels, callback
							.computePredictions(rng)));
		}
		
		// store the forest
		Path forestoutPath = new Path(forestPath, "forest.seq");
		log.info("Storing the forest in: " + forestoutPath);
		DFUtils.storeWritable(conf, forestPath, forest);
	}

	/**
	 * Load data
	 */	
	protected static Data loadData(Configuration conf, Path dataPath,
			Dataset dataset) throws IOException {
		log.info("Loading the data...");
		FileSystem fs = dataPath.getFileSystem(conf);
		Data data = DataLoader.loadData(dataset, fs, dataPath);
		log.info("Data Loaded");

		return data;
	}

	/**
	 * Convert Collections to a String List
	 */	
	private static List<String> convert(Collection<?> values) {
		List<String> list = new ArrayList<String>(values.size());
		for (Object value : values) {
			list.add(value.toString());
		}
		return list;
	}

	/**
	 * Validation of the Output Path
	 */	
	private static Path validateOutput(String filePath) throws IOException {
		Path path = new Path(filePath);
		FileSystem fs = path.getFileSystem(new Configuration());
		if (fs.exists(path)) {
			throw new IllegalStateException(path.toString() + " already exists");
		}
		return path;
	}  
	

	/**
	 * Validation of the Input Path
	 */
	private static Path validateInput(String filePath) throws IOException {
		Path path = new Path(filePath);
		FileSystem fs = path.getFileSystem(new Configuration());
		if (!fs.exists(path)) {
			throw new IllegalArgumentException(path.toString() + " does not exist");
		}
		return path;		
	}

	public static void main(String[] args) throws Exception {
		    ToolRunner.run(new Configuration(), new ForestDriver(), args);
	}
}

org.mahoutjp.df.ForestClassificationDriver

package org.mahoutjp.df;

import java.io.IOException;

import org.apache.commons.cli2.CommandLine;
import org.apache.commons.cli2.Group;
import org.apache.commons.cli2.Option;
import org.apache.commons.cli2.OptionException;
import org.apache.commons.cli2.builder.ArgumentBuilder;
import org.apache.commons.cli2.builder.DefaultOptionBuilder;
import org.apache.commons.cli2.builder.GroupBuilder;
import org.apache.commons.cli2.commandline.Parser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.CommandLineUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Classification Mapreduce using a built Decision Forest
 * 
 * @author hamadakoichi
 */
public class ForestClassificationDriver extends AbstractJob {

  private static final Logger log = LoggerFactory.getLogger(ForestClassificationDriver.class);

  @Override
  public int run(String[] args) throws Exception {

		DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
		ArgumentBuilder abuilder = new ArgumentBuilder();
		GroupBuilder gbuilder = new GroupBuilder();
		
		Option testDataOpt = obuilder.withLongName("testdata").withShortName("td")
				.withRequired(true).withArgument(abuilder.withName("dir path").withMinimum(1).withMaximum(1).create())
				.withDescription("Test data path").create();

		Option datasetOpt = obuilder.withLongName("dataset").withShortName("ds")
				.withRequired(true).withArgument(abuilder.withName("file path").withMinimum(1).withMaximum(1).create())
				.withDescription("Dataset path").create();

		Option modelOpt = obuilder.withLongName("model").withShortName("m")
				.withRequired(true).withArgument(abuilder.withName("dir path").withMinimum(1).withMaximum(1).create())
				.withDescription("Path to the Decision Forest").create();

		Option predictionOutOpt = obuilder.withLongName("predout").withShortName("po")
				.withRequired(false).withArgument(abuilder.withName("dir path").withMinimum(1).withMaximum(1).create())
				.withDescription("Path to generated predictions file").create();

		Option analyzeOpt = obuilder.withLongName("analyze").withShortName("a")
				.withRequired(false).create();

		Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h").create();

		Group group = gbuilder.withName("Options").withOption(testDataOpt)
				.withOption(datasetOpt).withOption(modelOpt).withOption(predictionOutOpt)
				.withOption(analyzeOpt).withOption(helpOpt).create();

    
		try {
			Parser parser = new Parser();
			parser.setGroup(group);
			CommandLine cmdLine = parser.parse(args);

			if (cmdLine.hasOption("help")) {
				CommandLineUtil.printHelp(group);
				return -1;
			}
			
			//Parameters
			String testDataName = cmdLine.getValue(testDataOpt).toString();
			String descriptorName = cmdLine.getValue(datasetOpt).toString();
			String forestName = cmdLine.getValue(modelOpt).toString();
			String predictionName = (cmdLine.hasOption(predictionOutOpt)) ? cmdLine
					.getValue(predictionOutOpt).toString() : null;
			boolean analyze = cmdLine.hasOption(analyzeOpt);
			
			log.debug("inout     : {}", testDataName);
			log.debug("descriptor: {}", descriptorName);
			log.debug("forest    : {}", forestName);
			log.debug("prediction: {}", predictionName);
			log.debug("analyze   : {}", analyze);
			
			//Execute Classification
			log.info("Execute the Mapreduce Classification ...");
			run(getConf(), forestName, testDataName, descriptorName, predictionName, analyze);

		} catch (OptionException e) {
			log.warn(e.toString(), e);
			CommandLineUtil.printHelp(group);
			return -1;
		}
		return 0;
    }

  
    public static void run(Configuration conf, String forestPathName, 
    		String testDataPathName, String descriptorPathName, String predictionPathName, boolean analyze)
    	throws IOException, ClassNotFoundException, InterruptedException{

		// Classify data
		Path testDataPath = validateInput(testDataPathName);
		Path descriptorPath = validateInput(descriptorPathName);
		
		Path forestPath = validateInput(forestPathName);
		Path predictionPath = validateOutput(conf, predictionPathName);

		ForestClassifier classifier = new ForestClassifier(conf, forestPath, testDataPath,
				descriptorPath, predictionPath, analyze);
		classifier.run();

		// Analyze Results
		if (analyze) {
			log.info(classifier.getAnalyzer().summarize());
		}
    }	  

	/**
	 * Validation of the Output Path
	 */
	private static Path validateOutput(Configuration conf, String filePath) throws IOException {
		Path path = new Path(filePath);
		FileSystem fs = path.getFileSystem(conf);
		if (fs.exists(path)) {
			throw new IllegalStateException(path.toString() + " already exists");
		}
		return path;
	}

	/**
	 * Validation of the Input Path
	 */
	private static Path validateInput(String filePath) throws IOException {
		Path path = new Path(filePath);
		FileSystem fs = path.getFileSystem(new Configuration());
		if (!fs.exists(path)) {
			throw new IllegalArgumentException(path.toString() + " does not exist");
		}
		return path;		
	}

	public static void main(String[] args) throws Exception {
		ToolRunner.run(new Configuration(), new ForestClassificationDriver(), args);
	}
}

org.mahoutjp.df.ForestClassifier

package org.mahoutjp.df;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.mahout.df.DecisionForest;
import org.apache.mahout.df.DFUtils;
import org.apache.mahout.df.data.DataConverter;
import org.apache.mahout.df.data.Dataset;
import org.apache.mahout.df.data.Instance;
import org.apache.mahout.common.RandomUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.Scanner;
import java.net.URI;
import org.apache.mahout.classifier.ClassifierResult;
import org.apache.mahout.classifier.ResultAnalyzer;

/**
 * Mapreduce implementation of Classifier with Forest
 * 
 * @author hamadakoichi
 */
public class ForestClassifier {

	private static final Logger log = LoggerFactory
			.getLogger(ForestClassifier.class);
	private final Path forestPath;
	private final Path inputPath;
	private final Path datasetPath;
	private final Configuration conf;

	private final ResultAnalyzer analyzer;
	private final Dataset dataset;
	private final Path outputPath;
	
	public ForestClassifier(Configuration conf, Path forestPath, Path inputPath, Path datasetPath,
			Path outputPath, boolean analyze)
			throws IOException {
		this.forestPath = forestPath;
		this.inputPath = inputPath;
		this.datasetPath = datasetPath;
		this.conf = conf;

		if (analyze) {
			dataset = Dataset.load(conf, datasetPath);
			analyzer = new ResultAnalyzer(Arrays.asList(dataset.labels()),
					"unknown");
		} else {
			dataset = null;
			analyzer = null;
		}
		this.outputPath = outputPath;
	}

	/**
	 * Classification Job Configure
	 */
	private void configureClsssifyJob(Job job) throws IOException {

		job.setJarByClass(ForestClassifier.class);

		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.setMapperClass(ClassifyMapper.class);
		job.setNumReduceTasks(0); // Classification Mapper Only

		job.setInputFormatClass(ClassifyTextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
	}

	public void run() throws IOException, ClassNotFoundException,
			InterruptedException {
		FileSystem fs = FileSystem.get(conf);

		if (fs.exists(outputPath)) {
			throw new IOException(outputPath + " already exists");
		}

		// put the dataset
		log.info("Adding the dataset to the DistributedCache");
		DistributedCache.addCacheFile(datasetPath.toUri(), conf);

		// load the forest
		log.info("Adding the forest to the DistributedCache");
		DistributedCache.addCacheFile(forestPath.toUri(), conf);

		// Classification
		Job cjob = new Job(conf, "Decision Forest classification");
		log.info("Configuring the Classification Job...");
		configureClsssifyJob(cjob);

		log.info("Running the Classification Job...");
		if (!cjob.waitForCompletion(true)) {
			log.error("Classification Job failed!");
			return;
		}
		
		// Analyze Results
		if (analyzer != null) {
			analyzeOutput(cjob);
		}
	}
	
	public ResultAnalyzer getAnalyzer() {
		return analyzer;
	}

	/**
	 * Analyze the Classification Results
	 * @param job
	 */
	private void analyzeOutput(Job job) throws IOException {
		Configuration conf = job.getConfiguration();
		Integer prediction;
		Integer realLabel;
		
		FileSystem fs = outputPath.getFileSystem(conf);
		Path[] outfiles = DFUtils.listOutputFiles(fs, outputPath);

		int cnt_cnp = 0;
		
		for (Path path : outfiles) {
			FSDataInputStream input = fs.open(path);
			Scanner scanner = new Scanner(input);

			while (scanner.hasNext()) {
				String line = scanner.nextLine();
				if (line.isEmpty()) {
					continue;
				}

				// id, predict, realLabel with \t sep
				String[] tmp = line.split("\t", -1);
				prediction = Integer.parseInt(tmp[1]);
				realLabel = Integer.parseInt(tmp[2]);
				
				if(prediction == -1) {
					// label cannot be predicted
					cnt_cnp++;
				}
				else{					
					if (analyzer != null) {
						analyzer.addInstance(dataset.getLabel(prediction), new ClassifierResult(dataset
								.getLabel(realLabel), 1.0));
					}
				}
			}
		}
		log.info("# of data which cannot be predicted: " + cnt_cnp);
	}

	/**
	 * Text Input Format: Each file is processed by single Mapper.
	 */
	public static class ClassifyTextInputFormat extends TextInputFormat {
		@Override
		protected boolean isSplitable(JobContext jobContext, Path path) {
			return false;
		}
	}

	/**
	 * Classification Mapper.
	 */
	public static class ClassifyMapper extends
			Mapper<LongWritable, Text, LongWritable, Text> {

		private DataConverter converter;
		private DecisionForest forest;
		private final Random rng = RandomUtils.getRandom();

		private final Text val = new Text();

		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			super.setup(context); // To change body of overridden methods use

			Configuration conf = context.getConfiguration();
			URI[] files = DistributedCache.getCacheFiles(conf);

			if ((files == null) || (files.length < 2)) {
				throw new IOException(
						"not enough paths in the DistributedCache");
			}

			Dataset dataset = Dataset.load(conf, new Path(files[0].getPath()));
			converter = new DataConverter(dataset);
			forest = DecisionForest.load(conf, new Path(files[1].getPath()));
			if (forest == null) {
				throw new InterruptedException("DecisionForest not found!");
			}
		}

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			String line = value.toString();
			if (!line.isEmpty()) {
				String[] idVal = line.split(",", -1);
				Integer id = Integer.parseInt(idVal[0]);

				Instance instance = converter.convert(id, line);
				int prediction = forest.classify(rng, instance);

				// key:id 
				key.set(instance.getId());

				// val: prediction, originalLabel (with tab sep)
				StringBuffer sb = new StringBuffer();
				sb.append(Integer.toString(prediction));
				sb.append("\t");
				sb.append(instance.getLabel());
				val.set(sb.toString());

				context.write(key, val);
			}
		}
	}
}

関連資料