Rで並列計算
(とりあえず、動いたので報告です。256ノードで動かすと、さすがに速いです。修正大歓迎。)
snowパッケージを用いてクラスターマシン上で並列計算を行う。
Makefile, Makefile.aimkを修正してmake。展開したディレクトリをそのまま使う。コンパイルには、Rをビルドしたものと同じコンパイラを指定しないと、実行時に動かない可能性あり。シェルの起動スクリプトに、パスの追加、PVM_ROOTやPVM_ARCHの設定。
MPIの実装は色々あるがRのMPI関連のパッケージは殆ど,LAM-MPIを想定している.
LAM-MPIはNFS,RSH(SSHでもよいがパスワード無しの運用するならRSHで分離してportmapで 制限をかけた方が良い気がする)が必要でテンポラリ(環境変数 LAM_MPI_SESSION_PREFIX :以前はLAM_TMP) 配下でセション連携(NFS上)を行うのでテンポラリをNFS上に切る必要がある.デフォルトではTMP_DIR(つまりデフォルトのままだと/tmp)が使われるので忘れずに.
PBS環境ではlibpbsが必要(これが無いとバックエンドは暇なプロセスとして殺される). 某英三文字のメーカ製のopenPBSのlibpbsはPICコードでは無いのでリンク不可なので. 適当に本尊からダウンロードしてきてでっち上げれば問題無い.
# うっかり,--no-saveを忘れると痛ましいかも. mpiexec -boot N R CMD BATCH --no-save hoge.R
R上で
install.packages(c("rpvm", "Rmpi", "snow"))
rpvmおよびRmpiは、並列計算ライブラリのMPIおよびPVM、を利用する場合に必要。 それ以外にsshによるリモートログインを用いた並列計算も可能。
ライブラリの読み込み
> library(snow)
SOCKを使用
> hosts <- rep("localhost", 10)
> scl <- makeCluster(hosts, "SOCK")
PVMを使用
> scl <- makeCluster(10, "PVM") Loading required package: rpvm
MPIを使用
> scl <- makeCluster(10, "MPI") Loading required package: Rmpi
終了時に解放
> .Last <- function() { stopCluster(scl) }
乱数の初期化
> set.seed(1234) > seeds <- trunc(runif(length(cl), -.Machine$integer.max, .Machine$integer.max)) > sclusterApply(cl, seeds, set.seed)
###自信がないですが・・・
> seeds <- trunc(runif(length(scl), -.Machine$integer.max, .Machine$integer.max)) > clusterApply(scl, seeds, set.seed)
###ですよね。ryamada22(05/24/2006)
関数を用意
> i <- c(1:10000)
> x <- runif(50)
> tmp.func <- function(i){xx <- sample(x); mean(xx)}
並列化なし
> system.time(lapply(i, tmp.func)) [1] 2.96 0.31 3.27 0.00 0.00
並列化あり
> clusterExport(cl, "x")
###ここも
> clusterExport(scl, "x")
###ですよね。 ryamada22(05/24/2006)
> system.time(parLapply(scl, i, tmp.func)) [1] 0.14 0.01 0.44 0.00 0.00
並列化するCPUを増やしても通信のコストがかかるため、場合によってはあまり速くならないことがある。そこで、並列化するノードの数の最適化を動的に行ってみる。
準備
scl <- makeCluster(40)
.Last <- function() { stopCluster(scl) }
seeds <- trunc(runif(length(scl), -.Machine$integer.max, .Machine$integer.max))
clusterApply(scl, seeds, set.seed)
nopt.calc <- function(tm, scl, nopt, verbose=T) {
if (verbose) cat("nopt n:", nopt$scl.size, " time:", tm)
if (nopt$ltime < tm) {
nopt$scl.size <- nopt$scl.old
if (verbose) cat(" (denied)")
} else {
nopt$ltime <- tm
if (verbose) cat(" (accepted)")
}
if (nopt$delta == 0) {
nopt$ltime <- tm
}
nopt$delta <- round(runif(1, -2.0, 2.0))
nopt$scl.old <- nopt$scl.size
nopt$scl.size <- nopt$scl.size + nopt$delta
if (nopt$scl.size > length(scl)) {
nopt$scl.size <- length(scl)
}
if (nopt$scl.size < 2) {
nopt$scl.size <- 2
}
if (verbose) cat(" next:", nopt$scl.size, "\n")
}
nopt.index <- function(scl, nopt) {
sort(sample(seq(along=scl), nopt$scl.size))
}
nopt <- new.env()
nopt$scl.size <- 5
nopt$scl.old <- nopt$scl.size
nopt$delta <- 0
nopt$ltime <- Inf
p.t.test <- function(x) {
t.test(x)$p.value
}
m <- matrix(runif(1000000), ncol=100)
例1
> for (i in 1:100) {
+ scl.idx <- nopt.index(scl, nopt)
+ etime <- system.time(res <- parApply(scl[scl.idx], m, 1, p.t.test))
+ nopt.calc(etime[3], scl, nopt)
+ }
nopt n: 5 time: 26.8 (accepted) next: 7 <<-- n=5からスタート
.
.
.
nopt n: 5 time: 26.33 (accepted) next: 7
nopt n: 7 time: 28.46 (denied) next: 6
nopt n: 6 time: 27.08 (denied) next: 5 <<-- n=5~7でうろうろしている
例2
> for (i in 1:100) {
+ scl.idx <- nopt.index(scl, nopt)
+ etime <- system.time(res <- parRapply(scl[scl.idx], m, p.t.test))
+ nopt.calc(etime[3], scl, nopt)
+ }
nopt n: 5 time: 13.5 (accepted) next: 4 <<-- n=5からスタート
.
.
.
nopt n: 39 time: 3.04 (denied) next: 40
nopt n: 40 time: 3.01 (denied) next: 40
nopt n: 40 time: 3.02 (denied) next: 40 <<-- n=40は全ノード数
例からは、parApplyよりもparRapplyを使った方が良いことが分かる。 もちろん、呼び出す関数によって最適なノードサイズが異なる。
ローカルホストへはログインせずに実行するための修正
newSOCKnode <- function(machine = "localhost", ...,
options = defaultClusterOptions) {
# **** allow some form of spec here
# **** make sure options are quoted
options <- addClusterOptions(options, list(...))
port <- getClusterOption("port", options)
scriptdir <- getClusterOption("scriptdir", options)
if (getClusterOption("homogeneous")) {
script <- file.path(scriptdir, "RSOCKnode.sh")
rlibs <- paste(getClusterOption("rlibs", options), collapse = ":")
rprog <- getClusterOption("rprog", options)
}
else {
script <- "RunSnowNode RSOCKnode.sh"
rlibs <- NULL
rprog <- NULL
}
rshcmd <- getClusterOption("rshcmd", options)
user <- getClusterOption("user", options)
env <- paste("MASTER=", getClusterOption("master", options),
" PORT=", port,
" OUT=", getClusterOption("outfile", options),
sep="")
if (! is.null(rprog))
env <- paste(env, " RPROG=", rprog, sep="")
if (! is.null(rlibs))
env <- paste(env, " R_LIBS=", rlibs, sep="")
if (machine == "localhost") {
system(paste("sh -c \'env", env, script, "\'")) # 追加
} else {
system(paste(rshcmd, "-l", user, machine, "env", env, script))
}
con <- socketConnection(port = port, server=TRUE, blocking=TRUE,
open="a+b")
structure(list(con = con, host = machine), class = "SOCKnode")
}
グローバル以外のオブジェクトでも利用可能なように修正
clusterExport <- function (cl, list){
gets <- function(n, v) {
assign(n, v, env = .GlobalEnv)
NULL
}
for (name in list) {
clusterCall(cl, gets, name, get(name, env = sys.parent(1))) # 修正
}
}
単一ノードではslaveのプロセスの振り分けができないのでrsh-fake経由に よる並列化を行います.
require(snow)
# スレーブに渡す関数定義
foo1<-function(x) { mean(x) } ; foo2<-function(x) { min(x) }
foo3<-function(x) { max(x) } ; foo<-list(foo1,foo2,foo3)
# 関数の数を基にスレーブ数を決定
task<-length(foo) # タスク数
# hostnameは識別用に使用します
remotenodes <- paste("localhost",1:task,sep="")
calc <-function(nodes,n,fun)
{
# Rmpiと同様に扱えるようホスト名でランク付け
rank<-(1:length(nodes))[nodes==Sys.getenv("SOCKRANK")]
set.seed(rank)
x1<-array(runif(n*n),dim=c(n,n))
x2<-x1%*%x1
fun[[rank]](x2)
}
SOCK上でのディープな設定.
# スレーブ先の異なるマシン間でRのPATHが異なる場合利用します setDefaultClusterOptions(rprog="/usr/local/R-1.9.1/bin/R") setDefaultClusterOptions(scriptdir="/usr/local/R-1.9.1/lib/R/library/snow")
まあ、こんな感じで
# slave起動
cl <- makeSOCKcluster(remotenodes, rshcmd="rsh-fake")
# 各スレーブにライブラリのロード
clusterEvalQ(cl,library(boot))
# 仕事振り分け
rc <- clusterCall(cl, calc,remotenodes,100,foo)
sapply(rc,print)
rshの嘘っこ版
#!/bin/sh
if [ $# -lt 4 ] ;then
echo "usage: $0 -l user hostname [command [arg...]]"
exit 1
fi
SOCKRANK=$3
export SOCKRANK
shift 3
exec $*
でもそこまでするなら素直にLAM使った方が楽です(爆)
若干の記事をこちらに掲載 Rmpi、snow、RSPRNG