Rで並列計算

(とりあえず、動いたので報告です。256ノードで動かすと、さすがに速いです。修正大歓迎。)

snowパッケージを用いてクラスターマシン上で並列計算を行う。


インストール

PVM

Makefile, Makefile.aimkを修正してmake。展開したディレクトリをそのまま使う。コンパイルには、Rをビルドしたものと同じコンパイラを指定しないと、実行時に動かない可能性あり。シェルの起動スクリプトに、パスの追加、PVM_ROOTやPVM_ARCHの設定。

MPI

MPIの実装は色々あるがRのMPI関連のパッケージは殆ど,LAM-MPIを想定している.

LAM-MPIはNFS,RSH(SSHでもよいがパスワード無しの運用するならRSHで分離してportmapで 制限をかけた方が良い気がする)が必要でテンポラリ(環境変数 LAM_MPI_SESSION_PREFIX :以前はLAM_TMP) 配下でセション連携(NFS上)を行うのでテンポラリをNFS上に切る必要がある.デフォルトではTMP_DIR(つまりデフォルトのままだと/tmp)が使われるので忘れずに.

PBS環境ではlibpbsが必要(これが無いとバックエンドは暇なプロセスとして殺される). 某英三文字のメーカ製のopenPBSのlibpbsはPICコードでは無いのでリンク不可なので. 適当に本尊からダウンロードしてきてでっち上げれば問題無い.

# うっかり,--no-saveを忘れると痛ましいかも.
mpiexec -boot N R CMD BATCH --no-save hoge.R

ライブラリ

R上で

install.packages(c("rpvm", "Rmpi", "snow"))

rpvmおよびRmpiは、並列計算ライブラリのMPIおよびPVM、を利用する場合に必要。 それ以外にsshによるリモートログインを用いた並列計算も可能。

実行例

ライブラリの読み込み

> library(snow)

SOCKを使用

> hosts <- rep("localhost", 10)
> scl <- makeCluster(hosts, "SOCK")

PVMを使用

> scl <- makeCluster(10, "PVM")
Loading required package: rpvm

MPIを使用

> scl <- makeCluster(10, "MPI")
Loading required package: Rmpi

終了時に解放

> .Last <- function() { stopCluster(scl) }

乱数の初期化

> set.seed(1234)
> seeds <- trunc(runif(length(cl), -.Machine$integer.max, .Machine$integer.max))
> sclusterApply(cl, seeds, set.seed)

###自信がないですが・・・

> seeds <- trunc(runif(length(scl), -.Machine$integer.max, .Machine$integer.max))
> clusterApply(scl, seeds, set.seed)

###ですよね。ryamada22(05/24/2006)

関数を用意

> i <- c(1:10000)
> x <- runif(50)
> tmp.func <- function(i){xx <- sample(x); mean(xx)}

並列化なし

> system.time(lapply(i, tmp.func))
 [1] 2.96 0.31 3.27 0.00 0.00

並列化あり

> clusterExport(cl, "x")

###ここも

> clusterExport(scl, "x")

###ですよね。 ryamada22(05/24/2006)

> system.time(parLapply(scl, i, tmp.func))
 [1] 0.14 0.01 0.44 0.00 0.00

その他

実験

並列化するCPUを増やしても通信のコストがかかるため、場合によってはあまり速くならないことがある。そこで、並列化するノードの数の最適化を動的に行ってみる。

準備

scl <- makeCluster(40)
.Last <- function() { stopCluster(scl) }
seeds <- trunc(runif(length(scl), -.Machine$integer.max, .Machine$integer.max))
clusterApply(scl, seeds, set.seed)

nopt.calc <- function(tm, scl, nopt, verbose=T) {
 if (verbose) cat("nopt  n:", nopt$scl.size, " time:", tm)
 if (nopt$ltime < tm) {
   nopt$scl.size <- nopt$scl.old
   if (verbose) cat("  (denied)")
 } else {
   nopt$ltime <- tm
   if (verbose) cat("  (accepted)")
 }
 if (nopt$delta == 0) {
   nopt$ltime <- tm
 }
 nopt$delta <- round(runif(1, -2.0, 2.0))
 nopt$scl.old  <- nopt$scl.size
 nopt$scl.size <- nopt$scl.size + nopt$delta
 if (nopt$scl.size > length(scl)) {
   nopt$scl.size <- length(scl)
 }
 if (nopt$scl.size < 2) {
   nopt$scl.size <- 2
 }
 if (verbose) cat("  next:", nopt$scl.size, "\n")
}

nopt.index <- function(scl, nopt) {
 sort(sample(seq(along=scl), nopt$scl.size))
}

nopt <- new.env()

nopt$scl.size <- 5
nopt$scl.old  <- nopt$scl.size
nopt$delta <- 0
nopt$ltime <- Inf

p.t.test <- function(x) {
  t.test(x)$p.value
}

m <- matrix(runif(1000000), ncol=100)

例1

> for (i in 1:100) {
+   scl.idx <- nopt.index(scl, nopt)
+   etime <- system.time(res <- parApply(scl[scl.idx], m, 1, p.t.test))
+   nopt.calc(etime[3], scl, nopt)
+ }
nopt  n: 5  time: 26.8  (accepted)  next: 7    <<-- n=5からスタート
 .
 .
 .
