Học Lập Trình Haskell: Lập Trình Đồng Thời Với STM

Control.Concurrent

Prelude> import Control.Concurrent
Prelude Control.Concurrent> 

Module Control.Concurrent thuộc thư viện chuẩn của Haskell, không cần cài đặt thêm.

forkIO threadDelay

  • data ThreadId Kiểu dữ liệu đại diện cho handle của thread.
  • forkIO :: IO () -> IO ThreadId Tạo một thread mới và trả về ThreadId, thực hiện thao tác IO trong thread mới.
  • threadDelay :: Int -> IO () Tạm dừng thread hiện tại trong n micro giây (một phần triệu giây).

Ví dụ 1

haskell-concurrency-demo

module ThreadsDemo where

import Control.Concurrent (forkIO, threadDelay)
import Data.Foldable (for_)

main = do
    -- Thực hiện công việc đồng bộ.
    printMessagesFrom "chính"

    -- Tạo thread mới để thực hiện công việc nền.
    forkIO $ printMessagesFrom "phụ"

    -- Tạo thread khác với hàm inline!
    forkIO $ do
        putStrLn "bắt đầu!"
        sleepMs 5
        putStrLn "kết thúc!"

    -- Chờ các thread hoàn thành.
    sleepMs 10

-- Hàm đơn giản in ra ba thông điệp với độ trễ nhỏ giữa chúng.
printMessagesFrom name = for_ [1..3] printMessage
    where printMessage i = do
            sleepMs 1
            putStrLn (name ++ " số " ++ show i)

-- Hàm tiện ích - threadDelay nhận micro giây, hơi bất tiện.
sleepMs n = threadDelay (n * 1000)

*ThreadsDemo> main
chính số 1
chính số 2
chính số 3
bắt đầu!
phụ số 1
phụ số 2
phụ số 3
kết thúc!

  • forkIO $ printMessagesFrom "phụ" forkIO $ do putStrLn "bắt đầu!"; sleepMs 5; putStrLn "kết thúc!" Sử dụng forkIO để khởi tạo hai thread: thread in số và thread in bắt đầu-kết thúc Quá trình in bắt đầu-kết thúc: in bắt đầu, đợi 5 mili giây, sau đó in kết thúc.
  • printMessagesFrom name = for_ [1..3] printMessage Hàm in số sẽ in ra ba số
  • printMessage i = do sleepMs 1; putStrLn (name ++ " số " ++ show i) Trước khi in mỗi số, đợi 1 mili giây
  • Kết quả cuối cùng sẽ là:
bắt đầu!          -- 0 mili giây
phụ số 1          -- 1 mili giây
phụ số 2          -- 2 mili giây
phụ số 3          -- 3 mili giây
kết thúc!         -- 5 mili giây

STM

$ cabal install stm
stm-2.4.5.0 installed

STM (Software Transactional Memory - Bộ nhớ giao dịch phần mềm) là một cơ chế được triển khai bằng phần mềm để điều khiển đồng bộ giữa các thread. Trong ngôn ngữ Haskell, STM là một Monad.

  • atomically :: STM a -> IO a Thực hiện STM Action như một thao tác nguyên tử trong IO Monad.

TMVar

  • data TMVar a TMVar là biến đồng bộ, được sử dụng để đồng bộ giữa các thread. TMVar có thể được hình dung như một hộp chỉ chứa được một vật phẩm, có thể ở trạng thái đầy hoặc rỗng.
  • newTMVar :: a -> STM (TMVar a) Tạo biến TMVar với trạng thái ban đầu là đầy, chứa giá trị được chỉ định.
  • newEmptyTMVar :: STM (TMVar a) Tạo biến TMVar với trạng thái ban đầu là rỗng.
  • takeTMVar :: TMVar a -> STM a Lấy giá trị từ biến TMVar, trạng thái chuyển từ đầy sang rỗng. Nếu biến TMVar đang rỗng, thread sẽ chặn (block) và liên tục thử lại.
  • putTMVar :: TMVar a -> a -> STM () Đặt giá trị được chỉ định vào biến TMVar, trạng thái chuyển từ rỗng sang đầy. Nếu biến TMVar đã đầy, thread sẽ chặn (block) và liên tục thử lại.
  • readTMVar :: TMVar a -> STM a Kết hợp của takeTMVar và putTMVar, lấy giá trị từ biến TMVar, sau đó đặt lại giá trị đó, và cuối cùng trả về giá trị đó.

Ví dụ 2

module SharedTMVar where

import DemoUtils (sleepMs)

