Skip to content

Commit

Permalink
Fix race condition when recording Dask cluster state
Browse files Browse the repository at this point in the history
Previously, running `covalent start` would cause the config file to be
updated from two processes:
* the CLI runner, after confirming that the dispatcher server is up,
* the DaskCluster process, which writes the state (such as
scheduler_address).

Unfortunately, these updates were not synchronized with each other. If
the Dask cluster finished starting up and wrote out its state before
the CLI runner returned from `_graceful_start()`, the latter's config
file update (made from the CLI's stale in-memory copy of the config)
would obliterate the Dask cluster info.

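This patch ensures that the Dask cluster info is written to disk
before the FastAPI server starts: the DaskCluster process now sends its
state to the main Covalent process over a pipe, and the main process
writes it to the config file. The CLI also reloads the config after
the server comes up, so that its later updates preserve that state.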
  • Loading branch information
cjao committed Nov 7, 2023
1 parent a5cbb95 commit 14f7258
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 18 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,18 @@ jobs:
echo "Invalid backend specified in test matrix."
exit 1
fi
cat $HOME/.config/covalent/covalent.conf
env:
COVALENT_EXECUTOR_DIR: doc/source/how_to/execution/custom_executors

- name: Print Covalent status
if: env.BUILD_AND_RUN_ALL
id: covalent_status
run: |
covalent status
covalent cluster --info
covalent cluster --logs
- name: Run functional tests and measure coverage
id: functional-tests
if: env.BUILD_AND_RUN_ALL
Expand Down
10 changes: 8 additions & 2 deletions covalent_dispatcher/_cli/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from rich.table import Table
from rich.text import Text

from covalent._shared_files.config import ConfigManager, get_config, set_config
from covalent._shared_files.config import ConfigManager, get_config, reload_config, set_config

from .._db.datastore import DataStore
from .migrate import migrate_pickled_result_object
Expand Down Expand Up @@ -241,6 +241,11 @@ def _graceful_start(
except requests.exceptions.ConnectionError:
time.sleep(1)

# Since the dispatcher process might update the config file with the Dask cluster's state,
# we need to sync those changes with the CLI's ConfigManager instance. Otherwise the next
# call to `set_config()` from this module would obliterate the Dask cluster state.
reload_config()

Path(get_config("dispatcher.cache_dir")).mkdir(parents=True, exist_ok=True)
Path(get_config("dispatcher.results_dir")).mkdir(parents=True, exist_ok=True)
Path(get_config("dispatcher.log_dir")).mkdir(parents=True, exist_ok=True)
Expand Down Expand Up @@ -858,11 +863,12 @@ async def _get_cluster_logs(uri):

def _get_cluster_admin_address():
try:
reload_config()
admin_host = get_config("dask.admin_host")
admin_port = get_config("dask.admin_port")
admin_server_addr = unparse_address("tcp", f"{admin_host}:{admin_port}")
return admin_server_addr
except KeyError:
except KeyError as e:

Check warning on line 871 in covalent_dispatcher/_cli/service.py

View check run for this annotation

Codecov / codecov/patch

covalent_dispatcher/_cli/service.py#L871

Added line #L871 was not covered by tests
return


Expand Down
33 changes: 20 additions & 13 deletions covalent_dispatcher/_service/app_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,25 @@
import os
from logging import Logger
from multiprocessing import Process, current_process
from multiprocessing.connection import Connection
from threading import Thread

import dask.config
from dask.distributed import LocalCluster
from distributed.core import Server, rpc
from fastapi import APIRouter

from covalent._shared_files import logger
from covalent._shared_files.config import get_config, update_config
from covalent._shared_files.config import get_config
from covalent._shared_files.utils import get_random_available_port

app_log = logger.app_log

# Configure dask to not allow daemon workers
dask.config.set({"distributed.worker.daemon": False})

router: APIRouter = APIRouter()


class DaskAdminWorker(Thread):
"""
Expand Down Expand Up @@ -170,12 +174,15 @@ class DaskCluster(Process):
randomly selected TCP port that is available
"""

def __init__(self, name: str, logger: Logger):
def __init__(self, name: str, logger: Logger, conn: Connection):
super(DaskCluster, self).__init__()
self.name = name
self.logger = logger
self.cluster = None

# For sending cluster state back to main covalent process
self.conn = conn

Check warning on line 184 in covalent_dispatcher/_service/app_dask.py

View check run for this annotation

Codecov / codecov/patch

covalent_dispatcher/_service/app_dask.py#L184

Added line #L184 was not covered by tests

# Cluster configuration
self.num_workers = None
self.mem_per_worker = None
Expand Down Expand Up @@ -219,18 +226,18 @@ def run(self):
dashboard_link = self.cluster.dashboard_link

try:
update_config(
{
"dask": {
"scheduler_address": scheduler_address,
"dashboard_link": dashboard_link,
"process_info": current_process(),
"pid": os.getpid(),
"admin_host": self.admin_host,
"admin_port": self.admin_port,
}
dask_config = {

Check warning on line 229 in covalent_dispatcher/_service/app_dask.py

View check run for this annotation

Codecov / codecov/patch

covalent_dispatcher/_service/app_dask.py#L229

Added line #L229 was not covered by tests
"dask": {
"scheduler_address": scheduler_address,
"dashboard_link": dashboard_link,
"process_info": str(current_process()),
"pid": os.getpid(),
"admin_host": self.admin_host,
"admin_port": self.admin_port,
}
)
}

self.conn.send(dask_config)

Check warning on line 240 in covalent_dispatcher/_service/app_dask.py

View check run for this annotation

Codecov / codecov/patch

covalent_dispatcher/_service/app_dask.py#L240

Added line #L240 was not covered by tests

admin = DaskAdminWorker(self.cluster, self.admin_host, self.admin_port, self.logger)
admin.start()
Expand Down
3 changes: 2 additions & 1 deletion covalent_ui/api/v1/routes/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from fastapi import APIRouter

from covalent_dispatcher._service import app, assets, runnersvc
from covalent_dispatcher._service import app, app_dask, assets, runnersvc
from covalent_dispatcher._triggers_app.app import router as tr_router
from covalent_ui.api.v1.routes.end_points import (
electron_routes,
Expand All @@ -39,6 +39,7 @@
routes.include_router(electron_routes.routes, prefix=dispatch_prefix, tags=["Electrons"])
routes.include_router(settings_routes.routes, prefix="/api/v1", tags=["Settings"])
routes.include_router(logs_route.routes, prefix="/api/v1/logs", tags=["Logs"])
routes.include_router(app_dask.router, prefix="/api/v0/dask", tags=["Dask"])
routes.include_router(tr_router, prefix="/api", tags=["Triggers"])
routes.include_router(app.router, prefix="/api/v2", tags=["Dispatcher"])
routes.include_router(assets.router, prefix="/api/v2", tags=["Assets"])
Expand Down
8 changes: 6 additions & 2 deletions covalent_ui/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

import argparse
import os
from multiprocessing import Pipe

import socketio
import uvicorn
from fastapi import Request
from fastapi.templating import Jinja2Templates

from covalent._shared_files import logger
from covalent._shared_files.config import get_config
from covalent._shared_files.config import get_config, update_config
from covalent_dispatcher._service.app_dask import DaskCluster
from covalent_dispatcher._triggers_app import triggers_only_app # nopycln: import
from covalent_ui.api.main import app as fastapi_app
Expand Down Expand Up @@ -110,8 +111,11 @@ def get_home(request: Request, rest_of_path: str):

# Start Dask unless the no-cluster flag is specified (`covalent stop` automatically terminates all child processes of this process)
if args.cluster:
dask_cluster = DaskCluster(name="LocalDaskCluster", logger=app_log)
parent_conn, child_conn = Pipe()
dask_cluster = DaskCluster(name="LocalDaskCluster", logger=app_log, conn=child_conn)

Check warning on line 115 in covalent_ui/app.py

View check run for this annotation

Codecov / codecov/patch

covalent_ui/app.py#L114-L115

Added lines #L114 - L115 were not covered by tests
dask_cluster.start()
dask_config = parent_conn.recv()
update_config(dask_config)

Check warning on line 118 in covalent_ui/app.py

View check run for this annotation

Codecov / codecov/patch

covalent_ui/app.py#L117-L118

Added lines #L117 - L118 were not covered by tests

app_name = "app:fastapi_app"
if args.triggers_only:
Expand Down

0 comments on commit 14f7258

Please sign in to comment.