nopt  n: 5  time: 26.33  (accepted)  next: 7
nopt  n: 7  time: 28.46  (denied)  next: 6
nopt  n: 6  time: 27.08  (denied)  next: 5     <<-- n=5~7でうろうろしている

例2

> for (i in 1:100) {
+   scl.idx <- nopt.index(scl, nopt)
+   etime <- system.time(res <- parRapply(scl[scl.idx], m, p.t.test))
+   nopt.calc(etime[3], scl, nopt)
+ }
nopt  n: 5  time: 13.5  (accepted)  next: 4    <<-- n=5からスタート
 .
 .
 .
nopt  n: 39  time: 3.04  (denied)  next: 40
nopt  n: 40  time: 3.01  (denied)  next: 40
nopt  n: 40  time: 3.02  (denied)  next: 40    <<-- n=40は全ノード数

例からは、parApplyよりもparRapplyを使った方が良いことが分かる。 もちろん、呼び出す関数によって最適なノードサイズが異なる。

ライブラリの修正

ローカルホストへはログインせずに実行するための修正

newSOCKnode <- function(machine = "localhost", ...,
                       options = defaultClusterOptions) {
   # **** allow some form of spec here
   # **** make sure options are quoted
   options <- addClusterOptions(options, list(...))
   port <- getClusterOption("port", options)
   scriptdir <- getClusterOption("scriptdir", options)
   if (getClusterOption("homogeneous")) {
       script <- file.path(scriptdir, "RSOCKnode.sh")
       rlibs <- paste(getClusterOption("rlibs", options), collapse = ":")
       rprog <- getClusterOption("rprog", options)
   }
   else {
       script <- "RunSnowNode RSOCKnode.sh"
       rlibs <- NULL
       rprog <- NULL
   }
   rshcmd <- getClusterOption("rshcmd", options)
   user <- getClusterOption("user", options)
   env <- paste("MASTER=", getClusterOption("master", options),
                " PORT=", port,
                " OUT=", getClusterOption("outfile", options),
                sep="")
   if (! is.null(rprog))
       env <- paste(env, " RPROG=", rprog, sep="")
   if (! is.null(rlibs))
       env <- paste(env, " R_LIBS=", rlibs, sep="")

   if (machine == "localhost") {
       system(paste("sh -c \'env", env, script, "\'")) # 追加
   } else {
       system(paste(rshcmd, "-l", user, machine, "env", env, script))
   }
   con <- socketConnection(port = port, server=TRUE, blocking=TRUE,
                           open="a+b")
   structure(list(con = con, host = machine), class = "SOCKnode")
}

グローバル以外のオブジェクトでも利用可能なように修正

clusterExport <- function (cl, list){
 gets <- function(n, v) {
   assign(n, v, env = .GlobalEnv)
   NULL
 }
 for (name in list) {
   clusterCall(cl, gets, name, get(name, env = sys.parent(1))) # 修正
 }
}

snow SOCK例(nfsやlamは難しい方でもOK?)

単一ノードではslaveのプロセスの振り分けができないのでrsh-fake経由に よる並列化を行います.

require(snow)
                                     # スレーブに渡す関数定義
foo1<-function(x) { mean(x) } ; foo2<-function(x) { min(x) }
foo3<-function(x) { max(x) }  ; foo<-list(foo1,foo2,foo3)
                                     # 関数の数を基にスレーブ数を決定
task<-length(foo)                    # タスク数
                                     # hostnameは識別用に使用します
remotenodes <- paste("localhost",1:task,sep="")
calc <-function(nodes,n,fun)
{
   # Rmpiと同様に扱えるようホスト名でランク付け
   rank<-(1:length(nodes))[nodes==Sys.getenv("SOCKRANK")]
   set.seed(rank)
   x1<-array(runif(n*n),dim=c(n,n))
   x2<-x1%*%x1
   fun[[rank]](x2)
}

SOCK上でのディープな設定.

# スレーブ先の異なるマシン間でRのPATHが異なる場合利用します
setDefaultClusterOptions(rprog="/usr/local/R-1.9.1/bin/R")
setDefaultClusterOptions(scriptdir="/usr/local/R-1.9.1/lib/R/library/snow")

まあ、こんな感じで

                                      # slave起動
cl <- makeSOCKcluster(remotenodes, rshcmd="rsh-fake")
                                      # 各スレーブにライブラリのロード
clusterEvalQ(cl,library(boot))
                                      # 仕事振り分け
rc <- clusterCall(cl, calc,remotenodes,100,foo)
sapply(rc,print)

rshの嘘っこ版

#!/bin/sh
if [ $# -lt 4 ] ;then
        echo "usage: $0 -l user hostname [command [arg...]]"
        exit 1
fi

SOCKRANK=$3
export SOCKRANK

shift 3
exec $*

でもそこまでするなら素直にLAM使った方が楽です(爆)


若干の記事をこちらに掲載 Rmpi、snow、RSPRNG




トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2023-03-25 (土) 11:19:17