並列計算 snowfallとその拡張

マルチコアが主流となってきているので、並列計算のパッケージである「snowfall」を紹介します。
なお、特に断りが無い場合はSocketクラスターを使用しています。
また、画像は縮小表示しています。クリックすると画像のみがオリジナルサイズで表示されます。

現在編集中
調査中の内容があるため
信用に値しない内容も多分に含まれています。


目次


========== 初級編 : snowfallを知る ==========

並列計算で計算速度は向上するか?

並列計算のイメージ

並列概念.jpg

並列化できる割合によって、理論上の限界が変わってきます。
アムダールの法則を参照)

なお、計算負荷の不均一性やデータ入出力負荷が理由で、かえって遅くなる場合もあります。

並列計算は計算速度向上の一つの手段にすぎません。
計算速度向上の手段はいくつもありますので、適切な方法を用いてください。
CRAN Task View: High-Performance and Parallel Computing with R

どのような計算が並列化できるか

基本的にapply familyは全て並列化可能です。
詳細な条件としては、「計算の順序を入れ替えても計算結果が同じである」事になります。

apply familyも内部ではforを使用しているので、並列化できるfor・並列化できないforが存在します。

と、不安になるような事を書きましたが、上記のケースはどちらもsnowfallでは実現できないため、snowfallで並列計算を行う場合は気にしなくて大丈夫です。

各種パッケージ比較

package利点欠点
parallel・デフォルトパッケージとしてインストールされる。
・誰でも使用できる?
・下記「snow」「multicore」のコピーみたいなものなので、少々使いにくい。
・snowで使用できたクラスター稼働状況を調査する「snow.time」が使用できない。
multicore・parallelパッケージの基礎となった優秀なパッケージ
(だと思う。ウィンドウズ環境しかないため使用していません。)
・うゐんどうずで動かない。
snow・様々な関数が用意されている。
・parallelパッケージの基礎となった優秀なパッケージ
・クラスター稼働状況を調査する「snow.time」が使用できる。
・常にクラスターの指定をしなくてはならない。
snowfall・クラスターの指定をする必要がない。
・関数の引数が(ほとんど)一緒。
・並列計算と通常処理を同じ関数でこなせる。
・「snow」に依存するため、バージョンアップで「snow」についていっているか確認が必要?

とりあえず動かす

主な流れ

  1. ライブラリーの読み込み
    > library(snowfall)
     要求されたパッケージ snow をロード中です
  2. sfInitでクラスターを準備する。
    > sfInit(parallel = TRUE, cpus = 4)
    R Version:  R version 3.2.2 (2015-08-14)
    
    snowfall 1.84-6 initialized (using snow 0.3-13): parallel execution on 4 CPUs.
  3. (必要があれば)ワーカーに変数やオブジェクトを渡す。
    > a <- 1:10
    > sfExport("a")
  4. snowfall 関数を使用して並列計算を行う。
    > sfSapply(11:20, sum, a)
     [1] 66 67 68 69 70 71 72 73 74 75
  5. クラスターを止める
    > sfStop()
    
    Stopping cluster

並列関数は何を使用するのか

とりあえず以下に従って、関数を書き換えてください。
引数は同じですので、いつも通りに計算できます。

apply  → sfApply
lapply → sfLapply
sapply → sfSapply

ためしにsfSapplyを実行。約3倍の速度!

> library(snowfall)
 要求されたパッケージ snow をロード中です
> sfInit(parallel = TRUE, cpus = 4)
R Version:  R version 3.2.2 (2015-08-14)

snowfall 1.84-6 initialized (using snow 0.3-13): parallel execution on 4 CPUs.

> set.seed(123)
> x <- sample(1:20)/4
> system.time(sapply(x, Sys.sleep))
   ユーザ   システム       経過
       0.0        0.0       52.5
> system.time(sfSapply(x, Sys.sleep))
   ユーザ   システム       経過
      0.01       0.00      15.50
> sfStop()

Stopping cluster

