Rで並列計算

(とりあえず、動いたので報告です。256ノードで動かすと、さすがに速いです。修正大歓迎。)

snowパッケージを用いてクラスターマシン上で並列計算を行う。


インストール

PVM

Makefile, Makefile.aimkを修正してmake。展開したディレクトリをそのまま使う。コンパイルには、Rをビルドしたものと同じコンパイラを指定しないと、実行時に動かない可能性あり。シェルの起動スクリプトに、パスの追加、PVM_ROOTやPVM_ARCHの設定。

MPI

MPIの実装は色々あるがRのMPI関連のパッケージは殆ど,LAM-MPIを想定している.

LAM-MPIはNFS,RSH(SSHでもよいがパスワード無しの運用するならRSHで分離してportmapで 制限をかけた方が良い気がする)が必要でテンポラリ(環境変数 LAM_MPI_SESSION_PREFIX :以前はLAM_TMP) 配下でセション連携(NFS上)を行うのでテンポラリをNFS上に切る必要がある.デフォルトではTMP_DIR(つまりデフォルトのままだと/tmp)が使われるので忘れずに.

PBS環境ではlibpbsが必要(これが無いとバックエンドは暇なプロセスとして殺される). 某英三文字のメーカ製のopenPBSのlibpbsはPICコードでは無いのでリンク不可なので. 適当に本尊からダウンロードしてきてでっち上げれば問題無い.

  • Rmpi snow経由で使うのが簡単.並列マトリックス計算も付いているけど,冗談に等しい.
  • rsprng 並列乱数生成.sprng2.0(gmp+MPIを要求する)を使う.rsprngのページにsprng2.0に対するパッチがあるのでそれを忘れずに. rsprngが無いと各並列マシン上では同じ乱数が生成されるので必要だと思われる.
# うっかり,--no-saveを忘れると痛ましいかも.
mpiexec -boot N R CMD BATCH --no-save hoge.R

ライブラリ

R上で

install.packages(c("rpvm", "Rmpi", "snow"))

rpvmおよびRmpiは、並列計算ライブラリのMPIおよびPVM、を利用する場合に必要。 それ以外にsshによるリモートログインを用いた並列計算も可能。

実行例

ライブラリの読み込み

> library(snow)

SOCKを使用

> hosts <- rep("localhost", 10)
> scl <- makeCluster(hosts, "SOCK")

PVMを使用

> scl <- makeCluster(10, "PVM")
Loading required package: rpvm

MPIを使用

> scl <- makeCluster(10, "MPI")
Loading required package: Rmpi

終了時に解放

> .Last <- function() { stopCluster(scl) }

乱数の初期化

> set.seed(1234)
> seeds <- trunc(runif(length(cl), -.Machine$integer.max, .Machine$integer.max))
> sclusterApply(cl, seeds, set.seed)

###自信がないですが・・・

> seeds <- trunc(runif(length(scl), -.Machine$integer.max, .Machine$integer.max))
> clusterApply(scl, seeds, set.seed)

###ですよね。ryamada22(05/24/2006)

関数を用意

> i <- c(1:10000)
> x <- runif(50)
> tmp.func <- function(i){xx <- sample(x); mean(xx)}

並列化なし

> system.time(lapply(i, tmp.func))
 [1] 2.96 0.31 3.27 0.00 0.00

並列化あり

> clusterExport(cl, "x")

###ここも

> clusterExport(scl, "x")

###ですよね。 ryamada22(05/24/2006)

> system.time(parLapply(scl, i, tmp.func))
 [1] 0.14 0.01 0.44 0.00 0.00

その他

実験

並列化するCPUを増やしても通信のコストがかかるため、場合によってはあまり速くならないことがある。そこで、並列化するノードの数の最適化を動的に行ってみる。

準備

scl <- makeCluster(40)
.Last <- function() { stopCluster(scl) }
seeds <- trunc(runif(length(scl), -.Machine$integer.max, .Machine$integer.max))
clusterApply(scl, seeds, set.seed)

nopt.calc <- function(tm, scl, nopt, verbose=T) {
 if (verbose) cat("nopt  n:", nopt$scl.size, " time:", tm)
 if (nopt$ltime < tm) {
   nopt$scl.size <- nopt$scl.old
   if (verbose) cat("  (denied)")
 } else {
   nopt$ltime <- tm
   if (verbose) cat("  (accepted)")
 }
 if (nopt$delta == 0) {
   nopt$ltime <- tm
 }
 nopt$delta <- round(runif(1, -2.0, 2.0))
 nopt$scl.old  <- nopt$scl.size
 nopt$scl.size <- nopt$scl.size + nopt$delta
 if (nopt$scl.size > length(scl)) {
   nopt$scl.size <- length(scl)
 }
 if (nopt$scl.size < 2) {
   nopt$scl.size <- 2
 }
 if (verbose) cat("  next:", nopt$scl.size, "\n")
}

nopt.index <- function(scl, nopt) {
 sort(sample(seq(along=scl), nopt$scl.size))
}

nopt <- new.env()

nopt$scl.size <- 5
nopt$scl.old  <- nopt$scl.size
nopt$delta <- 0
nopt$ltime <- Inf

p.t.test <- function(x) {
  t.test(x)$p.value
}

m <- matrix(runif(1000000), ncol=100)

例1

