Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aiotools] pervasively retrieve exceptions when cancelling tasks #13876

Closed
wants to merge 9 commits into from

Conversation

danking
Copy link
Contributor

@danking danking commented Oct 20, 2023

cc: @daniel-goldstein

I think I now understand the states of an asyncio.Task well enough to ensure we always retrieve exceptions.


I searched for all the uses of cancel and made sure they check for exceptions. One thing I might have got wrong: this code might add duplicate exception logs. This happens if one of the tasks sent to cancel_and_retrieve_all_exceptions already had its exception retrieved and logged.

These are my experiments that convinced me we need to cancel, wait, and then retrieve exceptions.

  1. When you cancel a never-run task it immediately enters the "cancelling" state; however, it will not become done or cancelled until the event loop has a chance to process it. The running coroutine must await to allow that.
In [3]: import asyncio
   ...:
   ...: async def foo():
   ...:     t = asyncio.create_task(asyncio.sleep(100))
   ...:     t.cancel()
   ...:     print((t, t.done(), t.cancelled()))
   ...:
   ...: asyncio.run(foo())
   ...:
(<Task cancelling name='Task-502' coro=<sleep() running at /Users/dking/miniconda3/lib/python3.10/asyncio/tasks.py:593>>, False, False)
  1. If you let a task start running before you cancel it, it will be "pending". In this case, I believe the CancelledError needs to bubble up the entire coroutine task before the task can become cancelling or cancelled or done.
In [4]: async def foo():
   ...:     t = asyncio.create_task(asyncio.sleep(100))
   ...:     await asyncio.sleep(0)  # let the other task run for a moment
   ...:     t.cancel()
   ...:     print((t, t.done(), t.cancelled()))
   ...:
   ...: asyncio.run(foo())
   ...:
(<Task pending name='Task-522' coro=<sleep() running at /Users/dking/miniconda3/lib/python3.10/asyncio/tasks.py:605> wait_for=<Future cancelled>>, False, False)
  1. If you yield to the task, it will encounter a CancelledError. That bubbles up, running any finally blocks, until it reaches the top of the coroutine's stack. At this point the task can become done and cancelled.
In [5]: async def foo():
   ...:     t = asyncio.create_task(asyncio.sleep(100))
   ...:     await asyncio.sleep(0)  # let the other task run for a moment
   ...:     t.cancel()
   ...:     await asyncio.wait([t])
   ...:     print((t, t.done(), t.cancelled()))
   ...:
   ...: asyncio.run(foo())
   ...:
(<Task cancelled name='Task-542' coro=<sleep() done, defined at /Users/dking/miniconda3/lib/python3.10/asyncio/tasks.py:593>>, True, True)
  1. Here's an example of a finally block that raises an exception. That exception takes priority over the CancelledError. Notice that the task is done but not cancelled. This exception is not retrieved, which asyncio reports.
In [7]: async def foo():
   ...:     async def raises(message):
   ...:         try:
   ...:             await asyncio.sleep(100)
   ...:         finally:
   ...:             raise ValueError(message)
   ...:
   ...:     unretrieved = asyncio.create_task(raises('unretrieved case'))
   ...:     await asyncio.sleep(0)  # let the other task run for a moment
   ...:     unretrieved.cancel()
   ...:     await asyncio.wait([unretrieved])
   ...:     print((unretrieved, unretrieved.done(), unretrieved.cancelled()))
   ...:
   ...: asyncio.run(foo())
   ...:
(<Task finished name='Task-579' coro=<foo.<locals>.raises() done, defined at <ipython-input-7-ccfd397235b8>:2> exception=ValueError('unretrieved case')>, True, False)
Task exception was never retrieved
future: <Task finished name='Task-579' coro=<foo.<locals>.raises() done, defined at <ipython-input-7-ccfd397235b8>:2> exception=ValueError('unretrieved case')>
Traceback (most recent call last):
  File "<ipython-input-7-ccfd397235b8>", line 4, in raises
    await asyncio.sleep(100)
  File "/Users/dking/miniconda3/lib/python3.10/asyncio/tasks.py", line 605, in sleep
    return await future
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<ipython-input-7-ccfd397235b8>", line 6, in raises
    raise ValueError(message)
ValueError: unretrieved case
  1. And here is an example of "retrieving" the exception (by calling Task.exception). Notice we do not get the "Task exception was never retrieved" message.