>

その他の関数を知る

関数内容
sfSource各クラスターにRファイルを読み込ませます。
sfLibrary各クラスターにパッケージを読み込ませます。
sfExport各クラスターに特定の変数を引き渡します。
sfExportAll各クラスターに全ての変数を引き渡します。
sfRemove各クラスターから特定の変数を削除します。
sfRemoveAll各クラスターから全ての変数を削除します。
sfClusterCall各クラスターに同一の命令を出します。
sfClusterEval各クラスターに同一の命令を出します。
なお、expr()と同様に評価式を入れます。
sfClusterEvalQ同上

Tips

cpu数

sfInitではcpusを指定します。

ご自身のPCのCPU数を指定しますが、以下のコードも取得できます。
なお、ハイパースレッディングを備えたCPUではスレッド数が取得されます。

> #parallelのdetectCoresを使用して取得します。
> library(parallel)
> detectCores()
[1] 4

スレッド数を指定するか、CPU数を指定するかはどちらでもいいと思います。

通常処理と並列処理の切り替え

通常処理と並列処理が同じ関数で出来ます。
これは大変なメリットです!
試しに行ってみましょう。

まずは通常の並列計算。

> sfInit(parallel = TRUE, cpus = 4)
R Version:  R version 3.2.2 (2015-08-14) 

snowfall 1.84-6 initialized (using snow 0.3-13): parallel execution on 4 CPUs.

> sfSapply(1:10, sqrt)
 [1] 1.000000 1.414214 1.732051 2.000000 2.236068 2.449490 2.645751 2.828427 3.000000 3.162278

並列をせずに初期化しても、通常通り計算できます。

> sfInit(parallel = FALSE)
snowfall 1.84-6 initialized: sequential execution, one CPU.

> sfSapply(1:10, sqrt)
 [1] 1.000000 1.414214 1.732051 2.000000 2.236068 2.449490 2.645751 2.828427 3.000000 3.162278

sfInitをし忘れても「sfInitしてないから、シーケンシャルで実行するね」と、自動的にsfInitを実行してくれます。

> sfSapply(1:10, sqrt)
Calling a snowfall function without calling 'sfInit' first or after sfStop().
'sfInit()' is called now.
snowfall 1.84-6 initialized: sequential execution, one CPU.

 [1] 1.000000 1.414214 1.732051 2.000000 2.236068 2.449490 2.645751 2.828427 3.000000 3.162278

乱数の使用

並列計算を中断した場合

並列計算を中断した場合は、必ずworkerをsfStop()で一度終了させてください。
終了しないと、意図としない結果が返ってきます。

> #普通に実行
> sfSapply((1:200)/1000, function(x){Sys.sleep(x); x})
  [1] 0.001 0.002 0.003 0.004 0.005 0.006 0.007 0.008 0.009 0.010 0.011 0.012 0.013 0.014 0.015 0.016 0.017
 [18] 0.018 0.019 0.020 0.021 0.022 0.023 0.024 0.025 0.026 0.027 0.028 0.029 0.030 0.031 0.032 0.033 0.034
 [35] 0.035 0.036 0.037 0.038 0.039 0.040 0.041 0.042 0.043 0.044 0.045 0.046 0.047 0.048 0.049 0.050 0.051
 [52] 0.052 0.053 0.054 0.055 0.056 0.057 0.058 0.059 0.060 0.061 0.062 0.063 0.064 0.065 0.066 0.067 0.068
 [69] 0.069 0.070 0.071 0.072 0.073 0.074 0.075 0.076 0.077 0.078 0.079 0.080 0.081 0.082 0.083 0.084 0.085
 [86] 0.086 0.087 0.088 0.089 0.090 0.091 0.092 0.093 0.094 0.095 0.096 0.097 0.098 0.099 0.100 0.101 0.102
