module System.IO.Streams.Tests.Concurrent (tests) where
import Control.Concurrent
import Control.Monad
import Prelude hiding (lines, read,
takeWhile, unlines,
unwords, unwords,
words)
import qualified Prelude
import qualified System.IO.Streams as Streams
import qualified System.IO.Streams.Concurrent as Streams
import System.IO.Streams.Tests.Common
import Test.Framework
import Test.Framework.Providers.HUnit
import Test.Framework.Providers.QuickCheck2
import Test.HUnit hiding (Test)
import Test.QuickCheck hiding (output)
import Test.QuickCheck.Monadic
-- | Every test case defined in this module, collected for the test runner.
tests :: [Test]
tests =
    [ testMakeChanPipe
    , testConcurrentMerge
    , testConcurrentMergeException
    , testInputOutput
    ]
-- | Property: a list written to the output end of the stream pair returned by
-- 'Streams.makeChanPipe' is read back unchanged from the input end.
testMakeChanPipe :: Test
testMakeChanPipe =
    testProperty "concurrent/makeChanPipe" (monadicIO (forAllM arbitrary roundTrip))
  where
    -- Runs one round trip for an arbitrary list of 'Int's.
    roundTrip :: [Int] -> PropertyM IO ()
    roundTrip xs = liftQ $ do
        (source, sink) <- Streams.makeChanPipe
        -- The writer runs in its own thread; it pushes the whole list and
        -- then 'Nothing' (the io-streams end-of-stream marker) so that
        -- 'Streams.toList' below can finish.
        _ <- forkIO $ do
            Streams.writeList xs sink
            Streams.write Nothing sink
        received <- Streams.toList source
        assertEqual "makeChanPipe" xs received
-- | Checks that 'Streams.concurrentMerge' over several channel-backed input
-- streams yields every value written to those channels.
--
-- @nthreads@ worker threads are arranged in a ring. Each worker blocks on its
-- own 'MVar'; when it receives the counter it writes the value to its own
-- channel and passes the incremented counter to the next worker. Only the
-- worker currently holding the counter writes, so the values @0 .. maxval@
-- are produced one at a time in ascending order, spread across the channels.
testConcurrentMerge :: Test
testConcurrentMerge = testCase "concurrent/concurrentMerge" $ do
    mvars <- replicateM nthreads newEmptyMVar
    chans <- replicateM nthreads newChan

    -- Worker i waits on mvars !! i and hands off to the following MVar,
    -- wrapping around to the first one after the last.
    mapM_ (forkIO . ring) $ zip3 mvars (take nthreads $ drop 1 $ cycle mvars)
                                 chans

    inputs     <- mapM Streams.chanToInput chans
    resultMVar <- newEmptyMVar
    -- Drain the merged stream in the background; the ThreadId is not needed.
    _ <- forkIO (Streams.concurrentMerge inputs >>= Streams.toList
                                                >>= putMVar resultMVar)

    -- Kick the ring off by handing the initial counter to the first worker.
    -- A pattern match replaces the partial 'head' so that a zero 'nthreads'
    -- fails with a readable message instead of an exception from 'head'.
    case mvars of
        (firstMVar : _) -> putMVar firstMVar 0
        []              -> assertFailure "concurrent merge: no worker threads"

    result <- takeMVar resultMVar
    -- Expected values are derived from 'maxval' so the two cannot drift apart.
    assertEqual "concurrent merge" [0 .. maxval] result
  where
    -- Largest value written to any channel.
    maxval   = 10 :: Int
    -- Number of worker threads, and therefore of channels being merged.
    nthreads = 4  :: Int

    -- One worker of the ring: receives the counter on @prev@, writes to
    -- @chan@, and forwards the counter on @next@.
    ring (prev, next, chan) = loop
      where
        loop = do
            x <- takeMVar prev
            if x > maxval
              then do
                -- Past the end: close this worker's channel with 'Nothing'
                -- and pass the counter on unchanged so the remaining workers
                -- close theirs too. No recursion here, so the worker stops.
                writeChan chan Nothing
                putMVar next x
              else do
                writeChan chan $ Just x
                -- NOTE(review): this 100 ms pause presumably gives the merge
                -- time to forward the value before the next worker writes;
                -- the ascending expected order appears to rely on it.
                threadDelay 100000
                putMVar next $! x + 1
                loop
-- | Merges a single input stream whose producer action is @error "bad"@ and
-- passes the draining of the merged stream to 'expectExceptionH'
-- (a shared test helper; presumably it asserts that the action throws).
testConcurrentMergeException :: Test
testConcurrentMergeException = testCase "concurrent/concurrentMerge/exception" $ do
    failing <- Streams.makeInputStream (error "bad")
    merged  <- Streams.concurrentMerge [failing]
    expectExceptionH $ Streams.toList merged
-- | Round-trips a list through a 'Chan': the list-backed stream is copied
-- into the channel with 'Streams.inputToChan', and the stream obtained from
-- 'Streams.chanToInput' on the same channel must yield the same values.
testInputOutput :: Test
testInputOutput = testCase "concurrent/input-output" $ do
    source  <- Streams.fromList expected
    channel <- newChan
    -- The reading side is created before anything is written to the channel.
    readEnd <- Streams.chanToInput channel
    Streams.inputToChan source channel
    actual <- Streams.toList readEnd
    assertEqual "input-output" expected actual
  where
    -- Values sent through the channel and expected back, in order.
    expected = [1 .. 10] :: [Int]