Hatena::ブログ(Diary)

玉虫色に染まれ! このページをアンテナに追加 RSSフィード

2016-02-21

SciPyとその仲間たち(NumPy, IPython など)の違いと関係

python でちょっとした信号処理をしたくて、存在だけは以前から知っていた numpy に手を出したので、関連パッケージなどをまともに調べてまとめました。(ある程度素人の主観が入っているので、鵜呑みにしないように)


SciPy

総本山: http://www.scipy.org/

語源は Science + Python で、発音は "Sigh Pie(サイパイ)"。

今日の記事で挙げる各種ソフトの組み合わせによって実現されている、Python ベースの数学・科学技術計算用ソフトウェアエコシステムの名称。つまり、個別のソフトではなく、その集合体による計算環境のこと。乱暴にざっくり言ってしまえば、「Mathmatica や Matlab に代わる環境を Python でつくろうプロジェクト」 が SciPy。

なお、SciPy のコアパッケージの1つに、後述の SciPy library があり、そちらを指して SciPy と言うことも多い(というか大抵そっち)ので文脈で判断を。


NumPy

Pytyon は緩い型付けの言語なので、単純な処理を多量のデータに対して施す、といった処理が遅い。例えば、配列の数値の合計を計算する場合、Python の配列にはどんなデータも置けるので、いちいち各要素が数値なのかどうか型チェックをし、必要なら変換し……といった処理が必要になる。これがC言語であれば、1ループがアセンブリ数命令ですむし、SSEなどのベクトル計算命令を使うこともでき、かなり高速に処理できる。

NumPyは、この「大量のデータの配列の処理」を高速に行えるようにするライブラリである。NumPy は ndarray という名の型の配列を作ることができる。この型でデータ配列をつくると、データ自体は裏で C 言語の世界で確保され、配列全体に対する合計・平均や、値の設定・四則演算、果ては、FFT計算など、先ほど弱いと説明した処理は、 C 言語の世界に閉じて行われるようになるため、C言語に引けを取らない計算速度が得られる。

NumPyでは、N次元配列に対して、四則演算や統計系の演算、sinやlogなどの算術演算が定義されているが、完全な一覧はリファレンスマニュアル参照

ドキュメント: https://docs.scipy.org/doc/numpy/reference/routines.html


SciPy library

前述の通り、NumPy は N次元配列に対する基本的な操作は提供してくれてはいるが、Matlab相当か、と言われるとそんなことは無い。例えば、FFT変換は NumPy でも提供してくれているが、B-spline 補完を計算したい、とか、インテグラルの計算をしたい、とかはサポートしてくれていない。

SciPy library は、はそのような数学・科学技術計算の関数の詰め合わせで、Matlab 相当まで引き上げるための補完のライブラリだと思えば良い。

NumPyで元から提供されている関数であっても、大抵、scipyのネームスペースからリンクが張られているので、「ちょっと複雑な計算がしたい」と思ったら、scipyのリファレンスから探して、それを使うのが良いだろう。

ドキュメント: http://scipy.github.io/devdocs/


matplotlib

名前の通り、データをプロットした画像を作成するライブラリ。

NumPy配列で計算した結果のデータをグラフ化したい、というのは当然よくある話なので、大抵組み合わせて使用される。

どんなグラフがかけるかは下記公式ギャラリーを参照

http://matplotlib.org/gallery.html

グラフ画像とそれを出力するコードがセットになっているので、見れば大体使い方はわかるだろう。


IPython

Python はスクリプトを指定せずに起動すると、インタラクティブシェルを立ち上げることができるが、そんなにリッチなシェルではないので、Matlab のコンソールの置き換えとしては弱い。それを補完するのが IPython である。(つまり、IPython はアプリの名前であって、NumPy などのような Python モジュールというわけではない。)

IPython のシェルでは、タブ補完ができたり、動的に関数ヘルプが見れたり、matplotlibの結果表示も統合して表示できたりする。

特に Mathmatica Notebook のパクインスパイアな IPython notebook が便利で、Markupの文章や数式と、Pythonのコードやその出力を交ぜ書きし、記録に残して共有できる。詳細は以下の記事が詳しい

Self Defense Guide


SymPy

わかる人向けに一言でいうなら、「Matlab 担当が SciPy library なら、Mathmatica 担当が SymPy」といったところ。

NumPy や SciPy library は、前述の通り、数値配列(=離散的なデータ)を扱うものだが、SymPyでは symbol を定義して、数式を数式のまま扱うことができるため、理論上計算誤差は発生せず、微分積分を計算したり、連立方程式を解いたりすることもできる。

チュートリアル(イントロ部分): http://docs.sympy.org/latest/tutorial/intro.html

自分が興味があるのは数値計算のほうなので、SymPyのほうは積極的に使うことはないが、数学を忘れた頭には、微分を計算してくれるのはありがたいので、電卓としては使うかもしれない。


Pandas

numpy のデータ列に対して、SQLでやるような統計データ処理(WHERE で絞って GRUPE BY したものを SELECT SUM するみたいな)を行えるようにした便利ライブラリ。現実世界で収集したデータをCSVなどで取り込んできて処理するのが主な使い方だろう。

時間のフォーマットを解釈してくれるので「指定期間のデータを抜き出す」といったことができたり、「性別の項の値でグループ分けした時の、それぞれのグループの評価値の平均を求める」といったことができたりする。具体的にどんなことができるかは、中の人の下記の10分動画で紹介されている。

紹介動画: 10-minute tour of pandas on Vimeo

テーブルの結合や group by などは、SciPy library の処理結果に対して使えるシーンもあるかもしれない。(ぱっとは思いつかないが)


ほかには?

SciPy の公式サイトには、コアパッケージとして、上記や Python 自体の他、nose というテストフレームワークが挙げられている。

また、関連パッケージも鬼のような数が挙げられているが、 SciPy 自体が Python 畑ではない方面から流行っていることもあり、コアパッケージのみの素環境が主流のようだ。個人的には、使いやすいグラフ関連の補助ライブラリを探してみたいところ。

2015-08-16

純 Python で C のライブラリを呼び出す(Linux inotifyを例に)

Python は Batteries included 哲学のもと、豊富なモジュールが標準で組み込まれていますので、大抵のことは標準モジュールを使えばできますし、そうでなくても、どこかの誰かが Python binding を作ってくれているはずですので、Cのライブラリを直接呼び出すような必要はほとんどありません。

ただ、今回、Cのコンパイル無しに(≒モジュールの追加インストール無しに) inotify を使いたい、という事情があり、直接 libc を呼び出す binding を作りました。コード自体はすごく簡単なのですが、概念が若干わかりにくかったので、今日の記事はその備忘録です。

ctypes と struct を使う!

使うモジュールは主に ctypes です。加えて、inotify では、read() で構造体の読み込みを行うため、read で読んだバイナリ文字列をパースするために struct も必要になりました。

いずれも、公式の標準ライブラリドキュメントに詳細なチュートリアルがあるので、じっくり読み込めば理解はできます。(やることがマニアックなので、ちゃんと頭を回しながら読まないと辛いとおもいますが。)

ctypes でのライブラリ読み込み

まずは、目的とするライブラリを読み込みます。

ファイル名などが正確に分かっている場合はそれを指定して

libc = CDLL("libc.so.6")

などとします。

同名のライブラリでもバージョンが違うとAPIが変わる可能性があるため、バージョン番号まで含めた指名をする必要があります。

バージョンはどれでもよく、そのシステムにインストールされているものを使うのであれば、find_libray()を使って

libc = CDLL(find_library("c"))

といったやりかたができます。(が、もちろん、バージョンが違うと、元々欲しかったAPIが無かったり、変わっている可能性はありますので注意が必要です。)

関数の宣言

ライブラリが export している関数は、読み込んだ dll オブジェクトの属性としてアクセスできます。

print libc.time(None)

簡単な用途であれば、このまま使えばよいのですが、引数のチェックも働きませんし、戻り値も int とみなして扱われるため、このままでは不都合な場合もあると思います。

ですので、大抵の場合は、一旦関数プロトタイプをつくって、呼び出し用の外部関数オブジェクトをつくってやるのがよいでしょう。

inotify_add_watch = CFUNCTYPE(c_int, c_int, c_char_p, c_uint32)(
    ("inotify_add_watch", libc),
    ((1, "fd"), (1, "pathname"), (1, "mask"))
)

これで、 inotify_add_watch() 関数が使えるようになります。

CFUNCTYPE に与えている引数は、先頭が戻り値の型で、残りは引数の型です。CFUNCTYPEが返す関数に対して与えている引数は、第一引数が (シンボル名, dllオブジェクト) のタプルで、第二引数は引数の情報を表す (フラグ, パラメータ名, デフォルト値) タプルのリストになります。(各引数で、パラメータ名とデフォルト値は省略可能)

フラグは、1: 入力パラメータ 2: 出力パラメータ 4: デフォルトが整数0 の組み合わせの整数です。(ポインタを渡してそこに書かせるような関数なら 2 を指定するわけですね。)

パラメータ名は、関数を呼ぶときの名前付き引数として使えますし、デフォルト値まで指定しておけば、引数を省略して呼べるようになります。

ここで、引数の型として c_char_p などを使っていますが、これらは ctypes モジュールで定義されています。構造体などを自分で定義することもできますので、その時は公式リファレンスを参照のこと。

また、「戻り値が負の値だったら exception を raise する」とか、「出力パラメータの引数の値を関数の戻り値として返す」といったちょっと高度な結果処理をしたい場合、errcheck 属性を使うことで実現できます。こちらも公式リファレンス参照。

pure-python の inotify binding

というわけで、上記のテクニックを使って inotify の binding をつくってみました。

#!/usr/bin/env python2

from ctypes import CDLL, CFUNCTYPE, c_char_p, c_int, c_uint32
from ctypes.util import find_library
import argparse
import os
import struct
import sys

__libc = CDLL(find_library("c"))

inotify_init = CFUNCTYPE(c_int)(("inotify_init", __libc), ())
inotify_init1 = CFUNCTYPE(c_int, c_int)(("inotify_init", __libc), ((1, "flags"),))

inotify_add_watch = CFUNCTYPE(c_int, c_int, c_char_p, c_uint32)(
    ("inotify_add_watch", __libc),
    ((1, "fd"), (1, "pathname"), (1, "mask"))
)

inotify_rm_watch = CFUNCTYPE(c_int, c_int, c_int)(
    ("inotify_rm_watch", __libc),
    ((1, "fd"), (1, "wd"))
)

IN_ACCESS           = 0x00000001
IN_MODIFY           = 0x00000002
IN_ATTRIB           = 0x00000004
IN_CLOSE_WRITE      = 0x00000008
IN_CLOSE_NOWRITE    = 0x00000010
IN_OPEN             = 0x00000020
IN_MOVED_FROM       = 0x00000040
IN_MOVED_TO         = 0x00000080
IN_CREATE           = 0x00000100
IN_DELETE           = 0x00000200
IN_DELETE_SELF      = 0x00000400
IN_MOVE_SELF        = 0x00000800

IN_UNMOUNT          = 0x00002000
IN_Q_OVERFLOW       = 0x00004000
IN_IGNORED          = 0x00008000

IN_CLOSE            = IN_CLOSE_WRITE | IN_CLOSE_NOWRITE
IN_MOVE             = IN_MOVED_FROM | IN_MOVED_TO

IN_ONLYDIR          = 0x01000000
IN_DONT_FOLLOW      = 0x02000000
IN_EXCL_UNLINK      = 0x04000000
IN_MASK_ADD         = 0x20000000
IN_ISDIR            = 0x40000000
IN_ONESHOT          = 0x80000000

def _decode_flag(flag):
    flag_names = [x for x in globals().keys() if x.startswith('IN_')]
    flag_names.remove('IN_CLOSE')
    flag_names.remove('IN_MOVE')
    flag_names.sort(key=lambda x: globals()[x])
    r = []
    for n in flag_names:
        if globals()[n] & flag != 0:
            r.append(n)
    return r

def main():
    def integer(x):
        return int(x, base=0)
    parser = argparse.ArgumentParser()
    parser.add_argument('path', nargs='?', default='.',
        help='directory or file to watch')
    parser.add_argument('-f', '--flag', action='append', default=[],
        choices=[x for x in globals().keys() if x.startswith('IN_')],
        metavar='FLAG', help='event name to watch')
    parser.add_argument('-m', '--mask', default=0, type=integer,
        help='numeric event mask to watch')
    parser.add_argument('-a', '--all', action='store_const',
        dest='mask', const=0xfff, help='watch all event(-m 0xfff)')
    args = parser.parse_args()

    mask = args.mask
    for x in args.flag:
        mask |= globals()[x]
    if mask == 0:
        mask = IN_ACCESS

    print 'watching {} for inotify events: {}'.format(
        args.path, ",".join(_decode_flag(mask)))

    fd = inotify_init()
    wd = inotify_add_watch(fd, args.path, mask)
    try:
        while True:
            buf = os.read(fd, 4096)
            i = 0
            fmt = 'iIII'
            fmt_size = struct.calcsize(fmt)
            while i < len(buf):
                wd, mask, cookie, name_len = struct.unpack_from(fmt, buf, i)
                i += fmt_size
                name = buf[i:i+name_len]
                i += name_len
                print("{:08x}({}) {} {}".format(
                    mask, ",".join(_decode_flag(mask)), cookie, name))
    except KeyboardInterrupt:
        pass
    inotify_rm_watch(fd, wd)
    os.close(fd)

if __name__ == '__main__':
    sys.exit(main())

binding 本体は先頭の 20行ほどで終わっていて、残りは定数定義と、単体実行時の main() 関数になります。

このスクリプトを -a オプション付きで実行すると、カレントディレクトリに対するすべてのファイルアクセスを監視しますので、試してみてください。(他のオプションは -h で)

2015-05-31

CentOS 5 に Python 2.6 & PIP を導入(EPELリポジトリの利用)

我が家には CentOS 5 で動作しているサーバーがいるのですが、そいつに新しいことをやらせたくて、Python 環境をアップグレードしようとしたので、今日はそのメモです。

CentOS 5 は Python 2.4 が標準となっていますが、PIP以前ネタにした setuptools の上位版のようなもの)は、2.6以降にしか対応していません。 CentOS のオフィシャルなリポジトリには Python のアップデートはありませんので、今回は EPEL リポジトリを有効にしてそちらから Python 2.6 を入れ、その上で easy_install と PIP を導入しました。

導入手順

EPEL は、RHEL(Red Hat Enterprise Linux)やその派生ディストリビューション(CentOSなどですね)向けの RPM リポジトリで、Fedora向けにつくられた RPM パッケージを RHEL に適用できるように調整したものが収められています。

公式サイトにかかれているとおり、RHELのバージョンに応じた epel-release パッケージをダウンロードしてインストールすれば、yum が EPEL リポジトリも参照するようになり、python2.6もインストールできるようになります。

$ wget http://ftp.iij.ad.jp/pub/linux/fedora/epel/5/i386/epel-release-5-4.noarch.rpm
$ sudo rpm -i ./epel-release-5-4.noarch.rpm 
$ sudo yum install python26