[103] 0.103 0.104 0.105 0.106 0.107 0.108 0.109 0.110 0.111 0.112 0.113 0.114 0.115 0.116 0.117 0.118 0.119
[120] 0.120 0.121 0.122 0.123 0.124 0.125 0.126 0.127 0.128 0.129 0.130 0.131 0.132 0.133 0.134 0.135 0.136
[137] 0.137 0.138 0.139 0.140 0.141 0.142 0.143 0.144 0.145 0.146 0.147 0.148 0.149 0.150 0.151 0.152 0.153
[154] 0.154 0.155 0.156 0.157 0.158 0.159 0.160 0.161 0.162 0.163 0.164 0.165 0.166 0.167 0.168 0.169 0.170
[171] 0.171 0.172 0.173 0.174 0.175 0.176 0.177 0.178 0.179 0.180 0.181 0.182 0.183 0.184 0.185 0.186 0.187
[188] 0.188 0.189 0.190 0.191 0.192 0.193 0.194 0.195 0.196 0.197 0.198 0.199 0.200

> #同じ作業を中断
> sfSapply((1:200)/1000, function(x){Sys.sleep(x); x})

> #変数を変えて20個のみ実行したが、前の結果が残っている?
> sfSapply((1:20)/100, function(x){Sys.sleep(x); x})
  [1] 0.010 0.020 0.030 0.026 0.027 0.028 0.029 0.030 0.031 0.032 0.033 0.034 0.035 0.036 0.037 0.038 0.039
 [18] 0.040 0.041 0.042 0.043 0.044 0.045 0.046 0.047 0.048 0.049 0.050 0.051 0.052 0.053 0.054 0.055 0.056
 [35] 0.057 0.058 0.059 0.060 0.061 0.062 0.063 0.064 0.065 0.066 0.067 0.068 0.069 0.070 0.071 0.072 0.073
 [52] 0.074 0.075 0.076 0.077 0.078 0.079 0.080 0.081 0.082 0.083 0.084 0.085 0.086 0.087 0.088 0.089 0.090
 [69] 0.091 0.092 0.093 0.094 0.095 0.096 0.097 0.098 0.099 0.100 0.101 0.102 0.103 0.104 0.105 0.106 0.107
 [86] 0.108 0.109 0.110 0.111 0.112 0.113 0.114 0.115 0.116 0.117 0.118 0.119 0.120 0.121 0.122 0.123 0.124
[103] 0.125 0.126 0.127 0.128 0.129 0.130 0.131 0.132 0.133 0.134 0.135 0.136 0.137 0.138 0.139 0.140 0.141
[120] 0.142 0.143 0.144 0.145 0.146 0.147 0.148 0.149 0.150 0.151 0.152 0.153 0.154 0.155 0.156 0.157 0.158
[137] 0.159 0.160 0.161 0.162 0.163 0.164 0.165 0.166 0.167 0.168 0.169 0.170 0.171 0.172 0.173 0.174 0.175
[154] 0.176 0.177 0.178 0.179 0.180 0.181 0.182 0.183 0.184 0.185 0.186 0.187 0.188 0.189 0.190 0.191 0.192
[171] 0.193 0.194 0.195 0.196 0.197 0.198 0.199 0.200

> #残っているので再度実行して、うまくいっているように思われるが
> sfSapply((1:20)/100, function(x){Sys.sleep(x); x})
 [1] 0.01 0.02 0.03 0.04 0.05 0.06 0.07 0.08 0.09 0.10 0.11 0.12 0.13 0.14 0.15 0.16 0.17 0.18 0.19 0.20

> #やっぱり返る結果は意図としない結果。
> sfSapply((1:200)/1000, function(x){Sys.sleep(x); x})
 [1] 0.001 0.002 0.003 0.004 0.005 0.006 0.007 0.008 0.009 0.010 0.011 0.012 0.013 0.014 0.015 0.016 0.017
[18] 0.018 0.019 0.020 0.021 0.022 0.023 0.024 0.025 0.040 0.050 0.060 0.070 0.080 0.090 0.100 0.110 0.120
[35] 0.130 0.140 0.150 0.160 0.170 0.180 0.190 0.200

