embulk-plugin-input-randomを作った

データベースを使って何かする際に、ダミーデータが超大量に欲しくなることがあるのでembulkのinput-pluginを作ってみた。

githubのリンク



何もない環境からなら

$ wget https://bintray.com/artifact/download/embulk/maven/embulk-0.2.1.jar -O embulk.jar

でembulk本体のが降ってくるのでそれを使って

$ java -jar embulk.jar gem install embulk-plugin-input-random

とすればプラグインが降ってくる。

exec: {}
in:
  type: random
  rows: 100
  schema:
    id: primary_key
    name: string
    score: integer
out:
  type: stdout

こんな感じのconfig.ymlを書いて

$ java -jar embulk.jar preview

と打てば

Random generation started.
Random generator input thread 0...
+---------+---------------------------------------------+------------+
| id:long |                                 name:string | score:long |
+---------+---------------------------------------------+------------+
|       0 | 9zfeb1fG7Ha3xxjA_UBUY3QdR1R3ltKYjADjYCthSVc |      6,180 |
|       1 | Ry68YnNi9mQQ-9EEtdftd9R21Z78dMZBDqc0IhNgI_M |      1,914 |
|       2 | DGZUeG961dILfRLx-sdcO6AJIvS6ulAj0qvffdR_uNo |      3,316 |
|       3 | spIhxVLfvd1tjDnbJZ2-nsITY8kZnL6mjpaDJ2tBf_M |      5,319 |
|       4 | ZwuViDwHONHCX47bv-tN7m48mdcr-G4d_laWBXscAUA |      5,767 |
|       5 | QM3H4QUvFSrqRAEUjbHRxnEGgSVUtfpMqh7scp-aiZk |        628 |
|       6 | oHzeKtNeULRxTn2_5SPWapSTPxTS6bTCPGZBTBKY_qo |      7,244 |
....

とこんな感じの物が画面に出てくる。
出力先を適当に書き換えてプラグインを用意すれば任意のデータベースにぶっこむ事ができる。

使い方としては以上なんだけど、これから作ろうと思っている機能についてもちょっと書く。
コンフィグの書き方として以下のような書き方ができるようにしていきたいという妄想。

in:
  type: random
  schema:
    - key: phone
      type:
        - japan-phone-number # 日本の電話番号っぽいものを生成する
    - key: name
      type:
        - japanese-human-name  # 日本人っぽい名前を生成する
        - english-human-name  # 英語圏っぽい名前を生成する
    - key: place
      type:
        - japan-prefecture  # 日本の都道府県をランダムに選ぶ
    - key: score
      type:
        random:  # 50から200までの一様分布の乱数を生成
          min: 50
          max: 200

こういう風に書いて

+---------------+-------------------+--------------+------------+
|  phone:string |       name:string | place:string | score:long |
+---------------+-------------------+--------------+------------+
|  03-1111-1111 |         佐藤 裕太 |       東京都 |        123 |
| 052-1234-5678 |         鈴木 健二 |       東京都 |         98 | 
| 090-2345-6789 | Michael Johonston |     神奈川県 |        169 |
|  03-3456-7890 |         井上 裕子 |       千葉県 |        112 |

こんな感じの物がランダム生成(全部完全に適当にランダムなので一切気にしないで)できるようになるといいかなと思っていて、次の余暇のプログラミングの時にでも作る。

ウェイトフリー性の証明について

気が向いたら書くと言ってしまったので忘れないうちに書く。

飢餓状態とは

  • スレッドAが操作xを試みる
  • スレッドBが操作yを試みる
  • スレッドCが操作zを試みる

という状況の時に、このアルゴリズムがLock-Freeである場合。すなわちスケジューラがどうスケジュールしても少なくともどれか1つのスレッドの操作が成功してしまう事が避けられない場合であってもスケジューラは特定のスレッドの操作だけを意図的に妨害する事は可能であることが多い。
例えば前回の記事のLock-Free Stackを例に話すと

  1. スレッドAがCAS(α→β)を試みようとする
  2. スケジューラがスレッドBのCASが成功するようスケジューリング
  3. 1〜2を繰り返し

こうすれば理論上スケジューラはスレッドAに無限クロックのCPUを食わせながらも、無限にスレッドAのCASを成功させない事が可能だ。
このような状況の事をStarvation(飢餓状態)と呼ぶ。
スケジューラが意図的に最悪のスケジューリングをしてくる環境下において「アルゴリズムが進行する」ことは必ずしも「全てのスレッドの1タスクが有限時間に終わる」事を意味しない*1

