Skip to content

Commit

Permalink
feat: Provide hostname for zonal disk insert API
Browse files Browse the repository at this point in the history
CL contains changes to pass in hostname for zonal disk.insert API as a
part of location hint field. Code changes for regional disk API will be
part of future CLs.
  • Loading branch information
Rishita-Golla committed Nov 7, 2024
1 parent cf8da66 commit ba2bb17
Show file tree
Hide file tree
Showing 250 changed files with 28,602 additions and 24 deletions.
18 changes: 17 additions & 1 deletion cmd/gce-pd-csi-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"strings"
"time"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/utils/strings/slices"

Expand Down Expand Up @@ -72,6 +74,7 @@ var (
formatAndMountTimeout = flag.Duration("format-and-mount-timeout", 1*time.Minute, "The maximum duration of a format and mount operation before another such operation will be started. Used only if --serialize-format-and-mount")
fallbackRequisiteZonesFlag = flag.String("fallback-requisite-zones", "", "Comma separated list of requisite zones that will be used if there are not sufficient zones present in requisite topologies when provisioning a disk")
enableStoragePoolsFlag = flag.Bool("enable-storage-pools", false, "If set to true, the CSI Driver will allow volumes to be provisioned in Storage Pools")
enableVMLocationHint = flag.Bool("enable-vm-location-hint", false, "If set to true, the location hint field for create volume request will have hostname set")

multiZoneVolumeHandleDiskTypesFlag = flag.String("multi-zone-volume-handle-disk-types", "", "Comma separated list of allowed disk types that can use the multi-zone volumeHandle. Used only if --multi-zone-volume-handle-enable")
multiZoneVolumeHandleEnableFlag = flag.Bool("multi-zone-volume-handle-enable", false, "If set to true, the multi-zone volumeHandle feature will be enabled")
Expand Down Expand Up @@ -244,7 +247,20 @@ func handle() {
}
}

err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, identityServer, controllerServer, nodeServer)
var kubeClient *kubernetes.Clientset
if *enableVMLocationHint {
cfg, err := rest.InClusterConfig()
if err != nil {
klog.Fatalf("Could not fetch in-cluster config: %v", err.Error())
}

kubeClient, err = kubernetes.NewForConfig(cfg)
if err != nil {
klog.Fatalf("Could not fetch in-cluster client: %v", err.Error())
}
}

