-
-
Notifications
You must be signed in to change notification settings - Fork 110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replace legacy commands with 'dask worker' and 'dask scheduler'. #399
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,7 +77,7 @@ class Task: | |
AWS resource tags to be applied to any resources that are created. | ||
|
||
name: str (optional) | ||
Name for the task. Currently used for the --namecommand line argument to dask-worker. | ||
Name for the task. Currently used for the --namecommand line argument to `dask worker`. | ||
|
||
platform_version: str (optional) | ||
Version of the AWS Fargate platform to use, e.g. "1.4.0" or "LATEST". This | ||
|
@@ -368,7 +368,7 @@ class Scheduler(Task): | |
scheduler_timeout: str | ||
Time of inactivity after which to kill the scheduler. | ||
scheduler_extra_args: List[str] (optional) | ||
Any extra command line arguments to pass to dask-scheduler, e.g. ``["--tls-cert", "/path/to/cert.pem"]`` | ||
Any extra command line arguments to pass to ``dask scheduler``, e.g. ``["--tls-cert", "/path/to/cert.pem"]`` | ||
|
||
Defaults to `None`, no extra command line arguments. | ||
kwargs: | ||
|
@@ -386,7 +386,8 @@ def __init__( | |
self.task_type = "scheduler" | ||
self._overrides = { | ||
"command": [ | ||
"dask-scheduler", | ||
"dask", | ||
"scheduler", | ||
"--idle-timeout", | ||
scheduler_timeout, | ||
] | ||
|
@@ -434,24 +435,25 @@ def __init__( | |
self._mem = mem | ||
self._gpu = gpu | ||
self._nthreads = nthreads | ||
_command = [ | ||
"dask", | ||
"cuda" if self._gpu else None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm apprehensive about having a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's a None but it gets filtered out before we use it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I found the introduction and filtering of I think it would be more readable to have something like if self._gpu:
_command = ["dask", "cuda", "worker"]
else:
_command = ["dask", "worker"]
_command += [OTHERARGS...] |
||
"worker", | ||
self.scheduler, | ||
"--name", | ||
str(self.name), | ||
"--nthreads", | ||
"{}".format( | ||
max(int(self._cpu / 1024), 1) if nthreads is None else self._nthreads | ||
), | ||
"--memory-limit", | ||
"{}GB".format(int(self._mem / 1024)), | ||
"--death-timeout", | ||
"60", | ||
] | ||
_command = [e for e in _command if e is not None] | ||
self._overrides = { | ||
"command": [ | ||
"dask-cuda-worker" if self._gpu else "dask-worker", | ||
self.scheduler, | ||
"--name", | ||
str(self.name), | ||
"--nthreads", | ||
"{}".format( | ||
max(int(self._cpu / 1024), 1) | ||
if nthreads is None | ||
else self._nthreads | ||
), | ||
"--memory-limit", | ||
"{}GB".format(int(self._mem / 1024)), | ||
"--death-timeout", | ||
"60", | ||
] | ||
+ (list() if not extra_args else extra_args) | ||
"command": _command + (list() if not extra_args else extra_args) | ||
} | ||
|
||
|
||
|
@@ -507,7 +509,7 @@ class ECSCluster(SpecCluster, ConfigMixin): | |
|
||
Defaults to ``8786`` | ||
scheduler_extra_args: List[str] (optional) | ||
Any extra command line arguments to pass to dask-scheduler, e.g. ``["--tls-cert", "/path/to/cert.pem"]`` | ||
Any extra command line arguments to pass to ``dask scheduler``, e.g. ``["--tls-cert", "/path/to/cert.pem"]`` | ||
|
||
Defaults to `None`, no extra command line arguments. | ||
scheduler_task_definition_arn: str (optional) | ||
|
@@ -553,7 +555,7 @@ class ECSCluster(SpecCluster, ConfigMixin): | |
Defaults to `None`, meaning that the task definition will be created along with the cluster, and cleaned up once | ||
the cluster is shut down. | ||
worker_extra_args: List[str] (optional) | ||
Any extra command line arguments to pass to dask-worker, e.g. ``["--tls-cert", "/path/to/cert.pem"]`` | ||
Any extra command line arguments to pass to ``dask worker``, e.g. ``["--tls-cert", "/path/to/cert.pem"]`` | ||
|
||
Defaults to `None`, no extra command line arguments. | ||
worker_task_kwargs: dict (optional) | ||
|
@@ -702,7 +704,7 @@ class ECSCluster(SpecCluster, ConfigMixin): | |
... worker_gpu=1) | ||
|
||
By setting the ``worker_gpu`` option to something other than ``None`` will cause the cluster | ||
to run ``dask-cuda-worker`` as the worker startup command. Setting this option will also change | ||
to run ``dask cuda worker`` as the worker startup command. Setting this option will also change | ||
the default Docker image to ``rapidsai/rapidsai:latest``, if you're using a custom image | ||
you must ensure the NVIDIA CUDA toolkit is installed with a version that matches the host machine | ||
along with ``dask-cuda``. | ||
|
@@ -1195,7 +1197,8 @@ async def _create_scheduler_task_definition_arn(self): | |
"memoryReservation": self._scheduler_mem, | ||
"essential": True, | ||
"command": [ | ||
"dask-scheduler", | ||
"dask", | ||
"scheduler", | ||
"--idle-timeout", | ||
self._scheduler_timeout, | ||
] | ||
|
@@ -1266,17 +1269,23 @@ async def _create_worker_task_definition_arn(self): | |
"resourceRequirements": resource_requirements, | ||
"essential": True, | ||
"command": [ | ||
"dask-cuda-worker" if self._worker_gpu else "dask-worker", | ||
"--nthreads", | ||
"{}".format( | ||
max(int(self._worker_cpu / 1024), 1) | ||
if self._worker_nthreads is None | ||
else self._worker_nthreads | ||
), | ||
"--memory-limit", | ||
"{}MB".format(int(self._worker_mem)), | ||
"--death-timeout", | ||
"60", | ||
e | ||
for e in [ | ||
"dask", | ||
"cuda" if self._worker_gpu else None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To both the above, just so it was more visible that we're filtering Nones out. |
||
"worker", | ||
"--nthreads", | ||
"{}".format( | ||
max(int(self._worker_cpu / 1024), 1) | ||
if self._worker_nthreads is None | ||
else self._worker_nthreads | ||
), | ||
"--memory-limit", | ||
"{}MB".format(int(self._worker_mem)), | ||
"--death-timeout", | ||
"60", | ||
] | ||
if e is not None | ||
] | ||
+ ( | ||
list() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious why create this variable?