この飢餓状態が起きない、つまりスケジューラがどうあがいても有限のクロック数で全てのスレッドが目的を達成できる。という条件を満たす場合そのアルゴリズムWait-Freeと呼ばれる。
有限時間とはいえ、待つ事に変わりは無いのだからWait-Freeという名前は変だなと思っていたら 1974年のLamportの論文が初めてこのWait-Freeという言葉を使ったとTAoMP本に書いてあった。Lamport先生がそう言うんだから仕方ない。
ちなみに、有限のクロックで操作が完了できれば良いのだから「ただ単にメモリの値を読む」とか「ダイクストラ法で最小コストのパスを探す」「クイックソート」とかのシングルスレッドで解決できる問題をシングルスレッドで普通に書いている分にはそれはWait-Freeに分類される。

飢餓状態の回避方法

並行でWait-Freeで線形化可能*2なデータ構造の一般的な作り方というのはよく知られている。
universal constructionなどと呼ばれ、2010年のこの論文が手頃であるがもちろん読む必要はない*3

その一つの方法は簡単に言うとAnnounce Arrayという配列を用意して運用することである。
配列はスレッド数と同じだけの長さがあり、全てのスレッドは何かを操作する前に操作したい内容をその配列中の自分に割り当てられた場所に記述する。
全てのスレッドは自分の操作を実行する前にこの配列全体を調べ、既に自分以外のスレッドが操作を開始している場合にはそいつの操作を手伝う。
この手伝うというのが厄介で、自分自身の実行しようとした操作と全く同一の操作が自分以外のスレッドに1クロック前に達成されてしまう場合などもあるのでコードが非常に複雑になる。
詳しくは僕の魔導書に書かれたWait-Free Stackや、Wait-FreeなKP-Queueの論文あたりを読んでみると良い。
操作内容を書き込む歳にデータ構造の論理クロック*4を一緒に添えて置く事で、どの操作が最初に登録されたのか比較できるので、より古いものから優先的に実行できる。

証明の形

そうやって作ったWait-Freeな操作と、先ほどの「スレッド1の操作が未来永劫実行されなくなるよう配慮するがスレッド1にもCPU時間を割り当てないといけないスケジューラ」が対立した場合*5の最大動作クロックの1例は以下である。

  • スレッドはn本走っている
  • 操作は邪魔されなければkクロックで済むものとする
  1. スレッド1が操作xを行いたいとAnnounce Arrayに書き込む。aクロックかかる。この時の論理クロックはt。
  2. スレッド1の他にスレッド2からスレッドnの全スレッドがAnnounce Arrayに操作(論理クロックt未満)を書き込んでいるので先にそれらを手伝う。(n-1) * kクロックかかる。
  3. スレッド1が自分の操作xを実行しようとしたがスケジューラはそれを止めて他のスレッドに割り当てる。
  4. スレッドpが何か操作を行おうとしたが、スレッド1よりあとに開始したので論理クロックはt+1以上であり、Announce Arrayに操作x(論理クロックt)が先に入っているのを発見するのでそちらを手伝う事になる。aクロックかかる。
  5. 操作xが行われてしまうとスケジューラはスレッド1を足止めできた事にならないので、操作xを行おうとしたスレッドpも止める
  6. 上記4と5をn-1スレッド分繰り返す
  7. スケジューラはどのスレッドにCPUを割り当てても操作xが実行される事を見届けなくてはならなくなるので、どう転んでも操作x(=スレッド1)が進行する。結果として合計(n-1)*k + n * aクロック消費してスケジューラは投了。
  8. (n-1)*k + n * aは無限ではないので、このアルゴリズムはWait-Freeの条件を満たす

という証明をする。

発展的話題

個人的にはKP-Queueの次の年にでた論文、A methodology for creating fast wait-free data structuresが中々好きである。これは上記アルゴリズムを改造して、常時はLock-Freeなデータ構造の様に動かすが予め設定した上限に達しそうになったらWait-Freeに切り替えるという方法。上記アルゴリズムなら他スレッドがAnnounce Array内に居たら即手伝っていたが、こちらは各スレッドがローカルなカウンタを持っていて、予め設定した回数mに至るまではAnnounce Arrayにスレッドが居ても無視するという方法である。更に、自分がLock-Free的にトライした操作がk回失敗続きの場合にやっとAnnounce Arrayにデータを登録し始める、という方法を採用している。これによって各スレッドの操作完了までの最悪時間は「最初にk回Lock-Free的にトライするクロック数」や「他の全てのスレッドにm回見逃されるクロック数」を加算することになるのでクロック上限値が大きくなってしまうが有限の範囲なので依然としてWait-Freeの条件を満たしつつ、実測上Lock-Freeデータ構造に逼迫する性能が出る。という結果が出ている論文である。ここまでしてWait-Freeを実現する意義がいまいちな所も良い。

*1:これはもちろん次から次へと新しいタスクがくるという暗黙の前提に基づいている

*2:こっちの説明もいずれ書く

*3:というか僕もろくに読めていない

