From a605f006c658d1378d28cd67dc42468643da75dc Mon Sep 17 00:00:00 2001 From: Shreya <167355081+ShreyaLnuHpe@users.noreply.github.com> Date: Thu, 29 Aug 2024 09:28:31 -0700 Subject: [PATCH] fix: reject reconnecting agents with different resource pool configuration (#9815) --- .../multi-resource-pools.devcluster.yaml | 111 ++++++++++++++++++ .circleci/real_config.yml | 14 +++ agent/cmd/determined-agent/run_test.go | 4 + agent/internal/agent.go | 2 + agent/internal/options/options.go | 1 + agent/internal/options/options_test.go | 1 + .../get-started/architecture/introduction.rst | 5 + e2e_tests/pytest.ini | 1 + e2e_tests/tests/cluster/conftest.py | 3 + e2e_tests/tests/cluster/managed_cluster.py | 41 +++++++ .../tests/cluster/test_master_restart.py | 43 +++++++ e2e_tests/tests/conftest.py | 1 + master/internal/rm/agentrm/agent.go | 13 ++ master/internal/rm/agentrm/agent_state.go | 16 +++ .../internal/rm/agentrm/agent_state_test.go | 62 ++++++++++ .../rm/agentrm/resource_managers_test.go | 2 - master/pkg/aproto/master_message.go | 1 + 17 files changed, 319 insertions(+), 2 deletions(-) create mode 100644 .circleci/devcluster/multi-resource-pools.devcluster.yaml diff --git a/.circleci/devcluster/multi-resource-pools.devcluster.yaml b/.circleci/devcluster/multi-resource-pools.devcluster.yaml new file mode 100644 index 00000000000..5529e251148 --- /dev/null +++ b/.circleci/devcluster/multi-resource-pools.devcluster.yaml @@ -0,0 +1,111 @@ +stages: + - db: + name: db + port: 5434 + + - master: + pre: + - sh: make -C tools prep-root + config_file: + security: + initial_user_password: $INITIAL_USER_PASSWORD + db: + host: localhost + port: 5434 + password: postgres + user: postgres + name: determined + __internal: + preemption_timeout: 60s + checkpoint_storage: + type: shared_fs + host_path: /tmp + storage_path: determined-cp + log: + level: debug + root: tools/build + cache: + cache_dir: /tmp/determined-cache + launch_error: false + telemetry: + enabled: false + resource_manager: + default_aux_resource_pool: default + default_compute_resource_pool: default + scheduler: + fitting_policy: best + type: fair_share + type: agent + resource_pools: + - agent_reattach_enabled: true + agent_reconnect_wait: 25s + description: '' + max_aux_containers_per_agent: 100 + pool_name: default + provider: null + task_container_defaults: null + - pool_name: pool1 + max_slots: 8 + scheduler: + type: priority + scim: + enabled: true + auth: + type: basic + username: determined + password: password + + - custom: + name: proxy + cmd: ["socat", "-d", "-d", "TCP-LISTEN:8081,reuseaddr,fork", "TCP:localhost:8080"] + post: + - conncheck: + port: 8081 + + - agent: + name: agent1 + config_file: + master_host: 127.0.0.1 + master_port: 8081 + agent_id: agent1 + container_master_host: $DOCKER_LOCALHOST + agent_reconnect_attempts: 24 + agent_reconnect_backoff: 5 + container_auto_remove_disabled: true + hooks: + on_connection_lost: ["touch", "/tmp/agent1-connection-lost"] + + + - agent: + name: agent2 + config_file: + master_host: 127.0.0.1 + master_port: 8081 + agent_id: agent2 + container_master_host: $DOCKER_LOCALHOST + agent_reconnect_attempts: 24 + agent_reconnect_backoff: 5 + container_auto_remove_disabled: true + + - agent: + name: agent10 # Copy of agent1, but with different resource pool. + config_file: + master_host: 127.0.0.1 + master_port: 8081 + agent_id: agent1 + container_master_host: $DOCKER_LOCALHOST + agent_reconnect_attempts: 24 + agent_reconnect_backoff: 5 + container_auto_remove_disabled: true + resource_pool: pool1 + + - agent: + name: agent20 # Copy of agent1, but with empty(default) resource pool. + config_file: + master_host: 127.0.0.1 + master_port: 8081 + agent_id: agent1 + container_master_host: $DOCKER_LOCALHOST + agent_reconnect_attempts: 24 + agent_reconnect_backoff: 5 + container_auto_remove_disabled: true diff --git a/.circleci/real_config.yml b/.circleci/real_config.yml index bb502fa358a..7e9e5435ef0 100644 --- a/.circleci/real_config.yml +++ b/.circleci/real_config.yml @@ -4512,6 +4512,20 @@ workflows: extra-pytest-flags: "--no-compare-stats" collect-det-job-logs: false + - test-e2e: + name: test-e2e-managed-devcluster-resource-pools + context: + - dev-ci-cluster-default-user-credentials + requires: + - build-go-ee + parallelism: 4 + resource-class: large + tf2: true + mark: managed_devcluster_resource_pools + managed-devcluster: true + extra-pytest-flags: "--no-compare-stats" + collect-det-job-logs: false + - test-e2e: name: test-e2e-multi-k8s context: diff --git a/agent/cmd/determined-agent/run_test.go b/agent/cmd/determined-agent/run_test.go index 593d749a4c5..d9ebda1ad4b 100644 --- a/agent/cmd/determined-agent/run_test.go +++ b/agent/cmd/determined-agent/run_test.go @@ -20,6 +20,7 @@ const DefaultRawConfig = ` log: level: trace color: true +resource_pool: default slot_type: auto security: tls: @@ -100,6 +101,7 @@ func TestMergeAgentConfigViaNewViper(t *testing.T) { log: level: trace color: true +resource_pool: default slot_type: auto security: tls: @@ -157,6 +159,7 @@ func TestMergeAgentConfigViaViperWithDefaults(t *testing.T) { log: level: trace color: true +resource_pool: default slot_type: auto security: tls: @@ -213,6 +216,7 @@ func TestMergeAgentConfigViaViperWithDefaultsEnvAndFlags(t *testing.T) { log: level: trace color: true +resource_pool: default slot_type: auto security: tls: diff --git a/agent/internal/agent.go b/agent/internal/agent.go index 80693407d32..e30003f92bf 100644 --- a/agent/internal/agent.go +++ b/agent/internal/agent.go @@ -160,6 +160,7 @@ func (a *Agent) run(ctx context.Context) error { Version: a.version, Devices: devices, ContainersReattached: reattached, + ResourcePoolName: a.opts.ResourcePool, }}: case <-ctx.Done(): return ctx.Err() @@ -351,6 +352,7 @@ func (a *Agent) reconnectFlow( Version: a.version, Devices: devices, ContainersReattached: reattached, + ResourcePoolName: a.opts.ResourcePool, }}: case <-ctx.Done(): return nil, nil, ctx.Err() diff --git a/agent/internal/options/options.go b/agent/internal/options/options.go index 74d72b9e426..02120f639ae 100644 --- a/agent/internal/options/options.go +++ b/agent/internal/options/options.go @@ -28,6 +28,7 @@ func DefaultOptions() *Options { Level: "trace", Color: true, }, + ResourcePool: "default", SlotType: "auto", VisibleGPUs: VisibleGPUsFromEnvironment(), BindIP: "0.0.0.0", diff --git a/agent/internal/options/options_test.go b/agent/internal/options/options_test.go index 3c114a2f513..8bc6f70813d 100644 --- a/agent/internal/options/options_test.go +++ b/agent/internal/options/options_test.go @@ -58,6 +58,7 @@ log: log: level: trace color: true +resource_pool: default slot_type: auto security: tls: diff --git a/docs/get-started/architecture/introduction.rst b/docs/get-started/architecture/introduction.rst index 1a4585cb65d..815e4fbd261 100644 --- a/docs/get-started/architecture/introduction.rst +++ b/docs/get-started/architecture/introduction.rst @@ -391,6 +391,11 @@ If you are using static resource pools and launching agents by hand, you will ne :ref:`agent configuration ` to specify which resource pool the agent should join. +To change the resource pool an agent is assigned to after it has already joined one, you need to +update the :ref:`agent configuration `. Before making this change, ensure +the agents are properly drained. Once the configuration is updated, restart the agent to connect it +to the new resource pool. + Migrate to Resource Pools ------------------------- diff --git a/e2e_tests/pytest.ini b/e2e_tests/pytest.ini index 44c0c56e259..153cecd22ff 100644 --- a/e2e_tests/pytest.ini +++ b/e2e_tests/pytest.ini @@ -37,6 +37,7 @@ markers = nightly: nightly tests det_deploy_local: test det deploy local managed_devcluster: cluster tests that require a pytest-side managed cluster + managed_devcluster_resource_pools: cluster tests that require a pytest-side managed cluster with multiple resource pools port_registry: tests for port registry and unique port offset distributed_quarantine: distributed training tests (quarantine) diff --git a/e2e_tests/tests/cluster/conftest.py b/e2e_tests/tests/cluster/conftest.py index 9b61e3a30d1..ba545dd0546 100644 --- a/e2e_tests/tests/cluster/conftest.py +++ b/e2e_tests/tests/cluster/conftest.py @@ -1,11 +1,14 @@ """Makes managed_cluster fixtures available to all files in the directory""" from .managed_cluster import ( # noqa + managed_cluster_multi_resource_pools, managed_cluster_priority_scheduler, managed_cluster_restarts, managed_cluster_session, + managed_cluster_session_multi_resource_pools, managed_cluster_session_priority_scheduler, restartable_managed_cluster, + restartable_managed_cluster_multi_resource_pools, ) from .managed_cluster_k8s import k8s_managed_cluster # noqa from .managed_slurm_cluster import ( # noqa diff --git a/e2e_tests/tests/cluster/managed_cluster.py b/e2e_tests/tests/cluster/managed_cluster.py index ea7a99fa592..c31484b6ef9 100644 --- a/e2e_tests/tests/cluster/managed_cluster.py +++ b/e2e_tests/tests/cluster/managed_cluster.py @@ -15,6 +15,9 @@ DEVCLUSTER_REATTACH_OFF_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double.devcluster.yaml" DEVCLUSTER_REATTACH_ON_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double-reattach.devcluster.yaml" DEVCLUSTER_PRIORITY_SCHEDULER_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "priority.devcluster.yaml" +DEVCLUSTER_MULTI_RP_CONFIG_PATH = ( + DEVCLUSTER_CONFIG_ROOT_PATH / "multi-resource-pools.devcluster.yaml" +) def get_agent_data(sess: api.Session) -> List[Dict[str, Any]]: @@ -204,3 +207,41 @@ def restartable_managed_cluster( managed_cluster_restarts.restart_master() managed_cluster_restarts.restart_agent() raise + + +@pytest.fixture(scope="session") +def managed_cluster_session_multi_resource_pools(request: Any) -> Iterator[ManagedCluster]: + config = str(DEVCLUSTER_MULTI_RP_CONFIG_PATH) + with ManagedCluster(config) as mc: + mc.initial_startup() + yield mc + + +@pytest.fixture +def managed_cluster_multi_resource_pools( + managed_cluster_session_multi_resource_pools: ManagedCluster, request: Any +) -> Iterator[ManagedCluster]: + config = str(DEVCLUSTER_MULTI_RP_CONFIG_PATH) + utils.set_master_port(config) + nodeid = request.node.nodeid + managed_cluster_session_multi_resource_pools.log_marker( + f"pytest [{utils.now_ts()}] {nodeid} setup\n" + ) + yield managed_cluster_session_multi_resource_pools + managed_cluster_session_multi_resource_pools.log_marker( + f"pytest [{utils.now_ts()}] {nodeid} teardown\n" + ) + + +@pytest.fixture +def restartable_managed_cluster_multi_resource_pools( + managed_cluster_multi_resource_pools: ManagedCluster, +) -> Iterator[ManagedCluster]: + managed_cluster_multi_resource_pools.wait_for_agent_ok(20) + try: + yield managed_cluster_multi_resource_pools + managed_cluster_multi_resource_pools.wait_for_agent_ok(20) + except Exception: + managed_cluster_multi_resource_pools.restart_master() + managed_cluster_multi_resource_pools.restart_agent() + raise diff --git a/e2e_tests/tests/cluster/test_master_restart.py b/e2e_tests/tests/cluster/test_master_restart.py index 1d5b1c1fd10..8e95025cdea 100644 --- a/e2e_tests/tests/cluster/test_master_restart.py +++ b/e2e_tests/tests/cluster/test_master_restart.py @@ -756,3 +756,46 @@ def test_master_restart_with_queued( for cmd_id in [running_command_id, queued_command_id]: utils.wait_for_command_state(sess, cmd_id, "TERMINATED", 90) utils.assert_command_succeeded(sess, cmd_id) + + +@pytest.mark.managed_devcluster_resource_pools +def test_agent_resource_pool_change( + restartable_managed_cluster_multi_resource_pools: managed_cluster.ManagedCluster, +) -> None: + admin = api_utils.admin_session() + try: + restartable_managed_cluster_multi_resource_pools.kill_agent() + restartable_managed_cluster_multi_resource_pools.dc.restart_stage("agent10") + + for _i in range(5): + agent_data = managed_cluster.get_agent_data(admin) + if len(agent_data) == 0: + # Agent has exploded and been wiped due to resource pool mismatch, as expected. + break + else: + pytest.fail( + f"agent with different resource pool is still present after {_i} ticks:{agent_data}" + ) + finally: + restartable_managed_cluster_multi_resource_pools.dc.kill_stage("agent10") + restartable_managed_cluster_multi_resource_pools.restart_agent() + + +@pytest.mark.managed_devcluster_resource_pools +def test_agent_resource_pool_unchanged( + restartable_managed_cluster_multi_resource_pools: managed_cluster.ManagedCluster, +) -> None: + admin = api_utils.admin_session() + try: + restartable_managed_cluster_multi_resource_pools.kill_agent() + restartable_managed_cluster_multi_resource_pools.dc.restart_stage("agent20") + + for _i in range(5): + agent_data = managed_cluster.get_agent_data(admin) + if len(agent_data) == 0: + # Agent has exploded and been wiped due to resource pool mismatch, + # which is not expected. + pytest.fail("agent exploded even with the same resource pool") + finally: + restartable_managed_cluster_multi_resource_pools.dc.kill_stage("agent20") + restartable_managed_cluster_multi_resource_pools.restart_agent() diff --git a/e2e_tests/tests/conftest.py b/e2e_tests/tests/conftest.py index 0c8a2498ce4..d0c96bf53d3 100644 --- a/e2e_tests/tests/conftest.py +++ b/e2e_tests/tests/conftest.py @@ -49,6 +49,7 @@ "model_hub_mmdetection", "deepspeed", "managed_devcluster", + "managed_devcluster_resource_pools", "port_registry", "distributed_quarantine", "det_deploy_local_quarantine", diff --git a/master/internal/rm/agentrm/agent.go b/master/internal/rm/agentrm/agent.go index a5465898910..ac87d2f427c 100644 --- a/master/internal/rm/agentrm/agent.go +++ b/master/internal/rm/agentrm/agent.go @@ -620,6 +620,19 @@ func (a *agent) HandleIncomingWebsocketMessage(msg *aproto.MasterMessage) { a.stop(err) return } + resourcePoolErr := a.agentState.checkAgentResourcePoolMatch(msg.AgentStarted) + if resourcePoolErr != nil { + a.syslog.WithError(resourcePoolErr). + Error("change in agent resource pool was detected during reconnect") + a.socket.Outbox <- aproto.AgentMessage{ + AgentShutdown: &aproto.AgentShutdown{ + ErrMsg: aproto.ErrAgentMustReconnect.Error(), + }, + } + + a.stop(resourcePoolErr) + return + } } else { a.agentStarted(msg.AgentStarted) } diff --git a/master/internal/rm/agentrm/agent_state.go b/master/internal/rm/agentrm/agent_state.go index 0a5941173db..6b4d6a4cd14 100644 --- a/master/internal/rm/agentrm/agent_state.go +++ b/master/internal/rm/agentrm/agent_state.go @@ -20,6 +20,8 @@ import ( "github.com/determined-ai/determined/master/pkg/model" ) +const defaultResourcePoolName = "default" + type slotEnabled struct { deviceAdded bool agentEnabled bool @@ -289,6 +291,20 @@ func (a *agentState) checkAgentStartedDevicesMatch( return nil } +// We need to compare the resource pool configurations between the agent and the master. +// Ideally, the master doesn't request new resource pool information if the agent reconnects +// within the designated reconnection period, while the agent should read from its updated configuration. +// If there's a mismatch, an error will be thrown, causing the agent to stop and require a restart. +func (a *agentState) checkAgentResourcePoolMatch( + agentStarted *aproto.AgentStarted, +) error { + if a.resourcePoolName != agentStarted.ResourcePoolName { + return fmt.Errorf("resource pool has changed: %s -> %s", a.resourcePoolName, agentStarted.ResourcePoolName) + } + + return nil +} + func (a *agentState) containerStateChanged(msg aproto.ContainerStateChanged) { for _, d := range msg.Container.Devices { s, ok := a.slotStates[d.ID] diff --git a/master/internal/rm/agentrm/agent_state_test.go b/master/internal/rm/agentrm/agent_state_test.go index 3341f83f9d0..217c44c29d7 100644 --- a/master/internal/rm/agentrm/agent_state_test.go +++ b/master/internal/rm/agentrm/agent_state_test.go @@ -70,6 +70,7 @@ func TestAgentStatePersistence(t *testing.T) { Version: "", Devices: devices, ContainersReattached: []aproto.ContainerReattachAck{}, + ResourcePoolName: defaultResourcePoolName, } state.agentStarted(started) require.Len(t, state.getSlotsSummary("/myagent"), 2) @@ -322,6 +323,66 @@ func Test_agentState_checkAgentStartedDevicesMatch(t *testing.T) { } } +func Test_agentState_checkAgentResourcePoolMatch(t *testing.T) { + const ( + poolOne = "pool1" + poolTwo = "pool2" + ) + tests := []struct { + name string + state agentState + agentStarted *aproto.AgentStarted + wantErrContains string + }{ + { + name: "resource pool name match", + state: agentState{ + resourcePoolName: poolOne, + }, + agentStarted: &aproto.AgentStarted{ + ResourcePoolName: poolOne, + }, + wantErrContains: "", + }, + { + name: "resource pool name is missing", + state: agentState{ + resourcePoolName: poolOne, + }, + agentStarted: &aproto.AgentStarted{ResourcePoolName: defaultResourcePoolName}, + wantErrContains: "resource pool has changed", + }, + { + name: "resource pool name is missing", + state: agentState{ + resourcePoolName: defaultResourcePoolName, + }, + agentStarted: &aproto.AgentStarted{ResourcePoolName: poolOne}, + wantErrContains: "resource pool has changed", + }, + { + name: "mismatched resource pool name", + state: agentState{ + resourcePoolName: poolOne, + }, + agentStarted: &aproto.AgentStarted{ + ResourcePoolName: poolTwo, + }, + wantErrContains: "resource pool has changed", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.state.checkAgentResourcePoolMatch(tt.agentStarted) + if tt.wantErrContains == "" { + require.NoError(t, err) + return + } + require.ErrorContains(t, err, tt.wantErrContains) + }) + } +} + func TestSlotStates(t *testing.T) { rpName := "test" state := newAgentState(aproto.ID(uuid.NewString()), 64) @@ -345,6 +406,7 @@ func TestSlotStates(t *testing.T) { Version: "", Devices: devices, ContainersReattached: []aproto.ContainerReattachAck{}, + ResourcePoolName: defaultResourcePoolName, } state.agentStarted(started) slots := state.getSlotsSummary("/") diff --git a/master/internal/rm/agentrm/resource_managers_test.go b/master/internal/rm/agentrm/resource_managers_test.go index dea2af873ca..0cd82575b05 100644 --- a/master/internal/rm/agentrm/resource_managers_test.go +++ b/master/internal/rm/agentrm/resource_managers_test.go @@ -15,8 +15,6 @@ import ( "github.com/determined-ai/determined/master/internal/user" ) -const defaultResourcePoolName = "default" - func TestResourceManagerForwardMessage(t *testing.T) { user.InitService(nil, nil) conf := &config.ResourceConfig{ diff --git a/master/pkg/aproto/master_message.go b/master/pkg/aproto/master_message.go index edbefb08b1c..ea2385377b6 100644 --- a/master/pkg/aproto/master_message.go +++ b/master/pkg/aproto/master_message.go @@ -80,6 +80,7 @@ type AgentStarted struct { Version string Devices []device.Device ContainersReattached []ContainerReattachAck + ResourcePoolName string } // ContainerStateChanged notifies the master that the agent transitioned the container state.