{-# LANGUAGE RankNTypes, Trustworthy #-}
-- | STM-backed 'Input' and 'Output' handles, helpers that connect them to
-- pipes, and functions that create buffered mailboxes.
module Pipes.Concurrent
    ( -- * Inputs and outputs
      Input(..)
    , Output(..)

      -- * Pipe utilities
    , fromInput
    , toOutput

      -- * Mailboxes
    , spawn
    , spawn'
    , Buffer(..)

      -- * Re-exports
    , module Control.Concurrent
    , module Control.Concurrent.STM
    , module System.Mem
    ) where
import Control.Applicative (
Alternative(empty, (<|>)), Applicative(pure, (*>), (<*>)), (<*), (<$>) )
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically, STM)
import qualified Control.Concurrent.STM as S
import Control.Monad (when)
import Data.IORef (newIORef, readIORef, mkWeakIORef)
import Data.Monoid (Monoid(mempty, mappend))
import GHC.Conc.Sync (unsafeIOToSTM)
import Pipes (MonadIO(liftIO), yield, await, Producer', Consumer')
import System.Mem (performGC)
-- | A source of values, represented as an 'S.STM' action.
newtype Input a = Input
    { recv :: S.STM (Maybe a)
      -- ^ Fetch the next value.  'Nothing' signals that the input is
      -- exhausted; 'fromInput' stops producing when it sees it.
    }
-- | Applies the function to every value the 'Input' delivers; a 'Nothing'
-- result is passed through unchanged.
instance Functor Input where
    fmap f (Input action) = Input (fmap f <$> action)
-- | 'pure' builds an 'Input' that always delivers the same value.
--
-- '<*>' receives a function from the left 'Input' and, only if one was
-- delivered, a value from the right 'Input'.  When the left side reports
-- 'Nothing' the right side is not run at all, so no element of it is
-- consumed and thrown away, and the transaction does not wait on it.  This
-- makes '<*>' agree with 'Control.Monad.ap' for the 'Monad' instance.
instance Applicative Input where
    pure r = Input (pure (pure r))

    mf <*> mx = Input $ do
        mg <- recv mf
        case mg of
            -- Left input is exhausted: stop before touching the right one.
            Nothing -> return Nothing
            Just g  -> fmap (fmap g) (recv mx)
-- | Sequencing receives from the first 'Input' and, if it delivered a value,
-- receives once from the 'Input' computed from that value.  A 'Nothing' from
-- the first 'Input' ends the computation immediately.
instance Monad Input where
    return = Input . return . Just

    m >>= f = Input (recv m >>= maybe (return Nothing) (recv . f))
-- | 'empty' is an 'Input' that is exhausted from the start.
--
-- @x '<|>' y@ receives from @x@ if it can proceed and otherwise from @y@.
-- If the input that answered reports 'Nothing', the other input is then
-- asked instead.
instance Alternative Input where
    empty = Input (return Nothing)

    x <|> y = Input (firstReady >>= settle)
      where
        -- Try x, then y; remember which input was NOT the one that answered.
        firstReady = tagWith y x <|> tagWith x y

        tagWith other this = fmap (\result -> (other, result)) (recv this)

        settle (_, found@(Just _)) = return found
        settle (other, Nothing)    = recv other
-- | Combines inputs using their 'Alternative' instance: 'mempty' is 'empty'
-- and 'mappend' is '<|>'.
instance Monoid (Input a) where
    mempty = empty

    mappend left right = left <|> right
-- | A sink for values, represented as a function into 'S.STM'.
newtype Output a = Output
    { send :: a -> S.STM Bool
      -- ^ Offer a value.  The 'Bool' reports whether the output is still
      -- accepting values; 'toOutput' stops consuming once it is 'False'.
    }
-- | 'mempty' accepts nothing and always reports 'False'.  'mappend' offers
-- each value to both outputs, in order, and reports 'True' if at least one
-- of them accepted it.
instance Monoid (Output a) where
    mempty = Output (const (return False))

    mappend first second = Output $ \a -> do
        -- Both sends always run; only their results are combined.
        okFirst  <- send first a
        okSecond <- send second a
        return (okFirst || okSecond)
-- | Turn an 'Output' into a consumer that forwards every value it awaits.
--
-- Each value is sent in its own transaction.  The consumer returns as soon
-- as a send reports 'False'.
toOutput :: (MonadIO m) => Output a -> Consumer' a m ()
toOutput output = go
  where
    go = await >>= deliver

    deliver a = do
        accepted <- liftIO (S.atomically (send output a))
        when accepted go