なお、EPELからのパッケージを始めてインストールしようとするときは、署名の確認が行われますので、「署名を信用する」と答えてやる必要があります。(が、[公式の鍵一覧:title=https://getfedora.org/keys/には、もはや EPEL 5 用の鍵は掲載されていないという……)

つづけて、setuptools(easy_install) と pip の導入ですが、手順は以下のとおりです。

$ wget --no-check-certificate https://bootstrap.pypa.io/ez_setup.py
$ sudo python26 ez_setup.py --insecure
$ sudo easy_install-2.6 pip

ez_setup.py の取得等で証明書チェックなどを無効にしているのは、 setuptools の公式解説に書いてあるとおり、CentOS 5 のような古い環境での workaround です。SSLの署名チェックが行われないので、中間者攻撃などが行われた場合は悪意のあるバイナリが紛れ込む可能性のある、危ない状況ですので、やる場合は覚悟が必要です。

これで、なんとか、python26, easy_install-2.6, pip2.6 という名前のコマンドがインストールされ、使えるようになりました。

とはいえ、こんなことまでして古い環境延命させるぐらいなら、とっとと新しい環境に移行したいところですね。

2012-04-08

python のローカル変数のスコープと functools.partial

今日のネタは、ちょっとミスって python でエンバグしてしまったので、自戒の意味を込めたメモです。

失敗コードは下記のようなものです(無関係の所はごっそり削ってます)。 引数 e や n の値に応じて、 go に関数オブジェクト (か None) を設定しています。

def in_stage(e=None, n=None):
	go = None
	if (e != None):
		go = lambda: goto_event_stage(e)
	elif (n != None):
		go = lambda: goto_normal_stage(n)

	...(実際の処理本体。途中で go() を使う)...

これ、何が悪いかというと、下のほうの処理本体部分で e や n を変更してしまうと、goを呼んだ時の挙動が変わってしまうんですね。

	# 関数の引数が n=3 だった場合
	go()   # goto_normal_state(3) が呼ばれる

	n = 100 # でも途中で n を変更してしまうと
	go()   # goto_normal_stage(100) が呼ばれる

この現象は lambda 式ではなく、def による名前持ちの関数であっても起こります。

原因と対処法

python のローカル変数は、関数の引数に定義されていたり、代入を行った場合には、そのスコープ内に作成されます。本件の例では、代入は lambda の中では行われていないので、新規の変数は作成されません。登場するすべてのローカル変数は、親の in_stage のものであり、lambda 内で参照している x は、親スコープの x そのものです。

この辺、若干 Python 固有なので、幾つか例を挙げてみます。


x = 3
def func():
	print x    #「3」が出力される。 この x は、親のx
func()
def func():
	print x    #「3」。実行時点では親のxは値を持っているのでエラーじゃない
x = 3
func()

def func():
	x = 5   # func 内で x を代入したので、
	        # func 内の x と親の x は別物になった
	print x    # 「5」
x = 3
func()
print x   # 「3」親の x は変更されていない

まあ、ここまでは良い。ここからがややこしい。

x = 3
def func():
	print x   # エラー: x は未初期化。なぜなら……
	x = 5     # ここで代入しているので func 内での x はすべて
	          # 親の x とは別物になったので。
func()
x = 3
def func():
	x += 1   # 同上。エラー: x は未初期化。 この文は x = x + 1 と同義だが、
	         # 「代入しているのでxは全部別物」で、「まだ設定してないxを読んでいる」のでエラー
	print x
func()

名前のバインドの扱いが他の言語と若干違うんですよね。代入されたタイミングで新しくバインドされるのではなく、そのスコープ全体に遡って適用されるというか。同じスコープ内で、同じ名前が指しているものは同じものという保証がある、というか。

じゃぁ、クロージャ的なものはどうするのよ、って話ですが、「そういった『個々に固有の値を保持するオブジェクトを作りたい』のであれば、ちゃんとオブジェクトを作って、オブジェクトに持たせなさい」というのが、python の流儀なのではないかと。

実装例はこちらの方が書かれています。→ 最もタメになる「初心者用言語」は Python! - 西尾泰和のはてなダイアリー

例が2つ挙げられていますが、いずれの例でも、覚える値はオブジェクトのアトリビュートとして保持しています。

私はどう書くべきだったか

……と、ここで終わってはいけません。

クロージャを書くことが目的ではないのですから。より問題に合った、わかりやすい記述を求めるべきでしょう。

やりたかったのは「go という変数に、goto_normal_stage(3)とかを呼んでくれる関数オブジェクトを設定したい」という物ですが、もし、goto_normal_stage_3() みたいな関数が定義済みであれば

if n = 3: go = goto_normal_stage_3

とでも書けば良かったわけです。

つまり goto_normal_stage_3 のような、「(一部の)引数が設定済みの関数オブジェクト」を動的に作る方法が欲しかった訳です。これは関数型言語で言う所の「部分適用」です。

そのための道具は functools に partial() として既に用意されています。

from functools import partial
def in_stage(e=None, n=None):
	go = None
	if (e != None):
		go = partial(goto_event_stage, e)
	elif (n != None):
		go = partial(goto_normal_stage, n)

これですっきり。

2011-12-27

Google Code Jam Japan 2011 景品: Goo

python で Google Quine を再現させてみた

先日(といっても2ヶ月以上前ですが)参加した Google Code Jam Japan 2011 ですが、おかげさまで入賞することが出来、右の写真の通り、記念Tシャツを頂くことが出来ました。主催された方々、参加された皆さん、楽しい時間をありがとうございました。

ところで、このTシャツにプリントされたAAですが、これは ruby のスクリプトになっていて、これを実行すると自分自身のソースコード(つまりこのAAそのもの)を出力するという変わった性質をもったコードになっています。このようなプログラムのことを quine と呼ぶのだそうで、このTシャツのコードの作りかたは、Code Jam Tシャツについてに詳しく書かれています。

今回は、この Google Quine を ruby ではなく python で再現させてみよう、というネタです。

(余談ですが、このTシャツを受け取ってネタを思いついた翌日にはがっつり時間をかけてコードを完成させていたのですが、ブログのエントリへのまとめは2ヶ月も放置していたという……)

目標とするのは

  1. きちんと Quine になっていること
  2. 元ネタと(可能な限り)同形であること
  3. 元ネタと同等の機能(HTMLカラー出力オプション)を持っていること

です。

Python をご存知の方は判ると思いますが、Pythonはこのようなコードを書くのには全然向いていない特徴を持った言語です。やる前はかなり無謀な気がしていたのですが、いざやってみたらなんとかなってしまって、機能拡張も出来てしまいました。

今日の記事は完成品の紹介と、このネタをやるうえで障害となったPythonの特徴やそれに対するテクニックの紹介の紹介です。

完成品

先に完成形を出しておきましょう。こんなかんじです。

\
        q='''f=l                                          ambda               
     (l,r),(p,m):(                                         l+d(m              
    )+r[:    p],r[p                                         :]);              
   v=(_        _im                                          port              
  __('          s                                           ys')              
 .arg                 v+[0])[1      ];c=chr;      s=c(32);  n=c(   10);y=x  TM
 =c(9                2);l=c(60)    ;g=lambda(   n):c(27)+'  [%sm  '%(n+30*(   
 n!=0               ));d     ,t=( (lam    bda(  n):   '','  '),( g,'   '),(l  
 ambd               a(n)     :l+' fon      t'+s+'co    lor  =%s> '%['black    
 ','r        ed','g ree      n','yell       ow' ,'b    lue  '][n ],l+'pr      
  e>')       )[(v=='-h')      *2+ (v=       ='- e')]  ;pri  nt(t +y+          
  n+red        uce(  f,z     ip(( 20,1     4,13  ,11,7,11   ,3), (4,1         
   ,3,4,       2,1,  0))*   15+[(  990,   4)],     ('',s    *8+'  q='+"  '"   
     *3+q+"'"*3+";"   +s*6+"8;"     +y+n+s*46     +"exec"   +s*6+  "(''"+n+   
       s*47+"+''.j      oin(q        ."+n+s     *48+"split  ()))"   ))[0]+    
                                              d(0))    ;88;
                                              ''';      8;\
                                              exec      (''
                                               +''.join(q.
                                                split()))

このコードは Python 2.x用です。元ネタと同様、そのまま実行で自分自身を出力し、引数に-hを渡すと色付きHTMLで出力されます(元ネタみたく大量のタグを出したりしないので、Firefoxでもきちんと表示できます)、ついでに、-eでANSIエスケープシーケンスによる色付きテキストを出す機能も付けました。


形に関しては、1行目のバックスラッシュ1文字以外は元ネタと完全に一致するようにしています。それぞれの機能確認方法はこんなかんじ:

(元ネタと同様、quine となっているか)
$ diff -u google_quine.rb <(ruby google_quine.rb)
$ diff -u google_quine.py <(python google_quine.py)

(元ネタと同形になっているか)
$ diff -ub <(sed 's/[^ ]/x/g' google_quine.py) <(sed 's/[^ ]/x/g' google_quine.rb)

(HTMLの出力が出来ているか)
$ python google_quine.py -h > tmp.html
$ firefox tmp.html
$ diff -ub google_quine.py <(w3m tmp.html)

(エスケープシーケンスでの色付けが出来ているか)
$ python google_quine.py -e
$ diff -u google_quine.py <(python google_quine.py -e | sed "s/\x1b[^m]*m//g") 

python google_quine.py -h の場合:

f:id:over80:20111227162311p:image

python google_quine.py -e の場合:

f:id:over80:20111227162310p:image


インデントに意味がある

python はインデントで制御構造を表すため、インデント量には非常に強い文法的制約があります。

元ネタのように、コードの殆どを文字列定数に埋めてしまうにしても、「少くとも最初の1行は必ずインデント無しで始める」必要があるのです。

色々考えてはみたものの、さすがにこれだけは仕方が無いので、極力目立たない最小の表現に留める事で妥協しました。

方法は幾つかありますが、結局採用したのは、完成形で示したように、頭にバックスラッシュを1文字だけ追加する形です。このようにすれば、このバックスラッシュの位置が最初の論理行のインデント量を指定することになるため、次の物理行は先頭に幾つスペースが入っていても問題無くなります。

\
                  print "ここの物理行の字下げ量は自由"

他の手段としては、開き括弧を置いて暗黙の継続行とする方法があります。ただ、妥協した部分に必要以上の機能を持たせたく無いのと、括弧の中では「Statementが書けない」のを嫌って、バックスラッシュの方を採用しました。

代入など、多くの構文が Statement である

多くの C 言語 like な言語では、代入文は「副作用のある式(expression)」ですが、 python の代入文は statement です。そのため、「代入しつつ何かをやる」という表現が出来ません。また、printもpython2ではstatementですし、evalはexpressionである変わりにexpressionしか実行できないので、statement版のexecという別構文があります。

本家の解説で使われている

eval(q="puts q")

も、同じ事をするには

q="print q";exec q

と、2文に分けて書くことになるわけです。

改行を含む文字定数は専用の書式が必要

元ネタの方では ruby の "%p" というフォーマット文字列を使用していますが、python にも同等機能の "%r" という物があります。

>>> print 'quoted(%r)' % 'test'
quoted('test')

文字列に対してこの書式として出力されるのは上記の通り「単一のシングルクオートで囲んだ文字列」です。ところが、このプログラムで出力したいのは複数行に渡る文字列定数なので、引用符で三重に包んだpython独特の記法で出力してやる必要があります。

これに対しては、%r を使わず、データ処理で必要な出力を得る事にしました。

>>> print 'quoted('+"'"*3+'test'+"'"*3+')'
quoted('''test''')

この制限は一見するとquineを作る上でのデメリットに見えますが、文字列定数の中のコードでシングルクオートもダブルクオートも使えるようになる(三重にさえしなければ)ので、メリットでもありますね。

アルファベットのキーワードが多い

読みやすさが正義のPythonは、構文に記号が少ない事でも有名です。例えば論理和や論理積を使った演算は C では

result=a&&b;

のようになるところが、pythonでは

result=a and b

となり、文字数がかさみがちなうえに、普通に書くとキーワード間にスペースを入れなければなりません。

また、print文も、Python 2.x では expression ではなく statement なので、例えば文字列変数 a を出力しようと思ったら、普通はこう書くでしょう。

print a

ところが、元ネタを真似て、「ソースには整形用のスペースを埋めて、実行時にはそれを取り除いてevalする」という作戦で行くためには、文法上スペースが必須の構文を含めるわけには行きません。

このような場所でスペースが必要になるのは、変数名や関数名、アルファベットのキーワードなどが隣り合っている所で、字句解析上区別さえ付けば間のスペースは省略できますので、変数を括弧を挟むなど工夫してスペースを無くしました。

result=(a)and(b)
print(a)

lambda 式の仮引数や、リスト内包表記なども同様です。

lambda a,b:a+b   # スペースあり
lambda(a),b:a+b  # スペースなし
[x**2 for x in range(10)]     # スペースあり
[x**2for(x)in(range(10))]    # スペースなし

(なお、lambda(a,b):a+b だと意味が変わってしまいます(引数がタプル1個という事になる)。reduceなどの引数に渡すには不適切です。)

今回のコードを書く上では、殆どはこの対策で十分ですが、唯一、import 文だけはこの手が使えません。

import sys

import 文に書くモジュール名は代入の左辺値でも文字列でも無いので、勝手に括弧で括ったりする事は出来ません。import を使わないで済めば良かったのですが、元ネタと同じ機能にするためには起動時の引数を参照するために sys.argv が必要です。

これに対しては、 404 Not Found の「import文を式にする」を参考にして、下記のような表記を使う事でスペース無しで記述するようにしました。

v=__import__('sys').argv

制御構造文が使えない

Pythonでワンライナー的なコードを書こうとすると、制御構造文(forやifなど)は厄介です。

print 文などと同様にすれば 1行で書く事も出来なくは無いですが、ループの終端や、else節の終端を指定できないので、特殊なケースを除き、使用できません。

for(x)in(range(10)):print(x);print"end";  # endは10回表示される

とはいえ、今回作成するのは、プログラムのフローとしては単純な物で、分岐は引数処理のみ、反復も文字列処理だけなので、対処は容易です。

for文は意味を考えてリスト内包表記などに置き換えれば良いでしょう。

print"".join(str(x)for(x)in(range(10))),"end"

条件分岐は、and/or による振り分けか、3項演算子を用いることで表現できます。

(condition)and"TRUE"or"FALSE"
"TRUE"if(condition)else"FALSE"

キーワードがアルファベットなので、各項は括弧で括るなど、スペース削除対策は必要です。

ただ、今回のプログラムでは、プログラムの引数に応じて3分岐(引数無し = そのまま。"-h" = HTML 出力。 "-e" = エスケープシーケンス出力)となるので別の方法を使いました。

Python では「論理式の評価結果である bool 型は int の派生型で、True = 1, False = 0 と評価出来る」という定義なので、この特徴を利用して以下のような形で処理しました。

(d,t)=(
  (引数無しの時にdに設定したい値, 引数無しの時にtに設定したい値)
  ("-e"の時にdに設定したい値, "-e"の時にtに設定したい値)
  ("-h"の時にdに設定したい値, "-h"の時にtに設定したい値)
)[(v=="-h")*2+(v=="-e")];

条件によって変わる所を変数化しておいて、その変数をテーブル引きで一気に設定するわけです。

(なお、最終プログラム中では d は色換えコードの出力関数(lambda式)で、t は先頭に静的に出力するヘッダ文字列です。)

文字列の色付け加工処理

文字列に色を付ける際には、文字の色を付けたい所に適宜、HTMLタグなりエスケープシーケンスなりを埋めるわけですが、このような加工処理は関数型言語(Lisp とか)みたく、lambda式によるフィルタを行うつもりで書くと、大抵簡潔なワンライナーになります。

たとえば、こんな関数を定義してやれば、

f=lambda(l,r),(p,m):(l+m+r[:p],r[p:])

これは「タプル(l,r)に対して、lの後ろにmを足したうえで、rの先頭c文字をlに移す」というフィルタ関数なので、

>>> f(("","testing"),(3,"+"))
('+tes', 'ting')
>>> f(f(("","testing"),(3,"+")),(2,"-"))
('+tes-ti', 'ng')

という風に多段に適用することで、文字を少しずつ加工しながら進めていけますし、さらにタプルのかけ算と reduce 関数を使えば

>>> reduce(f,((3,"+"),(2,"-"))*4,("","sample long string"))[0]
'+sam-pl+e l-on+g s-tr+ing-'

と、繰り返しの加工処理も簡単に書けてしまいます。


仕上げ

変数qに包まれていない最初と最後のコードは配置の制約が厳しいので、先に調整しておいて、それ以外の場所は適当なまま一通りの機能を実装させたのがコチラです。

\
        q='''
s=chr(32);
n=chr(10);
y=chr(92);
l=chr(60);
v=(__import__('sys').argv+[0])[1];
h=v=='-h';
e=v=='-e';
g=lambda(n):chr(27)+'[%sm'%(n+30*(n!=0));
d,t=(
(lambda(n):'',''),
(g,''),
(lambda(n):l+'font'+s+'color=%s>'%['black','red','green','yellow','blue'][n],
 l+'pre>')
)[h*2+e];
f=lambda(l,r),(p,m):(l+d(m)+r[:p],r[p:]);
r=y+n+s*8+'q='+"'"*3+q+"'"*3+";"+s*6+"8;"+y+n+s*46+"exec"+s*6+"(''"+n+s*47+"+''.join(q."+n+s*48+"split()))";
r=reduce(f,[(2,0)]+[(20,4),(14,1),(13,3),(11,4),(7,2),(11,1),(3,0)]*15+[(999,4),(0,0)],('',r))[0];
print(t+r);
                                              ''';      8;\
                                              exec      (''
                                               +''.join(q.
                                                split()))

この状態で quine としては成立していますが、まだ微妙に文字数をオーバーしているので、変数をインライン展開したり、細かな表現をかえたりして文字数内に収め、整形すれば完成です。

このプログラムで、これまでの所で説明していない細かい点を挙げるとすると

  1. argvの長さチェックは大変なので、末尾にゴミを足してチェックを不要に
  2. スペース、改行、バックスラッシュ、HTMLタグの不等号などは、そのまま書くわけには行かないので、chr 関数で生成
  3. 文字定数にかけ算で繰り返しを表現(スペースを沢山並べるのに使用)

ぐらいでしょうか。

こうして眺めれば、大したことはやっていないのが判ると思います。

python 3 だとどうなる?

ちなみに、このコードは python 3.x では動作しません。

python 3 と聞くと print 文の関数化あたりを思い出す方も多いと思いますが、このプログラムではスペース文字削除の都合で、print文の引数を括弧で括っているので、そのままで OK です。

じゃぁどこが問題か、というと、Python 3 では reduce がビルトイン関数では無くなったのと、lambda式の仮引数でタプルをアンパックしてくれなくなった所が原因です。

reduce は import すれば使えるとはいえ、文字数がかさんでしまいますし、lambda式の方はスペース文字の削除が不可能になってしまうため、根本的に考えなおす必要があります。

言い換えると、この辺りの機能は読みやすさを信条とする python として、公式に邪教認定されたという事なので、真似して使ったりしないようにしましょう(笑)。

なぜ制限が厳しいのに機能を増やせたのか?

python は ruby にくらべると、このような曲芸コードを書くのには向かない、文字効率が悪くて制限の多い言語なわけですが、rubyで書かれた元ネタよりも python で書かれたこちらの方が高機能な物が書けてしまいました。

それはなぜか。

それは、整形の処理に対するコードの方針が大きく異なったからです。

元ネタの方はTシャツに印字された物を手打ちしてもきちんと動作しますが、私の物の方はスペースを厳密に入力しないとおかしくなるので、印刷されたスペース文字を心眼で読める人以外はTシャツからの再現は出来ません。元ネタの方はこの弱さを嫌ってわざと面倒な方針をとったのではないかと考えられます。

元ネタの解説でも、このブログのエントリでも、その部分は詳しくは書いていませんので、気になった方はそれぞれのコードを解析してみるのも良いかと思います。

といったところで、今回のエントリはこれまで。たぶん年内はこれが最後の投稿になります。皆様よいお年を。

2011-10-07

Google Code Jam 向け python template

明日は Google Code Jam Japan 2011 本戦です。

予選はなんとか突破できたので、Tシャツ目指してもうひと頑張りしてみます。

予選の前日にpython 向けテンプレートを公開しましたが、いざ予選で使ってみると、若干不満が残ったので修正してみました。修正箇所は以下の通りです。

  1. 便利なライブラリを幾つか import
  2. "-m" でマルチプロセス処理時に Ctrl + C で止められるようにした
  3. "-q" を付けると、print 文による画面出力を抑制できるようにした
  4. solve_xxx のような名前で複数の solver を実装しておいて、"-c"を付けて実行すると、それぞれの solver の出力を比較して相違が無いかを確認してくれるようにした

最後の奴は、「とりあえず small を力技で解く」とかやってしまった場合用です。

この場合、作りなおすなりリファクタリングするなりして、Large に耐える解答を作らなければならないわけですが、smallを正解できた solver を solver_small とでも rename しておけば、作りなおした関数が間違っていないかは -c を付けるだけで確認できるわけです。

まあ、いろいろ小細工を足してみましたが、最初から素直に正しい解答が書ける人なら無用の長物ばかりですね。orz

#!/usr/bin/env python2.7

import getopt
from collections import Counter, namedtuple
from itertools import *
import operator
import multiprocessing
import os.path
from random import *
import sys
from copy import deepcopy

# utils
def read_numbers():
    return map(int, infile.readline().split())
def read_string():
    return infile.readline().rstrip('\r\n')
def list2str(*array):
    return ' '.join(map(str,array))

############################################################

def read_precond():
    # NoTC is number of testcases.  others are preconditions.
    (NoTC,) = read_numbers()
    return locals()

# testcase parser
def read_testcase():
    # X,Y = read_numbers()
    # Ls = read_string()
    # Coff = namedtuple("Coff", ["x_id", "count", "limit", "satisfy"])
    #  for i in xrange(N):
    #    coffs.append(Coff(*([i,] + read_numbers())))
    # del i,x, y
    return locals()

# solver
def solve(Ls,**rest):
    # TODO: modify args to receive input data

    # idioms:
    #  Ls = [a*a for x in range(3,10,2)]
    #  map(lambda a,b: a*b, Ls, Ts)
    #  filter(lambda a: a!=1, Ls)
    #  product(range(1,X+1), range(1,Y+1))
    #  reduce(lambda s,a: s*2+a, range(3,5), 0)
    #  seq.sort(key=operator.attrgetter("key"), reverse=True)
    #  seq.sort(key=lambda a: (a.key, d[a]), reverse=True)
    return list2str(rest)

############################################################
# random testcase
def rand_testcase():
    raise NotImplementedError, "Please implement rand_testcase() by yourself."
    #N = randint(5,1000)
    #N = randrange(5,1000+1,1)
    #chars = [chr(x) for x in range(ord('a'), ord('z')+1)] + [' ']
    #Ls = [choice(chars) for x in range(1000)]
    #Ds = sample(range(10), 2)
    #del chars, x
    return locals() 

############################################################
#
#  Note:
#    Following lines are come from the template
#    which is distributed on the blog article:
#      http://d.hatena.ne.jp/over80/20111007/1318002963
#
#    There are no problem specific codes from here to the end of file.
#

opt_single = True
opt_random = 0
opt_call_all_solvers = False
opt_quiet = False

try:
    opts, args = getopt.gnu_getopt(sys.argv[1:], "mr:cq")
    for o, a in opts:
        if o == '-m': opt_single = False
        elif o == '-r': opt_random = int(a)
        elif o == '-c': opt_call_all_solvers = True
        elif o == '-q': opt_quiet = True
        else: raise Exception("unknown option(%s)" % (o,))
    outfilename = None
    if not opt_random:
        infilename = args[0]
        outfilename = os.path.splitext(infilename)[0] + ".out"
        infile = open(infilename, 'r')
except Exception, e:
    print >>sys.stderr, e
    print >>sys.stderr, 'Usage: %s [-c] [-q] [-m] <inputfile>' % (sys.argv[0])
    print >>sys.stderr, '       %s [-c] [-q] [-m] -r num' % (sys.argv[0])
    exit(1);

if opt_random:
    precond = dict()
    num_tests = opt_random
    get_testcase = rand_testcase
else:
    precond = read_precond()
    num_tests = precond.pop("NoTC")
    get_testcase = read_testcase

# set output target
outfile = [sys.stdout,]
if outfilename != None and not opt_call_all_solvers:
    outfile.append(open(outfilename, 'w'))
if opt_quiet:
    sys.stdout = open('/dev/null', 'w')

# prepare testcase iterator
def read_testcases(num_tests):
    for i in range(num_tests):
        yield get_testcase()
testcases = read_testcases(num_tests)

# prepare solve iterator
def call_solve(q):
    d = dict(precond)
    d.update(q)
    if opt_call_all_solvers:
        res = set()
        for name, func in solvers:
            r = func(**deepcopy(d))
            print >>sys.stderr, "SOLVER:", name, "  \tANSWER:", r
            res.add(r)
        if len(res) != 1:
            raise RuntimeError, "ansers does not match"
        return r
    else:
        return solve(**d)
def call_solve_trap(q):
    try:
        return call_solve(q)
    except KeyboardInterrupt:
        pass
if opt_single:
    res = imap(call_solve, testcases)
else:
    p = multiprocessing.Pool()
    res = p.imap(call_solve, testcases)

# solve & print
for (i, r) in enumerate(res, start=1):
    for f in outfile:
        print >>f, "Case #%d: %s" % (i, str(r))

2011-10-06

アルゴリズム問題に効くPython小ネタ

今週末の Google Code Jam Japan 2011 の決勝に向けて練習でもしておきたい所ですが、時間がなかなか取れないので、代わりにテクニックを紹介するブログエントリを書くことにしました。(代わりになってない、という意見は却下です)

namedtuple

namedtuple は python の標準ライブラリとして同梱されているクラスの一つです。

from collections import namedtuple

名前付きタプルという名が表す通り、タプルなのですが、タプルの各要素を名前で参照出来るようになります。たとえば、

Point3D = namedtuple('Point3D', 'x y z')
Range = namedtuple('Range', 'start end')

こんな感じで型を宣言します。そうすると、

p = Point3D(1, 2, 3)
r = Range(start=10, end=30)

こんな風に実体化できます。

もちろん、タプルなので、要素には添字アクセスできますし、それに加えて名前でもアクセスできます。

a = p[0] + p[1] + p[2]
a = p.x + p.y + p.z

もちろんタプルは immutable なので辞書のキーにしたり、setで多重値を取り除いたり出来ます。

d = dict()
d[p] = a

s = set()
s.add(Point3D(1, 2, 3))
s.add(Point3D(1, 2, 3))
assert(len(s) == 1)

ちょっと難しい話になりますので詳細は省略しますが、この型のオブジェクトは通常のオブジェクトよりも軽量に作られているので、大量のデータがあっても素の tuple に引けをとらない効率を誇ります。

使いどころとしては、専用のメソッドを持たなくても良いような簡単な構造データを表現するのに丁度良いかんじでしょうか。

Code Jam では、問題の入力データが構造を持っている時に、それを読み込む先を個別の配列にせず、コレで構造化しておくと、コードがすっきりして読みやすくなるでしょう。

N[i], M[i] のかわりに、a = r[i]; r.begin, r.end などとするわけです。

任意の条件でソート

さて、その namedtuple で定義したデータの配列などを持っている時、それのソートはどうすれば良いでしょうか。

単に tuple のリストをソート系関数に渡すと、タプルの各要素を順に比較し、最初に差が出た要素の大小で比較した結果でソートされます。

>>> range_list = [(i % 3, i, i % 2) for i in range(5)]
>>> range_list
[(0, 0, 0), (1, 1, 1), (2, 2, 0), (0, 3, 1), (1, 4, 0)]
>>> sorted(range_list)
[(0, 0, 0), (0, 3, 1), (1, 1, 1), (1, 4, 0), (2, 2, 0)]

この動作を調整するには、配列の sort() や sorted() に cmp や key といった引数を渡すことで操作できます。特に、key の方は上記の動作が頭に入っているとすごく判りやすく書けます。

たとえば、タプルの3番目の要素でソート(同じ値なら現在の順番でそのまま)なら

>>> range_list
[(0, 0, 0), (1, 1, 1), (2, 2, 0), (0, 3, 1), (1, 4, 0)]
>>> sorted(range_list, key=lambda x: x[2])
[(0, 0, 0), (2, 2, 0), (1, 4, 0), (1, 1, 1), (0, 3, 1)]

タプルの3番目の要素でソートし、同じ値なら1番目の要素、さらに同じなら2番目でソートするなら、

>>> range_list
[(0, 0, 0), (1, 1, 1), (2, 2, 0), (0, 3, 1), (1, 4, 0)]
>>> sorted(range_list, key=lambda x: (x[2], x[0], x[1]))
[(0, 0, 0), (1, 4, 0), (2, 2, 0), (0, 3, 1), (1, 1, 1)]

3つの要素の積でソートするなら

>>> range_list
[(0, 0, 0), (1, 1, 1), (2, 2, 0), (0, 3, 1), (1, 4, 0)]
>>> sorted(range_list, key=lambda x: x[0] * x[1] * x[2])
[(0, 0, 0), (2, 2, 0), (0, 3, 1), (1, 4, 0), (1, 1, 1)]

と、ラムダ関数と組み合わせて、最短の記述で自在なソートが実現できます。

ソートはアルゴリズム問題の基本なので、覚えておいて損はありません。

とはいえ、競技プログラミングでは最小さえ判れば良くて heapq が使われたりするので、意外と活躍の場は少ないんですが。(私のように富豪プログラミング寄りなコードを書く人間は高々100個程度な要素なら毎回ソートも気にしません。)

2011-10-01

Code Jam Japan 2011 報告

昨日書いた Google Code Jam Japan 2011 の予選が先程おわりました。

問題サイズに対する見積りがあまくて、最初の2問の Large を速攻で落とすという痛い失態をかましてしまい、3問目だけは慎重に解いてなんとか正解して本戦に残ることが出来ました。

Large は提出期限が切れた後も解き続け、予選終了後の演習モードで再提出してみたら、ちゃんと正解できましたので、本番もそのくらい慎重にやれ、ってことですかね。

とりあえず、(ちゃんと解いたんだぞ、という証明代わりに)全3問の解答と解説をこちらに貼っておきます。(あ、昨日貼ったテンプレートを使用しているので、実際に問題を解いているのは solve 関数の中だけですよ。)

あとで解いてみようと思っている人は、以下は読まないように注意!

(2011-10-02 追記: ちゃんとデキる人が書いてくれている解説を見つけたので、参考にする人はそちらをどうぞ → 不確定な Android Blog - Google Code Jam Japan 2011 予選終了 )

A. カードシャッフル

シャッフルマシーンが特定のパターンでカードをシャッフルした時のカード順を計算する問題。

カードの配列を持って、指定通りに並びかえれば、small は解けますが、Large だと配列のサイズが巨大になって扱いきれません。

問題の数値配分だと、カードの山は十分に切れず、連番数字があちこちにあるままになりますので、連番数字の部分を短縮して表現することで扱いやすくしています。

具体的には、連番部分を「(先頭の数字,続いている枚数)」で表し、カード山全体はその配列で表しました。

例えば 1〜100 のカードは

[ (1,100) ]

70〜100, 1〜69 と並んだカードは

[ (70,31), (1,69) ]

といった具合です。

あとは、このデータ型のまま「カードを切る」処理を実装してやればOK。

カードのカット回数は高々100回なので、配列の長さは最大でも 201 になります。Small と Large で変わるのはカードの総数だけなので、この形式で処理してやればは処理時間は同じです。

#!/usr/bin/env python2.6

import getopt
from collections import deque, defaultdict, namedtuple
from itertools import *
import multiprocessing
import os.path
from random import *
import sys

# utils
def read_numbers():
    return map(int, infile.readline().split())
def read_string():
    return infile.readline().rstrip('\r\n')
def list2str(*array):
    return ' '.join(map(str,array))

############################################################

def read_precond():
    # NoTC is number of testcases.  others are preconditions.
    (NoTC,) = read_numbers()
    return locals()

# testcase parser
def read_testcase():
    (M,C,W) = read_numbers()
    As = []
    Bs = []
    for c in range(C):
        (a,b) = read_numbers()
        As.append(a)
        Bs.append(b)
    del a, b
    return locals()

CardSequence = namedtuple('CardSequence', ['start', 'count'])

# solver
def solve(M,C,W,As,Bs,**rest):
    cards = (CardSequence(1,M),)
    for c in xrange(C):
        cards = cut(cards, As[c], Bs[c])

    for start,count in cards:
        if W <= count:
            return str(start + W - 1)
        else:
            W -= count 
            continue

def split_single(card, p):
    a, b = card
    if p == 0:
        return ((), (card,))
    elif p == b:
        return ((card,), ())
    else:
        return ((CardSequence(a,p),),(CardSequence(a+p,b-p),))

def split(cards, p):
    s = 0
    for i in xrange(len(cards)):
        if s + cards[i].count == p:
            return (cards[:i+1], cards[i+1:])
        if s + cards[i].count > p: 
            h1,h2 = split_single(cards[i], p - s)
            return (cards[:i] + h1, h2 + cards[i+1:])
        s += cards[i].count

def cut(cards, a, b):
    g1, g2 = split(cards, a - 1)
    g2, g3 = split(g2, b)
    return g2 + g1 + g3


############################################################
# random testcase
def rand_testcase():
    raise NotImplementedError, "Please implement rand_testcase() by yourself."
    #chars = [chr(x) for x in range(ord('a'), ord('z')+1)] + [' ']
    #Ls = [choice(chars) for x in range(1000)]
    #del chars, x
    return locals() 

############################################################

opt_single = True
opt_random = 0

try:
    opts, args = getopt.gnu_getopt(sys.argv[1:], "mr:")
    for o, a in opts:
        if o == '-m':
            opt_single = False
        elif o == '-r':
            opt_random = int(a)
        else:
            raise Exception("unknown option(%s)" % (o,))
    if opt_random:
        outfile = (sys.stdout,)
    else:
        infilename = args[0]
        outfilename = os.path.splitext(infilename)[0] + ".out"
        infile = open(infilename, 'r')
        outfile = (sys.stdout, open(outfilename, 'w'))
except Exception, e:
    print >>sys.stderr, e
    print >>sys.stderr, 'Usage: %s [-m] <inputfile>' % (sys.argv[0])
    print >>sys.stderr, '       %s -r num' % (sys.argv[0])
    exit(1);

if opt_random:
    num_tests = opt_random
    get_testcase = rand_testcase
else:
    precond = read_precond()
    num_tests = precond.pop("NoTC")
    get_testcase = read_testcase

# prepare testcase iterator
def read_testcases(num_tests):
    for i in range(num_tests):
        yield get_testcase()
testcases = read_testcases(num_tests)

# prepare solve iterator
def call_solve(q):
    d = dict(precond)
    d.update(q)
    return solve(**d)
if opt_single:
    res = imap(call_solve, testcases)
else:
    p = multiprocessing.Pool()
    res = p.imap(call_solve, testcases)

# solve & print
for (i, r) in enumerate(res, start=1):
    for f in outfile:
        print >>f, "Case #%d: %s" % (i, str(r))

B. 最高のコーヒー

複数種のコーヒーの賞味期限と残量と満足度を天秤にかけながら、最も満足度の総量が多くなる飲み方を求める、という問題で、多分3問中一番難しい問題です。

単に先に満足度の高い物を飲んでしまうと、次点の美味しいコーヒーの賞味期限が切れてしまいますので、解答のポイントは「最終日から逆順で飲むコーヒーを決めていくこと」になります。最終日まで賞味期限がもつ物の中で最も美味しい物をその日に飲むことにして予約し、その前日はそれを除いた残りから選んで……とくりかえせば良いわけです。

また、Large では日付けが異様に長いものがあるので、1日1日ずつ処理すると間に合いません。最終日に飲むコーヒーを決めたら、そのコーヒーは「いつからいつまで飲むのか」を数値で計算して、日付を飛ばしてやる必要があります。コーヒーの残り個数や次点のコーヒーの賞味期限などが複雑にからむので、ミスしやすく、気付きにくい所ですので、「BのLargeは提出したけど、不正解だった」って人も少くないかもしれません。

#!/usr/bin/env python2.7

import getopt
from collections import Counter, namedtuple
from itertools import *
import operator
import multiprocessing
import os.path
from random import *
import sys
from timeit import Timer
from copy import deepcopy

# utils
def read_numbers():
    return map(int, infile.readline().split())
def read_string():
    return infile.readline().rstrip('\r\n')
def list2str(*array):
    return ' '.join(map(str,array))

############################################################

def read_precond():
    # NoTC is number of testcases.  others are preconditions.
    (NoTC,) = read_numbers()
    return locals()

Coffs = namedtuple("Coffs", ["x_id", "count", "limit", "satisfy"])

# testcase parser
def read_testcase():
    (N, K) = read_numbers()
    cs = []
    ts = []
    ss = []
    coffs = []
    for i in range(N):
        (c,t,s) = read_numbers()
        cs.append(c)
        ts.append(t)
        ss.append(s)
        coffs.append(Coffs(i,c,t,s))
        #coffs.append(Coffs(*([i,] + read_numbers())))
    del i,c,t,s
    return locals()

# solver
def solve(N,K,coffs,**rest):
    left = Counter()
    for coff in coffs:
        left[coff] = coff.count

    total = 0
    k = K
    while k > 0:
        coffs.sort(key=lambda a: (min(a.limit,k), a.satisfy), reverse=True)
        if k > coffs[0].limit:
            k = coffs[0].limit
        # find next
        next_date = 0
        for coff in coffs:
            if coff.satisfy > coffs[0].satisfy:
                next_date = coff.limit
                break

        can_have = min(left[coffs[0]], k - next_date)

        #print total, next_date, coffs,k,can_have,left
        assert(can_have > 0)

        left[coffs[0]] -= can_have
        k -= can_have
        total += coffs[0].satisfy * can_have
        
        if left[coffs[0]] == 0:
            coffs.pop(0)
            if len(coffs) == 0:
                break

    return str(total)

def solve_small(N,K,cs,ts,ss,**rest):
    s_sort = sorted([(-ss[i],i) for i in range(N)])
    s_sort = [s_sort[i][1] for i in range(N)]
    total = 0
    for k in xrange(K,0,-1):
        for s in s_sort:
            if ts[s] >= k and cs[s] > 0:
                print "select: " + str(s)
                total += ss[s]
                cs[s] -= 1
                break
    return str(total)

solvers = filter(lambda a: a[0].startswith("solve"), globals().items())
solvers.sort()

############################################################
# random testcase
def rand_testcase():
    raise NotImplementedError, "Please implement rand_testcase() by yourself."
    #chars = [chr(x) for x in range(ord('a'), ord('z')+1)] + [' ']
    #Ls = [choice(chars) for x in range(1000)]
    #del chars, x
    return locals() 

############################################################

opt_single = True
opt_random = 0
opt_call_all_solvers = False

try:
    opts, args = getopt.gnu_getopt(sys.argv[1:], "mr:c")
    for o, a in opts:
        if o == '-m':
            opt_single = False
        elif o == '-r':
            opt_random = int(a)
        elif o == '-c':
            opt_call_all_solvers = True
        else:
            raise Exception("unknown option(%s)" % (o,))
    outfilename = None
    if not opt_random:
        infilename = args[0]
        outfilename = os.path.splitext(infilename)[0] + ".out"
        infile = open(infilename, 'r')
except Exception, e:
    print >>sys.stderr, e
    print >>sys.stderr, 'Usage: %s [-c] [-m] <inputfile>' % (sys.argv[0])
    print >>sys.stderr, '       %s [-c] -r num' % (sys.argv[0])
    exit(1);

if opt_random:
    num_tests = opt_random
    get_testcase = rand_testcase
else:
    precond = read_precond()
    num_tests = precond.pop("NoTC")
    get_testcase = read_testcase

# prepare testcase iterator
def read_testcases(num_tests):
    for i in range(num_tests):
        yield get_testcase()
testcases = read_testcases(num_tests)

# prepare solve iterator
def call_solve(q):
    d = dict(precond)
    d.update(q)
    if opt_call_all_solvers:
        res = dict()
        for name, func in solvers:
            res[name] = func(**deepcopy(d))
            print "SOLVER:", name, "  \tANSWER:", res[name]
        res = res.values()
        for r in res[1:]:
            if res[0] != r:
                raise RuntimeError, "ansers does not match"
        return res[0]
    else:
        return solve(**d)
if opt_single:
    res = imap(call_solve, testcases)
else:
    p = multiprocessing.Pool()
    res = p.imap(call_solve, testcases)

# open output file
if outfilename != None and not opt_call_all_solvers:
    outfile = (sys.stdout, open(outfilename, 'w'))
else:
    outfile = (sys.stdout,)

# solve & print
for (i, r) in enumerate(res, start=1):
    for f in outfile:
        print >>f, "Case #%d: %s" % (i, str(r))

C. ビット数

これは、与えられた数字 N を N = a + b の形に分解して、a と b を2進数表現した時の 1 のビットの数の合計が最大になる物を求める、という問題です。カンが働けば、コーディングは一番簡単でしょう。

紙に幾つか数字を書いて確認してみたところ、だいたいこんな感じで a と b を作ると、1が最大になるようでした。

N:  100100011001111
-------------------
a:   11111111111111
b:     100011010000

N の下から1が連続している部分と、それより上の部分に分けたうえで、a,bを眺めると、特徴がわかりますよね。なぜこうなるかは、Nの各桁の数字と、a+bの各桁の繰り上がりに注目すればわかるかと思います。

あとはこの数を数えるだけです。(下の解答は若干まわりくどい数えかたをしてしまっています。)

#!/usr/bin/env python2.6

import getopt
from itertools import *
import multiprocessing
import os.path
from random import *
import sys

# utils
def read_numbers():
    return map(int, infile.readline().split())
def read_string():
    return infile.readline().rstrip('\r\n')
def list2str(*array):
    return ' '.join(map(str,array))

############################################################

def read_precond():
    # NoTC is number of testcases.  others are preconditions.
    (NoTC,) = read_numbers()
    return locals()

# testcase parser
def read_testcase():
    (N,) = read_numbers()
    return locals()

# solver
def solve(N,**rest):
    b = list(format(N, 'b'))
    s = 0
    while len(b) > 0:
        if b[-1] == '1':
            s += 1
            b.pop()
        else:
            break
    if len(b) > 0:
        for x in b:
            if x == '1':
                s += 2
            else:
                s += 1
        s -= 1
    return str(s)

############################################################
# random testcase
def rand_testcase():
    raise NotImplementedError, "Please implement rand_testcase() by yourself."
    #chars = [chr(x) for x in range(ord('a'), ord('z')+1)] + [' ']
    #Ls = [choice(chars) for x in range(1000)]
    #del chars, x
    return locals() 

############################################################

opt_single = True
opt_random = 0

try:
    opts, args = getopt.gnu_getopt(sys.argv[1:], "mr:")
    for o, a in opts:
        if o == '-m':
            opt_single = False
        elif o == '-r':
            opt_random = int(a)
        else:
            raise Exception("unknown option(%s)" % (o,))
    if opt_random:
        outfile = (sys.stdout,)
    else:
        infilename = args[0]
        outfilename = os.path.splitext(infilename)[0] + ".out"
        infile = open(infilename, 'r')
        outfile = (sys.stdout, open(outfilename, 'w'))
except Exception, e:
    print >>sys.stderr, e
    print >>sys.stderr, 'Usage: %s [-m] <inputfile>' % (sys.argv[0])
    print >>sys.stderr, '       %s -r num' % (sys.argv[0])
    exit(1);

if opt_random:
    num_tests = opt_random
    get_testcase = rand_testcase
else:
    precond = read_precond()
    num_tests = precond.pop("NoTC")
    get_testcase = read_testcase

# prepare testcase iterator
def read_testcases(num_tests):
    for i in range(num_tests):
        yield get_testcase()
testcases = read_testcases(num_tests)

# prepare solve iterator
def call_solve(q):
    d = dict(precond)
    d.update(q)
    return solve(**d)
if opt_single:
    res = imap(call_solve, testcases)
else:
    p = multiprocessing.Pool()
    res = p.imap(call_solve, testcases)

# solve & print
for (i, r) in enumerate(res, start=1):
    for f in outfile:
        print >>f, "Case #%d: %s" % (i, str(r))

2011-09-30

Google Code Jam Japan 2011 明日開催

ご無沙汰してます。Over80です。

最後にこのブログにエントリを書いたのは3月頭ですから、半年ぶりの更新になります。

その間の事に関しては、書くと愚痴が多くなってしまうのでやめておきますが、震災の影響はいろいろと大きかったことと、私は直接被災はしておらず無事であること、あと、震災に遭われた方々へお見舞い申しあげます、とだけは言っておきます。

さて、今日の本題は、明日に開催をひかえた Google Code Jam Japan 2011 と、それに向けた Python の自作テンプレートの紹介です。

Google Code Jam Japan 2011 とは?

簡単に言うと、競技プログラミングコンテストで、明日午後1時にWebで公開される問題(複数)に対して、それを解くプログラムを作成して提出すればポイントゲット。沢山、早く解いた人が勝ち。みたいなコンテストです。

問題は比較的典型的なアルゴリズム問題(たとえば「迷路の最短経路を答える」のような)で、Webから問題データ(たとえば「迷路を表現したテキストデータ」)をダウンロードでき、それを作ったプログラムで解いて、答えのデータをアップロードすることで回答になります。

類似のコンテストと比べると以下のような個性的な特徴があります。

  • 予選から決勝までオンラインで開催される。自分の慣れたPCで集中して解けば良い。
  • プログラミング言語や実行環境に(事実上)制限が無い。事務局や他の参加者が確認できるよう、フリーで実行環境が入手可能なもの、という制限があるだけで、VisualStudio も Express があるので OK となっている。あまつさえ、表計算ソフトのような物で解くことも認められている。("プログラム"の代わりに、そのソフトをどう使ったかを説明するテキストの提出が求められますが。)
  • 今回の Japan 2011 は、日本語で開催される。大抵の大会(通常の google code jamも含む)は英語で開催され、問題文も英語なので、読むのに時間がかかる所が最初からハンデだったりする。

詳しくは Google Code Jam Japan 2011 のサイト を参照してください。(私の説明には誤謬があるかもしれません。)

上位入賞者には賞金や景品がありますので、気楽に参加してみてはいかがでしょうか?

日本語の例題は既に Code Jam Japan のサイトに上がっていますし、英語で良ければ過去問はこちらにあります。試しに解いてみるだけでも勉強になるかもしれません。

Python のテンプレート

さて、私もこれに参加してみようかと思い、過去の Google Code Jam の問題を幾つか解いてみたのですが、これ、問題の入力や出力がある程度定型フォーマット化していますので、予め parsing するコードなどは作成しておく事をおすすめします。

とりあえず私はこんな感じのコードを書いてみました。

これは Code Jam 2009, Qualification Round の Alien Language という問題 に対する回答です。

#!/usr/bin/env python2.6

import getopt
from itertools import *
import multiprocessing
import os.path
from random import *
import sys

# utils
def read_numbers():
    return map(int, infile.readline().split())
def read_string():
    return infile.readline().rstrip('\r\n')
def list2str(*array):
    return ' '.join(map(str,array))

############################################################

# random testcase
def rand_testcase():
    chars = [chr(x) for x in range(ord('a'), ord('z')+1)] + [' ']
    Ls = [choice(chars) for x in range(1000)]
    del chars, x
    return locals() 

# pre-condition parser
def read_precond():
    (L,D,N,) = read_numbers()
    Ds = set()
    for d in range(D):
        Ds.add(read_string())
    
    return (N, dict(Ds=Ds))

# testcase parser
def read_testcase():
    s = read_string()
    pat = []
    cs = None
    for c in s:
        if c == '(':
            cs = []
        elif c == ')':
            pat.append(cs)
            cs = None
        else:
            if cs != None:
                cs.append(c)
            else:
                pat.append([c])
    print pat
    return dict(pat=pat)

# solver
def solve(Ds, pat):
    i = 0
    for p in pat:
        Ds = filter(lambda a: a[i] in p, Ds) 
        i += 1
    return len(Ds)


############################################################

opt_single = True
opt_random = 0

try:
    opts, args = getopt.gnu_getopt(sys.argv[1:], "mr:")
    for o, a in opts:
        if o == '-m':
            opt_single = False
        elif o == '-r':
            opt_random = int(a)
        else:
            raise Exception("unknown option(%s)" % (o,))
    if opt_random:
        outfile = (sys.stdout,)
    else:
        infilename = args[0]
        outfilename = os.path.splitext(infilename)[0] + ".out"
        infile = open(infilename, 'r')
        outfile = (sys.stdout, open(outfilename, 'w'))
except Exception, e:
    print >>sys.stderr, e
    print >>sys.stderr, 'Usage: %s [-m] <inputfile>' % (sys.argv[0])
    print >>sys.stderr, '       %s -r num' % (sys.argv[0])
    exit(1);

if opt_random:
    num_tests = opt_random
    get_testcase = rand_testcase
else:
    (num_tests, precond) = read_precond()
    get_testcase = read_testcase

# prepare testcase iterator
def read_testcases(num_tests):
    for i in range(num_tests):
        yield get_testcase()
testcases = read_testcases(num_tests)

# prepare solve iterator
def call_solve(q):
    d = dict(precond)
    d.update(q)
    return solve(**d)
if opt_single:
    res = imap(call_solve, testcases)
else:
    p = multiprocessing.Pool()
    res = p.imap(call_solve, testcases)

# solve & print
for (i, r) in enumerate(res, start=1):
    for f in outfile:
        print >>f, "Case #%d: %s" % (i, str(r))

ちょっと長いですが、本番で問題を解く場合には上記のコードのうち中程にある、 read_precond (入力データの先頭にあるヘッダの読み込み) と read_testcase (個々のテストケースの読み込み) と solve (与えられたテストケースを解く) の 3 つの関数の中身を埋めるだけで良いようになっています。上記の例では高々30行です。

パースや整形出力は極力楽に出来るようデザインしています。この問題の場合、テストケースのフォーマットが特殊なので解析のコードが若干長くなっていますが、入力データが数字だけの場合は、

def read_testcase():
   Ds = read_numbers()
   return locals()

のように簡潔な記述ができますし、問題を解く solve も、引数で与えられたテストケースを解き、答えを return するだけで良く、問題に集中できるようにしてあります。

プログラムには引数として入力ファイルを与えて実行します。結果は入力ファイルの拡張子を .out に変更したファイルに、自動的に定型フォーマットに整形した形で出力されますので、そのまま提出できます。デバッグ情報を print しても、出力ファイルが汚される事はありません。

また、"-m"オプションを付けて実行するだけで、自動でマルチプロセスによる並列計算を行いますので、マルチコアCPUでもプロセッサを無駄にしません。さらに、問題に応じて rand_testcase 関数としてランダム問題ジェネレータを自分で実装すれば、入力ファイル無しで実行テストが出来ますので、Large のデータで回答するまえにパフォーマンスが十分かどうかの確認に使えます。

itertools をフルでインポートしてたりする辺りも密かにポイントです。このてのアルゴリズム問題を解くのに便利なツールが大量に詰まっているモジュールです。


とまぁ、いろいろ無駄に詰め込んだスケルトンコードなのですが、実はコレを作成したのは半年前、まさに震災の頃で、その時にこれを作って以降、練習はおろか、python すら触っていないので、明日の朝にかるくリハビリしてから予選に望もうかと思います。

(2011/10/08 追記: テンプレートを拡張しました → Google Code Jam 向け python template - 玉虫色に染まれ!)

2010-04-14

Pythonでネットラジオプレイヤーを書いてみた

今日のエントリは、Python + GStreamer + GTK2 + libnotify の組み合わせでネットラジオプレイヤーを作ってみましたので、そのソースコード公開&参考サイトメモです。

完成形

こちらが動作時の画面キャプチャです。

f:id:over80:20100414234635p:image

起動するとタスクバーのトレイ部分にアイコンが増え、自動でラジオの再生が始まります。アイコンの左クリックで停止/再生のトグル切り替え、右クリックでメニューが表示され別のラジオ局に切り替えたり出来ます。

ラジオからの曲情報は、曲が切り替わったタイミングでバルーン表示されますし、アイコンにマウスカーソルを合わせるとTipとしても表示されます。

ソースコード

使用したプログラミング言語は Python で 120 行ほどのコードになりました。

ライブラリとしては、音楽再生にGStreamer、トレイのステータスアイコンやメニュー表示に GTK2、曲名のバルーン通知に libnotify を、それぞれPythonバインディング経由で使用しています。

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import pygst
pygst.require('0.10')
import gst
import gtk
import os
import pynotify

os.putenv("GST_TAG_ENCODING", "CP932");

class NetRadioPlayer(object):

    def __init__(self, stations):

        self._stations = stations

        self.menu = gtk.Menu()
        def add_image_menu(stock, func):
            item = gtk.ImageMenuItem(stock)
            item.connect("activate", lambda event: func())
            self.menu.append(item)
        add_image_menu(gtk.STOCK_MEDIA_STOP, self.stop)
        for (name, uri) in stations:
            def make_handler(uri):
                return lambda event: (self.change(uri), self.play())
            item = gtk.MenuItem(name)
            item.connect("activate", make_handler(uri))
            self.menu.append(item)
        self.menu.append(gtk.SeparatorMenuItem())
        add_image_menu(gtk.STOCK_QUIT, self.quit)
        self.menu.show_all()

        self.bin = gst.element_factory_make("playbin2")
        self.pipeline = gst.Pipeline()
        self.pipeline.add(self.bin)

        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self.on_message)

        icon = gtk.StatusIcon()
        icon.connect('activate', self.on_icon_activate)
        icon.connect('popup-menu', self.on_icon_popup_menu)
        self.icon = icon

        if not pynotify.init("NetRadioPlayer"):
            print "failed to init pynotify"
            sys.exit(0)
        self._notify = pynotify.Notification("-", "-")
        self._notify.attach_to_status_icon(icon)

        self.change(stations[0][1])
        self.stop()

    def change(self, uri):
        if self.bin.props.uri != uri:
            self.stop()
            self.bin.props.uri = uri
            self._tags = gst.TagList()

    def play(self):
        self.pipeline.set_state(gst.STATE_PLAYING)
        self.icon.props.stock = gtk.STOCK_MEDIA_PLAY

    def stop(self):
        self.pipeline.set_state(gst.STATE_NULL)
        self.pipeline.get_state() # this will block until state changed
        self.icon.props.stock = gtk.STOCK_MEDIA_STOP

    def quit(self):
        self.stop()
        gtk.main_quit()

    def on_icon_activate(self, event):
        if self.pipeline.get_state()[1] == gst.STATE_PLAYING:
            self.stop()
        else:
            self.play()

    def on_icon_popup_menu(self, status, button, time):
        self.menu.popup(None, None, None, button, time)

    def notify(self):
        try:
            song = self._tags["title"]
            station = self._tags["organization"] + " " + self._tags["genre"]
            self._notify.update(song, station)
            if not self._notify.show():
                pass # FIXME
            tooltip = '<big>%s</big>\n%s' % (song, station)
            self.icon.props.tooltip_markup = tooltip
        except KeyError:
            pass

    def on_message(self, bus, message):
        t = message.type
        if t == gst.MESSAGE_EOS or t == gst.MESSAGE_ERROR:
            self.stop()
        elif t == gst.MESSAGE_TAG:
            tags = message.parse_tag()
            self._tags = self._tags.merge(tags, gst.TAG_MERGE_REPLACE)
            self.notify()

if __name__ == '__main__':
    gtk.gdk.threads_init()

    p = NetRadioPlayer([
        (u"BLACK ANGEL 同人音楽 東方SIDE",
         "http://std1.ladio.net:8070/doujin_tou.mp3"),
        (u"初音ミクたれ流し",
         "http://std2.ladio.net:8120/mikumikutare"),
    ])
    p.play()

    gtk.main()

Pythonに不慣れな人間が書いたコードなので、サンプルとしては不向きかもしれませんが、似たようなことをしようとしている人には参考になるかもしれません。

コードの解説と参考サイト

実は、コードの半分はメニュー対応のためのコードで、右クリックメニューのサポートを追加する前のコードは70行ほどでした。メニューアイテムを1つ1つ個別に作成してますし、項目ごとのハンドラも作らなきゃいけないので、どうしてもコードがかさんでしまうんですよね。

ラジオ局情報とタグの文字コード

ラジオ局情報はスクリプトの中に埋め込んでいます@コードの最後の方。これは私が聞く局が固定されているので問題無いのですが、そうではない人は、ねとらじなら ねとらじの技術情報のページ を参照して、ラジオ局リストを取得するコードを書けば良いんじゃないかと思います。

最初の方にある GST_TAG_ENCODING の指定は、以前書いたタグの文字化け問題の対策ですね。このスクリプトのプロセスの環境変数を直接いじってやることで、ユーザーの環境変数の設定に依存せず、Shift JIS を強制できます。タグの文字コードはラジオ局サーバーの設定次第なので、ラジオ局情報をコードに埋め込んでいるこのスクリプトの場合は、このように強制してしまって問題無いでしょう。

GStreamer

パイプライン構築は playbin2 に完全に任せているので、URIを指定したら STATE_PLAYING に移行させるだけです。

バスから来るメッセージは、エラー停止の時の念のための処理と、タグ情報が届いた時の処理を行っています。タグ情報はオフィシャルチュートリアルのタグの節にも書いてあるとおり、多数のエレメントがバラバラの情報を送出してきます。icecast系のネットラジオの場合オーディオのビットレートなどの情報の他、ラジオ局のタグ情報が再生開始時に1度送られ、再生開始時と曲の変わり目で曲名タグが送られて来ますので、タグ情報はためておいて、曲名情報が届いた時だけバルーン通知を上げるようにしています。

上記ソースコード上では、pipeline と bus と bin(playbin2) あたりのキーワードを拾って読めばだいたい GStreamer 関連のコードです。

gtk.StatusIcon と PyNotify

gtk.StatusIcon と pynotify(libnotify) の使い方は、こちらにかかれているものとほとんど同じです。

gtk.StatusIconとpynotifyをくっつける - conc7996のメモ

ただし、ネットラジオの場合、接続したタイミングがたまたま曲が切り替わる直前だった場合、普通に実装してしまうと2つの通知が連続して上がってしまって見苦しいので、バルーン表示中(or 表示前)だった場合は動的に上書きされるよう、update()を使っています。

ステータスアイコンにカーソルを合わせたときに出るチップに表示される文字列はアイコンの tooltip_markup プロパティで設定していますが、これは pango に渡って処理されますので pango のマークアップが使えます。マークアップの詳細はこちら:Text Attribute Markup: Pango Reference Manual

この手の情報を得るためのコツ

このページをググって見つけた方もいるかと思いますが、新しいライブラリを使えるようになる道はそれだけではありません。まともなライブラリなら公式ドキュメントが充実していることが多いので、そちらを積極的に活用してみてはいかがでしょうか。

まずは公式のチュートリアル(例えば PyGTK なら http://www.pygtk.org/pygtk2tutorial/index.html )を見て雰囲気を掴み、用語やクラス名、メソッド名などがなんとなく分かったら、あとはリファレンス (PyGTK は http://library.gnome.org/devel/pygtk/stable/ 、その前提知識の GObject は http://library.gnome.org/devel/pygobject/stable/ )を確認しながら自分のやりたいことを実現させる、というのが結局は近道なんじゃないかと思います。GStreamer も、公式サイトのドキュメントページ http://gstreamer.freedesktop.org/documentation/ にある "Application Development Manual" がチュートリアル的な文章で、これをざっと読めば概念が一通り理解できるようになります。

ググったりCookbookを漁ったりして見つける「サンプル」、読んで理解する「チュートリアル」、辞書としてつかう「リファレンス」、この3つはどれが欠けても苦労することになるでしょう。

まぁ、デスクトップ通知サービスのD-Busの仕様をそのまま見せている libnotify のようなライブラリには、まともなドキュメントが無いものもあったりするので油断はならないわけですが。