Skip to content

Commit

Permalink
Merge pull request #1886 from k8s-infra-cherrypick-robot/cherry-pick-1876-to-release-1.15
Browse files (browse the repository at this point in the history)

[release-1.15] Refactor metric defer() statements to gRPC metric interceptor
  • Loading branch information
k8s-ci-robot authored Dec 7, 2024
2 parents: 9b98537 + 984b61c; commit: 6491802
Show file tree
Hide file tree
Showing 12 changed files with 379 additions and 123 deletions.
4 changes: 3 additions & 1 deletion cmd/gce-pd-csi-driver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func handle() {
}()
}

var metricsManager *metrics.MetricsManager = nil
if *runControllerService && *httpEndpoint != "" {
mm := metrics.NewMetricsManager()
mm.InitializeHttpHandler(*httpEndpoint, *metricsPath)
Expand All @@ -151,6 +152,7 @@ func handle() {
if metrics.IsGKEComponentVersionAvailable() {
mm.EmitGKEComponentVersion()
}
metricsManager = &mm
}

if len(*extraVolumeLabelsStr) > 0 && !*runControllerService {
Expand Down Expand Up @@ -261,7 +263,7 @@ func handle() {
gce.WaitForOpBackoff.Steps = *waitForOpBackoffSteps
gce.WaitForOpBackoff.Cap = *waitForOpBackoffCap

gceDriver.Run(*endpoint, *grpcLogCharCap, *enableOtelTracing)
gceDriver.Run(*endpoint, *grpcLogCharCap, *enableOtelTracing, metricsManager)
}

func notEmpty(v string) bool {
Expand Down
102 changes: 17 additions & 85 deletions pkg/gce-pd-csi-driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"math/rand"
neturl "net/url"
"sort"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -303,12 +302,14 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre

func (gceCS *GCEControllerServer) createVolumeInternal(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
// Apply Parameters (case-insensitive). We leave validation of
// the values to the cloud provider.
params, err := gceCS.parameterProcessor().ExtractAndDefaultParameters(req.GetParameters(), gceCS.Driver.extraVolumeLabels, gceCS.Driver.extraTags)
metrics.UpdateRequestMetadataFromParams(ctx, params)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to extract parameters: %v", err.Error())
}

// Validate arguments
volumeCapabilities := req.GetVolumeCapabilities()
capacityRange := req.GetCapacityRange()
Expand All @@ -328,17 +329,6 @@ func (gceCS *GCEControllerServer) createVolumeInternal(ctx context.Context, req
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "VolumeCapabilities is invalid: %v", err.Error())
}

// Apply Parameters (case-insensitive). We leave validation of
// the values to the cloud provider.
params, err := gceCS.parameterProcessor().ExtractAndDefaultParameters(req.GetParameters(), gceCS.Driver.extraVolumeLabels, gceCS.Driver.extraTags)
diskTypeForMetric = params.DiskType
enableConfidentialCompute = strconv.FormatBool(params.EnableConfidentialCompute)
hasStoragePools := len(params.StoragePools) > 0
enableStoragePools = strconv.FormatBool(hasStoragePools)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to extract parameters: %v", err.Error())
}
// https://github.com/container-storage-interface/spec/blob/master/spec.md#createvolume
// mutable_parameters MUST take precedence over the values from parameters.
mutableParams := req.GetMutableParameters()
Expand Down Expand Up @@ -782,14 +772,6 @@ func (gceCS *GCEControllerServer) ControllerModifyVolume(ctx context.Context, re
return nil, status.Error(codes.InvalidArgument, "volume ID must be provided")
}

diskType := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools

defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("ControllerModifyVolume", err, diskType, enableConfidentialCompute, enableStoragePools)
}()