import Control.Concurrent.STM (atomically)
import Control.Concurrent (forkIO)
import Control.Concurrent.STM.TMVar (newEmptyTMVar, takeTMVar, putTMVar)

main = do
    ketqua <- atomically $ newEmptyTMVar

    forkIO $ do
        Giả sử có một công việc thực tế cần thực hiện.
        sleepMs 5
        putStrLn "Đã tính toán kết quả!"
        atomically $ putTMVar ketqua 42

    putStrLn "Đang chờ..."
    gia_tri <- atomically $ takeTMVar ketqua
    putStrLn ("Kết quả là: " ++ show gia_tri)

*SharedTMVar> main
Đang chờ...
Đã tính toán kết quả!
Kết quả là: 42

  • ketqua <- atomically newEmptyTMVar Thread chính tạo biến TMVar rỗng ketqua.
  • gia_tri <- atomically $ takeTMVar ketqua Thread chính liên tục thử lấy giá trị từ biến TMVar ketqua.
  • forkIO (do sleepMs 5; putStrLn "Đã tính toán kết quả!"; atomically $ putTMVar ketqua 42) forkIO khởi tạo một thread mới, đầu tiên tạm dừng 5 mili giây, sau đó đặt giá trị 42 vào biến TMVar ketqua.

Ví dụ 3

module CounterTMVar where

import DemoUtils (sleepMs)

import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMVar (newTMVar, takeTMVar, putTMVar)
import Control.Monad (replicateM)

main = do
    dem <- atomically $ newTMVar 0

    let tang = do
            so_hien_tai <- atomically $ takeTMVar dem
            atomically $ putTMVar dem $! so_hien_tai + 1
        tang_dem = do
            replicateM 1000 tang
            return ()

    threads <- replicateM 5 (forkIO tang_dem)

    sleepMs 100
    so_cuoi <- atomically $ takeTMVar dem
    print so_cuoi

*CounterTMVar> main
5000

  • dem <- atomically $ newTMVar 0 Thread chính tạo biến TMVar dem với giá trị ban đầu là 0.
  • tang = do so_hien_tai <- atomically $ takeTMVar dem; atomically $ putTMVar dem $! so_hien_tai + 1 Hàm tang tăng giá trị trong biến TMVar dem lên 1 (lấy ra, tăng 1, đặt lại).
  • tang_dem = do replicateM 1000 tang; return () Hàm tang_dem thực hiện 1000 lần hàm tang, tức 1000 lần "lấy ra, tăng 1, đặt lại".
  • replicateM 5 (forkIO tang_dem) Khởi tạo 5 thread, mỗi thread thực hiện một lần hàm tang_dem, tổng cộng 5000 lần "lấy ra, tăng 1, đặt lại".
  • sleepMs 100; so_cuoi <- atomically $ takeTMVar dem Thread chính đợi 0.1 giây rồi thử lấy giá trị
  • 5000 Kết quả đúng, không có xung đột dữ liệu (data races) giữa 5 thread.

TChan

  • data TChan a TChan là loại channel không giới hạn, hoạt động theo cơ chế先进先出 (FIFO).
  • newTChan :: STM (TChan a) Tạo một TChan mới.
  • readTChan :: TChan a -> STM a Đọc giá trị tiếp theo từ TChan.
  • writeTChan :: TChan a -> a -> STM () Ghi một giá trị vào TChan.
  • dupTChan :: TChan a -> STM (TChan a) Sao chép một TChan, sau khi sao chép, bất kỳ dữ liệu nào được ghi vào TChan gốc cũng sẽ được sao chép vào bản sao và ngược lại.

Ví dụ 4

module ChannelDemo1 where

import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan (newTChan, writeTChan, readTChan)

main = do
    messages <- atomically newTChan
    atomically $ writeTChan messages "không giới hạn"
    atomically $ writeTChan messages "kênh"

    -- Đọc một tin nhắn từ kênh, sau đó in ra.
    msg <- atomically $ readTChan messages
    putStrLn msg
    -- Làm lại điều tương tự nhưng ngắn gọn hơn.
    putStrLn =<< (atomically $ readTChan messages)

*ChannelDemo1> main
không giới hạn
kênh

Chương trình tạo một TChan, ghi hai chuỗi vào đó, sau đó đọc ra.

Ví dụ 5

module ProducerConsumer where

import Control.Monad.STM
import Control.Concurrent
import Control.Concurrent.STM.TChan

motGiay = 1000000

producerThread :: TChan Int -> IO ()
producerThread chan = do
        atomically $ writeTChan chan 1
        threadDelay motGiay
        atomically $ writeTChan chan 2
        threadDelay motGiay
        atomically $ writeTChan chan 3
        threadDelay motGiay