-- | Turn an 'Input' into a producer that yields every value it receives.
--
-- Each value is received in its own transaction.  The producer returns as
-- soon as a receive reports 'Nothing'.
fromInput :: (MonadIO m) => Input a -> Producer' a m ()
fromInput input = go
  where
    go = liftIO (S.atomically (recv input)) >>= maybe (return ()) emit

    emit a = yield a >> go
-- | Create a mailbox with the given 'Buffer' and return its two ends.
--
-- This is 'spawn'' with the sealing action dropped from the result.
spawn :: Buffer a -> IO (Output a, Input a)
spawn buffer = do
    (output, input, _) <- spawn' buffer
    return (output, input)
-- | Create a mailbox with the given 'Buffer'.
--
-- Returns the writing end, the reading end, and an 'STM' action that seals
-- the mailbox.  Once sealed:
--
-- * 'send' stores nothing and returns 'False';
--
-- * 'recv' still returns values the buffer can supply and returns 'Nothing'
--   only when the buffer's read would otherwise block.
--
-- The mailbox is also sealed by finalizers attached to two 'IORef's, one
-- referenced by each end, so it is sealed once either end is garbage
-- collected.
--
-- A 'Newest' buffer with a capacity below 1 cannot hold anything, so every
-- value sent to it is discarded.
spawn' :: Buffer a -> IO (Output a, Input a, STM ())
spawn' buffer = do
    (write, readNext) <- case buffer of
        Bounded n -> do
            q <- S.newTBQueueIO n
            return (S.writeTBQueue q, S.readTBQueue q)
        Unbounded -> do
            q <- S.newTQueueIO
            return (S.writeTQueue q, S.readTQueue q)
        Single -> do
            m <- S.newEmptyTMVarIO
            return (S.putTMVar m, S.takeTMVar m)
        Latest a -> do
            -- Reads never block here, so this input never reports 'Nothing'.
            t <- S.newTVarIO a
            return (S.writeTVar t, S.readTVar t)
        New -> do
            m <- S.newEmptyTMVarIO
            -- Drop whatever is stored, then store the new value.
            return (\x -> S.tryTakeTMVar m *> S.putTMVar m x, S.takeTMVar m)
        Newest n -> do
            q <- S.newTBQueueIO n
            let push x
                    -- With no capacity the write can never succeed and there
                    -- is never an old element to evict, so retrying would
                    -- loop forever inside the transaction.
                    | n < 1 = return ()
                    -- When full, evict the oldest element and try again.
                    | otherwise =
                        S.writeTBQueue q x <|> (S.tryReadTBQueue q *> push x)
            return (push, S.readTBQueue q)

    sealed <- S.newTVarIO False
    let seal = S.writeTVar sealed True

    -- One IORef per end.  Its finalizer seals the mailbox; the weak pointer
    -- returned by 'mkWeakIORef' itself is not needed.
    rSend <- newIORef ()
    _ <- mkWeakIORef rSend (S.atomically seal)
    rRecv <- newIORef ()
    _ <- mkWeakIORef rRecv (S.atomically seal)

    let sendOrEnd a = do
            isSealed <- S.readTVar sealed
            if isSealed
                then return False
                else do
                    write a
                    return True
        -- Prefer a buffered value; report the end only if none is available
        -- and the mailbox is sealed, otherwise block.
        readOrEnd = (Just <$> readNext) <|> (do
            isSealed <- S.readTVar sealed
            S.check isSealed
            return Nothing)
        -- The reads below return '()' and exist only so that each end holds
        -- a reference to its IORef.
        _send a = sendOrEnd a <* unsafeIOToSTM (readIORef rSend)
        _recv = readOrEnd <* unsafeIOToSTM (readIORef rRecv)
    return (Output _send, Input _recv, seal)
-- | Selects the storage that 'spawn' and 'spawn'' place between the two ends
-- of a mailbox.
data Buffer a
    = Unbounded
      -- ^ A @TQueue@: every sent value is stored.
    | Bounded Int
      -- ^ A @TBQueue@ created with the given size.
    | Single
      -- ^ An initially empty @TMVar@ holding at most one value.
    | Latest a
      -- ^ A @TVar@ starting with the given value.  Sending overwrites it;
      -- receiving reads it without removing it.
    | Newest Int
      -- ^ A @TBQueue@ created with the given size.  When a write cannot
      -- proceed, an element is removed from the front and the write is
      -- retried.
    | New
      -- ^ An initially empty @TMVar@.  Sending replaces any stored value.