In [8]: async def foo():
   ...:     async def raises(message):
   ...:         try:
   ...:             await asyncio.sleep(100)
   ...:         finally:
   ...:             raise ValueError(message)
   ...:
   ...:     t = asyncio.create_task(raises('retrieved case'))
   ...:     await asyncio.sleep(0)  # let the other task run for a moment
   ...:     t.cancel()
   ...:     await asyncio.wait([t])
   ...:     print((t, t.done(), t.cancelled(), t.exception()))
   ...:
   ...: asyncio.run(foo())
   ...:
(<Task finished name='Task-599' coro=<foo.<locals>.raises() done, defined at <ipython-input-8-5fa396151822>:2> exception=ValueError('retrieved case')>, True, False, ValueError('retrieved case'))

One more thing of interest, I confirmed that the ExitStack throws the first exception it encountered after performing all callbacks.

In [6]: with ExitStack() as exit:
   ...:     def foo(i):
   ...:         print(str(i))
   ...:         raise ValueError(i)
   ...:     exit.callback(foo, 1)
   ...:     exit.callback(foo, 2)
2
1
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[6], line 1
----> 1 with ExitStack() as exit:
      2     def foo(i):
      3         print(str(i))

File ~/miniconda3/lib/python3.10/contextlib.py:576, in ExitStack.__exit__(self, *exc_details)
    572 try:
    573     # bare "raise exc_details[1]" replaces our carefully
    574     # set-up context
    575     fixed_ctx = exc_details[1].__context__
--> 576     raise exc_details[1]
    577 except BaseException:
    578     exc_details[1].__context__ = fixed_ctx

File ~/miniconda3/lib/python3.10/contextlib.py:561, in ExitStack.__exit__(self, *exc_details)
    559 assert is_sync
    560 try:
--> 561     if cb(*exc_details):
    562         suppressed_exc = True
    563         pending_raise = False

File ~/miniconda3/lib/python3.10/contextlib.py:449, in _BaseExitStack._create_cb_wrapper.<locals>._exit_wrapper(exc_type, exc, tb)
    448 def _exit_wrapper(exc_type, exc, tb):
--> 449     callback(*args, **kwds)

Cell In[6], line 4, in foo(i)
      2 def foo(i):
      3     print(str(i))
----> 4     raise ValueError(i)

ValueError: 1

I searched for all the uses of `cancel` and made sure they check for exceptions. One thing I might
have got wrong: if someone *else* already checked for and handled an exception, this code may raise
exceptions in places we did not expect them.

These are my experiments that convinced me we need to cancel, wait, and then retrieve exceptions.

1. When you cancel a never-run task it immediately enters the "cancelling" state; however, it will
   not become done or cancelled until the event loop has a chance to process it. The running
   coroutine must `await` to allow that.

```ipython
In [3]: import asyncio
   ...:
   ...: async def foo():
   ...:     t = asyncio.create_task(asyncio.sleep(100))
   ...:     t.cancel()
   ...:     print((t, t.done(), t.cancelled()))
   ...:
   ...: asyncio.run(foo())
   ...:
(<Task cancelling name='Task-502' coro=<sleep() running at /Users/dking/miniconda3/lib/python3.10/asyncio/tasks.py:593>>, False, False)
```

2. If you let a task start running before you cancel it, it will be "pending". In this case, I
   believe the `CancelledError` needs to bubble up the entire coroutine task before the task can
   become cancelling or cancelled or done.

```ipython
In [4]: async def foo():
   ...:     t = asyncio.create_task(asyncio.sleep(100))
   ...:     await asyncio.sleep(0)  # let the other task run for a moment
   ...:     t.cancel()
   ...:     print((t, t.done(), t.cancelled()))
   ...:
   ...: asyncio.run(foo())
   ...:
(<Task pending name='Task-522' coro=<sleep() running at /Users/dking/miniconda3/lib/python3.10/asyncio/tasks.py:605> wait_for=<Future cancelled>>, False, False)
```

3. If you yield to the task, it will encounter a `CancelledError`. That bubbles up, running any
   `finally` blocks, until it reaches the top of the coroutine's stack. At this point the task can
   become done and cancelled.

```ipython
In [5]: async def foo():
   ...:     t = asyncio.create_task(asyncio.sleep(100))
   ...:     await asyncio.sleep(0)  # let the other task run for a moment
   ...:     t.cancel()
   ...:     await asyncio.wait([t])
   ...:     print((t, t.done(), t.cancelled()))
   ...:
   ...: asyncio.run(foo())
   ...:
(<Task cancelled name='Task-542' coro=<sleep() done, defined at /Users/dking/miniconda3/lib/python3.10/asyncio/tasks.py:593>>, True, True)
```