R起動時のsnowfallの設定

起動時にsnowfallのsfInit使用時のデフォルト設定を指定することが出来ます。
なお、各引数は[--args]の後に書きます。

引数内容
--parallel並列計算をデフォルトにする。
--cpus=X並列計算におけるCPU数を指定する。
--type=Xクラスターのタイプを指定する。
--restoreSRsfClusterApplySR使用時に、restoreをデフォルトにする。
Windowsでは以下のように指定するとsfInitの引数無しでも
並列計算をデフォルトにし、CPU数は4、タイプはSOCKとします。
"C:\hogehoge\Rgui.exe" --args --parallel --cpus=4 --type=SOCK
なお、リンク先をいじるなら、作業フォルダも変更した方が何かと便利です。
R起動時のsnowfallの設定.JPG

========== 中級編 : 並列関数の特性を知る ==========

並列処理問題BIG3

勝手にBIG3を決めてますが、意外とあたっていると思います。
以降、この「並列処理問題BIG3」に気を付けながら紹介していきます。

並列化コーディング問題

いくら並列化できるといっても、通常処理から並列化するのに関数をいちいち書き換えたりするのはとても面倒。
一般的なイメージとして

  1. まずは、通常処理を小規模なデータで意図とする結果が得られるかを確認
  2. 小規模なデータはそのままに、並列化のコードに書き換えてエラーと格闘
  3. 小規模なデータで並列化をして、通常処理と同じ結果が得られることを確認
  4. 晴れて、大規模並列化へ

これ結構面倒ですよね。

計算負荷の不均一性問題

計算負荷が均一の場合は、あまり気にせず並列化を行えます。
24時間かかっていた処理を4コアで実行すると6時間で終わります。
でも仮に、一つの計算がやたらと時間かかる場合で他のコアが待機状態になるコードを書いていたら
実際は18時間位かかってしまうかもしれません。
待機状態を以下に少なくして、PCやサーバー、ワークステーションの力を引き出してあげるかが重要です。

データ入出力負荷問題

並列計算では、並列に計算します。(禅問答みたいですが・・・)
並列に計算させるなら、それぞれの計算先にデータ(引数など)を送ったり、結果を受け取ったりします。
大容量のデータ送受信が必要な場合、その時間は無視できません。
計算自体の時間は早くなっても、それ以上にデータ入出力負荷がかかっては元も子もありません。

データの入力負荷問題
入力負荷は並列関数を工夫する事で減らすことができます。
データの出力負荷問題
出力負荷は並列関数側ではどうにもできません。
そのため、出力負荷問題は以降取扱いません。

並列関数の特性調査

snow.time が無くては始まらない

関数snow.timeを使用することによって、各クラスターの稼働状況を確認できます。
この関数を使用しないと、各クラスターがいつ動いて、どれくらい入出力にかかっているかが分かりません。
クラスターの稼働状況を知るためにも必須アイテムです。

snow.timeによって稼働状況を取得

> st <- snow.time(clusterApply(sfGetCluster(),sample(1:16),Sys.sleep))

printで概要が示されます。

> print(st) #stだけでもOK
elapsed    send receive  node 1  node 2  node 3  node 4 
  52.01    0.00   -0.02   24.02   33.00   45.01   34.03 

snow.timeの実態は2つの要素からから出来ています。。

> names(st)
[1] "elapsed" "data"   

総経過時間と各クラスターの詳細なデータになります。

> st$elapsed
elapsed 
  52.01 
> st$data
[[1]]
     send_start send_end recv_start recv_end  exec
[1,]       0.00     0.00      11.00    11.00 11.00
[2,]      13.99    13.99      14.99    14.99  1.00
[3,]      23.00    23.00      25.02    25.01  2.02
[4,]      39.00    39.00      49.00    49.00 10.00

[[2]]
     send_start send_end recv_start recv_end  exec
