普通のPythonスクリプトをSpark化してお手軽並列処理する #spark
この記事は1年以上前に投稿されました。情報が古い可能性がありますので、ご注意ください。
こんにちは。木内です。
Apache Sparkはいわゆる「スケーラブルな汎用分散処理エンジン」なのですが、実際にはユーザの利用形態はSQLに関する処理や、機械学習などのデータ分析関連に偏っているように思えます。"汎用"というからにはデータ分析に限らずおおよそ並列処理できるようなユースケースにも使用できると Apache Spark の用途の幅が広がるのではないかなと思います。
そこで今回はデータ分析とは全く関係のないような処理をApache Sparkで並列化してみます。
そもそもどんなところで並列処理は使用されているのか
一般的に並列処理が使用されているのはいわゆるスパコンの分野です。私はスパコンのことはよくわからないのですが、線形代数、数値解析といった用途に利用されているようです。例えば Abaqus というソフトウェアは有限要素解析を行うことができるソフトウェアですが、応用分野としては弾性解析(どれくらいの圧力をあたえるとどの程度物体が歪むか)などがあり、自動車の車体の強度(例えばハンドルを切った時にどの程度車体が"よれる"か、など)や、タイヤの材料の解析(例えばブレーキをかけた時にどの程度路面との間に摩擦が生じるか、など)などに利用されています。
身近なところではゲームの3次元グラフィックスにも並列処理が活用されています。人間が目で見ることのできる画面が、実際にはどの色で塗りつぶされるべきか1ピクセルづつ計算しながら高速に描画されています。画面上の全てのピクセルを高速に塗りつぶすために、GPUを使用した並列計算が活用されています。
今回はレイトレーシングという3次元グラフィックスを描画する計算を Apache Spark を使用して並列化してみます。レイトレーシングは上記に述べた3次元グラフィックスを描画する方法の一つで、比較的並列処理に向いているので今回使用してみます。
Spark化する前のレイトレーシングプログラム
今回はこちら( https://gist.github.com/rossant/6046463 )のプログラムを改変前のプログラムとして使用します。実行してみると以下のような画像が生成されます。
このプログラムは特に並列処理されておらず、シングルコアを使用した1プロセスで実行されます。実際には画像の左上から右下まで1ピクセルづつ全く同じ処理を繰り返しています。つまり同時に同じ処理を複数のピクセルに対して実行できれば、並列化による高速化が期待できます。
Apache Sparkでの並列化
Apache SparkではRDD(Resilient Distributed Dataset)が分散処理のためのデータの単位になります。RDDはざっくり言うと、配列を任意の単位で区切ったもので、RDD単位で別のプロセス、別のノードで処理を行うことができます。処理された結果もまた別のRDDに格納され、後段の処理にまわすことができます。
引用: http://datastrophic.io/core-concepts-architecture-and-internals-of-apache-spark/
今回は1ピクセルを処理するためのデータをRDD化し、一律に同じ処理を実行し、結果を結合するようにします。
Spark化 Step-1: 全ての処理を関数化
元のプログラムを見ると、いくつかグローバル変数が適されており、複数の関数から参照されています。Apache Sparkでは基本的には全ての関数から共有できるグローバル変数を許しておらず、変数はそれぞれの関数の中で完結していることが必要です。(broadcast変数という例外もありますが、ここでは触れません)
そこで関数 main() という関数を作り、変数と処理を移動します。変更したものが以下のコードになります。
https://github.com/m-kiuchi/cllab181217-spark/blob/master/raytracing-2.py
Spark化 Step-2: 変数を引数としてそれぞれの関数に渡す
ただしStep-1の変更によってエラーが出るようになってしまいました。
$ python3 raytracing-2.py 0.00% Traceback (most recent call last): File "raytracing-2.py", line 185, in <module> main() File "raytracing-2.py", line 171, in main traced = trace_ray(rayO, rayD) File "raytracing-2.py", line 82, in trace_ray for i, obj in enumerate(scene): NameError: name 'scene' is not defined
いままでグローバル変数として使用していた変数 scene が関数の中のローカル変数となったことにより、別の関数から参照できなくなってしまったことが原因です。ここではグローバル変数を参照するのではなく、関数を呼び出す時に引数として変数の内容を渡すようにします。
変更したものが以下のコードになります。
https://github.com/m-kiuchi/cllab181217-spark/blob/master/raytracing-3.py
Spark化 Step-3: 並列化したい部分を関数化する
Apache SparkではRDD毎に同じ関数を実行する map() 関数を定義することができます。今回はこの map() 関数を使用して並列化するため、呼び出し先の関数を定義します。
先ほどこのレイトレーシングプログラムは1ピクセルづつ同じ処理を実行していると書きました。その部分が以下の内容です。
while depth < depth_max: in_param = { (略) } traced = trace_ray(in_param) if not traced: break obj, M, N, col_ray = traced # Reflection: create a new ray. rayO, rayD = M + N * .0001, normalize(rayD - 2 * np.dot(rayD, N) * N) depth += 1 col += reflection * col_ray reflection *= obj.get('reflection', 1.)
上記の部分を関数化して、新しい関数 genpix() を作りました。変更したものが以下のソースコードになります。
https://github.com/m-kiuchi/cllab181217-spark/blob/master/raytracing-4.py
Spark化 Step-4: 入力値をRDD化し、並列実行する
プログラム側で並列化の準備ができました。最後に入力データをRDD化します。入力値は以下の部分で作成されています。
[cc lang="python"]
for i, x in enumerate(np.linspace(S[0], S[2], w)):
if i % 10 == 0:
print("{0:.2f}%".format(i / float(w) * 100))
for j, y in enumerate(np.linspace(S[1], S[3], h)):
col = 0
Q