Control.Concurrent
Prelude> import Control.Concurrent
Prelude Control.Concurrent>
Module Control.Concurrent thuộc thư viện chuẩn của Haskell, không cần cài đặt thêm.
forkIO threadDelay
data ThreadIdKiểu dữ liệu đại diện cho handle của thread.forkIO :: IO () -> IO ThreadIdTạo một thread mới và trả về ThreadId, thực hiện thao tác IO trong thread mới.threadDelay :: Int -> IO ()Tạm dừng thread hiện tại trong n micro giây (một phần triệu giây).
Ví dụ 1
haskell-concurrency-demo
module ThreadsDemo where
import Control.Concurrent (forkIO, threadDelay)
import Data.Foldable (for_)
main = do
-- Thực hiện công việc đồng bộ.
printMessagesFrom "chính"
-- Tạo thread mới để thực hiện công việc nền.
forkIO $ printMessagesFrom "phụ"
-- Tạo thread khác với hàm inline!
forkIO $ do
putStrLn "bắt đầu!"
sleepMs 5
putStrLn "kết thúc!"
-- Chờ các thread hoàn thành.
sleepMs 10
-- Hàm đơn giản in ra ba thông điệp với độ trễ nhỏ giữa chúng.
printMessagesFrom name = for_ [1..3] printMessage
where printMessage i = do
sleepMs 1
putStrLn (name ++ " số " ++ show i)
-- Hàm tiện ích - threadDelay nhận micro giây, hơi bất tiện.
sleepMs n = threadDelay (n * 1000)
*ThreadsDemo> main
chính số 1
chính số 2
chính số 3
bắt đầu!
phụ số 1
phụ số 2
phụ số 3
kết thúc!
forkIO $ printMessagesFrom "phụ"forkIO $ do putStrLn "bắt đầu!"; sleepMs 5; putStrLn "kết thúc!"Sử dụng forkIO để khởi tạo hai thread: thread in số và thread in bắt đầu-kết thúc Quá trình in bắt đầu-kết thúc: in bắt đầu, đợi 5 mili giây, sau đó in kết thúc.printMessagesFrom name = for_ [1..3] printMessageHàm in số sẽ in ra ba sốprintMessage i = do sleepMs 1; putStrLn (name ++ " số " ++ show i)Trước khi in mỗi số, đợi 1 mili giây- Kết quả cuối cùng sẽ là:
bắt đầu! -- 0 mili giây
phụ số 1 -- 1 mili giây
phụ số 2 -- 2 mili giây
phụ số 3 -- 3 mili giây
kết thúc! -- 5 mili giây
STM
$ cabal install stm
stm-2.4.5.0 installed
STM (Software Transactional Memory - Bộ nhớ giao dịch phần mềm) là một cơ chế được triển khai bằng phần mềm để điều khiển đồng bộ giữa các thread. Trong ngôn ngữ Haskell, STM là một Monad.
- atomically :: STM a -> IO a Thực hiện STM Action như một thao tác nguyên tử trong IO Monad.
TMVar
data TMVar aTMVar là biến đồng bộ, được sử dụng để đồng bộ giữa các thread. TMVar có thể được hình dung như một hộp chỉ chứa được một vật phẩm, có thể ở trạng thái đầy hoặc rỗng.newTMVar :: a -> STM (TMVar a)Tạo biến TMVar với trạng thái ban đầu là đầy, chứa giá trị được chỉ định.newEmptyTMVar :: STM (TMVar a)Tạo biến TMVar với trạng thái ban đầu là rỗng.takeTMVar :: TMVar a -> STM aLấy giá trị từ biến TMVar, trạng thái chuyển từ đầy sang rỗng. Nếu biến TMVar đang rỗng, thread sẽ chặn (block) và liên tục thử lại.putTMVar :: TMVar a -> a -> STM ()Đặt giá trị được chỉ định vào biến TMVar, trạng thái chuyển từ rỗng sang đầy. Nếu biến TMVar đã đầy, thread sẽ chặn (block) và liên tục thử lại.readTMVar :: TMVar a -> STM aKết hợp của takeTMVar và putTMVar, lấy giá trị từ biến TMVar, sau đó đặt lại giá trị đó, và cuối cùng trả về giá trị đó.
Ví dụ 2
module SharedTMVar where
import DemoUtils (sleepMs)
import Control.Concurrent.STM (atomically)
import Control.Concurrent (forkIO)
import Control.Concurrent.STM.TMVar (newEmptyTMVar, takeTMVar, putTMVar)
main = do
ketqua <- atomically $ newEmptyTMVar
forkIO $ do
Giả sử có một công việc thực tế cần thực hiện.
sleepMs 5
putStrLn "Đã tính toán kết quả!"
atomically $ putTMVar ketqua 42
putStrLn "Đang chờ..."
gia_tri <- atomically $ takeTMVar ketqua
putStrLn ("Kết quả là: " ++ show gia_tri)
*SharedTMVar> main
Đang chờ...
Đã tính toán kết quả!
Kết quả là: 42
ketqua <- atomically newEmptyTMVarThread chính tạo biến TMVar rỗng ketqua.gia_tri <- atomically $ takeTMVar ketquaThread chính liên tục thử lấy giá trị từ biến TMVar ketqua.forkIO (do sleepMs 5; putStrLn "Đã tính toán kết quả!"; atomically $ putTMVar ketqua 42)forkIO khởi tạo một thread mới, đầu tiên tạm dừng 5 mili giây, sau đó đặt giá trị 42 vào biến TMVar ketqua.
Ví dụ 3
module CounterTMVar where
import DemoUtils (sleepMs)
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMVar (newTMVar, takeTMVar, putTMVar)
import Control.Monad (replicateM)
main = do
dem <- atomically $ newTMVar 0
let tang = do
so_hien_tai <- atomically $ takeTMVar dem
atomically $ putTMVar dem $! so_hien_tai + 1
tang_dem = do
replicateM 1000 tang
return ()
threads <- replicateM 5 (forkIO tang_dem)
sleepMs 100
so_cuoi <- atomically $ takeTMVar dem
print so_cuoi
*CounterTMVar> main
5000
dem <- atomically $ newTMVar 0Thread chính tạo biến TMVar dem với giá trị ban đầu là 0.tang = do so_hien_tai <- atomically $ takeTMVar dem; atomically $ putTMVar dem $! so_hien_tai + 1Hàm tang tăng giá trị trong biến TMVar dem lên 1 (lấy ra, tăng 1, đặt lại).tang_dem = do replicateM 1000 tang; return ()Hàm tang_dem thực hiện 1000 lần hàm tang, tức 1000 lần "lấy ra, tăng 1, đặt lại".replicateM 5 (forkIO tang_dem)Khởi tạo 5 thread, mỗi thread thực hiện một lần hàm tang_dem, tổng cộng 5000 lần "lấy ra, tăng 1, đặt lại".sleepMs 100; so_cuoi <- atomically $ takeTMVar demThread chính đợi 0.1 giây rồi thử lấy giá trị- 5000 Kết quả đúng, không có xung đột dữ liệu (data races) giữa 5 thread.
TChan
data TChan aTChan là loại channel không giới hạn, hoạt động theo cơ chế先进先出 (FIFO).newTChan :: STM (TChan a)Tạo một TChan mới.readTChan :: TChan a -> STM aĐọc giá trị tiếp theo từ TChan.writeTChan :: TChan a -> a -> STM ()Ghi một giá trị vào TChan.dupTChan :: TChan a -> STM (TChan a)Sao chép một TChan, sau khi sao chép, bất kỳ dữ liệu nào được ghi vào TChan gốc cũng sẽ được sao chép vào bản sao và ngược lại.
Ví dụ 4
module ChannelDemo1 where
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan (newTChan, writeTChan, readTChan)
main = do
messages <- atomically newTChan
atomically $ writeTChan messages "không giới hạn"
atomically $ writeTChan messages "kênh"
-- Đọc một tin nhắn từ kênh, sau đó in ra.
msg <- atomically $ readTChan messages
putStrLn msg
-- Làm lại điều tương tự nhưng ngắn gọn hơn.
putStrLn =<< (atomically $ readTChan messages)
*ChannelDemo1> main
không giới hạn
kênh
Chương trình tạo một TChan, ghi hai chuỗi vào đó, sau đó đọc ra.
Ví dụ 5
module ProducerConsumer where
import Control.Monad.STM
import Control.Concurrent
import Control.Concurrent.STM.TChan
motGiay = 1000000
producerThread :: TChan Int -> IO ()
producerThread chan = do
atomically $ writeTChan chan 1
threadDelay motGiay
atomically $ writeTChan chan 2
threadDelay motGiay
atomically $ writeTChan chan 3
threadDelay motGiay
consumerThread :: TChan Int -> IO ()
consumerThread chan = do
newInt <- atomically $ readTChan chan
putStrLn $ "đọc giá trị mới: " ++ show newInt
consumerThread chan
main = do
chan <- atomically $ newTChan
forkIO $ consumerThread chan
forkIO $ producerThread chan
threadDelay $ 5 * motGiay
*ProducerConsumer> main
đọc giá trị mới: 1
đọc giá trị mới: 2
đọc giá trị mới: 3
- Thread producer每隔一秒向 chan 写入一个数。
- Thread consumer不断地尝试从 chan 里读出数并打印。
Ví dụ 6
module DuplicateChannel where
import DemoUtils (sleepMs)
import Control.Concurrent.STM
import Control.Concurrent (forkIO)
import Control.Concurrent.STM.TChan (newTChan, writeTChan, readTChan, dupTChan)
testNonDuplicated = do
messages <- atomically newTChan
forkIO $ messageReader messages "Đọc 1"
forkIO $ messageReader messages "Đọc 2"
atomically $ writeTChan messages "Xin chào!"
messageReader channel name = do
msg <- atomically $ readTChan channel
putStrLn (name ++ " đã đọc: " ++ msg)
testDuplicated = do
broadcast <- atomically newTChan
forkIO $ broadcastReader broadcast "Nghe 1"
forkIO $ broadcastReader broadcast "Nghe 2"
sleepMs 1
atomically $ writeTChan broadcast "Tạm biệt!"
broadcastReader channel name = do
channel' <- atomically $ dupTChan channel
messageReader channel' name
main = do
testNonDuplicated
testDuplicated
sleepMs 10
*DuplicateChannel> main
Đọc 1 đã đọc: Xin chào!
Nghe 1 đã đọc: Tạm biệt!
Nghe 2 đã đọc: Tạm biệt!
- Đối với một kênh A, thông tin ghi vào A chỉ có thể đọc ra một lần qua A.
- Khi hàm dupTChan được dùng để sao chép kênh A thành kênh B, thông tin ghi vào A có thể được đọc lại một lần qua B.
TVar
- data TVar a Biến bộ nhớ có thể chia sẻ, hỗ trợ STM.
- newTVar :: a -> STM (TVar a) Tạo một TVar mới với giá trị a.
- readTVar :: TVar a -> STM a Đọc giá trị của TVar.
- writeTVar :: TVar a -> a -> STM () Ghi giá trị vào TVar.
- modifyTVar :: TVar a -> (a -> a) -> STM () Sửa đổi giá trị của TVar.
Ví dụ 7
module CounterTVar where
import DemoUtils (sleepMs)
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar (newTVar, readTVar, writeTVar)
import Control.Monad (replicateM)
main = do
dem <- atomically $ newTVar 0
let tang = atomically $ do
so_hien_tai <- readTVar dem
writeTVar dem $! so_hien_tai + 1
tang_dem = do
replicateM 1000 tang
return ()
threads <- replicateM 5 (forkIO tang_dem)
sleepMs 100
so_cuoi <- atomically $ readTVar dem
print so_cuoi
*CounterTVar> main
5000
tang = atomically $ do so_hien_tai <- readTVar dem; writeTVar dem $! so_hien_tai + 1Lưu ý ở đây việc đọc và ghi phải kết hợp thành một thao tác nguyên tử, nếu thay đổi thànhtang = do so_hien_tai <- atomically $ readTVar dem; atomically $ writeTVar dem $! so_hien_tai + 1thì giữa các thread sẽ xảy ra xung đột dữ liệu, tức là tất cả thread có thể đều đọc cùng một giá trị trước khi ghi lại Kết quả cuối cùng có thể là 5000, cũng có thể là 1000.
TMVar và TVar
newtype TMVar a = TMVar (TVar (Maybe a))
Theo cách triển khai hiện tại, TMVar được đóng gói từ TVar (Maybe a), trạng thái rỗng hay đầy được thực hiện thông qua kiểu Maybe.
TMVar và MVar
MVar là biến đồng bộ được thực hiện trong IO Monad của thư viện chuẩn, không còn được khuyến khích sử dụng. Còn TMVar là biến đồng bộ trong STM Monad, mạnh mẽ và đáng tin cậy hơn.
TArray
Loại mảng hỗ trợ STM và giao diện MArray.