Hatena::ブログ(Diary)

ny23の日記 このページをアンテナに追加 RSSフィード

2010-11-27 細かいことが気になる

密/疎ベクトルのトレードオフを調べてみた

|  密/疎ベクトルのトレードオフを調べてみたを含むブックマーク  密/疎ベクトルのトレードオフを調べてみたのブックマークコメント

k-means を実装していて,疎ベクトルと密ベクトルのトレードオフ(距離計算の速度差)が気になったので軽く実験してみた.具体的に知りたかったのは,どれぐらい疎なら疎ベクトルを使った方が距離計算が速くなるか,という問に対する答え.空間使用率の改善については sparse vector における index と value の型のサイズ比でほぼ自明に分かるが,速度に関してはコンパイラの最適化の加減もあるので良く分からない.以下がテストコード(ややずぼらな実装).

[追記] 折角なので,Eigen 3.0-beta2 とも比べてみた.

#include <sys/time.h>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <vector>
#include <tr1/random>
#include <eigen3/Eigen/Core>
#include <eigen3/Eigen/Sparse>

struct node {
  size_t idx;
  double val;
  node (int idx_, double val_) : idx (idx_), val (val_) {}
};

typedef std::vector <double> dv_t; // dense  vector
typedef std::vector <node>   sv_t; // sparse vector

int main (int argc, char ** argv) {
  if (argc < 4)
    { std::fprintf (stderr, "sparse_vs_dense d|s|ed|es n d r\n"); std::exit (1); }
  
  size_t n = std::strtol (argv[2], NULL, 10); // # data points
  size_t d = std::strtol (argv[3], NULL, 10); // # dimension
  double r = std::strtod (argv[4], NULL);     // # density (ratio < 1)

  struct timeval start, end;

  std::tr1::variate_generator <std::tr1::mt19937, std::tr1::uniform_real <double> >
    gen (std::tr1::mt19937 (), std::tr1::uniform_real <double> (0, 1));
  
  double ret = 0;
  if (std::strcmp (argv[1], "d") == 0) {
    // benchimark (dense vector)
    // gen
    gettimeofday (&start, 0);
    std::vector <dv_t> data (n, dv_t ());
    for (size_t i = 0; i < n; ++i) {
      data[i].resize (d, 0);
      for (size_t k = 0; k < d; ++k)
        if (gen () < r)
          data[i][k] = gen ();
    }
    gettimeofday (&end, 0);
    std::fprintf (stderr, "elapsed (real): %.2fs\n",
                  end.tv_sec - start.tv_sec + (end.tv_usec - start.tv_usec) * 1e-6);
    // dist
    gettimeofday (&start, 0);
    for (size_t i = 0; i < n; ++i)
      for (size_t j = 0; j < i; ++j)
        for (size_t k = 0; k < d; ++k) {
          double val = data[i][k] - data[j][k];
          ret += val * val;
        }
    gettimeofday (&end, 0);
    std::fprintf (stderr, "elapsed (real): %.2fs\n",
                  end.tv_sec - start.tv_sec + (end.tv_usec - start.tv_usec) * 1e-6);
  } else if (std::strcmp (argv[1], "s") == 0) {
    // benchimark (dense vector)
    // gen
    gettimeofday (&start, 0);
    std::vector <sv_t> data (n, sv_t ());
    for (size_t i = 0; i < n; ++i)
      for (size_t k = 0; k < d; ++k)
	if (gen () < r)
	  data[i].push_back (sv_t::value_type (k, gen ()));
    gettimeofday (&end, 0);
    std::fprintf (stderr, "elapsed (real): %.2fs\n",
                  end.tv_sec - start.tv_sec + (end.tv_usec - start.tv_usec) * 1e-6);
    // dist
    gettimeofday (&start, 0);
    for (size_t i = 0; i < n; ++i)
      for (size_t j = 0; j < i; ++j) {
	sv_t::const_iterator it (data[i].begin ()), it_end (data[i].end ());
	sv_t::const_iterator jt (data[j].begin ()), jt_end (data[j].end ());
	while (it != it_end && jt != jt_end) {
          double val = 0;
          if      (it->idx == jt->idx) { val = it->val - jt->val; ++it; ++jt; }
          else if (it->idx  < jt->idx) { val = it->val; ++it; }
          else                         { val = jt->val; ++jt; }
          ret += val * val;
	}
	while (it != it_end) { ret += it->val * it->val; ++it; }
	while (jt != jt_end) { ret += jt->val * jt->val; ++jt; }
      }
    gettimeofday (&end, 0);
    std::fprintf (stderr, "elapsed (real): %.2fs\n",
                  end.tv_sec - start.tv_sec + (end.tv_usec - start.tv_usec) * 1e-6);
  } else if (std::strcmp (argv[1], "ed") == 0) {
    // eigen dense vector
    // gen
    gettimeofday (&start, 0);
    std::vector <Eigen::VectorXd> data (n, Eigen::VectorXd ());
    for (size_t i = 0; i < n; ++i) {
      data[i].setZero (d);
      for (size_t k = 0; k < d; ++k)
	if (gen () < r)
	  data[i][k] = gen ();
    }
    gettimeofday (&end, 0);
    std::fprintf (stderr, "elapsed (real): %.2fs\n",
                  end.tv_sec - start.tv_sec + (end.tv_usec - start.tv_usec) * 1e-6);
    // dist
    gettimeofday (&start, 0);
    for (int i = 0; i < n; ++i)
      for (size_t j = 0; j < i; ++j)
        ret += (data[j] - data[i]).squaredNorm ();
    gettimeofday (&end, 0);
    std::fprintf (stderr, "elapsed (real): %.2fs\n",
                  end.tv_sec - start.tv_sec + (end.tv_usec - start.tv_usec) * 1e-6);
  } else if (std::strcmp (argv[1], "es") == 0) {
    // eigen sparse vector
    // gen
    gettimeofday (&start, 0);
    std::vector <Eigen::SparseVector <double, 0, size_t> > data (n, Eigen::SparseVector <double, 0, size_t> ());
    for (size_t i = 0; i < n; ++i) {
      for (size_t k = 0; k < d; ++k)
	if (gen () < r)
	  data[i].insertBack (k) = gen ();
    }
    gettimeofday (&end, 0);
    std::fprintf (stderr, "elapsed (real): %.2fs\n",
                  end.tv_sec - start.tv_sec + (end.tv_usec - start.tv_usec) * 1e-6);
    // dist
    gettimeofday (&start, 0);
    for (size_t i = 0; i < n; ++i)
      for (size_t j = 0; j < i; ++j) {
        Eigen::SparseVector <double, 0, size_t>::InnerIterator it (data[i]);
        Eigen::SparseVector <double, 0, size_t>::InnerIterator jt (data[j]);
        while (it && jt) {
          double val = 0;
          if      (it.index () == jt.index ()) { val = it.value () - jt.value (); ++it; ++jt; }
          else if (it.index ()  < jt.index ()) { val = it.value (); ++it; }
          else                                 { val = jt.value (); ++jt; }
          ret += val * val;
	}
	while (it) { ret += it.value () * it.value (); ++it; }
	while (jt) { ret += jt.value () * jt.value (); ++jt; }
      }
    gettimeofday (&end, 0);
    std::fprintf (stderr, "elapsed (real): %.2fs\n",
                  end.tv_sec - start.tv_sec + (end.tv_usec - start.tv_usec) * 1e-6);
  }
  std::fprintf (stderr, "%f\n", ret);

  return 0;
}

