From 14f72582b7165613da11a88337010405f4432682 Mon Sep 17 00:00:00 2001 From: Casey Jao Date: Thu, 2 Nov 2023 21:43:35 -0400 Subject: [PATCH] Fix race condition when recording Dask cluster state Previously, running `covalent start` would cause the config file to be updated from two processes: * the CLI runner, after confirming that the dispatcher server is up, * the DaskCluster process, which writes the state (such as scheduler_address). Unfortunately these updates were not synchronized with each other. If the Dask cluster finishes starting up and writes out its state before the CLI runner returns from `_graceful_start()`, the latter's config file update would obliterate the Dask cluster info. This patch ensures that the Dask cluster info is written to disk before the FastAPI server starts. --- .github/workflows/tests.yml | 9 +++++++ covalent_dispatcher/_cli/service.py | 10 +++++-- covalent_dispatcher/_service/app_dask.py | 33 ++++++++++++++---------- covalent_ui/api/v1/routes/routes.py | 3 ++- covalent_ui/app.py | 8 ++++-- 5 files changed, 45 insertions(+), 18 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 43758a1f3..cf30e1f1e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -218,9 +218,18 @@ jobs: echo "Invalid backend specified in test matrix." exit 1 fi + cat $HOME/.config/covalent/covalent.conf env: COVALENT_EXECUTOR_DIR: doc/source/how_to/execution/custom_executors + - name: Print Covalent status + if: env.BUILD_AND_RUN_ALL + id: covalent_status + run: | + covalent status + covalent cluster --info + covalent cluster --logs + - name: Run functional tests and measure coverage id: functional-tests if: env.BUILD_AND_RUN_ALL diff --git a/covalent_dispatcher/_cli/service.py b/covalent_dispatcher/_cli/service.py index bcd7a5557..ef7cab6b0 100644 --- a/covalent_dispatcher/_cli/service.py +++ b/covalent_dispatcher/_cli/service.py @@ -51,7 +51,7 @@ from rich.table import Table from rich.text import Text -from covalent._shared_files.config import ConfigManager, get_config, set_config +from covalent._shared_files.config import ConfigManager, get_config, reload_config, set_config from .._db.datastore import DataStore from .migrate import migrate_pickled_result_object @@ -241,6 +241,11 @@ def _graceful_start( except requests.exceptions.ConnectionError: time.sleep(1) + # Since the dispatcher process might update the config file with the Dask cluster's state, + # we need to sync those changes with the CLI's ConfigManager instance. Otherwise the next + # call to `set_config()` from this module would obliterate the Dask cluster state. + reload_config() + Path(get_config("dispatcher.cache_dir")).mkdir(parents=True, exist_ok=True) Path(get_config("dispatcher.results_dir")).mkdir(parents=True, exist_ok=True) Path(get_config("dispatcher.log_dir")).mkdir(parents=True, exist_ok=True) @@ -858,11 +863,12 @@ async def _get_cluster_logs(uri): def _get_cluster_admin_address(): try: + reload_config() admin_host = get_config("dask.admin_host") admin_port = get_config("dask.admin_port") admin_server_addr = unparse_address("tcp", f"{admin_host}:{admin_port}") return admin_server_addr - except KeyError: + except KeyError as e: return diff --git a/covalent_dispatcher/_service/app_dask.py b/covalent_dispatcher/_service/app_dask.py index 58a4bd0bc..854615bd9 100644 --- a/covalent_dispatcher/_service/app_dask.py +++ b/covalent_dispatcher/_service/app_dask.py @@ -20,14 +20,16 @@ import os from logging import Logger from multiprocessing import Process, current_process +from multiprocessing.connection import Connection from threading import Thread import dask.config from dask.distributed import LocalCluster from distributed.core import Server, rpc +from fastapi import APIRouter from covalent._shared_files import logger -from covalent._shared_files.config import get_config, update_config +from covalent._shared_files.config import get_config from covalent._shared_files.utils import get_random_available_port app_log = logger.app_log @@ -35,6 +37,8 @@ # Configure dask to not allow daemon workers dask.config.set({"distributed.worker.daemon": False}) +router: APIRouter = APIRouter() + class DaskAdminWorker(Thread): """ @@ -170,12 +174,15 @@ class DaskCluster(Process): randomly selected TCP port that is available """ - def __init__(self, name: str, logger: Logger): + def __init__(self, name: str, logger: Logger, conn: Connection): super(DaskCluster, self).__init__() self.name = name self.logger = logger self.cluster = None + # For sending cluster state back to main covalent process + self.conn = conn + # Cluster configuration self.num_workers = None self.mem_per_worker = None @@ -219,18 +226,18 @@ def run(self): dashboard_link = self.cluster.dashboard_link try: - update_config( - { - "dask": { - "scheduler_address": scheduler_address, - "dashboard_link": dashboard_link, - "process_info": current_process(), - "pid": os.getpid(), - "admin_host": self.admin_host, - "admin_port": self.admin_port, - } + dask_config = { + "dask": { + "scheduler_address": scheduler_address, + "dashboard_link": dashboard_link, + "process_info": str(current_process()), + "pid": os.getpid(), + "admin_host": self.admin_host, + "admin_port": self.admin_port, } - ) + } + + self.conn.send(dask_config) admin = DaskAdminWorker(self.cluster, self.admin_host, self.admin_port, self.logger) admin.start() diff --git a/covalent_ui/api/v1/routes/routes.py b/covalent_ui/api/v1/routes/routes.py index 9b6c50b45..6727c9260 100644 --- a/covalent_ui/api/v1/routes/routes.py +++ b/covalent_ui/api/v1/routes/routes.py @@ -18,7 +18,7 @@ from fastapi import APIRouter -from covalent_dispatcher._service import app, assets, runnersvc +from covalent_dispatcher._service import app, app_dask, assets, runnersvc from covalent_dispatcher._triggers_app.app import router as tr_router from covalent_ui.api.v1.routes.end_points import ( electron_routes, @@ -39,6 +39,7 @@ routes.include_router(electron_routes.routes, prefix=dispatch_prefix, tags=["Electrons"]) routes.include_router(settings_routes.routes, prefix="/api/v1", tags=["Settings"]) routes.include_router(logs_route.routes, prefix="/api/v1/logs", tags=["Logs"]) +routes.include_router(app_dask.router, prefix="/api/v0/dask", tags=["Dask"]) routes.include_router(tr_router, prefix="/api", tags=["Triggers"]) routes.include_router(app.router, prefix="/api/v2", tags=["Dispatcher"]) routes.include_router(assets.router, prefix="/api/v2", tags=["Assets"]) diff --git a/covalent_ui/app.py b/covalent_ui/app.py index aa2c830a5..bf1d473eb 100644 --- a/covalent_ui/app.py +++ b/covalent_ui/app.py @@ -16,6 +16,7 @@ import argparse import os +from multiprocessing import Pipe import socketio import uvicorn @@ -23,7 +24,7 @@ from fastapi.templating import Jinja2Templates from covalent._shared_files import logger -from covalent._shared_files.config import get_config +from covalent._shared_files.config import get_config, update_config from covalent_dispatcher._service.app_dask import DaskCluster from covalent_dispatcher._triggers_app import triggers_only_app # nopycln: import from covalent_ui.api.main import app as fastapi_app @@ -110,8 +111,11 @@ def get_home(request: Request, rest_of_path: str): # Start dask if no-cluster flag is not specified (covalent stop auto terminates all child processes of this) if args.cluster: - dask_cluster = DaskCluster(name="LocalDaskCluster", logger=app_log) + parent_conn, child_conn = Pipe() + dask_cluster = DaskCluster(name="LocalDaskCluster", logger=app_log, conn=child_conn) dask_cluster.start() + dask_config = parent_conn.recv() + update_config(dask_config) app_name = "app:fastapi_app" if args.triggers_only: