Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Simple sync #148

Merged
merged 4 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 50 additions & 41 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ run cconf@ClientConfig{..} conf client = do
{ auxPossibleClientStreams = possibleClientStream ctx
}
clientCore ctx req processResponse = do
strm <- sendRequest conf ctx scheme authority req
(strm, moutobj) <- makeStream ctx scheme authority req
case moutobj of
Nothing -> return ()
Just outobj -> sendRequest conf ctx strm outobj
rsp <- getResponse strm
x <- processResponse rsp
adjustRxWindow ctx strm
Expand All @@ -109,7 +112,10 @@ runIO cconf@ClientConfig{..} conf@Config{..} action = do
ctx@Context{..} <- setup cconf conf
let putB bs = enqueueControl controlQ $ CFrames Nothing [bs]
putR req = do
strm <- sendRequest conf ctx scheme authority req
(strm, moutobj) <- makeStream ctx scheme authority req
case moutobj of
Nothing -> return ()
Just outobj -> sendRequest conf ctx strm outobj
return (streamNumber strm, strm)
get = getResponse
create = openOddStreamWait ctx
Expand Down Expand Up @@ -165,23 +171,22 @@ runH2 conf ctx runClient = do
Left () -> do
wait runningClient

sendRequest
:: Config
-> Context
makeStream
:: Context
-> Scheme
-> Authority
-> Request
-> IO Stream
sendRequest conf ctx@Context{..} scheme auth (Request req) = do
-> IO (Stream, Maybe OutObj)
makeStream ctx@Context{..} scheme auth (Request req) = do
-- Checking push promises
let hdr0 = outObjHeaders req
method = fromMaybe (error "sendRequest:method") $ lookup ":method" hdr0
path = fromMaybe (error "sendRequest:path") $ lookup ":path" hdr0
method = fromMaybe (error "makeStream:method") $ lookup ":method" hdr0
path = fromMaybe (error "makeStream:path") $ lookup ":path" hdr0
mstrm0 <- lookupEvenCache evenStreamTable method path
case mstrm0 of
Just strm0 -> do
deleteEvenCache evenStreamTable method path
return strm0
return (strm0, Nothing)
Nothing -> do
-- Arch/Sender is originally implemented for servers where
-- the ordering of responses can be out-of-order.
Expand All @@ -200,38 +205,42 @@ sendRequest conf ctx@Context{..} scheme auth (Request req) = do
| otherwise = hdr1
req' = req{outObjHeaders = hdr2}
-- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
(sid, newstrm) <- openOddStreamWait ctx
sendHeaderBody conf ctx sid newstrm req'
return newstrm
(_sid, newstrm) <- openOddStreamWait ctx
return (newstrm, Just req')

sendHeaderBody :: Config -> Context -> StreamId -> Stream -> OutObj -> IO ()
sendHeaderBody Config{..} ctx@Context{..} sid newstrm OutObj{..} = do
(mnext, mtbq) <- case outObjBody of
OutBodyNone -> return (Nothing, Nothing)
OutBodyFile (FileSpec path fileoff bytecount) -> do
(pread, sentinel) <- confPositionReadMaker path
let next = fillFileBodyGetNext pread fileoff bytecount sentinel
return (Just next, Nothing)
OutBodyBuilder builder -> do
let next = fillBuilderBodyGetNext builder
return (Just next, Nothing)
OutBodyStreaming strmbdy -> do
q <- sendStreaming ctx newstrm $ \iface ->
outBodyUnmask iface $ strmbdy (outBodyPush iface) (outBodyFlush iface)
let next = nextForStreaming q
return (Just next, Just q)
OutBodyStreamingIface strmbdy -> do
q <- sendStreaming ctx newstrm strmbdy
let next = nextForStreaming q
return (Just next, Just q)
((var, sync), out) <-
prepareSync newstrm (OHeader outObjHeaders mnext outObjTrailers) mtbq
atomically $ do
sidOK <- readTVar outputQStreamID
check (sidOK == sid)
enqueueOutputSTM outputQ out
writeTVar outputQStreamID (sid + 2)
forkManaged threadManager "H2 worker" $ syncWithSender ctx newstrm var sync
sendRequest :: Config -> Context -> Stream -> OutObj -> IO ()
sendRequest Config{..} ctx@Context{..} strm OutObj{..} =
forkManaged threadManager label $ do
let sid = streamNumber strm
(mnext, mtbq) <- case outObjBody of
OutBodyNone -> return (Nothing, Nothing)
OutBodyFile (FileSpec path fileoff bytecount) -> do
(pread, sentinel) <- confPositionReadMaker path
let next = fillFileBodyGetNext pread fileoff bytecount sentinel
return (Just next, Nothing)
OutBodyBuilder builder -> do
let next = fillBuilderBodyGetNext builder
return (Just next, Nothing)
OutBodyStreaming strmbdy -> do
q <- sendStreaming ctx strm $ \iface ->
outBodyUnmask iface $ strmbdy (outBodyPush iface) (outBodyFlush iface)
let next = nextForStreaming q
return (Just next, Just q)
OutBodyStreamingIface strmbdy -> do
q <- sendStreaming ctx strm strmbdy
let next = nextForStreaming q
return (Just next, Just q)
let ot = OHeader outObjHeaders mnext outObjTrailers
(var, out) <- makeOutput strm ot
atomically $ do
sidOK <- readTVar outputQStreamID
check (sidOK == sid)
writeTVar outputQStreamID (sid + 2)
enqueueOutputSTM outputQ out
lc <- newLoopCheck strm mtbq
syncWithSender' ctx var lc
where
label = "H2 request sender for stream " ++ show (streamNumber strm)

sendStreaming
:: Context
Expand Down
165 changes: 43 additions & 122 deletions Network/HTTP2/H2/Manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ module Network.HTTP2.H2.Manager (
stopAfter,
forkManaged,
forkManagedUnmask,
timeoutKillThread,
timeoutClose,
withTimeout,
KilledByHttp2ThreadManager (..),
waitCounter0,
) where
Expand All @@ -28,14 +27,12 @@ import Imports

----------------------------------------------------------------

data Command
= Stop (MVar ()) (Maybe SomeException)
| Add ThreadId
| RegisterTimeout ThreadId T.Handle
| Delete ThreadId

-- | Manager to manage the thread and the timer.
data Manager = Manager (TQueue Command) (TVar Int) T.Manager
data Manager = Manager T.Manager (TVar ManagedThreads)

type ManagedThreads = Map ThreadId TimeoutHandle

----------------------------------------------------------------

data TimeoutHandle
= ThreadWithTimeout T.Handle
Expand All @@ -45,38 +42,23 @@ cancelTimeout :: TimeoutHandle -> IO ()
cancelTimeout (ThreadWithTimeout th) = T.cancel th
cancelTimeout ThreadWithoutTimeout = return ()

type ManagedThreads = Map ThreadId TimeoutHandle
----------------------------------------------------------------

-- | Starting a thread manager.
-- Its action is initially set to 'return ()' and should be set
-- by 'setAction'. This allows that the action can include
-- the manager itself.
start :: T.Manager -> IO Manager
start timmgr = do
q <- newTQueueIO
cnt <- newTVarIO 0
void $ forkIO $ do
labelMe "H2 thread manager"
go q Map.empty
return $ Manager q cnt timmgr
where
-- This runs in a separate thread whose ThreadId is not known by anyone
-- else, so it cannot be killed by asynchronous exceptions.
go :: TQueue Command -> ManagedThreads -> IO ()
go q threadMap0 = do
x <- atomically $ readTQueue q
case x of
Stop signalTimeoutsDisabled err -> do
kill signalTimeoutsDisabled threadMap0 err
Add newtid -> do
let threadMap = add newtid threadMap0
go q threadMap
RegisterTimeout tid h -> do
let threadMap = registerTimeout tid h threadMap0
go q threadMap
Delete oldtid -> do
threadMap <- del oldtid threadMap0
go q threadMap
start timmgr = Manager timmgr <$> newTVarIO Map.empty

----------------------------------------------------------------

data KilledByHttp2ThreadManager = KilledByHttp2ThreadManager (Maybe SomeException)
deriving (Show)

instance Exception KilledByHttp2ThreadManager where
toException = asyncExceptionToException
fromException = asyncExceptionFromException

-- | Stopping the manager.
--
Expand All @@ -85,16 +67,17 @@ start timmgr = do
-- to cleanup in all circumstances. If an exception is caught, it is rethrown
-- after the cleanup is complete.
stopAfter :: Manager -> IO a -> (Maybe SomeException -> IO ()) -> IO a
stopAfter (Manager q _ _) action cleanup = do
stopAfter (Manager _timmgr var) action cleanup = do
mask $ \unmask -> do
ma <- try $ unmask action
signalTimeoutsDisabled <- newEmptyMVar
atomically $
writeTQueue q $
Stop signalTimeoutsDisabled (either Just (const Nothing) ma)
-- This call to takeMVar /will/ eventually succeed, because the Manager
-- thread cannot be killed (see comment on 'go' in 'start').
takeMVar signalTimeoutsDisabled
m <- atomically $ do
m0 <- readTVar var
writeTVar var Map.empty
return m0
forM_ (Map.elems m) cancelTimeout
let er = either Just (const Nothing) ma
forM_ (Map.keys m) $ \tid ->
E.throwTo tid $ KilledByHttp2ThreadManager er
case ma of
Left err -> cleanup (Just err) >> throwIO err
Right a -> cleanup Nothing >> return a
Expand All @@ -113,96 +96,34 @@ forkManaged mgr label io =
-- | Like 'forkManaged', but run action with exceptions masked
forkManagedUnmask
:: Manager -> String -> ((forall x. IO x -> IO x) -> IO ()) -> IO ()
forkManagedUnmask mgr label io =
forkManagedUnmask (Manager _timmgr var) label io =
void $ mask_ $ forkIOWithUnmask $ \unmask -> E.handle handler $ do
labelMe label
addMyId mgr
incCounter mgr
tid <- myThreadId
atomically $ modifyTVar var $ Map.insert tid ThreadWithoutTimeout
-- We catch the exception and do not rethrow it: we don't want the
-- exception printed to stderr.
io unmask `catch` \(_e :: SomeException) -> return ()
deleteMyId mgr
decCounter mgr
atomically $ modifyTVar var $ Map.delete tid
where
handler (E.SomeException _) = return ()

-- | Adding my thread id to the kill-thread list on stopping.
--
-- This is not part of the public API; see 'forkManaged' instead.
addMyId :: Manager -> IO ()
addMyId (Manager q _ _) = do
tid <- myThreadId
atomically $ writeTQueue q $ Add tid

-- | Deleting my thread id from the kill-thread list on stopping.
--
-- This is /only/ necessary when you want to remove the thread's ID from
-- the manager /before/ the thread terminates (thereby assuming responsibility
-- for thread cleanup yourself).
deleteMyId :: Manager -> IO ()
deleteMyId (Manager q _ _) = do
tid <- myThreadId
atomically $ writeTQueue q $ Delete tid
waitCounter0 :: Manager -> IO ()
waitCounter0 (Manager _timmgr var) = atomically $ do
m <- readTVar var
check (Map.size m == 0)

----------------------------------------------------------------

add :: ThreadId -> ManagedThreads -> ManagedThreads
add tid = Map.insert tid ThreadWithoutTimeout

registerTimeout :: ThreadId -> T.Handle -> ManagedThreads -> ManagedThreads
registerTimeout tid = Map.insert tid . ThreadWithTimeout

del :: ThreadId -> ManagedThreads -> IO ManagedThreads
del tid threadMap = do
forM_ (Map.lookup tid threadMap) cancelTimeout
return $ Map.delete tid threadMap

-- | Kill all threads
--
-- We first remove all threads from the timeout manager, then signal that that
-- is complete, and finally kill all threads. This avoids a race between the
-- timeout manager and our manager: we want to ensure that the exception that
-- gets delivered is 'KilledByHttp2ThreadManager', not 'TimeoutThread'.
kill :: MVar () -> ManagedThreads -> Maybe SomeException -> IO ()
kill signalTimeoutsDisabled threadMap err = do
forM_ (Map.elems threadMap) cancelTimeout
putMVar signalTimeoutsDisabled ()
forM_ (Map.keys threadMap) $ \tid ->
E.throwTo tid $ KilledByHttp2ThreadManager err

-- | Killing the IO action of the second argument on timeout.
timeoutKillThread :: Manager -> (T.Handle -> IO a) -> IO a
timeoutKillThread (Manager q _ tmgr) action = E.bracket register T.cancel action
withTimeout :: Manager -> (T.Handle -> IO a) -> IO a
withTimeout (Manager timmgr var) action = do
E.bracket register unregister $ \h ->
action h
where
register = do
h <- T.registerKillThread tmgr (return ())
tid <- myThreadId
atomically $ writeTQueue q (RegisterTimeout tid h)
return h

-- | Registering closer for a resource and
-- returning a timer refresher.
timeoutClose :: Manager -> IO () -> IO (IO ())
timeoutClose (Manager _ _ tmgr) closer = do
th <- T.register tmgr closer
return $ T.tickle th

data KilledByHttp2ThreadManager = KilledByHttp2ThreadManager (Maybe SomeException)
deriving (Show)

instance Exception KilledByHttp2ThreadManager where
toException = asyncExceptionToException
fromException = asyncExceptionFromException

----------------------------------------------------------------

incCounter :: Manager -> IO ()
incCounter (Manager _ cnt _) = atomically $ modifyTVar' cnt (+ 1)

decCounter :: Manager -> IO ()
decCounter (Manager _ cnt _) = atomically $ modifyTVar' cnt (subtract 1)

waitCounter0 :: Manager -> IO ()
waitCounter0 (Manager _ cnt _) = atomically $ do
n <- readTVar cnt
check (n < 1)
th <- T.registerKillThread timmgr $ return ()
-- overriding ThreadWithoutTimeout
atomically $ modifyTVar var $ Map.insert tid $ ThreadWithTimeout th
return th
unregister th = T.cancel th
Loading
Loading