並列計算したい

計算量が多くなってきた。
最近、並列計算というのが流行っているらしい。
今までは、Rコンソールとターミナル(コマンドプロンプト)でRを複数起動させて、loopを適度に分割して計算していた、という超アナログ手法だった。
そんなことしなくてもRにも実装されているとかないとか。

どうすればRで並列計算できるのか〜そもそも並列計算って何?

という訳でまず先生の記事から。snowパッケージというものがあるらしい。うん、なるほどわからん
こちらにもまとめてある。うん、なるほどわk(ry。
毎度おなじみのR wiki先生とグーグル先生に聞いてみる。
そもそも、並列計算とは何か、という話なのだが、私は工学屋ではないので、佐藤整尚先生(統計数理研)の説明でわかった気になったことする。
Rで並列計算をするとき、というか、Rを計算機として使う時の問題点として、スペックがどんどん進化してマルチコアになったCPUを、デフォルトの状態では複数個を使って計算できない、とたいていどのページにも書いてある。
それをなんとかかんとかしてくれるのが、並列計算関連のパッケージ。

並列計算をやる

とにかく、並列計算をしたいけど、スクリプトをどう書けばいいかわからない人(自分のことだが)用に、いくつかのサイトからスクリプトを集めてきた。勝手に転載してます。何かあったら言ってください。
環境
Mac OS X 10.6.8
2.4 GHz Intel Core 2 Duo
8 GB 1067 MHz DDR3
R version 2.13.0
コアの割当は基本的に2個までしかできません。

snow

Rで並列処理 (snowライブラリ)から、パッケージsnowの使い方その1。

install.packages("snow")
library(snow)
#localhostに2個のノードを割り当てられます。
cl <- makeSOCKcluster(c("localhost", "localhost"))
#試しにgetwd()でそれぞれの環境のカレントディレクトリを取得してみます。
clusterCall(cl, getwd)
#1000行100列 (マイクロアレイデータを想定、行が遺伝子、列がサンプル)のデータを作り、各行に対してWelchのt検定を行ってみます。
mat <- matrix(rnorm(100000), 1000, 100)
f <- factor(rep(0:1, each=50))
# 各ノードに変数をエクスポートするには、仕様上グローバル変数にする必要がある。
g <<- f
clusterExport(cl, "g")
#単独ノード
system.time(apply(mat, 1, function(x) t.test(x ~ g)$p.value) -> c1)
# 2ノード 
system.time(parRapply(cl, mat, function(x) t.test(x ~ g)$p.value) -> c2)
# もちろん、結果は一致しないと困る
sum(abs(c1-c2))

実は、factorの扱いに慣れていないので、正直なところ私にとって使いにくい…
 
Rで並列計算から、パッケージsnowの使い方その2。

###並列計算用に関数作成###
start_simulation <- function() { 
###データの生成###
a <- rnorm(400)
b <- rnorm(400)
###回帰###
c <- lm(a ~ b)
###とりあえず、係数でも記録します###
return(c(a = coef(c)[1], 
         b = coef(c)[2])
       )
}
###繰り返し数###
k <- 10000

library(snow)
###乱数の都合でこいつも使います。ないと面白いことになります。###
library(rlecuyer)
#Core 2 Duoなので2つです。クアッドコアとかなら4ですかね。開きすぎると閉じさせられます。
#Firewallは無視です。ブロック解除です。
cl <- makeCluster(2, type = "SOCK")
###乱数種はここでセット###
clusterSetupRNG (cl,seed=123)
###並列計算するfunctionをここで明示###
clusterExport(cl, "start_simulation")
###スタート。ついでに時間も計っちゃいましょう###
###係数はpに、時間はtimesに入るようになっています###
times1 <- system.time(p <- parSapply(cl, 1:k, function(x){start_simulation()})) #並列計算
times2 <- system.time(for(ki in 1:k){start_simulation()}) #単独
times1;times2

乱数の設定ができるらしい。

clusterExport()

で、CPUで計算すべき関数を渡すようである。
 
Rで円周率計算+並列計算から、パッケージsnowの使い方その3。

library(snow)
set.seed(1)
tens <- 10^c(1:5)
# データ数を10倍しながら、かかる時間を調べている
elapsed.time <- matrix(0, length(tens), 2)
dimnames(elapsed.time) <- list(tens, c("single", "multi"))
for( points.num in 1:length(tens) ){
	coords <- matrix(runif(2*tens[points.num]),ncol=2)
	start <- proc.time()
	cl <- makeSOCKcluster(rep("localhost", 2)) # 2は利用するマシンのコア数
	elapsed.time[points.num, 2] <- system.time(myPI <- sum(parRapply(cl,coords,function(x){sqrt(sum(x^2))})<1)/points.num*4)[3]
	cl <- makeSOCKcluster(rep("localhost", 1)) # 1は利用するマシンのコア数
	elapsed.time[points.num, 1] <- system.time(myPI <- sum(parRapply(cl,coords,function(x){sqrt(sum(x^2))})<1)/points.num*4)[3]
}
stopCluster(cl)
elapsed.time
       single multi
10      0.009 0.003
100     0.097 0.004
1000    0.943 0.014
10000   9.781 0.074
1e+05 104.304 0.744

使うCPUはいくつがいいのか

こんな質問があったので、これを改変してみる。

aaa2<-function(x){
	mean(rnorm(100))
}
elapsed.time <- matrix(0, 8, 2)
colnames(elapsed.time) <- c("single", "multi")
for(i in 1:8){
	cl<-makeCluster(i, type="SOCK")
	clusterExport(cl, "aaa2")
	elapsed.time[i, 2] <- system.time(parSapply(cl, 1:1000, aaa2))[3]
	elapsed.time[i, 1] <- system.time(sapply(1:100000, aaa2))[3]
}
elapsed.time
     single multi
[1,]  4.472 0.569
[2,]  4.457 0.026
[3,]  4.496 0.026
[4,]  4.580 0.051
[5,]  4.558 0.030
[6,]  4.435 0.028
[7,]  4.473 0.028
[8,]  4.448 0.037

CPUの割当は、ぶっちゃけ2より大きな数を割り当てても機能するようだ。
実行後に、

使われていないコネクションn (<-localhost:xxxxx) を閉じます

とか云々言われるけど、割り当てるように言われたけど使ってませんよ、という意味なのだろう。たぶん。
3以上のCPU割当でも、計算してくれているし、実行速度もこの程度なら高速化できている。
質問の回答を読むと、CPUに計算をしてもらうときの通信速度やらメモリやら分割やらうんたらかんたら書いてあって、うん、なるほどわからん

foreachとdoSNOW

foreach+doSNOWパッケージを使って、並列処理をやってみたから、foreachとdoSNOWの使い方その1。

install.packages("foreach")
install.packages("doSNOW")
library(foreach)
library(doSNOW)
#基本系(返却値リスト)
foreach(i = 1:3) %do% {sqrt(i)}
#.combineオプションを指定することで返却値をベクトルにすることもできる
foreach(i = 1:3,.combine = "c") %do% {sqrt(i)}
#{}内の返却値自体がベクトルの時は.combineにcbindを指定して行列にもできる
foreach(i = 1:3,.combine = "cbind") %do% {letters[1:4]}
#自分で定義した結合方式も可能(↑の.combine関数にcを指定したケースと同じになるようにしてみた)
MyFunc <- function(x,y)c(x,y)
foreach(i = 1:3, .combine = "MyFunc") %do% {
  sqrt(i)
}
Reduce(function(x,y)c(x,y),sqrt(1:3))
foreach(a = 1:1000, b = rep(10, 2)) %do% {a + b}
library(doSNOW)
getDoParWorkers()
getDoParName()
registerDoSNOW(makeCluster(2, type = "SOCK"))
getDoParWorkers()
getDoParName()
getDoParVersion()

N <- 10^4
system.time(foreach(i = 1:N,.combine = "cbind") %do% {
	sum(rnorm(N))
})
system.time(foreach(i = 1:N,.combine = "cbind") %dopar% {
	sum(rnorm(N))
})

並列化させるためには%do%を%dopar%に変えるだけでOK

とあるので、そうした。ちょっとだけ速くなった。
 
RにおけるHPC〜並列計算編〜 Tokyo.R#11 2011/1/29から、foreachとdoSNOWの使い方その2。
スライド19枚目。

pricing.by.mc <- function(S0=100, K=100, B=90, r=0.06, sigma=0.10, T=1, M=52, N=4e5, type="knockout"){ 
	# S0:原資産の現在価格 
	# K :権利公使価格 
	# T :満期 
	# M :時間方向のステップ数 
	# N :モンテカルロ・シミュレーション試行回数 
	# ノックアウトオプションはダウン・アンド・アウトのみを考慮 
	
	delta.t <- T/M # 時間方向の刻み幅 
	z <- matrix(rnorm(M * N), nrow=M) 
	t <- replicate(N, 1:M)/M 
	S <- S0 * exp((r - sigma^2/2) * t + sigma * sqrt(t) * apply(z, 2, cumsum)) 
	if (type=="european") { 
		mean(pmax(S[M,]-K, 0)) 
	} else if (type=="knockout") { 
		y <- apply(S-B, 2, function(x) if(any(x < 0)) 0 
	else max(x[M] - K, 0)) 
	mean(y) 
	}
}
system.time(print(mean(unlist(clusterCall(cl, pricing.by.mc)))))
system.time(print(pricing.by.mc(N=1.2e6, type="knockout")))

スライド22枚目。

library(foreach)
library(doSNOW)
library(randomForest)
# doSNOWパッケージを用いて並列化するための設定
cl <-  makeSOCKcluster(2)
registerDoSNOW(cl)
# 人工的なデータを生成
x <- matrix(runif(20000), 4000)
y <- gl(2, 2000)
# 並列化を行わない場合
system.time(rf.simple <- randomForest(x, y, ntree=1000))
# foreach,doSNOWを用いて並列化した場合
system.time(rf.fe <- foreach(ntree=rep(500, 2), .combine=combine,.packages="randomForest") %dopar% randomForest(x, y, ntree=ntree))

こちらはあまり速くならなかった。原因追究は保留。

doMC

RでdoMCを使ったお手軽並列計算から、doMCの使い方。
MacLinuxWindowsで並列計算用Rパッケージが使える、使えない、があるらしい。とりあえず今はMacを使っているのでそこらへんの検証は後回し。

install.packages("doMC")
library(doMC)
#計算コア数ベクトル生成(コア数: 1,2,4,6,8)
n_core <- c(1, seq(2, 8, by=2))
#正規乱数発生数を設定
N <- 10^5
#計算時間の結果を突っ込む空マトリックスの作成
result.mat <- matrix(0, length(n_core), 3)
for(i in 1:length(n_core)){
	registerDoMC(n_core[i])
	result <- system.time(foreach(i = 1:1000, .combine = "cbind") %dopar% {
		sum(rnorm(N))
	})
	core.d <- c(n_core[i], result[3])
	names(core.d) <- c("Cores", "ProcTime")
	print(core.d)
	result.mat[i, ] <- as.numeric(result)[1:3]
}
result.mat
   Cores ProcTime 
   1.000   16.797 
   Cores ProcTime 
   2.000   23.233 
   Cores ProcTime 
   4.000   30.411 
   Cores ProcTime 
   6.000   13.507 
   Cores ProcTime 
   8.000   13.435 

むしろ時間がかかっているが…これも保留。