並列計算 snowfallとその拡張

マルチコアが主流となってきているので、並列計算のパッケージである「snowfall」を紹介します。
なお、特に断りが無い場合は4個のSocketクラスターを使用しています。
また、画像は縮小表示しています。クリックすると画像のみがオリジナルサイズで表示されます。

現在編集中
調査中の内容があるため
信用に値しない内容も多分に含まれています。


目次


========== 初級編 : snowfallを知る ==========

初級編に関するコメント


並列計算で計算速度は向上するか?

並列計算のイメージ

並列概念.jpg

並列化できる割合によって、理論上の限界が変わってきます。
アムダールの法則を参照)

なお、計算負荷の不均一性やデータ入出力負荷が理由で、かえって遅くなる場合もあります。

並列計算は計算速度向上の一つの手段にすぎません。
計算速度向上の手段はいくつもありますので、適切な方法を用いてください。
CRAN Task View: High-Performance and Parallel Computing with R

どのような計算が並列化できるか

基本的にapply familyは全て並列化可能です。
詳細な条件としては、「計算の順序を入れ替えても計算結果が同じである」事になります。

apply familyも内部ではforを使用しているので、並列化できるfor・並列化できないforが存在します。

  • 並列化できないforの例
    • ケース1:依存関係がある。
      以下の例は、ひとつ前の数値に対して、関数hogeを適用するため並列化が出来ません。
      順序よくx[1] → x[2] → x[3] → x[4] → x[5] と実行しないと正しい計算ができません。
      for(i in 1:5){
          x[i] <- hoge(x[i - 1])
      }
      これは、マルコフ連鎖にも言える事であり、マルコフ連鎖の生成スピードを上げようとするならばRcppを使用するなど並列計算ではない方法を用いなくてはなりません。
  • ケース2:グローバル変数など、現在の環境以外の数値を変更する
    以下の例は、Sys.sleepの時間によって休止する時間が不明であり、そのあとiをグローバル変数であるresultに代入するため並列化が出来ません。
    for(i in 1:5){
        Sys.sleep(runif(1))
        result <<- i
    }
    もし仮に、iが2の時の休止時間が一番長いとすると、これを並列計算してしまうとグローバル変数であるresultは2になってしまいます。

と、不安になるような事を書きましたが、上記のケースはどちらもsnowfallでは実現できないため、snowfallで並列計算を行う場合は気にしなくて大丈夫です。

各種パッケージ比較

package利点欠点
parallel・デフォルトパッケージとしてインストールされる。
・誰でも使用できる?
・下記「snow」「multicore」のコピーみたいなものなので、少々使いにくい。
・snowで使用できたクラスター稼働状況を調査する「snow.time」が使用できない。
multicore・parallelパッケージの基礎となった優秀なパッケージ
(だと思う。ウィンドウズ環境しかないため使用していません。)
・うゐんどうずで動かない。
snow・様々な関数が用意されている。
・parallelパッケージの基礎となった優秀なパッケージ
・クラスター稼働状況を調査する「snow.time」が使用できる。
・常にクラスターの指定をしなくてはならない。
snowfall・クラスターの指定をする必要がない。
・関数の引数が(ほとんど)一緒。
・並列計算と通常処理を同じ関数でこなせる。
・「snow」に依存するため、バージョンアップで「snow」についていっているか確認が必要?

とりあえず動かす

主な流れ

  1. ライブラリーの読み込み
    > library(snowfall)
     要求されたパッケージ snow をロード中です
  2. sfInitでクラスターを準備する。
    > sfInit(parallel = TRUE, cpus = 4)
    R Version:  R version 3.2.2 (2015-08-14)
    
    snowfall 1.84-6 initialized (using snow 0.3-13): parallel execution on 4 CPUs.
  3. (必要があれば)クラスターに変数やオブジェクトを渡す。
    > a <- 1:10
    > sfExport("a")
  4. snowfall 関数を使用して並列計算を行う。
    > sfSapply(11:20, sum, a)
     [1] 66 67 68 69 70 71 72 73 74 75
  5. クラスターを止める
    > sfStop()
    
    Stopping cluster

並列関数は何を使用するのか

とりあえず以下に従って、関数を書き換えてください。
引数は同じですので、いつも通りに計算できます。

apply  → sfApply
lapply → sfLapply
sapply → sfSapply

ためしにsfSapplyを実行。約3倍の速度!

> library(snowfall)
 要求されたパッケージ snow をロード中です
> sfInit(parallel = TRUE, cpus = 4)
R Version:  R version 3.2.2 (2015-08-14)

snowfall 1.84-6 initialized (using snow 0.3-13): parallel execution on 4 CPUs.

> set.seed(123)
> x <- sample(1:20)/4
> system.time(sapply(x, Sys.sleep))
   ユーザ   システム       経過
       0.0        0.0       52.5
> system.time(sfSapply(x, Sys.sleep))
   ユーザ   システム       経過
      0.01       0.00      15.50
> sfStop()

Stopping cluster

>

その他の関数を知る

関数内容
sfSource各クラスターにRファイルを読み込ませます。
sfLibrary各クラスターにパッケージを読み込ませます。
sfExport各クラスターに特定の変数を引き渡します。
sfExportAll?各クラスターに全ての変数を引き渡します。
sfRemove各クラスターから特定の変数を削除します。
sfRemoveAll?各クラスターから全ての変数を削除します。
sfClusterCall?各クラスターに同一の命令を出します。
sfClusterEval?各クラスターに同一の命令を出します。
なお、expr()と同様に評価式を入れます。
sfClusterEvalQ同上

Tips

cpu数

sfInitではcpusを指定します。

ご自身のPCのCPU数を指定しますが、以下のコードも取得できます。
なお、ハイパースレッディングを備えたCPUではスレッド数が取得されます。

> #parallelのdetectCoresを使用して取得します。
> library(parallel)
> detectCores()
[1] 4

