Skip to content

Commit

Permalink
Implement client-side timeouts
Browse files Browse the repository at this point in the history
Closes #221.
  • Loading branch information
edsko committed Jan 17, 2025
1 parent be0cf48 commit ba7d442
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 2 deletions.
71 changes: 71 additions & 0 deletions grapesy/src/Network/GRPC/Client/Call.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ module Network.GRPC.Client.Call (
, recvInitialResponse
) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.Thread.Delay qualified as UnboundedDelays
import Control.Monad
import Control.Monad.Catch
import Control.Monad.IO.Class
Expand Down Expand Up @@ -104,6 +106,13 @@ data Call rpc = SupportsClientRpc rpc => Call {
-- If there are still /inbound/ messages upon leaving the scope of 'withRPC' no
-- exception is raised (but the call is nonetheless still closed, and the server
-- handler will be informed that the client has disappeared).
--
-- Note on timeouts: if a timeout is specified for the call (either through
-- 'callTimeout' or through 'connDefaultTimeout'), when the timeout is reached
-- the RPC is cancelled; any further attempts to receive or send messages will
-- result in a 'GrpcException' with 'GrpcDeadlineExceeded'. As per the gRPC
-- specification, this does /not/ rely on the server; this does mean that the
-- same deadline also applies if the /client/ is slow (rather than the server).
withRPC :: forall rpc m a.
(MonadMask m, MonadIO m, SupportsClientRpc rpc, HasCallStack)
=> Connection -> CallParams rpc -> Proxy rpc -> (Call rpc -> m a) -> m a
Expand Down Expand Up @@ -153,6 +162,67 @@ startRPC conn _ callParams = do
serverClosedConnection
flowStart

-- The spec mandates that
--
-- > If a server has gone past the deadline when processing a request, the
-- > client will give up and fail the RPC with the DEADLINE_EXCEEDED status.
--
-- and also that the deadline applies when when wait-for-ready semantics is
-- used.
--
-- We have to be careful implementing this. In particular, we definitely
-- don't want to impose the timeout on the /client/ (that is, we should not
-- force the client to exit the scope of 'withRPC' within the timeout).
-- Instead, we work a thread that cancels the RPC after the timeout expires;
-- this means that /if/ the client that attempts to communicate with the
-- server after the timeout, only then will it receive an exception.
--
-- The thread we spawn here is cleaned up by the monitor thread (below).
--
-- See
--
-- o <https://grpc.io/docs/guides/deadlines/>
-- o <https://grpc.io/docs/guides/wait-for-ready/>
mClientSideTimeout <-
case callTimeout callParams of
Nothing -> return Nothing
Just t -> fmap Just $ forkLabelled "grapesy:clientSideTimeout" $ do
UnboundedDelays.delay (timeoutToMicro t)
let timeout :: SomeException
timeout = toException $ GrpcException {
grpcError = GrpcDeadlineExceeded
, grpcErrorMessage = Nothing
, grpcErrorMetadata = []
}

-- We recognized client-side that the timeout we imposed on the server
-- has passed. Acting on this is however tricky:
--
-- o A call to 'closeRPC' will only terminate the /outbound/ thread;
-- the idea is the inbound thread might still be reading in-flight
-- messages, and it will terminate once the last message is read or
-- the thread notices a broken connection.
-- o Unfortunately, this does not work in the timeout case: /if/ the
-- outbound thread has not yet terminated (that is, the client has
-- not yet sent their final message), then calling 'closeRPC' will
-- result in a RST_STREAM being sent to the server, which /should/
-- result in the inbound connection being closed also, but may not,
-- in the case of a non-compliant server.
-- o Worse, if the client /did/ already send their final message, the
-- outbound thread has already terminated, no RST_STREAM will be
-- sent, and the we will continue to wait for messages from the
-- server.
--
-- Ideally we'd inform the receiving thread that a timeout has been
-- reached and to "continue until it would block", but that is hard
-- to do. So instead we just kill the receiving thread, which means
-- that once the timeout is reached, the client will not be able to
-- receive any further messages (even if that is because the /client/
-- was slow, rather than the server).

