Rで並列計算
(とりあえず、動いたので報告です。256ノードで動かすと、さすがに速いです。修正大歓迎。)
snowパッケージを用いてクラスターマシン上で並列計算を行う。
Makefile, Makefile.aimkを修正してmake。展開したディレクトリをそのまま使う。コンパイルには、Rをビルドしたものと同じコンパイラを指定しないと、実行時に動かない可能性あり。シェルの起動スクリプトに、パスの追加、PVM_ROOTやPVM_ARCHの設定。
MPIの実装は色々あるがRのMPI関連のパッケージは殆ど,LAM-MPIを想定している.
LAM-MPIはNFS,RSH(SSHでもよいがパスワード無しの運用するならRSHで分離してportmapで 制限をかけた方が良い気がする)が必要でテンポラリ(環境変数 LAM_MPI_SESSION_PREFIX :以前はLAM_TMP) 配下でセション連携(NFS上)を行うのでテンポラリをNFS上に切る必要がある.デフォルトではTMP_DIR(つまりデフォルトのままだと/tmp)が使われるので忘れずに.
PBS環境ではlibpbsが必要(これが無いとバックエンドは暇なプロセスとして殺される). 某英三文字のメーカ製のopenPBSのlibpbsはPICコードでは無いのでリンク不可なので. 適当に本尊からダウンロードしてきてでっち上げれば問題無い.
# うっかり,--no-saveを忘れると痛ましいかも. mpiexec -boot N R CMD BATCH --no-save hoge.R
R上で
install.packages(c("rpvm", "Rmpi", "snow"))
rpvmおよびRmpiは、並列計算ライブラリのMPIおよびPVM、を利用する場合に必要。 それ以外にsshによるリモートログインを用いた並列計算も可能。
ライブラリの読み込み
> library(snow)
SOCKを使用
> hosts <- rep("localhost", 10) > scl <- makeCluster(hosts, "SOCK")
PVMを使用
> scl <- makeCluster(10, "PVM") Loading required package: rpvm
MPIを使用
> scl <- makeCluster(10, "MPI") Loading required package: Rmpi
終了時に解放
> .Last <- function() { stopCluster(scl) }
乱数の初期化
> set.seed(1234) > seeds <- trunc(runif(length(cl), -.Machine$integer.max, .Machine$integer.max)) > sclusterApply(cl, seeds, set.seed)
###自信がないですが・・・
> seeds <- trunc(runif(length(scl), -.Machine$integer.max, .Machine$integer.max)) > clusterApply(scl, seeds, set.seed)
###ですよね。ryamada22(05/24/2006)
関数を用意
> i <- c(1:10000) > x <- runif(50) > tmp.func <- function(i){xx <- sample(x); mean(xx)}
並列化なし
> system.time(lapply(i, tmp.func)) [1] 2.96 0.31 3.27 0.00 0.00
並列化あり
> clusterExport(cl, "x")
###ここも
> clusterExport(scl, "x")
###ですよね。 ryamada22(05/24/2006)
> system.time(parLapply(scl, i, tmp.func)) [1] 0.14 0.01 0.44 0.00 0.00
並列化するCPUを増やしても通信のコストがかかるため、場合によってはあまり速くならないことがある。そこで、並列化するノードの数の最適化を動的に行ってみる。
準備
scl <- makeCluster(40) .Last <- function() { stopCluster(scl) } seeds <- trunc(runif(length(scl), -.Machine$integer.max, .Machine$integer.max)) clusterApply(scl, seeds, set.seed) nopt.calc <- function(tm, scl, nopt, verbose=T) { if (verbose) cat("nopt n:", nopt$scl.size, " time:", tm) if (nopt$ltime < tm) { nopt$scl.size <- nopt$scl.old if (verbose) cat(" (denied)") } else { nopt$ltime <- tm if (verbose) cat(" (accepted)") } if (nopt$delta == 0) { nopt$ltime <- tm } nopt$delta <- round(runif(1, -2.0, 2.0)) nopt$scl.old <- nopt$scl.size nopt$scl.size <- nopt$scl.size + nopt$delta if (nopt$scl.size > length(scl)) { nopt$scl.size <- length(scl) } if (nopt$scl.size < 2) { nopt$scl.size <- 2 } if (verbose) cat(" next:", nopt$scl.size, "\n") } nopt.index <- function(scl, nopt) { sort(sample(seq(along=scl), nopt$scl.size)) } nopt <- new.env() nopt$scl.size <- 5 nopt$scl.old <- nopt$scl.size nopt$delta <- 0 nopt$ltime <- Inf p.t.test <- function(x) { t.test(x)$p.value } m <- matrix(runif(1000000), ncol=100)
例1
> for (i in 1:100) { + scl.idx <- nopt.index(scl, nopt) + etime <- system.time(res <- parApply(scl[scl.idx], m, 1, p.t.test)) + nopt.calc(etime[3], scl, nopt) + } nopt n: 5 time: 26.8 (accepted) next: 7 <<-- n=5からスタート . . . nopt n: 5 time: 26.33 (accepted) next: 7 nopt n: 7 time: 28.46 (denied) next: 6 nopt n: 6 time: 27.08 (denied) next: 5 <<-- n=5~7でうろうろしている
例2
> for (i in 1:100) { + scl.idx <- nopt.index(scl, nopt) + etime <- system.time(res <- parRapply(scl[scl.idx], m, p.t.test)) + nopt.calc(etime[3], scl, nopt) + } nopt n: 5 time: 13.5 (accepted) next: 4 <<-- n=5からスタート . . . nopt n: 39 time: 3.04 (denied) next: 40 nopt n: 40 time: 3.01 (denied) next: 40 nopt n: 40 time: 3.02 (denied) next: 40 <<-- n=40は全ノード数
例からは、parApplyよりもparRapplyを使った方が良いことが分かる。 もちろん、呼び出す関数によって最適なノードサイズが異なる。
ローカルホストへはログインせずに実行するための修正
newSOCKnode <- function(machine = "localhost", ..., options = defaultClusterOptions) { # **** allow some form of spec here # **** make sure options are quoted options <- addClusterOptions(options, list(...)) port <- getClusterOption("port", options) scriptdir <- getClusterOption("scriptdir", options) if (getClusterOption("homogeneous")) { script <- file.path(scriptdir, "RSOCKnode.sh") rlibs <- paste(getClusterOption("rlibs", options), collapse = ":") rprog <- getClusterOption("rprog", options) } else { script <- "RunSnowNode RSOCKnode.sh" rlibs <- NULL rprog <- NULL } rshcmd <- getClusterOption("rshcmd", options) user <- getClusterOption("user", options) env <- paste("MASTER=", getClusterOption("master", options), " PORT=", port, " OUT=", getClusterOption("outfile", options), sep="") if (! is.null(rprog)) env <- paste(env, " RPROG=", rprog, sep="") if (! is.null(rlibs)) env <- paste(env, " R_LIBS=", rlibs, sep="") if (machine == "localhost") { system(paste("sh -c \'env", env, script, "\'")) # 追加 } else { system(paste(rshcmd, "-l", user, machine, "env", env, script)) } con <- socketConnection(port = port, server=TRUE, blocking=TRUE, open="a+b") structure(list(con = con, host = machine), class = "SOCKnode") }
グローバル以外のオブジェクトでも利用可能なように修正
clusterExport <- function (cl, list){ gets <- function(n, v) { assign(n, v, env = .GlobalEnv) NULL } for (name in list) { clusterCall(cl, gets, name, get(name, env = sys.parent(1))) # 修正 } }
単一ノードではslaveのプロセスの振り分けができないのでrsh-fake経由に よる並列化を行います.
require(snow) # スレーブに渡す関数定義 foo1<-function(x) { mean(x) } ; foo2<-function(x) { min(x) } foo3<-function(x) { max(x) } ; foo<-list(foo1,foo2,foo3) # 関数の数を基にスレーブ数を決定 task<-length(foo) # タスク数 # hostnameは識別用に使用します remotenodes <- paste("localhost",1:task,sep="") calc <-function(nodes,n,fun) { # Rmpiと同様に扱えるようホスト名でランク付け rank<-(1:length(nodes))[nodes==Sys.getenv("SOCKRANK")] set.seed(rank) x1<-array(runif(n*n),dim=c(n,n)) x2<-x1%*%x1 fun[[rank]](x2) }
SOCK上でのディープな設定.
# スレーブ先の異なるマシン間でRのPATHが異なる場合利用します setDefaultClusterOptions(rprog="/usr/local/R-1.9.1/bin/R") setDefaultClusterOptions(scriptdir="/usr/local/R-1.9.1/lib/R/library/snow")
まあ、こんな感じで
# slave起動 cl <- makeSOCKcluster(remotenodes, rshcmd="rsh-fake") # 各スレーブにライブラリのロード clusterEvalQ(cl,library(boot)) # 仕事振り分け rc <- clusterCall(cl, calc,remotenodes,100,foo) sapply(rc,print)
rshの嘘っこ版
#!/bin/sh if [ $# -lt 4 ] ;then echo "usage: $0 -l user hostname [command [arg...]]" exit 1 fi SOCKRANK=$3 export SOCKRANK shift 3 exec $*
でもそこまでするなら素直にLAM使った方が楽です(爆)
若干の記事をこちらに掲載 Rmpi、snow、RSPRNG