{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeSynonymInstances #-}
module System.IO.Streams.Internal
(
SP(..)
, StreamPair
, InputStream(..)
, OutputStream(..)
, read
, unRead
, peek
, write
, atEOF
, makeInputStream
, makeOutputStream
, appendInputStream
, concatInputStreams
, connect
, connectTo
, supply
, supplyTo
, lockingInputStream
, lockingOutputStream
, nullInput
, nullOutput
, Generator
, fromGenerator
, yield
, Consumer
, fromConsumer
, await
) where
import Control.Applicative (Applicative (..))
import Control.Concurrent (newMVar, withMVar)
import Control.Exception (throwIO)
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO (..))
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as S
import qualified Data.ByteString.Internal as S
import qualified Data.ByteString.Unsafe as S
import Data.IORef (newIORef, readIORef, writeIORef)
import Data.Maybe (isNothing)
import Data.Typeable (Typeable)
import Data.Word (Word8)
import Foreign.Marshal.Utils (copyBytes)
import Foreign.Ptr (castPtr)
import qualified GHC.IO.Buffer as H
import qualified GHC.IO.BufferedIO as H
import qualified GHC.IO.Device as H
import GHC.IO.Exception (unsupportedOperation)
import Prelude hiding (read)
-- | A strict pair: both fields carry strictness annotations, so evaluating
-- the 'SP' constructor also evaluates its two components to weak head normal
-- form.
data SP a b = SP !a !b
  deriving (Typeable)
-- | A source of values of type @a@, represented as a record of two 'IO'
-- actions.
data InputStream a = InputStream
    { _read   :: IO (Maybe a)
      -- ^ Fetch the next value; 'Nothing' is used throughout this module as
      -- the end-of-stream marker.
    , _unRead :: a -> IO ()
      -- ^ Hand a value back to the stream.
    } deriving (Typeable)
-- | A sink for values of type @a@, represented as a single 'IO' callback.
data OutputStream a = OutputStream
    { _write :: Maybe a -> IO ()
      -- ^ Receives @'Just' x@ for a value; 'connect' and the 'H.IODevice'
      -- @close@ methods pass 'Nothing' to signal end-of-stream.
    } deriving (Typeable)
