{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE OverloadedStrings #-}
module System.IO.Streams.ByteString
(
countInput
, countOutput
, fromByteString
, fromLazyByteString
, readExactly
, takeBytesWhile
, writeLazyByteString
, splitOn
, lines
, unlines
, words
, unwords
, giveBytes
, giveExactly
, takeBytes
, takeExactly
, throwIfConsumesMoreThan
, throwIfProducesMoreThan
, throwIfTooSlow
, MatchInfo(..)
, search
, RateTooSlowException
, ReadTooShortException
, TooManyBytesReadException
, TooManyBytesWrittenException
, TooFewBytesWrittenException
) where
import Control.Exception (Exception, throwIO)
import Control.Monad (when, (>=>))
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as S
import qualified Data.ByteString.Lazy.Char8 as L
import qualified Data.ByteString.Unsafe as S
import Data.Char (isSpace)
import Data.Int (Int64)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Data.Typeable (Typeable)
import Prelude hiding (read, lines, unlines, words, unwords)
import System.IO.Streams.Combinators (filterM, intersperse, outputFoldM)
import System.IO.Streams.Internal (InputStream (..), OutputStream, makeInputStream, makeOutputStream, read, unRead, write)
import System.IO.Streams.Internal.Search (MatchInfo (..), search)
import System.IO.Streams.List (fromList, writeList)
-- | Apply a pure function to the contents of an 'IORef', forcing the new value
-- to weak head normal form before it is stored.
--
-- The read and the write are two separate steps, so this is not an atomic
-- update.
modifyRef :: IORef a -> (a -> a) -> IO ()
modifyRef ref f = readIORef ref >>= \old -> writeIORef ref $! f old
{-# INLINE modifyRef #-}
-- | Write a lazy 'L.ByteString' to an 'OutputStream' of strict chunks.
--
-- The lazy string is broken into its strict chunks with 'L.toChunks' and the
-- resulting list is handed to 'writeList'.
writeLazyByteString :: L.ByteString
                    -> OutputStream ByteString
                    -> IO ()
writeLazyByteString lbs = writeList (L.toChunks lbs)
{-# INLINE writeLazyByteString #-}
-- | Build an 'InputStream' from a single strict 'ByteString' by passing a
-- one-element list to 'fromList'.
fromByteString :: ByteString -> IO (InputStream ByteString)
fromByteString bs = fromList [bs]
-- | Build an 'InputStream' from a lazy 'L.ByteString' by passing its strict
-- chunks (as returned by 'L.toChunks') to 'fromList'.
fromLazyByteString :: L.ByteString -> IO (InputStream ByteString)
fromLazyByteString lbs = fromList (L.toChunks lbs)
-- | Wrap an 'InputStream' so that the number of bytes read through the wrapper
-- is tracked.
--
-- Returns the wrapped stream together with an action that reads the current
-- byte count. Every chunk read from the wrapper adds its length to the count;
-- every chunk pushed back through the wrapper subtracts its length and is
-- forwarded to the underlying stream with 'unRead'.
countInput :: InputStream ByteString -> IO (InputStream ByteString, IO Int64)
countInput src = do
    total <- newIORef (0 :: Int64)
    let counted = InputStream (produce total) (pushBack total)
    return $! (counted, readIORef total)
  where
    chunkLen :: ByteString -> Int64
    chunkLen = fromIntegral . S.length

    produce total = do
        next <- read src
        case next of
            Nothing -> return Nothing
            Just bs -> do
                modifyRef total (+ chunkLen bs)
                return $! Just bs

    pushBack total bs = do
        modifyRef total (subtract (chunkLen bs))
        unRead bs src
-- | Wrap an 'OutputStream' with 'outputFoldM', folding the length of every
-- chunk written into a strict 'Int64' accumulator that starts at zero.
--
-- The result is whatever pair 'outputFoldM' returns: the wrapped stream and
-- the action it provides for retrieving the accumulator.
countOutput :: OutputStream ByteString
            -> IO (OutputStream ByteString, IO Int64)
countOutput = outputFoldM addLength 0
  where
    addLength !soFar bs =
        let !len   = S.length bs
            !total = toEnum len + soFar
        in return total
-- | Wrap an 'InputStream' so that at most the given number of bytes can be
-- read from it.
--
-- If the source ends before the limit is reached, the wrapper simply reports
-- end-of-stream. See 'takeBytes'' for the chunk-splitting details.
takeBytes :: Int64
          -> InputStream ByteString
          -> IO (InputStream ByteString)
takeBytes limit = takeBytes' limit onEarlyEof
  where
    onEarlyEof = return Nothing
{-# INLINE takeBytes #-}
-- | Like 'takeBytes', but a source that ends before the requested number of
-- bytes has been produced causes a 'ReadTooShortException' (carrying the
-- requested count) to be thrown instead of a plain end-of-stream.
takeExactly :: Int64
            -> InputStream ByteString
            -> IO (InputStream ByteString)
takeExactly wanted = takeBytes' wanted onEarlyEof
  where
    onEarlyEof = throwIO (ReadTooShortException wanted)
{-# INLINE takeExactly #-}
-- | Shared implementation of 'takeBytes' and 'takeExactly'.
--
-- Arguments:
--
-- * the byte budget for the wrapped stream;
-- * the action to run when the source reports end-of-stream while budget is
--   still left;
-- * the source stream.
--
-- While the budget is positive, chunks are passed through and their length is
-- deducted. A chunk that would meet or exceed the budget is split: the part
-- that fits is returned, any non-empty remainder is pushed back onto the
-- source, and the budget becomes zero. Once the budget is non-positive the
-- wrapper returns 'Nothing' without touching the source. Pushing a chunk back
-- through the wrapper restores its length to the budget.
takeBytes' :: Int64
           -> IO (Maybe ByteString)
           -> InputStream ByteString
           -> IO (InputStream ByteString)
takeBytes' k0 h src = do
    budget <- newIORef k0
    return $! InputStream (produce budget) (pushBack budget)
  where
    produce budget = do
        left <- readIORef budget
        if left <= 0
            then return Nothing
            else do
                next <- read src
                case next of
                    Nothing -> h
                    Just bs -> deliver budget left bs

    deliver budget left bs
        | remaining > 0 = writeIORef budget remaining >> return (Just bs)
        | otherwise     = do
            -- The chunk reaches or overshoots the budget: keep what fits
            -- and return the surplus to the source.
            let (wanted, surplus) = S.splitAt (fromIntegral left) bs
            when (not (S.null surplus)) $ unRead surplus src
            writeIORef budget 0
            return $! Just wanted
      where
        remaining = left - fromIntegral (S.length bs)

    pushBack budget bs = do
        modifyRef budget (+ fromIntegral (S.length bs))
        unRead bs src
{-# INLINE takeBytes' #-}
-- | Split an 'InputStream' of bytes into pieces at every character satisfying
-- the predicate. The delimiter character itself is dropped.
--
-- Chunk boundaries in the source are irrelevant: bytes seen before a delimiter
-- is found are accumulated (as a difference list in an 'IORef') and joined
-- with 'S.concat' when the piece is emitted. When a delimiter is the last byte
-- of a chunk, an empty string is queued so that a final (possibly empty) piece
-- is still produced. At end-of-stream any accumulated bytes are emitted as one
-- last piece.
splitOn :: (Char -> Bool)
        -> InputStream ByteString
        -> IO (InputStream ByteString)
splitOn p is = do
    pending <- newIORef id
    makeInputStream (pull pending)
  where
    pull pending = loop
      where
        loop = do
            next <- read is
            case next of
                Nothing -> flush
                Just bs -> feed bs

        -- Source exhausted: emit whatever has been accumulated, once.
        flush = do
            acc <- readIORef pending
            case acc [] of
                []     -> return Nothing
                pieces -> do
                    writeIORef pending id
                    return $! Just $! S.concat pieces

        feed bs
            -- No delimiter in this chunk: stash it and keep reading.
            | S.null rest = modifyRef pending (. (before :)) >> loop
            | otherwise   = do
                -- 'rest' is non-empty here, so dropping one byte is safe.
                let !after = S.unsafeDrop 1 rest
                acc <- readIORef pending
                if S.null after
                    then writeIORef pending ("" :)
                    else writeIORef pending id >> unRead after is
                return $ Just $! S.concat $ acc [before]
          where
            (before, rest) = S.break p bs
-- | Split an 'InputStream' of bytes into lines, using 'splitOn' with the
-- newline character @\'\\n\'@ as the only delimiter.
lines :: InputStream ByteString -> IO (InputStream ByteString)
lines = splitOn isNewline
  where
    isNewline c = c == '\n'
-- | Split an 'InputStream' of bytes into words: the stream is split with
-- 'splitOn' at every character for which 'isSpace' holds, and pieces in which
-- every character is whitespace are filtered out.
words :: InputStream ByteString -> IO (InputStream ByteString)
words src = splitOn isSpace src >>= filterM keep
  where
    keep piece = return (not (S.all isSpace piece))
-- | Wrap an 'OutputStream' so that every chunk written is followed by a
-- newline.
--
-- Each value (including end-of-stream) is first forwarded unchanged to the
-- wrapped stream; for a 'Just' chunk a separate @\"\\n\"@ chunk is written
-- afterwards.
unlines :: OutputStream ByteString -> IO (OutputStream ByteString)
unlines os = makeOutputStream emit
  where
    emit m = do
        write m os
        maybe (return $! ()) (const newline) m

    newline = write (Just "\n") os
-- | Wrap an 'OutputStream' with 'intersperse', using a single space as the
-- separator chunk.
unwords :: OutputStream ByteString -> IO (OutputStream ByteString)
unwords os = intersperse " " os
-- | Thrown by 'throwIfProducesMoreThan' when the wrapped input stream yields
-- more bytes than its limit allows.
data TooManyBytesReadException = TooManyBytesReadException
    deriving (Typeable)

instance Show TooManyBytesReadException where
    showsPrec _ TooManyBytesReadException = showString "Too many bytes read"

instance Exception TooManyBytesReadException
-- | Thrown by 'giveExactly' when end-of-stream is written before the expected
-- number of bytes has been supplied.
data TooFewBytesWrittenException = TooFewBytesWrittenException
    deriving (Typeable)

instance Show TooFewBytesWrittenException where
    showsPrec _ TooFewBytesWrittenException =
        showString "Too few bytes written"

instance Exception TooFewBytesWrittenException
-- | Thrown by 'giveExactly' and 'throwIfConsumesMoreThan' when more bytes are
-- written to the wrapped output stream than its limit allows.
data TooManyBytesWrittenException = TooManyBytesWrittenException
    deriving (Typeable)

instance Show TooManyBytesWrittenException where
    showsPrec _ TooManyBytesWrittenException =
        showString "Too many bytes written"

instance Exception TooManyBytesWrittenException
-- | Thrown by 'takeExactly' and 'readExactly' when the input ends before the
-- requested number of bytes could be read. The field is the number of bytes
-- that was requested.
data ReadTooShortException = ReadTooShortException Int64
    deriving (Typeable)

instance Show ReadTooShortException where
    showsPrec _ (ReadTooShortException expected) =
        showString "Short read, expected "
            . shows expected
            . showString " bytes"

instance Exception ReadTooShortException
-- | Wrap an 'InputStream' so that a 'TooManyBytesReadException' is thrown if
-- the source produces more than the given number of bytes.
--
-- For every chunk read from the source:
--
-- * empty chunks are passed through untouched;
-- * a non-empty chunk arriving when the budget is exactly zero throws;
-- * a chunk that fits has its length deducted from the budget and is
--   returned;
-- * otherwise the chunk is split at the remaining budget: the prefix is
--   returned, the rest is pushed back onto the source and the budget becomes
--   zero, so the exception is raised on the following read.
--
-- Pushing a chunk back through the wrapper forwards it to the source and
-- restores its length to the budget.
throwIfProducesMoreThan
    :: Int64
    -> InputStream ByteString
    -> IO (InputStream ByteString)
throwIfProducesMoreThan k0 src = do
    budget <- newIORef k0
    return $! InputStream (produce budget) (pushBack budget)
  where
    produce budget = do
        next <- read src
        case next of
            Nothing -> return Nothing
            Just bs -> readIORef budget >>= check budget bs

    check budget bs left
        | len == 0       = return (Just bs)
        | left == 0      = throwIO TooManyBytesReadException
        | remaining >= 0 = writeIORef budget remaining >> return (Just bs)
        | otherwise      = do
            let (!allowed, !excess) = S.splitAt (fromEnum left) bs
            writeIORef budget 0
            unRead excess src
            return $! Just allowed
      where
        len       = toEnum (S.length bs)
        remaining = left - len

    pushBack budget bs = do
        unRead bs src
        modifyRef budget (+ fromIntegral (S.length bs))
-- | Read exactly @n@ bytes from an 'InputStream' and return them as a single
-- strict 'ByteString'.
--
-- Chunks are pulled from the stream until @n@ bytes have been gathered. If the
-- final chunk holds more than is needed, the non-empty surplus is pushed back
-- onto the stream with 'unRead'.
--
-- A non-positive @n@ yields the empty string without touching the stream.
-- (Previously only @n == 0@ short-circuited; a negative @n@ performed a
-- needless, possibly blocking, read and threw on an exhausted stream.)
--
-- Throws 'ReadTooShortException' (carrying @n@) if the stream ends before @n@
-- bytes are available.
readExactly :: Int
            -> InputStream ByteString
            -> IO ByteString
readExactly n input = go id n
  where
    tooShort = throwIO $ ReadTooShortException (fromIntegral n)

    -- @acc@ is a difference list of the chunks collected so far; @k@ is the
    -- number of bytes still required.
    go !acc !k
        | k <= 0    = return $! S.concat $! acc []
        | otherwise = read input >>= maybe tooShort (step acc k)

    step acc k s
        | len >= k  = do
            let (wanted, surplus) = S.splitAt k s
            when (not $ S.null surplus) $ unRead surplus input
            return $! S.concat $! acc [wanted]
        | otherwise = go (acc . (s :)) (k - len)
      where
        len = S.length s
-- | Read from an 'InputStream' for as long as the bytes satisfy the predicate
-- and return them joined into one strict 'ByteString'.
--
-- Returns 'Nothing' only if the stream is already at end-of-stream on the
-- first read. Otherwise chunks are consumed until a byte fails the predicate
-- (the remainder of that chunk, starting at the failing byte, is pushed back
-- with 'unRead') or the stream ends; the matching prefix collected so far,
-- which may be empty, is returned in a 'Just'.
takeBytesWhile :: (Char -> Bool)
               -> InputStream ByteString
               -> IO (Maybe ByteString)
takeBytesWhile p input = do
    first <- read input
    case first of
        Nothing -> return Nothing
        Just bs -> collect id bs
  where
    collect acc !bs
        | S.null rest = do
            -- Whole chunk matched: keep it and look at the next one.
            next <- read input
            case next of
                Nothing   -> done
                Just more -> collect (acc . (matched :)) more
        | otherwise   = unRead rest input >> done
      where
        (matched, rest) = S.span p bs
        done            = return $! Just $! S.concat $! acc [matched]
-- | Wrap an 'OutputStream' so that at most the given number of bytes is
-- forwarded to it.
--
-- Chunks that fit within the remaining budget are forwarded whole. A chunk
-- that overshoots is truncated to the remaining budget (nothing is written if
-- the truncated prefix is empty) and the budget becomes zero, so later chunks
-- are discarded. End-of-stream is always forwarded to the wrapped stream.
giveBytes :: Int64
          -> OutputStream ByteString
          -> IO (OutputStream ByteString)
giveBytes k0 str = do
    budget <- newIORef k0
    makeOutputStream (sink budget)
  where
    sink _ Nothing = write Nothing str
    sink budget chunk@(Just bs) = do
        left <- readIORef budget
        let remaining = left - fromIntegral (S.length bs)
        if remaining >= 0
            then writeIORef budget remaining >> write chunk str
            else do
                let prefix = S.take (fromIntegral left) bs
                when (not (S.null prefix)) $ write (Just prefix) str
                writeIORef budget 0
-- | Wrap an 'OutputStream' so that exactly the given number of bytes must be
-- written to it.
--
-- Each chunk's length is deducted from the remaining count and the chunk is
-- forwarded. A chunk that would take the count below zero throws
-- 'TooManyBytesWrittenException' without being forwarded. When end-of-stream
-- is written, 'TooFewBytesWrittenException' is thrown if the count is not
-- zero; otherwise nothing happens. Note that end-of-stream is /not/ forwarded
-- to the wrapped stream.
giveExactly :: Int64
            -> OutputStream ByteString
            -> IO (OutputStream ByteString)
giveExactly k0 os = do
    expected <- newIORef k0
    makeOutputStream (sink expected)
  where
    sink expected chunk = do
        !left <- readIORef expected
        case chunk of
            Nothing
                | left /= 0 -> throwIO TooFewBytesWrittenException
                | otherwise -> return $! ()
            Just bs
                | remaining < 0 -> throwIO TooManyBytesWrittenException
                | otherwise     -> do
                    writeIORef expected remaining
                    write chunk os
              where
                remaining = left - fromIntegral (S.length bs)
-- | Wrap an 'OutputStream' so that a 'TooManyBytesWrittenException' is thrown
-- if more than the given number of bytes is written to it.
--
-- Each chunk's length is deducted from the remaining budget. A chunk that
-- would take the budget below zero throws without being forwarded; otherwise
-- the chunk is passed on. End-of-stream is forwarded to the wrapped stream.
throwIfConsumesMoreThan
    :: Int64
    -> OutputStream ByteString
    -> IO (OutputStream ByteString)
throwIfConsumesMoreThan k0 str = do
    budget <- newIORef k0
    makeOutputStream (sink budget)
  where
    sink _ Nothing = write Nothing str
    sink budget chunk@(Just bs) = do
        left <- readIORef budget
        case left - toEnum (S.length bs) of
            remaining
                | remaining < 0 -> throwIO TooManyBytesWrittenException
                | otherwise     -> do
                    writeIORef budget remaining
                    write chunk str
-- | The current POSIX time, as returned by 'getPOSIXTime', converted to a
-- 'Double' with 'realToFrac'.
getTime :: IO Double
getTime = do
    now <- getPOSIXTime
    return (realToFrac now)
-- | Thrown by 'throwIfTooSlow' when the observed input rate falls below the
-- configured minimum.
data RateTooSlowException = RateTooSlowException
    deriving (Typeable)

instance Show RateTooSlowException where
    showsPrec _ RateTooSlowException = showString "Input rate too slow"

instance Exception RateTooSlowException
-- | Wrap an 'InputStream' so that a 'RateTooSlowException' is thrown when data
-- arrives too slowly.
--
-- Arguments:
--
-- * an action that is run once when the wrapper is created and again after
--   every chunk that passes the rate check (NOTE(review): presumably used to
--   reset an external timeout - confirm against callers);
-- * the minimum acceptable rate, compared against bytes divided by seconds;
-- * a grace period in seconds;
-- * the source stream.
--
-- The start time is taken with 'getTime' at creation. For each chunk read, the
-- elapsed time and the running byte total (including that chunk) are computed;
-- the exception is thrown when the elapsed time exceeds the grace period by
-- more than one second and @total \/ (elapsed - grace)@ is below the minimum
-- rate. Pushing a chunk back through the wrapper subtracts its length from the
-- byte total and forwards it to the source.
throwIfTooSlow
    :: IO ()
    -> Double
    -> Int
    -> InputStream ByteString
    -> IO (InputStream ByteString)
throwIfTooSlow !bump !minRate !minSeconds' !stream = do
    !_      <- bump
    t0      <- getTime
    counter <- newIORef (0 :: Int64)
    return $! InputStream (produce t0 counter) (pushBack counter)
  where
    grace = fromIntegral minSeconds'

    produce t0 counter = do
        next <- read stream
        case next of
            Nothing -> return Nothing
            Just bs -> account t0 counter bs

    account t0 counter bs = do
        let len = S.length bs
        now <- getTime
        let !elapsed = now - t0
        seen <- readIORef counter
        let total = seen + fromIntegral len
            rate  = fromIntegral total / (elapsed - grace)
        when (elapsed > grace + 1 && rate < minRate) $
            throwIO RateTooSlowException
        -- Rate is acceptable: run the bump action, then record the new total.
        !_ <- bump
        writeIORef counter total
        return $! Just bs

    pushBack counter bs = do
        modifyRef counter (subtract (fromIntegral (S.length bs)))
        unRead bs stream