並列計算 snowfallとその拡張
マルチコアが主流となってきているので、並列計算のパッケージである「snowfall」を紹介します。
なお、特に断りが無い場合は4個のSocketクラスターを使用しています。
また、画像は縮小表示しています。クリックすると画像のみがオリジナルサイズで表示されます。
現在編集中
調査中の内容があるため
信用に値しない内容も多分に含まれています。
目次
初級編に関するコメント
並列化できる割合によって、理論上の限界が変わってきます。
(アムダールの法則を参照)
なお、計算負荷の不均一性やデータ入出力負荷が理由で、かえって遅くなる場合もあります。
並列計算は計算速度向上の一つの手段にすぎません。
計算速度向上の手段はいくつもありますので、適切な方法を用いてください。
CRAN Task View: High-Performance and Parallel Computing with R
基本的にapply familyは全て並列化可能です。
詳細な条件としては、「計算の順序を入れ替えても計算結果が同じである」事になります。
apply familyも内部ではforを使用しているので、並列化できるfor・並列化できないforが存在します。
for(i in 1:5){ x[i] <- hoge(x[i - 1]) }これは、マルコフ連鎖にも言える事であり、マルコフ連鎖の生成スピードを上げようとするならばRcppを使用するなど並列計算ではない方法を用いなくてはなりません。
for(i in 1:5){ Sys.sleep(runif(1)) result <<- i }もし仮に、iが2の時の休止時間が一番長いとすると、これを並列計算してしまうとグローバル変数であるresultは2になってしまいます。
と、不安になるような事を書きましたが、上記のケースはどちらもsnowfallでは実現できないため、snowfallで並列計算を行う場合は気にしなくて大丈夫です。
package | 利点 | 欠点 |
parallel | ・デフォルトパッケージとしてインストールされる。 ・誰でも使用できる? | ・下記「snow」「multicore」のコピーみたいなものなので、少々使いにくい。 ・snowで使用できたクラスター稼働状況を調査する「snow.time」が使用できない。 |
multicore | ・parallelパッケージの基礎となった優秀なパッケージ (だと思う。ウィンドウズ環境しかないため使用していません。) | ・うゐんどうずで動かない。 |
snow | ・様々な関数が用意されている。 ・parallelパッケージの基礎となった優秀なパッケージ ・クラスター稼働状況を調査する「snow.time」が使用できる。 | ・常にクラスターの指定をしなくてはならない。 |
snowfall | ・クラスターの指定をする必要がない。 ・関数の引数が(ほとんど)一緒。 ・並列計算と通常処理を同じ関数でこなせる。 | ・「snow」に依存するため、バージョンアップで「snow」についていっているか確認が必要? |
> library(snowfall) 要求されたパッケージ snow をロード中です
> sfInit(parallel = TRUE, cpus = 4) R Version: R version 3.2.2 (2015-08-14) snowfall 1.84-6 initialized (using snow 0.3-13): parallel execution on 4 CPUs.
> a <- 1:10 > sfExport("a")
> sfSapply(11:20, sum, a) [1] 66 67 68 69 70 71 72 73 74 75
> sfStop() Stopping cluster
とりあえず以下に従って、関数を書き換えてください。
引数は同じですので、いつも通りに計算できます。
apply → sfApply lapply → sfLapply sapply → sfSapply
ためしにsfSapplyを実行。約3倍の速度!
> library(snowfall) 要求されたパッケージ snow をロード中です > sfInit(parallel = TRUE, cpus = 4) R Version: R version 3.2.2 (2015-08-14) snowfall 1.84-6 initialized (using snow 0.3-13): parallel execution on 4 CPUs. > set.seed(123) > x <- sample(1:20)/4 > system.time(sapply(x, Sys.sleep)) ユーザ システム 経過 0.0 0.0 52.5 > system.time(sfSapply(x, Sys.sleep)) ユーザ システム 経過 0.01 0.00 15.50 > sfStop() Stopping cluster >
関数 | 内容 |
sfSource | 各クラスターにRファイルを読み込ませます。 |
sfLibrary | 各クラスターにパッケージを読み込ませます。 |
sfExport | 各クラスターに特定の変数を引き渡します。 |
sfExportAll | 各クラスターに全ての変数を引き渡します。 |
sfRemove | 各クラスターから特定の変数を削除します。 |
sfRemoveAll | 各クラスターから全ての変数を削除します。 |
sfClusterCall | 各クラスターに同一の命令を出します。 |
sfClusterEval | 各クラスターに同一の命令を出します。 なお、expr()と同様に評価式を入れます。 |
sfClusterEvalQ | 同上 |
sfInitではcpusを指定します。
ご自身のPCのCPU数を指定しますが、以下のコードも取得できます。
なお、ハイパースレッディングを備えたCPUではスレッド数が取得されます。
> #parallelのdetectCoresを使用して取得します。 > library(parallel) > detectCores() [1] 4
スレッド数を指定するか、CPU数を指定するかはどちらでもいいと思います。
通常処理と並列処理が同じ関数で出来ます。
これは大変なメリットです!
試しに行ってみましょう。
まずは通常の並列計算。
> sfInit(parallel = TRUE, cpus = 4) R Version: R version 3.2.2 (2015-08-14) snowfall 1.84-6 initialized (using snow 0.3-13): parallel execution on 4 CPUs. > sfSapply(1:10, sqrt) [1] 1.000000 1.414214 1.732051 2.000000 2.236068 2.449490 2.645751 2.828427 3.000000 3.162278
並列をせずに初期化しても、通常通り計算できます。
> sfInit(parallel = FALSE) snowfall 1.84-6 initialized: sequential execution, one CPU. > sfSapply(1:10, sqrt) [1] 1.000000 1.414214 1.732051 2.000000 2.236068 2.449490 2.645751 2.828427 3.000000 3.162278
sfInitをし忘れても「sfInitしてないから、シーケンシャルで実行するね」と、自動的にsfInitを実行してくれます。
> sfSapply(1:10, sqrt) Calling a snowfall function without calling 'sfInit' first or after sfStop(). 'sfInit()' is called now. snowfall 1.84-6 initialized: sequential execution, one CPU. [1] 1.000000 1.414214 1.732051 2.000000 2.236068 2.449490 2.645751 2.828427 3.000000 3.162278
乱数を使用する場合は
sfClusterSetupRNGstream
を使用して、乱数列を指定してください。
なお、引数にNULLを指定してもset.seedのようにランダムな値をシード値に設定したことにはなりませんので、整数値を指定してください。
並列計算を中断した場合は、必ずクラスターをsfStop()で一度終了させてください。
終了しないと、意図としない結果が返ってきます。
> #普通に実行 > sfSapply((1:200)/1000, function(x){Sys.sleep(x); x}) [1] 0.001 0.002 0.003 0.004 0.005 0.006 0.007 0.008 0.009 0.010 0.011 0.012 0.013 0.014 0.015 0.016 0.017 [18] 0.018 0.019 0.020 0.021 0.022 0.023 0.024 0.025 0.026 0.027 0.028 0.029 0.030 0.031 0.032 0.033 0.034 [35] 0.035 0.036 0.037 0.038 0.039 0.040 0.041 0.042 0.043 0.044 0.045 0.046 0.047 0.048 0.049 0.050 0.051 [52] 0.052 0.053 0.054 0.055 0.056 0.057 0.058 0.059 0.060 0.061 0.062 0.063 0.064 0.065 0.066 0.067 0.068 [69] 0.069 0.070 0.071 0.072 0.073 0.074 0.075 0.076 0.077 0.078 0.079 0.080 0.081 0.082 0.083 0.084 0.085 [86] 0.086 0.087 0.088 0.089 0.090 0.091 0.092 0.093 0.094 0.095 0.096 0.097 0.098 0.099 0.100 0.101 0.102 [103] 0.103 0.104 0.105 0.106 0.107 0.108 0.109 0.110 0.111 0.112 0.113 0.114 0.115 0.116 0.117 0.118 0.119 [120] 0.120 0.121 0.122 0.123 0.124 0.125 0.126 0.127 0.128 0.129 0.130 0.131 0.132 0.133 0.134 0.135 0.136 [137] 0.137 0.138 0.139 0.140 0.141 0.142 0.143 0.144 0.145 0.146 0.147 0.148 0.149 0.150 0.151 0.152 0.153 [154] 0.154 0.155 0.156 0.157 0.158 0.159 0.160 0.161 0.162 0.163 0.164 0.165 0.166 0.167 0.168 0.169 0.170 [171] 0.171 0.172 0.173 0.174 0.175 0.176 0.177 0.178 0.179 0.180 0.181 0.182 0.183 0.184 0.185 0.186 0.187 [188] 0.188 0.189 0.190 0.191 0.192 0.193 0.194 0.195 0.196 0.197 0.198 0.199 0.200 > #同じ作業を中断 > sfSapply((1:200)/1000, function(x){Sys.sleep(x); x}) > #変数を変えて20個のみ実行したが、前の結果が残っている? > sfSapply((1:20)/100, function(x){Sys.sleep(x); x}) [1] 0.010 0.020 0.030 0.026 0.027 0.028 0.029 0.030 0.031 0.032 0.033 0.034 0.035 0.036 0.037 0.038 0.039 [18] 0.040 0.041 0.042 0.043 0.044 0.045 0.046 0.047 0.048 0.049 0.050 0.051 0.052 0.053 0.054 0.055 0.056 [35] 0.057 0.058 0.059 0.060 0.061 0.062 0.063 0.064 0.065 0.066 0.067 0.068 0.069 0.070 0.071 0.072 0.073 [52] 0.074 0.075 0.076 0.077 0.078 0.079 0.080 0.081 0.082 0.083 0.084 0.085 0.086 0.087 0.088 0.089 0.090 [69] 0.091 0.092 0.093 0.094 0.095 0.096 0.097 0.098 0.099 0.100 0.101 0.102 0.103 0.104 0.105 0.106 0.107 [86] 0.108 0.109 0.110 0.111 0.112 0.113 0.114 0.115 0.116 0.117 0.118 0.119 0.120 0.121 0.122 0.123 0.124 [103] 0.125 0.126 0.127 0.128 0.129 0.130 0.131 0.132 0.133 0.134 0.135 0.136 0.137 0.138 0.139 0.140 0.141 [120] 0.142 0.143 0.144 0.145 0.146 0.147 0.148 0.149 0.150 0.151 0.152 0.153 0.154 0.155 0.156 0.157 0.158 [137] 0.159 0.160 0.161 0.162 0.163 0.164 0.165 0.166 0.167 0.168 0.169 0.170 0.171 0.172 0.173 0.174 0.175 [154] 0.176 0.177 0.178 0.179 0.180 0.181 0.182 0.183 0.184 0.185 0.186 0.187 0.188 0.189 0.190 0.191 0.192 [171] 0.193 0.194 0.195 0.196 0.197 0.198 0.199 0.200 > #残っているので再度実行して、うまくいっているように思われるが > sfSapply((1:20)/100, function(x){Sys.sleep(x); x}) [1] 0.01 0.02 0.03 0.04 0.05 0.06 0.07 0.08 0.09 0.10 0.11 0.12 0.13 0.14 0.15 0.16 0.17 0.18 0.19 0.20 > #やっぱり返る結果は意図としない結果。 > sfSapply((1:200)/1000, function(x){Sys.sleep(x); x}) [1] 0.001 0.002 0.003 0.004 0.005 0.006 0.007 0.008 0.009 0.010 0.011 0.012 0.013 0.014 0.015 0.016 0.017 [18] 0.018 0.019 0.020 0.021 0.022 0.023 0.024 0.025 0.040 0.050 0.060 0.070 0.080 0.090 0.100 0.110 0.120 [35] 0.130 0.140 0.150 0.160 0.170 0.180 0.190 0.200
起動時にsnowfallのsfInit使用時のデフォルト設定を指定することが出来ます。
なお、各引数は[--args]の後に書きます。
引数 | 内容 |
--parallel | 並列計算をデフォルトにする。 |
--cpus=X | 並列計算におけるCPU数を指定する。 |
--type=X | クラスターのタイプを指定する。 |
詳しい設定は
vignette("snowfall") snowfall:::searchCommandline
を参照してください。
"C:\hogehoge\Rgui.exe" --args --parallel --cpus=4 --type=SOCKなお、リンク先をいじるなら、作業フォルダも変更した方が何かと便利です。
中級編に関するコメント
勝手にBIG3を決めてますが、意外とあたっていると思います。
以降、この「並列処理問題BIG3」に気を付けながら紹介していきます。
いくら並列化できるといっても、通常処理から並列化するのに関数をいちいち書き換えたりするのはとても面倒。
一般的なイメージとして
これ結構面倒ですよね。
計算負荷が均一の場合は、あまり気にせず並列化を行えます。
24時間かかっていた処理を4コアで実行すると6時間で終わります。
でも仮に、一つの計算がやたらと時間かかる場合で他のコアが待機状態になるコードを書いていたら
実際は18時間位かかってしまうかもしれません。
待機状態を以下に少なくして、PCやサーバー、ワークステーションの力を引き出してあげるかが重要です。
並列計算では、並列に計算します。(禅問答みたいですが・・・)
並列に計算させるなら、それぞれの計算先にデータ(引数など)を送ったり、結果を受け取ったりします。
大容量のデータ送受信が必要な場合、その時間は無視できません。
計算自体の時間は早くなっても、それ以上にデータ入出力負荷がかかっては元も子もありません。
関数snow.timeを使用することによって、各クラスターの稼働状況を確認できます。
この関数を使用しないと、各クラスターがいつ動いて、どれくらい入出力にかかっているかが分かりません。
クラスターの稼働状況を知るためにも必須アイテムです。
snow.timeによって稼働状況を取得
> st <- snow.time(clusterApply(sfGetCluster(),sample(1:16),Sys.sleep))
printで概要が示されます。
> print(st) #stだけでもOK elapsed send receive node 1 node 2 node 3 node 4 52.01 0.00 -0.02 24.02 33.00 45.01 34.03
snow.timeの実態は2つの要素からから出来ています。
> names(st) [1] "elapsed" "data"
総経過時間と各クラスターの詳細なデータになります。
> st$elapsed elapsed 52.01 > st$data [[1]] send_start send_end recv_start recv_end exec [1,] 0.00 0.00 11.00 11.00 11.00 [2,] 13.99 13.99 14.99 14.99 1.00 [3,] 23.00 23.00 25.02 25.01 2.02 [4,] 39.00 39.00 49.00 49.00 10.00 [[2]] send_start send_end recv_start recv_end exec [1,] 0.00 0.00 11.00 11 2.99 [2,] 13.99 13.99 22.99 23 9.00 [3,] 23.00 23.00 38.01 38 15.01 [4,] 39.00 39.00 49.00 49 6.00 [[3]] send_start send_end recv_start recv_end exec [1,] 0.00 0.00 14 13.99 14.00 [2,] 13.99 13.99 23 23.00 7.00 [3,] 23.00 23.00 39 39.00 16.00 [4,] 39.00 39.00 49 49.00 8.01 [[4]] send_start send_end recv_start recv_end exec [1,] 0.00 0.00 13.99 13.99 3.99 [2,] 13.99 13.99 23.00 23.00 5.01 [3,] 23.00 23.00 39.00 39.00 12.02 [4,] 39.00 39.00 52.01 52.01 13.01
plotすることで、視覚的にわかるようになります。
(通常はplotして調査します。)
> plot(st)
いきなりですが、一覧表で特性をまとめてしまいます。
(使用頻度の高いapply family を取り上げます。)
package | 関数 | コーディング問題 | 計算負荷問題 | 入出力負荷問題 | 備考 |
snow | clusterApply | - | × | × | ここから始まる並列関数。 なお、内部的にするのが主な目的なので、この関数をそのまま使用することはないと思います。 |
snowfall | sfClusterApply | - | - | - | 要素数が指定したcpusを超えると使えないなど多くの問題があります。 よって、評価対象外。 |
sfClusterApplyLB | - | ○ | × | 負荷分散(Load Balancing)を行うことにより、コンピュータの性能をフルに活かすことが出来ます。 なお、内部的にするのが主な目的なので、この関数をそのまま使用することはないと思います。 | |
sfLapply sfSapply sfApply | ○ | × | ○ | 要素数を各クラスターへまとめて送るため入出力は1回のみとなり、入出力負荷問題が解決。 ただし、計算負荷問題は残る。 | |
snow | clusterMap | × | × | × | mapplyの並列版。クラスターの指定が必要だったりします。 評価は散々だけど、無いよりましな並列関数。 |
snowfall | sfClusterMap | - | - | - | 実は未実装。よって評価対象外。 実は他にも「関数はあるけど未実装」の関数がsnowfallにはあります。 |
sfClusterApplySR | ○ | × | × | sfClusterApplyを実行中の結果を外部ファイルに記録して(Saves Result) 途中で予期せぬシャットダウンなどがあった場合でも計算途中から復帰できます。 出来ますが、あまり需要はないかもしれません。 |
並列関数はここから始まったといっても過言ではない、全ての基となる並列関数です。
> cl <- sfGetCluster() > set.seed(0) > arg.sleep <- sample(1:20)/10で、並列計算。
> st <- snow.time(clusterApply(cl, arg.sleep, Sys.sleep)) > st elapsed send receive node 1 node 2 node 3 node 4 8.82 0.00 -0.02 6.21 5.41 3.50 5.92 > dev.new() > plot(st, title = "clusterApply")
> cl <- sfGetCluster() > set.seed(0) > arg.sleep <- sample(1:20)/10 > fun.sleep <- function(x, ...) Sys.sleep(x) > arg.S <- NULL > arg.L <- numeric(10^7)行う実験は、Rにarg.sleep分だけ寝てもらうので
> sum(arg.sleep) [1] 21のとおり、通常処理を行えば21秒で終了するはずです。
> st.S <- snow.time(clusterApply(cl, arg.sleep, fun.sleep, arg.S)) > st.S elapsed send receive node 1 node 2 node 3 node 4 8.81 0.03 0.01 6.19 5.39 3.50 5.90 > dev.new() > plot(st.S, title = "clusterApply with SmallArg")今度は、大きな引数を加えて並列処理を実行します。
> st.L <- snow.time(clusterApply(cl, arg.sleep, fun.sleep, arg.L)) > st.L elapsed send receive node 1 node 2 node 3 node 4 18.23 11.73 0.02 6.21 5.42 3.51 5.90 > dev.new() > plot(st.L, title = "clusterApply with LargeArg")
clusterApplyの負荷分散(LoadBalancing)バージョンです。
なお、各要素がどのクラスターで計算されるかはその時のクラスターの状態によるため
計算時間やplot図が同じになることはありません。
> set.seed(0) > arg.sleep <- sample(1:20)/10で、並列計算。
> st <- snow.time(sfClusterApplyLB(arg.sleep, Sys.sleep)) > st elapsed send receive node 1 node 2 node 3 node 4 5.99 0.01 -0.02 5.98 5.19 5.31 4.49 > dev.new() > plot(st, title = "sfClusterApplyLB")
> set.seed(0) > arg.sleep <- sample(1:20)/10 > fun.sleep <- function(x, ...) Sys.sleep(x) > arg.S <- NULL > arg.L <- numeric(10^7)小さな引数を加えて並列処理を実行します。
> st.S <- snow.time(sfClusterApplyLB(arg.sleep, fun.sleep, arg.S)) > st.S elapsed send receive node 1 node 2 node 3 node 4 6.01 0.00 0.01 6.00 5.22 5.30 4.51 > dev.new() > plot(st.S, title = "sfClusterApplyLB with SmallArg")続いて、大きな引数を加えて並列処理を実行します。
> st.L <- snow.time(sfClusterApplyLB(arg.sleep, fun.sleep, arg.L)) > st.L elapsed send receive node 1 node 2 node 3 node 4 12.72 10.97 0.04 6.01 5.20 3.50 6.28 > dev.new() > plot(st.L, title = "sfClusterApplyLB with LargeArg")
各apply familyを並列化したバージョンです。
> set.seed(0) > arg.sleep <- sample(1:20)/10で、並列計算。
> st <- snow.time(sfLapply(arg.sleep, Sys.sleep)) > st elapsed send receive node 1 node 2 node 3 node 4 7.51 0.00 0.01 5.60 5.90 1.99 7.50 > dev.new() > plot(st, title = "sfLapply")
> set.seed(0) > arg.sleep <- sample(1:20)/10 > fun.sleep <- function(x, ...) Sys.sleep(x) > arg.S <- NULL > arg.L <- numeric(10^7)小さな引数を加えて並列処理を実行します。
> st.S <- snow.time(sfLapply(arg.sleep, fun.sleep, arg.S)) > st.S elapsed send receive node 1 node 2 node 3 node 4 7.51 0.00 0.01 5.60 5.90 1.99 7.50 > dev.new() > plot(st.S, title = "sfLapply with SmallArg")続いて、大きな引数を加えて並列処理を実行します。
> st.L <- snow.time(sfLapply(arg.sleep, fun.sleep, arg.L)) > st.L elapsed send receive node 1 node 2 node 3 node 4 10.02 2.51 -0.01 5.60 5.90 1.99 7.51 > dev.new() > plot(st.L, title = "sfLapply with LargeArg")
これはmapplyを並列化したバージョンです。
上記apply familyとは異なり、引数を分割して計算します。
> cl <- sfGetCluster() > set.seed(0) > arg.sleep <- sample(1:20)/10で、並列計算。
> st <- snow.time(clusterMap(cl, Sys.sleep, arg.sleep)) > st elapsed send receive node 1 node 2 node 3 node 4 8.82 0.00 0.00 6.22 5.39 3.51 5.89 > dev.new() > plot(st, title = "clusterMap")
> set.seed(0) > arg.sleep <- sample(1:20)/10 > fun.sleep <- function(x, ...) Sys.sleep(x) > arg.S <- list(NULL) > arg.L <- list(numeric(10^7))小さな引数を加えて並列処理を実行します。
> st.S <- snow.time(clusterMap(cl, fun.sleep, arg.sleep, MoreArgs = arg.S)) > st.S elapsed send receive node 1 node 2 node 3 node 4 8.82 0.00 0.01 6.20 5.40 3.51 5.91 > dev.new() > plot(st.S, title = "clusterMap with SmallArg")続いて、大きな引数を加えて並列処理を実行します。
> st.L <- snow.time(clusterMap(cl, fun.sleep, arg.sleep, MoreArgs = arg.L)) > st.L elapsed send receive node 1 node 2 node 3 node 4 17.31 10.75 0.01 6.20 5.40 3.49 5.90 > dev.new() > plot(st.L, title = "clusterMap with LargeArg")
sfClusterApplySRは今までの並列関数とは一味異なる関数です。
基本的にsfClusterApplyと同等の処理を行いますが、一回の処理ごとに中間ファイルに保存をします。
保存することで、予期せぬシャットダウンによって処理が中断されてしまっても、初めから計算を行うのではなく
計算途中から復帰できるメリットがあります。
ありますが、使用する場面は少ないかもしれません。
実際に例を見てみましょう。
sfClusterApplySRを使用して計算を開始します。
その際に、以下のメッセージが表示されます。
そして、計算が開始され進捗状況も表示されます。
> result <- sfClusterApplySR(1:100, function(x){Sys.sleep(1); x}) Restore is not active! No results are loaded. Saving results to: /hoge/piyo/SAVE_DEFAULT_default SR 'default' processed: 20% SR 'default' processed: 40% SR 'default' processed: 60%
と、ここで予期せぬシャットダウンが起きたとします。
60%まで計算終わっているので、再度Rを立ち上げてクラスターなどの準備後、同じ命令文の引数に
restore = TRUE
を付けると、「復帰してください。」という命令文になります。
> result <- sfClusterApplySR(1:100, function(x){Sys.sleep(1); x}, restore = TRUE) Restoring previous made results from file: /hoge/piyo/SAVE_DEFAULT_default Starting calculation at 64 % ( 65 / 100 ) SR 'default' processed: 80% SR 'default' processed: 100%
計算が終了して、結果を見てみると正しい答えが返ってきています。
> matrix(unlist(result), 10) [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] [,9] [,10] [1,] 1 11 21 31 41 51 61 71 81 91 [2,] 2 12 22 32 42 52 62 72 82 92 [3,] 3 13 23 33 43 53 63 73 83 93 [4,] 4 14 24 34 44 54 64 74 84 94 [5,] 5 15 25 35 45 55 65 75 85 95 [6,] 6 16 26 36 46 56 66 76 86 96 [7,] 7 17 27 37 47 57 67 77 87 97 [8,] 8 18 28 38 48 58 68 78 88 98 [9,] 9 19 29 39 49 59 69 79 89 99 [10,] 10 20 30 40 50 60 70 80 90 100
なお、restore対象の数が合わないときはエラーとなります。
その際はエラーの通り、中間ファイルを「手動で削除」してください。
> result <- sfClusterApplySR(1:200, function(x){Sys.sleep(1); x}) Restore is not active! No results are loaded. Saving results to: /hoge/piyo/SAVE_DEFAULT_default SR 'default' processed: 10% SR 'default' processed: 20% #たとえばここで予期せぬシャットダウン > result <- sfClusterApplySR(1:100, function(x){Sys.sleep(1); x}, restore = TRUE) Restoring previous made results from file: /hoge/piyo/SAVE_DEFAULT_default sfClusterApplySR(1:100, function(x) { でエラー: Variable result from resultfile has different length to data: 200 <-> 100 Please remove file manually.
上記までは、同じ関数で引数のサイズによる入出力負荷問題を見てきました。
今回は、引数のサイズを同じにした場合の関数の違いを見てみます。
比較するのはsfClusterApplyLBとsfLapplyであり、計算はsqrtを10万回行います。
もちろん通常処理の方が速いですが、特性を見るために比較します。
回数の設定
> arg <- 1:10^5
処理の比較をしましたが、100倍もsfClusterApplyLBが遅い結果となっています。
これは、sfClusterApplyLBが10万回クラスターと入出力を行っているのに対し、sfLapplyは4回のみ入出力を行っているためです。
> system.time(sfClusterApplyLB(arg, sqrt)) ユーザ システム 経過 13.31 2.48 16.54 > > system.time(sfLapply(arg, sqrt)) ユーザ システム 経過 0.11 0.02 0.14
snow.timeでクラスターの使用状況を見てみます。(なおsnow.timeを呼び出すことにより、さらに遅くなっています。)
snow.timeを見ると、sfClusterApplyLBは真っ赤。相当量の入出力が行われているのが分かります。
> st.sfClusterApplyLB <- snow.time(sfClusterApplyLB(arg, sqrt)) > dev.new(); plot(st.sfClusterApplyLB, title = "sfClusterApplyLB") > st.sfClusterApplyLB elapsed send receive node 1 node 2 node 3 node 4 187.21 10.51 10.00 4.41 0.31 0.19 0.10 > > st.sfLapply <- snow.time(sfLapply(arg, sqrt)) > dev.new(); plot(st.sfLapply, title = "sfLapply") > st.sfLapply elapsed send receive node 1 node 2 node 3 node 4 0.16 0.00 0.05 0.03 0.03 0.03 0.03
このように非常に多くの回数を繰り返す場合は、慎重に関数を選択する必要があります。
上級編に関するコメント
help(package=snow) help(package=snowfall) vignette("snowfall")
カウンター:11697
snowfallの関数を調査してみると、おおよそ以下のような感じです。
まずは、sfInitされているかのチェック。されていなければ、引数無しでsfInitを呼び出します。
次に、関数のチェック。(と思われる)
hoge <- function (hogehoge, piyo, fuga, ...) { sfCheck() checkFunction(fun) if (sfParallel()) { #並列時の処理 } else { #通常時の処理 } }
sfCheckは以下のとおりです。
> snowfall:::sfCheck function () { if (!sfIsRunning()) { message(paste("Calling a snowfall function without calling 'sfInit'", "first or after sfStop().\n'sfInit()' is called now.")) return(invisible(sfInit())) } return(invisible(TRUE)) } <environment: namespace:snowfall>
checkFunctionは以下のとおりです。
内容に関してですが、全てTRUEを返しており、開発時の名残なのかなと思っています。
> snowfall:::checkFunction function (fun, stopOnError = TRUE) { return(TRUE) state <- FALSE try(if (!exists(as.character(substitute(fun)), inherits = TRUE) || !is.function(fun) || is.null(get(as.character(substitute(fun)), inherits = TRUE)) || !is.function(fun)) state <- TRUE) if (!state) { if (stopOnError) stop(paste("Not a function in sfCluster function call: '", fun, "'")) } return(state) } <environment: namespace:snowfall>
checkFunctionは用途を終えた関数と考えますので、拡張にあたっての基本系は以下のようにします。
(誤りなどありましたら、ご指摘下さい。)
hoge <- function (hogehoge, piyo, fuga, ...) { sfCheck() if (sfParallel()) { #並列時の処理 } else { #通常時の処理 } }
関数名は衝突しては具合が悪いので、まずは調査。
> apropos("sf") [1] ".__T__slotsFromS3:methods" "default.stringsAsFactors" "existsFunction" [4] "getMethodsForDispatch" "lsf.str" "lsfit" [7] "sfApply" "sfCapply" "sfCat" [10] "sfClusterApply" "sfClusterApplyLB" "sfClusterApplySR" [13] "sfClusterCall" "sfClusterEval" "sfClusterEvalQ" [16] "sfClusterMap" "sfClusterSetupRNG" "sfClusterSetupRNGstream" [19] "sfClusterSetupSPRNG" "sfClusterSplit" "sfCpus" [22] "sfExport" "sfExportAll" "sfGetCluster" [25] "sfInit" "sfIsRunning" "sfLapply" [28] "sfLibrary" "sfMM" "sfNodes" [31] "sfParallel" "sfRapply" "sfRemove" [34] "sfRemoveAll" "sfRestore" "sfSapply" [37] "sfSession" "sfSetMaxCPUs" "sfSocketHosts" [40] "sfSource" "sfStop" "sfTest" [43] "sfType" "slotsFromS3" "SSfol" [46] "SSfpl" "suppressForeignCheck" "transform" [49] "transform.data.frame" "transform.default" "windowsFont" [52] "windowsFonts"
「sf.」で始まるのは無いようなので、snowfallに敬意を表して「sf.」を基本関数の先頭に付ける形を基本系にします。
hoge
01 > clusterSetupRNG 02 function (cl, type = "RNGstream", ...) 03 { 04 RNGnames <- c("RNGstream", "SPRNG") 05 rng <- pmatch(type, RNGnames) 06 if (is.na(rng)) 07 stop(paste("'", type, "' is not a valid choice. Choose 'RNGstream' or 'SPRNG'.", 08 sep = "")) 09 type <- RNGnames[rng] 10 if (rng == 1) 11 clusterSetupRNGstream(cl, ...) 12 else clusterSetupSPRNG(cl, ...) 13 type 14 } 15 <environment: namespace:snow>
01 > sfClusterSetupRNGstream 02 function (seed = rep(12345, 6), ...) 03 { 04 sfCheck() 05 if (sfParallel()) 06 clusterSetupRNGstream(sfGetCluster(), seed = seed, ...) 07 else { 08 warning(paste("Uniform random number streams (currently) not available in serial execution.", 09 "Random numbers may differ in serial & parallel execution.")) 10 set.seed(seed[1]) 11 } 12 } 13 <environment: namespace:snowfall>
01 > clusterSetupRNGstream 02 function (cl, seed = rep(12345, 6), ...) 03 { 04 if (!require(rlecuyer)) 05 stop("the `rlecuyer' package is needed for RNGstream support.") 06 .lec.init() 07 .lec.SetPackageSeed(seed) 08 nc <- length(cl) 09 names <- as.character(1:nc) 10 .lec.CreateStream(names) 11 states <- lapply(names, .lec.GetStateList) 12 invisible(clusterApply(cl, states, initRNGstreamNode)) 13 } 14 <environment: namespace:snow>
01 > .lec.SetPackageSeed 02 function (seed = rep(12345, 6)) 03 { 04 if (!exists(".lec.Random.seed.table", envir = .GlobalEnv)) 05 .lec.init() 06 seed <- .lec.CheckSeed(seed) 07 .Call("r_set_package_seed", as.double(seed), PACKAGE = "rlecuyer") 08 return(seed) 09 } 10 <environment: namespace:rlecuyer>
01 > .lec.CheckSeed 02 function (seed) 03 { 04 ll <- length(seed) 05 if (ll < 6) { 06 tmpseed <- 12345 07 seed <- c(seed, rep(tmpseed, 6 - ll)) 08 } 09 if (ll > 6) { 10 seed <- seed[1:6] 11 warning("Seed may not exceed the length of 6. Truncated to ", 12 paste(seed, collapse = ", ")) 13 } 14 return(seed) 15 } 16 <environment: namespace:rlecuyer>
クラスターへの乱数設定のために、以下の関数が用意されています。
sfClusterSetupRNG sfClusterSetupRNGstream
でも、役に立たない。(と感じています。)
試しに2CPUsで。
> sfInit(TRUE,2) snowfall 1.84-6 initialized (using snow 0.3-13): parallel execution on 2 CPUs.
乱数を初期化。
> sfClusterSetupRNG() [1] "RNGstream" > sfClusterCall(runif,5) [[1]] [1] 0.1270111 0.3185276 0.3091860 0.8258469 0.2216299 [[2]] [1] 0.75958186 0.97831057 0.68513581 0.27926960 0.09942954
もう一回初期化。
> sfClusterSetupRNG() [1] "RNGstream" > sfClusterCall(runif,5) [[1]] [1] 0.1270111 0.3185276 0.3091860 0.8258469 0.2216299 [[2]] [1] 0.75958186 0.97831057 0.68513581 0.27926960 0.09942954
おんなじやん。使えん。
まぁ、sfClusterSetupRNGで使用しているclusterSetupRNGstreamの中身が
clusterSetupRNGstream (cl, seed=rep(12345,6), ...)
ですので、いつも同じseed値が使われるので当たり前の結果ですね。
しかも、set.seedのように引数にNULL値を入れて、再初期化(re-initializes)することが出来ません。
■検証中■
sfSet.Seed <- function(seed){ snowfall:::sfCheck() seed.alt <- runif(6, -.Machine$integer.max, .Machine$integer.max) if (sfParallel()) { if (is.null(seed)) { sfClusterSetupRNGstream(seed.alt) invisible(NULL) } else { sfClusterSetupRNGstream(seed) invisible(NULL) } } else { set.seed(seed) invisible(NULL) } } sfSet.Seed(NULL);sfClusterCall(runif,5)
#--------------------------------------------------------- sc.cluster.assign <- function(arg.name, arg.value, env.name = ".GlobalEnv"){ if (env.name != ".GlobalEnv") { sfClusterCall(assign, env.name, new.env(parent = globalenv()), pos = globalenv()) } Inner.assign <- function(name, value, env.name) { position <- eval(parse(text = env.name)) assign(name, value, pos = position) return(NULL) } sfClusterCall(Inner.assign, name = arg.name, value = arg.value, env.name = env.name) invisible(NULL) } #--------------------------------------------------------- sc.cluster.assign("hoge", 1:10, env.name = "ClusterWorkspace") sc.cluster.assign("piyo", 11:20) > sfClusterEval(ClusterWorkspace$hoge) [[1]] [1] 1 2 3 4 5 6 7 8 9 10 [[2]] [1] 1 2 3 4 5 6 7 8 9 10 [[3]] [1] 1 2 3 4 5 6 7 8 9 10 [[4]] [1] 1 2 3 4 5 6 7 8 9 10 > sfClusterEval(piyo) [[1]] [1] 11 12 13 14 15 16 17 18 19 20 [[2]] [1] 11 12 13 14 15 16 17 18 19 20 [[3]] [1] 11 12 13 14 15 16 17 18 19 20 [[4]] [1] 11 12 13 14 15 16 17 18 19 20