> for (i in 1:100) {
+   scl.idx <- nopt.index(scl, nopt)
+   etime <- system.time(res <- parApply(scl[scl.idx], m, 1, p.t.test))
+   nopt.calc(etime[3], scl, nopt)
+ }
nopt  n: 5  time: 26.8  (accepted)  next: 7    <<-- n=5からスタート
 .
 .
 .
nopt  n: 5  time: 26.33  (accepted)  next: 7
nopt  n: 7  time: 28.46  (denied)  next: 6
nopt  n: 6  time: 27.08  (denied)  next: 5     <<-- n=5~7でうろうろしている

例2

> for (i in 1:100) {
+   scl.idx <- nopt.index(scl, nopt)
+   etime <- system.time(res <- parRapply(scl[scl.idx], m, p.t.test))
+   nopt.calc(etime[3], scl, nopt)
+ }
nopt  n: 5  time: 13.5  (accepted)  next: 4    <<-- n=5からスタート
 .
 .
 .
nopt  n: 39  time: 3.04  (denied)  next: 40
nopt  n: 40  time: 3.01  (denied)  next: 40
nopt  n: 40  time: 3.02  (denied)  next: 40    <<-- n=40は全ノード数

例からは、parApplyよりもparRapplyを使った方が良いことが分かる。 もちろん、呼び出す関数によって最適なノードサイズが異なる。

ライブラリの修正

ローカルホストへはログインせずに実行するための修正

newSOCKnode <- function(machine = "localhost", ...,
                       options = defaultClusterOptions) {
   # **** allow some form of spec here
   # **** make sure options are quoted
   options <- addClusterOptions(options, list(...))
   port <- getClusterOption("port", options)
   scriptdir <- getClusterOption("scriptdir", options)
   if (getClusterOption("homogeneous")) {
       script <- file.path(scriptdir, "RSOCKnode.sh")
       rlibs <- paste(getClusterOption("rlibs", options), collapse = ":")
       rprog <- getClusterOption("rprog", options)
   }
   else {
       script <- "RunSnowNode RSOCKnode.sh"
       rlibs <- NULL
       rprog <- NULL
   }
   rshcmd <- getClusterOption("rshcmd", options)
   user <- getClusterOption("user", options)
   env <- paste("MASTER=", getClusterOption("master", options),
                " PORT=", port,
                " OUT=", getClusterOption("outfile", options),
                sep="")
   if (! is.null(rprog))
       env <- paste(env, " RPROG=", rprog, sep="")
   if (! is.null(rlibs))
       env <- paste(env, " R_LIBS=", rlibs, sep="")

   if (machine == "localhost") {
       system(paste("sh -c \'env", env, script, "\'")) # 追加
   } else {
       system(paste(rshcmd, "-l", user, machine, "env", env, script))
   }
   con <- socketConnection(port = port, server=TRUE, blocking=TRUE,
                           open="a+b")
   structure(list(con = con, host = machine), class = "SOCKnode")
}

グローバル以外のオブジェクトでも利用可能なように修正

clusterExport <- function (cl, list){
 gets <- function(n, v) {
   assign(n, v, env = .GlobalEnv)
   NULL
 }
 for (name in list) {
   clusterCall(cl, gets, name, get(name, env = sys.parent(1))) # 修正
 }
}

snow SOCK例(nfsやlamは難しい方でもOK?)

単一ノードではslaveのプロセスの振り分けができないのでrsh-fake経由に よる並列化を行います.

require(snow)
                                     # スレーブに渡す関数定義
foo1<-function(x) { mean(x) } ; foo2<-function(x) { min(x) }
foo3<-function(x) { max(x) }  ; foo<-list(foo1,foo2,foo3)
                                     # 関数の数を基にスレーブ数を決定
task<-length(foo)                    # タスク数
                                     # hostnameは識別用に使用します
remotenodes <- paste("localhost",1:task,sep="")
calc <-function(nodes,n,fun)
{
   # Rmpiと同様に扱えるようホスト名でランク付け
   rank<-(1:length(nodes))[nodes==Sys.getenv("SOCKRANK")]
   set.seed(rank)
   x1<-array(runif(n*n),dim=c(n,n))
   x2<-x1%*%x1
   fun[[rank]](x2)
}

SOCK上でのディープな設定.

# スレーブ先の異なるマシン間でRのPATHが異なる場合利用します
setDefaultClusterOptions(rprog="/usr/local/R-1.9.1/bin/R")
setDefaultClusterOptions(scriptdir="/usr/local/R-1.9.1/lib/R/library/snow")

まあ、こんな感じで

                                      # slave起動
cl <- makeSOCKcluster(remotenodes, rshcmd="rsh-fake")
                                      # 各スレーブにライブラリのロード
clusterEvalQ(cl,library(boot))
                                      # 仕事振り分け
rc <- clusterCall(cl, calc,remotenodes,100,foo)
sapply(rc,print)

rshの嘘っこ版

#!/bin/sh
if [ $# -lt 4 ] ;then
        echo "usage: $0 -l user hostname [command [arg...]]"
        exit 1
fi

SOCKRANK=$3
export SOCKRANK

shift 3
exec $*

でもそこまでするなら素直にLAM使った方が楽です(爆)


若干の記事をこちらに掲載 Rmpi、snow、RSPRNG




トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Google
WWW を検索 OKADAJP.ORG を検索
Last-modified: 2015-03-24 (火) 12:40:52 (465d)