2011-08-07
PigでNULLを扱う場合の挙動まとめ
最近、象とか豚と戯れるサファリパークな日々が続いていますが、豚と戯れる時に一番気を付けないといけないのは NULL の扱い方だと思います。
そんなわけで NULL を扱った場合の挙動についてまとめました。
準備
NULLを含むデータとして次のようなデータを使います。
product_id, user_id ってとこですかね。
null_data_sample.tsv
101 100001 100003 101 100001 102 100002
まぁ、こういうデータになんで NULL が含まれてるのかってツッコミたくなりますが!
手元でサクッと試したいのでローカルモードで Grunt を立ち上げてデータをロードします
$ pig -x local grunt> a = LOAD 'null_data_sample.tsv' AS (product_id:int, user_id:int);
Filter
NULL との比較は IS [NOT] NULL を使用しないとどのような比較を行っても NULL を含むレコードは取り除かれます。
これは SQL と一緒ですね。
product_id が 101 のものだけ抽出される
grunt> b = FILTER a BY product_id == 101; grunt> DUMP b; (101,100001) (101,)
product_id が 101 じゃないものが抽出されるが NULL のものは抽出されない
grunt> c = FILTER a BY product_id != 101; grunt> DUMP c; (102,100002)
product_id が NULL のものが抽出される
grunt> d = FILTER a BY product_id IS NULL; grunt> DUMP d; (,100003) (,100001)
product_id が NULL じゃないものが抽出される
grunt> e = FILTER a BY product_id IS NOT NULL; grunt> DUMP e; (101,100001) (101,) (102,100002)
OR 条件だと次の条件に NULL 判定を入れれば product_id が NULL のものも抽出される
grunt> f = FILTER a BY product_id == 101 OR product_id is null; (101,100001) (,100003) (101,) (,100001)
Join
Inner Join
Inner Join は結合するキーに NULL が含まれるとそのレコードは取り除かれます。
product_id が NULL のものは取り除かれる
grunt> b = LOAD 'null_data_sample.tsv' AS (product_id:int, user_id:int); grunt> c = JOIN a BY product_id, b BY product_id; grunt> DUMP c; (101,100001,101,100001) (101,100001,101,) (101,,101,100001) (101,,101,) (102,100002,102,100002)
複合キーの場合、1つでも NULL が含まれると取り除かれる
grunt> d = JOIN a BY (product_id, user_id), b BY (product_id, user_id); grunt> DUMP d; (101,100001,101,100001) (102,100002,102,100002)
Outer Join
Outer Join は結合するキーに NULL が含まれると、NULL 同士であっても別の値として扱われます。
Full Join だと Self Join でも NULL を含むレコードは別のレコードができる
grunt> e = JOIN a BY product_id FULL, b BY product_id; grunt> DUMP e; (101,100001,101,100001) (101,100001,101,) (101,,101,100001) (101,,101,) (102,100002,102,100002) (,100003,,) (,100001,,) (,,,100003) (,,,100001)
Left Join だと案の定 Self Join でも NULL を含むものは結合されない
grunt> f = JOIN a BY product_id LEFT, b BY product_id; grunt> DUMP f; (101,100001,101,100001) (101,100001,101,) (101,,101,100001) (101,,101,) (102,100002,102,100002) (,100003,,) (,100001,,)
複合キーでも同様に NULL は NULL 同士であっても結合されない
grunt> g = JOIN a BY (product_id, user_id) FULL, b BY (product_id, user_id); grunt> DUMP g; (101,100001,101,100001) (101,,,) (,,101,) (102,100002,102,100002) (,100001,,) (,,,100001) (,100003,,) (,,,100003)
Group
結合する場合は NULL 同士が結合されないのに Group だと NULL 同士でグルーピングされます。
grunt> b = GROUP a BY product_id;
grunt> DUMP b;
(101,{(101,100001),(101,)})
(102,{(102,100002)})
(,{(,100003),(,100001)})
Order
ソートする場合は数値でも文字列でも昇順だと NULL が最初に来ます
数値の昇順だと NULL は最初に位置する
grunt> b = ORDER a BY product_id; grunt> DUMP b; (,100003) (,100001) (101,100001) (101,) (102,100002)
当然降順だと NULL は最後に位置する
grunt> c = ORDER a BY product_id DESC; grunt> DUMP c; (102,100002) (101,100001) (101,) (,100003) (,100001)
文字列の昇順でも NULL は最初に位置する
grunt> d = LOAD 'null_data_sample.tsv' AS (product_id:chararray, user_id:int); grunt> e = ORDER d BY product_id; (,100003) (,100001) (101,100001) (101,) (102,100002)
当然文字列でも降順だと NULL は最後に位置する
grunt> f = ORDER d BY product_id DESC; (102,100002) (101,100001) (101,) (,100003) (,100001)
Distinct
Distinct は NULL を1種類の値として扱うみたいです。
grunt> b = FOREACH a GENERATE product_id; grunt> c = DISTINCT b; grunt> DUMP c; (101) (102) ()
四則演算
NULL に対する四則演算は NULL になります。
grunt> b = FOREACH a GENERATE product_id + user_id, product_id - user_id, product_id * user_id, product_id / user_id; grunt> DUMP b; (100102,-99900,10100101,0) (,,,) (,,,) (,,,) (100104,-99900,10200204,0)
三項演算子
三項演算子の場合、NULL に対して NULL 判定以外の比較を行うと結果が NULL になります。
なので、NULL を取り得る値に対して三項演算子を適用する場合は必ず最初に NULL 判定を行わないとネストして三項演算子を適用しても所望の結果が得られないことになります。
NULL 判定をしないと値が NULL のものは返り値も NULL になる。
grunt> b = FOREACH a GENERATE (product_id == 101 ? 'product_id is 101' : 'product_id is not 101'); (product_id is 101) () (product_id is 101) () (product_id is not 101)
NULL 判定を最初に持ってこないと NULL になる
grunt> c = FOREACH a GENERATE (product_id == 101 ? 'product_id is 101' : >> (product_id IS NULL ? 'product_id is NULL' : 'product_id is neither 101 nor NULL')); grunt> DUMP c; (product_id is 101) () (product_id is 101) () (product_id is neither 101 nor NULL)
NULL の可能性があれば NULL 判定を最初に行うことで所望の結果が得られる
grunt> d = FOREACH a GENERATE (product_id IS NULL ? 'product_id is NULL' : >> (product_id == 101 ? 'product_id is 101' : 'product_id is neither 101 nor NULL')); grunt> DUMP d; (product_id is 101) (product_id is NULL) (product_id is 101) (product_id is NULL) (product_id is neither 101 nor NULL)
Filter 同様 OR 条件に NULL 判定を用いてもOK
grunt> e = FOREACH a GENERATE (product_id == 101 OR product_id IS NULL ? 'product_id is 101 or NULL' : 'product_id is neither 101 nor NULL'); grunt> DUMP e; (product_id is 101 or NULL) (product_id is 101 or NULL) (product_id is 101 or NULL) (product_id is 101 or NULL) (product_id is neither 101 nor NULL)
というわけで、NULL の扱いには気を付けましょう!
Hadoop StreamingでHDFS上のレコード数を数える
ローカルファイルの行数を数える時は wc コマンドを使いますよね。
ふと、HDFS上のファイルの行数を数える時は何が良いのか考えてみました。
Pigだと次のような感じです。
$ pig -e "a = load 'inputdir'; b = group a all; c = foreach b generate COUNT_STAR(a); dump c"
これでも良さそうですが、レコードの数を数えるのが combiner と reducer なので、たいていの場合は mapper からの出力を一度ファイルに書き出す必要があり、膨大なファイルを扱う際に時間がかかります。
※グループ化する前に1つ目のフィールドだけを取り出すと若干速くなると思います
→ ファイルに書き出される前に combine 処理が行われるはずなのでファイルIOはほとんど問題じゃないみたいですがとりあえず時間がかかります
そこで、Hadoop Streaming を使って次のように実行してみると Pig に比べてかなり速くなります。
$ tempfile=$(mktemp -u)
$ hadoop jar hadoop-streaming.jar -D mapred.reduce.tasks=1 -mapper wc -reducer "awk '{sum += \$1} END {print sum}'" -input inputdir -output $tempfile
$ hadoop dfs -cat $tempfile/*
$ hadoop dfs -rmr $tempfile
まぁ MapReduce の思想に反したやり方な気はしますが・・・
そんなこんなで次のようなファイル作成して /usr/local/bin にでも置いておくと幸せになれるかもしれません。
wc_hdfs
設置して実際に使ってみましょう
$ chmod +x wc_hdfs $ sudo mv wc_hdfs /usr/local/bin/ $ wc_hdfs inputdir 316262217 1265048448 9697306421
いい感じですね!!
2011/08/07 追記:
mapper に awk を指定してフィルタリングできるようにしました
$ wc_hdfs -c '$1 != ""' inputdir 316261899 1265047596 9697299107
※-Fを指定すればカンマ区切りのファイルにも適用できます
※単語数はnull(空文字列)も含まれてしまうのでwcの出力結果とは若干異なります
ちなみに Hadoop Streaming の公式ドキュメント には
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer /bin/wc
という記述が見られますが、これだとおそらくファイルIOに加えてネットワーク転送まで発生するので不適切だと思います。
※これを実行するとエラーで落ちるんですが・・・