Hatena::ブログ(Diary)

あらびき日記 Twitter

2011-08-07

PigでNULLを扱う場合の挙動まとめ

最近、象とか豚と戯れるサファリパークな日々が続いていますが、豚と戯れる時に一番気を付けないといけないのは NULL の扱い方だと思います。

そんなわけで NULL を扱った場合の挙動についてまとめました。


準備

NULLを含むデータとして次のようなデータを使います。

product_id, user_id ってとこですかね。

null_data_sample.tsv

101	100001
	100003
101	
	100001
102	100002

まぁ、こういうデータになんで NULL が含まれてるのかってツッコミたくなりますが!


手元でサクッと試したいのでローカルモードで Grunt を立ち上げてデータをロードします

$ pig -x local
grunt> a = LOAD 'null_data_sample.tsv' AS (product_id:int, user_id:int);

Filter

NULL との比較は IS [NOT] NULL を使用しないとどのような比較を行っても NULL を含むレコードは取り除かれます。

これは SQL と一緒ですね。


product_id が 101 のものだけ抽出される

grunt> b = FILTER a BY product_id == 101;
grunt> DUMP b;
(101,100001)
(101,)

product_id が 101 じゃないものが抽出されるが NULL のものは抽出されない

grunt> c = FILTER a BY product_id != 101;
grunt> DUMP c;
(102,100002)

product_id が NULL のものが抽出される

grunt> d = FILTER a BY product_id IS NULL;
grunt> DUMP d;
(,100003)
(,100001)

product_id が NULL じゃないものが抽出される

grunt> e = FILTER a BY product_id IS NOT NULL;
grunt> DUMP e;
(101,100001)
(101,)
(102,100002)

OR 条件だと次の条件に NULL 判定を入れれば product_id が NULL のものも抽出される

grunt> f = FILTER a BY product_id == 101 OR product_id is null;
(101,100001)
(,100003)
(101,)
(,100001)


Join


Inner Join

Inner Join は結合するキーに NULL が含まれるとそのレコードは取り除かれます。


product_id が NULL のものは取り除かれる

grunt> b = LOAD 'null_data_sample.tsv' AS (product_id:int, user_id:int);
grunt> c = JOIN a BY product_id, b BY product_id;
grunt> DUMP c;
(101,100001,101,100001)
(101,100001,101,)
(101,,101,100001)
(101,,101,)
(102,100002,102,100002)

複合キーの場合、1つでも NULL が含まれると取り除かれる

grunt> d = JOIN a BY (product_id, user_id), b BY (product_id, user_id);
grunt> DUMP d;
(101,100001,101,100001)
(102,100002,102,100002)

Outer Join

Outer Join は結合するキーに NULL が含まれると、NULL 同士であっても別の値として扱われます。


Full Join だと Self Join でも NULL を含むレコードは別のレコードができる

grunt> e = JOIN a BY product_id FULL, b BY product_id;
grunt> DUMP e;
(101,100001,101,100001)
(101,100001,101,)
(101,,101,100001)
(101,,101,)
(102,100002,102,100002)
(,100003,,)
(,100001,,)
(,,,100003)
(,,,100001)

Left Join だと案の定 Self Join でも NULL を含むものは結合されない

grunt> f = JOIN a BY product_id LEFT, b BY product_id;
grunt> DUMP f;
(101,100001,101,100001)
(101,100001,101,)
(101,,101,100001)
(101,,101,)
(102,100002,102,100002)
(,100003,,)
(,100001,,)

複合キーでも同様に NULL は NULL 同士であっても結合されない

grunt> g = JOIN a BY (product_id, user_id) FULL, b BY (product_id, user_id);
grunt> DUMP g;
(101,100001,101,100001)
(101,,,)
(,,101,)
(102,100002,102,100002)
(,100001,,)
(,,,100001)
(,100003,,)
(,,,100003)


Group

結合する場合は NULL 同士が結合されないのに Group だと NULL 同士でグルーピングされます。

grunt> b = GROUP a BY product_id;
grunt> DUMP b;
(101,{(101,100001),(101,)})
(102,{(102,100002)})
(,{(,100003),(,100001)})


Order

ソートする場合は数値でも文字列でも昇順だと NULL が最初に来ます


数値の昇順だと NULL は最初に位置する

grunt> b = ORDER a BY product_id;
grunt> DUMP b;
(,100003)
(,100001)
(101,100001)
(101,)
(102,100002)

当然降順だと NULL は最後に位置する

grunt> c = ORDER a BY product_id DESC;
grunt> DUMP c;
(102,100002)
(101,100001)
(101,)
(,100003)
(,100001)

文字列の昇順でも NULL は最初に位置する

grunt> d = LOAD 'null_data_sample.tsv' AS (product_id:chararray, user_id:int);
grunt> e = ORDER d BY product_id;
(,100003)
(,100001)
(101,100001)
(101,)
(102,100002)

当然文字列でも降順だと NULL は最後に位置する

