Julia のコルーチン(タスク)の動作を理解する


この記事のまとめ:
  • Julia が提供する並列計算の1つであるコルーチン(タスク)の使い方を説明します。
  • Channel によるタスクとして関数呼び出し、タスク間通信、マルチスレッド処理について説明します。
背景

Julia で無線通信のリアルタイム処理を行おうと考えており、並列処理なしでは処理時間の限界を感じたので Julia の並列処理について調べてみました。

まだまだ Julia は勉強中なので間違い等ございましたらご指摘いただけると幸いです。

Julia の並列計算

Julia における並列計算を理解するためには Julia の公式マニュアルの “Parallel Computing” のページをまずは理解することが出発地点です。 ただ、これはかなりの分量の文章がありますので、少しずつ見ていきたいます。

まずは、Julia が提供する並列計算は主に3つに分類できます。

  • コルーチン (Coroutine) またはタスク
  • マルチスレッディング
  • マルチコアまたは分散処理

そのうちのコルーチンについて、今回は触れていきます。 なお、公式マニュアルだけでは完全に理解できないところもあったので Channel について説明されている Qiita の記事 “Channel のススメ for Julia v1.3” なども参考にしました。 実際にコードを動かしながらコルーチンの動作の振る舞いを説明していきます。

コルーチン

複数の計算を"切り替える"ためにタスク(別名コルーチン)を使用します。 並列計算といってもデフォルトでは同時に処理されるわけではないことに注意が必要です。 ファイルを読み込んでいる間や、外部サービスや外部プログラムの完了を待っている間にほかのタスクを実行できれば、全体的な実行時間を改善できます。 それを実現するものがタスクです。 公式マニュアルには次のように説明されています (下記は翻訳版)。

Juliaは、新しいタスクを作るコマンドとして、Channel(func::Function, ctype=Any, csize=0, taskref=nothing) を提供しています。 タスクは func 関数を呼び出し、それを ctypecsize の新しい Channel にバインドし、タスクをスケジュールします。 この場合、1つ条件があり、func 関数は引数なしで呼び出せることが必要です。

また、Channel はタスク間の通信を提供できます。 この場合、Channel{T}(sz::Int) として定義し、T 型、sz サイズのバッファ付きチャネルを作成します。 コードが fetchwait のような通信操作を行うたびに、現在のタスクは中断され、スケジューラが別のタスクを選択して実行します。 タスクは、待っているイベントが完了すると再起動されます。

ただし、Julia v1.3 でタスクをマルチスレッドで実行するオプションが追加されました。 これについても最後に説明します。

上記のとおり、タスクは Channel を使って作られるため、この Channel の使い方について説明していきます。

Channel の利用例①:関数呼び出し

上記で説明した Channel の2つの利用例を見ていきましょう。 次の例では、別の関数を呼び出すタスクの実行をします。

まずは、適当な関数を定義します。

function foo(ch::Channel)
    println("[bar] will run \"put\"")
    put!(ch, "put")
    println("[bar] done \"put\"")
end

Channel で関数を呼ぶ出す際にの csize の引数の有無で Channel と put の動作がよくわかるので比較してみます。 なお、csize はデフォルトでは 0 に設定されています

Channel で上で定義した foo 関数のタスクを csize を指定せずに作ります。 その際、どのような順序で処理されるか見ていきます。

c1 = Channel(foo)
println("[main] will run \"take\" channel c1, whose size is zero")
println(take!(c1))
sleep(1)
close(c1)

出力結果は次のとおりです。

[bar] will run "put"
[main] will run "take" channel c1, whose size is zero
put
[bar] done "put"

同じように、csize を 1 以上に指定してタスクを作ります。

c2 = Channel(foo, csize=2)
println("[main] done \"take\" channel c2, whose size is two")
println(take!(c2))
sleep(1)
close(c2)

出力結果は次のとおりです。

[bar] will run "put"
[bar] done "put"
[main] done "take" channel c2, whose size is two
put

まず、foo 関数内で put した文字列が別の場所から take によって取得できていることから、 Channel が異なる処理との通信の役割をになっていることがわかります。

また、1つ目の実行では Channel のサイズが 0 となっています。 公式のマニュアルに Channel がいっぱいのとき put を行うと処理をブロックすると記載されいるとおり、take が実行されるまで foo 関数内の処理は中断し、呼び出し元に処理が戻っていることがわかります。 そして、take を実行した時点で foo 関数の put 以降の処理に戻っています。 一方、2つ目の実行では Channel のサイズを 2 としたため、foo 関数内の put で処理がブロックされずすべての foo 関数の処理が完了していることがわかります。