[1,]       0.00     0.00      11.00       11  2.99
[2,]      13.99    13.99      22.99       23  9.00
[3,]      23.00    23.00      38.01       38 15.01
[4,]      39.00    39.00      49.00       49  6.00

[[3]]
     send_start send_end recv_start recv_end  exec
[1,]       0.00     0.00         14    13.99 14.00
[2,]      13.99    13.99         23    23.00  7.00
[3,]      23.00    23.00         39    39.00 16.00
[4,]      39.00    39.00         49    49.00  8.01

[[4]]
     send_start send_end recv_start recv_end  exec
[1,]       0.00     0.00      13.99    13.99  3.99
[2,]      13.99    13.99      23.00    23.00  5.01
[3,]      23.00    23.00      39.00    39.00 12.02
[4,]      39.00    39.00      52.01    52.01 13.01

plotすることで、視覚的にわかるようになります。
(通常はplotして調査します。)

> plot(st)
snowTimingData.jpeg

いきなりまとめ

いきなりですが、一覧表で特性をまとめてしまいます。
(使用頻度の高いapply family を取り上げます。)

package関数コーディング問題計算負荷問題入出力負荷問題備考
snowclusterApply××ここから始まる並列関数。
なお、内部的にするのが主な目的なので、この関数をそのまま使用することはないと思います。
snowfallsfClusterApply要素数が指定したcpusを超えると使えないなど多くの問題があります。
よって、評価対象外。
sfClusterApplyLB×負荷分散(Load Balancing)を行うことにより、コンピュータの性能をフルに活かすことが出来ます。
なお、内部的にするのが主な目的なので、この関数をそのまま使用することはないと思います。
sfLapply
sfSapply
sfApply
×要素数を各クラスターへまとめて送るため入出力は1回のみとなり、入出力負荷問題が解決。
ただし、計算負荷問題は残る。
parallelparLapplyLB××lapplyの負荷分散(Load Balancing)版。
ただし、入出力負荷問題を犠牲にして、負荷分散を優先させている。
snowclusterMap×××mapplyの並列版。クラスターの指定が必要だったりします。
評価は散々だけど、無いよりましな並列関数。
snowfallsfClusterMap実は未実装。よって評価対象外。
実は他にも「関数はあるけど未実装」の関数がsnowfallにはあります。
snowsfMM???%*%の並列版。現在調査中

clusterApply

並列関数はここから始まったといっても過言ではない、全ての基となる並列関数です。

概要
1.各クラスターに対して、1要素を渡す。
2.全てのクラスターから計算結果を受け取ってから、各クラスターに次の1要素を渡す。
3.その繰り返し。
コーディング問題
以下の理由でコーディングは問題です。
・パッケージがsnowで、クラスターの指定をしなくてはならない。
・クラスターの指定があるので、通常・並列の切り替えが容易ではない。
・そもそもsnowfallではないので、扱いにくい。
計算負荷問題
まずは準備。
> cl <- sfGetCluster()
> set.seed(0)
> arg.sleep <- sample(1:20)/100
で、並列計算。
速くはなっていますが、待機時間が長い結果となっています。
> st <- snow.time(clusterApply(cl, arg.sleep, Sys.sleep))
> st
elapsed    send receive  node 1  node 2  node 3  node 4 
   0.89    0.00    0.00    0.62    0.53    0.35    0.61 
> dev.new()
> plot(st, title = "clusterApply")
clusterApply.jpeg
入出力負荷問題
今度は入出力負荷をかけた場合を見てみます。

まずは準備。
> cl <- sfGetCluster()
> set.seed(0)
> arg.sleep <- sample(1:20)/100
> fun.sleep <- function(x, ...) Sys.sleep(x)
> arg.S <- NULL
> arg.L <- numeric(10^7)
行う実験は、Rにarg.sleep分だけ寝てもらうので
> sum(arg.sleep)
[1] 2.1
のとおり、通常処理を行えば2.1秒で終了するはずです。

