Haskell
Up one levelHaskellのスレッドシステムとSTMについて その1
概要
HaskellのスレッドシステムとSTMについて調べたので、ここにまとめます。
Haskellのスレッドシステムは、予想よりも複雑でした。 Haskellの世界で閉じた処理ならば、比較的簡単なのですが、 FFI(Foreign Function Interface: 外部のCの関数を呼び出す仕組み)を使うと、 とたんに複雑になってしまいます。
一度に書くと、量が多いため、数回に分けて書こうと思っています。
注意
今回の調査は、環境に強く依存します。
- GHC 6.8.2
- Mac OS X 10.5.1 (32bit code)
の環境で調べた結果に基づいています。 特にGHC以外の処理系では、このような結果にならないかもしれません。 [*]
[*] | GHC拡張を使っているコードもあるので、それ以前に実行できないかもしれません。 |
Linuxや*BSDの環境には、それほどコードを変更せずに適用出来ると思いますが、 Windows環境には適用するのが難しい所もあります。 また、32bit/64bitの違いでもコードを変更する必要があるかもしれません。
Haskellのスレッドシステムについて
Haskellのスレッドシステムには、2つのスレッドが存在します。 forkIO と forkOS という2つの関数を用いて生成することができます。
forkIO :: IO () -> IO ThreadId forkOS :: IO () -> IO ThreadId
どちらの関数も、第一引数の IO () のアクションを評価するスレッドを生成し、スレッドIDを返します。 やっている事は同じなのですが、2つの関数では生成されるスレッドの性質が異なっています。 また、 forkOS は必ずしもサポートされているわけではありません [1] 。
スレッドの性質の違いについては、ややこしいので後回しにします。 しばらくは forkIO の方しか使いません。
Haskellの参照型について
forkIO の定義を見てもわかるように、 スレッドで実行されるアクションは値を返すことができません。 しかし、時には別スレッドで実行された結果を受け取りたいこともあります。 そんな時に使うのが、参照型です。
Haskellにはいくつか参照型があり、それぞれ異なる性質を持ちます。代表的なのは以下のものです。
IORef: | ただの参照型。一番高速だが、ロックが制限的。 |
---|---|
Mvar: | ロックの使える参照型。 |
Chan: | 同時に複数のスレッドから操作しても破綻しない FIFO キュー。 |
TVar: | STMという特殊なモナドの中でのみ変更できる参照型。ロックフリー。 |
IORef は、高速ですが、 atomicModifyIORef という関数でしか アトミック性を保証出来ません。 また、複数の変数を同時にロックすることができません。
続いて MVar です。これは、ロックのかけられる IORef と捉えることができます。 また、 Chan はこの MVar を用いて実装されているキューです。 複数のスレッドから操作しても壊れることはないように設計されています [2] 。
最後のが、今回中心的に取り上げる、 TVar です。 これは STM モナドの中でのみ操作することができます。 STMとは、"Software Transactional Memory"の略です。 Software Transactional Memoryとは、RDBのそれの様に、 メモリ変更が、完全に成功したか、何もしていないかの二択の状態になるメモリ操作のことです。 一部だけ変更されるという、中途半端な状態を取らないようになります。 また、ハードウエアで実装されたものは、"Hardware Transactional Memory"と呼ばれます。
さっそく動かしてみる
ソースコード
さっそく、スレッドを動かしてみます。
import Control.Concurrent import Control.Monad (forever) -- threadDelay in milliseconds. -- `delay 1000' means that this thread sleeps 1 sec at least. delay :: Int -> IO () delay = threadDelay . (* 1000) -- Do the first argument action every `time' ms infinitely. every :: Int -> IO a -> IO () every time io = forever $ delay time >> io main :: IO () main = do forkIO $ every 1000 $ putStrLn "a" forkIO $ every 500 $ putStrLn "b" return ()
2つのユーティリティー関数とmain関数から成るプログラムです。
delay: | 標準関数の threadDelay はマイクロ秒を引数に取ります。 桁が増えて扱いがよくないので、ミリ秒を引数に取る関数を作ります。 |
---|---|
every: | time ミリ秒ごとに io を実行する関数です。永遠に止りません。 |
main 関数では、2つのスレッドを起動し、 片方のスレッドでは1秒間隔で"a"を出力します。 もう一方のスレッドでは0.5秒間隔で"b"を出力します。
このコードでは、2つのスレッドが同時に putStrLn を実行すると、 表示がおかしくなる場合がありますが、 このコードの目標が「とりあえず動かす」なので無視しています。
実行結果
$ ./mt $
生成されたファイルを実行しましたが、すぐに終了していまいました。 Haskellでは、生成したスレッドの終了を待つ他の言語で言う join という仕組みがありません。 今回のサンプルでは、一切スレッドの終了を待っていないため、 メインスレッドが終了すると、プログラム全体が終了していまいます。
他のスレッドを待つ方法については、また次回…。
- Category(s)
- Haskell
- The URL to Trackback this entry is:
- http://dev.ariel-networks.com/Members/mizyo/haskell306e30b930ec30c330b730b930c630e03068stml306b306430443066-305d306e1/tbping
HaskellのスレッドシステムとSTMについて その2
スレッドの終了を検出する
前回の続きで、スレッドの終了を検出する方法です。
いくつか実装方法があると思いますが、今回は 子スレッドにカウンターを渡しておき、終了する時にそのカウンターを1増やします。 親スレッドはそのカウンターを監視し、それが自分が作った子の数になったら 全ての子スレッドが終了したとみなすことにします。
でもその前に
これを実装するためのカウンターは、複数のスレッドで共有される変数である必要があります。 また、複数のスレッドから同時に更新しても壊れない変数である必要があります。 MVar を使ってもカウンターは実装出来ますが、今回は TVar を使います。
STM
STM モナド及び TVar の定義は Control.Concurrent.STM モジュールにあります。 使う前にそのモジュールを import する必要があります。
newTVar :: a -> STM (TVar a) readTVar :: TVar a -> STM a writeTVar :: TVar a -> a -> STM () atomically :: STM a -> IO a
TVar は STM モナド環境の中でのみ、データを操作することができます。 そして、 atomically 関数を使うと、STMアクションを原子性を持って実行することができます。 イメージとしては、STMモナドで貯めた動作を、一気に実行するイメージです。 readTVar, writeTVar は名前の通り、 TVar を読み書きする関数です。 そして、 newTVar で新しいのを作ります。
肝心の atomically の実装方法なのですが、ソース [*] を読んでもよくわかりませんでした。 ここから先の話は間違っている可能性が非常に高いため、信用しないでください [†] 。 コミットする時に他のスレッドによる変更を検出しているようです。
- コミットする時に readTVar した値が変更されていない
- コミットする時に writeTVar する変数が他のスレッドによって変更されていない
この2点が満されると、 atomically が成功するようです。 失敗した場合は、最初から STM アクションをやりなおしているのだと思われます [‡] 。
[*] | rts/PrimOps.cmm, rts/STM.c に実装があります。 |
[†] | 他の事柄についても、正しいかは無保証ですが。 |
[‡] | 間違ってたら教えてください。 |
合成性とロックフリー
この様な変更手段を取るため、 TVar はロックが必要ありません。 他のスレッドと衝突しても、STMの実装が良い様に処理をしてくれます。 ロックがいらないため、デッドロックと呼ばれる状態にはなりません。
また、 STM はモナドであるため、処理を合成することができます。 合成してもデッドロックが発生することはありません。
retry
STM モナドには retry という関数があります。 [1]
retry を呼ぶと、そのSTMモナドの処理全体を再実行することができます。 ただ、何の工夫もなく再実行するだけでは CPU の無駄遣いです。 ドキュメントにも書かれていますが、 retry されるまでに読んだ( readTVar した) 変数が変更されるまで、スレッドの実行が止まります。
この作用により、変数が1になるまで待つといった動作や、リストが空でなくなるまで待つといった動作を書く事ができます。
また、 retry を検出するために使う orElse 関数というのもあります。
カウンターを作る
ソースコード
STM モナドについて説明がおわった所で、カウンターを作りたいと思います。
- 子スレッドはカウンターを親から貰い、処理が終ったら +1 する。
- 親スレッドはカウンターが一定の値になるまで待つ。
という動作にします。
import Control.Concurrent import Control.Concurrent.STM import Control.Monad (unless, forever) newtype Counter = Counter (TVar Int) modifyTVar :: (a -> a) -> TVar a -> STM () modifyTVar f tv = readTVar tv >>= writeTVar tv . f -- create a new counter and set its value 0. newCounter :: IO Counter newCounter = do tv <- newTVarIO 0 return $ Counter tv -- Increments the counter value. incCounter :: Counter -> IO () incCounter (Counter c) = atomically $ modifyTVar (+1) c -- Wait until the counter value becomes `n' waitCounter :: Counter -> Int -> IO () waitCounter (Counter c) n = atomically $ do v <- readTVar c unless (v == n) retry -- forkIO with Counter. fork :: Counter -> IO () -> IO ThreadId fork counter act = forkIO $ act >> incCounter counter -- threadDelay in milliseconds. -- `delay 1000' means that this thread sleeps 1 sec at least. delay :: Int -> IO () delay = threadDelay . (* 1000) -- Do the first argument action every `time' ms infinitely. every :: Int -> IO a -> IO () every time io = forever $ delay time >> io main :: IO () main = do c <- newCounter fork c $ every 1000 $ putStrLn "a" fork c $ every 500 $ putStrLn "b" waitCounter c 2
その1から処理自体はあまり変化していません。 every , dalay の内容も同じです。
modifyTVar は TVar を変更するユーティリティーです。 よく使うと思うのですが、何故か標準モジュールにありません。
waitCounter の中で、例の retry を使っています。 カウンターの値が n でなければ処理をやりなおします。 ですが retry の特性のため、カウンターの値が変化するまで スレッドが起きることはありません。 これにより、無駄なループが起らないようになっています。
fork もユーティリティー関数です。 受け取ったアクションの後に incCounter するように合成してから forkIO をします。 ただし、このコードには問題があります。 エラーが起った時にカウンターが増えません。 サンプルなので省略していますが、エラー処理を入れないと危険です。
書いていて気付いたのですが、このサンプルだと、スレッドが永遠と終了しないので、 waitCounterの実感が沸きにくいですね…すみません。
- Category(s)
- Haskell
- The URL to Trackback this entry is:
- http://dev.ariel-networks.com/Members/mizyo/haskell306e30b930ec30c330b730b930c630e03068stm306b306430443066-305d306e2/tbping
HaskellのスレッドシステムとSTMについて その3
STMとロックについて
だいぶん間が開いてしまいましたが、その3です。
STMモナドによる操作は、ロックフリーです。 synchronize だとか lock だとか unlock だとかいうキーワードを書く必要はありません。 しかし、それは Haskell のコードからの視点のようです。 内部的にはロックを使っています。 そこで、ロックを実感するために、このようなコードを実行してみます。 import宣言と、ここにない関数はその1、その2から持ってきてください。
fib :: Int -> Int fib 0 = 0 fib 1 = 1 fib n = fib (n - 1) + fib (n - 2) threadA :: TVar Int -> IO () threadA v = do n <- atomically $ readTVar v print n atomically $ writeTVar v (n + 1) threadB :: TVar Int -> IO () threadB v = atomically $ writeTVar v $ fib 40 main :: IO () main = do c <- newCounter v <- newTVarIO 0 fork c $ every 500 $ threadA v fork c $ threadB v waitCounter c 2
fib は有名なフィボナッチ数列です。 時間がかからないと意味がないので、わざと遅い実装にしています。 threadA は fib 40 を計算して、2つのスレッドで共有している変数 v に結果を代入しています。 threadB は v の値を出力して v を1増やしています。
このコードを実行すると、
0 1 2 (中略) 102334155 102334156 102334157
このような結果が期待されます。 しかし、このような結果にはなりません。 実際には、
102334155 102334156 102334157 102334158 102334159
のようになります。0, 1, 2 ...の値は出力されません。
atomicallyとロック
atomically は、内部的にTVarをロックしているようです。 そのロックの影響で、前章でのサンプルコードは期待されるような結果になりません。 atomically はトランザクションをコミットする時に関係するTVarを全てロックします。 ロックされているTVarは他のスレッドから参照することも、書き込むこともできません。
また、Haskellは遅延評価をする言語です。 atomically がTVarのロックを取得した後に、 v に書き込むための値を計算します。 ( readTVar や writeTVar をする時にロックしているわけではありません。) しかし、 fib 40 の計算には非常に時間がかかります。 TVarをロックした後に計算しているため、``v`` を読もうとしている threadA も長時間ブロックされてしまいます。
正格評価をしてみる
threadB の内容を変更し $ の代りに正格性のある $! を使ってみます。
threadB :: TVar Int -> IO () threadB v = atomically $ writeTVar v $! fib 40
長いので変化のある所だけにしますが、結果はこのようになります。
18 19 20 102334155 102334156 102334157
今回は期待通りに動いています。 writeTVar する時に fib 40 の計算をすましているので、 atomically の内部で計算待ちでロックしなくなります。
しかし、手元の環境では -O と最適化フラグを付けてコンパイルすると、 遅延評価版と同じ結果になってしまいます。 インライン展開が悪さをしているようです。
{-# NOINLINE fib #-} fib :: Int -> Int fib 0 = 0 fib 1 = 1 fib n = fib (n - 1) + fib (n - 2)
このように、 NOINLINE プラグマを付けると、最適化を行っても大丈夫でした。
でも…
atomically の中で正格評価を行うのは時に問題となります。 プログラムをこのように書き換えてみます。
{-# NOINLINE fib #-} fib :: Int -> Int fib n = trace ("fib " ++ show n) $ fib' n where fib' 0 = 0 fib' 1 = 1 fib' n = fib' (n - 1) + fib' (n - 2) threadA :: TVar Int -> IO () threadA v = do x <- atomically modify print x where modify = do x <- readTVar v writeTVar v $ x + 1 return x threadB :: TVar Int -> IO () threadB v = atomically $ do x <- readTVar v writeTVar v $! fib x main :: IO () main = do c <- newCounter v <- newTVarIO 35 fork c $ every 50 $ threadA v fork c $ threadB v waitCounter c 2
まず、変更内容についてですが、 fib が実行された時に、引数の n の値をトレースするようにします。 threadA は次々と v の内容を変化させ、 threadB は v の内容を使って fib を計算しています。 main の内容は変化していませんが、 threadA の実行間隔が短い方がわかりやすいので、変更しています。
このプログラムを実行すると、このようになります。 fib が何度も呼び出され、いつまでたっても計算が終りません。
fib 35 35 fib 36 36 fib 37 37 fib 38 38 fib 39 39 fib 40 40
threadB が fib を計算中に threadA によって v の内容が書き換えられるため、 threadB の atomically が何度も再実行されます。 遅延評価( $ を使う)コードにしておくと、 v のロックを得てから fib を計算するため、 何回も実行されることはありません。(そのかわり threadA の実行は止まりますが。)
外で計算する
threadB の内容を書き換えて、 atomically の外で fib を計算するようにすると、 fib が何度も計算されることはなく、 threadA の実行も止まりません。 (ここで、 $! を使って正格評価にしないと、意味がありません。)
threadB :: TVar Int -> IO () threadB v = do x <- atomically $ readTVar v y <- return $! fib x atomically $ writeTVar v y
ロックフリーと言えども
ロックフリーと言えども、 STM を使う上ではロックに気を付ける必要がありそうです。 ロックの開放忘れを気にしなくてもよかったり、 ロックの順番を気にしなくてもよかったり、STMモナドが合成できたり、いいこともありますが、 ややこしいバグを発生させる可能性もありそうです。
その4
その4に続くかもしれません。
- Category(s)
- Haskell
- The URL to Trackback this entry is:
- http://dev.ariel-networks.com/Members/mizyo/haskell306e30b930ec30c330b730b930c630e03068stm306b306430443066-305d306e3/tbping
Re:HaskellのスレッドシステムとSTMについて その3
最初の例ですが、これはTVarをロックした後にfib 40を計算している訳ではありません。
まず、threadBのatomicallyは即座に終了し、vには未評価のfib 40が入った状態になります。
500ms後、これをthreadAが取り出し、printで表示する段になって、初めてfib 40が計算されるので、このような結果になります。
print nの直前に、putStrLnなどでメッセージを表示させてみると、どこに時間が掛かっているか分かりやすいと思います。
Re:HaskellのスレッドシステムとSTMについて その3
そして6ヶ月も放置してすみません。
なるほど。そういう処理をしているのですね。
TVarに入れられる値は計算されてから入れられるだろう という思い込みをしていました。