Hatena::ブログ(Diary)

<s>gnarl,</s>技術メモ”’<marquee><textarea>¥ このページをアンテナに追加 RSSフィード Twitter

2009-06-26

Javaでジェネレータ

つくった。javaでyield returnできるので精神が健康になる!

Generator<Integer> g=new Generator<Integer>() {
	@Override
	protected void iterate() throws InterruptedException {
		for(int i=0;i<10;i++)
			yield_return(i);
	}
};
while(g.hasNext())
	System.out.println(g.next());

無限リスト対応。

Generator<Long> fib=new Generator<Long>() {
	private void return_fib(long a,long b) throws InterruptedException {
		yield_return(a+b);
		return_fib(b,a+b);
	}
	@Override
	protected void iterate() throws InterruptedException {
		yield_return(0L);
		yield_return(1L);
		return_fib(0L,1L);
	}
};
for(int i=0;i<50;i++) {
	System.out.println("fib("+i+") = "+fib.next());
}
fib.shutdown();

参考資料


コード

なんでこんなに長くなったんだろう……。

/***
 * ジェネレータとかいうやつの実装だと思います. たぶんファイバーとかコルーチンとかいうのと関係ある. yield_returnできるよ. <br />
 * 独自スレッドを使ってるけど、yield_return/nextの呼び出しでブロックしてクライアントスレッドと交互に動く <br />
 * つかいかた:
 * 
 * <pre>
 * Iterator&lt;String&gt; iter=new Generator&lt;String&gt;() {
 *    &#64;Override
 *    public void iterate() {
 *      yield_return(&quot;hoge&quot;);
 *      yield_return(&quot;hage&quot;);
 *      for(int i=0;i&lt;10;i++)
 *        yield_return(Integer.toString(i));
 *    }
 *  });
 *  while(iter.hasNext())
 *    System.out.println(iter.next());
 * </pre>
 * 
 * 最後までイテレートしないときはshutdownを呼ばないとスレッドが残る.
 * 
 * @param <T>
 *            扱う値の型
 */
public static abstract class Generator<T> implements Iterator<T> {

	/**
	 * nullを表現するオブジェクト.
	 * なんでこんなことになってるかというとBlockingQueueがnullを許容してくれないからですね
	 */
	private static final Object nullObject = new Object();

	/** イテレート終了を表すオブジェクト. yield_returnの帰り値と同じキューにつっこみたかったのでこんなことに */
	private static final Object iterateEnd = new Object();

	/** 次の要素がまだ準備できていないことを示すオブジェクト */
	private static final Object notPrepared = new Object();

	/** yield_returnの値をクライアントに渡すためのキュー */
	private final BlockingQueue<Object> yieldValues = new LinkedBlockingQueue<Object>(1);
	
	/** next()が呼ばれるまでyield_returnをブロックするのに使う */
	private final Semaphore resumeLock = new Semaphore(0);
	
	/** iterateで例外が投げられたらその値 */
	private RuntimeException thrown=null;

	/** next()が返す値*/
	private Object nextValue = notPrepared;
	
	@Override
	protected void finalize() throws Throwable {
		try {
			super.finalize();
		} finally {
			shutdown();
		}
	}

	/** iterate()が動くスレッド */
	private final Thread thread = new Thread(new Runnable() {
		public void run() {
			try {
				// 本体実行
				Generator.this.iterate();
			} catch (InterruptedException e) {
				// インタラプトされたら終了
			} catch(RuntimeException e) {
				// 例外投げられたらそれを記録
				thrown=e;
			} finally {
				// 終了したことをあらわすオブジェクトをつっこむ
				yieldValues.add(iterateEnd);
			}
		}
	});

	/***
	 * 次の要素があるかどうか返します. iterate()がyield_returnするか終了するまでブロックします.
	 * <br />
	 * ブロック中にスレッドがインタラプトされた場合はインタラプトステータスをセットします
	 */
	@Override
	public boolean hasNext() {
		prepareNext();
		assert nextValue != notPrepared;
		return nextValue != iterateEnd;
	}