*4:操作するたびにインクリメントされるただの数字

*5:ややこしいけど従属的な進行の話はまた今度

ロックフリー性の証明について

http://www.slideshare.net/kumagi/lock-free-safe?next_slideshow=1

とか過去に自分で書いておきながら、その当時の自分の認識が甘かった事もあるのでここに一度書き出しておく。

Lock-Freeは「ロックを使わない事」ではない

STMの事をもってしてLock-Freeと呼んでる文脈はいっぱいあるけれど、STMの実装にロックを使う事は一般的だし、それは正しい専門用語で言う所のLock-Freeとは呼ばない。

Lock-Freeとは「どんなスケジューリングが為されようともどれかの操作が進行する」という進行保証(Progress Guarantee)を表している。

スケジューリング?

マルチコアCPUでもシングルコアCPUでも、OSは実行中のプログラムを任意のタイミングで強制的に一時停止させて他のプログラムにCPUリソースを割り当てる事ができる*1。実行中スレッドからのOSによるCPUの引き剥がし・割り付けの事を並行プログラムの世界ではスケジューリングと呼ぶ。
シングルCPUコアの環境でついうっかりC言語あたりで無限ループを記述してもkillできるのはこの仕組みのお陰である。

普通の排他ロックは悪意あるスケジューリングに対して無力

悪意あるスケジューリング、という状況は世の中には普通無いが、最悪の場合を想定してシステムが満たす特性の下限を保証する必要がある状況というのはある*2
例えばあるスレッドが共有資源に対する排他ロックXを獲得した後にOSがそのスレッドを強制的に一時停止させて、他のプログラムやスレッドだけにCPUリソースを割り当て続けた場合、他のスレッドは何兆クロックのCPUサイクルを与えられようが排他ロックXが確保されたままの資源にアクセスしてはならないので、利用可能なCPUサイクル全てをドブに捨てる事になる。これを専門用語でブロッキングと呼ぶ。

悪意あるスケジューリングに対して耐性のあるロックの使い方例

他のスレッドが排他ロックを獲得したままであってもプログラムを進行させる使い方はありうる。
try_lock()を用いて「ロックを取ろうとは試みるけど、ロックが取れなかったら別の手段を講じる」というパターンはブロッキングが発生しないのでLock-Freeと呼べる事がある、ロックを利用しているにも関わらず、だ。典型的にはがちゃぴん先生のmallocの話がそれに該当するアルゴリズムの一例。

Lock-Freeの進行保証について

スケジューラの立場からLock-Freeというのを解釈し直すと「どんなスケジューリングを行っても絶対に進行してしまう」アルゴリズムであって、Lock-Freeであることを証明するためには「考えうる限り最悪のスケジューリングを行った場合に、アルゴリズム側が何らかの操作を実現するまでに必要なクロック数の上限が無限でない」という事を示せば良い。

例えばシンプルなLock-Free Stackの場合、

  • t個のスレッドで実行している
  • スケジューラはpushもpopも1つでも実行させないよう最悪ケースのスケジュールを行おうとする

という状況にて

  1. headポインタのアドレスPを読むのにaクロック(有限)
  2. mallocにbクロック(有限)
  3. mallocしたメモリ領域Qk*3に必要なデータを書き込むのにcクロック(有限)
  4. P→QkのCASを試みるのにdクロック(有限)
  5. スケジューラはCASを失敗させたいのでCAS実行の直前で他のスレッドにCPU時間を割り当てる
  6. ↑の1〜5の動作をt個のスレッド全部の分繰り返す
  7. スケジューラはどのスレッドにCPU時間を割り当ててもP→QkのCASを実行しようとしているスレッドばかり
  8. t個のスレッドのうちどれかが必ずCAS成功する。スケジューラは t * (a + b + c + d)クロックで投了
  9. t * (a + b + c + d)は有限なのでこのアルゴリズムには進行保証がある

というようにLock-Free性(=進行保証)を証明できる。

Wait-Freeに関してはまた気が向いたら書く。

*1:当然OSのレベルでプリエンプティブな機能を実現している場合だが、WindowsLinuxMacOSもいまどきのOSはデフォルトでは大抵これを実装している。スパコンだとこの機能のオーバーヘッドを気にしてノンプリエンプティブなOSを利用する場合もあるらしいが詳しくない

*2:銀行のシステムだと全ての操作がキャッシュミスした場合を想定してそこでも性能がちゃんと出るように血を吐くような努力をしていると聞いたことがある

*3:kはそのスレッド固有の数字

userspace RCU(QSBR)の使い方と解説

http://lttng.org/urcu|Userspace RCU
という大変クールなプロジェクトがあります。

「RCU(リードコピーアップデート)をユーザースペースで行うもの」という事で、そこだけ聞くとなんのこっちゃという感じ。

リードコピーアップデートって何よ

