{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Client.Run where

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import qualified Data.ByteString.UTF8 as UTF8
import Data.IORef
import Data.IP (IPv6)
import Network.Control (RxFlow (..), defaultMaxData)
import Network.HTTP.Semantics.Client
import Network.HTTP.Semantics.Client.Internal
import Network.HTTP.Semantics.IO
import Network.Socket (SockAddr)
import qualified System.ThreadManager as T
import Text.Read (readMaybe)

import Imports
import Network.HTTP2.Frame
import Network.HTTP2.H2

-- | Client configuration
data ClientConfig = ClientConfig
    { ClientConfig -> Scheme
scheme :: Scheme
    -- ^ https or http
    , ClientConfig -> Authority
authority :: Authority
    -- ^ Server name
    , ClientConfig -> Int
cacheLimit :: Int
    -- ^ The maximum number of incoming streams on the net
    , ClientConfig -> Int
connectionWindowSize :: WindowSize
    -- ^ The window size of connection.
    , ClientConfig -> Settings
settings :: Settings
    -- ^ Settings
    }
    deriving (ClientConfig -> ClientConfig -> Bool
(ClientConfig -> ClientConfig -> Bool)
-> (ClientConfig -> ClientConfig -> Bool) -> Eq ClientConfig
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: ClientConfig -> ClientConfig -> Bool
== :: ClientConfig -> ClientConfig -> Bool
$c/= :: ClientConfig -> ClientConfig -> Bool
/= :: ClientConfig -> ClientConfig -> Bool
Eq, Int -> ClientConfig -> ShowS
[ClientConfig] -> ShowS
ClientConfig -> Authority
(Int -> ClientConfig -> ShowS)
-> (ClientConfig -> Authority)
-> ([ClientConfig] -> ShowS)
-> Show ClientConfig
forall a.
(Int -> a -> ShowS) -> (a -> Authority) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ClientConfig -> ShowS
showsPrec :: Int -> ClientConfig -> ShowS
$cshow :: ClientConfig -> Authority
show :: ClientConfig -> Authority
$cshowList :: [ClientConfig] -> ShowS
showList :: [ClientConfig] -> ShowS
Show)

-- | The default client config.
--
-- The @authority@ field will be used to set the HTTP2 @:authority@
-- pseudo-header. In most cases you will want to override it to be equal to
-- @host@.
--
-- Further background on @authority@:
-- [RFC 3986](https://datatracker.ietf.org/doc/html/rfc3986#section-3.2) also
-- allows @host:port@, and most servers will accept this too. However, when
-- using TLS, many servers will expect the TLS SNI server name and the
-- @:authority@ pseudo-header to be equal, and for TLS SNI the server name
-- should not include the port. Note that HTTP2 explicitly /disallows/ using
-- @userinfo\@@ as part of the authority.
--
-- >>> defaultClientConfig
-- ClientConfig {scheme = "http", authority = "localhost", cacheLimit = 64, connectionWindowSize = 16777216, settings = Settings {headerTableSize = 4096, enablePush = True, maxConcurrentStreams = Just 64, initialWindowSize = 262144, maxFrameSize = 16384, maxHeaderListSize = Nothing, pingRateLimit = 10, emptyFrameRateLimit = 4, settingsRateLimit = 4, rstRateLimit = 4}}
defaultClientConfig :: ClientConfig
defaultClientConfig :: ClientConfig
defaultClientConfig =
    ClientConfig
        { scheme :: Scheme
scheme = Scheme
"http"
        , authority :: Authority
authority = Authority
"localhost"
        , cacheLimit :: Int
cacheLimit = Int
64
        , connectionWindowSize :: Int
connectionWindowSize = Int
defaultMaxData
        , settings :: Settings
settings = Settings
defaultSettings
        }

-- | Running HTTP/2 client.
run :: ClientConfig -> Config -> Client a -> IO a
run :: forall a. ClientConfig -> Config -> Client a -> IO a
run cconf :: ClientConfig
cconf@ClientConfig{Int
Authority
Scheme
Settings
scheme :: ClientConfig -> Scheme
authority :: ClientConfig -> Authority
cacheLimit :: ClientConfig -> Int
connectionWindowSize :: ClientConfig -> Int
settings :: ClientConfig -> Settings
scheme :: Scheme
authority :: Authority
cacheLimit :: Int
connectionWindowSize :: Int
settings :: Settings
..} Config
conf Client a
client = do
    ctx <- ClientConfig -> Config -> IO Context
setup ClientConfig
cconf Config
conf
    runH2 conf ctx $ runClient ctx
  where
    serverMaxStreams :: Context -> IO Int
serverMaxStreams Context
ctx = do
        mx <- Settings -> Maybe Int
maxConcurrentStreams (Settings -> Maybe Int) -> IO Settings -> IO (Maybe Int)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef Settings -> IO Settings
forall a. IORef a -> IO a
readIORef (Context -> IORef Settings
peerSettings Context
ctx)
        case mx of
            Maybe Int
Nothing -> Int -> IO Int
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Int
forall a. Bounded a => a
maxBound
            Just Int
x -> Int -> IO Int
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Int
x
    possibleClientStream :: Context -> IO Int
possibleClientStream Context
ctx = do
        x <- Context -> IO Int
serverMaxStreams Context
ctx
        n <- oddConc <$> readTVarIO (oddStreamTable ctx)
        return (x - n)
    aux :: Context -> Aux
aux Context
ctx =
        Aux
            { auxPossibleClientStreams :: IO Int
auxPossibleClientStreams = Context -> IO Int
possibleClientStream Context
ctx
            }
    clientCore :: Context -> Request -> (Response -> IO b) -> IO b
clientCore Context
ctx Request
req Response -> IO b
processResponse = do
        (strm, moutobj) <- Context
-> Scheme -> Authority -> Request -> IO (Stream, Maybe OutObj)
makeStream Context
ctx Scheme
scheme Authority
authority Request
req
        case moutobj of
            Maybe OutObj
Nothing -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            Just OutObj
outobj -> Config -> Context -> Stream -> OutObj -> Bool -> IO ()
sendRequest Config
conf Context
ctx Stream
strm OutObj
outobj Bool
False
        rsp <- getResponse strm
        x <- processResponse rsp
        adjustRxWindow ctx strm
        return x
    runClient :: Context -> IO a
runClient Context
ctx = Client a
client (Context -> Request -> (Response -> IO r) -> IO r
forall {b}. Context -> Request -> (Response -> IO b) -> IO b
clientCore Context
ctx) (Aux -> IO a) -> Aux -> IO a
forall a b. (a -> b) -> a -> b
$ Context -> Aux
aux Context
ctx

-- | Launching a receiver and a sender.
runIO :: ClientConfig -> Config -> (ClientIO -> IO (IO a)) -> IO a
runIO :: forall a. ClientConfig -> Config -> (ClientIO -> IO (IO a)) -> IO a
runIO cconf :: ClientConfig
cconf@ClientConfig{Int
Authority
Scheme
Settings
scheme :: ClientConfig -> Scheme
authority :: ClientConfig -> Authority
cacheLimit :: ClientConfig -> Int
connectionWindowSize :: ClientConfig -> Int
settings :: ClientConfig -> Settings
scheme :: Scheme
authority :: Authority
cacheLimit :: Int
connectionWindowSize :: Int
settings :: Settings
..} conf :: Config
conf@Config{Int
Buffer
SockAddr
Manager
Int -> IO Scheme
PositionReadMaker
Scheme -> IO ()
confWriteBuffer :: Buffer
confBufferSize :: Int
confSendAll :: Scheme -> IO ()
confReadN :: Int -> IO Scheme
confPositionReadMaker :: PositionReadMaker
confTimeoutManager :: Manager
confMySockAddr :: SockAddr
confPeerSockAddr :: SockAddr
confPeerSockAddr :: Config -> SockAddr
confMySockAddr :: Config -> SockAddr
confTimeoutManager :: Config -> Manager
confPositionReadMaker :: Config -> PositionReadMaker
confReadN :: Config -> Int -> IO Scheme
confSendAll :: Config -> Scheme -> IO ()
confBufferSize :: Config -> Int
confWriteBuffer :: Config -> Buffer
..} ClientIO -> IO (IO a)
action = do
    ctx@Context{..} <- ClientConfig -> Config -> IO Context
setup ClientConfig
cconf Config
conf
    let putB Scheme
bs = TQueue Control -> Control -> IO ()
enqueueControl TQueue Control
controlQ (Control -> IO ()) -> Control -> IO ()
forall a b. (a -> b) -> a -> b
$ Maybe SettingsList -> [Scheme] -> Control
CFrames Maybe SettingsList
forall a. Maybe a
Nothing [Scheme
bs]
        putR Request
req = do
            (strm, moutobj) <- Context
-> Scheme -> Authority -> Request -> IO (Stream, Maybe OutObj)
makeStream Context
ctx Scheme
scheme Authority
authority Request
req
            case moutobj of
                Maybe OutObj
Nothing -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                Just OutObj
outobj -> Config -> Context -> Stream -> OutObj -> Bool -> IO ()
sendRequest Config
conf Context
ctx Stream
strm OutObj
outobj Bool
True
            return (streamNumber strm, strm)
        get = Stream -> IO Response
getResponse
        create = Context -> IO (Int, Stream)
openOddStreamWait Context
ctx
    runClient <-
        action $ ClientIO confMySockAddr confPeerSockAddr putR get putB create
    runH2 conf ctx runClient

getResponse :: Stream -> IO Response
getResponse :: Stream -> IO Response
getResponse Stream
strm = do
    mRsp <- MVar (Either SomeException InpObj)
-> IO (Either SomeException InpObj)
forall a. MVar a -> IO a
takeMVar (MVar (Either SomeException InpObj)
 -> IO (Either SomeException InpObj))
-> MVar (Either SomeException InpObj)
-> IO (Either SomeException InpObj)
forall a b. (a -> b) -> a -> b
$ Stream -> MVar (Either SomeException InpObj)
streamInput Stream
strm
    case mRsp of
        Left SomeException
err -> SomeException -> IO Response
forall e a. (HasCallStack, Exception e) => e -> IO a
throwIO SomeException
err
        Right InpObj
rsp -> Response -> IO Response
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Response -> IO Response) -> Response -> IO Response
forall a b. (a -> b) -> a -> b
$ InpObj -> Response
Response InpObj
rsp

setup :: ClientConfig -> Config -> IO Context
setup :: ClientConfig -> Config -> IO Context
setup ClientConfig{Int
Authority
Scheme
Settings
scheme :: ClientConfig -> Scheme
authority :: ClientConfig -> Authority
cacheLimit :: ClientConfig -> Int
connectionWindowSize :: ClientConfig -> Int
settings :: ClientConfig -> Settings
scheme :: Scheme
authority :: Authority
cacheLimit :: Int
connectionWindowSize :: Int
settings :: Settings
..} conf :: Config
conf@Config{Int
Buffer
SockAddr
Manager
Int -> IO Scheme
PositionReadMaker
Scheme -> IO ()
confPeerSockAddr :: Config -> SockAddr
confMySockAddr :: Config -> SockAddr
confTimeoutManager :: Config -> Manager
confPositionReadMaker :: Config -> PositionReadMaker
confReadN :: Config -> Int -> IO Scheme
confSendAll :: Config -> Scheme -> IO ()
confBufferSize :: Config -> Int
confWriteBuffer :: Config -> Buffer
confWriteBuffer :: Buffer
confBufferSize :: Int
confSendAll :: Scheme -> IO ()
confReadN :: Int -> IO Scheme
confPositionReadMaker :: PositionReadMaker
confTimeoutManager :: Manager
confMySockAddr :: SockAddr
confPeerSockAddr :: SockAddr
..} = do
    let clientInfo :: RoleInfo
clientInfo = Scheme -> Authority -> RoleInfo
newClientInfo Scheme
scheme Authority
authority
    ctx <-
        RoleInfo
-> Config -> Int -> Int -> Settings -> Manager -> IO Context
newContext
            RoleInfo
clientInfo
            Config
conf
            Int
cacheLimit
            Int
connectionWindowSize
            Settings
settings
            Manager
confTimeoutManager
    exchangeSettings ctx
    return ctx

runH2 :: Config -> Context -> IO a -> IO a
runH2 :: forall a. Config -> Context -> IO a -> IO a
runH2 Config
conf Context
ctx IO a
runClient = do
    ThreadManager -> IO a -> (Maybe SomeException -> IO ()) -> IO a
forall a.
ThreadManager -> IO a -> (Maybe SomeException -> IO ()) -> IO a
T.stopAfter ThreadManager
mgr (IO a -> IO (Either SomeException a)
forall e a. Exception e => IO a -> IO (Either e a)
try IO a
runAll IO (Either SomeException a)
-> (Either SomeException a -> IO a) -> IO a
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Config -> Either SomeException a -> IO a
forall a. Config -> Either SomeException a -> IO a
closureClient Config
conf) ((Maybe SomeException -> IO ()) -> IO a)
-> (Maybe SomeException -> IO ()) -> IO a
forall a b. (a -> b) -> a -> b
$ \Maybe SomeException
res ->
        TVar OddStreamTable
-> TVar EvenStreamTable -> Maybe SomeException -> IO ()
closeAllStreams (Context -> TVar OddStreamTable
oddStreamTable Context
ctx) (Context -> TVar EvenStreamTable
evenStreamTable Context
ctx) Maybe SomeException
res
  where
    mgr :: ThreadManager
mgr = Context -> ThreadManager
threadManager Context
ctx
    runReceiver :: IO ()
runReceiver = Context -> Config -> IO ()
frameReceiver Context
ctx Config
conf
    runSender :: IO ()
runSender = Context -> Config -> IO ()
frameSender Context
ctx Config
conf
    runBackgroundThreads :: IO ()
runBackgroundThreads = do
        Authority -> IO ()
labelMe Authority
"H2 runBackgroundThreads"
        IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO ()
concurrently_ IO ()
runReceiver IO ()
runSender
    runAll :: IO a
runAll = do
        er <- IO () -> IO a -> IO (Either () a)
forall a b. IO a -> IO b -> IO (Either a b)
race IO ()
runBackgroundThreads IO a
runClient
        case er of
            Left () -> IO a
forall a. HasCallStack => a
undefined
            Right a
r -> a -> IO a
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
r

makeStream
    :: Context
    -> Scheme
    -> Authority
    -> Request
    -> IO (Stream, Maybe OutObj)
makeStream :: Context
-> Scheme -> Authority -> Request -> IO (Stream, Maybe OutObj)
makeStream ctx :: Context
ctx@Context{TVar Int
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef Int
IORef (Maybe Int)
IORef RxFlow
IORef Settings
DynamicTable
Rate
Settings
TQueue Control
TQueue Output
SockAddr
ThreadManager
RoleInfo
Role
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
threadManager :: Context -> ThreadManager
peerSockAddr :: Context -> SockAddr
mySockAddr :: Context -> SockAddr
rstRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
settingsRate :: Context -> Rate
pingRate :: Context -> Rate
rxFlow :: Context -> IORef RxFlow
txFlow :: Context -> TVar TxFlow
decodeDynamicTable :: Context -> DynamicTable
encodeDynamicTable :: Context -> DynamicTable
controlQ :: Context -> TQueue Control
outputQStreamID :: Context -> TVar Int
outputQ :: Context -> TQueue Output
outputBufferLimit :: Context -> IORef Int
peerStreamId :: Context -> IORef Int
myStreamId :: Context -> TVar Int
continued :: Context -> IORef (Maybe Int)
evenStreamTable :: Context -> TVar EvenStreamTable
myFirstSettings :: Context -> IORef Bool
mySettings :: Context -> Settings
roleInfo :: Context -> RoleInfo
role :: Context -> Role
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe Int)
myStreamId :: TVar Int
peerStreamId :: IORef Int
outputBufferLimit :: IORef Int
outputQ :: TQueue Output
outputQStreamID :: TVar Int
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: ThreadManager
..} Scheme
scheme Authority
auth (Request OutObj
req) = do
    -- Checking push promises
    let hdr0 :: [Header]
