並列計算(snowfall)
マルチコアが主流となってきているので、並列計算のパッケージである「snowfall」を紹介します。
なお、特に断りが無い場合はSocketクラスターを使用しています。
現在編集中
現在調査中の内容があるため
信用に値しない内容も多分に含まれています。
目次
並列化できる割合によって、理論上の限界が変わってきます。
(アムダールの法則を参照)
なお、関数のオーバーヘッドや通信速度の問題から、かえって遅くなる場合もあります。
並列計算は計算速度向上の一つの手段にすぎません。
計算速度向上の手段はいくつもありますので、適切な方法を用いてください。
CRAN Task View: High-Performance and Parallel Computing with R
基本的にapply familyは全て並列化可能です。
詳細な条件としては、「計算の順序を入れ替えても計算結果が同じである」事になります。
apply familyも内部ではforを使用しているので、並列化できるfor・並列化できないforが存在します。
for(i in 1:5){ x[i] <- hoge(x[i - 1]) }これは、マルコフ連鎖にも言える事であり、マルコフ連鎖の生成スピードを上げようとするならばRcppを使用するなど並列計算ではない方法を用いなくてはなりません。
for(i in 1:5){ Sys.sleep(runif(1)) result <<- i }もし仮に、iが2の時の休止時間が一番長いとすると、これを並列計算してしまうとグローバル変数であるresultは2になってしまいます。
と、不安になるような事を書きましたが、上記のケースはどちらもsnowfallでは実現できないため、snowfallで並列計算を行う場合は気にしなくて大丈夫です。
> library(snowfall) 要求されたパッケージ snow をロード中です
> sfInit(parallel = TRUE, cpus = 4) R Version: R version 3.2.2 (2015-08-14) snowfall 1.84-6 initialized (using snow 0.3-13): parallel execution on 4 CPUs.
> a <- 1:10 > sfExport("a")
> sfSapply(11:20, sum, a) [1] 66 67 68 69 70 71 72 73 74 75
> sfStop() Stopping cluster
とりあえず以下に従って、関数を書き換えてください。
引数は同じですので、いつも通りに計算できます。
apply → sfApply lapply → sfLapply sapply → sfSapply
ためしにsfSapplyを実行。約3倍の速度!
> library(snowfall) 要求されたパッケージ snow をロード中です > sfInit(parallel = TRUE, cpus = 4) R Version: R version 3.2.2 (2015-08-14) snowfall 1.84-6 initialized (using snow 0.3-13): parallel execution on 4 CPUs. > set.seed(123) > x <- sample(1:20)/4 > system.time(sapply(x, Sys.sleep)) ユーザ システム 経過 0.0 0.0 52.5 > system.time(sfSapply(x, Sys.sleep)) ユーザ システム 経過 0.01 0.00 15.50 > sfStop() Stopping cluster >
関数snow.timeを使用することによって、各クラスターの使用状況を確認できます。
> st.hoge <- snow.time(hoge) > st.hoge elapsed send receive node 1 node 2 node 3 node 4 21.75 0.00 0.02 13.77 12.24 14.50 12.00
snow.timeで得られた結果は、plotで詳細に状況を確認できます。
> plot(st.hoge)
applyでの実行時間が53.03秒に対して、並列計算を行ったclusterApplyは21.76秒!
何と2倍以上の高速化が実現!素晴らしい!
&ref(): File not found: "fig01_apply.jpg" at page "並列計算(snowfall)";&ref(): File not found: "fig01_clusterApply.jpg" at page "並列計算(snowfall)";
> #準備 > library(snow) > cl <- makeSOCKcluster(4) > set.seed(123) > x <- sample(1:20)/4 > > #計算 > (st.apply <- snow.time(apply(t(x), 2, Sys.sleep))) $elapsed [1] 53.03 $send [1] 0 $receive [1] 0 > (st.clusterApply <- snow.time(clusterApply(cl, x, Sys.sleep))) elapsed send receive node 1 node 2 node 3 node 4 21.76 0.02 0.00 13.73 12.23 14.51 11.99 > > #グラフ表示 > dev.new(); plot(st.apply, title = "apply") > dev.new(); plot(st.clusterApplyLB, title = "clusterApplyLB") > > #後処理 > stopCluster(cl) >
#準備 library(snow) cl <- makeSOCKcluster(4) set.seed(123) x <- sample(1:20)/4 #計算 (st.apply <- snow.time(apply(t(x), 2, Sys.sleep))) (st.clusterApply <- snow.time(clusterApply(cl, x, Sys.sleep))) #グラフ表示 dev.new(); plot(st.apply, title = "apply") dev.new(); plot(st.clusterApplyLB, title = "clusterApplyLB") #後処理 stopCluster(cl)
clusterApplyは、各クラスターに1個引数を渡し、すべてのクラスターの計算終了を待って次の引数をクラスターに渡します。
そのため、実行時間に差がある場合は待機状態になるクラスターが発生してしまいます。
clusterApplyは確かに速くなっているが、クラスターの待機時間があるのでこれを減らすようにしたい。
そこで出てきたのが、clusterApplyLBです。
clusterApplyLBは、計算が終了したら次の引数を引き渡すので、待機状態になるクラスターが(ほとんど)ありません!
&ref(): File not found: "fig01_clusterApply.jpg" at page "並列計算(snowfall)";&ref(): File not found: "fig02_clusterApplyLB.jpg" at page "並列計算(snowfall)";
> #準備 > library(snow) > cl <- makeSOCKcluster(4) > set.seed(123) > x <- sample(1:20)/4 > > #計算 > (st.clusterApply <- snow.time(clusterApply(cl, x, Sys.sleep))) elapsed send receive node 1 node 2 node 3 node 4 21.75 0.00 0.02 13.77 12.24 14.50 12.00 > (st.clusterApplyLB <- snow.time(clusterApplyLB(cl, x, Sys.sleep))) elapsed send receive node 1 node 2 node 3 node 4 13.76 0.00 0.00 13.01 13.74 12.76 13.02 > > #グラフ表示 > dev.new(); plot(st.clusterApply, title = "clusterApply") > dev.new(); plot(st.clusterApplyLB, title = "clusterApplyLB") > > #後処理 > stopCluster(cl) >
#準備 library(snow) cl <- makeSOCKcluster(4) set.seed(123) x <- sample(1:20)/4 #計算 (st.clusterApply <- snow.time(clusterApply(cl, x, Sys.sleep))) (st.clusterApplyLB <- snow.time(clusterApplyLB(cl, x, Sys.sleep))) #グラフ表示 dev.new(); plot(st.clusterApply, title = "clusterApply") dev.new(); plot(st.clusterApplyLB, title = "clusterApplyLB") #後処理 stopCluster(cl)
snowでは全ての並列関数にクラスターを指定しなくてはなりません。
clusterApply(cl, X, MARGIN, FUN, ...)
これでは、まず通常処理でテストを行った後に、並列化するときは
まずapplyでテストを行った後、該当部分をすべてparApplyに書き換えなくてはならず手間がかかります。
そこで登場するのがsnowfallです。
sfApply(x, margin, fun, ...) apply(X, MARGIN, FUN, ...)
ほら、一緒。
これは非常にありがたい!並列化を意識せずに使用できます!
しかも、snowfallの並列関数は通常処理の場合、通常の関数を「自動的に」適用してくれます。
> sfApply function (x, margin, fun, ...) { sfCheck() checkFunction(fun) if (sfParallel()) return(parApply(sfGetCluster(), x, margin, fun, ...)) else return(apply(x, margin, fun, ...)) } <environment: namespace:snowfall>
テストを行った後に、sfInitで並列化をし、改めて実行すると並列計算になる。
何と便利なパッケージ!
少し横道にそれましたが、関数の選択に戻ります。
乏しい知識ですが、未実装関数を作成してみました。
バグや誤りなど、お気づきの点がありましたら、指摘・加筆・修正のご協力をお願いいたします。
並列計算を中断した場合は、必ずworkerをsfStop()で一度終了させてください。
終了しないと、意図としない結果が返ってきます。
> #普通に実行 > sfSapply((1:200)/1000, function(x){Sys.sleep(x); x}) [1] 0.001 0.002 0.003 0.004 0.005 0.006 0.007 0.008 0.009 0.010 0.011 0.012 0.013 0.014 0.015 0.016 0.017 [18] 0.018 0.019 0.020 0.021 0.022 0.023 0.024 0.025 0.026 0.027 0.028 0.029 0.030 0.031 0.032 0.033 0.034 [35] 0.035 0.036 0.037 0.038 0.039 0.040 0.041 0.042 0.043 0.044 0.045 0.046 0.047 0.048 0.049 0.050 0.051 [52] 0.052 0.053 0.054 0.055 0.056 0.057 0.058 0.059 0.060 0.061 0.062 0.063 0.064 0.065 0.066 0.067 0.068 [69] 0.069 0.070 0.071 0.072 0.073 0.074 0.075 0.076 0.077 0.078 0.079 0.080 0.081 0.082 0.083 0.084 0.085 [86] 0.086 0.087 0.088 0.089 0.090 0.091 0.092 0.093 0.094 0.095 0.096 0.097 0.098 0.099 0.100 0.101 0.102 [103] 0.103 0.104 0.105 0.106 0.107 0.108 0.109 0.110 0.111 0.112 0.113 0.114 0.115 0.116 0.117 0.118 0.119 [120] 0.120 0.121 0.122 0.123 0.124 0.125 0.126 0.127 0.128 0.129 0.130 0.131 0.132 0.133 0.134 0.135 0.136 [137] 0.137 0.138 0.139 0.140 0.141 0.142 0.143 0.144 0.145 0.146 0.147 0.148 0.149 0.150 0.151 0.152 0.153 [154] 0.154 0.155 0.156 0.157 0.158 0.159 0.160 0.161 0.162 0.163 0.164 0.165 0.166 0.167 0.168 0.169 0.170 [171] 0.171 0.172 0.173 0.174 0.175 0.176 0.177 0.178 0.179 0.180 0.181 0.182 0.183 0.184 0.185 0.186 0.187 [188] 0.188 0.189 0.190 0.191 0.192 0.193 0.194 0.195 0.196 0.197 0.198 0.199 0.200 > #同じ作業を中断 > sfSapply((1:200)/1000, function(x){Sys.sleep(x); x}) > #変数を変えて20個のみ実行したが、前の結果が残っている? > sfSapply((1:20)/100, function(x){Sys.sleep(x); x}) [1] 0.010 0.020 0.030 0.026 0.027 0.028 0.029 0.030 0.031 0.032 0.033 0.034 0.035 0.036 0.037 0.038 0.039 [18] 0.040 0.041 0.042 0.043 0.044 0.045 0.046 0.047 0.048 0.049 0.050 0.051 0.052 0.053 0.054 0.055 0.056 [35] 0.057 0.058 0.059 0.060 0.061 0.062 0.063 0.064 0.065 0.066 0.067 0.068 0.069 0.070 0.071 0.072 0.073 [52] 0.074 0.075 0.076 0.077 0.078 0.079 0.080 0.081 0.082 0.083 0.084 0.085 0.086 0.087 0.088 0.089 0.090 [69] 0.091 0.092 0.093 0.094 0.095 0.096 0.097 0.098 0.099 0.100 0.101 0.102 0.103 0.104 0.105 0.106 0.107 [86] 0.108 0.109 0.110 0.111 0.112 0.113 0.114 0.115 0.116 0.117 0.118 0.119 0.120 0.121 0.122 0.123 0.124 [103] 0.125 0.126 0.127 0.128 0.129 0.130 0.131 0.132 0.133 0.134 0.135 0.136 0.137 0.138 0.139 0.140 0.141 [120] 0.142 0.143 0.144 0.145 0.146 0.147 0.148 0.149 0.150 0.151 0.152 0.153 0.154 0.155 0.156 0.157 0.158 [137] 0.159 0.160 0.161 0.162 0.163 0.164 0.165 0.166 0.167 0.168 0.169 0.170 0.171 0.172 0.173 0.174 0.175 [154] 0.176 0.177 0.178 0.179 0.180 0.181 0.182 0.183 0.184 0.185 0.186 0.187 0.188 0.189 0.190 0.191 0.192 [171] 0.193 0.194 0.195 0.196 0.197 0.198 0.199 0.200 > #残っているので再度実行して、うまくいっているように思われるが > sfSapply((1:20)/100, function(x){Sys.sleep(x); x}) [1] 0.01 0.02 0.03 0.04 0.05 0.06 0.07 0.08 0.09 0.10 0.11 0.12 0.13 0.14 0.15 0.16 0.17 0.18 0.19 0.20 > #やっぱり返る結果は意図としない結果。 > sfSapply((1:200)/1000, function(x){Sys.sleep(x); x}) [1] 0.001 0.002 0.003 0.004 0.005 0.006 0.007 0.008 0.009 0.010 0.011 0.012 0.013 0.014 0.015 0.016 0.017 [18] 0.018 0.019 0.020 0.021 0.022 0.023 0.024 0.025 0.040 0.050 0.060 0.070 0.080 0.090 0.100 0.110 0.120 [35] 0.130 0.140 0.150 0.160 0.170 0.180 0.190 0.200
以下のコードで取得できます。
なお、ハイパースレッディングを備えたCPUではスレッド数が取得されます。
> #parallelのdetectCoresを使用して取得します。 > library(parallel) > detectCores() [1] 4
help(package=snow) help(package=snowfall) vignette("snowfall")