リードコピーアップデートそのものの正しい説明はWikipedia*1 でも読んでもらうとして、Wikipediaを読むのすら面倒な人の為に説明すると「みんなで共有してるデータをfree()しても良いタイミングを見極める技術」です。

特定の構造体をみんなで共有して書き換えたりしたいな → Lockとればよくね?
すごく頻繁にアクセスするし読み出しの方が多いからLockはやだな → Read-Write Lockでよくね?
Read-Write LockだとAtomic命令多すぎてパフォーマンスでないな → Lock無くしてデータをポインタで包んで書き換えの時だけ複製して差し替えればよくね?
読んでる最中のデータを他の奴が差し替えた後にこっちが読みかけのを勝手にfree()しやがるからSEGVするな → じゃあfree()の実行を遅延させればSEGVは避けられるんじゃね?

「じゃあ削除を遅延ってどれぐらい遅延させればいいのよ?」
が本題。長い道のりだった。

カーネル空間では古くから問題になっていた物で、その解決策としてRCUというものが使われている。
大雑把に説明すると

  • 大事なデータを読み出してる間にはプリエンプションを禁じる
  • データを削除する際にはそのスレッドをyieldし続けて他のスレッドに自分の載ってるプロセッサを貸し出す。他のスレッド全部が最低1度でも自分のCPUコアに載った事を確認したら消して良し(何故なら大事なデータを読みかけのスレッドはプリエンプションされないので、自分のCPUコアに載ってこない。載ってきたなら読みかけでない証拠)

という物。これは読み出し側に一切のペナルティが無くて素晴らしい。でも、カーネル空間内だからこそプリエンプションを禁じるとかスレッドのマイグレーションを検知するとかが出来るわけで、ユーザー空間で真似できる代物ではない。*2

そこで、アルゴリズムの力でユーザー空間でこれに近いものを実現しようぜってのがUserspace RCU。
個々のアルゴリズムを学ぶとおもしろいんだけど、日本語のドキュメント少なすぎて笑えるので日記に書こうと思った所存。

使ってみる

複数の実装を選べて、性能の特性や挙動が微妙に違うのだけど、目的はどれもおよそ一緒。
今回は僕の好きなQSBRを例に挙げて説明する。QSBRは他のアルゴリズムと比べて、Reader-Sideのパフォーマンスとスケーラビリティに優れる。*3

ロックフリーアルゴリズムではガベージコレクタに頼ったアルゴリズムが多くて、Javaで実装出来たものがCだと普通には実装できないことが多い。
そこでこういう方法でオブジェクトの寿命をちゃんと守ってあげると正しく使えるようになる。

インストール

urcuライブラリのインストールは何の変哲もない./configure && make && sudo make installで終わるので特に書くことはない。

使い方例

#include <urcu-qsbr.h>
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <stdint.h>

int** global_array;
int array_length;

void* reader(void*) {
  // 配列内の適当な位置を読んで合計値を算出するだけのスレッド
  int local_sum = 0;
  int i;
  for (i = 0; i < 1000; ++i) {
    rcu_quiescent_state();
    // URCUで保護されているので安心してポインタの中身を参照できる
    local_sum += *global_array[rand() % array_length];
  }

  rcu_thread_offline();  // 処理が終わったので他のスレッドにこのスレッドを無視しろと伝える
  printf("my local sum is %d finish!\n", local_sum);
}

void* writer(void*) {
  // 配列内の適当な位置を適当な値にCASして回るだけのスレッド
  int i;
  for (i = 0; i < 1000; ++i) {
    rcu_quiescent_state();
    int* new_value = (int*)malloc(sizeof(int));
    *new_value = rand() % 100;
    int target = rand() % array_length;
    int* old_value;
    for (;;) {  // CASスピン
       old_value = global_array[target];
       if (__sync_bool_compare_and_swap_8(&global_array[target], (intptr_t)old_value, (intptr_t)new_value)) {
          break;
       }
    }
    synchronize_rcu();  // 他のスレッドのquiescent_stateが経過するのを待つ
    free(old_value);  // RCUが無かったらこの行は危険
  }

  rcu_thread_offline();  // 処理が終了したので他のスレッドにこのスレッドの状態を無視しろと明示
  printf("writer finish\n");
}

void array_init() {
  // int*の配列に適当な値を保持したint*を置いていく
  int i;
  array_length = 100;
  global_array = (int**)malloc(array_length * sizeof(int*));
  for (i = 0; i < array_length; ++i) {
    global_array[i] = (int*)malloc(sizeof(int));
    *global_array[i] = rand() % 100;
  }
}
void array_destroy() {
  // 上記の配列を解放する
  int i;
  for (i = 0; i < array_length; ++i) {
    free(global_array[i]);
  }
  free(global_array);
}