hdr0 = OutObj -> [Header]
outObjHeaders OutObj
req
        method :: Scheme
method = Scheme -> Maybe Scheme -> Scheme
forall a. a -> Maybe a -> a
fromMaybe (Authority -> Scheme
forall a. HasCallStack => Authority -> a
error Authority
"makeStream:method") (Maybe Scheme -> Scheme) -> Maybe Scheme -> Scheme
forall a b. (a -> b) -> a -> b
$ HeaderName -> [Header] -> Maybe Scheme
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup HeaderName
":method" [Header]
hdr0
        path :: Scheme
path = Scheme -> Maybe Scheme -> Scheme
forall a. a -> Maybe a -> a
fromMaybe (Authority -> Scheme
forall a. HasCallStack => Authority -> a
error Authority
"makeStream:path") (Maybe Scheme -> Scheme) -> Maybe Scheme -> Scheme
forall a b. (a -> b) -> a -> b
$ HeaderName -> [Header] -> Maybe Scheme
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup HeaderName
":path" [Header]
hdr0
    mstrm0 <- TVar EvenStreamTable -> Scheme -> Scheme -> IO (Maybe Stream)
lookupEvenCache TVar EvenStreamTable
evenStreamTable Scheme
method Scheme
path
    case mstrm0 of
        Just Stream
