ブログトップ 記事一覧 ログイン 無料ブログ開設

akishin999の日記 RSSフィード Twitter


Google

2010-05-07

Java から Cassandra を使ってみる その7

Windows で Cassandra を動かしてみる
Java から Cassandra を使ってみる その1
Java から Cassandra を使ってみる その2
Java から Cassandra を使ってみる その3
Java から Cassandra を使ってみる その4
Java から Cassandra を使ってみる その5
Java から Cassandra を使ってみる その6


の続きです。

batch_mutate() を使ってみる


データの一括登録・更新を行う batch_mutate() メソッドを Java から使ってみました。
引数の構造が若干複雑なため、実際に呼び出してみるまでの準備の段階が面倒臭いです。

package example.cassandra;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class CassandraBatchMutateExample {

    private static TTransport transport;
    private static TProtocol protocol;
    private static Cassandra.Client client;
    
    private static final String KEY_SPACE = "Examples";
    private static final String COLUMN_FAMILY = "Entries";
    
    private static final String KEY_1 = "9000";
    private static final String KEY_2 = "9001";
    private static final String KEY_3 = "9002";
    
    @SuppressWarnings("serial")
    public static void main(String[] args) {
        try {
            transport = new TSocket("localhost", 9160);
            protocol = new TBinaryProtocol(transport);
            client = new Cassandra.Client(protocol);
            transport.open();
            
            long timestamp = System.currentTimeMillis();
            
            // 登録・更新したい値から Mutation オブジェクトの List を作成
            List<Mutation> mutationList = new ArrayList<Mutation>();
            mutationList.add(toMutation("title", "batch_mutate テスト", timestamp));
            mutationList.add(toMutation("content", "batch_mutate のテストです。", timestamp));
            
            // カラムファミリと Mutation のリストの Map を作成
            Map<String, List<Mutation>> columnFamilyMap = new HashMap<String, List<Mutation>>();
            columnFamilyMap.put(COLUMN_FAMILY, mutationList);

            // key と カラムファミリの Map の Map を作成
            Map<String, Map<String, List<Mutation>>> rowsMap = 
                new HashMap<String, Map<String, List<Mutation>>>();
            
            // 取り合えず同じ内容を key だけ別にして追加。
            rowsMap.put(KEY_1, columnFamilyMap);
            rowsMap.put(KEY_2, columnFamilyMap);
            rowsMap.put(KEY_3, columnFamilyMap);
            
            // 一括登録
            client.batch_mutate(KEY_SPACE, rowsMap, ConsistencyLevel.ONE);

            System.out.println("登録完了。");
            
            // 登録した値を取得してみる
            showEntries(new ArrayList<String>() {{add(KEY_1); add(KEY_2); add(KEY_3);}});
            
            transport.close();
        } catch(Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 登録したデータを表示する
     * @throws InvalidRequestException
     * @throws UnavailableException
     * @throws TimedOutException
     * @throws TException
     * @throws UnsupportedEncodingException
     */
    private static void showEntries(final List<String> keys) throws InvalidRequestException,
            UnavailableException, TimedOutException, TException,
            UnsupportedEncodingException {
        
        ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY);

        SliceRange sliceRange = new SliceRange();
        sliceRange.setStart(new byte[0]);
        sliceRange.setFinish(new byte[0]);

        SlicePredicate slicePredicate = new SlicePredicate();
        slicePredicate.setSlice_range(sliceRange);
        
        Map<String, List<ColumnOrSuperColumn>> results = client.multiget_slice(KEY_SPACE, 
                                                                               keys, 
                                                                               columnParent, 
                                                                               slicePredicate, 
                                                                               ConsistencyLevel.ONE);
      
        for (Map.Entry<String, List<ColumnOrSuperColumn>> entry : results.entrySet()) {
            String key = entry.getKey();
            List<ColumnOrSuperColumn> list = entry.getValue();
            for (int i = 0; i < list.size(); i++) {
                ColumnOrSuperColumn result = list.get(i); 
                Column col = result.column;
                System.out.printf("key:[%s] [%d] カラム名:[%s] 値:[%s] タイムスタンプ:[%s]\n",
                        key,
                        i + 1,
                        new String(col.name, "UTF8"),
                        new String(col.value, "UTF8"),
                        new Date(col.timestamp));
            }
            System.out.println("--------------------");
        }
    }
    
    /**
     * カラム名、値、タイムスタンプ値 から Mutation オブジェクトを生成する。
     * @param name
     * @param value
     * @param timestamp
     * @return
     * @throws UnsupportedEncodingException
     */
    public static Mutation toMutation(final String name, final String value, final long timestamp) throws UnsupportedEncodingException {
        Mutation mutation = new Mutation();
        ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
        columnOrSuperColumn.setColumn(new Column(name.getBytes("UTF8"), value.getBytes("UTF8"), timestamp));
        mutation.setColumn_or_supercolumn(columnOrSuperColumn);
        return mutation;
    }
}


実行すると、以下のように一回のメソッド呼び出しで 3 行分のデータが登録されていることが分かります。

登録完了。
key:[9002] [1] カラム名:[content] 値:[batch_mutate のテストです。] タイムスタンプ:[Sat May 08 02:35:01 JST 2010]
key:[9002] [2] カラム名:[title] 値:[batch_mutate テスト] タイムスタンプ:[Sat May 08 02:35:01 JST 2010]
--------------------
key:[9001] [1] カラム名:[content] 値:[batch_mutate のテストです。] タイムスタンプ:[Sat May 08 02:35:01 JST 2010]
key:[9001] [2] カラム名:[title] 値:[batch_mutate テスト] タイムスタンプ:[Sat May 08 02:35:01 JST 2010]
--------------------
key:[9000] [1] カラム名:[content] 値:[batch_mutate のテストです。] タイムスタンプ:[Sat May 08 02:35:01 JST 2010]
key:[9000] [2] カラム名:[title] 値:[batch_mutate テスト] タイムスタンプ:[Sat May 08 02:35:01 JST 2010]
--------------------


それにしても第二引数の Map<String, Map<String, List<Mutation>>> は複雑すぎて、中々覚えられそうにないですね。




スパム対策のためのダミーです。もし見えても何も入力しないでください
ゲスト


画像認証

トラックバック - http://d.hatena.ne.jp/akishin999/20100507/1273254065