スレッド数を指定するか、CPU数を指定するかはどちらでもいいと思います。

通常処理と並列処理の切り替え

通常処理と並列処理が同じ関数で出来ます。
これは大変なメリットです!
試しに行ってみましょう。

まずは通常の並列計算。

> sfInit(parallel = TRUE, cpus = 4)
R Version:  R version 3.2.2 (2015-08-14) 

snowfall 1.84-6 initialized (using snow 0.3-13): parallel execution on 4 CPUs.

> sfSapply(1:10, sqrt)
 [1] 1.000000 1.414214 1.732051 2.000000 2.236068 2.449490 2.645751 2.828427 3.000000 3.162278

並列をせずに初期化しても、通常通り計算できます。

> sfInit(parallel = FALSE)
snowfall 1.84-6 initialized: sequential execution, one CPU.

> sfSapply(1:10, sqrt)
 [1] 1.000000 1.414214 1.732051 2.000000 2.236068 2.449490 2.645751 2.828427 3.000000 3.162278

sfInitをし忘れても「sfInitしてないから、シーケンシャルで実行するね」と、自動的にsfInitを実行してくれます。

> sfSapply(1:10, sqrt)
Calling a snowfall function without calling 'sfInit' first or after sfStop().
'sfInit()' is called now.
snowfall 1.84-6 initialized: sequential execution, one CPU.

 [1] 1.000000 1.414214 1.732051 2.000000 2.236068 2.449490 2.645751 2.828427 3.000000 3.162278

乱数の使用

乱数を使用する場合は

sfClusterSetupRNGstream

を使用して、乱数列を指定してください。
なお、引数にNULLを指定してもset.seedのようにランダムな値をシード値に設定したことにはなりませんので、整数値を指定してください。

並列計算を中断した場合

並列計算を中断した場合は、必ずクラスターをsfStop()で一度終了させてください。
終了しないと、意図としない結果が返ってきます。

> #普通に実行
> sfSapply((1:200)/1000, function(x){Sys.sleep(x); x})
  [1] 0.001 0.002 0.003 0.004 0.005 0.006 0.007 0.008 0.009 0.010 0.011 0.012 0.013 0.014 0.015 0.016 0.017
 [18] 0.018 0.019 0.020 0.021 0.022 0.023 0.024 0.025 0.026 0.027 0.028 0.029 0.030 0.031 0.032 0.033 0.034
 [35] 0.035 0.036 0.037 0.038 0.039 0.040 0.041 0.042 0.043 0.044 0.045 0.046 0.047 0.048 0.049 0.050 0.051
 [52] 0.052 0.053 0.054 0.055 0.056 0.057 0.058 0.059 0.060 0.061 0.062 0.063 0.064 0.065 0.066 0.067 0.068
 [69] 0.069 0.070 0.071 0.072 0.073 0.074 0.075 0.076 0.077 0.078 0.079 0.080 0.081 0.082 0.083 0.084 0.085
 [86] 0.086 0.087 0.088 0.089 0.090 0.091 0.092 0.093 0.094 0.095 0.096 0.097 0.098 0.099 0.100 0.101 0.102
[103] 0.103 0.104 0.105 0.106 0.107 0.108 0.109 0.110 0.111 0.112 0.113 0.114 0.115 0.116 0.117 0.118 0.119
[120] 0.120 0.121 0.122 0.123 0.124 0.125 0.126 0.127 0.128 0.129 0.130 0.131 0.132 0.133 0.134 0.135 0.136
[137] 0.137 0.138 0.139 0.140 0.141 0.142 0.143 0.144 0.145 0.146 0.147 0.148 0.149 0.150 0.151 0.152 0.153
[154] 0.154 0.155 0.156 0.157 0.158 0.159 0.160 0.161 0.162 0.163 0.164 0.165 0.166 0.167 0.168 0.169 0.170
[171] 0.171 0.172 0.173 0.174 0.175 0.176 0.177 0.178 0.179 0.180 0.181 0.182 0.183 0.184 0.185 0.186 0.187
[188] 0.188 0.189 0.190 0.191 0.192 0.193 0.194 0.195 0.196 0.197 0.198 0.199 0.200

> #同じ作業を中断
> sfSapply((1:200)/1000, function(x){Sys.sleep(x); x})

> #変数を変えて20個のみ実行したが、前の結果が残っている?
> sfSapply((1:20)/100, function(x){Sys.sleep(x); x})
  [1] 0.010 0.020 0.030 0.026 0.027 0.028 0.029 0.030 0.031 0.032 0.033 0.034 0.035 0.036 0.037 0.038 0.039
 [18] 0.040 0.041 0.042 0.043 0.044 0.045 0.046 0.047 0.048 0.049 0.050 0.051 0.052 0.053 0.054 0.055 0.056
 [35] 0.057 0.058 0.059 0.060 0.061 0.062 0.063 0.064 0.065 0.066 0.067 0.068 0.069 0.070 0.071 0.072 0.073
 [52] 0.074 0.075 0.076 0.077 0.078 0.079 0.080 0.081 0.082 0.083 0.084 0.085 0.086 0.087 0.088 0.089 0.090
 [69] 0.091 0.092 0.093 0.094 0.095 0.096 0.097 0.098 0.099 0.100 0.101 0.102 0.103 0.104 0.105 0.106 0.107
 [86] 0.108 0.109 0.110 0.111 0.112 0.113 0.114 0.115 0.116 0.117 0.118 0.119 0.120 0.121 0.122 0.123 0.124
