{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Server.Worker (
    runServer,
) where

import Control.Concurrent.STM
import Data.IORef
import Network.HTTP.Semantics
import Network.HTTP.Semantics.IO
import Network.HTTP.Semantics.Server
import Network.HTTP.Semantics.Server.Internal
import Network.HTTP.Types
import qualified System.ThreadManager as T

import Imports hiding (insert)
import Network.HTTP2.Frame
import Network.HTTP2.H2

----------------------------------------------------------------

-- | Launch a worker for one request stream.
--
--   Forks a managed thread (labelled with the stream number) that runs the
--   user-supplied 'Server' handler, handing it 'sendResponse' as the
--   responder, and afterwards calls 'adjustRxWindow' for the stream.
runServer :: Config -> Server -> Launch
runServer conf server ctx@Context{..} strm req =
    T.forkManagedTimeout threadManager label $ \th -> do
        let req' = pauseRequestBody th
            aux = Aux th mySockAddr peerSockAddr
            request = Request req'
        lc <- newLoopCheck strm Nothing
        server request aux $ sendResponse conf ctx lc strm request
        adjustRxWindow ctx strm
  where
    label = "H2 response sender for stream " ++ show (streamNumber strm)
    -- Replace the request's body reader with one that brackets every read
    -- with 'T.pause' / 'T.resume' on the thread's handle
    -- (presumably so that waiting for body data does not count against the
    -- thread's timeout -- confirm against "System.ThreadManager").
    pauseRequestBody th = req{inpObjBody = readBody'}
      where
        readBody = inpObjBody req
        readBody' = do
            T.pause th
            bs <- readBody
            T.resume th
            return bs

----------------------------------------------------------------

-- | This function is passed to workers.
--   They also pass 'Response's from a server to this function.
--   This function enqueues commands for the HTTP/2 sender.
--
--   Any push promises are started first via 'pushStream'; if that yields a
--   wait action it is run before the response itself is handed to
--   'sendHeaderBody'.
sendResponse
    :: Config
    -> Context
    -> LoopCheck
    -> Stream
    -> Request
    -> Response
    -> [PushPromise]
    -> IO ()
sendResponse conf ctx lc strm (Request req) (Response rsp) pps = do
    mwait <- pushStream conf ctx strm reqvt pps
    case mwait of
        Nothing -> return ()
        Just wait -> wait -- all pushes are sent
    sendHeaderBody conf ctx lc strm rsp
  where
    -- Only the value table of the request headers is needed here.
    (_, reqvt) = inpObjHeaders req

----------------------------------------------------------------

-- | Start one server-push thread per 'PushPromise' attached to a response.
--
--   Returns 'Nothing' when nothing is pushed: the promise list is empty,
--   the peer's settings have 'enablePush' off, or the request carries no
--   scheme or no authority/host from which the promised request could be
--   built.  Otherwise returns an action that blocks until every push
--   thread has bumped the shared counter.
--
--   The scheme and authority are checked up front rather than extracted
--   with a partial function, so a request lacking them simply gets no
--   pushes instead of raising when the promised header list is forced.
pushStream
    :: Config
    -> Context
    -> Stream -- parent stream
    -> ValueTable -- request
    -> [PushPromise]
    -> IO (Maybe (IO ()))
pushStream _ _ _ _ [] = return Nothing
pushStream conf ctx@Context{..} pstrm reqvt pps0 = do
    pushable <- enablePush <$> readIORef peerSettings
    case (pushable, mscheme, mauth) of
        (True, Just scheme, Just auth) -> do
            tvar <- newTVarIO 0
            mapM_ (forkPush scheme auth tvar) pps0
            -- pps0 is non-empty in this equation, so there is always
            -- at least one push to wait for.
            return $ Just $ waiter (length pps0) tvar
        _ -> return Nothing
  where
    mscheme = getFieldValue tokenScheme reqvt
    -- Fall back to the Host field when no authority is present.
    mauth = getFieldValue tokenAuthority reqvt <|> getFieldValue tokenHost reqvt

    increment :: TVar Int -> IO ()
    increment tvar = atomically $ modifyTVar' tvar (+ 1)

    -- Checking if all push are done.
    waiter :: Int -> TVar Int -> IO ()
    waiter lim tvar = atomically $ do
        n <- readTVar tvar
        check (n >= lim)

    -- Fork a managed thread that opens the pushed stream, enqueues the
    -- push promise for the sender and then sends the promised response.
    forkPush scheme auth tvar pp =
        T.forkManaged threadManager "H2 server push" $ do
            (pid, newstrm) <- makePushStream ctx pstrm
            let promiseRequest =
                    [ (tokenMethod, methodGet)
                    , (tokenScheme, scheme)
                    , (tokenAuthority, auth)
                    , (tokenPath, promiseRequestPath pp)
                    ]
                ot = OPush promiseRequest pid
                Response rsp = promiseResponse pp
            -- The counter is bumped before the promise is handed to the
            -- sender, matching the order the waiter relies on.
            increment tvar
            lc <- newLoopCheck newstrm Nothing
            syncWithSender ctx newstrm ot lc
            sendHeaderBody conf ctx lc newstrm rsp

----------------------------------------------------------------

-- | Open a new even-numbered stream for a server push.
--
--   Returns the /parent/ stream's number (not the new stream's) together
--   with the newly opened stream; the identifier that 'openEvenStreamWait'
--   returns alongside the stream is discarded.
makePushStream :: Context -> Stream -> IO (StreamId, Stream)
makePushStream ctx pstrm = do
    -- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
    (_, newstrm) <- openEvenStreamWait ctx
    return (streamNumber pstrm, newstrm)

----------------------------------------------------------------

-- | Turn an 'OutObj' into an 'OHeader' output and hand it to the sender.
--
--   The body variant decides which "next" filler (if any) accompanies the
--   headers.  For the two streaming variants a streaming thread is started
--   through 'sendStreaming' and its queue is also recorded in the
--   'LoopCheck' passed to 'syncWithSender'.
sendHeaderBody
    :: Config
    -> Context
    -> LoopCheck
    -> Stream
    -> OutObj
    -> IO ()
sendHeaderBody Config{..} ctx lc strm OutObj{..} = do
    (mnext, mtbq) <- case outObjBody of
        OutBodyNone -> return (Nothing, Nothing)
        OutBodyFile (FileSpec path fileoff bytecount) -> do
            (pread, sentinel) <- confPositionReadMaker path
            let next = fillFileBodyGetNext pread fileoff bytecount sentinel
            return (Just next, Nothing)
        OutBodyBuilder builder ->
            return (Just (fillBuilderBodyGetNext builder), Nothing)
        OutBodyStreaming strmbdy ->
            -- Adapt the two-argument streaming body to the interface form.
            streaming $ \OutBodyIface{..} -> strmbdy outBodyPush outBodyFlush
        OutBodyStreamingIface strmbdy ->
            streaming strmbdy
    let lc' = lc{lcTBQ = mtbq}
    syncWithSender ctx strm (OHeader outObjHeaders mnext outObjTrailers) lc'
  where
    -- Start the streaming thread and derive the filler from its queue.
    streaming body = do
        q <- sendStreaming ctx strm body
        return (Just (nextForStreaming q), Just q)

----------------------------------------------------------------

-- | Run a streaming response body in its own managed thread.
--
--   Creates a bounded queue of 'StreamingChunk's, forks a thread that runs
--   the given body against an 'OutBodyIface' backed by that queue, and
--   returns the queue immediately.  The interface handed to the body has
--   its 'outBodyPush' and 'outBodyPushFinal' wrapped so that each push is
--   bracketed by 'T.pause' / 'T.resume' on the thread's handle
--   (presumably so that blocking on a full queue does not count against
--   the thread's timeout -- confirm against "System.ThreadManager").
sendStreaming
    :: Context
    -> Stream
    -> (OutBodyIface -> IO ())
    -> IO (TBQueue StreamingChunk)
sendStreaming Context{..} strm strmbdy = do
    tbq <- newTBQueueIO 10 -- fixme: hard coding: 10
    T.forkManagedTimeout threadManager label $ \th ->
        withOutBodyIface tbq id $ \iface -> do
            let -- Run one of the original push actions between pause/resume.
                paused pushAction b = do
                    T.pause th
                    pushAction iface b
                    T.resume th
                iface' =
                    iface
                        { outBodyPush = paused outBodyPush
                        , outBodyPushFinal = paused outBodyPushFinal
                        }
            strmbdy iface'
    return tbq
  where
    label = "H2 response streaming sender for " ++ show (streamNumber strm)