int main(void) {
  const int threads = 10;
  int i;
  pthread_t reader_thread[threads];
  pthread_t writer_thread[threads];

  // 配列初期化
  array_init();

  // スレッドを開始
  for (i = 0; i < threads; ++i) {
    pthread_create(&reader_thread[i], NULL, reader, NULL);
    pthread_create(&writer_thread[i], NULL, writer, NULL);
  }

  // 終わるのを待つ
  for (i = 0; i < threads; ++i) {
    pthread_join(reader_thread[i], NULL);
    pthread_join(writer_thread[i], NULL);
  }

  // 配列破棄
  array_destroy();

  printf("all threads finish\n");
}

実行例

$ gcc qsbr_test.cpp -lurcu-qsbr -lpthread -g
$ valgrind ./a.out
==14076== Memcheck, a memory error detector
==14076== Copyright (C) 2002-2011, and GNU GPL'd, by Julian Seward et al.
==14076== Using Valgrind-3.7.0 and LibVEX; rerun with -h for copyright info
==14076== Command: ./a.out
==14076==
my local sum is 52455 finish!
writer finish
my local sum is 53090 finish!
writer finish
writer finish
my local sum is 53159 finish!
my local sum is 50037 finish!
my local sum is 52187 finish!-
writer finish
writer finish
my local sum is 46717 finish!
my local sum is 45657 finish!
writer finish
writer finish
my local sum is 52845 finish!
writer finish
my local sum is 52102 finish!
writer finish
my local sum is 46073 finish!
writer finish
all threads finish
==14076==
==14076== HEAP SUMMARY:
==14076== in use at exit: 0 bytes in 0 blocks
==14076== total heap usage: 10,121 allocs, 10,121 frees, 47,280 bytes allocated
==14076==
==14076== All heap blocks were freed -- no leaks are possible
==14076==
==14076== For counts of detected and suppressed errors, rerun with: -v
==14076== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 2 from 2)

no leaks are possible の文字が燦々と輝いてますね。素晴らしい。

解説

どうやってこんなのを実現してるかというと、スレッドローカルなストレージにカウンタを置くことで解決してます。
QSBR(Quiescent State Based Reclamation)の名前の通り、Quiescent State(=静止状態)の有無を他のスレッドに対して通知できれば良いのです。

適当な実装を書くならこんな感じ。ただし擬似コードなのでコンパイル通らないし、同時に1スレッドしかsynchronize_rcuできないけれどコンセプトを示すには充分です。

int global_counter = 0;
__thread local_counter = 0;

void rcu_quiescent_state() {
  // 他のスレッドに自分が一旦静止状態に入った事を明示
  local_counter = gloabal_counter;
}

void synchronize_rcu() {
  int old_counter = global_counter;
  ++gloabal_counter;  // ここでカウンタの値を増やす(※)
  local_counter = global_counter;
  for (他のスレッドのそれぞれのlocal_counterを見て回る) {
    if (個々のlocal_counter <= old_counter) {
      continue;  // まだ仕事中らしいのでquiescent_stateを待つ
    }
  }
  // ここに至ったという事は全てのlocal_counterは上の※で増やしたカウンタを観測した。
  // つまり自分がsynchronize_rcuを呼んでから、他のスレッドが最低1回はrcu_quiescent_state()を呼び出した事を意味する
  // つまり1回でも静止状態に入ったので自分以外に到達不能になったデータは消して良いことがわかった。
}

簡単ですね。
他のスレッドから特定のデータを到達不能にした後、到達不能なスレッドがそのアドレスに触らない事を保証できるようになるのを待つわけです。
この待ち期間を猶予期間(grace period)と呼びます。


他のスレッドが最低1回でもQuiescent Stateを宣言するのを待ちます。

厳密にはこれを使うとアルゴリズムはlock-freeではなくなるのですが、実用上充分な速度とスケーラビリティが出ます。
readが多数を占める処理の場合、rcu_quiescent_stateはキャッシュラインがSharedステートに落ちたglobal_counterを読み続ける事になるのでL1キャッシュに当たり続ける事が期待されます。
May god lock-free you!

*1:http://ja.wikipedia.org/wiki/%E3%83%AA%E3%83%BC%E3%83%89%E3%83%BB%E3%82%B3%E3%83%94%E3%83%BC%E3%83%BB%E3%82%A2%E3%83%83%E3%83%97%E3%83%87%E3%83%BC%E3%83%88

*2:プリエンプション禁止なんて物騒な物をほいほいとユーザー空間で使えるようにしちゃったら誤用・悪用された時のダメージ半端ない。

*3:僕の読んだことのある実装例では、読み出し側のオーバーヘッドはメモリ読み書き一回ずつとメモリバリア1回で済んでる。しかも書き換えが起こって無ければキャッシュラインすら汚さない優れもの。

