L.Tierney氏のsnowパッケージでクラスタ計算を行う (間瀬茂)

必要に迫られ、計算機クラスタで計算をせざるを得なくなりました。

幸い私は東工大の誇るグリッドスパコンTSUBAME(現時点で公称アジア最速)を自由に使える(但し現在1ジョブあたり16ノードの制限)恵まれた立場にありますが、ただRを実行しても一台分しか使わないので、却って自分のパソコンよりも遅い(笑)。(追加註:TSUBAMEは32GBもしくは64GBのメモリを使えるため巨大なオブジェクトを扱う計算はさすがに早い。) 並列処理用にRのコードを書くのは面倒そうなので、これまで宝の持ち腐れ状態でしたが、最近snowパッケージを使うと、面倒無しにRで並列計算ができることを知り、やっと使ってみる気になりました。

ということで、勉強がてら作者Luke Tierney氏のウェブサイトにある解説文と、snowパッケージのヘルプ文章を以下に翻訳したものを紹介します。 snowパッケージは「完全に並列処理可能な複数計算(つまり一つの計算結果が他の計算結果を前提にしない)」を自動的にクラスタ内のワーカ(スレーブ)計算機に割り当て処理し、結果をマスタ計算機に集約できるようです。 具体的には apply() 関数の並列処理版関数を使うだけです。最終的には見近に転がっている遊休パソコンをつないでクラスタを作れば(このやりかたがよくわからない -> 求むノウハウ)、皆さんも手軽にクラスタ計算のメリットを享受できるのでは、と考えています。

コメント

  • MPIクラスタなら広島大学の伊藤さんの覚え書きが参考になります。MPIはマルチCPU/マルチコアCPUマシンの有効利用にも使える知識なので知っておいて損はないと思います。クラスタ構築用のライブCD Linuxディストリビューションとかもあります。 -- aki 2007-12-22 (土) 19:52:59
  • 早速の情報ありがとうございます。マルチコアCPUマシン自体の複数CPUをクラスタとして組織できるというのは初耳でした。実は最近4CPUマシンで二つの同一Rコードを同時に実行して実行速度をはかったら、一つだけ実行したときの速度の半分になってがっかりしていました(別々のCPUに自動的に割り当てられ速度は変わらないのかなと甘い期待をしていたのですが)。akiさん自身はすでにこうしたことを試みられたことがあるのでしょうか。 -- 間瀬茂 2007-12-22 (土) 20:51:00
  • 数年前MPI環境で動作する遺伝子配列の相同性検索プログラムであるMPIblastを動作を試みたことがあったのですが、何故かうまく行かず放置状態でした。というわけで恥かしくてコメントできなかったのですが、Rのパッケージ中にはsnowパッケージを前提として動作するものもいくつかあるようですね。 -- 樋口? 2007-12-22 (土) 21:20:34
  • RではありませんがGARLI/MrBayes?という系統推定プログラムをデュアル/クアッドコアマシンでMPIを用いて走らせています。blastも使っていますね。Rはスクリプトを用意しておいてPerlスクリプトからCPU個数分のRを立ち上げて実行させるということをやっています。 -- aki 2007-12-22 (土) 22:41:09
  • マルチCPUマシンでは基本的にはCPU個数だけRを起動して走らせればそれぞれのRが高速に動作するはずです。間瀬先生がお使いの4CPUマシンというのが、IntelのCore 2 Quadなのでしたら、これがデュアルコアCPUを2個くっつけているものであるということが影響しているのかもしれません。もしもPentium ExtremeEdition?などの2CPU+HyperThreading?による論理4CPUマシンの場合もジョブがどのように割り当てられるかによって変わるでしょう。同じスクリプトを4つ用意して1コアで4回と4コアで1回ずつ走らせる時間を比較されてみてはいかがですか。 -- aki 2007-12-22 (土) 22:47:28
  • また、Windowsだと常駐ソフトやプロセスの「優先度」、「プロセッサとの関係」(タスクマネージャでプロセスを右クリックして確認)によってはそのような結果になることがありますし、Linuxではカーネルが古かったりカーネルのコンパイル設定によってはそのようになることがありそうです。どちらもCPUをフルに使わない場合に起きるはずです。ちなみに私はGentoo Linuxでカーネルは手動設定でコンパイルしたものを利用しています。 -- aki 2007-12-22 (土) 22:54:15
  • うちの教官が2.8GHz 8-core Mac Proを購入してくれましたのでLinuxを入れてR+snow+MPI+pvclustで計算してみたところ、1プロセスでは100秒かかる計算が8プロセス使うことで14秒弱にまで短縮することが確認できました。 -- aki 2008-05-17 (土) 20:17:39

(1)R用の簡単なワークステーションネットワーク

Luke Tierney,
Department of Statistics and Actuarial Science,
University of Iowa