Channel の動作の理解から少し話がそれますが、Channel で処理する関数の定義は次のように Channel コンストラクターを利用して定義することもできます。 先ほどの foo 関数は次のとおりに定義できます。

c3 = Channel(csize=2) do ch
    println("[bar] will run \"put\"")
    put!(ch, "put")
    println("[bar] done \"put\"")
end
println("[main] done \"take\" channel c2, whose size is two")
println(take!(c3))
sleep(1)
close(c3)

出力結果は、2 つ目の実行と同じになります。

[bar] will run "put"
[bar] done "put"
[main] done "take" channel c2, whose size is two
put
Channel の利用例②:タスク間通信

続いては、Channel を使ったタスク間の通信のみを提供するケースを見てみます。 この例では、2 つのチャネルとチャネル間でデータの引き渡しをする関数を定義しています。 do_sleep 関数では、Channel jobs に値が入っているときに、ランダムな時間だけスリープし、jobs に入っていた値とスリープした時間を Channel resultsput するようなタスク間通信を行う処理です。 なお、公式マニュアルに説明があるとおり、Channel は for ループで繰り返し処理ができます。

jobs = Channel{Int}(32);
results = Channel{Tuple}(32);
 
function do_sleep()
    for job_id in jobs
       exec_time = rand()
       sleep(exec_time)                # simulates elapsed time doing actual work
                                       # typically performed externally.
       put!(results, (job_id, exec_time))
    end
end
 

Channel jobs に値を入れていきます。この場合では、jobs には 1-10 の値が入っている状態です。

n = 10
for i in 1:10
   put!(jobs, i)
end

do_sleep を 4 つのタスクとして非同期(バックグラウンド)で起動します。

for i in 1:4 # start 4 tasks to process requests in parallel
   @async do_sleep()
end

Channel results に何が入っているか見てみましょう。

@elapsed while n > 0 # print out results
   job_id, exec_time = take!(results)
   println("$job_id finished in $(round(exec_time; digits=2)) seconds")
   global n = n - 1
end
4 finished in 0.2 seconds
3 finished in 0.37 seconds
2 finished in 0.57 seconds
5 finished in 0.43 seconds
1 finished in 0.76 seconds
7 finished in 0.22 seconds
8 finished in 0.57 seconds
6 finished in 0.93 seconds
10 finished in 0.59 seconds
9 finished in 1.0 seconds

Channel results には、jobs に入っていた 1-10 の値が入っていることがわかります。 また、その実行順序が今回の重要なポイントです。 出力の順番は昇順に並んでいるわけではなく、順序が前後していることがわかります。 これは、4 つのタスクを実行させたため、各タスクが sleep 処理に入った時点で別のタスクに処理が切り替わりながら 4 つのタスクを並列で処理したということがわかります。 これを理解するために、はじめの 4 つの結果を見る分かりやすいです。 はじめの 4 つのタスク (job_id が 1-4 のもの) はスリープ時間が昇順に並んでいることがわかります。 これは 4 つタスクを実行したうち、早くスリープが終わった順に並んでいるだけです。 そのあとにスリープが終わったものから次のループを処理しているということです。

ただし、ここで注意が必要なのは、はじめに説明したとおり、タスク(コルーチン)は処理を切り替えているだけです。 すなわち、この並列処理が成り立つのは、ファイルの入出力やネットワークの受信待ちなど CPU を占有しない処理での待ち時間がある処理だけに有効です。

ここで do_sleep 関数の中身を CPU 処理を必要とする処理に変えた do_heavy_load という関数を定義します。

jobs2 = Channel{Int}(32);
results2 = Channel{Tuple}(32);
 
function do_heavy_load()
    for job_id in jobs2
        start = time()
        for i = 1:10
            rand(5000,5000) .* rand(5000,5000)
        end
        put!(results2, (job_id, time()))
    end
end
 
n = 10
for i in 1:10
   put!(jobs2, i)
end
 
for i in 1:n # start 4 tasks to process requests in parallel
   @async do_heavy_load()
end
 
@elapsed while n > 0 # print out results
   job_id, exec_time = take!(results2)
   println("$job_id finished in $(round(exec_time; digits=2)) seconds")
   global n = n - 1
end
1 finished in 1.59333751797e9 seconds
2 finished in 1.59333752041e9 seconds
3 finished in 1.59333752285e9 seconds
4 finished in 1.59333752528e9 seconds
5 finished in 1.59333752772e9 seconds
6 finished in 1.59333753015e9 seconds
7 finished in 1.59333753259e9 seconds
8 finished in 1.59333753503e9 seconds
9 finished in 1.59333753746e9 seconds
10 finished in 1.5933375399e9 seconds