grunt> f = ORDER d BY product_id DESC;
(102,100002)
(101,100001)
(101,)
(,100003)
(,100001)


Distinct

Distinct は NULL を1種類の値として扱うみたいです。

grunt> b = FOREACH a GENERATE product_id;
grunt> c = DISTINCT b;
grunt> DUMP c;
(101)
(102)
()


四則演算

NULL に対する四則演算は NULL になります。

grunt> b = FOREACH a GENERATE product_id + user_id, product_id - user_id, product_id * user_id, product_id / user_id;
grunt> DUMP b;
(100102,-99900,10100101,0)
(,,,)
(,,,)
(,,,)
(100104,-99900,10200204,0)


三項演算子

三項演算子の場合、NULL に対して NULL 判定以外の比較を行うと結果が NULL になります。

なので、NULL を取り得る値に対して三項演算子を適用する場合は必ず最初に NULL 判定を行わないとネストして三項演算子を適用しても所望の結果が得られないことになります。


NULL 判定をしないと値が NULL のものは返り値も NULL になる。

grunt> b = FOREACH a GENERATE (product_id == 101 ? 'product_id is 101' : 'product_id is not 101');
(product_id is 101)
()
(product_id is 101)
()
(product_id is not 101)

NULL 判定を最初に持ってこないと NULL になる

grunt> c = FOREACH a GENERATE (product_id == 101 ? 'product_id is 101' :
>>                            (product_id IS NULL ? 'product_id is NULL' : 'product_id is neither 101 nor NULL'));
grunt> DUMP c;
(product_id is 101)
()
(product_id is 101)
()
(product_id is neither 101 nor NULL)

NULL の可能性があれば NULL 判定を最初に行うことで所望の結果が得られる

grunt> d = FOREACH a GENERATE (product_id IS NULL ? 'product_id is NULL' :
>>                            (product_id == 101 ? 'product_id is 101' : 'product_id is neither 101 nor NULL'));
grunt> DUMP d;
(product_id is 101)
(product_id is NULL)
(product_id is 101)
(product_id is NULL)
(product_id is neither 101 nor NULL)

Filter 同様 OR 条件に NULL 判定を用いてもOK

grunt> e = FOREACH a GENERATE (product_id == 101 OR product_id IS NULL ? 'product_id is 101 or NULL' : 'product_id is neither 101 nor NULL');
grunt> DUMP e;
(product_id is 101 or NULL)
(product_id is 101 or NULL)
(product_id is 101 or NULL)
(product_id is 101 or NULL)
(product_id is neither 101 nor NULL)




というわけで、NULL の扱いには気を付けましょう!

Hadoop StreamingでHDFS上のレコード数を数える

ローカルファイルの行数を数える時は wc コマンドを使いますよね。

ふと、HDFS上のファイルの行数を数える時は何が良いのか考えてみました。


Pigだと次のような感じです。

$ pig -e "a = load 'inputdir'; b = group a all; c = foreach b generate COUNT_STAR(a); dump c"

これでも良さそうですが、レコードの数を数えるのが combiner と reducer なので、たいていの場合は mapper からの出力を一度ファイルに書き出す必要があり、膨大なファイルを扱う際に時間がかかります。

※グループ化する前に1つ目のフィールドだけを取り出すと若干速くなると思います

→ ファイルに書き出される前に combine 処理が行われるはずなのでファイルIOはほとんど問題じゃないみたいですがとりあえず時間がかかります



そこで、Hadoop Streaming を使って次のように実行してみると Pig に比べてかなり速くなります。

$ tempfile=$(mktemp -u)
$ hadoop jar hadoop-streaming.jar -D mapred.reduce.tasks=1 -mapper wc -reducer "awk '{sum += \$1} END {print sum}'" -input inputdir -output $tempfile
$ hadoop dfs -cat $tempfile/*
$ hadoop dfs -rmr $tempfile

まぁ MapReduce の思想に反したやり方な気はしますが・・・


そんなこんなで次のようなファイル作成して /usr/local/bin にでも置いておくと幸せになれるかもしれません。


wc_hdfs


設置して実際に使ってみましょう

$ chmod +x wc_hdfs
$ sudo mv wc_hdfs /usr/local/bin/
$ wc_hdfs inputdir
316262217 1265048448 9697306421

いい感じですね!!


2011/08/07 追記:

mapper に awk を指定してフィルタリングできるようにしました

$ wc_hdfs -c '$1 != ""' inputdir
316261899 1265047596 9697299107

※-Fを指定すればカンマ区切りのファイルにも適用できます

※単語数はnull(空文字列)も含まれてしまうのでwcの出力結果とは若干異なります





ちなみに Hadoop Streaming の公式ドキュメント には

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer /bin/wc

という記述が見られますが、これだとおそらくファイルIOに加えてネットワーク転送まで発生するので不適切だと思います。

※これを実行するとエラーで落ちるんですが・・・