	/***
	 * 次の要素を返します. iterate()がyield_returnするか終了するまでブロックします.
	 * クライアントのスレッドから呼ぶ.
	 * <br />
	 * ブロック中にスレッドがインタラプトされた場合はインタラプトステータスをセットします
	 * 
	 * @throws RuntimeException iterate()が例外を投げた場合
	 * @throws NoSuchElementException iterate()が終了した場合
	 */
	@Override
	public T next() {
		if (thread == Thread.currentThread())
			throw new IllegalStateException("変な場所から呼ぶな");

		prepareNext();
		
		if(thrown!=null) {
			final RuntimeException t=thrown;
			thrown=null;
			throw t;
		}
		
		if (!hasNext())
			throw new NoSuchElementException();

		assert nextValue != notPrepared && nextValue != iterateEnd;

		@SuppressWarnings("unchecked")
		final T theValue = (T) nextValue;

		nextValue = notPrepared;

		return theValue;
	}

	@Override
	public void remove() {
		throw new UnsupportedOperationException();
	}

	/***
	 * yield_returnがつかえるよ!
	 * @throws InterruptedException 
	 */
	protected abstract void iterate() throws InterruptedException;
	
	/***
	 * ジェネレータを中断して内部リソースを後始末します.
	 */
	public void shutdown() {
		thread.interrupt();
	}

	/***
	 * yield_returnだよ!
	 * iterate()の走ってるスレッドから呼ぶ
	 * @param value
	 * @throws InterruptedException 中断されたとき
	 */
	protected void yield_return(T value) throws InterruptedException {
		if (thread != Thread.currentThread()) {
			throw new IllegalStateException("変な場所から呼ぶな");
		}
		// nullを変換する
		final Object escapedValue=(value==null)?nullObject:value;
		// yield_returnとnextは交互に発生するので絶対成功する
		if (!yieldValues.offer(escapedValue))
			throw new AssertionError();

		// nextが呼ばれるまでブロック
		resumeLock.acquire();
	}

	/***
	 * 次の要素を準備するメソッドですよ(名前どおり). nextValueフィールドに値を設定します.
	 * yield_returnされた場合はその値を、iterate()が終了した場合はiterateEndを設定する.
	 * ブロック中にスレッドがインタラプトされた場合はインタラプトステータスをセットします
	 */
	private void prepareNext() {
		if (nextValue != notPrepared)
			return;

		// yield return(or 終了)するまで走らす
		if (thread.getState() == State.NEW)
			thread.start();
		else
			resumeLock.release();

		// yield return(or 終了)待ち
		// インタラプトされたらフラグセットしたのち取得再試行
		Object value;
		boolean interrupted=false;
		while(true) {
			try {
				value = yieldValues.take();
				break;
			} catch (InterruptedException e) {
				interrupted=true;
			}
		}
		if(interrupted) Thread.currentThread().interrupt();
		nextValue = (value == nullObject) ? null : value;
	}
}

テスト(junit4)

public class GeneratorTest {
	@Test
	public void test() throws Exception {
		final Generator<Integer> g = new Generator<Integer>() {
			protected void iterate() throws InterruptedException {
				yield_return(1);
			}
		};
		assertTrue(g.hasNext());
		assertTrue(g.hasNext());
		assertEquals(g.next(), 1);
		assertFalse(g.hasNext());
		assertFalse(g.hasNext());
		try {
			g.next();
			fail();
		} catch (NoSuchElementException e) {
			// success
		}
	}

	@Test
	public void emptyGenerator() {
		final Generator<Integer> empty = new Generator<Integer>() {
			@Override
			protected void iterate() {
			}
		};
		assertFalse(empty.hasNext());
		try {
			empty.next();
			fail();
		} catch (NoSuchElementException e) {
			// success
		}
	}