これを実行した結果を見ると Channel results2 に入っている job_id が昇順に並んでいることがわかります。 do_heavy_load 関数では常に 1 つのタスクが CPU を占有するため、タスクの切り替えが行われません。 そのため、このような結果になります。

Channel の利用例③:マルチスレッド並列処理

Julia v1.3 からタスク処理でマルチスレッド並列処理ができるようになりました。 ここまでの説明ではタスクは処理の"切り替え"、つまり基本的※にはシングルスレッドで処理されるもの説明してきました。

※ Julia では逆行列計算など一部の処理はタスク等に関係なく、マルチスレッドで実行されるように設計されているものがありますので、そういったものは除きます。

ここから説明するタスクの使い方をすれば、並列で動かすタスクが異なるスレッドで実行されるため、同時に処理されます。

まず、マルチスレッド処理を行うためには、Julia が利用できるスレッド数を設定するところから必要です。 設定方法は、公式マニュアルの “Parallel Computing” のページに記載されています。 具体的には環境変数として JULIA_NUM_THREADS に Julia で利用できるスレッド数を入れて、Julia を起動します。 環境設定の設定方法は OS によって異なりますので、それぞれの設定方法は上記のページに記載されています。 なお、Jupyter Notebook を使っている場合には別途設定方法がありますので、その説明方法をこの記事の付録に記載しておきます。

それでは設定が完了したら下記を実行し、設定したスレッド数どおりに表示されるか確認します。

Threads.nthreads()
4

次に、マルチスレッドで処理させたい関数とタスクからデータを引き渡すための channel を定義します。channel には終了順序を確認するための job_id と終了時間を保存していきます。

using Dates
 
results3 = Channel{Tuple}(32)
 
function do_heavy_load2(job_id)
    for i = 1:10
        rand(5000,5000) .* rand(5000,5000)
    end
    put!(results3, (job_id, unix2datetime(time())))
end
do_heavy_load2 (generic function with 1 method)

do_heavy_load2 関数を動かすタスクを作る関数を定義します。

task_do_heavy_load(n) = Channel{String}(spawn=true) do ch
    put!(ch, do_heavy_load2(n))
end
task_do_heavy_load (generic function with 1 method)

なお、文法的には Channel コンストラクターを使ってタスクを作る assignment form 形式での関数を定義しています。

これを 10 回実行してみます。

for i = 1:10
    task_do_heavy_load(i)
end

結果を確認します。

for (job_id, end_time) in results3
    println("$job_id: $end_time")
    if !isready(results3)
        break
    end
end
1: 2020-06-28T09:46:03.428
2: 2020-06-28T09:46:03.769
3: 2020-06-28T09:46:03.77
4: 2020-06-28T09:46:03.772
5: 2020-06-28T09:46:07.026
10: 2020-06-28T09:46:07.027
6: 2020-06-28T09:46:07.191
7: 2020-06-28T09:46:07.191
9: 2020-06-28T09:46:09.956
8: 2020-06-28T09:46:10.085

多少の終了時間の差分はありますが 4 つずつ並列に動いていることがわかります。

付録:Jupyter Notebook でコマンドラインオプションや環境変数を設定する方法

Jupyter Notebook を使って Julia のコーディングをしている場合、Ijulia パッケージからコマンドラインオプションと環境変数の設定ができます。 具体的にはそれらの設定を行った新しい Julia カーネルを作ります。

次の例は、環境変数として JULIA_NUM_THREADS=4 とする “Julia (4 threads)” という名前の Julia カーネルを作ります。

using IJulia
installkernel("Julia (4 threads)", env=Dict("JULIA_NUM_THREADS"=>"4"))

ブラウザーから Jupyter Notebook を再読み込みし、メニューバーの “Kernel” -> “Change kernel” を見ると先ほど作った “Julia (4 threads)” があるはずです。 このカーネルに切り替えると 4 スレッド使えるカーネルに切り替わります。 同様にコマンドラインオプションを付けて実行したい場合(たとえば、プロジェクトの指定(--project)や、ワーカープロセス数の指定(--procs)など)、個別のカーネルを作って対応させることができます。 詳細は上記リンクをご参照ください。


今回は以上です。 最後まで読んでいただき、ありがとうございます。
関連記事



コメント

このブログの人気の投稿

ネットワーク越しの RTL-SDR で SDR# を使う方法

PythonでPinterestのPin (画像)の検索結果を取得する