fix: reject reconnecting agents with different resource pool configuration #9815

Merged: 18 commits, Aug 29, 2024
Changes from 13 commits
2 changes: 1 addition & 1 deletion .circleci/devcluster/double-reattach.devcluster.yaml
@@ -92,4 +92,4 @@ stages:
        agent_reconnect_attempts: 24
        agent_reconnect_backoff: 5
        container_auto_remove_disabled: true
        artificial_slots: 4
\ No newline at end of file
        artificial_slots: 4
Contributor

It'd be good to configure your editor so it does not remove newlines at the end of files. We can afford the extra byte of data. :)

Contributor Author

Noted! I'll configure my editor better. Surprisingly, though, this .yaml file didn't have a trailing newline at the end, and I didn't add or remove one. I'll see what's wrong and correct it. Thanks!

Contributor
@maxrussell Aug 27, 2024

If you use VS Code, there's this setting you can toggle:
[screenshot of a VS Code setting, presumably "Files: Insert Final Newline"]

For more context, this is a POSIX standard thing: all lines should be terminated with a newline, which means a file's last line should end with a \n as well. Git and GitHub flag files that don't comply with the standard.
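To make the rule concrete, here's a minimal, hypothetical Go checker (not part of this PR or repo) that reports whether a file's last line is newline-terminated:

```go
package main

import (
	"fmt"
	"os"
)

// endsWithNewline reports whether the file's last byte is '\n', i.e.
// whether its final line is terminated the way POSIX expects.
func endsWithNewline(path string) (bool, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return false, err
	}
	if len(data) == 0 {
		return true, nil // an empty file trivially complies
	}
	return data[len(data)-1] == '\n', nil
}

func main() {
	ok, err := endsWithNewline(".circleci/devcluster/double-reattach.devcluster.yaml")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("ends with newline:", ok)
}
```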

Contributor Author

Awesome, thanks!

111 changes: 111 additions & 0 deletions .circleci/devcluster/multi-resource-pools.devcluster.yaml
@@ -0,0 +1,111 @@
stages:
  - db:
      name: db
      port: 5434

  - master:
      pre:
        - sh: make -C tools prep-root
      config_file:
        security:
          initial_user_password: $INITIAL_USER_PASSWORD
        db:
          host: localhost
          port: 5434
          password: postgres
          user: postgres
          name: determined
        __internal:
          preemption_timeout: 60s
        checkpoint_storage:
          type: shared_fs
          host_path: /tmp
          storage_path: determined-cp
        log:
          level: debug
        root: tools/build
        cache:
          cache_dir: /tmp/determined-cache
        launch_error: false
        telemetry:
          enabled: false
        resource_manager:
          default_aux_resource_pool: default
          default_compute_resource_pool: default
          scheduler:
            fitting_policy: best
            type: fair_share
          type: agent
        resource_pools:
          - agent_reattach_enabled: true
            agent_reconnect_wait: 25s
            description: ''
            max_aux_containers_per_agent: 100
            pool_name: default
            provider: null
            task_container_defaults: null
          - pool_name: pool1
            max_slots: 8
            scheduler:
              type: priority
        scim:
          enabled: true
          auth:
            type: basic
            username: determined
            password: password

  - custom:
      name: proxy
      cmd: ["socat", "-d", "-d", "TCP-LISTEN:8081,reuseaddr,fork", "TCP:localhost:8080"]
      post:
        - conncheck:
            port: 8081

  - agent:
      name: agent1
      config_file:
        master_host: 127.0.0.1
        master_port: 8081
        agent_id: agent1
        container_master_host: $DOCKER_LOCALHOST
        agent_reconnect_attempts: 24
        agent_reconnect_backoff: 5
        container_auto_remove_disabled: true
        hooks:
          on_connection_lost: ["touch", "/tmp/agent1-connection-lost"]

  - agent:
      name: agent2
      config_file:
        master_host: 127.0.0.1
        master_port: 8081
        agent_id: agent2
        container_master_host: $DOCKER_LOCALHOST
        agent_reconnect_attempts: 24
        agent_reconnect_backoff: 5
        container_auto_remove_disabled: true

  - agent:
      name: agent10  # Copy of agent1, but with a different resource pool.
      config_file:
        master_host: 127.0.0.1
        master_port: 8081
        agent_id: agent1
        container_master_host: $DOCKER_LOCALHOST
        agent_reconnect_attempts: 24
        agent_reconnect_backoff: 5
        container_auto_remove_disabled: true
        resource_pool: pool1

  - agent:
      name: agent20  # Copy of agent1, but with an empty (default) resource pool.
      config_file:
        master_host: 127.0.0.1
        master_port: 8081
        agent_id: agent1
        container_master_host: $DOCKER_LOCALHOST
        agent_reconnect_attempts: 24
        agent_reconnect_backoff: 5
        container_auto_remove_disabled: true
14 changes: 14 additions & 0 deletions .circleci/real_config.yml
@@ -4512,6 +4512,20 @@ workflows:
          extra-pytest-flags: "--no-compare-stats"
          collect-det-job-logs: false

      - test-e2e:
          name: test-e2e-managed-devcluster-resource-pools
Contributor Author
@ShreyaLnuHpe Aug 25, 2024

Option 1 (DONE): Create a separate E2E test for the .circleci/devcluster/multi-resource-pools.devcluster.yaml config file to allow for broader test coverage in the future.
Option 2: Add a parallel run within test-e2e-managed-devcluster for the new config file.
https://circleci.com/docs/parallelism-faster-jobs/

In both options, the overall execution time of the CircleCI test-e2e runs remains unaffected since they run in parallel with other E2E tests.

Contributor
I think option 1 is good for now; when we need broader coverage we can decide how to split things up!

          context:
            - dev-ci-cluster-default-user-credentials
          requires:
            - build-go-ee
          parallelism: 4
          resource-class: large
          tf2: true
          mark: managed_devcluster_resource_pools
          managed-devcluster: true
          extra-pytest-flags: "--no-compare-stats"
          collect-det-job-logs: false

      - test-e2e:
          name: test-e2e-multi-k8s
          context:
4 changes: 4 additions & 0 deletions agent/cmd/determined-agent/run_test.go
@@ -20,6 +20,7 @@ const DefaultRawConfig = `
log:
  level: trace
  color: true
resource_pool: default
slot_type: auto
security:
  tls:
@@ -100,6 +101,7 @@ func TestMergeAgentConfigViaNewViper(t *testing.T) {
log:
  level: trace
  color: true
resource_pool: default
slot_type: auto
security:
  tls:
@@ -157,6 +159,7 @@ func TestMergeAgentConfigViaViperWithDefaults(t *testing.T) {
log:
  level: trace
  color: true
resource_pool: default
slot_type: auto
security:
  tls:
@@ -213,6 +216,7 @@ func TestMergeAgentConfigViaViperWithDefaultsEnvAndFlags(t *testing.T) {
log:
  level: trace
  color: true
resource_pool: default
slot_type: auto
security:
  tls:
2 changes: 2 additions & 0 deletions agent/internal/agent.go
@@ -160,6 +160,7 @@ func (a *Agent) run(ctx context.Context) error {
        Version:              a.version,
        Devices:              devices,
        ContainersReattached: reattached,
        ResourcePoolName:     a.opts.ResourcePool,
    }}:
    case <-ctx.Done():
        return ctx.Err()
@@ -351,6 +352,7 @@ func (a *Agent) reconnectFlow(
        Version:              a.version,
        Devices:              devices,
        ContainersReattached: reattached,
        ResourcePoolName:     a.opts.ResourcePool,
    }}:
    case <-ctx.Done():
        return nil, nil, ctx.Err()
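For context, both the initial handshake in run() and the reconnectFlow() handshake now carry the agent's configured resource pool. Below is a minimal runnable sketch of that idea; AgentStarted and Options are simplified stand-ins for the real aproto and options definitions, reproducing only the fields visible in this diff:

```go
package main

import "fmt"

// AgentStarted is a simplified stand-in for aproto.AgentStarted; only the
// fields visible in this diff are reproduced here.
type AgentStarted struct {
	Version          string
	ResourcePoolName string
}

// Options is a simplified stand-in for the agent's options.Options.
type Options struct {
	ResourcePool string // defaults to "default" per options.go in this PR
}

// buildAgentStarted mirrors what run() and reconnectFlow() now do: the
// configured pool travels with every (re)connect handshake.
func buildAgentStarted(version string, opts Options) AgentStarted {
	return AgentStarted{
		Version:          version,
		ResourcePoolName: opts.ResourcePool,
	}
}

func main() {
	msg := buildAgentStarted("dev", Options{ResourcePool: "pool1"})
	fmt.Printf("handshake: version=%s pool=%s\n", msg.Version, msg.ResourcePoolName)
}
```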
1 change: 1 addition & 0 deletions agent/internal/options/options.go
@@ -28,6 +28,7 @@ func DefaultOptions() *Options {
            Level: "trace",
            Color: true,
        },
        ResourcePool: "default",
        SlotType:     "auto",
        VisibleGPUs:  VisibleGPUsFromEnvironment(),
        BindIP:       "0.0.0.0",
1 change: 1 addition & 0 deletions agent/internal/options/options_test.go
@@ -58,6 +58,7 @@ log:
log:
  level: trace
  color: true
resource_pool: default
slot_type: auto
security:
  tls:
1 change: 1 addition & 0 deletions e2e_tests/pytest.ini
@@ -37,6 +37,7 @@ markers =
    nightly: nightly tests
    det_deploy_local: test det deploy local
    managed_devcluster: cluster tests that require a pytest-side managed cluster
    managed_devcluster_resource_pools: cluster tests that require a pytest-side managed cluster with multiple resource pools
    port_registry: tests for port registry and unique port offset

    distributed_quarantine: distributed training tests (quarantine)
3 changes: 3 additions & 0 deletions e2e_tests/tests/cluster/conftest.py
@@ -1,11 +1,14 @@
"""Makes managed_cluster fixtures available to all files in the directory"""

from .managed_cluster import ( # noqa
managed_cluster_multi_resource_pools,
managed_cluster_priority_scheduler,
managed_cluster_restarts,
managed_cluster_session,
managed_cluster_session_multi_resource_pools,
managed_cluster_session_priority_scheduler,
restartable_managed_cluster,
restartable_managed_cluster_multi_resource_pools,
)
from .managed_cluster_k8s import k8s_managed_cluster # noqa
from .managed_slurm_cluster import ( # noqa
41 changes: 41 additions & 0 deletions e2e_tests/tests/cluster/managed_cluster.py
@@ -15,6 +15,9 @@
DEVCLUSTER_REATTACH_OFF_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double.devcluster.yaml"
DEVCLUSTER_REATTACH_ON_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "double-reattach.devcluster.yaml"
DEVCLUSTER_PRIORITY_SCHEDULER_CONFIG_PATH = DEVCLUSTER_CONFIG_ROOT_PATH / "priority.devcluster.yaml"
DEVCLUSTER_MULTI_RP_CONFIG_PATH = (
    DEVCLUSTER_CONFIG_ROOT_PATH / "multi-resource-pools.devcluster.yaml"
)


def get_agent_data(sess: api.Session) -> List[Dict[str, Any]]:
@@ -204,3 +207,41 @@ def restartable_managed_cluster(
        managed_cluster_restarts.restart_master()
        managed_cluster_restarts.restart_agent()
        raise


@pytest.fixture(scope="session")
def managed_cluster_session_multi_resource_pools(request: Any) -> Iterator[ManagedCluster]:
    config = str(DEVCLUSTER_MULTI_RP_CONFIG_PATH)
    with ManagedCluster(config) as mc:
        mc.initial_startup()
        yield mc


@pytest.fixture
def managed_cluster_multi_resource_pools(
    managed_cluster_session_multi_resource_pools: ManagedCluster, request: Any
) -> Iterator[ManagedCluster]:
    config = str(DEVCLUSTER_MULTI_RP_CONFIG_PATH)
    utils.set_master_port(config)
    nodeid = request.node.nodeid
    managed_cluster_session_multi_resource_pools.log_marker(
        f"pytest [{utils.now_ts()}] {nodeid} setup\n"
    )
    yield managed_cluster_session_multi_resource_pools
    managed_cluster_session_multi_resource_pools.log_marker(
        f"pytest [{utils.now_ts()}] {nodeid} teardown\n"
    )


@pytest.fixture
def restartable_managed_cluster_multi_resource_pools(
    managed_cluster_multi_resource_pools: ManagedCluster,
) -> Iterator[ManagedCluster]:
    managed_cluster_multi_resource_pools.wait_for_agent_ok(20)
    try:
        yield managed_cluster_multi_resource_pools
        managed_cluster_multi_resource_pools.wait_for_agent_ok(20)
    except Exception:
        managed_cluster_multi_resource_pools.restart_master()
        managed_cluster_multi_resource_pools.restart_agent()
        raise
43 changes: 43 additions & 0 deletions e2e_tests/tests/cluster/test_master_restart.py
@@ -756,3 +756,46 @@ def test_master_restart_with_queued(
    for cmd_id in [running_command_id, queued_command_id]:
        utils.wait_for_command_state(sess, cmd_id, "TERMINATED", 90)
        utils.assert_command_succeeded(sess, cmd_id)


@pytest.mark.managed_devcluster_resource_pools
def test_agent_resource_pool_change(
    restartable_managed_cluster_multi_resource_pools: managed_cluster.ManagedCluster,
) -> None:
    admin = api_utils.admin_session()
    try:
        restartable_managed_cluster_multi_resource_pools.kill_agent()
        restartable_managed_cluster_multi_resource_pools.dc.restart_stage("agent10")

        for _i in range(5):
            agent_data = managed_cluster.get_agent_data(admin)
            if len(agent_data) == 0:
                # Agent has exploded and been wiped due to resource pool mismatch, as expected.
                break
        else:
            pytest.fail(
                f"agent with different resource pool is still present after {_i} ticks: {agent_data}"
            )
    finally:
        restartable_managed_cluster_multi_resource_pools.dc.kill_stage("agent10")
        restartable_managed_cluster_multi_resource_pools.restart_agent()


@pytest.mark.managed_devcluster_resource_pools
def test_agent_resource_pool_unchanged(
    restartable_managed_cluster_multi_resource_pools: managed_cluster.ManagedCluster,
) -> None:
    admin = api_utils.admin_session()
    try:
        restartable_managed_cluster_multi_resource_pools.kill_agent()
        restartable_managed_cluster_multi_resource_pools.dc.restart_stage("agent20")

        for _i in range(5):
            agent_data = managed_cluster.get_agent_data(admin)
            if len(agent_data) == 0:
                # Agent has exploded and been wiped due to resource pool mismatch,
                # which is not expected.
                pytest.fail("agent exploded even with the same resource pool")
    finally:
        restartable_managed_cluster_multi_resource_pools.dc.kill_stage("agent20")
        restartable_managed_cluster_multi_resource_pools.restart_agent()
1 change: 1 addition & 0 deletions e2e_tests/tests/conftest.py
@@ -49,6 +49,7 @@
"model_hub_mmdetection",
"deepspeed",
"managed_devcluster",
"managed_devcluster_resource_pools",
"port_registry",
"distributed_quarantine",
"det_deploy_local_quarantine",
13 changes: 13 additions & 0 deletions master/internal/rm/agentrm/agent.go
@@ -620,6 +620,19 @@ func (a *agent) HandleIncomingWebsocketMessage(msg *aproto.MasterMessage) {
                a.stop(err)
                return
            }
            resourcePoolErr := a.agentState.checkAgentResourcePoolMatch(msg.AgentStarted)
            if resourcePoolErr != nil {
                a.syslog.WithError(resourcePoolErr).
                    Error("change in agent resource pool was detected")
                a.socket.Outbox <- aproto.AgentMessage{
                    AgentShutdown: &aproto.AgentShutdown{
                        ErrMsg: aproto.ErrAgentMustReconnect.Error(),
                    },
                }

                a.stop(resourcePoolErr)
                return
            }
        } else {
            a.agentStarted(msg.AgentStarted)
        }
16 changes: 16 additions & 0 deletions master/internal/rm/agentrm/agent_state.go
@@ -20,6 +20,8 @@ import (
"github.com/determined-ai/determined/master/pkg/model"
)

const defaultResourcePoolName = "default"

type slotEnabled struct {
deviceAdded bool
agentEnabled bool
@@ -289,6 +291,20 @@ func (a *agentState) checkAgentStartedDevicesMatch(
    return nil
}

// checkAgentResourcePoolMatch compares the resource pool configuration reported by a
// reconnecting agent against the one the master has recorded for it. Ideally, the master
// doesn't request new resource pool information when the agent reconnects within the
// designated reconnection period, while the agent reads from its updated configuration.
// On a mismatch, an error is returned, which stops the agent and requires a restart.
func (a *agentState) checkAgentResourcePoolMatch(
    agentStarted *aproto.AgentStarted,
) error {
    if a.resourcePoolName != agentStarted.ResourcePoolName {
        return fmt.Errorf("resource pool has changed: %s -> %s",
            a.resourcePoolName, agentStarted.ResourcePoolName)
    }

    return nil
}

func (a *agentState) containerStateChanged(msg aproto.ContainerStateChanged) {
    for _, d := range msg.Container.Devices {
        s, ok := a.slotStates[d.ID]
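To see the new check end to end, here is a runnable sketch with simplified stand-ins for agentState and aproto.AgentStarted; the comparison mirrors the diff above, and everything around it is scaffolding:

```go
package main

import "fmt"

const defaultResourcePoolName = "default"

// AgentStarted is a simplified stand-in for aproto.AgentStarted.
type AgentStarted struct {
	ResourcePoolName string
}

// agentState is a simplified stand-in for the master's per-agent state.
type agentState struct {
	resourcePoolName string
}

// checkAgentResourcePoolMatch mirrors the check added in this PR: a
// reconnecting agent must report the same resource pool the master recorded.
func (a *agentState) checkAgentResourcePoolMatch(started *AgentStarted) error {
	if a.resourcePoolName != started.ResourcePoolName {
		return fmt.Errorf("resource pool has changed: %s -> %s",
			a.resourcePoolName, started.ResourcePoolName)
	}
	return nil
}

func main() {
	state := &agentState{resourcePoolName: defaultResourcePoolName}

	// Same pool on reconnect: accepted (prints <nil>).
	fmt.Println(state.checkAgentResourcePoolMatch(&AgentStarted{ResourcePoolName: "default"}))

	// Different pool on reconnect: rejected; the master then sends
	// AgentShutdown and stops the agent, as in HandleIncomingWebsocketMessage.
	fmt.Println(state.checkAgentResourcePoolMatch(&AgentStarted{ResourcePoolName: "pool1"}))
}
```

This mirrors the two e2e tests above: agent10 reconnects claiming pool1 and is rejected, while agent20 reconnects with the empty (default) pool and is accepted.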