strm0 -> do
            TVar EvenStreamTable -> Scheme -> Scheme -> IO ()
deleteEvenCache TVar EvenStreamTable
evenStreamTable Scheme
method Scheme
path
            (Stream, Maybe OutObj) -> IO (Stream, Maybe OutObj)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream
strm0, Maybe OutObj
forall a. Maybe a
Nothing)
        Maybe Stream
Nothing -> do
            -- Arch/Sender is originally implemented for servers where
            -- the ordering of responses can be out-of-order.
            -- But for clients, the ordering must be maintained.
            -- To implement this, 'outputQStreamID' is used.
            let isIPv6 :: Bool
isIPv6 = Maybe IPv6 -> Bool
forall a. Maybe a -> Bool
isJust (Authority -> Maybe IPv6
forall a. Read a => Authority -> Maybe a
readMaybe Authority
auth :: Maybe IPv6)
                auth' :: Scheme
auth'
                    | Bool
isIPv6 = Scheme
"[" Scheme -> Scheme -> Scheme
forall a. Semigroup a => a -> a -> a
<> Authority -> Scheme
UTF8.fromString Authority
auth Scheme -> Scheme -> Scheme
forall a. Semigroup a => a -> a -> a
<> Scheme
"]"
                    | Bool
otherwise = Authority -> Scheme
UTF8.fromString Authority
auth
            let hdr1, hdr2 :: [Header]
                hdr1 :: [Header]
