diff --git a/src/amulet/utils/__init__.pyi b/src/amulet/utils/__init__.pyi index 56501f46f..6b7bdcf1b 100644 --- a/src/amulet/utils/__init__.pyi +++ b/src/amulet/utils/__init__.pyi @@ -1,5 +1,5 @@ from __future__ import annotations -from . import mutex, numpy, task_manager +from . import lock, mutex, numpy, task_manager -__all__ = ["mutex", "numpy", "task_manager"] +__all__ = ["lock", "mutex", "numpy", "task_manager"] diff --git a/src/amulet/utils/_init_utils.py.cpp b/src/amulet/utils/_init_utils.py.cpp index 16aabebbc..c388c2f64 100644 --- a/src/amulet/utils/_init_utils.py.cpp +++ b/src/amulet/utils/_init_utils.py.cpp @@ -5,6 +5,7 @@ namespace py = pybind11; void init_utils_numpy(py::module); void init_task_manager(py::module); void init_mutex(py::module); +void init_lock(py::module); void init_utils(py::module m_parent){ auto m = pybind11_extensions::def_subpackage(m_parent, "utils"); @@ -12,4 +13,5 @@ void init_utils(py::module m_parent){ init_utils_numpy(m); init_task_manager(m); init_mutex(m); + init_lock(m); } diff --git a/src/amulet/utils/lock.py.cpp b/src/amulet/utils/lock.py.cpp new file mode 100644 index 000000000..d7ce91b4c --- /dev/null +++ b/src/amulet/utils/lock.py.cpp @@ -0,0 +1,356 @@ +// This is a lock class with a similar API to threading.Lock +// It is built on top of Amulet::OrderedSharedTimedMutex +// In C++ code std::unique_lock and std::shared_lock should be used instead of this. +// They aren't particuarly pythonic hence this class existing. + +#include +#include +#include + +#include +#include + +#include "mutex.hpp" +#include "task_manager/cancel_manager.hpp" + +namespace py = pybind11; + +namespace Amulet { + +class LockNotAcquired : public std::runtime_error { + using std::runtime_error::runtime_error; +}; + +class OrderedSharedLock; + +class UniqueLockContextManager { +private: + OrderedSharedLock& lock; + bool blocking; + double timeout; + AbstractCancelManager& cancel_manager; + +public: + UniqueLockContextManager( + OrderedSharedLock& lock, + bool blocking, + double timeout, + AbstractCancelManager& cancel_manager); + void enter(); + void exit(py::object, py::object, py::object); +}; + +class SharedLockContextManager { + OrderedSharedLock& lock; + bool blocking; + double timeout; + AbstractCancelManager& cancel_manager; + +public: + SharedLockContextManager( + OrderedSharedLock& lock, + bool blocking, + double timeout, + AbstractCancelManager& cancel_manager); + void enter(); + void exit(py::object, py::object, py::object); +}; + +class OrderedSharedLock { + // Will be a nullptr if constructed from a reference + std::unique_ptr _mutex_storage; + OrderedSharedTimedMutex& _mutex; + +public: + OrderedSharedLock(OrderedSharedTimedMutex& mutex); + OrderedSharedLock(std::unique_ptr mutex); + OrderedSharedLock(); + bool acquire_unique( + bool blocking = true, + double timeout = -1, + AbstractCancelManager& cancel_manager = global_VoidCancelManager); + void release_unique(); + bool acquire_shared( + bool blocking = true, + double timeout = -1, + AbstractCancelManager& cancel_manager = global_VoidCancelManager); + void release_shared(); + UniqueLockContextManager unique( + bool blocking = true, + double timeout = -1, + AbstractCancelManager& cancel_manager = global_VoidCancelManager); + SharedLockContextManager shared( + bool blocking = true, + double timeout = -1, + AbstractCancelManager& cancel_manager = global_VoidCancelManager); +}; + +UniqueLockContextManager::UniqueLockContextManager( + OrderedSharedLock& lock, + bool blocking, + double timeout, + AbstractCancelManager& cancel_manager) + : lock(lock) + , blocking(blocking) + , timeout(timeout) + , cancel_manager(cancel_manager) +{ +} +void UniqueLockContextManager::enter() +{ + if (!lock.acquire_unique(blocking, timeout, cancel_manager)) { + throw LockNotAcquired("Lock was not acquired."); + } +} +void UniqueLockContextManager::exit(py::object, py::object, py::object) +{ + lock.release_unique(); +} + +SharedLockContextManager::SharedLockContextManager( + OrderedSharedLock& lock, + bool blocking, + double timeout, + AbstractCancelManager& cancel_manager) + : lock(lock) + , blocking(blocking) + , timeout(timeout) + , cancel_manager(cancel_manager) +{ +} +void SharedLockContextManager::enter() +{ + if (!lock.acquire_shared(blocking, timeout, cancel_manager)) { + throw LockNotAcquired("Lock was not acquired."); + } +} +void SharedLockContextManager::exit(py::object, py::object, py::object) +{ + lock.release_shared(); +} + +// Construct from reference +OrderedSharedLock::OrderedSharedLock(OrderedSharedTimedMutex& mutex) + : _mutex(mutex) +{ +} +// Construct from unique_ptr +OrderedSharedLock::OrderedSharedLock(std::unique_ptr mutex) + : _mutex_storage(std::move(mutex)) + , _mutex(*_mutex_storage) +{ +} +// Default constructor +OrderedSharedLock::OrderedSharedLock() + : OrderedSharedLock(std::make_unique()) +{ +} +bool OrderedSharedLock::acquire_unique( + bool blocking, + double timeout, + AbstractCancelManager& cancel_manager) +{ + if (blocking) { + if (timeout > 0) { + return _mutex.try_lock_for(std::chrono::duration(timeout), cancel_manager); + } else { + return _mutex.try_lock_for(std::chrono::duration::max(), cancel_manager); + } + } else { + return _mutex.try_lock(); + } +} +void OrderedSharedLock::release_unique() +{ + _mutex.unlock(); +} +bool OrderedSharedLock::acquire_shared( + bool blocking, + double timeout, + AbstractCancelManager& cancel_manager) +{ + if (blocking) { + if (timeout > 0) { + return _mutex.try_lock_shared_for(std::chrono::duration(timeout), cancel_manager); + } else { + return _mutex.try_lock_shared_for(std::chrono::duration::max(), cancel_manager); + } + } else { + return _mutex.try_lock_shared(); + } +} +void OrderedSharedLock::release_shared() +{ + _mutex.unlock_shared(); +} +UniqueLockContextManager OrderedSharedLock::unique( + bool blocking, + double timeout, + AbstractCancelManager& cancel_manager) +{ + return { *this, blocking, timeout, cancel_manager }; +} +SharedLockContextManager OrderedSharedLock::shared( + bool blocking, + double timeout, + AbstractCancelManager& cancel_manager) +{ + return { *this, blocking, timeout, cancel_manager }; +} + +} // namespace Amulet + +void init_lock(py::module m_parent) +{ + auto m = m_parent.def_submodule("lock"); + + auto LockNotAcquired = py::register_exception(m, "LockNotAcquired", PyExc_RuntimeError); + LockNotAcquired.doc() = "An exception raised if the lock was not acquired."; + + py::class_ UniqueLockContextManager(m, "UniqueLockContextManager"); + UniqueLockContextManager.def( + "__enter__", + &Amulet::UniqueLockContextManager::enter, + py::call_guard()); + UniqueLockContextManager.def( + "__exit__", + &Amulet::UniqueLockContextManager::exit, + py::call_guard()); + py::class_ SharedLockContextManager(m, "SharedLockContextManager"); + SharedLockContextManager.def( + "__enter__", + &Amulet::SharedLockContextManager::enter, + py::call_guard()); + SharedLockContextManager.def( + "__exit__", + &Amulet::SharedLockContextManager::exit, + py::call_guard()); + + py::class_ OrderedSharedLock(m, "OrderedSharedLock", + "This is a custom lock implementation that can be acquired in\n" + "1) unique mode.\n" + " - This is the normal mode where only this thread can use the resource.\n" + " - All other acquires block until it is released.\n" + "2) shared mode.\n" + " - This allows multiple threads to acquire the resource at the same time.\n" + " - This is useful if multiple threads want to read a resource but not write to it.\n" + " - If the resource is locked in unique mode this will block.\n" + " - Once locked in shared mode it will block unique acquires until all shared threads release it.\n" + "Tasks are prioritised in the order the call is made"); + OrderedSharedLock.def( + py::init(), + py::arg("mutex"), + py::keep_alive<1, 2>()); + OrderedSharedLock.def(py::init<>()); + OrderedSharedLock.def( + "acquire_unique", + &Amulet::OrderedSharedLock::acquire_unique, + py::arg("blocking") = true, + py::arg("timeout") = -1.0, + py::arg("cancel_manager") = Amulet::VoidCancelManager(), + py::call_guard(), + py::doc( + "Only use this if you know what you are doing. Consider using :meth:`unique` instead\n" + "Acquire the lock in unique mode. This is equivalent to threading.Lock.acquire\n" + "With improper use this can lead to a deadlock.\n" + ":param blocking: Should this block until the lock can be acquired. Default is True.\n" + " If false and the lock cannot be acquired on the first try, this returns False.\n" + ":param timeout: Maximum amount of time to block for. Has no effect is blocking is False. Default is forever.\n" + ":param task_manager: A custom object through which acquiring can be cancelled.\n" + " This effectively manually triggers timeout.\n" + " This is useful for GUIs so that the user can cancel an operation that may otherwise block for a while.\n" + ":return: True if the lock was acquired otherwise False.")); + OrderedSharedLock.def( + "release_unique", + &Amulet::OrderedSharedLock::release_unique, + py::call_guard(), + py::doc( + "Only use this if you know what you are doing. Consider using :meth:`unique` instead\n" + "Release the unique hold on the lock. This must be called by the same thread that acquired it.\n" + "This must be called exactly the same number of times as :meth:`acquire_unique` was called.")); + OrderedSharedLock.def( + "acquire_shared", + &Amulet::OrderedSharedLock::acquire_shared, + py::arg("blocking") = true, + py::arg("timeout") = -1.0, + py::arg("cancel_manager") = Amulet::VoidCancelManager(), + py::call_guard(), + py::doc( + "Only use this if you know what you are doing. Consider using :meth:`shared` instead\n" + "Acquire the lock in shared mode.\n" + ":param blocking: Should this block until the lock can be acquired. Default is True.\n" + " If false and the lock cannot be acquired on the first try, this returns False.\n" + ":param timeout: Maximum amount of time to block for. Has no effect is blocking is False. Default is forever.\n" + ":param task_manager: A custom object through which acquiring can be cancelled.\n" + " This effectively manually triggers timeout.\n" + " This is useful for GUIs so that the user can cancel an operation that may otherwise block for a while.\n" + ":return: True if the lock was acquired otherwise False.")); + OrderedSharedLock.def( + "release_shared", + &Amulet::OrderedSharedLock::release_shared, + py::call_guard(), + py::doc( + "Only use this if you know what you are doing. Consider using :meth:`shared` instead\n" + "Release the shared hold on the lock. This must be called by the same thread that acquired it.\n" + "This must be called exactly the same number of times as :meth:`acquire_shared` was called.")); + OrderedSharedLock.def( + "unique", + &Amulet::OrderedSharedLock::unique, + py::arg("blocking") = true, + py::arg("timeout") = -1.0, + py::arg("cancel_manager") = Amulet::VoidCancelManager(), + py::call_guard(), + py::keep_alive<0, 1>(), + py::doc( + "Acquire the lock in unique mode.\n" + "This is used as follows\n" + "\n" + ">>> lock: OrderedSharedLock\n" + ">>> with lock.unique():\n" + ">>> # code with lock acquired\n" + ">>> # the lock will automatically be released here\n" + "\n" + "This will block while all other threads using the resource finish\n" + "and once acquired block all other threads until the lock is released.\n" + "\n" + ":param blocking: Should this block until the lock can be acquired. Default is True.\n" + " If false and the lock cannot be acquired on the first try, this returns False.\n" + ":param timeout: Maximum amount of time to block for. Has no effect is blocking is False. Default is forever.\n" + ":param task_manager: A custom object through which acquiring can be cancelled.\n" + " This effectively manually triggers timeout.\n" + " This is useful for GUIs so that the user can cancel an operation that may otherwise block for a while.\n" + ":return: None\n" + ":raises: LockNotAcquired if the lock could not be acquired.")); + OrderedSharedLock.def( + "shared", + &Amulet::OrderedSharedLock::shared, + py::arg("blocking") = true, + py::arg("timeout") = -1.0, + py::arg("cancel_manager") = Amulet::VoidCancelManager(), + py::call_guard(), + py::keep_alive<0, 1>(), + py::doc( + "Acquire the lock in shared mode.\n" + "This is used as follows\n" + "\n" + ">>> lock: OrderedSharedLock\n" + ">>> with lock.shared():\n" + ">>> # code with lock acquired\n" + ">>> # the lock will automatically be released here\n" + "\n" + "If the lock is acquired by a different thread in unique mode then this will block until it is finished.\n" + "If the lock is acquired in unique mode by this thread or by other threads in shared mode then this will acquire\n" + "the lock.\n" + "\n" + "If another thread wants to acquire the lock in unique mode it will block until all threads have finished in\n" + "shared mode.\n" + "\n" + ":param blocking: Should this block until the lock can be acquired. Default is True.\n" + " If false and the lock cannot be acquired on the first try, this returns False.\n" + ":param timeout: Maximum amount of time to block for. Has no effect is blocking is False. Default is forever.\n" + ":param task_manager: A custom object through which acquiring can be cancelled.\n" + " This effectively manually triggers timeout.\n" + " This is useful for GUIs so that the user can cancel an operation that may otherwise block for a while.\n" + ":return: None\n" + ":raises: LockNotAcquired if the lock could not be acquired.")); +} diff --git a/src/amulet/utils/lock.pyi b/src/amulet/utils/lock.pyi new file mode 100644 index 000000000..e099d8ad0 --- /dev/null +++ b/src/amulet/utils/lock.pyi @@ -0,0 +1,159 @@ +from __future__ import annotations + +import typing + +import amulet.utils.mutex +import amulet.utils.task_manager.cancel_manager + +__all__ = [ + "LockNotAcquired", + "OrderedSharedLock", + "SharedLockContextManager", + "UniqueLockContextManager", +] + +class LockNotAcquired(RuntimeError): + """ + An exception raised if the lock was not acquired. + """ + +class OrderedSharedLock: + """ + This is a custom lock implementation that can be acquired in + 1) unique mode. + - This is the normal mode where only this thread can use the resource. + - All other acquires block until it is released. + 2) shared mode. + - This allows multiple threads to acquire the resource at the same time. + - This is useful if multiple threads want to read a resource but not write to it. + - If the resource is locked in unique mode this will block. + - Once locked in shared mode it will block unique acquires until all shared threads release it. + Tasks are prioritised in the order the call is made + """ + + @typing.overload + def __init__(self, mutex: amulet.utils.mutex.OrderedSharedTimedMutex) -> None: ... + @typing.overload + def __init__(self) -> None: ... + def acquire_shared( + self, + blocking: bool = True, + timeout: float = -1.0, + cancel_manager: amulet.utils.task_manager.cancel_manager.AbstractCancelManager = ..., + ) -> bool: + """ + Only use this if you know what you are doing. Consider using :meth:`shared` instead + Acquire the lock in shared mode. + :param blocking: Should this block until the lock can be acquired. Default is True. + If false and the lock cannot be acquired on the first try, this returns False. + :param timeout: Maximum amount of time to block for. Has no effect is blocking is False. Default is forever. + :param task_manager: A custom object through which acquiring can be cancelled. + This effectively manually triggers timeout. + This is useful for GUIs so that the user can cancel an operation that may otherwise block for a while. + :return: True if the lock was acquired otherwise False. + """ + + def acquire_unique( + self, + blocking: bool = True, + timeout: float = -1.0, + cancel_manager: amulet.utils.task_manager.cancel_manager.AbstractCancelManager = ..., + ) -> bool: + """ + Only use this if you know what you are doing. Consider using :meth:`unique` instead + Acquire the lock in unique mode. This is equivalent to threading.Lock.acquire + With improper use this can lead to a deadlock. + :param blocking: Should this block until the lock can be acquired. Default is True. + If false and the lock cannot be acquired on the first try, this returns False. + :param timeout: Maximum amount of time to block for. Has no effect is blocking is False. Default is forever. + :param task_manager: A custom object through which acquiring can be cancelled. + This effectively manually triggers timeout. + This is useful for GUIs so that the user can cancel an operation that may otherwise block for a while. + :return: True if the lock was acquired otherwise False. + """ + + def release_shared(self) -> None: + """ + Only use this if you know what you are doing. Consider using :meth:`shared` instead + Release the shared hold on the lock. This must be called by the same thread that acquired it. + This must be called exactly the same number of times as :meth:`acquire_shared` was called. + """ + + def release_unique(self) -> None: + """ + Only use this if you know what you are doing. Consider using :meth:`unique` instead + Release the unique hold on the lock. This must be called by the same thread that acquired it. + This must be called exactly the same number of times as :meth:`acquire_unique` was called. + """ + + def shared( + self, + blocking: bool = True, + timeout: float = -1.0, + cancel_manager: amulet.utils.task_manager.cancel_manager.AbstractCancelManager = ..., + ) -> SharedLockContextManager: + """ + Acquire the lock in shared mode. + This is used as follows + + >>> lock: OrderedSharedLock + >>> with lock.shared(): + >>> # code with lock acquired + >>> # the lock will automatically be released here + + If the lock is acquired by a different thread in unique mode then this will block until it is finished. + If the lock is acquired in unique mode by this thread or by other threads in shared mode then this will acquire + the lock. + + If another thread wants to acquire the lock in unique mode it will block until all threads have finished in + shared mode. + + :param blocking: Should this block until the lock can be acquired. Default is True. + If false and the lock cannot be acquired on the first try, this returns False. + :param timeout: Maximum amount of time to block for. Has no effect is blocking is False. Default is forever. + :param task_manager: A custom object through which acquiring can be cancelled. + This effectively manually triggers timeout. + This is useful for GUIs so that the user can cancel an operation that may otherwise block for a while. + :return: None + :raises: LockNotAcquired if the lock could not be acquired. + """ + + def unique( + self, + blocking: bool = True, + timeout: float = -1.0, + cancel_manager: amulet.utils.task_manager.cancel_manager.AbstractCancelManager = ..., + ) -> UniqueLockContextManager: + """ + Acquire the lock in unique mode. + This is used as follows + + >>> lock: OrderedSharedLock + >>> with lock.unique(): + >>> # code with lock acquired + >>> # the lock will automatically be released here + + This will block while all other threads using the resource finish + and once acquired block all other threads until the lock is released. + + :param blocking: Should this block until the lock can be acquired. Default is True. + If false and the lock cannot be acquired on the first try, this returns False. + :param timeout: Maximum amount of time to block for. Has no effect is blocking is False. Default is forever. + :param task_manager: A custom object through which acquiring can be cancelled. + This effectively manually triggers timeout. + This is useful for GUIs so that the user can cancel an operation that may otherwise block for a while. + :return: None + :raises: LockNotAcquired if the lock could not be acquired. + """ + +class SharedLockContextManager: + def __enter__(self) -> None: ... + def __exit__( + self, arg0: typing.Any, arg1: typing.Any, arg2: typing.Any + ) -> None: ... + +class UniqueLockContextManager: + def __enter__(self) -> None: ... + def __exit__( + self, arg0: typing.Any, arg1: typing.Any, arg2: typing.Any + ) -> None: ... diff --git a/tests/test_amulet/test_util/test_lock.py b/tests/test_amulet/test_util/test_lock.py new file mode 100644 index 000000000..4e9972ee5 --- /dev/null +++ b/tests/test_amulet/test_util/test_lock.py @@ -0,0 +1,232 @@ +from unittest import TestCase +import weakref +import time +from threading import Thread + +from amulet.utils.task_manager import AbstractCancelManager, CancelManager +from amulet.utils.mutex import OrderedSharedTimedMutex, Deadlock +from amulet.utils.lock import OrderedSharedLock, LockNotAcquired + + +class LockTestCase(TestCase): + def test_lock_not_acquired(self) -> None: + self.assertTrue(issubclass(LockNotAcquired, RuntimeError)) + + def test_empty_constructor(self) -> None: + lock = OrderedSharedLock() + self.assertTrue(lock.acquire_unique(False)) + lock.release_unique() + + def test_mutex_constructor(self) -> None: + mutex = OrderedSharedTimedMutex() + lock = OrderedSharedLock(mutex) + self.assertTrue(lock.acquire_unique(False)) + lock.release_unique() + + def test_mutex_lifespan(self) -> None: + mutex = OrderedSharedTimedMutex() + mutex_ref = weakref.ref(mutex) + lock = OrderedSharedLock(mutex) + del mutex + self.assertIsNotNone(mutex_ref()) + del lock + self.assertIsNone(mutex_ref()) + + def test_unique_lifespan(self) -> None: + lock = OrderedSharedLock() + lock_ref = weakref.ref(lock) + unique = lock.unique() + del lock + self.assertIsNotNone(lock_ref()) + del unique + self.assertIsNone(lock_ref()) + + def test_shared_lifespan(self) -> None: + lock = OrderedSharedLock() + lock_ref = weakref.ref(lock) + shared = lock.shared() + del lock + self.assertIsNotNone(lock_ref()) + del shared + self.assertIsNone(lock_ref()) + + def test_exceptions(self) -> None: + lock = OrderedSharedLock() + with lock.unique(): + with self.assertRaises(Deadlock): + with lock.unique(): + pass + with self.assertRaises(Deadlock): + with lock.shared(): + pass + with self.assertRaises(Deadlock): + lock.acquire_unique(True) + with self.assertRaises(Deadlock): + lock.acquire_unique(False) + with self.assertRaises(Deadlock): + lock.acquire_shared(True) + with self.assertRaises(Deadlock): + lock.acquire_shared(False) + with lock.shared(): + with self.assertRaises(Deadlock): + with lock.unique(): + pass + with self.assertRaises(Deadlock): + with lock.shared(): + pass + with self.assertRaises(Deadlock): + lock.acquire_unique(True) + with self.assertRaises(Deadlock): + lock.acquire_unique(False) + with self.assertRaises(Deadlock): + lock.acquire_shared(True) + with self.assertRaises(Deadlock): + lock.acquire_shared(False) + + def test_parallel(self) -> None: + lock = OrderedSharedLock() + + sleep_time = 0.3 + + def parallel_func(): + with lock.shared(timeout=5): + time.sleep(sleep_time) + + def serial_func(): + with lock.unique(timeout=5): + time.sleep(sleep_time) + + thread_1 = Thread(target=parallel_func) + thread_2 = Thread(target=parallel_func) + thread_3 = Thread(target=serial_func) + thread_4 = Thread(target=parallel_func) + thread_5 = Thread(target=parallel_func) + + t = time.time() + + # Start threads + thread_1.start() + thread_2.start() + time.sleep(0.1) + thread_3.start() + thread_4.start() + thread_5.start() + + # Wait for threads to finish + thread_1.join() + thread_2.join() + thread_3.join() + thread_4.join() + thread_5.join() + + dt = time.time() - t + + expected_time = sleep_time * 3 + self.assertTrue( + expected_time - 0.01 <= dt <= expected_time + 0.1, + f"Expected {expected_time}s. Got {dt}s", + ) + + def test_timeout(self) -> None: + result_1 = False + result_2 = False + + lock = OrderedSharedLock() + + def func_1(): + nonlocal result_1 + try: + with lock.unique(blocking=False): + time.sleep(0.5) + except LockNotAcquired: + result_1 = False + else: + result_1 = True + + def func_2(): + nonlocal result_2 + try: + with lock.unique(timeout=0.1): + time.sleep(0.5) + except LockNotAcquired: + result_2 = True + else: + result_2 = False + + thread_1 = Thread(target=func_1) + thread_2 = Thread(target=func_2) + + t = time.time() + + # Start threads + thread_1.start() + time.sleep(0.1) + thread_2.start() + + # Wait for threads to finish + thread_1.join() + thread_2.join() + + dt = time.time() - t + + self.assertTrue( + 0.49 <= dt <= 0.6, + f"Expected 0.5s. Got {dt}s", + ) + + self.assertTrue(result_1) + self.assertTrue(result_2) + + def test_cancel(self) -> None: + result_1 = False + result_2 = False + + lock_1 = OrderedSharedLock() + lock_2 = OrderedSharedLock() + + def func_1(cancel_manager: AbstractCancelManager): + nonlocal result_1 + with lock_1.unique(blocking=False): + time.sleep(0.1) + with lock_2.unique(cancel_manager=cancel_manager): + result_1 = True + + def func_2(cancel_manager: AbstractCancelManager): + nonlocal result_2 + with lock_2.unique(blocking=False): + time.sleep(0.1) + try: + with lock_1.unique(cancel_manager=cancel_manager): + pass + except LockNotAcquired: + result_2 = True + + cancel_manager_1 = CancelManager() + cancel_manager_2 = CancelManager() + + thread_1 = Thread(target=func_1, args=(cancel_manager_1,)) + thread_2 = Thread(target=func_2, args=(cancel_manager_2,)) + + t = time.time() + + # Start threads + thread_1.start() + thread_2.start() + + time.sleep(0.5) + + cancel_manager_2.cancel() + + # Wait for threads to finish + thread_1.join() + thread_2.join() + + dt = time.time() - t + + self.assertTrue( + 0.49 <= dt <= 0.6, + f"Expected 0.5s. Got {dt}s", + ) + + self.assertTrue(result_1) + self.assertTrue(result_2) diff --git a/tests_old/test_amulet/utils/shareable_lock.py b/tests_old/test_amulet/utils/shareable_lock.py deleted file mode 100644 index 8949eae22..000000000 --- a/tests_old/test_amulet/utils/shareable_lock.py +++ /dev/null @@ -1,37 +0,0 @@ -import unittest - -from amulet.utils.shareable_lock import ShareableRLock - - -class TestShareableRLock(unittest.TestCase): - def test_lock(self) -> None: - lock = ShareableRLock() - - with lock.unique(): - pass - - with lock.shared(): - pass - - with lock.unique(): - with lock.unique(): - pass - - with lock.unique(): - with lock.shared(): - pass - - with lock.shared(): - with lock.unique(): - pass - - with lock.shared(): - with lock.shared(): - pass - - def test_deadlock(self) -> None: - pass - - -if __name__ == "__main__": - unittest.main()