Базовая работа с MVar

В данном посте будет краткое описание одного из базовых примитивов для передачи данных ‘MVar’ (Mutable Variable, эм-вар).

Для начала, хочется дать очень краткое описание основных структур данных, позволяющих получать общий доступ к изменяемым переменным из разных потоков. Стандартная библиотека представляет три глобальные группы:

  • IORef - изменяемая переменная, над которой можно производить операции из разных потоков (поддерживает атомарные операции) все операции должны быть чистыми
  • TVar - примитив для работы с транзакционной памятью, работа с ним возможна только внутри STM монады (поддерживает объединяемые транзакции)
  • MVar - низкоуровневый примитив, при изменении которого можно производить произвольные IO операции.

Исходя из написанного выше видно, что MVar следует использовать тогда, когда при работе с переменной идёт и работа с ‘IO’, в этом случае другие механизмы не могут предоставить покрывающее данную задачу решение.

Краткий обзор

data MVar a можно рассматривать как ящик, защищенный мьютексом, в котором может или находиться или не находиться значение.

У MVar переменных есть достаточно простой базовый API:

data MVar a  -- abstract

newEmptyMVar :: IO (MVar a)
newMVar      :: a -> IO (MVar a)
takeMVar     :: MVar a -> IO a
putMVar      :: MVar a -> a -> IO ()

Операции newEmptyMVar и newMVar создают новую переменную, пустую или со значением соотвественно. Операция takeMVar получает текущее значение, оставляя MVar пустым или блокируется в случае, если MVar пуст. Операция putMVar наоборот кладёт значение в MVar, если он пуст или блокируется, если в MVar уже есть значение.

Небольшой пример того, как может использоваться MVar:

import Control.Concurrent

main = do
  result <- newEmptyMVar                    -- (1)
  forkIO $ do                               -- (2)
    r <- getUrl "http://ruhaskell.org/"     -- (3)
    putMVar result r                        -- (4)
  
  doSmthElse

  r <- takeMVar result                      -- (5)
  print r

В этом коде мы создали ячейку для результата (1), запустили фоновое действие (2), получили результат со страницы (3), положили результат в ячейку (4), в главном потоке после завершения действия получили данные из ячейки (5).

В случае, если все процессы заблокированы на MVar, система исполнения может автоматически отслеживать такие ситуации и, при обнаружении, выбрасывать исключение:

main = do
  m <- newEmptyMVar
  takeMVar m
 $ ./mvar3
 mvar3: thread blocked indefinitely in an MVar operation

Вкратце, такое происходит, если система исполнения “видит”, что ни у одного незаблокированного потока нет в области видимости MVar-переменной, запись или чтение из которой могут разблокировать систему. Однако, часто такое описание может вызвать затруднение. Подробно разные случае рассмотрены в посте Edward Z. Yang BlockedIndefinitelyOnMVar.

Использование MVar

В целом MVar могут быть применены в следующих случаях:

  1. как “одноместные” каналы передачи данных; в этом случае MVar можно рассматривать как базовый примитив для передачи данных;

  2. как контейнер для разделяемых изменяемых данных;

  3. как базовый компонент для более сложных конкурентных структур данных.

Примеры задач:

  1. 1. Возвращение результата из подпотока (пример с рабочим процессом выше).
    2. Синхронизация между потоками, например, ожидание завершения потоков
    haskell main = do finished <- newEmptyMVar forkFinally (const $ putMVar finished ()) run takeMVar finished
    1. Многопоточный доступ к состоянию: haskell st <- newMVar State replicateM 3 $ forkIO $ forever $ do r <- request modifyMVar st (processRequest r)
    2. Блокирока ресурса при многопоточном доступе: haskell lock <- newMVar () replicateM 3 $ forkIO $ do ... withMVar_ lock doScarryThings
  2. каналы Control.Concurrent.Chan

Более полные примеры можно найти в книге Simon Marlow Parallel and Concurrent Programming in Haskell Глава 7.

Для упрощения работы с MVar можно разбить функции на две группы для работы со случаем (1) и (2), третий случай попадает в одну из групп в зависимости от конкретной задачи MVar в структуре. И далее в каждом из случаев использовать функции из этой группы:

Передача данных Контейнер для разделенных данных
putMVar withMVar
takeMVar modifyMVar
isEmptyMVar readMVar
tryReadMVar,tryPutMVar swapMVar

Естестсвенно, никто не требует, чтобы при работе с одним из случаев не использовать функции из другой группы, но в этом случае необходимо четко понимать к каким последствиям это может приводить и учитывать при написании кода. Вот несколько интересных примеров:

  1. один поток вызывает withMVar и выполняет действие, после чего должен положить значение назад, но, если в это время другой поток “положит” результат в MVar (putMVar), то первый поток будет заблокирован.

  2. аналогичная ситуация может происходить с вызовом readMVar, который есть комбинация takeMVar и putMVar, в ghc начиная с 7.8 данное поведение исправлено и теперь операция readMVar не освобождает “ячейку”.

  3. при использовании takeMVar и putMVar, в случае исключения MVar останется не заполненым (если конечно исключения не обрабатываются должным образом, см. ниже).

