Hatena::ブログ(Diary)

作業記録/備忘録(仮) このページをアンテナに追加 RSSフィード

2012/12/02

celeryでclassのmethodを利用する方法

概要

django+celeryを利用して、あるクラスの処理を非同期に処理しようとした際に、メソッドを利用するところで、「takes exactly 2 arguments (1 given)」と怒られた。
(selfが渡っていないときなどによく見るエラー)
調べてみると、celeryから、classのmethodを利用する際には、それに応じた手続きが必要なようであったので、以下その方法を記載しておく。

背景

あるクラス(Converterというクラス)に含まれるメソッド"convert()"を利用した際に、「raised exception: TypeError('convert() takes exactly 2 arguments (1 given)',)」と怒られた。

本家のドキュメント(celery.contrib.methods)にあったので、それを参考に、後述の様に書き換えたところ動いた。

上記サイトによれば、taskのベースクラスが新しくなったそうで、これまでのような古い書き方は推奨されないらしい。

The task must use the new Task base class (celery.Task), and the old base class using classmethods (celery.task.Task, celery.task.base.Task).

This means that you have to use the task decorator from a Celery app instance, and not the old-API:

from celery import task # BAD
from celery.task import task # ALSO BAD
(略)


ということで、以下自分の使ったクラス(Conveterクラス)を例に、手順を示す。
元々のConverterクラスはこんな感じ。

from celery import task
class Converter(object):
     def __init__(self, converter="xxx", file_format="yyy"):
         pass
         # いろいろ処理

     @task()
     def convert(self, in_file):
     pass
         # いろいろ処理

以下の様に書き換えた。(from celery ... --> from celery.contrib.methods ...)

from celery.contrib.methods import task
class Converter(object):
     def __init__(self, converter="xxx", file_format="yyy"):
         pass
         # いろいろ処理

     @task()
     def convert(self, in_file):
     pass
         # いろいろ処理

あとは、普通に、非同期実行でOK。