hdr1
                    | Scheme
scheme Scheme -> Scheme -> Bool
forall a. Eq a => a -> a -> Bool
/= Scheme
"" = (HeaderName
":scheme", Scheme
scheme) Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: [Header]
hdr0
                    | Bool
otherwise = [Header]
hdr0
                hdr2 :: [Header]
hdr2
                    | Authority
auth Authority -> Authority -> Bool
forall a. Eq a => a -> a -> Bool
/= Authority
"" = (HeaderName
":authority", Scheme
auth') Header -> [Header] -> [Header]
forall a. a -> [a] -> [a]
: [Header]
hdr1
                    | Bool
otherwise = [Header]
hdr1
                req' :: OutObj
req' = OutObj
req{outObjHeaders = hdr2}
            -- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
            (_sid, newstrm) <- Context -> IO (Int, Stream)
openOddStreamWait Context
ctx
            return (newstrm, Just req')

sendRequest :: Config -> Context -> Stream -> OutObj -> Bool -> IO ()
sendRequest :: Config -> Context -> Stream -> OutObj -> Bool -> IO ()
sendRequest Config{Int
Buffer
SockAddr
Manager
Int -> IO Scheme
PositionReadMaker
Scheme -> IO ()
confPeerSockAddr :: Config -> SockAddr
confMySockAddr :: Config -> SockAddr
confTimeoutManager :: Config -> Manager
confPositionReadMaker :: Config -> PositionReadMaker
confReadN :: Config -> Int -> IO Scheme
confSendAll :: Config -> Scheme -> IO ()
confBufferSize :: Config -> Int
confWriteBuffer :: Config -> Buffer
confWriteBuffer :: Buffer
confBufferSize :: Int
confSendAll :: Scheme -> IO ()
confReadN :: Int -> IO Scheme
confPositionReadMaker :: PositionReadMaker
confTimeoutManager :: Manager
confMySockAddr :: SockAddr
confPeerSockAddr :: SockAddr
..} ctx :: Context
ctx@Context{TVar Int
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef Int
IORef (Maybe Int)
IORef RxFlow
IORef Settings
DynamicTable
Rate
Settings
TQueue Control
TQueue Output
SockAddr
ThreadManager
RoleInfo
Role
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
threadManager :: Context -> ThreadManager
peerSockAddr :: Context -> SockAddr
mySockAddr :: Context -> SockAddr
rstRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
settingsRate :: Context -> Rate
pingRate :: Context -> Rate
rxFlow :: Context -> IORef RxFlow
txFlow :: Context -> TVar TxFlow
decodeDynamicTable :: Context -> DynamicTable
encodeDynamicTable :: Context -> DynamicTable
controlQ :: Context -> TQueue Control
outputQStreamID :: Context -> TVar Int
outputQ :: Context -> TQueue Output
outputBufferLimit :: Context -> IORef Int
peerStreamId :: Context -> IORef Int
myStreamId :: Context -> TVar Int
continued :: Context -> IORef (Maybe Int)
evenStreamTable :: Context -> TVar EvenStreamTable
myFirstSettings :: Context -> IORef Bool
mySettings :: Context -> Settings
roleInfo :: Context -> RoleInfo
role :: Context -> Role
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe Int)
myStreamId :: TVar Int
peerStreamId :: IORef Int
outputBufferLimit :: IORef Int
outputQ :: TQueue Output
outputQStreamID :: TVar Int
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: ThreadManager
..} Stream
strm OutObj{[Header]
OutBody
TrailersMaker
outObjHeaders :: OutObj -> [Header]
outObjHeaders :: [Header]
outObjBody :: OutBody
outObjTrailers :: TrailersMaker
outObjBody :: OutObj -> OutBody
outObjTrailers :: OutObj -> TrailersMaker
..} Bool
io = do
    let sid :: Int
sid = Stream -> Int
streamNumber Stream
strm
    (mnext, mtbq) <- case OutBody
outObjBody of
        OutBody
OutBodyNone -> (Maybe DynaNext, Maybe (TBQueue StreamingChunk))
-> IO (Maybe DynaNext, Maybe (TBQueue StreamingChunk))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe DynaNext
forall a. Maybe a
Nothing, Maybe (TBQueue StreamingChunk)
forall a. Maybe a
Nothing)
        OutBodyFile (FileSpec Authority
path FileOffset
fileoff FileOffset
bytecount) -> do
            (pread, sentinel) <- PositionReadMaker
confPositionReadMaker Authority
path
            let next = PositionRead -> FileOffset -> FileOffset -> Sentinel -> DynaNext
fillFileBodyGetNext PositionRead
pread FileOffset
fileoff FileOffset
bytecount Sentinel
sentinel
            return (Just next, Nothing)
        OutBodyBuilder Builder
builder -> do
            let next :: DynaNext
next = Builder -> DynaNext
fillBuilderBodyGetNext Builder
builder
            (Maybe DynaNext, Maybe (TBQueue StreamingChunk))
-> IO (Maybe DynaNext, Maybe (TBQueue StreamingChunk))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (DynaNext -> Maybe DynaNext
forall a. a -> Maybe a
Just DynaNext
next, Maybe (TBQueue StreamingChunk)
forall a. Maybe a
Nothing)
        OutBodyStreaming (Builder -> IO ()) -> IO () -> IO ()
strmbdy -> do
            q <- Context
-> Stream -> (OutBodyIface -> IO ()) -> IO (TBQueue StreamingChunk)
sendStreaming Context
ctx Stream
strm ((OutBodyIface -> IO ()) -> IO (TBQueue StreamingChunk))
-> (OutBodyIface -> IO ()) -> IO (TBQueue StreamingChunk)
forall a b. (a -> b) -> a -> b
$ \OutBodyIface
iface ->
                OutBodyIface -> forall x. IO x -> IO x
