{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE CPP #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Data.Conduit.Process
(
sourceCmdWithConsumer
, sourceProcessWithConsumer
, sourceCmdWithStreams
, sourceProcessWithStreams
, withCheckedProcessCleanup
, FlushInput(..)
, BuilderInput(..)
, module Data.Streaming.Process
) where
import Data.Streaming.Process
import Data.Streaming.Process.Internal
import System.Exit (ExitCode (..))
import Control.Monad.IO.Unlift (MonadIO, liftIO, MonadUnliftIO, withRunInIO, withUnliftIO, unliftIO)
import System.IO (hClose, BufferMode (NoBuffering), hSetBuffering)
import Data.Conduit
import Data.Functor (($>))
import Data.Conduit.Binary (sourceHandle, sinkHandle, sinkHandleBuilder, sinkHandleFlush)
import Data.ByteString (ByteString)
import Data.ByteString.Builder (Builder)
import Control.Concurrent.Async (runConcurrently, Concurrently(..))
import Control.Exception (onException, throwIO, finally, bracket)
#if (__GLASGOW_HASKELL__ < 710)
import Control.Applicative ((<$>), (<*>))
#endif
-- | Feed a child's stdin from a conduit consuming strict 'ByteString' chunks.
--
-- A pipe is requested for the stream ('CreatePipe'); the handle that comes
-- back is switched to 'NoBuffering' and wrapped with 'sinkHandle'.  This
-- instance exposes no separate close action for the handle.
instance (r ~ (), MonadIO m, i ~ ByteString) => InputSource (ConduitM i o m r) where
    isStdStream = (fromPipe, Just CreatePipe)
      where
        fromPipe (Just h) = hSetBuffering h NoBuffering $> sinkHandle h
        -- A pipe was requested above, so a missing handle means the process
        -- layer broke its contract; fail with a message that says so.
        fromPipe Nothing =
            error "Data.Conduit.Process.isStdStream: no handle supplied for CreatePipe"
-- | Like the plain 'ConduitM' stdin instance, but paired with an action that
-- closes the underlying handle.
--
-- The first component writes to the pipe via 'sinkHandle' (after the handle is
-- set to 'NoBuffering'); the second component runs 'hClose' on that same
-- handle in any 'MonadIO'.
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, i ~ ByteString) => InputSource (ConduitM i o m r, n r') where
    isStdStream = (fromPipe, Just CreatePipe)
      where
        fromPipe (Just h) =
            hSetBuffering h NoBuffering $> (sinkHandle h, liftIO $ hClose h)
        -- A pipe was requested above, so a missing handle means the process
        -- layer broke its contract; fail with a message that says so.
        fromPipe Nothing =
            error "Data.Conduit.Process.isStdStream: no handle supplied for CreatePipe"
-- | Wrapper marking a conduit that consumes 'Builder' values as a process
-- input.
--
-- The 'InputSource' instances for this type in this module request a pipe and
-- write to it with 'sinkHandleBuilder'.  The plain instance returns only the
-- sink; the tuple instance @(BuilderInput o m r, n r')@ additionally returns
-- an action that closes the handle.
newtype BuilderInput o m r = BuilderInput (ConduitM Builder o m r)
-- | Wrapper marking a conduit that consumes @'Flush' 'ByteString'@ values as a
-- process input.
--
-- The 'InputSource' instances for this type in this module request a pipe and
-- write to it with 'sinkHandleFlush'.  The plain instance returns only the
-- sink; the tuple instance @(FlushInput o m r, n r')@ additionally returns an
-- action that closes the handle.
newtype FlushInput o m r = FlushInput (ConduitM (Flush ByteString) o m r)
-- | Feed a child's stdin from a conduit consuming 'Builder' values.
--
-- A pipe is requested for the stream and wrapped with 'sinkHandleBuilder'.
-- Unlike the 'ByteString' instances, the handle's buffering mode is left
-- untouched here.
instance (MonadIO m, r ~ ()) => InputSource (BuilderInput o m r) where
    isStdStream = (fromPipe, Just CreatePipe)
      where
        fromPipe (Just h) = return $ BuilderInput $ sinkHandleBuilder h
        -- A pipe was requested above, so a missing handle means the process
        -- layer broke its contract; fail with a message that says so.
        fromPipe Nothing =
            error "Data.Conduit.Process.isStdStream: no handle supplied for CreatePipe"
-- | 'BuilderInput' stdin paired with an action that closes the handle.
--
-- The first component writes to the pipe via 'sinkHandleBuilder'; the second
-- runs 'hClose' on the same handle in any 'MonadIO'.
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (BuilderInput o m r, n r') where
    isStdStream = (fromPipe, Just CreatePipe)
      where
        fromPipe (Just h) =
            return (BuilderInput $ sinkHandleBuilder h, liftIO $ hClose h)
        -- A pipe was requested above, so a missing handle means the process
        -- layer broke its contract; fail with a message that says so.
        fromPipe Nothing =
            error "Data.Conduit.Process.isStdStream: no handle supplied for CreatePipe"
-- | Feed a child's stdin from a conduit consuming @'Flush' 'ByteString'@
-- values.
--
-- A pipe is requested for the stream and wrapped with 'sinkHandleFlush'.
-- Unlike the plain 'ByteString' instances, the handle's buffering mode is
-- left untouched here.
instance (MonadIO m, r ~ ()) => InputSource (FlushInput o m r) where
    isStdStream = (fromPipe, Just CreatePipe)
      where
        fromPipe (Just h) = return $ FlushInput $ sinkHandleFlush h
        -- A pipe was requested above, so a missing handle means the process
        -- layer broke its contract; fail with a message that says so.
        fromPipe Nothing =
            error "Data.Conduit.Process.isStdStream: no handle supplied for CreatePipe"
-- | 'FlushInput' stdin paired with an action that closes the handle.
--
-- The first component writes to the pipe via 'sinkHandleFlush'; the second
-- runs 'hClose' on the same handle in any 'MonadIO'.
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (FlushInput o m r, n r') where
    isStdStream = (fromPipe, Just CreatePipe)
      where
        fromPipe (Just h) =
            return (FlushInput $ sinkHandleFlush h, liftIO $ hClose h)
        -- A pipe was requested above, so a missing handle means the process
        -- layer broke its contract; fail with a message that says so.
        fromPipe Nothing =
            error "Data.Conduit.Process.isStdStream: no handle supplied for CreatePipe"
-- | Read a child's stdout or stderr as a conduit producing strict
-- 'ByteString' chunks.
--
-- A pipe is requested for the stream ('CreatePipe'); the handle that comes
-- back is switched to 'NoBuffering' and wrapped with 'sourceHandle'.  This
-- instance exposes no separate close action for the handle.
instance (r ~ (), MonadIO m, o ~ ByteString) => OutputSink (ConduitM i o m r) where
    osStdStream = (fromPipe, Just CreatePipe)
      where
        fromPipe (Just h) = hSetBuffering h NoBuffering $> sourceHandle h
        -- A pipe was requested above, so a missing handle means the process
        -- layer broke its contract; fail with a message that says so.
        fromPipe Nothing =
            error "Data.Conduit.Process.osStdStream: no handle supplied for CreatePipe"
-- | Like the plain 'ConduitM' output instance, but paired with an action that
-- closes the underlying handle.
--
-- The first component reads from the pipe via 'sourceHandle' (after the handle
-- is set to 'NoBuffering'); the second component runs 'hClose' on that same
-- handle in any 'MonadIO'.
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, o ~ ByteString) => OutputSink (ConduitM i o m r, n r') where
    osStdStream = (fromPipe, Just CreatePipe)
      where
        fromPipe (Just h) =
            hSetBuffering h NoBuffering $> (sourceHandle h, liftIO $ hClose h)
        -- A pipe was requested above, so a missing handle means the process
        -- layer broke its contract; fail with a message that says so.
        fromPipe Nothing =
            error "Data.Conduit.Process.osStdStream: no handle supplied for CreatePipe"
-- | Run the process described by a 'CreateProcess' and stream its stdout into
-- the given consumer.
--
-- stdin and stderr are both requested as 'ClosedStream'.  Once the consumer
-- finishes, the stdout handle is closed and the call waits for the process to
-- exit.
--
-- Returns the process's 'ExitCode' together with the consumer's result.
sourceProcessWithConsumer :: MonadIO m
                          => CreateProcess
                          -> ConduitT ByteString Void m a -- ^ stdout consumer
                          -> m (ExitCode, a)
sourceProcessWithConsumer cp consumer = do
    (ClosedStream, (source, close), ClosedStream, cph) <- streamingProcess cp
    res <- runConduit $ source .| consumer
    -- Release the stdout pipe before blocking on process exit.
    close
    ec <- waitForStreamingProcess cph
    return (ec, res)
-- | Like 'sourceProcessWithConsumer', but takes a command line that is run
-- through 'shell' instead of a ready-made 'CreateProcess'.
--
-- Returns the process's 'ExitCode' together with the consumer's result.
sourceCmdWithConsumer :: MonadIO m
                      => String                       -- ^ shell command
                      -> ConduitT ByteString Void m a -- ^ stdout consumer
                      -> m (ExitCode, a)
sourceCmdWithConsumer cmd = sourceProcessWithConsumer (shell cmd)
-- | Run the process described by a 'CreateProcess', feeding its stdin from a
-- producer and draining stdout and stderr into two consumers.
--
-- The three conduits run concurrently.  stdin is closed as soon as the
-- producer side finishes (normally or with an exception); stdout and stderr
-- are closed once all three have finished.  If anything throws, the process
-- is terminated before the exception propagates.  Otherwise the call waits
-- for the process to exit.
--
-- Returns the 'ExitCode' and the results of the stdout and stderr consumers.
sourceProcessWithStreams
  :: MonadUnliftIO m
  => CreateProcess
  -> ConduitT () ByteString m ()   -- ^ stdin producer
  -> ConduitT ByteString Void m a  -- ^ stdout consumer
  -> ConduitT ByteString Void m b  -- ^ stderr consumer
  -> m (ExitCode, a, b)
sourceProcessWithStreams cp producerStdin consumerStdout consumerStderr =
  withUnliftIO $ \u -> do
    (  (sinkStdin, closeStdin)
     , (sourceStdout, closeStdout)
     , (sourceStderr, closeStderr)
     , sph) <- streamingProcess cp
    (_, resStdout, resStderr) <-
      runConcurrently (
        (,,)
        <$> Concurrently ((unliftIO u $ runConduit $ producerStdin .| sinkStdin) `finally` closeStdin)
        <*> Concurrently (unliftIO u $ runConduit $ sourceStdout .| consumerStdout)
        <*> Concurrently (unliftIO u $ runConduit $ sourceStderr .| consumerStderr))
      -- Nest the two closes so stderr is still closed if closing stdout throws.
      `finally` (closeStdout `finally` closeStderr)
      `onException` terminateStreamingProcess sph
    ec <- waitForStreamingProcess sph
    return (ec, resStdout, resStderr)
-- | Like 'sourceProcessWithStreams', but takes a command line that is run
-- through 'shell' instead of a ready-made 'CreateProcess'.
--
-- Returns the 'ExitCode' and the results of the stdout and stderr consumers.
sourceCmdWithStreams
  :: MonadUnliftIO m
  => String                        -- ^ shell command
  -> ConduitT () ByteString m ()   -- ^ stdin producer
  -> ConduitT ByteString Void m a  -- ^ stdout consumer
  -> ConduitT ByteString Void m b  -- ^ stderr consumer
  -> m (ExitCode, a, b)
sourceCmdWithStreams cmd = sourceProcessWithStreams (shell cmd)
-- | Start a process, hand its three standard streams to a callback, and
-- insist that the process exits successfully.
--
-- * The streaming process handle is always released with
--   'closeStreamingProcessHandle', via 'bracket'.
-- * If the callback throws, the process is terminated and the exception is
--   rethrown.
-- * After the callback returns, the call waits for the process to exit; any
--   exit code other than 'ExitSuccess' is raised as
--   'ProcessExitedUnsuccessfully'.
--
-- On success the callback's result is returned.
withCheckedProcessCleanup
    :: ( InputSource stdin
       , OutputSink stderr
       , OutputSink stdout
       , MonadUnliftIO m
       )
    => CreateProcess
    -> (stdin -> stdout -> stderr -> m b)
    -> m b
withCheckedProcessCleanup cp f = withRunInIO $ \run -> bracket
    (streamingProcess cp)
    (\(_, _, _, sph) -> closeStreamingProcessHandle sph)
    $ \(x, y, z, sph) -> do
        res <- run (f x y z) `onException` terminateStreamingProcess sph
        ec <- waitForStreamingProcess sph
        if ec == ExitSuccess
            then return res
            else throwIO $ ProcessExitedUnsuccessfully cp ec
-- | Call 'terminateProcess' on the raw process handle underlying a
-- 'StreamingProcessHandle', lifted into any 'MonadIO'.
terminateStreamingProcess :: MonadIO m => StreamingProcessHandle -> m ()
terminateStreamingProcess = liftIO . terminateProcess . streamingProcessHandleRaw