$ python
Python 2.6.6 (r266:84292, Sep 11 2012, 08:34:23) 
[GCC 4.4.6 20120305 (Red Hat 4.4.6-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from xxx.converter import Converter
>>> c = Converter()
>>> c.convert.delay("xxx/aaa.bbb")
<AsyncResult: 43c72c43-f535-45f0-b6a2-2ed60897c352>
>>>

celery側のログ

[2012-12-02 03:32:37,116: INFO/MainProcess] Task xxx.converter.convert[50e07a93-6eef-48f8-84f8-249d03354089] succeeded in 0.123465061188s: 'xxx/aaa.bbb'

2012/11/11

django-celeryのインストール

以下、2012/12/02に全面的に追記/改修した。

概要

django-celeryインストール手順と、簡単な動作確認の方法について以下記載する。

django-celeryとは

celeryを使う(CentOS)で書いた"celery"をdjangoと連携して動作させるモジュールである。
これを使うことで,djangoの設定ファイル(settings.py)内の"INSTALLED_APPS"に記載したアプリ向けモジュールを、celeryタスクとして利用し、djangoからceleryタスク実行できる。
以下は、公式サイトの説明。

django-celery provides Celery integration for Django;
Using the Django ORM and cache backend for storing results,
autodiscovery of task modules for applications listed in INSTALLED_APPS, and more.

インストール手順

easy_install django-celeryのみで基本OK。
ただし、自分の場合、途中で以下のようなエラーが出た。
(けど結果的にはインストールされていた。正しくインストールされていないかもしれないが...)

$ sudo easy_install django-celery
(中略)
Installed /usr/lib/python2.6/site-packages/billiard-2.7.3.18-py2.6-linux-x86_64.egg
Finished processing dependencies for django-celery
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib/python2.6/site-packages/billiard-2.7.3.10-py2.6-linux-x86_64.egg/billiard/util.py", line 291, in _exit_function
    for p in active_children():
  File "/usr/lib/python2.6/site-packages/billiard-2.7.3.10-py2.6-linux-x86_64.egg/billiard/process.py", line 75, in active_children
    _cleanup()
TypeError: 'NoneType' object is not callable
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib64/python2.6/multiprocessing/util.py", line 258, in _exit_function
    info('process shutting down')
TypeError: 'NoneType' object is not callable
Error in sys.exitfunc:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib64/python2.6/multiprocessing/util.py", line 258, in _exit_function
    info('process shutting down')
TypeError: 'NoneType' object is not callable

なお、Install途中で、勝手に(親切に) celeryのVersionがupgradeされていたようであった。
(3.0.1->3.0.12になった)

インストールの確認

上記でうまく言ってなさそうだったので、sudoでなくsu -になってやってみたが、入っている様子であった。

# easy_install django-celery
Searching for django-celery
Best match: django-celery 3.0.11
Processing django_celery-3.0.11-py2.6.egg
django-celery 3.0.11 is already the active version in easy-install.pth
Installing djcelerymon script to /usr/bin
Installing djcelerymon script to /usr/bin

Using /usr/lib/python2.6/site-packages/django_celery-3.0.11-py2.6.egg
Processing dependencies for django-celery
Finished processing dependencies for django-celery

一応、VERSIONの確認。
python:2.6.6
djcelery:3.0.11
celery:3.0.12
django:1.4.0

root@localhost:~# python
Python 2.6.6 (r266:84292, Sep 11 2012, 08:34:23) 
[GCC 4.4.6 20120305 (Red Hat 4.4.6-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import djcelery
>>> djcelery.VERSION
(3, 0, 11)
>>> import celery
>>> celery.VERSION
(3, 0, 12)
>>> import django
>>> django.VERSION
(1, 4, 0, 'final', 0)
>>> 

動作確認

django側の設定を行い、続いてサンプルタスクを用意して、実行する例を以下に記載する。
djangoプロジェクトの作成。(ここでは"mysite"とした。)

$ django-admin.py startproject mysite
$ cd mysite/

できたdjangoの設定ファイル(setting.py)に以下を記載する。

setting.pyの設定

冒頭に、モジュールロードのための設定を追記する。

import djcelery
djcelery.setup_loader()

djceleryモジュールdjangoから呼べるように設定を追記する。

INSTALLED_APPS += ("djcelery", )

メッセージブローカー*1URL指定を行う。
(RabbitMQを利用する場合は以下で動いた)

BROKER_URL = 'amqp://guest:guest@localhost:5672/'

DBの設定を追記する。
(以下は簡単のためにsqliteを利用する設定例)

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3', # Add 'postgresql_psycopg2', 'mysql', 'sqlite3' or 'oracle'.                 
        'NAME': 'sample_django_celery',                      # Or path to database file if using sqlite3.                    
        'USER': '',                      # Not used with sqlite3.                                                            
        'PASSWORD': '',                  # Not used with sqlite3.                                                            
        'HOST': '',                      # Set to empty string for localhost. Not used with sqlite3.                         
	'PORT': '',                      # Set to empty string for default. Not used with sqlite3.                           
    }
}

DB初期化

$ python ./manage.py syncdb
Creating tables ...
Creating table auth_permission
Creating table auth_group_permissions
Creating table auth_group
Creating table auth_user_user_permissions
Creating table auth_user_groups
Creating table auth_user
Creating table django_content_type
Creating table django_session
Creating table django_site
Creating table celery_taskmeta
Creating table celery_tasksetmeta
Creating table djcelery_intervalschedule
Creating table djcelery_crontabschedule
Creating table djcelery_periodictasks
Creating table djcelery_periodictask
Creating table djcelery_workerstate
Creating table djcelery_taskstate

You just installed Django's auth system, which means you don't have any superusers defined.
Would you like to create one now? (yes/no): yes  ★この辺適当に答える
Username (leave blank to use 'username'):   ★この辺適当に答える
E-mail address: username@hoge.com   ★この辺適当に答える
Password:   ★この辺適当に答える
Password (again):   ★この辺適当に答える
Superuser created successfully.
Installing custom SQL ...
Installing indexes ...
Installed 0 object(s) from 0 fixture(s)

※ここで、参考サイトのとおりに、"migrate"しようとしたら以下のように実行できなかったのでスルーした。

$ python ./manage.py migrate djcelery
Unknown command: 'migrate'
Type 'manage.py help' for usage.

アプリ初期化

$ python manage.py startapp celerytest

First steps with Djangoに従って、サンプルアプリとしてcelerytestにtasks.pyをつくる。
具体的には以下の通り。
celerytest/tasks.pyに以下を記載する。

from celery import task
@task()
def add(x, y):
    return x + y

celerytestのtasksモジュールdjangoから呼べるように設定を追記する。

INSTALLED_APPS = (
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.sites',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    # Uncomment the next line to enable the admin:                                                                    
    # 'django.contrib.admin',                                                                                         
    # Uncomment the next line to enable admin documentation:                                                          
    # 'django.contrib.admindocs',                                                                                     
    'djcelery', # for django 2012/11/11added                                                                
    'celerytest.tasks', ★ここ
)

※記載の仕方は、project.appのように指定する。

django+celeryの起動。

$ python ./manage.py celery worker --loglevel=debug

command promptからadd(3,5)を実行。

$ python
Python 2.6.6 (r266:84292, Sep 11 2012, 08:34:23) 
[GCC 4.4.6 20120305 (Red Hat 4.4.6-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from celerytest.tasks import add
>>> add.delay(3,5)
<AsyncResult: 24ea2b65-2898-4756-bc00-9bee2314cbcc>
>>>

この時、django+celeryを起動した側のlogを見ると、以下のように、3+5の結果の"8"を表示していることがわかる。

[2012-12-01 23:51:47,143: INFO/MainProcess] Task celerytest.tasks.add[24ea2b65-2898-4756-bc00-9bee2314cbcc] succeeded in 0.045814037323s: 8

celerydaemon化などは、以前やって少々面倒くさかったのでいつか記載するかも。

参考

本家(django-celery 3.0.11)
http://pypi.python.org/pypi/django-celery
First steps with Django
http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

*1celeryを使う(CentOS)あたりも参照。

2012/08/07

celeryの実行でハマったこと(serializeにpickleが使われていた)

結論

celeryではserialize/deserialize(defaultではpickleを利用)してデータの受け渡しをしているため、celeryに渡すデータは、serialize/deserialize可能なデータにしましょう。

背景

celeryに、タスクを渡した時(do.delay(something))に、decode errorになる現象が起きていました。

celeryを利用してprocessの起動までは辿りつけましたが、processに渡す引数(something)にStringO形式のものが含まれていたりするとうまく動かないことが分かりました。
例えば、ファイルなどが相当します。

この辺の情報
http://stackoverflow.com/questions/4330719/django-celery-how-to-send-request-filesphoto-to-task

http://stackoverflow.com/questions/3642107/python-pickling-error-when-using-sessions


そこで、StringOが含まれない形でceleryに渡すように改造しましたが、まだどうも期待通りに動いていない部分がありました。
例えば、Can't decode message body: RuntimeError('maximum recursion depth
exceeded')と怒られました。(データの階層が深すぎという事かもしれません)

この"decode"という言葉が気になって調べていくと、どうやら単に渡したデータをqueueに積む(message broker経由で渡す)だけで無いらしいことが分かりました。

celeryでの処理手順

まず、celeryでは、どのように処理されているか説明します。
taskをqueueに積む際に、「どのデータをどの関数で処理するか」をceleryに通知します。
例えば、通常以下のようにします。

@task
def do(something):
 # something to do
 print something

上記が定義されていたとすると、

do.delay(something)

のようにしてceleryに実行依頼します。
(詳細の使い方は、http://d.hatena.ne.jp/nihohi/20120711なども参照ください)
このとき、somethingをserializeした形でqueueに積みます。
defaultではserializeにpickleを使用します。
(serialize moduleはconfigのCELERY_TASK_SERIALIZERで変更可能)

同様に、celeryが実際にタスク実行する際には、queueからdeserializeして実行します。

従って、celeryに渡す"something"はpickleでserialize/deselialize可能なものでなければなりません。

最初に失敗していたのは、このserialize(pickle化)で失敗していました。
StringO形式のものを含む場合、このserializeで失敗するようです。

一方"背景"で説明した失敗の方は、deserialize(unpickle化)で失敗していました。
こちらの、正確な原因は分かっていませんが、「極端に再帰的なデータ構造を pickle 化しようとした場合には再帰の深さ制限を越えてしまうかもしれず、この場合には RuntimeError が送出され」るそうですのできっとこれが起きていたでしょう。
なお、「 sys.setrecursionlimit() で慎重に上げていくことは可能」だそうです。([参考]リンク参照)

また、debug/検討過程でやはりceleryへの実行依頼に失敗する(serializeに失敗)することがありました。

こちらは、pickleでは、serializeする際に、「pickle化可能な関数やクラスがモジュールのトップレベルで定義されていなければならない」そうですが、これまで、関数内で定義したsub classを利用して生成していたため、失敗していたようです。

参考

何を pickle 化したり unpickle 化できるのか?
http://www.python.jp/doc/2.6/library/pickle.html?highlight=pickle#pickle-unpickle

2012/08/03

apache + wsgi + djangoでのdebug方法

※「wsgiファイルにdebugger起動の設定を書いておく」を追記し、「link」に関連するポインタを追記しました。2012/08/05

概要

apache + wsgi + djangoを利用している時、pdbを利用したdebug方法について示します。

背景

djangoでdebugしていて、通常であればmanager.py runserverなどで確認していたが、事情によりapacheでの動作中に確認する必要があった。
簡単なところは、print debugしていたが、だんだん辛くなり、debuggerを利用する方法について調べたら分かったので備忘として残しておきます。

環境

OS CentOS 6.0
python 2.6.6
mod_wsgi 3.3
apache 2.2.15

(1)wsgiファイルにdebugger起動の設定を書いておく

djangoのprojectディレクトリ(setting.pyとかあるところ)にあるxxx.wsgiファイルにclass Debugerの設定と、applicationに適用する設定を書いておく。
個の記載をすることで、applicationが呼ばれる際にdebuggerを起動できる。

下記例では、class Debugger(object):以降の部分が追記する部分。

# cat ../gui/django.wsgi
import os, sys
sys.path.append('/var/www/xxx')
os.environ['DJANGO_SETTINGS_MODULE'] = 'gui.settings'

import django.core.handlers.wsgi
application = django.core.handlers.wsgi.WSGIHandler()

class Debugger(object):
    def __init__(self, object):
        self.__object = object

    def __call__(self, *args, **kwargs):
        import pdb, sys
        debugger = pdb.Pdb()
        debugger.use_rawinput = 0
        debugger.reset()
        sys.settrace(debugger.trace_dispatch)

        try:
            return self.__object(*args, **kwargs)
        finally:
            debugger.quitting = 1
            sys.settrace(None)

application = Debugger(application)

(2).htaccessをdebugしたいprogramのあるdirectoryに配置する

PythonHandlerのところに、debuggerを起動させたいprogramを指定する(?)。<--ちゃんと調べてません
下記例では、yyy.py

 AddHandler python-program .py
 PythonHandler yyy.py
 PythonEnablePdb On

(3)debugger起動待ち状態にする

 # apachectl -k stop
 # apachectl -DONE_PROCESS

プロンプトが戻ってこなくなり、pdb起動待ちになります。

(4)web accessすると、debuggerが起動するのでそのままdebugをすすめる。

[root@xxx XxxYyy]# apachectl -DONE_PROCESS   <--ここで待ちになるが、web browserアクセスしたら返ってくる
> /usr/lib/python2.6/site-packages/Django-1.4.1-py2.6.egg/django/core/handlers/wsgi.py(213)__call__()
-> if self._request_middleware is None:
(Pdb) l
208  	    request_class = WSGIRequest
209  	
210  	    def __call__(self, environ, start_response):
211  	        # Set up middleware if needed. We couldn't do this earlier, because
212  	        # settings weren't available.
213  ->	        if self._request_middleware is None:
214  	            self.initLock.acquire()
215  	            try:
216  	                try:
217  	                    # Check that middleware is still uninitialised.
218  	                    if self._request_middleware is None:
(Pdb) b /var/www/xxx/yyy.py:39
Breakpoint 1 at /var/www/xxx/yyy.py:39
(Pdb)

link

Mod_pythonアプリケーションをpdbでデバッグ
http://www.daemonfreaks.com/blog/200705281740.html

Debugging Techniques
http://code.google.com/p/modwsgi/wiki/DebuggingTechniques

2012/07/11

celeryを使う(CentOS)

概要

Celery(セロリ)は、非同期処理を行うタスクキュー/ジョブキューで、メッセージブローカーと呼ばれる、メッセージ送受信する仕組みと組み合わせて使用する。
メッセージブローカーは、RabbitMQ、Redis、Databaseなどから選択できるが、お勧めは、RabbitMQらしい。

Celeryの説明〜本家サイトより〜

Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.
The execution units, called tasks, are executed concurrently on a single or more worker servers using multiprocessing, Eventlet, or gevent. Tasks can execute asynchronously (in the background) or synchronously (wait until ready).


背景

非同期処理を仲介する、ありもののmodule/applicationとして、celeryの調査を行った。
pythonでは、multi thread処理ではGILの問題があるので、当初multiprocessing moduleを利用して複数process化(worker)+server化+queueを用意、などを考えていた。
結論としては、今回の用途にはceleryを使えば一発で済む。
また、設定も比較的容易にできた。
上記のような目的の場合であれば、celeryを使えば容易に済んだので、そういった方は利用を検討してみてもいいかもしれない。

以下の設定手順では、RabbitMQ+Celeryでの動作例を記載する。




以下の設定手順では、RabbitMQ+Celeryでの動作例を記載する。

設定手順

erlang*1rabbitmq-serverのinstall

repositoryの設定

# wget http://ftp.jaist.ac.jp/pub/Linux/Fedora/epel/6/x86_64/epel-release-6-7.noarch.rpm
# rpm -ivh epel-release-6-7.noarch.rpm 

erlangのinstall

# yum install erlang

rabbitmq-serverのinstall

# wget http://www.rabbitmq.com/releases/rabbitmq-server/v2.8.4/rabbitmq-server-2.8.4-1.noarch.rpm
# rpm -ivh rabbitmq-server-2.8.4-1.noarch.rpm

serverの起動

# service rabbitmq-server start
celeryの設定

celeryのinstall

# yum install gcc
# yum install python-devel 
# easy_install celery

celeryのサンプルコードと起動

セロリアプリとして、tasks.pyを以下のように用意する。(以降、本家チュートリアルより)

from celery import Celery

celery = Celery('tasks', broker='amqp://guest@localhost//')

@celery.task
def add(x, y):
    return x + y

作成したtasks.pyを呼び出してceleryを起動する。
workerとしてプログラムを動作させる。

$ celery -A tasks worker --loglevel=info

celery_test.pyとして以下を作成する。
delay()メソッドを使うことで、celeryを使える。

#! /usr/bin/env python
# -*- coding: utf-8 -*-
from tasks import add
add.delay(4, 4)

実行する。

$ ./celery_test.py

すると、celeryを起動したconsoleで、以下のように出力され、実行されたことがわかる。
log末尾に計算結果の"8"が見える。

[Tasks]
  . tasks.add

[2012-07-12 01:45:22,873: WARNING/MainProcess] celery@localhost.localdomain has started.
[2012-07-12 01:45:27,038: INFO/MainProcess] Got task from broker: tasks.add[d4ccee7e-2f4a-4f75-84fc-e91846bfe6f9]
[2012-07-12 01:45:27,052: INFO/MainProcess] Task tasks.add[d4ccee7e-2f4a-4f75-84fc-e91846bfe6f9] succeeded in 0.000603914260864s: 8

先の、tasks.pyにtime.sleep(3)等を入れると、celery_test.pyとtasks.pyが非同期に処理されることがわかる。(celery_test.pyはメッセージを投げてすぐ返る。tasks.pyはメッセージをceleryから受けて処理して返る。)

後日、djangoとの連携についても書きます。

リンク

本家:
http://celeryproject.org/
チュートリアル(わかりやすい):
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html
rokujyouhitomaの日記([備忘録]DjangoCelery、RabbitMQの設定。):
http://d.hatena.ne.jp/rokujyouhitoma/20100625/1277477730

*1:erlang(アーラン)とは、「コンピュータにおいて汎用的な用途に使うことができる並行処理指向のプログラミング言語および実行環境」とのこと。http://ow.ly/caBTG RabbitMQはerlangで実装されている。