void $ Thread.cancelThread (Session.channelInbound channel) timeout
closeRPC channel cancelRequest $ ExitCaseException timeout

-- Spawn a thread to monitor the connection, and close the new channel when
-- the connection is closed. To prevent a memory leak by hanging on to the
-- channel for the lifetime of the connection, the thread also terminates in
Expand All @@ -163,6 +233,7 @@ startRPC conn _ callParams = do
(Session.channelInbound channel))
`orElse`
(Right <$> readTMVar connClosed)
forM_ mClientSideTimeout killThread
case status of
Left _ -> return () -- Channel closed before the connection
Right mErr -> do
Expand Down
41 changes: 41 additions & 0 deletions grapesy/test-grapesy/Test/Sanity/BrokenDeployments.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

module Test.Sanity.BrokenDeployments (tests) where

import Control.Concurrent
import Control.Exception
import Data.ByteString.Char8 qualified as BS.Strict.Char8
import Data.ByteString.UTF8 qualified as BS.Strict.UTF8
Expand Down Expand Up @@ -54,6 +55,9 @@ tests = testGroup "Test.Sanity.BrokenDeployments" [
, testGroup "Undefined" [
testCase "output" test_undefinedOutput
]
, testGroup "Timeout" [
testCase "serverIgnoresTimeout" test_serverIgnoresTimeout
]
]

connParams :: Client.ConnParams
Expand Down Expand Up @@ -380,3 +384,40 @@ test_undefinedOutput = do
if isFirst
then return $ throw $ DeliberateException (userError "uhoh")
else return $ defMessage & #id .~ req ^. #id
{-------------------------------------------------------------------------------
Timeouts
-------------------------------------------------------------------------------}

-- | Check that timeouts don't depend on the server
--
-- When a timeout is set for an RPC, the server should respect it, but the
-- client should not /depend/ on the server respecting it.
--
-- See also <https://github.com/well-typed/grapesy/issues/221>.
test_serverIgnoresTimeout :: Assertion
test_serverIgnoresTimeout = respondWithIO response $ \addr -> do
mResp :: Either GrpcException
(StreamElem NoMetadata (Proto PongMessage)) <- try $
Client.withConnection connParams (Client.ServerInsecure addr) $ \conn ->
Client.withRPC conn callParams (Proxy @Ping) $ \call -> do
Client.sendFinalInput call defMessage
Client.recvOutput call
case mResp of
Left e | grpcError e == GrpcDeadlineExceeded ->
return ()
Left e ->
assertFailure $ "unexpected error: " ++ show e
Right _ ->
assertFailure "Timeout did not trigger"
where
response :: IO Response
response = do
threadDelay 10_000_000
return def

callParams :: Client.CallParams Ping
callParams = def {
Client.callTimeout = Just $
Client.Timeout Client.Millisecond (Client.TimeoutValue 100)
}

10 changes: 8 additions & 2 deletions grapesy/test-grapesy/Test/Util/RawTestServer.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module Test.Util.RawTestServer
( -- * Raw test server
respondWith
, respondWithIO

-- * Abstract response type
, Response(..)
Expand Down Expand Up @@ -35,7 +36,7 @@ withTestServer server k = do
ServerConfig {
serverInsecure = Just $ InsecureConfig {
insecureHost = Just "127.0.0.1"
, insecurePort = 0
, insecurePort = 50051
}
, serverSecure = Nothing
}
Expand All @@ -51,7 +52,12 @@ withTestServer server k = do

-- | Server that responds with the given 'Response', independent of the request
respondWith :: Response -> (Client.Address -> IO a) -> IO a
respondWith response = withTestServer $ \_req _aux respond ->
respondWith resp = respondWithIO (return resp)

-- | Version of 'respondWith' that constructs the response
respondWithIO :: IO Response -> (Client.Address -> IO a) -> IO a
respondWithIO mkResponse = withTestServer $ \_req _aux respond -> do
response <- mkResponse
respond (toHTTP2Response response) []

data Response = Response {
Expand Down

0 comments on commit ba7d442

Please sign in to comment.