Hadoop: 自作クラスのシリアライズ方法

以前 Hadoop におけるオブジェクトのシリアライズの仕方について述べました。その際、ArrayWritable オブジェクトをシリアライズして HBase に登録する簡単なサンプルプログラムを紹介しました。

Hadoop 本には, ArrayWritable, IntWritable や Text のように Hadoop ライブラリに存在する Writable クラスの使用方法だけでなく、Writable インターフェースを拡張して自作オブジェクトをシリアライズする例が載っています。このテクニックを使うことで HBase や HDFS に自作オブジェクトをシリアライズして追加することができ、必要になった際は、(テキストからオブジェクトを生成するよりは)高速に取り出すことができます。

勉強のために、前回作った '図書館' の例を下敷きにして、自作クラスの Writable クラス (UserWritable) の動作確認をしてみました。以下 UserWritable クラスです。

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ArrayWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public final class UserWritable implements Writable {

    public static class LongArrayWritable extends ArrayWritable {
        public LongArrayWritable() {
            super(LongWritable.class);
        }
    }

    private Text name;
    private Text job;
    private LongArrayWritable borrowedBooks;

    public UserWritable() {
        this.name = new Text();
        this.job = new Text();
        this.borrowedBooks = new LongArrayWritable();
    }

    public UserWritable(Text name, Text job,LongArrayWritable borrowedBooks) {
        this.name = name;
        this.job = job;
        this.borrowedBooks = borrowedBooks;
    }

    public Text getJob() {
        return this.job;
    }

    public Text getName() {
        return this.name;
    }

    public LongArrayWritable getBorrowedHistory() {
        return borrowedBooks;
    }

    @Override
        public void write(DataOutput out) throws IOException {
        name.write(out);
        job.write(out);
        borrowedBooks.write(out);
    }

    @Override
        public void readFields(DataInput in) throws IOException {
        name.readFields(in);
        job.readFields(in);
        borrowedBooks.readFields(in);
    }
}

UserWritable クラスはユーザの名前 (name), 職業(job), 借りた本の履歴 (borrowedBooks) フィールドを持ちます。さらに, UserWritable クラスは Writable インターフェースを実装するため、Writableインターフェースに存在する二つのメソッド (write, readFields) を実装する必要があります。

UserWritable オブジェクトが正常にシリアライズされるかを以下のサンプルプログラム (HBaseObjectSerialization) で試してみました。

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;

public class HBaseObjectSerialization {

    static final String TABLE_NAME  = "library2";
    static final String BOOK_FAMILY = "users";

    public static void main(String[] args) throws Exception {
        HBaseConfiguration config = new HBaseConfiguration();
        HBaseAdmin admin          = new HBaseAdmin(config);

        if (admin.tableExists(TABLE_NAME)) {
            admin.disableTable(TABLE_NAME);
            admin.deleteTable(TABLE_NAME);
        }

        // create table
        HTableDescriptor desc = new HTableDescriptor(TABLE_NAME.getBytes());
        desc.addFamily(new HColumnDescriptor(BOOK_FAMILY.getBytes()));
        admin.createTable(desc);

        HTable table   = new HTable(config, TABLE_NAME);

        // put ids of users and user profile to HBase
        Put userOnePut = new Put(Writables.getBytes(new LongWritable(1))); // user 1
        Put userTwoPut = new Put(Writables.getBytes(new LongWritable(2))); // user 2

        UserWritable.LongArrayWritable userOneHistory = new UserWritable.LongArrayWritable();
        UserWritable.LongArrayWritable userTwoHistory = new UserWritable.LongArrayWritable();

        userOneHistory.set(new LongWritable[] {new LongWritable(21),
                                               new LongWritable(121)});
        userTwoHistory.set(new LongWritable[] {new LongWritable(821),
                                               new LongWritable(26),
                                               new LongWritable(40)});

        UserWritable userOne = new UserWritable(new Text("John"),
                                                new Text("Professor"),
                                                userOneHistory);
        UserWritable userTwo = new UserWritable(new Text("Mike"),
                                                new Text("Engineer"),
                                                userTwoHistory);

        byte[] ueserOneHistoryByte = Writables.getBytes(userOne);
        byte[] ueserTwoHistoryByte = Writables.getBytes(userTwo);

        userOnePut.add((BOOK_FAMILY).getBytes(), null, ueserOneHistoryByte);
        userTwoPut.add((BOOK_FAMILY).getBytes(), null, ueserTwoHistoryByte);

        table.put(userOnePut);
        table.put(userTwoPut);

        // get the borrowing histories from HBase
        Scan scan = new Scan();
        scan.addFamily(BOOK_FAMILY.getBytes());
        ResultScanner scanner = table.getScanner(scan);

        for (Result result : scanner) {
            LongWritable userId = (LongWritable) Writables.getWritable(result.getRow(),
                                                                       new LongWritable());
            UserWritable deserializedUser =
                (UserWritable) Writables.getWritable(result.getValue(Bytes.toBytes(BOOK_FAMILY)),
                                                     new UserWritable());
            System.out.println("The job of " + deserializedUser.getName()
                               + " is '"     + deserializedUser.getJob() + "'");
            Object[] userHistoryArray = deserializedUser.getBorrowedHistory().get();

            for (int j = 0; j < userHistoryArray.length; j++) {
                LongWritable bookId = (LongWritable) userHistoryArray[j];
                System.out.println("\t he/she borrwowed "
                                   + new String(Long.toString(bookId.get())));
            }
        }
    }
}

このプログラムでは二人の図書館ユーザ ("John" と "Mike") を表すオブジェクト (UserWritable) を作り、それをシリアライズして HBase に追加します。その後 HBase から Scan で追加済みのシリアライズされたオブジェクトを入手し、最後に getWritable メソッドをシリアライズされたオブジェクトに適用することで、オブジェクトを復元しています。

このプログラムを実行すると, 以下のような出力が得られ、UserWritable オブジェクトに保存されている各ユーザの情報が取り出せている (デシリアライズされている) ことが分かります。

The job of John is 'Professor'
he borrwowed 21
he borrwowed 121
The job of Mike is 'Engineer'
he borrwowed 821
he borrwowed 26
he borrwowed 40

ちなみに本ブログで載っているプログラムは全て私の環境で動作テストはしておりますが、あくまで自分が Hadoop を理解するために書いてあるものですので、例外への対処等は十分に行われていませんので悪しからず。

とはいえ、英語、日本語を含めてあまりにも参照できるドキュメントやソースコードが少ないので、おいておくとどなたかの役に立つことを期待しております。紹介させていただいているソースにあるやり方よりも良い、もしくは新しい方法があればご指摘いただけると幸いです。