Базовая работа с MVar
date = fromGregorian 2015 feb 13
category = "Теория"
tags = ["конкуретность"]
В данном посте будет краткое описание одного из базовых примитивов для передачи данных ‘MVar’ (Mutable Variable, эм-вар).
Для начала, хочется дать очень краткое описание основных структур данных, позволяющих получать общий доступ к изменяемым переменным из разных потоков. Стандартная библиотека представляет три глобальные группы:
- IORef - изменяемая переменная, над которой можно производить операции из разных потоков (поддерживает атомарные операции) все операции должны быть чистыми
- TVar - примитив для работы с транзакционной памятью, работа
с ним возможна только внутри
STM
монады (поддерживает объединяемые транзакции) - MVar - низкоуровневый примитив, при изменении которого можно
производить произвольные
IO
операции.
Исходя из написанного выше видно, что MVar
следует использовать тогда,
когда при работе с переменной идёт и работа с ‘IO’, в этом случае другие
механизмы не могут предоставить покрывающее данную задачу решение.
Краткий обзор
data MVar a
можно рассматривать как ящик, защищенный мьютексом, в котором может
или находиться или не находиться значение.
У MVar
переменных есть достаточно простой базовый API:
data MVar a -- abstract
newEmptyMVar :: IO (MVar a)
newMVar :: a -> IO (MVar a)
takeMVar :: MVar a -> IO a
putMVar :: MVar a -> a -> IO ()
Операции newEmptyMVar
и newMVar
создают новую переменную, пустую или со
значением соотвественно. Операция takeMVar
получает текущее значение, оставляя
MVar
пустым или блокируется в случае, если MVar
пуст. Операция putMVar
наоборот
кладёт значение в MVar
, если он пуст или блокируется, если в MVar
уже есть значение.
Небольшой пример того, как может использоваться MVar
:
import Control.Concurrent
= do
main <- newEmptyMVar -- (1)
result $ do -- (2)
forkIO <- getUrl "http://ruhaskell.org/" -- (3)
r -- (4)
putMVar result r
doSmthElse
<- takeMVar result -- (5)
r print r
В этом коде мы создали ячейку для результата (1)
, запустили фоновое действие (2)
,
получили результат со страницы (3)
, положили результат в ячейку (4)
, в главном
потоке после завершения действия получили данные из ячейки (5)
.
В случае, если все процессы заблокированы на MVar
, система исполнения может
автоматически отслеживать такие ситуации и, при обнаружении, выбрасывать исключение:
= do
main <- newEmptyMVar
m takeMVar m
$ ./mvar3
mvar3: thread blocked indefinitely in an MVar operation
Вкратце, такое происходит, если система исполнения “видит”, что ни у одного
незаблокированного потока нет в области видимости MVar
-переменной, запись
или чтение из которой могут разблокировать систему. Однако, часто такое
описание может вызвать затруднение. Подробно разные случае рассмотрены в посте Edward Z. Yang BlockedIndefinitelyOnMVar.
Использование MVar
В целом MVar
могут быть применены в следующих случаях:
как “одноместные” каналы передачи данных; в этом случае
MVar
можно рассматривать как базовый примитив для передачи данных;как контейнер для разделяемых изменяемых данных;
как базовый компонент для более сложных конкурентных структур данных.
Примеры задач:
1. Возвращение результата из подпотока (пример с рабочим процессом выше). 2. Синхронизация между потоками, например, ожидание завершения потоков
haskell main = do finished <- newEmptyMVar forkFinally (const $ putMVar finished ()) run takeMVar finished
- Многопоточный доступ к состоянию:
haskell st <- newMVar State replicateM 3 $ forkIO $ forever $ do r <- request modifyMVar st (processRequest r)
- Блокирока ресурса при многопоточном доступе:
haskell lock <- newMVar () replicateM 3 $ forkIO $ do ... withMVar_ lock doScarryThings
- Многопоточный доступ к состоянию:
- каналы Control.Concurrent.Chan
Более полные примеры можно найти в книге Simon Marlow Parallel and Concurrent Programming in Haskell Глава 7.
Для упрощения работы с MVar
можно разбить функции на две группы для работы со случаем (1)
и (2)
, третий случай попадает в одну из групп в зависимости от конкретной задачи MVar
в структуре.
И далее в каждом из случаев использовать функции из этой группы:
Передача данных | Контейнер для разделенных данных |
---|---|
putMVar | withMVar |
takeMVar | modifyMVar |
isEmptyMVar | readMVar |
tryReadMVar,tryPutMVar | swapMVar |
Естестсвенно, никто не требует, чтобы при работе с одним из случаев не использовать функции из другой группы, но в этом случае необходимо четко понимать к каким последствиям это может приводить и учитывать при написании кода. Вот несколько интересных примеров:
один поток вызывает
withMVar
и выполняет действие, после чего должен положить значение назад, но, если в это время другой поток “положит” результат вMVar
(putMVar
), то первый поток будет заблокирован.аналогичная ситуация может происходить с вызовом
readMVar
, который есть комбинацияtakeMVar
иputMVar
, в ghc начиная с 7.8 данное поведение исправлено и теперь операцияreadMVar
не освобождает “ячейку”.при использовании
takeMVar
иputMVar
, в случае исключенияMVar
останется не заполненым (если конечно исключения не обрабатываются должным образом, см. ниже).
В случае работы, как описано выше, проблемные ситуации сводятся к минимуму.
Гарантия “справедливости”
MVar
таже имеют очень важное свойство “справедливость” (fairness), которое
обозначает следующую гарантию:
Ни один поток не может заблокироваться на
MVar
навечно, за исключением случая, если другой поток вечно “держит”MVar
.
Другими словами, если поток \(\tau\) вызывает takeMVar
и выполняются регулярные
операции putMVar
на той же переменной, то takeMVar
когда-нибудь завершится.
В GHC данная гарантия реализована при помощи хранения всех потоков, заблокированных
на MVar
в очереди (FIFO), и при осводождении MVar
выполнение пробуждения потока
и выполнения операции над MVar
за один атомарный шаг.
Важным следствием гарантии справедливости является то, что достаточно будить только
один поток, что уменьшает нагрузку (это несправедливо для случая STM
, где в при
обновлении TVar
должны быть пробужены все заблокированные потоки).
Исключения и MVar
Особенно аккуратным при работе с MVar
нужно быть в случае возможных асинхронных
исключений. Поскольку необходимо следить за тем, чтобы даже в при наличии исключений
актуальность MVar
не терялась.
Функции для работы с MVar
как с контейнером “знают” об исключениях, и, в случае,
если исключение появляется, возвращают состояние MVar
. Делается это достаточно
просто, но является интересным и простым примером работы с асинхронными исключениями:
= mask $ \restore -> do -- (1)
modifyMVar m f <- takeMVar -- (2)
x <- restore f `onException` putMVar x -- (3)
(y, n) -- (4)
putMVar y return n -- (5)
Здесь мы (1)
- заблокировали исключения, и получили функцию для из разблокировки restore
;
(2)
- получили текущее значение MVar
; (3)
- разблокировав исключения, запустили
пользовательскую функцию. Если вовремя выполнения пользовательской функции пришло исключение,
то мы возвращаем значение MVar
и выбрасываем исключение выше; (4)
- если мы дожили до этого
участка кода без исключений, то они больше не случатся, мы кладём новое значение и возвращаем результат (5)
.
Достаточно просто, но это и простейший примитив :)
Но все ли тут хорошо? Внимательный читатель спросит, а как же можно остановить поток, который
заблокирован на takeMVar
, если исключения замаскированы? Тут есть две части ответа: во-первых,
IndefinitelyBlockedOnMVar
- исключение магическое (работает как синхронное) и потому игнорирует
маскировку, а, во-вторых, подобные операции, выполняющие блокирующие операции, снимают маскировку.
Таким образом, при чтении MVar
, в случае если MVar
пуст и тред блокируется, асинхронное исключение
может быть доставлено даже если исключения заблокированы. Аналогично с записью и не-пустым MVar
.
Данное правило является не самым простым, но его понимание важно при написании кода
лишенного гонок. Для случаев, когда такое поведение нежелательно существует операция uninterruptibleMask
,
не позволяющая разблокировать исключения. Эта операция не несёт дополнительной сложности, но в
этом случае разблокировать ветку будет невозможно (кроме IndefinitelyBlockedOnMVar
). Подробно
про работу и исключениями можно прочитать в документации модуля Control.Exception.
Но все ли теперь хорошо? На самом деле нет, существует ещё одна возможная проблема! В случае,
если состояние MVar
должно отражать состояние внешнего мира, а операция внутри MVar
его изменяет,
возможна рассинхронизация. Представьте следующий пример:
=
newConnection addr $ \state -> -- (1)
modifyMVar state <- connect addr -- (2)
mc -- (BOOM!)
case mc of
Left e -> return (state, ConnectionError e) -- (3)
Rigth r -> return (r:state, OK) -- (4)
В этом коде мы захватываем состояние (1)
; открываем новое соединение (2)
; в случае неуспеха
возвращаем ошибку, не изменяя состояние (3)
; в случае успеха сохраняем новое состояние и возращаем
соотвествующий результат (4)
. Теперь представим, что мы получаем асинхронное исключение в момент
(BOOM!)
, здесь соединение уже открыто, но ещё не сохранено в состояние, после получения исключение
в state
возвращается старое значение - ошибка.
Одно из решений использовать маскировку самому:
= mask $ \release ->
newConnection addr $ \state -> -- (1)
modifyMVar state <- release (connect addr) -- (2), (BOOM!)
mc case ec of
Left e -> return (state, ConnectionError e) -- (3)
Rigth r -> return (r:state, OK) -- (4)
или если connect
размаскирует исключения:
= mask_ $
newConnection addr $ \state -> -- (1)
modifyMVar state <- connect addr -- (2), (BOOM!)
mc case ec of
Left e -> return (state, ConnectionError e) -- (3)
Rigth r -> return (r:state, OK) -- (4)
Данный код будет работать корректно. Также можно использовать метод modifyMVarMasked
,
который работает аналогично последнему блоку кода.
Следует заметить, что STM
не имеет большей части из этих проблем и существенно проще в
использовании, но там имеются свои сложности с включением IO
операций в транзакции
(или написания серии блоков STM -> IO -> STM
) и с отсутствием гарантии справедливости.