HaskellのスレッドシステムとSTMについて その2
スレッドの終了を検出する
前回の続きで、スレッドの終了を検出する方法です。
いくつか実装方法があると思いますが、今回は 子スレッドにカウンターを渡しておき、終了する時にそのカウンターを1増やします。 親スレッドはそのカウンターを監視し、それが自分が作った子の数になったら 全ての子スレッドが終了したとみなすことにします。
でもその前に
これを実装するためのカウンターは、複数のスレッドで共有される変数である必要があります。 また、複数のスレッドから同時に更新しても壊れない変数である必要があります。 MVar を使ってもカウンターは実装出来ますが、今回は TVar を使います。
STM
STM モナド及び TVar の定義は Control.Concurrent.STM モジュールにあります。 使う前にそのモジュールを import する必要があります。
newTVar :: a -> STM (TVar a) readTVar :: TVar a -> STM a writeTVar :: TVar a -> a -> STM () atomically :: STM a -> IO a
TVar は STM モナド環境の中でのみ、データを操作することができます。 そして、 atomically 関数を使うと、STMアクションを原子性を持って実行することができます。 イメージとしては、STMモナドで貯めた動作を、一気に実行するイメージです。 readTVar, writeTVar は名前の通り、 TVar を読み書きする関数です。 そして、 newTVar で新しいのを作ります。
肝心の atomically の実装方法なのですが、ソース [*] を読んでもよくわかりませんでした。 ここから先の話は間違っている可能性が非常に高いため、信用しないでください [†] 。 コミットする時に他のスレッドによる変更を検出しているようです。
- コミットする時に readTVar した値が変更されていない
- コミットする時に writeTVar する変数が他のスレッドによって変更されていない
この2点が満されると、 atomically が成功するようです。 失敗した場合は、最初から STM アクションをやりなおしているのだと思われます [‡] 。
[*] | rts/PrimOps.cmm, rts/STM.c に実装があります。 |
[†] | 他の事柄についても、正しいかは無保証ですが。 |
[‡] | 間違ってたら教えてください。 |
合成性とロックフリー
この様な変更手段を取るため、 TVar はロックが必要ありません。 他のスレッドと衝突しても、STMの実装が良い様に処理をしてくれます。 ロックがいらないため、デッドロックと呼ばれる状態にはなりません。
また、 STM はモナドであるため、処理を合成することができます。 合成してもデッドロックが発生することはありません。
retry
STM モナドには retry という関数があります。 [1]
retry を呼ぶと、そのSTMモナドの処理全体を再実行することができます。 ただ、何の工夫もなく再実行するだけでは CPU の無駄遣いです。 ドキュメントにも書かれていますが、 retry されるまでに読んだ( readTVar した) 変数が変更されるまで、スレッドの実行が止まります。
この作用により、変数が1になるまで待つといった動作や、リストが空でなくなるまで待つといった動作を書く事ができます。
また、 retry を検出するために使う orElse 関数というのもあります。
カウンターを作る
ソースコード
STM モナドについて説明がおわった所で、カウンターを作りたいと思います。
- 子スレッドはカウンターを親から貰い、処理が終ったら +1 する。
- 親スレッドはカウンターが一定の値になるまで待つ。
という動作にします。
import Control.Concurrent import Control.Concurrent.STM import Control.Monad (unless, forever) newtype Counter = Counter (TVar Int) modifyTVar :: (a -> a) -> TVar a -> STM () modifyTVar f tv = readTVar tv >>= writeTVar tv . f -- create a new counter and set its value 0. newCounter :: IO Counter newCounter = do tv <- newTVarIO 0 return $ Counter tv -- Increments the counter value. incCounter :: Counter -> IO () incCounter (Counter c) = atomically $ modifyTVar (+1) c -- Wait until the counter value becomes `n' waitCounter :: Counter -> Int -> IO () waitCounter (Counter c) n = atomically $ do v <- readTVar c unless (v == n) retry -- forkIO with Counter. fork :: Counter -> IO () -> IO ThreadId fork counter act = forkIO $ act >> incCounter counter -- threadDelay in milliseconds. -- `delay 1000' means that this thread sleeps 1 sec at least. delay :: Int -> IO () delay = threadDelay . (* 1000) -- Do the first argument action every `time' ms infinitely. every :: Int -> IO a -> IO () every time io = forever $ delay time >> io main :: IO () main = do c <- newCounter fork c $ every 1000 $ putStrLn "a" fork c $ every 500 $ putStrLn "b" waitCounter c 2
その1から処理自体はあまり変化していません。 every , dalay の内容も同じです。
modifyTVar は TVar を変更するユーティリティーです。 よく使うと思うのですが、何故か標準モジュールにありません。
waitCounter の中で、例の retry を使っています。 カウンターの値が n でなければ処理をやりなおします。 ですが retry の特性のため、カウンターの値が変化するまで スレッドが起きることはありません。 これにより、無駄なループが起らないようになっています。
fork もユーティリティー関数です。 受け取ったアクションの後に incCounter するように合成してから forkIO をします。 ただし、このコードには問題があります。 エラーが起った時にカウンターが増えません。 サンプルなので省略していますが、エラー処理を入れないと危険です。
書いていて気付いたのですが、このサンプルだと、スレッドが永遠と終了しないので、 waitCounterの実感が沸きにくいですね…すみません。
- Category(s)
- Haskell
- The URL to Trackback this entry is:
- http://dev.ariel-networks.com/Members/mizyo/haskell306e30b930ec30c330b730b930c630e03068stm306b306430443066-305d306e2/tbping