密度 r を変えて d 次元のベクトルを n 個生成して,nC2 回 L2 norm を計算.d, n はあまり関係ないので,(d, n) = (1000000, 100) に固定し (nC2 = 4950),で r だけ変えて実行してみると以下のような感じ.

---------------------------------------------------------------------------------------
   r  |       dense       |      sparse       |   dense [eigen]   |  sparse [eigen]   
---------------------------------------------------------------------------------------
      | space|   time [s] | space|  time [s]  | space|   time [s] | space|  time [s]  
      |  [MB]| gen | dist |  [MB]| gen | dist |  [MB]| gen | dist |  [MB]| gen |  dist
---------------------------------------------------------------------------------------
1.0   | 763.8| 2.33| 23.16|1527.5| 4.61| 46.24| 763.8| 2.43| 22.04|1527.9| 5.13| 47.58
0.5   | 763.8| 2.60| 23.50| 763.9| 3.27| 26.87| 763.8| 2.64| 22.03| 764.2| 3.51| 30.43
0.1   | 763.8| 1.80| 23.50| 153.7| 1.35|  5.18| 763.8| 1.82| 22.06| 153.7| 1.39|  6.07
0.05  | 763.8| 1.68| 23.24|  77.3| 1.09|  2.60| 763.8| 1.71| 22.07|  77.3| 1.10|  3.10
0.01  | 763.8| 1.52| 23.20|  16.1| 0.88|  0.52| 763.8| 1.54| 22.03|  16.2| 0.89|  0.62
0.005 | 763.8| 1.49| 23.22|   8.4| 0.85|  0.25| 763.8| 1.52| 22.01|   8.5| 0.86|  0.30
0.001 | 763.8| 1.47| 23.49|   2.2| 0.83|  0.05| 763.8| 1.50| 22.25|   2.3| 0.83|  0.06
0.0005| 763.8| 1.47| 23.14|   1.5| 0.83|  0.02| 763.8| 1.50| 22.06|   1.7| 0.83|  0.03
0.0001| 763.8| 1.47| 23.58|   0.8| 0.83| <0.01| 763.8| 1.49| 22.05|   0.8| 0.82| <0.01
---------------------------------------------------------------------------------------