snow (Simple Network of Workstations) パッケージは、Rで「明白に並列的な」計算実行するための、 ワークステーション集合もしくは Beowulf クラスタ使用に対する簡単なメカニズムを実装している。 部分的に Python CoW(Cluster of Workstations)パッケージ に 基づくインタフェイスは極めて単純であることを目指しており、数種類の異なった低水準コミュニケーション 機構のトップに実装できるようにデザインされている。3種類の低水準インタフェイスがこれまでに 実装されている。一つは Li と Rossini による rpvm パッケージ経由で PVM を使う。もう一つは Hao Yu による Rmpi パッケージ経由で MPI を 使う。今一つは raw ソケットを使うもので PVM や MPI が使えない場合に有益である。 このノートはクラスタの開始法、クラスタを用いた計算に対する基本関数、そして 並列ブートストラップ法に対してクラスタを用いる一つの例を紹介する。

二つの注意:

  • 計算を ^C で中断することは問題を引き起こしやすく、 snow の実行中は避けた方が良い。
  • Rを終了する前にクラスタに対して stopCluster を実行すること。こうすることにより きれいにシャットダウンできるようになる。

クラスタを開始し終了する

ワークステーションクラスタの開始は、基底にあるコミュニケーション機構に明示的に依存する クラスタを使用する際の唯一のステップである。 クラスタは makeCluster 関数を呼び出すと開始されるが、呼び出しの詳細は現在のところクラスタの タイプにより少々異なる。PVM と MPI クラスタはまた、PVM もしくは MPI システムを開始するために幾つかの 予備的準備的を必要とするかもしれない。

PVM クラスタを開始する

PVM (Parallel Virtual Machine) クラスタを開始するには、まず PVM を開始しなければならない。 私は、このためには、私は PVM コンソール pvm を使うのが好きだ:

<starting PVM using the pvm console>

luke@itasca cluster% pvm
pvm&gt; add itasca
add itasca
0 successful
                    HOST     DTID
                  itasca Duplicate host
pvm> add owasso
add owasso
1 successful
                    HOST     DTID
                  owasso    80000
pvm> conf
conf
2 hosts, 2 data formats
                    HOST     DTID     ARCH   SPEED       DSIG
                  itasca    40000    LINUX    1000 0x00408841
                  owasso    80000   DARWIN    1000 0x0658eb59
pvm> 

conf 命令で示された並列仮想マシン構成は二つのホスト、 i386 Linux が動く itasca と Mac OS X が動く owasso からなる。コマンドラインコンソール pvm の代わりに、もしインストール されていればグラフィカルなコンソール xpvm を使っても良い。もしくは rpvm パッケージを使って、rpvm パッケージが提供する R 関数を使っても良い。

一旦 PVM が動き出せば、二つの作業ノードからなるクラスタを使うためには、 マスターでRを開始し、 snow パッケージをロードし、そして makeCluster を二つの引数、作業ノード数と type = "PVM" で実行する。:

<starting a PVM cluster>

cl <-  makeCluster(2, type = "PVM")

返り値はこれらの二つのプロセスへの参照からなるリストである。

もし rpvmパッケージが利用でき Rmpi がまだロードされていなければ、 現在の既定のクラスタタイプは "PVM" であり、従って type = "PVM" 引数は省略できる。

PVM を使った作業が終了したら、それをシャットダウンすることを忘れないこと。 例えば、pvm コンソールで halt 命令を実行する。

MPI クラスタを開始する

もし MPI 2 プロセスを生み出す(spawn) API をサポートしている LAM のような MPI システムを使っているのなら、ソケットや PVM クラスタと同様にして MPI クラスタを 生成できる。もしプロセス生成機能をサポートしない MPI システムを使っているなら、異なった手順が 必要になる。例えば lamboot を使い、最初に MPI を開始する必要があるかも知れない。 詳細は使用中の MPI システムに依存する。

MPI の利用が終ったらそれをシャットダウンする(もし使用中の MPI が必要とするならば)。 LAM-MPI に対しては例えば lamhalt を使う。

生成された MPI クラスタ

プロセス生成機能をサポートする MPI システムに対する MPI クラスタは次のように開始できる;

<starting an MPI cluster with process spawning>

cl <-  makeCluster(2, type = "MPI")

もし rpvm が利用不可能だが Rmpi は利用できるなら、 現在の既定のクラスタタイプは "MPI" であり、引数 type = "MPI" は省略できる。

プロセス生成機能を持たない MPI クラスタ

もし使用中の MPI システムがプロセス生成機能を持たないか、mpirun を使って MPI ジョブを 実行したいのなら、 Rmpi をインストールする必要があるだろう。 またインストールした snow パッケージか、パッケージのソースの inst ディレクトリから RMPISNOW というシェルスクリプトを適当な位置(できればユーザから アクセス可能なパス)にインストールしておく必要がある。 もし RMPISNOW がユーザのパス上にあれば、R を以下のように開始する:

<starting R with mpirun>

mpirun -np 3 RMPISNOW

これはマスターと二つのワーカプロセスを開始する。クラスタはマスタプロセス中に作られる。 このクラスタへの参照は次のようにして得られる:

<obtaining a cluster started with mpirun>

cl <-  getMPIcluster()

最新版の snow では稼働中のクラスタへの参照はまた以下でも得られる:

obtaining a cluster started with mpirun>

cl <-  makeCluster()

もしくは、二つのワーカをを持つクラスタに対しては次のようにしても良い:

<obtaining a cluster started with mpirun>

cl <-  makeCluster(2)

ソケットクラスタを開始する