[103] 0.125 0.126 0.127 0.128 0.129 0.130 0.131 0.132 0.133 0.134 0.135 0.136 0.137 0.138 0.139 0.140 0.141
[120] 0.142 0.143 0.144 0.145 0.146 0.147 0.148 0.149 0.150 0.151 0.152 0.153 0.154 0.155 0.156 0.157 0.158
[137] 0.159 0.160 0.161 0.162 0.163 0.164 0.165 0.166 0.167 0.168 0.169 0.170 0.171 0.172 0.173 0.174 0.175
[154] 0.176 0.177 0.178 0.179 0.180 0.181 0.182 0.183 0.184 0.185 0.186 0.187 0.188 0.189 0.190 0.191 0.192
[171] 0.193 0.194 0.195 0.196 0.197 0.198 0.199 0.200

> #残っているので再度実行して、うまくいっているように思われるが
> sfSapply((1:20)/100, function(x){Sys.sleep(x); x})
 [1] 0.01 0.02 0.03 0.04 0.05 0.06 0.07 0.08 0.09 0.10 0.11 0.12 0.13 0.14 0.15 0.16 0.17 0.18 0.19 0.20

> #やっぱり返る結果は意図としない結果。
> sfSapply((1:200)/1000, function(x){Sys.sleep(x); x})
 [1] 0.001 0.002 0.003 0.004 0.005 0.006 0.007 0.008 0.009 0.010 0.011 0.012 0.013 0.014 0.015 0.016 0.017
[18] 0.018 0.019 0.020 0.021 0.022 0.023 0.024 0.025 0.040 0.050 0.060 0.070 0.080 0.090 0.100 0.110 0.120
[35] 0.130 0.140 0.150 0.160 0.170 0.180 0.190 0.200

R起動時のsnowfallの設定

起動時にsnowfallのsfInit使用時のデフォルト設定を指定することが出来ます。
なお、各引数は[--args]の後に書きます。

引数内容
--parallel並列計算をデフォルトにする。
--cpus=X並列計算におけるCPU数を指定する。
--type=Xクラスターのタイプを指定する。

詳しい設定は

vignette("snowfall")
snowfall:::searchCommandline

を参照してください。

Windowsでは以下のように指定するとsfInitの引数無しでも
並列計算をデフォルトにし、CPU数は4、タイプはSOCKとします。
"C:\hogehoge\Rgui.exe" --args --parallel --cpus=4 --type=SOCK
R起動時のsnowfallの設定.JPG
なお、リンク先をいじるなら、作業フォルダも変更した方が何かと便利です。

========== 中級編 : 並列関数の特性を知る ==========

中級編に関するコメント


並列処理問題BIG3

勝手にBIG3を決めてますが、意外とあたっていると思います。
以降、この「並列処理問題BIG3」に気を付けながら紹介していきます。

並列化コーディング問題

いくら並列化できるといっても、通常処理から並列化するのに関数をいちいち書き換えたりするのはとても面倒。
一般的なイメージとして

  1. まずは、通常処理を小規模なデータで意図とする結果が得られるかを確認
  2. 小規模なデータはそのままに、並列化のコードに書き換えてエラーと格闘
  3. 小規模なデータで並列化をして、通常処理と同じ結果が得られることを確認
  4. 晴れて、大規模並列化へ

これ結構面倒ですよね。

計算負荷の不均一性問題

計算負荷が均一の場合は、あまり気にせず並列化を行えます。
24時間かかっていた処理を4コアで実行すると6時間で終わります。
でも仮に、一つの計算がやたらと時間かかる場合で他のコアが待機状態になるコードを書いていたら
実際は18時間位かかってしまうかもしれません。
待機状態を以下に少なくして、PCやサーバー、ワークステーションの力を引き出してあげるかが重要です。

データ入出力負荷問題

並列計算では、並列に計算します。(禅問答みたいですが・・・)
並列に計算させるなら、それぞれの計算先にデータ(引数など)を送ったり、結果を受け取ったりします。
大容量のデータ送受信が必要な場合、その時間は無視できません。
計算自体の時間は早くなっても、それ以上にデータ入出力負荷がかかっては元も子もありません。

データの入力負荷問題
入力負荷は並列関数を工夫する事で減らすことができます。
データの出力負荷問題
出力負荷は並列関数側ではどうにもできません。
そのため、出力負荷問題は以降取扱いません。

並列関数の特性調査

snow.time が無くては始まらない

関数snow.timeを使用することによって、各クラスターの稼働状況を確認できます。
この関数を使用しないと、各クラスターがいつ動いて、どれくらい入出力にかかっているかが分かりません。
クラスターの稼働状況を知るためにも必須アイテムです。

snow.timeによって稼働状況を取得

> st <- snow.time(clusterApply(sfGetCluster(),sample(1:16),Sys.sleep))

printで概要が示されます。

> print(st) #stだけでもOK
elapsed    send receive  node 1  node 2  node 3  node 4 
  52.01    0.00   -0.02   24.02   33.00   45.01   34.03 

snow.timeの実態は2つの要素からから出来ています。

> names(st)
[1] "elapsed" "data"   

総経過時間と各クラスターの詳細なデータになります。

> st$elapsed
elapsed 
  52.01 
> st$data
[[1]]
     send_start send_end recv_start recv_end  exec
[1,]       0.00     0.00      11.00    11.00 11.00
[2,]      13.99    13.99      14.99    14.99  1.00
[3,]      23.00    23.00      25.02    25.01  2.02
[4,]      39.00    39.00      49.00    49.00 10.00

[[2]]
     send_start send_end recv_start recv_end  exec
[1,]       0.00     0.00      11.00       11  2.99
[2,]      13.99    13.99      22.99       23  9.00
[3,]      23.00    23.00      38.01       38 15.01
[4,]      39.00    39.00      49.00       49  6.00

[[3]]
     send_start send_end recv_start recv_end  exec
[1,]       0.00     0.00         14    13.99 14.00
[2,]      13.99    13.99         23    23.00  7.00
[3,]      23.00    23.00         39    39.00 16.00
[4,]      39.00    39.00         49    49.00  8.01

[[4]]
     send_start send_end recv_start recv_end  exec