-- | Run the stream's read action, yielding the next value or 'Nothing'.
read :: InputStream a -> IO (Maybe a)
read stream = _read stream
{-# INLINE read #-}
-- | Pass a value (or 'Nothing') to the stream's write callback. This is
-- '_write' with its arguments flipped.
write :: Maybe a -> OutputStream a -> IO ()
write x stream = _write stream x
{-# INLINE write #-}
-- | Look at the next value without consuming it: the value is read and, when
-- one was obtained, immediately handed back through the stream's unread
-- action. Returns 'Nothing' when the read produced 'Nothing'.
peek :: InputStream a -> IO (Maybe a)
peek stream = read stream >>= \mb ->
    case mb of
        Nothing -> return Nothing
        Just x  -> _unRead stream x >> return mb
-- | Hand a value back to an input stream. This is '_unRead' with its
-- arguments flipped.
unRead :: a -> InputStream a -> IO ()
unRead x stream = _unRead stream x
-- | Copy every value from the input stream to the output stream. When the
-- input yields 'Nothing', that 'Nothing' is forwarded to the output stream
-- and the loop ends.
connect :: InputStream a -> OutputStream a -> IO ()
connect p q = pump
  where
    pump = read p >>= \m ->
        case m of
            Nothing -> write Nothing q
            Just _  -> write m q >> pump
{-# INLINE connect #-}
-- | 'connect' with the arguments in the opposite order.
connectTo :: OutputStream a -> InputStream a -> IO ()
connectTo sink source = connect source sink
{-# INLINE connectTo #-}
-- | Copy every value from the input stream to the output stream. Unlike
-- 'connect', nothing is written to the output stream when the input yields
-- 'Nothing'; the loop simply stops.
supply :: InputStream a -> OutputStream a -> IO ()
supply p q = pump
  where
    pump = read p >>= \m ->
        case m of
            Nothing -> return $! ()
            Just _  -> write m q >> pump
{-# INLINE supply #-}
-- | 'supply' with the arguments in the opposite order.
supplyTo :: OutputStream a -> InputStream a -> IO ()
supplyTo sink source = supply source sink
{-# INLINE supplyTo #-}
-- | Build an 'InputStream' from an action that produces values.
--
-- The resulting stream keeps two pieces of private state:
--
-- * a pushback list, filled by the stream's unread action and drained
--   (most recently pushed first) before the producer is consulted;
--
-- * an end-of-stream flag, set the first time the producer returns
--   'Nothing'. Once set, the producer is never run again and reads return
--   'Nothing' whenever the pushback list is empty.
makeInputStream :: IO (Maybe a) -> IO (InputStream a)
makeInputStream produce = do
    eofRef    <- newIORef False
    pushedRef <- newIORef []
    return $! InputStream (pull eofRef pushedRef) (pushBack pushedRef)
  where
    -- Serve pushed-back values first, then fall back to the producer.
    pull eofRef pushedRef = readIORef pushedRef >>= \pushed ->
        case pushed of
            (x:rest) -> writeIORef pushedRef rest >> (return $! Just x)
            []       -> do
                atEnd <- readIORef eofRef
                if atEnd
                  then return Nothing
                  else fresh eofRef

    -- Run the producer once, remembering whether it signalled the end.
    fresh eofRef = do
        mb <- produce
        case mb of
            Nothing -> writeIORef eofRef True
            Just _  -> return ()
        return mb

    pushBack ref x = do
        xs <- readIORef ref
        writeIORef ref (x:xs)
{-# INLINE makeInputStream #-}
-- | Wrap a callback as an 'OutputStream'. No state is allocated; the callback
-- is stored directly in the record.
makeOutputStream :: (Maybe a -> IO ()) -> IO (OutputStream a)
makeOutputStream callback = return (OutputStream callback)
-- | Wrap an input stream so that each read and each unread on the wrapper
-- runs while holding a freshly created 'Control.Concurrent.MVar.MVar' lock.
-- The lock is private to the returned stream; operations performed directly
-- on the wrapped stream do not take it.
lockingInputStream :: InputStream a -> IO (InputStream a)
lockingInputStream s = do
    lock <- newMVar $! ()
    let lockedRead     = withMVar lock $ \_ -> read s
        lockedUnRead x = withMVar lock $ \_ -> unRead x s
    return $! InputStream lockedRead lockedUnRead
{-# INLINE lockingInputStream #-}
-- | Wrap an output stream so that each write on the wrapper runs while
-- holding a freshly created 'Control.Concurrent.MVar.MVar' lock. The lock is
-- private to the returned stream; writes performed directly on the wrapped
-- stream do not take it.
lockingOutputStream :: OutputStream a -> IO (OutputStream a)
lockingOutputStream s = do
    lock <- newMVar $! ()
    makeOutputStream $ \x -> withMVar lock $ \_ -> write x s
{-# INLINE lockingOutputStream #-}
-- | An input stream whose producer always returns 'Nothing'.
nullInput :: IO (InputStream a)
nullInput = makeInputStream (return Nothing)
-- | An output stream that ignores everything written to it.
nullOutput :: IO (OutputStream a)
nullOutput = makeOutputStream $ \_ -> return $! ()
-- | Concatenate two input streams: the two-element case of
-- 'concatInputStreams'.
appendInputStream :: InputStream a -> InputStream a -> IO (InputStream a)
appendInputStream first second = concatInputStreams [first, second]
-- | Concatenate a list of input streams into one.
--
-- The list of streams still to be drained is kept in an 'Data.IORef.IORef'.
-- A read is served from the head of that list; when the head yields
-- 'Nothing' it is dropped and the next stream is tried. Once the list is
-- empty the combined stream yields 'Nothing'.
concatInputStreams :: [InputStream a] -> IO (InputStream a)
concatInputStreams inputStreams = do
    remaining <- newIORef inputStreams
    makeInputStream $! pull remaining
  where
    pull remaining = advance
      where
        advance = readIORef remaining >>= next

        next []       = return Nothing
        next (s:rest) = do
            mb <- read s
            case mb of
                Just _  -> return mb
                -- Current stream is exhausted: drop it and retry.
                Nothing -> writeIORef remaining rest >> advance
-- | Check whether the stream is exhausted. A value is read; if there was one
-- it is pushed back with 'unRead' and the result is 'False', otherwise the
-- result is 'True'.
atEOF :: InputStream a -> IO Bool
atEOF s = do
    mb <- read s
    case mb of
        Nothing -> return True
        Just x  -> unRead x s >> return False
-- | Size passed to 'H.newByteBuffer' by every 'H.BufferedIO' instance in this
-- module when allocating a new buffer.
bUFSIZ :: Int
bUFSIZ = 32752
-- | Throws 'unsupportedOperation'. Used as the body of every device method
-- that the stream instances in this module do not implement.
unsupported :: IO a
unsupported = throwIO unsupportedOperation
-- | Copy the live region of a buffer, the bytes between 'H.bufL' and
-- 'H.bufR', into a fresh 'ByteString'. 'S.copy' is applied to the slice, so
-- the result does not share the buffer's storage.
bufferToBS :: H.Buffer Word8 -> ByteString
bufferToBS buf = S.copy $! S.fromForeignPtr (H.bufRaw buf) start (end - start)
  where
    start = H.bufL buf
    end   = H.bufR buf
-- | Raw byte-level reads from an 'InputStream' of 'ByteString' chunks.
-- Only @read@ is supported; the other methods throw via 'unsupported'.
instance H.RawIO (InputStream ByteString) where
    -- Copy at most @n@ bytes of the next non-empty chunk into @ptr@ and
    -- return the number of bytes copied; 0 is returned only at end-of-stream
    -- (or when @n@ itself is 0).
    read is ptr n = nextChunk
      where
        nextChunk = read is >>= maybe (return 0) deliver

        deliver s
            -- A zero-byte result means EOF to the caller, so an empty chunk
            -- must not be reported as a read; skip it and try again.
            | S.null s  = nextChunk
            | otherwise = S.unsafeUseAsCStringLen s $ \(cstr, l) -> do
                  let c = min n l
                  copyBytes ptr (castPtr cstr) c
                  -- The chunk did not fit: push the unread tail back so the
                  -- next read continues from it instead of losing data.
                  when (c < l) $ unRead (S.drop c s) is
                  return $! c

    readNonBlocking _ _ _  = unsupported
    write _ _ _            = unsupported
    writeNonBlocking _ _ _ = unsupported
-- | Raw byte-level writes to an 'OutputStream' of 'ByteString' chunks.
-- Only @write@ is supported; the other methods throw via 'unsupported'.
instance H.RawIO (OutputStream ByteString) where
    read _ _ _             = unsupported
    readNonBlocking _ _ _  = unsupported
    writeNonBlocking _ _ _ = unsupported

    -- Pack the @n@ bytes at @ptr@ into a ByteString and write it as one chunk.
    write os ptr n = do
        chunk <- S.packCStringLen (castPtr ptr, n)
        write (Just chunk) os
-- | An input stream and an output stream of the same element type, bundled
-- in a strict pair ('SP').
type StreamPair a = SP (InputStream a) (OutputStream a)
-- | Raw I/O on a stream pair: reads are delegated to the input half, writes
-- to the output half. The non-blocking variants throw via 'unsupported'.
instance H.RawIO (StreamPair ByteString) where
    read  (SP input _)  ptr n  = H.read input ptr n
    write (SP _ output) ptr n  = H.write output ptr n
    readNonBlocking  _ _ _     = unsupported
    writeNonBlocking _ _ _     = unsupported
-- | Buffered writes to an 'OutputStream' of 'ByteString' chunks. The read
-- side methods throw via 'unsupported'.
instance H.BufferedIO (OutputStream ByteString) where
    newBuffer !_ bs      = H.newByteBuffer bUFSIZ bs
    fillReadBuffer !_ _  = unsupported
    fillReadBuffer0 !_ _ = unsupported

    -- Write the buffer's contents as one chunk, then hand back the buffer
    -- reset to empty.
    flushWriteBuffer !os !buf =
        write (Just $! bufferToBS buf) os >> emptyWriteBuffer buf

    -- As 'flushWriteBuffer', but also report the length of the chunk that
    -- was written.
    flushWriteBuffer0 !os !buf = do
        write (Just chunk) os
        emptied <- emptyWriteBuffer buf
        return $! (S.length chunk, emptied)
      where
        chunk = bufferToBS buf
-- | Buffered reads from an 'InputStream' of 'ByteString' chunks. Buffers are
-- filled with 'H.readBuf', which goes through the 'H.RawIO' instance; the
-- non-blocking fill and both flush methods throw via 'unsupported'.
instance H.BufferedIO (InputStream ByteString) where
    newBuffer      !_  !bs  = H.newByteBuffer bUFSIZ bs
    fillReadBuffer !is !buf = H.readBuf is buf

    fillReadBuffer0   _ _ = unsupported
    flushWriteBuffer  _ _ = unsupported
    flushWriteBuffer0 _ _ = unsupported
-- | Buffered I/O on a stream pair: filling delegates to the input half and
-- flushing to the output half. The non-blocking fill throws via
-- 'unsupported'.
instance H.BufferedIO (StreamPair ByteString) where
    newBuffer !_ bs = H.newByteBuffer bUFSIZ bs

    -- Read side: served by the input stream.
    fillReadBuffer (SP input _) = H.fillReadBuffer input
    fillReadBuffer0 _ _         = unsupported

    -- Write side: served by the output stream.
    flushWriteBuffer  (SP _ !output) = H.flushWriteBuffer output
    flushWriteBuffer0 (SP _ !output) = H.flushWriteBuffer0 output
-- | Device view of an output stream: always reports ready, has type
-- 'H.Stream', and closing it writes 'Nothing' to the stream.
instance H.IODevice (OutputStream ByteString) where
    ready _ _ _ = return True
    devType _   = return H.Stream
    close os    = write Nothing os
-- | Device view of an input stream: always reports ready, has type
-- 'H.Stream', and closing it does nothing.
instance H.IODevice (InputStream ByteString) where
    ready _ _ _ = return True
    devType _   = return H.Stream
    close _     = return $! ()
-- | Device view of a stream pair: always reports ready, has type 'H.Stream',
-- and closing it writes 'Nothing' to the output half. The input half is not
-- touched on close.
instance H.IODevice (StreamPair ByteString) where
    ready _ _ _          = return True
    devType _            = return H.Stream
    close (SP _ output)  = write Nothing output
-- | Return the buffer reset to an empty write buffer: both 'H.bufL' and
-- 'H.bufR' are set to 0 and the state to 'H.WriteBuffer'. All other fields
-- are kept as they were.
emptyWriteBuffer :: H.Buffer Word8 -> IO (H.Buffer Word8)
emptyWriteBuffer buf = return reset
  where
    reset = buf
        { H.bufL     = 0
        , H.bufR     = 0
        , H.bufState = H.WriteBuffer
        }
-- | A computation that can emit values of type @r@ before finishing with a
-- result of type @a@.
--
-- Running one step of the wrapped action gives either
-- @'Left' ('SP' v rest)@, an emitted value @v@ together with the remainder
-- of the computation, or @'Right' a@, the final result.
newtype Generator r a = Generator
    { unG :: IO (Either (SP r (Generator r a)) a)
    } deriving (Typeable)
-- | Monadic bind for 'Generator'. Emitted values are passed through
-- unchanged, with the continuation attached to the remainder of the
-- generator; when the first generator finishes, its result is fed to the
-- continuation.
generatorBind :: Generator r a -> (a -> Generator r b) -> Generator r b
generatorBind (Generator m) k = Generator $ do
    res <- m
    case res of
        Left (SP v rest) -> return $! Left $! SP v (generatorBind rest k)
        Right a          -> unG (k a)
{-# INLINE generatorBind #-}
-- | Sequencing of generators; '>>=' is 'generatorBind', and 'return' builds
-- a generator that finishes immediately without emitting anything.
instance Monad (Generator r) where
    return x = Generator (return (Right x))
    m >>= k  = generatorBind m k
-- | Lift an 'IO' action into a generator that runs it and finishes with its
-- result, emitting nothing.
instance MonadIO (Generator r) where
    liftIO io = Generator (fmap Right io)
-- | Map over the final result of a generator. Emitted values are left
-- untouched; the function is pushed into the remainder at each step.
instance Functor (Generator r) where
    fmap f (Generator m) = Generator $ do
        res <- m
        case res of
            Left (SP v rest) -> return $! Left $! SP v (fmap f rest)
            Right a          -> return $! Right $! f a
-- | Applicative structure derived from the monad: '<*>' runs the function
-- generator first, then the argument generator, and forces the application
-- before returning it.
instance Applicative (Generator r) where
    pure x  = Generator (return (Right x))
    m <*> n =
        m >>= \f ->
        n >>= \v ->
        return $! f v
-- | Emit one value from a generator. The remainder after the emission is a
-- generator that finishes immediately with @()@.
yield :: r -> Generator r ()
yield x = Generator $! return $! Left $! SP x finished
  where
    finished = return $! ()
-- | Turn a generator into an 'InputStream' of the values it emits.
--
-- The not-yet-run remainder of the generator is kept in an
-- 'Data.IORef.IORef'. Each read runs one step: an emitted value is returned
-- and the remainder stored for next time; when the generator finishes, its
-- result is discarded and the read returns 'Nothing'.
fromGenerator :: Generator r a -> IO (InputStream r)
fromGenerator (Generator m) = do
    ref <- newIORef m
    makeInputStream $! pull ref
  where
    pull ref = do
        action <- readIORef ref
        res    <- action
        case res of
            Left (SP v gen) -> do
                writeIORef ref $! unG gen
                return $! Just v
            Right _         -> return Nothing
-- | A computation that can request input values of type @c@ before finishing
-- with a result of type @a@.
--
-- Running one step of the wrapped action gives either @'Left' k@, a request
-- for input to be answered by applying @k@ to a @'Maybe' c@, or @'Right' a@,
-- the final result.
newtype Consumer c a = Consumer
    { unC :: IO (Either (Maybe c -> Consumer c a) a)
    } deriving (Typeable)
-- | Sequencing of consumers. A pending input request is passed outward with
-- the continuation attached to whatever follows it; a finished consumer
-- hands its result straight to the continuation.
instance Monad (Consumer c) where
    return x = Consumer (return (Right x))

    Consumer m >>= k = Consumer $ do
        res <- m
        case res of
            Left g  -> return $! Left $! \mb -> g mb >>= k
            Right a -> unC (k a)
-- | Lift an 'IO' action into a consumer that runs it and finishes with its
-- result, requesting no input.
instance MonadIO (Consumer c) where
    liftIO io = Consumer (Right `fmap` io)
-- | Map over the final result of a consumer. Pending input requests are kept,
-- with the function pushed into whatever follows them.
instance Functor (Consumer r) where
    fmap f (Consumer m) = Consumer $ do
        res <- m
        case res of
            Left g  -> return $! Left $! \mb -> fmap f (g mb)
            Right a -> return $! Right $! f a
-- | Applicative structure derived from the monad: '<*>' runs the function
-- consumer first, then the argument consumer, and forces the application
-- before returning it.
instance Applicative (Consumer r) where
    pure    = return
    m <*> n =
        m >>= \f ->
        n >>= \v ->
        return $! f v
-- | Request one input. The consumer suspends with a continuation that simply
-- returns whatever @'Maybe' r@ it is given.
await :: Consumer r (Maybe r)
await = Consumer (return (Left (\mb -> return mb)))
-- | Turn a consumer into an 'OutputStream' that feeds it.
--
-- The current state of the consumer is kept in an 'Data.IORef.IORef'. Each
-- write runs the stored consumer up to its next input request, answers that
-- request with the written value, and then runs the continuation up to its
-- following request (or its end) before storing it again.
--
-- Whatever gets stored back is always an effect-free consumer, so that
-- re-running it on the next write cannot repeat 'IO' effects. In particular,
-- a consumer that finishes without ever requesting input has its effects run
-- once, on the first write, and every later write is a no-op.
fromConsumer :: Consumer r a -> IO (OutputStream r)
fromConsumer c0 = newIORef c0 >>= makeOutputStream . go
  where
    go ref mb = do
        c  <- readIORef ref
        c' <- unC c >>= either step finished
        writeIORef ref c'
      where
        -- The consumer is done: remember only its result. Storing the
        -- original consumer here would replay its effects on each write.
        finished v = return $! Consumer $! return $! Right v

        -- Run a consumer up to its next suspension point now, and store a
        -- consumer that merely replays the outcome.
        force c = do e <- unC c
                     return $! Consumer $! return e

        -- Answer the pending input request with the value being written.
        step g = force $! g mb