さて、小さな引数を加えて並列処理を実行します。
クラスターの待機状態がありながら、約2倍の高速化となりました。
> st.S <- snow.time(clusterApply(cl, arg.sleep, fun.sleep, arg.S))
> st.S
elapsed    send receive  node 1  node 2  node 3  node 4 
   0.88    0.00    0.00    0.64    0.55    0.35    0.57 
> dev.new()
> plot(st.S, title = "clusterApply with SmallArg")
clusterApply with SmallArg.jpeg
今度は、大きな引数を加えて並列処理を実行します。
なんと15.38秒と大幅に遅くなっています。
各クラスターの実行時間はほとんど変わらないにも関わらず
データの送信である「send」に非常に多くの時間がかかっているのが原因です。
このように、大きなデータを扱う場合にはかえって遅くなる場合があるのです。
> st.L <- snow.time(clusterApply(cl, arg.sleep, fun.sleep, arg.L))
> st.L
elapsed    send receive  node 1  node 2  node 3  node 4 
  15.38   14.78    0.00    0.64    0.52    0.34    0.60 
> dev.new()
> plot(st.L, title = "clusterApply with LargeArg")
clusterApply with LargeArg.jpeg
総評
基本となる関数ではあるものの、コーディング問題もあるため
単体で使用するケースはほとんどないと思われる。

sfClusterApplyLB

clusterApplyの負荷分散(LoadBalancing)バージョンです。
なお、各要素がどのクラスターで計算されるかはその時のクラスターの状態によるため
計算時間やplot図が同じになることはありません。

概要
1.clusterApplyと同じように各クラスターに対して、1要素を渡す。
2.一つのクラスターが計算を終えたら、即時次の要素をクラスターに渡す。
3.その繰り返し。
コーディング問題
snowfallなので、通常・並列の切り替えが容易。
ちなみに、通常時はlapplyに引き渡される。
計算負荷問題
準備。
> set.seed(0)
> arg.sleep <- sample(1:20)/100
で、並列計算。
clusterApplyとは違い負荷分散を行っているため、各クラスターの待機状態がほとんどありません!
> st <- snow.time(sfClusterApplyLB(arg.sleep, Sys.sleep))
> st
elapsed    send receive  node 1  node 2  node 3  node 4 
   0.59    0.00   -0.02    0.45    0.52    0.53    0.60 
> dev.new()
> plot(st, title = "sfClusterApplyLB")
sfClusterApplyLB.jpeg
入出力負荷問題
今度は入出力負荷をかけた場合を見てみます。

準備。
> set.seed(0)
> arg.sleep <- sample(1:20)/100
> fun.sleep <- function(x, ...) Sys.sleep(x)
> arg.S <- NULL
> arg.L <- numeric(10^7)
小さな引数を加えて並列処理を実行します。
> st.S <- snow.time(sfClusterApplyLB(arg.sleep, fun.sleep, arg.S))
> st.S
elapsed    send receive  node 1  node 2  node 3  node 4 
   6.01    0.00    0.02    6.00    5.20    5.30    4.51 
> dev.new()
> plot(st.S, title = "sfClusterApplyLB with SmallArg")
sfClusterApplyLB with SmallArg.jpeg
続いて、大きな引数を加えて並列処理を実行します。
負荷分散はされていますが、入出力負荷問題は残ったままです。
> st.L <- snow.time(sfClusterApplyLB(arg.sleep, fun.sleep, arg.L))
> st.L
elapsed    send receive  node 1  node 2  node 3  node 4 
  14.17   12.47    0.01    8.09    4.46    4.29    4.10 
> dev.new()
> plot(st.L, title = "sfClusterApplyLB with LargeArg")
sfClusterApplyLB with LargeArg.jpeg
総評
負荷分散がなされているものの、入出力負荷問題は依然として残る。
また、馴染みの無い関数名なので、直接使うことは少ない。
拡張するときの基礎となる関数なので、覚えといて損はない。

sfLapply・sfSapply・sfApply

