{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.H2.Sync (
    LoopCheck (..),
    newLoopCheck,
    syncWithSender,
    syncWithSender',
    makeOutput,
    makeOutputIO,
    enqueueOutputSIO,
) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Network.Control
import Network.HTTP.Semantics.IO

import Network.HTTP2.H2.Context
import Network.HTTP2.H2.Queue
import Network.HTTP2.H2.Types

-- | Hand an output for a stream over to the output queue and then stay
-- in step with whoever consumes it.
--
-- An 'Output' and its matching wait action are built with 'makeOutput',
-- the 'Output' is placed on the context's 'outputQ', and control is then
-- passed to @syncWithSender'@, which keeps re-enqueueing continuation
-- outputs until the output's sync callback signals 'Done' or the
-- 'LoopCheck' says to stop.
syncWithSender
    :: Context
    -> Stream
    -> OutputType
    -> LoopCheck
    -> IO ()
syncWithSender ctx@Context{..} strm otyp lc = do
    (pop, out) <- makeOutput strm otyp
    enqueueOutput outputQ out
    syncWithSender' ctx pop lc

-- | Build an 'Output' for a stream together with the action that waits
-- for its synchronisation signal.
--
-- The two halves communicate through a fresh, initially empty 'MVar':
--
-- * the 'outputSync' callback stored in the 'Output' puts 'Done' when it
--   is given 'Nothing' and @'Cont' ot@ when it is given @'Just' ot@;
--
-- * the returned @IO Sync@ action takes that value, blocking until the
--   callback has been invoked.
makeOutput :: Stream -> OutputType -> IO (IO Sync, Output)
makeOutput strm otyp = do
    var <- newEmptyMVar
    let -- 'Nothing' means finished; 'Just' carries the next output.
        push = putMVar var . maybe Done Cont
        pop = takeMVar var
        out =
            Output
                { outputStream = strm
                , outputType = otyp
                , outputSync = push
                }
    return (pop, out)

-- | Build an 'Output' whose sync callback needs no waiting thread.
--
-- Unlike 'makeOutput', no 'MVar' is involved: the 'outputSync' callback
-- does nothing when given 'Nothing', and when given @'Just' ot@ it puts
-- @ot@ straight back on the context's 'outputQ'.
makeOutputIO :: Context -> Stream -> OutputType -> Output
makeOutputIO Context{..} strm otyp = out
  where
    push :: Maybe Output -> IO ()
    push Nothing = return ()
    -- Sender enqueues output again ignoring
    -- the stream TX window.
    push (Just ot) = enqueueOutput outputQ ot

    out :: Output
    out =
        Output
            { outputStream = strm
            , outputType = otyp
            , outputSync = push
            }

-- | Create a self-re-enqueueing 'Output' with 'makeOutputIO' and place it
-- on the context's 'outputQ'.
--
-- Returns as soon as the output has been enqueued; it does not wait for
-- the output to be processed.
enqueueOutputSIO :: Context -> Stream -> OutputType -> IO ()
enqueueOutputSIO ctx@Context{..} strm otyp =
    enqueueOutput outputQ (makeOutputIO ctx strm otyp)

-- | Wait loop paired with an 'Output' produced by 'makeOutput'.
--
-- Each iteration runs the supplied @IO Sync@ action:
--
-- * 'Done' ends the loop;
--
-- * @'Cont' newout@ consults 'checkLoop'. If that returns 'True',
--   @newout@ is put on the context's 'outputQ' and the loop continues;
--   if it returns 'False', the loop ends without enqueueing @newout@.
syncWithSender' :: Context -> IO Sync -> LoopCheck -> IO ()
syncWithSender' Context{..} pop lc = loop
  where
    loop :: IO ()
    loop = do
        s <- pop
        case s of
            Done -> return ()
            Cont newout -> do
                cont <- checkLoop lc
                when cont $ do
                    -- Re-enqueueing is justified here: 'checkLoop' only
                    -- returns 'True' once its waiting conditions hold.
                    enqueueOutput outputQ newout
                    loop

-- | Create a 'LoopCheck' for a stream.
--
-- The timeout flag starts out as 'False', the optional streaming queue is
-- stored as given, and the window is the stream's own 'streamTxFlow'.
newLoopCheck :: Stream -> Maybe (TBQueue StreamingChunk) -> IO LoopCheck
newLoopCheck strm mtbq = do
    tovar <- newTVarIO False
    return $
        LoopCheck
            { lcTBQ = mtbq
            , lcTimeout = tovar
            , lcWindow = streamTxFlow strm
            }

-- | The conditions 'checkLoop' inspects before an output is re-enqueued.
data LoopCheck = LoopCheck
    { lcTBQ :: Maybe (TBQueue StreamingChunk)
    -- ^ Optional streaming queue; when present, 'checkLoop' waits until
    -- it is non-empty.
    , lcTimeout :: TVar Bool
    -- ^ When this holds 'True', 'checkLoop' returns 'False' without
    -- waiting.
    , lcWindow :: TVar TxFlow
    -- ^ Flow state whose 'txWindowSize' must be positive before
    -- 'checkLoop' returns 'True'.
    }

-- | Decide whether the sync loop should continue.
--
-- Runs as a single STM transaction:
--
-- * if 'lcTimeout' holds 'True', the result is 'False' immediately;
--
-- * otherwise the transaction blocks until the streaming queue (if there
--   is one) is non-empty and the TX window size is positive, and the
--   result is then 'True'.
--
-- Because the flag is read in the same transaction as the waits, a change
-- to 'lcTimeout' while blocked makes the transaction re-run.
checkLoop :: LoopCheck -> IO Bool
checkLoop LoopCheck{..} = atomically $ do
    tout <- readTVar lcTimeout
    if tout
        then return False
        else do
            waitStreaming' lcTBQ
            waitStreamWindowSizeSTM lcWindow
            return True

-- | Block the enclosing STM transaction until the queue has at least one
-- element.
--
-- With 'Nothing' there is no queue to wait for, so this succeeds
-- immediately. The queue is only inspected; nothing is removed from it.
waitStreaming' :: Maybe (TBQueue a) -> STM ()
waitStreaming' Nothing = return ()
waitStreaming' (Just tbq) = do
    isEmpty <- isEmptyTBQueue tbq
    -- 'check' retries the transaction while the queue is still empty.
    check (not isEmpty)

-- | Block the enclosing STM transaction until the TX window size read
-- from the given 'TxFlow' variable is strictly positive.
--
-- The variable is only read, never modified.
waitStreamWindowSizeSTM :: TVar TxFlow -> STM ()
waitStreamWindowSizeSTM txf = do
    w <- txWindowSize <$> readTVar txf
    -- 'check' retries the transaction while the window is zero or negative.
    check (w > 0)