Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop MultiProcessing #239

Merged
merged 10 commits into from
Feb 28, 2024
Merged

Drop MultiProcessing #239

merged 10 commits into from
Feb 28, 2024

Conversation

juliancarrivick
Copy link
Contributor

This builds off work that @rtobar started on in multiproc-drop-apps, with some slight renaming and simplifying to use ProcessPoolExecutor instead of manually managing a pool of processes. I've also fixed some unrelated tests that started failing: NGAS (updated domain, and skipped tests) and lgweb (a newer version of Pydantic seems to be stricter).

If no node manager is present, drops will run synchronously, otherwise they will use the provided DropRunner, which will either do work in threads or processes. To avoid complicated state synchronisation across processes, it's actually the drop's run() method that gets run in the DropRunner, not the whole execute() method.

Unfortunately this complicates the implementation of AppDROP.async_execute() which doesn't use the provided DropRunner. This is for two reasons: firstly we can encounter deadlocks e.g. In the case where the thread pool is of size 1 async_execute() submits to the pool, waiting for execute() to finish, but execute() can't run on the thread pool until there is a free slot. Secondly, running the drops using multiprocessing requires some AppDROP specific logic that doesn't generalise well for light waiting like async_execute() is trying to do. This is also why I renamed @rtobar's original WorkerPool to DropRunner.

I reverted to the previous logic (with some minor refactoring), running async_execute() as a daemon thread, however this is a problem since it is no longer running on the thread pool if it is available (since it might be a process pool now!) and this causes a memory leak that will scale as we execute new drops. I left it as is to push the multiprocessing through, but this should be probably addressed soon. Maybe the node manager has a secondary thread pool for this sort of thing, but it also seems wasteful to have a whole thread that essentially will block on the run() method (did somebody say asyncio?). Anyway, I figure that's probably a discussion to have outside of this PR.

@juliancarrivick juliancarrivick self-assigned this Jan 15, 2024
@coveralls
Copy link

Coverage Status

coverage: 79.991% (-0.5%) from 80.522%
when pulling 8b97854 on multiproc-drop-apps-julian
into 44edda3 on master.

Copy link
Contributor

@rtobar rtobar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @juliancarrivick for what looks like very good, solid work. I was fairly familiar with the changes of course, and I didn't notice anything too strange. I left a couple of questions/comments though, most likely it's stuff I messed up during my original commits, feel free to blame me 😄

daliuge-engine/dlg/apps/app_base.py Show resolved Hide resolved
daliuge-engine/dlg/drop.py Show resolved Hide resolved
daliuge-engine/dlg/drop.py Outdated Show resolved Hide resolved
daliuge-engine/dlg/manager/node_manager.py Show resolved Hide resolved
daliuge-engine/dlg/manager/node_manager.py Outdated Show resolved Hide resolved
@juliancarrivick juliancarrivick force-pushed the multiproc-drop-apps-julian branch from 8b97854 to 7df7fab Compare February 28, 2024 02:45
rtobar and others added 7 commits February 28, 2024 10:46
Signed-off-by: Rodrigo Tobar <[email protected]>
Calling random.randint(1e6) is deprecated, as 1e6 is a float, not an
integer. Likewise, threading.Event.isSet is deprecated in favour of
is_set, which we were using in most places anyway.

Signed-off-by: Rodrigo Tobar <[email protected]>
There is no need to re-compile these regular expressions each time we
create a drop, so this should save us some memory and CPU.

Signed-off-by: Rodrigo Tobar <[email protected]>
This object is a simple dataclass that holds the information that
DropProxy objects need to perform their duties. This is the *static*
information -- the DropProxy still requires a RPCClient to actually
interact with a remote RPC server.

This little utility class encapsulates some of the behavior that was
previously found in the dynlib module, where we setup proxies for
newly-created processes. While this isn't a great change in itself, it
prepares the codebase for a much bigger change: the introduction of
subprocesses for the effective execution of drop apps.

Signed-off-by: Rodrigo Tobar <[email protected]>
Drops stored a reference to the session that contained them in the
_dlg_session attribute. This attribute was then later used to extract
the ID of such session via *its* sessionID attribute. The existence of
this ID is then seen in a number of places across the code, causing a
number of effects (setting environment variables, logging the session
ID, etc).

A closer inspection to the code revealed that the sessionID attribute
was the only attribute ever read from the drop._dlg_session attribute.
Thus, storing the full session object is unnecessary. While during
normal usage this doesn't matter much, there is a negative effect on
serialisation of drops, which cannot be achieved because the Session
object bound to one of its attributes isn't serialisable (it holds not
only a lock, but also a reference to the Node Manager, which contains
open file descriptors, thread pools, and more).

This commit removes the internal _dlg_session attribute from the
AbstractDROP class, and replaces it with a _dlg_session_id. To make
things easier overall we default its value to an empty string, both when
the drops are created directly (e.g., MyDrop()) and when they are
constructed from the graph_loader module.

Signed-off-by: Rodrigo Tobar <[email protected]>
Similarly to how each drop doesn't need full access to its session, but
only needs to know about the session ID, full access to the RPC server
hosting the drop isn't necessary either, only its endpoint. The latter
is used to create drop proxies on newly spawned processes running app
drops so they can contact their inputs/outputs. This is currently needed
by the dynlib module, which does such spawning, but we want to move to a
multiprocessing world where most (if not all) app drops execute in
separate processes.

This commit removes the _rpc_server attribute injected by the Session
object into each drop, and referencing the full NodeManager, and
replaces it with a simpler _rpc_endpoint that simply contains the
(host,port) tuple needed to contact the RPC server.

Signed-off-by: Rodrigo Tobar <[email protected]>
By extracting this into an ABC we can change the implementation (by
default it will occur synchronously, but in the context of a
NodeManager, it will execute on a ThreadPool). This sets the stage for
an implementation that utilises Processes for true parrallelisation.

If a drop is asynchronously executed a seperate, daemon thread is
created to wait for execution to finish. This job shouldn't be run on
the same pool as the DropRunner as it is easy to deadlock by running out
of executors in the implementing pool (e.g. a ThreadPool with
max_executors=1, async_execute() is submitted, takes up the single
Thread prior to the actual run() method being submitted to the pool. In
this case run() will never execute as there is never a free thread and
async_execute() blocks forever). Note that the daemon thread will not be
terminated until the process exits, so this causes a memory leak and
will need to be addressed in future.
@juliancarrivick
Copy link
Contributor Author

Just realised this isn't running on top of the latest master, hopefully that fixes the CI failures...

@juliancarrivick juliancarrivick force-pushed the multiproc-drop-apps-julian branch from 7df7fab to c0ee839 Compare February 28, 2024 02:52
So each drop will run on seperate processes for true parallelism. Mirror
the ThreadDropRunner tests to ensure the functionality is the same
across both implementations.
As we can't rely on an external service for unit tests. Update the NGAS
host to one that will be maintained for manual tests when required.
Looks like a later version of pydantic is stricter about whether values
are required. A `Union[int, None]` without an initialised value of
`None` is no longer valid. To fix this, we simply initialise with
`None`.
@juliancarrivick juliancarrivick force-pushed the multiproc-drop-apps-julian branch from c0ee839 to ac31f87 Compare February 28, 2024 04:14
@juliancarrivick juliancarrivick merged commit 11bf09d into master Feb 28, 2024
10 of 16 checks passed
@juliancarrivick juliancarrivick deleted the multiproc-drop-apps-julian branch February 28, 2024 04:21
awicenec pushed a commit that referenced this pull request Oct 10, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants