dynamic host volumes: serialize ops per volume #24852

Merged 5 commits on Jan 17, 2025
Changes from 3 commits
145 changes: 103 additions & 42 deletions nomad/host_volume_endpoint.go
@@ -4,11 +4,13 @@
package nomad

import (
"context"
"errors"
"fmt"
"net/http"
"regexp"
"strings"
"sync"
"time"

"github.com/armon/go-metrics"
@@ -28,6 +30,9 @@ type HostVolume struct {
srv *Server
ctx *RPCContext
logger hclog.Logger

// volOps is used to serialize operations per volume ID
volOps sync.Map
}

func NewHostVolumeEndpoint(srv *Server, ctx *RPCContext) *HostVolume {
@@ -263,25 +268,31 @@ func (v *HostVolume) Create(args *structs.HostVolumeCreateRequest, reply *struct
return err
}

// Attempt to create the volume on the client.
//
// NOTE: creating the volume on the client via the plugin can't be made
// atomic with the registration, and creating the volume provides values we
// want to write on the Volume in raft anyways.
err = v.createVolume(vol)
if err != nil {
return err
}
// serialize client RPC and raft write per volume ID
index, err := v.serializeCall(vol.ID, func() (uint64, error) {
// Attempt to create the volume on the client.
//
// NOTE: creating the volume on the client via the plugin can't be made
// atomic with the registration, and creating the volume provides values
// we want to write on the Volume in raft anyways.
if err = v.createVolume(vol); err != nil {
return 0, err
}

// Write a newly created or modified volume to raft. We create a new request
// here because we've likely mutated the volume.
_, index, err := v.srv.raftApply(structs.HostVolumeRegisterRequestType,
&structs.HostVolumeRegisterRequest{
Volume: vol,
WriteRequest: args.WriteRequest,
})
// Write a newly created or modified volume to raft. We create a new
// request here because we've likely mutated the volume.
_, idx, err := v.srv.raftApply(structs.HostVolumeRegisterRequestType,
&structs.HostVolumeRegisterRequest{
Volume: vol,
WriteRequest: args.WriteRequest,
})
if err != nil {
v.logger.Error("raft apply failed", "error", err, "method", "register")
return 0, err
}
return idx, nil
})
if err != nil {
v.logger.Error("raft apply failed", "error", err, "method", "register")
return err
}

@@ -356,24 +367,30 @@ func (v *HostVolume) Register(args *structs.HostVolumeRegisterRequest, reply *st
return err
}

// Attempt to register the volume on the client.
//
// NOTE: registering the volume on the client via the plugin can't be made
// atomic with the registration.
err = v.registerVolume(vol)
if err != nil {
return err
}
// serialize client RPC and raft write per volume ID
index, err := v.serializeCall(vol.ID, func() (uint64, error) {
// Attempt to register the volume on the client.
//
// NOTE: registering the volume on the client via the plugin can't be made
// atomic with the registration.
if err = v.registerVolume(vol); err != nil {
return 0, err
}

// Write a newly created or modified volume to raft. We create a new request
// here because we've likely mutated the volume.
_, index, err := v.srv.raftApply(structs.HostVolumeRegisterRequestType,
&structs.HostVolumeRegisterRequest{
Volume: vol,
WriteRequest: args.WriteRequest,
})
// Write a newly created or modified volume to raft. We create a new
// request here because we've likely mutated the volume.
_, idx, err := v.srv.raftApply(structs.HostVolumeRegisterRequestType,
&structs.HostVolumeRegisterRequest{
Volume: vol,
WriteRequest: args.WriteRequest,
})
if err != nil {
v.logger.Error("raft apply failed", "error", err, "method", "register")
return 0, err
}
return idx, nil
})
if err != nil {
v.logger.Error("raft apply failed", "error", err, "method", "register")
return err
}

@@ -608,8 +625,6 @@ func (v *HostVolume) Delete(args *structs.HostVolumeDeleteRequest, reply *struct
return fmt.Errorf("missing volume ID to delete")
}

var index uint64

snap, err := v.srv.State().Snapshot()
if err != nil {
return err
@@ -631,14 +646,19 @@ func (v *HostVolume) Delete(args *structs.HostVolumeDeleteRequest, reply *struct
return fmt.Errorf("volume %s in use by allocations: %v", id, allocIDs)
}

err = v.deleteVolume(vol)
if err != nil {
return err
}

_, index, err = v.srv.raftApply(structs.HostVolumeDeleteRequestType, args)
// serialize client RPC and raft write per volume ID
index, err := v.serializeCall(vol.ID, func() (uint64, error) {
if err := v.deleteVolume(vol); err != nil {
return 0, err
}
_, idx, err := v.srv.raftApply(structs.HostVolumeDeleteRequestType, args)
if err != nil {
v.logger.Error("raft apply failed", "error", err, "method", "delete")
return 0, err
}
return idx, nil
})
if err != nil {
v.logger.Error("raft apply failed", "error", err, "method", "delete")
return err
}

@@ -665,3 +685,44 @@ func (v *HostVolume) deleteVolume(vol *structs.HostVolume) error {

return nil
}

// serializeCall serializes fn() per volume, so DHV plugins can assume that
// Nomad will not run concurrent operations for the same volume, and for us
// to avoid interleaving client RPCs with raft writes.
// Concurrent calls should all run eventually (or timeout, or server shutdown),
// but there is no guarantee that they will run in the order received.
// The passed fn is expected to return a raft index and error.
func (v *HostVolume) serializeCall(volumeID string, fn func() (uint64, error)) (uint64, error) {
timeout := 2 * time.Minute // 2x the client RPC timeout
for {
ctx, done := context.WithTimeout(v.srv.shutdownCtx, timeout)

loaded, occupied := v.volOps.LoadOrStore(volumeID, ctx)

if !occupied {
// run the fn!
index, err := fn()

// done() must come after Delete, so that other unblocked requests
// will Store a fresh context when they continue.
v.volOps.Delete(volumeID)
done()

return index, err
}

// another one is running; wait for it to finish.

// cancel the tentative context; we'll use the one we pulled from
// volOps (set by another RPC call) instead.
done()

otherCtx := loaded.(context.Context)
select {
case <-otherCtx.Done():
continue
case <-v.srv.shutdownCh:
return 0, structs.ErrNoLeader
}
}
}
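
To make the serialization scheme above easier to follow outside the diff, here is a minimal standalone sketch of the same pattern: a sync.Map of per-key contexts, where LoadOrStore either claims the key or hands back the in-flight operation's context to wait on. The names (keyedSerializer, Do) and the simplified timeout handling are illustrative assumptions, not part of this PR.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// keyedSerializer runs work one-at-a-time per key; operations on different
// keys may run concurrently.
type keyedSerializer struct {
	ops sync.Map // key -> context.Context of the in-flight operation
}

// Do runs fn for key, first waiting for any in-flight operation on the same
// key to finish. ctx bounds how long a caller is willing to wait.
func (s *keyedSerializer) Do(ctx context.Context, key string, fn func() error) error {
	for {
		opCtx, done := context.WithCancel(ctx)

		loaded, occupied := s.ops.LoadOrStore(key, opCtx)
		if !occupied {
			// we claimed the key; run the work
			err := fn()
			// Delete before done(), so unblocked waiters store a fresh
			// context when they retry.
			s.ops.Delete(key)
			done()
			return err
		}

		// another operation holds this key: drop our tentative context and
		// wait for the holder's context to be cancelled, then retry.
		done()
		select {
		case <-loaded.(context.Context).Done():
			continue
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	var s keyedSerializer
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = s.Do(context.Background(), "vol-1", func() error {
				fmt.Printf("operation %d holds vol-1\n", i)
				time.Sleep(10 * time.Millisecond)
				return nil
			})
		}(i)
	}
	wg.Wait()
}

The retry-on-Done loop is what lets a waiter pick the work back up once the current holder deletes its map entry and cancels its context, with no guarantee about the order in which waiters win the next LoadOrStore.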
136 changes: 135 additions & 1 deletion nomad/host_volume_endpoint_test.go
@@ -23,6 +23,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/nomad/version"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
)
@@ -294,7 +295,7 @@ func TestHostVolumeEndpoint_CreateRegisterGetDelete(t *testing.T) {
// win a race with the get RPC goroutine
time.AfterFunc(200*time.Millisecond, func() {
codec := rpcClient(t, srv)
var registerResp structs.HostVolumeRegisterResponse
var registerResp structs.HostVolumeCreateResponse
err := msgpackrpc.CallWithCodec(codec, "HostVolume.Register", registerReq, &registerResp)
must.NoError(t, err)
})
@@ -763,6 +764,113 @@ func TestHostVolumeEndpoint_placeVolume(t *testing.T) {
}
}

// TestHostVolumeEndpoint_concurrency checks that create/register/delete RPC
// calls can not run concurrently for a single volume.
func TestHostVolumeEndpoint_concurrency(t *testing.T) {
ci.Parallel(t)

srv, cleanup := TestServer(t, func(c *Config) { c.NumSchedulers = 0 })
t.Cleanup(cleanup)
testutil.WaitForLeader(t, srv.RPC)

c, node := newMockHostVolumeClient(t, srv, "default")

vol := &structs.HostVolume{
Name: "test-vol",
Namespace: "default",
NodeID: node.ID,
PluginID: "mkdir",
HostPath: "/pretend/path",
RequestedCapabilities: []*structs.HostVolumeCapability{
{
AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
AccessMode: structs.HostVolumeAccessModeSingleNodeWriter,
},
},
}
wr := structs.WriteRequest{Region: srv.Region()}

// create the volume for us to attempt concurrent operations on
c.setCreate(&cstructs.ClientHostVolumeCreateResponse{
VolumeName: "test-vol",
HostPath: "/pretend/path",
}, nil)
createReq := &structs.HostVolumeCreateRequest{
Volume: vol.Copy(), // copy because HostPath gets mutated
WriteRequest: wr,
}
var createResp structs.HostVolumeCreateResponse
must.NoError(t, srv.RPC("HostVolume.Create", createReq, &createResp))
volumeID := createResp.Volume.ID // used by delete

// prepare blocking channels in the mock client.
// sending a struct{} (or closing) the channel will unblock the operation
cCh, rCh, dCh := make(chan struct{}), make(chan struct{}), make(chan struct{})
c.setBlockChan("create", cCh)
c.setBlockChan("register", rCh)
c.setBlockChan("delete", dCh)

// each RPC call that finishes will put its name here
opCh := make(chan string)

// start all the RPCs concurrently
go func() {
createReq = &structs.HostVolumeCreateRequest{
Volume: vol.Copy(), // copy because HostPath gets mutated
WriteRequest: wr,
}
createResp = structs.HostVolumeCreateResponse{}
test.NoError(t, srv.RPC("HostVolume.Create", createReq, &createResp),
test.Sprint("Create method should not error"))
opCh <- "create"
}()
go func() {
registerReq := &structs.HostVolumeRegisterRequest{
Volume: vol.Copy(),
WriteRequest: wr,
}
var registerResp structs.HostVolumeRegisterResponse
test.NoError(t, srv.RPC("HostVolume.Register", registerReq, &registerResp),
test.Sprint("Register method should not error"))
opCh <- "register"
}()
go func() {
deleteReq := &structs.HostVolumeDeleteRequest{
VolumeID: volumeID,
WriteRequest: wr,
}
var deleteResp structs.HostVolumeDeleteResponse
test.NoError(t, srv.RPC("HostVolume.Delete", deleteReq, &deleteResp),
test.Sprint("Delete method should not error"))
opCh <- "delete"
}()

// helper pulls an operation from the channel, or timeout
pullOp := func() string {
select {
case op := <-opCh:
return op
case <-time.After(200 * time.Millisecond): // generous headroom
return "timeout"
}
}

must.Eq(t, "timeout", pullOp(), must.Sprint("nothing should be unblocked yet"))

close(rCh)
must.Eq(t, "register", pullOp(), must.Sprint("closing register channel should unblock Register"))

must.Eq(t, "timeout", pullOp(), must.Sprint("again blocked RPCs should remain so"))

close(cCh)
must.Eq(t, "create", pullOp(), must.Sprint("closing create channel should unblock Create"))

must.Eq(t, "timeout", pullOp(), must.Sprint("last RPC should still be blocked"))

close(dCh)
must.Eq(t, "delete", pullOp(), must.Sprint("closing delete channel should unblock Delete"))
}

// mockHostVolumeClient models client RPCs that have side-effects on the
// client host
type mockHostVolumeClient struct {
@@ -771,6 +879,10 @@ type mockHostVolumeClient struct {
nextCreateErr error
nextRegisterErr error
nextDeleteErr error
// blockChans are used to test server->client RPC serialization.
// this is separate from lock because no single method should block while
// holding the lock for the whole client.
blockChans sync.Map
}

// newMockHostVolumeClient configures a RPC-only Nomad test agent and returns a
@@ -819,9 +931,19 @@ func (v *mockHostVolumeClient) setDelete(errMsg string) {
v.nextDeleteErr = errors.New(errMsg)
}

func (v *mockHostVolumeClient) setBlockChan(operation string, ch chan struct{}) {
v.blockChans.Store(operation, ch)
}

func (v *mockHostVolumeClient) Create(
req *cstructs.ClientHostVolumeCreateRequest,
resp *cstructs.ClientHostVolumeCreateResponse) error {

// block until the concurrency test closes the channel
if ch, ok := v.blockChans.Load("create"); ok {
<-ch.(chan struct{})
}

v.lock.Lock()
defer v.lock.Unlock()
if v.nextCreateResponse == nil {
@@ -834,6 +956,12 @@ func (v *mockHostVolumeClient) Create(
func (v *mockHostVolumeClient) Register(
req *cstructs.ClientHostVolumeRegisterRequest,
resp *cstructs.ClientHostVolumeRegisterResponse) error {

// block until the concurrency test closes the channel
if ch, ok := v.blockChans.Load("register"); ok {
<-ch.(chan struct{})
}

v.lock.Lock()
defer v.lock.Unlock()
*resp = cstructs.ClientHostVolumeRegisterResponse{}
@@ -843,6 +971,12 @@ func (v *mockHostVolumeClient) Register(
func (v *mockHostVolumeClient) Delete(
req *cstructs.ClientHostVolumeDeleteRequest,
resp *cstructs.ClientHostVolumeDeleteResponse) error {

// block until the concurrency test closes the channel
if ch, ok := v.blockChans.Load("delete"); ok {
<-ch.(chan struct{})
}

v.lock.Lock()
defer v.lock.Unlock()
return v.nextDeleteErr
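
A side note on the test plumbing above: the mock client gates each RPC on a channel stored in blockChans, so the test decides when each operation may complete. The sketch below shows that gating idea in isolation; gatedMock, setGate, and handle are hypothetical names for illustration only, not part of this PR.

package main

import (
	"fmt"
	"sync"
)

// gatedMock blocks each named operation until the test closes its gate.
type gatedMock struct {
	gates sync.Map // operation name -> chan struct{}
}

func (m *gatedMock) setGate(op string, ch chan struct{}) { m.gates.Store(op, ch) }

// handle waits on the gate registered for op (if any), then reports
// completion on doneCh so the test can observe ordering.
func (m *gatedMock) handle(op string, doneCh chan<- string) {
	if ch, ok := m.gates.Load(op); ok {
		<-ch.(chan struct{})
	}
	doneCh <- op
}

func main() {
	m := &gatedMock{}
	createGate, deleteGate := make(chan struct{}), make(chan struct{})
	m.setGate("create", createGate)
	m.setGate("delete", deleteGate)

	doneCh := make(chan string, 2)
	go m.handle("create", doneCh)
	go m.handle("delete", doneCh)

	close(deleteGate)                 // release only the delete operation first
	fmt.Println(<-doneCh, "finished") // prints "delete finished"

	close(createGate)
	fmt.Println(<-doneCh, "finished") // prints "create finished"
}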