Skip to content

Commit

Permalink
[query] ensure nest_asyncio is applied, but only when necessary (#13899)
Browse files Browse the repository at this point in the history
CHANGELOG: Fix `RuntimeError: This event loop is already running` error
when running hail in a Jupyter Notebook.

Man this is really complicated.

OK, so, things I learned:

1. [asyncio will not create a new event loop if `set_event_loop` has
been called even if `set_event_loop(None)` has since been
called.](https://github.com/python/cpython/blob/main/Lib/asyncio/events.py#L676)
2. [asyncio will not create a new event loop in a thread other than the
main
thread.](https://github.com/python/cpython/blob/main/Lib/asyncio/events.py#L677)
3. `aiohttp.ClientSession` stashes a copy of the event loop present when
it starts. This can cause all manner of extremely confusing behavior if
you later change the event loop or use that session from a different
thread.

The fix, in the end, wasn't that complicated. Anywhere Hail explicitly
asks for an event loop (so that we can run async code), we apply nest
asyncio if the event loop is already running. Otherwise we do nothing.
Nest asyncio appears to [no longer
require](https://github.com/erdewit/nest_asyncio/tree/master#usage)
`apply` to be called before the event loop starts running.

This PR *does not* address:
1. Hail nesting async code in sync code in async code. I think we should
avoid this, but the `hailtop.fs` and `hailtop.batch` APIs, among others,
need async versions before we can do that.
2. This `aiohttp.ClientSession` nonsense. We really should take pains to
ensure we create one `ClientSession` per loop and we never mix loops.
  • Loading branch information
danking authored Oct 25, 2023
1 parent 0402aad commit a53f4ff
Show file tree
Hide file tree
Showing 17 changed files with 118 additions and 106 deletions.
12 changes: 6 additions & 6 deletions build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1020,8 +1020,8 @@ steps:
export VECLIB_MAXIMUM_THREADS=2
export NUMEXPR_NUM_THREADS=2
export HAIL_TEST_RESOURCES_DIR=./resources
export HAIL_DOCTEST_DATA_DIR=./data
export HAIL_TEST_RESOURCES_DIR=$(realpath ./resources)
export HAIL_DOCTEST_DATA_DIR=$(realpath ./data)
export HAIL_TEST_STORAGE_URI={{ global.test_storage_uri }}/{{ token }}
export PYSPARK_SUBMIT_ARGS="--driver-memory 6g pyspark-shell"
python3 -m pytest \
Expand Down Expand Up @@ -1142,8 +1142,8 @@ steps:
# pyspark/conf/core-site.xml already points at /gsa-key/key.json
mv /test-gsa-key/key.json /gsa-key/key.json
export HAIL_TEST_RESOURCES_DIR=./resources
export HAIL_DOCTEST_DATA_DIR=./data
export HAIL_TEST_RESOURCES_DIR=$(realpath ./resources)
export HAIL_DOCTEST_DATA_DIR=$(realpath ./data)
export HAIL_TEST_STORAGE_URI={{ global.test_storage_uri }}/{{ token }}
export PYSPARK_SUBMIT_ARGS="--driver-memory 6g pyspark-shell"
python3 -m pytest \
Expand Down Expand Up @@ -1212,8 +1212,8 @@ steps:
export HAIL_CLOUD={{ global.cloud }}
export HAIL_TEST_STORAGE_URI=/io/tmp/
export HAIL_TEST_RESOURCES_DIR=./resources
export HAIL_DOCTEST_DATA_DIR=./data
export HAIL_TEST_RESOURCES_DIR=$(realpath ./resources)
export HAIL_DOCTEST_DATA_DIR=$(realpath ./data)
export HAIL_LOCAL_BACKEND_HEAP_SIZE=3G
hailctl config set query/backend local
Expand Down
6 changes: 2 additions & 4 deletions hail/python/hail/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from hail.utils import get_env_or_default
from hail.utils.java import Env, warning, choose_backend
from hail.backend import Backend
from hailtop.hail_event_loop import hail_event_loop
from hailtop.utils import secret_alnum_string
from hailtop.fs.fs import FS
from hailtop.aiocloud.aiogoogle import GCSRequesterPaysConfiguration, get_gcs_requester_pays_configuration
Expand Down Expand Up @@ -342,10 +343,7 @@ def init(sc=None,
backend = 'batch'

if backend == 'batch':
import nest_asyncio
nest_asyncio.apply()
import asyncio
return asyncio.get_event_loop().run_until_complete(init_batch(
return hail_event_loop().run_until_complete(init_batch(
log=log,
quiet=quiet,
append=append,
Expand Down
8 changes: 5 additions & 3 deletions hail/python/hailtop/batch/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ class Backend(abc.ABC, Generic[RunningBatchType]):

def __init__(self):
self._requester_pays_fses: Dict[GCSRequesterPaysConfiguration, RouterAsyncFS] = {}
import nest_asyncio # pylint: disable=import-outside-toplevel
nest_asyncio.apply()

def requester_pays_fs(self, requester_pays_config: GCSRequesterPaysConfiguration) -> RouterAsyncFS:
try:
Expand Down Expand Up @@ -695,7 +693,11 @@ async def compile_job(job):
used_remote_tmpdir = await job._compile(local_tmpdir, batch_remote_tmpdir, dry_run=dry_run)
pbar.update(1)
return used_remote_tmpdir
used_remote_tmpdir_results = await bounded_gather(*[functools.partial(compile_job, j) for j in unsubmitted_jobs], parallelism=150)
used_remote_tmpdir_results = await bounded_gather(
*[functools.partial(compile_job, j) for j in unsubmitted_jobs],
parallelism=150,
cancel_on_error=True,
)
used_remote_tmpdir |= any(used_remote_tmpdir_results)

for job in track(unsubmitted_jobs, description='create job objects', disable=disable_setup_steps_progress_bar):
Expand Down
13 changes: 5 additions & 8 deletions hail/python/hailtop/batch/batch_pool_executor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional, Callable, Type, Union, List, Any, Iterable
from typing import Optional, Callable, Type, Union, List, Any, Iterable, AsyncGenerator
from types import TracebackType
from io import BytesIO
import warnings
Expand All @@ -7,7 +7,7 @@
import dill
import functools

from hailtop.utils import secret_alnum_string, partition
from hailtop.utils import secret_alnum_string, partition, async_to_blocking
import hailtop.batch_client.aioclient as low_level_batch_client
from hailtop.batch_client.parse import parse_cpu_in_mcpu
from hailtop.aiotools.router_fs import RouterAsyncFS
Expand All @@ -30,10 +30,6 @@ def chunkedfn(*args):
return chunkedfn


def async_to_blocking(coro):
return asyncio.get_event_loop().run_until_complete(coro)


class BatchPoolExecutor:
"""An executor which executes Python functions in the cloud.
Expand Down Expand Up @@ -229,10 +225,11 @@ async def async_map(self,
fn: Callable,
iterables: Iterable[Iterable[Any]],
timeout: Optional[Union[int, float]] = None,
chunksize: int = 1):
chunksize: int = 1
) -> AsyncGenerator[int, None]:
"""Aysncio compatible version of :meth:`.map`."""
if not iterables:
return iter([])
return (x for x in range(0))

if chunksize > 1:
list_per_argument = [list(x) for x in iterables]
Expand Down
1 change: 1 addition & 0 deletions hail/python/hailtop/batch_client/aioclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ async def _submit_job_bunches(self,
for bunch, size in zip(byte_job_specs_bunches, bunch_sizes)
],
parallelism=6,
cancel_on_error=True,
)

async def _submit(self,
Expand Down
2 changes: 1 addition & 1 deletion hail/python/hailtop/cleanup_gcr/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,4 @@ async def main():
cleanup_images.shutdown()


asyncio.get_event_loop().run_until_complete(main())
asyncio.run(main())
2 changes: 0 additions & 2 deletions hail/python/hailtop/fs/router_fs.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import List, AsyncContextManager, BinaryIO, Optional, Tuple, Dict, Any
import asyncio
import io
import nest_asyncio
import os
import functools
import glob
Expand Down Expand Up @@ -175,7 +174,6 @@ def __init__(self,
gcs_kwargs: Optional[Dict[str, Any]] = None,
azure_kwargs: Optional[Dict[str, Any]] = None,
s3_kwargs: Optional[Dict[str, Any]] = None):
nest_asyncio.apply()
if afs and (local_kwargs or gcs_kwargs or azure_kwargs or s3_kwargs):
raise ValueError(
f'If afs is specified, no other arguments may be specified: {afs=}, {local_kwargs=}, {gcs_kwargs=}, {azure_kwargs=}, {s3_kwargs=}'
Expand Down
16 changes: 16 additions & 0 deletions hail/python/hailtop/hail_event_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import asyncio
import nest_asyncio


def hail_event_loop():
'''If a running event loop exists, use nest_asyncio to allow Hail's event loops to nest inside
it.
If no event loop exists, ask asyncio to get one for us.
'''

try:
asyncio.get_running_loop()
nest_asyncio.apply()
return asyncio.get_running_loop()
except RuntimeError:
return asyncio.get_event_loop()
2 changes: 1 addition & 1 deletion hail/python/hailtop/hailctl/batch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,4 @@ def submit(
def initialize(
verbose: Ann[bool, Opt('--verbose', '-v', help='Print gcloud commands being executed')] = False
):
asyncio.get_event_loop().run_until_complete(async_basic_initialize(verbose=verbose))
asyncio.run(async_basic_initialize(verbose=verbose))
2 changes: 1 addition & 1 deletion hail/python/hailtop/hailctl/describe.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def describe(
'''
Describe the MatrixTable or Table at path FILE.
'''
asyncio.get_event_loop().run_until_complete(async_describe(file, requester_pays_project_id))
asyncio.run(async_describe(file, requester_pays_project_id))


async def async_describe(
Expand Down
2 changes: 1 addition & 1 deletion hail/python/hailtop/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ google-auth>=2.14.1,<3
google-auth-oauthlib>=0.5.2,<1
humanize>=1.0.0,<2
janus>=0.6,<1.1
nest_asyncio>=1.5.4,<2
nest_asyncio>=1.5.8,<2
orjson>=3.6.4,<4
protobuf==3.20.2
rich>=12.6.0,<13
Expand Down
90 changes: 13 additions & 77 deletions hail/python/hailtop/utils/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import (Any, Callable, TypeVar, Awaitable, Mapping, Optional, Type, List, Dict, Iterable, Tuple,
Generic, cast, AsyncIterator, Iterator, Union)
from typing import (Any, Callable, TypeVar, Awaitable, Mapping, Optional, Type, List, Dict,
Iterable, Tuple, AsyncIterator, Iterator, Union)
from typing import Literal, Sequence
from typing_extensions import ParamSpec
from types import TracebackType
Expand Down Expand Up @@ -27,6 +27,7 @@
from urllib3.poolmanager import PoolManager

from .time import time_msecs
from ..hail_event_loop import hail_event_loop

try:
import aiodocker # pylint: disable=import-error
Expand Down Expand Up @@ -153,7 +154,7 @@ def unzip(lst: Iterable[Tuple[T, U]]) -> Tuple[List[T], List[U]]:


def async_to_blocking(coro: Awaitable[T]) -> T:
loop = asyncio.get_event_loop()
loop = hail_event_loop()
task = asyncio.ensure_future(coro)
try:
return loop.run_until_complete(task)
Expand All @@ -178,86 +179,21 @@ async def blocking_to_async(thread_pool: concurrent.futures.Executor,
fun: Callable[..., T],
*args,
**kwargs) -> T:
return await asyncio.get_event_loop().run_in_executor(
return await asyncio.get_running_loop().run_in_executor(
thread_pool, lambda: fun(*args, **kwargs))


async def bounded_gather(*pfs: Callable[[], Awaitable[T]],
parallelism: int = 10,
return_exceptions: bool = False
return_exceptions: bool = False,
cancel_on_error = False,
) -> List[T]:
gatherer = AsyncThrottledGather[T](*pfs,
parallelism=parallelism,
return_exceptions=return_exceptions)
return await gatherer.wait()


class AsyncThrottledGather(Generic[T]):
def __init__(self,
*pfs: Callable[[], Awaitable[T]],
parallelism: int = 10,
return_exceptions: bool = False):
self.count = len(pfs)
self.n_finished = 0

self._queue: asyncio.Queue[Tuple[int, Callable[[], Awaitable[T]]]] = asyncio.Queue()
self._done = asyncio.Event()
self._return_exceptions = return_exceptions

self._results: List[Union[T, Exception, None]] = [None] * len(pfs)
self._errors: List[BaseException] = []

self._workers: List[asyncio.Task] = []
for _ in range(parallelism):
self._workers.append(asyncio.create_task(self._worker()))

for i, pf in enumerate(pfs):
self._queue.put_nowait((i, pf))

def _cancel_workers(self):
for worker in self._workers:
try:
if worker.done() and not worker.cancelled():
exc = worker.exception()
if exc:
raise exc
else:
worker.cancel()
except Exception:
pass

async def _worker(self):
while True:
i, pf = await self._queue.get()

try:
res = await pf()
except asyncio.CancelledError: # pylint: disable=try-except-raise
raise
except Exception as err: # pylint: disable=broad-except
res = err # type: ignore
if not self._return_exceptions:
self._errors.append(err)
self._done.set()
return

self._results[i] = res
self.n_finished += 1

if self.n_finished == self.count:
self._done.set()

async def wait(self) -> List[T]:
try:
if self.count > 0:
await self._done.wait()
finally:
self._cancel_workers()

if self._errors:
raise self._errors[0]

return cast(List[T], self._results)
return await bounded_gather2(
asyncio.Semaphore(parallelism),
*pfs,
return_exceptions=return_exceptions,
cancel_on_error=cancel_on_error
)


class AsyncWorkerPool:
Expand Down
3 changes: 2 additions & 1 deletion hail/python/test/hail/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from hail import current_backend, init, reset_global_randomness
from hail.backend.service_backend import ServiceBackend
from hailtop.utils import secret_alnum_string
from .helpers import hl_init_for_test, hl_stop_for_test


Expand Down Expand Up @@ -77,6 +78,6 @@ def reinitialize_hail_for_each_qob_test(init_hail, request):
report: Dict[str, CollectReport] = request.node.stash[test_results_key]
if any(r.failed for r in report.values()):
log.info(f'cancelling failed test batch {batch.id}')
asyncio.get_event_loop().run_until_complete(batch.cancel())
hail_event_loop().run_until_complete(batch.cancel())
else:
yield
12 changes: 12 additions & 0 deletions hail/python/test/hail/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,18 @@ def wrapper(func: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T:
return wrapper


def skip_when_local_backend(reason='skipping for Local Backend'):
from hail.backend.local_backend import LocalBackend
@decorator
def wrapper(func, *args, **kwargs):
if isinstance(hl.utils.java.Env.backend(), LocalBackend):
raise unittest.SkipTest(reason)
else:
return func(*args, **kwargs)

return wrapper


def skip_when_service_backend(reason='skipping for Service Backend'):
from hail.backend.service_backend import ServiceBackend
@decorator
Expand Down
39 changes: 39 additions & 0 deletions hail/python/test/hail/test_hail_in_notebook.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "45c29ec3-dd8f-4805-a24f-462818628992",
"metadata": {},
"outputs": [],
"source": [
"import hail as hl\n",
"import os\n",
"hl.utils.range_table(1).write(f'{os.environ[\"HAIL_TEST_STORAGE_URI\"]}/test_hail_in_notebook.ht')\n",
"from helpers import resource\n",
"hl.read_table(resource('backward_compatability/1.7.0/table/9.ht')).count()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
12 changes: 12 additions & 0 deletions hail/python/test/hail/test_hail_in_notebook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from hailtop.utils.process import sync_check_exec
import os
import pathlib
from .helpers import skip_when_local_backend


@skip_when_local_backend('In the LocalBackend, writing to a gs:// URL hangs indefinitely https://github.com/hail-is/hail/issues/13904')
def test_hail_in_notebook():
folder = pathlib.Path(__file__).parent.resolve()
source_ipynb = os.path.join(folder, 'test_hail_in_notebook.ipynb')
output_ipynb = os.path.join(folder, 'test_hail_in_notebook_out.ipynb')
sync_check_exec('jupyter', 'nbconvert', '--to', 'notebook', '--execute', str(source_ipynb), '--output', str(output_ipynb))
Loading

0 comments on commit a53f4ff

Please sign in to comment.