Stackを使ってQueueを作る

有名な話かと思ったら意外と知られていなかったのでメモ。
FILOを使ってFIFOを作るとも言います。StackでQueue作れてもQueueでStackを作る方法が思いつかないので誰か教えて下さい。もしくはこういう学問があったら紹介して頂けると嬉しいです。

簡単な説明としては、2つのStackを用意して、enqueueするときには1つ目にpush()し、dequeueするときには2つ目からpop()するだけ。
ただし2つ目のStackが空の場合は1つ目のスタックが空になるまで2つ目のスタックに移し替える。

template<typename T>
class MyQueue {
  std::stack<T> in, out;
  MyQueue(){}
  
  void enqueue(const T& v) {
    in.push(v);
  }

  T dequeue() {
    if (out.empty()) {
       if (in.empty()) {
         throw std::exception("Queue is empty!");
       }
       while (!in.empty()) {
         out.push(in.top());
         in.pop();
       }
    }
    T result = out.top();
    out.pop();
    return result;
  }
};

図に書くとこんな感じ。

実際にデータを貯めてみる。enqueue(1);enqueue(2);enqueue(3);

Stackなので最初に入れたデータが一番底に沈む。
ここからdequeueしてみる。out側のstackが空なのでin側から持って来る必要がある。

これでout側のstackを見るとあら不思議、最初に入れたものがStackの一番上に。これをpop()すればdequeueできる。

max_queue 最大値をいち早く返すデータ構造

ちょっとマニアックなデータ構造を紹介

その名もmax queue、使い勝手はほとんどFIFOなqueueと一緒で、enqueue()もdequeue()もO(1)だけどそれに加えて「入ってるデータのうち最大の値」もO(1)の計算量で算出するという代物。
兄弟分で最小の物を返すmin queueや、両方の機能を備えたmin-max queueもあるけどmax queueが判れば自明なので割愛。

最大の値を覚えておけば良いだけに思えるけど、最大の値がdequeueされてしまった場合にO(n)の時間を掛けて再び探索してたら条件を満たせない。

max-queueでは最大の値を覚えておくため専用のqueue(以後最大値queueと呼ぶ)を内部でもう一本持つ。

この青い線がqueueに入ってるデータ列で高さがデータの大きさを表す、下に書かれている薄い線の入ったデータ列が最大値queue
こちらの最大値queueでは過去に入れられたデータのうち、一番大きな物から順番に降順にデータを記録し、他は無視している。
薄い線で書かれたデータは最大値queueの側では保持していない。

複雑そうに見えるけど
enqueueする時に「挿入したいデータよりも小さいデータが最大値dequeueの末尾にあったらそれらを消してから入れる」だけ。
dequeueする時に一緒にその値が出ていくなら最大値queueからも同時にdequeueするだけ。
コードで書くと非常に明快。

https://gist.github.com/3847509

#include <list>

template <typename T>
struct max_queue {
    max_queue():queue_(),max_(){}
    void enqueue(const T& n){
        queue_.push_back(n);
        const T& new_item = queue_.back();
        while(!max_.empty() && *max_.back() < new_item){ // 最新の値より小さい末尾を全部消す
            max_.pop_back();
        }
        max_.push_back(&new_item);
    }
    T dequeue(){
        if(queue_.empty()){ throw std::exception(); }
        const T& deleting_item = queue_.front();
        const T ans = queue_.front(); // copy
        if(max_.front() == &deleting_item){ // 一緒に最大値が出ていくなら一緒に消す
            max_.pop_front();
        }
        queue_.pop_front();
        return ans;
    }
    const T& max()const{
        if(max_.empty()){ throw std::exception(); }
        return *max_.front(); // 最大値dequeueの先端を返すだけ。速い。
    }
private:
    std::list<T> queue_; // 普通のqueue
    std::list<const T*> max_;// 最大値queue
};

#include <assert.h>
#include <iostream>
int main(void){
    max_queue<int> q;
    q.enqueue(1); assert(q.max()==1);
    q.enqueue(3); assert(q.max()==3);
    q.enqueue(5); assert(q.max()==5);
    q.enqueue(2); assert(q.max()==5);
    q.enqueue(7); assert(q.max()==7);
    q.enqueue(4); assert(q.max()==7);
    q.enqueue(1); assert(q.max()==7);
    q.enqueue(2); assert(q.max()==7);
    q.enqueue(1); assert(q.max()==7);
    assert(q.dequeue()==1); assert(q.max()==7);
    assert(q.dequeue()==3); assert(q.max()==7);
    assert(q.dequeue()==5); assert(q.max()==7);
    assert(q.dequeue()==2); assert(q.max()==7);
    assert(q.dequeue()==7); assert(q.max()==4);
    assert(q.dequeue()==4); assert(q.max()==2);
    assert(q.dequeue()==1); assert(q.max()==2);
    assert(q.dequeue()==2); assert(q.max()==1);
    assert(q.dequeue()==1);
}

でも使いどころは割と難しいと思う。

Lock-free スナップショットの撮り方を説明してみる

マルチスレッドなどの環境下で時間経過によって勝手に変化する複数の変数を読みたい場面は多くあります。
しかしCPUは一度に一つしか値を読み書きできないので、簡単には出来ません。

何故なら読んでるそばから値が変動してでたらめな値を読むかも知れないからです。
変動してるものを読むのだから読む側が完璧じゃなくても仕方ないという妥協は有り得ますが
特定のある一瞬の時刻での状態を写真のようにパチリとフィルムに収められたら嬉しいですよね。

一番簡単なのはロックを取りながら読む事ですが、ロックは諸般の事情により使えない事とします。

前提

値が勝手に変動するという状況が分かりにくいと思うので、適当な例えとして卵の孵化を想像してみましょう。

「卵」→「ヒビ」→「ひよこ」 の順でランダムに勝手に進んで行きます。止めることは出来ませんし後戻りもしません。
こんな卵が複数ある孵化器の監視を任されたとします。卵の状態が今どんな感じかを訊かれるので適切に答えましょう。
ただし、一度に目視できるのは卵一つだけです。卵は今回は図の関係で3つだけとしますがこれ以上に増えても手順は同様です。

レベル1

「全部の卵が『ヒビ』状態なのか教えて」
一度には見れないので、一個ずつ卵を見てみましょう。A→B→Cの順に卵を確認します。

たまたま全部にヒビが入っていましたね、これで「全部が『ヒビ』でした」と教えて良いのでしょうか?ダメです。
何故なら確認中に卵の状態が変わったかも知れないからです。もし見てない所での卵の状態がこんな感じだったらどうでしょう?

もしこうなら、仮に卵の状態を常時同時に監視し続けれたとしても「全部が『ヒビ』」という瞬間は一瞬たりとも発生していないのです。発生していなかった状態を観測して報告してしまっては監視員失格です。ではどうすれば良いのでしょうか?

答えは二度見れば良いのです。

二度見て、全てで『ヒビ』状態だった場合、どんな状況を想定しても緑色の横線を引いた瞬間は必ず3つとも『ヒビ』状態だったと言わざるを得ません。もしそうでなかったら、二度目見た時に状態が変わっていないと説明が付かないからです。
つまり合計6回観測して、全部『ヒビ』だった場合に限り「そうだよ」と返すだけですね。とても簡単です。コードに書くならこんな感じ。

static const int TAMAGO=0, HIBI=1, HIYOKO=2;
int eggs[3]; // 勝手に状態が変わる卵たち
bool check_hibi(){
  for(int i=0; i<2; ++i) // 二度見する
    for(int j=0; j<3; ++j) // 全部の卵を見る
      if(eggs[j] != HIBI) return false;
  return true;
}

スナップショットの考え方は大まかにこの通りです。

補足「そんな答えで大丈夫か」

緑の線の時に全部が『ヒビ』だからって、2度目見終わった後で報告する前の間にどれかが『ひよこ』になる場合があるよね、嘘の状態を報告することになるよね。と気になる方も居ると思います。
確かにその通りです。報告を受け取った側はその報告を受けたからといってその時もずっと『ヒビ』の状態が維持されている保障は全くありません。
しかし考えてみてください、全知全能の神が光より速く全ての卵の状態を観測して超すごい手段で報告をよこしてくれたとしても、全く同じ問題は起きます。ここで出来る最良の事は「全てが『ヒビ』だった瞬間が一瞬でも観測された」事を報告する事なのです。

レベル2

「全部の卵の状態を教えて」
やっとスナップショットらしくなって来ました。
方法は全く同じく「二度見する」だけです。

この場合は緑色の線の所でA=『ひよこ』B=『卵』C=『ヒビ』であることを保障できます。
何故なら2回見た間で一致しているからです。では一致しなかったらどうするのでしょう?

赤いキザギザ線のうちどこかでC卵の状態が変わっているので、安易にスナップショットがとれたとは言えません。
その場合は「更にもう一度見る」だけです。

一致するまで何度でも観測すれば良いです。コードで書くならこんな感じ。

int* snapshot_eggs(){
  int* const snapshot = static_cast<int*>malloc(3*sizeof(int));
  for(int i=0; i<3; ++i) snapshot[i] = eggs[i]; // 初回の観測
  while(true){
    bool check = true;
    for(int i=0; i<3; ++i){
      if(snapshot[i] != eggs[i]) check = false; // 前回のチェックと合わなかったら失敗
      snapshot[i] = eggs[i]; // 次回チェックあるいは返答のために記憶
    }
    if(check == true) return snapshot; // 二度観測した物が合ったらスナップショット成功

    // 合わなかったらスナップショットやり直し
  }
}