各apply familyを並列化したバージョンです。

概要
1.引数をクラスターの数に分割する。
2.分割した引数をそれぞれのクラスターに引き渡す。
3.計算結果の回収をする。
コーディング問題
snowfallなので、通常・並列の切り替えが容易。
引数は通常関数と同じ。
計算負荷問題
準備。
> set.seed(0)
> arg.sleep <- sample(1:20)/100
で、並列計算。
いままでのclusterApply系と異なり、まず4分割して一気に並列計算させるため
入出力は各クラスターで1回のみとなっていることが分かります。
ただし、負荷分散が行われていないため、計算時間はクラスター間でまちまちの状態です。
> st <- snow.time(sfLapply(arg.sleep, Sys.sleep))
> st
elapsed    send receive  node 1  node 2  node 3  node 4
   0.75    0.00    0.00    0.56    0.59    0.21    0.75
> dev.new()
> plot(st, title = "sfLapply")
sfLapply.jpeg
入出力負荷問題
今度は入出力負荷をかけた場合を見てみます。

準備。
> set.seed(0)
> arg.sleep <- sample(1:20)/100
> fun.sleep <- function(x, ...) Sys.sleep(x)
> arg.S <- NULL
> arg.L <- numeric(10^7)
小さな引数を加えて並列処理を実行します。
> st.S <- snow.time(sfLapply(arg.sleep, fun.sleep, arg.S))
> st.S
elapsed    send receive  node 1  node 2  node 3  node 4
   0.75    0.00    0.00    0.56    0.59    0.20    0.75
> dev.new()
> plot(st.S, title = "sfLapply with SmallArg")
sfLapply with SmallArg.jpeg
続いて、大きな引数を加えて並列処理を実行します。
clusterApply系では1要素の計算ごとに引数を渡していたため、入出力の回数は要素数となりましたが
Apply系では各クラスターで1回のみなので、clusterApply系に比べて早い結果となりました。
> st.L <- snow.time(sfLapply(arg.sleep, fun.sleep, arg.L))
> st.L
elapsed    send receive  node 1  node 2  node 3  node 4
   3.87    3.12    0.00    0.56    0.60    0.19    0.75
> dev.new()
> plot(st.L, title = "sfLapply with LargeArg")
sfLapply with LargeArg.jpeg
総評
各apply familyを並列化するため、入出力が1回で済むところがこの関数の特徴。
一番簡単な並列化と言えるかもしれません。

parLapplyLB

hoge

概要
hogehoge
計算負荷問題
foo
入出力負荷問題
piyo
総評
bar

clusterMap

hoge

概要
hogehoge
計算負荷問題
foo
入出力負荷問題
piyo
総評
bar

sfMM

hoge

概要
hogehoge
計算負荷問題
foo
入出力負荷問題
piyo
総評
bar

sfClusterApplySR

hoge

概要
hogehoge
計算負荷問題
foo
入出力負荷問題
piyo
総評
bar

========== 上級編 : snowfallの拡張 ==========

未実装関数の作成

乏しい知識ですが、未実装関数を作成してみました。
バグや誤りなど、お気づきの点がありましたら、指摘・加筆・修正のご協力をお願いいたします。

lapply

sapply

apply

mapply

replicate

%*%

sfet.set.seed

