From faaa28864dd9506eeb64a9069a0a0d2dbd621e28 Mon Sep 17 00:00:00 2001 From: Eugene Toder Date: Fri, 19 Apr 2024 14:04:33 -0400 Subject: [PATCH] Use forkserver start method for multiprocessing Use forkserver when available to avoid issues with forking multi-threaded processes. See [1] for more context. This also removes a warning when running on python 3.12 that can be seen in CI: > /opt/hostedtoolcache/Python/3.12.2/x64/lib/python3.12/multiprocessing/popen_fork.py:66: DeprecationWarning: This process (pid=1991) is multi-threaded, use of fork() may lead to deadlocks in the child. > self.pid = os.fork() [1] https://github.com/python/cpython/issues/84559 --- green/cmdline.py | 28 +++++++++------------------- green/config.py | 3 +-- green/runner.py | 13 +++++++++++-- green/test/test_runner.py | 11 +++++++---- 4 files changed, 28 insertions(+), 27 deletions(-) diff --git a/green/cmdline.py b/green/cmdline.py index a6c00dd..7eb3544 100644 --- a/green/cmdline.py +++ b/green/cmdline.py @@ -2,8 +2,9 @@ from __future__ import annotations - +import atexit import os +import shutil import sys import tempfile from typing import Sequence @@ -87,24 +88,13 @@ def _main(argv: Sequence[str] | None, testing: bool) -> int: def main(argv: Sequence[str] | None = None, testing: bool = False) -> int: # create the temp dir only once (i.e., not while in the recursed call) if not os.environ.get("TMPDIR"): # pragma: nocover - try: - with tempfile.TemporaryDirectory() as temp_dir_for_tests: - try: - os.environ["TMPDIR"] = temp_dir_for_tests - tempfile.tempdir = temp_dir_for_tests - return _main(argv, testing) - finally: - del os.environ["TMPDIR"] - tempfile.tempdir = None - except OSError as os_error: - if os_error.errno == 39: - # "Directory not empty" when trying to delete the temp dir can just be a warning - print(f"warning: {os_error.strerror}") - return 0 - else: - raise os_error - else: - return _main(argv, testing) + # Use `atexit` to cleanup `temp_dir_for_tests` so that multiprocessing can run its + # own cleanup before its temp directory is deleted. + temp_dir_for_tests = tempfile.mkdtemp() + atexit.register(lambda: shutil.rmtree(temp_dir_for_tests, ignore_errors=True)) + os.environ["TMPDIR"] = temp_dir_for_tests + tempfile.tempdir = temp_dir_for_tests + return _main(argv, testing) if __name__ == "__main__": # pragma: no cover diff --git a/green/config.py b/green/config.py index 139e2b7..b32538e 100644 --- a/green/config.py +++ b/green/config.py @@ -12,7 +12,6 @@ import copy # pragma: no cover import functools # pragma: no cover import logging # pragma: no cover -import multiprocessing # pragma: no cover import os # pragma: no cover import pathlib # pragma: no cover import sys # pragma: no cover @@ -36,7 +35,7 @@ def get_default_args() -> argparse.Namespace: """ return argparse.Namespace( # pragma: no cover targets=["."], # Not in configs - processes=multiprocessing.cpu_count(), + processes=os.cpu_count(), initializer="", finalizer="", maxtasksperchild=None, diff --git a/green/runner.py b/green/runner.py index 82ebd15..252d7cd 100644 --- a/green/runner.py +++ b/green/runner.py @@ -102,13 +102,21 @@ def run( # The call to toParallelTargets needs to happen before pool stuff so we can crash if there # are, for example, syntax errors in the code to be loaded. parallel_targets = toParallelTargets(suite, args.targets) + # Use "forkserver" method when available to avoid problems with "fork". See, for example, + # https://github.com/python/cpython/issues/84559 + if "forkserver" in multiprocessing.get_all_start_methods(): + mp_method = "forkserver" + else: + mp_method = None + mp_context = multiprocessing.get_context(mp_method) pool = LoggingDaemonlessPool( processes=args.processes or None, initializer=InitializerOrFinalizer(args.initializer), finalizer=InitializerOrFinalizer(args.finalizer), maxtasksperchild=args.maxtasksperchild, + context=mp_context, ) - manager: SyncManager = multiprocessing.Manager() + manager: SyncManager = mp_context.Manager() targets: list[tuple[str, Queue]] = [ (target, manager.Queue()) for target in parallel_targets ] @@ -165,10 +173,11 @@ def run( pool.close() pool.join() + manager.shutdown() result.stopTestRun() - # Ignore the type mismatch untile we make GreenTestResult a subclass of unittest.TestResult. + # Ignore the type mismatch until we make GreenTestResult a subclass of unittest.TestResult. removeResult(result) # type: ignore return result diff --git a/green/test/test_runner.py b/green/test/test_runner.py index 0b66406..49044a0 100644 --- a/green/test/test_runner.py +++ b/green/test/test_runner.py @@ -10,6 +10,7 @@ from textwrap import dedent import unittest from unittest import mock +import warnings import weakref from green.config import get_default_args @@ -114,7 +115,7 @@ def setUp(self): self.loader = GreenTestLoader() def tearDown(self): - del self.tmpdir + shutil.rmtree(self.tmpdir, ignore_errors=True) del self.stream def test_stdout(self): @@ -162,7 +163,7 @@ def test01(self): def test_warnings(self): """ - setting warnings='always' doesn't crash + test runner does not generate warnings """ self.args.warnings = "always" sub_tmpdir = pathlib.Path(tempfile.mkdtemp(dir=self.tmpdir)) @@ -177,10 +178,12 @@ def test01(self): (sub_tmpdir / "test_warnings.py").write_text(content, encoding="utf-8") os.chdir(sub_tmpdir) try: - tests = self.loader.loadTargets("test_warnings") - result = run(tests, self.stream, self.args) + with warnings.catch_warnings(record=True) as recorded: + tests = self.loader.loadTargets("test_warnings") + result = run(tests, self.stream, self.args) finally: os.chdir(self.startdir) + self.assertEqual(recorded, []) self.assertEqual(result.testsRun, 1) self.assertIn("OK", self.stream.getvalue())