[1,]       0.00     0.00      13.99    13.99  3.99
[2,]      13.99    13.99      23.00    23.00  5.01
[3,]      23.00    23.00      39.00    39.00 12.02
[4,]      39.00    39.00      52.01    52.01 13.01

plotすることで、視覚的にわかるようになります。
(通常はplotして調査します。)

> plot(st)
snowTimingData.jpeg
  • 緑の四角:クラスター使用中
  • 青の実線:結果をマスターに返すために待機中
  • 赤の実線:マスターとクラスター間でデータの受け渡し
  • 黒の破線:クラスター待機中

いきなりまとめ

いきなりですが、一覧表で特性をまとめてしまいます。
(使用頻度の高いapply family を取り上げます。)

package関数コーディング問題計算負荷問題入出力負荷問題備考
snowclusterApply××ここから始まる並列関数。
なお、内部的にするのが主な目的なので、この関数をそのまま使用することはないと思います。
snowfallsfClusterApply?要素数が指定したcpusを超えると使えないなど多くの問題があります。
よって、評価対象外。
sfClusterApplyLB×負荷分散(Load Balancing)を行うことにより、コンピュータの性能をフルに活かすことが出来ます。
なお、内部的にするのが主な目的なので、この関数をそのまま使用することはないと思います。
sfLapply
sfSapply
sfApply
×要素数を各クラスターへまとめて送るため入出力は1回のみとなり、入出力負荷問題が解決。
ただし、計算負荷問題は残る。
snowclusterMap×××mapplyの並列版。クラスターの指定が必要だったりします。
評価は散々だけど、無いよりましな並列関数。
snowfallsfClusterMap?実は未実装。よって評価対象外。
実は他にも「関数はあるけど未実装」の関数がsnowfallにはあります。
sfClusterApplySR××sfClusterApply?を実行中の結果を外部ファイルに記録して(Saves Result)
途中で予期せぬシャットダウンなどがあった場合でも計算途中から復帰できます。
出来ますが、あまり需要はないかもしれません。

clusterApply

並列関数はここから始まったといっても過言ではない、全ての基となる並列関数です。

概要
1.各クラスターに対して、1要素を渡す。
2.全てのクラスターから計算結果を受け取ってから、各クラスターに次の1要素を渡す。
3.その繰り返し。
コーディング問題
以下の理由でコーディングは問題です。
・パッケージがsnowで、クラスターの指定をしなくてはならない。
・クラスターの指定があるので、通常・並列の切り替えが容易ではない。
・そもそもsnowfallではないので、扱いにくい。
計算負荷問題
まずは準備。
> cl <- sfGetCluster()
> set.seed(0)
> arg.sleep <- sample(1:20)/10
で、並列計算。
速くはなっていますが、待機時間が長い結果となっています。
> st <- snow.time(clusterApply(cl, arg.sleep, Sys.sleep))
> st
elapsed    send receive  node 1  node 2  node 3  node 4 
   8.82    0.00   -0.02    6.21    5.41    3.50    5.92 
> dev.new()
> plot(st, title = "clusterApply")
clusterApply.jpeg
入出力負荷問題
今度は入出力負荷をかけた場合を見てみます。

まずは準備。
> cl <- sfGetCluster()
> set.seed(0)
> arg.sleep <- sample(1:20)/10
> fun.sleep <- function(x, ...) Sys.sleep(x)
> arg.S <- NULL
> arg.L <- numeric(10^7)
行う実験は、Rにarg.sleep分だけ寝てもらうので
> sum(arg.sleep)
[1] 21
のとおり、通常処理を行えば21秒で終了するはずです。

さて、小さな引数を加えて並列処理を実行します。
> st.S <- snow.time(clusterApply(cl, arg.sleep, fun.sleep, arg.S))
> st.S
elapsed    send receive  node 1  node 2  node 3  node 4 
   8.81    0.03    0.01    6.19    5.39    3.50    5.90 
> dev.new()
> plot(st.S, title = "clusterApply with SmallArg")
clusterApply with SmallArg.jpeg
今度は、大きな引数を加えて並列処理を実行します。
なんと大幅に遅くなっています。
各クラスターの実行時間はほとんど変わらないにも関わらず
データの送信である「send」に非常に多くの時間がかかっているのが原因です。
このように、大きなデータを扱う場合にはかえって遅くなる場合があるのです。
> st.L <- snow.time(clusterApply(cl, arg.sleep, fun.sleep, arg.L))
> st.L
elapsed    send receive  node 1  node 2  node 3  node 4 
  18.23   11.73    0.02    6.21    5.42    3.51    5.90 
> dev.new()
> plot(st.L, title = "clusterApply with LargeArg")
clusterApply with LargeArg.jpeg
総評
基本となる関数ではあるものの、コーディング問題もあるため
単体で使用するケースはほとんどないと思われる。

sfClusterApplyLB

clusterApplyの負荷分散(LoadBalancing?)バージョンです。
なお、各要素がどのクラスターで計算されるかはその時のクラスターの状態によるため
計算時間やplot図が同じになることはありません。

概要
1.clusterApplyと同じように各クラスターに対して、1要素を渡す。
2.一つのクラスターが計算を終えたら、即時次の要素をクラスターに渡す。
3.その繰り返し。
コーディング問題
snowfallなので、通常・並列の切り替えが容易。
ちなみに、通常時はlapplyに引き渡される。
計算負荷問題
準備。
> set.seed(0)
> arg.sleep <- sample(1:20)/10
で、並列計算。
clusterApplyとは違い負荷分散を行っているため、各クラスターの待機状態がほとんどありません!
> st <- snow.time(sfClusterApplyLB(arg.sleep, Sys.sleep))
> st
elapsed    send receive  node 1  node 2  node 3  node 4 
   5.99    0.01   -0.02    5.98    5.19    5.31    4.49 
> dev.new()
> plot(st, title = "sfClusterApplyLB")
sfClusterApplyLB.jpeg
入出力負荷問題
今度は入出力負荷をかけた場合を見てみます。

準備。
> set.seed(0)
> arg.sleep <- sample(1:20)/10
> fun.sleep <- function(x, ...) Sys.sleep(x)
> arg.S <- NULL
> arg.L <- numeric(10^7)
小さな引数を加えて並列処理を実行します。
> st.S <- snow.time(sfClusterApplyLB(arg.sleep, fun.sleep, arg.S))
> st.S
elapsed    send receive  node 1  node 2  node 3  node 4 
   6.01    0.00    0.01    6.00    5.22    5.30    4.51 
> dev.new()
> plot(st.S, title = "sfClusterApplyLB with SmallArg")
sfClusterApplyLB with SmallArg.jpeg
続いて、大きな引数を加えて並列処理を実行します。
負荷分散はされていますが、入出力負荷問題は残ったままです。
> st.L <- snow.time(sfClusterApplyLB(arg.sleep, fun.sleep, arg.L))
> st.L
elapsed    send receive  node 1  node 2  node 3  node 4 
  12.72   10.97    0.04    6.01    5.20    3.50    6.28 
> dev.new()
> plot(st.L, title = "sfClusterApplyLB with LargeArg")
sfClusterApplyLB with LargeArg.jpeg
総評
負荷分散がなされているものの、入出力負荷問題は依然として残る。
また、馴染みの無い関数名なので、直接使うことは少ない。
拡張するときの基礎となる関数なので、覚えといて損はない。

sfLapply・sfSapply・sfApply

各apply familyを並列化したバージョンです。

概要
1.引数をクラスターの数に分割する。
2.分割した引数をそれぞれのクラスターに引き渡す。
3.計算結果の回収をする。
コーディング問題
snowfallなので、通常・並列の切り替えが容易。
引数は通常関数と同じ。
計算負荷問題
準備。
> set.seed(0)
> arg.sleep <- sample(1:20)/10
で、並列計算。
いままでのclusterApply系と異なり、まず4分割して一気に並列計算させるため
入出力は各クラスターで1回のみとなっていることが分かります。
ただし、負荷分散が行われていないため、計算時間はクラスター間でまちまちの状態です。
> st <- snow.time(sfLapply(arg.sleep, Sys.sleep))
> st
elapsed    send receive  node 1  node 2  node 3  node 4 
   7.51    0.00    0.01    5.60    5.90    1.99    7.50 
> dev.new()
> plot(st, title = "sfLapply")
sfLapply.jpeg
入出力負荷問題
今度は入出力負荷をかけた場合を見てみます。

準備。
> set.seed(0)
> arg.sleep <- sample(1:20)/10
> fun.sleep <- function(x, ...) Sys.sleep(x)
> arg.S <- NULL
> arg.L <- numeric(10^7)
小さな引数を加えて並列処理を実行します。
> st.S <- snow.time(sfLapply(arg.sleep, fun.sleep, arg.S))
> st.S
elapsed    send receive  node 1  node 2  node 3  node 4 
   7.51    0.00    0.01    5.60    5.90    1.99    7.50 
> dev.new()
> plot(st.S, title = "sfLapply with SmallArg")
sfLapply with SmallArg.jpeg
続いて、大きな引数を加えて並列処理を実行します。
clusterApply系では1要素の計算ごとに引数を渡していたため、入出力の回数は要素数となりましたが
Apply系では各クラスターで1回のみなので、clusterApply系に比べて早い結果となりました。
> st.L <- snow.time(sfLapply(arg.sleep, fun.sleep, arg.L))
> st.L
elapsed    send receive  node 1  node 2  node 3  node 4 
  10.02    2.51   -0.01    5.60    5.90    1.99    7.51 
> dev.new()
> plot(st.L, title = "sfLapply with LargeArg")
sfLapply with LargeArg.jpeg
総評
各apply familyを並列化するため、入出力が1回で済むところがこの関数の特徴。
一番簡単な並列化と言えるかもしれません。

clusterMap

これはmapplyを並列化したバージョンです。
上記apply familyとは異なり、引数を分割して計算します。

概要
1.各クラスターに対して、1要素を渡す。
2.全てのクラスターから計算結果を受け取ってから、各クラスターに次の1要素を渡す。
3.その繰り返し。
コーディング問題
以下の理由でコーディングは問題です。
・パッケージがsnowで、クラスターの指定をしなくてはならない。
・クラスターの指定があるので、通常・並列の切り替えが容易ではない。
・そもそもsnowfallではないので、扱いにくい。
計算負荷問題
準備。
> cl <- sfGetCluster()
> set.seed(0)
> arg.sleep <- sample(1:20)/10
で、並列計算。
グラフの形がclusterApplyにそっくりです。
これはclusterMapの内部でclusterApply(正確にはstaticClusterApply?)を使用しているためです。
各クラスターの計算終了を待ってから次の計算に移るため、待機時間が発生しています。
> st <- snow.time(clusterMap(cl, Sys.sleep, arg.sleep))
> st
elapsed    send receive  node 1  node 2  node 3  node 4 
   8.82    0.00    0.00    6.22    5.39    3.51    5.89 
> dev.new()
> plot(st, title = "clusterMap")
clusterMap.jpeg
入出力負荷問題
今度は入出力負荷をかけた場合を見てみます。

準備。
> set.seed(0)
> arg.sleep <- sample(1:20)/10
> fun.sleep <- function(x, ...) Sys.sleep(x)
> arg.S <- list(NULL)
> arg.L <- list(numeric(10^7))
小さな引数を加えて並列処理を実行します。
> st.S <- snow.time(clusterMap(cl, fun.sleep, arg.sleep, MoreArgs = arg.S))
> st.S
elapsed    send receive  node 1  node 2  node 3  node 4 
   8.82    0.00    0.01    6.20    5.40    3.51    5.91 
> dev.new()
> plot(st.S, title = "clusterMap with SmallArg")
clusterMap with SmallArg.jpeg
続いて、大きな引数を加えて並列処理を実行します。
clusterApplyと同様に入出力負荷問題が発生しています。
> st.L <- snow.time(clusterMap(cl, fun.sleep, arg.sleep, MoreArgs = arg.L))
> st.L
elapsed    send receive  node 1  node 2  node 3  node 4 
  17.31   10.75    0.01    6.20    5.40    3.49    5.90 
> dev.new()
> plot(st.L, title = "clusterMap with LargeArg")
clusterMap with LargeArg.jpeg
総評
コーディングにも問題があり、計算負荷分散もない、入出力負荷問題も残る。
悪いことだらけだが、拡張のための貴重な資料として参考にする。

sfClusterApplySR

sfClusterApplySRは今までの並列関数とは一味異なる関数です。
基本的にsfClusterApply?と同等の処理を行いますが、一回の処理ごとに中間ファイルに保存をします。
保存することで、予期せぬシャットダウンによって処理が中断されてしまっても、初めから計算を行うのではなく
計算途中から復帰できるメリットがあります。
ありますが、使用する場面は少ないかもしれません。

実際に例を見てみましょう。

sfClusterApplySRを使用して計算を開始します。
その際に、以下のメッセージが表示されます。

  • Restoreが指定されていないため、結果を復帰しません。
  • 中間ファイルは「/hoge/piyo/SAVE_DEFAULT_default」です。

そして、計算が開始され進捗状況も表示されます。

> result <- sfClusterApplySR(1:100, function(x){Sys.sleep(1); x})
Restore is not active! No results are loaded.
Saving results to:  /hoge/piyo/SAVE_DEFAULT_default

SR 'default' processed: 20%
SR 'default' processed: 40%
SR 'default' processed: 60%

と、ここで予期せぬシャットダウンが起きたとします。
60%まで計算終わっているので、再度Rを立ち上げてクラスターなどの準備後、同じ命令文の引数に

restore = TRUE

を付けると、「復帰してください。」という命令文になります。

> result <- sfClusterApplySR(1:100, function(x){Sys.sleep(1); x}, restore = TRUE)
Restoring previous made results from file: /hoge/piyo/SAVE_DEFAULT_default
Starting calculation at  64 % ( 65 / 100 )
SR 'default' processed: 80%
SR 'default' processed: 100%

計算が終了して、結果を見てみると正しい答えが返ってきています。

> matrix(unlist(result), 10)
      [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] [,9] [,10]
 [1,]    1   11   21   31   41   51   61   71   81    91
 [2,]    2   12   22   32   42   52   62   72   82    92
 [3,]    3   13   23   33   43   53   63   73   83    93
 [4,]    4   14   24   34   44   54   64   74   84    94
 [5,]    5   15   25   35   45   55   65   75   85    95
 [6,]    6   16   26   36   46   56   66   76   86    96
 [7,]    7   17   27   37   47   57   67   77   87    97
 [8,]    8   18   28   38   48   58   68   78   88    98
 [9,]    9   19   29   39   49   59   69   79   89    99
[10,]   10   20   30   40   50   60   70   80   90   100

なお、restore対象の数が合わないときはエラーとなります。
その際はエラーの通り、中間ファイルを「手動で削除」してください。

> result <- sfClusterApplySR(1:200, function(x){Sys.sleep(1); x})
Restore is not active! No results are loaded.
Saving results to:  /hoge/piyo/SAVE_DEFAULT_default

SR 'default' processed: 10%
SR 'default' processed: 20%

#たとえばここで予期せぬシャットダウン