ソケットクラスタを開始するには命令 makeCluster を使い、 type="SOCK" と使用する計算機のリストを指定する。例えば

<starting a socket cluster>

cl <-  makeCluster(c("itasca", "owasso"), type = "SOCK")

二つの R プロセスからなるクラスタを開始する。一つは itasca という名前の計算機、今一つは owasso という計算機上にある. 返り値はこれら二つのプロセスへの参照である。

補足的なクラスタ生成引数

クラスタ生成関数 makeCluster には、ノードを生成プロセスに対するオプションを 指定する幾つかの追加的な名前付き引数を与えることができる。 最も有用な引数はクラスタタイプに対する type 引数、開始プロセスが、ワーカのホストがマスタと同一の アーキテクチャとファイルシステムのレイアウトを持つと仮定して良いかを指定する
homogeneous 引数(異質的(heterogeneous)クラスタに付いては「異質的(inhomogeneous)システム」節 に解説がある)、 そしてワーカからの出力が出力先を指示する outfile である。 既定では、outfile/dev/null である(つまり無視される)。 デバッグ時、特に初期設定の試行中、には出力を適当なファイルにリダイレクトすることが有益なことがある。

各クラスタ生成呼び出しに対する追加のクラスタオプションを指定する もう一つの方法が、setDefaultClusterOptions? を使って既定動作を変更することである。 例えば、既定値が「異質的 MPI クラスタ」と指定するには次のようにする:

setDefaultClusterOptions(type="MPI", homogeneous = FALSE)

クラスタを停止する

クラスタを停止するには次のようにすべきである:

<stopping a cluster>

stopCluster(cl)

ソケットクラスタはそれらを生成したプロセスが終了すると自動的に停止するが、 それでも stopCluster を呼び出すのが好ましい。 それが生成したプロセスが存在するより前に PVM クラスタを終了するのに stopCluster を使用する代わりに、PVM コンソールで halt 命令を 実行しても良い。使用している MPI システムは同様の機能を持っている可能性がある。 例えば、LAM システムに対しては lamhalt 命令や lamwipe 命令が存在する。

異質的(inhomogeneous)システム

クラスタ生成時の既定オプション設定では、 R シェルスクリプトと snow ライブラリが全てのノードで同一の絶対パスにあるという同質的(homogeneous) システムを前提にしている。snow はシステムライブラリにインストールされている必要はないが、 既定の設定が有効であるためには、それがマスタに対する R_LIBS 環境変数中で指定された ライブラリのどれかに存在することが必要である。

異なったレイアウトのファイルシステムや、異なったアーキテクチャが同一のファイルシステムを共有するシステムを 持つような、異質的(inhomogeneous)システムでは幾つかの追加の設定が必要になる: ワーカプロセスを実行するのに使われる各計算機毎に、スクリプト RunSnowNode? をインストールされた snow からその計算機の検索パス上の位置にコピーし、それが実行可能であることを確認する。 マスタ計算機とワーカプロセスを実行する各計算機で、環境変数 R_SNOW_LIBsnow パッケージや、rpvm, Rmpi もしくは 適切なアーキテクチャに対する rlecuyer といったサポートパッケージを含む ディレクトリの名前にする。 各計算機の検索パス上に適切なバージョンの R が利用可能であることを確認する。

現時点では、変数 R_SNOW_LIB を空でない値に設定すると、マスタは homogeneous クラスタオプションを FALSE とする。 さもなければ、それは TRUE とされる。

クラスタを使う

基本的関数

クラスタを使用するための基本的関数は clusterCallclusterApply である。clusterCall は指定された関数を クラスタの各メンバ上で同一引数で呼び出し、結果をリストで返す。実行は 並列処理される。例えば、ノードにその名前と計算機タイプを問い合わせるには次のようにする:

<R session>

> clusterCall(cl, function() Sys.info()[c("nodename","machine")])
[[1]]
               nodename                 machine 
"owasso.stat.uiowa.edu"       "Power Macintosh" 

[[2]]
nodename  machine 
"itasca"   "i686" 

有用な clusterCall の変種は clusterEvalQ で以下のように 定義される:

<definition of clusterEvalQ>

clusterEvalQ <- function(cl, expr)
    clusterCall(cl, eval, substitute(expr), env=.GlobalEnv)

これは例えば、各クラスタノードにあるパッケージをロードするのに使える:

<loading a library on all nodes>

clusterEvalQ(cl, library(boot))

clusterApplylapply の変種で、クラスタの異なったメンバ上で各々の 呼び出しを実行する。クラスタのノード数はリスト引数中の要素数以上でなければならない。 簡単な例:

<R session>

> clusterApply(cl, 1:2, get("+"), 3)
[[1]]
[1] 4

[[2]]
[1] 5

乱数発生

既定の乱数発生器は強い相関を持つ可能性が高い:

<R session>

> clusterCall(cl, runif, 3)
[[1]]
[1] 0.08496597 0.35232298 0.60300751

[[2]]
[1] 0.08496597 0.35232298 0.60300751

これを回避する手軽な方法は乱数種であり、 次のようにする:

<random seeding of cluster generators>

clusterApply(cl, runif(length(cl),max=10000000), set.seed)