ロックを取っていないのでこのアルゴリズムはノンブロッキングに分類されます
また、値が書き変わり続けた場合にスナップショットがいつまでも成功しない状況(=starvation)が有りうるため、このスナップショットアルゴリズムはlock-freeです。値が書き変わり続ける場合、自分は失敗しますが他のスレッドの何らかの操作は進んでいるため進行保障は満たされます。

レベル3

さて、ここまでの状況はとても簡単でした。何故なら卵は孵化する方向にしか進まないから起こり得る状況の数が知れてるからです。もう少し難しい状況を考えてみましょう。

さて信号機は状態遷移が巡回してしまいます。地点の離れた複数の信号機をどうやって一人でスナップショットを取ったら良いのでしょうか?
結論から言うと無理です。2つの信号を例にあげます。

青信号を2回連続で観測したからと言って、その2回の青信号が同一の青信号とは限りません。2回目に見た青信号はぐるっと一周した物で、ひょっとしたら一瞬たりとも2つの信号が同一になった瞬間は存在しないかもしれません。

しかしプログラミングの世界ならもう少しがんばれます。
信号機に信号と一緒に変化するカウンタを付けるよう国土交通省あたりに掛け合えば良いのです。*1

こうすれば赤信号を二回観測してもそれが同一の赤信号なのかぐるっと一周してきた赤信号なのかはたまた何周かした後のものかを区別できます。そうすればもうこっちの物でヒヨコのスナップショットがそのまま適用できます。桁溢れが心配ならカウンタに64bit用意しておけばたぶんカウンタが巡回する前に宇宙が崩壊することでしょう*2

こうすることで無事にスナップショットを撮れます。

memcachedはgetsコマンドを使う事でキーごとに取り付けられたユニーク値を獲得できます。この値は64bit有って、memcachedが起動してから加えた操作の論理クロックが書き込み時に一緒に書き込まれる物で、一言で言うとここで付けたカウンタと全く同じ使い方をして構わない物です。
つまりmemcachedはgetsコマンドで、ユニーク値を比較し複数の任意のキーバリューペアのスナップショットを取れます!*3

さて、信号機ですがカウンタ値を返されても困る、実際の信号の値を返してくれ、というのが普通だと思います。
信号とカウンタとを一瞬で読めれば完璧なんですが常にそれが可能とは限りませんし、カウンタと信号の変化が完璧に同時とも限りません。
困りましたね、しかしそこにもスナップショットが適用できます。
カウンタを読む→信号を読む→カウンタを読む(二度目)→信号を読む(二度目) という作法で可能です。コードで書くとこんな感じ。

struct lamp_t{
  int lamp_;
  uint64_t cnt_;
  lamp_t(const lamp_t& org)
  :lamp_(org.lamp_),cnt_(org.cnt_){ // スナップショットコピーコンストラクタ
    while(true){
      if(lamp_ == org.lamp_ && cnt_ == org.cnt_) return; // 一致したので成功
      lamp_ = org.lamp_; cnt_ = org.cnt_; // 失敗したのでもう一回コピー
    }
  }

  bool operator==(const lamp_t& rhs)const{ // スナップショット比較
    if(lamp_ != rhs.lamp_ || cnt_ != rhs.cnt_) return false;
    if(lamp_ != rhs.lamp_ || cnt_ != rhs.cnt_) return false; // 二度見する必要がある
    return true;
  }

  lamp_t& operator=(const lamp_t& rhs){ // スナップショット代入
    lamp_ = rhs.lamp_; cnt_ = rhs.cnt_;
    while(true){ // 一致するまで繰り返す
      if(lamp_ == rhs.lamp_ && cnt_ == rhs.cnt_) return *this;
      lamp_ = rhs.lamp_; cnt_ = rhs.cnt_; // 一致しなかったら読みなおし
    }
  }
};

信号の値を読むときはこれらのメンバ関数を使って、ヒヨコの時と同じ作法でスナップショットを取る事で達成できます。

まとめ

今回は複数の値をあたかも一瞬で全部確認したかのようにチェックするLock-freeスナップショットについて解説しました。
複数の値を読み出す操作を二回行なって、一致を見るだけで可能です。*4

次回はこれを使ってk-Compare Single Swap(k個の値が一定の値であることをチェックしてそのうち一つを書き換えるまでをアトミックに行う)を実現してみます。

*1:もちろん本当に掛け合えという意味ではなく、そういう設計を自分で書くという事です

*2:ハノイの塔的な意味で

*3:なんでlibmemcachedのクライアントライブラリにmuluti_getsが無いのか理解に苦しむところです。

*4:そんな簡単な方法をスナップショットと見做して良い事のほうが意外だったかも知れません