dynamic host volumes: serialize ops per volume #24852

Merged · 5 commits · Jan 17, 2025
Changes from all commits
147 changes: 105 additions & 42 deletions nomad/host_volume_endpoint.go
@@ -4,11 +4,13 @@
package nomad

import (
"context"
"errors"
"fmt"
"net/http"
"regexp"
"strings"
"sync"
"time"

"github.com/armon/go-metrics"
@@ -28,6 +30,9 @@ type HostVolume struct {
	srv    *Server
	ctx    *RPCContext
	logger hclog.Logger

	// volOps is used to serialize operations per volume ID
	volOps sync.Map
}

func NewHostVolumeEndpoint(srv *Server, ctx *RPCContext) *HostVolume {
@@ -263,25 +268,31 @@ func (v *HostVolume) Create(args *structs.HostVolumeCreateRequest, reply *struct
		return err
	}

	// Attempt to create the volume on the client.
	//
	// NOTE: creating the volume on the client via the plugin can't be made
	// atomic with the registration, and creating the volume provides values we
	// want to write on the Volume in raft anyways.
	err = v.createVolume(vol)
	if err != nil {
		return err
	}
	// serialize client RPC and raft write per volume ID
	index, err := v.serializeCall(vol.ID, "create", func() (uint64, error) {
		// Attempt to create the volume on the client.
		//
		// NOTE: creating the volume on the client via the plugin can't be made
		// atomic with the registration, and creating the volume provides values
		// we want to write on the Volume in raft anyways.
		if err = v.createVolume(vol); err != nil {
			return 0, err
		}

	// Write a newly created or modified volume to raft. We create a new request
	// here because we've likely mutated the volume.
	_, index, err := v.srv.raftApply(structs.HostVolumeRegisterRequestType,
		&structs.HostVolumeRegisterRequest{
			Volume:       vol,
			WriteRequest: args.WriteRequest,
		})

		// Write a newly created or modified volume to raft. We create a new
		// request here because we've likely mutated the volume.
		_, idx, err := v.srv.raftApply(structs.HostVolumeRegisterRequestType,
			&structs.HostVolumeRegisterRequest{
				Volume:       vol,
				WriteRequest: args.WriteRequest,
			})
		if err != nil {
			v.logger.Error("raft apply failed", "error", err, "method", "register")
			return 0, err
		}
		return idx, nil
	})
	if err != nil {
		v.logger.Error("raft apply failed", "error", err, "method", "register")
		return err
	}

@@ -356,24 +367,30 @@ func (v *HostVolume) Register(args *structs.HostVolumeRegisterRequest, reply *st
		return err
	}

	// Attempt to register the volume on the client.
	//
	// NOTE: registering the volume on the client via the plugin can't be made
	// atomic with the registration.
	err = v.registerVolume(vol)
	if err != nil {
		return err
	}
	// serialize client RPC and raft write per volume ID
	index, err := v.serializeCall(vol.ID, "register", func() (uint64, error) {
		// Attempt to register the volume on the client.
		//
		// NOTE: registering the volume on the client via the plugin can't be made
		// atomic with the registration.
		if err = v.registerVolume(vol); err != nil {
			return 0, err
		}

	// Write a newly created or modified volume to raft. We create a new request
	// here because we've likely mutated the volume.
	_, index, err := v.srv.raftApply(structs.HostVolumeRegisterRequestType,
		&structs.HostVolumeRegisterRequest{
			Volume:       vol,
			WriteRequest: args.WriteRequest,
		})

		// Write a newly created or modified volume to raft. We create a new
		// request here because we've likely mutated the volume.
		_, idx, err := v.srv.raftApply(structs.HostVolumeRegisterRequestType,
			&structs.HostVolumeRegisterRequest{
				Volume:       vol,
				WriteRequest: args.WriteRequest,
			})
		if err != nil {
			v.logger.Error("raft apply failed", "error", err, "method", "register")
			return 0, err
		}
		return idx, nil
	})
	if err != nil {
		v.logger.Error("raft apply failed", "error", err, "method", "register")
		return err
	}

@@ -608,8 +625,6 @@ func (v *HostVolume) Delete(args *structs.HostVolumeDeleteRequest, reply *struct
return fmt.Errorf("missing volume ID to delete")
}

	var index uint64

	snap, err := v.srv.State().Snapshot()
	if err != nil {
		return err
@@ -631,14 +646,19 @@ func (v *HostVolume) Delete(args *structs.HostVolumeDeleteRequest, reply *struct
return fmt.Errorf("volume %s in use by allocations: %v", id, allocIDs)
}

	err = v.deleteVolume(vol)
	if err != nil {
		return err
	}

	_, index, err = v.srv.raftApply(structs.HostVolumeDeleteRequestType, args)
	// serialize client RPC and raft write per volume ID
	index, err := v.serializeCall(vol.ID, "delete", func() (uint64, error) {
		if err := v.deleteVolume(vol); err != nil {
			return 0, err
		}
		_, idx, err := v.srv.raftApply(structs.HostVolumeDeleteRequestType, args)
		if err != nil {
			v.logger.Error("raft apply failed", "error", err, "method", "delete")
			return 0, err
		}
		return idx, nil
	})
	if err != nil {
		v.logger.Error("raft apply failed", "error", err, "method", "delete")
		return err
	}

Expand All @@ -665,3 +685,46 @@ func (v *HostVolume) deleteVolume(vol *structs.HostVolume) error {

	return nil
}

// serializeCall serializes fn() per volume, so DHV plugins can assume that
// Nomad will not run concurrent operations for the same volume, and for us
// to avoid interleaving client RPCs with raft writes.
// Concurrent calls should all run eventually (or timeout, or server shutdown),
// but there is no guarantee that they will run in the order received.
// The passed fn is expected to return a raft index and error.
func (v *HostVolume) serializeCall(volumeID, op string, fn func() (uint64, error)) (uint64, error) {
	timeout := 2 * time.Minute // 2x the client RPC timeout
	for {
		ctx, done := context.WithTimeout(v.srv.shutdownCtx, timeout)

		loaded, occupied := v.volOps.LoadOrStore(volumeID, ctx)

		if !occupied {
			v.logger.Trace("HostVolume RPC running ", "operation", op)
			// run the fn!
			index, err := fn()

			// done() must come after Delete, so that other unblocked requests
			// will Store a fresh context when they continue.
			v.volOps.Delete(volumeID)
			done()

			return index, err
		}

		// another one is running; wait for it to finish.
		v.logger.Trace("HostVolume RPC waiting", "operation", op)

		// cancel the tentative context; we'll use the one we pulled from
		// volOps (set by another RPC call) instead.
		done()

		otherCtx := loaded.(context.Context)
		select {
		case <-otherCtx.Done():
			continue
		case <-v.srv.shutdownCh:
			return 0, structs.ErrNoLeader
		}
	}
}
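
For readers skimming the diff, here is a minimal, self-contained sketch of the serialization pattern serializeCall implements: a sync.Map holds one context per volume ID, the caller that wins LoadOrStore runs its operation, and losers wait on the winner's context before retrying. The volumeSerializer type, its run method, and the shutdown channel below are illustrative stand-ins, not the real endpoint API; the real code additionally returns a raft index and cancels waits on leader shutdown.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// volumeSerializer is a stand-in for the HostVolume endpoint's volOps field.
type volumeSerializer struct {
	ops      sync.Map        // volume ID -> context.Context of the in-flight op
	shutdown <-chan struct{} // stand-in for the server shutdown channel
}

// run executes fn while holding the slot for volumeID, so at most one fn runs
// per volume at a time; callers that lose the race wait and then retry.
func (s *volumeSerializer) run(volumeID string, fn func() error) error {
	timeout := 2 * time.Minute
	for {
		ctx, done := context.WithTimeout(context.Background(), timeout)

		loaded, occupied := s.ops.LoadOrStore(volumeID, ctx)
		if !occupied {
			err := fn()
			// Delete must come before done() so unblocked waiters store a
			// fresh context when they retry.
			s.ops.Delete(volumeID)
			done()
			return err
		}

		// Another operation owns the slot; drop our tentative context and
		// wait on the owner's context instead.
		done()
		otherCtx := loaded.(context.Context)
		select {
		case <-otherCtx.Done():
			continue // slot freed (or timed out); try to claim it again
		case <-s.shutdown:
			return errors.New("shutting down")
		}
	}
}

func main() {
	s := &volumeSerializer{shutdown: make(chan struct{})}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// All three goroutines target the same volume ID, so their
			// bodies run one at a time rather than interleaving.
			_ = s.run("vol-1", func() error {
				fmt.Println("op", i, "start")
				time.Sleep(50 * time.Millisecond)
				fmt.Println("op", i, "end")
				return nil
			})
		}(i)
	}
	wg.Wait()
}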