2010年02月05日
1分未満の間隔でタスクを定期的に実行させる方法(TaskQueue使用)
ここ数日、初めてGoogle App Engine/Javaを使って、Twitterのボットを作っていました。
その作業で、1分未満の間隔でTwitterにアクセスしてタイムラインを取得するという処理が必要になったのですが、これはCronだけでは実装できません。
そこで、Cronと共にTaskQueueを併用することで、1分未満の間隔での処理を実現しました。
ここ1日ほど観察した限りではどうやら意図したとおりに動いてくれているようなので、その方法を紹介したいと思います。
環境
- Google App Engine 1.3.0/Java
課題とアプローチ
作ったボットは@erabotです。ユーザからの問いかけに対して答えるボットです。
Twitterは1時間に150回まで、つまり最短で24秒に一度TwitterにアクセスしてTimelineやMentionを取得できるように制限を設けています。ですから、これに近い間隔で定期的にTwitterにアクセスするような仕組みが必要なわけです。
この種の定期的に処理を実行する仕組みというのは、GAE/JではCronが用意されています。しかし、残念ながらCronでは最短でも1分間に一度しかタスクを実行できません。つまり、ボットに話しかけたら、返事が返ってくるまで最長で1分待たなければならないということなのです。これは、すぐ答えが返ってきて欲しいせっかちな自分としては許容しがたいものがあります(笑
では、どうすればよいでしょうか?
僕は、TaskQueueを使ってそれを実現しようと思いました。ここで登場する役は次の2つです。
- Updater
- (前回の動作から一定間隔以上あいていたら)Twitterのタイムラインを取得してStatusを更新する役割
- Supervisor
- Updaterの動作を監視し、動作が確認できなくなった場合には新しいUpdaterを動作させる役割
Updaterは短い間隔で動作させる必要があるので、TaskQueueで繰り返し実行させます。これは、終了する直前に新しいタスクをまたTaskQueueに追加するという方法で実現させます。しかし、TaskQueueは現状安定しているとは言い難いようです。そこで登場するのがSupervisorです。
SupervisorはCronで定期的に実行され、Updaterの動作を監視します。そして、もしUpdaterの動作が長時間止まっている場合には新しいUpdaterをTaskQueueに追加します。ただ、もし動作が止まったUpdaterがまた動き出して複数同時にTaskQueueで走るとなると、負荷が増えてマズいです。そこで、個々のUpdaterに乱数で生成したIDを割り振って、そのID以外は動作させないように管理します。
ちなみに、このIDはTaskQueueの問題で同じタスクが複数回実行された場合にも役立ちます。
この仕組みでは、データストアのエンティティを一つ使用します。各プロパティの役割を紹介します。
- lastExec
- 最後にTwitterにアクセスした時刻
- taskUniqueId
- 実行が許可されているUpdaterのID(一つのみ)
実際には、前回読み込んだTwitterのStatus IDを管理する必要もあるのですが、それはここでは省きます。
それでは、動作の詳細を、実際のコードで確認しましょう。
実装
Supervisorのコードは以下のようになります。このコードは、数分間隔でCronで実行させるとよいでしょう。間隔を開けすぎると、Updaterが落ちたとき復活までに時間が掛かることになります。短すぎると、無駄にCPU時間を使います。
public class SampleSupervisorServlet extends HttpServlet { private static final long serialVersionUID = 1L; // 必要なキーはこれ一つのみ private static final Key key = KeyFactory.createKey("bot", 1); // タスクの実行間隔の許容上限(秒) // 最低でもこの間隔で実行されるということが保証されるわけではない private static final int MAX_INTERVAL = 60; @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { final long taskUniqueId = new Random().nextLong(); final DatastoreService ds = DatastoreServiceFactory.getDatastoreService(); final Transaction tx = ds.beginTransaction(); try { // エンティティを取得。なければ作成 Entity entity; try { entity = ds.get(tx, key); } catch (EntityNotFoundException e) { entity = new Entity(key); } // タスクがしばらく動作してない場合 final Date lastExec = (Date)entity.getProperty("lastExec"); if (lastExec == null || MAX_INTERVAL * 1000 < new Date().getTime() - lastExec.getTime()) { // これからTaskQueueに追加するタスクのみ実行許可。 // 複数のタスクが並列で動作することを抑止 entity.setProperty("taskUniqueId", taskUniqueId); ds.put(entity); // TaskQueueに新しいタスクを追加 QueueFactory.getDefaultQueue().add(tx, url("/admin/update").param("id", String.valueOf(taskUniqueId))); } tx.commit(); } finally { if (tx.isActive()) { tx.rollback(); } } } }
一方、Updaterは次のようになります。
この例では、Updaterは約5秒間隔で実行させ、前回からMIN_INTERVAL秒以上が経過した場合にのみTwitterにアクセスして処理を実行するようにしています。
この間隔をどう設定するかも難しいところで、短すぎるとCPU時間を浪費しますが、逆に長すぎるとCronを使って1分間隔で実行した場合と大差なくなってしまいます。
5秒で実行し続けた場合、一日でCPU時間の25%を使いました。このくらいが妥当なところでしょうか?
// /admin/updateで呼び出される public class SampleUpdaterServlet extends HttpServlet { private static final long serialVersionUID = 1L; // タスクを実行する際に最低限開ける間隔(秒) // Twitterの場合、タイムラインの取得は1時間あたり150回までという制限がある // http://apiwiki.twitter.com/Rate-limiting private static final int MIN_INTERVAL = 60 * 60 / 150; // 必要なキーはこれ一つのみ private static final Key key = KeyFactory.createKey("bot", 1); @Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { final long thisId = Long.parseLong(req.getParameter("id")); final long nextId = new Random().nextLong(); final Date now = new Date(); final DatastoreService ds = DatastoreServiceFactory.getDatastoreService(); final Transaction tx = ds.beginTransaction(); try { // エンティティを取得。なければ作成 Entity entity; try { entity = ds.get(tx, key); } catch (EntityNotFoundException e) { entity = new Entity(key); } // タスクのIDが許可されたものかどうか判別。許可されてなければタスクは終了 final Long permittedId = (Long)entity.getProperty("taskUniqueId"); if (thisId != permittedId) { return; } // 次にTaskQueueに登録するタスクを許可するように設定 entity.setProperty("taskUniqueId", (Long)nextId); // 最後にタスクが実行された時刻を取得し、間隔が開いていればタスクを実行 final Date lastUpdate = (Date)entity.getProperty("lastExec"); if (lastUpdate == null || MIN_INTERVAL * 1000 < now.getTime() - lastUpdate.getTime()) { // // Twitterにアクセスするタスクをここに記述 // // 実行した時刻を記録 entity.setProperty("lastExec", now); } ds.put(tx, entity); tx.commit(); } finally { if (tx.isActive()) { tx.rollback(); } } // 一定時間待った後、次のタスクをTaskQueueに追加 try { Thread.sleep(5000); } catch (InterruptedException e) { } finally { QueueFactory.getDefaultQueue().add(url("/admin/update").param("id", String.valueOf(nextId))); } } }
結論。Google App Engineは面白い。
こういうしくみを自分で作っていくのが、僕はたまらなく大好きです(笑
追記(22時26分)
コードが一部間違っていたので修正しました。
- 58 http://pipes.yahoo.com/pipes/pipe.info?_id=faa858a20082ef6d25ad27557e37e011
- 27 http://reader.livedoor.com/reader/
- 14 http://www.google.co.jp/search?hl=ja&source=hp&q=タスク 一分おきに実行&lr=&aq=o&oq=
- 10 http://www.google.co.jp/search?q=タスク 実行&btnG=検索&hl=ja&client=firefox-a&hs=BX0&rls=org.mozilla:ja:official&sa=2
- 9 http://www.google.co.jp/search?sourceid=navclient&hl=ja&ie=UTF-8&rlz=1T4GGLL_jaJP304JP304&q=秒間隔 タスク
- 8 http://www.google.co.jp/reader/view/?hl=ja&tab=wy
- 7 http://www.google.co.jp/search?hl=ja&q=タスク+1分毎&lr=&aq=f&oq=
- 7 http://www.google.co.jp/search?sourceid=chrome&ie=UTF-8&q=TaskQueue+admin
- 7 http://www.reddit.com/r/AppEngineJa/comments/ayfjq/1分未満の間隔でタスクを定期的に実行させる方法taskq
- 5 http://b.hatena.ne.jp/hotentry/diary

