{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE DeriveFunctor #-}
module Data.Conduit
(
Source
, Conduit
, Sink
, ConduitM
, ($$)
, ($=)
, (=$)
, (=$=)
, fuseBoth
, fuseUpstream
, await
, yield
, leftover
, bracketP
, addCleanup
, yieldOr
, catchC
, handleC
, tryC
, Producer
, Consumer
, toProducer
, toConsumer
, awaitForever
, transPipe
, mapOutput
, mapOutputMaybe
, mapInput
, passthroughSink
, ResumableSource
, newResumableSource
, ($$+)
, ($$++)
, ($$+-)
, ($=+)
, unwrapResumable
, closeResumableSource
, ResumableConduit
, newResumableConduit
, (=$$+)
, (=$$++)
, (=$$+-)
, unwrapResumableConduit
, fuseLeftovers
, fuseReturnLeftovers
, Flush (..)
, ZipSource (..)
, sequenceSources
, ZipSink (..)
, sequenceSinks
, ZipConduit (..)
, sequenceConduits
) where
import Control.Monad.Trans.Class (lift)
import Data.Conduit.Internal hiding (await, awaitForever, yield, yieldOr, leftover, bracketP, addCleanup, transPipe, mapOutput, mapOutputMaybe, mapInput, yieldM)
import qualified Data.Conduit.Internal as CI
import Control.Monad.Morph (hoist)
import Control.Monad (liftM, forever, when, unless)
import Control.Applicative (Applicative (..))
import Data.Traversable (Traversable (..))
import Control.Monad.Trans.Resource (MonadResource)
infixr 0 $$
infixl 1 $=
infixr 2 =$
infixr 2 =$=
infixr 0 $$+
infixr 0 $$++
infixr 0 $$+-
infixl 1 $=+
($$) :: Monad m => Source m a -> Sink a m b -> m b
src $$ sink = do
(rsrc, res) <- src $$+ sink
rsrc $$+- return ()
return res
{-# INLINE ($$) #-}
($=) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r
ConduitM src $= ConduitM con = ConduitM $ pipeL src con
{-# INLINE ($=) #-}
(=$) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r
ConduitM con =$ ConduitM sink = ConduitM $ pipeL con sink
{-# INLINE (=$) #-}
(=$=) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r
ConduitM left =$= ConduitM right = ConduitM $ pipeL left right
{-# INLINE (=$=) #-}
await :: Monad m => Consumer i m (Maybe i)
await = ConduitM CI.await
{-# RULES "await >>= maybe" forall x y. await >>= maybe x y = ConduitM (NeedInput (unConduitM . y) (unConduitM . const x)) #-}
{-# INLINE [1] await #-}
yield :: Monad m
=> o
-> ConduitM i o m ()
yield = ConduitM . CI.yield
{-# INLINE [1] yield #-}
yieldM :: Monad m => m o -> ConduitM i o m ()
yieldM = ConduitM . CI.yieldM
{-# INLINE [1] yieldM #-}
{-# RULES
"yield o >> p" forall o (p :: ConduitM i o m r). yield o >> p = ConduitM (HaveOutput (unConduitM p) (return ()) o)
; "mapM_ yield" mapM_ yield = ConduitM . sourceList
; "yieldOr o c >> p" forall o c (p :: ConduitM i o m r). yieldOr o c >> p =
ConduitM (HaveOutput (unConduitM p) c o)
; "when yield next" forall b o p. when b (yield o) >> p =
if b then ConduitM (HaveOutput (unConduitM p) (return ()) o) else p
; "unless yield next" forall b o p. unless b (yield o) >> p =
if b then p else ConduitM (HaveOutput (unConduitM p) (return ()) o)
; "lift m >>= yield" forall m. lift m >>= yield = yieldM m
#-}
leftover :: i -> ConduitM i o m ()
leftover = ConduitM . CI.leftover
{-# INLINE [1] leftover #-}
{-# RULES "leftover l >> p" forall l (p :: ConduitM i o m r). leftover l >> p =
ConduitM (Leftover (unConduitM p) l) #-}
bracketP :: MonadResource m
=> IO a
-> (a -> IO ())
-> (a -> ConduitM i o m r)
-> ConduitM i o m r
bracketP alloc free inside = ConduitM $ CI.bracketP alloc free $ unConduitM . inside
addCleanup :: Monad m
=> (Bool -> m ())
-> ConduitM i o m r
-> ConduitM i o m r
addCleanup f = ConduitM . CI.addCleanup f . unConduitM
yieldOr :: Monad m
=> o
-> m ()
-> ConduitM i o m ()
yieldOr o m = ConduitM $ CI.yieldOr o m
{-# INLINE [1] yieldOr #-}
awaitForever :: Monad m => (i -> ConduitM i o m r) -> ConduitM i o m ()
awaitForever f = ConduitM $ CI.awaitForever (unConduitM . f)
transPipe :: Monad m => (forall a. m a -> n a) -> ConduitM i o m r -> ConduitM i o n r
transPipe = hoist
mapOutput :: Monad m => (o1 -> o2) -> ConduitM i o1 m r -> ConduitM i o2 m r
mapOutput f (ConduitM p) = ConduitM $ CI.mapOutput f p
mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> ConduitM i o1 m r -> ConduitM i o2 m r
mapOutputMaybe f (ConduitM p) = ConduitM $ CI.mapOutputMaybe f p
mapInput :: Monad m
=> (i1 -> i2)
-> (i2 -> Maybe i1)
-> ConduitM i2 o m r
-> ConduitM i1 o m r
mapInput f g (ConduitM p) = ConduitM $ CI.mapInput f g p
($$+) :: Monad m => Source m a -> Sink a m b -> m (ResumableSource m a, b)
src $$+ sink = connectResume (ResumableSource src (return ())) sink
{-# INLINE ($$+) #-}
($$++) :: Monad m => ResumableSource m a -> Sink a m b -> m (ResumableSource m a, b)
($$++) = connectResume
{-# INLINE ($$++) #-}
($$+-) :: Monad m => ResumableSource m a -> Sink a m b -> m b
rsrc $$+- sink = do
(ResumableSource _ final, res) <- connectResume rsrc sink
final
return res
{-# INLINE ($$+-) #-}
($=+) :: Monad m => ResumableSource m a -> Conduit a m b -> ResumableSource m b
ResumableSource src final $=+ sink = ResumableSource (src $= sink) final
closeResumableSource :: Monad m => ResumableSource m a -> m ()
closeResumableSource = ($$+- return ())
data Flush a = Chunk a | Flush
deriving (Show, Eq, Ord)
instance Functor Flush where
fmap _ Flush = Flush
fmap f (Chunk a) = Chunk (f a)
newtype ZipSource m o = ZipSource { getZipSource :: Source m o }
instance Monad m => Functor (ZipSource m) where
fmap f = ZipSource . mapOutput f . getZipSource
instance Monad m => Applicative (ZipSource m) where
pure = ZipSource . forever . yield
(ZipSource f) <*> (ZipSource x) = ZipSource $ zipSourcesApp f x
sequenceSources :: (Traversable f, Monad m) => f (Source m o) -> Source m (f o)
sequenceSources = getZipSource . sequenceA . fmap ZipSource
newtype ZipSink i m r = ZipSink { getZipSink :: Sink i m r }
instance Monad m => Functor (ZipSink i m) where
fmap f (ZipSink x) = ZipSink (liftM f x)
instance Monad m => Applicative (ZipSink i m) where
pure = ZipSink . return
(ZipSink f) <*> (ZipSink x) =
ZipSink $ liftM (uncurry ($)) $ zipSinks f x
sequenceSinks :: (Traversable f, Monad m) => f (Sink i m r) -> Sink i m (f r)
sequenceSinks = getZipSink . sequenceA . fmap ZipSink
(=$$+) :: Monad m => Conduit a m b -> Sink b m r -> Sink a m (ResumableConduit a m b, r)
(=$$+) conduit = connectResumeConduit (ResumableConduit conduit (return ()))
{-# INLINE (=$$+) #-}
(=$$++) :: Monad m => ResumableConduit i m o -> Sink o m r -> Sink i m (ResumableConduit i m o, r)
(=$$++) = connectResumeConduit
{-# INLINE (=$$++) #-}
(=$$+-) :: Monad m => ResumableConduit i m o -> Sink o m r -> Sink i m r
rsrc =$$+- sink = do
(ResumableConduit _ final, res) <- connectResumeConduit rsrc sink
lift final
return res
{-# INLINE (=$$+-) #-}
infixr 0 =$$+
infixr 0 =$$++
infixr 0 =$$+-
newtype ZipConduit i o m r = ZipConduit { getZipConduit :: ConduitM i o m r }
deriving Functor
instance Monad m => Applicative (ZipConduit i o m) where
pure = ZipConduit . pure
ZipConduit left <*> ZipConduit right = ZipConduit (zipConduitApp left right)
sequenceConduits :: (Traversable f, Monad m) => f (ConduitM i o m r) -> ConduitM i o m (f r)
sequenceConduits = getZipConduit . sequenceA . fmap ZipConduit
fuseBoth :: Monad m => ConduitM a b m r1 -> ConduitM b c m r2 -> ConduitM a c m (r1, r2)
fuseBoth (ConduitM up) (ConduitM down) =
ConduitM $ pipeL up (withUpstream $ generalizeUpstream down)
{-# INLINE fuseBoth #-}
fuseUpstream :: Monad m => ConduitM a b m r -> Conduit b m c -> ConduitM a c m r
fuseUpstream up down = fmap fst (fuseBoth up down)
{-# INLINE fuseUpstream #-}