Memory improvements (3/3): Introduce new runner and executor API (#1690)
* Mem (1/3): introduce new DAL and core Pydantic models

* Mem (1/3): Fix schemas

* Mem (1/3): DAL PR: temporarily redirect core dispatcher tests

* Mem (1/3): DAL PR: fix tests

Introduce temporary implementations of `update._node` and
`update.lattice_data`. These will be removed once core covalent is
transitioned to the new DAL.

* Mem (1/3): Fix requirements workflow

Change absolute imports to relative imports, as required by pip-missing-reqs.

* Mem (1/3): Uncomment boilerplate in disabled unit tests

* Mem (1/3): Add unit test for format_server_url

* Mem (1/3): defer copy_file_locally to next PR

* Mem (1/3): update changelog

* Mem (1/3): Core DAL improvements

- Improve type hints

* Mem (2/3): Revert "DAL PR: fix tests"

* Mem (2/3): Revert "Mem (1/3): defer copy_file_locally to next PR"

This reverts commit a3ab70b.

* Mem (2/3): Revert "Mem (1/3): Uncomment boilerplate in disabled unit tests"

* Mem (2/3): Revert "Mem (1/3): Fix requirements workflow"

* Mem (2/3): Revert "DAL PR: temporarily redirect core dispatcher tests"

This reverts commit 388df38236ebec7555ec7e83ffc1834427b46650.

* Mem (2/3): Core migration -- Re-enable electron_tests

* Mem (2/3): migrate core to new DAL

* Mem (2/3): redirect dispatcher to in-memory runner

Make API endpoints RESTful

Cancel all dispatches upon shutdown

* Mem (2/3): Update changelog

* Mem (3/3): Revert "Mem (2/3): redirect dispatcher to in-memory runner"

* Mem (3/3): introduce new runner and executor API

* Mem (3/3): Update changelog

* Updated license to Apache

* Some fixes to make dispatching work

* Reverted some changes to keep the functions synchronous

* Changes after review

* Fixed tests; removed cancel_requested from the Electron model

* Fixed tests; resolved sublattice issue

* Fixed tests; resolved cancellation issue

* Fixed tests; resolved SDK issue

* Changed cancel_tasks import location

* Changed tg utils import location

* fixing tests

* Fixed tests; skipped some UI backend tests

* Fix UI backend tests

* Undo some changes to UI assert data

* Renamed core dispatcher tests

* Removed the _db/load.py file

* Fixed license

* Fixed tests

* Reverted Dask-related changes

* Fixed Dask issue

* Skipped a test that needs rewriting

* DAL: Remember to update Record attrs on Record.update()

* Tests: force task packing for ft

* Tests: Revert force task packing for ft

* Tests: enable task packing during FT

* SDK: don't pickle lattice_imports or cova_imports

* Fix race condition when recording Dask cluster state

Running `covalent start` causes the config file to be
updated from two processes:
* the CLI runner, after `_graceful_start()` returns, and
* the DaskCluster process, which records the cluster state after the
cluster starts up.

Unfortunately, the CLI runner previously wrote out the config it
loaded into memory before starting the dispatcher process. Its
ConfigManager instance was therefore unaware of any config file
updates that might have happened before `_graceful_start()`
returned. If the Dask cluster were to finish starting up and write out
its state before the CLI runner returned from `_graceful_start()`, the
CLI's config file update would obliterate the Dask cluster info.

This patch refreshes the CLI app's ConfigManager instance from the
on-disk config file after `_graceful_start()` and ensures that the
Dask cluster finishes starting before `_graceful_start()` returns.
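
The pattern behind this fix can be sketched as follows. This is a minimal illustration, not Covalent's actual `ConfigManager` API: the class and method names here are invented for the example.

```python
import json
from pathlib import Path


class ConfigManager:
    """Toy stand-in for a CLI config manager (names are illustrative)."""

    def __init__(self, path: Path):
        self.path = path
        self.config = {}

    def read(self) -> None:
        # Refresh the in-memory view from disk so that updates written by
        # other processes (e.g. a cluster process recording its state)
        # are not lost.
        if self.path.exists():
            self.config = json.loads(self.path.read_text())

    def write(self) -> None:
        self.path.write_text(json.dumps(self.config))


def finish_start(cm: ConfigManager) -> None:
    # The fix: re-read the on-disk config *after* the server and cluster
    # have finished starting, then apply the CLI's own updates before
    # writing. Writing the stale in-memory copy directly would overwrite
    # whatever the cluster recorded in the meantime.
    cm.read()
    cm.config["cli"] = {"started": True}
    cm.write()
```

The second half of the fix, making `_graceful_start()` wait for the Dask cluster to finish starting, removes the other ordering hazard: once the cluster state is guaranteed to be on disk before the CLI's final read-then-write, the last writer sees both updates.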

* Added more Dask tests

* Added local executor tests

* Added more local executor tests

---------

Co-authored-by: sankalp <[email protected]>
cjao and kessler-frost authored Nov 9, 2023
1 parent 7cfee65 commit 1c5ae4a
Showing 40 changed files with 3,544 additions and 301 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/tests.yml
@@ -211,16 +211,25 @@ jobs:
run: |
covalent db migrate
if [ "${{ matrix.backend }}" = 'dask' ] ; then
covalent start -d
COVALENT_ENABLE_TASK_PACKING=1 covalent start -d
elif [ "${{ matrix.backend }}" = 'local' ] ; then
covalent start --no-cluster -d
else
echo "Invalid backend specified in test matrix."
exit 1
fi
cat $HOME/.config/covalent/covalent.conf
env:
COVALENT_EXECUTOR_DIR: doc/source/how_to/execution/custom_executors

- name: Print Covalent status
if: env.BUILD_AND_RUN_ALL
id: covalent_status
run: |
covalent status
covalent cluster --info
covalent cluster --logs
- name: Run functional tests and measure coverage
id: functional-tests
if: env.BUILD_AND_RUN_ALL
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -38,11 +38,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Documentation and test cases for database triggers.
- Added the `__pow__` method to the `Electron` class
- New Runner and executor API to bypass server-side memory when running tasks.

### Docs

- Added federated learning showcase code
- Updated tutorial for redispatching workflows with Streamlit
- Updated tutorial for redispatching workflows with Streamlit

### Tests

4 changes: 2 additions & 2 deletions covalent/_serialize/lattice.py
@@ -42,8 +42,8 @@
"inputs": AssetType.TRANSPORTABLE,
"named_args": AssetType.TRANSPORTABLE,
"named_kwargs": AssetType.TRANSPORTABLE,
"cova_imports": AssetType.OBJECT,
"lattice_imports": AssetType.OBJECT,
"cova_imports": AssetType.JSONABLE,
"lattice_imports": AssetType.TEXT,
"deps": AssetType.JSONABLE,
"call_before": AssetType.JSONABLE,
"call_after": AssetType.JSONABLE,
4 changes: 2 additions & 2 deletions covalent/_shared_files/schemas/lattice.py
@@ -62,8 +62,8 @@
LATTICE_DEPS_FILENAME = "deps.json"
LATTICE_CALL_BEFORE_FILENAME = "call_before.json"
LATTICE_CALL_AFTER_FILENAME = "call_after.json"
LATTICE_COVA_IMPORTS_FILENAME = "cova_imports.pkl"
LATTICE_LATTICE_IMPORTS_FILENAME = "lattice_imports.pkl"
LATTICE_COVA_IMPORTS_FILENAME = "cova_imports.json"
LATTICE_LATTICE_IMPORTS_FILENAME = "lattice_imports.txt"
LATTICE_STORAGE_TYPE = "file"


11 changes: 4 additions & 7 deletions covalent/_shared_files/utils.py
@@ -21,7 +21,7 @@
import shutil
import socket
from datetime import timedelta
from typing import Any, Callable, Dict, Set, Tuple
from typing import Any, Callable, Dict, List, Tuple

import cloudpickle
from pennylane._device import Device
@@ -37,9 +37,6 @@
DEFAULT_UI_PORT = get_config("user_interface.port")


# Dictionary to map Dask clients to their scheduler addresses
_address_client_mapper = {}

_IMPORT_PATH_SEPARATOR = ":"


@@ -141,7 +138,7 @@ def get_serialized_function_str(function):
return function_str + "\n\n"


def get_imports(func: Callable) -> Tuple[str, Set[str]]:
def get_imports(func: Callable) -> Tuple[str, List[str]]:
"""
Given an input workflow function, find the imports that were used, and determine
which ones are Covalent-related.
@@ -155,7 +152,7 @@ def get_imports(func: Callable) -> Tuple[str, Set[str]]:
"""

imports_str = ""
cova_imports = set()
cova_imports = []
for i, j in func.__globals__.items():
if inspect.ismodule(j) or (
inspect.isfunction(j) and j.__name__ in ["lattice", "electron"]
@@ -167,7 +164,7 @@

if j.__name__ in ["covalent", "lattice", "electron"]:
import_line = f"# {import_line}"
cova_imports.add(i)
cova_imports.append(i)

imports_str += import_line

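
The diff above scans a function's `__globals__` for imported modules. A self-contained sketch of that approach follows; it is simplified (it handles only modules, not the `lattice`/`electron` function case) and is not the actual Covalent helper:

```python
import inspect
import json


def get_imports_sketch(func):
    """Scan a function's globals for imported modules, collecting
    Covalent-related names into an ordered, JSON-serializable list."""
    imports_str = ""
    cova_imports = []  # a list (not a set): ordered and json.dumps-friendly
    for name, obj in func.__globals__.items():
        if inspect.ismodule(obj):
            import_line = f"import {obj.__name__}"
            if obj.__name__ != name:
                import_line += f" as {name}"
            # Covalent-related imports are commented out in the serialized
            # source and remembered separately.
            if obj.__name__ in ("covalent", "lattice", "electron"):
                import_line = f"# {import_line}"
                cova_imports.append(name)
            imports_str += import_line + "\n"
    return imports_str, cova_imports


def workflow():
    return json.dumps([1, 2, 3])


imports_str, cova = get_imports_sketch(workflow)
print(imports_str)
```

Run against this module, the sketch picks up the `inspect` and `json` imports and finds no Covalent-related names.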
4 changes: 1 addition & 3 deletions covalent/_workflow/lattice.py
@@ -82,7 +82,6 @@ def __init__(
self.named_kwargs = None
self.electron_outputs = {}
self.lattice_imports, self.cova_imports = get_imports(self.workflow_function)
self.cova_imports.update({"electron"})

self.workflow_function = TransportableObject.make_transportable(self.workflow_function)

@@ -110,13 +109,11 @@ def serialize_to_json(self) -> str:
for node_name, output in self.electron_outputs.items():
attributes["electron_outputs"][node_name] = output.to_dict()

attributes["cova_imports"] = list(self.cova_imports)
return json.dumps(attributes)

@staticmethod
def deserialize_from_json(json_data: str) -> None:
attributes = json.loads(json_data)
attributes["cova_imports"] = set(attributes["cova_imports"])

for node_name, object_dict in attributes["electron_outputs"].items():
attributes["electron_outputs"][node_name] = TransportableObject.from_dict(object_dict)
Expand Down Expand Up @@ -211,6 +208,7 @@ def build_graph(self, *args, **kwargs) -> None:
self.inputs = TransportableObject({"args": args, "kwargs": kwargs})
self.named_args = TransportableObject(named_args)
self.named_kwargs = TransportableObject(named_kwargs)
self.lattice_imports, self.cova_imports = get_imports(workflow_function)

# Set any lattice metadata not explicitly set by the user
constraint_names = {"executor", "workflow_executor", "deps", "call_before", "call_after"}
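
The Set→List change in `get_imports` and the removal of the set/list round-trips above line up with how the lattice attributes are serialized: `json.dumps` accepts a list but rejects a set. A quick illustration:

```python
import json

# A set of Covalent-related import names is not JSON-serializable...
cova_imports_set = {"covalent", "electron", "lattice"}
try:
    json.dumps(cova_imports_set)
except TypeError as err:
    print(f"set fails: {err}")

# ...while a list serializes directly and preserves insertion order, so
# serialize_to_json / deserialize_from_json no longer need to convert
# between list and set.
cova_imports_list = ["covalent", "electron", "lattice"]
print(json.dumps(cova_imports_list))
```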
2 changes: 1 addition & 1 deletion covalent/executor/__init__.py
@@ -32,7 +32,7 @@
from .._shared_files import logger
from .._shared_files.config import get_config, update_config
from ..quantum import QCluster, Simulator
from .base import BaseExecutor, wrapper_fn
from .base import BaseExecutor

app_log = logger.app_log
log_stack_info = logger.log_stack_info