consumerThread :: TChan Int -> IO ()
consumerThread chan = do
        newInt <- atomically $ readTChan chan
        putStrLn $ "đọc giá trị mới: " ++ show newInt
        consumerThread chan

main = do
        chan <- atomically $ newTChan
        forkIO $ consumerThread chan
        forkIO $ producerThread chan
        threadDelay $ 5 * motGiay

*ProducerConsumer> main
đọc giá trị mới: 1
đọc giá trị mới: 2
đọc giá trị mới: 3

  • Thread producer每隔一秒向 chan 写入一个数。
  • Thread consumer不断地尝试从 chan 里读出数并打印。

Ví dụ 6

module DuplicateChannel where

import DemoUtils (sleepMs)

import Control.Concurrent.STM
import Control.Concurrent (forkIO)
import Control.Concurrent.STM.TChan (newTChan, writeTChan, readTChan, dupTChan)

testNonDuplicated = do
    messages <- atomically newTChan
    forkIO $ messageReader messages "Đọc 1"
    forkIO $ messageReader messages "Đọc 2"
    atomically $ writeTChan messages "Xin chào!"

messageReader channel name = do
    msg <- atomically $ readTChan channel
    putStrLn (name ++ " đã đọc: " ++ msg)

testDuplicated = do
    broadcast <- atomically newTChan
    forkIO $ broadcastReader broadcast "Nghe 1"
    forkIO $ broadcastReader broadcast "Nghe 2"
    sleepMs 1
    atomically $ writeTChan broadcast "Tạm biệt!"

broadcastReader channel name = do
    channel' <- atomically $ dupTChan channel
    messageReader channel' name

main = do
    testNonDuplicated
    testDuplicated
    sleepMs 10

*DuplicateChannel> main
Đọc 1 đã đọc: Xin chào!
Nghe 1 đã đọc: Tạm biệt!
Nghe 2 đã đọc: Tạm biệt!

  • Đối với một kênh A, thông tin ghi vào A chỉ có thể đọc ra một lần qua A.
  • Khi hàm dupTChan được dùng để sao chép kênh A thành kênh B, thông tin ghi vào A có thể được đọc lại một lần qua B.

TVar

  • data TVar a Biến bộ nhớ có thể chia sẻ, hỗ trợ STM.
  • newTVar :: a -> STM (TVar a) Tạo một TVar mới với giá trị a.
  • readTVar :: TVar a -> STM a Đọc giá trị của TVar.
  • writeTVar :: TVar a -> a -> STM () Ghi giá trị vào TVar.
  • modifyTVar :: TVar a -> (a -> a) -> STM () Sửa đổi giá trị của TVar.

Ví dụ 7

module CounterTVar where

import DemoUtils (sleepMs)

import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar (newTVar, readTVar, writeTVar)
import Control.Monad (replicateM)

main = do
    dem <- atomically $ newTVar 0

    let tang = atomically $ do
            so_hien_tai <- readTVar dem
            writeTVar dem $! so_hien_tai + 1
        tang_dem = do
            replicateM 1000 tang
            return ()

    threads <- replicateM 5 (forkIO tang_dem)

    sleepMs 100
    so_cuoi <- atomically $ readTVar dem
    print so_cuoi

*CounterTVar> main
5000

  • tang = atomically $ do so_hien_tai <- readTVar dem; writeTVar dem $! so_hien_tai + 1 Lưu ý ở đây việc đọc và ghi phải kết hợp thành một thao tác nguyên tử, nếu thay đổi thành tang = do so_hien_tai <- atomically $ readTVar dem; atomically $ writeTVar dem $! so_hien_tai + 1 thì giữa các thread sẽ xảy ra xung đột dữ liệu, tức là tất cả thread có thể đều đọc cùng một giá trị trước khi ghi lại Kết quả cuối cùng có thể là 5000, cũng có thể là 1000.

TMVar và TVar

newtype TMVar a = TMVar (TVar (Maybe a)) Theo cách triển khai hiện tại, TMVar được đóng gói từ TVar (Maybe a), trạng thái rỗng hay đầy được thực hiện thông qua kiểu Maybe.

TMVar và MVar

MVar là biến đồng bộ được thực hiện trong IO Monad của thư viện chuẩn, không còn được khuyến khích sử dụng. Còn TMVar là biến đồng bộ trong STM Monad, mạnh mẽ và đáng tin cậy hơn.

TArray

Loại mảng hỗ trợ STM và giao diện MArray.

Thẻ: Haskell STM Concurrent Programming TMVar TChan

Đăng vào ngày 1 tháng 6 lúc 19:48