err = gceDriver.SetupGCEDriver(driverName, version, extraVolumeLabels, extraTags, kubeClient, identityServer, controllerServer, nodeServer)
if err != nil {
klog.Fatalf("Failed to initialize GCE CSI Driver: %v", err.Error())
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,7 @@ github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi
github.com/evanphx/json-patch v4.5.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d/go.mod h1:ZZMPRZwes7CROmyNKgQzC3XPs6L/G2EJLHddWejkmf4=
github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc=
Expand Down
7 changes: 6 additions & 1 deletion pkg/gce-cloud-provider/compute/fake-gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (cloud *FakeCloudProvider) ValidateExistingDisk(ctx context.Context, resp *
return ValidateDiskParameters(resp, params)
}

func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) error {
func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode, hostName string) error {
if disk, ok := cloud.disks[volKey.String()]; ok {
err := cloud.ValidateExistingDisk(ctx, disk, params,
int64(capacityRange.GetRequiredBytes()),
Expand All @@ -242,6 +242,7 @@ func (cloud *FakeCloudProvider) InsertDisk(ctx context.Context, project string,
Labels: params.Labels,
ProvisionedIops: params.ProvisionedIOPSOnCreate,
ProvisionedThroughput: params.ProvisionedThroughputOnCreate,
LocationHint: cloud.GetLocationHintURI(project, volKey.Zone, hostName),
}

if snapshotID != "" {
Expand Down Expand Up @@ -382,6 +383,10 @@ func (cloud *FakeCloudProvider) GetDiskTypeURI(project string, volKey *meta.Key,
}
}

// GetLocationHintURI returns the locationHint resource URI for the given
// instance, mirroring CloudProvider.GetLocationHintURI for tests.
// An empty hostName returns an empty hint so callers never build a
// malformed URI ending in "/instance/" when no hostname was resolved.
func (cloud *FakeCloudProvider) GetLocationHintURI(project, zone, hostName string) string {
	if hostName == "" {
		return ""
	}
	// NOTE(review): locationHintURITemplate uses singular "zone"/"instance";
	// GCE resource URLs are conventionally plural ("zones/.../instances/...") — confirm
	// the format the disks.insert locationHint field expects.
	return fmt.Sprintf(locationHintURITemplate, project, zone, hostName)
}

// getZonalDiskTypeURI builds the relative resource URI for a zonal disk type.
func (cloud *FakeCloudProvider) getZonalDiskTypeURI(project, zone, diskType string) string {
	uri := fmt.Sprintf(diskTypeURITemplateSingleZone, project, zone, diskType)
	return uri
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/gce-cloud-provider/compute/gce-compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type GCECompute interface {
GetDisk(ctx context.Context, project string, volumeKey *meta.Key, gceAPIVersion GCEAPIVersion) (*CloudDisk, error)
RepairUnderspecifiedVolumeKey(ctx context.Context, project string, volumeKey *meta.Key) (string, *meta.Key, error)
ValidateExistingDisk(ctx context.Context, disk *CloudDisk, params common.DiskParameters, reqBytes, limBytes int64, multiWriter bool) error
InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) error
InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode, hostName string) error
DeleteDisk(ctx context.Context, project string, volumeKey *meta.Key) error
UpdateDisk(ctx context.Context, project string, volKey *meta.Key, existingDisk *CloudDisk, params common.ModifyVolumeParameters) error
AttachDisk(ctx context.Context, project string, volKey *meta.Key, readWrite, diskType, instanceZone, instanceName string, forceAttach bool) error
Expand Down Expand Up @@ -436,7 +436,7 @@ func ValidateDiskParameters(disk *CloudDisk, params common.DiskParameters) error
return nil
}

func (cloud *CloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) error {
func (cloud *CloudProvider) InsertDisk(ctx context.Context, project string, volKey *meta.Key, params common.DiskParameters, capBytes int64, capacityRange *csi.CapacityRange, replicaZones []string, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode, hostName string) error {
klog.V(5).Infof("Inserting disk %v", volKey)

description, err := encodeTags(params.Tags)
Expand All @@ -449,7 +449,7 @@ func (cloud *CloudProvider) InsertDisk(ctx context.Context, project string, volK
if description == "" {
description = "Disk created by GCE-PD CSI Driver"
}
return cloud.insertZonalDisk(ctx, project, volKey, params, capBytes, capacityRange, snapshotID, volumeContentSourceVolumeID, description, multiWriter, accessMode)
return cloud.insertZonalDisk(ctx, project, volKey, params, capBytes, capacityRange, snapshotID, volumeContentSourceVolumeID, description, multiWriter, accessMode, hostName)
case meta.Regional:
if description == "" {
description = "Regional disk created by GCE-PD CSI Driver"
Expand Down Expand Up @@ -772,7 +772,7 @@ func (cloud *CloudProvider) insertZonalDisk(
volumeContentSourceVolumeID string,
description string,
multiWriter bool,
accessMode string) error {
accessMode, hostName string) error {
var (
err error
opName string
Expand All @@ -788,6 +788,7 @@ func (cloud *CloudProvider) insertZonalDisk(
Description: description,
Type: cloud.GetDiskTypeURI(project, volKey, params.DiskType),
Labels: params.Labels,
LocationHint: cloud.GetLocationHintURI(project, volKey.Zone, hostName),
}

if params.ProvisionedIOPSOnCreate > 0 {
Expand Down Expand Up @@ -1107,6 +1108,10 @@ func (cloud *CloudProvider) GetDiskTypeURI(project string, volKey *meta.Key, dis
}
}

// GetLocationHintURI returns the locationHint resource URI identifying the
// instance the disk should be placed near.
// An empty hostName (location-hint feature disabled, or the PVC lookup
// failed) returns "" so insertZonalDisk leaves LocationHint unset instead of
// sending a malformed URI ending in "/instance/" to the API.
func (cloud *CloudProvider) GetLocationHintURI(project, zone, hostName string) string {
	if hostName == "" {
		return ""
	}
	// NOTE(review): locationHintURITemplate uses singular "zone"/"instance";
	// GCE resource URLs are conventionally plural ("zones/.../instances/...") — confirm
	// the format the disks.insert locationHint field expects.
	return fmt.Sprintf(locationHintURITemplate, project, zone, hostName)
}

// getZonalDiskTypeURI returns the fully-qualified URI (service base path plus
// relative resource path) for a zonal disk type.
func (cloud *CloudProvider) getZonalDiskTypeURI(project string, zone, diskType string) string {
	relative := fmt.Sprintf(diskTypeURITemplateSingleZone, project, zone, diskType)
	return cloud.service.BasePath + relative
}
Expand Down
1 change: 1 addition & 0 deletions pkg/gce-cloud-provider/compute/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
diskTypeURITemplateRegional = "projects/%s/regions/%s/diskTypes/%s" // {gce.projectID}/regions/{disk.Region}/diskTypes/{disk.Type}"

regionURITemplate = "projects/%s/regions/%s"
locationHintURITemplate = "projects/%s/zone/%s/instance/%s"

replicaZoneURITemplateSingleZone = "projects/%s/zones/%s" // {gce.projectID}/zones/{disk.Zone}
EnvironmentStaging Environment = "staging"
Expand Down
37 changes: 31 additions & 6 deletions pkg/gce-pd-csi-driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"k8s.io/klog/v2"
"k8s.io/utils/strings/slices"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common"
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"
Expand Down Expand Up @@ -208,6 +210,8 @@ const (
listDisksUsersField = googleapi.Field("items/users")

readOnlyManyAccessMode = "READ_ONLY_MANY"

annSelectedNode = "volume.kubernetes.io/selected-node"
)

var (
Expand Down Expand Up @@ -720,21 +724,23 @@ func (gceCS *GCEControllerServer) createSingleDisk(ctx context.Context, req *csi
// Create the disk
var disk *gce.CloudDisk
name := req.GetName()
pvcName, pvcNamespace := req.Parameters[common.ParameterKeyPVCName], req.Parameters[common.ParameterKeyPVCNamespace]
hostName := getHostNameFromPVC(ctx, pvcName, pvcNamespace, gceCS.Driver.kubeClient)

switch params.ReplicationType {
case replicationTypeNone:
if len(zones) != 1 {
return nil, status.Errorf(codes.Internal, "CreateVolume failed to get a single zone for creating zonal disk, instead got: %v", zones)
}
disk, err = createSingleZoneDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode)
disk, err = createSingleZoneDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode, hostName)
if err != nil {
return nil, common.LoggedError("CreateVolume failed to create single zonal disk "+name+": ", err)
}
case replicationTypeRegionalPD:
if len(zones) != 2 {
return nil, status.Errorf(codes.Internal, "CreateVolume failed to get a 2 zones for creating regional disk, instead got: %v", zones)
}
disk, err = createRegionalDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode)
disk, err = createRegionalDisk(ctx, gceCS.CloudProvider, name, zones, params, capacityRange, capBytes, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode, "")
if err != nil {
return nil, common.LoggedError("CreateVolume failed to create regional disk "+name+": ", err)
}
Expand Down Expand Up @@ -872,6 +878,25 @@ func (gceCS *GCEControllerServer) DeleteVolume(ctx context.Context, req *csi.Del
return gceCS.deleteSingleDeviceDisk(ctx, req, project, volKey)
}

// getHostNameFromPVC looks up the node selected for the given PVC via its
// "volume.kubernetes.io/selected-node" annotation (set by the scheduler for
// WaitForFirstConsumer volumes). It deliberately never returns an error:
// volume creation proceeds without a location hint whenever the hostname
// cannot be determined, so every failure path returns "".
func getHostNameFromPVC(ctx context.Context, pvcName, pvcNamespace string, client kubernetes.Interface) string {
	if client == nil {
		// Client is initialized only when location hint is enabled
		return ""
	}
	if pvcName == "" {
		// PVC metadata was not passed in the CreateVolume parameters (e.g.
		// the provisioner is not running with --extra-create-metadata);
		// skip the lookup rather than issuing a Get that is guaranteed to
		// fail and spam the error log.
		return ""
	}

	pvc, err := client.CoreV1().PersistentVolumeClaims(pvcNamespace).Get(ctx, pvcName, metav1.GetOptions{})
	if err != nil {
		klog.Errorf("Failed to get persistent volume claim %s: %v", pvcName, err)
		// Don't return the error as create volume request can be continued without hostname
		return ""
	}
	if hostName, ok := pvc.Annotations[annSelectedNode]; ok {
		klog.V(4).Infof("Retrieved hostname %q from PVC %v", hostName, pvcName)
		return hostName
	}
	// Annotation absent: the PVC has not been scheduled to a node (yet).
	return ""
}

func getGCEApiVersion(multiWriter bool) gce.GCEAPIVersion {
if multiWriter {
return gce.GCEAPIVersionBeta
Expand Down Expand Up @@ -2482,7 +2507,7 @@ func getResourceId(resourceLink string) (string, error) {
return strings.Join(elts[3:], "/"), nil
}

func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) (*gce.CloudDisk, error) {
func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode, hostName string) (*gce.CloudDisk, error) {
project := cloudProvider.GetDefaultProject()
region, err := common.GetRegionFromZones(zones)
if err != nil {
Expand All @@ -2495,7 +2520,7 @@ func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name
fullyQualifiedReplicaZones, cloudProvider.GetReplicaZoneURI(project, replicaZone))
}

err = cloudProvider.InsertDisk(ctx, project, meta.RegionalKey(name, region), params, capBytes, capacityRange, fullyQualifiedReplicaZones, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode)
err = cloudProvider.InsertDisk(ctx, project, meta.RegionalKey(name, region), params, capBytes, capacityRange, fullyQualifiedReplicaZones, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode, hostName)
if err != nil {
return nil, fmt.Errorf("failed to insert regional disk: %w", err)
}
Expand All @@ -2512,13 +2537,13 @@ func createRegionalDisk(ctx context.Context, cloudProvider gce.GCECompute, name
return disk, nil
}

func createSingleZoneDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode string) (*gce.CloudDisk, error) {
func createSingleZoneDisk(ctx context.Context, cloudProvider gce.GCECompute, name string, zones []string, params common.DiskParameters, capacityRange *csi.CapacityRange, capBytes int64, snapshotID string, volumeContentSourceVolumeID string, multiWriter bool, accessMode, hostName string) (*gce.CloudDisk, error) {
project := cloudProvider.GetDefaultProject()
if len(zones) != 1 {
return nil, fmt.Errorf("got wrong number of zones for zonal create volume: %v", len(zones))
}
diskZone := zones[0]
err := cloudProvider.InsertDisk(ctx, project, meta.ZonalKey(name, diskZone), params, capBytes, capacityRange, nil, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode)
err := cloudProvider.InsertDisk(ctx, project, meta.ZonalKey(name, diskZone), params, capBytes, capacityRange, nil, snapshotID, volumeContentSourceVolumeID, multiWriter, accessMode, hostName)
if err != nil {
return nil, fmt.Errorf("failed to insert zonal disk: %w", err)
}
Expand Down
Loading

0 comments on commit ba2bb17

Please sign in to comment.