Hatena::ブログ(Diary)

※ただし体調がよければ

世の中分からないことだらけ。ツッコミ大歓迎です。

2010年02月05日

1分未満の間隔でタスクを定期的に実行させる方法(TaskQueue使用)

ここ数日、初めてGoogle App Engine/Javaを使って、Twitterのボットを作っていました。

その作業で、1分未満の間隔でTwitterにアクセスしてタイムラインを取得するという処理が必要になったのですが、これはCronだけでは実装できません。

そこで、Cronと共にTaskQueueを併用することで、1分未満の間隔での処理を実現しました。

ここ1日ほど観察した限りではどうやら意図したとおりに動いてくれているようなので、その方法を紹介したいと思います。

環境

  • Google App Engine 1.3.0/Java

課題とアプローチ

作ったボットは@erabotです。ユーザからの問いかけに対して答えるボットです。

Twitterは1時間に150回まで、つまり最短で24秒に一度TwitterにアクセスしてTimelineやMentionを取得できるように制限を設けています。ですから、これに近い間隔で定期的にTwitterにアクセスするような仕組みが必要なわけです。

この種の定期的に処理を実行する仕組みというのは、GAE/JではCronが用意されています。しかし、残念ながらCronでは最短でも1分間に一度しかタスクを実行できません。つまり、ボットに話しかけたら、返事が返ってくるまで最長で1分待たなければならないということなのです。これは、すぐ答えが返ってきて欲しいせっかちな自分としては許容しがたいものがあります(笑

では、どうすればよいでしょうか?

僕は、TaskQueueを使ってそれを実現しようと思いました。ここで登場する役は次の2つです。

Updater
(前回の動作から一定間隔以上あいていたら)Twitterのタイムラインを取得してStatusを更新する役割
Supervisor
Updaterの動作を監視し、動作が確認できなくなった場合には新しいUpdaterを動作させる役割

Updaterは短い間隔で動作させる必要があるので、TaskQueueで繰り返し実行させます。これは、終了する直前に新しいタスクをまたTaskQueueに追加するという方法で実現させます。しかし、TaskQueueは現状安定しているとは言い難いようです。そこで登場するのがSupervisorです。

SupervisorはCronで定期的に実行され、Updaterの動作を監視します。そして、もしUpdaterの動作が長時間止まっている場合には新しいUpdaterをTaskQueueに追加します。ただ、もし動作が止まったUpdaterがまた動き出して複数同時にTaskQueueで走るとなると、負荷が増えてマズいです。そこで、個々のUpdaterに乱数で生成したIDを割り振って、そのID以外は動作させないように管理します。

ちなみに、このIDはTaskQueueの問題で同じタスクが複数回実行された場合にも役立ちます。


この仕組みでは、データストアのエンティティを一つ使用します。各プロパティの役割を紹介します。

lastExec
最後にTwitterにアクセスした時刻
taskUniqueId
実行が許可されているUpdaterのID(一つのみ)

実際には、前回読み込んだTwitterのStatus IDを管理する必要もあるのですが、それはここでは省きます。

それでは、動作の詳細を、実際のコードで確認しましょう。

実装

Supervisorのコードは以下のようになります。このコードは、数分間隔でCronで実行させるとよいでしょう。間隔を開けすぎると、Updaterが落ちたとき復活までに時間が掛かることになります。短すぎると、無駄にCPU時間を使います。

public class SampleSupervisorServlet extends HttpServlet {

	private static final long serialVersionUID = 1L;

	// 必要なキーはこれ一つのみ
	private static final Key key = KeyFactory.createKey("bot", 1);

	// タスクの実行間隔の許容上限(秒)
	// 最低でもこの間隔で実行されるということが保証されるわけではない
	private static final int MAX_INTERVAL = 60;

	@Override
	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
		final long taskUniqueId = new Random().nextLong();

		final DatastoreService ds = DatastoreServiceFactory.getDatastoreService();
		final Transaction tx = ds.beginTransaction();
		try {
			// エンティティを取得。なければ作成
			Entity entity;
			try {
				entity = ds.get(tx, key);
			} catch (EntityNotFoundException e) {
				entity = new Entity(key);
			}

			// タスクがしばらく動作してない場合
			final Date lastExec = (Date)entity.getProperty("lastExec");
			if (lastExec == null || MAX_INTERVAL * 1000 < new Date().getTime() - lastExec.getTime()) {
				// これからTaskQueueに追加するタスクのみ実行許可。
				// 複数のタスクが並列で動作することを抑止
				entity.setProperty("taskUniqueId", taskUniqueId);
				ds.put(entity);
				
				// TaskQueueに新しいタスクを追加
				QueueFactory.getDefaultQueue().add(tx, url("/admin/update").param("id", String.valueOf(taskUniqueId)));
			}

			tx.commit();
		} finally {
			if (tx.isActive()) {
				tx.rollback();
			}
		}
	}
}

