Hatena::ブログ(Diary)

happytanの足跡

2011-03-06

Executorsを利用してみる



JDK5から「java.lang.Thread」を直にnewしなくてもスレッドを扱えるようになりました。

Executorsクラスでは、スレッド処理に必要なクラスを生成します。ファクトリの役割を担ってくれてます。

ExecutorService」の実装クラス、「ScheduledExecutorService」の実装クラスを生成します。

 

メソッド)newSingleThreadExecutor()、newFixedThreadPool(int nThreads)、newCachedThreadPool()をみてみる

ExecutorService」の実装クラスを生成するには、3つのメソッドがあります。

 

Noクラス名概要
1newSingleThreadExecutor単一スレッドを作成する
2newFixedThreadPool固定数のスレッドを再利用するスレッドプールを作成する
3newCachedThreadPool必要に応じ、新規スレッドを作成するスレッドプールを作成する。60 秒間使用されなかったスレッドは、終了して、キャッシュから削除される。

java.util.concurrent.Executorsクラスのソースをみてみると、それぞれのメソッドの実装は下記のようになっています。

 

newSingleThreadExecutorの実装

 new ThreadPoolExecutor(1, 1, 0L, 
                          TimeUnit.MILLISECONDS,
                          new LinkedBlockingQueue<Runnable>())

newFixedThreadPoolの実装

  new ThreadPoolExecutor(nThreads, nThreads,
                          0L, TimeUnit.MILLISECONDS,
                          new LinkedBlockingQueue<Runnable>())

newCachedThreadPoolの実装

 new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                         60L, TimeUnit.SECONDS,
                         new SynchronousQueue<Runnable>())

それぞれのメソッドをみてみると、実際の生成は、「ThreadPoolExecutor」クラスで行っています。

 

ThreadPoolExecutorをみてみる

 

ThreadPoolExecutorクラスには、下記のようなコンストラクタがあります。

 

 ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) 

 

それぞれの引数の意味は、下記の通りです。

No引数概要
1corePoolSizeアイドルであってもプール内に維持されるスレッドの数
2maximumPoolSizeプール内で可能なスレッドの最大数
3keepAliveTimeスレッドの数がコアよりも多い場合、これは超過したアイドル状態のスレッドが新しいタスクを待機してから終了するまでの最大時間
4unitkeepAliveTime 引数の時間単位
5workQueueタスクが超過するまで保持するために使用するキュー。このキューは、execute メソッドで送信された Runnable タスクだけを保持する

 

引数をながめていると自分でいろいろ組み立てることもできそうです。

 

メソッド)newSingleThreadExecutor()、newFixedThreadPool(int nThreads)、newCachedThreadPool()を生成してみる

newSingleThreadExecutorメソッド、newFixedThreadPoolメソッド、newCachedThreadPoolメソッドを利用して

それぞれの実装の仕方をみていきたいとおもいます。

 

まずは、newSingleThreadExecutor()メソッドの利用の仕方をみていきます。

        ExecutorService e = Executors.newSingleThreadExecutor();
        e.execute(new RunnableClass()); 
        e.shutdown();

 

RunnableClass*1クラスをExecutorServiceのexecuteに渡してあげることにより

スレッドで処理を実行してあげることができます。ExecutorService内で実行しているスレッドを終わらせる為に

shutdownメソッドを呼び出す必要があります。

 

次に、newFixedThreadPool(int nThreads)メソッドの利用の仕方をみていきます。

        ExecutorService e = Executors.newFixedThreadPool(3);
    // RunnableClassで処理を追加していく
        for (int i = 0; i < 5; i++) {
            e.execute(new RunnableClass("Task" + i));
        } 
        e.shutdown();

 

固定数のスレッドを3としているので、3以上のスレッドは作成されません。

実行中のスレッドの上限は、常に3となります。

    java.lang.ThreadGroup[name=main,maxpri=10]
        Thread[main,5,main]
        Thread[pool-1-thread-1,5,main]
        Thread[pool-1-thread-2,5,main]
        Thread[pool-1-thread-3,5,main]

 

最後に、newCachedThreadPool()メソッドの利用の仕方をみていきます。

        ExecutorService e = Executors.newCachedThreadPool();
    // RunnableClassで処理を追加していく
        for (int i = 0; i < 5; i++) {
            e.execute(new RunnableClass("Task" + i));
            try {
                Thread.sleep(1500L);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
        e.shutdown();

60 秒間内ならば、生成された既存のスレッドを利用して、処理を順次実行してくれます。

 

RunnableClassは、下記の様な実装としてます。

    class RunnableClass implements Runnable {
        private String name;

        public RunnableClass(String name) {
            this.name = name;
        }

        public void run() {
            System.out.println(name + " Starts.");

            try {
                Thread.sleep(1000L);
            } catch (InterruptedException ex) {
                System.out.println(name + " is Canceled");
                return;
            }
            System.out.println(name + " is Done.");
        }
    }

 

CassandraDaemonで利用されているExecutorService

Cassandraのソースをみていたときに、org.apache.cassandra.thrift.CassandraDaemonというクラスで

ThreadPoolを作成している箇所がありました。その時に、java.util.concurrentパッケージのExecutorService

クラスが出現し、一度、java.util.concurrentパッケージを眺めて理解しないとソースが理解できなさそうだったので、

確認をしてみました。

 

            // ThreadPool Server
            CustomTThreadPoolServer.Options options = new CustomTThreadPoolServer.Options();
            options.minWorkerThreads = MIN_WORKER_THREADS;

            ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState,
                    options.minWorkerThreads,
                    options.maxWorkerThreads);
            serverEngine = new CustomTThreadPoolServer(new TProcessorFactory(processor),
                    tServerSocket,
                    inTransportFactory,
                    outTransportFactory,
                    tProtocolFactory,
                    tProtocolFactory,
                    options,
                    executorService);

Cassandraは、結構、thriftのクラスをそのまま利用しているところが多々見受けられたので、

どこかのタイミングでthriftに関しても確認できればと思います。

 


*1:Runnableインターフェイスの実装クラス

スパム対策のためのダミーです。もし見えても何も入力しないでください
ゲスト


画像認証

トラックバック - http://d.hatena.ne.jp/masa_to/20110306/1299423229