	@Test
	public void nextBeforeHasNext() {
		final Generator<Integer> empty = new Generator<Integer>() {
			@Override
			protected void iterate() {
			}
		};
		try {
			empty.next();
			fail();
		} catch (NoSuchElementException e) {
			// success
		}
		assertFalse(empty.hasNext());
	}

	@Test
	public void nullTest() {
		final Generator<String> g = new Generator<String>() {
			@Override
			protected void iterate() throws InterruptedException {
				for (String s : new String[] { "hoge", null, "hage", "fuga"})
					yield_return(s);
			}
		};
		assertEquals("hoge", g.next());
		assertEquals(null, g.next());
		assertEquals("hage", g.next());
		assertEquals("fuga", g.next());
	}

	@Test
	public void threadSafetyTest() {
		final List<Integer> arr = new ArrayList<Integer>();
		final Generator<Integer> g = new Generator<Integer>() {
			@Override
			protected void iterate() throws InterruptedException {
				for (int i = 1; i < 100; i += 2) {
					arr.add(i);
					yield_return(i);
				}
			}
		};
		// 同期なしでもまざらない(ジェネレータのスレッドはhasNext()/next()でブロックしてる間だけ走る)
		for (int i = 0; i < 100; i += 2) {
			arr.add(i);
			assertEquals(i + 1, g.next());
		}
		assertFalse(g.hasNext());

		for (int i = 0; i < 100; i++)
			assertEquals(i, arr.get(i));
	}
	
	private static void sleepUninterruptibly(long mills) {
		try {Thread.sleep(mills);}catch (InterruptedException ignored) {}
	}
	
	@Test
	public void shutdownTest() {
		final int initialActiveThreads=Thread.activeCount();
		final Generator<Integer> g=new Generator<Integer>() {
			@Override
			protected void iterate() throws InterruptedException {
				yield_return(1);
				yield_return(2);
				yield_return(3);
			}
		};
		assertEquals(initialActiveThreads, Thread.activeCount());
		assertEquals(1,g.next());
		assertEquals(initialActiveThreads+1, Thread.activeCount());
		g.shutdown();
		sleepUninterruptibly(100);
		assertEquals(initialActiveThreads, Thread.activeCount());
		assertFalse(g.hasNext());
	}
	
	@Test
	public void exceptionTest() {
		final Generator<Integer> g=new Generator<Integer>() {
			@Override
			protected void iterate() throws InterruptedException {
				yield_return(0);
				throw new IllegalStateException("hoge");
			}
		};
		assertEquals(0, g.next());
		try {
			g.next();
			fail();
		} catch(RuntimeException e) {
			assertEquals(e.getMessage(), "hoge");
			assertTrue(e instanceof IllegalStateException);
		}
		assertFalse(g.hasNext());
	}
	
	@Test
	public void clientThreadInterruptedTest() throws Throwable {
		final Thread t=new Thread(new Runnable() {
			public void run() {
				Generator<Integer> g=new Generator<Integer>() {
					@Override
					protected void iterate() throws InterruptedException {
						Thread.sleep(200);
						yield_return(100);
					}
				};
				// next()のブロック中にインタラプトされる
				// インタラプトフラグセットして計算続行
				assertEquals(100, g.next());
				assertTrue(Thread.interrupted());
			}
		});
		final List<Throwable> throwns=new ArrayList<Throwable>();
		t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
			@Override
			public void uncaughtException(Thread t, Throwable e) {
				synchronized (throwns) {
					throwns.add(e);
				}
			}
		});
		t.start();
		sleepUninterruptibly(100);
		t.interrupt();
		try { t.join(); } catch(InterruptedException ignored) {}
		for(Throwable thrown : throwns) {
			if(thrown instanceof AssertionError)
				throw thrown;
		}
		if(!throwns.isEmpty()) {
			fail(throwns.size()+" unexpected exception(s)");
		}
	}
}

スパム対策のためのダミーです。もし見えても何も入力しないでください
ゲスト


画像認証

トラックバック - http://d.hatena.ne.jp/gnarl/20090626/1246026543