01  > clusterSetupRNG
02  function (cl, type = "RNGstream", ...) 
03  {
04      RNGnames <- c("RNGstream", "SPRNG")
05      rng <- pmatch(type, RNGnames)
06      if (is.na(rng)) 
07          stop(paste("'", type, "' is not a valid choice. Choose 'RNGstream' or 'SPRNG'.", 
08              sep = ""))
09      type <- RNGnames[rng]
10      if (rng == 1) 
11          clusterSetupRNGstream(cl, ...)
12      else clusterSetupSPRNG(cl, ...)
13      type
14  }
15  <environment: namespace:snow>
01  > sfClusterSetupRNGstream
02  function (seed = rep(12345, 6), ...)
03  {
04      sfCheck()
05      if (sfParallel())
06          clusterSetupRNGstream(sfGetCluster(), seed = seed, ...)
07      else {
08          warning(paste("Uniform random number streams (currently) not available in serial execution.",
09              "Random numbers may differ in serial & parallel execution."))
10          set.seed(seed[1])
11      }
12  }
13  <environment: namespace:snowfall>
01  > clusterSetupRNGstream
02  function (cl, seed = rep(12345, 6), ...)
03  {
04      if (!require(rlecuyer))
05          stop("the `rlecuyer' package is needed for RNGstream support.")
06      .lec.init()
07      .lec.SetPackageSeed(seed)
08      nc <- length(cl)
09      names <- as.character(1:nc)
10      .lec.CreateStream(names)
11      states <- lapply(names, .lec.GetStateList)
12      invisible(clusterApply(cl, states, initRNGstreamNode))
13  }
14  <environment: namespace:snow>
01  > .lec.SetPackageSeed
02  function (seed = rep(12345, 6))
03  {
04      if (!exists(".lec.Random.seed.table", envir = .GlobalEnv))
05          .lec.init()
06      seed <- .lec.CheckSeed(seed)
07      .Call("r_set_package_seed", as.double(seed), PACKAGE = "rlecuyer")
08      return(seed)
09  }
10  <environment: namespace:rlecuyer>
01  > .lec.CheckSeed
02  function (seed)
03  {
04      ll <- length(seed)
05      if (ll < 6) {
06          tmpseed <- 12345
07          seed <- c(seed, rep(tmpseed, 6 - ll))
08      }
09      if (ll > 6) {
10          seed <- seed[1:6]
11          warning("Seed may not exceed the length of 6. Truncated to ",
12              paste(seed, collapse = ", "))
13      }
14      return(seed)
15  }
16  <environment: namespace:rlecuyer>

クラスターへの乱数設定のために、以下の関数が用意されています。

sfClusterSetupRNG
sfClusterSetupRNGstream

でも、役に立たない。(と感じています。)

試しに2CPUsで。

> sfInit(TRUE,2)
snowfall 1.84-6 initialized (using snow 0.3-13): parallel execution on 2 CPUs.

乱数を初期化。

> sfClusterSetupRNG()
[1] "RNGstream"
> sfClusterCall(runif,5)
[[1]]
[1] 0.1270111 0.3185276 0.3091860 0.8258469 0.2216299

[[2]]
[1] 0.75958186 0.97831057 0.68513581 0.27926960 0.09942954

もう一回初期化。

> sfClusterSetupRNG()
[1] "RNGstream"
> sfClusterCall(runif,5)
[[1]]
[1] 0.1270111 0.3185276 0.3091860 0.8258469 0.2216299

[[2]]
[1] 0.75958186 0.97831057 0.68513581 0.27926960 0.09942954

おんなじやん。使えん。

まぁ、sfClusterSetupRNGで使用しているclusterSetupRNGstreamの中身が

clusterSetupRNGstream (cl, seed=rep(12345,6), ...)

なので~、いつも同じseed値が使われるので当たり前の結果ですね。
しかも、set.seedのように引数にNULL値を入れて、再初期化(re-initializes)することが出来ません。

■検証中■

sfSet.Seed <- function(seed){
    snowfall:::sfCheck()
    seed.alt <- runif(6, -.Machine$integer.max, .Machine$integer.max)

    if (sfParallel()) {
        if (is.null(seed)) {
            sfClusterSetupRNGstream(seed.alt)
            invisible(NULL)
        } else {
            sfClusterSetupRNGstream(seed)
            invisible(NULL)
        }
    } else {
        set.seed(seed)
        invisible(NULL)
    }
}

sfSet.Seed(NULL);sfClusterCall(runif,5)

Tips

========== その他 ==========

参考文献・文書など

コメント



トップ   新規 一覧 検索 最終更新   ヘルプ   最終更新のRSS