4. Here's an example of a finally block that raises an exception. That exception takes priority over
   the CancelledError. Notice that the task is done but *not cancelled*. This exception is not
   retrieved, which asyncio reports.

```
In [7]: async def foo():
   ...:     async def raises(message):
   ...:         try:
   ...:             await asyncio.sleep(100)
   ...:         finally:
   ...:             raise ValueError(message)
   ...:
   ...:     unretrieved = asyncio.create_task(raises('unretrieved case'))
   ...:     await asyncio.sleep(0)  # let the other task run for a moment
   ...:     unretrieved.cancel()
   ...:     await asyncio.wait([unretrieved])
   ...:     print((unretrieved, unretrieved.done(), unretrieved.cancelled()))
   ...:
   ...: asyncio.run(foo())
   ...:
(<Task finished name='Task-579' coro=<foo.<locals>.raises() done, defined at <ipython-input-7-ccfd397235b8>:2> exception=ValueError('unretrieved case')>, True, False)
Task exception was never retrieved
future: <Task finished name='Task-579' coro=<foo.<locals>.raises() done, defined at <ipython-input-7-ccfd397235b8>:2> exception=ValueError('unretrieved case')>
Traceback (most recent call last):
  File "<ipython-input-7-ccfd397235b8>", line 4, in raises
    await asyncio.sleep(100)
  File "/Users/dking/miniconda3/lib/python3.10/asyncio/tasks.py", line 605, in sleep
    return await future
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<ipython-input-7-ccfd397235b8>", line 6, in raises
    raise ValueError(message)
ValueError: unretrieved case
```

5. And here is an example of "retrieving" the exception (by calling `Task.exception`). Notice we do
   not get the "Task exception was never retrieved" message.

```
In [8]: async def foo():
   ...:     async def raises(message):
   ...:         try:
   ...:             await asyncio.sleep(100)
   ...:         finally:
   ...:             raise ValueError(message)
   ...:
   ...:     t = asyncio.create_task(raises('retrieved case'))
   ...:     await asyncio.sleep(0)  # let the other task run for a moment
   ...:     t.cancel()
   ...:     await asyncio.wait([t])
   ...:     print((t, t.done(), t.cancelled(), t.exception()))
   ...:
   ...: asyncio.run(foo())
   ...:
(<Task finished name='Task-599' coro=<foo.<locals>.raises() done, defined at <ipython-input-8-5fa396151822>:2> exception=ValueError('retrieved case')>, True, False, ValueError('retrieved case'))
```

---

One more thing of interest, I confirmed that the ExitStack throws the first exception it
encountered *after* performing all callbacks.

```ipython
In [6]: with ExitStack() as exit:
   ...:     def foo(i):
   ...:         print(str(i))
   ...:         raise ValueError(i)
   ...:     exit.callback(foo, 1)
   ...:     exit.callback(foo, 2)
2
1
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[6], line 1
----> 1 with ExitStack() as exit:
      2     def foo(i):
      3         print(str(i))

File ~/miniconda3/lib/python3.10/contextlib.py:576, in ExitStack.__exit__(self, *exc_details)
    572 try:
    573     # bare "raise exc_details[1]" replaces our carefully
    574     # set-up context
    575     fixed_ctx = exc_details[1].__context__
--> 576     raise exc_details[1]
    577 except BaseException:
    578     exc_details[1].__context__ = fixed_ctx

File ~/miniconda3/lib/python3.10/contextlib.py:561, in ExitStack.__exit__(self, *exc_details)
    559 assert is_sync
    560 try:
--> 561     if cb(*exc_details):
    562         suppressed_exc = True
    563         pending_raise = False

File ~/miniconda3/lib/python3.10/contextlib.py:449, in _BaseExitStack._create_cb_wrapper.<locals>._exit_wrapper(exc_type, exc, tb)
    448 def _exit_wrapper(exc_type, exc, tb):
--> 449     callback(*args, **kwds)

Cell In[6], line 4, in foo(i)
      2 def foo(i):
      3     print(str(i))
----> 4     raise ValueError(i)

ValueError: 1
```
@jigold
Copy link
Contributor

jigold commented Oct 20, 2023

This seems fine to me. I think we should check the worker and driver logs and make sure there's no unexpected messages though before merging.

@jigold
Copy link
Contributor

jigold commented Oct 31, 2023

@danking Do you want me to adopt this? I'm wondering if these changes will fix #13863?

@danking danking closed this Jan 12, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants