fix: reject reconnecting agents with different resource pool configuration #9815

Merged · 18 commits · Aug 29, 2024
111 changes: 111 additions & 0 deletions .circleci/devcluster/multi-resource-pools.devcluster.yaml
@@ -0,0 +1,111 @@
stages:
- db:
name: db
port: 5434

- master:
pre:
- sh: make -C tools prep-root
config_file:
security:
initial_user_password: $INITIAL_USER_PASSWORD
db:
host: localhost
port: 5434
password: postgres
user: postgres
name: determined
__internal:
preemption_timeout: 60s
checkpoint_storage:
type: shared_fs
host_path: /tmp
storage_path: determined-cp
log:
level: debug
root: tools/build
cache:
cache_dir: /tmp/determined-cache
launch_error: false
telemetry:
enabled: false
resource_manager:
default_aux_resource_pool: default
default_compute_resource_pool: default
scheduler:
fitting_policy: best
type: fair_share
type: agent
resource_pools:
- agent_reattach_enabled: true
agent_reconnect_wait: 25s
description: ''
max_aux_containers_per_agent: 100
pool_name: default
provider: null
task_container_defaults: null
- pool_name: pool1
max_slots: 8
scheduler:
type: priority
scim:
enabled: true
auth:
type: basic
username: determined
password: password

- custom:
name: proxy
cmd: ["socat", "-d", "-d", "TCP-LISTEN:8081,reuseaddr,fork", "TCP:localhost:8080"]
post:
- conncheck:
port: 8081

- agent:
name: agent1
config_file:
master_host: 127.0.0.1
master_port: 8081
agent_id: agent1
container_master_host: $DOCKER_LOCALHOST
agent_reconnect_attempts: 24
agent_reconnect_backoff: 5
container_auto_remove_disabled: true
hooks:
on_connection_lost: ["touch", "/tmp/agent1-connection-lost"]


- agent:
name: agent2
config_file:
master_host: 127.0.0.1
master_port: 8081
agent_id: agent2
container_master_host: $DOCKER_LOCALHOST
agent_reconnect_attempts: 24
agent_reconnect_backoff: 5
container_auto_remove_disabled: true

- agent:
name: agent10 # Copy of agent1, but with a different resource pool.
config_file:
master_host: 127.0.0.1
master_port: 8081
agent_id: agent1
container_master_host: $DOCKER_LOCALHOST
agent_reconnect_attempts: 24
agent_reconnect_backoff: 5
container_auto_remove_disabled: true
resource_pool: pool1

- agent:
name: agent20 # Copy of agent1, but with an empty (default) resource pool.
config_file:
master_host: 127.0.0.1
master_port: 8081
agent_id: agent1
container_master_host: $DOCKER_LOCALHOST
agent_reconnect_attempts: 24
agent_reconnect_backoff: 5
container_auto_remove_disabled: true
14 changes: 14 additions & 0 deletions .circleci/real_config.yml
@@ -4512,6 +4512,20 @@ workflows:
extra-pytest-flags: "--no-compare-stats"
collect-det-job-logs: false

- test-e2e:
name: test-e2e-managed-devcluster-resource-pools
@ShreyaLnuHpe (Contributor, Author) commented on Aug 25, 2024:

Option 1 (DONE): Create a separate E2E test for the .circleci/devcluster/multi-resource-pools.devcluster.yaml config file to allow for broader test coverage in the future.
Option 2: Add a parallel run within test-e2e-managed-devcluster for the new config file.
https://circleci.com/docs/parallelism-faster-jobs/

In both options, the overall execution time of the CircleCI test-e2e runs remains unaffected since they run in parallel with other E2E tests.

A reviewer (Contributor) replied:

I think option 1 is good for now; when we need broader coverage we can decide how to split things up!

context:
- dev-ci-cluster-default-user-credentials
requires:
- build-go-ee
parallelism: 4
resource-class: large
tf2: true
mark: managed_devcluster_resource_pools
managed-devcluster: true
extra-pytest-flags: "--no-compare-stats"
collect-det-job-logs: false

- test-e2e:
name: test-e2e-multi-k8s
context:
4 changes: 4 additions & 0 deletions agent/cmd/determined-agent/run_test.go
@@ -20,6 +20,7 @@ const DefaultRawConfig = `
log:
level: trace
color: true
resource_pool: default
slot_type: auto
security:
tls:
@@ -100,6 +101,7 @@ func TestMergeAgentConfigViaNewViper(t *testing.T) {
log:
level: trace
color: true
resource_pool: default
slot_type: auto
security:
tls:
@@ -157,6 +159,7 @@ func TestMergeAgentConfigViaViperWithDefaults(t *testing.T) {
log:
level: trace
color: true
resource_pool: default
slot_type: auto
security:
tls:
@@ -213,6 +216,7 @@ func TestMergeAgentConfigViaViperWithDefaultsEnvAndFlags(t *testing.T) {
log:
level: trace
color: true
resource_pool: default
slot_type: auto
security:
tls:
2 changes: 2 additions & 0 deletions agent/internal/agent.go
@@ -160,6 +160,7 @@ func (a *Agent) run(ctx context.Context) error {
Version: a.version,
Devices: devices,
ContainersReattached: reattached,
ResourcePoolName: a.opts.ResourcePool,
}}:
case <-ctx.Done():
return ctx.Err()
@@ -351,6 +352,7 @@ func (a *Agent) reconnectFlow(
Version: a.version,
Devices: devices,
ContainersReattached: reattached,
ResourcePoolName: a.opts.ResourcePool,
}}:
case <-ctx.Done():
return nil, nil, ctx.Err()
1 change: 1 addition & 0 deletions agent/internal/options/options.go
@@ -28,6 +28,7 @@ func DefaultOptions() *Options {
Level: "trace",
Color: true,
},
ResourcePool: "default",
SlotType: "auto",
VisibleGPUs: VisibleGPUsFromEnvironment(),
BindIP: "0.0.0.0",
1 change: 1 addition & 0 deletions agent/internal/options/options_test.go
@@ -58,6 +58,7 @@ log:
log:
level: trace
color: true
resource_pool: default
slot_type: auto
security:
tls:
5 changes: 5 additions & 0 deletions docs/get-started/architecture/introduction.rst
@@ -391,6 +391,11 @@ If you are using static resource pools and launching agents by hand, you will ne
:ref:`agent configuration <agent-config-reference>` to specify which resource pool the agent should
join.

To change the resource pool an agent is assigned to after it has already joined one, update the
:ref:`agent configuration <agent-config-reference>`. Before making this change, ensure the agent
is properly drained. Once the configuration is updated, restart the agent so that it connects to
the new resource pool.
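
As a sketch of that procedure, here is a hypothetical agent.yaml for an agent being moved from the default pool to pool1. The field names mirror the agent entries in the devcluster file above; the master address, port, and agent ID are placeholders:

```yaml
# Hypothetical agent configuration: drain the agent first, then restart it after this
# edit so that it reconnects to the master as a member of pool1.
master_host: 127.0.0.1
master_port: 8080
agent_id: agent1
resource_pool: pool1  # previously "default"
```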

Migrate to Resource Pools
-------------------------

1 change: 1 addition & 0 deletions e2e_tests/pytest.ini
@@ -37,6 +37,7 @@ markers =
nightly: nightly tests
det_deploy_local: test det deploy local
managed_devcluster: cluster tests that require a pytest-side managed cluster
managed_devcluster_resource_pools: cluster tests that require a pytest-side managed cluster with multiple resource pools
port_registry: tests for port registry and unique port offset

distributed_quarantine: distributed training tests (quarantine)
3 changes: 3 additions & 0 deletions e2e_tests/tests/cluster/conftest.py
@@ -1,11 +1,14 @@
"""Makes managed_cluster fixtures available to all files in the directory"""

from .managed_cluster import ( # noqa
managed_cluster_multi_resource_pools,
managed_cluster_priority_scheduler,
managed_cluster_restarts,
managed_cluster_session,
managed_cluster_session_multi_resource_pools,
managed_cluster_session_priority_scheduler,
restartable_managed_cluster,
restartable_managed_cluster_multi_resource_pools,
)
from .managed_cluster_k8s import k8s_managed_cluster # noqa
from .managed_slurm_cluster import ( # noqa
41 changes: 41 additions & 0 deletions e2e_tests/tests/cluster/managed_cluster.py
@@ -15,6 +15,9 @@
DEVCLUSTER_REATTACH_OFF_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double.devcluster.yaml"
DEVCLUSTER_REATTACH_ON_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double-reattach.devcluster.yaml"
DEVCLUSTER_PRIORITY_SCHEDULER_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "priority.devcluster.yaml"
DEVCLUSTER_MULTI_RP_CONFIG_PATH = (
DEVCLUSTER_CONFIG_ROOT_PATH / "multi-resource-pools.devcluster.yaml"
)


def get_agent_data(sess: api.Session) -> List[Dict[str, Any]]:
@@ -204,3 +207,41 @@ def restartable_managed_cluster(
managed_cluster_restarts.restart_master()
managed_cluster_restarts.restart_agent()
raise


@pytest.fixture(scope="session")
def managed_cluster_session_multi_resource_pools(request: Any) -> Iterator[ManagedCluster]:
config = str(DEVCLUSTER_MULTI_RP_CONFIG_PATH)
with ManagedCluster(config) as mc:
mc.initial_startup()
yield mc


@pytest.fixture
def managed_cluster_multi_resource_pools(
managed_cluster_session_multi_resource_pools: ManagedCluster, request: Any
) -> Iterator[ManagedCluster]:
config = str(DEVCLUSTER_MULTI_RP_CONFIG_PATH)
utils.set_master_port(config)
nodeid = request.node.nodeid
managed_cluster_session_multi_resource_pools.log_marker(
f"pytest [{utils.now_ts()}] {nodeid} setup\n"
)
yield managed_cluster_session_multi_resource_pools
managed_cluster_session_multi_resource_pools.log_marker(
f"pytest [{utils.now_ts()}] {nodeid} teardown\n"
)


@pytest.fixture
def restartable_managed_cluster_multi_resource_pools(
managed_cluster_multi_resource_pools: ManagedCluster,
) -> Iterator[ManagedCluster]:
managed_cluster_multi_resource_pools.wait_for_agent_ok(20)
try:
yield managed_cluster_multi_resource_pools
managed_cluster_multi_resource_pools.wait_for_agent_ok(20)
except Exception:
managed_cluster_multi_resource_pools.restart_master()
managed_cluster_multi_resource_pools.restart_agent()
raise
43 changes: 43 additions & 0 deletions e2e_tests/tests/cluster/test_master_restart.py
@@ -756,3 +756,46 @@ def test_master_restart_with_queued(
for cmd_id in [running_command_id, queued_command_id]:
utils.wait_for_command_state(sess, cmd_id, "TERMINATED", 90)
utils.assert_command_succeeded(sess, cmd_id)


@pytest.mark.managed_devcluster_resource_pools
def test_agent_resource_pool_change(
restartable_managed_cluster_multi_resource_pools: managed_cluster.ManagedCluster,
) -> None:
admin = api_utils.admin_session()
try:
restartable_managed_cluster_multi_resource_pools.kill_agent()
restartable_managed_cluster_multi_resource_pools.dc.restart_stage("agent10")

for _i in range(5):
agent_data = managed_cluster.get_agent_data(admin)
if len(agent_data) == 0:
# Agent has exploded and been wiped due to resource pool mismatch, as expected.
break
else:
pytest.fail(
f"agent with different resource pool is still present after {_i} ticks:{agent_data}"
)
finally:
restartable_managed_cluster_multi_resource_pools.dc.kill_stage("agent10")
restartable_managed_cluster_multi_resource_pools.restart_agent()


@pytest.mark.managed_devcluster_resource_pools
def test_agent_resource_pool_unchanged(
restartable_managed_cluster_multi_resource_pools: managed_cluster.ManagedCluster,
) -> None:
admin = api_utils.admin_session()
try:
restartable_managed_cluster_multi_resource_pools.kill_agent()
restartable_managed_cluster_multi_resource_pools.dc.restart_stage("agent20")

for _i in range(5):
agent_data = managed_cluster.get_agent_data(admin)
if len(agent_data) == 0:
# Agent has exploded and been wiped due to resource pool mismatch,
# which is not expected.
pytest.fail("agent exploded even with the same resource pool")
finally:
restartable_managed_cluster_multi_resource_pools.dc.kill_stage("agent20")
restartable_managed_cluster_multi_resource_pools.restart_agent()
1 change: 1 addition & 0 deletions e2e_tests/tests/conftest.py
@@ -49,6 +49,7 @@
"model_hub_mmdetection",
"deepspeed",
"managed_devcluster",
"managed_devcluster_resource_pools",
"port_registry",
"distributed_quarantine",
"det_deploy_local_quarantine",
13 changes: 13 additions & 0 deletions master/internal/rm/agentrm/agent.go
@@ -620,6 +620,19 @@ func (a *agent) HandleIncomingWebsocketMessage(msg *aproto.MasterMessage) {
a.stop(err)
return
}
resourcePoolErr := a.agentState.checkAgentResourcePoolMatch(msg.AgentStarted)
if resourcePoolErr != nil {
a.syslog.WithError(resourcePoolErr).
Error("change in agent resource pool was detected during reconnect")
a.socket.Outbox <- aproto.AgentMessage{
AgentShutdown: &aproto.AgentShutdown{
ErrMsg: aproto.ErrAgentMustReconnect.Error(),
},
}

a.stop(resourcePoolErr)
return
}
} else {
a.agentStarted(msg.AgentStarted)
}
16 changes: 16 additions & 0 deletions master/internal/rm/agentrm/agent_state.go
@@ -20,6 +20,8 @@ import (
"github.com/determined-ai/determined/master/pkg/model"
)

const defaultResourcePoolName = "default"

type slotEnabled struct {
deviceAdded bool
agentEnabled bool
@@ -289,6 +291,20 @@ func (a *agentState) checkAgentStartedDevicesMatch(
return nil
}

// checkAgentResourcePoolMatch compares the resource pool reported by a reconnecting agent with the
// one the master has recorded for it. The master keeps its cached resource pool information when an
// agent reconnects within the designated reconnection period, while the agent reads the value from
// its (possibly updated) configuration. On a mismatch an error is returned, which stops the agent
// and requires it to be restarted.
func (a *agentState) checkAgentResourcePoolMatch(
agentStarted *aproto.AgentStarted,
) error {
if a.resourcePoolName != agentStarted.ResourcePoolName {
return fmt.Errorf("resource pool has changed: %s -> %s", a.resourcePoolName, agentStarted.ResourcePoolName)
}

return nil
}

func (a *agentState) containerStateChanged(msg aproto.ContainerStateChanged) {
for _, d := range msg.Container.Devices {
s, ok := a.slotStates[d.ID]
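
Taken together, the change works as follows: the agent now includes its configured resource_pool in the AgentStarted message on both the initial connection and the reconnect path, and on reconnect the master compares that value with the resource pool it has recorded for the agent. On a mismatch it logs the error, sends an AgentShutdown message, and stops the agent, so a reconnecting agent whose pool assignment changed is rejected instead of being silently reattached. Below is a minimal unit-test sketch of the new check, assuming it lives in the agentrm package next to agentState, that testify's require helpers are available, and that the aproto import path follows the pattern seen in the diff:

```go
package agentrm

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/determined-ai/determined/master/pkg/aproto"
)

// TestCheckAgentResourcePoolMatch is a sketch only: agentState, resourcePoolName, and
// aproto.AgentStarted.ResourcePoolName are the names used in the diff above, and the
// zero value of agentState is assumed to be sufficient for exercising this check.
func TestCheckAgentResourcePoolMatch(t *testing.T) {
	a := &agentState{resourcePoolName: "default"}

	// Reconnecting with the same resource pool is accepted.
	require.NoError(t, a.checkAgentResourcePoolMatch(&aproto.AgentStarted{
		ResourcePoolName: "default",
	}))

	// Reconnecting with a different resource pool is rejected; the master then sends
	// AgentShutdown and stops the agent, so it must be restarted with a clean state.
	require.Error(t, a.checkAgentResourcePoolMatch(&aproto.AgentStarted{
		ResourcePoolName: "pool1",
	}))
}
```

The e2e tests above exercise the same behavior end to end: agent10 reconnects with resource_pool: pool1 and is expected to be wiped, while agent20 reconnects with the default pool and must stay registered.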