Julia のコルーチン(タスク)の動作を理解する
この記事のまとめ:
- Julia が提供する並列計算の1つであるコルーチン(タスク)の使い方を説明します。
- Channel によるタスクとして関数呼び出し、タスク間通信、マルチスレッド処理について説明します。
背景
Julia で無線通信のリアルタイム処理を行おうと考えており、並列処理なしでは処理時間の限界を感じたので Julia の並列処理について調べてみました。
まだまだ Julia は勉強中なので間違い等ございましたらご指摘いただけると幸いです。
Julia の並列計算
Julia における並列計算を理解するためには Julia の公式マニュアルの “Parallel Computing” のページをまずは理解することが出発地点です。 ただ、これはかなりの分量の文章がありますので、少しずつ見ていきたいます。
まずは、Julia が提供する並列計算は主に3つに分類できます。
- コルーチン (Coroutine) またはタスク
- マルチスレッディング
- マルチコアまたは分散処理
そのうちのコルーチンについて、今回は触れていきます。 なお、公式マニュアルだけでは完全に理解できないところもあったので Channel について説明されている Qiita の記事 “Channel のススメ for Julia v1.3” なども参考にしました。 実際にコードを動かしながらコルーチンの動作の振る舞いを説明していきます。
コルーチン
複数の計算を"切り替える"ためにタスク(別名コルーチン)を使用します。 並列計算といってもデフォルトでは同時に処理されるわけではないことに注意が必要です。 ファイルを読み込んでいる間や、外部サービスや外部プログラムの完了を待っている間にほかのタスクを実行できれば、全体的な実行時間を改善できます。 それを実現するものがタスクです。 公式マニュアルには次のように説明されています (下記は翻訳版)。
Juliaは、新しいタスクを作るコマンドとして、
Channel(func::Function, ctype=Any, csize=0, taskref=nothing)
を提供しています。 タスクはfunc
関数を呼び出し、それをctype
とcsize
の新しいChannel
にバインドし、タスクをスケジュールします。 この場合、1つ条件があり、func
関数は引数なしで呼び出せることが必要です。また、
Channel
はタスク間の通信を提供できます。 この場合、Channel{T}(sz::Int)
として定義し、T
型、sz
サイズのバッファ付きチャネルを作成します。 コードがfetch
やwait
のような通信操作を行うたびに、現在のタスクは中断され、スケジューラが別のタスクを選択して実行します。 タスクは、待っているイベントが完了すると再起動されます。
ただし、Julia v1.3 でタスクをマルチスレッドで実行するオプションが追加されました。 これについても最後に説明します。
上記のとおり、タスクは Channel を使って作られるため、この Channel の使い方について説明していきます。
Channel の利用例①:関数呼び出し
上記で説明した Channel の2つの利用例を見ていきましょう。 次の例では、別の関数を呼び出すタスクの実行をします。
まずは、適当な関数を定義します。
function foo(ch::Channel)println("[bar] will run \"put\"")put!(ch, "put")println("[bar] done \"put\"")end
Channel で関数を呼ぶ出す際にの csize
の引数の有無で Channel と put
の動作がよくわかるので比較してみます。
なお、csize
はデフォルトでは 0
に設定されています。
Channel で上で定義した foo
関数のタスクを csize
を指定せずに作ります。
その際、どのような順序で処理されるか見ていきます。
c1 = Channel(foo)println("[main] will run \"take\" channel c1, whose size is zero")println(take!(c1))sleep(1)close(c1)
出力結果は次のとおりです。
同じように、csize
を 1 以上に指定してタスクを作ります。
c2 = Channel(foo, csize=2)println("[main] done \"take\" channel c2, whose size is two")println(take!(c2))sleep(1)close(c2)
出力結果は次のとおりです。
まず、foo
関数内で put
した文字列が別の場所から take
によって取得できていることから、 Channel が異なる処理との通信の役割をになっていることがわかります。
また、1つ目の実行では Channel のサイズが 0 となっています。
公式のマニュアルに Channel がいっぱいのとき put
を行うと処理をブロックすると記載されいるとおり、take
が実行されるまで foo
関数内の処理は中断し、呼び出し元に処理が戻っていることがわかります。
そして、take
を実行した時点で foo
関数の put
以降の処理に戻っています。
一方、2つ目の実行では Channel のサイズを 2 としたため、foo
関数内の put
で処理がブロックされずすべての foo
関数の処理が完了していることがわかります。
Channel の動作の理解から少し話がそれますが、Channel で処理する関数の定義は次のように Channel コンストラクターを利用して定義することもできます。
先ほどの foo
関数は次のとおりに定義できます。
c3 = Channel(csize=2) do chprintln("[bar] will run \"put\"")put!(ch, "put")println("[bar] done \"put\"")endprintln("[main] done \"take\" channel c2, whose size is two")println(take!(c3))sleep(1)close(c3)
出力結果は、2 つ目の実行と同じになります。
Channel の利用例②:タスク間通信
続いては、Channel を使ったタスク間の通信のみを提供するケースを見てみます。
この例では、2 つのチャネルとチャネル間でデータの引き渡しをする関数を定義しています。
do_sleep
関数では、Channel jobs
に値が入っているときに、ランダムな時間だけスリープし、jobs
に入っていた値とスリープした時間を Channel results
に put
するようなタスク間通信を行う処理です。
なお、公式マニュアルに説明があるとおり、Channel は for
ループで繰り返し処理ができます。
jobs = Channel{Int}(32);results = Channel{Tuple}(32);function do_sleep()for job_id in jobsexec_time = rand()sleep(exec_time) # simulates elapsed time doing actual work# typically performed externally.put!(results, (job_id, exec_time))endend
Channel jobs
に値を入れていきます。この場合では、jobs
には 1-10 の値が入っている状態です。
n = 10for i in 1:10put!(jobs, i)end
do_sleep
を 4 つのタスクとして非同期(バックグラウンド)で起動します。
for i in 1:4 # start 4 tasks to process requests in parallel@async do_sleep()end
Channel results
に何が入っているか見てみましょう。
@elapsed while n > 0 # print out resultsjob_id, exec_time = take!(results)println("$job_id finished in $(round(exec_time; digits=2)) seconds")global n = n - 1end
Channel results
には、jobs
に入っていた 1-10 の値が入っていることがわかります。
また、その実行順序が今回の重要なポイントです。
出力の順番は昇順に並んでいるわけではなく、順序が前後していることがわかります。
これは、4 つのタスクを実行させたため、各タスクが sleep 処理に入った時点で別のタスクに処理が切り替わりながら 4 つのタスクを並列で処理したということがわかります。
これを理解するために、はじめの 4 つの結果を見る分かりやすいです。
はじめの 4 つのタスク (job_id
が 1-4 のもの) はスリープ時間が昇順に並んでいることがわかります。
これは 4 つタスクを実行したうち、早くスリープが終わった順に並んでいるだけです。
そのあとにスリープが終わったものから次のループを処理しているということです。
ただし、ここで注意が必要なのは、はじめに説明したとおり、タスク(コルーチン)は処理を切り替えているだけです。 すなわち、この並列処理が成り立つのは、ファイルの入出力やネットワークの受信待ちなど CPU を占有しない処理での待ち時間がある処理だけに有効です。
ここで do_sleep
関数の中身を CPU 処理を必要とする処理に変えた do_heavy_load
という関数を定義します。
jobs2 = Channel{Int}(32);results2 = Channel{Tuple}(32);function do_heavy_load()for job_id in jobs2start = time()for i = 1:10rand(5000,5000) .* rand(5000,5000)endput!(results2, (job_id, time()))endendn = 10for i in 1:10put!(jobs2, i)endfor i in 1:n # start 4 tasks to process requests in parallel@async do_heavy_load()end@elapsed while n > 0 # print out resultsjob_id, exec_time = take!(results2)println("$job_id finished in $(round(exec_time; digits=2)) seconds")global n = n - 1end
これを実行した結果を見ると Channel results2
に入っている job_id
が昇順に並んでいることがわかります。
do_heavy_load
関数では常に 1 つのタスクが CPU を占有するため、タスクの切り替えが行われません。
そのため、このような結果になります。
Channel の利用例③:マルチスレッド並列処理
Julia v1.3 からタスク処理でマルチスレッド並列処理ができるようになりました。 ここまでの説明ではタスクは処理の"切り替え"、つまり基本的※にはシングルスレッドで処理されるもの説明してきました。
※ Julia では逆行列計算など一部の処理はタスク等に関係なく、マルチスレッドで実行されるように設計されているものがありますので、そういったものは除きます。
ここから説明するタスクの使い方をすれば、並列で動かすタスクが異なるスレッドで実行されるため、同時に処理されます。
まず、マルチスレッド処理を行うためには、Julia が利用できるスレッド数を設定するところから必要です。
設定方法は、公式マニュアルの “Parallel Computing” のページに記載されています。
具体的には環境変数として JULIA_NUM_THREADS
に Julia で利用できるスレッド数を入れて、Julia を起動します。
環境設定の設定方法は OS によって異なりますので、それぞれの設定方法は上記のページに記載されています。
なお、Jupyter Notebook を使っている場合には別途設定方法がありますので、その説明方法をこの記事の付録に記載しておきます。
それでは設定が完了したら下記を実行し、設定したスレッド数どおりに表示されるか確認します。
Threads.nthreads()
次に、マルチスレッドで処理させたい関数とタスクからデータを引き渡すための channel を定義します。channel には終了順序を確認するための job_id
と終了時間を保存していきます。
using Datesresults3 = Channel{Tuple}(32)function do_heavy_load2(job_id)for i = 1:10rand(5000,5000) .* rand(5000,5000)endput!(results3, (job_id, unix2datetime(time())))end
do_heavy_load2
関数を動かすタスクを作る関数を定義します。
task_do_heavy_load(n) = Channel{String}(spawn=true) do chput!(ch, do_heavy_load2(n))end
なお、文法的には Channel コンストラクターを使ってタスクを作る assignment form 形式での関数を定義しています。
これを 10 回実行してみます。
for i = 1:10task_do_heavy_load(i)end
結果を確認します。
for (job_id, end_time) in results3println("$job_id: $end_time")if !isready(results3)breakendend
多少の終了時間の差分はありますが 4 つずつ並列に動いていることがわかります。
付録:Jupyter Notebook でコマンドラインオプションや環境変数を設定する方法
Jupyter Notebook を使って Julia のコーディングをしている場合、Ijulia パッケージからコマンドラインオプションと環境変数の設定ができます。 具体的にはそれらの設定を行った新しい Julia カーネルを作ります。
次の例は、環境変数として JULIA_NUM_THREADS=4
とする “Julia (4 threads)” という名前の Julia カーネルを作ります。
using IJuliainstallkernel("Julia (4 threads)", env=Dict("JULIA_NUM_THREADS"=>"4"))
ブラウザーから Jupyter Notebook を再読み込みし、メニューバーの “Kernel” -> “Change kernel” を見ると先ほど作った “Julia (4 threads)” があるはずです。
このカーネルに切り替えると 4 スレッド使えるカーネルに切り替わります。
同様にコマンドラインオプションを付けて実行したい場合(たとえば、プロジェクトの指定(--project
)や、ワーカープロセス数の指定(--procs
)など)、個別のカーネルを作って対応させることができます。
詳細は上記リンクをご参照ください。
今回は以上です。 最後まで読んでいただき、ありがとうございます。
コメント
コメントを投稿