一方、Updaterは次のようになります。

この例では、Updaterは約5秒間隔で実行させ、前回からMIN_INTERVAL秒以上が経過した場合にのみTwitterにアクセスして処理を実行するようにしています。

この間隔をどう設定するかも難しいところで、短すぎるとCPU時間を浪費しますが、逆に長すぎるとCronを使って1分間隔で実行した場合と大差なくなってしまいます。

5秒で実行し続けた場合、一日でCPU時間の25%を使いました。このくらいが妥当なところでしょうか?

// /admin/updateで呼び出される
public class SampleUpdaterServlet extends HttpServlet {

	private static final long serialVersionUID = 1L;
	
	// タスクを実行する際に最低限開ける間隔(秒)
	// Twitterの場合、タイムラインの取得は1時間あたり150回までという制限がある
	// http://apiwiki.twitter.com/Rate-limiting
	private static final int MIN_INTERVAL = 60 * 60 / 150;
	
	// 必要なキーはこれ一つのみ
	private static final Key key = KeyFactory.createKey("bot", 1);

	@Override
	protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
		final long thisId = Long.parseLong(req.getParameter("id"));
		final long nextId = new Random().nextLong();

		final Date now = new Date();
		final DatastoreService ds = DatastoreServiceFactory.getDatastoreService();
		final Transaction tx = ds.beginTransaction();
		try {
			// エンティティを取得。なければ作成
			Entity entity;
			try {
				entity = ds.get(tx, key);
			} catch (EntityNotFoundException e) {
				entity = new Entity(key);
			}

			// タスクのIDが許可されたものかどうか判別。許可されてなければタスクは終了
			final Long permittedId = (Long)entity.getProperty("taskUniqueId");
			if (thisId != permittedId) {
				return;
			}
			// 次にTaskQueueに登録するタスクを許可するように設定
			entity.setProperty("taskUniqueId", (Long)nextId);

			// 最後にタスクが実行された時刻を取得し、間隔が開いていればタスクを実行
			final Date lastUpdate = (Date)entity.getProperty("lastExec");
			if (lastUpdate == null || MIN_INTERVAL * 1000 < now.getTime() - lastUpdate.getTime()) {
				//
				// Twitterにアクセスするタスクをここに記述
				//

				// 実行した時刻を記録
				entity.setProperty("lastExec", now);
			}

			ds.put(tx, entity);
			tx.commit();
		} finally {
			if (tx.isActive()) {
				tx.rollback();
			}
		}

		// 一定時間待った後、次のタスクをTaskQueueに追加
		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {
		} finally {
			QueueFactory.getDefaultQueue().add(url("/admin/update").param("id", String.valueOf(nextId)));
		}
	}
}

結論。Google App Engineは面白い。

こういうしくみを自分で作っていくのが、僕はたまらなく大好きです(笑

追記(22時26分)

コードが一部間違っていたので修正しました。

スパム対策のためのダミーです。もし見えても何も入力しないでください
ゲスト


画像認証