Rで並列計算
の編集
http://www.okadajp.org/RWiki/?R%E3%81%A7%E4%B8%A6%E5%88%97%E8%A8%88%E7%AE%97
[
トップ
] [
編集
|
差分
|
バックアップ
|
添付
|
リロード
] [
新規
|
一覧
|
検索
|
最終更新
|
ヘルプ
]
-- 雛形とするページ --
(no template pages)
//門脇(2005-8-26) COLOR(red){SIZE(30){Rで並列計算}} (とりあえず、動いたので報告です。256ノードで動かすと、さすがに速いです。修正大歓迎。) snowパッケージを用いてクラスターマシン上で並列計算を行う。 // "*", "**", "***" による段落に応じて、ハイパーリンク付きの目次一覧をこの位置に置く // 行頭が "//" で始まる行はコメントで表示されません。 #contents // SIZE(num){テキスト} はテキスト文字の大きさを指定します // COLOR{color}{テキスト} はテキスト文字の色を指定します // 行頭が "**" で始まると、直前の "*" で始まる段落の副段落として整形されます // 二重括弧 "((...))" で囲ったテキストは注釈になります。注釈はページ末尾に置かれ、ハイパーリンクが張られます // 行頭に半角空白を置くとこの通り枠で囲って見易く整形されます // 行頭に半角空白がある引き続く行は一まとめに整形されます // コードを書くのに便利です // 行頭が "***" で始まると、直前の "**" で始まる副段落の副段落として整形されます // 行頭に "----" を置くとその場所に水平線が引かれます ---- // 行頭に "*" を置くと、段落見出しとして整形されます *インストール [#wc008de9] ** [[PVM:http://www.csm.ornl.gov/pvm/pvm_home.html]] [#h35a56aa] Makefile, Makefile.aimkを修正してmake。展開したディレクトリをそのまま使う。コンパイルには、Rをビルドしたものと同じコンパイラを指定しないと、実行時に動かない可能性あり。シェルの起動スクリプトに、パスの追加、PVM_ROOTやPVM_ARCHの設定。 ** MPI [#x162b1e4] MPIの実装は色々あるがRのMPI関連のパッケージは殆ど,[[LAM-MPI:http://www.lam-mpi.org]]を想定している. LAM-MPIはNFS,RSH(SSHでもよいがパスワード無しの運用するならRSHで分離してportmapで 制限をかけた方が良い気がする)が必要でテンポラリ(環境変数 LAM_MPI_SESSION_PREFIX :以前はLAM_TMP) 配下でセション連携(NFS上)を行うのでテンポラリをNFS上に切る必要がある.デフォルトではTMP_DIR(つまりデフォルトのままだと/tmp)が使われるので忘れずに. PBS環境ではlibpbsが必要(これが無いとバックエンドは暇なプロセスとして殺される). 某英三文字のメーカ製のopenPBSのlibpbsはPICコードでは無いのでリンク不可なので. 適当に本尊からダウンロードしてきてでっち上げれば問題無い. - [[Rmpi:http://cran.r-project.org/src/contrib/Descriptions/Rmpi.html]] snow経由で使うのが簡単.並列マトリックス計算も付いているけど,冗談に等しい. - [[rsprng:http://www.biostat.umn.edu/~nali/]] 並列乱数生成.[[sprng2.0:http://sprng.cs.fsu.edu/Version2.0/index.html]]([[gmp:http://swox.com/gmp/]]+MPIを要求する)を使う.rsprngのページにsprng2.0に対する[[パッチ:http://www.biostat.umn.edu/~nali/RsprngNotes.html]]があるのでそれを忘れずに. rsprngが無いと各並列マシン上では同じ乱数が生成されるので必要だと思われる. # うっかり,--no-saveを忘れると痛ましいかも. mpiexec -boot N R CMD BATCH --no-save hoge.R // 簡単ですが... **ライブラリ [#vbeb8571] R上で install.packages(c("rpvm", "Rmpi", "snow")) rpvmおよびRmpiは、並列計算ライブラリのMPIおよびPVM、を利用する場合に必要。 それ以外にsshによるリモートログインを用いた並列計算も可能。 *実行例 [#pee3d04d] ライブラリの読み込み > library(snow) SOCKを使用 > hosts <- rep("localhost", 10) > scl <- makeCluster(hosts, "SOCK") // fixed (2008-03-24) PVMを使用 > scl <- makeCluster(10, "PVM") Loading required package: rpvm MPIを使用 > scl <- makeCluster(10, "MPI") Loading required package: Rmpi 終了時に解放 > .Last <- function() { stopCluster(scl) } 乱数の初期化 > set.seed(1234) > seeds <- trunc(runif(length(cl), -.Machine$integer.max, .Machine$integer.max)) > sclusterApply(cl, seeds, set.seed) ###自信がないですが・・・ > seeds <- trunc(runif(length(scl), -.Machine$integer.max, .Machine$integer.max)) > clusterApply(scl, seeds, set.seed) ###ですよね。ryamada22(05/24/2006) 関数を用意 > i <- c(1:10000) > x <- runif(50) > tmp.func <- function(i){xx <- sample(x); mean(xx)} 並列化なし > system.time(lapply(i, tmp.func)) [1] 2.96 0.31 3.27 0.00 0.00 並列化あり > clusterExport(cl, "x") ###ここも > clusterExport(scl, "x") ###ですよね。 ryamada22(05/24/2006) > system.time(parLapply(scl, i, tmp.func)) [1] 0.14 0.01 0.44 0.00 0.00 *その他 [#ta00c3e1] **実験 [#be842447] 並列化するCPUを増やしても通信のコストがかかるため、場合によってはあまり速くならないことがある。そこで、並列化するノードの数の最適化を動的に行ってみる。 準備 scl <- makeCluster(40) .Last <- function() { stopCluster(scl) } seeds <- trunc(runif(length(scl), -.Machine$integer.max, .Machine$integer.max)) clusterApply(scl, seeds, set.seed) nopt.calc <- function(tm, scl, nopt, verbose=T) { if (verbose) cat("nopt n:", nopt$scl.size, " time:", tm) if (nopt$ltime < tm) { nopt$scl.size <- nopt$scl.old if (verbose) cat(" (denied)") } else { nopt$ltime <- tm if (verbose) cat(" (accepted)") } if (nopt$delta == 0) { nopt$ltime <- tm } nopt$delta <- round(runif(1, -2.0, 2.0)) nopt$scl.old <- nopt$scl.size nopt$scl.size <- nopt$scl.size + nopt$delta if (nopt$scl.size > length(scl)) { nopt$scl.size <- length(scl) } if (nopt$scl.size < 2) { nopt$scl.size <- 2 } if (verbose) cat(" next:", nopt$scl.size, "\n") } nopt.index <- function(scl, nopt) { sort(sample(seq(along=scl), nopt$scl.size)) } nopt <- new.env() nopt$scl.size <- 5 nopt$scl.old <- nopt$scl.size nopt$delta <- 0 nopt$ltime <- Inf p.t.test <- function(x) { t.test(x)$p.value } m <- matrix(runif(1000000), ncol=100) 例1 > for (i in 1:100) { + scl.idx <- nopt.index(scl, nopt) + etime <- system.time(res <- parApply(scl[scl.idx], m, 1, p.t.test)) + nopt.calc(etime[3], scl, nopt) + } nopt n: 5 time: 26.8 (accepted) next: 7 <<-- n=5からスタート . . . nopt n: 5 time: 26.33 (accepted) next: 7 nopt n: 7 time: 28.46 (denied) next: 6 nopt n: 6 time: 27.08 (denied) next: 5 <<-- n=5~7でうろうろしている 例2 > for (i in 1:100) { + scl.idx <- nopt.index(scl, nopt) + etime <- system.time(res <- parRapply(scl[scl.idx], m, p.t.test)) + nopt.calc(etime[3], scl, nopt) + } nopt n: 5 time: 13.5 (accepted) next: 4 <<-- n=5からスタート . . . nopt n: 39 time: 3.04 (denied) next: 40 nopt n: 40 time: 3.01 (denied) next: 40 nopt n: 40 time: 3.02 (denied) next: 40 <<-- n=40は全ノード数 例からは、parApplyよりもparRapplyを使った方が良いことが分かる。 もちろん、呼び出す関数によって最適なノードサイズが異なる。 **ライブラリの修正 [#z68c3fb8] ローカルホストへはログインせずに実行するための修正 newSOCKnode <- function(machine = "localhost", ..., options = defaultClusterOptions) { # **** allow some form of spec here # **** make sure options are quoted options <- addClusterOptions(options, list(...)) port <- getClusterOption("port", options) scriptdir <- getClusterOption("scriptdir", options) if (getClusterOption("homogeneous")) { script <- file.path(scriptdir, "RSOCKnode.sh") rlibs <- paste(getClusterOption("rlibs", options), collapse = ":") rprog <- getClusterOption("rprog", options) } else { script <- "RunSnowNode RSOCKnode.sh" rlibs <- NULL rprog <- NULL } rshcmd <- getClusterOption("rshcmd", options) user <- getClusterOption("user", options) env <- paste("MASTER=", getClusterOption("master", options), " PORT=", port, " OUT=", getClusterOption("outfile", options), sep="") if (! is.null(rprog)) env <- paste(env, " RPROG=", rprog, sep="") if (! is.null(rlibs)) env <- paste(env, " R_LIBS=", rlibs, sep="") if (machine == "localhost") { system(paste("sh -c \'env", env, script, "\'")) # 追加 } else { system(paste(rshcmd, "-l", user, machine, "env", env, script)) } con <- socketConnection(port = port, server=TRUE, blocking=TRUE, open="a+b") structure(list(con = con, host = machine), class = "SOCKnode") } グローバル以外のオブジェクトでも利用可能なように修正 clusterExport <- function (cl, list){ gets <- function(n, v) { assign(n, v, env = .GlobalEnv) NULL } for (name in list) { clusterCall(cl, gets, name, get(name, env = sys.parent(1))) # 修正 } } **snow SOCK例(nfsやlamは難しい方でもOK?) [#ece2c779] 単一ノードではslaveのプロセスの振り分けができないのでrsh-fake経由に よる並列化を行います. require(snow) # スレーブに渡す関数定義 foo1<-function(x) { mean(x) } ; foo2<-function(x) { min(x) } foo3<-function(x) { max(x) } ; foo<-list(foo1,foo2,foo3) # 関数の数を基にスレーブ数を決定 task<-length(foo) # タスク数 # hostnameは識別用に使用します remotenodes <- paste("localhost",1:task,sep="") calc <-function(nodes,n,fun) { # Rmpiと同様に扱えるようホスト名でランク付け rank<-(1:length(nodes))[nodes==Sys.getenv("SOCKRANK")] set.seed(rank) x1<-array(runif(n*n),dim=c(n,n)) x2<-x1%*%x1 fun[[rank]](x2) } SOCK上でのディープな設定. # スレーブ先の異なるマシン間でRのPATHが異なる場合利用します setDefaultClusterOptions(rprog="/usr/local/R-1.9.1/bin/R") setDefaultClusterOptions(scriptdir="/usr/local/R-1.9.1/lib/R/library/snow") まあ、こんな感じで # slave起動 cl <- makeSOCKcluster(remotenodes, rshcmd="rsh-fake") # 各スレーブにライブラリのロード clusterEvalQ(cl,library(boot)) # 仕事振り分け rc <- clusterCall(cl, calc,remotenodes,100,foo) sapply(rc,print) rshの嘘っこ版 #!/bin/sh if [ $# -lt 4 ] ;then echo "usage: $0 -l user hostname [command [arg...]]" exit 1 fi SOCKRANK=$3 export SOCKRANK shift 3 exec $* でもそこまでするなら素直にLAM使った方が楽です(爆) ----- 若干の記事をこちらに掲載 Rmpi、snow、RSPRNG ->http://d.hatena.ne.jp/ryamada22/20060524 ---- -動作環境教えていただけませんか? -- [[HPC]] &new{2006-05-25 (木) 10:36:18}; - 新刊書 [[Parallel R:http://www.amazon.co.jp/Parallel-R-Q-Ethan-McCallum/dp/1449309925/ref=sr_1_1?s=english-books&ie=UTF8&qid=1323943184&sr=1-1]] R の本体付属パッケージ parallel の解説もあり。 -- &new{2011-12-15 (木) 19:02:55}; - pforeachパッケージがよいか。 http://d.hatena.ne.jp/hoxo_m/20141222/p1 -- &new{2015-01-18 (日) 00:07:59}; - [[Workshop on Distributed Computing in R:http://www.hpl.hp.com/research/systems-research/R-workshop/]] -- &new{2015-03-24 (火) 12:40:52}; #comment
タイムスタンプを変更しない
//門脇(2005-8-26) COLOR(red){SIZE(30){Rで並列計算}} (とりあえず、動いたので報告です。256ノードで動かすと、さすがに速いです。修正大歓迎。) snowパッケージを用いてクラスターマシン上で並列計算を行う。 // "*", "**", "***" による段落に応じて、ハイパーリンク付きの目次一覧をこの位置に置く // 行頭が "//" で始まる行はコメントで表示されません。 #contents // SIZE(num){テキスト} はテキスト文字の大きさを指定します // COLOR{color}{テキスト} はテキスト文字の色を指定します // 行頭が "**" で始まると、直前の "*" で始まる段落の副段落として整形されます // 二重括弧 "((...))" で囲ったテキストは注釈になります。注釈はページ末尾に置かれ、ハイパーリンクが張られます // 行頭に半角空白を置くとこの通り枠で囲って見易く整形されます // 行頭に半角空白がある引き続く行は一まとめに整形されます // コードを書くのに便利です // 行頭が "***" で始まると、直前の "**" で始まる副段落の副段落として整形されます // 行頭に "----" を置くとその場所に水平線が引かれます ---- // 行頭に "*" を置くと、段落見出しとして整形されます *インストール [#wc008de9] ** [[PVM:http://www.csm.ornl.gov/pvm/pvm_home.html]] [#h35a56aa] Makefile, Makefile.aimkを修正してmake。展開したディレクトリをそのまま使う。コンパイルには、Rをビルドしたものと同じコンパイラを指定しないと、実行時に動かない可能性あり。シェルの起動スクリプトに、パスの追加、PVM_ROOTやPVM_ARCHの設定。 ** MPI [#x162b1e4] MPIの実装は色々あるがRのMPI関連のパッケージは殆ど,[[LAM-MPI:http://www.lam-mpi.org]]を想定している. LAM-MPIはNFS,RSH(SSHでもよいがパスワード無しの運用するならRSHで分離してportmapで 制限をかけた方が良い気がする)が必要でテンポラリ(環境変数 LAM_MPI_SESSION_PREFIX :以前はLAM_TMP) 配下でセション連携(NFS上)を行うのでテンポラリをNFS上に切る必要がある.デフォルトではTMP_DIR(つまりデフォルトのままだと/tmp)が使われるので忘れずに. PBS環境ではlibpbsが必要(これが無いとバックエンドは暇なプロセスとして殺される). 某英三文字のメーカ製のopenPBSのlibpbsはPICコードでは無いのでリンク不可なので. 適当に本尊からダウンロードしてきてでっち上げれば問題無い. - [[Rmpi:http://cran.r-project.org/src/contrib/Descriptions/Rmpi.html]] snow経由で使うのが簡単.並列マトリックス計算も付いているけど,冗談に等しい. - [[rsprng:http://www.biostat.umn.edu/~nali/]] 並列乱数生成.[[sprng2.0:http://sprng.cs.fsu.edu/Version2.0/index.html]]([[gmp:http://swox.com/gmp/]]+MPIを要求する)を使う.rsprngのページにsprng2.0に対する[[パッチ:http://www.biostat.umn.edu/~nali/RsprngNotes.html]]があるのでそれを忘れずに. rsprngが無いと各並列マシン上では同じ乱数が生成されるので必要だと思われる. # うっかり,--no-saveを忘れると痛ましいかも. mpiexec -boot N R CMD BATCH --no-save hoge.R // 簡単ですが... **ライブラリ [#vbeb8571] R上で install.packages(c("rpvm", "Rmpi", "snow")) rpvmおよびRmpiは、並列計算ライブラリのMPIおよびPVM、を利用する場合に必要。 それ以外にsshによるリモートログインを用いた並列計算も可能。 *実行例 [#pee3d04d] ライブラリの読み込み > library(snow) SOCKを使用 > hosts <- rep("localhost", 10) > scl <- makeCluster(hosts, "SOCK") // fixed (2008-03-24) PVMを使用 > scl <- makeCluster(10, "PVM") Loading required package: rpvm MPIを使用 > scl <- makeCluster(10, "MPI") Loading required package: Rmpi 終了時に解放 > .Last <- function() { stopCluster(scl) } 乱数の初期化 > set.seed(1234) > seeds <- trunc(runif(length(cl), -.Machine$integer.max, .Machine$integer.max)) > sclusterApply(cl, seeds, set.seed) ###自信がないですが・・・ > seeds <- trunc(runif(length(scl), -.Machine$integer.max, .Machine$integer.max)) > clusterApply(scl, seeds, set.seed) ###ですよね。ryamada22(05/24/2006) 関数を用意 > i <- c(1:10000) > x <- runif(50) > tmp.func <- function(i){xx <- sample(x); mean(xx)} 並列化なし > system.time(lapply(i, tmp.func)) [1] 2.96 0.31 3.27 0.00 0.00 並列化あり > clusterExport(cl, "x") ###ここも > clusterExport(scl, "x") ###ですよね。 ryamada22(05/24/2006) > system.time(parLapply(scl, i, tmp.func)) [1] 0.14 0.01 0.44 0.00 0.00 *その他 [#ta00c3e1] **実験 [#be842447] 並列化するCPUを増やしても通信のコストがかかるため、場合によってはあまり速くならないことがある。そこで、並列化するノードの数の最適化を動的に行ってみる。 準備 scl <- makeCluster(40) .Last <- function() { stopCluster(scl) } seeds <- trunc(runif(length(scl), -.Machine$integer.max, .Machine$integer.max)) clusterApply(scl, seeds, set.seed) nopt.calc <- function(tm, scl, nopt, verbose=T) { if (verbose) cat("nopt n:", nopt$scl.size, " time:", tm) if (nopt$ltime < tm) { nopt$scl.size <- nopt$scl.old if (verbose) cat(" (denied)") } else { nopt$ltime <- tm if (verbose) cat(" (accepted)") } if (nopt$delta == 0) { nopt$ltime <- tm } nopt$delta <- round(runif(1, -2.0, 2.0)) nopt$scl.old <- nopt$scl.size nopt$scl.size <- nopt$scl.size + nopt$delta if (nopt$scl.size > length(scl)) { nopt$scl.size <- length(scl) } if (nopt$scl.size < 2) { nopt$scl.size <- 2 } if (verbose) cat(" next:", nopt$scl.size, "\n") } nopt.index <- function(scl, nopt) { sort(sample(seq(along=scl), nopt$scl.size)) } nopt <- new.env() nopt$scl.size <- 5 nopt$scl.old <- nopt$scl.size nopt$delta <- 0 nopt$ltime <- Inf p.t.test <- function(x) { t.test(x)$p.value } m <- matrix(runif(1000000), ncol=100) 例1 > for (i in 1:100) { + scl.idx <- nopt.index(scl, nopt) + etime <- system.time(res <- parApply(scl[scl.idx], m, 1, p.t.test)) + nopt.calc(etime[3], scl, nopt) + } nopt n: 5 time: 26.8 (accepted) next: 7 <<-- n=5からスタート . . . nopt n: 5 time: 26.33 (accepted) next: 7 nopt n: 7 time: 28.46 (denied) next: 6 nopt n: 6 time: 27.08 (denied) next: 5 <<-- n=5~7でうろうろしている 例2 > for (i in 1:100) { + scl.idx <- nopt.index(scl, nopt) + etime <- system.time(res <- parRapply(scl[scl.idx], m, p.t.test)) + nopt.calc(etime[3], scl, nopt) + } nopt n: 5 time: 13.5 (accepted) next: 4 <<-- n=5からスタート . . . nopt n: 39 time: 3.04 (denied) next: 40 nopt n: 40 time: 3.01 (denied) next: 40 nopt n: 40 time: 3.02 (denied) next: 40 <<-- n=40は全ノード数 例からは、parApplyよりもparRapplyを使った方が良いことが分かる。 もちろん、呼び出す関数によって最適なノードサイズが異なる。 **ライブラリの修正 [#z68c3fb8] ローカルホストへはログインせずに実行するための修正 newSOCKnode <- function(machine = "localhost", ..., options = defaultClusterOptions) { # **** allow some form of spec here # **** make sure options are quoted options <- addClusterOptions(options, list(...)) port <- getClusterOption("port", options) scriptdir <- getClusterOption("scriptdir", options) if (getClusterOption("homogeneous")) { script <- file.path(scriptdir, "RSOCKnode.sh") rlibs <- paste(getClusterOption("rlibs", options), collapse = ":") rprog <- getClusterOption("rprog", options) } else { script <- "RunSnowNode RSOCKnode.sh" rlibs <- NULL rprog <- NULL } rshcmd <- getClusterOption("rshcmd", options) user <- getClusterOption("user", options) env <- paste("MASTER=", getClusterOption("master", options), " PORT=", port, " OUT=", getClusterOption("outfile", options), sep="") if (! is.null(rprog)) env <- paste(env, " RPROG=", rprog, sep="") if (! is.null(rlibs)) env <- paste(env, " R_LIBS=", rlibs, sep="") if (machine == "localhost") { system(paste("sh -c \'env", env, script, "\'")) # 追加 } else { system(paste(rshcmd, "-l", user, machine, "env", env, script)) } con <- socketConnection(port = port, server=TRUE, blocking=TRUE, open="a+b") structure(list(con = con, host = machine), class = "SOCKnode") } グローバル以外のオブジェクトでも利用可能なように修正 clusterExport <- function (cl, list){ gets <- function(n, v) { assign(n, v, env = .GlobalEnv) NULL } for (name in list) { clusterCall(cl, gets, name, get(name, env = sys.parent(1))) # 修正 } } **snow SOCK例(nfsやlamは難しい方でもOK?) [#ece2c779] 単一ノードではslaveのプロセスの振り分けができないのでrsh-fake経由に よる並列化を行います. require(snow) # スレーブに渡す関数定義 foo1<-function(x) { mean(x) } ; foo2<-function(x) { min(x) } foo3<-function(x) { max(x) } ; foo<-list(foo1,foo2,foo3) # 関数の数を基にスレーブ数を決定 task<-length(foo) # タスク数 # hostnameは識別用に使用します remotenodes <- paste("localhost",1:task,sep="") calc <-function(nodes,n,fun) { # Rmpiと同様に扱えるようホスト名でランク付け rank<-(1:length(nodes))[nodes==Sys.getenv("SOCKRANK")] set.seed(rank) x1<-array(runif(n*n),dim=c(n,n)) x2<-x1%*%x1 fun[[rank]](x2) } SOCK上でのディープな設定. # スレーブ先の異なるマシン間でRのPATHが異なる場合利用します setDefaultClusterOptions(rprog="/usr/local/R-1.9.1/bin/R") setDefaultClusterOptions(scriptdir="/usr/local/R-1.9.1/lib/R/library/snow") まあ、こんな感じで # slave起動 cl <- makeSOCKcluster(remotenodes, rshcmd="rsh-fake") # 各スレーブにライブラリのロード clusterEvalQ(cl,library(boot)) # 仕事振り分け rc <- clusterCall(cl, calc,remotenodes,100,foo) sapply(rc,print) rshの嘘っこ版 #!/bin/sh if [ $# -lt 4 ] ;then echo "usage: $0 -l user hostname [command [arg...]]" exit 1 fi SOCKRANK=$3 export SOCKRANK shift 3 exec $* でもそこまでするなら素直にLAM使った方が楽です(爆) ----- 若干の記事をこちらに掲載 Rmpi、snow、RSPRNG ->http://d.hatena.ne.jp/ryamada22/20060524 ---- -動作環境教えていただけませんか? -- [[HPC]] &new{2006-05-25 (木) 10:36:18}; - 新刊書 [[Parallel R:http://www.amazon.co.jp/Parallel-R-Q-Ethan-McCallum/dp/1449309925/ref=sr_1_1?s=english-books&ie=UTF8&qid=1323943184&sr=1-1]] R の本体付属パッケージ parallel の解説もあり。 -- &new{2011-12-15 (木) 19:02:55}; - pforeachパッケージがよいか。 http://d.hatena.ne.jp/hoxo_m/20141222/p1 -- &new{2015-01-18 (日) 00:07:59}; - [[Workshop on Distributed Computing in R:http://www.hpl.hp.com/research/systems-research/R-workshop/]] -- &new{2015-03-24 (火) 12:40:52}; #comment
テキスト整形のルールを表示する