В случае работы, как описано выше, проблемные ситуации сводятся к минимуму.

Гарантия “справедливости”

MVar таже имеют очень важное свойство “справедливость” (fairness), которое обозначает следующую гарантию:

Ни один поток не может заблокироваться на MVar навечно, за исключением случая, если другой поток вечно “держит” MVar.

Другими словами, если поток \(\tau\) вызывает takeMVar и выполняются регулярные операции putMVar на той же переменной, то takeMVar когда-нибудь завершится. В GHC данная гарантия реализована при помощи хранения всех потоков, заблокированных на MVar в очереди (FIFO), и при осводождении MVar выполнение пробуждения потока и выполнения операции над MVar за один атомарный шаг.

Важным следствием гарантии справедливости является то, что достаточно будить только один поток, что уменьшает нагрузку (это несправедливо для случая STM, где в при обновлении TVar должны быть пробужены все заблокированные потоки).

Исключения и MVar

Особенно аккуратным при работе с MVar нужно быть в случае возможных асинхронных исключений. Поскольку необходимо следить за тем, чтобы даже в при наличии исключений актуальность MVar не терялась.

Функции для работы с MVar как с контейнером “знают” об исключениях, и, в случае, если исключение появляется, возвращают состояние MVar. Делается это достаточно просто, но является интересным и простым примером работы с асинхронными исключениями:

modifyMVar m f = mask $ \restore -> do                  -- (1)
    x <- takeMVar                                       -- (2)
    (y, n) <- restore f `onException` putMVar x         -- (3)
    putMVar y                                           -- (4)
    return n                                            -- (5)

Здесь мы (1) - заблокировали исключения, и получили функцию для из разблокировки restore; (2) - получили текущее значение MVar; (3) - разблокировав исключения, запустили пользовательскую функцию. Если вовремя выполнения пользовательской функции пришло исключение, то мы возвращаем значение MVar и выбрасываем исключение выше; (4) - если мы дожили до этого участка кода без исключений, то они больше не случатся, мы кладём новое значение и возвращаем результат (5). Достаточно просто, но это и простейший примитив :)

Но все ли тут хорошо? Внимательный читатель спросит, а как же можно остановить поток, который заблокирован на takeMVar, если исключения замаскированы? Тут есть две части ответа: во-первых, IndefinitelyBlockedOnMVar - исключение магическое (работает как синхронное) и потому игнорирует маскировку, а, во-вторых, подобные операции, выполняющие блокирующие операции, снимают маскировку. Таким образом, при чтении MVar, в случае если MVar пуст и тред блокируется, асинхронное исключение может быть доставлено даже если исключения заблокированы. Аналогично с записью и не-пустым MVar. Данное правило является не самым простым, но его понимание важно при написании кода лишенного гонок. Для случаев, когда такое поведение нежелательно существует операция uninterruptibleMask, не позволяющая разблокировать исключения. Эта операция не несёт дополнительной сложности, но в этом случае разблокировать ветку будет невозможно (кроме IndefinitelyBlockedOnMVar). Подробно про работу и исключениями можно прочитать в документации модуля Control.Exception.

Но все ли теперь хорошо? На самом деле нет, существует ещё одна возможная проблема! В случае, если состояние MVar должно отражать состояние внешнего мира, а операция внутри MVar его изменяет, возможна рассинхронизация. Представьте следующий пример:

newConnection addr =
  modifyMVar state $ \state ->                      --  (1)
    mc <- connect addr                              --  (2)
                                                    --  (BOOM!)
    case mc of
      Left e -> return (state, ConnectionError e)   --  (3)
      Rigth r -> return (r:state, OK)               --  (4)

В этом коде мы захватываем состояние (1); открываем новое соединение (2); в случае неуспеха возвращаем ошибку, не изменяя состояние (3); в случае успеха сохраняем новое состояние и возращаем соотвествующий результат (4). Теперь представим, что мы получаем асинхронное исключение в момент (BOOM!), здесь соединение уже открыто, но ещё не сохранено в состояние, после получения исключение в state возвращается старое значение - ошибка.

Одно из решений использовать маскировку самому:

newConnection addr = mask $ \release ->
  modifyMVar state $ \state ->                      --  (1)
    mc <- release (connect addr)                    --  (2), (BOOM!)
    case ec of
      Left e -> return (state, ConnectionError e)   --  (3)
      Rigth r -> return (r:state, OK)               --  (4)

или если connect размаскирует исключения:

newConnection addr = mask_ $
  modifyMVar state $ \state ->                      --  (1)
    mc <- connect addr                              --  (2), (BOOM!)
    case ec of
      Left e -> return (state, ConnectionError e)   --  (3)
      Rigth r -> return (r:state, OK)               --  (4)

Данный код будет работать корректно. Также можно использовать метод modifyMVarMasked, который работает аналогично последнему блоку кода.

Следует заметить, что STM не имеет большей части из этих проблем и существенно проще в использовании, но там имеются свои сложности с включением IO операций в транзакции (или написания серии блоков STM -> IO -> STM) и с отсутствием гарантии справедливости.