Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Workerqueue, add waiting empty utility for testing #4296

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ghcide/session-loader/Development/IDE/Session.hs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@

import Control.Concurrent.STM.Stats (atomically, modifyTVar',
readTVar, writeTVar)
import Control.Concurrent.STM.TQueue
import Control.DeepSeq
import Control.Exception (evaluate)
import Control.Monad.IO.Unlift (MonadUnliftIO)
Expand All @@ -103,7 +102,8 @@
import qualified Data.Set as OS
import Database.SQLite.Simple
import Development.IDE.Core.Tracing (withTrace)
import Development.IDE.Core.WorkerThread (awaitRunInThread,
import Development.IDE.Core.WorkerThread (WorkerQueue,
awaitRunInThread,
withWorkerQueue)
import qualified Development.IDE.GHC.Compat.Util as Compat
import Development.IDE.Session.Diagnostics (renderCradleError)
Expand Down Expand Up @@ -421,7 +421,7 @@
-- components mapping to the same hie.yaml file are mapped to the same
-- HscEnv which is updated as new components are discovered.

loadSessionWithOptions :: Recorder (WithPriority Log) -> SessionLoadingOptions -> FilePath -> TQueue (IO ()) -> IO (Action IdeGhcSession)
loadSessionWithOptions :: Recorder (WithPriority Log) -> SessionLoadingOptions -> FilePath -> WorkerQueue (IO ()) -> IO (Action IdeGhcSession)
loadSessionWithOptions recorder SessionLoadingOptions{..} rootDir que = do
let toAbsolutePath = toAbsolute rootDir -- see Note [Root Directory]
cradle_files <- newIORef []
Expand Down Expand Up @@ -638,7 +638,7 @@
[] -> error $ "GHC version could not be parsed: " <> version
((runTime, _):_)
| compileTime == runTime -> do
atomicModifyIORef' cradle_files (\xs -> (cfp:xs,()))

Check warning on line 641 in ghcide/session-loader/Development/IDE/Session.hs

View workflow job for this annotation

GitHub Actions / Hlint check run

Warning in loadSessionWithOptions in module Development.IDE.Session: Use atomicModifyIORef'_ ▫︎ Found: "atomicModifyIORef' cradle_files (\\ xs -> (cfp : xs, ()))" ▫︎ Perhaps: "atomicModifyIORef'_ cradle_files ((:) cfp)"
session (hieYaml, toNormalizedFilePath' cfp, opts, libDir)
| otherwise -> return (([renderPackageSetupException cfp GhcVersionMismatch{..}], Nothing),[])
-- Failure case, either a cradle error or the none cradle
Expand Down Expand Up @@ -886,7 +886,7 @@
x <- bagToList $ mapBag errMsgDiagnostic $ unionManyBags $ map Compat.getMessages closure_errs
DriverHomePackagesNotClosed us <- pure x
pure us
isBad ci = (homeUnitId_ (componentDynFlags ci)) `OS.member` bad_units

Check warning on line 889 in ghcide/session-loader/Development/IDE/Session.hs

View workflow job for this annotation

GitHub Actions / Hlint check run

Suggestion in newComponentCache in module Development.IDE.Session: Redundant bracket ▫︎ Found: "(homeUnitId_ (componentDynFlags ci)) `OS.member` bad_units" ▫︎ Perhaps: "homeUnitId_ (componentDynFlags ci) `OS.member` bad_units"
-- Whenever we spin up a session on Linux, dynamically load libm.so.6
-- in. We need this in case the binary is statically linked, in which
-- case the interactive session will fail when trying to load
Expand Down
4 changes: 3 additions & 1 deletion ghcide/src/Development/IDE/Core/Compile.hs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@
import Data.Time (UTCTime (..))
import Data.Tuple.Extra (dupe)
import Debug.Trace
import Development.IDE.Core.FileStore (resetInterfaceStore)

Check warning on line 72 in ghcide/src/Development/IDE/Core/Compile.hs

View workflow job for this annotation

GitHub Actions / Hlint check run

Warning in module Development.IDE.Core.Compile: Use fewer imports ▫︎ Found: "import Development.IDE.Core.FileStore ( resetInterfaceStore )\nimport Development.IDE.Core.FileStore ( shareFilePath )\n" ▫︎ Perhaps: "import Development.IDE.Core.FileStore\n ( resetInterfaceStore, shareFilePath )\n"
import Development.IDE.Core.Preprocessor
import Development.IDE.Core.ProgressReporting (ProgressReporting (..))

Check warning on line 74 in ghcide/src/Development/IDE/Core/Compile.hs

View workflow job for this annotation

GitHub Actions / Hlint check run

Warning in module Development.IDE.Core.Compile: Use fewer imports ▫︎ Found: "import Development.IDE.Core.ProgressReporting\n ( ProgressReporting(..) )\nimport Development.IDE.Core.ProgressReporting\n ( ProgressReporting(..), progressReportingOutsideState )\n" ▫︎ Perhaps: "import Development.IDE.Core.ProgressReporting\n ( ProgressReporting(..),\n ProgressReporting(..),\n progressReportingOutsideState )\n"
import Development.IDE.Core.RuleTypes
import Development.IDE.Core.Shake
import Development.IDE.Core.Tracing (withTrace)
import Development.IDE.Core.WorkerThread (writeWorkerQueue)
import Development.IDE.GHC.Compat hiding (assert,
loadInterface,
parseHeader,
Expand All @@ -84,6 +85,7 @@
import qualified Development.IDE.GHC.Compat as Compat
import qualified Development.IDE.GHC.Compat as GHC
import qualified Development.IDE.GHC.Compat.Util as Util
import Development.IDE.Core.ProgressReporting (ProgressReporting (..), progressReportingOutsideState)
import Development.IDE.GHC.CoreFile
import Development.IDE.GHC.Error
import Development.IDE.GHC.Orphans ()
Expand Down Expand Up @@ -795,7 +797,7 @@
-- hiedb doesn't use the Haskell src, so we clear it to avoid unnecessarily keeping it around
let !hf' = hf{hie_hs_src = mempty}
modifyTVar' indexPending $ HashMap.insert srcPath hash
writeTQueue indexQueue $ \withHieDb -> do
writeWorkerQueue indexQueue $ \withHieDb -> do
-- We are now in the worker thread
-- Check if a newer index of this file has been scheduled, and if so skip this one
newerScheduled <- atomically $ do
Expand Down Expand Up @@ -956,7 +958,7 @@


convImport (L _ i) = (
(ideclPkgQual i)

Check warning on line 961 in ghcide/src/Development/IDE/Core/Compile.hs

View workflow job for this annotation

GitHub Actions / Hlint check run

Suggestion in getModSummaryFromImports in module Development.IDE.Core.Compile: Redundant bracket ▫︎ Found: "((ideclPkgQual i), reLoc $ ideclName i)" ▫︎ Perhaps: "(ideclPkgQual i, reLoc $ ideclName i)"
, reLoc $ ideclName i)

msrImports = implicit_imports ++ imps
Expand Down
4 changes: 2 additions & 2 deletions ghcide/src/Development/IDE/Core/FileStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ module Development.IDE.Core.FileStore(
) where

import Control.Concurrent.STM.Stats (STM, atomically)
import Control.Concurrent.STM.TQueue (writeTQueue)
import Control.Exception
import Control.Monad.Extra
import Control.Monad.IO.Class
Expand All @@ -40,6 +39,7 @@ import Development.IDE.Core.IdeConfiguration (isWorkspaceFile)
import Development.IDE.Core.RuleTypes
import Development.IDE.Core.Shake hiding (Log)
import qualified Development.IDE.Core.Shake as Shake
import Development.IDE.Core.WorkerThread (writeWorkerQueue)
import Development.IDE.GHC.Orphans ()
import Development.IDE.Graph
import Development.IDE.Import.DependencyInformation
Expand Down Expand Up @@ -247,7 +247,7 @@ typecheckParentsAction recorder nfp = do
setSomethingModified :: VFSModified -> IdeState -> String -> IO [Key] -> IO ()
setSomethingModified vfs state reason actionBetweenSession = do
-- Update database to remove any files that might have been renamed/deleted
atomically $ writeTQueue (indexQueue $ hiedbWriter $ shakeExtras state) (\withHieDb -> withHieDb deleteMissingRealFiles)
atomically $ writeWorkerQueue (indexQueue $ hiedbWriter $ shakeExtras state) (\withHieDb -> withHieDb deleteMissingRealFiles)
void $ restartShakeSession (shakeExtras state) vfs reason [] actionBetweenSession

registerFileWatches :: [String] -> LSP.LspT Config IO Bool
Expand Down
10 changes: 5 additions & 5 deletions ghcide/src/Development/IDE/Core/Shake.hs
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,12 @@ data HieDbWriter
-- | Actions to queue up on the index worker thread
-- The inner `(HieDb -> IO ()) -> IO ()` wraps `HieDb -> IO ()`
-- with (currently) retry functionality
type IndexQueue = TQueue (((HieDb -> IO ()) -> IO ()) -> IO ())
type IndexQueue = WorkerQueue (((HieDb -> IO ()) -> IO ()) -> IO ())

data ThreadQueue = ThreadQueue {
tIndexQueue :: IndexQueue
, tRestartQueue :: TQueue (IO ())
, tLoaderQueue :: TQueue (IO ())
, tRestartQueue :: WorkerQueue (IO ())
, tLoaderQueue :: WorkerQueue (IO ())
}

-- Note [Semantic Tokens Cache Location]
Expand Down Expand Up @@ -326,9 +326,9 @@ data ShakeExtras = ShakeExtras
-- ^ Default HLS config, only relevant if the client does not provide any Config
, dirtyKeys :: TVar KeySet
-- ^ Set of dirty rule keys since the last Shake run
, restartQueue :: TQueue (IO ())
, restartQueue :: WorkerQueue (IO ())
-- ^ Queue of restart actions to be run.
, loaderQueue :: TQueue (IO ())
, loaderQueue :: WorkerQueue (IO ())
-- ^ Queue of loader actions to be run.
}

Expand Down
75 changes: 65 additions & 10 deletions ghcide/src/Development/IDE/Core/WorkerThread.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,22 @@ Description : This module provides an API for managing worker threads in the IDE
see Note [Serializing runs in separate thread]
-}
module Development.IDE.Core.WorkerThread
(withWorkerQueue, awaitRunInThread)
(withWorkerQueue
, awaitRunInThread
, withWorkerQueueOfOne
, WorkerQueue
, writeWorkerQueue
, waitUntilWorkerQueueEmpty)
where

import Control.Concurrent.Async (withAsync)
import Control.Concurrent.STM
import Control.Concurrent.Strict (newBarrier, signalBarrier,
waitBarrier)
import Control.Monad (forever)
import Control.Exception (finally)
import Control.Monad (forever, unless)
import Control.Monad.Cont (ContT (ContT))
import Control.Monad.IO.Class (liftIO)

{-
Note [Serializing runs in separate thread]
Expand All @@ -28,27 +35,75 @@ Originally we used various ways to implement this, but it was hard to maintain a
Moreover, we can not stop these threads uniformly when we are shutting down the server.
-}

-- | 'withWorkerQueue' creates a new 'TQueue', and launches a worker
data WorkerQueue a = WorkerQueueOfOne (TMVar a) | WorkerQueueOfMany (TQueue a)

-- | peekWorkerQueue returns the next action in the queue without removing it.
peekWorkerQueue :: WorkerQueue a -> STM a
peekWorkerQueue (WorkerQueueOfOne tVar) = readTMVar tVar
peekWorkerQueue (WorkerQueueOfMany tQueue) = peekTQueue tQueue

-- | readWorkerQueue returns the next action in the queue and removes it.
readWorkerQueue :: WorkerQueue a -> STM a
readWorkerQueue (WorkerQueueOfOne tVar) = takeTMVar tVar
readWorkerQueue (WorkerQueueOfMany tQueue) = readTQueue tQueue

writeWorkerQueue :: WorkerQueue a -> a -> STM ()
writeWorkerQueue (WorkerQueueOfOne tVar) action = putTMVar tVar action
writeWorkerQueue (WorkerQueueOfMany tQueue) action = writeTQueue tQueue action

-- | waitUntilWorkerQueueEmpty blocks until the worker queue is empty.
waitUntilWorkerQueueEmpty :: WorkerQueue a -> STM ()
waitUntilWorkerQueueEmpty (WorkerQueueOfOne tVar) = do
isEmpty <- isEmptyTMVar tVar
unless isEmpty retry
waitUntilWorkerQueueEmpty (WorkerQueueOfMany queue) = do
isEmpty <- isEmptyTQueue queue
unless isEmpty retry

newWorkerQueue :: STM (WorkerQueue a)
newWorkerQueue = WorkerQueueOfMany <$> newTQueue

newWorkerQueueOfOne :: STM (WorkerQueue a)
newWorkerQueueOfOne = WorkerQueueOfOne <$> newEmptyTMVar

-- | 'withWorkerQueue' creates a new 'WorkerQueue', and launches a worker
-- thread which polls the queue for requests and runs the given worker
-- function on them.
withWorkerQueue :: (t -> IO a) -> ContT () IO (TQueue t)
withWorkerQueue workerAction = ContT $ \mainAction -> do
q <- newTQueueIO
withWorkerQueue :: (t -> IO a) -> ContT () IO (WorkerQueue t)
withWorkerQueue workerAction = do
q <- liftIO $ atomically newWorkerQueue
runWorkerQueue q workerAction

-- | 'withWorkerQueueOfOne' creates a new 'WorkerQueue' that only allows one action to be queued at a time.
-- and one action can only be queued after the previous action has been done.
-- this is useful when we want to cancel the action waiting to be enqueue if it's thread is cancelled.
-- e.g. session loading in session loader. When a shake session is restarted
-- , we want to cancel the previous pending session loading.
-- since the hls-graph can handle the retrying of the session loading.
withWorkerQueueOfOne :: (t -> IO a) -> ContT () IO (WorkerQueue t)
withWorkerQueueOfOne workerAction = do
q <- liftIO $ atomically newWorkerQueueOfOne
runWorkerQueue q workerAction

runWorkerQueue :: WorkerQueue t -> (t -> IO a) -> ContT () IO (WorkerQueue t)
runWorkerQueue q workerAction = ContT $ \mainAction -> do
withAsync (writerThread q) $ \_ -> mainAction q
where
writerThread q =
forever $ do
l <- atomically $ readTQueue q
workerAction l
-- peek the action from the queue, run it and then remove it from the queue
l <- atomically $ peekWorkerQueue q
workerAction l `finally` atomically (readWorkerQueue q)


-- | 'awaitRunInThread' queues up an 'IO' action to be run by a worker thread,
-- and then blocks until the result is computed.
awaitRunInThread :: TQueue (IO ()) -> IO result -> IO result
awaitRunInThread :: WorkerQueue (IO ()) -> IO result -> IO result
awaitRunInThread q act = do
-- Take an action from TQueue, run it and
-- use barrier to wait for the result
barrier <- newBarrier
atomically $ writeTQueue q $ do
atomically $ writeWorkerQueue q $ do
res <- act
signalBarrier barrier res
waitBarrier barrier
5 changes: 3 additions & 2 deletions ghcide/src/Development/IDE/LSP/LanguageServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import Control.Monad.Trans.Cont (evalContT)
import Development.IDE.Core.IdeConfiguration
import Development.IDE.Core.Shake hiding (Log)
import Development.IDE.Core.Tracing
import Development.IDE.Core.WorkerThread (withWorkerQueue)
import Development.IDE.Core.WorkerThread (withWorkerQueue,
withWorkerQueueOfOne)
import qualified Development.IDE.Session as Session
import Development.IDE.Types.Shake (WithHieDb,
WithHieDbShield (..))
Expand Down Expand Up @@ -261,7 +262,7 @@ handleInit recorder defaultRoot getHieDbLoc getIdeState lifetime exitClientMsg c
runWithWorkerThreads :: Recorder (WithPriority Session.Log) -> FilePath -> (WithHieDb -> ThreadQueue -> IO ()) -> IO ()
runWithWorkerThreads recorder dbLoc f = evalContT $ do
sessionRestartTQueue <- withWorkerQueue id
sessionLoaderTQueue <- withWorkerQueue id
sessionLoaderTQueue <- withWorkerQueueOfOne id
(WithHieDbShield hiedb, threadQueue) <- runWithDb recorder dbLoc
liftIO $ f hiedb (ThreadQueue threadQueue sessionRestartTQueue sessionLoaderTQueue)

Expand Down
Loading