Skip to content

Commit

Permalink
fix: reject reconnecting agents with different resource pool configuration (#9815)
Browse files Browse the repository at this point in the history
  • Loading branch information
ShreyaLnuHpe authored Aug 29, 2024
1 parent db92bad commit a605f00
Show file tree
Hide file tree
Showing 17 changed files with 319 additions and 2 deletions.
111 changes: 111 additions & 0 deletions .circleci/devcluster/multi-resource-pools.devcluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
stages:
  - db:
      name: db
      port: 5434

  - master:
      pre:
        - sh: make -C tools prep-root
      config_file:
        security:
          initial_user_password: $INITIAL_USER_PASSWORD
        db:
          host: localhost
          port: 5434
          password: postgres
          user: postgres
          name: determined
        __internal:
          preemption_timeout: 60s
        checkpoint_storage:
          type: shared_fs
          host_path: /tmp
          storage_path: determined-cp
        log:
          level: debug
        root: tools/build
        cache:
          cache_dir: /tmp/determined-cache
        launch_error: false
        telemetry:
          enabled: false
        resource_manager:
          default_aux_resource_pool: default
          default_compute_resource_pool: default
          scheduler:
            fitting_policy: best
            type: fair_share
          type: agent
          resource_pools:
            - agent_reattach_enabled: true
              agent_reconnect_wait: 25s
              description: ''
              max_aux_containers_per_agent: 100
              pool_name: default
              provider: null
              task_container_defaults: null
            - pool_name: pool1
              max_slots: 8
              scheduler:
                type: priority
        scim:
          enabled: true
          auth:
            type: basic
            username: determined
            password: password

  # Proxy between the agents and the master so agent connections can be
  # severed without restarting the master itself.
  - custom:
      name: proxy
      cmd: ["socat", "-d", "-d", "TCP-LISTEN:8081,reuseaddr,fork", "TCP:localhost:8080"]
      post:
        - conncheck:
            port: 8081

  - agent:
      name: agent1
      config_file:
        master_host: 127.0.0.1
        master_port: 8081
        agent_id: agent1
        container_master_host: $DOCKER_LOCALHOST
        agent_reconnect_attempts: 24
        agent_reconnect_backoff: 5
        container_auto_remove_disabled: true
        hooks:
          on_connection_lost: ["touch", "/tmp/agent1-connection-lost"]

  - agent:
      name: agent2
      config_file:
        master_host: 127.0.0.1
        master_port: 8081
        agent_id: agent2
        container_master_host: $DOCKER_LOCALHOST
        agent_reconnect_attempts: 24
        agent_reconnect_backoff: 5
        container_auto_remove_disabled: true

  - agent:
      name: agent10 # Copy of agent1, but with different resource pool.
      config_file:
        master_host: 127.0.0.1
        master_port: 8081
        agent_id: agent1
        container_master_host: $DOCKER_LOCALHOST
        agent_reconnect_attempts: 24
        agent_reconnect_backoff: 5
        container_auto_remove_disabled: true
        resource_pool: pool1

  - agent:
      name: agent20 # Copy of agent1, but with empty(default) resource pool.
      config_file:
        master_host: 127.0.0.1
        master_port: 8081
        agent_id: agent1
        container_master_host: $DOCKER_LOCALHOST
        agent_reconnect_attempts: 24
        agent_reconnect_backoff: 5
        container_auto_remove_disabled: true
14 changes: 14 additions & 0 deletions .circleci/real_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4512,6 +4512,20 @@ workflows:
extra-pytest-flags: "--no-compare-stats"
collect-det-job-logs: false

- test-e2e:
name: test-e2e-managed-devcluster-resource-pools
context:
- dev-ci-cluster-default-user-credentials
requires:
- build-go-ee
parallelism: 4
resource-class: large
tf2: true
mark: managed_devcluster_resource_pools
managed-devcluster: true
extra-pytest-flags: "--no-compare-stats"
collect-det-job-logs: false

- test-e2e:
name: test-e2e-multi-k8s
context:
Expand Down
4 changes: 4 additions & 0 deletions agent/cmd/determined-agent/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const DefaultRawConfig = `
log:
level: trace
color: true
resource_pool: default
slot_type: auto
security:
tls:
Expand Down Expand Up @@ -100,6 +101,7 @@ func TestMergeAgentConfigViaNewViper(t *testing.T) {
log:
level: trace
color: true
resource_pool: default
slot_type: auto
security:
tls:
Expand Down Expand Up @@ -157,6 +159,7 @@ func TestMergeAgentConfigViaViperWithDefaults(t *testing.T) {
log:
level: trace
color: true
resource_pool: default
slot_type: auto
security:
tls:
Expand Down Expand Up @@ -213,6 +216,7 @@ func TestMergeAgentConfigViaViperWithDefaultsEnvAndFlags(t *testing.T) {
log:
level: trace
color: true
resource_pool: default
slot_type: auto
security:
tls:
Expand Down
2 changes: 2 additions & 0 deletions agent/internal/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func (a *Agent) run(ctx context.Context) error {
Version: a.version,
Devices: devices,
ContainersReattached: reattached,
ResourcePoolName: a.opts.ResourcePool,
}}:
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -351,6 +352,7 @@ func (a *Agent) reconnectFlow(
Version: a.version,
Devices: devices,
ContainersReattached: reattached,
ResourcePoolName: a.opts.ResourcePool,
}}:
case <-ctx.Done():
return nil, nil, ctx.Err()
Expand Down
1 change: 1 addition & 0 deletions agent/internal/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func DefaultOptions() *Options {
Level: "trace",
Color: true,
},
ResourcePool: "default",
SlotType: "auto",
VisibleGPUs: VisibleGPUsFromEnvironment(),
BindIP: "0.0.0.0",
Expand Down
1 change: 1 addition & 0 deletions agent/internal/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ log:
log:
level: trace
color: true
resource_pool: default
slot_type: auto
security:
tls:
Expand Down
5 changes: 5 additions & 0 deletions docs/get-started/architecture/introduction.rst
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,11 @@ If you are using static resource pools and launching agents by hand, you will ne
:ref:`agent configuration <agent-config-reference>` to specify which resource pool the agent should
join.

To change the resource pool an agent is assigned to after it has already joined one, you need to
update the :ref:`agent configuration <agent-config-reference>`. Before making this change, ensure
the agents are properly drained. Once the configuration is updated, restart the agent to connect it
to the new resource pool.

Migrate to Resource Pools
-------------------------

Expand Down
1 change: 1 addition & 0 deletions e2e_tests/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ markers =
nightly: nightly tests
det_deploy_local: test det deploy local
managed_devcluster: cluster tests that require a pytest-side managed cluster
managed_devcluster_resource_pools: cluster tests that require a pytest-side managed cluster with multiple resource pools
port_registry: tests for port registry and unique port offset

distributed_quarantine: distributed training tests (quarantine)
Expand Down
3 changes: 3 additions & 0 deletions e2e_tests/tests/cluster/conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""Makes managed_cluster fixtures available to all files in the directory"""

from .managed_cluster import ( # noqa
managed_cluster_multi_resource_pools,
managed_cluster_priority_scheduler,
managed_cluster_restarts,
managed_cluster_session,
managed_cluster_session_multi_resource_pools,
managed_cluster_session_priority_scheduler,
restartable_managed_cluster,
restartable_managed_cluster_multi_resource_pools,
)
from .managed_cluster_k8s import k8s_managed_cluster # noqa
from .managed_slurm_cluster import ( # noqa
Expand Down
41 changes: 41 additions & 0 deletions e2e_tests/tests/cluster/managed_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
DEVCLUSTER_REATTACH_OFF_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double.devcluster.yaml"
DEVCLUSTER_REATTACH_ON_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double-reattach.devcluster.yaml"
DEVCLUSTER_PRIORITY_SCHEDULER_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "priority.devcluster.yaml"
DEVCLUSTER_MULTI_RP_CONFIG_PATH = (
DEVCLUSTER_CONFIG_ROOT_PATH / "multi-resource-pools.devcluster.yaml"
)


def get_agent_data(sess: api.Session) -> List[Dict[str, Any]]:
Expand Down Expand Up @@ -204,3 +207,41 @@ def restartable_managed_cluster(
managed_cluster_restarts.restart_master()
managed_cluster_restarts.restart_agent()
raise


@pytest.fixture(scope="session")
def managed_cluster_session_multi_resource_pools(request: Any) -> Iterator[ManagedCluster]:
    """Session-scoped managed cluster backed by the multi-resource-pool devcluster config."""
    with ManagedCluster(str(DEVCLUSTER_MULTI_RP_CONFIG_PATH)) as cluster:
        cluster.initial_startup()
        yield cluster


@pytest.fixture
def managed_cluster_multi_resource_pools(
    managed_cluster_session_multi_resource_pools: ManagedCluster, request: Any
) -> Iterator[ManagedCluster]:
    """Per-test view of the session cluster; brackets the test with log markers."""
    cluster = managed_cluster_session_multi_resource_pools
    utils.set_master_port(str(DEVCLUSTER_MULTI_RP_CONFIG_PATH))
    nodeid = request.node.nodeid
    cluster.log_marker(f"pytest [{utils.now_ts()}] {nodeid} setup\n")
    yield cluster
    cluster.log_marker(f"pytest [{utils.now_ts()}] {nodeid} teardown\n")


@pytest.fixture
def restartable_managed_cluster_multi_resource_pools(
    managed_cluster_multi_resource_pools: ManagedCluster,
) -> Iterator[ManagedCluster]:
    """Yields a healthy cluster; if the test leaves it broken, restarts master and agent."""
    cluster = managed_cluster_multi_resource_pools
    cluster.wait_for_agent_ok(20)
    try:
        yield cluster
        cluster.wait_for_agent_ok(20)
    except Exception:
        cluster.restart_master()
        cluster.restart_agent()
        raise
43 changes: 43 additions & 0 deletions e2e_tests/tests/cluster/test_master_restart.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,3 +756,46 @@ def test_master_restart_with_queued(
for cmd_id in [running_command_id, queued_command_id]:
utils.wait_for_command_state(sess, cmd_id, "TERMINATED", 90)
utils.assert_command_succeeded(sess, cmd_id)


@pytest.mark.managed_devcluster_resource_pools
def test_agent_resource_pool_change(
    restartable_managed_cluster_multi_resource_pools: managed_cluster.ManagedCluster,
) -> None:
    """A reconnecting agent whose resource pool changed must be rejected by the master.

    agent10 reuses agent1's agent_id but is configured with resource_pool pool1,
    so the master should refuse the reconnect and wipe the agent.
    """
    # Local import keeps this fix self-contained; the module-level import block
    # is not visible in this chunk.
    import time

    admin = api_utils.admin_session()
    cluster = restartable_managed_cluster_multi_resource_pools
    try:
        cluster.kill_agent()
        cluster.dc.restart_stage("agent10")

        for _ in range(5):
            # Poll with a delay: without it all five "ticks" run instantly and
            # the master has no time to process the rejected reconnect.
            time.sleep(1)
            agent_data = managed_cluster.get_agent_data(admin)
            if len(agent_data) == 0:
                # Agent has exploded and been wiped due to resource pool mismatch, as expected.
                break
        else:
            pytest.fail(
                f"agent with different resource pool is still present after 5 ticks: {agent_data}"
            )
    finally:
        cluster.dc.kill_stage("agent10")
        cluster.restart_agent()


@pytest.mark.managed_devcluster_resource_pools
def test_agent_resource_pool_unchanged(
    restartable_managed_cluster_multi_resource_pools: managed_cluster.ManagedCluster,
) -> None:
    """A reconnecting agent with an unchanged (default) resource pool must be kept.

    agent20 reuses agent1's agent_id with the same empty/default resource pool,
    so the master should accept the reconnect and the agent should stay present.
    """
    # Local import keeps this fix self-contained; the module-level import block
    # is not visible in this chunk.
    import time

    admin = api_utils.admin_session()
    cluster = restartable_managed_cluster_multi_resource_pools
    try:
        cluster.kill_agent()
        cluster.dc.restart_stage("agent20")

        for _ in range(5):
            # Delay before polling so the first check does not race the
            # agent's reconnect handshake.
            time.sleep(1)
            agent_data = managed_cluster.get_agent_data(admin)
            if len(agent_data) == 0:
                # Agent has exploded and been wiped due to resource pool mismatch,
                # which is not expected.
                pytest.fail("agent exploded even with the same resource pool")
    finally:
        cluster.dc.kill_stage("agent20")
        cluster.restart_agent()
1 change: 1 addition & 0 deletions e2e_tests/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"model_hub_mmdetection",
"deepspeed",
"managed_devcluster",
"managed_devcluster_resource_pools",
"port_registry",
"distributed_quarantine",
"det_deploy_local_quarantine",
Expand Down
13 changes: 13 additions & 0 deletions master/internal/rm/agentrm/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,19 @@ func (a *agent) HandleIncomingWebsocketMessage(msg *aproto.MasterMessage) {
a.stop(err)
return
}
resourcePoolErr := a.agentState.checkAgentResourcePoolMatch(msg.AgentStarted)
if resourcePoolErr != nil {
a.syslog.WithError(resourcePoolErr).
Error("change in agent resource pool was detected during reconnect")
a.socket.Outbox <- aproto.AgentMessage{
AgentShutdown: &aproto.AgentShutdown{
ErrMsg: aproto.ErrAgentMustReconnect.Error(),
},
}

a.stop(resourcePoolErr)
return
}
} else {
a.agentStarted(msg.AgentStarted)
}
Expand Down
16 changes: 16 additions & 0 deletions master/internal/rm/agentrm/agent_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/determined-ai/determined/master/pkg/model"
)

const defaultResourcePoolName = "default"

type slotEnabled struct {
deviceAdded bool
agentEnabled bool
Expand Down Expand Up @@ -289,6 +291,20 @@ func (a *agentState) checkAgentStartedDevicesMatch(
return nil
}

// checkAgentResourcePoolMatch verifies that a reconnecting agent reports the
// same resource pool it originally registered with. The master keeps the
// original pool assignment across the reconnect window rather than accepting
// new resource pool information, so a mismatch means the agent's configuration
// changed while it was disconnected; the caller then stops the agent, which
// must be restarted to join a different pool.
func (a *agentState) checkAgentResourcePoolMatch(
	agentStarted *aproto.AgentStarted,
) error {
	if a.resourcePoolName != agentStarted.ResourcePoolName {
		return fmt.Errorf(
			"resource pool has changed: %s -> %s",
			a.resourcePoolName, agentStarted.ResourcePoolName,
		)
	}
	return nil
}

func (a *agentState) containerStateChanged(msg aproto.ContainerStateChanged) {
for _, d := range msg.Container.Devices {
s, ok := a.slotStates[d.ID]
Expand Down
Loading

0 comments on commit a605f00

Please sign in to comment.