outBodyUnmask OutBodyIface
iface (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ (Builder -> IO ()) -> IO () -> IO ()
strmbdy (OutBodyIface -> Builder -> IO ()
outBodyPush OutBodyIface
iface) (OutBodyIface -> IO ()
outBodyFlush OutBodyIface
iface)
            let next = TBQueue StreamingChunk -> DynaNext
nextForStreaming TBQueue StreamingChunk
q
            return (Just next, Just q)
        OutBodyStreamingIface OutBodyIface -> IO ()
strmbdy -> do
            q <- Context
-> Stream -> (OutBodyIface -> IO ()) -> IO (TBQueue StreamingChunk)
sendStreaming Context
ctx Stream
strm OutBodyIface -> IO ()
strmbdy
            let next = TBQueue StreamingChunk -> DynaNext
nextForStreaming TBQueue StreamingChunk
q
            return (Just next, Just q)
    let ot = [Header] -> Maybe DynaNext -> TrailersMaker -> OutputType
OHeader [Header]
outObjHeaders Maybe DynaNext
mnext TrailersMaker
outObjTrailers
    if io
        then do
            let out = Context -> Stream -> OutputType -> Output
makeOutputIO Context
ctx Stream
strm OutputType
ot
            pushOutput sid out
        else do
            (pop, out) <- makeOutput strm ot
            pushOutput sid out
            lc <- newLoopCheck strm mtbq
            T.forkManaged threadManager label $ syncWithSender' ctx pop lc
  where
    label :: Authority
label = Authority
"H2 request sender for stream " Authority -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> Authority
forall a. Show a => a -> Authority
show (Stream -> Int
streamNumber Stream
strm)
    pushOutput :: Int -> Output -> IO ()
pushOutput Int
sid Output
out = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        sidOK <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
outputQStreamID
        check (sidOK == sid)
        writeTVar outputQStreamID (sid + 2)
        enqueueOutputSTM outputQ out

sendStreaming
    :: Context
    -> Stream
    -> (OutBodyIface -> IO ())
    -> IO (TBQueue StreamingChunk)
sendStreaming :: Context
-> Stream -> (OutBodyIface -> IO ()) -> IO (TBQueue StreamingChunk)
sendStreaming Context{TVar Int
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef Int
IORef (Maybe Int)
IORef RxFlow
IORef Settings
DynamicTable
Rate
Settings
TQueue Control
TQueue Output
SockAddr
ThreadManager
RoleInfo
Role
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
threadManager :: Context -> ThreadManager
peerSockAddr :: Context -> SockAddr
mySockAddr :: Context -> SockAddr
rstRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
settingsRate :: Context -> Rate
pingRate :: Context -> Rate
rxFlow :: Context -> IORef RxFlow
txFlow :: Context -> TVar TxFlow
decodeDynamicTable :: Context -> DynamicTable
encodeDynamicTable :: Context -> DynamicTable
controlQ :: Context -> TQueue Control
outputQStreamID :: Context -> TVar Int
outputQ :: Context -> TQueue Output
outputBufferLimit :: Context -> IORef Int
peerStreamId :: Context -> IORef Int
myStreamId :: Context -> TVar Int
continued :: Context -> IORef (Maybe Int)
evenStreamTable :: Context -> TVar EvenStreamTable
myFirstSettings :: Context -> IORef Bool
mySettings :: Context -> Settings
roleInfo :: Context -> RoleInfo
role :: Context -> Role
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe Int)
myStreamId :: TVar Int
peerStreamId :: IORef Int
outputBufferLimit :: IORef Int
outputQ :: TQueue Output
outputQStreamID :: TVar Int
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: ThreadManager
..} Stream
strm OutBodyIface -> IO ()
strmbdy = do
    tbq <- Natural -> IO (TBQueue StreamingChunk)
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
10 -- fixme: hard coding: 10
    T.forkManagedUnmask threadManager label $ \forall x. IO x -> IO x
unmask ->
        TBQueue StreamingChunk
-> (forall x. IO x -> IO x) -> (OutBodyIface -> IO ()) -> IO ()
forall r.
TBQueue StreamingChunk
-> (forall x. IO x -> IO x) -> (OutBodyIface -> IO r) -> IO r
withOutBodyIface TBQueue StreamingChunk
tbq IO a -> IO a
forall x. IO x -> IO x
unmask OutBodyIface -> IO ()
strmbdy
    return tbq
  where
    label :: Authority
label = Authority
"H2 request streaming sender for stream " Authority -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> Authority
forall a. Show a => a -> Authority
show (Stream -> Int
streamNumber Stream
strm)

exchangeSettings :: Context -> IO ()
exchangeSettings :: Context -> IO ()
exchangeSettings Context{TVar Int
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef Int
IORef (Maybe Int)
IORef RxFlow
IORef Settings
DynamicTable
Rate
Settings
TQueue Control
TQueue Output
SockAddr
ThreadManager
RoleInfo
Role
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
threadManager :: Context -> ThreadManager
peerSockAddr :: Context -> SockAddr
mySockAddr :: Context -> SockAddr
rstRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
settingsRate :: Context -> Rate
pingRate :: Context -> Rate
rxFlow :: Context -> IORef RxFlow
txFlow :: Context -> TVar TxFlow
decodeDynamicTable :: Context -> DynamicTable
encodeDynamicTable :: Context -> DynamicTable
controlQ :: Context -> TQueue Control
outputQStreamID :: Context -> TVar Int
outputQ :: Context -> TQueue Output
outputBufferLimit :: Context -> IORef Int
peerStreamId :: Context -> IORef Int
myStreamId :: Context -> TVar Int
continued :: Context -> IORef (Maybe Int)
evenStreamTable :: Context -> TVar EvenStreamTable
myFirstSettings :: Context -> IORef Bool
mySettings :: Context -> Settings
roleInfo :: Context -> RoleInfo
role :: Context -> Role
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe Int)
myStreamId :: TVar Int
peerStreamId :: IORef Int
outputBufferLimit :: IORef Int
outputQ :: TQueue Output
outputQStreamID :: TVar Int
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: ThreadManager
..} = do
    connRxWS <- RxFlow -> Int
