Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup: use rbd.Manager for fetching volumes in CSI-Addons operations #5092

Open
wants to merge 4 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions internal/csi-addons/rbd/encryptionkeyrotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@ import (

type EncryptionKeyRotationServer struct {
*ekr.UnimplementedEncryptionKeyRotationControllerServer
volLock *util.VolumeLocks
driverInstance string
volLock *util.VolumeLocks
}

func NewEncryptionKeyRotationServer(volLock *util.VolumeLocks) *EncryptionKeyRotationServer {
return &EncryptionKeyRotationServer{volLock: volLock}
func NewEncryptionKeyRotationServer(driverInstance string, volLock *util.VolumeLocks) *EncryptionKeyRotationServer {
return &EncryptionKeyRotationServer{
driverInstance: driverInstance,
volLock: volLock,
}
}

func (ekrs *EncryptionKeyRotationServer) RegisterService(svc grpc.ServiceRegistrar) {
Expand All @@ -58,15 +62,10 @@ func (ekrs *EncryptionKeyRotationServer) EncryptionKeyRotate(
}
defer ekrs.volLock.Release(volID)

// Get the credentials required to authenticate
// against a ceph cluster
creds, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer creds.DeleteCredentials()
mgr := rbd.NewManager(ekrs.driverInstance, nil, req.GetSecrets())
defer mgr.Destroy(ctx)

rbdVol, err := rbd.GenVolFromVolID(ctx, volID, creds, req.GetSecrets())
rbdVol, err := mgr.GetVolumeByID(ctx, volID)
if err != nil {
switch {
case errors.Is(err, rbd.ErrImageNotFound):
Expand Down
27 changes: 16 additions & 11 deletions internal/csi-addons/rbd/reclaimspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,21 @@ import (
// of CSI-addons reclaimspace controller service spec.
type ReclaimSpaceControllerServer struct {
*rs.UnimplementedReclaimSpaceControllerServer
volumeLocks *util.VolumeLocks

driverInstance string
volumeLocks *util.VolumeLocks
}

// NewReclaimSpaceControllerServer creates a new ReclaimSpaceControllerServer which handles
// the ReclaimSpace Service requests from the CSI-Addons specification.
func NewReclaimSpaceControllerServer(volumeLocks *util.VolumeLocks) *ReclaimSpaceControllerServer {
return &ReclaimSpaceControllerServer{volumeLocks: volumeLocks}
func NewReclaimSpaceControllerServer(
driverInstance string,
volumeLocks *util.VolumeLocks,
) *ReclaimSpaceControllerServer {
return &ReclaimSpaceControllerServer{
driverInstance: driverInstance,
volumeLocks: volumeLocks,
}
}

func (rscs *ReclaimSpaceControllerServer) RegisterService(server grpc.ServiceRegistrar) {
Expand All @@ -59,26 +67,23 @@ func (rscs *ReclaimSpaceControllerServer) ControllerReclaimSpace(
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
}

cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()

if acquired := rscs.volumeLocks.TryAcquire(volumeID); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID)

return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rscs.volumeLocks.Release(volumeID)

rbdVol, err := rbdutil.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
mgr := rbdutil.NewManager(rscs.driverInstance, nil, req.GetSecrets())
defer mgr.Destroy(ctx)

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
if err != nil {
return nil, status.Errorf(codes.Aborted, "failed to find volume with ID %q: %s", volumeID, err.Error())
}
defer rbdVol.Destroy(ctx)

err = rbdVol.Sparsify()
err = rbdVol.Sparsify(ctx)
if errors.Is(err, rbdutil.ErrImageInUse) {
// FIXME: https://github.com/csi-addons/kubernetes-csi-addons/issues/406.
// treat sparsify call as no-op if volume is in use.
Expand Down
2 changes: 1 addition & 1 deletion internal/csi-addons/rbd/reclaimspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
func TestControllerReclaimSpace(t *testing.T) {
t.Parallel()

controller := NewReclaimSpaceControllerServer(util.NewVolumeLocks())
controller := NewReclaimSpaceControllerServer("test.driver", util.NewVolumeLocks())

req := &rs.ControllerReclaimSpaceRequest{
VolumeId: "",
Expand Down
18 changes: 9 additions & 9 deletions internal/csi-addons/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,16 @@ type ReplicationServer struct {
*replication.UnimplementedControllerServer
// Embed ControllerServer as it implements helper functions
*corerbd.ControllerServer
// csiID is the unique ID for this CSI-driver deployment.
csiID string
// driverInstance is the unique ID for this CSI-driver deployment.
driverInstance string
}

// NewReplicationServer creates a new ReplicationServer which handles
// the Replication Service requests from the CSI-Addons specification.
func NewReplicationServer(instanceID string, c *corerbd.ControllerServer) *ReplicationServer {
return &ReplicationServer{
ControllerServer: c,
csiID: instanceID,
driverInstance: instanceID,
}
}

Expand Down Expand Up @@ -277,7 +277,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
}
defer rs.VolumeLocks.Release(volumeID)

mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
Expand Down Expand Up @@ -347,7 +347,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
}
defer rs.VolumeLocks.Release(volumeID)

mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
Expand Down Expand Up @@ -415,7 +415,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
}
defer rs.VolumeLocks.Release(volumeID)

mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
Expand Down Expand Up @@ -508,7 +508,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
}
defer rs.VolumeLocks.Release(volumeID)

mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
Expand Down Expand Up @@ -618,7 +618,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rs.VolumeLocks.Release(volumeID)
mgr := rbd.NewManager(rs.csiID, req.GetParameters(), req.GetSecrets())
mgr := rbd.NewManager(rs.driverInstance, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
Expand Down Expand Up @@ -827,7 +827,7 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rs.VolumeLocks.Release(volumeID)
mgr := rbd.NewManager(rs.csiID, nil, req.GetSecrets())
mgr := rbd.NewManager(rs.driverInstance, nil, req.GetSecrets())
defer mgr.Destroy(ctx)

rbdVol, err := mgr.GetVolumeByID(ctx, volumeID)
Expand Down
14 changes: 7 additions & 7 deletions internal/csi-addons/rbd/volumegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ type VolumeGroupServer struct {
// don't need to add all RPC methods leading to forward compatibility.
*volumegroup.UnimplementedControllerServer

// csiID is the unique ID for this CSI-driver deployment.
csiID string
// driverInstance is the unique ID for this CSI-driver deployment.
driverInstance string
}

// NewVolumeGroupServer creates a new VolumeGroupServer which handles the
// VolumeGroup Service requests from the CSI-Addons specification.
func NewVolumeGroupServer(instanceID string) *VolumeGroupServer {
return &VolumeGroupServer{
csiID: instanceID,
driverInstance: instanceID,
}
}

Expand Down Expand Up @@ -85,7 +85,7 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(
ctx context.Context,
req *volumegroup.CreateVolumeGroupRequest,
) (*volumegroup.CreateVolumeGroupResponse, error) {
mgr := rbd.NewManager(vs.csiID, req.GetParameters(), req.GetSecrets())
mgr := rbd.NewManager(vs.driverInstance, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)

// resolve all volumes
Expand Down Expand Up @@ -188,7 +188,7 @@ func (vs *VolumeGroupServer) DeleteVolumeGroup(
ctx context.Context,
req *volumegroup.DeleteVolumeGroupRequest,
) (*volumegroup.DeleteVolumeGroupResponse, error) {
mgr := rbd.NewManager(vs.csiID, nil, req.GetSecrets())
mgr := rbd.NewManager(vs.driverInstance, nil, req.GetSecrets())
defer mgr.Destroy(ctx)

// resolve the volume group
Expand Down Expand Up @@ -285,7 +285,7 @@ func (vs *VolumeGroupServer) ModifyVolumeGroupMembership(
ctx context.Context,
req *volumegroup.ModifyVolumeGroupMembershipRequest,
) (*volumegroup.ModifyVolumeGroupMembershipResponse, error) {
mgr := rbd.NewManager(vs.csiID, nil, req.GetSecrets())
mgr := rbd.NewManager(vs.driverInstance, nil, req.GetSecrets())
defer mgr.Destroy(ctx)

// resolve the volume group
Expand Down Expand Up @@ -427,7 +427,7 @@ func (vs *VolumeGroupServer) ControllerGetVolumeGroup(
ctx context.Context,
req *volumegroup.ControllerGetVolumeGroupRequest,
) (*volumegroup.ControllerGetVolumeGroupResponse, error) {
mgr := rbd.NewManager(vs.csiID, nil, req.GetSecrets())
mgr := rbd.NewManager(vs.driverInstance, nil, req.GetSecrets())
defer mgr.Destroy(ctx)

// resolve the volume group
Expand Down
3 changes: 2 additions & 1 deletion internal/rbd/diskusage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package rbd

import (
"context"
"fmt"
)

Expand All @@ -25,7 +26,7 @@ import (
// of the image.
// This function will return ErrImageInUse if the image is in use, since
// sparsifying an image on which i/o is in progress is not optimal.
func (ri *rbdImage) Sparsify() error {
func (ri *rbdImage) Sparsify(_ context.Context) error {
inUse, err := ri.isInUse()
if err != nil {
return fmt.Errorf("failed to check if image is in use: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions internal/rbd/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error {
r.cas.RegisterService(is)

if conf.IsControllerServer {
rs := casrbd.NewReclaimSpaceControllerServer(r.cs.VolumeLocks)
rs := casrbd.NewReclaimSpaceControllerServer(conf.InstanceID, r.cs.VolumeLocks)
r.cas.RegisterService(rs)

fcs := casrbd.NewFenceControllerServer()
Expand All @@ -249,7 +249,7 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error {
rs := casrbd.NewReclaimSpaceNodeServer(r.ns.VolumeLocks)
r.cas.RegisterService(rs)

ekr := casrbd.NewEncryptionKeyRotationServer(r.ns.VolumeLocks)
ekr := casrbd.NewEncryptionKeyRotationServer(conf.InstanceID, r.ns.VolumeLocks)
r.cas.RegisterService(ekr)
}

Expand Down
26 changes: 13 additions & 13 deletions internal/rbd/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
var _ types.Manager = &rbdManager{}

type rbdManager struct {
// csiID is the instance id of the CSI-driver (driver name).
csiID string
// driverInstance is the instance id of the CSI-driver (driver name).
driverInstance string
// parameters can contain the parameters of a create request.
parameters map[string]string
// secrets contain the credentials to connect to the Ceph cluster.
Expand All @@ -46,11 +46,11 @@ type rbdManager struct {

// NewManager returns a new manager for handling Volume and Volume Group
// operations, combining the requests for RBD and the journalling in RADOS.
func NewManager(csiID string, parameters, secrets map[string]string) types.Manager {
func NewManager(driverInstance string, parameters, secrets map[string]string) types.Manager {
return &rbdManager{
csiID: csiID,
parameters: parameters,
secrets: secrets,
driverInstance: driverInstance,
parameters: parameters,
secrets: secrets,
}
}

Expand Down Expand Up @@ -102,7 +102,7 @@ func (mgr *rbdManager) getVolumeGroupJournal(clusterID string) (journal.VolumeGr
return nil, fmt.Errorf("failed to find the RADOS namespace for cluster %q: %w", clusterID, err)
}

vgJournalConfig := journal.NewCSIVolumeGroupJournalWithNamespace(mgr.csiID, ns)
vgJournalConfig := journal.NewCSIVolumeGroupJournalWithNamespace(mgr.driverInstance, ns)

vgJournal, err := vgJournalConfig.Connect(monitors, ns, creds)
if err != nil {
Expand Down Expand Up @@ -221,7 +221,7 @@ func (mgr *rbdManager) GetVolumeGroupByID(ctx context.Context, id string) (types
return nil, err
}

vg, err := rbd_group.GetVolumeGroup(ctx, id, mgr.csiID, creds, mgr)
vg, err := rbd_group.GetVolumeGroup(ctx, id, mgr.driverInstance, creds, mgr)
if err != nil {
return nil, fmt.Errorf("failed to get volume group with id %q: %w", id, err)
}
Expand Down Expand Up @@ -302,7 +302,7 @@ func (mgr *rbdManager) CreateVolumeGroup(ctx context.Context, name string) (type
return nil, fmt.Errorf("failed to generate a unique CSI volume group with uuid for %q: %w", uuid, err)
}

vg, err := rbd_group.GetVolumeGroup(ctx, csiID, mgr.csiID, creds, mgr)
vg, err := rbd_group.GetVolumeGroup(ctx, csiID, mgr.driverInstance, creds, mgr)
if err != nil {
return nil, fmt.Errorf("failed to get volume group %q at cluster %q: %w", name, clusterID, err)
}
Expand All @@ -329,7 +329,7 @@ func (mgr *rbdManager) GetVolumeGroupSnapshotByID(
return nil, err
}

vgs, err := rbd_group.GetVolumeGroupSnapshot(ctx, id, mgr.csiID, creds, mgr)
vgs, err := rbd_group.GetVolumeGroupSnapshot(ctx, id, mgr.driverInstance, creds, mgr)
if err != nil {
return nil, fmt.Errorf("failed to get volume group with id %q: %w", id, err)
}
Expand Down Expand Up @@ -382,7 +382,7 @@ func (mgr *rbdManager) GetVolumeGroupSnapshotByName(
return nil, fmt.Errorf("failed to generate a unique CSI volume group with uuid %q: %w", uuid, err)
}

vgs, err := rbd_group.GetVolumeGroupSnapshot(ctx, csiID, mgr.csiID, mgr.creds, mgr)
vgs, err := rbd_group.GetVolumeGroupSnapshot(ctx, csiID, mgr.driverInstance, mgr.creds, mgr)
if err != nil {
return nil, fmt.Errorf("failed to get existing volume group snapshot with uuid %q: %w", uuid, err)
}
Expand Down Expand Up @@ -445,7 +445,7 @@ func (mgr *rbdManager) CreateVolumeGroupSnapshot(
return nil, fmt.Errorf("failed to generate a unique CSI volume group with uuid for %q: %w", uuid, err)
}

vgs, err := rbd_group.GetVolumeGroupSnapshot(ctx, groupID, mgr.csiID, mgr.creds, mgr)
vgs, err := rbd_group.GetVolumeGroupSnapshot(ctx, groupID, mgr.driverInstance, mgr.creds, mgr)
if vgs != nil {
log.DebugLog(ctx, "found existing volume group snapshot %q for id %q", vgs, groupID)

Expand Down Expand Up @@ -494,7 +494,7 @@ func (mgr *rbdManager) CreateVolumeGroupSnapshot(

log.DebugLog(ctx, "volume group snapshot %q contains %d snapshots: %v", name, len(snapshots), snapshots)

vgs, err = rbd_group.NewVolumeGroupSnapshot(ctx, groupID, mgr.csiID, mgr.creds, snapshots)
vgs, err = rbd_group.NewVolumeGroupSnapshot(ctx, groupID, mgr.driverInstance, mgr.creds, snapshots)
if err != nil {
return nil, fmt.Errorf("failed to create new volume group snapshot %q: %w", name, err)
}
Expand Down
Loading
Loading