より良い方法は並列乱数発生器 パッケージを使うことである。そうしたものが幾つかある。 snow は既定では rlecuyer パッケージを使うが、これは [[L'Ecuyer, Simard, Chen, and Kelton: http://www.iro.umontreal.ca/%7Elecuyer/myftp/papers/streams00.pdf]] の並列乱数発生パッケージへの インタフェイスである。 関数 clusterSetupRNG は初期化を行う。追加引数無しで呼び出すと ランダムな種を使う。ヘルプ頁で説明された名前付き引数を使えば初期値をコントロールできる:

<R session>

> clusterSetupRNG(cl)
> clusterCall(cl, runif, 3)
[[1]]
[1] 0.749391854 0.007316102 0.152742874

[[2]]
[1] 0.8424790 0.8896625 0.2256776

ブートストラップの例

boot パッケージはデータ nuclear を使った例を含む。 boot のヘルプ頁に与えられた準備コードは次のようになる:

<bootstrap setup>

library(boot)
#  この例では nuclear データに基づく回帰予測で boot を使う。
#  この例は Davison and Hinkley (1997) の例6.8 から取った。
#  統計量に対する2つの追加引数が boot 経由で渡されていることを注意。
data(nuclear)
nuke  <-  nuclear[,c(1,2,5,7,8,10,11)]
nuke.lm  <-  glm(log(cost)~date+log(cap)+ne+ ct+log(cum.n)+pt, data=nuke)
nuke.diag  <-  glm.diag(nuke.lm)
nuke.res  <-  nuke.diag$res*nuke.diag$sd
nuke.res  <-  nuke.res-mean(nuke.res)

#  データ、標準化残さ、当てはめ値を持つ新しいデータフレームを
#  ブートストラップで使うために作る。
nuke.data  <-  data.frame(nuke,resid=nuke.res,fit=fitted(nuke.lm))

#  番号32のプラントの日付 73.00 での予測値を得たい。
new.data  <-  data.frame(cost=1, date=73.00, cap=886, ne=0,
                       ct=0, cum.n=11, pt=1)
new.fit  <-  predict(nuke.lm, new.data)

nuke.fun  <-  function(dat, inds, i.pred, fit.pred, x.pred) {
     assign(".inds", inds, envir=.GlobalEnv)
     lm.b  <-  glm(fit+resid[.inds] ~date+log(cap)+ne+ct+
                 log(cum.n)+pt, data=dat)
     pred.b  <-  predict(lm.b,x.pred)
     remove(".inds", envir=.GlobalEnv)
     c(coef(lm.b), pred.b-(fit.pred+dat$resid[i.pred]))
}

一つのワークステーションではブートストラップ例は 約30秒かかった。

<R session>

> system.time(nuke.boot  <- 
+             boot(nuke.data, nuke.fun, R=999, m=1,
+                  fit.pred=new.fit, x.pred=new.data))
[1] 26.32  0.71 27.02  0.00  0.00

二つの計算機からなるクラスタ (このときは同一の i686 ノード) では 約2倍早かった。

<R session>

> clusterEvalQ(cl, library(boot))
> system.time(cl.nuke.boot  <- 
+             clusterCall(cl,boot,nuke.data, nuke.fun, R=500, m=1,
+                         fit.pred=new.fit, x.pred=new.data))
[1]  0.01  0.00 14.27  0.00  0.00

4台のノードのクラスタでは約6秒であった。

<R session>

> cl <- makeCluster(5)
> clusterEvalQ(cl,library(boot))
> system.time(cl.nuke.boot  <- 
+             clusterCall(cl,boot,nuke.data, nuke.fun, R=200, m=1,
+                         fit.pred=new.fit, x.pred=new.data))
[1] 0.03 0.00 5.58 0.00 0.00

速度を比較するのに使うべき値は三番目の経過時間である。

高水準関数

clusterApply を用いた高水準関数は parLapply, parSapply, そして parApply を含む。 これらはそれぞれ lapply, sapply、そして apply の並列版である。幾つかの例を示す:

<higher level examples>

sum(parApply(cl, matrix(1:100,10), 1, sum))
sum(parApply(cl, matrix(1:10000,100), 1, sum))

<higher level examples>

x <- 1:100/101
system.time(qtukey(x, 2, df=2))
system.time(unlist(parLapply(cl, x, qtukey, 2, df=2)))

解説のために行列積の並列計算の非常に簡単な例をあげる:

<higher level examples>

A <- matrix(rnorm(10000),100)
system.time(A %*% A)
system.time(parMM(cl,A , A))

コメント

留意すべき幾つかの注意:

  • 計算機間の通信速度は計算速度に比べるとかなり遅い。
  • この構成では通信は直列化されている。
  • 大きなサイズの結果を返す R 関数がある。結果を返す前に縮約した方が良い。
  • PVM と MPI 版は迷子のジョブを残さないことを確実にするという意味でより頑健である。

PVM コンソール、もしくは lamhaltlamwipe を使って PVM もしくは LAM-MPI 領域を停止することにより R プロセスを殺すことができる。 ソケットを使った実装では、マスタが存在するときはどんな稼働中のノードも 残すべきではない。しかし、妙な具合でそうなることもあり得る。 解説のために行列積の並列計算の非常に簡単な例をあげる:

<higher level examples>

A <- matrix(rnorm(10000),100)
system.time(A %*% A)
system.time(parMM(cl,A , A))

コメント

留意すべき幾つかの注意:

  • 計算機間の通信速度は計算速度に比べるとかなり遅い。
  • この構成では通信は直列化されている。
  • 大きなサイズの結果を返す R 関数がある。結果を返す前に縮約した方が良い。
  • PVM と MPI 版は迷子のジョブを残さないことを確実にするという意味でより頑健である。

PVM コンソール、もしくは lamhaltlamwipe を使って PVM もしくは LAM-MPI 領域を停止することにより R プロセスを殺すことができる。 ソケットを使った実装では、マスタが存在するときはどんな稼働中のノードも 残すべきではない。しかし、妙な具合でそうなることもあり得る。 時としてまずいことが起き得、手動による後かたづけが必要になる。

未解決の問題

エラー処理

エラー処理は現在のところ非常に素朴である。ワーカノードでの全ての評価は try 関数を用いて実行される。従って、返り値は真正の結果であるか、 try 関数が返すエラーである。原則としてユーザのコードがこれをチェックすべきである。

幾つかのノードがエラーを出し、その他はエラーにならない状況を処理する正しい方法は 明らかでなく、問題による。R がより洗練されたエラー処理機構を備えるようになれば、 snow 中のエラー処理に対する柔軟なユーザ制御をその上に構築することが できるであろう。

中断処理

理想的には、マスタに於ける中断はワーカに伝播すべきで、現在進行中の全ての計算を中断すべきである。 しかしそうなってはいない。実際、snow を使った計算中のマスタ側での中断は 通信基盤を混乱した状態に陥れ、それ以降現在の R セッションでのクラスタの利用を不可能にする。 一時しのぎの解決策: snow による計算を中断してはならない。勿論これは満足のいく解決策ではない。 R が中断に対する処理を制御する機構を持てば、もう少しましなことが可能になるであろう。 実際に中断をワーカノードに伝播させることが可能になるかどうかは全く不明で OS に依存するかもしれない。

(2) snowクラスタでの一様乱数発生 (snowパッケージのヘルプ文章)

説明:

SNOWクラスタで使う独立一様乱数ストリームを初期化する。L'Ecuyer の乱数発生器 (パッケージ rlecuyer が必要) もしくは SPRNG 発生器(パッケージ rsprng が必要)のどちらかを使う。

用法:

    clusterSetupRNG (cl, type = "RNGstream", ...)
    clusterSetupRNGstream (cl, seed=rep(12345,6), ...)
    clusterSetupSPRNG (cl, seed = round(2^32 *runif(1)),
                       prngkind = "default", para = 0, ...) 

引数:

      cl: クラスタオブジェクト
    type: 'type="RNGstream"' (既定) は L'Ecuyer の RNG、 
          'type="SPRNG"' は SPRNG 発生器を使う
     ...: 背景にある関数(以下の詳細を見よ)へ渡される引数
    seed: RNGの種として使われる整数値 (SPRNG) もしくは長さ6の整数ベクトル
prngkind: SPRNG で使われる発生法タイプの名前文字列
    para: 発生器にたいする追加引数

詳細:

  • clusterSetupRNG はその引数値に応じて他の関数を引数 (cl, ...) で呼び出す。もしタイプ "SPRNG" が使われたら関数 "RNGstream" が使われる。
  • clusterSetupSPRNG は 'rsprng' パッケージをロードし、各ノードごとに別個のストリームを初期化する。より詳細は 'init.sprng' の解説を見よ。マスタ上の発生器は変更されない。
  • clusterSetupRNGstream は 'rlecuyer' パッケージをロードし、ノードごとに一つずつストリームを生成し、ノードにストリーム状態を分配する。
  • より詳しくは URL (上の解説文章(1))を見よ。

例:

    ## Not run: 
    clusterSetupSPRNG(cl)
    clusterSetupSPRNG(cl, seed=1234)
    clusterSetupRNG(cl, seed=rep(1,6))
    ## End(Not run)

(3) クラスタレベルのSNOW関数 (snowパッケージのヘルプ文章)

説明:

SNOWクラスタ上で計算を行う関数。

用法:

    clusterSplit(cl, seq)
    clusterCall(cl, fun, ...)
    clusterApply(cl, x, fun, ...)
    clusterApplyLB(cl, x, fun, ...)
    clusterEvalQ(cl, expr)
    clusterExport(cl, list)

引数:

     cl: クラスタオブジェクト
    fun: 関数もしくは関数名を与える文字列
   expr: 評価すべき表現式
    seq: 分割されるベクトル
   list: エクスポートされる変数名の文字列
      x: 配列
    ...: 標準関数に引き渡される追加引数

詳細:

  • これらはクラスタ上で計算を行うための基本的関数である。スレーブノードでの計算はすべて 'try' 関数を用いて実行される。現在のところ、クラスタ計算関数が返す結果リストは真正な結果と 'try' のエラーオブジェクトの結合である可能性がある。より洗練されたアプローチは将来検討されるであろう。
  • clusterCall は関数 'fun' をクラスタ 'cl' の各ノードで同じ引数 '...' で呼び出し、結果のリストを返す。
  • clusterEvalQ は各クラスタノードで文字通りの表現式を評価する。これは 'evalq' のクラスタ版であり、clusterCall を用いた便利関数である。
  • clusterApply は関数 'fun' を、最初のクラスタノードでは引数 'seq1?' と '...', で、二番目のクラスタノードでは引数 'seq2?' と '...' で、等と並列実行する。'seq' の長さはクラスタのノード数以下でなければならない。結果のリストが返される。結果リストの長さは 'seq' の長さに等しくなるであろう。
  • clusterApplyLBclusterApply の負荷均等化版である。もし 'seq' の長さ 'p' がクラスタノード数 'n' より大きければ、最初の 'n' 個のジョブが順番に 'n' 個のノードで実行される。最初のジョブが終了すると次のジョブが利用可能なノードで実行される。これがすべてのジョブが終了するまで実行される。clusterApplyLB の利用により clusterApply 関数の利用よりもクラスタを効率的に使用できる。しかしながら、増加する通信量がパーフォーマンスを低下させる可能性がある。さらに、特定のジョブを実行するノード不確定であり、シミュレーションの再現性保障に問題なしとしない。
  • clusterExport は 'list' 中に名前があるマスタの大域的変数の値を、各ノードの同じ名前の大域的変数に付値する。
  • clusterSplit は 'seq' を各クラスタに対する連続的な断片に分割し、結果をクラスタノードと同じ長さのリストとして返す。現在のところ、断片は長さが同じになるように分割される。将来はノードの相対的なパーフォーマンス情報に応じて分割するようになるであろう。
  • より詳しくは URL (上の解説文章(1))を見よ。

例:

    ## Not run: 
    cl <- makeSOCKcluster(c("localhost","localhost"))
    clusterApply(cl, 1:2, get("+"), 3)
    clusterEvalQ(cl, library(boot))
    x<-1
    clusterExport(cl, "x")
    clusterCall(cl, function(y) x + y, 2)
    ## End(Not run)

(4) snow クラスタを開始・停止する (snowパッケージのヘルプ文章)

説明:

SNOWクラスタを開始・停止し、クラスタオプションを設定する関数。

用法:

    makeCluster(spec, type = getClusterOption("type"), ...)
    stopCluster(cl)
    setDefaultClusterOptions(...)
    makeSOCKcluster(names, ..., options = defaultClusterOptions)
    makePVMcluster(count, ..., options = defaultClusterOptions)
    makeMPIcluster(count, ..., options = defaultClusterOptions)
    getMPIcluster()

引数:

   spec: クラスタ指定
  count: 生成するノー度数
  names: ノード名の文字列ベクトル
options: クラスタオプションオブジェクト
     cl: クラスタオブジェクト
    ...: クラスタオプション指定
   type: クラスタのタイプを指定する文字列

詳細:

  • makeCluster は指定もしくは既定タイプのクラスタを開始し、そのクラスタへの参照を返す。サポートされるクラスタタイプは値の '"SOCK"', '"PVM"' そして '"MPI"' である。'"PVM"' と '"MPI"' クラスタに対しては 'spec' 引数は生成されるべきスレーブノードの数を表す整数でなければならない。 '"SOCK"' クラスタに対しては 'spec' はスレーブノードがその上に開始されるホストの名前である文字列ベクトルでなければならない。 このベクトル中の各ノードに対してひとつのノードが開始される。
  • stopCluster はクラスタを適切にシャットダウンするために、Rを終了する前に、呼び出されるべきである。もし呼び出されないと、すべてのスレーブ過程がシャットダウウンされたことを保障するために外部的な手段が必要になるかも知れない。
  • setDefaultClusterOptions? は既定のクラスタオプションを別の値に指定するのに使うことが出来る。多くのオプションがある。もっとも重要なオプションは 'type' と'homogeneous' である。'type' の現在の既定値は、もし 'rpvm' パッケージが利用可能なら '"PVM"' とされる。さもなければ、もし 'Rmpi' パッケージが利用可能なら '"MPI"' とされる。どちらのパッケージも利用不可能なら '"SOCK"' とされる。
  • homogeneous オプションは非同質的なクラスタに対する開始手順を使うことを指定するのなら 'FALSE' と設定すべきである。これはいくつかの追加のコンフィギュレーションを必要とするかもしれない。既定設定値は、マスタホストの環境変数 R_HOME_LIB が空でない値で定義されていない限り、 'TRUE' である。
  • オプション outfile はスレーブノードでの出力が記録されるファイルを指定するのに使える。既定値は '/dev/null' (つまり放棄)である。インストール時のデバッグに際してはこれを適切なファイルに設定しておくことが有用である。
  • 関数 makeSOCKcluster, makePVMcluster そして makeMPIcluster はそれぞれのタイプでクラスタを開始するのに使うことが出来る。
  • プロセス生成(process spawning)が利用できず、マスタとスレーブ集合を開始するのに mpirun のような命令が使われる MPI コンフィギュレーションでは、対応するクラスタはあらかじめ構成されているはずで、getMPIcluster で得ることができる。このインタフェイスは依然実験段階で変更される可能性がある。
  • より詳しくは URL (上の解説文章(1))を見よ。

例:

    ## Not run: 
    cl <- makeCluster(c("localhost","localhost"), type = "SOCK")
    clusterApply(cl, 1:2, get("+"), 3)  
    ## End(Not run)

(5) 高水準SNOW関数 (snowパッケージのヘルプ文章)

説明:

'apply' 関数の並列版と関連関数。

用法:

    parLapply(cl, x, fun, ...)
    parSapply(cl, X, FUN, ..., simplify = TRUE, USE.NAMES = TRUE) 
    parApply(cl, X, MARGIN, FUN, ...)
    parRapply(cl, x, fun, ...)
    parCapply(cl, x, fun, ...)
    parMM(cl, A, B)

引数:

       cl: クラスタオブジェクト
      fun: 関数もしくは関数名文字列
        X: 使用される配列
        x: 使用される行列
      FUN: 関数もしくは関数名文字列
   MARGIN: 使用される次元を指定するベクトル
 simplify: 論理値、'sapply' を参照
USE.NAMES: 論理値、'sapply' を参照
      ...: 標準関数に引き渡される追加引数
        A: 行列
        B: 行列

詳細:

  • parLapply, parSapply そして parApply はそれぞれ 'lapply', 'sapply' そして 'apply' の並列版である。
  • parRapplyparCapply は行列 'x' の行および列に対する 'apply' 関数の並列版である。
  • parMM は非常に簡単な行列積の並列版である。説明用を意図している。
  • より詳しくは URL (上の解説文章(1))を見よ。

例:

    ## Not run: 
    cl <- makeSOCKcluster(c("localhost","localhost"))
    parSapply(cl, 1:20, get("+"), 3)
    ## End(Not run)

(6) Rmpi パッケージ情報

snow パッケージが依存している Rmpi パッケージは MPI (Message-Passing Interface) と呼ばれるクラスタ構成ソフトへのインタフェイスで、それ自体 R でクラスタ並列計算を行う多くの関数が提供されているようです。以下にパッケージ内の関数の簡易説明一覧を紹介します。

MPI API:

mpi.abort               Abort (quit) all tasks associated with a comm
mpi.allgather           Gather data from each process to all process
mpi.allgatherv          Gather diff size data from each process to all process
mpi.allreduce           Reduce all process's vectors into one vector
mpi.barrier             Block the caller until all group have called it
mpi.bcast               Broadcast a vector (int,double,char) to every process 
mpi.cancel              Cancel a nonblocking send or recv
mpi.cart.coords         Translate a rank to the Cartesian topology coordinate
mpi.cart.create         Create a Cartesian structure of arbitrary dimension
mpi.cartdim.get         Get dim information about a Cartesian topology
mpi.cart.get            Provide the Cartesian topology associated with a comm
mpi.cart.rank           Translate a Cartesian topology coordinate to the rank
mpi.cart.shift          Shift Cartesian topology in displacement and direction
mpi.comm.disconnect     Disconeect and free a comm
mpi.comm.dup            Duplicate a comm to a new comm
mpi.comm.free           Free a comm
mpi.comm.get.parent     Get the parent intercomm
mpi.comm.rank           Find the rank (process id) of master and slaves
mpi.comm.remote.size    Find the size of a remote group from an intercomm
mpi.comm.size           Find the size (total # of master and slaves)
mpi.comm.set.errhandler Set comm to error return (no crash)
mpi.comm.spawn          Spawn slaves
mpi.comm.test.inter     Test if a comm is an intercomm
mpi.dims.create         Create a Cartesian dim used by mpi.cart.create
mpi.finalize            Exit MPI environment (call MPI_Finalize())
mpi.gather              Gather data from each process to a root process
mpi.gatherv             Gather diff data from each process to a root process
mpi.get.count           Get the length of a message for given status and type
mpi.get.processor.name  Get the process (host) name 
mpi.info.create         Create an info object
mpi.info.free           Free an info object
mpi.info.get            Get the value from an info object and a key
mpi.info.set            Set a key/value pair of an info object
mpi.intercomm.merge     Merge a intercomm to a comm 
mpi.iprobe              Nonblocking use a source and a tag to set status
mpi.irecv               Nonblocking receive a vector from a specific process
mpi.isend               Nonblocking send a vector to a specific process
mpi.probe               Use a source and a tag to set status
mpi.recv                Receive a vector from a specific process
mpi.reduce              Reduce all processes's vectors into one (one process)
mpi.scatter             Opposite of mpi.gather
mpi.scatterv            Opposite of mpi.gatherv (diff size data)
mpi.send                Send a vector to a specific process
mpi.sendrecv            Send & receive different vectors in one call
mpi.sendrecv.replace    Send & replace a vector in one call
mpi.test                Test if a nonblocking send/recv request is complete
mpi.testall             Test if all nonblocking send/recv requests are complete
mpi.testany             Test if any nonblocking send/recv requests are complete
mpi.testsome            Test if some nonblocking send/recv requests are complete
mpi.test.cancelled      Test if a communication is cancelled by mpi.cancel
mpi.universe.size       Total number of CPUs available
mpi.wait                Wait if a nonblocking send/recv request is complete
mpi.waitall             Wait if all nonblocking send/recv requests are complete
mpi.waitany             Wait if any nonblocking send/recv requests are complete
mpi.waitsome            Wait if some nonblocking send/recv requests are complete

MPI R 環境の拡張:

lamhosts            Hosts id and machine host name mapping 
mpi.allgather.Robj  Gather any type of objects to every number
mpi.any.source      A constant for receiving a message from any source
mpi.any.tag         A constant for receiving a message from any tag
mpi.bcast.Robj      Broadcast an R object to every process
mpi.comm.maxsize    Find the length of comm array
mpi.exit            MPI_Finalize を呼出し、Rmpi ライブラリを切り離す(detach)
mpi.gather.Robj     Gather any type of object to  a root process
mpi.get.sourcetag   Get the source and tag for a given status
mpi.hostinfo        Get the host information that the process is running
mpi.init.sprng      rsprng ライブラリ中の SPRNG を初期化する
mpi.is.master       TRUE if it is a master otherwise FALSE (slave)
mpi.isend.Robj      Nonblocking send an R object to a specific process
mpi.proc.null       Dummy source and destination
mpi.quit            MPI_Finalize を呼出し R を終了する
mpi.recv.Robj       Receive an R object from a process (by mpi.send.Robj)
mpi.realloc.comm    Increase comm array to a new size
mpi.realloc.request Increase request array to a new size
mpi.realloc.status  Increase status array to a new size
mpi.request.maxsize Find the length of request array
mpi.scatter.Robj    Scatter an list to every number 
mpi.send.Robj       Send an R object to a specific process
mpi.spawn.Rslaves   Spawn R slaves. The default R script is slavedaemon.R
mpi.status.maxsize  Find the length of status array
mpichosts           finds host names from master Windows registery database 

MPI slavedaemon.R スクリプト固有の拡張 (インタラクティブな R スレーブ).

mpi.apply            配列をスレーブに分配し、それから fun を適用する
mpi.applyLB          mpi.apply の負荷均等版
mpi.bcast.Robj2slave Master sends an Robj to all slaves
mpi.bcast.cmd        全てのプロセスにある命令をブロードキャストする
mpi.close.Rslaves    mpi.spawn.Rslaves() で起動された全てのスレーブを閉じる
mpi.parApply         (負荷均等版) 並列 apply 関数
mpi.parCapply        (負荷均等版) 列への並列 apply 関数 
mpi.parLapply        (負荷均等版) 並列 lapply 関数
mpi.parRapply        (負荷均等版) 行への並列 apply 関数
mpi.parReplicate     ある表現式の反復評価のための mpi.parSapply 関数へのラッパ関数
mpi.parSapply        (負荷均等版) 並列 sapply 関数
mpi.parSim           (負荷均等版) 並列モンテカルロシミュレーション
mpi.remote.exec      ある命令をスレーブで遠隔実行し、結果をマスタに返す
mpi.setup.rngstream  パッケージ rlecuyer の RNDstream を全てのスレーブで設定する
mpi.setup.sprng      パッケージ rsprng の SPRNG を全てのスレーブで設定する
slave.hostinfo       全てのスレーブの rank, comm, host information を示す
tailslave.log        スレーブのログファイルの末尾部分を表示する

他の MPI 関数が用いる内部関数

bin.nchar            Find the length of a binary string
mpi.comm.is.null     Test if a comm is NULL (no members)
string               Create a string (empty space character) buffer

(7) パッケージ rpvm の情報

パッケージ rpvm はPVM (Parallel Virtual Machine) API へのRインタフェイスを提供する。

パッケージ 'rpvm' 内の関数の簡易説明一覧

.PVM.barrier      Group synchronization
.PVM.bcast        Broadcasting the data
.PVM.bufinfo      Message buffer infomation
.PVM.config       PVMD configuration
.PVM.exit         Unregister process from local PVM deamon.
.PVM.freebuf      Free message buffer
.PVM.gather       Gather the data into root
.PVM.getinst      Instance number identified by group name and task id
.PVM.getsbuf      Manipulating Message Buffers
.PVM.gettid       Task id identified by group name and instance number
.PVM.gsize        Get the size of the group
.PVM.initsend     Initialize send buffer
.PVM.joingroup    Join or leave a names group
.PVM.kill         pvm プロセスを停止する
.PVM.mcast        Multicast data
.PVM.mkbuf        Create message buffer
.PVM.mstats       Status of host machines
.PVM.mytid        Task IDs
.PVM.notify       Monitor pvmd
.PVM.nrecv        Nonblocking receive
.PVM.probe        Probe receive
.PVM.pstats       Status of PVM processes
.PVM.recv         Blocking receive
.PVM.reduce       Reduction
.PVM.scatter      Scatter a vector across the group
.PVM.send         Send data
.PVM.serialize    Serialize R Objects
.PVM.spawn        Spawn child tasks
.PVM.tasks        Tasks information
.PVM.tidtohost    Host id of a task
.PVM.trecv        Timeout receive
PVM.options       libpvm Options
PVM.pack          Packing data
PVM.rapply        並列版 apply 関数
PVM.unpack        Unpacking data
PVMD              Vitural Machine Control
init.sprng.master SPRNG を初期化する rsprng へのインタフェイス

トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Google
WWW を検索 OKADAJP.ORG を検索
Last-modified: 2015-03-01 (日) 01:15:59 (1354d)