rxfBufSize (RxFlow -> Int) -> IO RxFlow -> IO Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef RxFlow -> IO RxFlow
forall a. IORef a -> IO a
readIORef IORef RxFlow
rxFlow
    let frames = Settings -> Int -> [Scheme]
makeNegotiationFrames Settings
mySettings Int
connRxWS
        setframe = Maybe SettingsList -> [Scheme] -> Control
CFrames Maybe SettingsList
forall a. Maybe a
Nothing (Scheme
connectionPreface Scheme -> [Scheme] -> [Scheme]
forall a. a -> [a] -> [a]
: [Scheme]
frames)
    writeIORef myFirstSettings True
    enqueueControl controlQ setframe

data ClientIO = ClientIO
    { ClientIO -> SockAddr
cioMySockAddr :: SockAddr
    , ClientIO -> SockAddr
cioPeerSockAddr :: SockAddr
    , ClientIO -> Request -> IO (Int, Stream)
cioWriteRequest :: Request -> IO (StreamId, Stream)
    , ClientIO -> Stream -> IO Response
cioReadResponse :: Stream -> IO Response
    , ClientIO -> Scheme -> IO ()
cioWriteBytes :: ByteString -> IO ()
    , ClientIO -> IO (Int, Stream)
cioCreateStream :: IO (StreamId, Stream)
    }