[DNM] Add kubernetes backend #589

Draft · wants to merge 17 commits into base: main
117 changes: 117 additions & 0 deletions openff/evaluator/_tests/test_backends/test_backends.py
@@ -0,0 +1,117 @@
import pytest
from openff.units import unit

from openff.evaluator.backends.backends import PodResources


class TestPodResources:

@pytest.fixture
def gpu_resources(self):
node_affinity = {
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
"key": "nvidia.com/cuda.runtime.major",
"operator": "In",
"values": ["12"],
},
{
"key": "nvidia.com/cuda.runtime.minor",
"operator": "In",
"values": ["4"],
},
]
}
]
}
}
}
return PodResources(
number_of_threads=1,
number_of_gpus=1,
affinity_specification=node_affinity,
minimum_number_of_workers=1,
maximum_number_of_workers=1,
)

@pytest.fixture
def cpu_resources(self):
return PodResources(
number_of_threads=1,
number_of_gpus=0,
memory_limit=5 * unit.terabyte,
ephemeral_storage_limit=20.0 * unit.megabyte,
affinity_specification=None,
minimum_number_of_workers=1,
maximum_number_of_workers=1,
)

def test_podresources_initialization_gpu(self, gpu_resources):
assert gpu_resources._number_of_threads == 1
assert gpu_resources._number_of_gpus == 1
assert gpu_resources._affinity_specification == {
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
"key": "nvidia.com/cuda.runtime.major",
"operator": "In",
"values": ["12"],
},
{
"key": "nvidia.com/cuda.runtime.minor",
"operator": "In",
"values": ["4"],
},
]
}
]
}
}
}
assert gpu_resources._minimum_number_of_workers == 1
assert gpu_resources._maximum_number_of_workers == 1
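        # dask abstract resources: exposing both "GPU" and "notGPU" lets
        # tasks be pinned to GPU or CPU-only workers via dask resource
        # restrictions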
assert gpu_resources._resources == {"GPU": 1, "notGPU": 0}

def test_to_kubernetes_resources_limits_gpu(self, gpu_resources):
k8s_resources = gpu_resources._to_kubernetes_resource_limits()
assert k8s_resources == {
"cpu": "1",
"memory": "4.000Gi",
"ephemeral-storage": "20.000Gi",
"nvidia.com/gpu": "1",
}

    def test_to_dask_worker_resources_gpu(self, gpu_resources):
assert gpu_resources._to_dask_worker_resources() == [
"--resources",
"GPU=1,notGPU=0",
]
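
These are the command-line arguments a dask worker is launched with; a task can then be pinned to a matching worker through resource annotations. A short sketch of that consumer side, assuming a running scheduler (the address and the run_simulation stand-in are illustrative, not part of this PR):

from dask.distributed import Client

def run_simulation():  # stand-in for a GPU-bound Evaluator task
    return "done"

# pin the task to workers launched with "--resources GPU=1,notGPU=0"
client = Client("tcp://scheduler:8786")  # assumed scheduler address
future = client.submit(run_simulation, resources={"GPU": 1})
print(future.result())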

def test_podresources_initialization_cpu(self, cpu_resources):
assert cpu_resources._number_of_threads == 1
assert cpu_resources._number_of_gpus == 0
assert cpu_resources._affinity_specification == {}
assert cpu_resources._minimum_number_of_workers == 1
assert cpu_resources._maximum_number_of_workers == 1
assert cpu_resources._resources == {"GPU": 0, "notGPU": 1}

def test_to_kubernetes_resources_limits_cpu(self, cpu_resources):
k8s_resources = cpu_resources._to_kubernetes_resource_limits()
assert k8s_resources == {
"cpu": "1",
"memory": "5000.000Gi",
"ephemeral-storage": "0.020Gi",
}

    def test_to_dask_worker_resources_cpu(self, cpu_resources):
assert cpu_resources._to_dask_worker_resources() == [
"--resources",
"GPU=0,notGPU=1",
]
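
For readers checking the expected strings above: a minimal sketch of the conversion these limit tests imply, reconstructed from the expected values rather than taken from the PR's implementation (_limits_sketch and its signature are hypothetical). Note the quantities appear to be converted to gigabytes but rendered with a "Gi" (gibibyte) suffix, so 5 TB becomes "5000.000Gi".

from openff.units import unit

def _limits_sketch(n_threads, memory_limit, storage_limit, n_gpus=0):
    # hypothetical reconstruction of _to_kubernetes_resource_limits,
    # inferred from the expected values in the tests above
    limits = {
        "cpu": str(n_threads),
        "memory": f"{memory_limit.to(unit.gigabyte).m:.3f}Gi",
        "ephemeral-storage": f"{storage_limit.to(unit.gigabyte).m:.3f}Gi",
    }
    if n_gpus:
        limits["nvidia.com/gpu"] = str(n_gpus)
    return limits

assert _limits_sketch(1, 5 * unit.terabyte, 20 * unit.megabyte) == {
    "cpu": "1",
    "memory": "5000.000Gi",
    "ephemeral-storage": "0.020Gi",
}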
166 changes: 166 additions & 0 deletions openff/evaluator/_tests/test_backends/test_dask_kubernetes.py
@@ -0,0 +1,166 @@
import json
import pathlib

import pytest
import yaml
from openff.units import unit
from openff.utilities.utilities import get_data_dir_path

from openff.evaluator.backends.backends import PodResources
from openff.evaluator.backends.dask_kubernetes import (
DaskKubernetesBackend,
KubernetesEmptyDirVolume,
KubernetesSecret,
)


class TestDaskKubernetesBackend:
@pytest.fixture
def gpu_resources(self):
node_affinity = {
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
"key": "nvidia.com/cuda.runtime.major",
"operator": "In",
"values": ["12"],
},
{
"key": "nvidia.com/cuda.runtime.minor",
"operator": "In",
"values": ["4"],
},
]
}
]
}
}
}
return PodResources(
number_of_threads=1,
number_of_gpus=1,
affinity_specification=node_affinity,
minimum_number_of_workers=1,
maximum_number_of_workers=10,
)

@pytest.fixture
def cpu_resources(self):
return PodResources(
number_of_threads=1,
number_of_gpus=0,
affinity_specification=None,
maximum_number_of_workers=20,
)

@pytest.fixture
def calculation_backend(self, gpu_resources, cpu_resources):
volume = KubernetesEmptyDirVolume(
name="evaluator-storage",
mount_path="/evaluator-storage",
)
secret = KubernetesSecret(
name="openeye-license",
secret_name="oe-license-feb-2024",
mount_path="/secrets/oe_license.txt",
sub_path="oe_license.txt",
)
Comment on lines +65 to +70

@lilyminium (Contributor, Author) commented on Dec 4, 2024:

    This is a very narrow view of a secret, which can be configured in a few
    ways, but for an OpenEye license a file mount is easiest: this takes a
    previously-configured Kubernetes secret called oe-license-feb-2024 and
    mounts it at the /secrets/oe_license.txt path to make the license
    available.

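For context, a minimal sketch of the one-time setup that comment assumes, using the kubernetes Python client; the secret name and namespace mirror the fixture above, while the local license path and the use of kubectl credentials are assumptions:

from kubernetes import client, config

# illustrative one-time setup; assumes local kubectl credentials, the
# "openforcefield" namespace from the fixture, and a local license file
config.load_kube_config()

with open("oe_license.txt") as f:
    license_text = f.read()

client.CoreV1Api().create_namespaced_secret(
    namespace="openforcefield",
    body=client.V1Secret(
        metadata=client.V1ObjectMeta(name="oe-license-feb-2024"),
        string_data={"oe_license.txt": license_text},
    ),
)
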
calculation_backend = DaskKubernetesBackend(
gpu_resources_per_worker=gpu_resources,
cpu_resources_per_worker=cpu_resources,
cluster_name="evaluator",
image="ghcr.io/lilyminium/openff-images:evaluator-0.4.10-kubernetes-dask-v0",
namespace="openforcefield",
env={
"OE_LICENSE": "/secrets/oe_license.txt",
# daemonic processes are not allowed to have children
"DASK_DISTRIBUTED__WORKER__DAEMON": "False",
"DASK_LOGGING__DISTRIBUTED": "debug",
"DASK__TEMPORARY_DIRECTORY": "/evaluator-storage",
},
volumes=[volume],
secrets=[secret],
)
return calculation_backend

def test_no_initialization_without_volumes(self, gpu_resources):
with pytest.raises(ValueError, match="No volumes specified"):
DaskKubernetesBackend(
gpu_resources_per_worker=gpu_resources,
cluster_name="evaluator",
image="ghcr.io/lilyminium/openff-images:evaluator-0.4.10-kubernetes-dask-v0",
namespace="openforcefield",
env={
"OE_LICENSE": "/secrets/oe_license.txt",
# daemonic processes are not allowed to have children
"DASK_DISTRIBUTED__WORKER__DAEMON": "False",
"DASK_LOGGING__DISTRIBUTED": "debug",
"DASK__TEMPORARY_DIRECTORY": "/evaluator-storage",
},
)

def test_no_initialization_without_resources(self):
with pytest.raises(ValueError, match="must be specified"):
DaskKubernetesBackend()

def test_generate_volume_specifications(self, calculation_backend):
volume_mounts, volumes = calculation_backend._generate_volume_specifications()
assert volume_mounts == [
{
"name": "evaluator-storage",
"mountPath": "/evaluator-storage",
"readOnly": False,
},
{
"name": "openeye-license",
"mountPath": "/secrets/oe_license.txt",
"subPath": "oe_license.txt",
"readOnly": True,
},
]

assert volumes == [
{
"name": "evaluator-storage",
"emptyDir": {},
},
{
"name": "openeye-license",
"secret": {
"secretName": "oe-license-feb-2024",
},
},
]

def test_generate_worker_spec(self, calculation_backend):
data_directory = pathlib.Path(
get_data_dir_path("test/kubernetes", "openff.evaluator")
)
reference_file = data_directory / "dask_worker_spec.yaml"

worker_spec = calculation_backend._generate_worker_spec(
calculation_backend._other_resources["cpu"]
)
with open(reference_file, "r") as file:
reference_spec = yaml.safe_load(file)

assert worker_spec == reference_spec

def test_generate_cluster_spec(self, calculation_backend):
cluster_spec = calculation_backend._generate_cluster_spec()

data_directory = pathlib.Path(
get_data_dir_path("test/kubernetes", "openff.evaluator")
)
reference_file = data_directory / "dask_cluster_spec.yaml"
with open(reference_file, "r") as file:
reference_spec = yaml.safe_load(file)

assert cluster_spec == reference_spec

@pytest.mark.skip(reason="Currently only works with existing kubectl credentials.")
def test_start(self, calculation_backend):
calculation_backend.start()
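
For completeness, a sketch of how the backend would typically be driven once started, following the pattern Evaluator's other calculation backends use with EvaluatorServer; calculation_backend here refers to the fixture above, and like test_start this is illustrative rather than taken from this PR:

from openff.evaluator.server import EvaluatorServer

# illustrative only: like test_start above, this requires live kubectl
# credentials and a reachable cluster
with calculation_backend:  # entering the context starts the cluster
    server = EvaluatorServer(calculation_backend=calculation_backend)
    server.start(asynchronous=False)  # blocks, serving estimation requests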