> result <- sfClusterApplySR(1:100, function(x){Sys.sleep(1); x}, restore = TRUE)
Restoring previous made results from file: /hoge/piyo/SAVE_DEFAULT_default
 sfClusterApplySR(1:100, function(x) { でエラー:
  Variable result from resultfile has different length to data: 200 <-> 100
Please remove file manually.
注意
・計算の終了は、計算結果が「NULL」ではない事で判定しています。そのため、返り値がNULLの場合は、そこから復帰します。
・意図的に「ESC」や「Ctrl + C」で終了した場合は、sfStop()を使用してクラスターを一度終了させてください。(詳細は並列計算を中断した場合を参照。)

sfClusterApplyLB vs sfLapply

上記までは、同じ関数で引数のサイズによる入出力負荷問題を見てきました。
今回は、引数のサイズを同じにした場合の関数の違いを見てみます。
比較するのはsfClusterApplyLBとsfLapplyであり、計算はsqrtを10万回行います。
もちろん通常処理の方が速いですが、特性を見るために比較します。

回数の設定

> arg <- 1:10^5

処理の比較をしましたが、100倍もsfClusterApplyLBが遅い結果となっています。
これは、sfClusterApplyLBが10万回クラスターと入出力を行っているのに対し、sfLapplyは4回のみ入出力を行っているためです。

> system.time(sfClusterApplyLB(arg, sqrt))
   ユーザ   システム       経過
     13.31       2.48      16.54
>
> system.time(sfLapply(arg, sqrt))
   ユーザ   システム       経過
      0.11       0.02       0.14

snow.timeでクラスターの使用状況を見てみます。(なおsnow.timeを呼び出すことにより、さらに遅くなっています。)
snow.timeを見ると、sfClusterApplyLBは真っ赤。相当量の入出力が行われているのが分かります。

> st.sfClusterApplyLB <- snow.time(sfClusterApplyLB(arg, sqrt))
> dev.new(); plot(st.sfClusterApplyLB, title = "sfClusterApplyLB")
> st.sfClusterApplyLB
elapsed    send receive  node 1  node 2  node 3  node 4
 187.21   10.51   10.00    4.41    0.31    0.19    0.10
>
> st.sfLapply <- snow.time(sfLapply(arg, sqrt))
> dev.new(); plot(st.sfLapply, title = "sfLapply")
> st.sfLapply
elapsed    send receive  node 1  node 2  node 3  node 4
   0.16    0.00    0.05    0.03    0.03    0.03    0.03

vs_sfClusterApplyLB.jpegvs_sfLapply.jpeg
このように非常に多くの回数を繰り返す場合は、慎重に関数を選択する必要があります。

========== 上級編 : snowfallの拡張 ==========

上級編に関するコメント


Tips

========== その他 ==========

全体を通してのコメント

  • とりあえずページを立ち上げました。 -- markovchainmontecarlo? 2015-09-26 (土) 09:14:04

参考文献・文書など


カウンター:1538


■■■■■■■■■■■■■■■■以降下書き■■■■■■■■■■■■■■■■


拡張前の調査と方針

snowfallの関数調査

snowfallの関数を調査してみると、おおよそ以下のような感じです。
まずは、sfInitされているかのチェック。されていなければ、引数無しでsfInitを呼び出します。
次に、関数のチェック。(と思われる)

hoge <- function (hogehoge, piyo, fuga, ...) {
    sfCheck()
    checkFunction(fun)
    if (sfParallel()) {
        #並列時の処理
    } else {
        #通常時の処理
    }
}

sfCheckは以下のとおりです。

> snowfall:::sfCheck
function () 
{
    if (!sfIsRunning()) {
        message(paste("Calling a snowfall function without calling 'sfInit'", 
            "first or after sfStop().\n'sfInit()' is called now."))
        return(invisible(sfInit()))
    }
    return(invisible(TRUE))
}
<environment: namespace:snowfall>

checkFunctionは以下のとおりです。
内容に関してですが、全てTRUEを返しており、開発時の名残なのかなと思っています。

> snowfall:::checkFunction
function (fun, stopOnError = TRUE) 
{
    return(TRUE)
    state <- FALSE
    try(if (!exists(as.character(substitute(fun)), inherits = TRUE) || 
        !is.function(fun) || is.null(get(as.character(substitute(fun)), 
        inherits = TRUE)) || !is.function(fun)) 
        state <- TRUE)
    if (!state) {
        if (stopOnError) 
            stop(paste("Not a function in sfCluster function call: '", 
                fun, "'"))
    }
    return(state)
}
<environment: namespace:snowfall>

関数拡張の基本系

checkFunctionは用途を終えた関数と考えますので、拡張にあたっての基本系は以下のようにします。
(誤りなどありましたら、ご指摘下さい。)

hoge <- function (hogehoge, piyo, fuga, ...) {
    sfCheck()
    if (sfParallel()) {
        #並列時の処理
    } else {
        #通常時の処理
    }
}

関数名は衝突しては具合が悪いので、まずは調査。

> apropos("sf")
 [1] ".__T__slotsFromS3:methods" "default.stringsAsFactors"  "existsFunction"
 [4] "getMethodsForDispatch"     "lsf.str"                   "lsfit"
 [7] "sfApply"                   "sfCapply"                  "sfCat"
[10] "sfClusterApply"            "sfClusterApplyLB"          "sfClusterApplySR"
[13] "sfClusterCall"             "sfClusterEval"             "sfClusterEvalQ"
[16] "sfClusterMap"              "sfClusterSetupRNG"         "sfClusterSetupRNGstream"
[19] "sfClusterSetupSPRNG"       "sfClusterSplit"            "sfCpus"
[22] "sfExport"                  "sfExportAll"               "sfGetCluster"
[25] "sfInit"                    "sfIsRunning"               "sfLapply"
[28] "sfLibrary"                 "sfMM"                      "sfNodes"
[31] "sfParallel"                "sfRapply"                  "sfRemove"
[34] "sfRemoveAll"               "sfRestore"                 "sfSapply"
[37] "sfSession"                 "sfSetMaxCPUs"              "sfSocketHosts"
[40] "sfSource"                  "sfStop"                    "sfTest"
[43] "sfType"                    "slotsFromS3"               "SSfol"
[46] "SSfpl"                     "suppressForeignCheck"      "transform"
[49] "transform.data.frame"      "transform.default"         "windowsFont"
[52] "windowsFonts"

「sf.」で始まるのは無いようなので、snowfallに敬意を表して「sf.」を基本関数の先頭に付ける形を基本系にします。

関数拡張の基本方針

hoge

関数の拡張

lapply

sapply

apply

mapply

replicate

%*%

sfet.set.seed

01  > clusterSetupRNG
02  function (cl, type = "RNGstream", ...) 
03  {
04      RNGnames <- c("RNGstream", "SPRNG")
05      rng <- pmatch(type, RNGnames)
06      if (is.na(rng)) 
07          stop(paste("'", type, "' is not a valid choice. Choose 'RNGstream' or 'SPRNG'.", 
08              sep = ""))
09      type <- RNGnames[rng]
10      if (rng == 1) 
11          clusterSetupRNGstream(cl, ...)
12      else clusterSetupSPRNG(cl, ...)
13      type
14  }
15  <environment: namespace:snow>
01  > sfClusterSetupRNGstream
02  function (seed = rep(12345, 6), ...)
03  {
04      sfCheck()
05      if (sfParallel())
06          clusterSetupRNGstream(sfGetCluster(), seed = seed, ...)
07      else {
08          warning(paste("Uniform random number streams (currently) not available in serial execution.",
09              "Random numbers may differ in serial & parallel execution."))
10          set.seed(seed[1])
11      }
12  }
13  <environment: namespace:snowfall>
01  > clusterSetupRNGstream
02  function (cl, seed = rep(12345, 6), ...)
03  {
04      if (!require(rlecuyer))
05          stop("the `rlecuyer' package is needed for RNGstream support.")
06      .lec.init()
07      .lec.SetPackageSeed(seed)
08      nc <- length(cl)
09      names <- as.character(1:nc)
10      .lec.CreateStream(names)
11      states <- lapply(names, .lec.GetStateList)
12      invisible(clusterApply(cl, states, initRNGstreamNode))
13  }
14  <environment: namespace:snow>
01  > .lec.SetPackageSeed
02  function (seed = rep(12345, 6))
03  {
04      if (!exists(".lec.Random.seed.table", envir = .GlobalEnv))
05          .lec.init()
06      seed <- .lec.CheckSeed(seed)
07      .Call("r_set_package_seed", as.double(seed), PACKAGE = "rlecuyer")
08      return(seed)
09  }
10  <environment: namespace:rlecuyer>
01  > .lec.CheckSeed
02  function (seed)
03  {
04      ll <- length(seed)
05      if (ll < 6) {
06          tmpseed <- 12345
07          seed <- c(seed, rep(tmpseed, 6 - ll))
08      }
09      if (ll > 6) {
10          seed <- seed[1:6]
11          warning("Seed may not exceed the length of 6. Truncated to ",
12              paste(seed, collapse = ", "))
13      }
14      return(seed)
15  }
16  <environment: namespace:rlecuyer>

クラスターへの乱数設定のために、以下の関数が用意されています。

sfClusterSetupRNG
sfClusterSetupRNGstream

でも、役に立たない。(と感じています。)

試しに2CPUsで。

> sfInit(TRUE,2)
snowfall 1.84-6 initialized (using snow 0.3-13): parallel execution on 2 CPUs.

乱数を初期化。

> sfClusterSetupRNG()
[1] "RNGstream"
> sfClusterCall(runif,5)
[[1]]
[1] 0.1270111 0.3185276 0.3091860 0.8258469 0.2216299

[[2]]
[1] 0.75958186 0.97831057 0.68513581 0.27926960 0.09942954

もう一回初期化。

> sfClusterSetupRNG()
[1] "RNGstream"
> sfClusterCall(runif,5)
[[1]]
[1] 0.1270111 0.3185276 0.3091860 0.8258469 0.2216299

[[2]]
[1] 0.75958186 0.97831057 0.68513581 0.27926960 0.09942954

おんなじやん。使えん。

まぁ、sfClusterSetupRNGで使用しているclusterSetupRNGstreamの中身が

clusterSetupRNGstream (cl, seed=rep(12345,6), ...)

ですので、いつも同じseed値が使われるので当たり前の結果ですね。
しかも、set.seedのように引数にNULL値を入れて、再初期化(re-initializes)することが出来ません。

■検証中■

sfSet.Seed <- function(seed){
    snowfall:::sfCheck()
    seed.alt <- runif(6, -.Machine$integer.max, .Machine$integer.max)

    if (sfParallel()) {
        if (is.null(seed)) {
            sfClusterSetupRNGstream(seed.alt)
            invisible(NULL)
        } else {
            sfClusterSetupRNGstream(seed)
            invisible(NULL)
        }
    } else {
        set.seed(seed)
        invisible(NULL)
    }
}

sfSet.Seed(NULL);sfClusterCall(runif,5)
#---------------------------------------------------------
sc.cluster.assign <- function(arg.name, arg.value, env.name = ".GlobalEnv"){
	if (env.name != ".GlobalEnv") {
		sfClusterCall(assign, env.name, new.env(parent = globalenv()), pos = globalenv())
	}
	Inner.assign <- function(name, value, env.name) {
		position <- eval(parse(text = env.name))
		assign(name, value, pos = position)
		return(NULL)
	}
	sfClusterCall(Inner.assign, name = arg.name, value = arg.value, env.name = env.name)
	invisible(NULL)
}
#---------------------------------------------------------
sc.cluster.assign("hoge", 1:10, env.name = "ClusterWorkspace")
sc.cluster.assign("piyo", 11:20)

> sfClusterEval(ClusterWorkspace$hoge)
[[1]]
 [1]  1  2  3  4  5  6  7  8  9 10

[[2]]
 [1]  1  2  3  4  5  6  7  8  9 10

[[3]]
 [1]  1  2  3  4  5  6  7  8  9 10

[[4]]
 [1]  1  2  3  4  5  6  7  8  9 10

> sfClusterEval(piyo)
[[1]]
 [1] 11 12 13 14 15 16 17 18 19 20

[[2]]
 [1] 11 12 13 14 15 16 17 18 19 20

[[3]]
 [1] 11 12 13 14 15 16 17 18 19 20

[[4]]
 [1] 11 12 13 14 15 16 17 18 19 20

添付ファイル: filevs_sfClusterApplyLB.jpeg 145件 [詳細] filevs_sfLapply.jpeg 132件 [詳細] fileclusterMap with LargeArg.jpeg 139件 [詳細] fileclusterMap with SmallArg.jpeg 141件 [詳細] fileclusterMap.jpeg 138件 [詳細] filesfLapply with LargeArg.jpeg 127件 [詳細] filesfLapply with SmallArg.jpeg 125件 [詳細] filesfLapply.jpeg 125件 [詳細] filesfClusterApplyLB with LargeArg.jpeg 117件 [詳細] filesfClusterApplyLB with SmallArg.jpeg 120件 [詳細] filesfClusterApplyLB.jpeg 122件 [詳細] fileclusterApply with LargeArg.jpeg 131件 [詳細] fileclusterApply with SmallArg.jpeg 137件 [詳細] fileclusterApply.jpeg 141件 [詳細] fileR起動時のsnowfallの設定.JPG 150件 [詳細] filesnowTimingData.jpeg 135件 [詳細] file並列概念.jpg 170件 [詳細]

トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Google
WWW を検索 OKADAJP.ORG を検索
Last-modified: 2015-10-13 (火) 20:43:29 (258d)