project, volKey, err := common.VolumeIDToKey(volumeID)
if err != nil {
// Cannot find volume associated with this ID because VolumeID is not in the correct format
Expand All @@ -806,6 +788,7 @@ func (gceCS *GCEControllerServer) ControllerModifyVolume(ctx context.Context, re
klog.V(4).Infof("Modify Volume Parameters for %s: %v", volumeID, volumeModifyParams)

existingDisk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionBeta)
metrics.UpdateRequestMetadataFromDisk(ctx, existingDisk)

if err != nil {
err = fmt.Errorf("Failed to get volume: %w", err)
Expand All @@ -816,9 +799,9 @@ func (gceCS *GCEControllerServer) ControllerModifyVolume(ctx context.Context, re
err = status.Errorf(codes.Internal, "failed to get volume : %s", volumeID)
return nil, err
}
diskType = existingDisk.GetPDType()

// Check if the disk supports dynamic IOPS/Throughput provisioning
diskType := existingDisk.GetPDType()
supportsIopsChange := gceCS.diskSupportsIopsChange(diskType)
supportsThroughputChange := gceCS.diskSupportsThroughputChange(diskType)
if !supportsIopsChange && !supportsThroughputChange {
Expand All @@ -834,8 +817,6 @@ func (gceCS *GCEControllerServer) ControllerModifyVolume(ctx context.Context, re
return nil, err
}

enableStoragePools = strconv.FormatBool(existingDisk.GetEnableStoragePools())

err = gceCS.CloudProvider.UpdateDisk(ctx, project, volKey, existingDisk, volumeModifyParams)
if err != nil {
klog.Errorf("Failed to modify volume %s: %v", volumeID, err)
Expand Down Expand Up @@ -883,12 +864,6 @@ func getGCEApiVersion(multiWriter bool) gce.GCEAPIVersion {
func (gceCS *GCEControllerServer) deleteMultiZoneDisk(ctx context.Context, req *csi.DeleteVolumeRequest, project string, volKey *meta.Key) (*csi.DeleteVolumeResponse, error) {
// List disks with same name
var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("DeleteVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
existingZones := []string{gceCS.CloudProvider.GetDefaultZone()}
zones, err := getDefaultZonesInRegion(ctx, gceCS, existingZones)
if err != nil {
Expand All @@ -910,7 +885,7 @@ func (gceCS *GCEControllerServer) deleteMultiZoneDisk(ctx context.Context, req *
}
disk, _ := gceCS.CloudProvider.GetDisk(ctx, project, zonalVolKey, gce.GCEAPIVersionV1)
// TODO: Consolidate the parameters here, rather than taking the last.
diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk)
metrics.UpdateRequestMetadataFromDisk(ctx, disk)
err := gceCS.CloudProvider.DeleteDisk(ctx, project, zonalVolKey)
if err != nil {
deleteDiskErrs = append(deleteDiskErrs, gceCS.CloudProvider.DeleteDisk(ctx, project, volKey))
Expand All @@ -927,12 +902,6 @@ func (gceCS *GCEControllerServer) deleteMultiZoneDisk(ctx context.Context, req *

func (gceCS *GCEControllerServer) deleteSingleDeviceDisk(ctx context.Context, req *csi.DeleteVolumeRequest, project string, volKey *meta.Key) (*csi.DeleteVolumeResponse, error) {
var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("DeleteVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
volumeID := req.GetVolumeId()
project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey)
if err != nil {
Expand All @@ -948,7 +917,7 @@ func (gceCS *GCEControllerServer) deleteSingleDeviceDisk(ctx context.Context, re
}
defer gceCS.volumeLocks.Release(volumeID)
disk, _ := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1)
diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk)
metrics.UpdateRequestMetadataFromDisk(ctx, disk)
err = gceCS.CloudProvider.DeleteDisk(ctx, project, volKey)
if err != nil {
return nil, common.LoggedError("Failed to delete disk: ", err)
Expand All @@ -960,12 +929,6 @@ func (gceCS *GCEControllerServer) deleteSingleDeviceDisk(ctx context.Context, re

func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("ControllerPublishVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
// Only valid requests will be accepted
_, _, _, err = gceCS.validateControllerPublishVolumeRequest(ctx, req)
if err != nil {
Expand All @@ -978,7 +941,7 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r
}

resp, err, disk := gceCS.executeControllerPublishVolume(ctx, req)
diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk)
metrics.UpdateRequestMetadataFromDisk(ctx, disk)
if err != nil {
klog.Infof("For node %s adding backoff due to error for volume %s: %v", req.NodeId, req.VolumeId, err)
gceCS.errorBackoff.next(backoffId, common.CodeForError(err))
Expand Down Expand Up @@ -1192,12 +1155,6 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con

func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("ControllerUnpublishVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
_, _, err = gceCS.validateControllerUnpublishVolumeRequest(ctx, req)
if err != nil {
return nil, err
Expand All @@ -1209,7 +1166,7 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context,
return nil, status.Errorf(gceCS.errorBackoff.code(backoffId), "ControllerUnpublish not permitted on node %q due to backoff condition", req.NodeId)
}
resp, err, disk := gceCS.executeControllerUnpublishVolume(ctx, req)
diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk)
metrics.UpdateRequestMetadataFromDisk(ctx, disk)
if err != nil {
klog.Infof("For node %s adding backoff due to error for volume %s: %v", req.NodeId, req.VolumeId, err)
gceCS.errorBackoff.next(backoffId, common.CodeForError(err))
Expand Down Expand Up @@ -1316,12 +1273,6 @@ func (gceCS *GCEControllerServer) parameterProcessor() *common.ParameterProcesso

func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("ValidateVolumeCapabilities", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
if req.GetVolumeCapabilities() == nil || len(req.GetVolumeCapabilities()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities must be provided")
}
Expand All @@ -1348,7 +1299,7 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context
defer gceCS.volumeLocks.Release(volumeID)

disk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1)
diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk)
metrics.UpdateRequestMetadataFromDisk(ctx, disk)
if err != nil {
if gce.IsGCENotFoundError(err) {
return nil, status.Errorf(codes.NotFound, "Could not find disk %v: %v", volKey.Name, err.Error())
Expand Down Expand Up @@ -1564,12 +1515,6 @@ func (gceCS *GCEControllerServer) ControllerGetCapabilities(ctx context.Context,

func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("CreateSnapshot", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
// Validate arguments
volumeID := req.GetSourceVolumeId()
if len(req.Name) == 0 {
Expand All @@ -1595,7 +1540,7 @@ func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.C

// Check if volume exists
disk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1)
diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk)
metrics.UpdateRequestMetadataFromDisk(ctx, disk)
if err != nil {
if gce.IsGCENotFoundError(err) {
return nil, status.Errorf(codes.NotFound, "CreateSnapshot could not find disk %v: %v", volKey.String(), err.Error())
Expand Down Expand Up @@ -1823,12 +1768,6 @@ func isCSISnapshotReady(status string) (bool, error) {

func (gceCS *GCEControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("DeleteSnapshot", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
// Validate arguments
snapshotID := req.GetSnapshotId()
if len(snapshotID) == 0 {
Expand Down Expand Up @@ -1913,14 +1852,7 @@ func (gceCS *GCEControllerServer) ListSnapshots(ctx context.Context, req *csi.Li
}

func (gceCS *GCEControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {

var err error
diskTypeForMetric := metrics.DefaultDiskTypeForMetric
enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute
enableStoragePools := metrics.DefaultEnableStoragePools
defer func() {
gceCS.Metrics.RecordOperationErrorMetrics("ControllerExpandVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools)
}()
volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "ControllerExpandVolume volume ID must be provided")
Expand Down Expand Up @@ -1950,7 +1882,7 @@ func (gceCS *GCEControllerServer) ControllerExpandVolume(ctx context.Context, re
}

sourceDisk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1)
diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(sourceDisk)
metrics.UpdateRequestMetadataFromDisk(ctx, sourceDisk)
resizedGb, err := gceCS.CloudProvider.ResizeDisk(ctx, project, volKey, reqBytes)

if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/gce-pd-csi-driver/gce-pd-driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils"
gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute"
metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"
mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager"
)

Expand Down Expand Up @@ -170,12 +171,12 @@ func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute, err
}
}

func (gceDriver *GCEDriver) Run(endpoint string, grpcLogCharCap int, enableOtelTracing bool) {
func (gceDriver *GCEDriver) Run(endpoint string, grpcLogCharCap int, enableOtelTracing bool, metricsManager *metrics.MetricsManager) {
maxLogChar = grpcLogCharCap

klog.V(4).Infof("Driver: %v", gceDriver.name)
//Start the nonblocking GRPC
s := NewNonBlockingGRPCServer(enableOtelTracing)
s := NewNonBlockingGRPCServer(enableOtelTracing, metricsManager)
// TODO(#34): Only start specific servers based on a flag.
// In the future have this only run specific combinations of servers depending on which version this is.
// The schema for that was in util. basically it was just s.start but with some nil servers.
Expand Down
11 changes: 8 additions & 3 deletions pkg/gce-pd-csi-driver/gce-pd-driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ func initBlockingGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk, readyToExe
return initGCEDriverWithCloudProvider(t, fakeBlockingBlockProvider)
}

func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute) *GCEDriver {
vendorVersion := "test-vendor"
func controllerServerForTest(cloudProvider gce.GCECompute) *GCEControllerServer {
gceDriver := GetGCEDriver()
errorBackoffInitialDuration := 200 * time.Millisecond
errorBackoffMaxDuration := 5 * time.Minute
Expand All @@ -55,7 +54,13 @@ func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute)
SupportsThroughputChange: []string{"hyperdisk-balanced", "hyperdisk-throughput", "hyperdisk-ml"},
}

controllerServer := NewControllerServer(gceDriver, cloudProvider, errorBackoffInitialDuration, errorBackoffMaxDuration, fallbackRequisiteZones, enableStoragePools, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig)
return NewControllerServer(gceDriver, cloudProvider, errorBackoffInitialDuration, errorBackoffMaxDuration, fallbackRequisiteZones, enableStoragePools, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig)
}

func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute) *GCEDriver {
vendorVersion := "test-vendor"
gceDriver := GetGCEDriver()
controllerServer := controllerServerForTest(cloudProvider)
err := gceDriver.SetupGCEDriver(driver, vendorVersion, nil, nil, nil, controllerServer, nil)
if err != nil {
t.Fatalf("Failed to setup GCE Driver: %v", err)
Expand Down
23 changes: 16 additions & 7 deletions pkg/gce-pd-csi-driver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"k8s.io/klog/v2"
"sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics"

csi "github.com/container-storage-interface/spec/lib/go/csi"
)
Expand All @@ -40,15 +41,16 @@ type NonBlockingGRPCServer interface {
ForceStop()
}

func NewNonBlockingGRPCServer(enableOtelTracing bool) NonBlockingGRPCServer {
return &nonBlockingGRPCServer{otelTracing: enableOtelTracing}
func NewNonBlockingGRPCServer(enableOtelTracing bool, metricsManager *metrics.MetricsManager) NonBlockingGRPCServer {
return &nonBlockingGRPCServer{otelTracing: enableOtelTracing, metricsManager: metricsManager}
}

// NonBlocking server
type nonBlockingGRPCServer struct {
wg sync.WaitGroup
server *grpc.Server
otelTracing bool
wg sync.WaitGroup
server *grpc.Server
otelTracing bool
metricsManager *metrics.MetricsManager
}

func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
Expand All @@ -73,10 +75,17 @@ func (s *nonBlockingGRPCServer) ForceStop() {
}

func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
grpcInterceptor := grpc.UnaryInterceptor(logGRPC)
interceptors := []grpc.UnaryServerInterceptor{logGRPC}
if s.metricsManager != nil {
metricsInterceptor := metrics.MetricInterceptor{
MetricsManager: s.metricsManager,
}
interceptors = append(interceptors, metricsInterceptor.UnaryInterceptor())
}
if s.otelTracing {
grpcInterceptor = grpc.ChainUnaryInterceptor(logGRPC, otelgrpc.UnaryServerInterceptor())
interceptors = append(interceptors, otelgrpc.UnaryServerInterceptor())
}
grpcInterceptor := grpc.ChainUnaryInterceptor(interceptors...)

opts := []grpc.ServerOption{
grpcInterceptor,
Expand Down
Loading

0 comments on commit 6491802

Please sign in to comment.