gcc 4.6.0 20101127 (-O2 -march=core2 -m64)
Intel Xeon 3.2 Ghz CPU

最適化の加減でもう少し差がつくかとも思ったけど,sparse vector を使った距離計算は予想より速いな.ちなみにいまの実験環境だと,r=0.44 ぐらいで距離計算にかかる時間は逆転する.gen は乱数生成の時間を含んでいるのであまり比較しても意味はないかもしれないが,メモリ確保にかかる時間の差が効いている.上記のコードでは index に size_t (8 bytes), 浮動小数点型に double (8 bytes) を使っているので,完全に密だと sparse vector は dense vector の約二倍メモリを消費する.

真面目にスピードのことを考える人は,ループを手動/自動ベクトル化したり,valarray やeigen とか使うとよろし.

[追記] その他,密ベクトルの方で std::inner_product を使ってみたり,不遇の std::valarray ( (((data[i] - data[j]) * (data[i] - data[i])).sum () ) を使ってみたりしたけど,ほとんど速度は変わらなかった(valarray は結果が僅かに変わったので計算順序に違いがあるかもしれず).

[追記] eigen を使った場合のコードと結果を追加した(初めて使ったので,最適な使い方になっていないかも知れず).密ベクトルで少し速く,疎ベクトルで少し遅くなっている.密ベクトルは計算結果が他と僅かにずれていたので,計算順序に違いがあるのかも(expression template を使ってるとか.興味がないので追っていないが).疎ベクトルは,中を少し見た限り,index と value を別の配列で実装しているので,キャッシュミスが起きやすく,それが時間差に繋がっているのかな(その分,メモリの使用量は index/value の型の組合せの影響を受けにくいが).疎ベクトルで引き算をすると,処理が戻って来なかったため,eigen が提供する API (squaredNorm) で計算するのを諦めた.

[追記; 12/4] ベンチマークにあるように,ベクトルの次元が小さいと,eigen の密ベクトルはかなり速いようだ.(d, n) = (1000, 1000) で固定して実験したものは以下.低次元密ベクトルを使う場合や,一時オブジェクトを作らないようにするとコードが複雑になりがちな行列の計算をスッキリ書きたい場合は,eigen を使うと良さそう.

--------------------------------------------------------------------------------
  r  |       dense     |      sparse     |   dense [eigen]  |  sparse [eigen]   
--------------------------------------------------------------------------------
     |space|  time [s] |space|  time [s] |space|   time [s] |space|  time [s]  
     | [MB]| gen | dist| [MB]| gen | dist| [MB]| gen | dist | [MB]| gen |  dist
--------------------------------------------------------------------------------
1.0  | 8.4 | 0.02| 1.48| 16.3| 0.03| 2.28| 8.4 | 0.02|  0.54| 16.3| 0.03|  2.32
0.5  | 8.4 | 0.03| 1.48|  9.3| 0.03| 2.44| 8.4 | 0.03|  0.54| 10.4| 0.03|  2.86
0.1  | 8.4 | 0.02| 1.48|  2.5| 0.01| 0.49| 8.4 | 0.02|  0.54|  2.6| 0.01|  0.59
--------------------------------------------------------------------------------

gcc 4.6.0 20101113 (-O2 -march=core2 -m64)
Intel Xeon 3.2 Ghz CPU

ACAC 2010/12/04 13:40 手元で、Visual Studio 2010 + eigen2( + boost)でやってみました。

eigen2ならベクトル間距離の2乗はSparseVectorでも
ret += (data[i] - data[j]).squaredNorm();
で計算可能でした。
でも、10-20%くらい遅いので、実用にはどうでしょうね?
私もeigenを使い始めたところなので、最適化の方法は良くわかりません。

一番良かったのは、Visual StudioのConcurrency::parallel_forを使って、
並列化する方法でした。

ny23ny23 2010/12/04 22:27 > eigen2ならベクトル間距離の2乗はSparseVectorでも
> ret += (data[i] - data[j]).squaredNorm();
> で計算可能でした。

eigen 2.0.15 / eigen3-beta2 のコードをもう少し読んでみました.

squaredNorm () は,内部で cwiseAbs2().sum () を呼んでいるのですが,sum () では (rows () または cols () で得られる) sparseVector のサイズ (m_size) が 0 の場合は assert でコケるようになっています (Eigen/src/Sparse/SparseRedux.h, line 32).上記のコードのように,eigen3 で insertBack () で動的に要素を追加する場合(eigen2 で coeffRef () で追加する場合も)m_size は更新されず 0 のままのため,squaredNorm () は sum () のところでコケます.

http://listengine.tuxfamily.org/lists.tuxfamily.org/eigen/2009/01/msg00093.html

辺りを読むと,sparseVector はそもそも動的な resize は想定外とのこと(上のコードの例のように,iterator でアクセスするのは ok).

さらに,data[i] - data[j] は疎ベクトルの場合,要素数が違うと計算前に assert でコケます(eigen2/Eigen/src/Sparse/SparseCwiseBinaryOp.h, line 97, eigen3/Eigen/src/Core/CwiseBinaryOp.h, line 141).結局,

(data[i] - data[j]).squaredNorm()

が疎ベクトルで動くのは,data[i] と data[j] のサイズが同じ場合(resize で同じ値を指定している場合)だけ,ということになります(使えない orz).

> でも、10-20%くらい遅いので、実用にはどうでしょうね?

疎/密ベクトル間の演算程度なら実装も容易ですし,unstable な外部ライブラリ (eigen) をわざわざ使う必要はないかも知れませんね.

> 一番良かったのは、Visual StudioのConcurrency::parallel_forを使って、
> 並列化する方法でした。

Visual Studio は使ったことがないのですが,Parallel Pattern Library (Concurrency) は便利そうですね.浮動小数点演算で計算順序が不定だと,毎回に微妙に結果がズレたりしそうですが.

ACAC 2010/12/05 03:59 うろ覚えですが、手元でデバッグしたときはEigen3の方はsquaredNorm()じゃなくて、
data[i] - data[j] の部分で無限ループにはまっていました。
まじめにコード読んでなくてすいません。

>Parallel Pattern Library (Concurrency) は便利そうですね.
gccでもThreading Building Blocksがありますよね。
そう言えば、どこかでVisual StudioはParallel Pattern Library、
それ以外はThreading Building Blocksにわかれると書いている人もいました。

>浮動小数点演算で計算順序が不定だと,毎回に微妙に結果がズレたりしそうですが.
r = 1では20-30%くらいずれることがあります(Concurrency::combinableの使い方がおかしいのかもしれませんが)。
手元ではズレないようにするためにチョコチョコと手を加えて、
非並列版と同じ値が出るようにしました。

ny23ny23 2010/12/05 17:22 > うろ覚えですが、手元でデバッグしたときはEigen3の方はsquaredNorm()じゃなくて、
> data[i] - data[j] の部分で無限ループにはまっていました。
> まじめにコード読んでなくてすいません。

eigen3 は,SparseVector の index の型(template の第三引数; eigen2 ではない)に size_t を指定すると,引き算で無限ループにはまるみたいです.あと,(data[i] - data[j]).squaredNorm () の場合は,expression template で引き算の計算が遅延されて,squaredNorm () の中の sum () の中の assert のチェックの方が先にかかるので,resize でサイズ (m_size) を1以上にしていない場合はこちらでコケます(二重の罠).

網羅的に確認したわけではないですが,SparseVector のサイズ (m_size) は,入力の適合性のチェック (assert) でしか使われていないようなので,resize で1以上の固定値を適当に指定しておけば(そして eigen3 では SparseVector の template の第三引数を無指定にしておけば),data[i] と data[j] の内容が動的に変わっても,(data[i] - data[j]).squaredNorm () は動くようです(ああ,ややこしい).

> gccでもThreading Building Blocksがありますよね。
情報ありがとうございます.試しに tbb::parallel_reduce で計算してみましたが,(コア数ほどではないものの)分かりやすく高速化されました(結果の誤差は無視できる程度).